Skip to content

Commit

Permalink
First pass on the bft actor.
Browse files Browse the repository at this point in the history
  • Loading branch information
brunoffranca committed Oct 22, 2024
1 parent 2b0b30c commit 015b364
Show file tree
Hide file tree
Showing 9 changed files with 155 additions and 145 deletions.
7 changes: 4 additions & 3 deletions node/actors/bft/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,9 @@ impl Config {
anyhow::ensure!(genesis.protocol_version == validator::ProtocolVersion::CURRENT);
genesis.verify().context("genesis().verify()")?;

// TODO: What about pruning???
if let Some(prev) = genesis.first_block.prev() {
tracing::info!("Waiting for the pre-genesis blocks to be persisted");
tracing::info!("Waiting for the pre-fork blocks to be persisted");
if let Err(ctx::Canceled) = self.block_store.wait_until_persisted(ctx, prev).await {
return Ok(());
}
Expand All @@ -95,8 +96,8 @@ impl Config {

tracing::info!("Starting consensus actor {:?}", cfg.secret_key.public());

// This is the infinite loop where the consensus actually runs. The validator waits for either
// a message from the network or for a timeout, and processes each accordingly.
// This is the infinite loop where the consensus actually runs. The validator waits for
// a message from the network and processes it accordingly.
loop {
async {
let InputMessage::Network(req) = pipe
Expand Down
4 changes: 2 additions & 2 deletions node/actors/bft/src/replica/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ impl StateMachine {
) -> ctx::Result<()> {
// Update high_commit_qc.
if self
.high_qc
.high_commit_qc
.as_ref()
.map(|qc| qc.view().number < commit_qc.view().number)
.unwrap_or(true)
{
self.high_qc = Some(commit_qc.clone());
self.high_commit_qc = Some(commit_qc.clone());
}
// TODO(gprusak): for availability of finalized blocks,
// replicas should be able to broadcast highest quorums without
Expand Down
3 changes: 1 addition & 2 deletions node/actors/bft/src/replica/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@
mod block;
pub(crate) mod leader_commit;
pub(crate) mod leader_prepare;
mod new_view;
pub(crate) mod proposal;
pub(crate) mod replica_prepare;
mod state_machine;
#[cfg(test)]
mod tests;
mod timer;

pub(crate) use self::state_machine::StateMachine;
4 changes: 2 additions & 2 deletions node/actors/bft/src/replica/new_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ impl StateMachine {
metrics::METRICS.replica_view_number.set(self.view.0);

self.phase = validator::Phase::Prepare;
if let Some(qc) = self.high_qc.as_ref() {
if let Some(qc) = self.high_commit_qc.as_ref() {
// Clear the block cache.
self.block_proposal_cache
.retain(|k, _| k > &qc.header().number);
Expand All @@ -34,7 +34,7 @@ impl StateMachine {
number: self.view,
},
high_vote: self.high_vote.clone(),
high_qc: self.high_qc.clone(),
high_qc: self.high_commit_qc.clone(),
},
)),
recipient: Target::Broadcast,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,11 @@
use super::StateMachine;
use zksync_concurrency::{ctx, error::Wrap};
use zksync_consensus_network::io::{ConsensusInputMessage, Target};
use zksync_consensus_roles::validator::{self, BlockNumber};
use zksync_consensus_roles::validator::{self, BlockHeader, BlockNumber};

/// Errors that can occur when processing a "leader prepare" message.
/// Errors that can occur when processing a LeaderProposal message.
#[derive(Debug, thiserror::Error)]
pub(crate) enum Error {
/// Invalid leader.
#[error(
"invalid leader (correct leader: {correct_leader:?}, received leader: {received_leader:?})"
)]
InvalidLeader {
/// Correct leader.
correct_leader: validator::PublicKey,
/// Received leader.
received_leader: validator::PublicKey,
},
/// Message for a past view or phase.
#[error(
"message for a past view / phase (current view: {current_view:?}, current phase: {current_phase:?})"
Expand All @@ -27,15 +17,26 @@ pub(crate) enum Error {
/// Current phase.
current_phase: validator::Phase,
},
/// Invalid leader.
#[error(
"invalid leader (correct leader: {correct_leader:?}, received leader: {received_leader:?})"
)]
InvalidLeader {
/// Correct leader.
correct_leader: validator::PublicKey,
/// Received leader.
received_leader: validator::PublicKey,
},
/// Leader proposed a block that was already pruned from replica's storage.
#[error("leader proposed a block that was already pruned from replica's storage")]
ProposalAlreadyPruned,
/// Invalid message signature.
#[error("invalid signature: {0:#}")]
InvalidSignature(#[source] anyhow::Error),
/// Invalid message.
#[error("invalid message: {0:#}")]
InvalidMessage(#[source] validator::LeaderPrepareVerifyError),
/// Leader proposed a block that was already pruned from replica's storage.
#[error("leader proposed a block that was already pruned from replica's storage")]
ProposalAlreadyPruned,

/// Oversized payload.
#[error("block proposal with an oversized payload (payload size: {payload_size})")]
ProposalOversizedPayload {
Expand All @@ -44,7 +45,7 @@ pub(crate) enum Error {
},
/// Invalid payload.
#[error("invalid payload: {0:#}")]
ProposalInvalidPayload(#[source] anyhow::Error),
InvalidPayload(#[source] anyhow::Error),
/// Previous payload missing.
#[error("previous block proposal payload missing from store (block number: {prev_number})")]
MissingPreviousPayload {
Expand All @@ -69,11 +70,11 @@ impl Wrap for Error {
}

impl StateMachine {
/// Processes a leader prepare message.
pub(crate) async fn process_leader_prepare(
/// Processes a LeaderProposal message.
pub(crate) async fn on_proposal(
&mut self,
ctx: &ctx::Ctx,
signed_message: validator::Signed<validator::LeaderPrepare>,
signed_message: validator::Signed<validator::LeaderProposal>,
) -> Result<(), Error> {
// ----------- Checking origin of the message --------------

Expand All @@ -82,6 +83,15 @@ impl StateMachine {
let author = &signed_message.key;
let view = message.view().number;

// Check that the message is for the current view or a future view. We only allow proposals for
// the current view if we have not voted or timed out yet.
if view < self.view || (view == self.view && self.phase != validator::Phase::Prepare) {
return Err(Error::Old {
current_view: self.view,
current_phase: self.phase,
});
}

// Check that it comes from the correct leader.
let leader = self.config.genesis().view_leader(view);
if author != &leader {
Expand All @@ -91,96 +101,118 @@ impl StateMachine {
});
}

// If the message is from the "past", we discard it.
if (view, validator::Phase::Prepare) < (self.view, self.phase) {
return Err(Error::Old {
current_view: self.view,
current_phase: self.phase,
});
}

// Replica MUSTN'T vote for blocks which have been already pruned for storage.
// (because it won't be able to persist and broadcast them once finalized).
// TODO(gprusak): it should never happen, we should add safety checks to prevent
// pruning blocks not known to be finalized.
if message.proposal.number < self.config.block_store.queued().first {
return Err(Error::ProposalAlreadyPruned);
}

// ----------- Checking the message --------------

signed_message.verify().map_err(Error::InvalidSignature)?;

message
.verify(self.config.genesis())
.map_err(Error::InvalidMessage)?;

let high_qc = message.justification.high_qc();
let (implied_block_number, implied_block_hash) =
message.justification.get_implied_block(self.genesis());

if let Some(high_qc) = high_qc {
// Try to create a finalized block with this CommitQC and our block proposal cache.
// This gives us another chance to finalize a block that we may have missed before.
self.save_block(ctx, high_qc).await.wrap("save_block()")?;
// Replica MUSTN'T vote for blocks which have already been pruned from storage
// (because it won't be able to persist and broadcast them once finalized).
// TODO(gprusak): it should never happen, we should add safety checks to prevent
// pruning blocks not known to be finalized.
if implied_block_number < self.config.block_store.queued().first {
return Err(Error::ProposalAlreadyPruned);
}

// Check that the payload doesn't exceed the maximum size.
if let Some(payload) = &message.proposal_payload {
if payload.0.len() > self.config.max_payload_size {
return Err(Error::ProposalOversizedPayload {
payload_size: payload.0.len(),
});
}
let block_hash = match implied_block_hash {
// This is a reproposal. We let the leader repropose blocks without sending
// them in the proposal (it sends only the number + hash). That allows a
// leader to repropose a block without having it stored.
// It is an optimization that allows us to not wait for a leader that has
// the previous proposal stored (which can take 4f views), and to somewhat
// speed up reproposals by skipping block broadcast.
// This only saves time because we have a gossip network running in parallel,
// and any time a replica is able to create a finalized block (by possessing
// both the block and the commit QC) it broadcasts the finalized block (this
// was meant to propagate the block to full nodes, but of course validators
// will end up receiving it as well).
Some(hash) => hash,
// This is a new proposal, so we need to verify it (i.e. execute it).
None => {
// Check that the payload is present.
let Some(payload) = message.proposal_payload else {
return Err(Error::MissingPayload);
};

if payload.len() > self.config.max_payload_size {
return Err(Error::ProposalOversizedPayload {
payload_size: payload.len(),
});
}

if let Some(prev) = message.proposal.number.prev() {
// Defensively assume that PayloadManager cannot verify proposal until the previous block is stored.
self.config
.block_store
.wait_until_persisted(&ctx.with_deadline(self.timeout_deadline), prev)
// Note that it doesn't mean that the block is actually available, as old blocks might get pruned or
// we might just have started from a snapshot state. It just means that we have the state of the chain
// up to the previous block.
if let Some(prev) = implied_block_number.prev() {
self.config
.block_store
.wait_until_persisted(&ctx.with_deadline(self.timeout_deadline), prev)
.await
.map_err(|_| Error::MissingPreviousPayload { prev_number: prev })?;
}

// Execute the payload.
if let Err(err) = self
.config
.payload_manager
.verify(ctx, message.proposal.number, &payload)
.await
.map_err(|_| Error::MissingPreviousPayload { prev_number: prev })?;
{
return Err(match err {
ctx::Error::Internal(err) => Error::InvalidPayload(err),
err @ ctx::Error::Canceled(_) => Error::Internal(err),
});
}

// The proposal is valid. We cache it, waiting for it to be committed.
self.block_proposal_cache
.entry(implied_block_number)
.or_default()
.insert(payload.hash(), payload.clone());
}
if let Err(err) = self
.config
.payload_manager
.verify(ctx, message.proposal.number, payload)
.await
{
return Err(match err {
err @ ctx::Error::Canceled(_) => Error::Internal(err),
ctx::Error::Internal(err) => Error::ProposalInvalidPayload(err),
});
}
}
};

// ----------- All checks finished. Now we process the message. --------------

// Create our commit vote.
let commit_vote = validator::ReplicaCommit {
view: message.view().clone(),
proposal: message.proposal,
proposal: BlockHeader {
number: implied_block_number,
payload: block_hash,
},
};

// Update the state machine.
self.view = message.view().number;
self.phase = validator::Phase::Commit;
self.high_vote = Some(commit_vote.clone());
// If we received a new block proposal, store it in our cache.
if let Some(payload) = &message.proposal_payload {
self.block_proposal_cache
.entry(message.proposal.number)
.or_default()
.insert(payload.hash(), payload.clone());
}
match message.justification {
validator::ProposalJustification::Commit(qc) => self.process_commit_qc(qc),
validator::ProposalJustification::Timeout(qc) => {
if let Some(high_qc) = qc.high_qc() {
self.process_commit_qc(high_qc);
}
self.high_timeout_qc = Some(qc);
}
};

// Backup our state.
self.backup_state(ctx).await.wrap("backup_state()")?;

// Send the replica message to the leader.
// Broadcast our message.
let output_message = ConsensusInputMessage {
message: self
.config
.secret_key
.sign_msg(validator::ConsensusMsg::ReplicaCommit(commit_vote)),
recipient: Target::Validator(author.clone()),
};
self.outbound_pipe.send(output_message.into());

Expand Down
Loading

0 comments on commit 015b364

Please sign in to comment.