Skip to content

Commit

Permalink
cleanup clones; add names to threads
Browse files Browse the repository at this point in the history
  • Loading branch information
b-yap committed Aug 9, 2024
1 parent d8d6ec2 commit 8c00f3a
Show file tree
Hide file tree
Showing 16 changed files with 131 additions and 98 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion clients/runtime/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use async_trait::async_trait;
use codec::Encode;
use futures::{future::join_all, stream::StreamExt, FutureExt, SinkExt};
use jsonrpsee::core::{client::Client, JsonValue};
use jsonrpsee::tracing;
use subxt::{
blocks::ExtrinsicEvents,
client::OnlineClient,
Expand Down Expand Up @@ -1090,7 +1091,9 @@ impl IssuePallet for SpacewalkParachain {
let key_hash = issue_id.0.as_slice();
// last bytes are the raw key
let key = &key_hash[key_hash.len() - 32..];
issue_requests.push((H256::from_slice(key), request));
let key = H256::from_slice(key);
tracing::info!("get_all_active_issues: found {}", key.to_string());
issue_requests.push((key, request));
}
}
Ok(issue_requests)
Expand Down
2 changes: 1 addition & 1 deletion clients/stellar-relay-lib/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ impl StellarOverlayConfig {

#[allow(dead_code)]
pub(crate) fn node_info(&self) -> NodeInfo {
self.node_info.clone().into()
NodeInfo::new(&self.node_info)
}

#[allow(dead_code)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ mod test {
cert: new_auth_cert,
nonce: [0; 32],
};
connector.set_remote(RemoteInfo::new(&hello));
connector.set_remote(RemoteInfo::new(hello));

assert!(connector.remote().is_some());
}
Expand All @@ -364,7 +364,7 @@ mod test {
cert: new_auth_cert,
nonce: [0; 32],
};
connector.set_remote(RemoteInfo::new(&hello));
connector.set_remote(RemoteInfo::new(hello));
assert_eq!(connector.remote().unwrap().sequence(), 0);

connector.increment_remote_sequence().unwrap();
Expand Down Expand Up @@ -392,7 +392,7 @@ mod test {
cert: new_auth_cert,
nonce: [0; 32],
};
let remote = RemoteInfo::new(&hello);
let remote = RemoteInfo::new(hello);
let remote_nonce = remote.nonce();
connector.set_remote(remote.clone());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ impl Connector {
return Err(Error::AuthCertInvalid)
}

let remote_info = RemoteInfo::new(&hello);
let remote_info = RemoteInfo::new(hello);
let shared_key = self.get_shared_key(remote_info.pub_key_ecdh());

