Skip to content

Commit

Permalink
BFT-496: Do not initialise the AttestationStatus
Browse files Browse the repository at this point in the history
  • Loading branch information
aakoshh committed Aug 2, 2024
1 parent 0b424a5 commit f55ad05
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 66 deletions.
45 changes: 24 additions & 21 deletions node/actors/executor/src/attestation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use anyhow::Context;
use std::sync::Arc;
use zksync_concurrency::{ctx, sync, time};
use zksync_consensus_network::gossip::{
AttestationStatus, AttestationStatusReceiver, AttestationStatusWatch, BatchVotesPublisher,
AttestationStatusReceiver, AttestationStatusWatch, BatchVotesPublisher,
};
use zksync_consensus_roles::attester;
use zksync_consensus_storage::{BatchStore, BlockStore};
Expand Down Expand Up @@ -58,9 +58,12 @@ impl AttesterRunner {
self.status.mark_changed();

loop {
let batch_number = sync::changed(ctx, &mut self.status)
let Some(batch_number) = sync::changed(ctx, &mut self.status)
.await?
.next_batch_to_attest;
.next_batch_to_attest
else {
continue;
};

tracing::info!(%batch_number, "attestation status");

Expand Down Expand Up @@ -123,7 +126,10 @@ pub trait AttestationStatusClient: 'static + Send + Sync {
///
/// The genesis hash is returned along with the new batch number to facilitate detecting reorgs
/// on the main node as soon as possible and prevent inconsistent state from entering the system.
async fn attestation_status(&self, ctx: &ctx::Ctx) -> ctx::Result<Option<AttestationStatus>>;
async fn attestation_status(
&self,
ctx: &ctx::Ctx,
) -> ctx::Result<Option<(attester::GenesisHash, attester::BatchNumber)>>;
}

/// Use an [AttestationStatusClient] to periodically poll the main node and update the [AttestationStatusWatch].
Expand All @@ -140,21 +146,19 @@ impl AttestationStatusRunner {
///
/// It polls the [AttestationStatusClient] until it returns a value to initialize the status with.
pub async fn init(
ctx: &ctx::Ctx,
_ctx: &ctx::Ctx,
client: Box<dyn AttestationStatusClient>,
poll_interval: time::Duration,
genesis: attester::GenesisHash,
) -> ctx::Result<(Arc<AttestationStatusWatch>, Self)> {
let status = Arc::new(AttestationStatusWatch::new(
genesis,
attester::BatchNumber::default(),
));
let mut runner = Self {
let status = Arc::new(AttestationStatusWatch::new(genesis));
let runner = Self {
status: status.clone(),
client,
poll_interval,
};
runner.poll_until_some(ctx).await?;
// This would initialise the status to some value, however the EN was rolled out first without the main node API.
// runner.poll_until_some(ctx).await?;
Ok((status, runner))
}

Expand Down Expand Up @@ -197,10 +201,8 @@ impl AttestationStatusRunner {
async fn poll_until_some(&mut self, ctx: &ctx::Ctx) -> ctx::Result<()> {
loop {
match self.client.attestation_status(ctx).await {
Ok(Some(status)) => {
self.status
.update(status.genesis, status.next_batch_to_attest)
.await?;
Ok(Some((genesis, next_batch_to_attest))) => {
self.status.update(genesis, next_batch_to_attest).await?;
return Ok(());
}
Ok(None) => {
Expand Down Expand Up @@ -228,11 +230,12 @@ struct LocalAttestationStatusClient {

#[async_trait::async_trait]
impl AttestationStatusClient for LocalAttestationStatusClient {
async fn attestation_status(&self, ctx: &ctx::Ctx) -> ctx::Result<Option<AttestationStatus>> {
let batch_number = self.batch_store.next_batch_to_attest(ctx).await?;
Ok(batch_number.map(|n| AttestationStatus {
genesis: self.genesis,
next_batch_to_attest: n,
}))
async fn attestation_status(
&self,
ctx: &ctx::Ctx,
) -> ctx::Result<Option<(attester::GenesisHash, attester::BatchNumber)>> {
let next_batch_to_attest = self.batch_store.next_batch_to_attest(ctx).await?;

Ok(next_batch_to_attest.map(|n| (self.genesis, n)))
}
}
29 changes: 11 additions & 18 deletions node/actors/executor/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@ use rand::Rng as _;
use tracing::Instrument as _;
use zksync_concurrency::{sync, testonly::abort_on_panic};
use zksync_consensus_bft as bft;
use zksync_consensus_network::{
gossip::AttestationStatus,
testonly::{new_configs, new_fullnode},
};
use zksync_consensus_network::testonly::{new_configs, new_fullnode};
use zksync_consensus_roles::validator::{testonly::Setup, BlockNumber};
use zksync_consensus_storage::{
testonly::{in_memory, TestMemoryStorage},
Expand All @@ -36,10 +33,7 @@ fn config(cfg: &network::Config) -> Config {
/// The test executors below are not running with attesters, so we just create an [AttestationStatusWatch]
/// that will never be updated.
fn never_attest(genesis: &validator::Genesis) -> Arc<AttestationStatusWatch> {
Arc::new(AttestationStatusWatch::new(
genesis.hash(),
attester::BatchNumber::default(),
))
Arc::new(AttestationStatusWatch::new(genesis.hash()))
}

fn validator(
Expand Down Expand Up @@ -345,7 +339,7 @@ async fn test_attestation_status_runner() {
async fn attestation_status(
&self,
_ctx: &ctx::Ctx,
) -> ctx::Result<Option<AttestationStatus>> {
) -> ctx::Result<Option<(attester::GenesisHash, attester::BatchNumber)>> {
let curr = self
.batch_number
.fetch_add(1u64, std::sync::atomic::Ordering::Relaxed);
Expand All @@ -354,11 +348,9 @@ async fn test_attestation_status_runner() {
Ok(None)
} else {
// The first actual result will be 1 on the 2nd poll.
let status = AttestationStatus {
genesis: *self.genesis.lock().unwrap(),
next_batch_to_attest: attester::BatchNumber(curr),
};
Ok(Some(status))
let genesis = *self.genesis.lock().unwrap();
let next_batch_to_attest = attester::BatchNumber(curr);
Ok(Some((genesis, next_batch_to_attest)))
}
}
}
Expand All @@ -380,17 +372,18 @@ async fn test_attestation_status_runner() {
let mut recv_status = status.subscribe();
recv_status.mark_changed();

// Check that the value has been initialised to a non-default value.
// Check that the value has *not* been initialised to a non-default value.
{
let status = sync::changed(ctx, &mut recv_status).await?;
assert_eq!(status.next_batch_to_attest.0, 1);
assert!(status.next_batch_to_attest.is_none());
}
// Now start polling for new values. Starting in the foreground because we want it to fail in the end.
s.spawn(runner.run(ctx));
// Check that polling sets the value.
{
let status = sync::changed(ctx, &mut recv_status).await?;
assert_eq!(status.next_batch_to_attest.0, 2);
assert!(status.next_batch_to_attest.is_some());
assert_eq!(status.next_batch_to_attest.unwrap().0, 1);
}
// Change the genesis returned by the client. It should cause the scope to fail.
{
Expand All @@ -405,7 +398,7 @@ async fn test_attestation_status_runner() {
Ok(()) => panic!("expected to fail when the genesis changed"),
Err(e) => assert!(
e.to_string().contains("genesis changed"),
"only expect failures due to genesis change"
"only expect failures due to genesis change; got: {e}"
),
}
}
33 changes: 16 additions & 17 deletions node/actors/network/src/gossip/attestation_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ use crate::watch::Watch;
pub struct AttestationStatus {
/// Next batch number where voting is expected.
///
/// The node is expected to poll the main node during initialization until
/// the batch to start from is established.
pub next_batch_to_attest: attester::BatchNumber,
/// The field is optional so that we can start an external node without the main node API
/// already deployed, which is how the initial rollout is.
pub next_batch_to_attest: Option<attester::BatchNumber>,
/// The hash of the genesis of the chain to which the L1 batches belong.
///
/// We don't expect to handle a regenesis on the fly without restarting the
Expand All @@ -37,14 +37,11 @@ impl fmt::Debug for AttestationStatusWatch {
}

impl AttestationStatusWatch {
/// Create a new watch going from a specific batch number.
pub fn new(
genesis: attester::GenesisHash,
next_batch_to_attest: attester::BatchNumber,
) -> Self {
/// Create a new watch with the current genesis, and a yet-to-be-determined batch number.
pub fn new(genesis: attester::GenesisHash) -> Self {
Self(Watch::new(AttestationStatus {
genesis,
next_batch_to_attest,
next_batch_to_attest: None,
}))
}

Expand Down Expand Up @@ -77,18 +74,20 @@ impl AttestationStatusWatch {
// votes below the expected minimum: even if we clear the votes, we might
// not get them again from any peer. By returning an error we can cause
// the node to be restarted and connections re-established for fresh gossip.
anyhow::ensure!(
status.next_batch_to_attest <= next_batch_to_attest,
"next batch to attest moved backwards: {} -> {}",
status.next_batch_to_attest,
next_batch_to_attest
);
if let Some(old_batch_to_attest) = status.next_batch_to_attest {
anyhow::ensure!(
old_batch_to_attest <= next_batch_to_attest,
"next batch to attest moved backwards: {} -> {}",
old_batch_to_attest,
next_batch_to_attest
);
}
}
this.send_if_modified(|status| {
if status.next_batch_to_attest == next_batch_to_attest {
if status.next_batch_to_attest == Some(next_batch_to_attest) {
return false;
}
status.next_batch_to_attest = next_batch_to_attest;
status.next_batch_to_attest = Some(next_batch_to_attest);
true
});
Ok(())
Expand Down
11 changes: 6 additions & 5 deletions node/actors/network/src/gossip/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,14 +173,15 @@ impl Network {

loop {
// Wait until the status indicates that we're ready to sign the next batch.
let next_batch_number = sync::changed(ctx, &mut recv_status)
let Some(batch_number) = sync::changed(ctx, &mut recv_status)
.await?
.next_batch_to_attest;
.next_batch_to_attest
else {
continue;
};

// Get rid of all previous votes. We don't expect this to go backwards without regenesis, which will involve a restart.
self.batch_votes
.set_min_batch_number(next_batch_number)
.await;
self.batch_votes.set_min_batch_number(batch_number).await;

// Now wait until we find the next quorum, whatever it is:
// * on the main node, if attesters are honest, they will vote on the next batch number and the main node will not see gaps
Expand Down
8 changes: 3 additions & 5 deletions node/actors/network/src/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use zksync_concurrency::{
ctx::{self, channel},
io, limiter, net, scope, sync, time,
};
use zksync_consensus_roles::{attester, node, validator};
use zksync_consensus_roles::{node, validator};
use zksync_consensus_storage::{BatchStore, BlockStore};
use zksync_consensus_utils::pipe;

Expand Down Expand Up @@ -204,10 +204,8 @@ impl Instance {
) -> (Self, InstanceRunner) {
// Semantically we'd want this to be created at the same level as the stores,
// but doing so would introduce a lot of extra cruft in setting up tests.
let attestation_status = Arc::new(AttestationStatusWatch::new(
block_store.genesis().hash(),
attester::BatchNumber::default(),
));
let attestation_status =
Arc::new(AttestationStatusWatch::new(block_store.genesis().hash()));

let (actor_pipe, dispatcher_pipe) = pipe::new();
let (net, net_runner) = Network::new(
Expand Down

0 comments on commit f55ad05

Please sign in to comment.