From 2f79c5a94d72ba06926ba0622d5b355415257469 Mon Sep 17 00:00:00 2001 From: Joshua Oladele <jurshsmith@gmail.com> Date: Mon, 12 Feb 2024 12:26:43 +0100 Subject: [PATCH] Allow users reset derived states using reset_scripts This change introduces reset_scripts in user config to enable running user defined SQL queries when resetting indexed contracts. This is typically useful for normalizing Chaindexing-unaware derived states. --- chaindexing/src/config.rs | 12 ++++++++++-- chaindexing/src/lib.rs | 14 +++++++++++++- 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/chaindexing/src/config.rs b/chaindexing/src/config.rs index 79e76d5..ade9ba2 100644 --- a/chaindexing/src/config.rs +++ b/chaindexing/src/config.rs @@ -32,6 +32,7 @@ pub struct Config<SharedState: Sync + Send + Clone> { pub handler_rate_ms: u64, pub ingestion_rate_ms: u64, pub reset_count: u8, + pub reset_queries: Vec<String>, pub shared_state: Option<Arc<Mutex<SharedState>>>, } @@ -46,6 +47,7 @@ impl<SharedState: Sync + Send + Clone> Config<SharedState> { handler_rate_ms: 4_000, ingestion_rate_ms: 30_000, reset_count: 0, + reset_queries: vec![], shared_state: None, } } @@ -62,8 +64,8 @@ impl<SharedState: Sync + Send + Clone> Config<SharedState> { self } - pub fn with_initial_state(mut self, initial_state: SharedState) -> Self { - self.shared_state = Some(Arc::new(Mutex::new(initial_state))); + pub fn add_reset_query(mut self, reset_query: &str) -> Self { + self.reset_queries.push(reset_query.to_string()); self } @@ -74,6 +76,12 @@ impl<SharedState: Sync + Send + Clone> Config<SharedState> { self } + pub fn with_initial_state(mut self, initial_state: SharedState) -> Self { + self.shared_state = Some(Arc::new(Mutex::new(initial_state))); + + self + } + pub fn with_min_confirmation_count(mut self, min_confirmation_count: u8) -> Self { self.min_confirmation_count = MinConfirmationCount::new(min_confirmation_count); diff --git a/chaindexing/src/lib.rs b/chaindexing/src/lib.rs index 4ebeb57..bab9095 100644 --- a/chaindexing/src/lib.rs +++ b/chaindexing/src/lib.rs @@ -84,6 +84,7 @@ impl Chaindexing { repo, contracts, reset_count, + reset_queries, .. } = config; @@ -92,7 +93,7 @@ impl Chaindexing { let mut conn = ChaindexingRepo::get_conn(&pool).await; Self::run_migrations_for_resets(&client).await; - Self::maybe_reset(reset_count, contracts, &client, &mut conn).await; + Self::maybe_reset(reset_count, reset_queries, contracts, &client, &mut conn).await; Self::run_internal_migrations(&client).await; Self::run_migrations_for_contract_states(&client, contracts).await; @@ -104,6 +105,7 @@ impl Chaindexing { pub async fn maybe_reset<'a, S: Send + Sync + Clone>( reset_count: &u8, + reset_queries: &Vec<String>, contracts: &[Contract<S>], client: &ChaindexingRepoRawQueryClient, conn: &mut ChaindexingRepoConn<'a>, @@ -115,6 +117,7 @@ impl Chaindexing { if reset_count > previous_reset_count { Self::reset_internal_migrations(client).await; Self::reset_migrations_for_contract_states(client, contracts).await; + Self::run_user_reset_queries(client, reset_queries).await; for _ in previous_reset_count..reset_count { ChaindexingRepo::create_reset_count(conn).await; } @@ -151,6 +154,15 @@ impl Chaindexing { ChaindexingRepo::migrate(client, state_migration.get_reset_migrations()).await; } } + + async fn run_user_reset_queries( + client: &ChaindexingRepoRawQueryClient, + reset_queries: &Vec<String>, + ) { + for reset_query in reset_queries { + ChaindexingRepo::execute_raw_query(client, reset_query).await; + } + } } pub mod hashes {