diff --git a/rpc/src/transaction_status_service.rs b/rpc/src/transaction_status_service.rs index c0e3a47f93e6f7..0ed23019a074f3 100644 --- a/rpc/src/transaction_status_service.rs +++ b/rpc/src/transaction_status_service.rs @@ -13,15 +13,17 @@ use { std::{ sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, - Arc, + Arc, Mutex, }, thread::{self, Builder, JoinHandle}, time::Duration, }, }; +const NUM_TSS_WORKER_THREADS: usize = 4; + pub struct TransactionStatusService { - thread_hdl: JoinHandle<()>, + thread_handles: Vec>, } impl TransactionStatusService { @@ -34,47 +36,63 @@ impl TransactionStatusService { enable_extended_tx_metadata_storage: bool, exit: Arc, ) -> Self { - let thread_hdl = Builder::new() - .name("solTxStatusWrtr".to_string()) - .spawn(move || { - info!("TransactionStatusService has started"); - loop { - if exit.load(Ordering::Relaxed) { - break; - } - - let message = match write_transaction_status_receiver - .recv_timeout(Duration::from_secs(1)) - { - Ok(message) => message, - Err(RecvTimeoutError::Disconnected) => { - break; + let mut thread_handles = vec![]; + let write_transaction_status_receiver = + Arc::new(Mutex::new(write_transaction_status_receiver)); + + for worker in 0..NUM_TSS_WORKER_THREADS { + let write_transaction_status_receiver = Arc::clone(&write_transaction_status_receiver); + let max_complete_transaction_status_slot = + Arc::clone(&max_complete_transaction_status_slot); + let transaction_notifier = transaction_notifier.clone(); + let blockstore = Arc::clone(&blockstore); + let exit = Arc::clone(&exit); + + let thread_hdl = Builder::new() + .name(format!("solTxStatusWrtr-{}", worker)) + .spawn(move || { + info!("TransactionStatusService worker {worker} has started"); + loop { + if exit.load(Ordering::Relaxed) { + break; + } + + let message = { + let tss_receiver = write_transaction_status_receiver.lock().unwrap(); + match tss_receiver.recv_timeout(Duration::from_secs(1)) { + Ok(message) => message, + Err(RecvTimeoutError::Disconnected) => { + break; + } + Err(RecvTimeoutError::Timeout) => { + continue; + } + } + }; + + match Self::write_transaction_status_batch( + message, + &max_complete_transaction_status_slot, + enable_rpc_transaction_history, + transaction_notifier.clone(), + &blockstore, + enable_extended_tx_metadata_storage, + ) { + Ok(_) => {} + Err(err) => { + error!("TransactionStatusService worker {worker} stopping due to error: {err}"); + exit.store(true, Ordering::Relaxed); + break; + } + } } - Err(RecvTimeoutError::Timeout) => { - continue; - } - }; + info!("TransactionStatusService worker {worker} has stopped"); + }) + .unwrap(); - match Self::write_transaction_status_batch( - message, - &max_complete_transaction_status_slot, - enable_rpc_transaction_history, - transaction_notifier.clone(), - &blockstore, - enable_extended_tx_metadata_storage, - ) { - Ok(_) => {} - Err(err) => { - error!("TransactionStatusService stopping due to error: {err}"); - exit.store(true, Ordering::Relaxed); - break; - } - } - } - info!("TransactionStatusService has stopped"); - }) - .unwrap(); - Self { thread_hdl } + thread_handles.push(thread_hdl); + } + Self { thread_handles } } fn write_transaction_status_batch( @@ -219,7 +237,13 @@ impl TransactionStatusService { } pub fn join(self) -> thread::Result<()> { - self.thread_hdl.join() + let mut result = Ok(()); + for handle in self.thread_handles { + if let Err(err) = handle.join() { + result = Err(err); + } + } + result } }