diff --git a/node/actors/network/src/gossip/batch_votes.rs b/node/actors/network/src/gossip/batch_votes.rs
index ecff00ef..a80f0e03 100644
--- a/node/actors/network/src/gossip/batch_votes.rs
+++ b/node/actors/network/src/gossip/batch_votes.rs
@@ -4,6 +4,23 @@
 use std::{collections::HashSet, fmt, sync::Arc};
 use zksync_concurrency::sync;
 use zksync_consensus_roles::attester;
+use super::metrics;
+
+#[derive(Debug, Default)]
+pub(super) struct BatchUpdateStats {
+    num_added: usize,
+    weight_added: u64,
+    last_added: Option<attester::BatchNumber>,
+}
+
+impl BatchUpdateStats {
+    fn added(&mut self, number: attester::BatchNumber, weight: u64) {
+        self.num_added += 1;
+        self.weight_added += weight;
+        self.last_added = Some(number);
+    }
+}
+
 /// Represents the current state of node's knowledge about the attester votes.
 ///
 /// Eventually this data structure will have to track voting potentially happening
@@ -57,13 +74,14 @@ impl BatchVotes {
     /// It exits as soon as an invalid entry is found.
     /// `self` might get modified even if an error is returned
     /// (all entries verified so far are added).
-    /// Returns true iff some new entry was added.
+    ///
+    /// Returns statistics about new entries added.
     pub(super) fn update(
         &mut self,
         attesters: &attester::Committee,
         data: &[Arc<attester::Signed<attester::Batch>>],
-    ) -> anyhow::Result<bool> {
-        let mut changed = false;
+    ) -> anyhow::Result<BatchUpdateStats> {
+        let mut stats = BatchUpdateStats::default();
         let mut done = HashSet::new();
 
         for d in data {
@@ -97,10 +115,10 @@
             d.verify()?;
 
             self.add(d.clone(), weight);
-
-            changed = true;
+            stats.added(d.msg.number, weight);
         }
-        Ok(changed)
+
+        Ok(stats)
     }
 
     /// Check if we have achieved quorum for any of the batch hashes.
@@ -216,9 +234,31 @@
     ) -> anyhow::Result<()> {
         let this = self.0.lock().await;
         let mut votes = this.borrow().clone();
-        if votes.update(attesters, data)? {
+        let stats = votes.update(attesters, data)?;
+
+        if let Some(last_added) = stats.last_added {
             this.send_replace(votes);
+
+            #[allow(clippy::float_arithmetic)]
+            let weight_added = stats.weight_added as f64 / attesters.total_weight() as f64;
+
+            metrics::BATCH_VOTES_METRICS
+                .last_added_vote_batch_number
+                .set(last_added.0);
+
+            metrics::BATCH_VOTES_METRICS
+                .votes_added
+                .inc_by(stats.num_added as u64);
+
+            metrics::BATCH_VOTES_METRICS
+                .weight_added
+                .inc_by(weight_added);
         }
+
+        metrics::BATCH_VOTES_METRICS
+            .committee_size
+            .set(attesters.len());
+
         Ok(())
     }
 
@@ -226,6 +266,10 @@
     pub(crate) async fn set_min_batch_number(&self, min_batch_number: attester::BatchNumber) {
         let this = self.0.lock().await;
         this.send_modify(|votes| votes.set_min_batch_number(min_batch_number));
+
+        metrics::BATCH_VOTES_METRICS
+            .min_batch_number
+            .set(min_batch_number.0);
     }
 }
 
@@ -251,6 +295,11 @@ impl BatchVotesPublisher {
             return Ok(());
         }
         let attestation = attester.sign_msg(batch);
+
+        metrics::BATCH_VOTES_METRICS
+            .last_signed_batch_number
+            .set(attestation.msg.number.0);
+
         self.0.update(attesters, &[Arc::new(attestation)]).await
     }
 }
diff --git a/node/actors/network/src/gossip/metrics.rs b/node/actors/network/src/gossip/metrics.rs
new file mode 100644
index 00000000..d9df9d0a
--- /dev/null
+++ b/node/actors/network/src/gossip/metrics.rs
@@ -0,0 +1,37 @@
+/// Metrics related to the gossiping of L1 batch votes.
+#[derive(Debug, vise::Metrics)]
+#[metrics(prefix = "network_gossip_batch_votes")]
+pub(crate) struct BatchVotesMetrics {
+    /// Number of members in the attester committee.
+    pub(crate) committee_size: vise::Gauge<usize>,
+
+    /// Number of votes added to the tally.
+    ///
+    /// Its rate of change should correlate with the attester committee size,
+    /// save for any new joiner casting their historic votes in a burst.
+    pub(crate) votes_added: vise::Counter,
+
+    /// Weight of votes added to the tally normalized by the total committee weight.
+    ///
+    /// Its rate of change should correlate with the attester committee weight and batch production rate,
+    /// that is, it should go up by 1.0 with each new batch if everyone attests.
+    pub(crate) weight_added: vise::Counter<f64>,
+
+    /// The minimum batch number we still expect votes for.
+    ///
+    /// This should go up as the main node indicates the finalisation of batches,
+    /// or as soon as batch QCs are found and persisted.
+    pub(crate) min_batch_number: vise::Gauge<u64>,
+
+    /// Batch number in the last vote added to the register.
+    ///
+    /// This should go up as L1 batches are created, save for any temporary
+    /// outlier from lagging attesters or ones sending votes far in the future.
+    pub(crate) last_added_vote_batch_number: vise::Gauge<u64>,
+
+    /// Batch number of the last batch signed by this attester.
+    pub(crate) last_signed_batch_number: vise::Gauge<u64>,
+}
+
+#[vise::register]
+pub(super) static BATCH_VOTES_METRICS: vise::Global<BatchVotesMetrics> = vise::Global::new();
diff --git a/node/actors/network/src/gossip/mod.rs b/node/actors/network/src/gossip/mod.rs
index 9010f99e..d3af5f4a 100644
--- a/node/actors/network/src/gossip/mod.rs
+++ b/node/actors/network/src/gossip/mod.rs
@@ -26,6 +26,7 @@ mod batch_votes;
 mod fetch;
 mod handshake;
 pub mod loadtest;
+mod metrics;
 mod runner;
 #[cfg(test)]
 mod testonly;
@@ -171,7 +172,7 @@ impl Network {
         self.batch_store
             .persist_batch_qc(ctx, qc)
             .await
-            .wrap("queue_batch_qc")?;
+            .wrap("persist_batch_qc")?;
 
         self.batch_votes
             .set_min_batch_number(next_batch_number)
diff --git a/node/libs/storage/src/batch_store/metrics.rs b/node/libs/storage/src/batch_store/metrics.rs
new file mode 100644
index 00000000..cabe7e34
--- /dev/null
+++ b/node/libs/storage/src/batch_store/metrics.rs
@@ -0,0 +1,50 @@
+//! Storage metrics.
+use std::time;
+
+#[derive(Debug, vise::Metrics)]
+#[metrics(prefix = "zksync_consensus_storage_persistent_batch_store")]
+pub(super) struct PersistentBatchStore {
+    /// Latency of a successful `get_batch()` call.
+    #[metrics(unit = vise::Unit::Seconds, buckets = vise::Buckets::LATENCIES)]
+    pub(super) batch_latency: vise::Histogram<time::Duration>,
+    /// Latency of a successful `earliest_batch_number_to_sign()` call.
+    #[metrics(unit = vise::Unit::Seconds, buckets = vise::Buckets::LATENCIES)]
+    pub(super) earliest_batch_latency: vise::Histogram<time::Duration>,
+    /// Latency of a successful `get_batch_to_sign()` call.
+    #[metrics(unit = vise::Unit::Seconds, buckets = vise::Buckets::LATENCIES)]
+    pub(super) batch_to_sign_latency: vise::Histogram<time::Duration>,
+}
+
+#[vise::register]
+pub(super) static PERSISTENT_BATCH_STORE: vise::Global<PersistentBatchStore> = vise::Global::new();
+
+#[derive(Debug, vise::Metrics)]
+#[metrics(prefix = "zksync_consensus_storage_batch_store")]
+pub(super) struct BatchStore {
+    /// Overall latency of a `queue_batch()` call.
+    #[metrics(unit = vise::Unit::Seconds, buckets = vise::Buckets::LATENCIES)]
+    pub(super) queue_batch: vise::Histogram<time::Duration>,
+    /// Overall latency of a `persist_batch_qc()` call.
+    #[metrics(unit = vise::Unit::Seconds, buckets = vise::Buckets::LATENCIES)]
+    pub(super) persist_batch_qc: vise::Histogram<time::Duration>,
+    /// Overall latency of a `wait_until_queued()` call.
+    #[metrics(unit = vise::Unit::Seconds, buckets = vise::Buckets::LATENCIES)]
+    pub(super) wait_until_queued: vise::Histogram<time::Duration>,
+    /// Overall latency of a `wait_until_persisted()` call.
+    #[metrics(unit = vise::Unit::Seconds, buckets = vise::Buckets::LATENCIES)]
+    pub(super) wait_until_persisted: vise::Histogram<time::Duration>,
+    /// Last persisted batch QC.
+    pub(super) last_persisted_batch_qc: vise::Gauge<u64>,
+}
+
+#[vise::register]
+pub(super) static BATCH_STORE: vise::Global<BatchStore> = vise::Global::new();
+
+#[derive(Debug, vise::Metrics)]
+#[metrics(prefix = "zksync_consensus_storage_batch_store")]
+pub(super) struct BatchStoreState {
+    /// BatchNumber of the next batch to queue.
+    pub(super) next_queued_batch: vise::Gauge<u64>,
+    /// BatchNumber of the next batch to persist.
+    pub(super) next_persisted_batch: vise::Gauge<u64>,
+}
diff --git a/node/libs/storage/src/batch_store.rs b/node/libs/storage/src/batch_store/mod.rs
similarity index 87%
rename from node/libs/storage/src/batch_store.rs
rename to node/libs/storage/src/batch_store/mod.rs
index 29b64e3a..3b6b6fe9 100644
--- a/node/libs/storage/src/batch_store.rs
+++ b/node/libs/storage/src/batch_store/mod.rs
@@ -4,6 +4,8 @@
 use std::{collections::VecDeque, fmt, sync::Arc};
 use zksync_concurrency::{ctx, error::Wrap as _, scope, sync};
 use zksync_consensus_roles::{attester, validator};
+mod metrics;
+
 /// State of the `BatchStore`: continuous range of batches.
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub struct BatchStoreState {
@@ -181,6 +183,12 @@ pub struct BatchStoreRunner(Arc<BatchStore>);
 impl BatchStoreRunner {
     /// Runs the background tasks of the BatchStore.
     pub async fn run(self, ctx: &ctx::Ctx) -> anyhow::Result<()> {
+        #[vise::register]
+        static COLLECTOR: vise::Collector<Option<metrics::BatchStoreState>> =
+            vise::Collector::new();
+        let store_ref = Arc::downgrade(&self.0);
+        let _ = COLLECTOR.before_scrape(move || Some(store_ref.upgrade()?.scrape_metrics()));
+
         let res = scope::run!(ctx, |ctx, s| async {
             let persisted = self.0.persistent.persisted();
             let mut queue_next = persisted.borrow().next();
@@ -259,10 +267,16 @@
                 return Ok(Some(batch));
             }
         }
-        self.persistent
+        let t = metrics::PERSISTENT_BATCH_STORE.batch_latency.start();
+
+        let batch = self
+            .persistent
             .get_batch(ctx, number)
             .await
-            .wrap("persistent.batch()")
+            .wrap("persistent.get_batch()")?;
+
+        t.observe();
+        Ok(batch)
     }
 
     /// Retrieve the minimum batch number that doesn't have a QC yet and potentially need to be signed.
@@ -278,10 +292,18 @@
         &self,
         ctx: &ctx::Ctx,
     ) -> ctx::Result<Option<attester::BatchNumber>> {
-        self.persistent
+        let t = metrics::PERSISTENT_BATCH_STORE
+            .earliest_batch_latency
+            .start();
+
+        let batch = self
+            .persistent
             .earliest_batch_number_to_sign(ctx)
             .await
-            .wrap("persistent.get_batch_to_sign()")
+            .wrap("persistent.get_batch_to_sign()")?;
+
+        t.observe();
+        Ok(batch)
     }
 
     /// Retrieve a batch to be signed.
@@ -293,10 +315,18 @@
         ctx: &ctx::Ctx,
         number: attester::BatchNumber,
     ) -> ctx::Result<Option<attester::Batch>> {
-        self.persistent
+        let t = metrics::PERSISTENT_BATCH_STORE
+            .batch_to_sign_latency
+            .start();
+
+        let batch = self
+            .persistent
             .get_batch_to_sign(ctx, number)
             .await
-            .wrap("persistent.get_batch_to_sign()")
+            .wrap("persistent.get_batch_to_sign()")?;
+
+        t.observe();
+        Ok(batch)
     }
 
     /// Append batch to a queue to be persisted eventually.
@@ -311,6 +341,8 @@ impl BatchStore {
         batch: attester::SyncBatch,
         _genesis: validator::Genesis,
     ) -> ctx::Result<()> {
+        let t = metrics::BATCH_STORE.queue_batch.start();
+
         // XXX: Once we can validate `SyncBatch::proof` we should do it before adding the
         // batch to the cache, otherwise a malicious peer could serve us data that prevents
         // other inputs from entering the queue. It will also cause it to be gossiped at the moment.
@@ -322,11 +354,13 @@
         self.inner
             .send_if_modified(|inner| inner.try_push(batch.clone()));
 
+        t.observe();
         Ok(())
     }
 
     /// Wait until the database has a batch, then attach the corresponding QC.
     pub async fn persist_batch_qc(&self, ctx: &ctx::Ctx, qc: attester::BatchQC) -> ctx::Result<()> {
+        let t = metrics::BATCH_STORE.persist_batch_qc.start();
         // The `store_qc` implementation in `zksync-era` retries the insertion of the QC if the payload
         // isn't yet available, but to be safe we can wait for the availability signal here as well.
         sync::wait_for(ctx, &mut self.persistent.persisted(), |persisted| {
@@ -334,7 +368,14 @@
         })
         .await?;
         // Now it's definitely safe to store it.
-        self.persistent.store_qc(ctx, qc).await
+        metrics::BATCH_STORE
+            .last_persisted_batch_qc
+            .set(qc.message.number.0);
+
+        self.persistent.store_qc(ctx, qc).await?;
+
+        t.observe();
+        Ok(())
     }
 
     /// Waits until the given batch is queued (in memory, or persisted).
@@ -344,12 +385,17 @@
         ctx: &ctx::Ctx,
         number: attester::BatchNumber,
     ) -> ctx::OrCanceled<BatchStoreState> {
-        Ok(sync::wait_for(ctx, &mut self.inner.subscribe(), |inner| {
+        let t = metrics::BATCH_STORE.wait_until_queued.start();
+
+        let state = sync::wait_for(ctx, &mut self.inner.subscribe(), |inner| {
             number < inner.queued.next()
         })
        .await?
        .queued
-        .clone())
+        .clone();
+
+        t.observe();
+        Ok(state)
     }
 
     /// Waits until the given batch is stored persistently.
@@ -359,12 +405,23 @@
         ctx: &ctx::Ctx,
         number: attester::BatchNumber,
     ) -> ctx::OrCanceled<BatchStoreState> {
-        Ok(
-            sync::wait_for(ctx, &mut self.persistent.persisted(), |persisted| {
-                number < persisted.next()
-            })
-            .await?
-            .clone(),
-        )
+        let t = metrics::BATCH_STORE.wait_until_persisted.start();
+
+        let state = sync::wait_for(ctx, &mut self.persistent.persisted(), |persisted| {
+            number < persisted.next()
+        })
+        .await?
+        .clone();
+
+        t.observe();
+        Ok(state)
+    }
+
+    fn scrape_metrics(&self) -> metrics::BatchStoreState {
+        let m = metrics::BatchStoreState::default();
+        let inner = self.inner.borrow();
+        m.next_queued_batch.set(inner.queued.next().0);
+        m.next_persisted_batch.set(inner.persisted.next().0);
+        m
     }
 }
diff --git a/node/libs/storage/src/block_store/metrics.rs b/node/libs/storage/src/block_store/metrics.rs
index 34954f69..04f87bd0 100644
--- a/node/libs/storage/src/block_store/metrics.rs
+++ b/node/libs/storage/src/block_store/metrics.rs
@@ -23,7 +23,7 @@ pub(super) static PERSISTENT_BLOCK_STORE: vise::Global<PersistentBlockStore> = v
 
 #[derive(Debug, vise::Metrics)]
 #[metrics(prefix = "zksync_consensus_storage_block_store")]
-pub(super) struct BlockStore {
+pub(super) struct BlockStoreState {
     /// BlockNumber of the next block to queue.
     pub(super) next_queued_block: vise::Gauge<u64>,
     /// BlockNumber of the next block to persist.
diff --git a/node/libs/storage/src/block_store/mod.rs b/node/libs/storage/src/block_store/mod.rs
index b29a1036..a901c0fe 100644
--- a/node/libs/storage/src/block_store/mod.rs
+++ b/node/libs/storage/src/block_store/mod.rs
@@ -170,7 +170,8 @@ impl BlockStoreRunner {
     /// Runs the background tasks of the BlockStore.
     pub async fn run(self, ctx: &ctx::Ctx) -> anyhow::Result<()> {
         #[vise::register]
-        static COLLECTOR: vise::Collector<Option<metrics::BlockStore>> = vise::Collector::new();
+        static COLLECTOR: vise::Collector<Option<metrics::BlockStoreState>> =
+            vise::Collector::new();
         let store_ref = Arc::downgrade(&self.0);
         let _ = COLLECTOR.before_scrape(move || Some(store_ref.upgrade()?.scrape_metrics()));
 
@@ -337,8 +338,8 @@ impl BlockStore {
         )
     }
 
-    fn scrape_metrics(&self) -> metrics::BlockStore {
-        let m = metrics::BlockStore::default();
+    fn scrape_metrics(&self) -> metrics::BlockStoreState {
+        let m = metrics::BlockStoreState::default();
         let inner = self.inner.borrow();
         m.next_queued_block.set(inner.queued.next().0);
         m.next_persisted_block.set(inner.persisted.next().0);
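
For reference, both `BatchStoreRunner::run` and `BlockStoreRunner::run` now register a `vise::Collector` that rebuilds a gauge snapshot lazily at scrape time. A minimal, self-contained sketch of that pattern, assuming the same `vise` API used in the patch; the `ExampleStore`/`ExampleSnapshot` names are illustrative and not part of the change:

```rust
use std::sync::{Arc, Weak};

/// Stand-in for `metrics::BatchStoreState`: a struct of gauges that is
/// rebuilt from scratch on every Prometheus scrape.
#[derive(Debug, vise::Metrics)]
#[metrics(prefix = "example_store")]
pub struct ExampleSnapshot {
    /// Number of the next item to queue.
    pub next_queued: vise::Gauge<u64>,
}

/// Stand-in for the store whose state we want to expose.
pub struct ExampleStore {
    next_queued: u64,
}

impl ExampleStore {
    /// Builds a fresh snapshot; the collector calls this lazily at scrape time.
    fn scrape_metrics(&self) -> ExampleSnapshot {
        let m = ExampleSnapshot::default();
        m.next_queued.set(self.next_queued);
        m
    }
}

/// Mirrors the wiring inside `run()`: register the collector once and hand it
/// a `Weak` reference, so a scrape after the store is dropped simply yields `None`.
pub fn register_collector(store: &Arc<ExampleStore>) {
    #[vise::register]
    static COLLECTOR: vise::Collector<Option<ExampleSnapshot>> = vise::Collector::new();
    let store_ref: Weak<ExampleStore> = Arc::downgrade(store);
    let _ = COLLECTOR.before_scrape(move || Some(store_ref.upgrade()?.scrape_metrics()));
}
```

Holding only a `Weak` reference keeps the registered collector from extending the store's lifetime, which is why the runner registers it inside `run()` rather than at construction.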