diff --git a/node/Cargo.lock b/node/Cargo.lock
index 22ab5e9ee..3ea3e4a46 100644
--- a/node/Cargo.lock
+++ b/node/Cargo.lock
@@ -4127,6 +4127,7 @@ name = "zksync_consensus_executor"
 version = "0.1.0-rc.4"
 dependencies = [
  "anyhow",
+ "async-trait",
  "rand 0.8.5",
  "test-casing",
  "tokio",
diff --git a/node/actors/executor/Cargo.toml b/node/actors/executor/Cargo.toml
index dc6b51933..56696efce 100644
--- a/node/actors/executor/Cargo.toml
+++ b/node/actors/executor/Cargo.toml
@@ -20,6 +20,7 @@ zksync_consensus_utils.workspace = true
 zksync_protobuf.workspace = true
 
 anyhow.workspace = true
+async-trait.workspace = true
 rand.workspace = true
 tracing.workspace = true
 vise.workspace = true
diff --git a/node/actors/executor/src/attestation.rs b/node/actors/executor/src/attestation.rs
index 3a5146fc2..80f3de3b8 100644
--- a/node/actors/executor/src/attestation.rs
+++ b/node/actors/executor/src/attestation.rs
@@ -3,8 +3,8 @@
 use crate::Attester;
 use anyhow::Context;
 use std::sync::Arc;
-use zksync_concurrency::{ctx, time};
-use zksync_consensus_network::gossip::BatchVotesPublisher;
+use zksync_concurrency::{ctx, sync, time};
+use zksync_consensus_network::gossip::{AttestationStatusReceiver, BatchVotesPublisher};
 use zksync_consensus_roles::attester;
 use zksync_consensus_storage::{BatchStore, BlockStore};
 
@@ -16,6 +16,7 @@ pub(super) struct AttesterRunner {
     batch_store: Arc<BatchStore>,
     attester: Attester,
     publisher: BatchVotesPublisher,
+    status: AttestationStatusReceiver,
 }
 
 impl AttesterRunner {
@@ -25,16 +26,18 @@ impl AttesterRunner {
         batch_store: Arc<BatchStore>,
         attester: Attester,
         publisher: BatchVotesPublisher,
+        status: AttestationStatusReceiver,
     ) -> Self {
         Self {
             block_store,
             batch_store,
             attester,
             publisher,
+            status,
         }
     }
 
     /// Poll the database for new L1 batches and publish our signature over the batch.
-    pub(super) async fn run(self, ctx: &ctx::Ctx) -> ctx::Result<()> {
+    pub(super) async fn run(mut self, ctx: &ctx::Ctx) -> ctx::Result<()> {
         let public_key = self.attester.key.public();
         // TODO: In the future when we have attester rotation these checks will have to be checked inside the loop.
         let Some(attesters) = self.block_store.genesis().attesters.as_ref() else {
@@ -48,28 +51,26 @@ impl AttesterRunner {
 
         let genesis = self.block_store.genesis().hash();
 
-        // Find the initial range of batches that we want to (re)sign after a (re)start.
-        let last_batch_number = self
-            .batch_store
-            .wait_until_persisted(ctx, attester::BatchNumber(0))
-            .await
-            .context("wait_until_persisted")?
-            .last
-            .unwrap_or_default();
+        let mut prev = None;
 
-        // Determine the batch to start signing from.
-        let earliest_batch_number = self
-            .batch_store
-            .earliest_batch_number_to_sign(ctx)
-            .await
-            .context("earliest_batch_number_to_sign")?
-            .unwrap_or(last_batch_number);
+        loop {
+            let batch_number =
+                sync::wait_for_some(ctx, &mut self.status, |s| match s.next_batch_to_attest {
+                    next if next == prev => None,
+                    next => next,
+                })
+                .await?;
 
-        tracing::info!(%earliest_batch_number, %last_batch_number, "attesting batches");
+            tracing::info!(%batch_number, "attestation status");
 
-        let mut batch_number = earliest_batch_number;
+            // We can avoid actively polling the database in `wait_for_batch_to_sign` by waiting for its existence
+            // to be indicated in memory (which itself relies on polling). This happens once we have the commitment,
+            // which for nodes that get the blocks through BFT should happen after execution. Nodes which
+            // rely on batch sync don't participate in attestations as they need the batch on L1 first.
+            self.batch_store
+                .wait_until_persisted(ctx, batch_number)
+                .await?;
 
-        loop {
             // Try to get the next batch to sign; the commitment might not be available just yet.
             let batch = self.wait_for_batch_to_sign(ctx, batch_number).await?;
@@ -85,15 +86,7 @@ impl AttesterRunner {
                 .await
                 .context("publish")?;
 
-            batch_number = batch_number.next();
-
-            // We can avoid actively polling the database by waiting for the next persisted batch to appear
-            // in the memory (which itself relies on polling). This happens once we have the commitment,
-            // which for nodes that get the blocks through BFT should happen after execution. Nodes which
-            // rely on batch sync don't participate in attestations as they need the batch on L1 first.
-            self.batch_store
-                .wait_until_persisted(ctx, batch_number)
-                .await?;
+            prev = Some(batch_number);
         }
     }
diff --git a/node/actors/executor/src/lib.rs b/node/actors/executor/src/lib.rs
index f0475687d..c094e4297 100644
--- a/node/actors/executor/src/lib.rs
+++ b/node/actors/executor/src/lib.rs
@@ -9,7 +9,7 @@ use std::{
 };
 use zksync_concurrency::{ctx, limiter, net, scope, time};
 use zksync_consensus_bft as bft;
-use zksync_consensus_network as network;
+use zksync_consensus_network::{self as network, gossip::AttestationStatusClient};
 use zksync_consensus_roles::{attester, node, validator};
 use zksync_consensus_storage::{BatchStore, BlockStore, ReplicaStore};
 use zksync_consensus_utils::pipe;
@@ -97,6 +97,8 @@ pub struct Executor {
     pub validator: Option<Validator>,
     /// Validator-specific node data.
     pub attester: Option<Attester>,
+    /// Client to use to poll attestation status: either through the main node API or the DB.
+    pub attestation_status_client: Box<dyn AttestationStatusClient>,
 }
 
 impl Executor {
@@ -138,6 +140,7 @@ impl Executor {
                 self.block_store.clone(),
                 self.batch_store.clone(),
                 network_actor_pipe,
+                self.attestation_status_client,
             );
             net.register_metrics();
             s.spawn(async { runner.run(ctx).await.context("Network stopped") });
@@ -149,6 +152,7 @@ impl Executor {
                     self.batch_store.clone(),
                     attester,
                     net.batch_vote_publisher(),
+                    net.attestation_status_receiver(),
                 );
                 s.spawn(async {
                     runner.run(ctx).await?;
diff --git a/node/actors/executor/src/tests.rs b/node/actors/executor/src/tests.rs
index 1c8c9f412..22ef5a840 100644
--- a/node/actors/executor/src/tests.rs
+++ b/node/actors/executor/src/tests.rs
@@ -4,7 +4,10 @@ use rand::Rng as _;
 use tracing::Instrument as _;
 use zksync_concurrency::testonly::abort_on_panic;
 use zksync_consensus_bft as bft;
-use zksync_consensus_network::testonly::{new_configs, new_fullnode};
+use zksync_consensus_network::{
+    gossip::LocalAttestationStatus,
+    testonly::{new_configs, new_fullnode},
+};
 use zksync_consensus_roles::validator::{testonly::Setup, BlockNumber};
 use zksync_consensus_storage::{
     testonly::{in_memory, TestMemoryStorage},
@@ -26,12 +29,21 @@ fn config(cfg: &network::Config) -> Config {
     }
 }
 
+/// The test executors below are not running with attesters, so it doesn't matter whether the clients
+/// return views based on the main node's store or each on their own. For simplicity this
+/// returns an implementation that queries the local store of each instance. Alternatively we
+/// could implement an instance that never queries anything.
+fn mk_attestation_status_client(batch_store: &Arc<BatchStore>) -> impl AttestationStatusClient {
+    LocalAttestationStatus::new(batch_store.clone())
+}
+
 fn validator(
     cfg: &network::Config,
     block_store: Arc<BlockStore>,
     batch_store: Arc<BatchStore>,
     replica_store: impl ReplicaStore,
 ) -> Executor {
+    let attestation_status_client = Box::new(mk_attestation_status_client(&batch_store));
     Executor {
         config: config(cfg),
         block_store,
@@ -42,6 +54,7 @@ fn validator(
             payload_manager: Box::new(bft::testonly::RandomPayload(1000)),
         }),
         attester: None,
+        attestation_status_client,
     }
 }
@@ -50,12 +63,14 @@ fn fullnode(
     cfg: &network::Config,
     block_store: Arc<BlockStore>,
     batch_store: Arc<BatchStore>,
 ) -> Executor {
+    let attestation_status_client = Box::new(mk_attestation_status_client(&batch_store));
     Executor {
         config: config(cfg),
         block_store,
         batch_store,
         validator: None,
         attester: None,
+        attestation_status_client,
     }
 }
diff --git a/node/actors/network/src/gossip/attestation_status.rs b/node/actors/network/src/gossip/attestation_status.rs
new file mode 100644
index 000000000..1df1c4375
--- /dev/null
+++ b/node/actors/network/src/gossip/attestation_status.rs
@@ -0,0 +1,82 @@
+use std::sync::Arc;
+
+use zksync_concurrency::{ctx, sync};
+use zksync_consensus_roles::attester;
+use zksync_consensus_storage::BatchStore;
+
+use crate::watch::Watch;
+
+/// An interface which is used by attesters and nodes collecting votes over gossip to determine
+/// which is the next batch they are all supposed to be voting on, according to the main node.
+#[async_trait::async_trait]
+pub trait AttestationStatusClient: 'static + std::fmt::Debug + Send + Sync {
+    /// Get the next batch number for which the main node expects a batch QC to be formed.
+    ///
+    /// The API might return an error while genesis is being created, which we represent with `None`
+    /// here, meaning that we'll have to try again later.
+    async fn next_batch_to_attest(
+        &self,
+        ctx: &ctx::Ctx,
+    ) -> ctx::Result<Option<attester::BatchNumber>>;
+}
+
+/// Implement the attestation status for the main node by returning the next batch to vote on from the [BatchStore].
+#[derive(Debug, Clone)]
+pub struct LocalAttestationStatus(Arc<BatchStore>);
+
+impl LocalAttestationStatus {
+    /// Create a local attestation client from a [BatchStore].
+    pub fn new(store: Arc<BatchStore>) -> Self {
+        Self(store)
+    }
+}
+
+#[async_trait::async_trait]
+impl AttestationStatusClient for LocalAttestationStatus {
+    async fn next_batch_to_attest(
+        &self,
+        ctx: &ctx::Ctx,
+    ) -> ctx::Result<Option<attester::BatchNumber>> {
+        self.0.next_batch_to_attest(ctx).await
+    }
+}
+
+/// Coordinate the attestation by showing the status as seen by the main node.
+pub struct AttestationStatus {
+    /// Next batch number where voting is expected.
+    ///
+    /// Its value is `None` until the background process polling the main node
+    /// can establish a value to start from.
+    pub next_batch_to_attest: Option<attester::BatchNumber>,
+}
+
+/// The subscription over the attestation status which votes can monitor for change.
+pub type AttestationStatusReceiver = sync::watch::Receiver<AttestationStatus>;
+
+pub(crate) struct AttestationStatusWatch(Watch<AttestationStatus>);
+
+impl Default for AttestationStatusWatch {
+    fn default() -> Self {
+        Self(Watch::new(AttestationStatus {
+            next_batch_to_attest: None,
+        }))
+    }
+}
+
+impl AttestationStatusWatch {
+    /// Subscribes to AttestationStatus updates.
+    pub(crate) fn subscribe(&self) -> AttestationStatusReceiver {
+        self.0.subscribe()
+    }
+
+    pub(crate) async fn update(&self, next_batch_to_attest: attester::BatchNumber) {
+        let this = self.0.lock().await;
+        this.send_if_modified(|status| match status.next_batch_to_attest {
+            Some(n) if n == next_batch_to_attest => false,
+            _ => {
+                status.next_batch_to_attest = Some(next_batch_to_attest);
+                true
+            }
+        });
+    }
+}
diff --git a/node/actors/network/src/gossip/mod.rs b/node/actors/network/src/gossip/mod.rs
index 8b0bef57f..a28a7586b 100644
--- a/node/actors/network/src/gossip/mod.rs
+++ b/node/actors/network/src/gossip/mod.rs
@@ -12,16 +12,22 @@
 //! Static connections constitute a rigid "backbone" of the gossip network, which is insensitive to
 //! eclipse attack. Dynamic connections are supposed to improve the properties of the gossip
 //! network graph (minimize its diameter, increase connectedness).
+pub use self::attestation_status::{
+    AttestationStatusClient, AttestationStatusReceiver, LocalAttestationStatus,
+};
 pub use self::batch_votes::BatchVotesPublisher;
 use self::batch_votes::BatchVotesWatch;
 use crate::{gossip::ValidatorAddrsWatch, io, pool::PoolWatch, Config, MeteredStreamStats};
+use attestation_status::AttestationStatusWatch;
 use fetch::RequestItem;
 use std::sync::{atomic::AtomicUsize, Arc};
 pub(crate) use validator_addrs::*;
+use zksync_concurrency::time::Duration;
 use zksync_concurrency::{ctx, ctx::channel, error::Wrap as _, scope, sync};
 use zksync_consensus_roles::{node, validator};
 use zksync_consensus_storage::{BatchStore, BlockStore};
 
+mod attestation_status;
 mod batch_votes;
 mod fetch;
 mod handshake;
@@ -58,6 +64,10 @@ pub(crate) struct Network {
     pub(crate) fetch_queue: fetch::Queue,
     /// TESTONLY: how many time push_validator_addrs rpc was called by the peers.
     pub(crate) push_validator_addrs_calls: AtomicUsize,
+    /// Shared watch over the current attestation status as indicated by the main node.
+    pub(crate) attestation_status: Arc<AttestationStatusWatch>,
+    /// Client to use to check the current attestation status on the main node.
+    pub(crate) attestation_status_client: Box<dyn AttestationStatusClient>,
 }
 
 impl Network {
@@ -67,6 +77,7 @@ impl Network {
         block_store: Arc<BlockStore>,
         batch_store: Arc<BatchStore>,
         sender: channel::UnboundedSender<io::OutputMessage>,
+        attestation_status_client: Box<dyn AttestationStatusClient>,
     ) -> Arc<Self> {
         Arc::new(Self {
             sender,
@@ -82,6 +93,8 @@ impl Network {
             block_store,
             batch_store,
             push_validator_addrs_calls: 0.into(),
+            attestation_status: Arc::new(AttestationStatusWatch::default()),
+            attestation_status_client,
         })
     }
 
@@ -148,34 +161,85 @@ impl Network {
         .await;
     }
 
-    /// Task that keeps hearing about new votes and looks for an L1 batch qc.
-    /// It will propagate the QC if there's enough votes.
+    /// Task that reacts to new votes being added and looks for an L1 batch QC.
+    /// It persists the certificate once the quorum threshold is passed.
     pub(crate) async fn run_batch_qc_finder(&self, ctx: &ctx::Ctx) -> ctx::Result<()> {
         let Some(attesters) = self.genesis().attesters.as_ref() else {
+            tracing::info!("no attesters in genesis, not looking for batch QCs");
             return Ok(());
         };
         let genesis = self.genesis().hash();
-        let mut sub = self.batch_votes.subscribe();
+
+        let mut recv_votes = self.batch_votes.subscribe();
+        let mut recv_status = self.attestation_status.subscribe();
+
+        let mut prev_batch_number = None;
+
+        'status: loop {
+            // Wait until the status indicates that we're ready to sign the next batch.
+            // This is not strictly necessary but avoids repeatedly finding the same quorum, or having to skip it until it changes.
+            let next_batch_number =
+                sync::wait_for_some(ctx, &mut recv_status, |s| match s.next_batch_to_attest {
+                    next if next == prev_batch_number => None,
+                    next => next,
+                })
+                .await?;
+
+            // Next time we'll look for something new.
+            prev_batch_number = Some(next_batch_number);
+
+            // Get rid of all previous votes. We don't expect this to go backwards without regenesis, which will involve a restart.
+            self.batch_votes
+                .set_min_batch_number(next_batch_number)
+                .await;
+
+            // Now wait until we find the next quorum, whatever it is:
+            // * on the main node, if attesters are honest, they will vote on the next batch number and the main node will not see gaps
+            // * on external nodes the votes might be affected by changes in the value returned by the API, and there might be gaps
+            loop {
+                let quorum_opt = {
+                    let votes = sync::changed(ctx, &mut recv_votes).await?;
+                    votes.find_quorum(attesters, &genesis)
+                };
+
+                if let Some(qc) = quorum_opt {
+                    self.batch_store
+                        .persist_batch_qc(ctx, qc)
+                        .await
+                        .wrap("persist_batch_qc")?;
+
+                    // Carry on to wait for the next status update.
+                    continue 'status;
+                }
+            }
+        }
+    }
+
+    /// Poll the attestation status and update the watch.
+    pub(crate) async fn run_attestation_client(&self, ctx: &ctx::Ctx) -> ctx::Result<()> {
+        if self.genesis().attesters.is_none() {
+            tracing::info!("no attesters in genesis, not polling the attestation status");
+            return Ok(());
+        };
+
+        const POLL_INTERVAL: Duration = Duration::seconds(5);
+
         loop {
-            let quorum_opt = {
-                let votes = sync::changed(ctx, &mut sub).await?;
-                votes.find_quorum(attesters, &genesis)
-            };
-
-            if let Some(qc) = quorum_opt {
-                // In the future this should come from confirmations, but for now it's best effort, so we can forget ASAP.
-                // TODO: An initial value could be looked up in the database even now.
-                let next_batch_number = qc.message.number.next();
-
-                self.batch_store
-                    .persist_batch_qc(ctx, qc)
-                    .await
-                    .wrap("persist_batch_qc")?;
-
-                self.batch_votes
-                    .set_min_batch_number(next_batch_number)
-                    .await;
+            match self
+                .attestation_status_client
+                .next_batch_to_attest(ctx)
+                .await
+            {
+                Ok(Some(batch_number)) => {
+                    self.attestation_status.update(batch_number).await;
+                }
+                Ok(None) => tracing::debug!("waiting for attestation status..."),
+                Err(error) => tracing::error!(
+                    ?error,
+                    "failed to poll attestation status, retrying later..."
+                ),
             }
+            ctx.sleep(POLL_INTERVAL).await?;
         }
     }
 }
diff --git a/node/actors/network/src/lib.rs b/node/actors/network/src/lib.rs
index 1409b0873..6bc868f18 100644
--- a/node/actors/network/src/lib.rs
+++ b/node/actors/network/src/lib.rs
@@ -1,6 +1,6 @@
 //! Network actor maintaining a pool of outbound and inbound connections to other nodes.
 use anyhow::Context as _;
-use gossip::BatchVotesPublisher;
+use gossip::{AttestationStatusClient, AttestationStatusReceiver, BatchVotesPublisher};
 use std::sync::Arc;
 use tracing::Instrument as _;
 use zksync_concurrency::{
@@ -57,8 +57,15 @@ impl Network {
         block_store: Arc<BlockStore>,
         batch_store: Arc<BatchStore>,
         pipe: ActorPipe<io::InputMessage, io::OutputMessage>,
+        attestation_status_client: Box<dyn AttestationStatusClient>,
     ) -> (Arc<Self>, Runner) {
-        let gossip = gossip::Network::new(cfg, block_store, batch_store, pipe.send);
+        let gossip = gossip::Network::new(
+            cfg,
+            block_store,
+            batch_store,
+            pipe.send,
+            attestation_status_client,
+        );
         let consensus = consensus::Network::new(gossip.clone());
         let net = Arc::new(Self { gossip, consensus });
         (
@@ -80,6 +87,11 @@ impl Network {
         BatchVotesPublisher(self.gossip.batch_votes.clone())
     }
 
+    /// Subscribe to attestation status change notifications.
+    pub fn attestation_status_receiver(&self) -> AttestationStatusReceiver {
+        self.gossip.attestation_status.subscribe()
+    }
+
     /// Handles a dispatcher message.
     async fn handle_message(
         &self,
@@ -134,6 +146,9 @@ impl Runner {
             // Update QC batches in the background.
             s.spawn(self.net.gossip.run_batch_qc_finder(ctx));
 
+            // Update attestation status in the background.
+            s.spawn(self.net.gossip.run_attestation_client(ctx));
+
             // Fetch missing batches in the background.
             s.spawn(async {
                 self.net.gossip.run_batch_fetcher(ctx).await;
diff --git a/node/actors/network/src/testonly.rs b/node/actors/network/src/testonly.rs
index dfc28dd5f..382a24f02 100644
--- a/node/actors/network/src/testonly.rs
+++ b/node/actors/network/src/testonly.rs
@@ -1,6 +1,7 @@
 //! Testonly utilities.
 #![allow(dead_code)]
 use crate::{
+    gossip::LocalAttestationStatus,
     io::{ConsensusInputMessage, Target},
     Config, GossipConfig, Network, RpcConfig, Runner,
 };
@@ -181,8 +182,17 @@ impl Instance {
         block_store: Arc<BlockStore>,
         batch_store: Arc<BatchStore>,
     ) -> (Self, InstanceRunner) {
+        // For now let everyone cast their votes on batches based on their own database.
+        let attestation_status_client = Box::new(LocalAttestationStatus::new(batch_store.clone()));
+
         let (actor_pipe, dispatcher_pipe) = pipe::new();
-        let (net, runner) = Network::new(cfg, block_store, batch_store, actor_pipe);
+        let (net, runner) = Network::new(
+            cfg,
+            block_store,
+            batch_store,
+            actor_pipe,
+            attestation_status_client,
+        );
         let (terminate_send, terminate_recv) = channel::bounded(1);
         (
             Self {
diff --git a/node/libs/storage/src/batch_store/metrics.rs b/node/libs/storage/src/batch_store/metrics.rs
index cabe7e34a..deddc3514 100644
--- a/node/libs/storage/src/batch_store/metrics.rs
+++ b/node/libs/storage/src/batch_store/metrics.rs
@@ -7,9 +7,9 @@ pub(super) struct PersistentBatchStore {
     /// Latency of a successful `get_batch()` call.
     #[metrics(unit = vise::Unit::Seconds, buckets = vise::Buckets::LATENCIES)]
     pub(super) batch_latency: vise::Histogram<Duration>,
-    /// Latency of a successful `earliest_batch_number_to_sign()` call.
+    /// Latency of a successful `next_batch_to_attest()` call.
     #[metrics(unit = vise::Unit::Seconds, buckets = vise::Buckets::LATENCIES)]
-    pub(super) earliest_batch_latency: vise::Histogram<Duration>,
+    pub(super) next_batch_to_attest_latency: vise::Histogram<Duration>,
     /// Latency of a successful `get_batch_to_sign()` call.
     #[metrics(unit = vise::Unit::Seconds, buckets = vise::Buckets::LATENCIES)]
     pub(super) batch_to_sign_latency: vise::Histogram<Duration>,
diff --git a/node/libs/storage/src/batch_store/mod.rs b/node/libs/storage/src/batch_store/mod.rs
index 3b6b6fe98..889014f10 100644
--- a/node/libs/storage/src/batch_store/mod.rs
+++ b/node/libs/storage/src/batch_store/mod.rs
@@ -53,16 +53,12 @@ pub trait PersistentBatchStore: 'static + fmt::Debug + Send + Sync {
     /// Range of batches persisted in storage.
     fn persisted(&self) -> sync::watch::Receiver<BatchStoreState>;
 
-    /// Get the earliest of L1 batches which are missing the corresponding L1 batch quorum certificates
-    /// and potentially need to be signed by attesters.
+    /// Get the next L1 batch for which attesters are expected to produce a quorum certificate.
     ///
-    /// A replica might never have a complete history of L1 batch QCs; once the L1 batch is included on L1,
-    /// the replicas might use the [attester::SyncBatch] route to obtain them, in which case they will not
-    /// have a QC and no reason to get them either. The store will have sufficient information to decide
-    /// where it's still necessary to gossip votes; for example the main node will want to have a QC on
-    /// every batch while it's the one submitting them to L1, while replicas can ask the L1 what is considered
-    /// final.
-    async fn earliest_batch_number_to_sign(
+    /// An external node might never have a complete history of L1 batch QCs. Once the L1 batch is included on L1,
+    /// the external nodes might use the [attester::SyncBatch] route to obtain them, in which case they will not
+    /// have a QC and no reason to get them either. The main node, however, will want to have a QC for all batches.
+    async fn next_batch_to_attest(
         &self,
         ctx: &ctx::Ctx,
     ) -> ctx::Result<Option<attester::BatchNumber>>;
@@ -279,28 +275,20 @@ impl BatchStore {
         Ok(batch)
     }
 
-    /// Retrieve the minimum batch number that doesn't have a QC yet and potentially need to be signed.
-    ///
-    /// There might be unsigned batches before this one in the database, however we don't consider it
-    /// necessary to sign them any more, because for example they have already been submitted to L1.
-    ///
-    /// There might also be signed batches *after* this one, due to the way gossiping works, but we
-    /// might still have to fill the gaps by (re)submitting our signature to allow them to be submitted.
-    ///
-    /// Returns `None` if all existing batches are signed, or there are not batches yet to be signed at all.
-    pub async fn earliest_batch_number_to_sign(
+    /// Retrieve the next batch number that doesn't have a QC yet and will need to be signed.
+    pub async fn next_batch_to_attest(
         &self,
         ctx: &ctx::Ctx,
     ) -> ctx::Result<Option<attester::BatchNumber>> {
         let t = metrics::PERSISTENT_BATCH_STORE
-            .earliest_batch_latency
+            .next_batch_to_attest_latency
             .start();
         let batch = self
             .persistent
-            .earliest_batch_number_to_sign(ctx)
+            .next_batch_to_attest(ctx)
             .await
-            .wrap("persistent.get_batch_to_sign()")?;
+            .wrap("persistent.next_batch_to_attest()")?;
 
         t.observe();
         Ok(batch)
diff --git a/node/libs/storage/src/testonly/in_memory.rs b/node/libs/storage/src/testonly/in_memory.rs
index 084da6606..79a212078 100644
--- a/node/libs/storage/src/testonly/in_memory.rs
+++ b/node/libs/storage/src/testonly/in_memory.rs
@@ -139,7 +139,7 @@ impl PersistentBatchStore for BatchStore {
         Ok(certs.get(last_batch_number).cloned())
     }
 
-    async fn earliest_batch_number_to_sign(
+    async fn next_batch_to_attest(
         &self,
         _ctx: &ctx::Ctx,
     ) -> ctx::Result<Option<attester::BatchNumber>> {
diff --git a/node/tools/src/config.rs b/node/tools/src/config.rs
index 221baf049..fd56a4210 100644
--- a/node/tools/src/config.rs
+++ b/node/tools/src/config.rs
@@ -13,7 +13,7 @@ use zksync_concurrency::{ctx, net};
 use zksync_consensus_bft as bft;
 use zksync_consensus_crypto::{read_optional_text, read_required_text, Text, TextFmt};
 use zksync_consensus_executor as executor;
-use zksync_consensus_network::http;
+use zksync_consensus_network::{gossip::LocalAttestationStatus, http};
 use zksync_consensus_roles::{attester, node, validator};
 use zksync_consensus_storage::testonly::{TestMemoryStorage, TestMemoryStorageRunner};
 use zksync_consensus_utils::debug_page;
@@ -262,6 +262,11 @@ impl Configs {
     ) -> ctx::Result<(executor::Executor, TestMemoryStorageRunner)> {
         let replica_store = store::RocksDB::open(self.app.genesis.clone(), &self.database).await?;
         let store = TestMemoryStorage::new(ctx, &self.app.genesis).await;
+
+        // We don't have an implementation of an API in these servers, so we can only use the stores.
+        let attestation_status_client =
+            Box::new(LocalAttestationStatus::new(store.batches.clone()));
+
         let e = executor::Executor {
             config: executor::Config {
                 server_addr: self.app.server_addr,
@@ -302,6 +307,7 @@ impl Configs {
                 .attester_key
                 .as_ref()
                 .map(|key| executor::Attester { key: key.clone() }),
+            attestation_status_client,
        };
         Ok((e, store.runner))
     }
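
Note: the `AttestationStatusClient` trait introduced above is the integration point for environments where the next batch to attest comes from somewhere other than the local `BatchStore` (the field doc mentions polling the main node API). Below is a minimal, hypothetical sketch of such an implementation; the `SharedAttestationStatus` type and its `set` method are illustrative only and not part of this change, while the trait, `ctx`, and `attester::BatchNumber` come from the crates touched in the diff.

use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc,
};

use zksync_concurrency::ctx;
use zksync_consensus_network::gossip::AttestationStatusClient;
use zksync_consensus_roles::attester;

/// Illustrative client: shares the next batch number between an updater task
/// (e.g. something polling the main node API) and the network actor.
/// The value is stored shifted by one so that 0 can represent "not known yet".
#[derive(Debug, Clone, Default)]
pub struct SharedAttestationStatus(Arc<AtomicU64>);

impl SharedAttestationStatus {
    /// Called by whatever task learns which batch the main node expects next.
    pub fn set(&self, next: attester::BatchNumber) {
        self.0.store(next.0 + 1, Ordering::SeqCst);
    }
}

#[async_trait::async_trait]
impl AttestationStatusClient for SharedAttestationStatus {
    async fn next_batch_to_attest(
        &self,
        _ctx: &ctx::Ctx,
    ) -> ctx::Result<Option<attester::BatchNumber>> {
        // Mirror the trait contract: `None` means "try again later".
        Ok(match self.0.load(Ordering::SeqCst) {
            0 => None,
            n => Some(attester::BatchNumber(n - 1)),
        })
    }
}

Such a client would be wired in the same way as `LocalAttestationStatus` above, e.g. `attestation_status_client: Box::new(status.clone())` when constructing `executor::Executor`, with the task calling `set` living outside the executor.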