From b9c5fa7b435bf5239d66eca01f4d0f1b25f29aca Mon Sep 17 00:00:00 2001
From: Grzegorz Prusak
Date: Fri, 16 Aug 2024 08:52:44 +0200
Subject: [PATCH] Support for dynamic attester committee (#175)

For now it supports collecting votes for a single batch at a time. I've
extended the RPC to support both pushing and pulling votes (with a dynamic
committee we cannot really collect future votes). I've reworked the API to
concentrate the API surface in one place. I've also moved the RPC registry
from Rust code to a protobuf enum, to make enforcing compatibility easier.

I'll prepare a corresponding PR in zksync-era before merging, to make sure
that the integration is smooth e2e.

---------

Co-authored-by: Akosh Farkash
---
 node/actors/executor/src/attestation.rs       | 256 ------------
 node/actors/executor/src/io.rs                |   6 +-
 node/actors/executor/src/lib.rs               |  47 +--
 node/actors/executor/src/tests.rs             | 102 +----
 node/actors/network/src/config.rs             |   7 +-
 .../network/src/gossip/attestation/metrics.rs |  19 +
 .../network/src/gossip/attestation/mod.rs     | 368 ++++++++++++++++++
 .../network/src/gossip/attestation/tests.rs   | 200 ++++++++++
 .../network/src/gossip/attestation_status.rs  |  93 -----
 node/actors/network/src/gossip/batch_votes.rs | 333 ----------------
 node/actors/network/src/gossip/metrics.rs     |  37 --
 node/actors/network/src/gossip/mod.rs         |  77 +---
 node/actors/network/src/gossip/runner.rs      | 111 +++---
 node/actors/network/src/gossip/testonly.rs    |   6 +-
 .../network/src/gossip/tests/fetch_batches.rs |   4 -
 node/actors/network/src/gossip/tests/mod.rs   | 362 +++++------------
 .../network/src/gossip/tests/syncing.rs       |   2 -
 node/actors/network/src/lib.rs                |  17 +-
 node/actors/network/src/proto/gossip.proto    |   8 +
 node/actors/network/src/proto/rpc.proto       |  15 +
 node/actors/network/src/rpc/consensus.rs      |   5 +-
 node/actors/network/src/rpc/get_batch.rs      |   5 +-
 node/actors/network/src/rpc/get_block.rs      |   5 +-
 node/actors/network/src/rpc/mod.rs            |  35 +-
 node/actors/network/src/rpc/ping.rs           |   5 +-
 .../network/src/rpc/push_batch_store_state.rs |   5 +-
 .../network/src/rpc/push_batch_votes.rs       |  71 +++-
 .../network/src/rpc/push_block_store_state.rs |   5 +-
 .../network/src/rpc/push_validator_addrs.rs   |   5 +-
 node/actors/network/src/rpc/tests.rs          |  24 +-
 node/actors/network/src/testonly.rs           |  64 +--
 .../libs/roles/src/attester/messages/batch.rs |  80 ++--
 node/libs/roles/src/attester/tests.rs         |  41 +-
 node/libs/roles/src/validator/testonly.rs     |  11 +-
 node/libs/storage/src/batch_store/mod.rs      |  97 -----
 node/libs/storage/src/testonly/in_memory.rs   |  62 +--
 node/libs/storage/src/testonly/mod.rs         |   2 +-
 node/libs/storage/src/tests.rs                |   2 +-
 node/tools/src/config.rs                      |  47 +--
 39 files changed, 998 insertions(+), 1643 deletions(-)
 delete mode 100644 node/actors/executor/src/attestation.rs
 create mode 100644 node/actors/network/src/gossip/attestation/metrics.rs
 create mode 100644 node/actors/network/src/gossip/attestation/mod.rs
 create mode 100644 node/actors/network/src/gossip/attestation/tests.rs
 delete mode 100644 node/actors/network/src/gossip/attestation_status.rs
 delete mode 100644 node/actors/network/src/gossip/batch_votes.rs
 delete mode 100644 node/actors/network/src/gossip/metrics.rs
 create mode 100644 node/actors/network/src/proto/rpc.proto

diff --git a/node/actors/executor/src/attestation.rs b/node/actors/executor/src/attestation.rs
deleted file mode 100644
index e0bd63d5..00000000
--- a/node/actors/executor/src/attestation.rs
+++ /dev/null
@@ -1,256 +0,0 @@
-//! Module to publish attestations over batches.
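The runner deleted here is superseded by the `attestation::Controller` added later in this patch; the driving logic moves to the caller. A condensed sketch of that replacement flow, using only methods introduced below; `compute_info` and `submit_qc` are hypothetical caller-side helpers, not part of the PR:

```rust
use std::sync::Arc;
use zksync_concurrency::ctx;
use zksync_consensus_network::gossip::attestation;
use zksync_consensus_roles::attester;

// Hypothetical stand-ins for application logic (not part of this PR):
async fn compute_info(
    _ctx: &ctx::Ctx,
    _n: attester::BatchNumber,
) -> ctx::Result<attestation::Info> {
    unimplemented!("look up the batch hash and committee for `n` in local storage")
}
async fn submit_qc(_ctx: &ctx::Ctx, _qc: attester::BatchQC) -> ctx::Result<()> {
    unimplemented!("persist the certificate or post it to the global registry")
}

/// Drives the controller batch by batch.
async fn drive_attestation(
    ctx: &ctx::Ctx,
    ctrl: &attestation::Controller,
    mut next: attester::BatchNumber,
) -> ctx::Result<()> {
    loop {
        // With `Controller::new(Some(key))` this also casts our own vote.
        ctrl.start_attestation(Arc::new(compute_info(ctx, next).await?))
            .await?;
        // `None` means the controller was already reconfigured past `next`.
        if let Some(qc) = ctrl.wait_for_cert(ctx, next).await? {
            submit_qc(ctx, qc).await?;
        }
        next = next.next();
    }
}
```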
- -use crate::Attester; -use anyhow::Context; -use std::sync::Arc; -use tracing::Instrument; -use zksync_concurrency::{ctx, sync, time}; -use zksync_consensus_network::gossip::{ - AttestationStatusReceiver, AttestationStatusWatch, BatchVotesPublisher, -}; -use zksync_consensus_roles::attester; -use zksync_consensus_storage::{BatchStore, BlockStore}; - -/// Polls the database for new batches to be signed and publishes them to the gossip channel. -pub(super) struct AttesterRunner { - block_store: Arc, - batch_store: Arc, - attester: Attester, - publisher: BatchVotesPublisher, - status: AttestationStatusReceiver, - poll_interval: time::Duration, -} - -impl AttesterRunner { - /// Create a new instance of a runner. - pub(super) fn new( - block_store: Arc, - batch_store: Arc, - attester: Attester, - publisher: BatchVotesPublisher, - status: AttestationStatusReceiver, - poll_interval: time::Duration, - ) -> Self { - Self { - block_store, - batch_store, - attester, - publisher, - status, - poll_interval, - } - } - /// Poll the database for new L1 batches and publish our signature over the batch. - pub(super) async fn run(mut self, ctx: &ctx::Ctx) -> ctx::Result<()> { - let public_key = self.attester.key.public(); - // TODO: In the future when we have attester rotation these checks will have to be checked inside the loop. - let Some(attesters) = self.block_store.genesis().attesters.as_ref() else { - tracing::warn!("Attester key is set, but the attester committee is empty."); - return Ok(()); - }; - if !attesters.contains(&public_key) { - tracing::warn!("Attester key is set, but not part of the attester committee."); - return Ok(()); - } - - let genesis = self.block_store.genesis().hash(); - - // Subscribe starts as seen but we don't want to miss the first item. - self.status.mark_changed(); - - loop { - async { - let Some(batch_number) = sync::changed(ctx, &mut self.status) - .instrument(tracing::info_span!("wait_for_attestation_status")) - .await? - .next_batch_to_attest - else { - return Ok(()); - }; - - tracing::info!(%batch_number, "attestation status"); - - // We can avoid actively polling the database in `wait_for_batch_to_sign` by waiting its existence - // to be indicated in memory (which itself relies on polling). This happens once we have the commitment, - // which for nodes that get the blocks through BFT should happen after execution. Nodes which - // rely on batch sync don't participate in attestations as they need the batch on L1 first. - self.batch_store - .wait_until_persisted(ctx, batch_number) - .await?; - - // Try to get the next batch to sign; the commitment might not be available just yet. - let batch = AttesterRunner::wait_for_batch_to_sign( - ctx, - batch_number, - &self.batch_store, - self.poll_interval, - ) - .await?; - - // The certificates might be collected out of order because of how gossip works; - // we could query the DB to see if we already have a QC, or we can just go ahead - // and publish our vote, and let others ignore it. - - tracing::info!(%batch_number, "publishing attestation"); - - // We only have to publish a vote once; future peers can pull it from the register. - self.publisher - .publish(attesters, &genesis, &self.attester.key, batch) - .await - .context("publish")?; - - ctx::Ok(()) - } - .instrument(tracing::info_span!("attestation_iter")) - .await?; - } - } - - /// Wait for the batch commitment to become available. 
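The helper below is an instance of a plain poll-and-retry loop. The same shape as a self-contained generic sketch, where `probe` is a stand-in for something like `BatchStore::batch_to_sign`:

```rust
use zksync_concurrency::{ctx, time};

/// Polls `probe` until it yields a value, sleeping `interval` between attempts.
async fn poll_until_some<T, F, Fut>(
    ctx: &ctx::Ctx,
    interval: time::Duration,
    mut probe: F,
) -> ctx::Result<T>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = ctx::Result<Option<T>>>,
{
    loop {
        if let Some(v) = probe().await? {
            return Ok(v);
        }
        // Sleeping through the context keeps the loop cancellation-aware.
        ctx.sleep(interval).await?;
    }
}
```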
- #[tracing::instrument(skip_all, fields(l1_batch = %number))] - async fn wait_for_batch_to_sign( - ctx: &ctx::Ctx, - number: attester::BatchNumber, - batch_store: &BatchStore, - poll_interval: time::Duration, - ) -> ctx::Result { - loop { - if let Some(batch) = batch_store - .batch_to_sign(ctx, number) - .await - .context("batch_to_sign")? - { - return Ok(batch); - } else { - ctx.sleep(poll_interval).await?; - } - } - } -} - -/// An interface which is used by attesters and nodes collecting votes over gossip to determine -/// which is the next batch they are all supposed to be voting on, according to the main node. -/// -/// This is a convenience interface to be used with the [AttestationStatusRunner]. -#[async_trait::async_trait] -pub trait AttestationStatusClient: 'static + Send + Sync { - /// Get the next batch number for which the main node expects a batch QC to be formed. - /// - /// The API might return an error while genesis is being created, which we represent with `None` - /// here and mean that we'll have to try again later. - /// - /// The genesis hash is returned along with the new batch number to facilitate detecting reorgs - /// on the main node as soon as possible and prevent inconsistent state from entering the system. - async fn attestation_status( - &self, - ctx: &ctx::Ctx, - ) -> ctx::Result>; -} - -/// Use an [AttestationStatusClient] to periodically poll the main node and update the [AttestationStatusWatch]. -/// -/// This is provided for convenience. -pub struct AttestationStatusRunner { - status: Arc, - client: Box, - poll_interval: time::Duration, -} - -impl AttestationStatusRunner { - /// Create a new [AttestationStatusWatch] and an [AttestationStatusRunner] to poll the main node. - /// - /// It polls the [AttestationStatusClient] until it returns a value to initialize the status with. - pub async fn init( - _ctx: &ctx::Ctx, - client: Box, - poll_interval: time::Duration, - genesis: attester::GenesisHash, - ) -> ctx::Result<(Arc, Self)> { - let status = Arc::new(AttestationStatusWatch::new(genesis)); - let runner = Self { - status: status.clone(), - client, - poll_interval, - }; - // This would initialise the status to some value, however the EN was rolled out first without the main node API. - // runner.poll_until_some(ctx).await?; - Ok((status, runner)) - } - - /// Initialize an [AttestationStatusWatch] based on a [BatchStore] and return it along with the [AttestationStatusRunner]. - pub async fn init_from_store( - ctx: &ctx::Ctx, - batch_store: Arc, - poll_interval: time::Duration, - genesis: attester::GenesisHash, - ) -> ctx::Result<(Arc, Self)> { - Self::init( - ctx, - Box::new(LocalAttestationStatusClient { - genesis, - batch_store, - }), - poll_interval, - genesis, - ) - .await - } - - /// Run the poll loop. - pub async fn run(mut self, ctx: &ctx::Ctx) -> anyhow::Result<()> { - match self.poll_forever(ctx).await { - Ok(()) | Err(ctx::Error::Canceled(_)) => Ok(()), - Err(ctx::Error::Internal(err)) => Err(err), - } - } - - /// Poll the client forever in a loop or until canceled. - async fn poll_forever(&mut self, ctx: &ctx::Ctx) -> ctx::Result<()> { - loop { - self.poll_until_some(ctx).await?; - ctx.sleep(self.poll_interval).await?; - } - } - - /// Poll the client until some data is returned and write it into the status. 
- async fn poll_until_some(&mut self, ctx: &ctx::Ctx) -> ctx::Result<()> { - loop { - match self.client.attestation_status(ctx).await { - Ok(Some((genesis, next_batch_to_attest))) => { - self.status.update(genesis, next_batch_to_attest).await?; - return Ok(()); - } - Ok(None) => { - tracing::info!("waiting for attestation status...") - } - Err(error) => { - tracing::error!( - ?error, - "failed to poll attestation status, retrying later..." - ) - } - } - ctx.sleep(self.poll_interval).await?; - } - } -} - -/// Implement the attestation status for the main node by returning the next to vote on from the [BatchStore]. -struct LocalAttestationStatusClient { - /// We don't expect the genesis to change while the main node is running, - /// so we can just cache the genesis hash and return it for every request. - genesis: attester::GenesisHash, - batch_store: Arc, -} - -#[async_trait::async_trait] -impl AttestationStatusClient for LocalAttestationStatusClient { - async fn attestation_status( - &self, - ctx: &ctx::Ctx, - ) -> ctx::Result> { - let next_batch_to_attest = self.batch_store.next_batch_to_attest(ctx).await?; - - Ok(next_batch_to_attest.map(|n| (self.genesis, n))) - } -} diff --git a/node/actors/executor/src/io.rs b/node/actors/executor/src/io.rs index 914ec4b8..e6fb7dd4 100644 --- a/node/actors/executor/src/io.rs +++ b/node/actors/executor/src/io.rs @@ -36,8 +36,8 @@ impl Dispatcher { } /// Method to start the IO dispatcher. It is simply a loop to receive messages from the actors and then forward them. - pub(super) async fn run(mut self, ctx: &ctx::Ctx) -> anyhow::Result<()> { - scope::run!(ctx, |ctx, s| async { + pub(super) async fn run(mut self, ctx: &ctx::Ctx) { + let _: ctx::OrCanceled<()> = scope::run!(ctx, |ctx, s| async { // Start a task to handle the messages from the consensus actor. s.spawn(async { while let Ok(msg) = self.consensus_output.recv(ctx).await { @@ -65,6 +65,6 @@ impl Dispatcher { Ok(()) }) - .await + .await; } } diff --git a/node/actors/executor/src/lib.rs b/node/actors/executor/src/lib.rs index adaf4016..2cd86e34 100644 --- a/node/actors/executor/src/lib.rs +++ b/node/actors/executor/src/lib.rs @@ -2,20 +2,19 @@ use crate::io::Dispatcher; use anyhow::Context as _; use network::http; -pub use network::RpcConfig; +pub use network::{gossip::attestation, RpcConfig}; use std::{ collections::{HashMap, HashSet}, sync::Arc, }; use zksync_concurrency::{ctx, limiter, net, scope, time}; use zksync_consensus_bft as bft; -use zksync_consensus_network::{self as network, gossip::AttestationStatusWatch}; -use zksync_consensus_roles::{attester, node, validator}; +use zksync_consensus_network as network; +use zksync_consensus_roles::{node, validator}; use zksync_consensus_storage::{BatchStore, BlockStore, ReplicaStore}; use zksync_consensus_utils::pipe; use zksync_protobuf::kB; -pub mod attestation; mod io; #[cfg(test)] mod tests; @@ -31,13 +30,6 @@ pub struct Validator { pub payload_manager: Box, } -/// Validator-related part of [`Executor`]. -#[derive(Debug)] -pub struct Attester { - /// Consensus network configuration. - pub key: attester::SecretKey, -} - /// Config of the node executor. #[derive(Debug)] pub struct Config { @@ -98,10 +90,9 @@ pub struct Executor { pub batch_store: Arc, /// Validator-specific node data. pub validator: Option, - /// Validator-specific node data. - pub attester: Option, - /// Status showing where the main node wants attester to cast their votes. - pub attestation_status: Arc, + /// Attestation controller. 
Caller should actively configure the batch + /// for which the attestation votes should be collected. + pub attestation: Arc, } impl Executor { @@ -112,7 +103,6 @@ impl Executor { public_addr: self.config.public_addr.clone(), gossip: self.config.gossip(), validator_key: self.validator.as_ref().map(|v| v.key.clone()), - attester_key: self.attester.as_ref().map(|a| a.key.clone()), ping_timeout: Some(time::Duration::seconds(10)), max_block_size: self.config.max_payload_size.saturating_add(kB), max_batch_size: self.config.max_batch_size.saturating_add(kB), @@ -137,35 +127,20 @@ impl Executor { tracing::debug!("Starting actors in separate threads."); scope::run!(ctx, |ctx, s| async { - s.spawn(async { dispatcher.run(ctx).await.context("IO Dispatcher stopped") }); + s.spawn(async { + dispatcher.run(ctx).await; + Ok(()) + }); let (net, runner) = network::Network::new( network_config, self.block_store.clone(), self.batch_store.clone(), network_actor_pipe, - self.attestation_status.clone(), + self.attestation, ); net.register_metrics(); s.spawn(async { runner.run(ctx).await.context("Network stopped") }); - if let Some(attester) = self.attester { - tracing::info!("Running the node in attester mode."); - let runner = attestation::AttesterRunner::new( - self.block_store.clone(), - self.batch_store.clone(), - attester, - net.batch_vote_publisher(), - self.attestation_status.subscribe(), - self.config.batch_poll_interval, - ); - s.spawn(async { - runner.run(ctx).await?; - Ok(()) - }); - } else { - tracing::info!("Running the node in non-attester mode."); - } - if let Some(debug_config) = self.config.debug_page { s.spawn(async { http::DebugPageServer::new(debug_config, net) diff --git a/node/actors/executor/src/tests.rs b/node/actors/executor/src/tests.rs index 50ba7297..4e88f5ef 100644 --- a/node/actors/executor/src/tests.rs +++ b/node/actors/executor/src/tests.rs @@ -1,10 +1,9 @@ //! High-level tests for `Executor`. use super::*; -use attestation::{AttestationStatusClient, AttestationStatusRunner}; use rand::Rng as _; -use std::sync::{atomic::AtomicU64, Mutex}; +//use std::sync::{atomic::AtomicU64, Mutex}; use tracing::Instrument as _; -use zksync_concurrency::{sync, testonly::abort_on_panic}; +use zksync_concurrency::testonly::abort_on_panic; use zksync_consensus_bft as bft; use zksync_consensus_network::testonly::{new_configs, new_fullnode}; use zksync_consensus_roles::validator::{testonly::Setup, BlockNumber}; @@ -31,8 +30,8 @@ fn config(cfg: &network::Config) -> Config { /// The test executors below are not running with attesters, so we just create an [AttestationStatusWatch] /// that will never be updated. 
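For contrast with `never_attest()` just below: a node that actually attests hands the controller its key and later configures batches to vote on. A sketch in the style of these test helpers, assuming the usual store/config arguments and an attester `key` are in scope:

```rust
fn attesting_fullnode(
    cfg: &network::Config,
    block_store: Arc<BlockStore>,
    batch_store: Arc<BatchStore>,
    key: attester::SecretKey,
) -> (Executor, Arc<attestation::Controller>) {
    let ctrl: Arc<attestation::Controller> = attestation::Controller::new(Some(key)).into();
    let executor = Executor {
        config: config(cfg),
        block_store,
        batch_store,
        validator: None,
        attestation: ctrl.clone(),
    };
    // The test would then drive `ctrl.start_attestation(..)` itself
    // to begin collecting votes.
    (executor, ctrl)
}
```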
-fn never_attest(genesis: &validator::Genesis) -> Arc { - Arc::new(AttestationStatusWatch::new(genesis.hash())) +fn never_attest() -> Arc { + attestation::Controller::new(None).into() } fn validator( @@ -41,7 +40,6 @@ fn validator( batch_store: Arc, replica_store: impl ReplicaStore, ) -> Executor { - let attestation_status = never_attest(block_store.genesis()); Executor { config: config(cfg), block_store, @@ -51,8 +49,7 @@ fn validator( replica_store: Box::new(replica_store), payload_manager: Box::new(bft::testonly::RandomPayload(1000)), }), - attester: None, - attestation_status, + attestation: never_attest(), } } @@ -61,14 +58,12 @@ fn fullnode( block_store: Arc, batch_store: Arc, ) -> Executor { - let attestation_status = never_attest(block_store.genesis()); Executor { config: config(cfg), block_store, batch_store, validator: None, - attester: None, - attestation_status, + attestation: never_attest(), } } @@ -316,88 +311,3 @@ async fn test_validator_syncing_from_fullnode() { .await .unwrap(); } - -/// Test that the AttestationStatusRunner initialises and then polls the status. -#[tokio::test] -async fn test_attestation_status_runner() { - abort_on_panic(); - let _guard = zksync_concurrency::testonly::set_timeout(time::Duration::seconds(5)); - let ctx = &ctx::test_root(&ctx::AffineClock::new(10.0)); - let rng = &mut ctx.rng(); - - let genesis: attester::GenesisHash = rng.gen(); - - #[derive(Clone)] - struct MockAttestationStatus { - genesis: Arc>, - batch_number: Arc, - } - - #[async_trait::async_trait] - impl AttestationStatusClient for MockAttestationStatus { - async fn attestation_status( - &self, - _ctx: &ctx::Ctx, - ) -> ctx::Result> { - let curr = self - .batch_number - .fetch_add(1u64, std::sync::atomic::Ordering::Relaxed); - if curr == 0 { - // Return None initially to see that the runner will deal with it. - Ok(None) - } else { - // The first actual result will be 1 on the 2nd poll. - let genesis = *self.genesis.lock().unwrap(); - let next_batch_to_attest = attester::BatchNumber(curr); - Ok(Some((genesis, next_batch_to_attest))) - } - } - } - - let res = scope::run!(ctx, |ctx, s| async { - let client = MockAttestationStatus { - genesis: Arc::new(Mutex::new(genesis)), - batch_number: Arc::new(AtomicU64::default()), - }; - let (status, runner) = AttestationStatusRunner::init( - ctx, - Box::new(client.clone()), - time::Duration::milliseconds(100), - genesis, - ) - .await - .unwrap(); - - let mut recv_status = status.subscribe(); - recv_status.mark_changed(); - - // Check that the value has *not* been initialised to a non-default value. - { - let status = sync::changed(ctx, &mut recv_status).await?; - assert!(status.next_batch_to_attest.is_none()); - } - // Now start polling for new values. Starting in the foreground because we want it to fail in the end. - s.spawn(runner.run(ctx)); - // Check that polling sets the value. - { - let status = sync::changed(ctx, &mut recv_status).await?; - assert!(status.next_batch_to_attest.is_some()); - assert_eq!(status.next_batch_to_attest.unwrap().0, 1); - } - // Change the genesis returned by the client. It should cause the scope to fail. 
- { - let mut genesis = client.genesis.lock().unwrap(); - *genesis = rng.gen(); - } - Ok(()) - }) - .await; - - match res { - Ok(()) => panic!("expected to fail when the genesis changed"), - Err(e) => assert!( - e.to_string().contains("genesis changed"), - "only expect failures due to genesis change; got: {e}" - ), - } -} diff --git a/node/actors/network/src/config.rs b/node/actors/network/src/config.rs index 2d47cf80..03e5020b 100644 --- a/node/actors/network/src/config.rs +++ b/node/actors/network/src/config.rs @@ -1,7 +1,7 @@ //! Network actor configs. use std::collections::{HashMap, HashSet}; use zksync_concurrency::{limiter, net, time}; -use zksync_consensus_roles::{attester, node, validator}; +use zksync_consensus_roles::{node, validator}; /// How often we should retry to establish a connection to a validator. /// TODO(gprusak): once it becomes relevant, choose a more appropriate retry strategy. @@ -26,7 +26,7 @@ pub struct RpcConfig { pub get_batch_timeout: Option, /// Max rate of sending/receiving consensus messages. pub consensus_rate: limiter::Rate, - /// Max rate of sending/receiving l1 batch votes messages. + /// Max rate of sending/receiving PushBatchVotes RPCs. pub push_batch_votes_rate: limiter::Rate, } @@ -98,9 +98,6 @@ pub struct Config { /// Private key of the validator. /// None if the node is NOT a validator. pub validator_key: Option, - /// Private key of the attester. - /// None if the node is NOT a attester. - pub attester_key: Option, /// Maximal size of the proto-encoded `validator::FinalBlock` in bytes. pub max_block_size: usize, /// Maximal size of the proto-encoded `attester::SyncBatch` in bytes. diff --git a/node/actors/network/src/gossip/attestation/metrics.rs b/node/actors/network/src/gossip/attestation/metrics.rs new file mode 100644 index 00000000..4e5137da --- /dev/null +++ b/node/actors/network/src/gossip/attestation/metrics.rs @@ -0,0 +1,19 @@ +/// Metrics related to the gossiping of L1 batch votes. +#[derive(Debug, vise::Metrics)] +#[metrics(prefix = "network_gossip_attestation")] +pub(crate) struct Metrics { + /// Batch to be attested. + pub(crate) batch_number: vise::Gauge, + + /// Number of members in the attester committee. + pub(crate) committee_size: vise::Gauge, + + /// Number of votes collected for the current batch. + pub(crate) votes_collected: vise::Gauge, + + /// Weight percentage (in range [0,1]) of votes collected for the current batch. + pub(crate) weight_collected: vise::Gauge, +} + +#[vise::register] +pub(super) static METRICS: vise::Global = vise::Global::new(); diff --git a/node/actors/network/src/gossip/attestation/mod.rs b/node/actors/network/src/gossip/attestation/mod.rs new file mode 100644 index 00000000..6effd612 --- /dev/null +++ b/node/actors/network/src/gossip/attestation/mod.rs @@ -0,0 +1,368 @@ +//! Attestation. +use crate::watch::Watch; +use anyhow::Context as _; +use std::{collections::HashSet, fmt, sync::Arc}; +use zksync_concurrency::{ctx, sync}; +use zksync_consensus_roles::attester; + +mod metrics; +#[cfg(test)] +mod tests; + +/// Configuration of the attestation Controller. +/// It determines what should be attested and by whom. +#[derive(Debug, Clone, PartialEq)] +pub struct Info { + /// Batch to attest. + pub batch_to_attest: attester::Batch, + /// Committee that should attest the batch. + pub committee: Arc, +} + +// Internal attestation state: info and the set of votes collected so far. +#[derive(Clone)] +struct State { + info: Arc, + /// Votes collected so far. 
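A note on the container choice for the field below: `State` is cloned wholesale on every update/diff cycle, and `im`'s persistent maps make such clones cheap through structural sharing. A minimal illustration of that property, using the `im` crate directly:

```rust
fn main() {
    let a: im::HashMap<u32, u32> = im::HashMap::new();
    let b = a.update(1, 10); // cheap "copy": `b` shares structure with `a`
    assert!(a.get(&1).is_none()); // `a` is unchanged...
    assert_eq!(b.get(&1), Some(&10)); // ...while `b` sees the new entry
}
```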
+ votes: im::HashMap>>, + // Total weight of the votes collected. + total_weight: attester::Weight, +} + +/// Diff between 2 states. +pub(crate) struct Diff { + /// New votes. + pub(crate) votes: Vec>>, + /// New info, if changed. + pub(crate) info: Option>, +} + +impl Diff { + fn is_empty(&self) -> bool { + self.votes.is_empty() && self.info.is_none() + } +} + +impl State { + /// Returns a diff between `self` state and `old` state. + /// Diff contains votes which are present is `self`, but not in `old`. + fn diff(&self, old: &Option) -> Diff { + let Some(old) = old.as_ref() else { + return Diff { + info: Some(self.info.clone()), + votes: self.votes.values().cloned().collect(), + }; + }; + if self.info.batch_to_attest.number != old.info.batch_to_attest.number { + return Diff { + info: Some(self.info.clone()), + votes: self.votes.values().cloned().collect(), + }; + } + + Diff { + info: None, + votes: self + .votes + .iter() + .filter(|(k, _)| !old.votes.contains_key(k)) + .map(|(_, v)| v.clone()) + .collect(), + } + } + + /// Verifies and adds a vote. + /// Noop if vote is not signed by a committee member or already inserted. + /// Returns an error if genesis doesn't match or the signature is invalid. + fn insert_vote(&mut self, vote: Arc>) -> anyhow::Result<()> { + anyhow::ensure!( + vote.msg.genesis == self.info.batch_to_attest.genesis, + "Genesis mismatch" + ); + if vote.msg.number != self.info.batch_to_attest.number { + return Ok(()); + } + anyhow::ensure!( + vote.msg.hash == self.info.batch_to_attest.hash, + "batch hash mismatch" + ); + let Some(weight) = self.info.committee.weight(&vote.key) else { + anyhow::bail!( + "received vote signed by an inactive attester: {:?}", + vote.key + ); + }; + if self.votes.contains_key(&vote.key) { + return Ok(()); + } + // Verify signature only after checking all the other preconditions. + vote.verify().context("verify")?; + tracing::info!("collected vote with weight {weight} from {:?}", vote.key); + self.votes.insert(vote.key.clone(), vote); + self.total_weight += weight; + Ok(()) + } + + fn insert_votes( + &mut self, + votes: impl Iterator>>, + ) -> anyhow::Result<()> { + let mut done = HashSet::new(); + for vote in votes { + // Disallow multiple entries for the same key: + // it is important because a malicious attester may spam us with + // new versions and verifying signatures is expensive. + if done.contains(&vote.key) { + anyhow::bail!("duplicate entry for {:?}", vote.key); + } + done.insert(vote.key.clone()); + self.insert_vote(vote)?; + } + Ok(()) + } + + fn cert(&self) -> Option { + if self.total_weight < self.info.committee.threshold() { + return None; + } + let mut sigs = attester::MultiSig::default(); + for vote in self.votes.values() { + sigs.add(vote.key.clone(), vote.sig.clone()); + } + Some(attester::BatchQC { + message: self.info.batch_to_attest.clone(), + signatures: sigs, + }) + } +} + +/// Receiver of state diffs. +pub(crate) struct DiffReceiver { + prev: Option, + recv: sync::watch::Receiver>, +} + +impl DiffReceiver { + /// Waits for the next state diff. + pub(crate) async fn wait_for_diff(&mut self, ctx: &ctx::Ctx) -> ctx::OrCanceled { + loop { + let Some(new) = (*sync::changed(ctx, &mut self.recv).await?).clone() else { + continue; + }; + let diff = new.diff(&self.prev); + self.prev = Some(new); + if !diff.is_empty() { + return Ok(diff); + } + } + } +} + +/// `Controller` manages the attestation state. +/// It maintains a set of votes matching the attestation info. 
+/// It allows for:
+/// * adding votes to the state
+/// * subscribing to the vote set changes
+/// * waiting for the certificate to be collected
+///
+/// It also keeps an attester key used to sign the batch vote,
+/// whenever it belongs to the current attester committee.
+/// Signing happens automatically whenever the committee is updated.
+///
+/// Expected usage:
+/// ```
+/// let ctrl = Arc::new(attestation::Controller::new(Some(key)));
+/// // Check what is the number of the next batch to be attested in a
+/// // global attestation registry (i.e. L1 chain state).
+/// let first: attester::BatchNumber = ...;
+/// scope::run!(ctx, |ctx, s| async {
+///     // Loop starting attestation whenever the global attestation state progresses.
+///     s.spawn(async {
+///         let mut next = first;
+///         loop {
+///             // Based on the local storage, compute the next expected batch hash
+///             // and the committee that should attest it.
+///             ...
+///             let info = attestation::Info {
+///                 batch_to_attest: attester::Batch {
+///                     number: next,
+///                     ...
+///                 },
+///                 committee: ...,
+///             };
+///             ctrl.start_attestation(Arc::new(info)).await.unwrap();
+///             // Wait for the attestation to progress, by observing the
+///             // global attestation registry.
+///             next = ...;
+///         }
+///     });
+///     s.spawn(async {
+///         // Loop waiting for certificates to be collected and submitting
+///         // them to the global registry.
+///         let mut next = first;
+///         loop {
+///             if let Some(qc) = ctrl.wait_for_cert(ctx, next).await? {
+///                 // Submit the certificate to the global registry.
+///                 ...
+///             }
+///             next = next.next();
+///         }
+///     });
+///
+///     // Make the executor establish the p2p network and
+///     // collect the attestation votes.
+///     executor::Executor {
+///         ...
+///         attestation: ctrl.clone(),
+///     }.run(ctx).await
+/// })
+/// ```
+pub struct Controller {
+    /// Key to automatically vote for batches.
+    /// None, if the current node is not an attester.
+    key: Option<attester::SecretKey>,
+    /// Internal state of the controller.
+    state: Watch<Option<State>>,
+}
+
+impl fmt::Debug for Controller {
+    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+        fmt.debug_struct("Controller")
+            .field("key", &self.key)
+            .finish_non_exhaustive()
+    }
+}
+
+impl Controller {
+    /// Constructs a new `Controller`.
+    /// `key` will be used for automatically signing votes.
+    pub fn new(key: Option<attester::SecretKey>) -> Self {
+        Self {
+            key,
+            state: Watch::new(None),
+        }
+    }
+
+    /// Subscribes to state diffs.
+    pub(crate) fn subscribe(&self) -> DiffReceiver {
+        let mut recv = self.state.subscribe();
+        recv.mark_changed();
+        DiffReceiver { prev: None, recv }
+    }
+
+    /// Inserts votes into the state.
+    /// Irrelevant votes are silently ignored.
+    /// Returns an error if an invalid vote has been found.
+    /// It is possible that some votes have been added to the state
+    /// even if eventually an error was returned.
+    pub(crate) async fn insert_votes(
+        &self,
+        votes: impl Iterator<Item = Arc<attester::Signed<attester::Batch>>>,
+    ) -> anyhow::Result<()> {
+        let locked = self.state.lock().await;
+        let Some(mut state) = locked.borrow().clone() else {
+            return Ok(());
+        };
+        let before = state.total_weight;
+        let res = state.insert_votes(votes);
+        if state.total_weight > before {
+            metrics::METRICS.votes_collected.set(state.votes.len());
+            #[allow(clippy::float_arithmetic)]
+            metrics::METRICS
+                .weight_collected
+                .set(state.total_weight as f64 / state.info.committee.total_weight() as f64);
+            locked.send_replace(Some(state));
+        }
+        res
+    }
+
+    /// Returns votes matching the `want` batch.
+ pub(crate) fn votes( + &self, + want: &attester::Batch, + ) -> Vec>> { + let state = self.state.subscribe(); + let state = state.borrow(); + let Some(state) = &*state else { return vec![] }; + if &state.info.batch_to_attest != want { + return vec![]; + } + state.votes.values().cloned().collect() + } + + /// Waits for the certificate for a batch with the given number to be collected. + /// Returns None iff attestation already skipped to collecting certificate for some later batch. + pub async fn wait_for_cert( + &self, + ctx: &ctx::Ctx, + n: attester::BatchNumber, + ) -> ctx::OrCanceled> { + let recv = &mut self.state.subscribe(); + recv.mark_changed(); + loop { + let state = sync::changed(ctx, recv).await?; + let Some(state) = state.as_ref() else { + continue; + }; + if state.info.batch_to_attest.number < n { + continue; + }; + if state.info.batch_to_attest.number > n { + return Ok(None); + } + if let Some(qc) = state.cert() { + return Ok(Some(qc)); + } + } + } + + /// Updates the internal configuration to start collecting votes for a new batch. + /// Clears the votes collected for the previous info. + /// Batch number has to increase with each update. + #[tracing::instrument(name = "attestation::Controller::start_attestation", skip_all)] + pub async fn start_attestation(&self, info: Arc) -> anyhow::Result<()> { + let locked = self.state.lock().await; + let old = locked.borrow().clone(); + if let Some(old) = old.as_ref() { + if *old.info == *info { + return Ok(()); + } + anyhow::ensure!( + old.info.batch_to_attest.genesis == info.batch_to_attest.genesis, + "tried to change genesis" + ); + anyhow::ensure!( + old.info.batch_to_attest.number < info.batch_to_attest.number, + "tried to decrease batch number" + ); + } + tracing::info!( + "started collecting votes for batch {:?}", + info.batch_to_attest.number + ); + let mut new = State { + info, + votes: im::HashMap::new(), + total_weight: 0, + }; + if let Some(key) = self.key.as_ref() { + if new.info.committee.contains(&key.public()) { + let vote = key.sign_msg(new.info.batch_to_attest.clone()); + // This is our own vote, so it always should be valid. 
+ new.insert_vote(Arc::new(vote)).unwrap(); + } + } + metrics::METRICS + .batch_number + .set(new.info.batch_to_attest.number.0); + metrics::METRICS + .committee_size + .set(new.info.committee.len()); + metrics::METRICS.votes_collected.set(new.votes.len()); + #[allow(clippy::float_arithmetic)] + metrics::METRICS + .weight_collected + .set(new.total_weight as f64 / new.info.committee.total_weight() as f64); + locked.send_replace(Some(new)); + Ok(()) + } +} diff --git a/node/actors/network/src/gossip/attestation/tests.rs b/node/actors/network/src/gossip/attestation/tests.rs new file mode 100644 index 00000000..2a5976b4 --- /dev/null +++ b/node/actors/network/src/gossip/attestation/tests.rs @@ -0,0 +1,200 @@ +use super::*; +use rand::{seq::SliceRandom as _, Rng as _}; +use zksync_concurrency::testonly::abort_on_panic; + +type Vote = Arc>; + +#[derive(Default, Debug, PartialEq)] +struct Votes(im::HashMap); + +impl> From for Votes { + fn from(x: T) -> Self { + Self(x.into_iter().map(|v| (v.key.clone(), v)).collect()) + } +} + +#[tokio::test] +async fn test_insert_votes() { + abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + let rng = &mut ctx.rng(); + + let ctrl = Controller::new(None); + let genesis: attester::GenesisHash = rng.gen(); + let first: attester::BatchNumber = rng.gen(); + for i in 0..3 { + tracing::info!("iteration {i}"); + let keys: Vec = (0..8).map(|_| rng.gen()).collect(); + let info = Arc::new(Info { + batch_to_attest: attester::Batch { + genesis, + number: first + i, + hash: rng.gen(), + }, + committee: attester::Committee::new(keys.iter().map(|k| attester::WeightedAttester { + key: k.public(), + weight: 1250, + })) + .unwrap() + .into(), + }); + let ctrl_votes = || Votes::from(ctrl.votes(&info.batch_to_attest)); + ctrl.start_attestation(info.clone()).await.unwrap(); + assert_eq!(Votes::from([]), ctrl_votes()); + let mut recv = ctrl.subscribe(); + let diff = recv.wait_for_diff(ctx).await.unwrap(); + assert_eq!(diff.info.as_ref(), Some(&info)); + assert_eq!(Votes::default(), diff.votes.into()); + + let all_votes: Vec = keys + .iter() + .map(|k| k.sign_msg(info.batch_to_attest.clone()).into()) + .collect(); + + tracing::info!("Initial votes."); + ctrl.insert_votes(all_votes[0..3].iter().cloned()) + .await + .unwrap(); + assert_eq!(Votes::from(all_votes[0..3].iter().cloned()), ctrl_votes()); + let diff = recv.wait_for_diff(ctx).await.unwrap(); + assert!(diff.info.is_none()); + assert_eq!( + Votes::from(all_votes[0..3].iter().cloned()), + diff.votes.into() + ); + + tracing::info!("Adding votes gradually."); + ctrl.insert_votes(all_votes[3..5].iter().cloned()) + .await + .unwrap(); + ctrl.insert_votes(all_votes[5..7].iter().cloned()) + .await + .unwrap(); + assert_eq!(Votes::from(all_votes[0..7].iter().cloned()), ctrl_votes()); + let diff = recv.wait_for_diff(ctx).await.unwrap(); + assert!(diff.info.is_none()); + assert_eq!( + Votes::from(all_votes[3..7].iter().cloned()), + diff.votes.into() + ); + + tracing::info!("Readding already inserded votes (noop)."); + ctrl.insert_votes(all_votes[2..6].iter().cloned()) + .await + .unwrap(); + assert_eq!(Votes::from(all_votes[0..7].iter().cloned()), ctrl_votes()); + + tracing::info!("Adding votes out of committee (error)."); + assert!(ctrl + .insert_votes((0..3).map(|_| { + let k: attester::SecretKey = rng.gen(); + k.sign_msg(info.batch_to_attest.clone()).into() + })) + .await + .is_err()); + assert_eq!(Votes::from(all_votes[0..7].iter().cloned()), ctrl_votes()); + + tracing::info!("Adding votes for different batch (noop)."); 
+ ctrl.insert_votes((0..3).map(|_| { + let k: attester::SecretKey = rng.gen(); + k.sign_msg(attester::Batch { + genesis: info.batch_to_attest.genesis, + number: rng.gen(), + hash: rng.gen(), + }) + .into() + })) + .await + .unwrap(); + assert_eq!(Votes::from(all_votes[0..7].iter().cloned()), ctrl_votes()); + + tracing::info!("Adding incorrect votes (error)."); + let mut bad_vote = (*all_votes[7]).clone(); + bad_vote.sig = rng.gen(); + assert!(ctrl + .insert_votes([bad_vote.into()].into_iter()) + .await + .is_err()); + assert_eq!(Votes::from(all_votes[0..7].iter().cloned()), ctrl_votes()); + + tracing::info!("Add the last vote mixed with already added votes."); + ctrl.insert_votes(all_votes[5..].iter().cloned()) + .await + .unwrap(); + assert_eq!(Votes::from(all_votes.clone()), ctrl_votes()); + let diff = recv.wait_for_diff(ctx).await.unwrap(); + assert!(diff.info.is_none()); + assert_eq!( + Votes::from(all_votes[7..].iter().cloned()), + diff.votes.into() + ); + } +} + +#[tokio::test] +async fn test_wait_for_cert() { + abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + let rng = &mut ctx.rng(); + + let ctrl = Controller::new(None); + let genesis: attester::GenesisHash = rng.gen(); + let first: attester::BatchNumber = rng.gen(); + + for i in 0..10 { + tracing::info!("iteration {i}"); + let committee_size = rng.gen_range(1..20); + let keys: Vec = (0..committee_size).map(|_| rng.gen()).collect(); + let info = Arc::new(Info { + batch_to_attest: attester::Batch { + genesis, + number: first + i, + hash: rng.gen(), + }, + committee: attester::Committee::new(keys.iter().map(|k| attester::WeightedAttester { + key: k.public(), + weight: rng.gen_range(1..=100), + })) + .unwrap() + .into(), + }); + let mut all_votes: Vec = keys + .iter() + .map(|k| k.sign_msg(info.batch_to_attest.clone()).into()) + .collect(); + all_votes.shuffle(rng); + ctrl.start_attestation(info.clone()).await.unwrap(); + loop { + let end = rng.gen_range(0..=committee_size); + tracing::info!("end = {end}"); + ctrl.insert_votes(all_votes[..end].iter().cloned()) + .await + .unwrap(); + // Waiting for the previous qc should immediately return None. + assert_eq!( + None, + ctrl.wait_for_cert(ctx, info.batch_to_attest.number.prev().unwrap()) + .await + .unwrap() + ); + if info + .committee + .weight_of_keys(all_votes[..end].iter().map(|v| &v.key)) + >= info.committee.threshold() + { + let qc = ctrl + .wait_for_cert(ctx, info.batch_to_attest.number) + .await + .unwrap() + .unwrap(); + assert_eq!(qc.message, info.batch_to_attest); + qc.verify(genesis, &info.committee).unwrap(); + break; + } + assert_eq!( + None, + ctrl.state.subscribe().borrow().as_ref().unwrap().cert() + ); + } + } +} diff --git a/node/actors/network/src/gossip/attestation_status.rs b/node/actors/network/src/gossip/attestation_status.rs deleted file mode 100644 index b10b0208..00000000 --- a/node/actors/network/src/gossip/attestation_status.rs +++ /dev/null @@ -1,93 +0,0 @@ -use crate::watch::Watch; -use std::fmt; -use zksync_concurrency::sync; -use zksync_consensus_roles::attester; - -/// Coordinate the attestation by showing the status as seen by the main node. -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct AttestationStatus { - /// Next batch number where voting is expected. - /// - /// The field is optional so that we can start an external node without the main node API - /// already deployed, which is how the initial rollout is. - pub next_batch_to_attest: Option, - /// The hash of the genesis of the chain to which the L1 batches belong. 
- /// - /// We don't expect to handle a regenesis on the fly without restarting the - /// node, so this value is not expected to change; it's here only to stop - /// any attempt at updating the status with a batch number that refers - /// to a different fork. - pub genesis: attester::GenesisHash, -} - -/// The subscription over the attestation status which voters can monitor for change. -pub type AttestationStatusReceiver = sync::watch::Receiver; - -/// A [Watch] over an [AttestationStatus] which we can use to notify components about -/// changes in the batch number the main node expects attesters to vote on. -pub struct AttestationStatusWatch(Watch); - -impl fmt::Debug for AttestationStatusWatch { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - fmt.debug_struct("AttestationStatusWatch") - .finish_non_exhaustive() - } -} - -impl AttestationStatusWatch { - /// Create a new watch with the current genesis, and a yet-to-be-determined batch number. - pub fn new(genesis: attester::GenesisHash) -> Self { - Self(Watch::new(AttestationStatus { - genesis, - next_batch_to_attest: None, - })) - } - - /// Subscribes to AttestationStatus updates. - pub fn subscribe(&self) -> AttestationStatusReceiver { - self.0.subscribe() - } - - /// Set the next batch number to attest on and notify subscribers it changed. - /// - /// Fails if the genesis we want to update to is not the same as the watch was started with, - /// because the rest of the system is not expected to be able to handle reorgs without a - /// restart of the node. - pub async fn update( - &self, - genesis: attester::GenesisHash, - next_batch_to_attest: attester::BatchNumber, - ) -> anyhow::Result<()> { - let this = self.0.lock().await; - { - let status = this.borrow(); - anyhow::ensure!( - status.genesis == genesis, - "the attestation status genesis changed: {:?} -> {:?}", - status.genesis, - genesis - ); - // The next batch to attest moving backwards could cause the voting process - // to get stuck due to the way gossiping works and the BatchVotes discards - // votes below the expected minimum: even if we clear the votes, we might - // not get them again from any peer. By returning an error we can cause - // the node to be restarted and connections re-established for fresh gossip. - if let Some(old_batch_to_attest) = status.next_batch_to_attest { - anyhow::ensure!( - old_batch_to_attest <= next_batch_to_attest, - "next batch to attest moved backwards: {} -> {}", - old_batch_to_attest, - next_batch_to_attest - ); - } - } - this.send_if_modified(|status| { - if status.next_batch_to_attest == Some(next_batch_to_attest) { - return false; - } - status.next_batch_to_attest = Some(next_batch_to_attest); - true - }); - Ok(()) - } -} diff --git a/node/actors/network/src/gossip/batch_votes.rs b/node/actors/network/src/gossip/batch_votes.rs deleted file mode 100644 index b48fb205..00000000 --- a/node/actors/network/src/gossip/batch_votes.rs +++ /dev/null @@ -1,333 +0,0 @@ -//! Global state distributed by active attesters, observed by all the nodes in the network. 
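The bookkeeping this deleted module maintained boils down to: one active vote per attester, a per-(number, hash) weight tally, and a scan for the first quorum. Reduced to a self-contained sketch over primitive types (`u64` stands in for attester keys, batch hashes, and weights; not the real API):

```rust
use std::collections::{BTreeMap, HashMap};

#[derive(Default)]
struct Tally {
    /// Last vote per attester: key -> (batch number, batch hash).
    votes: HashMap<u64, (u64, u64)>,
    /// support[number][hash] = accumulated voting weight.
    support: BTreeMap<u64, HashMap<u64, u64>>,
}

impl Tally {
    /// Replaces the attester's previous vote first, mirroring the
    /// one-vote-per-attester DoS protection described above.
    fn add(&mut self, key: u64, number: u64, hash: u64, weight: u64) {
        if let Some((n, h)) = self.votes.remove(&key) {
            if let Some(w) = self.support.get_mut(&n).and_then(|m| m.get_mut(&h)) {
                *w = w.saturating_sub(weight);
            }
        }
        *self.support.entry(number).or_default().entry(hash).or_default() += weight;
        self.votes.insert(key, (number, hash));
    }

    /// First (number, hash) whose accumulated weight reaches the threshold.
    fn find_quorum(&self, threshold: u64) -> Option<(u64, u64)> {
        self.support.iter().find_map(|(n, m)| {
            m.iter()
                .find(|(_, w)| **w >= threshold)
                .map(|(h, _)| (*n, *h))
        })
    }
}
```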
-use super::metrics; -use crate::watch::Watch; -use std::{collections::HashSet, fmt, sync::Arc}; -use zksync_concurrency::sync; -use zksync_consensus_roles::attester; - -#[derive(Debug, Default)] -pub(super) struct BatchUpdateStats { - num_added: usize, - weight_added: u64, - last_added: Option, -} - -impl BatchUpdateStats { - fn added(&mut self, number: attester::BatchNumber, weight: u64) { - self.num_added += 1; - self.weight_added += weight; - self.last_added = Some(number); - } -} - -/// Represents the current state of node's knowledge about the attester votes. -/// -/// Eventually this data structure will have to track voting potentially happening -/// simultaneously on multiple heights, if we decrease the batch interval to be -/// several seconds, instead of a minute. By that point, the replicas should be -/// following the main node (or L1) to know what is the highest finalized batch, -/// which will act as a floor to the batch number we have to track here. It will -/// also help to protect ourselves from DoS attacks by malicious attesters casting -/// votes far into the future. -/// -/// For now, however, we just want a best effort where if we find a quorum, we -/// save it to the database, if not, we move on. For that, a simple protection -/// mechanism is to only allow one active vote per attester, which means any -/// previous vote can be removed when a new one is added. -#[derive(Clone, Default, PartialEq, Eq)] -pub(crate) struct BatchVotes { - /// The latest vote received from each attester. We only keep the last one - /// for now, hoping that with 1 minute batches there's plenty of time for - /// the quorum to be reached, but eventually we might have to allow multiple - /// votes across different heights. - pub(crate) votes: im::HashMap>>, - - /// Total weight of votes at different heights and hashes. - /// - /// We will be looking for any hash that reaches a quorum threshold at any of the heights. - /// At that point we can remove all earlier heights, considering it final. In the future - /// we can instead keep heights until they are observed on the main node (or L1). - pub(crate) support: - im::OrdMap>, - - /// The minimum batch number for which we are still interested in votes. - /// - /// Because we only store 1 vote per attester the memory is very much bounded, - /// but this extra pruning mechanism can be used to clear votes of attesters - /// who have been removed from the committee, as well as to get rid of the - /// last quorum we found and stored, and look for the a new one in the next round. - pub(crate) min_batch_number: attester::BatchNumber, -} - -impl BatchVotes { - /// Returns a set of votes of `self` which are newer than the entries in `b`. - pub(super) fn get_newer(&self, b: &Self) -> Vec>> { - let mut newer = vec![]; - for (k, v) in &self.votes { - if let Some(bv) = b.votes.get(k) { - if v.msg <= bv.msg { - continue; - } - } - newer.push(v.clone()); - } - newer - } - - /// Updates the discovery map with entries from `data`. - /// It exits as soon as an invalid entry is found. - /// `self` might get modified even if an error is returned - /// (all entries verified so far are added). - /// - /// Returns statistics about new entries added. - /// - /// For now it doesn't return an error if a vote with an invalid signature - /// is encountered, so that the node doesn't disconnect from peer if it - /// happens to have a new field in `Batch`. This is only until the feature - /// is stabilized. 
- pub(super) fn update( - &mut self, - attesters: &attester::Committee, - genesis: &attester::GenesisHash, - data: &[Arc>], - ) -> anyhow::Result { - let mut stats = BatchUpdateStats::default(); - - let mut done = HashSet::new(); - for d in data { - // Disallow multiple entries for the same key: - // it is important because a malicious attester may spam us with - // new versions and verifying signatures is expensive. - if done.contains(&d.key) { - anyhow::bail!("duplicate entry for {:?}", d.key); - } - done.insert(d.key.clone()); - - // Disallow votes from different genesis. It might indicate a reorg, - // in which case either this node or the remote peer has to be restarted. - anyhow::ensure!( - d.msg.genesis == *genesis, - "vote for batch with different genesis hash: {:?}", - d.msg.genesis - ); - - if d.msg.number < self.min_batch_number { - continue; - } - - let Some(weight) = attesters.weight(&d.key) else { - // We just skip the entries we are not interested in. - // For now the set of attesters is static, so we could treat this as an error, - // however we eventually want the attester set to be dynamic. - continue; - }; - - // If we already have a newer vote for this key, we can ignore this one. - if let Some(x) = self.votes.get(&d.key) { - if d.msg <= x.msg { - continue; - } - } - - // Check the signature before insertion. - if let Err(e) = d.verify() { - tracing::error!(error =? e, "failed to verify batch vote: {e:#}"); - } else { - self.add(d.clone(), weight); - stats.added(d.msg.number, weight); - } - } - - Ok(stats) - } - - /// Check if we have achieved quorum for any of the batch hashes. - /// - /// Returns the first quorum it finds, after which we expect that the state of the main node or L1 - /// will indicate that attestation on the next height can happen, which will either naturally move - /// the QC, or we can do so by increasing the `min_batch_number`. - /// - /// While we only store 1 vote per attester we'll only ever have at most one quorum anyway. - pub(super) fn find_quorum( - &self, - attesters: &attester::Committee, - genesis: &attester::GenesisHash, - ) -> Option { - let threshold = attesters.threshold(); - self.support - .iter() - .flat_map(|(number, candidates)| { - candidates - .iter() - .filter(|(_, weight)| **weight >= threshold) - .map(|(hash, _)| { - let sigs = self - .votes - .values() - .filter(|vote| vote.msg.hash == *hash) - .map(|vote| (vote.key.clone(), vote.sig.clone())) - .fold(attester::MultiSig::default(), |mut sigs, (key, sig)| { - sigs.add(key, sig); - sigs - }); - attester::BatchQC { - message: attester::Batch { - number: *number, - hash: *hash, - // This was checked during insertion; we could look up the first in `votes` - genesis: *genesis, - }, - signatures: sigs, - } - }) - }) - .next() - } - - /// Set the minimum batch number for which we admit votes. - /// - /// Discards data about earlier heights. - pub(super) fn set_min_batch_number(&mut self, min_batch_number: attester::BatchNumber) { - self.min_batch_number = min_batch_number; - self.votes.retain(|_, v| v.msg.number >= min_batch_number); - if let Some(prev) = min_batch_number.prev() { - self.support = self.support.split(&prev).1; - } - } - - /// Add an already validated vote from an attester into the register. 
- fn add(&mut self, vote: Arc>, weight: attester::Weight) { - self.remove(&vote.key, weight); - - let batch = self.support.entry(vote.msg.number).or_default(); - let support = batch.entry(vote.msg.hash).or_default(); - *support = support.saturating_add(weight); - - self.votes.insert(vote.key.clone(), vote); - } - - /// Remove any existing vote. - /// - /// This is for DoS protection, until we have better control over the acceptable vote range. - fn remove(&mut self, key: &attester::PublicKey, weight: attester::Weight) { - let Some(vote) = self.votes.remove(key) else { - return; - }; - - let batch = self.support.entry(vote.msg.number).or_default(); - let support = batch.entry(vote.msg.hash).or_default(); - *support = support.saturating_sub(weight); - - if *support == 0u64 { - batch.remove(&vote.msg.hash); - } - - if batch.is_empty() { - self.support.remove(&vote.msg.number); - } - } -} - -/// Watch wrapper of BatchVotes, -/// which supports subscribing to BatchVotes updates. -pub(crate) struct BatchVotesWatch(Watch); - -impl Default for BatchVotesWatch { - fn default() -> Self { - Self(Watch::new(BatchVotes::default())) - } -} - -impl BatchVotesWatch { - /// Subscribes to BatchVotes updates. - pub(crate) fn subscribe(&self) -> sync::watch::Receiver { - self.0.subscribe() - } - - /// Inserts data to BatchVotes. - /// Subscribers are notified iff at least 1 new entry has - /// been inserted. Returns an error iff an invalid - /// entry in `data` has been found. The provider of the - /// invalid entry should be banned. - pub(crate) async fn update( - &self, - attesters: &attester::Committee, - genesis: &attester::GenesisHash, - data: &[Arc>], - ) -> anyhow::Result<()> { - let this = self.0.lock().await; - let mut votes = this.borrow().clone(); - let stats = votes.update(attesters, genesis, data)?; - - if let Some(last_added) = stats.last_added { - this.send_replace(votes); - - #[allow(clippy::float_arithmetic)] - let weight_added = stats.weight_added as f64 / attesters.total_weight() as f64; - - metrics::BATCH_VOTES_METRICS - .last_added_vote_batch_number - .set(last_added.0); - - metrics::BATCH_VOTES_METRICS - .votes_added - .inc_by(stats.num_added as u64); - - metrics::BATCH_VOTES_METRICS - .weight_added - .inc_by(weight_added); - } - - metrics::BATCH_VOTES_METRICS - .committee_size - .set(attesters.len()); - - Ok(()) - } - - /// Set the minimum batch number on the votes and discard old data. - #[tracing::instrument(skip_all, fields(%min_batch_number))] - pub(crate) async fn set_min_batch_number(&self, min_batch_number: attester::BatchNumber) { - let this = self.0.lock().await; - this.send_modify(|votes| votes.set_min_batch_number(min_batch_number)); - - metrics::BATCH_VOTES_METRICS - .min_batch_number - .set(min_batch_number.0); - } -} - -/// Wrapper around [BatchVotesWatch] to publish votes over batches signed by an attester key. -pub struct BatchVotesPublisher(pub(crate) Arc); - -impl fmt::Debug for BatchVotesPublisher { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - fmt.debug_struct("BatchVotesPublisher") - .finish_non_exhaustive() - } -} - -impl BatchVotesPublisher { - /// Sign an L1 batch and push it into the batch, which should cause it to be gossiped by the network. 
- #[tracing::instrument(skip_all, fields(l1_batch = %batch.number))] - pub async fn publish( - &self, - attesters: &attester::Committee, - genesis: &attester::GenesisHash, - attester: &attester::SecretKey, - batch: attester::Batch, - ) -> anyhow::Result<()> { - if !attesters.contains(&attester.public()) { - return Ok(()); - } - let attestation = attester.sign_msg(batch); - - metrics::BATCH_VOTES_METRICS - .last_signed_batch_number - .set(attestation.msg.number.0); - - self.0 - .update(attesters, genesis, &[Arc::new(attestation)]) - .await - } -} diff --git a/node/actors/network/src/gossip/metrics.rs b/node/actors/network/src/gossip/metrics.rs deleted file mode 100644 index d9df9d0a..00000000 --- a/node/actors/network/src/gossip/metrics.rs +++ /dev/null @@ -1,37 +0,0 @@ -/// Metrics related to the gossiping of L1 batch votes. -#[derive(Debug, vise::Metrics)] -#[metrics(prefix = "network_gossip_batch_votes")] -pub(crate) struct BatchVotesMetrics { - /// Number of members in the attester committee. - pub(crate) committee_size: vise::Gauge, - - /// Number of votes added to the tally. - /// - /// Its rate of change should correlate with the attester committee size, - /// save for any new joiner casting their historic votes in a burst. - pub(crate) votes_added: vise::Counter, - - /// Weight of votes added to the tally normalized by the total committee weight. - /// - /// Its rate of change should correlate with the attester committee weight and batch production rate, - /// that is, it should go up up by 1.0 with each new batch if everyone attests. - pub(crate) weight_added: vise::Counter, - - /// The minimum batch number we still expect votes for. - /// - /// This should go up as the main node indicates the finalisation of batches, - /// or as soon as batch QCs are found and persisted. - pub(crate) min_batch_number: vise::Gauge, - - /// Batch number in the last vote added to the register. - /// - /// This should go up as L1 batches are created, save for any temporary - /// outlier from lagging attesters or ones sending votes far in the future. - pub(crate) last_added_vote_batch_number: vise::Gauge, - - /// Batch number of the last batch signed by this attester. - pub(crate) last_signed_batch_number: vise::Gauge, -} - -#[vise::register] -pub(super) static BATCH_VOTES_METRICS: vise::Global = vise::Global::new(); diff --git a/node/actors/network/src/gossip/mod.rs b/node/actors/network/src/gossip/mod.rs index 7d22062d..52512425 100644 --- a/node/actors/network/src/gossip/mod.rs +++ b/node/actors/network/src/gossip/mod.rs @@ -13,26 +13,19 @@ //! Static connections constitute a rigid "backbone" of the gossip network, which is insensitive to //! eclipse attack. Dynamic connections are supposed to improve the properties of the gossip //! network graph (minimize its diameter, increase connectedness). 
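The backbone/dynamic split described above maps directly onto the gossip configuration. A sketch, assuming the field names of this crate's `GossipConfig` (`key`, `dynamic_inbound_limit`, `static_inbound`, `static_outbound`); the address and limit values are illustrative:

```rust
use std::collections::{HashMap, HashSet};
use zksync_concurrency::net;
use zksync_consensus_network::GossipConfig;
use zksync_consensus_roles::node;

fn example_gossip_config(key: node::SecretKey, peer: node::PublicKey) -> GossipConfig {
    GossipConfig {
        key,
        // Dynamic inbound peers, accepted up to a limit:
        // improves the connectivity of the gossip graph.
        dynamic_inbound_limit: 10,
        // Static peers: the rigid, eclipse-resistant backbone.
        static_inbound: HashSet::from([peer.clone()]),
        static_outbound: HashMap::from([(peer, net::Host("node1.example.org:3054".into()))]),
    }
}
```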
-use self::batch_votes::BatchVotesWatch; -pub use self::{ - attestation_status::{AttestationStatus, AttestationStatusReceiver, AttestationStatusWatch}, - batch_votes::BatchVotesPublisher, -}; use crate::{gossip::ValidatorAddrsWatch, io, pool::PoolWatch, Config, MeteredStreamStats}; use fetch::RequestItem; use std::sync::{atomic::AtomicUsize, Arc}; use tracing::Instrument; pub(crate) use validator_addrs::*; -use zksync_concurrency::{ctx, ctx::channel, error::Wrap as _, scope, sync}; +use zksync_concurrency::{ctx, ctx::channel, scope, sync}; use zksync_consensus_roles::{node, validator}; use zksync_consensus_storage::{BatchStore, BlockStore}; -mod attestation_status; -mod batch_votes; +pub mod attestation; mod fetch; mod handshake; pub mod loadtest; -mod metrics; mod runner; #[cfg(test)] mod testonly; @@ -50,8 +43,6 @@ pub(crate) struct Network { pub(crate) outbound: PoolWatch>, /// Current state of knowledge about validators' endpoints. pub(crate) validator_addrs: ValidatorAddrsWatch, - /// Current state of knowledge about batch votes. - pub(crate) batch_votes: Arc, /// Block store to serve `get_block` requests from. pub(crate) block_store: Arc, /// Batch store to serve `get_batch` requests from. @@ -64,8 +55,10 @@ pub(crate) struct Network { pub(crate) fetch_queue: fetch::Queue, /// TESTONLY: how many time push_validator_addrs rpc was called by the peers. pub(crate) push_validator_addrs_calls: AtomicUsize, - /// Shared watch over the current attestation status as indicated by the main node. - pub(crate) attestation_status: Arc, + /// Attestation controller, maintaining a set of batch votes. + /// Gossip network exchanges the votes with peers. + /// The batch for which the votes are collected is configured externally. + pub(crate) attestation: Arc, } impl Network { @@ -75,7 +68,7 @@ impl Network { block_store: Arc, batch_store: Arc, sender: channel::UnboundedSender, - attestation_status: Arc, + attestation: Arc, ) -> Arc { Arc::new(Self { sender, @@ -85,13 +78,12 @@ impl Network { ), outbound: PoolWatch::new(cfg.gossip.static_outbound.keys().cloned().collect(), 0), validator_addrs: ValidatorAddrsWatch::default(), - batch_votes: Arc::new(BatchVotesWatch::default()), cfg, fetch_queue: fetch::Queue::default(), block_store, batch_store, push_validator_addrs_calls: 0.into(), - attestation_status, + attestation, }) } @@ -173,57 +165,4 @@ impl Network { }) .await; } - - /// Task that reacts to new votes being added and looks for an L1 batch QC. - /// It persists the certificate once the quorum threshold is passed. - pub(crate) async fn run_batch_qc_finder(&self, ctx: &ctx::Ctx) -> ctx::Result<()> { - let Some(attesters) = self.genesis().attesters.as_ref() else { - tracing::info!("no attesters in genesis, not looking for batch QCs"); - return Ok(()); - }; - let genesis = self.genesis().hash(); - - let mut recv_votes = self.batch_votes.subscribe(); - let mut recv_status = self.attestation_status.subscribe(); - - // Subscribe starts as seen but we don't want to miss the first item. - recv_status.mark_changed(); - - loop { - async { - // Wait until the status indicates that we're ready to sign the next batch. - let Some(batch_number) = sync::changed(ctx, &mut recv_status) - .instrument(tracing::info_span!("wait_for_attestation_status")) - .await? - .next_batch_to_attest - else { - return Ok(()); - }; - - // Get rid of all previous votes. We don't expect this to go backwards without regenesis, which will involve a restart. 
- self.batch_votes.set_min_batch_number(batch_number).await; - - // Now wait until we find the next quorum, whatever it is: - // * on the main node, if attesters are honest, they will vote on the next batch number and the main node will not see gaps - // * on external nodes the votes might be affected by changes in the value returned by the API, and there might be gaps - // What is important, though, is that the batch number does not move backwards while we look for a quorum, because attesters - // (re)casting earlier votes will go ignored by those fixed on a higher min_batch_number, and gossip will only be attempted once. - // The possibility of this will be fixed by deterministally picking a start batch number based on fork indicated by genesis. - let qc = sync::wait_for_some(ctx, &mut recv_votes, |votes| { - votes.find_quorum(attesters, &genesis) - }) - .instrument(tracing::info_span!("wait_for_quorum")) - .await?; - - self.batch_store - .persist_batch_qc(ctx, qc) - .await - .wrap("persist_batch_qc")?; - - ctx::Ok(()) - } - .instrument(tracing::info_span!("new_votes_iter")) - .await?; - } - } } diff --git a/node/actors/network/src/gossip/runner.rs b/node/actors/network/src/gossip/runner.rs index effd0466..23ccf009 100644 --- a/node/actors/network/src/gossip/runner.rs +++ b/node/actors/network/src/gossip/runner.rs @@ -1,4 +1,4 @@ -use super::{batch_votes::BatchVotes, handshake, Network, ValidatorAddrs}; +use super::{handshake, Network, ValidatorAddrs}; use crate::{noise, preface, rpc}; use anyhow::Context as _; use async_trait::async_trait; @@ -13,7 +13,6 @@ use zksync_protobuf::kB; struct PushServer<'a> { blocks: sync::watch::Sender, batches: sync::watch::Sender, - /// The network is required for the verification of messages. net: &'a Network, } @@ -58,21 +57,34 @@ impl rpc::Handler for &PushServer<'_> { #[async_trait::async_trait] impl rpc::Handler for &PushServer<'_> { - /// Here we bound the buffering of incoming batch messages. fn max_req_size(&self) -> usize { 100 * kB } - async fn handle(&self, _ctx: &ctx::Ctx, req: rpc::push_batch_votes::Req) -> anyhow::Result<()> { - self.net - .batch_votes - .update( - self.net.genesis().attesters.as_ref().context("attesters")?, - &self.net.genesis().hash(), - &req.0, - ) - .await?; - Ok(()) + async fn handle( + &self, + _ctx: &ctx::Ctx, + req: rpc::push_batch_votes::Req, + ) -> anyhow::Result { + if let Err(err) = self + .net + .attestation + .insert_votes(req.votes.into_iter()) + .await + .context("insert_votes()") + { + // Attestation feature is still evolving, so for forward + // compatibility we just ignore any invalid data. + // Once stabilized we will drop the connection instead of + // logging the error. 
+ tracing::warn!("{err:#}"); + } + Ok(rpc::push_batch_votes::Resp { + votes: match req.want_votes_for.as_ref() { + Some(batch) => self.net.attestation.votes(batch), + None => vec![], + }, + }) } } @@ -156,6 +168,8 @@ impl Network { rpc::Client::::new(ctx, self.cfg.rpc.get_block_rate); let get_batch_client = rpc::Client::::new(ctx, self.cfg.rpc.get_batch_rate); + let push_batch_votes_client = + rpc::Client::::new(ctx, self.cfg.rpc.push_batch_votes_rate); scope::run!(ctx, |ctx, s| async { let mut service = rpc::Service::new() @@ -181,42 +195,51 @@ impl Network { .add_server(ctx, &*self.block_store, self.cfg.rpc.get_block_rate) .add_client(&get_batch_client) .add_server(ctx, &*self.batch_store, self.cfg.rpc.get_batch_rate) - .add_server(ctx, rpc::ping::Server, rpc::ping::RATE); - - // If there is an attester committee then - if self.genesis().attesters.as_ref().is_some() { - let push_batch_votes_client = rpc::Client::::new( + .add_server(ctx, rpc::ping::Server, rpc::ping::RATE) + .add_client(&push_batch_votes_client) + .add_server::( ctx, + &push_server, self.cfg.rpc.push_batch_votes_rate, ); - service = service - .add_client(&push_batch_votes_client) - .add_server::( - ctx, - &push_server, - self.cfg.rpc.push_batch_votes_rate, - ); - // Push L1 batch votes updates to peer. - s.spawn::<()>(async { - let push_batch_votes_client = push_batch_votes_client; - // Snapshot of the batches when we last pushed to the peer. - let mut old = BatchVotes::default(); - // Subscribe to what we know about the state of the whole network. - let mut sub = self.batch_votes.subscribe(); - sub.mark_changed(); - loop { - let new = sync::changed(ctx, &mut sub).await?.clone(); - // Get the *new* votes, which haven't been pushed before. - let diff = new.get_newer(&old); - if diff.is_empty() { - continue; + + // Push L1 batch votes updates to peer. + s.spawn::<()>(async { + let push_batch_votes_client = push_batch_votes_client; + // Subscribe to what we know about the state of the whole network. + let mut recv = self.attestation.subscribe(); + loop { + let diff = recv.wait_for_diff(ctx).await?; + let req = rpc::push_batch_votes::Req { + // If the info has changed, we need to re-request all the votes + // from peer that we might have ignored earlier. + want_votes_for: diff.info.as_ref().map(|c| c.batch_to_attest.clone()), + votes: diff.votes, + }; + // NOTE: The response should be non-empty only iff we requested a snapshot. + // Therefore, if we needed we could restrict the response size to ~1kB in + // such a case. + let resp = push_batch_votes_client.call(ctx, &req, 100 * kB).await?; + if !resp.votes.is_empty() { + anyhow::ensure!( + req.want_votes_for.is_some(), + "expected empty response, but votes were returned" + ); + if let Err(err) = self + .attestation + .insert_votes(resp.votes.into_iter()) + .await + .context("insert_votes") + { + // Attestation feature is still evolving, so for forward + // compatibility we just ignore any invalid data. + // Once stabilized we will drop the connection instead of + // logging the error. 
+ tracing::warn!("{err:#}"); } - old = new; - let req = rpc::push_batch_votes::Req(diff); - push_batch_votes_client.call(ctx, &req, kB).await?; } - }); - } + } + }); if let Some(ping_timeout) = &self.cfg.ping_timeout { let ping_client = rpc::Client::::new(ctx, rpc::ping::RATE); diff --git a/node/actors/network/src/gossip/testonly.rs b/node/actors/network/src/gossip/testonly.rs index d37419c0..114a838e 100644 --- a/node/actors/network/src/gossip/testonly.rs +++ b/node/actors/network/src/gossip/testonly.rs @@ -28,7 +28,7 @@ impl ConnRunner { fn mux_entry(ctx: &ctx::Ctx) -> (mux::CapabilityId, Arc) { ( - R::CAPABILITY_ID, + R::CAPABILITY.id(), mux::StreamQueue::new(ctx, R::INFLIGHT, limiter::Rate::INF), ) } @@ -89,7 +89,7 @@ impl Conn { ) -> ctx::OrCanceled> { Ok(ServerStream( self.connect - .get(&R::CAPABILITY_ID) + .get(&R::CAPABILITY.id()) .unwrap() .open(ctx) .await?, @@ -104,7 +104,7 @@ impl Conn { ) -> ctx::OrCanceled> { Ok(ClientStream( self.accept - .get(&R::CAPABILITY_ID) + .get(&R::CAPABILITY.id()) .unwrap() .open(ctx) .await?, diff --git a/node/actors/network/src/gossip/tests/fetch_batches.rs b/node/actors/network/src/gossip/tests/fetch_batches.rs index 12b8e840..eb169278 100644 --- a/node/actors/network/src/gossip/tests/fetch_batches.rs +++ b/node/actors/network/src/gossip/tests/fetch_batches.rs @@ -17,7 +17,6 @@ async fn test_simple() { cfg.rpc.push_batch_store_state_rate = limiter::Rate::INF; cfg.rpc.get_batch_rate = limiter::Rate::INF; cfg.rpc.get_batch_timeout = None; - cfg.attester_key = None; cfg.validator_key = None; scope::run!(ctx, |ctx, s| async { @@ -127,7 +126,6 @@ async fn test_concurrent_requests() { cfg.rpc.push_batch_store_state_rate = limiter::Rate::INF; cfg.rpc.get_batch_rate = limiter::Rate::INF; cfg.rpc.get_batch_timeout = None; - cfg.attester_key = None; cfg.validator_key = None; scope::run!(ctx, |ctx, s| async { @@ -204,7 +202,6 @@ async fn test_bad_responses() { cfg.rpc.get_batch_rate = limiter::Rate::INF; cfg.rpc.get_batch_timeout = None; cfg.validator_key = None; - cfg.attester_key = None; scope::run!(ctx, |ctx, s| async { let store = TestMemoryStorage::new(ctx, &setup.genesis).await; @@ -281,7 +278,6 @@ async fn test_retry() { cfg.rpc.get_batch_rate = limiter::Rate::INF; cfg.rpc.get_batch_timeout = None; cfg.validator_key = None; - cfg.attester_key = None; scope::run!(ctx, |ctx, s| async { let store = TestMemoryStorage::new(ctx, &setup.genesis).await; diff --git a/node/actors/network/src/gossip/tests/mod.rs b/node/actors/network/src/gossip/tests/mod.rs index 942a7ec3..7ab311bb 100644 --- a/node/actors/network/src/gossip/tests/mod.rs +++ b/node/actors/network/src/gossip/tests/mod.rs @@ -1,10 +1,6 @@ use super::ValidatorAddrs; use crate::{ - gossip::{ - batch_votes::{BatchVotes, BatchVotesWatch}, - handshake, - validator_addrs::ValidatorAddrsWatch, - }, + gossip::{attestation, handshake, validator_addrs::ValidatorAddrsWatch}, metrics, preface, rpc, testonly, }; use anyhow::Context as _; @@ -19,12 +15,12 @@ use tracing::Instrument as _; use zksync_concurrency::{ ctx, error::Wrap as _, - net, scope, sync, + limiter, net, scope, sync, testonly::{abort_on_panic, set_timeout}, time, }; use zksync_consensus_roles::{attester, validator}; -use zksync_consensus_storage::{testonly::TestMemoryStorage, PersistentBatchStore}; +use zksync_consensus_storage::testonly::TestMemoryStorage; mod fetch_batches; mod fetch_blocks; @@ -96,9 +92,6 @@ fn mk_version(rng: &mut R) -> u64 { #[derive(Default)] struct View(im::HashMap>>); -#[derive(Default)] -struct 
Signatures(im::HashMap>>); - fn mk_netaddr( key: &validator::SecretKey, addr: std::net::SocketAddr, @@ -112,19 +105,6 @@ fn mk_netaddr( }) } -fn mk_batch( - rng: &mut R, - key: &attester::SecretKey, - number: attester::BatchNumber, - genesis: attester::GenesisHash, -) -> attester::Signed { - key.sign_msg(attester::Batch { - number, - hash: rng.gen(), - genesis, - }) -} - fn random_netaddr( rng: &mut R, key: &validator::SecretKey, @@ -137,19 +117,6 @@ fn random_netaddr( )) } -fn random_batch_vote( - rng: &mut R, - key: &attester::SecretKey, - genesis: attester::GenesisHash, -) -> Arc> { - let batch = attester::Batch { - number: attester::BatchNumber(rng.gen_range(0..1000)), - hash: rng.gen(), - genesis, - }; - Arc::new(key.sign_msg(batch.to_owned())) -} - fn update_netaddr( rng: &mut R, addr: &validator::NetAddress, @@ -165,20 +132,6 @@ fn update_netaddr( )) } -fn update_signature( - rng: &mut R, - batch: &attester::Batch, - key: &attester::SecretKey, - batch_number_diff: i64, -) -> Arc> { - let batch = attester::Batch { - number: attester::BatchNumber((batch.number.0 as i64 + batch_number_diff) as u64), - hash: rng.gen(), - genesis: batch.genesis, - }; - Arc::new(key.sign_msg(batch.to_owned())) -} - impl View { fn insert(&mut self, entry: Arc>) { self.0.insert(entry.key.clone(), entry); @@ -193,20 +146,6 @@ impl View { } } -impl Signatures { - fn insert(&mut self, entry: Arc>) { - self.0.insert(entry.key.clone(), entry); - } - - fn get(&mut self, key: &attester::SecretKey) -> Arc> { - self.0.get(&key.public()).unwrap().clone() - } - - fn as_vec(&self) -> Vec>> { - self.0.values().cloned().collect() - } -} - #[tokio::test] async fn test_validator_addrs() { abort_on_panic(); @@ -545,230 +484,105 @@ async fn rate_limiting() { } } -#[tokio::test] -async fn test_batch_votes() { - abort_on_panic(); - let rng = &mut ctx::test_root(&ctx::RealClock).rng(); - - let keys: Vec = (0..8).map(|_| rng.gen()).collect(); - let attesters = attester::Committee::new(keys.iter().map(|k| attester::WeightedAttester { - key: k.public(), - weight: 1250, - })) - .unwrap(); - let votes = BatchVotesWatch::default(); - let mut sub = votes.subscribe(); - - let genesis = rng.gen::(); - - // Initial values. - let mut want = Signatures::default(); - for k in &keys[0..6] { - want.insert(random_batch_vote(rng, k, genesis)); - } - votes - .update(&attesters, &genesis, &want.as_vec()) - .await - .unwrap(); - assert_eq!(want.0, sub.borrow_and_update().votes); - - // newer batch number, should be updated - let k0v2 = update_signature(rng, &want.get(&keys[0]).msg, &keys[0], 1); - // same batch number, should be ignored - let k1v2 = update_signature(rng, &want.get(&keys[1]).msg, &keys[1], 0); - // older batch number, should be ignored - let k4v2 = update_signature(rng, &want.get(&keys[4]).msg, &keys[4], -1); - // first entry for a key in the config, should be inserted - let k6v1 = random_batch_vote(rng, &keys[6], genesis); - // entry for a key outside of the config, should be ignored - let k8 = rng.gen(); - let k8v1 = random_batch_vote(rng, &k8, genesis); - - // Update the ones we expect to succeed - want.insert(k0v2.clone()); - want.insert(k6v1.clone()); - - // Send all of them to the votes - let update = [ - k0v2, - k1v2, - k4v2, - // no new entry for keys[5] - k6v1, - // no entry at all for keys[7] - k8v1.clone(), - ]; - votes.update(&attesters, &genesis, &update).await.unwrap(); - assert_eq!(want.0, sub.borrow_and_update().votes); - - // Invalid signature, should be ignored. 
- let mut k0v3 = mk_batch( - rng, - &keys[1], - attester::BatchNumber(want.get(&keys[0]).msg.number.0 + 1), - genesis, - ); - k0v3.key = keys[0].public(); - assert!(votes - .update(&attesters, &genesis, &[Arc::new(k0v3)]) - .await - .is_ok()); - assert_eq!(want.0, sub.borrow_and_update().votes); - - // Invalid genesis, should be rejected. - let other_genesis = rng.gen(); - let k1v3 = mk_batch( - rng, - &keys[1], - attester::BatchNumber(want.get(&keys[1]).msg.number.0 + 1), - other_genesis, - ); - assert!(votes - .update(&attesters, &genesis, &[Arc::new(k1v3)]) - .await - .is_err()); - - assert_eq!(want.0, sub.borrow_and_update().votes); - - // Duplicate entry in the update, should be rejected. - assert!(votes - .update(&attesters, &genesis, &[k8v1.clone(), k8v1]) - .await - .is_err()); - assert_eq!(want.0, sub.borrow_and_update().votes); -} - -#[test] -fn test_batch_votes_quorum() { - abort_on_panic(); - let rng = &mut ctx::test_root(&ctx::RealClock).rng(); - - for _ in 0..10 { - let committee_size = rng.gen_range(1..20); - let keys: Vec = (0..committee_size).map(|_| rng.gen()).collect(); - let attesters = attester::Committee::new(keys.iter().map(|k| attester::WeightedAttester { - key: k.public(), - weight: rng.gen_range(1..=100), - })) - .unwrap(); - - let batch0 = rng.gen::(); - let batch1 = attester::Batch { - number: batch0.number.next(), - hash: rng.gen(), - genesis: batch0.genesis, - }; - let genesis = batch0.genesis; - let mut batches = [(batch0, 0u64), (batch1, 0u64)]; - - let mut votes = BatchVotes::default(); - for sk in &keys { - // We need 4/5+1 for quorum, so let's say ~80% vote on the second batch. - let b = usize::from(rng.gen_range(0..100) < 80); - let batch = &batches[b].0; - let vote = sk.sign_msg(batch.clone()); - votes - .update(&attesters, &genesis, &[Arc::new(vote)]) - .unwrap(); - batches[b].1 += attesters.weight(&sk.public()).unwrap(); - - // Check that as soon as we have quorum it's found. - if batches[b].1 >= attesters.threshold() { - let qc = votes - .find_quorum(&attesters, &genesis) - .expect("should find quorum"); - assert!(qc.message == *batch); - assert!(qc.signatures.keys().count() > 0); - } - } - - // Check that if there was no quoroum then we don't find any. - if !batches.iter().any(|b| b.1 >= attesters.threshold()) { - assert!(votes.find_quorum(&attesters, &genesis).is_none()); - } - - // Check that the minimum batch number prunes data. - let last_batch = batches[1].0.number; - - votes.set_min_batch_number(last_batch); - assert!(votes.votes.values().all(|v| v.msg.number >= last_batch)); - assert!(votes.support.keys().all(|n| *n >= last_batch)); - - votes.set_min_batch_number(last_batch.next()); - assert!(votes.votes.is_empty()); - assert!(votes.support.is_empty()); - } -} - -// #[tokio::test(flavor = "multi_thread")] async fn test_batch_votes_propagation() { - let _guard = set_timeout(time::Duration::seconds(10)); abort_on_panic(); + let _guard = set_timeout(time::Duration::seconds(10)); let ctx = &ctx::test_root(&ctx::AffineClock::new(10.)); let rng = &mut ctx.rng(); - let mut setup = validator::testonly::Setup::new(rng, 10); + let setup = validator::testonly::Setup::new(rng, 10); + let cfgs = testonly::new_configs(rng, &setup, 2); - let cfgs = testonly::new_configs(rng, &setup, 1); + // Fixed attestation schedule. 
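+    // Each round of the schedule pins both the batch to attest and the
+    // committee expected to sign it. This is the "dynamic committee" in
+    // action: the committee may change from one attested batch to the
+    // next, and votes are only collected for the currently configured batch.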
+ let first: attester::BatchNumber = rng.gen(); + let schedule: Vec<_> = (0..10) + .map(|r| { + Arc::new(attestation::Info { + batch_to_attest: attester::Batch { + genesis: setup.genesis.hash(), + number: first + r, + hash: rng.gen(), + }, + committee: { + // We select a random subset here. It would be incorrect to choose an empty subset, but + // the chances of that are negligible. + let subset: Vec<_> = setup.attester_keys.iter().filter(|_| rng.gen()).collect(); + attester::Committee::new(subset.iter().map(|k| attester::WeightedAttester { + key: k.public(), + weight: rng.gen_range(5..10), + })) + .unwrap() + .into() + }, + }) + }) + .collect(); + + // Round of the schedule that nodes should collect the votes for. + let round = sync::watch::channel(0).0; scope::run!(ctx, |ctx, s| async { - // All nodes share the same in-memory store + // All nodes share the same store - store is not used in this test anyway. let store = TestMemoryStorage::new(ctx, &setup.genesis).await; s.spawn_bg(store.runner.run(ctx)); - // Push the first batch into the store so we have something to vote on. - setup.push_blocks(rng, 2); - setup.push_batch(rng); - - let batch = setup.batches[0].clone(); - - store - .batches - .queue_batch(ctx, batch.clone(), setup.genesis.clone()) - .await - .unwrap(); - - let batch = attester::Batch { - number: batch.number, - hash: rng.gen(), - genesis: setup.genesis.hash(), - }; - // Start all nodes. - let nodes: Vec = cfgs - .iter() - .enumerate() - .map(|(i, cfg)| { - let (node, runner) = testonly::Instance::new( - cfg.clone(), - store.blocks.clone(), - store.batches.clone(), - ); - s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node", i))); - node - }) - .collect(); - - // Cast votes on each node. It doesn't matter who signs where in this test, - // we just happen to know that we'll have as many nodes as attesters. - let attesters = setup.genesis.attesters.as_ref().unwrap(); - for (node, key) in nodes.iter().zip(setup.attester_keys.iter()) { - node.net - .batch_vote_publisher() - .publish(attesters, &setup.genesis.hash(), key, batch.clone()) - .await - .unwrap(); - } - - // Wait until one of the nodes collects a quorum over gossip and stores. - loop { - if let Some(qc) = store.im_batches.get_batch_qc(ctx, batch.number).await? { - assert_eq!(qc.message, batch); - return Ok(()); - } - ctx.sleep(time::Duration::milliseconds(100)).await?; + for (i, mut cfg) in cfgs.into_iter().enumerate() { + cfg.rpc.push_batch_votes_rate = limiter::Rate::INF; + cfg.validator_key = None; + let (node, runner) = testonly::Instance::new_from_config(testonly::InstanceConfig { + cfg: cfg.clone(), + block_store: store.blocks.clone(), + batch_store: store.batches.clone(), + attestation: attestation::Controller::new(Some(setup.attester_keys[i].clone())) + .into(), + }); + s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node", i))); + // Task going through the schedule, waiting for ANY node to collect the certificate + // to advance to the next round of the schedule. + // Test succeeds if all tasks successfully iterate through the whole schedule. + s.spawn( + async { + let node = node; + let sub = &mut round.subscribe(); + sub.mark_changed(); + loop { + let r = ctx::NoCopy(*sync::changed(ctx, sub).await?); + tracing::info!("starting round {}", *r); + let Some(cfg) = schedule.get(*r) else { + return Ok(()); + }; + let attestation = node.net.gossip.attestation.clone(); + attestation.start_attestation(cfg.clone()).await.unwrap(); + // Wait for the certificate in the background. 
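+                        // The certificate is awaited in the background so this
+                        // node keeps gossiping votes for the current round while
+                        // peers catch up; whichever node finds the quorum first
+                        // advances the shared `round` watch for all tasks.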
+ s.spawn_bg(async { + let r = r; + let attestation = attestation; + let Ok(Some(qc)) = attestation + .wait_for_cert(ctx, cfg.batch_to_attest.number) + .await + else { + return Ok(()); + }; + assert_eq!(qc.message, cfg.batch_to_attest); + qc.verify(setup.genesis.hash(), &cfg.committee).unwrap(); + tracing::info!("got cert for {}", *r); + round.send_if_modified(|round| { + if *round != *r { + return false; + } + *round = *r + 1; + true + }); + Ok(()) + }); + } + } + .instrument(tracing::info_span!("attester", i)), + ); } + Ok(()) }) .await .unwrap(); diff --git a/node/actors/network/src/gossip/tests/syncing.rs b/node/actors/network/src/gossip/tests/syncing.rs index b456350f..1bd51446 100644 --- a/node/actors/network/src/gossip/tests/syncing.rs +++ b/node/actors/network/src/gossip/tests/syncing.rs @@ -327,7 +327,6 @@ async fn coordinated_batch_syncing(node_count: usize, gossip_peers: usize) { cfg.rpc.get_batch_rate = limiter::Rate::INF; cfg.rpc.get_batch_timeout = None; cfg.validator_key = None; - cfg.attester_key = None; let store = TestMemoryStorage::new(ctx, &setup.genesis).await; s.spawn_bg(store.runner.run(ctx)); let (node, runner) = testonly::Instance::new(cfg, store.blocks, store.batches); @@ -384,7 +383,6 @@ async fn uncoordinated_batch_syncing( cfg.rpc.get_batch_rate = limiter::Rate::INF; cfg.rpc.get_batch_timeout = None; cfg.validator_key = None; - cfg.attester_key = None; let store = TestMemoryStorage::new(ctx, &setup.genesis).await; s.spawn_bg(store.runner.run(ctx)); let (node, runner) = testonly::Instance::new(cfg, store.blocks, store.batches); diff --git a/node/actors/network/src/lib.rs b/node/actors/network/src/lib.rs index 1d5f3cbd..1e7874d2 100644 --- a/node/actors/network/src/lib.rs +++ b/node/actors/network/src/lib.rs @@ -1,6 +1,6 @@ //! Network actor maintaining a pool of outbound and inbound connections to other nodes. use anyhow::Context as _; -use gossip::{AttestationStatusWatch, BatchVotesPublisher}; +use gossip::attestation; use std::sync::Arc; use tracing::Instrument as _; use zksync_concurrency::{ @@ -57,10 +57,9 @@ impl Network { block_store: Arc, batch_store: Arc, pipe: ActorPipe, - attestation_status: Arc, + attestation: Arc, ) -> (Arc, Runner) { - let gossip = - gossip::Network::new(cfg, block_store, batch_store, pipe.send, attestation_status); + let gossip = gossip::Network::new(cfg, block_store, batch_store, pipe.send, attestation); let consensus = consensus::Network::new(gossip.clone()); let net = Arc::new(Self { gossip, consensus }); ( @@ -77,11 +76,6 @@ impl Network { metrics::NetworkGauges::register(Arc::downgrade(self)); } - /// Create a batch vote publisher to push attestations to gossip. - pub fn batch_vote_publisher(&self) -> BatchVotesPublisher { - BatchVotesPublisher(self.gossip.batch_votes.clone()) - } - /// Handles a dispatcher message. async fn handle_message( &self, @@ -133,9 +127,6 @@ impl Runner { Ok(()) }); - // Update QC batches in the background. - s.spawn(self.net.gossip.run_batch_qc_finder(ctx)); - // Fetch missing batches in the background. 
s.spawn(async { self.net.gossip.run_batch_fetcher(ctx).await; @@ -175,8 +166,6 @@ impl Runner { } } - // TODO: check if we are active attester to get new L1 Batches, sign them and broadcast the signature - let accept_limiter = limiter::Limiter::new(ctx, self.net.gossip.cfg.tcp_accept_rate); loop { accept_limiter.acquire(ctx, 1).await?; diff --git a/node/actors/network/src/proto/gossip.proto b/node/actors/network/src/proto/gossip.proto index 33e3d16a..8f8c1327 100644 --- a/node/actors/network/src/proto/gossip.proto +++ b/node/actors/network/src/proto/gossip.proto @@ -19,10 +19,18 @@ message PushValidatorAddrs { } message PushBatchVotes { + // Requesting the peer to respond with votes for the batch. + optional roles.attester.Batch want_votes_for = 2; // optional // Signed roles.validator.Msg.votes repeated roles.attester.Signed votes = 1; } +message PushBatchVotesResp { + // Signed roles.validator.Msg.votes + // Empty if want_votes_for in request was not set. + repeated roles.attester.Signed votes = 1; +} + // State of the local block store. // A node is expected to store a continuous range of blocks at all times // and actively fetch newest blocks. diff --git a/node/actors/network/src/proto/rpc.proto b/node/actors/network/src/proto/rpc.proto new file mode 100644 index 00000000..503dbe00 --- /dev/null +++ b/node/actors/network/src/proto/rpc.proto @@ -0,0 +1,15 @@ +syntax = "proto3"; + +package zksync.network.rpc; + +enum Capability { + reserved 5; + CONSENSUS = 0; + PING = 2; + PUSH_VALIDATOR_ADDRS = 1; + PUSH_BLOCK_STORE_STATE = 3; + GET_BLOCK = 4; + PUSH_BATCH_VOTES = 8; + PUSH_BATCH_STORE_STATE = 7; + GET_BATCH = 6; +} diff --git a/node/actors/network/src/rpc/consensus.rs b/node/actors/network/src/rpc/consensus.rs index 40082c2a..40cd6196 100644 --- a/node/actors/network/src/rpc/consensus.rs +++ b/node/actors/network/src/rpc/consensus.rs @@ -1,5 +1,6 @@ //! Defines RPC for passing consensus messages. -use crate::{mux, proto::consensus as proto}; +use super::Capability; +use crate::proto::consensus as proto; use zksync_consensus_roles::validator; use zksync_protobuf::{read_required, ProtoFmt}; @@ -7,7 +8,7 @@ use zksync_protobuf::{read_required, ProtoFmt}; pub(crate) struct Rpc; impl super::Rpc for Rpc { - const CAPABILITY_ID: mux::CapabilityId = 0; + const CAPABILITY: Capability = Capability::Consensus; const INFLIGHT: u32 = 3; const METHOD: &'static str = "consensus"; type Req = Req; diff --git a/node/actors/network/src/rpc/get_batch.rs b/node/actors/network/src/rpc/get_batch.rs index 8c83e451..18c53ca4 100644 --- a/node/actors/network/src/rpc/get_batch.rs +++ b/node/actors/network/src/rpc/get_batch.rs @@ -1,5 +1,6 @@ //! RPC for fetching a batch from peer. -use crate::{mux, proto::gossip as proto}; +use super::Capability; +use crate::proto::gossip as proto; use anyhow::Context; use zksync_consensus_roles::attester; use zksync_protobuf::{read_optional, ProtoFmt}; @@ -10,7 +11,7 @@ pub(crate) struct Rpc; // TODO: determine more precise `INFLIGHT` / `RATE` values as a result of load testing impl super::Rpc for Rpc { - const CAPABILITY_ID: mux::CapabilityId = 6; + const CAPABILITY: Capability = Capability::GetBatch; const INFLIGHT: u32 = 5; const METHOD: &'static str = "get_batch"; diff --git a/node/actors/network/src/rpc/get_block.rs b/node/actors/network/src/rpc/get_block.rs index 45b208d0..84474f7a 100644 --- a/node/actors/network/src/rpc/get_block.rs +++ b/node/actors/network/src/rpc/get_block.rs @@ -1,5 +1,6 @@ //! RPC for fetching a block from peer. 
-use crate::{mux, proto::gossip as proto};
+use super::Capability;
+use crate::proto::gossip as proto;
 use anyhow::Context;
 use zksync_consensus_roles::validator::{BlockNumber, FinalBlock};
 use zksync_protobuf::{read_optional, ProtoFmt};
@@ -10,7 +11,7 @@ pub(crate) struct Rpc;
 
 // TODO: determine more precise `INFLIGHT` / `RATE` values as a result of load testing
 impl super::Rpc for Rpc {
-    const CAPABILITY_ID: mux::CapabilityId = 4;
+    const CAPABILITY: Capability = Capability::GetBlock;
     const INFLIGHT: u32 = 5;
     const METHOD: &'static str = "get_block";
diff --git a/node/actors/network/src/rpc/mod.rs b/node/actors/network/src/rpc/mod.rs
index 8c4d6d7f..7d53b3a5 100644
--- a/node/actors/network/src/rpc/mod.rs
+++ b/node/actors/network/src/rpc/mod.rs
@@ -16,6 +16,7 @@
 //! at the same time (max 1 client + server per CapabilityId).
 
 use self::metrics::{CallLatencyType, CallType, RPC_METRICS};
+pub(crate) use crate::proto::rpc::Capability;
 use crate::{frame, mux};
 use anyhow::Context as _;
 use std::{collections::BTreeMap, sync::Arc};
@@ -35,6 +36,13 @@ pub(crate) mod testonly;
 #[cfg(test)]
 mod tests;
 
+impl Capability {
+    /// Converts capability to `mux::CapabilityId`.
+    pub(crate) fn id(self) -> mux::CapabilityId {
+        self as mux::CapabilityId
+    }
+}
+
 /// Multiplexer configuration for the RPC services.
 pub(crate) const MUX_CONFIG: mux::Config = mux::Config {
     read_buffer_size: 160 * zksync_protobuf::kB as u64,
@@ -47,16 +55,21 @@ pub(crate) const MUX_CONFIG: mux::Config = mux::Config {
 /// It is just a collection of associated types
 /// and constants.
 pub(crate) trait Rpc: Sync + Send + 'static {
-    /// CapabilityId used to identify the RPC.
-    /// Client and Server have to agree on CAPABILITY_ID
-    /// of all supported RPC, for the RPC to actually work.
+    /// Capability used to identify the RPC.
+    /// Client and Server both have to support the given
+    /// capability for the RPC to work.
     /// Each type implementing `Rpc` should use an unique
-    /// `CAPABILITY_ID`.
-    const CAPABILITY_ID: mux::CapabilityId;
+    /// capability. We use a protobuf enum as a
+    /// register of capabilities to enforce uniqueness
+    /// and compatibility.
+    const CAPABILITY: Capability;
     /// Maximal number of calls executed in parallel.
     /// Both client and server enforce this limit.
     const INFLIGHT: u32;
     /// Name of the RPC, used in prometheus metrics.
+    /// TODO: we can derive it automatically from the EnumValueDescriptor of the
+    /// CAPABILITY (or from the Debug implementation of Capability, but Debug formats
+    /// are not stable).
     const METHOD: &'static str;
     /// Type of the request message.
     type Req: zksync_protobuf::ProtoFmt + Send + Sync;
@@ -254,12 +267,12 @@ impl<'a> Service<'a> {
         if self
             .mux
             .accept
-            .insert(R::CAPABILITY_ID, client.queue.clone())
+            .insert(R::CAPABILITY.id(), client.queue.clone())
             .is_some()
         {
             panic!(
-                "client for capability {} already registered",
-                R::CAPABILITY_ID
+                "client for capability {:?} already registered",
+                R::CAPABILITY
             );
         }
         self
@@ -276,12 +289,12 @@ impl<'a> Service<'a> {
         if self
             .mux
             .connect
-            .insert(R::CAPABILITY_ID, queue.clone())
+            .insert(R::CAPABILITY.id(), queue.clone())
             .is_some()
         {
             panic!(
-                "server for capability {} already registered",
-                R::CAPABILITY_ID
+                "server for capability {:?} already registered",
+                R::CAPABILITY
            );
         }
         self.servers.push(Box::new(Server {
diff --git a/node/actors/network/src/rpc/ping.rs b/node/actors/network/src/rpc/ping.rs
index a11e07ed..1e78ba8c 100644
--- a/node/actors/network/src/rpc/ping.rs
+++ b/node/actors/network/src/rpc/ping.rs
@@ -1,5 +1,6 @@
 //! Defines an RPC for sending ping messages.
-use crate::{mux, proto::ping as proto};
+use super::Capability;
+use crate::proto::ping as proto;
 use anyhow::Context as _;
 use rand::Rng;
 use zksync_concurrency::{ctx, limiter, time};
@@ -9,7 +10,7 @@ use zksync_protobuf::{kB, required, ProtoFmt};
 pub(crate) struct Rpc;
 
 impl super::Rpc for Rpc {
-    const CAPABILITY_ID: mux::CapabilityId = 2;
+    const CAPABILITY: Capability = Capability::Ping;
     const INFLIGHT: u32 = 1;
     const METHOD: &'static str = "ping";
     type Req = Req;
diff --git a/node/actors/network/src/rpc/push_batch_store_state.rs b/node/actors/network/src/rpc/push_batch_store_state.rs
index aa098a5a..4c561abe 100644
--- a/node/actors/network/src/rpc/push_batch_store_state.rs
+++ b/node/actors/network/src/rpc/push_batch_store_state.rs
@@ -1,5 +1,6 @@
 //! RPC for fetching a batch from peer.
-use crate::{mux, proto::gossip as proto};
+use super::Capability;
+use crate::proto::gossip as proto;
 use anyhow::Context as _;
 use zksync_consensus_roles::attester;
 use zksync_consensus_storage::BatchStoreState;
@@ -10,7 +11,7 @@ use zksync_protobuf::{required, ProtoFmt};
 pub(crate) struct Rpc;
 
 impl super::Rpc for Rpc {
-    const CAPABILITY_ID: mux::CapabilityId = 7;
+    const CAPABILITY: Capability = Capability::PushBatchStoreState;
     const INFLIGHT: u32 = 1;
     const METHOD: &'static str = "push_batch_store_state";
diff --git a/node/actors/network/src/rpc/push_batch_votes.rs b/node/actors/network/src/rpc/push_batch_votes.rs
index e0e67126..901d120b 100644
--- a/node/actors/network/src/rpc/push_batch_votes.rs
+++ b/node/actors/network/src/rpc/push_batch_votes.rs
@@ -1,34 +1,35 @@
 //! Defines RPC for passing consensus messages.
-use crate::{mux, proto::gossip as proto};
+use super::Capability;
+use crate::proto::gossip as proto;
 use anyhow::Context as _;
 use std::sync::Arc;
-use zksync_consensus_roles::attester::{self, Batch};
-use zksync_protobuf::ProtoFmt;
+use zksync_consensus_roles::attester;
+use zksync_protobuf::{read_optional, ProtoFmt};
 
-/// PushBatchVotes RPC.
+/// RPC pushing fresh batch votes.
 pub(crate) struct Rpc;
 
-/// Deprecated, because adding `genesis_hash` to `validator::Batch`
-/// was not backward compatible - old binaries couldn't verify
-/// signatures on messages with `genesis_hash` and were treating it
-/// as malicious behavior.
-#[allow(dead_code)]
-pub(super) const V1: mux::CapabilityId = 5;
-
-/// Current version.
-pub(super) const V2: mux::CapabilityId = 8;
-
 impl super::Rpc for Rpc {
-    const CAPABILITY_ID: mux::CapabilityId = V2;
+    const CAPABILITY: Capability = Capability::PushBatchVotes;
     const INFLIGHT: u32 = 1;
     const METHOD: &'static str = "push_batch_votes";
     type Req = Req;
-    type Resp = ();
+    type Resp = Resp;
 }
 
 /// Signed batch message that the receiving peer should process.
 #[derive(Debug, Clone, PartialEq, Eq)]
-pub(crate) struct Req(pub(crate) Vec<Arc<attester::Signed<attester::Batch>>>);
+pub(crate) struct Req {
+    /// Requesting the peer to respond with votes for the batch.
+    pub(crate) want_votes_for: Option<attester::Batch>,
+    /// New votes that the server might not be aware of.
+    pub(crate) votes: Vec<Arc<attester::Signed<attester::Batch>>>,
+}
+
+pub(crate) struct Resp {
+    /// Votes requested by the peer.
+    pub(crate) votes: Vec<Arc<attester::Signed<attester::Batch>>>,
+}
 
 impl ProtoFmt for Req {
     type Proto = proto::PushBatchVotes;
@@ -40,12 +41,44 @@ impl ProtoFmt for Req {
                 ProtoFmt::read(e).with_context(|| format!("votes[{i}]"))?,
             ));
         }
-        Ok(Self(votes))
+        Ok(Self {
+            want_votes_for: read_optional(&r.want_votes_for).context("want_votes_for")?,
+            votes,
+        })
+    }
+
+    fn build(&self) -> Self::Proto {
+        Self::Proto {
+            want_votes_for: self.want_votes_for.as_ref().map(ProtoFmt::build),
+            votes: self
+                .votes
+                .iter()
+                .map(|a| ProtoFmt::build(a.as_ref()))
+                .collect(),
+        }
+    }
+}
+
+impl ProtoFmt for Resp {
+    type Proto = proto::PushBatchVotesResp;
+
+    fn read(r: &Self::Proto) -> anyhow::Result<Self> {
+        let mut votes = vec![];
+        for (i, e) in r.votes.iter().enumerate() {
+            votes.push(Arc::new(
+                ProtoFmt::read(e).with_context(|| format!("votes[{i}]"))?,
+            ));
+        }
+        Ok(Self { votes })
     }
 
     fn build(&self) -> Self::Proto {
         Self::Proto {
-            votes: self.0.iter().map(|a| ProtoFmt::build(a.as_ref())).collect(),
+            votes: self
+                .votes
+                .iter()
+                .map(|a| ProtoFmt::build(a.as_ref()))
+                .collect(),
         }
     }
 }
diff --git a/node/actors/network/src/rpc/push_block_store_state.rs b/node/actors/network/src/rpc/push_block_store_state.rs
index 52c1161c..1134185c 100644
--- a/node/actors/network/src/rpc/push_block_store_state.rs
+++ b/node/actors/network/src/rpc/push_block_store_state.rs
@@ -1,5 +1,6 @@
 //! RPC for notifying peer about our BlockStore state.
-use crate::{mux, proto::gossip as proto};
+use super::Capability;
+use crate::proto::gossip as proto;
 use anyhow::Context;
 use zksync_consensus_roles::validator;
 use zksync_consensus_storage::BlockStoreState;
@@ -10,7 +11,7 @@ use zksync_protobuf::{read_optional, required, ProtoFmt};
 pub(crate) struct Rpc;
 
 impl super::Rpc for Rpc {
-    const CAPABILITY_ID: mux::CapabilityId = 3;
+    const CAPABILITY: Capability = Capability::PushBlockStoreState;
     const INFLIGHT: u32 = 1;
     const METHOD: &'static str = "push_block_store_state";
diff --git a/node/actors/network/src/rpc/push_validator_addrs.rs b/node/actors/network/src/rpc/push_validator_addrs.rs
index c58803bf..48a85ba4 100644
--- a/node/actors/network/src/rpc/push_validator_addrs.rs
+++ b/node/actors/network/src/rpc/push_validator_addrs.rs
@@ -1,5 +1,6 @@
 //! RPC for synchronizing ValidatorAddrs data.
-use crate::{mux, proto::gossip as proto};
+use super::Capability;
+use crate::proto::gossip as proto;
 use anyhow::Context as _;
 use std::sync::Arc;
 use zksync_consensus_roles::validator;
@@ -9,7 +10,7 @@ use zksync_protobuf::ProtoFmt;
 pub(crate) struct Rpc;
 
 impl super::Rpc for Rpc {
-    const CAPABILITY_ID: mux::CapabilityId = 1;
+    const CAPABILITY: Capability = Capability::PushValidatorAddrs;
     const INFLIGHT: u32 = 1;
     const METHOD: &'static str = "push_validator_addrs";
diff --git a/node/actors/network/src/rpc/tests.rs b/node/actors/network/src/rpc/tests.rs
index 0f444bf5..d2523fc2 100644
--- a/node/actors/network/src/rpc/tests.rs
+++ b/node/actors/network/src/rpc/tests.rs
@@ -1,30 +1,10 @@
 use super::*;
 use crate::noise;
 use rand::Rng as _;
-use std::{
-    collections::HashSet,
-    sync::atomic::{AtomicU64, Ordering},
-};
+use std::sync::atomic::{AtomicU64, Ordering};
 use zksync_concurrency::{ctx, testonly::abort_on_panic, time};
 use zksync_protobuf::{kB, testonly::test_encode_random};
 
-/// CAPABILITY_ID should uniquely identify the RPC.
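With the capability register moved into the protobuf enum in proto/rpc.proto, id uniqueness is enforced at build time by the protobuf compiler (duplicate tags are rejected), which is why the hand-written `test_capability_rpc_correspondence` test (its doc comment just above, its body just below) is deleted. The enum's numeric values deliberately mirror the old hand-assigned `CAPABILITY_ID` constants, and tag 5 stays reserved for the deprecated v1 `push_batch_votes` RPC, so the mux wire protocol is unchanged. A self-contained illustration of the mapping (a stand-in for the prost-generated enum, assuming `mux::CapabilityId` is a plain unsigned integer; this is not the generated code itself):

    // Stand-in for the generated `Capability` enum from proto/rpc.proto.
    #[derive(Clone, Copy, Debug)]
    enum Capability {
        Consensus = 0,
        PushValidatorAddrs = 1,
        Ping = 2,
        PushBlockStoreState = 3,
        GetBlock = 4,
        // Tag 5 is reserved: it identified the deprecated v1 batch-votes RPC.
        GetBatch = 6,
        PushBatchStoreState = 7,
        PushBatchVotes = 8,
    }

    type CapabilityId = u64; // stand-in for mux::CapabilityId

    impl Capability {
        fn id(self) -> CapabilityId {
            // Same numbers as the old CAPABILITY_ID constants, so peers running
            // older binaries still agree on which mux stream is which RPC.
            self as CapabilityId
        }
    }

    fn main() {
        assert_eq!(Capability::GetBlock.id(), 4);
        assert_eq!(Capability::PushBatchVotes.id(), 8);
    }
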
-#[test] -fn test_capability_rpc_correspondence() { - let ids = [ - consensus::Rpc::CAPABILITY_ID, - push_validator_addrs::Rpc::CAPABILITY_ID, - push_block_store_state::Rpc::CAPABILITY_ID, - get_block::Rpc::CAPABILITY_ID, - ping::Rpc::CAPABILITY_ID, - push_batch_votes::V1, - push_batch_votes::V2, - push_batch_store_state::Rpc::CAPABILITY_ID, - get_batch::Rpc::CAPABILITY_ID, - ]; - assert_eq!(ids.len(), HashSet::from(ids).len()); -} - #[test] fn test_schema_encode_decode() { let rng = &mut ctx::test_root(&ctx::RealClock).rng(); @@ -161,7 +141,7 @@ const RATE: limiter::Rate = limiter::Rate { }; impl Rpc for ExampleRpc { - const CAPABILITY_ID: mux::CapabilityId = 0; + const CAPABILITY: Capability = Capability::Ping; const INFLIGHT: u32 = 5; const METHOD: &'static str = "example"; type Req = (); diff --git a/node/actors/network/src/testonly.rs b/node/actors/network/src/testonly.rs index 7728bc45..909e7248 100644 --- a/node/actors/network/src/testonly.rs +++ b/node/actors/network/src/testonly.rs @@ -1,7 +1,7 @@ //! Testonly utilities. #![allow(dead_code)] use crate::{ - gossip::AttestationStatusWatch, + gossip::attestation, io::{ConsensusInputMessage, Target}, Config, GossipConfig, Network, RpcConfig, Runner, }; @@ -15,7 +15,7 @@ use std::{ }; use zksync_concurrency::{ ctx::{self, channel}, - io, limiter, net, scope, sync, time, + io, limiter, net, scope, sync, }; use zksync_consensus_roles::{node, validator}; use zksync_consensus_storage::{BatchStore, BlockStore}; @@ -104,7 +104,6 @@ where // due to timeouts. ping_timeout: None, validator_key: Some(validator_key.clone()), - attester_key: None, gossip: GossipConfig { key: rng.gen(), dynamic_inbound_limit: usize::MAX, @@ -143,7 +142,6 @@ pub fn new_fullnode(rng: &mut impl Rng, peer: &Config) -> Config { // due to timeouts. ping_timeout: None, validator_key: None, - attester_key: None, gossip: GossipConfig { key: rng.gen(), dynamic_inbound_limit: usize::MAX, @@ -161,8 +159,6 @@ pub fn new_fullnode(rng: &mut impl Rng, peer: &Config) -> Config { /// Runner for Instance. pub struct InstanceRunner { net_runner: Runner, - attestation_status: Arc, - block_store: Arc, batch_store: Arc, terminate: channel::Receiver<()>, } @@ -172,20 +168,6 @@ impl InstanceRunner { pub async fn run(mut self, ctx: &ctx::Ctx) -> anyhow::Result<()> { scope::run!(ctx, |ctx, s| async { s.spawn_bg(self.net_runner.run(ctx)); - s.spawn_bg(async { - let genesis = self.block_store.genesis().hash(); - loop { - if let Ok(Some(batch_number)) = self.batch_store.next_batch_to_attest(ctx).await - { - self.attestation_status - .update(genesis, batch_number) - .await?; - } - if ctx.sleep(time::Duration::seconds(1)).await.is_err() { - return Ok(()); - } - } - }); let _ = self.terminate.recv(ctx).await; Ok(()) }) @@ -195,25 +177,45 @@ impl InstanceRunner { } } +/// InstanceConfig +pub struct InstanceConfig { + /// cfg + pub cfg: Config, + /// block_store + pub block_store: Arc, + /// batch_store + pub batch_store: Arc, + /// Attestation controller. + /// It is not configured by default. + /// Attestation tests should configure it and consume + /// the certificates on their own (see `attestation::Controller`). + pub attestation: Arc, +} + impl Instance { - /// Construct an instance for a given config. + /// Constructs a new instance. pub fn new( cfg: Config, block_store: Arc, batch_store: Arc, ) -> (Self, InstanceRunner) { - // Semantically we'd want this to be created at the same level as the stores, - // but doing so would introduce a lot of extra cruft in setting up tests. 
-        let attestation_status =
-            Arc::new(AttestationStatusWatch::new(block_store.genesis().hash()));
+        Self::new_from_config(InstanceConfig {
+            cfg,
+            block_store,
+            batch_store,
+            attestation: attestation::Controller::new(None).into(),
+        })
+    }
 
+    /// Construct an instance for a given config.
+    pub fn new_from_config(cfg: InstanceConfig) -> (Self, InstanceRunner) {
         let (actor_pipe, dispatcher_pipe) = pipe::new();
         let (net, net_runner) = Network::new(
-            cfg,
-            block_store.clone(),
-            batch_store.clone(),
+            cfg.cfg,
+            cfg.block_store.clone(),
+            cfg.batch_store.clone(),
             actor_pipe,
-            attestation_status.clone(),
+            cfg.attestation,
         );
         let (terminate_send, terminate_recv) = channel::bounded(1);
         (
@@ -224,9 +226,7 @@ impl Instance {
             },
             InstanceRunner {
                 net_runner,
-                attestation_status,
-                block_store,
-                batch_store,
+                batch_store: cfg.batch_store.clone(),
                 terminate: terminate_recv,
             },
         )
diff --git a/node/libs/roles/src/attester/messages/batch.rs b/node/libs/roles/src/attester/messages/batch.rs
index e9554314..41ada768 100644
--- a/node/libs/roles/src/attester/messages/batch.rs
+++ b/node/libs/roles/src/attester/messages/batch.rs
@@ -1,9 +1,5 @@
 use super::{GenesisHash, Signed};
-use crate::{
-    attester,
-    validator::{Genesis, Payload},
-};
-use anyhow::{ensure, Context as _};
+use crate::{attester, validator::Payload};
 use zksync_consensus_crypto::{keccak256::Keccak256, ByteFmt, Text, TextFmt};
 use zksync_consensus_utils::enum_util::Variant;
 
@@ -125,28 +121,11 @@ pub enum BatchQCVerifyError {
         want: u64,
     },
     /// Bad signer set.
-    #[error("signers set doesn't match genesis")]
+    #[error("signers not in committee")]
     BadSignersSet,
-    /// No attester committee in genesis.
-    #[error("No attester committee in genesis")]
-    AttestersNotInGenesis,
-}
-
-/// Error returned by `BatchQC::add()` if the signature is invalid.
-#[derive(thiserror::Error, Debug)]
-pub enum BatchQCAddError {
-    /// Inconsistent messages.
-    #[error("Trying to add signature for a different message")]
-    InconsistentMessages,
-    /// Signer not present in the committee.
-    #[error("Signer not in committee: {signer:?}")]
-    SignerNotInCommittee {
-        /// Signer of the message.
-        signer: Box<attester::PublicKey>,
-    },
-    /// Message already present in BatchQC.
-    #[error("Message already signed for BatchQC")]
-    Exists,
+    /// Genesis mismatch.
+    #[error("genesis mismatch")]
+    GenesisMismatch,
 }
 
 impl BatchQC {
@@ -160,46 +139,43 @@ impl BatchQC {
 
     /// Add a attester's signature.
     /// Signature is assumed to be already verified.
-    pub fn add(&mut self, msg: &Signed<Batch>, genesis: &Genesis) -> anyhow::Result<()> {
-        use BatchQCAddError as Error;
-
-        let committee = genesis
-            .attesters
-            .as_ref()
-            .context("no attester committee in genesis")?;
-
-        ensure!(self.message == msg.msg, Error::InconsistentMessages);
-        ensure!(!self.signatures.contains(&msg.key), Error::Exists);
-        ensure!(
-            committee.contains(&msg.key),
-            Error::SignerNotInCommittee {
-                signer: Box::new(msg.key.clone()),
-            }
+    pub fn add(
+        &mut self,
+        msg: &Signed<Batch>,
+        committee: &attester::Committee,
+    ) -> anyhow::Result<()> {
+        anyhow::ensure!(self.message == msg.msg, "inconsistent messages");
+        anyhow::ensure!(
+            !self.signatures.contains(&msg.key),
+            "signature already present"
         );
-
+        anyhow::ensure!(committee.contains(&msg.key), "not in committee");
         self.signatures.add(msg.key.clone(), msg.sig.clone());
-
         Ok(())
     }
 
-    /// Verifies the signature of the BatchQC.
-    pub fn verify(&self, genesis: &Genesis) -> Result<(), BatchQCVerifyError> {
+    /// Verifies the BatchQC.
+ pub fn verify( + &self, + genesis: GenesisHash, + committee: &attester::Committee, + ) -> Result<(), BatchQCVerifyError> { use BatchQCVerifyError as Error; - let attesters = genesis - .attesters - .as_ref() - .ok_or(Error::AttestersNotInGenesis)?; + + if self.message.genesis != genesis { + return Err(Error::GenesisMismatch); + } // Verify that all signers are attesters. for pk in self.signatures.keys() { - if !attesters.contains(pk) { + if !committee.contains(pk) { return Err(Error::BadSignersSet); } } // Verify that the signer's weight is sufficient. - let weight = attesters.weight_of_keys(self.signatures.keys()); - let threshold = attesters.threshold(); + let weight = committee.weight_of_keys(self.signatures.keys()); + let threshold = committee.threshold(); if weight < threshold { return Err(Error::NotEnoughSigners { got: weight, diff --git a/node/libs/roles/src/attester/tests.rs b/node/libs/roles/src/attester/tests.rs index 57c577a8..c9d85cd4 100644 --- a/node/libs/roles/src/attester/tests.rs +++ b/node/libs/roles/src/attester/tests.rs @@ -165,25 +165,37 @@ fn test_batch_qc() { // Create QCs with increasing number of attesters. for i in 0..setup1.attester_keys.len() + 1 { - let mut qc = BatchQC::new(rng.gen()); + let mut qc = BatchQC::new(Batch { + genesis: setup1.genesis.hash(), + number: rng.gen(), + hash: rng.gen(), + }); for key in &setup1.attester_keys[0..i] { - qc.add(&key.sign_msg(qc.message.clone()), &setup1.genesis) + qc.add(&key.sign_msg(qc.message.clone()), attesters) .unwrap(); } let expected_weight: u64 = attesters.iter().take(i).map(|w| w.weight).sum(); if expected_weight >= attesters.threshold() { - qc.verify(&setup1.genesis).expect("failed to verify QC"); + qc.verify(setup1.genesis.hash(), attesters) + .expect("failed to verify QC"); } else { assert_matches!( - qc.verify(&setup1.genesis), + qc.verify(setup1.genesis.hash(), attesters), Err(Error::NotEnoughSigners { .. }) ); } // Mismatching attesters sets. 
- assert!(qc.verify(&setup2.genesis).is_err()); - assert!(qc.verify(&genesis3).is_err()); + assert!(qc + .verify( + setup1.genesis.hash(), + setup2.genesis.attesters.as_ref().unwrap() + ) + .is_err()); + assert!(qc + .verify(setup1.genesis.hash(), genesis3.attesters.as_ref().unwrap()) + .is_err()); } } @@ -196,21 +208,14 @@ fn test_attester_committee_weights() { let setup = Setup::new_with_weights(rng, vec![1000, 600, 800, 6000, 900, 700]); // Expected sum of the attesters weights let sums = [1000, 1600, 2400, 8400, 9300, 10000]; + let attesters = setup.genesis.attesters.as_ref().unwrap(); let msg: Batch = rng.gen(); let mut qc = BatchQC::new(msg.clone()); - for (n, weight) in sums.iter().enumerate() { - let key = &setup.attester_keys[n]; - qc.add(&key.sign_msg(msg.clone()), &setup.genesis).unwrap(); - assert_eq!( - setup - .genesis - .attesters - .as_ref() - .unwrap() - .weight_of_keys(qc.signatures.keys()), - *weight - ); + for (i, weight) in sums.iter().enumerate() { + let key = &setup.attester_keys[i]; + qc.add(&key.sign_msg(msg.clone()), attesters).unwrap(); + assert_eq!(attesters.weight_of_keys(qc.signatures.keys()), *weight); } } diff --git a/node/libs/roles/src/validator/testonly.rs b/node/libs/roles/src/validator/testonly.rs index f4ce2b35..c9a3a314 100644 --- a/node/libs/roles/src/validator/testonly.rs +++ b/node/libs/roles/src/validator/testonly.rs @@ -6,10 +6,7 @@ use super::{ ReplicaCommit, ReplicaPrepare, SecretKey, Signature, Signed, Signers, View, ViewNumber, WeightedValidator, }; -use crate::{ - attester::{self, BatchNumber, SyncBatch}, - validator::LeaderSelectionMode, -}; +use crate::{attester, validator::LeaderSelectionMode}; use bit_vec::BitVec; use rand::{ distributions::{Distribution, Standard}, @@ -146,12 +143,12 @@ impl Setup { pub fn push_batch(&mut self, rng: &mut impl Rng) { let batch_number = match self.0.batches.last() { Some(b) => b.number.next(), - None => BatchNumber(0), + None => attester::BatchNumber(0), }; let size: usize = rng.gen_range(500..1000); let payloads = vec![Payload((0..size).map(|_| rng.gen()).collect())]; let proof = rng.gen::<[u8; 32]>().to_vec(); - let batch = SyncBatch { + let batch = attester::SyncBatch { number: batch_number, payloads, proof, @@ -205,7 +202,7 @@ pub struct SetupInner { /// Past blocks. pub blocks: Vec, /// L1 batches - pub batches: Vec, + pub batches: Vec, /// Genesis config. pub genesis: Genesis, } diff --git a/node/libs/storage/src/batch_store/mod.rs b/node/libs/storage/src/batch_store/mod.rs index 43d060a0..1643bedd 100644 --- a/node/libs/storage/src/batch_store/mod.rs +++ b/node/libs/storage/src/batch_store/mod.rs @@ -54,21 +54,6 @@ pub trait PersistentBatchStore: 'static + fmt::Debug + Send + Sync { /// Range of batches persisted in storage. fn persisted(&self) -> sync::watch::Receiver; - /// Get the next L1 batch for which attesters are expected to produce a quorum certificate. - /// - /// An external node might never have a complete history of L1 batch QCs. Once the L1 batch is included on L1, - /// the external nodes might use the [attester::SyncBatch] route to obtain them, in which case they will not - /// have a QC and no reason to get them either. The main node, however, will want to have a QC for all batches. - async fn next_batch_to_attest( - &self, - ctx: &ctx::Ctx, - ) -> ctx::Result>; - - /// Get the L1 batch QC from storage with the highest number. - /// - /// Returns `None` if we don't have a QC for any of the batches yet. 
- async fn last_batch_qc(&self, ctx: &ctx::Ctx) -> ctx::Result>; - /// Returns the [attester::SyncBatch] with the given number, which is used by peers /// to catch up with L1 batches that they might have missed if they went offline. async fn get_batch( @@ -77,25 +62,6 @@ pub trait PersistentBatchStore: 'static + fmt::Debug + Send + Sync { number: attester::BatchNumber, ) -> ctx::Result>; - /// Returns the [attester::Batch] with the given number, which is the `message` that - /// appears in [attester::BatchQC], and represents the content that needs to be signed - /// by the attesters. - async fn get_batch_to_sign( - &self, - ctx: &ctx::Ctx, - number: attester::BatchNumber, - ) -> ctx::Result>; - - /// Returns the QC of the batch with the given number. - async fn get_batch_qc( - &self, - ctx: &ctx::Ctx, - number: attester::BatchNumber, - ) -> ctx::Result>; - - /// Store the given batch QC in the storage persistently. - async fn store_qc(&self, ctx: &ctx::Ctx, qc: attester::BatchQC) -> ctx::Result<()>; - /// Queue the batch to be persisted in storage. /// `queue_next_batch()` may return BEFORE the batch is actually persisted, /// but if the call succeeded the batch is expected to be persisted eventually. @@ -294,48 +260,6 @@ impl BatchStore { Ok(batch) } - /// Retrieve the next batch number that doesn't have a QC yet and will need to be signed. - pub async fn next_batch_to_attest( - &self, - ctx: &ctx::Ctx, - ) -> ctx::Result> { - let t = metrics::PERSISTENT_BATCH_STORE - .next_batch_to_attest_latency - .start(); - - let batch = self - .persistent - .next_batch_to_attest(ctx) - .await - .wrap("persistent.next_batch_to_attest()")?; - - t.observe(); - Ok(batch) - } - - /// Retrieve a batch to be signed. - /// - /// This might be `None` even if the L1 batch already exists, because the commitment - /// in it is populated asynchronously. - pub async fn batch_to_sign( - &self, - ctx: &ctx::Ctx, - number: attester::BatchNumber, - ) -> ctx::Result> { - let t = metrics::PERSISTENT_BATCH_STORE - .batch_to_sign_latency - .start(); - - let batch = self - .persistent - .get_batch_to_sign(ctx, number) - .await - .wrap("persistent.get_batch_to_sign()")?; - - t.observe(); - Ok(batch) - } - /// Append batch to a queue to be persisted eventually. /// Since persisting a batch may take a significant amount of time, /// BatchStore contains a queue of batches waiting to be persisted. @@ -365,27 +289,6 @@ impl BatchStore { Ok(()) } - /// Wait until the database has a batch, then attach the corresponding QC. - #[tracing::instrument(skip_all, fields(l1_batch = %qc.message.number))] - pub async fn persist_batch_qc(&self, ctx: &ctx::Ctx, qc: attester::BatchQC) -> ctx::Result<()> { - let t = metrics::BATCH_STORE.persist_batch_qc.start(); - // The `store_qc` implementation in `zksync-era` retries the insertion of the QC if the payload - // isn't yet available, but to be safe we can wait for the availability signal here as well. - sync::wait_for(ctx, &mut self.persistent.persisted(), |persisted| { - qc.message.number < persisted.next() - }) - .await?; - // Now it's definitely safe to store it. - metrics::BATCH_STORE - .last_persisted_batch_qc - .set(qc.message.number.0); - - self.persistent.store_qc(ctx, qc).await?; - - t.observe(); - Ok(()) - } - /// Waits until the given batch is queued (in memory, or persisted). /// Note that it doesn't mean that the batch is actually available, as old batches might get pruned. 
#[tracing::instrument(skip_all, fields(l1_batch = %number))] diff --git a/node/libs/storage/src/testonly/in_memory.rs b/node/libs/storage/src/testonly/in_memory.rs index 2ca8858e..487356ab 100644 --- a/node/libs/storage/src/testonly/in_memory.rs +++ b/node/libs/storage/src/testonly/in_memory.rs @@ -5,7 +5,7 @@ use crate::{ }; use anyhow::Context as _; use std::{ - collections::{HashMap, VecDeque}, + collections::VecDeque, sync::{Arc, Mutex}, }; use zksync_concurrency::{ctx, sync}; @@ -20,10 +20,8 @@ struct BlockStoreInner { #[derive(Debug)] struct BatchStoreInner { - genesis: validator::Genesis, persisted: sync::watch::Sender, batches: Mutex>, - certs: Mutex>, } /// In-memory block store. @@ -70,12 +68,10 @@ impl BlockStore { impl BatchStore { /// New In-memory `BatchStore`. - pub fn new(genesis: validator::Genesis, first: attester::BatchNumber) -> Self { + pub fn new(first: attester::BatchNumber) -> Self { Self(Arc::new(BatchStoreInner { - genesis, persisted: sync::watch::channel(BatchStoreState { first, last: None }).0, batches: Mutex::default(), - certs: Mutex::default(), })) } } @@ -133,60 +129,6 @@ impl PersistentBatchStore for BatchStore { self.0.persisted.subscribe() } - async fn last_batch_qc(&self, _ctx: &ctx::Ctx) -> ctx::Result> { - let certs = self.0.certs.lock().unwrap(); - let last_batch_number = certs.keys().max().unwrap(); - Ok(certs.get(last_batch_number).cloned()) - } - - async fn next_batch_to_attest( - &self, - _ctx: &ctx::Ctx, - ) -> ctx::Result> { - let batches = self.0.batches.lock().unwrap(); - let certs = self.0.certs.lock().unwrap(); - - Ok(batches - .iter() - .map(|b| b.number) - .find(|n| !certs.contains_key(n))) - } - - async fn get_batch_to_sign( - &self, - ctx: &ctx::Ctx, - number: attester::BatchNumber, - ) -> ctx::Result> { - // Here we just produce some deterministic mock hash. The real hash is available in the database. - // and contains a commitment to the data submitted to L1. It is *not* over SyncBatch. - let Some(batch) = self.get_batch(ctx, number).await? 
else { - return Ok(None); - }; - - let bz = zksync_protobuf::canonical(&batch); - let hash = zksync_consensus_crypto::keccak256::Keccak256::new(&bz); - - Ok(Some(attester::Batch { - number, - hash: attester::BatchHash(hash), - genesis: self.0.genesis.hash(), - })) - } - - async fn get_batch_qc( - &self, - _ctx: &ctx::Ctx, - number: attester::BatchNumber, - ) -> ctx::Result> { - let certs = self.0.certs.lock().unwrap(); - Ok(certs.get(&number).cloned()) - } - - async fn store_qc(&self, _ctx: &ctx::Ctx, qc: attester::BatchQC) -> ctx::Result<()> { - self.0.certs.lock().unwrap().insert(qc.message.number, qc); - Ok(()) - } - async fn get_batch( &self, _ctx: &ctx::Ctx, diff --git a/node/libs/storage/src/testonly/mod.rs b/node/libs/storage/src/testonly/mod.rs index a6329ba8..38c4a4ba 100644 --- a/node/libs/storage/src/testonly/mod.rs +++ b/node/libs/storage/src/testonly/mod.rs @@ -89,7 +89,7 @@ impl TestMemoryStorage { first: validator::BlockNumber, ) -> Self { let im_blocks = in_memory::BlockStore::new(genesis.clone(), first); - let im_batches = in_memory::BatchStore::new(genesis.clone(), attester::BatchNumber(0)); + let im_batches = in_memory::BatchStore::new(attester::BatchNumber(0)); Self::new_with_im(ctx, im_blocks, im_batches).await } diff --git a/node/libs/storage/src/tests.rs b/node/libs/storage/src/tests.rs index 11fa5431..36266bba 100644 --- a/node/libs/storage/src/tests.rs +++ b/node/libs/storage/src/tests.rs @@ -32,7 +32,7 @@ async fn test_inmemory_batch_store() { let mut setup = Setup::new(rng, 3); setup.push_batches(rng, 5); - let store = &testonly::in_memory::BatchStore::new(setup.genesis.clone(), BatchNumber(0)); + let store = &testonly::in_memory::BatchStore::new(BatchNumber(0)); let mut want = vec![]; for batch in &setup.batches { store.queue_next_batch(ctx, batch.clone()).await.unwrap(); diff --git a/node/tools/src/config.rs b/node/tools/src/config.rs index 97d49fbf..cb84e48d 100644 --- a/node/tools/src/config.rs +++ b/node/tools/src/config.rs @@ -7,12 +7,13 @@ use std::{ fs, io, net::SocketAddr, path::PathBuf, + sync::Arc, }; use tokio_rustls::rustls::pki_types::{CertificateDer, PrivateKeyDer}; -use zksync_concurrency::{ctx, net, scope, time}; +use zksync_concurrency::{ctx, net, time}; use zksync_consensus_bft as bft; use zksync_consensus_crypto::{read_optional_text, read_required_text, Text, TextFmt}; -use zksync_consensus_executor::{self as executor, attestation::AttestationStatusRunner}; +use zksync_consensus_executor::{self as executor, attestation}; use zksync_consensus_network::http; use zksync_consensus_roles::{attester, node, validator}; use zksync_consensus_storage::testonly::{TestMemoryStorage, TestMemoryStorageRunner}; @@ -259,24 +260,12 @@ impl Configs { pub async fn make_executor( &self, ctx: &ctx::Ctx, - ) -> ctx::Result<(executor::Executor, TestExecutorRunner)> { + ) -> ctx::Result<(executor::Executor, TestMemoryStorageRunner)> { let replica_store = store::RocksDB::open(self.app.genesis.clone(), &self.database).await?; let store = TestMemoryStorage::new(ctx, &self.app.genesis).await; - // We don't have an API to poll in this setup, we can only create a local store based attestation client. 
- let (attestation_status, attestation_status_runner) = - AttestationStatusRunner::init_from_store( - ctx, - store.batches.clone(), - time::Duration::seconds(1), - self.app.genesis.hash(), - ) - .await?; - - let runner = TestExecutorRunner { - storage_runner: store.runner, - attestation_status_runner, - }; + let attestation = Arc::new(attestation::Controller::new(self.app.attester_key.clone())); + let runner = store.runner; let e = executor::Executor { config: executor::Config { @@ -314,12 +303,7 @@ impl Configs { self.app.max_payload_size, )), }), - attester: self - .app - .attester_key - .as_ref() - .map(|key| executor::Attester { key: key.clone() }), - attestation_status, + attestation, }; Ok((e, runner)) } @@ -346,20 +330,3 @@ fn load_private_key(path: &PathBuf) -> anyhow::Result> { // Load and return a single private key. Ok(rustls_pemfile::private_key(&mut reader).map(|key| key.expect("Private key not found"))?) } - -pub struct TestExecutorRunner { - storage_runner: TestMemoryStorageRunner, - attestation_status_runner: AttestationStatusRunner, -} - -impl TestExecutorRunner { - /// Runs the storage and the attestation status. - pub async fn run(self, ctx: &ctx::Ctx) -> anyhow::Result<()> { - scope::run!(ctx, |ctx, s| async { - s.spawn(self.storage_runner.run(ctx)); - s.spawn(self.attestation_status_runner.run(ctx)); - Ok(()) - }) - .await - } -}
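For reference, the reworked `BatchQC` API in `node/libs/roles/src/attester/messages/batch.rs` above takes the committee and genesis hash explicitly instead of reading them from `validator::Genesis`. A minimal sketch of assembling and verifying a certificate with the new signatures (types and methods as they appear in this patch; the `votes` slice is assumed to contain signatures already verified by the caller, which is what `add` requires):

    use zksync_consensus_roles::attester;

    fn assemble_qc(
        batch: attester::Batch,
        genesis: attester::GenesisHash,
        committee: &attester::Committee,
        votes: &[attester::Signed<attester::Batch>],
    ) -> anyhow::Result<attester::BatchQC> {
        let mut qc = attester::BatchQC::new(batch);
        for vote in votes {
            // `add` checks message consistency, duplicates and committee
            // membership; it assumes the signature itself was verified.
            qc.add(vote, committee)?;
        }
        // `verify` checks the genesis hash, the signer set and that the
        // accumulated weight reaches the committee threshold.
        qc.verify(genesis, committee)?;
        Ok(qc)
    }

This add-then-verify flow matches how the tests in `attester/tests.rs` above exercise the API.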