diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f680331..39e70a8 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -52,15 +52,16 @@ jobs: - name: Run Tests run: cargo test - # clippy: - # name: Clippy - # runs-on: ubuntu-latest - # if: github.event_name != 'pull_request' - # timeout-minutes: 45 - # steps: - # - uses: actions/checkout@v3 - # - uses: dtolnay/rust-toolchain@clippy - # - run: cargo clippy --tests -- -Dclippy::all + clippy: + name: Clippy + runs-on: ubuntu-latest + if: github.event_name != 'pull_request' + timeout-minutes: 45 + steps: + - uses: actions/checkout@v3 + - uses: dtolnay/rust-toolchain@clippy + # - run: cargo clippy --tests -- -Dclippy::all + - run: cargo clippy fmt: name: fmt diff --git a/chaindexing-tests/src/factory/events.rs b/chaindexing-tests/src/factory/events.rs index 02c52f3..4fbe78b 100644 --- a/chaindexing-tests/src/factory/events.rs +++ b/chaindexing-tests/src/factory/events.rs @@ -9,12 +9,12 @@ pub fn transfer_event_with_contract(contract: Contract) -> Event { let contract_address = BAYC_CONTRACT_ADDRESS; let transfer_log = transfer_log(contract_address); let blocks_by_number = HashMap::from([( - transfer_log.block_number.clone().unwrap(), + transfer_log.block_number.unwrap(), Block { ..Default::default() }, )]); - Events::new(&vec![transfer_log], &vec![contract], &blocks_by_number) + Events::get(&vec![transfer_log], &vec![contract], &blocks_by_number) .first() .cloned() .unwrap() diff --git a/chaindexing-tests/src/factory/json_rpcs.rs b/chaindexing-tests/src/factory/json_rpcs.rs index c206dd3..c5681d8 100644 --- a/chaindexing-tests/src/factory/json_rpcs.rs +++ b/chaindexing-tests/src/factory/json_rpcs.rs @@ -25,7 +25,7 @@ pub fn empty_json_rpc() -> impl EventsIngesterJsonRpc { } } - return JsonRpc; + JsonRpc } use ethers::types::{Bytes, H160, H256}; @@ -65,7 +65,7 @@ fn h256(str: &str) -> H256 { #[macro_export] macro_rules! json_rpc_with_logs { ($contract_address:expr) => {{ - use crate::json_rpc_with_logs; + use $crate::json_rpc_with_logs; json_rpc_with_logs!($contract_address, 17774490) }}; diff --git a/chaindexing-tests/src/test_runner.rs b/chaindexing-tests/src/test_runner.rs index cea92ac..9386f2b 100644 --- a/chaindexing-tests/src/test_runner.rs +++ b/chaindexing-tests/src/test_runner.rs @@ -16,7 +16,7 @@ where TestFn: Fn(ChaindexingRepoConn<'a>) -> Fut, Fut: Future, { - let mut conn = ChaindexingRepo::get_conn(&pool).await; + let mut conn = ChaindexingRepo::get_conn(pool).await; if should_setup_test_db() { db::setup(); diff --git a/chaindexing-tests/src/tests/contract_states.rs b/chaindexing-tests/src/tests/contract_states.rs index e67c7b2..f124b26 100644 --- a/chaindexing-tests/src/tests/contract_states.rs +++ b/chaindexing-tests/src/tests/contract_states.rs @@ -114,5 +114,5 @@ impl ContractStateMigrations for NftStateMigrations { pub async fn setup() { let bayc_contract = bayc_contract().add_state_migrations(NftStateMigrations); let raw_query_client = test_runner::new_repo().get_raw_query_client().await; - Chaindexing::run_migrations_for_contract_states(&raw_query_client, &vec![bayc_contract]).await; + Chaindexing::run_migrations_for_contract_states(&raw_query_client, &[bayc_contract]).await; } diff --git a/chaindexing/src/chain_reorg.rs b/chaindexing/src/chain_reorg.rs index dd8f8d3..8fe0bde 100644 --- a/chaindexing/src/chain_reorg.rs +++ b/chaindexing/src/chain_reorg.rs @@ -63,16 +63,16 @@ impl UnsavedReorgedBlock { pub struct ReorgedBlocks; impl ReorgedBlocks { - pub fn only_earliest_per_chain(reorged_blocks: &Vec) -> Vec { + pub fn only_earliest_per_chain(reorged_blocks: &[ReorgedBlock]) -> Vec { reorged_blocks - .into_iter() + .iter() .fold( HashMap::::new(), |mut reorged_blocks_by_chain, reorged_block| { let ReorgedBlock { chain_id, .. } = reorged_block; if let Some(earliest_reorged_block) = reorged_blocks_by_chain.get(chain_id) { - if reorged_block.block_number < (*earliest_reorged_block).block_number { + if reorged_block.block_number < earliest_reorged_block.block_number { reorged_blocks_by_chain.insert(*chain_id, reorged_block.clone()); } } else { @@ -88,7 +88,7 @@ impl ReorgedBlocks { .collect() } - pub fn get_ids(reorged_blocks: &Vec) -> Vec { + pub fn get_ids(reorged_blocks: &[ReorgedBlock]) -> Vec { reorged_blocks.iter().map(|r| r.id).collect() } } diff --git a/chaindexing/src/contract_states.rs b/chaindexing/src/contract_states.rs index 871191d..cd19bbf 100644 --- a/chaindexing/src/contract_states.rs +++ b/chaindexing/src/contract_states.rs @@ -18,12 +18,12 @@ pub struct ContractStates; impl ContractStates { pub async fn backtrack_states<'a>( - state_migrations: &Vec>, + state_migrations: &[Arc], chain_id: i32, block_number: i64, client: &ChaindexingRepoRawQueryTxnClient<'a>, ) { - let table_names = Self::get_all_table_names(&state_migrations); + let table_names = Self::get_all_table_names(state_migrations); for table_name in table_names { let state_versions = @@ -38,7 +38,7 @@ impl ContractStates { } pub fn get_all_table_names( - state_migrations: &Vec>, + state_migrations: &[Arc], ) -> Vec { state_migrations .iter() @@ -92,7 +92,7 @@ pub trait ContractState: let client = context.get_raw_query_client(); let table_name = Self::table_name(); - let state_view = self.to_complete_view(&table_name, &client).await; + let state_view = self.to_complete_view(table_name, client).await; let latest_state_version = StateVersion::update(&state_view, &updates, table_name, event, client).await; @@ -104,7 +104,7 @@ pub trait ContractState: let client = context.get_raw_query_client(); let table_name = Self::table_name(); - let state_view = self.to_complete_view(&table_name, &client).await; + let state_view = self.to_complete_view(table_name, client).await; let latest_state_version = StateVersion::delete(&state_view, table_name, event, client).await; @@ -137,7 +137,7 @@ pub trait ContractState: } pub fn to_columns_and_values(state: &HashMap) -> (Vec, Vec) { - state.into_iter().fold( + state.iter().fold( (vec![], vec![]), |(mut columns, mut values), (column, value)| { columns.push(column.to_string()); @@ -166,7 +166,7 @@ pub fn serde_map_to_string_map( if value.is_object() { map.insert(key.to_owned(), value.to_string()); } else { - map.insert(key.to_owned(), value.to_string().replace("\"", "")); + map.insert(key.to_owned(), value.to_string().replace('\"', "")); } } diff --git a/chaindexing/src/contract_states/migrations.rs b/chaindexing/src/contract_states/migrations.rs index 25150a7..e73c364 100644 --- a/chaindexing/src/contract_states/migrations.rs +++ b/chaindexing/src/contract_states/migrations.rs @@ -11,7 +11,7 @@ pub trait ContractStateMigrations: Send + Sync { fn get_table_names(&self) -> Vec { self.migrations().iter().fold(vec![], |mut table_names, migration| { if migration.starts_with("CREATE TABLE IF NOT EXISTS") { - let table_name = extract_table_name(&migration); + let table_name = extract_table_name(migration); table_names.push(table_name) } @@ -27,16 +27,14 @@ pub trait ContractStateMigrations: Send + Sync { if user_migration.starts_with("CREATE TABLE IF NOT EXISTS") { let create_state_views_table_migration = - append_migration(&user_migration, &get_remaining_state_views_migration()); + append_migration(user_migration, &get_remaining_state_views_migration()); let create_state_views_table_migration = DefaultMigration::remove_repeating_occurrences( &create_state_views_table_migration, ); - let create_state_versions_table_migration = append_migration( - &user_migration, - &get_remaining_state_versions_migration(), - ); + let create_state_versions_table_migration = + append_migration(user_migration, &get_remaining_state_versions_migration()); let create_state_versions_table_migration = set_state_versions_table_name(&create_state_versions_table_migration); let create_state_versions_table_migration = @@ -77,7 +75,7 @@ pub trait ContractStateMigrations: Send + Sync { .iter() .filter(|m| m.starts_with("CREATE TABLE IF NOT EXISTS")) .map(|create_migration| { - let table_name = extract_table_name(&create_migration); + let table_name = extract_table_name(create_migration); format!("DROP TABLE IF EXISTS {table_name}") }) @@ -88,7 +86,7 @@ pub trait ContractStateMigrations: Send + Sync { fn extract_table_name(migration: &str) -> String { migration .replace("CREATE TABLE IF NOT EXISTS", "") - .split("(") + .split('(') .collect::>() .first() .unwrap() @@ -98,12 +96,12 @@ fn extract_table_name(migration: &str) -> String { fn extract_table_fields(migration: &str, remove_json_fields: bool) -> Vec { migration - .replace(")", "") - .split("(") + .replace(')', "") + .split('(') .collect::>() .last() .unwrap() - .split(",") + .split(',') .filter(|field| remove_json_fields && !(field.contains("JSON") || field.contains("JSONB"))) .map(|field| { field @@ -140,8 +138,8 @@ fn validate_migration(migration: &str) { } fn append_migration(migration: &str, migration_to_append: &str) -> String { - let mut migration = migration.replace("\n", ""); - migration.push_str(","); + let mut migration = migration.replace('\n', ""); + migration.push(','); migration.push_str(migration_to_append); migration .split_ascii_whitespace() @@ -176,12 +174,12 @@ fn set_state_versions_table_name(migration: &str) -> String { } fn extract_migration_columns(migration: &str) -> Vec { - let migration_tokens = migration.split("("); + let migration_tokens = migration.split('('); let migration = migration_tokens.last().unwrap(); - let mut migration_tokens = migration.split(")"); + let mut migration_tokens = migration.split(')'); let migration = migration_tokens.next().unwrap(); - migration.split(",").fold(vec![], |mut migration_columns, migration_column| { + migration.split(',').fold(vec![], |mut migration_columns, migration_column| { migration_columns.push(migration_column.to_string()); migration_columns }) @@ -252,14 +250,14 @@ impl DefaultMigration { let mut repeating_state_fields_count = repeating_state_fields.iter().fold( HashMap::new(), |mut repeating_field_count, field| { - repeating_field_count.insert(*field, 0 as u8); + repeating_field_count.insert(*field, 0_u8); repeating_field_count }, ); migration - .split(",") + .split(',') .fold(vec![], |mut unique_migration_tokens, migration_token| { match repeating_state_fields.iter().find(|field| migration_token.contains(**field)) { @@ -307,7 +305,7 @@ mod contract_state_migrations_get_migration_test { contract_state.migrations().first().unwrap() ); - assert_default_migration(&create_state_migration); + assert_default_migration(create_state_migration); } #[test] diff --git a/chaindexing/src/contract_states/state_versions.rs b/chaindexing/src/contract_states/state_versions.rs index c213c81..42153b6 100644 --- a/chaindexing/src/contract_states/state_versions.rs +++ b/chaindexing/src/contract_states/state_versions.rs @@ -8,7 +8,7 @@ use crate::{ use super::{serde_map_to_string_map, to_columns_and_values}; -pub const STATE_VERSIONS_TABLE_PREFIX: &'static str = "chaindexing_state_versions_for_"; +pub const STATE_VERSIONS_TABLE_PREFIX: &str = "chaindexing_state_versions_for_"; pub const STATE_VERSIONS_UNIQUE_FIELDS: [&str; 2] = ["state_version_id", "state_version_is_deleted"]; @@ -25,7 +25,7 @@ impl StateVersions { "SELECT * FROM {table_name} WHERE chain_id = {chain_id} AND block_number >= {from_block_number}", - table_name = StateVersion::table_name(&state_table_name), + table_name = StateVersion::table_name(state_table_name), ); ChaindexingRepo::load_data_list_from_raw_query_with_txn_client::< @@ -37,7 +37,7 @@ impl StateVersions { .collect() } - pub fn get_ids(state_versions: &Vec>) -> Vec { + pub fn get_ids(state_versions: &[HashMap]) -> Vec { state_versions .iter() .map(|state_version| state_version.get("state_version_id").unwrap()) @@ -45,7 +45,7 @@ impl StateVersions { .collect() } - pub fn get_group_ids(state_versions: &Vec>) -> Vec { + pub fn get_group_ids(state_versions: &[HashMap]) -> Vec { state_versions .iter() .map(|state_version| state_version.get("state_version_group_id").unwrap()) @@ -54,7 +54,7 @@ impl StateVersions { } pub async fn delete_by_ids<'a>( - ids: &Vec, + ids: &[String], state_table_name: &str, client: &ChaindexingRepoRawQueryTxnClient<'a>, ) { @@ -69,7 +69,7 @@ impl StateVersions { } pub async fn get_latest<'a>( - group_ids: &Vec, + group_ids: &[String], state_table_name: &str, client: &ChaindexingRepoRawQueryTxnClient<'a>, ) -> Vec> { @@ -77,7 +77,7 @@ impl StateVersions { "SELECT DISTINCT ON (state_version_group_id) * FROM {table_name} WHERE state_version_group_id IN ({group_ids}) ORDER BY state_version_group_id, block_number, log_index DESC", - table_name = StateVersion::table_name(&state_table_name), + table_name = StateVersion::table_name(state_table_name), group_ids = group_ids.iter().map(|id| format!("'{id}'")).collect::>().join(",") ); diff --git a/chaindexing/src/contract_states/state_views.rs b/chaindexing/src/contract_states/state_views.rs index 90bbb88..eaa8739 100644 --- a/chaindexing/src/contract_states/state_views.rs +++ b/chaindexing/src/contract_states/state_views.rs @@ -12,12 +12,12 @@ pub struct StateViews; impl StateViews { pub async fn refresh<'a>( - state_version_group_ids: &Vec, + state_version_group_ids: &[String], table_name: &str, client: &ChaindexingRepoRawQueryTxnClient<'a>, ) { let latest_state_versions = - StateVersions::get_latest(&state_version_group_ids, table_name, client).await; + StateVersions::get_latest(state_version_group_ids, table_name, client).await; for latest_state_version in latest_state_versions { StateView::refresh(&latest_state_version, table_name, client).await @@ -53,10 +53,10 @@ impl StateView { ) { let state_version_group_id = StateVersion::get_group_id(latest_state_version); - if StateVersion::was_deleted(&latest_state_version) { + if StateVersion::was_deleted(latest_state_version) { Self::delete(&state_version_group_id, table_name, client).await; } else { - let new_state_view = Self::from_latest_state_version(&latest_state_version); + let new_state_view = Self::from_latest_state_version(latest_state_version); Self::delete(&state_version_group_id, table_name, client).await; Self::create(&new_state_view, table_name, client).await; @@ -90,7 +90,7 @@ impl StateView { table_name: &str, client: &ChaindexingRepoRawQueryTxnClient<'a>, ) { - let (columns, values) = to_columns_and_values(&new_state_view); + let (columns, values) = to_columns_and_values(new_state_view); let query = format!( "INSERT INTO {table_name} ({columns}) VALUES ({values})", columns = columns.join(","), diff --git a/chaindexing/src/contracts.rs b/chaindexing/src/contracts.rs index 7df88d1..fe29505 100644 --- a/chaindexing/src/contracts.rs +++ b/chaindexing/src/contracts.rs @@ -101,14 +101,12 @@ impl Contract { pub struct Contracts; impl Contracts { - pub fn get_state_migrations( - contracts: &Vec, - ) -> Vec> { - contracts.into_iter().flat_map(|c| c.state_migrations.clone()).collect() + pub fn get_state_migrations(contracts: &[Contract]) -> Vec> { + contracts.iter().flat_map(|c| c.state_migrations.clone()).collect() } pub fn get_all_event_handlers_by_event_abi( - contracts: &Vec, + contracts: &[Contract], ) -> HashMap> { contracts.iter().fold( HashMap::new(), @@ -123,7 +121,7 @@ impl Contracts { } pub fn group_event_topics_by_names( - contracts: &Vec, + contracts: &[Contract], ) -> HashMap> { contracts.iter().fold(HashMap::new(), |mut topics_by_contract_name, contract| { topics_by_contract_name.insert(contract.name.clone(), contract.get_event_topics()); @@ -133,7 +131,7 @@ impl Contracts { } pub fn group_events_by_topics( - contracts: &Vec, + contracts: &[Contract], ) -> HashMap { contracts .iter() @@ -142,14 +140,14 @@ impl Contracts { .collect() } - pub fn get_all_contract_addresses_grouped_by_address<'a>( - contracts: &'a Vec, - ) -> HashMap { + pub fn get_all_contract_addresses_grouped_by_address( + contracts: &[Contract], + ) -> HashMap { contracts.iter().fold(HashMap::new(), |mut contracts_by_addresses, contract| { contract.addresses.iter().for_each( |contract_address @ UnsavedContractAddress { address, .. }| { contracts_by_addresses.insert( - Address::from_str(&*address.as_str()).unwrap(), + Address::from_str(address.as_str()).unwrap(), contract_address, ); }, @@ -177,7 +175,7 @@ impl UnsavedContractAddress { contract_name: contract_name.to_string(), address: address.to_lowercase().to_string(), chain_id: *chain as i32, - start_block_number: start_block_number, + start_block_number, next_block_number_to_ingest_from: start_block_number, next_block_number_to_handle_from: start_block_number, } diff --git a/chaindexing/src/event_handlers/maybe_handle_chain_reorg.rs b/chaindexing/src/event_handlers/maybe_handle_chain_reorg.rs index cf0fabc..aee0045 100644 --- a/chaindexing/src/event_handlers/maybe_handle_chain_reorg.rs +++ b/chaindexing/src/event_handlers/maybe_handle_chain_reorg.rs @@ -13,7 +13,7 @@ use crate::{ReorgedBlock, ReorgedBlocks}; pub async fn run<'a>( conn: Arc>>, raw_query_client: &mut ChaindexingRepoRawQueryClient, - state_migrations: &Vec>, + state_migrations: &[Arc], ) { let mut conn = conn.lock().await; let reorged_blocks = ChaindexingRepo::get_unhandled_reorged_blocks(&mut conn).await; diff --git a/chaindexing/src/events.rs b/chaindexing/src/events.rs index e7f4ba9..4ce3b0e 100644 --- a/chaindexing/src/events.rs +++ b/chaindexing/src/events.rs @@ -107,11 +107,11 @@ impl Event { !self.removed } - pub fn match_contract_address(&self, contract_address: &String) -> bool { + pub fn match_contract_address(&self, contract_address: &str) -> bool { self.contract_address.to_lowercase() == *contract_address.to_lowercase() } - fn log_params_to_parameters(log_params: &Vec) -> HashMap { + fn log_params_to_parameters(log_params: &[LogParam]) -> HashMap { log_params.iter().fold(HashMap::new(), |mut parameters, log_param| { parameters.insert(log_param.name.to_string(), log_param.value.clone()); @@ -123,8 +123,8 @@ impl Event { pub struct Events; impl Events { - pub fn new( - logs: &Vec, + pub fn get( + logs: &[Log], contracts: &Vec, blocks_by_number: &HashMap>, ) -> Vec { @@ -140,13 +140,13 @@ impl Events { block_number, .. }| { - let contract_address = contract_addresses_by_address.get(&address).unwrap(); + let contract_address = contract_addresses_by_address.get(address).unwrap(); let block = blocks_by_number.get(&block_number.unwrap()).unwrap(); Event::new( log, - &events_by_topics.get(&topics[0]).unwrap(), - &contract_address, + events_by_topics.get(&topics[0]).unwrap(), + contract_address, block.timestamp.as_u64() as i64, ) }, diff --git a/chaindexing/src/events_ingester.rs b/chaindexing/src/events_ingester.rs index b51c437..2e6af2b 100644 --- a/chaindexing/src/events_ingester.rs +++ b/chaindexing/src/events_ingester.rs @@ -33,7 +33,7 @@ pub trait EventsIngesterJsonRpc: Clone + Sync + Send { &self, logs: &Vec, ) -> Result>, ProviderError> { - let mut logs = logs.clone(); + let mut logs = logs.to_owned(); logs.dedup_by_key(|log| log.block_number); const CHUNK_SIZE: usize = 4; @@ -177,18 +177,18 @@ impl EventsIngester { } fn filter_uningested_contract_addresses( - contract_addresses: &Vec, + contract_addresses: &[ContractAddress], current_block_number: u64, ) -> Vec { contract_addresses - .to_vec() - .into_iter() + .iter() .filter(|ca| current_block_number >= ca.next_block_number_to_ingest_from as u64) + .cloned() .collect() } } -async fn fetch_current_block_number<'a>(json_rpc: &'a Arc) -> u64 { +async fn fetch_current_block_number(json_rpc: &Arc) -> u64 { let mut maybe_current_block_number = None; let mut retries_so_far = 0; @@ -208,7 +208,7 @@ async fn fetch_current_block_number<'a>(json_rpc: &'a Arc, json_rpc: &Arc) -> Vec { +async fn fetch_logs(filters: &[Filter], json_rpc: &Arc) -> Vec { let mut maybe_logs = None; let mut retries_so_far = 0; @@ -259,8 +259,8 @@ struct Filters; impl Filters { fn new( - contract_addresses: &Vec, - contracts: &Vec, + contract_addresses: &[ContractAddress], + contracts: &[Contract], current_block_number: u64, blocks_per_batch: u64, execution: &Execution, @@ -284,7 +284,7 @@ impl Filters { .collect() } - fn group_by_contract_address_id(filters: &Vec) -> HashMap> { + fn group_by_contract_address_id(filters: &[Filter]) -> HashMap> { let empty_filter_group = vec![]; filters.iter().fold( @@ -305,7 +305,7 @@ impl Filters { } fn get_latest(filters: &Vec) -> Option { - let mut filters = filters.clone(); + let mut filters = filters.to_owned(); filters.sort_by_key(|f| f.value.get_to_block()); filters.last().cloned() @@ -322,7 +322,7 @@ struct Filter { impl Filter { fn new( contract_address: &ContractAddress, - topics: &Vec, + topics: &[ContractEventTopic], current_block_number: u64, blocks_per_batch: u64, execution: &Execution, diff --git a/chaindexing/src/events_ingester/ingest_events.rs b/chaindexing/src/events_ingester/ingest_events.rs index 7769521..91b5508 100644 --- a/chaindexing/src/events_ingester/ingest_events.rs +++ b/chaindexing/src/events_ingester/ingest_events.rs @@ -24,7 +24,7 @@ pub async fn run<'a>( ) -> Result<(), EventsIngesterError> { let filters = Filters::new( &contract_addresses, - &contracts, + contracts, current_block_number, blocks_per_batch, &Execution::Main, @@ -36,7 +36,7 @@ pub async fn run<'a>( if !filters.is_empty() { let logs = fetch_logs(&filters, json_rpc).await; let blocks_by_tx_hash = fetch_blocks_by_number(&logs, json_rpc).await; - let events = Events::new(&logs, &contracts, &blocks_by_tx_hash); + let events = Events::get(&logs, contracts, &blocks_by_tx_hash); ChaindexingRepo::run_in_transaction(conn, move |conn| { async move { @@ -56,7 +56,7 @@ pub async fn run<'a>( async fn remove_already_ingested_filters( filters: &Vec, - contract_addresses: &Vec, + contract_addresses: &[ContractAddress], raw_query_client: &ChaindexingRepoRawQueryClient, ) -> Vec { let current_block_filters: Vec<_> = filters @@ -65,7 +65,7 @@ async fn remove_already_ingested_filters( .collect(); if current_block_filters.is_empty() { - filters.clone() + filters.to_owned() } else { let addresses = contract_addresses.iter().map(|c| c.address.clone()).collect(); @@ -118,7 +118,7 @@ async fn update_next_block_numbers_to_ingest_from<'a>( ChaindexingRepo::update_next_block_number_to_ingest_from( conn, - &contract_address, + contract_address, next_block_number_to_ingest_from.as_u64() as i64, ) .await diff --git a/chaindexing/src/events_ingester/maybe_handle_chain_reorg.rs b/chaindexing/src/events_ingester/maybe_handle_chain_reorg.rs index 7a47d3b..b1b8ec7 100644 --- a/chaindexing/src/events_ingester/maybe_handle_chain_reorg.rs +++ b/chaindexing/src/events_ingester/maybe_handle_chain_reorg.rs @@ -27,7 +27,7 @@ pub async fn run<'a>( ) -> Result<(), EventsIngesterError> { let filters = Filters::new( &contract_addresses, - &contracts, + contracts, current_block_number, blocks_per_batch, &Execution::Confirmation(min_confirmation_count), @@ -70,10 +70,10 @@ async fn get_json_rpc_events( json_rpc: &Arc, contracts: &Vec, ) -> Vec { - let logs = fetch_logs(&filters, json_rpc).await; + let logs = fetch_logs(filters, json_rpc).await; let blocks_by_number = fetch_blocks_by_number(&logs, json_rpc).await; - Events::new(&logs, contracts, &blocks_by_number) + Events::get(&logs, contracts, &blocks_by_number) } async fn handle_chain_reorg<'a>( @@ -106,20 +106,19 @@ fn get_json_rpc_added_and_removed_events( already_ingested_events: &Vec, json_rpc_events: &Vec, ) -> Option<(Vec, Vec)> { - let already_ingested_events_set: HashSet<_> = - already_ingested_events.clone().into_iter().collect(); - let json_rpc_events_set: HashSet<_> = json_rpc_events.clone().into_iter().collect(); + let already_ingested_events_set: HashSet<_> = already_ingested_events.iter().cloned().collect(); + let json_rpc_events_set: HashSet<_> = json_rpc_events.iter().cloned().collect(); let added_events: Vec<_> = json_rpc_events - .clone() - .into_iter() + .iter() .filter(|e| !already_ingested_events_set.contains(e)) + .cloned() .collect(); let removed_events: Vec<_> = already_ingested_events - .clone() - .into_iter() + .iter() .filter(|e| !json_rpc_events_set.contains(e)) + .cloned() .collect(); if added_events.is_empty() && removed_events.is_empty() { diff --git a/chaindexing/src/lib.rs b/chaindexing/src/lib.rs index a97b439..3b116db 100644 --- a/chaindexing/src/lib.rs +++ b/chaindexing/src/lib.rs @@ -94,8 +94,7 @@ impl Chaindexing { Self::run_internal_migrations(&client).await; Self::run_migrations_for_contract_states(&client, contracts).await; - let contract_addresses = - contracts.clone().into_iter().map(|c| c.addresses).flatten().collect(); + let contract_addresses = contracts.clone().into_iter().flat_map(|c| c.addresses).collect(); ChaindexingRepo::create_contract_addresses(&mut conn, &contract_addresses).await; Ok(()) @@ -103,7 +102,7 @@ impl Chaindexing { pub async fn maybe_reset<'a>( reset_count: &u8, - contracts: &Vec, + contracts: &[Contract], client: &ChaindexingRepoRawQueryClient, conn: &mut ChaindexingRepoConn<'a>, ) { @@ -122,21 +121,21 @@ impl Chaindexing { pub async fn run_migrations_for_resets(client: &ChaindexingRepoRawQueryClient) { ChaindexingRepo::migrate( - &client, + client, ChaindexingRepo::create_reset_counts_migration().to_vec(), ) .await; } pub async fn run_internal_migrations(client: &ChaindexingRepoRawQueryClient) { - ChaindexingRepo::migrate(&client, ChaindexingRepo::get_internal_migrations()).await; + ChaindexingRepo::migrate(client, ChaindexingRepo::get_internal_migrations()).await; } pub async fn reset_internal_migrations(client: &ChaindexingRepoRawQueryClient) { - ChaindexingRepo::migrate(&client, ChaindexingRepo::get_reset_internal_migrations()).await; + ChaindexingRepo::migrate(client, ChaindexingRepo::get_reset_internal_migrations()).await; } pub async fn run_migrations_for_contract_states( client: &ChaindexingRepoRawQueryClient, - contracts: &Vec, + contracts: &[Contract], ) { for state_migration in Contracts::get_state_migrations(contracts) { ChaindexingRepo::migrate(client, state_migration.get_migrations()).await; @@ -144,7 +143,7 @@ impl Chaindexing { } pub async fn reset_migrations_for_contract_states( client: &ChaindexingRepoRawQueryClient, - contracts: &Vec, + contracts: &[Contract], ) { for state_migration in Contracts::get_state_migrations(contracts) { ChaindexingRepo::migrate(client, state_migration.get_reset_migrations()).await; diff --git a/chaindexing/src/repos/postgres_repo.rs b/chaindexing/src/repos/postgres_repo.rs index f895135..0aaf041 100644 --- a/chaindexing/src/repos/postgres_repo.rs +++ b/chaindexing/src/repos/postgres_repo.rs @@ -108,7 +108,7 @@ impl Repo for PostgresRepo { chaindexing_contract_addresses.load(conn).await.unwrap() } - async fn create_events<'a>(conn: &mut Conn<'a>, events: &Vec) { + async fn create_events<'a>(conn: &mut Conn<'a>, events: &[Event]) { use crate::diesels::schema::chaindexing_events::dsl::*; diesel::insert_into(chaindexing_events) diff --git a/chaindexing/src/repos/postgres_repo/raw_queries.rs b/chaindexing/src/repos/postgres_repo/raw_queries.rs index f049856..6e84b51 100644 --- a/chaindexing/src/repos/postgres_repo/raw_queries.rs +++ b/chaindexing/src/repos/postgres_repo/raw_queries.rs @@ -69,14 +69,14 @@ impl ExecutesWithRawQuery for PostgresRepo { async fn update_reorged_blocks_as_handled_in_txn<'a>( client: &Self::RawQueryTxnClient<'a>, - reorged_block_ids: &Vec, + reorged_block_ids: &[i32], ) { let query = format!( "UPDATE chaindexing_reorged_blocks SET handled_at = '{handled_at}' WHERE id IN ({reorged_block_ids})", reorged_block_ids = join_i32s_with_comma(reorged_block_ids), - handled_at = chrono::Utc::now().naive_utc().to_string(), + handled_at = chrono::Utc::now().naive_utc(), ); Self::execute_raw_query_in_txn(client, &query).await; @@ -176,10 +176,10 @@ fn json_aggregate_query(query: &str) -> String { format!("WITH result AS ({query}) SELECT COALESCE(json_agg(result), '[]'::json) FROM result",) } -fn join_i32s_with_comma(i32s: &Vec) -> String { +fn join_i32s_with_comma(i32s: &[i32]) -> String { i32s.iter().map(|id| id.to_string()).collect::>().join(",") } -fn join_strings_with_comma(strings: &Vec) -> String { +fn join_strings_with_comma(strings: &[String]) -> String { strings.iter().map(|string| format!("'{string}'")).collect::>().join(",") } diff --git a/chaindexing/src/repos/repo.rs b/chaindexing/src/repos/repo.rs index 383b003..c839c1a 100644 --- a/chaindexing/src/repos/repo.rs +++ b/chaindexing/src/repos/repo.rs @@ -45,7 +45,7 @@ pub trait Repo: ); async fn get_all_contract_addresses<'a>(conn: &mut Self::Conn<'a>) -> Vec; - async fn create_events<'a>(conn: &mut Self::Conn<'a>, events: &Vec); + async fn create_events<'a>(conn: &mut Self::Conn<'a>, events: &[Event]); async fn get_all_events<'a>(conn: &mut Self::Conn<'a>) -> Vec; async fn get_events<'a>( conn: &mut Self::Conn<'a>, @@ -107,7 +107,7 @@ pub trait ExecutesWithRawQuery: HasRawQueryClient { async fn update_reorged_blocks_as_handled_in_txn<'a>( client: &Self::RawQueryTxnClient<'a>, - reorged_block_ids: &Vec, + reorged_block_ids: &[i32], ); }