Skip to content

Commit

Permalink
add rpc api ingestion to indexer-alt
Browse files Browse the repository at this point in the history
  • Loading branch information
patrickkuo committed Jan 7, 2025
1 parent a0b5616 commit dc90055
Show file tree
Hide file tree
Showing 9 changed files with 140 additions and 10 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/sui-indexer-alt-framework/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ sui-field-count.workspace = true
sui-pg-db.workspace = true
sui-storage.workspace = true
sui-types.workspace = true
sui-rpc-api.workspace = true
bytes = "1.8.0"

[dev-dependencies]
rand.workspace = true
Expand Down
13 changes: 13 additions & 0 deletions crates/sui-indexer-alt-framework/src/ingestion/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use crate::ingestion::local_client::LocalIngestionClient;
use crate::ingestion::remote_client::RemoteIngestionClient;
use crate::ingestion::rpc_client::RpcIngestionClient;
use crate::ingestion::Error as IngestionError;
use crate::ingestion::Result as IngestionResult;
use crate::metrics::CheckpointLagMetricReporter;
Expand Down Expand Up @@ -76,6 +77,18 @@ impl IngestionClient {
}
}

pub(crate) fn new_rpc(
url: Url,
basic_auth: Option<(String, String)>,
metrics: Arc<IndexerMetrics>,
) -> IngestionResult<Self> {
let client = Arc::new(
RpcIngestionClient::new(url, basic_auth)
.map_err(|e| IngestionError::RpcClientError(e))?,
);
Ok(Self::new_impl(client, metrics))
}

/// Fetch checkpoint data by sequence number.
///
/// This function behaves like `IngestionClient::fetch`, but will repeatedly retry the fetch if
Expand Down
3 changes: 3 additions & 0 deletions crates/sui-indexer-alt-framework/src/ingestion/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,7 @@ pub enum Error {

#[error("Shutdown signal received, stopping ingestion service")]
Cancelled,

#[error(transparent)]
RpcClientError(anyhow::Error),
}
20 changes: 19 additions & 1 deletion crates/sui-indexer-alt-framework/src/ingestion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub mod error;
mod local_client;
mod regulator;
mod remote_client;
mod rpc_client;
#[cfg(test)]
mod test_utils;

