Skip to content

Commit

Permalink
replay: do not start leader for a block we already have shreds for
Browse files Browse the repository at this point in the history
  • Loading branch information
AshwinSekar committed Aug 2, 2024
1 parent a60fbc2 commit c7fcd27
Show file tree
Hide file tree
Showing 2 changed files with 152 additions and 7 deletions.
143 changes: 136 additions & 7 deletions core/src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1992,6 +1992,13 @@ impl ReplayStage {
}
}

/// Checks if it is time for us to start producing a leader block.
/// Additionally fails if:
/// - Startup verification is not complete
/// - Bank forks already contains a bank for this leader slot
/// - We have not landed a vote yet and the `wait_for_vote_to_start_leader` flag is set
/// - We have failed the propagated check
///
/// Returns whether a new working bank was created and inserted into bank forks.
#[allow(clippy::too_many_arguments)]
fn maybe_start_leader(
my_pubkey: &Pubkey,
Expand All @@ -2005,7 +2012,7 @@ impl ReplayStage {
banking_tracer: &Arc<BankingTracer>,
has_new_vote_been_rooted: bool,
track_transaction_indexes: bool,
) {
) -> bool {
// all the individual calls to poh_recorder.read() are designed to
// increase granularity, decrease contention

Expand All @@ -2019,7 +2026,7 @@ impl ReplayStage {
} => (poh_slot, parent_slot),
PohLeaderStatus::NotReached => {
trace!("{} poh_recorder hasn't reached_leader_slot", my_pubkey);
return;
return false;
}
};

Expand All @@ -2035,12 +2042,12 @@ impl ReplayStage {

if !parent.is_startup_verification_complete() {
info!("startup verification incomplete, so skipping my leader slot");
return;
return false;
}

if bank_forks.read().unwrap().get(poh_slot).is_some() {
warn!("{} already have bank in forks at {}?", my_pubkey, poh_slot);
return;
return false;
}
trace!(
"{} poh_slot {} parent_slot {}",
Expand All @@ -2052,7 +2059,7 @@ impl ReplayStage {
if let Some(next_leader) = leader_schedule_cache.slot_leader_at(poh_slot, Some(&parent)) {
if !has_new_vote_been_rooted {
info!("Haven't landed a vote, so skipping my leader slot");
return;
return false;
}

trace!(
Expand All @@ -2064,7 +2071,7 @@ impl ReplayStage {

// I guess I missed my slot
if next_leader != *my_pubkey {
return;
return false;
}

datapoint_info!(
Expand Down Expand Up @@ -2098,7 +2105,7 @@ impl ReplayStage {
latest_unconfirmed_leader_slot,
);
}
return;
return false;
}

let root_slot = bank_forks.read().unwrap().root();
Expand Down Expand Up @@ -2133,8 +2140,10 @@ impl ReplayStage {
.write()
.unwrap()
.set_bank(tpu_bank, track_transaction_indexes);
true
} else {
error!("{} No next leader found", my_pubkey);
false
}
}

Expand Down Expand Up @@ -9097,4 +9106,124 @@ pub(crate) mod tests {
.is_candidate(&(5, bank_forks.bank_hash(5).unwrap()))
.unwrap());
}

// Regression test: `ReplayStage::maybe_start_leader` must not start a leader
// bank for a slot that already has shreds in the blockstore, and must start
// one for the slot immediately after it.
//
// Outline:
//   1. Insert shreds for `dummy_slot` (working bank slot + 2) without ever
//      creating a bank for it in BankForks.
//   2. Reset the PoH recorder onto the working bank and tick it forward.
//   3. Assert the recorder reports `NotReached` and `maybe_start_leader`
//      returns false.
//   4. Tick forward again and assert `maybe_start_leader` returns true, with
//      the new working bank at `dummy_slot + 1` and parented on the original
//      working bank's slot.
#[test]
fn test_skip_leader_slot_for_existing_slot() {
solana_logger::setup();

// Pull out only the fixture pieces this test needs; the rest are ignored.
let ReplayBlockstoreComponents {
blockstore,
my_pubkey,
leader_schedule_cache,
poh_recorder,
vote_simulator,
rpc_subscriptions,
..
} = replay_blockstore_components(None, 1, None);
let VoteSimulator {
bank_forks,
mut progress,
..
} = vote_simulator;

// Sanity-check the starting point: the working bank is complete and frozen.
let working_bank = bank_forks.read().unwrap().working_bank();
assert!(working_bank.is_complete());
assert!(working_bank.is_frozen());
// Mark startup verification as complete to avoid skipping leader slots
working_bank.set_startup_verification_complete();

// Insert a block two slots greater than current bank. This slot does
// not have a corresponding Bank in BankForks; this emulates a scenario
// where the block had previously been created and added to BankForks,
// but then got removed. This could be the case if the Bank was not on
// the major fork.
let dummy_slot = working_bank.slot() + 2;
let initial_slot = working_bank.slot();
let num_entries = 10;
let merkle_variant = true;
// The shreds for `dummy_slot` are built with `initial_slot` as their parent.
let (shreds, _) = make_slot_entries(dummy_slot, initial_slot, num_entries, merkle_variant);
blockstore.insert_shreds(shreds, None, false).unwrap();

// Reset PoH recorder to the completed bank to ensure consistent state
ReplayStage::reset_poh_recorder(
&my_pubkey,
&blockstore,
working_bank.clone(),
&poh_recorder,
&leader_schedule_cache,
);

// Register just over one slot worth of ticks directly with PoH recorder.
// `num_poh_ticks` is ticks_per_slot * hashes_per_tick + 1, and the loop range
// ends at `num_poh_ticks + 1`, so `tick()` is called
// ticks_per_slot * hashes_per_tick + 2 times in total.
// NOTE(review): presumably more than one `tick()` call is needed per recorded
// tick when hashes_per_tick > 1, hence the multiplication -- confirm.
let num_poh_ticks =
(working_bank.ticks_per_slot() * working_bank.hashes_per_tick().unwrap()) + 1;
poh_recorder
.write()
.map(|mut poh_recorder| {
for _ in 0..num_poh_ticks + 1 {
poh_recorder.tick();
}
})
.unwrap();

// Shadow the recorder with an Arc-wrapped handle; it is passed by reference
// to the `maybe_start_leader` calls below.
let poh_recorder = Arc::new(poh_recorder);
// The receiving ends of these channels are dropped; this test does not read them.
let (retransmit_slots_sender, _) = unbounded();
let (banking_tracer, _) = BankingTracer::new(None).unwrap();
// A vote has not technically been rooted, but it doesn't matter for
// this test to use true to avoid skipping the leader slot
let has_new_vote_been_rooted = true;
let track_transaction_indexes = false;

// We should not attempt to start leader for the dummy_slot: the recorder
// itself must report `NotReached`, and `maybe_start_leader` must return false
// (no new working bank created).
assert_matches!(
poh_recorder.read().unwrap().reached_leader_slot(&my_pubkey),
PohLeaderStatus::NotReached
);
assert!(!ReplayStage::maybe_start_leader(
&my_pubkey,
&bank_forks,
&poh_recorder,
&leader_schedule_cache,
&rpc_subscriptions,
&mut progress,
&retransmit_slots_sender,
&mut SkippedSlotsInfo::default(),
&banking_tracer,
has_new_vote_been_rooted,
track_transaction_indexes,
));

// Register another slot's worth of ticks with PoH recorder (same number of
// `tick()` calls as the first batch).
poh_recorder
.write()
.map(|mut poh_recorder| {
for _ in 0..num_poh_ticks + 1 {
poh_recorder.tick();
}
})
.unwrap();

// We should now start leader for dummy_slot + 1
let good_slot = dummy_slot + 1;
assert!(ReplayStage::maybe_start_leader(
&my_pubkey,
&bank_forks,
&poh_recorder,
&leader_schedule_cache,
&rpc_subscriptions,
&mut progress,
&retransmit_slots_sender,
&mut SkippedSlotsInfo::default(),
&banking_tracer,
has_new_vote_been_rooted,
track_transaction_indexes,
));
// Get the new working bank, which is also the new leader bank/slot
let working_bank = bank_forks.read().unwrap().working_bank();
// The new bank's slot must NOT be dummy_slot as the blockstore already
// had a shred inserted for dummy_slot prior to maybe_start_leader().
// maybe_start_leader() must not pick dummy_slot to avoid creating a
// duplicate block.
assert_eq!(working_bank.slot(), good_slot);
// The new leader bank is parented on the original working bank's slot, not on dummy_slot.
assert_eq!(working_bank.parent_slot(), initial_slot);
}
}
16 changes: 16 additions & 0 deletions poh/src/poh_recorder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,15 @@ impl PohRecorder {
return PohLeaderStatus::NotReached;
};

if self.has_existing_shreds_for_slot(next_poh_slot) {
// We already have shreds for this slot. This can happen when this block was previously
// created and added to BankForks, but a recent PoH reset caused this bank to be removed
// as it was not part of the rooted fork. If this slot is not the first slot for this leader,
// and the first slot was previously ticked over, the check in `leader_schedule_cache::next_leader_slot`
// will not suffice, as it only checks if there are shreds for the first slot.
return PohLeaderStatus::NotReached;
}

if !self.reached_leader_tick(my_pubkey, leader_first_tick_height_including_grace_ticks) {
// PoH hasn't ticked far enough yet.
return PohLeaderStatus::NotReached;
Expand All @@ -585,6 +594,13 @@ impl PohRecorder {
}
}

/// Reports whether the blockstore already holds shreds for `slot`.
///
/// Returns `true` only when the blockstore has a meta entry for `slot` and
/// that entry's `received` field is greater than zero; a missing entry
/// yields `false`.
///
/// # Panics
///
/// Panics if the blockstore meta lookup returns an error.
fn has_existing_shreds_for_slot(&self, slot: Slot) -> bool {
    let slot_meta = self.blockstore.meta(slot).unwrap();
    // NOTE(review): presumably `received > 0` means at least one shred has
    // been stored for the slot -- confirm against the slot meta definition.
    matches!(slot_meta, Some(meta) if meta.received > 0)
}

// returns (leader_first_tick_height_including_grace_ticks, leader_last_tick_height, grace_ticks) given the next
// slot this recorder will lead
fn compute_leader_slot_tick_heights(
Expand Down

0 comments on commit c7fcd27

Please sign in to comment.