Skip to content

Commit

Permalink
indexer-alt: sum_coin_balances pipeline
Browse files Browse the repository at this point in the history
## Description

Similar to `sum_obj_types` in that it is tracking the live object set,
but this index only covers coin objects owned by addresses, and it
orders them by balance, which allows queries to return them in
decreasing balance order.

## Test plan

Manually run the indexer on the first 100,000 checkpoints:

```
sui$ cargo run -p sui-indexer-alt --release --                                   \
  --database-url "postgres://postgres:postgrespw@localhost:5432/sui_indexer_alt" \
  --remote-store-url https://checkpoints.mainnet.sui.io                          \
  --last-checkpoint 100000
```
  • Loading branch information
amnn committed Oct 29, 2024
1 parent f8895f5 commit c98968f
Show file tree
Hide file tree
Showing 7 changed files with 229 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE IF EXISTS sum_coin_balances;
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
-- A summary table for coins owned by addresses
--
-- This can be used to paginate the coin balances of a given address at an
-- instant in time, returning coins in descending balance order.
CREATE TABLE IF NOT EXISTS sum_coin_balances
(
object_id BYTEA PRIMARY KEY,
object_version BIGINT NOT NULL,
-- The address that owns this version of the coin (it is guaranteed to be
-- address-owned).
owner_id BYTEA NOT NULL,
-- The type of the coin, as a BCS-serialized `TypeTag`. This is only the
-- marker type, and not the full object type (e.g. `0x0...02::sui::SUI`).
coin_type BYTEA NOT NULL,
-- The balance of the coin at this version.
coin_balance BIGINT NOT NULL
);

CREATE INDEX IF NOT EXISTS sum_coin_balances_owner_type
ON sum_coin_balances (owner_id, coin_type, coin_balance, object_id, object_version);
1 change: 1 addition & 0 deletions crates/sui-indexer-alt/src/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub mod ev_struct_inst;
pub mod kv_checkpoints;
pub mod kv_objects;
pub mod kv_transactions;
pub mod sum_coin_balances;
pub mod sum_obj_types;
pub mod tx_affected_objects;
pub mod tx_balance_changes;
181 changes: 181 additions & 0 deletions crates/sui-indexer-alt/src/handlers/sum_coin_balances.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{
collections::{btree_map::Entry, BTreeMap},
sync::Arc,
};

use anyhow::{anyhow, bail, ensure};
use diesel::{upsert::excluded, ExpressionMethods};
use diesel_async::RunQueryDsl;
use futures::future::try_join_all;
use sui_types::{
base_types::ObjectID, effects::TransactionEffectsAPI, full_checkpoint_content::CheckpointData,
object::Owner,
};

use crate::{
db,
models::objects::{StoredObjectUpdate, StoredSumCoinBalance},
pipeline::{sequential::Handler, Processor},
schema::sum_coin_balances,
};

/// Each insert or update will include at most this many rows -- the size is chosen to maximize the
/// rows without hitting the limit on bind parameters.
const UPDATE_CHUNK_ROWS: usize = i16::MAX as usize / 5;

/// Each deletion will include at most this many rows.
const DELETE_CHUNK_ROWS: usize = i16::MAX as usize;

pub struct SumCoinBalances;

impl Processor for SumCoinBalances {
const NAME: &'static str = "sum_coin_balances";

type Value = StoredObjectUpdate<StoredSumCoinBalance>;

fn process(checkpoint: &Arc<CheckpointData>) -> anyhow::Result<Vec<Self::Value>> {
let CheckpointData { transactions, .. } = checkpoint.as_ref();

let mut values: BTreeMap<ObjectID, Self::Value> = BTreeMap::new();
let mut coin_types: BTreeMap<ObjectID, Vec<u8>> = BTreeMap::new();

// Iterate over transactions in reverse so we see the latest version of each object first.
for tx in transactions.iter().rev() {
// Find all coins in the transaction's inputs and outputs.
for object in tx.input_objects.iter().chain(tx.output_objects.iter()) {
if let Some(coin_type) = object.type_().and_then(|t| t.coin_type_maybe()) {
let serialized = bcs::to_bytes(&coin_type)
.map_err(|_| anyhow!("Failed to serialize type for {}", object.id()))?;

coin_types.insert(object.id(), serialized);
}
}

// Deleted and wrapped coins
for change in tx.effects.object_changes() {
// The object is not deleted/wrapped, or if it is it was unwrapped in the same
// transaction.
if change.output_digest.is_some() || change.input_version.is_none() {
continue;
}

// Object is not a coin
if !coin_types.contains_key(&change.id) {
continue;
}

let object_id = change.id;
let object_version = tx.effects.lamport_version().value();
match values.entry(object_id) {
Entry::Occupied(entry) => {
ensure!(entry.get().object_version > object_version);
}

Entry::Vacant(entry) => {
entry.insert(StoredObjectUpdate {
object_id,
object_version,
update: None,
});
}
}
}

// Modified and created coins.
for object in &tx.output_objects {
let object_id = object.id();
let object_version = object.version().value();

let Some(coin_type) = coin_types.get(&object_id) else {
continue;
};

// Coin balance only tracks addreess-owned objects
let Owner::AddressOwner(owner_id) = object.owner() else {
continue;
};

let Some(coin) = object.as_coin_maybe() else {
bail!("Failed to deserialize Coin for {object_id}");
};

match values.entry(object_id) {
Entry::Occupied(entry) => {
ensure!(entry.get().object_version > object_version);
}

Entry::Vacant(entry) => {
entry.insert(StoredObjectUpdate {
object_id,
object_version,
update: Some(StoredSumCoinBalance {
object_id: object_id.to_vec(),
object_version: object_version as i64,
owner_id: owner_id.to_vec(),
coin_type: coin_type.clone(),
coin_balance: coin.balance.value() as i64,
}),
});
}
}
}
}

Ok(values.into_values().collect())
}
}

