Skip to content

Commit

Permalink
feat: logger implementation for event feeder like cosmos icon eth sub…
Browse files Browse the repository at this point in the history
…strate (#130)

* chore: add logger for the event feed

* chore: logger for eventfeed

* chore: remove unwanted imports

* chore: remove unwanted imports

* chore: implement logger

* chore: code formating

* chore: remove commented code

* chore: fix code

* chore: log block number in substrate

* chore: remove unused imports

* chore: fixed code

* chore: update code by removing the created file after the test

---------

Co-authored-by: Shanith K K <[email protected]>
  • Loading branch information
Prathiksha-Nataraja and shanithkk authored May 6, 2024
1 parent 5eb51ae commit a9e00f4
Show file tree
Hide file tree
Showing 9 changed files with 149 additions and 60 deletions.
16 changes: 6 additions & 10 deletions event_feed/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,18 @@ dirs = '5.0.1'
envy = '0.4'
anyhow = '1.0.82'
icon-sdk = '1.2.0'
tendermint-rpc = {version = "0.35.0", features = ["http-client", "websocket-client"]}
tendermint-rpc = { version = "0.35.0", features = [
"http-client",
"websocket-client",
] }
futures = "0.3.30"
base64 = "0.22.0"
clap ={ version = "4.5.4", features = [ "derive", "env" ]}

[dependencies.ethers]
version = '2.0.8'
default_features = false
features = [
'ws',
'rustls',
]
features = ['ws', 'rustls']

[dependencies.serde]
version = '1.0.198'
Expand All @@ -42,8 +42,4 @@ path = '../runtime/lite'

[dependencies.tokio]
version = '1.36.0'
features = [
'macros',
'time',
'rt-multi-thread',
]
features = ['macros', 'time', 'rt-multi-thread']
31 changes: 28 additions & 3 deletions event_feed/src/cosmos/feeder.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
use super::*;
use crate::common;
use anyhow::*;
use futures::StreamExt;
use runtime::{logger::CoreLogger, Logger};
use serde_json::Value;
use tendermint_rpc::event::{Event, EventData};

/// Represents the cosmos blockchain event feed.
pub struct CosmosFeed {
pub chain_config: ChainConfig,
pub events: Vec<String>,
pub logger: CoreLogger,
}

impl CosmosFeed {
Expand All @@ -16,7 +19,7 @@ impl CosmosFeed {
/// Arguments:
///
/// * `chain_config`: An event feed chain configuration.
pub fn new(chain_config: common::ChainConfig) -> CosmosFeed {
pub fn new(chain_config: common::ChainConfig, logger: CoreLogger) -> CosmosFeed {
let events = chain_config
.event_filter
.split(',')
Expand All @@ -26,6 +29,7 @@ impl CosmosFeed {
CosmosFeed {
chain_config,
events,
logger,
}
}

Expand All @@ -38,6 +42,10 @@ impl CosmosFeed {
let (client, driver) = WebSocketClient::new(&*self.chain_config.node_url)
.await
.unwrap();
self.logger.info(&format!(
"Following the chain at {}",
self.chain_config.node_url
));
let driver_handle = tokio::spawn(async move { driver.run().await });

let mut subs = client.subscribe(EventType::NewBlock.into()).await.unwrap();
Expand All @@ -50,10 +58,12 @@ impl CosmosFeed {
let filter_events = events
.iter()
.filter(|tendermint_event| {
Self::convert_to_feeder_event(tendermint_event, &self.chain_config).is_some()
Self::convert_to_feeder_event(self, tendermint_event, &self.chain_config)
.is_some()
})
.flat_map(|tendermint_event| {
Self::convert_to_feeder_event(tendermint_event, &self.chain_config).unwrap()
Self::convert_to_feeder_event(self, tendermint_event, &self.chain_config)
.unwrap()
})
.collect::<Vec<serde_json::Value>>();

Expand All @@ -64,6 +74,7 @@ impl CosmosFeed {
}

drop(subs);
self.logger.info("Websocket connection closed!!");
let _ = driver_handle.await;
Ok(())
}
Expand All @@ -79,9 +90,12 @@ impl CosmosFeed {
///
/// Returns the list of `FeederEvent` objects as json values.
fn convert_to_feeder_event(
&self,
event: &Event,
chain_config: &ChainConfig,
) -> Option<Vec<serde_json::Value>> {
self.logger
.debug(&format!("Processing events : {:?}", event));
match event.data.clone() {
EventData::LegacyNewBlock {
ref block,
Expand All @@ -91,8 +105,16 @@ impl CosmosFeed {
let block = block.as_ref().unwrap();
let block_number = block.header.version.block as usize;
let hash_string = block.header.last_commit_hash.map(|h| h.to_string());
self.logger.info(&format!(
"Processing LegacyNewBlockEvent for block: {}",
block.header.version.block
));

let filtered_events: Vec<Value> = if chain_config.event_filter.is_empty() {
self.logger.info(&format!(
"Processing all events from block : {}",
block_number
));
result_begin_block
.unwrap()
.events
Expand All @@ -108,6 +130,7 @@ impl CosmosFeed {
.map(|e| serde_json::to_value(e).unwrap())
.collect()
} else {
self.logger.info("Filtering events based on the event name");
result_begin_block
.unwrap()
.events
Expand All @@ -128,8 +151,10 @@ impl CosmosFeed {
};

if !filtered_events.is_empty() {
self.logger.info("returning the filtered events");
Some(filtered_events)
} else {
self.logger.info("No events matched the filter");
None
}
}
Expand Down
14 changes: 9 additions & 5 deletions event_feed/src/cosmos/tests.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
mod tests {

use super::super::*;
use runtime::{logger::CoreLogger, Logger};

#[tokio::test]
async fn test_cosmos_feed_new() {
Expand All @@ -10,8 +10,9 @@ mod tests {
contract_filter: "".to_string(),
chain: "cosmos".to_string(),
};
let cosmos_feed = CosmosFeed::new(chain_config.clone());
let logger = CoreLogger::new(Some("./event-feed.log"));

let cosmos_feed = CosmosFeed::new(chain_config.clone(), logger);
assert_eq!(cosmos_feed.chain_config, chain_config);
assert_eq!(cosmos_feed.events.len(), 1);
assert!(cosmos_feed.events.contains(&"transfer".to_string()));
Expand All @@ -29,9 +30,10 @@ mod tests {
let callback = |events: Vec<serde_json::Value>| {
assert!(events.is_empty());
};

let cosmos_feed = CosmosFeed::new(chain_config.clone());
let logger = CoreLogger::new(Some("./event-feed.log"));
let cosmos_feed = CosmosFeed::new(chain_config.clone(), logger);
let result = cosmos_feed.event_feed(&callback).await;
let _ = tokio::fs::remove_file("./event-feed.log").await;

assert!(result.is_ok());
}
Expand All @@ -51,8 +53,10 @@ mod tests {
// Simulate an error condition
assert!(events.is_empty());
};
let cosmos_feed = CosmosFeed::new(chain_config.clone());
let logger = CoreLogger::new(Some("./event-feed.log"));
let cosmos_feed = CosmosFeed::new(chain_config.clone(), logger);

let _ = cosmos_feed.event_feed(&callback).await;
let _ = tokio::fs::remove_file("./event-feed.log").await;
}
}
21 changes: 16 additions & 5 deletions event_feed/src/eth/feeder.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
use super::*;
use runtime::{logger::CoreLogger, Logger};
/// Represents an Ethereum blockchain event feed.
pub struct EthFeed {
eth_service: Provider<Ws>,
events: Vec<(String, H256)>,
contracts: Vec<String>,
logger: CoreLogger,
}

impl EthFeed {
/// Creates a new EthFeed instance based on the provided chain configuration.
///
///
/// Arguments:
///
///
/// * `config`: An event feed chain configuration.
pub async fn new(config: ChainConfig) -> Result<EthFeed> {
pub async fn new(config: ChainConfig, logger: CoreLogger) -> Result<EthFeed> {
let events = config
.event_filter
.split(';')
Expand All @@ -33,15 +35,16 @@ impl EthFeed {
eth_service: client,
events,
contracts,
logger,
};

Ok(eth_feed)
}

/// Fetches the events from the Ethereum chain and returns it through the provided callback function.
///
///
/// # Arguments:
///
///
/// * `cb`: A callback function to return the events to the caller.
pub async fn event_feed(&self, cb: &dyn Fn(Vec<Value>)) -> Result<()> {
let client = Arc::new(&self.eth_service);
Expand All @@ -58,6 +61,10 @@ impl EthFeed {
let filter = Filter::new().from_block(last_block - 25).events(events);

let mut stream = client.subscribe_logs(&filter).await?;
self.logger.info(&format!(
"Subscribed to events with the filter : {:?}",
filter
));

while let Some(log) = stream.next().await {
if self.contracts.is_empty() || self.contracts.contains(&format!("{:?}", &log.address))
Expand All @@ -83,6 +90,10 @@ impl EthFeed {
let tx_receipt = client.get_transaction_receipt(tx_hash).await;

if let Ok(Some(tx_receipt)) = tx_receipt {
self.logger.info(&format!(
"Received transaction receipt for the tx_hash : {:?}",
tx_hash
));
let mut logs = Vec::<Value>::new();

for log in tx_receipt.logs.iter() {
Expand Down
33 changes: 29 additions & 4 deletions event_feed/src/icon/feeder.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use super::*;
use runtime::{logger::CoreLogger, Logger};

/// Represents the icon blockchain event feed which contains the endpoint and the filters.
pub struct IconFeed {
icon_service: IconService,
events: Vec<String>,
score: Vec<String>,
logger: CoreLogger,
}

impl IconFeed {
Expand All @@ -17,7 +19,7 @@ impl IconFeed {
/// Returns:
///
/// The `new` function is returning a `Result` containing an `IconFeed` struct.
pub fn new(config: ChainConfig) -> Result<IconFeed> {
pub fn new(config: ChainConfig, logger: CoreLogger) -> Result<IconFeed> {
let events = config
.event_filter
.split(',')
Expand All @@ -38,6 +40,7 @@ impl IconFeed {
icon_service,
events,
score,
logger,
};

Ok(icon_feed)
Expand All @@ -59,18 +62,26 @@ impl IconFeed {
if !self.events.is_empty() || !self.score.is_empty() {
let tx_hash: String =
serde_json::from_value(transaction.get("txHash").unwrap().clone()).unwrap();
self.logger.info(&format!(
"Filtering the events with the tx_hash : {:?}",
tx_hash
));

let event_logs = get_event_logs_by_tx_hash(&self.icon_service, &tx_hash).await?;

for event_log in event_logs {
if !&self.score.is_empty() {
for filter_score in &self.score {
if filter_score == &event_log.score_address {
self.logger
.info(&format!("Matched the score filter : {:?}", filter_score));
score_filter = true;
break;
}
}
} else {
self.logger
.info("No score filter found, allowing all the transactions");
score_filter = true;
}

Expand All @@ -83,6 +94,8 @@ impl IconFeed {
}
} else {
events_filter = true;
self.logger
.info("No event filter found, allowing all the transactions");
}

if events_filter && score_filter {
Expand All @@ -94,8 +107,10 @@ impl IconFeed {
events_filter = true;
score_filter = true;
}

Ok(events_filter & score_filter)
let result = events_filter && score_filter;
self.logger
.info(&format!("Filtering the result : {:?}", result));
Ok(result)
}

/// Fetches the events from the Icon chain and returns it through the provided callback function.
Expand All @@ -106,7 +121,10 @@ impl IconFeed {
pub async fn event_feed(&self, cb: &dyn Fn(Vec<serde_json::Value>)) -> Result<()> {
let mut latest_height = get_icon_block_height(&self.icon_service).await?;
let mut old_height = latest_height - 1;

self.logger.info(&format!(
"Event feed started at {:?}, {:?}",
latest_height, old_height
));
loop {
if old_height < latest_height {
let block = match self
Expand Down Expand Up @@ -137,12 +155,19 @@ impl IconFeed {
}
}

self.logger.info(&format!(
"Filtered the {:?} transactions",
filtered_tx.len()
));

if !filtered_tx.is_empty() {
cb(filtered_tx)
}

old_height += 1;
} else {
self.logger
.info("No new blocks got detected, sleeping for 1 second");
sleep(Duration::from_secs(1));
}

Expand Down
Loading

0 comments on commit a9e00f4

Please sign in to comment.