diff --git a/node/actors/executor/src/attestation.rs b/node/actors/executor/src/attestation.rs
index 154a6e8f..6ef7bbb8 100644
--- a/node/actors/executor/src/attestation.rs
+++ b/node/actors/executor/src/attestation.rs
@@ -6,7 +6,7 @@ use anyhow::Context;
 use zksync_concurrency::ctx;
 use zksync_concurrency::time;
 use zksync_consensus_network::gossip::BatchVotesPublisher;
-use zksync_consensus_roles::attester::BatchNumber;
+use zksync_consensus_roles::attester;
 use zksync_consensus_storage::{BatchStore, BlockStore};

 use crate::Attester;
@@ -39,51 +39,73 @@ impl AttesterRunner {
     /// Poll the database for new L1 batches and publish our signature over the batch.
     pub(super) async fn run(self, ctx: &ctx::Ctx) -> ctx::Result<()> {
         let public_key = self.attester.key.public();
-        // The first batch number we want to publish our vote for. We don't have to re-publish a vote
-        // because once it enters the vote register even future peers can pull it from there.
-        let mut min_batch_number = BatchNumber(0);
+        // TODO: In the future, when we have attester rotation, these checks will have to be done inside the loop.
+        let Some(attesters) = self.block_store.genesis().attesters.as_ref() else {
+            tracing::warn!("Attester key is set, but the attester committee is empty.");
+            return Ok(());
+        };
+        if !attesters.contains(&public_key) {
+            tracing::warn!("Attester key is set, but not part of the attester committee.");
+            return Ok(());
+        }
+
+        // Find the initial range of batches that we want to (re)sign after a (re)start.
+        let last_batch_number = self
+            .batch_store
+            .last_batch_number(ctx)
+            .await
+            .context("last_batch_number")?
+            .unwrap_or_default();
+
+        // Determine the batch to start signing from.
+        let earliest_batch_number = self
+            .batch_store
+            .earliest_batch_number_to_sign(ctx)
+            .await
+            .context("earliest_batch_number_to_sign")?
+            .unwrap_or(last_batch_number);
+
+        tracing::info!(%earliest_batch_number, %last_batch_number, "attesting batches");
+
+        let mut batch_number = earliest_batch_number;
+
         loop {
-            // Pretend that the attesters can evolve.
-            let Some(attesters) = self.block_store.genesis().attesters.as_ref() else {
-                continue;
-            };
-            if !attesters.contains(&public_key) {
-                continue;
-            }
+            // Try to get the next batch to sign; the commitment might not be available just yet.
+            let batch = self.wait_for_batch_to_sign(ctx, batch_number).await?;
+
+            // The certificates might be collected out of order because of how gossip works;
+            // we could query the DB to see if we already have a QC, or we can just go ahead
+            // and publish our vote, and let others ignore it.
-            let mut unsigned_batch_numbers = self
+            tracing::info!(%batch_number, "publishing attestation");
+
+            // We only have to publish a vote once; future peers can pull it from the register.
+            self.publisher
+                .publish(attesters, &self.attester.key, batch)
+                .await
+                .context("publish")?;
+
+            batch_number = batch_number.next();
+        }
+    }
+
+    /// Wait for the batch commitment to become available.
+    async fn wait_for_batch_to_sign(
+        &self,
+        ctx: &ctx::Ctx,
+        number: attester::BatchNumber,
+    ) -> ctx::Result<attester::Batch> {
+        loop {
+            if let Some(batch) = self
                 .batch_store
-                .unsigned_batch_numbers(ctx)
+                .batch_to_sign(ctx, number)
                 .await
-                .context("unsigned_batch_numbers")?;
-
-            // Just to be sure we go from smaller to higher batches.
-            unsigned_batch_numbers.sort();
-
-            for bn in unsigned_batch_numbers {
-                // If we have already voted on this we can move on, no need to fetch the payload again.
-                // Batches appear in the store in order, even if we might have QC for a newer and not for an older batch,
-                // so once published our vote for a certain height, we can expect that we only have to vote on newer batches.
-                if bn < min_batch_number {
-                    continue;
-                }
-
-                if let Some(batch) = self
-                    .batch_store
-                    .batch_to_sign(ctx, bn)
-                    .await
-                    .context("batch_to_sign")?
-                {
-                    min_batch_number = batch.number.next();
-
-                    self.publisher
-                        .publish(attesters, &self.attester.key, batch)
-                        .await
-                        .context("publish")?;
-                }
+                .context("batch_to_sign")?
+            {
+                return Ok(batch);
+            } else {
+                ctx.sleep(POLL_INTERVAL).await?;
             }
-
-            ctx.sleep(POLL_INTERVAL).await?;
         }
     }
 }
diff --git a/node/actors/executor/src/lib.rs b/node/actors/executor/src/lib.rs
index 55cb90e7..2044fe2a 100644
--- a/node/actors/executor/src/lib.rs
+++ b/node/actors/executor/src/lib.rs
@@ -49,6 +49,9 @@ pub struct Config {
     pub public_addr: net::Host,
     /// Maximal size of the block payload.
     pub max_payload_size: usize,
+    /// Maximal size of a batch, which includes `max_payload_size` per block in the batch,
+    /// plus the size of the Merkle proof of the commitment being included on L1 (should be ~1kB).
+    pub max_batch_size: usize,
     /// Key of this node. It uniquely identifies the node.
     /// It should match the secret key provided in the `node_key` file.
    pub node_key: node::SecretKey,
@@ -107,6 +110,7 @@ impl Executor {
                 attester_key: self.attester.as_ref().map(|a| a.key.clone()),
                 ping_timeout: Some(time::Duration::seconds(10)),
                 max_block_size: self.config.max_payload_size.saturating_add(kB),
+                max_batch_size: self.config.max_batch_size.saturating_add(kB),
                 max_block_queue_size: 20,
                 tcp_accept_rate: limiter::Rate {
                     burst: 10,
diff --git a/node/actors/executor/src/tests.rs b/node/actors/executor/src/tests.rs
index 2781e613..1c8c9f41 100644
--- a/node/actors/executor/src/tests.rs
+++ b/node/actors/executor/src/tests.rs
@@ -16,6 +16,7 @@ fn config(cfg: &network::Config) -> Config {
         server_addr: *cfg.server_addr,
         public_addr: cfg.public_addr.clone(),
         max_payload_size: usize::MAX,
+        max_batch_size: usize::MAX,
         node_key: cfg.gossip.key.clone(),
         gossip_dynamic_inbound_limit: cfg.gossip.dynamic_inbound_limit,
         gossip_static_inbound: cfg.gossip.static_inbound.clone(),
diff --git a/node/actors/network/src/config.rs b/node/actors/network/src/config.rs
index bd742cd2..2d47cf80 100644
--- a/node/actors/network/src/config.rs
+++ b/node/actors/network/src/config.rs
@@ -103,6 +103,8 @@ pub struct Config {
     pub attester_key: Option<attester::SecretKey>,
     /// Maximal size of the proto-encoded `validator::FinalBlock` in bytes.
     pub max_block_size: usize,
+    /// Maximal size of the proto-encoded `attester::SyncBatch` in bytes.
+    pub max_batch_size: usize,
     /// If a peer doesn't respond to a ping message within `ping_timeout`,
     /// the connection is dropped.
     /// `None` disables sending ping messages (useful for tests).
diff --git a/node/actors/network/src/gossip/runner.rs b/node/actors/network/src/gossip/runner.rs
index 60765c46..a4d3ae61 100644
--- a/node/actors/network/src/gossip/runner.rs
+++ b/node/actors/network/src/gossip/runner.rs
@@ -79,17 +79,19 @@ impl<'a> PushBlockStoreStateServer<'a> {
 /// Represents what we know about the state of available batches on the remote peer.
 struct PushBatchStoreStateServer {
     state: sync::watch::Sender<BatchStoreState>,
+    max_batch_size: usize,
 }

 impl PushBatchStoreStateServer {
     /// Start out not knowing anything about the remote peer.
-    fn new() -> Self {
+    fn new(max_batch_size: usize) -> Self {
         Self {
             state: sync::watch::channel(BatchStoreState {
                 first: BatchNumber(0),
                 last: None,
             })
             .0,
+            max_batch_size,
         }
     }
 }
@@ -113,7 +115,7 @@ impl rpc::Handler for &PushBlockStoreStateServ
 #[async_trait]
 impl rpc::Handler<rpc::push_batch_store_state::Rpc> for &PushBatchStoreStateServer {
     fn max_req_size(&self) -> usize {
-        10 * kB
+        self.max_batch_size.saturating_add(kB)
     }
     async fn handle(
         &self,
@@ -175,7 +177,7 @@ impl Network {
             ctx,
             self.cfg.rpc.push_batch_store_state_rate,
         );
-        let push_batch_store_state_server = PushBatchStoreStateServer::new();
+        let push_batch_store_state_server = PushBatchStoreStateServer::new(self.cfg.max_batch_size);
         scope::run!(ctx, |ctx, s| async {
             let mut service = rpc::Service::new()
                 .add_client(&push_validator_addrs_client)
@@ -354,7 +356,7 @@ impl Network {
             self.cfg.rpc.get_batch_timeout.map(|t| ctx.with_timeout(t));
         let ctx = ctx_with_timeout.as_ref().unwrap_or(ctx);
         let batch = call
-            .call(ctx, &req, self.cfg.max_block_size.saturating_add(kB))
+            .call(ctx, &req, self.cfg.max_batch_size.saturating_add(kB))
             .await?
             .0
             .context("empty response")?;
diff --git a/node/actors/network/src/testonly.rs b/node/actors/network/src/testonly.rs
index 2a8a6497..dfc28dd5 100644
--- a/node/actors/network/src/testonly.rs
+++ b/node/actors/network/src/testonly.rs
@@ -108,6 +108,7 @@ where
                 static_outbound: HashMap::default(),
             },
             max_block_size: usize::MAX,
+            max_batch_size: usize::MAX,
             tcp_accept_rate: limiter::Rate::INF,
             rpc: RpcConfig::default(),
             max_block_queue_size: 10,
@@ -146,6 +147,7 @@ pub fn new_fullnode(rng: &mut impl Rng, peer: &Config) -> Config {
             static_outbound: [(peer.gossip.key.public(), peer.public_addr.clone())].into(),
         },
         max_block_size: usize::MAX,
+        max_batch_size: usize::MAX,
         tcp_accept_rate: limiter::Rate::INF,
         rpc: RpcConfig::default(),
         max_block_queue_size: 10,
diff --git a/node/libs/storage/src/batch_store.rs b/node/libs/storage/src/batch_store.rs
index 128182eb..e6a0bbfb 100644
--- a/node/libs/storage/src/batch_store.rs
+++ b/node/libs/storage/src/batch_store.rs
@@ -56,7 +56,7 @@ pub trait PersistentBatchStore: 'static + fmt::Debug + Send + Sync {
     /// Returns `None` if no batches have been created yet.
     async fn last_batch(&self, ctx: &ctx::Ctx) -> ctx::Result<Option<attester::BatchNumber>>;

-    /// Get the numbers of L1 batches which are missing the corresponding L1 batch quorum certificates
+    /// Get the earliest of the L1 batches which are missing the corresponding L1 batch quorum certificates
     /// and potentially need to be signed by attesters.
     ///
     /// A replica might never have a complete history of L1 batch QCs; once the L1 batch is included on L1,
@@ -65,10 +65,10 @@ pub trait PersistentBatchStore: 'static + fmt::Debug + Send + Sync {
     /// where it's still necessary to gossip votes; for example the main node will want to have a QC on
     /// every batch while it's the one submitting them to L1, while replicas can ask the L1 what is considered
     /// final.
-    async fn unsigned_batch_numbers(
+    async fn earliest_batch_number_to_sign(
         &self,
         ctx: &ctx::Ctx,
-    ) -> ctx::Result<Vec<attester::BatchNumber>>;
+    ) -> ctx::Result<Option<attester::BatchNumber>>;

     /// Get the L1 batch QC from storage with the highest number.
     ///
@@ -115,8 +115,25 @@ pub trait PersistentBatchStore: 'static + fmt::Debug + Send + Sync {
 #[derive(Debug)]
 struct Inner {
     /// Batches that are queued to be persisted.
+    ///
+    /// This reflects the state of the `cache`.
+    /// Its source is mainly the gossip layer (the RPC protocols started in `Network::run_stream`):
+    /// * the node pushes `SyncBatch` records which appear in `queued` to its gossip peers
+    /// * the node pulls `SyncBatch` records that it needs from gossip peers that reported to have them, and adds them to `queued`
+    /// * the `BatchStoreRunner` looks for new items in `queued` and pushes them into the `PersistentBatchStore`
+    ///
+    /// XXX: There doesn't seem to be anything that currently actively pushes into `queued` from outside gossip,
+    /// as happens with `BlockStore::queue_block` being called from BFT.
     queued: BatchStoreState,
     /// Batches that are already persisted.
+    ///
+    /// This reflects the state of the database. Its source is mainly the `PersistentBatchStore`:
+    /// * the `BatchStoreRunner` subscribes to `PersistentBatchStore::persisted()` and copies its contents to here;
+    ///   it also uses the opportunity to clear items from the `cache`, but notably doesn't update `queued`,
+    ///   which would cause the data to be gossiped
+    ///
+    /// Bear in mind that the `BatchStoreState` works with `SyncBatch`, which requires a `proof` of inclusion on L1,
+    /// so this persisted state lags well behind the latest batch physically available in the database:
+    /// the batch also has to be signed by attesters, submitted to L1, and finalised there to appear here.
     persisted: BatchStoreState,
     cache: VecDeque<attester::SyncBatch>,
 }
@@ -258,23 +275,67 @@ impl BatchStore {
         Ok(batch)
     }

-    /// Retrieve the number of all batches that don't have a QC yet and potentially need to be signed.
+    /// Retrieve the maximum persisted batch number.
+    pub async fn last_batch_number(
+        &self,
+        ctx: &ctx::Ctx,
+    ) -> ctx::Result<Option<attester::BatchNumber>> {
+        {
+            // let inner = self.inner.borrow();
+
+            // For now we ignore `queued` here because it's not clear how it's updated,
+            // validation is missing and it seems to depend entirely on gossip. We don't
+            // want it to somehow get us stuck and prevent voting. At least the persisted
+            // cache is maintained by two background processes copying the data from the DB.
+
+            // if let Some(ref batch) = inner.queued.last {
+            //     return Ok(Some(batch.number));
+            // }
+
+            // We also have to ignore `persisted` because `last` is an instance of `SyncBatch`
+            // which is conceptually only available once we have a proof that it's been included
+            // on L1, which requires a signature in the first place.
+
+            // if let Some(ref batch) = inner.persisted.last {
+            //     return Ok(Some(batch.number));
+            // }
+        }
+
+        // Get the last L1 batch that exists in the DB regardless of its status.
+        let batch = self
+            .persistent
+            .last_batch(ctx)
+            .await
+            .context("persistent.last_batch()")?;
+
+        Ok(batch)
+    }
+
+    /// Retrieve the minimum batch number that doesn't have a QC yet and potentially needs to be signed.
+    ///
+    /// There might be unsigned batches before this one in the database; however, we don't consider it
+    /// necessary to sign them any more, because for example they have already been submitted to L1.
     ///
-    /// It returns only the numbers which follow the last finalized batch, that is, there might be batches
-    /// before the earliest in these numbers that isn't signed, but it would be futile to sign them any more.
-    pub async fn unsigned_batch_numbers(
+    /// There might also be signed batches *after* this one, due to the way gossiping works, but we
+    /// might still have to fill the gaps by (re)submitting our signature to allow them to be submitted.
+    ///
+    /// Returns `None` if all existing batches are signed, or there are no batches yet to be signed at all.
+    pub async fn earliest_batch_number_to_sign(
         &self,
         ctx: &ctx::Ctx,
-    ) -> ctx::Result<Vec<attester::BatchNumber>> {
+    ) -> ctx::Result<Option<attester::BatchNumber>> {
         let unsigned = self
             .persistent
-            .unsigned_batch_numbers(ctx)
+            .earliest_batch_number_to_sign(ctx)
             .await
             .context("persistent.get_batch_to_sign()")?;
         Ok(unsigned)
     }

     /// Retrieve a batch to be signed.
+    ///
+    /// This might be `None` even if the L1 batch already exists, because the commitment
+    /// in it is populated asynchronously.
     pub async fn batch_to_sign(
         &self,
         ctx: &ctx::Ctx,
@@ -300,6 +361,9 @@ impl BatchStore {
         batch: attester::SyncBatch,
         _genesis: validator::Genesis,
     ) -> ctx::Result<()> {
+        // XXX: Once we can validate `SyncBatch::proof` we should do it before adding the
+        // batch to the cache, otherwise a malicious peer could serve us data that prevents
+        // other inputs from entering the queue. At the moment it will also cause the batch to be gossiped.
         sync::wait_for(ctx, &mut self.inner.subscribe(), |inner| {
             inner.queued.next() >= batch.number
         })
diff --git a/node/libs/storage/src/testonly/in_memory.rs b/node/libs/storage/src/testonly/in_memory.rs
index 61c9acd4..eb57170d 100644
--- a/node/libs/storage/src/testonly/in_memory.rs
+++ b/node/libs/storage/src/testonly/in_memory.rs
@@ -141,18 +141,17 @@ impl PersistentBatchStore for BatchStore {
         Ok(certs.get(last_batch_number).cloned())
     }

-    async fn unsigned_batch_numbers(
+    async fn earliest_batch_number_to_sign(
         &self,
         _ctx: &ctx::Ctx,
-    ) -> ctx::Result<Vec<attester::BatchNumber>> {
+    ) -> ctx::Result<Option<attester::BatchNumber>> {
         let batches = self.0.batches.lock().unwrap();
         let certs = self.0.certs.lock().unwrap();

         Ok(batches
             .iter()
             .map(|b| b.number)
-            .filter(|n| !certs.contains_key(n))
-            .collect())
+            .find(|n| !certs.contains_key(n)))
     }

     async fn get_batch_to_sign(
diff --git a/node/tools/src/bin/deployer.rs b/node/tools/src/bin/deployer.rs
index a9602812..2b5333b8 100644
--- a/node/tools/src/bin/deployer.rs
+++ b/node/tools/src/bin/deployer.rs
@@ -47,6 +47,7 @@ fn generate_consensus_nodes(nodes: usize, seed_nodes_amount: Option) -> V
             metrics_server_addr: None,
             genesis: setup.genesis.clone(),
             max_payload_size: 1000000,
+            max_batch_size: 100000000,
             validator_key: Some(validator_keys[i].clone()),
             attester_key: Some(attester_keys[i].clone()),
             node_key: node_keys[i].clone(),
diff --git a/node/tools/src/bin/localnet_config.rs b/node/tools/src/bin/localnet_config.rs
index 1d0f2057..ad5b19b9 100644
--- a/node/tools/src/bin/localnet_config.rs
+++ b/node/tools/src/bin/localnet_config.rs
@@ -76,6 +76,7 @@ fn main() -> anyhow::Result<()> {
                 .map(|port| SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), port)),
             genesis: setup.genesis.clone(),
             max_payload_size: 1000000,
+            max_batch_size: 100000000,
             node_key: node_keys[i].clone(),
             validator_key: validator_keys.get(i).cloned(),
             attester_key: attester_keys.get(i).cloned(),
diff --git a/node/tools/src/config.rs b/node/tools/src/config.rs
index 3ed82d7c..11081d9d 100644
--- a/node/tools/src/config.rs
+++ b/node/tools/src/config.rs
@@ -17,7 +17,7 @@ use zksync_consensus_network::http;
 use zksync_consensus_roles::{attester, node, validator};
 use zksync_consensus_storage::testonly::{TestMemoryStorage, TestMemoryStorageRunner};
 use zksync_consensus_utils::debug_page;
-use zksync_protobuf::{read_required, required, ProtoFmt};
+use zksync_protobuf::{kB, read_required, required, ProtoFmt};

 fn read_required_secret_text<T: TextFmt>(text: &Option<String>) -> anyhow::Result<T> {
     Text::new(
@@ -96,6 +96,7 @@ pub struct AppConfig {
     pub genesis: validator::Genesis,
     pub max_payload_size: usize,
+    pub max_batch_size: usize,

     pub validator_key: Option<validator::SecretKey>,
     pub attester_key: Option<attester::SecretKey>,
@@ -126,6 +127,16 @@ impl ProtoFmt for AppConfig {
                 ProtoFmt::read(e).with_context(|| format!("gossip_static_outbound[{i}]"))?;
             gossip_static_outbound.insert(node_addr.key, node_addr.addr);
         }
+
+        let max_payload_size = required(&r.max_payload_size)
+            .and_then(|x| Ok((*x).try_into()?))
+            .context("max_payload_size")?;
+
+        let max_batch_size = match &r.max_batch_size {
+            Some(x) => (*x).try_into().context("max_batch_size")?,
+            None => max_payload_size * 100 + kB, // Merkle proof is ~1kB and we have a batch per minute.
+        };
+
         Ok(Self {
             server_addr: read_required_text(&r.server_addr).context("server_addr")?,
             public_addr: net::Host(required(&r.public_addr).context("public_addr")?.clone()),
@@ -134,9 +145,8 @@ impl ProtoFmt for AppConfig {
                 .context("metrics_server_addr")?,
             genesis: read_required(&r.genesis).context("genesis")?,
-            max_payload_size: required(&r.max_payload_size)
-                .and_then(|x| Ok((*x).try_into()?))
-                .context("max_payload_size")?,
+            max_payload_size,
+            max_batch_size,
             // TODO: read secret.
             validator_key: read_optional_secret_text(&r.validator_secret_key)
                 .context("validator_secret_key")?,
@@ -180,6 +190,7 @@ impl ProtoFmt for AppConfig {
             genesis: Some(self.genesis.build()),
             max_payload_size: Some(self.max_payload_size.try_into().unwrap()),
+            max_batch_size: Some(self.max_batch_size.try_into().unwrap()),

             validator_secret_key: self.validator_key.as_ref().map(TextFmt::encode),
             attester_secret_key: self.attester_key.as_ref().map(TextFmt::encode),
@@ -257,6 +268,7 @@ impl Configs {
             gossip_static_inbound: self.app.gossip_static_inbound.clone(),
             gossip_static_outbound: self.app.gossip_static_outbound.clone(),
             max_payload_size: self.app.max_payload_size,
+            max_batch_size: self.app.max_batch_size,
             rpc: executor::RpcConfig::default(),
             debug_page: self.app.debug_page.as_ref().map(|debug_page_config| {
                 http::DebugPageConfig {
diff --git a/node/tools/src/proto/mod.proto b/node/tools/src/proto/mod.proto
index 0fdc8461..4fd799c5 100644
--- a/node/tools/src/proto/mod.proto
+++ b/node/tools/src/proto/mod.proto
@@ -80,6 +80,9 @@ message AppConfig {
   // Maximal size of the block payload.
   optional uint64 max_payload_size = 5; // required; bytes

+  // Maximal size of the sync batch payload.
+  optional uint64 max_batch_size = 17; // optional; bytes
+
   // Validator secret key.
   optional string validator_secret_key = 10; // optional; ValidatorSecretKey

diff --git a/node/tools/src/tests.rs b/node/tools/src/tests.rs
index c4e0389e..fb323e6f 100644
--- a/node/tools/src/tests.rs
+++ b/node/tools/src/tests.rs
@@ -17,6 +17,7 @@ impl Distribution for EncodeDist {
             genesis: rng.gen(),
             max_payload_size: rng.gen(),
+            max_batch_size: rng.gen(),
             validator_key: self.sample_opt(|| rng.gen()),
             attester_key: self.sample_opt(|| rng.gen()),
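
Note (not part of the diff): the attester changes above reduce to a small polling loop over the new storage APIs. The sketch below mirrors `AttesterRunner::run` and `wait_for_batch_to_sign`, using only the calls introduced or renamed in this diff (`last_batch_number`, `earliest_batch_number_to_sign`, `batch_to_sign`, `BatchVotesPublisher::publish`); the free-standing function `sign_pending_batches` and the literal one-second sleep (standing in for `POLL_INTERVAL`) are illustrative assumptions, not code from the PR.

use anyhow::Context;
use zksync_concurrency::{ctx, time};
use zksync_consensus_network::gossip::BatchVotesPublisher;
use zksync_consensus_roles::attester;
use zksync_consensus_storage::BatchStore;

// Illustrative sketch of the (re)start behaviour: pick the first batch that still
// needs a QC, then keep signing and publishing votes for ever newer batches.
async fn sign_pending_batches(
    ctx: &ctx::Ctx,
    batch_store: &BatchStore,
    publisher: &BatchVotesPublisher,
    attesters: &attester::Committee,
    key: &attester::SecretKey,
) -> ctx::Result<()> {
    // Highest batch present in the DB, regardless of whether it has a QC yet.
    let last = batch_store
        .last_batch_number(ctx)
        .await
        .context("last_batch_number")?
        .unwrap_or_default();
    // First batch still missing a QC; if everything is signed, start from `last`.
    let mut number = batch_store
        .earliest_batch_number_to_sign(ctx)
        .await
        .context("earliest_batch_number_to_sign")?
        .unwrap_or(last);
    loop {
        // The commitment is populated asynchronously, so poll until it shows up.
        let Some(batch) = batch_store.batch_to_sign(ctx, number).await? else {
            ctx.sleep(time::Duration::seconds(1)).await?; // stands in for POLL_INTERVAL
            continue;
        };
        // Re-publishing an existing vote is harmless; peers simply ignore duplicates.
        publisher.publish(attesters, key, batch).await.context("publish")?;
        number = number.next();
    }
}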