Skip to content

Commit

Permalink
chore: forester simple fast rpc.process_transaction (#1509)
Browse files Browse the repository at this point in the history
* process-tx

* refactor transaction handling

* cleanup

---------

Co-authored-by: Sergey Timoshin <[email protected]>
  • Loading branch information
SwenSchaeferjohann and sergeytimoshin authored Jan 23, 2025
1 parent bdb8737 commit 72a5a3b
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 38 deletions.
74 changes: 36 additions & 38 deletions forester/src/send_transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use account_compression::utils::constants::{
};
use async_trait::async_trait;
use forester_utils::forester_epoch::{TreeAccounts, TreeType};
use futures::future::join_all;
use light_client::{
indexer::Indexer,
rpc::{RetryConfig, RpcConnection},
Expand Down Expand Up @@ -34,7 +33,7 @@ use tokio::{
sync::Mutex,
time::{sleep, Instant},
};
use tracing::{debug, warn};
use tracing::{debug, log::info, warn};
use url::Url;

use crate::{
Expand All @@ -46,9 +45,7 @@ use crate::{
GetPriorityFeeEstimateResponse, RpcRequest, RpcResponse,
},
queue_helpers::fetch_queue_item_data,
smart_transaction::{
create_smart_transaction, send_and_confirm_transaction, CreateSmartTransactionConfig,
},
smart_transaction::{create_smart_transaction, CreateSmartTransactionConfig},
Result,
};
#[async_trait]
Expand Down Expand Up @@ -231,7 +228,7 @@ pub async fn send_batched_transactions<T: TransactionBuilder, R: RpcConnection>(
config.build_transaction_batch_config,
)
.await?;
debug!(
info!(
"build transaction time {:?}",
transaction_build_time_start.elapsed()
);
Expand All @@ -254,57 +251,58 @@ pub async fn send_batched_transactions<T: TransactionBuilder, R: RpcConnection>(
preflight_commitment: Some(CommitmentLevel::Confirmed),
..Default::default()
};
// Send and confirm all transactions in the batch non-blocking.
let send_futures: Vec<_> = transactions
.into_iter()
.map(|tx| {
let pool_clone = Arc::clone(&pool);
async move {
match pool_clone.get_connection().await {
Ok(mut rpc) => {
send_and_confirm_transaction(
&mut rpc,
&tx,
send_transaction_config,
last_valid_block_height,
config.retry_config.timeout,
)
.await
}
Err(e) => Err(light_client::rpc::RpcError::CustomError(format!(
"Failed to get RPC connection: {}",
e
))),

let (tx_sender, mut tx_receiver) = tokio::sync::mpsc::channel(120);
for tx in transactions {
let tx_sender = tx_sender.clone();
let pool_clone = Arc::clone(&pool);
let config = send_transaction_config;

tokio::spawn(async move {
match pool_clone.get_connection().await {
Ok(mut rpc) => {
let result = rpc.process_transaction_with_config(tx, config).await;
let _ = tx_sender.send(result).await;
}
Err(e) => {
warn!("Failed to get RPC connection: {}", e);
}
}
})
.collect();

let results = join_all(send_futures).await;
});
}
drop(tx_sender);

// Evaluate results
for result in results {
while let Some(result) = tx_receiver.recv().await {
match result {
Ok(signature) => {
num_sent_transactions += 1;
println!("Transaction sent: {:?}", signature);
// debug!("Transaction sent: {:?}", signature);
info!(
"tree {} / queue {} / tx {:?}",
tree_accounts.merkle_tree.to_string(),
tree_accounts.queue.to_string(),
signature
);
}
Err(e) => warn!("Transaction failed: {:?}", e),
}
}

num_batches += 1;
let batch_duration = batch_start.elapsed();
debug!("Batch duration: {:?}", batch_duration);
info!("Batch duration: {:?}", batch_duration);

// 8. Await minimum batch time.
if start_time.elapsed() + config.retry_config.retry_delay < config.retry_config.timeout
{
sleep(config.retry_config.retry_delay).await;
} else {
break;
}

if num_batches >= config.num_batches {
debug!("Reached max number of batches");
break;
}

// 9. Check if we reached max number of batches.
if num_batches >= config.num_batches {
debug!("Reached max number of batches");
Expand Down Expand Up @@ -515,7 +513,7 @@ pub async fn fetch_proofs_and_create_instructions<R: RpcConnection, I: Indexer<R

/// Request priority fee estimate from Helius RPC endpoint
pub async fn request_priority_fee_estimate(url: &Url, account_keys: Vec<Pubkey>) -> Result<u64> {
if url.host_str() == Some("localhost") {
if url.host_str() != Some("mainnet") {
return Ok(10_000);
}

Expand Down
6 changes: 6 additions & 0 deletions sdk-libs/client/src/rpc/rpc_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ pub trait RpcConnection: Send + Sync + Debug + 'static {
transaction: Transaction,
) -> Result<(Signature, Slot), RpcError>;

async fn process_transaction_with_config(
&mut self,
transaction: Transaction,
config: RpcSendTransactionConfig,
) -> Result<Signature, RpcError>;

async fn create_and_send_transaction_with_event<T>(
&mut self,
instructions: &[Instruction],
Expand Down
9 changes: 9 additions & 0 deletions sdk-libs/client/src/rpc/solana_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,15 @@ impl RpcConnection for SolanaRpcConnection {
.await
}

async fn process_transaction_with_config(
&mut self,
transaction: Transaction,
config: RpcSendTransactionConfig,
) -> Result<Signature, RpcError> {
self.send_transaction_with_config(&transaction, RpcSendTransactionConfig { ..config })
.await
}

async fn create_and_send_transaction_with_event<T>(
&mut self,
instructions: &[Instruction],
Expand Down
16 changes: 16 additions & 0 deletions sdk-libs/program-test/src/test_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,22 @@ impl RpcConnection for ProgramTestRpcConnection {
Ok((sig, slot))
}

async fn process_transaction_with_config(
&mut self,
transaction: Transaction,
_config: RpcSendTransactionConfig,
) -> Result<Signature, RpcError> {
let sig = *transaction.signatures.first().unwrap();
let result = self
.context
.banks_client
.process_transaction_with_metadata(transaction)
.await
.map_err(RpcError::from)?;
result.result.map_err(RpcError::TransactionError)?;
Ok(sig)
}

async fn create_and_send_transaction_with_event<T>(
&mut self,
instruction: &[Instruction],
Expand Down

0 comments on commit 72a5a3b

Please sign in to comment.