diff --git a/node/actors/bft/src/leader/replica_commit.rs b/node/actors/bft/src/leader/replica_commit.rs
index 1430584a..36e5a026 100644
--- a/node/actors/bft/src/leader/replica_commit.rs
+++ b/node/actors/bft/src/leader/replica_commit.rs
@@ -4,7 +4,7 @@ use std::collections::HashMap;
 use tracing::instrument;
 use zksync_concurrency::{ctx, metrics::LatencyHistogramExt as _};
 use zksync_consensus_network::io::{ConsensusInputMessage, Target};
-use zksync_consensus_roles::validator::{self, ProtocolVersion};
+use zksync_consensus_roles::validator::{self, CommitQC, ProtocolVersion};
 
 /// Errors that can occur when processing a "replica commit" message.
 #[derive(Debug, thiserror::Error)]
@@ -67,12 +67,13 @@ impl StateMachine {
         }
 
         // Check that the message signer is in the validator set.
-        self.inner
-            .validator_set
-            .index(author)
-            .ok_or(Error::NonValidatorSigner {
-                signer: author.clone(),
-            })?;
+        let validator_index =
+            self.inner
+                .validator_set
+                .index(author)
+                .ok_or(Error::NonValidatorSigner {
+                    signer: author.clone(),
+                })?;
 
         // If the message is from the "past", we discard it.
         if (message.view, validator::Phase::Commit) < (self.view, self.phase) {
@@ -105,6 +106,12 @@ impl StateMachine {
 
         // ----------- All checks finished. Now we process the message. --------------
 
+        // We add the message to the incrementally-constructed QC.
+        self.commit_qcs
+            .entry(message.view)
+            .or_insert(CommitQC::new(message, &self.inner.validator_set))
+            .add(&signed_message.sig, validator_index);
+
         // We store the message in our cache.
         let cache_entry = self.commit_message_cache.entry(message.view).or_default();
         cache_entry.insert(author.clone(), signed_message);
@@ -134,12 +141,12 @@ impl StateMachine {
 
         // ----------- Prepare our message and send it. --------------
 
-        // Create the justification for our message.
-        let justification = validator::CommitQC::from(
-            &replica_messages.into_iter().cloned().collect::<Vec<_>>(),
-            &self.inner.validator_set,
-        )
-        .expect("Couldn't create justification from valid replica messages!");
+        // Remove replica commit messages for this view, so that we don't create a new leader commit
+        // for this same view if we receive another replica commit message after this.
+        self.commit_message_cache.remove(&message.view);
+
+        // Consume the incrementally-constructed QC for this view.
+        let justification = self.commit_qcs.remove(&message.view).unwrap();
 
         // Broadcast the leader commit message to all replicas (ourselves included).
         let output_message = ConsensusInputMessage {
diff --git a/node/actors/bft/src/leader/replica_prepare.rs b/node/actors/bft/src/leader/replica_prepare.rs
index d7a30e9b..3f458cad 100644
--- a/node/actors/bft/src/leader/replica_prepare.rs
+++ b/node/actors/bft/src/leader/replica_prepare.rs
@@ -92,12 +92,13 @@ impl StateMachine {
         }
 
         // Check that the message signer is in the validator set.
-        self.inner
-            .validator_set
-            .index(author)
-            .ok_or(Error::NonValidatorSigner {
-                signer: author.clone(),
-            })?;
+        let validator_index =
+            self.inner
+                .validator_set
+                .index(author)
+                .ok_or(Error::NonValidatorSigner {
+                    signer: author.clone(),
+                })?;
 
         // If the message is from the "past", we discard it.
         if (message.view, validator::Phase::Prepare) < (self.view, self.phase) {
@@ -148,6 +149,13 @@ impl StateMachine {
 
         // ----------- All checks finished. Now we process the message. --------------
 
+        // We add the message to the incrementally-constructed QC.
+        self.prepare_qcs.entry(message.view).or_default().add(
+            &signed_message,
+            validator_index,
+            &self.inner.validator_set,
+        );
+
         // We store the message in our cache.
         self.prepare_message_cache
             .entry(message.view)
@@ -161,15 +169,9 @@ impl StateMachine {
             return Ok(());
         }
 
-        // Get all the replica prepare messages for this view. Note that we consume the
-        // messages here. That's purposeful, so that we don't create a new block proposal
+        // Remove replica prepare messages for this view, so that we don't create a new block proposal
         // for this same view if we receive another replica prepare message after this.
-        let replica_messages: Vec<_> = self
-            .prepare_message_cache
-            .remove(&message.view)
-            .unwrap()
-            .into_values()
-            .collect();
+        self.prepare_message_cache.remove(&message.view);
 
         debug_assert_eq!(num_messages, self.inner.threshold());
 
@@ -179,10 +181,9 @@ impl StateMachine {
         self.phase = validator::Phase::Commit;
         self.phase_start = ctx.now();
 
-        // Create the justification for our message.
-        let justification =
-            validator::PrepareQC::from(&replica_messages, &self.inner.validator_set)
-                .expect("Couldn't create justification from valid replica messages!");
+        // Consume the incrementally-constructed QC for this view.
+        let justification = self.prepare_qcs.remove(&message.view).unwrap();
+
         self.prepare_qc.send_replace(Some(justification));
         Ok(())
     }
diff --git a/node/actors/bft/src/leader/state_machine.rs b/node/actors/bft/src/leader/state_machine.rs
index 8af0e4b1..53d93836 100644
--- a/node/actors/bft/src/leader/state_machine.rs
+++ b/node/actors/bft/src/leader/state_machine.rs
@@ -7,7 +7,7 @@ use std::{
 use tracing::instrument;
 use zksync_concurrency::{ctx, error::Wrap as _, metrics::LatencyHistogramExt as _, sync, time};
 use zksync_consensus_network::io::{ConsensusInputMessage, Target};
-use zksync_consensus_roles::validator;
+use zksync_consensus_roles::validator::{self, CommitQC, PrepareQC};
 
 /// The StateMachine struct contains the state of the leader. This is a simple state machine. We just store
 /// replica messages and produce leader messages (including proposing blocks) when we reach the threshold for
@@ -28,13 +28,17 @@ pub(crate) struct StateMachine {
         validator::ViewNumber,
         HashMap<validator::PublicKey, validator::Signed<validator::ReplicaPrepare>>,
     >,
+    /// Prepare QCs indexed by view number.
+    pub(crate) prepare_qcs: BTreeMap<validator::ViewNumber, PrepareQC>,
+    /// Newest prepare QC composed from the `ReplicaPrepare` messages.
+    pub(crate) prepare_qc: sync::watch::Sender<Option<PrepareQC>>,
     /// A cache of replica commit messages indexed by view number and validator.
     pub(crate) commit_message_cache: BTreeMap<
         validator::ViewNumber,
         HashMap<validator::PublicKey, validator::Signed<validator::ReplicaCommit>>,
     >,
-    /// Newest quorum certificate composed from the `ReplicaPrepare` messages.
-    pub(crate) prepare_qc: sync::watch::Sender<Option<validator::PrepareQC>>,
+    /// Commit QCs indexed by view number.
+    pub(crate) commit_qcs: BTreeMap<validator::ViewNumber, CommitQC>,
 }
 
 impl StateMachine {
@@ -47,8 +51,10 @@ impl StateMachine {
             phase: validator::Phase::Prepare,
             phase_start: ctx.now(),
             prepare_message_cache: BTreeMap::new(),
+            prepare_qcs: BTreeMap::new(),
             commit_message_cache: BTreeMap::new(),
             prepare_qc: sync::watch::channel(None).0,
+            commit_qcs: BTreeMap::new(),
         }
     }
 
@@ -102,7 +108,7 @@ impl StateMachine {
         ctx: &ctx::Ctx,
         inner: &ConsensusInner,
         payload_source: &dyn PayloadSource,
-        mut prepare_qc: sync::watch::Receiver<Option<validator::PrepareQC>>,
+        mut prepare_qc: sync::watch::Receiver<Option<PrepareQC>>,
     ) -> ctx::Result<()> {
         let mut next_view = validator::ViewNumber(0);
         loop {
@@ -123,7 +129,7 @@ impl StateMachine {
         ctx: &ctx::Ctx,
         inner: &ConsensusInner,
         payload_source: &dyn PayloadSource,
-        justification: validator::PrepareQC,
+        justification: PrepareQC,
     ) -> ctx::Result<()> {
         // Get the highest block voted for and check if there's a quorum of votes for it. To have a quorum
         // in this situation, we require 2*f+1 votes, where f is the maximum number of faulty replicas.
@@ -139,7 +145,7 @@ impl StateMachine {
             .cloned();
 
         // Get the highest CommitQC.
-        let highest_qc: &validator::CommitQC = justification
+        let highest_qc: &CommitQC = justification
             .map
             .keys()
             .map(|s| &s.high_qc)
diff --git a/node/libs/crypto/src/bn254/mod.rs b/node/libs/crypto/src/bn254/mod.rs
index 622553c7..66592b67 100644
--- a/node/libs/crypto/src/bn254/mod.rs
+++ b/node/libs/crypto/src/bn254/mod.rs
@@ -170,15 +170,16 @@ impl Ord for Signature {
 #[derive(Clone, Debug, PartialEq, Eq)]
 pub struct AggregateSignature(G1);
 
-impl AggregateSignature {
-    /// Generates an aggregate signature from a list of signatures.
-    pub fn aggregate<'a>(sigs: impl IntoIterator<Item = &'a Signature>) -> Self {
-        let mut agg = G1Affine::zero().into_projective();
-        for sig in sigs {
-            agg.add_assign(&sig.0)
-        }
+impl Default for AggregateSignature {
+    fn default() -> Self {
+        Self(G1Affine::zero().into_projective())
+    }
+}
 
-        AggregateSignature(agg)
+impl AggregateSignature {
+    /// Add a signature to the aggregation.
+    pub fn add(&mut self, sig: &Signature) {
+        self.0.add_assign(&sig.0)
     }
 
     /// This function is intentionally non-generic and disallow inlining to ensure that compilation optimizations can be effectively applied.
diff --git a/node/libs/crypto/src/bn254/testonly.rs b/node/libs/crypto/src/bn254/testonly.rs
index dd7bb4a1..8ee10d18 100644
--- a/node/libs/crypto/src/bn254/testonly.rs
+++ b/node/libs/crypto/src/bn254/testonly.rs
@@ -48,3 +48,14 @@ impl Distribution<AggregateSignature> for Standard {
         AggregateSignature(p)
     }
 }
+
+impl AggregateSignature {
+    /// Generate a new aggregate signature from a list of signatures.
+    pub fn aggregate<'a>(sigs: impl IntoIterator<Item = &'a Signature>) -> Self {
+        let mut agg = Self::default();
+        for sig in sigs {
+            agg.add(sig);
+        }
+        agg
+    }
+}
diff --git a/node/libs/roles/src/validator/keys/aggregate_signature.rs b/node/libs/roles/src/validator/keys/aggregate_signature.rs
index b41db29e..ebf2903a 100644
--- a/node/libs/roles/src/validator/keys/aggregate_signature.rs
+++ b/node/libs/roles/src/validator/keys/aggregate_signature.rs
@@ -5,15 +5,13 @@ use zksync_consensus_crypto::{bn254, ByteFmt, Text, TextFmt};
 use zksync_consensus_utils::enum_util::Variant;
 
 /// An aggregate signature from a validator.
-#[derive(Clone, PartialEq, Eq, PartialOrd, Ord)]
+#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Default)]
 pub struct AggregateSignature(pub(crate) bn254::AggregateSignature);
 
 impl AggregateSignature {
-    /// Generate a new aggregate signature from a list of signatures.
-    pub fn aggregate<'a>(sigs: impl IntoIterator<Item = &'a Signature>) -> Self {
-        Self(bn254::AggregateSignature::aggregate(
-            sigs.into_iter().map(|sig| &sig.0).collect::<Vec<_>>(),
-        ))
+    /// Add a signature to the aggregation.
+    pub fn add(&mut self, sig: &Signature) {
+        self.0.add(&sig.0)
     }
 
     /// Verify a list of messages against a list of public keys.
diff --git a/node/libs/roles/src/validator/messages/consensus.rs b/node/libs/roles/src/validator/messages/consensus.rs
index 3dffa79a..8f6004ee 100644
--- a/node/libs/roles/src/validator/messages/consensus.rs
+++ b/node/libs/roles/src/validator/messages/consensus.rs
@@ -1,10 +1,10 @@
 //! Messages related to the consensus protocol.
 use super::{BlockHeader, Msg, Payload, Signed};
-use crate::validator;
-use anyhow::{bail, Context};
+use crate::{validator, validator::Signature};
+use anyhow::bail;
 use bit_vec::BitVec;
-use std::collections::{BTreeMap, BTreeSet, HashMap};
+use std::collections::{BTreeMap, BTreeSet};
 use zksync_consensus_utils::enum_util::{BadVariantError, Variant};
 
 /// Version of the consensus algorithm that the validator is using.
@@ -172,7 +172,7 @@ pub struct LeaderCommit {
 /// A quorum certificate of replica Prepare messages. Since not all Prepare messages are
 /// identical (they have different high blocks and high QCs), we need to keep the high blocks
 /// and high QCs in a map. We can still aggregate the signatures though.
-#[derive(Clone, Debug, PartialEq, Eq)]
+#[derive(Clone, Debug, PartialEq, Eq, Default)]
 pub struct PrepareQC {
     /// Map from replica Prepare messages to the validators that signed them.
     pub map: BTreeMap<ReplicaPrepare, Signers>,
@@ -190,46 +190,23 @@ impl PrepareQC {
             .unwrap_or(ViewNumber(0))
     }
 
-    /// Creates a new PrepareQC from a list of *signed* replica Prepare messages and the current validator set.
-    pub fn from(
-        signed_messages: &[Signed<ReplicaPrepare>],
-        validators: &ValidatorSet,
-    ) -> anyhow::Result<Self> {
-        // Get the view number from the messages, they must all be equal.
-        let view = signed_messages
-            .get(0)
-            .context("Empty signed messages vector")?
-            .msg
-            .view;
-
-        // Create the messages map.
-        let mut map: BTreeMap<ReplicaPrepare, Signers> = BTreeMap::new();
-
-        for signed_message in signed_messages {
-            if signed_message.msg.view != view {
-                bail!("Signed messages aren't all for the same view.");
-            }
-
-            // Get index of the validator in the validator set.
-            let index = validators
-                .index(&signed_message.key)
-                .context("Message signer isn't in the validator set")?;
-
-            if map.contains_key(&signed_message.msg) {
-                map.get_mut(&signed_message.msg).unwrap().0.set(index, true);
-            } else {
-                let mut bit_vec = BitVec::from_elem(validators.len(), false);
-                bit_vec.set(index, true);
-
-                map.insert(signed_message.msg.clone(), Signers(bit_vec));
-            }
-        }
-
-        // Aggregate the signatures.
-        let signature =
-            validator::AggregateSignature::aggregate(signed_messages.iter().map(|v| &v.sig));
+    /// Add a validator's signed message.
+    /// * `signed_message` - A valid signed `ReplicaPrepare` message.
+    /// * `validator_index` - The signer index in the validator set.
+    /// * `validator_set` - The validator set.
+    pub fn add(
+        &mut self,
+        signed_message: &Signed<ReplicaPrepare>,
+        validator_index: usize,
+        validator_set: &ValidatorSet,
+    ) {
+        self.map
+            .entry(signed_message.msg.clone())
+            .or_insert_with(|| Signers(BitVec::from_elem(validator_set.len(), false)))
+            .0
+            .set(validator_index, true);
 
-        Ok(Self { map, signature })
+        self.signature.add(&signed_message.sig);
     }
 
     /// Verifies the integrity of the PrepareQC.
@@ -308,46 +285,21 @@ pub struct CommitQC {
 }
 
 impl CommitQC {
-    /// Creates a new CommitQC from a list of *signed* replica Commit messages and the current validator set.
-    pub fn from(
-        signed_messages: &[Signed<ReplicaCommit>],
-        validators: &ValidatorSet,
-    ) -> anyhow::Result<Self> {
-        // Store the signed messages in a Hashmap.
-        let message = signed_messages[0].msg;
-
-        for signed_message in signed_messages {
-            // Check that the votes are all for the same message.
-            if signed_message.msg != message {
-                bail!("CommitQC can only be created from votes for the same message.");
-            }
+    /// Create a new empty instance for a given `ReplicaCommit` message and a validator set size.
+    pub fn new(message: ReplicaCommit, validator_set: &ValidatorSet) -> Self {
+        Self {
+            message,
+            signers: Signers(BitVec::from_elem(validator_set.len(), false)),
+            signature: validator::AggregateSignature::default(),
         }
+    }
 
-        // Store the signed messages in a Hashmap.
-        let msg_map: HashMap<_, _> = signed_messages
-            .iter()
-            .map(|signed_message| {
-                // Check that the votes are all for the same message.
-                if signed_message.msg != message {
-                    bail!("QuorumCertificate can only be created from votes for the same message.");
-                }
-                Ok((&signed_message.key, &signed_message.sig))
-            })
-            .collect::<anyhow::Result<_>>()?;
-
-        // Create the signers bit map.
-        let bit_vec = validators
-            .iter()
-            .map(|validator| msg_map.contains_key(validator))
-            .collect();
-
-        // Aggregate the signatures.
-        let signature = validator::AggregateSignature::aggregate(msg_map.values().copied());
-        Ok(Self {
-            message,
-            signers: Signers(bit_vec),
-            signature,
-        })
+    /// Add a validator's signature.
+    /// * `sig` - A valid signature.
+    /// * `validator_index` - The signer index in the validator set.
+    pub fn add(&mut self, sig: &Signature, validator_index: usize) {
+        self.signers.0.set(validator_index, true);
+        self.signature.add(sig);
     }
 
     /// Verifies the signature of the CommitQC.
diff --git a/node/libs/roles/src/validator/testonly.rs b/node/libs/roles/src/validator/testonly.rs
index dfe1e0f4..f8effd9c 100644
--- a/node/libs/roles/src/validator/testonly.rs
+++ b/node/libs/roles/src/validator/testonly.rs
@@ -5,6 +5,7 @@ use super::{
     PrepareQC, ProtocolVersion, PublicKey, ReplicaCommit, ReplicaPrepare, SecretKey, Signature,
     Signed, Signers, ValidatorSet, ViewNumber,
 };
+use anyhow::{bail, Context};
 use bit_vec::BitVec;
 use rand::{
     distributions::{Distribution, Standard},
@@ -62,6 +63,80 @@ pub fn make_block(
     }
 }
 
+impl AggregateSignature {
+    /// Generate a new aggregate signature from a list of signatures.
+    pub fn aggregate<'a>(sigs: impl IntoIterator<Item = &'a Signature>) -> Self {
+        let mut agg = Self::default();
+        for sig in sigs {
+            agg.add(sig);
+        }
+        agg
+    }
+}
+
+impl PrepareQC {
+    /// Creates a new PrepareQC from a list of *signed* replica Prepare messages and the current validator set.
+    pub fn from(
+        signed_messages: &[Signed<ReplicaPrepare>],
+        validators: &ValidatorSet,
+    ) -> anyhow::Result<Self> {
+        // Get the view number from the messages, they must all be equal.
+        let view = signed_messages
+            .get(0)
+            .context("Empty signed messages vector")?
+            .msg
+            .view;
+
+        // Create the messages map.
+        let mut prepare_qc = PrepareQC::default();
+
+        for signed_message in signed_messages {
+            if signed_message.msg.view != view {
+                bail!("Signed messages aren't all for the same view.");
+            }
+
+            // Get index of the validator in the validator set.
+            let index = validators
+                .index(&signed_message.key)
+                .context("Message signer isn't in the validator set")?;
+
+            prepare_qc.add(signed_message, index, validators);
+        }
+
+        Ok(prepare_qc)
+    }
+}
+
+impl CommitQC {
+    /// Creates a new CommitQC from a list of *signed* replica Commit messages and the current validator set.
+    /// * `signed_messages` - A list of valid `ReplicaCommit` signed messages. Must contain at least one item.
+    /// * `validators` - The validator set.
+    pub fn from(
+        signed_messages: &[Signed<ReplicaCommit>],
+        validators: &ValidatorSet,
+    ) -> anyhow::Result<Self> {
+        // Store the signed messages in a Hashmap.
+        let message = signed_messages[0].msg;
+        let mut commit_qc = CommitQC::new(message, validators);
+
+        for signed_message in signed_messages {
+            // Check that the votes are all for the same message.
+            if signed_message.msg != message {
+                bail!("CommitQC can only be created from votes for the same message.");
+            }
+
+            // Get index of the validator in the validator set.
+            let validator_index = validators
+                .index(&signed_message.key)
+                .context("Message signer isn't in the validator set")?;
+
+            commit_qc.add(&signed_message.sig, validator_index);
+        }
+
+        Ok(commit_qc)
+    }
+}
+
 impl Distribution<AggregateSignature> for Standard {
     fn sample<R: Rng + ?Sized>(&self, rng: &mut R) -> AggregateSignature {
         AggregateSignature(rng.gen())
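Note on the pattern this diff introduces: instead of buffering `Signed<...>` votes and re-scanning them to build a QC once the threshold is reached, each verified vote is now folded into a per-view QC the moment it arrives — set one bit in the signer bitmap, add one point to the aggregate signature — and finalization is just a `remove()` on the per-view map. The sketch below is a minimal, self-contained model of that incremental-aggregation pattern; `AggSig`, `Qc`, and the `u64` "signatures" are illustrative stand-ins (the real code adds bn254 curve points via `AggregateSignature::add` and tracks signers with a `BitVec`), not the crate's API.

```rust
/// Stand-in for `bn254::AggregateSignature`: starts at the identity element
/// and absorbs one signature at a time, mirroring the new `add` method.
/// Real signatures are curve points; a wrapping u64 sum models the group op.
#[derive(Debug, Default)]
struct AggSig(u64);

impl AggSig {
    fn add(&mut self, sig: u64) {
        self.0 = self.0.wrapping_add(sig);
    }
}

/// Stand-in for `CommitQC`: a signer bitmap sized to the validator set plus
/// the running aggregate. `new` makes an empty QC; `add` registers one vote.
#[derive(Debug)]
struct Qc {
    signers: Vec<bool>, // the real code uses bit_vec::BitVec
    signature: AggSig,
}

impl Qc {
    fn new(validator_count: usize) -> Self {
        Self {
            signers: vec![false; validator_count],
            signature: AggSig::default(),
        }
    }

    fn add(&mut self, sig: u64, validator_index: usize) {
        self.signers[validator_index] = true; // O(1) work per vote...
        self.signature.add(sig); // ...instead of re-aggregating at threshold
    }

    fn num_signers(&self) -> usize {
        self.signers.iter().filter(|signed| **signed).count()
    }
}

fn main() {
    let threshold = 3; // e.g. 2f+1 with f = 1 faulty replica
    let mut qc = Qc::new(4);

    // Votes arrive one at a time, as in process_replica_commit above.
    for (validator_index, sig) in [(0, 11), (2, 22), (3, 33)] {
        qc.add(sig, validator_index);
        if qc.num_signers() >= threshold {
            println!("quorum reached: {qc:?}");
            break;
        }
    }
}
```

Consuming the per-view entry on use (as `self.commit_qcs.remove(&message.view)` does in the diff) both finalizes the QC and guards against emitting a second leader message for the same view, which is why the message caches are now simply dropped rather than drained into a rebuild step.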