feat: Batch vote metrics (BFT-486) #157

Merged · 7 commits · Jul 24, 2024
63 changes: 56 additions & 7 deletions node/actors/network/src/gossip/batch_votes.rs
@@ -4,6 +4,23 @@ use std::{collections::HashSet, fmt, sync::Arc};
use zksync_concurrency::sync;
use zksync_consensus_roles::attester;

use super::metrics;

#[derive(Debug, Default)]
pub(super) struct BatchUpdateStats {
num_added: usize,
weight_added: u64,
last_added: Option<attester::BatchNumber>,
}

impl BatchUpdateStats {
fn added(&mut self, number: attester::BatchNumber, weight: u64) {
self.num_added += 1;
self.weight_added += weight;
self.last_added = Some(number);
}
}

/// Represents the current state of the node's knowledge about the attester votes.
///
/// Eventually this data structure will have to track voting potentially happening
@@ -57,13 +74,14 @@ impl BatchVotes {
/// It exits as soon as an invalid entry is found.
/// `self` might get modified even if an error is returned
/// (all entries verified so far are added).
-    /// Returns true iff some new entry was added.
+    ///
+    /// Returns statistics about new entries added.
pub(super) fn update(
&mut self,
attesters: &attester::Committee,
data: &[Arc<attester::Signed<attester::Batch>>],
-    ) -> anyhow::Result<bool> {
-        let mut changed = false;
+    ) -> anyhow::Result<BatchUpdateStats> {
+        let mut stats = BatchUpdateStats::default();

let mut done = HashSet::new();
for d in data {
@@ -97,10 +115,10 @@ impl BatchVotes {
d.verify()?;

            self.add(d.clone(), weight);
-
-            changed = true;
+            stats.added(d.msg.number, weight);
        }
-        Ok(changed)
+
+        Ok(stats)
}

/// Check if we have achieved quorum for any of the batch hashes.
@@ -216,16 +234,42 @@ impl BatchVotesWatch {
) -> anyhow::Result<()> {
let this = self.0.lock().await;
let mut votes = this.borrow().clone();
-        if votes.update(attesters, data)? {
+        let stats = votes.update(attesters, data)?;

if let Some(last_added) = stats.last_added {
this.send_replace(votes);

#[allow(clippy::float_arithmetic)]
let weight_added = stats.weight_added as f64 / attesters.total_weight() as f64;

metrics::BATCH_VOTES_METRICS
.last_added_vote_batch_number
.set(last_added.0);

metrics::BATCH_VOTES_METRICS
.votes_added
.inc_by(stats.num_added as u64);

metrics::BATCH_VOTES_METRICS
.weight_added
.inc_by(weight_added);
}

metrics::BATCH_VOTES_METRICS
.committee_size
.set(attesters.len());

Ok(())
}

/// Set the minimum batch number on the votes and discard old data.
pub(crate) async fn set_min_batch_number(&self, min_batch_number: attester::BatchNumber) {
let this = self.0.lock().await;
this.send_modify(|votes| votes.set_min_batch_number(min_batch_number));

metrics::BATCH_VOTES_METRICS
.min_batch_number
.set(min_batch_number.0);
}
}

@@ -251,6 +295,11 @@ impl BatchVotesPublisher {
return Ok(());
}
let attestation = attester.sign_msg(batch);

metrics::BATCH_VOTES_METRICS
.last_signed_batch_number
.set(attestation.msg.number.0);

self.0.update(attesters, &[Arc::new(attestation)]).await
}
}
37 changes: 37 additions & 0 deletions node/actors/network/src/gossip/metrics.rs
@@ -0,0 +1,37 @@
/// Metrics related to the gossiping of L1 batch votes.
#[derive(Debug, vise::Metrics)]
#[metrics(prefix = "network_gossip_batch_votes")]
pub(crate) struct BatchVotesMetrics {
/// Number of members in the attester committee.
pub(crate) committee_size: vise::Gauge<usize>,

/// Number of votes added to the tally.
///
/// Its rate of change should correlate with the attester committee size,
/// save for any new joiner casting their historic votes in a burst.
pub(crate) votes_added: vise::Counter,

/// Weight of votes added to the tally normalized by the total committee weight.
///
/// Its rate of change should correlate with the attester committee weight and batch production rate,
/// that is, it should go up by 1.0 with each new batch if everyone attests.
pub(crate) weight_added: vise::Counter<f64>,

/// The minimum batch number we still expect votes for.
///
/// This should go up as the main node indicates the finalisation of batches,
/// or as soon as batch QCs are found and persisted.
pub(crate) min_batch_number: vise::Gauge<u64>,

/// Batch number in the last vote added to the register.
///
/// This should go up as L1 batches are created, save for any temporary
/// outlier from lagging attesters or ones sending votes far in the future.
pub(crate) last_added_vote_batch_number: vise::Gauge<u64>,

/// Batch number of the last batch signed by this attester.
pub(crate) last_signed_batch_number: vise::Gauge<u64>,

Review comment:
We can postpone this, but can we have a histogram that would show the distribution of vote delays after an L1 batch was produced?

Contributor Author:

Sounds interesting; I will create a ticket for this. There are a few challenges:

  • the timestamp of the batch isn't available here
  • currently we can receive votes earlier than we have the batch itself, so we would have to store a timestamp with each vote and then retrospectively emit negative latencies once the batch is produced on the local node

There are some possible workarounds:

  • For example, when the last_signed_batch_number above changes, it can act as an approximation of when the L1 batch was produced; but it's only an approximation, because it depends on the polling frequency, and it's only available on nodes which are attesters.
  • We could observe the time elapsed between the first and last vote. For attester nodes this would indicate how fast votes propagate over gossip compared to the vote produced locally, although again the local vote might come later than the first gossiped one.
  • We could implement Grzegorz's idea of only gossiping votes to nodes which have indicated they already have the batch, which would deal with the negative values; this also simplifies the metrics, in that we don't have to buffer timestamps and observe them the first time we learn about the batch.
  • We could observe the difference between l1_batches.created_at and l1_batches_consensus.created_at when we insert the QC, which would be the overall latency of gossiping votes.
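
A minimal sketch of the metric itself, reusing the vise histogram pattern from elsewhere in this PR; the vote_delay field is hypothetical, and producing a non-negative delay value is exactly the open question above:

    /// Hypothetical addition to `BatchVotesMetrics` (not part of this PR):
    /// time from local batch production to a vote being added to the tally.
    #[metrics(unit = vise::Unit::Seconds, buckets = vise::Buckets::LATENCIES)]
    pub(crate) vote_delay: vise::Histogram<std::time::Duration>,

    // Observation site, assuming a stored timestamp yields a non-negative
    // `delay: std::time::Duration` once both the batch and the vote are known:
    //
    //     metrics::BATCH_VOTES_METRICS.vote_delay.observe(delay);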


}

#[vise::register]
pub(super) static BATCH_VOTES_METRICS: vise::Global<BatchVotesMetrics> = vise::Global::new();
3 changes: 2 additions & 1 deletion node/actors/network/src/gossip/mod.rs
@@ -26,6 +26,7 @@ mod batch_votes;
mod fetch;
mod handshake;
pub mod loadtest;
mod metrics;
mod runner;
#[cfg(test)]
mod testonly;
@@ -171,7 +172,7 @@ impl Network {
self.batch_store
.persist_batch_qc(ctx, qc)
.await
.wrap("queue_batch_qc")?;
.wrap("persist_batch_qc")?;

self.batch_votes
.set_min_batch_number(next_batch_number)
50 changes: 50 additions & 0 deletions node/libs/storage/src/batch_store/metrics.rs
@@ -0,0 +1,50 @@
//! Storage metrics.
use std::time;

#[derive(Debug, vise::Metrics)]
#[metrics(prefix = "zksync_consensus_storage_persistent_batch_store")]
pub(super) struct PersistentBatchStore {
/// Latency of a successful `get_batch()` call.
#[metrics(unit = vise::Unit::Seconds, buckets = vise::Buckets::LATENCIES)]
pub(super) batch_latency: vise::Histogram<time::Duration>,
/// Latency of a successful `earliest_batch_number_to_sign()` call.
#[metrics(unit = vise::Unit::Seconds, buckets = vise::Buckets::LATENCIES)]
pub(super) earliest_batch_latency: vise::Histogram<time::Duration>,
/// Latency of a successful `get_batch_to_sign()` call.
#[metrics(unit = vise::Unit::Seconds, buckets = vise::Buckets::LATENCIES)]
pub(super) batch_to_sign_latency: vise::Histogram<time::Duration>,
}

#[vise::register]
pub(super) static PERSISTENT_BATCH_STORE: vise::Global<PersistentBatchStore> = vise::Global::new();

#[derive(Debug, vise::Metrics)]
#[metrics(prefix = "zksync_consensus_storage_batch_store")]
pub(super) struct BatchStore {
/// Overall latency of a `queue_batch()` call.
#[metrics(unit = vise::Unit::Seconds, buckets = vise::Buckets::LATENCIES)]
pub(super) queue_batch: vise::Histogram<time::Duration>,
/// Overall latency of a `persist_batch_qc()` call.
#[metrics(unit = vise::Unit::Seconds, buckets = vise::Buckets::LATENCIES)]
pub(super) persist_batch_qc: vise::Histogram<time::Duration>,
/// Overall latency of a `wait_until_queued()` call.
#[metrics(unit = vise::Unit::Seconds, buckets = vise::Buckets::LATENCIES)]
pub(super) wait_until_queued: vise::Histogram<time::Duration>,
/// Overall latency of a `wait_until_persisted()` call.
#[metrics(unit = vise::Unit::Seconds, buckets = vise::Buckets::LATENCIES)]
pub(super) wait_until_persisted: vise::Histogram<time::Duration>,
/// Last persisted batch QC.
pub(super) last_persisted_batch_qc: vise::Gauge<u64>,
}

#[vise::register]
pub(super) static BATCH_STORE: vise::Global<BatchStore> = vise::Global::new();

#[derive(Debug, vise::Metrics)]
#[metrics(prefix = "zksync_consensus_storage_batch_store")]
pub(super) struct BatchStoreState {
/// BatchNumber of the next batch to queue.
pub(super) next_queued_batch: vise::Gauge<u64>,
/// BatchNumber of the next batch to persist.
pub(super) next_persisted_batch: vise::Gauge<u64>,
}
@@ -4,6 +4,8 @@ use std::{collections::VecDeque, fmt, sync::Arc};
use zksync_concurrency::{ctx, error::Wrap as _, scope, sync};
use zksync_consensus_roles::{attester, validator};

mod metrics;

/// State of the `BatchStore`: continuous range of batches.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BatchStoreState {
@@ -181,6 +183,12 @@ pub struct BatchStoreRunner(Arc<BatchStore>);
impl BatchStoreRunner {
/// Runs the background tasks of the BatchStore.
pub async fn run(self, ctx: &ctx::Ctx) -> anyhow::Result<()> {
#[vise::register]
static COLLECTOR: vise::Collector<Option<metrics::BatchStoreState>> =
vise::Collector::new();
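        // Hold only a weak reference so the collector does not keep the
        // `BatchStore` alive; after the store is dropped, `upgrade()` returns
        // `None` and the scrape callback yields nothing.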
let store_ref = Arc::downgrade(&self.0);
let _ = COLLECTOR.before_scrape(move || Some(store_ref.upgrade()?.scrape_metrics()));

let res = scope::run!(ctx, |ctx, s| async {
let persisted = self.0.persistent.persisted();
let mut queue_next = persisted.borrow().next();
@@ -259,10 +267,16 @@ impl BatchStore {
return Ok(Some(batch));
}
}
-        self.persistent
+        let t = metrics::PERSISTENT_BATCH_STORE.batch_latency.start();
+
+        let batch = self
+            .persistent
             .get_batch(ctx, number)
             .await
-            .wrap("persistent.batch()")
+            .wrap("persistent.get_batch()")?;
+
+        t.observe();
+        Ok(batch)
}

/// Retrieve the minimum batch number that doesn't have a QC yet and potentially needs to be signed.
@@ -278,10 +292,18 @@
&self,
ctx: &ctx::Ctx,
) -> ctx::Result<Option<attester::BatchNumber>> {
-        self.persistent
+        let t = metrics::PERSISTENT_BATCH_STORE
+            .earliest_batch_latency
+            .start();
+
+        let batch = self
+            .persistent
             .earliest_batch_number_to_sign(ctx)
             .await
-            .wrap("persistent.get_batch_to_sign()")
+            .wrap("persistent.get_batch_to_sign()")?;
+
+        t.observe();
+        Ok(batch)
}

/// Retrieve a batch to be signed.
@@ -293,10 +315,18 @@
ctx: &ctx::Ctx,
number: attester::BatchNumber,
) -> ctx::Result<Option<attester::Batch>> {
-        self.persistent
+        let t = metrics::PERSISTENT_BATCH_STORE
+            .batch_to_sign_latency
+            .start();
+
+        let batch = self
+            .persistent
             .get_batch_to_sign(ctx, number)
             .await
-            .wrap("persistent.get_batch_to_sign()")
+            .wrap("persistent.get_batch_to_sign()")?;
+
+        t.observe();
+        Ok(batch)
}

/// Append batch to a queue to be persisted eventually.
@@ -311,6 +341,8 @@
batch: attester::SyncBatch,
_genesis: validator::Genesis,
) -> ctx::Result<()> {
let t = metrics::BATCH_STORE.queue_batch.start();

// XXX: Once we can validate `SyncBatch::proof` we should do it before adding the
// batch to the cache, otherwise a malicious peer could serve us data that prevents
// other inputs from entering the queue. At the moment it will also cause the batch to be gossiped.
@@ -322,19 +354,28 @@
self.inner
.send_if_modified(|inner| inner.try_push(batch.clone()));

t.observe();
Ok(())
}

/// Wait until the database has a batch, then attach the corresponding QC.
pub async fn persist_batch_qc(&self, ctx: &ctx::Ctx, qc: attester::BatchQC) -> ctx::Result<()> {
let t = metrics::BATCH_STORE.persist_batch_qc.start();
// The `store_qc` implementation in `zksync-era` retries the insertion of the QC if the payload
// isn't yet available, but to be safe we can wait for the availability signal here as well.
sync::wait_for(ctx, &mut self.persistent.persisted(), |persisted| {
qc.message.number < persisted.next()
})
.await?;
// Now it's definitely safe to store it.
-        self.persistent.store_qc(ctx, qc).await
+        metrics::BATCH_STORE
+            .last_persisted_batch_qc
+            .set(qc.message.number.0);
+
+        self.persistent.store_qc(ctx, qc).await?;
+
+        t.observe();
+        Ok(())
}

/// Waits until the given batch is queued (in memory, or persisted).
@@ -344,12 +385,17 @@
ctx: &ctx::Ctx,
number: attester::BatchNumber,
) -> ctx::OrCanceled<BatchStoreState> {
-        Ok(sync::wait_for(ctx, &mut self.inner.subscribe(), |inner| {
+        let t = metrics::BATCH_STORE.wait_until_queued.start();
+
+        let state = sync::wait_for(ctx, &mut self.inner.subscribe(), |inner| {
             number < inner.queued.next()
         })
         .await?
         .queued
-        .clone())
+        .clone();
+
+        t.observe();
+        Ok(state)
}

/// Waits until the given batch is stored persistently.
@@ -359,12 +405,23 @@
ctx: &ctx::Ctx,
number: attester::BatchNumber,
) -> ctx::OrCanceled<BatchStoreState> {
-        Ok(
-            sync::wait_for(ctx, &mut self.persistent.persisted(), |persisted| {
-                number < persisted.next()
-            })
-            .await?
-            .clone(),
-        )
+        let t = metrics::BATCH_STORE.wait_until_persisted.start();
+
+        let state = sync::wait_for(ctx, &mut self.persistent.persisted(), |persisted| {
+            number < persisted.next()
+        })
+        .await?
+        .clone();
+
+        t.observe();
+        Ok(state)
}

fn scrape_metrics(&self) -> metrics::BatchStoreState {
let m = metrics::BatchStoreState::default();
let inner = self.inner.borrow();
m.next_queued_batch.set(inner.queued.next().0);
m.next_persisted_batch.set(inner.persisted.next().0);
m
}
}
2 changes: 1 addition & 1 deletion node/libs/storage/src/block_store/metrics.rs
@@ -23,7 +23,7 @@ pub(super) static PERSISTENT_BLOCK_STORE: vise::Global<PersistentBlockStore> = v

#[derive(Debug, vise::Metrics)]
#[metrics(prefix = "zksync_consensus_storage_block_store")]
-pub(super) struct BlockStore {
+pub(super) struct BlockStoreState {
/// BlockNumber of the next block to queue.
pub(super) next_queued_block: vise::Gauge<u64>,
/// BlockNumber of the next block to persist.