Skip to content

Commit

Permalink
Support voting in TransactionVerifier
Browse files Browse the repository at this point in the history
  • Loading branch information
mwtian committed Nov 1, 2024
1 parent 11a581e commit d79675a
Show file tree
Hide file tree
Showing 9 changed files with 250 additions and 61 deletions.
3 changes: 2 additions & 1 deletion consensus/core/src/authority_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ where

let block_verifier = Arc::new(SignedBlockVerifier::new(
context.clone(),
transaction_verifier,
transaction_verifier.clone(),
));

let block_manager =
Expand Down Expand Up @@ -310,6 +310,7 @@ where
let network_service = Arc::new(AuthorityService::new(
context.clone(),
block_verifier,
transaction_verifier,
commit_vote_monitor,
synchronizer.clone(),
core_dispatcher,
Expand Down
70 changes: 63 additions & 7 deletions consensus/core/src/authority_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ use tokio_util::sync::ReusableBoxFuture;
use tracing::{debug, info, trace, warn};

use crate::{
block::{BlockAPI as _, BlockRef, SignedBlock, VerifiedBlock, GENESIS_ROUND},
block::{
BlockAPI as _, BlockRef, SignedBlock, VerifiedBlock, VerifiedVotedBlock, GENESIS_ROUND,
},
block_verifier::BlockVerifier,
commit::{CommitAPI as _, CommitRange, TrustedCommit},
commit_vote_monitor::CommitVoteMonitor,
Expand All @@ -26,7 +28,7 @@ use crate::{
stake_aggregator::{QuorumThreshold, StakeAggregator},
storage::Store,
synchronizer::SynchronizerHandle,
CommitIndex, Round,
CommitIndex, Round, TransactionVerifier,
};

pub(crate) const COMMIT_LAG_MULTIPLIER: u32 = 5;
Expand All @@ -36,6 +38,7 @@ pub(crate) struct AuthorityService<C: CoreThreadDispatcher> {
context: Arc<Context>,
commit_vote_monitor: Arc<CommitVoteMonitor>,
block_verifier: Arc<dyn BlockVerifier>,
transaction_verifier: Arc<dyn TransactionVerifier>,
synchronizer: Arc<SynchronizerHandle>,
core_dispatcher: Arc<C>,
rx_block_broadcaster: broadcast::Receiver<VerifiedBlock>,
Expand All @@ -48,6 +51,7 @@ impl<C: CoreThreadDispatcher> AuthorityService<C> {
pub(crate) fn new(
context: Arc<Context>,
block_verifier: Arc<dyn BlockVerifier>,
transaction_verifier: Arc<dyn TransactionVerifier>,
commit_vote_monitor: Arc<CommitVoteMonitor>,
synchronizer: Arc<SynchronizerHandle>,
core_dispatcher: Arc<C>,
Expand All @@ -62,6 +66,7 @@ impl<C: CoreThreadDispatcher> AuthorityService<C> {
Self {
context,
block_verifier,
transaction_verifier,
commit_vote_monitor,
synchronizer,
core_dispatcher,
Expand Down Expand Up @@ -113,6 +118,38 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
info!("Invalid block from {}: {}", peer, e);
return Err(e);
}

let transaction_batch: Vec<_> = signed_block
.transactions()
.iter()
.map(|t| t.data())
.collect();
let transactions_to_reject = if self.context.protocol_config.mysticeti_fastpath() {
match self
.transaction_verifier
.verify_and_vote_batch(&transaction_batch)
.await
{
Ok(votes) => votes,
Err(e) => {
self.context
.metrics
.node_metrics
.invalid_blocks
.with_label_values(&[
peer_hostname,
"handle_send_block",
"InvalidTransaction",
])
.inc();
info!("Invalid block from {}: {}", peer, e);
return Err(ConsensusError::InvalidTransaction(format!("{e:?}")));
}
}
} else {
vec![]
};

let verified_block = VerifiedBlock::new_verified(signed_block, serialized_block);

trace!("Received block {verified_block} via send block.");
Expand Down Expand Up @@ -211,7 +248,10 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {

let missing_ancestors = self
.core_dispatcher
.add_blocks(vec![verified_block])
.add_voted_blocks(vec![VerifiedVotedBlock::new(
verified_block,
transactions_to_reject,
)])
.await
.map_err(|_| ConsensusError::Shutdown)?;
if !missing_ancestors.is_empty() {
Expand Down Expand Up @@ -602,8 +642,8 @@ async fn make_recv_future<T: Clone>(
mod tests {
use crate::{
authority_service::AuthorityService,
block::BlockAPI,
block::{BlockRef, SignedBlock, TestBlock, VerifiedBlock},
block::{BlockAPI, BlockRef, SignedBlock, TestBlock, VerifiedBlock, VerifiedVotedBlock},
block_verifier::NoopBlockVerifier,
commit::CommitRange,
commit_vote_monitor::CommitVoteMonitor,
context::Context,
Expand All @@ -615,6 +655,7 @@ mod tests {
storage::mem_store::MemStore,
synchronizer::Synchronizer,
test_dag_builder::DagBuilder,
transaction::NoopTransactionVerifier,
Round,
};
use async_trait::async_trait;
Expand Down Expand Up @@ -654,6 +695,17 @@ mod tests {
Ok(block_refs)
}

async fn add_voted_blocks(
&self,
blocks: Vec<VerifiedVotedBlock>,
) -> Result<BTreeSet<BlockRef>, CoreError> {
let block_refs = blocks.iter().map(|b| b.inner.reference()).collect();
self.blocks
.lock()
.extend(blocks.into_iter().map(|b| b.inner));
Ok(block_refs)
}

async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
Ok(())
}
Expand Down Expand Up @@ -749,7 +801,8 @@ mod tests {
async fn test_handle_send_block() {
let (context, _keys) = Context::new_for_test(4);
let context = Arc::new(context);
let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
let block_verifier = Arc::new(NoopBlockVerifier {});
let transaction_verifier = Arc::new(NoopTransactionVerifier {});
let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
Expand All @@ -768,6 +821,7 @@ mod tests {
let authority_service = Arc::new(AuthorityService::new(
context.clone(),
block_verifier,
transaction_verifier,
commit_vote_monitor,
synchronizer,
core_dispatcher.clone(),
Expand Down Expand Up @@ -808,7 +862,8 @@ mod tests {
// GIVEN
let (context, _keys) = Context::new_for_test(4);
let context = Arc::new(context);
let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
let block_verifier = Arc::new(NoopBlockVerifier {});
let transaction_verifier = Arc::new(NoopTransactionVerifier {});
let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
Expand All @@ -827,6 +882,7 @@ mod tests {
let authority_service = Arc::new(AuthorityService::new(
context.clone(),
block_verifier,
transaction_verifier,
commit_vote_monitor,
synchronizer,
core_dispatcher.clone(),
Expand Down
13 changes: 13 additions & 0 deletions consensus/core/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ pub(crate) struct BlockV1 {
}

impl BlockV1 {
#[allow(unused)]
pub(crate) fn new(
epoch: Epoch,
round: Round,
Expand Down Expand Up @@ -658,6 +659,18 @@ pub(crate) fn genesis_blocks(context: Arc<Context>) -> Vec<VerifiedBlock> {
.collect::<Vec<VerifiedBlock>>()
}

#[derive(Clone, Debug)]
pub(crate) struct VerifiedVotedBlock {
pub(crate) inner: VerifiedBlock,
pub(crate) to_reject: Vec<TransactionIndex>,
}

impl VerifiedVotedBlock {
pub(crate) fn new(inner: VerifiedBlock, to_reject: Vec<TransactionIndex>) -> Self {
Self { inner, to_reject }
}
}

/// A batch of blocks output by consensus for processing.
pub enum BlockOutputBatch {
/// All transactions in the block have a quorum of accept or reject votes.
Expand Down
Loading

0 comments on commit d79675a

Please sign in to comment.