From 414d9496a60497e83fd34a3dd8054374237cced5 Mon Sep 17 00:00:00 2001 From: Prathiksha-Nataraja <90592522+Prathiksha-Nataraja@users.noreply.github.com> Date: Fri, 3 May 2024 20:14:40 +0530 Subject: [PATCH 1/9] chore: error handling for event feed --- event_feed/src/common/context.rs | 1 + event_feed/src/cosmos/feeder.rs | 56 ++++++++++++++++++------------- event_feed/src/eth/feeder.rs | 28 ++++++++++++---- event_feed/src/icon/feeder.rs | 45 +++++++++++++++++++------ event_feed/src/main.rs | 1 - event_feed/src/substrate/types.rs | 12 +++---- 6 files changed, 96 insertions(+), 47 deletions(-) diff --git a/event_feed/src/common/context.rs b/event_feed/src/common/context.rs index 74e6249..55bd58f 100644 --- a/event_feed/src/common/context.rs +++ b/event_feed/src/common/context.rs @@ -2,6 +2,7 @@ use super::CosmosFeed; use super::EthFeed; use crate::IconFeed; use crate::PolkadotFeed; +// use crate::error::EventFeedError; use anyhow::Result; use subxt::PolkadotConfig; diff --git a/event_feed/src/cosmos/feeder.rs b/event_feed/src/cosmos/feeder.rs index b1cc478..432d0f0 100644 --- a/event_feed/src/cosmos/feeder.rs +++ b/event_feed/src/cosmos/feeder.rs @@ -3,6 +3,8 @@ use crate::{common, cosmos}; use anyhow::*; use serde_json::Value; use tendermint_rpc::event::{Event, EventData}; +use core::result::Result::Ok; +// use crate::error::EventFeedError; /// The `CosmosFeed` struct in Rust defines a feed for handling Cosmos blockchain events with filtering /// capabilities. @@ -63,31 +65,33 @@ impl CosmosFeed { /// The `event_feed` function returns a `Result<()>`. pub async fn event_feed(&self, cb: &dyn Fn(Vec)) -> Result<()> { let (client, driver) = WebSocketClient::new(&*self.chain_config.node_url) - .await - .unwrap(); + .await?; let driver_handle = tokio::spawn(async move { driver.run().await }); - let mut subs = client.subscribe(EventType::NewBlock.into()).await.unwrap(); + let mut subs = client.subscribe(EventType::NewBlock.into()).await?; let mut events: Vec = Vec::new(); while let Some(res) = subs.next().await { - let ev = res.unwrap(); - events.push(ev.clone()); - - let filter_events = events - .iter() - .filter(|tendermint_event| { - Self::convert_to_feeder_event(tendermint_event, &self.chain_config).is_some() - }) - .flat_map(|tendermint_event| { - Self::convert_to_feeder_event(tendermint_event, &self.chain_config).unwrap() - }) - .collect::>(); - - // Call the callback with all or filtered events - cb(filter_events); - - events.clear(); + match res { + Ok(ev) => { + events.push(ev.clone()); + let filter_events = events + .iter() + .filter(|tendermint_event| { + Self::convert_to_feeder_event(tendermint_event, &self.chain_config).is_some() + }) + .flat_map(|tendermint_event| { + Self::convert_to_feeder_event(tendermint_event, &self.chain_config).unwrap() + }) + .collect::>(); + cb(filter_events); + events.clear(); + } + Err(err) => { + eprintln!("Error receiving event: {}", err); + // Consider retry logic or other actions here + } + } } drop(subs); @@ -121,7 +125,7 @@ impl CosmosFeed { result_begin_block, result_end_block: _, } => { - let block = block.as_ref().unwrap(); + let block = block.as_ref()?; let block_number = block.header.version.block as usize; let hash_string = block.header.last_commit_hash.map(|h| h.to_string()); @@ -138,7 +142,10 @@ impl CosmosFeed { tx_hash: hash_string.clone(), log_index: 0, }) - .map(|e| serde_json::to_value(e).unwrap()) + .map(|e| serde_json::to_value(e).unwrap_or_else(|err|{ + eprintln!("Error converting FeederEvent to Json: {}", err); + serde_json::Value::Null + })) .collect() } else { result_begin_block @@ -156,7 +163,10 @@ impl CosmosFeed { }) .collect::>() .into_iter() - .map(|e| serde_json::to_value(e).unwrap()) + .map(|e| serde_json::to_value(e).unwrap_or_else(|err|{ + eprintln!("Error converting FeederEvent to Json: {}", err); + serde_json::Value::Null + })) .collect() }; diff --git a/event_feed/src/eth/feeder.rs b/event_feed/src/eth/feeder.rs index a1b3c31..141aba3 100644 --- a/event_feed/src/eth/feeder.rs +++ b/event_feed/src/eth/feeder.rs @@ -1,4 +1,8 @@ use super::*; +use crate::error::EventFeedError; +use anyhow::*; +use core::result::Result::Ok; + pub struct EthFeed { eth_service: Provider, events: Vec<(String, H256)>, @@ -34,19 +38,28 @@ impl EthFeed { pub async fn event_feed(&self, cb: &dyn Fn(Vec)) -> Result<()> { let client = Arc::new(&self.eth_service); + + // let last_block = client + // .get_block(BlockNumber::Latest) + // .await? + // .unwrap() + // .number + // .unwrap(); + let last_block = match client.get_block(BlockNumber::Latest).await? { + Some(block) => block.number.unwrap(), + None => return Err(anyhow!("Failed to fetch the latest block number")), + }; - let last_block = client - .get_block(BlockNumber::Latest) - .await? - .unwrap() - .number - .unwrap(); let events = self.events.iter().map(|e| e.0.clone()); let filter = Filter::new().from_block(last_block - 25).events(events); - let mut stream = client.subscribe_logs(&filter).await?; + // let mut stream = client.subscribe_logs(&filter).await?; + let mut stream = match client.subscribe_logs(&filter).await { + Ok(stream) => stream, + Err(e) => return Err(anyhow!("Failed to subscribe to logs: {}", e)), + }; while let Some(log) = stream.next().await { if self.contracts.is_empty() || self.contracts.contains(&format!("{:?}", &log.address)) @@ -103,6 +116,7 @@ impl EthFeed { } } } + Ok(()) } diff --git a/event_feed/src/icon/feeder.rs b/event_feed/src/icon/feeder.rs index ae19174..c8bdb95 100644 --- a/event_feed/src/icon/feeder.rs +++ b/event_feed/src/icon/feeder.rs @@ -38,8 +38,13 @@ impl IconFeed { let mut score_filter = false; if !self.events.is_empty() || !self.score.is_empty() { - let tx_hash: String = - serde_json::from_value(transaction.get("txHash").unwrap().clone()).unwrap(); + // let tx_hash: String = + // serde_json::from_value(transaction.get("txHash").unwrap().clone()).unwrap(); + let tx_hash: String = serde_json::from_value( + transaction.get("txHash") + .ok_or_else(|| anyhow::anyhow!("Transaction hash not found in transaction"))? + .clone() + )?; let event_logs = get_event_logs_by_tx_hash(&self.icon_service, &tx_hash).await?; @@ -97,13 +102,26 @@ impl IconFeed { } }; - let transactions: Vec = serde_json::from_value( - block - .get("result") - .and_then(|val| val.get("confirmed_transaction_list")) - .unwrap() - .clone(), - )?; + // let transactions: Vec = serde_json::from_value( + // block + // .get("result") + // .and_then(|val| val.get("confirmed_transaction_list")) + // .unwrap() + // .clone(), + // )?; + + let transactions = match block.get("result") + .and_then(|val| val.get("confirmed_transaction_list")) + .cloned() { + Some(transactions) => transactions, + None => { + eprintln!("No transactions found in block {}", latest_height); + sleep(Duration::from_millis(1000)); + continue; + } + }; + + let transactions: Vec = serde_json::from_value(transactions)?; let mut filtered_tx = Vec::::new(); @@ -122,7 +140,14 @@ impl IconFeed { sleep(Duration::from_secs(1)); } - latest_height = get_icon_block_height(&self.icon_service).await?; + // latest_height = get_icon_block_height(&self.icon_service).await?; + latest_height = match get_icon_block_height(&self.icon_service).await { + Ok(height) => height, + Err(err) => { + eprintln!("Error getting latest block height: {}", err); + continue; + } + } } } } diff --git a/event_feed/src/main.rs b/event_feed/src/main.rs index dab8243..c11b5bd 100644 --- a/event_feed/src/main.rs +++ b/event_feed/src/main.rs @@ -12,7 +12,6 @@ mod cosmos; mod eth; mod icon; mod substrate; -// pub use cosmos; #[tokio::main] async fn main() { diff --git a/event_feed/src/substrate/types.rs b/event_feed/src/substrate/types.rs index dad7803..685770c 100644 --- a/event_feed/src/substrate/types.rs +++ b/event_feed/src/substrate/types.rs @@ -32,18 +32,18 @@ impl PolkadotFeed { // For each block, print a bunch of information about it: loop { if let Some(block) = blocks_sub.next().await { - let block = block.unwrap(); + let block = block?; let mut fetched_events = Vec::new(); - let extrinsics = block.extrinsics().await.unwrap(); + let extrinsics = block.extrinsics().await?; for ext in extrinsics.iter() { - let ext = ext.unwrap(); - let events = ext.events().await.unwrap(); - let event_details = events.iter().collect::, _>>().unwrap(); + let ext = ext?; + let events = ext.events().await?; + let event_details = events.iter().collect::, _>>()?; let filter = self.split_filter(); for event in event_details.iter() { - let s = event.field_values().unwrap(); + let s = event.field_values()?; let data = format!("{}", s).replace("((", "[").replace("))", "]"); let pallet_name = event.pallet_name().to_lowercase(); From 2477c48cf8898103092e0625784ddda2f3140874 Mon Sep 17 00:00:00 2001 From: Prathiksha-Nataraja <90592522+Prathiksha-Nataraja@users.noreply.github.com> Date: Fri, 3 May 2024 20:16:27 +0530 Subject: [PATCH 2/9] chore: code cleanup --- event_feed/src/eth/feeder.rs | 9 +-------- event_feed/src/icon/feeder.rs | 9 --------- 2 files changed, 1 insertion(+), 17 deletions(-) diff --git a/event_feed/src/eth/feeder.rs b/event_feed/src/eth/feeder.rs index 141aba3..6b0dc2f 100644 --- a/event_feed/src/eth/feeder.rs +++ b/event_feed/src/eth/feeder.rs @@ -1,5 +1,5 @@ use super::*; -use crate::error::EventFeedError; +// use crate::error::EventFeedError; use anyhow::*; use core::result::Result::Ok; @@ -39,12 +39,6 @@ impl EthFeed { pub async fn event_feed(&self, cb: &dyn Fn(Vec)) -> Result<()> { let client = Arc::new(&self.eth_service); - // let last_block = client - // .get_block(BlockNumber::Latest) - // .await? - // .unwrap() - // .number - // .unwrap(); let last_block = match client.get_block(BlockNumber::Latest).await? { Some(block) => block.number.unwrap(), None => return Err(anyhow!("Failed to fetch the latest block number")), @@ -55,7 +49,6 @@ impl EthFeed { let filter = Filter::new().from_block(last_block - 25).events(events); - // let mut stream = client.subscribe_logs(&filter).await?; let mut stream = match client.subscribe_logs(&filter).await { Ok(stream) => stream, Err(e) => return Err(anyhow!("Failed to subscribe to logs: {}", e)), diff --git a/event_feed/src/icon/feeder.rs b/event_feed/src/icon/feeder.rs index c8bdb95..5d97bf0 100644 --- a/event_feed/src/icon/feeder.rs +++ b/event_feed/src/icon/feeder.rs @@ -102,14 +102,6 @@ impl IconFeed { } }; - // let transactions: Vec = serde_json::from_value( - // block - // .get("result") - // .and_then(|val| val.get("confirmed_transaction_list")) - // .unwrap() - // .clone(), - // )?; - let transactions = match block.get("result") .and_then(|val| val.get("confirmed_transaction_list")) .cloned() { @@ -140,7 +132,6 @@ impl IconFeed { sleep(Duration::from_secs(1)); } - // latest_height = get_icon_block_height(&self.icon_service).await?; latest_height = match get_icon_block_height(&self.icon_service).await { Ok(height) => height, Err(err) => { From 4783392b7e04b97096dc1bec514a00a7a9df71f0 Mon Sep 17 00:00:00 2001 From: Prathiksha-Nataraja <90592522+Prathiksha-Nataraja@users.noreply.github.com> Date: Mon, 6 May 2024 16:21:39 +0530 Subject: [PATCH 3/9] chore: error handling for event feeder --- event_feed/Cargo.toml | 19 ++++++-------- event_feed/src/common/context.rs | 6 ++--- event_feed/src/cosmos/feeder.rs | 42 ++++++++++++++++++------------ event_feed/src/error/io.rs | 32 +++++++++++++++++++++++ event_feed/src/error/mod.rs | 6 +++++ event_feed/src/eth/feeder.rs | 29 ++++++++++++--------- event_feed/src/icon/feeder.rs | 36 ++++++++++++++++--------- event_feed/src/lib.rs | 14 +++++----- event_feed/src/main.rs | 30 ++++++++++++++------- event_feed/src/substrate/feeder.rs | 34 +++++++++++++++++------- 10 files changed, 165 insertions(+), 83 deletions(-) create mode 100644 event_feed/src/error/io.rs create mode 100644 event_feed/src/error/mod.rs diff --git a/event_feed/Cargo.toml b/event_feed/Cargo.toml index a8dc8de..a6925ba 100644 --- a/event_feed/Cargo.toml +++ b/event_feed/Cargo.toml @@ -20,18 +20,19 @@ dirs = '5.0.1' envy = '0.4' anyhow = '1.0.82' icon-sdk = '1.2.0' -tendermint-rpc = {version = "0.35.0", features = ["http-client", "websocket-client"]} +tendermint-rpc = { version = "0.35.0", features = [ + "http-client", + "websocket-client", +] } futures = "0.3.30" base64 = "0.22.0" -clap ={ version = "4.5.4", features = [ "derive", "env" ]} +clap = { version = "4.5.4", features = ["derive", "env"] } +thiserror = "1.0.59" [dependencies.ethers] version = '2.0.8' default_features = false -features = [ - 'ws', - 'rustls', -] +features = ['ws', 'rustls'] [dependencies.serde] version = '1.0.198' @@ -42,8 +43,4 @@ path = '../runtime/lite' [dependencies.tokio] version = '1.36.0' -features = [ - 'macros', - 'time', - 'rt-multi-thread', -] +features = ['macros', 'time', 'rt-multi-thread'] diff --git a/event_feed/src/common/context.rs b/event_feed/src/common/context.rs index 0894d03..a555389 100644 --- a/event_feed/src/common/context.rs +++ b/event_feed/src/common/context.rs @@ -1,9 +1,8 @@ use super::CosmosFeed; use super::EthFeed; +use crate::error::IOError; use crate::IconFeed; use crate::PolkadotFeed; -// use crate::error::EventFeedError; -use anyhow::Result; use subxt::PolkadotConfig; /// Represents the main context of the event feed. This is used to configure the event feed with the @@ -16,9 +15,8 @@ pub enum Context { } impl Context { - /// Starts the event feed. - pub async fn feed_events(&self, cb: &dyn Fn(Vec)) -> Result<()> { + pub async fn feed_events(&self, cb: &dyn Fn(Vec)) -> Result<(), IOError> { match self { Context::PolkadotFeed(feed) => feed.event_feed(cb).await, Context::IconFeed(feed) => feed.event_feed(cb).await, diff --git a/event_feed/src/cosmos/feeder.rs b/event_feed/src/cosmos/feeder.rs index d78acf8..7a1bdb5 100644 --- a/event_feed/src/cosmos/feeder.rs +++ b/event_feed/src/cosmos/feeder.rs @@ -1,10 +1,10 @@ use super::*; use crate::common; +use crate::error::IOError; use anyhow::*; +use core::result::Result::Ok; use serde_json::Value; use tendermint_rpc::event::{Event, EventData}; -use core::result::Result::Ok; -// use crate::error::EventFeedError; /// Represents the cosmos blockchain event feed. pub struct CosmosFeed { @@ -36,12 +36,16 @@ impl CosmosFeed { /// Arguments: /// /// * `cb`: A callback function to return the events to the caller. - pub async fn event_feed(&self, cb: &dyn Fn(Vec)) -> Result<()> { + pub async fn event_feed(&self, cb: &dyn Fn(Vec)) -> Result<(), IOError> { let (client, driver) = WebSocketClient::new(&*self.chain_config.node_url) - .await?; + .await + .map_err(|err| IOError::Other(format!("Client not created {}", err)))?; let driver_handle = tokio::spawn(async move { driver.run().await }); - let mut subs = client.subscribe(EventType::NewBlock.into()).await?; + let mut subs = client + .subscribe(EventType::NewBlock.into()) + .await + .map_err(|err| IOError::Other(format!("Error in subscribing the client {}", err)))?; let mut events: Vec = Vec::new(); while let Some(res) = subs.next().await { @@ -51,18 +55,20 @@ impl CosmosFeed { let filter_events = events .iter() .filter(|tendermint_event| { - Self::convert_to_feeder_event(tendermint_event, &self.chain_config).is_some() + Self::convert_to_feeder_event(tendermint_event, &self.chain_config) + .is_some() }) .flat_map(|tendermint_event| { - Self::convert_to_feeder_event(tendermint_event, &self.chain_config).unwrap() + Self::convert_to_feeder_event(tendermint_event, &self.chain_config) + .unwrap() }) .collect::>(); cb(filter_events); events.clear(); } Err(err) => { - eprintln!("Error receiving event: {}", err); // Consider retry logic or other actions here + return Err(IOError::Other(format!("Error receving event {} ", err))); } } } @@ -109,10 +115,12 @@ impl CosmosFeed { tx_hash: hash_string.clone(), log_index: 0, }) - .map(|e| serde_json::to_value(e).unwrap_or_else(|err|{ - eprintln!("Error converting FeederEvent to Json: {}", err); - serde_json::Value::Null - })) + .map(|e| { + serde_json::to_value(e).unwrap_or_else(|err| { + eprintln!("Error converting FeederEvent to Json: {}", err); + serde_json::Value::Null + }) + }) .collect() } else { result_begin_block @@ -130,10 +138,12 @@ impl CosmosFeed { }) .collect::>() .into_iter() - .map(|e| serde_json::to_value(e).unwrap_or_else(|err|{ - eprintln!("Error converting FeederEvent to Json: {}", err); - serde_json::Value::Null - })) + .map(|e| { + serde_json::to_value(e).unwrap_or_else(|err| { + eprintln!("Error converting FeederEvent to Json: {}", err); + serde_json::Value::Null + }) + }) .collect() }; diff --git a/event_feed/src/error/io.rs b/event_feed/src/error/io.rs new file mode 100644 index 0000000..9c9acda --- /dev/null +++ b/event_feed/src/error/io.rs @@ -0,0 +1,32 @@ +use super::*; + +#[derive(Error, Debug)] +pub enum IOError { + Anyhow(Error), + Other(String), + Std(Box), + Subxt(subxt::Error), +} + +impl From for IOError { + fn from(err: anyhow::Error) -> Self { + match err.root_cause().downcast_ref::() { + Some(io_err) => IOError::Std(Box::new(std::io::Error::new( + io_err.kind(), + io_err.to_string(), + ))), + None => IOError::Other(format!("Unexpected error: {}", err)), + } + } +} + +impl Display for IOError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + IOError::Anyhow(error) => write!(f, "{}", error), + IOError::Other(error) => write!(f, "{}", error), + IOError::Std(error) => write!(f, "{}", error), + IOError::Subxt(error) => write!(f, "{}", error), + } + } +} diff --git a/event_feed/src/error/mod.rs b/event_feed/src/error/mod.rs new file mode 100644 index 0000000..f4e0efc --- /dev/null +++ b/event_feed/src/error/mod.rs @@ -0,0 +1,6 @@ +mod io; +pub use io::*; + +use anyhow::Error; +use std::fmt::Display; +use thiserror::Error; diff --git a/event_feed/src/eth/feeder.rs b/event_feed/src/eth/feeder.rs index b08e2cb..3853e7d 100644 --- a/event_feed/src/eth/feeder.rs +++ b/event_feed/src/eth/feeder.rs @@ -1,5 +1,5 @@ use super::*; -// use crate::error::EventFeedError; +use crate::error::IOError; use anyhow::*; use core::result::Result::Ok; @@ -12,9 +12,9 @@ pub struct EthFeed { impl EthFeed { /// Creates a new EthFeed instance based on the provided chain configuration. - /// + /// /// Arguments: - /// + /// /// * `config`: An event feed chain configuration. pub async fn new(config: ChainConfig) -> Result { let events = config @@ -43,18 +43,22 @@ impl EthFeed { } /// Fetches the events from the Ethereum chain and returns it through the provided callback function. - /// + /// /// # Arguments: - /// + /// /// * `cb`: A callback function to return the events to the caller. - pub async fn event_feed(&self, cb: &dyn Fn(Vec)) -> Result<()> { + pub async fn event_feed(&self, cb: &dyn Fn(Vec)) -> Result<(), IOError> { let client = Arc::new(&self.eth_service); - - let last_block = match client.get_block(BlockNumber::Latest).await? { - Some(block) => block.number.unwrap(), - None => return Err(anyhow!("Failed to fetch the latest block number")), - }; + let last_block = match client.get_block(BlockNumber::Latest).await { + Ok(Some(block)) => block.number.unwrap(), + Ok(None) => { + return Err(IOError::Other( + "Failed to fetch the latest block number".to_string(), + )) + } + Err(err) => return Err(IOError::Other(err.to_string())), + }; let events = self.events.iter().map(|e| e.0.clone()); @@ -62,7 +66,7 @@ impl EthFeed { let mut stream = match client.subscribe_logs(&filter).await { Ok(stream) => stream, - Err(e) => return Err(anyhow!("Failed to subscribe to logs: {}", e)), + Err(e) => return Err(IOError::Other(e.to_string())), }; while let Some(log) = stream.next().await { @@ -131,7 +135,6 @@ impl EthFeed { } } } - Ok(()) } diff --git a/event_feed/src/icon/feeder.rs b/event_feed/src/icon/feeder.rs index dbc5ed7..5906068 100644 --- a/event_feed/src/icon/feeder.rs +++ b/event_feed/src/icon/feeder.rs @@ -1,4 +1,5 @@ use super::*; +use crate::error::IOError; /// Represents the icon blockchain event feed which contains the endpoint and the filters. pub struct IconFeed { @@ -60,9 +61,10 @@ impl IconFeed { // let tx_hash: String = // serde_json::from_value(transaction.get("txHash").unwrap().clone()).unwrap(); let tx_hash: String = serde_json::from_value( - transaction.get("txHash") + transaction + .get("txHash") .ok_or_else(|| anyhow::anyhow!("Transaction hash not found in transaction"))? - .clone() + .clone(), )?; let event_logs = get_event_logs_by_tx_hash(&self.icon_service, &tx_hash).await?; @@ -108,7 +110,7 @@ impl IconFeed { /// Arguments: /// /// * `cb`: A callback function to return the events to the caller. - pub async fn event_feed(&self, cb: &dyn Fn(Vec)) -> Result<()> { + pub async fn event_feed(&self, cb: &dyn Fn(Vec)) -> Result<(), IOError> { let mut latest_height = get_icon_block_height(&self.icon_service).await?; let mut old_height = latest_height - 1; @@ -126,23 +128,29 @@ impl IconFeed { } }; - let transactions = match block.get("result") - .and_then(|val| val.get("confirmed_transaction_list")) - .cloned() { + let transactions = match block + .get("result") + .and_then(|val| val.get("confirmed_transaction_list")) + .cloned() + { Some(transactions) => transactions, None => { - eprintln!("No transactions found in block {}", latest_height); - sleep(Duration::from_millis(1000)); - continue; + return Err(IOError::Other(format!( + "No taransactions found in block {}", + latest_height + ))); } }; - let transactions: Vec = serde_json::from_value(transactions)?; + let transactions: Vec = serde_json::from_value(transactions) + .map_err(|err| IOError::Other(format!("Error in transacation: {}", err)))?; let mut filtered_tx = Vec::::new(); for transaction in transactions { - if self.filter(&transaction).await? { + if self.filter(&transaction).await.map_err(|err| { + IOError::Other(format!("Error filtering transaction: {}", err)) + })? { filtered_tx.push(transaction); } } @@ -159,8 +167,10 @@ impl IconFeed { latest_height = match get_icon_block_height(&self.icon_service).await { Ok(height) => height, Err(err) => { - eprintln!("Error getting latest block height: {}", err); - continue; + return Err(IOError::Other(format!( + "Error getting latest block height: {}", + err + ))); } } } diff --git a/event_feed/src/lib.rs b/event_feed/src/lib.rs index ae2915c..34031e2 100644 --- a/event_feed/src/lib.rs +++ b/event_feed/src/lib.rs @@ -1,18 +1,18 @@ - pub mod common; pub mod cosmos; +pub mod error; pub mod eth; pub mod icon; pub mod substrate; -/// can use this library to fetch events from multiple chain like +/// can use this library to fetch events from multiple chain like /// - substrate /// - icon /// - eth /// - cosmos -/// +/// /// use event_feed::*; -/// +/// /// #[tokio::main] /// async fn main() { /// dotenv::dotenv().ok(); @@ -59,12 +59,10 @@ pub mod substrate; /// }) /// .await; /// } -/// +/// pub use common::*; pub use cosmos::*; +pub use error::*; pub use eth::*; pub use icon::*; pub use substrate::*; - - - diff --git a/event_feed/src/main.rs b/event_feed/src/main.rs index f5a3016..0fcfa36 100644 --- a/event_feed/src/main.rs +++ b/event_feed/src/main.rs @@ -1,5 +1,6 @@ use event_feed::{ - ChainConfig, Context, CosmosFeed, EthFeed, IconFeed, PolkadotFeed, Producer, ProducerConfig, + ChainConfig, Context, CosmosFeed, EthFeed, IOError, IconFeed, PolkadotFeed, Producer, + ProducerConfig, }; use std::{fs, path}; use subxt::PolkadotConfig; @@ -35,29 +36,40 @@ async fn main() -> Result<(), Box> { let (chain_config, producer_config) = match args.clone().config { Some(path) => { let read_data = fs::read(path::Path::new(&path))?; - let chain_config: ChainConfig = serde_json::from_slice(&read_data).unwrap(); + let chain_config: ChainConfig = + serde_json::from_slice(&read_data).map_err(|e| IOError::Anyhow(e.into()))?; let producer_config: ProducerConfig = serde_json::from_slice(&read_data.clone()).unwrap(); (chain_config, producer_config) } None => (ChainConfig::default(), ProducerConfig::default()), }; - let mut ssb_client = Producer::new(producer_config.clone()).await.unwrap(); - ssb_client.accept_invite().await.unwrap(); + let mut ssb_client = Producer::new(producer_config.clone()) + .await + .map_err(|e| IOError::Anyhow(e.into()))?; + ssb_client + .accept_invite() + .await + .map_err(|e| IOError::Anyhow(e.into()))?; let feed = match args.command { Commands::Substrate => { let polkadot_client = PolkadotFeed::::new(chain_config.clone()) .await - .unwrap(); + .map_err(|e| { + IOError::Other(format!("Error initializing POLKADOT client : {}", e)) + })?; Context::PolkadotFeed(polkadot_client) } Commands::Icon => { - let icon_client = IconFeed::new(chain_config.clone()).unwrap(); + let icon_client = IconFeed::new(chain_config.clone()) + .map_err(|e| IOError::Other(format!("Error initializing ICON client: {}", e)))?; Context::IconFeed(icon_client) } Commands::Eth => { - let eth_client = EthFeed::new(chain_config.clone()).await.unwrap(); + let eth_client = EthFeed::new(chain_config.clone()) + .await + .map_err(|e| IOError::Other(format!("Error initializing ETH client: {}", e)))?; Context::EthFeed(eth_client) } Commands::Cosmos => { @@ -67,7 +79,7 @@ async fn main() -> Result<(), Box> { }; let (tx, rx) = std::sync::mpsc::channel::>(); - + tokio::spawn(async move { while let Ok(logs) = rx.recv() { for log in logs { @@ -75,7 +87,7 @@ async fn main() -> Result<(), Box> { } } }); - + let _ = feed .feed_events(&|e| { tx.send(e).unwrap(); diff --git a/event_feed/src/substrate/feeder.rs b/event_feed/src/substrate/feeder.rs index 95ee072..9bf5e6b 100644 --- a/event_feed/src/substrate/feeder.rs +++ b/event_feed/src/substrate/feeder.rs @@ -1,5 +1,8 @@ use super::*; use crate::common::ChainConfig; +use crate::error::IOError; +use anyhow::*; +use core::result::Result::Ok; #[derive(Debug, Clone)] /// The `PolkadotFeed` struct in Rust contains a `chain_config` field of type `ChainConfig` and a @@ -44,24 +47,37 @@ impl PolkadotFeed { /// # Arguments /// /// * `cb`: A callback function to return the events to the caller. - pub async fn event_feed(&self, cb: &dyn Fn(Vec)) -> Result<()>{ - let mut blocks_sub = self.client.blocks().subscribe_finalized().await?; + pub async fn event_feed(&self, cb: &dyn Fn(Vec)) -> Result<(), IOError>{ + let mut blocks_sub = self.client.blocks().subscribe_finalized().await.map_err(|err| IOError::Subxt(err.into()))?; // For each block, print a bunch of information about it: loop { if let Some(block) = blocks_sub.next().await { - let block = block?; + let block = block.map_err(|err| IOError::Subxt(err.into()))?; let mut fetched_events = Vec::new(); - let extrinsics = block.extrinsics().await?; + let extrinsics = block.extrinsics().await.map_err(|err| IOError::Subxt(err.into()))?; for ext in extrinsics.iter() { - let ext = ext?; - let events = ext.events().await?; - let event_details = events.iter().collect::, _>>()?; + let ext = match ext{ + Ok(ext) => ext, + Err(err) => return Err(IOError::Subxt(err)), + }; + let events = match ext.events().await{ + Ok(events) => events, + Err(err) => return Err(IOError::Other(format!("Error fetching events: {}", err))), + }; + let event_details = match events.iter().collect::, _>>(){ + Ok(event_details) => event_details, + Err(err) => return Err(IOError::Other(format!("Error collecting events: {}", err))), + }; let filter = self.split_filter(); for event in event_details.iter() { - let s = event.field_values()?; + let s = match event.field_values(){ + Ok(data) => data, + Err(err) => return Err(IOError::Other(format!("Error getting event field values: {}", err))), + + }; let data = format!("{}", s).replace("((", "[").replace("))", "]"); let pallet_name = event.pallet_name().to_lowercase(); @@ -85,7 +101,7 @@ impl PolkadotFeed { method: event.variant_name().to_string(), field_value: data, }; - let serialize_event = serde_json::to_value(&decode_event)?; + let serialize_event = serde_json::to_value(&decode_event).map_err(|err| IOError::Other(format!("Serialising error {}", err)))?; fetched_events.push(serialize_event); } } From 15c0dae3adf25f190f83838c551f2a495a0f385c Mon Sep 17 00:00:00 2001 From: Prathiksha-Nataraja <90592522+Prathiksha-Nataraja@users.noreply.github.com> Date: Mon, 6 May 2024 17:52:16 +0530 Subject: [PATCH 4/9] chore: update the return type --- event_feed/src/common/context.rs | 6 +-- event_feed/src/cosmos/feeder.rs | 4 +- event_feed/src/error/io.rs | 15 +++--- event_feed/src/eth/feeder.rs | 9 ++-- event_feed/src/icon/feeder.rs | 6 +-- event_feed/src/main.rs | 4 +- event_feed/src/substrate/feeder.rs | 73 +++++++++++++++++++----------- event_feed/src/substrate/mod.rs | 1 - 8 files changed, 68 insertions(+), 50 deletions(-) diff --git a/event_feed/src/common/context.rs b/event_feed/src/common/context.rs index a555389..909b37d 100644 --- a/event_feed/src/common/context.rs +++ b/event_feed/src/common/context.rs @@ -1,10 +1,10 @@ use super::CosmosFeed; use super::EthFeed; -use crate::error::IOError; +// use crate::error::IOError; use crate::IconFeed; use crate::PolkadotFeed; use subxt::PolkadotConfig; - +use anyhow::Result; /// Represents the main context of the event feed. This is used to configure the event feed with the /// appropriate chain. pub enum Context { @@ -16,7 +16,7 @@ pub enum Context { impl Context { /// Starts the event feed. - pub async fn feed_events(&self, cb: &dyn Fn(Vec)) -> Result<(), IOError> { + pub async fn feed_events(&self, cb: &dyn Fn(Vec)) -> Result<()> { match self { Context::PolkadotFeed(feed) => feed.event_feed(cb).await, Context::IconFeed(feed) => feed.event_feed(cb).await, diff --git a/event_feed/src/cosmos/feeder.rs b/event_feed/src/cosmos/feeder.rs index 7a1bdb5..6d27cdd 100644 --- a/event_feed/src/cosmos/feeder.rs +++ b/event_feed/src/cosmos/feeder.rs @@ -36,7 +36,7 @@ impl CosmosFeed { /// Arguments: /// /// * `cb`: A callback function to return the events to the caller. - pub async fn event_feed(&self, cb: &dyn Fn(Vec)) -> Result<(), IOError> { + pub async fn event_feed(&self, cb: &dyn Fn(Vec)) -> Result<()> { let (client, driver) = WebSocketClient::new(&*self.chain_config.node_url) .await .map_err(|err| IOError::Other(format!("Client not created {}", err)))?; @@ -68,7 +68,7 @@ impl CosmosFeed { } Err(err) => { // Consider retry logic or other actions here - return Err(IOError::Other(format!("Error receving event {} ", err))); + return Err(IOError::Other(format!("Error receving event {} ", err)).into_anyhow()); } } } diff --git a/event_feed/src/error/io.rs b/event_feed/src/error/io.rs index 9c9acda..06f9363 100644 --- a/event_feed/src/error/io.rs +++ b/event_feed/src/error/io.rs @@ -8,14 +8,13 @@ pub enum IOError { Subxt(subxt::Error), } -impl From for IOError { - fn from(err: anyhow::Error) -> Self { - match err.root_cause().downcast_ref::() { - Some(io_err) => IOError::Std(Box::new(std::io::Error::new( - io_err.kind(), - io_err.to_string(), - ))), - None => IOError::Other(format!("Unexpected error: {}", err)), +impl IOError { + pub fn into_anyhow(self) -> anyhow::Error { + match self { + IOError::Anyhow(error) => error, + IOError::Other(msg) => anyhow::Error::msg(msg), + IOError::Std(io_error) => anyhow::Error::new(*io_error), + IOError::Subxt(subxt_error) => anyhow::Error::msg(subxt_error.to_string()), } } } diff --git a/event_feed/src/eth/feeder.rs b/event_feed/src/eth/feeder.rs index 3853e7d..a4da354 100644 --- a/event_feed/src/eth/feeder.rs +++ b/event_feed/src/eth/feeder.rs @@ -1,6 +1,5 @@ use super::*; use crate::error::IOError; -use anyhow::*; use core::result::Result::Ok; /// Represents an Ethereum blockchain event feed. @@ -47,7 +46,7 @@ impl EthFeed { /// # Arguments: /// /// * `cb`: A callback function to return the events to the caller. - pub async fn event_feed(&self, cb: &dyn Fn(Vec)) -> Result<(), IOError> { + pub async fn event_feed(&self, cb: &dyn Fn(Vec)) -> Result<()> { let client = Arc::new(&self.eth_service); let last_block = match client.get_block(BlockNumber::Latest).await { @@ -55,9 +54,9 @@ impl EthFeed { Ok(None) => { return Err(IOError::Other( "Failed to fetch the latest block number".to_string(), - )) + ).into()) } - Err(err) => return Err(IOError::Other(err.to_string())), + Err(err) => return Err(IOError::Other(err.to_string()).into()), }; let events = self.events.iter().map(|e| e.0.clone()); @@ -66,7 +65,7 @@ impl EthFeed { let mut stream = match client.subscribe_logs(&filter).await { Ok(stream) => stream, - Err(e) => return Err(IOError::Other(e.to_string())), + Err(e) => return Err(IOError::Other(e.to_string()).into()), }; while let Some(log) = stream.next().await { diff --git a/event_feed/src/icon/feeder.rs b/event_feed/src/icon/feeder.rs index 5906068..671d0e0 100644 --- a/event_feed/src/icon/feeder.rs +++ b/event_feed/src/icon/feeder.rs @@ -110,7 +110,7 @@ impl IconFeed { /// Arguments: /// /// * `cb`: A callback function to return the events to the caller. - pub async fn event_feed(&self, cb: &dyn Fn(Vec)) -> Result<(), IOError> { + pub async fn event_feed(&self, cb: &dyn Fn(Vec)) -> Result<()> { let mut latest_height = get_icon_block_height(&self.icon_service).await?; let mut old_height = latest_height - 1; @@ -138,7 +138,7 @@ impl IconFeed { return Err(IOError::Other(format!( "No taransactions found in block {}", latest_height - ))); + )).into()); } }; @@ -170,7 +170,7 @@ impl IconFeed { return Err(IOError::Other(format!( "Error getting latest block height: {}", err - ))); + )).into()); } } } diff --git a/event_feed/src/main.rs b/event_feed/src/main.rs index 0fcfa36..7893c22 100644 --- a/event_feed/src/main.rs +++ b/event_feed/src/main.rs @@ -46,11 +46,11 @@ async fn main() -> Result<(), Box> { }; let mut ssb_client = Producer::new(producer_config.clone()) .await - .map_err(|e| IOError::Anyhow(e.into()))?; + .map_err(|e| IOError::Anyhow(e))?; ssb_client .accept_invite() .await - .map_err(|e| IOError::Anyhow(e.into()))?; + .map_err(|e| IOError::Anyhow(e))?; let feed = match args.command { Commands::Substrate => { diff --git a/event_feed/src/substrate/feeder.rs b/event_feed/src/substrate/feeder.rs index 9bf5e6b..b68eaff 100644 --- a/event_feed/src/substrate/feeder.rs +++ b/event_feed/src/substrate/feeder.rs @@ -7,9 +7,9 @@ use core::result::Result::Ok; #[derive(Debug, Clone)] /// The `PolkadotFeed` struct in Rust contains a `chain_config` field of type `ChainConfig` and a /// `client` field of type `OnlineClient`. -/// +/// /// Properties: -/// +/// /// * `chain_config`: The `chain_config` property is a field of type `ChainConfig` in the `PolkadotFeed` /// struct. It likely contains configuration settings related to the Polkadot chain that the feed is /// interacting with. @@ -24,16 +24,13 @@ pub struct PolkadotFeed { } impl PolkadotFeed { - /// Creates a new `PolkadotFeed` instance based on the provided chain configuration. - /// + /// /// # Arguments - /// + /// /// * `chain_config`: An event feed chain configuration. pub async fn new(chain_config: ChainConfig) -> Result> { - let client = OnlineClient::::from_url(&chain_config.node_url) - .await - ?; + let client = OnlineClient::::from_url(&chain_config.node_url).await?; Ok(PolkadotFeed { chain_config, client, @@ -43,40 +40,61 @@ impl PolkadotFeed { impl PolkadotFeed { /// Fetches the events from the Substrate chain and returns it through the provided callback function. - /// + /// /// # Arguments - /// + /// /// * `cb`: A callback function to return the events to the caller. - pub async fn event_feed(&self, cb: &dyn Fn(Vec)) -> Result<(), IOError>{ - let mut blocks_sub = self.client.blocks().subscribe_finalized().await.map_err(|err| IOError::Subxt(err.into()))?; + pub async fn event_feed(&self, cb: &dyn Fn(Vec)) -> Result<()> { + let mut blocks_sub = self + .client + .blocks() + .subscribe_finalized() + .await + .map_err(|err| IOError::Subxt(err))?; // For each block, print a bunch of information about it: loop { if let Some(block) = blocks_sub.next().await { - let block = block.map_err(|err| IOError::Subxt(err.into()))?; + let block = block.map_err(IOError::Subxt)?; let mut fetched_events = Vec::new(); - let extrinsics = block.extrinsics().await.map_err(|err| IOError::Subxt(err.into()))?; + let extrinsics = block + .extrinsics() + .await + .map_err(|err| IOError::Subxt(err))?; for ext in extrinsics.iter() { - let ext = match ext{ + let ext = match ext { Ok(ext) => ext, - Err(err) => return Err(IOError::Subxt(err)), + Err(err) => return Err(IOError::Subxt(err).into()), }; - let events = match ext.events().await{ + let events = match ext.events().await { Ok(events) => events, - Err(err) => return Err(IOError::Other(format!("Error fetching events: {}", err))), + Err(err) => { + return Err( + IOError::Other(format!("Error fetching events: {}", err)).into() + ) + } }; - let event_details = match events.iter().collect::, _>>(){ + let event_details = match events.iter().collect::, _>>() { Ok(event_details) => event_details, - Err(err) => return Err(IOError::Other(format!("Error collecting events: {}", err))), + Err(err) => { + return Err( + IOError::Other(format!("Error collecting events: {}", err)).into() + ) + } }; let filter = self.split_filter(); for event in event_details.iter() { - let s = match event.field_values(){ + let s = match event.field_values() { Ok(data) => data, - Err(err) => return Err(IOError::Other(format!("Error getting event field values: {}", err))), - + Err(err) => { + return Err(IOError::Other(format!( + "Error getting event field values: {}", + err + )) + .into()) + } }; let data = format!("{}", s).replace("((", "[").replace("))", "]"); @@ -101,7 +119,10 @@ impl PolkadotFeed { method: event.variant_name().to_string(), field_value: data, }; - let serialize_event = serde_json::to_value(&decode_event).map_err(|err| IOError::Other(format!("Serialising error {}", err)))?; + let serialize_event = + serde_json::to_value(&decode_event).map_err(|err| { + IOError::Other(format!("Serialising error {}", err)) + })?; fetched_events.push(serialize_event); } } @@ -113,9 +134,9 @@ impl PolkadotFeed { } /// Splits a string by ';' and then by '=' to create a HasMap. This hashmap is used to filter the events. - /// + /// /// Returns: - /// + /// /// Returns the hashmap with the key as the pallet name and the value as the list of events. pub fn split_filter(&self) -> std::collections::HashMap> { let mut filter_map: std::collections::HashMap> = diff --git a/event_feed/src/substrate/mod.rs b/event_feed/src/substrate/mod.rs index 696b61b..fb109ba 100644 --- a/event_feed/src/substrate/mod.rs +++ b/event_feed/src/substrate/mod.rs @@ -4,7 +4,6 @@ pub mod feeder; pub use feeder::*; use serde::{Deserialize, Serialize}; use subxt::{OnlineClient, PolkadotConfig}; -use anyhow::Result; #[subxt::subxt(runtime_metadata_path = "./src/common/utils/polkadot_metadata_full.scale")] pub mod polkadot {} From af23ca90bf56ca08fdbd46fccb506159d0597a2d Mon Sep 17 00:00:00 2001 From: Prathiksha-Nataraja <90592522+Prathiksha-Nataraja@users.noreply.github.com> Date: Mon, 6 May 2024 18:06:21 +0530 Subject: [PATCH 5/9] chore: removed unused implimentation --- event_feed/src/cosmos/feeder.rs | 2 +- event_feed/src/error/io.rs | 11 ----------- 2 files changed, 1 insertion(+), 12 deletions(-) diff --git a/event_feed/src/cosmos/feeder.rs b/event_feed/src/cosmos/feeder.rs index 6d27cdd..ffad130 100644 --- a/event_feed/src/cosmos/feeder.rs +++ b/event_feed/src/cosmos/feeder.rs @@ -68,7 +68,7 @@ impl CosmosFeed { } Err(err) => { // Consider retry logic or other actions here - return Err(IOError::Other(format!("Error receving event {} ", err)).into_anyhow()); + return Err(IOError::Other(format!("Error receving event {} ", err)).into()); } } } diff --git a/event_feed/src/error/io.rs b/event_feed/src/error/io.rs index 06f9363..a7a21e7 100644 --- a/event_feed/src/error/io.rs +++ b/event_feed/src/error/io.rs @@ -8,17 +8,6 @@ pub enum IOError { Subxt(subxt::Error), } -impl IOError { - pub fn into_anyhow(self) -> anyhow::Error { - match self { - IOError::Anyhow(error) => error, - IOError::Other(msg) => anyhow::Error::msg(msg), - IOError::Std(io_error) => anyhow::Error::new(*io_error), - IOError::Subxt(subxt_error) => anyhow::Error::msg(subxt_error.to_string()), - } - } -} - impl Display for IOError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { From 9408bda676fc362cefd22016b55e60aa1b6a11a3 Mon Sep 17 00:00:00 2001 From: Prathiksha-Nataraja <90592522+Prathiksha-Nataraja@users.noreply.github.com> Date: Mon, 6 May 2024 18:27:04 +0530 Subject: [PATCH 6/9] chore: update code --- event_feed/src/cosmos/feeder.rs | 5 +++-- event_feed/src/main.rs | 1 + 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/event_feed/src/cosmos/feeder.rs b/event_feed/src/cosmos/feeder.rs index f48fe17..302ca5b 100644 --- a/event_feed/src/cosmos/feeder.rs +++ b/event_feed/src/cosmos/feeder.rs @@ -44,6 +44,7 @@ impl CosmosFeed { let (client, driver) = WebSocketClient::new(&*self.chain_config.node_url) .await .map_err(|err| IOError::Other(format!("Client not created {}", err)))?; + self.logger.info(&format!("Following the chain at {}", self.chain_config.node_url)); let driver_handle = tokio::spawn(async move { driver.run().await }); let mut subs = client @@ -59,11 +60,11 @@ impl CosmosFeed { let filter_events = events .iter() .filter(|tendermint_event| { - Self::convert_to_feeder_event(tendermint_event, &self.chain_config) + Self::convert_to_feeder_event(self, tendermint_event, &self.chain_config) .is_some() }) .flat_map(|tendermint_event| { - Self::convert_to_feeder_event(tendermint_event, &self.chain_config) + Self::convert_to_feeder_event(self, tendermint_event, &self.chain_config) .unwrap() }) .collect::>(); diff --git a/event_feed/src/main.rs b/event_feed/src/main.rs index 4fdfaa9..33dd673 100644 --- a/event_feed/src/main.rs +++ b/event_feed/src/main.rs @@ -48,6 +48,7 @@ async fn main() -> Result<(), Box> { let mut ssb_client = Producer::new(producer_config.clone()) .await .map_err(|e| IOError::Anyhow(e))?; + logger.info("SSB client created"); ssb_client .accept_invite() .await From 76d2877c5f0e8bac182f929b558dc1a74b848acd Mon Sep 17 00:00:00 2001 From: Prathiksha-Nataraja <90592522+Prathiksha-Nataraja@users.noreply.github.com> Date: Mon, 6 May 2024 18:27:44 +0530 Subject: [PATCH 7/9] chore: remove commented code --- event_feed/src/icon/feeder.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/event_feed/src/icon/feeder.rs b/event_feed/src/icon/feeder.rs index 6fff864..db8a5a2 100644 --- a/event_feed/src/icon/feeder.rs +++ b/event_feed/src/icon/feeder.rs @@ -61,8 +61,6 @@ impl IconFeed { let mut score_filter = false; if !self.events.is_empty() || !self.score.is_empty() { - // let tx_hash: String = - // serde_json::from_value(transaction.get("txHash").unwrap().clone()).unwrap(); let tx_hash: String = serde_json::from_value( transaction .get("txHash") @@ -156,7 +154,8 @@ impl IconFeed { return Err(IOError::Other(format!( "No taransactions found in block {}", latest_height - )).into()); + )) + .into()); } }; @@ -195,7 +194,8 @@ impl IconFeed { return Err(IOError::Other(format!( "Error getting latest block height: {}", err - )).into()); + )) + .into()); } } } From 798885cb7c0179746f43ac2e76548f7a8be644f1 Mon Sep 17 00:00:00 2001 From: Prathiksha-Nataraja <90592522+Prathiksha-Nataraja@users.noreply.github.com> Date: Tue, 7 May 2024 11:46:16 +0530 Subject: [PATCH 8/9] chore: update code with error handling --- event_feed/src/common/kuska_client.rs | 4 +++- event_feed/src/cosmos/feeder.rs | 29 +++++++++++++++++++++------ event_feed/src/main.rs | 19 +++++++++--------- 3 files changed, 36 insertions(+), 16 deletions(-) diff --git a/event_feed/src/common/kuska_client.rs b/event_feed/src/common/kuska_client.rs index 58e8e12..ab0917c 100644 --- a/event_feed/src/common/kuska_client.rs +++ b/event_feed/src/common/kuska_client.rs @@ -1,4 +1,5 @@ use super::*; +use crate::error::IOError; use anyhow::anyhow; use anyhow::Result; use runtime::api::dto::content::Mention; @@ -61,7 +62,8 @@ impl Producer { .publish(&message.to_string(), Some(vec![mention])) .await; - result.unwrap(); + // result.map_err(|e| IOError::Anyhow(e))?; + result.map_err(|e| IOError::Anyhow(anyhow!("{e}")))?; Ok(()) } diff --git a/event_feed/src/cosmos/feeder.rs b/event_feed/src/cosmos/feeder.rs index 302ca5b..5869c32 100644 --- a/event_feed/src/cosmos/feeder.rs +++ b/event_feed/src/cosmos/feeder.rs @@ -3,7 +3,6 @@ use crate::common; use crate::error::IOError; use anyhow::*; use core::result::Result::Ok; -use futures::StreamExt; use runtime::{logger::CoreLogger, Logger}; use serde_json::Value; use tendermint_rpc::event::{Event, EventData}; @@ -44,7 +43,10 @@ impl CosmosFeed { let (client, driver) = WebSocketClient::new(&*self.chain_config.node_url) .await .map_err(|err| IOError::Other(format!("Client not created {}", err)))?; - self.logger.info(&format!("Following the chain at {}", self.chain_config.node_url)); + self.logger.info(&format!( + "Following the chain at {}", + self.chain_config.node_url + )); let driver_handle = tokio::spawn(async move { driver.run().await }); let mut subs = client @@ -60,12 +62,27 @@ impl CosmosFeed { let filter_events = events .iter() .filter(|tendermint_event| { - Self::convert_to_feeder_event(self, tendermint_event, &self.chain_config) - .is_some() + Self::convert_to_feeder_event( + self, + tendermint_event, + &self.chain_config, + ) + .is_some() }) .flat_map(|tendermint_event| { - Self::convert_to_feeder_event(self, tendermint_event, &self.chain_config) - .unwrap() + match Self::convert_to_feeder_event( + self, + tendermint_event, + &self.chain_config, + ) { + Some(value) => value, + None => { + let _ = Err::(IOError::Other( + "Error converting to FeederEvent".to_string(), + )); + vec![serde_json::Value::Null] + } + } }) .collect::>(); cb(filter_events); diff --git a/event_feed/src/main.rs b/event_feed/src/main.rs index 33dd673..11c6761 100644 --- a/event_feed/src/main.rs +++ b/event_feed/src/main.rs @@ -1,11 +1,11 @@ +use clap::{Parser, Subcommand}; use event_feed::{ ChainConfig, Context, CosmosFeed, EthFeed, IOError, IconFeed, PolkadotFeed, Producer, ProducerConfig, }; +use runtime::{logger::CoreLogger, Logger}; use std::{fs, path}; use subxt::PolkadotConfig; -use runtime::{logger::CoreLogger, Logger}; -use clap::{Parser, Subcommand}; #[derive(Parser, Debug, Clone)] #[command(author, version, about = "Async CLI Example")] @@ -39,8 +39,8 @@ async fn main() -> Result<(), Box> { let read_data = fs::read(path::Path::new(&path))?; let chain_config: ChainConfig = serde_json::from_slice(&read_data).map_err(|e| IOError::Anyhow(e.into()))?; - let producer_config: ProducerConfig = - serde_json::from_slice(&read_data.clone()).unwrap(); + let producer_config: ProducerConfig = serde_json::from_slice(&read_data.clone()) + .map_err(|e| IOError::Anyhow(e.into()))?; (chain_config, producer_config) } None => (ChainConfig::default(), ProducerConfig::default()), @@ -56,11 +56,12 @@ async fn main() -> Result<(), Box> { let feed = match args.command { Commands::Substrate => { - let polkadot_client = PolkadotFeed::::new(chain_config.clone(), logger.clone()) - .await - .map_err(|e| { - IOError::Other(format!("Error initializing POLKADOT client : {}", e)) - })?; + let polkadot_client = + PolkadotFeed::::new(chain_config.clone(), logger.clone()) + .await + .map_err(|e| { + IOError::Other(format!("Error initializing POLKADOT client : {}", e)) + })?; Context::PolkadotFeed(polkadot_client) } Commands::Icon => { From 0347ce8c505c2723af235ee50b25eae69967dc09 Mon Sep 17 00:00:00 2001 From: Prathiksha-Nataraja <90592522+Prathiksha-Nataraja@users.noreply.github.com> Date: Tue, 14 May 2024 10:27:19 +0530 Subject: [PATCH 9/9] chore: handle the error in eth --- event_feed/src/eth/feeder.rs | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/event_feed/src/eth/feeder.rs b/event_feed/src/eth/feeder.rs index 9130d65..ef5f52e 100644 --- a/event_feed/src/eth/feeder.rs +++ b/event_feed/src/eth/feeder.rs @@ -55,9 +55,9 @@ impl EthFeed { let last_block = match client.get_block(BlockNumber::Latest).await { Ok(Some(block)) => block.number.unwrap(), Ok(None) => { - return Err(IOError::Other( - "Failed to fetch the latest block number".to_string(), - ).into()) + return Err( + IOError::Other("Failed to fetch the latest block number".to_string()).into(), + ) } Err(err) => return Err(IOError::Other(err.to_string()).into()), }; @@ -66,10 +66,7 @@ impl EthFeed { let filter = Filter::new().from_block(last_block - 25).events(events); - let mut stream = match client.subscribe_logs(&filter).await { - Ok(stream) => stream, - Err(e) => return Err(IOError::Other(e.to_string()).into()), - }; + let mut stream = client.subscribe_logs(&filter).await?; self.logger.info(&format!( "Subscribed to events with the filter : {:?}", filter