Skip to content

Commit

Permalink
[Consensus] Core to read last proposed round from DagState (#20173)
Browse files Browse the repository at this point in the history
## Description 

Refactoring Core to read the last proposed block from DagState rather
than local cached value. This change:
* will allow for nodes that recover from amnesia (potentially) link
their new proposed block the actual last proposed block instead of
linking back to genesis.
* because of the above point, we are also avoiding edge case error when
an amnesia recovered node is attempting to propose for a round with
exact quorum, although their last proposed block still pointing to
genesis, practically making us (thankfully) hit the assert
[here](https://github.com/MystenLabs/sui/blob/6a571f189b49f99ed9294cd01c82d56e03ef8bfe/consensus/core/src/core.rs#L759)
* our last proposed block is cached in DagState anyways so no practical
difference

## Test plan 

CI

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
akichidis authored Nov 6, 2024
1 parent b3290a2 commit 5c97430
Showing 1 changed file with 28 additions and 26 deletions.
54 changes: 28 additions & 26 deletions consensus/core/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,6 @@ pub(crate) struct Core {

/// Used to make commit decisions for leader blocks in the dag.
committer: UniversalCommitter,
/// The last produced block
last_proposed_block: VerifiedBlock,
/// The blocks of the last included ancestors per authority. This vector is basically used as a
/// watermark in order to include in the next block proposal only ancestors of higher rounds.
/// By default, is initialised with `None` values.
Expand Down Expand Up @@ -155,7 +153,6 @@ impl Core {
Self {
context: context.clone(),
threshold_clock: ThresholdClock::new(0, context.clone()),
last_proposed_block,
last_included_ancestors,
last_decided_leader,
leader_schedule,
Expand Down Expand Up @@ -206,20 +203,28 @@ impl Core {
self.add_accepted_blocks(last_quorum);
// Try to commit and propose, since they may not have run after the last storage write.
self.try_commit().unwrap();
if self.try_propose(true).unwrap().is_none() {

let last_proposed_block = if let Some(last_proposed_block) = self.try_propose(true).unwrap()
{
last_proposed_block
} else {
let last_proposed_block = self
.dag_state
.read()
.get_last_block_for_authority(self.context.own_index);

if self.should_propose() {
assert!(self.last_proposed_block.round() > GENESIS_ROUND, "At minimum a block of round higher that genesis should have been produced during recovery");
assert!(last_proposed_block.round() > GENESIS_ROUND, "At minimum a block of round higher that genesis should have been produced during recovery");
}

// if no new block proposed then just re-broadcast the last proposed one to ensure liveness.
self.signals
.new_block(self.last_proposed_block.clone())
.unwrap();
}
self.signals.new_block(last_proposed_block.clone()).unwrap();
last_proposed_block
};

info!(
"Core recovery completed with last proposed block {:?}",
self.last_proposed_block
last_proposed_block
);

self
Expand Down Expand Up @@ -490,9 +495,6 @@ impl Core {
.as_secs_f64(),
);

// Update internal state.
self.last_proposed_block = verified_block.clone();

// Now acknowledge the transactions for their inclusion to block
ack_transactions(verified_block.reference());

Expand Down Expand Up @@ -723,7 +725,9 @@ impl Core {

// Propose only ancestors of higher rounds than what has already been proposed.
// And always include own last proposed block first among ancestors.
let ancestors = iter::once(self.last_proposed_block.clone())
let last_proposed_block = ancestors[self.context.own_index].clone();
assert_eq!(last_proposed_block.author(), self.context.own_index);
let ancestors = iter::once(last_proposed_block)
.chain(
ancestors
.into_iter()
Expand Down Expand Up @@ -793,16 +797,17 @@ impl Core {
}

fn last_proposed_timestamp_ms(&self) -> BlockTimestampMs {
self.last_proposed_block.timestamp_ms()
self.last_proposed_block().timestamp_ms()
}

fn last_proposed_round(&self) -> Round {
self.last_proposed_block.round()
self.last_proposed_block().round()
}

#[cfg(test)]
fn last_proposed_block(&self) -> &VerifiedBlock {
&self.last_proposed_block
fn last_proposed_block(&self) -> VerifiedBlock {
self.dag_state
.read()
.get_last_block_for_authority(self.context.own_index)
}
}

Expand Down Expand Up @@ -1496,12 +1501,9 @@ mod test {
assert_eq!(block.round(), 11);
assert_eq!(block.ancestors().len(), 4);

// Our last ancestored included should be genesis. We do not update the last proposed block via the
// normal block processing path to keep it simple.
let our_ancestor_included = block.ancestors().iter().find(|block_ref: &&BlockRef| {
block_ref.author == context.own_index && block_ref.round == GENESIS_ROUND
});
assert!(our_ancestor_included.is_some());
let our_ancestor_included = block.ancestors()[0];
assert_eq!(our_ancestor_included.author, context.own_index);
assert_eq!(our_ancestor_included.round, 10);
}

#[tokio::test(flavor = "current_thread", start_paused = true)]
Expand Down Expand Up @@ -1567,7 +1569,7 @@ mod test {

assert_eq!(core_fixture.core.last_proposed_round(), round);

this_round_blocks.push(core_fixture.core.last_proposed_block.clone());
this_round_blocks.push(core_fixture.core.last_proposed_block());
}

last_round_blocks = this_round_blocks;
Expand Down

0 comments on commit 5c97430

Please sign in to comment.