Skip to content

Commit

Permalink
indexer: support force start and end checkpoints (#19987)
Browse files Browse the repository at this point in the history
## Description 

Useful for benchmarking, so that we can replay the same traffic again and
again without DB ops.

## Test plan 

- CI, to make sure it does not break other ingestion
- Added an ingestion test for the new start and end checkpoint options

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
gegaowp authored Oct 23, 2024
1 parent 4aa1303 commit 6f55a89
Show file tree
Hide file tree
Showing 12 changed files with 214 additions and 36 deletions.
2 changes: 2 additions & 0 deletions crates/sui-cluster-test/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,8 @@ impl Cluster for LocalNewCluster {
None,
Some(data_ingestion_path.path().to_path_buf()),
None, /* cancel */
None, /* start_checkpoint */
None, /* end_checkpoint */
)
.await;
cancellation_tokens.push(writer_token.drop_guard());
Expand Down
4 changes: 4 additions & 0 deletions crates/sui-graphql-rpc/src/test_infra/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ pub async fn start_network_cluster() -> NetworkCluster {
None,
Some(data_ingestion_path.path().to_path_buf()),
Some(cancellation_token.clone()),
None, /* start_checkpoint */
None, /* end_checkpoint */
)
.await;

Expand Down Expand Up @@ -187,6 +189,8 @@ pub async fn serve_executor(
retention_config,
Some(data_ingestion_path),
Some(cancellation_token.clone()),
None,
None,
)
.await;

Expand Down
12 changes: 12 additions & 0 deletions crates/sui-indexer/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,16 @@ pub struct IngestionConfig {
)]
pub checkpoint_download_queue_size: usize,

/// Start checkpoint to ingest from. This is optional; if not provided, ingestion will
/// start from the next checkpoint after the latest committed checkpoint.
#[arg(long, env = "START_CHECKPOINT")]
pub start_checkpoint: Option<u64>,

/// End checkpoint to ingest until. This is optional; if not provided, ingestion will
/// continue until u64::MAX.
#[arg(long, env = "END_CHECKPOINT")]
pub end_checkpoint: Option<u64>,

#[arg(
long,
default_value_t = Self::DEFAULT_CHECKPOINT_DOWNLOAD_TIMEOUT,
Expand Down Expand Up @@ -146,6 +156,8 @@ impl Default for IngestionConfig {
fn default() -> Self {
Self {
sources: Default::default(),
start_checkpoint: None,
end_checkpoint: None,
checkpoint_download_queue_size: Self::DEFAULT_CHECKPOINT_DOWNLOAD_QUEUE_SIZE,
checkpoint_download_timeout: Self::DEFAULT_CHECKPOINT_DOWNLOAD_TIMEOUT,
checkpoint_download_queue_size_bytes:
Expand Down
26 changes: 18 additions & 8 deletions crates/sui-indexer/src/handlers/checkpoint_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,20 @@ const CHECKPOINT_QUEUE_SIZE: usize = 100;
pub async fn new_handlers(
state: PgIndexerStore,
metrics: IndexerMetrics,
next_checkpoint_sequence_number: CheckpointSequenceNumber,
cancel: CancellationToken,
committed_checkpoints_tx: Option<watch::Sender<Option<IndexerProgress>>>,
) -> Result<CheckpointHandler, IndexerError> {
start_checkpoint_opt: Option<CheckpointSequenceNumber>,
end_checkpoint_opt: Option<CheckpointSequenceNumber>,
) -> Result<(CheckpointHandler, u64), IndexerError> {
let start_checkpoint = match start_checkpoint_opt {
Some(start_checkpoint) => start_checkpoint,
None => state
.get_latest_checkpoint_sequence_number()
.await?
.map(|seq| seq.saturating_add(1))
.unwrap_or_default(),
};

let checkpoint_queue_size = std::env::var("CHECKPOINT_QUEUE_SIZE")
.unwrap_or(CHECKPOINT_QUEUE_SIZE.to_string())
.parse::<usize>()
Expand All @@ -73,14 +83,14 @@ pub async fn new_handlers(
state_clone,
metrics_clone,
indexed_checkpoint_receiver,
next_checkpoint_sequence_number,
cancel.clone(),
committed_checkpoints_tx
committed_checkpoints_tx,
start_checkpoint,
end_checkpoint_opt,
));
Ok(CheckpointHandler::new(
state,
metrics,
indexed_checkpoint_sender,
Ok((
CheckpointHandler::new(state, metrics, indexed_checkpoint_sender),
start_checkpoint,
))
}

Expand Down
16 changes: 15 additions & 1 deletion crates/sui-indexer/src/handlers/committer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ pub async fn start_tx_checkpoint_commit_task<S>(
state: S,
metrics: IndexerMetrics,
tx_indexing_receiver: mysten_metrics::metered_channel::Receiver<CheckpointDataToCommit>,
mut next_checkpoint_sequence_number: CheckpointSequenceNumber,
cancel: CancellationToken,
mut committed_checkpoints_tx: Option<watch::Sender<Option<IndexerProgress>>>,
mut next_checkpoint_sequence_number: CheckpointSequenceNumber,
end_checkpoint_opt: Option<CheckpointSequenceNumber>,
) -> IndexerResult<()>
where
S: IndexerStore + Clone + Sync + Send + 'static,
Expand Down Expand Up @@ -82,11 +83,24 @@ where
);
})?;
}
// stop adding to the commit batch if we've reached the end checkpoint
if let Some(end_checkpoint_sequence_number) = end_checkpoint_opt {
if next_checkpoint_sequence_number > end_checkpoint_sequence_number {
break;
}
}
}
if !batch.is_empty() {
commit_checkpoints(&state, batch, None, &metrics, &mut committed_checkpoints_tx).await;
batch = vec![];
}