#[async_trait::async_trait]
impl Handler for SumCoinBalances {
type Batch = BTreeMap<ObjectID, Self::Value>;

fn batch(batch: &mut Self::Batch, updates: Vec<Self::Value>) {
// `updates` are guaranteed to be provided in checkpoint order, so blindly inserting them
// will result in the batch containing the most up-to-date update for each object.
for update in updates {
batch.insert(update.object_id, update);
}
}

async fn commit(batch: &Self::Batch, conn: &mut db::Connection<'_>) -> anyhow::Result<usize> {
let mut updates = vec![];
let mut deletes = vec![];

for update in batch.values() {
if let Some(update) = &update.update {
updates.push(update.clone());
} else {
deletes.push(update.object_id.to_vec());
}
}

let update_chunks = updates.chunks(UPDATE_CHUNK_ROWS).map(|chunk| {
diesel::insert_into(sum_coin_balances::table)
.values(chunk)
.on_conflict(sum_coin_balances::object_id)
.do_update()
.set((
sum_coin_balances::object_version
.eq(excluded(sum_coin_balances::object_version)),
sum_coin_balances::owner_id.eq(excluded(sum_coin_balances::owner_id)),
sum_coin_balances::coin_balance.eq(excluded(sum_coin_balances::coin_balance)),
))
.execute(conn)
});

let updated: usize = try_join_all(update_chunks).await?.into_iter().sum();

let delete_chunks = deletes.chunks(DELETE_CHUNK_ROWS).map(|chunk| {
diesel::delete(sum_coin_balances::table)
.filter(sum_coin_balances::object_id.eq_any(chunk.iter().cloned()))
.execute(conn)
});

let deleted: usize = try_join_all(delete_chunks).await?.into_iter().sum();

Ok(updated + deleted)
}
}
6 changes: 4 additions & 2 deletions crates/sui-indexer-alt/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ use sui_indexer_alt::{
args::Args,
handlers::{
ev_emit_mod::EvEmitMod, ev_struct_inst::EvStructInst, kv_checkpoints::KvCheckpoints,
kv_objects::KvObjects, kv_transactions::KvTransactions, sum_obj_types::SumObjTypes,
tx_affected_objects::TxAffectedObjects, tx_balance_changes::TxBalanceChanges,
kv_objects::KvObjects, kv_transactions::KvTransactions, sum_coin_balances::SumCoinBalances,
sum_obj_types::SumObjTypes, tx_affected_objects::TxAffectedObjects,
tx_balance_changes::TxBalanceChanges,
},
Indexer,
};
Expand Down Expand Up @@ -38,6 +39,7 @@ async fn main() -> Result<()> {
indexer.concurrent_pipeline::<KvTransactions>().await?;
indexer.concurrent_pipeline::<TxAffectedObjects>().await?;
indexer.concurrent_pipeline::<TxBalanceChanges>().await?;
indexer.sequential_pipeline::<SumCoinBalances>().await?;
indexer.sequential_pipeline::<SumObjTypes>().await?;

let h_indexer = indexer.run().await.context("Failed to start indexer")?;
Expand Down
12 changes: 11 additions & 1 deletion crates/sui-indexer-alt/src/models/objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use diesel::{
};
use sui_types::base_types::ObjectID;

use crate::schema::{kv_objects, sum_obj_types};
use crate::schema::{kv_objects, sum_coin_balances, sum_obj_types};

#[derive(Insertable, Debug, Clone)]
#[diesel(table_name = kv_objects, primary_key(object_id, object_version))]
Expand Down Expand Up @@ -37,6 +37,16 @@ pub enum StoredOwnerKind {
Shared = 3,
}

#[derive(Insertable, Debug, Clone)]
#[diesel(table_name = sum_coin_balances, primary_key(object_id))]
pub struct StoredSumCoinBalance {
pub object_id: Vec<u8>,
pub object_version: i64,
pub owner_id: Vec<u8>,
pub coin_type: Vec<u8>,
pub coin_balance: i64,
}

#[derive(Insertable, Debug, Clone)]
#[diesel(table_name = sum_obj_types, primary_key(object_id))]
pub struct StoredSumObjType {
Expand Down
11 changes: 11 additions & 0 deletions crates/sui-indexer-alt/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,16 @@ diesel::table! {
}
}

diesel::table! {
sum_coin_balances (object_id) {
object_id -> Bytea,
object_version -> Int8,
owner_id -> Bytea,
coin_type -> Bytea,
coin_balance -> Int8,
}
}

diesel::table! {
sum_obj_types (object_id) {
object_id -> Bytea,
Expand Down Expand Up @@ -96,6 +106,7 @@ diesel::allow_tables_to_appear_in_same_query!(
kv_checkpoints,
kv_objects,
kv_transactions,
sum_coin_balances,
sum_obj_types,
tx_affected_objects,
tx_balance_changes,
Expand Down

0 comments on commit c98968f

Please sign in to comment.