From f0833c8dd8fe6d49ec067cd70886d8728035f2e8 Mon Sep 17 00:00:00 2001 From: Mingwei Tian Date: Mon, 28 Oct 2024 16:59:34 -0700 Subject: [PATCH] . --- consensus/core/src/base_committer.rs | 6 +- consensus/core/src/commit_consumer.rs | 4 +- consensus/core/src/commit_observer.rs | 2 +- consensus/core/src/commit_syncer.rs | 4 +- consensus/core/src/core.rs | 5 +- consensus/core/src/dag_state.rs | 111 +++++++++++++++++--------- 6 files changed, 85 insertions(+), 47 deletions(-) diff --git a/consensus/core/src/base_committer.rs b/consensus/core/src/base_committer.rs index 7c390f9d47f2a7..7233db9b60f221 100644 --- a/consensus/core/src/base_committer.rs +++ b/consensus/core/src/base_committer.rs @@ -101,7 +101,9 @@ impl BaseCommitter { // There can be at most one leader with enough support for each round, otherwise it means // the BFT assumption is broken. if leaders_with_enough_support.len() > 1 { - panic!("[{self}] More than one certified block for {leader}") + panic!( + "[{self}] More than one candidate for {leader}: {leaders_with_enough_support:?}" + ); } leaders_with_enough_support @@ -316,7 +318,7 @@ impl BaseCommitter { // There can be at most one certified leader, otherwise it means the BFT assumption is broken. if certified_leader_blocks.len() > 1 { - panic!("More than one certified block at wave {wave} from leader {leader_slot}") + panic!("More than one certified leader at wave {wave} from in {leader_slot}: {certified_leader_blocks:?}"); } // We commit the target leader if it has a certificate that is an ancestor of the anchor. 
diff --git a/consensus/core/src/commit_consumer.rs b/consensus/core/src/commit_consumer.rs index c00cf74ec179fc..56ac0fc5dea0c0 100644 --- a/consensus/core/src/commit_consumer.rs +++ b/consensus/core/src/commit_consumer.rs @@ -32,8 +32,8 @@ impl CommitConsumer { UnboundedReceiver, UnboundedReceiver, ) { - let (commit_sender, commit_receiver) = unbounded_channel("consensus_output"); - let (block_sender, block_receiver) = unbounded_channel("consensus_certified"); + let (commit_sender, commit_receiver) = unbounded_channel("consensus_commit_output"); + let (block_sender, block_receiver) = unbounded_channel("consensus_block_output"); let monitor = Arc::new(CommitConsumerMonitor::new(last_processed_commit_index)); ( diff --git a/consensus/core/src/commit_observer.rs b/consensus/core/src/commit_observer.rs index f2e40da49d70b1..e9ed34fa0ee88b 100644 --- a/consensus/core/src/commit_observer.rs +++ b/consensus/core/src/commit_observer.rs @@ -119,7 +119,7 @@ impl CommitObserver { Ok(()) } - // Certified blocks are not sent to fast path, for efficiency. + // Certified blocks are not sent to fast path on recovery, for efficiency. fn recover_and_send_commits(&mut self, last_processed_commit_index: CommitIndex) { let now = Instant::now(); // TODO: remove this check, to allow consensus to regenerate commits? 
diff --git a/consensus/core/src/commit_syncer.rs b/consensus/core/src/commit_syncer.rs index 44a38d595d1484..88ae75f99b00d0 100644 --- a/consensus/core/src/commit_syncer.rs +++ b/consensus/core/src/commit_syncer.rs @@ -291,7 +291,7 @@ impl CommitSyncer { } debug!( - "Fetched certified blocks for commit range {:?}: {}", + "Fetched blocks for commit range {:?}: {}", fetched_commit_range, blocks.iter().map(|b| b.reference().to_string()).join(","), ); @@ -598,7 +598,7 @@ impl CommitSyncer { let forward_drift = Duration::from_millis(forward_drift); if forward_drift >= inner.context.parameters.max_forward_time_drift { warn!( - "Local clock is behind a quorum of peers: local ts {}, certified block ts {}", + "Local clock is behind a quorum of peers: local ts {}, committed block ts {}", now_ms, block.timestamp_ms() ); diff --git a/consensus/core/src/core.rs b/consensus/core/src/core.rs index c9076e811a9968..80d618d77cbde2 100644 --- a/consensus/core/src/core.rs +++ b/consensus/core/src/core.rs @@ -622,8 +622,9 @@ impl Core { Ok(committed_subdags) } - // Try processed certified blocks. - // Every transaction in a certified block is either accepted or rejected by a quorum. + // Try certifying blocks in the DAG. A certified block must meet these criteria within its voting round: + // - A quorum of authorities link to the block via ancestors. + // - Every transaction in the block is either accepted or rejected by a quorum. fn try_certify(&mut self) -> ConsensusResult<()> { if !self.context.protocol_config.mysticeti_fastpath() { return Ok(()); diff --git a/consensus/core/src/dag_state.rs b/consensus/core/src/dag_state.rs index 3907d4de2fa6b2..aa3f3def04e881 100644 --- a/consensus/core/src/dag_state.rs +++ b/consensus/core/src/dag_state.rs @@ -319,36 +319,47 @@ impl DagState { .set(highest_accepted_round_for_author as i64); } - // Updates votes for certification of transactions in the causal history of the block. 
+ // Updates votes for certification of transactions and blocks in the causal history of the voter block. + // Returns newly certified blocks. A certified block must meet these criteria within its voting round: + // - A quorum of authorities link to the block via ancestors. + // - Every transaction in the block is either accepted or rejected by a quorum. // TODO(fastpath): add randomized tests. - fn update_certification_votes(&mut self, block: &VerifiedBlock) -> Vec { + fn update_certification_votes(&mut self, voter_block: &VerifiedBlock) -> Vec { let mut certified_blocks = vec![]; // When a block has an explicit vote, record the rejections. The rest of the transactions are implicitly accepted. - for block_votes in block.transaction_votes() { + // NOTE: it is very important to count the rejection votes before checking for certification of a block. + // Otherwise a transaction that should not be certified or should be rejected may become accepted along with its block. + // TODO(fastpath): validate transaction votes in block verifier. + for block_votes in voter_block.transaction_votes() { let Some(block_info) = self.recent_blocks.get_mut(&block_votes.block_ref) else { // TODO(fastpath): ensure the voted block exists in the DAG, with BlockManager. // If the block is not found, it is outside of GC bound. continue; }; - block_info - .accept_votes - .add_unique(block.author(), &self.context.committee); for reject in &block_votes.rejects { + // TODO(fastpath): validate votes after ensuring existence of voted block in BlockManager. block_info .reject_votes .entry(*reject) .or_default() - .add_unique(block.author(), &self.context.committee); - } - if let Some(b) = block_info.take_certified_output(&self.context.committee) { - certified_blocks.push(b); + .add_unique(voter_block.author(), &self.context.committee); } } - // Transactions in unvoted ancestors of the block are implicitly accepted. This is the common case. 
- for ancestor in block.ancestors() { - let blocks = self.update_accept_votes_for_ancestor_authority(block, *ancestor); + // Implicitly accept transactions in the DAG of the block. This is the common case. + // And check for certification of the block after the voting. + // + // NOTE: if the voter or an ancestor authority equivocates, it is possible an explicitly voted + // block in transaction_votes is not in the DAG of any voter block ancestor refs. So blocks in + // transaction_votes and their authority ancestors get another chance to receive implicit + // accept votes. + for ancestor in voter_block + .ancestors() + .iter() + .chain(voter_block.transaction_votes().iter().map(|b| &b.block_ref)) + { + let blocks = self.update_accept_votes_for_ancestor_authority(voter_block, *ancestor); certified_blocks.extend(blocks); } @@ -358,32 +369,33 @@ impl DagState { // This function updates the implicit accept votes for the target block and its ancestor blocks // of the same authority. // - // All blocks in the causal history of the voter / target block can in theory receive implicit - // accept votes. But traversing the causal history of voter / target block is very expensive. - // Instead, voter blocks's ancestors are traversed in update_certification_votes(). - // And only ancestors from the same authority of target are traversed here. This will not miss - // voting on block, when neither voter or target authority is equivocating. + // All blocks in the causal history of the voter / target blocks can in theory receive implicit + // accept votes. But traversing the causal history of voter / target blocks is very expensive. + // Instead, only voter block's ancestors are traversed in update_certification_votes(). + // And only ancestors from the same authority are traversed here. This will not skip implicitly + // voting accept on any block, when neither the voter nor the target authority is equivocating. 
// - // If any one of the authority is equivocating, there can be blocks in the causal history of - // voter / target block that do not receive implicit accept votes. This is ok though. The worst - // downside is that these blocks will not get certified and sent to fast path. But they can still - // be finalized with the consensus commit, which counts votes differently. THere is no safety or - // liveness issue. + // If any one of the voter or target authority is equivocating, there can be blocks in the causal + // history of the voter / target blocks that do not receive implicit accept votes. + // This is ok though. All blocks are still finalized with the fastpath commit rule, which counts + // votes differently. There is no safety or liveness issue with not voting accept on some blocks. + // The only downside is that these blocks may not get certified and sent to the fastpath. fn update_accept_votes_for_ancestor_authority( &mut self, voter_block: &VerifiedBlock, mut target: BlockRef, ) -> Vec { + // Remove this const and its usage after mainnet config version reaches 68. + const DEFAULT_VOTING_ROUNDS: Round = 40; let mut certified_blocks = vec![]; - while target.round - >= voter_block.round().saturating_sub( - self.context - .protocol_config - .consensus_voting_rounds_as_option() - .unwrap_or( - 40, /* remove default after mainnet config version reaches 68 */ - ), - ) + while target.round > GENESIS_ROUND + && target.round + >= voter_block.round().saturating_sub( + self.context + .protocol_config + .consensus_voting_rounds_as_option() + .unwrap_or(DEFAULT_VOTING_ROUNDS), + ) { let Some(target_block_info) = self.recent_blocks.get_mut(&target) else { // The target block has been GC'ed. @@ -400,14 +412,34 @@ impl DagState { certified_blocks.push(b); } // Try voting on the ancestor of the same authority. 
- if let Some(a) = target_block_info.block.ancestors().first() { - if a.author == target.author { - target = *a; - continue; + // This should stop at the first ancestor, on blocks verified by BlockVerifier. + // TODO: fix ancestors order in tests. + let Some(ancestor) = target_block_info.block.ancestors().first() else { + cfg_if::cfg_if! { + if #[cfg(test)] { + // This is expected in tests where blocks are created without + // proper ancestor order. + break; + } else { + panic!("Block has no ancestor: {:?}", target_block_info.block,); + } + } + }; + if target.author != ancestor.author { + cfg_if::cfg_if! { + if #[cfg(test)] { + // This is expected in tests where blocks are created without + // proper ancestor order. + break; + } else { + panic!( + "1st ancestor is not from the same authority: {:?}", + target_block_info.block, + ); + } } } - // Stop voting because genesis block is reached. - break; + target = *ancestor; } certified_blocks } @@ -1112,10 +1144,13 @@ impl BlockInfo { } let mut rejected = vec![]; for (idx, reject_votes) in &self.reject_votes { + // The transaction is certified to be rejected. if reject_votes.reached_threshold(committee) { rejected.push(*idx); continue; } + // If the transaction is not certified to be rejected or accepted, the block cannot + // be considered as certified. if self .accept_votes .stake()