diff --git a/Cargo.lock b/Cargo.lock index 5e2ec17199542..a3d35ea35da72 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -14062,6 +14062,7 @@ dependencies = [ "axum 0.7.5", "backoff", "bb8", + "bytes", "chrono", "clap", "diesel", @@ -14074,6 +14075,7 @@ dependencies = [ "serde", "sui-field-count", "sui-pg-db", + "sui-rpc-api", "sui-storage", "sui-types", "telemetry-subscribers", @@ -15082,6 +15084,7 @@ dependencies = [ "anyhow", "async-trait", "axum 0.7.5", + "base64 0.22.1", "bcs", "bytes", "diffy", diff --git a/crates/sui-indexer-alt-framework/Cargo.toml b/crates/sui-indexer-alt-framework/Cargo.toml index 2c9a172be8efb..8fa74aee2ff9e 100644 --- a/crates/sui-indexer-alt-framework/Cargo.toml +++ b/crates/sui-indexer-alt-framework/Cargo.toml @@ -33,6 +33,8 @@ sui-field-count.workspace = true sui-pg-db.workspace = true sui-storage.workspace = true sui-types.workspace = true +sui-rpc-api.workspace = true +bytes = "1.8.0" [dev-dependencies] rand.workspace = true diff --git a/crates/sui-indexer-alt-framework/src/ingestion/client.rs b/crates/sui-indexer-alt-framework/src/ingestion/client.rs index 25fac78a162ba..03f5919b90ca9 100644 --- a/crates/sui-indexer-alt-framework/src/ingestion/client.rs +++ b/crates/sui-indexer-alt-framework/src/ingestion/client.rs @@ -3,6 +3,7 @@ use crate::ingestion::local_client::LocalIngestionClient; use crate::ingestion::remote_client::RemoteIngestionClient; +use crate::ingestion::rpc_client::RpcIngestionClient; use crate::ingestion::Error as IngestionError; use crate::ingestion::Result as IngestionResult; use crate::metrics::CheckpointLagMetricReporter; @@ -76,6 +77,18 @@ impl IngestionClient { } } + pub(crate) fn new_rpc( + url: Url, + basic_auth: Option<(String, String)>, + metrics: Arc, + ) -> IngestionResult { + let client = Arc::new( + RpcIngestionClient::new(url, basic_auth) + .map_err(|e| IngestionError::RpcClientError(e))?, + ); + Ok(Self::new_impl(client, metrics)) + } + /// Fetch checkpoint data by sequence number. /// /// This function behaves like `IngestionClient::fetch`, but will repeatedly retry the fetch if diff --git a/crates/sui-indexer-alt-framework/src/ingestion/error.rs b/crates/sui-indexer-alt-framework/src/ingestion/error.rs index 17cafe495aa80..0265096f28f66 100644 --- a/crates/sui-indexer-alt-framework/src/ingestion/error.rs +++ b/crates/sui-indexer-alt-framework/src/ingestion/error.rs @@ -22,4 +22,7 @@ pub enum Error { #[error("Shutdown signal received, stopping ingestion service")] Cancelled, + + #[error(transparent)] + RpcClientError(anyhow::Error), } diff --git a/crates/sui-indexer-alt-framework/src/ingestion/mod.rs b/crates/sui-indexer-alt-framework/src/ingestion/mod.rs index 036c3ed0185e9..d57a381a11757 100644 --- a/crates/sui-indexer-alt-framework/src/ingestion/mod.rs +++ b/crates/sui-indexer-alt-framework/src/ingestion/mod.rs @@ -26,6 +26,7 @@ pub mod error; mod local_client; mod regulator; mod remote_client; +mod rpc_client; #[cfg(test)] mod test_utils; @@ -39,6 +40,14 @@ pub struct ClientArgs { /// If both remote_store_url and local_ingestion_path are provided, remote_store_url will be used. #[clap(long, required = true, group = "source")] pub local_ingestion_path: Option, + + /// Path to the local ingestion directory. + /// If all remote_store_url, local_ingestion_path and rpc_api_url are provided, remote_store_url will be used. + #[clap(long, required = true, group = "source")] + pub rpc_api_url: Option, + + #[clap(long)] + pub basic_auth: Option, } #[derive(Serialize, Deserialize, Debug, Clone)] @@ -82,8 +91,15 @@ impl IngestionService { IngestionClient::new_remote(url.clone(), metrics.clone())? } else if let Some(path) = args.local_ingestion_path.as_ref() { IngestionClient::new_local(path.clone(), metrics.clone()) + } else if let Some(rpc_api_url) = args.rpc_api_url.as_ref() { + let basic_auth = args.basic_auth.map(|s| { + let split = s.split(":").collect::>(); + assert_eq!(2, split.len()); + (split[0].to_string(), split[1].to_string()) + }); + IngestionClient::new_rpc(rpc_api_url.clone(), basic_auth, metrics.clone())? } else { - panic!("Either remote_store_url or local_ingestion_path must be provided"); + panic!("One of remote_store_url, local_ingestion_path or rpc_api_url must be provided"); }; let subscribers = Vec::new(); @@ -204,6 +220,8 @@ mod tests { ClientArgs { remote_store_url: Some(Url::parse(&uri).unwrap()), local_ingestion_path: None, + rpc_api_url: None, + basic_auth: None, }, IngestionConfig { checkpoint_buffer_size, diff --git a/crates/sui-indexer-alt-framework/src/ingestion/rpc_client.rs b/crates/sui-indexer-alt-framework/src/ingestion/rpc_client.rs new file mode 100644 index 0000000000000..a1349f7acbac6 --- /dev/null +++ b/crates/sui-indexer-alt-framework/src/ingestion/rpc_client.rs @@ -0,0 +1,49 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use crate::ingestion::client::{FetchError, FetchResult, IngestionClientTrait}; +use anyhow::anyhow; +use bytes::Bytes; +use sui_rpc_api::Client; +use sui_storage::blob::{Blob, BlobEncoding}; +use url::Url; + +pub(crate) struct RpcIngestionClient { + client: Client, +} + +impl RpcIngestionClient { + pub(crate) fn new( + url: Url, + basic_auth: Option<(String, String)>, + ) -> Result { + let mut client = Client::new(url.to_string())?; + if let Some(basic_auth) = basic_auth { + client = client.with_basic_auth(basic_auth)?; + } + Ok(Self { client }) + } +} + +#[async_trait::async_trait] +impl IngestionClientTrait for RpcIngestionClient { + async fn fetch(&self, checkpoint: u64) -> FetchResult { + let data = self + .client + .get_full_checkpoint(checkpoint) + .await + .map_err(|e| { + if e.message().contains("not found") { + FetchError::NotFound + } else { + FetchError::Transient { + reason: "io_error", + error: anyhow!(e), + } + } + })?; + Ok(Bytes::from( + Blob::encode(&data, BlobEncoding::Bcs)?.to_bytes(), + )) + } +} diff --git a/crates/sui-indexer-alt-framework/src/lib.rs b/crates/sui-indexer-alt-framework/src/lib.rs index ac4d990268295..413f6fd2a78e7 100644 --- a/crates/sui-indexer-alt-framework/src/lib.rs +++ b/crates/sui-indexer-alt-framework/src/lib.rs @@ -185,6 +185,8 @@ impl Indexer { ClientArgs { remote_store_url: None, local_ingestion_path: Some(tempdir().unwrap().into_path()), + rpc_api_url: None, + basic_auth: None, }, IngestionConfig::default(), &MIGRATIONS, diff --git a/crates/sui-rpc-api/Cargo.toml b/crates/sui-rpc-api/Cargo.toml index 60e141c9ce034..d46c443173b72 100644 --- a/crates/sui-rpc-api/Cargo.toml +++ b/crates/sui-rpc-api/Cargo.toml @@ -40,6 +40,7 @@ tonic.workspace = true prost.workspace = true prost-types = "0.13.3" bytes.workspace = true +base64 = "0.22.1" tonic-health.workspace = true tonic-reflection.workspace = true diff --git a/crates/sui-rpc-api/src/client/mod.rs b/crates/sui-rpc-api/src/client/mod.rs index 22c5739d420ba..bf1ef56d705b8 100644 --- a/crates/sui-rpc-api/src/client/mod.rs +++ b/crates/sui-rpc-api/src/client/mod.rs @@ -2,6 +2,8 @@ // SPDX-License-Identifier: Apache-2.0 mod response_ext; + +use base64::Engine; pub use response_ext::ResponseExt; pub mod sdk; @@ -9,7 +11,7 @@ use sdk::BoxError; pub use reqwest; use tap::Pipe; -use tonic::metadata::MetadataMap; +use tonic::metadata::{Ascii, MetadataMap, MetadataValue}; use crate::proto::node::node_client::NodeClient; use crate::proto::node::{ @@ -25,16 +27,20 @@ use sui_types::messages_checkpoint::{CertifiedCheckpointSummary, CheckpointSeque use sui_types::object::Object; use sui_types::transaction::Transaction; -pub type Result = std::result::Result; +pub type Result = std::result::Result; +use tonic::codegen::InterceptedService; +use tonic::service::Interceptor; use tonic::transport::channel::ClientTlsConfig; -use tonic::Status; +use tonic::transport::Channel; +use tonic::{Request, Status}; #[derive(Clone)] pub struct Client { #[allow(unused)] uri: http::Uri, - channel: tonic::transport::Channel, + channel: Channel, + basic_auth_value: Option>, } impl Client { @@ -56,11 +62,31 @@ impl Client { } let channel = endpoint.connect_lazy(); - Ok(Self { uri, channel }) + Ok(Self { + uri, + channel, + basic_auth_value: None, + }) } - pub fn raw_client(&self) -> NodeClient { - NodeClient::new(self.channel.clone()) + pub fn with_basic_auth(mut self, (username, password): (String, String)) -> Result { + let auth = + base64::engine::general_purpose::STANDARD.encode(format!("{}:{}", username, password)); + let value: MetadataValue = format!("Basic {}", auth) + .parse() + .map_err(Into::into) + .map_err(Status::from_error)?; + self.basic_auth_value = Some(value); + Ok(self) + } + + pub fn raw_client(&self) -> NodeClient> { + NodeClient::with_interceptor( + self.channel.clone(), + BasicAuthInterceptor { + basic_auth: self.basic_auth_value.clone(), + }, + ) } pub async fn get_latest_checkpoint(&self) -> Result { @@ -193,7 +219,7 @@ impl Client { let request = crate::proto::node::ExecuteTransactionRequest { transaction: None, transaction_bcs: Some( - crate::proto::types::Bcs::serialize(&transaction.inner().intent_message.value) + Bcs::serialize(&transaction.inner().intent_message.value) .map_err(|e| Status::from_error(e.into()))?, ), signatures: None, @@ -204,7 +230,7 @@ impl Client { effects_bcs: Some(true), events: Some(false), events_bcs: Some(true), - ..(parameters.to_owned().into()) + ..parameters.to_owned().into() }), }; @@ -393,3 +419,16 @@ fn status_from_error_with_metadata>(err: T, metadata: Metadata *status.metadata_mut() = metadata; status } + +pub struct BasicAuthInterceptor { + basic_auth: Option>, +} + +impl Interceptor for BasicAuthInterceptor { + fn call(&mut self, mut request: Request<()>) -> std::result::Result, Status> { + if let Some(auth) = self.basic_auth.as_ref() { + request.metadata_mut().insert("authorization", auth.clone()); + } + Ok(request) + } +}