Skip to content

Commit

Permalink
applied comments, expanded example
Browse files Browse the repository at this point in the history
  • Loading branch information
pompon0 committed Aug 14, 2024
1 parent dbb9fac commit cd7bcf7
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 43 deletions.
94 changes: 66 additions & 28 deletions node/actors/network/src/gossip/attestation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ mod metrics;
#[cfg(test)]
mod tests;

/// Coordinate the attestation by showing the config as seen by the main node.
/// Configuration of the attestation Controller.
/// It determines what should be attested and by whom.
#[derive(Debug, Clone, PartialEq)]
pub struct Config {
/// Batch to attest.
Expand All @@ -22,8 +23,10 @@ pub struct Config {
#[derive(Clone)]
struct State {
config: Arc<Config>,
/// Votes collected so far.
votes: im::HashMap<attester::PublicKey, Arc<attester::Signed<attester::Batch>>>,
weight: attester::Weight,
// Total weight of the votes collected.
total_weight: attester::Weight,
}

/// Diff between 2 states.
Expand Down Expand Up @@ -94,8 +97,9 @@ impl State {
}
// Verify signature only after checking all the other preconditions.
vote.verify().context("verify")?;
tracing::info!("collected vote with weight {weight} from {:?}", vote.key);
self.votes.insert(vote.key.clone(), vote);
self.weight += weight;
self.total_weight += weight;
Ok(())
}

Expand All @@ -117,8 +121,8 @@ impl State {
Ok(())
}

fn qc(&self) -> Option<attester::BatchQC> {
if self.weight < self.config.committee.threshold() {
fn cert(&self) -> Option<attester::BatchQC> {
if self.total_weight < self.config.committee.threshold() {
return None;
}
let mut sigs = attester::MultiSig::default();
Expand Down Expand Up @@ -167,27 +171,56 @@ impl DiffReceiver {
///
/// Expected usage:
/// ```
/// let ctrl = attestation::Controller::new(Some(key));
/// loop {
/// // Check the global attestation registry.
/// // Compute the next expected batch and the committee that should attest it.
/// ...
/// let config = attestation::Config {
/// batch_to_attest: ...,
/// committee: ...,
/// };
/// ctrl.update_config(Arc::new(config.clone())).unwrap();
/// let ctrl = Arc::new(attestation::Controller::new(Some(key)));
/// // Check what is the number of the next batch to be attested in a
/// // global attestation registry (i.e. L1 chain state).
/// let first : attester::BatchNumber = ...
/// scope::run!(ctx, |ctx,s| async {
/// // Loop starting attestation whenever global attestation state progresses.
/// s.spawn(async {
/// if let Some(qc) = ctrl.wait_for_qc(ctx, config.batch_to_attest.number).await?;
/// // Submit the certificate `qc` to the global registry
/// ...
/// let mut next = first;
/// loop {
/// // Based on the local storage, compute the next expected batch hash
/// // and the committee that should attest it.
/// ...
/// let config = attestation::Config {
/// batch_to_attest: attester::Batch {
/// number: next,
/// ...
/// },
/// committee: ...,
/// };
/// ctrl.start_attestation(Arc::new(config)).unwrap();
/// // Wait for the attestation to progress, by observing the
/// // global attestation registry.
/// next = ...;
/// }
/// });
/// s.spawn(async {
/// // Loop waiting for a certificate to be collected and submitting
/// // it to the global registry
/// loop {
/// let mut next = first;
/// if let Some(qc) = ctrl.wait_for_cert(ctx, next).await?;
/// // Submit the certificate to the global registry.
/// ...
/// next = next.next();
/// }
/// });
/// // Wait for the global registry to include the certificate.
/// ...
///
/// // Make the executor establish the p2p network and
/// // collect the attestation votes.
/// executor::Executor {
/// ...
/// attestation: ctrl.clone(),
/// }.run(ctx).await;
/// }
/// ```
pub struct Controller {
/// Key to automatically vote for batches.
/// None, if the current node is not an attester.
key: Option<attester::SecretKey>,
/// Internal state of the controller.
state: Watch<Option<State>>,
}

Expand Down Expand Up @@ -229,14 +262,14 @@ impl Controller {
let Some(mut state) = locked.borrow().clone() else {
return Ok(());
};
let before = state.weight;
let before = state.total_weight;
let res = state.insert_votes(votes);
if state.weight > before {
if state.total_weight > before {
metrics::METRICS.votes_collected.set(state.votes.len());
#[allow(clippy::float_arithmetic)]
metrics::METRICS
.weight_collected
.set(state.weight as f64 / state.config.committee.total_weight() as f64);
.set(state.total_weight as f64 / state.config.committee.total_weight() as f64);
locked.send_replace(Some(state));
}
res
Expand All @@ -258,7 +291,7 @@ impl Controller {

/// Waits for the certificate for a batch with the given number to be collected.
/// Returns None iff attestation already skipped to collecting certificate for some later batch.
pub async fn wait_for_qc(
pub async fn wait_for_cert(
&self,
ctx: &ctx::Ctx,
n: attester::BatchNumber,
Expand All @@ -276,7 +309,7 @@ impl Controller {
if state.config.batch_to_attest.number > n {
return Ok(None);
}
if let Some(qc) = state.qc() {
if let Some(qc) = state.cert() {
return Ok(Some(qc));
}
}
Expand All @@ -285,7 +318,8 @@ impl Controller {
/// Updates the attestation config.
/// Clears the votes collected for the previous config.
/// Batch number has to increase with each update.
pub async fn update_config(&self, config: Arc<Config>) -> anyhow::Result<()> {
#[tracing::instrument(name = "attestation::Controller::start_attestation", skip_all)]
pub async fn start_attestation(&self, config: Arc<Config>) -> anyhow::Result<()> {
let locked = self.state.lock().await;
let old = locked.borrow().clone();
if let Some(old) = old.as_ref() {
Expand All @@ -301,10 +335,14 @@ impl Controller {
"tried to decrease batch number"
);
}
tracing::info!(
"started collecting votes for batch {:?}",
config.batch_to_attest.number
);
let mut new = State {
config,
votes: im::HashMap::new(),
weight: 0,
total_weight: 0,
};
if let Some(key) = self.key.as_ref() {
if new.config.committee.contains(&key.public()) {
Expand All @@ -323,7 +361,7 @@ impl Controller {
#[allow(clippy::float_arithmetic)]
metrics::METRICS
.weight_collected
.set(new.weight as f64 / new.config.committee.total_weight() as f64);
.set(new.total_weight as f64 / new.config.committee.total_weight() as f64);
locked.send_replace(Some(new));
Ok(())
}
Expand Down
15 changes: 9 additions & 6 deletions node/actors/network/src/gossip/attestation/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ async fn test_insert_votes() {
.into(),
});
let ctrl_votes = || Votes::from(ctrl.votes(&config.batch_to_attest));
ctrl.update_config(config.clone()).await.unwrap();
ctrl.start_attestation(config.clone()).await.unwrap();
assert_eq!(Votes::from([]), ctrl_votes());
let mut recv = ctrl.subscribe();
let diff = recv.wait_for_diff(ctx).await.unwrap();
Expand Down Expand Up @@ -132,7 +132,7 @@ async fn test_insert_votes() {
}

#[tokio::test]
async fn test_wait_for_qc() {
async fn test_wait_for_cert() {
abort_on_panic();
let ctx = &ctx::test_root(&ctx::RealClock);
let rng = &mut ctx.rng();
Expand Down Expand Up @@ -163,7 +163,7 @@ async fn test_wait_for_qc() {
.map(|k| k.sign_msg(config.batch_to_attest.clone()).into())
.collect();
all_votes.shuffle(rng);
ctrl.update_config(config.clone()).await.unwrap();
ctrl.start_attestation(config.clone()).await.unwrap();
loop {
let end = rng.gen_range(0..=committee_size);
tracing::info!("end = {end}");
Expand All @@ -173,7 +173,7 @@ async fn test_wait_for_qc() {
// Waiting for the previous qc should immediately return None.
assert_eq!(
None,
ctrl.wait_for_qc(ctx, config.batch_to_attest.number.prev().unwrap())
ctrl.wait_for_cert(ctx, config.batch_to_attest.number.prev().unwrap())
.await
.unwrap()
);
Expand All @@ -183,15 +183,18 @@ async fn test_wait_for_qc() {
>= config.committee.threshold()
{
let qc = ctrl
.wait_for_qc(ctx, config.batch_to_attest.number)
.wait_for_cert(ctx, config.batch_to_attest.number)
.await
.unwrap()
.unwrap();
assert_eq!(qc.message, config.batch_to_attest);
qc.verify(genesis, &config.committee).unwrap();
break;
}
assert_eq!(None, ctrl.state.subscribe().borrow().as_ref().unwrap().qc());
assert_eq!(
None,
ctrl.state.subscribe().borrow().as_ref().unwrap().cert()
);
}
}
}
4 changes: 2 additions & 2 deletions node/actors/network/src/gossip/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -554,13 +554,13 @@ async fn test_batch_votes_propagation() {
return Ok(());
};
let attestation = node.net.gossip.attestation.clone();
attestation.update_config(cfg.clone()).await.unwrap();
attestation.start_attestation(cfg.clone()).await.unwrap();
// Wait for the certificate in the background.
s.spawn_bg(async {
let r = r;
let attestation = attestation;
let Ok(Some(qc)) = attestation
.wait_for_qc(ctx, cfg.batch_to_attest.number)
.wait_for_cert(ctx, cfg.batch_to_attest.number)
.await
else {
return Ok(());
Expand Down
11 changes: 4 additions & 7 deletions node/libs/roles/src/validator/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,7 @@ use super::{
ReplicaCommit, ReplicaPrepare, SecretKey, Signature, Signed, Signers, View, ViewNumber,
WeightedValidator,
};
use crate::{
attester::{self, BatchNumber, SyncBatch},
validator::LeaderSelectionMode,
};
use crate::{attester, validator::LeaderSelectionMode};
use bit_vec::BitVec;
use rand::{
distributions::{Distribution, Standard},
Expand Down Expand Up @@ -146,12 +143,12 @@ impl Setup {
pub fn push_batch(&mut self, rng: &mut impl Rng) {
let batch_number = match self.0.batches.last() {
Some(b) => b.number.next(),
None => BatchNumber(0),
None => attester::BatchNumber(0),
};
let size: usize = rng.gen_range(500..1000);
let payloads = vec![Payload((0..size).map(|_| rng.gen()).collect())];
let proof = rng.gen::<[u8; 32]>().to_vec();
let batch = SyncBatch {
let batch = attester::SyncBatch {
number: batch_number,
payloads,
proof,
Expand Down Expand Up @@ -205,7 +202,7 @@ pub struct SetupInner {
/// Past blocks.
pub blocks: Vec<FinalBlock>,
/// L1 batches
pub batches: Vec<SyncBatch>,
pub batches: Vec<attester::SyncBatch>,
/// Genesis config.
pub genesis: Genesis,
}
Expand Down

0 comments on commit cd7bcf7

Please sign in to comment.