From 5c97430f1b52acac06f5c96dbac9352c526a5042 Mon Sep 17 00:00:00 2001 From: Anastasios Kichidis Date: Wed, 6 Nov 2024 10:26:23 -0800 Subject: [PATCH] [Consensus] Core to read last proposed round from DagState (#20173) ## Description Refactoring Core to read the last proposed block from DagState rather than local cached value. This change: * will allow for nodes that recover from amnesia (potentially) link their new proposed block the actual last proposed block instead of linking back to genesis. * because of the above point, we are also avoiding edge case error when an amnesia recovered node is attempting to propose for a round with exact quorum, although their last proposed block still pointing to genesis, practically making us (thankfully) hit the assert [here](https://github.com/MystenLabs/sui/blob/6a571f189b49f99ed9294cd01c82d56e03ef8bfe/consensus/core/src/core.rs#L759) * our last proposed block is cached in DagState anyways so no practical difference ## Test plan CI --- ## Release notes Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required. For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates. - [ ] Protocol: - [ ] Nodes (Validators and Full nodes): - [ ] Indexer: - [ ] JSON-RPC: - [ ] GraphQL: - [ ] CLI: - [ ] Rust SDK: - [ ] REST API: --- consensus/core/src/core.rs | 54 ++++++++++++++++++++------------------ 1 file changed, 28 insertions(+), 26 deletions(-) diff --git a/consensus/core/src/core.rs b/consensus/core/src/core.rs index a1ff274e6722c..ee2e914e4659a 100644 --- a/consensus/core/src/core.rs +++ b/consensus/core/src/core.rs @@ -71,8 +71,6 @@ pub(crate) struct Core { /// Used to make commit decisions for leader blocks in the dag. committer: UniversalCommitter, - /// The last produced block - last_proposed_block: VerifiedBlock, /// The blocks of the last included ancestors per authority. This vector is basically used as a /// watermark in order to include in the next block proposal only ancestors of higher rounds. /// By default, is initialised with `None` values. @@ -155,7 +153,6 @@ impl Core { Self { context: context.clone(), threshold_clock: ThresholdClock::new(0, context.clone()), - last_proposed_block, last_included_ancestors, last_decided_leader, leader_schedule, @@ -206,20 +203,28 @@ impl Core { self.add_accepted_blocks(last_quorum); // Try to commit and propose, since they may not have run after the last storage write. self.try_commit().unwrap(); - if self.try_propose(true).unwrap().is_none() { + + let last_proposed_block = if let Some(last_proposed_block) = self.try_propose(true).unwrap() + { + last_proposed_block + } else { + let last_proposed_block = self + .dag_state + .read() + .get_last_block_for_authority(self.context.own_index); + if self.should_propose() { - assert!(self.last_proposed_block.round() > GENESIS_ROUND, "At minimum a block of round higher that genesis should have been produced during recovery"); + assert!(last_proposed_block.round() > GENESIS_ROUND, "At minimum a block of round higher that genesis should have been produced during recovery"); } // if no new block proposed then just re-broadcast the last proposed one to ensure liveness. - self.signals - .new_block(self.last_proposed_block.clone()) - .unwrap(); - } + self.signals.new_block(last_proposed_block.clone()).unwrap(); + last_proposed_block + }; info!( "Core recovery completed with last proposed block {:?}", - self.last_proposed_block + last_proposed_block ); self @@ -490,9 +495,6 @@ impl Core { .as_secs_f64(), ); - // Update internal state. - self.last_proposed_block = verified_block.clone(); - // Now acknowledge the transactions for their inclusion to block ack_transactions(verified_block.reference()); @@ -723,7 +725,9 @@ impl Core { // Propose only ancestors of higher rounds than what has already been proposed. // And always include own last proposed block first among ancestors. - let ancestors = iter::once(self.last_proposed_block.clone()) + let last_proposed_block = ancestors[self.context.own_index].clone(); + assert_eq!(last_proposed_block.author(), self.context.own_index); + let ancestors = iter::once(last_proposed_block) .chain( ancestors .into_iter() @@ -793,16 +797,17 @@ impl Core { } fn last_proposed_timestamp_ms(&self) -> BlockTimestampMs { - self.last_proposed_block.timestamp_ms() + self.last_proposed_block().timestamp_ms() } fn last_proposed_round(&self) -> Round { - self.last_proposed_block.round() + self.last_proposed_block().round() } - #[cfg(test)] - fn last_proposed_block(&self) -> &VerifiedBlock { - &self.last_proposed_block + fn last_proposed_block(&self) -> VerifiedBlock { + self.dag_state + .read() + .get_last_block_for_authority(self.context.own_index) } } @@ -1496,12 +1501,9 @@ mod test { assert_eq!(block.round(), 11); assert_eq!(block.ancestors().len(), 4); - // Our last ancestored included should be genesis. We do not update the last proposed block via the - // normal block processing path to keep it simple. - let our_ancestor_included = block.ancestors().iter().find(|block_ref: &&BlockRef| { - block_ref.author == context.own_index && block_ref.round == GENESIS_ROUND - }); - assert!(our_ancestor_included.is_some()); + let our_ancestor_included = block.ancestors()[0]; + assert_eq!(our_ancestor_included.author, context.own_index); + assert_eq!(our_ancestor_included.round, 10); } #[tokio::test(flavor = "current_thread", start_paused = true)] @@ -1567,7 +1569,7 @@ mod test { assert_eq!(core_fixture.core.last_proposed_round(), round); - this_round_blocks.push(core_fixture.core.last_proposed_block.clone()); + this_round_blocks.push(core_fixture.core.last_proposed_block()); } last_round_blocks = this_round_blocks;