Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace tokio's TcpStream with async-std #481

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 119 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions clients/stellar-relay-lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ tokio = { version = "1.0", features = [
"sync", # to make channels available
"time" # for timeouts and sleep, when reconnecting
] }
async-std = { version = "1.12.0", features = ["attributes"] }

[features]
std = [
Expand Down
79 changes: 38 additions & 41 deletions clients/stellar-relay-lib/src/connection/connector/connector.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use async_std::net::TcpStream;
use std::{
fmt::{Debug, Formatter},
net::TcpStream,
sync::{Arc, Mutex},
time::Duration,
net::Shutdown,
};
use substrate_stellar_sdk::{
types::{AuthenticatedMessageV0, Curve25519Public, HmacSha256Mac, MessageType},
Expand Down Expand Up @@ -37,7 +36,7 @@ pub struct Connector {
flow_controller: FlowController,

/// for writing/reading xdr messages to/from Stellar Node.
pub(crate) tcp_stream: Arc<Mutex<TcpStream>>,
pub(crate) tcp_stream: TcpStream,
}

impl Debug for Connector {
Expand All @@ -53,10 +52,32 @@ impl Debug for Connector {
.field("receive_scp_messages", &self.receive_scp_messages)
.field("handshake_state", &self.handshake_state)
.field("flow_controller", &self.flow_controller)
.field(
"local_addr",
&self
.tcp_stream
.local_addr()
.map(|addr| addr.to_string())
.unwrap_or("cannot provide".to_string()),
)
.field(
"peer_addr",
&self
.tcp_stream
.peer_addr()
.map(|addr| addr.to_string())
.unwrap_or("cannot provide".to_string()),
)
.finish()
}
}

impl Drop for Connector {
fn drop(&mut self) {
self.stop();
}
}

impl Connector {
/// Verifies the AuthenticatedMessage, received from the Stellar Node
pub(super) fn verify_auth(
Expand Down Expand Up @@ -115,22 +136,17 @@ impl Connector {

/// returns a Connector and starts creating a connection to Stellar
pub async fn start(local_node: NodeInfo, conn_info: ConnectionInfo) -> Result<Self, Error> {
// Create the stream
let tcp_stream = TcpStream::connect(conn_info.address())
.await
.map_err(|e| Error::ConnectionFailed(e.to_string()))?;

let connection_auth = ConnectionAuth::new(
&local_node.network_id,
conn_info.keypair(),
conn_info.auth_cert_expiration,
);

// Create the stream
let tcp_stream = TcpStream::connect(conn_info.address())
.map_err(|e| Error::ConnectionFailed(e.to_string()))?;

if let Err(e) =
tcp_stream.set_read_timeout(Some(Duration::from_secs(conn_info.timeout_in_secs)))
{
log::warn!("start(): failed to set read timeout for the stream: {e:?}");
}

let mut connector = Connector {
local: LocalInfo::new(local_node),
remote_info: None,
Expand All @@ -142,14 +158,20 @@ impl Connector {
receive_scp_messages: conn_info.recv_scp_msgs,
handshake_state: HandshakeState::Connecting,
flow_controller: FlowController::default(),
tcp_stream: Arc::new(Mutex::new(tcp_stream)),
tcp_stream,
};

// To start the handshake, send a hello message to Stellar
connector.send_hello_message().await?;

Ok(connector)
}

pub fn stop(&mut self) {
if let Err(e) = self.tcp_stream.shutdown(Shutdown::Both) {
log::error!("stop(): failed to shutdown tcp stream: {}", e);
}
}
}

// getters setters
Expand Down Expand Up @@ -231,7 +253,6 @@ impl Connector {
mod test {
use crate::{connection::hmac::HMacKeys, node::RemoteInfo, StellarOverlayConfig};
use serial_test::serial;
use std::net::Shutdown;

use substrate_stellar_sdk::{
compound_types::LimitedString,
Expand Down Expand Up @@ -263,16 +284,6 @@ mod test {
new_auth_cert
}

impl Connector {
fn shutdown(&mut self) {
self.tcp_stream
.lock()
.unwrap()
.shutdown(Shutdown::Both)
.expect("should shutdown both read and write of stream");
}
}

async fn create_connector() -> (NodeInfo, ConnectionInfo, Connector) {
let cfg_file_path = "./resources/config/testnet/stellar_relay_config_sdftest1.json";
let secret_key_path = "./resources/secretkey/stellar_secretkey_testnet";
Expand All @@ -294,7 +305,7 @@ mod test {
#[tokio::test]
#[serial]
async fn create_new_connector_works() {
let (node_info, _, mut connector) = create_connector().await;
let (node_info, _, connector) = create_connector().await;

let connector_local_node = connector.local.node();

Expand All @@ -303,8 +314,6 @@ mod test {
assert_eq!(connector_local_node.overlay_min_version, node_info.overlay_min_version);
assert_eq!(connector_local_node.version_str, node_info.version_str);
assert_eq!(connector_local_node.network_id, node_info.network_id);

connector.shutdown();
}

#[tokio::test]
Expand All @@ -314,8 +323,6 @@ mod test {
assert_eq!(connector.local_sequence(), 0);
connector.increment_local_sequence();
assert_eq!(connector.local_sequence(), 1);

connector.shutdown();
}

#[tokio::test]
Expand All @@ -340,8 +347,6 @@ mod test {
connector.set_remote(RemoteInfo::new(&hello));

assert!(connector.remote().is_some());

connector.shutdown();
}

#[tokio::test]
Expand Down Expand Up @@ -370,8 +375,6 @@ mod test {
connector.increment_remote_sequence().unwrap();
connector.increment_remote_sequence().unwrap();
assert_eq!(connector.remote().unwrap().sequence(), 3);

connector.shutdown();
}

#[tokio::test]
Expand Down Expand Up @@ -408,8 +411,6 @@ mod test {
));
//assert
assert!(connector.hmac_keys().is_some());

connector.shutdown();
}

#[tokio::test]
Expand All @@ -426,8 +427,6 @@ mod test {

connector.handshake_completed();
assert!(connector.is_handshake_created());

connector.shutdown();
}

#[tokio::test]
Expand All @@ -437,7 +436,5 @@ mod test {

assert!(!connector.inner_check_to_send_more(MessageType::ScpMessage));
connector.enable_flow_controller(node_info.overlay_version, node_info.overlay_version);

connector.shutdown();
}
}
Loading
Loading