From 67434451108db78c10a08ee495fad43f0f4ccf62 Mon Sep 17 00:00:00 2001
From: Joshua Oladele
Date: Wed, 18 Oct 2023 08:00:35 +0100
Subject: [PATCH 1/3] Isolate execution modules in events ingester

---
 chaindexing/src/events_ingester.rs            | 224 +-----------------
 .../src/events_ingester/ingest_events.rs      |  80 +++++++
 .../src/events_ingester/ingested_events.rs    | 153 ++++++++++++
 3 files changed, 243 insertions(+), 214 deletions(-)
 create mode 100644 chaindexing/src/events_ingester/ingest_events.rs
 create mode 100644 chaindexing/src/events_ingester/ingested_events.rs

diff --git a/chaindexing/src/events_ingester.rs b/chaindexing/src/events_ingester.rs
index 718922a..562e771 100644
--- a/chaindexing/src/events_ingester.rs
+++ b/chaindexing/src/events_ingester.rs
@@ -1,4 +1,7 @@
-use std::collections::{HashMap, HashSet};
+mod ingest_events;
+mod ingested_events;
+
+use std::collections::HashMap;
 use std::sync::Arc;
 use std::time::Duration;
 
@@ -7,16 +10,17 @@ use ethers::prelude::*;
 use ethers::providers::{Http, Provider, ProviderError};
 use ethers::types::{Address, Filter as EthersFilter, Log};
 use futures_util::future::try_join_all;
-use futures_util::FutureExt;
 use futures_util::StreamExt;
 use std::cmp::min;
 use tokio::sync::Mutex;
 use tokio::time::{interval, sleep};
 
-use crate::chain_reorg::{Execution, UnsavedReorgedBlock};
+use ingest_events::IngestEvents;
+use ingested_events::MaybeBacktrackIngestedEvents;
+
+use crate::chain_reorg::Execution;
 use crate::contracts::Contract;
 use crate::contracts::{ContractEventTopic, Contracts};
-use crate::events::{Event, Events};
 use crate::{
     ChaindexingRepo, ChaindexingRepoConn, Config, ContractAddress, MinConfirmationCount, Repo,
     RepoError, Streamable,
@@ -144,7 +148,7 @@ impl EventsIngester {
 
             let mut conn = conn.lock().await;
 
-            MainExecution::ingest(
+            IngestEvents::run(
                 &mut conn,
                 contract_addresses.clone(),
                 contracts,
@@ -155,7 +159,7 @@ impl EventsIngester {
             .await?;
 
             if run_confirmation_execution {
-                ConfirmationExecution::ingest(
+                MaybeBacktrackIngestedEvents::run(
                     &mut conn,
                     contract_addresses.clone(),
                     contracts,
@@ -184,213 +188,6 @@ impl EventsIngester {
     }
 }
 
-struct MainExecution;
-
-impl MainExecution {
-    async fn ingest<'a>(
-        conn: &mut ChaindexingRepoConn<'a>,
-        contract_addresses: Vec<ContractAddress>,
-        contracts: &Vec<Contract>,
-        json_rpc: &Arc<impl EventsIngesterJsonRpc>,
-        current_block_number: u64,
-        blocks_per_batch: u64,
-    ) -> Result<(), EventsIngesterError> {
-        let filters = Filters::new(
-            &contract_addresses,
-            &contracts,
-            current_block_number,
-            blocks_per_batch,
-            &Execution::Main,
-        );
-
-        if !filters.is_empty() {
-            let logs = fetch_logs(&filters, json_rpc).await;
-            let blocks_by_tx_hash = fetch_blocks_by_tx_hash(&logs, json_rpc).await;
-            let events = Events::new(&logs, &contracts, &blocks_by_tx_hash);
-
-            ChaindexingRepo::run_in_transaction(conn, move |conn| {
-                async move {
-                    ChaindexingRepo::create_events(conn, &events.clone()).await;
-
-                    Self::update_next_block_numbers_to_ingest_from(
-                        conn,
-                        &contract_addresses,
-                        &filters,
-                    )
-                    .await;
-
-                    Ok(())
-                }
-                .boxed()
-            })
-            .await?;
-        }
-
-        Ok(())
-    }
-
-    async fn update_next_block_numbers_to_ingest_from<'a>(
-        conn: &mut ChaindexingRepoConn<'a>,
-        contract_addresses: &Vec<ContractAddress>,
-        filters: &Vec<Filter>,
-    ) {
-        let filters_by_contract_address_id = Filters::group_by_contract_address_id(filters);
-
-        for contract_address in contract_addresses {
-            let filters = filters_by_contract_address_id.get(&contract_address.id).unwrap();
-
-            if let Some(latest_filter) = Filters::get_latest(filters) {
-                let next_block_number_to_ingest_from =
-                    latest_filter.value.get_to_block().unwrap() + 1;
-
-                ChaindexingRepo::update_next_block_number_to_ingest_from(
-                    conn,
-                    &contract_address,
-                    next_block_number_to_ingest_from.as_u64() as i64,
-                )
-                .await
-            }
-        }
-    }
-}
-
-struct ConfirmationExecution;
-
-impl ConfirmationExecution {
-    async fn ingest<'a>(
-        conn: &mut ChaindexingRepoConn<'a>,
-        contract_addresses: Vec<ContractAddress>,
-        contracts: &Vec<Contract>,
-        json_rpc: &Arc<impl EventsIngesterJsonRpc>,
-        chain: &Chain,
-        current_block_number: u64,
-        blocks_per_batch: u64,
-        min_confirmation_count: &MinConfirmationCount,
-    ) -> Result<(), EventsIngesterError> {
-        let filters = Filters::new(
-            &contract_addresses,
-            &contracts,
-            current_block_number,
-            blocks_per_batch,
-            &Execution::Confirmation(min_confirmation_count),
-        );
-
-        if !filters.is_empty() {
-            let already_ingested_events = Self::get_already_ingested_events(conn, &filters).await;
-            let json_rpc_events = Self::get_json_rpc_events(&filters, json_rpc, contracts).await;
-
-            Self::maybe_handle_chain_reorg(conn, chain, &already_ingested_events, &json_rpc_events)
-                .await?;
-        }
-
-        Ok(())
-    }
-
-    async fn get_already_ingested_events<'a>(
-        conn: &mut ChaindexingRepoConn<'a>,
-        filters: &Vec<Filter>,
-    ) -> Vec<Event> {
-        let mut already_ingested_events = vec![];
-        for filter in filters {
-            let from_block = filter.value.get_from_block().unwrap().as_u64();
-            let to_block = filter.value.get_to_block().unwrap().as_u64();
-
-            let mut events =
-                ChaindexingRepo::get_events(conn, filter.address.to_owned(), from_block, to_block)
-                    .await;
-            already_ingested_events.append(&mut events);
-        }
-
-        already_ingested_events
-    }
-
-    async fn get_json_rpc_events(
-        filters: &Vec<Filter>,
-        json_rpc: &Arc<impl EventsIngesterJsonRpc>,
-        contracts: &Vec<Contract>,
-    ) -> Vec<Event> {
-        let logs = fetch_logs(&filters, json_rpc).await;
-        let blocks_by_tx_hash = fetch_blocks_by_tx_hash(&logs, json_rpc).await;
-
-        Events::new(&logs, contracts, &blocks_by_tx_hash)
-    }
-
-    async fn maybe_handle_chain_reorg<'a>(
-        conn: &mut ChaindexingRepoConn<'a>,
-        chain: &Chain,
-        already_ingested_events: &Vec<Event>,
-        json_rpc_events: &Vec<Event>,
-    ) -> Result<(), EventsIngesterError> {
-        if let Some((added_events, removed_events)) =
-            Self::get_json_rpc_added_and_removed_events(&already_ingested_events, &json_rpc_events)
-        {
-            let earliest_block_number =
-                Self::get_earliest_block_number((&added_events, &removed_events));
-            let new_reorged_block = UnsavedReorgedBlock::new(earliest_block_number, chain);
-
-            ChaindexingRepo::run_in_transaction(conn, move |conn| {
-                async move {
-                    ChaindexingRepo::create_reorged_block(conn, &new_reorged_block).await;
-
-                    let event_ids = removed_events.iter().map(|e| e.id).collect();
-                    ChaindexingRepo::delete_events_by_ids(conn, &event_ids).await;
-
-                    ChaindexingRepo::create_events(conn, &added_events).await;
-
-                    Ok(())
-                }
-                .boxed()
-            })
-            .await?;
-        }
-
-        Ok(())
-    }
-
-    fn get_json_rpc_added_and_removed_events(
-        already_ingested_events: &Vec<Event>,
-        json_rpc_events: &Vec<Event>,
-    ) -> Option<(Vec<Event>, Vec<Event>)> {
-        let already_ingested_events_set: HashSet<_> =
-            already_ingested_events.clone().into_iter().collect();
-        let json_rpc_events_set: HashSet<_> = json_rpc_events.clone().into_iter().collect();
-
-        let added_events: Vec<_> = json_rpc_events
-            .clone()
-            .into_iter()
-            .filter(|e| !already_ingested_events_set.contains(e))
-            .collect();
-
-        let removed_events: Vec<_> = already_ingested_events
-            .clone()
-            .into_iter()
-            .filter(|e| !json_rpc_events_set.contains(e))
-            .collect();
-
-        if added_events.is_empty() && removed_events.is_empty() {
-            None
-        } else {
-            Some((added_events, removed_events))
-        }
-    }
-
-    fn get_earliest_block_number(
-        (added_events, removed_events): (&Vec<Event>, &Vec<Event>),
-    ) -> i64 {
-        let earliest_added_event = added_events.iter().min_by_key(|e| e.block_number);
-        let earliest_removed_event = removed_events.iter().min_by_key(|e| e.block_number);
-
-        match (earliest_added_event, earliest_removed_event) {
-            (None, Some(event)) => event.block_number,
-            (Some(event), None) => event.block_number,
-            (Some(earliest_added), Some(earliest_removed)) => {
-                min(earliest_added.block_number, earliest_removed.block_number)
-            }
-            _ => unreachable!("Added Events or Removed Events must have at least one entry"),
-        }
-    }
-}
-
 async fn fetch_current_block_number<'a>(json_rpc: &'a Arc<impl EventsIngesterJsonRpc>) -> u64 {
     let mut maybe_current_block_number = None;
     let mut retries_so_far = 0;
@@ -454,7 +251,6 @@ async fn fetch_blocks_by_tx_hash(
     maybe_blocks_by_tx_hash.unwrap()
 }
-
 async fn backoff(retries_so_far: u32) {
     sleep(Duration::from_secs(2u64.pow(retries_so_far))).await;
 }
diff --git a/chaindexing/src/events_ingester/ingest_events.rs b/chaindexing/src/events_ingester/ingest_events.rs
new file mode 100644
index 0000000..845257f
--- /dev/null
+++ b/chaindexing/src/events_ingester/ingest_events.rs
@@ -0,0 +1,80 @@
+use std::sync::Arc;
+
+use futures_util::FutureExt;
+
+use crate::chain_reorg::Execution;
+use crate::contracts::Contract;
+use crate::events::Events;
+use crate::{ChaindexingRepo, ChaindexingRepoConn, ContractAddress, EventsIngesterJsonRpc, Repo};
+
+use super::{fetch_blocks_by_tx_hash, fetch_logs, EventsIngesterError, Filter, Filters};
+
+pub struct IngestEvents;
+
+impl IngestEvents {
+    pub async fn run<'a>(
+        conn: &mut ChaindexingRepoConn<'a>,
+        contract_addresses: Vec<ContractAddress>,
+        contracts: &Vec<Contract>,
+        json_rpc: &Arc<impl EventsIngesterJsonRpc>,
+        current_block_number: u64,
+        blocks_per_batch: u64,
+    ) -> Result<(), EventsIngesterError> {
+        let filters = Filters::new(
+            &contract_addresses,
+            &contracts,
+            current_block_number,
+            blocks_per_batch,
+            &Execution::Main,
+        );
+
+        if !filters.is_empty() {
+            let logs = fetch_logs(&filters, json_rpc).await;
+            let blocks_by_tx_hash = fetch_blocks_by_tx_hash(&logs, json_rpc).await;
+            let events = Events::new(&logs, &contracts, &blocks_by_tx_hash);
+
+            ChaindexingRepo::run_in_transaction(conn, move |conn| {
+                async move {
+                    ChaindexingRepo::create_events(conn, &events.clone()).await;
+
+                    Self::update_next_block_numbers_to_ingest_from(
+                        conn,
+                        &contract_addresses,
+                        &filters,
+                    )
+                    .await;
+
+                    Ok(())
+                }
+                .boxed()
+            })
+            .await?;
+        }
+
+        Ok(())
+    }
+
+    async fn update_next_block_numbers_to_ingest_from<'a>(
+        conn: &mut ChaindexingRepoConn<'a>,
+        contract_addresses: &Vec<ContractAddress>,
+        filters: &Vec<Filter>,
+    ) {
+        let filters_by_contract_address_id = Filters::group_by_contract_address_id(filters);
+
+        for contract_address in contract_addresses {
+            let filters = filters_by_contract_address_id.get(&contract_address.id).unwrap();
+
+            if let Some(latest_filter) = Filters::get_latest(filters) {
+                let next_block_number_to_ingest_from =
+                    latest_filter.value.get_to_block().unwrap() + 1;
+
+                ChaindexingRepo::update_next_block_number_to_ingest_from(
+                    conn,
+                    &contract_address,
+                    next_block_number_to_ingest_from.as_u64() as i64,
+                )
+                .await
+            }
+        }
+    }
+}
diff --git a/chaindexing/src/events_ingester/ingested_events.rs b/chaindexing/src/events_ingester/ingested_events.rs
new file mode 100644
index 0000000..d52e96d
--- /dev/null
+++ b/chaindexing/src/events_ingester/ingested_events.rs
@@ -0,0 +1,153 @@
+use std::collections::HashSet;
+use std::sync::Arc;
+
+use ethers::prelude::*;
+use futures_util::FutureExt;
+use std::cmp::min;
+
+use crate::chain_reorg::{Execution, UnsavedReorgedBlock};
+use crate::contracts::Contract;
+use crate::events::{Event, Events};
+use crate::{
+    ChaindexingRepo, ChaindexingRepoConn, ContractAddress, EventsIngesterJsonRpc,
+    MinConfirmationCount, Repo,
+};
+
+use super::{fetch_blocks_by_tx_hash, fetch_logs, EventsIngesterError, Filter, Filters};
+
+pub struct MaybeBacktrackIngestedEvents;
+
+impl MaybeBacktrackIngestedEvents {
+    pub async fn run<'a>(
+        conn: &mut ChaindexingRepoConn<'a>,
+        contract_addresses: Vec<ContractAddress>,
+        contracts: &Vec<Contract>,
+        json_rpc: &Arc<impl EventsIngesterJsonRpc>,
+        chain: &Chain,
+        current_block_number: u64,
+        blocks_per_batch: u64,
+        min_confirmation_count: &MinConfirmationCount,
+    ) -> Result<(), EventsIngesterError> {
+        let filters = Filters::new(
+            &contract_addresses,
+            &contracts,
+            current_block_number,
+            blocks_per_batch,
+            &Execution::Confirmation(min_confirmation_count),
+        );
+
+        if !filters.is_empty() {
+            let already_ingested_events = Self::get_already_ingested_events(conn, &filters).await;
+            let json_rpc_events = Self::get_json_rpc_events(&filters, json_rpc, contracts).await;
+
+            Self::maybe_handle_chain_reorg(conn, chain, &already_ingested_events, &json_rpc_events)
+                .await?;
+        }
+
+        Ok(())
+    }
+
+    async fn get_already_ingested_events<'a>(
+        conn: &mut ChaindexingRepoConn<'a>,
+        filters: &Vec<Filter>,
+    ) -> Vec<Event> {
+        let mut already_ingested_events = vec![];
+        for filter in filters {
+            let from_block = filter.value.get_from_block().unwrap().as_u64();
+            let to_block = filter.value.get_to_block().unwrap().as_u64();
+
+            let mut events =
+                ChaindexingRepo::get_events(conn, filter.address.to_owned(), from_block, to_block)
+                    .await;
+            already_ingested_events.append(&mut events);
+        }
+
+        already_ingested_events
+    }
+
+    async fn get_json_rpc_events(
+        filters: &Vec<Filter>,
+        json_rpc: &Arc<impl EventsIngesterJsonRpc>,
+        contracts: &Vec<Contract>,
+    ) -> Vec<Event> {
+        let logs = fetch_logs(&filters, json_rpc).await;
+        let blocks_by_tx_hash = fetch_blocks_by_tx_hash(&logs, json_rpc).await;
+
+        Events::new(&logs, contracts, &blocks_by_tx_hash)
+    }
+
+    async fn maybe_handle_chain_reorg<'a>(
+        conn: &mut ChaindexingRepoConn<'a>,
+        chain: &Chain,
+        already_ingested_events: &Vec<Event>,
+        json_rpc_events: &Vec<Event>,
+    ) -> Result<(), EventsIngesterError> {
+        if let Some((added_events, removed_events)) =
+            Self::get_json_rpc_added_and_removed_events(&already_ingested_events, &json_rpc_events)
+        {
+            let earliest_block_number =
+                Self::get_earliest_block_number((&added_events, &removed_events));
+            let new_reorged_block = UnsavedReorgedBlock::new(earliest_block_number, chain);
+
+            ChaindexingRepo::run_in_transaction(conn, move |conn| {
+                async move {
+                    ChaindexingRepo::create_reorged_block(conn, &new_reorged_block).await;
+
+                    let event_ids = removed_events.iter().map(|e| e.id).collect();
+                    ChaindexingRepo::delete_events_by_ids(conn, &event_ids).await;
+
+                    ChaindexingRepo::create_events(conn, &added_events).await;
+
+                    Ok(())
+                }
+                .boxed()
+            })
+            .await?;
+        }
+
+        Ok(())
+    }
+
+    fn get_json_rpc_added_and_removed_events(
+        already_ingested_events: &Vec<Event>,
+        json_rpc_events: &Vec<Event>,
+    ) -> Option<(Vec<Event>, Vec<Event>)> {
+        let already_ingested_events_set: HashSet<_> =
+            already_ingested_events.clone().into_iter().collect();
+        let json_rpc_events_set: HashSet<_> = json_rpc_events.clone().into_iter().collect();
+
+        let added_events: Vec<_> = json_rpc_events
+            .clone()
+            .into_iter()
+            .filter(|e| !already_ingested_events_set.contains(e))
+            .collect();
+
+        let removed_events: Vec<_> = already_ingested_events
+            .clone()
+            .into_iter()
+            .filter(|e| !json_rpc_events_set.contains(e))
+            .collect();
+
+        if added_events.is_empty() && removed_events.is_empty() {
+            None
+        } else {
+            Some((added_events, removed_events))
+        }
+    }
+
+    fn get_earliest_block_number(
+        (added_events, removed_events): (&Vec<Event>, &Vec<Event>),
+    ) -> i64 {
+        let earliest_added_event = added_events.iter().min_by_key(|e| e.block_number);
+        let earliest_removed_event = removed_events.iter().min_by_key(|e| e.block_number);
+
+        match (earliest_added_event, earliest_removed_event) {
+            (None, Some(event)) => event.block_number,
+            (Some(event), None) => event.block_number,
+            (Some(earliest_added), Some(earliest_removed)) => {
+                min(earliest_added.block_number, earliest_removed.block_number)
+            }
+            _ => unreachable!("Added Events or Removed Events must have at least one entry"),
+        }
+    }
+}
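
Aside (an illustration, not part of the patch series): the core of MaybeBacktrackIngestedEvents above is a set difference between the events already ingested into the database and the events the node currently returns. A minimal standalone sketch of that idea, with plain u64 block numbers standing in for the crate's Event type:

use std::collections::HashSet;

// Events the node no longer returns were reorged out ("removed");
// events the database lacks are new to the canonical chain ("added").
fn added_and_removed(ingested: &[u64], from_node: &[u64]) -> (Vec<u64>, Vec<u64>) {
    let ingested_set: HashSet<u64> = ingested.iter().copied().collect();
    let node_set: HashSet<u64> = from_node.iter().copied().collect();

    let added = from_node.iter().copied().filter(|e| !ingested_set.contains(e)).collect();
    let removed = ingested.iter().copied().filter(|e| !node_set.contains(e)).collect();

    (added, removed)
}

fn main() {
    // The event at block 3 was reorged out; an event at block 4 replaced it.
    let (added, removed) = added_and_removed(&[1, 2, 3], &[1, 2, 4]);
    assert_eq!((added, removed), (vec![4], vec![3]));
}

The earliest block number across both sets is what the patch records as the reorged block, so downstream handlers know how far back to re-process.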
From e4c05ed1665b38f4c3f54401d9167021f59d27a2 Mon Sep 17 00:00:00 2001
From: Joshua Oladele
Date: Wed, 18 Oct 2023 08:20:03 +0100
Subject: [PATCH 2/3] Always run confirmation execution

This is okay since it can run idempotently without side effects.

---
 .../src/tests/events_ingester.rs   |  6 ----
 chaindexing/src/events_ingester.rs | 31 +++++++------------
 2 files changed, 11 insertions(+), 26 deletions(-)

diff --git a/chaindexing-tests/src/tests/events_ingester.rs b/chaindexing-tests/src/tests/events_ingester.rs
index 03d64ed..06bcae1 100644
--- a/chaindexing-tests/src/tests/events_ingester.rs
+++ b/chaindexing-tests/src/tests/events_ingester.rs
@@ -1,6 +1,5 @@
 #[cfg(test)]
 mod tests {
-    use ethers::types::U64;
     use std::sync::Arc;
     use tokio::sync::Mutex;
 
@@ -37,7 +36,6 @@ mod tests {
             json_rpc,
             &Chain::Mainnet,
             &MinConfirmationCount::new(1),
-            false,
         )
         .await
         .unwrap();
@@ -85,7 +83,6 @@ mod tests {
             json_rpc,
             &Chain::Mainnet,
             &MinConfirmationCount::new(1),
-            false,
         )
         .await
         .unwrap();
@@ -117,7 +114,6 @@ mod tests {
             json_rpc,
             &Chain::Mainnet,
             &MinConfirmationCount::new(1),
-            false,
         )
         .await
         .unwrap();
@@ -155,7 +151,6 @@ mod tests {
             json_rpc,
             &Chain::Mainnet,
             &MinConfirmationCount::new(1),
-            false,
         )
         .await
         .unwrap();
@@ -184,7 +179,6 @@ mod tests {
             json_rpc,
             &Chain::Mainnet,
             &MinConfirmationCount::new(1),
-            false,
         )
         .await
         .unwrap();

diff --git a/chaindexing/src/events_ingester.rs b/chaindexing/src/events_ingester.rs
index 562e771..26798a0 100644
--- a/chaindexing/src/events_ingester.rs
+++ b/chaindexing/src/events_ingester.rs
@@ -99,7 +99,6 @@ impl EventsIngester {
             let conn = Arc::new(Mutex::new(conn));
             let contracts = config.contracts.clone();
             let mut interval = interval(Duration::from_millis(config.ingestion_interval_ms));
-            let mut run_confirmation_execution = false;
 
             loop {
                 interval.tick().await;
@@ -114,15 +113,10 @@ impl EventsIngester {
                     json_rpc,
                     &chain,
                     &config.min_confirmation_count,
-                    run_confirmation_execution,
                 )
                 .await
                 .unwrap();
             }
-
-            if !run_confirmation_execution {
-                run_confirmation_execution = true;
-            }
         }
     });
 }
@@ -134,7 +128,6 @@ impl EventsIngester {
         json_rpc: Arc<impl EventsIngesterJsonRpc>,
         chain: &Chain,
         min_confirmation_count: &MinConfirmationCount,
-        run_confirmation_execution: bool,
    ) -> Result<(), EventsIngesterError> {
        let current_block_number = fetch_current_block_number(&json_rpc).await;
        let mut contract_addresses_stream =
@@ -158,19 +151,17 @@ impl EventsIngester {
             )
             .await?;
 
-            if run_confirmation_execution {
-                MaybeBacktrackIngestedEvents::run(
-                    &mut conn,
-                    contract_addresses.clone(),
-                    contracts,
-                    &json_rpc,
-                    chain,
-                    current_block_number,
-                    blocks_per_batch,
-                    min_confirmation_count,
-                )
-                .await?;
-            }
+            MaybeBacktrackIngestedEvents::run(
+                &mut conn,
+                contract_addresses.clone(),
+                contracts,
+                &json_rpc,
+                chain,
+                current_block_number,
+                blocks_per_batch,
+                min_confirmation_count,
+            )
+            .await?;
         }
 
         Ok(())
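
Aside (an illustration, not part of the patch series): the idempotency claim in this commit message can be pictured with a toy model of the backtracking step. Deleting events the node no longer reports and inserting the ones it now does converges after a single pass, so re-running it on every tick changes nothing:

use std::collections::BTreeSet;

// Toy stand-in for the backtracking step: drop events the node no longer
// reports, then insert whatever the node currently reports.
fn reconcile(store: &mut BTreeSet<u64>, node_view: &BTreeSet<u64>) {
    store.retain(|e| node_view.contains(e));
    store.extend(node_view.iter().copied());
}

fn main() {
    let node_view: BTreeSet<u64> = [1, 2, 4].into();
    let mut store: BTreeSet<u64> = [1, 2, 3].into();

    reconcile(&mut store, &node_view);
    let after_first_run = store.clone();

    reconcile(&mut store, &node_view); // a second run is a no-op
    assert_eq!(store, after_first_run);
}

In the real code the no-op case is even cheaper: when the diff is None, maybe_handle_chain_reorg returns without opening a transaction at all.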
From 40a1b69457c49766b8553424322bc0068a9e0885 Mon Sep 17 00:00:00 2001
From: Joshua Oladele
Date: Wed, 18 Oct 2023 08:44:51 +0100
Subject: [PATCH 3/3] Randomize log index to avoid conflict in tests

---
 chaindexing-tests/Cargo.toml               | 1 +
 chaindexing-tests/src/factory/json_rpcs.rs | 6 +++++-
 2 files changed, 6 insertions(+), 1 deletion(-)

diff --git a/chaindexing-tests/Cargo.toml b/chaindexing-tests/Cargo.toml
index eabe085..ead7806 100644
--- a/chaindexing-tests/Cargo.toml
+++ b/chaindexing-tests/Cargo.toml
@@ -9,6 +9,7 @@ chaindexing = { path = "../chaindexing", features = ["postgres"] }
 ethers = "2.0"
 dotenvy = "0.15"
 diesel = { version = "2", features = ["postgres", "chrono"] }
+rand = "0.8.5"
 serde = { version = "1.0", features = ["derive"] }
 tokio = { version = "1.29", features = ["full"] }

diff --git a/chaindexing-tests/src/factory/json_rpcs.rs b/chaindexing-tests/src/factory/json_rpcs.rs
index 7cbc9fb..c206dd3 100644
--- a/chaindexing-tests/src/factory/json_rpcs.rs
+++ b/chaindexing-tests/src/factory/json_rpcs.rs
@@ -2,6 +2,8 @@ use chaindexing::EventsIngesterJsonRpc;
 use ethers::providers::ProviderError;
 use ethers::types::{Block, Filter, Log, TxHash, U64};
 
+use rand::seq::SliceRandom;
+
 pub fn empty_json_rpc() -> impl EventsIngesterJsonRpc {
     #[derive(Clone)]
     struct JsonRpc;
@@ -30,6 +32,8 @@ use ethers::types::{Bytes, H160, H256};
 use std::str::FromStr;
 
 pub fn transfer_log(contract_address: &str) -> Log {
+    let log_index = *(1..800).collect::<Vec<_>>().choose(&mut rand::thread_rng()).unwrap();
+
     Log {
         address: H160::from_str(contract_address).unwrap(),
         topics: vec![
@@ -47,7 +51,7 @@ pub fn transfer_log(contract_address: &str) -> Log {
             "0x83d751998ff98cd609bc9b18bb36bdef8659cde2f74d6d7a1b0fef2c2bf8f839",
         )),
         transaction_index: Some(89.into()),
-        log_index: Some(218.into()),
+        log_index: Some(log_index.into()),
         transaction_log_index: None,
         log_type: None,
         removed: Some(false),
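
Aside (an illustration, not part of the patch series): the factory above draws a random log index by collecting 1..800 into a Vec and choosing from it with SliceRandom. Assuming the rand 0.8 declared in Cargo.toml, a sketch of an equivalent draw with Rng::gen_range, which samples the range directly and skips the intermediate allocation:

use rand::Rng;

// Uniformly samples 1..=799, matching the (1..800) range used in the patch.
fn random_log_index() -> u64 {
    rand::thread_rng().gen_range(1..800)
}

fn main() {
    let idx = random_log_index();
    assert!((1..800).contains(&idx));
    println!("log_index = {idx}");
}

Both forms sample the same distribution; the Vec-based version in the patch works fine, and either gives each generated log a distinct index often enough to avoid the test conflict the commit describes.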