Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

indexer-alt: sum_obj_types pipeline #20054

Merged
merged 5 commits into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Down-migration: remove the live-object summary table created by the
-- corresponding up-migration.
DROP TABLE IF EXISTS sum_obj_types;
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
-- A summary table of live objects, with owner and type information
--
-- This can be used to paginate the live object set at an instant in time,
-- filtering by a combination of owner and/or type.
CREATE TABLE IF NOT EXISTS sum_obj_types
(
    object_id                   BYTEA         PRIMARY KEY,
    object_version              BIGINT        NOT NULL,
    -- An enum describing the object's ownership model:
    --
    --   Immutable = 0,
    --   Address-owned = 1,
    --   Object-owned (dynamic field) = 2,
    --   Shared = 3.
    --
    -- Note that there is a distinction between an object that is owned by
    -- another object (kind 2), which relates to dynamic fields, and an object
    -- that is owned by another object's address (kind 1), which relates to
    -- transfer-to-object.
    owner_kind                  SMALLINT      NOT NULL,
    -- The address for address-owned objects, and the parent object for
    -- object-owned objects.
    owner_id                    BYTEA,
    -- The following fields relate to the object's type. These only apply to
    -- Move Objects. For Move Packages they will all be NULL.
    --
    -- The type's package ID.
    package                     BYTEA,
    -- The type's module name.
    module                      TEXT,
    -- The type's name.
    name                        TEXT,
    -- The type's type parameters, as a BCS-encoded array of TypeTags.
    instantiation               BYTEA
);

-- Supports paginating the live object set filtered by owner.
CREATE INDEX IF NOT EXISTS sum_obj_types_owner
ON sum_obj_types (owner_kind, owner_id, object_id, object_version);

-- Supports paginating the live object set filtered by package.
-- object_version trails the key so queries that also read the version can be
-- satisfied by an index-only scan.
CREATE INDEX IF NOT EXISTS sum_obj_types_pkg
ON sum_obj_types (package, object_id, object_version);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to include the object_version in these indices, since this only tracks the live objects set?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think you're right, we can get rid of it -- I added it for ordering purposes, originally.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I take it back -- we need the version in the index because we are querying for it. By including the version in the index we can keep this as an index-only scan.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about

CREATE INDEX IF NOT EXISTS sum_obj_types_pkg
ON sum_obj_types (package, object_id) INCLUDE (object_version)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only benefit of doing this is if the index is otherwise unique on (package, object_id), which we do not need, so I prefer to use the form that doesn't require the extra SQL features.


-- Supports filtering/paginating by package and module.
CREATE INDEX IF NOT EXISTS sum_obj_types_mod
ON sum_obj_types (package, module, object_id, object_version);

-- Supports filtering/paginating by fully-qualified type name.
CREATE INDEX IF NOT EXISTS sum_obj_types_name
ON sum_obj_types (package, module, name, object_id, object_version);

-- Supports filtering/paginating by an exact type instantiation (name plus
-- BCS-encoded type parameters).
CREATE INDEX IF NOT EXISTS sum_obj_types_inst
ON sum_obj_types (package, module, name, instantiation, object_id, object_version);

-- The remaining indices support the same type filters combined with an owner
-- filter. (object_id, object_version) trail each key so version look-ups can
-- remain index-only scans.
CREATE INDEX IF NOT EXISTS sum_obj_types_owner_pkg
ON sum_obj_types (owner_kind, owner_id, package, object_id, object_version);

CREATE INDEX IF NOT EXISTS sum_obj_types_owner_mod
ON sum_obj_types (owner_kind, owner_id, package, module, object_id, object_version);

CREATE INDEX IF NOT EXISTS sum_obj_types_owner_name
ON sum_obj_types (owner_kind, owner_id, package, module, name, object_id, object_version);

CREATE INDEX IF NOT EXISTS sum_obj_types_owner_inst
ON sum_obj_types (owner_kind, owner_id, package, module, name, instantiation, object_id, object_version);
1 change: 1 addition & 0 deletions crates/sui-indexer-alt/src/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@ pub mod ev_struct_inst;
pub mod kv_checkpoints;
pub mod kv_objects;
pub mod kv_transactions;
pub mod sum_obj_types;
pub mod tx_affected_objects;
pub mod tx_balance_changes;
195 changes: 195 additions & 0 deletions crates/sui-indexer-alt/src/handlers/sum_obj_types.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{
collections::{btree_map::Entry, BTreeMap},
sync::Arc,
};

