More review fixes.
brunoffranca committed Nov 1, 2024
1 parent 3b85cb6 commit 77b1a0f
Showing 8 changed files with 98 additions and 54 deletions.
@@ -1,38 +1,9 @@
use super::StateMachine;
use std::cmp::max;
use zksync_concurrency::{ctx, error::Wrap as _};
use zksync_consensus_roles::validator;
use zksync_consensus_storage as storage;

impl StateMachine {
/// Makes a justification (for a ReplicaNewView or a LeaderProposal) based on the current state.
pub(crate) fn get_justification(&self) -> validator::ProposalJustification {
// We need some QC in order to be able to create a justification.
// In fact, it should be impossible to get here without a QC. Because
// we only get here after starting a new view, which requires a QC.
assert!(self.high_commit_qc.is_some() || self.high_timeout_qc.is_some());

// We use the highest QC as the justification. If both have the same view, we use the CommitQC.
if self.high_commit_qc.as_ref().map(|x| x.view())
>= self.high_timeout_qc.as_ref().map(|x| &x.view)
{
validator::ProposalJustification::Commit(self.high_commit_qc.clone().unwrap())
} else {
validator::ProposalJustification::Timeout(self.high_timeout_qc.clone().unwrap())
}
}

/// Processes a (already verified) CommitQC. It bumps the local high_commit_qc and if
/// we have the proposal corresponding to this qc, we save the corresponding block to DB.
pub(crate) async fn process_commit_qc(
&mut self,
ctx: &ctx::Ctx,
qc: &validator::CommitQC,
) -> ctx::Result<()> {
self.high_commit_qc = max(Some(qc.clone()), self.high_commit_qc.clone());
self.save_block(ctx, qc).await.wrap("save_block()")
}

/// Tries to build a finalized block from the given CommitQC. We simply search our
/// block proposal cache for the matching block, and if we find it we build the block.
/// If this method succeeds, it saves the finalized block to storage.
33 changes: 25 additions & 8 deletions node/actors/bft/src/chonky_bft/mod.rs
@@ -1,5 +1,6 @@
use crate::{io::OutputMessage, metrics, Config};
use std::{
cmp::max,
collections::{BTreeMap, HashMap},
sync::Arc,
};
@@ -13,8 +14,8 @@ use zksync_concurrency::{
use zksync_consensus_network::io::ConsensusReq;
use zksync_consensus_roles::validator::{self, ConsensusMsg};

mod block;
mod commit;
mod misc;
mod new_view;
mod proposal;
/// The proposer module contains the logic for the proposer role in ChonkyBFT.
@@ -25,6 +26,9 @@ pub(crate) mod testonly;
mod tests;
mod timeout;

/// The duration of the view timeout.
pub(crate) const VIEW_TIMEOUT_DURATION: time::Duration = time::Duration::milliseconds(2000);

/// The StateMachine struct contains the state of the replica and implements all the
/// logic of ChonkyBFT.
#[derive(Debug)]
@@ -70,9 +74,6 @@ pub(crate) struct StateMachine {
}

impl StateMachine {
/// The duration of the view timeout.
pub(crate) const VIEW_TIMEOUT_DURATION: time::Duration = time::Duration::milliseconds(2000);

/// Creates a new [`StateMachine`] instance, attempting to recover a past state from the storage module,
/// otherwise initializes the state machine with the current head block.
///
@@ -115,7 +116,7 @@ impl StateMachine {
commit_qcs_cache: BTreeMap::new(),
timeout_views_cache: BTreeMap::new(),
timeout_qcs_cache: BTreeMap::new(),
view_timeout: time::Deadline::Finite(ctx.now() + Self::VIEW_TIMEOUT_DURATION),
view_timeout: time::Deadline::Finite(ctx.now() + VIEW_TIMEOUT_DURATION),
view_start: ctx.now(),
};

@@ -148,12 +149,17 @@ impl StateMachine {
return Ok(());
}

// Check for timeout.
let Some(req) = recv.ok() else {
// Check for timeout. If we are already in a timeout phase, we don't
// timeout again. Note though that the underlying network implementation
// needs to keep retrying messages until they are delivered. Otherwise
// the consensus can halt!
if recv.is_err() && self.phase != validator::Phase::Timeout {
self.start_timeout(ctx).await?;
continue;
};
}

// Process the message.
let req = recv.unwrap();
let now = ctx.now();
let label = match &req.msg.msg {
ConsensusMsg::LeaderProposal(_) => {
@@ -294,4 +300,15 @@ impl StateMachine {
}
}
}

/// Processes a (already verified) CommitQC. It bumps the local high_commit_qc and if
/// we have the proposal corresponding to this qc, we save the corresponding block to DB.
pub(crate) async fn process_commit_qc(
&mut self,
ctx: &ctx::Ctx,
qc: &validator::CommitQC,
) -> ctx::Result<()> {
self.high_commit_qc = max(Some(qc.clone()), self.high_commit_qc.clone());
self.save_block(ctx, qc).await.wrap("save_block()")
}
}
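As an aside on the `process_commit_qc` helper moved into `mod.rs` above: the one-line bump relies on the standard `Ord` implementations for `Option` and the QC type, where `None` compares less than any `Some`. A minimal, self-contained sketch of the same idiom, using a hypothetical simplified `CommitQc` type ordered by view number (not the real `validator::CommitQC`):

```rust
use std::cmp::max;

// Hypothetical stand-in for validator::CommitQC: ordering by view number is
// what makes `max` keep the most recent QC.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct CommitQc {
    view: u64,
}

struct Replica {
    high_commit_qc: Option<CommitQc>,
}

impl Replica {
    // Mirrors the bump in `process_commit_qc`: `None` compares less than any
    // `Some`, so the first QC always wins, and afterwards only a QC with a
    // higher view replaces the stored one.
    fn bump_high_commit_qc(&mut self, qc: &CommitQc) {
        self.high_commit_qc = max(Some(qc.clone()), self.high_commit_qc.clone());
    }
}

fn main() {
    let mut replica = Replica { high_commit_qc: None };
    replica.bump_high_commit_qc(&CommitQc { view: 3 });
    replica.bump_high_commit_qc(&CommitQc { view: 1 }); // stale QC, ignored
    assert_eq!(replica.high_commit_qc, Some(CommitQc { view: 3 }));
}
```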
21 changes: 19 additions & 2 deletions node/actors/bft/src/chonky_bft/new_view.rs
@@ -1,7 +1,7 @@
use std::cmp::max;

use super::StateMachine;
use crate::metrics;
use crate::{chonky_bft::VIEW_TIMEOUT_DURATION, metrics};
use zksync_concurrency::{ctx, error::Wrap, metrics::LatencyHistogramExt as _, time};
use zksync_consensus_network::io::ConsensusInputMessage;
use zksync_consensus_roles::validator;
@@ -151,8 +151,25 @@ impl StateMachine {
self.view_start = now;

// Reset the timeout.
self.view_timeout = time::Deadline::Finite(ctx.now() + Self::VIEW_TIMEOUT_DURATION);
self.view_timeout = time::Deadline::Finite(ctx.now() + VIEW_TIMEOUT_DURATION);

Ok(())
}

/// Makes a justification (for a ReplicaNewView or a LeaderProposal) based on the current state.
pub(crate) fn get_justification(&self) -> validator::ProposalJustification {
// We need some QC in order to be able to create a justification.
// In fact, it should be impossible to get here without a QC. Because
// we only get here after starting a new view, which requires a QC.
assert!(self.high_commit_qc.is_some() || self.high_timeout_qc.is_some());

// We use the highest QC as the justification. If both have the same view, we use the CommitQC.
if self.high_commit_qc.as_ref().map(|x| x.view())
>= self.high_timeout_qc.as_ref().map(|x| &x.view)
{
validator::ProposalJustification::Commit(self.high_commit_qc.clone().unwrap())
} else {
validator::ProposalJustification::Timeout(self.high_timeout_qc.clone().unwrap())
}
}
}
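The `get_justification` comparison above also leans on `Option` ordering: a missing QC (`None`) never wins, and `>=` breaks ties in favour of the CommitQC. A standalone sketch of that selection logic, with hypothetical simplified QC types in place of `validator::CommitQC` and `validator::TimeoutQC`:

```rust
// Hypothetical stand-ins for the two QC types; only the view number matters
// for choosing a justification.
#[derive(Clone, Debug)]
struct CommitQc { view: u64 }
#[derive(Clone, Debug)]
struct TimeoutQc { view: u64 }

#[derive(Debug)]
enum Justification {
    Commit(CommitQc),
    Timeout(TimeoutQc),
}

// Mirrors `get_justification`: comparing `Option<view>` values means a missing
// QC always loses, and `>=` makes a CommitQC win ties.
fn justification(
    high_commit_qc: &Option<CommitQc>,
    high_timeout_qc: &Option<TimeoutQc>,
) -> Justification {
    assert!(high_commit_qc.is_some() || high_timeout_qc.is_some());
    if high_commit_qc.as_ref().map(|qc| qc.view)
        >= high_timeout_qc.as_ref().map(|qc| qc.view)
    {
        Justification::Commit(high_commit_qc.clone().unwrap())
    } else {
        Justification::Timeout(high_timeout_qc.clone().unwrap())
    }
}

fn main() {
    let commit = Some(CommitQc { view: 5 });
    let timeout = Some(TimeoutQc { view: 5 });
    // Same view on both sides: the CommitQC is preferred.
    println!("{:?}", justification(&commit, &timeout));
}
```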
17 changes: 16 additions & 1 deletion node/actors/bft/src/chonky_bft/proposal.rs
@@ -38,6 +38,9 @@ pub(crate) enum Error {
/// Leader proposed a block that was already pruned from replica's storage.
#[error("leader proposed a block that was already pruned from replica's storage")]
ProposalAlreadyPruned,
/// Reproposal with an unnecessary payload.
#[error("reproposal with an unnecessary payload")]
ReproposalWithPayload,
/// Block proposal payload missing.
#[error("block proposal payload missing")]
MissingPayload,
@@ -140,7 +143,19 @@ impl StateMachine {
// both the block and the commit QC) it broadcasts the finalized block (this
// was meant to propagate the block to full nodes, but of course validators
// will end up receiving it as well).
Some(hash) => hash,
Some(hash) => {
// We check that the leader didn't send a payload with the reproposal.
// This isn't technically needed for the consensus to work (it will remain
// safe and live), but it's a good practice to avoid unnecessary data in
// blockchain.
// This unnecessary payload would also effectively be a source of free
// data availability, which the leaders would be incentivized to abuse.
if message.proposal_payload.is_some() {
return Err(Error::ReproposalWithPayload);
};

hash
}
// This is a new proposal, so we need to verify it (i.e. execute it).
None => {
// Check that the payload is present.
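For context, the new check distinguishes reproposals (identified by a block hash in the justification) from fresh proposals: the former must not carry a payload, the latter must. A compact sketch of the two branches, using hypothetical simplified types rather than the real message structs:

```rust
// Hypothetical simplified types standing in for the real message structs.
struct Payload(Vec<u8>);

#[derive(Debug, PartialEq)]
enum Error {
    ReproposalWithPayload,
    MissingPayload,
}

// A reproposal is identified by the justification carrying the hash of an
// earlier proposal, so any attached payload is redundant and rejected; a
// fresh proposal, by contrast, must carry its payload.
fn check_proposal(
    reproposal_hash: Option<[u8; 32]>,
    payload: &Option<Payload>,
) -> Result<(), Error> {
    match reproposal_hash {
        Some(_hash) if payload.is_some() => Err(Error::ReproposalWithPayload),
        Some(_hash) => Ok(()),
        None if payload.is_none() => Err(Error::MissingPayload),
        None => Ok(()),
    }
}

fn main() {
    // A reproposal that also carries a payload is rejected.
    let res = check_proposal(Some([0u8; 32]), &Some(Payload(vec![1, 2, 3])));
    assert_eq!(res, Err(Error::ReproposalWithPayload));
}
```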
9 changes: 3 additions & 6 deletions node/actors/bft/src/chonky_bft/proposer.rs
@@ -1,13 +1,10 @@
use crate::{io::OutputMessage, metrics, Config};
use std::sync::Arc;
use zksync_concurrency::{ctx, error::Wrap as _, sync, time};
use zksync_concurrency::{ctx, error::Wrap as _, sync};
use zksync_consensus_network::io::ConsensusInputMessage;
use zksync_consensus_roles::validator;

/// Timeout for creating a proposal. If the proposal is not created in this time, the proposer
/// will quit trying to create a proposal for this view. This can be different from the replica
/// timeout for the whole view.
pub(crate) const PROPOSAL_CREATION_TIMEOUT: time::Duration = time::Duration::milliseconds(2000);
use super::VIEW_TIMEOUT_DURATION;

/// The proposer loop is responsible for proposing new blocks to the network. It watches for new
/// justifications from the replica and if it is the leader for the view, it proposes a new block.
@@ -31,7 +28,7 @@ pub(crate) async fn run_proposer(

// Create a proposal for the given justification, within the timeout.
let proposal = match create_proposal(
&ctx.with_timeout(PROPOSAL_CREATION_TIMEOUT),
&ctx.with_timeout(VIEW_TIMEOUT_DURATION),
cfg.clone(),
justification,
)
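The proposer now bounds proposal creation by the same `VIEW_TIMEOUT_DURATION` that governs the replica's view timeout, instead of a separate constant. A rough standalone illustration of the pattern, assuming the tokio crate and using `tokio::time::timeout` as a stand-in for `ctx.with_timeout`; `create_proposal` here is a hypothetical placeholder:

```rust
use std::time::Duration;
use tokio::time::timeout;

const VIEW_TIMEOUT_DURATION: Duration = Duration::from_millis(2000);

// Hypothetical stand-in for `create_proposal`: in the real code this fetches
// a payload and assembles the proposal for the current view.
async fn create_proposal() -> Vec<u8> {
    vec![]
}

#[tokio::main]
async fn main() {
    // The proposer gives up on the current view if the proposal cannot be
    // built within the view timeout, rather than using a separate deadline.
    match timeout(VIEW_TIMEOUT_DURATION, create_proposal()).await {
        Ok(payload) => println!("proposed {} bytes", payload.len()),
        Err(_) => println!("timed out; skipping proposal for this view"),
    }
}
```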
28 changes: 28 additions & 0 deletions node/actors/bft/src/chonky_bft/tests/proposal.rs
@@ -219,6 +219,34 @@ async fn proposal_pruned_block() {
.unwrap();
}

#[tokio::test]
async fn proposal_reproposal_with_payload() {
zksync_concurrency::testonly::abort_on_panic();
let ctx = &ctx::test_root(&ctx::RealClock);
scope::run!(ctx, |ctx, s| async {
let (mut util, runner) = UTHarness::new(ctx, 1).await;
s.spawn_bg(runner.run(ctx));

util.new_replica_commit(ctx).await;
let replica_timeout = util.new_replica_timeout(ctx).await;
util.process_replica_timeout_all(ctx, replica_timeout).await;

let mut proposal = util.new_leader_proposal(ctx).await;
assert!(proposal.proposal_payload.is_none());
proposal.proposal_payload = Some(ctx.rng().gen());

let res = util
.process_leader_proposal(ctx, util.leader_key().sign_msg(proposal))
.await;

assert_matches!(res, Err(proposal::Error::ReproposalWithPayload));

Ok(())
})
.await
.unwrap();
}

#[tokio::test]
async fn proposal_missing_payload() {
zksync_concurrency::testonly::abort_on_panic();
7 changes: 1 addition & 6 deletions node/actors/bft/src/chonky_bft/timeout.rs
@@ -1,7 +1,7 @@
use super::StateMachine;
use crate::metrics;
use std::{cmp::max, collections::HashSet};
use zksync_concurrency::{ctx, error::Wrap, time};
use zksync_concurrency::{ctx, error::Wrap};
use zksync_consensus_network::io::ConsensusInputMessage;
use zksync_consensus_roles::validator;

@@ -181,11 +181,6 @@ impl StateMachine {
tracing::info!("Timed out at view {}", self.view_number);
metrics::METRICS.replica_view_number.set(self.view_number.0);

// Reset the timeout. This makes us keep sending timeout messages until the consensus progresses.
// However, this isn't strictly necessary since the network retries messages until they are delivered.
// This is just an extra safety measure.
self.view_timeout = time::Deadline::Finite(ctx.now() + Self::VIEW_TIMEOUT_DURATION);

Ok(())
}
}
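Taken together with the loop change in `mod.rs`, the removal above means the view deadline is armed once per view and not re-armed when it fires; after a replica enters the timeout phase, progress relies on the network layer retrying the timeout message. A minimal sketch of that behaviour, with hypothetical simplified state instead of the real `StateMachine`:

```rust
use std::time::{Duration, Instant};

const VIEW_TIMEOUT_DURATION: Duration = Duration::from_millis(2000);

// Hypothetical simplified phases; only "in timeout or not" matters here.
#[derive(PartialEq)]
enum Phase {
    Prepare,
    Timeout,
}

struct Replica {
    phase: Phase,
    view_deadline: Instant,
}

impl Replica {
    // Mirrors the combined effect of the changes in `mod.rs` and `timeout.rs`:
    // once the deadline fires the replica enters the timeout phase and does
    // not time out again for the same view.
    fn on_tick(&mut self, now: Instant) {
        if now >= self.view_deadline && self.phase != Phase::Timeout {
            self.phase = Phase::Timeout;
            // ...broadcast ReplicaTimeout for the current view here...
        }
    }

    // The deadline is re-armed only when a new view starts (see `new_view.rs`).
    fn start_new_view(&mut self, now: Instant) {
        self.phase = Phase::Prepare;
        self.view_deadline = now + VIEW_TIMEOUT_DURATION;
    }
}

fn main() {
    let start = Instant::now();
    let mut replica = Replica {
        phase: Phase::Prepare,
        view_deadline: start + VIEW_TIMEOUT_DURATION,
    };
    replica.on_tick(start + VIEW_TIMEOUT_DURATION); // enters Timeout once
    replica.on_tick(start + 2 * VIEW_TIMEOUT_DURATION); // no second timeout
    replica.start_new_view(Instant::now()); // deadline re-armed for the new view
}
```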
8 changes: 6 additions & 2 deletions spec/informal-spec/replica.rs
@@ -128,8 +128,12 @@ impl ReplicaState {
// was meant to propagate the block to full nodes, but of course validators
// will end up receiving it as well).

// For sanity reasons, we'll check that there's no block in the proposal.
// But this check is completely unnecessary (in theory at least).
// We check that the leader didn't send a payload with the reproposal.
// This isn't technically needed for the consensus to work (it will remain
// safe and live), but it's a good practice to avoid unnecessary data in
// blockchain.
// This unnecessary payload would also effectively be a source of free
// data availability, which the leaders would be incentivized to abuse.
assert!(proposal.block.is_none());

hash
