diff --git a/src/services/indexer.rs b/src/services/indexer.rs index 68a97ed..7c36996 100644 --- a/src/services/indexer.rs +++ b/src/services/indexer.rs @@ -8,7 +8,7 @@ use apibara_sdk::{configuration, ClientBuilder, Configuration, Uri}; use dashmap::DashSet; use futures_util::TryStreamExt; use starknet::core::types::Felt; -use tokio::sync::mpsc::Sender; +use tokio::sync::mpsc::UnboundedSender; use tokio::task::JoinSet; use crate::cli::NetworkName; @@ -27,7 +27,7 @@ pub struct IndexerService { uri: Uri, apibara_api_key: String, stream_config: Configuration, - positions_sender: Sender<(u64, Position)>, + positions_sender: UnboundedSender<(u64, Position)>, seen_positions: DashSet, } @@ -36,7 +36,7 @@ impl Service for IndexerService { async fn start(&mut self, join_set: &mut JoinSet>) -> anyhow::Result<()> { let service = self.clone(); join_set.spawn(async move { - tracing::info!("🧩 Indexer service started"); + tracing::info!("🔍 Indexer service started"); service.run_forever().await?; Ok(()) }); @@ -48,7 +48,7 @@ impl IndexerService { pub fn new( config: Config, apibara_api_key: String, - positions_sender: Sender<(u64, Position)>, + positions_sender: UnboundedSender<(u64, Position)>, from_block: u64, ) -> IndexerService { let uri = match config.network { @@ -167,7 +167,7 @@ impl IndexerService { block_number ); } - match self.positions_sender.try_send((block_number, new_position)) { + match self.positions_sender.send((block_number, new_position)) { Ok(_) => {} Err(e) => panic!("[🔍 Indexer] 😱 Could not send position: {}", e), } diff --git a/src/services/mod.rs b/src/services/mod.rs index bf2aa67..3f05d06 100644 --- a/src/services/mod.rs +++ b/src/services/mod.rs @@ -6,7 +6,7 @@ use std::{cmp, sync::Arc}; use anyhow::Result; use starknet::providers::{jsonrpc::HttpTransport, JsonRpcClient}; -use tokio::sync::mpsc; +use tokio::sync::mpsc::unbounded_channel; use oracle::{LatestOraclePrices, OracleService}; @@ -29,7 +29,7 @@ pub async fn start_all_services( account: StarknetAccount, run_cmd: RunCmd, ) -> Result<()> { - let (positions_sender, position_receiver) = mpsc::channel::<(u64, Position)>(1024); + let (positions_sender, position_receiver) = unbounded_channel::<(u64, Position)>(); // TODO: Add new methods of storage (s3, postgres, sqlite) and be able to define them in CLI let mut storage = JsonStorage::new( diff --git a/src/services/monitoring.rs b/src/services/monitoring.rs index e33cd09..33088ab 100644 --- a/src/services/monitoring.rs +++ b/src/services/monitoring.rs @@ -6,8 +6,11 @@ use starknet::{ core::types::{Call, Felt}, providers::{jsonrpc::HttpTransport, JsonRpcClient}, }; -use tokio::time::{interval, sleep}; -use tokio::{sync::mpsc::Receiver, task::JoinSet}; +use tokio::task::JoinSet; +use tokio::{ + sync::mpsc::UnboundedReceiver, + time::{interval, sleep}, +}; use crate::{ config::Config, @@ -27,7 +30,7 @@ pub struct MonitoringService { config: Config, rpc_client: Arc>, account: Arc, - positions_receiver: Arc>>, + positions_receiver: Arc>>, positions: PositionsMap, latest_oracle_prices: LatestOraclePrices, storage: Arc>>, @@ -41,7 +44,7 @@ impl Service for MonitoringService { // + indexed a few positions. sleep(Duration::from_secs(5)).await; join_set.spawn(async move { - tracing::info!("🧩 Indexer service started"); + tracing::info!("🔭 Monitoring service started"); service.run_forever().await?; Ok(()) }); @@ -54,7 +57,7 @@ impl MonitoringService { config: Config, rpc_client: Arc>, account: StarknetAccount, - positions_receiver: Receiver<(u64, Position)>, + positions_receiver: UnboundedReceiver<(u64, Position)>, latest_oracle_prices: LatestOraclePrices, storage: Box, ) -> MonitoringService { @@ -121,7 +124,14 @@ impl MonitoringService { "[🔭 Monitoring] Liquidatable position found #{}!", position.key() ); - self.liquidate_position(position).await?; + tracing::info!("[🔭 Monitoring] 🔫 Liquidating position..."); + if let Err(e) = self.liquidate_position(position).await { + tracing::error!( + "[🔭 Monitoring] 😨 Could not liquidate position #{}: {}", + position.key(), + e + ); + } position .update(&self.rpc_client, &self.config.singleton_address) .await?; @@ -137,12 +147,11 @@ impl MonitoringService { /// liquidate it. async fn liquidate_position(&self, position: &Position) -> Result<()> { let started_at = std::time::Instant::now(); - tracing::info!("[🔭 Monitoring] 🔫 Liquidating position..."); let tx = self.get_liquidation_tx(position).await?; let tx_hash = self.account.execute_txs(&[tx]).await?; self.wait_for_tx_to_be_accepted(&tx_hash).await?; tracing::info!( - "[🔭 Monitoring] ✅ Liquidated position #{}! (TX #{}) - ⌛ {:?}", + "[🔭 Monitoring] ✅ Liquidated position #{}! (tx {}) - ⌛ {:?}", position.key(), tx_hash.to_hex_string(), started_at.elapsed() diff --git a/src/services/oracle.rs b/src/services/oracle.rs index ac3a2ad..13b0d82 100644 --- a/src/services/oracle.rs +++ b/src/services/oracle.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use std::time::Duration; -use anyhow::{anyhow, Result}; +use anyhow::{bail, Result}; use bigdecimal::BigDecimal; use dashmap::DashMap; use futures_util::future::join_all; @@ -29,7 +29,7 @@ impl Service for OracleService { async fn start(&mut self, join_set: &mut JoinSet>) -> anyhow::Result<()> { let service = self.clone(); join_set.spawn(async move { - tracing::info!("🧩 Indexer service started"); + tracing::info!("🔮 Oracle service started"); service.run_forever().await?; Ok(()) }); @@ -152,10 +152,7 @@ impl PragmaOracle { let response_status = response.status(); let response_text = response.text().await?; if response_status != StatusCode::OK { - tracing::error!("⛔ Oracle Request failed with: {:?}", response_text); - return Err(anyhow!( - "Oracle request failed with status {response_status}" - )); + bail!("Oracle request failed with status {response_status}"); } let oracle_response: OracleApiResponse = serde_json::from_str(&response_text)?; let asset_price = hex_str_to_big_decimal(&oracle_response.price, oracle_response.decimals);