Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
  • Loading branch information
mwtian committed Oct 29, 2024
1 parent 5690c52 commit b6e2da7
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 42 deletions.
6 changes: 4 additions & 2 deletions consensus/core/src/base_committer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,9 @@ impl BaseCommitter {
// There can be at most one leader with enough support for each round, otherwise it means
// the BFT assumption is broken.
if leaders_with_enough_support.len() > 1 {
panic!("[{self}] More than one certified block for {leader}")
panic!(
"[{self}] More than one candidate for {leader}: {leaders_with_enough_support:?}"
);
}

leaders_with_enough_support
Expand Down Expand Up @@ -316,7 +318,7 @@ impl BaseCommitter {

// There can be at most one certified leader, otherwise it means the BFT assumption is broken.
if certified_leader_blocks.len() > 1 {
panic!("More than one certified block at wave {wave} from leader {leader_slot}")
panic!("More than one certified leader at wave {wave} from in {leader_slot}: {certified_leader_blocks:?}");
}

// We commit the target leader if it has a certificate that is an ancestor of the anchor.
Expand Down
4 changes: 2 additions & 2 deletions consensus/core/src/commit_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ impl CommitConsumer {
UnboundedReceiver<CommittedSubDag>,
UnboundedReceiver<BlockOutputBatch>,
) {
let (commit_sender, commit_receiver) = unbounded_channel("consensus_output");
let (block_sender, block_receiver) = unbounded_channel("consensus_certified");
let (commit_sender, commit_receiver) = unbounded_channel("consensus_commit_output");
let (block_sender, block_receiver) = unbounded_channel("consensus_block_output");

let monitor = Arc::new(CommitConsumerMonitor::new(last_processed_commit_index));
(
Expand Down
2 changes: 1 addition & 1 deletion consensus/core/src/commit_observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ impl CommitObserver {
Ok(())
}

// Certified blocks are not sent to fast path, for efficiency.
// Certified blocks are not sent to fast path on recovery, for efficiency.
fn recover_and_send_commits(&mut self, last_processed_commit_index: CommitIndex) {
let now = Instant::now();
// TODO: remove this check, to allow consensus to regenerate commits?
Expand Down
4 changes: 2 additions & 2 deletions consensus/core/src/commit_syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ impl<C: NetworkClient> CommitSyncer<C> {
}

debug!(
"Fetched certified blocks for commit range {:?}: {}",
"Fetched blocks for commit range {:?}: {}",
fetched_commit_range,
blocks.iter().map(|b| b.reference().to_string()).join(","),
);
Expand Down Expand Up @@ -598,7 +598,7 @@ impl<C: NetworkClient> CommitSyncer<C> {
let forward_drift = Duration::from_millis(forward_drift);
if forward_drift >= inner.context.parameters.max_forward_time_drift {
warn!(
"Local clock is behind a quorum of peers: local ts {}, certified block ts {}",
"Local clock is behind a quorum of peers: local ts {}, committed block ts {}",
now_ms,
block.timestamp_ms()
);
Expand Down
5 changes: 3 additions & 2 deletions consensus/core/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -622,8 +622,9 @@ impl Core {
Ok(committed_subdags)
}

// Try processed certified blocks.
// Every transaction in a certified block is either accepted or rejected by a quorum.
// Try certifying blocks in the DAG. A certified block must meet these criteria within its voting round:
// - A quorum of authorities link to the block via ancestors.
// - Every transaction in the block is either accepted or rejected by a quorum.
fn try_certify(&mut self) -> ConsensusResult<()> {
if !self.context.protocol_config.mysticeti_fastpath() {
return Ok(());
Expand Down
78 changes: 45 additions & 33 deletions consensus/core/src/dag_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ pub(crate) struct DagState {
pending_commit_votes: VecDeque<CommitVote>,

// Certified blocks pending to be processed outside of consensus.
// A certified block must meet these criteria within its voting round:
// - A quorum of authorities link to the block via ancestors.
// - Every transaction in the block is either accepted or rejected by a quorum.
pending_certified_blocks: Vec<BlockOutput>,

// Data to be flushed to storage.
Expand Down Expand Up @@ -321,34 +324,41 @@ impl DagState {

// Updates votes for certification of transactions in the causal history of the block.
// TODO(fastpath): add randomized tests.
fn update_certification_votes(&mut self, block: &VerifiedBlock) -> Vec<BlockOutput> {
fn update_certification_votes(&mut self, voter_block: &VerifiedBlock) -> Vec<BlockOutput> {
let mut certified_blocks = vec![];

// When a block has an explicit vote, record the rejections. The rest of the transactions are implicitly accepted.
for block_votes in block.transaction_votes() {
// NOTE: it is very important to count the rejection votes before checking for certification of a block.
// Otherwise a transaction that should not be certified or should be rejected may become accepted along with its block.
for block_votes in voter_block.transaction_votes() {
let Some(block_info) = self.recent_blocks.get_mut(&block_votes.block_ref) else {
// TODO(fastpath): ensure the voted block exists in the DAG, with BlockManager.
// If the block is not found, it is outside of GC bound.
continue;
};
block_info
.accept_votes
.add_unique(block.author(), &self.context.committee);
for reject in &block_votes.rejects {
// TODO(fastpath): validate votes after ensuring existence of voted block in BlockManager.
block_info
.reject_votes
.entry(*reject)
.or_default()
.add_unique(block.author(), &self.context.committee);
}
if let Some(b) = block_info.take_certified_output(&self.context.committee) {
certified_blocks.push(b);
.add_unique(voter_block.author(), &self.context.committee);
}
}

// Transactions in unvoted ancestors of the block are implicitly accepted. This is the common case.
for ancestor in block.ancestors() {
let blocks = self.update_accept_votes_for_ancestor_authority(block, *ancestor);
// Implicitly accept transactions in the DAG of the block. This is the common case.
// And check for certification of the block after the voting.
//
// NOTE: if the voter or an ancestor authority equivocates, it is possible an explicitly voted
// block in transaction_votes is not in the DAG of any voter block ancestor refs. So blocks in
// transaction_votes and their authority ancestors get another chance to receive implicit
// accept votes.
for ancestor in voter_block
.ancestors()
.iter()
.chain(voter_block.transaction_votes().iter().map(|b| &b.block_ref))
{
let blocks = self.update_accept_votes_for_ancestor_authority(voter_block, *ancestor);
certified_blocks.extend(blocks);
}

Expand All @@ -358,31 +368,31 @@ impl DagState {
// This function updates the implicit accept votes for the target block and its ancestor blocks
// of the same authority.
//
// All blocks in the causal history of the voter / target block can in theory receive implicit
// accept votes. But traversing the causal history of voter / target block is very expensive.
// Instead, voter blocks's ancestors are traversed in update_certification_votes().
// And only ancestors from the same authority of target are traversed here. This will not miss
// voting on block, when neither voter or target authority is equivocating.
// All blocks in the causal history of the voter / target blocks can in theory receive implicit
// accept votes. But traversing the causal history of voter / target blocks are very expensive.
// Instead, only voter blocks's ancestors are traversed in update_certification_votes().
// And only ancestors from the same authority are traversed here. This will not skip implicitly
// voting accept on any block, when neither the voter or target authority is equivocating.
//
// If any one of the authority is equivocating, there can be blocks in the causal history of
// voter / target block that do not receive implicit accept votes. This is ok though. The worst
// downside is that these blocks will not get certified and sent to fast path. But they can still
// be finalized with the consensus commit, which counts votes differently. THere is no safety or
// liveness issue.
// If any one of the voter or target authority is equivocating, there can be blocks in the causal
// history of the voter / target blocks that do not receive implicit accept votes.
// This is ok though. All blocks are still finalized with the fastpath commit rule, which counts
// votes differently. There is no safety or liveness issue with not voting accept on some blocks.
// The only downside is that these blocks may not get certified and sent to the fastpath.
fn update_accept_votes_for_ancestor_authority(
&mut self,
voter_block: &VerifiedBlock,
mut target: BlockRef,
) -> Vec<BlockOutput> {
// Remove this const and its usage after mainnet config version reaches 68.
const DEFAULT_VOTING_ROUNDS: Round = 40;
let mut certified_blocks = vec![];
while target.round
>= voter_block.round().saturating_sub(
self.context
.protocol_config
.consensus_voting_rounds_as_option()
.unwrap_or(
40, /* remove default after mainnet config version reaches 68 */
),
.unwrap_or(DEFAULT_VOTING_ROUNDS),
)
{
let Some(target_block_info) = self.recent_blocks.get_mut(&target) else {
Expand All @@ -400,14 +410,13 @@ impl DagState {
certified_blocks.push(b);
}
// Try voting on the ancestor of the same authority.
if let Some(a) = target_block_info.block.ancestors().first() {
if a.author == target.author {
target = *a;
continue;
}
}
// Stop voting because genesis block is reached.
break;
let Some(ancestor) = target_block_info.block.ancestors().first() else {
// Stop voting when genesis block is reached.
assert_eq!(target_block_info.block.round(), GENESIS_ROUND);
break;
};
assert_eq!(ancestor.author, target.author);
target = *ancestor;
}
certified_blocks
}
Expand Down Expand Up @@ -1112,10 +1121,13 @@ impl BlockInfo {
}
let mut rejected = vec![];
for (idx, reject_votes) in &self.reject_votes {
// The transaction is certified to be rejected.
if reject_votes.reached_threshold(committee) {
rejected.push(*idx);
continue;
}
// If the transaction is not certified to be rejected or accepted, the block cannot
// be considered as certified.
if self
.accept_votes
.stake()
Expand Down

0 comments on commit b6e2da7

Please sign in to comment.