From 7d5351625d5f8c018c97c5293050f87b69393bcf Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Bruno=20Fran=C3=A7a?=
Date: Wed, 30 Oct 2024 00:15:48 +0000
Subject: [PATCH] More unit tests.

---
 node/actors/bft/src/chonky_bft/mod.rs         |  13 +-
 node/actors/bft/src/chonky_bft/new_view.rs    |   4 +-
 node/actors/bft/src/chonky_bft/proposer.rs    |   4 +-
 node/actors/bft/src/chonky_bft/testonly.rs    |  38 +-
 .../actors/bft/src/chonky_bft/tests/commit.rs |   8 +-
 node/actors/bft/src/chonky_bft/tests/mod.rs   |  64 +++-
 .../bft/src/chonky_bft/tests/new_view.rs      | 204 +++++++++++
 .../bft/src/chonky_bft/tests/proposer.rs      |  40 +++
 .../bft/src/chonky_bft/tests/timeout.rs       |  10 +-
 node/actors/bft/src/chonky_bft/timeout.rs     |   2 +-
 node/actors/bft/src/lib.rs                    |  14 +-
 node/actors/bft/src/tests.rs                  | 334 ++++++------------
 12 files changed, 450 insertions(+), 285 deletions(-)
 create mode 100644 node/actors/bft/src/chonky_bft/tests/new_view.rs
 create mode 100644 node/actors/bft/src/chonky_bft/tests/proposer.rs

diff --git a/node/actors/bft/src/chonky_bft/mod.rs b/node/actors/bft/src/chonky_bft/mod.rs
index 37adaddc..2be467a4 100644
--- a/node/actors/bft/src/chonky_bft/mod.rs
+++ b/node/actors/bft/src/chonky_bft/mod.rs
@@ -21,7 +21,7 @@ pub(crate) mod proposer;
 pub(crate) mod timeout;
 
 #[cfg(test)]
-mod testonly;
+pub(crate) mod testonly;
 #[cfg(test)]
 mod tests;
 
