diff --git a/consensus/config/src/committee.rs b/consensus/config/src/committee.rs index 0d941a3aa39fd..2dfb5ff66bb87 100644 --- a/consensus/config/src/committee.rs +++ b/consensus/config/src/committee.rs @@ -68,6 +68,11 @@ impl Committee { self.total_stake } + pub fn n_percent_stake_threshold(&self, n: u64) -> Stake { + assert!(n <= 100, "n must be between 0 and 100"); + self.total_stake * n / 100 + } + pub fn quorum_threshold(&self) -> Stake { self.quorum_threshold } diff --git a/consensus/core/src/ancestor.rs b/consensus/core/src/ancestor.rs new file mode 100644 index 0000000000000..6ad49ac190e39 --- /dev/null +++ b/consensus/core/src/ancestor.rs @@ -0,0 +1,385 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use std::sync::Arc; + +use consensus_config::AuthorityIndex; +use tracing::info; + +use crate::{context::Context, leader_scoring::ReputationScores, round_prober::QuorumRound}; + +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +pub(crate) enum AncestorState { + Include, + Exclude(u64), +} + +#[derive(Clone)] +struct AncestorInfo { + state: AncestorState, + // This will be set to the count of either the quorum round update count or + // the score update count for which the EXCLUDE or INCLUDE state are locked + // in respectively. + lock_expiry_count: u32, +} + +impl AncestorInfo { + fn new() -> Self { + Self { + state: AncestorState::Include, + lock_expiry_count: 0, + } + } + + fn is_locked( + &self, + propagation_score_update_count: u32, + quorum_round_update_count: u32, + ) -> bool { + match self.state { + AncestorState::Include => self.lock_expiry_count > propagation_score_update_count, + AncestorState::Exclude(_) => self.lock_expiry_count > quorum_round_update_count, + } + } + + fn set_lock(&mut self, future_count: u32) { + self.lock_expiry_count = future_count; + } +} + +pub(crate) struct AncestorStateManager { + context: Arc, + state_map: Vec, + propagation_score_update_count: u32, + quorum_round_update_count: u32, + pub(crate) quorum_round_per_authority: Vec, + // This is the reputation scores that we use for leader election but we are + // using it here as a signal for high quality block propagation as well. + pub(crate) propagation_scores: ReputationScores, +} + +impl AncestorStateManager { + // Number of quorum round updates for which an ancestor is locked in the EXCLUDE state + // Chose 10 updates as that should be ~50 seconds of waiting with the current round prober + // interval of 5s + #[cfg(not(test))] + const STATE_LOCK_QUORUM_ROUND_UPDATES: u32 = 10; + #[cfg(test)] + const STATE_LOCK_QUORUM_ROUND_UPDATES: u32 = 1; + + // Number of propagation score updates for which an ancestor is locked in the INCLUDE state + // Chose 2 leader schedule updates (~300 commits per schedule) which should be ~30-90 seconds + // depending on the round rate for the authority to improve scores. 
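+    // When a state changes, the corresponding constant below is added to the current update count via `set_lock`, and the new state stays locked until that counter catches up to the recorded expiry count.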
+ #[cfg(not(test))] + const STATE_LOCK_SCORE_UPDATES: u32 = 2; + #[cfg(test)] + const STATE_LOCK_SCORE_UPDATES: u32 = 1; + + // Exclusion threshold is based on propagation (reputation) scores + const EXCLUSION_THRESHOLD_PERCENTAGE: u64 = 10; + + // Inclusion threshold is based on network quorum round + const INCLUSION_THRESHOLD_PERCENTAGE: u64 = 90; + + pub(crate) fn new(context: Arc) -> Self { + let state_map = vec![AncestorInfo::new(); context.committee.size()]; + + let quorum_round_per_authority = vec![(0, 0); context.committee.size()]; + Self { + context, + state_map, + propagation_score_update_count: 0, + quorum_round_update_count: 0, + propagation_scores: ReputationScores::default(), + quorum_round_per_authority, + } + } + + pub(crate) fn set_quorum_round_per_authority(&mut self, quorum_rounds: Vec) { + self.quorum_round_per_authority = quorum_rounds; + self.quorum_round_update_count += 1; + } + + pub(crate) fn set_propagation_scores(&mut self, scores: ReputationScores) { + self.propagation_scores = scores; + self.propagation_score_update_count += 1; + } + + pub(crate) fn get_ancestor_states(&self) -> Vec { + self.state_map.iter().map(|info| info.state).collect() + } + + /// Updates the state of all ancestors based on the latest scores and quorum rounds + pub(crate) fn update_all_ancestors_state(&mut self) { + let propagation_scores_by_authority = self + .propagation_scores + .scores_per_authority + .clone() + .into_iter() + .enumerate() + .map(|(idx, score)| { + ( + self.context + .committee + .to_authority_index(idx) + .expect("Index should be valid"), + score, + ) + }) + .collect::>(); + + // If round prober has not run yet and we don't have network quorum round, + // it is okay because network_low_quorum_round will be zero and we will + // include all ancestors until we get more information. + let network_low_quorum_round = self.calculate_network_low_quorum_round(); + + // If propagation scores are not ready because the first 300 commits have not + // happened, this is okay as we will only start excluding ancestors after that + // point in time. + for (authority_id, score) in propagation_scores_by_authority { + let (authority_low_quorum_round, _high) = self.quorum_round_per_authority[authority_id]; + + self.update_state( + authority_id, + score, + authority_low_quorum_round, + network_low_quorum_round, + ); + } + } + + /// Updates the state of the given authority based on current scores and quorum rounds. + fn update_state( + &mut self, + authority_id: AuthorityIndex, + propagation_score: u64, + authority_low_quorum_round: u32, + network_low_quorum_round: u32, + ) { + let block_hostname = &self.context.committee.authority(authority_id).hostname; + let mut ancestor_info = self.state_map[authority_id].clone(); + + if ancestor_info.is_locked( + self.propagation_score_update_count, + self.quorum_round_update_count, + ) { + // If still locked, we won't make any state changes. 
+ return; + } + + let low_score_threshold = + (self.propagation_scores.highest_score() * Self::EXCLUSION_THRESHOLD_PERCENTAGE) / 100; + + match ancestor_info.state { + // Check conditions to switch to EXCLUDE state + AncestorState::Include => { + if propagation_score <= low_score_threshold { + ancestor_info.state = AncestorState::Exclude(propagation_score); + ancestor_info.set_lock( + self.quorum_round_update_count + Self::STATE_LOCK_QUORUM_ROUND_UPDATES, + ); + info!( + "Authority {authority_id} moved to EXCLUDE state with score {propagation_score} <= threshold of {low_score_threshold} and locked for {:?} quorum round updates", + Self::STATE_LOCK_QUORUM_ROUND_UPDATES + ); + self.context + .metrics + .node_metrics + .ancestor_state_change_by_authority + .with_label_values(&[block_hostname, "exclude"]) + .inc(); + } + } + // Check conditions to switch back to INCLUDE state + AncestorState::Exclude(_) => { + // It should not be possible for the scores to get over the threshold + // until the node is back in the INCLUDE state, but adding just in case. + if propagation_score > low_score_threshold + || authority_low_quorum_round >= network_low_quorum_round + { + ancestor_info.state = AncestorState::Include; + ancestor_info.set_lock( + self.propagation_score_update_count + Self::STATE_LOCK_SCORE_UPDATES, + ); + info!( + "Authority {authority_id} moved to INCLUDE state with {propagation_score} > threshold of {low_score_threshold} or {authority_low_quorum_round} >= {network_low_quorum_round} and locked for {:?} score updates.", + Self::STATE_LOCK_SCORE_UPDATES + ); + self.context + .metrics + .node_metrics + .ancestor_state_change_by_authority + .with_label_values(&[block_hostname, "include"]) + .inc(); + } + } + } + + // If any updates were made to state ensure they are persisted. + self.state_map[authority_id] = ancestor_info; + } + + /// Calculate the network's quorum round from authorities by inclusion stake + /// threshold, where quorum round is the highest round a block has been seen + /// by a percentage (inclusion threshold) of authorities. 
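+    /// +    /// For example, with four equal-stake authorities, a 90% inclusion threshold (stake 3 of 4) and low quorum rounds of [100, 225, 229, 229], accumulating stake from the highest round down gives 1 (229), 2 (229), 3 (225), so the network low quorum round is 225.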
+ /// TODO: experiment with using high quorum round + fn calculate_network_low_quorum_round(&self) -> u32 { + let committee = &self.context.committee; + let inclusion_stake_threshold = self.get_inclusion_stake_threshold(); + let mut low_quorum_rounds_with_stake = self + .quorum_round_per_authority + .iter() + .zip(committee.authorities()) + .map(|((low, _high), (_, authority))| (*low, authority.stake)) + .collect::>(); + low_quorum_rounds_with_stake.sort(); + + let mut total_stake = 0; + let mut network_low_quorum_round = 0; + + for (round, stake) in low_quorum_rounds_with_stake.iter().rev() { + total_stake += stake; + if total_stake >= inclusion_stake_threshold { + network_low_quorum_round = *round; + break; + } + } + + network_low_quorum_round + } + + fn get_inclusion_stake_threshold(&self) -> u64 { + self.context + .committee + .n_percent_stake_threshold(Self::INCLUSION_THRESHOLD_PERCENTAGE) + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::leader_scoring::ReputationScores; + + #[tokio::test] + async fn test_calculate_network_low_quorum_round() { + telemetry_subscribers::init_for_testing(); + let context = Arc::new(Context::new_for_test(4).0); + + let scores = ReputationScores::new((1..=300).into(), vec![1, 2, 4, 3]); + let mut ancestor_state_manager = AncestorStateManager::new(context); + ancestor_state_manager.set_propagation_scores(scores); + + // Quorum rounds are not set yet, so we should calculate a network quorum + // round of 0 to start. + let network_low_quorum_round = ancestor_state_manager.calculate_network_low_quorum_round(); + assert_eq!(network_low_quorum_round, 0); + + let quorum_rounds = vec![(100, 229), (225, 300), (229, 300), (229, 300)]; + ancestor_state_manager.set_quorum_round_per_authority(quorum_rounds); + + let network_low_quorum_round = ancestor_state_manager.calculate_network_low_quorum_round(); + assert_eq!(network_low_quorum_round, 225); + } + + // Test all state transitions + // Default all INCLUDE -> EXCLUDE + // EXCLUDE -> INCLUDE (Blocked due to lock) + // EXCLUDE -> INCLUDE (Pass due to lock expired) + // INCLUDE -> EXCLUDE (Blocked due to lock) + // INCLUDE -> EXCLUDE (Pass due to lock expired) + #[tokio::test] + async fn test_update_all_ancestor_state() { + telemetry_subscribers::init_for_testing(); + let context = Arc::new(Context::new_for_test(4).0); + + let scores = ReputationScores::new((1..=300).into(), vec![1, 2, 4, 3]); + let mut ancestor_state_manager = AncestorStateManager::new(context); + ancestor_state_manager.set_propagation_scores(scores); + + let quorum_rounds = vec![(225, 229), (225, 300), (229, 300), (229, 300)]; + ancestor_state_manager.set_quorum_round_per_authority(quorum_rounds); + ancestor_state_manager.update_all_ancestors_state(); + + // Score threshold for exclude is (4 * 10) / 100 = 0 + // No ancestors should be excluded in with this threshold + let state_map = ancestor_state_manager.get_ancestor_states(); + for state in state_map.iter() { + assert_eq!(*state, AncestorState::Include); + } + + let scores = ReputationScores::new((1..=300).into(), vec![10, 10, 100, 100]); + ancestor_state_manager.set_propagation_scores(scores); + ancestor_state_manager.update_all_ancestors_state(); + + // Score threshold for exclude is (100 * 10) / 100 = 10 + // 2 authorities should be excluded in with this threshold + let state_map = ancestor_state_manager.get_ancestor_states(); + for (authority, state) in state_map.iter().enumerate() { + if (0..=1).contains(&authority) { + assert_eq!(*state, AncestorState::Exclude(10)); + } else { 
+ assert_eq!(*state, AncestorState::Include); + } + } + + ancestor_state_manager.update_all_ancestors_state(); + + // 2 authorities should still be excluded with these scores and no new + // quorum round updates have been set to expire the locks. + let state_map = ancestor_state_manager.get_ancestor_states(); + for (authority, state) in state_map.iter().enumerate() { + if (0..=1).contains(&authority) { + assert_eq!(*state, AncestorState::Exclude(10)); + } else { + assert_eq!(*state, AncestorState::Include); + } + } + + // Updating the quorum rounds will expire the lock as we only need 1 + // quorum round update for tests. + let quorum_rounds = vec![(229, 300), (225, 300), (229, 300), (229, 300)]; + ancestor_state_manager.set_quorum_round_per_authority(quorum_rounds); + ancestor_state_manager.update_all_ancestors_state(); + + // Authority 0 should now be included again because low quorum round is + // at the network low quorum round of 229. Authority 1's quorum round is + // too low and will remain excluded. + let state_map = ancestor_state_manager.get_ancestor_states(); + for (authority, state) in state_map.iter().enumerate() { + if authority == 1 { + assert_eq!(*state, AncestorState::Exclude(10)); + } else { + assert_eq!(*state, AncestorState::Include); + } + } + + let quorum_rounds = vec![(229, 300), (229, 300), (229, 300), (229, 300)]; + ancestor_state_manager.set_quorum_round_per_authority(quorum_rounds); + ancestor_state_manager.update_all_ancestors_state(); + + // Ancestor 1 can transtion to the INCLUDE state. Ancestor 0 is still locked + // in the INCLUDE state until a score update is performed which is why + // even though the scores are still low it has not moved to the EXCLUDE + // state. + let state_map = ancestor_state_manager.get_ancestor_states(); + for state in state_map.iter() { + assert_eq!(*state, AncestorState::Include); + } + + // Updating the scores will expire the lock as we only need 1 update for tests. + let scores = ReputationScores::new((1..=300).into(), vec![100, 10, 100, 100]); + ancestor_state_manager.set_propagation_scores(scores); + ancestor_state_manager.update_all_ancestors_state(); + + // Ancestor 1 can transition to EXCLUDE state now that the lock expired + // and its scores are below the threshold. + let state_map = ancestor_state_manager.get_ancestor_states(); + for (authority, state) in state_map.iter().enumerate() { + if authority == 1 { + assert_eq!(*state, AncestorState::Exclude(10)); + } else { + assert_eq!(*state, AncestorState::Include); + } + } + } +} diff --git a/consensus/core/src/core.rs b/consensus/core/src/core.rs index ee2e914e4659a..f1fb157f62e14 100644 --- a/consensus/core/src/core.rs +++ b/consensus/core/src/core.rs @@ -16,9 +16,10 @@ use tokio::{ sync::{broadcast, watch}, time::Instant, }; -use tracing::{debug, info, warn}; +use tracing::{debug, info, trace, warn}; use crate::{ + ancestor::{AncestorState, AncestorStateManager}, block::{ Block, BlockAPI, BlockRef, BlockTimestampMs, BlockV1, Round, SignedBlock, Slot, VerifiedBlock, GENESIS_ROUND, @@ -96,6 +97,11 @@ pub(crate) struct Core { /// This is currently being used to avoid equivocations during a node recovering from amnesia. When value is None it means that /// the last block sync mechanism is enabled, but it hasn't been initialised yet. last_known_proposed_round: Option, + // The ancestor state manager will keep track of the quality of the authorities + // based on the distribution of their blocks to the network. 
It will use this + // information to decide whether to include that authority block in the next + // proposal or not. + ancestor_state_manager: AncestorStateManager, } impl Core { @@ -150,6 +156,14 @@ impl Core { Some(0) }; + let propagation_scores = leader_schedule + .leader_swap_table + .read() + .reputation_scores + .clone(); + let mut ancestor_state_manager = AncestorStateManager::new(context.clone()); + ancestor_state_manager.set_propagation_scores(propagation_scores); + Self { context: context.clone(), threshold_clock: ThresholdClock::new(0, context.clone()), @@ -166,6 +180,7 @@ impl Core { block_signer, dag_state, last_known_proposed_round: min_propose_round, + ancestor_state_manager, } .recover() } @@ -375,6 +390,37 @@ impl Core { } } + // Determine the ancestors to be included in proposal. + // Smart ancestor selection requires distributed scoring to be enabled. + let ancestors = if self + .context + .protocol_config + .consensus_distributed_vote_scoring_strategy() + && self + .context + .protocol_config + .consensus_smart_ancestor_selection() + { + let ancestors = self.smart_ancestors_to_propose(clock_round, !force); + + // If we did not find enough good ancestors to propose, continue to wait before proposing. + if ancestors.is_empty() { + assert!( + !force, + "Ancestors should have been returned if force is true!" + ); + return None; + } + ancestors + } else { + self.ancestors_to_propose(clock_round) + }; + + // Update the last included ancestor block refs + for ancestor in &ancestors { + self.last_included_ancestors[ancestor.author()] = Some(ancestor.reference()); + } + let leader_authority = &self .context .committee @@ -397,14 +443,6 @@ impl Core { .with_label_values(&[leader_authority]) .inc(); - // TODO: produce the block for the clock_round. As the threshold clock can advance many rounds at once (ex - // because we synchronized a bulk of blocks) we can decide here whether we want to produce blocks per round - // or just the latest one. From earlier experiments I saw only benefit on proposing for the penultimate round - // only when the validator was supposed to be the leader of the round - so we bring down the missed leaders. - // Probably proposing for all the intermediate rounds might not make much sense. - - // Determine the ancestors to be included in proposal - let ancestors = self.ancestors_to_propose(clock_round); self.context .metrics .node_metrics @@ -498,7 +536,7 @@ impl Core { // Now acknowledge the transactions for their inclusion to block ack_transactions(verified_block.reference()); - info!("Created block {:?}", verified_block); + info!("Created block {verified_block:?} for round {clock_round}"); self.context .metrics @@ -542,6 +580,15 @@ impl Core { { self.leader_schedule .update_leader_schedule_v2(&self.dag_state); + + let propagation_scores = self + .leader_schedule + .leader_swap_table + .read() + .reputation_scores + .clone(); + self.ancestor_state_manager + .set_propagation_scores(propagation_scores); } else { self.leader_schedule .update_leader_schedule_v1(&self.dag_state); @@ -633,13 +680,16 @@ impl Core { self.subscriber_exists = exists; } - /// Sets the delay by round for propagating blocks to a quorum. - // TODO: Will set the quorum round per authority in ancestor state manager. + /// Sets the delay by round for propagating blocks to a quorum and the + /// quorum round per authority for ancestor state manager. 
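+    /// Each entry in `quorum_rounds` is the (low, high) quorum round computed by the round prober for the corresponding authority.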
pub(crate) fn set_propagation_delay_and_quorum_rounds( &mut self, delay: Round, - _quorum_rounds: Vec, + quorum_rounds: Vec, ) { + info!("Quorum round per authority in ancestor state manager set to: {quorum_rounds:?}"); + self.ancestor_state_manager + .set_quorum_round_per_authority(quorum_rounds); info!("Propagation round delay set to: {delay}"); self.propagation_delay = delay; } @@ -747,11 +797,6 @@ impl Core { ) .collect::>(); - // Update the last included ancestor block refs - for ancestor in &ancestors { - self.last_included_ancestors[ancestor.author()] = Some(ancestor.reference()); - } - // TODO: this is for temporary sanity check - we might want to remove later on let mut quorum = StakeAggregator::::new(); for ancestor in ancestors @@ -765,6 +810,142 @@ impl Core { ancestors } + /// Retrieves the next ancestors to propose to form a block at `clock_round` round. + /// If smart selection is enabled then this will try to select the best ancestors + /// based on the propagation scores of the authorities. + fn smart_ancestors_to_propose( + &mut self, + clock_round: Round, + smart_select: bool, + ) -> Vec { + let _s = self + .context + .metrics + .node_metrics + .scope_processing_time + .with_label_values(&["Core::smart_ancestors_to_propose"]) + .start_timer(); + + // Now take the ancestors before the clock_round (excluded) for each authority. + let ancestors = self + .dag_state + .read() + .get_last_cached_block_per_authority(clock_round); + + assert_eq!( + ancestors.len(), + self.context.committee.size(), + "Fatal error, number of returned ancestors don't match committee size." + ); + + // Ensure ancestor state is up to date before selecting for proposal. + self.ancestor_state_manager.update_all_ancestors_state(); + let ancestor_state_map = self.ancestor_state_manager.get_ancestor_states(); + + let quorum_round = clock_round.saturating_sub(1); + + let mut temp_excluded_ancestors = Vec::new(); + + // Propose only ancestors of higher rounds than what has already been proposed. + // And always include own last proposed block first among ancestors. + // Start by only including the high scoring ancestors. Low scoring ancestors + // will be included in a second pass below. 
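+        // In the second pass, ancestors in the EXCLUDE state that are at the parent round are added back, highest score first, only until the parent round reaches a quorum of stake; any remaining excluded ancestors are left out of the proposal.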
+ let included_ancestors = iter::once(self.last_proposed_block().clone()) + .chain( + ancestors + .into_iter() + .filter(|ancestor| ancestor.author() != self.context.own_index) + .flat_map(|ancestor| { + + let ancestor_state = ancestor_state_map[ancestor.author()]; + + match ancestor_state { + AncestorState::Include => { + trace!("Found ancestor {ancestor} with INCLUDE state for round {clock_round}"); + } + AncestorState::Exclude(score) => { + trace!("Added ancestor {ancestor} with EXCLUDE state with score {score} to temporary excluded ancestors for round {clock_round}"); + temp_excluded_ancestors.push((score, ancestor)); + return None; + } + } + + if let Some(last_block_ref) = + self.last_included_ancestors[ancestor.author()] + { + return (last_block_ref.round < ancestor.round()).then_some(ancestor); + } + Some(ancestor) + }), + ) + .collect::>(); + + let mut parent_round_quorum = StakeAggregator::::new(); + + // Check total stake of high scoring parent round ancestors + for ancestor in included_ancestors + .iter() + .filter(|a| a.round() == quorum_round) + { + parent_round_quorum.add(ancestor.author(), &self.context.committee); + } + + if smart_select && !parent_round_quorum.reached_threshold(&self.context.committee) { + self.context.metrics.node_metrics.smart_selection_wait.inc(); + debug!("Only found {} stake of good ancestors to include for round {clock_round}, will wait for more.", parent_round_quorum.stake()); + return vec![]; + } + + // Sort scores descending so we can include the best of the temp excluded + // ancestors first until we reach the threshold. + temp_excluded_ancestors.sort_by(|a, b| b.0.cmp(&a.0)); + + let mut ancestors_to_propose = included_ancestors; + let mut excluded_ancestors = Vec::new(); + + for (score, ancestor) in temp_excluded_ancestors.into_iter() { + let block_hostname = &self.context.committee.authority(ancestor.author()).hostname; + if !parent_round_quorum.reached_threshold(&self.context.committee) + && ancestor.round() == quorum_round + { + debug!("Including temporarily excluded strong link ancestor {ancestor} with score {score} to propose for round {clock_round}"); + parent_round_quorum.add(ancestor.author(), &self.context.committee); + ancestors_to_propose.push(ancestor); + self.context + .metrics + .node_metrics + .included_excluded_proposal_ancestors_count_by_authority + .with_label_values(&[block_hostname, "strong"]) + .inc(); + } else { + excluded_ancestors.push((score, ancestor)); + } + } + + assert!(parent_round_quorum.reached_threshold(&self.context.committee), "Fatal error, quorum not reached for parent round when proposing for round {clock_round}. Possible mismatch between DagState and Core."); + + for (score, ancestor) in excluded_ancestors.iter() { + let excluded_author = ancestor.author(); + let block_hostname = &self.context.committee.authority(excluded_author).hostname; + + trace!("Excluded low score ancestor {ancestor} with score {score} to propose for round {clock_round}"); + self.context + .metrics + .node_metrics + .excluded_proposal_ancestors_count_by_authority + .with_label_values(&[block_hostname]) + .inc(); + } + + info!( + "Included {} ancestors & excluded {} ancestors for proposal in round {clock_round}", + ancestors_to_propose.len(), + excluded_ancestors.len() + ); + + ancestors_to_propose + } + /// Checks whether all the leaders of the round exist. 
/// TODO: we can leverage some additional signal here in order to more cleverly manipulate later the leader timeout /// Ex if we already have one leader - the first in order - we might don't want to wait as much. @@ -1610,6 +1791,269 @@ mod test { } } + #[tokio::test(flavor = "current_thread", start_paused = true)] + async fn test_core_try_new_block_with_leader_timeout_and_low_scoring_authority() { + telemetry_subscribers::init_for_testing(); + + // Since we run the test with started_paused = true, any time-dependent operations using Tokio's time + // facilities, such as tokio::time::sleep or tokio::time::Instant, will not advance. So practically each + // Core's clock will have initialised potentially with different values but it never advances. + // To ensure that blocks won't get rejected by cores we'll need to manually wait for the time + // diff before processing them. By calling the `tokio::time::sleep` we implicitly also advance the + // tokio clock. + async fn wait_blocks(blocks: &[VerifiedBlock], context: &Context) { + // Simulate the time wait before processing a block to ensure that block.timestamp <= now + let now = context.clock.timestamp_utc_ms(); + let max_timestamp = blocks + .iter() + .max_by_key(|block| block.timestamp_ms() as BlockTimestampMs) + .map(|block| block.timestamp_ms()) + .unwrap_or(0); + + let wait_time = Duration::from_millis(max_timestamp.saturating_sub(now)); + sleep(wait_time).await; + } + + let (context, _) = Context::new_for_test(4); + + // Create the cores for all authorities + let mut all_cores = create_cores(context, vec![1, 1, 1, 1]); + let (_last_core, cores) = all_cores.split_last_mut().unwrap(); + + // Create blocks for rounds 1..=30 from all Cores except last Core of authority 3. + let mut last_round_blocks = Vec::::new(); + for round in 1..=30 { + let mut this_round_blocks = Vec::new(); + + for core_fixture in cores.iter_mut() { + wait_blocks(&last_round_blocks, &core_fixture.core.context).await; + + core_fixture + .core + .add_blocks(last_round_blocks.clone()) + .unwrap(); + + // Only when round > 1 and using non-genesis parents. + if let Some(r) = last_round_blocks.first().map(|b| b.round()) { + assert_eq!(round - 1, r); + if core_fixture.core.last_proposed_round() == r { + // Force propose new block regardless of min round delay. + core_fixture + .core + .try_propose(true) + .unwrap() + .unwrap_or_else(|| { + panic!("Block should have been proposed for round {}", round) + }); + } + } + + assert_eq!(core_fixture.core.last_proposed_round(), round); + + this_round_blocks.push(core_fixture.core.last_proposed_block().clone()); + } + + last_round_blocks = this_round_blocks; + } + + // Now produce blocks for all Cores + for round in 31..=40 { + let mut this_round_blocks = Vec::new(); + + for core_fixture in all_cores.iter_mut() { + wait_blocks(&last_round_blocks, &core_fixture.core.context).await; + + core_fixture + .core + .add_blocks(last_round_blocks.clone()) + .unwrap(); + + // Only when round > 1 and using non-genesis parents. + if let Some(r) = last_round_blocks.first().map(|b| b.round()) { + assert_eq!(round - 1, r); + if core_fixture.core.last_proposed_round() == r { + // Force propose new block regardless of min round delay. 
+ core_fixture + .core + .try_propose(true) + .unwrap() + .unwrap_or_else(|| { + panic!("Block should have been proposed for round {}", round) + }); + } + } + + this_round_blocks.push(core_fixture.core.last_proposed_block().clone()); + + for block in this_round_blocks.iter() { + if block.author() != AuthorityIndex::new_for_test(3) { + // Assert blocks created include only 3 ancestors per block as one + // should be excluded + assert_eq!(block.ancestors().len(), 3); + } else { + // Authority 3 is the low scoring authority so it will still include + // its own blocks. + assert_eq!(block.ancestors().len(), 4); + } + } + } + + last_round_blocks = this_round_blocks; + } + } + + #[tokio::test] + async fn test_smart_ancestor_selection() { + telemetry_subscribers::init_for_testing(); + let (context, mut key_pairs) = Context::new_for_test(7); + let context = Arc::new(context.with_parameters(Parameters { + sync_last_known_own_block_timeout: Duration::from_millis(2_000), + ..Default::default() + })); + + let store = Arc::new(MemStore::new()); + let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone()))); + + let block_manager = BlockManager::new( + context.clone(), + dag_state.clone(), + Arc::new(NoopBlockVerifier), + ); + let leader_schedule = Arc::new( + LeaderSchedule::from_store(context.clone(), dag_state.clone()) + .with_num_commits_per_schedule(10), + ); + + let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone()); + let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone()); + let (signals, signal_receivers) = CoreSignals::new(context.clone()); + // Need at least one subscriber to the block broadcast channel. + let _block_receiver = signal_receivers.block_broadcast_receiver(); + + let (commit_consumer, _commit_receiver, _transaction_receiver) = CommitConsumer::new(0); + let commit_observer = CommitObserver::new( + context.clone(), + commit_consumer, + dag_state.clone(), + store.clone(), + leader_schedule.clone(), + ); + + let mut core = Core::new( + context.clone(), + leader_schedule, + transaction_consumer, + block_manager, + true, + commit_observer, + signals, + key_pairs.remove(context.own_index.value()).1, + dag_state.clone(), + true, + ); + + // No new block should have been produced + assert_eq!( + core.last_proposed_round(), + GENESIS_ROUND, + "No block should have been created other than genesis" + ); + + // Trying to explicitly propose a block will not produce anything + assert!(core.try_propose(true).unwrap().is_none()); + + // Create blocks for the whole network but not for authority 1 + let mut builder = DagBuilder::new(context.clone()); + builder + .layers(1..=12) + .authorities(vec![AuthorityIndex::new_for_test(1)]) + .skip_block() + .build(); + let blocks = builder.blocks(1..=12); + // Process all the blocks + assert!(core.add_blocks(blocks).unwrap().is_empty()); + core.set_last_known_proposed_round(12); + + let block = core.try_propose(true).expect("No error").unwrap(); + assert_eq!(block.round(), 13); + assert_eq!(block.ancestors().len(), 7); + + // Build blocks for rest of the network other than own index + builder + .layers(13..=14) + .authorities(vec![AuthorityIndex::new_for_test(0)]) + .skip_block() + .build(); + let blocks = builder.blocks(13..=14); + assert!(core.add_blocks(blocks).unwrap().is_empty()); + + // We now have triggered a leader schedule change so we should have + // one EXCLUDE ancestor when we go to select ancestors for the next proposal + let block = core.try_propose(true).expect("No 
error").unwrap(); + assert_eq!(block.round(), 15); + assert_eq!(block.ancestors().len(), 6); + + // Build blocks for a quorum of the network including the EXCLUDE ancestor + // which will trigger smart select and we will not propose a block + builder + .layer(15) + .authorities(vec![ + AuthorityIndex::new_for_test(0), + AuthorityIndex::new_for_test(5), + AuthorityIndex::new_for_test(6), + ]) + .skip_block() + .build(); + let blocks = builder.blocks(15..=15); + // Wait for min round delay to allow blocks to be proposed. + sleep(context.parameters.min_round_delay).await; + // Smart select should be triggered and no block should be proposed. + assert!(core.add_blocks(blocks).unwrap().is_empty()); + assert_eq!(core.last_proposed_block().round(), 15); + + builder + .layer(15) + .authorities(vec![ + AuthorityIndex::new_for_test(0), + AuthorityIndex::new_for_test(1), + AuthorityIndex::new_for_test(2), + AuthorityIndex::new_for_test(3), + AuthorityIndex::new_for_test(4), + ]) + .skip_block() + .build(); + let blocks = builder.blocks(15..=15); + // Have enough ancestor blocks to propose now. + assert!(core.add_blocks(blocks).unwrap().is_empty()); + assert_eq!(core.last_proposed_block().round(), 16); + + // Build blocks for a quorum of the network including the EXCLUDE ancestor + // which will trigger smart select and we will not propose a block. + // This time we will force propose by hitting the leader timeout after which + // should cause us to include this EXCLUDE ancestor. + builder + .layer(16) + .authorities(vec![ + AuthorityIndex::new_for_test(0), + AuthorityIndex::new_for_test(5), + AuthorityIndex::new_for_test(6), + ]) + .skip_block() + .build(); + let blocks = builder.blocks(16..=16); + // Wait for leader timeout to force blocks to be proposed. + sleep(context.parameters.min_round_delay).await; + // Smart select should be triggered and no block should be proposed. + assert!(core.add_blocks(blocks).unwrap().is_empty()); + assert_eq!(core.last_proposed_block().round(), 16); + + // Simulate a leader timeout and a force proposal where we will include + // one EXCLUDE ancestor when we go to select ancestors for the next proposal + let block = core.try_propose(true).expect("No error").unwrap(); + assert_eq!(block.round(), 17); + assert_eq!(block.ancestors().len(), 5); + } + #[tokio::test] async fn test_core_set_subscriber_exists() { telemetry_subscribers::init_for_testing(); @@ -1675,7 +2119,7 @@ mod test { } #[tokio::test] - async fn test_core_set_propagation_delay() { + async fn test_core_set_propagation_delay_per_authority() { // TODO: create helper to avoid the duplicated code here. 
telemetry_subscribers::init_for_testing(); let (context, mut key_pairs) = Context::new_for_test(4); diff --git a/consensus/core/src/dag_state.rs b/consensus/core/src/dag_state.rs index 83aada9cb05a7..1fc9a30a872e3 100644 --- a/consensus/core/src/dag_state.rs +++ b/consensus/core/src/dag_state.rs @@ -920,7 +920,7 @@ impl DagState { } pub(crate) fn calculate_scoring_subdag_scores(&self) -> ReputationScores { - self.scoring_subdag.calculate_scores() + self.scoring_subdag.calculate_distributed_vote_scores() } pub(crate) fn scoring_subdag_commit_range(&self) -> CommitIndex { diff --git a/consensus/core/src/leader_scoring.rs b/consensus/core/src/leader_scoring.rs index 7136daf7c5e30..51ab0a5e4aa2e 100644 --- a/consensus/core/src/leader_scoring.rs +++ b/consensus/core/src/leader_scoring.rs @@ -129,6 +129,10 @@ impl ReputationScores { } } + pub(crate) fn highest_score(&self) -> u64 { + *self.scores_per_authority.iter().max().unwrap_or(&0) + } + // Returns the authorities index with score tuples. pub(crate) fn authorities_by_score(&self, context: Arc) -> Vec<(AuthorityIndex, u64)> { self.scores_per_authority @@ -258,17 +262,9 @@ impl ScoringSubdag { } // Iterate through votes and calculate scores for each authority based on - // scoring strategy that is used. (Vote or CertifiedVote) - pub(crate) fn calculate_scores(&self) -> ReputationScores { - let _s = self - .context - .metrics - .node_metrics - .scope_processing_time - .with_label_values(&["ScoringSubdag::calculate_scores"]) - .start_timer(); - - let scores_per_authority = self.score_distributed_votes(); + // distributed vote scoring strategy. + pub(crate) fn calculate_distributed_vote_scores(&self) -> ReputationScores { + let scores_per_authority = self.distributed_votes_scores(); // TODO: Normalize scores ReputationScores::new( @@ -283,7 +279,15 @@ impl ScoringSubdag { /// Instead of only giving one point for each vote that is included in 2f+1 /// blocks. We give a score equal to the amount of stake of all blocks that /// included the vote. - fn score_distributed_votes(&self) -> Vec { + fn distributed_votes_scores(&self) -> Vec { + let _s = self + .context + .metrics + .node_metrics + .scope_processing_time + .with_label_values(&["ScoringSubdag::score_distributed_votes"]) + .start_timer(); + let num_authorities = self.context.committee.size(); let mut scores_per_authority = vec![0_u64; num_authorities]; @@ -299,29 +303,6 @@ impl ScoringSubdag { scores_per_authority } - /// This scoring strategy gives points equal to the amount of stake in blocks - /// that include the authority's vote, if that amount of total_stake > 2f+1. - /// We consider this a certified vote. 
- // TODO: This will be used for ancestor selection - #[allow(unused)] - fn score_certified_votes(&self) -> Vec { - let num_authorities = self.context.committee.size(); - let mut scores_per_authority = vec![0_u64; num_authorities]; - - for (vote, stake_agg) in self.votes.iter() { - let authority = vote.author; - if stake_agg.reached_threshold(&self.context.committee) { - let stake = stake_agg.stake(); - tracing::trace!( - "[{}] scores +{stake} reputation for {authority}!", - self.context.own_index, - ); - scores_per_authority[authority.value()] += stake; - } - } - scores_per_authority - } - pub(crate) fn scored_subdags_count(&self) -> usize { if let Some(commit_range) = &self.commit_range { commit_range.size() @@ -555,41 +536,11 @@ mod tests { scoring_subdag.add_subdags(vec![sub_dag]); } - let scores = scoring_subdag.calculate_scores(); + let scores = scoring_subdag.calculate_distributed_vote_scores(); assert_eq!(scores.scores_per_authority, vec![5, 5, 5, 5]); assert_eq!(scores.commit_range, (1..=4).into()); } - #[tokio::test] - async fn test_certified_vote_scoring_subdag() { - telemetry_subscribers::init_for_testing(); - let context = Arc::new(Context::new_for_test(4).0); - - // Populate fully connected test blocks for round 0 ~ 3, authorities 0 ~ 3. - let mut dag_builder = DagBuilder::new(context.clone()); - dag_builder.layers(1..=3).build(); - // Build round 4 but with just the leader block - dag_builder - .layer(4) - .authorities(vec![ - AuthorityIndex::new_for_test(1), - AuthorityIndex::new_for_test(2), - AuthorityIndex::new_for_test(3), - ]) - .skip_block() - .build(); - - let mut scoring_subdag = ScoringSubdag::new(context.clone()); - - for (sub_dag, _commit) in dag_builder.get_sub_dag_and_commits(1..=4) { - scoring_subdag.add_subdags(vec![sub_dag]); - } - - let scores_per_authority = scoring_subdag.score_certified_votes(); - assert_eq!(scores_per_authority, vec![4, 4, 4, 4]); - assert_eq!(scoring_subdag.commit_range.unwrap(), (1..=4).into()); - } - // TODO: Remove all tests below this when DistributedVoteScoring is enabled. #[tokio::test] async fn test_reputation_score_calculator() { diff --git a/consensus/core/src/lib.rs b/consensus/core/src/lib.rs index 36b980311f5d7..7cee617d64c35 100644 --- a/consensus/core/src/lib.rs +++ b/consensus/core/src/lib.rs @@ -1,6 +1,7 @@ // Copyright (c) Mysten Labs, Inc. 
// SPDX-License-Identifier: Apache-2.0 +mod ancestor; mod authority_node; mod authority_service; mod base_committer; diff --git a/consensus/core/src/metrics.rs b/consensus/core/src/metrics.rs index 11d303f3cf65a..da29cfae33201 100644 --- a/consensus/core/src/metrics.rs +++ b/consensus/core/src/metrics.rs @@ -141,6 +141,10 @@ pub(crate) struct NodeMetrics { pub(crate) commit_round_advancement_interval: Histogram, pub(crate) last_decided_leader_round: IntGauge, pub(crate) leader_timeout_total: IntCounterVec, + pub(crate) smart_selection_wait: IntCounter, + pub(crate) ancestor_state_change_by_authority: IntCounterVec, + pub(crate) excluded_proposal_ancestors_count_by_authority: IntCounterVec, + pub(crate) included_excluded_proposal_ancestors_count_by_authority: IntCounterVec, pub(crate) missing_blocks_total: IntCounter, pub(crate) missing_blocks_after_fetch_total: IntCounter, pub(crate) num_of_bad_nodes: IntGauge, @@ -441,6 +445,29 @@ impl NodeMetrics { &["timeout_type"], registry, ).unwrap(), + smart_selection_wait: register_int_counter_with_registry!( + "smart_selection_wait", + "Number of times we waited for smart ancestor selection.", + registry, + ).unwrap(), + ancestor_state_change_by_authority: register_int_counter_vec_with_registry!( + "ancestor_state_change_by_authority", + "The total number of times an ancestor state changed to EXCLUDE or INCLUDE.", + &["authority", "state"], + registry, + ).unwrap(), + excluded_proposal_ancestors_count_by_authority: register_int_counter_vec_with_registry!( + "excluded_proposal_ancestors_count_by_authority", + "Total number of excluded ancestors per authority during proposal.", + &["authority"], + registry, + ).unwrap(), + included_excluded_proposal_ancestors_count_by_authority: register_int_counter_vec_with_registry!( + "included_excluded_proposal_ancestors_count_by_authority", + "Total number of ancestors per authority with 'excluded' status that got included in proposal. 
Either weak or strong type.", + &["authority", "type"], + registry, + ).unwrap(), missing_blocks_total: register_int_counter_with_registry!( "missing_blocks_total", "Total cumulative number of missing blocks", diff --git a/consensus/core/src/round_prober.rs b/consensus/core/src/round_prober.rs index a80ffe148f1f9..3ce8b04dc8ede 100644 --- a/consensus/core/src/round_prober.rs +++ b/consensus/core/src/round_prober.rs @@ -238,7 +238,7 @@ impl RoundProber { .set_propagation_delay_and_quorum_rounds(propagation_delay, quorum_rounds.clone()) { tracing::warn!( - "Failed to set propagation delay {propagation_delay} on Core: {:?}", + "Failed to set propagation delay and quorum rounds {quorum_rounds:?} on Core: {:?}", e ); } diff --git a/crates/sui-open-rpc/spec/openrpc.json b/crates/sui-open-rpc/spec/openrpc.json index f389b2fc27312..f33a0c653b67b 100644 --- a/crates/sui-open-rpc/spec/openrpc.json +++ b/crates/sui-open-rpc/spec/openrpc.json @@ -1307,6 +1307,7 @@ "consensus_distributed_vote_scoring_strategy": false, "consensus_order_end_of_epoch_last": true, "consensus_round_prober": false, + "consensus_smart_ancestor_selection": false, "disable_invariant_violation_check_in_swap_loc": false, "disallow_adding_abilities_on_upgrade": false, "disallow_change_struct_type_params_on_upgrade": false, diff --git a/crates/sui-protocol-config/src/lib.rs b/crates/sui-protocol-config/src/lib.rs index cbaef85171d26..0a994958dfd95 100644 --- a/crates/sui-protocol-config/src/lib.rs +++ b/crates/sui-protocol-config/src/lib.rs @@ -196,6 +196,7 @@ const MAX_PROTOCOL_VERSION: u64 = 69; // Further reduce minimum number of random beacon shares. // Disallow adding new modules in `deps-only` packages. // Version 69: Sets number of rounds allowed for fastpath voting in consensus. +// Enable smart ancestor selection in devnet. #[derive(Copy, Clone, Debug, Hash, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] pub struct ProtocolVersion(u64); @@ -565,6 +566,10 @@ struct FeatureFlags { #[serde(skip_serializing_if = "is_false")] disallow_new_modules_in_deps_only_packages: bool, + + // Use smart ancestor selection in consensus. + #[serde(skip_serializing_if = "is_false")] + consensus_smart_ancestor_selection: bool, } fn is_false(b: &bool) -> bool { @@ -1675,6 +1680,10 @@ impl ProtocolConfig { self.feature_flags .disallow_new_modules_in_deps_only_packages } + + pub fn consensus_smart_ancestor_selection(&self) -> bool { + self.feature_flags.consensus_smart_ancestor_selection + } } #[cfg(not(msim))] @@ -2944,6 +2953,11 @@ impl ProtocolConfig { 69 => { // Sets number of rounds allowed for fastpath voting in consensus. 
cfg.consensus_voting_rounds = Some(40); + + if chain != Chain::Mainnet && chain != Chain::Testnet { + // Enable smart ancestor selection for devnet + cfg.feature_flags.consensus_smart_ancestor_selection = true; + } } // Use this template when making changes: // diff --git a/crates/sui-protocol-config/src/snapshots/sui_protocol_config__test__version_69.snap b/crates/sui-protocol-config/src/snapshots/sui_protocol_config__test__version_69.snap index 2c42d9988aad7..ccca747ea3a97 100644 --- a/crates/sui-protocol-config/src/snapshots/sui_protocol_config__test__version_69.snap +++ b/crates/sui-protocol-config/src/snapshots/sui_protocol_config__test__version_69.snap @@ -72,6 +72,7 @@ feature_flags: relocate_event_module: true uncompressed_g1_group_elements: true disallow_new_modules_in_deps_only_packages: true + consensus_smart_ancestor_selection: true max_tx_size_bytes: 131072 max_input_objects: 2048 max_size_written_objects: 5000000