From c04bb5e198f8a48279708306c3959be319d064ef Mon Sep 17 00:00:00 2001 From: mwtian <81660174+mwtian@users.noreply.github.com> Date: Sun, 12 May 2024 15:00:40 -0700 Subject: [PATCH] [Consensus] break score ties in randomized and deterministic order (#17667) ## Description Currently score ties are broken with authority index, which can bias leader selection to nodes with lower authority index. Instead, shuffle the scores based on commit index, then stable sort scores. Use a separate type `DecidedLeader` to represent leader slots that are either committed or skipped. ## Test plan CI --- ## Release notes Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required. For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates. - [ ] Protocol: - [ ] Nodes (Validators and Full nodes): - [ ] Indexer: - [ ] JSON-RPC: - [ ] GraphQL: - [ ] CLI: - [ ] Rust SDK: --- consensus/core/src/authority_node.rs | 2 +- consensus/core/src/commit.rs | 88 +++++++++----- consensus/core/src/commit_observer.rs | 22 ++-- consensus/core/src/core.rs | 14 +-- consensus/core/src/dag_state.rs | 13 +-- consensus/core/src/leader_schedule.rs | 107 ++++++++++++------ consensus/core/src/leader_scoring.rs | 34 ++---- consensus/core/src/leader_scoring_strategy.rs | 4 +- .../src/tests/pipelined_committer_tests.rs | 60 +++++----- .../src/tests/universal_committer_tests.rs | 54 ++++----- consensus/core/src/universal_committer.rs | 70 ++++++------ .../consensus_types/consensus_output_api.rs | 4 +- 12 files changed, 258 insertions(+), 214 deletions(-) diff --git a/consensus/core/src/authority_node.rs b/consensus/core/src/authority_node.rs index 22920ace89980..bf59253dbb8f1 100644 --- a/consensus/core/src/authority_node.rs +++ b/consensus/core/src/authority_node.rs @@ -640,7 +640,7 @@ mod tests { ); } } - assert_eq!(committed_subdag.reputation_scores, vec![]); + assert_eq!(committed_subdag.reputation_scores_desc, vec![]); if expected_transactions.is_empty() { break; } diff --git a/consensus/core/src/commit.rs b/consensus/core/src/commit.rs index ba844e5a0d322..676841fd2438c 100644 --- a/consensus/core/src/commit.rs +++ b/consensus/core/src/commit.rs @@ -298,12 +298,12 @@ pub struct CommittedSubDag { pub commit_index: CommitIndex, /// Optional scores that are provided as part of the consensus output to Sui /// that can then be used by Sui for future submission to consensus. - pub reputation_scores: Vec<(AuthorityIndex, u64)>, + pub reputation_scores_desc: Vec<(AuthorityIndex, u64)>, } impl CommittedSubDag { /// Create new (empty) sub-dag. - pub fn new( + pub(crate) fn new( leader: BlockRef, blocks: Vec, timestamp_ms: BlockTimestampMs, @@ -314,17 +314,17 @@ impl CommittedSubDag { blocks, timestamp_ms, commit_index, - reputation_scores: vec![], + reputation_scores_desc: vec![], } } - pub fn update_scores(&mut self, scores: Vec<(AuthorityIndex, u64)>) { - self.reputation_scores = scores; + pub(crate) fn update_scores(&mut self, reputation_scores_desc: Vec<(AuthorityIndex, u64)>) { + self.reputation_scores_desc = reputation_scores_desc; } /// Sort the blocks of the sub-dag by round number then authority index. Any /// deterministic & stable algorithm works. - pub fn sort(&mut self) { + pub(crate) fn sort(&mut self) { self.blocks.sort_by(|a, b| { a.round() .cmp(&b.round()) @@ -359,7 +359,7 @@ impl fmt::Debug for CommittedSubDag { write!( f, "];{}ms;rs{:?})", - self.timestamp_ms, self.reputation_scores + self.timestamp_ms, self.reputation_scores_desc ) } } @@ -427,9 +427,7 @@ pub(crate) enum Decision { Indirect, } -/// The status of every leader output by the committers. While the core only cares -/// about committed leaders, providing a richer status allows for easier debugging, -/// testing, and composition with advanced commit strategies. +/// The status of a leader slot from the direct and indirect commit rules. #[derive(Debug, Clone, PartialEq)] pub(crate) enum LeaderStatus { Commit(VerifiedBlock), @@ -446,14 +444,6 @@ impl LeaderStatus { } } - pub(crate) fn authority(&self) -> AuthorityIndex { - match self { - Self::Commit(block) => block.author(), - Self::Skip(leader) => leader.authority, - Self::Undecided(leader) => leader.authority, - } - } - pub(crate) fn is_decided(&self) -> bool { match self { Self::Commit(_) => true, @@ -462,31 +452,71 @@ impl LeaderStatus { } } - // Only should be called when the leader status is decided (Commit/Skip) - pub fn get_decided_slot(&self) -> Slot { + pub(crate) fn into_decided_leader(self) -> Option { + match self { + Self::Commit(block) => Some(DecidedLeader::Commit(block)), + Self::Skip(slot) => Some(DecidedLeader::Skip(slot)), + Self::Undecided(..) => None, + } + } +} + +impl Display for LeaderStatus { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Commit(block) => write!(f, "Commit({})", block.reference()), + Self::Skip(slot) => write!(f, "Skip({slot})"), + Self::Undecided(slot) => write!(f, "Undecided({slot})"), + } + } +} + +/// Decision of each leader slot. +#[derive(Debug, Clone, PartialEq)] +pub(crate) enum DecidedLeader { + Commit(VerifiedBlock), + Skip(Slot), +} + +impl DecidedLeader { + // Slot where the leader is decided. + pub(crate) fn slot(&self) -> Slot { match self { Self::Commit(block) => block.reference().into(), - Self::Skip(leader) => *leader, - Self::Undecided(..) => panic!("Decided block is either Commit or Skip"), + Self::Skip(slot) => *slot, } } - // Only should be called when the leader status is decided (Commit/Skip) - pub fn into_committed_block(self) -> Option { + // Converts to committed block if the decision is to commit. Returns None otherwise. + pub(crate) fn into_committed_block(self) -> Option { match self { Self::Commit(block) => Some(block), - Self::Skip(_leader) => None, - Self::Undecided(..) => panic!("Decided block is either Commit or Skip"), + Self::Skip(_) => None, + } + } + + #[cfg(test)] + pub(crate) fn round(&self) -> Round { + match self { + Self::Commit(block) => block.round(), + Self::Skip(leader) => leader.round, + } + } + + #[cfg(test)] + pub(crate) fn authority(&self) -> AuthorityIndex { + match self { + Self::Commit(block) => block.author(), + Self::Skip(leader) => leader.authority, } } } -impl Display for LeaderStatus { +impl Display for DecidedLeader { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Self::Commit(block) => write!(f, "Commit({})", block.reference()), - Self::Skip(leader) => write!(f, "Skip({leader})"), - Self::Undecided(leader) => write!(f, "Undecided({leader})"), + Self::Skip(slot) => write!(f, "Skip({slot})"), } } } diff --git a/consensus/core/src/commit_observer.rs b/consensus/core/src/commit_observer.rs index ea7e630c2cb57..9e093bea968ba 100644 --- a/consensus/core/src/commit_observer.rs +++ b/consensus/core/src/commit_observer.rs @@ -74,17 +74,17 @@ impl CommitObserver { let committed_sub_dags = self.commit_interpreter.handle_commit(committed_leaders); let mut sent_sub_dags = vec![]; - let reputation_scores = self + let reputation_scores_desc = self .leader_schedule .leader_swap_table .read() - .reputation_scores - .authorities_by_score_desc(self.context.clone()); + .reputation_scores_desc + .clone(); for mut committed_sub_dag in committed_sub_dags.into_iter() { // TODO: Only update scores after a leader schedule change // On handle commit the current scores that were used to elect the // leader of the subdag will be added to the subdag and sent to sui. - committed_sub_dag.update_scores(reputation_scores.clone()); + committed_sub_dag.update_scores(reputation_scores_desc.clone()); // Failures in sender.send() are assumed to be permanent if let Err(err) = self.sender.send(committed_sub_dag.clone()) { tracing::error!( @@ -145,8 +145,8 @@ impl CommitObserver { self.leader_schedule .leader_swap_table .read() - .reputation_scores - .authorities_by_score_desc(self.context.clone()), + .reputation_scores_desc + .clone(), ); } self.sender.send(committed_sub_dag).unwrap_or_else(|e| { @@ -290,7 +290,7 @@ mod tests { let mut processed_subdag_index = 0; while let Ok(subdag) = receiver.try_recv() { assert_eq!(subdag, commits[processed_subdag_index]); - assert_eq!(subdag.reputation_scores, vec![]); + assert_eq!(subdag.reputation_scores_desc, vec![]); processed_subdag_index = subdag.commit_index as usize; if processed_subdag_index == leaders.len() { break; @@ -376,7 +376,7 @@ mod tests { while let Ok(subdag) = receiver.try_recv() { tracing::info!("Processed {subdag}"); assert_eq!(subdag, commits[processed_subdag_index]); - assert_eq!(subdag.reputation_scores, vec![]); + assert_eq!(subdag.reputation_scores_desc, vec![]); processed_subdag_index = subdag.commit_index as usize; if processed_subdag_index == expected_last_processed_index { break; @@ -412,7 +412,7 @@ mod tests { while let Ok(subdag) = receiver.try_recv() { tracing::info!("{subdag} was sent but not processed by consumer"); assert_eq!(subdag, commits[processed_subdag_index]); - assert_eq!(subdag.reputation_scores, vec![]); + assert_eq!(subdag.reputation_scores_desc, vec![]); processed_subdag_index = subdag.commit_index as usize; if processed_subdag_index == expected_last_sent_index { break; @@ -448,7 +448,7 @@ mod tests { while let Ok(subdag) = receiver.try_recv() { tracing::info!("Processed {subdag} on resubmission"); assert_eq!(subdag, commits[processed_subdag_index]); - assert_eq!(subdag.reputation_scores, vec![]); + assert_eq!(subdag.reputation_scores_desc, vec![]); processed_subdag_index = subdag.commit_index as usize; if processed_subdag_index == expected_last_sent_index { break; @@ -516,7 +516,7 @@ mod tests { while let Ok(subdag) = receiver.try_recv() { tracing::info!("Processed {subdag}"); assert_eq!(subdag, commits[processed_subdag_index]); - assert_eq!(subdag.reputation_scores, vec![]); + assert_eq!(subdag.reputation_scores_desc, vec![]); processed_subdag_index = subdag.commit_index as usize; if processed_subdag_index == expected_last_processed_index { break; diff --git a/consensus/core/src/core.rs b/consensus/core/src/core.rs index 8aac633430978..1219e43fa6851 100644 --- a/consensus/core/src/core.rs +++ b/consensus/core/src/core.rs @@ -449,9 +449,9 @@ impl Core { .protocol_config .mysticeti_leader_scoring_and_schedule() { - let sequenced_leaders = self.committer.try_commit(self.last_decided_leader); - if let Some(last) = sequenced_leaders.last() { - self.last_decided_leader = last.get_decided_slot(); + let decided_leaders = self.committer.try_decide(self.last_decided_leader); + if let Some(last) = decided_leaders.last() { + self.last_decided_leader = last.slot(); self.context .metrics .node_metrics @@ -459,7 +459,7 @@ impl Core { .set(self.last_decided_leader.round as i64); } - let committed_leaders = sequenced_leaders + let committed_leaders = decided_leaders .into_iter() .filter_map(|leader| leader.into_committed_block()) .collect::>(); @@ -499,7 +499,7 @@ impl Core { // TODO: limit commits by commits_until_update, which may be needed when leader schedule length // is reduced. - let decided_leaders = self.committer.try_commit(self.last_decided_leader); + let decided_leaders = self.committer.try_decide(self.last_decided_leader); let Some(last_decided) = decided_leaders.last().cloned() else { break; @@ -518,7 +518,7 @@ impl Core { self.last_decided_leader = sequenced_leaders.last().unwrap().slot(); sequenced_leaders } else { - self.last_decided_leader = last_decided.get_decided_slot(); + self.last_decided_leader = last_decided.slot(); sequenced_leaders }; @@ -1344,7 +1344,7 @@ mod test { 1 ); let expected_reputation_scores = - ReputationScores::new((11..21).into(), vec![8, 8, 9, 8]); + ReputationScores::new((11..21).into(), vec![9, 8, 8, 8]); assert_eq!( core.leader_schedule .leader_swap_table diff --git a/consensus/core/src/dag_state.rs b/consensus/core/src/dag_state.rs index 6bd0a05c88459..ab35495cf2fb4 100644 --- a/consensus/core/src/dag_state.rs +++ b/consensus/core/src/dag_state.rs @@ -789,17 +789,10 @@ impl DagState { panic!("Fatal error, no quorum has been detected in our DAG on the last two rounds."); } - pub(crate) fn last_reputation_scores_from_store(&self) -> Option { - let commit_info = self - .store + pub(crate) fn recover_last_commit_info(&self) -> Option<(CommitRef, CommitInfo)> { + self.store .read_last_commit_info() - .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e)); - if let Some((commit_ref, commit_info)) = commit_info { - assert!(commit_ref.index <= self.last_commit.as_ref().unwrap().index()); - Some(commit_info.reputation_scores) - } else { - None - } + .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e)) } pub(crate) fn unscored_committed_subdags_count(&self) -> u64 { diff --git a/consensus/core/src/leader_schedule.rs b/consensus/core/src/leader_schedule.rs index 53a857d3b8811..83a78db2e37b0 100644 --- a/consensus/core/src/leader_schedule.rs +++ b/consensus/core/src/leader_schedule.rs @@ -19,7 +19,7 @@ use crate::{ CertificateScoringStrategy, CertifiedVoteScoringStrategyV1, CertifiedVoteScoringStrategyV2, ScoringStrategy, VoteScoringStrategy, }, - Round, + CommitIndex, Round, }; /// The `LeaderSchedule` is responsible for producing the leader schedule across @@ -60,15 +60,13 @@ impl LeaderSchedule { /// Restores the `LeaderSchedule` from storage. It will attempt to retrieve the /// last stored `ReputationScores` and use them to build a `LeaderSwapTable`. pub(crate) fn from_store(context: Arc, dag_state: Arc>) -> Self { - let leader_swap_table = dag_state.read().last_reputation_scores_from_store().map_or( + let leader_swap_table = dag_state.read().recover_last_commit_info().map_or( LeaderSwapTable::default(), - |reputation_scores| { + |(last_commit_ref, last_commit_info)| { LeaderSwapTable::new( context.clone(), - reputation_scores, - context - .protocol_config - .consensus_bad_nodes_stake_threshold(), + last_commit_ref.index, + last_commit_info.reputation_scores, ) }, ); @@ -145,12 +143,11 @@ impl LeaderSchedule { reputation_scores.update_metrics(self.context.clone()); + let last_commit_index = unscored_subdags.last().unwrap().commit_index; self.update_leader_swap_table(LeaderSwapTable::new( self.context.clone(), + last_commit_index, reputation_scores.clone(), - self.context - .protocol_config - .consensus_bad_nodes_stake_threshold(), )); self.context @@ -250,6 +247,10 @@ pub(crate) struct LeaderSwapTable { /// Storing the hostname & stake along side the authority index for debugging. pub(crate) bad_nodes: BTreeMap, + /// Scores by authority in descending order, needed by other parts of the system + /// for a consistent view on how each validator performs in consensus. + pub(crate) reputation_scores_desc: Vec<(AuthorityIndex, u64)>, + // The scores for which the leader swap table was built from. This struct is // used for debugging purposes. Once `good_nodes` & `bad_nodes` are identified // the `reputation_scores` are no longer needed functionally for the swap table. @@ -263,9 +264,27 @@ impl LeaderSwapTable { // The `swap_stake_threshold` should be in the range of [0 - 33]. pub(crate) fn new( context: Arc, + commit_index: CommitIndex, reputation_scores: ReputationScores, - // Needed to prevent linter warning in simtests + ) -> Self { + let swap_stake_threshold = context + .protocol_config + .consensus_bad_nodes_stake_threshold(); + Self::new_inner( + context, + swap_stake_threshold, + commit_index, + reputation_scores, + ) + } + + fn new_inner( + context: Arc, + // Ignore linter warning in simtests. + // TODO: maybe override protocol configs in tests for swap_stake_threshold, and call new(). #[allow(unused_variables)] swap_stake_threshold: u64, + commit_index: CommitIndex, + reputation_scores: ReputationScores, ) -> Self { #[cfg(msim)] let swap_stake_threshold = 33; @@ -275,12 +294,26 @@ impl LeaderSwapTable { "The swap_stake_threshold ({swap_stake_threshold}) should be in range [0 - 33], out of bounds parameter detected" ); + // When reputation scores are disabled or at genesis, use the default value. + if reputation_scores.scores_per_authority.is_empty() { + return Self::default(); + } + + // Randomize order of authorities when they have the same score, + // to avoid bias in the selection of the good and bad nodes. + let mut seed_bytes = [0u8; 32]; + seed_bytes[28..32].copy_from_slice(&commit_index.to_le_bytes()); + let mut rng = StdRng::from_seed(seed_bytes); + let mut authorities_by_score = reputation_scores.authorities_by_score(context.clone()); + assert_eq!(authorities_by_score.len(), context.committee.size()); + authorities_by_score.shuffle(&mut rng); + // Stable sort the authorities by score descending. Order of authorities with the same score is preserved. + authorities_by_score.sort_by(|a1, a2| a2.1.cmp(&a1.1)); + // Calculating the good nodes let good_nodes = Self::retrieve_first_nodes( context.clone(), - reputation_scores - .authorities_by_score_desc(context.clone()) - .into_iter(), + authorities_by_score.iter(), swap_stake_threshold, ) .into_iter() @@ -291,10 +324,7 @@ impl LeaderSwapTable { // low scorers up to the provided stake threshold. let bad_nodes = Self::retrieve_first_nodes( context.clone(), - reputation_scores - .authorities_by_score_desc(context.clone()) - .into_iter() - .rev(), + authorities_by_score.iter().rev(), swap_stake_threshold, ) .into_iter() @@ -322,6 +352,7 @@ impl LeaderSwapTable { Self { good_nodes, bad_nodes, + reputation_scores_desc: authorities_by_score, reputation_scores, } } @@ -375,15 +406,15 @@ impl LeaderSwapTable { /// [0, 100] and expresses the percentage of stake that is considered the cutoff. /// It's the caller's responsibility to ensure that the elements of the `authorities` /// input is already sorted. - fn retrieve_first_nodes( + fn retrieve_first_nodes<'a>( context: Arc, - authorities: impl Iterator, + authorities: impl Iterator, stake_threshold: u64, ) -> Vec<(AuthorityIndex, String, Stake)> { let mut filtered_authorities = Vec::new(); let mut stake = 0; - for (authority_idx, _score) in authorities { + for &(authority_idx, _score) in authorities { stake += context.committee.stake(authority_idx); // If the total accumulated stake has surpassed the stake threshold @@ -718,9 +749,13 @@ mod tests { AuthorityIndex::new_for_test(0) ); assert_eq!(leader_swap_table.bad_nodes.len(), 1); - assert!(leader_swap_table - .bad_nodes - .contains_key(&AuthorityIndex::new_for_test(1))); + assert!( + leader_swap_table + .bad_nodes + .contains_key(&AuthorityIndex::new_for_test(2)), + "{:?}", + leader_swap_table.bad_nodes + ); } #[tokio::test] @@ -853,7 +888,7 @@ mod tests { assert_eq!(leader_swap_table.good_nodes.len(), 1); assert_eq!( leader_swap_table.good_nodes[0].0, - AuthorityIndex::new_for_test(3) + AuthorityIndex::new_for_test(2) ); assert_eq!(leader_swap_table.bad_nodes.len(), 1); assert!(leader_swap_table @@ -861,7 +896,7 @@ mod tests { .contains_key(&AuthorityIndex::new_for_test(0))); assert_eq!( leader_schedule.elect_leader(4, 0), - AuthorityIndex::new_for_test(3) + AuthorityIndex::new_for_test(2) ); } @@ -874,7 +909,7 @@ mod tests { let reputation_scores = ReputationScores::new((0..11).into(), (0..4).map(|i| i as u64).collect::>()); let leader_swap_table = - LeaderSwapTable::new(context, reputation_scores, swap_stake_threshold); + LeaderSwapTable::new_inner(context, swap_stake_threshold, 0, reputation_scores); assert_eq!(leader_swap_table.good_nodes.len(), 1); assert_eq!( @@ -896,7 +931,7 @@ mod tests { let reputation_scores = ReputationScores::new((0..11).into(), (0..4).map(|i| i as u64).collect::>()); let leader_swap_table = - LeaderSwapTable::new(context.clone(), reputation_scores, swap_stake_threshold); + LeaderSwapTable::new_inner(context.clone(), swap_stake_threshold, 0, reputation_scores); // Test swapping a bad leader let leader = AuthorityIndex::new_for_test(0); @@ -918,7 +953,7 @@ mod tests { telemetry_subscribers::init_for_testing(); let context = Arc::new(Context::new_for_test(4).0); - let authorities = vec![ + let authorities = [ (AuthorityIndex::new_for_test(0), 1), (AuthorityIndex::new_for_test(1), 2), (AuthorityIndex::new_for_test(2), 3), @@ -928,7 +963,7 @@ mod tests { let stake_threshold = 50; let filtered_authorities = LeaderSwapTable::retrieve_first_nodes( context.clone(), - authorities.into_iter(), + authorities.iter(), stake_threshold, ); @@ -962,7 +997,7 @@ mod tests { let swap_stake_threshold = 34; let reputation_scores = ReputationScores::new((0..11).into(), (0..4).map(|i| i as u64).collect::>()); - LeaderSwapTable::new(context, reputation_scores, swap_stake_threshold); + LeaderSwapTable::new_inner(context, swap_stake_threshold, 0, reputation_scores); } #[tokio::test] @@ -974,7 +1009,7 @@ mod tests { let reputation_scores = ReputationScores::new((1..11).into(), (0..4).map(|i| i as u64).collect::>()); let leader_swap_table = - LeaderSwapTable::new(context.clone(), reputation_scores, swap_stake_threshold); + LeaderSwapTable::new_inner(context.clone(), swap_stake_threshold, 0, reputation_scores); let leader_schedule = LeaderSchedule::new(context.clone(), LeaderSwapTable::default()); @@ -986,7 +1021,7 @@ mod tests { (0..4).map(|i| i as u64).collect::>(), ); let leader_swap_table = - LeaderSwapTable::new(context.clone(), reputation_scores, swap_stake_threshold); + LeaderSwapTable::new_inner(context.clone(), swap_stake_threshold, 0, reputation_scores); // Update leader from old swap table to new valid swap table leader_schedule.update_leader_swap_table(leader_swap_table.clone()); @@ -1004,7 +1039,7 @@ mod tests { let reputation_scores = ReputationScores::new((1..11).into(), (0..4).map(|i| i as u64).collect::>()); let leader_swap_table = - LeaderSwapTable::new(context.clone(), reputation_scores, swap_stake_threshold); + LeaderSwapTable::new_inner(context.clone(), swap_stake_threshold, 0, reputation_scores); let leader_schedule = LeaderSchedule::new(context.clone(), LeaderSwapTable::default()); @@ -1016,7 +1051,7 @@ mod tests { (0..4).map(|i| i as u64).collect::>(), ); let leader_swap_table = - LeaderSwapTable::new(context.clone(), reputation_scores, swap_stake_threshold); + LeaderSwapTable::new_inner(context.clone(), swap_stake_threshold, 0, reputation_scores); // Update leader from old swap table to new valid swap table leader_schedule.update_leader_swap_table(leader_swap_table.clone()); @@ -1026,7 +1061,7 @@ mod tests { (0..4).map(|i| i as u64).collect::>(), ); let leader_swap_table = - LeaderSwapTable::new(context.clone(), reputation_scores, swap_stake_threshold); + LeaderSwapTable::new_inner(context.clone(), swap_stake_threshold, 0, reputation_scores); // Update leader from old swap table to new invalid swap table leader_schedule.update_leader_swap_table(leader_swap_table.clone()); diff --git a/consensus/core/src/leader_scoring.rs b/consensus/core/src/leader_scoring.rs index 107dea08e5a48..aba1b7a6b0dea 100644 --- a/consensus/core/src/leader_scoring.rs +++ b/consensus/core/src/leader_scoring.rs @@ -2,7 +2,6 @@ // SPDX-License-Identifier: Apache-2.0 use std::{ - cmp::Ordering, collections::{BTreeMap, HashMap}, fmt::Debug, ops::Bound::{Excluded, Included}, @@ -103,13 +102,9 @@ impl ReputationScores { } } - // Returns the authorities in score descending order. - pub(crate) fn authorities_by_score_desc( - &self, - context: Arc, - ) -> Vec<(AuthorityIndex, u64)> { - let mut authorities: Vec<_> = self - .scores_per_authority + // Returns the authorities index with score tuples. + pub(crate) fn authorities_by_score(&self, context: Arc) -> Vec<(AuthorityIndex, u64)> { + self.scores_per_authority .iter() .enumerate() .map(|(index, score)| { @@ -121,20 +116,7 @@ impl ReputationScores { *score, ) }) - .collect(); - - authorities.sort_by(|a1, a2| { - match a2.1.cmp(&a1.1) { - Ordering::Equal => { - // we resolve the score equality deterministically by ordering in authority - // identifier order descending. - a2.0.cmp(&a1.0) - } - result => result, - } - }); - - authorities + .collect() } pub(crate) fn update_metrics(&self, context: Arc) { @@ -334,17 +316,17 @@ mod tests { use crate::{leader_scoring_strategy::VoteScoringStrategy, test_dag_builder::DagBuilder}; #[tokio::test] - async fn test_reputation_scores_authorities_by_score_desc() { + async fn test_reputation_scores_authorities_by_score() { let context = Arc::new(Context::new_for_test(4).0); let scores = ReputationScores::new((1..300).into(), vec![4, 1, 1, 3]); - let authorities = scores.authorities_by_score_desc(context); + let authorities = scores.authorities_by_score(context); assert_eq!( authorities, vec![ (AuthorityIndex::new_for_test(0), 4), - (AuthorityIndex::new_for_test(3), 3), + (AuthorityIndex::new_for_test(1), 1), (AuthorityIndex::new_for_test(2), 1), - (AuthorityIndex::new_for_test(1), 1) + (AuthorityIndex::new_for_test(3), 3), ] ); } diff --git a/consensus/core/src/leader_scoring_strategy.rs b/consensus/core/src/leader_scoring_strategy.rs index 67137227c497a..55c7d9e4405dc 100644 --- a/consensus/core/src/leader_scoring_strategy.rs +++ b/consensus/core/src/leader_scoring_strategy.rs @@ -177,7 +177,6 @@ impl ScoringStrategy for VoteScoringStrategy { fn calculate_scores_for_leader(&self, subdag: &UnscoredSubdag, leader_slot: Slot) -> Vec { let num_authorities = subdag.context.committee.size(); let mut scores_per_authority = vec![0_u64; num_authorities]; - let voting_round = leader_slot.round + 1; let leader_blocks = subdag.get_blocks_at_slot(leader_slot); @@ -192,8 +191,11 @@ impl ScoringStrategy for VoteScoringStrategy { let leader_block = leader_blocks.first().unwrap(); + let voting_round = leader_slot.round + 1; let voting_blocks = subdag.get_blocks_at_round(voting_round); for potential_vote in voting_blocks { + // TODO: use the decided leader as input instead of leader slot. If the leader was skipped, + // votes to skip should be included in the score as well. if subdag.is_vote(&potential_vote, leader_block) { let authority = potential_vote.author(); tracing::trace!( diff --git a/consensus/core/src/tests/pipelined_committer_tests.rs b/consensus/core/src/tests/pipelined_committer_tests.rs index c684dd2def47c..4fe251329602b 100644 --- a/consensus/core/src/tests/pipelined_committer_tests.rs +++ b/consensus/core/src/tests/pipelined_committer_tests.rs @@ -8,7 +8,7 @@ use parking_lot::RwLock; use crate::{ block::{BlockAPI, Slot, TestBlock, Transaction, VerifiedBlock}, - commit::{LeaderStatus, DEFAULT_WAVE_LENGTH}, + commit::{DecidedLeader, DEFAULT_WAVE_LENGTH}, context::Context, dag_state::DagState, leader_schedule::{LeaderSchedule, LeaderSwapTable}, @@ -27,12 +27,12 @@ async fn direct_commit() { build_dag(context, dag_state, None, decision_round_wave_0_pipeline_1); let last_decided = Slot::new_for_test(0, 0); - let sequence = committer.try_commit(last_decided); + let sequence = committer.try_decide(last_decided); tracing::info!("Commit sequence: {sequence:#?}"); assert_eq!(sequence.len(), 1); let leader_round_wave_0_pipeline_1 = committer.committers[1].leader_round(0); - if let LeaderStatus::Commit(ref block) = sequence[0] { + if let DecidedLeader::Commit(ref block) = sequence[0] { assert_eq!(block.round(), leader_round_wave_0_pipeline_1); assert_eq!( block.author(), @@ -61,11 +61,11 @@ async fn idempotence() { // Commit one leader. let last_decided = Slot::new_for_test(0, 0); - let first_sequence = committer.try_commit(last_decided); + let first_sequence = committer.try_decide(last_decided); assert_eq!(first_sequence.len(), 1); tracing::info!("Commit sequence: {first_sequence:#?}"); - if let LeaderStatus::Commit(ref block) = first_sequence[0] { + if let DecidedLeader::Commit(ref block) = first_sequence[0] { assert_eq!(block.round(), leader_round_pipeline_1_wave_0); assert_eq!( block.author(), @@ -77,10 +77,10 @@ async fn idempotence() { // Ensure that if try_commit is called again with the same last decided leader // input the commit decision will be the same. - let first_sequence = committer.try_commit(last_decided); + let first_sequence = committer.try_decide(last_decided); assert_eq!(first_sequence.len(), 1); - if let LeaderStatus::Commit(ref block) = first_sequence[0] { + if let DecidedLeader::Commit(ref block) = first_sequence[0] { assert_eq!(block.round(), leader_round_pipeline_1_wave_0); assert_eq!( block.author(), @@ -92,7 +92,7 @@ async fn idempotence() { // Ensure we don't commit the same leader again once last decided has been updated. let last_decided = Slot::new(first_sequence[0].round(), first_sequence[0].authority()); - let sequence = committer.try_commit(last_decided); + let sequence = committer.try_decide(last_decided); assert!(sequence.is_empty()); } @@ -121,11 +121,11 @@ async fn multiple_direct_commit() { )); // Because of pipelining we are committing a leader every round. - let sequence = committer.try_commit(last_decided); + let sequence = committer.try_decide(last_decided); tracing::info!("Commit sequence: {sequence:#?}"); assert_eq!(sequence.len(), 1); - if let LeaderStatus::Commit(ref block) = sequence[0] { + if let DecidedLeader::Commit(ref block) = sequence[0] { assert_eq!(block.round(), leader_round); assert_eq!( block.author(), @@ -157,14 +157,14 @@ async fn direct_commit_late_call() { build_dag(context.clone(), dag_state.clone(), None, decision_round); let last_decided = Slot::new_for_test(0, 0); - let sequence = committer.try_commit(last_decided); + let sequence = committer.try_decide(last_decided); tracing::info!("Commit sequence: {sequence:#?}"); assert_eq!(sequence.len(), n); for (i, leader_block) in sequence.iter().enumerate() { // First sequenced leader should be in round 1. let leader_round = i as u32 + 1; - if let LeaderStatus::Commit(ref block) = leader_block { + if let DecidedLeader::Commit(ref block) = leader_block { assert_eq!(block.round(), leader_round); assert_eq!(block.author(), committer.get_leaders(leader_round)[0]); } else { @@ -188,7 +188,7 @@ async fn no_genesis_commit() { ancestors = Some(build_dag(context.clone(), dag_state.clone(), ancestors, r)); let last_decided = Slot::new_for_test(0, 0); - let sequence = committer.try_commit(last_decided); + let sequence = committer.try_decide(last_decided); assert!(sequence.is_empty()); } } @@ -232,11 +232,11 @@ async fn direct_skip_no_leader() { // Ensure no blocks are committed because there are 2f+1 blame (non-votes) for // the missing leader. let last_decided = Slot::new_for_test(0, 0); - let sequence = committer.try_commit(last_decided); + let sequence = committer.try_decide(last_decided); tracing::info!("Commit sequence: {sequence:#?}"); assert_eq!(sequence.len(), 1); - if let LeaderStatus::Skip(leader) = sequence[0] { + if let DecidedLeader::Skip(leader) = sequence[0] { assert_eq!(leader.authority, leader_pipeline_1_wave_0); assert_eq!(leader.round, leader_round_pipeline_1_wave_0); } else { @@ -305,11 +305,11 @@ async fn direct_skip_enough_blame() { // Ensure the leader is skipped because there are 2f+1 blame (non-votes) for // the wave 0 leader of pipeline 1. let last_decided = Slot::new_for_test(0, 0); - let sequence = committer.try_commit(last_decided); + let sequence = committer.try_decide(last_decided); tracing::info!("Commit sequence: {sequence:#?}"); assert_eq!(sequence.len(), 1); - if let LeaderStatus::Skip(leader) = sequence[0] { + if let DecidedLeader::Skip(leader) = sequence[0] { assert_eq!(leader.authority, leader_pipeline_1_wave_0); assert_eq!(leader.round, leader_round_pipeline_1_wave_0); } else { @@ -411,13 +411,13 @@ async fn indirect_commit() { // Ensure we commit the first leaders. let last_decided = Slot::new_for_test(0, 0); - let sequence = committer.try_commit(last_decided); + let sequence = committer.try_decide(last_decided); tracing::info!("Commit sequence: {sequence:#?}"); assert_eq!(sequence.len(), 5); let committed_leader_round = 1; let leader = committer.get_leaders(committed_leader_round)[0]; - if let LeaderStatus::Commit(ref block) = sequence[0] { + if let DecidedLeader::Commit(ref block) = sequence[0] { assert_eq!(block.round(), committed_leader_round); assert_eq!(block.author(), leader); } else { @@ -426,7 +426,7 @@ async fn indirect_commit() { let skipped_leader_round = 2; let leader = committer.get_leaders(skipped_leader_round)[0]; - if let LeaderStatus::Skip(ref slot) = sequence[1] { + if let DecidedLeader::Skip(ref slot) = sequence[1] { assert_eq!(slot.round, skipped_leader_round); assert_eq!(slot.authority, leader); } else { @@ -491,7 +491,7 @@ async fn indirect_skip() { // Ensure we commit the first 3 leaders, skip the 4th, and commit the last 2 leaders. let last_decided = Slot::new_for_test(0, 0); - let sequence = committer.try_commit(last_decided); + let sequence = committer.try_decide(last_decided); tracing::info!("Commit sequence: {sequence:#?}"); assert_eq!(sequence.len(), 7); @@ -500,7 +500,7 @@ async fn indirect_skip() { // First sequenced leader should be in round 1. let leader_round = i + 1; let leader = committer.get_leaders(leader_round)[0]; - if let LeaderStatus::Commit(ref block) = sequence[i as usize] { + if let DecidedLeader::Commit(ref block) = sequence[i as usize] { assert_eq!(block.author(), leader); } else { panic!("Expected a committed leader") @@ -508,7 +508,7 @@ async fn indirect_skip() { } // Ensure we skip the leader of wave 1 (pipeline one) but commit the others. - if let LeaderStatus::Skip(leader) = sequence[3] { + if let DecidedLeader::Skip(leader) = sequence[3] { assert_eq!(leader.authority, committer.get_leaders(leader_round_4)[0]); assert_eq!(leader.round, leader_round_4); } else { @@ -519,7 +519,7 @@ async fn indirect_skip() { for i in 4..=6 { let leader_round = i + 1; let leader = committer.get_leaders(leader_round)[0]; - if let LeaderStatus::Commit(ref block) = sequence[i as usize] { + if let DecidedLeader::Commit(ref block) = sequence[i as usize] { assert_eq!(block.author(), leader); } else { panic!("Expected a committed leader") @@ -569,7 +569,7 @@ async fn undecided() { // Ensure no blocks are committed. let last_decided = Slot::new_for_test(0, 0); - let sequence = committer.try_commit(last_decided); + let sequence = committer.try_decide(last_decided); assert!(sequence.is_empty()); } @@ -714,11 +714,11 @@ async fn test_byzantine_validator() { // Expect a successful direct commit of A12 and leaders at rounds 1 ~ 11 as // pipelining is enabled. let last_decided = Slot::new_for_test(0, 0); - let sequence = committer.try_commit(last_decided); + let sequence = committer.try_decide(last_decided); tracing::info!("Commit sequence: {sequence:#?}"); assert_eq!(sequence.len(), 12); - if let LeaderStatus::Commit(ref block) = sequence[11] { + if let DecidedLeader::Commit(ref block) = sequence[11] { assert_eq!(block.round(), leader_round_12); assert_eq!(block.author(), committer.get_leaders(leader_round_12)[0]) } else { @@ -737,7 +737,7 @@ async fn test_byzantine_validator() { // Ensure B13 is marked as undecided as there is <2f+1 blame and <2f+1 certs let last_sequenced = sequence.last().unwrap(); let last_decided = Slot::new(last_sequenced.round(), last_sequenced.authority()); - let sequence = committer.try_commit(last_decided); + let sequence = committer.try_decide(last_decided); assert!(sequence.is_empty()); // Now build an additional 3 dag layers on top of the existing dag so a commit @@ -749,7 +749,7 @@ async fn test_byzantine_validator() { Some(references_round_15), 18, ); - let sequence = committer.try_commit(last_decided); + let sequence = committer.try_decide(last_decided); tracing::info!("Commit sequence: {sequence:#?}"); assert_eq!(sequence.len(), 4); @@ -757,7 +757,7 @@ async fn test_byzantine_validator() { // of the multiple blocks at slot B13. let skipped_leader_round = 13; let leader = *committer.get_leaders(skipped_leader_round).first().unwrap(); - if let LeaderStatus::Skip(ref slot) = sequence[0] { + if let DecidedLeader::Skip(ref slot) = sequence[0] { assert_eq!(slot.round, skipped_leader_round); assert_eq!(slot.authority, leader); } else { diff --git a/consensus/core/src/tests/universal_committer_tests.rs b/consensus/core/src/tests/universal_committer_tests.rs index 1eb440d3ed153..8d57a935fbc6a 100644 --- a/consensus/core/src/tests/universal_committer_tests.rs +++ b/consensus/core/src/tests/universal_committer_tests.rs @@ -8,7 +8,7 @@ use parking_lot::RwLock; use crate::{ block::{BlockAPI, Slot, TestBlock, Transaction, VerifiedBlock}, - commit::LeaderStatus, + commit::DecidedLeader, context::Context, dag_state::DagState, leader_schedule::{LeaderSchedule, LeaderSwapTable}, @@ -43,11 +43,11 @@ async fn direct_commit() { // The universal committer should mark the potential leaders in leader round 6 as // undecided because there is no way to get enough certificates for leaders of // leader round 6 without completing wave 2. - let sequence = test_setup.committer.try_commit(last_decided); + let sequence = test_setup.committer.try_decide(last_decided); tracing::info!("Commit sequence: {sequence:#?}"); assert_eq!(sequence.len(), 1); - if let LeaderStatus::Commit(ref block) = sequence[0] { + if let DecidedLeader::Commit(ref block) = sequence[0] { assert_eq!( block.author(), test_setup.committer.get_leaders(leader_round_wave_1)[0] @@ -74,10 +74,10 @@ async fn idempotence() { // Commit one leader. let last_decided = Slot::new_for_test(0, 0); - let first_sequence = committer.try_commit(last_decided); + let first_sequence = committer.try_decide(last_decided); assert_eq!(first_sequence.len(), 1); - if let LeaderStatus::Commit(ref block) = first_sequence[0] { + if let DecidedLeader::Commit(ref block) = first_sequence[0] { assert_eq!(first_sequence[0].round(), leader_round_wave_1); assert_eq!( block.author(), @@ -89,10 +89,10 @@ async fn idempotence() { // Ensure that if try_commit is called again with the same last decided leader // input the commit decision will be the same. - let first_sequence = committer.try_commit(last_decided); + let first_sequence = committer.try_decide(last_decided); assert_eq!(first_sequence.len(), 1); - if let LeaderStatus::Commit(ref block) = first_sequence[0] { + if let DecidedLeader::Commit(ref block) = first_sequence[0] { assert_eq!(first_sequence[0].round(), leader_round_wave_1); assert_eq!( block.author(), @@ -119,11 +119,11 @@ async fn idempotence() { leader_status_wave_1.authority(), ); let leader_round_wave_2 = committer.committers[0].leader_round(2); - let second_sequence = committer.try_commit(last_decided); + let second_sequence = committer.try_decide(last_decided); tracing::info!("Commit sequence: {second_sequence:#?}"); assert_eq!(second_sequence.len(), 1); - if let LeaderStatus::Commit(ref block) = second_sequence[0] { + if let DecidedLeader::Commit(ref block) = second_sequence[0] { assert_eq!(second_sequence[0].round(), leader_round_wave_2); assert_eq!( block.author(), @@ -154,11 +154,11 @@ async fn multiple_direct_commit() { // After each wave is complete try commit the leader of that wave. let leader_round = committer.committers[0].leader_round(n); - let sequence = committer.try_commit(last_decided); + let sequence = committer.try_decide(last_decided); tracing::info!("Commit sequence: {sequence:#?}"); assert_eq!(sequence.len(), 1); - if let LeaderStatus::Commit(ref block) = sequence[0] { + if let DecidedLeader::Commit(ref block) = sequence[0] { assert_eq!(block.round(), leader_round); assert_eq!(block.author(), committer.get_leaders(leader_round)[0]); } else { @@ -188,7 +188,7 @@ async fn direct_commit_late_call() { ); let last_decided = Slot::new_for_test(0, 0); - let sequence = committer.try_commit(last_decided); + let sequence = committer.try_decide(last_decided); tracing::info!("Commit sequence: {sequence:#?}"); // With 11 waves completed, excluding wave 0 with genesis round as its leader @@ -196,7 +196,7 @@ async fn direct_commit_late_call() { assert_eq!(sequence.len(), num_waves - 1_usize); for (i, leader_block) in sequence.iter().enumerate() { let leader_round = committer.committers[0].leader_round(i as u32 + 1); - if let LeaderStatus::Commit(ref block) = leader_block { + if let DecidedLeader::Commit(ref block) = leader_block { assert_eq!(block.round(), leader_round); assert_eq!(block.author(), committer.get_leaders(leader_round)[0]); } else { @@ -217,7 +217,7 @@ async fn no_genesis_commit() { ancestors = Some(build_dag(context.clone(), dag_state.clone(), ancestors, r)); let last_committed = Slot::new_for_test(0, 0); - let sequence = committer.try_commit(last_committed); + let sequence = committer.try_decide(last_committed); tracing::info!("Commit sequence: {sequence:#?}"); assert!(sequence.is_empty()); } @@ -259,11 +259,11 @@ async fn direct_skip_no_leader_votes() { // Ensure no blocks are committed because there are 2f+1 blame (non-votes) for // the leader of wave 1. let last_decided = Slot::new_for_test(0, 0); - let sequence = test_setup.committer.try_commit(last_decided); + let sequence = test_setup.committer.try_decide(last_decided); tracing::info!("Commit sequence: {sequence:#?}"); assert_eq!(sequence.len(), 1); - if let LeaderStatus::Skip(leader) = sequence[0] { + if let DecidedLeader::Skip(leader) = sequence[0] { assert_eq!(leader.authority, leader_wave_1); assert_eq!(leader.round, leader_round_wave_1); } else { @@ -307,11 +307,11 @@ async fn direct_skip_missing_leader_block() { // Ensure the leader is skipped because the leader is missing. let last_committed = Slot::new_for_test(0, 0); - let sequence = test_setup.committer.try_commit(last_committed); + let sequence = test_setup.committer.try_decide(last_committed); tracing::info!("Commit sequence: {sequence:#?}"); assert_eq!(sequence.len(), 1); - if let LeaderStatus::Skip(leader) = sequence[0] { + if let DecidedLeader::Skip(leader) = sequence[0] { assert_eq!( leader.authority, test_setup.committer.get_leaders(leader_round_wave_1)[0] @@ -381,14 +381,14 @@ async fn indirect_commit() { // Ensure we indirectly commit the leader of wave 1 via the directly committed // leader of wave 2. let last_decided = Slot::new_for_test(0, 0); - let sequence = committer.try_commit(last_decided); + let sequence = committer.try_decide(last_decided); tracing::info!("Commit sequence: {sequence:#?}"); assert_eq!(sequence.len(), 2); for (idx, decided_leader) in sequence.iter().enumerate() { let leader_round = committer.committers[0].leader_round(idx as u32 + 1); let expected_leader = committer.get_leaders(leader_round)[0]; - if let LeaderStatus::Commit(ref block) = decided_leader { + if let DecidedLeader::Commit(ref block) = decided_leader { assert_eq!(block.round(), leader_round); assert_eq!(block.author(), expected_leader); } else { @@ -461,14 +461,14 @@ async fn indirect_skip() { // Ensure we make a commit decision for the leaders of wave 1 ~ 3 let last_committed = Slot::new_for_test(0, 0); - let sequence = committer.try_commit(last_committed); + let sequence = committer.try_decide(last_committed); tracing::info!("Commit sequence: {sequence:#?}"); assert_eq!(sequence.len(), 3); // Ensure we commit the leader of wave 1 directly. let leader_round_wave_1 = committer.committers[0].leader_round(1); let leader_wave_1 = committer.get_leaders(leader_round_wave_1)[0]; - if let LeaderStatus::Commit(ref block) = sequence[0] { + if let DecidedLeader::Commit(ref block) = sequence[0] { assert_eq!(block.round(), leader_round_wave_1); assert_eq!(block.author(), leader_wave_1); } else { @@ -479,7 +479,7 @@ async fn indirect_skip() { // This happens because we do not have enough votes in voting round of wave 2 // for the certificates of decision round wave 2 to form a certified link to // the leader of wave 2. - if let LeaderStatus::Skip(leader) = sequence[1] { + if let DecidedLeader::Skip(leader) = sequence[1] { assert_eq!(leader.authority, leader_wave_2); assert_eq!(leader.round, leader_round_wave_2); } else { @@ -489,7 +489,7 @@ async fn indirect_skip() { // Ensure we commit the 3rd leader directly. let leader_round_wave_3 = committer.committers[0].leader_round(3); let leader_wave_3 = committer.get_leaders(leader_round_wave_3)[0]; - if let LeaderStatus::Commit(ref block) = sequence[2] { + if let DecidedLeader::Commit(ref block) = sequence[2] { assert_eq!(block.round(), leader_round_wave_3); assert_eq!(block.author(), leader_wave_3); } else { @@ -549,7 +549,7 @@ async fn undecided() { // Ensure outcome of direct & indirect rule is undecided. So not commit decisions // should be returned. let last_committed = Slot::new_for_test(0, 0); - let sequence = committer.try_commit(last_committed); + let sequence = committer.try_decide(last_committed); tracing::info!("Commit sequence: {sequence:#?}"); assert!(sequence.is_empty()); } @@ -692,11 +692,11 @@ async fn test_byzantine_direct_commit() { // Expect a successful direct commit of A12 and leaders at rounds 9, 6 & 3. let last_decided = Slot::new_for_test(0, 0); - let sequence = committer.try_commit(last_decided); + let sequence = committer.try_decide(last_decided); tracing::info!("Commit sequence: {sequence:#?}"); assert_eq!(sequence.len(), 4); - if let LeaderStatus::Commit(ref block) = sequence[3] { + if let DecidedLeader::Commit(ref block) = sequence[3] { assert_eq!( block.author(), committer.get_leaders(leader_round_wave_4)[0] diff --git a/consensus/core/src/universal_committer.rs b/consensus/core/src/universal_committer.rs index e7e07310ddf82..dc393b31db96e 100644 --- a/consensus/core/src/universal_committer.rs +++ b/consensus/core/src/universal_committer.rs @@ -8,8 +8,8 @@ use parking_lot::RwLock; use crate::{ base_committer::BaseCommitter, - block::{Round, Slot}, - commit::{Decision, LeaderStatus}, + block::{Round, Slot, GENESIS_ROUND}, + commit::{DecidedLeader, Decision}, context::Context, dag_state::DagState, }; @@ -35,10 +35,10 @@ pub(crate) struct UniversalCommitter { } impl UniversalCommitter { - /// Try to commit part of the dag. This function is idempotent and returns an ordered list of - /// decided slots. + /// Try to decide part of the dag. This function is idempotent and returns an ordered list of + /// decided leaders. #[tracing::instrument(skip_all, fields(last_decided = %last_decided))] - pub(crate) fn try_commit(&self, last_decided: Slot) -> Vec { + pub(crate) fn try_decide(&self, last_decided: Slot) -> Vec { let highest_accepted_round = self.dag_state.read().highest_accepted_round(); // Try to decide as many leaders as possible, starting with the highest round. @@ -50,48 +50,47 @@ impl UniversalCommitter { 'outer: for round in (last_decided.round..=highest_accepted_round.saturating_sub(2)).rev() { for committer in self.committers.iter().rev() { // Skip committers that don't have a leader for this round. - let Some(leader) = committer.elect_leader(round) else { + let Some(slot) = committer.elect_leader(round) else { continue; }; // now that we reached the last committed leader we can stop the commit rule - if leader == last_decided { - tracing::debug!("Reached last committed {leader}, now exit"); + if slot == last_decided { + tracing::debug!("Reached last committed {slot}, now exit"); break 'outer; } - tracing::debug!("Trying to decide {leader} with {committer}",); + tracing::debug!("Trying to decide {slot} with {committer}",); // Try to directly decide the leader. - let mut status = committer.try_direct_decide(leader); + let mut status = committer.try_direct_decide(slot); tracing::debug!("Outcome of direct rule: {status}"); // If we can't directly decide the leader, try to indirectly decide it. if status.is_decided() { - leaders.push_front((status.clone(), Decision::Direct)); + leaders.push_front((status, Decision::Direct)); } else { - status = committer.try_indirect_decide(leader, leaders.iter().map(|(x, _)| x)); - leaders.push_front((status.clone(), Decision::Indirect)); + status = committer.try_indirect_decide(slot, leaders.iter().map(|(x, _)| x)); tracing::debug!("Outcome of indirect rule: {status}"); + leaders.push_front((status, Decision::Indirect)); } } } // The decided sequence is the longest prefix of decided leaders. - leaders - .into_iter() - // Filter out all the genesis. - .filter(|(x, _)| x.round() > 0) - // Stop the sequence upon encountering an undecided leader. - .take_while(|(x, _)| x.is_decided()) - // We want to report metrics at this point to ensure that the decisions - // are reported only once hence we increase our accuracy - .inspect(|(x, direct_decided)| { - self.update_metrics(x, *direct_decided); - tracing::debug!("Decided {x}"); - }) - .map(|(x, _)| x) - .collect() + let mut decided_leaders = Vec::new(); + for (leader, decision) in leaders { + if leader.round() == GENESIS_ROUND { + continue; + } + let Some(decided_leader) = leader.into_decided_leader() else { + break; + }; + self.update_metrics(&decided_leader, decision); + decided_leaders.push(decided_leader); + } + tracing::debug!("Decided {decided_leaders:?}"); + decided_leaders } /// Return list of leaders for the round. @@ -105,23 +104,26 @@ impl UniversalCommitter { } /// Update metrics. - fn update_metrics(&self, leader: &LeaderStatus, decision: Decision) { - let authority = leader.authority().to_string(); + fn update_metrics(&self, decided_leader: &DecidedLeader, decision: Decision) { let decision_str = if decision == Decision::Direct { "direct" } else { "indirect" }; - let status = match leader { - LeaderStatus::Commit(..) => format!("{decision_str}-commit"), - LeaderStatus::Skip(..) => format!("{decision_str}-skip"), - LeaderStatus::Undecided(..) => return, + let status = match decided_leader { + DecidedLeader::Commit(..) => format!("{decision_str}-commit"), + DecidedLeader::Skip(..) => format!("{decision_str}-skip"), }; + let leader_host = &self + .context + .committee + .authority(decided_leader.slot().authority) + .hostname; self.context .metrics .node_metrics .committed_leaders_total - .with_label_values(&[&authority, &status]) + .with_label_values(&[leader_host, &status]) .inc(); } } diff --git a/crates/sui-core/src/consensus_types/consensus_output_api.rs b/crates/sui-core/src/consensus_types/consensus_output_api.rs index fc22c1f7c406b..5764124237f42 100644 --- a/crates/sui-core/src/consensus_types/consensus_output_api.rs +++ b/crates/sui-core/src/consensus_types/consensus_output_api.rs @@ -105,9 +105,9 @@ impl ConsensusOutputAPI for narwhal_types::ConsensusOutput { impl ConsensusOutputAPI for consensus_core::CommittedSubDag { fn reputation_score_sorted_desc(&self) -> Option> { - if !self.reputation_scores.is_empty() { + if !self.reputation_scores_desc.is_empty() { Some( - self.reputation_scores + self.reputation_scores_desc .iter() .map(|(id, score)| (id.value() as AuthorityIndex, *score)) .collect(),