diff --git a/node/actors/executor/src/attestation.rs b/node/actors/executor/src/attestation.rs deleted file mode 100644 index e0bd63d5..00000000 --- a/node/actors/executor/src/attestation.rs +++ /dev/null @@ -1,256 +0,0 @@ -//! Module to publish attestations over batches. - -use crate::Attester; -use anyhow::Context; -use std::sync::Arc; -use tracing::Instrument; -use zksync_concurrency::{ctx, sync, time}; -use zksync_consensus_network::gossip::{ - AttestationStatusReceiver, AttestationStatusWatch, BatchVotesPublisher, -}; -use zksync_consensus_roles::attester; -use zksync_consensus_storage::{BatchStore, BlockStore}; - -/// Polls the database for new batches to be signed and publishes them to the gossip channel. -pub(super) struct AttesterRunner { - block_store: Arc, - batch_store: Arc, - attester: Attester, - publisher: BatchVotesPublisher, - status: AttestationStatusReceiver, - poll_interval: time::Duration, -} - -impl AttesterRunner { - /// Create a new instance of a runner. - pub(super) fn new( - block_store: Arc, - batch_store: Arc, - attester: Attester, - publisher: BatchVotesPublisher, - status: AttestationStatusReceiver, - poll_interval: time::Duration, - ) -> Self { - Self { - block_store, - batch_store, - attester, - publisher, - status, - poll_interval, - } - } - /// Poll the database for new L1 batches and publish our signature over the batch. - pub(super) async fn run(mut self, ctx: &ctx::Ctx) -> ctx::Result<()> { - let public_key = self.attester.key.public(); - // TODO: In the future when we have attester rotation these checks will have to be checked inside the loop. - let Some(attesters) = self.block_store.genesis().attesters.as_ref() else { - tracing::warn!("Attester key is set, but the attester committee is empty."); - return Ok(()); - }; - if !attesters.contains(&public_key) { - tracing::warn!("Attester key is set, but not part of the attester committee."); - return Ok(()); - } - - let genesis = self.block_store.genesis().hash(); - - // Subscribe starts as seen but we don't want to miss the first item. - self.status.mark_changed(); - - loop { - async { - let Some(batch_number) = sync::changed(ctx, &mut self.status) - .instrument(tracing::info_span!("wait_for_attestation_status")) - .await? - .next_batch_to_attest - else { - return Ok(()); - }; - - tracing::info!(%batch_number, "attestation status"); - - // We can avoid actively polling the database in `wait_for_batch_to_sign` by waiting its existence - // to be indicated in memory (which itself relies on polling). This happens once we have the commitment, - // which for nodes that get the blocks through BFT should happen after execution. Nodes which - // rely on batch sync don't participate in attestations as they need the batch on L1 first. - self.batch_store - .wait_until_persisted(ctx, batch_number) - .await?; - - // Try to get the next batch to sign; the commitment might not be available just yet. - let batch = AttesterRunner::wait_for_batch_to_sign( - ctx, - batch_number, - &self.batch_store, - self.poll_interval, - ) - .await?; - - // The certificates might be collected out of order because of how gossip works; - // we could query the DB to see if we already have a QC, or we can just go ahead - // and publish our vote, and let others ignore it. - - tracing::info!(%batch_number, "publishing attestation"); - - // We only have to publish a vote once; future peers can pull it from the register. 
- self.publisher - .publish(attesters, &genesis, &self.attester.key, batch) - .await - .context("publish")?; - - ctx::Ok(()) - } - .instrument(tracing::info_span!("attestation_iter")) - .await?; - } - } - - /// Wait for the batch commitment to become available. - #[tracing::instrument(skip_all, fields(l1_batch = %number))] - async fn wait_for_batch_to_sign( - ctx: &ctx::Ctx, - number: attester::BatchNumber, - batch_store: &BatchStore, - poll_interval: time::Duration, - ) -> ctx::Result { - loop { - if let Some(batch) = batch_store - .batch_to_sign(ctx, number) - .await - .context("batch_to_sign")? - { - return Ok(batch); - } else { - ctx.sleep(poll_interval).await?; - } - } - } -} - -/// An interface which is used by attesters and nodes collecting votes over gossip to determine -/// which is the next batch they are all supposed to be voting on, according to the main node. -/// -/// This is a convenience interface to be used with the [AttestationStatusRunner]. -#[async_trait::async_trait] -pub trait AttestationStatusClient: 'static + Send + Sync { - /// Get the next batch number for which the main node expects a batch QC to be formed. - /// - /// The API might return an error while genesis is being created, which we represent with `None` - /// here and mean that we'll have to try again later. - /// - /// The genesis hash is returned along with the new batch number to facilitate detecting reorgs - /// on the main node as soon as possible and prevent inconsistent state from entering the system. - async fn attestation_status( - &self, - ctx: &ctx::Ctx, - ) -> ctx::Result>; -} - -/// Use an [AttestationStatusClient] to periodically poll the main node and update the [AttestationStatusWatch]. -/// -/// This is provided for convenience. -pub struct AttestationStatusRunner { - status: Arc, - client: Box, - poll_interval: time::Duration, -} - -impl AttestationStatusRunner { - /// Create a new [AttestationStatusWatch] and an [AttestationStatusRunner] to poll the main node. - /// - /// It polls the [AttestationStatusClient] until it returns a value to initialize the status with. - pub async fn init( - _ctx: &ctx::Ctx, - client: Box, - poll_interval: time::Duration, - genesis: attester::GenesisHash, - ) -> ctx::Result<(Arc, Self)> { - let status = Arc::new(AttestationStatusWatch::new(genesis)); - let runner = Self { - status: status.clone(), - client, - poll_interval, - }; - // This would initialise the status to some value, however the EN was rolled out first without the main node API. - // runner.poll_until_some(ctx).await?; - Ok((status, runner)) - } - - /// Initialize an [AttestationStatusWatch] based on a [BatchStore] and return it along with the [AttestationStatusRunner]. - pub async fn init_from_store( - ctx: &ctx::Ctx, - batch_store: Arc, - poll_interval: time::Duration, - genesis: attester::GenesisHash, - ) -> ctx::Result<(Arc, Self)> { - Self::init( - ctx, - Box::new(LocalAttestationStatusClient { - genesis, - batch_store, - }), - poll_interval, - genesis, - ) - .await - } - - /// Run the poll loop. - pub async fn run(mut self, ctx: &ctx::Ctx) -> anyhow::Result<()> { - match self.poll_forever(ctx).await { - Ok(()) | Err(ctx::Error::Canceled(_)) => Ok(()), - Err(ctx::Error::Internal(err)) => Err(err), - } - } - - /// Poll the client forever in a loop or until canceled. 
- async fn poll_forever(&mut self, ctx: &ctx::Ctx) -> ctx::Result<()> { - loop { - self.poll_until_some(ctx).await?; - ctx.sleep(self.poll_interval).await?; - } - } - - /// Poll the client until some data is returned and write it into the status. - async fn poll_until_some(&mut self, ctx: &ctx::Ctx) -> ctx::Result<()> { - loop { - match self.client.attestation_status(ctx).await { - Ok(Some((genesis, next_batch_to_attest))) => { - self.status.update(genesis, next_batch_to_attest).await?; - return Ok(()); - } - Ok(None) => { - tracing::info!("waiting for attestation status...") - } - Err(error) => { - tracing::error!( - ?error, - "failed to poll attestation status, retrying later..." - ) - } - } - ctx.sleep(self.poll_interval).await?; - } - } -} - -/// Implement the attestation status for the main node by returning the next to vote on from the [BatchStore]. -struct LocalAttestationStatusClient { - /// We don't expect the genesis to change while the main node is running, - /// so we can just cache the genesis hash and return it for every request. - genesis: attester::GenesisHash, - batch_store: Arc, -} - -#[async_trait::async_trait] -impl AttestationStatusClient for LocalAttestationStatusClient { - async fn attestation_status( - &self, - ctx: &ctx::Ctx, - ) -> ctx::Result> { - let next_batch_to_attest = self.batch_store.next_batch_to_attest(ctx).await?; - - Ok(next_batch_to_attest.map(|n| (self.genesis, n))) - } -} diff --git a/node/actors/executor/src/io.rs b/node/actors/executor/src/io.rs index 914ec4b8..e6fb7dd4 100644 --- a/node/actors/executor/src/io.rs +++ b/node/actors/executor/src/io.rs @@ -36,8 +36,8 @@ impl Dispatcher { } /// Method to start the IO dispatcher. It is simply a loop to receive messages from the actors and then forward them. - pub(super) async fn run(mut self, ctx: &ctx::Ctx) -> anyhow::Result<()> { - scope::run!(ctx, |ctx, s| async { + pub(super) async fn run(mut self, ctx: &ctx::Ctx) { + let _: ctx::OrCanceled<()> = scope::run!(ctx, |ctx, s| async { // Start a task to handle the messages from the consensus actor. s.spawn(async { while let Ok(msg) = self.consensus_output.recv(ctx).await { @@ -65,6 +65,6 @@ impl Dispatcher { Ok(()) }) - .await + .await; } } diff --git a/node/actors/executor/src/lib.rs b/node/actors/executor/src/lib.rs index adaf4016..2cd86e34 100644 --- a/node/actors/executor/src/lib.rs +++ b/node/actors/executor/src/lib.rs @@ -2,20 +2,19 @@ use crate::io::Dispatcher; use anyhow::Context as _; use network::http; -pub use network::RpcConfig; +pub use network::{gossip::attestation, RpcConfig}; use std::{ collections::{HashMap, HashSet}, sync::Arc, }; use zksync_concurrency::{ctx, limiter, net, scope, time}; use zksync_consensus_bft as bft; -use zksync_consensus_network::{self as network, gossip::AttestationStatusWatch}; -use zksync_consensus_roles::{attester, node, validator}; +use zksync_consensus_network as network; +use zksync_consensus_roles::{node, validator}; use zksync_consensus_storage::{BatchStore, BlockStore, ReplicaStore}; use zksync_consensus_utils::pipe; use zksync_protobuf::kB; -pub mod attestation; mod io; #[cfg(test)] mod tests; @@ -31,13 +30,6 @@ pub struct Validator { pub payload_manager: Box, } -/// Validator-related part of [`Executor`]. -#[derive(Debug)] -pub struct Attester { - /// Consensus network configuration. - pub key: attester::SecretKey, -} - /// Config of the node executor. #[derive(Debug)] pub struct Config { @@ -98,10 +90,9 @@ pub struct Executor { pub batch_store: Arc, /// Validator-specific node data. 
pub validator: Option, - /// Validator-specific node data. - pub attester: Option, - /// Status showing where the main node wants attester to cast their votes. - pub attestation_status: Arc, + /// Attestation controller. Caller should actively configure the batch + /// for which the attestation votes should be collected. + pub attestation: Arc, } impl Executor { @@ -112,7 +103,6 @@ impl Executor { public_addr: self.config.public_addr.clone(), gossip: self.config.gossip(), validator_key: self.validator.as_ref().map(|v| v.key.clone()), - attester_key: self.attester.as_ref().map(|a| a.key.clone()), ping_timeout: Some(time::Duration::seconds(10)), max_block_size: self.config.max_payload_size.saturating_add(kB), max_batch_size: self.config.max_batch_size.saturating_add(kB), @@ -137,35 +127,20 @@ impl Executor { tracing::debug!("Starting actors in separate threads."); scope::run!(ctx, |ctx, s| async { - s.spawn(async { dispatcher.run(ctx).await.context("IO Dispatcher stopped") }); + s.spawn(async { + dispatcher.run(ctx).await; + Ok(()) + }); let (net, runner) = network::Network::new( network_config, self.block_store.clone(), self.batch_store.clone(), network_actor_pipe, - self.attestation_status.clone(), + self.attestation, ); net.register_metrics(); s.spawn(async { runner.run(ctx).await.context("Network stopped") }); - if let Some(attester) = self.attester { - tracing::info!("Running the node in attester mode."); - let runner = attestation::AttesterRunner::new( - self.block_store.clone(), - self.batch_store.clone(), - attester, - net.batch_vote_publisher(), - self.attestation_status.subscribe(), - self.config.batch_poll_interval, - ); - s.spawn(async { - runner.run(ctx).await?; - Ok(()) - }); - } else { - tracing::info!("Running the node in non-attester mode."); - } - if let Some(debug_config) = self.config.debug_page { s.spawn(async { http::DebugPageServer::new(debug_config, net) diff --git a/node/actors/executor/src/tests.rs b/node/actors/executor/src/tests.rs index 50ba7297..4e88f5ef 100644 --- a/node/actors/executor/src/tests.rs +++ b/node/actors/executor/src/tests.rs @@ -1,10 +1,9 @@ //! High-level tests for `Executor`. use super::*; -use attestation::{AttestationStatusClient, AttestationStatusRunner}; use rand::Rng as _; -use std::sync::{atomic::AtomicU64, Mutex}; +//use std::sync::{atomic::AtomicU64, Mutex}; use tracing::Instrument as _; -use zksync_concurrency::{sync, testonly::abort_on_panic}; +use zksync_concurrency::testonly::abort_on_panic; use zksync_consensus_bft as bft; use zksync_consensus_network::testonly::{new_configs, new_fullnode}; use zksync_consensus_roles::validator::{testonly::Setup, BlockNumber}; @@ -31,8 +30,8 @@ fn config(cfg: &network::Config) -> Config { /// The test executors below are not running with attesters, so we just create an [AttestationStatusWatch] /// that will never be updated. 
-fn never_attest(genesis: &validator::Genesis) -> Arc { - Arc::new(AttestationStatusWatch::new(genesis.hash())) +fn never_attest() -> Arc { + attestation::Controller::new(None).into() } fn validator( @@ -41,7 +40,6 @@ fn validator( batch_store: Arc, replica_store: impl ReplicaStore, ) -> Executor { - let attestation_status = never_attest(block_store.genesis()); Executor { config: config(cfg), block_store, @@ -51,8 +49,7 @@ fn validator( replica_store: Box::new(replica_store), payload_manager: Box::new(bft::testonly::RandomPayload(1000)), }), - attester: None, - attestation_status, + attestation: never_attest(), } } @@ -61,14 +58,12 @@ fn fullnode( block_store: Arc, batch_store: Arc, ) -> Executor { - let attestation_status = never_attest(block_store.genesis()); Executor { config: config(cfg), block_store, batch_store, validator: None, - attester: None, - attestation_status, + attestation: never_attest(), } } @@ -316,88 +311,3 @@ async fn test_validator_syncing_from_fullnode() { .await .unwrap(); } - -/// Test that the AttestationStatusRunner initialises and then polls the status. -#[tokio::test] -async fn test_attestation_status_runner() { - abort_on_panic(); - let _guard = zksync_concurrency::testonly::set_timeout(time::Duration::seconds(5)); - let ctx = &ctx::test_root(&ctx::AffineClock::new(10.0)); - let rng = &mut ctx.rng(); - - let genesis: attester::GenesisHash = rng.gen(); - - #[derive(Clone)] - struct MockAttestationStatus { - genesis: Arc>, - batch_number: Arc, - } - - #[async_trait::async_trait] - impl AttestationStatusClient for MockAttestationStatus { - async fn attestation_status( - &self, - _ctx: &ctx::Ctx, - ) -> ctx::Result> { - let curr = self - .batch_number - .fetch_add(1u64, std::sync::atomic::Ordering::Relaxed); - if curr == 0 { - // Return None initially to see that the runner will deal with it. - Ok(None) - } else { - // The first actual result will be 1 on the 2nd poll. - let genesis = *self.genesis.lock().unwrap(); - let next_batch_to_attest = attester::BatchNumber(curr); - Ok(Some((genesis, next_batch_to_attest))) - } - } - } - - let res = scope::run!(ctx, |ctx, s| async { - let client = MockAttestationStatus { - genesis: Arc::new(Mutex::new(genesis)), - batch_number: Arc::new(AtomicU64::default()), - }; - let (status, runner) = AttestationStatusRunner::init( - ctx, - Box::new(client.clone()), - time::Duration::milliseconds(100), - genesis, - ) - .await - .unwrap(); - - let mut recv_status = status.subscribe(); - recv_status.mark_changed(); - - // Check that the value has *not* been initialised to a non-default value. - { - let status = sync::changed(ctx, &mut recv_status).await?; - assert!(status.next_batch_to_attest.is_none()); - } - // Now start polling for new values. Starting in the foreground because we want it to fail in the end. - s.spawn(runner.run(ctx)); - // Check that polling sets the value. - { - let status = sync::changed(ctx, &mut recv_status).await?; - assert!(status.next_batch_to_attest.is_some()); - assert_eq!(status.next_batch_to_attest.unwrap().0, 1); - } - // Change the genesis returned by the client. It should cause the scope to fail. 
-        {
-            let mut genesis = client.genesis.lock().unwrap();
-            *genesis = rng.gen();
-        }
-        Ok(())
-    })
-    .await;
-
-    match res {
-        Ok(()) => panic!("expected to fail when the genesis changed"),
-        Err(e) => assert!(
-            e.to_string().contains("genesis changed"),
-            "only expect failures due to genesis change; got: {e}"
-        ),
-    }
-}
diff --git a/node/actors/network/src/config.rs b/node/actors/network/src/config.rs
index 2d47cf80..03e5020b 100644
--- a/node/actors/network/src/config.rs
+++ b/node/actors/network/src/config.rs
@@ -1,7 +1,7 @@
 //! Network actor configs.
 use std::collections::{HashMap, HashSet};
 use zksync_concurrency::{limiter, net, time};
-use zksync_consensus_roles::{attester, node, validator};
+use zksync_consensus_roles::{node, validator};
 
 /// How often we should retry to establish a connection to a validator.
 /// TODO(gprusak): once it becomes relevant, choose a more appropriate retry strategy.
@@ -26,7 +26,7 @@ pub struct RpcConfig {
     pub get_batch_timeout: Option<time::Duration>,
     /// Max rate of sending/receiving consensus messages.
     pub consensus_rate: limiter::Rate,
-    /// Max rate of sending/receiving l1 batch votes messages.
+    /// Max rate of sending/receiving PushBatchVotes RPCs.
     pub push_batch_votes_rate: limiter::Rate,
 }
 
@@ -98,9 +98,6 @@ pub struct Config {
     /// Private key of the validator.
     /// None if the node is NOT a validator.
     pub validator_key: Option<validator::SecretKey>,
-    /// Private key of the attester.
-    /// None if the node is NOT a attester.
-    pub attester_key: Option<attester::SecretKey>,
     /// Maximal size of the proto-encoded `validator::FinalBlock` in bytes.
     pub max_block_size: usize,
     /// Maximal size of the proto-encoded `attester::SyncBatch` in bytes.
diff --git a/node/actors/network/src/gossip/attestation/metrics.rs b/node/actors/network/src/gossip/attestation/metrics.rs
new file mode 100644
index 00000000..4e5137da
--- /dev/null
+++ b/node/actors/network/src/gossip/attestation/metrics.rs
@@ -0,0 +1,19 @@
+/// Metrics related to the gossiping of L1 batch votes.
+#[derive(Debug, vise::Metrics)]
+#[metrics(prefix = "network_gossip_attestation")]
+pub(crate) struct Metrics {
+    /// Batch to be attested.
+    pub(crate) batch_number: vise::Gauge<u64>,
+
+    /// Number of members in the attester committee.
+    pub(crate) committee_size: vise::Gauge<usize>,
+
+    /// Number of votes collected for the current batch.
+    pub(crate) votes_collected: vise::Gauge<usize>,
+
+    /// Weight percentage (in range [0,1]) of votes collected for the current batch.
+    pub(crate) weight_collected: vise::Gauge<f64>,
+}
+
+#[vise::register]
+pub(super) static METRICS: vise::Global<Metrics> = vise::Global::new();
diff --git a/node/actors/network/src/gossip/attestation/mod.rs b/node/actors/network/src/gossip/attestation/mod.rs
new file mode 100644
index 00000000..6effd612
--- /dev/null
+++ b/node/actors/network/src/gossip/attestation/mod.rs
@@ -0,0 +1,368 @@
+//! Attestation.
+use crate::watch::Watch;
+use anyhow::Context as _;
+use std::{collections::HashSet, fmt, sync::Arc};
+use zksync_concurrency::{ctx, sync};
+use zksync_consensus_roles::attester;
+
+mod metrics;
+#[cfg(test)]
+mod tests;
+
+/// Configuration of the attestation Controller.
+/// It determines what should be attested and by whom.
+#[derive(Debug, Clone, PartialEq)]
+pub struct Info {
+    /// Batch to attest.
+    pub batch_to_attest: attester::Batch,
+    /// Committee that should attest the batch.
+    pub committee: Arc<attester::Committee>,
+}
+
+// Internal attestation state: info and the set of votes collected so far.
+#[derive(Clone)]
+struct State {
+    info: Arc<Info>,
+    /// Votes collected so far.
+    votes: im::HashMap<attester::PublicKey, Arc<attester::Signed<attester::Batch>>>,
+    // Total weight of the votes collected.
+    total_weight: attester::Weight,
+}
+
+/// Diff between 2 states.
+pub(crate) struct Diff {
+    /// New votes.
+    pub(crate) votes: Vec<Arc<attester::Signed<attester::Batch>>>,
+    /// New info, if changed.
+    pub(crate) info: Option<Arc<Info>>,
+}
+
+impl Diff {
+    fn is_empty(&self) -> bool {
+        self.votes.is_empty() && self.info.is_none()
+    }
+}
+
+impl State {
+    /// Returns a diff between `self` state and `old` state.
+    /// Diff contains votes which are present in `self`, but not in `old`.
+    fn diff(&self, old: &Option<Self>) -> Diff {
+        let Some(old) = old.as_ref() else {
+            return Diff {
+                info: Some(self.info.clone()),
+                votes: self.votes.values().cloned().collect(),
+            };
+        };
+        if self.info.batch_to_attest.number != old.info.batch_to_attest.number {
+            return Diff {
+                info: Some(self.info.clone()),
+                votes: self.votes.values().cloned().collect(),
+            };
+        }
+
+        Diff {
+            info: None,
+            votes: self
+                .votes
+                .iter()
+                .filter(|(k, _)| !old.votes.contains_key(k))
+                .map(|(_, v)| v.clone())
+                .collect(),
+        }
+    }
+
+    /// Verifies and adds a vote.
+    /// Noop if the vote is not signed by a committee member or is already inserted.
+    /// Returns an error if the genesis doesn't match or the signature is invalid.
+    fn insert_vote(&mut self, vote: Arc<attester::Signed<attester::Batch>>) -> anyhow::Result<()> {
+        anyhow::ensure!(
+            vote.msg.genesis == self.info.batch_to_attest.genesis,
+            "Genesis mismatch"
+        );
+        if vote.msg.number != self.info.batch_to_attest.number {
+            return Ok(());
+        }
+        anyhow::ensure!(
+            vote.msg.hash == self.info.batch_to_attest.hash,
+            "batch hash mismatch"
+        );
+        let Some(weight) = self.info.committee.weight(&vote.key) else {
+            anyhow::bail!(
+                "received vote signed by an inactive attester: {:?}",
+                vote.key
+            );
+        };
+        if self.votes.contains_key(&vote.key) {
+            return Ok(());
+        }
+        // Verify signature only after checking all the other preconditions.
+        vote.verify().context("verify")?;
+        tracing::info!("collected vote with weight {weight} from {:?}", vote.key);
+        self.votes.insert(vote.key.clone(), vote);
+        self.total_weight += weight;
+        Ok(())
+    }
+
+    fn insert_votes(
+        &mut self,
+        votes: impl Iterator<Item = Arc<attester::Signed<attester::Batch>>>,
+    ) -> anyhow::Result<()> {
+        let mut done = HashSet::new();
+        for vote in votes {
+            // Disallow multiple entries for the same key:
+            // it is important because a malicious attester may spam us with
+            // new versions and verifying signatures is expensive.
+            if done.contains(&vote.key) {
+                anyhow::bail!("duplicate entry for {:?}", vote.key);
+            }
+            done.insert(vote.key.clone());
+            self.insert_vote(vote)?;
+        }
+        Ok(())
+    }
+
+    fn cert(&self) -> Option<attester::BatchQC> {
+        if self.total_weight < self.info.committee.threshold() {
+            return None;
+        }
+        let mut sigs = attester::MultiSig::default();
+        for vote in self.votes.values() {
+            sigs.add(vote.key.clone(), vote.sig.clone());
+        }
+        Some(attester::BatchQC {
+            message: self.info.batch_to_attest.clone(),
+            signatures: sigs,
+        })
+    }
+}
+
+/// Receiver of state diffs.
+pub(crate) struct DiffReceiver {
+    prev: Option<State>,
+    recv: sync::watch::Receiver<Option<State>>,
+}
+
+impl DiffReceiver {
+    /// Waits for the next state diff.
+    pub(crate) async fn wait_for_diff(&mut self, ctx: &ctx::Ctx) -> ctx::OrCanceled<Diff> {
+        loop {
+            let Some(new) = (*sync::changed(ctx, &mut self.recv).await?).clone() else {
+                continue;
+            };
+            let diff = new.diff(&self.prev);
+            self.prev = Some(new);
+            if !diff.is_empty() {
+                return Ok(diff);
+            }
+        }
+    }
+}
+
+/// `Controller` manages the attestation state.
+/// It maintains a set of votes matching the attestation info.
+/// It allows for:
+/// * adding votes to the state
+/// * subscribing to the vote set changes
+/// * waiting for the certificate to be collected
+///
+/// It also keeps an attester key used to sign the batch vote,
+/// whenever it belongs to the current attester committee.
+/// Signing happens automatically whenever the committee is updated.
+///
+/// Expected usage:
+/// ```
+/// let ctrl = Arc::new(attestation::Controller::new(Some(key)));
+/// // Check what is the number of the next batch to be attested in a
+/// // global attestation registry (i.e. L1 chain state).
+/// let first: attester::BatchNumber = ...;
+/// scope::run!(ctx, |ctx, s| async {
+///     // Loop starting attestation whenever global attestation state progresses.
+///     s.spawn(async {
+///         let mut next = first;
+///         loop {
+///             // Based on the local storage, compute the next expected batch hash
+///             // and the committee that should attest it.
+///             ...
+///             let info = attestation::Info {
+///                 batch_to_attest: attester::Batch {
+///                     number: next,
+///                     ...
+///                 },
+///                 committee: ...,
+///             };
+///             ctrl.start_attestation(Arc::new(info)).unwrap();
+///             // Wait for the attestation to progress, by observing the
+///             // global attestation registry.
+///             next = ...;
+///         }
+///     });
+///     s.spawn(async {
+///         // Loop waiting for a certificate to be collected and submitting
+///         // it to the global registry.
+///         let mut next = first;
+///         loop {
+///             if let Some(qc) = ctrl.wait_for_cert(ctx, next).await? {
+///                 // Submit the certificate to the global registry.
+///                 ...
+///             }
+///             next = next.next();
+///         }
+///     });
+///
+///     // Make the executor establish the p2p network and
+///     // collect the attestation votes.
+///     executor::Executor {
+///         ...
+///         attestation: ctrl.clone(),
+///     }.run(ctx).await;
+/// })
+/// ```
+pub struct Controller {
+    /// Key to automatically vote for batches.
+    /// None, if the current node is not an attester.
+    key: Option<attester::SecretKey>,
+    /// Internal state of the controller.
+    state: Watch<Option<State>>,
+}
+
+impl fmt::Debug for Controller {
+    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+        fmt.debug_struct("Controller")
+            .field("key", &self.key)
+            .finish_non_exhaustive()
+    }
+}
+
+impl Controller {
+    /// Constructs Controller.
+    /// `key` will be used for automatically signing votes.
+    pub fn new(key: Option<attester::SecretKey>) -> Self {
+        Self {
+            key,
+            state: Watch::new(None),
+        }
+    }
+
+    /// Subscribes to state diffs.
+    pub(crate) fn subscribe(&self) -> DiffReceiver {
+        let mut recv = self.state.subscribe();
+        recv.mark_changed();
+        DiffReceiver { prev: None, recv }
+    }
+
+    /// Inserts votes to the state.
+    /// Irrelevant votes are silently ignored.
+    /// Returns an error if an invalid vote has been found.
+    /// It is possible that some votes have been added to the state
+    /// even if eventually an error was returned.
+    pub(crate) async fn insert_votes(
+        &self,
+        votes: impl Iterator<Item = Arc<attester::Signed<attester::Batch>>>,
+    ) -> anyhow::Result<()> {
+        let locked = self.state.lock().await;
+        let Some(mut state) = locked.borrow().clone() else {
+            return Ok(());
+        };
+        let before = state.total_weight;
+        let res = state.insert_votes(votes);
+        if state.total_weight > before {
+            metrics::METRICS.votes_collected.set(state.votes.len());
+            #[allow(clippy::float_arithmetic)]
+            metrics::METRICS
+                .weight_collected
+                .set(state.total_weight as f64 / state.info.committee.total_weight() as f64);
+            locked.send_replace(Some(state));
+        }
+        res
+    }
+
+    /// Returns votes matching the `want` batch.
+    pub(crate) fn votes(
+        &self,
+        want: &attester::Batch,
+    ) -> Vec<Arc<attester::Signed<attester::Batch>>> {
+        let state = self.state.subscribe();
+        let state = state.borrow();
+        let Some(state) = &*state else { return vec![] };
+        if &state.info.batch_to_attest != want {
+            return vec![];
+        }
+        state.votes.values().cloned().collect()
+    }
+
+    /// Waits for the certificate for a batch with the given number to be collected.
+    /// Returns None iff attestation has already skipped to collecting a certificate for some later batch.
+    pub async fn wait_for_cert(
+        &self,
+        ctx: &ctx::Ctx,
+        n: attester::BatchNumber,
+    ) -> ctx::OrCanceled<Option<attester::BatchQC>> {
+        let recv = &mut self.state.subscribe();
+        recv.mark_changed();
+        loop {
+            let state = sync::changed(ctx, recv).await?;
+            let Some(state) = state.as_ref() else {
+                continue;
+            };
+            if state.info.batch_to_attest.number < n {
+                continue;
+            };
+            if state.info.batch_to_attest.number > n {
+                return Ok(None);
+            }
+            if let Some(qc) = state.cert() {
+                return Ok(Some(qc));
+            }
+        }
+    }
+
+    /// Updates the internal configuration to start collecting votes for a new batch.
+    /// Clears the votes collected for the previous info.
+    /// Batch number has to increase with each update.
+    #[tracing::instrument(name = "attestation::Controller::start_attestation", skip_all)]
+    pub async fn start_attestation(&self, info: Arc<Info>) -> anyhow::Result<()> {
+        let locked = self.state.lock().await;
+        let old = locked.borrow().clone();
+        if let Some(old) = old.as_ref() {
+            if *old.info == *info {
+                return Ok(());
+            }
+            anyhow::ensure!(
+                old.info.batch_to_attest.genesis == info.batch_to_attest.genesis,
+                "tried to change genesis"
+            );
+            anyhow::ensure!(
+                old.info.batch_to_attest.number < info.batch_to_attest.number,
+                "tried to decrease batch number"
+            );
+        }
+        tracing::info!(
+            "started collecting votes for batch {:?}",
+            info.batch_to_attest.number
+        );
+        let mut new = State {
+            info,
+            votes: im::HashMap::new(),
+            total_weight: 0,
+        };
+        if let Some(key) = self.key.as_ref() {
+            if new.info.committee.contains(&key.public()) {
+                let vote = key.sign_msg(new.info.batch_to_attest.clone());
+                // This is our own vote, so it should always be valid.
+                new.insert_vote(Arc::new(vote)).unwrap();
+            }
+        }
+        metrics::METRICS
+            .batch_number
+            .set(new.info.batch_to_attest.number.0);
+        metrics::METRICS
+            .committee_size
+            .set(new.info.committee.len());
+        metrics::METRICS.votes_collected.set(new.votes.len());
+        #[allow(clippy::float_arithmetic)]
+        metrics::METRICS
+            .weight_collected
+            .set(new.total_weight as f64 / new.info.committee.total_weight() as f64);
+        locked.send_replace(Some(new));
+        Ok(())
+    }
+}
diff --git a/node/actors/network/src/gossip/attestation/tests.rs b/node/actors/network/src/gossip/attestation/tests.rs
new file mode 100644
index 00000000..2a5976b4
--- /dev/null
+++ b/node/actors/network/src/gossip/attestation/tests.rs
@@ -0,0 +1,200 @@
+use super::*;
+use rand::{seq::SliceRandom as _, Rng as _};
+use zksync_concurrency::testonly::abort_on_panic;
+
+type Vote = Arc<attester::Signed<attester::Batch>>;
+
+#[derive(Default, Debug, PartialEq)]
+struct Votes(im::HashMap<attester::PublicKey, Vote>);
+
+impl<T: IntoIterator<Item = Vote>> From<T> for Votes {
+    fn from(x: T) -> Self {
+        Self(x.into_iter().map(|v| (v.key.clone(), v)).collect())
+    }
+}
+
+#[tokio::test]
+async fn test_insert_votes() {
+    abort_on_panic();
+    let ctx = &ctx::test_root(&ctx::RealClock);
+    let rng = &mut ctx.rng();
+
+    let ctrl = Controller::new(None);
+    let genesis: attester::GenesisHash = rng.gen();
+    let first: attester::BatchNumber = rng.gen();
+    for i in 0..3 {
+        tracing::info!("iteration {i}");
+        let keys: Vec<attester::SecretKey> = (0..8).map(|_| rng.gen()).collect();
+        let info = Arc::new(Info {
+            batch_to_attest: attester::Batch {
+                genesis,
+                number: first + i,
+                hash: rng.gen(),
+            },
+            committee: attester::Committee::new(keys.iter().map(|k| attester::WeightedAttester {
+                key: k.public(),
+                weight: 1250,
+            }))
+            .unwrap()
+            .into(),
+        });
+        let ctrl_votes = || Votes::from(ctrl.votes(&info.batch_to_attest));
+        ctrl.start_attestation(info.clone()).await.unwrap();
+        assert_eq!(Votes::from([]), ctrl_votes());
+        let mut recv = ctrl.subscribe();
+        let diff = recv.wait_for_diff(ctx).await.unwrap();
+        assert_eq!(diff.info.as_ref(), Some(&info));
+        assert_eq!(Votes::default(), diff.votes.into());
+
+        let all_votes: Vec<Vote> = keys
+            .iter()
+            .map(|k| k.sign_msg(info.batch_to_attest.clone()).into())
+            .collect();
+
+        tracing::info!("Initial votes.");
+        ctrl.insert_votes(all_votes[0..3].iter().cloned())
+            .await
+            .unwrap();
+        assert_eq!(Votes::from(all_votes[0..3].iter().cloned()), ctrl_votes());
+        let diff = recv.wait_for_diff(ctx).await.unwrap();
+        assert!(diff.info.is_none());
+        assert_eq!(
+            Votes::from(all_votes[0..3].iter().cloned()),
+            diff.votes.into()
+        );
+
+        tracing::info!("Adding votes gradually.");
+        ctrl.insert_votes(all_votes[3..5].iter().cloned())
+            .await
+            .unwrap();
+        ctrl.insert_votes(all_votes[5..7].iter().cloned())
+            .await
+            .unwrap();
+        assert_eq!(Votes::from(all_votes[0..7].iter().cloned()), ctrl_votes());
+        let diff = recv.wait_for_diff(ctx).await.unwrap();
+        assert!(diff.info.is_none());
+        assert_eq!(
+            Votes::from(all_votes[3..7].iter().cloned()),
+            diff.votes.into()
+        );
+
+        tracing::info!("Re-adding already inserted votes (noop).");
+        ctrl.insert_votes(all_votes[2..6].iter().cloned())
+            .await
+            .unwrap();
+        assert_eq!(Votes::from(all_votes[0..7].iter().cloned()), ctrl_votes());
+
+        tracing::info!("Adding votes out of committee (error).");
+        assert!(ctrl
+            .insert_votes((0..3).map(|_| {
+                let k: attester::SecretKey = rng.gen();
+                k.sign_msg(info.batch_to_attest.clone()).into()
+            }))
+            .await
+            .is_err());
+        assert_eq!(Votes::from(all_votes[0..7].iter().cloned()), ctrl_votes());
+
+        tracing::info!("Adding votes for different batch (noop).");
+        ctrl.insert_votes((0..3).map(|_| {
+            let k: attester::SecretKey = rng.gen();
+            k.sign_msg(attester::Batch {
+                genesis: info.batch_to_attest.genesis,
+                number: rng.gen(),
+                hash: rng.gen(),
+            })
+            .into()
+        }))
+        .await
+        .unwrap();
+        assert_eq!(Votes::from(all_votes[0..7].iter().cloned()), ctrl_votes());
+
+        tracing::info!("Adding incorrect votes (error).");
+        let mut bad_vote = (*all_votes[7]).clone();
+        bad_vote.sig = rng.gen();
+        assert!(ctrl
+            .insert_votes([bad_vote.into()].into_iter())
+            .await
+            .is_err());
+        assert_eq!(Votes::from(all_votes[0..7].iter().cloned()), ctrl_votes());
+
+        tracing::info!("Add the last vote mixed with already added votes.");
+        ctrl.insert_votes(all_votes[5..].iter().cloned())
+            .await
+            .unwrap();
+        assert_eq!(Votes::from(all_votes.clone()), ctrl_votes());
+        let diff = recv.wait_for_diff(ctx).await.unwrap();
+        assert!(diff.info.is_none());
+        assert_eq!(
+            Votes::from(all_votes[7..].iter().cloned()),
+            diff.votes.into()
+        );
+    }
+}
+
+#[tokio::test]
+async fn test_wait_for_cert() {
+    abort_on_panic();
+    let ctx = &ctx::test_root(&ctx::RealClock);
+    let rng = &mut ctx.rng();
+
+    let ctrl = Controller::new(None);
+    let genesis: attester::GenesisHash = rng.gen();
+    let first: attester::BatchNumber = rng.gen();
+
+    for i in 0..10 {
+        tracing::info!("iteration {i}");
+        let committee_size = rng.gen_range(1..20);
+        let keys: Vec<attester::SecretKey> = (0..committee_size).map(|_| rng.gen()).collect();
+        let info = Arc::new(Info {
+            batch_to_attest: attester::Batch {
+                genesis,
+                number: first + i,
+                hash: rng.gen(),
+            },
+            committee: attester::Committee::new(keys.iter().map(|k| attester::WeightedAttester {
+                key: k.public(),
+                weight: rng.gen_range(1..=100),
+            }))
+            .unwrap()
+            .into(),
+        });
+        let mut all_votes: Vec<Vote> = keys
+            .iter()
+            .map(|k| k.sign_msg(info.batch_to_attest.clone()).into())
+            .collect();
+        all_votes.shuffle(rng);
+        ctrl.start_attestation(info.clone()).await.unwrap();
+        loop {
+            let end = rng.gen_range(0..=committee_size);
+            tracing::info!("end = {end}");
+            ctrl.insert_votes(all_votes[..end].iter().cloned())
+                .await
+                .unwrap();
+            // Waiting for the previous qc should immediately return None.
+            assert_eq!(
+                None,
+                ctrl.wait_for_cert(ctx, info.batch_to_attest.number.prev().unwrap())
+                    .await
+                    .unwrap()
+            );
+            if info
+                .committee
+                .weight_of_keys(all_votes[..end].iter().map(|v| &v.key))
+                >= info.committee.threshold()
+            {
+                let qc = ctrl
+                    .wait_for_cert(ctx, info.batch_to_attest.number)
+                    .await
+                    .unwrap()
+                    .unwrap();
+                assert_eq!(qc.message, info.batch_to_attest);
+                qc.verify(genesis, &info.committee).unwrap();
+                break;
+            }
+            assert_eq!(
+                None,
+                ctrl.state.subscribe().borrow().as_ref().unwrap().cert()
+            );
+        }
+    }
+}
diff --git a/node/actors/network/src/gossip/attestation_status.rs b/node/actors/network/src/gossip/attestation_status.rs
deleted file mode 100644
index b10b0208..00000000
--- a/node/actors/network/src/gossip/attestation_status.rs
+++ /dev/null
@@ -1,93 +0,0 @@
-use crate::watch::Watch;
-use std::fmt;
-use zksync_concurrency::sync;
-use zksync_consensus_roles::attester;
-
-/// Coordinate the attestation by showing the status as seen by the main node.
-#[derive(Debug, Clone, PartialEq, Eq)]
-pub struct AttestationStatus {
-    /// Next batch number where voting is expected.
-    ///
-    /// The field is optional so that we can start an external node without the main node API
-    /// already deployed, which is how the initial rollout is.
-    pub next_batch_to_attest: Option<attester::BatchNumber>,
-    /// The hash of the genesis of the chain to which the L1 batches belong.
- /// - /// We don't expect to handle a regenesis on the fly without restarting the - /// node, so this value is not expected to change; it's here only to stop - /// any attempt at updating the status with a batch number that refers - /// to a different fork. - pub genesis: attester::GenesisHash, -} - -/// The subscription over the attestation status which voters can monitor for change. -pub type AttestationStatusReceiver = sync::watch::Receiver; - -/// A [Watch] over an [AttestationStatus] which we can use to notify components about -/// changes in the batch number the main node expects attesters to vote on. -pub struct AttestationStatusWatch(Watch); - -impl fmt::Debug for AttestationStatusWatch { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - fmt.debug_struct("AttestationStatusWatch") - .finish_non_exhaustive() - } -} - -impl AttestationStatusWatch { - /// Create a new watch with the current genesis, and a yet-to-be-determined batch number. - pub fn new(genesis: attester::GenesisHash) -> Self { - Self(Watch::new(AttestationStatus { - genesis, - next_batch_to_attest: None, - })) - } - - /// Subscribes to AttestationStatus updates. - pub fn subscribe(&self) -> AttestationStatusReceiver { - self.0.subscribe() - } - - /// Set the next batch number to attest on and notify subscribers it changed. - /// - /// Fails if the genesis we want to update to is not the same as the watch was started with, - /// because the rest of the system is not expected to be able to handle reorgs without a - /// restart of the node. - pub async fn update( - &self, - genesis: attester::GenesisHash, - next_batch_to_attest: attester::BatchNumber, - ) -> anyhow::Result<()> { - let this = self.0.lock().await; - { - let status = this.borrow(); - anyhow::ensure!( - status.genesis == genesis, - "the attestation status genesis changed: {:?} -> {:?}", - status.genesis, - genesis - ); - // The next batch to attest moving backwards could cause the voting process - // to get stuck due to the way gossiping works and the BatchVotes discards - // votes below the expected minimum: even if we clear the votes, we might - // not get them again from any peer. By returning an error we can cause - // the node to be restarted and connections re-established for fresh gossip. - if let Some(old_batch_to_attest) = status.next_batch_to_attest { - anyhow::ensure!( - old_batch_to_attest <= next_batch_to_attest, - "next batch to attest moved backwards: {} -> {}", - old_batch_to_attest, - next_batch_to_attest - ); - } - } - this.send_if_modified(|status| { - if status.next_batch_to_attest == Some(next_batch_to_attest) { - return false; - } - status.next_batch_to_attest = Some(next_batch_to_attest); - true - }); - Ok(()) - } -} diff --git a/node/actors/network/src/gossip/batch_votes.rs b/node/actors/network/src/gossip/batch_votes.rs deleted file mode 100644 index b48fb205..00000000 --- a/node/actors/network/src/gossip/batch_votes.rs +++ /dev/null @@ -1,333 +0,0 @@ -//! Global state distributed by active attesters, observed by all the nodes in the network. 
-use super::metrics; -use crate::watch::Watch; -use std::{collections::HashSet, fmt, sync::Arc}; -use zksync_concurrency::sync; -use zksync_consensus_roles::attester; - -#[derive(Debug, Default)] -pub(super) struct BatchUpdateStats { - num_added: usize, - weight_added: u64, - last_added: Option, -} - -impl BatchUpdateStats { - fn added(&mut self, number: attester::BatchNumber, weight: u64) { - self.num_added += 1; - self.weight_added += weight; - self.last_added = Some(number); - } -} - -/// Represents the current state of node's knowledge about the attester votes. -/// -/// Eventually this data structure will have to track voting potentially happening -/// simultaneously on multiple heights, if we decrease the batch interval to be -/// several seconds, instead of a minute. By that point, the replicas should be -/// following the main node (or L1) to know what is the highest finalized batch, -/// which will act as a floor to the batch number we have to track here. It will -/// also help to protect ourselves from DoS attacks by malicious attesters casting -/// votes far into the future. -/// -/// For now, however, we just want a best effort where if we find a quorum, we -/// save it to the database, if not, we move on. For that, a simple protection -/// mechanism is to only allow one active vote per attester, which means any -/// previous vote can be removed when a new one is added. -#[derive(Clone, Default, PartialEq, Eq)] -pub(crate) struct BatchVotes { - /// The latest vote received from each attester. We only keep the last one - /// for now, hoping that with 1 minute batches there's plenty of time for - /// the quorum to be reached, but eventually we might have to allow multiple - /// votes across different heights. - pub(crate) votes: im::HashMap>>, - - /// Total weight of votes at different heights and hashes. - /// - /// We will be looking for any hash that reaches a quorum threshold at any of the heights. - /// At that point we can remove all earlier heights, considering it final. In the future - /// we can instead keep heights until they are observed on the main node (or L1). - pub(crate) support: - im::OrdMap>, - - /// The minimum batch number for which we are still interested in votes. - /// - /// Because we only store 1 vote per attester the memory is very much bounded, - /// but this extra pruning mechanism can be used to clear votes of attesters - /// who have been removed from the committee, as well as to get rid of the - /// last quorum we found and stored, and look for the a new one in the next round. - pub(crate) min_batch_number: attester::BatchNumber, -} - -impl BatchVotes { - /// Returns a set of votes of `self` which are newer than the entries in `b`. - pub(super) fn get_newer(&self, b: &Self) -> Vec>> { - let mut newer = vec![]; - for (k, v) in &self.votes { - if let Some(bv) = b.votes.get(k) { - if v.msg <= bv.msg { - continue; - } - } - newer.push(v.clone()); - } - newer - } - - /// Updates the discovery map with entries from `data`. - /// It exits as soon as an invalid entry is found. - /// `self` might get modified even if an error is returned - /// (all entries verified so far are added). - /// - /// Returns statistics about new entries added. - /// - /// For now it doesn't return an error if a vote with an invalid signature - /// is encountered, so that the node doesn't disconnect from peer if it - /// happens to have a new field in `Batch`. This is only until the feature - /// is stabilized. 
- pub(super) fn update( - &mut self, - attesters: &attester::Committee, - genesis: &attester::GenesisHash, - data: &[Arc>], - ) -> anyhow::Result { - let mut stats = BatchUpdateStats::default(); - - let mut done = HashSet::new(); - for d in data { - // Disallow multiple entries for the same key: - // it is important because a malicious attester may spam us with - // new versions and verifying signatures is expensive. - if done.contains(&d.key) { - anyhow::bail!("duplicate entry for {:?}", d.key); - } - done.insert(d.key.clone()); - - // Disallow votes from different genesis. It might indicate a reorg, - // in which case either this node or the remote peer has to be restarted. - anyhow::ensure!( - d.msg.genesis == *genesis, - "vote for batch with different genesis hash: {:?}", - d.msg.genesis - ); - - if d.msg.number < self.min_batch_number { - continue; - } - - let Some(weight) = attesters.weight(&d.key) else { - // We just skip the entries we are not interested in. - // For now the set of attesters is static, so we could treat this as an error, - // however we eventually want the attester set to be dynamic. - continue; - }; - - // If we already have a newer vote for this key, we can ignore this one. - if let Some(x) = self.votes.get(&d.key) { - if d.msg <= x.msg { - continue; - } - } - - // Check the signature before insertion. - if let Err(e) = d.verify() { - tracing::error!(error =? e, "failed to verify batch vote: {e:#}"); - } else { - self.add(d.clone(), weight); - stats.added(d.msg.number, weight); - } - } - - Ok(stats) - } - - /// Check if we have achieved quorum for any of the batch hashes. - /// - /// Returns the first quorum it finds, after which we expect that the state of the main node or L1 - /// will indicate that attestation on the next height can happen, which will either naturally move - /// the QC, or we can do so by increasing the `min_batch_number`. - /// - /// While we only store 1 vote per attester we'll only ever have at most one quorum anyway. - pub(super) fn find_quorum( - &self, - attesters: &attester::Committee, - genesis: &attester::GenesisHash, - ) -> Option { - let threshold = attesters.threshold(); - self.support - .iter() - .flat_map(|(number, candidates)| { - candidates - .iter() - .filter(|(_, weight)| **weight >= threshold) - .map(|(hash, _)| { - let sigs = self - .votes - .values() - .filter(|vote| vote.msg.hash == *hash) - .map(|vote| (vote.key.clone(), vote.sig.clone())) - .fold(attester::MultiSig::default(), |mut sigs, (key, sig)| { - sigs.add(key, sig); - sigs - }); - attester::BatchQC { - message: attester::Batch { - number: *number, - hash: *hash, - // This was checked during insertion; we could look up the first in `votes` - genesis: *genesis, - }, - signatures: sigs, - } - }) - }) - .next() - } - - /// Set the minimum batch number for which we admit votes. - /// - /// Discards data about earlier heights. - pub(super) fn set_min_batch_number(&mut self, min_batch_number: attester::BatchNumber) { - self.min_batch_number = min_batch_number; - self.votes.retain(|_, v| v.msg.number >= min_batch_number); - if let Some(prev) = min_batch_number.prev() { - self.support = self.support.split(&prev).1; - } - } - - /// Add an already validated vote from an attester into the register. 
- fn add(&mut self, vote: Arc>, weight: attester::Weight) { - self.remove(&vote.key, weight); - - let batch = self.support.entry(vote.msg.number).or_default(); - let support = batch.entry(vote.msg.hash).or_default(); - *support = support.saturating_add(weight); - - self.votes.insert(vote.key.clone(), vote); - } - - /// Remove any existing vote. - /// - /// This is for DoS protection, until we have better control over the acceptable vote range. - fn remove(&mut self, key: &attester::PublicKey, weight: attester::Weight) { - let Some(vote) = self.votes.remove(key) else { - return; - }; - - let batch = self.support.entry(vote.msg.number).or_default(); - let support = batch.entry(vote.msg.hash).or_default(); - *support = support.saturating_sub(weight); - - if *support == 0u64 { - batch.remove(&vote.msg.hash); - } - - if batch.is_empty() { - self.support.remove(&vote.msg.number); - } - } -} - -/// Watch wrapper of BatchVotes, -/// which supports subscribing to BatchVotes updates. -pub(crate) struct BatchVotesWatch(Watch); - -impl Default for BatchVotesWatch { - fn default() -> Self { - Self(Watch::new(BatchVotes::default())) - } -} - -impl BatchVotesWatch { - /// Subscribes to BatchVotes updates. - pub(crate) fn subscribe(&self) -> sync::watch::Receiver { - self.0.subscribe() - } - - /// Inserts data to BatchVotes. - /// Subscribers are notified iff at least 1 new entry has - /// been inserted. Returns an error iff an invalid - /// entry in `data` has been found. The provider of the - /// invalid entry should be banned. - pub(crate) async fn update( - &self, - attesters: &attester::Committee, - genesis: &attester::GenesisHash, - data: &[Arc>], - ) -> anyhow::Result<()> { - let this = self.0.lock().await; - let mut votes = this.borrow().clone(); - let stats = votes.update(attesters, genesis, data)?; - - if let Some(last_added) = stats.last_added { - this.send_replace(votes); - - #[allow(clippy::float_arithmetic)] - let weight_added = stats.weight_added as f64 / attesters.total_weight() as f64; - - metrics::BATCH_VOTES_METRICS - .last_added_vote_batch_number - .set(last_added.0); - - metrics::BATCH_VOTES_METRICS - .votes_added - .inc_by(stats.num_added as u64); - - metrics::BATCH_VOTES_METRICS - .weight_added - .inc_by(weight_added); - } - - metrics::BATCH_VOTES_METRICS - .committee_size - .set(attesters.len()); - - Ok(()) - } - - /// Set the minimum batch number on the votes and discard old data. - #[tracing::instrument(skip_all, fields(%min_batch_number))] - pub(crate) async fn set_min_batch_number(&self, min_batch_number: attester::BatchNumber) { - let this = self.0.lock().await; - this.send_modify(|votes| votes.set_min_batch_number(min_batch_number)); - - metrics::BATCH_VOTES_METRICS - .min_batch_number - .set(min_batch_number.0); - } -} - -/// Wrapper around [BatchVotesWatch] to publish votes over batches signed by an attester key. -pub struct BatchVotesPublisher(pub(crate) Arc); - -impl fmt::Debug for BatchVotesPublisher { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - fmt.debug_struct("BatchVotesPublisher") - .finish_non_exhaustive() - } -} - -impl BatchVotesPublisher { - /// Sign an L1 batch and push it into the batch, which should cause it to be gossiped by the network. 
- #[tracing::instrument(skip_all, fields(l1_batch = %batch.number))] - pub async fn publish( - &self, - attesters: &attester::Committee, - genesis: &attester::GenesisHash, - attester: &attester::SecretKey, - batch: attester::Batch, - ) -> anyhow::Result<()> { - if !attesters.contains(&attester.public()) { - return Ok(()); - } - let attestation = attester.sign_msg(batch); - - metrics::BATCH_VOTES_METRICS - .last_signed_batch_number - .set(attestation.msg.number.0); - - self.0 - .update(attesters, genesis, &[Arc::new(attestation)]) - .await - } -} diff --git a/node/actors/network/src/gossip/metrics.rs b/node/actors/network/src/gossip/metrics.rs deleted file mode 100644 index d9df9d0a..00000000 --- a/node/actors/network/src/gossip/metrics.rs +++ /dev/null @@ -1,37 +0,0 @@ -/// Metrics related to the gossiping of L1 batch votes. -#[derive(Debug, vise::Metrics)] -#[metrics(prefix = "network_gossip_batch_votes")] -pub(crate) struct BatchVotesMetrics { - /// Number of members in the attester committee. - pub(crate) committee_size: vise::Gauge, - - /// Number of votes added to the tally. - /// - /// Its rate of change should correlate with the attester committee size, - /// save for any new joiner casting their historic votes in a burst. - pub(crate) votes_added: vise::Counter, - - /// Weight of votes added to the tally normalized by the total committee weight. - /// - /// Its rate of change should correlate with the attester committee weight and batch production rate, - /// that is, it should go up up by 1.0 with each new batch if everyone attests. - pub(crate) weight_added: vise::Counter, - - /// The minimum batch number we still expect votes for. - /// - /// This should go up as the main node indicates the finalisation of batches, - /// or as soon as batch QCs are found and persisted. - pub(crate) min_batch_number: vise::Gauge, - - /// Batch number in the last vote added to the register. - /// - /// This should go up as L1 batches are created, save for any temporary - /// outlier from lagging attesters or ones sending votes far in the future. - pub(crate) last_added_vote_batch_number: vise::Gauge, - - /// Batch number of the last batch signed by this attester. - pub(crate) last_signed_batch_number: vise::Gauge, -} - -#[vise::register] -pub(super) static BATCH_VOTES_METRICS: vise::Global = vise::Global::new(); diff --git a/node/actors/network/src/gossip/mod.rs b/node/actors/network/src/gossip/mod.rs index 7d22062d..52512425 100644 --- a/node/actors/network/src/gossip/mod.rs +++ b/node/actors/network/src/gossip/mod.rs @@ -13,26 +13,19 @@ //! Static connections constitute a rigid "backbone" of the gossip network, which is insensitive to //! eclipse attack. Dynamic connections are supposed to improve the properties of the gossip //! network graph (minimize its diameter, increase connectedness). 
-use self::batch_votes::BatchVotesWatch; -pub use self::{ - attestation_status::{AttestationStatus, AttestationStatusReceiver, AttestationStatusWatch}, - batch_votes::BatchVotesPublisher, -}; use crate::{gossip::ValidatorAddrsWatch, io, pool::PoolWatch, Config, MeteredStreamStats}; use fetch::RequestItem; use std::sync::{atomic::AtomicUsize, Arc}; use tracing::Instrument; pub(crate) use validator_addrs::*; -use zksync_concurrency::{ctx, ctx::channel, error::Wrap as _, scope, sync}; +use zksync_concurrency::{ctx, ctx::channel, scope, sync}; use zksync_consensus_roles::{node, validator}; use zksync_consensus_storage::{BatchStore, BlockStore}; -mod attestation_status; -mod batch_votes; +pub mod attestation; mod fetch; mod handshake; pub mod loadtest; -mod metrics; mod runner; #[cfg(test)] mod testonly; @@ -50,8 +43,6 @@ pub(crate) struct Network { pub(crate) outbound: PoolWatch>, /// Current state of knowledge about validators' endpoints. pub(crate) validator_addrs: ValidatorAddrsWatch, - /// Current state of knowledge about batch votes. - pub(crate) batch_votes: Arc, /// Block store to serve `get_block` requests from. pub(crate) block_store: Arc, /// Batch store to serve `get_batch` requests from. @@ -64,8 +55,10 @@ pub(crate) struct Network { pub(crate) fetch_queue: fetch::Queue, /// TESTONLY: how many time push_validator_addrs rpc was called by the peers. pub(crate) push_validator_addrs_calls: AtomicUsize, - /// Shared watch over the current attestation status as indicated by the main node. - pub(crate) attestation_status: Arc, + /// Attestation controller, maintaining a set of batch votes. + /// Gossip network exchanges the votes with peers. + /// The batch for which the votes are collected is configured externally. + pub(crate) attestation: Arc, } impl Network { @@ -75,7 +68,7 @@ impl Network { block_store: Arc, batch_store: Arc, sender: channel::UnboundedSender, - attestation_status: Arc, + attestation: Arc, ) -> Arc { Arc::new(Self { sender, @@ -85,13 +78,12 @@ impl Network { ), outbound: PoolWatch::new(cfg.gossip.static_outbound.keys().cloned().collect(), 0), validator_addrs: ValidatorAddrsWatch::default(), - batch_votes: Arc::new(BatchVotesWatch::default()), cfg, fetch_queue: fetch::Queue::default(), block_store, batch_store, push_validator_addrs_calls: 0.into(), - attestation_status, + attestation, }) } @@ -173,57 +165,4 @@ impl Network { }) .await; } - - /// Task that reacts to new votes being added and looks for an L1 batch QC. - /// It persists the certificate once the quorum threshold is passed. - pub(crate) async fn run_batch_qc_finder(&self, ctx: &ctx::Ctx) -> ctx::Result<()> { - let Some(attesters) = self.genesis().attesters.as_ref() else { - tracing::info!("no attesters in genesis, not looking for batch QCs"); - return Ok(()); - }; - let genesis = self.genesis().hash(); - - let mut recv_votes = self.batch_votes.subscribe(); - let mut recv_status = self.attestation_status.subscribe(); - - // Subscribe starts as seen but we don't want to miss the first item. - recv_status.mark_changed(); - - loop { - async { - // Wait until the status indicates that we're ready to sign the next batch. - let Some(batch_number) = sync::changed(ctx, &mut recv_status) - .instrument(tracing::info_span!("wait_for_attestation_status")) - .await? - .next_batch_to_attest - else { - return Ok(()); - }; - - // Get rid of all previous votes. We don't expect this to go backwards without regenesis, which will involve a restart. 
- self.batch_votes.set_min_batch_number(batch_number).await; - - // Now wait until we find the next quorum, whatever it is: - // * on the main node, if attesters are honest, they will vote on the next batch number and the main node will not see gaps - // * on external nodes the votes might be affected by changes in the value returned by the API, and there might be gaps - // What is important, though, is that the batch number does not move backwards while we look for a quorum, because attesters - // (re)casting earlier votes will go ignored by those fixed on a higher min_batch_number, and gossip will only be attempted once. - // The possibility of this will be fixed by deterministally picking a start batch number based on fork indicated by genesis. - let qc = sync::wait_for_some(ctx, &mut recv_votes, |votes| { - votes.find_quorum(attesters, &genesis) - }) - .instrument(tracing::info_span!("wait_for_quorum")) - .await?; - - self.batch_store - .persist_batch_qc(ctx, qc) - .await - .wrap("persist_batch_qc")?; - - ctx::Ok(()) - } - .instrument(tracing::info_span!("new_votes_iter")) - .await?; - } - } } diff --git a/node/actors/network/src/gossip/runner.rs b/node/actors/network/src/gossip/runner.rs index effd0466..23ccf009 100644 --- a/node/actors/network/src/gossip/runner.rs +++ b/node/actors/network/src/gossip/runner.rs @@ -1,4 +1,4 @@ -use super::{batch_votes::BatchVotes, handshake, Network, ValidatorAddrs}; +use super::{handshake, Network, ValidatorAddrs}; use crate::{noise, preface, rpc}; use anyhow::Context as _; use async_trait::async_trait; @@ -13,7 +13,6 @@ use zksync_protobuf::kB; struct PushServer<'a> { blocks: sync::watch::Sender, batches: sync::watch::Sender, - /// The network is required for the verification of messages. net: &'a Network, } @@ -58,21 +57,34 @@ impl rpc::Handler for &PushServer<'_> { #[async_trait::async_trait] impl rpc::Handler for &PushServer<'_> { - /// Here we bound the buffering of incoming batch messages. fn max_req_size(&self) -> usize { 100 * kB } - async fn handle(&self, _ctx: &ctx::Ctx, req: rpc::push_batch_votes::Req) -> anyhow::Result<()> { - self.net - .batch_votes - .update( - self.net.genesis().attesters.as_ref().context("attesters")?, - &self.net.genesis().hash(), - &req.0, - ) - .await?; - Ok(()) + async fn handle( + &self, + _ctx: &ctx::Ctx, + req: rpc::push_batch_votes::Req, + ) -> anyhow::Result { + if let Err(err) = self + .net + .attestation + .insert_votes(req.votes.into_iter()) + .await + .context("insert_votes()") + { + // Attestation feature is still evolving, so for forward + // compatibility we just ignore any invalid data. + // Once stabilized we will drop the connection instead of + // logging the error. 
+            tracing::warn!("{err:#}");
+        }
+        Ok(rpc::push_batch_votes::Resp {
+            votes: match req.want_votes_for.as_ref() {
+                Some(batch) => self.net.attestation.votes(batch),
+                None => vec![],
+            },
+        })
     }
 }
@@ -156,6 +168,8 @@ impl Network {
             rpc::Client::<rpc::get_block::Rpc>::new(ctx, self.cfg.rpc.get_block_rate);
         let get_batch_client =
             rpc::Client::<rpc::get_batch::Rpc>::new(ctx, self.cfg.rpc.get_batch_rate);
+        let push_batch_votes_client =
+            rpc::Client::<rpc::push_batch_votes::Rpc>::new(ctx, self.cfg.rpc.push_batch_votes_rate);
         scope::run!(ctx, |ctx, s| async {
             let mut service = rpc::Service::new()
@@ -181,42 +195,51 @@ impl Network {
                 .add_server(ctx, &*self.block_store, self.cfg.rpc.get_block_rate)
                 .add_client(&get_batch_client)
                 .add_server(ctx, &*self.batch_store, self.cfg.rpc.get_batch_rate)
-                .add_server(ctx, rpc::ping::Server, rpc::ping::RATE);
-
-            // If there is an attester committee then
-            if self.genesis().attesters.as_ref().is_some() {
-                let push_batch_votes_client = rpc::Client::<rpc::push_batch_votes::Rpc>::new(
+                .add_server(ctx, rpc::ping::Server, rpc::ping::RATE)
+                .add_client(&push_batch_votes_client)
+                .add_server::<rpc::push_batch_votes::Rpc>(
                     ctx,
+                    &push_server,
                     self.cfg.rpc.push_batch_votes_rate,
                 );
-                service = service
-                    .add_client(&push_batch_votes_client)
-                    .add_server::<rpc::push_batch_votes::Rpc>(
-                        ctx,
-                        &push_server,
-                        self.cfg.rpc.push_batch_votes_rate,
-                    );
-                // Push L1 batch votes updates to peer.
-                s.spawn::<()>(async {
-                    let push_batch_votes_client = push_batch_votes_client;
-                    // Snapshot of the batches when we last pushed to the peer.
-                    let mut old = BatchVotes::default();
-                    // Subscribe to what we know about the state of the whole network.
-                    let mut sub = self.batch_votes.subscribe();
-                    sub.mark_changed();
-                    loop {
-                        let new = sync::changed(ctx, &mut sub).await?.clone();
-                        // Get the *new* votes, which haven't been pushed before.
-                        let diff = new.get_newer(&old);
-                        if diff.is_empty() {
-                            continue;
+
+            // Push L1 batch votes updates to peer.
+            s.spawn::<()>(async {
+                let push_batch_votes_client = push_batch_votes_client;
+                // Subscribe to what we know about the state of the whole network.
+                let mut recv = self.attestation.subscribe();
+                loop {
+                    let diff = recv.wait_for_diff(ctx).await?;
+                    let req = rpc::push_batch_votes::Req {
+                        // If the info has changed, we need to re-request all the votes
+                        // from the peer that we might have ignored earlier.
+                        want_votes_for: diff.info.as_ref().map(|c| c.batch_to_attest.clone()),
+                        votes: diff.votes,
+                    };
+                    // NOTE: The response should be non-empty only if we requested a snapshot.
+                    // Therefore, if needed, we could restrict the response size to ~1kB in
+                    // such a case.
+                    let resp = push_batch_votes_client.call(ctx, &req, 100 * kB).await?;
+                    if !resp.votes.is_empty() {
+                        anyhow::ensure!(
+                            req.want_votes_for.is_some(),
+                            "expected empty response, but votes were returned"
+                        );
+                        if let Err(err) = self
+                            .attestation
+                            .insert_votes(resp.votes.into_iter())
+                            .await
+                            .context("insert_votes")
+                        {
+                            // Attestation feature is still evolving, so for forward
+                            // compatibility we just ignore any invalid data.
+                            // Once stabilized we will drop the connection instead of
+                            // logging the error.
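Both the server handler above and the client loop that continues below funnel incoming votes through `Controller::insert_votes` and deliberately tolerate bad input. A condensed sketch of that ingestion path, assuming `insert_votes` takes an iterator of signed votes as in the calls above (`ingest_votes` is hypothetical):

```rust
// Hypothetical helper mirroring the forward-compatible handling above:
// invalid or stale votes are logged and dropped rather than failing the
// connection. `Controller::insert_votes` is the API from this patch.
use std::sync::Arc;
use zksync_consensus_network::gossip::attestation;
use zksync_consensus_roles::attester;

async fn ingest_votes(
    ctrl: &attestation::Controller,
    votes: Vec<Arc<attester::Signed<attester::Batch>>>,
) {
    if let Err(err) = ctrl.insert_votes(votes.into_iter()).await {
        // Attestation is still evolving; once stabilized this should become
        // a reason to drop the peer instead of just logging.
        tracing::warn!("insert_votes(): {err:#}");
    }
}
```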
+ tracing::warn!("{err:#}"); } - old = new; - let req = rpc::push_batch_votes::Req(diff); - push_batch_votes_client.call(ctx, &req, kB).await?; } - }); - } + } + }); if let Some(ping_timeout) = &self.cfg.ping_timeout { let ping_client = rpc::Client::::new(ctx, rpc::ping::RATE); diff --git a/node/actors/network/src/gossip/testonly.rs b/node/actors/network/src/gossip/testonly.rs index d37419c0..114a838e 100644 --- a/node/actors/network/src/gossip/testonly.rs +++ b/node/actors/network/src/gossip/testonly.rs @@ -28,7 +28,7 @@ impl ConnRunner { fn mux_entry(ctx: &ctx::Ctx) -> (mux::CapabilityId, Arc) { ( - R::CAPABILITY_ID, + R::CAPABILITY.id(), mux::StreamQueue::new(ctx, R::INFLIGHT, limiter::Rate::INF), ) } @@ -89,7 +89,7 @@ impl Conn { ) -> ctx::OrCanceled> { Ok(ServerStream( self.connect - .get(&R::CAPABILITY_ID) + .get(&R::CAPABILITY.id()) .unwrap() .open(ctx) .await?, @@ -104,7 +104,7 @@ impl Conn { ) -> ctx::OrCanceled> { Ok(ClientStream( self.accept - .get(&R::CAPABILITY_ID) + .get(&R::CAPABILITY.id()) .unwrap() .open(ctx) .await?, diff --git a/node/actors/network/src/gossip/tests/fetch_batches.rs b/node/actors/network/src/gossip/tests/fetch_batches.rs index 12b8e840..eb169278 100644 --- a/node/actors/network/src/gossip/tests/fetch_batches.rs +++ b/node/actors/network/src/gossip/tests/fetch_batches.rs @@ -17,7 +17,6 @@ async fn test_simple() { cfg.rpc.push_batch_store_state_rate = limiter::Rate::INF; cfg.rpc.get_batch_rate = limiter::Rate::INF; cfg.rpc.get_batch_timeout = None; - cfg.attester_key = None; cfg.validator_key = None; scope::run!(ctx, |ctx, s| async { @@ -127,7 +126,6 @@ async fn test_concurrent_requests() { cfg.rpc.push_batch_store_state_rate = limiter::Rate::INF; cfg.rpc.get_batch_rate = limiter::Rate::INF; cfg.rpc.get_batch_timeout = None; - cfg.attester_key = None; cfg.validator_key = None; scope::run!(ctx, |ctx, s| async { @@ -204,7 +202,6 @@ async fn test_bad_responses() { cfg.rpc.get_batch_rate = limiter::Rate::INF; cfg.rpc.get_batch_timeout = None; cfg.validator_key = None; - cfg.attester_key = None; scope::run!(ctx, |ctx, s| async { let store = TestMemoryStorage::new(ctx, &setup.genesis).await; @@ -281,7 +278,6 @@ async fn test_retry() { cfg.rpc.get_batch_rate = limiter::Rate::INF; cfg.rpc.get_batch_timeout = None; cfg.validator_key = None; - cfg.attester_key = None; scope::run!(ctx, |ctx, s| async { let store = TestMemoryStorage::new(ctx, &setup.genesis).await; diff --git a/node/actors/network/src/gossip/tests/mod.rs b/node/actors/network/src/gossip/tests/mod.rs index 942a7ec3..7ab311bb 100644 --- a/node/actors/network/src/gossip/tests/mod.rs +++ b/node/actors/network/src/gossip/tests/mod.rs @@ -1,10 +1,6 @@ use super::ValidatorAddrs; use crate::{ - gossip::{ - batch_votes::{BatchVotes, BatchVotesWatch}, - handshake, - validator_addrs::ValidatorAddrsWatch, - }, + gossip::{attestation, handshake, validator_addrs::ValidatorAddrsWatch}, metrics, preface, rpc, testonly, }; use anyhow::Context as _; @@ -19,12 +15,12 @@ use tracing::Instrument as _; use zksync_concurrency::{ ctx, error::Wrap as _, - net, scope, sync, + limiter, net, scope, sync, testonly::{abort_on_panic, set_timeout}, time, }; use zksync_consensus_roles::{attester, validator}; -use zksync_consensus_storage::{testonly::TestMemoryStorage, PersistentBatchStore}; +use zksync_consensus_storage::testonly::TestMemoryStorage; mod fetch_batches; mod fetch_blocks; @@ -96,9 +92,6 @@ fn mk_version(rng: &mut R) -> u64 { #[derive(Default)] struct View(im::HashMap>>); -#[derive(Default)] -struct 
Signatures(im::HashMap>>); - fn mk_netaddr( key: &validator::SecretKey, addr: std::net::SocketAddr, @@ -112,19 +105,6 @@ fn mk_netaddr( }) } -fn mk_batch( - rng: &mut R, - key: &attester::SecretKey, - number: attester::BatchNumber, - genesis: attester::GenesisHash, -) -> attester::Signed { - key.sign_msg(attester::Batch { - number, - hash: rng.gen(), - genesis, - }) -} - fn random_netaddr( rng: &mut R, key: &validator::SecretKey, @@ -137,19 +117,6 @@ fn random_netaddr( )) } -fn random_batch_vote( - rng: &mut R, - key: &attester::SecretKey, - genesis: attester::GenesisHash, -) -> Arc> { - let batch = attester::Batch { - number: attester::BatchNumber(rng.gen_range(0..1000)), - hash: rng.gen(), - genesis, - }; - Arc::new(key.sign_msg(batch.to_owned())) -} - fn update_netaddr( rng: &mut R, addr: &validator::NetAddress, @@ -165,20 +132,6 @@ fn update_netaddr( )) } -fn update_signature( - rng: &mut R, - batch: &attester::Batch, - key: &attester::SecretKey, - batch_number_diff: i64, -) -> Arc> { - let batch = attester::Batch { - number: attester::BatchNumber((batch.number.0 as i64 + batch_number_diff) as u64), - hash: rng.gen(), - genesis: batch.genesis, - }; - Arc::new(key.sign_msg(batch.to_owned())) -} - impl View { fn insert(&mut self, entry: Arc>) { self.0.insert(entry.key.clone(), entry); @@ -193,20 +146,6 @@ impl View { } } -impl Signatures { - fn insert(&mut self, entry: Arc>) { - self.0.insert(entry.key.clone(), entry); - } - - fn get(&mut self, key: &attester::SecretKey) -> Arc> { - self.0.get(&key.public()).unwrap().clone() - } - - fn as_vec(&self) -> Vec>> { - self.0.values().cloned().collect() - } -} - #[tokio::test] async fn test_validator_addrs() { abort_on_panic(); @@ -545,230 +484,105 @@ async fn rate_limiting() { } } -#[tokio::test] -async fn test_batch_votes() { - abort_on_panic(); - let rng = &mut ctx::test_root(&ctx::RealClock).rng(); - - let keys: Vec = (0..8).map(|_| rng.gen()).collect(); - let attesters = attester::Committee::new(keys.iter().map(|k| attester::WeightedAttester { - key: k.public(), - weight: 1250, - })) - .unwrap(); - let votes = BatchVotesWatch::default(); - let mut sub = votes.subscribe(); - - let genesis = rng.gen::(); - - // Initial values. - let mut want = Signatures::default(); - for k in &keys[0..6] { - want.insert(random_batch_vote(rng, k, genesis)); - } - votes - .update(&attesters, &genesis, &want.as_vec()) - .await - .unwrap(); - assert_eq!(want.0, sub.borrow_and_update().votes); - - // newer batch number, should be updated - let k0v2 = update_signature(rng, &want.get(&keys[0]).msg, &keys[0], 1); - // same batch number, should be ignored - let k1v2 = update_signature(rng, &want.get(&keys[1]).msg, &keys[1], 0); - // older batch number, should be ignored - let k4v2 = update_signature(rng, &want.get(&keys[4]).msg, &keys[4], -1); - // first entry for a key in the config, should be inserted - let k6v1 = random_batch_vote(rng, &keys[6], genesis); - // entry for a key outside of the config, should be ignored - let k8 = rng.gen(); - let k8v1 = random_batch_vote(rng, &k8, genesis); - - // Update the ones we expect to succeed - want.insert(k0v2.clone()); - want.insert(k6v1.clone()); - - // Send all of them to the votes - let update = [ - k0v2, - k1v2, - k4v2, - // no new entry for keys[5] - k6v1, - // no entry at all for keys[7] - k8v1.clone(), - ]; - votes.update(&attesters, &genesis, &update).await.unwrap(); - assert_eq!(want.0, sub.borrow_and_update().votes); - - // Invalid signature, should be ignored. 
- let mut k0v3 = mk_batch( - rng, - &keys[1], - attester::BatchNumber(want.get(&keys[0]).msg.number.0 + 1), - genesis, - ); - k0v3.key = keys[0].public(); - assert!(votes - .update(&attesters, &genesis, &[Arc::new(k0v3)]) - .await - .is_ok()); - assert_eq!(want.0, sub.borrow_and_update().votes); - - // Invalid genesis, should be rejected. - let other_genesis = rng.gen(); - let k1v3 = mk_batch( - rng, - &keys[1], - attester::BatchNumber(want.get(&keys[1]).msg.number.0 + 1), - other_genesis, - ); - assert!(votes - .update(&attesters, &genesis, &[Arc::new(k1v3)]) - .await - .is_err()); - - assert_eq!(want.0, sub.borrow_and_update().votes); - - // Duplicate entry in the update, should be rejected. - assert!(votes - .update(&attesters, &genesis, &[k8v1.clone(), k8v1]) - .await - .is_err()); - assert_eq!(want.0, sub.borrow_and_update().votes); -} - -#[test] -fn test_batch_votes_quorum() { - abort_on_panic(); - let rng = &mut ctx::test_root(&ctx::RealClock).rng(); - - for _ in 0..10 { - let committee_size = rng.gen_range(1..20); - let keys: Vec = (0..committee_size).map(|_| rng.gen()).collect(); - let attesters = attester::Committee::new(keys.iter().map(|k| attester::WeightedAttester { - key: k.public(), - weight: rng.gen_range(1..=100), - })) - .unwrap(); - - let batch0 = rng.gen::(); - let batch1 = attester::Batch { - number: batch0.number.next(), - hash: rng.gen(), - genesis: batch0.genesis, - }; - let genesis = batch0.genesis; - let mut batches = [(batch0, 0u64), (batch1, 0u64)]; - - let mut votes = BatchVotes::default(); - for sk in &keys { - // We need 4/5+1 for quorum, so let's say ~80% vote on the second batch. - let b = usize::from(rng.gen_range(0..100) < 80); - let batch = &batches[b].0; - let vote = sk.sign_msg(batch.clone()); - votes - .update(&attesters, &genesis, &[Arc::new(vote)]) - .unwrap(); - batches[b].1 += attesters.weight(&sk.public()).unwrap(); - - // Check that as soon as we have quorum it's found. - if batches[b].1 >= attesters.threshold() { - let qc = votes - .find_quorum(&attesters, &genesis) - .expect("should find quorum"); - assert!(qc.message == *batch); - assert!(qc.signatures.keys().count() > 0); - } - } - - // Check that if there was no quoroum then we don't find any. - if !batches.iter().any(|b| b.1 >= attesters.threshold()) { - assert!(votes.find_quorum(&attesters, &genesis).is_none()); - } - - // Check that the minimum batch number prunes data. - let last_batch = batches[1].0.number; - - votes.set_min_batch_number(last_batch); - assert!(votes.votes.values().all(|v| v.msg.number >= last_batch)); - assert!(votes.support.keys().all(|n| *n >= last_batch)); - - votes.set_min_batch_number(last_batch.next()); - assert!(votes.votes.is_empty()); - assert!(votes.support.is_empty()); - } -} - -// #[tokio::test(flavor = "multi_thread")] async fn test_batch_votes_propagation() { - let _guard = set_timeout(time::Duration::seconds(10)); abort_on_panic(); + let _guard = set_timeout(time::Duration::seconds(10)); let ctx = &ctx::test_root(&ctx::AffineClock::new(10.)); let rng = &mut ctx.rng(); - let mut setup = validator::testonly::Setup::new(rng, 10); + let setup = validator::testonly::Setup::new(rng, 10); + let cfgs = testonly::new_configs(rng, &setup, 2); - let cfgs = testonly::new_configs(rng, &setup, 1); + // Fixed attestation schedule. 
+ let first: attester::BatchNumber = rng.gen(); + let schedule: Vec<_> = (0..10) + .map(|r| { + Arc::new(attestation::Info { + batch_to_attest: attester::Batch { + genesis: setup.genesis.hash(), + number: first + r, + hash: rng.gen(), + }, + committee: { + // We select a random subset here. It would be incorrect to choose an empty subset, but + // the chances of that are negligible. + let subset: Vec<_> = setup.attester_keys.iter().filter(|_| rng.gen()).collect(); + attester::Committee::new(subset.iter().map(|k| attester::WeightedAttester { + key: k.public(), + weight: rng.gen_range(5..10), + })) + .unwrap() + .into() + }, + }) + }) + .collect(); + + // Round of the schedule that nodes should collect the votes for. + let round = sync::watch::channel(0).0; scope::run!(ctx, |ctx, s| async { - // All nodes share the same in-memory store + // All nodes share the same store - store is not used in this test anyway. let store = TestMemoryStorage::new(ctx, &setup.genesis).await; s.spawn_bg(store.runner.run(ctx)); - // Push the first batch into the store so we have something to vote on. - setup.push_blocks(rng, 2); - setup.push_batch(rng); - - let batch = setup.batches[0].clone(); - - store - .batches - .queue_batch(ctx, batch.clone(), setup.genesis.clone()) - .await - .unwrap(); - - let batch = attester::Batch { - number: batch.number, - hash: rng.gen(), - genesis: setup.genesis.hash(), - }; - // Start all nodes. - let nodes: Vec = cfgs - .iter() - .enumerate() - .map(|(i, cfg)| { - let (node, runner) = testonly::Instance::new( - cfg.clone(), - store.blocks.clone(), - store.batches.clone(), - ); - s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node", i))); - node - }) - .collect(); - - // Cast votes on each node. It doesn't matter who signs where in this test, - // we just happen to know that we'll have as many nodes as attesters. - let attesters = setup.genesis.attesters.as_ref().unwrap(); - for (node, key) in nodes.iter().zip(setup.attester_keys.iter()) { - node.net - .batch_vote_publisher() - .publish(attesters, &setup.genesis.hash(), key, batch.clone()) - .await - .unwrap(); - } - - // Wait until one of the nodes collects a quorum over gossip and stores. - loop { - if let Some(qc) = store.im_batches.get_batch_qc(ctx, batch.number).await? { - assert_eq!(qc.message, batch); - return Ok(()); - } - ctx.sleep(time::Duration::milliseconds(100)).await?; + for (i, mut cfg) in cfgs.into_iter().enumerate() { + cfg.rpc.push_batch_votes_rate = limiter::Rate::INF; + cfg.validator_key = None; + let (node, runner) = testonly::Instance::new_from_config(testonly::InstanceConfig { + cfg: cfg.clone(), + block_store: store.blocks.clone(), + batch_store: store.batches.clone(), + attestation: attestation::Controller::new(Some(setup.attester_keys[i].clone())) + .into(), + }); + s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node", i))); + // Task going through the schedule, waiting for ANY node to collect the certificate + // to advance to the next round of the schedule. + // Test succeeds if all tasks successfully iterate through the whole schedule. + s.spawn( + async { + let node = node; + let sub = &mut round.subscribe(); + sub.mark_changed(); + loop { + let r = ctx::NoCopy(*sync::changed(ctx, sub).await?); + tracing::info!("starting round {}", *r); + let Some(cfg) = schedule.get(*r) else { + return Ok(()); + }; + let attestation = node.net.gossip.attestation.clone(); + attestation.start_attestation(cfg.clone()).await.unwrap(); + // Wait for the certificate in the background. 
+ s.spawn_bg(async { + let r = r; + let attestation = attestation; + let Ok(Some(qc)) = attestation + .wait_for_cert(ctx, cfg.batch_to_attest.number) + .await + else { + return Ok(()); + }; + assert_eq!(qc.message, cfg.batch_to_attest); + qc.verify(setup.genesis.hash(), &cfg.committee).unwrap(); + tracing::info!("got cert for {}", *r); + round.send_if_modified(|round| { + if *round != *r { + return false; + } + *round = *r + 1; + true + }); + Ok(()) + }); + } + } + .instrument(tracing::info_span!("attester", i)), + ); } + Ok(()) }) .await .unwrap(); diff --git a/node/actors/network/src/gossip/tests/syncing.rs b/node/actors/network/src/gossip/tests/syncing.rs index b456350f..1bd51446 100644 --- a/node/actors/network/src/gossip/tests/syncing.rs +++ b/node/actors/network/src/gossip/tests/syncing.rs @@ -327,7 +327,6 @@ async fn coordinated_batch_syncing(node_count: usize, gossip_peers: usize) { cfg.rpc.get_batch_rate = limiter::Rate::INF; cfg.rpc.get_batch_timeout = None; cfg.validator_key = None; - cfg.attester_key = None; let store = TestMemoryStorage::new(ctx, &setup.genesis).await; s.spawn_bg(store.runner.run(ctx)); let (node, runner) = testonly::Instance::new(cfg, store.blocks, store.batches); @@ -384,7 +383,6 @@ async fn uncoordinated_batch_syncing( cfg.rpc.get_batch_rate = limiter::Rate::INF; cfg.rpc.get_batch_timeout = None; cfg.validator_key = None; - cfg.attester_key = None; let store = TestMemoryStorage::new(ctx, &setup.genesis).await; s.spawn_bg(store.runner.run(ctx)); let (node, runner) = testonly::Instance::new(cfg, store.blocks, store.batches); diff --git a/node/actors/network/src/lib.rs b/node/actors/network/src/lib.rs index 1d5f3cbd..1e7874d2 100644 --- a/node/actors/network/src/lib.rs +++ b/node/actors/network/src/lib.rs @@ -1,6 +1,6 @@ //! Network actor maintaining a pool of outbound and inbound connections to other nodes. use anyhow::Context as _; -use gossip::{AttestationStatusWatch, BatchVotesPublisher}; +use gossip::attestation; use std::sync::Arc; use tracing::Instrument as _; use zksync_concurrency::{ @@ -57,10 +57,9 @@ impl Network { block_store: Arc, batch_store: Arc, pipe: ActorPipe, - attestation_status: Arc, + attestation: Arc, ) -> (Arc, Runner) { - let gossip = - gossip::Network::new(cfg, block_store, batch_store, pipe.send, attestation_status); + let gossip = gossip::Network::new(cfg, block_store, batch_store, pipe.send, attestation); let consensus = consensus::Network::new(gossip.clone()); let net = Arc::new(Self { gossip, consensus }); ( @@ -77,11 +76,6 @@ impl Network { metrics::NetworkGauges::register(Arc::downgrade(self)); } - /// Create a batch vote publisher to push attestations to gossip. - pub fn batch_vote_publisher(&self) -> BatchVotesPublisher { - BatchVotesPublisher(self.gossip.batch_votes.clone()) - } - /// Handles a dispatcher message. async fn handle_message( &self, @@ -133,9 +127,6 @@ impl Runner { Ok(()) }); - // Update QC batches in the background. - s.spawn(self.net.gossip.run_batch_qc_finder(ctx)); - // Fetch missing batches in the background. 
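With `run_batch_qc_finder` no longer spawned by the network runner (see the removal just above), collecting and persisting certificates is entirely the embedder's job. A rough sketch of a replacement task, assuming the controller API from this patch; the `persist` callback and the `None` semantics noted in the comment are assumptions:

```rust
// Hypothetical background task standing in for the removed run_batch_qc_finder;
// `wait_for_cert` and `BatchNumber::next` appear in this patch, `persist` does not.
use std::sync::Arc;
use zksync_concurrency::ctx;
use zksync_consensus_network::gossip::attestation;
use zksync_consensus_roles::attester;

async fn persist_certs(
    ctx: &ctx::Ctx,
    ctrl: Arc<attestation::Controller>,
    mut next: attester::BatchNumber,
    persist: impl Fn(attester::BatchQC),
) -> ctx::Result<()> {
    loop {
        // `None` presumably means the controller moved on without
        // collecting a certificate for `next`.
        if let Some(qc) = ctrl.wait_for_cert(ctx, next).await? {
            persist(qc);
        }
        next = next.next();
    }
}
```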
s.spawn(async { self.net.gossip.run_batch_fetcher(ctx).await; @@ -175,8 +166,6 @@ impl Runner { } } - // TODO: check if we are active attester to get new L1 Batches, sign them and broadcast the signature - let accept_limiter = limiter::Limiter::new(ctx, self.net.gossip.cfg.tcp_accept_rate); loop { accept_limiter.acquire(ctx, 1).await?; diff --git a/node/actors/network/src/proto/gossip.proto b/node/actors/network/src/proto/gossip.proto index 33e3d16a..8f8c1327 100644 --- a/node/actors/network/src/proto/gossip.proto +++ b/node/actors/network/src/proto/gossip.proto @@ -19,10 +19,18 @@ message PushValidatorAddrs { } message PushBatchVotes { + // Requesting the peer to respond with votes for the batch. + optional roles.attester.Batch want_votes_for = 2; // optional // Signed roles.validator.Msg.votes repeated roles.attester.Signed votes = 1; } +message PushBatchVotesResp { + // Signed roles.validator.Msg.votes + // Empty if want_votes_for in request was not set. + repeated roles.attester.Signed votes = 1; +} + // State of the local block store. // A node is expected to store a continuous range of blocks at all times // and actively fetch newest blocks. diff --git a/node/actors/network/src/proto/rpc.proto b/node/actors/network/src/proto/rpc.proto new file mode 100644 index 00000000..503dbe00 --- /dev/null +++ b/node/actors/network/src/proto/rpc.proto @@ -0,0 +1,15 @@ +syntax = "proto3"; + +package zksync.network.rpc; + +enum Capability { + reserved 5; + CONSENSUS = 0; + PING = 2; + PUSH_VALIDATOR_ADDRS = 1; + PUSH_BLOCK_STORE_STATE = 3; + GET_BLOCK = 4; + PUSH_BATCH_VOTES = 8; + PUSH_BATCH_STORE_STATE = 7; + GET_BATCH = 6; +} diff --git a/node/actors/network/src/rpc/consensus.rs b/node/actors/network/src/rpc/consensus.rs index 40082c2a..40cd6196 100644 --- a/node/actors/network/src/rpc/consensus.rs +++ b/node/actors/network/src/rpc/consensus.rs @@ -1,5 +1,6 @@ //! Defines RPC for passing consensus messages. -use crate::{mux, proto::consensus as proto}; +use super::Capability; +use crate::proto::consensus as proto; use zksync_consensus_roles::validator; use zksync_protobuf::{read_required, ProtoFmt}; @@ -7,7 +8,7 @@ use zksync_protobuf::{read_required, ProtoFmt}; pub(crate) struct Rpc; impl super::Rpc for Rpc { - const CAPABILITY_ID: mux::CapabilityId = 0; + const CAPABILITY: Capability = Capability::Consensus; const INFLIGHT: u32 = 3; const METHOD: &'static str = "consensus"; type Req = Req; diff --git a/node/actors/network/src/rpc/get_batch.rs b/node/actors/network/src/rpc/get_batch.rs index 8c83e451..18c53ca4 100644 --- a/node/actors/network/src/rpc/get_batch.rs +++ b/node/actors/network/src/rpc/get_batch.rs @@ -1,5 +1,6 @@ //! RPC for fetching a batch from peer. -use crate::{mux, proto::gossip as proto}; +use super::Capability; +use crate::proto::gossip as proto; use anyhow::Context; use zksync_consensus_roles::attester; use zksync_protobuf::{read_optional, ProtoFmt}; @@ -10,7 +11,7 @@ pub(crate) struct Rpc; // TODO: determine more precise `INFLIGHT` / `RATE` values as a result of load testing impl super::Rpc for Rpc { - const CAPABILITY_ID: mux::CapabilityId = 6; + const CAPABILITY: Capability = Capability::GetBatch; const INFLIGHT: u32 = 5; const METHOD: &'static str = "get_batch"; diff --git a/node/actors/network/src/rpc/get_block.rs b/node/actors/network/src/rpc/get_block.rs index 45b208d0..84474f7a 100644 --- a/node/actors/network/src/rpc/get_block.rs +++ b/node/actors/network/src/rpc/get_block.rs @@ -1,5 +1,6 @@ //! RPC for fetching a block from peer. 
-use crate::{mux, proto::gossip as proto};
+use super::Capability;
+use crate::proto::gossip as proto;
 use anyhow::Context;
 use zksync_consensus_roles::validator::{BlockNumber, FinalBlock};
 use zksync_protobuf::{read_optional, ProtoFmt};
@@ -10,7 +11,7 @@ pub(crate) struct Rpc;
 // TODO: determine more precise `INFLIGHT` / `RATE` values as a result of load testing
 impl super::Rpc for Rpc {
-    const CAPABILITY_ID: mux::CapabilityId = 4;
+    const CAPABILITY: Capability = Capability::GetBlock;
     const INFLIGHT: u32 = 5;
     const METHOD: &'static str = "get_block";
diff --git a/node/actors/network/src/rpc/mod.rs b/node/actors/network/src/rpc/mod.rs
index 8c4d6d7f..7d53b3a5 100644
--- a/node/actors/network/src/rpc/mod.rs
+++ b/node/actors/network/src/rpc/mod.rs
@@ -16,6 +16,7 @@
 //! at the same time (max 1 client + server per CapabilityId).
 use self::metrics::{CallLatencyType, CallType, RPC_METRICS};
+pub(crate) use crate::proto::rpc::Capability;
 use crate::{frame, mux};
 use anyhow::Context as _;
 use std::{collections::BTreeMap, sync::Arc};
@@ -35,6 +36,13 @@ pub(crate) mod testonly;
 #[cfg(test)]
 mod tests;
+impl Capability {
+    /// Converts capability to `mux::CapabilityId`.
+    pub(crate) fn id(self) -> mux::CapabilityId {
+        self as mux::CapabilityId
+    }
+}
+
 /// Multiplexer configuration for the RPC services.
 pub(crate) const MUX_CONFIG: mux::Config = mux::Config {
     read_buffer_size: 160 * zksync_protobuf::kB as u64,
@@ -47,16 +55,21 @@ pub(crate) const MUX_CONFIG: mux::Config = mux::Config {
 /// It is just a collection of associated types
 /// and constants.
 pub(crate) trait Rpc: Sync + Send + 'static {
-    /// CapabilityId used to identify the RPC.
-    /// Client and Server have to agree on CAPABILITY_ID
-    /// of all supported RPC, for the RPC to actually work.
+    /// Capability used to identify the RPC.
+    /// Client and Server both have to support the given
+    /// capability for the RPC to work.
     /// Each type implementing `Rpc` should use a unique
-    /// `CAPABILITY_ID`.
-    const CAPABILITY_ID: mux::CapabilityId;
+    /// capability. We use a protobuf enum as a
+    /// register of capabilities to enforce uniqueness
+    /// and compatibility.
+    const CAPABILITY: Capability;
     /// Maximal number of calls executed in parallel.
     /// Both client and server enforce this limit.
     const INFLIGHT: u32;
     /// Name of the RPC, used in prometheus metrics.
+    /// TODO: we can derive it automatically from the EnumValueDescriptor of the
+    /// CAPABILITY (or from the Debug implementation of Capability, but Debug formats
+    /// are not stable).
     const METHOD: &'static str;
     /// Type of the request message.
     type Req: zksync_protobuf::ProtoFmt + Send + Sync;
@@ -254,12 +267,12 @@ impl<'a> Service<'a> {
         if self
             .mux
             .accept
-            .insert(R::CAPABILITY_ID, client.queue.clone())
+            .insert(R::CAPABILITY.id(), client.queue.clone())
             .is_some()
         {
             panic!(
-                "client for capability {} already registered",
-                R::CAPABILITY_ID
+                "client for capability {:?} already registered",
+                R::CAPABILITY
             );
         }
         self
@@ -276,12 +289,12 @@ impl<'a> Service<'a> {
         if self
             .mux
             .connect
-            .insert(R::CAPABILITY_ID, queue.clone())
+            .insert(R::CAPABILITY.id(), queue.clone())
             .is_some()
         {
             panic!(
-                "server for capability {} already registered",
-                R::CAPABILITY_ID
+                "server for capability {:?} already registered",
+                R::CAPABILITY
             );
         }
         self.servers.push(Box::new(Server {
diff --git a/node/actors/network/src/rpc/ping.rs b/node/actors/network/src/rpc/ping.rs
index a11e07ed..1e78ba8c 100644
--- a/node/actors/network/src/rpc/ping.rs
+++ b/node/actors/network/src/rpc/ping.rs
@@ -1,5 +1,6 @@
 //!
Defines an RPC for sending ping messages. -use crate::{mux, proto::ping as proto}; +use super::Capability; +use crate::proto::ping as proto; use anyhow::Context as _; use rand::Rng; use zksync_concurrency::{ctx, limiter, time}; @@ -9,7 +10,7 @@ use zksync_protobuf::{kB, required, ProtoFmt}; pub(crate) struct Rpc; impl super::Rpc for Rpc { - const CAPABILITY_ID: mux::CapabilityId = 2; + const CAPABILITY: Capability = Capability::Ping; const INFLIGHT: u32 = 1; const METHOD: &'static str = "ping"; type Req = Req; diff --git a/node/actors/network/src/rpc/push_batch_store_state.rs b/node/actors/network/src/rpc/push_batch_store_state.rs index aa098a5a..4c561abe 100644 --- a/node/actors/network/src/rpc/push_batch_store_state.rs +++ b/node/actors/network/src/rpc/push_batch_store_state.rs @@ -1,5 +1,6 @@ //! RPC for fetching a batch from peer. -use crate::{mux, proto::gossip as proto}; +use super::Capability; +use crate::proto::gossip as proto; use anyhow::Context as _; use zksync_consensus_roles::attester; use zksync_consensus_storage::BatchStoreState; @@ -10,7 +11,7 @@ use zksync_protobuf::{required, ProtoFmt}; pub(crate) struct Rpc; impl super::Rpc for Rpc { - const CAPABILITY_ID: mux::CapabilityId = 7; + const CAPABILITY: Capability = Capability::PushBatchStoreState; const INFLIGHT: u32 = 1; const METHOD: &'static str = "push_batch_store_state"; diff --git a/node/actors/network/src/rpc/push_batch_votes.rs b/node/actors/network/src/rpc/push_batch_votes.rs index e0e67126..901d120b 100644 --- a/node/actors/network/src/rpc/push_batch_votes.rs +++ b/node/actors/network/src/rpc/push_batch_votes.rs @@ -1,34 +1,35 @@ //! Defines RPC for passing consensus messages. -use crate::{mux, proto::gossip as proto}; +use super::Capability; +use crate::proto::gossip as proto; use anyhow::Context as _; use std::sync::Arc; -use zksync_consensus_roles::attester::{self, Batch}; -use zksync_protobuf::ProtoFmt; +use zksync_consensus_roles::attester; +use zksync_protobuf::{read_optional, ProtoFmt}; -/// PushBatchVotes RPC. +/// RPC pushing fresh batch votes. pub(crate) struct Rpc; -/// Deprecated, because adding `genesis_hash` to `validator::Batch` -/// was not backward compatible - old binaries couldn't verify -/// signatures on messages with `genesis_hash` and were treating it -/// as malicious behavior. -#[allow(dead_code)] -pub(super) const V1: mux::CapabilityId = 5; - -/// Current version. -pub(super) const V2: mux::CapabilityId = 8; - impl super::Rpc for Rpc { - const CAPABILITY_ID: mux::CapabilityId = V2; + const CAPABILITY: Capability = Capability::PushBatchVotes; const INFLIGHT: u32 = 1; const METHOD: &'static str = "push_batch_votes"; type Req = Req; - type Resp = (); + type Resp = Resp; } /// Signed batch message that the receiving peer should process. #[derive(Debug, Clone, PartialEq, Eq)] -pub(crate) struct Req(pub(crate) Vec>>); +pub(crate) struct Req { + /// Requesting the peer to respond with votes for the batch. + pub(crate) want_votes_for: Option, + /// New votes that server might be not aware of. + pub(crate) votes: Vec>>, +} + +pub(crate) struct Resp { + /// Votes requested by the peer. 
+ pub(crate) votes: Vec>>, +} impl ProtoFmt for Req { type Proto = proto::PushBatchVotes; @@ -40,12 +41,44 @@ impl ProtoFmt for Req { ProtoFmt::read(e).with_context(|| format!("votes[{i}]"))?, )); } - Ok(Self(votes)) + Ok(Self { + want_votes_for: read_optional(&r.want_votes_for).context("want_votes_for")?, + votes, + }) + } + + fn build(&self) -> Self::Proto { + Self::Proto { + want_votes_for: self.want_votes_for.as_ref().map(ProtoFmt::build), + votes: self + .votes + .iter() + .map(|a| ProtoFmt::build(a.as_ref())) + .collect(), + } + } +} + +impl ProtoFmt for Resp { + type Proto = proto::PushBatchVotesResp; + + fn read(r: &Self::Proto) -> anyhow::Result { + let mut votes = vec![]; + for (i, e) in r.votes.iter().enumerate() { + votes.push(Arc::new( + ProtoFmt::read(e).with_context(|| format!("votes[{i}]"))?, + )); + } + Ok(Self { votes }) } fn build(&self) -> Self::Proto { Self::Proto { - votes: self.0.iter().map(|a| ProtoFmt::build(a.as_ref())).collect(), + votes: self + .votes + .iter() + .map(|a| ProtoFmt::build(a.as_ref())) + .collect(), } } } diff --git a/node/actors/network/src/rpc/push_block_store_state.rs b/node/actors/network/src/rpc/push_block_store_state.rs index 52c1161c..1134185c 100644 --- a/node/actors/network/src/rpc/push_block_store_state.rs +++ b/node/actors/network/src/rpc/push_block_store_state.rs @@ -1,5 +1,6 @@ //! RPC for notifying peer about our BlockStore state. -use crate::{mux, proto::gossip as proto}; +use super::Capability; +use crate::proto::gossip as proto; use anyhow::Context; use zksync_consensus_roles::validator; use zksync_consensus_storage::BlockStoreState; @@ -10,7 +11,7 @@ use zksync_protobuf::{read_optional, required, ProtoFmt}; pub(crate) struct Rpc; impl super::Rpc for Rpc { - const CAPABILITY_ID: mux::CapabilityId = 3; + const CAPABILITY: Capability = Capability::PushBlockStoreState; const INFLIGHT: u32 = 1; const METHOD: &'static str = "push_block_store_state"; diff --git a/node/actors/network/src/rpc/push_validator_addrs.rs b/node/actors/network/src/rpc/push_validator_addrs.rs index c58803bf..48a85ba4 100644 --- a/node/actors/network/src/rpc/push_validator_addrs.rs +++ b/node/actors/network/src/rpc/push_validator_addrs.rs @@ -1,5 +1,6 @@ //! RPC for synchronizing ValidatorAddrs data. -use crate::{mux, proto::gossip as proto}; +use super::Capability; +use crate::proto::gossip as proto; use anyhow::Context as _; use std::sync::Arc; use zksync_consensus_roles::validator; @@ -9,7 +10,7 @@ use zksync_protobuf::ProtoFmt; pub(crate) struct Rpc; impl super::Rpc for Rpc { - const CAPABILITY_ID: mux::CapabilityId = 1; + const CAPABILITY: Capability = Capability::PushValidatorAddrs; const INFLIGHT: u32 = 1; const METHOD: &'static str = "push_validator_addrs"; diff --git a/node/actors/network/src/rpc/tests.rs b/node/actors/network/src/rpc/tests.rs index 0f444bf5..d2523fc2 100644 --- a/node/actors/network/src/rpc/tests.rs +++ b/node/actors/network/src/rpc/tests.rs @@ -1,30 +1,10 @@ use super::*; use crate::noise; use rand::Rng as _; -use std::{ - collections::HashSet, - sync::atomic::{AtomicU64, Ordering}, -}; +use std::sync::atomic::{AtomicU64, Ordering}; use zksync_concurrency::{ctx, testonly::abort_on_panic, time}; use zksync_protobuf::{kB, testonly::test_encode_random}; -/// CAPABILITY_ID should uniquely identify the RPC. 
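The manual uniqueness test removed just below is superseded by the protobuf registry: duplicate tags in the `Capability` enum are rejected at codegen time, and the deprecated v1 capability id is protected by the `reserved 5;` entry in rpc.proto. A hypothetical in-crate smoke test restating the guarantee (it assumes `use super::*;` as in the existing tests module):

```rust
// Hypothetical test inside the rpc module; `Capability` and `id()` come from
// this patch, the numeric tags from proto/rpc.proto above.
#[test]
fn capability_ids_match_proto_tags() {
    assert_eq!(Capability::Consensus.id(), 0);
    assert_eq!(Capability::Ping.id(), 2);
    assert_eq!(Capability::GetBatch.id(), 6);
    assert_eq!(Capability::PushBatchVotes.id(), 8);
    // Tag 5 is `reserved` for the deprecated v1 of push_batch_votes,
    // so no future capability can collide with old binaries.
}
```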
-#[test] -fn test_capability_rpc_correspondence() { - let ids = [ - consensus::Rpc::CAPABILITY_ID, - push_validator_addrs::Rpc::CAPABILITY_ID, - push_block_store_state::Rpc::CAPABILITY_ID, - get_block::Rpc::CAPABILITY_ID, - ping::Rpc::CAPABILITY_ID, - push_batch_votes::V1, - push_batch_votes::V2, - push_batch_store_state::Rpc::CAPABILITY_ID, - get_batch::Rpc::CAPABILITY_ID, - ]; - assert_eq!(ids.len(), HashSet::from(ids).len()); -} - #[test] fn test_schema_encode_decode() { let rng = &mut ctx::test_root(&ctx::RealClock).rng(); @@ -161,7 +141,7 @@ const RATE: limiter::Rate = limiter::Rate { }; impl Rpc for ExampleRpc { - const CAPABILITY_ID: mux::CapabilityId = 0; + const CAPABILITY: Capability = Capability::Ping; const INFLIGHT: u32 = 5; const METHOD: &'static str = "example"; type Req = (); diff --git a/node/actors/network/src/testonly.rs b/node/actors/network/src/testonly.rs index 7728bc45..909e7248 100644 --- a/node/actors/network/src/testonly.rs +++ b/node/actors/network/src/testonly.rs @@ -1,7 +1,7 @@ //! Testonly utilities. #![allow(dead_code)] use crate::{ - gossip::AttestationStatusWatch, + gossip::attestation, io::{ConsensusInputMessage, Target}, Config, GossipConfig, Network, RpcConfig, Runner, }; @@ -15,7 +15,7 @@ use std::{ }; use zksync_concurrency::{ ctx::{self, channel}, - io, limiter, net, scope, sync, time, + io, limiter, net, scope, sync, }; use zksync_consensus_roles::{node, validator}; use zksync_consensus_storage::{BatchStore, BlockStore}; @@ -104,7 +104,6 @@ where // due to timeouts. ping_timeout: None, validator_key: Some(validator_key.clone()), - attester_key: None, gossip: GossipConfig { key: rng.gen(), dynamic_inbound_limit: usize::MAX, @@ -143,7 +142,6 @@ pub fn new_fullnode(rng: &mut impl Rng, peer: &Config) -> Config { // due to timeouts. ping_timeout: None, validator_key: None, - attester_key: None, gossip: GossipConfig { key: rng.gen(), dynamic_inbound_limit: usize::MAX, @@ -161,8 +159,6 @@ pub fn new_fullnode(rng: &mut impl Rng, peer: &Config) -> Config { /// Runner for Instance. pub struct InstanceRunner { net_runner: Runner, - attestation_status: Arc, - block_store: Arc, batch_store: Arc, terminate: channel::Receiver<()>, } @@ -172,20 +168,6 @@ impl InstanceRunner { pub async fn run(mut self, ctx: &ctx::Ctx) -> anyhow::Result<()> { scope::run!(ctx, |ctx, s| async { s.spawn_bg(self.net_runner.run(ctx)); - s.spawn_bg(async { - let genesis = self.block_store.genesis().hash(); - loop { - if let Ok(Some(batch_number)) = self.batch_store.next_batch_to_attest(ctx).await - { - self.attestation_status - .update(genesis, batch_number) - .await?; - } - if ctx.sleep(time::Duration::seconds(1)).await.is_err() { - return Ok(()); - } - } - }); let _ = self.terminate.recv(ctx).await; Ok(()) }) @@ -195,25 +177,45 @@ impl InstanceRunner { } } +/// InstanceConfig +pub struct InstanceConfig { + /// cfg + pub cfg: Config, + /// block_store + pub block_store: Arc, + /// batch_store + pub batch_store: Arc, + /// Attestation controller. + /// It is not configured by default. + /// Attestation tests should configure it and consume + /// the certificates on their own (see `attestation::Controller`). + pub attestation: Arc, +} + impl Instance { - /// Construct an instance for a given config. + /// Constructs a new instance. pub fn new( cfg: Config, block_store: Arc, batch_store: Arc, ) -> (Self, InstanceRunner) { - // Semantically we'd want this to be created at the same level as the stores, - // but doing so would introduce a lot of extra cruft in setting up tests. 
-        let attestation_status =
-            Arc::new(AttestationStatusWatch::new(block_store.genesis().hash()));
+        Self::new_from_config(InstanceConfig {
+            cfg,
+            block_store,
+            batch_store,
+            attestation: attestation::Controller::new(None).into(),
+        })
+    }
+    /// Construct an instance for a given config.
+    pub fn new_from_config(cfg: InstanceConfig) -> (Self, InstanceRunner) {
         let (actor_pipe, dispatcher_pipe) = pipe::new();
         let (net, net_runner) = Network::new(
-            cfg,
-            block_store.clone(),
-            batch_store.clone(),
+            cfg.cfg,
+            cfg.block_store.clone(),
+            cfg.batch_store.clone(),
             actor_pipe,
-            attestation_status.clone(),
+            cfg.attestation,
         );
         let (terminate_send, terminate_recv) = channel::bounded(1);
         (
@@ -224,9 +226,7 @@ impl Instance {
             },
             InstanceRunner {
                 net_runner,
-                attestation_status,
-                block_store,
-                batch_store,
+                batch_store: cfg.batch_store.clone(),
                 terminate: terminate_recv,
             },
         )
diff --git a/node/libs/roles/src/attester/messages/batch.rs b/node/libs/roles/src/attester/messages/batch.rs
index e9554314..41ada768 100644
--- a/node/libs/roles/src/attester/messages/batch.rs
+++ b/node/libs/roles/src/attester/messages/batch.rs
@@ -1,9 +1,5 @@
 use super::{GenesisHash, Signed};
-use crate::{
-    attester,
-    validator::{Genesis, Payload},
-};
-use anyhow::{ensure, Context as _};
+use crate::{attester, validator::Payload};
 use zksync_consensus_crypto::{keccak256::Keccak256, ByteFmt, Text, TextFmt};
 use zksync_consensus_utils::enum_util::Variant;
@@ -125,28 +121,11 @@ pub enum BatchQCVerifyError {
         want: u64,
     },
     /// Bad signer set.
-    #[error("signers set doesn't match genesis")]
+    #[error("signers not in committee")]
     BadSignersSet,
-    /// No attester committee in genesis.
-    #[error("No attester committee in genesis")]
-    AttestersNotInGenesis,
-}
-
-/// Error returned by `BatchQC::add()` if the signature is invalid.
-#[derive(thiserror::Error, Debug)]
-pub enum BatchQCAddError {
-    /// Inconsistent messages.
-    #[error("Trying to add signature for a different message")]
-    InconsistentMessages,
-    /// Signer not present in the committee.
-    #[error("Signer not in committee: {signer:?}")]
-    SignerNotInCommittee {
-        /// Signer of the message.
-        signer: Box<attester::PublicKey>,
-    },
-    /// Message already present in BatchQC.
-    #[error("Message already signed for BatchQC")]
-    Exists,
+    /// Genesis mismatch.
+    #[error("genesis mismatch")]
+    GenesisMismatch,
 }
 impl BatchQC {
@@ -160,46 +139,43 @@ impl BatchQC {
     /// Add an attester's signature.
     /// Signature is assumed to be already verified.
-    pub fn add(&mut self, msg: &Signed<Batch>, genesis: &Genesis) -> anyhow::Result<()> {
-        use BatchQCAddError as Error;
-
-        let committee = genesis
-            .attesters
-            .as_ref()
-            .context("no attester committee in genesis")?;
-
-        ensure!(self.message == msg.msg, Error::InconsistentMessages);
-        ensure!(!self.signatures.contains(&msg.key), Error::Exists);
-        ensure!(
-            committee.contains(&msg.key),
-            Error::SignerNotInCommittee {
-                signer: Box::new(msg.key.clone()),
-            }
+    pub fn add(
+        &mut self,
+        msg: &Signed<Batch>,
+        committee: &attester::Committee,
+    ) -> anyhow::Result<()> {
+        anyhow::ensure!(self.message == msg.msg, "inconsistent messages");
+        anyhow::ensure!(
+            !self.signatures.contains(&msg.key),
+            "signature already present"
         );
-
+        anyhow::ensure!(committee.contains(&msg.key), "not in committee");
         self.signatures.add(msg.key.clone(), msg.sig.clone());
-
         Ok(())
     }

-    /// Verifies the signature of the BatchQC.
-    pub fn verify(&self, genesis: &Genesis) -> Result<(), BatchQCVerifyError> {
+    /// Verifies the BatchQC.
+ pub fn verify( + &self, + genesis: GenesisHash, + committee: &attester::Committee, + ) -> Result<(), BatchQCVerifyError> { use BatchQCVerifyError as Error; - let attesters = genesis - .attesters - .as_ref() - .ok_or(Error::AttestersNotInGenesis)?; + + if self.message.genesis != genesis { + return Err(Error::GenesisMismatch); + } // Verify that all signers are attesters. for pk in self.signatures.keys() { - if !attesters.contains(pk) { + if !committee.contains(pk) { return Err(Error::BadSignersSet); } } // Verify that the signer's weight is sufficient. - let weight = attesters.weight_of_keys(self.signatures.keys()); - let threshold = attesters.threshold(); + let weight = committee.weight_of_keys(self.signatures.keys()); + let threshold = committee.threshold(); if weight < threshold { return Err(Error::NotEnoughSigners { got: weight, diff --git a/node/libs/roles/src/attester/tests.rs b/node/libs/roles/src/attester/tests.rs index 57c577a8..c9d85cd4 100644 --- a/node/libs/roles/src/attester/tests.rs +++ b/node/libs/roles/src/attester/tests.rs @@ -165,25 +165,37 @@ fn test_batch_qc() { // Create QCs with increasing number of attesters. for i in 0..setup1.attester_keys.len() + 1 { - let mut qc = BatchQC::new(rng.gen()); + let mut qc = BatchQC::new(Batch { + genesis: setup1.genesis.hash(), + number: rng.gen(), + hash: rng.gen(), + }); for key in &setup1.attester_keys[0..i] { - qc.add(&key.sign_msg(qc.message.clone()), &setup1.genesis) + qc.add(&key.sign_msg(qc.message.clone()), attesters) .unwrap(); } let expected_weight: u64 = attesters.iter().take(i).map(|w| w.weight).sum(); if expected_weight >= attesters.threshold() { - qc.verify(&setup1.genesis).expect("failed to verify QC"); + qc.verify(setup1.genesis.hash(), attesters) + .expect("failed to verify QC"); } else { assert_matches!( - qc.verify(&setup1.genesis), + qc.verify(setup1.genesis.hash(), attesters), Err(Error::NotEnoughSigners { .. }) ); } // Mismatching attesters sets. 
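Since `add` and `verify` now take the committee explicitly, a QC can be built and checked without a full `Genesis` in hand (the mismatching-committee asserts continue below). A minimal end-to-end sketch, assuming the test `Setup` used throughout these tests; `make_qc` itself is hypothetical:

```rust
// Hypothetical helper following the new explicit-committee signatures above.
use zksync_consensus_roles::attester;
use zksync_consensus_roles::validator::testonly::Setup;

fn make_qc(setup: &Setup, batch: attester::Batch) -> attester::BatchQC {
    let committee = setup.genesis.attesters.as_ref().unwrap();
    let mut qc = attester::BatchQC::new(batch);
    for key in &setup.attester_keys {
        // `add` checks message consistency and committee membership;
        // signature validity is the caller's responsibility.
        qc.add(&key.sign_msg(qc.message.clone()), committee).unwrap();
    }
    // `verify` additionally checks the genesis hash and the weight threshold.
    qc.verify(setup.genesis.hash(), committee).unwrap();
    qc
}
```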
- assert!(qc.verify(&setup2.genesis).is_err()); - assert!(qc.verify(&genesis3).is_err()); + assert!(qc + .verify( + setup1.genesis.hash(), + setup2.genesis.attesters.as_ref().unwrap() + ) + .is_err()); + assert!(qc + .verify(setup1.genesis.hash(), genesis3.attesters.as_ref().unwrap()) + .is_err()); } } @@ -196,21 +208,14 @@ fn test_attester_committee_weights() { let setup = Setup::new_with_weights(rng, vec![1000, 600, 800, 6000, 900, 700]); // Expected sum of the attesters weights let sums = [1000, 1600, 2400, 8400, 9300, 10000]; + let attesters = setup.genesis.attesters.as_ref().unwrap(); let msg: Batch = rng.gen(); let mut qc = BatchQC::new(msg.clone()); - for (n, weight) in sums.iter().enumerate() { - let key = &setup.attester_keys[n]; - qc.add(&key.sign_msg(msg.clone()), &setup.genesis).unwrap(); - assert_eq!( - setup - .genesis - .attesters - .as_ref() - .unwrap() - .weight_of_keys(qc.signatures.keys()), - *weight - ); + for (i, weight) in sums.iter().enumerate() { + let key = &setup.attester_keys[i]; + qc.add(&key.sign_msg(msg.clone()), attesters).unwrap(); + assert_eq!(attesters.weight_of_keys(qc.signatures.keys()), *weight); } } diff --git a/node/libs/roles/src/validator/testonly.rs b/node/libs/roles/src/validator/testonly.rs index f4ce2b35..c9a3a314 100644 --- a/node/libs/roles/src/validator/testonly.rs +++ b/node/libs/roles/src/validator/testonly.rs @@ -6,10 +6,7 @@ use super::{ ReplicaCommit, ReplicaPrepare, SecretKey, Signature, Signed, Signers, View, ViewNumber, WeightedValidator, }; -use crate::{ - attester::{self, BatchNumber, SyncBatch}, - validator::LeaderSelectionMode, -}; +use crate::{attester, validator::LeaderSelectionMode}; use bit_vec::BitVec; use rand::{ distributions::{Distribution, Standard}, @@ -146,12 +143,12 @@ impl Setup { pub fn push_batch(&mut self, rng: &mut impl Rng) { let batch_number = match self.0.batches.last() { Some(b) => b.number.next(), - None => BatchNumber(0), + None => attester::BatchNumber(0), }; let size: usize = rng.gen_range(500..1000); let payloads = vec![Payload((0..size).map(|_| rng.gen()).collect())]; let proof = rng.gen::<[u8; 32]>().to_vec(); - let batch = SyncBatch { + let batch = attester::SyncBatch { number: batch_number, payloads, proof, @@ -205,7 +202,7 @@ pub struct SetupInner { /// Past blocks. pub blocks: Vec, /// L1 batches - pub batches: Vec, + pub batches: Vec, /// Genesis config. pub genesis: Genesis, } diff --git a/node/libs/storage/src/batch_store/mod.rs b/node/libs/storage/src/batch_store/mod.rs index 43d060a0..1643bedd 100644 --- a/node/libs/storage/src/batch_store/mod.rs +++ b/node/libs/storage/src/batch_store/mod.rs @@ -54,21 +54,6 @@ pub trait PersistentBatchStore: 'static + fmt::Debug + Send + Sync { /// Range of batches persisted in storage. fn persisted(&self) -> sync::watch::Receiver; - /// Get the next L1 batch for which attesters are expected to produce a quorum certificate. - /// - /// An external node might never have a complete history of L1 batch QCs. Once the L1 batch is included on L1, - /// the external nodes might use the [attester::SyncBatch] route to obtain them, in which case they will not - /// have a QC and no reason to get them either. The main node, however, will want to have a QC for all batches. - async fn next_batch_to_attest( - &self, - ctx: &ctx::Ctx, - ) -> ctx::Result>; - - /// Get the L1 batch QC from storage with the highest number. - /// - /// Returns `None` if we don't have a QC for any of the batches yet. 
- async fn last_batch_qc(&self, ctx: &ctx::Ctx) -> ctx::Result>; - /// Returns the [attester::SyncBatch] with the given number, which is used by peers /// to catch up with L1 batches that they might have missed if they went offline. async fn get_batch( @@ -77,25 +62,6 @@ pub trait PersistentBatchStore: 'static + fmt::Debug + Send + Sync { number: attester::BatchNumber, ) -> ctx::Result>; - /// Returns the [attester::Batch] with the given number, which is the `message` that - /// appears in [attester::BatchQC], and represents the content that needs to be signed - /// by the attesters. - async fn get_batch_to_sign( - &self, - ctx: &ctx::Ctx, - number: attester::BatchNumber, - ) -> ctx::Result>; - - /// Returns the QC of the batch with the given number. - async fn get_batch_qc( - &self, - ctx: &ctx::Ctx, - number: attester::BatchNumber, - ) -> ctx::Result>; - - /// Store the given batch QC in the storage persistently. - async fn store_qc(&self, ctx: &ctx::Ctx, qc: attester::BatchQC) -> ctx::Result<()>; - /// Queue the batch to be persisted in storage. /// `queue_next_batch()` may return BEFORE the batch is actually persisted, /// but if the call succeeded the batch is expected to be persisted eventually. @@ -294,48 +260,6 @@ impl BatchStore { Ok(batch) } - /// Retrieve the next batch number that doesn't have a QC yet and will need to be signed. - pub async fn next_batch_to_attest( - &self, - ctx: &ctx::Ctx, - ) -> ctx::Result> { - let t = metrics::PERSISTENT_BATCH_STORE - .next_batch_to_attest_latency - .start(); - - let batch = self - .persistent - .next_batch_to_attest(ctx) - .await - .wrap("persistent.next_batch_to_attest()")?; - - t.observe(); - Ok(batch) - } - - /// Retrieve a batch to be signed. - /// - /// This might be `None` even if the L1 batch already exists, because the commitment - /// in it is populated asynchronously. - pub async fn batch_to_sign( - &self, - ctx: &ctx::Ctx, - number: attester::BatchNumber, - ) -> ctx::Result> { - let t = metrics::PERSISTENT_BATCH_STORE - .batch_to_sign_latency - .start(); - - let batch = self - .persistent - .get_batch_to_sign(ctx, number) - .await - .wrap("persistent.get_batch_to_sign()")?; - - t.observe(); - Ok(batch) - } - /// Append batch to a queue to be persisted eventually. /// Since persisting a batch may take a significant amount of time, /// BatchStore contains a queue of batches waiting to be persisted. @@ -365,27 +289,6 @@ impl BatchStore { Ok(()) } - /// Wait until the database has a batch, then attach the corresponding QC. - #[tracing::instrument(skip_all, fields(l1_batch = %qc.message.number))] - pub async fn persist_batch_qc(&self, ctx: &ctx::Ctx, qc: attester::BatchQC) -> ctx::Result<()> { - let t = metrics::BATCH_STORE.persist_batch_qc.start(); - // The `store_qc` implementation in `zksync-era` retries the insertion of the QC if the payload - // isn't yet available, but to be safe we can wait for the availability signal here as well. - sync::wait_for(ctx, &mut self.persistent.persisted(), |persisted| { - qc.message.number < persisted.next() - }) - .await?; - // Now it's definitely safe to store it. - metrics::BATCH_STORE - .last_persisted_batch_qc - .set(qc.message.number.0); - - self.persistent.store_qc(ctx, qc).await?; - - t.observe(); - Ok(()) - } - /// Waits until the given batch is queued (in memory, or persisted). /// Note that it doesn't mean that the batch is actually available, as old batches might get pruned. 
#[tracing::instrument(skip_all, fields(l1_batch = %number))] diff --git a/node/libs/storage/src/testonly/in_memory.rs b/node/libs/storage/src/testonly/in_memory.rs index 2ca8858e..487356ab 100644 --- a/node/libs/storage/src/testonly/in_memory.rs +++ b/node/libs/storage/src/testonly/in_memory.rs @@ -5,7 +5,7 @@ use crate::{ }; use anyhow::Context as _; use std::{ - collections::{HashMap, VecDeque}, + collections::VecDeque, sync::{Arc, Mutex}, }; use zksync_concurrency::{ctx, sync}; @@ -20,10 +20,8 @@ struct BlockStoreInner { #[derive(Debug)] struct BatchStoreInner { - genesis: validator::Genesis, persisted: sync::watch::Sender, batches: Mutex>, - certs: Mutex>, } /// In-memory block store. @@ -70,12 +68,10 @@ impl BlockStore { impl BatchStore { /// New In-memory `BatchStore`. - pub fn new(genesis: validator::Genesis, first: attester::BatchNumber) -> Self { + pub fn new(first: attester::BatchNumber) -> Self { Self(Arc::new(BatchStoreInner { - genesis, persisted: sync::watch::channel(BatchStoreState { first, last: None }).0, batches: Mutex::default(), - certs: Mutex::default(), })) } } @@ -133,60 +129,6 @@ impl PersistentBatchStore for BatchStore { self.0.persisted.subscribe() } - async fn last_batch_qc(&self, _ctx: &ctx::Ctx) -> ctx::Result> { - let certs = self.0.certs.lock().unwrap(); - let last_batch_number = certs.keys().max().unwrap(); - Ok(certs.get(last_batch_number).cloned()) - } - - async fn next_batch_to_attest( - &self, - _ctx: &ctx::Ctx, - ) -> ctx::Result> { - let batches = self.0.batches.lock().unwrap(); - let certs = self.0.certs.lock().unwrap(); - - Ok(batches - .iter() - .map(|b| b.number) - .find(|n| !certs.contains_key(n))) - } - - async fn get_batch_to_sign( - &self, - ctx: &ctx::Ctx, - number: attester::BatchNumber, - ) -> ctx::Result> { - // Here we just produce some deterministic mock hash. The real hash is available in the database. - // and contains a commitment to the data submitted to L1. It is *not* over SyncBatch. - let Some(batch) = self.get_batch(ctx, number).await? 
else { - return Ok(None); - }; - - let bz = zksync_protobuf::canonical(&batch); - let hash = zksync_consensus_crypto::keccak256::Keccak256::new(&bz); - - Ok(Some(attester::Batch { - number, - hash: attester::BatchHash(hash), - genesis: self.0.genesis.hash(), - })) - } - - async fn get_batch_qc( - &self, - _ctx: &ctx::Ctx, - number: attester::BatchNumber, - ) -> ctx::Result> { - let certs = self.0.certs.lock().unwrap(); - Ok(certs.get(&number).cloned()) - } - - async fn store_qc(&self, _ctx: &ctx::Ctx, qc: attester::BatchQC) -> ctx::Result<()> { - self.0.certs.lock().unwrap().insert(qc.message.number, qc); - Ok(()) - } - async fn get_batch( &self, _ctx: &ctx::Ctx, diff --git a/node/libs/storage/src/testonly/mod.rs b/node/libs/storage/src/testonly/mod.rs index a6329ba8..38c4a4ba 100644 --- a/node/libs/storage/src/testonly/mod.rs +++ b/node/libs/storage/src/testonly/mod.rs @@ -89,7 +89,7 @@ impl TestMemoryStorage { first: validator::BlockNumber, ) -> Self { let im_blocks = in_memory::BlockStore::new(genesis.clone(), first); - let im_batches = in_memory::BatchStore::new(genesis.clone(), attester::BatchNumber(0)); + let im_batches = in_memory::BatchStore::new(attester::BatchNumber(0)); Self::new_with_im(ctx, im_blocks, im_batches).await } diff --git a/node/libs/storage/src/tests.rs b/node/libs/storage/src/tests.rs index 11fa5431..36266bba 100644 --- a/node/libs/storage/src/tests.rs +++ b/node/libs/storage/src/tests.rs @@ -32,7 +32,7 @@ async fn test_inmemory_batch_store() { let mut setup = Setup::new(rng, 3); setup.push_batches(rng, 5); - let store = &testonly::in_memory::BatchStore::new(setup.genesis.clone(), BatchNumber(0)); + let store = &testonly::in_memory::BatchStore::new(BatchNumber(0)); let mut want = vec![]; for batch in &setup.batches { store.queue_next_batch(ctx, batch.clone()).await.unwrap(); diff --git a/node/tools/src/config.rs b/node/tools/src/config.rs index 97d49fbf..cb84e48d 100644 --- a/node/tools/src/config.rs +++ b/node/tools/src/config.rs @@ -7,12 +7,13 @@ use std::{ fs, io, net::SocketAddr, path::PathBuf, + sync::Arc, }; use tokio_rustls::rustls::pki_types::{CertificateDer, PrivateKeyDer}; -use zksync_concurrency::{ctx, net, scope, time}; +use zksync_concurrency::{ctx, net, time}; use zksync_consensus_bft as bft; use zksync_consensus_crypto::{read_optional_text, read_required_text, Text, TextFmt}; -use zksync_consensus_executor::{self as executor, attestation::AttestationStatusRunner}; +use zksync_consensus_executor::{self as executor, attestation}; use zksync_consensus_network::http; use zksync_consensus_roles::{attester, node, validator}; use zksync_consensus_storage::testonly::{TestMemoryStorage, TestMemoryStorageRunner}; @@ -259,24 +260,12 @@ impl Configs { pub async fn make_executor( &self, ctx: &ctx::Ctx, - ) -> ctx::Result<(executor::Executor, TestExecutorRunner)> { + ) -> ctx::Result<(executor::Executor, TestMemoryStorageRunner)> { let replica_store = store::RocksDB::open(self.app.genesis.clone(), &self.database).await?; let store = TestMemoryStorage::new(ctx, &self.app.genesis).await; - // We don't have an API to poll in this setup, we can only create a local store based attestation client. 
- let (attestation_status, attestation_status_runner) = - AttestationStatusRunner::init_from_store( - ctx, - store.batches.clone(), - time::Duration::seconds(1), - self.app.genesis.hash(), - ) - .await?; - - let runner = TestExecutorRunner { - storage_runner: store.runner, - attestation_status_runner, - }; + let attestation = Arc::new(attestation::Controller::new(self.app.attester_key.clone())); + let runner = store.runner; let e = executor::Executor { config: executor::Config { @@ -314,12 +303,7 @@ impl Configs { self.app.max_payload_size, )), }), - attester: self - .app - .attester_key - .as_ref() - .map(|key| executor::Attester { key: key.clone() }), - attestation_status, + attestation, }; Ok((e, runner)) } @@ -346,20 +330,3 @@ fn load_private_key(path: &PathBuf) -> anyhow::Result> { // Load and return a single private key. Ok(rustls_pemfile::private_key(&mut reader).map(|key| key.expect("Private key not found"))?) } - -pub struct TestExecutorRunner { - storage_runner: TestMemoryStorageRunner, - attestation_status_runner: AttestationStatusRunner, -} - -impl TestExecutorRunner { - /// Runs the storage and the attestation status. - pub async fn run(self, ctx: &ctx::Ctx) -> anyhow::Result<()> { - scope::run!(ctx, |ctx, s| async { - s.spawn(self.storage_runner.run(ctx)); - s.spawn(self.attestation_status_runner.run(ctx)); - Ok(()) - }) - .await - } -}
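After this change the executor owns no `Attester` and spawns no `AttestationStatusRunner`; it simply shares one `attestation::Controller` with whatever component schedules batches to attest. A sketch of the intended sharing, using the re-export shown in config.rs above; `shared_controller` is hypothetical:

```rust
// Hypothetical wiring helper; `attestation::Controller` is re-exported by the
// executor crate as config.rs above demonstrates.
use std::sync::Arc;
use zksync_consensus_executor::attestation;
use zksync_consensus_roles::attester;

fn shared_controller(
    attester_key: Option<attester::SecretKey>,
) -> Arc<attestation::Controller> {
    // One controller per node: the executor hands it to gossip for the vote
    // exchange, while the embedder calls start_attestation() on it to schedule
    // each batch and consumes the resulting certificates.
    Arc::new(attestation::Controller::new(attester_key))
}
```

A node without an attester key still constructs a controller (with `None`), so it can relay votes and serve snapshots even though it never signs anything itself.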