From 93e177a4aa52aeb30aec23ae58c96d8f4b1aa953 Mon Sep 17 00:00:00 2001 From: b-yap <2826165+b-yap@users.noreply.github.com> Date: Thu, 1 Aug 2024 22:46:16 +0800 Subject: [PATCH] first iteration --- Cargo.lock | 323 ++++++++++++++++-- clients/stellar-relay-lib/Cargo.toml | 2 + clients/stellar-relay-lib/examples/connect.rs | 1 + .../stellar_relay_config_mainnet_iowa.json | 4 +- clients/vault/src/oracle/agent.rs | 104 ++++-- .../vault/src/oracle/collector/collector.rs | 1 - clients/vault/src/system.rs | 66 ++-- 7 files changed, 402 insertions(+), 99 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a79fc6471..9b94dcb78 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -612,13 +612,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" dependencies = [ "async-trait", - "axum-core", + "axum-core 0.3.4", "bitflags 1.3.2", "bytes", "futures-util", "http 0.2.12", - "http-body", - "hyper", + "http-body 0.4.6", + "hyper 0.14.28", "itoa", "matchit", "memchr", @@ -627,7 +627,34 @@ dependencies = [ "pin-project-lite 0.2.14", "rustversion", "serde", - "sync_wrapper", + "sync_wrapper 0.1.2", + "tower", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a6c9af12842a67734c9a2e355436e5d03b22383ed60cf13cd0c18fbfe3dcbcf" +dependencies = [ + "async-trait", + "axum-core 0.4.3", + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding 2.3.1", + "pin-project-lite 0.2.14", + "rustversion", + "serde", + "sync_wrapper 1.0.1", "tower", "tower-layer", "tower-service", @@ -643,13 +670,33 @@ dependencies = [ "bytes", "futures-util", "http 0.2.12", - "http-body", + "http-body 0.4.6", "mime", "rustversion", "tower-layer", "tower-service", ] +[[package]] +name = "axum-core" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a15c63fd72d41492dc4f497196f5da1fb04fb7529e631d73630d1b491e47a2e3" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", + "mime", + "pin-project-lite 0.2.14", + "rustversion", + "sync_wrapper 0.1.2", + "tower-layer", + "tower-service", +] + [[package]] name = "backoff" version = "0.3.0" @@ -715,6 +762,12 @@ version = "0.21.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" +[[package]] +name = "base64" +version = "0.22.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" + [[package]] name = "base64ct" version = "1.6.0" @@ -1345,7 +1398,20 @@ dependencies = [ "futures-core", "prost 0.12.6", "prost-types 0.12.6", - "tonic", + "tonic 0.10.2", + "tracing-core", +] + +[[package]] +name = "console-api" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "86ed14aa9c9f927213c6e4f3ef75faaad3406134efe84ba2cb7983431d5f0931" +dependencies = [ + "futures-core", + "prost 0.13.1", + "prost-types 0.13.1", + "tonic 0.12.1", "tracing-core", ] @@ -1355,7 +1421,7 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7481d4c57092cd1c19dd541b92bdce883de840df30aa5d03fd48a3935c01842e" dependencies = [ - "console-api", + "console-api 0.6.0", "crossbeam-channel", "crossbeam-utils", "futures-task", @@ -1367,7 +1433,33 @@ dependencies = [ "thread_local", "tokio", "tokio-stream", - "tonic", + "tonic 0.10.2", + "tracing", + "tracing-core", + "tracing-subscriber 0.3.18", +] + +[[package]] +name = "console-subscriber" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2e3a111a37f3333946ebf9da370ba5c5577b18eb342ec683eb488dd21980302" +dependencies = [ + "console-api 0.8.0", + "crossbeam-channel", + "crossbeam-utils", + "futures-task", + "hdrhistogram", + "humantime 2.1.0", + "hyper-util", + "prost 0.13.1", + "prost-types 0.13.1", + "serde", + "serde_json", + "thread_local", + "tokio", + "tokio-stream", + "tonic 0.12.1", "tracing", "tracing-core", "tracing-subscriber 0.3.18", @@ -3402,6 +3494,25 @@ dependencies = [ "tracing", ] +[[package]] +name = "h2" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa82e28a107a8cc405f0839610bdc9b15f1e25ec7d696aa5cf173edbcb1486ab" +dependencies = [ + "atomic-waker", + "bytes", + "fnv", + "futures-core", + "futures-sink", + "http 1.1.0", + "indexmap 2.2.6", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "handlebars" version = "4.5.0" @@ -3656,6 +3767,29 @@ dependencies = [ "pin-project-lite 0.2.14", ] +[[package]] +name = "http-body" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" +dependencies = [ + "bytes", + "http 1.1.0", +] + +[[package]] +name = "http-body-util" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "793429d76616a256bcb62c2a2ec2bed781c8307e797e2598c50010f2bee2544f" +dependencies = [ + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.1", + "pin-project-lite 0.2.14", +] + [[package]] name = "http-range-header" version = "0.3.1" @@ -3699,9 +3833,9 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2", + "h2 0.3.26", "http 0.2.12", - "http-body", + "http-body 0.4.6", "httparse", "httpdate", "itoa", @@ -3713,6 +3847,27 @@ dependencies = [ "want", ] +[[package]] +name = "hyper" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50dfd22e0e76d0f662d429a5f80fcaf3855009297eab6a0a9f8543834744ba05" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "h2 0.4.5", + "http 1.1.0", + "http-body 1.0.1", + "httparse", + "httpdate", + "itoa", + "pin-project-lite 0.2.14", + "smallvec", + "tokio", + "want", +] + [[package]] name = "hyper-rustls" version = "0.23.2" @@ -3720,7 +3875,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1788965e61b367cd03a62950836d5cd41560c3577d90e40e0819373194d1661c" dependencies = [ "http 0.2.12", - "hyper", + "hyper 0.14.28", "log", "rustls 0.20.9", "rustls-native-certs", @@ -3736,7 +3891,7 @@ checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" dependencies = [ "futures-util", "http 0.2.12", - "hyper", + "hyper 0.14.28", "log", "rustls 0.21.12", "rustls-native-certs", @@ -3751,12 +3906,25 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" dependencies = [ - "hyper", + "hyper 0.14.28", "pin-project-lite 0.2.14", "tokio", "tokio-io-timeout", ] +[[package]] +name = "hyper-timeout" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3203a961e5c83b6f5498933e78b6b263e208c197b63e9c6c53cc82ffd3f63793" +dependencies = [ + "hyper 1.4.1", + "hyper-util", + "pin-project-lite 0.2.14", + "tokio", + "tower-service", +] + [[package]] name = "hyper-tls" version = "0.5.0" @@ -3764,12 +3932,32 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" dependencies = [ "bytes", - "hyper", + "hyper 0.14.28", "native-tls", "tokio", "tokio-native-tls", ] +[[package]] +name = "hyper-util" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ab92f4f49ee4fb4f997c784b7a2e0fa70050211e0b6a287f898c3c9785ca956" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "http 1.1.0", + "http-body 1.0.1", + "hyper 1.4.1", + "pin-project-lite 0.2.14", + "socket2 0.5.7", + "tokio", + "tower", + "tower-service", + "tracing", +] + [[package]] name = "iana-time-zone" version = "0.1.60" @@ -4102,7 +4290,7 @@ checksum = "d2b99d4207e2a04fb4581746903c2bb7eb376f88de9c699d0f3e10feeac0cd3a" dependencies = [ "derive_more", "futures 0.3.30", - "hyper", + "hyper 0.14.28", "hyper-tls", "jsonrpc-core", "jsonrpc-pubsub", @@ -4210,7 +4398,7 @@ dependencies = [ "futures-timer", "futures-util", "globset", - "hyper", + "hyper 0.14.28", "jsonrpsee-types", "parking_lot 0.12.2", "rand 0.8.5", @@ -4231,7 +4419,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7e5f9fabdd5d79344728521bb65e3106b49ec405a78b66fbff073b72b389fa43" dependencies = [ "async-trait", - "hyper", + "hyper 0.14.28", "hyper-rustls 0.24.2", "jsonrpsee-core", "jsonrpsee-types", @@ -4265,7 +4453,7 @@ dependencies = [ "futures-channel", "futures-util", "http 0.2.12", - "hyper", + "hyper 0.14.28", "jsonrpsee-core", "jsonrpsee-types", "serde", @@ -6959,6 +7147,16 @@ dependencies = [ "prost-derive 0.12.6", ] +[[package]] +name = "prost" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e13db3d3fde688c61e2446b4d843bc27a7e8af269a69440c0308021dc92333cc" +dependencies = [ + "bytes", + "prost-derive 0.13.1", +] + [[package]] name = "prost-build" version = "0.11.9" @@ -7020,6 +7218,19 @@ dependencies = [ "syn 2.0.66", ] +[[package]] +name = "prost-derive" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18bec9b0adc4eba778b33684b7ba3e7137789434769ee3ce3930463ef904cfca" +dependencies = [ + "anyhow", + "itertools 0.12.1", + "proc-macro2", + "quote", + "syn 2.0.66", +] + [[package]] name = "prost-types" version = "0.11.9" @@ -7038,6 +7249,15 @@ dependencies = [ "prost 0.12.6", ] +[[package]] +name = "prost-types" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cee5168b05f49d4b0ca581206eb14a7b22fafd963efe729ac48eb03266e25cc2" +dependencies = [ + "prost 0.13.1", +] + [[package]] name = "protobuf" version = "2.28.0" @@ -7520,10 +7740,10 @@ dependencies = [ "encoding_rs", "futures-core", "futures-util", - "h2", + "h2 0.3.26", "http 0.2.12", - "http-body", - "hyper", + "http-body 0.4.6", + "hyper 0.14.28", "hyper-tls", "ipnet", "js-sys", @@ -7537,7 +7757,7 @@ dependencies = [ "serde", "serde_json", "serde_urlencoded", - "sync_wrapper", + "sync_wrapper 0.1.2", "system-configuration", "tokio", "tokio-native-tls", @@ -8653,7 +8873,7 @@ dependencies = [ "fnv", "futures 0.3.30", "futures-timer", - "hyper", + "hyper 0.14.28", "hyper-rustls 0.23.2", "libp2p", "num_cpus", @@ -9501,7 +9721,7 @@ dependencies = [ "clap 3.2.25", "futures 0.3.30", "governor", - "hyper", + "hyper 0.14.28", "hyper-tls", "nonzero_ext", "runtime", @@ -10783,6 +11003,7 @@ version = "1.0.10" dependencies = [ "async-std", "base64 0.13.1", + "console-subscriber 0.4.0", "env_logger 0.9.3", "err-derive", "hex", @@ -10898,7 +11119,7 @@ name = "substrate-prometheus-endpoint" version = "0.10.0-dev" source = "git+https://github.com/paritytech//substrate?branch=polkadot-v0.9.42#ff24c60ac7d9f87727ecdd0ded9a80c56e4f4b65" dependencies = [ - "hyper", + "hyper 0.14.28", "log", "prometheus 0.13.4", "thiserror", @@ -11154,6 +11375,12 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" +[[package]] +name = "sync_wrapper" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7065abeca94b6a8a577f9bd45aa0867a2238b74e8eb67cf10d492bc39351394" + [[package]] name = "synstructure" version = "0.12.6" @@ -11584,14 +11811,14 @@ checksum = "d560933a0de61cf715926b9cac824d4c883c2c43142f787595e48280c40a1d0e" dependencies = [ "async-stream", "async-trait", - "axum", + "axum 0.6.20", "base64 0.21.7", "bytes", - "h2", + "h2 0.3.26", "http 0.2.12", - "http-body", - "hyper", - "hyper-timeout", + "http-body 0.4.6", + "hyper 0.14.28", + "hyper-timeout 0.4.1", "percent-encoding 2.3.1", "pin-project", "prost 0.12.6", @@ -11603,6 +11830,36 @@ dependencies = [ "tracing", ] +[[package]] +name = "tonic" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38659f4a91aba8598d27821589f5db7dddd94601e7a01b1e485a50e5484c7401" +dependencies = [ + "async-stream", + "async-trait", + "axum 0.7.5", + "base64 0.22.1", + "bytes", + "h2 0.4.5", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", + "hyper 1.4.1", + "hyper-timeout 0.5.1", + "hyper-util", + "percent-encoding 2.3.1", + "pin-project", + "prost 0.13.1", + "socket2 0.5.7", + "tokio", + "tokio-stream", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tower" version = "0.4.13" @@ -11634,7 +11891,7 @@ dependencies = [ "futures-core", "futures-util", "http 0.2.12", - "http-body", + "http-body 0.4.6", "http-range-header", "pin-project-lite 0.2.14", "tower-layer", @@ -12079,7 +12336,7 @@ dependencies = [ "bincode", "cfg-if 1.0.0", "clap 3.2.25", - "console-subscriber", + "console-subscriber 0.2.0", "env_logger 0.9.3", "err-derive", "flate2", @@ -12264,7 +12521,7 @@ dependencies = [ "futures-util", "headers", "http 0.2.12", - "hyper", + "hyper 0.14.28", "log", "mime", "mime_guess", diff --git a/clients/stellar-relay-lib/Cargo.toml b/clients/stellar-relay-lib/Cargo.toml index c1efb8c83..2f1ad98e3 100644 --- a/clients/stellar-relay-lib/Cargo.toml +++ b/clients/stellar-relay-lib/Cargo.toml @@ -15,6 +15,8 @@ serial_test = "0.9.0" wallet = { path = "../wallet", features = ["testing-utils"] } [dependencies] +console-subscriber = { version = "0.4.0" } + hex = "0.4.3" tracing = { version = "0.1", features = ["log"] } diff --git a/clients/stellar-relay-lib/examples/connect.rs b/clients/stellar-relay-lib/examples/connect.rs index d63a73462..61b454c14 100644 --- a/clients/stellar-relay-lib/examples/connect.rs +++ b/clients/stellar-relay-lib/examples/connect.rs @@ -9,6 +9,7 @@ use wallet::keys::get_source_secret_key_from_env; #[tokio::main] async fn main() -> Result<(), Box> { env_logger::init(); + console_subscriber::init(); let args: Vec = std::env::args().collect(); let arg_network = if args.len() > 1 { &args[1] } else { "testnet" }; diff --git a/clients/stellar-relay-lib/resources/config/mainnet/stellar_relay_config_mainnet_iowa.json b/clients/stellar-relay-lib/resources/config/mainnet/stellar_relay_config_mainnet_iowa.json index 4c4d575fa..697f731af 100644 --- a/clients/stellar-relay-lib/resources/config/mainnet/stellar_relay_config_mainnet_iowa.json +++ b/clients/stellar-relay-lib/resources/config/mainnet/stellar_relay_config_mainnet_iowa.json @@ -5,9 +5,9 @@ }, "node_info": { "ledger_version": 21, - "overlay_version": 33, + "overlay_version": 34, "overlay_min_version": 32, - "version_str": "stellar-core 21.0.0 (c6f474133738ae5f6d11b07963ca841909210273)", + "version_str": "stellar-core 21.1.0 (b3aeb14cc798f6d11deb2be913041be916f3b0cc)", "is_pub_net": true }, "stellar_history_archive_urls": [ diff --git a/clients/vault/src/oracle/agent.rs b/clients/vault/src/oracle/agent.rs index d7836a2fe..bb0996df9 100644 --- a/clients/vault/src/oracle/agent.rs +++ b/clients/vault/src/oracle/agent.rs @@ -6,6 +6,7 @@ use tokio::{ }; use runtime::ShutdownSender; +use runtime::stellar::SecretKey; use stellar_relay_lib::{ connect_to_stellar_overlay_network, helper::to_base64_xdr_string, sdk::types::StellarMessage, StellarOverlayConfig, @@ -17,14 +18,33 @@ use crate::oracle::{ use wallet::Slot; pub struct OracleAgent { - collector: Arc>, + pub collector: Arc>, pub is_public_network: bool, /// sends message directly to Stellar Node message_sender: Option, /// sends an entire Vault shutdown - shutdown_sender: ShutdownSender, - /// sends a 'stop' signal to `StellarOverlayConnection` poll - overlay_conn_end_signal: mpsc::Sender<()>, + shutdown_sender: ShutdownSender +} + +impl OracleAgent { + pub fn new( + config: StellarOverlayConfig, + shutdown_sender: ShutdownSender + ) -> Self { + let is_public_network = config.is_public_network(); + + let collector = Arc::new(RwLock::new(ScpMessageCollector::new( + is_public_network, + config.stellar_history_archive_urls(), + ))); + + OracleAgent { + collector, + is_public_network, + message_sender: None, + shutdown_sender, + } + } } /// listens to data to collect the scp messages and txsets. @@ -57,6 +77,50 @@ async fn handle_message( Ok(()) } +pub async fn listen_for_stellar_messages( + config: StellarOverlayConfig, + collector: Arc>, + secret_key_as_str: &str, + shutdown_sender: ShutdownSender, +) -> Result<(),service::Error> { + tracing::info!("listen_for_stellar_messages(): Starting connection to Stellar overlay network..."); + + let mut overlay_conn = connect_to_stellar_overlay_network(config.clone(), secret_key_as_str).await + .map_err(|e| service::Error::VaultError(Error::Other(format!("{e:?}"))))?; + + // use StellarOverlayConnection's sender to send message to Stellar + let sender = overlay_conn.sender(); + + loop { + tokio::select! { + _ = sleep(Duration::from_millis(100)) => {}, + result = overlay_conn.listen() => match result { + Ok(None) => {}, + Ok(Some(msg)) => { + let msg_as_str = to_base64_xdr_string(&msg); + if let Err(e) = handle_message(msg, collector.clone(), &sender).await { + tracing::error!("listen_for_stellar_messages(): failed to handle message: {msg_as_str}: {e:?}"); + } + } + // connection got lost + Err(e) => { + tracing::error!("listen_for_stellar_messages(): encounter error in overlay: {e:?}"); + + if let Err(e) = shutdown_sender.send(()) { + tracing::error!("listen_for_stellar_messages(): Failed to send shutdown signal in thread: {e:?}"); + } + break + } + } + } + } + + tracing::info!("listen_for_stellar_messages(): shutting down overlay connection"); + overlay_conn.stop(); + + Ok(()) +} + /// Start the connection to the Stellar Node. /// Returns an `OracleAgent` that will handle incoming messages from Stellar Node, /// and to send messages to Stellar Node @@ -88,9 +152,7 @@ pub async fn start_oracle_agent( tokio::spawn(async move { loop { tokio::select! { - _ = sleep(Duration::from_millis(100)) => { - tracing::info!("start_oracle_agent(): go to sleep"); - }, + _ = sleep(Duration::from_millis(100)) => {}, // if a disconnect signal was sent, disconnect from Stellar. result = disconnect_signal_receiver.recv() => { if result.is_none() { @@ -98,9 +160,7 @@ pub async fn start_oracle_agent( break } }, - result = overlay_conn.listen() => { - tracing::info!("start_oracle_agent(): received message from overlay"); - match result { + result = overlay_conn.listen() => match result { Ok(Some(msg)) => { let msg_as_str = to_base64_xdr_string(&msg); if let Err(e) = handle_message(msg, collector_clone.clone(), &sender_clone).await { @@ -117,7 +177,7 @@ pub async fn start_oracle_agent( } break }, - }}, + }, } } @@ -130,8 +190,7 @@ pub async fn start_oracle_agent( collector, is_public_network, message_sender: Some(sender), - shutdown_sender, - overlay_conn_end_signal: disconnect_signal_sender, + shutdown_sender }) } @@ -177,25 +236,6 @@ impl OracleAgent { Error::ProofTimeout(format!("Timeout elapsed for building proof of slot {slot}")) })? } - - pub async fn last_slot_index(&self) -> Slot { - self.collector.read().await.last_slot_index() - } - - pub async fn remove_data(&self, slot: &Slot) { - self.collector.read().await.remove_data(slot); - } - - /// Stops listening for new SCP messages. - pub async fn shutdown(&self) { - tracing::debug!("shutdown(): Shutting down OracleAgent..."); - if let Err(e) = self.overlay_conn_end_signal.send(()).await { - tracing::error!( - "shutdown(): Failed to send overlay conn end signal in OracleAgent: {:?}", - e - ); - } - } } #[cfg(test)] diff --git a/clients/vault/src/oracle/collector/collector.rs b/clients/vault/src/oracle/collector/collector.rs index 52999145c..32658c53f 100644 --- a/clients/vault/src/oracle/collector/collector.rs +++ b/clients/vault/src/oracle/collector/collector.rs @@ -69,7 +69,6 @@ impl ScpMessageCollector { stellar_history_archive_urls, } } - pub fn envelopes_map_len(&self) -> usize { self.envelopes_map.read().len() } diff --git a/clients/vault/src/system.rs b/clients/vault/src/system.rs index 46471614d..3a3bd9854 100644 --- a/clients/vault/src/system.rs +++ b/clients/vault/src/system.rs @@ -39,6 +39,7 @@ use crate::{ service::{CancellationScheduler, IssueCanceller}, ArcRwLock, Event, CHAIN_HEIGHT_POLLING_INTERVAL, }; +use crate::oracle::listen_for_stellar_messages; pub const VERSION: &str = env!("CARGO_PKG_VERSION"); pub const AUTHORS: &str = env!("CARGO_PKG_AUTHORS"); @@ -296,7 +297,6 @@ impl Service for VaultService { async fn start(&mut self) -> Result<(), ServiceError> { let result = self.run_service().await; - self.try_shutdown_agent().await; self.try_shutdown_wallet().await; if let Err(error) = result { @@ -415,29 +415,27 @@ impl VaultService { Ok(()) } - async fn create_oracle_agent( + fn create_oracle_agent( &self, is_public_network: bool, shutdown_sender: ShutdownSender, ) -> Result, ServiceError> { - let cfg_path = &self.config.stellar_overlay_config_filepath; - let stellar_overlay_cfg = - StellarOverlayConfig::try_from_path(cfg_path).map_err(Error::StellarRelayError)?; + let stellar_overlay_cfg = self.stellar_overlay_cfg()?; // check if both the config file and the wallet are the same. if is_public_network != stellar_overlay_cfg.is_public_network() { return Err(ServiceError::IncompatibleNetwork) } - let oracle_agent = crate::oracle::start_oracle_agent( - stellar_overlay_cfg, - &self.secret_key, - shutdown_sender, - ) - .await - .expect("Failed to start oracle agent"); + // let oracle_agent = crate::oracle::start_oracle_agent( + // stellar_overlay_cfg, + // &self.secret_key, + // shutdown_sender, + // ) + // .await + // .expect("Failed to start oracle agent"); - Ok(Arc::new(oracle_agent)) + Ok(Arc::new(OracleAgent::new(stellar_overlay_cfg,shutdown_sender))) } fn execute_open_requests(&self, oracle_agent: Arc) { @@ -707,6 +705,11 @@ impl VaultService { Ok(tasks) } + + fn stellar_overlay_cfg(&self) -> Result { + let cfg_path = &self.config.stellar_overlay_config_filepath; + StellarOverlayConfig::try_from_path(cfg_path).map_err(Error::StellarRelayError) + } } impl VaultService { @@ -789,14 +792,13 @@ impl VaultService { .await; drop(wallet); - let oracle_agent = - self.create_oracle_agent(is_public_network, self.shutdown.clone()).await?; + // let oracle_agent = + // self.create_oracle_agent(is_public_network, self.shutdown.clone()).await?; + let oracle_agent = self.create_oracle_agent(is_public_network, self.shutdown.clone())?; self.agent = Some(oracle_agent.clone()); self.execute_open_requests(oracle_agent.clone()); - tracing::info!("CONTINUE ON HOOY"); - // issue handling // this vec is passed to the stellar wallet to filter out transactions that are not relevant // this has to be modified every time the issue set changes @@ -808,12 +810,24 @@ impl VaultService { issue::initialize_issue_set(&self.spacewalk_parachain, &issue_map, &memos_to_issue_ids) .await?; - tracing::info!("ISSUE INITIALIZE ISSUE SET!!!"); - let ledger_env_map: ArcRwLock = Arc::new(RwLock::new(HashMap::new())); tracing::info!("Starting all services..."); - let tasks = self.create_tasks( + + // + let mut tasks = vec![( + "Stellar Messages Listener", + run( + listen_for_stellar_messages( + self.stellar_overlay_cfg()?, + oracle_agent.collector.clone(), + &self.secret_key, + self.shutdown.clone() + ) + ) + )]; + + let mut _tasks = self.create_tasks( startup_height, account_id, is_public_network, @@ -823,6 +837,7 @@ impl VaultService { ledger_env_map, memos_to_issue_ids, )?; + tasks.append(&mut _tasks); run_and_monitor_tasks(self.shutdown.clone(), tasks).await } @@ -957,15 +972,4 @@ impl VaultService { drop(wallet); } - async fn try_shutdown_agent(&mut self) { - let opt_agent = self.agent.clone(); - self.agent = None; - - if let Some(arc_agent) = opt_agent { - tracing::info!("try_shutdown_agent(): shutting down agent"); - arc_agent.shutdown().await; - } else { - tracing::debug!("try_shutdown_agent(): no agent found"); - } - } }