made bft actor send LeaderPrepare messages asynchronously (BFT-400) #52

Merged: 9 commits, Dec 14, 2023
11 changes: 4 additions & 7 deletions node/actors/bft/src/inner.rs
@@ -1,19 +1,16 @@
//! The inner data of the consensus state machine. This is shared between the different roles.

use crate::{
io::{InputMessage, OutputMessage},
misc,
};
use crate::{io::OutputMessage, misc};
use tracing::instrument;
use zksync_concurrency::ctx::channel;
use zksync_consensus_roles::validator;
use zksync_consensus_utils::pipe::ActorPipe;

/// The ConsensusInner struct, it contains data to be shared with the state machines. This is never supposed
/// to be modified, except by the Consensus struct.
#[derive(Debug)]
pub(crate) struct ConsensusInner {
/// The communication pipe. This is used to receive inputs and send outputs.
pub(crate) pipe: ActorPipe<InputMessage, OutputMessage>,
/// The communication pipe. This is used to send outputs.
pub(crate) pipe: channel::UnboundedSender<OutputMessage>,
/// The validator's secret key.
pub(crate) secret_key: validator::SecretKey,
/// A vector of public keys for all the validators in the network.
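The shared `ConsensusInner` state now holds only the sending half of the actor's channel instead of the full bidirectional `ActorPipe`, so any role can emit outputs while input handling stays with the actor loop. Below is a minimal sketch of that pattern, using `std::sync::mpsc` as a stand-in for `zksync_concurrency::ctx::channel`; the type and method names here are illustrative, not the crate's API.

```rust
use std::sync::mpsc;

// Stand-in for the actor's outbound message type.
#[derive(Debug)]
struct OutputMessage(String);

// The shared state keeps only the sender: any role (leader or replica) can
// emit outputs, while the receiving half stays with the actor's main loop.
struct Inner {
    pipe: mpsc::Sender<OutputMessage>,
}

impl Inner {
    fn send_output(&self, msg: OutputMessage) {
        // Sending on this channel never blocks; it only fails if the receiver
        // was dropped, i.e. the actor is shutting down.
        let _ = self.pipe.send(msg);
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let inner = Inner { pipe: tx };
    inner.send_output(OutputMessage("LeaderCommit".into()));
    println!("received: {:?}", rx.recv().unwrap());
}
```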
85 changes: 27 additions & 58 deletions node/actors/bft/src/leader/replica_commit.rs
@@ -1,5 +1,6 @@
use super::StateMachine;
use crate::{inner::ConsensusInner, metrics, Consensus};
use crate::metrics;
use std::collections::HashMap;
use tracing::instrument;
use zksync_concurrency::{ctx, metrics::LatencyHistogramExt as _};
use zksync_consensus_network::io::{ConsensusInputMessage, Target};
@@ -22,9 +23,6 @@ pub(crate) enum Error {
/// Signer of the message.
signer: validator::PublicKey,
},
/// Unexpected proposal.
#[error("unexpected proposal")]
UnexpectedProposal,
/// Past view or phase.
#[error("past view/phase (current view: {current_view:?}, current phase: {current_phase:?})")]
Old {
@@ -42,17 +40,6 @@ pub(crate) enum Error {
/// Existing message from the same replica.
existing_message: validator::ReplicaCommit,
},
/// Number of received messages is below threshold.
#[error(
"number of received messages is below threshold. waiting for more (received: {num_messages:?}, \
threshold: {threshold:?}"
)]
NumReceivedBelowThreshold {
/// Number of received messages.
num_messages: usize,
/// Threshold for message count.
threshold: usize,
},
/// Invalid message signature.
#[error("invalid signature: {0:#}")]
InvalidSignature(#[source] validator::Error),
@@ -63,7 +50,6 @@ impl StateMachine {
pub(crate) fn process_replica_commit(
&mut self,
ctx: &ctx::Ctx,
consensus: &ConsensusInner,
signed_message: validator::Signed<validator::ReplicaCommit>,
) -> Result<(), Error> {
// ----------- Checking origin of the message --------------
@@ -73,15 +59,15 @@
let author = &signed_message.key;

// Check protocol version compatibility.
if !Consensus::PROTOCOL_VERSION.compatible(&message.protocol_version) {
if !crate::PROTOCOL_VERSION.compatible(&message.protocol_version) {
return Err(Error::IncompatibleProtocolVersion {
message_version: message.protocol_version,
local_version: Consensus::PROTOCOL_VERSION,
local_version: crate::PROTOCOL_VERSION,
});
}

// Check that the message signer is in the validator set.
consensus
self.inner
.validator_set
.index(author)
.ok_or(Error::NonValidatorSigner {
@@ -97,7 +83,7 @@
}

// If the message is for a view when we are not a leader, we discard it.
if consensus.view_leader(message.view) != consensus.secret_key.public() {
if self.inner.view_leader(message.view) != self.inner.secret_key.public() {
return Err(Error::NotLeaderInView);
}

@@ -117,33 +103,24 @@
// Check the signature on the message.
signed_message.verify().map_err(Error::InvalidSignature)?;

// ----------- Checking the contents of the message --------------

// We only accept replica commit messages for proposals that we have cached. That's so
// we don't need to store replica commit messages for different proposals.
if self.block_proposal_cache != Some(message.proposal) {
return Err(Error::UnexpectedProposal);
}

// ----------- All checks finished. Now we process the message. --------------

// We store the message in our cache.
self.commit_message_cache
.entry(message.view)
.or_default()
.insert(author.clone(), signed_message);
let cache_entry = self.commit_message_cache.entry(message.view).or_default();
cache_entry.insert(author.clone(), signed_message);

// Now we check if we have enough messages to continue.
let num_messages = self.commit_message_cache.get(&message.view).unwrap().len();

if num_messages < consensus.threshold() {
return Err(Error::NumReceivedBelowThreshold {
num_messages,
threshold: consensus.threshold(),
});
let mut by_proposal: HashMap<_, Vec<_>> = HashMap::new();
for msg in cache_entry.values() {
by_proposal.entry(msg.msg.proposal).or_default().push(msg);
}

debug_assert!(num_messages == consensus.threshold());
let Some((_, replica_messages)) = by_proposal
.into_iter()
.find(|(_, v)| v.len() >= self.inner.threshold())
else {
return Ok(());
};
debug_assert_eq!(replica_messages.len(), self.inner.threshold());

// ----------- Update the state machine --------------

@@ -157,37 +134,29 @@

// ----------- Prepare our message and send it. --------------

// Get all the replica commit messages for this view. Note that we consume the
// messages here. That's purposeful, so that we don't create a new leader commit
// for this same view if we receive another replica commit message after this.
let replica_messages = self
.commit_message_cache
.remove(&message.view)
.unwrap()
.values()
.cloned()
.collect::<Vec<_>>();

// Create the justification for our message.
let justification = validator::CommitQC::from(&replica_messages, &consensus.validator_set)
.expect("Couldn't create justification from valid replica messages!");
let justification = validator::CommitQC::from(
&replica_messages.into_iter().cloned().collect::<Vec<_>>(),
&self.inner.validator_set,
)
.expect("Couldn't create justification from valid replica messages!");

// Broadcast the leader commit message to all replicas (ourselves included).
let output_message = ConsensusInputMessage {
message: consensus
message: self
.inner
.secret_key
.sign_msg(validator::ConsensusMsg::LeaderCommit(
validator::LeaderCommit {
protocol_version: Consensus::PROTOCOL_VERSION,
protocol_version: crate::PROTOCOL_VERSION,
justification,
},
)),
recipient: Target::Broadcast,
};
consensus.pipe.send(output_message.into());
self.inner.pipe.send(output_message.into());

// Clean the caches.
self.block_proposal_cache = None;
self.prepare_message_cache.retain(|k, _| k >= &self.view);
self.commit_message_cache.retain(|k, _| k >= &self.view);

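With the proposal cache check gone, the leader now tallies the cached `ReplicaCommit` messages per proposal and only proceeds once some proposal reaches the threshold; a below-threshold count is no longer an error, the handler just returns `Ok(())` and waits for more votes. Here is a standalone sketch of that grouping logic, using simplified stand-in types rather than the real `validator` structs.

```rust
use std::collections::HashMap;

// Simplified stand-ins for the real `validator` types.
type Proposal = u64;
type PublicKey = &'static str;

#[derive(Clone, Debug)]
struct CommitVote {
    proposal: Proposal,
}

/// Returns the proposal that has gathered at least `threshold` votes in the
/// current view, if any. Below the threshold nothing happens: the caller
/// simply keeps waiting for more messages instead of treating it as an error.
fn quorum_proposal(
    votes: &HashMap<PublicKey, CommitVote>,
    threshold: usize,
) -> Option<Proposal> {
    // Group the per-validator votes by the proposal they commit to.
    let mut by_proposal: HashMap<Proposal, Vec<&CommitVote>> = HashMap::new();
    for vote in votes.values() {
        by_proposal.entry(vote.proposal).or_default().push(vote);
    }
    // Pick a proposal that has reached the quorum threshold, if one exists.
    by_proposal
        .into_iter()
        .find(|(_, v)| v.len() >= threshold)
        .map(|(proposal, _)| proposal)
}

fn main() {
    let mut votes = HashMap::new();
    votes.insert("validator-a", CommitVote { proposal: 7 });
    votes.insert("validator-b", CommitVote { proposal: 7 });
    votes.insert("validator-c", CommitVote { proposal: 9 });
    assert_eq!(quorum_proposal(&votes, 2), Some(7));
    assert_eq!(quorum_proposal(&votes, 3), None); // not enough matching votes yet
}
```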
108 changes: 12 additions & 96 deletions node/actors/bft/src/leader/replica_prepare.rs
@@ -1,9 +1,6 @@
use super::StateMachine;
use crate::{inner::ConsensusInner, metrics, Consensus};
use std::collections::HashMap;
use tracing::instrument;
use zksync_concurrency::{ctx, error::Wrap};
use zksync_consensus_network::io::{ConsensusInputMessage, Target};
use zksync_consensus_roles::validator::{self, ProtocolVersion};

/// Errors that can occur when processing a "replica prepare" message.
@@ -40,17 +37,6 @@ pub(crate) enum Error {
/// Existing message from the same replica.
existing_message: validator::ReplicaPrepare,
},
/// Number of received messages is below a threshold.
#[error(
"number of received messages below threshold. waiting for more (received: {num_messages:?}, \
threshold: {threshold:?}"
)]
NumReceivedBelowThreshold {
/// Number of received messages.
num_messages: usize,
/// Threshold for message count.
threshold: usize,
},
/// High QC of a future view.
#[error(
"high QC of a future view (high QC view: {high_qc_view:?}, current view: {current_view:?}"
@@ -89,7 +75,6 @@ impl StateMachine {
pub(crate) async fn process_replica_prepare(
&mut self,
ctx: &ctx::Ctx,
consensus: &ConsensusInner,
signed_message: validator::Signed<validator::ReplicaPrepare>,
) -> Result<(), Error> {
// ----------- Checking origin of the message --------------
@@ -99,15 +84,15 @@
let author = &signed_message.key;

// Check protocol version compatibility.
if !Consensus::PROTOCOL_VERSION.compatible(&message.protocol_version) {
if !crate::PROTOCOL_VERSION.compatible(&message.protocol_version) {
return Err(Error::IncompatibleProtocolVersion {
message_version: message.protocol_version,
local_version: Consensus::PROTOCOL_VERSION,
local_version: crate::PROTOCOL_VERSION,
});
}

// Check that the message signer is in the validator set.
consensus
self.inner
.validator_set
.index(author)
.ok_or(Error::NonValidatorSigner {
@@ -123,7 +108,7 @@
}

// If the message is for a view when we are not a leader, we discard it.
if consensus.view_leader(message.view) != consensus.secret_key.public() {
if self.inner.view_leader(message.view) != self.inner.secret_key.public() {
return Err(Error::NotLeaderInView);
}

@@ -148,7 +133,7 @@
// Verify the high QC.
message
.high_qc
.verify(&consensus.validator_set, consensus.threshold())
.verify(&self.inner.validator_set, self.inner.threshold())
.map_err(Error::InvalidHighQC)?;

// If the high QC is for a future view, we discard the message.
@@ -172,15 +157,10 @@
// Now we check if we have enough messages to continue.
let num_messages = self.prepare_message_cache.get(&message.view).unwrap().len();

if num_messages < consensus.threshold() {
return Err(Error::NumReceivedBelowThreshold {
num_messages,
threshold: consensus.threshold(),
});
if num_messages < self.inner.threshold() {
return Ok(());
}

// ----------- Creating the block proposal --------------

// Get all the replica prepare messages for this view. Note that we consume the
// messages here. That's purposeful, so that we don't create a new block proposal
// for this same view if we receive another replica prepare message after this.
@@ -191,83 +171,19 @@
.into_values()
.collect();

debug_assert!(num_messages == consensus.threshold());

// Get the highest block voted for and check if there's a quorum of votes for it. To have a quorum
// in this situation, we require 2*f+1 votes, where f is the maximum number of faulty replicas.
let mut count: HashMap<_, usize> = HashMap::new();

for vote in replica_messages.iter() {
*count.entry(vote.msg.high_vote.proposal).or_default() += 1;
}

let highest_vote: Option<validator::BlockHeader> = count
.iter()
// We only take one value from the iterator because there can only be at most one block with a quorum of 2f+1 votes.
.find(|(_, v)| **v > 2 * consensus.faulty_replicas())
.map(|(h, _)| h)
.cloned();

// Get the highest CommitQC.
let highest_qc: &validator::CommitQC = replica_messages
.iter()
.map(|s| &s.msg.high_qc)
.max_by_key(|qc| qc.message.view)
.unwrap();

// Create the block proposal to send to the replicas,
// and the commit vote to store in our block proposal cache.
let (proposal, payload) = match highest_vote {
// The previous block was not finalized, so we need to propose it again.
// For this we only need the header, since we are guaranteed that at least
// f+1 honest replicas have the block can broadcast when finalized
// (2f+1 have stated that they voted for the block, at most f are malicious).
Some(proposal) if proposal != highest_qc.message.proposal => (proposal, None),
// The previous block was finalized, so we can propose a new block.
_ => {
let payload = self
.payload_source
.propose(ctx, highest_qc.message.proposal.number.next())
.await?;
metrics::METRICS
.leader_proposal_payload_size
.observe(payload.0.len());
let proposal =
validator::BlockHeader::new(&highest_qc.message.proposal, payload.hash());
(proposal, Some(payload))
}
};
debug_assert_eq!(num_messages, self.inner.threshold());

// ----------- Update the state machine --------------

self.view = message.view;
self.phase = validator::Phase::Commit;
self.phase_start = ctx.now();
self.block_proposal_cache = Some(proposal);

// ----------- Prepare our message and send it --------------

// Create the justification for our message.
let justification = validator::PrepareQC::from(&replica_messages, &consensus.validator_set)
.expect("Couldn't create justification from valid replica messages!");

// Broadcast the leader prepare message to all replicas (ourselves included).
let output_message = ConsensusInputMessage {
message: consensus
.secret_key
.sign_msg(validator::ConsensusMsg::LeaderPrepare(
validator::LeaderPrepare {
protocol_version: Consensus::PROTOCOL_VERSION,
view: self.view,
proposal,
proposal_payload: payload,
justification,
},
)),
recipient: Target::Broadcast,
};
consensus.pipe.send(output_message.into());

let justification =
validator::PrepareQC::from(&replica_messages, &self.inner.validator_set)
.expect("Couldn't create justification from valid replica messages!");
self.prepare_qc.send_replace(Some(justification));
Ok(())
}
}
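Instead of building the block proposal and broadcasting `LeaderPrepare` inline, `process_replica_prepare` now only publishes the freshly built `PrepareQC` through `self.prepare_qc` via `send_replace`, so a separate task can observe it and send the `LeaderPrepare` message asynchronously, per the PR title. A rough sketch of that hand-off is below, using `tokio::sync::watch` as a stand-in for the watch-style channel assumed here; the task names and the `PrepareQC` struct are illustrative.

```rust
use tokio::sync::watch;

// Stand-in for `validator::PrepareQC`.
#[derive(Clone, Debug)]
struct PrepareQC {
    view: u64,
}

#[tokio::main]
async fn main() {
    // Holds `None` until the leader has collected a quorum of ReplicaPrepare votes.
    let (prepare_qc_tx, mut prepare_qc_rx) = watch::channel::<Option<PrepareQC>>(None);

    // Proposer task: waits for a new QC, then builds and sends LeaderPrepare.
    let proposer = tokio::spawn(async move {
        // Resolves once a value newer than the one last seen has been stored.
        prepare_qc_rx.changed().await.unwrap();
        let qc = (*prepare_qc_rx.borrow()).clone().unwrap();
        println!("would send LeaderPrepare justified by QC for view {}", qc.view);
    });

    // Message handler: just records the latest QC and returns immediately.
    prepare_qc_tx.send_replace(Some(PrepareQC { view: 42 }));

    proposer.await.unwrap();
}
```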