Skip to content

Commit

Permalink
[Indexer-Grpc-V2] Add status pages for DataService and GrpcManager.
Browse files Browse the repository at this point in the history
Signed-off-by: Guoteng Rao <[email protected]>
  • Loading branch information
grao1991 committed Jan 31, 2025
1 parent 00ac087 commit 13b3532
Show file tree
Hide file tree
Showing 20 changed files with 823 additions and 42 deletions.
62 changes: 30 additions & 32 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -519,9 +519,11 @@ blst = "0.3.11"
# The __private_bench feature exposes the Fp12 type which we need to implement a multi-threaded multi-pairing.
blstrs = { version = "0.7.1", features = ["serde", "__private_bench"] }
bollard = "0.15"
build_html = "2.5.0"
bulletproofs = { version = "4.0.0" }
byteorder = "1.4.3"
bytes = { version = "1.4.0", features = ["serde"] }
bytesize = { version = "1.3.0" }
camino = { version = "1.1.6" }
chrono = { version = "0.4.19", features = ["clock", "serde"] }
cfg-if = "1.0.0"
Expand Down Expand Up @@ -705,6 +707,8 @@ prometheus-parse = "0.2.4"
proptest = "1.4.0"
proptest-derive = "0.4.0"
prost = { version = "0.13.4", features = ["no-recursion-limit"] }
prost-types = "0.13.3"
quanta = "0.10.1"
quick_cache = "0.5.1"
quick-junit = "0.5.0"
quote = "1.0.18"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ aptos-indexer-grpc-server-framework = { workspace = true }
aptos-indexer-grpc-utils = { workspace = true }
aptos-protos = { workspace = true }
async-trait = { workspace = true }
build_html = { workspace = true }
clap = { workspace = true }
dashmap = { workspace = true }
futures = { workspace = true }
Expand All @@ -32,6 +33,7 @@ tonic = { workspace = true }
tonic-reflection = { workspace = true }
tracing = { workspace = true }
uuid = { workspace = true }
warp = { workspace = true }

[target.'cfg(unix)'.dependencies]
jemallocator = { workspace = true }
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::{net::SocketAddr, sync::Arc};
use tokio::task::JoinHandle;
use tonic::{codec::CompressionEncoding, transport::Server};
use tracing::info;
use warp::{reply::Response, Rejection};

pub(crate) static LIVE_DATA_SERVICE: OnceCell<LiveDataService<'static>> = OnceCell::new();
pub(crate) static HISTORICAL_DATA_SERVICE: OnceCell<HistoricalDataService> = OnceCell::new();
Expand Down Expand Up @@ -260,4 +261,8 @@ impl RunnableConfig for IndexerGrpcDataServiceConfig {
fn get_server_name(&self) -> String {
"indexer_grpc_data_service_v2".to_string()
}

async fn status_page(&self) -> Result<Response, Rejection> {
crate::status_page::status_page()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ impl HistoricalDataService {
});
}

pub(crate) fn get_connection_manager(&self) -> &ConnectionManager {
&self.connection_manager
}