@@ -35,9 +35,9 @@ pub(crate) struct StateMachine {
     pub(super) outbound_pipe: OutputSender,
     /// Pipe through which replica receives network requests.
     pub(crate) inbound_pipe: sync::prunable_mpsc::Receiver<ConsensusReq>,
-    /// The sender part of the justification watch. This is used to set the justification
-    /// and notify the proposer loop.
-    pub(crate) justification_watch: sync::watch::Sender<Option<validator::ProposalJustification>>,
+    /// The sender part of the proposer watch channel. This is used to notify the proposer loop
+    /// and send the needed justification.
+    pub(crate) proposer_pipe: sync::watch::Sender<Option<validator::ProposalJustification>>,
 
     /// The current view number.
     pub(crate) view_number: validator::ViewNumber,
@@ -83,6 +83,7 @@ impl StateMachine {
         ctx: &ctx::Ctx,
         config: Arc<Config>,
         outbound_pipe: OutputSender,
+        proposer_pipe: sync::watch::Sender<Option<validator::ProposalJustification>>,
     ) -> ctx::Result<(Self, sync::prunable_mpsc::Sender<ConsensusReq>)> {
         let backup = config.replica_store.state(ctx).await?;
 
@@ -99,12 +100,11 @@ impl StateMachine {
             StateMachine::inbound_selection_function,
         );
 
-        let (justification_sender, _) = sync::watch::channel(None);
-
         let this = Self {
             config,
             outbound_pipe,
             inbound_pipe: recv,
+            proposer_pipe,
             view_number: backup.view,
             phase: backup.phase,
             high_vote: backup.high_vote,
@@ -115,7 +115,6 @@ impl StateMachine {
             commit_qcs_cache: BTreeMap::new(),
             timeout_views_cache: BTreeMap::new(),
             timeout_qcs_cache: BTreeMap::new(),
-            justification_watch: justification_sender,
             timeout_deadline: time::Deadline::Finite(ctx.now() + Self::TIMEOUT_DURATION),
             phase_start: ctx.now(),
         };
diff --git a/node/actors/bft/src/chonky_bft/new_view.rs b/node/actors/bft/src/chonky_bft/new_view.rs
index e56a9d52..5d85af46 100644
--- a/node/actors/bft/src/chonky_bft/new_view.rs
+++ b/node/actors/bft/src/chonky_bft/new_view.rs
@@ -115,7 +115,9 @@ impl StateMachine {
         // Update the state machine.
         self.view_number = view;
         self.phase = validator::Phase::Prepare;
-        // TODO: Update the proposer channel.
+        self.proposer_pipe
+            .send(Some(self.get_justification()))
+            .expect("proposer_pipe.send() failed");
 
         // Clear the block proposal cache.
         if let Some(qc) = self.high_commit_qc.as_ref() {
diff --git a/node/actors/bft/src/chonky_bft/proposer.rs b/node/actors/bft/src/chonky_bft/proposer.rs
index 7d564265..e460ba6e 100644
--- a/node/actors/bft/src/chonky_bft/proposer.rs
+++ b/node/actors/bft/src/chonky_bft/proposer.rs
@@ -14,7 +14,7 @@ pub(crate) const PROPOSAL_CREATION_TIMEOUT: time::Duration = time::Duration::mil
 pub(crate) async fn run_proposer(
     ctx: &ctx::Ctx,
     cfg: Arc<Config>,
-    pipe: OutputSender,
+    outbound_pipe: OutputSender,
     mut justification_watch: sync::watch::Receiver<Option<validator::ProposalJustification>>,
 ) -> ctx::Result<()> {
     loop {
@@ -49,7 +49,7 @@ pub(crate) async fn run_proposer(
             .secret_key
             .sign_msg(validator::ConsensusMsg::LeaderProposal(proposal));
 
-        pipe.send(ConsensusInputMessage { message: msg }.into());
+        outbound_pipe.send(ConsensusInputMessage { message: msg }.into());
     }
 }
 
diff --git a/node/actors/bft/src/chonky_bft/testonly.rs b/node/actors/bft/src/chonky_bft/testonly.rs
index 0822ef2e..a8f512d2 100644
--- a/node/actors/bft/src/chonky_bft/testonly.rs
+++ b/node/actors/bft/src/chonky_bft/testonly.rs
@@ -6,8 +6,8 @@ use crate::{
 };
 use assert_matches::assert_matches;
 use std::sync::Arc;
-use zksync_concurrency::ctx;
 use zksync_concurrency::sync::prunable_mpsc;
+use zksync_concurrency::{ctx, sync};
 use zksync_consensus_network as network;
 use zksync_consensus_network::io::ConsensusReq;
 use zksync_consensus_roles::validator;
@@ -28,8 +28,9 @@ pub(crate) const MAX_PAYLOAD_SIZE: usize = 1000;
 pub(crate) struct UTHarness {
     pub(crate) replica: StateMachine,
     pub(crate) keys: Vec<validator::SecretKey>,
-    output_pipe: ctx::channel::UnboundedReceiver<OutputMessage>,
-    input_pipe: prunable_mpsc::Sender<ConsensusReq>,
+    pub(crate) outbound_pipe: ctx::channel::UnboundedReceiver<OutputMessage>,
+    pub(crate) inbound_pipe: prunable_mpsc::Sender<ConsensusReq>,
+    pub(crate) proposer_pipe: sync::watch::Receiver<Option<validator::ProposalJustification>>,
 }
 
 impl UTHarness {
@@ -63,6 +64,7 @@ impl UTHarness {
         let setup = validator::testonly::Setup::new(rng, num_validators);
         let store = TestMemoryStorage::new(ctx, &setup).await;
         let (send, recv) = ctx::channel::unbounded();
+        let (proposer_sender, proposer_receiver) = sync::watch::channel(None);
         let cfg = Arc::new(Config {
             secret_key: setup.validator_keys[0].clone(),
             payload_manager,
             max_payload_size: MAX_PAYLOAD_SIZE,
         });
-        let (replica, input_pipe) = StateMachine::start(ctx, cfg.clone(), send.clone())
-            .await
-            .unwrap();
+        let (replica, input_pipe) =
+            StateMachine::start(ctx, cfg.clone(), send.clone(), proposer_sender)
+                .await
+                .unwrap();
         let mut this = UTHarness {
             replica,
             keys: setup.validator_keys.clone(),
-            output_pipe: recv,
-            input_pipe,
+            outbound_pipe: recv,
+            inbound_pipe: input_pipe,
+            proposer_pipe: proposer_receiver,
         };
         this.process_replica_timeout_all(ctx, this.new_replica_timeout())
             .await;
@@ -109,14 +113,6 @@ impl UTHarness {
         self.genesis().view_leader(view)
     }
 
-    pub(crate) fn set_owner_as_view_leader(&mut self) {
-        let mut view = self.replica.view_number;
-        while self.view_leader(view) != self.owner_key().public() {
-            view = view.next();
-        }
-        self.replica.view_number = view;
-    }
-
     pub(crate) fn genesis(&self) -> &validator::Genesis {
         self.replica.config.genesis()
     }
@@ -157,14 +153,14 @@ impl UTHarness {
     ) -> validator::CommitQC {
         let mut msg = self.new_replica_commit(ctx).await;
         mutate_fn(&mut msg);
-        let mut qc = validator::CommitQC::new(msg, self.genesis());
+        let mut qc = validator::CommitQC::new(msg.clone(), self.genesis());
         for key in &self.keys {
-            qc.add(&key.sign_msg(qc.message.clone()), self.genesis())
-                .unwrap();
+            qc.add(&key.sign_msg(msg.clone()), self.genesis()).unwrap();
         }
         qc
     }
 
+    #[allow(dead_code)]
     pub(crate) fn new_timeout_qc(
         &mut self,
         mutate_fn: impl FnOnce(&mut validator::ReplicaTimeout),
@@ -293,14 +289,14 @@ impl UTHarness {
     }
 
     pub(crate) fn send(&self, msg: validator::Signed<validator::ConsensusMsg>) {
-        self.input_pipe.send(ConsensusReq {
+        self.inbound_pipe.send(ConsensusReq {
             msg,
             ack: zksync_concurrency::oneshot::channel().0,
         });
     }
 
     fn try_recv<V: Variant<validator::Msg>>(&mut self) -> Option<validator::Signed<V>> {
-        self.output_pipe.try_recv().map(|message| match message {
+        self.outbound_pipe.try_recv().map(|message| match message {
             OutputMessage::Network(network::io::ConsensusInputMessage { message, .. }) => {
                 message.cast().unwrap()
             }
diff --git a/node/actors/bft/src/chonky_bft/tests/commit.rs b/node/actors/bft/src/chonky_bft/tests/commit.rs
index d02507e6..ccf418e0 100644
--- a/node/actors/bft/src/chonky_bft/tests/commit.rs
+++ b/node/actors/bft/src/chonky_bft/tests/commit.rs
@@ -67,8 +67,9 @@ async fn replica_commit_old() {
         let mut replica_commit = util.new_replica_commit(ctx).await;
         replica_commit.view.number = validator::ViewNumber(util.replica.view_number.0 - 1);
 
-        let replica_commit = util.owner_key().sign_msg(replica_commit);
-        let res = util.process_replica_commit(ctx, replica_commit).await;
+        let res = util
+            .process_replica_commit(ctx, util.owner_key().sign_msg(replica_commit))
+            .await;
 
         assert_matches!(
             res,
@@ -160,13 +161,12 @@ async fn commit_invalid_sig() {
 async fn commit_invalid_message() {
     zksync_concurrency::testonly::abort_on_panic();
     let ctx = &ctx::test_root(&ctx::RealClock);
-    let rng = &mut ctx.rng();
     scope::run!(ctx, |ctx, s| async {
         let (mut util, runner) = UTHarness::new(ctx, 1).await;
         s.spawn_bg(runner.run(ctx));
 
         let mut replica_commit = util.new_replica_commit(ctx).await;
-        replica_commit.view.genesis = rng.gen();
+        replica_commit.view.genesis = ctx.rng().gen();
 
         let res = util
             .process_replica_commit(ctx, util.owner_key().sign_msg(replica_commit))
             .await
diff --git a/node/actors/bft/src/chonky_bft/tests/mod.rs b/node/actors/bft/src/chonky_bft/tests/mod.rs
index 1cd17292..6780afde 100644
--- a/node/actors/bft/src/chonky_bft/tests/mod.rs
+++ b/node/actors/bft/src/chonky_bft/tests/mod.rs
@@ -3,7 +3,9 @@ use zksync_concurrency::{ctx, scope};
 use zksync_consensus_roles::validator;
 
 mod commit;
+mod new_view;
 mod proposal;
+mod proposer;
 mod timeout;
 
 /// Sanity check of the happy path.
@@ -42,25 +44,16 @@ async fn block_production_timeout() {
 
 /// Sanity check of block production with reproposal.
#[tokio::test] -async fn reproposal_block_production() { +async fn block_production_timeout_reproposal() { zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); scope::run!(ctx, |ctx, s| async { let (mut util, runner) = UTHarness::new_many(ctx).await; s.spawn_bg(runner.run(ctx)); - let proposal = util.new_leader_proposal(ctx).await; - let replica_commit = util - .process_leader_proposal(ctx, util.leader_key().sign_msg(proposal.clone())) - .await - .unwrap() - .msg; + let replica_commit = util.new_replica_commit(ctx).await; + let mut timeout = util.new_replica_timeout(); - let mut timeout = validator::ReplicaTimeout { - view: replica_commit.view.clone(), - high_vote: Some(replica_commit.clone()), - high_qc: util.replica.high_commit_qc.clone(), - }; for i in 0..util.genesis().validators.subquorum_threshold() as usize { util.process_replica_timeout(ctx, util.keys[i].sign_msg(timeout.clone())) .await @@ -85,3 +78,50 @@ async fn reproposal_block_production() { .await .unwrap(); } + +/// Testing liveness after the network becomes idle with replica in commit phase. +#[tokio::test] +async fn block_production_timeout_in_commit() { + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new_many(ctx).await; + s.spawn_bg(runner.run(ctx)); + + util.new_replica_commit(ctx).await; + + // Replica is in `Phase::Commit`, but should still accept messages from newer views. + assert_eq!(util.replica.phase, validator::Phase::Commit); + util.produce_block_after_timeout(ctx).await; + + Ok(()) + }) + .await + .unwrap(); +} + +/// Testing liveness after the network becomes idle with replica having some cached commit messages for the current view. +#[tokio::test] +async fn block_production_timeout_some_commits() { + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new_many(ctx).await; + s.spawn_bg(runner.run(ctx)); + + let replica_commit = util.new_replica_commit(ctx).await; + assert!(util + .process_replica_commit(ctx, util.owner_key().sign_msg(replica_commit)) + .await + .unwrap() + .is_none()); + + // Replica is in `Phase::Commit`, but should still accept prepares from newer views. 
+ assert_eq!(util.replica.phase, validator::Phase::Commit); + util.produce_block_after_timeout(ctx).await; + + Ok(()) + }) + .await + .unwrap(); +} diff --git a/node/actors/bft/src/chonky_bft/tests/new_view.rs b/node/actors/bft/src/chonky_bft/tests/new_view.rs new file mode 100644 index 00000000..7f1f6550 --- /dev/null +++ b/node/actors/bft/src/chonky_bft/tests/new_view.rs @@ -0,0 +1,204 @@ +use crate::chonky_bft::{new_view, testonly::UTHarness}; +use assert_matches::assert_matches; +use rand::Rng; +use zksync_concurrency::{ctx, scope}; +use zksync_consensus_roles::validator; + +#[tokio::test] +async fn new_view_sanity() { + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new_many(ctx).await; + s.spawn_bg(runner.run(ctx)); + + let commit_1 = validator::ReplicaCommit { + view: util.view().next(), + proposal: validator::BlockHeader { + number: validator::BlockNumber(1), + payload: ctx.rng().gen(), + }, + }; + let mut commit_qc_1 = validator::CommitQC::new(commit_1.clone(), util.genesis()); + for key in &util.keys { + commit_qc_1 + .add(&key.sign_msg(commit_1.clone()), util.genesis()) + .unwrap(); + } + let new_view_1 = validator::ReplicaNewView { + justification: validator::ProposalJustification::Commit(commit_qc_1.clone()), + }; + + let commit_2 = validator::ReplicaCommit { + view: commit_1.view.next(), + proposal: validator::BlockHeader { + number: commit_1.proposal.number.next(), + payload: ctx.rng().gen(), + }, + }; + let mut commit_qc_2 = validator::CommitQC::new(commit_2.clone(), util.genesis()); + for key in &util.keys { + commit_qc_2 + .add(&key.sign_msg(commit_2.clone()), util.genesis()) + .unwrap(); + } + let new_view_2 = validator::ReplicaNewView { + justification: validator::ProposalJustification::Commit(commit_qc_2.clone()), + }; + + let timeout = validator::ReplicaTimeout { + view: commit_2.view.next(), + high_vote: None, + high_qc: Some(commit_qc_2.clone()), + }; + let mut timeout_qc = validator::TimeoutQC::new(timeout.view); + for key in &util.keys { + timeout_qc + .add(&key.sign_msg(timeout.clone()), util.genesis()) + .unwrap(); + } + let new_view_3 = validator::ReplicaNewView { + justification: validator::ProposalJustification::Timeout(timeout_qc.clone()), + }; + + // Check that first new view with commit QC updates the view and high commit QC. + let res = util + .process_replica_new_view(ctx, util.owner_key().sign_msg(new_view_1.clone())) + .await + .unwrap() + .unwrap() + .msg; + assert_eq!(util.view(), new_view_1.view()); + assert_matches!(res.justification, validator::ProposalJustification::Commit(qc) => { + assert_eq!(util.replica.high_commit_qc.clone().unwrap(), qc); + }); + + // Check that the third new view with timeout QC updates the view, high timeout QC and high commit QC. + let res = util + .process_replica_new_view(ctx, util.owner_key().sign_msg(new_view_3.clone())) + .await + .unwrap() + .unwrap() + .msg; + assert_eq!(util.view(), new_view_3.view()); + assert_matches!(res.justification, validator::ProposalJustification::Timeout(qc) => { + assert_eq!(util.replica.high_timeout_qc.clone().unwrap(), qc); + assert_eq!(util.replica.high_commit_qc.clone().unwrap(), qc.high_qc().unwrap().clone()); + }); + + // Check that the second new view with commit QC is ignored and doesn't affect the state. 
+ let res = util + .process_replica_new_view(ctx, util.owner_key().sign_msg(new_view_2.clone())) + .await; + assert_eq!(util.view(), new_view_3.view()); + assert_eq!(util.replica.high_timeout_qc.clone().unwrap(), timeout_qc); + assert_eq!( + util.replica.high_commit_qc.clone().unwrap(), + timeout_qc.high_qc().unwrap().clone() + ); + assert_matches!( + res, + Err(new_view::Error::Old { current_view }) => { + assert_eq!(current_view, util.replica.view_number); + } + ); + + Ok(()) + }) + .await + .unwrap(); +} + +#[tokio::test] +async fn new_view_non_validator_signer() { + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 1).await; + s.spawn_bg(runner.run(ctx)); + + let replica_new_view = util.new_replica_new_view().await; + let non_validator_key: validator::SecretKey = ctx.rng().gen(); + let res = util + .process_replica_new_view(ctx, non_validator_key.sign_msg(replica_new_view)) + .await; + + assert_matches!( + res, + Err(new_view::Error::NonValidatorSigner { signer }) => { + assert_eq!(*signer, non_validator_key.public()); + } + ); + + Ok(()) + }) + .await + .unwrap(); +} + +#[tokio::test] +async fn replica_new_view_old() { + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 1).await; + s.spawn_bg(runner.run(ctx)); + + let replica_new_view = util.new_replica_new_view().await; + util.produce_block(ctx).await; + let res = util + .process_replica_new_view(ctx, util.owner_key().sign_msg(replica_new_view)) + .await; + + assert_matches!( + res, + Err(new_view::Error::Old { current_view }) => { + assert_eq!(current_view, util.replica.view_number); + } + ); + + Ok(()) + }) + .await + .unwrap(); +} + +#[tokio::test] +async fn new_view_invalid_sig() { + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 1).await; + s.spawn_bg(runner.run(ctx)); + + let msg = util.new_replica_new_view().await; + let mut replica_new_view = util.owner_key().sign_msg(msg); + replica_new_view.sig = ctx.rng().gen(); + + let res = util.process_replica_new_view(ctx, replica_new_view).await; + assert_matches!(res, Err(new_view::Error::InvalidSignature(..))); + + Ok(()) + }) + .await + .unwrap(); +} + +#[tokio::test] +async fn new_view_invalid_message() { + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 1).await; + s.spawn_bg(runner.run(ctx)); + + let res = util + .process_replica_new_view(ctx, util.owner_key().sign_msg(ctx.rng().gen())) + .await; + assert_matches!(res, Err(new_view::Error::InvalidMessage(_))); + + Ok(()) + }) + .await + .unwrap(); +} diff --git a/node/actors/bft/src/chonky_bft/tests/proposer.rs b/node/actors/bft/src/chonky_bft/tests/proposer.rs new file mode 100644 index 00000000..521a74d3 --- /dev/null +++ b/node/actors/bft/src/chonky_bft/tests/proposer.rs @@ -0,0 +1,40 @@ +use crate::chonky_bft::{self, commit, testonly::UTHarness}; +use anyhow::{anyhow, Context}; +use assert_matches::assert_matches; +use rand::Rng; +use zksync_concurrency::{ctx, error::Wrap, scope, sync}; +use zksync_consensus_roles::validator; + +// TODO +// /// Sanity check of the happy path. 
+// #[tokio::test] +// async fn proposer_sanity() { +// zksync_concurrency::testonly::abort_on_panic(); +// let ctx = &ctx::test_root(&ctx::RealClock); +// scope::run!(ctx, |ctx, s| async { +// let (mut util, runner) = UTHarness::new_many(ctx).await; +// let cfg = util.replica.config.clone(); +// let outbound_pipe = util.replica.outbound_pipe.clone(); +// //let proposer_pipe = util.proposer_pipe.clone(); +// let (proposer_sender, proposer_receiver) = sync::watch::channel(None); + +// s.spawn_bg(runner.run(ctx)); +// s.spawn_bg(async { +// let res = +// chonky_bft::proposer::run_proposer(ctx, cfg, outbound_pipe, proposer_receiver) +// .await; + +// match res { +// Ok(()) => Ok(()), +// Err(ctx::Error::Internal(err)) => Err(err), +// Err(ctx::Error::Canceled(_)) => unreachable!(), +// } +// }); + +// //util.produce_block(ctx).await; + +// Ok(()) +// }) +// .await +// .unwrap(); +// } diff --git a/node/actors/bft/src/chonky_bft/tests/timeout.rs b/node/actors/bft/src/chonky_bft/tests/timeout.rs index c2b64ea3..6e4a284d 100644 --- a/node/actors/bft/src/chonky_bft/tests/timeout.rs +++ b/node/actors/bft/src/chonky_bft/tests/timeout.rs @@ -64,8 +64,7 @@ async fn replica_timeout_old() { let mut replica_timeout = util.new_replica_timeout(); replica_timeout.view.number = validator::ViewNumber(util.replica.view_number.0 - 1); - let replica_timeout = util.owner_key().sign_msg(replica_timeout); - let res = util.process_replica_timeout(ctx, replica_timeout).await; + let res = util.process_replica_timeout(ctx, util.owner_key().sign_msg(replica_timeout)).await; assert_matches!( res, @@ -159,7 +158,6 @@ async fn timeout_invalid_sig() { async fn timeout_invalid_message() { zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); - let rng = &mut ctx.rng(); scope::run!(ctx, |ctx, s| async { let (mut util, runner) = UTHarness::new(ctx, 1).await; s.spawn_bg(runner.run(ctx)); @@ -167,7 +165,7 @@ async fn timeout_invalid_message() { let replica_timeout = util.new_replica_timeout(); let mut bad_replica_timeout = replica_timeout.clone(); - bad_replica_timeout.view.genesis = rng.gen(); + bad_replica_timeout.view.genesis = ctx.rng().gen(); let res = util .process_replica_timeout(ctx, util.owner_key().sign_msg(bad_replica_timeout)) .await; @@ -179,7 +177,7 @@ async fn timeout_invalid_message() { ); let mut bad_replica_timeout = replica_timeout.clone(); - bad_replica_timeout.high_vote = Some(rng.gen()); + bad_replica_timeout.high_vote = Some(ctx.rng().gen()); let res = util .process_replica_timeout(ctx, util.owner_key().sign_msg(bad_replica_timeout)) .await; @@ -191,7 +189,7 @@ async fn timeout_invalid_message() { ); let mut bad_replica_timeout = replica_timeout.clone(); - bad_replica_timeout.high_qc = Some(rng.gen()); + bad_replica_timeout.high_qc = Some(ctx.rng().gen()); let res = util .process_replica_timeout(ctx, util.owner_key().sign_msg(bad_replica_timeout)) .await; diff --git a/node/actors/bft/src/chonky_bft/timeout.rs b/node/actors/bft/src/chonky_bft/timeout.rs index 3884e947..d01a9d2b 100644 --- a/node/actors/bft/src/chonky_bft/timeout.rs +++ b/node/actors/bft/src/chonky_bft/timeout.rs @@ -187,7 +187,7 @@ impl StateMachine { tracing::info!("Timed out at view {}", self.view_number); metrics::METRICS.replica_view_number.set(self.view_number.0); - // Reset the timeout. This allows us send more timeout messages until the consensus progresses. + // Reset the timeout. This makes us keep sending timeout messages until the consensus progresses. 
// However, this isn't strictly necessary since the network retries messages until they are delivered. // This is just an extra safety measure. self.timeout_deadline = time::Deadline::Finite(ctx.now() + Self::TIMEOUT_DURATION); diff --git a/node/actors/bft/src/lib.rs b/node/actors/bft/src/lib.rs index b00f227a..3d42e449 100644 --- a/node/actors/bft/src/lib.rs +++ b/node/actors/bft/src/lib.rs @@ -5,7 +5,7 @@ use anyhow::Context; pub use config::Config; use std::sync::Arc; use tracing::Instrument; -use zksync_concurrency::{ctx, error::Wrap as _, scope}; +use zksync_concurrency::{ctx, error::Wrap as _, scope, sync}; use zksync_consensus_roles::validator; use zksync_consensus_utils::pipe::ActorPipe; @@ -14,8 +14,8 @@ mod config; pub mod io; mod metrics; pub mod testonly; -//#[cfg(test)] -//mod tests; +#[cfg(test)] +mod tests; /// Protocol version of this BFT implementation. pub const PROTOCOL_VERSION: validator::ProtocolVersion = validator::ProtocolVersion::CURRENT; @@ -61,15 +61,15 @@ impl Config { } let cfg = Arc::new(self); + let (proposer_sender, proposer_receiver) = sync::watch::channel(None); let (replica, replica_send) = - chonky_bft::StateMachine::start(ctx, cfg.clone(), pipe.send.clone()).await?; + chonky_bft::StateMachine::start(ctx, cfg.clone(), pipe.send.clone(), proposer_sender) + .await?; let res = scope::run!(ctx, |ctx, s| async { - let justification_recv = replica.justification_watch.subscribe(); - s.spawn_bg(async { replica.run(ctx).await.wrap("replica.run()") }); s.spawn_bg(async { - chonky_bft::proposer::run_proposer(ctx, cfg.clone(), pipe.send, justification_recv) + chonky_bft::proposer::run_proposer(ctx, cfg.clone(), pipe.send, proposer_receiver) .await .wrap("run_proposer()") }); diff --git a/node/actors/bft/src/tests.rs b/node/actors/bft/src/tests.rs index 1a53c258..dfc63f52 100644 --- a/node/actors/bft/src/tests.rs +++ b/node/actors/bft/src/tests.rs @@ -1,7 +1,9 @@ -use crate::testonly::{ - twins::{Cluster, HasKey, ScenarioGenerator, Twin}, - ut_harness::UTHarness, - Behavior, Network, Port, PortRouter, PortSplitSchedule, Test, TestError, NUM_PHASES, +use crate::{ + chonky_bft::testonly::UTHarness, + testonly::{ + twins::{Cluster, HasKey, ScenarioGenerator, Twin}, + Behavior, Network, Port, PortRouter, PortSplitSchedule, Test, TestError, NUM_PHASES, + }, }; use assert_matches::assert_matches; use std::collections::HashMap; @@ -48,122 +50,6 @@ async fn offline_real_network() { run_test(Behavior::Offline, Network::Real).await } -/// Testing liveness after the network becomes idle with leader having no cached prepare messages for the current view. -#[tokio::test] -async fn timeout_leader_no_prepares() { - zksync_concurrency::testonly::abort_on_panic(); - let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx, s| async { - let (mut util, runner) = UTHarness::new_many(ctx).await; - s.spawn_bg(runner.run(ctx)); - util.new_replica_timeout(); - util.produce_block_after_timeout(ctx).await; - Ok(()) - }) - .await - .unwrap(); -} - -/// Testing liveness after the network becomes idle with leader having some cached prepare messages for the current view. 
-#[tokio::test] -async fn timeout_leader_some_prepares() { - zksync_concurrency::testonly::abort_on_panic(); - let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx, s| async { - let (mut util, runner) = UTHarness::new_many(ctx).await; - s.spawn_bg(runner.run(ctx)); - let replica_prepare = util.new_replica_timeout(); - assert!(util - .process_replica_prepare(ctx, util.sign(replica_prepare)) - .await - .unwrap() - .is_none()); - util.produce_block_after_timeout(ctx).await; - Ok(()) - }) - .await - .unwrap(); -} - -/// Testing liveness after the network becomes idle with leader in commit phase. -#[tokio::test] -async fn timeout_leader_in_commit() { - zksync_concurrency::testonly::abort_on_panic(); - let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx, s| async { - let (mut util, runner) = UTHarness::new_many(ctx).await; - s.spawn_bg(runner.run(ctx)); - - util.new_leader_proposal(ctx).await; - // Leader is in `Phase::Commit`, but should still accept prepares from newer views. - assert_eq!(util.leader.phase, validator::Phase::Commit); - util.produce_block_after_timeout(ctx).await; - Ok(()) - }) - .await - .unwrap(); -} - -/// Testing liveness after the network becomes idle with replica in commit phase. -#[tokio::test] -async fn timeout_replica_in_commit() { - zksync_concurrency::testonly::abort_on_panic(); - let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx, s| async { - let (mut util, runner) = UTHarness::new_many(ctx).await; - s.spawn_bg(runner.run(ctx)); - - util.new_replica_commit_from_proposal(ctx).await; - // Leader is in `Phase::Commit`, but should still accept prepares from newer views. - assert_eq!(util.leader.phase, validator::Phase::Commit); - util.produce_block_after_timeout(ctx).await; - Ok(()) - }) - .await - .unwrap(); -} - -/// Testing liveness after the network becomes idle with leader having some cached commit messages for the current view. -#[tokio::test] -async fn timeout_leader_some_commits() { - zksync_concurrency::testonly::abort_on_panic(); - let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx, s| async { - let (mut util, runner) = UTHarness::new_many(ctx).await; - s.spawn_bg(runner.run(ctx)); - - let replica_commit = util.new_replica_commit_from_proposal(ctx).await; - assert!(util - .process_replica_commit(ctx, util.sign(replica_commit)) - .await - .unwrap() - .is_none()); - // Leader is in `Phase::Commit`, but should still accept prepares from newer views. - assert_eq!(util.leader_phase(), validator::Phase::Commit); - util.produce_block_after_timeout(ctx).await; - Ok(()) - }) - .await - .unwrap(); -} - -/// Testing liveness after the network becomes idle with leader in a consecutive prepare phase. -#[tokio::test] -async fn timeout_leader_in_consecutive_prepare() { - zksync_concurrency::testonly::abort_on_panic(); - let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx, s| async { - let (mut util, runner) = UTHarness::new_many(ctx).await; - s.spawn_bg(runner.run(ctx)); - - util.new_leader_commit(ctx).await; - util.produce_block_after_timeout(ctx).await; - Ok(()) - }) - .await - .unwrap(); -} - /// Not being able to propose a block shouldn't cause a deadlock. 
#[tokio::test] async fn non_proposing_leader() { @@ -435,110 +321,110 @@ async fn run_twins( /// while some other validators have the payload but don't have the HighQC and cannot finalize the block, and therefore /// don't gossip it, which causes a deadlock unless the one with the HighQC moves on and broadcasts what they have, which /// should cause the others to finalize the block and gossip the payload to them in turn. -#[tokio::test] -async fn test_wait_for_finalized_deadlock() { - // These are the conditions for the deadlock to occur: - // * The problem happens in the handling of LeaderPrepare where the replica waits for the previous block in the justification. - // * For that the replica needs to receive a proposal from a leader that knows the previous block is finalized. - // * For that the leader needs to receive a finalized proposal from an earlier leader, but this proposal did not make it to the replica. - // * Both leaders need to die and never communicate the HighQC they know about to anybody else. - // * The replica has the HighQC but not the payload, and all other replicas might have the payload, but not the HighQC. - // * With two leaders down, and the replica deadlocked, we must lose quorum, so the other nodes cannot repropose the missing block either. - // * In order for 2 leaders to be dow and quorum still be possible, we need at least 11 nodes. - - // Here are a series of steps to reproduce the issue: - // 1. Say we have 11 nodes: [0,1,2,3,4,5,6,7,8,9,10], taking turns leading the views in that order; we need 9 nodes for quorum. The first view is view 1 lead by node 1. - // 2. Node 1 sends LeaderPropose with block 1 to nodes [1-9] and puts together a HighQC. - // 3. Node 1 sends the LeaderCommit to node 2, then dies. - // 4. Node 2 sends LeaderPropose with block 2 to nodes [0, 10], then dies. - // 5. Nodes [0, 10] get stuck processing LeaderPropose because they are waiting for block 1 to appear in their stores. - // 6. Node 3 cannot gather 9 ReplicaPrepare messages for a quorum because nodes [1,2] are down and [0,10] are blocking. Consensus stalls. - - // To simulate this with the Twins network we need to use a custom routing function, because the 2nd leader mustn't broadcast the HighQC - // to its peers, but it must receive their ReplicaPrepare's to be able to construct the PrepareQC; because of this the simple split schedule - // would not be enough as it allows sending messages in both directions. - - // We need 11 nodes so we can turn 2 leaders off. - let num_replicas = 11; - // Let's wait for the first two blocks to be finalised. - // Although theoretically node 1 will be dead after view 1, it will still receive messages and gossip. - let blocks_to_finalize = 2; - // We need more than 1 gossip peer, otherwise the chain of gossip triggers in the Twins network won't kick in, - // and while node 0 will gossip to node 1, node 1 will not send it to node 2, and the test will fail. - let gossip_peers = 2; - - run_with_custom_router( - num_replicas, - gossip_peers, - blocks_to_finalize, - |port_to_id| { - PortRouter::Custom(Box::new(move |msg, from, to| { - use validator::ConsensusMsg::*; - // Map ports back to logical node ID - let from = port_to_id[&from]; - let to = port_to_id[&to]; - let view_number = msg.view().number; - - // If we haven't finalised the blocks in the first few rounds, we failed. - if view_number.0 > 7 { - return None; - } - - // Sending to self is ok. - // If this wasn't here the test would pass even without adding a timeout in process_leader_prepare. 
- // The reason is that node 2 would move to view 2 as soon as it finalises block 1, but then timeout - // and move to view 3 before they receive any of the ReplicaPrepare from the others, who are still - // waiting to timeout in view 1. By sending ReplicaPrepare to itself it seems to wait or propose. - // Maybe the HighQC doesn't make it from its replica::StateMachine into its leader::StateMachine otherwise. - if from == to { - return Some(true); - } - - let can_send = match view_number { - ViewNumber(1) => { - match from { - // Current leader - 1 => match msg { - // Send the proposal to a subset of nodes - LeaderPrepare(_) => to != 0 && to != 10, - // Send the commit to the next leader only - LeaderCommit(_) => to == 2, - _ => true, - }, - // Replicas - _ => true, - } - } - ViewNumber(2) => match from { - // Previous leader is dead - 1 => false, - // Current leader - 2 => match msg { - // Don't send out the HighQC to the others - ReplicaPrepare(_) => false, - // Send the proposal to the ones which didn't get the previous one - LeaderPrepare(_) => to == 0 || to == 10, - _ => true, - }, - // Replicas - _ => true, - }, - // Previous leaders dead - _ => from != 1 && from != 2, - }; - - // eprintln!( - // "view={view_number} from={from} to={to} kind={} can_send={can_send}", - // msg.label() - // ); - - Some(can_send) - })) - }, - ) - .await - .unwrap(); -} +// #[tokio::test] +// async fn test_wait_for_finalized_deadlock() { +// // These are the conditions for the deadlock to occur: +// // * The problem happens in the handling of LeaderPrepare where the replica waits for the previous block in the justification. +// // * For that the replica needs to receive a proposal from a leader that knows the previous block is finalized. +// // * For that the leader needs to receive a finalized proposal from an earlier leader, but this proposal did not make it to the replica. +// // * Both leaders need to die and never communicate the HighQC they know about to anybody else. +// // * The replica has the HighQC but not the payload, and all other replicas might have the payload, but not the HighQC. +// // * With two leaders down, and the replica deadlocked, we must lose quorum, so the other nodes cannot repropose the missing block either. +// // * In order for 2 leaders to be dow and quorum still be possible, we need at least 11 nodes. + +// // Here are a series of steps to reproduce the issue: +// // 1. Say we have 11 nodes: [0,1,2,3,4,5,6,7,8,9,10], taking turns leading the views in that order; we need 9 nodes for quorum. The first view is view 1 lead by node 1. +// // 2. Node 1 sends LeaderPropose with block 1 to nodes [1-9] and puts together a HighQC. +// // 3. Node 1 sends the LeaderCommit to node 2, then dies. +// // 4. Node 2 sends LeaderPropose with block 2 to nodes [0, 10], then dies. +// // 5. Nodes [0, 10] get stuck processing LeaderPropose because they are waiting for block 1 to appear in their stores. +// // 6. Node 3 cannot gather 9 ReplicaPrepare messages for a quorum because nodes [1,2] are down and [0,10] are blocking. Consensus stalls. + +// // To simulate this with the Twins network we need to use a custom routing function, because the 2nd leader mustn't broadcast the HighQC +// // to its peers, but it must receive their ReplicaPrepare's to be able to construct the PrepareQC; because of this the simple split schedule +// // would not be enough as it allows sending messages in both directions. + +// // We need 11 nodes so we can turn 2 leaders off. 
+// let num_replicas = 11; +// // Let's wait for the first two blocks to be finalised. +// // Although theoretically node 1 will be dead after view 1, it will still receive messages and gossip. +// let blocks_to_finalize = 2; +// // We need more than 1 gossip peer, otherwise the chain of gossip triggers in the Twins network won't kick in, +// // and while node 0 will gossip to node 1, node 1 will not send it to node 2, and the test will fail. +// let gossip_peers = 2; + +// run_with_custom_router( +// num_replicas, +// gossip_peers, +// blocks_to_finalize, +// |port_to_id| { +// PortRouter::Custom(Box::new(move |msg, from, to| { +// use validator::ConsensusMsg::*; +// // Map ports back to logical node ID +// let from = port_to_id[&from]; +// let to = port_to_id[&to]; +// let view_number = msg.view().number; + +// // If we haven't finalised the blocks in the first few rounds, we failed. +// if view_number.0 > 7 { +// return None; +// } + +// // Sending to self is ok. +// // If this wasn't here the test would pass even without adding a timeout in process_leader_prepare. +// // The reason is that node 2 would move to view 2 as soon as it finalises block 1, but then timeout +// // and move to view 3 before they receive any of the ReplicaPrepare from the others, who are still +// // waiting to timeout in view 1. By sending ReplicaPrepare to itself it seems to wait or propose. +// // Maybe the HighQC doesn't make it from its replica::StateMachine into its leader::StateMachine otherwise. +// if from == to { +// return Some(true); +// } + +// let can_send = match view_number { +// ViewNumber(1) => { +// match from { +// // Current leader +// 1 => match msg { +// // Send the proposal to a subset of nodes +// LeaderPrepare(_) => to != 0 && to != 10, +// // Send the commit to the next leader only +// LeaderCommit(_) => to == 2, +// _ => true, +// }, +// // Replicas +// _ => true, +// } +// } +// ViewNumber(2) => match from { +// // Previous leader is dead +// 1 => false, +// // Current leader +// 2 => match msg { +// // Don't send out the HighQC to the others +// ReplicaPrepare(_) => false, +// // Send the proposal to the ones which didn't get the previous one +// LeaderPrepare(_) => to == 0 || to == 10, +// _ => true, +// }, +// // Replicas +// _ => true, +// }, +// // Previous leaders dead +// _ => from != 1 && from != 2, +// }; + +// // eprintln!( +// // "view={view_number} from={from} to={to} kind={} can_send={can_send}", +// // msg.label() +// // ); + +// Some(can_send) +// })) +// }, +// ) +// .await +// .unwrap(); +// } /// Run a test with the Twins network controlling exactly who can send to whom in each round. ///