diff --git a/node/actors/bft/src/testonly/run.rs b/node/actors/bft/src/testonly/run.rs
index 9f907f14..bc11e121 100644
--- a/node/actors/bft/src/testonly/run.rs
+++ b/node/actors/bft/src/testonly/run.rs
@@ -14,7 +14,7 @@ use zksync_concurrency::{
     },
     oneshot, scope,
 };
-use zksync_consensus_network as network;
+use zksync_consensus_network::{self as network};
 use zksync_consensus_roles::validator;
 use zksync_consensus_storage::{testonly::TestMemoryStorage, BlockStore};
 use zksync_consensus_utils::pipe;
@@ -135,9 +135,11 @@ impl Test {
         for (i, net) in nets.into_iter().enumerate() {
             let store = TestMemoryStorage::new(ctx, genesis).await;
             s.spawn_bg(async { Ok(store.runner.run(ctx).await?) });
+
             if self.nodes[i].0 == Behavior::Honest {
                 honest.push(store.blocks.clone());
             }
+
             nodes.push(Node {
                 net,
                 behavior: self.nodes[i].0,
diff --git a/node/actors/executor/src/lib.rs b/node/actors/executor/src/lib.rs
index c094e429..844198ad 100644
--- a/node/actors/executor/src/lib.rs
+++ b/node/actors/executor/src/lib.rs
@@ -9,7 +9,7 @@ use std::{
 };
 use zksync_concurrency::{ctx, limiter, net, scope, time};
 use zksync_consensus_bft as bft;
-use zksync_consensus_network::{self as network, gossip::AttestationStatusClient};
+use zksync_consensus_network::{self as network, gossip::AttestationStatusWatch};
 use zksync_consensus_roles::{attester, node, validator};
 use zksync_consensus_storage::{BatchStore, BlockStore, ReplicaStore};
 use zksync_consensus_utils::pipe;
@@ -97,8 +97,8 @@ pub struct Executor {
     pub validator: Option<Validator>,
     /// Validator-specific node data.
     pub attester: Option<Attester>,
-    /// Client to use to poll attestation status: either through the main node API or the DB.
-    pub attestation_status_client: Box<dyn AttestationStatusClient>,
+    /// Status showing where the main node wants attesters to cast their votes.
+    pub attestation_status: Arc<AttestationStatusWatch>,
 }
 
 impl Executor {
@@ -140,7 +140,7 @@ impl Executor {
             self.block_store.clone(),
             self.batch_store.clone(),
             network_actor_pipe,
-            self.attestation_status_client,
+            self.attestation_status.clone(),
         );
         net.register_metrics();
         s.spawn(async { runner.run(ctx).await.context("Network stopped") });
diff --git a/node/actors/executor/src/tests.rs b/node/actors/executor/src/tests.rs
index 22ef5a84..491c19b3 100644
--- a/node/actors/executor/src/tests.rs
+++ b/node/actors/executor/src/tests.rs
@@ -4,10 +4,7 @@ use rand::Rng as _;
 use tracing::Instrument as _;
 use zksync_concurrency::testonly::abort_on_panic;
 use zksync_consensus_bft as bft;
-use zksync_consensus_network::{
-    gossip::LocalAttestationStatus,
-    testonly::{new_configs, new_fullnode},
-};
+use zksync_consensus_network::testonly::{new_configs, new_fullnode};
 use zksync_consensus_roles::validator::{testonly::Setup, BlockNumber};
 use zksync_consensus_storage::{
     testonly::{in_memory, TestMemoryStorage},
@@ -29,12 +26,10 @@ fn config(cfg: &network::Config) -> Config {
     }
 }
 
-/// The test executors below are not running with attesters, so it doesn't matter if the clients
-/// are returning views based on the store of main node or each to their own. For simplicity this
-/// returns an implementation that queries the local store of each instance. Alternatively we
-/// could implement an instance that never queries anything.
-fn mk_attestation_status_client(batch_store: &Arc<BatchStore>) -> impl AttestationStatusClient {
-    LocalAttestationStatus::new(batch_store.clone())
+/// The test executors below are not running with attesters, so we just create an [AttestationStatusWatch]
+/// that will never be updated.
+fn never_attest() -> Arc<AttestationStatusWatch> {
+    Arc::new(AttestationStatusWatch::default())
 }
 
 fn validator(
@@ -43,7 +38,6 @@ fn validator(
     batch_store: Arc<BatchStore>,
     replica_store: impl ReplicaStore,
 ) -> Executor {
-    let attestation_status_client = Box::new(mk_attestation_status_client(&batch_store));
     Executor {
         config: config(cfg),
         block_store,
@@ -54,7 +48,7 @@ fn validator(
             payload_manager: Box::new(bft::testonly::RandomPayload(1000)),
         }),
         attester: None,
-        attestation_status_client,
+        attestation_status: never_attest(),
     }
 }
@@ -63,14 +57,13 @@ fn fullnode(
     block_store: Arc<BlockStore>,
     batch_store: Arc<BatchStore>,
 ) -> Executor {
-    let attestation_status_client = Box::new(mk_attestation_status_client(&batch_store));
     Executor {
         config: config(cfg),
         block_store,
         batch_store,
         validator: None,
         attester: None,
-        attestation_status_client,
+        attestation_status: never_attest(),
     }
 }
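With this change the executor no longer owns a polling client; it receives an `Arc<AttestationStatusWatch>` that something else keeps up to date. A minimal sketch of the intended wiring, assuming only the APIs introduced in this diff (`AttestationStatusWatch::new`, `LocalAttestationStatusClient`); the helper name and the 5-second interval are illustrative, not part of the change:

```rust
use std::sync::Arc;
use zksync_concurrency::{ctx, scope, time};
use zksync_consensus_network::gossip::{AttestationStatusWatch, LocalAttestationStatusClient};
use zksync_consensus_storage::BatchStore;

/// Illustrative helper: build the watch/runner pair and keep the watch
/// fresh for as long as the surrounding scope lives.
async fn with_attestation_status(
    ctx: &ctx::Ctx,
    batch_store: Arc<BatchStore>,
) -> anyhow::Result<()> {
    let (attestation_status, runner) = AttestationStatusWatch::new(
        Box::new(LocalAttestationStatusClient::new(batch_store)),
        time::Duration::seconds(5),
    );
    scope::run!(ctx, |ctx, s| async {
        // The runner polls the client and pushes new batch numbers into the watch.
        s.spawn_bg(runner.run(ctx));
        // `attestation_status` is what gets stored in `Executor::attestation_status`.
        let _executor_watch = attestation_status;
        Ok(())
    })
    .await
}
```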
diff --git a/node/actors/network/src/consensus/tests.rs b/node/actors/network/src/consensus/tests.rs
index 60049757..052aea13 100644
--- a/node/actors/network/src/consensus/tests.rs
+++ b/node/actors/network/src/consensus/tests.rs
@@ -256,9 +256,11 @@ async fn test_address_change() {
     // should get reconstructed.
     cfgs[0].server_addr = net::tcp::testonly::reserve_listener();
     cfgs[0].public_addr = (*cfgs[0].server_addr).into();
+
     let (node0, runner) =
         testonly::Instance::new(cfgs[0].clone(), store.blocks.clone(), store.batches.clone());
     s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node0")));
+
     nodes[0] = node0;
     for n in &nodes {
         n.wait_for_consensus_connections().await;
diff --git a/node/actors/network/src/gossip/attestation_status.rs b/node/actors/network/src/gossip/attestation_status.rs
index 3b84cd9a..7a11cbe6 100644
--- a/node/actors/network/src/gossip/attestation_status.rs
+++ b/node/actors/network/src/gossip/attestation_status.rs
@@ -1,6 +1,7 @@
+use std::fmt;
 use std::sync::Arc;
 
-use zksync_concurrency::{ctx, sync};
+use zksync_concurrency::{ctx, sync, time};
 use zksync_consensus_roles::attester;
 use zksync_consensus_storage::BatchStore;
 
@@ -9,7 +10,7 @@ use crate::watch::Watch;
 /// An interface which is used by attesters and nodes collecting votes over gossip to determine
 /// which is the next batch they are all supposed to be voting on, according to the main node.
 #[async_trait::async_trait]
-pub trait AttestationStatusClient: 'static + std::fmt::Debug + Send + Sync {
+pub trait AttestationStatusClient: 'static + fmt::Debug + Send + Sync {
     /// Get the next batch number for which the main node expects a batch QC to be formed.
     ///
     /// The API might return an error while genesis is being created, which we represent with `None`
     ///
     async fn next_batch_to_attest(
         &self,
         ctx: &ctx::Ctx,
     ) -> ctx::Result<Option<attester::BatchNumber>>;
 }
@@ -20,28 +21,8 @@
-/// Implement the attestation status for the main node by returning the next to vote on from the [BatchStore].
-#[derive(Debug, Clone)]
-pub struct LocalAttestationStatus(Arc<BatchStore>);
-
-impl LocalAttestationStatus {
-    /// Create local attestation client form a [BatchStore].
-    pub fn new(store: Arc<BatchStore>) -> Self {
-        Self(store)
-    }
-}
-
-#[async_trait::async_trait]
-impl AttestationStatusClient for LocalAttestationStatus {
-    async fn next_batch_to_attest(
-        &self,
-        ctx: &ctx::Ctx,
-    ) -> ctx::Result<Option<attester::BatchNumber>> {
-        self.0.next_batch_to_attest(ctx).await
-    }
-}
-
 /// Coordinate the attestation by showing the status as seen by the main node.
+#[derive(Debug, Clone)]
 pub struct AttestationStatus {
     /// Next batch number where voting is expected.
     ///
@@ -50,12 +31,19 @@ pub struct AttestationStatus {
     pub next_batch_to_attest: Option<attester::BatchNumber>,
 }
 
-/// The subscription over the attestation status which votes can monitor for change.
+/// The subscription over the attestation status which voters can monitor for change.
 pub type AttestationStatusReceiver = sync::watch::Receiver<AttestationStatus>;
 
 /// A [Watch] over an [AttestationStatus] which we can use to notify components about
 /// changes in the batch number the main node expects attesters to vote on.
-pub(crate) struct AttestationStatusWatch(Watch<AttestationStatus>);
+pub struct AttestationStatusWatch(Watch<AttestationStatus>);
+
+impl fmt::Debug for AttestationStatusWatch {
+    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+        fmt.debug_struct("AttestationStatusWatch")
+            .finish_non_exhaustive()
+    }
+}
 
 impl Default for AttestationStatusWatch {
     fn default() -> Self {
@@ -66,13 +54,23 @@
     }
 }
 
 impl AttestationStatusWatch {
+    /// Create a new [AttestationStatusWatch] paired up with an [AttestationStatusRunner] to keep it up to date.
+    pub fn new(
+        client: Box<dyn AttestationStatusClient>,
+        poll_interval: time::Duration,
+    ) -> (Arc<AttestationStatusWatch>, AttestationStatusRunner) {
+        let status = Arc::new(AttestationStatusWatch::default());
+        let runner = AttestationStatusRunner::new(status.clone(), client, poll_interval);
+        (status, runner)
+    }
+
     /// Subscribes to AttestationStatus updates.
-    pub(crate) fn subscribe(&self) -> AttestationStatusReceiver {
+    pub fn subscribe(&self) -> AttestationStatusReceiver {
         self.0.subscribe()
     }
 
     /// Set the next batch number to attest on and notify subscribers it changed.
-    pub(crate) async fn update(&self, next_batch_to_attest: attester::BatchNumber) {
+    pub async fn update(&self, next_batch_to_attest: attester::BatchNumber) {
         let this = self.0.lock().await;
         this.send_if_modified(|status| {
             if status.next_batch_to_attest == Some(next_batch_to_attest) {
@@ -83,3 +81,65 @@
         });
     }
 }
+
+/// Use an [AttestationStatusClient] to periodically poll the main node and update the [AttestationStatusWatch].
+pub struct AttestationStatusRunner {
+    status: Arc<AttestationStatusWatch>,
+    client: Box<dyn AttestationStatusClient>,
+    poll_interval: time::Duration,
+}
+
+impl AttestationStatusRunner {
+    /// Create a new runner to poll the main node.
+    fn new(
+        status: Arc<AttestationStatusWatch>,
+        client: Box<dyn AttestationStatusClient>,
+        poll_interval: time::Duration,
+    ) -> Self {
+        Self {
+            status,
+            client,
+            poll_interval,
+        }
+    }
+
+    /// Run the poll loop.
+    pub async fn run(self, ctx: &ctx::Ctx) -> anyhow::Result<()> {
+        loop {
+            match self.client.next_batch_to_attest(ctx).await {
+                Ok(Some(batch_number)) => {
+                    self.status.update(batch_number).await;
+                }
+                Ok(None) => tracing::debug!("waiting for attestation status..."),
+                Err(error) => tracing::error!(
+                    ?error,
+                    "failed to poll attestation status, retrying later..."
+                ),
+            }
+            if let Err(ctx::Canceled) = ctx.sleep(self.poll_interval).await {
+                return Ok(());
+            }
+        }
+    }
+}
+
+/// Implement the attestation status for the main node by returning the next batch to vote on from the [BatchStore].
+#[derive(Debug, Clone)]
+pub struct LocalAttestationStatusClient(Arc<BatchStore>);
+
+impl LocalAttestationStatusClient {
+    /// Create local attestation client from a [BatchStore].
+    pub fn new(store: Arc<BatchStore>) -> Self {
+        Self(store)
+    }
+}
+
+#[async_trait::async_trait]
+impl AttestationStatusClient for LocalAttestationStatusClient {
+    async fn next_batch_to_attest(
+        &self,
+        ctx: &ctx::Ctx,
+    ) -> ctx::Result<Option<attester::BatchNumber>> {
+        self.0.next_batch_to_attest(ctx).await
+    }
+}
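The `AttestationStatusClient` trait is now the only integration point the runner needs, so an external node can plug in its own source of batch numbers. A sketch of a custom implementation; `MainNodeApiClient` is hypothetical and its transport is stubbed out, only the trait shape comes from this file:

```rust
use zksync_concurrency::ctx;
use zksync_consensus_network::gossip::AttestationStatusClient;
use zksync_consensus_roles::attester;

/// Hypothetical client backed by a main node API; the transport is elided.
#[derive(Debug)]
struct MainNodeApiClient;

#[async_trait::async_trait]
impl AttestationStatusClient for MainNodeApiClient {
    async fn next_batch_to_attest(
        &self,
        _ctx: &ctx::Ctx,
    ) -> ctx::Result<Option<attester::BatchNumber>> {
        // Returning `Ok(None)` makes the runner log "waiting for attestation
        // status..." and retry after `poll_interval`, which is the intended
        // behavior while the main node's genesis is not ready yet.
        Ok(None)
    }
}
```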
diff --git a/node/actors/network/src/gossip/mod.rs b/node/actors/network/src/gossip/mod.rs
index 1b5639be..e011077f 100644
--- a/node/actors/network/src/gossip/mod.rs
+++ b/node/actors/network/src/gossip/mod.rs
@@ -13,16 +13,15 @@
 //! eclipse attack. Dynamic connections are supposed to improve the properties of the gossip
 //! network graph (minimize its diameter, increase connectedness).
 pub use self::attestation_status::{
-    AttestationStatusClient, AttestationStatusReceiver, LocalAttestationStatus,
+    AttestationStatusClient, AttestationStatusReceiver, AttestationStatusRunner,
+    AttestationStatusWatch, LocalAttestationStatusClient,
 };
 pub use self::batch_votes::BatchVotesPublisher;
 use self::batch_votes::BatchVotesWatch;
 use crate::{gossip::ValidatorAddrsWatch, io, pool::PoolWatch, Config, MeteredStreamStats};
-use attestation_status::AttestationStatusWatch;
 use fetch::RequestItem;
 use std::sync::{atomic::AtomicUsize, Arc};
 pub(crate) use validator_addrs::*;
-use zksync_concurrency::time;
 use zksync_concurrency::{ctx, ctx::channel, error::Wrap as _, scope, sync};
 use zksync_consensus_roles::{node, validator};
 use zksync_consensus_storage::{BatchStore, BlockStore};
@@ -66,8 +65,6 @@ pub(crate) struct Network {
     pub(crate) push_validator_addrs_calls: AtomicUsize,
     /// Shared watch over the current attestation status as indicated by the main node.
     pub(crate) attestation_status: Arc<AttestationStatusWatch>,
-    /// Client to use to check the current attestation status on the main node.
-    pub(crate) attestation_status_client: Box<dyn AttestationStatusClient>,
 }
 
 impl Network {
@@ -77,7 +74,7 @@ impl Network {
         block_store: Arc<BlockStore>,
         batch_store: Arc<BatchStore>,
         sender: channel::UnboundedSender<io::OutputMessage>,
-        attestation_status_client: Box<dyn AttestationStatusClient>,
+        attestation_status: Arc<AttestationStatusWatch>,
     ) -> Arc<Self> {
         Arc::new(Self {
             sender,
@@ -93,8 +90,7 @@ impl Network {
             block_store,
             batch_store,
             push_validator_addrs_calls: 0.into(),
-            attestation_status: Arc::new(AttestationStatusWatch::default()),
-            attestation_status_client,
+            attestation_status,
         })
     }
 
@@ -210,36 +206,4 @@ impl Network {
                 .wrap("persist_batch_qc")?;
         }
     }
-
-    /// Poll the attestation status and update the watch.
-    pub(crate) async fn run_attestation_client(&self, ctx: &ctx::Ctx) -> ctx::Result<()> {
-        if self.genesis().attesters.is_none() {
-            tracing::info!("no attesters in genesis, not polling the attestation status");
-            return Ok(());
-        };
-
-        const POLL_INTERVAL: time::Duration = time::Duration::seconds(5);
-
-        loop {
-            match self
-                .attestation_status_client
-                .next_batch_to_attest(ctx)
-                .await
-            {
-                Ok(Some(batch_number)) => {
-                    self.attestation_status.update(batch_number).await;
-                    // We could also update the minimum batch number here, which might
-                    // help mitigate the problem of missing a vote if the batch number
-                    // happened to decrease. But we decided to fix it at the source,
-                    // so the only place that is adjusted is before looking for a QC.
-                }
-                Ok(None) => tracing::debug!("waiting for attestation status..."),
-                Err(error) => tracing::error!(
-                    ?error,
-                    "failed to poll attestation status, retrying later..."
-                ),
-            }
-            ctx.sleep(POLL_INTERVAL).await?;
-        }
-    }
 }
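Since `AttestationStatusReceiver` is re-exported here, components outside the gossip module can react to updates instead of polling. A sketch of a subscriber; it assumes the usual `sync::wait_for` helper from `zksync_concurrency` for watch receivers and that `attester::BatchNumber` is `Copy + Ord`:

```rust
use zksync_concurrency::{ctx, sync};
use zksync_consensus_network::gossip::AttestationStatusReceiver;
use zksync_consensus_roles::attester;

/// Illustrative subscriber: block until the main node asks for votes on
/// `target` or a later batch (or the context gets canceled).
async fn wait_for_batch(
    ctx: &ctx::Ctx,
    recv: &mut AttestationStatusReceiver,
    target: attester::BatchNumber,
) -> ctx::OrCanceled<()> {
    sync::wait_for(ctx, recv, |status| {
        status.next_batch_to_attest.map_or(false, |n| n >= target)
    })
    .await?;
    Ok(())
}
```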
 //! Network actor maintaining a pool of outbound and inbound connections to other nodes.
 use anyhow::Context as _;
-use gossip::{AttestationStatusClient, AttestationStatusReceiver, BatchVotesPublisher};
+use gossip::{AttestationStatusReceiver, AttestationStatusWatch, BatchVotesPublisher};
 use std::sync::Arc;
 use tracing::Instrument as _;
 use zksync_concurrency::{
@@ -57,15 +57,10 @@ impl Network {
         block_store: Arc<BlockStore>,
         batch_store: Arc<BatchStore>,
         pipe: ActorPipe<io::InputMessage, io::OutputMessage>,
-        attestation_status_client: Box<dyn AttestationStatusClient>,
+        attestation_status: Arc<AttestationStatusWatch>,
     ) -> (Arc<Self>, Runner) {
-        let gossip = gossip::Network::new(
-            cfg,
-            block_store,
-            batch_store,
-            pipe.send,
-            attestation_status_client,
-        );
+        let gossip =
+            gossip::Network::new(cfg, block_store, batch_store, pipe.send, attestation_status);
         let consensus = consensus::Network::new(gossip.clone());
         let net = Arc::new(Self { gossip, consensus });
         (
@@ -146,9 +141,6 @@ impl Runner {
             // Update QC batches in the background.
             s.spawn(self.net.gossip.run_batch_qc_finder(ctx));
 
-            // Update attestation status in the background.
-            s.spawn(self.net.gossip.run_attestation_client(ctx));
-
             // Fetch missing batches in the background.
             s.spawn(async {
                 self.net.gossip.run_batch_fetcher(ctx).await;
diff --git a/node/actors/network/src/testonly.rs b/node/actors/network/src/testonly.rs
index 382a24f0..13608c57 100644
--- a/node/actors/network/src/testonly.rs
+++ b/node/actors/network/src/testonly.rs
@@ -1,7 +1,7 @@
 //! Testonly utilities.
 #![allow(dead_code)]
 use crate::{
-    gossip::LocalAttestationStatus,
+    gossip::{AttestationStatusRunner, AttestationStatusWatch, LocalAttestationStatusClient},
     io::{ConsensusInputMessage, Target},
     Config, GossipConfig, Network, RpcConfig, Runner,
 };
@@ -13,7 +13,10 @@ use std::{
     collections::{HashMap, HashSet},
     sync::Arc,
 };
-use zksync_concurrency::{ctx, ctx::channel, io, limiter, net, scope, sync};
+use zksync_concurrency::{
+    ctx::{self, channel},
+    io, limiter, net, scope, sync, time,
+};
 use zksync_consensus_roles::{node, validator};
 use zksync_consensus_storage::{BatchStore, BlockStore};
 use zksync_consensus_utils::pipe;
@@ -157,7 +160,8 @@ pub fn new_fullnode(rng: &mut impl Rng, peer: &Config) -> Config {
 
 /// Runner for Instance.
 pub struct InstanceRunner {
-    runner: Runner,
+    net_runner: Runner,
+    attestation_status_runner: AttestationStatusRunner,
     terminate: channel::Receiver<()>,
 }
 
@@ -165,7 +169,8 @@ impl InstanceRunner {
     /// Runs the instance background processes.
     pub async fn run(mut self, ctx: &ctx::Ctx) -> anyhow::Result<()> {
         scope::run!(ctx, |ctx, s| async {
-            s.spawn_bg(self.runner.run(ctx));
+            s.spawn_bg(self.net_runner.run(ctx));
+            s.spawn_bg(self.attestation_status_runner.run(ctx));
             let _ = self.terminate.recv(ctx).await;
             Ok(())
         })
@@ -182,16 +187,18 @@ impl Instance {
         block_store: Arc<BlockStore>,
         batch_store: Arc<BatchStore>,
     ) -> (Self, InstanceRunner) {
-        // For now let everyone cast their votes on batches based on their own database.
-        let attestation_status_client = Box::new(LocalAttestationStatus::new(batch_store.clone()));
-
+        // Semantically we'd want this to be created at the same level as the stores,
+        // but doing so would introduce a lot of extra cruft in setting
+        // up tests that don't make any use of attestations.
+        let (attestation_status, attestation_status_runner) =
+            new_local_attestation_status(batch_store.clone());
         let (actor_pipe, dispatcher_pipe) = pipe::new();
-        let (net, runner) = Network::new(
+        let (net, net_runner) = Network::new(
             cfg,
             block_store,
             batch_store,
             actor_pipe,
-            attestation_status_client,
+            attestation_status,
         );
         let (terminate_send, terminate_recv) = channel::bounded(1);
         (
@@ -201,7 +208,8 @@ impl Instance {
                 terminate: terminate_send,
             },
             InstanceRunner {
-                runner,
+                net_runner,
+                attestation_status_runner,
                 terminate: terminate_recv,
             },
         )
@@ -341,3 +349,13 @@ pub async fn instant_network(
     tracing::info!("consensus network established");
     Ok(())
 }
+
+/// Create an attestation status and its runner based on a [BatchStore].
+pub fn new_local_attestation_status(
+    store: Arc<BatchStore>,
+) -> (Arc<AttestationStatusWatch>, AttestationStatusRunner) {
+    AttestationStatusWatch::new(
+        Box::new(LocalAttestationStatusClient::new(store)),
+        time::Duration::seconds(5),
+    )
+}
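`InstanceRunner` now transparently drives both the network runner and the attestation-status runner, so tests keep their old two-step setup. A sketch of typical usage; the helper itself is hypothetical, while `Instance::new` and `wait_for_consensus_connections` appear in this diff:

```rust
use std::sync::Arc;
use zksync_concurrency::{ctx, scope};
use zksync_consensus_network::{testonly, Config};
use zksync_consensus_storage::{BatchStore, BlockStore};

/// Hypothetical test helper: start an instance and wait for it to connect.
async fn start_instance(
    ctx: &ctx::Ctx,
    cfg: Config,
    blocks: Arc<BlockStore>,
    batches: Arc<BatchStore>,
) -> anyhow::Result<()> {
    scope::run!(ctx, |ctx, s| async {
        let (node, runner) = testonly::Instance::new(cfg, blocks, batches);
        // One spawn drives both the network and the attestation-status poller.
        s.spawn_bg(runner.run(ctx));
        node.wait_for_consensus_connections().await;
        Ok(())
    })
    .await
}
```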
diff --git a/node/tools/src/config.rs b/node/tools/src/config.rs
index fd56a421..d50ca761 100644
--- a/node/tools/src/config.rs
+++ b/node/tools/src/config.rs
@@ -9,11 +9,14 @@ use std::{
     path::PathBuf,
 };
 use tokio_rustls::rustls::pki_types::{CertificateDer, PrivateKeyDer};
-use zksync_concurrency::{ctx, net};
+use zksync_concurrency::{ctx, net, scope, time};
 use zksync_consensus_bft as bft;
 use zksync_consensus_crypto::{read_optional_text, read_required_text, Text, TextFmt};
 use zksync_consensus_executor as executor;
-use zksync_consensus_network::{gossip::LocalAttestationStatus, http};
+use zksync_consensus_network::{
+    gossip::{AttestationStatusRunner, AttestationStatusWatch, LocalAttestationStatusClient},
+    http,
+};
 use zksync_consensus_roles::{attester, node, validator};
 use zksync_consensus_storage::testonly::{TestMemoryStorage, TestMemoryStorageRunner};
 use zksync_consensus_utils::debug_page;
@@ -259,13 +262,20 @@ impl Configs {
     pub async fn make_executor(
         &self,
         ctx: &ctx::Ctx,
-    ) -> ctx::Result<(executor::Executor, TestMemoryStorageRunner)> {
+    ) -> ctx::Result<(executor::Executor, TestExecutorRunner)> {
         let replica_store = store::RocksDB::open(self.app.genesis.clone(), &self.database).await?;
         let store = TestMemoryStorage::new(ctx, &self.app.genesis).await;
 
-        // We don't have an implementation of an API in these servers, we can only use the stores.
-        let attestation_status_client =
-            Box::new(LocalAttestationStatus::new(store.batches.clone()));
+        // We don't have an API to poll in this setup, so we can only create a local store-based attestation client.
+        let (attestation_status, attestation_status_runner) = AttestationStatusWatch::new(
+            Box::new(LocalAttestationStatusClient::new(store.batches.clone())),
+            time::Duration::seconds(1),
+        );
+
+        let runner = TestExecutorRunner {
+            storage_runner: store.runner,
+            attestation_status_runner,
+        };
 
         let e = executor::Executor {
             config: executor::Config {
@@ -307,9 +317,9 @@ impl Configs {
                 .attester_key
                 .as_ref()
                 .map(|key| executor::Attester { key: key.clone() }),
-            attestation_status_client,
+            attestation_status,
         };
-        Ok((e, store.runner))
+        Ok((e, runner))
     }
 }
@@ -334,3 +344,20 @@ fn load_private_key(path: &PathBuf) -> anyhow::Result<PrivateKeyDer<'static>> {
     // Load and return a single private key.
     Ok(rustls_pemfile::private_key(&mut reader).map(|key| key.expect("Private key not found"))?)
 }
+
+/// Runs the background tasks that the test executor needs.
+pub struct TestExecutorRunner {
+    storage_runner: TestMemoryStorageRunner,
+    attestation_status_runner: AttestationStatusRunner,
+}
+
+impl TestExecutorRunner {
+    /// Runs the storage and the attestation status runners.
+    pub async fn run(self, ctx: &ctx::Ctx) -> anyhow::Result<()> {
+        scope::run!(ctx, |ctx, s| async {
+            s.spawn(self.storage_runner.run(ctx));
+            s.spawn(self.attestation_status_runner.run(ctx));
+            Ok(())
+        })
+        .await
+    }
+}
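The tuple returned by `make_executor` changes from `(Executor, TestMemoryStorageRunner)` to `(Executor, TestExecutorRunner)`, so callers now spawn one combined runner. A sketch of the call site; `run_node` is hypothetical and the error plumbing is simplified:

```rust
use zksync_concurrency::{ctx, scope};

/// Hypothetical entry point for a node binary using `Configs::make_executor`.
async fn run_node(ctx: &ctx::Ctx, configs: Configs) -> anyhow::Result<()> {
    let (executor, runner) = configs.make_executor(ctx).await?;
    scope::run!(ctx, |ctx, s| async {
        // Storage and attestation-status tasks run until the scope is canceled.
        s.spawn_bg(runner.run(ctx));
        executor.run(ctx).await
    })
    .await
}
```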