feat: ChonkyBFT logic #210

Merged 21 commits on Nov 1, 2024
Changes from 15 commits
@@ -1,43 +1,58 @@
//! Handler of a ReplicaCommit message.

use super::StateMachine;
use crate::metrics;
use std::collections::HashSet;
use zksync_concurrency::{ctx, metrics::LatencyHistogramExt as _};
use zksync_consensus_network::io::{ConsensusInputMessage, Target};
use zksync_concurrency::{ctx, error::Wrap, metrics::LatencyHistogramExt as _};
use zksync_consensus_roles::validator;

/// Errors that can occur when processing a "replica commit" message.
/// Errors that can occur when processing a ReplicaCommit message.
#[derive(Debug, thiserror::Error)]
pub(crate) enum Error {
/// Message signer isn't part of the validator set.
#[error("Message signer isn't part of the validator set (signer: {signer:?})")]
#[error("message signer isn't part of the validator set (signer: {signer:?})")]
NonValidatorSigner {
/// Signer of the message.
signer: Box<validator::PublicKey>,
},
/// Past view or phase.
#[error("past view/phase (current view: {current_view:?}, current phase: {current_phase:?})")]
/// Past view.
#[error("past view (current view: {current_view:?})")]
Old {
/// Current view.
current_view: validator::ViewNumber,
/// Current phase.
current_phase: validator::Phase,
},
/// The processing node is not a leader for this message's view.
#[error("we are not a leader for this message's view")]
NotLeaderInView,
/// Invalid message.
#[error("invalid message: {0:#}")]
InvalidMessage(#[source] validator::ReplicaCommitVerifyError),
/// Duplicate signer.
#[error("duplicate signer (message view: {message_view:?}, signer: {signer:?})")]
DuplicateSigner {
/// View number of the message.
message_view: validator::ViewNumber,
/// Signer of the message.
signer: Box<validator::PublicKey>,
},
/// Invalid message signature.
#[error("invalid signature: {0:#}")]
InvalidSignature(#[source] anyhow::Error),
/// Invalid message.
#[error("invalid message: {0:#}")]
InvalidMessage(#[source] validator::ReplicaCommitVerifyError),
/// Internal error. Unlike other error types, this one isn't supposed to be easily recoverable.
#[error(transparent)]
Internal(#[from] ctx::Error),
}

impl Wrap for Error {
fn with_wrap<C: std::fmt::Display + Send + Sync + 'static, F: FnOnce() -> C>(
self,
f: F,
) -> Self {
match self {
Error::Internal(err) => Error::Internal(err.with_wrap(f)),
err => err,
}
}
}

impl StateMachine {
/// Processes `ReplicaCommit` message.
pub(crate) fn process_replica_commit(
/// Processes a ReplicaCommit message.
pub(crate) async fn on_commit(
&mut self,
ctx: &ctx::Ctx,
signed_message: validator::Signed<validator::ReplicaCommit>,
@@ -55,25 +70,21 @@ impl StateMachine {
});
}

// If the message is from the "past", we discard it.
// That is, it's from a previous view or phase, or if we already received a message
// from the same validator and for the same view.
if (message.view.number, validator::Phase::Commit) < (self.view, self.phase)
|| self
.replica_commit_views
.get(author)
.is_some_and(|view_number| *view_number >= message.view.number)
{
// If the message is from a past view, ignore it.
if message.view.number < self.view_number {
return Err(Error::Old {
current_view: self.view,
current_phase: self.phase,
current_view: self.view_number,
});
}

// If the message is for a view when we are not a leader, we discard it.
if self.config.genesis().view_leader(message.view.number) != self.config.secret_key.public()
{
return Err(Error::NotLeaderInView);
// If we already have a message from the same validator for this view or a later one, ignore it.
if let Some(&view) = self.commit_views_cache.get(author) {
if view >= message.view.number {
return Err(Error::DuplicateSigner {
message_view: message.view.number,
signer: author.clone().into(),
});
}
}

// ----------- Checking the signed part of the message --------------
@@ -89,7 +100,7 @@ impl StateMachine {

// We add the message to the incrementally-constructed QC.
let commit_qc = self
.commit_qcs
.commit_qcs_cache
.entry(message.view.number)
.or_default()
.entry(message.clone())
@@ -98,58 +109,52 @@ impl StateMachine {
// Should always succeed as all checks have already been performed
commit_qc
.add(&signed_message, self.config.genesis())
.expect("Could not add message to CommitQC");
.expect("could not add message to CommitQC");

// Calculate the CommitQC signers weight.
let weight = self.config.genesis().validators.weight(&commit_qc.signers);

// Update commit message current view number for author
self.replica_commit_views
// Update view number of last commit message for author
self.commit_views_cache
.insert(author.clone(), message.view.number);

// Clean up commit_qcs for the case that no replica is at the view
// of a given CommitQC
// of a given CommitQC.
// This prevents commit_qcs map from growing indefinitely in case some
// malicious replica starts spamming messages for future views
let active_views: HashSet<_> = self.replica_commit_views.values().collect();
self.commit_qcs
// malicious replica starts spamming messages for future views.
let active_views: HashSet<_> = self.commit_views_cache.values().collect();
self.commit_qcs_cache
.retain(|view_number, _| active_views.contains(view_number));

// Now we check if we have enough weight to continue.
// Now we check if we have enough weight to continue. If not, we wait for more messages.
if weight < self.config.genesis().validators.quorum_threshold() {
return Ok(());
};

// ----------- Update the state machine --------------
let now = ctx.now();
metrics::METRICS
.leader_commit_phase_latency
.observe_latency(now - self.phase_start);
self.view = message.view.number.next();
self.phase = validator::Phase::Prepare;
self.phase_start = now;
// ----------- We have a QC. Now we process it. --------------

// ----------- Prepare our message and send it. --------------

// Consume the incrementally-constructed QC for this view.
let justification = self
.commit_qcs
// Consume the created commit QC for this view.
let commit_qc = self
.commit_qcs_cache
.remove(&message.view.number)
.unwrap()
.remove(message)
.unwrap();

// Broadcast the leader commit message to all replicas (ourselves included).
let output_message = ConsensusInputMessage {
message: self
.config
.secret_key
.sign_msg(validator::ConsensusMsg::LeaderCommit(
validator::LeaderCommit { justification },
)),
recipient: Target::Broadcast,
};
self.outbound_pipe.send(output_message.into());
// We update our state with the new commit QC.
self.process_commit_qc(ctx, &commit_qc)
.await
.wrap("process_commit_qc()")?;

// Metrics.
let now = ctx.now();
metrics::METRICS
.leader_commit_phase_latency
.observe_latency(now - self.phase_start);
self.phase_start = now;

// Start a new view.
self.start_new_view(ctx, message.view.number.next()).await?;

Ok(())
}
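
For readers following the accumulation logic above, here is a minimal, self-contained sketch of the same pattern: track each author's latest voted view, build QCs incrementally per view, prune views nobody is voting in, and release a QC once the signers' combined weight reaches the quorum. Every name below (View, Author, PartialQc, Accumulator, on_vote and the weight/quorum fields) is a hypothetical stand-in for illustration, not the zksync_consensus_roles or zksync_consensus_bft API.

use std::collections::{BTreeMap, HashMap, HashSet};

type View = u64;
type Author = String;

#[derive(Default)]
struct PartialQc {
    signers: HashSet<Author>,
    weight: u64,
}

struct Accumulator {
    /// Voting weight of each validator.
    weights: HashMap<Author, u64>,
    /// Minimum combined weight needed to form a QC.
    quorum: u64,
    /// Latest view each author has voted in (plays the role of `commit_views_cache`).
    latest_vote: HashMap<Author, View>,
    /// Partially built QCs per view (plays the role of `commit_qcs_cache`).
    partial_qcs: BTreeMap<View, PartialQc>,
}

impl Accumulator {
    /// Registers a vote; returns the signer set once quorum is reached for `view`.
    fn on_vote(&mut self, view: View, author: Author) -> Option<HashSet<Author>> {
        // Duplicate-signer check: ignore votes at or below the author's latest view.
        if self.latest_vote.get(&author).is_some_and(|&v| v >= view) {
            return None;
        }
        self.latest_vote.insert(author.clone(), view);

        // Add the vote to the incrementally built QC for this view.
        let qc = self.partial_qcs.entry(view).or_default();
        if qc.signers.insert(author.clone()) {
            qc.weight += self.weights.get(&author).copied().unwrap_or(0);
        }

        // Prune partial QCs for views nobody is currently voting in, so the map
        // cannot grow without bound if a replica spams votes for future views.
        let active: HashSet<View> = self.latest_vote.values().copied().collect();
        self.partial_qcs.retain(|v, _| active.contains(v));

        // Only consume the QC once the signers' weight reaches the quorum.
        if self.partial_qcs.get(&view)?.weight >= self.quorum {
            return Some(self.partial_qcs.remove(&view).unwrap().signers);
        }
        None
    }
}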
105 changes: 105 additions & 0 deletions node/actors/bft/src/chonky_bft/misc.rs
@@ -0,0 +1,105 @@
use super::StateMachine;
use std::cmp::max;
use zksync_concurrency::{ctx, error::Wrap as _};
use zksync_consensus_roles::validator;
use zksync_consensus_storage as storage;

impl StateMachine {
/// Makes a justification (for a ReplicaNewView or a LeaderProposal) based on the current state.
pub(crate) fn get_justification(&self) -> validator::ProposalJustification {
// We need some QC in order to be able to create a justification. In fact,
// it should be impossible to get here without a QC, because we only get
// here after starting a new view, which requires a QC.
assert!(self.high_commit_qc.is_some() || self.high_timeout_qc.is_some());

// We use the highest QC as the justification. If both have the same view, we use the CommitQC.
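// (In the comparison below, `None` is less than any `Some`, so whichever QC
// exists wins; with `>=`, equal views resolve to the CommitQC.)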
if self.high_commit_qc.as_ref().map(|x| x.view())
>= self.high_timeout_qc.as_ref().map(|x| &x.view)
{
validator::ProposalJustification::Commit(self.high_commit_qc.clone().unwrap())
} else {
validator::ProposalJustification::Timeout(self.high_timeout_qc.clone().unwrap())
}
}

/// Processes an (already verified) CommitQC. It bumps the local high_commit_qc and,
/// if we have the proposal corresponding to this QC, saves the corresponding block to DB.
pub(crate) async fn process_commit_qc(
&mut self,
ctx: &ctx::Ctx,
qc: &validator::CommitQC,
) -> ctx::Result<()> {
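// `max` over `Option` treats `None` as smaller than any `Some`, so the first
// CommitQC we ever see replaces an empty `high_commit_qc`; afterwards the
// greater of the two QCs (per their `Ord` implementation) is kept.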
self.high_commit_qc = max(Some(qc.clone()), self.high_commit_qc.clone());
self.save_block(ctx, qc).await.wrap("save_block()")
}

/// Tries to build a finalized block from the given CommitQC. We simply search our
/// block proposal cache for the matching block, and if we find it we build the block.
/// If this method succeeds, it sends the finalized block to the executor.
brunoffranca marked this conversation as resolved.
Show resolved Hide resolved
pub(crate) async fn save_block(
&mut self,
ctx: &ctx::Ctx,
commit_qc: &validator::CommitQC,
) -> ctx::Result<()> {
let Some(cache) = self.block_proposal_cache.get(&commit_qc.header().number) else {
return Ok(());
};
let Some(payload) = cache.get(&commit_qc.header().payload) else {
return Ok(());
};
let block = validator::FinalBlock {
payload: payload.clone(),
justification: commit_qc.clone(),
};

tracing::info!(
"Finalized block {}: {:#?}",
block.header().number,
block.header().payload,
);
self.config
.block_store
.queue_block(ctx, block.clone().into())
.await?;

// For availability, the replica should not proceed until it stores the block persistently.
// The rationale is that save_block is followed by start_new_view, which prunes the
// cache. Without persisting this block, if all replicas crash just after
// start_new_view, the payload becomes unavailable.
self.config
.block_store
.wait_until_persisted(ctx, block.header().number)
.await?;

let number_metric = &crate::metrics::METRICS.finalized_block_number;
let current_number = number_metric.get();
number_metric.set(current_number.max(block.header().number.0));

Ok(())
}

/// Backs up the replica state to DB.
pub(crate) async fn backup_state(&self, ctx: &ctx::Ctx) -> ctx::Result<()> {
let mut proposals = vec![];
for (number, payloads) in &self.block_proposal_cache {
proposals.extend(payloads.values().map(|p| storage::Proposal {
number: *number,
payload: p.clone(),
}));
}
let backup = storage::ReplicaState {
view: self.view_number,
phase: self.phase,
high_vote: self.high_vote.clone(),
high_commit_qc: self.high_commit_qc.clone(),
high_timeout_qc: self.high_timeout_qc.clone(),
proposals,
};
self.config
.replica_store
.set_state(ctx, &backup)
.await
.wrap("put_replica_state")?;
Ok(())
}
}