diff --git a/Cargo.lock b/Cargo.lock index 72e0442f1..6bcde5bb8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11226,6 +11226,7 @@ dependencies = [ "async-trait", "futures 0.3.28", "mockall 0.8.3", + "mocktopus", "parity-scale-codec", "rand 0.8.5", "reqwest", diff --git a/clients/vault/src/replace.rs b/clients/vault/src/replace.rs index 997040115..8a698217c 100644 --- a/clients/vault/src/replace.rs +++ b/clients/vault/src/replace.rs @@ -189,7 +189,7 @@ pub async fn handle_replace_request< &event.old_vault_id, event.amount, 0, // do not lock any additional collateral - wallet.get_public_key_raw(), + wallet.public_key_raw(), ) .await?) } diff --git a/clients/vault/src/system.rs b/clients/vault/src/system.rs index 0bccc2d4e..121fcdaa6 100644 --- a/clients/vault/src/system.rs +++ b/clients/vault/src/system.rs @@ -24,7 +24,7 @@ use runtime::{ }; use service::{wait_or_shutdown, Error as ServiceError, MonitoringConfig, Service}; use stellar_relay_lib::{sdk::PublicKey, StellarOverlayConfig}; -use wallet::{LedgerTxEnvMap, StellarWallet}; +use wallet::{LedgerTxEnvMap, StellarWallet, RESUBMISSION_INTERVAL_IN_SECS}; use crate::{ cancellation::ReplaceCanceller, @@ -704,7 +704,7 @@ impl VaultService { let stellar_wallet = StellarWallet::from_secret_encoded(&secret_key, is_public_network)?; tracing::debug!( "Vault wallet public key: {}", - from_utf8(&stellar_wallet.get_public_key().to_encoding())? + from_utf8(&stellar_wallet.public_key().to_encoding())? ); let stellar_wallet = Arc::new(RwLock::new(stellar_wallet)); @@ -759,13 +759,13 @@ impl VaultService { self.vault_id_manager.fetch_vault_ids().await?; let wallet = self.stellar_wallet.write().await; - let vault_public_key = wallet.get_public_key(); + let vault_public_key = wallet.public_key(); let is_public_network = wallet.is_public_network(); // re-submit transactions in the cache - let _receivers = wallet.resubmit_transactions_from_cache().await; - //todo: handle errors from the receivers - + wallet + .start_periodic_resubmission_of_transactions_from_cache(RESUBMISSION_INTERVAL_IN_SECS) + .await; drop(wallet); let oracle_agent = self.create_oracle_agent(is_public_network).await?; @@ -806,7 +806,7 @@ impl VaultService { } if self.spacewalk_parachain.get_public_key().await?.is_none() { - let public_key = self.stellar_wallet.read().await.get_public_key(); + let public_key = self.stellar_wallet.read().await.public_key(); let pub_key_encoded = public_key.to_encoding(); tracing::info!( "Registering public key to the parachain...{}", diff --git a/clients/vault/tests/helper/helper.rs b/clients/vault/tests/helper/helper.rs index ca9f2bece..a5ad5a5fa 100644 --- a/clients/vault/tests/helper/helper.rs +++ b/clients/vault/tests/helper/helper.rs @@ -93,7 +93,7 @@ pub async fn register_vault_with_wallet( items: Vec<(&SpacewalkParachain, &VaultId, u128)>, ) -> u128 { let wallet_read = wallet.read().await; - let public_key = wallet_read.get_public_key(); + let public_key = wallet_read.public_key(); let vault_collateral = register_vault(public_key, items).await; diff --git a/clients/vault/tests/vault_integration_tests.rs b/clients/vault/tests/vault_integration_tests.rs index 5aea98813..17df04f52 100644 --- a/clients/vault/tests/vault_integration_tests.rs +++ b/clients/vault/tests/vault_integration_tests.rs @@ -94,7 +94,7 @@ async fn test_redeem_succeeds() { ), async { let wallet_read = user_wallet.read().await; - let address = wallet_read.get_public_key_raw(); + let address = wallet_read.public_key_raw(); drop(wallet_read); // We redeem half of what we issued let redeem_id = user_provider @@ -340,7 +340,7 @@ async fn test_cancel_scheduler_succeeds() { let wallet_read = vault_wallet.read().await; let issue_request_listener = vault::service::listen_for_issue_requests( new_vault_provider.clone(), - wallet_read.get_public_key(), + wallet_read.public_key(), issue_cancellation_event_tx.clone(), issue_set.clone(), memos_to_issue_ids.clone(), @@ -473,7 +473,7 @@ async fn test_issue_cancel_succeeds() { let memos_to_issue_ids = Arc::new(RwLock::new(IssueIdLookup::new())); let issue_filter = - IssueFilter::new(&vault_wallet.read().await.get_public_key()).expect("Invalid filter"); + IssueFilter::new(&vault_wallet.read().await.public_key()).expect("Invalid filter"); let issue_amount = upscaled_compatible_amount(100); let vault_collateral = get_required_vault_collateral_for_issue( @@ -489,7 +489,7 @@ async fn test_issue_cancel_succeeds() { .register_vault_with_public_key( &vault_id, vault_collateral, - vault_wallet.read().await.get_public_key_raw(), + vault_wallet.read().await.public_key_raw(), ) .await ); @@ -524,7 +524,7 @@ async fn test_issue_cancel_succeeds() { let wallet_read = vault_wallet.read().await; let service = join3( vault::service::listen_for_new_transactions( - wallet_read.get_public_key(), + wallet_read.public_key(), wallet_read.is_public_network(), slot_tx_env_map.clone(), issue_set.clone(), @@ -533,7 +533,7 @@ async fn test_issue_cancel_succeeds() { ), vault::service::listen_for_issue_requests( vault_provider.clone(), - wallet_read.get_public_key(), + wallet_read.public_key(), issue_event_tx, issue_set.clone(), memos_to_issue_ids.clone(), @@ -662,7 +662,7 @@ async fn test_automatic_issue_execution_succeeds() { .register_vault_with_public_key( &vault_id, vault_collateral, - wallet_read.get_public_key_raw(), + wallet_read.public_key_raw(), ) .await ); @@ -705,8 +705,8 @@ async fn test_automatic_issue_execution_succeeds() { }; let wallet_read = vault_wallet.read().await; - let issue_filter = - IssueFilter::new(&wallet_read.get_public_key()).expect("Invalid filter"); + let issue_filter = IssueFilter::new(&wallet_read.public_key()).expect("Invalid filter"); + let slot_tx_env_map = Arc::new(RwLock::new(HashMap::new())); let issue_set = Arc::new(RwLock::new(IssueRequestsMap::new())); @@ -714,7 +714,7 @@ async fn test_automatic_issue_execution_succeeds() { let (issue_event_tx, _issue_event_rx) = mpsc::channel::(16); let service = join3( vault::service::listen_for_new_transactions( - wallet_read.get_public_key(), + wallet_read.public_key(), wallet_read.is_public_network(), slot_tx_env_map.clone(), issue_set.clone(), @@ -723,7 +723,7 @@ async fn test_automatic_issue_execution_succeeds() { ), vault::service::listen_for_issue_requests( vault_provider.clone(), - wallet_read.get_public_key(), + wallet_read.public_key(), issue_event_tx, issue_set.clone(), memos_to_issue_ids.clone(), @@ -777,7 +777,7 @@ async fn test_automatic_issue_execution_succeeds_for_other_vault() { .register_vault_with_public_key( &vault1_id, vault_collateral, - wallet_read.get_public_key_raw(), + wallet_read.public_key_raw(), ) .await ); @@ -786,7 +786,7 @@ async fn test_automatic_issue_execution_succeeds_for_other_vault() { .register_vault_with_public_key( &vault2_id, vault_collateral, - wallet_read.get_public_key_raw(), + wallet_read.public_key_raw(), ) .await ); @@ -852,7 +852,7 @@ async fn test_automatic_issue_execution_succeeds_for_other_vault() { }; let wallet_read = vault_wallet.read().await; - let vault_account_public_key = wallet_read.get_public_key(); + let vault_account_public_key = wallet_read.public_key(); drop(wallet_read); let issue_filter = IssueFilter::new(&vault_account_public_key).expect("Invalid filter"); @@ -920,7 +920,7 @@ async fn test_execute_open_requests_succeeds() { .register_vault_with_public_key( &vault_id, vault_collateral, - wallet_read.get_public_key_raw(), + wallet_read.public_key_raw(), ) .await ); @@ -936,8 +936,8 @@ async fn test_execute_open_requests_succeeds() { .await; let wallet_read = user_wallet.read().await; - let address = wallet_read.get_public_key(); - let address_raw = wallet_read.get_public_key_raw(); + let address = wallet_read.public_key(); + let address_raw = wallet_read.public_key_raw(); drop(wallet_read); // Place redeem requests. 100_00000 is our minimum redeem amount with the current fee // settings defined in the chain spec @@ -1088,7 +1088,7 @@ async fn test_shutdown() { .register_vault_with_public_key( &sudo_vault_id, vault_collateral, - vault_wallet.read().await.get_public_key_raw(), + vault_wallet.read().await.public_key_raw(), ) .await ); @@ -1136,13 +1136,13 @@ async fn test_requests_with_incompatible_amounts_fail() { .await; let wallet_read = vault_wallet.read().await; - let address = wallet_read.get_public_key_raw(); + let address = wallet_read.public_key_raw(); assert_ok!( vault_provider .register_vault_with_public_key( &vault_id, vault_collateral, - wallet_read.get_public_key_raw() + wallet_read.public_key_raw() ) .await ); diff --git a/clients/wallet/Cargo.toml b/clients/wallet/Cargo.toml index 0cf386c0a..ec0011f08 100644 --- a/clients/wallet/Cargo.toml +++ b/clients/wallet/Cargo.toml @@ -28,3 +28,4 @@ primitives = { package = "spacewalk-primitives", path = "../../primitives"} [dev-dependencies] mockall = "0.8.1" serial_test = "0.9.0" +mocktopus = "0.8.0" diff --git a/clients/wallet/src/cache.rs b/clients/wallet/src/cache.rs index 16739859f..2a5d81b42 100644 --- a/clients/wallet/src/cache.rs +++ b/clients/wallet/src/cache.rs @@ -147,12 +147,14 @@ impl WalletStateStorage { } /// Removes a transaction from the local folder - pub fn remove_tx_envelope(&self, sequence: SequenceNumber) -> Result<(), Error> { + pub fn remove_tx_envelope(&self, sequence: SequenceNumber) { let full_file_path = format!("{}/{sequence}", self.txs_inner_dir()); - remove_file(&full_file_path).map_err(|e| { - tracing::error!("Failed to delete file: {:?}", e); - Error::cache_error_with_seq(CacheErrorKind::DeleteFileFailed, sequence) - }) + match remove_file(&full_file_path) { + Ok(_) => tracing::debug!("remove_tx_envelope(): Deleted file with sequence {sequence}"), + Err(e) => tracing::error!( + "remove_tx_envelope(): Failed to delete file with sequence {sequence}: {e:?}" + ), + } } #[allow(dead_code)] @@ -330,7 +332,7 @@ mod test { extract_tx_envelope_from_path, parse_xdr_string_to_vec_u8, Error, WalletStateStorage, }, error::CacheErrorKind, - test_helper::public_key_from_encoding, + mock::public_key_from_encoding, }; use primitives::{ stellar::{ @@ -477,10 +479,7 @@ mod test { let actual_tx = new_storage.get_tx_envelope(sequence).expect("a tx should be found"); assert_eq!(actual_tx, expected_tx); - assert!(new_storage.remove_tx_envelope(sequence).is_ok()); - - // removing a tx again will return an error. - assert_error(new_storage.remove_tx_envelope(sequence), CacheErrorKind::DeleteFileFailed); + new_storage.remove_tx_envelope(sequence); // let's remove the entire directory new_storage.remove_dir(); diff --git a/clients/wallet/src/error.rs b/clients/wallet/src/error.rs index f0aba23a0..3b4a45354 100644 --- a/clients/wallet/src/error.rs +++ b/clients/wallet/src/error.rs @@ -17,6 +17,7 @@ pub enum Error { title: String, status: StatusCode, reason: String, + result_code_op: Vec, envelope_xdr: Option, }, #[error("Could not parse string: {0}")] @@ -29,6 +30,9 @@ pub enum Error { #[error(transparent)] CacheError(CacheError), + #[error("Transaction resubmission failed: {0}")] + ResubmissionError(String), + #[error("Cannot send payment to self")] SelfPaymentError, } @@ -37,9 +41,7 @@ impl Error { pub fn is_recoverable(&self) -> bool { match self { Error::HorizonResponseError(e) if e.is_timeout() => true, - Error::HorizonSubmissionError { title: _, status, reason: _, envelope_xdr: _ } - if *status == 504 => - true, + Error::HorizonSubmissionError { status, .. } if *status == 504 => true, Error::CacheError(e) => match e.kind { CacheErrorKind::CreateDirectoryFailed | CacheErrorKind::FileCreationFailed | @@ -58,8 +60,7 @@ impl Error { match self { Error::HorizonResponseError(e) => e.status().map(|code| server_errors.contains(&code.as_u16())).unwrap_or(false), - Error::HorizonSubmissionError { title: _, status, reason: _, envelope_xdr: _ } => - server_errors.contains(status), + Error::HorizonSubmissionError { status, .. } => server_errors.contains(status), _ => false, } } @@ -99,9 +100,9 @@ impl Error { #[derive(Error, PartialEq, Eq)] pub struct CacheError { pub(crate) kind: CacheErrorKind, - path: Option, - envelope: Option, - sequence_number: Option, + pub(crate) path: Option, + pub(crate) envelope: Option, + pub(crate) sequence_number: Option, } impl Display for CacheError { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { diff --git a/clients/wallet/src/horizon/horizon.rs b/clients/wallet/src/horizon/horizon.rs index 855d66bf2..81bb9795c 100644 --- a/clients/wallet/src/horizon/horizon.rs +++ b/clients/wallet/src/horizon/horizon.rs @@ -172,7 +172,13 @@ impl HorizonClient for reqwest::Client { continue }, - Err(Error::HorizonSubmissionError { title, status, reason, envelope_xdr }) => { + Err(Error::HorizonSubmissionError { + title, + status, + reason, + result_code_op, + envelope_xdr, + }) => { tracing::error!("submitting transaction with seq no: {seq_no:?}: failed with {title}, {reason}"); tracing::debug!("submitting transaction with seq no: {seq_no:?}: the envelope: {envelope_xdr:?}"); let envelope_xdr = envelope_xdr.or(Some(transaction_xdr.to_string())); @@ -181,6 +187,7 @@ impl HorizonClient for reqwest::Client { title, status, reason, + result_code_op, envelope_xdr, }) }, diff --git a/clients/wallet/src/horizon/responses.rs b/clients/wallet/src/horizon/responses.rs index 599281e58..3a902e76a 100644 --- a/clients/wallet/src/horizon/responses.rs +++ b/clients/wallet/src/horizon/responses.rs @@ -9,7 +9,7 @@ use primitives::{ types::{ Memo, OperationResult, SequenceNumber, TransactionResult, TransactionResultResult, }, - Asset, TransactionEnvelope, XdrCodec, + Asset, PublicKey, TransactionEnvelope, XdrCodec, }, MemoTypeExt, TextMemo, }; @@ -73,6 +73,7 @@ pub(crate) async fn interpret_response( title: title.to_string(), status, reason: detail.to_string(), + result_code_op: vec![], envelope_xdr: Some(envelope_xdr.to_string()), } }, @@ -93,7 +94,8 @@ pub(crate) async fn interpret_response( Error::HorizonSubmissionError { title: title.to_string(), status, - reason: format!("{result_code_tx}: {result_code_op:?}"), + reason: result_code_tx.to_string(), + result_code_op, envelope_xdr: Some(envelope_xdr.to_string()), } }, @@ -106,6 +108,7 @@ pub(crate) async fn interpret_response( title: title.to_string(), status, reason: detail.to_string(), + result_code_op: vec![], envelope_xdr: None, } }, @@ -251,6 +254,10 @@ impl TransactionResponse { res.parse::().map_err(|_| Error::DecodeError) } + pub fn source_account(&self) -> Result { + PublicKey::from_encoding(&self.source_account).map_err(|_| Error::DecodeError) + } + pub fn get_successful_operations_result(&self) -> Result, Error> { if let TransactionResultResult::TxSuccess(res) = TransactionResult::from_base64_xdr(&self.result_xdr) @@ -268,7 +275,7 @@ impl TransactionResponse { } } -#[derive(Deserialize, Debug)] +#[derive(Deserialize)] pub struct HorizonAccountResponse { #[serde(deserialize_with = "de_string_to_bytes")] pub id: Vec, @@ -280,6 +287,17 @@ pub struct HorizonAccountResponse { // ... } +impl Debug for HorizonAccountResponse { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("HorizonAccountResponse") + .field("id", &debug_str_or_vec_u8!(&self.id)) + .field("account_id", &debug_str_or_vec_u8!(&self.account_id)) + .field("sequence", &self.sequence) + .field("balances", &self.balances) + .finish() + } +} + impl HorizonAccountResponse { pub fn is_trustline_exist(&self, asset: &Asset) -> bool { for balance in &self.balances { diff --git a/clients/wallet/src/horizon/tests.rs b/clients/wallet/src/horizon/tests.rs index c20b210ad..3b0cade97 100644 --- a/clients/wallet/src/horizon/tests.rs +++ b/clients/wallet/src/horizon/tests.rs @@ -6,7 +6,7 @@ use crate::{ responses::{HorizonClaimableBalanceResponse, TransactionResponse}, traits::HorizonClient, }, - test_helper::secret_key_from_encoding, + mock::secret_key_from_encoding, }; use mockall::predicate::*; use primitives::stellar::{ diff --git a/clients/wallet/src/lib.rs b/clients/wallet/src/lib.rs index 30a04eac4..d47a1087b 100644 --- a/clients/wallet/src/lib.rs +++ b/clients/wallet/src/lib.rs @@ -13,28 +13,12 @@ mod stellar_wallet; mod task; pub mod types; -pub use types::{LedgerTxEnvMap, Slot}; - -pub type TransactionsResponseIter = horizon::responses::TransactionsResponseIter; - #[cfg(test)] -pub mod test_helper { - use primitives::{ - stellar::{Asset, PublicKey, SecretKey}, - CurrencyId, - }; +pub(crate) mod mock; - pub const USDC_ISSUER: &str = "GAKNDFRRWA3RPWNLTI3G4EBSD3RGNZZOY5WKWYMQ6CQTG3KIEKPYWAYC"; - pub fn default_usdc_asset() -> Asset { - let asset = CurrencyId::try_from(("USDC", USDC_ISSUER)).expect("should convert ok"); - asset.try_into().expect("should convert to Asset") - } +mod resubmissions; - pub fn public_key_from_encoding>(encoded_key: T) -> PublicKey { - PublicKey::from_encoding(encoded_key).expect("should return a public key") - } +pub use resubmissions::*; +pub use types::{LedgerTxEnvMap, Slot}; - pub fn secret_key_from_encoding>(encoded_key: T) -> SecretKey { - SecretKey::from_encoding(encoded_key).expect("should return a secret key") - } -} +pub type TransactionsResponseIter = horizon::responses::TransactionsResponseIter; diff --git a/clients/wallet/src/mock.rs b/clients/wallet/src/mock.rs new file mode 100644 index 000000000..d085b2a66 --- /dev/null +++ b/clients/wallet/src/mock.rs @@ -0,0 +1,153 @@ +use crate::{ + error::Error, + horizon::HorizonClient, + operations::{ + create_basic_spacewalk_stellar_transaction, create_payment_operation, + redeem_request_tests::create_account_merge_operation, AppendExt, + }, + StellarWallet, TransactionResponse, +}; +use primitives::{ + stellar::{ + types::SequenceNumber, Asset as StellarAsset, PublicKey, SecretKey, TransactionEnvelope, + }, + CurrencyId, StellarStroops, +}; +use std::sync::Arc; +use tokio::sync::RwLock; + +pub const DEFAULT_DEST_PUBLIC_KEY: &str = + "GA5ZSEJYB37JRC5AVCIA5MOP4RHTM335X2KGX3IHOJAPP5RE34K4KZVN"; +pub const STELLAR_VAULT_SECRET_KEY: &str = + "SCV7RZN5XYYMMVSWYCR4XUMB76FFMKKKNHP63UTZQKVM4STWSCIRLWFJ"; +pub const IS_PUBLIC_NETWORK: bool = false; + +pub const DEFAULT_STROOP_FEE_PER_OPERATION: u32 = 100; + +impl StellarWallet { + pub async fn is_account_exist(&self) -> bool { + self.client + .get_account(self.public_key(), self.is_public_network()) + .await + .is_ok() + } + + /// merges the wallet's account with the specified destination. + /// Exercise prudence when using this method, as it automatically removes the source account + /// once operation is successful. + pub async fn merge_account( + &mut self, + destination_address: PublicKey, + ) -> Result { + let account_merge_op = + create_account_merge_operation(destination_address, self.public_key())?; + + self.send_to_address([9u8; 32], DEFAULT_STROOP_FEE_PER_OPERATION, vec![account_merge_op]) + .await + } + + pub fn create_payment_envelope( + &self, + destination_address: PublicKey, + asset: StellarAsset, + stroop_amount: StellarStroops, + request_id: [u8; 32], + stroop_fee_per_operation: u32, + next_sequence_number: SequenceNumber, + ) -> Result { + let public_key = self.public_key(); + // create payment operation + let payment_op = create_payment_operation( + destination_address, + asset, + stroop_amount, + public_key.clone(), + )?; + + self.create_envelope( + request_id, + stroop_fee_per_operation, + next_sequence_number, + vec![payment_op], + ) + } + + pub fn create_payment_envelope_no_signature( + &self, + destination_address: PublicKey, + asset: StellarAsset, + stroop_amount: StellarStroops, + request_id: [u8; 32], + stroop_fee_per_operation: u32, + next_sequence_number: SequenceNumber, + ) -> Result { + let public_key = self.public_key(); + // create payment operation + let payment_op = create_payment_operation( + destination_address, + asset, + stroop_amount, + public_key.clone(), + )?; + + // create the transaction + let mut transaction = create_basic_spacewalk_stellar_transaction( + request_id, + stroop_fee_per_operation, + public_key, + next_sequence_number, + )?; + + transaction.append(payment_op)?; + + Ok(transaction.into_transaction_envelope()) + } + + pub async fn create_dummy_envelope_no_signature( + &self, + stroop_amount: StellarStroops, + ) -> Result { + let sequence = self.get_sequence().await?; + self.create_payment_envelope_no_signature( + default_destination(), + StellarAsset::native(), + stroop_amount, + rand::random(), + DEFAULT_STROOP_FEE_PER_OPERATION, + sequence + 1, + ) + } +} + +pub fn wallet_with_storage(storage: &str) -> Result>, Error> { + wallet_with_secret_key_for_storage(storage, STELLAR_VAULT_SECRET_KEY) +} + +pub fn wallet_with_secret_key_for_storage( + storage: &str, + secret_key: &str, +) -> Result>, Error> { + Ok(Arc::new(RwLock::new(StellarWallet::from_secret_encoded_with_cache( + secret_key, + IS_PUBLIC_NETWORK, + storage.to_string(), + )?))) +} + +pub fn default_destination() -> PublicKey { + public_key_from_encoding(DEFAULT_DEST_PUBLIC_KEY) +} + +pub const USDC_ISSUER: &str = "GAKNDFRRWA3RPWNLTI3G4EBSD3RGNZZOY5WKWYMQ6CQTG3KIEKPYWAYC"; +pub fn default_usdc_asset() -> StellarAsset { + let asset = CurrencyId::try_from(("USDC", USDC_ISSUER)).expect("should convert ok"); + asset.try_into().expect("should convert to Asset") +} + +pub fn public_key_from_encoding>(encoded_key: T) -> PublicKey { + PublicKey::from_encoding(encoded_key).expect("should return a public key") +} + +pub fn secret_key_from_encoding>(encoded_key: T) -> SecretKey { + SecretKey::from_encoding(encoded_key).expect("should return a secret key") +} diff --git a/clients/wallet/src/operations.rs b/clients/wallet/src/operations.rs index 24d4a4022..a62613e5b 100644 --- a/clients/wallet/src/operations.rs +++ b/clients/wallet/src/operations.rs @@ -93,9 +93,7 @@ pub trait RedeemOperationsExt: HorizonClient { // if account exists and NO trustline, use claimable balance operation Ok(_) => claimable_balance_operation(), // if INactive account... - Err(Error::HorizonSubmissionError { title: _, status, reason: _, envelope_xdr: _ }) - if status == 404 => - { + Err(Error::HorizonSubmissionError { status, .. }) if status == 404 => { let to_be_redeemed_amount_u128 = stellar_stroops_to_u128(to_be_redeemed_amount); // ... and redeeming amount >= 1 XLM, use create account operation @@ -184,9 +182,7 @@ pub fn create_basic_spacewalk_stellar_transaction( #[cfg(test)] pub mod redeem_request_tests { use super::*; - use crate::test_helper::{ - default_usdc_asset, public_key_from_encoding, secret_key_from_encoding, - }; + use crate::mock::{default_usdc_asset, public_key_from_encoding, secret_key_from_encoding}; use primitives::{stellar::SecretKey, CurrencyId}; const INACTIVE_STELLAR_SECRET_KEY: &str = diff --git a/clients/wallet/src/resubmissions.rs b/clients/wallet/src/resubmissions.rs new file mode 100644 index 000000000..0d62a5b4d --- /dev/null +++ b/clients/wallet/src/resubmissions.rs @@ -0,0 +1,729 @@ +use std::sync::Arc; + +use crate::{ + error::{ + CacheError, CacheErrorKind, Error, + Error::{DecodeError, ResubmissionError}, + }, + StellarWallet, TransactionResponse, +}; +use primitives::{ + stellar::{Memo, Transaction, TransactionEnvelope, XdrCodec}, + TransactionEnvelopeExt, +}; +use std::time::Duration; +use tokio::time::sleep; + +#[cfg(test)] +use mocktopus::macros::mockable; + +pub const RESUBMISSION_INTERVAL_IN_SECS: u64 = 1800; + +const MAX_LOOK_BACK_PAGES: u8 = 10; + +#[cfg_attr(test, mockable)] +impl StellarWallet { + pub async fn start_periodic_resubmission_of_transactions_from_cache( + &self, + interval_in_seconds: u64, + ) { + // Perform the resubmission + self._resubmit_transactions_from_cache().await; + + // The succeeding resubmission will be done on intervals + // Clone self to use this in another thread + let me = Arc::new(self.clone()); + // Spawn a thread to resubmit envelopes from cache + tokio::spawn(async move { + let me_clone = Arc::clone(&me); + loop { + // Loops every 30 minutes or 1800 seconds + pause_process_in_secs(interval_in_seconds).await; + + me_clone._resubmit_transactions_from_cache().await; + } + }); + } + + #[doc(hidden)] + /// Submits transactions found in the wallet's cache to Stellar. + async fn _resubmit_transactions_from_cache(&self) { + let _ = self.transaction_submission_lock.lock().await; + + // Collect envelopes from cache + let envelopes = match self.get_tx_envelopes_from_cache() { + Ok((envs, errors)) => { + // Log those with errors. + if !errors.is_empty() { + tracing::warn!( + "_resubmit_transactions_from_cache(): errors from cache: {errors:?}" + ); + } + envs + }, + Err(errors) => { + tracing::warn!( + "_resubmit_transactions_from_cache(): errors from cache: {errors:?}" + ); + return + }, + }; + + // to prevent `error[E0434]: can't capture dynamic environment in a fn item`, + // use a closure instead + let submit = + |envelope: TransactionEnvelope| async { self.submit_transaction(envelope).await }; + + // there's nothing to resubmit + if envelopes.is_empty() { + return + } + tracing::info!( + "_resubmit_transactions_from_cache(): resubmitting {:?} envelopes in cache...", + envelopes.len() + ); + + let mut error_collector = vec![]; + // loop through the envelopes and resubmit each one + for envelope in envelopes { + if let Err(e) = submit(envelope.clone()).await { + tracing::debug!("_resubmit_transactions_from_cache(): encountered error: {e:?}"); + // save the kind of error and the envelope that failed + error_collector.push((e, envelope)); + } + } + + // a few errors happened and must be handled. + if !error_collector.is_empty() { + // clone self, to use in another thread + let me = Arc::new(self.clone()); + tokio::spawn(async move { + me.handle_errors(error_collector).await; + }); + } + } + + #[doc(hidden)] + /// Handle all errors + /// + /// # Arguments + /// * `errors` - a list of a tuple containing the error and its transaction envelope + async fn handle_errors( + &self, + #[allow(unused_mut)] mut errors: Vec<(Error, TransactionEnvelope)>, + ) { + while let Some((error, env)) = errors.pop() { + // handle the error + match self.handle_error(error).await { + // a new kind of error occurred. Process it on the next loop. + Err(e) => { + tracing::error!("handle_errors(): new error occurred: {e:?}"); + + // push the transaction that failed, and the corresponding error + errors.push((e, env)); + }, + + // Resubmission failed for this Transaction Envelope and it's a non-recoverable + // error Remove from cache + Ok(None) => self.remove_tx_envelope_from_cache(&env), + + // Resubmission was successful + Ok(Some(resp)) => + tracing::debug!("handle_errors(): successfully processed envelope: {resp:?}"), + } + } + } + + /// Returns: + /// * `TransactionResponse` for successful resubmission; + /// * None for errors that cannot be resubmitted; + /// * An error that can potentially be resubmitted again + /// + /// This function determines whether an error is up for resubmission or not: + /// `tx_bad_seq` or `SequenceNumberAlreadyUsed` can be resubmitted by updating the sequence + /// number `tx_internal_error` should be resubmitted again + /// other errors must be logged and removed from cache. + async fn handle_error(&self, error: Error) -> Result, Error> { + match &error { + Error::HorizonSubmissionError { reason, envelope_xdr, .. } => match &reason[..] { + "tx_bad_seq" => + return self.handle_tx_bad_seq_error_with_xdr(envelope_xdr).await.map(Some), + "tx_internal_error" => + return self.handle_tx_internal_error(envelope_xdr).await.map(Some), + _ => { + if let Ok(env) = decode_to_envelope(envelope_xdr) { + self.remove_tx_envelope_from_cache(&env); + }; + + tracing::error!( + "handle_error(): Unrecoverable HorizonSubmissionError: {error:?}" + ); + }, + }, + Error::CacheError(CacheError { + kind: CacheErrorKind::SequenceNumberAlreadyUsed, + envelope, + .. + }) => { + if let Some(transaction_envelope) = envelope { + return self + .handle_tx_bad_seq_error_with_envelope(transaction_envelope.clone()) + .await + .map(Some) + } + + tracing::warn!("handle_error(): SequenceNumberAlreadyUsed error but no envelope"); + }, + _ => tracing::warn!("handle_error(): Unrecoverable error in Stellar wallet: {error:?}"), + } + + // the error found is not recoverable, and cannot be resubmitted again. + Ok(None) + } + + // We encountered an unknown error and try submitting the transaction again as is + async fn handle_tx_internal_error( + &self, + envelope_xdr_as_str_opt: &Option, + ) -> Result { + let mut envelope = decode_to_envelope(envelope_xdr_as_str_opt)?; + self.sign_envelope(&mut envelope)?; + + self.submit_transaction(envelope).await + } +} + +// handle tx_bad_seq +#[cfg_attr(test, mockable)] +impl StellarWallet { + async fn handle_tx_bad_seq_error_with_xdr( + &self, + envelope_xdr_as_str_opt: &Option, + ) -> Result { + let tx_envelope = decode_to_envelope(envelope_xdr_as_str_opt)?; + self.handle_tx_bad_seq_error_with_envelope(tx_envelope).await + } + + async fn handle_tx_bad_seq_error_with_envelope( + &self, + tx_envelope: TransactionEnvelope, + ) -> Result { + let tx = tx_envelope.get_transaction().ok_or(DecodeError)?; + + // Check if we already submitted this transaction + if !self.is_transaction_already_submitted(&tx).await { + // Remove original transaction. + // The same envelope will be saved again using a different sequence number + self.remove_tx_envelope_from_cache(&tx_envelope); + + return self.bump_sequence_number_and_submit(tx).await + } + + tracing::error!("handle_tx_bad_seq_error_with_envelope(): Similar transaction already submitted. Skipping {:?}", tx); + + Err(ResubmissionError("Transaction already submitted".to_string())) + } + + async fn bump_sequence_number_and_submit( + &self, + tx: Transaction, + ) -> Result { + let sequence_number = self.get_sequence().await?; + let mut updated_tx = tx.clone(); + updated_tx.seq_num = sequence_number + 1; + + let old_tx_xdr = tx.to_base64_xdr(); + let old_tx = String::from_utf8(old_tx_xdr.clone()).unwrap_or(format!("{old_tx_xdr:?}")); + tracing::trace!("bump_sequence_number_and_submit(): old transaction: {old_tx}"); + + let updated_tx_xdr = updated_tx.to_base64_xdr(); + let updated_tx_xdr = + String::from_utf8(updated_tx_xdr.clone()).unwrap_or(format!("{updated_tx_xdr:?}")); + tracing::trace!("bump_sequence_number_and_submit(): new transaction: {updated_tx_xdr}"); + + let envelope = self.create_and_sign_envelope(updated_tx)?; + self.submit_transaction(envelope).await + } + + /// This function iterates over all transactions of an account to see if a similar transaction + /// i.e. a transaction containing the same memo was already submitted previously. + /// TODO: This operation is very costly and we should try to optimize it in the future. + async fn is_transaction_already_submitted(&self, tx: &Transaction) -> bool { + // loop through until the 10th page + let mut remaining_page = 0; + let own_public_key = self.public_key(); + + while let Ok(transaction) = self.get_all_transactions_iter().await { + if remaining_page == MAX_LOOK_BACK_PAGES { + break + } + + for response in transaction.records { + // Make sure that we are the sender and not the receiver because otherwise an + // attacker could send a transaction to us with the target memo and we'd wrongly + // assume that we already submitted this transaction. + match response.source_account() { + // no source account was found; move on to the next response + Err(_) => continue, + // the wallet's public key is not this response's source account; + // move on to the next response + Ok(source_account) if !source_account.eq(&own_public_key) => continue, + _ => {}, + } + + // Check that the transaction contains the memo that we want to send. + if let Some(response_memo) = response.memo_text() { + let Memo::MemoText(tx_memo) = &tx.memo else { + continue + }; + + if are_memos_eq(response_memo, tx_memo.get_vec()) { + return true + } + } + } + + remaining_page += 1; + } + + // We did not find a transaction that matched our criteria + false + } +} + +#[cfg_attr(test, mockable)] +fn are_memos_eq(memo1: &Vec, memo2: &Vec) -> bool { + memo1 == memo2 +} + +#[cfg_attr(test, mockable)] +async fn pause_process_in_secs(in_secs: u64) { + sleep(Duration::from_secs(in_secs)).await; +} + +fn decode_to_envelope( + envelope_xdr_as_str_opt: &Option, +) -> Result { + let Some(envelope_xdr) = envelope_xdr_as_str_opt else { + tracing::warn!("handle_error(): no envelope_xdr found"); + return Err(ResubmissionError("no envelope_xdr".to_string())) + }; + + TransactionEnvelope::from_base64_xdr(envelope_xdr).map_err(|_| DecodeError) +} + +#[cfg(test)] +mod test { + use crate::{ + error::Error, mock::*, operations::create_basic_spacewalk_stellar_transaction, + resubmissions::pause_process_in_secs, StellarWallet, + }; + use mocktopus::mocking::{MockResult, Mockable}; + use primitives::{ + stellar::{types::AlphaNum4, Asset as StellarAsset, TransactionEnvelope, XdrCodec}, + TransactionEnvelopeExt, + }; + use serial_test::serial; + + #[tokio::test] + #[serial] + async fn check_is_transaction_already_submitted() { + let wallet = wallet_with_storage("resources/check_is_transaction_already_submitted") + .expect("") + .clone(); + let mut wallet = wallet.write().await; + + let asset = StellarAsset::native(); + let amount = 1002; + + // test is_transaction_already_submitted returns true + { + let response = wallet + .send_payment_to_address( + default_destination(), + asset.clone(), + amount, + rand::random(), + DEFAULT_STROOP_FEE_PER_OPERATION, + false, + ) + .await + .expect("should be ok"); + + let tx_envelope = TransactionEnvelope::from_base64_xdr(response.envelope_xdr) + .expect("should return an envelope"); + let tx = tx_envelope.get_transaction().expect("should return a transaction"); + + // check that the transaction truly exists + assert!(wallet.is_transaction_already_submitted(&tx).await); + } + + // test is_transaction_already_submitted returns false + { + let dummy_tx = create_basic_spacewalk_stellar_transaction( + rand::random(), + DEFAULT_STROOP_FEE_PER_OPERATION, + wallet.public_key(), + 1, + ) + .expect("return a transaction"); + + assert!(!wallet.is_transaction_already_submitted(&dummy_tx).await); + } + + wallet.remove_cache_dir(); + } + + #[tokio::test] + #[serial] + async fn check_bump_sequence_number_and_submit() { + let wallet = wallet_with_storage("resources/check_bump_sequence_number_and_submit") + .expect("should return a wallet") + .clone(); + let wallet = wallet.write().await; + let seq = wallet.get_sequence().await.expect("return sequence number"); + + let asset = StellarAsset::native(); + let amount = 1002; + + // test bump_sequence_number_and_submit success + { + let dummy_envelope = wallet + .create_payment_envelope( + default_destination(), + asset.clone(), + amount, + rand::random(), + DEFAULT_STROOP_FEE_PER_OPERATION, + seq - 5, + ) + .expect("should return an envelope"); + + let dummy_transaction = + dummy_envelope.get_transaction().expect("must return a transaction"); + + let _ = wallet + .bump_sequence_number_and_submit(dummy_transaction.clone()) + .await + .expect("return ok"); + + assert!(wallet.is_transaction_already_submitted(&dummy_transaction).await); + } + + // test bump_sequence_number_and_submit failed + { + let dummy_envelope = wallet + .create_payment_envelope( + default_destination(), + asset, + amount, + rand::random(), + DEFAULT_STROOP_FEE_PER_OPERATION, + seq, + ) + .expect("should return an envelope"); + let dummy_tx = dummy_envelope.get_transaction().expect("should return a tx"); + + StellarWallet::sign_envelope + .mock_safe(move |_, _| MockResult::Return(Err(Error::SignEnvelopeError))); + + match wallet.bump_sequence_number_and_submit(dummy_tx).await { + Err(Error::SignEnvelopeError) => assert!(true), + other => panic!("expecting Error::SignEnvelopeError, found: {other:?}"), + }; + } + + wallet.remove_cache_dir(); + } + + #[tokio::test] + #[serial] + async fn check_handle_tx_bad_seq_error_with_envelope() { + let wallet = wallet_with_storage("resources/check_handle_tx_bad_seq_error_with_envelope") + .expect("should return a wallet") + .clone(); + let wallet = wallet.write().await; + + let dummy_envelope = wallet + .create_dummy_envelope_no_signature(1003) + .await + .expect("should return an envelope"); + + // test handle_tx_bad_seq_error_with_envelope is success + { + StellarWallet::is_transaction_already_submitted + .mock_safe(move |_, _| MockResult::Return(Box::pin(async move { false }))); + + assert!(wallet + .handle_tx_bad_seq_error_with_envelope(dummy_envelope.clone()) + .await + .is_ok()); + } + + // test handle_tx_bad_seq_error_with_envelope is fail + { + StellarWallet::is_transaction_already_submitted + .mock_safe(move |_, _| MockResult::Return(Box::pin(async move { true }))); + + match wallet.handle_tx_bad_seq_error_with_envelope(dummy_envelope).await { + Err(Error::ResubmissionError(_)) => assert!(true), + other => panic!("expecting Error::ResubmissionError, found: {other:?}"), + } + } + + wallet.remove_cache_dir(); + } + + #[tokio::test] + #[serial] + async fn check_handle_tx_internal_error() { + let wallet = wallet_with_storage("resources/check_handle_tx_internal_error") + .expect("should return a wallet") + .clone(); + let wallet = wallet.write().await; + let sequence = wallet.get_sequence().await.expect("return a sequence"); + + let envelope = wallet + .create_payment_envelope_no_signature( + default_destination(), + StellarAsset::native(), + 100, + rand::random(), + DEFAULT_STROOP_FEE_PER_OPERATION, + sequence + 1, + ) + .expect("should return an envelope"); + + let envelope_xdr = envelope.to_base64_xdr(); + // Convert vec to string (because the HorizonSubmissionError always returns a string) + let envelope_xdr = + Some(String::from_utf8(envelope_xdr).expect("should create string from vec")); + + // test handle_tx_internal_error is success + if let Err(e) = wallet.handle_tx_internal_error(&envelope_xdr).await { + panic!("expect a success, found error: {e:?}"); + }; + + // test handle_tx_internal_error is fail + { + StellarWallet::submit_transaction.mock_safe(move |_, _| { + MockResult::Return(Box::pin(async move { Err(Error::DecodeError) })) + }); + + match wallet.handle_tx_internal_error(&envelope_xdr).await { + Err(Error::DecodeError) => assert!(true), + other => { + panic!("expect an Error::DecodeError, found: {other:?}"); + }, + } + } + + wallet.remove_cache_dir(); + } + + #[tokio::test] + #[serial] + async fn check_handle_error() { + let wallet = wallet_with_storage("resources/check_handle_error") + .expect("should return a wallet") + .clone(); + let wallet = wallet.write().await; + + // tx_bad_seq test + { + let envelope = wallet + .create_dummy_envelope_no_signature(1010) + .await + .expect("returns an envelope"); + let envelope_xdr = envelope.to_base64_xdr(); + let envelope_xdr = String::from_utf8(envelope_xdr).ok(); + + // result is success + { + let error = Error::HorizonSubmissionError { + title: "title".to_string(), + status: 400, + reason: "tx_bad_seq".to_string(), + result_code_op: vec![], + envelope_xdr, + }; + + StellarWallet::is_transaction_already_submitted + .mock_safe(move |_, _| MockResult::Return(Box::pin(async move { false }))); + + assert!(wallet.handle_error(error).await.is_ok()); + } + + // result is error + { + let error = Error::HorizonSubmissionError { + title: "title".to_string(), + status: 400, + reason: "tx_bad_seq".to_string(), + result_code_op: vec![], + envelope_xdr: None, + }; + + match wallet.handle_error(error).await { + Err(Error::ResubmissionError(_)) => assert!(true), + other => panic!("expecting Error::ResubmissionError, found: {other:?}"), + }; + } + } + + // tx_internal_error test + { + let envelope = wallet + .create_dummy_envelope_no_signature(1020) + .await + .expect("returns an envelope"); + let envelope_xdr = envelope.to_base64_xdr(); + let envelope_xdr = String::from_utf8(envelope_xdr).ok(); + + // result is success + { + let error = Error::HorizonSubmissionError { + title: "title".to_string(), + status: 400, + reason: "tx_internal_error".to_string(), + result_code_op: vec![], + envelope_xdr, + }; + + assert!(wallet.handle_error(error).await.is_ok()); + } + + // result is error + { + let error = Error::HorizonSubmissionError { + title: "title".to_string(), + status: 400, + reason: "tx_internal_error".to_string(), + result_code_op: vec![], + envelope_xdr: None, + }; + + match wallet.handle_error(error).await { + Err(Error::ResubmissionError(_)) => assert!(true), + other => panic!("expecting Error::ResubmissionError, found: {other:?}"), + }; + } + } + + // other error + { + let envelope = wallet + .create_dummy_envelope_no_signature(1010) + .await + .expect("returns an envelope"); + let envelope_xdr = envelope.to_base64_xdr(); + let envelope_xdr = String::from_utf8(envelope_xdr).ok(); + + let error = Error::HorizonSubmissionError { + title: "Transaction Failed".to_string(), + status: 400, + reason: "tx_bad_auth".to_string(), + result_code_op: vec![], + envelope_xdr, + }; + + match wallet.handle_error(error).await { + // `tx_bad_auth` is not recoverable so we expect `Ok(None)` + Ok(None) => assert!(true), + other => panic!("expect an Ok(None), found: {other:?}"), + } + } + + wallet.remove_cache_dir(); + } + + #[tokio::test] + #[serial] + async fn resubmit_transactions_works() { + let wallet = wallet_with_storage("resources/resubmit_transactions_works") + .expect("should return a wallet") + .clone(); + let wallet = wallet.write().await; + + let seq_number = wallet.get_sequence().await.expect("should return a sequence"); + + let mut asset_code: [u8; 4] = [0; 4]; + asset_code.copy_from_slice("EURO".as_bytes()); + + // creating a bad envelope + let non_recoverable_envelope = wallet + .create_payment_envelope_no_signature( + wallet.public_key(), + StellarAsset::AssetTypeCreditAlphanum4(AlphaNum4 { + asset_code, + issuer: default_destination(), + }), + 1001, + rand::random(), + DEFAULT_STROOP_FEE_PER_OPERATION, + seq_number + 2, + ) + .expect("should return an envelope"); + + // let's save this in storage + let _ = wallet + .save_tx_envelope_to_cache(non_recoverable_envelope.clone()) + .expect("should save."); + + // creating a bad (but recoverable) envelope + let recoverable_envelope = wallet + .create_payment_envelope( + default_destination(), + StellarAsset::native(), + 1100, + rand::random(), + DEFAULT_STROOP_FEE_PER_OPERATION, + seq_number + 10, + ) + .expect("should return an envelope"); + + // let's save this in storage + let _ = wallet + .save_tx_envelope_to_cache(recoverable_envelope.clone()) + .expect("should save."); + + // create a good envelope + let good_envelope = wallet + .create_payment_envelope( + default_destination(), + StellarAsset::native(), + 1002, + rand::random(), + DEFAULT_STROOP_FEE_PER_OPERATION, + seq_number + 1, + ) + .expect("should return an envelope"); + + // let's save this in storage + let _ = wallet.save_tx_envelope_to_cache(good_envelope.clone()).expect("should save."); + + StellarWallet::is_transaction_already_submitted + .mock_safe(move |_, _| MockResult::Return(Box::pin(async move { false }))); + + // let's resubmit these 3 transactions + let _ = wallet.start_periodic_resubmission_of_transactions_from_cache(60).await; + + // We wait until the whole cache is empty because eventually all transactions should be + // handled + pause_process_in_secs(10).await; + + loop { + let (txs, _) = wallet.get_tx_envelopes_from_cache().expect("return a tuple"); + + if txs.is_empty() { + assert_eq!( + wallet.get_sequence().await.expect("should return a sequence"), + seq_number + 2 + ); + break + } + } + + wallet.remove_cache_dir(); + } +} diff --git a/clients/wallet/src/stellar_wallet.rs b/clients/wallet/src/stellar_wallet.rs index 056f30a45..938d6d909 100644 --- a/clients/wallet/src/stellar_wallet.rs +++ b/clients/wallet/src/stellar_wallet.rs @@ -4,10 +4,10 @@ use std::{fmt::Formatter, sync::Arc}; use primitives::stellar::{ network::{Network, PUBLIC_NETWORK, TEST_NETWORK}, types::SequenceNumber, - Asset as StellarAsset, Operation, PublicKey, SecretKey, StellarTypeToString, + Asset as StellarAsset, Operation, PublicKey, SecretKey, StellarTypeToString, Transaction, TransactionEnvelope, }; -use tokio::sync::{oneshot, Mutex}; +use tokio::sync::Mutex; use crate::{ cache::WalletStateStorage, @@ -19,7 +19,6 @@ use crate::{ }; use crate::{ - error::CacheErrorKind, horizon::{responses::TransactionsResponseIter, DEFAULT_PAGE_SIZE}, operations::{ create_basic_spacewalk_stellar_transaction, create_payment_operation, AppendExt, @@ -29,6 +28,9 @@ use crate::{ }; use primitives::{StellarPublicKeyRaw, StellarStroops, TransactionEnvelopeExt}; +#[cfg(test)] +use mocktopus::macros::mockable; + #[derive(Clone)] pub struct StellarWallet { secret_key: SecretKey, @@ -37,7 +39,7 @@ pub struct StellarWallet { /// so that the transaction is not rejected due to an outdated sequence number. /// Releasing the lock ensures the sequence number of the account /// has been increased on the network. - transaction_submission_lock: Arc>, + pub(crate) transaction_submission_lock: Arc>, /// Used for caching Stellar transactions before they get submitted. /// Also used for caching the latest cursor to page through Stellar transactions in horizon cache: WalletStateStorage, @@ -49,41 +51,15 @@ pub struct StellarWallet { max_backoff_delay: u16, /// a client to connect to Horizon - client: reqwest::Client, + pub(crate) client: Client, } impl StellarWallet { /// if the user doesn't define the maximum number of retry attempts for 500 internal server /// error, this will be the default. - const DEFAULT_MAX_RETRY_ATTEMPTS_BEFORE_FALLBACK: u8 = 3; - - const DEFAULT_MAX_BACKOFF_DELAY_IN_SECS: u16 = 600; - - /// Returns a TransactionResponse after submitting transaction envelope to Stellar, - /// Else an Error. - async fn submit_transaction( - &self, - envelope: TransactionEnvelope, - ) -> Result { - let sequence = &envelope.sequence_number().ok_or(Error::cache_error_with_env( - CacheErrorKind::UnknownSequenceNumber, - envelope.clone(), - ))?; - - let submission_result = self - .client - .submit_transaction( - envelope.clone(), - self.is_public_network, - self.max_retry_attempts_before_fallback, - self.max_backoff_delay, - ) - .await; + pub(crate) const DEFAULT_MAX_RETRY_ATTEMPTS_BEFORE_FALLBACK: u8 = 3; - let _ = self.cache.remove_tx_envelope(*sequence); - - submission_result - } + pub(crate) const DEFAULT_MAX_BACKOFF_DELAY_IN_SECS: u16 = 600; } impl StellarWallet { @@ -147,16 +123,27 @@ impl StellarWallet { self } +} - pub fn get_public_key_raw(&self) -> StellarPublicKeyRaw { +// getters and other derivations +impl StellarWallet { + pub fn max_backoff_delay(&self) -> u16 { + self.max_backoff_delay + } + + pub fn max_retry_attempts_before_fallback(&self) -> u8 { + self.max_retry_attempts_before_fallback + } + + pub fn public_key_raw(&self) -> StellarPublicKeyRaw { self.secret_key.get_public().clone().into_binary() } - pub fn get_public_key(&self) -> PublicKey { + pub fn public_key(&self) -> PublicKey { self.secret_key.get_public().clone() } - pub fn get_secret_key(&self) -> SecretKey { + pub fn secret_key(&self) -> SecretKey { self.secret_key.clone() } @@ -164,14 +151,6 @@ impl StellarWallet { self.is_public_network } - pub fn get_last_cursor(&self) -> PagingToken { - self.cache.get_last_cursor() - } - - pub fn save_cursor(&self, paging_token: PagingToken) -> Result<(), Error> { - self.cache.save_cursor(paging_token) - } - /// Returns an iter for all transactions. /// This method is looking BACKWARDS, so the transactions are in DESCENDING order: /// starting from the LATEST ones, at the time of the call. @@ -182,7 +161,7 @@ impl StellarWallet { let transactions_response = horizon_client .get_account_transactions( - self.get_public_key(), + self.public_key(), self.is_public_network, 0, DEFAULT_PAGE_SIZE, @@ -198,76 +177,114 @@ impl StellarWallet { /// Returns the balances of this wallet's Stellar account pub async fn get_balances(&self) -> Result, Error> { - let account = - self.client.get_account(self.get_public_key(), self.is_public_network).await?; + let account = self.client.get_account(self.public_key(), self.is_public_network).await?; Ok(account.balances) } + + pub async fn get_sequence(&self) -> Result { + let account = self.client.get_account(self.public_key(), self.is_public_network).await?; + + Ok(account.sequence) + } +} + +// cache operations +impl StellarWallet { + pub fn last_cursor(&self) -> PagingToken { + self.cache.get_last_cursor() + } + + pub fn save_cursor(&self, paging_token: PagingToken) -> Result<(), Error> { + self.cache.save_cursor(paging_token) + } + + #[doc(hidden)] + #[cfg(any(test, feature = "testing-utils"))] + pub fn remove_cache_dir(&self) { + self.cache.remove_dir() + } + + #[doc(hidden)] + #[cfg(any(test, feature = "testing-utils"))] + pub fn remove_tx_envelopes_from_cache(&self) { + self.cache.remove_all_tx_envelopes() + } + + pub fn get_tx_envelopes_from_cache( + &self, + ) -> Result<(Vec, Vec), Vec> { + self.cache.get_tx_envelopes() + } + + pub fn remove_tx_envelope_from_cache(&self, tx_envelope: &TransactionEnvelope) { + if let Some(sequence) = tx_envelope.sequence_number() { + return self.cache.remove_tx_envelope(sequence) + } + + tracing::warn!("remove_tx_envelope_from_cache(): cannot find sequence number in transaction envelope: {tx_envelope:?}"); + } + + pub fn save_tx_envelope_to_cache(&self, tx_envelope: TransactionEnvelope) -> Result<(), Error> { + self.cache.save_tx_envelope(tx_envelope) + } } // send/submit functions of StellarWallet +#[cfg_attr(test, mockable)] impl StellarWallet { - /// Submits transactions found in the wallet's cache to Stellar. - /// Returns a list of oneshot receivers to send back the result of resubmission. - pub async fn resubmit_transactions_from_cache( + /// Returns a TransactionResponse after submitting transaction envelope to Stellar, + /// Else an Error. + pub async fn submit_transaction( &self, - ) -> Vec>> { - let _ = self.transaction_submission_lock.lock().await; + envelope: TransactionEnvelope, + ) -> Result { + let _ = self.save_tx_envelope_to_cache(envelope.clone()); - // Iterates over all errors and creates channels to send errors back to the - // caller of this function. - let mut error_receivers = vec![]; - - let mut collect_errors = |errors: Vec| { - for error in errors { - let (sender, receiver) = oneshot::channel(); - error_receivers.push(receiver); - - if let Err(e) = sender.send(Err(error)) { - tracing::error!( - "Failed to send error to list during transaction resubmission: {e:?}" - ); - } - } - }; + let submission_result = self + .client + .submit_transaction( + envelope.clone(), + self.is_public_network(), + self.max_retry_attempts_before_fallback(), + self.max_backoff_delay(), + ) + .await; - let envs = match self.cache.get_tx_envelopes() { - Ok((envs, errors)) => { - collect_errors(errors); - envs - }, - Err(errors) => { - collect_errors(errors); - return error_receivers - }, - }; + let _ = self.remove_tx_envelope_from_cache(&envelope); - let me = Arc::new(self.clone()); - for env in envs.into_iter() { - let me_clone = Arc::clone(&me); + submission_result + } - let (sender, receiver) = oneshot::channel(); - error_receivers.push(receiver); + pub(crate) fn create_and_sign_envelope( + &self, + tx: Transaction, + ) -> Result { + // convert to envelope + let mut envelope = tx.into_transaction_envelope(); + self.sign_envelope(&mut envelope)?; - tokio::spawn(async move { - if let Err(e) = sender.send(me_clone.submit_transaction(env).await) { - tracing::error!( - "Failed to send message during transaction resubmission: {e:?}" - ); - }; - }); - } + Ok(envelope) + } + + pub(crate) fn sign_envelope(&self, envelope: &mut TransactionEnvelope) -> Result<(), Error> { + let network: &Network = + if self.is_public_network { &PUBLIC_NETWORK } else { &TEST_NETWORK }; + + envelope + .sign(network, vec![&self.secret_key()]) + .map_err(|_e| Error::SignEnvelopeError)?; - error_receivers + Ok(()) } - fn create_envelope( + pub(crate) fn create_envelope( &self, request_id: [u8; 32], stroop_fee_per_operation: u32, next_sequence_number: SequenceNumber, operations: Vec, ) -> Result { - let public_key = self.get_public_key(); + let public_key = self.public_key(); // create the transaction let mut transaction = create_basic_spacewalk_stellar_transaction( @@ -281,15 +298,7 @@ impl StellarWallet { transaction.append_multiple(operations)?; // convert to envelope - let mut envelope = transaction.into_transaction_envelope(); - let network: &Network = - if self.is_public_network { &PUBLIC_NETWORK } else { &TEST_NETWORK }; - - envelope - .sign(network, vec![&self.get_secret_key()]) - .map_err(|_e| Error::SignEnvelopeError)?; - - Ok(envelope) + self.create_and_sign_envelope(transaction) } /// Sends a 'Payment' transaction. @@ -319,7 +328,7 @@ impl StellarWallet { let payment_op = if is_payment_for_redeem_request { self.client .create_payment_op_for_redeem_request( - self.get_public_key(), + self.public_key(), destination_address, self.is_public_network, asset, @@ -327,19 +336,14 @@ impl StellarWallet { ) .await? } else { - create_payment_operation( - destination_address, - asset, - stroop_amount, - self.get_public_key(), - )? + create_payment_operation(destination_address, asset, stroop_amount, self.public_key())? }; self.send_to_address(request_id, stroop_fee_per_operation, vec![payment_op]) .await } - async fn send_to_address( + pub(crate) async fn send_to_address( &mut self, request_id: [u8; 32], stroop_fee_per_operation: u32, @@ -347,8 +351,7 @@ impl StellarWallet { ) -> Result { let _ = self.transaction_submission_lock.lock().await; - let account = - self.client.get_account(self.get_public_key(), self.is_public_network).await?; + let account = self.client.get_account(self.public_key(), self.is_public_network).await?; let next_sequence_number = account.sequence + 1; tracing::trace!( @@ -364,8 +367,6 @@ impl StellarWallet { operations, )?; - let _ = self.cache.save_tx_envelope(envelope.clone())?; - self.submit_transaction(envelope).await } } @@ -391,111 +392,16 @@ mod test { use crate::{ error::Error, horizon::{responses::HorizonClaimableBalanceResponse, HorizonClient}, - operations::{ - create_payment_operation, redeem_request_tests::create_account_merge_operation, - }, - TransactionResponse, + mock::*, + StellarWallet, }; - use primitives::{ - stellar::{ - types::{ - CreateAccountResult, CreateClaimableBalanceResult, OperationResult, - OperationResultTr, SequenceNumber, - }, - Asset as StellarAsset, PublicKey, TransactionEnvelope, XdrCodec, + use primitives::stellar::{ + types::{ + CreateAccountResult, CreateClaimableBalanceResult, OperationResult, OperationResultTr, }, - StellarStroops, TransactionEnvelopeExt, + Asset as StellarAsset, }; use serial_test::serial; - use std::sync::Arc; - use tokio::sync::RwLock; - - use crate::{ - test_helper::{default_usdc_asset, public_key_from_encoding, secret_key_from_encoding}, - StellarWallet, - }; - - const DEFAULT_DEST_PUBLIC_KEY: &str = - "GA5ZSEJYB37JRC5AVCIA5MOP4RHTM335X2KGX3IHOJAPP5RE34K4KZVN"; - const STELLAR_VAULT_SECRET_KEY: &str = - "SCV7RZN5XYYMMVSWYCR4XUMB76FFMKKKNHP63UTZQKVM4STWSCIRLWFJ"; - const IS_PUBLIC_NETWORK: bool = false; - - const DEFAULT_STROOP_FEE_PER_OPERATION: u32 = 100; - - impl StellarWallet { - async fn is_account_exist(&self) -> bool { - self.client - .get_account(self.get_public_key(), self.is_public_network) - .await - .is_ok() - } - - /// merges the wallet's account with the specified destination. - /// Exercise prudence when using this method, as it automatically removes the source account - /// once operation is successful. - async fn merge_account( - &mut self, - destination_address: PublicKey, - ) -> Result { - let account_merge_op = create_account_merge_operation( - destination_address, - self.secret_key.get_public().clone(), - )?; - - self.send_to_address( - [9u8; 32], - DEFAULT_STROOP_FEE_PER_OPERATION, - vec![account_merge_op], - ) - .await - } - - fn create_payment_envelope( - &self, - destination_address: PublicKey, - asset: StellarAsset, - stroop_amount: StellarStroops, - request_id: [u8; 32], - stroop_fee_per_operation: u32, - next_sequence_number: SequenceNumber, - ) -> Result { - let public_key = self.get_public_key(); - // create payment operation - let payment_op = create_payment_operation( - destination_address, - asset, - stroop_amount, - public_key.clone(), - )?; - - self.create_envelope( - request_id, - stroop_fee_per_operation, - next_sequence_number, - vec![payment_op], - ) - } - } - - fn wallet_with_storage(storage: &str) -> Result>, Error> { - wallet_with_secret_key_for_storage(storage, STELLAR_VAULT_SECRET_KEY) - } - - fn wallet_with_secret_key_for_storage( - storage: &str, - secret_key: &str, - ) -> Result>, Error> { - Ok(Arc::new(RwLock::new(StellarWallet::from_secret_encoded_with_cache( - secret_key, - IS_PUBLIC_NETWORK, - storage.to_string(), - )?))) - } - - fn default_destination() -> PublicKey { - public_key_from_encoding(DEFAULT_DEST_PUBLIC_KEY) - } #[test] fn test_add_backoff_delay() { @@ -506,18 +412,18 @@ mod test { ) .expect("should return a wallet"); - assert_eq!(wallet.max_backoff_delay, StellarWallet::DEFAULT_MAX_BACKOFF_DELAY_IN_SECS); + assert_eq!(wallet.max_backoff_delay(), StellarWallet::DEFAULT_MAX_BACKOFF_DELAY_IN_SECS); // too big backoff delay let expected_max_backoff_delay = 800; let new_wallet = wallet.with_max_backoff_delay(expected_max_backoff_delay); - assert_ne!(new_wallet.max_backoff_delay, expected_max_backoff_delay); + assert_ne!(new_wallet.max_backoff_delay(), expected_max_backoff_delay); let expected_max_backoff_delay = 300; let new_wallet = new_wallet.with_max_backoff_delay(expected_max_backoff_delay); - assert_eq!(new_wallet.max_backoff_delay, expected_max_backoff_delay); + assert_eq!(new_wallet.max_backoff_delay(), expected_max_backoff_delay); - new_wallet.cache.remove_dir(); + new_wallet.remove_cache_dir(); } #[test] @@ -530,15 +436,15 @@ mod test { .expect("should return an arc rwlock wallet"); assert_eq!( - wallet.max_retry_attempts_before_fallback, + wallet.max_retry_attempts_before_fallback(), StellarWallet::DEFAULT_MAX_RETRY_ATTEMPTS_BEFORE_FALLBACK ); let expected_max_retries = 5; let new_wallet = wallet.with_max_retry_attempts_before_fallback(expected_max_retries); - assert_eq!(new_wallet.max_retry_attempts_before_fallback, expected_max_retries); + assert_eq!(new_wallet.max_retry_attempts_before_fallback(), expected_max_retries); - new_wallet.cache.remove_dir(); + new_wallet.remove_cache_dir(); } #[tokio::test] @@ -598,7 +504,7 @@ mod test { let _ = tokio::join!(first_job, second_job); - wallet.read().await.cache.remove_dir(); + wallet.read().await.remove_cache_dir(); } #[tokio::test] @@ -610,7 +516,7 @@ mod test { let mut wallet = wallet.write().await; // let's cleanup, just to make sure. - wallet.cache.remove_all_tx_envelopes(); + wallet.remove_tx_envelopes_from_cache(); let amount = 10_000; // in the response, value is 0.0010000. let request_id = [1u8; 32]; @@ -640,11 +546,11 @@ mod test { // check existence of claimable balance. let HorizonClaimableBalanceResponse { claimable_balance } = wallet .client - .get_claimable_balance(id.clone(), wallet.is_public_network) + .get_claimable_balance(id.clone(), wallet.is_public_network()) .await .expect("should return a response"); - assert_eq!(claimable_balance.sponsor, wallet.get_public_key().to_encoding()); + assert_eq!(claimable_balance.sponsor, wallet.public_key().to_encoding()); assert_eq!(&claimable_balance.amount, "0.0010000".as_bytes()); @@ -660,7 +566,7 @@ mod test { }, } - wallet.cache.remove_dir(); + wallet.remove_cache_dir(); } #[tokio::test] @@ -674,7 +580,7 @@ mod test { let mut wallet = wallet.write().await; // let's cleanup, just to make sure. - wallet.cache.remove_all_tx_envelopes(); + wallet.remove_tx_envelopes_from_cache(); // sending enough amount to be able to perform account merge. let amount = 200_000_000; @@ -724,14 +630,14 @@ mod test { // `STELLAR_VAULT_SECRET_KEY`. assert!(!temp_wallet.is_account_exist().await); - temp_wallet.cache.remove_dir(); + temp_wallet.remove_cache_dir(); }, other => { panic!("wrong result: {other:?}"); }, } - wallet.cache.remove_dir(); + wallet.remove_cache_dir(); } #[tokio::test] @@ -759,7 +665,7 @@ mod test { assert!(!transaction_response.hash.to_vec().is_empty()); assert!(transaction_response.ledger() > 0); - wallet.read().await.cache.remove_dir(); + wallet.read().await.remove_cache_dir(); } #[tokio::test] @@ -771,9 +677,9 @@ mod test { let mut wallet = wallet.write().await; // let's cleanup, just to make sure. - wallet.cache.remove_all_tx_envelopes(); + wallet.remove_tx_envelopes_from_cache(); - let destination = wallet.secret_key.get_public().clone(); + let destination = wallet.public_key().clone(); match wallet .send_payment_to_address( @@ -794,7 +700,7 @@ mod test { }, } - wallet.cache.remove_dir(); + wallet.remove_cache_dir(); } #[tokio::test] @@ -807,7 +713,7 @@ mod test { let mut wallet = wallet.write().await; // let's cleanup, just to make sure. - wallet.cache.remove_all_tx_envelopes(); + wallet.remove_tx_envelopes_from_cache(); let asset = StellarAsset::native(); let amount = 1000; @@ -841,8 +747,8 @@ mod test { assert!(err_insufficient_fee.is_err()); match err_insufficient_fee.unwrap_err() { - Error::HorizonSubmissionError { title: _, status: _, reason, envelope_xdr: _ } => { - assert_eq!(reason, "tx_insufficient_fee: []"); + Error::HorizonSubmissionError { reason, .. } => { + assert_eq!(reason, "tx_insufficient_fee"); }, _ => assert!(false), } @@ -860,105 +766,6 @@ mod test { assert!(tx_response.is_ok()); - wallet.cache.remove_dir(); - } - - #[tokio::test] - #[serial] - async fn resubmit_transactions_works() { - let wallet = wallet_with_storage("resources/resubmit_transactions_works") - .expect("should return an arc rwlock wallet") - .clone(); - let mut wallet = wallet.write().await; - - // let's send a successful transaction first - - let asset = StellarAsset::native(); - let amount = 1001; - let request_id = [0u8; 32]; - - let response = wallet - .send_payment_to_address( - default_destination(), - asset.clone(), - amount, - request_id, - DEFAULT_STROOP_FEE_PER_OPERATION, - false, - ) - .await - .expect("should be ok"); - - // get the sequence number of the previous one. - let env = - TransactionEnvelope::from_base64_xdr(response.envelope_xdr).expect("should convert ok"); - let seq_number = env.sequence_number().expect("should return sequence number"); - - // creating a `tx_bad_seq` envelope. - let request_id = [1u8; 32]; - let bad_envelope = wallet - .create_payment_envelope( - default_destination(), - asset.clone(), - amount, - request_id, - DEFAULT_STROOP_FEE_PER_OPERATION, - seq_number, - ) - .expect("should return an envelope"); - - // let's save this in storage - let _ = wallet.cache.save_tx_envelope(bad_envelope.clone()).expect("should save."); - - // create a successful transaction - let request_id = [2u8; 32]; - let good_envelope = wallet - .create_payment_envelope( - default_destination(), - asset, - amount, - request_id, - DEFAULT_STROOP_FEE_PER_OPERATION, - seq_number + 1, - ) - .expect("should return an envelope"); - - // let's save this in storage - let _ = wallet.cache.save_tx_envelope(good_envelope.clone()).expect("should save"); - - // let's resubmit these 2 transactions - let receivers = wallet.resubmit_transactions_from_cache().await; - assert_eq!(receivers.len(), 2); - - // a count on how many txs passed, and how many failed. - let mut passed_count = 0; - let mut failed_count = 0; - - for receiver in receivers { - match &receiver.await { - Ok(Ok(env)) => { - assert_eq!(env.envelope_xdr, good_envelope.to_base64_xdr()); - passed_count += 1; - }, - Ok(Err(Error::HorizonSubmissionError { - title: _, - status: _, - reason, - envelope_xdr: _, - })) => { - assert_eq!(reason, "tx_bad_seq: []"); - failed_count += 1; - }, - other => { - panic!("other result was received: {other:?}") - }, - } - } - - // 1 should pass, and 1 should fail. - assert_eq!(passed_count, 1); - assert_eq!(failed_count, 1); - - wallet.cache.remove_dir(); + wallet.remove_tx_envelopes_from_cache(); } } diff --git a/primitives/src/lib.rs b/primitives/src/lib.rs index 55af0414e..21fcf3f2c 100644 --- a/primitives/src/lib.rs +++ b/primitives/src/lib.rs @@ -30,7 +30,7 @@ use stellar::{ pub use substrate_stellar_sdk as stellar; use substrate_stellar_sdk::{ types::{OperationBody, SequenceNumber}, - ClaimPredicate, Claimant, Memo, MuxedAccount, Operation, TransactionEnvelope, + ClaimPredicate, Claimant, Memo, MuxedAccount, Operation, Transaction, TransactionEnvelope, }; #[cfg(test)] @@ -807,6 +807,8 @@ pub trait TransactionEnvelopeExt { -> u128; fn sequence_number(&self) -> Option; + + fn get_transaction(&self) -> Option; } impl TransactionEnvelopeExt for TransactionEnvelope { @@ -875,4 +877,13 @@ impl TransactionEnvelopeExt for TransactionEnvelope { TransactionEnvelope::EnvelopeTypeTxFeeBump(_) | TransactionEnvelope::Default(_) => None, } } + + fn get_transaction(&self) -> Option { + match self { + TransactionEnvelope::EnvelopeTypeTxV0(transaction) => + Some(transaction.tx.clone().into()), + TransactionEnvelope::EnvelopeTypeTx(transaction) => Some(transaction.tx.clone()), + _ => None, + } + } }