From 436f714d20f293f9cd85d8367212fec36d743428 Mon Sep 17 00:00:00 2001 From: Joshua Oladele Date: Sun, 5 Nov 2023 19:07:23 +0100 Subject: [PATCH] Reduce task struct boilerplate in events processing Before now, we assumed that these tasks might need some structure to enable distributed processing. Since we offloaded most of the synchronization to the user's DB, we can safely remove the structs and expose their tasks via bare modules. --- chaindexing/src/event_handlers.rs | 4 +- .../src/event_handlers/handle_events.rs | 112 +++++----- chaindexing/src/events_ingester.rs | 4 +- .../src/events_ingester/ingest_events.rs | 211 +++++++++--------- 4 files changed, 156 insertions(+), 175 deletions(-) diff --git a/chaindexing/src/event_handlers.rs b/chaindexing/src/event_handlers.rs index 0cfee30..a905b63 100644 --- a/chaindexing/src/event_handlers.rs +++ b/chaindexing/src/event_handlers.rs @@ -8,8 +8,6 @@ use tokio::{sync::Mutex, time::interval}; use crate::{contracts::Contracts, events::Event, ChaindexingRepo, Config, Repo}; use crate::{ChaindexingRepoRawQueryTxnClient, HasRawQueryClient}; -use handle_events::HandleEvents; - #[derive(Clone)] pub struct EventHandlerContext<'a> { pub event: Event, @@ -54,7 +52,7 @@ impl EventHandlers { loop { interval.tick().await; - HandleEvents::run( + handle_events::run( conn.clone(), &event_handlers_by_event_abi, &mut raw_query_client, diff --git a/chaindexing/src/event_handlers/handle_events.rs b/chaindexing/src/event_handlers/handle_events.rs index 7027d85..7ea2147 100644 --- a/chaindexing/src/event_handlers/handle_events.rs +++ b/chaindexing/src/event_handlers/handle_events.rs @@ -11,73 +11,69 @@ use crate::{ use super::{EventHandler, EventHandlerContext}; -pub struct HandleEvents; +pub async fn run<'a>( + conn: Arc>>, + event_handlers_by_event_abi: &HashMap<&str, Arc>, + raw_query_client: &mut ChaindexingRepoRawQueryClient, +) { + let mut contract_addresses_stream = + ChaindexingRepo::get_contract_addresses_stream(conn.clone()); -impl 
HandleEvents { - pub async fn run<'a>( - conn: Arc>>, - event_handlers_by_event_abi: &HashMap<&str, Arc>, - raw_query_client: &mut ChaindexingRepoRawQueryClient, - ) { - let mut contract_addresses_stream = - ChaindexingRepo::get_contract_addresses_stream(conn.clone()); - - while let Some(contract_addresses) = contract_addresses_stream.next().await { - for contract_address in contract_addresses { - Self::handle_events_for_contract_address( - conn.clone(), - &contract_address, - event_handlers_by_event_abi, - raw_query_client, - ) - .await - } + while let Some(contract_addresses) = contract_addresses_stream.next().await { + for contract_address in contract_addresses { + handle_events_for_contract_address( + conn.clone(), + &contract_address, + event_handlers_by_event_abi, + raw_query_client, + ) + .await } } +} - async fn handle_events_for_contract_address<'a>( - conn: Arc>>, - contract_address: &ContractAddress, - event_handlers_by_event_abi: &HashMap<&str, Arc>, - raw_query_client: &mut ChaindexingRepoRawQueryClient, - ) { - let mut events_stream = ChaindexingRepo::get_events_stream( - conn.clone(), - contract_address.next_block_number_to_handle_from, - ); - - while let Some(events) = events_stream.next().await { - // TODO: Move this filter to the stream query level - let mut events: Vec = events - .into_iter() - .filter(|event| { - event.match_contract_address(&contract_address.address) && event.not_removed() - }) - .collect(); - events.sort_by_key(|e| (e.block_number, e.log_index)); +async fn handle_events_for_contract_address<'a>( + conn: Arc>>, + contract_address: &ContractAddress, + event_handlers_by_event_abi: &HashMap<&str, Arc>, + raw_query_client: &mut ChaindexingRepoRawQueryClient, +) { + let mut events_stream = ChaindexingRepo::get_events_stream( + conn.clone(), + contract_address.next_block_number_to_handle_from, + ); - let raw_query_txn_client = - ChaindexingRepo::get_raw_query_txn_client(raw_query_client).await; + while let Some(events) = 
events_stream.next().await { + // TODO: Move this filter to the stream query level + let mut events: Vec = events + .into_iter() + .filter(|event| { + event.match_contract_address(&contract_address.address) && event.not_removed() + }) + .collect(); + events.sort_by_key(|e| (e.block_number, e.log_index)); - for event in events.clone() { - let event_handler = event_handlers_by_event_abi.get(event.abi.as_str()).unwrap(); - let event_handler_context = - EventHandlerContext::new(event.clone(), &raw_query_txn_client); + let raw_query_txn_client = + ChaindexingRepo::get_raw_query_txn_client(raw_query_client).await; - event_handler.handle_event(event_handler_context).await; - } + for event in events.clone() { + let event_handler = event_handlers_by_event_abi.get(event.abi.as_str()).unwrap(); + let event_handler_context = + EventHandlerContext::new(event.clone(), &raw_query_txn_client); - if let Some(Event { block_number, .. }) = events.last() { - let next_block_number_to_handle_from = block_number + 1; - ChaindexingRepo::update_next_block_number_to_handle_from_in_txn( - &raw_query_txn_client, - contract_address.id(), - next_block_number_to_handle_from, - ) - .await; - } + event_handler.handle_event(event_handler_context).await; + } - ChaindexingRepo::commit_raw_query_txns(raw_query_txn_client).await; + if let Some(Event { block_number, .. 
}) = events.last() { + let next_block_number_to_handle_from = block_number + 1; + ChaindexingRepo::update_next_block_number_to_handle_from_in_txn( + &raw_query_txn_client, + contract_address.id(), + next_block_number_to_handle_from, + ) + .await; } + + ChaindexingRepo::commit_raw_query_txns(raw_query_txn_client).await; } } diff --git a/chaindexing/src/events_ingester.rs b/chaindexing/src/events_ingester.rs index 998b38f..7d68231 100644 --- a/chaindexing/src/events_ingester.rs +++ b/chaindexing/src/events_ingester.rs @@ -15,8 +15,6 @@ use std::cmp::min; use tokio::sync::Mutex; use tokio::time::{interval, sleep}; -use ingest_events::IngestEvents; - use crate::chain_reorg::Execution; use crate::contracts::Contract; use crate::contracts::{ContractEventTopic, Contracts}; @@ -151,7 +149,7 @@ impl EventsIngester { let mut conn = conn.lock().await; - IngestEvents::run( + ingest_events::run( &mut conn, raw_query_client, contract_addresses.clone(), diff --git a/chaindexing/src/events_ingester/ingest_events.rs b/chaindexing/src/events_ingester/ingest_events.rs index c3c7df8..7769521 100644 --- a/chaindexing/src/events_ingester/ingest_events.rs +++ b/chaindexing/src/events_ingester/ingest_events.rs @@ -13,126 +13,115 @@ use crate::{ use super::{fetch_blocks_by_number, fetch_logs, EventsIngesterError, Filter, Filters}; -pub struct IngestEvents; - -impl IngestEvents { - pub async fn run<'a>( - conn: &mut ChaindexingRepoConn<'a>, - raw_query_client: &ChaindexingRepoRawQueryClient, - contract_addresses: Vec, - contracts: &Vec, - json_rpc: &Arc, - current_block_number: u64, - blocks_per_batch: u64, - ) -> Result<(), EventsIngesterError> { - let filters = Filters::new( - &contract_addresses, - &contracts, - current_block_number, - blocks_per_batch, - &Execution::Main, - ); - - let filters = - Self::remove_already_ingested_filters(&filters, &contract_addresses, raw_query_client) - .await; - - if !filters.is_empty() { - let logs = fetch_logs(&filters, json_rpc).await; - let 
blocks_by_tx_hash = fetch_blocks_by_number(&logs, json_rpc).await; - let events = Events::new(&logs, &contracts, &blocks_by_tx_hash); - - ChaindexingRepo::run_in_transaction(conn, move |conn| { - async move { - ChaindexingRepo::create_events(conn, &events.clone()).await; - - Self::update_next_block_numbers_to_ingest_from( - conn, - &contract_addresses, - &filters, - ) - .await; - - Ok(()) - } - .boxed() - }) - .await?; - } - - Ok(()) +pub async fn run<'a>( + conn: &mut ChaindexingRepoConn<'a>, + raw_query_client: &ChaindexingRepoRawQueryClient, + contract_addresses: Vec, + contracts: &Vec, + json_rpc: &Arc, + current_block_number: u64, + blocks_per_batch: u64, +) -> Result<(), EventsIngesterError> { + let filters = Filters::new( + &contract_addresses, + &contracts, + current_block_number, + blocks_per_batch, + &Execution::Main, + ); + + let filters = + remove_already_ingested_filters(&filters, &contract_addresses, raw_query_client).await; + + if !filters.is_empty() { + let logs = fetch_logs(&filters, json_rpc).await; + let blocks_by_tx_hash = fetch_blocks_by_number(&logs, json_rpc).await; + let events = Events::new(&logs, &contracts, &blocks_by_tx_hash); + + ChaindexingRepo::run_in_transaction(conn, move |conn| { + async move { + ChaindexingRepo::create_events(conn, &events.clone()).await; + + update_next_block_numbers_to_ingest_from(conn, &contract_addresses, &filters).await; + + Ok(()) + } + .boxed() + }) + .await?; } - async fn remove_already_ingested_filters( - filters: &Vec, - contract_addresses: &Vec, - raw_query_client: &ChaindexingRepoRawQueryClient, - ) -> Vec { - let current_block_filters: Vec<_> = filters - .iter() - .filter(|f| f.value.get_from_block() == f.value.get_to_block()) - .collect(); - - if current_block_filters.is_empty() { - filters.clone() - } else { - let addresses = contract_addresses.iter().map(|c| c.address.clone()).collect(); - - let latest_ingested_events = - ChaindexingRepo::load_latest_events(raw_query_client, &addresses).await; - 
let latest_ingested_events = latest_ingested_events.iter().fold( - HashMap::new(), - |mut events_by_address, event| { + Ok(()) +} + +async fn remove_already_ingested_filters( + filters: &Vec, + contract_addresses: &Vec, + raw_query_client: &ChaindexingRepoRawQueryClient, +) -> Vec { + let current_block_filters: Vec<_> = filters + .iter() + .filter(|f| f.value.get_from_block() == f.value.get_to_block()) + .collect(); + + if current_block_filters.is_empty() { + filters.clone() + } else { + let addresses = contract_addresses.iter().map(|c| c.address.clone()).collect(); + + let latest_ingested_events = + ChaindexingRepo::load_latest_events(raw_query_client, &addresses).await; + let latest_ingested_events = + latest_ingested_events + .iter() + .fold(HashMap::new(), |mut events_by_address, event| { events_by_address.insert(&event.contract_address, event); events_by_address - }, - ); - - let already_ingested_filters = current_block_filters - .iter() - .filter(|filter| match latest_ingested_events.get(&filter.address) { - Some(latest_event) => { - latest_event.block_number as u64 - == filter.value.get_to_block().unwrap().as_u64() - } - None => false, - }) - .fold(HashMap::new(), |mut stale_current_block_filters, filter| { - stale_current_block_filters.insert(filter.contract_address_id, filter); - - stale_current_block_filters }); - filters - .iter() - .filter(|f| !already_ingested_filters.contains_key(&f.contract_address_id)) - .cloned() - .collect::>() - } + let already_ingested_filters = current_block_filters + .iter() + .filter(|filter| match latest_ingested_events.get(&filter.address) { + Some(latest_event) => { + latest_event.block_number as u64 + == filter.value.get_to_block().unwrap().as_u64() + } + None => false, + }) + .fold(HashMap::new(), |mut stale_current_block_filters, filter| { + stale_current_block_filters.insert(filter.contract_address_id, filter); + + stale_current_block_filters + }); + + filters + .iter() + .filter(|f| 
!already_ingested_filters.contains_key(&f.contract_address_id)) + .cloned() + .collect::>() } +} - async fn update_next_block_numbers_to_ingest_from<'a>( - conn: &mut ChaindexingRepoConn<'a>, - contract_addresses: &Vec, - filters: &Vec, - ) { - let filters_by_contract_address_id = Filters::group_by_contract_address_id(filters); - - for contract_address in contract_addresses { - let filters = filters_by_contract_address_id.get(&contract_address.id).unwrap(); - - if let Some(latest_filter) = Filters::get_latest(filters) { - let next_block_number_to_ingest_from = - latest_filter.value.get_to_block().unwrap() + 1; - - ChaindexingRepo::update_next_block_number_to_ingest_from( - conn, - &contract_address, - next_block_number_to_ingest_from.as_u64() as i64, - ) - .await - } +async fn update_next_block_numbers_to_ingest_from<'a>( + conn: &mut ChaindexingRepoConn<'a>, + contract_addresses: &Vec, + filters: &Vec, +) { + let filters_by_contract_address_id = Filters::group_by_contract_address_id(filters); + + for contract_address in contract_addresses { + let filters = filters_by_contract_address_id.get(&contract_address.id).unwrap(); + + if let Some(latest_filter) = Filters::get_latest(filters) { + let next_block_number_to_ingest_from = latest_filter.value.get_to_block().unwrap() + 1; + + ChaindexingRepo::update_next_block_number_to_ingest_from( + conn, + &contract_address, + next_block_number_to_ingest_from.as_u64() as i64, + ) + .await } } }