Skip to content

Commit

Permalink
applied comments
Browse files Browse the repository at this point in the history
  • Loading branch information
pompon0 committed Aug 14, 2024
1 parent cd7bcf7 commit eb21f6a
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 58 deletions.
72 changes: 36 additions & 36 deletions node/actors/network/src/gossip/attestation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,17 @@ mod tests;
/// Configuration of the attestation Controller.
/// It determines what should be attested and by whom.
#[derive(Debug, Clone, PartialEq)]
pub struct Config {
pub struct Info {
/// Batch to attest.
pub batch_to_attest: attester::Batch,
/// Committee that should attest the batch.
pub committee: Arc<attester::Committee>,
}

// Internal attestation state: config and the set of votes collected so far.
// Internal attestation state: info and the set of votes collected so far.
#[derive(Clone)]
struct State {
config: Arc<Config>,
info: Arc<Info>,
/// Votes collected so far.
votes: im::HashMap<attester::PublicKey, Arc<attester::Signed<attester::Batch>>>,
// Total weight of the votes collected.
Expand All @@ -33,13 +33,13 @@ struct State {
pub(crate) struct Diff {
/// New votes.
pub(crate) votes: Vec<Arc<attester::Signed<attester::Batch>>>,
/// New config, if changed.
pub(crate) config: Option<Arc<Config>>,
/// New info, if changed.
pub(crate) info: Option<Arc<Info>>,
}

impl Diff {
fn is_empty(&self) -> bool {
self.votes.is_empty() && self.config.is_none()
self.votes.is_empty() && self.info.is_none()
}
}

Expand All @@ -49,19 +49,19 @@ impl State {
fn diff(&self, old: &Option<Self>) -> Diff {
let Some(old) = old.as_ref() else {
return Diff {
config: Some(self.config.clone()),
info: Some(self.info.clone()),
votes: self.votes.values().cloned().collect(),
};
};
if self.config.batch_to_attest.number != old.config.batch_to_attest.number {
if self.info.batch_to_attest.number != old.info.batch_to_attest.number {
return Diff {
config: Some(self.config.clone()),
info: Some(self.info.clone()),
votes: self.votes.values().cloned().collect(),
};
}

Diff {
config: None,
info: None,
votes: self
.votes
.iter()
Expand All @@ -76,17 +76,17 @@ impl State {
/// Returns an error if genesis doesn't match or the signature is invalid.
fn insert_vote(&mut self, vote: Arc<attester::Signed<attester::Batch>>) -> anyhow::Result<()> {
anyhow::ensure!(
vote.msg.genesis == self.config.batch_to_attest.genesis,
vote.msg.genesis == self.info.batch_to_attest.genesis,
"Genesis mismatch"
);
if vote.msg.number != self.config.batch_to_attest.number {
if vote.msg.number != self.info.batch_to_attest.number {
return Ok(());
}
anyhow::ensure!(
vote.msg.hash == self.config.batch_to_attest.hash,
vote.msg.hash == self.info.batch_to_attest.hash,
"batch hash mismatch"
);
let Some(weight) = self.config.committee.weight(&vote.key) else {
let Some(weight) = self.info.committee.weight(&vote.key) else {
anyhow::bail!(
"received vote signed by an inactive attester: {:?}",
vote.key
Expand Down Expand Up @@ -122,15 +122,15 @@ impl State {
}

fn cert(&self) -> Option<attester::BatchQC> {
if self.total_weight < self.config.committee.threshold() {
if self.total_weight < self.info.committee.threshold() {
return None;
}
let mut sigs = attester::MultiSig::default();
for vote in self.votes.values() {
sigs.add(vote.key.clone(), vote.sig.clone());
}
Some(attester::BatchQC {
message: self.config.batch_to_attest.clone(),
message: self.info.batch_to_attest.clone(),
signatures: sigs,
})
}
Expand Down Expand Up @@ -159,7 +159,7 @@ impl DiffReceiver {
}

/// `Controller` manages the attestation state.
/// It maintains a set of votes matching the attestation config.
/// It maintains a set of votes matching the attestation info.
/// It allows for
/// * adding votes to the state
/// * subscribing to the vote set changes
Expand All @@ -183,14 +183,14 @@ impl DiffReceiver {
/// // Based on the local storage, compute the next expected batch hash
/// // and the committee that should attest it.
/// ...
/// let config = attestation::Config {
/// let info = attestation::Info {
/// batch_to_attest: attester::Batch {
/// number: next,
/// ...
/// },
/// committee: ...,
/// };
/// ctrl.start_attestation(Arc::new(config)).unwrap();
/// ctrl.start_attestation(Arc::new(info)).unwrap();
/// // Wait for the attestation to progress, by observing the
/// // global attestation registry.
/// next = ...;
Expand Down Expand Up @@ -269,7 +269,7 @@ impl Controller {
#[allow(clippy::float_arithmetic)]
metrics::METRICS
.weight_collected
.set(state.total_weight as f64 / state.config.committee.total_weight() as f64);
.set(state.total_weight as f64 / state.info.committee.total_weight() as f64);
locked.send_replace(Some(state));
}
res
Expand All @@ -283,7 +283,7 @@ impl Controller {
let state = self.state.subscribe();
let state = state.borrow();
let Some(state) = &*state else { return vec![] };
if &state.config.batch_to_attest != want {
if &state.info.batch_to_attest != want {
return vec![];
}
state.votes.values().cloned().collect()
Expand All @@ -303,10 +303,10 @@ impl Controller {
let Some(state) = state.as_ref() else {
continue;
};
if state.config.batch_to_attest.number < n {
if state.info.batch_to_attest.number < n {
continue;
};
if state.config.batch_to_attest.number > n {
if state.info.batch_to_attest.number > n {
return Ok(None);
}
if let Some(qc) = state.cert() {
Expand All @@ -315,53 +315,53 @@ impl Controller {
}
}

/// Updates the attestation config.
/// Clears the votes collected for the previous config.
/// Updates the internal configuration to start collecting votes for a new batch.
/// Clears the votes collected for the previous info.
/// Batch number has to increase with each update.
#[tracing::instrument(name = "attestation::Controller::start_attestation", skip_all)]
pub async fn start_attestation(&self, config: Arc<Config>) -> anyhow::Result<()> {
pub async fn start_attestation(&self, info: Arc<Info>) -> anyhow::Result<()> {
let locked = self.state.lock().await;
let old = locked.borrow().clone();
if let Some(old) = old.as_ref() {
if *old.config == *config {
if *old.info == *info {
return Ok(());
}
anyhow::ensure!(
old.config.batch_to_attest.genesis == config.batch_to_attest.genesis,
old.info.batch_to_attest.genesis == info.batch_to_attest.genesis,
"tried to change genesis"
);
anyhow::ensure!(
old.config.batch_to_attest.number < config.batch_to_attest.number,
old.info.batch_to_attest.number < info.batch_to_attest.number,
"tried to decrease batch number"
);
}
tracing::info!(
"started collecting votes for batch {:?}",
config.batch_to_attest.number
info.batch_to_attest.number
);
let mut new = State {
config,
info,
votes: im::HashMap::new(),
total_weight: 0,
};
if let Some(key) = self.key.as_ref() {
if new.config.committee.contains(&key.public()) {
let vote = key.sign_msg(new.config.batch_to_attest.clone());
if new.info.committee.contains(&key.public()) {
let vote = key.sign_msg(new.info.batch_to_attest.clone());
// This is our own vote, so it always should be valid.
new.insert_vote(Arc::new(vote)).unwrap();
}
}
metrics::METRICS
.batch_number
.set(new.config.batch_to_attest.number.0);
.set(new.info.batch_to_attest.number.0);
metrics::METRICS
.committee_size
.set(new.config.committee.len());
.set(new.info.committee.len());
metrics::METRICS.votes_collected.set(new.votes.len());
#[allow(clippy::float_arithmetic)]
metrics::METRICS
.weight_collected
.set(new.total_weight as f64 / new.config.committee.total_weight() as f64);
.set(new.total_weight as f64 / new.info.committee.total_weight() as f64);
locked.send_replace(Some(new));
Ok(())
}
Expand Down
38 changes: 19 additions & 19 deletions node/actors/network/src/gossip/attestation/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ async fn test_insert_votes() {
for i in 0..3 {
tracing::info!("iteration {i}");
let keys: Vec<attester::SecretKey> = (0..8).map(|_| rng.gen()).collect();
let config = Arc::new(Config {
let info = Arc::new(Info {
batch_to_attest: attester::Batch {
genesis,
number: first + i,
Expand All @@ -38,17 +38,17 @@ async fn test_insert_votes() {
.unwrap()
.into(),
});
let ctrl_votes = || Votes::from(ctrl.votes(&config.batch_to_attest));
ctrl.start_attestation(config.clone()).await.unwrap();
let ctrl_votes = || Votes::from(ctrl.votes(&info.batch_to_attest));
ctrl.start_attestation(info.clone()).await.unwrap();
assert_eq!(Votes::from([]), ctrl_votes());
let mut recv = ctrl.subscribe();
let diff = recv.wait_for_diff(ctx).await.unwrap();
assert_eq!(diff.config.as_ref(), Some(&config));
assert_eq!(diff.info.as_ref(), Some(&info));
assert_eq!(Votes::default(), diff.votes.into());

let all_votes: Vec<Vote> = keys
.iter()
.map(|k| k.sign_msg(config.batch_to_attest.clone()).into())
.map(|k| k.sign_msg(info.batch_to_attest.clone()).into())
.collect();

tracing::info!("Initial votes.");
Expand All @@ -57,7 +57,7 @@ async fn test_insert_votes() {
.unwrap();
assert_eq!(Votes::from(all_votes[0..3].iter().cloned()), ctrl_votes());
let diff = recv.wait_for_diff(ctx).await.unwrap();
assert!(diff.config.is_none());
assert!(diff.info.is_none());
assert_eq!(
Votes::from(all_votes[0..3].iter().cloned()),
diff.votes.into()
Expand All @@ -72,7 +72,7 @@ async fn test_insert_votes() {
.unwrap();
assert_eq!(Votes::from(all_votes[0..7].iter().cloned()), ctrl_votes());
let diff = recv.wait_for_diff(ctx).await.unwrap();
assert!(diff.config.is_none());
assert!(diff.info.is_none());
assert_eq!(
Votes::from(all_votes[3..7].iter().cloned()),
diff.votes.into()
Expand All @@ -88,7 +88,7 @@ async fn test_insert_votes() {
assert!(ctrl
.insert_votes((0..3).map(|_| {
let k: attester::SecretKey = rng.gen();
k.sign_msg(config.batch_to_attest.clone()).into()
k.sign_msg(info.batch_to_attest.clone()).into()
}))
.await
.is_err());
Expand All @@ -98,7 +98,7 @@ async fn test_insert_votes() {
ctrl.insert_votes((0..3).map(|_| {
let k: attester::SecretKey = rng.gen();
k.sign_msg(attester::Batch {
genesis: config.batch_to_attest.genesis,
genesis: info.batch_to_attest.genesis,
number: rng.gen(),
hash: rng.gen(),
})
Expand All @@ -123,7 +123,7 @@ async fn test_insert_votes() {
.unwrap();
assert_eq!(Votes::from(all_votes.clone()), ctrl_votes());
let diff = recv.wait_for_diff(ctx).await.unwrap();
assert!(diff.config.is_none());
assert!(diff.info.is_none());
assert_eq!(
Votes::from(all_votes[7..].iter().cloned()),
diff.votes.into()
Expand All @@ -145,7 +145,7 @@ async fn test_wait_for_cert() {
tracing::info!("iteration {i}");
let committee_size = rng.gen_range(1..20);
let keys: Vec<attester::SecretKey> = (0..committee_size).map(|_| rng.gen()).collect();
let config = Arc::new(Config {
let info = Arc::new(Info {
batch_to_attest: attester::Batch {
genesis,
number: first + i,
Expand All @@ -160,10 +160,10 @@ async fn test_wait_for_cert() {
});
let mut all_votes: Vec<Vote> = keys
.iter()
.map(|k| k.sign_msg(config.batch_to_attest.clone()).into())
.map(|k| k.sign_msg(info.batch_to_attest.clone()).into())
.collect();
all_votes.shuffle(rng);
ctrl.start_attestation(config.clone()).await.unwrap();
ctrl.start_attestation(info.clone()).await.unwrap();
loop {
let end = rng.gen_range(0..=committee_size);
tracing::info!("end = {end}");
Expand All @@ -173,22 +173,22 @@ async fn test_wait_for_cert() {
// Waiting for the previous qc should immediately return None.
assert_eq!(
None,
ctrl.wait_for_cert(ctx, config.batch_to_attest.number.prev().unwrap())
ctrl.wait_for_cert(ctx, info.batch_to_attest.number.prev().unwrap())
.await
.unwrap()
);
if config
if info
.committee
.weight_of_keys(all_votes[..end].iter().map(|v| &v.key))
>= config.committee.threshold()
>= info.committee.threshold()
{
let qc = ctrl
.wait_for_cert(ctx, config.batch_to_attest.number)
.wait_for_cert(ctx, info.batch_to_attest.number)
.await
.unwrap()
.unwrap();
assert_eq!(qc.message, config.batch_to_attest);
qc.verify(genesis, &config.committee).unwrap();
assert_eq!(qc.message, info.batch_to_attest);
qc.verify(genesis, &info.committee).unwrap();
break;
}
assert_eq!(
Expand Down
4 changes: 2 additions & 2 deletions node/actors/network/src/gossip/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,9 +211,9 @@ impl Network {
loop {
let diff = recv.wait_for_diff(ctx).await?;
let req = rpc::push_batch_votes::Req {
// If the config has changed, we need to re-request all the votes
// If the info has changed, we need to re-request all the votes
// from peer that we might have ignored earlier.
want_votes_for: diff.config.as_ref().map(|c| c.batch_to_attest.clone()),
want_votes_for: diff.info.as_ref().map(|c| c.batch_to_attest.clone()),
votes: diff.votes,
};
// NOTE: The response should be non-empty only iff we requested a snapshot.
Expand Down
2 changes: 1 addition & 1 deletion node/actors/network/src/gossip/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ async fn test_batch_votes_propagation() {
let first: attester::BatchNumber = rng.gen();
let schedule: Vec<_> = (0..10)
.map(|r| {
Arc::new(attestation::Config {
Arc::new(attestation::Info {
batch_to_attest: attester::Batch {
genesis: setup.genesis.hash(),
number: first + r,
Expand Down

0 comments on commit eb21f6a

Please sign in to comment.