async fn start_streaming(
&self,
id: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@ mod connection_manager;
mod historical_data_service;
mod live_data_service;
mod service;
mod status_page;
#[cfg(test)]
mod test;
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ impl<'a> LiveDataService<'a> {
self.in_memory_cache.data_manager.read().await.start_version
}

pub(super) fn get_connection_manager(&self) -> &ConnectionManager {
&self.connection_manager
}

async fn start_streaming(
&'a self,
id: String,
Expand Down
128 changes: 128 additions & 0 deletions ecosystem/indexer-grpc/indexer-grpc-data-service-v2/src/status_page.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::{
config::{HISTORICAL_DATA_SERVICE, LIVE_DATA_SERVICE},
connection_manager::ConnectionManager,
};
use aptos_indexer_grpc_utils::status_page::{get_throughput_from_samples, render_status_page, Tab};
use build_html::{
Container, ContainerType, HtmlContainer, HtmlElement, HtmlTag, Table, TableCell, TableCellType,
TableRow,
};
use std::time::Duration;
use warp::{reply::Response, Rejection};

pub(crate) fn status_page() -> Result<Response, Rejection> {
let mut tabs = vec![];
// TODO(grao): Add something real.
let overview_tab_content = HtmlElement::new(HtmlTag::Div).with_raw("Welcome!").into();
tabs.push(Tab::new("Overview", overview_tab_content));
if let Some(live_data_service) = LIVE_DATA_SERVICE.get() {
let connection_manager_info =
render_connection_manager_info(live_data_service.get_connection_manager());
let cache_info = render_cache_info();
let content = HtmlElement::new(HtmlTag::Div)
.with_container(connection_manager_info)
.with_container(cache_info)
.into();
tabs.push(Tab::new("LiveDataService", content));
}

if let Some(historical_data_service) = HISTORICAL_DATA_SERVICE.get() {
let connection_manager_info =
render_connection_manager_info(historical_data_service.get_connection_manager());
let file_store_info = render_file_store_info();
let content = HtmlElement::new(HtmlTag::Div)
.with_container(connection_manager_info)
.with_container(file_store_info)
.into();
tabs.push(Tab::new("HistoricalDataService", content));
}

render_status_page(tabs)
}

fn render_connection_manager_info(connection_manager: &ConnectionManager) -> Container {
let known_latest_version = connection_manager.known_latest_version();
let active_streams = connection_manager.get_active_streams();
let active_streams_table = active_streams.into_iter().fold(
Table::new()
.with_attributes([("style", "width: 100%; border: 5px solid black;")])
.with_thead_attributes([("style", "background-color: lightcoral; color: white;")])
.with_custom_header_row(
TableRow::new()
.with_cell(TableCell::new(TableCellType::Header).with_raw("Id"))
.with_cell(TableCell::new(TableCellType::Header).with_raw("Current Version"))
.with_cell(TableCell::new(TableCellType::Header).with_raw("End Version"))
.with_cell(
TableCell::new(TableCellType::Header).with_raw("Past 10s throughput"),
)
.with_cell(
TableCell::new(TableCellType::Header).with_raw("Past 60s throughput"),
)
.with_cell(
TableCell::new(TableCellType::Header).with_raw("Past 10min throughput"),
),
),
|table, active_stream| {
table.with_custom_body_row(
TableRow::new()
.with_cell(TableCell::new(TableCellType::Data).with_raw(&active_stream.id))
.with_cell(TableCell::new(TableCellType::Data).with_raw(format!(
"{:?}",
active_stream.progress.as_ref().and_then(|progress| {
progress.samples.last().map(|sample| sample.version)
})
)))
.with_cell(
TableCell::new(TableCellType::Data).with_raw(active_stream.end_version()),
)
.with_cell(TableCell::new(TableCellType::Data).with_raw(
get_throughput_from_samples(
active_stream.progress.as_ref(),
Duration::from_secs(10),
),
))
.with_cell(TableCell::new(TableCellType::Data).with_raw(
get_throughput_from_samples(
active_stream.progress.as_ref(),
Duration::from_secs(60),
),
))
.with_cell(TableCell::new(TableCellType::Data).with_raw(
get_throughput_from_samples(
active_stream.progress.as_ref(),
Duration::from_secs(600),
),
)),
)
},
);

Container::new(ContainerType::Section)
.with_paragraph_attr("Connection Manager", [(
"style",
"font-size: 24px; font-weight: bold;",
)])
.with_paragraph(format!("Known latest version: {known_latest_version}."))
.with_paragraph_attr("Active Streams", [(
"style",
"font-size: 16px; font-weight: bold;",
)])
.with_table(active_streams_table)
}

fn render_cache_info() -> Container {
Container::new(ContainerType::Section).with_paragraph_attr("In Memory Cache", [(
"style",
"font-size: 24px; font-weight: bold;",
)])
}

fn render_file_store_info() -> Container {
Container::new(ContainerType::Section).with_paragraph_attr("File Store", [(
"style",
"font-size: 24px; font-weight: bold;",
)])
}
Loading

0 comments on commit 13b3532

Please sign in to comment.