Skip to content

Commit

Permalink
rpc: Reduce RewardsRecorderService channel recv() rate (#3979)
Browse files Browse the repository at this point in the history
RewardsRecorderService currently does recv_timeout(1s) on the channel. A
new item only comes across the channel each time a Bank is frozen, which
is roughly every 400ms at steady state.

Adjust the service to poll with try_recv() every 100ms instead. Also, make
the service set the exit flag and exit on error instead of panicking.
  • Loading branch information
steviez authored Dec 6, 2024
1 parent 9d99617 commit 8e9e92c
Showing 1 changed file with 37 additions and 20 deletions.
57 changes: 37 additions & 20 deletions rpc/src/rewards_recorder_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
//! persisting it into the `Blockstore`.
use {
crossbeam_channel::RecvTimeoutError,
crossbeam_channel::TryRecvError,
solana_ledger::{
blockstore::Blockstore,
blockstore::{Blockstore, BlockstoreError},
blockstore_processor::{RewardsMessage, RewardsRecorderReceiver},
},
solana_runtime::bank::KeyedRewardsAndNumPartitions,
Expand All @@ -14,7 +14,7 @@ use {
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
},
thread::{self, Builder, JoinHandle},
thread::{self, sleep, Builder, JoinHandle},
time::Duration,
},
};
Expand All @@ -23,6 +23,10 @@ pub struct RewardsRecorderService {
thread_hdl: JoinHandle<()>,
}

// How long the service thread sleeps after `try_recv()` finds the rewards
// channel empty before polling it again.
//
// ReplayStage sends a new item to this service for every frozen Bank. Banks
// are frozen every 400ms at steady state, so checking every 100ms is sufficient.
const TRY_RECV_INTERVAL: Duration = Duration::from_millis(100);

impl RewardsRecorderService {
pub fn new(
rewards_receiver: RewardsRecorderReceiver,
Expand All @@ -38,11 +42,25 @@ impl RewardsRecorderService {
if exit.load(Ordering::Relaxed) {
break;
}
if let Err(RecvTimeoutError::Disconnected) = Self::write_rewards(
&rewards_receiver,
&max_complete_rewards_slot,
&blockstore,
) {

let rewards = match rewards_receiver.try_recv() {
Ok(rewards) => rewards,
Err(TryRecvError::Empty) => {
sleep(TRY_RECV_INTERVAL);
continue;
}
Err(err @ TryRecvError::Disconnected) => {
info!("RewardsRecorderService is stopping because: {err}");
break;
}
};

if let Err(err) =
Self::write_rewards(rewards, &max_complete_rewards_slot, &blockstore)
{
error!("RewardsRecorderService is stopping because: {err}");
// Set the exit flag to allow other services to gracefully stop
exit.store(true, Ordering::Relaxed);
break;
}
}
Expand All @@ -53,11 +71,11 @@ impl RewardsRecorderService {
}

fn write_rewards(
rewards_receiver: &RewardsRecorderReceiver,
rewards: RewardsMessage,
max_complete_rewards_slot: &Arc<AtomicU64>,
blockstore: &Blockstore,
) -> Result<(), RecvTimeoutError> {
match rewards_receiver.recv_timeout(Duration::from_secs(1))? {
) -> Result<(), BlockstoreError> {
match rewards {
RewardsMessage::Batch((
slot,
KeyedRewardsAndNumPartitions {
Expand All @@ -76,20 +94,19 @@ impl RewardsRecorderService {
})
.collect();

blockstore
.write_rewards(
slot,
RewardsAndNumPartitions {
rewards: rpc_rewards,
num_partitions,
},
)
.expect("Expect database write to succeed");
blockstore.write_rewards(
slot,
RewardsAndNumPartitions {
rewards: rpc_rewards,
num_partitions,
},
)?;
}
RewardsMessage::Complete(slot) => {
max_complete_rewards_slot.fetch_max(slot, Ordering::SeqCst);
}
}

Ok(())
}

Expand Down

0 comments on commit 8e9e92c

Please sign in to comment.