From 2f79c5a94d72ba06926ba0622d5b355415257469 Mon Sep 17 00:00:00 2001
From: Joshua Oladele <jurshsmith@gmail.com>
Date: Mon, 12 Feb 2024 12:26:43 +0100
Subject: [PATCH] Allow users reset derived states using reset_scripts

This change introduces reset_scripts in user config
to enable running user defined SQL queries when
resetting indexed contracts. This is typically useful
for normalizing Chaindexing-unaware derived states.
---
 chaindexing/src/config.rs | 12 ++++++++++--
 chaindexing/src/lib.rs    | 14 +++++++++++++-
 2 files changed, 23 insertions(+), 3 deletions(-)

diff --git a/chaindexing/src/config.rs b/chaindexing/src/config.rs
index 79e76d5..ade9ba2 100644
--- a/chaindexing/src/config.rs
+++ b/chaindexing/src/config.rs
@@ -32,6 +32,7 @@ pub struct Config<SharedState: Sync + Send + Clone> {
     pub handler_rate_ms: u64,
     pub ingestion_rate_ms: u64,
     pub reset_count: u8,
+    pub reset_queries: Vec<String>,
     pub shared_state: Option<Arc<Mutex<SharedState>>>,
 }
 
@@ -46,6 +47,7 @@ impl<SharedState: Sync + Send + Clone> Config<SharedState> {
             handler_rate_ms: 4_000,
             ingestion_rate_ms: 30_000,
             reset_count: 0,
+            reset_queries: vec![],
             shared_state: None,
         }
     }
@@ -62,8 +64,8 @@ impl<SharedState: Sync + Send + Clone> Config<SharedState> {
         self
     }
 
-    pub fn with_initial_state(mut self, initial_state: SharedState) -> Self {
-        self.shared_state = Some(Arc::new(Mutex::new(initial_state)));
+    pub fn add_reset_query(mut self, reset_query: &str) -> Self {
+        self.reset_queries.push(reset_query.to_string());
 
         self
     }
@@ -74,6 +76,12 @@ impl<SharedState: Sync + Send + Clone> Config<SharedState> {
         self
     }
 
+    pub fn with_initial_state(mut self, initial_state: SharedState) -> Self {
+        self.shared_state = Some(Arc::new(Mutex::new(initial_state)));
+
+        self
+    }
+
     pub fn with_min_confirmation_count(mut self, min_confirmation_count: u8) -> Self {
         self.min_confirmation_count = MinConfirmationCount::new(min_confirmation_count);
 
diff --git a/chaindexing/src/lib.rs b/chaindexing/src/lib.rs
index 4ebeb57..bab9095 100644
--- a/chaindexing/src/lib.rs
+++ b/chaindexing/src/lib.rs
@@ -84,6 +84,7 @@ impl Chaindexing {
             repo,
             contracts,
             reset_count,
+            reset_queries,
             ..
         } = config;
 
@@ -92,7 +93,7 @@ impl Chaindexing {
         let mut conn = ChaindexingRepo::get_conn(&pool).await;
 
         Self::run_migrations_for_resets(&client).await;
-        Self::maybe_reset(reset_count, contracts, &client, &mut conn).await;
+        Self::maybe_reset(reset_count, reset_queries, contracts, &client, &mut conn).await;
         Self::run_internal_migrations(&client).await;
         Self::run_migrations_for_contract_states(&client, contracts).await;
 
@@ -104,6 +105,7 @@ impl Chaindexing {
 
     pub async fn maybe_reset<'a, S: Send + Sync + Clone>(
         reset_count: &u8,
+        reset_queries: &Vec<String>,
         contracts: &[Contract<S>],
         client: &ChaindexingRepoRawQueryClient,
         conn: &mut ChaindexingRepoConn<'a>,
@@ -115,6 +117,7 @@ impl Chaindexing {
         if reset_count > previous_reset_count {
             Self::reset_internal_migrations(client).await;
             Self::reset_migrations_for_contract_states(client, contracts).await;
+            Self::run_user_reset_queries(client, reset_queries).await;
             for _ in previous_reset_count..reset_count {
                 ChaindexingRepo::create_reset_count(conn).await;
             }
@@ -151,6 +154,15 @@ impl Chaindexing {
             ChaindexingRepo::migrate(client, state_migration.get_reset_migrations()).await;
         }
     }
+
+    async fn run_user_reset_queries(
+        client: &ChaindexingRepoRawQueryClient,
+        reset_queries: &Vec<String>,
+    ) {
+        for reset_query in reset_queries {
+            ChaindexingRepo::execute_raw_query(client, reset_query).await;
+        }
+    }
 }
 
 pub mod hashes {