Skip to content

Commit

Permalink
make listen() async; use tokio::select
Browse files Browse the repository at this point in the history
  • Loading branch information
b-yap committed Jul 30, 2024
1 parent e51eb0c commit 4e06886
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 60 deletions.
2 changes: 1 addition & 1 deletion clients/stellar-relay-lib/examples/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

let mut overlay_connection = connect_to_stellar_overlay_network(cfg, &secret_key).await?;

while let Ok(Some(msg)) = overlay_connection.listen() {
while let Ok(Some(msg)) = overlay_connection.listen().await {
match msg {
StellarMessage::ScpMessage(msg) => {
let node_id = msg.statement.node_id.to_encoding();
Expand Down
38 changes: 14 additions & 24 deletions clients/stellar-relay-lib/src/overlay.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
use substrate_stellar_sdk::types::{ErrorCode, StellarMessage};
use std::time::Duration;
use async_std::future::timeout;
use substrate_stellar_sdk::types::{StellarMessage};
use tokio::sync::{
mpsc,
mpsc::{
error::{SendError, TryRecvError},
error::{SendError},
Sender,
},
};
use tracing::{debug, error, info};
use tracing::{debug, info};

use crate::{
connection::{poll_messages_from_stellar, ConnectionInfo, Connector},
helper::error_to_string,
node::NodeInfo,
Error,
};
Expand Down Expand Up @@ -56,27 +57,16 @@ impl StellarOverlayConnection {
})
}

pub fn listen(&mut self) -> Result<Option<StellarMessage>, Error> {
loop {
if !self.is_alive() {
debug!("listen(): sender half of overlay has closed.");
return Err(Error::Disconnected)
}

match self.receiver.try_recv() {
Ok(StellarMessage::ErrorMsg(e)) => {
error!("listen(): received error message: {e:?}");
if e.code == ErrorCode::ErrConf || e.code == ErrorCode::ErrAuth {
return Err(Error::ConnectionFailed(error_to_string(e)))
}

return Ok(None)
},
Ok(msg) => return Ok(Some(msg)),
Err(TryRecvError::Disconnected) => return Err(Error::Disconnected),
Err(TryRecvError::Empty) => continue,
}
/// Listens for upcoming messages from Stellar Node via a receiver.
/// The sender pair can be found in [fn poll_messages_from_stellar](../src/connection/connector/message_reader.rs)
pub async fn listen(&mut self) -> Result<Option<StellarMessage>, Error> {
if !self.is_alive() {
debug!("listen(): sender half of overlay has closed.");
return Err(Error::Disconnected)
}

timeout(Duration::from_secs(1), self.receiver.recv()).await
.map_err(|_| Error::Timeout)
}

pub fn is_alive(&mut self) -> bool {
Expand Down
4 changes: 2 additions & 2 deletions clients/stellar-relay-lib/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ async fn stellar_overlay_should_receive_scp_messages() {

timeout(Duration::from_secs(300), async move {
let mut ov_conn_locked = ov_conn.lock().await;
if let Ok(Some(msg)) = ov_conn_locked.listen() {
if let Ok(Some(msg)) = ov_conn_locked.listen().await {
scps_vec_clone.lock().await.push(msg);

ov_conn_locked.stop();
Expand Down Expand Up @@ -89,7 +89,7 @@ async fn stellar_overlay_should_receive_tx_set() {
timeout(Duration::from_secs(500), async move {
let mut ov_conn_locked = ov_conn.lock().await;

while let Ok(Some(msg)) = ov_conn_locked.listen() {
while let Ok(Some(msg)) = ov_conn_locked.listen().await {
match msg {
StellarMessage::ScpMessage(msg) =>
if let ScpStatementPledges::ScpStExternalize(stmt) = &msg.statement.pledges {
Expand Down
62 changes: 30 additions & 32 deletions clients/vault/src/oracle/agent.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{sync::Arc, time::Duration};

use tokio::{
sync::{mpsc, mpsc::error::TryRecvError, RwLock},
sync::{mpsc, RwLock},
time::{sleep, timeout},
};

Expand Down Expand Up @@ -84,42 +84,40 @@ pub async fn start_oracle_agent(
// Node
let (disconnect_signal_sender, mut disconnect_signal_receiver) = mpsc::channel::<()>(2);

let sender_clone = overlay_conn.sender();
tokio::spawn(async move {
let sender_clone = overlay_conn.sender();
loop {
match disconnect_signal_receiver.try_recv() {
// if a disconnect signal was sent, disconnect from Stellar.
Ok(_) | Err(TryRecvError::Disconnected) => {
tracing::info!("start_oracle_agent(): disconnect overlay...");
break
},
Err(TryRecvError::Empty) => {},
}

// listen for messages from Stellar
match overlay_conn.listen() {
Ok(Some(msg)) => {
let msg_as_str = to_base64_xdr_string(&msg);
if let Err(e) =
handle_message(msg, collector_clone.clone(), &sender_clone).await
{
tracing::error!(
"start_oracle_agent(): failed to handle message: {msg_as_str}: {e:?}"
);
}
tokio::select! {
_ = sleep(Duration::from_millis(100)) => {
tracing::info!("start_oracle_agent(): go to sleep");
},
Ok(None) => {},
// connection got lost
Err(e) => {
tracing::error!("start_oracle_agent(): encounter error in overlay: {e:?}");

if let Err(e) = shutdown_sender_clone.send(()) {
tracing::error!(
"start_oracle_agent(): Failed to send shutdown signal in thread: {e:?}"
);
// if a disconnect signal was sent, disconnect from Stellar.
result = disconnect_signal_receiver.recv() => {
if result.is_none() {
tracing::info!("start_oracle_agent(): disconnect overlay...");
break
}
break
},
result = overlay_conn.listen() => {
tracing::info!("start_oracle_agent(): received message from overlay");
match result {
Ok(Some(msg)) => {
let msg_as_str = to_base64_xdr_string(&msg);
if let Err(e) = handle_message(msg, collector_clone.clone(), &sender_clone).await {
tracing::error!("start_oracle_agent(): failed to handle message: {msg_as_str}: {e:?}");
}
},
Ok(None) => {},
// connection got lost
Err(e) => {
tracing::error!("start_oracle_agent(): encounter error in overlay: {e:?}");

if let Err(e) = shutdown_sender_clone.send(()) {
tracing::error!("start_oracle_agent(): Failed to send shutdown signal in thread: {e:?}");
}
break
},
}},
}
}

Expand Down
2 changes: 1 addition & 1 deletion clients/vault/src/oracle/collector/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ impl ScpMessageCollector {
// Check if collector has a record of this hash.
if self.is_txset_new(&txset_hash, &slot) {
// if it doesn't exist, let's request from the Stellar Node.
tracing::info!(
tracing::debug!(
"Handling Incoming ScpEnvelopes for slot {slot}: requesting TxSet..."
);
message_sender.send(StellarMessage::GetTxSet(txset_hash)).await?;
Expand Down
4 changes: 4 additions & 0 deletions clients/vault/src/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -795,6 +795,8 @@ impl VaultService {

self.execute_open_requests(oracle_agent.clone());

tracing::info!("CONTINUE ON HOOY");

// issue handling
// this vec is passed to the stellar wallet to filter out transactions that are not relevant
// this has to be modified every time the issue set changes
Expand All @@ -806,6 +808,8 @@ impl VaultService {
issue::initialize_issue_set(&self.spacewalk_parachain, &issue_map, &memos_to_issue_ids)
.await?;

tracing::info!("ISSUE INITIALIZE ISSUE SET!!!");

let ledger_env_map: ArcRwLock<LedgerTxEnvMap> = Arc::new(RwLock::new(HashMap::new()));

tracing::info!("Starting all services...");
Expand Down

0 comments on commit 4e06886

Please sign in to comment.