From 4b4e533f03530cc8eb9af1ed004aecfe95756e28 Mon Sep 17 00:00:00 2001 From: Arun Koshy <97870774+arun-koshy@users.noreply.github.com> Date: Sat, 18 Jan 2025 16:53:02 -0800 Subject: [PATCH] [CP] [consensus] Send excluded ancestors with new block messages (#20896) (#20924) In an effort to continue using smart ancestor selection we have to ensure that we are not sacrificing on block propagation. This PR adds excluded ancestors as part of the message sent when a new block is created which can then be optimistically fetched by peers if they don't have these block refs. --- Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required. For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates. - [ ] Protocol: - [ ] Nodes (Validators and Full nodes): - [ ] gRPC: - [ ] JSON-RPC: - [ ] GraphQL: - [ ] CLI: - [ ] Rust SDK: Co-authored-by: Mingwei Tian --- consensus/core/src/authority_service.rs | 112 ++++++-- consensus/core/src/block.rs | 14 +- consensus/core/src/block_manager.rs | 133 ++++++++++ consensus/core/src/broadcaster.rs | 16 +- consensus/core/src/core.rs | 267 ++++++++++++-------- consensus/core/src/core_thread.rs | 34 +++ consensus/core/src/dag_state.rs | 59 +++-- consensus/core/src/leader_timeout.rs | 7 + consensus/core/src/metrics.rs | 28 ++ consensus/core/src/network/anemo_network.rs | 6 +- consensus/core/src/network/mod.rs | 41 ++- consensus/core/src/network/network_tests.rs | 19 +- consensus/core/src/network/test_network.rs | 14 +- consensus/core/src/network/tonic_network.rs | 21 +- consensus/core/src/round_prober.rs | 9 +- consensus/core/src/subscriber.rs | 19 +- consensus/core/src/synchronizer.rs | 2 +- 17 files changed, 638 insertions(+), 163 deletions(-) diff --git a/consensus/core/src/authority_service.rs b/consensus/core/src/authority_service.rs index b22331d6d07cc..98893857eb177 100644 --- a/consensus/core/src/authority_service.rs +++ b/consensus/core/src/authority_service.rs @@ -14,7 +14,7 @@ use tokio_util::sync::ReusableBoxFuture; use tracing::{debug, info, warn}; use crate::{ - block::{BlockAPI as _, BlockRef, SignedBlock, VerifiedBlock, GENESIS_ROUND}, + block::{BlockAPI as _, BlockRef, ExtendedBlock, SignedBlock, VerifiedBlock, GENESIS_ROUND}, block_verifier::BlockVerifier, commit::{CommitAPI as _, CommitRange, TrustedCommit}, commit_vote_monitor::CommitVoteMonitor, @@ -22,7 +22,7 @@ use crate::{ core_thread::CoreThreadDispatcher, dag_state::DagState, error::{ConsensusError, ConsensusResult}, - network::{BlockStream, NetworkService}, + network::{BlockStream, ExtendedSerializedBlock, NetworkService}, stake_aggregator::{QuorumThreshold, StakeAggregator}, storage::Store, synchronizer::SynchronizerHandle, @@ -38,7 +38,7 @@ pub(crate) struct AuthorityService { block_verifier: Arc, synchronizer: Arc, core_dispatcher: Arc, - rx_block_broadcaster: broadcast::Receiver, + rx_block_broadcaster: broadcast::Receiver, subscription_counter: Arc, dag_state: Arc>, store: Arc, @@ -51,7 +51,7 @@ impl AuthorityService { commit_vote_monitor: Arc, synchronizer: Arc, core_dispatcher: Arc, - rx_block_broadcaster: broadcast::Receiver, + rx_block_broadcaster: broadcast::Receiver, dag_state: Arc>, store: Arc, ) -> Self { @@ -78,7 +78,7 @@ impl NetworkService for AuthorityService { async fn handle_send_block( &self, peer: AuthorityIndex, - serialized_block: Bytes, + serialized_block: ExtendedSerializedBlock, ) -> 
ConsensusResult<()> { fail_point_async!("consensus-rpc-response"); @@ -86,7 +86,7 @@ impl NetworkService for AuthorityService { // TODO: dedup block verifications, here and with fetched blocks. let signed_block: SignedBlock = - bcs::from_bytes(&serialized_block).map_err(ConsensusError::MalformedBlock)?; + bcs::from_bytes(&serialized_block.block).map_err(ConsensusError::MalformedBlock)?; // Reject blocks not produced by the peer. if peer != signed_block.author() { @@ -113,7 +113,7 @@ impl NetworkService for AuthorityService { info!("Invalid block from {}: {}", peer, e); return Err(e); } - let verified_block = VerifiedBlock::new_verified(signed_block, serialized_block); + let verified_block = VerifiedBlock::new_verified(signed_block, serialized_block.block); let block_ref = verified_block.reference(); debug!("Received block {} via send block.", block_ref); @@ -225,6 +225,75 @@ impl NetworkService for AuthorityService { } } + // ------------ After processing the block, process the excluded ancestors ------------ + + let mut excluded_ancestors = serialized_block + .excluded_ancestors + .into_iter() + .map(|serialized| bcs::from_bytes::(&serialized)) + .collect::, bcs::Error>>() + .map_err(ConsensusError::MalformedBlock)?; + + let excluded_ancestors_limit = self.context.committee.size() * 2; + if excluded_ancestors.len() > excluded_ancestors_limit { + debug!( + "Dropping {} excluded ancestor(s) from {} {} due to size limit", + excluded_ancestors.len() - excluded_ancestors_limit, + peer, + peer_hostname, + ); + excluded_ancestors.truncate(excluded_ancestors_limit); + } + + self.context + .metrics + .node_metrics + .network_received_excluded_ancestors_from_authority + .with_label_values(&[peer_hostname]) + .inc_by(excluded_ancestors.len() as u64); + + for excluded_ancestor in &excluded_ancestors { + let excluded_ancestor_hostname = &self + .context + .committee + .authority(excluded_ancestor.author) + .hostname; + self.context + .metrics + .node_metrics + .network_excluded_ancestors_count_by_authority + .with_label_values(&[excluded_ancestor_hostname]) + .inc(); + } + + let missing_excluded_ancestors = self + .core_dispatcher + .check_block_refs(excluded_ancestors) + .await + .map_err(|_| ConsensusError::Shutdown)?; + + if !missing_excluded_ancestors.is_empty() { + self.context + .metrics + .node_metrics + .network_excluded_ancestors_sent_to_fetch + .with_label_values(&[peer_hostname]) + .inc_by(missing_excluded_ancestors.len() as u64); + + let synchronizer = self.synchronizer.clone(); + tokio::spawn(async move { + // schedule the fetching of them from this peer in the background + if let Err(err) = synchronizer + .fetch_blocks(missing_excluded_ancestors, peer) + .await + { + warn!( + "Errored while trying to fetch missing excluded ancestors via synchronizer: {err}" + ); + } + }); + } + Ok(()) } @@ -243,7 +312,10 @@ impl NetworkService for AuthorityService { dag_state .get_cached_blocks(self.context.own_index, last_received + 1) .into_iter() - .map(|block| block.serialized().clone()), + .map(|block| ExtendedSerializedBlock { + block: block.serialized().clone(), + excluded_ancestors: vec![], + }), ); let broadcasted_blocks = BroadcastedBlockStream::new( @@ -254,7 +326,7 @@ impl NetworkService for AuthorityService { // Return a stream of blocks that first yields missed blocks as requested, then new blocks. 
Ok(Box::pin(missed_blocks.chain( - broadcasted_blocks.map(|block| block.serialized().clone()), + broadcasted_blocks.map(ExtendedSerializedBlock::from), ))) } @@ -423,7 +495,7 @@ impl NetworkService for AuthorityService { .get_last_cached_block_per_authority(Round::MAX); let highest_accepted_rounds = blocks .into_iter() - .map(|block| block.round()) + .map(|(block, _)| block.round()) .collect::>(); // Own blocks do not go through the core dispatcher, so they need to be set separately. @@ -516,7 +588,7 @@ impl SubscriptionCounter { /// Each broadcasted block stream wraps a broadcast receiver for blocks. /// It yields blocks that are broadcasted after the stream is created. -type BroadcastedBlockStream = BroadcastStream; +type BroadcastedBlockStream = BroadcastStream; /// Adapted from `tokio_stream::wrappers::BroadcastStream`. The main difference is that /// this tolerates lags with only logging, without yielding errors. @@ -612,15 +684,14 @@ async fn make_recv_future( mod tests { use crate::{ authority_service::AuthorityService, - block::BlockAPI, - block::{BlockRef, SignedBlock, TestBlock, VerifiedBlock}, + block::{BlockAPI, BlockRef, SignedBlock, TestBlock, VerifiedBlock}, commit::CommitRange, commit_vote_monitor::CommitVoteMonitor, context::Context, core_thread::{CoreError, CoreThreadDispatcher}, dag_state::DagState, error::ConsensusResult, - network::{BlockStream, NetworkClient, NetworkService}, + network::{BlockStream, ExtendedSerializedBlock, NetworkClient, NetworkService}, round_prober::QuorumRound, storage::mem_store::MemStore, synchronizer::Synchronizer, @@ -664,6 +735,13 @@ mod tests { Ok(block_refs) } + async fn check_block_refs( + &self, + _block_refs: Vec, + ) -> Result, CoreError> { + Ok(BTreeSet::new()) + } + async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> { Ok(()) } @@ -797,7 +875,11 @@ mod tests { ); let service = authority_service.clone(); - let serialized = input_block.serialized().clone(); + let serialized = ExtendedSerializedBlock { + block: input_block.serialized().clone(), + excluded_ancestors: vec![], + }; + tokio::spawn(async move { service .handle_send_block(context.committee.to_authority_index(0).unwrap(), serialized) diff --git a/consensus/core/src/block.rs b/consensus/core/src/block.rs index 9327560348b67..1a636fabfc610 100644 --- a/consensus/core/src/block.rs +++ b/consensus/core/src/block.rs @@ -19,7 +19,10 @@ use serde::{Deserialize, Serialize}; use shared_crypto::intent::{Intent, IntentMessage, IntentScope}; use crate::{ - commit::CommitVote, context::Context, ensure, error::ConsensusError, error::ConsensusResult, + commit::CommitVote, + context::Context, + ensure, + error::{ConsensusError, ConsensusResult}, }; /// Round number of a block. @@ -638,6 +641,15 @@ impl fmt::Debug for VerifiedBlock { } } +/// Block with extended additional information, such as +/// local blocks that are excluded from the block's ancestors. +/// The extended information do not need to be certified or forwarded to other authorities. +#[derive(Clone, Debug)] +pub(crate) struct ExtendedBlock { + pub block: VerifiedBlock, + pub excluded_ancestors: Vec, +} + /// Generates the genesis blocks for the current Committee. /// The blocks are returned in authority index order. 
pub(crate) fn genesis_blocks(context: Arc) -> Vec { diff --git a/consensus/core/src/block_manager.rs b/consensus/core/src/block_manager.rs index 4e54065c2b49c..8ca177944d114 100644 --- a/consensus/core/src/block_manager.rs +++ b/consensus/core/src/block_manager.rs @@ -144,6 +144,54 @@ impl BlockManager { (accepted_blocks, missing_blocks) } + /// Tries to find the provided block_refs in DagState and BlockManager, + /// and returns missing block refs. + pub(crate) fn try_find_blocks(&mut self, mut block_refs: Vec) -> BTreeSet { + let _s = monitored_scope("BlockManager::try_find_blocks"); + + block_refs.sort_by_key(|b| b.round); + debug!( + "Trying to find blocks: {}", + block_refs.iter().map(|b| b.to_string()).join(",") + ); + + let mut missing_blocks = BTreeSet::new(); + + for (found, block_ref) in self + .dag_state + .read() + .contains_blocks(block_refs.clone()) + .into_iter() + .zip(block_refs.iter()) + { + if found || self.suspended_blocks.contains_key(block_ref) { + continue; + } + // Fetches the block if it is not in dag state or suspended. + missing_blocks.insert(*block_ref); + if self.missing_blocks.insert(*block_ref) { + let block_ref_hostname = + &self.context.committee.authority(block_ref.author).hostname; + self.context + .metrics + .node_metrics + .block_manager_missing_blocks_by_authority + .with_label_values(&[block_ref_hostname]) + .inc(); + } + } + + let metrics = &self.context.metrics.node_metrics; + metrics + .missing_blocks_total + .inc_by(missing_blocks.len() as u64); + metrics + .block_manager_missing_blocks + .set(self.missing_blocks.len() as i64); + + missing_blocks + } + // TODO: remove once timestamping is refactored to the new approach. // Verifies each block's timestamp based on its ancestors, and persists in store all the valid blocks that should be accepted. Method // returns the accepted and persisted blocks. @@ -314,6 +362,7 @@ impl BlockManager { // Add the ancestor to the missing blocks set only if it doesn't already exist in the suspended blocks - meaning // that we already have its payload. if !self.suspended_blocks.contains_key(ancestor) { + // Fetches the block if it is not in dag state or suspended. ancestors_to_fetch.insert(*ancestor); if self.missing_blocks.insert(*ancestor) { self.context @@ -1067,4 +1116,88 @@ mod tests { // Other blocks should be rejected and there should be no remaining suspended block. 
assert!(block_manager.suspended_blocks().is_empty()); } + + #[tokio::test] + async fn try_find_blocks() { + // GIVEN + let (context, _key_pairs) = Context::new_for_test(4); + let context = Arc::new(context); + let store = Arc::new(MemStore::new()); + let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone()))); + + let mut block_manager = + BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier)); + + // create a DAG + let mut dag_builder = DagBuilder::new(context.clone()); + dag_builder + .layers(1..=2) // 2 rounds + .authorities(vec![ + AuthorityIndex::new_for_test(0), + AuthorityIndex::new_for_test(2), + ]) // Create equivocating blocks for 2 authorities + .equivocate(3) + .build(); + + // Take only the blocks of round 2 and try to accept them + let round_2_blocks = dag_builder + .blocks + .iter() + .filter_map(|(_, block)| (block.round() == 2).then_some(block.clone())) + .collect::>(); + + // All blocks should be missing + let missing_block_refs_from_find = + block_manager.try_find_blocks(round_2_blocks.iter().map(|b| b.reference()).collect()); + assert_eq!(missing_block_refs_from_find.len(), 10); + assert!(missing_block_refs_from_find + .iter() + .all(|block_ref| block_ref.round == 2)); + + // Try accept blocks which will cause blocks to be suspended and added to missing + // in block manager. + let (accepted_blocks, missing) = block_manager.try_accept_blocks(round_2_blocks.clone()); + assert!(accepted_blocks.is_empty()); + + let missing_block_refs = round_2_blocks.first().unwrap().ancestors(); + let missing_block_refs_from_accept = + missing_block_refs.iter().cloned().collect::>(); + assert_eq!(missing, missing_block_refs_from_accept); + assert_eq!( + block_manager.missing_blocks(), + missing_block_refs_from_accept + ); + + // No blocks should be accepted and block manager should have made note + // of the missing & suspended blocks. + // Now we can check get the result of try find block with all of the blocks + // from newly created but not accepted round 3. + dag_builder.layer(3).build(); + + let round_3_blocks = dag_builder + .blocks + .iter() + .filter_map(|(_, block)| (block.round() == 3).then_some(block.reference())) + .collect::>(); + + let missing_block_refs_from_find = block_manager.try_find_blocks( + round_2_blocks + .iter() + .map(|b| b.reference()) + .chain(round_3_blocks.into_iter()) + .collect(), + ); + + assert_eq!(missing_block_refs_from_find.len(), 4); + assert!(missing_block_refs_from_find + .iter() + .all(|block_ref| block_ref.round == 3)); + assert_eq!( + block_manager.missing_blocks(), + missing_block_refs_from_accept + .into_iter() + .chain(missing_block_refs_from_find.into_iter()) + .collect() + ); + } } diff --git a/consensus/core/src/broadcaster.rs b/consensus/core/src/broadcaster.rs index ee3b9b0e1d77c..c8298b482b985 100644 --- a/consensus/core/src/broadcaster.rs +++ b/consensus/core/src/broadcaster.rs @@ -17,7 +17,7 @@ use tokio::{ use tracing::{trace, warn}; use crate::{ - block::{BlockAPI as _, VerifiedBlock}, + block::{BlockAPI as _, ExtendedBlock, VerifiedBlock}, context::Context, core::CoreSignalsReceivers, error::ConsensusResult, @@ -74,7 +74,7 @@ impl Broadcaster { async fn push_blocks( context: Arc, network_client: Arc, - mut rx_block_broadcast: broadcast::Receiver, + mut rx_block_broadcast: broadcast::Receiver, peer: AuthorityIndex, ) { let peer_hostname = &context.committee.authority(peer).hostname; @@ -126,7 +126,8 @@ impl Broadcaster { tokio::select! 
{ result = rx_block_broadcast.recv(), if requests.len() < BROADCAST_CONCURRENCY => { let block = match result { - Ok(block) => block, + // Other info from ExtendedBlock are ignored, because Broadcaster is not used in production. + Ok(block) => block.block, Err(broadcast::error::RecvError::Closed) => { trace!("Sender to {peer} is shutting down!"); return; @@ -196,7 +197,7 @@ mod test { use super::*; use crate::{ - block::{BlockRef, TestBlock}, + block::{BlockRef, ExtendedBlock, TestBlock}, commit::CommitRange, core::CoreSignals, network::BlockStream, @@ -295,7 +296,12 @@ mod test { let block = VerifiedBlock::new_for_test(TestBlock::new(9, 1).build()); assert!( - core_signals.new_block(block.clone()).is_ok(), + core_signals + .new_block(ExtendedBlock { + block: block.clone(), + excluded_ancestors: vec![], + }) + .is_ok(), "No subscriber active to receive the block" ); diff --git a/consensus/core/src/core.rs b/consensus/core/src/core.rs index 7aae08598d537..27faee49f1ff5 100644 --- a/consensus/core/src/core.rs +++ b/consensus/core/src/core.rs @@ -21,8 +21,8 @@ use tracing::{debug, info, trace, warn}; use crate::{ ancestor::{AncestorState, AncestorStateManager}, block::{ - Block, BlockAPI, BlockRef, BlockTimestampMs, BlockV1, Round, SignedBlock, Slot, - VerifiedBlock, GENESIS_ROUND, + Block, BlockAPI, BlockRef, BlockTimestampMs, BlockV1, ExtendedBlock, Round, SignedBlock, + Slot, VerifiedBlock, GENESIS_ROUND, }, block_manager::BlockManager, commit::CommittedSubDag, @@ -197,7 +197,7 @@ impl Core { .get_last_cached_block_per_authority(Round::MAX); let max_ancestor_timestamp = ancestor_blocks .iter() - .fold(0, |ts, b| ts.max(b.timestamp_ms())); + .fold(0, |ts, (b, _)| ts.max(b.timestamp_ms())); let wait_ms = max_ancestor_timestamp.saturating_sub(self.context.clock.timestamp_utc_ms()); if wait_ms > 0 { warn!( @@ -223,7 +223,12 @@ impl Core { } // if no new block proposed then just re-broadcast the last proposed one to ensure liveness. - self.signals.new_block(last_proposed_block.clone()).unwrap(); + self.signals + .new_block(ExtendedBlock { + block: last_proposed_block.clone(), + excluded_ancestors: vec![], + }) + .unwrap(); last_proposed_block }; @@ -237,7 +242,6 @@ impl Core { /// Processes the provided blocks and accepts them if possible when their causal history exists. /// The method returns: - /// - The references of accepted blocks /// - The references of ancestors missing their block pub(crate) fn add_blocks( &mut self, @@ -288,6 +292,39 @@ impl Core { Ok(missing_block_refs) } + /// Checks if provided block refs have been accepted. If not, missing block refs are kept for synchronizations. + /// Returns the references of missing blocks among the input blocks. + pub(crate) fn check_block_refs( + &mut self, + block_refs: Vec, + ) -> ConsensusResult> { + let _scope = monitored_scope("Core::check_block_refs"); + let _s = self + .context + .metrics + .node_metrics + .scope_processing_time + .with_label_values(&["Core::check_block_refs"]) + .start_timer(); + self.context + .metrics + .node_metrics + .core_check_block_refs_batch_size + .observe(block_refs.len() as f64); + + // Try to find them via the block manager + let missing_block_refs = self.block_manager.try_find_blocks(block_refs); + + if !missing_block_refs.is_empty() { + trace!( + "Missing block refs: {}", + missing_block_refs.iter().map(|b| b.to_string()).join(", ") + ); + } + + Ok(missing_block_refs) + } + /// Adds/processed all the newly `accepted_blocks`. 
We basically try to move the threshold clock and add them to the /// pending ancestors list. fn add_accepted_blocks(&mut self, accepted_blocks: Vec) { @@ -336,21 +373,21 @@ impl Core { if !self.should_propose() { return Ok(None); } - if let Some(block) = self.try_new_block(force) { - self.signals.new_block(block.clone())?; + if let Some(extended_block) = self.try_new_block(force) { + self.signals.new_block(extended_block.clone())?; fail_point!("consensus-after-propose"); // The new block may help commit. self.try_commit()?; - return Ok(Some(block)); + return Ok(Some(extended_block.block)); } Ok(None) } /// Attempts to propose a new block for the next round. If a block has already proposed for latest /// or earlier round, then no block is created and None is returned. - fn try_new_block(&mut self, force: bool) -> Option { + fn try_new_block(&mut self, force: bool) -> Option { let _s = self .context .metrics @@ -387,7 +424,7 @@ impl Core { // Determine the ancestors to be included in proposal. // Smart ancestor selection requires distributed scoring to be enabled. - let ancestors = if self + let (ancestors, excluded_ancestors) = if self .context .protocol_config .consensus_distributed_vote_scoring_strategy() @@ -396,7 +433,8 @@ impl Core { .protocol_config .consensus_smart_ancestor_selection() { - let ancestors = self.smart_ancestors_to_propose(clock_round, !force); + let (ancestors, excluded_and_equivocating_ancestors) = + self.smart_ancestors_to_propose(clock_round, !force); // If we did not find enough good ancestors to propose, continue to wait before proposing. if ancestors.is_empty() { @@ -406,9 +444,22 @@ impl Core { ); return None; } - ancestors + + let excluded_ancestors_limit = self.context.committee.size() * 2; + if excluded_and_equivocating_ancestors.len() > excluded_ancestors_limit { + debug!( + "Dropping {} excluded ancestor(s) during proposal due to size limit", + excluded_and_equivocating_ancestors.len() - excluded_ancestors_limit, + ); + } + let excluded_ancestors = excluded_and_equivocating_ancestors + .into_iter() + .take(excluded_ancestors_limit) + .collect(); + + (ancestors, excluded_ancestors) } else { - self.ancestors_to_propose(clock_round) + (self.ancestors_to_propose(clock_round), vec![]) }; // Update the last included ancestor block refs @@ -545,7 +596,10 @@ impl Core { .with_label_values(&[&force.to_string()]) .inc(); - Some(verified_block) + Some(ExtendedBlock { + block: verified_block, + excluded_ancestors, + }) } /// Runs commit rule to attempt to commit additional blocks from the DAG. @@ -804,20 +858,20 @@ impl Core { // Propose only ancestors of higher rounds than what has already been proposed. // And always include own last proposed block first among ancestors. 
- let last_proposed_block = ancestors[self.context.own_index].clone(); + let (last_proposed_block, _) = ancestors[self.context.own_index].clone(); assert_eq!(last_proposed_block.author(), self.context.own_index); let ancestors = iter::once(last_proposed_block) .chain( ancestors .into_iter() - .filter(|block| block.author() != self.context.own_index) - .filter(|block| { + .filter(|(block, _)| block.author() != self.context.own_index) + .filter(|(block, _)| { if gc_enabled && gc_round > GENESIS_ROUND { return block.round() > gc_round; } true }) - .flat_map(|block| { + .flat_map(|(block, _)| { if let Some(last_block_ref) = self.last_included_ancestors[block.author()] { return (last_block_ref.round < block.round()).then_some(block); } @@ -846,7 +900,7 @@ impl Core { &mut self, clock_round: Round, smart_select: bool, - ) -> Vec { + ) -> (Vec, BTreeSet) { let node_metrics = &self.context.metrics.node_metrics; let _s = node_metrics .scope_processing_time @@ -854,13 +908,13 @@ impl Core { .start_timer(); // Now take the ancestors before the clock_round (excluded) for each authority. - let ancestors = self + let all_ancestors = self .dag_state .read() .get_last_cached_block_per_authority(clock_round); assert_eq!( - ancestors.len(), + all_ancestors.len(), self.context.committee.size(), "Fatal error, number of returned ancestors don't match committee size." ); @@ -871,7 +925,8 @@ impl Core { let quorum_round = clock_round.saturating_sub(1); - let mut temp_excluded_ancestors = Vec::new(); + let mut score_and_pending_excluded_ancestors = Vec::new(); + let mut excluded_and_equivocating_ancestors = BTreeSet::new(); // Propose only ancestors of higher rounds than what has already been proposed. // And always include own last proposed block first among ancestors. @@ -879,29 +934,35 @@ impl Core { // will be included in a second pass below. 
let included_ancestors = iter::once(self.last_proposed_block().clone()) .chain( - ancestors + all_ancestors .into_iter() - .filter(|ancestor| ancestor.author() != self.context.own_index) - .flat_map(|ancestor| { + .flat_map(|(ancestor, equivocating_ancestors)| { + if ancestor.author() == self.context.own_index { + return None; + } + if let Some(last_block_ref) = + self.last_included_ancestors[ancestor.author()] + { + if last_block_ref.round >= ancestor.round() { + return None; + } + } - let ancestor_state = ancestor_state_map[ancestor.author()]; + // We will never include equivocating ancestors so add them immediately + excluded_and_equivocating_ancestors.extend(equivocating_ancestors); + let ancestor_state = ancestor_state_map[ancestor.author()]; match ancestor_state { AncestorState::Include => { trace!("Found ancestor {ancestor} with INCLUDE state for round {clock_round}"); } AncestorState::Exclude(score) => { trace!("Added ancestor {ancestor} with EXCLUDE state with score {score} to temporary excluded ancestors for round {clock_round}"); - temp_excluded_ancestors.push((score, ancestor)); + score_and_pending_excluded_ancestors.push((score, ancestor)); return None; } } - if let Some(last_block_ref) = - self.last_included_ancestors[ancestor.author()] - { - return (last_block_ref.round < ancestor.round()).then_some(ancestor); - } Some(ancestor) }), ) @@ -920,22 +981,21 @@ impl Core { if smart_select && !parent_round_quorum.reached_threshold(&self.context.committee) { node_metrics.smart_selection_wait.inc(); debug!("Only found {} stake of good ancestors to include for round {clock_round}, will wait for more.", parent_round_quorum.stake()); - return vec![]; + return (vec![], BTreeSet::new()); } - // Sort scores descending so we can include the best of the temp excluded + // Sort scores descending so we can include the best of the pending excluded // ancestors first until we reach the threshold. - temp_excluded_ancestors.sort_by(|a, b| b.0.cmp(&a.0)); + score_and_pending_excluded_ancestors.sort_by(|a, b| b.0.cmp(&a.0)); let mut ancestors_to_propose = included_ancestors; let mut excluded_ancestors = Vec::new(); - - for (score, ancestor) in temp_excluded_ancestors.into_iter() { + for (score, ancestor) in score_and_pending_excluded_ancestors.into_iter() { let block_hostname = &self.context.committee.authority(ancestor.author()).hostname; if !parent_round_quorum.reached_threshold(&self.context.committee) && ancestor.round() == quorum_round { - debug!("Including temporarily excluded strong link ancestor {ancestor} with score {score} to propose for round {clock_round}"); + debug!("Including temporarily excluded parent round ancestor {ancestor} with score {score} to propose for round {clock_round}"); parent_round_quorum.add(ancestor.author(), &self.context.committee); ancestors_to_propose.push(ancestor); node_metrics @@ -948,8 +1008,8 @@ impl Core { } // Include partially propagated blocks from excluded authorities, to help propagate the blocks - // across the network with less latency impact. - // TODO: use a separate mechanism to propagate excluded ancestor blocks and remove this logic. + // across the network with less latency impact. Other excluded ancestors are not included in the block + // but still broadcasted to peers. 
for (score, ancestor) in excluded_ancestors.iter() { let excluded_author = ancestor.author(); let block_hostname = &self.context.committee.authority(excluded_author).hostname; @@ -966,11 +1026,16 @@ impl Core { let last_included_round = self.last_included_ancestors[excluded_author] .map(|block_ref| block_ref.round) .unwrap_or(GENESIS_ROUND); - if last_included_round >= accepted_low_quorum_round { - trace!( - "Excluded low score ancestor {} with score {score} to propose for round {clock_round}: last included round {} >= accepted low quorum round {}", - ancestor.reference(), last_included_round, accepted_low_quorum_round, - ); + if ancestor.round() <= last_included_round { + // This should have already been filtered out when filtering all_ancestors. + // Still, ensure previously included ancestors are filtered out. + continue; + } + + // This ancestor has not propagated well, so it cannot be included / voted on. + if ancestor.round() > accepted_low_quorum_round { + excluded_and_equivocating_ancestors.insert(ancestor.reference()); + trace!("Excluded low score ancestor {} with score {score} to propose for round {clock_round}: too few validators have seen it", ancestor.reference()); node_metrics .excluded_proposal_ancestors_count_by_authority .with_label_values(&[block_hostname]) @@ -978,25 +1043,8 @@ impl Core { continue; } - // Include the ancestor block as it has been seen & accepted by a strong quorum. - let ancestor = if ancestor.round() == accepted_low_quorum_round { - ancestor.clone() - } else { - // Only cached blocks need to be propagated. Committed and GC'ed blocks do not need to be propagated. - let Some(ancestor) = self.dag_state.read().get_last_cached_block_in_range( - excluded_author, - last_included_round + 1, - accepted_low_quorum_round + 1, - ) else { - trace!("Excluded low score ancestor {} with score {score} to propose for round {clock_round}: no suitable block found", ancestor.reference()); - node_metrics - .excluded_proposal_ancestors_count_by_authority - .with_label_values(&[block_hostname]) - .inc(); - continue; - }; - ancestor - }; + // Otherwise, include the ancestor block as it has been seen & accepted by a strong quorum. + // Only cached blocks need to be propagated. Committed and GC'ed blocks do not need to be propagated. self.last_included_ancestors[excluded_author] = Some(ancestor.reference()); ancestors_to_propose.push(ancestor.clone()); trace!("Included low scoring ancestor {} with score {score} seen at accepted low quorum round {accepted_low_quorum_round} to propose for round {clock_round}", ancestor.reference()); @@ -1009,12 +1057,12 @@ impl Core { assert!(parent_round_quorum.reached_threshold(&self.context.committee), "Fatal error, quorum not reached for parent round when proposing for round {clock_round}. Possible mismatch between DagState and Core."); info!( - "Included {} ancestors & excluded {} ancestors for proposal in round {clock_round}", + "Included {} ancestors & excluded {} low performing or equivocating ancestors for proposal in round {clock_round}", ancestors_to_propose.len(), - excluded_ancestors.len() + excluded_and_equivocating_ancestors.len() ); - ancestors_to_propose + (ancestors_to_propose, excluded_and_equivocating_ancestors) } /// Checks whether all the leaders of the round exist. @@ -1063,7 +1111,7 @@ impl Core { /// Senders of signals from Core, for outputs and events (ex new block produced). 
pub(crate) struct CoreSignals { - tx_block_broadcast: broadcast::Sender, + tx_block_broadcast: broadcast::Sender, new_round_sender: watch::Sender, context: Arc, } @@ -1073,7 +1121,7 @@ impl CoreSignals { // Blocks buffered in broadcast channel should be roughly equal to thosed cached in dag state, // since the underlying blocks are ref counted so a lower buffer here will not reduce memory // usage significantly. - let (tx_block_broadcast, rx_block_broadcast) = broadcast::channel::( + let (tx_block_broadcast, rx_block_broadcast) = broadcast::channel::( context.parameters.dag_state_cached_rounds as usize, ); let (new_round_sender, new_round_receiver) = watch::channel(0); @@ -1094,21 +1142,23 @@ impl CoreSignals { /// Sends a signal to all the waiters that a new block has been produced. The method will return /// true if block has reached even one subscriber, false otherwise. - pub(crate) fn new_block(&self, block: VerifiedBlock) -> ConsensusResult<()> { + pub(crate) fn new_block(&self, extended_block: ExtendedBlock) -> ConsensusResult<()> { // When there is only one authority in committee, it is unnecessary to broadcast // the block which will fail anyway without subscribers to the signal. if self.context.committee.size() > 1 { - if block.round() == GENESIS_ROUND { + if extended_block.block.round() == GENESIS_ROUND { debug!("Ignoring broadcasting genesis block to peers"); return Ok(()); } - if let Err(err) = self.tx_block_broadcast.send(block) { + if let Err(err) = self.tx_block_broadcast.send(extended_block) { warn!("Couldn't broadcast the block to any receiver: {err}"); return Err(ConsensusError::Shutdown); } } else { - debug!("Did not broadcast block {block:?} to receivers as committee size is <= 1"); + debug!( + "Did not broadcast block {extended_block:?} to receivers as committee size is <= 1" + ); } Ok(()) } @@ -1123,12 +1173,12 @@ impl CoreSignals { /// Receivers of signals from Core. /// Intentionally un-clonable. Comonents should only subscribe to channels they need. pub(crate) struct CoreSignalsReceivers { - rx_block_broadcast: broadcast::Receiver, + rx_block_broadcast: broadcast::Receiver, new_round_receiver: watch::Receiver, } impl CoreSignalsReceivers { - pub(crate) fn block_broadcast_receiver(&self) -> broadcast::Receiver { + pub(crate) fn block_broadcast_receiver(&self) -> broadcast::Receiver { self.rx_block_broadcast.resubscribe() } @@ -1155,7 +1205,7 @@ pub(crate) fn create_cores(context: Context, authorities: Vec) -> Vec, + pub block_receiver: broadcast::Receiver, #[allow(unused)] pub commit_receiver: UnboundedReceiver, pub store: Arc, @@ -1341,8 +1391,8 @@ mod test { .recv() .await .expect("A block should have been created"); - assert_eq!(proposed_block.round(), 5); - let ancestors = proposed_block.ancestors(); + assert_eq!(proposed_block.block.round(), 5); + let ancestors = proposed_block.block.ancestors(); // Only ancestors of round 4 should be included. assert_eq!(ancestors.len(), 4); @@ -1463,8 +1513,8 @@ mod test { .recv() .await .expect("A block should have been created"); - assert_eq!(proposed_block.round(), 4); - let ancestors = proposed_block.ancestors(); + assert_eq!(proposed_block.block.round(), 4); + let ancestors = proposed_block.block.ancestors(); assert_eq!(ancestors.len(), 4); for ancestor in ancestors { @@ -1562,18 +1612,18 @@ mod test { } // a new block should have been created during recovery. 
- let block = block_receiver + let extended_block = block_receiver .recv() .await .expect("A new block should have been created"); // A new block created - assert the details - assert_eq!(block.round(), 1); - assert_eq!(block.author().value(), 0); - assert_eq!(block.ancestors().len(), 4); + assert_eq!(extended_block.block.round(), 1); + assert_eq!(extended_block.block.author().value(), 0); + assert_eq!(extended_block.block.ancestors().len(), 4); let mut total = 0; - for (i, transaction) in block.transactions().iter().enumerate() { + for (i, transaction) in extended_block.block.transactions().iter().enumerate() { total += transaction.data().len() as u64; let transaction: String = bcs::from_bytes(transaction.data()).unwrap(); assert_eq!(format!("Transaction {i}"), transaction); @@ -1583,7 +1633,7 @@ mod test { // genesis blocks should be referenced let all_genesis = genesis_blocks(context); - for ancestor in block.ancestors() { + for ancestor in extended_block.block.ancestors() { all_genesis .iter() .find(|block| block.reference() == *ancestor) @@ -2439,15 +2489,18 @@ mod test { assert_eq!(new_round, round); // Check that a new block has been proposed. - let block = tokio::time::timeout( + let extended_block = tokio::time::timeout( Duration::from_secs(1), core_fixture.block_receiver.recv(), ) .await .unwrap() .unwrap(); - assert_eq!(block.round(), round); - assert_eq!(block.author(), core_fixture.core.context.own_index); + assert_eq!(extended_block.block.round(), round); + assert_eq!( + extended_block.block.author(), + core_fixture.core.context.own_index + ); // append the new block to this round blocks this_round_blocks.push(core_fixture.core.last_proposed_block().clone()); @@ -2564,15 +2617,18 @@ mod test { assert_eq!(new_round, round); // Check that a new block has been proposed. - let block = tokio::time::timeout( + let extended_block = tokio::time::timeout( Duration::from_secs(1), core_fixture.block_receiver.recv(), ) .await .unwrap() .unwrap(); - assert_eq!(block.round(), round); - assert_eq!(block.author(), core_fixture.core.context.own_index); + assert_eq!(extended_block.block.round(), round); + assert_eq!( + extended_block.block.author(), + core_fixture.core.context.own_index + ); // append the new block to this round blocks this_round_blocks.push(core_fixture.core.last_proposed_block().clone()); @@ -2698,15 +2754,18 @@ mod test { assert_eq!(new_round, round); // Check that a new block has been proposed. - let block = tokio::time::timeout( + let extended_block = tokio::time::timeout( Duration::from_secs(1), core_fixture.block_receiver.recv(), ) .await .unwrap() .unwrap(); - assert_eq!(block.round(), round); - assert_eq!(block.author(), core_fixture.core.context.own_index); + assert_eq!(extended_block.block.round(), round); + assert_eq!( + extended_block.block.author(), + core_fixture.core.context.own_index + ); // append the new block to this round blocks this_round_blocks.push(core_fixture.core.last_proposed_block().clone()); @@ -2850,15 +2909,18 @@ mod test { assert_eq!(new_round, round); // Check that a new block has been proposed. 
- let block = tokio::time::timeout( + let extended_block = tokio::time::timeout( Duration::from_secs(1), core_fixture.block_receiver.recv(), ) .await .unwrap() .unwrap(); - assert_eq!(block.round(), round); - assert_eq!(block.author(), core_fixture.core.context.own_index); + assert_eq!(extended_block.block.round(), round); + assert_eq!( + extended_block.block.author(), + core_fixture.core.context.own_index + ); // append the new block to this round blocks this_round_blocks.push(core_fixture.core.last_proposed_block().clone()); @@ -2982,15 +3044,18 @@ mod test { assert_eq!(new_round, round); // Check that a new block has been proposed. - let block = tokio::time::timeout( + let extended_block = tokio::time::timeout( Duration::from_secs(1), core_fixture.block_receiver.recv(), ) .await .unwrap() .unwrap(); - assert_eq!(block.round(), round); - assert_eq!(block.author(), core_fixture.core.context.own_index); + assert_eq!(extended_block.block.round(), round); + assert_eq!( + extended_block.block.author(), + core_fixture.core.context.own_index + ); // append the new block to this round blocks this_round_blocks.push(core_fixture.core.last_proposed_block().clone()); diff --git a/consensus/core/src/core_thread.rs b/consensus/core/src/core_thread.rs index d76a30d059aa7..87d1ea5215d38 100644 --- a/consensus/core/src/core_thread.rs +++ b/consensus/core/src/core_thread.rs @@ -36,6 +36,8 @@ const CORE_THREAD_COMMANDS_CHANNEL_SIZE: usize = 2000; enum CoreThreadCommand { /// Add blocks to be processed and accepted AddBlocks(Vec, oneshot::Sender>), + /// Checks if block refs exist locally and sync missing ones. + CheckBlockRefs(Vec, oneshot::Sender>), /// Called when the min round has passed or the leader timeout occurred and a block should be produced. /// When the command is called with `force = true`, then the block will be created for `round` skipping /// any checks (ex leader existence of previous round). More information can be found on the `Core` component. 
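
Reviewer note: the new `CheckBlockRefs` command added to `CoreThreadCommand` follows the same command/oneshot round-trip pattern as the existing `AddBlocks`. The sketch below is a minimal, self-contained illustration of that pattern, not code from this patch — `Command`, `Dispatcher`, and `worker_loop` are made-up names, and the "missing ref" logic is a stand-in for what `Core::check_block_refs` actually does via the block manager.

```rust
use std::collections::BTreeSet;
use tokio::sync::{mpsc, oneshot};

// Illustrative stand-in for BlockRef; the real type carries round, author and digest.
type BlockRef = u64;

// Simplified command set; the real enum also carries AddBlocks, NewBlock, etc.
enum Command {
    // Ask the core thread which of these refs are still missing locally.
    CheckBlockRefs(Vec<BlockRef>, oneshot::Sender<BTreeSet<BlockRef>>),
}

struct Dispatcher {
    tx: mpsc::Sender<Command>,
}

impl Dispatcher {
    // Mirrors ChannelCoreThreadDispatcher::check_block_refs: enqueue the command,
    // then await the reply on the oneshot channel.
    async fn check_block_refs(&self, refs: Vec<BlockRef>) -> Option<BTreeSet<BlockRef>> {
        let (reply_tx, reply_rx) = oneshot::channel();
        self.tx.send(Command::CheckBlockRefs(refs, reply_tx)).await.ok()?;
        reply_rx.await.ok()
    }
}

// The loop owning the core state; "missing" here is just any odd ref, as a placeholder.
async fn worker_loop(mut rx: mpsc::Receiver<Command>) {
    while let Some(cmd) = rx.recv().await {
        match cmd {
            Command::CheckBlockRefs(refs, reply) => {
                let missing: BTreeSet<_> = refs.into_iter().filter(|r| r % 2 == 1).collect();
                let _ = reply.send(missing);
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(16);
    tokio::spawn(worker_loop(rx));
    let dispatcher = Dispatcher { tx };
    let missing = dispatcher.check_block_refs(vec![1, 2, 3]).await.unwrap();
    assert_eq!(missing, BTreeSet::from([1, 3]));
    println!("missing refs: {missing:?}");
}
```

The indirection exists because the `Core` state is owned by a single dedicated thread, so callers such as `AuthorityService::handle_send_block` can only reach it by queueing commands on the core thread's channel and awaiting the reply.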
@@ -57,6 +59,11 @@ pub trait CoreThreadDispatcher: Sync + Send + 'static { async fn add_blocks(&self, blocks: Vec) -> Result, CoreError>; + async fn check_block_refs( + &self, + block_refs: Vec, + ) -> Result, CoreError>; + async fn new_block(&self, round: Round, force: bool) -> Result<(), CoreError>; async fn get_missing_blocks(&self) -> Result, CoreError>; @@ -121,6 +128,11 @@ impl CoreThread { let missing_block_refs = self.core.add_blocks(blocks)?; sender.send(missing_block_refs).ok(); } + CoreThreadCommand::CheckBlockRefs(blocks, sender) => { + let _scope = monitored_scope("CoreThread::loop::find_excluded_blocks"); + let missing_block_refs = self.core.check_block_refs(blocks)?; + sender.send(missing_block_refs).ok(); + } CoreThreadCommand::NewBlock(round, sender, force) => { let _scope = monitored_scope("CoreThread::loop::new_block"); self.core.new_block(round, force)?; @@ -283,6 +295,21 @@ impl CoreThreadDispatcher for ChannelCoreThreadDispatcher { Ok(missing_block_refs) } + async fn check_block_refs( + &self, + block_refs: Vec, + ) -> Result, CoreError> { + let (sender, receiver) = oneshot::channel(); + self.send(CoreThreadCommand::CheckBlockRefs( + block_refs.clone(), + sender, + )) + .await; + let missing_block_refs = receiver.await.map_err(|e| Shutdown(e.to_string()))?; + + Ok(missing_block_refs) + } + async fn new_block(&self, round: Round, force: bool) -> Result<(), CoreError> { let (sender, receiver) = oneshot::channel(); self.send(CoreThreadCommand::NewBlock(round, sender, force)) @@ -380,6 +407,13 @@ impl CoreThreadDispatcher for MockCoreThreadDispatcher { Ok(BTreeSet::new()) } + async fn check_block_refs( + &self, + _block_refs: Vec, + ) -> Result, CoreError> { + Ok(BTreeSet::new()) + } + async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> { Ok(()) } diff --git a/consensus/core/src/dag_state.rs b/consensus/core/src/dag_state.rs index 3ff804d02da8d..8209bf1d2f0e7 100644 --- a/consensus/core/src/dag_state.rs +++ b/consensus/core/src/dag_state.rs @@ -576,6 +576,7 @@ impl DagState { // Retrieves the cached block within the range [start_round, end_round) from a given authority. // NOTE: end_round must be greater than GENESIS_ROUND. + #[cfg(test)] pub(crate) fn get_last_cached_block_in_range( &self, authority: AuthorityIndex, @@ -608,13 +609,15 @@ impl DagState { /// The method is guaranteed to return results only when the `end_round` is not earlier of the /// available cached data for each authority (evicted round + 1), otherwise the method will panic. /// It's the caller's responsibility to ensure that is not requesting for earlier rounds. - /// In case of equivocation for an authority's last slot only one block will be returned (the last in order). + /// In case of equivocation for an authority's last slot, one block will be returned (the last in order) + /// and the other equivocating blocks will be returned. 
pub(crate) fn get_last_cached_block_per_authority( &self, end_round: Round, - ) -> Vec { - // init with the genesis blocks as fallback + ) -> Vec<(VerifiedBlock, Vec)> { + // Initialize with the genesis blocks as fallback let mut blocks = self.genesis.values().cloned().collect::>(); + let mut equivocating_blocks = vec![vec![]; self.context.committee.size()]; if end_round == GENESIS_ROUND { panic!( @@ -623,7 +626,7 @@ impl DagState { } if end_round == GENESIS_ROUND + 1 { - return blocks; + return blocks.into_iter().map(|b| (b, vec![])).collect(); } for (authority_index, block_refs) in self.recent_refs_by_authority.iter().enumerate() { @@ -638,7 +641,7 @@ impl DagState { panic!("Attempted to request for blocks of rounds < {end_round}, when the last evicted round is {last_evicted_round} for authority {authority_index}", ); } - if let Some(block_ref) = block_refs + let block_ref_iter = block_refs .range(( Included(BlockRef::new( last_evicted_round + 1, @@ -647,18 +650,27 @@ impl DagState { )), Excluded(BlockRef::new(end_round, authority_index, BlockDigest::MIN)), )) - .next_back() - { - let block_info = self - .recent_blocks - .get(block_ref) - .expect("Block should exist in recent blocks"); - - blocks[authority_index] = block_info.block.clone(); + .rev(); + + let mut last_round = 0; + for block_ref in block_ref_iter { + if last_round == 0 { + last_round = block_ref.round; + let block_info = self + .recent_blocks + .get(block_ref) + .expect("Block should exist in recent blocks"); + blocks[authority_index] = block_info.block.clone(); + continue; + } + if block_ref.round < last_round { + break; + } + equivocating_blocks[authority_index].push(*block_ref); } } - blocks.into_iter().collect() + blocks.into_iter().zip(equivocating_blocks).collect() } /// Checks whether a block exists in the slot. The method checks only against the cached data. 
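
The behavioral change to `get_last_cached_block_per_authority` above is easiest to see in isolation: instead of returning only the newest cached block per authority, the reverse scan now also reports the refs of any other blocks that authority produced at that same newest round (equivocations). The following is an illustrative sketch using a simplified `(round, digest)` ref; it is not the crate's real `BlockRef`, and the function name is invented.

```rust
use std::collections::BTreeSet;

// Simplified stand-in for BlockRef: (round, digest). Ordered by round first,
// matching how the per-authority recent_refs sets are iterated.
type Ref = (u32, u8);

// Walk the authority's refs from highest round down: the first ref seen is the
// "last" block; any further refs at that same round are equivocating siblings.
fn last_and_equivocating(refs: &BTreeSet<Ref>) -> Option<(Ref, Vec<Ref>)> {
    let mut iter = refs.iter().rev();
    let last = *iter.next()?;
    let equivocating: Vec<Ref> = iter
        .take_while(|r| r.0 == last.0) // stop as soon as the round drops
        .copied()
        .collect();
    Some((last, equivocating))
}

fn main() {
    let refs: BTreeSet<Ref> = [(1, 0xaa), (2, 0xbb), (2, 0xcc), (2, 0xdd)].into();
    let (last, equivocating) = last_and_equivocating(&refs).unwrap();
    assert_eq!(last, (2, 0xdd));
    // The two other round-2 blocks from the same authority are reported as equivocating.
    assert_eq!(equivocating, vec![(2, 0xcc), (2, 0xbb)]);
    println!("last={last:?}, equivocating={equivocating:?}");
}
```

Those equivocating refs are what `smart_ancestors_to_propose` folds into `excluded_and_equivocating_ancestors`: they are never selected as ancestors, but they are still advertised to peers alongside the proposed block.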
@@ -2176,8 +2188,15 @@ mod test { let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag"); + // Add equivocating block for round 2 authority 3 + let block = VerifiedBlock::new_for_test(TestBlock::new(2, 2).build()); + // Accept all blocks - for block in dag_builder.all_blocks() { + for block in dag_builder + .all_blocks() + .into_iter() + .chain(std::iter::once(block)) + { dag_state.accept_block(block); } @@ -2192,13 +2211,17 @@ mod test { // WHEN search for the latest blocks let end_round = 4; let expected_rounds = vec![0, 1, 2, 3]; - + let expected_excluded_and_equivocating_blocks = vec![0, 0, 1, 0]; // THEN let last_blocks = dag_state.get_last_cached_block_per_authority(end_round); assert_eq!( - last_blocks.iter().map(|b| b.round()).collect::>(), + last_blocks.iter().map(|b| b.0.round()).collect::>(), expected_rounds ); + assert_eq!( + last_blocks.iter().map(|b| b.1.len()).collect::>(), + expected_excluded_and_equivocating_blocks + ); // THEN for (i, expected_round) in expected_rounds.iter().enumerate() { @@ -2245,7 +2268,7 @@ mod test { // THEN let last_blocks = dag_state.get_last_cached_block_per_authority(end_round); assert_eq!( - last_blocks.iter().map(|b| b.round()).collect::>(), + last_blocks.iter().map(|b| b.0.round()).collect::>(), expected_rounds ); diff --git a/consensus/core/src/leader_timeout.rs b/consensus/core/src/leader_timeout.rs index a8b55a0c648d9..9e4c8898d6657 100644 --- a/consensus/core/src/leader_timeout.rs +++ b/consensus/core/src/leader_timeout.rs @@ -157,6 +157,13 @@ mod tests { todo!() } + async fn check_block_refs( + &self, + _block_refs: Vec, + ) -> Result, CoreError> { + todo!() + } + async fn new_block(&self, round: Round, force: bool) -> Result<(), CoreError> { self.new_block_calls .lock() diff --git a/consensus/core/src/metrics.rs b/consensus/core/src/metrics.rs index 3e15953d96102..cd87f008a9573 100644 --- a/consensus/core/src/metrics.rs +++ b/consensus/core/src/metrics.rs @@ -112,6 +112,7 @@ pub(crate) struct NodeMetrics { pub(crate) blocks_per_commit_count: Histogram, pub(crate) broadcaster_rtt_estimate_ms: IntGaugeVec, pub(crate) core_add_blocks_batch_size: Histogram, + pub(crate) core_check_block_refs_batch_size: Histogram, pub(crate) core_lock_dequeued: IntCounter, pub(crate) core_lock_enqueued: IntCounter, pub(crate) core_skipped_proposals: IntCounterVec, @@ -128,6 +129,9 @@ pub(crate) struct NodeMetrics { pub(crate) synchronizer_missing_blocks_by_authority: IntCounterVec, pub(crate) synchronizer_current_missing_blocks_by_authority: IntGaugeVec, pub(crate) synchronizer_fetched_blocks_by_authority: IntCounterVec, + pub(crate) network_received_excluded_ancestors_from_authority: IntCounterVec, + pub(crate) network_excluded_ancestors_sent_to_fetch: IntCounterVec, + pub(crate) network_excluded_ancestors_count_by_authority: IntCounterVec, pub(crate) invalid_blocks: IntCounterVec, pub(crate) rejected_blocks: IntCounterVec, pub(crate) rejected_future_blocks: IntCounterVec, @@ -286,6 +290,12 @@ impl NodeMetrics { NUM_BUCKETS.to_vec(), registry, ).unwrap(), + core_check_block_refs_batch_size: register_histogram_with_registry!( + "core_check_block_refs_batch_size", + "The number of excluded blocks received from Core for search on a single batch", + NUM_BUCKETS.to_vec(), + registry, + ).unwrap(), core_lock_dequeued: register_int_counter_with_registry!( "core_lock_dequeued", "Number of dequeued core requests", @@ -375,6 +385,24 @@ impl NodeMetrics { &["authority", "type"], registry, ).unwrap(), + network_received_excluded_ancestors_from_authority: 
register_int_counter_vec_with_registry!( + "network_received_excluded_ancestors_from_authority", + "Number of excluded ancestors received from each authority.", + &["authority"], + registry, + ).unwrap(), + network_excluded_ancestors_count_by_authority: register_int_counter_vec_with_registry!( + "network_excluded_ancestors_count_by_authority", + "Total number of excluded ancestors per authority.", + &["authority"], + registry, + ).unwrap(), + network_excluded_ancestors_sent_to_fetch: register_int_counter_vec_with_registry!( + "network_excluded_ancestors_sent_to_fetch", + "Number of excluded ancestors sent to fetch.", + &["authority"], + registry, + ).unwrap(), last_known_own_block_round: register_int_gauge_with_registry!( "last_known_own_block_round", "The highest round of our own block as this has been synced from peers during an amnesia recovery", diff --git a/consensus/core/src/network/anemo_network.rs b/consensus/core/src/network/anemo_network.rs index 36bd12228da0a..52b53ac498871 100644 --- a/consensus/core/src/network/anemo_network.rs +++ b/consensus/core/src/network/anemo_network.rs @@ -35,7 +35,7 @@ use super::{ connection_monitor::{AnemoConnectionMonitor, ConnectionMonitorHandle}, epoch_filter::{AllowedEpoch, EPOCH_HEADER_KEY}, metrics_layer::{MetricsCallbackMaker, MetricsResponseCallback, SizedRequest, SizedResponse}, - BlockStream, NetworkClient, NetworkManager, NetworkService, + BlockStream, ExtendedSerializedBlock, NetworkClient, NetworkManager, NetworkService, }; use crate::{ block::{BlockRef, VerifiedBlock}, @@ -297,6 +297,10 @@ impl ConsensusRpc for AnemoServiceProxy { ) })?; let block = request.into_body().block; + let block = ExtendedSerializedBlock { + block, + excluded_ancestors: vec![], + }; self.service .handle_send_block(index, block) .await diff --git a/consensus/core/src/network/mod.rs b/consensus/core/src/network/mod.rs index 97c2d7e4f794e..ba56a526b6dfa 100644 --- a/consensus/core/src/network/mod.rs +++ b/consensus/core/src/network/mod.rs @@ -24,7 +24,7 @@ use consensus_config::{AuthorityIndex, NetworkKeyPair}; use futures::Stream; use crate::{ - block::{BlockRef, VerifiedBlock}, + block::{BlockRef, ExtendedBlock, VerifiedBlock}, commit::{CommitRange, TrustedCommit}, context::Context, error::ConsensusResult, @@ -54,8 +54,8 @@ pub(crate) mod test_network; pub(crate) mod tonic_network; mod tonic_tls; -/// A stream of serialized blocks returned over the network. -pub(crate) type BlockStream = Pin + Send>>; +/// A stream of serialized filtered blocks returned over the network. +pub(crate) type BlockStream = Pin + Send>>; /// Network client for communicating with peers. /// @@ -132,7 +132,13 @@ pub(crate) trait NetworkService: Send + Sync + 'static { /// Handles the block sent from the peer via either unicast RPC or subscription stream. /// Peer value can be trusted to be a valid authority index. /// But serialized_block must be verified before its contents are trusted. - async fn handle_send_block(&self, peer: AuthorityIndex, block: Bytes) -> ConsensusResult<()>; + /// Excluded ancestors are also included as part of an effort to further propagate + /// blocks to peers despite the current exclusion. + async fn handle_send_block( + &self, + peer: AuthorityIndex, + block: ExtendedSerializedBlock, + ) -> ConsensusResult<()>; /// Handles the subscription request from the peer. /// A stream of newly proposed blocks is returned to the peer. @@ -193,3 +199,30 @@ where /// Stops the network service. 
async fn stop(&mut self); } + +/// Serialized block with extended information from the proposing authority. +#[derive(Clone, PartialEq, Eq, Debug)] +pub(crate) struct ExtendedSerializedBlock { + pub(crate) block: Bytes, + // Serialized BlockRefs that are excluded from the blocks ancestors. + pub(crate) excluded_ancestors: Vec>, +} + +impl From for ExtendedSerializedBlock { + fn from(extended_block: ExtendedBlock) -> Self { + Self { + block: extended_block.block.serialized().clone(), + excluded_ancestors: extended_block + .excluded_ancestors + .iter() + .filter_map(|r| match bcs::to_bytes(r) { + Ok(serialized) => Some(serialized), + Err(e) => { + tracing::debug!("Failed to serialize block ref {:?}: {e:?}", r); + None + } + }) + .collect(), + } + } +} diff --git a/consensus/core/src/network/network_tests.rs b/consensus/core/src/network/network_tests.rs index cef1ff8530c54..0ddb60e7fc9fb 100644 --- a/consensus/core/src/network/network_tests.rs +++ b/consensus/core/src/network/network_tests.rs @@ -12,7 +12,7 @@ use tokio::time::sleep; use super::{ anemo_network::AnemoManager, test_network::TestService, tonic_network::TonicManager, - NetworkClient, NetworkManager, + ExtendedSerializedBlock, NetworkClient, NetworkManager, }; use crate::{ block::{TestBlock, VerifiedBlock}, @@ -52,8 +52,11 @@ impl ManagerBuilder for TonicManagerBuilder { } } -fn block_for_round(round: Round) -> Bytes { - Bytes::from(vec![round as u8; 16]) +fn block_for_round(round: Round) -> ExtendedSerializedBlock { + ExtendedSerializedBlock { + block: Bytes::from(vec![round as u8; 16]), + excluded_ancestors: vec![], + } } fn service_with_own_blocks() -> Arc> { @@ -125,13 +128,19 @@ async fn send_and_receive_blocks_with_auth( assert_eq!(service_0.lock().handle_send_block[0].0.value(), 1); assert_eq!( service_0.lock().handle_send_block[0].1, - test_block_1.serialized(), + ExtendedSerializedBlock { + block: test_block_1.serialized().clone(), + excluded_ancestors: vec![], + }, ); assert_eq!(service_1.lock().handle_send_block.len(), 1); assert_eq!(service_1.lock().handle_send_block[0].0.value(), 0); assert_eq!( service_1.lock().handle_send_block[0].1, - test_block_0.serialized(), + ExtendedSerializedBlock { + block: test_block_0.serialized().clone(), + excluded_ancestors: vec![], + }, ); // `Committee` is generated with the same random seed in Context::new_for_test(), diff --git a/consensus/core/src/network/test_network.rs b/consensus/core/src/network/test_network.rs index 0290a599ab4e9..259b94261d1e2 100644 --- a/consensus/core/src/network/test_network.rs +++ b/consensus/core/src/network/test_network.rs @@ -15,12 +15,14 @@ use crate::{ Round, }; +use super::ExtendedSerializedBlock; + pub(crate) struct TestService { - pub(crate) handle_send_block: Vec<(AuthorityIndex, Bytes)>, + pub(crate) handle_send_block: Vec<(AuthorityIndex, ExtendedSerializedBlock)>, pub(crate) handle_fetch_blocks: Vec<(AuthorityIndex, Vec)>, pub(crate) handle_subscribe_blocks: Vec<(AuthorityIndex, Round)>, pub(crate) handle_fetch_commits: Vec<(AuthorityIndex, CommitRange)>, - pub(crate) own_blocks: Vec, + pub(crate) own_blocks: Vec, } impl TestService { @@ -35,14 +37,18 @@ impl TestService { } #[cfg_attr(msim, allow(dead_code))] - pub(crate) fn add_own_blocks(&mut self, blocks: Vec) { + pub(crate) fn add_own_blocks(&mut self, blocks: Vec) { self.own_blocks.extend(blocks); } } #[async_trait] impl NetworkService for Mutex { - async fn handle_send_block(&self, peer: AuthorityIndex, block: Bytes) -> ConsensusResult<()> { + async fn handle_send_block( + &self, + 
peer: AuthorityIndex, + block: ExtendedSerializedBlock, + ) -> ConsensusResult<()> { let mut state = self.lock(); state.handle_send_block.push((peer, block)); Ok(()) diff --git a/consensus/core/src/network/tonic_network.rs b/consensus/core/src/network/tonic_network.rs index 3f3403c26b896..09bbac5c7e6ad 100644 --- a/consensus/core/src/network/tonic_network.rs +++ b/consensus/core/src/network/tonic_network.rs @@ -32,7 +32,7 @@ use super::{ consensus_service_client::ConsensusServiceClient, consensus_service_server::ConsensusService, }, - BlockStream, NetworkClient, NetworkManager, NetworkService, + BlockStream, ExtendedSerializedBlock, NetworkClient, NetworkManager, NetworkService, }; use crate::{ block::{BlockRef, VerifiedBlock}, @@ -129,7 +129,10 @@ impl NetworkClient for TonicClient { .take_while(|b| futures::future::ready(b.is_ok())) .filter_map(move |b| async move { match b { - Ok(response) => Some(response.block), + Ok(response) => Some(ExtendedSerializedBlock { + block: response.block, + excluded_ancestors: response.excluded_ancestors, + }), Err(e) => { debug!("Network error received from {}: {e:?}", peer); None @@ -448,6 +451,10 @@ impl ConsensusService for TonicServiceProxy { return Err(tonic::Status::internal("PeerInfo not found")); }; let block = request.into_inner().block; + let block = ExtendedSerializedBlock { + block, + excluded_ancestors: vec![], + }; self.service .handle_send_block(peer_index, block) .await @@ -488,7 +495,12 @@ impl ConsensusService for TonicServiceProxy { .handle_subscribe_blocks(peer_index, first_request.last_received_round) .await .map_err(|e| tonic::Status::internal(format!("{e:?}")))? - .map(|block| Ok(SubscribeBlocksResponse { block })); + .map(|block| { + Ok(SubscribeBlocksResponse { + block: block.block, + excluded_ancestors: block.excluded_ancestors, + }) + }); let rate_limited_stream = tokio_stream::StreamExt::throttle(stream, self.context.parameters.min_round_delay / 2) .boxed(); @@ -1032,6 +1044,9 @@ pub(crate) struct SubscribeBlocksRequest { pub(crate) struct SubscribeBlocksResponse { #[prost(bytes = "bytes", tag = "1")] block: Bytes, + // Serialized BlockRefs that are excluded from the blocks ancestors. 
+ #[prost(bytes = "vec", repeated, tag = "2")] + excluded_ancestors: Vec>, } #[derive(Clone, prost::Message)] diff --git a/consensus/core/src/round_prober.rs b/consensus/core/src/round_prober.rs index 6a05d42bb0282..ee43f2462043f 100644 --- a/consensus/core/src/round_prober.rs +++ b/consensus/core/src/round_prober.rs @@ -155,7 +155,7 @@ impl RoundProber { .get_last_cached_block_per_authority(Round::MAX); let local_highest_accepted_rounds = blocks .into_iter() - .map(|block| block.round()) + .map(|(block, _)| block.round()) .collect::>(); let last_proposed_round = local_highest_accepted_rounds[own_index]; @@ -411,6 +411,13 @@ mod test { unimplemented!() } + async fn check_block_refs( + &self, + _block_refs: Vec, + ) -> Result, CoreError> { + unimplemented!() + } + async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> { unimplemented!() } diff --git a/consensus/core/src/subscriber.rs b/consensus/core/src/subscriber.rs index e60c3fad77a3b..e497ea70246bb 100644 --- a/consensus/core/src/subscriber.rs +++ b/consensus/core/src/subscriber.rs @@ -248,11 +248,12 @@ mod test { use super::*; use crate::{ - block::{BlockRef, VerifiedBlock}, + block::BlockRef, commit::CommitRange, error::ConsensusResult, - network::{test_network::TestService, BlockStream}, + network::{test_network::TestService, BlockStream, ExtendedSerializedBlock}, storage::mem_store::MemStore, + VerifiedBlock, }; struct SubscriberTestClient {} @@ -284,7 +285,11 @@ mod test { ) -> ConsensusResult { let block_stream = stream::unfold((), |_| async { sleep(Duration::from_millis(1)).await; - Some((Bytes::from(vec![1u8; 8]), ())) + let block = ExtendedSerializedBlock { + block: Bytes::from(vec![1u8; 8]), + excluded_ancestors: vec![], + }; + Some((block, ())) }) .take(10); Ok(Box::pin(block_stream)) @@ -360,7 +365,13 @@ mod test { assert!(service.handle_send_block.len() >= 100); for (p, block) in service.handle_send_block.iter() { assert_eq!(*p, peer); - assert_eq!(*block, Bytes::from(vec![1u8; 8])); + assert_eq!( + *block, + ExtendedSerializedBlock { + block: Bytes::from(vec![1u8; 8]), + excluded_ancestors: vec![] + } + ); } } } diff --git a/consensus/core/src/synchronizer.rs b/consensus/core/src/synchronizer.rs index 7261a4c6261aa..d538425aacd48 100644 --- a/consensus/core/src/synchronizer.rs +++ b/consensus/core/src/synchronizer.rs @@ -606,7 +606,7 @@ impl Synchronizer>() }
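
Putting the sender and receiver halves together: each excluded ancestor ref is individually `bcs`-serialized into `ExtendedSerializedBlock.excluded_ancestors` (carried over the subscription stream via the new repeated-bytes field of `SubscribeBlocksResponse`), and the receiving authority deserializes the refs and caps the list at twice the committee size before asking the core thread which ones it is missing. The sketch below is a rough, self-contained illustration of that flow; `ToyBlockRef`, the helper names, and the committee size of 4 are stand-ins, and it assumes the `serde` (with derive) and `bcs` crates.

```rust
use serde::{Deserialize, Serialize};

// Stand-in for BlockRef; the real struct holds round, author index and digest.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
struct ToyBlockRef {
    round: u32,
    author: u32,
}

// Sender side: serialize each excluded ancestor ref individually, as
// ExtendedSerializedBlock::from(ExtendedBlock) does with bcs.
fn encode_excluded(refs: &[ToyBlockRef]) -> Vec<Vec<u8>> {
    refs.iter()
        .filter_map(|r| bcs::to_bytes(r).ok()) // drop refs that fail to serialize
        .collect()
}

// Receiver side: deserialize, then cap at 2x committee size before any
// further processing, mirroring handle_send_block.
fn decode_excluded(
    serialized: Vec<Vec<u8>>,
    committee_size: usize,
) -> Result<Vec<ToyBlockRef>, bcs::Error> {
    let mut refs = serialized
        .iter()
        .map(|bytes| bcs::from_bytes::<ToyBlockRef>(bytes))
        .collect::<Result<Vec<_>, _>>()?;
    let limit = committee_size * 2;
    refs.truncate(limit);
    Ok(refs)
}

fn main() {
    let excluded = vec![
        ToyBlockRef { round: 7, author: 2 },
        ToyBlockRef { round: 8, author: 3 },
    ];
    let wire = encode_excluded(&excluded);
    let decoded = decode_excluded(wire, 4).expect("decode");
    assert_eq!(decoded, excluded);
    // Any refs the receiver still does not have would then be handed to the
    // synchronizer to fetch from the sending peer in the background.
    println!("decoded {} excluded ancestor refs", decoded.len());
}
```

The cap mirrors the `excluded_ancestors_limit` check in `handle_send_block`, which bounds how much optimistic fetching a single peer's message can trigger.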