Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: logger implementation for event feeder like cosmos icon eth substrate #130

Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 6 additions & 10 deletions event_feed/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,18 @@ dirs = '5.0.1'
envy = '0.4'
anyhow = '1.0.82'
icon-sdk = '1.2.0'
tendermint-rpc = {version = "0.35.0", features = ["http-client", "websocket-client"]}
tendermint-rpc = { version = "0.35.0", features = [
"http-client",
"websocket-client",
] }
futures = "0.3.30"
base64 = "0.22.0"
clap ={ version = "4.5.4", features = [ "derive", "env" ]}

[dependencies.ethers]
version = '2.0.8'
default_features = false
features = [
'ws',
'rustls',
]
features = ['ws', 'rustls']

[dependencies.serde]
version = '1.0.198'
Expand All @@ -42,8 +42,4 @@ path = '../runtime/lite'

[dependencies.tokio]
version = '1.36.0'
features = [
'macros',
'time',
'rt-multi-thread',
]
features = ['macros', 'time', 'rt-multi-thread']
31 changes: 28 additions & 3 deletions event_feed/src/cosmos/feeder.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
use super::*;
use crate::common;
use anyhow::*;
use futures::StreamExt;
use runtime::{logger::CoreLogger, Logger};
use serde_json::Value;
use tendermint_rpc::event::{Event, EventData};

/// Represents the cosmos blockchain event feed.
pub struct CosmosFeed {
pub chain_config: ChainConfig,
pub events: Vec<String>,
pub logger: CoreLogger,
}

