From cf8340d50a7a8c2fe4118640b9f5b07f0997122c Mon Sep 17 00:00:00 2001
From: Moshe Shababo <17073733+moshababo@users.noreply.github.com>
Date: Thu, 4 Jan 2024 15:17:18 +0200
Subject: [PATCH] Construct quorum certificate incrementally (BFT-386) (#51)

Constructing quorum certificates from a batch of signed messages involves
redundant error checks that are hard to reason about. Since failure of this
operation is unacceptable and can only result from a bug, it is preferable to
construct the certificate incrementally, upon receiving each message.

However, this refactoring ended up with no error checks at all during the
incremental construction, delegating that responsibility to the message
processing. Consequently, this change can be viewed merely as code cleanup
that removes redundant error checks, with the incremental steps serving as a
safer way to do so.

### Notes

* Batch-mode functions throughout the stack are no longer used in non-test
  code, and were moved to `testonly`. They can be removed altogether if tests
  construct incrementally as well.
    * `PrepareQC::from`
    * `CommitQC::from` (also seemed to have had a duplicated error check,
      which was removed)
    * `validator::AggregateSignature::aggregate`
    * `bn254::AggregateSignature::aggregate`

(A short illustrative sketch of the resulting construction pattern follows
the patch.)
---
 node/actors/bft/src/leader/replica_commit.rs  |  33 +++--
 node/actors/bft/src/leader/replica_prepare.rs |  37 +++---
 node/actors/bft/src/leader/state_machine.rs   |  18 ++-
 node/libs/crypto/src/bn254/mod.rs             |  17 +--
 node/libs/crypto/src/bn254/testonly.rs        |  11 ++
 .../src/validator/keys/aggregate_signature.rs |  10 +-
 .../roles/src/validator/messages/consensus.rs | 114 +++++------------
 node/libs/roles/src/validator/testonly.rs     |  75 ++++++++++++
 8 files changed, 183 insertions(+), 132 deletions(-)

diff --git a/node/actors/bft/src/leader/replica_commit.rs b/node/actors/bft/src/leader/replica_commit.rs
index 1430584a..36e5a026 100644
--- a/node/actors/bft/src/leader/replica_commit.rs
+++ b/node/actors/bft/src/leader/replica_commit.rs
@@ -4,7 +4,7 @@ use std::collections::HashMap;
 use tracing::instrument;
 use zksync_concurrency::{ctx, metrics::LatencyHistogramExt as _};
 use zksync_consensus_network::io::{ConsensusInputMessage, Target};
-use zksync_consensus_roles::validator::{self, ProtocolVersion};
+use zksync_consensus_roles::validator::{self, CommitQC, ProtocolVersion};
 
 /// Errors that can occur when processing a "replica commit" message.
 #[derive(Debug, thiserror::Error)]
@@ -67,12 +67,13 @@ impl StateMachine {
         }
 
         // Check that the message signer is in the validator set.
-        self.inner
-            .validator_set
-            .index(author)
-            .ok_or(Error::NonValidatorSigner {
-                signer: author.clone(),
-            })?;
+        let validator_index =
+            self.inner
+                .validator_set
+                .index(author)
+                .ok_or(Error::NonValidatorSigner {
+                    signer: author.clone(),
+                })?;
 
         // If the message is from the "past", we discard it.
         if (message.view, validator::Phase::Commit) < (self.view, self.phase) {
@@ -105,6 +106,12 @@ impl StateMachine {
 
         // ----------- All checks finished. Now we process the message. --------------
 
+        // We add the message to the incrementally-constructed QC.
+        self.commit_qcs
+            .entry(message.view)
+            .or_insert(CommitQC::new(message, &self.inner.validator_set))
+            .add(&signed_message.sig, validator_index);
+
         // We store the message in our cache.
         let cache_entry = self.commit_message_cache.entry(message.view).or_default();
         cache_entry.insert(author.clone(), signed_message);
@@ -134,12 +141,12 @@ impl StateMachine {
 
         // ----------- Prepare our message and send it. --------------
 
-        // Create the justification for our message.
-        let justification = validator::CommitQC::from(
-            &replica_messages.into_iter().cloned().collect::<Vec<_>>(),
-            &self.inner.validator_set,
-        )
-        .expect("Couldn't create justification from valid replica messages!");
+        // Remove replica commit messages for this view, so that we don't create a new leader commit
+        // for this same view if we receive another replica commit message after this.
+        self.commit_message_cache.remove(&message.view);
+
+        // Consume the incrementally-constructed QC for this view.
+        let justification = self.commit_qcs.remove(&message.view).unwrap();
 
         // Broadcast the leader commit message to all replicas (ourselves included).
         let output_message = ConsensusInputMessage {
diff --git a/node/actors/bft/src/leader/replica_prepare.rs b/node/actors/bft/src/leader/replica_prepare.rs
index d7a30e9b..3f458cad 100644
--- a/node/actors/bft/src/leader/replica_prepare.rs
+++ b/node/actors/bft/src/leader/replica_prepare.rs
@@ -92,12 +92,13 @@ impl StateMachine {
         }
 
         // Check that the message signer is in the validator set.
-        self.inner
-            .validator_set
-            .index(author)
-            .ok_or(Error::NonValidatorSigner {
-                signer: author.clone(),
-            })?;
+        let validator_index =
+            self.inner
+                .validator_set
+                .index(author)
+                .ok_or(Error::NonValidatorSigner {
+                    signer: author.clone(),
+                })?;
 
         // If the message is from the "past", we discard it.
         if (message.view, validator::Phase::Prepare) < (self.view, self.phase) {
@@ -148,6 +149,13 @@ impl StateMachine {
 
         // ----------- All checks finished. Now we process the message. --------------
 
+        // We add the message to the incrementally-constructed QC.
+        self.prepare_qcs.entry(message.view).or_default().add(
+            &signed_message,
+            validator_index,
+            &self.inner.validator_set,
+        );
+
         // We store the message in our cache.
         self.prepare_message_cache
             .entry(message.view)
@@ -161,15 +169,9 @@
             return Ok(());
         }
 
-        // Get all the replica prepare messages for this view. Note that we consume the
-        // messages here. That's purposeful, so that we don't create a new block proposal
+        // Remove replica prepare messages for this view, so that we don't create a new block proposal
         // for this same view if we receive another replica prepare message after this.
-        let replica_messages: Vec<_> = self
-            .prepare_message_cache
-            .remove(&message.view)
-            .unwrap()
-            .into_values()
-            .collect();
+        self.prepare_message_cache.remove(&message.view);
 
         debug_assert_eq!(num_messages, self.inner.threshold());
 
@@ -179,10 +181,9 @@
         self.phase = validator::Phase::Commit;
         self.phase_start = ctx.now();
 
-        // Create the justification for our message.
-        let justification =
-            validator::PrepareQC::from(&replica_messages, &self.inner.validator_set)
-                .expect("Couldn't create justification from valid replica messages!");
+        // Consume the incrementally-constructed QC for this view.
+        let justification = self.prepare_qcs.remove(&message.view).unwrap();
+
         self.prepare_qc.send_replace(Some(justification));
         Ok(())
     }
diff --git a/node/actors/bft/src/leader/state_machine.rs b/node/actors/bft/src/leader/state_machine.rs
index 8af0e4b1..53d93836 100644
--- a/node/actors/bft/src/leader/state_machine.rs
+++ b/node/actors/bft/src/leader/state_machine.rs
@@ -7,7 +7,7 @@ use std::{
 use tracing::instrument;
 use zksync_concurrency::{ctx, error::Wrap as _, metrics::LatencyHistogramExt as _, sync, time};
 use zksync_consensus_network::io::{ConsensusInputMessage, Target};
-use zksync_consensus_roles::validator;
+use zksync_consensus_roles::validator::{self, CommitQC, PrepareQC};
 
 /// The StateMachine struct contains the state of the leader. This is a simple state machine. We just store
 /// replica messages and produce leader messages (including proposing blocks) when we reach the threshold for
@@ -28,13 +28,17 @@ pub(crate) struct StateMachine {
         validator::ViewNumber,
         HashMap<validator::PublicKey, validator::Signed<validator::ReplicaPrepare>>,
     >,
+    /// Prepare QCs indexed by view number.
+    pub(crate) prepare_qcs: BTreeMap<validator::ViewNumber, PrepareQC>,
+    /// Newest prepare QC composed from the `ReplicaPrepare` messages.
+    pub(crate) prepare_qc: sync::watch::Sender<Option<PrepareQC>>,
     /// A cache of replica commit messages indexed by view number and validator.
     pub(crate) commit_message_cache: BTreeMap<
         validator::ViewNumber,
         HashMap<validator::PublicKey, validator::Signed<validator::ReplicaCommit>>,
     >,
-    /// Newest quorum certificate composed from the `ReplicaPrepare` messages.
-    pub(crate) prepare_qc: sync::watch::Sender<Option<validator::PrepareQC>>,
+    /// Commit QCs indexed by view number.
+    pub(crate) commit_qcs: BTreeMap<validator::ViewNumber, CommitQC>,
 }
 
 impl StateMachine {
@@ -47,8 +51,10 @@ impl StateMachine {
             phase: validator::Phase::Prepare,
             phase_start: ctx.now(),
             prepare_message_cache: BTreeMap::new(),
+            prepare_qcs: BTreeMap::new(),
             commit_message_cache: BTreeMap::new(),
             prepare_qc: sync::watch::channel(None).0,
+            commit_qcs: BTreeMap::new(),
         }
     }
 
@@ -102,7 +108,7 @@ impl StateMachine {
         ctx: &ctx::Ctx,
         inner: &ConsensusInner,
         payload_source: &dyn PayloadSource,
-        mut prepare_qc: sync::watch::Receiver<Option<validator::PrepareQC>>,
+        mut prepare_qc: sync::watch::Receiver<Option<PrepareQC>>,
     ) -> ctx::Result<()> {
         let mut next_view = validator::ViewNumber(0);
         loop {
@@ -123,7 +129,7 @@ impl StateMachine {
         ctx: &ctx::Ctx,
         inner: &ConsensusInner,
         payload_source: &dyn PayloadSource,
-        justification: validator::PrepareQC,
+        justification: PrepareQC,
     ) -> ctx::Result<()> {
         // Get the highest block voted for and check if there's a quorum of votes for it. To have a quorum
         // in this situation, we require 2*f+1 votes, where f is the maximum number of faulty replicas.
@@ -139,7 +145,7 @@ impl StateMachine {
             .cloned();
 
         // Get the highest CommitQC.
-        let highest_qc: &validator::CommitQC = justification
+        let highest_qc: &CommitQC = justification
             .map
             .keys()
             .map(|s| &s.high_qc)
diff --git a/node/libs/crypto/src/bn254/mod.rs b/node/libs/crypto/src/bn254/mod.rs
index 622553c7..66592b67 100644
--- a/node/libs/crypto/src/bn254/mod.rs
+++ b/node/libs/crypto/src/bn254/mod.rs
@@ -170,15 +170,16 @@ impl Ord for Signature {
 #[derive(Clone, Debug, PartialEq, Eq)]
 pub struct AggregateSignature(G1);
 
-impl AggregateSignature {
-    /// Generates an aggregate signature from a list of signatures.
-    pub fn aggregate<'a>(sigs: impl IntoIterator<Item = &'a Signature>) -> Self {
-        let mut agg = G1Affine::zero().into_projective();
-        for sig in sigs {
-            agg.add_assign(&sig.0)
-        }
+impl Default for AggregateSignature {
+    fn default() -> Self {
+        Self(G1Affine::zero().into_projective())
+    }
+}
 
-        AggregateSignature(agg)
+impl AggregateSignature {
+    /// Add a signature to the aggregation.
+    pub fn add(&mut self, sig: &Signature) {
+        self.0.add_assign(&sig.0)
     }
 
     /// This function is intentionally non-generic and disallow inlining to ensure that compilation optimizations can be effectively applied.
diff --git a/node/libs/crypto/src/bn254/testonly.rs b/node/libs/crypto/src/bn254/testonly.rs
index dd7bb4a1..8ee10d18 100644
--- a/node/libs/crypto/src/bn254/testonly.rs
+++ b/node/libs/crypto/src/bn254/testonly.rs
@@ -48,3 +48,14 @@ impl Distribution<AggregateSignature> for Standard {
         AggregateSignature(p)
     }
 }
+
+impl AggregateSignature {
+    /// Generate a new aggregate signature from a list of signatures.
+    pub fn aggregate<'a>(sigs: impl IntoIterator<Item = &'a Signature>) -> Self {
+        let mut agg = Self::default();
+        for sig in sigs {
+            agg.add(sig);
+        }
+        agg
+    }
+}
diff --git a/node/libs/roles/src/validator/keys/aggregate_signature.rs b/node/libs/roles/src/validator/keys/aggregate_signature.rs
index b41db29e..ebf2903a 100644
--- a/node/libs/roles/src/validator/keys/aggregate_signature.rs
+++ b/node/libs/roles/src/validator/keys/aggregate_signature.rs
@@ -5,15 +5,13 @@ use zksync_consensus_crypto::{bn254, ByteFmt, Text, TextFmt};
 use zksync_consensus_utils::enum_util::Variant;
 
 /// An aggregate signature from a validator.
-#[derive(Clone, PartialEq, Eq, PartialOrd, Ord)]
+#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Default)]
 pub struct AggregateSignature(pub(crate) bn254::AggregateSignature);
 
 impl AggregateSignature {
-    /// Generate a new aggregate signature from a list of signatures.
-    pub fn aggregate<'a>(sigs: impl IntoIterator<Item = &'a Signature>) -> Self {
-        Self(bn254::AggregateSignature::aggregate(
-            sigs.into_iter().map(|sig| &sig.0).collect::<Vec<_>>(),
-        ))
+    /// Add a signature to the aggregation.
+    pub fn add(&mut self, sig: &Signature) {
+        self.0.add(&sig.0)
     }
 
     /// Verify a list of messages against a list of public keys.
diff --git a/node/libs/roles/src/validator/messages/consensus.rs b/node/libs/roles/src/validator/messages/consensus.rs
index 3dffa79a..8f6004ee 100644
--- a/node/libs/roles/src/validator/messages/consensus.rs
+++ b/node/libs/roles/src/validator/messages/consensus.rs
@@ -1,10 +1,10 @@
 //! Messages related to the consensus protocol.
 use super::{BlockHeader, Msg, Payload, Signed};
-use crate::validator;
-use anyhow::{bail, Context};
+use crate::{validator, validator::Signature};
+use anyhow::bail;
 use bit_vec::BitVec;
-use std::collections::{BTreeMap, BTreeSet, HashMap};
+use std::collections::{BTreeMap, BTreeSet};
 use zksync_consensus_utils::enum_util::{BadVariantError, Variant};
 
 /// Version of the consensus algorithm that the validator is using.
@@ -172,7 +172,7 @@ pub struct LeaderCommit {
 /// A quorum certificate of replica Prepare messages. Since not all Prepare messages are
 /// identical (they have different high blocks and high QCs), we need to keep the high blocks
 /// and high QCs in a map. We can still aggregate the signatures though.
-#[derive(Clone, Debug, PartialEq, Eq)]
+#[derive(Clone, Debug, PartialEq, Eq, Default)]
 pub struct PrepareQC {
     /// Map from replica Prepare messages to the validators that signed them.
     pub map: BTreeMap<ReplicaPrepare, Signers>,
@@ -190,46 +190,23 @@ impl PrepareQC {
             .unwrap_or(ViewNumber(0))
     }
 
-    /// Creates a new PrepareQC from a list of *signed* replica Prepare messages and the current validator set.
-    pub fn from(
-        signed_messages: &[Signed<ReplicaPrepare>],
-        validators: &ValidatorSet,
-    ) -> anyhow::Result<Self> {
-        // Get the view number from the messages, they must all be equal.
-        let view = signed_messages
-            .get(0)
-            .context("Empty signed messages vector")?
-            .msg
-            .view;
-
-        // Create the messages map.
-        let mut map: BTreeMap<ReplicaPrepare, Signers> = BTreeMap::new();
-
-        for signed_message in signed_messages {
-            if signed_message.msg.view != view {
-                bail!("Signed messages aren't all for the same view.");
-            }
-
-            // Get index of the validator in the validator set.
-            let index = validators
-                .index(&signed_message.key)
-                .context("Message signer isn't in the validator set")?;
-
-            if map.contains_key(&signed_message.msg) {
-                map.get_mut(&signed_message.msg).unwrap().0.set(index, true);
-            } else {
-                let mut bit_vec = BitVec::from_elem(validators.len(), false);
-                bit_vec.set(index, true);
-
-                map.insert(signed_message.msg.clone(), Signers(bit_vec));
-            }
-        }
-
-        // Aggregate the signatures.
-        let signature =
-            validator::AggregateSignature::aggregate(signed_messages.iter().map(|v| &v.sig));
+    /// Add a validator's signed message.
+    /// * `signed_message` - A valid signed `ReplicaPrepare` message.
+    /// * `validator_index` - The signer index in the validator set.
+    /// * `validator_set` - The validator set.
+    pub fn add(
+        &mut self,
+        signed_message: &Signed<ReplicaPrepare>,
+        validator_index: usize,
+        validator_set: &ValidatorSet,
+    ) {
+        self.map
+            .entry(signed_message.msg.clone())
+            .or_insert_with(|| Signers(BitVec::from_elem(validator_set.len(), false)))
+            .0
+            .set(validator_index, true);
 
-        Ok(Self { map, signature })
+        self.signature.add(&signed_message.sig);
     }
 
     /// Verifies the integrity of the PrepareQC.
@@ -308,46 +285,21 @@ pub struct CommitQC {
 }
 
 impl CommitQC {
-    /// Creates a new CommitQC from a list of *signed* replica Commit messages and the current validator set.
-    pub fn from(
-        signed_messages: &[Signed<ReplicaCommit>],
-        validators: &ValidatorSet,
-    ) -> anyhow::Result<Self> {
-        // Store the signed messages in a Hashmap.
-        let message = signed_messages[0].msg;
-
-        for signed_message in signed_messages {
-            // Check that the votes are all for the same message.
-            if signed_message.msg != message {
-                bail!("CommitQC can only be created from votes for the same message.");
-            }
+    /// Create a new empty instance for a given `ReplicaCommit` message and a validator set size.
+    pub fn new(message: ReplicaCommit, validator_set: &ValidatorSet) -> Self {
+        Self {
+            message,
+            signers: Signers(BitVec::from_elem(validator_set.len(), false)),
+            signature: validator::AggregateSignature::default(),
         }
+    }
 
-        // Store the signed messages in a Hashmap.
-        let msg_map: HashMap<_, _> = signed_messages
-            .iter()
-            .map(|signed_message| {
-                // Check that the votes are all for the same message.
-                if signed_message.msg != message {
-                    bail!("QuorumCertificate can only be created from votes for the same message.");
-                }
-                Ok((&signed_message.key, &signed_message.sig))
-            })
-            .collect::<anyhow::Result<_>>()?;
-
-        // Create the signers bit map.
-        let bit_vec = validators
-            .iter()
-            .map(|validator| msg_map.contains_key(validator))
-            .collect();
-
-        // Aggregate the signatures.
-        let signature = validator::AggregateSignature::aggregate(msg_map.values().copied());
-        Ok(Self {
-            message,
-            signers: Signers(bit_vec),
-            signature,
-        })
+    /// Add a validator's signature.
+    /// * `sig` - A valid signature.
+    /// * `validator_index` - The signer index in the validator set.
+    pub fn add(&mut self, sig: &Signature, validator_index: usize) {
+        self.signers.0.set(validator_index, true);
+        self.signature.add(sig);
     }
 
     /// Verifies the signature of the CommitQC.
diff --git a/node/libs/roles/src/validator/testonly.rs b/node/libs/roles/src/validator/testonly.rs
index dfe1e0f4..f8effd9c 100644
--- a/node/libs/roles/src/validator/testonly.rs
+++ b/node/libs/roles/src/validator/testonly.rs
@@ -5,6 +5,7 @@ use super::{
     PrepareQC, ProtocolVersion, PublicKey, ReplicaCommit, ReplicaPrepare, SecretKey, Signature,
     Signed, Signers, ValidatorSet, ViewNumber,
 };
+use anyhow::{bail, Context};
 use bit_vec::BitVec;
 use rand::{
     distributions::{Distribution, Standard},
@@ -62,6 +63,80 @@ pub fn make_block(
     }
 }
 
+impl AggregateSignature {
+    /// Generate a new aggregate signature from a list of signatures.
+    pub fn aggregate<'a>(sigs: impl IntoIterator<Item = &'a Signature>) -> Self {
+        let mut agg = Self::default();
+        for sig in sigs {
+            agg.add(sig);
+        }
+        agg
+    }
+}
+
+impl PrepareQC {
+    /// Creates a new PrepareQC from a list of *signed* replica Prepare messages and the current validator set.
+    pub fn from(
+        signed_messages: &[Signed<ReplicaPrepare>],
+        validators: &ValidatorSet,
+    ) -> anyhow::Result<Self> {
+        // Get the view number from the messages, they must all be equal.
+        let view = signed_messages
+            .get(0)
+            .context("Empty signed messages vector")?
+            .msg
+            .view;
+
+        // Create the messages map.
+        let mut prepare_qc = PrepareQC::default();
+
+        for signed_message in signed_messages {
+            if signed_message.msg.view != view {
+                bail!("Signed messages aren't all for the same view.");
+            }
+
+            // Get index of the validator in the validator set.
+            let index = validators
+                .index(&signed_message.key)
+                .context("Message signer isn't in the validator set")?;
+
+            prepare_qc.add(signed_message, index, validators);
+        }
+
+        Ok(prepare_qc)
+    }
+}
+
+impl CommitQC {
+    /// Creates a new CommitQC from a list of *signed* replica Commit messages and the current validator set.
+    /// * `signed_messages` - A list of valid `ReplicaCommit` signed messages. Must contain at least one item.
+    /// * `validators` - The validator set.
+    pub fn from(
+        signed_messages: &[Signed<ReplicaCommit>],
+        validators: &ValidatorSet,
+    ) -> anyhow::Result<Self> {
+        // Store the signed messages in a Hashmap.
+        let message = signed_messages[0].msg;
+        let mut commit_qc = CommitQC::new(message, validators);
+
+        for signed_message in signed_messages {
+            // Check that the votes are all for the same message.
+            if signed_message.msg != message {
+                bail!("CommitQC can only be created from votes for the same message.");
+            }
+
+            // Get index of the validator in the validator set.
+            let validator_index = validators
+                .index(&signed_message.key)
+                .context("Message signer isn't in the validator set")?;
+
+            commit_qc.add(&signed_message.sig, validator_index);
+        }
+
+        Ok(commit_qc)
+    }
+}
+
 impl Distribution<AggregateSignature> for Standard {
     fn sample<R: Rng + ?Sized>(&self, rng: &mut R) -> AggregateSignature {
         AggregateSignature(rng.gen())
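
---

For illustration, here is a minimal, self-contained sketch of the incremental construction pattern this patch introduces. The types below are simplified stand-ins, not the real `zksync_consensus_roles`/`bn254` types: `Signature` is a plain integer and aggregation is modeled as wrapping addition, whereas the real code adds bn254 G1 curve points. The point it demonstrates is that `new`/`add` have no error paths, because signature validity and validator-set membership are checked during message processing, before `add` is ever called.

```rust
// Stand-in types for illustration only.
#[derive(Clone, Copy)]
pub struct Signature(u64);

#[derive(Default)]
pub struct AggregateSignature(u64);

impl AggregateSignature {
    // Mirrors `bn254::AggregateSignature::add`: fold one signature into the aggregate.
    pub fn add(&mut self, sig: &Signature) {
        self.0 = self.0.wrapping_add(sig.0);
    }
}

pub struct CommitQc {
    signers: Vec<bool>, // one bit per validator, like `Signers(BitVec)`
    signature: AggregateSignature,
}

impl CommitQc {
    // Mirrors `CommitQC::new`: an empty certificate sized to the validator set.
    pub fn new(validator_set_len: usize) -> Self {
        Self {
            signers: vec![false; validator_set_len],
            signature: AggregateSignature::default(),
        }
    }

    // Mirrors `CommitQC::add`: infallible by design. Validity and membership
    // were already verified while processing the incoming message.
    pub fn add(&mut self, sig: &Signature, validator_index: usize) {
        self.signers[validator_index] = true;
        self.signature.add(sig);
    }

    pub fn num_signers(&self) -> usize {
        self.signers.iter().filter(|s| **s).count()
    }
}

fn main() {
    // Leader-side flow: fold each verified vote into the per-view QC as it
    // arrives, then consume the QC once the threshold is reached.
    let threshold = 3;
    let mut qc = CommitQc::new(5);
    for (validator_index, sig) in [(0, Signature(11)), (2, Signature(7)), (4, Signature(3))] {
        qc.add(&sig, validator_index);
    }
    assert!(qc.num_signers() >= threshold);
}
```

In the actual patch the per-view certificates live in `BTreeMap<ViewNumber, ...>` maps on the leader state machine, so consuming a view (`self.commit_qcs.remove(&message.view)`) drops that view's partially built QC together with its message cache.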