diff --git a/clients/vault/src/system.rs b/clients/vault/src/system.rs index b6546a810..8bcc8e3aa 100644 --- a/clients/vault/src/system.rs +++ b/clients/vault/src/system.rs @@ -1,3 +1,4 @@ +#![allow(clippy::too_many_arguments)] use std::{ collections::HashMap, convert::TryInto, fs, future::Future, pin::Pin, str::from_utf8, sync::Arc, time::Duration, @@ -6,7 +7,10 @@ use std::{ use async_trait::async_trait; use clap::Parser; use futures::{ - channel::{mpsc, mpsc::Sender}, + channel::{ + mpsc, + mpsc::{Receiver, Sender}, + }, future::{join, join_all}, SinkExt, TryFutureExt, }; @@ -14,24 +18,25 @@ use git_version::git_version; use tokio::{sync::RwLock, time::sleep}; use runtime::{ - cli::parse_duration_minutes, CollateralBalancesPallet, CurrencyId, Error as RuntimeError, - IssueIdLookup, IssueRequestsMap, PrettyPrint, RegisterVaultEvent, ShutdownSender, - SpacewalkParachain, StellarRelayPallet, TryFromSymbol, UpdateActiveBlockEvent, UtilFuncs, - VaultCurrencyPair, VaultId, VaultRegistryPallet, + cli::parse_duration_minutes, AccountId, BlockNumber, CollateralBalancesPallet, CurrencyId, + Error as RuntimeError, IssueIdLookup, IssueRequestsMap, PrettyPrint, RegisterVaultEvent, + ShutdownSender, SpacewalkParachain, StellarRelayPallet, TryFromSymbol, UpdateActiveBlockEvent, + UtilFuncs, VaultCurrencyPair, VaultId, VaultRegistryPallet, }; use service::{wait_or_shutdown, Error as ServiceError, MonitoringConfig, Service}; -use stellar_relay_lib::StellarOverlayConfig; +use stellar_relay_lib::{sdk::PublicKey, StellarOverlayConfig}; use wallet::{LedgerTxEnvMap, StellarWallet}; use crate::{ cancellation::ReplaceCanceller, error::Error, - execution::execute_open_requests, issue, issue::IssueFilter, metrics::{monitor_bridge_metrics, poll_metrics, publish_tokio_metrics, PerCurrencyMetrics}, + oracle::OracleAgent, redeem::listen_for_redeem_requests, replace::{listen_for_accept_replace, listen_for_execute_replace, listen_for_replace_requests}, + requests::execution::execute_open_requests, service::{CancellationScheduler, IssueCanceller}, ArcRwLock, Event, CHAIN_HEIGHT_POLLING_INTERVAL, }; @@ -113,17 +118,14 @@ impl VaultIdManager { .get_vaults_by_account_id(self.spacewalk_parachain.get_account_id()) .await? { - match is_vault_registered(&self.spacewalk_parachain, &vault_id).await { - Err(Error::RuntimeError(RuntimeError::VaultLiquidated)) => { - tracing::error!( - "[{}] Vault is liquidated -- not going to process events for this vault.", - vault_id.pretty_print() - ); - }, - Ok(_) => { - self.add_vault_id(vault_id.clone()).await?; - }, - Err(x) => return Err(x), + // check if vault is registered + match self.spacewalk_parachain.get_vault(&vault_id).await { + Ok(_) => self.add_vault_id(vault_id.clone()).await?, + Err(RuntimeError::VaultLiquidated) => tracing::error!( + "[{}] Vault is liquidated -- not going to process events for this vault.", + vault_id.pretty_print() + ), + Err(e) => return Err(e.into()), } } Ok(()) @@ -342,73 +344,21 @@ fn run(task: F) -> ServiceTask ServiceTask::Essential(Box::pin(task.map_err(|x| x.into()))) } +type RegistrationData = Vec<(CurrencyId, CurrencyId, Option)>; +// dedicated for running the service impl VaultService { - async fn new( - spacewalk_parachain: SpacewalkParachain, - config: VaultServiceConfig, - monitoring_config: MonitoringConfig, - shutdown: ShutdownSender, - ) -> Result { - let is_public_network = - spacewalk_parachain.is_public_network().await.unwrap_or_else(|error| { - // Sometimes the fetch fails with 'StorageItemNotFound' error. - // We assume public network by default - tracing::warn!( - "Failed to fetch public network status from parachain: {error}. Assuming public network." - ); - true - }); - - let secret_key = fs::read_to_string(&config.stellar_vault_secret_key_filepath)? - .trim() - .to_string(); - let stellar_wallet = StellarWallet::from_secret_encoded(&secret_key, is_public_network)?; - tracing::debug!( - "Vault wallet public key: {}", - from_utf8(&stellar_wallet.get_public_key().to_encoding())? - ); - - let stellar_wallet = Arc::new(RwLock::new(stellar_wallet)); - - Ok(Self { - spacewalk_parachain: spacewalk_parachain.clone(), - stellar_wallet: stellar_wallet.clone(), - config, - monitoring_config, - shutdown, - vault_id_manager: VaultIdManager::new(spacewalk_parachain, stellar_wallet), - secret_key, - }) - } - - fn get_vault_id( - &self, - collateral_currency: CurrencyId, - wrapped_currency: CurrencyId, - ) -> VaultId { - let account_id = self.spacewalk_parachain.get_account_id(); - - VaultId { - account_id: account_id.clone(), - currencies: VaultCurrencyPair { - collateral: collateral_currency, - wrapped: wrapped_currency, - }, - } - } - - async fn run_service(&mut self) -> Result<(), ServiceError> { - tracing::info!("Starting client service..."); - - let startup_height = self.await_parachain_block().await?; - let account_id = self.spacewalk_parachain.get_account_id().clone(); - + fn auto_register(&self) -> Result> { + let mut amount_is_none: bool = false; let parsed_auto_register = self .config .auto_register .clone() .into_iter() .map(|(collateral, wrapped, amount)| { + if amount.is_none() { + amount_is_none = true; + } + Ok(( CurrencyId::try_from_symbol(collateral)?, CurrencyId::try_from_symbol(wrapped)?, @@ -420,12 +370,14 @@ impl VaultService { .map_err(ServiceError::Abort)?; // exit if auto-register uses faucet and faucet url not set - if parsed_auto_register.iter().any(|(_, _, o)| o.is_none()) && - self.config.faucet_url.is_none() - { + if amount_is_none && self.config.faucet_url.is_none() { return Err(ServiceError::Abort(Error::FaucetUrlNotSet)) } + Ok(parsed_auto_register) + } + + fn maintain_connection(&self) -> Result<(), ServiceError> { // Subscribe to an event (any event will do) so that a period of inactivity does not close // the jsonrpsee connection let err_provider = self.spacewalk_parachain.clone(); @@ -437,35 +389,13 @@ impl VaultService { }); tokio::task::spawn(err_listener); - self.maybe_register_public_key().await?; - join_all(parsed_auto_register.iter().map( - |(collateral_currency, wrapped_currency, amount)| { - self.maybe_register_vault(collateral_currency, wrapped_currency, amount) - }, - )) - .await - .into_iter() - .collect::>()?; - - // purposefully _after_ maybe_register_vault and _before_ other calls - self.vault_id_manager.fetch_vault_ids().await?; - - let wallet = self.stellar_wallet.write().await; - let vault_public_key = wallet.get_public_key(); - let is_public_network = wallet.is_public_network(); - - // re-submit transactions in the cache - let _receivers = wallet.resubmit_transactions_from_cache().await; - - for result in _receivers { - match &result.await { - Ok(Err(error)) => wallet.handle_error(*error), - _ => (), - } - } - - drop(wallet); + Ok(()) + } + async fn create_oracle_agent( + &self, + is_public_network: bool, + ) -> Result, ServiceError> { let cfg_path = &self.config.stellar_overlay_config_filepath; let stellar_overlay_cfg = StellarOverlayConfig::try_from_path(cfg_path).map_err(Error::StellarRelayError)?; @@ -478,14 +408,17 @@ impl VaultService { let oracle_agent = crate::oracle::start_oracle_agent(stellar_overlay_cfg, &self.secret_key) .await .expect("Failed to start oracle agent"); - let oracle_agent = Arc::new(oracle_agent); + Ok(Arc::new(oracle_agent)) + } + + fn execute_open_requests(&self, oracle_agent: Arc) { let open_request_executor = execute_open_requests( self.shutdown.clone(), self.spacewalk_parachain.clone(), self.vault_id_manager.clone(), self.stellar_wallet.clone(), - oracle_agent.clone(), + oracle_agent, self.config.payment_margin_minutes, ); service::spawn_cancelable(self.shutdown.subscribe(), async move { @@ -494,56 +427,27 @@ impl VaultService { tracing::error!("Failed to process open requests: {}", e) }; }); + } - // issue handling - // this vec is passed to the stellar wallet to filter out transactions that are not relevant - // this has to be modified every time the issue set changes - let issue_map: ArcRwLock = Arc::new(RwLock::new(IssueRequestsMap::new())); - // this map resolves issue memo to issue ids - let memos_to_issue_ids: ArcRwLock = - Arc::new(RwLock::new(IssueIdLookup::new())); - - issue::initialize_issue_set(&self.spacewalk_parachain, &issue_map, &memos_to_issue_ids) - .await?; - - let issue_filter = IssueFilter::new(&vault_public_key)?; - - let ledger_env_map: ArcRwLock = Arc::new(RwLock::new(HashMap::new())); - - let (issue_event_tx, issue_event_rx) = mpsc::channel::(32); - let (replace_event_tx, replace_event_rx) = mpsc::channel::(16); - - tracing::info!("Starting all services..."); - let tasks = vec![ - ( - "VaultId Registration Listener", - run(self.vault_id_manager.clone().listen_for_vault_id_registrations()), - ), - ( - "Restart Timer", - run(async move { - tokio::time::sleep(RESTART_INTERVAL).await; - tracing::info!("Initiating periodic restart..."); - Err(ServiceError::ClientShutdown) - }), - ), - ( - "Stellar Transaction Listener", - run(wallet::listen_for_new_transactions( - vault_public_key.clone(), - is_public_network, - ledger_env_map.clone(), - issue_map.clone(), - memos_to_issue_ids.clone(), - issue_filter, - )), - ), + fn create_issue_tasks( + &self, + issue_event_tx: Sender, + issue_event_rx: Receiver, + startup_height: BlockNumber, + account_id: AccountId, + vault_public_key: PublicKey, + oracle_agent: Arc, + issue_map: ArcRwLock, + ledger_env_map: ArcRwLock, + memos_to_issue_ids: ArcRwLock, + ) -> Vec<(&str, ServiceTask)> { + vec![ ( "Issue Request Listener", run(issue::listen_for_issue_requests( self.spacewalk_parachain.clone(), vault_public_key, - issue_event_tx.clone(), + issue_event_tx, issue_map.clone(), memos_to_issue_ids.clone(), )), @@ -570,10 +474,10 @@ impl VaultService { !self.config.no_issue_execution, issue::process_issues_requests( self.spacewalk_parachain.clone(), - oracle_agent.clone(), - ledger_env_map.clone(), - issue_map.clone(), - memos_to_issue_ids.clone(), + oracle_agent, + ledger_env_map, + issue_map, + memos_to_issue_ids, ), ), ), @@ -582,10 +486,22 @@ impl VaultService { run(CancellationScheduler::new( self.spacewalk_parachain.clone(), startup_height, - account_id.clone(), + account_id, ) .handle_cancellation::(issue_event_rx)), ), + ] + } + + fn create_replace_tasks( + &self, + replace_event_tx: Sender, + replace_event_rx: Receiver, + startup_height: BlockNumber, + account_id: AccountId, + oracle_agent: Arc, + ) -> Vec<(&str, ServiceTask)> { + vec![ ( "Request Replace Listener", run(listen_for_replace_requests( @@ -602,43 +518,27 @@ impl VaultService { self.spacewalk_parachain.clone(), self.vault_id_manager.clone(), self.config.payment_margin_minutes, - oracle_agent.clone(), + oracle_agent, )), ), ( "Execute Replace Listener", - run(listen_for_execute_replace( - self.spacewalk_parachain.clone(), - replace_event_tx.clone(), - )), + run(listen_for_execute_replace(self.spacewalk_parachain.clone(), replace_event_tx)), ), ( "Replace Cancellation Scheduler", run(CancellationScheduler::new( self.spacewalk_parachain.clone(), startup_height, - account_id.clone(), + account_id, ) .handle_cancellation::(replace_event_rx)), ), - ( - "Parachain Block Listener", - run(active_block_listener( - self.spacewalk_parachain.clone(), - issue_event_tx.clone(), - replace_event_tx.clone(), - )), - ), - ( - "Redeem Request Listener", - run(listen_for_redeem_requests( - self.shutdown.clone(), - self.spacewalk_parachain.clone(), - self.vault_id_manager.clone(), - self.config.payment_margin_minutes, - oracle_agent.clone(), - )), - ), + ] + } + + fn create_bridge_metrics_tasks(&self) -> Vec<(&str, ServiceTask)> { + vec![ ( "Bridge Metrics Listener", maybe_run( @@ -656,12 +556,258 @@ impl VaultService { poll_metrics(self.spacewalk_parachain.clone(), self.vault_id_manager.clone()), ), ), - ]; + ] + } + + fn create_initial_tasks( + &self, + is_public_network: bool, + issue_event_tx: Sender, + replace_event_tx: Sender, + vault_public_key: PublicKey, + issue_map: ArcRwLock, + ledger_env_map: ArcRwLock, + memos_to_issue_ids: ArcRwLock, + ) -> Result, ServiceError> { + let issue_filter = IssueFilter::new(&vault_public_key)?; + + Ok(vec![ + ( + "VaultId Registration Listener", + run(self.vault_id_manager.clone().listen_for_vault_id_registrations()), + ), + ( + "Restart Timer", + run(async move { + tokio::time::sleep(RESTART_INTERVAL).await; + tracing::info!("Initiating periodic restart..."); + Err(ServiceError::ClientShutdown) + }), + ), + ( + "Stellar Transaction Listener", + run(wallet::listen_for_new_transactions( + vault_public_key, + is_public_network, + ledger_env_map, + issue_map, + memos_to_issue_ids, + issue_filter, + )), + ), + ( + "Parachain Block Listener", + run(active_block_listener( + self.spacewalk_parachain.clone(), + issue_event_tx, + replace_event_tx, + )), + ), + ]) + } + + fn create_tasks( + &self, + startup_height: BlockNumber, + account_id: AccountId, + is_public_network: bool, + vault_public_key: PublicKey, + oracle_agent: Arc, + issue_map: ArcRwLock, + ledger_env_map: ArcRwLock, + memos_to_issue_ids: ArcRwLock, + ) -> Result, ServiceError> { + let (issue_event_tx, issue_event_rx) = mpsc::channel::(32); + let (replace_event_tx, replace_event_rx) = mpsc::channel::(16); + + let mut tasks = self.create_initial_tasks( + is_public_network, + issue_event_tx.clone(), + replace_event_tx.clone(), + vault_public_key.clone(), + issue_map.clone(), + ledger_env_map.clone(), + memos_to_issue_ids.clone(), + )?; + + let mut issue_tasks = self.create_issue_tasks( + issue_event_tx.clone(), + issue_event_rx, + startup_height, + account_id.clone(), + vault_public_key, + oracle_agent.clone(), + issue_map, + ledger_env_map, + memos_to_issue_ids, + ); + + tasks.append(&mut issue_tasks); + + let mut replace_tasks = self.create_replace_tasks( + replace_event_tx.clone(), + replace_event_rx, + startup_height, + account_id, + oracle_agent.clone(), + ); + + tasks.append(&mut replace_tasks); + + tasks.push(( + "Parachain Block Listener", + run(active_block_listener( + self.spacewalk_parachain.clone(), + issue_event_tx, + replace_event_tx, + )), + )); + + tasks.push(( + "Redeem Request Listener", + run(listen_for_redeem_requests( + self.shutdown.clone(), + self.spacewalk_parachain.clone(), + self.vault_id_manager.clone(), + self.config.payment_margin_minutes, + oracle_agent, + )), + )); + + let mut bridge_metrics_tasks = self.create_bridge_metrics_tasks(); + + tasks.append(&mut bridge_metrics_tasks); + + Ok(tasks) + } +} + +impl VaultService { + async fn new( + spacewalk_parachain: SpacewalkParachain, + config: VaultServiceConfig, + monitoring_config: MonitoringConfig, + shutdown: ShutdownSender, + ) -> Result { + let is_public_network = + spacewalk_parachain.is_public_network().await.unwrap_or_else(|error| { + // Sometimes the fetch fails with 'StorageItemNotFound' error. + // We assume public network by default + tracing::warn!( + "Failed to fetch public network status from parachain: {error}. Assuming public network." + ); + true + }); + + let secret_key = fs::read_to_string(&config.stellar_vault_secret_key_filepath)? + .trim() + .to_string(); + let stellar_wallet = StellarWallet::from_secret_encoded(&secret_key, is_public_network)?; + tracing::debug!( + "Vault wallet public key: {}", + from_utf8(&stellar_wallet.get_public_key().to_encoding())? + ); + + let stellar_wallet = Arc::new(RwLock::new(stellar_wallet)); + + Ok(Self { + spacewalk_parachain: spacewalk_parachain.clone(), + stellar_wallet: stellar_wallet.clone(), + config, + monitoring_config, + shutdown, + vault_id_manager: VaultIdManager::new(spacewalk_parachain, stellar_wallet), + secret_key, + }) + } + + fn get_vault_id( + &self, + collateral_currency: CurrencyId, + wrapped_currency: CurrencyId, + ) -> VaultId { + VaultId { + account_id: self.spacewalk_parachain.get_account_id().clone(), + currencies: VaultCurrencyPair { + collateral: collateral_currency, + wrapped: wrapped_currency, + }, + } + } + + async fn run_service(&mut self) -> Result<(), ServiceError> { + let startup_height = self.await_parachain_block().await?; + let account_id = self.spacewalk_parachain.get_account_id().clone(); + + tracing::info!("Starting client service..."); + + let parsed_auto_register = self.auto_register()?; + + self.maintain_connection()?; + + self.register_public_key_if_not_present().await?; + + join_all(parsed_auto_register.iter().map( + |(collateral_currency, wrapped_currency, amount)| { + self.register_vault_if_not_present(collateral_currency, wrapped_currency, amount) + }, + )) + .await + .into_iter() + .collect::>()?; + + // purposefully _after_ register_vault_if_not_present and _before_ other calls + self.vault_id_manager.fetch_vault_ids().await?; + + let wallet = self.stellar_wallet.write().await; + let vault_public_key = wallet.get_public_key(); + let is_public_network = wallet.is_public_network(); + + // re-submit transactions in the cache + let _receivers = wallet.resubmit_transactions_from_cache().await; + + for result in _receivers { + if let Ok(Err(error)) = result.await { + let error = error; + let _ = wallet.handle_error(error).await; + } + } + + drop(wallet); + + let oracle_agent = self.create_oracle_agent(is_public_network).await?; + + self.execute_open_requests(oracle_agent.clone()); + + // issue handling + // this vec is passed to the stellar wallet to filter out transactions that are not relevant + // this has to be modified every time the issue set changes + let issue_map: ArcRwLock = Arc::new(RwLock::new(IssueRequestsMap::new())); + // this map resolves issue memo to issue ids + let memos_to_issue_ids: ArcRwLock = + Arc::new(RwLock::new(IssueIdLookup::new())); + + issue::initialize_issue_set(&self.spacewalk_parachain, &issue_map, &memos_to_issue_ids) + .await?; + + let ledger_env_map: ArcRwLock = Arc::new(RwLock::new(HashMap::new())); + + tracing::info!("Starting all services..."); + let tasks = self.create_tasks( + startup_height, + account_id, + is_public_network, + vault_public_key, + oracle_agent, + issue_map, + ledger_env_map, + memos_to_issue_ids, + )?; run_and_monitor_tasks(self.shutdown.clone(), tasks).await } - async fn maybe_register_public_key(&mut self) -> Result<(), Error> { + async fn register_public_key_if_not_present(&mut self) -> Result<(), Error> { if let Some(_faucet_url) = &self.config.faucet_url { // TODO fund account with faucet } @@ -681,7 +827,50 @@ impl VaultService { Ok(()) } - async fn maybe_register_vault( + async fn register_vault_with_collateral( + &self, + vault_id: VaultId, + collateral_amount: &Option, + ) -> Result<(), Error> { + if let Some(collateral) = collateral_amount { + tracing::info!("[{}] Automatically registering...", vault_id.pretty_print()); + let free_balance = self + .spacewalk_parachain + .get_free_balance(vault_id.collateral_currency()) + .await?; + return self + .spacewalk_parachain + .register_vault( + &vault_id, + if collateral.gt(&free_balance) { + tracing::warn!( + "Cannot register with {}, using the available free balance: {}", + collateral, + free_balance + ); + free_balance + } else { + *collateral + }, + ) + .await + .map_err(|e| Error::RuntimeError(e)) + } else if let Some(_faucet_url) = &self.config.faucet_url { + tracing::info!("[{}] Automatically registering...", vault_id.pretty_print()); + // TODO + // faucet::fund_and_register(&self.spacewalk_parachain, faucet_url, &vault_id) + // .await?; + Ok(()) + } else { + tracing::error!( + "[{}] Cannot register a vault: no collateral and no faucet url", + vault_id.pretty_print() + ); + Err(Error::FaucetUrlNotSet) + } + } + + async fn register_vault_if_not_present( &self, collateral_currency: &CurrencyId, wrapped_currency: &CurrencyId, @@ -689,47 +878,21 @@ impl VaultService { ) -> Result<(), Error> { let vault_id = self.get_vault_id(*collateral_currency, *wrapped_currency); - match is_vault_registered(&self.spacewalk_parachain, &vault_id).await { - Err(Error::RuntimeError(RuntimeError::VaultLiquidated)) | Ok(true) => { + // check if a vault is registered + match self.spacewalk_parachain.get_vault(&vault_id).await { + Ok(_) | Err(RuntimeError::VaultLiquidated) => { tracing::info!( "[{}] Not registering vault -- already registered", vault_id.pretty_print() ); + Ok(()) }, - Ok(false) => { + Err(RuntimeError::VaultNotFound) => { tracing::info!("[{}] Not registered", vault_id.pretty_print()); - if let Some(collateral) = maybe_collateral_amount { - tracing::info!("[{}] Automatically registering...", vault_id.pretty_print()); - let free_balance = self - .spacewalk_parachain - .get_free_balance(vault_id.collateral_currency()) - .await?; - self.spacewalk_parachain - .register_vault( - &vault_id, - if collateral.gt(&free_balance) { - tracing::warn!( - "Cannot register with {}, using the available free balance: {}", - collateral, - free_balance - ); - free_balance - } else { - *collateral - }, - ) - .await?; - } else if let Some(_faucet_url) = &self.config.faucet_url { - tracing::info!("[{}] Automatically registering...", vault_id.pretty_print()); - // TODO - // faucet::fund_and_register(&self.spacewalk_parachain, faucet_url, &vault_id) - // .await?; - } + self.register_vault_with_collateral(vault_id, maybe_collateral_amount).await }, - Err(x) => return Err(x), + Err(e) => Err(Error::RuntimeError(e)), } - - Ok(()) } async fn await_parachain_block(&self) -> Result { @@ -743,14 +906,3 @@ impl VaultService { Ok(startup_height) } } - -pub(crate) async fn is_vault_registered( - parachain_rpc: &SpacewalkParachain, - vault_id: &VaultId, -) -> Result { - match parachain_rpc.get_vault(vault_id).await { - Ok(_) => Ok(true), - Err(RuntimeError::VaultNotFound) => Ok(false), - Err(err) => Err(err.into()), - } -} diff --git a/clients/wallet/src/stellar_wallet.rs b/clients/wallet/src/stellar_wallet.rs index 715dd63f1..1eba624a7 100644 --- a/clients/wallet/src/stellar_wallet.rs +++ b/clients/wallet/src/stellar_wallet.rs @@ -31,9 +31,6 @@ use primitives::{ StellarPublicKeyRaw, StellarStroops, StellarTypeToString, TransactionEnvelopeExt, }; -#[cfg(test)] -extern crate mocktopus; - use crate::error::Error::DecodeError; #[cfg(test)] use mocktopus::macros::mockable;