diff --git a/infrastructure/loadtests/ansible/playbook.yml b/infrastructure/loadtests/ansible/playbook.yml
index 46b87413..9864774a 100644
--- a/infrastructure/loadtests/ansible/playbook.yml
+++ b/infrastructure/loadtests/ansible/playbook.yml
@@ -86,7 +86,7 @@
         mode: "0755"
 
     - name: Start executor binary in background
-      shell: "nohup ./executor > /var/log/zksync-bft.log 2>&1 &"
+      shell: "nohup ./executor --database=./database > /var/log/zksync-bft.log 2>&1 &"
       args:
         chdir: /etc/zksync-bft/
      environment:
diff --git a/node/components/bft/src/chonky_bft/new_view.rs b/node/components/bft/src/chonky_bft/new_view.rs
index ff99f69e..5bb7c6f3 100644
--- a/node/components/bft/src/chonky_bft/new_view.rs
+++ b/node/components/bft/src/chonky_bft/new_view.rs
@@ -55,6 +55,18 @@ impl StateMachine {
         let message = &signed_message.msg;
         let author = &signed_message.key;
 
+        // If the replica is already in this view (and the message is NOT from the leader), then ignore it.
+        // See `start_new_view()` for an explanation of why we need to process the message from the
+        // leader.
+        if message.view().number < self.view_number
+            || (message.view().number == self.view_number
+                && author != &self.config.genesis().view_leader(self.view_number))
+        {
+            return Err(Error::Old {
+                current_view: self.view_number,
+            });
+        }
+
         // Check that the message signer is in the validator committee.
         if !self.config.genesis().validators.contains(author) {
             return Err(Error::NonValidatorSigner {
@@ -62,13 +74,6 @@ impl StateMachine {
             });
         }
 
-        // If the message is from a past view, ignore it.
-        if message.view().number < self.view_number {
-            return Err(Error::Old {
-                current_view: self.view_number,
-            });
-        }
-
         // ----------- Checking the signed part of the message --------------
 
         // Check the signature on the message.
@@ -118,9 +123,23 @@ impl StateMachine {
     ) -> ctx::Result<()> {
         // Update the state machine.
         self.view_number = view;
+        metrics::METRICS.replica_view_number.set(self.view_number.0);
         self.phase = validator::Phase::Prepare;
+        // It is important that the proposal and new_view messages from the leader
+        // contain the same justification.
+        // A proposal cannot be produced before the previous block is processed,
+        // therefore the leader needs to make sure that the high_commit_qc is delivered
+        // to all replicas, so that the finalized block is distributed over the network.
+        // In particular, it is not guaranteed that the leader has the finalized block when
+        // sending the NewView, so it might need to wait for the finalized block.
+        //
+        // Note that for this process to work end-to-end, the replicas should NOT ignore the NewView from
+        // the leader, even if they have already advanced to the given view.
+        // Note that the order of NewView and proposal messages doesn't matter, because
+        // the proposal is a superset of the NewView message.
+        let justification = self.get_justification();
         self.proposer_sender
-            .send(Some(self.get_justification()))
+            .send(Some(justification.clone()))
             .expect("justification_watch.send() failed");
 
         // Clear the block proposal cache.
@@ -139,7 +158,7 @@ impl StateMachine {
                 .secret_key
                 .sign_msg(validator::ConsensusMsg::ReplicaNewView(
                     validator::ReplicaNewView {
-                        justification: self.get_justification(),
+                        justification: justification.clone(),
                     },
                 )),
         };
@@ -147,7 +166,6 @@ impl StateMachine {
         // Log the event and update the metrics.
         tracing::info!("Starting view {}", self.view_number);
-        metrics::METRICS.replica_view_number.set(self.view_number.0);
         let now = ctx.now();
         metrics::METRICS
             .view_latency
diff --git a/node/components/bft/src/chonky_bft/testonly.rs b/node/components/bft/src/chonky_bft/testonly.rs
index 948369c1..50bdc169 100644
--- a/node/components/bft/src/chonky_bft/testonly.rs
+++ b/node/components/bft/src/chonky_bft/testonly.rs
@@ -150,9 +150,15 @@ impl UTHarness {
         self.try_recv().unwrap().msg
     }
 
-    pub(crate) async fn new_replica_new_view(&self) -> validator::ReplicaNewView {
-        let justification = self.replica.get_justification();
-        validator::ReplicaNewView { justification }
+    pub(crate) async fn new_replica_new_view(
+        &mut self,
+        ctx: &ctx::Ctx,
+    ) -> validator::ReplicaNewView {
+        validator::ReplicaNewView {
+            justification: validator::ProposalJustification::Timeout(
+                self.new_timeout_qc(ctx).await,
+            ),
+        }
     }
 
     pub(crate) async fn new_commit_qc(
@@ -169,19 +175,14 @@ impl UTHarness {
         qc
     }
 
-    // #[allow(dead_code)]
-    // pub(crate) fn new_timeout_qc(
-    //     &mut self,
-    //     mutate_fn: impl FnOnce(&mut validator::ReplicaTimeout),
-    // ) -> validator::TimeoutQC {
-    //     let mut msg = self.new_replica_timeout();
-    //     mutate_fn(&mut msg);
-    //     let mut qc = validator::TimeoutQC::new(msg.view.clone());
-    //     for key in &self.keys {
-    //         qc.add(&key.sign_msg(msg.clone()), self.genesis()).unwrap();
-    //     }
-    //     qc
-    // }
+    pub(crate) async fn new_timeout_qc(&mut self, ctx: &ctx::Ctx) -> validator::TimeoutQC {
+        let msg = self.new_replica_timeout(ctx).await;
+        let mut qc = validator::TimeoutQC::new(msg.view);
+        for key in &self.keys {
+            qc.add(&key.sign_msg(msg.clone()), self.genesis()).unwrap();
+        }
+        qc
+    }
 
     pub(crate) async fn process_leader_proposal(
         &mut self,
diff --git a/node/components/bft/src/chonky_bft/tests/new_view.rs b/node/components/bft/src/chonky_bft/tests/new_view.rs
index 7f1f6550..ad1086dd 100644
--- a/node/components/bft/src/chonky_bft/tests/new_view.rs
+++ b/node/components/bft/src/chonky_bft/tests/new_view.rs
@@ -117,7 +117,7 @@ async fn new_view_non_validator_signer() {
         let (mut util, runner) = UTHarness::new(ctx, 1).await;
         s.spawn_bg(runner.run(ctx));
-        let replica_new_view = util.new_replica_new_view().await;
+        let replica_new_view = util.new_replica_new_view(ctx).await;
         let non_validator_key: validator::SecretKey = ctx.rng().gen();
         let res = util
             .process_replica_new_view(ctx, non_validator_key.sign_msg(replica_new_view))
             .await;
@@ -141,13 +141,19 @@ async fn replica_new_view_old() {
     zksync_concurrency::testonly::abort_on_panic();
     let ctx = &ctx::test_root(&ctx::RealClock);
     scope::run!(ctx, |ctx, s| async {
-        let (mut util, runner) = UTHarness::new(ctx, 1).await;
+        let (mut util, runner) = UTHarness::new(ctx, 2).await;
         s.spawn_bg(runner.run(ctx));
 
-        let replica_new_view = util.new_replica_new_view().await;
-        util.produce_block(ctx).await;
+        let replica_new_view = util.new_replica_new_view(ctx).await;
+        // Sign the message with a non-leader key.
+        let replica_new_view = util.keys[1].sign_msg(replica_new_view);
+
+        // Process new_view twice. The second time it shouldn't be accepted.
+        util.process_replica_new_view(ctx, replica_new_view.clone())
+            .await
+            .unwrap();
         let res = util
-            .process_replica_new_view(ctx, util.owner_key().sign_msg(replica_new_view))
+            .process_replica_new_view(ctx, replica_new_view.clone())
             .await;
 
         assert_matches!(
@@ -171,7 +177,7 @@ async fn new_view_invalid_sig() {
         let (mut util, runner) = UTHarness::new(ctx, 1).await;
         s.spawn_bg(runner.run(ctx));
 
-        let msg = util.new_replica_new_view().await;
+        let msg = util.new_replica_new_view(ctx).await;
         let mut replica_new_view = util.owner_key().sign_msg(msg);
         replica_new_view.sig = ctx.rng().gen();
 
diff --git a/node/components/bft/src/testonly/node.rs b/node/components/bft/src/testonly/node.rs
index 25030ed4..f4b9459d 100644
--- a/node/components/bft/src/testonly/node.rs
+++ b/node/components/bft/src/testonly/node.rs
@@ -1,8 +1,7 @@
 use crate::{testonly, FromNetworkMessage, PayloadManager, ToNetworkMessage};
 use anyhow::Context as _;
 use std::sync::Arc;
-use zksync_concurrency::time;
-use zksync_concurrency::{ctx, ctx::channel, scope, sync};
+use zksync_concurrency::{ctx, ctx::channel, scope, sync, time};
 use zksync_consensus_network as network;
 use zksync_consensus_storage as storage;
 use zksync_consensus_storage::testonly::in_memory;
diff --git a/node/components/network/src/config.rs b/node/components/network/src/config.rs
index 973c3f91..4d2f2e60 100644
--- a/node/components/network/src/config.rs
+++ b/node/components/network/src/config.rs
@@ -14,16 +14,10 @@ pub struct RpcConfig {
     pub push_validator_addrs_rate: limiter::Rate,
     /// Max rate of sending/receiving push_block_store_state messages.
     pub push_block_store_state_rate: limiter::Rate,
-    /// Max rate of sending/receiving push_batch_store_state messages.
-    pub push_batch_store_state_rate: limiter::Rate,
     /// Max rate of sending/receiving `get_block` RPCs.
     pub get_block_rate: limiter::Rate,
-    /// Max rate of sending/receiving `get_batch` RPCs.
-    pub get_batch_rate: limiter::Rate,
     /// Timeout for the `get_block` RPC.
     pub get_block_timeout: Option<time::Duration>,
-    /// Timeout for the `get_batch` RPC.
-    pub get_batch_timeout: Option<time::Duration>,
     /// Max rate of sending/receiving consensus messages.
     pub consensus_rate: limiter::Rate,
     /// Max rate of sending/receiving PushBatchVotes RPCs.
@@ -39,22 +33,13 @@ impl Default for RpcConfig {
             },
             push_block_store_state_rate: limiter::Rate {
                 burst: 2,
-                refresh: time::Duration::milliseconds(300),
-            },
-            push_batch_store_state_rate: limiter::Rate {
-                burst: 2,
-                refresh: time::Duration::milliseconds(300),
+                refresh: time::Duration::milliseconds(200),
             },
             get_block_rate: limiter::Rate {
                 burst: 10,
                 refresh: time::Duration::milliseconds(100),
             },
-            get_batch_rate: limiter::Rate {
-                burst: 10,
-                refresh: time::Duration::milliseconds(100),
-            },
             get_block_timeout: Some(time::Duration::seconds(10)),
-            get_batch_timeout: Some(time::Duration::seconds(10)),
             consensus_rate: limiter::Rate {
                 burst: 10,
                 refresh: time::Duration::ZERO,
diff --git a/node/components/network/src/consensus/mod.rs b/node/components/network/src/consensus/mod.rs
index 8e912079..7c03b463 100644
--- a/node/components/network/src/consensus/mod.rs
+++ b/node/components/network/src/consensus/mod.rs
@@ -125,7 +125,7 @@ pub(crate) struct Network {
 impl rpc::Handler<rpc::consensus::Rpc> for &Network {
     /// Here we bound the buffering of incoming consensus messages.
     fn max_req_size(&self) -> usize {
-        self.gossip.cfg.max_block_size.saturating_add(kB)
+        self.gossip.cfg.max_block_size.saturating_add(100 * kB)
     }
 
     async fn handle(
diff --git a/node/libs/concurrency/src/ctx/clock.rs b/node/libs/concurrency/src/ctx/clock.rs
index 68138b34..de9ada2f 100644
--- a/node/libs/concurrency/src/ctx/clock.rs
+++ b/node/libs/concurrency/src/ctx/clock.rs
@@ -45,7 +45,9 @@ pub struct RealClock;
 impl RealClock {
     /// Current time according to the monotone clock.
     pub fn now(&self) -> time::Instant {
-        time::Instant::now()
+        // We use `now()` from tokio, so that `tokio::time::pause()`
+        // works in tests.
+        tokio::time::Instant::now().into_std().into()
     }
     /// Current time according to the system/walltime clock.
     pub fn now_utc(&self) -> time::Utc {
diff --git a/node/libs/storage/src/testonly/in_memory.rs b/node/libs/storage/src/testonly/in_memory.rs
index f6719463..55217aea 100644
--- a/node/libs/storage/src/testonly/in_memory.rs
+++ b/node/libs/storage/src/testonly/in_memory.rs
@@ -13,6 +13,7 @@ struct BlockStoreInner {
     genesis: validator::Genesis,
     persisted: sync::watch::Sender<BlockStoreState>,
     blocks: Mutex<VecDeque<validator::Block>>,
+    capacity: Option<usize>,
     pregenesis_blocks: HashMap<validator::BlockNumber, validator::PreGenesisBlock>,
 }
 
@@ -39,6 +40,7 @@ impl BlockStore {
             genesis: setup.genesis.clone(),
             persisted: sync::watch::channel(BlockStoreState { first, last: None }).0,
             blocks: Mutex::default(),
+            capacity: None,
             pregenesis_blocks: setup
                 .blocks
                 .iter()
@@ -50,6 +52,22 @@ impl BlockStore {
         }))
     }
 
+    /// New bounded storage. Old blocks get GC'ed once the storage reaches capacity.
+    pub fn bounded(
+        genesis: validator::Genesis,
+        first: validator::BlockNumber,
+        capacity: usize,
+    ) -> Self {
+        assert!(genesis.first_block <= first);
+        Self(Arc::new(BlockStoreInner {
+            genesis,
+            persisted: sync::watch::channel(BlockStoreState { first, last: None }).0,
+            blocks: Mutex::default(),
+            capacity: Some(capacity),
+            pregenesis_blocks: [].into(),
+        }))
+    }
+
     /// Truncates the storage to blocks `>=first`.
     pub fn truncate(&mut self, first: validator::BlockNumber) {
         let mut blocks = self.0.blocks.lock().unwrap();
@@ -114,9 +132,16 @@ impl PersistentBlockStore for BlockStore {
             // It may happen that a block gets fetched which is not needed any more.
             return Ok(());
         }
+        if let Some(c) = self.0.capacity {
+            if blocks.len() >= c {
+                blocks.pop_front();
+            }
+        }
         if block.number() > want {
             // Blocks should be stored in order though.
-            return Err(anyhow::anyhow!("got block {:?}, want {want:?}", block.number()).into());
+            return Err(
+                anyhow::format_err!("got block {:?}, want {want:?}", block.number()).into(),
+            );
         }
         self.0
             .persisted
diff --git a/node/tools/src/bin/localnet_config.rs b/node/tools/src/bin/localnet_config.rs
index a35841f4..51597798 100644
--- a/node/tools/src/bin/localnet_config.rs
+++ b/node/tools/src/bin/localnet_config.rs
@@ -76,7 +76,7 @@ fn main() -> anyhow::Result<()> {
                 .metrics_server_port
                 .map(|port| SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), port)),
             genesis: setup.genesis.clone(),
-            max_payload_size: 1000000,
+            max_payload_size: args.payload_size,
             view_timeout: time::Duration::milliseconds(2000),
             node_key: node_keys[i].clone(),
             validator_key: validator_keys.get(i).cloned(),
diff --git a/node/tools/src/config.rs b/node/tools/src/config.rs
index b8cef4b7..18c6d636 100644
--- a/node/tools/src/config.rs
+++ b/node/tools/src/config.rs
@@ -15,7 +15,9 @@ use zksync_consensus_crypto::{read_optional_text, read_required_text, Text, TextFmt};
 use zksync_consensus_executor::{self as executor, attestation};
 use zksync_consensus_network as network;
 use zksync_consensus_roles::{attester, node, validator};
-use zksync_consensus_storage::{BlockStore, BlockStoreRunner};
+use zksync_consensus_storage::{
+    testonly::in_memory, BlockStore, BlockStoreRunner, PersistentBlockStore, ReplicaStore,
+};
 use zksync_protobuf::{
     read_optional, read_optional_repr, read_required, required, ProtoFmt, ProtoRepr,
 };
@@ -251,7 +253,7 @@ pub struct DebugPage {
 
 #[derive(Debug)]
 pub struct Configs {
     pub app: App,
-    pub database: PathBuf,
+    pub database: Option<PathBuf>,
 }
 
@@ -259,8 +261,29 @@ impl Configs {
         &self,
         ctx: &ctx::Ctx,
     ) -> ctx::Result<(executor::Executor, BlockStoreRunner)> {
-        let store = store::RocksDB::open(self.app.genesis.clone(), &self.database).await?;
-        let (block_store, runner) = BlockStore::new(ctx, Box::new(store.clone())).await?;
+        struct Stores {
+            block: Box<dyn PersistentBlockStore>,
+            replica: Box<dyn ReplicaStore>,
+        }
+        let stores = if let Some(path) = &self.database {
+            let store = store::RocksDB::open(self.app.genesis.clone(), path).await?;
+            Stores {
+                block: Box::new(store.clone()),
+                replica: Box::new(store),
+            }
+        } else {
+            let block = in_memory::BlockStore::bounded(
+                self.app.genesis.clone(),
+                self.app.genesis.first_block,
+                200,
+            );
+            let replica = in_memory::ReplicaStore::default();
+            Stores {
+                block: Box::new(block),
+                replica: Box::new(replica),
+            }
+        };
+        let (block_store, runner) = BlockStore::new(ctx, stores.block).await?;
         let attestation = Arc::new(attestation::Controller::new(self.app.attester_key.clone()));
 
         let e = executor::Executor {
@@ -308,7 +331,7 @@ impl Configs {
             .as_ref()
             .map(|key| executor::Validator {
                 key: key.clone(),
-                replica_store: Box::new(store.clone()),
+                replica_store: stores.replica,
                 payload_manager: Box::new(bft::testonly::RandomPayload(
                     self.app.max_payload_size,
                 )),
diff --git a/node/tools/src/main.rs b/node/tools/src/main.rs
index a80bb6b9..6987846f 100644
--- a/node/tools/src/main.rs
+++ b/node/tools/src/main.rs
@@ -20,8 +20,9 @@ struct Cli {
     #[arg(long, conflicts_with = "config_path")]
     config: Option,
     /// Path to the rocksdb database of the node.
-    #[arg(long, default_value = "./database")]
-    database: PathBuf,
+    /// If not provided, an in-memory database will be used instead.
+    #[arg(long)]
+    database: Option<PathBuf>,
 }
 
 impl Cli {