diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs
index fb0e7890cd1b38..d128289cb7bb93 100644
--- a/core/src/replay_stage.rs
+++ b/core/src/replay_stage.rs
@@ -1992,6 +1992,14 @@ impl ReplayStage {
         }
     }
 
+    /// Checks if it is time for us to start producing a leader block.
+    /// Fails if:
+    /// - Current PoH has not satisfied criteria to start my leader block
+    /// - Startup verification is not complete
+    /// - Bank forks already contains a bank for this leader slot
+    /// - We have not landed a vote yet and the `wait_for_vote_to_start_leader` flag is set
+    /// - We have failed the propagated check
+    /// Returns whether a new working bank was created and inserted into bank forks.
     #[allow(clippy::too_many_arguments)]
     fn maybe_start_leader(
         my_pubkey: &Pubkey,
@@ -2005,7 +2013,7 @@ impl ReplayStage {
         banking_tracer: &Arc<BankingTracer>,
         has_new_vote_been_rooted: bool,
         track_transaction_indexes: bool,
-    ) {
+    ) -> bool {
         // all the individual calls to poh_recorder.read() are designed to
         // increase granularity, decrease contention
 
@@ -2019,7 +2027,7 @@
                 } => (poh_slot, parent_slot),
                 PohLeaderStatus::NotReached => {
                     trace!("{} poh_recorder hasn't reached_leader_slot", my_pubkey);
-                    return;
+                    return false;
                 }
             };
 
@@ -2035,12 +2043,12 @@
 
         if !parent.is_startup_verification_complete() {
             info!("startup verification incomplete, so skipping my leader slot");
-            return;
+            return false;
         }
 
         if bank_forks.read().unwrap().get(poh_slot).is_some() {
             warn!("{} already have bank in forks at {}?", my_pubkey, poh_slot);
-            return;
+            return false;
         }
         trace!(
             "{} poh_slot {} parent_slot {}",
@@ -2052,7 +2060,7 @@
         if let Some(next_leader) = leader_schedule_cache.slot_leader_at(poh_slot, Some(&parent)) {
             if !has_new_vote_been_rooted {
                 info!("Haven't landed a vote, so skipping my leader slot");
-                return;
+                return false;
             }
 
             trace!(
@@ -2064,7 +2072,7 @@
 
             // I guess I missed my slot
             if next_leader != *my_pubkey {
-                return;
+                return false;
             }
 
             datapoint_info!(
@@ -2098,7 +2106,7 @@
                         latest_unconfirmed_leader_slot,
                     );
                 }
-                return;
+                return false;
             }
 
             let root_slot = bank_forks.read().unwrap().root();
@@ -2133,8 +2141,10 @@
                 .write()
                 .unwrap()
                 .set_bank(tpu_bank, track_transaction_indexes);
+            true
         } else {
             error!("{} No next leader found", my_pubkey);
+            false
         }
     }
 
@@ -9097,4 +9107,124 @@ pub(crate) mod tests {
             .is_candidate(&(5, bank_forks.bank_hash(5).unwrap()))
             .unwrap());
     }
+
+    #[test]
+    fn test_skip_leader_slot_for_existing_slot() {
+        solana_logger::setup();
+
+        let ReplayBlockstoreComponents {
+            blockstore,
+            my_pubkey,
+            leader_schedule_cache,
+            poh_recorder,
+            vote_simulator,
+            rpc_subscriptions,
+            ..
+        } = replay_blockstore_components(None, 1, None);
+        let VoteSimulator {
+            bank_forks,
+            mut progress,
+            ..
+        } = vote_simulator;
+
+        let working_bank = bank_forks.read().unwrap().working_bank();
+        assert!(working_bank.is_complete());
+        assert!(working_bank.is_frozen());
+        // Mark startup verification as complete to avoid skipping leader slots
+        working_bank.set_startup_verification_complete();
+
+        // Insert a block two slots greater than the current bank. This slot
+        // does not have a corresponding Bank in BankForks; this emulates a
+        // scenario where the block had previously been created and added to
+        // BankForks, but then got removed. This could be the case if the Bank
+        // was not on the major fork.
+        let dummy_slot = working_bank.slot() + 2;
+        let initial_slot = working_bank.slot();
+        let num_entries = 10;
+        let merkle_variant = true;
+        let (shreds, _) = make_slot_entries(dummy_slot, initial_slot, num_entries, merkle_variant);
+        blockstore.insert_shreds(shreds, None, false).unwrap();
+
+        // Reset PoH recorder to the completed bank to ensure consistent state
+        ReplayStage::reset_poh_recorder(
+            &my_pubkey,
+            &blockstore,
+            working_bank.clone(),
+            &poh_recorder,
+            &leader_schedule_cache,
+        );
+
+        // Register just over one slot's worth of ticks directly with PoH recorder
+        let num_poh_ticks =
+            (working_bank.ticks_per_slot() * working_bank.hashes_per_tick().unwrap()) + 1;
+        poh_recorder
+            .write()
+            .map(|mut poh_recorder| {
+                for _ in 0..num_poh_ticks + 1 {
+                    poh_recorder.tick();
+                }
+            })
+            .unwrap();
+
+        let poh_recorder = Arc::new(poh_recorder);
+        let (retransmit_slots_sender, _) = unbounded();
+        let (banking_tracer, _) = BankingTracer::new(None).unwrap();
+        // A vote has not technically been rooted, but that doesn't matter for
+        // this test; use true to avoid skipping the leader slot
+        let has_new_vote_been_rooted = true;
+        let track_transaction_indexes = false;
+
+        // We should not attempt to start a leader block for dummy_slot
+        assert_matches!(
+            poh_recorder.read().unwrap().reached_leader_slot(&my_pubkey),
+            PohLeaderStatus::NotReached
+        );
+        assert!(!ReplayStage::maybe_start_leader(
+            &my_pubkey,
+            &bank_forks,
+            &poh_recorder,
+            &leader_schedule_cache,
+            &rpc_subscriptions,
+            &mut progress,
+            &retransmit_slots_sender,
+            &mut SkippedSlotsInfo::default(),
+            &banking_tracer,
+            has_new_vote_been_rooted,
+            track_transaction_indexes,
+        ));
+
+        // Register another slot's worth of ticks with PoH recorder
+        poh_recorder
+            .write()
+            .map(|mut poh_recorder| {
+                for _ in 0..num_poh_ticks + 1 {
+                    poh_recorder.tick();
+                }
+            })
+            .unwrap();
+
+        // We should now start leader for dummy_slot + 1
+        let good_slot = dummy_slot + 1;
+        assert!(ReplayStage::maybe_start_leader(
+            &my_pubkey,
+            &bank_forks,
+            &poh_recorder,
+            &leader_schedule_cache,
+            &rpc_subscriptions,
+            &mut progress,
+            &retransmit_slots_sender,
+            &mut SkippedSlotsInfo::default(),
+            &banking_tracer,
+            has_new_vote_been_rooted,
+            track_transaction_indexes,
+        ));
+        // Get the new working bank, which is also the new leader bank/slot
+        let working_bank = bank_forks.read().unwrap().working_bank();
+        // The new bank's slot must NOT be dummy_slot, as the blockstore
+        // already had a shred inserted for dummy_slot prior to
+        // maybe_start_leader(). maybe_start_leader() must not pick dummy_slot
+        // to avoid creating a duplicate block.
+        assert_eq!(working_bank.slot(), good_slot);
+        assert_eq!(working_bank.parent_slot(), initial_slot);
+    }
 }
diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs
index 21d5418b7c6038..8dc7d71df7574d 100644
--- a/ledger/src/blockstore.rs
+++ b/ledger/src/blockstore.rs
@@ -4049,6 +4049,13 @@
         Ok(duplicate_slots_iterator.map(|(slot, _)| slot))
     }
 
+    pub fn has_existing_shreds_for_slot(&self, slot: Slot) -> bool {
+        match self.meta(slot).unwrap() {
+            Some(meta) => meta.received > 0,
+            None => false,
+        }
+    }
+
     /// Returns the max root or 0 if it does not exist
     pub fn max_root(&self) -> Slot {
         self.max_root.load(Ordering::Relaxed)
diff --git a/ledger/src/leader_schedule_cache.rs b/ledger/src/leader_schedule_cache.rs
index b65f7593c5e6a3..9354b0c13c373f 100644
--- a/ledger/src/leader_schedule_cache.rs
+++ b/ledger/src/leader_schedule_cache.rs
@@ -139,14 +139,10 @@
                     .map(move |i| i as Slot + first_slot)
             })
             .skip_while(|slot| {
-                match blockstore {
-                    None => false,
-                    // Skip slots we have already sent a shred for.
-                    Some(blockstore) => match blockstore.meta(*slot).unwrap() {
-                        Some(meta) => meta.received > 0,
-                        None => false,
-                    },
-                }
+                // Skip slots we already have shreds for
+                blockstore
+                    .map(|bs| bs.has_existing_shreds_for_slot(*slot))
+                    .unwrap_or(false)
             });
         let first_slot = schedule.next()?;
         let max_slot = first_slot.saturating_add(max_slot_range);
diff --git a/poh/src/poh_recorder.rs b/poh/src/poh_recorder.rs
index f9a22a9c27afb1..fab2d9f62559e6 100644
--- a/poh/src/poh_recorder.rs
+++ b/poh/src/poh_recorder.rs
@@ -576,6 +576,15 @@
             return PohLeaderStatus::NotReached;
         }
 
+        if self.blockstore.has_existing_shreds_for_slot(next_poh_slot) {
+            // We already have existing shreds for this slot. This can happen when this block was previously
+            // created and added to BankForks, but a recent PoH reset caused this bank to be removed
+            // as it was not part of the rooted fork. If this slot is not the first slot for this leader,
+            // and the first slot was previously ticked over, the check in `leader_schedule_cache::next_leader_slot`
+            // will not suffice, as it only checks if there are shreds for the first slot.
+            return PohLeaderStatus::NotReached;
+        }
+
         assert!(next_tick_height >= self.start_tick_height);
         let poh_slot = next_poh_slot;
         let parent_slot = self.start_slot();
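
For context beyond the diff itself, here is a minimal standalone sketch of the guard these changes introduce. The `Blockstore`, `SlotMeta`, and `PohLeaderStatus` below are deliberately simplified stand-ins (a `HashMap` instead of the real RocksDB-backed store, and only the fields this logic touches), not the real agave types; the sketch only demonstrates the decision rule: a leader declines to produce a block for a slot that already has shreds recorded.

```rust
use std::collections::HashMap;

/// Simplified stand-in for the ledger's SlotMeta; only `received` matters here.
struct SlotMeta {
    received: u64, // number of shreds received for the slot
}

/// Simplified in-memory stand-in for the real RocksDB-backed Blockstore.
struct Blockstore {
    metas: HashMap<u64, SlotMeta>,
}

impl Blockstore {
    /// Mirrors the new helper: a slot "exists" if any shred was received.
    fn has_existing_shreds_for_slot(&self, slot: u64) -> bool {
        self.metas.get(&slot).map(|m| m.received > 0).unwrap_or(false)
    }
}

#[derive(Debug, PartialEq)]
enum PohLeaderStatus {
    NotReached,
    Reached { poh_slot: u64 },
}

/// Sketch of the decision point added to `PohRecorder::reached_leader_slot`:
/// even when PoH says it is our turn, back off if shreds already exist.
fn reached_leader_slot(blockstore: &Blockstore, next_poh_slot: u64) -> PohLeaderStatus {
    if blockstore.has_existing_shreds_for_slot(next_poh_slot) {
        // Producing a bank here would re-create a block for a slot the
        // ledger already has shreds for, i.e. a duplicate block.
        return PohLeaderStatus::NotReached;
    }
    PohLeaderStatus::Reached { poh_slot: next_poh_slot }
}

fn main() {
    let blockstore = Blockstore {
        metas: HashMap::from([(7, SlotMeta { received: 10 })]),
    };
    // Slot 7 already has shreds: skip it rather than double-produce.
    assert_eq!(reached_leader_slot(&blockstore, 7), PohLeaderStatus::NotReached);
    // Slot 8 is clean: proceed.
    assert_eq!(
        reached_leader_slot(&blockstore, 8),
        PohLeaderStatus::Reached { poh_slot: 8 }
    );
    println!("duplicate-block guard behaves as expected");
}
```

In the real code, this check runs in `poh/src/poh_recorder.rs` after the existing tick-height checks, as shown in the hunk above.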
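It may also help to see why the shred check lives in two places. The `skip_while` in `LeaderScheduleCache::next_leader_slot` filters slots when the upcoming leader range is computed, but per the comment in the `poh_recorder.rs` hunk, a leader holding consecutive slots can tick past a clean first slot and land on a later slot that already has shreds. A rough sketch of that scenario, using a hypothetical `has_shreds` function in place of `Blockstore::has_existing_shreds_for_slot` (neither the slot numbers nor the function are from the real code):

```rust
/// Hypothetical stand-in for Blockstore::has_existing_shreds_for_slot:
/// pretend shreds exist only for slot 11.
fn has_shreds(slot: u64) -> bool {
    slot == 11
}

fn main() {
    // Four consecutive leader slots for the same leader.
    let leader_range = [10u64, 11, 12, 13];

    // Rough model of the cache-level check in next_leader_slot: the leader
    // range is accepted once its first slot has no shreds; later slots in
    // the range are not re-examined there.
    assert!(!has_shreds(leader_range[0]));

    // Rough model of the new per-slot guard in reached_leader_slot: as PoH
    // ticks through the range, each next slot is checked individually, so
    // slot 11 is refused even though the range as a whole was accepted.
    for &slot in &leader_range {
        if has_shreds(slot) {
            println!("slot {slot}: NotReached (existing shreds; would be a duplicate block)");
        } else {
            println!("slot {slot}: eligible to produce");
        }
    }
}
```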