diff --git a/forester/src/send_transaction.rs b/forester/src/send_transaction.rs index a61058633..609230304 100644 --- a/forester/src/send_transaction.rs +++ b/forester/src/send_transaction.rs @@ -6,7 +6,6 @@ use account_compression::utils::constants::{ }; use async_trait::async_trait; use forester_utils::forester_epoch::{TreeAccounts, TreeType}; -use futures::future::join_all; use light_client::{ indexer::Indexer, rpc::{RetryConfig, RpcConnection}, @@ -34,7 +33,7 @@ use tokio::{ sync::Mutex, time::{sleep, Instant}, }; -use tracing::{debug, warn}; +use tracing::{debug, log::info, warn}; use url::Url; use crate::{ @@ -46,9 +45,7 @@ use crate::{ GetPriorityFeeEstimateResponse, RpcRequest, RpcResponse, }, queue_helpers::fetch_queue_item_data, - smart_transaction::{ - create_smart_transaction, send_and_confirm_transaction, CreateSmartTransactionConfig, - }, + smart_transaction::{create_smart_transaction, CreateSmartTransactionConfig}, Result, }; #[async_trait] @@ -231,7 +228,7 @@ pub async fn send_batched_transactions( config.build_transaction_batch_config, ) .await?; - debug!( + info!( "build transaction time {:?}", transaction_build_time_start.elapsed() ); @@ -254,50 +251,46 @@ pub async fn send_batched_transactions( preflight_commitment: Some(CommitmentLevel::Confirmed), ..Default::default() }; - // Send and confirm all transactions in the batch non-blocking. - let send_futures: Vec<_> = transactions - .into_iter() - .map(|tx| { - let pool_clone = Arc::clone(&pool); - async move { - match pool_clone.get_connection().await { - Ok(mut rpc) => { - send_and_confirm_transaction( - &mut rpc, - &tx, - send_transaction_config, - last_valid_block_height, - config.retry_config.timeout, - ) - .await - } - Err(e) => Err(light_client::rpc::RpcError::CustomError(format!( - "Failed to get RPC connection: {}", - e - ))), + + let (tx_sender, mut tx_receiver) = tokio::sync::mpsc::channel(120); + for tx in transactions { + let tx_sender = tx_sender.clone(); + let pool_clone = Arc::clone(&pool); + let config = send_transaction_config; + + tokio::spawn(async move { + match pool_clone.get_connection().await { + Ok(mut rpc) => { + let result = rpc.process_transaction_with_config(tx, config).await; + let _ = tx_sender.send(result).await; + } + Err(e) => { + warn!("Failed to get RPC connection: {}", e); } } - }) - .collect(); - - let results = join_all(send_futures).await; + }); + } + drop(tx_sender); - // Evaluate results - for result in results { + while let Some(result) = tx_receiver.recv().await { match result { Ok(signature) => { num_sent_transactions += 1; - println!("Transaction sent: {:?}", signature); + // debug!("Transaction sent: {:?}", signature); + info!( + "tree {} / queue {} / tx {:?}", + tree_accounts.merkle_tree.to_string(), + tree_accounts.queue.to_string(), + signature + ); } Err(e) => warn!("Transaction failed: {:?}", e), } } - num_batches += 1; let batch_duration = batch_start.elapsed(); - debug!("Batch duration: {:?}", batch_duration); + info!("Batch duration: {:?}", batch_duration); - // 8. Await minimum batch time. if start_time.elapsed() + config.retry_config.retry_delay < config.retry_config.timeout { sleep(config.retry_config.retry_delay).await; @@ -305,6 +298,11 @@ pub async fn send_batched_transactions( break; } + if num_batches >= config.num_batches { + debug!("Reached max number of batches"); + break; + } + // 9. Check if we reached max number of batches. if num_batches >= config.num_batches { debug!("Reached max number of batches"); @@ -515,7 +513,7 @@ pub async fn fetch_proofs_and_create_instructions) -> Result { - if url.host_str() == Some("localhost") { + if url.host_str() != Some("mainnet") { return Ok(10_000); } diff --git a/sdk-libs/client/src/rpc/rpc_connection.rs b/sdk-libs/client/src/rpc/rpc_connection.rs index 34cfd5d33..9ebb85db4 100644 --- a/sdk-libs/client/src/rpc/rpc_connection.rs +++ b/sdk-libs/client/src/rpc/rpc_connection.rs @@ -43,6 +43,12 @@ pub trait RpcConnection: Send + Sync + Debug + 'static { transaction: Transaction, ) -> Result<(Signature, Slot), RpcError>; + async fn process_transaction_with_config( + &mut self, + transaction: Transaction, + config: RpcSendTransactionConfig, + ) -> Result; + async fn create_and_send_transaction_with_event( &mut self, instructions: &[Instruction], diff --git a/sdk-libs/client/src/rpc/solana_rpc.rs b/sdk-libs/client/src/rpc/solana_rpc.rs index c5461381f..fa80ae8dc 100644 --- a/sdk-libs/client/src/rpc/solana_rpc.rs +++ b/sdk-libs/client/src/rpc/solana_rpc.rs @@ -273,6 +273,15 @@ impl RpcConnection for SolanaRpcConnection { .await } + async fn process_transaction_with_config( + &mut self, + transaction: Transaction, + config: RpcSendTransactionConfig, + ) -> Result { + self.send_transaction_with_config(&transaction, RpcSendTransactionConfig { ..config }) + .await + } + async fn create_and_send_transaction_with_event( &mut self, instructions: &[Instruction], diff --git a/sdk-libs/program-test/src/test_rpc.rs b/sdk-libs/program-test/src/test_rpc.rs index 3864f46d0..7eec9aea3 100644 --- a/sdk-libs/program-test/src/test_rpc.rs +++ b/sdk-libs/program-test/src/test_rpc.rs @@ -100,6 +100,22 @@ impl RpcConnection for ProgramTestRpcConnection { Ok((sig, slot)) } + async fn process_transaction_with_config( + &mut self, + transaction: Transaction, + _config: RpcSendTransactionConfig, + ) -> Result { + let sig = *transaction.signatures.first().unwrap(); + let result = self + .context + .banks_client + .process_transaction_with_metadata(transaction) + .await + .map_err(RpcError::from)?; + result.result.map_err(RpcError::TransactionError)?; + Ok(sig) + } + async fn create_and_send_transaction_with_event( &mut self, instruction: &[Instruction],