From b1b6cbfc8b69f98c36dacfc84e059dcae149d9bd Mon Sep 17 00:00:00 2001 From: Grzegorz Prusak Date: Tue, 13 Aug 2024 13:30:38 +0200 Subject: [PATCH] cargo_fmt --- node/actors/executor/src/lib.rs | 3 +- .../network/src/gossip/attestation/mod.rs | 56 +++++++++--- .../network/src/gossip/attestation/tests.rs | 87 +++++++------------ node/actors/network/src/gossip/runner.rs | 2 +- node/actors/network/src/gossip/tests/mod.rs | 44 +++++----- node/actors/network/src/lib.rs | 3 +- .../network/src/rpc/push_batch_votes.rs | 6 +- .../libs/roles/src/attester/messages/batch.rs | 17 ---- node/tools/src/config.rs | 3 +- 9 files changed, 101 insertions(+), 120 deletions(-) diff --git a/node/actors/executor/src/lib.rs b/node/actors/executor/src/lib.rs index 14219e8e..2cd86e34 100644 --- a/node/actors/executor/src/lib.rs +++ b/node/actors/executor/src/lib.rs @@ -2,7 +2,7 @@ use crate::io::Dispatcher; use anyhow::Context as _; use network::http; -pub use network::RpcConfig; +pub use network::{gossip::attestation, RpcConfig}; use std::{ collections::{HashMap, HashSet}, sync::Arc, @@ -14,7 +14,6 @@ use zksync_consensus_roles::{node, validator}; use zksync_consensus_storage::{BatchStore, BlockStore, ReplicaStore}; use zksync_consensus_utils::pipe; use zksync_protobuf::kB; -pub use network::gossip::attestation; mod io; #[cfg(test)] diff --git a/node/actors/network/src/gossip/attestation/mod.rs b/node/actors/network/src/gossip/attestation/mod.rs index 5e7f6dd8..e5954989 100644 --- a/node/actors/network/src/gossip/attestation/mod.rs +++ b/node/actors/network/src/gossip/attestation/mod.rs @@ -50,17 +50,13 @@ impl State { votes: self.votes.values().cloned().collect(), }; }; - if self - .config - .batch_to_attest - .number != old.config.batch_to_attest.number - { + if self.config.batch_to_attest.number != old.config.batch_to_attest.number { return Diff { config: Some(self.config.clone()), votes: self.votes.values().cloned().collect(), }; } - + Diff { config: None, votes: self @@ -83,9 +79,15 @@ impl State { if vote.msg.number != self.config.batch_to_attest.number { return Ok(()); } - anyhow::ensure!(vote.msg.hash == self.config.batch_to_attest.hash, "batch hash mismatch"); + anyhow::ensure!( + vote.msg.hash == self.config.batch_to_attest.hash, + "batch hash mismatch" + ); let Some(weight) = self.config.committee.weight(&vote.key) else { - anyhow::bail!("received vote signed by an inactive attester: {:?}",vote.key); + anyhow::bail!( + "received vote signed by an inactive attester: {:?}", + vote.key + ); }; if self.votes.contains_key(&vote.key) { return Ok(()); @@ -158,10 +160,32 @@ impl DiffReceiver { /// * adding votes to the state /// * subscribing to the vote set changes /// * waiting for the certificate to be collected -/// +/// /// It also keeps an attester key used to sign the batch vote, /// whenever it belongs the current attester committee. /// Signing happens automatically whenever the committee is updated. +/// +/// Expected usage: +/// ``` +/// let ctrl = attestation::Controller::new(Some(key)); +/// loop { +/// // Check the global attestation registry. +/// // Compute the next expected batch and the committee that should attest it. +/// ... +/// let config = attestation::Config { +/// batch_to_attest: ..., +/// committee: ..., +/// }; +/// ctrl.update_config(Arc::new(config.clone())).unwrap(); +/// s.spawn(async { +/// if let Some(qc) = ctrl.wait_for_qc(ctx, config.batch_to_attest.number).await?; +/// // Submit the certificate `qc` to the global registry +/// ... +/// }); +/// // Wait for the global registry to include the certificate. +/// ... +/// } +/// ``` pub struct Controller { key: Option, state: Watch>, @@ -169,15 +193,14 @@ pub struct Controller { impl fmt::Debug for Controller { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - fmt - .debug_struct("StateWatch") + fmt.debug_struct("StateWatch") .field("key", &self.key) .finish_non_exhaustive() } } impl Controller { - /// Constructs AttestationStatusWatch. + /// Constructs Controller. /// `key` will be used for automatically signing votes. pub fn new(key: Option) -> Self { Self { @@ -220,11 +243,16 @@ impl Controller { } /// Returns votes matching the `want` batch. - pub(crate) fn votes(&self, want: &attester::Batch) -> Vec>> { + pub(crate) fn votes( + &self, + want: &attester::Batch, + ) -> Vec>> { let state = self.state.subscribe(); let state = state.borrow(); let Some(state) = &*state else { return vec![] }; - if &state.config.batch_to_attest != want { return vec![] } + if &state.config.batch_to_attest != want { + return vec![]; + } state.votes.values().cloned().collect() } diff --git a/node/actors/network/src/gossip/attestation/tests.rs b/node/actors/network/src/gossip/attestation/tests.rs index 14f18b2d..227dfa22 100644 --- a/node/actors/network/src/gossip/attestation/tests.rs +++ b/node/actors/network/src/gossip/attestation/tests.rs @@ -35,9 +35,10 @@ async fn test_insert_votes() { key: k.public(), weight: 1250, })) - .unwrap().into(), + .unwrap() + .into(), }); - let ctrl_votes = ||Votes::from(ctrl.votes(&config.batch_to_attest)); + let ctrl_votes = || Votes::from(ctrl.votes(&config.batch_to_attest)); ctrl.update_config(config.clone()).await.unwrap(); assert_eq!(Votes::from([]), ctrl_votes()); let mut recv = ctrl.subscribe(); @@ -51,14 +52,10 @@ async fn test_insert_votes() { .collect(); tracing::info!("Initial votes."); - ctrl - .insert_votes(all_votes[0..3].iter().cloned()) + ctrl.insert_votes(all_votes[0..3].iter().cloned()) .await .unwrap(); - assert_eq!( - Votes::from(all_votes[0..3].iter().cloned()), - ctrl_votes() - ); + assert_eq!(Votes::from(all_votes[0..3].iter().cloned()), ctrl_votes()); let diff = recv.wait_for_diff(ctx).await.unwrap(); assert!(diff.config.is_none()); assert_eq!( @@ -67,18 +64,13 @@ async fn test_insert_votes() { ); tracing::info!("Adding votes gradually."); - ctrl - .insert_votes(all_votes[3..5].iter().cloned()) + ctrl.insert_votes(all_votes[3..5].iter().cloned()) .await .unwrap(); - ctrl - .insert_votes(all_votes[5..7].iter().cloned()) + ctrl.insert_votes(all_votes[5..7].iter().cloned()) .await .unwrap(); - assert_eq!( - Votes::from(all_votes[0..7].iter().cloned()), - ctrl_votes() - ); + assert_eq!(Votes::from(all_votes[0..7].iter().cloned()), ctrl_votes()); let diff = recv.wait_for_diff(ctx).await.unwrap(); assert!(diff.config.is_none()); assert_eq!( @@ -87,14 +79,10 @@ async fn test_insert_votes() { ); tracing::info!("Readding already inserded votes (noop)."); - ctrl - .insert_votes(all_votes[2..6].iter().cloned()) + ctrl.insert_votes(all_votes[2..6].iter().cloned()) .await .unwrap(); - assert_eq!( - Votes::from(all_votes[0..7].iter().cloned()), - ctrl_votes() - ); + assert_eq!(Votes::from(all_votes[0..7].iter().cloned()), ctrl_votes()); tracing::info!("Adding votes out of committee (error)."); assert!(ctrl @@ -104,28 +92,21 @@ async fn test_insert_votes() { })) .await .is_err()); - assert_eq!( - Votes::from(all_votes[0..7].iter().cloned()), - ctrl_votes() - ); + assert_eq!(Votes::from(all_votes[0..7].iter().cloned()), ctrl_votes()); tracing::info!("Adding votes for different batch (noop)."); - ctrl - .insert_votes((0..3).map(|_| { - let k: attester::SecretKey = rng.gen(); - k.sign_msg(attester::Batch { - genesis: config.batch_to_attest.genesis, - number: rng.gen(), - hash: rng.gen(), - }) - .into() - })) - .await - .unwrap(); - assert_eq!( - Votes::from(all_votes[0..7].iter().cloned()), - ctrl_votes() - ); + ctrl.insert_votes((0..3).map(|_| { + let k: attester::SecretKey = rng.gen(); + k.sign_msg(attester::Batch { + genesis: config.batch_to_attest.genesis, + number: rng.gen(), + hash: rng.gen(), + }) + .into() + })) + .await + .unwrap(); + assert_eq!(Votes::from(all_votes[0..7].iter().cloned()), ctrl_votes()); tracing::info!("Adding incorrect votes (error)."); let mut bad_vote = (*all_votes[7]).clone(); @@ -134,14 +115,10 @@ async fn test_insert_votes() { .insert_votes([bad_vote.into()].into_iter()) .await .is_err()); - assert_eq!( - Votes::from(all_votes[0..7].iter().cloned()), - ctrl_votes() - ); + assert_eq!(Votes::from(all_votes[0..7].iter().cloned()), ctrl_votes()); tracing::info!("Add the last vote mixed with already added votes."); - ctrl - .insert_votes(all_votes[5..].iter().cloned()) + ctrl.insert_votes(all_votes[5..].iter().cloned()) .await .unwrap(); assert_eq!(Votes::from(all_votes.clone()), ctrl_votes()); @@ -178,7 +155,8 @@ async fn test_wait_for_qc() { key: k.public(), weight: rng.gen_range(1..=100), })) - .unwrap().into(), + .unwrap() + .into(), }); let mut all_votes: Vec = keys .iter() @@ -189,15 +167,13 @@ async fn test_wait_for_qc() { loop { let end = rng.gen_range(0..=committee_size); tracing::info!("end = {end}"); - ctrl - .insert_votes(all_votes[..end].iter().cloned()) + ctrl.insert_votes(all_votes[..end].iter().cloned()) .await .unwrap(); // Waiting for the previous qc should immediately return None. assert_eq!( None, - ctrl - .wait_for_qc(ctx, config.batch_to_attest.number.prev().unwrap()) + ctrl.wait_for_qc(ctx, config.batch_to_attest.number.prev().unwrap()) .await .unwrap() ); @@ -215,10 +191,7 @@ async fn test_wait_for_qc() { qc.verify(genesis, &config.committee).unwrap(); break; } - assert_eq!( - None, - ctrl.state.subscribe().borrow().as_ref().unwrap().qc() - ); + assert_eq!(None, ctrl.state.subscribe().borrow().as_ref().unwrap().qc()); } } } diff --git a/node/actors/network/src/gossip/runner.rs b/node/actors/network/src/gossip/runner.rs index 0d675bd3..51158734 100644 --- a/node/actors/network/src/gossip/runner.rs +++ b/node/actors/network/src/gossip/runner.rs @@ -213,7 +213,7 @@ impl Network { let req = rpc::push_batch_votes::Req { // If the config has changed, we need to re-request all the votes // from peer that we might have ignored earlier. - want_votes_for: diff.config.as_ref().map(|c|c.batch_to_attest.clone()), + want_votes_for: diff.config.as_ref().map(|c| c.batch_to_attest.clone()), votes: diff.votes, }; // NOTE: The response should be non-empty only iff we requested a snapshot. diff --git a/node/actors/network/src/gossip/tests/mod.rs b/node/actors/network/src/gossip/tests/mod.rs index a9738ead..e2251a60 100644 --- a/node/actors/network/src/gossip/tests/mod.rs +++ b/node/actors/network/src/gossip/tests/mod.rs @@ -497,24 +497,26 @@ async fn test_batch_votes_propagation() { // Fixed attestation schedule. let first: attester::BatchNumber = rng.gen(); let schedule: Vec<_> = (0..10) - .map(|r| Arc::new(attestation::Config { - batch_to_attest: attester::Batch { - genesis: setup.genesis.hash(), - number: first + r, - hash: rng.gen(), - }, - committee: { - // We select a random subset here. It would be incorrect to choose an empty subset, but - // the chances of that are negligible. - let subset: Vec<_> = setup.attester_keys.iter().filter(|_| rng.gen()).collect(); - attester::Committee::new(subset.iter().map(|k| attester::WeightedAttester { - key: k.public(), - weight: rng.gen_range(5..10), - })) - .unwrap() - .into() - }, - })) + .map(|r| { + Arc::new(attestation::Config { + batch_to_attest: attester::Batch { + genesis: setup.genesis.hash(), + number: first + r, + hash: rng.gen(), + }, + committee: { + // We select a random subset here. It would be incorrect to choose an empty subset, but + // the chances of that are negligible. + let subset: Vec<_> = setup.attester_keys.iter().filter(|_| rng.gen()).collect(); + attester::Committee::new(subset.iter().map(|k| attester::WeightedAttester { + key: k.public(), + weight: rng.gen_range(5..10), + })) + .unwrap() + .into() + }, + }) + }) .collect(); // Round of the schedule that nodes should collect the votes for. @@ -533,10 +535,8 @@ async fn test_batch_votes_propagation() { cfg: cfg.clone(), block_store: store.blocks.clone(), batch_store: store.batches.clone(), - attestation: attestation::Controller::new(Some( - setup.attester_keys[i].clone(), - )) - .into(), + attestation: attestation::Controller::new(Some(setup.attester_keys[i].clone())) + .into(), }); s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node", i))); // Task going through the schedule, waiting for ANY node to collect the certificate diff --git a/node/actors/network/src/lib.rs b/node/actors/network/src/lib.rs index 88bb68ec..5578badc 100644 --- a/node/actors/network/src/lib.rs +++ b/node/actors/network/src/lib.rs @@ -59,8 +59,7 @@ impl Network { pipe: ActorPipe, attestation: Arc, ) -> (Arc, Runner) { - let gossip = - gossip::Network::new(cfg, block_store, batch_store, pipe.send, attestation); + let gossip = gossip::Network::new(cfg, block_store, batch_store, pipe.send, attestation); let consensus = consensus::Network::new(gossip.clone()); let net = Arc::new(Self { gossip, consensus }); ( diff --git a/node/actors/network/src/rpc/push_batch_votes.rs b/node/actors/network/src/rpc/push_batch_votes.rs index 5017edc3..901d120b 100644 --- a/node/actors/network/src/rpc/push_batch_votes.rs +++ b/node/actors/network/src/rpc/push_batch_votes.rs @@ -4,7 +4,7 @@ use crate::proto::gossip as proto; use anyhow::Context as _; use std::sync::Arc; use zksync_consensus_roles::attester; -use zksync_protobuf::{read_optional,ProtoFmt}; +use zksync_protobuf::{read_optional, ProtoFmt}; /// RPC pushing fresh batch votes. pub(crate) struct Rpc; @@ -20,14 +20,14 @@ impl super::Rpc for Rpc { /// Signed batch message that the receiving peer should process. #[derive(Debug, Clone, PartialEq, Eq)] pub(crate) struct Req { - // Requesting the peer to respond with votes for the batch. + /// Requesting the peer to respond with votes for the batch. pub(crate) want_votes_for: Option, /// New votes that server might be not aware of. pub(crate) votes: Vec>>, } pub(crate) struct Resp { - /// Votes requested by the peer. + /// Votes requested by the peer. pub(crate) votes: Vec>>, } diff --git a/node/libs/roles/src/attester/messages/batch.rs b/node/libs/roles/src/attester/messages/batch.rs index e850740e..41ada768 100644 --- a/node/libs/roles/src/attester/messages/batch.rs +++ b/node/libs/roles/src/attester/messages/batch.rs @@ -128,23 +128,6 @@ pub enum BatchQCVerifyError { GenesisMismatch, } -/// Error returned by `BatchQC::add()` if the signature is invalid. -#[derive(thiserror::Error, Debug)] -pub enum BatchQCAddError { - /// Inconsistent messages. - #[error("Trying to add signature for a different message")] - InconsistentMessages, - /// Signer not present in the committee. - #[error("Signer not in committee: {signer:?}")] - SignerNotInCommittee { - /// Signer of the message. - signer: Box, - }, - /// Message already present in BatchQC. - #[error("Message already signed for BatchQC")] - Exists, -} - impl BatchQC { /// Create a new empty instance for a given `Batch` message. pub fn new(message: Batch) -> Self { diff --git a/node/tools/src/config.rs b/node/tools/src/config.rs index e640f9af..f0104610 100644 --- a/node/tools/src/config.rs +++ b/node/tools/src/config.rs @@ -265,8 +265,7 @@ impl Configs { let store = TestMemoryStorage::new(ctx, &self.app.genesis).await; // We don't have an API to poll in this setup, we can only create a local store based attestation client. - let attestation = - Arc::new(attestation::Controller::new(self.app.attester_key.clone())); + let attestation = Arc::new(attestation::Controller::new(self.app.attester_key.clone())); let runner = store.runner; let e = executor::Executor {