// stop the commit task if we've reached the end checkpoint
if let Some(end_checkpoint_sequence_number) = end_checkpoint_opt {
if next_checkpoint_sequence_number > end_checkpoint_sequence_number {
break;
}
}
}
Ok(())
}
Expand Down
26 changes: 19 additions & 7 deletions crates/sui-indexer/src/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ impl<T> CommonHandler<T> {
&self,
cp_receiver: mysten_metrics::metered_channel::Receiver<(CommitterWatermark, T)>,
cancel: CancellationToken,
start_checkpoint: u64,
end_checkpoint_opt: Option<u64>,
) -> IndexerResult<()> {
let checkpoint_commit_batch_size = std::env::var("CHECKPOINT_COMMIT_BATCH_SIZE")
.unwrap_or(CHECKPOINT_COMMIT_BATCH_SIZE.to_string())
Expand All @@ -104,12 +106,7 @@ impl<T> CommonHandler<T> {
// just the checkpoint sequence number, and the tuple is (CommitterWatermark, T).
let mut unprocessed: BTreeMap<u64, (CommitterWatermark, _)> = BTreeMap::new();
let mut tuple_batch = vec![];
let mut next_cp_to_process = self
.handler
.get_watermark_hi()
.await?
.map(|n| n.saturating_add(1))
.unwrap_or_default();
let mut next_cp_to_process = start_checkpoint;

loop {
if cancel.is_cancelled() {
Expand Down Expand Up @@ -140,7 +137,12 @@ impl<T> CommonHandler<T> {

// Process unprocessed checkpoints, even if there are no new checkpoints from the stream
let checkpoint_lag_limiter = self.handler.get_max_committable_checkpoint().await?;
while next_cp_to_process <= checkpoint_lag_limiter {
let max_commitable_cp = std::cmp::min(
checkpoint_lag_limiter,
end_checkpoint_opt.unwrap_or(u64::MAX),
);
// Stop pushing to tuple_batch if we've reached the end checkpoint.
while next_cp_to_process <= max_commitable_cp {
if let Some(data_tuple) = unprocessed.remove(&next_cp_to_process) {
tuple_batch.push(data_tuple);
next_cp_to_process += 1;
Expand All @@ -162,6 +164,16 @@ impl<T> CommonHandler<T> {
self.handler.set_watermark_hi(committer_watermark).await?;
tuple_batch = vec![];
}

if let Some(end_checkpoint) = end_checkpoint_opt {
if next_cp_to_process > end_checkpoint {
tracing::info!(
"Reached end checkpoint, stopping handler {}...",
self.handler.name()
);
return Ok(());
}
}
}
Err(IndexerError::ChannelClosed(format!(
"Checkpoint channel is closed unexpectedly for handler {}",
Expand Down
18 changes: 15 additions & 3 deletions crates/sui-indexer/src/handlers/objects_snapshot_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ pub async fn start_objects_snapshot_handler(
metrics: IndexerMetrics,
snapshot_config: SnapshotLagConfig,
cancel: CancellationToken,
start_checkpoint_opt: Option<u64>,
end_checkpoint_opt: Option<u64>,
) -> IndexerResult<(ObjectsSnapshotHandler, u64)> {
info!("Starting object snapshot handler...");

Expand All @@ -104,10 +106,20 @@ pub async fn start_objects_snapshot_handler(
let objects_snapshot_handler =
ObjectsSnapshotHandler::new(store.clone(), sender, metrics.clone(), snapshot_config);

let watermark_hi = objects_snapshot_handler.get_watermark_hi().await?;
let next_cp_from_db = objects_snapshot_handler
.get_watermark_hi()
.await?
.map(|cp| cp.saturating_add(1))
.unwrap_or_default();
let start_checkpoint = start_checkpoint_opt.unwrap_or(next_cp_from_db);
let common_handler = CommonHandler::new(Box::new(objects_snapshot_handler.clone()));
spawn_monitored_task!(common_handler.start_transform_and_load(receiver, cancel));
Ok((objects_snapshot_handler, watermark_hi.unwrap_or_default()))
spawn_monitored_task!(common_handler.start_transform_and_load(
receiver,
cancel,
start_checkpoint,
end_checkpoint_opt,
));
Ok((objects_snapshot_handler, start_checkpoint))
}

impl ObjectsSnapshotHandler {
Expand Down
28 changes: 13 additions & 15 deletions crates/sui-indexer/src/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,6 @@ impl Indexer {
);
info!("Sui Indexer Writer config: {config:?}",);

let primary_watermark = store
.get_latest_checkpoint_sequence_number()
.await
.expect("Failed to get latest tx checkpoint sequence number from DB")
.map(|seq| seq + 1)
.unwrap_or_default();

let extra_reader_options = ReaderOptions {
batch_size: config.checkpoint_download_queue_size,
timeout_secs: config.checkpoint_download_timeout,
Expand All @@ -69,6 +62,8 @@ impl Indexer {
metrics.clone(),
snapshot_config,
cancel.clone(),
config.start_checkpoint,
config.end_checkpoint,
)
.await?;

Expand All @@ -90,6 +85,16 @@ impl Indexer {

let mut exit_senders = vec![];
let mut executors = vec![];

let (worker, primary_watermark) = new_handlers(
store,
metrics,
cancel.clone(),
committed_checkpoints_tx,
config.start_checkpoint,
config.end_checkpoint,
)
.await?;
// Ingestion task watermarks are snapshotted once on indexer startup based on the
// corresponding watermark table before being handed off to the ingestion task.
let progress_store = ShimIndexerProgressStore::new(vec![
Expand All @@ -101,14 +106,7 @@ impl Indexer {
2,
DataIngestionMetrics::new(&Registry::new()),
);
let worker = new_handlers(
store,
metrics,
primary_watermark,
cancel.clone(),
committed_checkpoints_tx,
)
.await?;

let worker_pool = WorkerPool::new(
worker,
"primary".to_string(),
Expand Down
44 changes: 43 additions & 1 deletion crates/sui-indexer/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ pub async fn start_indexer_writer_for_testing(
retention_config: Option<RetentionConfig>,
data_ingestion_path: Option<PathBuf>,
cancel: Option<CancellationToken>,
start_checkpoint: Option<u64>,
end_checkpoint: Option<u64>,
) -> (
PgIndexerStore,
JoinHandle<Result<(), IndexerError>>,
Expand Down Expand Up @@ -117,7 +119,11 @@ pub async fn start_indexer_writer_for_testing(
crate::db::reset_database(connection).await.unwrap();

let store_clone = store.clone();
let mut ingestion_config = IngestionConfig::default();
let mut ingestion_config = IngestionConfig {
start_checkpoint,
end_checkpoint,
..Default::default()
};
ingestion_config.sources.data_ingestion_path = data_ingestion_path;
let token_clone = token.clone();

Expand Down Expand Up @@ -251,6 +257,42 @@ pub async fn set_up(
None,
Some(data_ingestion_path),
None, /* cancel */
None, /* start_checkpoint */
None, /* end_checkpoint */
)
.await;
(server_handle, pg_store, pg_handle, database)
}

pub async fn set_up_with_start_and_end_checkpoints(
sim: Arc<Simulacrum>,
data_ingestion_path: PathBuf,
start_checkpoint: u64,
end_checkpoint: u64,
) -> (
JoinHandle<()>,
PgIndexerStore,
JoinHandle<Result<(), IndexerError>>,
TempDb,
) {
let database = TempDb::new().unwrap();
let server_url: SocketAddr = format!("127.0.0.1:{}", get_available_port())
.parse()
.unwrap();
let server_handle = tokio::spawn(async move {
sui_rest_api::RestService::new_without_version(sim)
.start_service(server_url)
.await;
});
// Starts indexer
let (pg_store, pg_handle, _) = start_indexer_writer_for_testing(
database.database().url().as_str().to_owned(),
None,
None,
Some(data_ingestion_path),
None, /* cancel */
Some(start_checkpoint),
Some(end_checkpoint),
)
.await;
(server_handle, pg_store, pg_handle, database)
Expand Down
Loading

0 comments on commit 6f55a89

Please sign in to comment.