Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: error handling for event feeder #133

Open
wants to merge 11 commits into
base: aurras-runtime-lite-dev
Choose a base branch
from
3 changes: 2 additions & 1 deletion event_feed/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ tendermint-rpc = { version = "0.35.0", features = [
] }
futures = "0.3.30"
base64 = "0.22.0"
clap ={ version = "4.5.4", features = [ "derive", "env" ]}
clap = { version = "4.5.4", features = ["derive", "env"] }
thiserror = "1.0.59"

[dependencies.ethers]
version = '2.0.8'
Expand Down
5 changes: 2 additions & 3 deletions event_feed/src/common/context.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use super::CosmosFeed;
use super::EthFeed;
// use crate::error::IOError;
use crate::IconFeed;
use crate::PolkadotFeed;
use anyhow::Result;
use subxt::PolkadotConfig;

use anyhow::Result;
/// Represents the main context of the event feed. This is used to configure the event feed with the
/// appropriate chain.
pub enum Context {
Expand All @@ -15,7 +15,6 @@ pub enum Context {
}

impl Context {

/// Starts the event feed.
pub async fn feed_events(&self, cb: &dyn Fn(Vec<serde_json::Value>)) -> Result<()> {
match self {
Expand Down
4 changes: 3 additions & 1 deletion event_feed/src/common/kuska_client.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::*;
use crate::error::IOError;
use anyhow::anyhow;
use anyhow::Result;
use runtime::api::dto::content::Mention;
Expand Down Expand Up @@ -61,7 +62,8 @@ impl Producer {
.publish(&message.to_string(), Some(vec![mention]))
.await;

result.unwrap();
// result.map_err(|e| IOError::Anyhow(e))?;
result.map_err(|e| IOError::Anyhow(anyhow!("{e}")))?;

Ok(())
}
Expand Down
82 changes: 57 additions & 25 deletions event_feed/src/cosmos/feeder.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use super::*;
use crate::common;
use crate::error::IOError;
use anyhow::*;
use futures::StreamExt;
use core::result::Result::Ok;
use runtime::{logger::CoreLogger, Logger};
use serde_json::Value;
use tendermint_rpc::event::{Event, EventData};
Expand Down Expand Up @@ -41,36 +42,57 @@ impl CosmosFeed {
pub async fn event_feed(&self, cb: &dyn Fn(Vec<serde_json::Value>)) -> Result<()> {
let (client, driver) = WebSocketClient::new(&*self.chain_config.node_url)
.await
.unwrap();
.map_err(|err| IOError::Other(format!("Client not created {}", err)))?;
self.logger.info(&format!(
"Following the chain at {}",
self.chain_config.node_url
));
let driver_handle = tokio::spawn(async move { driver.run().await });

let mut subs = client.subscribe(EventType::NewBlock.into()).await.unwrap();
let mut subs = client
.subscribe(EventType::NewBlock.into())
.await
.map_err(|err| IOError::Other(format!("Error in subscribing the client {}", err)))?;
let mut events: Vec<Event> = Vec::new();

while let Some(res) = subs.next().await {
let ev = res.unwrap();
events.push(ev.clone());

let filter_events = events
.iter()
.filter(|tendermint_event| {
Self::convert_to_feeder_event(self, tendermint_event, &self.chain_config)
.is_some()
})
.flat_map(|tendermint_event| {
Self::convert_to_feeder_event(self, tendermint_event, &self.chain_config)
.unwrap()
})
.collect::<Vec<serde_json::Value>>();

// Call the callback with all or filtered events
cb(filter_events);

events.clear();
match res {
Ok(ev) => {
events.push(ev.clone());
let filter_events = events
.iter()
.filter(|tendermint_event| {
Self::convert_to_feeder_event(
self,
tendermint_event,
&self.chain_config,
)
.is_some()
})
.flat_map(|tendermint_event| {
match Self::convert_to_feeder_event(
self,
tendermint_event,
&self.chain_config,
) {
Some(value) => value,
None => {
let _ = Err::<IOError, _>(IOError::Other(
"Error converting to FeederEvent".to_string(),
));
vec![serde_json::Value::Null]
}
}
})
.collect::<Vec<serde_json::Value>>();
cb(filter_events);
events.clear();
}
Err(err) => {
// Consider retry logic or other actions here
return Err(IOError::Other(format!("Error receving event {} ", err)).into());
}
}
}

drop(subs);
Expand Down Expand Up @@ -102,7 +124,7 @@ impl CosmosFeed {
result_begin_block,
result_end_block: _,
} => {
let block = block.as_ref().unwrap();
let block = block.as_ref()?;
let block_number = block.header.version.block as usize;
let hash_string = block.header.last_commit_hash.map(|h| h.to_string());
self.logger.info(&format!(
Expand All @@ -127,7 +149,12 @@ impl CosmosFeed {
tx_hash: hash_string.clone(),
log_index: 0,
})
.map(|e| serde_json::to_value(e).unwrap())
.map(|e| {
serde_json::to_value(e).unwrap_or_else(|err| {
eprintln!("Error converting FeederEvent to Json: {}", err);
serde_json::Value::Null
})
})
.collect()
} else {
self.logger.info("Filtering events based on the event name");
Expand All @@ -146,7 +173,12 @@ impl CosmosFeed {
})
.collect::<Vec<common::FeederEvent>>()
.into_iter()
.map(|e| serde_json::to_value(e).unwrap())
.map(|e| {
serde_json::to_value(e).unwrap_or_else(|err| {
eprintln!("Error converting FeederEvent to Json: {}", err);
serde_json::Value::Null
})
})
.collect()
};

Expand Down
20 changes: 20 additions & 0 deletions event_feed/src/error/io.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use super::*;

#[derive(Error, Debug)]
pub enum IOError {
Anyhow(Error),
Other(String),
Std(Box<std::io::Error>),
Subxt(subxt::Error),
}

impl Display for IOError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
IOError::Anyhow(error) => write!(f, "{}", error),
IOError::Other(error) => write!(f, "{}", error),
IOError::Std(error) => write!(f, "{}", error),
IOError::Subxt(error) => write!(f, "{}", error),
}
}
}
6 changes: 6 additions & 0 deletions event_feed/src/error/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
mod io;
pub use io::*;

use anyhow::Error;
use std::fmt::Display;
use thiserror::Error;
23 changes: 16 additions & 7 deletions event_feed/src/eth/feeder.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use super::*;
use crate::error::IOError;
use core::result::Result::Ok;

use runtime::{logger::CoreLogger, Logger};
/// Represents an Ethereum blockchain event feed.
pub struct EthFeed {
Expand Down Expand Up @@ -49,18 +52,24 @@ impl EthFeed {
pub async fn event_feed(&self, cb: &dyn Fn(Vec<Value>)) -> Result<()> {
let client = Arc::new(&self.eth_service);

let last_block = client
.get_block(BlockNumber::Latest)
.await?
.unwrap()
.number
.unwrap();
let last_block = match client.get_block(BlockNumber::Latest).await {
Ok(Some(block)) => block.number.unwrap(),
Ok(None) => {
return Err(IOError::Other(
"Failed to fetch the latest block number".to_string(),
).into())
}
Err(err) => return Err(IOError::Other(err.to_string()).into()),
};

let events = self.events.iter().map(|e| e.0.clone());

let filter = Filter::new().from_block(last_block - 25).events(events);

let mut stream = client.subscribe_logs(&filter).await?;
let mut stream = match client.subscribe_logs(&filter).await {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

? operator is enough to return the error

Ok(stream) => stream,
Err(e) => return Err(IOError::Other(e.to_string()).into()),
};
self.logger.info(&format!(
"Subscribed to events with the filter : {:?}",
filter
Expand Down
48 changes: 37 additions & 11 deletions event_feed/src/icon/feeder.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::*;
use crate::error::IOError;
use runtime::{logger::CoreLogger, Logger};

/// Represents the icon blockchain event feed which contains the endpoint and the filters.
Expand Down Expand Up @@ -60,8 +61,12 @@ impl IconFeed {
let mut score_filter = false;

if !self.events.is_empty() || !self.score.is_empty() {
let tx_hash: String =
serde_json::from_value(transaction.get("txHash").unwrap().clone()).unwrap();
let tx_hash: String = serde_json::from_value(
transaction
.get("txHash")
.ok_or_else(|| anyhow::anyhow!("Transaction hash not found in transaction"))?
.clone(),
)?;
self.logger.info(&format!(
"Filtering the events with the tx_hash : {:?}",
tx_hash
Expand Down Expand Up @@ -139,18 +144,30 @@ impl IconFeed {
}
};

let transactions: Vec<Value> = serde_json::from_value(
block
.get("result")
.and_then(|val| val.get("confirmed_transaction_list"))
.unwrap()
.clone(),
)?;
let transactions = match block
.get("result")
.and_then(|val| val.get("confirmed_transaction_list"))
.cloned()
{
Some(transactions) => transactions,
None => {
return Err(IOError::Other(format!(
"No taransactions found in block {}",
latest_height
))
.into());
}
};

let transactions: Vec<Value> = serde_json::from_value(transactions)
.map_err(|err| IOError::Other(format!("Error in transacation: {}", err)))?;

let mut filtered_tx = Vec::<Value>::new();

for transaction in transactions {
if self.filter(&transaction).await? {
if self.filter(&transaction).await.map_err(|err| {
IOError::Other(format!("Error filtering transaction: {}", err))
})? {
filtered_tx.push(transaction);
}
}
Expand All @@ -171,7 +188,16 @@ impl IconFeed {
sleep(Duration::from_secs(1));
}

latest_height = get_icon_block_height(&self.icon_service).await?;
latest_height = match get_icon_block_height(&self.icon_service).await {
Ok(height) => height,
Err(err) => {
return Err(IOError::Other(format!(
"Error getting latest block height: {}",
err
))
.into());
}
}
}
}
}
14 changes: 6 additions & 8 deletions event_feed/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@

pub mod common;
pub mod cosmos;
pub mod error;
pub mod eth;
pub mod icon;
pub mod substrate;

/// can use this library to fetch events from multiple chain like
/// can use this library to fetch events from multiple chain like
/// - substrate
/// - icon
/// - eth
/// - cosmos
///
///
/// use event_feed::*;
///
///
/// #[tokio::main]
/// async fn main() {
/// dotenv::dotenv().ok();
Expand Down Expand Up @@ -59,12 +59,10 @@ pub mod substrate;
/// })
/// .await;
/// }
///
///
pub use common::*;
pub use cosmos::*;
pub use error::*;
pub use eth::*;
pub use icon::*;
pub use substrate::*;



Loading