Skip to content

Commit

Permalink
feat: add retry logic to RPC methods (#1166)
Browse files Browse the repository at this point in the history
* Add async support and retry logic to RPC methods

Added the 'async-trait' crate to provide async trait methods in RPC interfaces.
Implemented async versions of existing synchronous methods and added retry logic for better error handling in 'SolanaRpcConnection'.

* Switch to async_trait for TransactionBuilder methods

The `build_signed_transaction_batch` method in `TransactionBuilder` now uses `async_trait` to support async/await syntax. This change ensures better concurrency handling and simplifies the implementation of asynchronous code within the trait. Additionally, added a TODO comment about optimizing retry logic for sending transactions.

* Reorganize imports in send_transaction.rs

Move `async_trait::async_trait` to the correct location. This change improves code readability by maintaining a consistent import ordering.
  • Loading branch information
sergeytimoshin authored Sep 8, 2024
1 parent 183bcbf commit c7491d0
Show file tree
Hide file tree
Showing 14 changed files with 290 additions and 245 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion examples/token-escrow/programs/token-escrow/tests/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ async fn test_escrow_pda() {

assert_rpc_error(result, 0, EscrowError::EscrowLocked.into()).unwrap();

rpc.warp_to_slot(1000).unwrap();
rpc.warp_to_slot(1000).await.unwrap();
// try withdrawal with invalid signer
let result = perform_withdrawal_failing(
&mut rpc,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ async fn test_escrow_with_compressed_pda() {
let rpc_error = RpcError::TransactionError(transaction_error);
assert!(matches!(result, Err(error) if error.to_string() == rpc_error.to_string()));

rpc.warp_to_slot(lockup_end + 1).unwrap();
rpc.warp_to_slot(lockup_end + 1).await.unwrap();
perform_withdrawal_with_event(
&mut rpc,
&mut test_indexer,
Expand Down
1 change: 1 addition & 0 deletions forester-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,4 @@ log = "0.4"
num-bigint = "0.4.6"
num-traits = "0.2.19"
reqwest = "0.11.26"
async-trait = "0.1.82"
137 changes: 46 additions & 91 deletions forester-utils/src/rpc/rpc_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::transaction_params::TransactionParams;
use anchor_lang::solana_program::clock::Slot;
use anchor_lang::solana_program::instruction::Instruction;
use anchor_lang::AnchorDeserialize;
use async_trait::async_trait;
use solana_sdk::account::{Account, AccountSharedData};
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::epoch_info::EpochInfo;
Expand All @@ -12,126 +13,80 @@ use solana_sdk::signature::{Keypair, Signature};
use solana_sdk::transaction::Transaction;
use std::fmt::Debug;

#[async_trait]
pub trait RpcConnection: Send + Sync + Debug + 'static {
fn new<U: ToString>(_url: U, _commitment_config: Option<CommitmentConfig>) -> Self
fn new<U: ToString>(url: U, commitment_config: Option<CommitmentConfig>) -> Self
where
Self: Sized,
{
unimplemented!()
}

fn health(&self) -> Result<(), RpcError> {
unimplemented!()
}
Self: Sized;

fn get_block_time(&self, _slot: u64) -> Result<i64, RpcError> {
unimplemented!()
}
fn get_payer(&self) -> &Keypair;
fn get_url(&self) -> String;

fn get_program_accounts(&self, program_id: &Pubkey)
-> Result<Vec<(Pubkey, Account)>, RpcError>;
async fn health(&self) -> Result<(), RpcError>;
async fn get_block_time(&self, slot: u64) -> Result<i64, RpcError>;
async fn get_epoch_info(&self) -> Result<EpochInfo, RpcError>;

fn process_transaction(
async fn get_program_accounts(
&self,
program_id: &Pubkey,
) -> Result<Vec<(Pubkey, Account)>, RpcError>;
async fn process_transaction(
&mut self,
transaction: Transaction,
) -> impl std::future::Future<Output = Result<Signature, RpcError>> + Send;

fn process_transaction_with_context(
) -> Result<Signature, RpcError>;
async fn process_transaction_with_context(
&mut self,
transaction: Transaction,
) -> impl std::future::Future<Output = Result<(Signature, Slot), RpcError>> + Send;
) -> Result<(Signature, Slot), RpcError>;

fn create_and_send_transaction_with_event<T>(
async fn create_and_send_transaction_with_event<T>(
&mut self,
instruction: &[Instruction],
instructions: &[Instruction],
authority: &Pubkey,
signers: &[&Keypair],
transaction_params: Option<TransactionParams>,
) -> impl std::future::Future<Output = Result<Option<(T, Signature, Slot)>, RpcError>> + Send
) -> Result<Option<(T, Signature, Slot)>, RpcError>
where
T: AnchorDeserialize + Send + Debug;

fn create_and_send_transaction<'a>(
async fn create_and_send_transaction<'a>(
&'a mut self,
instruction: &'a [Instruction],
instructions: &'a [Instruction],
payer: &'a Pubkey,
signers: &'a [&'a Keypair],
) -> impl std::future::Future<Output = Result<Signature, RpcError>> + Send + 'a {
async move {
let blockhash = self.get_latest_blockhash().await?;
let transaction = Transaction::new_signed_with_payer(
instruction,
Some(payer),
&signers.to_vec(),
blockhash,
);
let signature = transaction.signatures[0];
self.process_transaction(transaction).await?;
Ok(signature)
}
) -> Result<Signature, RpcError> {
let blockhash = self.get_latest_blockhash().await?;
let transaction =
Transaction::new_signed_with_payer(instructions, Some(payer), signers, blockhash);
self.process_transaction(transaction).await
}

fn confirm_transaction(
&self,
transaction: Signature,
) -> impl std::future::Future<Output = Result<bool, RpcError>> + Send;

fn get_payer(&self) -> &Keypair;
fn get_account(
&mut self,
address: Pubkey,
) -> impl std::future::Future<Output = Result<Option<Account>, RpcError>> + Send;
async fn confirm_transaction(&self, signature: Signature) -> Result<bool, RpcError>;
async fn get_account(&mut self, address: Pubkey) -> Result<Option<Account>, RpcError>;
fn set_account(&mut self, address: &Pubkey, account: &AccountSharedData);

fn get_minimum_balance_for_rent_exemption(
async fn get_minimum_balance_for_rent_exemption(
&mut self,
data_len: usize,
) -> impl std::future::Future<Output = Result<u64, RpcError>> + Send;
) -> Result<u64, RpcError>;
async fn airdrop_lamports(&mut self, to: &Pubkey, lamports: u64)
-> Result<Signature, RpcError>;

fn airdrop_lamports(
async fn get_anchor_account<T: AnchorDeserialize>(
&mut self,
to: &Pubkey,
lamports: u64,
) -> impl std::future::Future<Output = Result<Signature, RpcError>> + Send;

fn get_anchor_account<'a, T: AnchorDeserialize + 'static>(
&'a mut self,
pubkey: &'a Pubkey,
) -> impl std::future::Future<Output = Result<Option<T>, RpcError>> + Send + 'a {
async move {
match self.get_account(*pubkey).await? {
Some(account) => {
let data = T::deserialize(&mut &account.data[8..]).map_err(RpcError::from)?;
Ok(Some(data))
}
None => Ok(None),
pubkey: &Pubkey,
) -> Result<Option<T>, RpcError> {
match self.get_account(*pubkey).await? {
Some(account) => {
let data = T::deserialize(&mut &account.data[8..]).map_err(RpcError::from)?;
Ok(Some(data))
}
None => Ok(None),
}
}

fn get_balance(
&mut self,
pubkey: &Pubkey,
) -> impl std::future::Future<Output = Result<u64, RpcError>> + Send;

fn get_latest_blockhash(
&mut self,
) -> impl std::future::Future<Output = Result<Hash, RpcError>> + Send;

fn get_slot(&mut self) -> impl std::future::Future<Output = Result<u64, RpcError>> + Send;

fn warp_to_slot(&mut self, _slot: Slot) -> Result<(), RpcError> {
unimplemented!()
}

fn get_epoch_info(&self) -> Result<EpochInfo, RpcError> {
unimplemented!()
}

fn send_transaction(
&self,
transaction: &Transaction,
) -> impl std::future::Future<Output = Result<Signature, RpcError>> + Send;

fn get_url(&self) -> String;
async fn get_balance(&mut self, pubkey: &Pubkey) -> Result<u64, RpcError>;
async fn get_latest_blockhash(&mut self) -> Result<Hash, RpcError>;
async fn get_slot(&mut self) -> Result<u64, RpcError>;
async fn warp_to_slot(&mut self, slot: Slot) -> Result<(), RpcError>;
async fn send_transaction(&self, transaction: &Transaction) -> Result<Signature, RpcError>;
}
Loading

0 comments on commit c7491d0

Please sign in to comment.