Skip to content

Commit

Permalink
[WIP] 340 add handling for non recoverable stellar submission errors (#…
Browse files Browse the repository at this point in the history
…391)

* Moved cache population from send_to_address to submit_transaction function

* Added draft implementation for error handling

* removed pub visibility modifier from submit_transaction

* Made struct fields public

* Changed submit_transaction function

* Fixed unit test. Added mocktopus to Cargo.toml

* Update Cargo.lock

* Refactor code

* Added draft implementation for error handling

* re-add again Adel's code

* cargo fmt

* incorporate Marcel's changes

* cleanup after rebase

* apply changes

* update resubmission code

* add test cases

* update test cases and some functions

* remove stray println

* increase loop time interval

* increase loop time interval

* remove the "break", as we want it to loop endlessly

* add logs and update the auto checker

* cleanup logs

* https://github.com/pendulum-chain/spacewalk/actions/runs/6732117257/job/18298175427#step:10:39

* removed the idea of auto-check; immediately just delete from cache;
#391 (comment),
#391 (comment),
#391 (comment),
#391 (comment),
#391 (comment),
#391 (comment),
#391 (comment)

* update unfinished comment

* update unfinished commenthttps://github.com/pendulum-chain/spacewalk/actions/runs/6743040605/job/18330206794?pr=391#step:9:152

* https://github.com/pendulum-chain/spacewalk/actions/runs/6743411899/job/18387709725#step:11:12

* https://github.com/pendulum-chain/spacewalk/actions/runs/6768409960/job/18392787715?pr=391#step:12:665

* #391 (comment),
#391 (comment),
#391 (comment),
#391 (comment),
#391 (comment),
#391 (comment)

* https://github.com/pendulum-chain/spacewalk/actions/runs/6822042466/job/18553402505?pr=391#step:10:56

* https://github.com/pendulum-chain/spacewalk/actions/runs/6846493065/job/18613198292?pr=391#step:10:41

* cargo fmt

---------

Co-authored-by: Marcel Ebert <[email protected]>
Co-authored-by: b-yap <[email protected]>
  • Loading branch information
3 people authored Nov 14, 2023
1 parent 479ec0e commit 82f9e43
Show file tree
Hide file tree
Showing 17 changed files with 1,125 additions and 418 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion clients/vault/src/replace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ pub async fn handle_replace_request<
&event.old_vault_id,
event.amount,
0, // do not lock any additional collateral
wallet.get_public_key_raw(),
wallet.public_key_raw(),
)
.await?)
}
Expand Down
14 changes: 7 additions & 7 deletions clients/vault/src/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use runtime::{
};
use service::{wait_or_shutdown, Error as ServiceError, MonitoringConfig, Service};
use stellar_relay_lib::{sdk::PublicKey, StellarOverlayConfig};
use wallet::{LedgerTxEnvMap, StellarWallet};
use wallet::{LedgerTxEnvMap, StellarWallet, RESUBMISSION_INTERVAL_IN_SECS};

