diff --git a/consensus/core/src/authority_node.rs b/consensus/core/src/authority_node.rs index 98c4c22e23d62..682a79619a575 100644 --- a/consensus/core/src/authority_node.rs +++ b/consensus/core/src/authority_node.rs @@ -227,7 +227,7 @@ where let block_verifier = Arc::new(SignedBlockVerifier::new( context.clone(), - transaction_verifier, + transaction_verifier.clone(), )); let block_manager = @@ -310,6 +310,7 @@ where let network_service = Arc::new(AuthorityService::new( context.clone(), block_verifier, + transaction_verifier, commit_vote_monitor, synchronizer.clone(), core_dispatcher, diff --git a/consensus/core/src/authority_service.rs b/consensus/core/src/authority_service.rs index 702bf657f7b6d..6279dd34a8705 100644 --- a/consensus/core/src/authority_service.rs +++ b/consensus/core/src/authority_service.rs @@ -14,7 +14,9 @@ use tokio_util::sync::ReusableBoxFuture; use tracing::{debug, info, trace, warn}; use crate::{ - block::{BlockAPI as _, BlockRef, SignedBlock, VerifiedBlock, GENESIS_ROUND}, + block::{ + BlockAPI as _, BlockRef, SignedBlock, VerifiedBlock, VerifiedVotedBlock, GENESIS_ROUND, + }, block_verifier::BlockVerifier, commit::{CommitAPI as _, CommitRange, TrustedCommit}, commit_vote_monitor::CommitVoteMonitor, @@ -26,7 +28,7 @@ use crate::{ stake_aggregator::{QuorumThreshold, StakeAggregator}, storage::Store, synchronizer::SynchronizerHandle, - CommitIndex, Round, + CommitIndex, Round, TransactionVerifier, }; pub(crate) const COMMIT_LAG_MULTIPLIER: u32 = 5; @@ -36,6 +38,7 @@ pub(crate) struct AuthorityService { context: Arc, commit_vote_monitor: Arc, block_verifier: Arc, + transaction_verifier: Arc, synchronizer: Arc, core_dispatcher: Arc, rx_block_broadcaster: broadcast::Receiver, @@ -48,6 +51,7 @@ impl AuthorityService { pub(crate) fn new( context: Arc, block_verifier: Arc, + transaction_verifier: Arc, commit_vote_monitor: Arc, synchronizer: Arc, core_dispatcher: Arc, @@ -62,6 +66,7 @@ impl AuthorityService { Self { context, block_verifier, + transaction_verifier, commit_vote_monitor, synchronizer, core_dispatcher, @@ -113,6 +118,38 @@ impl NetworkService for AuthorityService { info!("Invalid block from {}: {}", peer, e); return Err(e); } + + let transaction_batch: Vec<_> = signed_block + .transactions() + .iter() + .map(|t| t.data()) + .collect(); + let transactions_to_reject = if self.context.protocol_config.mysticeti_fastpath() { + match self + .transaction_verifier + .verify_and_vote_batch(&transaction_batch) + .await + { + Ok(votes) => votes, + Err(e) => { + self.context + .metrics + .node_metrics + .invalid_blocks + .with_label_values(&[ + peer_hostname, + "handle_send_block", + "InvalidTransaction", + ]) + .inc(); + info!("Invalid block from {}: {}", peer, e); + return Err(ConsensusError::InvalidTransaction(format!("{e:?}"))); + } + } + } else { + vec![] + }; + let verified_block = VerifiedBlock::new_verified(signed_block, serialized_block); trace!("Received block {verified_block} via send block."); @@ -211,7 +248,10 @@ impl NetworkService for AuthorityService { let missing_ancestors = self .core_dispatcher - .add_blocks(vec![verified_block]) + .add_voted_blocks(vec![VerifiedVotedBlock::new( + verified_block, + transactions_to_reject, + )]) .await .map_err(|_| ConsensusError::Shutdown)?; if !missing_ancestors.is_empty() { @@ -602,8 +642,8 @@ async fn make_recv_future( mod tests { use crate::{ authority_service::AuthorityService, - block::BlockAPI, - block::{BlockRef, SignedBlock, TestBlock, VerifiedBlock}, + block::{BlockAPI, BlockRef, SignedBlock, TestBlock, VerifiedBlock, VerifiedVotedBlock}, + block_verifier::NoopBlockVerifier, commit::CommitRange, commit_vote_monitor::CommitVoteMonitor, context::Context, @@ -615,6 +655,7 @@ mod tests { storage::mem_store::MemStore, synchronizer::Synchronizer, test_dag_builder::DagBuilder, + transaction::NoopTransactionVerifier, Round, }; use async_trait::async_trait; @@ -654,6 +695,17 @@ mod tests { Ok(block_refs) } + async fn add_voted_blocks( + &self, + blocks: Vec, + ) -> Result, CoreError> { + let block_refs = blocks.iter().map(|b| b.inner.reference()).collect(); + self.blocks + .lock() + .extend(blocks.into_iter().map(|b| b.inner)); + Ok(block_refs) + } + async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> { Ok(()) } @@ -749,7 +801,8 @@ mod tests { async fn test_handle_send_block() { let (context, _keys) = Context::new_for_test(4); let context = Arc::new(context); - let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {}); + let block_verifier = Arc::new(NoopBlockVerifier {}); + let transaction_verifier = Arc::new(NoopTransactionVerifier {}); let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone())); let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new()); let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100); @@ -768,6 +821,7 @@ mod tests { let authority_service = Arc::new(AuthorityService::new( context.clone(), block_verifier, + transaction_verifier, commit_vote_monitor, synchronizer, core_dispatcher.clone(), @@ -808,7 +862,8 @@ mod tests { // GIVEN let (context, _keys) = Context::new_for_test(4); let context = Arc::new(context); - let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {}); + let block_verifier = Arc::new(NoopBlockVerifier {}); + let transaction_verifier = Arc::new(NoopTransactionVerifier {}); let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone())); let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new()); let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100); @@ -827,6 +882,7 @@ mod tests { let authority_service = Arc::new(AuthorityService::new( context.clone(), block_verifier, + transaction_verifier, commit_vote_monitor, synchronizer, core_dispatcher.clone(), diff --git a/consensus/core/src/block.rs b/consensus/core/src/block.rs index d15356590c460..7c96c4fba8f35 100644 --- a/consensus/core/src/block.rs +++ b/consensus/core/src/block.rs @@ -102,6 +102,7 @@ pub(crate) struct BlockV1 { } impl BlockV1 { + #[allow(unused)] pub(crate) fn new( epoch: Epoch, round: Round, @@ -658,6 +659,18 @@ pub(crate) fn genesis_blocks(context: Arc) -> Vec { .collect::>() } +#[derive(Clone, Debug)] +pub(crate) struct VerifiedVotedBlock { + pub(crate) inner: VerifiedBlock, + pub(crate) to_reject: Vec, +} + +impl VerifiedVotedBlock { + pub(crate) fn new(inner: VerifiedBlock, to_reject: Vec) -> Self { + Self { inner, to_reject } + } +} + /// A batch of blocks output by consensus for processing. pub enum BlockOutputBatch { /// All transactions in the block have a quorum of accept or reject votes. diff --git a/consensus/core/src/block_manager.rs b/consensus/core/src/block_manager.rs index e5d9112b319e4..c0fa7f997b053 100644 --- a/consensus/core/src/block_manager.rs +++ b/consensus/core/src/block_manager.rs @@ -14,7 +14,7 @@ use parking_lot::RwLock; use tracing::{debug, trace, warn}; use crate::{ - block::{BlockAPI, BlockRef, VerifiedBlock, GENESIS_ROUND}, + block::{BlockAPI, BlockRef, VerifiedBlock, VerifiedVotedBlock, GENESIS_ROUND}, block_verifier::BlockVerifier, context::Context, dag_state::DagState, @@ -22,15 +22,15 @@ use crate::{ }; struct SuspendedBlock { - block: VerifiedBlock, + inner: VerifiedVotedBlock, missing_ancestors: BTreeSet, timestamp: Instant, } impl SuspendedBlock { - fn new(block: VerifiedBlock, missing_ancestors: BTreeSet) -> Self { + fn new(block: VerifiedVotedBlock, missing_ancestors: BTreeSet) -> Self { Self { - block, + inner: block, missing_ancestors, timestamp: Instant::now(), } @@ -80,31 +80,48 @@ impl BlockManager { received_block_rounds: vec![None; committee_size], } } - /// Tries to accept the provided blocks assuming that all their causal history exists. The method /// returns all the blocks that have been successfully processed in round ascending order, that includes also previously /// suspended blocks that have now been able to get accepted. Method also returns a set with the missing ancestor blocks. pub(crate) fn try_accept_blocks( &mut self, - mut blocks: Vec, + blocks: Vec, + ) -> (Vec, BTreeSet) { + self.try_accept_voted_blocks( + blocks + .into_iter() + .map(|b| VerifiedVotedBlock::new(b, vec![])) + .collect(), + ) + } + + /// Tries to accept the provided blocks assuming that all their causal history exists. The method + /// returns all the blocks that have been successfully processed in round ascending order, that includes also previously + /// suspended blocks that have now been able to get accepted. Method also returns a set with the missing ancestor blocks. + pub(crate) fn try_accept_voted_blocks( + &mut self, + mut blocks: Vec, ) -> (Vec, BTreeSet) { let _s = monitored_scope("BlockManager::try_accept_blocks"); - blocks.sort_by_key(|b| b.round()); + blocks.sort_by_key(|b| b.inner.round()); debug!( "Trying to accept blocks: {}", - blocks.iter().map(|b| b.reference().to_string()).join(",") + blocks + .iter() + .map(|b| b.inner.reference().to_string()) + .join(",") ); let mut accepted_blocks = vec![]; let mut missing_blocks = BTreeSet::new(); - for block in blocks { - self.update_block_received_metrics(&block); + for voted_block in blocks { + self.update_block_received_metrics(&voted_block.inner); // Try to accept the input block. - let block_ref = block.reference(); - let block = match self.try_accept_one_block(block) { + let block_ref = voted_block.inner.reference(); + let voted_block = match self.try_accept_one_block(voted_block) { TryAcceptResult::Accepted(block) => block, TryAcceptResult::Suspended(ancestors_to_fetch) => { trace!( @@ -118,11 +135,13 @@ impl BlockManager { }; // If the block is accepted, try to unsuspend its children blocks if any. - let unsuspended_blocks = self.try_unsuspend_children_blocks(block.reference()); + let unsuspended_blocks = + self.try_unsuspend_children_blocks(voted_block.inner.reference()); // Verify block timestamps - let blocks_to_accept = self - .verify_block_timestamps_and_accept(iter::once(block).chain(unsuspended_blocks)); + let blocks_to_accept = self.verify_block_timestamps_and_accept( + iter::once(voted_block).chain(unsuspended_blocks), + ); accepted_blocks.extend(blocks_to_accept); } @@ -149,17 +168,18 @@ impl BlockManager { // returns the accepted and persisted blocks. fn verify_block_timestamps_and_accept( &mut self, - unsuspended_blocks: impl IntoIterator, + unsuspended_blocks: impl IntoIterator, ) -> Vec { let (gc_enabled, gc_round) = { let dag_state = self.dag_state.read(); (dag_state.gc_enabled(), dag_state.gc_round()) }; // Try to verify the block and its children for timestamp, with ancestor blocks. - let mut blocks_to_accept: BTreeMap = BTreeMap::new(); - let mut blocks_to_reject: BTreeMap = BTreeMap::new(); + let mut blocks_to_accept: BTreeMap = BTreeMap::new(); + let mut blocks_to_reject: BTreeMap = BTreeMap::new(); { - 'block: for b in unsuspended_blocks { + 'block: for voted_block in unsuspended_blocks { + let b = &voted_block.inner; let ancestors = self.dag_state.read().get_blocks(b.ancestors()); assert_eq!(b.ancestors().len(), ancestors.len()); let mut ancestor_blocks = vec![]; @@ -175,12 +195,12 @@ impl BlockManager { // blocks_to_accept have not been added to DagState yet, but they // can appear in ancestors. if blocks_to_accept.contains_key(ancestor_ref) { - ancestor_blocks.push(Some(blocks_to_accept[ancestor_ref].clone())); + ancestor_blocks.push(Some(blocks_to_accept[ancestor_ref].inner.clone())); continue 'ancestor; } // If an ancestor is already rejected, reject this block as well. if blocks_to_reject.contains_key(ancestor_ref) { - blocks_to_reject.insert(b.reference(), b); + blocks_to_reject.insert(b.reference(), voted_block); continue 'block; } @@ -203,12 +223,12 @@ impl BlockManager { } if let Err(e) = self.block_verifier - .check_ancestors(&b, &ancestor_blocks, gc_enabled, gc_round) + .check_ancestors(b, &ancestor_blocks, gc_enabled, gc_round) { warn!("Block {:?} failed to verify ancestors: {}", b, e); - blocks_to_reject.insert(b.reference(), b); + blocks_to_reject.insert(b.reference(), voted_block); } else { - blocks_to_accept.insert(b.reference(), b); + blocks_to_accept.insert(b.reference(), voted_block); } } } @@ -228,7 +248,7 @@ impl BlockManager { .invalid_blocks .with_label_values(&[&hostname, "accept_block", "InvalidAncestors"]) .inc(); - warn!("Invalid block {:?} is rejected", block); + warn!("Invalid block {:?} is rejected", block.inner); } let blocks_to_accept = blocks_to_accept.values().cloned().collect::>(); @@ -237,15 +257,16 @@ impl BlockManager { // ancestors do not get suspended. self.dag_state .write() - .accept_blocks(blocks_to_accept.clone()); + .accept_voted_blocks(blocks_to_accept.clone()); - blocks_to_accept + blocks_to_accept.into_iter().map(|b| b.inner).collect() } /// Tries to accept the provided block. To accept a block its ancestors must have been already successfully accepted. If /// block is accepted then Some result is returned. None is returned when either the block is suspended or the block /// has been already accepted before. - fn try_accept_one_block(&mut self, block: VerifiedBlock) -> TryAcceptResult { + fn try_accept_one_block(&mut self, voted_block: VerifiedVotedBlock) -> TryAcceptResult { + let block = &voted_block.inner; let block_ref = block.reference(); let mut missing_ancestors = BTreeSet::new(); let mut ancestors_to_fetch = BTreeSet::new(); @@ -344,17 +365,22 @@ impl BlockManager { .block_suspensions .with_label_values(&[hostname]) .inc(); - self.suspended_blocks - .insert(block_ref, SuspendedBlock::new(block, missing_ancestors)); + self.suspended_blocks.insert( + block_ref, + SuspendedBlock::new(voted_block, missing_ancestors), + ); return TryAcceptResult::Suspended(ancestors_to_fetch); } - TryAcceptResult::Accepted(block) + TryAcceptResult::Accepted(voted_block) } /// Given an accepted block `accepted_block` it attempts to accept all the suspended children blocks assuming such exist. /// All the unsuspended / accepted blocks are returned as a vector in causal order. - fn try_unsuspend_children_blocks(&mut self, accepted_block: BlockRef) -> Vec { + fn try_unsuspend_children_blocks( + &mut self, + accepted_block: BlockRef, + ) -> Vec { let mut unsuspended_blocks = vec![]; let mut to_process_blocks = vec![accepted_block]; @@ -365,7 +391,7 @@ impl BlockManager { // For each dependency try to unsuspend it. If that's successful then we add it to the queue so // we can recursively try to unsuspend its children. if let Some(block) = self.try_unsuspend_block(&r, &block_ref) { - to_process_blocks.push(block.block.reference()); + to_process_blocks.push(block.inner.inner.reference()); unsuspended_blocks.push(block); } } @@ -379,7 +405,7 @@ impl BlockManager { let hostname = self .context .committee - .authority(block.block.author()) + .authority(block.inner.inner.author()) .hostname .as_str(); self.context @@ -398,7 +424,7 @@ impl BlockManager { unsuspended_blocks .into_iter() - .map(|block| block.block) + .map(|block| block.inner) .collect() } @@ -418,7 +444,7 @@ impl BlockManager { block.missing_ancestors.remove(accepted_dependency), "Block reference {} should be present in missing dependencies of {:?}", block_ref, - block.block + block.inner.inner, ); if block.missing_ancestors.is_empty() { @@ -462,7 +488,7 @@ impl BlockManager { let unsuspended_blocks = self.try_unsuspend_children_blocks(*block_ref); unsuspended_blocks.iter().for_each(|block| { - if block.round() <= gc_round { + if block.inner.round() <= gc_round { blocks_unsuspended_below_gc_round += 1; } }); @@ -539,7 +565,7 @@ impl BlockManager { // Result of trying to accept one block. enum TryAcceptResult { // The block is accepted. Wraps the block itself. - Accepted(VerifiedBlock), + Accepted(VerifiedVotedBlock), // The block is suspended. Wraps ancestors to be fetched. Suspended(BTreeSet), // The block has been processed before and already exists in BlockManager (and is suspended) or diff --git a/consensus/core/src/core.rs b/consensus/core/src/core.rs index 80d618d77cbde..10d9545d4c369 100644 --- a/consensus/core/src/core.rs +++ b/consensus/core/src/core.rs @@ -20,8 +20,8 @@ use tracing::{debug, info, warn}; use crate::{ block::{ - Block, BlockAPI, BlockRef, BlockTimestampMs, BlockV1, Round, SignedBlock, Slot, - VerifiedBlock, GENESIS_ROUND, + Block, BlockAPI, BlockRef, BlockTimestampMs, BlockV2, Round, SignedBlock, Slot, + VerifiedBlock, VerifiedVotedBlock, GENESIS_ROUND, }, block_manager::BlockManager, commit::CommittedSubDag, @@ -227,9 +227,24 @@ impl Core { /// Processes the provided blocks and accepts them if possible when their causal history exists. /// The method returns the references of parents that are unknown and need to be fetched. + #[cfg(test)] pub(crate) fn add_blocks( &mut self, blocks: Vec, + ) -> ConsensusResult> { + self.add_voted_blocks( + blocks + .into_iter() + .map(|b| VerifiedVotedBlock::new(b, vec![])) + .collect(), + ) + } + + /// Processes the provided blocks and accepts them if possible when their causal history exists. + /// The method returns the references of parents that are unknown and need to be fetched. + pub(crate) fn add_voted_blocks( + &mut self, + blocks: Vec, ) -> ConsensusResult> { let _scope = monitored_scope("Core::add_blocks"); let _s = self @@ -246,7 +261,7 @@ impl Core { .observe(blocks.len() as f64); // Try to accept them via the block manager - let (accepted_blocks, missing_blocks) = self.block_manager.try_accept_blocks(blocks); + let (accepted_blocks, missing_blocks) = self.block_manager.try_accept_voted_blocks(blocks); if !accepted_blocks.is_empty() { debug!( @@ -437,14 +452,15 @@ impl Core { .proposed_block_transactions .observe(transactions.len() as f64); - // Consume the commit votes to be included. + // Consume the commit and transaction votes to be included. let commit_votes = self .dag_state .write() .take_commit_votes(MAX_COMMIT_VOTES_PER_BLOCK); + let transaction_votes = self.dag_state.write().take_transaction_votes(); // Create the block and insert to storage. - let block = Block::V1(BlockV1::new( + let block = Block::V2(BlockV2::new( self.context.committee.epoch(), clock_round, self.context.own_index, @@ -452,6 +468,7 @@ impl Core { ancestors.iter().map(|b| b.reference()).collect(), transactions, commit_votes, + transaction_votes, vec![], )); let signed_block = diff --git a/consensus/core/src/core_thread.rs b/consensus/core/src/core_thread.rs index 5b2a83748672e..fe942b7c0586b 100644 --- a/consensus/core/src/core_thread.rs +++ b/consensus/core/src/core_thread.rs @@ -21,7 +21,7 @@ use tokio::sync::{oneshot, watch}; use tracing::warn; use crate::{ - block::{BlockRef, Round, VerifiedBlock}, + block::{BlockRef, Round, VerifiedBlock, VerifiedVotedBlock}, context::Context, core::Core, core_thread::CoreError::Shutdown, @@ -35,7 +35,7 @@ const CORE_THREAD_COMMANDS_CHANNEL_SIZE: usize = 2000; enum CoreThreadCommand { /// Add blocks to be processed and accepted - AddBlocks(Vec, oneshot::Sender>), + AddBlocks(Vec, oneshot::Sender>), /// Called when the min round has passed or the leader timeout occurred and a block should be produced. /// When the command is called with `force = true`, then the block will be created for `round` skipping /// any checks (ex leader existence of previous round). More information can be found on the `Core` component. @@ -57,6 +57,11 @@ pub trait CoreThreadDispatcher: Sync + Send + 'static { async fn add_blocks(&self, blocks: Vec) -> Result, CoreError>; + async fn add_voted_blocks( + &self, + blocks: Vec, + ) -> Result, CoreError>; + async fn new_block(&self, round: Round, force: bool) -> Result<(), CoreError>; async fn get_missing_blocks(&self) -> Result, CoreError>; @@ -116,7 +121,7 @@ impl CoreThread { match command { CoreThreadCommand::AddBlocks(blocks, sender) => { let _scope = monitored_scope("CoreThread::loop::add_blocks"); - let missing_blocks = self.core.add_blocks(blocks)?; + let missing_blocks = self.core.add_voted_blocks(blocks)?; sender.send(missing_blocks).ok(); } CoreThreadCommand::NewBlock(round, sender, force) => { @@ -260,8 +265,22 @@ impl CoreThreadDispatcher for ChannelCoreThreadDispatcher { &self, blocks: Vec, ) -> Result, CoreError> { - for block in &blocks { - self.highest_received_rounds[block.author()].fetch_max(block.round(), Ordering::AcqRel); + self.add_voted_blocks( + blocks + .into_iter() + .map(|b| VerifiedVotedBlock::new(b, vec![])) + .collect(), + ) + .await + } + + async fn add_voted_blocks( + &self, + blocks: Vec, + ) -> Result, CoreError> { + for b in &blocks { + self.highest_received_rounds[b.inner.author()] + .fetch_max(b.inner.round(), Ordering::AcqRel); } let (sender, receiver) = oneshot::channel(); self.send(CoreThreadCommand::AddBlocks(blocks, sender)) @@ -354,6 +373,15 @@ impl CoreThreadDispatcher for MockCoreThreadDispatcher { Ok(BTreeSet::new()) } + async fn add_voted_blocks( + &self, + blocks: Vec, + ) -> Result, CoreError> { + let mut add_blocks = self.add_blocks.lock(); + add_blocks.extend(blocks.into_iter().map(|b| b.inner)); + Ok(BTreeSet::new()) + } + async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> { Ok(()) } diff --git a/consensus/core/src/dag_state.rs b/consensus/core/src/dag_state.rs index c445184c8c359..f9f4dd0fae5d7 100644 --- a/consensus/core/src/dag_state.rs +++ b/consensus/core/src/dag_state.rs @@ -16,8 +16,8 @@ use tracing::{debug, error, info}; use crate::{ block::{ - genesis_blocks, BlockAPI, BlockDigest, BlockOutput, BlockRef, BlockTimestampMs, Round, - Slot, VerifiedBlock, GENESIS_ROUND, + genesis_blocks, BlockAPI, BlockDigest, BlockOutput, BlockRef, BlockTimestampMs, + BlockTransactionVotes, Round, Slot, VerifiedBlock, VerifiedVotedBlock, GENESIS_ROUND, }, commit::{ load_committed_subdag_from_store, CommitAPI as _, CommitDigest, CommitIndex, CommitInfo, @@ -87,6 +87,8 @@ pub(crate) struct DagState { // TODO: limit to 1st commit per round with multi-leader. pending_commit_votes: VecDeque, + pending_transaction_votes: VecDeque, + // Certified blocks pending to be processed outside of consensus. pending_certified_blocks: Vec, @@ -176,6 +178,7 @@ impl DagState { last_commit_round_advancement_time: None, last_committed_rounds: last_committed_rounds.clone(), pending_commit_votes: VecDeque::new(), + pending_transaction_votes: VecDeque::new(), pending_certified_blocks: vec![], blocks_to_write: vec![], commits_to_write: vec![], @@ -241,7 +244,14 @@ impl DagState { } /// Accepts a block into DagState and keeps it in memory. + #[cfg(test)] pub(crate) fn accept_block(&mut self, block: VerifiedBlock) { + self.accept_voted_block(VerifiedVotedBlock::new(block, vec![])); + } + + /// Accepts a block into DagState and keeps it in memory. + pub(crate) fn accept_voted_block(&mut self, voted_block: VerifiedVotedBlock) { + let block = voted_block.inner.clone(); assert_ne!( block.round(), 0, @@ -276,6 +286,11 @@ impl DagState { if self.context.protocol_config.mysticeti_fastpath() { let certified_blocks = self.update_certification_votes(&block); self.pending_certified_blocks.extend(certified_blocks); + self.pending_transaction_votes + .push_back(BlockTransactionVotes { + block_ref, + rejects: voted_block.to_reject, + }); } self.blocks_to_write.push(block); @@ -445,13 +460,28 @@ impl DagState { } /// Accepts a blocks into DagState and keeps it in memory. + #[cfg(test)] pub(crate) fn accept_blocks(&mut self, blocks: Vec) { debug!( "Accepting blocks: {}", blocks.iter().map(|b| b.reference().to_string()).join(",") ); for block in blocks { - self.accept_block(block); + self.accept_voted_block(VerifiedVotedBlock::new(block, vec![])); + } + } + + /// Accepts a blocks into DagState and keeps it in memory. + pub(crate) fn accept_voted_blocks(&mut self, blocks: Vec) { + debug!( + "Accepting voted blocks: {}", + blocks + .iter() + .map(|b| b.inner.reference().to_string()) + .join(",") + ); + for block in blocks { + self.accept_voted_block(block); } } @@ -889,6 +919,10 @@ impl DagState { votes } + pub(crate) fn take_transaction_votes(&mut self) -> Vec { + self.pending_transaction_votes.drain(..).collect() + } + /// Returns the pending certified blocks. pub(crate) fn take_certified_blocks(&mut self) -> Vec { self.pending_certified_blocks.drain(..).collect() diff --git a/consensus/core/src/leader_timeout.rs b/consensus/core/src/leader_timeout.rs index 1e28852edc3e3..0338f68eac7cf 100644 --- a/consensus/core/src/leader_timeout.rs +++ b/consensus/core/src/leader_timeout.rs @@ -128,7 +128,7 @@ mod tests { use parking_lot::Mutex; use tokio::time::{sleep, Instant}; - use crate::block::{BlockRef, Round, VerifiedBlock}; + use crate::block::{BlockRef, Round, VerifiedBlock, VerifiedVotedBlock}; use crate::context::Context; use crate::core::CoreSignals; use crate::core_thread::{CoreError, CoreThreadDispatcher}; @@ -157,6 +157,13 @@ mod tests { todo!() } + async fn add_voted_blocks( + &self, + _blocks: Vec, + ) -> Result, CoreError> { + todo!() + } + async fn new_block(&self, round: Round, force: bool) -> Result<(), CoreError> { self.new_block_calls .lock() diff --git a/consensus/core/src/round_prober.rs b/consensus/core/src/round_prober.rs index a80ffe148f1f9..5017f0f5450cb 100644 --- a/consensus/core/src/round_prober.rs +++ b/consensus/core/src/round_prober.rs @@ -299,7 +299,7 @@ mod test { use super::QuorumRound; use crate::{ - block::BlockRef, + block::{BlockRef, VerifiedVotedBlock}, commit::CommitRange, context::Context, core_thread::{CoreError, CoreThreadDispatcher}, @@ -344,6 +344,13 @@ mod test { unimplemented!() } + async fn add_voted_blocks( + &self, + _blocks: Vec, + ) -> Result, CoreError> { + unimplemented!() + } + async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> { unimplemented!() }