Weighted validators (BFT-408, BFT-424) #77

Merged: 45 commits, Apr 17, 2024

Commits
08dbad0
Added weights to ValidatorSet
ElFantasma Mar 11, 2024
67a1e50
Added WeightedValidator to represent weighted validators in a Validat…
ElFantasma Mar 13, 2024
664c4cb
Merge branch 'main' into weighted_validators
ElFantasma Mar 13, 2024
b48ec12
Signers weight calculation
ElFantasma Mar 14, 2024
0707c46
Completed test and renamed ValidatorSet to ValidatorCommittee
ElFantasma Mar 15, 2024
416517a
Simplified weights representation
ElFantasma Mar 15, 2024
e6425e4
Different weights per validator in test
ElFantasma Mar 18, 2024
d9c3826
Merge branch 'main' into weighted_validators
ElFantasma Mar 19, 2024
7689c4f
Fixed outdated names
ElFantasma Mar 19, 2024
f6a3b52
Merge branch 'main' into weighted_validators
ElFantasma Mar 19, 2024
283ae7c
Merge branch 'main' into weighted_validators
ElFantasma Mar 20, 2024
4c8e981
Updated threshold logic to use weights instead of amount of signers
ElFantasma Mar 21, 2024
5ff923d
Fixed asserts and tests to use weights instead of amount of signers
ElFantasma Mar 21, 2024
532a9aa
Fixing check for weight before adding new signature in replica commit
ElFantasma Mar 22, 2024
8f52b61
Merge branch 'main' into weighted_validators
ElFantasma Mar 22, 2024
955d778
removed unnecessary borrow
ElFantasma Mar 22, 2024
e20cdbd
fix typo
ElFantasma Mar 22, 2024
8652d0e
minor style fixes
ElFantasma Mar 22, 2024
77a3483
Improved weight checks in replica_commit
ElFantasma Mar 25, 2024
152283e
Changed weight type to u64 and several clippy corrections
ElFantasma Mar 25, 2024
94e4ef4
Removed unnecessary constants
ElFantasma Mar 25, 2024
fc5a921
Simplified ValidatorCommittee internal structure and methods
ElFantasma Mar 26, 2024
49efb35
Corrected thresholds and removed unnecessary weight calculations
ElFantasma Mar 27, 2024
6a650e7
Checking weights sum for overflow
ElFantasma Mar 27, 2024
a4dbbf6
Merge branch 'main' into weighted_validators
ElFantasma Mar 27, 2024
9913ed0
Fixed merge error
ElFantasma Mar 27, 2024
bf36b3c
Fixed 'Bft - 424 fix aggregation of ReplicaCommit messages'
ElFantasma Apr 5, 2024
785b819
Merge branch 'main' into weighted_validators
ElFantasma Apr 5, 2024
1212ef8
Merge branch 'main' into weighted_validators
ElFantasma Apr 9, 2024
89d9d4c
Fixed simpler suggestions in PR
ElFantasma Apr 9, 2024
5b3de38
Fixed weight calculation bug in process_replica_prepare and added a t…
ElFantasma Apr 11, 2024
f3d18aa
Fixed typo
ElFantasma Apr 11, 2024
b15857c
Making it backwards compatible
ElFantasma Apr 11, 2024
e1c94ab
Merge branch 'main' into weighted_validators
ElFantasma Apr 11, 2024
89316f1
Making it backwards compatible and modifying tests to check it
ElFantasma Apr 11, 2024
dccfaad
some more corrections from PR review
ElFantasma Apr 11, 2024
c074717
Fixing protocol buffer backward compatibility
ElFantasma Apr 12, 2024
2f58168
Several PR review corrections
ElFantasma Apr 12, 2024
1764d13
Removed redundant BTreeMap
ElFantasma Apr 12, 2024
0df72f9
Removed commit_message_cache as it is no longer needed
ElFantasma Apr 12, 2024
5afab58
Addressed several suggestions in PR review
ElFantasma Apr 15, 2024
d6f2a8a
Improved some tests and added zero weight validator test
ElFantasma Apr 15, 2024
5e13fb3
Proper duplicate signature check and some suggested code improvements
ElFantasma Apr 16, 2024
608328c
Rewrote minimal validator amount assertion in UTHarness::new_many()
ElFantasma Apr 16, 2024
aa8e0e2
Merge branch 'main' into weighted_validators
ElFantasma Apr 16, 2024
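
The commits above replace signer counting with weight accumulation: each validator carries a u64 weight, the committee's total weight is computed with an overflow check, and the quorum bound is expressed in weight rather than in number of signers. The sketch below illustrates that idea only; the field layout, the `String` stand-in for `validator::PublicKey`, and the 2f+1 threshold formula are assumptions made here for explanation, not the PR's actual `ValidatorCommittee`.

```rust
// Illustrative sketch: the real ValidatorCommittee lives in zksync_consensus_roles.
use std::collections::BTreeSet;

#[derive(Clone, Debug)]
pub struct WeightedValidator {
    pub key: String, // stand-in for validator::PublicKey
    pub weight: u64,
}

pub struct ValidatorCommittee {
    validators: Vec<WeightedValidator>,
    total_weight: u64,
}

impl ValidatorCommittee {
    /// Builds the committee, rejecting duplicate keys and an overflowing weight sum.
    pub fn new(validators: Vec<WeightedValidator>) -> anyhow::Result<Self> {
        let mut seen = BTreeSet::new();
        let mut total_weight: u64 = 0;
        for v in &validators {
            anyhow::ensure!(seen.insert(v.key.clone()), "duplicate validator key");
            total_weight = total_weight
                .checked_add(v.weight)
                .ok_or_else(|| anyhow::anyhow!("sum of validator weights overflows u64"))?;
        }
        Ok(Self { validators, total_weight })
    }

    /// Sum of weights of the validators marked in `signers` (one flag per committee slot).
    pub fn weight(&self, signers: &[bool]) -> u64 {
        self.validators
            .iter()
            .zip(signers)
            .filter(|(_, signed)| **signed)
            .map(|(v, _)| v.weight)
            .sum()
    }

    /// Quorum threshold expressed in weight; assumed here to be the usual BFT 2f+1 bound.
    pub fn threshold(&self) -> u64 {
        self.total_weight - self.total_weight.saturating_sub(1) / 3
    }
}
```
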
76 changes: 31 additions & 45 deletions node/actors/bft/src/leader/replica_commit.rs
@@ -1,7 +1,7 @@
//! Handler of a ReplicaCommit message.

use super::StateMachine;
use crate::metrics;
use std::collections::HashMap;
use tracing::instrument;
use zksync_concurrency::{ctx, metrics::LatencyHistogramExt as _};
use zksync_consensus_network::io::{ConsensusInputMessage, Target};
@@ -39,10 +39,10 @@ pub(crate) enum Error {
#[error("invalid message: {0:#}")]
InvalidMessage(#[source] anyhow::Error),
/// Duplicate message from a replica.
#[error("duplicate message from a replica (existing message: {existing_message:?}")]
DuplicateMessage {
/// Existing message from the same replica.
existing_message: validator::ReplicaCommit,
#[error("Replica signed more than one message for same view (message: {message:?}")]
DuplicateSignature {
/// Offending message.
message: validator::ReplicaCommit,
},
/// Invalid message signature.
#[error("invalid signature: {0:#}")]
@@ -70,7 +70,7 @@ impl StateMachine {
});
}

// Check that the message signer is in the validator set.
// Check that the message signer is in the validator committee.
if !self.config.genesis().validators.contains(author) {
return Err(Error::NonValidatorSigner {
signer: author.clone(),
@@ -96,16 +96,23 @@ impl StateMachine {
return Err(Error::NotLeaderInView);
}

// Get current incrementally-constructed QC to work on it
let commit_qc = self
.commit_qcs
.entry(message.view.number)
.or_default()
.entry(message.clone())
.or_insert_with(|| CommitQC::new(message.clone(), self.config.genesis()));

// If we already have a message from the same validator and for the same view, we discard it.
if let Some(existing_message) = self
.commit_message_cache
.get(&message.view.number)
.and_then(|x| x.get(author))
{
return Err(Error::DuplicateMessage {
existing_message: existing_message.msg.clone(),
let validator_view = self.validator_views.get(author);
if validator_view.is_some_and(|view_number| *view_number >= message.view.number) {
return Err(Error::DuplicateSignature {
message: commit_qc.message.clone(),
});
}
self.validator_views
.insert(author.clone(), message.view.number);

// ----------- Checking the signed part of the message --------------

@@ -118,37 +125,16 @@

// ----------- All checks finished. Now we process the message. --------------

// TODO: we have a bug here since we don't check whether replicas commit
// to the same proposal.

// We add the message to the incrementally-constructed QC.
self.commit_qcs
.entry(message.view.number)
.or_insert_with(|| CommitQC::new(message.clone(), self.config.genesis()))
.add(&signed_message, self.config.genesis());
// Add the message to the QC.
commit_qc.add(&signed_message, self.config.genesis());

// We store the message in our cache.
let cache_entry = self
.commit_message_cache
.entry(message.view.number)
.or_default();
cache_entry.insert(author.clone(), signed_message.clone());

// Now we check if we have enough messages to continue.
let mut by_proposal: HashMap<_, Vec<_>> = HashMap::new();
for msg in cache_entry.values() {
by_proposal.entry(msg.msg.proposal).or_default().push(msg);
}
let threshold = self.config.genesis().validators.threshold();
let Some((_, replica_messages)) =
by_proposal.into_iter().find(|(_, v)| v.len() >= threshold)
else {
// Now we check if we have enough weight to continue.
let weight = self.config.genesis().validators.weight(&commit_qc.signers);
if weight < self.config.genesis().validators.threshold() {
return Ok(());
};
debug_assert_eq!(replica_messages.len(), threshold);

// ----------- Update the state machine --------------

let now = ctx.now();
metrics::METRICS
.leader_commit_phase_latency
@@ -159,12 +145,13 @@

// ----------- Prepare our message and send it. --------------

// Remove replica commit messages for this view, so that we don't create a new leader commit
// for this same view if we receive another replica commit message after this.
self.commit_message_cache.remove(&message.view.number);

// Consume the incrementally-constructed QC for this view.
let justification = self.commit_qcs.remove(&message.view.number).unwrap();
let justification = self
.commit_qcs
.remove(&message.view.number)
.unwrap()
.remove(message)
.unwrap();

// Broadcast the leader commit message to all replicas (ourselves included).
let output_message = ConsensusInputMessage {
@@ -180,7 +167,6 @@

// Clean the caches.
self.prepare_message_cache.retain(|k, _| k >= &self.view);
self.commit_message_cache.retain(|k, _| k >= &self.view);

Ok(())
}
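
Taken together, the replica_commit.rs changes replace the per-view message cache with commit QCs keyed by (view, message), a per-validator "latest signed view" map for duplicate detection, and a weight-based quorum test. The standalone sketch below mirrors that flow under simplifying assumptions: the type aliases, the `QcDraft` struct, and the `weight_of` closure are stand-ins introduced here, not the PR's actual types.

```rust
use std::collections::BTreeMap;

type View = u64;         // stand-in for validator::ViewNumber
type Validator = String; // stand-in for validator::PublicKey
type Commit = String;    // stand-in for validator::ReplicaCommit

#[derive(Default, Debug)]
struct QcDraft {
    signers: Vec<Validator>,
}

#[derive(Default)]
struct Leader {
    /// Partial commit QCs indexed by view number and then by message.
    commit_qcs: BTreeMap<View, BTreeMap<Commit, QcDraft>>,
    /// Latest view each validator has signed a message for.
    validator_views: BTreeMap<Validator, View>,
}

impl Leader {
    /// Returns the finished QC once the accumulated signer weight reaches `threshold`.
    fn on_replica_commit(
        &mut self,
        author: Validator,
        view: View,
        msg: Commit,
        weight_of: impl Fn(&[Validator]) -> u64,
        threshold: u64,
    ) -> Result<Option<QcDraft>, &'static str> {
        // A validator gets one signature per view: a second message for the same
        // (or an older) view is rejected, even if its content differs.
        if self.validator_views.get(&author).is_some_and(|v| *v >= view) {
            return Err("duplicate signature for this view");
        }
        self.validator_views.insert(author.clone(), view);

        // Accumulate the signature in the QC for this exact (view, message) pair.
        let qc = self
            .commit_qcs
            .entry(view)
            .or_default()
            .entry(msg.clone())
            .or_default();
        qc.signers.push(author);

        // Quorum is now a weight comparison, not a count of signers.
        if weight_of(&qc.signers) < threshold {
            return Ok(None);
        }
        // Threshold reached: consume the QC for this view and hand it back.
        Ok(self.commit_qcs.remove(&view).and_then(|mut m| m.remove(&msg)))
    }
}
```
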
29 changes: 12 additions & 17 deletions node/actors/bft/src/leader/replica_prepare.rs
@@ -2,7 +2,7 @@
use super::StateMachine;
use tracing::instrument;
use zksync_concurrency::{ctx, error::Wrap};
use zksync_consensus_roles::validator::{self, ProtocolVersion};
use zksync_consensus_roles::validator;

/// Errors that can occur when processing a "replica prepare" message.
#[derive(Debug, thiserror::Error)]
@@ -11,9 +11,9 @@ pub(crate) enum Error {
#[error("incompatible protocol version (message version: {message_version:?}, local version: {local_version:?}")]
IncompatibleProtocolVersion {
/// Message version.
message_version: ProtocolVersion,
message_version: validator::ProtocolVersion,
/// Local version.
local_version: ProtocolVersion,
local_version: validator::ProtocolVersion,
},
/// Message signer isn't part of the validator set.
#[error("Message signer isn't part of the validator set (signer: {signer:?})")]
@@ -132,34 +132,29 @@ impl StateMachine {
// ----------- All checks finished. Now we process the message. --------------

// We add the message to the incrementally-constructed QC.
self.prepare_qcs
let prepare_qc = self
.prepare_qcs
.entry(message.view.number)
.or_insert_with(|| validator::PrepareQC::new(message.view.clone()))
.add(&signed_message, self.config.genesis());
.or_insert_with(|| validator::PrepareQC::new(message.view.clone()));
prepare_qc.add(&signed_message, self.config.genesis());

// We store the message in our cache.
self.prepare_message_cache
.entry(message.view.number)
.or_default()
.insert(author.clone(), signed_message);
.insert(author.clone(), signed_message.clone());

// Now we check if we have enough messages to continue.
let num_messages = self
.prepare_message_cache
.get(&message.view.number)
.unwrap()
.len();

if num_messages < self.config.genesis().validators.threshold() {
// Now we check if we have enough weight to continue.
if prepare_qc.weight(&self.config.genesis().validators)
< self.config.genesis().validators.threshold()
{
return Ok(());
}

// Remove replica prepare messages for this view, so that we don't create a new block proposal
// for this same view if we receive another replica prepare message after this.
self.prepare_message_cache.remove(&message.view.number);

debug_assert_eq!(num_messages, self.config.genesis().validators.threshold());

// ----------- Update the state machine --------------

self.view = message.view.number;
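
The replica_prepare.rs change is analogous: instead of counting cached messages, the leader asks the incrementally built PrepareQC for the combined weight of its signers and compares it to the committee threshold. A minimal sketch of such a weight method follows, assuming (as an illustration, not the PR's exact layout) that the QC maps each distinct ReplicaPrepare message to a per-slot signer bitmask.

```rust
use std::collections::BTreeMap;

/// Sketch only: `String` stands in for validator::ReplicaPrepare and the signer
/// bitmask is a plain Vec<bool>, one flag per committee slot.
struct PrepareQc {
    map: BTreeMap<String, Vec<bool>>,
}

impl PrepareQc {
    /// Combined weight of every signer across all messages in this QC. Each
    /// validator signs at most one message, so no weight is double-counted.
    fn weight(&self, committee_weights: &[u64]) -> u64 {
        self.map
            .values()
            .flat_map(|signers| {
                signers
                    .iter()
                    .enumerate()
                    .filter(|(_, signed)| **signed)
                    .map(|(i, _)| committee_weights[i])
            })
            .sum()
    }
}
```
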
14 changes: 6 additions & 8 deletions node/actors/bft/src/leader/state_machine.rs
@@ -36,13 +36,11 @@ pub(crate) struct StateMachine {
pub(crate) prepare_qcs: BTreeMap<validator::ViewNumber, validator::PrepareQC>,
/// Newest prepare QC composed from the `ReplicaPrepare` messages.
pub(crate) prepare_qc: sync::watch::Sender<Option<validator::PrepareQC>>,
/// A cache of replica commit messages indexed by view number and validator.
pub(crate) commit_message_cache: BTreeMap<
validator::ViewNumber,
HashMap<validator::PublicKey, Signed<validator::ReplicaCommit>>,
>,
/// Commit QCs indexed by view number.
pub(crate) commit_qcs: BTreeMap<validator::ViewNumber, validator::CommitQC>,
/// Commit QCs indexed by view number and then by message.
pub(crate) commit_qcs:
BTreeMap<validator::ViewNumber, BTreeMap<validator::ReplicaCommit, validator::CommitQC>>,
/// Latest view a validator has signed a message for.
pub(crate) validator_views: BTreeMap<validator::PublicKey, validator::ViewNumber>,
}

impl StateMachine {
@@ -67,10 +65,10 @@ impl StateMachine {
phase_start: ctx.now(),
prepare_message_cache: BTreeMap::new(),
prepare_qcs: BTreeMap::new(),
commit_message_cache: BTreeMap::new(),
prepare_qc: sync::watch::channel(None).0,
commit_qcs: BTreeMap::new(),
inbound_pipe: recv,
validator_views: BTreeMap::new(),
};

(this, send)
127 changes: 125 additions & 2 deletions node/actors/bft/src/leader/tests.rs
@@ -333,6 +333,68 @@ async fn replica_prepare_high_qc_of_future_view() {
.unwrap();
}

/// Check all ReplicaPrepare are included for weight calculation
/// even on different messages for the same view.
#[tokio::test]
async fn replica_prepare_different_messages() {
zksync_concurrency::testonly::abort_on_panic();
let ctx = &ctx::test_root(&ctx::RealClock);
scope::run!(ctx, |ctx, s| async {
let (mut util, runner) = UTHarness::new_many(ctx).await;
s.spawn_bg(runner.run(ctx));

util.produce_block(ctx).await;

let view = util.replica_view();
let replica_prepare = util.new_replica_prepare();

// Create a different proposal for the same view
let proposal = replica_prepare.clone().high_vote.unwrap().proposal;
let mut different_proposal = proposal;
different_proposal.number = different_proposal.number.next();

// Create a new ReplicaPrepare with the different proposal
let mut other_replica_prepare = replica_prepare.clone();
let mut high_vote = other_replica_prepare.high_vote.clone().unwrap();
high_vote.proposal = different_proposal;
let high_qc = util.new_commit_qc(|msg| {
msg.proposal = different_proposal;
msg.view = view.clone()
});

other_replica_prepare.high_vote = Some(high_vote);
other_replica_prepare.high_qc = Some(high_qc);

let validators = util.keys.len();

// half of the validators sign replica_prepare
for i in 0..validators / 2 {
util.process_replica_prepare(ctx, util.keys[i].sign_msg(replica_prepare.clone()))
.await
.unwrap();
}

let mut replica_commit_result = None;
// The rest of the validators until threshold sign other_replica_prepare
for i in validators / 2..util.genesis().validators.threshold() as usize {
replica_commit_result = util
.process_replica_prepare(ctx, util.keys[i].sign_msg(other_replica_prepare.clone()))
.await
.unwrap();
}

// That should be enough for a proposal to be committed (even with different proposals)
assert_matches!(replica_commit_result, Some(_));

// Check the first proposal has been committed (as it has more votes)
let message = replica_commit_result.unwrap().msg;
assert_eq!(message.proposal, proposal);
Ok(())
})
.await
.unwrap();
}

#[tokio::test]
async fn replica_commit_sanity() {
zksync_concurrency::testonly::abort_on_panic();
@@ -484,15 +546,31 @@ async fn replica_commit_already_exists() {
.await
.unwrap()
.is_none());

// Processing twice same ReplicaCommit for same view gets DuplicateSignature error
let res = util
.process_replica_commit(ctx, util.sign(replica_commit.clone()))
.await;
assert_matches!(
res,
Err(replica_commit::Error::DuplicateMessage { existing_message }) => {
assert_eq!(existing_message, replica_commit)
Err(replica_commit::Error::DuplicateSignature { message }) => {
assert_eq!(message, replica_commit)
}
);

// Processing twice different ReplicaCommit for same view gets DuplicateSignature error too
let mut different_replica_commit = replica_commit.clone();
different_replica_commit.proposal.number = replica_commit.proposal.number.next();
let res = util
.process_replica_commit(ctx, util.sign(different_replica_commit.clone()))
.await;
assert_matches!(
res,
Err(replica_commit::Error::DuplicateSignature { message }) => {
assert_eq!(message, different_replica_commit)
}
);

Ok(())
})
.await
@@ -572,3 +650,48 @@ async fn replica_commit_unexpected_proposal() {
.await
.unwrap();
}

/// Proposal should be the same for every ReplicaCommit
/// Check it doesn't fail if one validator sends a different proposal in
/// the ReplicaCommit
#[tokio::test]
async fn replica_commit_different_proposals() {
zksync_concurrency::testonly::abort_on_panic();
let ctx = &ctx::test_root(&ctx::RealClock);
scope::run!(ctx, |ctx, s| async {
let (mut util, runner) = UTHarness::new_many(ctx).await;
s.spawn_bg(runner.run(ctx));

let replica_commit = util.new_replica_commit(ctx).await;

// Process a modified replica_commit (ie. from a malicious or wrong node)
let mut bad_replica_commit = replica_commit.clone();
bad_replica_commit.proposal.number = replica_commit.proposal.number.next();
util.process_replica_commit(ctx, util.sign(bad_replica_commit))
.await
.unwrap();

// The rest of the validators sign the correct one
let mut replica_commit_result = None;
for i in 1..util.keys.len() {
replica_commit_result = util
.process_replica_commit(ctx, util.keys[i].sign_msg(replica_commit.clone()))
.await
.unwrap();
}

// Check correct proposal has been committed
assert_matches!(
replica_commit_result,
Some(leader_commit) => {
assert_eq!(
leader_commit.msg.justification.message.proposal,
replica_commit.proposal
);
}
);
Ok(())
})
.await
.unwrap();
}