From 264907ac9a93899c2b18b22e4ab22db66d569380 Mon Sep 17 00:00:00 2001 From: Faycel Kouteib Date: Wed, 11 Dec 2024 11:54:07 -0800 Subject: [PATCH] Switch to local rayon thread pool. --- rpc/src/transaction_status_service.rs | 97 ++++++++++++--------------- 1 file changed, 44 insertions(+), 53 deletions(-) diff --git a/rpc/src/transaction_status_service.rs b/rpc/src/transaction_status_service.rs index 0ed23019a074f3..c14a54f41ac5a2 100644 --- a/rpc/src/transaction_status_service.rs +++ b/rpc/src/transaction_status_service.rs @@ -2,6 +2,7 @@ use { crate::transaction_notifier_interface::TransactionNotifierArc, crossbeam_channel::{Receiver, RecvTimeoutError}, itertools::izip, + rayon::ThreadPoolBuilder, solana_ledger::{ blockstore::{Blockstore, BlockstoreError}, blockstore_processor::{TransactionStatusBatch, TransactionStatusMessage}, @@ -13,7 +14,7 @@ use { std::{ sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, - Arc, Mutex, + Arc, }, thread::{self, Builder, JoinHandle}, time::Duration, @@ -23,7 +24,7 @@ use { const NUM_TSS_WORKER_THREADS: usize = 4; pub struct TransactionStatusService { - thread_handles: Vec>, + thread_hdl: JoinHandle<()>, } impl TransactionStatusService { @@ -36,63 +37,59 @@ impl TransactionStatusService { enable_extended_tx_metadata_storage: bool, exit: Arc, ) -> Self { - let mut thread_handles = vec![]; - let write_transaction_status_receiver = - Arc::new(Mutex::new(write_transaction_status_receiver)); - - for worker in 0..NUM_TSS_WORKER_THREADS { - let write_transaction_status_receiver = Arc::clone(&write_transaction_status_receiver); - let max_complete_transaction_status_slot = - Arc::clone(&max_complete_transaction_status_slot); - let transaction_notifier = transaction_notifier.clone(); - let blockstore = Arc::clone(&blockstore); - let exit = Arc::clone(&exit); - - let thread_hdl = Builder::new() - .name(format!("solTxStatusWrtr-{}", worker)) - .spawn(move || { - info!("TransactionStatusService worker {worker} has started"); - loop { - if exit.load(Ordering::Relaxed) { - break; - } + let thread_pool = ThreadPoolBuilder::new() + .num_threads(NUM_TSS_WORKER_THREADS) + .build() + .unwrap(); - let message = { - let tss_receiver = write_transaction_status_receiver.lock().unwrap(); - match tss_receiver.recv_timeout(Duration::from_secs(1)) { - Ok(message) => message, - Err(RecvTimeoutError::Disconnected) => { - break; - } - Err(RecvTimeoutError::Timeout) => { - continue; - } - } - }; + let thread_hdl = Builder::new() + .name("solTxStatusWrtr".to_string()) + .spawn(move || { + info!("TransactionStatusService has started"); + loop { + if exit.load(Ordering::Relaxed) { + break; + } + + let message = match write_transaction_status_receiver + .recv_timeout(Duration::from_secs(1)) + { + Ok(message) => message, + Err(RecvTimeoutError::Disconnected) => { + break; + } + Err(RecvTimeoutError::Timeout) => { + continue; + } + }; + + let max_complete_transaction_status_slot = + Arc::clone(&max_complete_transaction_status_slot); + let blockstore = Arc::clone(&blockstore); + let transaction_notifier = transaction_notifier.clone(); + let exit_clone = Arc::clone(&exit); + thread_pool.spawn(move || { match Self::write_transaction_status_batch( message, &max_complete_transaction_status_slot, enable_rpc_transaction_history, - transaction_notifier.clone(), + transaction_notifier, &blockstore, enable_extended_tx_metadata_storage, ) { Ok(_) => {} Err(err) => { - error!("TransactionStatusService worker {worker} stopping due to error: {err}"); - exit.store(true, Ordering::Relaxed); - break; - } + error!("TransactionStatusService stopping due to error: {err}"); + exit_clone.store(true, Ordering::Relaxed); } } - info!("TransactionStatusService worker {worker} has stopped"); - }) - .unwrap(); - - thread_handles.push(thread_hdl); - } - Self { thread_handles } + }); + } + info!("TransactionStatusService has stopped"); + }) + .unwrap(); + Self { thread_hdl } } fn write_transaction_status_batch( @@ -237,13 +234,7 @@ impl TransactionStatusService { } pub fn join(self) -> thread::Result<()> { - let mut result = Ok(()); - for handle in self.thread_handles { - if let Err(err) = handle.join() { - result = Err(err); - } - } - result + self.thread_hdl.join() } }