Disable replay while in wen_restart. (#2007)
wen-coding authored Aug 2, 2024
1 parent 752a061 commit cb782d0
Showing 1 changed file with 66 additions and 37 deletions.
103 changes: 66 additions & 37 deletions core/src/tvu.rs
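The change threads a wen_restart flag through Tvu: a validator booting into wen_restart must not replay blocks, so ReplayStage is no longer started unconditionally. The field becomes Option<ReplayStage>, and both construction and shutdown are gated on it. A minimal self-contained sketch of the pattern (Stage is a stand-in; none of these names are from the actual code):

// Sketch only: a stage is constructed unless the node is in wen_restart,
// and joined only if it was started.
struct Stage;

impl Stage {
    fn new() -> Self {
        Stage
    }

    fn join(self) {}
}

fn start(in_wen_restart: bool) -> Option<Stage> {
    // bool::then runs the constructor only when the flag allows it.
    (!in_wen_restart).then(Stage::new)
}

fn main() {
    // In wen_restart, no stage is ever started ...
    assert!(start(/*in_wen_restart:*/ true).is_none());
    // ... otherwise it starts and must later be joined.
    if let Some(stage) = start(/*in_wen_restart:*/ false) {
        stage.join();
    }
}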
@@ -66,7 +66,7 @@ pub struct Tvu {
retransmit_stage: RetransmitStage,
window_service: WindowService,
cluster_slots_service: ClusterSlotsService,
-replay_stage: ReplayStage,
+replay_stage: Option<ReplayStage>,
blockstore_cleanup_service: Option<BlockstoreCleanupService>,
cost_update_service: CostUpdateService,
voting_service: VotingService,
@@ -160,6 +160,8 @@ impl Tvu {
cluster_slots: Arc<ClusterSlots>,
wen_restart_repair_slots: Option<Arc<RwLock<Vec<Slot>>>>,
) -> Result<Self, String> {
+let in_wen_restart = wen_restart_repair_slots.is_some();
+
let TvuSockets {
repair: repair_socket,
fetch: fetch_sockets,
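Wen_restart mode is not a separate boolean parameter: a caller opts in by passing the shared repair-slot list, and its presence doubles as the flag (in_wen_restart above). A hypothetical caller, assuming the same types as this file:

// Hypothetical caller: Some(...) puts Tvu into wen_restart mode,
// None keeps the normal replay path.
let wen_restart_repair_slots: Option<Arc<RwLock<Vec<Slot>>>> =
    Some(Arc::new(RwLock::new(Vec::new())));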
@@ -312,33 +314,37 @@ impl Tvu {

let drop_bank_service = DropBankService::new(drop_bank_receiver);

-let replay_stage = ReplayStage::new(
-replay_stage_config,
-blockstore.clone(),
-bank_forks.clone(),
-cluster_info.clone(),
-ledger_signal_receiver,
-duplicate_slots_receiver,
-poh_recorder.clone(),
-tower,
-vote_tracker,
-cluster_slots,
-retransmit_slots_sender,
-ancestor_duplicate_slots_receiver,
-replay_vote_sender,
-duplicate_confirmed_slots_receiver,
-gossip_verified_vote_hash_receiver,
-cluster_slots_update_sender,
-cost_update_sender,
-voting_sender,
-drop_bank_sender,
-block_metadata_notifier,
-log_messages_bytes_limit,
-prioritization_fee_cache.clone(),
-dumped_slots_sender,
-banking_tracer,
-popular_pruned_forks_receiver,
-)?;
+let replay_stage = if in_wen_restart {
+None
+} else {
+Some(ReplayStage::new(
+replay_stage_config,
+blockstore.clone(),
+bank_forks.clone(),
+cluster_info.clone(),
+ledger_signal_receiver,
+duplicate_slots_receiver,
+poh_recorder.clone(),
+tower,
+vote_tracker,
+cluster_slots,
+retransmit_slots_sender,
+ancestor_duplicate_slots_receiver,
+replay_vote_sender,
+duplicate_confirmed_slots_receiver,
+gossip_verified_vote_hash_receiver,
+cluster_slots_update_sender,
+cost_update_sender,
+voting_sender,
+drop_bank_sender,
+block_metadata_notifier,
+log_messages_bytes_limit,
+prioritization_fee_cache.clone(),
+dumped_slots_sender,
+banking_tracer,
+popular_pruned_forks_receiver,
+)?)
+};

let blockstore_cleanup_service = tvu_config.max_ledger_shreds.map(|max_ledger_shreds| {
BlockstoreCleanupService::new(blockstore.clone(), max_ledger_shreds, exit.clone())
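Note how the fallible constructor composes with the new gating: the `?` now sits inside Some(...), so a ReplayStage::new error still aborts Tvu::new with Err, while a wen_restart node never constructs the stage at all. A self-contained sketch of that shape (Stage is a stand-in type):

struct Stage;

impl Stage {
    fn new() -> Result<Self, String> {
        Ok(Stage)
    }
}

fn build_stage(in_wen_restart: bool) -> Result<Option<Stage>, String> {
    // `?` propagates a constructor failure out of the enclosing function;
    // the wen_restart arm skips construction entirely.
    Ok(if in_wen_restart {
        None
    } else {
        Some(Stage::new()?)
    })
}

fn main() {
    assert!(build_stage(/*in_wen_restart:*/ true).unwrap().is_none());
}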
@@ -381,7 +387,9 @@ impl Tvu {
if self.blockstore_cleanup_service.is_some() {
self.blockstore_cleanup_service.unwrap().join()?;
}
-self.replay_stage.join()?;
+if self.replay_stage.is_some() {
+self.replay_stage.unwrap().join()?;
+}
self.cost_update_service.join()?;
self.voting_service.join()?;
if let Some(warmup_service) = self.warm_quic_cache_service {
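Shutdown mirrors the blockstore_cleanup_service handling just above: check is_some(), then unwrap() and join. An equivalent shape without the unwrap, shown only for clarity (the diff itself keeps the is_some()/unwrap() form of its neighbors):

// Equivalent, since join(self) consumes the stage by move:
if let Some(replay_stage) = self.replay_stage {
    replay_stage.join()?;
}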
@@ -414,10 +422,7 @@ pub mod tests {
std::sync::atomic::{AtomicU64, Ordering},
};

-#[ignore]
-#[test]
-#[serial]
-fn test_tvu_exit() {
+fn test_tvu_exit(enable_wen_restart: bool) {
solana_logger::setup();
let leader = Node::new_localhost();
let target1_keypair = Keypair::new();
@@ -428,15 +433,17 @@

let bank_forks = BankForks::new_rw_arc(Bank::new_for_tests(&genesis_config));

-let keypair = Arc::new(Keypair::new());
let (turbine_quic_endpoint_sender, _turbine_quic_endpoint_receiver) =
tokio::sync::mpsc::channel(/*capacity:*/ 128);
let (_turbine_quic_endpoint_sender, turbine_quic_endpoint_receiver) = unbounded();
let (repair_quic_endpoint_sender, _repair_quic_endpoint_receiver) =
tokio::sync::mpsc::channel(/*buffer:*/ 128);
//start cluster_info1
-let cluster_info1 =
-ClusterInfo::new(target1.info.clone(), keypair, SocketAddrSpace::Unspecified);
+let cluster_info1 = ClusterInfo::new(
+target1.info.clone(),
+target1_keypair.into(),
+SocketAddrSpace::Unspecified,
+);
cluster_info1.insert_info(leader.info);
let cref1 = Arc::new(cluster_info1);

@@ -464,6 +471,11 @@ pub mod tests {
let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
let outstanding_repair_requests = Arc::<RwLock<OutstandingShredRepairs>>::default();
let cluster_slots = Arc::new(ClusterSlots::default());
+let wen_restart_repair_slots = if enable_wen_restart {
+Some(Arc::new(RwLock::new(vec![])))
+} else {
+None
+};
let tvu = Tvu::new(
&vote_keypair.pubkey(),
Arc::new(RwLock::new(vec![Arc::new(vote_keypair)])),
@@ -520,11 +532,28 @@ pub mod tests {
repair_quic_endpoint_sender,
outstanding_repair_requests,
cluster_slots,
-None,
+wen_restart_repair_slots,
)
.expect("assume success");
+if enable_wen_restart {
+assert!(tvu.replay_stage.is_none())
+} else {
+assert!(tvu.replay_stage.is_some())
+}
exit.store(true, Ordering::Relaxed);
tvu.join().unwrap();
poh_service.join().unwrap();
}

+#[test]
+#[serial]
+fn test_tvu_exit_no_wen_restart() {
+test_tvu_exit(false);
+}
+
+#[test]
+#[serial]
+fn test_tvu_exit_with_wen_restart() {
+test_tvu_exit(true);
+}
}
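The previously #[ignore]d test is re-enabled here and split: the body takes enable_wen_restart, and since Rust's built-in harness cannot pass arguments to #[test] functions, two thin #[serial] wrappers cover both modes. Were the test_case crate available as a dev-dependency (an assumption; this diff does not add it), the wrappers could collapse into one attribute-parameterized test:

// Hypothetical alternative using the test_case crate (not part of this diff):
use test_case::test_case;

#[test_case(false; "no wen_restart")]
#[test_case(true; "with wen_restart")]
#[serial]
fn test_tvu_exit_parameterized(enable_wen_restart: bool) {
    test_tvu_exit(enable_wen_restart);
}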
