diff --git a/Cargo.lock b/Cargo.lock index 5e2ec17199542d..ef418f5e9de4f1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -14034,6 +14034,7 @@ dependencies = [ "futures", "hex", "itertools 0.13.0", + "move-core-types", "rand 0.8.5", "serde", "sui-default-config", diff --git a/crates/sui-indexer-alt-schema/src/objects.rs b/crates/sui-indexer-alt-schema/src/objects.rs index 849960d86de78e..093b310f5e1b29 100644 --- a/crates/sui-indexer-alt-schema/src/objects.rs +++ b/crates/sui-indexer-alt-schema/src/objects.rs @@ -21,7 +21,7 @@ pub struct StoredObject { pub serialized_object: Option>, } -#[derive(Insertable, Debug, Clone, FieldCount)] +#[derive(Insertable, QueryableByName, Selectable, Debug, Clone, FieldCount)] #[diesel(table_name = obj_versions, primary_key(object_id, object_version))] pub struct StoredObjVersion { pub object_id: Vec, diff --git a/crates/sui-indexer-alt/Cargo.toml b/crates/sui-indexer-alt/Cargo.toml index 1291f5173a3055..8b7a6a2ffb4cdc 100644 --- a/crates/sui-indexer-alt/Cargo.toml +++ b/crates/sui-indexer-alt/Cargo.toml @@ -22,6 +22,7 @@ diesel_migrations.workspace = true futures.workspace = true hex.workspace = true itertools.workspace = true +move-core-types.workspace = true serde.workspace = true telemetry-subscribers.workspace = true tokio.workspace = true diff --git a/crates/sui-indexer-alt/src/lib.rs b/crates/sui-indexer-alt/src/lib.rs index 3a1c477f26395d..7ac31b362c0963 100644 --- a/crates/sui-indexer-alt/src/lib.rs +++ b/crates/sui-indexer-alt/src/lib.rs @@ -33,6 +33,7 @@ pub(crate) mod bootstrap; pub mod config; pub(crate) mod consistent_pruning; pub(crate) mod handlers; +pub mod queries; pub async fn start_indexer( db_args: DbArgs, diff --git a/crates/sui-indexer-alt/src/queries/mod.rs b/crates/sui-indexer-alt/src/queries/mod.rs new file mode 100644 index 00000000000000..db6fff9e012bbc --- /dev/null +++ b/crates/sui-indexer-alt/src/queries/mod.rs @@ -0,0 +1,4 @@ +// Copyright (c) Mysten Labs, Inc. 
+// SPDX-License-Identifier: Apache-2.0
+
+pub mod objects;
diff --git a/crates/sui-indexer-alt/src/queries/objects.rs b/crates/sui-indexer-alt/src/queries/objects.rs
new file mode 100644
index 00000000000000..4c4056ed28015a
--- /dev/null
+++ b/crates/sui-indexer-alt/src/queries/objects.rs
@@ -0,0 +1,252 @@
+// Copyright (c) Mysten Labs, Inc.
+// SPDX-License-Identifier: Apache-2.0
+
+use diesel::dsl::sql_query;
+use diesel::{
+    sql_types::{BigInt, Bytea},
+    QueryableByName, Selectable,
+};
+use diesel_async::RunQueryDsl;
+use move_core_types::language_storage::StructTag;
+use sui_indexer_alt_schema::objects::{StoredObjVersion, StoredOwnerKind};
+use sui_indexer_alt_schema::schema::obj_info;
+use sui_pg_db::Db;
+use sui_types::base_types::{ObjectID, SuiAddress};
+
+/// Pagination position: results resume after this `(checkpoint, object_id)` pair.
+pub struct Cursor {
+    checkpoint: i64,
+    object_id: ObjectID,
+}
+
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub enum ModuleFilter {
+    /// Filter the module by the package it's from.
+    ByPackage(SuiAddress),
+
+    /// Exact match on the module.
+    ByModule(SuiAddress, String),
+}
+
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub enum TypeFilter {
+    /// Filter the type by the package or module it's from.
+    ByModule(ModuleFilter),
+
+    /// If the struct tag has type parameters, treat it as an exact filter on that instantiation,
+    /// otherwise treat it as either a filter on all generic instantiations of the type, or an exact
+    /// match on the type with no type parameters. E.g.
+    ///
+    ///  0x2::coin::Coin
+    ///
+    /// would match both 0x2::coin::Coin and 0x2::coin::Coin<0x2::sui::SUI>.
+    ByType(StructTag),
+}
+
+#[derive(Default, Debug, Clone, Eq, PartialEq)]
+pub struct ObjectFilter {
+    /// Filter objects by their type's `package`, `package::module`, or their fully qualified type
+    /// name.
+    ///
+    /// Generic types can be queried by either the generic type name, e.g. `0x2::coin::Coin`, or by
+    /// the full type name, such as `0x2::coin::Coin<0x2::sui::SUI>`.
+    pub type_filter: Option<TypeFilter>,
+
+    /// Filter for live objects by their current owners.
+    pub owner_filter: Option<SuiAddress>,
+}
+
+/// Row shape of the ID query: an object ID and the checkpoint of its matching `obj_info` row.
+#[derive(QueryableByName, Selectable, Debug)]
+#[diesel(table_name = obj_info)]
+struct IdCheckpoint {
+    #[diesel(sql_type = BigInt)]
+    cp_sequence_number: i64,
+    #[diesel(sql_type = Bytea)]
+    object_id: Vec<u8>,
+}
+
+/// Fetch the stored versions of the objects matching `filters`, as of `view_checkpoint_number`.
+///
+/// Object IDs are selected (and paginated by `cursor` / `limit`) from `obj_info` first, then
+/// resolved against `obj_versions`.
+///
+/// NOTE(review): the second query has no `ORDER BY`, so the returned rows are not guaranteed to
+/// follow the page order of the first query -- confirm whether callers depend on it.
+pub async fn query_objects_with_filters(
+    db: &Db,
+    view_checkpoint_number: i64,
+    filters: ObjectFilter,
+    cursor: Option<Cursor>,
+    limit: usize,
+) -> anyhow::Result<Vec<StoredObjVersion>> {
+    let object_ids =
+        query_object_ids_with_filters(db, view_checkpoint_number, filters, cursor, limit).await?;
+    query_latest_object_versions(db, &object_ids).await
+}
+
+/// Select up to `limit` `(cp_sequence_number, object_id)` pairs from `obj_info` that satisfy
+/// `filters` and are each object's newest row at or before `view_checkpoint_number`.
+///
+/// The statement is assembled as text, so every interpolated value must be unable to alter it:
+/// integers and hex-encoded bytes cannot contain a quote, and module / type names are only
+/// interpolated once they are Move identifiers rather than free-form strings.
+///
+/// Fails if a module filter is not a valid identifier, if type parameters cannot be serialized,
+/// or if the database reports an error.
+async fn query_object_ids_with_filters(
+    db: &Db,
+    view_checkpoint_number: i64,
+    filters: ObjectFilter,
+    cursor: Option<Cursor>,
+    limit: usize,
+) -> anyhow::Result<Vec<IdCheckpoint>> {
+    let owner_filter_condition = if let Some(owner) = filters.owner_filter {
+        format!(
+            " AND owner_kind = {} AND owner_id = '\\x{}'::bytea",
+            StoredOwnerKind::Address as i16,
+            hex::encode(owner.to_vec())
+        )
+    } else {
+        String::new()
+    };
+    let type_filter = match filters.type_filter {
+        Some(TypeFilter::ByModule(ModuleFilter::ByPackage(package))) => format!(
+            " AND package = '\\x{}'::bytea",
+            hex::encode(package.to_vec())
+        ),
+        Some(TypeFilter::ByModule(ModuleFilter::ByModule(package, module))) => {
+            // `module` is a free-form `String` that ends up inside a quoted SQL literal; parsing
+            // it as an identifier rejects anything that could close the literal early.
+            let module = move_core_types::identifier::Identifier::new(module)?;
+            format!(
+                " AND package = '\\x{}'::bytea AND module = '{}'",
+                hex::encode(package.to_vec()),
+                module
+            )
+        }
+        Some(TypeFilter::ByType(struct_tag)) => {
+            // `Display` prints the bare identifier; `{:?}` would print the `Debug` form instead.
+            let mut condition = format!(
+                " AND package = '\\x{}'::bytea AND module = '{}' AND name = '{}'",
+                hex::encode(struct_tag.address.to_vec()),
+                struct_tag.module,
+                struct_tag.name
+            );
+            // Without type parameters the filter covers every instantiation of the type, so the
+            // `instantiation` column is only constrained for a fully specified type.
+            if !struct_tag.type_params.is_empty() {
+                condition.push_str(&format!(
+                    " AND instantiation = '\\x{}'::bytea",
+                    hex::encode(bcs::to_bytes(&struct_tag.type_params)?)
+                ));
+            }
+            condition
+        }
+        None => String::new(),
+    };
+    let cursor_filter_condition = if let Some(cursor) = cursor {
+        format!(
+            " AND (cp_sequence_number < {} OR (cp_sequence_number = {} AND object_id > '\\x{}'::bytea))",
+            cursor.checkpoint, cursor.checkpoint, hex::encode(cursor.object_id)
+        )
+    } else {
+        String::new()
+    };
+
+    let query = format!(
+        "
+        WITH filtered_rows AS (
+            SELECT
+                cp_sequence_number,
+                object_id
+            FROM
+                obj_info
+            WHERE
+                cp_sequence_number <= {view_checkpoint_number}
+                {owner_filter_condition}
+                {type_filter}
+                {cursor_filter_condition}
+        ),
+        max_cp_per_object AS (
+            SELECT
+                object_id,
+                MAX(cp_sequence_number) AS max_cp_sequence_number
+            FROM
+                obj_info
+            WHERE
+                cp_sequence_number <= {view_checkpoint_number}
+            GROUP BY
+                object_id
+        )
+        SELECT
+            f.cp_sequence_number,
+            f.object_id
+        FROM
+            filtered_rows f
+        JOIN
+            max_cp_per_object m
+        ON
+            f.object_id = m.object_id
+            AND f.cp_sequence_number = m.max_cp_sequence_number
+        ORDER BY
+            f.cp_sequence_number DESC,
+            f.object_id ASC
+        LIMIT {limit};
+        ",
+    );
+
+    let sql_query = sql_query(query);
+    let mut conn = db.connect().await?;
+    Ok(sql_query.load::<IdCheckpoint>(&mut conn).await?)
+}
+
+/// Load, for each entry of `objects`, the `obj_versions` row(s) with the greatest
+/// `cp_sequence_number` that does not exceed the entry's checkpoint.
+///
+/// Only hex-encoded bytes and integers are interpolated into the statement text.
+async fn query_latest_object_versions(
+    db: &Db,
+    objects: &[IdCheckpoint],
+) -> anyhow::Result<Vec<StoredObjVersion>> {
+    if objects.is_empty() {
+        return Ok(vec![]);
+    }
+    let conditions = objects
+        .iter()
+        .map(|o| {
+            format!(
+                "(object_id = '\\x{}'::bytea AND cp_sequence_number <= {})",
+                hex::encode(&o.object_id),
+                o.cp_sequence_number
+            )
+        })
+        .collect::<Vec<_>>()
+        .join(" OR ");
+    let query = format!(
+        "
+        SELECT obj_versions.*
+        FROM obj_versions
+        JOIN (
+            SELECT object_id, MAX(cp_sequence_number) AS max_cp_sequence_number
+            FROM obj_versions
+            WHERE {}
+            GROUP BY object_id
+        ) AS filtered_objects
+        ON obj_versions.object_id = filtered_objects.object_id
+        AND obj_versions.cp_sequence_number = filtered_objects.max_cp_sequence_number
+        ",
+        conditions
+    );
+    let sql_query = sql_query(query);
+    let mut conn = db.connect().await?;
+    Ok(sql_query.load::<StoredObjVersion>(&mut conn).await?)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[tokio::test]
+    async fn test_query_objects_with_filters() {}
+}