Skip to content

Commit

Permalink
https://github.com/pendulum-chain/spacewalk/pull/385#discussion_r1301…
Browse files Browse the repository at this point in the history
  • Loading branch information
b-yap committed Aug 23, 2023
1 parent 4a57904 commit 85f95f3
Show file tree
Hide file tree
Showing 12 changed files with 145 additions and 128 deletions.
9 changes: 5 additions & 4 deletions clients/vault/src/oracle/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,14 +184,15 @@ impl OracleAgent {

#[cfg(test)]
mod tests {

use crate::oracle::{
get_test_secret_key, get_test_stellar_relay_config, traits::ArchiveStorage,
ScpArchiveStorage, TransactionsArchiveStorage,
get_test_secret_key, get_test_stellar_relay_config, start_oracle_agent,
traits::ArchiveStorage, Error, ScpArchiveStorage, TransactionsArchiveStorage,
};
use std::time::Duration;

use super::*;
use serial_test::serial;
use stellar_relay_lib::StellarOverlayConfig;
use tokio::time::sleep;

#[tokio::test]
#[ntest::timeout(1_800_000)] // timeout at 30 minutes
Expand Down
143 changes: 59 additions & 84 deletions clients/vault/src/requests/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ use crate::{
requests::{
helper::{
get_all_transactions_of_wallet_async, get_request_for_stellar_tx,
retrieve_open_requests_async,
retrieve_open_redeem_replace_requests_async, PayAndExecuteExt,
},
structs::Request,
PayAndExecute,
},
VaultIdManager, YIELD_RATE,
};
use async_trait::async_trait;
use governor::{
clock::{Clock, ReasonablyRealtime},
middleware::RateLimitingMiddleware,
Expand Down Expand Up @@ -190,25 +192,9 @@ async fn execute_open_request_async(
}
}

/// Spawns cancelable task for each open request.
/// The task performs payment and execution of the open request.
///
/// # Arguments
///
/// * `requests` - open/pending requests that requires Stellar payment before execution
/// * `vault_id_manager` - contains all the vault ids and their data
/// * `shutdown_tx` - for sending and receiving shutdown signals
/// * `parachain_rpc` - the parachain RPC handle
/// * `oracle_agent` - the agent used to get the proofs
/// * `rate_limiter` - rate limiter
fn spawn_tasks_to_pay_and_execute_open_requests<S, C, MW>(
requests: HashMap<TextMemo, Request>,
vault_id_manager: VaultIdManager,
shutdown_tx: ShutdownSender,
parachain_rpc: &SpacewalkParachain,
oracle_agent: Arc<OracleAgent>,
rate_limiter: Arc<RateLimiter<NotKeyed, S, C, MW>>,
) where
#[async_trait]
impl<S, C, MW> PayAndExecuteExt<RateLimiter<NotKeyed, S, C, MW>> for PayAndExecute
where
S: DirectStateStore + Send + Sync + 'static,
C: ReasonablyRealtime + Send + Sync + 'static,
MW: RateLimitingMiddleware<C::Instant, NegativeOutcome = NotUntil<C::Instant>>
Expand All @@ -217,78 +203,67 @@ fn spawn_tasks_to_pay_and_execute_open_requests<S, C, MW>(
+ 'static,
<MW as RateLimitingMiddleware<<C as Clock>::Instant>>::PositiveOutcome: Send,
{
for (_, request) in requests {
// there are potentially a large number of open requests - pay and execute each
// in a separate task to ensure that awaiting confirmations does not significantly
// delay other requests
// make copies of the variables we move into the task
spawn_cancelable(
shutdown_tx.subscribe(),
pay_and_execute_open_request_async(
request,
vault_id_manager.clone(),
parachain_rpc.clone(),
oracle_agent.clone(),
rate_limiter.clone(),
),
);
fn spawn_tasks_to_pay_and_execute_open_requests(
requests: HashMap<TextMemo, Request>,
vault_id_manager: VaultIdManager,
shutdown_tx: ShutdownSender,
parachain_rpc: &SpacewalkParachain,
oracle_agent: Arc<OracleAgent>,
rate_limiter: Arc<RateLimiter<NotKeyed, S, C, MW>>,
) {
for (_, request) in requests {
// there are potentially a large number of open requests - pay and execute each
// in a separate task to ensure that awaiting confirmations does not significantly
// delay other requests
// make copies of the variables we move into the task
spawn_cancelable(
shutdown_tx.subscribe(),
Self::pay_and_execute_open_request_async(
request,
vault_id_manager.clone(),
parachain_rpc.clone(),
oracle_agent.clone(),
rate_limiter.clone(),
),
);
}
}
}

/// Performs payment and execution of the open request.
/// The stellar address of the open request receives the payment; and
/// the vault id of the open request sends the payment.
/// However, the vault id MUST exist in the vault_id_manager.
///
/// # Arguments
///
/// * `request` - the open request
/// * `vault_id_manager` - contains all the vault ids and their data.
/// * `parachain_rpc` - the parachain RPC handle
/// * `oracle_agent` - the agent used to get the proofs
/// * `rate_limiter` - rate limiter
async fn pay_and_execute_open_request_async<S, C, MW>(
request: Request,
vault_id_manager: VaultIdManager,
parachain_rpc: SpacewalkParachain,
oracle_agent: Arc<OracleAgent>,
rate_limiter: Arc<RateLimiter<NotKeyed, S, C, MW>>,
) where
S: DirectStateStore + Send + Sync + 'static,
C: ReasonablyRealtime + Send + Sync + 'static,
MW: RateLimitingMiddleware<C::Instant, NegativeOutcome = NotUntil<C::Instant>>
+ Send
+ Sync
+ 'static,
<MW as RateLimitingMiddleware<<C as Clock>::Instant>>::PositiveOutcome: Send,
{
let Some(vault) = vault_id_manager.get_vault(request.vault_id()).await else {
tracing::error!(
async fn pay_and_execute_open_request_async(
request: Request,
vault_id_manager: VaultIdManager,
parachain_rpc: SpacewalkParachain,
oracle_agent: Arc<OracleAgent>,
rate_limiter: Arc<RateLimiter<NotKeyed, S, C, MW>>,
) {
let Some(vault) = vault_id_manager.get_vault(request.vault_id()).await else {
tracing::error!(
"Couldn't process open {:?} request #{:?}: Failed to fetch vault data for vault {}",
request.request_type(),
request.hash(),
request.vault_id().pretty_print()
);

return; // nothing we can do - bail
};
return; // nothing we can do - bail
};

// We rate limit the number of transactions we pay and execute simultaneously because
// sending too many at once might cause the Stellar network to respond with a timeout
// error.
rate_limiter.until_ready().await;
// We rate limit the number of transactions we pay and execute simultaneously because
// sending too many at once might cause the Stellar network to respond with a timeout
// error.
rate_limiter.until_ready().await;

match request.pay_and_execute(parachain_rpc, vault, oracle_agent).await {
Ok(_) => tracing::info!(
"Successfully executed open {:?} request #{:?}",
request.request_type(),
request.hash()
),
Err(e) => tracing::info!(
"Failed to process open {:?} request #{:?} due to error: {e}",
request.request_type(),
request.hash(),
),
match request.pay_and_execute(parachain_rpc, vault, oracle_agent).await {
Ok(_) => tracing::info!(
"Successfully executed open {:?} request #{:?}",
request.request_type(),
request.hash()
),
Err(e) => tracing::info!(
"Failed to process open {:?} request #{:?} due to error: {e}",
request.request_type(),
request.hash(),
),
}
}
}

Expand Down Expand Up @@ -316,7 +291,7 @@ pub async fn execute_open_requests(
let parachain_rpc_ref = &parachain_rpc;

// get all redeem and replace requests
let mut open_requests = retrieve_open_requests_async(
let mut open_requests = retrieve_open_redeem_replace_requests_async(
parachain_rpc_ref,
parachain_rpc.get_account_id().clone(),
payment_margin,
Expand All @@ -339,7 +314,7 @@ pub async fn execute_open_requests(

// Remaining requests in the hashmap did not have a Stellar payment yet,
// so pay and execute all of these
spawn_tasks_to_pay_and_execute_open_requests(
PayAndExecute::spawn_tasks_to_pay_and_execute_open_requests(
open_requests,
vault_id_manager,
shutdown_tx,
Expand Down
53 changes: 48 additions & 5 deletions clients/vault/src/requests/helper.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,62 @@
use async_trait::async_trait;
use futures::try_join;
use std::{collections::HashMap, sync::Arc, time::Duration};
use tokio::sync::RwLock;

use crate::{requests::structs::Request, Error};
use crate::{requests::structs::Request, Error, VaultIdManager};

use crate::oracle::OracleAgent;
use primitives::{derive_shortened_request_id, TextMemo, TransactionEnvelopeExt};
use runtime::{
AccountId, RedeemPallet, RedeemRequestStatus, ReplacePallet, ReplaceRequestStatus,
SpacewalkParachain,
ShutdownSender, SpacewalkParachain,
};
use service::Error as ServiceError;
use wallet::{StellarWallet, TransactionResponse, TransactionsResponseIter};

#[async_trait]
pub(crate) trait PayAndExecuteExt<R> {
/// Spawns cancelable task for each open request.
/// The task performs payment and execution of the open request.
///
/// # Arguments
///
/// * `requests` - open/pending requests that requires Stellar payment before execution
/// * `vault_id_manager` - contains all the vault ids and their data
/// * `shutdown_tx` - for sending and receiving shutdown signals
/// * `parachain_rpc` - the parachain RPC handle
/// * `oracle_agent` - the agent used to get the proofs
/// * `rate_limiter` - rate limiter
fn spawn_tasks_to_pay_and_execute_open_requests(
requests: HashMap<TextMemo, Request>,
vault_id_manager: VaultIdManager,
shutdown_tx: ShutdownSender,
parachain_rpc: &SpacewalkParachain,
oracle_agent: Arc<OracleAgent>,
rate_limiter: Arc<R>,
);

/// Performs payment and execution of the open request.
/// The stellar address of the open request receives the payment; and
/// the vault id of the open request sends the payment.
/// However, the vault id MUST exist in the vault_id_manager.
///
/// # Arguments
///
/// * `request` - the open request
/// * `vault_id_manager` - contains all the vault ids and their data.
/// * `parachain_rpc` - the parachain RPC handle
/// * `oracle_agent` - the agent used to get the proofs
/// * `rate_limiter` - rate limiter
async fn pay_and_execute_open_request_async(
request: Request,
vault_id_manager: VaultIdManager,
parachain_rpc: SpacewalkParachain,
oracle_agent: Arc<OracleAgent>,
rate_limiter: Arc<R>,
);
}

/// Returns an iter of all known transactions of the wallet
pub(crate) async fn get_all_transactions_of_wallet_async(
wallet: Arc<RwLock<StellarWallet>>,
Expand All @@ -22,8 +67,6 @@ pub(crate) async fn get_all_transactions_of_wallet_async(
let transactions_result = wallet.get_all_transactions_iter().await;
drop(wallet);

// Check if some of the requests that are open already have a corresponding payment on Stellar
// and are just waiting to be executed on the parachain
match transactions_result {
Err(e) => {
tracing::error!(
Expand Down Expand Up @@ -63,7 +106,7 @@ pub(crate) fn get_request_for_stellar_tx(
/// * `vault_id` - account ID of the vault
/// * `payment_margin` - minimum time to the the redeem execution deadline to make the stellar
/// payment.
pub(crate) async fn retrieve_open_requests_async(
pub(crate) async fn retrieve_open_redeem_replace_requests_async(
parachain_rpc: &SpacewalkParachain,
vault_id: AccountId,
payment_margin: Duration,
Expand Down
2 changes: 2 additions & 0 deletions clients/vault/src/requests/structs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,3 +302,5 @@ impl Request {
Ok(response)
}
}

pub struct PayAndExecute;
10 changes: 7 additions & 3 deletions clients/vault/tests/helper/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use runtime::{
assert_event, get_required_vault_collateral_for_issue, setup_provider, SubxtClient,
},
stellar::SecretKey,
ExecuteRedeemEvent, IssuePallet, SpacewalkParachain, VaultId,
ExecuteRedeemEvent, IssuePallet, SpacewalkParachain, VaultId, VaultRegistryPallet,
};
use sp_keyring::AccountKeyring;
use sp_runtime::traits::StaticLookup;
Expand All @@ -30,13 +30,17 @@ pub fn upscaled_compatible_amount(amount: StellarStroops) -> u128 {
}

#[async_trait]
pub trait SpacewalkParachainExt {
pub trait SpacewalkParachainExt: VaultRegistryPallet {
async fn register_vault_with_public_key(
&self,
vault_id: &VaultId,
collateral: u128,
public_key: crate::StellarPublicKey,
) -> Result<(), runtime::Error>;
) -> Result<(), runtime::Error> {
self.register_public_key(public_key).await.unwrap();
self.register_vault(vault_id, collateral).await.unwrap();
Ok(())
}
}

pub async fn create_vault(
Expand Down
15 changes: 2 additions & 13 deletions clients/vault/tests/helper/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use runtime::{
default_provider_client, set_exchange_rate_and_wait, setup_provider, SubxtClient,
},
types::FixedU128,
SpacewalkParachain, VaultId, VaultRegistryPallet,
SpacewalkParachain, VaultId,
};
use sp_arithmetic::FixedPointNumber;
use sp_keyring::AccountKeyring;
Expand All @@ -33,18 +33,7 @@ lazy_static! {
}

#[async_trait]
impl SpacewalkParachainExt for SpacewalkParachain {
async fn register_vault_with_public_key(
&self,
vault_id: &VaultId,
collateral: u128,
public_key: StellarPublicKey,
) -> Result<(), runtime::Error> {
self.register_public_key(public_key).await.unwrap();
self.register_vault(vault_id, collateral).await.unwrap();
Ok(())
}
}
impl SpacewalkParachainExt for SpacewalkParachain {}

pub async fn test_with<F, R>(execute: impl FnOnce(SubxtClient, ArcRwLock<StellarWallet>) -> F) -> R
where
Expand Down
4 changes: 2 additions & 2 deletions clients/wallet/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::types::StatusCode;
use primitives::stellar::{types::SequenceNumber, TransactionEnvelope};
use reqwest::Error as FetchError;
use std::fmt::{Debug, Display, Formatter};

use thiserror::Error;

#[derive(Debug, Error)]
Expand All @@ -15,7 +15,7 @@ pub enum Error {
#[error("Transaction submission failed. Title: {title}, Status: {status}, Reason: {reason}, Envelope XDR: {envelope_xdr:?}")]
HorizonSubmissionError {
title: String,
status: u16,
status: StatusCode,
reason: String,
envelope_xdr: Option<String>,
},
Expand Down
Loading

0 comments on commit 85f95f3

Please sign in to comment.