self.set_hmac_keys(HMacKeys::new(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,16 @@ pub(crate) async fn poll_messages_from_stellar(
};

match connector.process_raw_message(xdr).await {
Ok(Some(stellar_msg)) =>
// push message to user
if let Err(e) = send_to_user_sender.send(stellar_msg.clone()).await {
Ok(Some(stellar_msg)) => {
// push message to user
let stellar_msg_as_base64_xdr = stellar_msg.to_base64_xdr();
if let Err(e) = send_to_user_sender.send(stellar_msg).await {
warn!("poll_messages_from_stellar(): Error occurred during sending message {} to user: {e:?}",
String::from_utf8(stellar_msg.to_base64_xdr())
.unwrap_or_else(|_| format!("{:?}", stellar_msg.to_base64_xdr()))
);
},
String::from_utf8(stellar_msg_as_base64_xdr.clone())
.unwrap_or_else(|_| format!("{stellar_msg_as_base64_xdr:?}"))
);
}
},
Ok(None) => {},
Err(e) => {
error!("poll_messages_from_stellar(): Error occurred during processing xdr message: {e:?}");
Expand Down Expand Up @@ -110,16 +112,16 @@ async fn read_message_from_stellar(connector: &mut Connector) -> Result<Xdr, Err
// let's start reading the actual stellar message.
readbuf = vec![0; expect_msg_len];

match read_message(
match is_reading_complete(
connector,
&mut lack_bytes_from_prev,
&mut readbuf,
expect_msg_len,
)
.await
{
Ok(None) => continue,
Ok(Some(xdr)) => return Ok(xdr),
Ok(false) => continue,
Ok(true) => return Ok(readbuf),
Err(e) => {
trace!("read_message_from_stellar(): ERROR: {e:?}");
return Err(e)
Expand All @@ -134,11 +136,11 @@ async fn read_message_from_stellar(connector: &mut Connector) -> Result<Xdr, Err
buff_for_reading = vec![0; 4];

// let's read the continuation number of bytes from the previous message.
match read_unfinished_message(connector, &mut lack_bytes_from_prev, &mut readbuf)
match is_reading_unfinished_message_complete(connector, &mut lack_bytes_from_prev, &mut readbuf)
.await
{
Ok(None) => continue,
Ok(Some(xdr)) => return Ok(xdr),
Ok(false) => continue,
Ok(true) => return Ok(readbuf),
Err(e) => {
trace!("read_message_from_stellar(): ERROR: {e:?}");
return Err(e)
Expand All @@ -157,7 +159,7 @@ async fn read_message_from_stellar(connector: &mut Connector) -> Result<Xdr, Err
}
}

/// Returns Xdr when all bytes from the stream have successfully been converted; else None.
/// Returns true when all bytes from the stream have successfully been converted; else false.
/// This reads a number of bytes based on the expected message length.
///
/// # Arguments
Expand All @@ -166,12 +168,12 @@ async fn read_message_from_stellar(connector: &mut Connector) -> Result<Xdr, Err
/// * `lack_bytes_from_prev` - the number of bytes remaining, to complete the previous message
/// * `readbuf` - the buffer that holds the bytes of the previous and incomplete message
/// * `xpect_msg_len` - the expected # of bytes of the Stellar message
async fn read_message(
async fn is_reading_complete(
connector: &mut Connector,
lack_bytes_from_prev: &mut usize,
readbuf: &mut Vec<u8>,
xpect_msg_len: usize,
) -> Result<Option<Xdr>, Error> {
) -> Result<bool, Error> {
let actual_msg_len = connector
.tcp_stream
.read(readbuf)
Expand All @@ -180,7 +182,7 @@ async fn read_message(

// only when the message has the exact expected size bytes, should we send to user.
if actual_msg_len == xpect_msg_len {
return Ok(Some(readbuf.clone()))
return Ok(true)
}

// The next bytes are remnants from the previous stellar message.
Expand All @@ -191,22 +193,22 @@ async fn read_message(
"read_message(): received only partial message. Need {lack_bytes_from_prev} bytes to complete."
);

Ok(None)
Ok(false)
}

/// Returns Xdr when all bytes from the stream have successfully been converted; else None.
/// Returns true when all bytes from the stream have successfully been converted; else false.
/// Reads a continuation of bytes that belong to the previous message
///
/// # Arguments
/// * `connector` - a ref struct that contains the config and necessary info for connecting to
/// Stellar Node
/// * `lack_bytes_from_prev` - the number of bytes remaining, to complete the previous message
/// * `readbuf` - the buffer that holds the bytes of the previous and incomplete message
async fn read_unfinished_message(
async fn is_reading_unfinished_message_complete(
connector: &mut Connector,
lack_bytes_from_prev: &mut usize,
readbuf: &mut Vec<u8>,
) -> Result<Option<Xdr>, Error> {
) -> Result<bool, Error> {
// let's read the continuation number of bytes from the previous message.
let mut cont_buf = vec![0; *lack_bytes_from_prev];

Expand All @@ -221,7 +223,7 @@ async fn read_unfinished_message(
trace!("read_unfinished_message(): received continuation from the previous message.");
readbuf.append(&mut cont_buf);

return Ok(Some(readbuf.clone()))
return Ok(true)
}

// this partial message is not enough to complete the previous message.
Expand All @@ -234,5 +236,5 @@ async fn read_unfinished_message(
);
}

Ok(None)
Ok(false)
}
15 changes: 14 additions & 1 deletion clients/stellar-relay-lib/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,19 @@ pub struct NodeInfo {
pub network_id: NetworkId,
}

impl NodeInfo {
pub(crate) fn new(cfg:&NodeInfoCfg) -> Self {
let network: &Network = if cfg.is_pub_net { &PUBLIC_NETWORK } else { &TEST_NETWORK };
NodeInfo {
ledger_version: cfg.ledger_version,
overlay_version: cfg.overlay_version,
overlay_min_version: cfg.overlay_min_version,
version_str: cfg.version_str.clone(),
network_id: *network.get_id(),
}
}
}

impl Debug for NodeInfo {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("NodeInfo")
Expand All @@ -42,4 +55,4 @@ impl From<NodeInfoCfg> for NodeInfo {
network_id: *network.get_id(),
}
}
}
}
6 changes: 3 additions & 3 deletions clients/stellar-relay-lib/src/node/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ impl Debug for RemoteInfo {
}

impl RemoteInfo {
pub fn new(hello: &Hello) -> Self {
pub fn new(hello: Hello) -> Self {
RemoteInfo {
sequence: 0,
pub_key_ecdh: hello.cert.pubkey.clone(),
pub_key: hello.peer_id.clone(),
pub_key_ecdh: hello.cert.pubkey,
pub_key: hello.peer_id,
nonce: hello.nonce,
node: NodeInfo {
ledger_version: hello.ledger_version,
Expand Down
4 changes: 2 additions & 2 deletions clients/stellar-relay-lib/src/overlay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::{

/// Used to send/receive messages to/from Stellar Node
pub struct StellarOverlayConnection {
sender: mpsc::Sender<StellarMessage>,
sender: Sender<StellarMessage>,
receiver: mpsc::Receiver<StellarMessage>,
}

Expand Down Expand Up @@ -47,7 +47,7 @@ impl StellarOverlayConnection {

#[cfg(tokio_unstable)]
tokio::task::Builder::new()
.name("poll_messages_from_stellar")
.name("poll stellar messages")
.spawn(poll_messages_from_stellar(
connector,
send_to_user_sender,
Expand Down
3 changes: 3 additions & 0 deletions clients/vault/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ pub enum Error {

#[error("StdIoError: {0}")]
StdIoError(#[from] std::io::Error),

#[error("Other error: {0}")]
Other(String)
}

impl From<Error> for service::Error<Error> {
Expand Down
23 changes: 13 additions & 10 deletions clients/vault/src/issue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use wallet::{
types::FilterWith, LedgerTxEnvMap, Slot, SlotTask, SlotTaskStatus, TransactionResponse,
};

use crate::{oracle::OracleAgent, ArcRwLock, Error, Event};
use crate::{oracle::OracleAgent, ArcRwLock, Error, Event, tokio_spawn};

fn is_vault(p1: &PublicKey, p2_raw: [u8; 32]) -> bool {
return *p1.as_binary() == p2_raw
Expand Down Expand Up @@ -272,15 +272,18 @@ pub async fn process_issues_requests(
continue
};

tokio::spawn(execute_issue(
parachain_rpc.clone(),
tx_env.clone(),
issues.clone(),
memos_to_issue_ids.clone(),
oracle_agent.clone(),
*slot,
sender,
));
tokio_spawn(
"execute_issue",
execute_issue(
parachain_rpc.clone(),
tx_env.clone(),
issues.clone(),
memos_to_issue_ids.clone(),
oracle_agent.clone(),
*slot,
sender,
)
);
}

// Give 5 seconds interval before starting again.
Expand Down
14 changes: 14 additions & 0 deletions clients/vault/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,17 @@ cfg_if::cfg_if! {
pub type DecimalsLookupImpl = primitives::AmplitudeDecimalsLookup;
}
}

pub fn tokio_spawn<F>(task_name: &str, future: F) -> tokio::task::JoinHandle<F::Output>
where
F: std::future::Future + Send + 'static,
F::Output: Send + 'static,
{
cfg_if::cfg_if!{
if #[cfg(all(tokio_unstable, feature = "allow-debugger"))] {
tokio::task::Builder::new().name(task_name).spawn(future).unwrap()
} else {
tokio::spawn(future)
}
}
}
15 changes: 7 additions & 8 deletions clients/vault/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,7 @@ use service::{
};
use signal_hook::consts::*;
use signal_hook_tokio::Signals;
use vault::{
metrics::{self, increment_restart_counter},
process::PidFile,
Error, VaultService, VaultServiceConfig, ABOUT, AUTHORS, NAME, VERSION,
};
use vault::{metrics::{self, increment_restart_counter}, process::PidFile, Error, VaultService, VaultServiceConfig, ABOUT, AUTHORS, NAME, VERSION, tokio_spawn};

#[derive(Parser)]
#[clap(args_conflicts_with_subcommands = true)]
Expand Down Expand Up @@ -69,7 +65,7 @@ async fn catch_signals<F>(
where
F: Future<Output = Result<(), ServiceError<Error>>> + Send + 'static,
{
let blocking_task = tokio::task::spawn(future);
let blocking_task = tokio_spawn("blocking task", future);
tokio::select! {
res = blocking_task => {
return res?;
Expand Down Expand Up @@ -116,7 +112,7 @@ async fn start() -> Result<(), ServiceError<Error>> {
);
let prometheus_port = opts.monitoring.prometheus_port;

tokio::task::spawn(async move {
tokio_spawn("Prometheus", async move {
warp::serve(metrics_route)
.run(SocketAddr::new(prometheus_host.into(), prometheus_port))
.await;
Expand Down Expand Up @@ -164,6 +160,7 @@ mod tests {
use std::{thread, time::Duration};

use runtime::AccountId;
use vault::tokio_spawn;

use super::*;

Expand All @@ -172,7 +169,9 @@ mod tests {
let termination_signals = &[SIGHUP, SIGTERM, SIGINT, SIGQUIT];
for sig in termination_signals {
let task =
tokio::spawn(catch_signals(Signals::new(termination_signals).unwrap(), async {
tokio_spawn(
"catch signals",
catch_signals(Signals::new(termination_signals).unwrap(), async {
tokio::time::sleep(Duration::from_millis(100_000)).await;
Ok(())
}));
Expand Down
Loading

0 comments on commit 8c00f3a

Please sign in to comment.