From d94652d801ba4d283bea8db4df555fd1b3bf9e42 Mon Sep 17 00:00:00 2001 From: Grzegorz Prusak Date: Fri, 13 Dec 2024 14:10:44 +0100 Subject: [PATCH 1/6] applied upgrades --- infrastructure/loadtests/ansible/playbook.yml | 2 +- .../components/bft/src/chonky_bft/new_view.rs | 34 +++++++++++++------ .../components/bft/src/chonky_bft/testonly.rs | 33 +++++++++--------- .../bft/src/chonky_bft/tests/new_view.rs | 15 +++++--- node/components/network/src/config.rs | 17 +--------- node/components/network/src/consensus/mod.rs | 2 +- node/libs/concurrency/src/ctx/clock.rs | 4 ++- node/libs/storage/src/testonly/in_memory.rs | 27 ++++++++++++++- node/tools/src/config.rs | 33 +++++++++++++++--- node/tools/src/main.rs | 5 +-- 10 files changed, 114 insertions(+), 58 deletions(-) diff --git a/infrastructure/loadtests/ansible/playbook.yml b/infrastructure/loadtests/ansible/playbook.yml index 46b87413..9864774a 100644 --- a/infrastructure/loadtests/ansible/playbook.yml +++ b/infrastructure/loadtests/ansible/playbook.yml @@ -86,7 +86,7 @@ mode: "0755" - name: Start executor binary in background - shell: "nohup ./executor > /var/log/zksync-bft.log 2>&1 &" + shell: "nohup ./executor --database=./database > /var/log/zksync-bft.log 2>&1 &" args: chdir: /etc/zksync-bft/ environment: diff --git a/node/components/bft/src/chonky_bft/new_view.rs b/node/components/bft/src/chonky_bft/new_view.rs index 21ddb65a..52948365 100644 --- a/node/components/bft/src/chonky_bft/new_view.rs +++ b/node/components/bft/src/chonky_bft/new_view.rs @@ -55,6 +55,14 @@ impl StateMachine { let message = &signed_message.msg; let author = &signed_message.key; + // If the replica is already in this view, then ignore it. + // TODO: allow for the same view number for the proposer. + if message.view().number <= self.view_number { + return Err(Error::Old { + current_view: self.view_number, + }); + } + // Check that the message signer is in the validator committee. if !self.config.genesis().validators.contains(author) { return Err(Error::NonValidatorSigner { @@ -62,13 +70,6 @@ impl StateMachine { }); } - // If the message is from a past view, ignore it. - if message.view().number < self.view_number { - return Err(Error::Old { - current_view: self.view_number, - }); - } - // ----------- Checking the signed part of the message -------------- // Check the signature on the message. @@ -118,9 +119,23 @@ impl StateMachine { ) -> ctx::Result<()> { // Update the state machine. self.view_number = view; + metrics::METRICS.replica_view_number.set(self.view_number.0); self.phase = validator::Phase::Prepare; + // It is important that the proposal and new_view messages from the leader + // will contain the same justification. + // Proposal cannot be produced before previous block is processed, + // therefore leader needs to make sure that the high_commit_qc is delivered + // to all replicas, so that the finalized block is distributed over the network. + // In particular it is not guaranteed that the leader has the finalized block when + // sending the NewView, so it might need to wait for the finalized block. + // + // Note that for this process to work e2e, the replicas should NOT ignore th NewView from + // the leader, even if they already advanced to the given view. + // Note that the order of NewView and proposal messages doesn't matter, because + // proposal is a superset of NewView message. + let justification = self.get_justification(); self.proposer_sender - .send(Some(self.get_justification())) + .send(Some(justification.clone())) .expect("justification_watch.send() failed"); // Clear the block proposal cache. @@ -139,7 +154,7 @@ impl StateMachine { .secret_key .sign_msg(validator::ConsensusMsg::ReplicaNewView( validator::ReplicaNewView { - justification: self.get_justification(), + justification: justification.clone(), }, )), }; @@ -147,7 +162,6 @@ impl StateMachine { // Log the event and update the metrics. tracing::info!("Starting view {}", self.view_number); - metrics::METRICS.replica_view_number.set(self.view_number.0); let now = ctx.now(); metrics::METRICS .view_latency diff --git a/node/components/bft/src/chonky_bft/testonly.rs b/node/components/bft/src/chonky_bft/testonly.rs index 7aba4cd7..19cca59b 100644 --- a/node/components/bft/src/chonky_bft/testonly.rs +++ b/node/components/bft/src/chonky_bft/testonly.rs @@ -145,9 +145,15 @@ impl UTHarness { self.try_recv().unwrap().msg } - pub(crate) async fn new_replica_new_view(&self) -> validator::ReplicaNewView { - let justification = self.replica.get_justification(); - validator::ReplicaNewView { justification } + pub(crate) async fn new_replica_new_view( + &mut self, + ctx: &ctx::Ctx, + ) -> validator::ReplicaNewView { + validator::ReplicaNewView { + justification: validator::ProposalJustification::Timeout( + self.new_timeout_qc(ctx).await, + ), + } } pub(crate) async fn new_commit_qc( @@ -164,19 +170,14 @@ impl UTHarness { qc } - // #[allow(dead_code)] - // pub(crate) fn new_timeout_qc( - // &mut self, - // mutate_fn: impl FnOnce(&mut validator::ReplicaTimeout), - // ) -> validator::TimeoutQC { - // let mut msg = self.new_replica_timeout(); - // mutate_fn(&mut msg); - // let mut qc = validator::TimeoutQC::new(msg.view.clone()); - // for key in &self.keys { - // qc.add(&key.sign_msg(msg.clone()), self.genesis()).unwrap(); - // } - // qc - // } + pub(crate) async fn new_timeout_qc(&mut self, ctx: &ctx::Ctx) -> validator::TimeoutQC { + let msg = self.new_replica_timeout(ctx).await; + let mut qc = validator::TimeoutQC::new(msg.view); + for key in &self.keys { + qc.add(&key.sign_msg(msg.clone()), self.genesis()).unwrap(); + } + qc + } pub(crate) async fn process_leader_proposal( &mut self, diff --git a/node/components/bft/src/chonky_bft/tests/new_view.rs b/node/components/bft/src/chonky_bft/tests/new_view.rs index 7f1f6550..5cc134da 100644 --- a/node/components/bft/src/chonky_bft/tests/new_view.rs +++ b/node/components/bft/src/chonky_bft/tests/new_view.rs @@ -117,7 +117,7 @@ async fn new_view_non_validator_signer() { let (mut util, runner) = UTHarness::new(ctx, 1).await; s.spawn_bg(runner.run(ctx)); - let replica_new_view = util.new_replica_new_view().await; + let replica_new_view = util.new_replica_new_view(ctx).await; let non_validator_key: validator::SecretKey = ctx.rng().gen(); let res = util .process_replica_new_view(ctx, non_validator_key.sign_msg(replica_new_view)) @@ -144,10 +144,15 @@ async fn replica_new_view_old() { let (mut util, runner) = UTHarness::new(ctx, 1).await; s.spawn_bg(runner.run(ctx)); - let replica_new_view = util.new_replica_new_view().await; - util.produce_block(ctx).await; + let replica_new_view = util.new_replica_new_view(ctx).await; + let replica_new_view = util.owner_key().sign_msg(replica_new_view); + + // Process new_view twice. The second time it shouldn't be accepted. + util.process_replica_new_view(ctx, replica_new_view.clone()) + .await + .unwrap(); let res = util - .process_replica_new_view(ctx, util.owner_key().sign_msg(replica_new_view)) + .process_replica_new_view(ctx, replica_new_view.clone()) .await; assert_matches!( @@ -171,7 +176,7 @@ async fn new_view_invalid_sig() { let (mut util, runner) = UTHarness::new(ctx, 1).await; s.spawn_bg(runner.run(ctx)); - let msg = util.new_replica_new_view().await; + let msg = util.new_replica_new_view(ctx).await; let mut replica_new_view = util.owner_key().sign_msg(msg); replica_new_view.sig = ctx.rng().gen(); diff --git a/node/components/network/src/config.rs b/node/components/network/src/config.rs index 973c3f91..4d2f2e60 100644 --- a/node/components/network/src/config.rs +++ b/node/components/network/src/config.rs @@ -14,16 +14,10 @@ pub struct RpcConfig { pub push_validator_addrs_rate: limiter::Rate, /// Max rate of sending/receiving push_block_store_state messages. pub push_block_store_state_rate: limiter::Rate, - /// Max rate of sending/receiving push_batch_store_state messages. - pub push_batch_store_state_rate: limiter::Rate, /// Max rate of sending/receiving `get_block` RPCs. pub get_block_rate: limiter::Rate, - /// Max rate of sending/receiving `get_batch` RPCs. - pub get_batch_rate: limiter::Rate, /// Timeout for the `get_block` RPC. pub get_block_timeout: Option, - /// Timeout for the `get_batch` RPC. - pub get_batch_timeout: Option, /// Max rate of sending/receiving consensus messages. pub consensus_rate: limiter::Rate, /// Max rate of sending/receiving PushBatchVotes RPCs. @@ -39,22 +33,13 @@ impl Default for RpcConfig { }, push_block_store_state_rate: limiter::Rate { burst: 2, - refresh: time::Duration::milliseconds(300), - }, - push_batch_store_state_rate: limiter::Rate { - burst: 2, - refresh: time::Duration::milliseconds(300), + refresh: time::Duration::milliseconds(200), }, get_block_rate: limiter::Rate { burst: 10, refresh: time::Duration::milliseconds(100), }, - get_batch_rate: limiter::Rate { - burst: 10, - refresh: time::Duration::milliseconds(100), - }, get_block_timeout: Some(time::Duration::seconds(10)), - get_batch_timeout: Some(time::Duration::seconds(10)), consensus_rate: limiter::Rate { burst: 10, refresh: time::Duration::ZERO, diff --git a/node/components/network/src/consensus/mod.rs b/node/components/network/src/consensus/mod.rs index 8e912079..7c03b463 100644 --- a/node/components/network/src/consensus/mod.rs +++ b/node/components/network/src/consensus/mod.rs @@ -125,7 +125,7 @@ pub(crate) struct Network { impl rpc::Handler for &Network { /// Here we bound the buffering of incoming consensus messages. fn max_req_size(&self) -> usize { - self.gossip.cfg.max_block_size.saturating_add(kB) + self.gossip.cfg.max_block_size.saturating_add(100 * kB) } async fn handle( diff --git a/node/libs/concurrency/src/ctx/clock.rs b/node/libs/concurrency/src/ctx/clock.rs index 68138b34..de9ada2f 100644 --- a/node/libs/concurrency/src/ctx/clock.rs +++ b/node/libs/concurrency/src/ctx/clock.rs @@ -45,7 +45,9 @@ pub struct RealClock; impl RealClock { /// Current time according to the monotone clock. pub fn now(&self) -> time::Instant { - time::Instant::now() + // We use `now()` from tokio, so that `tokio::time::pause()` + // works in tests. + tokio::time::Instant::now().into_std().into() } /// Current time according to the system/walltime clock. pub fn now_utc(&self) -> time::Utc { diff --git a/node/libs/storage/src/testonly/in_memory.rs b/node/libs/storage/src/testonly/in_memory.rs index f6719463..55217aea 100644 --- a/node/libs/storage/src/testonly/in_memory.rs +++ b/node/libs/storage/src/testonly/in_memory.rs @@ -13,6 +13,7 @@ struct BlockStoreInner { genesis: validator::Genesis, persisted: sync::watch::Sender, blocks: Mutex>, + capacity: Option, pregenesis_blocks: HashMap, } @@ -39,6 +40,7 @@ impl BlockStore { genesis: setup.genesis.clone(), persisted: sync::watch::channel(BlockStoreState { first, last: None }).0, blocks: Mutex::default(), + capacity: None, pregenesis_blocks: setup .blocks .iter() @@ -50,6 +52,22 @@ impl BlockStore { })) } + /// New bounded storage. Old blocks get GC'ed onse the storage capacity is full. + pub fn bounded( + genesis: validator::Genesis, + first: validator::BlockNumber, + capacity: usize, + ) -> Self { + assert!(genesis.first_block <= first); + Self(Arc::new(BlockStoreInner { + genesis, + persisted: sync::watch::channel(BlockStoreState { first, last: None }).0, + blocks: Mutex::default(), + capacity: Some(capacity), + pregenesis_blocks: [].into(), + })) + } + /// Truncates the storage to blocks `>=first`. pub fn truncate(&mut self, first: validator::BlockNumber) { let mut blocks = self.0.blocks.lock().unwrap(); @@ -114,9 +132,16 @@ impl PersistentBlockStore for BlockStore { // It may happen that a block gets fetched which is not needed any more. return Ok(()); } + if let Some(c) = self.0.capacity { + if blocks.len() >= c { + blocks.pop_front(); + } + } if block.number() > want { // Blocks should be stored in order though. - return Err(anyhow::anyhow!("got block {:?}, want {want:?}", block.number()).into()); + return Err( + anyhow::format_err!("got block {:?}, want {want:?}", block.number()).into(), + ); } self.0 .persisted diff --git a/node/tools/src/config.rs b/node/tools/src/config.rs index 8399e036..4874c77d 100644 --- a/node/tools/src/config.rs +++ b/node/tools/src/config.rs @@ -15,7 +15,9 @@ use zksync_consensus_crypto::{read_optional_text, read_required_text, Text, Text use zksync_consensus_executor::{self as executor, attestation}; use zksync_consensus_network as network; use zksync_consensus_roles::{attester, node, validator}; -use zksync_consensus_storage::{BlockStore, BlockStoreRunner}; +use zksync_consensus_storage::{ + testonly::in_memory, BlockStore, BlockStoreRunner, PersistentBlockStore, ReplicaStore, +}; use zksync_protobuf::{ read_optional, read_optional_repr, read_required, required, ProtoFmt, ProtoRepr, }; @@ -248,7 +250,7 @@ pub struct DebugPage { #[derive(Debug)] pub struct Configs { pub app: App, - pub database: PathBuf, + pub database: Option, } impl Configs { @@ -256,8 +258,29 @@ impl Configs { &self, ctx: &ctx::Ctx, ) -> ctx::Result<(executor::Executor, BlockStoreRunner)> { - let store = store::RocksDB::open(self.app.genesis.clone(), &self.database).await?; - let (block_store, runner) = BlockStore::new(ctx, Box::new(store.clone())).await?; + struct Stores { + block: Box, + replica: Box, + } + let stores = if let Some(path) = &self.database { + let store = store::RocksDB::open(self.app.genesis.clone(), path).await?; + Stores { + block: Box::new(store.clone()), + replica: Box::new(store), + } + } else { + let block = in_memory::BlockStore::bounded( + self.app.genesis.clone(), + self.app.genesis.first_block, + 200, + ); + let replica = in_memory::ReplicaStore::default(); + Stores { + block: Box::new(block), + replica: Box::new(replica), + } + }; + let (block_store, runner) = BlockStore::new(ctx, stores.block).await?; let attestation = Arc::new(attestation::Controller::new(self.app.attester_key.clone())); let e = executor::Executor { @@ -304,7 +327,7 @@ impl Configs { .as_ref() .map(|key| executor::Validator { key: key.clone(), - replica_store: Box::new(store.clone()), + replica_store: stores.replica, payload_manager: Box::new(bft::testonly::RandomPayload( self.app.max_payload_size, )), diff --git a/node/tools/src/main.rs b/node/tools/src/main.rs index a80bb6b9..b0b522f3 100644 --- a/node/tools/src/main.rs +++ b/node/tools/src/main.rs @@ -20,8 +20,9 @@ struct Cli { #[arg(long, conflicts_with = "config_path")] config: Option, /// Path to the rocksdb database of the node. - #[arg(long, default_value = "./database")] - database: PathBuf, + /// If not provided, an inmemory database will be used instead. + #[arg(long)] + database: Option, } impl Cli { From eda27544830de3401a0599b4c60c5001a6873caf Mon Sep 17 00:00:00 2001 From: Grzegorz Prusak Date: Fri, 13 Dec 2024 17:13:06 +0100 Subject: [PATCH 2/6] smaller blocks --- node/tools/src/bin/localnet_config.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/tools/src/bin/localnet_config.rs b/node/tools/src/bin/localnet_config.rs index 0769a62f..b5b14784 100644 --- a/node/tools/src/bin/localnet_config.rs +++ b/node/tools/src/bin/localnet_config.rs @@ -75,7 +75,7 @@ fn main() -> anyhow::Result<()> { .metrics_server_port .map(|port| SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), port)), genesis: setup.genesis.clone(), - max_payload_size: 1000000, + max_payload_size: 100000, node_key: node_keys[i].clone(), validator_key: validator_keys.get(i).cloned(), attester_key: attester_keys.get(i).cloned(), From 89cca7f775af678f7a4452b7d30d18777ab5851f Mon Sep 17 00:00:00 2001 From: Grzegorz Prusak Date: Mon, 16 Dec 2024 11:37:22 +0100 Subject: [PATCH 3/6] author eligible --- node/components/bft/src/chonky_bft/new_view.rs | 10 +++++++--- node/tools/src/bin/localnet_config.rs | 2 +- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/node/components/bft/src/chonky_bft/new_view.rs b/node/components/bft/src/chonky_bft/new_view.rs index 52948365..7054bed1 100644 --- a/node/components/bft/src/chonky_bft/new_view.rs +++ b/node/components/bft/src/chonky_bft/new_view.rs @@ -55,9 +55,13 @@ impl StateMachine { let message = &signed_message.msg; let author = &signed_message.key; - // If the replica is already in this view, then ignore it. - // TODO: allow for the same view number for the proposer. - if message.view().number <= self.view_number { + // If the replica is already in this view (and the message is NOT from the leader), then ignore it. + // See `start_new_view()` for explanation why we need to process the message from the + // leader. + if message.view().number < self.view_number + || (message.view().number == self.view_number + && author != &self.config.genesis().view_leader(self.view_number)) + { return Err(Error::Old { current_view: self.view_number, }); diff --git a/node/tools/src/bin/localnet_config.rs b/node/tools/src/bin/localnet_config.rs index b5b14784..2f2a3252 100644 --- a/node/tools/src/bin/localnet_config.rs +++ b/node/tools/src/bin/localnet_config.rs @@ -75,7 +75,7 @@ fn main() -> anyhow::Result<()> { .metrics_server_port .map(|port| SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), port)), genesis: setup.genesis.clone(), - max_payload_size: 100000, + max_payload_size: args.payload_size, node_key: node_keys[i].clone(), validator_key: validator_keys.get(i).cloned(), attester_key: attester_keys.get(i).cloned(), From 5e32ac73d950e1536d6daa94fec35a6152dd280e Mon Sep 17 00:00:00 2001 From: Grzegorz Prusak Date: Mon, 16 Dec 2024 11:46:38 +0100 Subject: [PATCH 4/6] fixed test --- node/components/bft/src/chonky_bft/tests/new_view.rs | 5 +++-- node/components/bft/src/testonly/node.rs | 3 +-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/node/components/bft/src/chonky_bft/tests/new_view.rs b/node/components/bft/src/chonky_bft/tests/new_view.rs index 5cc134da..ad1086dd 100644 --- a/node/components/bft/src/chonky_bft/tests/new_view.rs +++ b/node/components/bft/src/chonky_bft/tests/new_view.rs @@ -141,11 +141,12 @@ async fn replica_new_view_old() { zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); scope::run!(ctx, |ctx, s| async { - let (mut util, runner) = UTHarness::new(ctx, 1).await; + let (mut util, runner) = UTHarness::new(ctx, 2).await; s.spawn_bg(runner.run(ctx)); let replica_new_view = util.new_replica_new_view(ctx).await; - let replica_new_view = util.owner_key().sign_msg(replica_new_view); + // Sign the messages with non-leader key. + let replica_new_view = util.keys[1].sign_msg(replica_new_view); // Process new_view twice. The second time it shouldn't be accepted. util.process_replica_new_view(ctx, replica_new_view.clone()) diff --git a/node/components/bft/src/testonly/node.rs b/node/components/bft/src/testonly/node.rs index 25030ed4..f4b9459d 100644 --- a/node/components/bft/src/testonly/node.rs +++ b/node/components/bft/src/testonly/node.rs @@ -1,8 +1,7 @@ use crate::{testonly, FromNetworkMessage, PayloadManager, ToNetworkMessage}; use anyhow::Context as _; use std::sync::Arc; -use zksync_concurrency::time; -use zksync_concurrency::{ctx, ctx::channel, scope, sync}; +use zksync_concurrency::{ctx, ctx::channel, scope, sync, time}; use zksync_consensus_network as network; use zksync_consensus_storage as storage; use zksync_consensus_storage::testonly::in_memory; From b2bee8ae02bc58589c82935e19fe25984cf4c95c Mon Sep 17 00:00:00 2001 From: Grzegorz Prusak Date: Mon, 16 Dec 2024 15:51:25 +0100 Subject: [PATCH 5/6] Update node/tools/src/main.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bruno França --- node/tools/src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/tools/src/main.rs b/node/tools/src/main.rs index b0b522f3..6987846f 100644 --- a/node/tools/src/main.rs +++ b/node/tools/src/main.rs @@ -20,7 +20,7 @@ struct Cli { #[arg(long, conflicts_with = "config_path")] config: Option, /// Path to the rocksdb database of the node. - /// If not provided, an inmemory database will be used instead. + /// If not provided, an in-memory database will be used instead. #[arg(long)] database: Option, } From 5e46e1808d5bf606fc7e2993a61acb03313c6f3f Mon Sep 17 00:00:00 2001 From: Grzegorz Prusak Date: Mon, 16 Dec 2024 15:51:39 +0100 Subject: [PATCH 6/6] Update node/components/bft/src/chonky_bft/new_view.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bruno França --- node/components/bft/src/chonky_bft/new_view.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/components/bft/src/chonky_bft/new_view.rs b/node/components/bft/src/chonky_bft/new_view.rs index fe3c3b60..5bb7c6f3 100644 --- a/node/components/bft/src/chonky_bft/new_view.rs +++ b/node/components/bft/src/chonky_bft/new_view.rs @@ -133,7 +133,7 @@ impl StateMachine { // In particular it is not guaranteed that the leader has the finalized block when // sending the NewView, so it might need to wait for the finalized block. // - // Note that for this process to work e2e, the replicas should NOT ignore th NewView from + // Note that for this process to work e2e, the replicas should NOT ignore the NewView from // the leader, even if they already advanced to the given view. // Note that the order of NewView and proposal messages doesn't matter, because // proposal is a superset of NewView message.