Skip to content

Commit

Permalink
use recv() instead of try_recv() (#563)
Browse files Browse the repository at this point in the history
* use `recv()`

* fmt and clippy
  • Loading branch information
b-yap authored Oct 23, 2024
1 parent 8fdb462 commit 313c91b
Show file tree
Hide file tree
Showing 8 changed files with 15 additions and 21 deletions.
20 changes: 4 additions & 16 deletions clients/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,7 @@ use async_trait::async_trait;
use futures::{future::Either, Future, FutureExt};
use governor::{Quota, RateLimiter};
use nonzero_ext::*;
use tokio::{
sync::{broadcast::error::TryRecvError, RwLock},
time::sleep,
};
use tokio::{sync::RwLock, time::sleep};
pub use warp;

pub use cli::{LoggingFormat, MonitoringConfig, RestartPolicy, ServiceConfig};
Expand Down Expand Up @@ -183,18 +180,9 @@ where
F: Future<Output = Result<(), E>>,
{
if let Some(mut precheck_signal) = precheck_signal {
loop {
match precheck_signal.try_recv() {
// Received a signal to start the task
Ok(_) => break,
Err(TryRecvError::Empty) =>
tracing::trace!("wait_or_shutdown precheck signal: waiting..."),
// Precheck signal failed. Cannot start the task.
Err(e) => {
tracing::error!("Error receiving precheck signal: {:?}", e);
return Ok(());
},
}
if let Err(e) = precheck_signal.recv().await {
tracing::error!("Error receiving precheck signal: {:?}", e);
return Ok(());
}
}

Expand Down
1 change: 1 addition & 0 deletions clients/vault/src/cancellation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ impl<P: IssuePallet + ReplacePallet + UtilFuncs + SecurityPallet + Clone> Cancel
mut self,
mut event_listener: Receiver<Event>,
) -> Result<(), RuntimeError> {
tracing::info!("handle_cancellation(): started");
let mut list_state = ListState::Invalid;
let mut active_requests: Vec<ActiveRequest> = vec![];

Expand Down
2 changes: 2 additions & 0 deletions clients/vault/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,7 @@ pub async fn monitor_bridge_metrics(
parachain_rpc: SpacewalkParachain,
vault_id_manager: VaultIdManager,
) -> Result<(), ServiceError<Error>> {
tracing::info!("monitor_bridge_metrics(): started");
let parachain_rpc = &parachain_rpc;
let vault_id_manager = &vault_id_manager;
parachain_rpc
Expand Down Expand Up @@ -540,6 +541,7 @@ pub async fn poll_metrics<
parachain_rpc: P,
vault_id_manager: VaultIdManager,
) -> Result<(), ServiceError<Error>> {
tracing::info!("poll_metrics(): started");
let parachain_rpc = &parachain_rpc;
let vault_id_manager = &vault_id_manager;

Expand Down
6 changes: 2 additions & 4 deletions clients/vault/src/oracle/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::{
use wallet::Slot;

/// The interval to check if we are still receiving messages from Stellar Relay
const STELLAR_RELAY_HEALTH_CHECK_IN_SECS: u64 = 600;
const STELLAR_RELAY_HEALTH_CHECK_IN_SECS: u64 = 300;

pub struct OracleAgent {
pub collector: ArcRwLock<ScpMessageCollector>,
Expand Down Expand Up @@ -140,9 +140,7 @@ pub async fn listen_for_stellar_messages(
oracle_agent: Arc<OracleAgent>,
shutdown_sender: ShutdownSender,
) -> Result<(), service::Error<crate::Error>> {
tracing::info!(
"listen_for_stellar_messages(): Starting connection to Stellar overlay network..."
);
tracing::info!("listen_for_stellar_messages(): started");

let mut overlay_conn = oracle_agent.overlay_conn.write().await;

Expand Down
1 change: 1 addition & 0 deletions clients/vault/src/redeem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub async fn listen_for_redeem_requests(
payment_margin: Duration,
oracle_agent: Arc<OracleAgent>,
) -> Result<(), ServiceError<Error>> {
tracing::info!("listen_for_redeem_requests(): started");
parachain_rpc
.on_event::<RequestRedeemEvent, _, _, _>(
|event| async {
Expand Down
4 changes: 3 additions & 1 deletion clients/vault/src/replace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub async fn listen_for_accept_replace(
payment_margin: Duration,
oracle_agent: Arc<OracleAgent>,
) -> Result<(), ServiceError<Error>> {
tracing::info!("listen_for_accept_replace(): started");
let parachain_rpc = &parachain_rpc;
let vault_id_manager = &vault_id_manager;
let shutdown_tx = &shutdown_tx;
Expand Down Expand Up @@ -96,7 +97,7 @@ pub async fn listen_for_replace_requests(
event_channel: Sender<Event>,
accept_replace_requests: bool,
) -> Result<(), ServiceError<Error>> {
tracing::debug!("listen_for_replace_requests(): started");
tracing::info!("listen_for_replace_requests(): started");

let parachain_rpc = &parachain_rpc;
let vault_id_manager = &vault_id_manager;
Expand Down Expand Up @@ -206,6 +207,7 @@ pub async fn listen_for_execute_replace(
parachain_rpc: SpacewalkParachain,
event_channel: Sender<Event>,
) -> Result<(), ServiceError<Error>> {
tracing::info!("listen_for_execute_replace(): started");
let event_channel = &event_channel;
let parachain_rpc = &parachain_rpc;
parachain_rpc
Expand Down
1 change: 1 addition & 0 deletions clients/vault/src/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ impl VaultIdManager {
}

pub async fn listen_for_vault_id_registrations(self) -> Result<(), ServiceError<Error>> {
tracing::info!("listen_for_vault_id_registrations(): started");
Ok(self
.spacewalk_parachain
.on_event::<RegisterVaultEvent, _, _, _>(
Expand Down
1 change: 1 addition & 0 deletions clients/wallet/src/horizon/horizon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ where
U: Clone + IsEmptyExt,
Filter: FilterWith<T, U> + Clone,
{
tracing::info!("listen_for_new_transactions(): started");
let horizon_client = reqwest::Client::new();
let mut fetcher =
HorizonFetcher::new(horizon_client, vault_account_public_key, is_public_network);
Expand Down

0 comments on commit 313c91b

Please sign in to comment.