Skip to content

Commit

Permalink
Switch to local rayon thread pool.
Browse files Browse the repository at this point in the history
  • Loading branch information
fkouteib committed Dec 11, 2024
1 parent b874697 commit 264907a
Showing 1 changed file with 44 additions and 53 deletions.
97 changes: 44 additions & 53 deletions rpc/src/transaction_status_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use {
crate::transaction_notifier_interface::TransactionNotifierArc,
crossbeam_channel::{Receiver, RecvTimeoutError},
itertools::izip,
rayon::ThreadPoolBuilder,
solana_ledger::{
blockstore::{Blockstore, BlockstoreError},
blockstore_processor::{TransactionStatusBatch, TransactionStatusMessage},
Expand All @@ -13,7 +14,7 @@ use {
std::{
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc, Mutex,
Arc,
},
thread::{self, Builder, JoinHandle},
time::Duration,
Expand All @@ -23,7 +24,7 @@ use {
const NUM_TSS_WORKER_THREADS: usize = 4;

pub struct TransactionStatusService {
thread_handles: Vec<JoinHandle<()>>,
thread_hdl: JoinHandle<()>,
}

impl TransactionStatusService {
Expand All @@ -36,63 +37,59 @@ impl TransactionStatusService {
enable_extended_tx_metadata_storage: bool,
exit: Arc<AtomicBool>,
) -> Self {
let mut thread_handles = vec![];
let write_transaction_status_receiver =
Arc::new(Mutex::new(write_transaction_status_receiver));

for worker in 0..NUM_TSS_WORKER_THREADS {
let write_transaction_status_receiver = Arc::clone(&write_transaction_status_receiver);
let max_complete_transaction_status_slot =
Arc::clone(&max_complete_transaction_status_slot);
let transaction_notifier = transaction_notifier.clone();
let blockstore = Arc::clone(&blockstore);
let exit = Arc::clone(&exit);

let thread_hdl = Builder::new()
.name(format!("solTxStatusWrtr-{}", worker))
.spawn(move || {
info!("TransactionStatusService worker {worker} has started");
loop {
if exit.load(Ordering::Relaxed) {
break;
}
let thread_pool = ThreadPoolBuilder::new()
.num_threads(NUM_TSS_WORKER_THREADS)
.build()
.unwrap();

let message = {
let tss_receiver = write_transaction_status_receiver.lock().unwrap();
match tss_receiver.recv_timeout(Duration::from_secs(1)) {
Ok(message) => message,
Err(RecvTimeoutError::Disconnected) => {
break;
}
Err(RecvTimeoutError::Timeout) => {
continue;
}
}
};
let thread_hdl = Builder::new()
.name("solTxStatusWrtr".to_string())
.spawn(move || {
info!("TransactionStatusService has started");
loop {
if exit.load(Ordering::Relaxed) {
break;
}

let message = match write_transaction_status_receiver
.recv_timeout(Duration::from_secs(1))
{
Ok(message) => message,
Err(RecvTimeoutError::Disconnected) => {
break;
}
Err(RecvTimeoutError::Timeout) => {
continue;
}
};

let max_complete_transaction_status_slot =
Arc::clone(&max_complete_transaction_status_slot);
let blockstore = Arc::clone(&blockstore);
let transaction_notifier = transaction_notifier.clone();
let exit_clone = Arc::clone(&exit);

thread_pool.spawn(move || {
match Self::write_transaction_status_batch(
message,
&max_complete_transaction_status_slot,
enable_rpc_transaction_history,
transaction_notifier.clone(),
transaction_notifier,
&blockstore,
enable_extended_tx_metadata_storage,
) {
Ok(_) => {}
Err(err) => {
error!("TransactionStatusService worker {worker} stopping due to error: {err}");
exit.store(true, Ordering::Relaxed);
break;
}
error!("TransactionStatusService stopping due to error: {err}");
exit_clone.store(true, Ordering::Relaxed);
}
}
info!("TransactionStatusService worker {worker} has stopped");
})
.unwrap();

thread_handles.push(thread_hdl);
}
Self { thread_handles }
});
}
info!("TransactionStatusService has stopped");
})
.unwrap();
Self { thread_hdl }
}

fn write_transaction_status_batch(
Expand Down Expand Up @@ -237,13 +234,7 @@ impl TransactionStatusService {
}

pub fn join(self) -> thread::Result<()> {
let mut result = Ok(());
for handle in self.thread_handles {
if let Err(err) = handle.join() {
result = Err(err);
}
}
result
self.thread_hdl.join()
}
}

Expand Down

0 comments on commit 264907a

Please sign in to comment.