Skip to content

Commit

Permalink
add a precheck signal
Browse files Browse the repository at this point in the history
  • Loading branch information
b-yap committed Aug 12, 2024
1 parent 8c00f3a commit 1193f75
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 72 deletions.
14 changes: 13 additions & 1 deletion clients/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,22 @@ impl<Config: Clone + Send + 'static, F: Fn()> ConnectionManager<Config, F> {
}
}

pub async fn wait_or_shutdown<F, E>(shutdown_tx: ShutdownSender, future2: F) -> Result<(), E>
pub async fn wait_or_shutdown<F, E>(
shutdown_tx: ShutdownSender,
future2: F,
// a consumer that receives a precheck signal to start a task.
precheck_signal: Option<tokio::sync::broadcast::Receiver<()>>
) -> Result<(), E>
where
F: Future<Output = Result<(), E>>,
{
if let Some(mut precheck_signal) = precheck_signal {
if let Err(e) = precheck_signal.recv().await {
tracing::error!("Error receiving precheck signal: {:?}", e);
return Ok(())
}
}

match run_cancelable(shutdown_tx.subscribe(), future2).await {
TerminationStatus::Cancelled => {
tracing::trace!("Received shutdown signal");
Expand Down
40 changes: 27 additions & 13 deletions clients/vault/src/oracle/agent.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::{sync::Arc, time::Duration};
use std::future::Future;

use tokio::{
sync::{mpsc, RwLock},
Expand Down Expand Up @@ -84,12 +85,15 @@ async fn handle_message(
Ok(())
}

pub async fn listen_for_stellar_messages(
pub async fn listen_for_stellar_messages<F>(
config: StellarOverlayConfig,
collector: Arc<RwLock<ScpMessageCollector>>,
secret_key_as_string: String,
shutdown_sender: ShutdownSender,
) -> Result<(),service::Error<crate::Error>> {
// executes a task that determines whether other tasks should run or not
pre_task_execution: Option<(F, tokio::sync::broadcast::Sender<()>)>
) -> Result<(),service::Error<crate::Error>>
where F: Future<Output = Result<(), service::Error<crate::Error>>> + Send + 'static {
tracing::info!("listen_for_stellar_messages(): Starting connection to Stellar overlay network...");

let mut overlay_conn = connect_to_stellar_overlay_network(config.clone(), secret_key_as_string)
Expand All @@ -98,16 +102,37 @@ pub async fn listen_for_stellar_messages(
service::Error::StartOracleAgentError
})?;

if let Some((pre_task_execution, signal_sender)) = pre_task_execution {
pre_task_execution.await?;

if let Err(e) = signal_sender.send(()) {
tracing::error!("listen_for_stellar_messages(): Failed to send signal: {e:?}");
}
}

// use StellarOverlayConnection's sender to send message to Stellar
let sender = overlay_conn.sender();

// occasionally log a new message received.
let mut log_counter:u8 = 0;
loop {
if log_counter == u8::MAX {
log_counter = 0;
} else {
log_counter += 1;
}

tokio::select! {
_ = sleep(Duration::from_millis(100)) => {},
result = overlay_conn.listen() => match result {
Ok(None) => {},
Ok(Some(msg)) => {
let msg_as_str = to_base64_xdr_string(&msg);

if log_counter % 100 == 0 {
tracing::info!("listen_for_stellar_messages(): received message: {msg_as_str}");
}

if let Err(e) = handle_message(msg, collector.clone(), &sender).await {
tracing::error!("listen_for_stellar_messages(): failed to handle message: {msg_as_str}: {e:?}");
}
Expand Down Expand Up @@ -154,24 +179,13 @@ pub async fn start_oracle_agent(
let collector_clone = collector.clone();

let shutdown_sender_clone = shutdown_sender.clone();
// disconnect signal sender tells the StellarOverlayConnection to close its TcpStream to Stellar
// Node
let (disconnect_signal_sender, mut disconnect_signal_receiver) = mpsc::channel::<()>(2);

let sender_clone = overlay_conn.sender();
tokio_spawn(
"overlay connection started",
async move {
loop {
tokio::select! {
_ = sleep(Duration::from_millis(100)) => {},
// if a disconnect signal was sent, disconnect from Stellar.
result = disconnect_signal_receiver.recv() => {
if result.is_none() {
tracing::info!("start_oracle_agent(): disconnect overlay...");
break
}
},
result = overlay_conn.listen() => match result {
Ok(Some(msg)) => {
let msg_as_str = to_base64_xdr_string(&msg);
Expand Down
Loading

0 comments on commit 1193f75

Please sign in to comment.