Skip to content

Commit

Permalink
indexer-alt: wal_coin_balances pipeline
Browse files Browse the repository at this point in the history
## Description

`wal_coin_balances` is to `sum_coin_balances` what `wal_obj_types` is to
`sum_obj_types`.

## Test plan

Run the indexer, and correlate the live object set calculated from the
write-ahead log against the one that's already in the summary table:

```
sui$ cargo run -p sui-indexer-alt --release --                                   \
  --database-url "postgres://postgres:postgrespw@localhost:5432/sui_indexer_alt" \
  indexer --remote-store-url https://checkpoints.mainnet.sui.io                  \
  --last-checkpoint 5000
```

```
sui_indexer_alt=# SELECT COUNT(*) FROM sum_coin_balances;
 count
-------
   178
(1 row)

sui_indexer_alt=# SELECT
    COUNT(*)
FROM (
    SELECT DISTINCT ON (object_id)
        *
    FROM
        wal_coin_balances
    ORDER BY
        object_id,
        object_version DESC
) o
WHERE
    o.owner_id IS NOT NULL;
 count
-------
   178
(1 row)
```
  • Loading branch information
amnn committed Nov 1, 2024
1 parent 1059f74 commit 04804d7
Show file tree
Hide file tree
Showing 7 changed files with 138 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE IF EXISTS wal_coin_balances;
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
-- Write-ahead log for `sum_coin_balances`.
--
-- It contains the same columns and indices as `sum_coin_balances`, but with
-- the following changes:
--
-- - A `cp_sequence_number` column (and an index on it), to support pruning by
-- checkpoint.
--
-- - The primary key includes the version, as the table may contain multiple
-- versions per object ID.
--
-- - The other fields are nullable, because this table also tracks deleted and
-- wrapped objects.
--
-- - There is an additional index on ID and version for querying the latest
-- version of every object.
--
-- This table is used in conjunction with `sum_coin_balances` to support
-- consistent live object set queries: `sum_coin_balances` holds the state of
-- the live object set at some checkpoint `C < T` where `T` is the tip of the
-- chain, and `wal_coin_balances` stores all the updates and deletes between
-- `C` and `T`.
--
-- To reconstruct the the live object set at some snapshot checkpoint `S`
-- between `C` and `T`, a query can be constructed that starts with the set
-- from `sum_coin_balances` and adds updates in `wal_coin_balances` from
-- `cp_sequence_number <= S`.
--
-- See `up.sql` for the original `sum_coin_balances` table for documentation on
-- columns.
CREATE TABLE IF NOT EXISTS wal_coin_balances
(
object_id BYTEA NOT NULL,
object_version BIGINT NOT NULL,
owner_id BYTEA,
coin_type BYTEA,
coin_balance BIGINT,
cp_sequence_number BIGINT NOT NULL,
PRIMARY KEY (object_id, object_version)
);

CREATE INDEX IF NOT EXISTS wal_coin_balances_cp_sequence_number
ON wal_coin_balances (cp_sequence_number);

CREATE INDEX IF NOT EXISTS wal_coin_balances_version
ON wal_coin_balances (object_id, object_version);

CREATE INDEX IF NOT EXISTS wal_coin_balances_owner_type
ON wal_coin_balances (owner_id, coin_type, coin_balance, object_id, object_version);
1 change: 1 addition & 0 deletions crates/sui-indexer-alt/src/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ pub mod sum_coin_balances;
pub mod sum_obj_types;
pub mod tx_affected_objects;
pub mod tx_balance_changes;
pub mod wal_coin_balances;
pub mod wal_obj_types;
59 changes: 59 additions & 0 deletions crates/sui-indexer-alt/src/handlers/wal_coin_balances.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::sync::Arc;

use anyhow::Result;
use diesel_async::RunQueryDsl;
use sui_types::full_checkpoint_content::CheckpointData;

use crate::{
db,
models::objects::{StoredObjectUpdate, StoredSumCoinBalance, StoredWalCoinBalance},
pipeline::{concurrent::Handler, Processor},
schema::wal_coin_balances,
};

use super::sum_coin_balances::SumCoinBalances;

pub struct WalCoinBalances;

impl Processor for WalCoinBalances {
const NAME: &'static str = "wal_coin_balances";

type Value = StoredObjectUpdate<StoredSumCoinBalance>;

fn process(checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
SumCoinBalances::process(checkpoint)
}
}

