Skip to content

Commit

Permalink
Post-merge fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
moshababo committed Dec 19, 2023
1 parent 4ec0a92 commit c3a5d91
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 136 deletions.
31 changes: 12 additions & 19 deletions node/actors/bft/src/leader/replica_commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::collections::HashMap;
use tracing::instrument;
use zksync_concurrency::{ctx, metrics::LatencyHistogramExt as _};
use zksync_consensus_network::io::{ConsensusInputMessage, Target};
use zksync_consensus_roles::validator::{self, CommitQCBuilder, ProtocolVersion};
use zksync_consensus_roles::validator::{self, CommitQC, ProtocolVersion};

/// Errors that can occur when processing a "replica commit" message.
#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -67,12 +67,13 @@ impl StateMachine {
}

// Check that the message signer is in the validator set.
let validator_index = self.inner
.validator_set
.index(author)
.ok_or(Error::NonValidatorSigner {
signer: author.clone(),
})?;
let validator_index =
self.inner
.validator_set
.index(author)
.ok_or(Error::NonValidatorSigner {
signer: author.clone(),
})?;

// If the message is from the "past", we discard it.
if (message.view, validator::Phase::Commit) < (self.view, self.phase) {
Expand Down Expand Up @@ -106,9 +107,9 @@ impl StateMachine {
// ----------- All checks finished. Now we process the message. --------------

// We add the message to the incrementally-constructed QC.
self.commit_qc
self.commit_qcs
.entry(message.view)
.or_insert(CommitQCBuilder::new(message, consensus.validator_set.len()))
.or_insert(CommitQC::new(message, self.inner.validator_set.len()))
.add(&signed_message.sig, validator_index);

// We store the message in our cache.
Expand Down Expand Up @@ -140,20 +141,12 @@ impl StateMachine {

// ----------- Prepare our message and send it. --------------

// TODO: post-merge fixes
// // Create the justification for our message.
// let justification = validator::CommitQC::from(
// &replica_messages.into_iter().cloned().collect::<Vec<_>>(),
// &self.inner.validator_set,
// )
// .expect("Couldn't create justification from valid replica messages!");

// // Remove replica commit messages for this view, so that we don't create a new leader commit
// Remove replica commit messages for this view, so that we don't create a new leader commit
// for this same view if we receive another replica commit message after this.
self.commit_message_cache.remove(&message.view);

// Consume the incrementally-constructed QC for this view.
let justification = self.commit_qc.remove(&message.view).unwrap().take();
let justification = self.commit_qcs.remove(&message.view).unwrap();

// Broadcast the leader commit message to all replicas (ourselves included).
let output_message = ConsensusInputMessage {
Expand Down
38 changes: 17 additions & 21 deletions node/actors/bft/src/leader/replica_prepare.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use super::StateMachine;
use tracing::instrument;
use zksync_concurrency::{ctx, error::Wrap};
use zksync_consensus_roles::validator::{self, ProtocolVersion};
use zksync_consensus_network::io::{ConsensusInputMessage, Target};
use zksync_consensus_roles::validator::{self, PrepareQCBuilder, ProtocolVersion};
use zksync_consensus_roles::validator::{self, PrepareQC, ProtocolVersion};

/// Errors that can occur when processing a "replica prepare" message.
#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -94,12 +92,13 @@ impl StateMachine {
}

// Check that the message signer is in the validator set.
let validator_index = self.inner
.validator_set
.index(author)
.ok_or(Error::NonValidatorSigner {
signer: author.clone(),
})?;
let validator_index =
self.inner
.validator_set
.index(author)
.ok_or(Error::NonValidatorSigner {
signer: author.clone(),
})?;

// If the message is from the "past", we discard it.
if (message.view, validator::Phase::Prepare) < (self.view, self.phase) {
Expand Down Expand Up @@ -151,10 +150,13 @@ impl StateMachine {
// ----------- All checks finished. Now we process the message. --------------

// We add the message to the incrementally-constructed QC.
self.prepare_qc
self.prepare_qcs
.entry(message.view)
.or_insert(PrepareQCBuilder::new(consensus.validator_set.len()))
.add(&signed_message, validator_index);
.or_insert(PrepareQC::new())
.add(
&signed_message,
(validator_index, self.inner.validator_set.len()),
);

// We store the message in our cache.
self.prepare_message_cache
Expand All @@ -169,15 +171,9 @@ impl StateMachine {
return Ok(());
}

// Get all the replica prepare messages for this view. Note that we consume the
// messages here. That's purposeful, so that we don't create a new block proposal
// Remove replica prepare messages for this view, so that we don't create a new block proposal
// for this same view if we receive another replica prepare message after this.
let replica_messages: Vec<_> = self
.prepare_message_cache
.remove(&message.view)
.unwrap()
.into_values()
.collect();
self.prepare_message_cache.remove(&message.view);

debug_assert_eq!(num_messages, self.inner.threshold());

Expand All @@ -188,7 +184,7 @@ impl StateMachine {
self.phase_start = ctx.now();

// Consume the incrementally-constructed QC for this view.
let justification = self.prepare_qc.remove(&message.view).unwrap().take();
let justification = self.prepare_qcs.remove(&message.view).unwrap();

self.prepare_qc.send_replace(Some(justification));
Ok(())
Expand Down
23 changes: 10 additions & 13 deletions node/actors/bft/src/leader/state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@ use std::{
use tracing::instrument;
use zksync_concurrency::{ctx, error::Wrap as _, metrics::LatencyHistogramExt as _, sync, time};
use zksync_consensus_network::io::{ConsensusInputMessage, Target};
use zksync_consensus_roles::{
validator,
validator::{CommitQCBuilder, PrepareQCBuilder},
};
use zksync_consensus_roles::validator::{self, CommitQC, PrepareQC};

/// The StateMachine struct contains the state of the leader. This is a simple state machine. We just store
/// replica messages and produce leader messages (including proposing blocks) when we reach the threshold for
Expand All @@ -31,17 +28,17 @@ pub(crate) struct StateMachine {
validator::ViewNumber,
HashMap<validator::PublicKey, validator::Signed<validator::ReplicaPrepare>>,
>,
/// Prepare QC builders indexed by view number.
pub(crate) prepare_qc: BTreeMap<validator::ViewNumber, PrepareQCBuilder>,
/// Prepare QCs indexed by view number.
pub(crate) prepare_qcs: BTreeMap<validator::ViewNumber, PrepareQC>,
/// Newest prepare QC composed from the `ReplicaPrepare` messages.
pub(crate) prepare_qc: sync::watch::Sender<Option<PrepareQC>>,
/// A cache of replica commit messages indexed by view number and validator.
pub(crate) commit_message_cache: BTreeMap<
validator::ViewNumber,
HashMap<validator::PublicKey, validator::Signed<validator::ReplicaCommit>>,
>,
/// Newest quorum certificate composed from the `ReplicaPrepare` messages.
pub(crate) prepare_qc: sync::watch::Sender<Option<validator::PrepareQC>>,
/// Commit QC builders indexed by view number.
pub(crate) commit_qc: BTreeMap<validator::ViewNumber, CommitQCBuilder>,
/// Commit QCs indexed by view number.
pub(crate) commit_qcs: BTreeMap<validator::ViewNumber, CommitQC>,
}

impl StateMachine {
Expand All @@ -54,10 +51,10 @@ impl StateMachine {
phase: validator::Phase::Prepare,
phase_start: ctx.now(),
prepare_message_cache: BTreeMap::new(),
prepare_qc: BTreeMap::new(),
prepare_qcs: BTreeMap::new(),
commit_message_cache: BTreeMap::new(),
prepare_qc: sync::watch::channel(None).0,
commit_qc: BTreeMap::new(),
commit_qcs: BTreeMap::new(),
}
}

Expand Down Expand Up @@ -111,7 +108,7 @@ impl StateMachine {
ctx: &ctx::Ctx,
inner: &ConsensusInner,
payload_source: &dyn PayloadSource,
mut prepare_qc: sync::watch::Receiver<Option<validator::PrepareQC>>,
mut prepare_qc: sync::watch::Receiver<Option<PrepareQC>>,
) -> ctx::Result<()> {
let mut next_view = validator::ViewNumber(0);
loop {
Expand Down
123 changes: 47 additions & 76 deletions node/libs/roles/src/validator/messages/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@ use super::{BlockHeader, Msg, Payload, Signed};
use crate::{validator, validator::Signature};
use anyhow::bail;
use bit_vec::BitVec;
use std::{
collections::{BTreeMap, BTreeSet},
mem,
};
use std::collections::{BTreeMap, BTreeSet};
use zksync_consensus_utils::enum_util::{BadVariantError, Variant};

/// Version of the consensus algorithm that the validator is using.
Expand Down Expand Up @@ -172,49 +169,6 @@ pub struct LeaderCommit {
pub justification: CommitQC,
}

/// Utility for incrementally constructing a `PrepareQC` instance .
pub struct PrepareQCBuilder {
instance: PrepareQC,
val_set_size: usize,
}

impl PrepareQCBuilder {
/// Create a new builder instance for a given validator set size.
pub fn new(val_set_size: usize) -> Self {
Self {
instance: Default::default(),
val_set_size,
}
}

/// Add a validator's signed message to the `PrepareQC`.
pub fn add(&mut self, signed_message: &Signed<ReplicaPrepare>, validator_index: usize) {
// TODO: refactor to cleaner code
if self.instance.map.contains_key(&signed_message.msg) {
self.instance
.map
.get_mut(&signed_message.msg)
.unwrap()
.0
.set(validator_index, true);
} else {
let mut bit_vec = BitVec::from_elem(self.val_set_size, false);
bit_vec.set(validator_index, true);

self.instance
.map
.insert(signed_message.msg.clone(), Signers(bit_vec));
}

self.instance.signature.add(&signed_message.sig);
}

/// Take the constructed `PrepareQC`, consuming the builder in the process.
pub fn take(&mut self) -> PrepareQC {
mem::take(&mut self.instance)
}
}

/// A quorum certificate of replica Prepare messages. Since not all Prepare messages are
/// identical (they have different high blocks and high QCs), we need to keep the high blocks
/// and high QCs in a map. We can still aggregate the signatures though.
Expand All @@ -227,6 +181,11 @@ pub struct PrepareQC {
}

impl PrepareQC {
/// Create a new empty instance. (added on top of `Default` for compatibility with `CommitQC`.)
pub fn new() -> Self {
Default::default()
}

/// View of the QC.
pub fn view(&self) -> ViewNumber {
self.map
Expand All @@ -236,6 +195,32 @@ impl PrepareQC {
.unwrap_or(ViewNumber(0))
}

/// Add a validator's signed message.
/// * `signed_message` - A signed message.
/// * `validator_index` - A tuple containing the index of the validator and the total size of the set.
pub fn add(
&mut self,
signed_message: &Signed<ReplicaPrepare>,
validator_index: (usize, usize),
) {
// TODO: refactor to cleaner code
if self.map.contains_key(&signed_message.msg) {
self.map
.get_mut(&signed_message.msg)
.unwrap()
.0
.set(validator_index.0, true);
} else {
let mut bit_vec = BitVec::from_elem(validator_index.1, false);
bit_vec.set(validator_index.0, true);

self.map
.insert(signed_message.msg.clone(), Signers(bit_vec));
}

self.signature.add(&signed_message.sig);
}

/// Verifies the integrity of the PrepareQC.
pub fn verify(
&self,
Expand Down Expand Up @@ -299,35 +284,6 @@ impl PrepareQC {
}
}

/// Utility for incrementally constructing a `CommitQC` instance .
pub struct CommitQCBuilder {
instance: CommitQC,
}

impl CommitQCBuilder {
/// Create a new builder instance for a given `ReplicaCommit` message and a validator set size.
pub fn new(message: ReplicaCommit, validator_set_size: usize) -> Self {
Self {
instance: CommitQC {
message,
signers: Signers(BitVec::from_elem(validator_set_size, false)),
signature: Default::default(),
},
}
}

/// Add a validator's signature to the `CommitQC`.
pub fn add(&mut self, sig: &Signature, validator_index: usize) {
self.instance.signers.0.set(validator_index, true);
self.instance.signature.add(sig);
}

/// Take the constructed `CommitQC`.
pub fn take(&mut self) -> CommitQC {
self.instance.clone()
}
}

/// A Commit Quorum Certificate. It is an aggregate of signed replica Commit messages.
/// The Quorum Certificate is supposed to be over identical messages, so we only need one message.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
Expand All @@ -341,6 +297,21 @@ pub struct CommitQC {
}

impl CommitQC {
/// Create a new empty instance for a given `ReplicaCommit` message and a validator set size.
pub fn new(message: ReplicaCommit, validator_set_size: usize) -> Self {
Self {
message,
signers: Signers(BitVec::from_elem(validator_set_size, false)),
signature: Default::default(),
}
}

/// Add a validator's signature.
pub fn add(&mut self, sig: &Signature, validator_index: usize) {
self.signers.0.set(validator_index, true);
self.signature.add(sig);
}

/// Verifies the signature of the CommitQC.
pub fn verify(&self, validators: &ValidatorSet, threshold: usize) -> anyhow::Result<()> {
let signers = self.signers.0.clone();
Expand Down
Loading

0 comments on commit c3a5d91

Please sign in to comment.