From c98968f224f1d5ad325a0c3661dd1c7e5871c59c Mon Sep 17 00:00:00 2001 From: Ashok Menon Date: Mon, 28 Oct 2024 15:23:08 +0000 Subject: [PATCH] indexer-alt: sum_coin_balances pipeline ## Description Similar to `sum_obj_types` in that it is tracking the live object set, but this index only covers coin objects owned by addresses, and it orders them by balance, which allows queries to return them in decreasing balance order. ## Test plan Manually run the indexer on the first 100,000 checkpoints: ``` sui$ cargo run -p sui-indexer-alt --release -- \ --database-url "postgres://postgres:postgrespw@localhost:5432/sui_indexer_alt" \ --remote-store-url https://checkpoints.mainnet.sui.io \ --last-checkpoint 100000 ``` --- .../down.sql | 1 + .../up.sql | 20 ++ crates/sui-indexer-alt/src/handlers/mod.rs | 1 + .../src/handlers/sum_coin_balances.rs | 181 ++++++++++++++++++ crates/sui-indexer-alt/src/main.rs | 6 +- crates/sui-indexer-alt/src/models/objects.rs | 12 +- crates/sui-indexer-alt/src/schema.rs | 11 ++ 7 files changed, 229 insertions(+), 3 deletions(-) create mode 100644 crates/sui-indexer-alt/migrations/2024-10-28-144002_sum_coin_balances/down.sql create mode 100644 crates/sui-indexer-alt/migrations/2024-10-28-144002_sum_coin_balances/up.sql create mode 100644 crates/sui-indexer-alt/src/handlers/sum_coin_balances.rs diff --git a/crates/sui-indexer-alt/migrations/2024-10-28-144002_sum_coin_balances/down.sql b/crates/sui-indexer-alt/migrations/2024-10-28-144002_sum_coin_balances/down.sql new file mode 100644 index 00000000000000..68b45da3c6d9a3 --- /dev/null +++ b/crates/sui-indexer-alt/migrations/2024-10-28-144002_sum_coin_balances/down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS sum_coin_balances; diff --git a/crates/sui-indexer-alt/migrations/2024-10-28-144002_sum_coin_balances/up.sql b/crates/sui-indexer-alt/migrations/2024-10-28-144002_sum_coin_balances/up.sql new file mode 100644 index 00000000000000..dbd93cc74539cd --- /dev/null +++ b/crates/sui-indexer-alt/migrations/2024-10-28-144002_sum_coin_balances/up.sql @@ -0,0 +1,20 @@ +-- A summary table for coins owned by addresses +-- +-- This can be used to paginate the coin balances of a given address at an +-- instant in time, returning coins in descending balance order. +CREATE TABLE IF NOT EXISTS sum_coin_balances +( + object_id BYTEA PRIMARY KEY, + object_version BIGINT NOT NULL, + -- The address that owns this version of the coin (it is guaranteed to be + -- address-owned). + owner_id BYTEA NOT NULL, + -- The type of the coin, as a BCS-serialized `TypeTag`. This is only the + -- marker type, and not the full object type (e.g. `0x0...02::sui::SUI`). + coin_type BYTEA NOT NULL, + -- The balance of the coin at this version. + coin_balance BIGINT NOT NULL +); + +CREATE INDEX IF NOT EXISTS sum_coin_balances_owner_type +ON sum_coin_balances (owner_id, coin_type, coin_balance, object_id, object_version); diff --git a/crates/sui-indexer-alt/src/handlers/mod.rs b/crates/sui-indexer-alt/src/handlers/mod.rs index 27ed97884ab325..58ffd14517fa03 100644 --- a/crates/sui-indexer-alt/src/handlers/mod.rs +++ b/crates/sui-indexer-alt/src/handlers/mod.rs @@ -6,6 +6,7 @@ pub mod ev_struct_inst; pub mod kv_checkpoints; pub mod kv_objects; pub mod kv_transactions; +pub mod sum_coin_balances; pub mod sum_obj_types; pub mod tx_affected_objects; pub mod tx_balance_changes; diff --git a/crates/sui-indexer-alt/src/handlers/sum_coin_balances.rs b/crates/sui-indexer-alt/src/handlers/sum_coin_balances.rs new file mode 100644 index 00000000000000..48da9318426499 --- /dev/null +++ b/crates/sui-indexer-alt/src/handlers/sum_coin_balances.rs @@ -0,0 +1,181 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use std::{ + collections::{btree_map::Entry, BTreeMap}, + sync::Arc, +}; + +use anyhow::{anyhow, bail, ensure}; +use diesel::{upsert::excluded, ExpressionMethods}; +use diesel_async::RunQueryDsl; +use futures::future::try_join_all; +use sui_types::{ + base_types::ObjectID, effects::TransactionEffectsAPI, full_checkpoint_content::CheckpointData, + object::Owner, +}; + +use crate::{ + db, + models::objects::{StoredObjectUpdate, StoredSumCoinBalance}, + pipeline::{sequential::Handler, Processor}, + schema::sum_coin_balances, +}; + +/// Each insert or update will include at most this many rows -- the size is chosen to maximize the +/// rows without hitting the limit on bind parameters. +const UPDATE_CHUNK_ROWS: usize = i16::MAX as usize / 5; + +/// Each deletion will include at most this many rows. +const DELETE_CHUNK_ROWS: usize = i16::MAX as usize; + +pub struct SumCoinBalances; + +impl Processor for SumCoinBalances { + const NAME: &'static str = "sum_coin_balances"; + + type Value = StoredObjectUpdate; + + fn process(checkpoint: &Arc) -> anyhow::Result> { + let CheckpointData { transactions, .. } = checkpoint.as_ref(); + + let mut values: BTreeMap = BTreeMap::new(); + let mut coin_types: BTreeMap> = BTreeMap::new(); + + // Iterate over transactions in reverse so we see the latest version of each object first. + for tx in transactions.iter().rev() { + // Find all coins in the transaction's inputs and outputs. + for object in tx.input_objects.iter().chain(tx.output_objects.iter()) { + if let Some(coin_type) = object.type_().and_then(|t| t.coin_type_maybe()) { + let serialized = bcs::to_bytes(&coin_type) + .map_err(|_| anyhow!("Failed to serialize type for {}", object.id()))?; + + coin_types.insert(object.id(), serialized); + } + } + + // Deleted and wrapped coins + for change in tx.effects.object_changes() { + // The object is not deleted/wrapped, or if it is it was unwrapped in the same + // transaction. + if change.output_digest.is_some() || change.input_version.is_none() { + continue; + } + + // Object is not a coin + if !coin_types.contains_key(&change.id) { + continue; + } + + let object_id = change.id; + let object_version = tx.effects.lamport_version().value(); + match values.entry(object_id) { + Entry::Occupied(entry) => { + ensure!(entry.get().object_version > object_version); + } + + Entry::Vacant(entry) => { + entry.insert(StoredObjectUpdate { + object_id, + object_version, + update: None, + }); + } + } + } + + // Modified and created coins. + for object in &tx.output_objects { + let object_id = object.id(); + let object_version = object.version().value(); + + let Some(coin_type) = coin_types.get(&object_id) else { + continue; + }; + + // Coin balance only tracks addreess-owned objects + let Owner::AddressOwner(owner_id) = object.owner() else { + continue; + }; + + let Some(coin) = object.as_coin_maybe() else { + bail!("Failed to deserialize Coin for {object_id}"); + }; + + match values.entry(object_id) { + Entry::Occupied(entry) => { + ensure!(entry.get().object_version > object_version); + } + + Entry::Vacant(entry) => { + entry.insert(StoredObjectUpdate { + object_id, + object_version, + update: Some(StoredSumCoinBalance { + object_id: object_id.to_vec(), + object_version: object_version as i64, + owner_id: owner_id.to_vec(), + coin_type: coin_type.clone(), + coin_balance: coin.balance.value() as i64, + }), + }); + } + } + } + } + + Ok(values.into_values().collect()) + } +} + +#[async_trait::async_trait] +impl Handler for SumCoinBalances { + type Batch = BTreeMap; + + fn batch(batch: &mut Self::Batch, updates: Vec) { + // `updates` are guaranteed to be provided in checkpoint order, so blindly inserting them + // will result in the batch containing the most up-to-date update for each object. + for update in updates { + batch.insert(update.object_id, update); + } + } + + async fn commit(batch: &Self::Batch, conn: &mut db::Connection<'_>) -> anyhow::Result { + let mut updates = vec![]; + let mut deletes = vec![]; + + for update in batch.values() { + if let Some(update) = &update.update { + updates.push(update.clone()); + } else { + deletes.push(update.object_id.to_vec()); + } + } + + let update_chunks = updates.chunks(UPDATE_CHUNK_ROWS).map(|chunk| { + diesel::insert_into(sum_coin_balances::table) + .values(chunk) + .on_conflict(sum_coin_balances::object_id) + .do_update() + .set(( + sum_coin_balances::object_version + .eq(excluded(sum_coin_balances::object_version)), + sum_coin_balances::owner_id.eq(excluded(sum_coin_balances::owner_id)), + sum_coin_balances::coin_balance.eq(excluded(sum_coin_balances::coin_balance)), + )) + .execute(conn) + }); + + let updated: usize = try_join_all(update_chunks).await?.into_iter().sum(); + + let delete_chunks = deletes.chunks(DELETE_CHUNK_ROWS).map(|chunk| { + diesel::delete(sum_coin_balances::table) + .filter(sum_coin_balances::object_id.eq_any(chunk.iter().cloned())) + .execute(conn) + }); + + let deleted: usize = try_join_all(delete_chunks).await?.into_iter().sum(); + + Ok(updated + deleted) + } +} diff --git a/crates/sui-indexer-alt/src/main.rs b/crates/sui-indexer-alt/src/main.rs index 8b3b9f462c2006..e1b4ed75486df1 100644 --- a/crates/sui-indexer-alt/src/main.rs +++ b/crates/sui-indexer-alt/src/main.rs @@ -9,8 +9,9 @@ use sui_indexer_alt::{ args::Args, handlers::{ ev_emit_mod::EvEmitMod, ev_struct_inst::EvStructInst, kv_checkpoints::KvCheckpoints, - kv_objects::KvObjects, kv_transactions::KvTransactions, sum_obj_types::SumObjTypes, - tx_affected_objects::TxAffectedObjects, tx_balance_changes::TxBalanceChanges, + kv_objects::KvObjects, kv_transactions::KvTransactions, sum_coin_balances::SumCoinBalances, + sum_obj_types::SumObjTypes, tx_affected_objects::TxAffectedObjects, + tx_balance_changes::TxBalanceChanges, }, Indexer, }; @@ -38,6 +39,7 @@ async fn main() -> Result<()> { indexer.concurrent_pipeline::().await?; indexer.concurrent_pipeline::().await?; indexer.concurrent_pipeline::().await?; + indexer.sequential_pipeline::().await?; indexer.sequential_pipeline::().await?; let h_indexer = indexer.run().await.context("Failed to start indexer")?; diff --git a/crates/sui-indexer-alt/src/models/objects.rs b/crates/sui-indexer-alt/src/models/objects.rs index b221abc5c7ba63..18b1b52553dd97 100644 --- a/crates/sui-indexer-alt/src/models/objects.rs +++ b/crates/sui-indexer-alt/src/models/objects.rs @@ -7,7 +7,7 @@ use diesel::{ }; use sui_types::base_types::ObjectID; -use crate::schema::{kv_objects, sum_obj_types}; +use crate::schema::{kv_objects, sum_coin_balances, sum_obj_types}; #[derive(Insertable, Debug, Clone)] #[diesel(table_name = kv_objects, primary_key(object_id, object_version))] @@ -37,6 +37,16 @@ pub enum StoredOwnerKind { Shared = 3, } +#[derive(Insertable, Debug, Clone)] +#[diesel(table_name = sum_coin_balances, primary_key(object_id))] +pub struct StoredSumCoinBalance { + pub object_id: Vec, + pub object_version: i64, + pub owner_id: Vec, + pub coin_type: Vec, + pub coin_balance: i64, +} + #[derive(Insertable, Debug, Clone)] #[diesel(table_name = sum_obj_types, primary_key(object_id))] pub struct StoredSumObjType { diff --git a/crates/sui-indexer-alt/src/schema.rs b/crates/sui-indexer-alt/src/schema.rs index 259d752dfc3c5b..2478427374c8fa 100644 --- a/crates/sui-indexer-alt/src/schema.rs +++ b/crates/sui-indexer-alt/src/schema.rs @@ -49,6 +49,16 @@ diesel::table! { } } +diesel::table! { + sum_coin_balances (object_id) { + object_id -> Bytea, + object_version -> Int8, + owner_id -> Bytea, + coin_type -> Bytea, + coin_balance -> Int8, + } +} + diesel::table! { sum_obj_types (object_id) { object_id -> Bytea, @@ -96,6 +106,7 @@ diesel::allow_tables_to_appear_in_same_query!( kv_checkpoints, kv_objects, kv_transactions, + sum_coin_balances, sum_obj_types, tx_affected_objects, tx_balance_changes,