From 627f28c8ded97581626977f088f0ff8e4d496ac9 Mon Sep 17 00:00:00 2001 From: Akosh Farkash Date: Mon, 8 Jul 2024 20:01:29 +0100 Subject: [PATCH 1/8] BFT-474: Refactor attestation to use earliest batch number to sign --- node/actors/executor/src/attestation.rs | 82 +++++++++++++-------- node/libs/storage/src/batch_store.rs | 62 +++++++++++++--- node/libs/storage/src/testonly/in_memory.rs | 7 +- 3 files changed, 107 insertions(+), 44 deletions(-) diff --git a/node/actors/executor/src/attestation.rs b/node/actors/executor/src/attestation.rs index 154a6e8f..e4413fbe 100644 --- a/node/actors/executor/src/attestation.rs +++ b/node/actors/executor/src/attestation.rs @@ -6,7 +6,6 @@ use anyhow::Context; use zksync_concurrency::ctx; use zksync_concurrency::time; use zksync_consensus_network::gossip::BatchVotesPublisher; -use zksync_consensus_roles::attester::BatchNumber; use zksync_consensus_storage::{BatchStore, BlockStore}; use crate::Attester; @@ -39,51 +38,72 @@ impl AttesterRunner { /// Poll the database for new L1 batches and publish our signature over the batch. pub(super) async fn run(self, ctx: &ctx::Ctx) -> ctx::Result<()> { let public_key = self.attester.key.public(); - // The first batch number we want to publish our vote for. We don't have to re-publish a vote - // because once it enters the vote register even future peers can pull it from there. - let mut min_batch_number = BatchNumber(0); - loop { - // Pretend that the attesters can evolve. - let Some(attesters) = self.block_store.genesis().attesters.as_ref() else { - continue; - }; - if !attesters.contains(&public_key) { - continue; - } - - let mut unsigned_batch_numbers = self - .batch_store - .unsigned_batch_numbers(ctx) - .await - .context("unsigned_batch_numbers")?; + // TODO: In the future when we have attester rotation these checks will have to be checked inside the loop. + let Some(attesters) = self.block_store.genesis().attesters.as_ref() else { + tracing::warn!("Attester key is set, but the attester committee is empty."); + return Ok(()); + }; + if !attesters.contains(&public_key) { + tracing::warn!("Attester key is set, but not part of the attester committee."); + return Ok(()); + } - // Just to be sure we go from smaller to higher batches. - unsigned_batch_numbers.sort(); + // Find the initial range of batches that we want to (re)sign after a (re)start. + let mut latest_batch_number = self + .batch_store + .latest_batch_number(ctx) + .await + .context("latest_batch_number")? + .unwrap_or_default(); - for bn in unsigned_batch_numbers { - // If we have already voted on this we can move on, no need to fetch the payload again. - // Batches appear in the store in order, even if we might have QC for a newer and not for an older batch, - // so once published our vote for a certain height, we can expect that we only have to vote on newer batches. - if bn < min_batch_number { - continue; - } + let mut earliest_batch_number = self + .batch_store + .earliest_batch_number_to_sign(ctx) + .await + .context("earliest_batch_number_to_sign")? + .unwrap_or(latest_batch_number); - if let Some(batch) = self + loop { + while earliest_batch_number <= latest_batch_number { + // Try to get the next batch to sign; the commitment might not be available just yet. + let Some(batch) = self .batch_store - .batch_to_sign(ctx, bn) + .batch_to_sign(ctx, earliest_batch_number) .await .context("batch_to_sign")? - { - min_batch_number = batch.number.next(); + else { + break; + }; + // The certificates might be collected out of order because of how gossip works, + // in which case we can skip signing it. + let has_batch_qc = self + .batch_store + .has_batch_qc(ctx, earliest_batch_number) + .await + .context("has_batch_qc")?; + + if !has_batch_qc { + // We only have to publish a vote once; future peers can pull it from the register. self.publisher .publish(attesters, &self.attester.key, batch) .await .context("publish")?; } + + earliest_batch_number = earliest_batch_number.next(); } + // Wait some time before we poll the database again to see if there is a new batch to sign. ctx.sleep(POLL_INTERVAL).await?; + + // Refresh the upper end of the range. + latest_batch_number = self + .batch_store + .latest_batch_number(ctx) + .await + .context("latest_batch_number")? + .unwrap_or(latest_batch_number) } } } diff --git a/node/libs/storage/src/batch_store.rs b/node/libs/storage/src/batch_store.rs index 128182eb..9bb15b82 100644 --- a/node/libs/storage/src/batch_store.rs +++ b/node/libs/storage/src/batch_store.rs @@ -56,7 +56,7 @@ pub trait PersistentBatchStore: 'static + fmt::Debug + Send + Sync { /// Returns `None` if no batches have been created yet. async fn last_batch(&self, ctx: &ctx::Ctx) -> ctx::Result>; - /// Get the numbers of L1 batches which are missing the corresponding L1 batch quorum certificates + /// Get the earliest of L1 batches which are missing the corresponding L1 batch quorum certificates /// and potentially need to be signed by attesters. /// /// A replica might never have a complete history of L1 batch QCs; once the L1 batch is included on L1, @@ -65,10 +65,10 @@ pub trait PersistentBatchStore: 'static + fmt::Debug + Send + Sync { /// where it's still necessary to gossip votes; for example the main node will want to have a QC on /// every batch while it's the one submitting them to L1, while replicas can ask the L1 what is considered /// final. - async fn unsigned_batch_numbers( + async fn earliest_batch_number_to_sign( &self, ctx: &ctx::Ctx, - ) -> ctx::Result>; + ) -> ctx::Result>; /// Get the L1 batch QC from storage with the highest number. /// @@ -258,23 +258,55 @@ impl BatchStore { Ok(batch) } - /// Retrieve the number of all batches that don't have a QC yet and potentially need to be signed. + /// Retrieve the maximum batch number that we know about. + pub async fn latest_batch_number( + &self, + ctx: &ctx::Ctx, + ) -> ctx::Result> { + { + let inner = self.inner.borrow(); + if let Some(ref batch) = inner.queued.last { + return Ok(Some(batch.number)); + } + if let Some(ref batch) = inner.persisted.last { + return Ok(Some(batch.number)); + } + } + + let batch = self + .persistent + .last_batch(ctx) + .await + .context("persistent.last_batch()")?; + + Ok(batch) + } + + /// Retrieve the minimum batch number that doesn't have a QC yet and potentially need to be signed. /// - /// It returns only the numbers which follow the last finalized batch, that is, there might be batches - /// before the earliest in these numbers that isn't signed, but it would be futile to sign them any more. - pub async fn unsigned_batch_numbers( + /// There might be unsigned batches before this one in the database, however we don't consider it + /// necessary to sign them any more, because for example they have already been submitted to L1. + /// + /// There might also be signed batches *after* this one, due to the way gossiping works, but we + /// might still have to fill the gaps by (re)submitting our signature to allow them to be submitted. + /// + /// Returns `None` if all existing batches are signed, or there are not batches yet to be signed at all. + pub async fn earliest_batch_number_to_sign( &self, ctx: &ctx::Ctx, - ) -> ctx::Result> { + ) -> ctx::Result> { let unsigned = self .persistent - .unsigned_batch_numbers(ctx) + .earliest_batch_number_to_sign(ctx) .await .context("persistent.get_batch_to_sign()")?; Ok(unsigned) } /// Retrieve a batch to be signed. + /// + /// This might be `None` even if the L1 batch already exists, because the commitment + /// in it is populated asynchronously. pub async fn batch_to_sign( &self, ctx: &ctx::Ctx, @@ -324,6 +356,18 @@ impl BatchStore { Ok(()) } + /// Check if a given batch number has a quorum certificate stored for it. + pub async fn has_batch_qc( + &self, + ctx: &ctx::Ctx, + number: attester::BatchNumber, + ) -> ctx::Result { + self.persistent + .get_batch_qc(ctx, number) + .await + .map(|qc| qc.is_some()) + } + /// Waits until the given batch is queued (in memory, or persisted). /// Note that it doesn't mean that the batch is actually available, as old batches might get pruned. pub async fn wait_until_queued( diff --git a/node/libs/storage/src/testonly/in_memory.rs b/node/libs/storage/src/testonly/in_memory.rs index 61c9acd4..eb57170d 100644 --- a/node/libs/storage/src/testonly/in_memory.rs +++ b/node/libs/storage/src/testonly/in_memory.rs @@ -141,18 +141,17 @@ impl PersistentBatchStore for BatchStore { Ok(certs.get(last_batch_number).cloned()) } - async fn unsigned_batch_numbers( + async fn earliest_batch_number_to_sign( &self, _ctx: &ctx::Ctx, - ) -> ctx::Result> { + ) -> ctx::Result> { let batches = self.0.batches.lock().unwrap(); let certs = self.0.certs.lock().unwrap(); Ok(batches .iter() .map(|b| b.number) - .filter(|n| !certs.contains_key(n)) - .collect()) + .find(|n| !certs.contains_key(n))) } async fn get_batch_to_sign( From 44615c29272f809f9a92b9382afa413f541dee1e Mon Sep 17 00:00:00 2001 From: Akosh Farkash Date: Mon, 8 Jul 2024 20:13:08 +0100 Subject: [PATCH 2/8] BFT-474: Remove the has_batch_qc check --- node/actors/executor/src/attestation.rs | 22 ++++++++-------------- node/libs/storage/src/batch_store.rs | 12 ------------ 2 files changed, 8 insertions(+), 26 deletions(-) diff --git a/node/actors/executor/src/attestation.rs b/node/actors/executor/src/attestation.rs index e4413fbe..ce723304 100644 --- a/node/actors/executor/src/attestation.rs +++ b/node/actors/executor/src/attestation.rs @@ -75,21 +75,15 @@ impl AttesterRunner { break; }; - // The certificates might be collected out of order because of how gossip works, - // in which case we can skip signing it. - let has_batch_qc = self - .batch_store - .has_batch_qc(ctx, earliest_batch_number) - .await - .context("has_batch_qc")?; + // The certificates might be collected out of order because of how gossip works; + // we could query the DB to see if we already have a QC, or we can just go ahead + // and publish our vote, and let others ignore it. - if !has_batch_qc { - // We only have to publish a vote once; future peers can pull it from the register. - self.publisher - .publish(attesters, &self.attester.key, batch) - .await - .context("publish")?; - } + // We only have to publish a vote once; future peers can pull it from the register. + self.publisher + .publish(attesters, &self.attester.key, batch) + .await + .context("publish")?; earliest_batch_number = earliest_batch_number.next(); } diff --git a/node/libs/storage/src/batch_store.rs b/node/libs/storage/src/batch_store.rs index 9bb15b82..a03f361e 100644 --- a/node/libs/storage/src/batch_store.rs +++ b/node/libs/storage/src/batch_store.rs @@ -356,18 +356,6 @@ impl BatchStore { Ok(()) } - /// Check if a given batch number has a quorum certificate stored for it. - pub async fn has_batch_qc( - &self, - ctx: &ctx::Ctx, - number: attester::BatchNumber, - ) -> ctx::Result { - self.persistent - .get_batch_qc(ctx, number) - .await - .map(|qc| qc.is_some()) - } - /// Waits until the given batch is queued (in memory, or persisted). /// Note that it doesn't mean that the batch is actually available, as old batches might get pruned. pub async fn wait_until_queued( From 0466e9992b234de5be724fefcd7d35cd640d9014 Mon Sep 17 00:00:00 2001 From: Akosh Farkash Date: Tue, 9 Jul 2024 10:21:43 +0100 Subject: [PATCH 3/8] BFT-474: Comments about incomplete things around batches. Ignore queued. --- node/actors/network/src/gossip/runner.rs | 5 +++++ node/libs/storage/src/batch_store.rs | 15 +++++++++++---- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/node/actors/network/src/gossip/runner.rs b/node/actors/network/src/gossip/runner.rs index 60765c46..7cd7a351 100644 --- a/node/actors/network/src/gossip/runner.rs +++ b/node/actors/network/src/gossip/runner.rs @@ -113,6 +113,9 @@ impl rpc::Handler for &PushBlockStoreStateServ #[async_trait] impl rpc::Handler for &PushBatchStoreStateServer { fn max_req_size(&self) -> usize { + // XXX: The request will actually contain a `SyncBatch` which has all the blocks in the batch, + // so a constant 10kB cannot be the right limit. There is a `max_block_size` in config which + // should come into play, with some other limit on the batch size. 10 * kB } async fn handle( @@ -353,6 +356,8 @@ impl Network { let ctx_with_timeout = self.cfg.rpc.get_batch_timeout.map(|t| ctx.with_timeout(t)); let ctx = ctx_with_timeout.as_ref().unwrap_or(ctx); + // XXX: `max_block_size` isn't the right limit here as the response + // will contain all blocks of a batch. let batch = call .call(ctx, &req, self.cfg.max_block_size.saturating_add(kB)) .await? diff --git a/node/libs/storage/src/batch_store.rs b/node/libs/storage/src/batch_store.rs index a03f361e..dfb185ee 100644 --- a/node/libs/storage/src/batch_store.rs +++ b/node/libs/storage/src/batch_store.rs @@ -258,16 +258,20 @@ impl BatchStore { Ok(batch) } - /// Retrieve the maximum batch number that we know about. + /// Retrieve the maximum persisted batch number. pub async fn latest_batch_number( &self, ctx: &ctx::Ctx, ) -> ctx::Result> { { let inner = self.inner.borrow(); - if let Some(ref batch) = inner.queued.last { - return Ok(Some(batch.number)); - } + // For now we ignore the cache here because it's not clear how it's updated, + // validation is missing and it seems to depend entirely on gossip. Don't + // want it to somehow get us stuck and prevent voting. At least the persisted + // cache is maintained by two background processes copying the data from the DB. + // if let Some(ref batch) = inner.queued.last { + // return Ok(Some(batch.number)); + // } if let Some(ref batch) = inner.persisted.last { return Ok(Some(batch.number)); } @@ -332,6 +336,9 @@ impl BatchStore { batch: attester::SyncBatch, _genesis: validator::Genesis, ) -> ctx::Result<()> { + // XXX: Once we can validate `SyncBatch::proof` we should do it before adding the + // batch to the cache, otherwise a malicious peer could serve us data that prevents + // other inputs from entering the queue. It will also cause it to be gossiped at the moment. sync::wait_for(ctx, &mut self.inner.subscribe(), |inner| { inner.queued.next() >= batch.number }) From 1e4720b1ac090cc8ce70b144c35da415f749c9e9 Mon Sep 17 00:00:00 2001 From: Akosh Farkash Date: Tue, 9 Jul 2024 12:18:28 +0100 Subject: [PATCH 4/8] BFT-474: Ignore the persisted state for signing purposes --- node/libs/storage/src/batch_store.rs | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/node/libs/storage/src/batch_store.rs b/node/libs/storage/src/batch_store.rs index dfb185ee..906a19db 100644 --- a/node/libs/storage/src/batch_store.rs +++ b/node/libs/storage/src/batch_store.rs @@ -259,24 +259,32 @@ impl BatchStore { } /// Retrieve the maximum persisted batch number. - pub async fn latest_batch_number( + pub async fn last_batch_number( &self, ctx: &ctx::Ctx, ) -> ctx::Result> { { - let inner = self.inner.borrow(); - // For now we ignore the cache here because it's not clear how it's updated, + // let inner = self.inner.borrow(); + + // For now we ignore `queued` here because it's not clear how it's updated, // validation is missing and it seems to depend entirely on gossip. Don't // want it to somehow get us stuck and prevent voting. At least the persisted // cache is maintained by two background processes copying the data from the DB. + // if let Some(ref batch) = inner.queued.last { // return Ok(Some(batch.number)); // } - if let Some(ref batch) = inner.persisted.last { - return Ok(Some(batch.number)); - } + + // We also have to ignore `persisted` because `last` is an instance of `SyncBatch` + // which is conceptually only available once we have a proof that it's been included + // on L1, which requires a signature in the first place. + + // if let Some(ref batch) = inner.persisted.last { + // return Ok(Some(batch.number)); + // } } + // Get the last L1 batch that exists in the DB regardless of its status. let batch = self .persistent .last_batch(ctx) From 0bae6387cb257a49fa65839dbae3de5d237d6638 Mon Sep 17 00:00:00 2001 From: Akosh Farkash Date: Tue, 9 Jul 2024 12:18:54 +0100 Subject: [PATCH 5/8] BFT-474: Don't poll the last number until we maanged to publish --- node/actors/executor/src/attestation.rs | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/node/actors/executor/src/attestation.rs b/node/actors/executor/src/attestation.rs index ce723304..3e2dfad9 100644 --- a/node/actors/executor/src/attestation.rs +++ b/node/actors/executor/src/attestation.rs @@ -49,11 +49,11 @@ impl AttesterRunner { } // Find the initial range of batches that we want to (re)sign after a (re)start. - let mut latest_batch_number = self + let mut last_batch_number = self .batch_store - .latest_batch_number(ctx) + .last_batch_number(ctx) .await - .context("latest_batch_number")? + .context("last_batch_number")? .unwrap_or_default(); let mut earliest_batch_number = self @@ -61,10 +61,10 @@ impl AttesterRunner { .earliest_batch_number_to_sign(ctx) .await .context("earliest_batch_number_to_sign")? - .unwrap_or(latest_batch_number); + .unwrap_or(last_batch_number); loop { - while earliest_batch_number <= latest_batch_number { + while earliest_batch_number <= last_batch_number { // Try to get the next batch to sign; the commitment might not be available just yet. let Some(batch) = self .batch_store @@ -92,12 +92,14 @@ impl AttesterRunner { ctx.sleep(POLL_INTERVAL).await?; // Refresh the upper end of the range. - latest_batch_number = self - .batch_store - .latest_batch_number(ctx) - .await - .context("latest_batch_number")? - .unwrap_or(latest_batch_number) + if earliest_batch_number > last_batch_number { + last_batch_number = self + .batch_store + .last_batch_number(ctx) + .await + .context("last_batch_number")? + .unwrap_or(last_batch_number); + } } } } From 49a9b94d2d8b76e7713bc6be3edf7fc3ad97ac84 Mon Sep 17 00:00:00 2001 From: Akosh Farkash Date: Tue, 9 Jul 2024 15:32:41 +0100 Subject: [PATCH 6/8] BFT-474: Add method to wait for the next payload to simplify --- node/actors/executor/src/attestation.rs | 78 +++++++++++++------------ 1 file changed, 42 insertions(+), 36 deletions(-) diff --git a/node/actors/executor/src/attestation.rs b/node/actors/executor/src/attestation.rs index 3e2dfad9..6ef7bbb8 100644 --- a/node/actors/executor/src/attestation.rs +++ b/node/actors/executor/src/attestation.rs @@ -6,6 +6,7 @@ use anyhow::Context; use zksync_concurrency::ctx; use zksync_concurrency::time; use zksync_consensus_network::gossip::BatchVotesPublisher; +use zksync_consensus_roles::attester; use zksync_consensus_storage::{BatchStore, BlockStore}; use crate::Attester; @@ -49,56 +50,61 @@ impl AttesterRunner { } // Find the initial range of batches that we want to (re)sign after a (re)start. - let mut last_batch_number = self + let last_batch_number = self .batch_store .last_batch_number(ctx) .await .context("last_batch_number")? .unwrap_or_default(); - let mut earliest_batch_number = self + // Determine the batch to start signing from. + let earliest_batch_number = self .batch_store .earliest_batch_number_to_sign(ctx) .await .context("earliest_batch_number_to_sign")? .unwrap_or(last_batch_number); + tracing::info!(%earliest_batch_number, %last_batch_number, "attesting batches"); + + let mut batch_number = earliest_batch_number; + loop { - while earliest_batch_number <= last_batch_number { - // Try to get the next batch to sign; the commitment might not be available just yet. - let Some(batch) = self - .batch_store - .batch_to_sign(ctx, earliest_batch_number) - .await - .context("batch_to_sign")? - else { - break; - }; - - // The certificates might be collected out of order because of how gossip works; - // we could query the DB to see if we already have a QC, or we can just go ahead - // and publish our vote, and let others ignore it. - - // We only have to publish a vote once; future peers can pull it from the register. - self.publisher - .publish(attesters, &self.attester.key, batch) - .await - .context("publish")?; - - earliest_batch_number = earliest_batch_number.next(); - } + // Try to get the next batch to sign; the commitment might not be available just yet. + let batch = self.wait_for_batch_to_sign(ctx, batch_number).await?; + + // The certificates might be collected out of order because of how gossip works; + // we could query the DB to see if we already have a QC, or we can just go ahead + // and publish our vote, and let others ignore it. + + tracing::info!(%batch_number, "publishing attestation"); - // Wait some time before we poll the database again to see if there is a new batch to sign. - ctx.sleep(POLL_INTERVAL).await?; - - // Refresh the upper end of the range. - if earliest_batch_number > last_batch_number { - last_batch_number = self - .batch_store - .last_batch_number(ctx) - .await - .context("last_batch_number")? - .unwrap_or(last_batch_number); + // We only have to publish a vote once; future peers can pull it from the register. + self.publisher + .publish(attesters, &self.attester.key, batch) + .await + .context("publish")?; + + batch_number = batch_number.next(); + } + } + + /// Wait for the batch commitment to become available. + async fn wait_for_batch_to_sign( + &self, + ctx: &ctx::Ctx, + number: attester::BatchNumber, + ) -> ctx::Result { + loop { + if let Some(batch) = self + .batch_store + .batch_to_sign(ctx, number) + .await + .context("batch_to_sign")? + { + return Ok(batch); + } else { + ctx.sleep(POLL_INTERVAL).await?; } } } From ed5832d90c3c202dcb28ebcd87926eac51f961d2 Mon Sep 17 00:00:00 2001 From: Akosh Farkash Date: Tue, 9 Jul 2024 16:12:58 +0100 Subject: [PATCH 7/8] fix: Add max_batch_size to limit the SyncBatch request sizes (#152) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What ❔ Adds a `max_batch_size` setting and uses it to set a `max_req_size` limit in the RPC that deals with `SyncBatch`. Fixes some of the comments added in https://github.com/matter-labs/era-consensus/pull/150 ### Alternative configuration The size of `SyncBatch` depends on how many blocks are in the batch, plus a fixed size proof. Since we already have a limit for the size of block payloads, ostensibly we could add a limit on the number of blocks in the batch; I used 100 in the protobuf reader if the field was missing. Perhaps this setting is already available somewhere in zksync-era? ## Why ❔ Tests failed in https://github.com/matter-labs/zksync-era/pull/2340 when I unintentionally enabled the gossiping of `SyncBatch` due to the large size of the requests. It turns out they were either hardcoded constants copied from the block store state gossip (which doesn't depend on the payload size, only the QC), or they wrongly used the block payload limit. --- node/actors/executor/src/lib.rs | 4 ++++ node/actors/executor/src/tests.rs | 1 + node/actors/network/src/config.rs | 2 ++ node/actors/network/src/gossip/runner.rs | 15 ++++++--------- node/actors/network/src/testonly.rs | 2 ++ node/tools/src/bin/deployer.rs | 1 + node/tools/src/bin/localnet_config.rs | 1 + node/tools/src/config.rs | 20 ++++++++++++++++---- node/tools/src/proto/mod.proto | 3 +++ node/tools/src/tests.rs | 1 + 10 files changed, 37 insertions(+), 13 deletions(-) diff --git a/node/actors/executor/src/lib.rs b/node/actors/executor/src/lib.rs index 55cb90e7..2044fe2a 100644 --- a/node/actors/executor/src/lib.rs +++ b/node/actors/executor/src/lib.rs @@ -49,6 +49,9 @@ pub struct Config { pub public_addr: net::Host, /// Maximal size of the block payload. pub max_payload_size: usize, + /// Maximal size of a batch, which includes `max_payload_size` per block in the batch, + /// plus the size of the Merkle proof of the commitment being included on L1 (should be ~1kB). + pub max_batch_size: usize, /// Key of this node. It uniquely identifies the node. /// It should match the secret key provided in the `node_key` file. pub node_key: node::SecretKey, @@ -107,6 +110,7 @@ impl Executor { attester_key: self.attester.as_ref().map(|a| a.key.clone()), ping_timeout: Some(time::Duration::seconds(10)), max_block_size: self.config.max_payload_size.saturating_add(kB), + max_batch_size: self.config.max_batch_size.saturating_add(kB), max_block_queue_size: 20, tcp_accept_rate: limiter::Rate { burst: 10, diff --git a/node/actors/executor/src/tests.rs b/node/actors/executor/src/tests.rs index 2781e613..1c8c9f41 100644 --- a/node/actors/executor/src/tests.rs +++ b/node/actors/executor/src/tests.rs @@ -16,6 +16,7 @@ fn config(cfg: &network::Config) -> Config { server_addr: *cfg.server_addr, public_addr: cfg.public_addr.clone(), max_payload_size: usize::MAX, + max_batch_size: usize::MAX, node_key: cfg.gossip.key.clone(), gossip_dynamic_inbound_limit: cfg.gossip.dynamic_inbound_limit, gossip_static_inbound: cfg.gossip.static_inbound.clone(), diff --git a/node/actors/network/src/config.rs b/node/actors/network/src/config.rs index bd742cd2..2d47cf80 100644 --- a/node/actors/network/src/config.rs +++ b/node/actors/network/src/config.rs @@ -103,6 +103,8 @@ pub struct Config { pub attester_key: Option, /// Maximal size of the proto-encoded `validator::FinalBlock` in bytes. pub max_block_size: usize, + /// Maximal size of the proto-encoded `attester::SyncBatch` in bytes. + pub max_batch_size: usize, /// If a peer doesn't respond to a ping message within `ping_timeout`, /// the connection is dropped. /// `None` disables sending ping messages (useful for tests). diff --git a/node/actors/network/src/gossip/runner.rs b/node/actors/network/src/gossip/runner.rs index 7cd7a351..a4d3ae61 100644 --- a/node/actors/network/src/gossip/runner.rs +++ b/node/actors/network/src/gossip/runner.rs @@ -79,17 +79,19 @@ impl<'a> PushBlockStoreStateServer<'a> { /// Represents what we know about the state of available batches on the remote peer. struct PushBatchStoreStateServer { state: sync::watch::Sender, + max_batch_size: usize, } impl PushBatchStoreStateServer { /// Start out not knowing anything about the remote peer. - fn new() -> Self { + fn new(max_batch_size: usize) -> Self { Self { state: sync::watch::channel(BatchStoreState { first: BatchNumber(0), last: None, }) .0, + max_batch_size, } } } @@ -113,10 +115,7 @@ impl rpc::Handler for &PushBlockStoreStateServ #[async_trait] impl rpc::Handler for &PushBatchStoreStateServer { fn max_req_size(&self) -> usize { - // XXX: The request will actually contain a `SyncBatch` which has all the blocks in the batch, - // so a constant 10kB cannot be the right limit. There is a `max_block_size` in config which - // should come into play, with some other limit on the batch size. - 10 * kB + self.max_batch_size.saturating_add(kB) } async fn handle( &self, @@ -178,7 +177,7 @@ impl Network { ctx, self.cfg.rpc.push_batch_store_state_rate, ); - let push_batch_store_state_server = PushBatchStoreStateServer::new(); + let push_batch_store_state_server = PushBatchStoreStateServer::new(self.cfg.max_batch_size); scope::run!(ctx, |ctx, s| async { let mut service = rpc::Service::new() .add_client(&push_validator_addrs_client) @@ -356,10 +355,8 @@ impl Network { let ctx_with_timeout = self.cfg.rpc.get_batch_timeout.map(|t| ctx.with_timeout(t)); let ctx = ctx_with_timeout.as_ref().unwrap_or(ctx); - // XXX: `max_block_size` isn't the right limit here as the response - // will contain all blocks of a batch. let batch = call - .call(ctx, &req, self.cfg.max_block_size.saturating_add(kB)) + .call(ctx, &req, self.cfg.max_batch_size.saturating_add(kB)) .await? .0 .context("empty response")?; diff --git a/node/actors/network/src/testonly.rs b/node/actors/network/src/testonly.rs index 2a8a6497..dfc28dd5 100644 --- a/node/actors/network/src/testonly.rs +++ b/node/actors/network/src/testonly.rs @@ -108,6 +108,7 @@ where static_outbound: HashMap::default(), }, max_block_size: usize::MAX, + max_batch_size: usize::MAX, tcp_accept_rate: limiter::Rate::INF, rpc: RpcConfig::default(), max_block_queue_size: 10, @@ -146,6 +147,7 @@ pub fn new_fullnode(rng: &mut impl Rng, peer: &Config) -> Config { static_outbound: [(peer.gossip.key.public(), peer.public_addr.clone())].into(), }, max_block_size: usize::MAX, + max_batch_size: usize::MAX, tcp_accept_rate: limiter::Rate::INF, rpc: RpcConfig::default(), max_block_queue_size: 10, diff --git a/node/tools/src/bin/deployer.rs b/node/tools/src/bin/deployer.rs index a9602812..2b5333b8 100644 --- a/node/tools/src/bin/deployer.rs +++ b/node/tools/src/bin/deployer.rs @@ -47,6 +47,7 @@ fn generate_consensus_nodes(nodes: usize, seed_nodes_amount: Option) -> V metrics_server_addr: None, genesis: setup.genesis.clone(), max_payload_size: 1000000, + max_batch_size: 100000000, validator_key: Some(validator_keys[i].clone()), attester_key: Some(attester_keys[i].clone()), node_key: node_keys[i].clone(), diff --git a/node/tools/src/bin/localnet_config.rs b/node/tools/src/bin/localnet_config.rs index 1d0f2057..ad5b19b9 100644 --- a/node/tools/src/bin/localnet_config.rs +++ b/node/tools/src/bin/localnet_config.rs @@ -76,6 +76,7 @@ fn main() -> anyhow::Result<()> { .map(|port| SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), port)), genesis: setup.genesis.clone(), max_payload_size: 1000000, + max_batch_size: 100000000, node_key: node_keys[i].clone(), validator_key: validator_keys.get(i).cloned(), attester_key: attester_keys.get(i).cloned(), diff --git a/node/tools/src/config.rs b/node/tools/src/config.rs index 3ed82d7c..11081d9d 100644 --- a/node/tools/src/config.rs +++ b/node/tools/src/config.rs @@ -17,7 +17,7 @@ use zksync_consensus_network::http; use zksync_consensus_roles::{attester, node, validator}; use zksync_consensus_storage::testonly::{TestMemoryStorage, TestMemoryStorageRunner}; use zksync_consensus_utils::debug_page; -use zksync_protobuf::{read_required, required, ProtoFmt}; +use zksync_protobuf::{kB, read_required, required, ProtoFmt}; fn read_required_secret_text(text: &Option) -> anyhow::Result { Text::new( @@ -96,6 +96,7 @@ pub struct AppConfig { pub genesis: validator::Genesis, pub max_payload_size: usize, + pub max_batch_size: usize, pub validator_key: Option, pub attester_key: Option, @@ -126,6 +127,16 @@ impl ProtoFmt for AppConfig { ProtoFmt::read(e).with_context(|| format!("gossip_static_outbound[{i}]"))?; gossip_static_outbound.insert(node_addr.key, node_addr.addr); } + + let max_payload_size = required(&r.max_payload_size) + .and_then(|x| Ok((*x).try_into()?)) + .context("max_payload_size")?; + + let max_batch_size = match &r.max_batch_size { + Some(x) => (*x).try_into().context("max_payload_size")?, + None => max_payload_size * 100 + kB, // Merkle proof is ~1kB and we have a batch per minute. + }; + Ok(Self { server_addr: read_required_text(&r.server_addr).context("server_addr")?, public_addr: net::Host(required(&r.public_addr).context("public_addr")?.clone()), @@ -134,9 +145,8 @@ impl ProtoFmt for AppConfig { .context("metrics_server_addr")?, genesis: read_required(&r.genesis).context("genesis")?, - max_payload_size: required(&r.max_payload_size) - .and_then(|x| Ok((*x).try_into()?)) - .context("max_payload_size")?, + max_payload_size, + max_batch_size, // TODO: read secret. validator_key: read_optional_secret_text(&r.validator_secret_key) .context("validator_secret_key")?, @@ -180,6 +190,7 @@ impl ProtoFmt for AppConfig { genesis: Some(self.genesis.build()), max_payload_size: Some(self.max_payload_size.try_into().unwrap()), + max_batch_size: Some(self.max_batch_size.try_into().unwrap()), validator_secret_key: self.validator_key.as_ref().map(TextFmt::encode), attester_secret_key: self.attester_key.as_ref().map(TextFmt::encode), @@ -257,6 +268,7 @@ impl Configs { gossip_static_inbound: self.app.gossip_static_inbound.clone(), gossip_static_outbound: self.app.gossip_static_outbound.clone(), max_payload_size: self.app.max_payload_size, + max_batch_size: self.app.max_batch_size, rpc: executor::RpcConfig::default(), debug_page: self.app.debug_page.as_ref().map(|debug_page_config| { http::DebugPageConfig { diff --git a/node/tools/src/proto/mod.proto b/node/tools/src/proto/mod.proto index 0fdc8461..4fd799c5 100644 --- a/node/tools/src/proto/mod.proto +++ b/node/tools/src/proto/mod.proto @@ -80,6 +80,9 @@ message AppConfig { // Maximal size of the block payload. optional uint64 max_payload_size = 5; // required; bytes + // Maximal size of the sync batch payload. + optional uint64 max_batch_size = 17; // optional; bytes + // Validator secret key. optional string validator_secret_key = 10; // optional; ValidatorSecretKey diff --git a/node/tools/src/tests.rs b/node/tools/src/tests.rs index c4e0389e..fb323e6f 100644 --- a/node/tools/src/tests.rs +++ b/node/tools/src/tests.rs @@ -17,6 +17,7 @@ impl Distribution for EncodeDist { genesis: rng.gen(), max_payload_size: rng.gen(), + max_batch_size: rng.gen(), validator_key: self.sample_opt(|| rng.gen()), attester_key: self.sample_opt(|| rng.gen()), From a5bc7dc862fdfa43cd752a3eb6a5c6551c22b3ee Mon Sep 17 00:00:00 2001 From: Akosh Farkash Date: Tue, 9 Jul 2024 16:42:42 +0100 Subject: [PATCH 8/8] BFT-474: Add comments about queued and persisted --- node/libs/storage/src/batch_store.rs | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/node/libs/storage/src/batch_store.rs b/node/libs/storage/src/batch_store.rs index 906a19db..e6a0bbfb 100644 --- a/node/libs/storage/src/batch_store.rs +++ b/node/libs/storage/src/batch_store.rs @@ -115,8 +115,25 @@ pub trait PersistentBatchStore: 'static + fmt::Debug + Send + Sync { #[derive(Debug)] struct Inner { /// Batches that are queued to be persisted. + /// + /// This reflects the state of the `cache`. Its source is mainly the gossip layer (the RPC protocols started in `Network::run_stream`): + /// * the node pushes `SyncBatch` records which appear in `queued` to its gossip peers + /// * the node pulls `SyncBatch` records that it needs from gossip peers that reported to have them, and adds them to `queued` + /// * the `BatchStoreRunner` looks for new items in `queued` and pushes them into the `PersistentBatchStore` + /// + /// XXX: There doesn't seem to be anything that currently actively pushes into `queued` from outside gossip, + /// like it happens with the `BlockStore::queue_block` being called from BFT. queued: BatchStoreState, /// Batches that are already persisted. + /// + /// This reflects the state of the database. Its source is mainly the `PersistentBatchStore`: + /// * the `BatchStoreRunner` subscribes to `PersistedBatchStore::persisted()` and copies its contents to here; + /// it also uses the opportunity to clear items from the `cache` but notably doesn't update `queued` which + /// which would cause the data to be gossiped + /// + /// Be careful that the `BatchStoreState` works with `SyncBatch` which requires a `proof` of inclusion on L1, + /// so this persistence is much delayed compared to the latest batch physically available in the database: + /// the batch also has to be signed by attesters, submitted to L1, and finalised there to appear here. persisted: BatchStoreState, cache: VecDeque, }