use crate::{
cancellation::ReplaceCanceller,
Expand Down Expand Up @@ -704,7 +704,7 @@ impl VaultService {
let stellar_wallet = StellarWallet::from_secret_encoded(&secret_key, is_public_network)?;
tracing::debug!(
"Vault wallet public key: {}",
from_utf8(&stellar_wallet.get_public_key().to_encoding())?
from_utf8(&stellar_wallet.public_key().to_encoding())?
);

let stellar_wallet = Arc::new(RwLock::new(stellar_wallet));
Expand Down Expand Up @@ -759,13 +759,13 @@ impl VaultService {
self.vault_id_manager.fetch_vault_ids().await?;

let wallet = self.stellar_wallet.write().await;
let vault_public_key = wallet.get_public_key();
let vault_public_key = wallet.public_key();
let is_public_network = wallet.is_public_network();

// re-submit transactions in the cache
let _receivers = wallet.resubmit_transactions_from_cache().await;
//todo: handle errors from the receivers

wallet
.start_periodic_resubmission_of_transactions_from_cache(RESUBMISSION_INTERVAL_IN_SECS)
.await;
drop(wallet);

let oracle_agent = self.create_oracle_agent(is_public_network).await?;
Expand Down Expand Up @@ -806,7 +806,7 @@ impl VaultService {
}

if self.spacewalk_parachain.get_public_key().await?.is_none() {
let public_key = self.stellar_wallet.read().await.get_public_key();
let public_key = self.stellar_wallet.read().await.public_key();
let pub_key_encoded = public_key.to_encoding();
tracing::info!(
"Registering public key to the parachain...{}",
Expand Down
2 changes: 1 addition & 1 deletion clients/vault/tests/helper/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ pub async fn register_vault_with_wallet(
items: Vec<(&SpacewalkParachain, &VaultId, u128)>,
) -> u128 {
let wallet_read = wallet.read().await;
let public_key = wallet_read.get_public_key();
let public_key = wallet_read.public_key();

let vault_collateral = register_vault(public_key, items).await;

Expand Down
40 changes: 20 additions & 20 deletions clients/vault/tests/vault_integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ async fn test_redeem_succeeds() {
),
async {
let wallet_read = user_wallet.read().await;
let address = wallet_read.get_public_key_raw();
let address = wallet_read.public_key_raw();
drop(wallet_read);
// We redeem half of what we issued
let redeem_id = user_provider
Expand Down Expand Up @@ -340,7 +340,7 @@ async fn test_cancel_scheduler_succeeds() {
let wallet_read = vault_wallet.read().await;
let issue_request_listener = vault::service::listen_for_issue_requests(
new_vault_provider.clone(),
wallet_read.get_public_key(),
wallet_read.public_key(),
issue_cancellation_event_tx.clone(),
issue_set.clone(),
memos_to_issue_ids.clone(),
Expand Down Expand Up @@ -473,7 +473,7 @@ async fn test_issue_cancel_succeeds() {
let memos_to_issue_ids = Arc::new(RwLock::new(IssueIdLookup::new()));

let issue_filter =
IssueFilter::new(&vault_wallet.read().await.get_public_key()).expect("Invalid filter");
IssueFilter::new(&vault_wallet.read().await.public_key()).expect("Invalid filter");

let issue_amount = upscaled_compatible_amount(100);
let vault_collateral = get_required_vault_collateral_for_issue(
Expand All @@ -489,7 +489,7 @@ async fn test_issue_cancel_succeeds() {
.register_vault_with_public_key(
&vault_id,
vault_collateral,
vault_wallet.read().await.get_public_key_raw(),
vault_wallet.read().await.public_key_raw(),
)
.await
);
Expand Down Expand Up @@ -524,7 +524,7 @@ async fn test_issue_cancel_succeeds() {
let wallet_read = vault_wallet.read().await;
let service = join3(
vault::service::listen_for_new_transactions(
wallet_read.get_public_key(),
wallet_read.public_key(),
wallet_read.is_public_network(),
slot_tx_env_map.clone(),
issue_set.clone(),
Expand All @@ -533,7 +533,7 @@ async fn test_issue_cancel_succeeds() {
),
vault::service::listen_for_issue_requests(
vault_provider.clone(),
wallet_read.get_public_key(),
wallet_read.public_key(),
issue_event_tx,
issue_set.clone(),
memos_to_issue_ids.clone(),
Expand Down Expand Up @@ -662,7 +662,7 @@ async fn test_automatic_issue_execution_succeeds() {
.register_vault_with_public_key(
&vault_id,
vault_collateral,
wallet_read.get_public_key_raw(),
wallet_read.public_key_raw(),
)
.await
);
Expand Down Expand Up @@ -705,16 +705,16 @@ async fn test_automatic_issue_execution_succeeds() {
};

let wallet_read = vault_wallet.read().await;
let issue_filter =
IssueFilter::new(&wallet_read.get_public_key()).expect("Invalid filter");
let issue_filter = IssueFilter::new(&wallet_read.public_key()).expect("Invalid filter");

let slot_tx_env_map = Arc::new(RwLock::new(HashMap::new()));

let issue_set = Arc::new(RwLock::new(IssueRequestsMap::new()));
let memos_to_issue_ids = Arc::new(RwLock::new(IssueIdLookup::new()));
let (issue_event_tx, _issue_event_rx) = mpsc::channel::<CancellationEvent>(16);
let service = join3(
vault::service::listen_for_new_transactions(
wallet_read.get_public_key(),
wallet_read.public_key(),
wallet_read.is_public_network(),
slot_tx_env_map.clone(),
issue_set.clone(),
Expand All @@ -723,7 +723,7 @@ async fn test_automatic_issue_execution_succeeds() {
),
vault::service::listen_for_issue_requests(
vault_provider.clone(),
wallet_read.get_public_key(),
wallet_read.public_key(),
issue_event_tx,
issue_set.clone(),
memos_to_issue_ids.clone(),
Expand Down Expand Up @@ -777,7 +777,7 @@ async fn test_automatic_issue_execution_succeeds_for_other_vault() {
.register_vault_with_public_key(
&vault1_id,
vault_collateral,
wallet_read.get_public_key_raw(),
wallet_read.public_key_raw(),
)
.await
);
Expand All @@ -786,7 +786,7 @@ async fn test_automatic_issue_execution_succeeds_for_other_vault() {
.register_vault_with_public_key(
&vault2_id,
vault_collateral,
wallet_read.get_public_key_raw(),
wallet_read.public_key_raw(),
)
.await
);
Expand Down Expand Up @@ -852,7 +852,7 @@ async fn test_automatic_issue_execution_succeeds_for_other_vault() {
};

let wallet_read = vault_wallet.read().await;
let vault_account_public_key = wallet_read.get_public_key();
let vault_account_public_key = wallet_read.public_key();
drop(wallet_read);
let issue_filter = IssueFilter::new(&vault_account_public_key).expect("Invalid filter");

Expand Down Expand Up @@ -920,7 +920,7 @@ async fn test_execute_open_requests_succeeds() {
.register_vault_with_public_key(
&vault_id,
vault_collateral,
wallet_read.get_public_key_raw(),
wallet_read.public_key_raw(),
)
.await
);
Expand All @@ -936,8 +936,8 @@ async fn test_execute_open_requests_succeeds() {
.await;

let wallet_read = user_wallet.read().await;
let address = wallet_read.get_public_key();
let address_raw = wallet_read.get_public_key_raw();
let address = wallet_read.public_key();
let address_raw = wallet_read.public_key_raw();
drop(wallet_read);
// Place redeem requests. 100_00000 is our minimum redeem amount with the current fee
// settings defined in the chain spec
Expand Down Expand Up @@ -1088,7 +1088,7 @@ async fn test_shutdown() {
.register_vault_with_public_key(
&sudo_vault_id,
vault_collateral,
vault_wallet.read().await.get_public_key_raw(),
vault_wallet.read().await.public_key_raw(),
)
.await
);
Expand Down Expand Up @@ -1136,13 +1136,13 @@ async fn test_requests_with_incompatible_amounts_fail() {
.await;

let wallet_read = vault_wallet.read().await;
let address = wallet_read.get_public_key_raw();
let address = wallet_read.public_key_raw();
assert_ok!(
vault_provider
.register_vault_with_public_key(
&vault_id,
vault_collateral,
wallet_read.get_public_key_raw()
wallet_read.public_key_raw()
)
.await
);
Expand Down
1 change: 1 addition & 0 deletions clients/wallet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,4 @@ primitives = { package = "spacewalk-primitives", path = "../../primitives"}
[dev-dependencies]
mockall = "0.8.1"
serial_test = "0.9.0"
mocktopus = "0.8.0"
19 changes: 9 additions & 10 deletions clients/wallet/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,12 +147,14 @@ impl WalletStateStorage {
}

/// Removes a transaction from the local folder
pub fn remove_tx_envelope(&self, sequence: SequenceNumber) -> Result<(), Error> {
pub fn remove_tx_envelope(&self, sequence: SequenceNumber) {
let full_file_path = format!("{}/{sequence}", self.txs_inner_dir());
remove_file(&full_file_path).map_err(|e| {
tracing::error!("Failed to delete file: {:?}", e);
Error::cache_error_with_seq(CacheErrorKind::DeleteFileFailed, sequence)
})
match remove_file(&full_file_path) {
Ok(_) => tracing::debug!("remove_tx_envelope(): Deleted file with sequence {sequence}"),
Err(e) => tracing::error!(
"remove_tx_envelope(): Failed to delete file with sequence {sequence}: {e:?}"
),
}
}

#[allow(dead_code)]
Expand Down Expand Up @@ -330,7 +332,7 @@ mod test {
extract_tx_envelope_from_path, parse_xdr_string_to_vec_u8, Error, WalletStateStorage,
},
error::CacheErrorKind,
test_helper::public_key_from_encoding,
mock::public_key_from_encoding,
};
use primitives::{
stellar::{
Expand Down Expand Up @@ -477,10 +479,7 @@ mod test {
let actual_tx = new_storage.get_tx_envelope(sequence).expect("a tx should be found");
assert_eq!(actual_tx, expected_tx);

assert!(new_storage.remove_tx_envelope(sequence).is_ok());

// removing a tx again will return an error.
assert_error(new_storage.remove_tx_envelope(sequence), CacheErrorKind::DeleteFileFailed);
new_storage.remove_tx_envelope(sequence);

// let's remove the entire directory
new_storage.remove_dir();
Expand Down
17 changes: 9 additions & 8 deletions clients/wallet/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub enum Error {
title: String,
status: StatusCode,
reason: String,
result_code_op: Vec<String>,
envelope_xdr: Option<String>,
},
#[error("Could not parse string: {0}")]
Expand All @@ -29,6 +30,9 @@ pub enum Error {
#[error(transparent)]
CacheError(CacheError),

#[error("Transaction resubmission failed: {0}")]
ResubmissionError(String),

#[error("Cannot send payment to self")]
SelfPaymentError,
}
Expand All @@ -37,9 +41,7 @@ impl Error {
pub fn is_recoverable(&self) -> bool {
match self {
Error::HorizonResponseError(e) if e.is_timeout() => true,
Error::HorizonSubmissionError { title: _, status, reason: _, envelope_xdr: _ }
if *status == 504 =>
true,
Error::HorizonSubmissionError { status, .. } if *status == 504 => true,
Error::CacheError(e) => match e.kind {
CacheErrorKind::CreateDirectoryFailed |
CacheErrorKind::FileCreationFailed |
Expand All @@ -58,8 +60,7 @@ impl Error {
match self {
Error::HorizonResponseError(e) =>
e.status().map(|code| server_errors.contains(&code.as_u16())).unwrap_or(false),
Error::HorizonSubmissionError { title: _, status, reason: _, envelope_xdr: _ } =>
server_errors.contains(status),
Error::HorizonSubmissionError { status, .. } => server_errors.contains(status),
_ => false,
}
}
Expand Down Expand Up @@ -99,9 +100,9 @@ impl Error {
#[derive(Error, PartialEq, Eq)]
pub struct CacheError {
pub(crate) kind: CacheErrorKind,
path: Option<String>,
envelope: Option<TransactionEnvelope>,
sequence_number: Option<SequenceNumber>,
pub(crate) path: Option<String>,
pub(crate) envelope: Option<TransactionEnvelope>,
pub(crate) sequence_number: Option<SequenceNumber>,
}
impl Display for CacheError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
Expand Down
9 changes: 8 additions & 1 deletion clients/wallet/src/horizon/horizon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,13 @@ impl HorizonClient for reqwest::Client {
continue
},

Err(Error::HorizonSubmissionError { title, status, reason, envelope_xdr }) => {
Err(Error::HorizonSubmissionError {
title,
status,
reason,
result_code_op,
envelope_xdr,
}) => {
tracing::error!("submitting transaction with seq no: {seq_no:?}: failed with {title}, {reason}");
tracing::debug!("submitting transaction with seq no: {seq_no:?}: the envelope: {envelope_xdr:?}");
let envelope_xdr = envelope_xdr.or(Some(transaction_xdr.to_string()));
Expand All @@ -181,6 +187,7 @@ impl HorizonClient for reqwest::Client {
title,
status,
reason,
result_code_op,
envelope_xdr,
})
},
Expand Down
Loading

0 comments on commit 82f9e43

Please sign in to comment.