From ec9a9373a4826ff5ad5c199d2e7ce801c69f81c5 Mon Sep 17 00:00:00 2001
From: Akosh Farkash
Date: Fri, 26 Jul 2024 17:07:44 +0100
Subject: [PATCH 1/9] BFT-496: Return at most one quorum for now.

---
 node/actors/network/src/gossip/batch_votes.rs | 21 ++++++++++--------
 node/actors/network/src/gossip/mod.rs         |  9 +++-----
 node/actors/network/src/gossip/tests/mod.rs   | 22 +++++++------------
 3 files changed, 23 insertions(+), 29 deletions(-)

diff --git a/node/actors/network/src/gossip/batch_votes.rs b/node/actors/network/src/gossip/batch_votes.rs
index 1f5ad613..7a9792a2 100644
--- a/node/actors/network/src/gossip/batch_votes.rs
+++ b/node/actors/network/src/gossip/batch_votes.rs
@@ -38,7 +38,7 @@ impl BatchUpdateStats {
 pub(crate) struct BatchVotes {
     /// The latest vote received from each attester. We only keep the last one
     /// for now, hoping that with 1 minute batches there's plenty of time for
-    /// the quorum to be reached, but eventually we'll have to allow multiple
+    /// the quorum to be reached, but eventually we might have to allow multiple
     /// votes across different heights.
     pub(crate) votes: im::HashMap<attester::PublicKey, Arc<attester::Signed<attester::Batch>>>,

@@ -51,6 +51,10 @@ pub(crate) struct BatchVotes {
         im::OrdMap<attester::BatchNumber, im::HashMap<attester::BatchHash, attester::Signers>>,

     /// The minimum batch number for which we are still interested in votes.
+    ///
+    /// Because we only store 1 vote per attester the memory is very much bounded,
+    /// but this extra pruning mechanism can be used to get rid of votes of attesters
+    /// who rotated out of the committee (which currently requires a re-genesis, but still).
     pub(crate) min_batch_number: attester::BatchNumber,
 }

@@ -131,20 +135,19 @@ impl BatchVotes {
     /// Check if we have achieved quorum for any of the batch hashes.
     ///
-    /// The return value is a vector because eventually we will be potentially waiting for
-    /// quorums on multiple heights simultaneously.
+    /// Returns the first quorum it finds, after which we expect that the state of the main node or L1
+    /// will indicate that attestation on the next height can happen, which will either naturally move
+    /// the QC, or we can do so by increasing the `min_batch_number`.
     ///
-    /// For repeated queries we can supply a skip list of heights for which we already saved the QC.
-    pub(super) fn find_quorums(
+    /// While we only store 1 vote per attester we'll only ever have at most one quorum anyway.
+    pub(super) fn find_quorum(
         &self,
         attesters: &attester::Committee,
         genesis: &attester::GenesisHash,
-        skip: impl Fn(attester::BatchNumber) -> bool,
-    ) -> Vec<attester::BatchQC> {
+    ) -> Option<attester::BatchQC> {
         let threshold = attesters.threshold();
         self.support
             .iter()
-            .filter(|(number, _)| !skip(**number))
             .flat_map(|(number, candidates)| {
                 candidates
                     .iter()
@@ -170,7 +173,7 @@ impl BatchVotes {
                     }
                 })
             })
-            .collect()
+            .next()
     }

     /// Set the minimum batch number for which we admit votes.

diff --git a/node/actors/network/src/gossip/mod.rs b/node/actors/network/src/gossip/mod.rs
index f5afd92b..8b0bef57 100644
--- a/node/actors/network/src/gossip/mod.rs
+++ b/node/actors/network/src/gossip/mod.rs
@@ -157,15 +157,12 @@ impl Network {
         let genesis = self.genesis().hash();
         let mut sub = self.batch_votes.subscribe();
         loop {
-            // In the future when we might be gossiping about multiple batches at the same time,
-            // we can collect the ones we submitted into a skip list until we see them confirmed
-            // on L1 and we can finally increase the minimum as well.
-            let quorums = {
+            let quorum_opt = {
                 let votes = sync::changed(ctx, &mut sub).await?;
-                votes.find_quorums(attesters, &genesis, |_| false)
+                votes.find_quorum(attesters, &genesis)
             };

-            for qc in quorums {
+            if let Some(qc) = quorum_opt {
                 // In the future this should come from confirmations, but for now it's best effort, so we can forget ASAP.
                 // TODO: An initial value could be looked up in the database even now.
                 let next_batch_number = qc.message.number.next();

diff --git a/node/actors/network/src/gossip/tests/mod.rs b/node/actors/network/src/gossip/tests/mod.rs
index 6811071f..17bc6c19 100644
--- a/node/actors/network/src/gossip/tests/mod.rs
+++ b/node/actors/network/src/gossip/tests/mod.rs
@@ -673,27 +673,21 @@ fn test_batch_votes_quorum() {

             // Check that as soon as we have quorum it's found.
             if batches[b].1 >= attesters.threshold() {
-                let qs = votes.find_quorums(&attesters, &genesis, |_| false);
-                assert!(!qs.is_empty(), "should find quorum");
-                assert!(qs[0].message == *batch);
-                assert!(qs[0].signatures.keys().count() > 0);
+                let qc = votes
+                    .find_quorum(&attesters, &genesis)
+                    .expect("should find quorum");
+                assert!(qc.message == *batch);
+                assert!(qc.signatures.keys().count() > 0);
             }
         }

-        if let Some(quorum) = batches
+        if batches
             .iter()
             .find(|b| b.1 >= attesters.threshold())
-            .map(|(b, _)| b)
+            .is_none()
         {
-            // Check that a quorum can be skipped
-            assert!(votes
-                .find_quorums(&attesters, &genesis, |b| b == quorum.number)
-                .is_empty());
-        } else {
             // Check that if there was no quorum then we don't find any.
-            assert!(votes
-                .find_quorums(&attesters, &genesis, |_| false)
-                .is_empty());
+            assert!(votes.find_quorum(&attesters, &genesis).is_none());
         }

         // Check that the minimum batch number prunes data.
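The quorum search in this patch is weight-based rather than count-based: a (batch number, hash) pair has a quorum once the combined weight of its distinct signers reaches `attesters.threshold()`, and since each attester contributes at most one vote, at most one hash can cross the threshold. A minimal, self-contained sketch of that invariant, using hypothetical stand-in types rather than the crate's actual API:

    use std::collections::HashMap;

    // Illustrative stand-ins for the attester types; not the crate's API.
    type AttesterId = u64;
    type BatchHash = [u8; 32];

    /// Accumulate the weight of distinct signers per batch hash and return the
    /// first hash whose support reaches the threshold. With one vote per
    /// attester, total weight can only cross the threshold for a single hash.
    fn find_quorum(
        votes: &HashMap<AttesterId, (BatchHash, u64 /* weight */)>,
        threshold: u64,
    ) -> Option<BatchHash> {
        let mut support: HashMap<BatchHash, u64> = HashMap::new();
        for (hash, weight) in votes.values() {
            let w = support.entry(*hash).or_insert(0);
            *w += weight;
            if *w >= threshold {
                return Some(*hash);
            }
        }
        None
    }
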
From 2821962f720f78775d3555d98d558b07939645ee Mon Sep 17 00:00:00 2001
From: Akosh Farkash
Date: Mon, 29 Jul 2024 17:00:36 +0100
Subject: [PATCH 2/9] BFT-496: Add AttestationStatus and related structs

---
 node/Cargo.lock                               |   1 +
 node/actors/executor/Cargo.toml               |   1 +
 node/actors/executor/src/attestation.rs       |  53 ++++-----
 node/actors/executor/src/lib.rs               |   6 +-
 node/actors/executor/src/tests.rs             |  17 ++-
 .../network/src/gossip/attestation_status.rs  |  85 +++++++++++++
 node/actors/network/src/gossip/batch_votes.rs |   5 +-
 node/actors/network/src/gossip/mod.rs         | 112 ++++++++++++++----
 node/actors/network/src/gossip/tests/mod.rs   |  12 +-
 node/actors/network/src/lib.rs                |  19 ++-
 node/actors/network/src/testonly.rs           |  12 +-
 node/libs/storage/src/batch_store/metrics.rs  |   4 +-
 node/libs/storage/src/batch_store/mod.rs      |  32 ++---
 node/libs/storage/src/testonly/in_memory.rs   |   2 +-
 node/tools/src/config.rs                      |   8 +-
 15 files changed, 277 insertions(+), 92 deletions(-)
 create mode 100644 node/actors/network/src/gossip/attestation_status.rs

diff --git a/node/Cargo.lock b/node/Cargo.lock
index 22ab5e9e..3ea3e4a4 100644
--- a/node/Cargo.lock
+++ b/node/Cargo.lock
@@ -4127,6 +4127,7 @@
 name = "zksync_consensus_executor"
 version = "0.1.0-rc.4"
 dependencies = [
  "anyhow",
+ "async-trait",
  "rand 0.8.5",
  "test-casing",
  "tokio",

diff --git a/node/actors/executor/Cargo.toml b/node/actors/executor/Cargo.toml
index dc6b5193..56696efc 100644
--- a/node/actors/executor/Cargo.toml
+++ b/node/actors/executor/Cargo.toml
@@ -20,6 +20,7 @@
 zksync_protobuf.workspace = true

 anyhow.workspace = true
+async-trait.workspace = true
 rand.workspace = true
 tracing.workspace = true
 vise.workspace = true

diff --git a/node/actors/executor/src/attestation.rs b/node/actors/executor/src/attestation.rs
index 3a5146fc..80f3de3b 100644
--- a/node/actors/executor/src/attestation.rs
+++ b/node/actors/executor/src/attestation.rs
@@ -3,8 +3,8 @@
 use crate::Attester;
 use anyhow::Context;
 use std::sync::Arc;
-use zksync_concurrency::{ctx, time};
-use zksync_consensus_network::gossip::BatchVotesPublisher;
+use zksync_concurrency::{ctx, sync, time};
+use zksync_consensus_network::gossip::{AttestationStatusReceiver, BatchVotesPublisher};
 use zksync_consensus_roles::attester;
 use zksync_consensus_storage::{BatchStore, BlockStore};

@@ -16,6 +16,7 @@ pub(super) struct AttesterRunner {
     batch_store: Arc<BatchStore>,
     attester: Attester,
     publisher: BatchVotesPublisher,
+    status: AttestationStatusReceiver,
 }

 impl AttesterRunner {
@@ -25,16 +26,18 @@ impl AttesterRunner {
         batch_store: Arc<BatchStore>,
         attester: Attester,
         publisher: BatchVotesPublisher,
+        status: AttestationStatusReceiver,
     ) -> Self {
         Self {
             block_store,
             batch_store,
             attester,
             publisher,
+            status,
         }
     }
     /// Poll the database for new L1 batches and publish our signature over the batch.
-    pub(super) async fn run(self, ctx: &ctx::Ctx) -> ctx::Result<()> {
+    pub(super) async fn run(mut self, ctx: &ctx::Ctx) -> ctx::Result<()> {
         let public_key = self.attester.key.public();
         // TODO: In the future when we have attester rotation these checks will have to be checked inside the loop.
         let Some(attesters) = self.block_store.genesis().attesters.as_ref() else {
@@ -48,28 +51,26 @@ impl AttesterRunner {

         let genesis = self.block_store.genesis().hash();

-        // Find the initial range of batches that we want to (re)sign after a (re)start.
-        let last_batch_number = self
-            .batch_store
-            .wait_until_persisted(ctx, attester::BatchNumber(0))
-            .await
-            .context("wait_until_persisted")?
-            .last
-            .unwrap_or_default();
+        let mut prev = None;

-        // Determine the batch to start signing from.
-        let earliest_batch_number = self
-            .batch_store
-            .earliest_batch_number_to_sign(ctx)
-            .await
-            .context("earliest_batch_number_to_sign")?
-            .unwrap_or(last_batch_number);
+        loop {
+            let batch_number =
+                sync::wait_for_some(ctx, &mut self.status, |s| match s.next_batch_to_attest {
+                    next if next == prev => None,
+                    next => next,
+                })
+                .await?;

-        tracing::info!(%earliest_batch_number, %last_batch_number, "attesting batches");
+            tracing::info!(%batch_number, "attestation status");

-        let mut batch_number = earliest_batch_number;
+            // We can avoid actively polling the database in `wait_for_batch_to_sign` by waiting for its existence
+            // to be indicated in memory (which itself relies on polling). This happens once we have the commitment,
+            // which for nodes that get the blocks through BFT should happen after execution. Nodes which
+            // rely on batch sync don't participate in attestations as they need the batch on L1 first.
+            self.batch_store
+                .wait_until_persisted(ctx, batch_number)
+                .await?;

-        loop {
             // Try to get the next batch to sign; the commitment might not be available just yet.
             let batch = self.wait_for_batch_to_sign(ctx, batch_number).await?;

@@ -85,15 +86,7 @@ impl AttesterRunner {
                 .await
                 .context("publish")?;

-            batch_number = batch_number.next();
-
-            // We can avoid actively polling the database by waiting for the next persisted batch to appear
-            // in the memory (which itself relies on polling). This happens once we have the commitment,
-            // which for nodes that get the blocks through BFT should happen after execution. Nodes which
-            // rely on batch sync don't participate in attestations as they need the batch on L1 first.
-            self.batch_store
-                .wait_until_persisted(ctx, batch_number)
-                .await?;
+            prev = Some(batch_number);
         }
     }

diff --git a/node/actors/executor/src/lib.rs b/node/actors/executor/src/lib.rs
index f0475687..c094e429 100644
--- a/node/actors/executor/src/lib.rs
+++ b/node/actors/executor/src/lib.rs
@@ -9,7 +9,7 @@
 };
 use zksync_concurrency::{ctx, limiter, net, scope, time};
 use zksync_consensus_bft as bft;
-use zksync_consensus_network as network;
+use zksync_consensus_network::{self as network, gossip::AttestationStatusClient};
 use zksync_consensus_roles::{attester, node, validator};
 use zksync_consensus_storage::{BatchStore, BlockStore, ReplicaStore};
 use zksync_consensus_utils::pipe;
@@ -97,6 +97,8 @@ pub struct Executor {
     pub validator: Option<Validator>,
     /// Validator-specific node data.
     pub attester: Option<Attester>,
+    /// Client to use to poll attestation status: either through the main node API or the DB.
+    pub attestation_status_client: Box<dyn AttestationStatusClient>,
 }

 impl Executor {
@@ -138,6 +140,7 @@ impl Executor {
                 self.block_store.clone(),
                 self.batch_store.clone(),
                 network_actor_pipe,
+                self.attestation_status_client,
             );
             net.register_metrics();
             s.spawn(async { runner.run(ctx).await.context("Network stopped") });
@@ -149,6 +152,7 @@ impl Executor {
                     self.batch_store.clone(),
                     attester,
                     net.batch_vote_publisher(),
+                    net.attestation_status_receiver(),
                 );
                 s.spawn(async {
                     runner.run(ctx).await?;

diff --git a/node/actors/executor/src/tests.rs b/node/actors/executor/src/tests.rs
index 1c8c9f41..22ef5a84 100644
--- a/node/actors/executor/src/tests.rs
+++ b/node/actors/executor/src/tests.rs
@@ -4,7 +4,10 @@
 use rand::Rng as _;
 use tracing::Instrument as _;
 use zksync_concurrency::testonly::abort_on_panic;
 use zksync_consensus_bft as bft;
-use zksync_consensus_network::testonly::{new_configs, new_fullnode};
+use zksync_consensus_network::{
+    gossip::LocalAttestationStatus,
+    testonly::{new_configs, new_fullnode},
+};
 use zksync_consensus_roles::validator::{testonly::Setup, BlockNumber};
 use zksync_consensus_storage::{
     testonly::{in_memory, TestMemoryStorage},
@@ -26,12 +29,21 @@ fn config(cfg: &network::Config) -> Config {
     }
 }

+/// The test executors below are not running with attesters, so it doesn't matter whether the clients
+/// return views based on the main node's store or each on their own. For simplicity this
+/// returns an implementation that queries the local store of each instance. Alternatively we
+/// could implement an instance that never queries anything.
+fn mk_attestation_status_client(batch_store: &Arc<BatchStore>) -> impl AttestationStatusClient {
+    LocalAttestationStatus::new(batch_store.clone())
+}
+
 fn validator(
     cfg: &network::Config,
     block_store: Arc<BlockStore>,
     batch_store: Arc<BatchStore>,
     replica_store: impl ReplicaStore,
 ) -> Executor {
+    let attestation_status_client = Box::new(mk_attestation_status_client(&batch_store));
     Executor {
         config: config(cfg),
         block_store,
@@ -42,6 +54,7 @@ fn validator(
             payload_manager: Box::new(bft::testonly::RandomPayload(1000)),
         }),
         attester: None,
+        attestation_status_client,
     }
 }

@@ -50,12 +63,14 @@ fn fullnode(
     block_store: Arc<BlockStore>,
     batch_store: Arc<BatchStore>,
 ) -> Executor {
+    let attestation_status_client = Box::new(mk_attestation_status_client(&batch_store));
     Executor {
         config: config(cfg),
         block_store,
         batch_store,
         validator: None,
         attester: None,
+        attestation_status_client,
     }
 }

diff --git a/node/actors/network/src/gossip/attestation_status.rs b/node/actors/network/src/gossip/attestation_status.rs
new file mode 100644
index 00000000..3b84cd9a
--- /dev/null
+++ b/node/actors/network/src/gossip/attestation_status.rs
@@ -0,0 +1,85 @@
+use std::sync::Arc;
+
+use zksync_concurrency::{ctx, sync};
+use zksync_consensus_roles::attester;
+use zksync_consensus_storage::BatchStore;
+
+use crate::watch::Watch;
+
+/// An interface which is used by attesters and nodes collecting votes over gossip to determine
+/// which is the next batch they are all supposed to be voting on, according to the main node.
+#[async_trait::async_trait]
+pub trait AttestationStatusClient: 'static + std::fmt::Debug + Send + Sync {
+    /// Get the next batch number for which the main node expects a batch QC to be formed.
+    ///
+    /// The API might return an error while genesis is being created, which we represent with `None`
+    /// here and mean that we'll have to try again later.
+    async fn next_batch_to_attest(
+        &self,
+        ctx: &ctx::Ctx,
+    ) -> ctx::Result<Option<attester::BatchNumber>>;
+}
+
+/// Implement the attestation status for the main node by returning the next batch to vote on from the [BatchStore].
+#[derive(Debug, Clone)]
+pub struct LocalAttestationStatus(Arc<BatchStore>);
+
+impl LocalAttestationStatus {
+    /// Create local attestation client from a [BatchStore].
+    pub fn new(store: Arc<BatchStore>) -> Self {
+        Self(store)
+    }
+}
+
+#[async_trait::async_trait]
+impl AttestationStatusClient for LocalAttestationStatus {
+    async fn next_batch_to_attest(
+        &self,
+        ctx: &ctx::Ctx,
+    ) -> ctx::Result<Option<attester::BatchNumber>> {
+        self.0.next_batch_to_attest(ctx).await
+    }
+}
+
+/// Coordinate the attestation by showing the status as seen by the main node.
+pub struct AttestationStatus {
+    /// Next batch number where voting is expected.
+    ///
+    /// Its value is `None` until the background process polling the main node
+    /// can establish a value to start from.
+    pub next_batch_to_attest: Option<attester::BatchNumber>,
+}
+
+/// The subscription over the attestation status which votes can monitor for change.
+pub type AttestationStatusReceiver = sync::watch::Receiver<AttestationStatus>;
+
+/// A [Watch] over an [AttestationStatus] which we can use to notify components about
+/// changes in the batch number the main node expects attesters to vote on.
+pub(crate) struct AttestationStatusWatch(Watch<AttestationStatus>);
+
+impl Default for AttestationStatusWatch {
+    fn default() -> Self {
+        Self(Watch::new(AttestationStatus {
+            next_batch_to_attest: None,
+        }))
+    }
+}
+
+impl AttestationStatusWatch {
+    /// Subscribes to AttestationStatus updates.
+    pub(crate) fn subscribe(&self) -> AttestationStatusReceiver {
+        self.0.subscribe()
+    }
+
+    /// Set the next batch number to attest on and notify subscribers it changed.
+    pub(crate) async fn update(&self, next_batch_to_attest: attester::BatchNumber) {
+        let this = self.0.lock().await;
+        this.send_if_modified(|status| {
+            if status.next_batch_to_attest == Some(next_batch_to_attest) {
+                return false;
+            }
+            status.next_batch_to_attest = Some(next_batch_to_attest);
+            true
+        });
+    }
+}

diff --git a/node/actors/network/src/gossip/batch_votes.rs b/node/actors/network/src/gossip/batch_votes.rs
index 7a9792a2..1ea6b8cb 100644
--- a/node/actors/network/src/gossip/batch_votes.rs
+++ b/node/actors/network/src/gossip/batch_votes.rs
@@ -53,8 +53,9 @@ pub(crate) struct BatchVotes {
     /// The minimum batch number for which we are still interested in votes.
     ///
     /// Because we only store 1 vote per attester the memory is very much bounded,
-    /// but this extra pruning mechanism can be used to get rid of votes of attesters
-    /// who rotated out of the committee (which currently requires a re-genesis, but still).
+    /// but this extra pruning mechanism can be used to clear votes of attesters
+    /// who have been removed from the committee, as well as to get rid of the
+    /// last quorum we found and stored, and look for a new one in the next round.
     pub(crate) min_batch_number: attester::BatchNumber,
 }

diff --git a/node/actors/network/src/gossip/mod.rs b/node/actors/network/src/gossip/mod.rs
index 8b0bef57..3aa5acc1 100644
--- a/node/actors/network/src/gossip/mod.rs
+++ b/node/actors/network/src/gossip/mod.rs
@@ -12,16 +12,22 @@
 //! Static connections constitute a rigid "backbone" of the gossip network, which is insensitive to
 //! eclipse attack. Dynamic connections are supposed to improve the properties of the gossip
 //! network graph (minimize its diameter, increase connectedness).
+pub use self::attestation_status::{
+    AttestationStatusClient, AttestationStatusReceiver, LocalAttestationStatus,
+};
 pub use self::batch_votes::BatchVotesPublisher;
 use self::batch_votes::BatchVotesWatch;
 use crate::{gossip::ValidatorAddrsWatch, io, pool::PoolWatch, Config, MeteredStreamStats};
+use attestation_status::AttestationStatusWatch;
 use fetch::RequestItem;
 use std::sync::{atomic::AtomicUsize, Arc};
 pub(crate) use validator_addrs::*;
+use zksync_concurrency::time::Duration;
 use zksync_concurrency::{ctx, ctx::channel, error::Wrap as _, scope, sync};
 use zksync_consensus_roles::{node, validator};
 use zksync_consensus_storage::{BatchStore, BlockStore};

+mod attestation_status;
 mod batch_votes;
 mod fetch;
 mod handshake;
@@ -58,6 +64,10 @@ pub(crate) struct Network {
     pub(crate) fetch_queue: fetch::Queue,
     /// TESTONLY: how many time push_validator_addrs rpc was called by the peers.
     pub(crate) push_validator_addrs_calls: AtomicUsize,
+    /// Shared watch over the current attestation status as indicated by the main node.
+    pub(crate) attestation_status: Arc<AttestationStatusWatch>,
+    /// Client to use to check the current attestation status on the main node.
+    pub(crate) attestation_status_client: Box<dyn AttestationStatusClient>,
 }

 impl Network {
@@ -67,6 +77,7 @@ impl Network {
         block_store: Arc<BlockStore>,
         batch_store: Arc<BatchStore>,
         sender: channel::UnboundedSender<io::OutputMessage>,
+        attestation_status_client: Box<dyn AttestationStatusClient>,
     ) -> Arc<Self> {
         Arc::new(Self {
             sender,
@@ -82,6 +93,8 @@ impl Network {
             block_store,
             batch_store,
             push_validator_addrs_calls: 0.into(),
+            attestation_status: Arc::new(AttestationStatusWatch::default()),
+            attestation_status_client,
         })
     }

@@ -148,34 +161,91 @@ impl Network {
             .await;
     }

-    /// Task that keeps hearing about new votes and looks for an L1 batch qc.
-    /// It will propagate the QC if there's enough votes.
+    /// Task that reacts to new votes being added and looks for an L1 batch QC.
+    /// It persists the certificate once the quorum threshold is passed.
     pub(crate) async fn run_batch_qc_finder(&self, ctx: &ctx::Ctx) -> ctx::Result<()> {
         let Some(attesters) = self.genesis().attesters.as_ref() else {
+            tracing::info!("no attesters in genesis, not looking for batch QCs");
             return Ok(());
         };
         let genesis = self.genesis().hash();
-        let mut sub = self.batch_votes.subscribe();
+
+        let mut recv_votes = self.batch_votes.subscribe();
+        let mut recv_status = self.attestation_status.subscribe();
+
+        let mut prev_batch_number = None;
+
+        'status: loop {
+            // Wait until the status indicates that we're ready to sign the next batch.
+            // This is not strictly necessary but avoids repeatedly finding the same quorum, or having to skip it until it changes.
+            let next_batch_number =
+                sync::wait_for_some(ctx, &mut recv_status, |s| match s.next_batch_to_attest {
+                    next if next == prev_batch_number => None,
+                    next => next,
+                })
+                .await?;
+
+            // Next time we'll look for something new.
+            prev_batch_number = Some(next_batch_number);
+
+            // Get rid of all previous votes. We don't expect this to go backwards without regenesis, which will involve a restart.
+            self.batch_votes
+                .set_min_batch_number(next_batch_number)
+                .await;
+
+            // Now wait until we find the next quorum, whatever it is:
+            // * on the main node, if attesters are honest, they will vote on the next batch number and the main node will not see gaps
+            // * on external nodes the votes might be affected by changes in the value returned by the API, and there might be gaps
+            // What is important, though, is that the batch number does not move backwards while we look for a quorum, because attesters
+            // (re)casting earlier votes will go ignored by those fixed on a higher min_batch_number, and gossip will only be attempted once.
+            // The possibility of this will be fixed by deterministically picking a start batch number based on the fork indicated by genesis.
+            loop {
+                let quorum_opt = {
+                    let votes = sync::changed(ctx, &mut recv_votes).await?;
+                    votes.find_quorum(attesters, &genesis)
+                };
+
+                if let Some(qc) = quorum_opt {
+                    self.batch_store
+                        .persist_batch_qc(ctx, qc)
+                        .await
+                        .wrap("persist_batch_qc")?;
+
+                    continue 'status;
+                }
+            }
+        }
+    }
+
+    /// Poll the attestation status and update the watch.
+    pub(crate) async fn run_attestation_client(&self, ctx: &ctx::Ctx) -> ctx::Result<()> {
+        if self.genesis().attesters.is_none() {
+            tracing::info!("no attesters in genesis, not polling the attestation status");
+            return Ok(());
+        };
+
+        const POLL_INTERVAL: Duration = Duration::seconds(5);

         loop {
-            let quorum_opt = {
-                let votes = sync::changed(ctx, &mut sub).await?;
-                votes.find_quorum(attesters, &genesis)
-            };
-
-            if let Some(qc) = quorum_opt {
-                // In the future this should come from confirmations, but for now it's best effort, so we can forget ASAP.
-                // TODO: An initial value could be looked up in the database even now.
-                let next_batch_number = qc.message.number.next();
-
-                self.batch_store
-                    .persist_batch_qc(ctx, qc)
-                    .await
-                    .wrap("persist_batch_qc")?;
-
-                self.batch_votes
-                    .set_min_batch_number(next_batch_number)
-                    .await;
+            match self
+                .attestation_status_client
+                .next_batch_to_attest(ctx)
+                .await
+            {
+                Ok(Some(batch_number)) => {
+                    self.attestation_status.update(batch_number).await;
+                    // We could also update the minimum batch number here, which might
+                    // help mitigate the problem of missing a vote if the batch number
+                    // happened to decrease. But we decided to fix it at the source,
+                    // so the only place that is adjusted is before looking for a QC.
+                }
+                Ok(None) => tracing::debug!("waiting for attestation status..."),
+                Err(error) => tracing::error!(
+                    ?error,
+                    "failed to poll attestation status, retrying later..."
+                ),
             }
+            ctx.sleep(POLL_INTERVAL).await?;
         }
     }
 }

diff --git a/node/actors/network/src/gossip/tests/mod.rs b/node/actors/network/src/gossip/tests/mod.rs
index 17bc6c19..e0f5f3c2 100644
--- a/node/actors/network/src/gossip/tests/mod.rs
+++ b/node/actors/network/src/gossip/tests/mod.rs
@@ -643,8 +643,8 @@ fn test_batch_votes_quorum() {
     let rng = &mut ctx::test_root(&ctx::RealClock).rng();

     for _ in 0..10 {
-        let size = rng.gen_range(1..20);
-        let keys: Vec<attester::SecretKey> = (0..size).map(|_| rng.gen()).collect();
+        let committee_size = rng.gen_range(1..20);
+        let keys: Vec<attester::SecretKey> = (0..committee_size).map(|_| rng.gen()).collect();
         let attesters = attester::Committee::new(keys.iter().map(|k| attester::WeightedAttester {
             key: k.public(),
             weight: rng.gen_range(1..=100),
@@ -681,12 +681,8 @@ fn test_batch_votes_quorum() {
             }
         }

-        if batches
-            .iter()
-            .find(|b| b.1 >= attesters.threshold())
-            .is_none()
-        {
-            // Check that if there was no quorum then we don't find any.
+        // Check that if there was no quorum then we don't find any.
+        if !batches.iter().any(|b| b.1 >= attesters.threshold()) {
             assert!(votes.find_quorum(&attesters, &genesis).is_none());
         }

diff --git a/node/actors/network/src/lib.rs b/node/actors/network/src/lib.rs
index 1409b087..6bc868f1 100644
--- a/node/actors/network/src/lib.rs
+++ b/node/actors/network/src/lib.rs
@@ -1,6 +1,6 @@
 //! Network actor maintaining a pool of outbound and inbound connections to other nodes.
 use anyhow::Context as _;
-use gossip::BatchVotesPublisher;
+use gossip::{AttestationStatusClient, AttestationStatusReceiver, BatchVotesPublisher};
 use std::sync::Arc;
 use tracing::Instrument as _;
 use zksync_concurrency::{
@@ -57,8 +57,15 @@ impl Network {
         block_store: Arc<BlockStore>,
         batch_store: Arc<BatchStore>,
         pipe: ActorPipe<io::InputMessage, io::OutputMessage>,
+        attestation_status_client: Box<dyn AttestationStatusClient>,
     ) -> (Arc<Self>, Runner) {
-        let gossip = gossip::Network::new(cfg, block_store, batch_store, pipe.send);
+        let gossip = gossip::Network::new(
+            cfg,
+            block_store,
+            batch_store,
+            pipe.send,
+            attestation_status_client,
+        );
         let consensus = consensus::Network::new(gossip.clone());
         let net = Arc::new(Self { gossip, consensus });
         (
@@ -80,6 +87,11 @@ impl Network {
         BatchVotesPublisher(self.gossip.batch_votes.clone())
     }

+    /// Subscribe to attestation status change notifications.
+    pub fn attestation_status_receiver(&self) -> AttestationStatusReceiver {
+        self.gossip.attestation_status.subscribe()
+    }
+
     /// Handles a dispatcher message.
     async fn handle_message(
         &self,
@@ -134,6 +146,9 @@ impl Runner {
             // Update QC batches in the background.
             s.spawn(self.net.gossip.run_batch_qc_finder(ctx));

+            // Update attestation status in the background.
+            s.spawn(self.net.gossip.run_attestation_client(ctx));
+
             // Fetch missing batches in the background.
             s.spawn(async {
                 self.net.gossip.run_batch_fetcher(ctx).await;

diff --git a/node/actors/network/src/testonly.rs b/node/actors/network/src/testonly.rs
index dfc28dd5..382a24f0 100644
--- a/node/actors/network/src/testonly.rs
+++ b/node/actors/network/src/testonly.rs
@@ -1,6 +1,7 @@
 //! Testonly utilities.
 #![allow(dead_code)]
 use crate::{
+    gossip::LocalAttestationStatus,
     io::{ConsensusInputMessage, Target},
     Config, GossipConfig, Network, RpcConfig, Runner,
 };
@@ -181,8 +182,17 @@ impl Instance {
         block_store: Arc<BlockStore>,
         batch_store: Arc<BatchStore>,
     ) -> (Self, InstanceRunner) {
+        // For now let everyone cast their votes on batches based on their own database.
+        let attestation_status_client = Box::new(LocalAttestationStatus::new(batch_store.clone()));
+
         let (actor_pipe, dispatcher_pipe) = pipe::new();
-        let (net, runner) = Network::new(cfg, block_store, batch_store, actor_pipe);
+        let (net, runner) = Network::new(
+            cfg,
+            block_store,
+            batch_store,
+            actor_pipe,
+            attestation_status_client,
+        );
         let (terminate_send, terminate_recv) = channel::bounded(1);
         (
             Self {

diff --git a/node/libs/storage/src/batch_store/metrics.rs b/node/libs/storage/src/batch_store/metrics.rs
index cabe7e34..deddc351 100644
--- a/node/libs/storage/src/batch_store/metrics.rs
+++ b/node/libs/storage/src/batch_store/metrics.rs
@@ -7,9 +7,9 @@ pub(super) struct PersistentBatchStore {
     /// Latency of a successful `get_batch()` call.
     #[metrics(unit = vise::Unit::Seconds, buckets = vise::Buckets::LATENCIES)]
     pub(super) batch_latency: vise::Histogram<Duration>,
-    /// Latency of a successful `earliest_batch_number_to_sign()` call.
+    /// Latency of a successful `next_batch_to_attest()` call.
     #[metrics(unit = vise::Unit::Seconds, buckets = vise::Buckets::LATENCIES)]
-    pub(super) earliest_batch_latency: vise::Histogram<Duration>,
+    pub(super) next_batch_to_attest_latency: vise::Histogram<Duration>,
     /// Latency of a successful `get_batch_to_sign()` call.
     #[metrics(unit = vise::Unit::Seconds, buckets = vise::Buckets::LATENCIES)]
     pub(super) batch_to_sign_latency: vise::Histogram<Duration>,

diff --git a/node/libs/storage/src/batch_store/mod.rs b/node/libs/storage/src/batch_store/mod.rs
index 3b6b6fe9..889014f1 100644
--- a/node/libs/storage/src/batch_store/mod.rs
+++ b/node/libs/storage/src/batch_store/mod.rs
@@ -53,16 +53,12 @@ pub trait PersistentBatchStore: 'static + fmt::Debug + Send + Sync {
     /// Range of batches persisted in storage.
     fn persisted(&self) -> sync::watch::Receiver<BatchStoreState>;

-    /// Get the earliest of L1 batches which are missing the corresponding L1 batch quorum certificates
-    /// and potentially need to be signed by attesters.
+    /// Get the next L1 batch for which attesters are expected to produce a quorum certificate.
     ///
-    /// A replica might never have a complete history of L1 batch QCs; once the L1 batch is included on L1,
-    /// the replicas might use the [attester::SyncBatch] route to obtain them, in which case they will not
-    /// have a QC and no reason to get them either. The store will have sufficient information to decide
-    /// where it's still necessary to gossip votes; for example the main node will want to have a QC on
-    /// every batch while it's the one submitting them to L1, while replicas can ask the L1 what is considered
-    /// final.
-    async fn earliest_batch_number_to_sign(
+    /// An external node might never have a complete history of L1 batch QCs. Once the L1 batch is included on L1,
+    /// the external nodes might use the [attester::SyncBatch] route to obtain them, in which case they will not
+    /// have a QC and no reason to get them either. The main node, however, will want to have a QC for all batches.
+    async fn next_batch_to_attest(
         &self,
         ctx: &ctx::Ctx,
     ) -> ctx::Result<Option<attester::BatchNumber>>;
@@ -279,28 +275,20 @@ impl BatchStore {
         Ok(batch)
     }

-    /// Retrieve the minimum batch number that doesn't have a QC yet and potentially need to be signed.
-    ///
-    /// There might be unsigned batches before this one in the database, however we don't consider it
-    /// necessary to sign them any more, because for example they have already been submitted to L1.
-    ///
-    /// There might also be signed batches *after* this one, due to the way gossiping works, but we
-    /// might still have to fill the gaps by (re)submitting our signature to allow them to be submitted.
-    ///
-    /// Returns `None` if all existing batches are signed, or there are no batches yet to be signed at all.
-    pub async fn earliest_batch_number_to_sign(
+    /// Retrieve the next batch number that doesn't have a QC yet and will need to be signed.
+    pub async fn next_batch_to_attest(
         &self,
         ctx: &ctx::Ctx,
     ) -> ctx::Result<Option<attester::BatchNumber>> {
         let t = metrics::PERSISTENT_BATCH_STORE
-            .earliest_batch_latency
+            .next_batch_to_attest_latency
             .start();

         let batch = self
             .persistent
-            .earliest_batch_number_to_sign(ctx)
+            .next_batch_to_attest(ctx)
             .await
-            .wrap("persistent.get_batch_to_sign()")?;
+            .wrap("persistent.next_batch_to_attest()")?;

         t.observe();
         Ok(batch)

diff --git a/node/libs/storage/src/testonly/in_memory.rs b/node/libs/storage/src/testonly/in_memory.rs
index 084da660..79a21207 100644
--- a/node/libs/storage/src/testonly/in_memory.rs
+++ b/node/libs/storage/src/testonly/in_memory.rs
@@ -139,7 +139,7 @@ impl PersistentBatchStore for BatchStore {
         Ok(certs.get(last_batch_number).cloned())
     }

-    async fn earliest_batch_number_to_sign(
+    async fn next_batch_to_attest(
         &self,
         _ctx: &ctx::Ctx,
     ) -> ctx::Result<Option<attester::BatchNumber>> {

diff --git a/node/tools/src/config.rs b/node/tools/src/config.rs
index 221baf04..fd56a421 100644
--- a/node/tools/src/config.rs
+++ b/node/tools/src/config.rs
@@ -13,7 +13,7 @@
 use zksync_concurrency::{ctx, net};
 use zksync_consensus_bft as bft;
 use zksync_consensus_crypto::{read_optional_text, read_required_text, Text, TextFmt};
 use zksync_consensus_executor as executor;
-use zksync_consensus_network::http;
+use zksync_consensus_network::{gossip::LocalAttestationStatus, http};
 use zksync_consensus_roles::{attester, node, validator};
 use zksync_consensus_storage::testonly::{TestMemoryStorage, TestMemoryStorageRunner};
 use zksync_consensus_utils::debug_page;
@@ -262,6 +262,11 @@ impl Configs {
     ) -> ctx::Result<(executor::Executor, TestMemoryStorageRunner)> {
         let replica_store = store::RocksDB::open(self.app.genesis.clone(), &self.database).await?;
         let store = TestMemoryStorage::new(ctx, &self.app.genesis).await;
+
+        // We don't have an implementation of an API in these servers, we can only use the stores.
+        let attestation_status_client =
+            Box::new(LocalAttestationStatus::new(store.batches.clone()));
+
         let e = executor::Executor {
             config: executor::Config {
                 server_addr: self.app.server_addr,
@@ -302,6 +307,7 @@ impl Configs {
                 .attester_key
                 .as_ref()
                 .map(|key| executor::Attester { key: key.clone() }),
+            attestation_status_client,
         };
         Ok((e, store.runner))
     }
From 0273fbc87ce26e4020c1c394741713fbda48aec5 Mon Sep 17 00:00:00 2001
From: Akosh Farkash
Date: Tue, 30 Jul 2024 09:35:11 +0100
Subject: [PATCH 3/9] BFT-496: Use wait_for_some

---
 node/actors/network/src/gossip/mod.rs | 26 ++++++++++----------------
 1 file changed, 10 insertions(+), 16 deletions(-)

diff --git a/node/actors/network/src/gossip/mod.rs b/node/actors/network/src/gossip/mod.rs
index 3aa5acc1..457b7ae4 100644
--- a/node/actors/network/src/gossip/mod.rs
+++ b/node/actors/network/src/gossip/mod.rs
@@ -175,7 +175,7 @@ impl Network {

         let mut prev_batch_number = None;

-        'status: loop {
+        loop {
             // Wait until the status indicates that we're ready to sign the next batch.
             // This is not strictly necessary but avoids repeatedly finding the same quorum, or having to skip it until it changes.
             let next_batch_number =
                 sync::wait_for_some(ctx, &mut recv_status, |s| match s.next_batch_to_attest {
                     next if next == prev_batch_number => None,
                     next => next,
                 })
                 .await?;
@@ -199,21 +199,15 @@ impl Network {
             // What is important, though, is that the batch number does not move backwards while we look for a quorum, because attesters
             // (re)casting earlier votes will go ignored by those fixed on a higher min_batch_number, and gossip will only be attempted once.
             // The possibility of this will be fixed by deterministically picking a start batch number based on the fork indicated by genesis.
-            loop {
-                let quorum_opt = {
-                    let votes = sync::changed(ctx, &mut recv_votes).await?;
-                    votes.find_quorum(attesters, &genesis)
-                };
-
-                if let Some(qc) = quorum_opt {
-                    self.batch_store
-                        .persist_batch_qc(ctx, qc)
-                        .await
-                        .wrap("persist_batch_qc")?;
-
-                    continue 'status;
-                }
-            }
+            let qc = sync::wait_for_some(ctx, &mut recv_votes, |votes| {
+                votes.find_quorum(attesters, &genesis)
+            })
+            .await?;
+
+            self.batch_store
+                .persist_batch_qc(ctx, qc)
+                .await
+                .wrap("persist_batch_qc")?;
         }
     }

From 99ebe33c3868b710a22e78838b2fec9e43d36fd0 Mon Sep 17 00:00:00 2001
From: Akosh Farkash
Date: Tue, 30 Jul 2024 09:36:57 +0100
Subject: [PATCH 4/9] BFT-496: Use qualified time::Duration

---
 node/actors/network/src/gossip/mod.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/node/actors/network/src/gossip/mod.rs b/node/actors/network/src/gossip/mod.rs
index 457b7ae4..1b5639be 100644
--- a/node/actors/network/src/gossip/mod.rs
+++ b/node/actors/network/src/gossip/mod.rs
@@ -22,7 +22,7 @@
 use fetch::RequestItem;
 use std::sync::{atomic::AtomicUsize, Arc};
 pub(crate) use validator_addrs::*;
-use zksync_concurrency::time::Duration;
+use zksync_concurrency::time;
 use zksync_concurrency::{ctx, ctx::channel, error::Wrap as _, scope, sync};
 use zksync_consensus_roles::{node, validator};
 use zksync_consensus_storage::{BatchStore, BlockStore};
@@ -218,7 +218,7 @@ impl Network {
             return Ok(());
         };

-        const POLL_INTERVAL: Duration = Duration::seconds(5);
+        const POLL_INTERVAL: time::Duration = time::Duration::seconds(5);

         loop {
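`sync::wait_for_some`, which the patch above switches to, blocks until a predicate over the watched value returns `Some`. A sketch of the idiom against tokio's watch channel (simplified: no cancellation context, which the real helper threads through every call):

    use tokio::sync::watch;

    async fn wait_for_some<T, U>(
        recv: &mut watch::Receiver<T>,
        mut pred: impl FnMut(&T) -> Option<U>,
    ) -> U {
        loop {
            // Check the current value and mark it as seen.
            if let Some(u) = pred(&recv.borrow_and_update()) {
                return u;
            }
            // Sleep until the sender publishes a new value, then re-check.
            recv.changed().await.expect("sender dropped");
        }
    }
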
From 40197f16f7da123d951b1f611c1233043f7686ab Mon Sep 17 00:00:00 2001
From: Akosh Farkash
Date: Tue, 30 Jul 2024 13:57:46 +0100
Subject: [PATCH 5/9] BFT-496: Move the polling of the API into a runner created where the storages are

---
 node/actors/bft/src/testonly/run.rs           |   4 +-
 node/actors/executor/src/lib.rs               |   8 +-
 node/actors/executor/src/tests.rs             |  21 ++--
 node/actors/network/src/consensus/tests.rs    |   2 +
 .../network/src/gossip/attestation_status.rs  | 114 +++++++++++++-----
 node/actors/network/src/gossip/mod.rs         |  44 +------
 node/actors/network/src/lib.rs                |  16 +--
 node/actors/network/src/testonly.rs           |  38 ++++--
 node/tools/src/config.rs                      |  43 +++++--
 9 files changed, 174 insertions(+), 116 deletions(-)

diff --git a/node/actors/bft/src/testonly/run.rs b/node/actors/bft/src/testonly/run.rs
index 9f907f14..bc11e121 100644
--- a/node/actors/bft/src/testonly/run.rs
+++ b/node/actors/bft/src/testonly/run.rs
@@ -14,7 +14,7 @@
     },
     oneshot, scope,
 };
-use zksync_consensus_network as network;
+use zksync_consensus_network::{self as network};
 use zksync_consensus_roles::validator;
 use zksync_consensus_storage::{testonly::TestMemoryStorage, BlockStore};
 use zksync_consensus_utils::pipe;
@@ -135,9 +135,11 @@ impl Test {
         for (i, net) in nets.into_iter().enumerate() {
             let store = TestMemoryStorage::new(ctx, genesis).await;
             s.spawn_bg(async { Ok(store.runner.run(ctx).await?) });
+
             if self.nodes[i].0 == Behavior::Honest {
                 honest.push(store.blocks.clone());
             }
+
             nodes.push(Node {
                 net,
                 behavior: self.nodes[i].0,

diff --git a/node/actors/executor/src/lib.rs b/node/actors/executor/src/lib.rs
index c094e429..844198ad 100644
--- a/node/actors/executor/src/lib.rs
+++ b/node/actors/executor/src/lib.rs
@@ -9,7 +9,7 @@
 };
 use zksync_concurrency::{ctx, limiter, net, scope, time};
 use zksync_consensus_bft as bft;
-use zksync_consensus_network::{self as network, gossip::AttestationStatusClient};
+use zksync_consensus_network::{self as network, gossip::AttestationStatusWatch};
 use zksync_consensus_roles::{attester, node, validator};
 use zksync_consensus_storage::{BatchStore, BlockStore, ReplicaStore};
 use zksync_consensus_utils::pipe;
@@ -97,8 +97,8 @@ pub struct Executor {
     pub validator: Option<Validator>,
     /// Validator-specific node data.
     pub attester: Option<Attester>,
-    /// Client to use to poll attestation status: either through the main node API or the DB.
-    pub attestation_status_client: Box<dyn AttestationStatusClient>,
+    /// Status showing where the main node wants attesters to cast their votes.
+    pub attestation_status: Arc<AttestationStatusWatch>,
 }

 impl Executor {
@@ -140,7 +140,7 @@ impl Executor {
                 self.block_store.clone(),
                 self.batch_store.clone(),
                 network_actor_pipe,
-                self.attestation_status_client,
+                self.attestation_status.clone(),
             );
             net.register_metrics();
             s.spawn(async { runner.run(ctx).await.context("Network stopped") });

diff --git a/node/actors/executor/src/tests.rs b/node/actors/executor/src/tests.rs
index 22ef5a84..491c19b3 100644
--- a/node/actors/executor/src/tests.rs
+++ b/node/actors/executor/src/tests.rs
@@ -4,10 +4,7 @@
 use rand::Rng as _;
 use tracing::Instrument as _;
 use zksync_concurrency::testonly::abort_on_panic;
 use zksync_consensus_bft as bft;
-use zksync_consensus_network::{
-    gossip::LocalAttestationStatus,
-    testonly::{new_configs, new_fullnode},
-};
+use zksync_consensus_network::testonly::{new_configs, new_fullnode};
 use zksync_consensus_roles::validator::{testonly::Setup, BlockNumber};
 use zksync_consensus_storage::{
     testonly::{in_memory, TestMemoryStorage},
@@ -29,12 +26,10 @@ fn config(cfg: &network::Config) -> Config {
     }
 }

-/// The test executors below are not running with attesters, so it doesn't matter whether the clients
-/// return views based on the main node's store or each on their own. For simplicity this
-/// returns an implementation that queries the local store of each instance. Alternatively we
-/// could implement an instance that never queries anything.
-fn mk_attestation_status_client(batch_store: &Arc<BatchStore>) -> impl AttestationStatusClient {
-    LocalAttestationStatus::new(batch_store.clone())
+/// The test executors below are not running with attesters, so we just create an [AttestationStatusWatch]
+/// that will never be updated.
+fn never_attest() -> Arc<AttestationStatusWatch> {
+    Arc::new(AttestationStatusWatch::default())
 }

 fn validator(
     cfg: &network::Config,
     block_store: Arc<BlockStore>,
     batch_store: Arc<BatchStore>,
     replica_store: impl ReplicaStore,
 ) -> Executor {
-    let attestation_status_client = Box::new(mk_attestation_status_client(&batch_store));
     Executor {
         config: config(cfg),
         block_store,
@@ -54,7 +48,7 @@ fn validator(
             payload_manager: Box::new(bft::testonly::RandomPayload(1000)),
         }),
         attester: None,
-        attestation_status_client,
+        attestation_status: never_attest(),
     }
 }

@@ -63,14 +57,13 @@ fn fullnode(
     block_store: Arc<BlockStore>,
     batch_store: Arc<BatchStore>,
 ) -> Executor {
-    let attestation_status_client = Box::new(mk_attestation_status_client(&batch_store));
     Executor {
         config: config(cfg),
         block_store,
         batch_store,
         validator: None,
         attester: None,
-        attestation_status_client,
+        attestation_status: never_attest(),
     }
 }

diff --git a/node/actors/network/src/consensus/tests.rs b/node/actors/network/src/consensus/tests.rs
index 60049757..052aea13 100644
--- a/node/actors/network/src/consensus/tests.rs
+++ b/node/actors/network/src/consensus/tests.rs
@@ -256,9 +256,11 @@ async fn test_address_change() {
     // should get reconstructed.
     cfgs[0].server_addr = net::tcp::testonly::reserve_listener();
     cfgs[0].public_addr = (*cfgs[0].server_addr).into();
+
     let (node0, runner) =
         testonly::Instance::new(cfgs[0].clone(), store.blocks.clone(), store.batches.clone());
     s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node0")));
+
     nodes[0] = node0;
     for n in &nodes {
         n.wait_for_consensus_connections().await;

diff --git a/node/actors/network/src/gossip/attestation_status.rs b/node/actors/network/src/gossip/attestation_status.rs
index 3b84cd9a..7a11cbe6 100644
--- a/node/actors/network/src/gossip/attestation_status.rs
+++ b/node/actors/network/src/gossip/attestation_status.rs
@@ -1,6 +1,7 @@
+use std::fmt;
 use std::sync::Arc;

-use zksync_concurrency::{ctx, sync};
+use zksync_concurrency::{ctx, sync, time};
 use zksync_consensus_roles::attester;
 use zksync_consensus_storage::BatchStore;

@@ -9,7 +10,7 @@
 /// An interface which is used by attesters and nodes collecting votes over gossip to determine
 /// which is the next batch they are all supposed to be voting on, according to the main node.
 #[async_trait::async_trait]
-pub trait AttestationStatusClient: 'static + std::fmt::Debug + Send + Sync {
+pub trait AttestationStatusClient: 'static + fmt::Debug + Send + Sync {
     /// Get the next batch number for which the main node expects a batch QC to be formed.
     ///
     /// The API might return an error while genesis is being created, which we represent with `None`
@@ -20,28 +21,8 @@ pub trait AttestationStatusClient: 'static + fmt::Debug + Send + Sync {
     ) -> ctx::Result<Option<attester::BatchNumber>>;
 }

-/// Implement the attestation status for the main node by returning the next batch to vote on from the [BatchStore].
-#[derive(Debug, Clone)]
-pub struct LocalAttestationStatus(Arc<BatchStore>);
-
-impl LocalAttestationStatus {
-    /// Create local attestation client from a [BatchStore].
-    pub fn new(store: Arc<BatchStore>) -> Self {
-        Self(store)
-    }
-}
-
-#[async_trait::async_trait]
-impl AttestationStatusClient for LocalAttestationStatus {
-    async fn next_batch_to_attest(
-        &self,
-        ctx: &ctx::Ctx,
-    ) -> ctx::Result<Option<attester::BatchNumber>> {
-        self.0.next_batch_to_attest(ctx).await
-    }
-}
-
 /// Coordinate the attestation by showing the status as seen by the main node.
+#[derive(Debug, Clone)]
 pub struct AttestationStatus {
     /// Next batch number where voting is expected.
     ///
@@ -50,12 +31,19 @@ pub struct AttestationStatus {
     pub next_batch_to_attest: Option<attester::BatchNumber>,
 }

-/// The subscription over the attestation status which votes can monitor for change.
+/// The subscription over the attestation status which voters can monitor for change.
 pub type AttestationStatusReceiver = sync::watch::Receiver<AttestationStatus>;

 /// A [Watch] over an [AttestationStatus] which we can use to notify components about
 /// changes in the batch number the main node expects attesters to vote on.
-pub(crate) struct AttestationStatusWatch(Watch<AttestationStatus>);
+pub struct AttestationStatusWatch(Watch<AttestationStatus>);
+
+impl fmt::Debug for AttestationStatusWatch {
+    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+        fmt.debug_struct("AttestationStatusWatch")
+            .finish_non_exhaustive()
+    }
+}

 impl Default for AttestationStatusWatch {
     fn default() -> Self {
@@ -66,13 +54,23 @@ impl Default for AttestationStatusWatch {
 }

 impl AttestationStatusWatch {
+    /// Create a new [AttestationStatusWatch] paired up with an [AttestationStatusRunner] to keep it up to date.
+    pub fn new(
+        client: Box<dyn AttestationStatusClient>,
+        poll_interval: time::Duration,
+    ) -> (Arc<Self>, AttestationStatusRunner) {
+        let status = Arc::new(AttestationStatusWatch::default());
+        let runner = AttestationStatusRunner::new(status.clone(), client, poll_interval);
+        (status, runner)
+    }
+
     /// Subscribes to AttestationStatus updates.
-    pub(crate) fn subscribe(&self) -> AttestationStatusReceiver {
+    pub fn subscribe(&self) -> AttestationStatusReceiver {
         self.0.subscribe()
     }

     /// Set the next batch number to attest on and notify subscribers it changed.
-    pub(crate) async fn update(&self, next_batch_to_attest: attester::BatchNumber) {
+    pub async fn update(&self, next_batch_to_attest: attester::BatchNumber) {
         let this = self.0.lock().await;
         this.send_if_modified(|status| {
             if status.next_batch_to_attest == Some(next_batch_to_attest) {
@@ -83,3 +81,65 @@ impl AttestationStatusWatch {
         });
     }
 }
+
+/// Use an [AttestationStatusClient] to periodically poll the main node and update the [AttestationStatusWatch].
+pub struct AttestationStatusRunner {
+    status: Arc<AttestationStatusWatch>,
+    client: Box<dyn AttestationStatusClient>,
+    poll_interval: time::Duration,
+}
+
+impl AttestationStatusRunner {
+    /// Create a new runner to poll the main node.
+    fn new(
+        status: Arc<AttestationStatusWatch>,
+        client: Box<dyn AttestationStatusClient>,
+        poll_interval: time::Duration,
+    ) -> Self {
+        Self {
+            status,
+            client,
+            poll_interval,
+        }
+    }
+
+    /// Run the poll loop.
+    pub async fn run(self, ctx: &ctx::Ctx) -> anyhow::Result<()> {
+        loop {
+            match self.client.next_batch_to_attest(ctx).await {
+                Ok(Some(batch_number)) => {
+                    self.status.update(batch_number).await;
+                }
+                Ok(None) => tracing::debug!("waiting for attestation status..."),
+                Err(error) => tracing::error!(
+                    ?error,
+                    "failed to poll attestation status, retrying later..."
+                ),
+            }
+            if let Err(ctx::Canceled) = ctx.sleep(self.poll_interval).await {
+                return Ok(());
+            }
+        }
+    }
+}
+
+/// Implement the attestation status for the main node by returning the next batch to vote on from the [BatchStore].
+#[derive(Debug, Clone)]
+pub struct LocalAttestationStatusClient(Arc<BatchStore>);
+
+impl LocalAttestationStatusClient {
+    /// Create local attestation client from a [BatchStore].
+    pub fn new(store: Arc<BatchStore>) -> Self {
+        Self(store)
+    }
+}
+
+#[async_trait::async_trait]
+impl AttestationStatusClient for LocalAttestationStatusClient {
+    async fn next_batch_to_attest(
+        &self,
+        ctx: &ctx::Ctx,
+    ) -> ctx::Result<Option<attester::BatchNumber>> {
+        self.0.next_batch_to_attest(ctx).await
+    }
+}

diff --git a/node/actors/network/src/gossip/mod.rs b/node/actors/network/src/gossip/mod.rs
index 1b5639be..e011077f 100644
--- a/node/actors/network/src/gossip/mod.rs
+++ b/node/actors/network/src/gossip/mod.rs
@@ -13,16 +13,15 @@
 //! eclipse attack. Dynamic connections are supposed to improve the properties of the gossip
 //! network graph (minimize its diameter, increase connectedness).
 pub use self::attestation_status::{
-    AttestationStatusClient, AttestationStatusReceiver, LocalAttestationStatus,
+    AttestationStatusClient, AttestationStatusReceiver, AttestationStatusRunner,
+    AttestationStatusWatch, LocalAttestationStatusClient,
 };
 pub use self::batch_votes::BatchVotesPublisher;
 use self::batch_votes::BatchVotesWatch;
 use crate::{gossip::ValidatorAddrsWatch, io, pool::PoolWatch, Config, MeteredStreamStats};
-use attestation_status::AttestationStatusWatch;
 use fetch::RequestItem;
 use std::sync::{atomic::AtomicUsize, Arc};
 pub(crate) use validator_addrs::*;
-use zksync_concurrency::time;
 use zksync_concurrency::{ctx, ctx::channel, error::Wrap as _, scope, sync};
 use zksync_consensus_roles::{node, validator};
 use zksync_consensus_storage::{BatchStore, BlockStore};
@@ -64,8 +63,6 @@ pub(crate) struct Network {
     pub(crate) push_validator_addrs_calls: AtomicUsize,
     /// Shared watch over the current attestation status as indicated by the main node.
     pub(crate) attestation_status: Arc<AttestationStatusWatch>,
-    /// Client to use to check the current attestation status on the main node.
-    pub(crate) attestation_status_client: Box<dyn AttestationStatusClient>,
 }

 impl Network {
@@ -75,7 +72,7 @@ impl Network {
         block_store: Arc<BlockStore>,
         batch_store: Arc<BatchStore>,
         sender: channel::UnboundedSender<io::OutputMessage>,
-        attestation_status_client: Box<dyn AttestationStatusClient>,
+        attestation_status: Arc<AttestationStatusWatch>,
     ) -> Arc<Self> {
         Arc::new(Self {
             sender,
@@ -91,8 +88,7 @@ impl Network {
             block_store,
             batch_store,
             push_validator_addrs_calls: 0.into(),
-            attestation_status: Arc::new(AttestationStatusWatch::default()),
-            attestation_status_client,
+            attestation_status,
         })
     }

@@ -206,36 +202,4 @@ impl Network {
                 .wrap("persist_batch_qc")?;
         }
     }
-
-    /// Poll the attestation status and update the watch.
-    pub(crate) async fn run_attestation_client(&self, ctx: &ctx::Ctx) -> ctx::Result<()> {
-        if self.genesis().attesters.is_none() {
-            tracing::info!("no attesters in genesis, not polling the attestation status");
-            return Ok(());
-        };
-
-        const POLL_INTERVAL: time::Duration = time::Duration::seconds(5);
-
-        loop {
-            match self
-                .attestation_status_client
-                .next_batch_to_attest(ctx)
-                .await
-            {
-                Ok(Some(batch_number)) => {
-                    self.attestation_status.update(batch_number).await;
-                    // We could also update the minimum batch number here, which might
-                    // help mitigate the problem of missing a vote if the batch number
-                    // happened to decrease. But we decided to fix it at the source,
-                    // so the only place that is adjusted is before looking for a QC.
-                }
-                Ok(None) => tracing::debug!("waiting for attestation status..."),
-                Err(error) => tracing::error!(
-                    ?error,
-                    "failed to poll attestation status, retrying later..."
-                ),
-            }
-            ctx.sleep(POLL_INTERVAL).await?;
-        }
-    }
 }

diff --git a/node/actors/network/src/lib.rs b/node/actors/network/src/lib.rs
index 6bc868f1..d4104093 100644
--- a/node/actors/network/src/lib.rs
+++ b/node/actors/network/src/lib.rs
@@ -1,6 +1,6 @@
 //! Network actor maintaining a pool of outbound and inbound connections to other nodes.
 use anyhow::Context as _;
-use gossip::{AttestationStatusClient, AttestationStatusReceiver, BatchVotesPublisher};
+use gossip::{AttestationStatusReceiver, AttestationStatusWatch, BatchVotesPublisher};
 use std::sync::Arc;
 use tracing::Instrument as _;
 use zksync_concurrency::{
@@ -57,15 +57,10 @@ impl Network {
         block_store: Arc<BlockStore>,
         batch_store: Arc<BatchStore>,
         pipe: ActorPipe<io::InputMessage, io::OutputMessage>,
-        attestation_status_client: Box<dyn AttestationStatusClient>,
+        attestation_status: Arc<AttestationStatusWatch>,
     ) -> (Arc<Self>, Runner) {
-        let gossip = gossip::Network::new(
-            cfg,
-            block_store,
-            batch_store,
-            pipe.send,
-            attestation_status_client,
-        );
+        let gossip =
+            gossip::Network::new(cfg, block_store, batch_store, pipe.send, attestation_status);
         let consensus = consensus::Network::new(gossip.clone());
         let net = Arc::new(Self { gossip, consensus });
         (
@@ -146,9 +141,6 @@ impl Runner {
             // Update QC batches in the background.
             s.spawn(self.net.gossip.run_batch_qc_finder(ctx));

-            // Update attestation status in the background.
-            s.spawn(self.net.gossip.run_attestation_client(ctx));
-
             // Fetch missing batches in the background.
             s.spawn(async {
                 self.net.gossip.run_batch_fetcher(ctx).await;

diff --git a/node/actors/network/src/testonly.rs b/node/actors/network/src/testonly.rs
index 382a24f0..13608c57 100644
--- a/node/actors/network/src/testonly.rs
+++ b/node/actors/network/src/testonly.rs
@@ -1,7 +1,7 @@
 //! Testonly utilities.
 #![allow(dead_code)]
 use crate::{
-    gossip::LocalAttestationStatus,
+    gossip::{AttestationStatusRunner, AttestationStatusWatch, LocalAttestationStatusClient},
     io::{ConsensusInputMessage, Target},
     Config, GossipConfig, Network, RpcConfig, Runner,
 };
@@ -13,7 +13,10 @@
 use std::{
     collections::{HashMap, HashSet},
     sync::Arc,
 };
-use zksync_concurrency::{ctx, ctx::channel, io, limiter, net, scope, sync};
+use zksync_concurrency::{
+    ctx::{self, channel},
+    io, limiter, net, scope, sync, time,
+};
 use zksync_consensus_roles::{node, validator};
 use zksync_consensus_storage::{BatchStore, BlockStore};
 use zksync_consensus_utils::pipe;
@@ -160,7 +163,8 @@
 /// Runner for Instance.
 pub struct InstanceRunner {
-    runner: Runner,
+    net_runner: Runner,
+    attestation_status_runner: AttestationStatusRunner,
     terminate: channel::Receiver<()>,
 }

 impl InstanceRunner {
     /// Runs the instance background processes.
     pub async fn run(mut self, ctx: &ctx::Ctx) -> anyhow::Result<()> {
         scope::run!(ctx, |ctx, s| async {
-            s.spawn_bg(self.runner.run(ctx));
+            s.spawn_bg(self.net_runner.run(ctx));
+            s.spawn_bg(self.attestation_status_runner.run(ctx));
             let _ = self.terminate.recv(ctx).await;
             Ok(())
         })
@@ -182,16 +187,18 @@ impl Instance {
         block_store: Arc<BlockStore>,
         batch_store: Arc<BatchStore>,
     ) -> (Self, InstanceRunner) {
-        // For now let everyone cast their votes on batches based on their own database.
-        let attestation_status_client = Box::new(LocalAttestationStatus::new(batch_store.clone()));
-
+        // Semantically we'd want this to be created at the same level as the stores,
+        // but doing so would introduce a lot of extra cruft in setting
+        // up tests that don't make any use of attestations.
+        let (attestation_status, attestation_status_runner) =
+            new_local_attestation_status(batch_store.clone());
         let (actor_pipe, dispatcher_pipe) = pipe::new();
-        let (net, runner) = Network::new(
+        let (net, net_runner) = Network::new(
             cfg,
             block_store,
             batch_store,
             actor_pipe,
-            attestation_status_client,
+            attestation_status,
         );
         let (terminate_send, terminate_recv) = channel::bounded(1);
         (
             Self {
@@ -205,7 +212,8 @@
                 terminate: terminate_send,
             },
             InstanceRunner {
-                runner,
+                net_runner,
+                attestation_status_runner,
                 terminate: terminate_recv,
             },
         )
@@ -341,3 +349,13 @@
     tracing::info!("consensus network established");
     Ok(())
 }
+
+/// Create an attestation status and its runner based on a [BatchStore].
+pub fn new_local_attestation_status(
+    store: Arc<BatchStore>,
+) -> (Arc<AttestationStatusWatch>, AttestationStatusRunner) {
+    AttestationStatusWatch::new(
+        Box::new(LocalAttestationStatusClient::new(store)),
+        time::Duration::seconds(5),
+    )
+}

diff --git a/node/tools/src/config.rs b/node/tools/src/config.rs
index fd56a421..d50ca761 100644
--- a/node/tools/src/config.rs
+++ b/node/tools/src/config.rs
@@ -9,11 +9,14 @@
 use std::{
     path::PathBuf,
 };
 use tokio_rustls::rustls::pki_types::{CertificateDer, PrivateKeyDer};
-use zksync_concurrency::{ctx, net};
+use zksync_concurrency::{ctx, net, scope, time};
 use zksync_consensus_bft as bft;
 use zksync_consensus_crypto::{read_optional_text, read_required_text, Text, TextFmt};
 use zksync_consensus_executor as executor;
-use zksync_consensus_network::{gossip::LocalAttestationStatus, http};
+use zksync_consensus_network::{
+    gossip::{AttestationStatusRunner, AttestationStatusWatch, LocalAttestationStatusClient},
+    http,
+};
 use zksync_consensus_roles::{attester, node, validator};
 use zksync_consensus_storage::testonly::{TestMemoryStorage, TestMemoryStorageRunner};
 use zksync_consensus_utils::debug_page;
@@ -259,13 +262,20 @@ impl Configs {
     pub async fn make_executor(
         &self,
         ctx: &ctx::Ctx,
-    ) -> ctx::Result<(executor::Executor, TestMemoryStorageRunner)> {
+    ) -> ctx::Result<(executor::Executor, TestExecutorRunner)> {
         let replica_store = store::RocksDB::open(self.app.genesis.clone(), &self.database).await?;
         let store = TestMemoryStorage::new(ctx, &self.app.genesis).await;

-        // We don't have an implementation of an API in these servers, we can only use the stores.
-        let attestation_status_client =
-            Box::new(LocalAttestationStatus::new(store.batches.clone()));
+        // We don't have an API to poll in this setup, we can only create a local store based attestation client.
+        let (attestation_status, attestation_status_runner) = AttestationStatusWatch::new(
+            Box::new(LocalAttestationStatusClient::new(store.batches.clone())),
+            time::Duration::seconds(1),
+        );
+
+        let runner = TestExecutorRunner {
+            storage_runner: store.runner,
+            attestation_status_runner,
+        };

         let e = executor::Executor {
             config: executor::Config {
                 server_addr: self.app.server_addr,
@@ -307,9 +317,9 @@
                 .attester_key
                 .as_ref()
                 .map(|key| executor::Attester { key: key.clone() }),
-            attestation_status_client,
+            attestation_status,
         };
-        Ok((e, store.runner))
+        Ok((e, runner))
     }
 }
@@ -334,3 +344,20 @@
 fn load_private_key(path: &PathBuf) -> anyhow::Result<PrivateKeyDer<'static>> {
     // Load and return a single private key.
     Ok(rustls_pemfile::private_key(&mut reader).map(|key| key.expect("Private key not found"))?)
 }
+
+/// Background tasks needed by the test executor.
+pub struct TestExecutorRunner {
+    storage_runner: TestMemoryStorageRunner,
+    attestation_status_runner: AttestationStatusRunner,
+}
+
+impl TestExecutorRunner {
+    /// Runs the storage and the attestation status.
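The runner/watch split in this patch follows a common shape: a background task polls a fallible source on a fixed cadence and publishes into a watch, tolerating both "not ready yet" (`Ok(None)`) and transient errors. Schematically, with hypothetical simplified signatures (the real runner takes async, cancellation-aware calls):

    use std::time::Duration;

    async fn run_poller(
        mut fetch: impl FnMut() -> Result<Option<u64>, String>,
        mut publish: impl FnMut(u64),
        poll_interval: Duration,
    ) {
        loop {
            match fetch() {
                // A fresh value: push it into the shared watch.
                Ok(Some(batch_number)) => publish(batch_number),
                // The source exists but has nothing to report yet.
                Ok(None) => eprintln!("waiting for attestation status..."),
                // Transient failure: log and retry on the next tick.
                Err(error) => eprintln!("failed to poll attestation status: {error}, retrying later..."),
            }
            tokio::time::sleep(poll_interval).await;
        }
    }
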
+    pub async fn run(self, ctx: &ctx::Ctx) -> anyhow::Result<()> {
+        scope::run!(ctx, |ctx, s| async {
+            s.spawn(self.storage_runner.run(ctx));
+            s.spawn(self.attestation_status_runner.run(ctx));
+            Ok(())
+        })
+        .await
+    }
+}

From b3d75bf7afa0f41a65fd8be5eece9d6ba99bc5a7 Mon Sep 17 00:00:00 2001
From: Akosh Farkash
Date: Tue, 30 Jul 2024 14:06:07 +0100
Subject: [PATCH 6/9] BFT-496: Remove the subscription method

---
 node/actors/executor/src/lib.rs | 2 +-
 node/actors/network/src/lib.rs  | 7 +------
 2 files changed, 2 insertions(+), 7 deletions(-)

diff --git a/node/actors/executor/src/lib.rs b/node/actors/executor/src/lib.rs
index 844198ad..64106f6a 100644
--- a/node/actors/executor/src/lib.rs
+++ b/node/actors/executor/src/lib.rs
@@ -152,7 +152,7 @@ impl Executor {
                 self.batch_store.clone(),
                 attester,
                 net.batch_vote_publisher(),
-                net.attestation_status_receiver(),
+                self.attestation_status.subscribe(),
             );
             s.spawn(async {
                 runner.run(ctx).await?;
diff --git a/node/actors/network/src/lib.rs b/node/actors/network/src/lib.rs
index d4104093..1d5f3cbd 100644
--- a/node/actors/network/src/lib.rs
+++ b/node/actors/network/src/lib.rs
@@ -1,6 +1,6 @@
 //! Network actor maintaining a pool of outbound and inbound connections to other nodes.
 use anyhow::Context as _;
-use gossip::{AttestationStatusReceiver, AttestationStatusWatch, BatchVotesPublisher};
+use gossip::{AttestationStatusWatch, BatchVotesPublisher};
 use std::sync::Arc;
 use tracing::Instrument as _;
 use zksync_concurrency::{
@@ -82,11 +82,6 @@ impl Network {
         BatchVotesPublisher(self.gossip.batch_votes.clone())
     }

-    /// Subscribe to attestation status change notifications.
-    pub fn attestation_status_receiver(&self) -> AttestationStatusReceiver {
-        self.gossip.attestation_status.subscribe()
-    }
-
     /// Handles a dispatcher message.
     async fn handle_message(
         &self,
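
With the subscription method gone, the only way to observe attestation status is to subscribe on the shared AttestationStatusWatch itself. The sketch below is illustrative only — the function and its name are not code from this series — but it uses only types and calls that appear in the patches (`AttestationStatusWatch::subscribe`, `sync::changed`, the `next_batch_to_attest` field):

    // Sketch: consuming the shared watch directly, now that `Network`
    // no longer exposes a subscription method of its own.
    use std::sync::Arc;
    use zksync_concurrency::{ctx, sync};
    use zksync_consensus_network::gossip::AttestationStatusWatch;

    async fn wait_for_next_batch(
        ctx: &ctx::Ctx,
        status: Arc<AttestationStatusWatch>,
    ) -> ctx::Result<()> {
        // Subscribe on the shared watch itself instead of going through `Network`.
        let mut recv = status.subscribe();
        // Block until the status changes, then read the field the attester cares about.
        let next = sync::changed(ctx, &mut recv).await?.next_batch_to_attest;
        tracing::info!(?next, "attestation status changed");
        Ok(())
    }
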
From 47042e4cf3516b259c22d8baafda47a8013ccc1f Mon Sep 17 00:00:00 2001
From: Akosh Farkash
Date: Tue, 30 Jul 2024 14:15:13 +0100
Subject: [PATCH 7/9] BFT-496: Use just sync::changed

---
 node/actors/executor/src/attestation.rs | 17 ++++++++---------
 node/actors/network/src/gossip/mod.rs   | 19 ++++++++-----------
 2 files changed, 16 insertions(+), 20 deletions(-)

diff --git a/node/actors/executor/src/attestation.rs b/node/actors/executor/src/attestation.rs
index 80f3de3b..7f6f6528 100644
--- a/node/actors/executor/src/attestation.rs
+++ b/node/actors/executor/src/attestation.rs
@@ -51,15 +51,16 @@ impl AttesterRunner {

         let genesis = self.block_store.genesis().hash();

-        let mut prev = None;
+        // The subscription starts out marked as seen, but we don't want to miss the first item.
+        self.status.mark_changed();

         loop {
-            let batch_number =
-                sync::wait_for_some(ctx, &mut self.status, |s| match s.next_batch_to_attest {
-                    next if next == prev => None,
-                    next => next,
-                })
-                .await?;
+            let Some(batch_number) = sync::changed(ctx, &mut self.status)
+                .await?
+                .next_batch_to_attest
+            else {
+                continue;
+            };

             tracing::info!(%batch_number, "attestation status");
@@ -85,8 +86,6 @@ impl AttesterRunner {
                 .publish(attesters, &genesis, &self.attester.key, batch)
                 .await
                 .context("publish")?;
-
-            prev = Some(batch_number);
         }
     }
diff --git a/node/actors/network/src/gossip/mod.rs b/node/actors/network/src/gossip/mod.rs
index e011077f..c5dcb231 100644
--- a/node/actors/network/src/gossip/mod.rs
+++ b/node/actors/network/src/gossip/mod.rs
@@ -169,20 +169,17 @@ impl Network {
         let mut recv_votes = self.batch_votes.subscribe();
         let mut recv_status = self.attestation_status.subscribe();

-        let mut prev_batch_number = None;
+        // The subscription starts out marked as seen, but we don't want to miss the first item.
+        recv_status.mark_changed();

         loop {
             // Wait until the status indicates that we're ready to sign the next batch.
-            // This is not strictly necessary but avoids repeatedly finding the same quorum, or having to skip it until it changes.
-            let next_batch_number =
-                sync::wait_for_some(ctx, &mut recv_status, |s| match s.next_batch_to_attest {
-                    next if next == prev_batch_number => None,
-                    next => next,
-                })
-                .await?;
-
-            // Next time we'll look for something new.
-            prev_batch_number = Some(next_batch_number);
+            let Some(next_batch_number) = sync::changed(ctx, &mut recv_status)
+                .await?
+                .next_batch_to_attest
+            else {
+                continue;
+            };

             // Get rid of all previous votes. We don't expect this to go backwards without regenesis, which will involve a restart.
             self.batch_votes
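
The `mark_changed` call in this patch deserves a note: a fresh receiver starts out with its current value marked as seen, so a plain `sync::changed` loop would sleep until the *second* update and silently skip whatever was already in the watch at subscription time. A minimal sketch of the intended loop shape, assuming an `AttestationStatusReceiver` obtained as in the patch (the function itself and the log line are illustrative, not code from the series):

    use zksync_concurrency::{ctx, sync};
    use zksync_consensus_network::gossip::AttestationStatusReceiver;

    async fn status_loop(
        ctx: &ctx::Ctx,
        mut status: AttestationStatusReceiver,
    ) -> ctx::Result<()> {
        // Undo the initial "seen" marker so the value present at subscription
        // time is observed by the first `sync::changed` call.
        status.mark_changed();
        loop {
            // Wakes up once per change; `None` means the main node isn't ready yet.
            let Some(batch_number) = sync::changed(ctx, &mut status)
                .await?
                .next_batch_to_attest
            else {
                continue;
            };
            tracing::info!(%batch_number, "next batch to attest");
        }
    }
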
From 05ef82090e02de6419baf09e2db808a951e50b7c Mon Sep 17 00:00:00 2001
From: Akosh Farkash
Date: Tue, 30 Jul 2024 15:47:55 +0100
Subject: [PATCH 8/9] BFT-496: Move the runner under the executor

---
 node/actors/executor/src/attestation.rs       | 96 ++++++++++++++++++-
 node/actors/executor/src/lib.rs               |  3 +-
 .../network/src/gossip/attestation_status.rs  | 90 +----------------
 node/actors/network/src/gossip/mod.rs         |  5 +-
 node/actors/network/src/testonly.rs           | 40 ++++----
 node/tools/src/config.rs                      | 14 +--
 6 files changed, 123 insertions(+), 125 deletions(-)

diff --git a/node/actors/executor/src/attestation.rs b/node/actors/executor/src/attestation.rs
index 7f6f6528..621fa1c3 100644
--- a/node/actors/executor/src/attestation.rs
+++ b/node/actors/executor/src/attestation.rs
@@ -4,12 +4,12 @@ use crate::Attester;
 use anyhow::Context;
 use std::sync::Arc;
 use zksync_concurrency::{ctx, sync, time};
-use zksync_consensus_network::gossip::{AttestationStatusReceiver, BatchVotesPublisher};
+use zksync_consensus_network::gossip::{
+    AttestationStatusReceiver, AttestationStatusWatch, BatchVotesPublisher,
+};
 use zksync_consensus_roles::attester;
 use zksync_consensus_storage::{BatchStore, BlockStore};

-const POLL_INTERVAL: time::Duration = time::Duration::seconds(1);
-
 /// Polls the database for new batches to be signed and publishes them to the gossip channel.
 pub(super) struct AttesterRunner {
     block_store: Arc<BlockStore>,
@@ -17,6 +17,7 @@ pub(super) struct AttesterRunner {
     attester: Attester,
     publisher: BatchVotesPublisher,
     status: AttestationStatusReceiver,
+    poll_interval: time::Duration,
 }

 impl AttesterRunner {
@@ -27,6 +28,7 @@ impl AttesterRunner {
         attester: Attester,
         publisher: BatchVotesPublisher,
         status: AttestationStatusReceiver,
+        poll_interval: time::Duration,
     ) -> Self {
         Self {
             block_store,
@@ -34,6 +36,7 @@ impl AttesterRunner {
             attester,
             publisher,
             status,
+            poll_interval,
         }
     }
     /// Poll the database for new L1 batches and publish our signature over the batch.
@@ -104,8 +107,93 @@ impl AttesterRunner {
             {
                 return Ok(batch);
             } else {
-                ctx.sleep(POLL_INTERVAL).await?;
+                ctx.sleep(self.poll_interval).await?;
+            }
+        }
+    }
+}
+
+/// An interface which is used by attesters and nodes collecting votes over gossip to determine
+/// which is the next batch they are all supposed to be voting on, according to the main node.
+///
+/// This is a convenience interface to be used with the [AttestationStatusRunner].
+#[async_trait::async_trait]
+pub trait AttestationStatusClient: 'static + Send + Sync {
+    /// Get the next batch number for which the main node expects a batch QC to be formed.
+    ///
+    /// The API might return an error while genesis is being created; we represent that with `None`
+    /// here, which means we'll have to try again later.
+    async fn next_batch_to_attest(
+        &self,
+        ctx: &ctx::Ctx,
+    ) -> ctx::Result<Option<attester::BatchNumber>>;
+}
+
+/// Use an [AttestationStatusClient] to periodically poll the main node and update the [AttestationStatusWatch].
+///
+/// This is provided for convenience.
+pub struct AttestationStatusRunner {
+    status: Arc<AttestationStatusWatch>,
+    client: Box<dyn AttestationStatusClient>,
+    poll_interval: time::Duration,
+}
+
+impl AttestationStatusRunner {
+    /// Create a new runner to poll the main node.
+    pub fn new(
+        status: Arc<AttestationStatusWatch>,
+        client: Box<dyn AttestationStatusClient>,
+        poll_interval: time::Duration,
+    ) -> Self {
+        Self {
+            status,
+            client,
+            poll_interval,
+        }
+    }
+
+    /// Create a runner based on a [BatchStore].
+    pub fn new_from_store(
+        status: Arc<AttestationStatusWatch>,
+        store: Arc<BatchStore>,
+        poll_interval: time::Duration,
+    ) -> Self {
+        Self::new(
+            status,
+            Box::new(LocalAttestationStatusClient(store)),
+            poll_interval,
+        )
+    }
+
+    /// Run the poll loop.
+    pub async fn run(self, ctx: &ctx::Ctx) -> anyhow::Result<()> {
+        loop {
+            match self.client.next_batch_to_attest(ctx).await {
+                Ok(Some(batch_number)) => {
+                    self.status.update(batch_number).await;
+                }
+                Ok(None) => tracing::debug!("waiting for attestation status..."),
+                Err(error) => tracing::error!(
+                    ?error,
+                    "failed to poll attestation status, retrying later..."
+                ),
+            }
+            if let Err(ctx::Canceled) = ctx.sleep(self.poll_interval).await {
+                return Ok(());
+            }
+        }
+    }
+}
+
+/// Implement the attestation status for the main node by returning the next batch to vote on from the [BatchStore].
+struct LocalAttestationStatusClient(Arc<BatchStore>);
+
+#[async_trait::async_trait]
+impl AttestationStatusClient for LocalAttestationStatusClient {
+    async fn next_batch_to_attest(
+        &self,
+        ctx: &ctx::Ctx,
+    ) -> ctx::Result<Option<attester::BatchNumber>> {
+        self.0.next_batch_to_attest(ctx).await
+    }
+}
diff --git a/node/actors/executor/src/lib.rs b/node/actors/executor/src/lib.rs
index 64106f6a..76c19a4c 100644
--- a/node/actors/executor/src/lib.rs
+++ b/node/actors/executor/src/lib.rs
@@ -15,7 +15,7 @@ use zksync_consensus_storage::{BatchStore, BlockStore, ReplicaStore};
 use zksync_consensus_utils::pipe;
 use zksync_protobuf::kB;

-mod attestation;
+pub mod attestation;
 mod io;
 #[cfg(test)]
 mod tests;
@@ -153,6 +153,7 @@ impl Executor {
                 attester,
                 net.batch_vote_publisher(),
                 self.attestation_status.subscribe(),
+                time::Duration::seconds(1), // TODO: Move to config?
             );
             s.spawn(async {
                 runner.run(ctx).await?;
diff --git a/node/actors/network/src/gossip/attestation_status.rs b/node/actors/network/src/gossip/attestation_status.rs
index 7a11cbe6..5509bf81 100644
--- a/node/actors/network/src/gossip/attestation_status.rs
+++ b/node/actors/network/src/gossip/attestation_status.rs
@@ -1,26 +1,10 @@
 use std::fmt;
-use std::sync::Arc;

-use zksync_concurrency::{ctx, sync, time};
+use zksync_concurrency::sync;
 use zksync_consensus_roles::attester;
-use zksync_consensus_storage::BatchStore;

 use crate::watch::Watch;

-/// An interface which is used by attesters and nodes collecting votes over gossip to determine
-/// which is the next batch they are all supposed to be voting on, according to the main node.
-#[async_trait::async_trait]
-pub trait AttestationStatusClient: 'static + fmt::Debug + Send + Sync {
-    /// Get the next batch number for which the main node expects a batch QC to be formed.
-    ///
-    /// The API might return an error while genesis is being created, which we represent with `None`
-    /// here and mean that we'll have to try again later.
-    async fn next_batch_to_attest(
-        &self,
-        ctx: &ctx::Ctx,
-    ) -> ctx::Result<Option<attester::BatchNumber>>;
-}
-
 /// Coordinate the attestation by showing the status as seen by the main node.
 #[derive(Debug, Clone)]
 pub struct AttestationStatus {
@@ -54,16 +38,6 @@ impl Default for AttestationStatusWatch {
 }

 impl AttestationStatusWatch {
-    /// Create a new [AttestationStatusWatch] paired up with an [AttestationStatusRunner] to keep it up to date.
-    pub fn new(
-        client: Box<dyn AttestationStatusClient>,
-        poll_interval: time::Duration,
-    ) -> (Arc<Self>, AttestationStatusRunner) {
-        let status = Arc::new(AttestationStatusWatch::default());
-        let runner = AttestationStatusRunner::new(status.clone(), client, poll_interval);
-        (status, runner)
-    }
-
     /// Subscribes to AttestationStatus updates.
     pub fn subscribe(&self) -> AttestationStatusReceiver {
         self.0.subscribe()
@@ -81,65 +55,3 @@ impl AttestationStatusWatch {
         });
     }
 }
-
-/// Use an [AttestationStatusClient] to periodically poll the main node and update the [AttestationStatusWatch].
-pub struct AttestationStatusRunner {
-    status: Arc<AttestationStatusWatch>,
-    client: Box<dyn AttestationStatusClient>,
-    poll_interval: time::Duration,
-}
-
-impl AttestationStatusRunner {
-    /// Create a new runner to poll the main node.
-    fn new(
-        status: Arc<AttestationStatusWatch>,
-        client: Box<dyn AttestationStatusClient>,
-        poll_interval: time::Duration,
-    ) -> Self {
-        Self {
-            status,
-            client,
-            poll_interval,
-        }
-    }
-
-    /// Run the poll loop.
-    pub async fn run(self, ctx: &ctx::Ctx) -> anyhow::Result<()> {
-        loop {
-            match self.client.next_batch_to_attest(ctx).await {
-                Ok(Some(batch_number)) => {
-                    self.status.update(batch_number).await;
-                }
-                Ok(None) => tracing::debug!("waiting for attestation status..."),
-                Err(error) => tracing::error!(
-                    ?error,
-                    "failed to poll attestation status, retrying later..."
-                ),
-            }
-            if let Err(ctx::Canceled) = ctx.sleep(self.poll_interval).await {
-                return Ok(());
-            }
-        }
-    }
-}
-
-/// Implement the attestation status for the main node by returning the next to vote on from the [BatchStore].
-#[derive(Debug, Clone)]
-pub struct LocalAttestationStatusClient(Arc<BatchStore>);
-
-impl LocalAttestationStatusClient {
-    /// Create local attestation client form a [BatchStore].
-    pub fn new(store: Arc<BatchStore>) -> Self {
-        Self(store)
-    }
-}
-
-#[async_trait::async_trait]
-impl AttestationStatusClient for LocalAttestationStatusClient {
-    async fn next_batch_to_attest(
-        &self,
-        ctx: &ctx::Ctx,
-    ) -> ctx::Result<Option<attester::BatchNumber>> {
-        self.0.next_batch_to_attest(ctx).await
-    }
-}
diff --git a/node/actors/network/src/gossip/mod.rs b/node/actors/network/src/gossip/mod.rs
index c5dcb231..5da55cce 100644
--- a/node/actors/network/src/gossip/mod.rs
+++ b/node/actors/network/src/gossip/mod.rs
@@ -12,10 +12,7 @@
 //! Static connections constitute a rigid "backbone" of the gossip network, which is insensitive to
 //! eclipse attack. Dynamic connections are supposed to improve the properties of the gossip
 //! network graph (minimize its diameter, increase connectedness).
-pub use self::attestation_status::{
-    AttestationStatusClient, AttestationStatusReceiver, AttestationStatusRunner,
-    AttestationStatusWatch, LocalAttestationStatusClient,
-};
+pub use self::attestation_status::{AttestationStatusReceiver, AttestationStatusWatch};
 pub use self::batch_votes::BatchVotesPublisher;
 use self::batch_votes::BatchVotesWatch;
 use crate::{gossip::ValidatorAddrsWatch, io, pool::PoolWatch, Config, MeteredStreamStats};
diff --git a/node/actors/network/src/testonly.rs b/node/actors/network/src/testonly.rs
index 13608c57..d8d8c8ed 100644
--- a/node/actors/network/src/testonly.rs
+++ b/node/actors/network/src/testonly.rs
@@ -1,7 +1,7 @@
 //! Testonly utilities.
 #![allow(dead_code)]
 use crate::{
-    gossip::{AttestationStatusRunner, AttestationStatusWatch, LocalAttestationStatusClient},
+    gossip::AttestationStatusWatch,
     io::{ConsensusInputMessage, Target},
     Config, GossipConfig, Network, RpcConfig, Runner,
 };
@@ -161,7 +161,8 @@ pub fn new_fullnode(rng: &mut impl Rng, peer: &Config) -> Config {

 /// Runner for Instance.
 pub struct InstanceRunner {
     net_runner: Runner,
-    attestation_status_runner: AttestationStatusRunner,
+    attestation_status: Arc<AttestationStatusWatch>,
+    batch_store: Arc<BatchStore>,
     terminate: channel::Receiver<()>,
 }

@@ -170,7 +171,16 @@ impl InstanceRunner {
     pub async fn run(mut self, ctx: &ctx::Ctx) -> anyhow::Result<()> {
         scope::run!(ctx, |ctx, s| async {
             s.spawn_bg(self.net_runner.run(ctx));
-            s.spawn_bg(self.attestation_status_runner.run(ctx));
+            s.spawn_bg(async {
+                loop {
+                    if let Ok(Some(n)) = self.batch_store.next_batch_to_attest(ctx).await {
+                        self.attestation_status.update(n).await;
+                    }
+                    if ctx.sleep(time::Duration::seconds(1)).await.is_err() {
+                        return Ok(());
+                    }
+                }
+            });
             let _ = self.terminate.recv(ctx).await;
             Ok(())
         })
@@ -188,17 +198,16 @@ impl Instance {
         batch_store: Arc<BatchStore>,
     ) -> (Self, InstanceRunner) {
         // Semantically we'd want this to be created at the same level as the stores,
-        // but doing so would introduce a lot of extra cruft in setting
-        // up tests that don't make any use of attestations.
+        // but doing so would introduce a lot of extra cruft in setting up tests.
+        let attestation_status = Arc::new(AttestationStatusWatch::default());
+
         let (actor_pipe, dispatcher_pipe) = pipe::new();
         let (net, net_runner) = Network::new(
             cfg,
             block_store,
-            batch_store,
+            batch_store.clone(),
             actor_pipe,
-            attestation_status,
+            attestation_status.clone(),
         );
         let (terminate_send, terminate_recv) = channel::bounded(1);
         (
@@ -209,7 +218,8 @@ impl Instance {
                 terminate: terminate_send,
             },
             InstanceRunner {
                 net_runner,
-                attestation_status_runner,
+                attestation_status,
+                batch_store,
                 terminate: terminate_recv,
             },
         )
@@ -349,13 +359,3 @@ pub async fn instant_network(
     tracing::info!("consensus network established");
     Ok(())
 }
-
-/// Create an attestation status and its runner based on a [BatchStore].
-pub fn new_local_attestation_status(
-    store: Arc<BatchStore>,
-) -> (Arc<AttestationStatusWatch>, AttestationStatusRunner) {
-    AttestationStatusWatch::new(
-        Box::new(LocalAttestationStatusClient::new(store)),
-        time::Duration::seconds(5),
-    )
-}
diff --git a/node/tools/src/config.rs b/node/tools/src/config.rs
index d50ca761..24897611 100644
--- a/node/tools/src/config.rs
+++ b/node/tools/src/config.rs
@@ -7,16 +7,14 @@ use std::{
     fs, io,
     net::SocketAddr,
     path::PathBuf,
+    sync::Arc,
 };
 use tokio_rustls::rustls::pki_types::{CertificateDer, PrivateKeyDer};
 use zksync_concurrency::{ctx, net, scope, time};
 use zksync_consensus_bft as bft;
 use zksync_consensus_crypto::{read_optional_text, read_required_text, Text, TextFmt};
-use zksync_consensus_executor as executor;
-use zksync_consensus_network::{
-    gossip::{AttestationStatusRunner, AttestationStatusWatch, LocalAttestationStatusClient},
-    http,
-};
+use zksync_consensus_executor::{self as executor, attestation::AttestationStatusRunner};
+use zksync_consensus_network::{gossip::AttestationStatusWatch, http};
 use zksync_consensus_roles::{attester, node, validator};
 use zksync_consensus_storage::testonly::{TestMemoryStorage, TestMemoryStorageRunner};
 use zksync_consensus_utils::debug_page;
@@ -267,8 +265,10 @@ impl Configs {
         let store = TestMemoryStorage::new(ctx, &self.app.genesis).await;

         // We don't have an API to poll in this setup; we can only create a local store-based attestation client.
-        let (attestation_status, attestation_status_runner) = AttestationStatusWatch::new(
-            Box::new(LocalAttestationStatusClient::new(store.batches.clone())),
+        let attestation_status = Arc::new(AttestationStatusWatch::default());
+        let attestation_status_runner = AttestationStatusRunner::new_from_store(
+            attestation_status.clone(),
+            store.batches.clone(),
             time::Duration::seconds(1),
         );
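
With the runner now living under the executor, a node embeds the pair as two pieces: the shared watch and a background poll loop. A hedged sketch of standalone wiring, using only constructors shown above (`run_attestation_status` is an invented name, and the one-second interval is just the value the patches themselves use):

    use std::sync::Arc;
    use zksync_concurrency::{ctx, scope, time};
    use zksync_consensus_executor::attestation::AttestationStatusRunner;
    use zksync_consensus_network::gossip::AttestationStatusWatch;
    use zksync_consensus_storage::BatchStore;

    async fn run_attestation_status(
        ctx: &ctx::Ctx,
        batch_store: Arc<BatchStore>,
    ) -> anyhow::Result<()> {
        // The watch is the shared handle everyone reads; the runner is the only writer.
        let status = Arc::new(AttestationStatusWatch::default());
        let runner = AttestationStatusRunner::new_from_store(
            status.clone(),
            batch_store,
            time::Duration::seconds(1),
        );
        scope::run!(ctx, |ctx, s| async {
            s.spawn_bg(runner.run(ctx));
            // ... spawn the components that consume `status` here ...
            Ok(())
        })
        .await
    }
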
From 45f8d9b873b3254b485d17ed97ed8bec9f884363 Mon Sep 17 00:00:00 2001
From: Akosh Farkash
Date: Tue, 30 Jul 2024 15:55:34 +0100
Subject: [PATCH 9/9] BFT-496: Move the DB poll interval to Executor Config

---
 node/actors/executor/src/lib.rs   | 5 ++++-
 node/actors/executor/src/tests.rs | 1 +
 node/tools/src/config.rs          | 1 +
 3 files changed, 6 insertions(+), 1 deletion(-)

diff --git a/node/actors/executor/src/lib.rs b/node/actors/executor/src/lib.rs
index 76c19a4c..adaf4016 100644
--- a/node/actors/executor/src/lib.rs
+++ b/node/actors/executor/src/lib.rs
@@ -70,6 +70,9 @@ pub struct Config {
     /// Http debug page configuration.
     /// If None, debug page is disabled
     pub debug_page: Option<http::DebugPageConfig>,
+
+    /// How often to poll the database looking for the batch commitment.
+    pub batch_poll_interval: time::Duration,
 }

 impl Config {
@@ -153,7 +156,7 @@ impl Executor {
                 attester,
                 net.batch_vote_publisher(),
                 self.attestation_status.subscribe(),
-                time::Duration::seconds(1), // TODO: Move to config?
+                self.config.batch_poll_interval,
diff --git a/node/actors/executor/src/tests.rs b/node/actors/executor/src/tests.rs
index 491c19b3..6ce5c057 100644
--- a/node/actors/executor/src/tests.rs
+++ b/node/actors/executor/src/tests.rs
@@ -23,6 +23,7 @@ fn config(cfg: &network::Config) -> Config {
         gossip_static_outbound: cfg.gossip.static_outbound.clone(),
         rpc: cfg.rpc.clone(),
         debug_page: None,
+        batch_poll_interval: time::Duration::seconds(1),
     }
 }

diff --git a/node/tools/src/config.rs b/node/tools/src/config.rs
index 24897611..f659bdb3 100644
--- a/node/tools/src/config.rs
+++ b/node/tools/src/config.rs
@@ -298,6 +298,7 @@ impl Configs {
                         .expect("Could not obtain private key for debug page"),
                 }
             }),
+            batch_poll_interval: time::Duration::seconds(1),
         },
         block_store: store.blocks,
         batch_store: store.batches,
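
The series ends with the poll interval in config; one thing it leaves implicit is that `AttestationStatusClient` being a public trait means an embedding node can supply its own source of truth instead of the local `BatchStore`. A hypothetical sketch — `MainNodeApiClient` and its field are invented; only the trait and its signature come from the patches:

    use zksync_concurrency::ctx;
    use zksync_consensus_executor::attestation::AttestationStatusClient;
    use zksync_consensus_roles::attester;

    /// Hypothetical client that would ask an external main node API.
    struct MainNodeApiClient {
        // e.g. an HTTP client and the main node's URL; omitted in this sketch.
    }

    #[async_trait::async_trait]
    impl AttestationStatusClient for MainNodeApiClient {
        async fn next_batch_to_attest(
            &self,
            _ctx: &ctx::Ctx,
        ) -> ctx::Result<Option<attester::BatchNumber>> {
            // A real implementation would call the API here and map
            // "genesis not ready yet" to `Ok(None)`, as the trait docs describe.
            Ok(None)
        }
    }
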