diff --git a/chaindexing/src/config.rs b/chaindexing/src/config.rs index 79e76d5..ade9ba2 100644 --- a/chaindexing/src/config.rs +++ b/chaindexing/src/config.rs @@ -32,6 +32,7 @@ pub struct Config { pub handler_rate_ms: u64, pub ingestion_rate_ms: u64, pub reset_count: u8, + pub reset_queries: Vec, pub shared_state: Option>>, } @@ -46,6 +47,7 @@ impl Config { handler_rate_ms: 4_000, ingestion_rate_ms: 30_000, reset_count: 0, + reset_queries: vec![], shared_state: None, } } @@ -62,8 +64,8 @@ impl Config { self } - pub fn with_initial_state(mut self, initial_state: SharedState) -> Self { - self.shared_state = Some(Arc::new(Mutex::new(initial_state))); + pub fn add_reset_query(mut self, reset_query: &str) -> Self { + self.reset_queries.push(reset_query.to_string()); self } @@ -74,6 +76,12 @@ impl Config { self } + pub fn with_initial_state(mut self, initial_state: SharedState) -> Self { + self.shared_state = Some(Arc::new(Mutex::new(initial_state))); + + self + } + pub fn with_min_confirmation_count(mut self, min_confirmation_count: u8) -> Self { self.min_confirmation_count = MinConfirmationCount::new(min_confirmation_count); diff --git a/chaindexing/src/lib.rs b/chaindexing/src/lib.rs index 4ebeb57..bab9095 100644 --- a/chaindexing/src/lib.rs +++ b/chaindexing/src/lib.rs @@ -84,6 +84,7 @@ impl Chaindexing { repo, contracts, reset_count, + reset_queries, .. } = config; @@ -92,7 +93,7 @@ impl Chaindexing { let mut conn = ChaindexingRepo::get_conn(&pool).await; Self::run_migrations_for_resets(&client).await; - Self::maybe_reset(reset_count, contracts, &client, &mut conn).await; + Self::maybe_reset(reset_count, reset_queries, contracts, &client, &mut conn).await; Self::run_internal_migrations(&client).await; Self::run_migrations_for_contract_states(&client, contracts).await; @@ -104,6 +105,7 @@ impl Chaindexing { pub async fn maybe_reset<'a, S: Send + Sync + Clone>( reset_count: &u8, + reset_queries: &Vec, contracts: &[Contract], client: &ChaindexingRepoRawQueryClient, conn: &mut ChaindexingRepoConn<'a>, @@ -115,6 +117,7 @@ impl Chaindexing { if reset_count > previous_reset_count { Self::reset_internal_migrations(client).await; Self::reset_migrations_for_contract_states(client, contracts).await; + Self::run_user_reset_queries(client, reset_queries).await; for _ in previous_reset_count..reset_count { ChaindexingRepo::create_reset_count(conn).await; } @@ -151,6 +154,15 @@ impl Chaindexing { ChaindexingRepo::migrate(client, state_migration.get_reset_migrations()).await; } } + + async fn run_user_reset_queries( + client: &ChaindexingRepoRawQueryClient, + reset_queries: &Vec, + ) { + for reset_query in reset_queries { + ChaindexingRepo::execute_raw_query(client, reset_query).await; + } + } } pub mod hashes {