diff --git a/node/actors/executor/src/attestation.rs b/node/actors/executor/src/attestation.rs index db55131f..c949b9bd 100644 --- a/node/actors/executor/src/attestation.rs +++ b/node/actors/executor/src/attestation.rs @@ -5,7 +5,7 @@ use anyhow::Context; use std::sync::Arc; use zksync_concurrency::{ctx, sync, time}; use zksync_consensus_network::gossip::{ - AttestationStatusReceiver, AttestationStatusWatch, BatchVotesPublisher, + AttestationStatus, AttestationStatusReceiver, AttestationStatusWatch, BatchVotesPublisher, }; use zksync_consensus_roles::attester; use zksync_consensus_storage::{BatchStore, BlockStore}; @@ -120,10 +120,10 @@ pub trait AttestationStatusClient: 'static + Send + Sync { /// /// The API might return an error while genesis is being created, which we represent with `None` /// here and mean that we'll have to try again later. - async fn next_batch_to_attest( - &self, - ctx: &ctx::Ctx, - ) -> ctx::Result>; + /// + /// The genesis hash is returned along with the new batch number to facilitate detecting reorgs + /// on the main node as soon as possible and prevent inconsistent state from entering the system. + async fn attestation_status(&self, ctx: &ctx::Ctx) -> ctx::Result>; } /// Use an [AttestationStatusClient] to periodically poll the main node and update the [AttestationStatusWatch]. @@ -143,8 +143,12 @@ impl AttestationStatusRunner { ctx: &ctx::Ctx, client: Box, poll_interval: time::Duration, - ) -> ctx::OrCanceled<(Arc, Self)> { - let status = Arc::new(AttestationStatusWatch::new(attester::BatchNumber(0))); + genesis: attester::GenesisHash, + ) -> ctx::Result<(Arc, Self)> { + let status = Arc::new(AttestationStatusWatch::new( + genesis, + attester::BatchNumber::default(), + )); let mut runner = Self { status: status.clone(), client, @@ -157,25 +161,32 @@ impl AttestationStatusRunner { /// Initialize an [AttestationStatusWatch] based on a [BatchStore] and return it along with the [AttestationStatusRunner]. pub async fn init_from_store( ctx: &ctx::Ctx, - store: Arc, + batch_store: Arc, poll_interval: time::Duration, - ) -> ctx::OrCanceled<(Arc, Self)> { + genesis: attester::GenesisHash, + ) -> ctx::Result<(Arc, Self)> { Self::init( ctx, - Box::new(LocalAttestationStatusClient(store)), + Box::new(LocalAttestationStatusClient { + genesis, + batch_store, + }), poll_interval, + genesis, ) .await } /// Run the poll loop. pub async fn run(mut self, ctx: &ctx::Ctx) -> anyhow::Result<()> { - let _ = self.poll_forever(ctx).await; - Ok(()) + match self.poll_forever(ctx).await { + Ok(()) | Err(ctx::Error::Canceled(_)) => Ok(()), + Err(ctx::Error::Internal(err)) => Err(err), + } } /// Poll the client forever in a loop or until canceled. - async fn poll_forever(&mut self, ctx: &ctx::Ctx) -> ctx::OrCanceled<()> { + async fn poll_forever(&mut self, ctx: &ctx::Ctx) -> ctx::Result<()> { loop { self.poll_until_some(ctx).await?; ctx.sleep(self.poll_interval).await?; @@ -183,11 +194,13 @@ impl AttestationStatusRunner { } /// Poll the client until some data is returned and write it into the status. - async fn poll_until_some(&mut self, ctx: &ctx::Ctx) -> ctx::OrCanceled<()> { + async fn poll_until_some(&mut self, ctx: &ctx::Ctx) -> ctx::Result<()> { loop { - match self.client.next_batch_to_attest(ctx).await { - Ok(Some(next_batch_to_attest)) => { - self.status.update(next_batch_to_attest).await; + match self.client.attestation_status(ctx).await { + Ok(Some(status)) => { + self.status + .update(status.genesis, status.next_batch_to_attest) + .await?; return Ok(()); } Ok(None) => { @@ -206,14 +219,20 @@ impl AttestationStatusRunner { } /// Implement the attestation status for the main node by returning the next to vote on from the [BatchStore]. -struct LocalAttestationStatusClient(Arc); +struct LocalAttestationStatusClient { + /// We don't expect the genesis to change while the main node is running, + /// so we can just cache the genesis hash and return it for every request. + genesis: attester::GenesisHash, + batch_store: Arc, +} #[async_trait::async_trait] impl AttestationStatusClient for LocalAttestationStatusClient { - async fn next_batch_to_attest( - &self, - ctx: &ctx::Ctx, - ) -> ctx::Result> { - self.0.next_batch_to_attest(ctx).await + async fn attestation_status(&self, ctx: &ctx::Ctx) -> ctx::Result> { + let batch_number = self.batch_store.next_batch_to_attest(ctx).await?; + Ok(batch_number.map(|n| AttestationStatus { + genesis: self.genesis, + next_batch_to_attest: n, + })) } } diff --git a/node/actors/executor/src/tests.rs b/node/actors/executor/src/tests.rs index 92a09462..287a2717 100644 --- a/node/actors/executor/src/tests.rs +++ b/node/actors/executor/src/tests.rs @@ -1,5 +1,5 @@ //! High-level tests for `Executor`. -use std::sync::atomic::AtomicU64; +use std::sync::{atomic::AtomicU64, Mutex}; use super::*; use attestation::{AttestationStatusClient, AttestationStatusRunner}; @@ -7,7 +7,10 @@ use rand::Rng as _; use tracing::Instrument as _; use zksync_concurrency::{sync, testonly::abort_on_panic}; use zksync_consensus_bft as bft; -use zksync_consensus_network::testonly::{new_configs, new_fullnode}; +use zksync_consensus_network::{ + gossip::AttestationStatus, + testonly::{new_configs, new_fullnode}, +}; use zksync_consensus_roles::validator::{testonly::Setup, BlockNumber}; use zksync_consensus_storage::{ testonly::{in_memory, TestMemoryStorage}, @@ -32,8 +35,11 @@ fn config(cfg: &network::Config) -> Config { /// The test executors below are not running with attesters, so we just create an [AttestationStatusWatch] /// that will never be updated. -fn never_attest() -> Arc { - Arc::new(AttestationStatusWatch::new(attester::BatchNumber(0))) +fn never_attest(genesis: &validator::Genesis) -> Arc { + Arc::new(AttestationStatusWatch::new( + genesis.hash(), + attester::BatchNumber::default(), + )) } fn validator( @@ -42,6 +48,7 @@ fn validator( batch_store: Arc, replica_store: impl ReplicaStore, ) -> Executor { + let attestation_status = never_attest(block_store.genesis()); Executor { config: config(cfg), block_store, @@ -52,7 +59,7 @@ fn validator( payload_manager: Box::new(bft::testonly::RandomPayload(1000)), }), attester: None, - attestation_status: never_attest(), + attestation_status, } } @@ -61,13 +68,14 @@ fn fullnode( block_store: Arc, batch_store: Arc, ) -> Executor { + let attestation_status = never_attest(block_store.genesis()); Executor { config: config(cfg), block_store, batch_store, validator: None, attester: None, - attestation_status: never_attest(), + attestation_status, } } @@ -322,18 +330,22 @@ async fn test_attestation_status_runner() { abort_on_panic(); let _guard = zksync_concurrency::testonly::set_timeout(time::Duration::seconds(5)); let ctx = &ctx::test_root(&ctx::AffineClock::new(10.0)); + let rng = &mut ctx.rng(); + + let genesis: attester::GenesisHash = rng.gen(); - #[derive(Default)] + #[derive(Clone)] struct MockAttestationStatus { - batch_number: AtomicU64, + genesis: Arc>, + batch_number: Arc, } #[async_trait::async_trait] impl AttestationStatusClient for MockAttestationStatus { - async fn next_batch_to_attest( + async fn attestation_status( &self, _ctx: &ctx::Ctx, - ) -> ctx::Result> { + ) -> ctx::Result> { let curr = self .batch_number .fetch_add(1u64, std::sync::atomic::Ordering::Relaxed); @@ -342,16 +354,25 @@ async fn test_attestation_status_runner() { Ok(None) } else { // The first actual result will be 1 on the 2nd poll. - Ok(Some(attester::BatchNumber(curr))) + let status = AttestationStatus { + genesis: *self.genesis.lock().unwrap(), + next_batch_to_attest: attester::BatchNumber(curr), + }; + Ok(Some(status)) } } } - scope::run!(ctx, |ctx, s| async { + let res = scope::run!(ctx, |ctx, s| async { + let client = MockAttestationStatus { + genesis: Arc::new(Mutex::new(genesis)), + batch_number: Arc::new(AtomicU64::default()), + }; let (status, runner) = AttestationStatusRunner::init( ctx, - Box::new(MockAttestationStatus::default()), + Box::new(client.clone()), time::Duration::milliseconds(100), + genesis, ) .await .unwrap(); @@ -364,15 +385,27 @@ async fn test_attestation_status_runner() { let status = sync::changed(ctx, &mut recv_status).await?; assert_eq!(status.next_batch_to_attest.0, 1); } - // Now start polling for new values. - s.spawn_bg(runner.run(ctx)); + // Now start polling for new values. Starting in the foreground because we want it to fail in the end. + s.spawn(runner.run(ctx)); // Check that polling sets the value. { let status = sync::changed(ctx, &mut recv_status).await?; assert_eq!(status.next_batch_to_attest.0, 2); } + // Change the genesis returned by the client. It should cause the scope to fail. + { + let mut genesis = client.genesis.lock().unwrap(); + *genesis = rng.gen(); + } Ok(()) }) - .await - .unwrap(); + .await; + + match res { + Ok(()) => panic!("expected to fail when the genesis changed"), + Err(e) => assert!( + e.to_string().contains("genesis changed"), + "only expect failures due to genesis change" + ), + } } diff --git a/node/actors/network/src/gossip/attestation_status.rs b/node/actors/network/src/gossip/attestation_status.rs index b4e19f6d..514bb84b 100644 --- a/node/actors/network/src/gossip/attestation_status.rs +++ b/node/actors/network/src/gossip/attestation_status.rs @@ -6,13 +6,20 @@ use zksync_consensus_roles::attester; use crate::watch::Watch; /// Coordinate the attestation by showing the status as seen by the main node. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct AttestationStatus { /// Next batch number where voting is expected. /// /// The node is expected to poll the main node during initialization until /// the batch to start from is established. pub next_batch_to_attest: attester::BatchNumber, + /// The hash of the genesis of the chain to which the L1 batches belong. + /// + /// We don't expect to handle a regenesis on the fly without restarting the + /// node, so this value is not expected to change; it's here only to stop + /// any attempt at updating the status with a batch number that refers + /// to a different fork. + pub genesis: attester::GenesisHash, } /// The subscription over the attestation status which voters can monitor for change. @@ -31,8 +38,12 @@ impl fmt::Debug for AttestationStatusWatch { impl AttestationStatusWatch { /// Create a new watch going from a specific batch number. - pub fn new(next_batch_to_attest: attester::BatchNumber) -> Self { + pub fn new( + genesis: attester::GenesisHash, + next_batch_to_attest: attester::BatchNumber, + ) -> Self { Self(Watch::new(AttestationStatus { + genesis, next_batch_to_attest, })) } @@ -43,8 +54,36 @@ impl AttestationStatusWatch { } /// Set the next batch number to attest on and notify subscribers it changed. - pub async fn update(&self, next_batch_to_attest: attester::BatchNumber) { + /// + /// Fails if the genesis we want to update to is not the same as the watch was started with, + /// because the rest of the system is not expected to be able to handle reorgs without a + /// restart of the node. + pub async fn update( + &self, + genesis: attester::GenesisHash, + next_batch_to_attest: attester::BatchNumber, + ) -> anyhow::Result<()> { let this = self.0.lock().await; + { + let status = this.borrow(); + anyhow::ensure!( + status.genesis == genesis, + "the attestation status genesis changed: {:?} -> {:?}", + status.genesis, + genesis + ); + // The next batch to attest moving backwards could cause the voting process + // to get stuck due to the way gossiping works and the BatchVotes discards + // votes below the expected minimum: even if we clear the votes, we might + // not get them again from any peer. By returning an error we can cause + // the node to be restarted and connections re-established for fresh gossip. + anyhow::ensure!( + status.next_batch_to_attest <= next_batch_to_attest, + "next batch to attest moved backwards: {} -> {}", + status.next_batch_to_attest, + next_batch_to_attest + ); + } this.send_if_modified(|status| { if status.next_batch_to_attest == next_batch_to_attest { return false; @@ -52,5 +91,6 @@ impl AttestationStatusWatch { status.next_batch_to_attest = next_batch_to_attest; true }); + Ok(()) } } diff --git a/node/actors/network/src/gossip/mod.rs b/node/actors/network/src/gossip/mod.rs index a6a4e4e1..90e5a1d5 100644 --- a/node/actors/network/src/gossip/mod.rs +++ b/node/actors/network/src/gossip/mod.rs @@ -12,7 +12,9 @@ //! Static connections constitute a rigid "backbone" of the gossip network, which is insensitive to //! eclipse attack. Dynamic connections are supposed to improve the properties of the gossip //! network graph (minimize its diameter, increase connectedness). -pub use self::attestation_status::{AttestationStatusReceiver, AttestationStatusWatch}; +pub use self::attestation_status::{ + AttestationStatus, AttestationStatusReceiver, AttestationStatusWatch, +}; pub use self::batch_votes::BatchVotesPublisher; use self::batch_votes::BatchVotesWatch; use crate::{gossip::ValidatorAddrsWatch, io, pool::PoolWatch, Config, MeteredStreamStats}; diff --git a/node/actors/network/src/testonly.rs b/node/actors/network/src/testonly.rs index c61b7658..00de2120 100644 --- a/node/actors/network/src/testonly.rs +++ b/node/actors/network/src/testonly.rs @@ -162,6 +162,7 @@ pub fn new_fullnode(rng: &mut impl Rng, peer: &Config) -> Config { pub struct InstanceRunner { net_runner: Runner, attestation_status: Arc, + block_store: Arc, batch_store: Arc, terminate: channel::Receiver<()>, } @@ -172,9 +173,13 @@ impl InstanceRunner { scope::run!(ctx, |ctx, s| async { s.spawn_bg(self.net_runner.run(ctx)); s.spawn_bg(async { + let genesis = self.block_store.genesis().hash(); loop { - if let Ok(Some(n)) = self.batch_store.next_batch_to_attest(ctx).await { - self.attestation_status.update(n).await; + if let Ok(Some(batch_number)) = self.batch_store.next_batch_to_attest(ctx).await + { + self.attestation_status + .update(genesis, batch_number) + .await?; } if ctx.sleep(time::Duration::seconds(1)).await.is_err() { return Ok(()); @@ -199,13 +204,15 @@ impl Instance { ) -> (Self, InstanceRunner) { // Semantically we'd want this to be created at the same level as the stores, // but doing so would introduce a lot of extra cruft in setting up tests. - let attestation_status = - Arc::new(AttestationStatusWatch::new(attester::BatchNumber::default())); + let attestation_status = Arc::new(AttestationStatusWatch::new( + block_store.genesis().hash(), + attester::BatchNumber::default(), + )); let (actor_pipe, dispatcher_pipe) = pipe::new(); let (net, net_runner) = Network::new( cfg, - block_store, + block_store.clone(), batch_store.clone(), actor_pipe, attestation_status.clone(), @@ -220,6 +227,7 @@ impl Instance { InstanceRunner { net_runner, attestation_status, + block_store, batch_store, terminate: terminate_recv, }, diff --git a/node/tools/src/config.rs b/node/tools/src/config.rs index 30eed712..97d49fbf 100644 --- a/node/tools/src/config.rs +++ b/node/tools/src/config.rs @@ -269,6 +269,7 @@ impl Configs { ctx, store.batches.clone(), time::Duration::seconds(1), + self.app.genesis.hash(), ) .await?;