From 2ce69cba7719cc6040be1e89b7880b33a8d17abb Mon Sep 17 00:00:00 2001 From: Emma Zhong Date: Fri, 13 Dec 2024 13:45:21 -0800 Subject: [PATCH] [indexer-alt] add prune impls for each pipeline --- crates/sui-indexer-alt/src/handlers/ev_emit_mod.rs | 11 ++++++++++- crates/sui-indexer-alt/src/handlers/ev_struct_inst.rs | 11 ++++++++++- crates/sui-indexer-alt/src/handlers/kv_checkpoints.rs | 11 ++++++++++- crates/sui-indexer-alt/src/handlers/kv_epoch_ends.rs | 11 ++++++++++- .../sui-indexer-alt/src/handlers/kv_epoch_starts.rs | 11 ++++++++++- .../sui-indexer-alt/src/handlers/kv_transactions.rs | 11 ++++++++++- .../src/handlers/tx_affected_addresses.rs | 11 ++++++++++- .../src/handlers/tx_affected_objects.rs | 11 ++++++++++- .../src/handlers/tx_balance_changes.rs | 11 ++++++++++- crates/sui-indexer-alt/src/handlers/tx_calls.rs | 11 ++++++++++- crates/sui-indexer-alt/src/handlers/tx_digests.rs | 11 ++++++++++- crates/sui-indexer-alt/src/handlers/tx_kinds.rs | 11 ++++++++++- 12 files changed, 120 insertions(+), 12 deletions(-) diff --git a/crates/sui-indexer-alt/src/handlers/ev_emit_mod.rs b/crates/sui-indexer-alt/src/handlers/ev_emit_mod.rs index b5f6f0a4b7ea3..43559cf2e1d80 100644 --- a/crates/sui-indexer-alt/src/handlers/ev_emit_mod.rs +++ b/crates/sui-indexer-alt/src/handlers/ev_emit_mod.rs @@ -4,8 +4,9 @@ use std::{collections::BTreeSet, sync::Arc}; use anyhow::Result; +use diesel::{ExpressionMethods, QueryDsl}; use diesel_async::RunQueryDsl; -use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor}; +use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor, PrunableRange}; use sui_indexer_alt_schema::{events::StoredEvEmitMod, schema::ev_emit_mod}; use sui_pg_db as db; use sui_types::full_checkpoint_content::CheckpointData; @@ -57,4 +58,12 @@ impl Handler for EvEmitMod { .execute(conn) .await?) } + + async fn prune(range: PrunableRange, conn: &mut db::Connection<'_>) -> Result { + let (from, to) = range.tx_interval(); + let filter = ev_emit_mod::table + .filter(ev_emit_mod::tx_sequence_number.between(from as i64, to as i64 - 1)); + + Ok(diesel::delete(filter).execute(conn).await?) + } } diff --git a/crates/sui-indexer-alt/src/handlers/ev_struct_inst.rs b/crates/sui-indexer-alt/src/handlers/ev_struct_inst.rs index c66d5592fe57e..bcd6955e766c9 100644 --- a/crates/sui-indexer-alt/src/handlers/ev_struct_inst.rs +++ b/crates/sui-indexer-alt/src/handlers/ev_struct_inst.rs @@ -4,8 +4,9 @@ use std::{collections::BTreeSet, sync::Arc}; use anyhow::{Context, Result}; +use diesel::{ExpressionMethods, QueryDsl}; use diesel_async::RunQueryDsl; -use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor}; +use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor, PrunableRange}; use sui_indexer_alt_schema::{events::StoredEvStructInst, schema::ev_struct_inst}; use sui_pg_db as db; use sui_types::full_checkpoint_content::CheckpointData; @@ -60,4 +61,12 @@ impl Handler for EvStructInst { .execute(conn) .await?) } + + async fn prune(range: PrunableRange, conn: &mut db::Connection<'_>) -> Result { + let (from, to) = range.tx_interval(); + let filter = ev_struct_inst::table + .filter(ev_struct_inst::tx_sequence_number.between(from as i64, to as i64 - 1)); + + Ok(diesel::delete(filter).execute(conn).await?) + } } diff --git a/crates/sui-indexer-alt/src/handlers/kv_checkpoints.rs b/crates/sui-indexer-alt/src/handlers/kv_checkpoints.rs index a9bc26a7e90f4..16cb8b3776395 100644 --- a/crates/sui-indexer-alt/src/handlers/kv_checkpoints.rs +++ b/crates/sui-indexer-alt/src/handlers/kv_checkpoints.rs @@ -4,8 +4,9 @@ use std::sync::Arc; use anyhow::{Context, Result}; +use diesel::{ExpressionMethods, QueryDsl}; use diesel_async::RunQueryDsl; -use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor}; +use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor, PrunableRange}; use sui_indexer_alt_schema::{checkpoints::StoredCheckpoint, schema::kv_checkpoints}; use sui_pg_db as db; use sui_types::full_checkpoint_content::CheckpointData; @@ -38,4 +39,12 @@ impl Handler for KvCheckpoints { .execute(conn) .await?) } + + async fn prune(range: PrunableRange, conn: &mut db::Connection<'_>) -> Result { + let (from, to) = range.checkpoint_interval(); + let filter = kv_checkpoints::table + .filter(kv_checkpoints::sequence_number.between(from as i64, to as i64 - 1)); + + Ok(diesel::delete(filter).execute(conn).await?) + } } diff --git a/crates/sui-indexer-alt/src/handlers/kv_epoch_ends.rs b/crates/sui-indexer-alt/src/handlers/kv_epoch_ends.rs index 926d9325f442e..e19821e08f8f6 100644 --- a/crates/sui-indexer-alt/src/handlers/kv_epoch_ends.rs +++ b/crates/sui-indexer-alt/src/handlers/kv_epoch_ends.rs @@ -4,8 +4,9 @@ use std::sync::Arc; use anyhow::{bail, Context, Result}; +use diesel::{ExpressionMethods, QueryDsl}; use diesel_async::RunQueryDsl; -use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor}; +use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor, PrunableRange}; use sui_indexer_alt_schema::{epochs::StoredEpochEnd, schema::kv_epoch_ends}; use sui_pg_db as db; use sui_types::{ @@ -125,4 +126,12 @@ impl Handler for KvEpochEnds { .execute(conn) .await?) } + + async fn prune(range: PrunableRange, conn: &mut db::Connection<'_>) -> Result { + let (from, to) = range.containing_epochs(); + let filter = + kv_epoch_ends::table.filter(kv_epoch_ends::epoch.between(from as i64, to as i64 - 1)); + + Ok(diesel::delete(filter).execute(conn).await?) + } } diff --git a/crates/sui-indexer-alt/src/handlers/kv_epoch_starts.rs b/crates/sui-indexer-alt/src/handlers/kv_epoch_starts.rs index bd5efcdf61463..9bffbf6661d81 100644 --- a/crates/sui-indexer-alt/src/handlers/kv_epoch_starts.rs +++ b/crates/sui-indexer-alt/src/handlers/kv_epoch_starts.rs @@ -4,8 +4,9 @@ use std::sync::Arc; use anyhow::{bail, Context, Result}; +use diesel::{ExpressionMethods, QueryDsl}; use diesel_async::RunQueryDsl; -use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor}; +use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor, PrunableRange}; use sui_indexer_alt_schema::{epochs::StoredEpochStart, schema::kv_epoch_starts}; use sui_pg_db as db; use sui_types::{ @@ -72,4 +73,12 @@ impl Handler for KvEpochStarts { .execute(conn) .await?) } + + async fn prune(range: PrunableRange, conn: &mut db::Connection<'_>) -> Result { + let (from, to) = range.containing_epochs(); + let filter = kv_epoch_starts::table + .filter(kv_epoch_starts::epoch.between(from as i64, to as i64 - 1)); + + Ok(diesel::delete(filter).execute(conn).await?) + } } diff --git a/crates/sui-indexer-alt/src/handlers/kv_transactions.rs b/crates/sui-indexer-alt/src/handlers/kv_transactions.rs index 7bef2130d8177..e50905ea992c8 100644 --- a/crates/sui-indexer-alt/src/handlers/kv_transactions.rs +++ b/crates/sui-indexer-alt/src/handlers/kv_transactions.rs @@ -4,8 +4,9 @@ use std::sync::Arc; use anyhow::{Context, Result}; +use diesel::{ExpressionMethods, QueryDsl}; use diesel_async::RunQueryDsl; -use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor}; +use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor, PrunableRange}; use sui_indexer_alt_schema::{schema::kv_transactions, transactions::StoredTransaction}; use sui_pg_db as db; use sui_types::full_checkpoint_content::CheckpointData; @@ -66,4 +67,12 @@ impl Handler for KvTransactions { .execute(conn) .await?) } + + async fn prune(range: PrunableRange, conn: &mut db::Connection<'_>) -> Result { + let (from, to) = range.checkpoint_interval(); + let filter = kv_transactions::table + .filter(kv_transactions::cp_sequence_number.between(from as i64, to as i64 - 1)); + + Ok(diesel::delete(filter).execute(conn).await?) + } } diff --git a/crates/sui-indexer-alt/src/handlers/tx_affected_addresses.rs b/crates/sui-indexer-alt/src/handlers/tx_affected_addresses.rs index 51fb7e6917b8f..47b8b63578318 100644 --- a/crates/sui-indexer-alt/src/handlers/tx_affected_addresses.rs +++ b/crates/sui-indexer-alt/src/handlers/tx_affected_addresses.rs @@ -4,9 +4,10 @@ use std::sync::Arc; use anyhow::Result; +use diesel::{ExpressionMethods, QueryDsl}; use diesel_async::RunQueryDsl; use itertools::Itertools; -use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor}; +use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor, PrunableRange}; use sui_indexer_alt_schema::{ schema::tx_affected_addresses, transactions::StoredTxAffectedAddress, }; @@ -69,4 +70,12 @@ impl Handler for TxAffectedAddresses { .execute(conn) .await?) } + + async fn prune(range: PrunableRange, conn: &mut db::Connection<'_>) -> Result { + let (from, to) = range.tx_interval(); + let filter = tx_affected_addresses::table + .filter(tx_affected_addresses::tx_sequence_number.between(from as i64, to as i64 - 1)); + + Ok(diesel::delete(filter).execute(conn).await?) + } } diff --git a/crates/sui-indexer-alt/src/handlers/tx_affected_objects.rs b/crates/sui-indexer-alt/src/handlers/tx_affected_objects.rs index c99f8dd56a49b..0e13546940be2 100644 --- a/crates/sui-indexer-alt/src/handlers/tx_affected_objects.rs +++ b/crates/sui-indexer-alt/src/handlers/tx_affected_objects.rs @@ -4,8 +4,9 @@ use std::sync::Arc; use anyhow::Result; +use diesel::{ExpressionMethods, QueryDsl}; use diesel_async::RunQueryDsl; -use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor}; +use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor, PrunableRange}; use sui_indexer_alt_schema::{schema::tx_affected_objects, transactions::StoredTxAffectedObject}; use sui_pg_db as db; use sui_types::{effects::TransactionEffectsAPI, full_checkpoint_content::CheckpointData}; @@ -59,4 +60,12 @@ impl Handler for TxAffectedObjects { .execute(conn) .await?) } + + async fn prune(range: PrunableRange, conn: &mut db::Connection<'_>) -> Result { + let (from, to) = range.tx_interval(); + let filter = tx_affected_objects::table + .filter(tx_affected_objects::tx_sequence_number.between(from as i64, to as i64 - 1)); + + Ok(diesel::delete(filter).execute(conn).await?) + } } diff --git a/crates/sui-indexer-alt/src/handlers/tx_balance_changes.rs b/crates/sui-indexer-alt/src/handlers/tx_balance_changes.rs index 31a49d33943cc..38b4d4a20c2d2 100644 --- a/crates/sui-indexer-alt/src/handlers/tx_balance_changes.rs +++ b/crates/sui-indexer-alt/src/handlers/tx_balance_changes.rs @@ -4,8 +4,9 @@ use std::{collections::BTreeMap, sync::Arc}; use anyhow::{Context, Result}; +use diesel::{ExpressionMethods, QueryDsl}; use diesel_async::RunQueryDsl; -use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor}; +use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor, PrunableRange}; use sui_indexer_alt_schema::{ schema::tx_balance_changes, transactions::{BalanceChange, StoredTxBalanceChange}, @@ -65,6 +66,14 @@ impl Handler for TxBalanceChanges { .execute(conn) .await?) } + + async fn prune(range: PrunableRange, conn: &mut db::Connection<'_>) -> Result { + let (from, to) = range.tx_interval(); + let filter = tx_balance_changes::table + .filter(tx_balance_changes::tx_sequence_number.between(from as i64, to as i64 - 1)); + + Ok(diesel::delete(filter).execute(conn).await?) + } } /// Calculate balance changes based on the object's input and output objects. diff --git a/crates/sui-indexer-alt/src/handlers/tx_calls.rs b/crates/sui-indexer-alt/src/handlers/tx_calls.rs index e189bdd9acd2d..8ae007aff7c08 100644 --- a/crates/sui-indexer-alt/src/handlers/tx_calls.rs +++ b/crates/sui-indexer-alt/src/handlers/tx_calls.rs @@ -4,8 +4,9 @@ use std::sync::Arc; use anyhow::{Ok, Result}; +use diesel::{ExpressionMethods, QueryDsl}; use diesel_async::RunQueryDsl; -use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor}; +use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor, PrunableRange}; use sui_indexer_alt_schema::{schema::tx_calls, transactions::StoredTxCalls}; use sui_pg_db as db; use sui_types::full_checkpoint_content::CheckpointData; @@ -62,4 +63,12 @@ impl Handler for TxCalls { .execute(conn) .await?) } + + async fn prune(range: PrunableRange, conn: &mut db::Connection<'_>) -> Result { + let (from, to) = range.tx_interval(); + let filter = tx_calls::table + .filter(tx_calls::tx_sequence_number.between(from as i64, to as i64 - 1)); + + Ok(diesel::delete(filter).execute(conn).await?) + } } diff --git a/crates/sui-indexer-alt/src/handlers/tx_digests.rs b/crates/sui-indexer-alt/src/handlers/tx_digests.rs index 579ec32429240..005250a578209 100644 --- a/crates/sui-indexer-alt/src/handlers/tx_digests.rs +++ b/crates/sui-indexer-alt/src/handlers/tx_digests.rs @@ -4,8 +4,9 @@ use std::sync::Arc; use anyhow::Result; +use diesel::{ExpressionMethods, QueryDsl}; use diesel_async::RunQueryDsl; -use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor}; +use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor, PrunableRange}; use sui_indexer_alt_schema::{schema::tx_digests, transactions::StoredTxDigest}; use sui_pg_db as db; use sui_types::full_checkpoint_content::CheckpointData; @@ -49,4 +50,12 @@ impl Handler for TxDigests { .execute(conn) .await?) } + + async fn prune(range: PrunableRange, conn: &mut db::Connection<'_>) -> Result { + let (from, to) = range.tx_interval(); + let filter = tx_digests::table + .filter(tx_digests::tx_sequence_number.between(from as i64, to as i64 - 1)); + + Ok(diesel::delete(filter).execute(conn).await?) + } } diff --git a/crates/sui-indexer-alt/src/handlers/tx_kinds.rs b/crates/sui-indexer-alt/src/handlers/tx_kinds.rs index 5f61e66be360f..1afbb96d2839b 100644 --- a/crates/sui-indexer-alt/src/handlers/tx_kinds.rs +++ b/crates/sui-indexer-alt/src/handlers/tx_kinds.rs @@ -4,8 +4,9 @@ use std::sync::Arc; use anyhow::Result; +use diesel::{ExpressionMethods, QueryDsl}; use diesel_async::RunQueryDsl; -use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor}; +use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor, PrunableRange}; use sui_indexer_alt_schema::{ schema::tx_kinds, transactions::{StoredKind, StoredTxKind}, @@ -60,4 +61,12 @@ impl Handler for TxKinds { .execute(conn) .await?) } + + async fn prune(range: PrunableRange, conn: &mut db::Connection<'_>) -> Result { + let (from, to) = range.tx_interval(); + let filter = tx_kinds::table + .filter(tx_kinds::tx_sequence_number.between(from as i64, to as i64 - 1)); + + Ok(diesel::delete(filter).execute(conn).await?) + } }