Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Construct quorum certificate incrementally (BFT-386) #51

Merged
merged 11 commits into from
Jan 4, 2024
33 changes: 20 additions & 13 deletions node/actors/bft/src/leader/replica_commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::collections::HashMap;
use tracing::instrument;
use zksync_concurrency::{ctx, metrics::LatencyHistogramExt as _};
use zksync_consensus_network::io::{ConsensusInputMessage, Target};
use zksync_consensus_roles::validator::{self, ProtocolVersion};
use zksync_consensus_roles::validator::{self, CommitQC, ProtocolVersion};

/// Errors that can occur when processing a "replica commit" message.
#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -67,12 +67,13 @@ impl StateMachine {
}

// Check that the message signer is in the validator set.
self.inner
.validator_set
.index(author)
.ok_or(Error::NonValidatorSigner {
signer: author.clone(),
})?;
let validator_index =
self.inner
.validator_set
.index(author)
.ok_or(Error::NonValidatorSigner {
signer: author.clone(),
})?;

// If the message is from the "past", we discard it.
if (message.view, validator::Phase::Commit) < (self.view, self.phase) {
Expand Down Expand Up @@ -105,6 +106,12 @@ impl StateMachine {

// ----------- All checks finished. Now we process the message. --------------

// We add the message to the incrementally-constructed QC.
self.commit_qcs
.entry(message.view)
.or_insert(CommitQC::new(message, self.inner.validator_set.len()))
.add(&signed_message.sig, validator_index);

// We store the message in our cache.
let cache_entry = self.commit_message_cache.entry(message.view).or_default();
cache_entry.insert(author.clone(), signed_message);
Expand Down Expand Up @@ -134,12 +141,12 @@ impl StateMachine {

// ----------- Prepare our message and send it. --------------

// Create the justification for our message.
let justification = validator::CommitQC::from(
&replica_messages.into_iter().cloned().collect::<Vec<_>>(),
&self.inner.validator_set,
)
.expect("Couldn't create justification from valid replica messages!");
// Remove replica commit messages for this view, so that we don't create a new leader commit
// for this same view if we receive another replica commit message after this.
self.commit_message_cache.remove(&message.view);

// Consume the incrementally-constructed QC for this view.
let justification = self.commit_qcs.remove(&message.view).unwrap();

// Broadcast the leader commit message to all replicas (ourselves included).
let output_message = ConsensusInputMessage {
Expand Down
36 changes: 18 additions & 18 deletions node/actors/bft/src/leader/replica_prepare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,13 @@ impl StateMachine {
}

// Check that the message signer is in the validator set.
self.inner
.validator_set
.index(author)
.ok_or(Error::NonValidatorSigner {
signer: author.clone(),
})?;
let validator_index =
self.inner
.validator_set
.index(author)
.ok_or(Error::NonValidatorSigner {
signer: author.clone(),
})?;

// If the message is from the "past", we discard it.
if (message.view, validator::Phase::Prepare) < (self.view, self.phase) {
Expand Down Expand Up @@ -148,6 +149,12 @@ impl StateMachine {

// ----------- All checks finished. Now we process the message. --------------

// We add the message to the incrementally-constructed QC.
self.prepare_qcs.entry(message.view).or_default().add(
&signed_message,
(validator_index, self.inner.validator_set.len()),
);

// We store the message in our cache.
self.prepare_message_cache
.entry(message.view)
Expand All @@ -161,15 +168,9 @@ impl StateMachine {
return Ok(());
}

// Get all the replica prepare messages for this view. Note that we consume the
// messages here. That's purposeful, so that we don't create a new block proposal
// Remove replica prepare messages for this view, so that we don't create a new block proposal
// for this same view if we receive another replica prepare message after this.
let replica_messages: Vec<_> = self
.prepare_message_cache
.remove(&message.view)
.unwrap()
.into_values()
.collect();
self.prepare_message_cache.remove(&message.view);

debug_assert_eq!(num_messages, self.inner.threshold());

Expand All @@ -179,10 +180,9 @@ impl StateMachine {
self.phase = validator::Phase::Commit;
self.phase_start = ctx.now();

// Create the justification for our message.
let justification =
validator::PrepareQC::from(&replica_messages, &self.inner.validator_set)
.expect("Couldn't create justification from valid replica messages!");
// Consume the incrementally-constructed QC for this view.
let justification = self.prepare_qcs.remove(&message.view).unwrap();

self.prepare_qc.send_replace(Some(justification));
Ok(())
}
Expand Down
18 changes: 12 additions & 6 deletions node/actors/bft/src/leader/state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{
use tracing::instrument;
use zksync_concurrency::{ctx, error::Wrap as _, metrics::LatencyHistogramExt as _, sync, time};
use zksync_consensus_network::io::{ConsensusInputMessage, Target};
use zksync_consensus_roles::validator;
use zksync_consensus_roles::validator::{self, CommitQC, PrepareQC};

/// The StateMachine struct contains the state of the leader. This is a simple state machine. We just store
/// replica messages and produce leader messages (including proposing blocks) when we reach the threshold for
Expand All @@ -28,13 +28,17 @@ pub(crate) struct StateMachine {
validator::ViewNumber,
HashMap<validator::PublicKey, validator::Signed<validator::ReplicaPrepare>>,
>,
/// Prepare QCs indexed by view number.
pub(crate) prepare_qcs: BTreeMap<validator::ViewNumber, PrepareQC>,
/// Newest prepare QC composed from the `ReplicaPrepare` messages.
pub(crate) prepare_qc: sync::watch::Sender<Option<PrepareQC>>,
/// A cache of replica commit messages indexed by view number and validator.
pub(crate) commit_message_cache: BTreeMap<
validator::ViewNumber,
HashMap<validator::PublicKey, validator::Signed<validator::ReplicaCommit>>,
>,
/// Newest quorum certificate composed from the `ReplicaPrepare` messages.
pub(crate) prepare_qc: sync::watch::Sender<Option<validator::PrepareQC>>,
/// Commit QCs indexed by view number.
pub(crate) commit_qcs: BTreeMap<validator::ViewNumber, CommitQC>,
}

impl StateMachine {
Expand All @@ -47,8 +51,10 @@ impl StateMachine {
phase: validator::Phase::Prepare,
phase_start: ctx.now(),
prepare_message_cache: BTreeMap::new(),
prepare_qcs: BTreeMap::new(),
commit_message_cache: BTreeMap::new(),
prepare_qc: sync::watch::channel(None).0,
commit_qcs: BTreeMap::new(),
}
}

Expand Down Expand Up @@ -102,7 +108,7 @@ impl StateMachine {
ctx: &ctx::Ctx,
inner: &ConsensusInner,
payload_source: &dyn PayloadSource,
mut prepare_qc: sync::watch::Receiver<Option<validator::PrepareQC>>,
mut prepare_qc: sync::watch::Receiver<Option<PrepareQC>>,
) -> ctx::Result<()> {
let mut next_view = validator::ViewNumber(0);
loop {
Expand All @@ -123,7 +129,7 @@ impl StateMachine {
ctx: &ctx::Ctx,
inner: &ConsensusInner,
payload_source: &dyn PayloadSource,
justification: validator::PrepareQC,
justification: PrepareQC,
) -> ctx::Result<()> {
// Get the highest block voted for and check if there's a quorum of votes for it. To have a quorum
// in this situation, we require 2*f+1 votes, where f is the maximum number of faulty replicas.
Expand All @@ -139,7 +145,7 @@ impl StateMachine {
.cloned();

// Get the highest CommitQC.
let highest_qc: &validator::CommitQC = justification
let highest_qc: &CommitQC = justification
.map
.keys()
.map(|s| &s.high_qc)
Expand Down
17 changes: 9 additions & 8 deletions node/libs/crypto/src/bn254/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,15 +170,16 @@ impl Ord for Signature {
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct AggregateSignature(G1);

impl AggregateSignature {
/// Generates an aggregate signature from a list of signatures.
pub fn aggregate<'a>(sigs: impl IntoIterator<Item = &'a Signature>) -> Self {
let mut agg = G1Affine::zero().into_projective();
for sig in sigs {
agg.add_assign(&sig.0)
}
impl Default for AggregateSignature {
fn default() -> Self {
Self(G1Affine::zero().into_projective())
}
}

AggregateSignature(agg)
impl AggregateSignature {
/// Add a signature to the aggregation.
pub fn add(&mut self, sig: &Signature) {
self.0.add_assign(&sig.0)
}

/// This function is intentionally non-generic and disallow inlining to ensure that compilation optimizations can be effectively applied.
Expand Down
11 changes: 11 additions & 0 deletions node/libs/crypto/src/bn254/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,14 @@ impl Distribution<AggregateSignature> for Standard {
AggregateSignature(p)
}
}

impl AggregateSignature {
/// Generate a new aggregate signature from a list of signatures.
pub fn aggregate<'a>(sigs: impl IntoIterator<Item = &'a Signature>) -> Self {
let mut agg = Self::default();
for sig in sigs {
agg.add(sig);
}
agg
}
}
10 changes: 4 additions & 6 deletions node/libs/roles/src/validator/keys/aggregate_signature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,13 @@ use zksync_consensus_crypto::{bn254, ByteFmt, Text, TextFmt};
use zksync_consensus_utils::enum_util::Variant;

/// An aggregate signature from a validator.
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord)]
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Default)]
pub struct AggregateSignature(pub(crate) bn254::AggregateSignature);

impl AggregateSignature {
/// Generate a new aggregate signature from a list of signatures.
pub fn aggregate<'a>(sigs: impl IntoIterator<Item = &'a Signature>) -> Self {
Self(bn254::AggregateSignature::aggregate(
sigs.into_iter().map(|sig| &sig.0).collect::<Vec<_>>(),
))
/// Add a signature to the aggregation.
pub fn add(&mut self, sig: &Signature) {
self.0.add(&sig.0)
}

/// Verify a list of messages against a list of public keys.
Expand Down
110 changes: 29 additions & 81 deletions node/libs/roles/src/validator/messages/consensus.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
//! Messages related to the consensus protocol.

use super::{BlockHeader, Msg, Payload, Signed};
use crate::validator;
use anyhow::{bail, Context};
use crate::{validator, validator::Signature};
use anyhow::bail;
use bit_vec::BitVec;
use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::collections::{BTreeMap, BTreeSet};
use zksync_consensus_utils::enum_util::{BadVariantError, Variant};

/// Version of the consensus algorithm that the validator is using.
Expand Down Expand Up @@ -172,7 +172,7 @@ pub struct LeaderCommit {
/// A quorum certificate of replica Prepare messages. Since not all Prepare messages are
/// identical (they have different high blocks and high QCs), we need to keep the high blocks
/// and high QCs in a map. We can still aggregate the signatures though.
#[derive(Clone, Debug, PartialEq, Eq)]
#[derive(Clone, Debug, PartialEq, Eq, Default)]
pub struct PrepareQC {
/// Map from replica Prepare messages to the validators that signed them.
pub map: BTreeMap<ReplicaPrepare, Signers>,
Expand All @@ -190,46 +190,21 @@ impl PrepareQC {
.unwrap_or(ViewNumber(0))
}

/// Creates a new PrepareQC from a list of *signed* replica Prepare messages and the current validator set.
pub fn from(
signed_messages: &[Signed<ReplicaPrepare>],
validators: &ValidatorSet,
) -> anyhow::Result<Self> {
// Get the view number from the messages, they must all be equal.
let view = signed_messages
.get(0)
.context("Empty signed messages vector")?
.msg
.view;

// Create the messages map.
let mut map: BTreeMap<ReplicaPrepare, Signers> = BTreeMap::new();

for signed_message in signed_messages {
if signed_message.msg.view != view {
bail!("Signed messages aren't all for the same view.");
}

// Get index of the validator in the validator set.
let index = validators
.index(&signed_message.key)
.context("Message signer isn't in the validator set")?;

if map.contains_key(&signed_message.msg) {
map.get_mut(&signed_message.msg).unwrap().0.set(index, true);
} else {
let mut bit_vec = BitVec::from_elem(validators.len(), false);
bit_vec.set(index, true);

map.insert(signed_message.msg.clone(), Signers(bit_vec));
}
}

// Aggregate the signatures.
let signature =
validator::AggregateSignature::aggregate(signed_messages.iter().map(|v| &v.sig));
/// Add a validator's signed message.
/// * `signed_message` - A signed message.
/// * `validator_index` - A tuple containing the index of the validator and the total size of the set.
pub fn add(
&mut self,
signed_message: &Signed<ReplicaPrepare>,
validator_index: (usize, usize),
moshababo marked this conversation as resolved.
Show resolved Hide resolved
) {
self.map
.entry(signed_message.msg.clone())
.or_insert_with(|| Signers(BitVec::from_elem(validator_index.1, false)))
.0
.set(validator_index.0, true);

Ok(Self { map, signature })
self.signature.add(&signed_message.sig);
}

/// Verifies the integrity of the PrepareQC.
Expand Down Expand Up @@ -308,46 +283,19 @@ pub struct CommitQC {
}

impl CommitQC {
/// Creates a new CommitQC from a list of *signed* replica Commit messages and the current validator set.
pub fn from(
signed_messages: &[Signed<ReplicaCommit>],
validators: &ValidatorSet,
) -> anyhow::Result<Self> {
// Store the signed messages in a Hashmap.
let message = signed_messages[0].msg;

for signed_message in signed_messages {
// Check that the votes are all for the same message.
if signed_message.msg != message {
bail!("CommitQC can only be created from votes for the same message.");
}
/// Create a new empty instance for a given `ReplicaCommit` message and a validator set size.
pub fn new(message: ReplicaCommit, validator_set_size: usize) -> Self {
moshababo marked this conversation as resolved.
Show resolved Hide resolved
Self {
message,
signers: Signers(BitVec::from_elem(validator_set_size, false)),
signature: validator::AggregateSignature::default(),
}
}

// Store the signed messages in a Hashmap.
let msg_map: HashMap<_, _> = signed_messages
.iter()
.map(|signed_message| {
// Check that the votes are all for the same message.
if signed_message.msg != message {
bail!("QuorumCertificate can only be created from votes for the same message.");
}
Ok((&signed_message.key, &signed_message.sig))
})
.collect::<anyhow::Result<_>>()?;

// Create the signers bit map.
let bit_vec = validators
.iter()
.map(|validator| msg_map.contains_key(validator))
.collect();

// Aggregate the signatures.
let signature = validator::AggregateSignature::aggregate(msg_map.values().copied());
Ok(Self {
message,
signers: Signers(bit_vec),
signature,
})
/// Add a validator's signature.
pub fn add(&mut self, sig: &Signature, validator_index: usize) {
pompon0 marked this conversation as resolved.
Show resolved Hide resolved
self.signers.0.set(validator_index, true);
self.signature.add(sig);
}

/// Verifies the signature of the CommitQC.
Expand Down
Loading
Loading