Skip to content

Commit

Permalink
Merge pull request #76 from chaindexing/optimize-reset-counts
Browse files Browse the repository at this point in the history
Optimize reset counts
  • Loading branch information
Jurshsmith authored Mar 18, 2024
2 parents 99916be + 170be73 commit 783ed0b
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 15 deletions.
4 changes: 2 additions & 2 deletions chaindexing/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub struct Config<SharedState: Sync + Send + Clone> {
pub handler_rate_ms: u64,
pub ingestion_rate_ms: u64,
node_election_rate_ms: Option<u64>,
pub reset_count: u8,
pub reset_count: u64,
pub reset_queries: Vec<String>,
pub shared_state: Option<Arc<Mutex<SharedState>>>,
pub max_concurrent_node_count: u16,
Expand Down Expand Up @@ -88,7 +88,7 @@ impl<SharedState: Sync + Send + Clone> Config<SharedState> {
self
}

pub fn reset(mut self, count: u8) -> Self {
pub fn reset(mut self, count: u64) -> Self {
self.reset_count = count;

self
Expand Down
2 changes: 1 addition & 1 deletion chaindexing/src/diesels/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ diesel::table! {

diesel::table! {
chaindexing_reset_counts (id) {
id -> Int4,
id -> Int8,
inserted_at -> Timestamptz,
}
}
Expand Down
15 changes: 9 additions & 6 deletions chaindexing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,21 +140,23 @@ impl Chaindexing {
Ok(())
}
pub async fn maybe_reset<'a, S: Send + Sync + Clone>(
reset_count: &u8,
reset_count: &u64,
reset_queries: &Vec<String>,
contracts: &[Contract<S>],
client: &ChaindexingRepoRawQueryClient,
conn: &mut ChaindexingRepoConn<'a>,
) {
let reset_count = *reset_count as usize;
let reset_counts = ChaindexingRepo::get_reset_counts(conn).await;
let previous_reset_count = reset_counts.len();
let reset_count = *reset_count;
let previous_reset_count_id = ChaindexingRepo::get_last_reset_count(conn)
.await
.map(|rc| rc.get_count())
.unwrap_or(0);

if reset_count > previous_reset_count {
if reset_count > previous_reset_count_id {
Self::reset_internal_migrations(client).await;
Self::reset_migrations_for_contract_states(client, contracts).await;
Self::run_user_reset_queries(client, reset_queries).await;
for _ in previous_reset_count..reset_count {
for _ in previous_reset_count_id..reset_count {
ChaindexingRepo::create_reset_count(conn).await;
}
}
Expand All @@ -166,6 +168,7 @@ impl Chaindexing {
ChaindexingRepo::create_reset_counts_migration().to_vec(),
)
.await;
ChaindexingRepo::prune_reset_counts(&client, reset_counts::MAX_RESET_COUNT).await;
}
pub async fn run_internal_migrations(client: &ChaindexingRepoRawQueryClient) {
ChaindexingRepo::migrate(client, ChaindexingRepo::get_internal_migrations()).await;
Expand Down
11 changes: 8 additions & 3 deletions chaindexing/src/repos/postgres_repo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use diesel::{
delete,
result::{DatabaseErrorKind, Error as DieselError},
upsert::excluded,
ExpressionMethods, QueryDsl,
ExpressionMethods, OptionalExtension, QueryDsl,
};
use diesel_async::{pooled_connection::AsyncDieselConnectionManager, AsyncPgConnection};
use diesel_streamer::get_serial_table_async_stream;
Expand Down Expand Up @@ -209,10 +209,15 @@ impl Repo for PostgresRepo {
.unwrap();
}

async fn get_reset_counts<'a>(conn: &mut Self::Conn<'a>) -> Vec<ResetCount> {
async fn get_last_reset_count<'a>(conn: &mut Self::Conn<'a>) -> Option<ResetCount> {
use crate::diesels::schema::chaindexing_reset_counts::dsl::*;

chaindexing_reset_counts.load(conn).await.unwrap()
chaindexing_reset_counts
.order_by(id.desc())
.first(conn)
.await
.optional()
.unwrap()
}

async fn create_node<'a>(conn: &mut Self::Conn<'a>) -> Node {
Expand Down
16 changes: 16 additions & 0 deletions chaindexing/src/repos/postgres_repo/raw_queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,22 @@ impl ExecutesWithRawQuery for PostgresRepo {

Self::execute_raw_query(client, &query).await;
}

async fn prune_reset_counts(client: &Self::RawQueryClient, prune_size: u64) {
let query = format!(
"
DELETE FROM chaindexing_reset_counts
WHERE id NOT IN (
SELECT id
FROM chaindexing_reset_counts
ORDER BY id DESC
LIMIT {prune_size}
)
"
);

Self::execute_raw_query(client, &query).await;
}
}

#[async_trait::async_trait]
Expand Down
5 changes: 3 additions & 2 deletions chaindexing/src/repos/repo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ pub trait Repo:
async fn get_unhandled_reorged_blocks<'a>(conn: &mut Self::Conn<'a>) -> Vec<ReorgedBlock>;

async fn create_reset_count<'a>(conn: &mut Self::Conn<'a>);
async fn get_reset_counts<'a>(conn: &mut Self::Conn<'a>) -> Vec<ResetCount>;
async fn get_last_reset_count<'a>(conn: &mut Self::Conn<'a>) -> Option<ResetCount>;

async fn create_node<'a>(conn: &mut Self::Conn<'a>) -> Node;
async fn get_active_nodes<'a>(
Expand Down Expand Up @@ -119,6 +119,7 @@ pub trait ExecutesWithRawQuery: HasRawQueryClient {
);

async fn prune_nodes(client: &Self::RawQueryClient, prune_size: u16);
async fn prune_reset_counts(client: &Self::RawQueryClient, prune_size: u64);
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -278,7 +279,7 @@ impl SQLikeMigrations {

pub fn create_reset_counts() -> &'static [&'static str] {
&["CREATE TABLE IF NOT EXISTS chaindexing_reset_counts (
id SERIAL PRIMARY KEY,
id BIGSERIAL PRIMARY KEY,
inserted_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
)"]
}
Expand Down
10 changes: 9 additions & 1 deletion chaindexing/src/reset_counts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,14 @@ use diesel::{Insertable, Queryable};
#[derive(Debug, Clone, PartialEq, Queryable, Insertable)]
#[diesel(table_name = chaindexing_reset_counts)]
pub struct ResetCount {
id: i32,
id: i64,
inserted_at: chrono::NaiveDateTime,
}

impl ResetCount {
pub fn get_count(&self) -> u64 {
self.id as u64
}
}

pub const MAX_RESET_COUNT: u64 = 1_000;

0 comments on commit 783ed0b

Please sign in to comment.