Skip to content

Commit

Permalink
Send certified blocks to fast path.
Browse files Browse the repository at this point in the history
  • Loading branch information
mwtian committed Oct 25, 2024
1 parent e0b51bc commit ab25d1a
Show file tree
Hide file tree
Showing 11 changed files with 349 additions and 82 deletions.
46 changes: 29 additions & 17 deletions consensus/core/src/authority_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,10 @@ mod tests {

use super::*;
use crate::block::GENESIS_ROUND;
use crate::{block::BlockAPI as _, transaction::NoopTransactionVerifier, CommittedSubDag};
use crate::{
block::BlockAPI as _, block::BlockOutputBatch, transaction::NoopTransactionVerifier,
CommittedSubDag,
};

#[rstest]
#[tokio::test]
Expand Down Expand Up @@ -490,12 +493,13 @@ mod tests {
.map(|_| TempDir::new().unwrap())
.collect::<Vec<_>>();

let mut output_receivers = Vec::with_capacity(committee.size());
let mut commit_receivers = Vec::with_capacity(committee.size());
let mut block_receivers = Vec::with_capacity(committee.size());
let mut authorities = Vec::with_capacity(committee.size());
let mut boot_counters = [0; NUM_OF_AUTHORITIES];

for (index, _authority_info) in committee.authorities() {
let (authority, receiver) = make_authority(
let (authority, commit_receiver, block_receiver) = make_authority(
index,
&temp_dirs[index.value()],
committee.clone(),
Expand All @@ -506,7 +510,8 @@ mod tests {
)
.await;
boot_counters[index] += 1;
output_receivers.push(receiver);
commit_receivers.push(commit_receiver);
block_receivers.push(block_receiver);
authorities.push(authority);
}

Expand All @@ -522,7 +527,7 @@ mod tests {
.unwrap();
}

for receiver in &mut output_receivers {
for receiver in &mut commit_receivers {
let mut expected_transactions = submitted_transactions.clone();
loop {
let committed_subdag =
Expand Down Expand Up @@ -552,7 +557,7 @@ mod tests {
sleep(Duration::from_secs(10)).await;

// Restart authority 1 and let it run.
let (authority, receiver) = make_authority(
let (authority, commit_receiver, block_receiver) = make_authority(
index,
&temp_dirs[index.value()],
committee.clone(),
Expand All @@ -563,7 +568,8 @@ mod tests {
)
.await;
boot_counters[index] += 1;
output_receivers[index] = receiver;
commit_receivers[index] = commit_receiver;
block_receivers[index] = block_receiver;
authorities.insert(index.value(), authority);
sleep(Duration::from_secs(10)).await;

Expand All @@ -585,7 +591,8 @@ mod tests {

const NUM_OF_AUTHORITIES: usize = 4;
let (committee, keypairs) = local_committee_and_keys(0, [1; NUM_OF_AUTHORITIES].to_vec());
let mut output_receivers = vec![];
let mut commit_receivers = vec![];
let mut block_receivers = vec![];
let mut authorities = BTreeMap::new();
let mut temp_dirs = BTreeMap::new();
let mut boot_counters = [0; NUM_OF_AUTHORITIES];
Expand All @@ -595,7 +602,7 @@ mod tests {

for (index, _authority_info) in committee.authorities() {
let dir = TempDir::new().unwrap();
let (authority, receiver) = make_authority(
let (authority, commit_receiver, block_receiver) = make_authority(
index,
&dir,
committee.clone(),
Expand All @@ -607,7 +614,8 @@ mod tests {
.await;
assert!(authority.sync_last_known_own_block_enabled(), "Expected syncing of last known own block to be enabled as all authorities are of empty db and boot for first time.");
boot_counters[index] += 1;
output_receivers.push(receiver);
commit_receivers.push(commit_receiver);
block_receivers.push(block_receiver);
authorities.insert(index, authority);
temp_dirs.insert(index, dir);
}
Expand All @@ -617,7 +625,7 @@ mod tests {
// at least one block has been proposed and successfully received by a quorum of nodes.
let index_1 = committee.to_authority_index(1).unwrap();
'outer: while let Some(result) =
timeout(Duration::from_secs(10), output_receivers[index_1].recv())
timeout(Duration::from_secs(10), commit_receivers[index_1].recv())
.await
.expect("Timed out while waiting for at least one committed block from authority 1")
{
Expand All @@ -644,7 +652,7 @@ mod tests {
let dir = TempDir::new().unwrap();
// We do reset the boot counter for this one to simulate a "binary" restart
boot_counters[index_1] = 0;
let (authority, mut receiver) = make_authority(
let (authority, mut commit_receiver, _block_receiver) = make_authority(
index_1,
&dir,
committee.clone(),
Expand All @@ -665,7 +673,7 @@ mod tests {

// Now spin up authority 2 using its earlier directly - so no amnesia recovery should be forced here.
// Authority 1 should be able to recover from amnesia successfully.
let (authority, _receiver) = make_authority(
let (authority, _commit_receiver, _block_receiver) = make_authority(
index_2,
&temp_dirs[&index_2],
committee.clone(),
Expand All @@ -684,7 +692,7 @@ mod tests {
sleep(Duration::from_secs(5)).await;

// We wait until we see at least one committed block authored from this authority
'outer: while let Some(result) = receiver.recv().await {
'outer: while let Some(result) = commit_receiver.recv().await {
for block in result.blocks {
if block.round() > GENESIS_ROUND && block.author() == index_1 {
break 'outer;
Expand All @@ -707,7 +715,11 @@ mod tests {
network_type: ConsensusNetwork,
boot_counter: u64,
protocol_config: ProtocolConfig,
) -> (ConsensusAuthority, UnboundedReceiver<CommittedSubDag>) {
) -> (
ConsensusAuthority,
UnboundedReceiver<CommittedSubDag>,
UnboundedReceiver<BlockOutputBatch>,
) {
let registry = Registry::new();

// Cache less blocks to exercise commit sync.
Expand All @@ -724,7 +736,7 @@ mod tests {
let protocol_keypair = keypairs[index].1.clone();
let network_keypair = keypairs[index].0.clone();

let (commit_consumer, commit_receiver, _) = CommitConsumer::new(0);
let (commit_consumer, commit_receiver, block_receiver) = CommitConsumer::new(0);

let authority = ConsensusAuthority::start(
network_type,
Expand All @@ -741,6 +753,6 @@ mod tests {
)
.await;

(authority, commit_receiver)
(authority, commit_receiver, block_receiver)
}
}
20 changes: 20 additions & 0 deletions consensus/core/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -658,6 +658,26 @@ pub(crate) fn genesis_blocks(context: Arc<Context>) -> Vec<VerifiedBlock> {
.collect::<Vec<VerifiedBlock>>()
}

/// A batch of blocks output by consensus for processing.
pub enum BlockOutputBatch {
/// All transactions in the block have a quorum of accept or reject votes.
Certified(Vec<BlockOutput>),
}

/// A block output by consensus for processing.
#[derive(Clone)]
pub struct BlockOutput {
pub block: VerifiedBlock,
/// Sorted transaction indices that indicate the transactions rejected by a quorum.
pub rejected: Vec<TransactionIndex>,
}

impl BlockOutput {
pub fn new(block: VerifiedBlock, rejected: Vec<TransactionIndex>) -> Self {
Self { block, rejected }
}
}

/// Creates fake blocks for testing.
/// This struct is public for testing in other crates.
#[derive(Clone)]
Expand Down
1 change: 1 addition & 0 deletions consensus/core/src/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ pub type CommitVote = CommitRef;
pub struct CommittedSubDag {
/// A reference to the leader of the sub-dag
pub leader: BlockRef,
// TODO: refactor blocks and rejected_transactions_by_block to BlockOutput.
/// All the committed blocks that are part of this sub-dag
pub blocks: Vec<VerifiedBlock>,
/// Indices of rejected transactions in each block.
Expand Down
19 changes: 8 additions & 11 deletions consensus/core/src/commit_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,15 @@ use tokio::sync::watch;

use mysten_metrics::monitored_mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};

use crate::{CommitIndex, CommittedSubDag, TransactionIndex, VerifiedBlock};
use crate::{block::BlockOutputBatch, CommitIndex, CommittedSubDag};

#[derive(Clone)]
pub struct CommitConsumer {
// A channel to output the committed sub dags.
pub(crate) commit_sender: UnboundedSender<CommittedSubDag>,
// A channel to output certified and rejected transactions by batches of blocks.
// Each tuple contains the block containing transactions, and indices of rejected transactions.
// In each block, transactions that are not rejected are considered certified.
// Batches of blocks are sent together, to improve efficiency.
#[allow(unused)]
pub(crate) transaction_sender: UnboundedSender<Vec<(VerifiedBlock, Vec<TransactionIndex>)>>,
// A channel to output blocks for processing, separated from consensus commits.
// In each block output, transactions that are not rejected are considered certified.
pub(crate) block_sender: UnboundedSender<BlockOutputBatch>,
// Index of the last commit that the consumer has processed. This is useful for
// crash/recovery so mysticeti can replay the commits from the next index.
// First commit in the replayed sequence will have index last_processed_commit_index + 1.
Expand All @@ -33,21 +30,21 @@ impl CommitConsumer {
) -> (
Self,
UnboundedReceiver<CommittedSubDag>,
UnboundedReceiver<Vec<(VerifiedBlock, Vec<TransactionIndex>)>>,
UnboundedReceiver<BlockOutputBatch>,
) {
let (commit_sender, commit_receiver) = unbounded_channel("consensus_output");
let (transaction_sender, transaction_receiver) = unbounded_channel("consensus_certified");
let (block_sender, block_receiver) = unbounded_channel("consensus_certified");

let monitor = Arc::new(CommitConsumerMonitor::new(last_processed_commit_index));
(
Self {
commit_sender,
transaction_sender,
block_sender,
last_processed_commit_index,
monitor,
},
commit_receiver,
transaction_receiver,
block_receiver,
)
}

Expand Down
49 changes: 36 additions & 13 deletions consensus/core/src/commit_observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ use tokio::time::Instant;
use tracing::{debug, info};

use crate::{
block::{BlockAPI, VerifiedBlock},
block::{BlockAPI, BlockOutputBatch, VerifiedBlock},
commit::{load_committed_subdag_from_store, CommitAPI, CommitIndex},
context::Context,
dag_state::DagState,
error::{ConsensusError, ConsensusResult},
leader_schedule::LeaderSchedule,
linearizer::Linearizer,
storage::Store,
CommitConsumer, CommittedSubDag,
BlockOutput, CommitConsumer, CommittedSubDag,
};

/// Role of CommitObserver
Expand All @@ -37,8 +37,10 @@ pub(crate) struct CommitObserver {
context: Arc<Context>,
/// Component to deterministically collect subdags for committed leaders.
commit_interpreter: Linearizer,
/// An unbounded channel to send committed sub-dags to the consumer of consensus output.
sender: UnboundedSender<CommittedSubDag>,
/// An unbounded channel to send commits to commit handler.
commit_sender: UnboundedSender<CommittedSubDag>,
/// An unbounded channel to send blocks to block handler.
block_sender: UnboundedSender<BlockOutputBatch>,
/// Persistent storage for blocks, commits and other consensus data.
store: Arc<dyn Store>,
leader_schedule: Arc<LeaderSchedule>,
Expand All @@ -55,7 +57,8 @@ impl CommitObserver {
let mut observer = Self {
context,
commit_interpreter: Linearizer::new(dag_state.clone(), leader_schedule.clone()),
sender: commit_consumer.commit_sender,
commit_sender: commit_consumer.commit_sender,
block_sender: commit_consumer.block_sender,
store,
leader_schedule,
};
Expand All @@ -80,8 +83,8 @@ impl CommitObserver {
let mut sent_sub_dags = Vec::with_capacity(committed_sub_dags.len());
for committed_sub_dag in committed_sub_dags.into_iter() {
// Failures in sender.send() are assumed to be permanent
if let Err(err) = self.sender.send(committed_sub_dag.clone()) {
tracing::error!(
if let Err(err) = self.commit_sender.send(committed_sub_dag.clone()) {
tracing::warn!(
"Failed to send committed sub-dag, probably due to shutdown: {err:?}"
);
return Err(ConsensusError::Shutdown);
Expand All @@ -99,6 +102,24 @@ impl CommitObserver {
Ok(sent_sub_dags)
}

pub(crate) fn handle_certified_blocks(
&mut self,
certified_blocks: Vec<BlockOutput>,
) -> ConsensusResult<()> {
if certified_blocks.is_empty() {
return Ok(());
}
if let Err(err) = self
.block_sender
.send(BlockOutputBatch::Certified(certified_blocks))
{
tracing::warn!("Failed to send certified blocks, probably due to shutdown: {err:?}");
return Err(ConsensusError::Shutdown);
}
Ok(())
}

// Certified blocks are not sent to fast path, for efficiency.
fn recover_and_send_commits(&mut self, last_processed_commit_index: CommitIndex) {
let now = Instant::now();
// TODO: remove this check, to allow consensus to regenerate commits?
Expand Down Expand Up @@ -149,12 +170,14 @@ impl CommitObserver {
info!("Sending commit {} during recovery", commit.index());
let committed_sub_dag =
load_committed_subdag_from_store(self.store.as_ref(), commit, reputation_scores);
self.sender.send(committed_sub_dag).unwrap_or_else(|e| {
panic!(
"Failed to send commit during recovery, probably due to shutdown: {:?}",
e
)
});
self.commit_sender
.send(committed_sub_dag)
.unwrap_or_else(|e| {
panic!(
"Failed to send commit during recovery, probably due to shutdown: {:?}",
e
)
});

last_sent_commit_index += 1;
}
Expand Down
Loading

0 comments on commit ab25d1a

Please sign in to comment.