Skip to content

Commit

Permalink
Allow users reset derived states using reset_scripts
Browse files Browse the repository at this point in the history
This change introduces reset_scripts in user config
to enable running user defined SQL queries when
resetting indexed contracts. This is typically useful
for normalizing Chaindexing-unaware derived states.
  • Loading branch information
Jurshsmith committed Feb 12, 2024
1 parent 2ccebe2 commit 2f79c5a
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 3 deletions.
12 changes: 10 additions & 2 deletions chaindexing/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub struct Config<SharedState: Sync + Send + Clone> {
pub handler_rate_ms: u64,
pub ingestion_rate_ms: u64,
pub reset_count: u8,
pub reset_queries: Vec<String>,
pub shared_state: Option<Arc<Mutex<SharedState>>>,
}

Expand All @@ -46,6 +47,7 @@ impl<SharedState: Sync + Send + Clone> Config<SharedState> {
handler_rate_ms: 4_000,
ingestion_rate_ms: 30_000,
reset_count: 0,
reset_queries: vec![],
shared_state: None,
}
}
Expand All @@ -62,8 +64,8 @@ impl<SharedState: Sync + Send + Clone> Config<SharedState> {
self
}

pub fn with_initial_state(mut self, initial_state: SharedState) -> Self {
self.shared_state = Some(Arc::new(Mutex::new(initial_state)));
pub fn add_reset_query(mut self, reset_query: &str) -> Self {
self.reset_queries.push(reset_query.to_string());

self
}
Expand All @@ -74,6 +76,12 @@ impl<SharedState: Sync + Send + Clone> Config<SharedState> {
self
}

pub fn with_initial_state(mut self, initial_state: SharedState) -> Self {
self.shared_state = Some(Arc::new(Mutex::new(initial_state)));

self
}

pub fn with_min_confirmation_count(mut self, min_confirmation_count: u8) -> Self {
self.min_confirmation_count = MinConfirmationCount::new(min_confirmation_count);

Expand Down
14 changes: 13 additions & 1 deletion chaindexing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ impl Chaindexing {
repo,
contracts,
reset_count,
reset_queries,
..
} = config;

Expand All @@ -92,7 +93,7 @@ impl Chaindexing {
let mut conn = ChaindexingRepo::get_conn(&pool).await;

Self::run_migrations_for_resets(&client).await;
Self::maybe_reset(reset_count, contracts, &client, &mut conn).await;
Self::maybe_reset(reset_count, reset_queries, contracts, &client, &mut conn).await;
Self::run_internal_migrations(&client).await;
Self::run_migrations_for_contract_states(&client, contracts).await;

Expand All @@ -104,6 +105,7 @@ impl Chaindexing {

pub async fn maybe_reset<'a, S: Send + Sync + Clone>(
reset_count: &u8,
reset_queries: &Vec<String>,
contracts: &[Contract<S>],
client: &ChaindexingRepoRawQueryClient,
conn: &mut ChaindexingRepoConn<'a>,
Expand All @@ -115,6 +117,7 @@ impl Chaindexing {
if reset_count > previous_reset_count {
Self::reset_internal_migrations(client).await;
Self::reset_migrations_for_contract_states(client, contracts).await;
Self::run_user_reset_queries(client, reset_queries).await;
for _ in previous_reset_count..reset_count {
ChaindexingRepo::create_reset_count(conn).await;
}
Expand Down Expand Up @@ -151,6 +154,15 @@ impl Chaindexing {
ChaindexingRepo::migrate(client, state_migration.get_reset_migrations()).await;
}
}

async fn run_user_reset_queries(
client: &ChaindexingRepoRawQueryClient,
reset_queries: &Vec<String>,
) {
for reset_query in reset_queries {
ChaindexingRepo::execute_raw_query(client, reset_query).await;
}
}
}

pub mod hashes {
Expand Down

0 comments on commit 2f79c5a

Please sign in to comment.