Skip to content

Commit

Permalink
QoL fixes (#84)
Browse files Browse the repository at this point in the history
  • Loading branch information
akhercha authored Nov 12, 2024
1 parent 19be739 commit 2aeb94f
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 21 deletions.
10 changes: 5 additions & 5 deletions src/services/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use apibara_sdk::{configuration, ClientBuilder, Configuration, Uri};
use dashmap::DashSet;
use futures_util::TryStreamExt;
use starknet::core::types::Felt;
use tokio::sync::mpsc::Sender;
use tokio::sync::mpsc::UnboundedSender;
use tokio::task::JoinSet;

use crate::cli::NetworkName;
Expand All @@ -27,7 +27,7 @@ pub struct IndexerService {
uri: Uri,
apibara_api_key: String,
stream_config: Configuration<Filter>,
positions_sender: Sender<(u64, Position)>,
positions_sender: UnboundedSender<(u64, Position)>,
seen_positions: DashSet<u64>,
}

Expand All @@ -36,7 +36,7 @@ impl Service for IndexerService {
async fn start(&mut self, join_set: &mut JoinSet<anyhow::Result<()>>) -> anyhow::Result<()> {
let service = self.clone();
join_set.spawn(async move {
tracing::info!("🧩 Indexer service started");
tracing::info!("🔍 Indexer service started");
service.run_forever().await?;
Ok(())
});
Expand All @@ -48,7 +48,7 @@ impl IndexerService {
pub fn new(
config: Config,
apibara_api_key: String,
positions_sender: Sender<(u64, Position)>,
positions_sender: UnboundedSender<(u64, Position)>,
from_block: u64,
) -> IndexerService {
let uri = match config.network {
Expand Down Expand Up @@ -167,7 +167,7 @@ impl IndexerService {
block_number
);
}
match self.positions_sender.try_send((block_number, new_position)) {
match self.positions_sender.send((block_number, new_position)) {
Ok(_) => {}
Err(e) => panic!("[🔍 Indexer] 😱 Could not send position: {}", e),
}
Expand Down
4 changes: 2 additions & 2 deletions src/services/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{cmp, sync::Arc};

use anyhow::Result;
use starknet::providers::{jsonrpc::HttpTransport, JsonRpcClient};
use tokio::sync::mpsc;
use tokio::sync::mpsc::unbounded_channel;

use oracle::{LatestOraclePrices, OracleService};

Expand All @@ -29,7 +29,7 @@ pub async fn start_all_services(
account: StarknetAccount,
run_cmd: RunCmd,
) -> Result<()> {
let (positions_sender, position_receiver) = mpsc::channel::<(u64, Position)>(1024);
let (positions_sender, position_receiver) = unbounded_channel::<(u64, Position)>();

// TODO: Add new methods of storage (s3, postgres, sqlite) and be able to define them in CLI
let mut storage = JsonStorage::new(
Expand Down
25 changes: 17 additions & 8 deletions src/services/monitoring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@ use starknet::{
core::types::{Call, Felt},
providers::{jsonrpc::HttpTransport, JsonRpcClient},
};
use tokio::time::{interval, sleep};
use tokio::{sync::mpsc::Receiver, task::JoinSet};
use tokio::task::JoinSet;
use tokio::{
sync::mpsc::UnboundedReceiver,
time::{interval, sleep},
};

use crate::{
config::Config,
Expand All @@ -27,7 +30,7 @@ pub struct MonitoringService {
config: Config,
rpc_client: Arc<JsonRpcClient<HttpTransport>>,
account: Arc<StarknetAccount>,
positions_receiver: Arc<Mutex<Receiver<(u64, Position)>>>,
positions_receiver: Arc<Mutex<UnboundedReceiver<(u64, Position)>>>,
positions: PositionsMap,
latest_oracle_prices: LatestOraclePrices,
storage: Arc<Mutex<Box<dyn Storage>>>,
Expand All @@ -41,7 +44,7 @@ impl Service for MonitoringService {
// + indexed a few positions.
sleep(Duration::from_secs(5)).await;
join_set.spawn(async move {
tracing::info!("🧩 Indexer service started");
tracing::info!("🔭 Monitoring service started");
service.run_forever().await?;
Ok(())
});
Expand All @@ -54,7 +57,7 @@ impl MonitoringService {
config: Config,
rpc_client: Arc<JsonRpcClient<HttpTransport>>,
account: StarknetAccount,
positions_receiver: Receiver<(u64, Position)>,
positions_receiver: UnboundedReceiver<(u64, Position)>,
latest_oracle_prices: LatestOraclePrices,
storage: Box<dyn Storage>,
) -> MonitoringService {
Expand Down Expand Up @@ -121,7 +124,14 @@ impl MonitoringService {
"[🔭 Monitoring] Liquidatable position found #{}!",
position.key()
);
self.liquidate_position(position).await?;
tracing::info!("[🔭 Monitoring] 🔫 Liquidating position...");
if let Err(e) = self.liquidate_position(position).await {
tracing::error!(
"[🔭 Monitoring] 😨 Could not liquidate position #{}: {}",
position.key(),
e
);
}
position
.update(&self.rpc_client, &self.config.singleton_address)
.await?;
Expand All @@ -137,12 +147,11 @@ impl MonitoringService {
/// liquidate it.
async fn liquidate_position(&self, position: &Position) -> Result<()> {
let started_at = std::time::Instant::now();
tracing::info!("[🔭 Monitoring] 🔫 Liquidating position...");
let tx = self.get_liquidation_tx(position).await?;
let tx_hash = self.account.execute_txs(&[tx]).await?;
self.wait_for_tx_to_be_accepted(&tx_hash).await?;
tracing::info!(
"[🔭 Monitoring] ✅ Liquidated position #{}! (TX #{}) - ⌛ {:?}",
"[🔭 Monitoring] ✅ Liquidated position #{}! (tx {}) - ⌛ {:?}",
position.key(),
tx_hash.to_hex_string(),
started_at.elapsed()
Expand Down
9 changes: 3 additions & 6 deletions src/services/oracle.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::Arc;
use std::time::Duration;

use anyhow::{anyhow, Result};
use anyhow::{bail, Result};
use bigdecimal::BigDecimal;
use dashmap::DashMap;
use futures_util::future::join_all;
Expand Down Expand Up @@ -29,7 +29,7 @@ impl Service for OracleService {
async fn start(&mut self, join_set: &mut JoinSet<anyhow::Result<()>>) -> anyhow::Result<()> {
let service = self.clone();
join_set.spawn(async move {
tracing::info!("🧩 Indexer service started");
tracing::info!("🔮 Oracle service started");
service.run_forever().await?;
Ok(())
});
Expand Down Expand Up @@ -152,10 +152,7 @@ impl PragmaOracle {
let response_status = response.status();
let response_text = response.text().await?;
if response_status != StatusCode::OK {
tracing::error!("⛔ Oracle Request failed with: {:?}", response_text);
return Err(anyhow!(
"Oracle request failed with status {response_status}"
));
bail!("Oracle request failed with status {response_status}");
}
let oracle_response: OracleApiResponse = serde_json::from_str(&response_text)?;
let asset_price = hex_str_to_big_decimal(&oracle_response.price, oracle_response.decimals);
Expand Down

0 comments on commit 2aeb94f

Please sign in to comment.