Skip to content

Commit

Permalink
Merge pull request #40 from chaindexing/fix-contract-address-constrai…
Browse files Browse the repository at this point in the history
…nt-conflict-resolution

Fix contract address constraint conflict resolution
  • Loading branch information
Jurshsmith authored Nov 3, 2023
2 parents 7a170c0 + d9f9ef8 commit 435ebfc
Show file tree
Hide file tree
Showing 7 changed files with 210 additions and 38 deletions.
1 change: 1 addition & 0 deletions chaindexing-tests/src/tests.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod contract_states;
mod events_ingester;
mod repos;

pub async fn setup() {
contract_states::setup().await;
Expand Down
39 changes: 22 additions & 17 deletions chaindexing-tests/src/tests/events_ingester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,25 @@ mod tests {
json_rpc_with_empty_logs, json_rpc_with_filter_stubber, json_rpc_with_logs, test_runner,
};
use chaindexing::{
Chain, Chaindexing, EventsIngester, HasRawQueryClient, MinConfirmationCount, PostgresRepo,
Repo,
Chain, ChaindexingRepo, EventsIngester, HasRawQueryClient, MinConfirmationCount, Repo,
};

#[tokio::test]
pub async fn creates_contract_events() {
let pool = test_runner::get_pool().await;

test_runner::run_test(&pool, |mut conn| async move {
let contracts = vec![bayc_contract()];
let bayc_contract = bayc_contract();
let contracts = vec![bayc_contract.clone()];

static CURRENT_BLOCK_NUMBER: u32 = BAYC_CONTRACT_START_BLOCK_NUMBER + 20;
let json_rpc = Arc::new(json_rpc_with_logs!(
BAYC_CONTRACT_ADDRESS,
CURRENT_BLOCK_NUMBER
));

assert!(PostgresRepo::get_all_events(&mut conn).await.is_empty());
Chaindexing::create_initial_contract_addresses(&mut conn, &contracts).await;
assert!(ChaindexingRepo::get_all_events(&mut conn).await.is_empty());
ChaindexingRepo::create_contract_addresses(&mut conn, &bayc_contract.addresses).await;

let conn = Arc::new(Mutex::new(conn));
let raw_query_client = test_runner::new_repo().get_raw_query_client().await;
Expand All @@ -44,7 +45,7 @@ mod tests {
.unwrap();

let mut conn = conn.lock().await;
let ingested_events = PostgresRepo::get_all_events(&mut conn).await;
let ingested_events = ChaindexingRepo::get_all_events(&mut conn).await;
let first_event = ingested_events.first().unwrap();
assert_eq!(
first_event.contract_address,
Expand All @@ -59,10 +60,11 @@ mod tests {
let pool = test_runner::get_pool().await;

test_runner::run_test(&pool, |mut conn| async move {
let contracts = vec![bayc_contract()];
let bayc_contract = bayc_contract();
let contracts = vec![bayc_contract.clone()];

Chaindexing::create_initial_contract_addresses(&mut conn, &contracts).await;
let contract_addresses = PostgresRepo::get_all_contract_addresses(&mut conn).await;
ChaindexingRepo::create_contract_addresses(&mut conn, &bayc_contract.addresses).await;
let contract_addresses = ChaindexingRepo::get_all_contract_addresses(&mut conn).await;
let bayc_contract_address = contract_addresses.first().unwrap();
assert_eq!(
bayc_contract_address.next_block_number_to_ingest_from as u32,
Expand Down Expand Up @@ -100,14 +102,16 @@ mod tests {
let pool = test_runner::get_pool().await;

test_runner::run_test(&pool, |mut conn| async move {
let contracts = vec![bayc_contract()];
let bayc_contract = bayc_contract();
let contracts = vec![bayc_contract.clone()];

static CURRENT_BLOCK_NUMBER: u32 = BAYC_CONTRACT_START_BLOCK_NUMBER + 20;
let json_rpc = Arc::new(json_rpc_with_logs!(
BAYC_CONTRACT_ADDRESS,
CURRENT_BLOCK_NUMBER
));

Chaindexing::create_initial_contract_addresses(&mut conn, &contracts).await;
ChaindexingRepo::create_contract_addresses(&mut conn, &bayc_contract.addresses).await;

let conn = Arc::new(Mutex::new(conn));
let blocks_per_batch = 10;
Expand All @@ -126,7 +130,7 @@ mod tests {
.unwrap();

let mut conn = conn.lock().await;
let contract_addresses = PostgresRepo::get_all_contract_addresses(&mut conn).await;
let contract_addresses = ChaindexingRepo::get_all_contract_addresses(&mut conn).await;
let bayc_contract_address = contract_addresses.first().unwrap();
let next_block_number_to_ingest_from =
bayc_contract_address.next_block_number_to_ingest_from as u64;
Expand Down Expand Up @@ -164,7 +168,7 @@ mod tests {
.await
.unwrap();
let mut conn = conn.lock().await;
assert!(PostgresRepo::get_all_events(&mut conn).await.is_empty());
assert!(ChaindexingRepo::get_all_events(&mut conn).await.is_empty());
})
.await;
}
Expand All @@ -174,11 +178,12 @@ mod tests {
let pool = test_runner::get_pool().await;

test_runner::run_test(&pool, |mut conn| async move {
let contracts = vec![bayc_contract()];
let bayc_contract = bayc_contract();
let contracts = vec![bayc_contract.clone()];
let json_rpc = Arc::new(json_rpc_with_empty_logs!(BAYC_CONTRACT_ADDRESS));

assert!(PostgresRepo::get_all_events(&mut conn).await.is_empty());
Chaindexing::create_initial_contract_addresses(&mut conn, &contracts).await;
assert!(ChaindexingRepo::get_all_events(&mut conn).await.is_empty());
ChaindexingRepo::create_contract_addresses(&mut conn, &bayc_contract.addresses).await;

let conn = Arc::new(Mutex::new(conn));
let raw_query_client = test_runner::new_repo().get_raw_query_client().await;
Expand All @@ -195,7 +200,7 @@ mod tests {
.unwrap();

let mut conn = conn.lock().await;
assert!(PostgresRepo::get_all_events(&mut conn).await.is_empty());
assert!(ChaindexingRepo::get_all_events(&mut conn).await.is_empty());
})
.await;
}
Expand Down
1 change: 1 addition & 0 deletions chaindexing-tests/src/tests/repos.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
mod postgres_repo;
174 changes: 174 additions & 0 deletions chaindexing-tests/src/tests/repos/postgres_repo.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
#[cfg(test)]
mod create_initial_contract_addresses {
use chaindexing::{Chain, ChaindexingRepo, Repo, UnsavedContractAddress};

use crate::test_runner;

#[tokio::test]
pub async fn creates_contract_addresses() {
let pool = test_runner::get_pool().await;

test_runner::run_test(&pool, |mut conn| async move {
let contract_name = "Test-contract-address";
let contract_address_value = "0x8a90CAb2b38dba80c64b7734e58Ee1dB38B8992e";
let chain = Chain::Arbitrum;
let start_block_number = 0;

let contract_addresses = vec![UnsavedContractAddress::new(
contract_name,
contract_address_value,
&chain,
start_block_number,
)];
ChaindexingRepo::create_contract_addresses(&mut conn, &contract_addresses).await;

let contract_addresses = ChaindexingRepo::get_all_contract_addresses(&mut conn).await;
let contract_address = contract_addresses.first().unwrap();

assert_eq!(contract_address.contract_name, contract_name);
assert_eq!(
contract_address.address,
contract_address_value.to_lowercase()
);
assert_eq!(contract_address.start_block_number, start_block_number);
})
.await;
}

#[tokio::test]
pub async fn sets_next_block_number_to_ingest_from_with_provided_start_block_number() {
let pool = test_runner::get_pool().await;

test_runner::run_test(&pool, |mut conn| async move {
let contract_name = "Test-contract-address";
let contract_address_value = "0x8a90CAb2b38dba80c64b7734e58Ee1dB38B8992e";
let chain = Chain::Arbitrum;
let start_block_number = 0;

let contract_addresses = vec![UnsavedContractAddress::new(
contract_name,
contract_address_value,
&chain,
start_block_number,
)];
ChaindexingRepo::create_contract_addresses(&mut conn, &contract_addresses).await;

let contract_addresses = ChaindexingRepo::get_all_contract_addresses(&mut conn).await;
let contract_address = contract_addresses.first().unwrap();

assert_eq!(
contract_address.next_block_number_to_ingest_from,
start_block_number
);
})
.await;
}

#[tokio::test]
pub async fn sets_next_block_number_to_handle_from_with_provided_start_block_number() {
let pool = test_runner::get_pool().await;

test_runner::run_test(&pool, |mut conn| async move {
let contract_name = "Test-contract-address";
let contract_address_value = "0x8a90CAb2b38dba80c64b7734e58Ee1dB38B8992e";
let chain = Chain::Arbitrum;
let start_block_number = 0;

let contract_addresses = vec![UnsavedContractAddress::new(
contract_name,
contract_address_value,
&chain,
start_block_number,
)];
ChaindexingRepo::create_contract_addresses(&mut conn, &contract_addresses).await;

let contract_addresses = ChaindexingRepo::get_all_contract_addresses(&mut conn).await;
let contract_address = contract_addresses.first().unwrap();

assert_eq!(
contract_address.next_block_number_to_handle_from,
start_block_number
);
})
.await;
}

#[tokio::test]
pub async fn overwrites_contract_name_of_contract_addresses() {
let pool = test_runner::get_pool().await;

test_runner::run_test(&pool, |mut conn| async move {
let initial_contract_address = UnsavedContractAddress::new(
"initial-contract-address",
"0x8a90CAb2b38dba80c64b7734e58Ee1dB38B8992e",
&Chain::Arbitrum,
0,
);

let contract_addresses = vec![initial_contract_address];
ChaindexingRepo::create_contract_addresses(&mut conn, &contract_addresses).await;

let updated_contract_address = UnsavedContractAddress::new(
"updated-contract-address",
"0x8a90CAb2b38dba80c64b7734e58Ee1dB38B8992e",
&Chain::Arbitrum,
0,
);
let contract_addresses = vec![updated_contract_address];

ChaindexingRepo::create_contract_addresses(&mut conn, &contract_addresses).await;

let contract_addresses = ChaindexingRepo::get_all_contract_addresses(&mut conn).await;
let contract_address = contract_addresses.first().unwrap();

assert_eq!(contract_address.contract_name, "updated-contract-address");
})
.await;
}

#[tokio::test]
pub async fn does_not_update_block_number_related_fields() {
let pool = test_runner::get_pool().await;

test_runner::run_test(&pool, |mut conn| async move {
let initial_start_block_number = 400;

let initial_contract_address = UnsavedContractAddress::new(
"initial-contract-address",
"0x8a90CAb2b38dba80c64b7734e58Ee1dB38B8992e",
&Chain::Arbitrum,
initial_start_block_number,
);

let contract_addresses = vec![initial_contract_address];
ChaindexingRepo::create_contract_addresses(&mut conn, &contract_addresses).await;

let updated_contract_address = UnsavedContractAddress::new(
"updated-contract-address",
"0x8a90CAb2b38dba80c64b7734e58Ee1dB38B8992e",
&Chain::Arbitrum,
2000,
);
let contract_addresses = vec![updated_contract_address];

ChaindexingRepo::create_contract_addresses(&mut conn, &contract_addresses).await;

let contract_addresses = ChaindexingRepo::get_all_contract_addresses(&mut conn).await;
let contract_address = contract_addresses.first().unwrap();

assert_eq!(
contract_address.start_block_number,
initial_start_block_number
);
assert_eq!(
contract_address.next_block_number_to_ingest_from,
initial_start_block_number
);
assert_eq!(
contract_address.next_block_number_to_handle_from,
initial_start_block_number
);
})
.await;
}
}
17 changes: 5 additions & 12 deletions chaindexing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub use chains::Chains;
pub use config::Config;
use config::ConfigError;
pub use contract_states::{ContractState, ContractStateMigrations, ContractStates};
pub use contracts::{Contract, ContractAddress, ContractEvent, Contracts};
pub use contracts::{Contract, ContractAddress, ContractEvent, Contracts, UnsavedContractAddress};
pub use ethers::prelude::Chain;
pub use event_handlers::{EventHandler, EventHandlerContext as EventContext, EventHandlers};
pub use events::{Event, Events};
Expand Down Expand Up @@ -93,7 +93,10 @@ impl Chaindexing {
Self::maybe_reset(reset_count, contracts, &client, &mut conn).await;
Self::run_internal_migrations(&client).await;
Self::run_migrations_for_contract_states(&client, contracts).await;
Self::create_initial_contract_addresses(&mut conn, contracts).await;

let contract_addresses =
contracts.clone().into_iter().map(|c| c.addresses).flatten().collect();
ChaindexingRepo::create_contract_addresses(&mut conn, &contract_addresses).await;

Ok(())
}
Expand Down Expand Up @@ -147,16 +150,6 @@ impl Chaindexing {
ChaindexingRepo::migrate(client, state_migration.get_reset_migrations()).await;
}
}

pub async fn create_initial_contract_addresses<'a>(
conn: &mut ChaindexingRepoConn<'a>,
contracts: &Vec<Contract>,
) {
let contract_addresses =
contracts.clone().into_iter().map(|c| c.addresses).flatten().collect();

ChaindexingRepo::create_contract_addresses(conn, &contract_addresses).await;
}
}

pub mod hashes {
Expand Down
8 changes: 3 additions & 5 deletions chaindexing/src/repos/postgres_repo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,13 @@ impl Repo for PostgresRepo {
conn: &mut Conn<'a>,
contract_addresses: &Vec<UnsavedContractAddress>,
) {
use crate::diesels::schema::chaindexing_contract_addresses::dsl::{
address, chaindexing_contract_addresses,
};
use crate::diesels::schema::chaindexing_contract_addresses::dsl::*;

diesel::insert_into(chaindexing_contract_addresses)
.values(contract_addresses)
.on_conflict(address)
.on_conflict((chain_id, address))
.do_update()
.set(address.eq(excluded(address)))
.set(contract_name.eq(excluded(contract_name)))
.execute(conn)
.await
.unwrap();
Expand Down
8 changes: 4 additions & 4 deletions chaindexing/src/repos/repo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,8 @@ impl SQLikeMigrations {
next_block_number_to_ingest_from BIGINT NULL,
next_block_number_to_handle_from BIGINT NULL
)",
"CREATE UNIQUE INDEX IF NOT EXISTS chaindexing_contract_addresses_address_index
ON chaindexing_contract_addresses(address)",
"CREATE UNIQUE INDEX IF NOT EXISTS chaindexing_contract_addresses_chain_address_index
ON chaindexing_contract_addresses(chain_id, address)",
]
}
pub fn drop_contract_addresses() -> &'static [&'static str] {
Expand All @@ -228,8 +228,8 @@ impl SQLikeMigrations {
removed BOOLEAN NOT NULL,
inserted_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
)",
"CREATE UNIQUE INDEX IF NOT EXISTS chaindexing_events_transaction_hash_log_index
ON chaindexing_events(transaction_hash,log_index)",
"CREATE UNIQUE INDEX IF NOT EXISTS chaindexing_events_chain_transaction_hash_log_index
ON chaindexing_events(chain_id,transaction_hash,log_index)",
"CREATE INDEX IF NOT EXISTS chaindexing_events_abi
ON chaindexing_events(abi)",
]
Expand Down

0 comments on commit 435ebfc

Please sign in to comment.