diff --git a/Cargo.lock b/Cargo.lock index 550aa462be0..85858010fa8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6525,6 +6525,7 @@ dependencies = [ "telemetry-subscribers", "tempfile", "tokio", + "tokio-util 0.7.12", "tracing", "url", ] @@ -6552,6 +6553,7 @@ dependencies = [ "tap", "tempfile", "tokio", + "tokio-util 0.7.12", "tracing", "url", ] @@ -7007,6 +7009,7 @@ dependencies = [ "prometheus", "telemetry-subscribers", "tokio", + "tokio-util 0.7.12", "tracing", ] diff --git a/crates/iota-analytics-indexer/src/main.rs b/crates/iota-analytics-indexer/src/main.rs index 3e08ce16efc..281f6ade02e 100644 --- a/crates/iota-analytics-indexer/src/main.rs +++ b/crates/iota-analytics-indexer/src/main.rs @@ -42,7 +42,7 @@ async fn main() -> Result<()> { batch_size: 10, ..Default::default() }; - let (executor, exit_sender) = setup_single_workflow( + let (executor, token) = setup_single_workflow( processor, remote_store_url, watermark, @@ -51,13 +51,11 @@ async fn main() -> Result<()> { ) .await?; - tokio::spawn(async { + tokio::spawn(async move { signal::ctrl_c() .await .expect("Failed to install Ctrl+C handler"); - exit_sender - .send(()) - .expect("Failed to gracefully process shutdown"); + token.cancel(); }); executor.await?; Ok(()) diff --git a/crates/iota-data-ingestion-core/Cargo.toml b/crates/iota-data-ingestion-core/Cargo.toml index 7c81d572656..bd0d00b3099 100644 --- a/crates/iota-data-ingestion-core/Cargo.toml +++ b/crates/iota-data-ingestion-core/Cargo.toml @@ -21,6 +21,7 @@ serde_json.workspace = true tap.workspace = true tempfile.workspace = true tokio = { workspace = true, features = ["full"] } +tokio-util.workspace = true tracing.workspace = true url.workspace = true diff --git a/crates/iota-data-ingestion-core/src/executor.rs b/crates/iota-data-ingestion-core/src/executor.rs index d9e4906fb47..9b388ab8063 100644 --- a/crates/iota-data-ingestion-core/src/executor.rs +++ b/crates/iota-data-ingestion-core/src/executor.rs @@ -11,7 +11,8 @@ use iota_types::{ full_checkpoint_content::CheckpointData, messages_checkpoint::CheckpointSequenceNumber, }; use prometheus::Registry; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::mpsc; +use tokio_util::sync::CancellationToken; use crate::{ DataIngestionMetrics, ReaderOptions, Worker, @@ -65,7 +66,7 @@ impl IndexerExecutor

{ remote_store_url: Option, remote_store_options: Vec<(String, String)>, reader_options: ReaderOptions, - mut exit_receiver: oneshot::Receiver<()>, + token: CancellationToken, ) -> Result { let mut reader_checkpoint_number = self.progress_store.min_watermark()?; let (checkpoint_reader, mut checkpoint_recv, gc_sender, _exit_sender) = @@ -83,7 +84,7 @@ impl IndexerExecutor

{ } loop { tokio::select! { - _ = &mut exit_receiver => break, + _ = token.cancelled() => break, Some((task_name, sequence_number)) = self.pool_progress_receiver.recv() => { self.progress_store.save(task_name.clone(), sequence_number).await?; let seq_number = self.progress_store.min_watermark()?; @@ -112,9 +113,9 @@ pub async fn setup_single_workflow( reader_options: Option, ) -> Result<( impl Future>, - oneshot::Sender<()>, + CancellationToken, )> { - let (exit_sender, exit_receiver) = oneshot::channel(); + let token = CancellationToken::new(); let metrics = DataIngestionMetrics::new(&Registry::new()); let progress_store = ShimProgressStore(initial_checkpoint_number); let mut executor = IndexerExecutor::new(progress_store, 1, metrics); @@ -126,8 +127,8 @@ pub async fn setup_single_workflow( Some(remote_store_url), vec![], reader_options.unwrap_or_default(), - exit_receiver, + token.child_token(), ), - exit_sender, + token, )) } diff --git a/crates/iota-data-ingestion-core/src/tests.rs b/crates/iota-data-ingestion-core/src/tests.rs index a04fa8c8ccc..e271f69be93 100644 --- a/crates/iota-data-ingestion-core/src/tests.rs +++ b/crates/iota-data-ingestion-core/src/tests.rs @@ -21,7 +21,7 @@ use iota_types::{ use prometheus::Registry; use rand::{SeedableRng, prelude::StdRng}; use tempfile::NamedTempFile; -use tokio::sync::oneshot; +use tokio_util::sync::CancellationToken; use crate::{ DataIngestionMetrics, FileProgressStore, IndexerExecutor, ReaderOptions, Worker, WorkerPool, @@ -48,21 +48,31 @@ async fn run( batch_size: 1, ..Default::default() }; - let (sender, recv) = oneshot::channel(); + match duration { None => { indexer - .run(path.unwrap_or_else(temp_dir), None, vec![], options, recv) + .run( + path.unwrap_or_else(temp_dir), + None, + vec![], + options, + CancellationToken::new(), + ) .await } Some(duration) => { - let handle = tokio::task::spawn(async move { - indexer - .run(path.unwrap_or_else(temp_dir), None, vec![], options, recv) - .await - }); + let token = CancellationToken::new(); + let token_child = token.child_token(); + let handle = tokio::task::spawn(indexer.run( + path.unwrap_or_else(temp_dir), + None, + vec![], + options, + token_child, + )); tokio::time::sleep(duration).await; - drop(sender); + token.cancel(); handle.await? } } diff --git a/crates/iota-data-ingestion/Cargo.toml b/crates/iota-data-ingestion/Cargo.toml index 2f998e650cc..e15a6b00ea1 100644 --- a/crates/iota-data-ingestion/Cargo.toml +++ b/crates/iota-data-ingestion/Cargo.toml @@ -26,6 +26,7 @@ serde.workspace = true serde_json.workspace = true serde_yaml.workspace = true tokio = { workspace = true, features = ["full"] } +tokio-util.workspace = true tracing.workspace = true url.workspace = true diff --git a/crates/iota-data-ingestion/src/main.rs b/crates/iota-data-ingestion/src/main.rs index 1d87bd36c0b..14f947509f4 100644 --- a/crates/iota-data-ingestion/src/main.rs +++ b/crates/iota-data-ingestion/src/main.rs @@ -12,7 +12,8 @@ use iota_data_ingestion::{ use iota_data_ingestion_core::{DataIngestionMetrics, IndexerExecutor, ReaderOptions, WorkerPool}; use prometheus::Registry; use serde::{Deserialize, Serialize}; -use tokio::{signal, sync::oneshot}; +use tokio::signal; +use tokio_util::sync::CancellationToken; #[derive(Serialize, Deserialize, Clone, Debug)] #[serde(rename_all = "lowercase")] @@ -68,7 +69,7 @@ fn default_remote_read_batch_size() -> usize { 100 } -fn setup_env(exit_sender: oneshot::Sender<()>) { +fn setup_env(token: CancellationToken) { let default_hook = std::panic::take_hook(); std::panic::set_hook(Box::new(move |panic| { @@ -76,20 +77,19 @@ fn setup_env(exit_sender: oneshot::Sender<()>) { std::process::exit(12); })); - tokio::spawn(async { + tokio::spawn(async move { signal::ctrl_c() .await .expect("Failed to install Ctrl+C handler"); - exit_sender - .send(()) - .expect("Failed to gracefully process shutdown"); + token.cancel(); }); } #[tokio::main] async fn main() -> Result<()> { - let (exit_sender, exit_receiver) = oneshot::channel(); - setup_env(exit_sender); + let token = CancellationToken::new(); + let token_child = token.child_token(); + setup_env(token); let args: Vec = env::args().collect(); assert_eq!(args.len(), 2, "configuration yaml file is required"); @@ -152,7 +152,7 @@ async fn main() -> Result<()> { config.remote_store_url, config.remote_store_options, reader_options, - exit_receiver, + token_child, ) .await?; Ok(()) diff --git a/crates/iota-indexer-builder/Cargo.toml b/crates/iota-indexer-builder/Cargo.toml index 1b12d82425f..67e187d9be1 100644 --- a/crates/iota-indexer-builder/Cargo.toml +++ b/crates/iota-indexer-builder/Cargo.toml @@ -12,6 +12,7 @@ anyhow.workspace = true async-trait.workspace = true prometheus.workspace = true tokio = { workspace = true, features = ["full"] } +tokio-util.workspace = true tracing.workspace = true # internal dependencies diff --git a/crates/iota-indexer-builder/src/iota_datasource.rs b/crates/iota-indexer-builder/src/iota_datasource.rs index 27839b1ddd6..85e71eaf7b1 100644 --- a/crates/iota-indexer-builder/src/iota_datasource.rs +++ b/crates/iota-indexer-builder/src/iota_datasource.rs @@ -15,10 +15,8 @@ use iota_types::{ full_checkpoint_content::{CheckpointData as IotaCheckpointData, CheckpointTransaction}, messages_checkpoint::CheckpointSequenceNumber, }; -use tokio::{ - sync::{oneshot, oneshot::Sender}, - task::JoinHandle, -}; +use tokio::task::JoinHandle; +use tokio_util::sync::CancellationToken; use tracing::info; use crate::indexer_builder::{DataSender, Datasource}; @@ -53,11 +51,12 @@ impl Datasource for IotaCheckpointDatasource { target_checkpoint: u64, data_sender: DataSender, ) -> Result>, Error> { - let (exit_sender, exit_receiver) = oneshot::channel(); + let token = CancellationToken::new(); + let token_child = token.child_token(); let progress_store = PerTaskInMemProgressStore { current_checkpoint: starting_checkpoint, exit_checkpoint: target_checkpoint, - exit_sender: Some(exit_sender), + token: Some(token), }; let mut executor = IndexerExecutor::new(progress_store, 1, self.metrics.clone()); let worker = IndexerWorker::new(data_sender); @@ -76,7 +75,7 @@ impl Datasource for IotaCheckpointDatasource { Some(remote_store_url), vec![], // optional remote store access options ReaderOptions::default(), - exit_receiver, + token_child, ) .await?; Ok(()) @@ -87,7 +86,7 @@ impl Datasource for IotaCheckpointDatasource { struct PerTaskInMemProgressStore { pub current_checkpoint: u64, pub exit_checkpoint: u64, - pub exit_sender: Option>, + pub token: Option, } #[async_trait] @@ -105,8 +104,8 @@ impl ProgressStore for PerTaskInMemProgressStore { checkpoint_number: CheckpointSequenceNumber, ) -> anyhow::Result<()> { if checkpoint_number >= self.exit_checkpoint { - if let Some(sender) = self.exit_sender.take() { - let _ = sender.send(()); + if let Some(token) = self.token.take() { + token.cancel(); } } self.current_checkpoint = checkpoint_number; diff --git a/crates/iota-indexer/src/indexer.rs b/crates/iota-indexer/src/indexer.rs index 4325feae326..3f71000b7f8 100644 --- a/crates/iota-indexer/src/indexer.rs +++ b/crates/iota-indexer/src/indexer.rs @@ -13,7 +13,6 @@ use iota_data_ingestion_core::{ use iota_metrics::spawn_monitored_task; use iota_types::messages_checkpoint::CheckpointSequenceNumber; use prometheus::Registry; -use tokio::sync::oneshot; use tokio_util::sync::CancellationToken; use tracing::info; @@ -132,14 +131,6 @@ impl Indexer { store.persist_protocol_configs_and_feature_flags(chain_id)?; } - let cancel_clone = cancel.clone(); - let (exit_sender, exit_receiver) = oneshot::channel(); - // Spawn a task that links the cancellation token to the exit sender - spawn_monitored_task!(async move { - cancel_clone.cancelled().await; - let _ = exit_sender.send(()); - }); - let mut executor = IndexerExecutor::new( ShimIndexerProgressStore::new(vec![ ("primary".to_string(), primary_watermark), @@ -170,7 +161,7 @@ impl Indexer { config.remote_store_url.clone(), vec![], extra_reader_options, - exit_receiver, + cancel.child_token(), ) .await?; Ok(())