Skip to content

Commit

Permalink
[indexer-alt-jsonrpc] Add objects JSONRPC API
Browse files Browse the repository at this point in the history
  • Loading branch information
lxfind committed Jan 23, 2025
1 parent 75ea5e3 commit 7962b99
Show file tree
Hide file tree
Showing 9 changed files with 263 additions and 2 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/sui-indexer-alt-jsonrpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ bcs.workspace = true
clap.workspace = true
diesel = { workspace = true, features = ["chrono"] }
diesel-async = { workspace = true, features = ["bb8", "postgres", "async-connection-wrapper"] }
hex.workspace = true
jsonrpsee = { workspace = true, features = ["macros", "server"] }
pin-project-lite.workspace = true
prometheus.workspace = true
Expand Down
1 change: 1 addition & 0 deletions crates/sui-indexer-alt-jsonrpc/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@
// SPDX-License-Identifier: Apache-2.0

pub(crate) mod governance;
pub(crate) mod objects;
pub(crate) mod rpc_module;
pub(crate) mod transactions;
175 changes: 175 additions & 0 deletions crates/sui-indexer-alt-jsonrpc/src/api/objects.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

// FIXME: Add tests.
// TODO: Migrate to use BigTable for KV storage.

use jsonrpsee::{core::RpcResult, proc_macros::rpc};
use sui_indexer_alt_schema::objects::StoredObject;
use sui_json_rpc_types::{
SuiGetPastObjectRequest, SuiObjectData, SuiObjectDataOptions, SuiObjectRef,
SuiPastObjectResponse,
};
use sui_open_rpc::Module;
use sui_open_rpc_macros::open_rpc;
use sui_types::{
base_types::{ObjectID, SequenceNumber},
digests::ObjectDigest,
object::Object,
};

use crate::{
context::Context,
error::{internal_error, invalid_params},
};

use super::rpc_module::RpcModule;

#[open_rpc(namespace = "sui", tag = "Objects API")]
#[rpc(server, namespace = "sui")]
trait ObjectsApi {
/// Note there is no software-level guarantee/SLA that objects with past versions
/// can be retrieved by this API, even if the object and version exists/existed.
/// The result may vary across nodes depending on their pruning policies.
/// Return the object information for a specified version
#[method(name = "tryGetPastObject")]
async fn try_get_past_object(
&self,
/// the ID of the queried object
object_id: ObjectID,
/// the version of the queried object. If None, default to the latest known version
version: SequenceNumber,
/// options for specifying the content to be returned
options: Option<SuiObjectDataOptions>,
) -> RpcResult<SuiPastObjectResponse>;

/// Note there is no software-level guarantee/SLA that objects with past versions
/// can be retrieved by this API, even if the object and version exists/existed.
/// The result may vary across nodes depending on their pruning policies.
/// Return the object information for a specified version
#[method(name = "tryMultiGetPastObjects")]
async fn try_multi_get_past_objects(
&self,
/// a vector of object and versions to be queried
past_objects: Vec<SuiGetPastObjectRequest>,
/// options for specifying the content to be returned
options: Option<SuiObjectDataOptions>,
) -> RpcResult<Vec<SuiPastObjectResponse>>;
}

pub(crate) struct Objects(pub Context);

