From 8e9e92c98cc32f745ac8fe7bed7c974a73fae38a Mon Sep 17 00:00:00 2001 From: steviez Date: Fri, 6 Dec 2024 16:10:18 -0600 Subject: [PATCH] rpc: Reduce RewardsRecorderService channel recv() rate (#3979) RewardsRecorderService currently does recv_timeout(1s) on the channel. A new item only comes across the channel when a Bank is frozen, which is about every 400ms at steady state. Adjust the service to poll with try_recv() every 100ms instead. Also, make the service set the exit flag and exit on error instead of panicking --- rpc/src/rewards_recorder_service.rs | 57 +++++++++++++++++++---------- 1 file changed, 37 insertions(+), 20 deletions(-) diff --git a/rpc/src/rewards_recorder_service.rs b/rpc/src/rewards_recorder_service.rs index 3467a151632317..5d4f62462ede67 100644 --- a/rpc/src/rewards_recorder_service.rs +++ b/rpc/src/rewards_recorder_service.rs @@ -2,9 +2,9 @@ //! persisting it into the `Blockstore`. use { - crossbeam_channel::RecvTimeoutError, + crossbeam_channel::TryRecvError, solana_ledger::{ - blockstore::Blockstore, + blockstore::{Blockstore, BlockstoreError}, blockstore_processor::{RewardsMessage, RewardsRecorderReceiver}, }, solana_runtime::bank::KeyedRewardsAndNumPartitions, @@ -14,7 +14,7 @@ use { atomic::{AtomicBool, AtomicU64, Ordering}, Arc, }, - thread::{self, Builder, JoinHandle}, + thread::{self, sleep, Builder, JoinHandle}, time::Duration, }, }; @@ -23,6 +23,10 @@ pub struct RewardsRecorderService { thread_hdl: JoinHandle<()>, } +// ReplayStage sends a new item to this service for every frozen Bank. 
Banks +// are frozen every 400ms at steady state so checking every 100ms is sufficient +const TRY_RECV_INTERVAL: Duration = Duration::from_millis(100); + impl RewardsRecorderService { pub fn new( rewards_receiver: RewardsRecorderReceiver, @@ -38,11 +42,25 @@ impl RewardsRecorderService { if exit.load(Ordering::Relaxed) { break; } - if let Err(RecvTimeoutError::Disconnected) = Self::write_rewards( - &rewards_receiver, - &max_complete_rewards_slot, - &blockstore, - ) { + + let rewards = match rewards_receiver.try_recv() { + Ok(rewards) => rewards, + Err(TryRecvError::Empty) => { + sleep(TRY_RECV_INTERVAL); + continue; + } + Err(err @ TryRecvError::Disconnected) => { + info!("RewardsRecorderService is stopping because: {err}"); + break; + } + }; + + if let Err(err) = + Self::write_rewards(rewards, &max_complete_rewards_slot, &blockstore) + { + error!("RewardsRecorderService is stopping because: {err}"); + // Set the exit flag to allow other services to gracefully stop + exit.store(true, Ordering::Relaxed); break; } } @@ -53,11 +71,11 @@ impl RewardsRecorderService { } fn write_rewards( - rewards_receiver: &RewardsRecorderReceiver, + rewards: RewardsMessage, max_complete_rewards_slot: &Arc, blockstore: &Blockstore, - ) -> Result<(), RecvTimeoutError> { - match rewards_receiver.recv_timeout(Duration::from_secs(1))? { + ) -> Result<(), BlockstoreError> { + match rewards { RewardsMessage::Batch(( slot, KeyedRewardsAndNumPartitions { @@ -76,20 +94,19 @@ impl RewardsRecorderService { }) .collect(); - blockstore - .write_rewards( - slot, - RewardsAndNumPartitions { - rewards: rpc_rewards, - num_partitions, - }, - ) - .expect("Expect database write to succeed"); + blockstore.write_rewards( + slot, + RewardsAndNumPartitions { + rewards: rpc_rewards, + num_partitions, + }, + )?; } RewardsMessage::Complete(slot) => { max_complete_rewards_slot.fetch_max(slot, Ordering::SeqCst); } } + Ok(()) }