#[async_trait::async_trait]
impl Handler for WalCoinBalances {
const MIN_EAGER_ROWS: usize = 100;
const MAX_CHUNK_ROWS: usize = 1000;
const MAX_PENDING_ROWS: usize = 10000;

async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result<usize> {
let values: Vec<_> = values
.iter()
.map(|value| StoredWalCoinBalance {
object_id: value.object_id.to_vec(),
object_version: value.object_version as i64,

owner_id: value.update.as_ref().map(|o| o.owner_id.clone()),

coin_type: value.update.as_ref().map(|o| o.coin_type.clone()),
coin_balance: value.update.as_ref().map(|o| o.coin_balance),

cp_sequence_number: value.cp_sequence_number as i64,
})
.collect();

Ok(diesel::insert_into(wal_coin_balances::table)
.values(&values)
.on_conflict_do_nothing()
.execute(conn)
.await?)
}
}
3 changes: 2 additions & 1 deletion crates/sui-indexer-alt/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use sui_indexer_alt::{
kv_objects::KvObjects, kv_transactions::KvTransactions, obj_versions::ObjVersions,
sum_coin_balances::SumCoinBalances, sum_obj_types::SumObjTypes,
tx_affected_objects::TxAffectedObjects, tx_balance_changes::TxBalanceChanges,
wal_obj_types::WalObjTypes,
wal_coin_balances::WalCoinBalances, wal_obj_types::WalObjTypes,
},
Indexer,
};
Expand Down Expand Up @@ -44,6 +44,7 @@ async fn main() -> Result<()> {
indexer.concurrent_pipeline::<ObjVersions>().await?;
indexer.concurrent_pipeline::<TxAffectedObjects>().await?;
indexer.concurrent_pipeline::<TxBalanceChanges>().await?;
indexer.concurrent_pipeline::<WalCoinBalances>().await?;
indexer.concurrent_pipeline::<WalObjTypes>().await?;
indexer.sequential_pipeline::<SumCoinBalances>(lag).await?;
indexer.sequential_pipeline::<SumObjTypes>(lag).await?;
Expand Down
15 changes: 14 additions & 1 deletion crates/sui-indexer-alt/src/models/objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ use diesel::{
};
use sui_types::base_types::ObjectID;

use crate::schema::{kv_objects, obj_versions, sum_coin_balances, sum_obj_types, wal_obj_types};
use crate::schema::{
kv_objects, obj_versions, sum_coin_balances, sum_obj_types, wal_coin_balances, wal_obj_types,
};

#[derive(Insertable, Debug, Clone)]
#[diesel(table_name = kv_objects, primary_key(object_id, object_version))]
Expand Down Expand Up @@ -70,6 +72,17 @@ pub struct StoredSumObjType {
pub instantiation: Option<Vec<u8>>,
}

#[derive(Insertable, Debug, Clone)]
#[diesel(table_name = wal_coin_balances, primary_key(object_id, object_version))]
pub struct StoredWalCoinBalance {
pub object_id: Vec<u8>,
pub object_version: i64,
pub owner_id: Option<Vec<u8>>,
pub coin_type: Option<Vec<u8>>,
pub coin_balance: Option<i64>,
pub cp_sequence_number: i64,
}

#[derive(Insertable, Debug, Clone)]
#[diesel(table_name = wal_obj_types, primary_key(object_id, object_version))]
pub struct StoredWalObjType {
Expand Down
12 changes: 12 additions & 0 deletions crates/sui-indexer-alt/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,17 @@ diesel::table! {
}
}

diesel::table! {
wal_coin_balances (object_id, object_version) {
object_id -> Bytea,
object_version -> Int8,
owner_id -> Nullable<Bytea>,
coin_type -> Nullable<Bytea>,
coin_balance -> Nullable<Int8>,
cp_sequence_number -> Int8,
}
}

diesel::table! {
wal_obj_types (object_id, object_version) {
object_id -> Bytea,
Expand Down Expand Up @@ -134,6 +145,7 @@ diesel::allow_tables_to_appear_in_same_query!(
sum_obj_types,
tx_affected_objects,
tx_balance_changes,
wal_coin_balances,
wal_obj_types,
watermarks,
);

0 comments on commit 04804d7

Please sign in to comment.