#[derive(thiserror::Error, Debug)]
pub(crate) enum Error {
#[error("Object not found: {0} with version {1}")]
NotFound(ObjectID, SequenceNumber),

#[error("Error converting to response: {0}")]
Conversion(anyhow::Error),

#[error("Deserialization error: {0}")]
Deserialization(#[from] bcs::Error),
}

#[async_trait::async_trait]
impl ObjectsApiServer for Objects {
async fn try_get_past_object(
&self,
object_id: ObjectID,
version: SequenceNumber,
options: Option<SuiObjectDataOptions>,
) -> RpcResult<SuiPastObjectResponse> {
let Self(ctx) = self;
let Some(stored) = ctx
.loader()
.load_one((object_id, version))
.await
.map_err(internal_error)?
else {
return Err(invalid_params(Error::NotFound(object_id, version)));
};

let options = options.unwrap_or_default();
response(ctx, &stored, &options)
.await
.map_err(internal_error)
}

async fn try_multi_get_past_objects(
&self,
past_objects: Vec<SuiGetPastObjectRequest>,
options: Option<SuiObjectDataOptions>,
) -> RpcResult<Vec<SuiPastObjectResponse>> {
let Self(ctx) = self;
let stored_objects = ctx
.loader()
.load_many(past_objects.iter().map(|p| (p.object_id, p.version)))
.await
.map_err(internal_error)?;

let mut responses = Vec::with_capacity(past_objects.len());
let options = options.unwrap_or_default();
for request in past_objects {
if let Some(stored) = stored_objects.get(&(request.object_id, request.version)) {
responses.push(
response(ctx, stored, &options)
.await
.map_err(internal_error)?,
);
} else {
responses.push(SuiPastObjectResponse::VersionNotFound(
request.object_id,
request.version,
));
}
}

Ok(responses)
}
}

impl RpcModule for Objects {
fn schema(&self) -> Module {
ObjectsApiOpenRpc::module_doc()
}

fn into_impl(self) -> jsonrpsee::RpcModule<Self> {
self.into_rpc()
}
}

/// Convert the representation of an object from the database into the response format,
/// including the fields requested in the `options`.
/// FIXME: Actually use the options.
pub(crate) async fn response(
_ctx: &Context,
stored: &StoredObject,
_options: &SuiObjectDataOptions,
) -> Result<SuiPastObjectResponse, Error> {
let object_id =
ObjectID::from_bytes(&stored.object_id).map_err(|e| Error::Conversion(e.into()))?;
let version = SequenceNumber::from_u64(stored.object_version as u64);

let Some(serialized_object) = &stored.serialized_object else {
return Ok(SuiPastObjectResponse::ObjectDeleted(SuiObjectRef {
object_id,
version,
digest: ObjectDigest::OBJECT_DIGEST_DELETED,
}));
};
let object: Object = bcs::from_bytes(serialized_object).map_err(Error::Deserialization)?;
let object_data = SuiObjectData {
object_id,
version,
digest: object.digest(),
type_: None,
owner: None,
previous_transaction: None,
storage_rebate: None,
display: None,
content: None,
bcs: None,
};

Ok(SuiPastObjectResponse::VersionFound(object_data))
}
1 change: 1 addition & 0 deletions crates/sui-indexer-alt-jsonrpc/src/data/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

pub(crate) mod objects;
pub(crate) mod package_resolver;
pub(crate) mod reader;
pub mod system_package_task;
Expand Down
59 changes: 59 additions & 0 deletions crates/sui-indexer-alt-jsonrpc/src/data/objects.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{collections::HashMap, sync::Arc};

use async_graphql::dataloader::Loader;
use sui_indexer_alt_schema::objects::StoredObject;
use sui_types::{base_types::ObjectID, base_types::SequenceNumber};

use super::reader::{ReadError, Reader};

/// Load objects by key (object_id, version).
#[async_trait::async_trait]
impl Loader<(ObjectID, SequenceNumber)> for Reader {
type Value = StoredObject;
type Error = Arc<ReadError>;

async fn load(
&self,
keys: &[(ObjectID, SequenceNumber)],
) -> Result<HashMap<(ObjectID, SequenceNumber), Self::Value>, Self::Error> {
if keys.is_empty() {
return Ok(HashMap::new());
}

let conditions = keys
.iter()
.map(|key| {
format!(
"object_id = '\\x{}' AND object_version = {:?}",
hex::encode(key.0.as_ref()),
key.1
)
})
.collect::<Vec<_>>();
let query = format!(
"SELECT * FROM kv_objects WHERE {}",
conditions.join(" AND ")
);

let mut conn = self.connect().await.map_err(Arc::new)?;
let objects: Vec<StoredObject> = conn.raw_query(&query).await.map_err(Arc::new)?;

let results: HashMap<_, _> = objects
.into_iter()
.map(|stored| {
(
(
ObjectID::from_bytes(&stored.object_id).unwrap(),
SequenceNumber::from_u64(stored.object_version as u64),
),
stored,
)
})
.collect();

Ok(results)
}
}
21 changes: 21 additions & 0 deletions crates/sui-indexer-alt-jsonrpc/src/data/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use diesel::pg::Pg;
use diesel::query_builder::QueryFragment;
use diesel::query_dsl::methods::LimitDsl;
use diesel::result::Error as DieselError;
use diesel::sql_query;
use diesel_async::methods::LoadQuery;
use diesel_async::RunQueryDsl;
use prometheus::Registry;
Expand Down Expand Up @@ -116,4 +117,24 @@ impl<'p> Connection<'p> {

Ok(res?)
}

// TODO: Use the `RawSqlQuery` from GraphQL implementation.
pub(crate) async fn raw_query<U>(&mut self, query: &str) -> Result<Vec<U>, ReadError>
where
U: diesel::QueryableByName<diesel::pg::Pg> + Send + 'static,
{
debug!("Raw query: {}", query);

let _guard = self.metrics.db_latency.start_timer();

let res = sql_query(query).load::<U>(&mut self.conn).await;

if res.is_ok() {
self.metrics.db_requests_succeeded.inc();
} else {
self.metrics.db_requests_failed.inc();
}

Ok(res?)
}
}
4 changes: 3 additions & 1 deletion crates/sui-indexer-alt-jsonrpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::net::SocketAddr;
use std::sync::Arc;

use anyhow::Context as _;
use api::objects::Objects;
use api::rpc_module::RpcModule;
use api::transactions::Transactions;
use data::system_package_task::{SystemPackageTask, SystemPackageTaskArgs};
Expand Down Expand Up @@ -189,7 +190,7 @@ impl Default for RpcArgs {
/// command-line). The service will continue to run until the cancellation token is triggered, and
/// will signal cancellation on the token when it is shutting down.
///
/// The service may spin up auxilliary services (such as the system package task) to support
/// The service may spin up auxiliary services (such as the system package task) to support
/// itself, and will clean these up on shutdown as well.
pub async fn start_rpc(
db_args: DbArgs,
Expand All @@ -211,6 +212,7 @@ pub async fn start_rpc(

rpc.add_module(Governance(context.clone()))?;
rpc.add_module(Transactions(context.clone()))?;
rpc.add_module(Objects(context.clone()))?;

let h_rpc = rpc.run().await.context("Failed to start RPC service")?;
let h_system_package_task = system_package_task.run();
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-indexer-alt-schema/src/objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use sui_types::object::{Object, Owner};

use crate::schema::{coin_balance_buckets, kv_objects, obj_info, obj_versions};

#[derive(Insertable, Debug, Clone, FieldCount)]
#[derive(Insertable, QueryableByName, Debug, Clone, FieldCount)]
#[diesel(table_name = kv_objects, primary_key(object_id, object_version))]
#[diesel(treat_none_as_default_value = false)]
pub struct StoredObject {
Expand Down

0 comments on commit 7962b99

Please sign in to comment.