Skip to content

Commit

Permalink
[Consensus] break score ties in randomized and deterministic order (MystenLabs#17667)
Browse files Browse the repository at this point in the history

## Description 

Currently, score ties are broken by authority index, which can bias
leader selection toward nodes with lower authority indices. Instead,
shuffle the scores deterministically based on the commit index, then
stable-sort them, so that ties are broken in a randomized yet
deterministic order.

Use a separate type `DecidedLeader` to represent leader slots that are
either committed or skipped.

## Test plan 

CI

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
  • Loading branch information
mwtian authored May 12, 2024
1 parent 7ac16a8 commit c04bb5e
Show file tree
Hide file tree
Showing 12 changed files with 258 additions and 214 deletions.
2 changes: 1 addition & 1 deletion consensus/core/src/authority_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,7 @@ mod tests {
);
}
}
assert_eq!(committed_subdag.reputation_scores, vec![]);
assert_eq!(committed_subdag.reputation_scores_desc, vec![]);
if expected_transactions.is_empty() {
break;
}
Expand Down
88 changes: 59 additions & 29 deletions consensus/core/src/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,12 +298,12 @@ pub struct CommittedSubDag {
pub commit_index: CommitIndex,
/// Optional scores that are provided as part of the consensus output to Sui
/// that can then be used by Sui for future submission to consensus.
pub reputation_scores: Vec<(AuthorityIndex, u64)>,
pub reputation_scores_desc: Vec<(AuthorityIndex, u64)>,
}