Expand All @@ -39,6 +40,14 @@ pub struct ClientArgs {
/// If both remote_store_url and local_ingestion_path are provided, remote_store_url will be used.
#[clap(long, required = true, group = "source")]
pub local_ingestion_path: Option<PathBuf>,

/// Path to the local ingestion directory.
/// If all remote_store_url, local_ingestion_path and rpc_api_url are provided, remote_store_url will be used.
#[clap(long, required = true, group = "source")]
pub rpc_api_url: Option<Url>,

#[clap(long)]
pub basic_auth: Option<String>,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
Expand Down Expand Up @@ -82,8 +91,15 @@ impl IngestionService {
IngestionClient::new_remote(url.clone(), metrics.clone())?
} else if let Some(path) = args.local_ingestion_path.as_ref() {
IngestionClient::new_local(path.clone(), metrics.clone())
} else if let Some(rpc_api_url) = args.rpc_api_url.as_ref() {
let basic_auth = args.basic_auth.map(|s| {
let split = s.split(":").collect::<Vec<_>>();
assert_eq!(2, split.len());
(split[0].to_string(), split[1].to_string())
});
IngestionClient::new_rpc(rpc_api_url.clone(), basic_auth, metrics.clone())?
} else {
panic!("Either remote_store_url or local_ingestion_path must be provided");
panic!("One of remote_store_url, local_ingestion_path or rpc_api_url must be provided");
};

let subscribers = Vec::new();
Expand Down Expand Up @@ -204,6 +220,8 @@ mod tests {
ClientArgs {
remote_store_url: Some(Url::parse(&uri).unwrap()),
local_ingestion_path: None,
rpc_api_url: None,
basic_auth: None,
},
IngestionConfig {
checkpoint_buffer_size,
Expand Down
49 changes: 49 additions & 0 deletions crates/sui-indexer-alt-framework/src/ingestion/rpc_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::ingestion::client::{FetchError, FetchResult, IngestionClientTrait};
use anyhow::anyhow;
use bytes::Bytes;
use sui_rpc_api::Client;
use sui_storage::blob::{Blob, BlobEncoding};
use url::Url;

pub(crate) struct RpcIngestionClient {
client: Client,
}

impl RpcIngestionClient {
pub(crate) fn new(
url: Url,
basic_auth: Option<(String, String)>,
) -> Result<Self, anyhow::Error> {
let mut client = Client::new(url.to_string())?;
if let Some(basic_auth) = basic_auth {
client = client.with_basic_auth(basic_auth)?;
}
Ok(Self { client })
}
}

#[async_trait::async_trait]
impl IngestionClientTrait for RpcIngestionClient {
async fn fetch(&self, checkpoint: u64) -> FetchResult {
let data = self
.client
.get_full_checkpoint(checkpoint)
.await
.map_err(|e| {
if e.message().contains("not found") {
FetchError::NotFound
} else {
FetchError::Transient {
reason: "io_error",
error: anyhow!(e),
}
}
})?;
Ok(Bytes::from(
Blob::encode(&data, BlobEncoding::Bcs)?.to_bytes(),
))
}
}
2 changes: 2 additions & 0 deletions crates/sui-indexer-alt-framework/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,8 @@ impl Indexer {
ClientArgs {
remote_store_url: None,
local_ingestion_path: Some(tempdir().unwrap().into_path()),
rpc_api_url: None,
basic_auth: None,
},
IngestionConfig::default(),
&MIGRATIONS,
Expand Down
1 change: 1 addition & 0 deletions crates/sui-rpc-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ tonic.workspace = true
prost.workspace = true
prost-types = "0.13.3"
bytes.workspace = true
base64 = "0.22.1"

tonic-health.workspace = true
tonic-reflection.workspace = true
Expand Down
57 changes: 48 additions & 9 deletions crates/sui-rpc-api/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@
// SPDX-License-Identifier: Apache-2.0

mod response_ext;

use base64::Engine;
pub use response_ext::ResponseExt;

pub mod sdk;
use sdk::BoxError;

pub use reqwest;
use tap::Pipe;
use tonic::metadata::MetadataMap;
use tonic::metadata::{Ascii, MetadataMap, MetadataValue};

use crate::proto::node::node_client::NodeClient;
use crate::proto::node::{
Expand All @@ -25,16 +27,20 @@ use sui_types::messages_checkpoint::{CertifiedCheckpointSummary, CheckpointSeque
use sui_types::object::Object;
use sui_types::transaction::Transaction;

pub type Result<T, E = tonic::Status> = std::result::Result<T, E>;
pub type Result<T, E = Status> = std::result::Result<T, E>;

use tonic::codegen::InterceptedService;
use tonic::service::Interceptor;
use tonic::transport::channel::ClientTlsConfig;
use tonic::Status;
use tonic::transport::Channel;
use tonic::{Request, Status};

#[derive(Clone)]
pub struct Client {
#[allow(unused)]
uri: http::Uri,
channel: tonic::transport::Channel,
channel: Channel,
basic_auth_value: Option<MetadataValue<Ascii>>,
}

impl Client {
Expand All @@ -56,11 +62,31 @@ impl Client {
}
let channel = endpoint.connect_lazy();

Ok(Self { uri, channel })
Ok(Self {
uri,
channel,
basic_auth_value: None,
})
}

pub fn raw_client(&self) -> NodeClient<tonic::transport::Channel> {
NodeClient::new(self.channel.clone())
pub fn with_basic_auth(mut self, (username, password): (String, String)) -> Result<Self> {
let auth =
base64::engine::general_purpose::STANDARD.encode(format!("{}:{}", username, password));
let value: MetadataValue<Ascii> = format!("Basic {}", auth)
.parse()
.map_err(Into::into)
.map_err(Status::from_error)?;
self.basic_auth_value = Some(value);
Ok(self)
}

pub fn raw_client(&self) -> NodeClient<InterceptedService<Channel, BasicAuthInterceptor>> {
NodeClient::with_interceptor(
self.channel.clone(),
BasicAuthInterceptor {
basic_auth: self.basic_auth_value.clone(),
},
)
}

pub async fn get_latest_checkpoint(&self) -> Result<CertifiedCheckpointSummary> {
Expand Down Expand Up @@ -193,7 +219,7 @@ impl Client {
let request = crate::proto::node::ExecuteTransactionRequest {
transaction: None,
transaction_bcs: Some(
crate::proto::types::Bcs::serialize(&transaction.inner().intent_message.value)
Bcs::serialize(&transaction.inner().intent_message.value)
.map_err(|e| Status::from_error(e.into()))?,
),
signatures: None,
Expand All @@ -204,7 +230,7 @@ impl Client {
effects_bcs: Some(true),
events: Some(false),
events_bcs: Some(true),
..(parameters.to_owned().into())
..parameters.to_owned().into()
}),
};

Expand Down Expand Up @@ -393,3 +419,16 @@ fn status_from_error_with_metadata<T: Into<BoxError>>(err: T, metadata: Metadata
*status.metadata_mut() = metadata;
status
}

pub struct BasicAuthInterceptor {
basic_auth: Option<MetadataValue<Ascii>>,
}

impl Interceptor for BasicAuthInterceptor {
fn call(&mut self, mut request: Request<()>) -> std::result::Result<Request<()>, Status> {
if let Some(auth) = self.basic_auth.as_ref() {
request.metadata_mut().insert("authorization", auth.clone());
}
Ok(request)
}
}

0 comments on commit dc90055

Please sign in to comment.