Skip to content

Commit

Permalink
Remove dropped threads on the tx executor
Browse files Browse the repository at this point in the history
  • Loading branch information
Aursen committed Sep 2, 2024
1 parent fd33f0a commit b5eb726
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 13 deletions.
13 changes: 4 additions & 9 deletions plugin/src/executors/state/executable_threads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ static THREAD_TIMEOUT_WINDOW: u64 = 24;
static EXPONENTIAL_BACKOFF_CONSTANT: u32 = 2;

#[derive(Default)]
pub struct ExecutableThreads(RwLock<HashMap<Pubkey, ExecutableThreadMetadata>>);
pub struct ExecutableThreads(RwLock<HashMap<Pubkey, ExecutableThreadMetadata>>, AtomicU64);

impl ExecutableThreads {
pub async fn increment_simulation_failure(&self, thread_pubkey: Pubkey) {
Expand Down Expand Up @@ -68,12 +68,7 @@ impl ExecutableThreads {
w_state.remove(thread);
}

pub async fn rebase_threads(
&self,
slot: u64,
threads: &HashSet<Pubkey>,
dropped_threads: &AtomicU64,
) {
pub async fn rebase_threads(&self, slot: u64, threads: &HashSet<Pubkey>) {
// Index the provided threads as executable.
let mut w_state = self.0.write().await;
threads.iter().for_each(|pubkey| {
Expand All @@ -89,15 +84,15 @@ impl ExecutableThreads {
// Drop threads that cross the simulation failure threshold.
w_state.retain(|_thread_pubkey, metadata| {
if metadata.simulation_failures > MAX_THREAD_SIMULATION_FAILURES {
dropped_threads.fetch_add(1, Ordering::Relaxed);
self.1.fetch_add(1, Ordering::Relaxed);
false
} else {
true
}
});
info!(
"dropped_threads: {:?} executable_threads: {:?}",
dropped_threads.load(Ordering::Relaxed),
self.1.load(Ordering::Relaxed),
*w_state
);
}
Expand Down
6 changes: 2 additions & 4 deletions plugin/src/executors/tx.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{
collections::{HashMap, HashSet},
fmt::Debug,
sync::{atomic::AtomicU64, Arc},
sync::Arc,
};

use bincode::serialize;
Expand Down Expand Up @@ -39,7 +39,6 @@ pub struct TxExecutor {
pub executable_threads: ExecutableThreads,
pub transaction_history: TransactionHistory,
pub rotation_history: RotationHistory,
pub dropped_threads: AtomicU64,
pub keypair: Keypair,
}

Expand All @@ -63,7 +62,6 @@ impl TxExecutor {
executable_threads: ExecutableThreads::default(),
transaction_history: TransactionHistory::default(),
rotation_history: RotationHistory::default(),
dropped_threads: AtomicU64::new(0),
keypair: read_or_new_keypair(config.keypath),
}
}
Expand All @@ -76,7 +74,7 @@ impl TxExecutor {
runtime: Arc<Runtime>,
) -> PluginResult<()> {
self.executable_threads
.rebase_threads(slot, &thread_pubkeys, &self.dropped_threads)
.rebase_threads(slot, &thread_pubkeys)
.await;

// Process retries.
Expand Down

0 comments on commit b5eb726

Please sign in to comment.