impl CosmosFeed {
Expand All @@ -16,7 +19,7 @@ impl CosmosFeed {
/// Arguments:
///
/// * `chain_config`: An event feed chain configuration.
pub fn new(chain_config: common::ChainConfig) -> CosmosFeed {
pub fn new(chain_config: common::ChainConfig, logger: CoreLogger) -> CosmosFeed {
let events = chain_config
.event_filter
.split(',')
Expand All @@ -26,6 +29,7 @@ impl CosmosFeed {
CosmosFeed {
chain_config,
events,
logger,
}
}

Expand All @@ -38,6 +42,10 @@ impl CosmosFeed {
let (client, driver) = WebSocketClient::new(&*self.chain_config.node_url)
.await
.unwrap();
self.logger.info(&format!(
"Following the chain at {}",
self.chain_config.node_url
));
let driver_handle = tokio::spawn(async move { driver.run().await });

let mut subs = client.subscribe(EventType::NewBlock.into()).await.unwrap();
Expand All @@ -50,10 +58,12 @@ impl CosmosFeed {
let filter_events = events
.iter()
.filter(|tendermint_event| {
Self::convert_to_feeder_event(tendermint_event, &self.chain_config).is_some()
Self::convert_to_feeder_event(self, tendermint_event, &self.chain_config)
.is_some()
})
.flat_map(|tendermint_event| {
Self::convert_to_feeder_event(tendermint_event, &self.chain_config).unwrap()
Self::convert_to_feeder_event(self, tendermint_event, &self.chain_config)
.unwrap()
})
.collect::<Vec<serde_json::Value>>();

Expand All @@ -64,6 +74,7 @@ impl CosmosFeed {
}

drop(subs);
self.logger.info("Websocket connection closed!!");
let _ = driver_handle.await;
Ok(())
}
Expand All @@ -79,9 +90,12 @@ impl CosmosFeed {
///
/// Returns the list of `FeederEvent` objects as json values.
fn convert_to_feeder_event(
&self,
event: &Event,
chain_config: &ChainConfig,
) -> Option<Vec<serde_json::Value>> {
self.logger
.debug(&format!("Processing events : {:?}", event));
match event.data.clone() {
EventData::LegacyNewBlock {
ref block,
Expand All @@ -91,8 +105,16 @@ impl CosmosFeed {
let block = block.as_ref().unwrap();
let block_number = block.header.version.block as usize;
let hash_string = block.header.last_commit_hash.map(|h| h.to_string());
self.logger.info(&format!(
"Processing LegacyNewBlockEvent for block: {}",
block.header.version.block
));

let filtered_events: Vec<Value> = if chain_config.event_filter.is_empty() {
self.logger.info(&format!(
"Processing all events from block : {}",
block_number
));
result_begin_block
.unwrap()
.events
Expand All @@ -108,6 +130,7 @@ impl CosmosFeed {
.map(|e| serde_json::to_value(e).unwrap())
.collect()
} else {
self.logger.info("Filtering events based on the event name");
result_begin_block
.unwrap()
.events
Expand All @@ -128,8 +151,10 @@ impl CosmosFeed {
};

if !filtered_events.is_empty() {
self.logger.info("returning the filtered events");
Some(filtered_events)
} else {
self.logger.info("No events matched the filter");
None
}
}
Expand Down
14 changes: 8 additions & 6 deletions event_feed/src/cosmos/tests.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
mod tests {

use super::super::*;

use runtime::{logger::CoreLogger, Logger};

#[tokio::test]
async fn test_cosmos_feed_new() {
let chain_config = ChainConfig {
Expand All @@ -10,8 +10,9 @@ mod tests {
contract_filter: "".to_string(),
chain: "cosmos".to_string(),
};
let cosmos_feed = CosmosFeed::new(chain_config.clone());
let logger = CoreLogger::new(Some("./event-feed.log"));

let cosmos_feed = CosmosFeed::new(chain_config.clone(), logger);
assert_eq!(cosmos_feed.chain_config, chain_config);
assert_eq!(cosmos_feed.events.len(), 1);
assert!(cosmos_feed.events.contains(&"transfer".to_string()));
Expand All @@ -29,8 +30,8 @@ mod tests {
let callback = |events: Vec<serde_json::Value>| {
assert!(events.is_empty());
};

let cosmos_feed = CosmosFeed::new(chain_config.clone());
let logger = CoreLogger::new(Some("./event-feed.log"));
let cosmos_feed = CosmosFeed::new(chain_config.clone(), logger);
let result = cosmos_feed.event_feed(&callback).await;

assert!(result.is_ok());
Expand All @@ -51,7 +52,8 @@ mod tests {
// Simulate an error condition
assert!(events.is_empty());
};
let cosmos_feed = CosmosFeed::new(chain_config.clone());
let logger = CoreLogger::new(Some("./event-feed.log"));
let cosmos_feed = CosmosFeed::new(chain_config.clone(), logger);

let _ = cosmos_feed.event_feed(&callback).await;
}
Expand Down
21 changes: 16 additions & 5 deletions event_feed/src/eth/feeder.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
use super::*;
use runtime::{logger::CoreLogger, Logger};
/// Represents an Ethereum blockchain event feed.
pub struct EthFeed {
eth_service: Provider<Ws>,
events: Vec<(String, H256)>,
contracts: Vec<String>,
logger: CoreLogger,
}

impl EthFeed {
/// Creates a new EthFeed instance based on the provided chain configuration.
///
///
/// Arguments:
///
///
/// * `config`: An event feed chain configuration.
pub async fn new(config: ChainConfig) -> Result<EthFeed> {
pub async fn new(config: ChainConfig, logger: CoreLogger) -> Result<EthFeed> {
let events = config
.event_filter
.split(';')
Expand All @@ -33,15 +35,16 @@ impl EthFeed {
eth_service: client,
events,
contracts,
logger,
};

Ok(eth_feed)
}

/// Fetches the events from the Ethereum chain and returns it through the provided callback function.
///
///
/// # Arguments:
///
///
/// * `cb`: A callback function to return the events to the caller.
pub async fn event_feed(&self, cb: &dyn Fn(Vec<Value>)) -> Result<()> {
let client = Arc::new(&self.eth_service);
Expand All @@ -58,6 +61,10 @@ impl EthFeed {
let filter = Filter::new().from_block(last_block - 25).events(events);

let mut stream = client.subscribe_logs(&filter).await?;
self.logger.info(&format!(
"Subscribed to events with the filter : {:?}",
filter
));

while let Some(log) = stream.next().await {
if self.contracts.is_empty() || self.contracts.contains(&format!("{:?}", &log.address))
Expand All @@ -83,6 +90,10 @@ impl EthFeed {
let tx_receipt = client.get_transaction_receipt(tx_hash).await;

if let Ok(Some(tx_receipt)) = tx_receipt {
self.logger.info(&format!(
"Received trabsaction receipt for the tx_hash : {:?}",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typographical error "transaction"

tx_hash
));
let mut logs = Vec::<Value>::new();

for log in tx_receipt.logs.iter() {
Expand Down
33 changes: 29 additions & 4 deletions event_feed/src/icon/feeder.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use super::*;
use runtime::{logger::CoreLogger, Logger};

/// Represents the icon blockchain event feed which contains the endpoint and the filters.
pub struct IconFeed {
icon_service: IconService,
events: Vec<String>,
score: Vec<String>,
logger: CoreLogger,
}

impl IconFeed {
Expand All @@ -17,7 +19,7 @@ impl IconFeed {
/// Returns:
///
/// The `new` function is returning a `Result` containing an `IconFeed` struct.
pub fn new(config: ChainConfig) -> Result<IconFeed> {
pub fn new(config: ChainConfig, logger: CoreLogger) -> Result<IconFeed> {
let events = config
.event_filter
.split(',')
Expand All @@ -38,6 +40,7 @@ impl IconFeed {
icon_service,
events,
score,
logger,
};

Ok(icon_feed)
Expand All @@ -59,18 +62,26 @@ impl IconFeed {
if !self.events.is_empty() || !self.score.is_empty() {
let tx_hash: String =
serde_json::from_value(transaction.get("txHash").unwrap().clone()).unwrap();
self.logger.info(&format!(
"Filtering the events with the tx_hash : {:?}",
tx_hash
));

let event_logs = get_event_logs_by_tx_hash(&self.icon_service, &tx_hash).await?;

for event_log in event_logs {
if !&self.score.is_empty() {
for filter_score in &self.score {
if filter_score == &event_log.score_address {
self.logger
.info(&format!("Matched the score filter : {:?}", filter_score));
score_filter = true;
break;
}
}
} else {
self.logger
.info("No score filter found, allowing all the transactions");
score_filter = true;
}

Expand All @@ -83,6 +94,8 @@ impl IconFeed {
}
} else {
events_filter = true;
self.logger
.info("No event filter found, allowing all the transactions");
}

if events_filter && score_filter {
Expand All @@ -94,8 +107,10 @@ impl IconFeed {
events_filter = true;
score_filter = true;
}

Ok(events_filter & score_filter)
let result = events_filter && score_filter;
self.logger
.info(&format!("Filtering the result : {:?}", result));
Ok(result)
}

/// Fetches the events from the Icon chain and returns it through the provided callback function.
Expand All @@ -106,7 +121,10 @@ impl IconFeed {
pub async fn event_feed(&self, cb: &dyn Fn(Vec<serde_json::Value>)) -> Result<()> {
let mut latest_height = get_icon_block_height(&self.icon_service).await?;
let mut old_height = latest_height - 1;

self.logger.info(&format!(
"Event feed started at {:?}, {:?}",
latest_height, old_height
));
loop {
if old_height < latest_height {
let block = match self
Expand Down Expand Up @@ -137,12 +155,19 @@ impl IconFeed {
}
}

self.logger.info(&format!(
"Filtered the {:?} transactions",
filtered_tx.len()
));

if !filtered_tx.is_empty() {
cb(filtered_tx)
}

old_height += 1;
} else {
self.logger
.info("No new blocks got detected, sleeping for 1 second");
sleep(Duration::from_secs(1));
}

Expand Down
Loading