feat: ChonkyBFT logic (#210)
- Everything in the chonky_bft folder is basically new.
- UTHarness was moved to chonky_bft/testonly.rs, though with several
  changes to it.
- All tests that use UTHarness were moved to chonky_bft/tests. Some of
  the old unit tests were repurposed, but a fair amount of the tests in
  chonky_bft/tests are new.
- The tests in tests.rs were split between tests/mod.rs and
  tests/twins.rs (except for a few that used UTHarness and were moved as
  described above). There were no changes to the tests themselves.

Part of BFT-452
brunoffranca authored Nov 1, 2024
1 parent f6168f8 commit 00f6214
Showing 60 changed files with 3,560 additions and 4,123 deletions.
@@ -1,31 +1,17 @@
 use super::StateMachine;
-use zksync_concurrency::ctx;
+use zksync_concurrency::{ctx, error::Wrap as _};
 use zksync_consensus_roles::validator;
+use zksync_consensus_storage as storage;

 impl StateMachine {
     /// Tries to build a finalized block from the given CommitQC. We simply search our
     /// block proposal cache for the matching block, and if we find it we build the block.
-    /// If this method succeeds, it sends the finalized block to the executor.
-    /// It also updates the High QC in the replica state machine, if the received QC is
-    /// higher.
-    #[tracing::instrument(level = "debug", skip_all)]
+    /// If this method succeeds, it saves the finalized block to storage.
     pub(crate) async fn save_block(
         &mut self,
         ctx: &ctx::Ctx,
         commit_qc: &validator::CommitQC,
     ) -> ctx::Result<()> {
-        // Update high_qc.
-        if self
-            .high_qc
-            .as_ref()
-            .map(|qc| qc.view().number < commit_qc.view().number)
-            .unwrap_or(true)
-        {
-            self.high_qc = Some(commit_qc.clone());
-        }
-        // TODO(gprusak): for availability of finalized blocks,
-        // replicas should be able to broadcast highest quorums without
-        // the corresponding block (same goes for synchronization).
         let Some(cache) = self.block_proposal_cache.get(&commit_qc.header().number) else {
             return Ok(());
         };
@@ -46,7 +32,11 @@ impl StateMachine {
             .block_store
             .queue_block(ctx, block.clone().into())
             .await?;

+        // For availability, replica should not proceed until it stores the block persistently.
+        // Rationale is that after save_block, there is start_new_view which prunes the
+        // cache. Without persisting this block, if all replicas crash just after
+        // start_new_view, the payload becomes unavailable.
         self.config
             .block_store
             .wait_until_persisted(ctx, block.header().number)
@@ -55,6 +45,32 @@ impl StateMachine {
         let number_metric = &crate::metrics::METRICS.finalized_block_number;
         let current_number = number_metric.get();
         number_metric.set(current_number.max(block.header().number.0));
+
         Ok(())
     }
+
+    /// Backups the replica state to DB.
+    pub(crate) async fn backup_state(&self, ctx: &ctx::Ctx) -> ctx::Result<()> {
+        let mut proposals = vec![];
+        for (number, payloads) in &self.block_proposal_cache {
+            proposals.extend(payloads.values().map(|p| storage::Proposal {
+                number: *number,
+                payload: p.clone(),
+            }));
+        }
+        let backup = storage::ReplicaState {
+            view: self.view_number,
+            phase: self.phase,
+            high_vote: self.high_vote.clone(),
+            high_commit_qc: self.high_commit_qc.clone(),
+            high_timeout_qc: self.high_timeout_qc.clone(),
+            proposals,
+        };
+        self.config
+            .replica_store
+            .set_state(ctx, &backup)
+            .await
+            .wrap("set_state()")?;
+        Ok(())
+    }
 }
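
The comment added in save_block above captures an ordering invariant: the block has to be durably persisted before the replica moves on, because start_new_view immediately prunes the proposal cache. Below is a minimal standalone sketch of that persist-before-prune ordering; DurableStore and finalize are hypothetical stand-ins, not the real zksync_consensus_storage API.

use std::collections::BTreeMap;

/// Stand-in for durable block storage.
struct DurableStore {
    persisted: Vec<(u64, Vec<u8>)>,
}

impl DurableStore {
    /// Stand-in for queue_block followed by wait_until_persisted.
    fn persist(&mut self, number: u64, payload: Vec<u8>) {
        self.persisted.push((number, payload));
    }
}

/// Persist the finalized payload first, and only then prune the volatile cache
/// (pruning is what start_new_view does right after save_block returns).
fn finalize(store: &mut DurableStore, cache: &mut BTreeMap<u64, Vec<u8>>, number: u64) {
    let payload = cache.get(&number).cloned().expect("proposal must be cached");
    store.persist(number, payload);
    // A crash from this point on can no longer lose the payload.
    cache.retain(|&n, _| n > number);
}

fn main() {
    let mut store = DurableStore { persisted: vec![] };
    let mut cache = BTreeMap::from([(5u64, b"block-5".to_vec())]);
    finalize(&mut store, &mut cache, 5);
    assert!(cache.is_empty());
    assert_eq!(store.persisted.len(), 1);
}

If the pruning happened first, a crash between the two steps could leave the payload recoverable from nowhere, which is exactly the availability hazard the comment describes.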
@@ -1,43 +1,59 @@
 //! Handler of a ReplicaCommit message.
 use super::StateMachine;
 use crate::metrics;
 use std::collections::HashSet;
-use zksync_concurrency::{ctx, metrics::LatencyHistogramExt as _};
-use zksync_consensus_network::io::{ConsensusInputMessage, Target};
+use zksync_concurrency::{ctx, error::Wrap, metrics::LatencyHistogramExt as _};
 use zksync_consensus_roles::validator;

-/// Errors that can occur when processing a "replica commit" message.
+/// Errors that can occur when processing a ReplicaCommit message.
 #[derive(Debug, thiserror::Error)]
 pub(crate) enum Error {
     /// Message signer isn't part of the validator set.
-    #[error("Message signer isn't part of the validator set (signer: {signer:?})")]
+    #[error("message signer isn't part of the validator set (signer: {signer:?})")]
     NonValidatorSigner {
         /// Signer of the message.
         signer: Box<validator::PublicKey>,
     },
-    /// Past view or phase.
-    #[error("past view/phase (current view: {current_view:?}, current phase: {current_phase:?})")]
+    /// Past view.
+    #[error("past view (current view: {current_view:?})")]
     Old {
         /// Current view.
         current_view: validator::ViewNumber,
-        /// Current phase.
-        current_phase: validator::Phase,
     },
-    /// The processing node is not a lead for this message's view.
-    #[error("we are not a leader for this message's view")]
-    NotLeaderInView,
-    /// Invalid message.
-    #[error("invalid message: {0:#}")]
-    InvalidMessage(#[source] validator::ReplicaCommitVerifyError),
+    /// Duplicate signer. We already have a commit message from the same validator
+    /// for the same or past view.
+    #[error("duplicate signer (message view: {message_view:?}, signer: {signer:?})")]
+    DuplicateSigner {
+        /// View number of the message.
+        message_view: validator::ViewNumber,
+        /// Signer of the message.
+        signer: Box<validator::PublicKey>,
+    },
+    /// Invalid message signature.
+    #[error("invalid signature: {0:#}")]
+    InvalidSignature(#[source] anyhow::Error),
+    /// Invalid message.
+    #[error("invalid message: {0:#}")]
+    InvalidMessage(#[source] validator::ReplicaCommitVerifyError),
     /// Internal error. Unlike other error types, this one isn't supposed to be easily recoverable.
     #[error(transparent)]
     Internal(#[from] ctx::Error),
 }

+impl Wrap for Error {
+    fn with_wrap<C: std::fmt::Display + Send + Sync + 'static, F: FnOnce() -> C>(
+        self,
+        f: F,
+    ) -> Self {
+        match self {
+            Error::Internal(err) => Error::Internal(err.with_wrap(f)),
+            err => err,
+        }
+    }
+}
+
 impl StateMachine {
-    /// Processes `ReplicaCommit` message.
-    pub(crate) fn process_replica_commit(
+    /// Processes a ReplicaCommit message.
+    pub(crate) async fn on_commit(
         &mut self,
         ctx: &ctx::Ctx,
         signed_message: validator::Signed<validator::ReplicaCommit>,
@@ -55,25 +71,21 @@ impl StateMachine {
             });
         }

-        // If the message is from the "past", we discard it.
-        // That is, it's from a previous view or phase, or if we already received a message
-        // from the same validator and for the same view.
-        if (message.view.number, validator::Phase::Commit) < (self.view, self.phase)
-            || self
-                .replica_commit_views
-                .get(author)
-                .is_some_and(|view_number| *view_number >= message.view.number)
-        {
+        // If the message is from a past view, ignore it.
+        if message.view.number < self.view_number {
             return Err(Error::Old {
-                current_view: self.view,
-                current_phase: self.phase,
+                current_view: self.view_number,
             });
         }

-        // If the message is for a view when we are not a leader, we discard it.
-        if self.config.genesis().view_leader(message.view.number) != self.config.secret_key.public()
-        {
-            return Err(Error::NotLeaderInView);
+        // If we already have a message from the same validator for the same or past view, ignore it.
+        if let Some(&view) = self.commit_views_cache.get(author) {
+            if view >= message.view.number {
+                return Err(Error::DuplicateSigner {
+                    message_view: message.view.number,
+                    signer: author.clone().into(),
+                });
+            }
         }

         // ----------- Checking the signed part of the message --------------
@@ -89,7 +101,7 @@

         // We add the message to the incrementally-constructed QC.
         let commit_qc = self
-            .commit_qcs
+            .commit_qcs_cache
             .entry(message.view.number)
             .or_default()
             .entry(message.clone())
@@ -98,58 +110,51 @@
         // Should always succeed as all checks have been already performed
         commit_qc
             .add(&signed_message, self.config.genesis())
-            .expect("Could not add message to CommitQC");
+            .expect("could not add message to CommitQC");

         // Calculate the CommitQC signers weight.
         let weight = self.config.genesis().validators.weight(&commit_qc.signers);

-        // Update commit message current view number for author
-        self.replica_commit_views
+        // Update view number of last commit message for author
+        self.commit_views_cache
             .insert(author.clone(), message.view.number);

         // Clean up commit_qcs for the case that no replica is at the view
-        // of a given CommitQC
+        // of a given CommitQC.
         // This prevents commit_qcs map from growing indefinitely in case some
-        // malicious replica starts spamming messages for future views
-        let active_views: HashSet<_> = self.replica_commit_views.values().collect();
-        self.commit_qcs
+        // malicious replica starts spamming messages for future views.
+        let active_views: HashSet<_> = self.commit_views_cache.values().collect();
+        self.commit_qcs_cache
             .retain(|view_number, _| active_views.contains(view_number));

-        // Now we check if we have enough weight to continue.
+        // Now we check if we have enough weight to continue. If not, we wait for more messages.
         if weight < self.config.genesis().validators.quorum_threshold() {
             return Ok(());
         };

-        // ----------- Update the state machine --------------
-        let now = ctx.now();
-        metrics::METRICS
-            .leader_commit_phase_latency
-            .observe_latency(now - self.phase_start);
-        self.view = message.view.number.next();
-        self.phase = validator::Phase::Prepare;
-        self.phase_start = now;
-
-        // ----------- Prepare our message and send it. --------------
+        // ----------- We have a QC. Now we process it. --------------

-        // Consume the incrementally-constructed QC for this view.
-        let justification = self
-            .commit_qcs
+        // Consume the created commit QC for this view.
+        let commit_qc = self
+            .commit_qcs_cache
             .remove(&message.view.number)
             .unwrap()
             .remove(message)
             .unwrap();

-        // Broadcast the leader commit message to all replicas (ourselves included).
-        let output_message = ConsensusInputMessage {
-            message: self
-                .config
-                .secret_key
-                .sign_msg(validator::ConsensusMsg::LeaderCommit(
-                    validator::LeaderCommit { justification },
-                )),
-            recipient: Target::Broadcast,
-        };
-        self.outbound_pipe.send(output_message.into());
+        // We update our state with the new commit QC.
+        self.process_commit_qc(ctx, &commit_qc)
+            .await
+            .wrap("process_commit_qc()")?;

+        // Metrics. We observe the latency of committing to a block measured
+        // from the start of this view.
+        metrics::METRICS
+            .commit_latency
+            .observe_latency(ctx.now() - self.view_start);
+
+        // Start a new view.
+        self.start_new_view(ctx, message.view.number.next()).await?;
+
         Ok(())
     }
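
The rewritten flow above moves QC aggregation from the leader into every replica: each incoming ReplicaCommit is deduplicated per signer, added to an incrementally-built QC for its view, stale per-view entries are pruned, and only once the accumulated weight reaches the quorum threshold is the QC consumed and a new view started. Below is a self-contained sketch of that bookkeeping with hypothetical names (QuorumTracker, QUORUM_WEIGHT), not the real zksync_consensus_roles types.

use std::collections::{HashMap, HashSet};

type View = u64;
type Signer = &'static str;

const QUORUM_WEIGHT: u64 = 3;

#[derive(Default)]
struct QuorumTracker {
    /// Last view each signer voted in; a vote for the same or an older view is a duplicate.
    last_view: HashMap<Signer, View>,
    /// Accumulated voting weight per view (the real code keeps one CommitQC per distinct message).
    weight_per_view: HashMap<View, u64>,
}

impl QuorumTracker {
    /// Returns Some(view) once the vote completes a quorum for that view.
    fn on_vote(&mut self, signer: Signer, view: View, signer_weight: u64) -> Option<View> {
        // Ignore duplicate signers for the same or a past view (DuplicateSigner in the real code).
        if self.last_view.get(signer).is_some_and(|&v| v >= view) {
            return None;
        }
        self.last_view.insert(signer, view);
        *self.weight_per_view.entry(view).or_default() += signer_weight;
        // Prune views that no tracked signer is currently voting in, so a malicious
        // validator spamming future views cannot grow the map indefinitely.
        let active: HashSet<View> = self.last_view.values().copied().collect();
        self.weight_per_view.retain(|v, _| active.contains(v));
        (self.weight_per_view.get(&view).copied().unwrap_or(0) >= QUORUM_WEIGHT).then_some(view)
    }
}

fn main() {
    let mut tracker = QuorumTracker::default();
    assert_eq!(tracker.on_vote("a", 7, 1), None);
    assert_eq!(tracker.on_vote("a", 7, 1), None); // duplicate is ignored
    assert_eq!(tracker.on_vote("b", 7, 1), None);
    assert_eq!(tracker.on_vote("c", 7, 1), Some(7)); // quorum threshold reached
}

In the real state machine the consumed QC is then handed to process_commit_qc, which persists it and advances the view; the sketch only models the counting and pruning.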