From 7962b992286bba9325da099489a4f35a723c7eab Mon Sep 17 00:00:00 2001 From: Xun Li Date: Thu, 23 Jan 2025 12:18:54 -0800 Subject: [PATCH] [indexer-alt-jsonrpc] Add objects JSONRPC API --- Cargo.lock | 1 + crates/sui-indexer-alt-jsonrpc/Cargo.toml | 1 + crates/sui-indexer-alt-jsonrpc/src/api/mod.rs | 1 + .../src/api/objects.rs | 175 ++++++++++++++++++ .../sui-indexer-alt-jsonrpc/src/data/mod.rs | 1 + .../src/data/objects.rs | 59 ++++++ .../src/data/reader.rs | 21 +++ crates/sui-indexer-alt-jsonrpc/src/lib.rs | 4 +- crates/sui-indexer-alt-schema/src/objects.rs | 2 +- 9 files changed, 263 insertions(+), 2 deletions(-) create mode 100644 crates/sui-indexer-alt-jsonrpc/src/api/objects.rs create mode 100644 crates/sui-indexer-alt-jsonrpc/src/data/objects.rs diff --git a/Cargo.lock b/Cargo.lock index 78d69cfd5bd0c..7fe20bf18aeab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -14046,6 +14046,7 @@ dependencies = [ "clap", "diesel", "diesel-async", + "hex", "jsonrpsee", "move-core-types", "pin-project-lite", diff --git a/crates/sui-indexer-alt-jsonrpc/Cargo.toml b/crates/sui-indexer-alt-jsonrpc/Cargo.toml index bc1c09bab321a..70bd0221e8eaa 100644 --- a/crates/sui-indexer-alt-jsonrpc/Cargo.toml +++ b/crates/sui-indexer-alt-jsonrpc/Cargo.toml @@ -19,6 +19,7 @@ bcs.workspace = true clap.workspace = true diesel = { workspace = true, features = ["chrono"] } diesel-async = { workspace = true, features = ["bb8", "postgres", "async-connection-wrapper"] } +hex.workspace = true jsonrpsee = { workspace = true, features = ["macros", "server"] } pin-project-lite.workspace = true prometheus.workspace = true diff --git a/crates/sui-indexer-alt-jsonrpc/src/api/mod.rs b/crates/sui-indexer-alt-jsonrpc/src/api/mod.rs index 885cd3b02da68..f5fda76c56463 100644 --- a/crates/sui-indexer-alt-jsonrpc/src/api/mod.rs +++ b/crates/sui-indexer-alt-jsonrpc/src/api/mod.rs @@ -2,5 +2,6 @@ // SPDX-License-Identifier: Apache-2.0 pub(crate) mod governance; +pub(crate) mod objects; pub(crate) mod rpc_module; pub(crate) mod transactions; diff --git a/crates/sui-indexer-alt-jsonrpc/src/api/objects.rs b/crates/sui-indexer-alt-jsonrpc/src/api/objects.rs new file mode 100644 index 0000000000000..6930fd5b483cb --- /dev/null +++ b/crates/sui-indexer-alt-jsonrpc/src/api/objects.rs @@ -0,0 +1,175 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +// FIXME: Add tests. +// TODO: Migrate to use BigTable for KV storage. + +use jsonrpsee::{core::RpcResult, proc_macros::rpc}; +use sui_indexer_alt_schema::objects::StoredObject; +use sui_json_rpc_types::{ + SuiGetPastObjectRequest, SuiObjectData, SuiObjectDataOptions, SuiObjectRef, + SuiPastObjectResponse, +}; +use sui_open_rpc::Module; +use sui_open_rpc_macros::open_rpc; +use sui_types::{ + base_types::{ObjectID, SequenceNumber}, + digests::ObjectDigest, + object::Object, +}; + +use crate::{ + context::Context, + error::{internal_error, invalid_params}, +}; + +use super::rpc_module::RpcModule; + +#[open_rpc(namespace = "sui", tag = "Objects API")] +#[rpc(server, namespace = "sui")] +trait ObjectsApi { + /// Note there is no software-level guarantee/SLA that objects with past versions + /// can be retrieved by this API, even if the object and version exists/existed. + /// The result may vary across nodes depending on their pruning policies. + /// Return the object information for a specified version + #[method(name = "tryGetPastObject")] + async fn try_get_past_object( + &self, + /// the ID of the queried object + object_id: ObjectID, + /// the version of the queried object. If None, default to the latest known version + version: SequenceNumber, + /// options for specifying the content to be returned + options: Option, + ) -> RpcResult; + + /// Note there is no software-level guarantee/SLA that objects with past versions + /// can be retrieved by this API, even if the object and version exists/existed. + /// The result may vary across nodes depending on their pruning policies. + /// Return the object information for a specified version + #[method(name = "tryMultiGetPastObjects")] + async fn try_multi_get_past_objects( + &self, + /// a vector of object and versions to be queried + past_objects: Vec, + /// options for specifying the content to be returned + options: Option, + ) -> RpcResult>; +} + +pub(crate) struct Objects(pub Context); + +#[derive(thiserror::Error, Debug)] +pub(crate) enum Error { + #[error("Object not found: {0} with version {1}")] + NotFound(ObjectID, SequenceNumber), + + #[error("Error converting to response: {0}")] + Conversion(anyhow::Error), + + #[error("Deserialization error: {0}")] + Deserialization(#[from] bcs::Error), +} + +#[async_trait::async_trait] +impl ObjectsApiServer for Objects { + async fn try_get_past_object( + &self, + object_id: ObjectID, + version: SequenceNumber, + options: Option, + ) -> RpcResult { + let Self(ctx) = self; + let Some(stored) = ctx + .loader() + .load_one((object_id, version)) + .await + .map_err(internal_error)? + else { + return Err(invalid_params(Error::NotFound(object_id, version))); + }; + + let options = options.unwrap_or_default(); + response(ctx, &stored, &options) + .await + .map_err(internal_error) + } + + async fn try_multi_get_past_objects( + &self, + past_objects: Vec, + options: Option, + ) -> RpcResult> { + let Self(ctx) = self; + let stored_objects = ctx + .loader() + .load_many(past_objects.iter().map(|p| (p.object_id, p.version))) + .await + .map_err(internal_error)?; + + let mut responses = Vec::with_capacity(past_objects.len()); + let options = options.unwrap_or_default(); + for request in past_objects { + if let Some(stored) = stored_objects.get(&(request.object_id, request.version)) { + responses.push( + response(ctx, stored, &options) + .await + .map_err(internal_error)?, + ); + } else { + responses.push(SuiPastObjectResponse::VersionNotFound( + request.object_id, + request.version, + )); + } + } + + Ok(responses) + } +} + +impl RpcModule for Objects { + fn schema(&self) -> Module { + ObjectsApiOpenRpc::module_doc() + } + + fn into_impl(self) -> jsonrpsee::RpcModule { + self.into_rpc() + } +} + +/// Convert the representation of an object from the database into the response format, +/// including the fields requested in the `options`. +/// FIXME: Actually use the options. +pub(crate) async fn response( + _ctx: &Context, + stored: &StoredObject, + _options: &SuiObjectDataOptions, +) -> Result { + let object_id = + ObjectID::from_bytes(&stored.object_id).map_err(|e| Error::Conversion(e.into()))?; + let version = SequenceNumber::from_u64(stored.object_version as u64); + + let Some(serialized_object) = &stored.serialized_object else { + return Ok(SuiPastObjectResponse::ObjectDeleted(SuiObjectRef { + object_id, + version, + digest: ObjectDigest::OBJECT_DIGEST_DELETED, + })); + }; + let object: Object = bcs::from_bytes(serialized_object).map_err(Error::Deserialization)?; + let object_data = SuiObjectData { + object_id, + version, + digest: object.digest(), + type_: None, + owner: None, + previous_transaction: None, + storage_rebate: None, + display: None, + content: None, + bcs: None, + }; + + Ok(SuiPastObjectResponse::VersionFound(object_data)) +} diff --git a/crates/sui-indexer-alt-jsonrpc/src/data/mod.rs b/crates/sui-indexer-alt-jsonrpc/src/data/mod.rs index 74f0034f2b978..ea0cff800c73f 100644 --- a/crates/sui-indexer-alt-jsonrpc/src/data/mod.rs +++ b/crates/sui-indexer-alt-jsonrpc/src/data/mod.rs @@ -1,6 +1,7 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +pub(crate) mod objects; pub(crate) mod package_resolver; pub(crate) mod reader; pub mod system_package_task; diff --git a/crates/sui-indexer-alt-jsonrpc/src/data/objects.rs b/crates/sui-indexer-alt-jsonrpc/src/data/objects.rs new file mode 100644 index 0000000000000..d5f1a77ba2048 --- /dev/null +++ b/crates/sui-indexer-alt-jsonrpc/src/data/objects.rs @@ -0,0 +1,59 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use std::{collections::HashMap, sync::Arc}; + +use async_graphql::dataloader::Loader; +use sui_indexer_alt_schema::objects::StoredObject; +use sui_types::{base_types::ObjectID, base_types::SequenceNumber}; + +use super::reader::{ReadError, Reader}; + +/// Load objects by key (object_id, version). +#[async_trait::async_trait] +impl Loader<(ObjectID, SequenceNumber)> for Reader { + type Value = StoredObject; + type Error = Arc; + + async fn load( + &self, + keys: &[(ObjectID, SequenceNumber)], + ) -> Result, Self::Error> { + if keys.is_empty() { + return Ok(HashMap::new()); + } + + let conditions = keys + .iter() + .map(|key| { + format!( + "object_id = '\\x{}' AND object_version = {:?}", + hex::encode(key.0.as_ref()), + key.1 + ) + }) + .collect::>(); + let query = format!( + "SELECT * FROM kv_objects WHERE {}", + conditions.join(" AND ") + ); + + let mut conn = self.connect().await.map_err(Arc::new)?; + let objects: Vec = conn.raw_query(&query).await.map_err(Arc::new)?; + + let results: HashMap<_, _> = objects + .into_iter() + .map(|stored| { + ( + ( + ObjectID::from_bytes(&stored.object_id).unwrap(), + SequenceNumber::from_u64(stored.object_version as u64), + ), + stored, + ) + }) + .collect(); + + Ok(results) + } +} diff --git a/crates/sui-indexer-alt-jsonrpc/src/data/reader.rs b/crates/sui-indexer-alt-jsonrpc/src/data/reader.rs index 153f892f8f40c..009dc6b51cb86 100644 --- a/crates/sui-indexer-alt-jsonrpc/src/data/reader.rs +++ b/crates/sui-indexer-alt-jsonrpc/src/data/reader.rs @@ -9,6 +9,7 @@ use diesel::pg::Pg; use diesel::query_builder::QueryFragment; use diesel::query_dsl::methods::LimitDsl; use diesel::result::Error as DieselError; +use diesel::sql_query; use diesel_async::methods::LoadQuery; use diesel_async::RunQueryDsl; use prometheus::Registry; @@ -116,4 +117,24 @@ impl<'p> Connection<'p> { Ok(res?) } + + // TODO: Use the `RawSqlQuery` from GraphQL implementation. + pub(crate) async fn raw_query(&mut self, query: &str) -> Result, ReadError> + where + U: diesel::QueryableByName + Send + 'static, + { + debug!("Raw query: {}", query); + + let _guard = self.metrics.db_latency.start_timer(); + + let res = sql_query(query).load::(&mut self.conn).await; + + if res.is_ok() { + self.metrics.db_requests_succeeded.inc(); + } else { + self.metrics.db_requests_failed.inc(); + } + + Ok(res?) + } } diff --git a/crates/sui-indexer-alt-jsonrpc/src/lib.rs b/crates/sui-indexer-alt-jsonrpc/src/lib.rs index 15e318cd0defb..ee66e55cd055b 100644 --- a/crates/sui-indexer-alt-jsonrpc/src/lib.rs +++ b/crates/sui-indexer-alt-jsonrpc/src/lib.rs @@ -5,6 +5,7 @@ use std::net::SocketAddr; use std::sync::Arc; use anyhow::Context as _; +use api::objects::Objects; use api::rpc_module::RpcModule; use api::transactions::Transactions; use data::system_package_task::{SystemPackageTask, SystemPackageTaskArgs}; @@ -189,7 +190,7 @@ impl Default for RpcArgs { /// command-line). The service will continue to run until the cancellation token is triggered, and /// will signal cancellation on the token when it is shutting down. /// -/// The service may spin up auxilliary services (such as the system package task) to support +/// The service may spin up auxiliary services (such as the system package task) to support /// itself, and will clean these up on shutdown as well. pub async fn start_rpc( db_args: DbArgs, @@ -211,6 +212,7 @@ pub async fn start_rpc( rpc.add_module(Governance(context.clone()))?; rpc.add_module(Transactions(context.clone()))?; + rpc.add_module(Objects(context.clone()))?; let h_rpc = rpc.run().await.context("Failed to start RPC service")?; let h_system_package_task = system_package_task.run(); diff --git a/crates/sui-indexer-alt-schema/src/objects.rs b/crates/sui-indexer-alt-schema/src/objects.rs index a2472ffe6522d..dc30684fe9b5c 100644 --- a/crates/sui-indexer-alt-schema/src/objects.rs +++ b/crates/sui-indexer-alt-schema/src/objects.rs @@ -12,7 +12,7 @@ use sui_types::object::{Object, Owner}; use crate::schema::{coin_balance_buckets, kv_objects, obj_info, obj_versions}; -#[derive(Insertable, Debug, Clone, FieldCount)] +#[derive(Insertable, QueryableByName, Debug, Clone, FieldCount)] #[diesel(table_name = kv_objects, primary_key(object_id, object_version))] #[diesel(treat_none_as_default_value = false)] pub struct StoredObject {