Skip to content

Commit

Permalink
cargo_fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
pompon0 committed Aug 13, 2024
1 parent f5a1337 commit b1b6cbf
Show file tree
Hide file tree
Showing 9 changed files with 101 additions and 120 deletions.
3 changes: 1 addition & 2 deletions node/actors/executor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
use crate::io::Dispatcher;
use anyhow::Context as _;
use network::http;
pub use network::RpcConfig;
pub use network::{gossip::attestation, RpcConfig};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
Expand All @@ -14,7 +14,6 @@ use zksync_consensus_roles::{node, validator};
use zksync_consensus_storage::{BatchStore, BlockStore, ReplicaStore};
use zksync_consensus_utils::pipe;
use zksync_protobuf::kB;
pub use network::gossip::attestation;

mod io;
#[cfg(test)]
Expand Down
56 changes: 42 additions & 14 deletions node/actors/network/src/gossip/attestation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,13 @@ impl State {
votes: self.votes.values().cloned().collect(),
};
};
if self
.config
.batch_to_attest
.number != old.config.batch_to_attest.number
{
if self.config.batch_to_attest.number != old.config.batch_to_attest.number {
return Diff {
config: Some(self.config.clone()),
votes: self.votes.values().cloned().collect(),
};
}

Diff {
config: None,
votes: self
Expand All @@ -83,9 +79,15 @@ impl State {
if vote.msg.number != self.config.batch_to_attest.number {
return Ok(());
}
anyhow::ensure!(vote.msg.hash == self.config.batch_to_attest.hash, "batch hash mismatch");
anyhow::ensure!(
vote.msg.hash == self.config.batch_to_attest.hash,
"batch hash mismatch"
);
let Some(weight) = self.config.committee.weight(&vote.key) else {
anyhow::bail!("received vote signed by an inactive attester: {:?}",vote.key);
anyhow::bail!(
"received vote signed by an inactive attester: {:?}",
vote.key
);
};
if self.votes.contains_key(&vote.key) {
return Ok(());
Expand Down Expand Up @@ -158,26 +160,47 @@ impl DiffReceiver {
/// * adding votes to the state
/// * subscribing to the vote set changes
/// * waiting for the certificate to be collected
///
///
/// It also keeps an attester key used to sign the batch vote,
/// whenever it belongs the current attester committee.
/// Signing happens automatically whenever the committee is updated.
///
/// Expected usage:
/// ```
/// let ctrl = attestation::Controller::new(Some(key));
/// loop {
/// // Check the global attestation registry.
/// // Compute the next expected batch and the committee that should attest it.
/// ...
/// let config = attestation::Config {
/// batch_to_attest: ...,
/// committee: ...,
/// };
/// ctrl.update_config(Arc::new(config.clone())).unwrap();
/// s.spawn(async {
/// if let Some(qc) = ctrl.wait_for_qc(ctx, config.batch_to_attest.number).await?;
/// // Submit the certificate `qc` to the global registry
/// ...
/// });
/// // Wait for the global registry to include the certificate.
/// ...
/// }
/// ```
pub struct Controller {
key: Option<attester::SecretKey>,
state: Watch<Option<State>>,
}

impl fmt::Debug for Controller {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt
.debug_struct("StateWatch")
fmt.debug_struct("StateWatch")
.field("key", &self.key)
.finish_non_exhaustive()
}
}

impl Controller {
/// Constructs AttestationStatusWatch.
/// Constructs Controller.
/// `key` will be used for automatically signing votes.
pub fn new(key: Option<attester::SecretKey>) -> Self {
Self {
Expand Down Expand Up @@ -220,11 +243,16 @@ impl Controller {
}

/// Returns votes matching the `want` batch.
pub(crate) fn votes(&self, want: &attester::Batch) -> Vec<Arc<attester::Signed<attester::Batch>>> {
pub(crate) fn votes(
&self,
want: &attester::Batch,
) -> Vec<Arc<attester::Signed<attester::Batch>>> {
let state = self.state.subscribe();
let state = state.borrow();
let Some(state) = &*state else { return vec![] };
if &state.config.batch_to_attest != want { return vec![] }
if &state.config.batch_to_attest != want {
return vec![];
}
state.votes.values().cloned().collect()
}

Expand Down
87 changes: 30 additions & 57 deletions node/actors/network/src/gossip/attestation/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,10 @@ async fn test_insert_votes() {
key: k.public(),
weight: 1250,
}))
.unwrap().into(),
.unwrap()
.into(),
});
let ctrl_votes = ||Votes::from(ctrl.votes(&config.batch_to_attest));
let ctrl_votes = || Votes::from(ctrl.votes(&config.batch_to_attest));
ctrl.update_config(config.clone()).await.unwrap();
assert_eq!(Votes::from([]), ctrl_votes());
let mut recv = ctrl.subscribe();
Expand All @@ -51,14 +52,10 @@ async fn test_insert_votes() {
.collect();

tracing::info!("Initial votes.");
ctrl
.insert_votes(all_votes[0..3].iter().cloned())
ctrl.insert_votes(all_votes[0..3].iter().cloned())
.await
.unwrap();
assert_eq!(
Votes::from(all_votes[0..3].iter().cloned()),
ctrl_votes()
);
assert_eq!(Votes::from(all_votes[0..3].iter().cloned()), ctrl_votes());
let diff = recv.wait_for_diff(ctx).await.unwrap();
assert!(diff.config.is_none());
assert_eq!(
Expand All @@ -67,18 +64,13 @@ async fn test_insert_votes() {
);

tracing::info!("Adding votes gradually.");
ctrl
.insert_votes(all_votes[3..5].iter().cloned())
ctrl.insert_votes(all_votes[3..5].iter().cloned())
.await
.unwrap();
ctrl
.insert_votes(all_votes[5..7].iter().cloned())
ctrl.insert_votes(all_votes[5..7].iter().cloned())
.await
.unwrap();
assert_eq!(
Votes::from(all_votes[0..7].iter().cloned()),
ctrl_votes()
);
assert_eq!(Votes::from(all_votes[0..7].iter().cloned()), ctrl_votes());
let diff = recv.wait_for_diff(ctx).await.unwrap();
assert!(diff.config.is_none());
assert_eq!(
Expand All @@ -87,14 +79,10 @@ async fn test_insert_votes() {
);

tracing::info!("Readding already inserded votes (noop).");
ctrl
.insert_votes(all_votes[2..6].iter().cloned())
ctrl.insert_votes(all_votes[2..6].iter().cloned())
.await
.unwrap();
assert_eq!(
Votes::from(all_votes[0..7].iter().cloned()),
ctrl_votes()
);
assert_eq!(Votes::from(all_votes[0..7].iter().cloned()), ctrl_votes());

tracing::info!("Adding votes out of committee (error).");
assert!(ctrl
Expand All @@ -104,28 +92,21 @@ async fn test_insert_votes() {
}))
.await
.is_err());
assert_eq!(
Votes::from(all_votes[0..7].iter().cloned()),
ctrl_votes()
);
assert_eq!(Votes::from(all_votes[0..7].iter().cloned()), ctrl_votes());

tracing::info!("Adding votes for different batch (noop).");
ctrl
.insert_votes((0..3).map(|_| {
let k: attester::SecretKey = rng.gen();
k.sign_msg(attester::Batch {
genesis: config.batch_to_attest.genesis,
number: rng.gen(),
hash: rng.gen(),
})
.into()
}))
.await
.unwrap();
assert_eq!(
Votes::from(all_votes[0..7].iter().cloned()),
ctrl_votes()
);
ctrl.insert_votes((0..3).map(|_| {
let k: attester::SecretKey = rng.gen();
k.sign_msg(attester::Batch {
genesis: config.batch_to_attest.genesis,
number: rng.gen(),
hash: rng.gen(),
})
.into()
}))
.await
.unwrap();
assert_eq!(Votes::from(all_votes[0..7].iter().cloned()), ctrl_votes());

tracing::info!("Adding incorrect votes (error).");
let mut bad_vote = (*all_votes[7]).clone();
Expand All @@ -134,14 +115,10 @@ async fn test_insert_votes() {
.insert_votes([bad_vote.into()].into_iter())
.await
.is_err());
assert_eq!(
Votes::from(all_votes[0..7].iter().cloned()),
ctrl_votes()
);
assert_eq!(Votes::from(all_votes[0..7].iter().cloned()), ctrl_votes());

tracing::info!("Add the last vote mixed with already added votes.");
ctrl
.insert_votes(all_votes[5..].iter().cloned())
ctrl.insert_votes(all_votes[5..].iter().cloned())
.await
.unwrap();
assert_eq!(Votes::from(all_votes.clone()), ctrl_votes());
Expand Down Expand Up @@ -178,7 +155,8 @@ async fn test_wait_for_qc() {
key: k.public(),
weight: rng.gen_range(1..=100),
}))
.unwrap().into(),
.unwrap()
.into(),
});
let mut all_votes: Vec<Vote> = keys
.iter()
Expand All @@ -189,15 +167,13 @@ async fn test_wait_for_qc() {
loop {
let end = rng.gen_range(0..=committee_size);
tracing::info!("end = {end}");
ctrl
.insert_votes(all_votes[..end].iter().cloned())
ctrl.insert_votes(all_votes[..end].iter().cloned())
.await
.unwrap();
// Waiting for the previous qc should immediately return None.
assert_eq!(
None,
ctrl
.wait_for_qc(ctx, config.batch_to_attest.number.prev().unwrap())
ctrl.wait_for_qc(ctx, config.batch_to_attest.number.prev().unwrap())
.await
.unwrap()
);
Expand All @@ -215,10 +191,7 @@ async fn test_wait_for_qc() {
qc.verify(genesis, &config.committee).unwrap();
break;
}
assert_eq!(
None,
ctrl.state.subscribe().borrow().as_ref().unwrap().qc()
);
assert_eq!(None, ctrl.state.subscribe().borrow().as_ref().unwrap().qc());
}
}
}
2 changes: 1 addition & 1 deletion node/actors/network/src/gossip/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ impl Network {
let req = rpc::push_batch_votes::Req {
// If the config has changed, we need to re-request all the votes
// from peer that we might have ignored earlier.
want_votes_for: diff.config.as_ref().map(|c|c.batch_to_attest.clone()),
want_votes_for: diff.config.as_ref().map(|c| c.batch_to_attest.clone()),
votes: diff.votes,
};
// NOTE: The response should be non-empty only iff we requested a snapshot.
Expand Down
44 changes: 22 additions & 22 deletions node/actors/network/src/gossip/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -497,24 +497,26 @@ async fn test_batch_votes_propagation() {
// Fixed attestation schedule.
let first: attester::BatchNumber = rng.gen();
let schedule: Vec<_> = (0..10)
.map(|r| Arc::new(attestation::Config {
batch_to_attest: attester::Batch {
genesis: setup.genesis.hash(),
number: first + r,
hash: rng.gen(),
},
committee: {
// We select a random subset here. It would be incorrect to choose an empty subset, but
// the chances of that are negligible.
let subset: Vec<_> = setup.attester_keys.iter().filter(|_| rng.gen()).collect();
attester::Committee::new(subset.iter().map(|k| attester::WeightedAttester {
key: k.public(),
weight: rng.gen_range(5..10),
}))
.unwrap()
.into()
},
}))
.map(|r| {
Arc::new(attestation::Config {
batch_to_attest: attester::Batch {
genesis: setup.genesis.hash(),
number: first + r,
hash: rng.gen(),
},
committee: {
// We select a random subset here. It would be incorrect to choose an empty subset, but
// the chances of that are negligible.
let subset: Vec<_> = setup.attester_keys.iter().filter(|_| rng.gen()).collect();
attester::Committee::new(subset.iter().map(|k| attester::WeightedAttester {
key: k.public(),
weight: rng.gen_range(5..10),
}))
.unwrap()
.into()
},
})
})
.collect();

// Round of the schedule that nodes should collect the votes for.
Expand All @@ -533,10 +535,8 @@ async fn test_batch_votes_propagation() {
cfg: cfg.clone(),
block_store: store.blocks.clone(),
batch_store: store.batches.clone(),
attestation: attestation::Controller::new(Some(
setup.attester_keys[i].clone(),
))
.into(),
attestation: attestation::Controller::new(Some(setup.attester_keys[i].clone()))
.into(),
});
s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node", i)));
// Task going through the schedule, waiting for ANY node to collect the certificate
Expand Down
3 changes: 1 addition & 2 deletions node/actors/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,7 @@ impl Network {
pipe: ActorPipe<io::InputMessage, io::OutputMessage>,
attestation: Arc<attestation::Controller>,
) -> (Arc<Self>, Runner) {
let gossip =
gossip::Network::new(cfg, block_store, batch_store, pipe.send, attestation);
let gossip = gossip::Network::new(cfg, block_store, batch_store, pipe.send, attestation);
let consensus = consensus::Network::new(gossip.clone());
let net = Arc::new(Self { gossip, consensus });
(
Expand Down
6 changes: 3 additions & 3 deletions node/actors/network/src/rpc/push_batch_votes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::proto::gossip as proto;
use anyhow::Context as _;
use std::sync::Arc;
use zksync_consensus_roles::attester;
use zksync_protobuf::{read_optional,ProtoFmt};
use zksync_protobuf::{read_optional, ProtoFmt};

/// RPC pushing fresh batch votes.
pub(crate) struct Rpc;
Expand All @@ -20,14 +20,14 @@ impl super::Rpc for Rpc {
/// Signed batch message that the receiving peer should process.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct Req {
// Requesting the peer to respond with votes for the batch.
/// Requesting the peer to respond with votes for the batch.
pub(crate) want_votes_for: Option<attester::Batch>,
/// New votes that server might be not aware of.
pub(crate) votes: Vec<Arc<attester::Signed<attester::Batch>>>,
}

pub(crate) struct Resp {
/// Votes requested by the peer.
/// Votes requested by the peer.
pub(crate) votes: Vec<Arc<attester::Signed<attester::Batch>>>,
}

Expand Down
Loading

0 comments on commit b1b6cbf

Please sign in to comment.