diff --git a/consensus/core/src/authority_service.rs b/consensus/core/src/authority_service.rs index b22331d6d07cc..98893857eb177 100644 --- a/consensus/core/src/authority_service.rs +++ b/consensus/core/src/authority_service.rs @@ -14,7 +14,7 @@ use tokio_util::sync::ReusableBoxFuture; use tracing::{debug, info, warn}; use crate::{ - block::{BlockAPI as _, BlockRef, SignedBlock, VerifiedBlock, GENESIS_ROUND}, + block::{BlockAPI as _, BlockRef, ExtendedBlock, SignedBlock, VerifiedBlock, GENESIS_ROUND}, block_verifier::BlockVerifier, commit::{CommitAPI as _, CommitRange, TrustedCommit}, commit_vote_monitor::CommitVoteMonitor, @@ -22,7 +22,7 @@ use crate::{ core_thread::CoreThreadDispatcher, dag_state::DagState, error::{ConsensusError, ConsensusResult}, - network::{BlockStream, NetworkService}, + network::{BlockStream, ExtendedSerializedBlock, NetworkService}, stake_aggregator::{QuorumThreshold, StakeAggregator}, storage::Store, synchronizer::SynchronizerHandle, @@ -38,7 +38,7 @@ pub(crate) struct AuthorityService { block_verifier: Arc, synchronizer: Arc, core_dispatcher: Arc, - rx_block_broadcaster: broadcast::Receiver, + rx_block_broadcaster: broadcast::Receiver, subscription_counter: Arc, dag_state: Arc>, store: Arc, @@ -51,7 +51,7 @@ impl AuthorityService { commit_vote_monitor: Arc, synchronizer: Arc, core_dispatcher: Arc, - rx_block_broadcaster: broadcast::Receiver, + rx_block_broadcaster: broadcast::Receiver, dag_state: Arc>, store: Arc, ) -> Self { @@ -78,7 +78,7 @@ impl NetworkService for AuthorityService { async fn handle_send_block( &self, peer: AuthorityIndex, - serialized_block: Bytes, + serialized_block: ExtendedSerializedBlock, ) -> ConsensusResult<()> { fail_point_async!("consensus-rpc-response"); @@ -86,7 +86,7 @@ impl NetworkService for AuthorityService { // TODO: dedup block verifications, here and with fetched blocks. let signed_block: SignedBlock = - bcs::from_bytes(&serialized_block).map_err(ConsensusError::MalformedBlock)?; + bcs::from_bytes(&serialized_block.block).map_err(ConsensusError::MalformedBlock)?; // Reject blocks not produced by the peer. 
if peer != signed_block.author() { @@ -113,7 +113,7 @@ impl NetworkService for AuthorityService { info!("Invalid block from {}: {}", peer, e); return Err(e); } - let verified_block = VerifiedBlock::new_verified(signed_block, serialized_block); + let verified_block = VerifiedBlock::new_verified(signed_block, serialized_block.block); let block_ref = verified_block.reference(); debug!("Received block {} via send block.", block_ref); @@ -225,6 +225,75 @@ impl NetworkService for AuthorityService { } } + // ------------ After processing the block, process the excluded ancestors ------------ + + let mut excluded_ancestors = serialized_block + .excluded_ancestors + .into_iter() + .map(|serialized| bcs::from_bytes::(&serialized)) + .collect::, bcs::Error>>() + .map_err(ConsensusError::MalformedBlock)?; + + let excluded_ancestors_limit = self.context.committee.size() * 2; + if excluded_ancestors.len() > excluded_ancestors_limit { + debug!( + "Dropping {} excluded ancestor(s) from {} {} due to size limit", + excluded_ancestors.len() - excluded_ancestors_limit, + peer, + peer_hostname, + ); + excluded_ancestors.truncate(excluded_ancestors_limit); + } + + self.context + .metrics + .node_metrics + .network_received_excluded_ancestors_from_authority + .with_label_values(&[peer_hostname]) + .inc_by(excluded_ancestors.len() as u64); + + for excluded_ancestor in &excluded_ancestors { + let excluded_ancestor_hostname = &self + .context + .committee + .authority(excluded_ancestor.author) + .hostname; + self.context + .metrics + .node_metrics + .network_excluded_ancestors_count_by_authority + .with_label_values(&[excluded_ancestor_hostname]) + .inc(); + } + + let missing_excluded_ancestors = self + .core_dispatcher + .check_block_refs(excluded_ancestors) + .await + .map_err(|_| ConsensusError::Shutdown)?; + + if !missing_excluded_ancestors.is_empty() { + self.context + .metrics + .node_metrics + .network_excluded_ancestors_sent_to_fetch + .with_label_values(&[peer_hostname]) + .inc_by(missing_excluded_ancestors.len() as u64); + + let synchronizer = self.synchronizer.clone(); + tokio::spawn(async move { + // schedule the fetching of them from this peer in the background + if let Err(err) = synchronizer + .fetch_blocks(missing_excluded_ancestors, peer) + .await + { + warn!( + "Errored while trying to fetch missing excluded ancestors via synchronizer: {err}" + ); + } + }); + } + Ok(()) } @@ -243,7 +312,10 @@ impl NetworkService for AuthorityService { dag_state .get_cached_blocks(self.context.own_index, last_received + 1) .into_iter() - .map(|block| block.serialized().clone()), + .map(|block| ExtendedSerializedBlock { + block: block.serialized().clone(), + excluded_ancestors: vec![], + }), ); let broadcasted_blocks = BroadcastedBlockStream::new( @@ -254,7 +326,7 @@ impl NetworkService for AuthorityService { // Return a stream of blocks that first yields missed blocks as requested, then new blocks. Ok(Box::pin(missed_blocks.chain( - broadcasted_blocks.map(|block| block.serialized().clone()), + broadcasted_blocks.map(ExtendedSerializedBlock::from), ))) } @@ -423,7 +495,7 @@ impl NetworkService for AuthorityService { .get_last_cached_block_per_authority(Round::MAX); let highest_accepted_rounds = blocks .into_iter() - .map(|block| block.round()) + .map(|(block, _)| block.round()) .collect::>(); // Own blocks do not go through the core dispatcher, so they need to be set separately. @@ -516,7 +588,7 @@ impl SubscriptionCounter { /// Each broadcasted block stream wraps a broadcast receiver for blocks. 
/// It yields blocks that are broadcasted after the stream is created. -type BroadcastedBlockStream = BroadcastStream; +type BroadcastedBlockStream = BroadcastStream; /// Adapted from `tokio_stream::wrappers::BroadcastStream`. The main difference is that /// this tolerates lags with only logging, without yielding errors. @@ -612,15 +684,14 @@ async fn make_recv_future( mod tests { use crate::{ authority_service::AuthorityService, - block::BlockAPI, - block::{BlockRef, SignedBlock, TestBlock, VerifiedBlock}, + block::{BlockAPI, BlockRef, SignedBlock, TestBlock, VerifiedBlock}, commit::CommitRange, commit_vote_monitor::CommitVoteMonitor, context::Context, core_thread::{CoreError, CoreThreadDispatcher}, dag_state::DagState, error::ConsensusResult, - network::{BlockStream, NetworkClient, NetworkService}, + network::{BlockStream, ExtendedSerializedBlock, NetworkClient, NetworkService}, round_prober::QuorumRound, storage::mem_store::MemStore, synchronizer::Synchronizer, @@ -664,6 +735,13 @@ mod tests { Ok(block_refs) } + async fn check_block_refs( + &self, + _block_refs: Vec, + ) -> Result, CoreError> { + Ok(BTreeSet::new()) + } + async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> { Ok(()) } @@ -797,7 +875,11 @@ mod tests { ); let service = authority_service.clone(); - let serialized = input_block.serialized().clone(); + let serialized = ExtendedSerializedBlock { + block: input_block.serialized().clone(), + excluded_ancestors: vec![], + }; + tokio::spawn(async move { service .handle_send_block(context.committee.to_authority_index(0).unwrap(), serialized) diff --git a/consensus/core/src/block.rs b/consensus/core/src/block.rs index 9327560348b67..1a636fabfc610 100644 --- a/consensus/core/src/block.rs +++ b/consensus/core/src/block.rs @@ -19,7 +19,10 @@ use serde::{Deserialize, Serialize}; use shared_crypto::intent::{Intent, IntentMessage, IntentScope}; use crate::{ - commit::CommitVote, context::Context, ensure, error::ConsensusError, error::ConsensusResult, + commit::CommitVote, + context::Context, + ensure, + error::{ConsensusError, ConsensusResult}, }; /// Round number of a block. @@ -638,6 +641,15 @@ impl fmt::Debug for VerifiedBlock { } } +/// Block with extended additional information, such as +/// local blocks that are excluded from the block's ancestors. +/// The extended information do not need to be certified or forwarded to other authorities. +#[derive(Clone, Debug)] +pub(crate) struct ExtendedBlock { + pub block: VerifiedBlock, + pub excluded_ancestors: Vec, +} + /// Generates the genesis blocks for the current Committee. /// The blocks are returned in authority index order. pub(crate) fn genesis_blocks(context: Arc) -> Vec { diff --git a/consensus/core/src/block_manager.rs b/consensus/core/src/block_manager.rs index 4e54065c2b49c..8ca177944d114 100644 --- a/consensus/core/src/block_manager.rs +++ b/consensus/core/src/block_manager.rs @@ -144,6 +144,54 @@ impl BlockManager { (accepted_blocks, missing_blocks) } + /// Tries to find the provided block_refs in DagState and BlockManager, + /// and returns missing block refs. 
+ pub(crate) fn try_find_blocks(&mut self, mut block_refs: Vec) -> BTreeSet { + let _s = monitored_scope("BlockManager::try_find_blocks"); + + block_refs.sort_by_key(|b| b.round); + debug!( + "Trying to find blocks: {}", + block_refs.iter().map(|b| b.to_string()).join(",") + ); + + let mut missing_blocks = BTreeSet::new(); + + for (found, block_ref) in self + .dag_state + .read() + .contains_blocks(block_refs.clone()) + .into_iter() + .zip(block_refs.iter()) + { + if found || self.suspended_blocks.contains_key(block_ref) { + continue; + } + // Fetches the block if it is not in dag state or suspended. + missing_blocks.insert(*block_ref); + if self.missing_blocks.insert(*block_ref) { + let block_ref_hostname = + &self.context.committee.authority(block_ref.author).hostname; + self.context + .metrics + .node_metrics + .block_manager_missing_blocks_by_authority + .with_label_values(&[block_ref_hostname]) + .inc(); + } + } + + let metrics = &self.context.metrics.node_metrics; + metrics + .missing_blocks_total + .inc_by(missing_blocks.len() as u64); + metrics + .block_manager_missing_blocks + .set(self.missing_blocks.len() as i64); + + missing_blocks + } + // TODO: remove once timestamping is refactored to the new approach. // Verifies each block's timestamp based on its ancestors, and persists in store all the valid blocks that should be accepted. Method // returns the accepted and persisted blocks. @@ -314,6 +362,7 @@ impl BlockManager { // Add the ancestor to the missing blocks set only if it doesn't already exist in the suspended blocks - meaning // that we already have its payload. if !self.suspended_blocks.contains_key(ancestor) { + // Fetches the block if it is not in dag state or suspended. ancestors_to_fetch.insert(*ancestor); if self.missing_blocks.insert(*ancestor) { self.context @@ -1067,4 +1116,88 @@ mod tests { // Other blocks should be rejected and there should be no remaining suspended block. assert!(block_manager.suspended_blocks().is_empty()); } + + #[tokio::test] + async fn try_find_blocks() { + // GIVEN + let (context, _key_pairs) = Context::new_for_test(4); + let context = Arc::new(context); + let store = Arc::new(MemStore::new()); + let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone()))); + + let mut block_manager = + BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier)); + + // create a DAG + let mut dag_builder = DagBuilder::new(context.clone()); + dag_builder + .layers(1..=2) // 2 rounds + .authorities(vec![ + AuthorityIndex::new_for_test(0), + AuthorityIndex::new_for_test(2), + ]) // Create equivocating blocks for 2 authorities + .equivocate(3) + .build(); + + // Take only the blocks of round 2 and try to accept them + let round_2_blocks = dag_builder + .blocks + .iter() + .filter_map(|(_, block)| (block.round() == 2).then_some(block.clone())) + .collect::>(); + + // All blocks should be missing + let missing_block_refs_from_find = + block_manager.try_find_blocks(round_2_blocks.iter().map(|b| b.reference()).collect()); + assert_eq!(missing_block_refs_from_find.len(), 10); + assert!(missing_block_refs_from_find + .iter() + .all(|block_ref| block_ref.round == 2)); + + // Try accept blocks which will cause blocks to be suspended and added to missing + // in block manager. 
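// --- Illustrative sketch (not part of this diff) ---
// Shows the shape of the `try_find_blocks` lookup above: a ref is only scheduled for
// fetching when it is neither already accepted (here `accepted`, a stand-in for DagState)
// nor suspended (payload already held by the block manager). Types are simplified
// stand-ins, not the crate's real `BlockRef`/`DagState`.
use std::collections::BTreeSet;

// Simplified stand-in for a block reference: (round, author, digest).
type Ref = (u32, u32, u64);

fn try_find_blocks(
    accepted: &BTreeSet<Ref>,    // blocks known to DagState
    suspended: &BTreeSet<Ref>,   // blocks held while waiting for their ancestors
    missing: &mut BTreeSet<Ref>, // accumulated refs scheduled for fetching
    mut block_refs: Vec<Ref>,
) -> BTreeSet<Ref> {
    // Processing in round order mirrors the sort in the real code.
    block_refs.sort_by_key(|r| r.0);
    let mut newly_missing = BTreeSet::new();
    for r in block_refs {
        if accepted.contains(&r) || suspended.contains(&r) {
            continue;
        }
        newly_missing.insert(r);
        // `insert` returning true means this ref was not already tracked as missing,
        // which is when the per-authority metric would be bumped.
        let _first_time = missing.insert(r);
    }
    newly_missing
}

fn main() {
    let accepted: BTreeSet<Ref> = [(1, 0, 11)].into_iter().collect();
    let suspended: BTreeSet<Ref> = [(2, 1, 22)].into_iter().collect();
    let mut missing = BTreeSet::new();
    let found = try_find_blocks(
        &accepted,
        &suspended,
        &mut missing,
        vec![(1, 0, 11), (2, 1, 22), (3, 2, 33)],
    );
    assert_eq!(found.len(), 1); // only (3, 2, 33) needs fetching
}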
+ let (accepted_blocks, missing) = block_manager.try_accept_blocks(round_2_blocks.clone()); + assert!(accepted_blocks.is_empty()); + + let missing_block_refs = round_2_blocks.first().unwrap().ancestors(); + let missing_block_refs_from_accept = + missing_block_refs.iter().cloned().collect::>(); + assert_eq!(missing, missing_block_refs_from_accept); + assert_eq!( + block_manager.missing_blocks(), + missing_block_refs_from_accept + ); + + // No blocks should be accepted and block manager should have made note + // of the missing & suspended blocks. + // Now we can check get the result of try find block with all of the blocks + // from newly created but not accepted round 3. + dag_builder.layer(3).build(); + + let round_3_blocks = dag_builder + .blocks + .iter() + .filter_map(|(_, block)| (block.round() == 3).then_some(block.reference())) + .collect::>(); + + let missing_block_refs_from_find = block_manager.try_find_blocks( + round_2_blocks + .iter() + .map(|b| b.reference()) + .chain(round_3_blocks.into_iter()) + .collect(), + ); + + assert_eq!(missing_block_refs_from_find.len(), 4); + assert!(missing_block_refs_from_find + .iter() + .all(|block_ref| block_ref.round == 3)); + assert_eq!( + block_manager.missing_blocks(), + missing_block_refs_from_accept + .into_iter() + .chain(missing_block_refs_from_find.into_iter()) + .collect() + ); + } } diff --git a/consensus/core/src/broadcaster.rs b/consensus/core/src/broadcaster.rs index ee3b9b0e1d77c..c8298b482b985 100644 --- a/consensus/core/src/broadcaster.rs +++ b/consensus/core/src/broadcaster.rs @@ -17,7 +17,7 @@ use tokio::{ use tracing::{trace, warn}; use crate::{ - block::{BlockAPI as _, VerifiedBlock}, + block::{BlockAPI as _, ExtendedBlock, VerifiedBlock}, context::Context, core::CoreSignalsReceivers, error::ConsensusResult, @@ -74,7 +74,7 @@ impl Broadcaster { async fn push_blocks( context: Arc, network_client: Arc, - mut rx_block_broadcast: broadcast::Receiver, + mut rx_block_broadcast: broadcast::Receiver, peer: AuthorityIndex, ) { let peer_hostname = &context.committee.authority(peer).hostname; @@ -126,7 +126,8 @@ impl Broadcaster { tokio::select! { result = rx_block_broadcast.recv(), if requests.len() < BROADCAST_CONCURRENCY => { let block = match result { - Ok(block) => block, + // Other info from ExtendedBlock are ignored, because Broadcaster is not used in production. 
+ Ok(block) => block.block, Err(broadcast::error::RecvError::Closed) => { trace!("Sender to {peer} is shutting down!"); return; @@ -196,7 +197,7 @@ mod test { use super::*; use crate::{ - block::{BlockRef, TestBlock}, + block::{BlockRef, ExtendedBlock, TestBlock}, commit::CommitRange, core::CoreSignals, network::BlockStream, @@ -295,7 +296,12 @@ mod test { let block = VerifiedBlock::new_for_test(TestBlock::new(9, 1).build()); assert!( - core_signals.new_block(block.clone()).is_ok(), + core_signals + .new_block(ExtendedBlock { + block: block.clone(), + excluded_ancestors: vec![], + }) + .is_ok(), "No subscriber active to receive the block" ); diff --git a/consensus/core/src/core.rs b/consensus/core/src/core.rs index 7aae08598d537..27faee49f1ff5 100644 --- a/consensus/core/src/core.rs +++ b/consensus/core/src/core.rs @@ -21,8 +21,8 @@ use tracing::{debug, info, trace, warn}; use crate::{ ancestor::{AncestorState, AncestorStateManager}, block::{ - Block, BlockAPI, BlockRef, BlockTimestampMs, BlockV1, Round, SignedBlock, Slot, - VerifiedBlock, GENESIS_ROUND, + Block, BlockAPI, BlockRef, BlockTimestampMs, BlockV1, ExtendedBlock, Round, SignedBlock, + Slot, VerifiedBlock, GENESIS_ROUND, }, block_manager::BlockManager, commit::CommittedSubDag, @@ -197,7 +197,7 @@ impl Core { .get_last_cached_block_per_authority(Round::MAX); let max_ancestor_timestamp = ancestor_blocks .iter() - .fold(0, |ts, b| ts.max(b.timestamp_ms())); + .fold(0, |ts, (b, _)| ts.max(b.timestamp_ms())); let wait_ms = max_ancestor_timestamp.saturating_sub(self.context.clock.timestamp_utc_ms()); if wait_ms > 0 { warn!( @@ -223,7 +223,12 @@ impl Core { } // if no new block proposed then just re-broadcast the last proposed one to ensure liveness. - self.signals.new_block(last_proposed_block.clone()).unwrap(); + self.signals + .new_block(ExtendedBlock { + block: last_proposed_block.clone(), + excluded_ancestors: vec![], + }) + .unwrap(); last_proposed_block }; @@ -237,7 +242,6 @@ impl Core { /// Processes the provided blocks and accepts them if possible when their causal history exists. /// The method returns: - /// - The references of accepted blocks /// - The references of ancestors missing their block pub(crate) fn add_blocks( &mut self, @@ -288,6 +292,39 @@ impl Core { Ok(missing_block_refs) } + /// Checks if provided block refs have been accepted. If not, missing block refs are kept for synchronizations. + /// Returns the references of missing blocks among the input blocks. + pub(crate) fn check_block_refs( + &mut self, + block_refs: Vec, + ) -> ConsensusResult> { + let _scope = monitored_scope("Core::check_block_refs"); + let _s = self + .context + .metrics + .node_metrics + .scope_processing_time + .with_label_values(&["Core::check_block_refs"]) + .start_timer(); + self.context + .metrics + .node_metrics + .core_check_block_refs_batch_size + .observe(block_refs.len() as f64); + + // Try to find them via the block manager + let missing_block_refs = self.block_manager.try_find_blocks(block_refs); + + if !missing_block_refs.is_empty() { + trace!( + "Missing block refs: {}", + missing_block_refs.iter().map(|b| b.to_string()).join(", ") + ); + } + + Ok(missing_block_refs) + } + /// Adds/processed all the newly `accepted_blocks`. We basically try to move the threshold clock and add them to the /// pending ancestors list. 
fn add_accepted_blocks(&mut self, accepted_blocks: Vec) { @@ -336,21 +373,21 @@ impl Core { if !self.should_propose() { return Ok(None); } - if let Some(block) = self.try_new_block(force) { - self.signals.new_block(block.clone())?; + if let Some(extended_block) = self.try_new_block(force) { + self.signals.new_block(extended_block.clone())?; fail_point!("consensus-after-propose"); // The new block may help commit. self.try_commit()?; - return Ok(Some(block)); + return Ok(Some(extended_block.block)); } Ok(None) } /// Attempts to propose a new block for the next round. If a block has already proposed for latest /// or earlier round, then no block is created and None is returned. - fn try_new_block(&mut self, force: bool) -> Option { + fn try_new_block(&mut self, force: bool) -> Option { let _s = self .context .metrics @@ -387,7 +424,7 @@ impl Core { // Determine the ancestors to be included in proposal. // Smart ancestor selection requires distributed scoring to be enabled. - let ancestors = if self + let (ancestors, excluded_ancestors) = if self .context .protocol_config .consensus_distributed_vote_scoring_strategy() @@ -396,7 +433,8 @@ impl Core { .protocol_config .consensus_smart_ancestor_selection() { - let ancestors = self.smart_ancestors_to_propose(clock_round, !force); + let (ancestors, excluded_and_equivocating_ancestors) = + self.smart_ancestors_to_propose(clock_round, !force); // If we did not find enough good ancestors to propose, continue to wait before proposing. if ancestors.is_empty() { @@ -406,9 +444,22 @@ impl Core { ); return None; } - ancestors + + let excluded_ancestors_limit = self.context.committee.size() * 2; + if excluded_and_equivocating_ancestors.len() > excluded_ancestors_limit { + debug!( + "Dropping {} excluded ancestor(s) during proposal due to size limit", + excluded_and_equivocating_ancestors.len() - excluded_ancestors_limit, + ); + } + let excluded_ancestors = excluded_and_equivocating_ancestors + .into_iter() + .take(excluded_ancestors_limit) + .collect(); + + (ancestors, excluded_ancestors) } else { - self.ancestors_to_propose(clock_round) + (self.ancestors_to_propose(clock_round), vec![]) }; // Update the last included ancestor block refs @@ -545,7 +596,10 @@ impl Core { .with_label_values(&[&force.to_string()]) .inc(); - Some(verified_block) + Some(ExtendedBlock { + block: verified_block, + excluded_ancestors, + }) } /// Runs commit rule to attempt to commit additional blocks from the DAG. @@ -804,20 +858,20 @@ impl Core { // Propose only ancestors of higher rounds than what has already been proposed. // And always include own last proposed block first among ancestors. 
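// --- Illustrative sketch (not part of this diff) ---
// The proposal path above (and the receive path in `handle_send_block`) caps the
// excluded-ancestor list at twice the committee size before attaching it to a block.
// A minimal, hypothetical version of that cap:
fn cap_excluded<T>(excluded: Vec<T>, committee_size: usize) -> Vec<T> {
    let limit = committee_size * 2;
    if excluded.len() > limit {
        // The real code logs how many entries were dropped here.
        eprintln!(
            "dropping {} excluded ancestor(s) due to size limit",
            excluded.len() - limit
        );
    }
    excluded.into_iter().take(limit).collect()
}

fn main() {
    let capped = cap_excluded((0..20).collect::<Vec<u32>>(), 4);
    assert_eq!(capped.len(), 8); // committee of 4 => at most 8 excluded ancestors
}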
- let last_proposed_block = ancestors[self.context.own_index].clone(); + let (last_proposed_block, _) = ancestors[self.context.own_index].clone(); assert_eq!(last_proposed_block.author(), self.context.own_index); let ancestors = iter::once(last_proposed_block) .chain( ancestors .into_iter() - .filter(|block| block.author() != self.context.own_index) - .filter(|block| { + .filter(|(block, _)| block.author() != self.context.own_index) + .filter(|(block, _)| { if gc_enabled && gc_round > GENESIS_ROUND { return block.round() > gc_round; } true }) - .flat_map(|block| { + .flat_map(|(block, _)| { if let Some(last_block_ref) = self.last_included_ancestors[block.author()] { return (last_block_ref.round < block.round()).then_some(block); } @@ -846,7 +900,7 @@ impl Core { &mut self, clock_round: Round, smart_select: bool, - ) -> Vec { + ) -> (Vec, BTreeSet) { let node_metrics = &self.context.metrics.node_metrics; let _s = node_metrics .scope_processing_time @@ -854,13 +908,13 @@ impl Core { .start_timer(); // Now take the ancestors before the clock_round (excluded) for each authority. - let ancestors = self + let all_ancestors = self .dag_state .read() .get_last_cached_block_per_authority(clock_round); assert_eq!( - ancestors.len(), + all_ancestors.len(), self.context.committee.size(), "Fatal error, number of returned ancestors don't match committee size." ); @@ -871,7 +925,8 @@ impl Core { let quorum_round = clock_round.saturating_sub(1); - let mut temp_excluded_ancestors = Vec::new(); + let mut score_and_pending_excluded_ancestors = Vec::new(); + let mut excluded_and_equivocating_ancestors = BTreeSet::new(); // Propose only ancestors of higher rounds than what has already been proposed. // And always include own last proposed block first among ancestors. @@ -879,29 +934,35 @@ impl Core { // will be included in a second pass below. 
let included_ancestors = iter::once(self.last_proposed_block().clone()) .chain( - ancestors + all_ancestors .into_iter() - .filter(|ancestor| ancestor.author() != self.context.own_index) - .flat_map(|ancestor| { + .flat_map(|(ancestor, equivocating_ancestors)| { + if ancestor.author() == self.context.own_index { + return None; + } + if let Some(last_block_ref) = + self.last_included_ancestors[ancestor.author()] + { + if last_block_ref.round >= ancestor.round() { + return None; + } + } - let ancestor_state = ancestor_state_map[ancestor.author()]; + // We will never include equivocating ancestors so add them immediately + excluded_and_equivocating_ancestors.extend(equivocating_ancestors); + let ancestor_state = ancestor_state_map[ancestor.author()]; match ancestor_state { AncestorState::Include => { trace!("Found ancestor {ancestor} with INCLUDE state for round {clock_round}"); } AncestorState::Exclude(score) => { trace!("Added ancestor {ancestor} with EXCLUDE state with score {score} to temporary excluded ancestors for round {clock_round}"); - temp_excluded_ancestors.push((score, ancestor)); + score_and_pending_excluded_ancestors.push((score, ancestor)); return None; } } - if let Some(last_block_ref) = - self.last_included_ancestors[ancestor.author()] - { - return (last_block_ref.round < ancestor.round()).then_some(ancestor); - } Some(ancestor) }), ) @@ -920,22 +981,21 @@ impl Core { if smart_select && !parent_round_quorum.reached_threshold(&self.context.committee) { node_metrics.smart_selection_wait.inc(); debug!("Only found {} stake of good ancestors to include for round {clock_round}, will wait for more.", parent_round_quorum.stake()); - return vec![]; + return (vec![], BTreeSet::new()); } - // Sort scores descending so we can include the best of the temp excluded + // Sort scores descending so we can include the best of the pending excluded // ancestors first until we reach the threshold. - temp_excluded_ancestors.sort_by(|a, b| b.0.cmp(&a.0)); + score_and_pending_excluded_ancestors.sort_by(|a, b| b.0.cmp(&a.0)); let mut ancestors_to_propose = included_ancestors; let mut excluded_ancestors = Vec::new(); - - for (score, ancestor) in temp_excluded_ancestors.into_iter() { + for (score, ancestor) in score_and_pending_excluded_ancestors.into_iter() { let block_hostname = &self.context.committee.authority(ancestor.author()).hostname; if !parent_round_quorum.reached_threshold(&self.context.committee) && ancestor.round() == quorum_round { - debug!("Including temporarily excluded strong link ancestor {ancestor} with score {score} to propose for round {clock_round}"); + debug!("Including temporarily excluded parent round ancestor {ancestor} with score {score} to propose for round {clock_round}"); parent_round_quorum.add(ancestor.author(), &self.context.committee); ancestors_to_propose.push(ancestor); node_metrics @@ -948,8 +1008,8 @@ impl Core { } // Include partially propagated blocks from excluded authorities, to help propagate the blocks - // across the network with less latency impact. - // TODO: use a separate mechanism to propagate excluded ancestor blocks and remove this logic. + // across the network with less latency impact. Other excluded ancestors are not included in the block + // but still broadcasted to peers. 
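// --- Illustrative sketch (not part of this diff) ---
// Condenses the selection above: equivocating refs never get included (they go straight
// into the broadcast-only excluded set), INCLUDE-state ancestors are proposed directly,
// and EXCLUDE-state ancestors are held back, sorted by score, and re-included only while
// the parent round still lacks a quorum. Names and the quorum rule are simplified stand-ins.
struct Candidate { id: usize, round: u32, score: Option<u64>, stake: u64 } // score=None => INCLUDE

fn select(candidates: Vec<Candidate>, parent_round: u32, quorum_stake: u64) -> (Vec<usize>, Vec<usize>) {
    let (included, mut pending): (Vec<_>, Vec<_>) =
        candidates.into_iter().partition(|c| c.score.is_none());
    let mut stake: u64 = included.iter().map(|c| c.stake).sum();
    let mut proposed: Vec<usize> = included.iter().map(|c| c.id).collect();
    // Best-scored excluded ancestors at the parent round are pulled back in until quorum.
    pending.sort_by_key(|c| std::cmp::Reverse(c.score.unwrap_or(0)));
    let mut broadcast_only = Vec::new();
    for c in pending {
        if stake < quorum_stake && c.round == parent_round {
            stake += c.stake;
            proposed.push(c.id);
        } else {
            broadcast_only.push(c.id); // attached to the block as an excluded ancestor instead
        }
    }
    (proposed, broadcast_only)
}

fn main() {
    let cands = vec![
        Candidate { id: 0, round: 4, score: None, stake: 1 },
        Candidate { id: 1, round: 4, score: Some(10), stake: 1 },
        Candidate { id: 2, round: 4, score: Some(5), stake: 1 },
        Candidate { id: 3, round: 3, score: Some(99), stake: 1 },
    ];
    let (proposed, excluded) = select(cands, 4, 3);
    assert_eq!(proposed, vec![0, 1, 2]); // quorum reached with best-scored round-4 candidates
    assert_eq!(excluded, vec![3]);       // stale candidate is only broadcast as excluded
}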
for (score, ancestor) in excluded_ancestors.iter() { let excluded_author = ancestor.author(); let block_hostname = &self.context.committee.authority(excluded_author).hostname; @@ -966,11 +1026,16 @@ impl Core { let last_included_round = self.last_included_ancestors[excluded_author] .map(|block_ref| block_ref.round) .unwrap_or(GENESIS_ROUND); - if last_included_round >= accepted_low_quorum_round { - trace!( - "Excluded low score ancestor {} with score {score} to propose for round {clock_round}: last included round {} >= accepted low quorum round {}", - ancestor.reference(), last_included_round, accepted_low_quorum_round, - ); + if ancestor.round() <= last_included_round { + // This should have already been filtered out when filtering all_ancestors. + // Still, ensure previously included ancestors are filtered out. + continue; + } + + // This ancestor has not propagated well, so it cannot be included / voted on. + if ancestor.round() > accepted_low_quorum_round { + excluded_and_equivocating_ancestors.insert(ancestor.reference()); + trace!("Excluded low score ancestor {} with score {score} to propose for round {clock_round}: too few validators have seen it", ancestor.reference()); node_metrics .excluded_proposal_ancestors_count_by_authority .with_label_values(&[block_hostname]) @@ -978,25 +1043,8 @@ impl Core { continue; } - // Include the ancestor block as it has been seen & accepted by a strong quorum. - let ancestor = if ancestor.round() == accepted_low_quorum_round { - ancestor.clone() - } else { - // Only cached blocks need to be propagated. Committed and GC'ed blocks do not need to be propagated. - let Some(ancestor) = self.dag_state.read().get_last_cached_block_in_range( - excluded_author, - last_included_round + 1, - accepted_low_quorum_round + 1, - ) else { - trace!("Excluded low score ancestor {} with score {score} to propose for round {clock_round}: no suitable block found", ancestor.reference()); - node_metrics - .excluded_proposal_ancestors_count_by_authority - .with_label_values(&[block_hostname]) - .inc(); - continue; - }; - ancestor - }; + // Otherwise, include the ancestor block as it has been seen & accepted by a strong quorum. + // Only cached blocks need to be propagated. Committed and GC'ed blocks do not need to be propagated. self.last_included_ancestors[excluded_author] = Some(ancestor.reference()); ancestors_to_propose.push(ancestor.clone()); trace!("Included low scoring ancestor {} with score {score} seen at accepted low quorum round {accepted_low_quorum_round} to propose for round {clock_round}", ancestor.reference()); @@ -1009,12 +1057,12 @@ impl Core { assert!(parent_round_quorum.reached_threshold(&self.context.committee), "Fatal error, quorum not reached for parent round when proposing for round {clock_round}. Possible mismatch between DagState and Core."); info!( - "Included {} ancestors & excluded {} ancestors for proposal in round {clock_round}", + "Included {} ancestors & excluded {} low performing or equivocating ancestors for proposal in round {clock_round}", ancestors_to_propose.len(), - excluded_ancestors.len() + excluded_and_equivocating_ancestors.len() ); - ancestors_to_propose + (ancestors_to_propose, excluded_and_equivocating_ancestors) } /// Checks whether all the leaders of the round exist. @@ -1063,7 +1111,7 @@ impl Core { /// Senders of signals from Core, for outputs and events (ex new block produced). 
pub(crate) struct CoreSignals { - tx_block_broadcast: broadcast::Sender, + tx_block_broadcast: broadcast::Sender, new_round_sender: watch::Sender, context: Arc, } @@ -1073,7 +1121,7 @@ impl CoreSignals { // Blocks buffered in broadcast channel should be roughly equal to thosed cached in dag state, // since the underlying blocks are ref counted so a lower buffer here will not reduce memory // usage significantly. - let (tx_block_broadcast, rx_block_broadcast) = broadcast::channel::( + let (tx_block_broadcast, rx_block_broadcast) = broadcast::channel::( context.parameters.dag_state_cached_rounds as usize, ); let (new_round_sender, new_round_receiver) = watch::channel(0); @@ -1094,21 +1142,23 @@ impl CoreSignals { /// Sends a signal to all the waiters that a new block has been produced. The method will return /// true if block has reached even one subscriber, false otherwise. - pub(crate) fn new_block(&self, block: VerifiedBlock) -> ConsensusResult<()> { + pub(crate) fn new_block(&self, extended_block: ExtendedBlock) -> ConsensusResult<()> { // When there is only one authority in committee, it is unnecessary to broadcast // the block which will fail anyway without subscribers to the signal. if self.context.committee.size() > 1 { - if block.round() == GENESIS_ROUND { + if extended_block.block.round() == GENESIS_ROUND { debug!("Ignoring broadcasting genesis block to peers"); return Ok(()); } - if let Err(err) = self.tx_block_broadcast.send(block) { + if let Err(err) = self.tx_block_broadcast.send(extended_block) { warn!("Couldn't broadcast the block to any receiver: {err}"); return Err(ConsensusError::Shutdown); } } else { - debug!("Did not broadcast block {block:?} to receivers as committee size is <= 1"); + debug!( + "Did not broadcast block {extended_block:?} to receivers as committee size is <= 1" + ); } Ok(()) } @@ -1123,12 +1173,12 @@ impl CoreSignals { /// Receivers of signals from Core. /// Intentionally un-clonable. Comonents should only subscribe to channels they need. pub(crate) struct CoreSignalsReceivers { - rx_block_broadcast: broadcast::Receiver, + rx_block_broadcast: broadcast::Receiver, new_round_receiver: watch::Receiver, } impl CoreSignalsReceivers { - pub(crate) fn block_broadcast_receiver(&self) -> broadcast::Receiver { + pub(crate) fn block_broadcast_receiver(&self) -> broadcast::Receiver { self.rx_block_broadcast.resubscribe() } @@ -1155,7 +1205,7 @@ pub(crate) fn create_cores(context: Context, authorities: Vec) -> Vec, + pub block_receiver: broadcast::Receiver, #[allow(unused)] pub commit_receiver: UnboundedReceiver, pub store: Arc, @@ -1341,8 +1391,8 @@ mod test { .recv() .await .expect("A block should have been created"); - assert_eq!(proposed_block.round(), 5); - let ancestors = proposed_block.ancestors(); + assert_eq!(proposed_block.block.round(), 5); + let ancestors = proposed_block.block.ancestors(); // Only ancestors of round 4 should be included. assert_eq!(ancestors.len(), 4); @@ -1463,8 +1513,8 @@ mod test { .recv() .await .expect("A block should have been created"); - assert_eq!(proposed_block.round(), 4); - let ancestors = proposed_block.ancestors(); + assert_eq!(proposed_block.block.round(), 4); + let ancestors = proposed_block.block.ancestors(); assert_eq!(ancestors.len(), 4); for ancestor in ancestors { @@ -1562,18 +1612,18 @@ mod test { } // a new block should have been created during recovery. 
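// --- Illustrative sketch (not part of this diff) ---
// The signal path above now carries an ExtendedBlock (block plus excluded ancestor refs)
// over a tokio broadcast channel, so every broadcaster/subscriber task sees both the
// proposed block and the refs it deliberately left out. A minimal stand-in with
// simplified types:
use tokio::sync::broadcast;

#[derive(Clone, Debug)]
struct ExtendedBlockLite {
    block: Vec<u8>,               // serialized block bytes in the real code
    excluded_ancestors: Vec<u64>, // BlockRefs in the real code
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = broadcast::channel::<ExtendedBlockLite>(16);
    // Each peer-facing task resubscribes to get its own receiver, as AuthorityService does.
    let mut rx2 = tx.subscribe();

    tx.send(ExtendedBlockLite { block: vec![1, 2, 3], excluded_ancestors: vec![42] })
        .expect("at least one receiver is alive");

    let got = rx.recv().await.unwrap();
    assert_eq!(got.excluded_ancestors, vec![42]);
    assert_eq!(rx2.recv().await.unwrap().block, vec![1, 2, 3]);
}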
- let block = block_receiver + let extended_block = block_receiver .recv() .await .expect("A new block should have been created"); // A new block created - assert the details - assert_eq!(block.round(), 1); - assert_eq!(block.author().value(), 0); - assert_eq!(block.ancestors().len(), 4); + assert_eq!(extended_block.block.round(), 1); + assert_eq!(extended_block.block.author().value(), 0); + assert_eq!(extended_block.block.ancestors().len(), 4); let mut total = 0; - for (i, transaction) in block.transactions().iter().enumerate() { + for (i, transaction) in extended_block.block.transactions().iter().enumerate() { total += transaction.data().len() as u64; let transaction: String = bcs::from_bytes(transaction.data()).unwrap(); assert_eq!(format!("Transaction {i}"), transaction); @@ -1583,7 +1633,7 @@ mod test { // genesis blocks should be referenced let all_genesis = genesis_blocks(context); - for ancestor in block.ancestors() { + for ancestor in extended_block.block.ancestors() { all_genesis .iter() .find(|block| block.reference() == *ancestor) @@ -2439,15 +2489,18 @@ mod test { assert_eq!(new_round, round); // Check that a new block has been proposed. - let block = tokio::time::timeout( + let extended_block = tokio::time::timeout( Duration::from_secs(1), core_fixture.block_receiver.recv(), ) .await .unwrap() .unwrap(); - assert_eq!(block.round(), round); - assert_eq!(block.author(), core_fixture.core.context.own_index); + assert_eq!(extended_block.block.round(), round); + assert_eq!( + extended_block.block.author(), + core_fixture.core.context.own_index + ); // append the new block to this round blocks this_round_blocks.push(core_fixture.core.last_proposed_block().clone()); @@ -2564,15 +2617,18 @@ mod test { assert_eq!(new_round, round); // Check that a new block has been proposed. - let block = tokio::time::timeout( + let extended_block = tokio::time::timeout( Duration::from_secs(1), core_fixture.block_receiver.recv(), ) .await .unwrap() .unwrap(); - assert_eq!(block.round(), round); - assert_eq!(block.author(), core_fixture.core.context.own_index); + assert_eq!(extended_block.block.round(), round); + assert_eq!( + extended_block.block.author(), + core_fixture.core.context.own_index + ); // append the new block to this round blocks this_round_blocks.push(core_fixture.core.last_proposed_block().clone()); @@ -2698,15 +2754,18 @@ mod test { assert_eq!(new_round, round); // Check that a new block has been proposed. - let block = tokio::time::timeout( + let extended_block = tokio::time::timeout( Duration::from_secs(1), core_fixture.block_receiver.recv(), ) .await .unwrap() .unwrap(); - assert_eq!(block.round(), round); - assert_eq!(block.author(), core_fixture.core.context.own_index); + assert_eq!(extended_block.block.round(), round); + assert_eq!( + extended_block.block.author(), + core_fixture.core.context.own_index + ); // append the new block to this round blocks this_round_blocks.push(core_fixture.core.last_proposed_block().clone()); @@ -2850,15 +2909,18 @@ mod test { assert_eq!(new_round, round); // Check that a new block has been proposed. 
- let block = tokio::time::timeout( + let extended_block = tokio::time::timeout( Duration::from_secs(1), core_fixture.block_receiver.recv(), ) .await .unwrap() .unwrap(); - assert_eq!(block.round(), round); - assert_eq!(block.author(), core_fixture.core.context.own_index); + assert_eq!(extended_block.block.round(), round); + assert_eq!( + extended_block.block.author(), + core_fixture.core.context.own_index + ); // append the new block to this round blocks this_round_blocks.push(core_fixture.core.last_proposed_block().clone()); @@ -2982,15 +3044,18 @@ mod test { assert_eq!(new_round, round); // Check that a new block has been proposed. - let block = tokio::time::timeout( + let extended_block = tokio::time::timeout( Duration::from_secs(1), core_fixture.block_receiver.recv(), ) .await .unwrap() .unwrap(); - assert_eq!(block.round(), round); - assert_eq!(block.author(), core_fixture.core.context.own_index); + assert_eq!(extended_block.block.round(), round); + assert_eq!( + extended_block.block.author(), + core_fixture.core.context.own_index + ); // append the new block to this round blocks this_round_blocks.push(core_fixture.core.last_proposed_block().clone()); diff --git a/consensus/core/src/core_thread.rs b/consensus/core/src/core_thread.rs index d76a30d059aa7..87d1ea5215d38 100644 --- a/consensus/core/src/core_thread.rs +++ b/consensus/core/src/core_thread.rs @@ -36,6 +36,8 @@ const CORE_THREAD_COMMANDS_CHANNEL_SIZE: usize = 2000; enum CoreThreadCommand { /// Add blocks to be processed and accepted AddBlocks(Vec, oneshot::Sender>), + /// Checks if block refs exist locally and sync missing ones. + CheckBlockRefs(Vec, oneshot::Sender>), /// Called when the min round has passed or the leader timeout occurred and a block should be produced. /// When the command is called with `force = true`, then the block will be created for `round` skipping /// any checks (ex leader existence of previous round). More information can be found on the `Core` component. 
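// --- Illustrative sketch (not part of this diff) ---
// The new CheckBlockRefs command follows the same dispatch pattern as AddBlocks: the
// caller pushes the command plus a oneshot reply channel onto the core thread's queue
// and awaits the answer. A minimal stand-in of that request/reply loop:
use std::collections::BTreeSet;
use tokio::sync::{mpsc, oneshot};

enum Command {
    // Refs to check, plus where to send back the ones that are missing locally.
    CheckBlockRefs(Vec<u64>, oneshot::Sender<BTreeSet<u64>>),
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<Command>(16);

    // Stand-in for the core thread loop.
    let worker = tokio::spawn(async move {
        let known: BTreeSet<u64> = [1, 2, 3].into_iter().collect();
        while let Some(Command::CheckBlockRefs(refs, reply)) = rx.recv().await {
            let missing = refs.into_iter().filter(|r| !known.contains(r)).collect();
            let _ = reply.send(missing);
        }
    });

    // Stand-in for ChannelCoreThreadDispatcher::check_block_refs.
    let (reply_tx, reply_rx) = oneshot::channel();
    tx.send(Command::CheckBlockRefs(vec![2, 7], reply_tx)).await.unwrap();
    let missing = reply_rx.await.unwrap();
    assert_eq!(missing, [7].into_iter().collect::<BTreeSet<u64>>());

    drop(tx); // closing the channel lets the worker loop exit
    worker.await.unwrap();
}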
@@ -57,6 +59,11 @@ pub trait CoreThreadDispatcher: Sync + Send + 'static { async fn add_blocks(&self, blocks: Vec) -> Result, CoreError>; + async fn check_block_refs( + &self, + block_refs: Vec, + ) -> Result, CoreError>; + async fn new_block(&self, round: Round, force: bool) -> Result<(), CoreError>; async fn get_missing_blocks(&self) -> Result, CoreError>; @@ -121,6 +128,11 @@ impl CoreThread { let missing_block_refs = self.core.add_blocks(blocks)?; sender.send(missing_block_refs).ok(); } + CoreThreadCommand::CheckBlockRefs(blocks, sender) => { + let _scope = monitored_scope("CoreThread::loop::find_excluded_blocks"); + let missing_block_refs = self.core.check_block_refs(blocks)?; + sender.send(missing_block_refs).ok(); + } CoreThreadCommand::NewBlock(round, sender, force) => { let _scope = monitored_scope("CoreThread::loop::new_block"); self.core.new_block(round, force)?; @@ -283,6 +295,21 @@ impl CoreThreadDispatcher for ChannelCoreThreadDispatcher { Ok(missing_block_refs) } + async fn check_block_refs( + &self, + block_refs: Vec, + ) -> Result, CoreError> { + let (sender, receiver) = oneshot::channel(); + self.send(CoreThreadCommand::CheckBlockRefs( + block_refs.clone(), + sender, + )) + .await; + let missing_block_refs = receiver.await.map_err(|e| Shutdown(e.to_string()))?; + + Ok(missing_block_refs) + } + async fn new_block(&self, round: Round, force: bool) -> Result<(), CoreError> { let (sender, receiver) = oneshot::channel(); self.send(CoreThreadCommand::NewBlock(round, sender, force)) @@ -380,6 +407,13 @@ impl CoreThreadDispatcher for MockCoreThreadDispatcher { Ok(BTreeSet::new()) } + async fn check_block_refs( + &self, + _block_refs: Vec, + ) -> Result, CoreError> { + Ok(BTreeSet::new()) + } + async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> { Ok(()) } diff --git a/consensus/core/src/dag_state.rs b/consensus/core/src/dag_state.rs index 3ff804d02da8d..8209bf1d2f0e7 100644 --- a/consensus/core/src/dag_state.rs +++ b/consensus/core/src/dag_state.rs @@ -576,6 +576,7 @@ impl DagState { // Retrieves the cached block within the range [start_round, end_round) from a given authority. // NOTE: end_round must be greater than GENESIS_ROUND. + #[cfg(test)] pub(crate) fn get_last_cached_block_in_range( &self, authority: AuthorityIndex, @@ -608,13 +609,15 @@ impl DagState { /// The method is guaranteed to return results only when the `end_round` is not earlier of the /// available cached data for each authority (evicted round + 1), otherwise the method will panic. /// It's the caller's responsibility to ensure that is not requesting for earlier rounds. - /// In case of equivocation for an authority's last slot only one block will be returned (the last in order). + /// In case of equivocation for an authority's last slot, one block will be returned (the last in order) + /// and the other equivocating blocks will be returned. 
pub(crate) fn get_last_cached_block_per_authority( &self, end_round: Round, - ) -> Vec { - // init with the genesis blocks as fallback + ) -> Vec<(VerifiedBlock, Vec)> { + // Initialize with the genesis blocks as fallback let mut blocks = self.genesis.values().cloned().collect::>(); + let mut equivocating_blocks = vec![vec![]; self.context.committee.size()]; if end_round == GENESIS_ROUND { panic!( @@ -623,7 +626,7 @@ impl DagState { } if end_round == GENESIS_ROUND + 1 { - return blocks; + return blocks.into_iter().map(|b| (b, vec![])).collect(); } for (authority_index, block_refs) in self.recent_refs_by_authority.iter().enumerate() { @@ -638,7 +641,7 @@ impl DagState { panic!("Attempted to request for blocks of rounds < {end_round}, when the last evicted round is {last_evicted_round} for authority {authority_index}", ); } - if let Some(block_ref) = block_refs + let block_ref_iter = block_refs .range(( Included(BlockRef::new( last_evicted_round + 1, @@ -647,18 +650,27 @@ impl DagState { )), Excluded(BlockRef::new(end_round, authority_index, BlockDigest::MIN)), )) - .next_back() - { - let block_info = self - .recent_blocks - .get(block_ref) - .expect("Block should exist in recent blocks"); - - blocks[authority_index] = block_info.block.clone(); + .rev(); + + let mut last_round = 0; + for block_ref in block_ref_iter { + if last_round == 0 { + last_round = block_ref.round; + let block_info = self + .recent_blocks + .get(block_ref) + .expect("Block should exist in recent blocks"); + blocks[authority_index] = block_info.block.clone(); + continue; + } + if block_ref.round < last_round { + break; + } + equivocating_blocks[authority_index].push(*block_ref); } } - blocks.into_iter().collect() + blocks.into_iter().zip(equivocating_blocks).collect() } /// Checks whether a block exists in the slot. The method checks only against the cached data. 
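// --- Illustrative sketch (not part of this diff) ---
// Mirrors the reverse scan above: the newest ref for an authority becomes "the" last
// cached block, and any additional refs sharing that same round are reported alongside
// it as equivocating blocks. Refs are simplified to (round, digest) pairs.
use std::collections::BTreeSet;

fn last_block_and_equivocators(
    refs: &BTreeSet<(u32, u64)>,
    end_round: u32,
) -> Option<((u32, u64), Vec<(u32, u64)>)> {
    let mut iter = refs.range(..(end_round, 0u64)).rev();
    let last = *iter.next()?; // newest ref below end_round
    let mut equivocators = Vec::new();
    for r in iter {
        if r.0 < last.0 {
            break; // older rounds: stop, only the last slot matters here
        }
        equivocators.push(*r);
    }
    Some((last, equivocators))
}

fn main() {
    // Authority produced one block at round 1 and two conflicting blocks at round 2.
    let refs: BTreeSet<(u32, u64)> = [(1, 10), (2, 20), (2, 21)].into_iter().collect();
    let (last, equiv) = last_block_and_equivocators(&refs, 4).unwrap();
    assert_eq!(last, (2, 21));
    assert_eq!(equiv, vec![(2, 20)]);
}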
@@ -2176,8 +2188,15 @@ mod test { let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag"); + // Add equivocating block for round 2 authority 3 + let block = VerifiedBlock::new_for_test(TestBlock::new(2, 2).build()); + // Accept all blocks - for block in dag_builder.all_blocks() { + for block in dag_builder + .all_blocks() + .into_iter() + .chain(std::iter::once(block)) + { dag_state.accept_block(block); } @@ -2192,13 +2211,17 @@ mod test { // WHEN search for the latest blocks let end_round = 4; let expected_rounds = vec![0, 1, 2, 3]; - + let expected_excluded_and_equivocating_blocks = vec![0, 0, 1, 0]; // THEN let last_blocks = dag_state.get_last_cached_block_per_authority(end_round); assert_eq!( - last_blocks.iter().map(|b| b.round()).collect::>(), + last_blocks.iter().map(|b| b.0.round()).collect::>(), expected_rounds ); + assert_eq!( + last_blocks.iter().map(|b| b.1.len()).collect::>(), + expected_excluded_and_equivocating_blocks + ); // THEN for (i, expected_round) in expected_rounds.iter().enumerate() { @@ -2245,7 +2268,7 @@ mod test { // THEN let last_blocks = dag_state.get_last_cached_block_per_authority(end_round); assert_eq!( - last_blocks.iter().map(|b| b.round()).collect::>(), + last_blocks.iter().map(|b| b.0.round()).collect::>(), expected_rounds ); diff --git a/consensus/core/src/leader_timeout.rs b/consensus/core/src/leader_timeout.rs index a8b55a0c648d9..9e4c8898d6657 100644 --- a/consensus/core/src/leader_timeout.rs +++ b/consensus/core/src/leader_timeout.rs @@ -157,6 +157,13 @@ mod tests { todo!() } + async fn check_block_refs( + &self, + _block_refs: Vec, + ) -> Result, CoreError> { + todo!() + } + async fn new_block(&self, round: Round, force: bool) -> Result<(), CoreError> { self.new_block_calls .lock() diff --git a/consensus/core/src/metrics.rs b/consensus/core/src/metrics.rs index 3e15953d96102..cd87f008a9573 100644 --- a/consensus/core/src/metrics.rs +++ b/consensus/core/src/metrics.rs @@ -112,6 +112,7 @@ pub(crate) struct NodeMetrics { pub(crate) blocks_per_commit_count: Histogram, pub(crate) broadcaster_rtt_estimate_ms: IntGaugeVec, pub(crate) core_add_blocks_batch_size: Histogram, + pub(crate) core_check_block_refs_batch_size: Histogram, pub(crate) core_lock_dequeued: IntCounter, pub(crate) core_lock_enqueued: IntCounter, pub(crate) core_skipped_proposals: IntCounterVec, @@ -128,6 +129,9 @@ pub(crate) struct NodeMetrics { pub(crate) synchronizer_missing_blocks_by_authority: IntCounterVec, pub(crate) synchronizer_current_missing_blocks_by_authority: IntGaugeVec, pub(crate) synchronizer_fetched_blocks_by_authority: IntCounterVec, + pub(crate) network_received_excluded_ancestors_from_authority: IntCounterVec, + pub(crate) network_excluded_ancestors_sent_to_fetch: IntCounterVec, + pub(crate) network_excluded_ancestors_count_by_authority: IntCounterVec, pub(crate) invalid_blocks: IntCounterVec, pub(crate) rejected_blocks: IntCounterVec, pub(crate) rejected_future_blocks: IntCounterVec, @@ -286,6 +290,12 @@ impl NodeMetrics { NUM_BUCKETS.to_vec(), registry, ).unwrap(), + core_check_block_refs_batch_size: register_histogram_with_registry!( + "core_check_block_refs_batch_size", + "The number of excluded blocks received from Core for search on a single batch", + NUM_BUCKETS.to_vec(), + registry, + ).unwrap(), core_lock_dequeued: register_int_counter_with_registry!( "core_lock_dequeued", "Number of dequeued core requests", @@ -375,6 +385,24 @@ impl NodeMetrics { &["authority", "type"], registry, ).unwrap(), + network_received_excluded_ancestors_from_authority: 
register_int_counter_vec_with_registry!( + "network_received_excluded_ancestors_from_authority", + "Number of excluded ancestors received from each authority.", + &["authority"], + registry, + ).unwrap(), + network_excluded_ancestors_count_by_authority: register_int_counter_vec_with_registry!( + "network_excluded_ancestors_count_by_authority", + "Total number of excluded ancestors per authority.", + &["authority"], + registry, + ).unwrap(), + network_excluded_ancestors_sent_to_fetch: register_int_counter_vec_with_registry!( + "network_excluded_ancestors_sent_to_fetch", + "Number of excluded ancestors sent to fetch.", + &["authority"], + registry, + ).unwrap(), last_known_own_block_round: register_int_gauge_with_registry!( "last_known_own_block_round", "The highest round of our own block as this has been synced from peers during an amnesia recovery", diff --git a/consensus/core/src/network/anemo_network.rs b/consensus/core/src/network/anemo_network.rs index 36bd12228da0a..52b53ac498871 100644 --- a/consensus/core/src/network/anemo_network.rs +++ b/consensus/core/src/network/anemo_network.rs @@ -35,7 +35,7 @@ use super::{ connection_monitor::{AnemoConnectionMonitor, ConnectionMonitorHandle}, epoch_filter::{AllowedEpoch, EPOCH_HEADER_KEY}, metrics_layer::{MetricsCallbackMaker, MetricsResponseCallback, SizedRequest, SizedResponse}, - BlockStream, NetworkClient, NetworkManager, NetworkService, + BlockStream, ExtendedSerializedBlock, NetworkClient, NetworkManager, NetworkService, }; use crate::{ block::{BlockRef, VerifiedBlock}, @@ -297,6 +297,10 @@ impl ConsensusRpc for AnemoServiceProxy { ) })?; let block = request.into_body().block; + let block = ExtendedSerializedBlock { + block, + excluded_ancestors: vec![], + }; self.service .handle_send_block(index, block) .await diff --git a/consensus/core/src/network/mod.rs b/consensus/core/src/network/mod.rs index 97c2d7e4f794e..ba56a526b6dfa 100644 --- a/consensus/core/src/network/mod.rs +++ b/consensus/core/src/network/mod.rs @@ -24,7 +24,7 @@ use consensus_config::{AuthorityIndex, NetworkKeyPair}; use futures::Stream; use crate::{ - block::{BlockRef, VerifiedBlock}, + block::{BlockRef, ExtendedBlock, VerifiedBlock}, commit::{CommitRange, TrustedCommit}, context::Context, error::ConsensusResult, @@ -54,8 +54,8 @@ pub(crate) mod test_network; pub(crate) mod tonic_network; mod tonic_tls; -/// A stream of serialized blocks returned over the network. -pub(crate) type BlockStream = Pin + Send>>; +/// A stream of serialized filtered blocks returned over the network. +pub(crate) type BlockStream = Pin + Send>>; /// Network client for communicating with peers. /// @@ -132,7 +132,13 @@ pub(crate) trait NetworkService: Send + Sync + 'static { /// Handles the block sent from the peer via either unicast RPC or subscription stream. /// Peer value can be trusted to be a valid authority index. /// But serialized_block must be verified before its contents are trusted. - async fn handle_send_block(&self, peer: AuthorityIndex, block: Bytes) -> ConsensusResult<()>; + /// Excluded ancestors are also included as part of an effort to further propagate + /// blocks to peers despite the current exclusion. + async fn handle_send_block( + &self, + peer: AuthorityIndex, + block: ExtendedSerializedBlock, + ) -> ConsensusResult<()>; /// Handles the subscription request from the peer. /// A stream of newly proposed blocks is returned to the peer. @@ -193,3 +199,30 @@ where /// Stops the network service. 
async fn stop(&mut self); } + +/// Serialized block with extended information from the proposing authority. +#[derive(Clone, PartialEq, Eq, Debug)] +pub(crate) struct ExtendedSerializedBlock { + pub(crate) block: Bytes, + // Serialized BlockRefs that are excluded from the blocks ancestors. + pub(crate) excluded_ancestors: Vec>, +} + +impl From for ExtendedSerializedBlock { + fn from(extended_block: ExtendedBlock) -> Self { + Self { + block: extended_block.block.serialized().clone(), + excluded_ancestors: extended_block + .excluded_ancestors + .iter() + .filter_map(|r| match bcs::to_bytes(r) { + Ok(serialized) => Some(serialized), + Err(e) => { + tracing::debug!("Failed to serialize block ref {:?}: {e:?}", r); + None + } + }) + .collect(), + } + } +} diff --git a/consensus/core/src/network/network_tests.rs b/consensus/core/src/network/network_tests.rs index cef1ff8530c54..0ddb60e7fc9fb 100644 --- a/consensus/core/src/network/network_tests.rs +++ b/consensus/core/src/network/network_tests.rs @@ -12,7 +12,7 @@ use tokio::time::sleep; use super::{ anemo_network::AnemoManager, test_network::TestService, tonic_network::TonicManager, - NetworkClient, NetworkManager, + ExtendedSerializedBlock, NetworkClient, NetworkManager, }; use crate::{ block::{TestBlock, VerifiedBlock}, @@ -52,8 +52,11 @@ impl ManagerBuilder for TonicManagerBuilder { } } -fn block_for_round(round: Round) -> Bytes { - Bytes::from(vec![round as u8; 16]) +fn block_for_round(round: Round) -> ExtendedSerializedBlock { + ExtendedSerializedBlock { + block: Bytes::from(vec![round as u8; 16]), + excluded_ancestors: vec![], + } } fn service_with_own_blocks() -> Arc> { @@ -125,13 +128,19 @@ async fn send_and_receive_blocks_with_auth( assert_eq!(service_0.lock().handle_send_block[0].0.value(), 1); assert_eq!( service_0.lock().handle_send_block[0].1, - test_block_1.serialized(), + ExtendedSerializedBlock { + block: test_block_1.serialized().clone(), + excluded_ancestors: vec![], + }, ); assert_eq!(service_1.lock().handle_send_block.len(), 1); assert_eq!(service_1.lock().handle_send_block[0].0.value(), 0); assert_eq!( service_1.lock().handle_send_block[0].1, - test_block_0.serialized(), + ExtendedSerializedBlock { + block: test_block_0.serialized().clone(), + excluded_ancestors: vec![], + }, ); // `Committee` is generated with the same random seed in Context::new_for_test(), diff --git a/consensus/core/src/network/test_network.rs b/consensus/core/src/network/test_network.rs index 0290a599ab4e9..259b94261d1e2 100644 --- a/consensus/core/src/network/test_network.rs +++ b/consensus/core/src/network/test_network.rs @@ -15,12 +15,14 @@ use crate::{ Round, }; +use super::ExtendedSerializedBlock; + pub(crate) struct TestService { - pub(crate) handle_send_block: Vec<(AuthorityIndex, Bytes)>, + pub(crate) handle_send_block: Vec<(AuthorityIndex, ExtendedSerializedBlock)>, pub(crate) handle_fetch_blocks: Vec<(AuthorityIndex, Vec)>, pub(crate) handle_subscribe_blocks: Vec<(AuthorityIndex, Round)>, pub(crate) handle_fetch_commits: Vec<(AuthorityIndex, CommitRange)>, - pub(crate) own_blocks: Vec, + pub(crate) own_blocks: Vec, } impl TestService { @@ -35,14 +37,18 @@ impl TestService { } #[cfg_attr(msim, allow(dead_code))] - pub(crate) fn add_own_blocks(&mut self, blocks: Vec) { + pub(crate) fn add_own_blocks(&mut self, blocks: Vec) { self.own_blocks.extend(blocks); } } #[async_trait] impl NetworkService for Mutex { - async fn handle_send_block(&self, peer: AuthorityIndex, block: Bytes) -> ConsensusResult<()> { + async fn handle_send_block( + &self, + 
peer: AuthorityIndex, + block: ExtendedSerializedBlock, + ) -> ConsensusResult<()> { let mut state = self.lock(); state.handle_send_block.push((peer, block)); Ok(()) diff --git a/consensus/core/src/network/tonic_network.rs b/consensus/core/src/network/tonic_network.rs index 3f3403c26b896..09bbac5c7e6ad 100644 --- a/consensus/core/src/network/tonic_network.rs +++ b/consensus/core/src/network/tonic_network.rs @@ -32,7 +32,7 @@ use super::{ consensus_service_client::ConsensusServiceClient, consensus_service_server::ConsensusService, }, - BlockStream, NetworkClient, NetworkManager, NetworkService, + BlockStream, ExtendedSerializedBlock, NetworkClient, NetworkManager, NetworkService, }; use crate::{ block::{BlockRef, VerifiedBlock}, @@ -129,7 +129,10 @@ impl NetworkClient for TonicClient { .take_while(|b| futures::future::ready(b.is_ok())) .filter_map(move |b| async move { match b { - Ok(response) => Some(response.block), + Ok(response) => Some(ExtendedSerializedBlock { + block: response.block, + excluded_ancestors: response.excluded_ancestors, + }), Err(e) => { debug!("Network error received from {}: {e:?}", peer); None @@ -448,6 +451,10 @@ impl ConsensusService for TonicServiceProxy { return Err(tonic::Status::internal("PeerInfo not found")); }; let block = request.into_inner().block; + let block = ExtendedSerializedBlock { + block, + excluded_ancestors: vec![], + }; self.service .handle_send_block(peer_index, block) .await @@ -488,7 +495,12 @@ impl ConsensusService for TonicServiceProxy { .handle_subscribe_blocks(peer_index, first_request.last_received_round) .await .map_err(|e| tonic::Status::internal(format!("{e:?}")))? - .map(|block| Ok(SubscribeBlocksResponse { block })); + .map(|block| { + Ok(SubscribeBlocksResponse { + block: block.block, + excluded_ancestors: block.excluded_ancestors, + }) + }); let rate_limited_stream = tokio_stream::StreamExt::throttle(stream, self.context.parameters.min_round_delay / 2) .boxed(); @@ -1032,6 +1044,9 @@ pub(crate) struct SubscribeBlocksRequest { pub(crate) struct SubscribeBlocksResponse { #[prost(bytes = "bytes", tag = "1")] block: Bytes, + // Serialized BlockRefs that are excluded from the blocks ancestors. 
+ #[prost(bytes = "vec", repeated, tag = "2")] + excluded_ancestors: Vec>, } #[derive(Clone, prost::Message)] diff --git a/consensus/core/src/round_prober.rs b/consensus/core/src/round_prober.rs index 6a05d42bb0282..ee43f2462043f 100644 --- a/consensus/core/src/round_prober.rs +++ b/consensus/core/src/round_prober.rs @@ -155,7 +155,7 @@ impl RoundProber { .get_last_cached_block_per_authority(Round::MAX); let local_highest_accepted_rounds = blocks .into_iter() - .map(|block| block.round()) + .map(|(block, _)| block.round()) .collect::>(); let last_proposed_round = local_highest_accepted_rounds[own_index]; @@ -411,6 +411,13 @@ mod test { unimplemented!() } + async fn check_block_refs( + &self, + _block_refs: Vec, + ) -> Result, CoreError> { + unimplemented!() + } + async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> { unimplemented!() } diff --git a/consensus/core/src/subscriber.rs b/consensus/core/src/subscriber.rs index e60c3fad77a3b..e497ea70246bb 100644 --- a/consensus/core/src/subscriber.rs +++ b/consensus/core/src/subscriber.rs @@ -248,11 +248,12 @@ mod test { use super::*; use crate::{ - block::{BlockRef, VerifiedBlock}, + block::BlockRef, commit::CommitRange, error::ConsensusResult, - network::{test_network::TestService, BlockStream}, + network::{test_network::TestService, BlockStream, ExtendedSerializedBlock}, storage::mem_store::MemStore, + VerifiedBlock, }; struct SubscriberTestClient {} @@ -284,7 +285,11 @@ mod test { ) -> ConsensusResult { let block_stream = stream::unfold((), |_| async { sleep(Duration::from_millis(1)).await; - Some((Bytes::from(vec![1u8; 8]), ())) + let block = ExtendedSerializedBlock { + block: Bytes::from(vec![1u8; 8]), + excluded_ancestors: vec![], + }; + Some((block, ())) }) .take(10); Ok(Box::pin(block_stream)) @@ -360,7 +365,13 @@ mod test { assert!(service.handle_send_block.len() >= 100); for (p, block) in service.handle_send_block.iter() { assert_eq!(*p, peer); - assert_eq!(*block, Bytes::from(vec![1u8; 8])); + assert_eq!( + *block, + ExtendedSerializedBlock { + block: Bytes::from(vec![1u8; 8]), + excluded_ancestors: vec![] + } + ); } } } diff --git a/consensus/core/src/synchronizer.rs b/consensus/core/src/synchronizer.rs index 7261a4c6261aa..d538425aacd48 100644 --- a/consensus/core/src/synchronizer.rs +++ b/consensus/core/src/synchronizer.rs @@ -606,7 +606,7 @@ impl Synchronizer>() }
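// --- Illustrative sketch (not part of this diff) ---
// End-to-end shape of the new wire format: the sender BCS-encodes each excluded ancestor
// ref next to the (already serialized) block, and the receiver decodes them and applies
// the 2x-committee-size cap before handing the refs to Core. The ref type here is a
// simplified serde struct standing in for the crate's BlockRef; bcs is the same codec
// the diff uses.
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Clone, Copy, Debug, PartialEq, Eq)]
struct RefLite { round: u32, author: u32, digest: [u8; 8] }

struct ExtendedSerializedLite { block: Vec<u8>, excluded_ancestors: Vec<Vec<u8>> }

fn encode(block: Vec<u8>, excluded: &[RefLite]) -> ExtendedSerializedLite {
    ExtendedSerializedLite {
        block,
        // Refs that fail to serialize are dropped with a log in the real code.
        excluded_ancestors: excluded.iter().filter_map(|r| bcs::to_bytes(r).ok()).collect(),
    }
}

fn decode_excluded(
    msg: &ExtendedSerializedLite,
    committee_size: usize,
) -> Result<Vec<RefLite>, bcs::Error> {
    let mut refs = msg
        .excluded_ancestors
        .iter()
        .map(|bytes| bcs::from_bytes::<RefLite>(bytes))
        .collect::<Result<Vec<_>, _>>()?;
    refs.truncate(committee_size * 2); // receive-side cap, mirroring handle_send_block
    Ok(refs)
}

fn main() {
    let r = RefLite { round: 7, author: 2, digest: [0xab; 8] };
    let msg = encode(vec![1, 2, 3], &[r]);
    let decoded = decode_excluded(&msg, 4).unwrap();
    assert_eq!(decoded, vec![r]);
}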