use anyhow::{anyhow, ensure};
use diesel::{upsert::excluded, ExpressionMethods};
use diesel_async::RunQueryDsl;
use futures::future::try_join_all;
use sui_types::{
effects::TransactionEffectsAPI, full_checkpoint_content::CheckpointData, object::Owner,
};

use crate::{
db,
models::objects::{StoredOwnerKind, StoredSumObjType},
pipeline::{sequential::Handler, Processor},
schema::sum_obj_types,
};

/// Each insert or update will include at most this many rows -- the size is chosen to maximize the
/// rows without hitting the limit on bind parameters (the table has eight columns, so each row
/// consumes eight bind parameters).
const UPDATE_CHUNK_ROWS: usize = i16::MAX as usize / 8;

/// Each deletion will include at most this many rows (deletions bind only the object ID).
const DELETE_CHUNK_ROWS: usize = i16::MAX as usize;

/// Pipeline handler that maintains the `sum_obj_types` summary table of live objects.
pub struct SumObjTypes;

/// A row to remove from `sum_obj_types`: the object's ID plus the version at
/// which the deletion was observed (used for ordering against other updates).
#[derive(Clone)]
pub struct DeletedSumObjType {
    object_id: Vec<u8>,
    object_version: i64,
}

/// The latest state of an object within a batch: either an upserted row or a
/// deletion.
pub enum Update {
    Update(StoredSumObjType),
    Delete(DeletedSumObjType),
}

impl Update {
    /// The ID of the object this update concerns, as an owned copy.
    pub fn object_id(&self) -> Vec<u8> {
        let id = match self {
            Update::Update(stored) => &stored.object_id,
            Update::Delete(deleted) => &deleted.object_id,
        };
        id.clone()
    }

    /// The version the object was at when this update was recorded.
    pub fn object_version(&self) -> i64 {
        match self {
            Update::Update(stored) => stored.object_version,
            Update::Delete(deleted) => deleted.object_version,
        }
    }
}

impl Processor for SumObjTypes {
const NAME: &'static str = "sum_obj_types";

type Value = Update;

fn process(checkpoint: &Arc<CheckpointData>) -> anyhow::Result<Vec<Self::Value>> {
let CheckpointData { transactions, .. } = checkpoint.as_ref();

let mut values: BTreeMap<Vec<u8>, Update> = BTreeMap::new();

// Iterate over transactions in reverse so we see the latest version of each object first.
for tx in transactions.iter().rev() {
// Deleted and wrapped objects -- objects that show up without a digest in
// `object_changes` are either deleted or wrapped. Objects without an input version
// must have been unwrapped and deleted, meaning they do not need to be deleted from
amnn marked this conversation as resolved.
Show resolved Hide resolved
// our records.
for change in tx.effects.object_changes() {
if change.output_digest.is_some() || change.input_version.is_none() {
amnn marked this conversation as resolved.
Show resolved Hide resolved
continue;
}

let object_id = change.id.to_vec();
let object_version = tx.effects.lamport_version().value() as i64;
match values.entry(object_id.clone()) {
Entry::Occupied(entry) => {
ensure!(entry.get().object_version() > object_version);
}

Entry::Vacant(entry) => {
entry.insert(Update::Delete(DeletedSumObjType {
object_id,
object_version,
}));
}
}
}

// Modified and created objects.
for object in &tx.output_objects {
let object_id = object.id().to_vec();
let object_version = object.version().value() as i64;
match values.entry(object_id.clone()) {
Entry::Occupied(entry) => {
ensure!(entry.get().object_version() > object_version);
}

Entry::Vacant(entry) => {
let type_ = object.type_();
entry.insert(Update::Update(StoredSumObjType {
object_id,
object_version,

owner_kind: match object.owner() {
Owner::AddressOwner(_) => StoredOwnerKind::Address,
Owner::ObjectOwner(_) => StoredOwnerKind::Object,
Owner::Shared { .. } => StoredOwnerKind::Shared,
Owner::Immutable => StoredOwnerKind::Immutable,
},

owner_id: match object.owner() {
Owner::AddressOwner(a) => Some(a.to_vec()),
Owner::ObjectOwner(o) => Some(o.to_vec()),
_ => None,
},

package: type_.map(|t| t.address().to_vec()),
module: type_.map(|t| t.module().to_string()),
name: type_.map(|t| t.name().to_string()),
instantiation: type_
.map(|t| bcs::to_bytes(&t.type_params()))
.transpose()
.map_err(|e| {
anyhow!(
"Failed to serialize type parameters for {}: {e}",
object.id().to_canonical_display(/* with_prefix */ true),
)
})?,
}));
}
}
}
}

Ok(values.into_values().collect())
}
}

