Skip to content

Commit

Permalink
Make TSS multi-threaded.
Browse files Browse the repository at this point in the history
  • Loading branch information
fkouteib committed Dec 10, 2024
1 parent 6c6c26e commit b874697
Showing 1 changed file with 66 additions and 42 deletions.
108 changes: 66 additions & 42 deletions rpc/src/transaction_status_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,17 @@ use {
std::{
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
Arc, Mutex,
},
thread::{self, Builder, JoinHandle},
time::Duration,
},
};

const NUM_TSS_WORKER_THREADS: usize = 4;

pub struct TransactionStatusService {
thread_hdl: JoinHandle<()>,
thread_handles: Vec<JoinHandle<()>>,
}

impl TransactionStatusService {
Expand All @@ -34,47 +36,63 @@ impl TransactionStatusService {
enable_extended_tx_metadata_storage: bool,
exit: Arc<AtomicBool>,
) -> Self {
let thread_hdl = Builder::new()
.name("solTxStatusWrtr".to_string())
.spawn(move || {
info!("TransactionStatusService has started");
loop {
if exit.load(Ordering::Relaxed) {
break;
}

let message = match write_transaction_status_receiver
.recv_timeout(Duration::from_secs(1))
{
Ok(message) => message,
Err(RecvTimeoutError::Disconnected) => {
break;
let mut thread_handles = vec![];
let write_transaction_status_receiver =
Arc::new(Mutex::new(write_transaction_status_receiver));

for worker in 0..NUM_TSS_WORKER_THREADS {
let write_transaction_status_receiver = Arc::clone(&write_transaction_status_receiver);
let max_complete_transaction_status_slot =
Arc::clone(&max_complete_transaction_status_slot);
let transaction_notifier = transaction_notifier.clone();
let blockstore = Arc::clone(&blockstore);
let exit = Arc::clone(&exit);

let thread_hdl = Builder::new()
.name(format!("solTxStatusWrtr-{}", worker))
.spawn(move || {
info!("TransactionStatusService worker {worker} has started");
loop {
if exit.load(Ordering::Relaxed) {
break;
}

let message = {
let tss_receiver = write_transaction_status_receiver.lock().unwrap();
match tss_receiver.recv_timeout(Duration::from_secs(1)) {
Ok(message) => message,
Err(RecvTimeoutError::Disconnected) => {
break;
}
Err(RecvTimeoutError::Timeout) => {
continue;
}
}
};

match Self::write_transaction_status_batch(
message,
&max_complete_transaction_status_slot,
enable_rpc_transaction_history,
transaction_notifier.clone(),
&blockstore,
enable_extended_tx_metadata_storage,
) {
Ok(_) => {}
Err(err) => {
error!("TransactionStatusService worker {worker} stopping due to error: {err}");
exit.store(true, Ordering::Relaxed);
break;
}
}
}
Err(RecvTimeoutError::Timeout) => {
continue;
}
};
info!("TransactionStatusService worker {worker} has stopped");
})
.unwrap();

match Self::write_transaction_status_batch(
message,
&max_complete_transaction_status_slot,
enable_rpc_transaction_history,
transaction_notifier.clone(),
&blockstore,
enable_extended_tx_metadata_storage,
) {
Ok(_) => {}
Err(err) => {
error!("TransactionStatusService stopping due to error: {err}");
exit.store(true, Ordering::Relaxed);
break;
}
}
}
info!("TransactionStatusService has stopped");
})
.unwrap();
Self { thread_hdl }
thread_handles.push(thread_hdl);
}
Self { thread_handles }
}

fn write_transaction_status_batch(
Expand Down Expand Up @@ -219,7 +237,13 @@ impl TransactionStatusService {
}

pub fn join(self) -> thread::Result<()> {
self.thread_hdl.join()
let mut result = Ok(());
for handle in self.thread_handles {
if let Err(err) = handle.join() {
result = Err(err);
}
}
result
}
}

Expand Down

0 comments on commit b874697

Please sign in to comment.