peter/detailed metrics (#256)

* add detailed metrics for storage layer
0g-peterzhb authored Nov 13, 2024
1 parent d93f453 commit f4d5228
Showing 15 changed files with 182 additions and 45 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 3 additions & 0 deletions common/append_merkle/Cargo.toml
@@ -13,5 +13,8 @@ serde = { version = "1.0.137", features = ["derive"] }
lazy_static = "1.4.0"
tracing = "0.1.36"
once_cell = "1.19.0"

metrics = { workspace = true }

itertools = "0.13.0"
lru = "0.12.5"
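
Note: `metrics = { workspace = true }` means this crate inherits the `metrics` dependency pinned once at the workspace root instead of declaring its own version. A minimal sketch of the corresponding root-manifest entry (the path is an assumption; the root Cargo.toml is not part of this diff):

    # root Cargo.toml (sketch, assumed layout)
    [workspace.dependencies]
    metrics = { path = "common/metrics" }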
15 changes: 15 additions & 0 deletions common/append_merkle/src/lib.rs
@@ -1,4 +1,5 @@
mod merkle_tree;
mod metrics;
mod node_manager;
mod proof;
mod sha3;
@@ -10,6 +11,7 @@ use std::collections::{BTreeMap, HashMap};
use std::fmt::Debug;
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::Instant;
use tracing::{trace, warn};

use crate::merkle_tree::MerkleTreeWrite;
@@ -145,17 +147,21 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
}

pub fn append(&mut self, new_leaf: E) {
let start_time = Instant::now();
if new_leaf == E::null() {
// appending null is not allowed.
return;
}
self.node_manager.start_transaction();
self.node_manager.push_node(0, new_leaf);
self.recompute_after_append_leaves(self.leaves() - 1);

self.node_manager.commit();
metrics::APPEND.update_since(start_time);
}

pub fn append_list(&mut self, leaf_list: Vec<E>) {
let start_time = Instant::now();
if leaf_list.contains(&E::null()) {
// appending null is not allowed.
return;
@@ -165,6 +171,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
self.node_manager.append_nodes(0, &leaf_list);
self.recompute_after_append_leaves(start_index);
self.node_manager.commit();
metrics::APPEND_LIST.update_since(start_time);
}

/// Append a leaf list by providing their intermediate node hash.
@@ -173,6 +180,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
/// Other nodes in the subtree will be set to `null` nodes.
/// TODO: Optimize to avoid storing the `null` nodes?
pub fn append_subtree(&mut self, subtree_depth: usize, subtree_root: E) -> Result<()> {
let start_time = Instant::now();
if subtree_root == E::null() {
// appending null is not allowed.
bail!("subtree_root is null");
@@ -182,10 +190,13 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
self.append_subtree_inner(subtree_depth, subtree_root)?;
self.recompute_after_append_subtree(start_index, subtree_depth - 1);
self.node_manager.commit();
metrics::APPEND_SUBTREE.update_since(start_time);

Ok(())
}

pub fn append_subtree_list(&mut self, subtree_list: Vec<(usize, E)>) -> Result<()> {
let start_time = Instant::now();
if subtree_list.iter().any(|(_, root)| root == &E::null()) {
// appending null is not allowed.
bail!("subtree_list contains null");
@@ -197,12 +208,15 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
self.recompute_after_append_subtree(start_index, subtree_depth - 1);
}
self.node_manager.commit();
metrics::APPEND_SUBTREE_LIST.update_since(start_time);

Ok(())
}

/// Change the value of the last leaf and return the new merkle root.
/// This is needed if our merkle-tree in memory only keeps intermediate nodes instead of real leaves.
pub fn update_last(&mut self, updated_leaf: E) {
let start_time = Instant::now();
if updated_leaf == E::null() {
// updating to null is not allowed.
return;
@@ -216,6 +230,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
}
self.recompute_after_append_leaves(self.leaves() - 1);
self.node_manager.commit();
metrics::UPDATE_LAST.update_since(start_time);
}

/// Fill an unknown `null` leaf with its real value.
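
Note that `append` and `update_last` above return early on null input before reaching `update_since`, so rejected calls are never timed. A drop-based guard would cover every exit path; a minimal sketch, reusing the workspace `metrics` crate's `Timer` trait (the guard type itself is hypothetical, not part of this commit):

    use std::sync::Arc;
    use std::time::Instant;

    use metrics::Timer;

    // Hypothetical scope guard: records elapsed time when dropped,
    // so early returns are timed as well.
    struct TimerGuard {
        timer: Arc<dyn Timer>,
        start: Instant,
    }

    impl TimerGuard {
        fn new(timer: Arc<dyn Timer>) -> Self {
            Self { timer, start: Instant::now() }
        }
    }

    impl Drop for TimerGuard {
        fn drop(&mut self) {
            self.timer.update_since(self.start);
        }
    }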
11 changes: 11 additions & 0 deletions common/append_merkle/src/metrics.rs
@@ -0,0 +1,11 @@
use std::sync::Arc;

use metrics::{register_timer, Timer};

lazy_static::lazy_static! {
pub static ref APPEND: Arc<dyn Timer> = register_timer("append_merkle_append");
pub static ref APPEND_LIST: Arc<dyn Timer> = register_timer("append_merkle_append_list");
pub static ref APPEND_SUBTREE: Arc<dyn Timer> = register_timer("append_merkle_append_subtree");
pub static ref APPEND_SUBTREE_LIST: Arc<dyn Timer> = register_timer("append_merkle_append_subtree_list");
pub static ref UPDATE_LAST: Arc<dyn Timer> = register_timer("append_merkle_update_last");
}
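
The `lazy_static!` block above registers each timer once, on first access; all call sites then share the same `Arc<dyn Timer>`. The usage pattern, as seen throughout this commit (the surrounding function is illustrative):

    use std::time::Instant;

    fn instrumented_append() {
        let start_time = Instant::now();
        // ... perform the append work ...
        crate::metrics::APPEND.update_since(start_time);
    }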
15 changes: 7 additions & 8 deletions node/log_entry_sync/src/sync_manager/log_entry_fetcher.rs
@@ -1,5 +1,5 @@
 use crate::sync_manager::log_query::LogQuery;
-use crate::sync_manager::RETRY_WAIT_MS;
+use crate::sync_manager::{metrics, RETRY_WAIT_MS};
 use crate::{ContractAddress, LogSyncConfig};
 use anyhow::{anyhow, bail, Result};
 use append_merkle::{Algorithm, Sha3Algorithm};
@@ -13,15 +13,12 @@ use jsonrpsee::tracing::{debug, error, info, warn};
 use shared_types::{DataRoot, Transaction};
 use std::collections::{BTreeMap, HashMap};
 use std::sync::Arc;
-use std::time::Duration;
+use std::time::{Duration, Instant};
 use storage::log_store::{tx_store::BlockHashAndSubmissionIndex, Store};
 use task_executor::TaskExecutor;
-use tokio::{
-    sync::{
-        mpsc::{UnboundedReceiver, UnboundedSender},
-        RwLock,
-    },
-    time::Instant,
-};
+use tokio::sync::{
+    mpsc::{UnboundedReceiver, UnboundedSender},
+    RwLock,
+};
 
 pub struct LogEntryFetcher {
@@ -242,6 +239,7 @@ impl LogEntryFetcher {
);
let (mut block_hash_sent, mut block_number_sent) = (None, None);
while let Some(maybe_log) = stream.next().await {
let start_time = Instant::now();
match maybe_log {
Ok(log) => {
let sync_progress =
@@ -301,6 +299,7 @@
tokio::time::sleep(Duration::from_millis(RETRY_WAIT_MS)).await;
}
}
metrics::RECOVER_LOG.update_since(start_time);
}

info!("log recover end");
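
One detail of the import change above: `tokio::time::Instant` wraps `std::time::Instant` and mainly adds pause/auto-advance support for tests; plain duration measurement needs neither, so the std type suffices. The per-entry measurement added to the recovery loop follows the same pattern as the other timers (sketch):

    use std::time::Instant;

    let start_time = Instant::now();
    // ... handle one streamed log entry ...
    metrics::RECOVER_LOG.update_since(start_time);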
10 changes: 8 additions & 2 deletions node/log_entry_sync/src/sync_manager/metrics.rs
@@ -1,7 +1,13 @@
 use std::sync::Arc;
 
-use metrics::{register_timer, Timer};
+use metrics::{register_timer, Gauge, GaugeUsize, Timer};
 
 lazy_static::lazy_static! {
-    pub static ref STORE_PUT_TX: Arc<dyn Timer> = register_timer("log_entry_sync_store_put_tx");
+    pub static ref LOG_MANAGER_HANDLE_DATA_TRANSACTION: Arc<dyn Timer> = register_timer("log_manager_handle_data_transaction");
+
+    pub static ref STORE_PUT_TX: Arc<dyn Timer> = register_timer("log_entry_sync_manager_put_tx_inner");
+
+    pub static ref STORE_PUT_TX_SPEED_IN_BYTES: Arc<dyn Gauge<usize>> = GaugeUsize::register("log_entry_sync_manager_put_tx_speed_in_bytes");
+
+    pub static ref RECOVER_LOG: Arc<dyn Timer> = register_timer("log_entry_sync_manager_recover_log");
 }
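
Judging by the call sites in this commit, the two metric kinds differ in what they record: a `Timer` takes durations via `update_since`, while the `Gauge<usize>` stores a last-written value via `update`, used here for the most recent `put_tx` throughput. The two call shapes side by side (values illustrative):

    use std::time::Instant;

    let start_time = Instant::now();
    // ... store the transaction ...
    STORE_PUT_TX.update_since(start_time); // duration
    STORE_PUT_TX_SPEED_IN_BYTES.update(4194); // last-seen value, bytes per millisecond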
62 changes: 36 additions & 26 deletions node/log_entry_sync/src/sync_manager/mod.rs
@@ -26,6 +26,7 @@ const RETRY_WAIT_MS: u64 = 500;
// Each tx has less than 10KB, so the cache size should be acceptable.
const BROADCAST_CHANNEL_CAPACITY: usize = 25000;
const CATCH_UP_END_GAP: u64 = 10;
const CHECK_ROOT_INTERVAL: u64 = 500;

/// Errors while handle data
#[derive(Error, Debug)]
@@ -402,6 +403,7 @@ impl LogSyncManager {
}
LogFetchProgress::Transaction((tx, block_number)) => {
let mut stop = false;
let start_time = Instant::now();
match self.put_tx(tx.clone()).await {
Some(false) => stop = true,
Some(true) => {
@@ -435,6 +437,8 @@
// no receivers will be created.
warn!("log sync broadcast error, error={:?}", e);
}

metrics::LOG_MANAGER_HANDLE_DATA_TRANSACTION.update_since(start_time);
}
LogFetchProgress::Reverted(reverted) => {
self.process_reverted(reverted).await;
@@ -447,7 +451,6 @@ impl LogSyncManager {
     async fn put_tx_inner(&mut self, tx: Transaction) -> bool {
         let start_time = Instant::now();
         let result = self.store.put_tx(tx.clone());
-        metrics::STORE_PUT_TX.update_since(start_time);
 
         if let Err(e) = result {
             error!("put_tx error: e={:?}", e);
@@ -508,38 +511,45 @@ impl LogSyncManager {
 
         // Check if the computed data root matches on-chain state.
         // If the call fails, we won't check the root here and return `true` directly.
-        let flow_contract = self.log_fetcher.flow_contract();
-        match flow_contract
-            .get_flow_root_by_tx_seq(tx.seq.into())
-            .call()
-            .await
-        {
-            Ok(contract_root_bytes) => {
-                let contract_root = H256::from_slice(&contract_root_bytes);
-                // contract_root is zero for tx submitted before upgrading.
-                if !contract_root.is_zero() {
-                    match self.store.get_context() {
-                        Ok((local_root, _)) => {
-                            if contract_root != local_root {
-                                error!(
-                                    ?contract_root,
-                                    ?local_root,
-                                    "local flow root and on-chain flow root mismatch"
-                                );
-                                return false;
-                            }
-                        }
-                        Err(e) => {
-                            warn!(?e, "fail to read the local flow root");
-                        }
-                    }
-                }
-            }
-            Err(e) => {
-                warn!(?e, "fail to read the on-chain flow root");
-            }
-        }
+        if self.next_tx_seq % CHECK_ROOT_INTERVAL == 0 {
+            let flow_contract = self.log_fetcher.flow_contract();
+
+            match flow_contract
+                .get_flow_root_by_tx_seq(tx.seq.into())
+                .call()
+                .await
+            {
+                Ok(contract_root_bytes) => {
+                    let contract_root = H256::from_slice(&contract_root_bytes);
+                    // contract_root is zero for tx submitted before upgrading.
+                    if !contract_root.is_zero() {
+                        match self.store.get_context() {
+                            Ok((local_root, _)) => {
+                                if contract_root != local_root {
+                                    error!(
+                                        ?contract_root,
+                                        ?local_root,
+                                        "local flow root and on-chain flow root mismatch"
+                                    );
+                                    return false;
+                                }
+                            }
+                            Err(e) => {
+                                warn!(?e, "fail to read the local flow root");
+                            }
+                        }
+                    }
+                }
+                Err(e) => {
+                    warn!(?e, "fail to read the on-chain flow root");
+                }
+            }
+        }
+
+        metrics::STORE_PUT_TX_SPEED_IN_BYTES
+            .update((tx.size * 1000 / start_time.elapsed().as_micros() as u64) as usize);
+        metrics::STORE_PUT_TX.update_since(start_time);
 
         true
     }
 }
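
Two behavioral notes on this hunk. First, the flow-root consistency check now runs only when `next_tx_seq` is a multiple of `CHECK_ROOT_INTERVAL` (500), trading per-transaction verification for far fewer contract calls. Second, assuming `tx.size` is in bytes, the gauge value `tx.size * 1000 / elapsed_micros` is bytes per millisecond. A worked check of the arithmetic (sizes illustrative):

    let tx_size: u64 = 1_048_576; // a 1 MiB transaction
    let elapsed_micros: u64 = 250_000; // stored in 250 ms
    let speed = (tx_size * 1000 / elapsed_micros) as usize;
    assert_eq!(speed, 4194); // bytes per millisecond, roughly 4.2 MB/s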
3 changes: 2 additions & 1 deletion node/storage-async/Cargo.toml
@@ -10,4 +10,5 @@ storage = { path = "../storage" }
task_executor = { path = "../../common/task_executor" }
tokio = { version = "1.19.2", features = ["sync"] }
tracing = "0.1.35"
-eth2_ssz = "0.4.0"
+eth2_ssz = "0.4.0"
+backtrace = "0.3"
5 changes: 5 additions & 0 deletions node/storage-async/src/lib.rs
@@ -2,6 +2,7 @@
extern crate tracing;

use anyhow::bail;
use backtrace::Backtrace;
use shared_types::{
Chunk, ChunkArray, ChunkArrayWithProof, DataRoot, FlowProof, FlowRangeProof, Transaction,
};
@@ -139,13 +140,17 @@ impl Store {
{
let store = self.store.clone();
let (tx, rx) = oneshot::channel();
let mut backtrace = Backtrace::new();
let frames = backtrace.frames().to_vec();
backtrace = frames.into();

self.executor.spawn_blocking(
move || {
// FIXME(zz): Not all functions need `write`. Refactor store usage.
let res = f(&*store);

if tx.send(res).is_err() {
warn!("Backtrace: {:?}", backtrace);
error!("Unable to complete async storage operation: the receiver dropped");
}
},
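
The capture above uses `Backtrace::new()`, which resolves symbols eagerly on every call, even though the trace is only printed when the receiver has dropped. The same `backtrace` crate offers a cheaper two-step variant; a sketch with deferred resolution:

    use backtrace::Backtrace;

    // Capture raw frames cheaply; symbolication is deferred.
    let mut bt = Backtrace::new_unresolved();

    // Only on the failure path, resolve symbols before printing.
    bt.resolve();
    println!("Backtrace: {:?}", bt);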
2 changes: 2 additions & 0 deletions node/storage/Cargo.toml
@@ -31,6 +31,8 @@ parking_lot = "0.12.3"
serde_json = "1.0.127"
tokio = { version = "1.38.0", features = ["full"] }
task_executor = { path = "../../common/task_executor" }
lazy_static = "1.4.0"
metrics = { workspace = true }
once_cell = { version = "1.19.0", features = [] }

[dev-dependencies]
(The remaining file diffs were not loaded.)