From 5134b9a34f06f3b7f8936ad4a36a4d33dbabb8bd Mon Sep 17 00:00:00 2001 From: Joshua Oladele Date: Sat, 23 Dec 2023 08:00:15 +0100 Subject: [PATCH] Introduce SharedState In EventHandlers (Breaking Change) A shared state is exposed in event handlers to allow performing side effects using the app's state or resources. A typical example would be to store the DBPool for a web app in the shared state to allow doing some DB related logic owned by your application. When there is no shared state required for indexing, the user can simply pass None in the `Config::new` build fn and `NoSharedState` in the associated type for the event handlers. --- chaindexing-tests/src/factory/contracts.rs | 4 +-- .../src/factory/event_handlers.rs | 10 ++++-- chaindexing-tests/src/factory/events.rs | 4 +-- chaindexing-tests/src/factory/json_rpcs.rs | 2 +- .../src/tests/contract_states.rs | 11 ++++--- .../src/tests/events_ingester.rs | 5 +-- chaindexing/src/config.rs | 15 +++++---- chaindexing/src/contract_states.rs | 18 +++++++---- chaindexing/src/contracts.rs | 32 ++++++++++--------- chaindexing/src/event_handlers.rs | 29 ++++++++++++++--- .../src/event_handlers/handle_events.rs | 19 +++++++---- chaindexing/src/events.rs | 4 +-- chaindexing/src/events_ingester.rs | 10 +++--- .../src/events_ingester/ingest_events.rs | 6 ++-- .../maybe_handle_chain_reorg.rs | 10 +++--- chaindexing/src/lib.rs | 24 ++++++++------ 16 files changed, 126 insertions(+), 77 deletions(-) diff --git a/chaindexing-tests/src/factory/contracts.rs b/chaindexing-tests/src/factory/contracts.rs index 9a58e3c..a123ce8 100644 --- a/chaindexing-tests/src/factory/contracts.rs +++ b/chaindexing-tests/src/factory/contracts.rs @@ -1,4 +1,4 @@ -use chaindexing::{Chain, Contract}; +use chaindexing::{Chain, Contract, NoSharedState}; use super::{ApprovalForAllTestEventHandler, TransferTestEventHandler}; @@ -10,7 +10,7 @@ pub const APPROCAL_EVENT_ABI: &str = pub const BAYC_CONTRACT_ADDRESS: &str = "0xBC4CA0EdA7647A8aB7C2061c2E118A18a936f13D"; pub const BAYC_CONTRACT_START_BLOCK_NUMBER: u32 = 17773490; -pub fn bayc_contract() -> Contract { +pub fn bayc_contract() -> Contract { Contract::new("BoredApeYachtClub") .add_event(TRANSFER_EVENT_ABI, TransferTestEventHandler) .add_event(APPROCAL_EVENT_ABI, ApprovalForAllTestEventHandler) diff --git a/chaindexing-tests/src/factory/event_handlers.rs b/chaindexing-tests/src/factory/event_handlers.rs index 92a5c29..4876e21 100644 --- a/chaindexing-tests/src/factory/event_handlers.rs +++ b/chaindexing-tests/src/factory/event_handlers.rs @@ -1,4 +1,4 @@ -use chaindexing::{EventContext, EventHandler}; +use chaindexing::{EventContext, EventHandler, NoSharedState}; #[derive(Clone, Debug)] pub struct NftState; @@ -7,12 +7,16 @@ pub struct TransferTestEventHandler; #[async_trait::async_trait] impl EventHandler for TransferTestEventHandler { - async fn handle_event<'a>(&self, _event_context: EventContext<'a>) {} + type SharedState = NoSharedState; + + async fn handle_event<'a>(&self, _event_context: EventContext<'a, Self::SharedState>) {} } pub struct ApprovalForAllTestEventHandler; #[async_trait::async_trait] impl EventHandler for ApprovalForAllTestEventHandler { - async fn handle_event<'a>(&self, _event_context: EventContext<'a>) {} + type SharedState = NoSharedState; + + async fn handle_event<'a>(&self, _event_context: EventContext<'a, Self::SharedState>) {} } diff --git a/chaindexing-tests/src/factory/events.rs b/chaindexing-tests/src/factory/events.rs index 4fbe78b..a8cdfb4 100644 --- a/chaindexing-tests/src/factory/events.rs +++ b/chaindexing-tests/src/factory/events.rs @@ -1,11 +1,11 @@ use std::collections::HashMap; -use chaindexing::{Contract, Event, Events}; +use chaindexing::{Contract, Event, Events, NoSharedState}; use ethers::types::Block; use super::{transfer_log, BAYC_CONTRACT_ADDRESS}; -pub fn transfer_event_with_contract(contract: Contract) -> Event { +pub fn transfer_event_with_contract(contract: Contract) -> Event { let contract_address = BAYC_CONTRACT_ADDRESS; let transfer_log = transfer_log(contract_address); let blocks_by_number = HashMap::from([( diff --git a/chaindexing-tests/src/factory/json_rpcs.rs b/chaindexing-tests/src/factory/json_rpcs.rs index c5681d8..dff2c20 100644 --- a/chaindexing-tests/src/factory/json_rpcs.rs +++ b/chaindexing-tests/src/factory/json_rpcs.rs @@ -70,10 +70,10 @@ macro_rules! json_rpc_with_logs { json_rpc_with_logs!($contract_address, 17774490) }}; ($contract_address:expr, $current_block_number:expr) => {{ - use crate::factory::transfer_log; use chaindexing::EventsIngesterJsonRpc; use ethers::providers::ProviderError; use ethers::types::{Block, Filter, Log, TxHash, U64}; + use $crate::factory::transfer_log; #[derive(Clone)] struct JsonRpc; diff --git a/chaindexing-tests/src/tests/contract_states.rs b/chaindexing-tests/src/tests/contract_states.rs index f124b26..f69b575 100644 --- a/chaindexing-tests/src/tests/contract_states.rs +++ b/chaindexing-tests/src/tests/contract_states.rs @@ -1,6 +1,6 @@ #[cfg(test)] mod tests { - use chaindexing::{ChaindexingRepo, EventContext, HasRawQueryClient}; + use chaindexing::{ChaindexingRepo, EventContext, HasRawQueryClient, NoSharedState}; use super::*; use crate::factory::{bayc_contract, transfer_event_with_contract}; @@ -12,9 +12,10 @@ mod tests { let mut raw_query_client = test_runner::new_repo().get_raw_query_client().await; let raw_query_txn_client = ChaindexingRepo::get_raw_query_txn_client(&mut raw_query_client).await; - let event_context = EventContext::new( + let event_context: EventContext<'_, NoSharedState> = EventContext::new( transfer_event_with_contract(bayc_contract), &raw_query_txn_client, + None, ); let new_state = NftState { token_id: 2 }; @@ -37,9 +38,10 @@ mod tests { let mut raw_query_client = test_runner::new_repo().get_raw_query_client().await; let raw_query_txn_client = ChaindexingRepo::get_raw_query_txn_client(&mut raw_query_client).await; - let event_context = EventContext::new( + let event_context: EventContext<'_, NoSharedState> = EventContext::new( transfer_event_with_contract(bayc_contract), &raw_query_txn_client, + None, ); let new_state = NftState { token_id: 1 }; @@ -68,9 +70,10 @@ mod tests { let mut raw_query_client = test_runner::new_repo().get_raw_query_client().await; let raw_query_txn_client = ChaindexingRepo::get_raw_query_txn_client(&mut raw_query_client).await; - let event_context = EventContext::new( + let event_context: EventContext<'_, NoSharedState> = EventContext::new( transfer_event_with_contract(bayc_contract), &raw_query_txn_client, + None, ); let new_state = NftState { token_id: 9 }; diff --git a/chaindexing-tests/src/tests/events_ingester.rs b/chaindexing-tests/src/tests/events_ingester.rs index 45f8083..0f25ffd 100644 --- a/chaindexing-tests/src/tests/events_ingester.rs +++ b/chaindexing-tests/src/tests/events_ingester.rs @@ -10,7 +10,8 @@ mod tests { json_rpc_with_empty_logs, json_rpc_with_filter_stubber, json_rpc_with_logs, test_runner, }; use chaindexing::{ - Chain, ChaindexingRepo, EventsIngester, HasRawQueryClient, MinConfirmationCount, Repo, + Chain, ChaindexingRepo, Contract, EventsIngester, HasRawQueryClient, MinConfirmationCount, + NoSharedState, Repo, }; #[tokio::test] @@ -151,7 +152,7 @@ mod tests { let pool = test_runner::get_pool().await; test_runner::run_test(&pool, |conn| async move { - let contracts = vec![]; + let contracts: Vec> = vec![]; let json_rpc = Arc::new(empty_json_rpc()); let blocks_per_batch = 10; let conn = Arc::new(Mutex::new(conn)); diff --git a/chaindexing/src/config.rs b/chaindexing/src/config.rs index 23b3cb3..0c8dbde 100644 --- a/chaindexing/src/config.rs +++ b/chaindexing/src/config.rs @@ -1,6 +1,7 @@ -use std::collections::HashMap; +use std::{collections::HashMap, sync::Arc}; use ethers::types::Chain; +use tokio::sync::Mutex; use crate::{ChaindexingRepo, Chains, Contract, MinConfirmationCount}; @@ -22,19 +23,20 @@ impl std::fmt::Debug for ConfigError { } } #[derive(Clone)] -pub struct Config { +pub struct Config { pub chains: Chains, pub repo: ChaindexingRepo, - pub contracts: Vec, + pub contracts: Vec>, pub min_confirmation_count: MinConfirmationCount, pub blocks_per_batch: u64, pub handler_rate_ms: u64, pub ingestion_rate_ms: u64, pub reset_count: u8, + pub shared_state: Option>>, } -impl Config { - pub fn new(repo: ChaindexingRepo) -> Self { +impl Config { + pub fn new(repo: ChaindexingRepo, initial_state: Option) -> Self { Self { repo, chains: HashMap::new(), @@ -44,6 +46,7 @@ impl Config { handler_rate_ms: 4000, ingestion_rate_ms: 4000, reset_count: 0, + shared_state: initial_state.map(|s| Arc::new(Mutex::new(s))), } } @@ -53,7 +56,7 @@ impl Config { self } - pub fn add_contract(mut self, contract: Contract) -> Self { + pub fn add_contract(mut self, contract: Contract) -> Self { self.contracts.push(contract); self diff --git a/chaindexing/src/contract_states.rs b/chaindexing/src/contract_states.rs index cd19bbf..e99b21a 100644 --- a/chaindexing/src/contract_states.rs +++ b/chaindexing/src/contract_states.rs @@ -75,7 +75,7 @@ pub trait ContractState: StateView::get_complete(&view, table_name, client).await } - async fn create<'a>(&self, context: &EventHandlerContext) { + async fn create<'a, S: Send + Sync + Clone>(&self, context: &EventHandlerContext) { let event = &context.event; let client = context.get_raw_query_client(); @@ -87,7 +87,11 @@ pub trait ContractState: StateView::refresh(&latest_state_version, table_name, client).await; } - async fn update<'a>(&self, updates: HashMap, context: &EventHandlerContext) { + async fn update<'a, S: Send + Sync + Clone>( + &self, + updates: HashMap, + context: &EventHandlerContext, + ) { let event = &context.event; let client = context.get_raw_query_client(); @@ -99,7 +103,7 @@ pub trait ContractState: StateView::refresh(&latest_state_version, table_name, client).await; } - async fn delete<'a>(&self, context: &EventHandlerContext) { + async fn delete<'a, S: Send + Sync + Clone>(&self, context: &EventHandlerContext) { let event = &context.event; let client = context.get_raw_query_client(); @@ -111,18 +115,18 @@ pub trait ContractState: StateView::refresh(&latest_state_version, table_name, client).await; } - async fn read_one<'a>( + async fn read_one<'a, S: Send + Sync + Clone>( filters: HashMap, - context: &EventHandlerContext, + context: &EventHandlerContext, ) -> Option { let states = Self::read_many(filters, context).await; states.first().cloned() } - async fn read_many<'a>( + async fn read_many<'a, S: Send + Sync + Clone>( filters: HashMap, - context: &EventHandlerContext, + context: &EventHandlerContext, ) -> Vec { let client = context.get_raw_query_client(); diff --git a/chaindexing/src/contracts.rs b/chaindexing/src/contracts.rs index fe29505..39b775d 100644 --- a/chaindexing/src/contracts.rs +++ b/chaindexing/src/contracts.rs @@ -30,14 +30,14 @@ impl ContractEvent { type EventAbi = &'static str; #[derive(Clone)] -pub struct Contract { +pub struct Contract { pub addresses: Vec, pub name: String, - pub event_handlers: HashMap>, + pub event_handlers: HashMap>>, pub state_migrations: Vec>, } -impl Contract { +impl Contract { pub fn new(name: &str) -> Self { Self { addresses: vec![], @@ -47,7 +47,7 @@ impl Contract { } } - pub fn add_address(&self, address: &str, chain: &Chain, start_block_number: i64) -> Self { + pub fn add_address(&mut self, address: &str, chain: &Chain, start_block_number: i64) -> Self { let mut addresses = self.addresses.clone(); addresses.push(UnsavedContractAddress::new( @@ -66,7 +66,7 @@ impl Contract { pub fn add_event( mut self, event_abi: EventAbi, - event_handler: impl EventHandler + 'static, + event_handler: impl EventHandler + 'static, ) -> Self { self.event_handlers.insert(event_abi, Arc::new(event_handler)); @@ -101,13 +101,15 @@ impl Contract { pub struct Contracts; impl Contracts { - pub fn get_state_migrations(contracts: &[Contract]) -> Vec> { + pub fn get_state_migrations( + contracts: &[Contract], + ) -> Vec> { contracts.iter().flat_map(|c| c.state_migrations.clone()).collect() } - pub fn get_all_event_handlers_by_event_abi( - contracts: &[Contract], - ) -> HashMap> { + pub fn get_all_event_handlers_by_event_abi( + contracts: &[Contract], + ) -> HashMap>> { contracts.iter().fold( HashMap::new(), |mut event_handlers_by_event_abi, contract| { @@ -120,8 +122,8 @@ impl Contracts { ) } - pub fn group_event_topics_by_names( - contracts: &[Contract], + pub fn group_event_topics_by_names( + contracts: &[Contract], ) -> HashMap> { contracts.iter().fold(HashMap::new(), |mut topics_by_contract_name, contract| { topics_by_contract_name.insert(contract.name.clone(), contract.get_event_topics()); @@ -130,8 +132,8 @@ impl Contracts { }) } - pub fn group_events_by_topics( - contracts: &[Contract], + pub fn group_events_by_topics( + contracts: &[Contract], ) -> HashMap { contracts .iter() @@ -140,8 +142,8 @@ impl Contracts { .collect() } - pub fn get_all_contract_addresses_grouped_by_address( - contracts: &[Contract], + pub fn get_all_contract_addresses_grouped_by_address( + contracts: &[Contract], ) -> HashMap { contracts.iter().fold(HashMap::new(), |mut contracts_by_addresses, contract| { contract.addresses.iter().for_each( diff --git a/chaindexing/src/event_handlers.rs b/chaindexing/src/event_handlers.rs index 01e455e..26a97f9 100644 --- a/chaindexing/src/event_handlers.rs +++ b/chaindexing/src/event_handlers.rs @@ -1,3 +1,4 @@ +use std::fmt::Debug; use std::{sync::Arc, time::Duration}; mod handle_events; @@ -9,34 +10,51 @@ use crate::{contracts::Contracts, events::Event, ChaindexingRepo, Config, Repo}; use crate::{ChaindexingRepoRawQueryTxnClient, HasRawQueryClient}; #[derive(Clone)] -pub struct EventHandlerContext<'a> { +pub struct EventHandlerContext<'a, SharedState: Sync + Send + Clone> { pub event: Event, raw_query_client: &'a ChaindexingRepoRawQueryTxnClient<'a>, + shared_state: Option>>, } -impl<'a> EventHandlerContext<'a> { - pub fn new(event: Event, client: &'a ChaindexingRepoRawQueryTxnClient<'a>) -> Self { +impl<'a, SharedState: Sync + Send + Clone> EventHandlerContext<'a, SharedState> { + pub fn new( + event: Event, + client: &'a ChaindexingRepoRawQueryTxnClient<'a>, + shared_state: Option>>, + ) -> Self { Self { event, raw_query_client: client, + shared_state, } } + pub async fn get_shared_state(&self) -> SharedState { + let shared_state = self.shared_state.clone().unwrap(); + let shared_state = shared_state.lock().await; + shared_state.clone() + } + pub(super) fn get_raw_query_client(&self) -> &'a ChaindexingRepoRawQueryTxnClient<'a> { self.raw_query_client } } +#[derive(Clone, Debug)] +pub struct NoSharedState; + #[async_trait::async_trait] pub trait EventHandler: Send + Sync { - async fn handle_event<'a>(&self, event_context: EventHandlerContext<'a>); + type SharedState: Send + Sync + Clone + Debug; + + async fn handle_event<'a>(&self, event_context: EventHandlerContext<'a, Self::SharedState>); } // TODO: Use just raw query client through for mutations pub struct EventHandlers; impl EventHandlers { - pub fn start(config: &Config) { + pub fn start(config: &Config) { let config = config.clone(); tokio::spawn(async move { let pool = config.repo.get_pool(1).await; @@ -56,6 +74,7 @@ impl EventHandlers { conn.clone(), &event_handlers_by_event_abi, &mut raw_query_client, + config.shared_state.clone(), ) .await; diff --git a/chaindexing/src/event_handlers/handle_events.rs b/chaindexing/src/event_handlers/handle_events.rs index 7ea2147..5c3b5a9 100644 --- a/chaindexing/src/event_handlers/handle_events.rs +++ b/chaindexing/src/event_handlers/handle_events.rs @@ -1,3 +1,4 @@ +use std::fmt::Debug; use std::{collections::HashMap, sync::Arc}; use futures_util::StreamExt; @@ -11,10 +12,11 @@ use crate::{ use super::{EventHandler, EventHandlerContext}; -pub async fn run<'a>( +pub async fn run<'a, S: Send + Sync + Clone + Debug>( conn: Arc>>, - event_handlers_by_event_abi: &HashMap<&str, Arc>, + event_handlers_by_event_abi: &HashMap<&str, Arc>>, raw_query_client: &mut ChaindexingRepoRawQueryClient, + shared_state: Option>>, ) { let mut contract_addresses_stream = ChaindexingRepo::get_contract_addresses_stream(conn.clone()); @@ -26,17 +28,19 @@ pub async fn run<'a>( &contract_address, event_handlers_by_event_abi, raw_query_client, + shared_state.clone(), ) .await } } } -async fn handle_events_for_contract_address<'a>( +async fn handle_events_for_contract_address<'a, S: Send + Sync + Clone + Debug>( conn: Arc>>, contract_address: &ContractAddress, - event_handlers_by_event_abi: &HashMap<&str, Arc>, + event_handlers_by_event_abi: &HashMap<&str, Arc>>, raw_query_client: &mut ChaindexingRepoRawQueryClient, + shared_state: Option>>, ) { let mut events_stream = ChaindexingRepo::get_events_stream( conn.clone(), @@ -58,8 +62,11 @@ async fn handle_events_for_contract_address<'a>( for event in events.clone() { let event_handler = event_handlers_by_event_abi.get(event.abi.as_str()).unwrap(); - let event_handler_context = - EventHandlerContext::new(event.clone(), &raw_query_txn_client); + let event_handler_context = EventHandlerContext::new( + event.clone(), + &raw_query_txn_client, + shared_state.clone(), + ); event_handler.handle_event(event_handler_context).await; } diff --git a/chaindexing/src/events.rs b/chaindexing/src/events.rs index 4ce3b0e..ba56e0a 100644 --- a/chaindexing/src/events.rs +++ b/chaindexing/src/events.rs @@ -123,9 +123,9 @@ impl Event { pub struct Events; impl Events { - pub fn get( + pub fn get( logs: &[Log], - contracts: &Vec, + contracts: &Vec>, blocks_by_number: &HashMap>, ) -> Vec { let events_by_topics = Contracts::group_events_by_topics(contracts); diff --git a/chaindexing/src/events_ingester.rs b/chaindexing/src/events_ingester.rs index 2e6af2b..551b1c9 100644 --- a/chaindexing/src/events_ingester.rs +++ b/chaindexing/src/events_ingester.rs @@ -94,7 +94,7 @@ impl From for EventsIngesterError { pub struct EventsIngester; impl EventsIngester { - pub fn start(config: &Config) { + pub fn start(config: &Config) { let config = config.clone(); tokio::spawn(async move { let pool = config.repo.get_pool(1).await; @@ -128,10 +128,10 @@ impl EventsIngester { }); } - pub async fn ingest<'a>( + pub async fn ingest<'a, S: Send + Sync + Clone>( conn: Arc>>, raw_query_client: &ChaindexingRepoRawQueryClient, - contracts: &Vec, + contracts: &Vec>, blocks_per_batch: u64, json_rpc: Arc, chain: &Chain, @@ -258,9 +258,9 @@ async fn backoff(retries_so_far: u32) { struct Filters; impl Filters { - fn new( + fn get( contract_addresses: &[ContractAddress], - contracts: &[Contract], + contracts: &[Contract], current_block_number: u64, blocks_per_batch: u64, execution: &Execution, diff --git a/chaindexing/src/events_ingester/ingest_events.rs b/chaindexing/src/events_ingester/ingest_events.rs index 91b5508..a4abed8 100644 --- a/chaindexing/src/events_ingester/ingest_events.rs +++ b/chaindexing/src/events_ingester/ingest_events.rs @@ -13,16 +13,16 @@ use crate::{ use super::{fetch_blocks_by_number, fetch_logs, EventsIngesterError, Filter, Filters}; -pub async fn run<'a>( +pub async fn run<'a, S: Send + Sync + Clone>( conn: &mut ChaindexingRepoConn<'a>, raw_query_client: &ChaindexingRepoRawQueryClient, contract_addresses: Vec, - contracts: &Vec, + contracts: &Vec>, json_rpc: &Arc, current_block_number: u64, blocks_per_batch: u64, ) -> Result<(), EventsIngesterError> { - let filters = Filters::new( + let filters = Filters::get( &contract_addresses, contracts, current_block_number, diff --git a/chaindexing/src/events_ingester/maybe_handle_chain_reorg.rs b/chaindexing/src/events_ingester/maybe_handle_chain_reorg.rs index b1b8ec7..0bf13fd 100644 --- a/chaindexing/src/events_ingester/maybe_handle_chain_reorg.rs +++ b/chaindexing/src/events_ingester/maybe_handle_chain_reorg.rs @@ -15,17 +15,17 @@ use crate::{ use super::{fetch_blocks_by_number, fetch_logs, EventsIngesterError, Filter, Filters}; -pub async fn run<'a>( +pub async fn run<'a, S: Send + Sync + Clone>( conn: &mut ChaindexingRepoConn<'a>, contract_addresses: Vec, - contracts: &Vec, + contracts: &Vec>, json_rpc: &Arc, chain: &Chain, current_block_number: u64, blocks_per_batch: u64, min_confirmation_count: &MinConfirmationCount, ) -> Result<(), EventsIngesterError> { - let filters = Filters::new( + let filters = Filters::get( &contract_addresses, contracts, current_block_number, @@ -65,10 +65,10 @@ async fn get_already_ingested_events<'a>( already_ingested_events } -async fn get_json_rpc_events( +async fn get_json_rpc_events( filters: &Vec, json_rpc: &Arc, - contracts: &Vec, + contracts: &Vec>, ) -> Vec { let logs = fetch_logs(filters, json_rpc).await; let blocks_by_number = fetch_blocks_by_number(&logs, json_rpc).await; diff --git a/chaindexing/src/lib.rs b/chaindexing/src/lib.rs index 3b116db..5bb4f3d 100644 --- a/chaindexing/src/lib.rs +++ b/chaindexing/src/lib.rs @@ -10,6 +10,8 @@ mod events_ingester; mod repos; mod reset_counts; +use std::fmt::Debug; + pub use chain_reorg::{MinConfirmationCount, ReorgedBlock, ReorgedBlocks, UnsavedReorgedBlock}; pub use chains::Chains; pub use config::Config; @@ -17,7 +19,9 @@ use config::ConfigError; pub use contract_states::{ContractState, ContractStateMigrations, ContractStates}; pub use contracts::{Contract, ContractAddress, ContractEvent, Contracts, UnsavedContractAddress}; pub use ethers::prelude::Chain; -pub use event_handlers::{EventHandler, EventHandlerContext as EventContext, EventHandlers}; +pub use event_handlers::{ + EventHandler, EventHandlerContext as EventContext, EventHandlers, NoSharedState, +}; pub use events::{Event, Events}; pub use events_ingester::{EventsIngester, EventsIngesterJsonRpc}; pub use repos::*; @@ -69,7 +73,9 @@ impl std::fmt::Debug for ChaindexingError { pub struct Chaindexing; impl Chaindexing { - pub async fn index_states(config: &Config) -> Result<(), ChaindexingError> { + pub async fn index_states( + config: &Config, + ) -> Result<(), ChaindexingError> { config.validate()?; Self::setup(config).await?; EventsIngester::start(config); @@ -77,7 +83,7 @@ impl Chaindexing { Ok(()) } - pub async fn setup(config: &Config) -> Result<(), ChaindexingError> { + pub async fn setup(config: &Config) -> Result<(), ChaindexingError> { let Config { repo, contracts, @@ -100,9 +106,9 @@ impl Chaindexing { Ok(()) } - pub async fn maybe_reset<'a>( + pub async fn maybe_reset<'a, S: Send + Sync + Clone>( reset_count: &u8, - contracts: &[Contract], + contracts: &[Contract], client: &ChaindexingRepoRawQueryClient, conn: &mut ChaindexingRepoConn<'a>, ) { @@ -133,17 +139,17 @@ impl Chaindexing { ChaindexingRepo::migrate(client, ChaindexingRepo::get_reset_internal_migrations()).await; } - pub async fn run_migrations_for_contract_states( + pub async fn run_migrations_for_contract_states( client: &ChaindexingRepoRawQueryClient, - contracts: &[Contract], + contracts: &[Contract], ) { for state_migration in Contracts::get_state_migrations(contracts) { ChaindexingRepo::migrate(client, state_migration.get_migrations()).await; } } - pub async fn reset_migrations_for_contract_states( + pub async fn reset_migrations_for_contract_states( client: &ChaindexingRepoRawQueryClient, - contracts: &[Contract], + contracts: &[Contract], ) { for state_migration in Contracts::get_state_migrations(contracts) { ChaindexingRepo::migrate(client, state_migration.get_reset_migrations()).await;