From 000f22858c277b0742fcf55fc7d14dcc1cf11c26 Mon Sep 17 00:00:00 2001 From: b-yap <2826165+b-yap@users.noreply.github.com> Date: Tue, 23 Jan 2024 20:54:20 +0800 Subject: [PATCH 01/10] async-std --- Cargo.lock | 120 +++++++++++++++++- clients/stellar-relay-lib/Cargo.toml | 14 +- clients/stellar-relay-lib/examples/connect.rs | 2 +- .../src/connection/connector/connector.rs | 98 +++++++------- .../connection/connector/message_reader.rs | 82 +++++------- .../connection/connector/message_sender.rs | 31 ++--- .../stellar-relay-lib/src/connection/error.rs | 14 +- clients/stellar-relay-lib/src/overlay.rs | 55 +++++--- clients/stellar-relay-lib/src/tests/mod.rs | 27 +++- clients/vault/Cargo.toml | 1 + clients/vault/src/oracle/agent.rs | 60 +++++---- clients/vault/src/oracle/collector/handler.rs | 3 +- clients/vault/src/oracle/errors.rs | 1 + clients/vault/src/oracle/types/types.rs | 5 +- 14 files changed, 331 insertions(+), 182 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 10847a510..d734e5851 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -382,6 +382,16 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9" +[[package]] +name = "async-attributes" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3203e79f4dd9bdda415ed03cf14dae5a2bf775c683a00f94e9cd1faf0f596e5" +dependencies = [ + "quote", + "syn 1.0.109", +] + [[package]] name = "async-channel" version = "1.9.0" @@ -393,6 +403,35 @@ dependencies = [ "futures-core", ] +[[package]] +name = "async-executor" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b0c4a4f319e45986f347ee47fef8bf5e81c9abc3f6f58dc2391439f30df65f0" +dependencies = [ + "async-lock", + "async-task", + "concurrent-queue", + "fastrand 2.0.0", + "futures-lite", + "slab", +] + +[[package]] +name = "async-global-executor" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1b6f5d7df27bd294849f8eec66ecfc63d11814df7a4f5d74168a2394467b776" +dependencies = [ + "async-channel", + "async-executor", + "async-io", + "async-lock", + "blocking", + "futures-lite", + "once_cell", +] + [[package]] name = "async-io" version = "1.13.0" @@ -422,6 +461,39 @@ dependencies = [ "event-listener", ] +[[package]] +name = "async-std" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62565bb4402e926b29953c785397c6dc0391b7b446e45008b0049eb43cec6f5d" +dependencies = [ + "async-attributes", + "async-channel", + "async-global-executor", + "async-io", + "async-lock", + "crossbeam-utils", + "futures-channel", + "futures-core", + "futures-io", + "futures-lite", + "gloo-timers", + "kv-log-macro", + "log", + "memchr", + "once_cell", + "pin-project-lite 0.2.10", + "pin-utils", + "slab", + "wasm-bindgen-futures", +] + +[[package]] +name = "async-task" +version = "4.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbb36e985947064623dbd357f727af08ffd077f93d696782f3c56365fa2e2799" + [[package]] name = "async-trait" version = "0.1.72" @@ -695,6 +767,22 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8d696c370c750c948ada61c69a0ee2cbbb9c50b1019ddb86d9317157a99c2cae" +[[package]] +name = "blocking" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c36a4d0d48574b3dd360b4b7d95cc651d2b6557b6402848a27d4b228a473e2a" +dependencies = [ + "async-channel", + "async-lock", + "async-task", + "fastrand 2.0.0", + "futures-io", + "futures-lite", + "piper", + "tracing", +] + [[package]] name = "bounded-collections" version = "0.1.8" @@ -3826,6 +3914,15 @@ dependencies = [ "cpufeatures", ] +[[package]] +name = "kv-log-macro" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f" +dependencies = [ + "log", +] + [[package]] name = "kvdb" version = "0.13.0" @@ -4495,6 +4592,9 @@ name = "log" version = "0.4.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b06a4cde4c0f271a446782e3eff8de789548ce57dbc8eca9292c27f4a42004b4" +dependencies = [ + "value-bag", +] [[package]] name = "lru" @@ -6022,6 +6122,17 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "piper" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "668d31b1c4eba19242f2088b2bf3316b82ca31082a8335764db4e083db7485d4" +dependencies = [ + "atomic-waker", + "fastrand 2.0.0", + "futures-io", +] + [[package]] name = "pkcs8" version = "0.9.0" @@ -9965,6 +10076,7 @@ dependencies = [ name = "stellar-relay-lib" version = "1.0.3" dependencies = [ + "async-std", "base64 0.13.1", "env_logger 0.9.3", "err-derive", @@ -9978,7 +10090,6 @@ dependencies = [ "serial_test", "sha2 0.10.7", "substrate-stellar-sdk", - "tokio", "tweetnacl", ] @@ -11119,10 +11230,17 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" +[[package]] +name = "value-bag" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7cdbaf5e132e593e9fc1de6a15bbec912395b11fb9719e061cf64f804524c503" + [[package]] name = "vault" version = "1.0.3" dependencies = [ + "async-std", "async-trait", "base64 0.13.1", "bincode", diff --git a/clients/stellar-relay-lib/Cargo.toml b/clients/stellar-relay-lib/Cargo.toml index 50bb38442..a80b6dcce 100644 --- a/clients/stellar-relay-lib/Cargo.toml +++ b/clients/stellar-relay-lib/Cargo.toml @@ -33,12 +33,14 @@ substrate-stellar-sdk = {git = "https://github.com/pendulum-chain/substrate-stel err-derive = "0.3.1" -tokio = { version = "1.0", features = [ - "macros", # allows main function to be async - "rt-multi-thread", # for multi-thread runtime - "sync", # to make channels available - "time" # for timeouts and sleep, when reconnecting -] } +#tokio = { version = "1.0", features = [ +# "macros", # allows main function to be async +# "rt-multi-thread", # for multi-thread runtime +# "sync", # to make channels available +# "time" # for timeouts and sleep, when reconnecting +#] } + +async-std = { version = "1.12.0", features = ["attributes"] } [features] std = [ diff --git a/clients/stellar-relay-lib/examples/connect.rs b/clients/stellar-relay-lib/examples/connect.rs index 9dd308798..817a23330 100644 --- a/clients/stellar-relay-lib/examples/connect.rs +++ b/clients/stellar-relay-lib/examples/connect.rs @@ -4,7 +4,7 @@ use stellar_relay_lib::{ StellarOverlayConfig, }; -#[tokio::main] +#[async_std::main] async fn main() -> Result<(), Box> { env_logger::init(); diff --git a/clients/stellar-relay-lib/src/connection/connector/connector.rs b/clients/stellar-relay-lib/src/connection/connector/connector.rs index f9bb8d56a..be4a34c80 100644 --- a/clients/stellar-relay-lib/src/connection/connector/connector.rs +++ b/clients/stellar-relay-lib/src/connection/connector/connector.rs @@ -1,9 +1,9 @@ use std::{ fmt::{Debug, Formatter}, - net::TcpStream, - sync::{Arc, Mutex}, - time::Duration, + // net::TcpStream, }; +use std::net::Shutdown; +use async_std::net::TcpStream; use substrate_stellar_sdk::{ types::{AuthenticatedMessageV0, Curve25519Public, HmacSha256Mac, MessageType}, XdrCodec, @@ -37,7 +37,7 @@ pub struct Connector { flow_controller: FlowController, /// for writing/reading xdr messages to/from Stellar Node. - pub(crate) tcp_stream: Arc>, + pub(crate) tcp_stream: TcpStream } impl Debug for Connector { @@ -53,10 +53,31 @@ impl Debug for Connector { .field("receive_scp_messages", &self.receive_scp_messages) .field("handshake_state", &self.handshake_state) .field("flow_controller", &self.flow_controller) + .field("local_addr", + &self.tcp_stream.local_addr() + .map(|addr| addr.to_string()) + .unwrap_or("cannot provide".to_string()) + ) + .field("peer_addr", + &self.tcp_stream.peer_addr() + .map(|addr| addr.to_string()) + .unwrap_or("cannot provide".to_string()) + ) .finish() } } +impl Drop for Connector { + fn drop(&mut self) { + if let Err(e) = self.tcp_stream.shutdown(Shutdown::Both) { + log::error!("drop(): failed to shutdown tcp stream: {}", e); + } else { + log::info!("drop(): tcp stream successfully shutdown"); + } + + } +} + impl Connector { /// Verifies the AuthenticatedMessage, received from the Stellar Node pub(super) fn verify_auth( @@ -115,22 +136,16 @@ impl Connector { /// returns a Connector and starts creating a connection to Stellar pub async fn start(local_node: NodeInfo, conn_info: ConnectionInfo) -> Result { + // Create the stream + let tcp_stream = TcpStream::connect(conn_info.address()).await + .map_err(|e| Error::ConnectionFailed(e.to_string()))?; + let connection_auth = ConnectionAuth::new( &local_node.network_id, conn_info.keypair(), conn_info.auth_cert_expiration, ); - // Create the stream - let tcp_stream = TcpStream::connect(conn_info.address()) - .map_err(|e| Error::ConnectionFailed(e.to_string()))?; - - if let Err(e) = - tcp_stream.set_read_timeout(Some(Duration::from_secs(conn_info.timeout_in_secs))) - { - log::warn!("start(): failed to set read timeout for the stream: {e:?}"); - } - let mut connector = Connector { local: LocalInfo::new(local_node), remote_info: None, @@ -142,7 +157,7 @@ impl Connector { receive_scp_messages: conn_info.recv_scp_msgs, handshake_state: HandshakeState::Connecting, flow_controller: FlowController::default(), - tcp_stream: Arc::new(Mutex::new(tcp_stream)), + tcp_stream }; // To start the handshake, send a hello message to Stellar @@ -225,13 +240,20 @@ impl Connector { ) { self.flow_controller.enable(local_overlay_version, remote_overlay_version) } + + // pub fn shutdown(&mut self) { + // if let Err(e) = self.tcp_stream.shutdown(Shutdown::Both) { + // log::error!("shutdown(): failed to shutdown tcp stream: {}", e); + // } else { + // log::info!("shutdown(): tcp stream successfully shutdown"); + // } + // } } #[cfg(test)] mod test { use crate::{connection::hmac::HMacKeys, node::RemoteInfo, StellarOverlayConfig}; use serial_test::serial; - use std::net::Shutdown; use substrate_stellar_sdk::{ compound_types::LimitedString, @@ -263,16 +285,6 @@ mod test { new_auth_cert } - impl Connector { - fn shutdown(&mut self) { - self.tcp_stream - .lock() - .unwrap() - .shutdown(Shutdown::Both) - .expect("should shutdown both read and write of stream"); - } - } - async fn create_connector() -> (NodeInfo, ConnectionInfo, Connector) { let cfg_file_path = "./resources/config/testnet/stellar_relay_config_sdftest1.json"; let secret_key_path = "./resources/secretkey/stellar_secretkey_testnet"; @@ -291,10 +303,10 @@ mod test { (node_info, conn_info, connector) } - #[tokio::test] + #[async_std::test] #[serial] async fn create_new_connector_works() { - let (node_info, _, mut connector) = create_connector().await; + let (node_info, _, connector) = create_connector().await; let connector_local_node = connector.local.node(); @@ -303,11 +315,9 @@ mod test { assert_eq!(connector_local_node.overlay_min_version, node_info.overlay_min_version); assert_eq!(connector_local_node.version_str, node_info.version_str); assert_eq!(connector_local_node.network_id, node_info.network_id); - - connector.shutdown(); } - #[tokio::test] + #[async_std::test] #[serial] async fn connector_local_sequence_works() { let (_, _, mut connector) = create_connector().await; @@ -315,10 +325,10 @@ mod test { connector.increment_local_sequence(); assert_eq!(connector.local_sequence(), 1); - connector.shutdown(); + } - #[tokio::test] + #[async_std::test] #[serial] async fn connector_set_remote_works() { let (_, _, mut connector) = create_connector().await; @@ -341,10 +351,10 @@ mod test { assert!(connector.remote().is_some()); - connector.shutdown(); + } - #[tokio::test] + #[async_std::test] #[serial] async fn connector_increment_remote_sequence_works() { let (_, _, mut connector) = create_connector().await; @@ -371,12 +381,13 @@ mod test { connector.increment_remote_sequence().unwrap(); assert_eq!(connector.remote().unwrap().sequence(), 3); - connector.shutdown(); + } - #[tokio::test] + #[async_std::test] #[serial] async fn connector_get_and_set_hmac_keys_works() { + //arrange let (_, _, mut connector) = create_connector().await; let connector_auth = &connector.connection_auth; @@ -409,12 +420,14 @@ mod test { //assert assert!(connector.hmac_keys().is_some()); - connector.shutdown(); + } - #[tokio::test] + #[async_std::test] #[serial] async fn connector_method_works() { + env_logger::init(); + let (_, conn_config, mut connector) = create_connector().await; assert_eq!(connector.remote_called_us(), conn_config.remote_called_us); @@ -426,18 +439,17 @@ mod test { connector.handshake_completed(); assert!(connector.is_handshake_created()); - - connector.shutdown(); } - #[tokio::test] + #[async_std::test] #[serial] async fn enable_flow_controller_works() { + let (node_info, _, mut connector) = create_connector().await; assert!(!connector.inner_check_to_send_more(MessageType::ScpMessage)); connector.enable_flow_controller(node_info.overlay_version, node_info.overlay_version); - connector.shutdown(); + } } diff --git a/clients/stellar-relay-lib/src/connection/connector/message_reader.rs b/clients/stellar-relay-lib/src/connection/connector/message_reader.rs index 7b48daf99..26d591154 100644 --- a/clients/stellar-relay-lib/src/connection/connector/message_reader.rs +++ b/clients/stellar-relay-lib/src/connection/connector/message_reader.rs @@ -1,12 +1,10 @@ +use std::net::Shutdown; +use async_std::channel::{Receiver, Sender, TryRecvError}; +use async_std::io::ReadExt; use crate::connection::{xdr_converter::get_xdr_message_length, Connector, Error, Xdr}; -use std::{ - io::Read, - net::{Shutdown, TcpStream}, - sync::{Arc, Mutex}, -}; use substrate_stellar_sdk::{types::StellarMessage, XdrCodec}; -use tokio::sync::{mpsc, mpsc::error::TryRecvError}; +// use tokio::sync::{mpsc, mpsc::error::TryRecvError}; /// Polls for messages coming from the Stellar Node and communicates it back to the user /// @@ -17,11 +15,11 @@ use tokio::sync::{mpsc, mpsc::error::TryRecvError}; /// stream. pub(crate) async fn poll_messages_from_stellar( mut connector: Connector, - send_to_user_sender: mpsc::Sender, - mut send_to_node_receiver: mpsc::Receiver, + send_to_user_sender: Sender, + send_to_node_receiver: Receiver, ) { log::info!("poll_messages_from_stellar(): started."); - // clone the stream to perform a read operation on the next function calls + loop { if send_to_user_sender.is_closed() { log::info!("poll_messages_from_stellar(): closing receiver during disconnection"); @@ -35,7 +33,7 @@ pub(crate) async fn poll_messages_from_stellar( if let Err(e) = connector.send_to_node(msg).await { log::error!("poll_messages_from_stellar(): Error occurred during sending message to node: {e:?}"); }, - Err(TryRecvError::Disconnected) => { + Err(TryRecvError::Closed) => { log::trace!("poll_messages_from_stellar(): Recv channel (for sending message to node) got disconnected."); break }, @@ -43,12 +41,7 @@ pub(crate) async fn poll_messages_from_stellar( } // check for messages from Stellar Node. - let stream_clone = connector.tcp_stream.clone(); - // Spawn a blocking task to read from the stream - let xdr_result = read_message_from_stellar(stream_clone).await; - - // Check the result of the blocking task - let xdr = match xdr_result { + let xdr = match read_message_from_stellar(&mut connector).await { Err(e) => { log::error!("poll_messages_from_stellar(): {e:?}"); break @@ -72,8 +65,9 @@ pub(crate) async fn poll_messages_from_stellar( }, } } + // make sure to shutdown the stream - if let Err(e) = connector.tcp_stream.clone().lock().unwrap().shutdown(Shutdown::Both) { + if let Err(e) = connector.tcp_stream.shutdown(Shutdown::Both) { log::error!("poll_messages_from_stellar(): Failed to shutdown the tcp stream: {e:?}"); }; send_to_node_receiver.close(); @@ -83,26 +77,19 @@ pub(crate) async fn poll_messages_from_stellar( } /// Returns Xdr format of the `StellarMessage` sent from the Stellar Node -async fn read_message_from_stellar(stream_clone: Arc>) -> Result { +async fn read_message_from_stellar(connector: &mut Connector) -> Result { // holds the number of bytes that were missing from the previous stellar message. let mut lack_bytes_from_prev = 0; let mut readbuf: Vec = vec![]; let mut buff_for_reading = vec![0; 4]; - loop { + // check whether or not we should read the bytes as: // 1. the length of the next stellar message // 2. the remaining bytes of the previous stellar message - // Temporary scope for locking - let result = { - let mut stream = stream_clone.lock().unwrap(); - stream.read(&mut buff_for_reading) - }; - - match result { + match connector.tcp_stream.read(&mut buff_for_reading).await { Ok(size) if size == 0 => { - // No data available to read - tokio::task::yield_now().await; + async_std::task::yield_now().await; continue }, Ok(_) if lack_bytes_from_prev == 0 => { @@ -116,18 +103,21 @@ async fn read_message_from_stellar(stream_clone: Arc>) -> Resul log::trace!( "read_message_from_stellar(): Nothing left to read; waiting for next loop" ); - tokio::task::yield_now().await; + async_std::task::yield_now().await; continue } + + // let's start reading the actual stellar message. readbuf = vec![0; expect_msg_len]; + match read_message( - stream_clone.clone(), + connector, &mut lack_bytes_from_prev, &mut readbuf, expect_msg_len, - ) { + ).await { Ok(None) => { - tokio::task::yield_now().await; + async_std::task::yield_now().await; continue }, Ok(Some(xdr)) => return Ok(xdr), @@ -143,13 +133,10 @@ async fn read_message_from_stellar(stream_clone: Arc>) -> Resul readbuf.append(&mut buff_for_reading); // let's read the continuation number of bytes from the previous message. - match read_unfinished_message( - stream_clone.clone(), - &mut lack_bytes_from_prev, - &mut readbuf, - ) { + match read_unfinished_message(connector, &mut lack_bytes_from_prev, &mut readbuf).await + { Ok(None) => { - tokio::task::yield_now().await; + async_std::task::yield_now().await; continue }, Ok(Some(xdr)) => return Ok(xdr), @@ -176,14 +163,13 @@ async fn read_message_from_stellar(stream_clone: Arc>) -> Resul /// * `lack_bytes_from_prev` - the number of bytes remaining, to complete the previous message /// * `readbuf` - the buffer that holds the bytes of the previous and incomplete message /// * `xpect_msg_len` - the expected # of bytes of the Stellar message -fn read_message( - stream: Arc>, +async fn read_message( + connector: &mut Connector, lack_bytes_from_prev: &mut usize, readbuf: &mut Vec, xpect_msg_len: usize, ) -> Result, Error> { - let mut stream = stream.lock().unwrap(); - let actual_msg_len = stream.read(readbuf).map_err(|e| Error::ReadFailed(e.to_string()))?; + let actual_msg_len = connector.tcp_stream.read(readbuf).await.map_err(|e| Error::ReadFailed(e.to_string()))?; // only when the message has the exact expected size bytes, should we send to user. if actual_msg_len == xpect_msg_len { @@ -208,19 +194,17 @@ fn read_message( /// * `stream` - the TcpStream for reading the xdr stellar message /// * `lack_bytes_from_prev` - the number of bytes remaining, to complete the previous message /// * `readbuf` - the buffer that holds the bytes of the previous and incomplete message -fn read_unfinished_message( - stream: Arc>, +async fn read_unfinished_message( + connector: &mut Connector, lack_bytes_from_prev: &mut usize, readbuf: &mut Vec, ) -> Result, Error> { // let's read the continuation number of bytes from the previous message. let mut cont_buf = vec![0; *lack_bytes_from_prev]; - let actual_msg_len = stream - .lock() - .unwrap() - .read(&mut cont_buf) - .map_err(|e| Error::ReadFailed(e.to_string()))?; + let actual_msg_len = + connector.tcp_stream.read(&mut cont_buf).await + .map_err(|e| Error::ReadFailed(e.to_string()))?; // this partial message completes the previous message. if actual_msg_len == *lack_bytes_from_prev { diff --git a/clients/stellar-relay-lib/src/connection/connector/message_sender.rs b/clients/stellar-relay-lib/src/connection/connector/message_sender.rs index 1f1a0916a..eaa61f81a 100644 --- a/clients/stellar-relay-lib/src/connection/connector/message_sender.rs +++ b/clients/stellar-relay-lib/src/connection/connector/message_sender.rs @@ -1,4 +1,5 @@ -use std::io::Write; +use std::time::Duration; +use async_std::io::{self, WriteExt}; use substrate_stellar_sdk::types::{MessageType, SendMore, StellarMessage}; use crate::connection::{ @@ -13,26 +14,14 @@ impl Connector { // Create the XDR message outside the closure let xdr_msg = self.create_xdr_message(msg)?; - // Clone the TcpStream (or its Arc> wrapper) - let stream_clone = self.tcp_stream.clone(); - - // this may really not be necessary - let write_result = tokio::task::spawn_blocking(move || { - let mut stream = stream_clone.lock().unwrap(); - stream.write_all(&xdr_msg).map_err(|e| { - log::error!("send_to_node(): Failed to send message to node: {e:?}"); - Error::WriteFailed(e.to_string()) - }) - }); - - // Await the result of the blocking task - match write_result.await { - Ok(result) => result, - Err(e) => { - log::error!("send_to_node(): Error occurred in blocking task: {e:?}"); - Err(Error::WriteFailed(e.to_string())) - }, - } + + io::timeout( + Duration::from_secs(self.timeout_in_secs), + async { + self.tcp_stream.write_all(&xdr_msg).await?; + Ok(()) + } + ).await.map_err(|_| Error::Timeout) } pub async fn send_hello_message(&mut self) -> Result<(), Error> { diff --git a/clients/stellar-relay-lib/src/connection/error.rs b/clients/stellar-relay-lib/src/connection/error.rs index ba3438e71..e2d9b137f 100644 --- a/clients/stellar-relay-lib/src/connection/error.rs +++ b/clients/stellar-relay-lib/src/connection/error.rs @@ -1,8 +1,8 @@ #![allow(dead_code)] //todo: remove after being tested and implemented +use async_std::channel::SendError; use crate::{connection::xdr_converter::Error as XDRError, helper::error_to_string}; use substrate_stellar_sdk::{types::ErrorCode, StellarSdkError}; -use tokio::sync; #[derive(Debug, err_derive::Error)] pub enum Error { @@ -52,7 +52,7 @@ pub enum Error { XDRConversionError(XDRError), #[error(display = "{:?}", _0)] - StdIOError(std::io::Error), + TcpStreamError(async_std::io::Error), #[error(display = "{:?}", _0)] StellarSdkError(StellarSdkError), @@ -73,9 +73,9 @@ pub enum Error { VersionStrTooLong, } -impl From for Error { - fn from(e: std::io::Error) -> Self { - Error::StdIOError(e) +impl From for Error { + fn from(e: async_std::io::Error) -> Self { + Error::TcpStreamError(e) } } @@ -85,8 +85,8 @@ impl From for Error { } } -impl From> for Error { - fn from(e: sync::mpsc::error::SendError) -> Self { +impl From> for Error { + fn from(e: SendError) -> Self { Error::SendFailed(e.to_string()) } } diff --git a/clients/stellar-relay-lib/src/overlay.rs b/clients/stellar-relay-lib/src/overlay.rs index fcf0e440d..54b7170d9 100644 --- a/clients/stellar-relay-lib/src/overlay.rs +++ b/clients/stellar-relay-lib/src/overlay.rs @@ -1,11 +1,13 @@ +use async_std::channel::{Receiver, Sender, SendError, TryRecvError}; use substrate_stellar_sdk::types::{ErrorCode, StellarMessage}; -use tokio::sync::{ - mpsc, - mpsc::{ - error::{SendError, TryRecvError}, - Sender, - }, -}; +// use tokio::sync::{ +// mpsc, +// mpsc::{ +// error::{SendError, TryRecvError}, +// Sender, +// }, +// }; +use async_std::task; use crate::{ connection::{poll_messages_from_stellar, ConnectionInfo, Connector}, @@ -16,8 +18,8 @@ use crate::{ /// Used to send/receive messages to/from Stellar Node pub struct StellarOverlayConnection { - sender: mpsc::Sender, - receiver: mpsc::Receiver, + sender: Sender, + receiver: Receiver, } impl StellarOverlayConnection { @@ -37,17 +39,24 @@ impl StellarOverlayConnection { log::info!("connect(): connecting to {conn_info:?}"); // this is a channel to communicate with the user/caller. - let (send_to_user_sender, send_to_user_receiver) = mpsc::channel::(1024); + let (send_to_user_sender, send_to_user_receiver) = + async_std::channel::bounded::(1024); - let (send_to_node_sender, send_to_node_receiver) = mpsc::channel::(1024); + let (send_to_node_sender, send_to_node_receiver) = + async_std::channel::bounded::(1024); let connector = Connector::start(local_node_info, conn_info).await?; - tokio::spawn(poll_messages_from_stellar( - connector, - send_to_user_sender, - send_to_node_receiver, - )); + task::spawn(async { + poll_messages_from_stellar( + connector, + send_to_user_sender, + send_to_node_receiver, + ).await; + + log::info!("connect(): poll_messages_from_stellar() finished."); + + }); Ok(StellarOverlayConnection { sender: send_to_node_sender, @@ -59,9 +68,11 @@ impl StellarOverlayConnection { loop { if !self.is_alive() { self.disconnect(); + log::info!("listen(): return disconnect message"); return Err(Error::Disconnected) } + match self.receiver.try_recv() { Ok(StellarMessage::ErrorMsg(e)) => { log::error!("listen(): received error message: {e:?}"); @@ -72,14 +83,20 @@ impl StellarOverlayConnection { return Ok(None) }, Ok(msg) => return Ok(Some(msg)), - Err(TryRecvError::Disconnected) => return Err(Error::Disconnected), - Err(TryRecvError::Empty) => continue, + Err(TryRecvError::Closed) => { + log::info!("listen(): receiver was closed."); + return Err(Error::Disconnected); + }, + Err(TryRecvError::Empty) => { + continue; + }, } } } pub fn is_alive(&mut self) -> bool { - let is_closed = self.sender.is_closed(); + let mut is_closed = self.sender.is_closed(); + is_closed = is_closed || self.receiver.is_closed(); if is_closed { self.disconnect(); diff --git a/clients/stellar-relay-lib/src/tests/mod.rs b/clients/stellar-relay-lib/src/tests/mod.rs index 0bd51ff45..1c3c1d4fe 100644 --- a/clients/stellar-relay-lib/src/tests/mod.rs +++ b/clients/stellar-relay-lib/src/tests/mod.rs @@ -3,11 +3,13 @@ use crate::{ }; use serial_test::serial; use std::{sync::Arc, thread::sleep, time::Duration}; +use async_std::future::timeout; +use async_std::sync::Mutex; use substrate_stellar_sdk::{ types::{ScpStatementExternalize, ScpStatementPledges, StellarMessage}, Hash, IntoHash, }; -use tokio::{sync::Mutex, time::timeout}; + fn secret_key(is_mainnet: bool) -> String { let path = if is_mainnet { @@ -41,9 +43,13 @@ fn overlay_infos(is_mainnet: bool) -> (NodeInfo, ConnectionInfo) { ) } -#[tokio::test(flavor = "multi_thread")] +#[async_std::test] #[serial] async fn stellar_overlay_should_receive_scp_messages() { + // let it run for a second, making sure that the other tests have successfully shutdown + // their connection to Stellar Node + sleep(Duration::from_secs(3)); + let (node_info, conn_info) = overlay_infos(false); let overlay_connection = Arc::new(Mutex::new( @@ -70,9 +76,14 @@ async fn stellar_overlay_should_receive_scp_messages() { assert!(!scps_vec.lock().await.is_empty()); } -#[tokio::test(flavor = "multi_thread")] +#[async_std::test] #[serial] async fn stellar_overlay_should_receive_tx_set() { + env_logger::init(); + // let it run for a second, making sure that the other tests have successfully shutdown + // their connection to Stellar Node + sleep(Duration::from_secs(3)); + //arrange fn get_tx_set_hash(x: &ScpStatementExternalize) -> Hash { let scp_value = x.commit.value.get_vec(); @@ -108,14 +119,14 @@ async fn stellar_overlay_should_receive_tx_set() { let tx_set_hash = set.into_hash().expect("should return a hash"); actual_tx_set_hashes_clone.lock().await.push(tx_set_hash); - ov_conn_locked.disconnect(); + //ov_conn_locked.disconnect(); break }, StellarMessage::GeneralizedTxSet(set) => { let tx_set_hash = set.into_hash().expect("should return a hash"); actual_tx_set_hashes_clone.lock().await.push(tx_set_hash); - ov_conn_locked.disconnect(); + //ov_conn_locked.disconnect(); break }, _ => {}, @@ -135,9 +146,13 @@ async fn stellar_overlay_should_receive_tx_set() { assert!(expected_hashes.contains(&actual_hashes[0])) } -#[tokio::test(flavor = "multi_thread")] +#[async_std::test] #[serial] async fn stellar_overlay_disconnect_works() { + // let it run for a second, making sure that the other tests have successfully shutdown + // their connection to Stellar Node + sleep(Duration::from_secs(3)); + let (node_info, conn_info) = overlay_infos(false); let mut overlay_connection = diff --git a/clients/vault/Cargo.toml b/clients/vault/Cargo.toml index f91f12b06..4dfd5c6a7 100644 --- a/clients/vault/Cargo.toml +++ b/clients/vault/Cargo.toml @@ -20,6 +20,7 @@ parachain-metadata-foucoco = ["runtime/parachain-metadata-foucoco"] integration-test = ["runtime/standalone-metadata", "integration"] [dependencies] +async-std = "1.12.0" async-trait = "0.1.40" base64 = { version = '0.13.0', default-features = false, features = ['alloc'] } bincode = "1.3.3" diff --git a/clients/vault/src/oracle/agent.rs b/clients/vault/src/oracle/agent.rs index 8945bf878..8a2ce261d 100644 --- a/clients/vault/src/oracle/agent.rs +++ b/clients/vault/src/oracle/agent.rs @@ -2,9 +2,11 @@ use std::{sync::Arc, time::Duration}; use service::on_shutdown; use tokio::{ - sync::{mpsc, mpsc::error::TryRecvError, RwLock}, + sync::{ RwLock}, time::{sleep, timeout}, }; +use tokio::sync::mpsc; +use tokio::sync::mpsc::error::TryRecvError; use runtime::ShutdownSender; use stellar_relay_lib::{ @@ -24,6 +26,7 @@ pub struct OracleAgent { pub is_public_network: bool, message_sender: Option, shutdown_sender: ShutdownSender, + disconnect_signal_sender: mpsc::UnboundedSender<()>, } /// listens to data to collect the scp messages and txsets. @@ -76,13 +79,20 @@ pub async fn start_oracle_agent( ))); let collector_clone = collector.clone(); - let shutdown_sender_clone = shutdown_sender.clone(); // a clone used to forcefully call a shutdown, when StellarOverlay disconnects. let shutdown_sender_clone2 = shutdown_sender.clone(); // disconnect signal sender tells the StellarOverlayConnection to close its TcpStream to Stellar // Node - let (disconnect_signal_sender, mut disconnect_signal_receiver) = mpsc::channel::<()>(2); + let (disconnect_signal_sender, mut disconnect_signal_receiver) = mpsc::unbounded_channel::<()>(); + let disconnect_signal_sender_clone = disconnect_signal_sender.clone(); + + tokio::spawn(on_shutdown(shutdown_sender.clone(), async move { + tracing::info!("start_oracle_agent(): sending signal to shutdown overlay connection..."); + if let Err(e) = disconnect_signal_sender.send(()) { + tracing::warn!("start_oracle_agent(): failed to send disconnect signal: {e:?}"); + } + })); service::spawn_cancelable(shutdown_sender_clone.subscribe(), async move { let sender_clone = overlay_conn.sender(); @@ -91,7 +101,7 @@ pub async fn start_oracle_agent( // if a disconnect signal was sent, disconnect from Stellar. Ok(_) | Err(TryRecvError::Disconnected) => { tracing::info!("start_oracle_agent(): disconnect overlay..."); - overlay_conn.disconnect(); + //overlay_conn.disconnect(); break }, Err(TryRecvError::Empty) => {}, @@ -112,7 +122,7 @@ pub async fn start_oracle_agent( Ok(None) => {}, // connection got lost Err(e) => { - overlay_conn.disconnect(); + //overlay_conn.disconnect(); tracing::error!("start_oracle_agent(): encounter error in overlay: {e:?}"); if let Err(e) = shutdown_sender_clone2.send(()) { @@ -124,23 +134,34 @@ pub async fn start_oracle_agent( }, } } + overlay_conn.disconnect(); + tracing::info!("start_oracle_agent(): ------- THE SPAWNED THREAD HAS STOPPED --------"); }); - tokio::spawn(on_shutdown(shutdown_sender.clone(), async move { - tracing::info!("start_oracle_agent(): sending signal to shutdown overlay connection..."); - if let Err(e) = disconnect_signal_sender.send(()).await { - tracing::warn!("start_oracle_agent(): failed to send disconnect signal: {e:?}"); - } - })); - Ok(OracleAgent { collector, is_public_network: false, message_sender: Some(sender), shutdown_sender, + disconnect_signal_sender: disconnect_signal_sender_clone }) } +impl Drop for OracleAgent { + fn drop(&mut self) { + tracing::info!("OracleAgent: Dropping OracleAgent..."); + if let Err(e) = self.shutdown_sender.send(()) { + tracing::error!("stop(): Failed to send shutdown signal in OracleAgent: {:?}", e); + } + + if let Err(e) = self.disconnect_signal_sender.send(()) { + tracing::warn!("start_oracle_agent(): failed to send disconnect signal: {e:?}"); + } else { + tracing::info!("start_oracle_agent(): message was sent!!!"); + } + } +} + impl OracleAgent { /// This method returns the proof for a given slot or an error if the proof cannot be provided. /// The agent will try every possible way to get the proof before returning an error. @@ -191,14 +212,6 @@ impl OracleAgent { self.collector.read().await.remove_data(slot); } - /// Stops listening for new SCP messages. - pub fn stop(&self) -> Result<(), Error> { - tracing::info!("stop(): Shutting down OracleAgent..."); - if let Err(e) = self.shutdown_sender.send(()) { - tracing::error!("stop(): Failed to send shutdown signal in OracleAgent: {:?}", e); - } - Ok(()) - } } #[cfg(test)] @@ -266,8 +279,6 @@ mod tests { // These might return an error if the file does not exist, but that's fine. let _ = scp_archive_storage.remove_file(target_slot); let _ = tx_archive_storage.remove_file(target_slot); - - agent.stop().expect("Failed to stop the agent"); } #[tokio::test(flavor = "multi_thread")] @@ -303,7 +314,6 @@ mod tests { let _ = scp_archive_storage.remove_file(target_slot); let _ = tx_archive_storage.remove_file(target_slot); - agent.stop().expect("Failed to stop the agent"); } #[tokio::test(flavor = "multi_thread")] @@ -324,6 +334,8 @@ mod tests { // This slot should be archived on the public network let target_slot = 44041116; + tracing::info!("let's sleep for 3 seconds,to get the network up and running"); + sleep(Duration::from_secs(3)).await; let proof_result = agent.get_proof(target_slot).await; assert!(matches!(proof_result, Err(Error::ProofTimeout(_)))); @@ -332,7 +344,5 @@ mod tests { let _ = scp_archive_storage.remove_file(target_slot); let _ = tx_archive_storage.remove_file(target_slot); - println!("HOY PLEAAASE"); - agent.stop().expect("Failed to stop the agent"); } } diff --git a/clients/vault/src/oracle/collector/handler.rs b/clients/vault/src/oracle/collector/handler.rs index 8b788381f..b4021b55e 100644 --- a/clients/vault/src/oracle/collector/handler.rs +++ b/clients/vault/src/oracle/collector/handler.rs @@ -36,7 +36,8 @@ impl ScpMessageCollector { tracing::debug!( "Handling Incoming ScpEnvelopes for slot {slot}: requesting TxSet..." ); - message_sender.send(StellarMessage::GetTxSet(txset_hash)).await?; + message_sender.send(StellarMessage::GetTxSet(txset_hash)).await + .map_err(|e|Error::Other(format!("Send Error: {e:?}")))?; // let's save this for creating the proof later on. self.save_txset_hash_and_slot(txset_hash, slot); diff --git a/clients/vault/src/oracle/errors.rs b/clients/vault/src/oracle/errors.rs index 569879dbe..b3e8ef133 100644 --- a/clients/vault/src/oracle/errors.rs +++ b/clients/vault/src/oracle/errors.rs @@ -30,6 +30,7 @@ pub enum Error { #[error(display = "{:?}", _0)] ProofTimeout(String), + #[error(display = "{} is not initialized", _0)] Uninitialized(String), diff --git a/clients/vault/src/oracle/types/types.rs b/clients/vault/src/oracle/types/types.rs index 5424a2060..8dc937e50 100644 --- a/clients/vault/src/oracle/types/types.rs +++ b/clients/vault/src/oracle/types/types.rs @@ -1,8 +1,7 @@ #![allow(non_snake_case)] use std::collections::BTreeMap; - -use tokio::sync::mpsc; +use async_std::channel::Sender; use stellar_relay_lib::sdk::types::{Hash, StellarMessage, Uint64}; @@ -13,7 +12,7 @@ pub type Filename = String; pub type SerializedData = Vec; -pub type StellarMessageSender = mpsc::Sender; +pub type StellarMessageSender = Sender; /// For easy writing to file. BTreeMap to preserve order of the slots. pub(crate) type SlotEncodedMap = BTreeMap; From b9e0dccbf97f888e96d16d07d726de130db8e7a7 Mon Sep 17 00:00:00 2001 From: Gianfranco Date: Tue, 23 Jan 2024 13:27:36 -0300 Subject: [PATCH 02/10] works, but still in progress --- Cargo.lock | 15 +++++ clients/stellar-relay-lib/Cargo.toml | 14 ++--- clients/stellar-relay-lib/examples/connect.rs | 2 +- .../src/connection/connector/connector.rs | 14 ++--- .../connection/connector/message_reader.rs | 50 ++++++++++------ clients/stellar-relay-lib/src/overlay.rs | 57 +++++++------------ clients/vault/src/oracle/agent.rs | 11 ++-- clients/vault/src/oracle/types/types.rs | 7 ++- 8 files changed, 94 insertions(+), 76 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d734e5851..a053f8bae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -403,6 +403,19 @@ dependencies = [ "futures-core", ] +[[package]] +name = "async-compat" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f68a707c1feb095d8c07f8a65b9f506b117d30af431cab89374357de7c11461b" +dependencies = [ + "futures-core", + "futures-io", + "once_cell", + "pin-project-lite 0.2.10", + "tokio", +] + [[package]] name = "async-executor" version = "1.6.0" @@ -10076,6 +10089,7 @@ dependencies = [ name = "stellar-relay-lib" version = "1.0.3" dependencies = [ + "async-compat", "async-std", "base64 0.13.1", "env_logger 0.9.3", @@ -10090,6 +10104,7 @@ dependencies = [ "serial_test", "sha2 0.10.7", "substrate-stellar-sdk", + "tokio", "tweetnacl", ] diff --git a/clients/stellar-relay-lib/Cargo.toml b/clients/stellar-relay-lib/Cargo.toml index a80b6dcce..60878cd24 100644 --- a/clients/stellar-relay-lib/Cargo.toml +++ b/clients/stellar-relay-lib/Cargo.toml @@ -33,13 +33,13 @@ substrate-stellar-sdk = {git = "https://github.com/pendulum-chain/substrate-stel err-derive = "0.3.1" -#tokio = { version = "1.0", features = [ -# "macros", # allows main function to be async -# "rt-multi-thread", # for multi-thread runtime -# "sync", # to make channels available -# "time" # for timeouts and sleep, when reconnecting -#] } - +tokio = { version = "1.0", features = [ + "macros", # allows main function to be async + "rt-multi-thread", # for multi-thread runtime + "sync", # to make channels available + "time" # for timeouts and sleep, when reconnecting +] } +async-compat = "0.2.3" async-std = { version = "1.12.0", features = ["attributes"] } [features] diff --git a/clients/stellar-relay-lib/examples/connect.rs b/clients/stellar-relay-lib/examples/connect.rs index 817a23330..9dd308798 100644 --- a/clients/stellar-relay-lib/examples/connect.rs +++ b/clients/stellar-relay-lib/examples/connect.rs @@ -4,7 +4,7 @@ use stellar_relay_lib::{ StellarOverlayConfig, }; -#[async_std::main] +#[tokio::main] async fn main() -> Result<(), Box> { env_logger::init(); diff --git a/clients/stellar-relay-lib/src/connection/connector/connector.rs b/clients/stellar-relay-lib/src/connection/connector/connector.rs index be4a34c80..d566de5cd 100644 --- a/clients/stellar-relay-lib/src/connection/connector/connector.rs +++ b/clients/stellar-relay-lib/src/connection/connector/connector.rs @@ -303,7 +303,7 @@ mod test { (node_info, conn_info, connector) } - #[async_std::test] + #[tokio::test] #[serial] async fn create_new_connector_works() { let (node_info, _, connector) = create_connector().await; @@ -317,7 +317,7 @@ mod test { assert_eq!(connector_local_node.network_id, node_info.network_id); } - #[async_std::test] + #[tokio::test] #[serial] async fn connector_local_sequence_works() { let (_, _, mut connector) = create_connector().await; @@ -328,7 +328,7 @@ mod test { } - #[async_std::test] + #[tokio::test] #[serial] async fn connector_set_remote_works() { let (_, _, mut connector) = create_connector().await; @@ -354,7 +354,7 @@ mod test { } - #[async_std::test] + #[tokio::test] #[serial] async fn connector_increment_remote_sequence_works() { let (_, _, mut connector) = create_connector().await; @@ -384,7 +384,7 @@ mod test { } - #[async_std::test] + #[tokio::test] #[serial] async fn connector_get_and_set_hmac_keys_works() { @@ -423,7 +423,7 @@ mod test { } - #[async_std::test] + #[tokio::test] #[serial] async fn connector_method_works() { env_logger::init(); @@ -441,7 +441,7 @@ mod test { assert!(connector.is_handshake_created()); } - #[async_std::test] + #[tokio::test] #[serial] async fn enable_flow_controller_works() { diff --git a/clients/stellar-relay-lib/src/connection/connector/message_reader.rs b/clients/stellar-relay-lib/src/connection/connector/message_reader.rs index 26d591154..d6ac39f38 100644 --- a/clients/stellar-relay-lib/src/connection/connector/message_reader.rs +++ b/clients/stellar-relay-lib/src/connection/connector/message_reader.rs @@ -1,9 +1,12 @@ use std::net::Shutdown; -use async_std::channel::{Receiver, Sender, TryRecvError}; + use async_std::io::ReadExt; +use async_compat::{Compat, CompatExt}; use crate::connection::{xdr_converter::get_xdr_message_length, Connector, Error, Xdr}; use substrate_stellar_sdk::{types::StellarMessage, XdrCodec}; - +use tokio::io::AsyncReadExt; +use tokio::time::{sleep, Duration}; +use tokio::sync::{mpsc, mpsc::error::TryRecvError}; // use tokio::sync::{mpsc, mpsc::error::TryRecvError}; /// Polls for messages coming from the Stellar Node and communicates it back to the user @@ -15,12 +18,13 @@ use substrate_stellar_sdk::{types::StellarMessage, XdrCodec}; /// stream. pub(crate) async fn poll_messages_from_stellar( mut connector: Connector, - send_to_user_sender: Sender, - send_to_node_receiver: Receiver, + send_to_user_sender: mpsc::Sender, + mut send_to_node_receiver: mpsc::Receiver, ) { log::info!("poll_messages_from_stellar(): started."); loop { + log::info!("poll loop"); if send_to_user_sender.is_closed() { log::info!("poll_messages_from_stellar(): closing receiver during disconnection"); // close this channel as communication to user was closed. @@ -33,8 +37,8 @@ pub(crate) async fn poll_messages_from_stellar( if let Err(e) = connector.send_to_node(msg).await { log::error!("poll_messages_from_stellar(): Error occurred during sending message to node: {e:?}"); }, - Err(TryRecvError::Closed) => { - log::trace!("poll_messages_from_stellar(): Recv channel (for sending message to node) got disconnected."); + Err(TryRecvError::Disconnected) => { + log::info!("poll_messages_from_stellar(): Recv channel (for sending message to node) got disconnected."); break }, Err(TryRecvError::Empty) => {}, @@ -49,22 +53,29 @@ pub(crate) async fn poll_messages_from_stellar( Ok(xdr) => xdr, }; + log::info!("last message recieved {:?}", xdr); + match connector.process_raw_message(xdr).await { Ok(Some(stellar_msg)) => - // push message to user + { // push message to user if let Err(e) = send_to_user_sender.send(stellar_msg.clone()).await { log::warn!("poll_messages_from_stellar(): Error occurred during sending message {} to user: {e:?}", String::from_utf8(stellar_msg.to_base64_xdr()) .unwrap_or_else(|_| format!("{:?}", stellar_msg.to_base64_xdr())) ); - }, + } + + }, Ok(None) => {}, Err(e) => { log::error!("poll_messages_from_stellar(): Error occurred during processing xdr message: {e:?}"); - break + break; }, } + + } + log::info!("exiting poll messages from stellar loop"); // make sure to shutdown the stream if let Err(e) = connector.tcp_stream.shutdown(Shutdown::Both) { @@ -82,15 +93,19 @@ async fn read_message_from_stellar(connector: &mut Connector) -> Result = vec![]; let mut buff_for_reading = vec![0; 4]; + let mut compat_stream = Compat::new(connector.tcp_stream.clone()); loop { - + //log::info!("read loop"); // check whether or not we should read the bytes as: // 1. the length of the next stellar message // 2. the remaining bytes of the previous stellar message - match connector.tcp_stream.read(&mut buff_for_reading).await { + match compat_stream.read(&mut buff_for_reading).await { Ok(size) if size == 0 => { - async_std::task::yield_now().await; - continue + //let empty_vec = vec![]; + //return Ok(empty_vec); + tokio::task::yield_now().await; + sleep(Duration::from_millis(100)).await; + continue; }, Ok(_) if lack_bytes_from_prev == 0 => { // if there are no more bytes lacking from the previous message, @@ -103,7 +118,8 @@ async fn read_message_from_stellar(connector: &mut Connector) -> Result Result { - async_std::task::yield_now().await; + //tokio::task::yield_now().await; + sleep(Duration::from_millis(100)).await; continue }, Ok(Some(xdr)) => return Ok(xdr), @@ -136,7 +153,8 @@ async fn read_message_from_stellar(connector: &mut Connector) -> Result { - async_std::task::yield_now().await; + //tokio::task::yield_now().await; + sleep(Duration::from_millis(100)).await; continue }, Ok(Some(xdr)) => return Ok(xdr), diff --git a/clients/stellar-relay-lib/src/overlay.rs b/clients/stellar-relay-lib/src/overlay.rs index 54b7170d9..d750bbc49 100644 --- a/clients/stellar-relay-lib/src/overlay.rs +++ b/clients/stellar-relay-lib/src/overlay.rs @@ -1,13 +1,11 @@ -use async_std::channel::{Receiver, Sender, SendError, TryRecvError}; use substrate_stellar_sdk::types::{ErrorCode, StellarMessage}; -// use tokio::sync::{ -// mpsc, -// mpsc::{ -// error::{SendError, TryRecvError}, -// Sender, -// }, -// }; -use async_std::task; +use tokio::sync::{ + mpsc, + mpsc::{ + error::{SendError, TryRecvError}, + Sender, + }, +}; use crate::{ connection::{poll_messages_from_stellar, ConnectionInfo, Connector}, @@ -18,8 +16,8 @@ use crate::{ /// Used to send/receive messages to/from Stellar Node pub struct StellarOverlayConnection { - sender: Sender, - receiver: Receiver, + sender: mpsc::Sender, + receiver: mpsc::Receiver, } impl StellarOverlayConnection { @@ -39,24 +37,17 @@ impl StellarOverlayConnection { log::info!("connect(): connecting to {conn_info:?}"); // this is a channel to communicate with the user/caller. - let (send_to_user_sender, send_to_user_receiver) = - async_std::channel::bounded::(1024); + let (send_to_user_sender, send_to_user_receiver) = mpsc::channel::(1024); - let (send_to_node_sender, send_to_node_receiver) = - async_std::channel::bounded::(1024); + let (send_to_node_sender, send_to_node_receiver) = mpsc::channel::(1024); let connector = Connector::start(local_node_info, conn_info).await?; - task::spawn(async { - poll_messages_from_stellar( - connector, - send_to_user_sender, - send_to_node_receiver, - ).await; - - log::info!("connect(): poll_messages_from_stellar() finished."); - - }); + tokio::spawn(poll_messages_from_stellar( + connector, + send_to_user_sender, + send_to_node_receiver, + )); Ok(StellarOverlayConnection { sender: send_to_node_sender, @@ -68,11 +59,9 @@ impl StellarOverlayConnection { loop { if !self.is_alive() { self.disconnect(); - log::info!("listen(): return disconnect message"); return Err(Error::Disconnected) } - match self.receiver.try_recv() { Ok(StellarMessage::ErrorMsg(e)) => { log::error!("listen(): received error message: {e:?}"); @@ -83,20 +72,14 @@ impl StellarOverlayConnection { return Ok(None) }, Ok(msg) => return Ok(Some(msg)), - Err(TryRecvError::Closed) => { - log::info!("listen(): receiver was closed."); - return Err(Error::Disconnected); - }, - Err(TryRecvError::Empty) => { - continue; - }, + Err(TryRecvError::Disconnected) => return Err(Error::Disconnected), + Err(TryRecvError::Empty) => continue, } } } pub fn is_alive(&mut self) -> bool { - let mut is_closed = self.sender.is_closed(); - is_closed = is_closed || self.receiver.is_closed(); + let is_closed = self.sender.is_closed(); if is_closed { self.disconnect(); @@ -115,4 +98,4 @@ impl Drop for StellarOverlayConnection { fn drop(&mut self) { self.disconnect(); } -} +} \ No newline at end of file diff --git a/clients/vault/src/oracle/agent.rs b/clients/vault/src/oracle/agent.rs index 8a2ce261d..0c673901c 100644 --- a/clients/vault/src/oracle/agent.rs +++ b/clients/vault/src/oracle/agent.rs @@ -80,6 +80,7 @@ pub async fn start_oracle_agent( let collector_clone = collector.clone(); // a clone used to forcefully call a shutdown, when StellarOverlay disconnects. + let shutdown_sender_clone = shutdown_sender.clone(); let shutdown_sender_clone2 = shutdown_sender.clone(); // disconnect signal sender tells the StellarOverlayConnection to close its TcpStream to Stellar @@ -101,7 +102,7 @@ pub async fn start_oracle_agent( // if a disconnect signal was sent, disconnect from Stellar. Ok(_) | Err(TryRecvError::Disconnected) => { tracing::info!("start_oracle_agent(): disconnect overlay..."); - //overlay_conn.disconnect(); + overlay_conn.disconnect(); break }, Err(TryRecvError::Empty) => {}, @@ -150,9 +151,9 @@ pub async fn start_oracle_agent( impl Drop for OracleAgent { fn drop(&mut self) { tracing::info!("OracleAgent: Dropping OracleAgent..."); - if let Err(e) = self.shutdown_sender.send(()) { - tracing::error!("stop(): Failed to send shutdown signal in OracleAgent: {:?}", e); - } + // if let Err(e) = self.shutdown_sender.send(()) { + // tracing::error!("stop(): Failed to send shutdown signal in OracleAgent: {:?}", e); + // } if let Err(e) = self.disconnect_signal_sender.send(()) { tracing::warn!("start_oracle_agent(): failed to send disconnect signal: {e:?}"); @@ -174,7 +175,7 @@ impl OracleAgent { let collector = self.collector.clone(); #[cfg(test)] - let timeout_seconds = 180; + let timeout_seconds = 60; #[cfg(not(test))] let timeout_seconds = 60; diff --git a/clients/vault/src/oracle/types/types.rs b/clients/vault/src/oracle/types/types.rs index 8dc937e50..39b5c4e42 100644 --- a/clients/vault/src/oracle/types/types.rs +++ b/clients/vault/src/oracle/types/types.rs @@ -1,7 +1,8 @@ #![allow(non_snake_case)] use std::collections::BTreeMap; -use async_std::channel::Sender; + +use tokio::sync::mpsc; use stellar_relay_lib::sdk::types::{Hash, StellarMessage, Uint64}; @@ -12,9 +13,9 @@ pub type Filename = String; pub type SerializedData = Vec; -pub type StellarMessageSender = Sender; +pub type StellarMessageSender = mpsc::Sender; /// For easy writing to file. BTreeMap to preserve order of the slots. pub(crate) type SlotEncodedMap = BTreeMap; -pub(crate) type SlotList = BTreeMap; +pub(crate) type SlotList = BTreeMap; \ No newline at end of file From e11d75b96b2a765656167689f29de6cfa0d806fe Mon Sep 17 00:00:00 2001 From: Gianfranco Date: Tue, 23 Jan 2024 16:16:33 -0300 Subject: [PATCH 03/10] working test withouth extra signals --- Cargo.lock | 14 ---- clients/stellar-relay-lib/Cargo.toml | 1 - .../src/connection/connector/connector.rs | 66 +++++++-------- .../connection/connector/message_reader.rs | 81 ++++++++++-------- .../connection/connector/message_sender.rs | 16 ++-- .../stellar-relay-lib/src/connection/error.rs | 2 +- clients/stellar-relay-lib/src/overlay.rs | 11 +-- clients/stellar-relay-lib/src/tests/mod.rs | 10 +-- clients/vault/src/oracle/agent.rs | 84 +++++++++---------- clients/vault/src/oracle/collector/handler.rs | 6 +- clients/vault/src/oracle/errors.rs | 1 - clients/vault/src/oracle/types/types.rs | 2 +- 12 files changed, 137 insertions(+), 157 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a053f8bae..6a2563ea7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -403,19 +403,6 @@ dependencies = [ "futures-core", ] -[[package]] -name = "async-compat" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f68a707c1feb095d8c07f8a65b9f506b117d30af431cab89374357de7c11461b" -dependencies = [ - "futures-core", - "futures-io", - "once_cell", - "pin-project-lite 0.2.10", - "tokio", -] - [[package]] name = "async-executor" version = "1.6.0" @@ -10089,7 +10076,6 @@ dependencies = [ name = "stellar-relay-lib" version = "1.0.3" dependencies = [ - "async-compat", "async-std", "base64 0.13.1", "env_logger 0.9.3", diff --git a/clients/stellar-relay-lib/Cargo.toml b/clients/stellar-relay-lib/Cargo.toml index 60878cd24..5446a6534 100644 --- a/clients/stellar-relay-lib/Cargo.toml +++ b/clients/stellar-relay-lib/Cargo.toml @@ -39,7 +39,6 @@ tokio = { version = "1.0", features = [ "sync", # to make channels available "time" # for timeouts and sleep, when reconnecting ] } -async-compat = "0.2.3" async-std = { version = "1.12.0", features = ["attributes"] } [features] diff --git a/clients/stellar-relay-lib/src/connection/connector/connector.rs b/clients/stellar-relay-lib/src/connection/connector/connector.rs index d566de5cd..4bee01b36 100644 --- a/clients/stellar-relay-lib/src/connection/connector/connector.rs +++ b/clients/stellar-relay-lib/src/connection/connector/connector.rs @@ -1,9 +1,8 @@ +use async_std::net::TcpStream; use std::{ fmt::{Debug, Formatter}, - // net::TcpStream, + net::Shutdown, }; -use std::net::Shutdown; -use async_std::net::TcpStream; use substrate_stellar_sdk::{ types::{AuthenticatedMessageV0, Curve25519Public, HmacSha256Mac, MessageType}, XdrCodec, @@ -37,7 +36,7 @@ pub struct Connector { flow_controller: FlowController, /// for writing/reading xdr messages to/from Stellar Node. - pub(crate) tcp_stream: TcpStream + pub(crate) tcp_stream: TcpStream, } impl Debug for Connector { @@ -53,30 +52,36 @@ impl Debug for Connector { .field("receive_scp_messages", &self.receive_scp_messages) .field("handshake_state", &self.handshake_state) .field("flow_controller", &self.flow_controller) - .field("local_addr", - &self.tcp_stream.local_addr() - .map(|addr| addr.to_string()) - .unwrap_or("cannot provide".to_string()) + .field( + "local_addr", + &self + .tcp_stream + .local_addr() + .map(|addr| addr.to_string()) + .unwrap_or("cannot provide".to_string()), ) - .field("peer_addr", - &self.tcp_stream.peer_addr() - .map(|addr| addr.to_string()) - .unwrap_or("cannot provide".to_string()) + .field( + "peer_addr", + &self + .tcp_stream + .peer_addr() + .map(|addr| addr.to_string()) + .unwrap_or("cannot provide".to_string()), ) .finish() } } -impl Drop for Connector { - fn drop(&mut self) { - if let Err(e) = self.tcp_stream.shutdown(Shutdown::Both) { - log::error!("drop(): failed to shutdown tcp stream: {}", e); - } else { - log::info!("drop(): tcp stream successfully shutdown"); - } +// impl Drop for Connector { +// fn drop(&mut self) { +// if let Err(e) = self.tcp_stream.shutdown(Shutdown::Both) { +// log::error!("drop(): failed to shutdown tcp stream: {}", e); +// } else { +// log::info!("drop(): tcp stream successfully shutdown"); +// } - } -} +// } +// } impl Connector { /// Verifies the AuthenticatedMessage, received from the Stellar Node @@ -137,7 +142,8 @@ impl Connector { /// returns a Connector and starts creating a connection to Stellar pub async fn start(local_node: NodeInfo, conn_info: ConnectionInfo) -> Result { // Create the stream - let tcp_stream = TcpStream::connect(conn_info.address()).await + let tcp_stream = TcpStream::connect(conn_info.address()) + .await .map_err(|e| Error::ConnectionFailed(e.to_string()))?; let connection_auth = ConnectionAuth::new( @@ -157,7 +163,7 @@ impl Connector { receive_scp_messages: conn_info.recv_scp_msgs, handshake_state: HandshakeState::Connecting, flow_controller: FlowController::default(), - tcp_stream + tcp_stream, }; // To start the handshake, send a hello message to Stellar @@ -324,8 +330,6 @@ mod test { assert_eq!(connector.local_sequence(), 0); connector.increment_local_sequence(); assert_eq!(connector.local_sequence(), 1); - - } #[tokio::test] @@ -350,8 +354,6 @@ mod test { connector.set_remote(RemoteInfo::new(&hello)); assert!(connector.remote().is_some()); - - } #[tokio::test] @@ -380,14 +382,11 @@ mod test { connector.increment_remote_sequence().unwrap(); connector.increment_remote_sequence().unwrap(); assert_eq!(connector.remote().unwrap().sequence(), 3); - - } #[tokio::test] #[serial] async fn connector_get_and_set_hmac_keys_works() { - //arrange let (_, _, mut connector) = create_connector().await; let connector_auth = &connector.connection_auth; @@ -419,15 +418,13 @@ mod test { )); //assert assert!(connector.hmac_keys().is_some()); - - } #[tokio::test] #[serial] async fn connector_method_works() { env_logger::init(); - + let (_, conn_config, mut connector) = create_connector().await; assert_eq!(connector.remote_called_us(), conn_config.remote_called_us); @@ -444,12 +441,9 @@ mod test { #[tokio::test] #[serial] async fn enable_flow_controller_works() { - let (node_info, _, mut connector) = create_connector().await; assert!(!connector.inner_check_to_send_more(MessageType::ScpMessage)); connector.enable_flow_controller(node_info.overlay_version, node_info.overlay_version); - - } } diff --git a/clients/stellar-relay-lib/src/connection/connector/message_reader.rs b/clients/stellar-relay-lib/src/connection/connector/message_reader.rs index d6ac39f38..193b8079a 100644 --- a/clients/stellar-relay-lib/src/connection/connector/message_reader.rs +++ b/clients/stellar-relay-lib/src/connection/connector/message_reader.rs @@ -1,13 +1,13 @@ use std::net::Shutdown; -use async_std::io::ReadExt; -use async_compat::{Compat, CompatExt}; use crate::connection::{xdr_converter::get_xdr_message_length, Connector, Error, Xdr}; +use async_std::io::ReadExt; use substrate_stellar_sdk::{types::StellarMessage, XdrCodec}; -use tokio::io::AsyncReadExt; -use tokio::time::{sleep, Duration}; -use tokio::sync::{mpsc, mpsc::error::TryRecvError}; -// use tokio::sync::{mpsc, mpsc::error::TryRecvError}; +use tokio::{ + io::AsyncReadExt, + sync::{mpsc, mpsc::error::TryRecvError}, + time::{sleep, Duration}, +}; /// Polls for messages coming from the Stellar Node and communicates it back to the user /// @@ -22,6 +22,7 @@ pub(crate) async fn poll_messages_from_stellar( mut send_to_node_receiver: mpsc::Receiver, ) { log::info!("poll_messages_from_stellar(): started."); + let mut counter = 0; loop { log::info!("poll loop"); @@ -31,6 +32,9 @@ pub(crate) async fn poll_messages_from_stellar( break } + // just for testing, remove me + log_counter(&mut counter).await; + // check for messages from user. match send_to_node_receiver.try_recv() { Ok(msg) => @@ -53,27 +57,22 @@ pub(crate) async fn poll_messages_from_stellar( Ok(xdr) => xdr, }; - log::info!("last message recieved {:?}", xdr); - match connector.process_raw_message(xdr).await { - Ok(Some(stellar_msg)) => - { // push message to user + Ok(Some(stellar_msg)) => { + // push message to user if let Err(e) = send_to_user_sender.send(stellar_msg.clone()).await { log::warn!("poll_messages_from_stellar(): Error occurred during sending message {} to user: {e:?}", String::from_utf8(stellar_msg.to_base64_xdr()) .unwrap_or_else(|_| format!("{:?}", stellar_msg.to_base64_xdr())) ); } - }, Ok(None) => {}, Err(e) => { log::error!("poll_messages_from_stellar(): Error occurred during processing xdr message: {e:?}"); - break; + break }, } - - } log::info!("exiting poll messages from stellar loop"); @@ -87,25 +86,32 @@ pub(crate) async fn poll_messages_from_stellar( log::info!("poll_messages_from_stellar(): stopped."); } +// just for testing the outer loop is getting polled. +// If the counter increases, means the yield from read_message_from_stellar +// is working. +async fn log_counter(counter: &mut u32) { + *counter += 1; + log::info!("Outer loop iteration: {}", *counter); + tokio::time::sleep(Duration::from_millis(100)).await; +} + /// Returns Xdr format of the `StellarMessage` sent from the Stellar Node async fn read_message_from_stellar(connector: &mut Connector) -> Result { // holds the number of bytes that were missing from the previous stellar message. let mut lack_bytes_from_prev = 0; let mut readbuf: Vec = vec![]; let mut buff_for_reading = vec![0; 4]; - let mut compat_stream = Compat::new(connector.tcp_stream.clone()); loop { //log::info!("read loop"); // check whether or not we should read the bytes as: // 1. the length of the next stellar message // 2. the remaining bytes of the previous stellar message - match compat_stream.read(&mut buff_for_reading).await { + match connector.tcp_stream.read(&mut buff_for_reading).await { Ok(size) if size == 0 => { - //let empty_vec = vec![]; - //return Ok(empty_vec); + //log::info!("read_message_from_stellar(): size 0"); tokio::task::yield_now().await; sleep(Duration::from_millis(100)).await; - continue; + continue }, Ok(_) if lack_bytes_from_prev == 0 => { // if there are no more bytes lacking from the previous message, @@ -115,10 +121,8 @@ async fn read_message_from_stellar(connector: &mut Connector) -> Result Result { - //tokio::task::yield_now().await; - sleep(Duration::from_millis(100)).await; continue }, Ok(Some(xdr)) => return Ok(xdr), Err(e) => { - log::trace!("read_message_from_stellar(): ERROR: {e:?}"); + log::info!("read_message_from_stellar(): ERROR: {e:?}"); return Err(e) }, } @@ -150,23 +154,22 @@ async fn read_message_from_stellar(connector: &mut Connector) -> Result { - //tokio::task::yield_now().await; - sleep(Duration::from_millis(100)).await; continue }, Ok(Some(xdr)) => return Ok(xdr), Err(e) => { - log::trace!("read_message_from_stellar(): ERROR: {e:?}"); + log::info!("read_message_from_stellar(): ERROR: {e:?}"); return Err(e) }, } }, Err(e) => { - log::trace!("read_message_from_stellar(): ERROR reading messages: {e:?}"); + log::info!("read_message_from_stellar(): ERROR reading messages: {e:?}"); return Err(Error::ReadFailed(e.to_string())) }, } @@ -187,7 +190,11 @@ async fn read_message( readbuf: &mut Vec, xpect_msg_len: usize, ) -> Result, Error> { - let actual_msg_len = connector.tcp_stream.read(readbuf).await.map_err(|e| Error::ReadFailed(e.to_string()))?; + let actual_msg_len = connector + .tcp_stream + .read(readbuf) + .await + .map_err(|e| Error::ReadFailed(e.to_string()))?; // only when the message has the exact expected size bytes, should we send to user. if actual_msg_len == xpect_msg_len { @@ -220,9 +227,11 @@ async fn read_unfinished_message( // let's read the continuation number of bytes from the previous message. let mut cont_buf = vec![0; *lack_bytes_from_prev]; - let actual_msg_len = - connector.tcp_stream.read(&mut cont_buf).await - .map_err(|e| Error::ReadFailed(e.to_string()))?; + let actual_msg_len = connector + .tcp_stream + .read(&mut cont_buf) + .await + .map_err(|e| Error::ReadFailed(e.to_string()))?; // this partial message completes the previous message. if actual_msg_len == *lack_bytes_from_prev { diff --git a/clients/stellar-relay-lib/src/connection/connector/message_sender.rs b/clients/stellar-relay-lib/src/connection/connector/message_sender.rs index eaa61f81a..0bfe685fb 100644 --- a/clients/stellar-relay-lib/src/connection/connector/message_sender.rs +++ b/clients/stellar-relay-lib/src/connection/connector/message_sender.rs @@ -1,5 +1,5 @@ -use std::time::Duration; use async_std::io::{self, WriteExt}; +use std::time::Duration; use substrate_stellar_sdk::types::{MessageType, SendMore, StellarMessage}; use crate::connection::{ @@ -14,14 +14,12 @@ impl Connector { // Create the XDR message outside the closure let xdr_msg = self.create_xdr_message(msg)?; - - io::timeout( - Duration::from_secs(self.timeout_in_secs), - async { - self.tcp_stream.write_all(&xdr_msg).await?; - Ok(()) - } - ).await.map_err(|_| Error::Timeout) + io::timeout(Duration::from_secs(self.timeout_in_secs), async { + self.tcp_stream.write_all(&xdr_msg).await?; + Ok(()) + }) + .await + .map_err(|_| Error::Timeout) } pub async fn send_hello_message(&mut self) -> Result<(), Error> { diff --git a/clients/stellar-relay-lib/src/connection/error.rs b/clients/stellar-relay-lib/src/connection/error.rs index e2d9b137f..302969dee 100644 --- a/clients/stellar-relay-lib/src/connection/error.rs +++ b/clients/stellar-relay-lib/src/connection/error.rs @@ -1,7 +1,7 @@ #![allow(dead_code)] //todo: remove after being tested and implemented -use async_std::channel::SendError; use crate::{connection::xdr_converter::Error as XDRError, helper::error_to_string}; +use async_std::channel::SendError; use substrate_stellar_sdk::{types::ErrorCode, StellarSdkError}; #[derive(Debug, err_derive::Error)] diff --git a/clients/stellar-relay-lib/src/overlay.rs b/clients/stellar-relay-lib/src/overlay.rs index d750bbc49..a1b48e613 100644 --- a/clients/stellar-relay-lib/src/overlay.rs +++ b/clients/stellar-relay-lib/src/overlay.rs @@ -82,6 +82,7 @@ impl StellarOverlayConnection { let is_closed = self.sender.is_closed(); if is_closed { + log::info!("is_alive(): not alive anymore"); self.disconnect(); } @@ -94,8 +95,8 @@ impl StellarOverlayConnection { } } -impl Drop for StellarOverlayConnection { - fn drop(&mut self) { - self.disconnect(); - } -} \ No newline at end of file +// impl Drop for StellarOverlayConnection { +// fn drop(&mut self) { +// self.disconnect(); +// } +// } diff --git a/clients/stellar-relay-lib/src/tests/mod.rs b/clients/stellar-relay-lib/src/tests/mod.rs index 1c3c1d4fe..15d53118a 100644 --- a/clients/stellar-relay-lib/src/tests/mod.rs +++ b/clients/stellar-relay-lib/src/tests/mod.rs @@ -1,16 +1,14 @@ use crate::{ connection::ConnectionInfo, node::NodeInfo, StellarOverlayConfig, StellarOverlayConnection, }; +use async_std::{future::timeout, sync::Mutex}; use serial_test::serial; use std::{sync::Arc, thread::sleep, time::Duration}; -use async_std::future::timeout; -use async_std::sync::Mutex; use substrate_stellar_sdk::{ types::{ScpStatementExternalize, ScpStatementPledges, StellarMessage}, Hash, IntoHash, }; - fn secret_key(is_mainnet: bool) -> String { let path = if is_mainnet { "./resources/secretkey/stellar_secretkey_mainnet" @@ -43,7 +41,7 @@ fn overlay_infos(is_mainnet: bool) -> (NodeInfo, ConnectionInfo) { ) } -#[async_std::test] +#[tokio::test] #[serial] async fn stellar_overlay_should_receive_scp_messages() { // let it run for a second, making sure that the other tests have successfully shutdown @@ -76,7 +74,7 @@ async fn stellar_overlay_should_receive_scp_messages() { assert!(!scps_vec.lock().await.is_empty()); } -#[async_std::test] +#[tokio::test] #[serial] async fn stellar_overlay_should_receive_tx_set() { env_logger::init(); @@ -146,7 +144,7 @@ async fn stellar_overlay_should_receive_tx_set() { assert!(expected_hashes.contains(&actual_hashes[0])) } -#[async_std::test] +#[tokio::test] #[serial] async fn stellar_overlay_disconnect_works() { // let it run for a second, making sure that the other tests have successfully shutdown diff --git a/clients/vault/src/oracle/agent.rs b/clients/vault/src/oracle/agent.rs index 0c673901c..bae986c5a 100644 --- a/clients/vault/src/oracle/agent.rs +++ b/clients/vault/src/oracle/agent.rs @@ -2,11 +2,9 @@ use std::{sync::Arc, time::Duration}; use service::on_shutdown; use tokio::{ - sync::{ RwLock}, + sync::{mpsc, mpsc::error::TryRecvError, RwLock}, time::{sleep, timeout}, }; -use tokio::sync::mpsc; -use tokio::sync::mpsc::error::TryRecvError; use runtime::ShutdownSender; use stellar_relay_lib::{ @@ -85,28 +83,29 @@ pub async fn start_oracle_agent( // disconnect signal sender tells the StellarOverlayConnection to close its TcpStream to Stellar // Node - let (disconnect_signal_sender, mut disconnect_signal_receiver) = mpsc::unbounded_channel::<()>(); + let (disconnect_signal_sender, mut disconnect_signal_receiver) = + mpsc::unbounded_channel::<()>(); let disconnect_signal_sender_clone = disconnect_signal_sender.clone(); - tokio::spawn(on_shutdown(shutdown_sender.clone(), async move { - tracing::info!("start_oracle_agent(): sending signal to shutdown overlay connection..."); - if let Err(e) = disconnect_signal_sender.send(()) { - tracing::warn!("start_oracle_agent(): failed to send disconnect signal: {e:?}"); - } - })); + // tokio::spawn(on_shutdown(shutdown_sender.clone(), async move { + // tracing::info!("start_oracle_agent(): sending signal to shutdown overlay connection..."); + // if let Err(e) = disconnect_signal_sender.send(()) { + // tracing::warn!("start_oracle_agent(): failed to send disconnect signal: {e:?}"); + // } + // })); service::spawn_cancelable(shutdown_sender_clone.subscribe(), async move { let sender_clone = overlay_conn.sender(); loop { - match disconnect_signal_receiver.try_recv() { - // if a disconnect signal was sent, disconnect from Stellar. - Ok(_) | Err(TryRecvError::Disconnected) => { - tracing::info!("start_oracle_agent(): disconnect overlay..."); - overlay_conn.disconnect(); - break - }, - Err(TryRecvError::Empty) => {}, - } + // match disconnect_signal_receiver.try_recv() { + // // if a disconnect signal was sent, disconnect from Stellar. + // Ok(_) | Err(TryRecvError::Disconnected) => { + // tracing::info!("start_oracle_agent(): disconnect overlay..."); + // overlay_conn.disconnect(); + // break + // }, + // Err(TryRecvError::Empty) => {}, + // } // listen for messages from Stellar match overlay_conn.listen() { @@ -123,19 +122,17 @@ pub async fn start_oracle_agent( Ok(None) => {}, // connection got lost Err(e) => { - //overlay_conn.disconnect(); + overlay_conn.disconnect(); tracing::error!("start_oracle_agent(): encounter error in overlay: {e:?}"); - - if let Err(e) = shutdown_sender_clone2.send(()) { - tracing::error!( - "start_oracle_agent(): Failed to send shutdown signal in thread: {e:?}" - ); - } + // if let Err(e) = shutdown_sender_clone2.send(()) { + // tracing::error!( + // "start_oracle_agent(): Failed to send shutdown signal in thread: {e:?}" + // ); + // } break }, } } - overlay_conn.disconnect(); tracing::info!("start_oracle_agent(): ------- THE SPAWNED THREAD HAS STOPPED --------"); }); @@ -144,24 +141,24 @@ pub async fn start_oracle_agent( is_public_network: false, message_sender: Some(sender), shutdown_sender, - disconnect_signal_sender: disconnect_signal_sender_clone + disconnect_signal_sender: disconnect_signal_sender_clone, }) } -impl Drop for OracleAgent { - fn drop(&mut self) { - tracing::info!("OracleAgent: Dropping OracleAgent..."); - // if let Err(e) = self.shutdown_sender.send(()) { - // tracing::error!("stop(): Failed to send shutdown signal in OracleAgent: {:?}", e); - // } - - if let Err(e) = self.disconnect_signal_sender.send(()) { - tracing::warn!("start_oracle_agent(): failed to send disconnect signal: {e:?}"); - } else { - tracing::info!("start_oracle_agent(): message was sent!!!"); - } - } -} +// impl Drop for OracleAgent { +// fn drop(&mut self) { +// tracing::info!("OracleAgent: Dropping OracleAgent..."); +// if let Err(e) = self.shutdown_sender.send(()) { +// tracing::error!("stop(): Failed to send shutdown signal in OracleAgent: {:?}", e); +// } + +// if let Err(e) = self.disconnect_signal_sender.send(()) { +// tracing::warn!("start_oracle_agent(): failed to send disconnect signal: {e:?}"); +// } else { +// tracing::info!("start_oracle_agent(): message was sent!!!"); +// } +// } +// } impl OracleAgent { /// This method returns the proof for a given slot or an error if the proof cannot be provided. @@ -212,7 +209,6 @@ impl OracleAgent { pub async fn remove_data(&self, slot: &Slot) { self.collector.read().await.remove_data(slot); } - } #[cfg(test)] @@ -314,7 +310,6 @@ mod tests { // These might return an error if the file does not exist, but that's fine. let _ = scp_archive_storage.remove_file(target_slot); let _ = tx_archive_storage.remove_file(target_slot); - } #[tokio::test(flavor = "multi_thread")] @@ -344,6 +339,5 @@ mod tests { // These might return an error if the file does not exist, but that's fine. let _ = scp_archive_storage.remove_file(target_slot); let _ = tx_archive_storage.remove_file(target_slot); - } } diff --git a/clients/vault/src/oracle/collector/handler.rs b/clients/vault/src/oracle/collector/handler.rs index b4021b55e..4fac5a5e7 100644 --- a/clients/vault/src/oracle/collector/handler.rs +++ b/clients/vault/src/oracle/collector/handler.rs @@ -36,8 +36,10 @@ impl ScpMessageCollector { tracing::debug!( "Handling Incoming ScpEnvelopes for slot {slot}: requesting TxSet..." ); - message_sender.send(StellarMessage::GetTxSet(txset_hash)).await - .map_err(|e|Error::Other(format!("Send Error: {e:?}")))?; + message_sender + .send(StellarMessage::GetTxSet(txset_hash)) + .await + .map_err(|e| Error::Other(format!("Send Error: {e:?}")))?; // let's save this for creating the proof later on. self.save_txset_hash_and_slot(txset_hash, slot); diff --git a/clients/vault/src/oracle/errors.rs b/clients/vault/src/oracle/errors.rs index b3e8ef133..569879dbe 100644 --- a/clients/vault/src/oracle/errors.rs +++ b/clients/vault/src/oracle/errors.rs @@ -30,7 +30,6 @@ pub enum Error { #[error(display = "{:?}", _0)] ProofTimeout(String), - #[error(display = "{} is not initialized", _0)] Uninitialized(String), diff --git a/clients/vault/src/oracle/types/types.rs b/clients/vault/src/oracle/types/types.rs index 39b5c4e42..5424a2060 100644 --- a/clients/vault/src/oracle/types/types.rs +++ b/clients/vault/src/oracle/types/types.rs @@ -18,4 +18,4 @@ pub type StellarMessageSender = mpsc::Sender; /// For easy writing to file. BTreeMap to preserve order of the slots. pub(crate) type SlotEncodedMap = BTreeMap; -pub(crate) type SlotList = BTreeMap; \ No newline at end of file +pub(crate) type SlotList = BTreeMap; From c2faf648aa91a62390cb3f7623035b8ee2e3b6f3 Mon Sep 17 00:00:00 2001 From: Gianfranco Date: Tue, 23 Jan 2024 17:31:29 -0300 Subject: [PATCH 04/10] remove comments and re-add connector drop trait --- .../src/connection/connector/connector.rs | 20 +++++++++---------- .../connection/connector/message_reader.rs | 9 +++++---- clients/vault/src/oracle/agent.rs | 2 +- 3 files changed, 16 insertions(+), 15 deletions(-) diff --git a/clients/stellar-relay-lib/src/connection/connector/connector.rs b/clients/stellar-relay-lib/src/connection/connector/connector.rs index 4bee01b36..ea7820af1 100644 --- a/clients/stellar-relay-lib/src/connection/connector/connector.rs +++ b/clients/stellar-relay-lib/src/connection/connector/connector.rs @@ -72,16 +72,16 @@ impl Debug for Connector { } } -// impl Drop for Connector { -// fn drop(&mut self) { -// if let Err(e) = self.tcp_stream.shutdown(Shutdown::Both) { -// log::error!("drop(): failed to shutdown tcp stream: {}", e); -// } else { -// log::info!("drop(): tcp stream successfully shutdown"); -// } - -// } -// } +impl Drop for Connector { + fn drop(&mut self) { + if let Err(e) = self.tcp_stream.shutdown(Shutdown::Both) { + log::error!("drop(): failed to shutdown tcp stream: {}", e); + } else { + log::info!("drop(): tcp stream successfully shutdown"); + } + + } +} impl Connector { /// Verifies the AuthenticatedMessage, received from the Stellar Node diff --git a/clients/stellar-relay-lib/src/connection/connector/message_reader.rs b/clients/stellar-relay-lib/src/connection/connector/message_reader.rs index 193b8079a..35c709fa7 100644 --- a/clients/stellar-relay-lib/src/connection/connector/message_reader.rs +++ b/clients/stellar-relay-lib/src/connection/connector/message_reader.rs @@ -25,7 +25,7 @@ pub(crate) async fn poll_messages_from_stellar( let mut counter = 0; loop { - log::info!("poll loop"); + if send_to_user_sender.is_closed() { log::info!("poll_messages_from_stellar(): closing receiver during disconnection"); // close this channel as communication to user was closed. @@ -33,7 +33,8 @@ pub(crate) async fn poll_messages_from_stellar( } // just for testing, remove me - log_counter(&mut counter).await; + // log::info!("poll loop"); + //log_counter(&mut counter).await; // check for messages from user. match send_to_node_receiver.try_recv() { @@ -74,7 +75,6 @@ pub(crate) async fn poll_messages_from_stellar( }, } } - log::info!("exiting poll messages from stellar loop"); // make sure to shutdown the stream if let Err(e) = connector.tcp_stream.shutdown(Shutdown::Both) { @@ -108,7 +108,8 @@ async fn read_message_from_stellar(connector: &mut Connector) -> Result { - //log::info!("read_message_from_stellar(): size 0"); + //if we remove the yield here, we have the risk that the process will become a + // "busy one", never letting other processes handle the exit tokio::task::yield_now().await; sleep(Duration::from_millis(100)).await; continue diff --git a/clients/vault/src/oracle/agent.rs b/clients/vault/src/oracle/agent.rs index bae986c5a..20ea8db62 100644 --- a/clients/vault/src/oracle/agent.rs +++ b/clients/vault/src/oracle/agent.rs @@ -172,7 +172,7 @@ impl OracleAgent { let collector = self.collector.clone(); #[cfg(test)] - let timeout_seconds = 60; + let timeout_seconds = 180; #[cfg(not(test))] let timeout_seconds = 60; From 823f5ff6cda3b00d75b9dfd51b2a8e0adf07c76c Mon Sep 17 00:00:00 2001 From: b-yap <2826165+b-yap@users.noreply.github.com> Date: Wed, 24 Jan 2024 10:47:09 +0800 Subject: [PATCH 05/10] cleanup --- .../src/connection/connector/connector.rs | 23 ++--- .../connection/connector/message_reader.rs | 46 +++------- .../connection/connector/message_sender.rs | 16 ++-- .../stellar-relay-lib/src/connection/error.rs | 14 +-- clients/stellar-relay-lib/src/overlay.rs | 19 ++-- clients/stellar-relay-lib/src/tests/mod.rs | 27 +++--- clients/vault/src/oracle/agent.rs | 90 +++++++++---------- clients/vault/src/oracle/collector/handler.rs | 5 +- 8 files changed, 100 insertions(+), 140 deletions(-) diff --git a/clients/stellar-relay-lib/src/connection/connector/connector.rs b/clients/stellar-relay-lib/src/connection/connector/connector.rs index ea7820af1..a588a46de 100644 --- a/clients/stellar-relay-lib/src/connection/connector/connector.rs +++ b/clients/stellar-relay-lib/src/connection/connector/connector.rs @@ -74,12 +74,7 @@ impl Debug for Connector { impl Drop for Connector { fn drop(&mut self) { - if let Err(e) = self.tcp_stream.shutdown(Shutdown::Both) { - log::error!("drop(): failed to shutdown tcp stream: {}", e); - } else { - log::info!("drop(): tcp stream successfully shutdown"); - } - + self.stop(); } } @@ -171,6 +166,12 @@ impl Connector { Ok(connector) } + + pub fn stop(&mut self) { + if let Err(e) = self.tcp_stream.shutdown(Shutdown::Both) { + log::error!("stop(): failed to shutdown tcp stream: {}", e); + } + } } // getters setters @@ -246,14 +247,6 @@ impl Connector { ) { self.flow_controller.enable(local_overlay_version, remote_overlay_version) } - - // pub fn shutdown(&mut self) { - // if let Err(e) = self.tcp_stream.shutdown(Shutdown::Both) { - // log::error!("shutdown(): failed to shutdown tcp stream: {}", e); - // } else { - // log::info!("shutdown(): tcp stream successfully shutdown"); - // } - // } } #[cfg(test)] @@ -423,8 +416,6 @@ mod test { #[tokio::test] #[serial] async fn connector_method_works() { - env_logger::init(); - let (_, conn_config, mut connector) = create_connector().await; assert_eq!(connector.remote_called_us(), conn_config.remote_called_us); diff --git a/clients/stellar-relay-lib/src/connection/connector/message_reader.rs b/clients/stellar-relay-lib/src/connection/connector/message_reader.rs index 35c709fa7..87a09ed99 100644 --- a/clients/stellar-relay-lib/src/connection/connector/message_reader.rs +++ b/clients/stellar-relay-lib/src/connection/connector/message_reader.rs @@ -4,7 +4,6 @@ use crate::connection::{xdr_converter::get_xdr_message_length, Connector, Error, use async_std::io::ReadExt; use substrate_stellar_sdk::{types::StellarMessage, XdrCodec}; use tokio::{ - io::AsyncReadExt, sync::{mpsc, mpsc::error::TryRecvError}, time::{sleep, Duration}, }; @@ -22,20 +21,14 @@ pub(crate) async fn poll_messages_from_stellar( mut send_to_node_receiver: mpsc::Receiver, ) { log::info!("poll_messages_from_stellar(): started."); - let mut counter = 0; loop { - if send_to_user_sender.is_closed() { log::info!("poll_messages_from_stellar(): closing receiver during disconnection"); // close this channel as communication to user was closed. break } - // just for testing, remove me - // log::info!("poll loop"); - //log_counter(&mut counter).await; - // check for messages from user. match send_to_node_receiver.try_recv() { Ok(msg) => @@ -43,7 +36,7 @@ pub(crate) async fn poll_messages_from_stellar( log::error!("poll_messages_from_stellar(): Error occurred during sending message to node: {e:?}"); }, Err(TryRecvError::Disconnected) => { - log::info!("poll_messages_from_stellar(): Recv channel (for sending message to node) got disconnected."); + log::trace!("poll_messages_from_stellar(): Recv channel (for sending message to node) got disconnected."); break }, Err(TryRecvError::Empty) => {}, @@ -59,15 +52,14 @@ pub(crate) async fn poll_messages_from_stellar( }; match connector.process_raw_message(xdr).await { - Ok(Some(stellar_msg)) => { - // push message to user + Ok(Some(stellar_msg)) => + // push message to user if let Err(e) = send_to_user_sender.send(stellar_msg.clone()).await { log::warn!("poll_messages_from_stellar(): Error occurred during sending message {} to user: {e:?}", String::from_utf8(stellar_msg.to_base64_xdr()) .unwrap_or_else(|_| format!("{:?}", stellar_msg.to_base64_xdr())) ); - } - }, + }, Ok(None) => {}, Err(e) => { log::error!("poll_messages_from_stellar(): Error occurred during processing xdr message: {e:?}"); @@ -75,7 +67,6 @@ pub(crate) async fn poll_messages_from_stellar( }, } } - // make sure to shutdown the stream if let Err(e) = connector.tcp_stream.shutdown(Shutdown::Both) { log::error!("poll_messages_from_stellar(): Failed to shutdown the tcp stream: {e:?}"); @@ -86,29 +77,20 @@ pub(crate) async fn poll_messages_from_stellar( log::info!("poll_messages_from_stellar(): stopped."); } -// just for testing the outer loop is getting polled. -// If the counter increases, means the yield from read_message_from_stellar -// is working. -async fn log_counter(counter: &mut u32) { - *counter += 1; - log::info!("Outer loop iteration: {}", *counter); - tokio::time::sleep(Duration::from_millis(100)).await; -} - /// Returns Xdr format of the `StellarMessage` sent from the Stellar Node async fn read_message_from_stellar(connector: &mut Connector) -> Result { // holds the number of bytes that were missing from the previous stellar message. let mut lack_bytes_from_prev = 0; let mut readbuf: Vec = vec![]; let mut buff_for_reading = vec![0; 4]; + loop { - //log::info!("read loop"); // check whether or not we should read the bytes as: // 1. the length of the next stellar message // 2. the remaining bytes of the previous stellar message match connector.tcp_stream.read(&mut buff_for_reading).await { Ok(size) if size == 0 => { - //if we remove the yield here, we have the risk that the process will become a + // if we remove the yield here, we have the risk that the process will become a // "busy one", never letting other processes handle the exit tokio::task::yield_now().await; sleep(Duration::from_millis(100)).await; @@ -122,7 +104,7 @@ async fn read_message_from_stellar(connector: &mut Connector) -> Result Result { - continue - }, + Ok(None) => continue, Ok(Some(xdr)) => return Ok(xdr), Err(e) => { - log::info!("read_message_from_stellar(): ERROR: {e:?}"); + log::trace!("read_message_from_stellar(): ERROR: {e:?}"); return Err(e) }, } @@ -158,19 +138,17 @@ async fn read_message_from_stellar(connector: &mut Connector) -> Result { - continue - }, + Ok(None) => continue, Ok(Some(xdr)) => return Ok(xdr), Err(e) => { - log::info!("read_message_from_stellar(): ERROR: {e:?}"); + log::trace!("read_message_from_stellar(): ERROR: {e:?}"); return Err(e) }, } }, Err(e) => { - log::info!("read_message_from_stellar(): ERROR reading messages: {e:?}"); + log::trace!("read_message_from_stellar(): ERROR reading messages: {e:?}"); return Err(Error::ReadFailed(e.to_string())) }, } diff --git a/clients/stellar-relay-lib/src/connection/connector/message_sender.rs b/clients/stellar-relay-lib/src/connection/connector/message_sender.rs index 0bfe685fb..1145dddb6 100644 --- a/clients/stellar-relay-lib/src/connection/connector/message_sender.rs +++ b/clients/stellar-relay-lib/src/connection/connector/message_sender.rs @@ -1,6 +1,7 @@ -use async_std::io::{self, WriteExt}; +use async_std::io::WriteExt; use std::time::Duration; use substrate_stellar_sdk::types::{MessageType, SendMore, StellarMessage}; +use tokio::time::timeout; use crate::connection::{ flow_controller::MAX_FLOOD_MSG_CAP, @@ -14,12 +15,15 @@ impl Connector { // Create the XDR message outside the closure let xdr_msg = self.create_xdr_message(msg)?; - io::timeout(Duration::from_secs(self.timeout_in_secs), async { - self.tcp_stream.write_all(&xdr_msg).await?; - Ok(()) - }) + match timeout( + Duration::from_secs(self.timeout_in_secs), + self.tcp_stream.write_all(&xdr_msg), + ) .await - .map_err(|_| Error::Timeout) + { + Ok(res) => res.map_err(|e| Error::WriteFailed(e.to_string())), + Err(_) => Err(Error::Timeout), + } } pub async fn send_hello_message(&mut self) -> Result<(), Error> { diff --git a/clients/stellar-relay-lib/src/connection/error.rs b/clients/stellar-relay-lib/src/connection/error.rs index 302969dee..ba3438e71 100644 --- a/clients/stellar-relay-lib/src/connection/error.rs +++ b/clients/stellar-relay-lib/src/connection/error.rs @@ -1,8 +1,8 @@ #![allow(dead_code)] //todo: remove after being tested and implemented use crate::{connection::xdr_converter::Error as XDRError, helper::error_to_string}; -use async_std::channel::SendError; use substrate_stellar_sdk::{types::ErrorCode, StellarSdkError}; +use tokio::sync; #[derive(Debug, err_derive::Error)] pub enum Error { @@ -52,7 +52,7 @@ pub enum Error { XDRConversionError(XDRError), #[error(display = "{:?}", _0)] - TcpStreamError(async_std::io::Error), + StdIOError(std::io::Error), #[error(display = "{:?}", _0)] StellarSdkError(StellarSdkError), @@ -73,9 +73,9 @@ pub enum Error { VersionStrTooLong, } -impl From for Error { - fn from(e: async_std::io::Error) -> Self { - Error::TcpStreamError(e) +impl From for Error { + fn from(e: std::io::Error) -> Self { + Error::StdIOError(e) } } @@ -85,8 +85,8 @@ impl From for Error { } } -impl From> for Error { - fn from(e: SendError) -> Self { +impl From> for Error { + fn from(e: sync::mpsc::error::SendError) -> Self { Error::SendFailed(e.to_string()) } } diff --git a/clients/stellar-relay-lib/src/overlay.rs b/clients/stellar-relay-lib/src/overlay.rs index a1b48e613..2a3a37cc7 100644 --- a/clients/stellar-relay-lib/src/overlay.rs +++ b/clients/stellar-relay-lib/src/overlay.rs @@ -58,7 +58,7 @@ impl StellarOverlayConnection { pub fn listen(&mut self) -> Result, Error> { loop { if !self.is_alive() { - self.disconnect(); + self.stop(); return Err(Error::Disconnected) } @@ -82,21 +82,20 @@ impl StellarOverlayConnection { let is_closed = self.sender.is_closed(); if is_closed { - log::info!("is_alive(): not alive anymore"); - self.disconnect(); + self.stop(); } !is_closed } - pub fn disconnect(&mut self) { - log::info!("disconnect(): closing connection to overlay network"); + pub fn stop(&mut self) { + log::info!("stop(): closing connection to overlay network"); self.receiver.close(); } } -// impl Drop for StellarOverlayConnection { -// fn drop(&mut self) { -// self.disconnect(); -// } -// } +impl Drop for StellarOverlayConnection { + fn drop(&mut self) { + self.stop(); + } +} diff --git a/clients/stellar-relay-lib/src/tests/mod.rs b/clients/stellar-relay-lib/src/tests/mod.rs index 15d53118a..22d1c0078 100644 --- a/clients/stellar-relay-lib/src/tests/mod.rs +++ b/clients/stellar-relay-lib/src/tests/mod.rs @@ -41,12 +41,12 @@ fn overlay_infos(is_mainnet: bool) -> (NodeInfo, ConnectionInfo) { ) } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] #[serial] async fn stellar_overlay_should_receive_scp_messages() { - // let it run for a second, making sure that the other tests have successfully shutdown + // let it run for a few seconds, making sure that the other tests have successfully shutdown // their connection to Stellar Node - sleep(Duration::from_secs(3)); + sleep(Duration::from_secs(2)); let (node_info, conn_info) = overlay_infos(false); @@ -63,7 +63,7 @@ async fn stellar_overlay_should_receive_scp_messages() { if let Ok(Some(msg)) = ov_conn_locked.listen() { scps_vec_clone.lock().await.push(msg); - ov_conn_locked.disconnect(); + ov_conn_locked.stop(); } }) .await @@ -74,13 +74,12 @@ async fn stellar_overlay_should_receive_scp_messages() { assert!(!scps_vec.lock().await.is_empty()); } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] #[serial] async fn stellar_overlay_should_receive_tx_set() { - env_logger::init(); - // let it run for a second, making sure that the other tests have successfully shutdown + // let it run for a few seconds, making sure that the other tests have successfully shutdown // their connection to Stellar Node - sleep(Duration::from_secs(3)); + sleep(Duration::from_secs(2)); //arrange fn get_tx_set_hash(x: &ScpStatementExternalize) -> Hash { @@ -116,15 +115,11 @@ async fn stellar_overlay_should_receive_tx_set() { StellarMessage::TxSet(set) => { let tx_set_hash = set.into_hash().expect("should return a hash"); actual_tx_set_hashes_clone.lock().await.push(tx_set_hash); - - //ov_conn_locked.disconnect(); break }, StellarMessage::GeneralizedTxSet(set) => { let tx_set_hash = set.into_hash().expect("should return a hash"); actual_tx_set_hashes_clone.lock().await.push(tx_set_hash); - - //ov_conn_locked.disconnect(); break }, _ => {}, @@ -144,12 +139,12 @@ async fn stellar_overlay_should_receive_tx_set() { assert!(expected_hashes.contains(&actual_hashes[0])) } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] #[serial] async fn stellar_overlay_disconnect_works() { - // let it run for a second, making sure that the other tests have successfully shutdown + // let it run for a few seconds, making sure that the other tests have successfully shutdown // their connection to Stellar Node - sleep(Duration::from_secs(3)); + sleep(Duration::from_secs(2)); let (node_info, conn_info) = overlay_infos(false); @@ -158,7 +153,7 @@ async fn stellar_overlay_disconnect_works() { // let it run for a second, before disconnecting. sleep(Duration::from_secs(1)); - overlay_connection.disconnect(); + overlay_connection.stop(); // let the disconnection call pass for a few seconds, before checking its status. sleep(Duration::from_secs(3)); diff --git a/clients/vault/src/oracle/agent.rs b/clients/vault/src/oracle/agent.rs index 20ea8db62..bcc433d35 100644 --- a/clients/vault/src/oracle/agent.rs +++ b/clients/vault/src/oracle/agent.rs @@ -24,7 +24,6 @@ pub struct OracleAgent { pub is_public_network: bool, message_sender: Option, shutdown_sender: ShutdownSender, - disconnect_signal_sender: mpsc::UnboundedSender<()>, } /// listens to data to collect the scp messages and txsets. @@ -77,35 +76,25 @@ pub async fn start_oracle_agent( ))); let collector_clone = collector.clone(); - // a clone used to forcefully call a shutdown, when StellarOverlay disconnects. let shutdown_sender_clone = shutdown_sender.clone(); + // a clone used to forcefully call a shutdown, when StellarOverlay disconnects. let shutdown_sender_clone2 = shutdown_sender.clone(); // disconnect signal sender tells the StellarOverlayConnection to close its TcpStream to Stellar // Node - let (disconnect_signal_sender, mut disconnect_signal_receiver) = - mpsc::unbounded_channel::<()>(); - let disconnect_signal_sender_clone = disconnect_signal_sender.clone(); - - // tokio::spawn(on_shutdown(shutdown_sender.clone(), async move { - // tracing::info!("start_oracle_agent(): sending signal to shutdown overlay connection..."); - // if let Err(e) = disconnect_signal_sender.send(()) { - // tracing::warn!("start_oracle_agent(): failed to send disconnect signal: {e:?}"); - // } - // })); + let (disconnect_signal_sender, mut disconnect_signal_receiver) = mpsc::channel::<()>(2); service::spawn_cancelable(shutdown_sender_clone.subscribe(), async move { let sender_clone = overlay_conn.sender(); loop { - // match disconnect_signal_receiver.try_recv() { - // // if a disconnect signal was sent, disconnect from Stellar. - // Ok(_) | Err(TryRecvError::Disconnected) => { - // tracing::info!("start_oracle_agent(): disconnect overlay..."); - // overlay_conn.disconnect(); - // break - // }, - // Err(TryRecvError::Empty) => {}, - // } + match disconnect_signal_receiver.try_recv() { + // if a disconnect signal was sent, disconnect from Stellar. + Ok(_) | Err(TryRecvError::Disconnected) => { + tracing::info!("start_oracle_agent(): disconnect overlay..."); + break + }, + Err(TryRecvError::Empty) => {}, + } // listen for messages from Stellar match overlay_conn.listen() { @@ -122,43 +111,42 @@ pub async fn start_oracle_agent( Ok(None) => {}, // connection got lost Err(e) => { - overlay_conn.disconnect(); tracing::error!("start_oracle_agent(): encounter error in overlay: {e:?}"); - // if let Err(e) = shutdown_sender_clone2.send(()) { - // tracing::error!( - // "start_oracle_agent(): Failed to send shutdown signal in thread: {e:?}" - // ); - // } + + if let Err(e) = shutdown_sender_clone2.send(()) { + tracing::error!( + "start_oracle_agent(): Failed to send shutdown signal in thread: {e:?}" + ); + } break }, } } - tracing::info!("start_oracle_agent(): ------- THE SPAWNED THREAD HAS STOPPED --------"); + + // shutdown the overlay connection + overlay_conn.stop(); }); + tokio::spawn(on_shutdown(shutdown_sender.clone(), async move { + tracing::info!("start_oracle_agent(): sending signal to shutdown overlay connection..."); + if let Err(e) = disconnect_signal_sender.send(()).await { + tracing::warn!("start_oracle_agent(): failed to send disconnect signal: {e:?}"); + } + })); + Ok(OracleAgent { collector, is_public_network: false, message_sender: Some(sender), shutdown_sender, - disconnect_signal_sender: disconnect_signal_sender_clone, }) } -// impl Drop for OracleAgent { -// fn drop(&mut self) { -// tracing::info!("OracleAgent: Dropping OracleAgent..."); -// if let Err(e) = self.shutdown_sender.send(()) { -// tracing::error!("stop(): Failed to send shutdown signal in OracleAgent: {:?}", e); -// } - -// if let Err(e) = self.disconnect_signal_sender.send(()) { -// tracing::warn!("start_oracle_agent(): failed to send disconnect signal: {e:?}"); -// } else { -// tracing::info!("start_oracle_agent(): message was sent!!!"); -// } -// } -// } +impl Drop for OracleAgent { + fn drop(&mut self) { + self.stop(); + } +} impl OracleAgent { /// This method returns the proof for a given slot or an error if the proof cannot be provided. @@ -209,6 +197,14 @@ impl OracleAgent { pub async fn remove_data(&self, slot: &Slot) { self.collector.read().await.remove_data(slot); } + + /// Stops listening for new SCP messages. + pub fn stop(&self) { + tracing::info!("stop(): Shutting down OracleAgent..."); + if let Err(e) = self.shutdown_sender.send(()) { + tracing::error!("stop(): Failed to send shutdown signal in OracleAgent: {:?}", e); + } + } } #[cfg(test)] @@ -225,6 +221,7 @@ mod tests { #[ntest::timeout(1_800_000)] // timeout at 30 minutes #[serial] async fn test_get_proof_for_current_slot() { + env_logger::init(); let shutdown_sender = ShutdownSender::new(); // We use a random secret key to avoid conflicts with other tests. @@ -243,10 +240,10 @@ mod tests { sleep(Duration::from_secs(1)).await; latest_slot = agent.last_slot_index().await; } - // use a future slot (2 slots ahead) to ensure enough messages can be collected + // use a future slot (1 slots ahead) to ensure enough messages can be collected // and to avoid "missed" messages. - latest_slot += 2; - + latest_slot += 1; + sleep(Duration::from_secs(5)).await; let proof_result = agent.get_proof(latest_slot).await; assert!(proof_result.is_ok(), "Failed to get proof for slot: {}", latest_slot); } @@ -315,7 +312,6 @@ mod tests { #[tokio::test(flavor = "multi_thread")] #[serial] async fn test_get_proof_for_archived_slot_fails_without_archives() { - env_logger::init(); let scp_archive_storage = ScpArchiveStorage::default(); let tx_archive_storage = TransactionsArchiveStorage::default(); diff --git a/clients/vault/src/oracle/collector/handler.rs b/clients/vault/src/oracle/collector/handler.rs index 4fac5a5e7..8b788381f 100644 --- a/clients/vault/src/oracle/collector/handler.rs +++ b/clients/vault/src/oracle/collector/handler.rs @@ -36,10 +36,7 @@ impl ScpMessageCollector { tracing::debug!( "Handling Incoming ScpEnvelopes for slot {slot}: requesting TxSet..." ); - message_sender - .send(StellarMessage::GetTxSet(txset_hash)) - .await - .map_err(|e| Error::Other(format!("Send Error: {e:?}")))?; + message_sender.send(StellarMessage::GetTxSet(txset_hash)).await?; // let's save this for creating the proof later on. self.save_txset_hash_and_slot(txset_hash, slot); From ce5bd9e5775db5c82f88795d19d3f81c87be0714 Mon Sep 17 00:00:00 2001 From: b-yap <2826165+b-yap@users.noreply.github.com> Date: Wed, 24 Jan 2024 14:25:45 +0800 Subject: [PATCH 06/10] fix the failing test about current slot --- clients/stellar-relay-lib/src/overlay.rs | 1 - clients/stellar-relay-lib/src/tests/mod.rs | 12 ------------ clients/vault/src/oracle/agent.rs | 22 +++++++++++++++++----- 3 files changed, 17 insertions(+), 18 deletions(-) diff --git a/clients/stellar-relay-lib/src/overlay.rs b/clients/stellar-relay-lib/src/overlay.rs index 2a3a37cc7..c311fa29e 100644 --- a/clients/stellar-relay-lib/src/overlay.rs +++ b/clients/stellar-relay-lib/src/overlay.rs @@ -58,7 +58,6 @@ impl StellarOverlayConnection { pub fn listen(&mut self) -> Result, Error> { loop { if !self.is_alive() { - self.stop(); return Err(Error::Disconnected) } diff --git a/clients/stellar-relay-lib/src/tests/mod.rs b/clients/stellar-relay-lib/src/tests/mod.rs index 22d1c0078..26e77bf72 100644 --- a/clients/stellar-relay-lib/src/tests/mod.rs +++ b/clients/stellar-relay-lib/src/tests/mod.rs @@ -44,10 +44,6 @@ fn overlay_infos(is_mainnet: bool) -> (NodeInfo, ConnectionInfo) { #[tokio::test(flavor = "multi_thread")] #[serial] async fn stellar_overlay_should_receive_scp_messages() { - // let it run for a few seconds, making sure that the other tests have successfully shutdown - // their connection to Stellar Node - sleep(Duration::from_secs(2)); - let (node_info, conn_info) = overlay_infos(false); let overlay_connection = Arc::new(Mutex::new( @@ -77,10 +73,6 @@ async fn stellar_overlay_should_receive_scp_messages() { #[tokio::test(flavor = "multi_thread")] #[serial] async fn stellar_overlay_should_receive_tx_set() { - // let it run for a few seconds, making sure that the other tests have successfully shutdown - // their connection to Stellar Node - sleep(Duration::from_secs(2)); - //arrange fn get_tx_set_hash(x: &ScpStatementExternalize) -> Hash { let scp_value = x.commit.value.get_vec(); @@ -142,10 +134,6 @@ async fn stellar_overlay_should_receive_tx_set() { #[tokio::test(flavor = "multi_thread")] #[serial] async fn stellar_overlay_disconnect_works() { - // let it run for a few seconds, making sure that the other tests have successfully shutdown - // their connection to Stellar Node - sleep(Duration::from_secs(2)); - let (node_info, conn_info) = overlay_infos(false); let mut overlay_connection = diff --git a/clients/vault/src/oracle/agent.rs b/clients/vault/src/oracle/agent.rs index bcc433d35..89bddb598 100644 --- a/clients/vault/src/oracle/agent.rs +++ b/clients/vault/src/oracle/agent.rs @@ -128,7 +128,7 @@ pub async fn start_oracle_agent( }); tokio::spawn(on_shutdown(shutdown_sender.clone(), async move { - tracing::info!("start_oracle_agent(): sending signal to shutdown overlay connection..."); + tracing::debug!("start_oracle_agent(): sending signal to shutdown overlay connection..."); if let Err(e) = disconnect_signal_sender.send(()).await { tracing::warn!("start_oracle_agent(): failed to send disconnect signal: {e:?}"); } @@ -200,7 +200,7 @@ impl OracleAgent { /// Stops listening for new SCP messages. pub fn stop(&self) { - tracing::info!("stop(): Shutting down OracleAgent..."); + tracing::debug!("stop(): Shutting down OracleAgent..."); if let Err(e) = self.shutdown_sender.send(()) { tracing::error!("stop(): Failed to send shutdown signal in OracleAgent: {:?}", e); } @@ -221,6 +221,10 @@ mod tests { #[ntest::timeout(1_800_000)] // timeout at 30 minutes #[serial] async fn test_get_proof_for_current_slot() { + // let it run for a few seconds, making sure that the other tests have successfully shutdown + // their connection to Stellar Node + sleep(Duration::from_secs(2)).await; + env_logger::init(); let shutdown_sender = ShutdownSender::new(); @@ -240,10 +244,10 @@ mod tests { sleep(Duration::from_secs(1)).await; latest_slot = agent.last_slot_index().await; } - // use a future slot (1 slots ahead) to ensure enough messages can be collected + // use a future slot (2 slots ahead) to ensure enough messages can be collected // and to avoid "missed" messages. - latest_slot += 1; - sleep(Duration::from_secs(5)).await; + latest_slot += 2; + let proof_result = agent.get_proof(latest_slot).await; assert!(proof_result.is_ok(), "Failed to get proof for slot: {}", latest_slot); } @@ -251,6 +255,10 @@ mod tests { #[tokio::test(flavor = "multi_thread")] #[serial] async fn test_get_proof_for_archived_slot() { + // let it run for a few seconds, making sure that the other tests have successfully shutdown + // their connection to Stellar Node + sleep(Duration::from_secs(2)).await; + let scp_archive_storage = ScpArchiveStorage::default(); let tx_archive_storage = TransactionsArchiveStorage::default(); @@ -278,6 +286,10 @@ mod tests { #[tokio::test(flavor = "multi_thread")] #[serial] async fn test_get_proof_for_archived_slot_with_fallback() { + // let it run for a few seconds, making sure that the other tests have successfully shutdown + // their connection to Stellar Node + sleep(Duration::from_secs(2)).await; + let scp_archive_storage = ScpArchiveStorage::default(); let tx_archive_storage = TransactionsArchiveStorage::default(); From 813639401b89eaec7c99462cd0981391874250bf Mon Sep 17 00:00:00 2001 From: b-yap <2826165+b-yap@users.noreply.github.com> Date: Thu, 25 Jan 2024 13:55:50 +0800 Subject: [PATCH 07/10] fix the failing test about current slot, by connecting to specifically different nodes --- .../connection/connector/message_handler.rs | 3 +- .../connection/connector/message_reader.rs | 39 +++++++------------ .../stellar-relay-lib/src/connection/error.rs | 9 ----- .../stellar_relay_config_frankfurt.json | 6 +-- .../mainnet/stellar_relay_config_iowa.json | 6 +-- .../stellar_relay_config_singapore.json | 6 +-- .../stellar_relay_config_sdftest1.json | 4 +- .../stellar_relay_config_sdftest2.json | 4 +- .../stellar_relay_config_sdftest3.json | 4 +- clients/vault/src/oracle/agent.rs | 11 +++--- .../vault/src/oracle/collector/collector.rs | 4 +- clients/vault/src/oracle/storage/impls.rs | 6 +-- clients/vault/src/oracle/testing_utils.rs | 38 ++++++++++++++---- clients/vault/tests/helper/mod.rs | 4 +- 14 files changed, 73 insertions(+), 71 deletions(-) diff --git a/clients/stellar-relay-lib/src/connection/connector/message_handler.rs b/clients/stellar-relay-lib/src/connection/connector/message_handler.rs index 87d2c2fde..b4101ea71 100644 --- a/clients/stellar-relay-lib/src/connection/connector/message_handler.rs +++ b/clients/stellar-relay-lib/src/connection/connector/message_handler.rs @@ -103,7 +103,6 @@ impl Connector { String::from_utf8(other.to_base64_xdr()) .unwrap_or(format!("{:?}", other.to_base64_xdr())) ); - self.check_to_send_more(msg_type).await?; return Ok(Some(other)) }, @@ -124,6 +123,8 @@ impl Connector { self.local().node().overlay_version, remote.node().overlay_version, ); + } else { + log::warn!("process_auth_message(): No remote overlay version after handshake."); } self.check_to_send_more(MessageType::Auth).await diff --git a/clients/stellar-relay-lib/src/connection/connector/message_reader.rs b/clients/stellar-relay-lib/src/connection/connector/message_reader.rs index 87a09ed99..4e308c7ad 100644 --- a/clients/stellar-relay-lib/src/connection/connector/message_reader.rs +++ b/clients/stellar-relay-lib/src/connection/connector/message_reader.rs @@ -1,12 +1,7 @@ -use std::net::Shutdown; - use crate::connection::{xdr_converter::get_xdr_message_length, Connector, Error, Xdr}; use async_std::io::ReadExt; use substrate_stellar_sdk::{types::StellarMessage, XdrCodec}; -use tokio::{ - sync::{mpsc, mpsc::error::TryRecvError}, - time::{sleep, Duration}, -}; +use tokio::sync::{mpsc, mpsc::error::TryRecvError}; /// Polls for messages coming from the Stellar Node and communicates it back to the user /// @@ -35,10 +30,7 @@ pub(crate) async fn poll_messages_from_stellar( if let Err(e) = connector.send_to_node(msg).await { log::error!("poll_messages_from_stellar(): Error occurred during sending message to node: {e:?}"); }, - Err(TryRecvError::Disconnected) => { - log::trace!("poll_messages_from_stellar(): Recv channel (for sending message to node) got disconnected."); - break - }, + Err(TryRecvError::Disconnected) => break, Err(TryRecvError::Empty) => {}, } @@ -67,14 +59,13 @@ pub(crate) async fn poll_messages_from_stellar( }, } } - // make sure to shutdown the stream - if let Err(e) = connector.tcp_stream.shutdown(Shutdown::Both) { - log::error!("poll_messages_from_stellar(): Failed to shutdown the tcp stream: {e:?}"); - }; + + // make sure to shutdown the connector + connector.stop(); send_to_node_receiver.close(); drop(send_to_user_sender); - log::info!("poll_messages_from_stellar(): stopped."); + log::debug!("poll_messages_from_stellar(): stopped."); } /// Returns Xdr format of the `StellarMessage` sent from the Stellar Node @@ -89,13 +80,7 @@ async fn read_message_from_stellar(connector: &mut Connector) -> Result { - // if we remove the yield here, we have the risk that the process will become a - // "busy one", never letting other processes handle the exit - tokio::task::yield_now().await; - sleep(Duration::from_millis(100)).await; - continue - }, + Ok(size) if size == 0 => continue, Ok(_) if lack_bytes_from_prev == 0 => { // if there are no more bytes lacking from the previous message, // then check the size of next stellar message. @@ -105,8 +90,6 @@ async fn read_message_from_stellar(connector: &mut Connector) -> Result Result Result for Error { - fn from(e: std::io::Error) -> Self { - Error::StdIOError(e) - } -} - impl From for Error { fn from(e: XDRError) -> Self { Error::XDRConversionError(e) diff --git a/clients/vault/resources/config/mainnet/stellar_relay_config_frankfurt.json b/clients/vault/resources/config/mainnet/stellar_relay_config_frankfurt.json index e44f7e5fa..bed1d57d2 100644 --- a/clients/vault/resources/config/mainnet/stellar_relay_config_frankfurt.json +++ b/clients/vault/resources/config/mainnet/stellar_relay_config_frankfurt.json @@ -4,10 +4,10 @@ "port": 11625 }, "node_info": { - "ledger_version": 20, - "overlay_version": 31, + "ledger_version": 19, + "overlay_version": 29, "overlay_min_version": 27, - "version_str": "stellar-core 20.1.0 (114b833e755400178a57142f45b7fb892ddb034f)", + "version_str": "stellar-core 19.14.0 (5664eff4e76ca6a277883d4085711dc3fa7c318a)", "is_pub_net": true }, "stellar_history_archive_urls": [ diff --git a/clients/vault/resources/config/mainnet/stellar_relay_config_iowa.json b/clients/vault/resources/config/mainnet/stellar_relay_config_iowa.json index 2f60d1303..b5343009c 100644 --- a/clients/vault/resources/config/mainnet/stellar_relay_config_iowa.json +++ b/clients/vault/resources/config/mainnet/stellar_relay_config_iowa.json @@ -4,10 +4,10 @@ "port": 11625 }, "node_info": { - "ledger_version": 20, - "overlay_version": 31, + "ledger_version": 19, + "overlay_version": 29, "overlay_min_version": 27, - "version_str": "stellar-core 20.1.0 (114b833e755400178a57142f45b7fb892ddb034f)", + "version_str": "stellar-core 19.14.0 (5664eff4e76ca6a277883d4085711dc3fa7c318a)", "is_pub_net": true }, "stellar_history_archive_urls": [ diff --git a/clients/vault/resources/config/mainnet/stellar_relay_config_singapore.json b/clients/vault/resources/config/mainnet/stellar_relay_config_singapore.json index 8af20ed9e..7b1d0434d 100644 --- a/clients/vault/resources/config/mainnet/stellar_relay_config_singapore.json +++ b/clients/vault/resources/config/mainnet/stellar_relay_config_singapore.json @@ -4,10 +4,10 @@ "port": 11625 }, "node_info": { - "ledger_version": 20, - "overlay_version": 31, + "ledger_version": 19, + "overlay_version": 29, "overlay_min_version": 27, - "version_str": "stellar-core 20.1.0 (114b833e755400178a57142f45b7fb892ddb034f)", + "version_str": "stellar-core 19.14.0 (5664eff4e76ca6a277883d4085711dc3fa7c318a)", "is_pub_net": true }, "stellar_history_archive_urls": [ diff --git a/clients/vault/resources/config/testnet/stellar_relay_config_sdftest1.json b/clients/vault/resources/config/testnet/stellar_relay_config_sdftest1.json index cee8b5f6b..2fc5026d4 100644 --- a/clients/vault/resources/config/testnet/stellar_relay_config_sdftest1.json +++ b/clients/vault/resources/config/testnet/stellar_relay_config_sdftest1.json @@ -5,9 +5,9 @@ }, "node_info": { "ledger_version": 20, - "overlay_version": 31, + "overlay_version": 30, "overlay_min_version": 27, - "version_str": "stellar-core 20.1.0 (114b833e755400178a57142f45b7fb892ddb034f)", + "version_str": "stellar-core 20.0.0.rc1 (ecb24df104c2453a00fa5097d2e879d7731b9596)", "is_pub_net": false }, "stellar_history_archive_urls": [] diff --git a/clients/vault/resources/config/testnet/stellar_relay_config_sdftest2.json b/clients/vault/resources/config/testnet/stellar_relay_config_sdftest2.json index 894a57110..915ad9b52 100644 --- a/clients/vault/resources/config/testnet/stellar_relay_config_sdftest2.json +++ b/clients/vault/resources/config/testnet/stellar_relay_config_sdftest2.json @@ -5,9 +5,9 @@ }, "node_info": { "ledger_version": 20, - "overlay_version": 31, + "overlay_version": 30, "overlay_min_version": 27, - "version_str": "stellar-core 20.1.0 (114b833e755400178a57142f45b7fb892ddb034f)", + "version_str": "stellar-core 20.0.0.rc1 (ecb24df104c2453a00fa5097d2e879d7731b9596)", "is_pub_net": false }, "stellar_history_archive_urls": [] diff --git a/clients/vault/resources/config/testnet/stellar_relay_config_sdftest3.json b/clients/vault/resources/config/testnet/stellar_relay_config_sdftest3.json index 1f0a7291f..8b4fc2f22 100644 --- a/clients/vault/resources/config/testnet/stellar_relay_config_sdftest3.json +++ b/clients/vault/resources/config/testnet/stellar_relay_config_sdftest3.json @@ -5,9 +5,9 @@ }, "node_info": { "ledger_version": 20, - "overlay_version": 31, + "overlay_version": 30, "overlay_min_version": 27, - "version_str": "stellar-core 20.1.0 (114b833e755400178a57142f45b7fb892ddb034f)", + "version_str": "stellar-core 20.0.0.rc1 (ecb24df104c2453a00fa5097d2e879d7731b9596)", "is_pub_net": false }, "stellar_history_archive_urls": [] diff --git a/clients/vault/src/oracle/agent.rs b/clients/vault/src/oracle/agent.rs index 89bddb598..77ff5fff5 100644 --- a/clients/vault/src/oracle/agent.rs +++ b/clients/vault/src/oracle/agent.rs @@ -210,7 +210,7 @@ impl OracleAgent { #[cfg(test)] mod tests { use crate::oracle::{ - get_random_secret_key, get_test_secret_key, get_test_stellar_relay_config, + get_random_secret_key, get_test_secret_key, specific_stellar_relay_config, traits::ArchiveStorage, ScpArchiveStorage, TransactionsArchiveStorage, }; @@ -225,12 +225,11 @@ mod tests { // their connection to Stellar Node sleep(Duration::from_secs(2)).await; - env_logger::init(); let shutdown_sender = ShutdownSender::new(); // We use a random secret key to avoid conflicts with other tests. let agent = start_oracle_agent( - get_test_stellar_relay_config(true), + specific_stellar_relay_config(true, 0), &get_random_secret_key(), shutdown_sender, ) @@ -264,7 +263,7 @@ mod tests { let shutdown_sender = ShutdownSender::new(); let agent = start_oracle_agent( - get_test_stellar_relay_config(true), + specific_stellar_relay_config(true, 1), &get_test_secret_key(true), shutdown_sender, ) @@ -293,7 +292,7 @@ mod tests { let scp_archive_storage = ScpArchiveStorage::default(); let tx_archive_storage = TransactionsArchiveStorage::default(); - let base_config = get_test_stellar_relay_config(true); + let base_config = specific_stellar_relay_config(true, 2); // We add two fake archive urls to the config to make sure that the agent will actually fall // back to other archives. let mut archive_urls = base_config.stellar_history_archive_urls().clone(); @@ -327,7 +326,7 @@ mod tests { let scp_archive_storage = ScpArchiveStorage::default(); let tx_archive_storage = TransactionsArchiveStorage::default(); - let base_config = get_test_stellar_relay_config(true); + let base_config = specific_stellar_relay_config(true, 0); let modified_config: StellarOverlayConfig = StellarOverlayConfig { stellar_history_archive_urls: vec![], ..base_config }; diff --git a/clients/vault/src/oracle/collector/collector.rs b/clients/vault/src/oracle/collector/collector.rs index 3a9e3aac9..97f014c98 100644 --- a/clients/vault/src/oracle/collector/collector.rs +++ b/clients/vault/src/oracle/collector/collector.rs @@ -234,7 +234,7 @@ mod test { use crate::oracle::{ collector::{collector::AddTxSet, ScpMessageCollector}, - get_test_stellar_relay_config, + random_stellar_relay_config, traits::FileHandler, EnvelopesFileHandler, }; @@ -273,7 +273,7 @@ mod test { } fn stellar_history_archive_urls() -> Vec { - get_test_stellar_relay_config(true).stellar_history_archive_urls() + random_stellar_relay_config(true).stellar_history_archive_urls() } #[test] diff --git a/clients/vault/src/oracle/storage/impls.rs b/clients/vault/src/oracle/storage/impls.rs index 28be05d8a..e4dd33677 100644 --- a/clients/vault/src/oracle/storage/impls.rs +++ b/clients/vault/src/oracle/storage/impls.rs @@ -160,8 +160,8 @@ mod test { use crate::oracle::{ constants::MAX_SLOTS_PER_FILE, errors::Error, - get_test_stellar_relay_config, impls::ArchiveStorage, + random_stellar_relay_config, storage::{ traits::{FileHandler, FileHandlerExt}, EnvelopesFileHandler, @@ -174,7 +174,7 @@ mod test { impl Default for ScpArchiveStorage { fn default() -> Self { - let cfg = get_test_stellar_relay_config(true); + let cfg = random_stellar_relay_config(true); let archive_urls = cfg.stellar_history_archive_urls(); let archive_url = archive_urls.first().expect("should have an archive url"); ScpArchiveStorage(archive_url.clone()) @@ -183,7 +183,7 @@ mod test { impl Default for TransactionsArchiveStorage { fn default() -> Self { - let cfg = get_test_stellar_relay_config(true); + let cfg = random_stellar_relay_config(true); let archive_urls = cfg.stellar_history_archive_urls(); let archive_url = archive_urls.first().expect("should have an archive url"); TransactionsArchiveStorage(archive_url.clone()) diff --git a/clients/vault/src/oracle/testing_utils.rs b/clients/vault/src/oracle/testing_utils.rs index e5e10760b..6f245cd6e 100644 --- a/clients/vault/src/oracle/testing_utils.rs +++ b/clients/vault/src/oracle/testing_utils.rs @@ -1,19 +1,43 @@ use stellar_relay_lib::sdk::SecretKey; -pub fn get_test_stellar_relay_config(is_mainnet: bool) -> stellar_relay_lib::StellarOverlayConfig { +pub fn random_stellar_relay_config(is_mainnet: bool) -> stellar_relay_lib::StellarOverlayConfig { use rand::seq::SliceRandom; - let stellar_node_points: Vec<&str> = if is_mainnet { + let (stellar_node_points, dir) = stellar_relay_config_choices(is_mainnet); + + let node_point = stellar_node_points + .choose(&mut rand::thread_rng()) + .expect("should return a value"); + + stellar_relay_config_abs_path(dir, node_point) +} + +pub fn specific_stellar_relay_config( + is_mainnet: bool, + index: usize, +) -> stellar_relay_lib::StellarOverlayConfig { + let (stellar_node_points, dir) = stellar_relay_config_choices(is_mainnet); + + let node_point = stellar_node_points.get(index).expect("should return a value"); + + stellar_relay_config_abs_path(dir, node_point) +} + +fn stellar_relay_config_choices(is_mainnet: bool) -> (Vec<&'static str>, &'static str) { + let node_points = if is_mainnet { vec!["frankfurt", "iowa", "singapore"] } else { vec!["sdftest1", "sdftest2", "sdftest3"] }; - let dir = if is_mainnet { "mainnet" } else { "testnet" }; - let res = stellar_node_points - .choose(&mut rand::thread_rng()) - .expect("should return a value"); - let path_string = format!("./resources/config/{dir}/stellar_relay_config_{res}.json"); + let dir = if is_mainnet { "mainnet" } else { "testnet" }; + (node_points, dir) +} +fn stellar_relay_config_abs_path( + dir: &str, + node_point: &str, +) -> stellar_relay_lib::StellarOverlayConfig { + let path_string = format!("./resources/config/{dir}/stellar_relay_config_{node_point}.json"); stellar_relay_lib::StellarOverlayConfig::try_from_path(path_string.as_str()) .expect("should be able to extract config") diff --git a/clients/vault/tests/helper/mod.rs b/clients/vault/tests/helper/mod.rs index 7747f0b10..caa17e9b5 100644 --- a/clients/vault/tests/helper/mod.rs +++ b/clients/vault/tests/helper/mod.rs @@ -20,7 +20,7 @@ use std::{future::Future, sync::Arc}; use stellar_relay_lib::StellarOverlayConfig; use tokio::sync::RwLock; use vault::{ - oracle::{get_test_secret_key, get_test_stellar_relay_config, start_oracle_agent, OracleAgent}, + oracle::{get_test_secret_key, random_stellar_relay_config, start_oracle_agent, OracleAgent}, ArcRwLock, }; use wallet::StellarWallet; @@ -28,7 +28,7 @@ use wallet::StellarWallet; pub type StellarPublicKey = [u8; 32]; lazy_static! { - pub static ref CFG: StellarOverlayConfig = get_test_stellar_relay_config(false); + pub static ref CFG: StellarOverlayConfig = random_stellar_relay_config(false); pub static ref SECRET_KEY: String = get_test_secret_key(false); // TODO clean this up by extending the `get_test_secret_key()` function pub static ref DESTINATION_SECRET_KEY: String = "SDNQJEIRSA6YF5JNS6LQLCBF2XVWZ2NJV3YLC322RGIBJIJRIRGWKLEF".to_string(); From 514631577a1c083acc55953affccc1c451fcf704 Mon Sep 17 00:00:00 2001 From: b-yap <2826165+b-yap@users.noreply.github.com> Date: Thu, 25 Jan 2024 15:18:39 +0800 Subject: [PATCH 08/10] update config files --- .../config/mainnet/stellar_relay_config_frankfurt.json | 6 +++--- .../resources/config/mainnet/stellar_relay_config_iowa.json | 6 +++--- .../config/mainnet/stellar_relay_config_singapore.json | 6 +++--- .../config/testnet/stellar_relay_config_sdftest1.json | 4 ++-- .../config/testnet/stellar_relay_config_sdftest2.json | 4 ++-- .../config/testnet/stellar_relay_config_sdftest3.json | 4 ++-- 6 files changed, 15 insertions(+), 15 deletions(-) diff --git a/clients/vault/resources/config/mainnet/stellar_relay_config_frankfurt.json b/clients/vault/resources/config/mainnet/stellar_relay_config_frankfurt.json index bed1d57d2..e44f7e5fa 100644 --- a/clients/vault/resources/config/mainnet/stellar_relay_config_frankfurt.json +++ b/clients/vault/resources/config/mainnet/stellar_relay_config_frankfurt.json @@ -4,10 +4,10 @@ "port": 11625 }, "node_info": { - "ledger_version": 19, - "overlay_version": 29, + "ledger_version": 20, + "overlay_version": 31, "overlay_min_version": 27, - "version_str": "stellar-core 19.14.0 (5664eff4e76ca6a277883d4085711dc3fa7c318a)", + "version_str": "stellar-core 20.1.0 (114b833e755400178a57142f45b7fb892ddb034f)", "is_pub_net": true }, "stellar_history_archive_urls": [ diff --git a/clients/vault/resources/config/mainnet/stellar_relay_config_iowa.json b/clients/vault/resources/config/mainnet/stellar_relay_config_iowa.json index b5343009c..2f60d1303 100644 --- a/clients/vault/resources/config/mainnet/stellar_relay_config_iowa.json +++ b/clients/vault/resources/config/mainnet/stellar_relay_config_iowa.json @@ -4,10 +4,10 @@ "port": 11625 }, "node_info": { - "ledger_version": 19, - "overlay_version": 29, + "ledger_version": 20, + "overlay_version": 31, "overlay_min_version": 27, - "version_str": "stellar-core 19.14.0 (5664eff4e76ca6a277883d4085711dc3fa7c318a)", + "version_str": "stellar-core 20.1.0 (114b833e755400178a57142f45b7fb892ddb034f)", "is_pub_net": true }, "stellar_history_archive_urls": [ diff --git a/clients/vault/resources/config/mainnet/stellar_relay_config_singapore.json b/clients/vault/resources/config/mainnet/stellar_relay_config_singapore.json index 7b1d0434d..8af20ed9e 100644 --- a/clients/vault/resources/config/mainnet/stellar_relay_config_singapore.json +++ b/clients/vault/resources/config/mainnet/stellar_relay_config_singapore.json @@ -4,10 +4,10 @@ "port": 11625 }, "node_info": { - "ledger_version": 19, - "overlay_version": 29, + "ledger_version": 20, + "overlay_version": 31, "overlay_min_version": 27, - "version_str": "stellar-core 19.14.0 (5664eff4e76ca6a277883d4085711dc3fa7c318a)", + "version_str": "stellar-core 20.1.0 (114b833e755400178a57142f45b7fb892ddb034f)", "is_pub_net": true }, "stellar_history_archive_urls": [ diff --git a/clients/vault/resources/config/testnet/stellar_relay_config_sdftest1.json b/clients/vault/resources/config/testnet/stellar_relay_config_sdftest1.json index 2fc5026d4..d82db5ad8 100644 --- a/clients/vault/resources/config/testnet/stellar_relay_config_sdftest1.json +++ b/clients/vault/resources/config/testnet/stellar_relay_config_sdftest1.json @@ -5,9 +5,9 @@ }, "node_info": { "ledger_version": 20, - "overlay_version": 30, + "overlay_version": 31, "overlay_min_version": 27, - "version_str": "stellar-core 20.0.0.rc1 (ecb24df104c2453a00fa5097d2e879d7731b9596)", + "version_str": "stellar-core 20.2.0.rc1 (3076c138d77735c6ce8230886a540f4d54d85c59)", "is_pub_net": false }, "stellar_history_archive_urls": [] diff --git a/clients/vault/resources/config/testnet/stellar_relay_config_sdftest2.json b/clients/vault/resources/config/testnet/stellar_relay_config_sdftest2.json index 915ad9b52..057dd44cf 100644 --- a/clients/vault/resources/config/testnet/stellar_relay_config_sdftest2.json +++ b/clients/vault/resources/config/testnet/stellar_relay_config_sdftest2.json @@ -5,9 +5,9 @@ }, "node_info": { "ledger_version": 20, - "overlay_version": 30, + "overlay_version": 31, "overlay_min_version": 27, - "version_str": "stellar-core 20.0.0.rc1 (ecb24df104c2453a00fa5097d2e879d7731b9596)", + "version_str": "stellar-core 20.2.0.rc1 (3076c138d77735c6ce8230886a540f4d54d85c59)", "is_pub_net": false }, "stellar_history_archive_urls": [] diff --git a/clients/vault/resources/config/testnet/stellar_relay_config_sdftest3.json b/clients/vault/resources/config/testnet/stellar_relay_config_sdftest3.json index 8b4fc2f22..c9334f817 100644 --- a/clients/vault/resources/config/testnet/stellar_relay_config_sdftest3.json +++ b/clients/vault/resources/config/testnet/stellar_relay_config_sdftest3.json @@ -5,9 +5,9 @@ }, "node_info": { "ledger_version": 20, - "overlay_version": 30, + "overlay_version": 31, "overlay_min_version": 27, - "version_str": "stellar-core 20.0.0.rc1 (ecb24df104c2453a00fa5097d2e879d7731b9596)", + "version_str": "stellar-core 20.2.0.rc1 (3076c138d77735c6ce8230886a540f4d54d85c59)", "is_pub_net": false }, "stellar_history_archive_urls": [] From 5d15bf0ea6fe5f4cce649fbce1e611296d616dc2 Mon Sep 17 00:00:00 2001 From: b-yap <2826165+b-yap@users.noreply.github.com> Date: Thu, 25 Jan 2024 19:48:21 +0800 Subject: [PATCH 09/10] use a different account for testing --- clients/wallet/src/horizon/tests.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/clients/wallet/src/horizon/tests.rs b/clients/wallet/src/horizon/tests.rs index 348de6ef4..a8921153c 100644 --- a/clients/wallet/src/horizon/tests.rs +++ b/clients/wallet/src/horizon/tests.rs @@ -20,7 +20,9 @@ use crate::types::FilterWith; use super::*; -const SECRET: &str = "SBLI7RKEJAEFGLZUBSCOFJHQBPFYIIPLBCKN7WVCWT4NEG2UJEW33N73"; +// const SECRET: &str = "SBLI7RKEJAEFGLZUBSCOFJHQBPFYIIPLBCKN7WVCWT4NEG2UJEW33N73"; + +const SECRET: &str = "SB6WHKIU2HGVBRNKNOEOQUY4GFC4ZLG5XPGWLEAHTIZXBXXYACC76VSQ"; #[derive(Clone)] struct MockFilter; @@ -196,6 +198,9 @@ async fn fetch_horizon_and_process_new_transactions_success() { let horizon_client = reqwest::Client::new(); let secret = secret_key_from_encoding(SECRET); + let pubkey= secret.get_public().to_encoding(); + let x = std::str::from_utf8(&pubkey); + println!("public key: {x:?}" ); let mut fetcher = HorizonFetcher::new(horizon_client, secret.get_public().clone(), false); assert!(slot_env_map.read().await.is_empty()); From cce0b3d25bf9e4b8add7b53fe38f1cdad1f74791 Mon Sep 17 00:00:00 2001 From: b-yap <2826165+b-yap@users.noreply.github.com> Date: Thu, 25 Jan 2024 21:05:17 +0800 Subject: [PATCH 10/10] fix rustfmt --- clients/wallet/src/horizon/tests.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/clients/wallet/src/horizon/tests.rs b/clients/wallet/src/horizon/tests.rs index a8921153c..dfbd9e845 100644 --- a/clients/wallet/src/horizon/tests.rs +++ b/clients/wallet/src/horizon/tests.rs @@ -198,9 +198,6 @@ async fn fetch_horizon_and_process_new_transactions_success() { let horizon_client = reqwest::Client::new(); let secret = secret_key_from_encoding(SECRET); - let pubkey= secret.get_public().to_encoding(); - let x = std::str::from_utf8(&pubkey); - println!("public key: {x:?}" ); let mut fetcher = HorizonFetcher::new(horizon_client, secret.get_public().clone(), false); assert!(slot_env_map.read().await.is_empty());