From 170be731bccf580f5e5077d559e546679cb95c64 Mon Sep 17 00:00:00 2001 From: Joshua Oladele Date: Mon, 18 Mar 2024 17:11:55 +0100 Subject: [PATCH] Optimize reset counts This change breaks reset count type, u8 -> u64, since users can reset as much as they need. Equally, we prune reset counts when it hits a thousand implicitly. --- chaindexing/src/config.rs | 4 ++-- chaindexing/src/diesels/schema.rs | 2 +- chaindexing/src/lib.rs | 15 +++++++++------ chaindexing/src/repos/postgres_repo.rs | 11 ++++++++--- .../src/repos/postgres_repo/raw_queries.rs | 16 ++++++++++++++++ chaindexing/src/repos/repo.rs | 5 +++-- chaindexing/src/reset_counts.rs | 10 +++++++++- 7 files changed, 48 insertions(+), 15 deletions(-) diff --git a/chaindexing/src/config.rs b/chaindexing/src/config.rs index 3c4fb36..d404371 100644 --- a/chaindexing/src/config.rs +++ b/chaindexing/src/config.rs @@ -44,7 +44,7 @@ pub struct Config { pub handler_rate_ms: u64, pub ingestion_rate_ms: u64, node_election_rate_ms: Option, - pub reset_count: u8, + pub reset_count: u64, pub reset_queries: Vec, pub shared_state: Option>>, pub max_concurrent_node_count: u16, @@ -88,7 +88,7 @@ impl Config { self } - pub fn reset(mut self, count: u8) -> Self { + pub fn reset(mut self, count: u64) -> Self { self.reset_count = count; self diff --git a/chaindexing/src/diesels/schema.rs b/chaindexing/src/diesels/schema.rs index 818bbae..a8d71e1 100644 --- a/chaindexing/src/diesels/schema.rs +++ b/chaindexing/src/diesels/schema.rs @@ -43,7 +43,7 @@ diesel::table! { diesel::table! { chaindexing_reset_counts (id) { - id -> Int4, + id -> Int8, inserted_at -> Timestamptz, } } diff --git a/chaindexing/src/lib.rs b/chaindexing/src/lib.rs index 70aa796..f5b6773 100644 --- a/chaindexing/src/lib.rs +++ b/chaindexing/src/lib.rs @@ -141,21 +141,23 @@ impl Chaindexing { Ok(()) } pub async fn maybe_reset<'a, S: Send + Sync + Clone>( - reset_count: &u8, + reset_count: &u64, reset_queries: &Vec, contracts: &[Contract], client: &ChaindexingRepoRawQueryClient, conn: &mut ChaindexingRepoConn<'a>, ) { - let reset_count = *reset_count as usize; - let reset_counts = ChaindexingRepo::get_reset_counts(conn).await; - let previous_reset_count = reset_counts.len(); + let reset_count = *reset_count; + let previous_reset_count_id = ChaindexingRepo::get_last_reset_count(conn) + .await + .map(|rc| rc.get_count()) + .unwrap_or(0); - if reset_count > previous_reset_count { + if reset_count > previous_reset_count_id { Self::reset_internal_migrations(client).await; Self::reset_migrations_for_contract_states(client, contracts).await; Self::run_user_reset_queries(client, reset_queries).await; - for _ in previous_reset_count..reset_count { + for _ in previous_reset_count_id..reset_count { ChaindexingRepo::create_reset_count(conn).await; } } @@ -167,6 +169,7 @@ impl Chaindexing { ChaindexingRepo::create_reset_counts_migration().to_vec(), ) .await; + ChaindexingRepo::prune_reset_counts(&client, reset_counts::MAX_RESET_COUNT).await; } pub async fn run_internal_migrations(client: &ChaindexingRepoRawQueryClient) { ChaindexingRepo::migrate(client, ChaindexingRepo::get_internal_migrations()).await; diff --git a/chaindexing/src/repos/postgres_repo.rs b/chaindexing/src/repos/postgres_repo.rs index fd95dd2..9875a82 100644 --- a/chaindexing/src/repos/postgres_repo.rs +++ b/chaindexing/src/repos/postgres_repo.rs @@ -17,7 +17,7 @@ use diesel::{ delete, result::{DatabaseErrorKind, Error as DieselError}, upsert::excluded, - ExpressionMethods, QueryDsl, + ExpressionMethods, OptionalExtension, QueryDsl, }; use diesel_async::{pooled_connection::AsyncDieselConnectionManager, AsyncPgConnection}; use diesel_streamer::get_serial_table_async_stream; @@ -209,10 +209,15 @@ impl Repo for PostgresRepo { .unwrap(); } - async fn get_reset_counts<'a>(conn: &mut Self::Conn<'a>) -> Vec { + async fn get_last_reset_count<'a>(conn: &mut Self::Conn<'a>) -> Option { use crate::diesels::schema::chaindexing_reset_counts::dsl::*; - chaindexing_reset_counts.load(conn).await.unwrap() + chaindexing_reset_counts + .order_by(id.desc()) + .first(conn) + .await + .optional() + .unwrap() } async fn create_node<'a>(conn: &mut Self::Conn<'a>) -> Node { diff --git a/chaindexing/src/repos/postgres_repo/raw_queries.rs b/chaindexing/src/repos/postgres_repo/raw_queries.rs index 9935303..c244aaf 100644 --- a/chaindexing/src/repos/postgres_repo/raw_queries.rs +++ b/chaindexing/src/repos/postgres_repo/raw_queries.rs @@ -97,6 +97,22 @@ impl ExecutesWithRawQuery for PostgresRepo { Self::execute_raw_query(client, &query).await; } + + async fn prune_reset_counts(client: &Self::RawQueryClient, prune_size: u64) { + let query = format!( + " + DELETE FROM chaindexing_reset_counts + WHERE id NOT IN ( + SELECT id + FROM chaindexing_reset_counts + ORDER BY id DESC + LIMIT {prune_size} + ) + " + ); + + Self::execute_raw_query(client, &query).await; + } } #[async_trait::async_trait] diff --git a/chaindexing/src/repos/repo.rs b/chaindexing/src/repos/repo.rs index fa4472c..58455ee 100644 --- a/chaindexing/src/repos/repo.rs +++ b/chaindexing/src/repos/repo.rs @@ -74,7 +74,7 @@ pub trait Repo: async fn get_unhandled_reorged_blocks<'a>(conn: &mut Self::Conn<'a>) -> Vec; async fn create_reset_count<'a>(conn: &mut Self::Conn<'a>); - async fn get_reset_counts<'a>(conn: &mut Self::Conn<'a>) -> Vec; + async fn get_last_reset_count<'a>(conn: &mut Self::Conn<'a>) -> Option; async fn create_node<'a>(conn: &mut Self::Conn<'a>) -> Node; async fn get_active_nodes<'a>( @@ -119,6 +119,7 @@ pub trait ExecutesWithRawQuery: HasRawQueryClient { ); async fn prune_nodes(client: &Self::RawQueryClient, prune_size: u16); + async fn prune_reset_counts(client: &Self::RawQueryClient, prune_size: u64); } #[async_trait::async_trait] @@ -278,7 +279,7 @@ impl SQLikeMigrations { pub fn create_reset_counts() -> &'static [&'static str] { &["CREATE TABLE IF NOT EXISTS chaindexing_reset_counts ( - id SERIAL PRIMARY KEY, + id BIGSERIAL PRIMARY KEY, inserted_at TIMESTAMPTZ NOT NULL DEFAULT NOW() )"] } diff --git a/chaindexing/src/reset_counts.rs b/chaindexing/src/reset_counts.rs index 5e8339b..9952cd0 100644 --- a/chaindexing/src/reset_counts.rs +++ b/chaindexing/src/reset_counts.rs @@ -4,6 +4,14 @@ use diesel::{Insertable, Queryable}; #[derive(Debug, Clone, PartialEq, Queryable, Insertable)] #[diesel(table_name = chaindexing_reset_counts)] pub struct ResetCount { - id: i32, + id: i64, inserted_at: chrono::NaiveDateTime, } + +impl ResetCount { + pub fn get_count(&self) -> u64 { + self.id as u64 + } +} + +pub const MAX_RESET_COUNT: u64 = 1_000;