#[async_trait::async_trait]
impl Handler for SumObjTypes {
type Batch = BTreeMap<Vec<u8>, Update>;

fn batch(batch: &mut Self::Batch, updates: Vec<Self::Value>) {
// `updates` are guaranteed to be provided in checkpoint order, so blindly inserting them
// will result in the batch containing the most up-to-date update for each object.
for update in updates {
batch.insert(update.object_id(), update);
}
}
amnn marked this conversation as resolved.
Show resolved Hide resolved

async fn commit(values: &Self::Batch, conn: &mut db::Connection<'_>) -> anyhow::Result<usize> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you remind me why you chose a conn instead of a pool here, as naively try_join_all will interleave on execution?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The connection is tied to a transaction -- all these commits are done atomically, alongside the watermark update. This is required for consistency but something that the current indexer does not do.

let mut updates = vec![];
let mut deletes = vec![];

for update in values.values() {
match update {
Update::Update(value) => updates.push(value.clone()),
Update::Delete(value) => deletes.push(value.clone()),
}
}

let update_chunks = updates.chunks(UPDATE_CHUNK_ROWS).map(|chunk| {
diesel::insert_into(sum_obj_types::table)
.values(chunk)
.on_conflict(sum_obj_types::object_id)
.do_update()
.set((
sum_obj_types::object_version.eq(excluded(sum_obj_types::object_version)),
sum_obj_types::owner_kind.eq(excluded(sum_obj_types::owner_kind)),
sum_obj_types::owner_id.eq(excluded(sum_obj_types::owner_id)),
))
.execute(conn)
});

let updated: usize = try_join_all(update_chunks).await?.into_iter().sum();

let delete_chunks = deletes.chunks(DELETE_CHUNK_ROWS).map(|chunk| {
diesel::delete(sum_obj_types::table)
.filter(sum_obj_types::object_id.eq_any(chunk.iter().map(|d| d.object_id.clone())))
.execute(conn)
});

let deleted: usize = try_join_all(delete_chunks).await?.into_iter().sum();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here updates and deletions are handled separately and sequentically, which dilutes the tuning power of BATCH_SIZE and likely generate small DB commit especially for deletions, wdyt of splitting updates and deletions earlier?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I understand the suggestion (to split updates and deletions earlier) could you elaborate?

The reason why I ended up having to split them up here was because of access to the connection -- Rust did not like me sharing it between two closures (the map over update chunks, and the map over deletion chunks), because it is a &mut.

There are other ways around this, where we split the updates, chunk them up, and then recombine them into a single stream of updates. Then we can map over these updates in a single pass, using one closure that can take ownership of the conn. We can do that if we notice this is a bottleneck.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, I saw the &mut conn sharing issue too; and by splitting earlier, I meant having separate functions for commit_mutations and commit_deletions, so that we can batch the mutations and deletions "earlier" and avoid committing very small deletion chunks b/c deletion count is often smaller than mutation count https://metrics.sui.io/goto/OcJUs1WNg?orgId=1 It makes the trait more verbose indeed so it has pros and cons.


Ok(updated + deleted)
}
}
3 changes: 2 additions & 1 deletion crates/sui-indexer-alt/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use sui_indexer_alt::{
args::Args,
handlers::{
ev_emit_mod::EvEmitMod, ev_struct_inst::EvStructInst, kv_checkpoints::KvCheckpoints,
kv_objects::KvObjects, kv_transactions::KvTransactions,
kv_objects::KvObjects, kv_transactions::KvTransactions, sum_obj_types::SumObjTypes,
tx_affected_objects::TxAffectedObjects, tx_balance_changes::TxBalanceChanges,
},
Indexer,
Expand Down Expand Up @@ -38,6 +38,7 @@ async fn main() -> Result<()> {
indexer.concurrent_pipeline::<KvTransactions>().await?;
indexer.concurrent_pipeline::<TxAffectedObjects>().await?;
indexer.concurrent_pipeline::<TxBalanceChanges>().await?;
indexer.sequential_pipeline::<SumObjTypes>().await?;

let h_indexer = indexer.run().await.context("Failed to start indexer")?;

Expand Down
60 changes: 58 additions & 2 deletions crates/sui-indexer-alt/src/models/objects.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::schema::kv_objects;
use diesel::prelude::*;
use diesel::{
backend::Backend, deserialize, expression::AsExpression, prelude::*, serialize,
sql_types::SmallInt, FromSqlRow,
};

use crate::schema::{kv_objects, sum_obj_types};

#[derive(Insertable, Debug, Clone)]
#[diesel(table_name = kv_objects, primary_key(object_id, object_version))]
Expand All @@ -11,3 +15,55 @@ pub struct StoredObject {
pub object_version: i64,
pub serialized_object: Option<Vec<u8>>,
}

/// An object's ownership model, stored as a `SMALLINT` column. Discriminants
/// must match the values documented in the `sum_obj_types` migration and the
/// `ToSql`/`FromSql` impls below.
#[derive(AsExpression, FromSqlRow, Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
#[diesel(sql_type = SmallInt)]
#[repr(i16)]
pub enum StoredOwnerKind {
    Immutable = 0,
    Address = 1,
    Object = 2,
    Shared = 3,
}

/// A row of the `sum_obj_types` summary table: one live object with its
/// ownership and (for Move objects) type information.
#[derive(Insertable, Debug, Clone)]
#[diesel(table_name = sum_obj_types, primary_key(object_id))]
pub struct StoredSumObjType {
    pub object_id: Vec<u8>,
    pub object_version: i64,
    pub owner_kind: StoredOwnerKind,
    // The owning address (address-owned) or parent object ID (object-owned);
    // None for immutable and shared objects.
    pub owner_id: Option<Vec<u8>>,
    // The type columns below are all None for Move Packages.
    pub package: Option<Vec<u8>>,
    pub module: Option<String>,
    pub name: Option<String>,
    // BCS-encoded array of the type's TypeTag parameters.
    pub instantiation: Option<Vec<u8>>,
}

/// Serialize a `StoredOwnerKind` as its `SMALLINT` discriminant.
impl<DB: Backend> serialize::ToSql<SmallInt, DB> for StoredOwnerKind
where
    i16: serialize::ToSql<SmallInt, DB>,
{
    fn to_sql<'b>(&'b self, out: &mut serialize::Output<'b, '_, DB>) -> serialize::Result {
        // Literal discriminants are used here because integer literals undergo
        // constant promotion to 'static, satisfying the `'b` borrow `to_sql`
        // requires (a local `i16` variable would not). Values must stay in
        // sync with `FromSql` and the migration's documentation.
        match self {
            StoredOwnerKind::Immutable => 0.to_sql(out),
            StoredOwnerKind::Address => 1.to_sql(out),
            StoredOwnerKind::Object => 2.to_sql(out),
            StoredOwnerKind::Shared => 3.to_sql(out),
        }
    }
}

/// Decode a `StoredOwnerKind` from its `SMALLINT` discriminant, rejecting
/// values outside the known range.
impl<DB: Backend> deserialize::FromSql<SmallInt, DB> for StoredOwnerKind
where
    i16: deserialize::FromSql<SmallInt, DB>,
{
    fn from_sql(raw: DB::RawValue<'_>) -> deserialize::Result<Self> {
        match i16::from_sql(raw)? {
            0 => Ok(Self::Immutable),
            1 => Ok(Self::Address),
            2 => Ok(Self::Object),
            3 => Ok(Self::Shared),
            o => Err(format!("Unexpected StoredOwnerKind: {o}").into()),
        }
    }
}
14 changes: 14 additions & 0 deletions crates/sui-indexer-alt/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,19 @@ diesel::table! {
}
}

// Diesel schema for the `sum_obj_types` summary of the live object set, keyed
// by object ID. See the corresponding migration for column documentation.
// (Plain `//` comments are stripped before macro expansion, so they are safe
// inside the `table!` invocation.)
diesel::table! {
    sum_obj_types (object_id) {
        object_id -> Bytea,
        object_version -> Int8,
        // Discriminant of `StoredOwnerKind`.
        owner_kind -> Int2,
        owner_id -> Nullable<Bytea>,
        // Type columns; NULL for Move Packages.
        package -> Nullable<Bytea>,
        module -> Nullable<Text>,
        name -> Nullable<Text>,
        instantiation -> Nullable<Bytea>,
    }
}

diesel::table! {
tx_affected_objects (affected, tx_sequence_number) {
tx_sequence_number -> Int8,
Expand Down Expand Up @@ -83,6 +96,7 @@ diesel::allow_tables_to_appear_in_same_query!(
kv_checkpoints,
kv_objects,
kv_transactions,
sum_obj_types,
tx_affected_objects,
tx_balance_changes,
watermarks,
Expand Down
Loading