diff --git a/chaindexing-tests/src/factory/contracts.rs b/chaindexing-tests/src/factory/contracts.rs index 9a58e3c..a123ce8 100644 --- a/chaindexing-tests/src/factory/contracts.rs +++ b/chaindexing-tests/src/factory/contracts.rs @@ -1,4 +1,4 @@ -use chaindexing::{Chain, Contract}; +use chaindexing::{Chain, Contract, NoSharedState}; use super::{ApprovalForAllTestEventHandler, TransferTestEventHandler}; @@ -10,7 +10,7 @@ pub const APPROCAL_EVENT_ABI: &str = pub const BAYC_CONTRACT_ADDRESS: &str = "0xBC4CA0EdA7647A8aB7C2061c2E118A18a936f13D"; pub const BAYC_CONTRACT_START_BLOCK_NUMBER: u32 = 17773490; -pub fn bayc_contract() -> Contract { +pub fn bayc_contract() -> Contract { Contract::new("BoredApeYachtClub") .add_event(TRANSFER_EVENT_ABI, TransferTestEventHandler) .add_event(APPROCAL_EVENT_ABI, ApprovalForAllTestEventHandler) diff --git a/chaindexing-tests/src/factory/event_handlers.rs b/chaindexing-tests/src/factory/event_handlers.rs index 92a5c29..4876e21 100644 --- a/chaindexing-tests/src/factory/event_handlers.rs +++ b/chaindexing-tests/src/factory/event_handlers.rs @@ -1,4 +1,4 @@ -use chaindexing::{EventContext, EventHandler}; +use chaindexing::{EventContext, EventHandler, NoSharedState}; #[derive(Clone, Debug)] pub struct NftState; @@ -7,12 +7,16 @@ pub struct TransferTestEventHandler; #[async_trait::async_trait] impl EventHandler for TransferTestEventHandler { - async fn handle_event<'a>(&self, _event_context: EventContext<'a>) {} + type SharedState = NoSharedState; + + async fn handle_event<'a>(&self, _event_context: EventContext<'a, Self::SharedState>) {} } pub struct ApprovalForAllTestEventHandler; #[async_trait::async_trait] impl EventHandler for ApprovalForAllTestEventHandler { - async fn handle_event<'a>(&self, _event_context: EventContext<'a>) {} + type SharedState = NoSharedState; + + async fn handle_event<'a>(&self, _event_context: EventContext<'a, Self::SharedState>) {} } diff --git a/chaindexing-tests/src/factory/events.rs b/chaindexing-tests/src/factory/events.rs index 4fbe78b..a8cdfb4 100644 --- a/chaindexing-tests/src/factory/events.rs +++ b/chaindexing-tests/src/factory/events.rs @@ -1,11 +1,11 @@ use std::collections::HashMap; -use chaindexing::{Contract, Event, Events}; +use chaindexing::{Contract, Event, Events, NoSharedState}; use ethers::types::Block; use super::{transfer_log, BAYC_CONTRACT_ADDRESS}; -pub fn transfer_event_with_contract(contract: Contract) -> Event { +pub fn transfer_event_with_contract(contract: Contract) -> Event { let contract_address = BAYC_CONTRACT_ADDRESS; let transfer_log = transfer_log(contract_address); let blocks_by_number = HashMap::from([( diff --git a/chaindexing-tests/src/factory/json_rpcs.rs b/chaindexing-tests/src/factory/json_rpcs.rs index c5681d8..dff2c20 100644 --- a/chaindexing-tests/src/factory/json_rpcs.rs +++ b/chaindexing-tests/src/factory/json_rpcs.rs @@ -70,10 +70,10 @@ macro_rules! json_rpc_with_logs { json_rpc_with_logs!($contract_address, 17774490) }}; ($contract_address:expr, $current_block_number:expr) => {{ - use crate::factory::transfer_log; use chaindexing::EventsIngesterJsonRpc; use ethers::providers::ProviderError; use ethers::types::{Block, Filter, Log, TxHash, U64}; + use $crate::factory::transfer_log; #[derive(Clone)] struct JsonRpc; diff --git a/chaindexing-tests/src/tests/contract_states.rs b/chaindexing-tests/src/tests/contract_states.rs index f124b26..f69b575 100644 --- a/chaindexing-tests/src/tests/contract_states.rs +++ b/chaindexing-tests/src/tests/contract_states.rs @@ -1,6 +1,6 @@ #[cfg(test)] mod tests { - use chaindexing::{ChaindexingRepo, EventContext, HasRawQueryClient}; + use chaindexing::{ChaindexingRepo, EventContext, HasRawQueryClient, NoSharedState}; use super::*; use crate::factory::{bayc_contract, transfer_event_with_contract}; @@ -12,9 +12,10 @@ mod tests { let mut raw_query_client = test_runner::new_repo().get_raw_query_client().await; let raw_query_txn_client = ChaindexingRepo::get_raw_query_txn_client(&mut raw_query_client).await; - let event_context = EventContext::new( + let event_context: EventContext<'_, NoSharedState> = EventContext::new( transfer_event_with_contract(bayc_contract), &raw_query_txn_client, + None, ); let new_state = NftState { token_id: 2 }; @@ -37,9 +38,10 @@ mod tests { let mut raw_query_client = test_runner::new_repo().get_raw_query_client().await; let raw_query_txn_client = ChaindexingRepo::get_raw_query_txn_client(&mut raw_query_client).await; - let event_context = EventContext::new( + let event_context: EventContext<'_, NoSharedState> = EventContext::new( transfer_event_with_contract(bayc_contract), &raw_query_txn_client, + None, ); let new_state = NftState { token_id: 1 }; @@ -68,9 +70,10 @@ mod tests { let mut raw_query_client = test_runner::new_repo().get_raw_query_client().await; let raw_query_txn_client = ChaindexingRepo::get_raw_query_txn_client(&mut raw_query_client).await; - let event_context = EventContext::new( + let event_context: EventContext<'_, NoSharedState> = EventContext::new( transfer_event_with_contract(bayc_contract), &raw_query_txn_client, + None, ); let new_state = NftState { token_id: 9 }; diff --git a/chaindexing-tests/src/tests/events_ingester.rs b/chaindexing-tests/src/tests/events_ingester.rs index 45f8083..0f25ffd 100644 --- a/chaindexing-tests/src/tests/events_ingester.rs +++ b/chaindexing-tests/src/tests/events_ingester.rs @@ -10,7 +10,8 @@ mod tests { json_rpc_with_empty_logs, json_rpc_with_filter_stubber, json_rpc_with_logs, test_runner, }; use chaindexing::{ - Chain, ChaindexingRepo, EventsIngester, HasRawQueryClient, MinConfirmationCount, Repo, + Chain, ChaindexingRepo, Contract, EventsIngester, HasRawQueryClient, MinConfirmationCount, + NoSharedState, Repo, }; #[tokio::test] @@ -151,7 +152,7 @@ mod tests { let pool = test_runner::get_pool().await; test_runner::run_test(&pool, |conn| async move { - let contracts = vec![]; + let contracts: Vec> = vec![]; let json_rpc = Arc::new(empty_json_rpc()); let blocks_per_batch = 10; let conn = Arc::new(Mutex::new(conn)); diff --git a/chaindexing/src/config.rs b/chaindexing/src/config.rs index 23b3cb3..0c8dbde 100644 --- a/chaindexing/src/config.rs +++ b/chaindexing/src/config.rs @@ -1,6 +1,7 @@ -use std::collections::HashMap; +use std::{collections::HashMap, sync::Arc}; use ethers::types::Chain; +use tokio::sync::Mutex; use crate::{ChaindexingRepo, Chains, Contract, MinConfirmationCount}; @@ -22,19 +23,20 @@ impl std::fmt::Debug for ConfigError { } } #[derive(Clone)] -pub struct Config { +pub struct Config { pub chains: Chains, pub repo: ChaindexingRepo, - pub contracts: Vec, + pub contracts: Vec>, pub min_confirmation_count: MinConfirmationCount, pub blocks_per_batch: u64, pub handler_rate_ms: u64, pub ingestion_rate_ms: u64, pub reset_count: u8, + pub shared_state: Option>>, } -impl Config { - pub fn new(repo: ChaindexingRepo) -> Self { +impl Config { + pub fn new(repo: ChaindexingRepo, initial_state: Option) -> Self { Self { repo, chains: HashMap::new(), @@ -44,6 +46,7 @@ impl Config { handler_rate_ms: 4000, ingestion_rate_ms: 4000, reset_count: 0, + shared_state: initial_state.map(|s| Arc::new(Mutex::new(s))), } } @@ -53,7 +56,7 @@ impl Config { self } - pub fn add_contract(mut self, contract: Contract) -> Self { + pub fn add_contract(mut self, contract: Contract) -> Self { self.contracts.push(contract); self diff --git a/chaindexing/src/contract_states.rs b/chaindexing/src/contract_states.rs index cd19bbf..e99b21a 100644 --- a/chaindexing/src/contract_states.rs +++ b/chaindexing/src/contract_states.rs @@ -75,7 +75,7 @@ pub trait ContractState: StateView::get_complete(&view, table_name, client).await } - async fn create<'a>(&self, context: &EventHandlerContext) { + async fn create<'a, S: Send + Sync + Clone>(&self, context: &EventHandlerContext) { let event = &context.event; let client = context.get_raw_query_client(); @@ -87,7 +87,11 @@ pub trait ContractState: StateView::refresh(&latest_state_version, table_name, client).await; } - async fn update<'a>(&self, updates: HashMap, context: &EventHandlerContext) { + async fn update<'a, S: Send + Sync + Clone>( + &self, + updates: HashMap, + context: &EventHandlerContext, + ) { let event = &context.event; let client = context.get_raw_query_client(); @@ -99,7 +103,7 @@ pub trait ContractState: StateView::refresh(&latest_state_version, table_name, client).await; } - async fn delete<'a>(&self, context: &EventHandlerContext) { + async fn delete<'a, S: Send + Sync + Clone>(&self, context: &EventHandlerContext) { let event = &context.event; let client = context.get_raw_query_client(); @@ -111,18 +115,18 @@ pub trait ContractState: StateView::refresh(&latest_state_version, table_name, client).await; } - async fn read_one<'a>( + async fn read_one<'a, S: Send + Sync + Clone>( filters: HashMap, - context: &EventHandlerContext, + context: &EventHandlerContext, ) -> Option { let states = Self::read_many(filters, context).await; states.first().cloned() } - async fn read_many<'a>( + async fn read_many<'a, S: Send + Sync + Clone>( filters: HashMap, - context: &EventHandlerContext, + context: &EventHandlerContext, ) -> Vec { let client = context.get_raw_query_client(); diff --git a/chaindexing/src/contracts.rs b/chaindexing/src/contracts.rs index fe29505..39b775d 100644 --- a/chaindexing/src/contracts.rs +++ b/chaindexing/src/contracts.rs @@ -30,14 +30,14 @@ impl ContractEvent { type EventAbi = &'static str; #[derive(Clone)] -pub struct Contract { +pub struct Contract { pub addresses: Vec, pub name: String, - pub event_handlers: HashMap>, + pub event_handlers: HashMap>>, pub state_migrations: Vec>, } -impl Contract { +impl Contract { pub fn new(name: &str) -> Self { Self { addresses: vec![], @@ -47,7 +47,7 @@ impl Contract { } } - pub fn add_address(&self, address: &str, chain: &Chain, start_block_number: i64) -> Self { + pub fn add_address(&mut self, address: &str, chain: &Chain, start_block_number: i64) -> Self { let mut addresses = self.addresses.clone(); addresses.push(UnsavedContractAddress::new( @@ -66,7 +66,7 @@ impl Contract { pub fn add_event( mut self, event_abi: EventAbi, - event_handler: impl EventHandler + 'static, + event_handler: impl EventHandler + 'static, ) -> Self { self.event_handlers.insert(event_abi, Arc::new(event_handler)); @@ -101,13 +101,15 @@ impl Contract { pub struct Contracts; impl Contracts { - pub fn get_state_migrations(contracts: &[Contract]) -> Vec> { + pub fn get_state_migrations( + contracts: &[Contract], + ) -> Vec> { contracts.iter().flat_map(|c| c.state_migrations.clone()).collect() } - pub fn get_all_event_handlers_by_event_abi( - contracts: &[Contract], - ) -> HashMap> { + pub fn get_all_event_handlers_by_event_abi( + contracts: &[Contract], + ) -> HashMap>> { contracts.iter().fold( HashMap::new(), |mut event_handlers_by_event_abi, contract| { @@ -120,8 +122,8 @@ impl Contracts { ) } - pub fn group_event_topics_by_names( - contracts: &[Contract], + pub fn group_event_topics_by_names( + contracts: &[Contract], ) -> HashMap> { contracts.iter().fold(HashMap::new(), |mut topics_by_contract_name, contract| { topics_by_contract_name.insert(contract.name.clone(), contract.get_event_topics()); @@ -130,8 +132,8 @@ impl Contracts { }) } - pub fn group_events_by_topics( - contracts: &[Contract], + pub fn group_events_by_topics( + contracts: &[Contract], ) -> HashMap { contracts .iter() @@ -140,8 +142,8 @@ impl Contracts { .collect() } - pub fn get_all_contract_addresses_grouped_by_address( - contracts: &[Contract], + pub fn get_all_contract_addresses_grouped_by_address( + contracts: &[Contract], ) -> HashMap { contracts.iter().fold(HashMap::new(), |mut contracts_by_addresses, contract| { contract.addresses.iter().for_each( diff --git a/chaindexing/src/event_handlers.rs b/chaindexing/src/event_handlers.rs index 01e455e..26a97f9 100644 --- a/chaindexing/src/event_handlers.rs +++ b/chaindexing/src/event_handlers.rs @@ -1,3 +1,4 @@ +use std::fmt::Debug; use std::{sync::Arc, time::Duration}; mod handle_events; @@ -9,34 +10,51 @@ use crate::{contracts::Contracts, events::Event, ChaindexingRepo, Config, Repo}; use crate::{ChaindexingRepoRawQueryTxnClient, HasRawQueryClient}; #[derive(Clone)] -pub struct EventHandlerContext<'a> { +pub struct EventHandlerContext<'a, SharedState: Sync + Send + Clone> { pub event: Event, raw_query_client: &'a ChaindexingRepoRawQueryTxnClient<'a>, + shared_state: Option>>, } -impl<'a> EventHandlerContext<'a> { - pub fn new(event: Event, client: &'a ChaindexingRepoRawQueryTxnClient<'a>) -> Self { +impl<'a, SharedState: Sync + Send + Clone> EventHandlerContext<'a, SharedState> { + pub fn new( + event: Event, + client: &'a ChaindexingRepoRawQueryTxnClient<'a>, + shared_state: Option>>, + ) -> Self { Self { event, raw_query_client: client, + shared_state, } } + pub async fn get_shared_state(&self) -> SharedState { + let shared_state = self.shared_state.clone().unwrap(); + let shared_state = shared_state.lock().await; + shared_state.clone() + } + pub(super) fn get_raw_query_client(&self) -> &'a ChaindexingRepoRawQueryTxnClient<'a> { self.raw_query_client } } +#[derive(Clone, Debug)] +pub struct NoSharedState; + #[async_trait::async_trait] pub trait EventHandler: Send + Sync { - async fn handle_event<'a>(&self, event_context: EventHandlerContext<'a>); + type SharedState: Send + Sync + Clone + Debug; + + async fn handle_event<'a>(&self, event_context: EventHandlerContext<'a, Self::SharedState>); } // TODO: Use just raw query client through for mutations pub struct EventHandlers; impl EventHandlers { - pub fn start(config: &Config) { + pub fn start(config: &Config) { let config = config.clone(); tokio::spawn(async move { let pool = config.repo.get_pool(1).await; @@ -56,6 +74,7 @@ impl EventHandlers { conn.clone(), &event_handlers_by_event_abi, &mut raw_query_client, + config.shared_state.clone(), ) .await; diff --git a/chaindexing/src/event_handlers/handle_events.rs b/chaindexing/src/event_handlers/handle_events.rs index 7ea2147..5c3b5a9 100644 --- a/chaindexing/src/event_handlers/handle_events.rs +++ b/chaindexing/src/event_handlers/handle_events.rs @@ -1,3 +1,4 @@ +use std::fmt::Debug; use std::{collections::HashMap, sync::Arc}; use futures_util::StreamExt; @@ -11,10 +12,11 @@ use crate::{ use super::{EventHandler, EventHandlerContext}; -pub async fn run<'a>( +pub async fn run<'a, S: Send + Sync + Clone + Debug>( conn: Arc>>, - event_handlers_by_event_abi: &HashMap<&str, Arc>, + event_handlers_by_event_abi: &HashMap<&str, Arc>>, raw_query_client: &mut ChaindexingRepoRawQueryClient, + shared_state: Option>>, ) { let mut contract_addresses_stream = ChaindexingRepo::get_contract_addresses_stream(conn.clone()); @@ -26,17 +28,19 @@ pub async fn run<'a>( &contract_address, event_handlers_by_event_abi, raw_query_client, + shared_state.clone(), ) .await } } } -async fn handle_events_for_contract_address<'a>( +async fn handle_events_for_contract_address<'a, S: Send + Sync + Clone + Debug>( conn: Arc>>, contract_address: &ContractAddress, - event_handlers_by_event_abi: &HashMap<&str, Arc>, + event_handlers_by_event_abi: &HashMap<&str, Arc>>, raw_query_client: &mut ChaindexingRepoRawQueryClient, + shared_state: Option>>, ) { let mut events_stream = ChaindexingRepo::get_events_stream( conn.clone(), @@ -58,8 +62,11 @@ async fn handle_events_for_contract_address<'a>( for event in events.clone() { let event_handler = event_handlers_by_event_abi.get(event.abi.as_str()).unwrap(); - let event_handler_context = - EventHandlerContext::new(event.clone(), &raw_query_txn_client); + let event_handler_context = EventHandlerContext::new( + event.clone(), + &raw_query_txn_client, + shared_state.clone(), + ); event_handler.handle_event(event_handler_context).await; } diff --git a/chaindexing/src/events.rs b/chaindexing/src/events.rs index 4ce3b0e..ba56e0a 100644 --- a/chaindexing/src/events.rs +++ b/chaindexing/src/events.rs @@ -123,9 +123,9 @@ impl Event { pub struct Events; impl Events { - pub fn get( + pub fn get( logs: &[Log], - contracts: &Vec, + contracts: &Vec>, blocks_by_number: &HashMap>, ) -> Vec { let events_by_topics = Contracts::group_events_by_topics(contracts); diff --git a/chaindexing/src/events_ingester.rs b/chaindexing/src/events_ingester.rs index 2e6af2b..551b1c9 100644 --- a/chaindexing/src/events_ingester.rs +++ b/chaindexing/src/events_ingester.rs @@ -94,7 +94,7 @@ impl From for EventsIngesterError { pub struct EventsIngester; impl EventsIngester { - pub fn start(config: &Config) { + pub fn start(config: &Config) { let config = config.clone(); tokio::spawn(async move { let pool = config.repo.get_pool(1).await; @@ -128,10 +128,10 @@ impl EventsIngester { }); } - pub async fn ingest<'a>( + pub async fn ingest<'a, S: Send + Sync + Clone>( conn: Arc>>, raw_query_client: &ChaindexingRepoRawQueryClient, - contracts: &Vec, + contracts: &Vec>, blocks_per_batch: u64, json_rpc: Arc, chain: &Chain, @@ -258,9 +258,9 @@ async fn backoff(retries_so_far: u32) { struct Filters; impl Filters { - fn new( + fn get( contract_addresses: &[ContractAddress], - contracts: &[Contract], + contracts: &[Contract], current_block_number: u64, blocks_per_batch: u64, execution: &Execution, diff --git a/chaindexing/src/events_ingester/ingest_events.rs b/chaindexing/src/events_ingester/ingest_events.rs index 91b5508..a4abed8 100644 --- a/chaindexing/src/events_ingester/ingest_events.rs +++ b/chaindexing/src/events_ingester/ingest_events.rs @@ -13,16 +13,16 @@ use crate::{ use super::{fetch_blocks_by_number, fetch_logs, EventsIngesterError, Filter, Filters}; -pub async fn run<'a>( +pub async fn run<'a, S: Send + Sync + Clone>( conn: &mut ChaindexingRepoConn<'a>, raw_query_client: &ChaindexingRepoRawQueryClient, contract_addresses: Vec, - contracts: &Vec, + contracts: &Vec>, json_rpc: &Arc, current_block_number: u64, blocks_per_batch: u64, ) -> Result<(), EventsIngesterError> { - let filters = Filters::new( + let filters = Filters::get( &contract_addresses, contracts, current_block_number, diff --git a/chaindexing/src/events_ingester/maybe_handle_chain_reorg.rs b/chaindexing/src/events_ingester/maybe_handle_chain_reorg.rs index b1b8ec7..0bf13fd 100644 --- a/chaindexing/src/events_ingester/maybe_handle_chain_reorg.rs +++ b/chaindexing/src/events_ingester/maybe_handle_chain_reorg.rs @@ -15,17 +15,17 @@ use crate::{ use super::{fetch_blocks_by_number, fetch_logs, EventsIngesterError, Filter, Filters}; -pub async fn run<'a>( +pub async fn run<'a, S: Send + Sync + Clone>( conn: &mut ChaindexingRepoConn<'a>, contract_addresses: Vec, - contracts: &Vec, + contracts: &Vec>, json_rpc: &Arc, chain: &Chain, current_block_number: u64, blocks_per_batch: u64, min_confirmation_count: &MinConfirmationCount, ) -> Result<(), EventsIngesterError> { - let filters = Filters::new( + let filters = Filters::get( &contract_addresses, contracts, current_block_number, @@ -65,10 +65,10 @@ async fn get_already_ingested_events<'a>( already_ingested_events } -async fn get_json_rpc_events( +async fn get_json_rpc_events( filters: &Vec, json_rpc: &Arc, - contracts: &Vec, + contracts: &Vec>, ) -> Vec { let logs = fetch_logs(filters, json_rpc).await; let blocks_by_number = fetch_blocks_by_number(&logs, json_rpc).await; diff --git a/chaindexing/src/lib.rs b/chaindexing/src/lib.rs index 3b116db..5bb4f3d 100644 --- a/chaindexing/src/lib.rs +++ b/chaindexing/src/lib.rs @@ -10,6 +10,8 @@ mod events_ingester; mod repos; mod reset_counts; +use std::fmt::Debug; + pub use chain_reorg::{MinConfirmationCount, ReorgedBlock, ReorgedBlocks, UnsavedReorgedBlock}; pub use chains::Chains; pub use config::Config; @@ -17,7 +19,9 @@ use config::ConfigError; pub use contract_states::{ContractState, ContractStateMigrations, ContractStates}; pub use contracts::{Contract, ContractAddress, ContractEvent, Contracts, UnsavedContractAddress}; pub use ethers::prelude::Chain; -pub use event_handlers::{EventHandler, EventHandlerContext as EventContext, EventHandlers}; +pub use event_handlers::{ + EventHandler, EventHandlerContext as EventContext, EventHandlers, NoSharedState, +}; pub use events::{Event, Events}; pub use events_ingester::{EventsIngester, EventsIngesterJsonRpc}; pub use repos::*; @@ -69,7 +73,9 @@ impl std::fmt::Debug for ChaindexingError { pub struct Chaindexing; impl Chaindexing { - pub async fn index_states(config: &Config) -> Result<(), ChaindexingError> { + pub async fn index_states( + config: &Config, + ) -> Result<(), ChaindexingError> { config.validate()?; Self::setup(config).await?; EventsIngester::start(config); @@ -77,7 +83,7 @@ impl Chaindexing { Ok(()) } - pub async fn setup(config: &Config) -> Result<(), ChaindexingError> { + pub async fn setup(config: &Config) -> Result<(), ChaindexingError> { let Config { repo, contracts, @@ -100,9 +106,9 @@ impl Chaindexing { Ok(()) } - pub async fn maybe_reset<'a>( + pub async fn maybe_reset<'a, S: Send + Sync + Clone>( reset_count: &u8, - contracts: &[Contract], + contracts: &[Contract], client: &ChaindexingRepoRawQueryClient, conn: &mut ChaindexingRepoConn<'a>, ) { @@ -133,17 +139,17 @@ impl Chaindexing { ChaindexingRepo::migrate(client, ChaindexingRepo::get_reset_internal_migrations()).await; } - pub async fn run_migrations_for_contract_states( + pub async fn run_migrations_for_contract_states( client: &ChaindexingRepoRawQueryClient, - contracts: &[Contract], + contracts: &[Contract], ) { for state_migration in Contracts::get_state_migrations(contracts) { ChaindexingRepo::migrate(client, state_migration.get_migrations()).await; } } - pub async fn reset_migrations_for_contract_states( + pub async fn reset_migrations_for_contract_states( client: &ChaindexingRepoRawQueryClient, - contracts: &[Contract], + contracts: &[Contract], ) { for state_migration in Contracts::get_state_migrations(contracts) { ChaindexingRepo::migrate(client, state_migration.get_reset_migrations()).await;