impl CommittedSubDag {
/// Create new (empty) sub-dag.
pub fn new(
pub(crate) fn new(
leader: BlockRef,
blocks: Vec<VerifiedBlock>,
timestamp_ms: BlockTimestampMs,
Expand All @@ -314,17 +314,17 @@ impl CommittedSubDag {
blocks,
timestamp_ms,
commit_index,
reputation_scores: vec![],
reputation_scores_desc: vec![],
}
}

pub fn update_scores(&mut self, scores: Vec<(AuthorityIndex, u64)>) {
self.reputation_scores = scores;
pub(crate) fn update_scores(&mut self, reputation_scores_desc: Vec<(AuthorityIndex, u64)>) {
self.reputation_scores_desc = reputation_scores_desc;
}

/// Sort the blocks of the sub-dag by round number then authority index. Any
/// deterministic & stable algorithm works.
pub fn sort(&mut self) {
pub(crate) fn sort(&mut self) {
self.blocks.sort_by(|a, b| {
a.round()
.cmp(&b.round())
Expand Down Expand Up @@ -359,7 +359,7 @@ impl fmt::Debug for CommittedSubDag {
write!(
f,
"];{}ms;rs{:?})",
self.timestamp_ms, self.reputation_scores
self.timestamp_ms, self.reputation_scores_desc
)
}
}
Expand Down Expand Up @@ -427,9 +427,7 @@ pub(crate) enum Decision {
Indirect,
}

/// The status of every leader output by the committers. While the core only cares
/// about committed leaders, providing a richer status allows for easier debugging,
/// testing, and composition with advanced commit strategies.
/// The status of a leader slot from the direct and indirect commit rules.
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum LeaderStatus {
Commit(VerifiedBlock),
Expand All @@ -446,14 +444,6 @@ impl LeaderStatus {
}
}

pub(crate) fn authority(&self) -> AuthorityIndex {
match self {
Self::Commit(block) => block.author(),
Self::Skip(leader) => leader.authority,
Self::Undecided(leader) => leader.authority,
}
}

pub(crate) fn is_decided(&self) -> bool {
match self {
Self::Commit(_) => true,
Expand All @@ -462,31 +452,71 @@ impl LeaderStatus {
}
}

// Only should be called when the leader status is decided (Commit/Skip)
pub fn get_decided_slot(&self) -> Slot {
pub(crate) fn into_decided_leader(self) -> Option<DecidedLeader> {
match self {
Self::Commit(block) => Some(DecidedLeader::Commit(block)),
Self::Skip(slot) => Some(DecidedLeader::Skip(slot)),
Self::Undecided(..) => None,
}
}
}

impl Display for LeaderStatus {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Commit(block) => write!(f, "Commit({})", block.reference()),
Self::Skip(slot) => write!(f, "Skip({slot})"),
Self::Undecided(slot) => write!(f, "Undecided({slot})"),
}
}
}

/// Decision of each leader slot.
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum DecidedLeader {
Commit(VerifiedBlock),
Skip(Slot),
}

impl DecidedLeader {
// Slot where the leader is decided.
pub(crate) fn slot(&self) -> Slot {
match self {
Self::Commit(block) => block.reference().into(),
Self::Skip(leader) => *leader,
Self::Undecided(..) => panic!("Decided block is either Commit or Skip"),
Self::Skip(slot) => *slot,
}
}

// Only should be called when the leader status is decided (Commit/Skip)
pub fn into_committed_block(self) -> Option<VerifiedBlock> {
// Converts to committed block if the decision is to commit. Returns None otherwise.
pub(crate) fn into_committed_block(self) -> Option<VerifiedBlock> {
match self {
Self::Commit(block) => Some(block),
Self::Skip(_leader) => None,
Self::Undecided(..) => panic!("Decided block is either Commit or Skip"),
Self::Skip(_) => None,
}
}

#[cfg(test)]
pub(crate) fn round(&self) -> Round {
match self {
Self::Commit(block) => block.round(),
Self::Skip(leader) => leader.round,
}
}

#[cfg(test)]
pub(crate) fn authority(&self) -> AuthorityIndex {
match self {
Self::Commit(block) => block.author(),
Self::Skip(leader) => leader.authority,
}
}
}

impl Display for LeaderStatus {
impl Display for DecidedLeader {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Commit(block) => write!(f, "Commit({})", block.reference()),
Self::Skip(leader) => write!(f, "Skip({leader})"),
Self::Undecided(leader) => write!(f, "Undecided({leader})"),
Self::Skip(slot) => write!(f, "Skip({slot})"),
}
}
}
Expand Down
22 changes: 11 additions & 11 deletions consensus/core/src/commit_observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,17 +74,17 @@ impl CommitObserver {

let committed_sub_dags = self.commit_interpreter.handle_commit(committed_leaders);
let mut sent_sub_dags = vec![];
let reputation_scores = self
let reputation_scores_desc = self
.leader_schedule
.leader_swap_table
.read()
.reputation_scores
.authorities_by_score_desc(self.context.clone());
.reputation_scores_desc
.clone();
for mut committed_sub_dag in committed_sub_dags.into_iter() {
// TODO: Only update scores after a leader schedule change
// On handle commit the current scores that were used to elect the
// leader of the subdag will be added to the subdag and sent to sui.
committed_sub_dag.update_scores(reputation_scores.clone());
committed_sub_dag.update_scores(reputation_scores_desc.clone());
// Failures in sender.send() are assumed to be permanent
if let Err(err) = self.sender.send(committed_sub_dag.clone()) {
tracing::error!(
Expand Down Expand Up @@ -145,8 +145,8 @@ impl CommitObserver {
self.leader_schedule
.leader_swap_table
.read()
.reputation_scores
.authorities_by_score_desc(self.context.clone()),
.reputation_scores_desc
.clone(),
);
}
self.sender.send(committed_sub_dag).unwrap_or_else(|e| {
Expand Down Expand Up @@ -290,7 +290,7 @@ mod tests {
let mut processed_subdag_index = 0;
while let Ok(subdag) = receiver.try_recv() {
assert_eq!(subdag, commits[processed_subdag_index]);
assert_eq!(subdag.reputation_scores, vec![]);
assert_eq!(subdag.reputation_scores_desc, vec![]);
processed_subdag_index = subdag.commit_index as usize;
if processed_subdag_index == leaders.len() {
break;
Expand Down Expand Up @@ -376,7 +376,7 @@ mod tests {
while let Ok(subdag) = receiver.try_recv() {
tracing::info!("Processed {subdag}");
assert_eq!(subdag, commits[processed_subdag_index]);
assert_eq!(subdag.reputation_scores, vec![]);
assert_eq!(subdag.reputation_scores_desc, vec![]);
processed_subdag_index = subdag.commit_index as usize;
if processed_subdag_index == expected_last_processed_index {
break;
Expand Down Expand Up @@ -412,7 +412,7 @@ mod tests {
while let Ok(subdag) = receiver.try_recv() {
tracing::info!("{subdag} was sent but not processed by consumer");
assert_eq!(subdag, commits[processed_subdag_index]);
assert_eq!(subdag.reputation_scores, vec![]);
assert_eq!(subdag.reputation_scores_desc, vec![]);
processed_subdag_index = subdag.commit_index as usize;
if processed_subdag_index == expected_last_sent_index {
break;
Expand Down Expand Up @@ -448,7 +448,7 @@ mod tests {
while let Ok(subdag) = receiver.try_recv() {
tracing::info!("Processed {subdag} on resubmission");
assert_eq!(subdag, commits[processed_subdag_index]);
assert_eq!(subdag.reputation_scores, vec![]);
assert_eq!(subdag.reputation_scores_desc, vec![]);
processed_subdag_index = subdag.commit_index as usize;
if processed_subdag_index == expected_last_sent_index {
break;
Expand Down Expand Up @@ -516,7 +516,7 @@ mod tests {
while let Ok(subdag) = receiver.try_recv() {
tracing::info!("Processed {subdag}");
assert_eq!(subdag, commits[processed_subdag_index]);
assert_eq!(subdag.reputation_scores, vec![]);
assert_eq!(subdag.reputation_scores_desc, vec![]);
processed_subdag_index = subdag.commit_index as usize;
if processed_subdag_index == expected_last_processed_index {
break;
Expand Down
14 changes: 7 additions & 7 deletions consensus/core/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -449,17 +449,17 @@ impl Core {
.protocol_config
.mysticeti_leader_scoring_and_schedule()
{
let sequenced_leaders = self.committer.try_commit(self.last_decided_leader);
if let Some(last) = sequenced_leaders.last() {
self.last_decided_leader = last.get_decided_slot();
let decided_leaders = self.committer.try_decide(self.last_decided_leader);
if let Some(last) = decided_leaders.last() {
self.last_decided_leader = last.slot();
self.context
.metrics
.node_metrics
.last_decided_leader_round
.set(self.last_decided_leader.round as i64);
}

let committed_leaders = sequenced_leaders
let committed_leaders = decided_leaders
.into_iter()
.filter_map(|leader| leader.into_committed_block())
.collect::<Vec<_>>();
Expand Down Expand Up @@ -499,7 +499,7 @@ impl Core {

// TODO: limit commits by commits_until_update, which may be needed when leader schedule length
// is reduced.
let decided_leaders = self.committer.try_commit(self.last_decided_leader);
let decided_leaders = self.committer.try_decide(self.last_decided_leader);

let Some(last_decided) = decided_leaders.last().cloned() else {
break;
Expand All @@ -518,7 +518,7 @@ impl Core {
self.last_decided_leader = sequenced_leaders.last().unwrap().slot();
sequenced_leaders
} else {
self.last_decided_leader = last_decided.get_decided_slot();
self.last_decided_leader = last_decided.slot();
sequenced_leaders
};

Expand Down Expand Up @@ -1344,7 +1344,7 @@ mod test {
1
);
let expected_reputation_scores =
ReputationScores::new((11..21).into(), vec![8, 8, 9, 8]);
ReputationScores::new((11..21).into(), vec![9, 8, 8, 8]);
assert_eq!(
core.leader_schedule
.leader_swap_table
Expand Down
13 changes: 3 additions & 10 deletions consensus/core/src/dag_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -789,17 +789,10 @@ impl DagState {
panic!("Fatal error, no quorum has been detected in our DAG on the last two rounds.");
}

pub(crate) fn last_reputation_scores_from_store(&self) -> Option<ReputationScores> {
let commit_info = self
.store
pub(crate) fn recover_last_commit_info(&self) -> Option<(CommitRef, CommitInfo)> {
self.store
.read_last_commit_info()
.unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
if let Some((commit_ref, commit_info)) = commit_info {
assert!(commit_ref.index <= self.last_commit.as_ref().unwrap().index());
Some(commit_info.reputation_scores)
} else {
None
}
.unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e))
}

pub(crate) fn unscored_committed_subdags_count(&self) -> u64 {
Expand Down
Loading

0 comments on commit c04bb5e

Please sign in to comment.