diff --git a/Cargo.lock b/Cargo.lock
index f05294dc4f1f3..763f789031c53 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -13884,6 +13884,7 @@ dependencies = [
  "sui-storage",
  "sui-types",
  "telemetry-subscribers",
+ "tempfile",
  "thiserror",
  "tokio",
  "tokio-stream",
diff --git a/crates/sui-indexer-alt/Cargo.toml b/crates/sui-indexer-alt/Cargo.toml
index 83305df9fc2cb..c80b0b45808d5 100644
--- a/crates/sui-indexer-alt/Cargo.toml
+++ b/crates/sui-indexer-alt/Cargo.toml
@@ -39,5 +39,6 @@ sui-types.workspace = true
 [dev-dependencies]
 rand.workspace = true
 wiremock.workspace = true
+tempfile.workspace = true
 sui-types = { workspace = true, features = ["test-utils"] }
diff --git a/crates/sui-indexer-alt/src/ingestion/client.rs b/crates/sui-indexer-alt/src/ingestion/client.rs
index 4edb2999d869a..b16a7c51daef1 100644
--- a/crates/sui-indexer-alt/src/ingestion/client.rs
+++ b/crates/sui-indexer-alt/src/ingestion/client.rs
@@ -1,133 +1,108 @@
 // Copyright (c) Mysten Labs, Inc.
 // SPDX-License-Identifier: Apache-2.0
 
-use std::{sync::Arc, time::Duration};
-
+use crate::ingestion::local_client::LocalIngestionClient;
+use crate::ingestion::remote_client::RemoteIngestionClient;
+use crate::ingestion::Error as IngestionError;
+use crate::ingestion::Result as IngestionResult;
+use crate::metrics::IndexerMetrics;
+use backoff::Error as BE;
 use backoff::ExponentialBackoff;
-use reqwest::{Client, StatusCode};
+use std::path::PathBuf;
+use std::sync::Arc;
+use std::time::Duration;
 use sui_storage::blob::Blob;
 use sui_types::full_checkpoint_content::CheckpointData;
+use tokio_util::bytes::Bytes;
 use tokio_util::sync::CancellationToken;
-use tracing::{debug, error};
+use tracing::debug;
 use url::Url;
 
-use crate::ingestion::error::{Error, Result};
-use crate::metrics::IndexerMetrics;
-
 /// Wait at most this long between retries for transient errors.
 const MAX_TRANSIENT_RETRY_INTERVAL: Duration = Duration::from_secs(60);
 
+#[async_trait::async_trait]
+pub(crate) trait IngestionClientTrait: Send + Sync {
+    async fn fetch(&self, checkpoint: u64) -> FetchResult;
+}
+
+#[derive(thiserror::Error, Debug)]
+pub enum FetchError {
+    #[error("Checkpoint not found")]
+    NotFound,
+    #[error("Failed to fetch checkpoint due to permanent error: {0}")]
+    Permanent(#[from] anyhow::Error),
+    #[error("Failed to fetch checkpoint due to {reason}: {error}")]
+    Transient {
+        reason: &'static str,
+        #[source]
+        error: anyhow::Error,
+    },
+}
+
+pub type FetchResult = Result<Bytes, FetchError>;
+
 #[derive(Clone)]
 pub(crate) struct IngestionClient {
-    url: Url,
-    client: Client,
+    client: Arc<dyn IngestionClientTrait>,
     /// Wrap the metrics in an `Arc` to keep copies of the client cheap.
     metrics: Arc<IndexerMetrics>,
 }
 
 impl IngestionClient {
-    pub(crate) fn new(url: Url, metrics: Arc<IndexerMetrics>) -> Result<Self> {
-        Ok(Self {
-            url,
-            client: Client::builder().build()?,
-            metrics,
-        })
+    pub(crate) fn new_remote(url: Url, metrics: Arc<IndexerMetrics>) -> IngestionResult<Self> {
+        let client = Arc::new(RemoteIngestionClient::new(url)?);
+        Ok(IngestionClient { client, metrics })
+    }
+
+    pub(crate) fn new_local(path: PathBuf, metrics: Arc<IndexerMetrics>) -> Self {
+        let client = Arc::new(LocalIngestionClient::new(path));
+        IngestionClient { client, metrics }
     }
 
-    /// Fetch a checkpoint from the remote store. Repeatedly retries transient errors with an
-    /// exponential backoff (up to [MAX_RETRY_INTERVAL]), but will immediately return on:
-    ///
-    /// - non-transient errors, which include all client errors, except timeouts and rate limiting.
+    /// Repeatedly retries transient errors with an exponential backoff (up to [MAX_TRANSIENT_RETRY_INTERVAL]).
+    /// Transient errors are either defined by the client implementation, which returns a
+    /// `FetchError::Transient` variant, or arise within this function if we fail to
+    /// deserialize the result as [CheckpointData].
+    /// The function will immediately return on:
+    /// - non-transient errors determined by the client implementation; this includes both the
+    ///   `FetchError::NotFound` and `FetchError::Permanent` variants.
     /// - cancellation of the supplied `cancel` token.
-    ///
-    /// Transient errors include:
-    ///
-    /// - failures to issue a request, (network errors, redirect issues, etc)
-    /// - request timeouts,
-    /// - rate limiting,
-    /// - server errors (5xx),
-    /// - issues getting a full response and deserializing it as [CheckpointData].
     pub(crate) async fn fetch(
         &self,
         checkpoint: u64,
         cancel: &CancellationToken,
-    ) -> Result<Arc<CheckpointData>> {
-        // SAFETY: The path being joined is statically known to be valid.
-        let url = self
-            .url
-            .join(&format!("/{checkpoint}.chk"))
-            .expect("Unexpected invalid URL");
-
+    ) -> IngestionResult<Arc<CheckpointData>> {
+        let client = self.client.clone();
         let request = move || {
-            let url = url.clone();
+            let client = client.clone();
             async move {
-                use backoff::Error as BE;
                 if cancel.is_cancelled() {
-                    return Err(BE::permanent(Error::Cancelled));
+                    return Err(BE::permanent(IngestionError::Cancelled));
                }
 
-                let response = self.client.get(url).send().await.map_err(|e| {
-                    self.metrics
-                        .inc_retry(checkpoint, "request", Error::ReqwestError(e))
-                })?;
-
-                match response.status() {
-                    code if code.is_success() => {
-                        // Failure to extract all the bytes from the payload, or to deserialize the
-                        // checkpoint from them is considered a transient error -- the store being
-                        // fetched from needs to be corrected, and ingestion will keep retrying it
-                        // until it is.
-                        let bytes = response.bytes().await.map_err(|e| {
-                            self.metrics
-                                .inc_retry(checkpoint, "bytes", Error::ReqwestError(e))
-                        })?;
-
-                        self.metrics.total_ingested_bytes.inc_by(bytes.len() as u64);
-                        let data: CheckpointData = Blob::from_bytes(&bytes).map_err(|e| {
-                            self.metrics.inc_retry(
-                                checkpoint,
-                                "deserialization",
-                                Error::DeserializationError(checkpoint, e),
-                            )
-                        })?;
-
-                        Ok(data)
-                    }
-
-                    // Treat 404s as a special case so we can match on this error type.
-                    code @ StatusCode::NOT_FOUND => {
-                        debug!(checkpoint, %code, "Checkpoint not found");
-                        Err(BE::permanent(Error::NotFound(checkpoint)))
+                let bytes = client.fetch(checkpoint).await.map_err(|err| match err {
+                    FetchError::NotFound => BE::permanent(IngestionError::NotFound(checkpoint)),
+                    FetchError::Permanent(error) => {
+                        BE::permanent(IngestionError::FetchError(checkpoint, error))
                     }
-
-                    // Timeouts are a client error but they are usually transient.
-                    code @ StatusCode::REQUEST_TIMEOUT => Err(self.metrics.inc_retry(
+                    FetchError::Transient { reason, error } => self.metrics.inc_retry(
                         checkpoint,
-                        "timeout",
-                        Error::HttpError(checkpoint, code),
-                    )),
-
-                    // Rate limiting is also a client error, but the backoff will eventually widen the
-                    // interval appropriately.
-                    code @ StatusCode::TOO_MANY_REQUESTS => Err(self.metrics.inc_retry(
-                        checkpoint,
-                        "too_many_requests",
-                        Error::HttpError(checkpoint, code),
-                    )),
+                        reason,
+                        IngestionError::FetchError(checkpoint, error),
+                    ),
+                })?;
 
-                    // Assume that if the server is facing difficulties, it will recover eventually.
- code if code.is_server_error() => Err(self.metrics.inc_retry( + self.metrics.total_ingested_bytes.inc_by(bytes.len() as u64); + let data: CheckpointData = Blob::from_bytes(&bytes).map_err(|e| { + self.metrics.inc_retry( checkpoint, - "server_error", - Error::HttpError(checkpoint, code), - )), + "deserialization", + IngestionError::DeserializationError(checkpoint, e), + ) + })?; - // For everything else, assume it's a permanent error and don't retry. - code => { - error!(checkpoint, %code, "Permanent error, giving up!"); - Err(BE::permanent(Error::HttpError(checkpoint, code))) - } - } + Ok(data) } }; @@ -178,239 +153,3 @@ impl IngestionClient { Ok(Arc::new(data)) } } - -#[cfg(test)] -pub(crate) mod tests { - use std::sync::Mutex; - - use rand::{rngs::StdRng, SeedableRng}; - use sui_storage::blob::BlobEncoding; - use sui_types::{ - crypto::KeypairTraits, - gas::GasCostSummary, - messages_checkpoint::{ - CertifiedCheckpointSummary, CheckpointContents, CheckpointSummary, - SignedCheckpointSummary, - }, - supported_protocol_versions::ProtocolConfig, - utils::make_committee_key, - }; - use wiremock::{ - matchers::{method, path_regex}, - Mock, MockServer, Request, Respond, ResponseTemplate, - }; - - use crate::metrics::tests::test_metrics; - - use super::*; - - const RNG_SEED: [u8; 32] = [ - 21, 23, 199, 200, 234, 250, 252, 178, 94, 15, 202, 178, 62, 186, 88, 137, 233, 192, 130, - 157, 179, 179, 65, 9, 31, 249, 221, 123, 225, 112, 199, 247, - ]; - - pub(crate) async fn respond_with(server: &MockServer, response: impl Respond + 'static) { - Mock::given(method("GET")) - .and(path_regex(r"/\d+.chk")) - .respond_with(response) - .mount(server) - .await; - } - - pub(crate) fn status(code: StatusCode) -> ResponseTemplate { - ResponseTemplate::new(code.as_u16()) - } - - pub(crate) fn test_checkpoint_data(cp: u64) -> Vec { - let mut rng = StdRng::from_seed(RNG_SEED); - let (keys, committee) = make_committee_key(&mut rng); - let contents = CheckpointContents::new_with_digests_only_for_tests(vec![]); - let summary = CheckpointSummary::new( - &ProtocolConfig::get_for_max_version_UNSAFE(), - 0, - cp, - 0, - &contents, - None, - GasCostSummary::default(), - None, - 0, - Vec::new(), - ); - - let sign_infos: Vec<_> = keys - .iter() - .map(|k| { - let name = k.public().into(); - SignedCheckpointSummary::sign(committee.epoch, &summary, k, name) - }) - .collect(); - - let checkpoint_data = CheckpointData { - checkpoint_summary: CertifiedCheckpointSummary::new(summary, sign_infos, &committee) - .unwrap(), - checkpoint_contents: contents, - transactions: vec![], - }; - - Blob::encode(&checkpoint_data, BlobEncoding::Bcs) - .unwrap() - .to_bytes() - } - - fn test_client(uri: String) -> IngestionClient { - IngestionClient::new(Url::parse(&uri).unwrap(), Arc::new(test_metrics())).unwrap() - } - - #[tokio::test] - async fn fail_on_not_found() { - let server = MockServer::start().await; - respond_with(&server, status(StatusCode::NOT_FOUND)).await; - - let client = test_client(server.uri()); - let error = client - .fetch(42, &CancellationToken::new()) - .await - .unwrap_err(); - - assert!(matches!(error, Error::NotFound(42))); - } - - #[tokio::test] - async fn fail_on_client_error() { - let server = MockServer::start().await; - respond_with(&server, status(StatusCode::IM_A_TEAPOT)).await; - - let client = test_client(server.uri()); - let error = client - .fetch(42, &CancellationToken::new()) - .await - .unwrap_err(); - - assert!(matches!( - error, - Error::HttpError(42, StatusCode::IM_A_TEAPOT) - )); - } - - /// Even if 
the server is repeatedly returning transient errors, it is possible to cancel the - /// fetch request via its cancellation token. - #[tokio::test] - async fn fail_on_cancel() { - let cancel = CancellationToken::new(); - let server = MockServer::start().await; - - // This mock server repeatedly returns internal server errors, but will also send a - // cancellation with the second request (this is a bit of a contrived test set-up). - let times: Mutex = Mutex::new(0); - let server_cancel = cancel.clone(); - respond_with(&server, move |_: &Request| { - let mut times = times.lock().unwrap(); - *times += 1; - - if *times > 2 { - server_cancel.cancel(); - } - - status(StatusCode::INTERNAL_SERVER_ERROR) - }) - .await; - - let client = test_client(server.uri()); - let error = client.fetch(42, &cancel.clone()).await.unwrap_err(); - - assert!(matches!(error, Error::Cancelled)); - } - - /// Assume that failures to send the request to the remote store are due to temporary - /// connectivity issues, and retry them. - #[tokio::test] - async fn retry_on_request_error() { - let server = MockServer::start().await; - - let times: Mutex = Mutex::new(0); - respond_with(&server, move |r: &Request| { - let mut times = times.lock().unwrap(); - *times += 1; - match (*times, r.url.path()) { - // The first request will trigger a redirect to 0.chk no matter what the original - // request was for -- triggering a request error. - (1, _) => status(StatusCode::MOVED_PERMANENTLY).append_header("Location", "/0.chk"), - - // Set-up checkpoint 0 as an infinite redirect loop. - (_, "/0.chk") => { - status(StatusCode::MOVED_PERMANENTLY).append_header("Location", r.url.as_str()) - } - - // Subsequently, requests will fail with a permanent error, this is what we expect - // to see. - _ => status(StatusCode::IM_A_TEAPOT), - } - }) - .await; - - let client = test_client(server.uri()); - let error = client - .fetch(42, &CancellationToken::new()) - .await - .unwrap_err(); - - assert!( - matches!(error, Error::HttpError(42, StatusCode::IM_A_TEAPOT),), - "{error}" - ); - } - - /// Assume that certain errors will recover by themselves, and keep retrying with an - /// exponential back-off. These errors include: 5xx (server) errors, 408 (timeout), and 429 - /// (rate limiting). - #[tokio::test] - async fn retry_on_transient_server_error() { - let server = MockServer::start().await; - let times: Mutex = Mutex::new(0); - respond_with(&server, move |_: &Request| { - let mut times = times.lock().unwrap(); - *times += 1; - status(match *times { - 1 => StatusCode::INTERNAL_SERVER_ERROR, - 2 => StatusCode::REQUEST_TIMEOUT, - 3 => StatusCode::TOO_MANY_REQUESTS, - _ => StatusCode::IM_A_TEAPOT, - }) - }) - .await; - - let client = test_client(server.uri()); - let error = client - .fetch(42, &CancellationToken::new()) - .await - .unwrap_err(); - - assert!(matches!( - error, - Error::HttpError(42, StatusCode::IM_A_TEAPOT) - )); - } - - /// Treat deserialization failure as another kind of transient error -- all checkpoint data - /// that is fetched should be valid (deserializable as a `CheckpointData`). 
- #[tokio::test] - async fn retry_on_deserialization_error() { - let server = MockServer::start().await; - let times: Mutex = Mutex::new(0); - respond_with(&server, move |_: &Request| { - let mut times = times.lock().unwrap(); - *times += 1; - if *times < 3 { - status(StatusCode::OK).set_body_bytes(vec![]) - } else { - status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42)) - } - }) - .await; - - let client = test_client(server.uri()); - let checkpoint = client.fetch(42, &CancellationToken::new()).await.unwrap(); - assert_eq!(42, checkpoint.checkpoint_summary.sequence_number) - } -} diff --git a/crates/sui-indexer-alt/src/ingestion/error.rs b/crates/sui-indexer-alt/src/ingestion/error.rs index 78fff94d46d8f..17cafe495aa80 100644 --- a/crates/sui-indexer-alt/src/ingestion/error.rs +++ b/crates/sui-indexer-alt/src/ingestion/error.rs @@ -1,8 +1,6 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use reqwest::StatusCode; - pub type Result = std::result::Result; #[derive(thiserror::Error, Debug)] @@ -14,7 +12,7 @@ pub enum Error { DeserializationError(u64, #[source] anyhow::Error), #[error("Failed to fetch checkpoint {0}: {1}")] - HttpError(u64, StatusCode), + FetchError(u64, #[source] anyhow::Error), #[error(transparent)] ReqwestError(#[from] reqwest::Error), diff --git a/crates/sui-indexer-alt/src/ingestion/local_client.rs b/crates/sui-indexer-alt/src/ingestion/local_client.rs new file mode 100644 index 0000000000000..2efb6708939ff --- /dev/null +++ b/crates/sui-indexer-alt/src/ingestion/local_client.rs @@ -0,0 +1,65 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use crate::ingestion::client::{FetchError, FetchResult, IngestionClientTrait}; +use axum::body::Bytes; +use std::path::PathBuf; + +pub struct LocalIngestionClient { + path: PathBuf, +} + +impl LocalIngestionClient { + pub fn new(path: PathBuf) -> Self { + LocalIngestionClient { path } + } +} + +#[async_trait::async_trait] +impl IngestionClientTrait for LocalIngestionClient { + async fn fetch(&self, checkpoint: u64) -> FetchResult { + let path = self.path.join(format!("{}.chk", checkpoint)); + let bytes = tokio::fs::read(path).await.map_err(|e| { + if e.kind() == std::io::ErrorKind::NotFound { + FetchError::NotFound + } else { + FetchError::Transient { + reason: "io_error", + error: e.into(), + } + } + })?; + Ok(Bytes::from(bytes)) + } +} + +#[cfg(test)] +pub(crate) mod tests { + use crate::ingestion::client::IngestionClient; + use crate::ingestion::test_utils::test_checkpoint_data; + use crate::metrics::tests::test_metrics; + use std::sync::Arc; + use sui_storage::blob::{Blob, BlobEncoding}; + use tokio_util::sync::CancellationToken; + + #[tokio::test] + async fn local_test_fetch() { + let tempdir = tempfile::tempdir().unwrap().into_path(); + let path = tempdir.join("1.chk"); + let test_checkpoint = test_checkpoint_data(1); + tokio::fs::write(&path, &test_checkpoint).await.unwrap(); + + let metrics = Arc::new(test_metrics()); + let local_client = IngestionClient::new_local(tempdir, metrics); + let checkpoint = local_client + .fetch(1, &CancellationToken::new()) + .await + .unwrap(); + assert_eq!( + Blob::encode(&*checkpoint, BlobEncoding::Bcs) + .unwrap() + .to_bytes(), + test_checkpoint + ); + } +} diff --git a/crates/sui-indexer-alt/src/ingestion/mod.rs b/crates/sui-indexer-alt/src/ingestion/mod.rs index 385b2026a5acc..0e726cccc891e 100644 --- a/crates/sui-indexer-alt/src/ingestion/mod.rs +++ b/crates/sui-indexer-alt/src/ingestion/mod.rs @@ -1,23 +1,26 @@ // 
Copyright (c) Mysten Labs, Inc.
 // SPDX-License-Identifier: Apache-2.0
 
-use std::{sync::Arc, time::Duration};
-
+use crate::ingestion::client::IngestionClient;
+use crate::ingestion::error::{Error, Result};
+use crate::metrics::IndexerMetrics;
 use backoff::backoff::Constant;
-use client::IngestionClient;
 use futures::{future::try_join_all, stream, StreamExt, TryStreamExt};
 use mysten_metrics::spawn_monitored_task;
+use std::path::PathBuf;
+use std::{sync::Arc, time::Duration};
 use sui_types::full_checkpoint_content::CheckpointData;
 use tokio::{sync::mpsc, task::JoinHandle};
 use tokio_util::sync::CancellationToken;
 use tracing::{debug, error, info};
 use url::Url;
 
-use crate::ingestion::error::{Error, Result};
-use crate::metrics::IndexerMetrics;
-
 mod client;
 pub mod error;
+mod local_client;
+mod remote_client;
+#[cfg(test)]
+mod test_utils;
 
 pub struct IngestionService {
     config: IngestionConfig,
@@ -30,8 +33,13 @@ pub struct IngestionService {
 #[derive(clap::Args, Debug, Clone)]
 pub struct IngestionConfig {
     /// Remote Store to fetch checkpoints from.
-    #[arg(long)]
-    remote_store_url: Url,
+    #[arg(long, required = true, group = "source")]
+    remote_store_url: Option<Url>,
+
+    /// Path to the local ingestion directory.
+    /// If both remote_store_url and local_ingestion_path are provided, remote_store_url will be used.
+    #[arg(long, required = true, group = "source")]
+    local_ingestion_path: Option<PathBuf>,
 
     /// Maximum size of checkpoint backlog across all workers downstream of the ingestion service.
     #[arg(long, default_value_t = 5000)]
@@ -57,9 +65,18 @@ impl IngestionService {
         metrics: Arc<IndexerMetrics>,
         cancel: CancellationToken,
     ) -> Result<Self> {
+        // TODO: Potentially support a hybrid mode where we can fetch from both local and remote.
+        let client = if let Some(url) = config.remote_store_url.as_ref() {
+            IngestionClient::new_remote(url.clone(), metrics.clone())?
+        } else if let Some(path) = config.local_ingestion_path.as_ref() {
+            IngestionClient::new_local(path.clone(), metrics.clone())
+        } else {
+            panic!("Either remote_store_url or local_ingestion_path must be provided");
+        };
+        let subscribers = Vec::new();
         Ok(Self {
-            client: IngestionClient::new(config.remote_store_url.clone(), metrics.clone())?,
-            subscribers: Vec::new(),
+            client,
+            subscribers,
             config,
             metrics,
             cancel,
@@ -83,7 +100,7 @@ impl IngestionService {
     ///
     /// - If a subscriber is lagging (not receiving checkpoints fast enough), it will eventually
     ///   provide back-pressure to the ingestion service, which will stop fetching new checkpoints.
-    /// - If a subscriber closes its channel, the ingestion service will intepret that as a signal
+    /// - If a subscriber closes its channel, the ingestion service will interpret that as a signal
     ///   to shutdown as well.
/// /// If ingestion reaches the leading edge of the network, it will encounter checkpoints that do @@ -187,7 +204,8 @@ mod tests { use reqwest::StatusCode; use wiremock::{MockServer, Request}; - use crate::ingestion::client::tests::{respond_with, status, test_checkpoint_data}; + use crate::ingestion::remote_client::tests::{respond_with, status}; + use crate::ingestion::test_utils::test_checkpoint_data; use crate::metrics::tests::test_metrics; use super::*; @@ -200,7 +218,8 @@ mod tests { ) -> IngestionService { IngestionService::new( IngestionConfig { - remote_store_url: Url::parse(&uri).unwrap(), + remote_store_url: Some(Url::parse(&uri).unwrap()), + local_ingestion_path: None, buffer_size, concurrency, retry_interval: Duration::from_millis(200), diff --git a/crates/sui-indexer-alt/src/ingestion/remote_client.rs b/crates/sui-indexer-alt/src/ingestion/remote_client.rs new file mode 100644 index 0000000000000..c4f91fee57990 --- /dev/null +++ b/crates/sui-indexer-alt/src/ingestion/remote_client.rs @@ -0,0 +1,292 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use crate::ingestion::client::{FetchError, FetchResult, IngestionClientTrait}; +use crate::ingestion::Result as IngestionResult; +use reqwest::{Client, StatusCode}; +use tracing::{debug, error}; +use url::Url; + +#[derive(thiserror::Error, Debug, Eq, PartialEq)] +pub enum HttpError { + #[error("HTTP error with status code: {0}")] + Http(StatusCode), +} + +fn status_code_to_error(code: StatusCode) -> anyhow::Error { + HttpError::Http(code).into() +} + +pub(crate) struct RemoteIngestionClient { + url: Url, + client: Client, +} + +impl RemoteIngestionClient { + pub(crate) fn new(url: Url) -> IngestionResult { + Ok(Self { + url, + client: Client::builder().build()?, + }) + } +} + +#[async_trait::async_trait] +impl IngestionClientTrait for RemoteIngestionClient { + /// Fetch a checkpoint from the remote store. + /// + /// Transient errors include: + /// + /// - failures to issue a request, (network errors, redirect issues, etc) + /// - request timeouts, + /// - rate limiting, + /// - server errors (5xx), + /// - issues getting a full response. + async fn fetch(&self, checkpoint: u64) -> FetchResult { + // SAFETY: The path being joined is statically known to be valid. + let url = self + .url + .join(&format!("/{checkpoint}.chk")) + .expect("Unexpected invalid URL"); + + let response = self + .client + .get(url) + .send() + .await + .map_err(|e| FetchError::Transient { + reason: "request", + error: e.into(), + })?; + + match response.status() { + code if code.is_success() => { + // Failure to extract all the bytes from the payload, or to deserialize the + // checkpoint from them is considered a transient error -- the store being + // fetched from needs to be corrected, and ingestion will keep retrying it + // until it is. + response.bytes().await.map_err(|e| FetchError::Transient { + reason: "bytes", + error: e.into(), + }) + } + + // Treat 404s as a special case so we can match on this error type. + code @ StatusCode::NOT_FOUND => { + debug!(checkpoint, %code, "Checkpoint not found"); + Err(FetchError::NotFound) + } + + // Timeouts are a client error but they are usually transient. + code @ StatusCode::REQUEST_TIMEOUT => Err(FetchError::Transient { + reason: "timeout", + error: status_code_to_error(code), + }), + + // Rate limiting is also a client error, but the backoff will eventually widen the + // interval appropriately. 
+ code @ StatusCode::TOO_MANY_REQUESTS => Err(FetchError::Transient { + reason: "too_many_requests", + error: status_code_to_error(code), + }), + + // Assume that if the server is facing difficulties, it will recover eventually. + code if code.is_server_error() => Err(FetchError::Transient { + reason: "server_error", + error: status_code_to_error(code), + }), + + // For everything else, assume it's a permanent error and don't retry. + code => { + error!(checkpoint, %code, "Permanent error, giving up!"); + Err(FetchError::Permanent(status_code_to_error(code))) + } + } + } +} + +#[cfg(test)] +pub(crate) mod tests { + use super::*; + use crate::ingestion::client::IngestionClient; + use crate::ingestion::error::Error; + use crate::ingestion::test_utils::test_checkpoint_data; + use crate::metrics::tests::test_metrics; + use axum::http::StatusCode; + use std::sync::{Arc, Mutex}; + use tokio_util::sync::CancellationToken; + use wiremock::{ + matchers::{method, path_regex}, + Mock, MockServer, Request, Respond, ResponseTemplate, + }; + + pub(crate) async fn respond_with(server: &MockServer, response: impl Respond + 'static) { + Mock::given(method("GET")) + .and(path_regex(r"/\d+.chk")) + .respond_with(response) + .mount(server) + .await; + } + + pub(crate) fn status(code: StatusCode) -> ResponseTemplate { + ResponseTemplate::new(code.as_u16()) + } + + fn remote_test_client(uri: String) -> IngestionClient { + IngestionClient::new_remote(Url::parse(&uri).unwrap(), Arc::new(test_metrics())).unwrap() + } + + fn assert_http_error(error: Error, checkpoint: u64, code: StatusCode) { + let Error::FetchError(c, inner) = error else { + panic!("Expected FetchError, got: {:?}", error); + }; + assert_eq!(c, checkpoint); + let Some(http_error) = inner.downcast_ref::() else { + panic!("Expected HttpError, got: {:?}", inner); + }; + assert_eq!(http_error, &HttpError::Http(code)); + } + + #[tokio::test] + async fn fail_on_not_found() { + let server = MockServer::start().await; + respond_with(&server, status(StatusCode::NOT_FOUND)).await; + + let client = remote_test_client(server.uri()); + let error = client + .fetch(42, &CancellationToken::new()) + .await + .unwrap_err(); + + assert!(matches!(error, Error::NotFound(42))); + } + + #[tokio::test] + async fn fail_on_client_error() { + let server = MockServer::start().await; + respond_with(&server, status(StatusCode::IM_A_TEAPOT)).await; + + let client = remote_test_client(server.uri()); + let error = client + .fetch(42, &CancellationToken::new()) + .await + .unwrap_err(); + + assert_http_error(error, 42, StatusCode::IM_A_TEAPOT); + } + + /// Even if the server is repeatedly returning transient errors, it is possible to cancel the + /// fetch request via its cancellation token. + #[tokio::test] + async fn fail_on_cancel() { + let cancel = CancellationToken::new(); + let server = MockServer::start().await; + + // This mock server repeatedly returns internal server errors, but will also send a + // cancellation with the second request (this is a bit of a contrived test set-up). 
+ let times: Mutex = Mutex::new(0); + let server_cancel = cancel.clone(); + respond_with(&server, move |_: &Request| { + let mut times = times.lock().unwrap(); + *times += 1; + + if *times > 2 { + server_cancel.cancel(); + } + + status(StatusCode::INTERNAL_SERVER_ERROR) + }) + .await; + + let client = remote_test_client(server.uri()); + let error = client.fetch(42, &cancel.clone()).await.unwrap_err(); + + assert!(matches!(error, Error::Cancelled)); + } + + /// Assume that failures to send the request to the remote store are due to temporary + /// connectivity issues, and retry them. + #[tokio::test] + async fn retry_on_request_error() { + let server = MockServer::start().await; + + let times: Mutex = Mutex::new(0); + respond_with(&server, move |r: &Request| { + let mut times = times.lock().unwrap(); + *times += 1; + match (*times, r.url.path()) { + // The first request will trigger a redirect to 0.chk no matter what the original + // request was for -- triggering a request error. + (1, _) => status(StatusCode::MOVED_PERMANENTLY).append_header("Location", "/0.chk"), + + // Set-up checkpoint 0 as an infinite redirect loop. + (_, "/0.chk") => { + status(StatusCode::MOVED_PERMANENTLY).append_header("Location", r.url.as_str()) + } + + // Subsequently, requests will fail with a permanent error, this is what we expect + // to see. + _ => status(StatusCode::IM_A_TEAPOT), + } + }) + .await; + + let client = remote_test_client(server.uri()); + let error = client + .fetch(42, &CancellationToken::new()) + .await + .unwrap_err(); + + assert_http_error(error, 42, StatusCode::IM_A_TEAPOT); + } + + /// Assume that certain errors will recover by themselves, and keep retrying with an + /// exponential back-off. These errors include: 5xx (server) errors, 408 (timeout), and 429 + /// (rate limiting). + #[tokio::test] + async fn retry_on_transient_server_error() { + let server = MockServer::start().await; + let times: Mutex = Mutex::new(0); + respond_with(&server, move |_: &Request| { + let mut times = times.lock().unwrap(); + *times += 1; + status(match *times { + 1 => StatusCode::INTERNAL_SERVER_ERROR, + 2 => StatusCode::REQUEST_TIMEOUT, + 3 => StatusCode::TOO_MANY_REQUESTS, + _ => StatusCode::IM_A_TEAPOT, + }) + }) + .await; + + let client = remote_test_client(server.uri()); + let error = client + .fetch(42, &CancellationToken::new()) + .await + .unwrap_err(); + + assert_http_error(error, 42, StatusCode::IM_A_TEAPOT); + } + + /// Treat deserialization failure as another kind of transient error -- all checkpoint data + /// that is fetched should be valid (deserializable as a `CheckpointData`). + #[tokio::test] + async fn retry_on_deserialization_error() { + let server = MockServer::start().await; + let times: Mutex = Mutex::new(0); + respond_with(&server, move |_: &Request| { + let mut times = times.lock().unwrap(); + *times += 1; + if *times < 3 { + status(StatusCode::OK).set_body_bytes(vec![]) + } else { + status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42)) + } + }) + .await; + + let client = remote_test_client(server.uri()); + let checkpoint = client.fetch(42, &CancellationToken::new()).await.unwrap(); + assert_eq!(42, checkpoint.checkpoint_summary.sequence_number) + } +} diff --git a/crates/sui-indexer-alt/src/ingestion/test_utils.rs b/crates/sui-indexer-alt/src/ingestion/test_utils.rs new file mode 100644 index 0000000000000..99f130927d0bf --- /dev/null +++ b/crates/sui-indexer-alt/src/ingestion/test_utils.rs @@ -0,0 +1,56 @@ +// Copyright (c) Mysten Labs, Inc. 
+// SPDX-License-Identifier: Apache-2.0
+
+use rand::prelude::StdRng;
+use rand::SeedableRng;
+use sui_storage::blob::{Blob, BlobEncoding};
+use sui_types::crypto::KeypairTraits;
+use sui_types::full_checkpoint_content::CheckpointData;
+use sui_types::gas::GasCostSummary;
+use sui_types::messages_checkpoint::{
+    CertifiedCheckpointSummary, CheckpointContents, CheckpointSummary, SignedCheckpointSummary,
+};
+use sui_types::supported_protocol_versions::ProtocolConfig;
+use sui_types::utils::make_committee_key;
+
+const RNG_SEED: [u8; 32] = [
+    21, 23, 199, 200, 234, 250, 252, 178, 94, 15, 202, 178, 62, 186, 88, 137, 233, 192, 130, 157,
+    179, 179, 65, 9, 31, 249, 221, 123, 225, 112, 199, 247,
+];
+
+pub(crate) fn test_checkpoint_data(cp: u64) -> Vec<u8> {
+    let mut rng = StdRng::from_seed(RNG_SEED);
+    let (keys, committee) = make_committee_key(&mut rng);
+    let contents = CheckpointContents::new_with_digests_only_for_tests(vec![]);
+    let summary = CheckpointSummary::new(
+        &ProtocolConfig::get_for_max_version_UNSAFE(),
+        0,
+        cp,
+        0,
+        &contents,
+        None,
+        GasCostSummary::default(),
+        None,
+        0,
+        Vec::new(),
+    );
+
+    let sign_infos: Vec<_> = keys
+        .iter()
+        .map(|k| {
+            let name = k.public().into();
+            SignedCheckpointSummary::sign(committee.epoch, &summary, k, name)
+        })
+        .collect();
+
+    let checkpoint_data = CheckpointData {
+        checkpoint_summary: CertifiedCheckpointSummary::new(summary, sign_infos, &committee)
+            .unwrap(),
+        checkpoint_contents: contents,
+        transactions: vec![],
+    };
+
+    Blob::encode(&checkpoint_data, BlobEncoding::Bcs)
+        .unwrap()
+        .to_bytes()
+}
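
Note (not part of the diff): the new `IngestionClientTrait` is the extension point this change introduces; `RemoteIngestionClient` and `LocalIngestionClient` are just its first two implementations. Below is a minimal, hypothetical sketch of how another checkpoint source could plug in. The `InMemoryIngestionClient` name and its field are invented for illustration, and it assumes `FetchResult` is the `Result<Bytes, FetchError>` alias defined in client.rs above.

// Hypothetical example, not part of this PR: an in-memory checkpoint source.
use std::collections::HashMap;

use tokio_util::bytes::Bytes;

use crate::ingestion::client::{FetchError, FetchResult, IngestionClientTrait};

pub(crate) struct InMemoryIngestionClient {
    // BCS-encoded `CheckpointData` blobs, keyed by checkpoint sequence number.
    checkpoints: HashMap<u64, Bytes>,
}

#[async_trait::async_trait]
impl IngestionClientTrait for InMemoryIngestionClient {
    async fn fetch(&self, checkpoint: u64) -> FetchResult {
        match self.checkpoints.get(&checkpoint) {
            // The wrapping `IngestionClient::fetch` deserializes these bytes into
            // `CheckpointData`, and treats a deserialization failure as transient.
            Some(bytes) => Ok(bytes.clone()),
            // Surfaced by `IngestionClient::fetch` as `Error::NotFound(checkpoint)`,
            // which the ingestion service treats as "not yet available".
            None => Err(FetchError::NotFound),
        }
    }
}

Errors that should be retried with exponential backoff would instead be returned as `FetchError::Transient { reason, error }`, where `reason` becomes the label recorded by the retry metric.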