From ab25d1ab1dc51d48cf78b2f8070c74aa8a7a860e Mon Sep 17 00:00:00 2001 From: MW Tian Date: Wed, 23 Oct 2024 17:09:48 -0700 Subject: [PATCH] Send certified blocks to fast path. --- consensus/core/src/authority_node.rs | 46 +++-- consensus/core/src/block.rs | 20 ++ consensus/core/src/commit.rs | 1 + consensus/core/src/commit_consumer.rs | 19 +- consensus/core/src/commit_observer.rs | 49 +++-- consensus/core/src/core.rs | 27 ++- consensus/core/src/dag_state.rs | 187 ++++++++++++++++-- consensus/core/src/lib.rs | 2 +- consensus/core/src/stake_aggregator.rs | 38 +++- crates/sui-core/src/consensus_handler.rs | 38 ++-- .../consensus_manager/mysticeti_manager.rs | 4 +- 11 files changed, 349 insertions(+), 82 deletions(-) diff --git a/consensus/core/src/authority_node.rs b/consensus/core/src/authority_node.rs index 1b8b818b797216..98c4c22e23d625 100644 --- a/consensus/core/src/authority_node.rs +++ b/consensus/core/src/authority_node.rs @@ -426,7 +426,10 @@ mod tests { use super::*; use crate::block::GENESIS_ROUND; - use crate::{block::BlockAPI as _, transaction::NoopTransactionVerifier, CommittedSubDag}; + use crate::{ + block::BlockAPI as _, block::BlockOutputBatch, transaction::NoopTransactionVerifier, + CommittedSubDag, + }; #[rstest] #[tokio::test] @@ -490,12 +493,13 @@ mod tests { .map(|_| TempDir::new().unwrap()) .collect::>(); - let mut output_receivers = Vec::with_capacity(committee.size()); + let mut commit_receivers = Vec::with_capacity(committee.size()); + let mut block_receivers = Vec::with_capacity(committee.size()); let mut authorities = Vec::with_capacity(committee.size()); let mut boot_counters = [0; NUM_OF_AUTHORITIES]; for (index, _authority_info) in committee.authorities() { - let (authority, receiver) = make_authority( + let (authority, commit_receiver, block_receiver) = make_authority( index, &temp_dirs[index.value()], committee.clone(), @@ -506,7 +510,8 @@ mod tests { ) .await; boot_counters[index] += 1; - output_receivers.push(receiver); + commit_receivers.push(commit_receiver); + block_receivers.push(block_receiver); authorities.push(authority); } @@ -522,7 +527,7 @@ mod tests { .unwrap(); } - for receiver in &mut output_receivers { + for receiver in &mut commit_receivers { let mut expected_transactions = submitted_transactions.clone(); loop { let committed_subdag = @@ -552,7 +557,7 @@ mod tests { sleep(Duration::from_secs(10)).await; // Restart authority 1 and let it run. - let (authority, receiver) = make_authority( + let (authority, commit_receiver, block_receiver) = make_authority( index, &temp_dirs[index.value()], committee.clone(), @@ -563,7 +568,8 @@ mod tests { ) .await; boot_counters[index] += 1; - output_receivers[index] = receiver; + commit_receivers[index] = commit_receiver; + block_receivers[index] = block_receiver; authorities.insert(index.value(), authority); sleep(Duration::from_secs(10)).await; @@ -585,7 +591,8 @@ mod tests { const NUM_OF_AUTHORITIES: usize = 4; let (committee, keypairs) = local_committee_and_keys(0, [1; NUM_OF_AUTHORITIES].to_vec()); - let mut output_receivers = vec![]; + let mut commit_receivers = vec![]; + let mut block_receivers = vec![]; let mut authorities = BTreeMap::new(); let mut temp_dirs = BTreeMap::new(); let mut boot_counters = [0; NUM_OF_AUTHORITIES]; @@ -595,7 +602,7 @@ mod tests { for (index, _authority_info) in committee.authorities() { let dir = TempDir::new().unwrap(); - let (authority, receiver) = make_authority( + let (authority, commit_receiver, block_receiver) = make_authority( index, &dir, committee.clone(), @@ -607,7 +614,8 @@ mod tests { .await; assert!(authority.sync_last_known_own_block_enabled(), "Expected syncing of last known own block to be enabled as all authorities are of empty db and boot for first time."); boot_counters[index] += 1; - output_receivers.push(receiver); + commit_receivers.push(commit_receiver); + block_receivers.push(block_receiver); authorities.insert(index, authority); temp_dirs.insert(index, dir); } @@ -617,7 +625,7 @@ mod tests { // at least one block has been proposed and successfully received by a quorum of nodes. let index_1 = committee.to_authority_index(1).unwrap(); 'outer: while let Some(result) = - timeout(Duration::from_secs(10), output_receivers[index_1].recv()) + timeout(Duration::from_secs(10), commit_receivers[index_1].recv()) .await .expect("Timed out while waiting for at least one committed block from authority 1") { @@ -644,7 +652,7 @@ mod tests { let dir = TempDir::new().unwrap(); // We do reset the boot counter for this one to simulate a "binary" restart boot_counters[index_1] = 0; - let (authority, mut receiver) = make_authority( + let (authority, mut commit_receiver, _block_receiver) = make_authority( index_1, &dir, committee.clone(), @@ -665,7 +673,7 @@ mod tests { // Now spin up authority 2 using its earlier directly - so no amnesia recovery should be forced here. // Authority 1 should be able to recover from amnesia successfully. - let (authority, _receiver) = make_authority( + let (authority, _commit_receiver, _block_receiver) = make_authority( index_2, &temp_dirs[&index_2], committee.clone(), @@ -684,7 +692,7 @@ mod tests { sleep(Duration::from_secs(5)).await; // We wait until we see at least one committed block authored from this authority - 'outer: while let Some(result) = receiver.recv().await { + 'outer: while let Some(result) = commit_receiver.recv().await { for block in result.blocks { if block.round() > GENESIS_ROUND && block.author() == index_1 { break 'outer; @@ -707,7 +715,11 @@ mod tests { network_type: ConsensusNetwork, boot_counter: u64, protocol_config: ProtocolConfig, - ) -> (ConsensusAuthority, UnboundedReceiver) { + ) -> ( + ConsensusAuthority, + UnboundedReceiver, + UnboundedReceiver, + ) { let registry = Registry::new(); // Cache less blocks to exercise commit sync. @@ -724,7 +736,7 @@ mod tests { let protocol_keypair = keypairs[index].1.clone(); let network_keypair = keypairs[index].0.clone(); - let (commit_consumer, commit_receiver, _) = CommitConsumer::new(0); + let (commit_consumer, commit_receiver, block_receiver) = CommitConsumer::new(0); let authority = ConsensusAuthority::start( network_type, @@ -741,6 +753,6 @@ mod tests { ) .await; - (authority, commit_receiver) + (authority, commit_receiver, block_receiver) } } diff --git a/consensus/core/src/block.rs b/consensus/core/src/block.rs index 9327560348b672..55de50d65b0a99 100644 --- a/consensus/core/src/block.rs +++ b/consensus/core/src/block.rs @@ -658,6 +658,26 @@ pub(crate) fn genesis_blocks(context: Arc) -> Vec { .collect::>() } +/// A batch of blocks output by consensus for processing. +pub enum BlockOutputBatch { + /// All transactions in the block have a quorum of accept or reject votes. + Certified(Vec), +} + +/// A block output by consensus for processing. +#[derive(Clone)] +pub struct BlockOutput { + pub block: VerifiedBlock, + /// Sorted transaction indices that indicate the transactions rejected by a quorum. + pub rejected: Vec, +} + +impl BlockOutput { + pub fn new(block: VerifiedBlock, rejected: Vec) -> Self { + Self { block, rejected } + } +} + /// Creates fake blocks for testing. /// This struct is public for testing in other crates. #[derive(Clone)] diff --git a/consensus/core/src/commit.rs b/consensus/core/src/commit.rs index d1fbed756b4613..c8c175bca9a59a 100644 --- a/consensus/core/src/commit.rs +++ b/consensus/core/src/commit.rs @@ -293,6 +293,7 @@ pub type CommitVote = CommitRef; pub struct CommittedSubDag { /// A reference to the leader of the sub-dag pub leader: BlockRef, + // TODO: refactor blocks and rejected_transactions_by_block to BlockOutput. /// All the committed blocks that are part of this sub-dag pub blocks: Vec, /// Indices of rejected transactions in each block. diff --git a/consensus/core/src/commit_consumer.rs b/consensus/core/src/commit_consumer.rs index 4e40e6da4a3769..c00cf74ec179fc 100644 --- a/consensus/core/src/commit_consumer.rs +++ b/consensus/core/src/commit_consumer.rs @@ -6,18 +6,15 @@ use tokio::sync::watch; use mysten_metrics::monitored_mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; -use crate::{CommitIndex, CommittedSubDag, TransactionIndex, VerifiedBlock}; +use crate::{block::BlockOutputBatch, CommitIndex, CommittedSubDag}; #[derive(Clone)] pub struct CommitConsumer { // A channel to output the committed sub dags. pub(crate) commit_sender: UnboundedSender, - // A channel to output certified and rejected transactions by batches of blocks. - // Each tuple contains the block containing transactions, and indices of rejected transactions. - // In each block, transactions that are not rejected are considered certified. - // Batches of blocks are sent together, to improve efficiency. - #[allow(unused)] - pub(crate) transaction_sender: UnboundedSender)>>, + // A channel to output blocks for processing, separated from consensus commits. + // In each block output, transactions that are not rejected are considered certified. + pub(crate) block_sender: UnboundedSender, // Index of the last commit that the consumer has processed. This is useful for // crash/recovery so mysticeti can replay the commits from the next index. // First commit in the replayed sequence will have index last_processed_commit_index + 1. @@ -33,21 +30,21 @@ impl CommitConsumer { ) -> ( Self, UnboundedReceiver, - UnboundedReceiver)>>, + UnboundedReceiver, ) { let (commit_sender, commit_receiver) = unbounded_channel("consensus_output"); - let (transaction_sender, transaction_receiver) = unbounded_channel("consensus_certified"); + let (block_sender, block_receiver) = unbounded_channel("consensus_certified"); let monitor = Arc::new(CommitConsumerMonitor::new(last_processed_commit_index)); ( Self { commit_sender, - transaction_sender, + block_sender, last_processed_commit_index, monitor, }, commit_receiver, - transaction_receiver, + block_receiver, ) } diff --git a/consensus/core/src/commit_observer.rs b/consensus/core/src/commit_observer.rs index 9196235c3b9347..f2e40da49d70b1 100644 --- a/consensus/core/src/commit_observer.rs +++ b/consensus/core/src/commit_observer.rs @@ -9,7 +9,7 @@ use tokio::time::Instant; use tracing::{debug, info}; use crate::{ - block::{BlockAPI, VerifiedBlock}, + block::{BlockAPI, BlockOutputBatch, VerifiedBlock}, commit::{load_committed_subdag_from_store, CommitAPI, CommitIndex}, context::Context, dag_state::DagState, @@ -17,7 +17,7 @@ use crate::{ leader_schedule::LeaderSchedule, linearizer::Linearizer, storage::Store, - CommitConsumer, CommittedSubDag, + BlockOutput, CommitConsumer, CommittedSubDag, }; /// Role of CommitObserver @@ -37,8 +37,10 @@ pub(crate) struct CommitObserver { context: Arc, /// Component to deterministically collect subdags for committed leaders. commit_interpreter: Linearizer, - /// An unbounded channel to send committed sub-dags to the consumer of consensus output. - sender: UnboundedSender, + /// An unbounded channel to send commits to commit handler. + commit_sender: UnboundedSender, + /// An unbounded channel to send blocks to block handler. + block_sender: UnboundedSender, /// Persistent storage for blocks, commits and other consensus data. store: Arc, leader_schedule: Arc, @@ -55,7 +57,8 @@ impl CommitObserver { let mut observer = Self { context, commit_interpreter: Linearizer::new(dag_state.clone(), leader_schedule.clone()), - sender: commit_consumer.commit_sender, + commit_sender: commit_consumer.commit_sender, + block_sender: commit_consumer.block_sender, store, leader_schedule, }; @@ -80,8 +83,8 @@ impl CommitObserver { let mut sent_sub_dags = Vec::with_capacity(committed_sub_dags.len()); for committed_sub_dag in committed_sub_dags.into_iter() { // Failures in sender.send() are assumed to be permanent - if let Err(err) = self.sender.send(committed_sub_dag.clone()) { - tracing::error!( + if let Err(err) = self.commit_sender.send(committed_sub_dag.clone()) { + tracing::warn!( "Failed to send committed sub-dag, probably due to shutdown: {err:?}" ); return Err(ConsensusError::Shutdown); @@ -99,6 +102,24 @@ impl CommitObserver { Ok(sent_sub_dags) } + pub(crate) fn handle_certified_blocks( + &mut self, + certified_blocks: Vec, + ) -> ConsensusResult<()> { + if certified_blocks.is_empty() { + return Ok(()); + } + if let Err(err) = self + .block_sender + .send(BlockOutputBatch::Certified(certified_blocks)) + { + tracing::warn!("Failed to send certified blocks, probably due to shutdown: {err:?}"); + return Err(ConsensusError::Shutdown); + } + Ok(()) + } + + // Certified blocks are not sent to fast path, for efficiency. fn recover_and_send_commits(&mut self, last_processed_commit_index: CommitIndex) { let now = Instant::now(); // TODO: remove this check, to allow consensus to regenerate commits? @@ -149,12 +170,14 @@ impl CommitObserver { info!("Sending commit {} during recovery", commit.index()); let committed_sub_dag = load_committed_subdag_from_store(self.store.as_ref(), commit, reputation_scores); - self.sender.send(committed_sub_dag).unwrap_or_else(|e| { - panic!( - "Failed to send commit during recovery, probably due to shutdown: {:?}", - e - ) - }); + self.commit_sender + .send(committed_sub_dag) + .unwrap_or_else(|e| { + panic!( + "Failed to send commit during recovery, probably due to shutdown: {:?}", + e + ) + }); last_sent_commit_index += 1; } diff --git a/consensus/core/src/core.rs b/consensus/core/src/core.rs index a1ff274e6722c9..c9076e811a9968 100644 --- a/consensus/core/src/core.rs +++ b/consensus/core/src/core.rs @@ -40,8 +40,8 @@ use crate::{ }; #[cfg(test)] use crate::{ - block_verifier::NoopBlockVerifier, storage::mem_store::MemStore, CommitConsumer, - TransactionClient, + block_verifier::NoopBlockVerifier, storage::mem_store::MemStore, BlockOutputBatch, + CommitConsumer, TransactionClient, }; // Maximum number of commit votes to include in a block. @@ -264,6 +264,8 @@ impl Core { // Try to propose now since there are new blocks accepted. self.try_propose(false)?; + + self.try_certify()?; } if !missing_blocks.is_empty() { @@ -620,6 +622,17 @@ impl Core { Ok(committed_subdags) } + // Try processed certified blocks. + // Every transaction in a certified block is either accepted or rejected by a quorum. + fn try_certify(&mut self) -> ConsensusResult<()> { + if !self.context.protocol_config.mysticeti_fastpath() { + return Ok(()); + } + let certified_blocks = self.dag_state.write().take_certified_blocks(); + self.commit_observer + .handle_certified_blocks(certified_blocks) + } + pub(crate) fn get_missing_blocks(&self) -> BTreeSet { let _scope = monitored_scope("Core::get_missing_blocks"); self.block_manager.missing_blocks() @@ -901,8 +914,8 @@ pub(crate) struct CoreTextFixture { pub core: Core, pub signal_receivers: CoreSignalsReceivers, pub block_receiver: broadcast::Receiver, - #[allow(unused)] - pub commit_receiver: UnboundedReceiver, + pub _commit_output_receiver: UnboundedReceiver, + pub _block_output_receiver: UnboundedReceiver, pub store: Arc, } @@ -937,7 +950,8 @@ impl CoreTextFixture { // Need at least one subscriber to the block broadcast channel. let block_receiver = signal_receivers.block_broadcast_receiver(); - let (commit_consumer, commit_receiver, _transaction_receiver) = CommitConsumer::new(0); + let (commit_consumer, commit_output_receiver, block_output_receiver) = + CommitConsumer::new(0); let commit_observer = CommitObserver::new( context.clone(), commit_consumer, @@ -965,7 +979,8 @@ impl CoreTextFixture { core, signal_receivers, block_receiver, - commit_receiver, + _commit_output_receiver: commit_output_receiver, + _block_output_receiver: block_output_receiver, store, } } diff --git a/consensus/core/src/dag_state.rs b/consensus/core/src/dag_state.rs index 83aada9cb05a7e..9a3ee7cb9ef341 100644 --- a/consensus/core/src/dag_state.rs +++ b/consensus/core/src/dag_state.rs @@ -10,14 +10,14 @@ use std::{ vec, }; -use consensus_config::AuthorityIndex; +use consensus_config::{AuthorityIndex, Committee}; use itertools::Itertools as _; use tracing::{debug, error, info}; use crate::{ block::{ - genesis_blocks, BlockAPI, BlockDigest, BlockRef, BlockTimestampMs, Round, Slot, - VerifiedBlock, GENESIS_ROUND, + genesis_blocks, BlockAPI, BlockDigest, BlockOutput, BlockRef, BlockTimestampMs, Round, + Slot, VerifiedBlock, GENESIS_ROUND, }, commit::{ load_committed_subdag_from_store, CommitAPI as _, CommitDigest, CommitIndex, CommitInfo, @@ -27,7 +27,7 @@ use crate::{ leader_scoring::{ReputationScores, ScoringSubdag}, stake_aggregator::{QuorumThreshold, StakeAggregator}, storage::{Store, WriteBatch}, - CommittedSubDag, + CommittedSubDag, TransactionIndex, }; /// DagState provides the API to write and read accepted blocks from the DAG. @@ -50,7 +50,7 @@ pub(crate) struct DagState { // CACHED_ROUNDS worth of data. The entries are evicted based on the latest GC round, however the eviction process will respect the CACHED_ROUNDS. // For each authority, blocks are only evicted when their round is less than or equal to both `gc_round`, and `highest authority round - cached rounds`. // This ensures that the GC requirements are respected (we never clean up any block above `gc_round`), and there are enough blocks cached. - recent_blocks: BTreeMap, + recent_blocks: BTreeMap, // Indexes recent block refs by their authorities. // Vec position corresponds to the authority index. @@ -87,6 +87,9 @@ pub(crate) struct DagState { // TODO: limit to 1st commit per round with multi-leader. pending_commit_votes: VecDeque, + // Certified blocks pending to be processed outside of consensus. + pending_certified_blocks: Vec, + // Data to be flushed to storage. blocks_to_write: Vec, commits_to_write: Vec, @@ -173,6 +176,7 @@ impl DagState { last_commit_round_advancement_time: None, last_committed_rounds: last_committed_rounds.clone(), pending_commit_votes: VecDeque::new(), + pending_certified_blocks: vec![], blocks_to_write: vec![], commits_to_write: vec![], commit_info_to_write: vec![], @@ -268,7 +272,14 @@ impl DagState { ); } self.update_block_metadata(&block); + + if self.context.protocol_config.mysticeti_fastpath() { + let certified_blocks = self.update_certification_votes(&block); + self.pending_certified_blocks.extend(certified_blocks); + } + self.blocks_to_write.push(block); + let source = if self.context.own_index == block_ref.author { "own" } else { @@ -285,7 +296,8 @@ impl DagState { /// Updates internal metadata for a block. fn update_block_metadata(&mut self, block: &VerifiedBlock) { let block_ref = block.reference(); - self.recent_blocks.insert(block_ref, block.clone()); + self.recent_blocks + .insert(block_ref, BlockInfo::new(block.clone())); self.recent_refs_by_authority[block_ref.author].insert(block_ref); self.highest_accepted_round = max(self.highest_accepted_round, block.round()); self.context @@ -307,6 +319,90 @@ impl DagState { .set(highest_accepted_round_for_author as i64); } + // Updates votes for certification of transactions in the causal history of the block. + fn update_certification_votes(&mut self, block: &VerifiedBlock) -> Vec { + let mut certified_blocks = vec![]; + + // When a block has an explicit vote, record the rejections. The rest of the transactions are implicitly accepted. + for block_votes in block.transaction_votes() { + let Some(block_info) = self.recent_blocks.get_mut(&block_votes.block_ref) else { + // TODO(fastpath): ensure the voted block exists in the DAG, with BlockManager. + // If the block is not found, it is outside of GC bound. + continue; + }; + block_info + .accept_votes + .add_unique(block.author(), &self.context.committee); + for reject in &block_votes.rejects { + block_info + .reject_votes + .entry(*reject) + .or_default() + .add_unique(block.author(), &self.context.committee); + } + if let Some(b) = block_info.take_certified_output(&self.context.committee) { + certified_blocks.push(b); + } + } + + // Transactions in unvoted ancestors of the block are implicitly accepted. This is the common case. + for ancestor in block.ancestors() { + let blocks = self.update_accept_votes_for_ancestor_authority(block, *ancestor); + certified_blocks.extend(blocks); + } + + certified_blocks + } + + // This function updates the implicit accept votes for the target block and its ancestor blocks + // of the same authority. + // + // All blocks in the causal history of the voter / target block can in theory receive implicit + // accept votes. But traversing the causal history of voter / target block is very expensive. + // Instead, voter blocks's ancestors are traversed in update_certification_votes(). + // And only ancestors from the same authority of target are traversed here. This will not miss + // voting on block, when neither voter or target authority is equivocating. + // + // If any one of the authority is equivocating, there can be blocks in the causal history of + // voter / target block that do not receive implicit accept votes. This is ok though. The worst + // downside is that these blocks will not get certified and sent to fast path. But they can still + // be finalized with the consensus commit, which counts votes differently. THere is no safety or + // liveness issue. + fn update_accept_votes_for_ancestor_authority( + &mut self, + voter_block: &VerifiedBlock, + mut target: BlockRef, + ) -> Vec { + const VOTING_ROUNDS: u32 = 5; + let mut certified_blocks = vec![]; + while target.round >= voter_block.round().saturating_sub(VOTING_ROUNDS) { + let Some(target_block_info) = self.recent_blocks.get_mut(&target) else { + // The target block has been GC'ed. + break; + }; + if !target_block_info + .accept_votes + .add_unique(voter_block.author(), &self.context.committee) + { + // Stop voting because this target block and its ancestors from the same authority have been voted. + break; + } + if let Some(b) = target_block_info.take_certified_output(&self.context.committee) { + certified_blocks.push(b); + } + // Try voting on the ancestor of the same authority. + if let Some(a) = target_block_info.block.ancestors().first() { + if a.author == target.author { + target = *a; + continue; + } + } + // Stop voting because genesis block is reached. + break; + } + certified_blocks + } + /// Accepts a blocks into DagState and keeps it in memory. pub(crate) fn accept_blocks(&mut self, blocks: Vec) { debug!( @@ -340,8 +436,8 @@ impl DagState { } continue; } - if let Some(block) = self.recent_blocks.get(block_ref) { - blocks[index] = Some(block.clone()); + if let Some(block_info) = self.recent_blocks.get(block_ref) { + blocks[index] = Some(block_info.block.clone()); continue; } missing.push((index, block_ref)); @@ -380,11 +476,11 @@ impl DagState { // or support reading from storage while limiting storage reads to edge cases. let mut blocks = vec![]; - for (_block_ref, block) in self.recent_blocks.range(( + for (_block_ref, block_info) in self.recent_blocks.range(( Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MIN)), Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MAX)), )) { - blocks.push(block.clone()) + blocks.push(block_info.block.clone()) } blocks } @@ -397,7 +493,7 @@ impl DagState { } let mut blocks = vec![]; - for (_block_ref, block) in self.recent_blocks.range(( + for (_block_ref, block_info) in self.recent_blocks.range(( Included(BlockRef::new(round, AuthorityIndex::ZERO, BlockDigest::MIN)), Excluded(BlockRef::new( round + 1, @@ -405,7 +501,7 @@ impl DagState { BlockDigest::MIN, )), )) { - blocks.push(block.clone()) + blocks.push(block_info.block.clone()) } blocks } @@ -460,6 +556,7 @@ impl DagState { .recent_blocks .get(last) .expect("Block should be found in recent blocks") + .block .clone(); } @@ -487,11 +584,11 @@ impl DagState { Included(BlockRef::new(start, authority, BlockDigest::MIN)), Unbounded, )) { - let block = self + let block_info = self .recent_blocks .get(block_ref) .expect("Block should exist in recent blocks"); - blocks.push(block.clone()); + blocks.push(block_info.block.clone()); } blocks } @@ -541,12 +638,12 @@ impl DagState { )) .next_back() { - let block = self + let block_info = self .recent_blocks .get(block_ref) .expect("Block should exist in recent blocks"); - blocks[authority_index] = block.clone(); + blocks[authority_index] = block_info.block.clone(); } } @@ -716,6 +813,11 @@ impl DagState { votes } + /// Returns the pending certified blocks. + pub(crate) fn take_certified_blocks(&mut self) -> Vec { + self.pending_certified_blocks.drain(..).collect() + } + /// Index of the last commit. pub(crate) fn last_commit_index(&self) -> CommitIndex { match &self.last_commit { @@ -970,6 +1072,59 @@ impl DagState { } } +struct BlockInfo { + block: VerifiedBlock, + // Accumulates implicit accept votes for all transactions. + accept_votes: StakeAggregator, + // Accumulates reject votes per transaction. + reject_votes: BTreeMap>, + // Whether this block has been sent to output after it has been certified. + certified_output: bool, +} + +impl BlockInfo { + fn new(block: VerifiedBlock) -> Self { + Self { + block, + accept_votes: StakeAggregator::new(), + reject_votes: BTreeMap::new(), + certified_output: false, + } + } + + // If this block has been certified but has not been sent to output, returns the output. + // Otherwise, returns None. + fn take_certified_output(&mut self, committee: &Committee) -> Option { + if self.certified_output { + return None; + } + if !self.accept_votes.reached_threshold(committee) { + return None; + } + let mut rejected = vec![]; + for (idx, reject_votes) in &self.reject_votes { + if reject_votes.reached_threshold(committee) { + rejected.push(*idx); + continue; + } + if self + .accept_votes + .stake() + .checked_sub(reject_votes.stake()) + .unwrap() + < committee.quorum_threshold() + { + return None; + } + } + self.certified_output = true; + Some(BlockOutput { + block: self.block.clone(), + rejected, + }) + } +} + #[cfg(test)] mod test { use std::vec; diff --git a/consensus/core/src/lib.rs b/consensus/core/src/lib.rs index 36b980311f5d77..fa4315b21ba358 100644 --- a/consensus/core/src/lib.rs +++ b/consensus/core/src/lib.rs @@ -45,7 +45,7 @@ mod test_dag_parser; /// Exported consensus API. pub use authority_node::ConsensusAuthority; -pub use block::{BlockAPI, Round, TransactionIndex}; +pub use block::{BlockAPI, BlockOutput, BlockOutputBatch, Round, TransactionIndex}; /// Exported API for testing. pub use block::{TestBlock, Transaction, VerifiedBlock}; pub use commit::{CommitDigest, CommitIndex, CommitRef, CommittedSubDag}; diff --git a/consensus/core/src/stake_aggregator.rs b/consensus/core/src/stake_aggregator.rs index 11501319e64e96..ae7985038ae5f1 100644 --- a/consensus/core/src/stake_aggregator.rs +++ b/consensus/core/src/stake_aggregator.rs @@ -9,9 +9,11 @@ pub(crate) trait CommitteeThreshold { fn is_threshold(committee: &Committee, amount: Stake) -> bool; } +#[derive(Default)] pub(crate) struct QuorumThreshold; -#[allow(unused)] +#[cfg(test)] +#[derive(Default)] pub(crate) struct ValidityThreshold; impl CommitteeThreshold for QuorumThreshold { @@ -20,12 +22,14 @@ impl CommitteeThreshold for QuorumThreshold { } } +#[cfg(test)] impl CommitteeThreshold for ValidityThreshold { fn is_threshold(committee: &Committee, amount: Stake) -> bool { committee.reached_validity(amount) } } +#[derive(Default)] pub(crate) struct StakeAggregator { votes: BTreeSet, stake: Stake, @@ -51,6 +55,17 @@ impl StakeAggregator { T::is_threshold(committee, self.stake) } + /// Adds a vote for the specified authority index to the aggregator. It is guaranteed to count + /// the vote only once for an authority. + /// The method returns true when the vote comes from a new authority and is counted. + pub(crate) fn add_unique(&mut self, vote: AuthorityIndex, committee: &Committee) -> bool { + if self.votes.insert(vote) { + self.stake += committee.stake(vote); + return true; + } + false + } + pub(crate) fn stake(&self) -> Stake { self.stake } @@ -82,6 +97,27 @@ mod tests { assert!(aggregator.add(AuthorityIndex::new_for_test(3), &committee)); } + #[test] + fn test_add_unique_quorum_threshold() { + let committee = local_committee_and_keys(0, vec![1, 1, 1, 1]).0; + let mut aggregator = StakeAggregator::::new(); + + assert!(aggregator.add_unique(AuthorityIndex::new_for_test(0), &committee)); + assert!(!aggregator.reached_threshold(&committee)); + + assert!(aggregator.add_unique(AuthorityIndex::new_for_test(1), &committee)); + assert!(!aggregator.reached_threshold(&committee)); + + assert!(!aggregator.add_unique(AuthorityIndex::new_for_test(1), &committee)); + assert!(!aggregator.reached_threshold(&committee)); + + assert!(aggregator.add_unique(AuthorityIndex::new_for_test(2), &committee)); + assert!(aggregator.reached_threshold(&committee)); + + assert!(aggregator.add_unique(AuthorityIndex::new_for_test(3), &committee)); + assert!(aggregator.reached_threshold(&committee)); + } + #[test] fn test_aggregator_validity_threshold() { let committee = local_committee_and_keys(0, vec![1, 1, 1, 1]).0; diff --git a/crates/sui-core/src/consensus_handler.rs b/crates/sui-core/src/consensus_handler.rs index 45e1a269a7876e..5e566b1be97784 100644 --- a/crates/sui-core/src/consensus_handler.rs +++ b/crates/sui-core/src/consensus_handler.rs @@ -10,7 +10,7 @@ use std::{ use arc_swap::ArcSwap; use consensus_config::Committee as ConsensusCommittee; -use consensus_core::{CommitConsumerMonitor, TransactionIndex, VerifiedBlock}; +use consensus_core::{BlockOutput, BlockOutputBatch, CommitConsumerMonitor}; use lru::LruCache; use mysten_metrics::{ monitored_future, @@ -477,7 +477,7 @@ impl MysticetiConsensusHandler { mut consensus_handler: ConsensusHandler, consensus_transaction_handler: ConsensusTransactionHandler, mut commit_receiver: UnboundedReceiver, - mut transaction_receiver: UnboundedReceiver)>>, + mut block_receiver: UnboundedReceiver, commit_consumer_monitor: Arc, ) -> Self { let mut tasks = JoinSet::new(); @@ -493,10 +493,9 @@ impl MysticetiConsensusHandler { })); if consensus_transaction_handler.enabled() { tasks.spawn(monitored_future!(async move { - while let Some(blocks_and_rejected_transactions) = transaction_receiver.recv().await - { + while let Some(blocks) = block_receiver.recv().await { consensus_transaction_handler - .handle_consensus_transactions(blocks_and_rejected_transactions) + .handle_consensus_blocks(blocks) .await; } })); @@ -878,16 +877,20 @@ impl ConsensusTransactionHandler { self.enabled } - pub async fn handle_consensus_transactions( - &self, - blocks_and_rejected_transactions: Vec<(VerifiedBlock, Vec)>, - ) { - let _scope = monitored_scope("ConsensusTransactionHandler::handle_consensus_transactions"); + pub async fn handle_consensus_blocks(&self, block_output_batch: BlockOutputBatch) { + match block_output_batch { + BlockOutputBatch::Certified(blocks) => self.handle_certified_blocks(blocks).await, + } + } + + pub async fn handle_certified_blocks(&self, blocks: Vec) { + let _scope: Option = + monitored_scope("ConsensusTransactionHandler::handle_certified_blocks"); - let parsed_transactions = blocks_and_rejected_transactions + let parsed_transactions = blocks .into_iter() - .flat_map(|(block, rejected_transactions)| { - parse_block_transactions(&block, &rejected_transactions) + .flat_map(|certified_block| { + parse_block_transactions(&certified_block.block, &certified_block.rejected) }) .collect::>(); let mut pending_consensus_transactions = vec![]; @@ -958,7 +961,8 @@ impl ConsensusTransactionHandler { #[cfg(test)] mod tests { use consensus_core::{ - BlockAPI, CommitDigest, CommitRef, CommittedSubDag, TestBlock, Transaction, VerifiedBlock, + BlockAPI, BlockOutput, CommitDigest, CommitRef, CommittedSubDag, TestBlock, Transaction, + VerifiedBlock, }; use prometheus::Registry; use sui_protocol_config::ConsensusTransactionOrdering; @@ -968,6 +972,7 @@ mod tests { crypto::deterministic_random_account_key, messages_consensus::{ AuthorityCapabilitiesV1, ConsensusTransaction, ConsensusTransactionKind, + TransactionIndex, }, object::Object, supported_protocol_versions::SupportedProtocolVersions, @@ -1265,7 +1270,10 @@ mod tests { // AND process the transactions from consensus output. transaction_handler - .handle_consensus_transactions(vec![(block.clone(), rejected_transactions.clone())]) + .handle_consensus_blocks(BlockOutputBatch::Certified(vec![BlockOutput::new( + block.clone(), + rejected_transactions.clone(), + )])) .await; // THEN check for status of transactions that should have been executed. diff --git a/crates/sui-core/src/consensus_manager/mysticeti_manager.rs b/crates/sui-core/src/consensus_manager/mysticeti_manager.rs index 9b71d11b7f4737..fe24693ea52d89 100644 --- a/crates/sui-core/src/consensus_manager/mysticeti_manager.rs +++ b/crates/sui-core/src/consensus_manager/mysticeti_manager.rs @@ -149,7 +149,7 @@ impl ConsensusManagerTrait for MysticetiManager { let registry = Registry::new_custom(Some("consensus".to_string()), None).unwrap(); let consensus_handler = consensus_handler_initializer.new_consensus_handler(); - let (commit_consumer, commit_receiver, transaction_receiver) = + let (commit_consumer, commit_receiver, block_receiver) = CommitConsumer::new(consensus_handler.last_processed_subdag_index() as CommitIndex); let monitor = commit_consumer.monitor(); @@ -211,7 +211,7 @@ impl ConsensusManagerTrait for MysticetiManager { consensus_handler, consensus_transaction_handler, commit_receiver, - transaction_receiver, + block_receiver, monitor, );