Skip to content

Commit

Permalink
refactor: use CancellationToken for IndexerExecutor
Browse files Browse the repository at this point in the history
  • Loading branch information
sergiupopescu199 committed Dec 28, 2024
1 parent aa93858 commit 5dd444a
Show file tree
Hide file tree
Showing 10 changed files with 55 additions and 50 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 3 additions & 5 deletions crates/iota-analytics-indexer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ async fn main() -> Result<()> {
batch_size: 10,
..Default::default()
};
let (executor, exit_sender) = setup_single_workflow(
let (executor, token) = setup_single_workflow(
processor,
remote_store_url,
watermark,
Expand All @@ -51,13 +51,11 @@ async fn main() -> Result<()> {
)
.await?;

tokio::spawn(async {
tokio::spawn(async move {
signal::ctrl_c()
.await
.expect("Failed to install Ctrl+C handler");
exit_sender
.send(())
.expect("Failed to gracefully process shutdown");
token.cancel();
});
executor.await?;
Ok(())
Expand Down
1 change: 1 addition & 0 deletions crates/iota-data-ingestion-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ serde_json.workspace = true
tap.workspace = true
tempfile.workspace = true
tokio = { workspace = true, features = ["full"] }
tokio-util.workspace = true
tracing.workspace = true
url.workspace = true

Expand Down
15 changes: 8 additions & 7 deletions crates/iota-data-ingestion-core/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ use iota_types::{
full_checkpoint_content::CheckpointData, messages_checkpoint::CheckpointSequenceNumber,
};
use prometheus::Registry;
use tokio::sync::{mpsc, oneshot};
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;

use crate::{
DataIngestionMetrics, ReaderOptions, Worker,
Expand Down Expand Up @@ -65,7 +66,7 @@ impl<P: ProgressStore> IndexerExecutor<P> {
remote_store_url: Option<String>,
remote_store_options: Vec<(String, String)>,
reader_options: ReaderOptions,
mut exit_receiver: oneshot::Receiver<()>,
token: CancellationToken,
) -> Result<ExecutorProgress> {
let mut reader_checkpoint_number = self.progress_store.min_watermark()?;
let (checkpoint_reader, mut checkpoint_recv, gc_sender, _exit_sender) =
Expand All @@ -83,7 +84,7 @@ impl<P: ProgressStore> IndexerExecutor<P> {
}
loop {
tokio::select! {
_ = &mut exit_receiver => break,
_ = token.cancelled() => break,
Some((task_name, sequence_number)) = self.pool_progress_receiver.recv() => {
self.progress_store.save(task_name.clone(), sequence_number).await?;
let seq_number = self.progress_store.min_watermark()?;
Expand Down Expand Up @@ -112,9 +113,9 @@ pub async fn setup_single_workflow<W: Worker + 'static>(
reader_options: Option<ReaderOptions>,
) -> Result<(
impl Future<Output = Result<ExecutorProgress>>,
oneshot::Sender<()>,
CancellationToken,
)> {
let (exit_sender, exit_receiver) = oneshot::channel();
let token = CancellationToken::new();
let metrics = DataIngestionMetrics::new(&Registry::new());
let progress_store = ShimProgressStore(initial_checkpoint_number);
let mut executor = IndexerExecutor::new(progress_store, 1, metrics);
Expand All @@ -126,8 +127,8 @@ pub async fn setup_single_workflow<W: Worker + 'static>(
Some(remote_store_url),
vec![],
reader_options.unwrap_or_default(),
exit_receiver,
token.child_token(),
),
exit_sender,
token,
))
}
28 changes: 19 additions & 9 deletions crates/iota-data-ingestion-core/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use iota_types::{
use prometheus::Registry;
use rand::{SeedableRng, prelude::StdRng};
use tempfile::NamedTempFile;
use tokio::sync::oneshot;
use tokio_util::sync::CancellationToken;

use crate::{
DataIngestionMetrics, FileProgressStore, IndexerExecutor, ReaderOptions, Worker, WorkerPool,
Expand All @@ -48,21 +48,31 @@ async fn run(
batch_size: 1,
..Default::default()
};
let (sender, recv) = oneshot::channel();

match duration {
None => {
indexer
.run(path.unwrap_or_else(temp_dir), None, vec![], options, recv)
.run(
path.unwrap_or_else(temp_dir),
None,
vec![],
options,
CancellationToken::new(),
)
.await
}
Some(duration) => {
let handle = tokio::task::spawn(async move {
indexer
.run(path.unwrap_or_else(temp_dir), None, vec![], options, recv)
.await
});
let token = CancellationToken::new();
let token_child = token.child_token();
let handle = tokio::task::spawn(indexer.run(
path.unwrap_or_else(temp_dir),
None,
vec![],
options,
token_child,
));
tokio::time::sleep(duration).await;
drop(sender);
token.cancel();
handle.await?
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/iota-data-ingestion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ serde.workspace = true
serde_json.workspace = true
serde_yaml.workspace = true
tokio = { workspace = true, features = ["full"] }
tokio-util.workspace = true
tracing.workspace = true
url.workspace = true

Expand Down
18 changes: 9 additions & 9 deletions crates/iota-data-ingestion/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ use iota_data_ingestion::{
use iota_data_ingestion_core::{DataIngestionMetrics, IndexerExecutor, ReaderOptions, WorkerPool};
use prometheus::Registry;
use serde::{Deserialize, Serialize};
use tokio::{signal, sync::oneshot};
use tokio::signal;
use tokio_util::sync::CancellationToken;

#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(rename_all = "lowercase")]
Expand Down Expand Up @@ -68,28 +69,27 @@ fn default_remote_read_batch_size() -> usize {
100
}

fn setup_env(exit_sender: oneshot::Sender<()>) {
fn setup_env(token: CancellationToken) {
let default_hook = std::panic::take_hook();

std::panic::set_hook(Box::new(move |panic| {
default_hook(panic);
std::process::exit(12);
}));

tokio::spawn(async {
tokio::spawn(async move {
signal::ctrl_c()
.await
.expect("Failed to install Ctrl+C handler");
exit_sender
.send(())
.expect("Failed to gracefully process shutdown");
token.cancel();
});
}

#[tokio::main]
async fn main() -> Result<()> {
let (exit_sender, exit_receiver) = oneshot::channel();
setup_env(exit_sender);
let token = CancellationToken::new();
let token_child = token.child_token();
setup_env(token);

let args: Vec<String> = env::args().collect();
assert_eq!(args.len(), 2, "configuration yaml file is required");
Expand Down Expand Up @@ -152,7 +152,7 @@ async fn main() -> Result<()> {
config.remote_store_url,
config.remote_store_options,
reader_options,
exit_receiver,
token_child,
)
.await?;
Ok(())
Expand Down
1 change: 1 addition & 0 deletions crates/iota-indexer-builder/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ anyhow.workspace = true
async-trait.workspace = true
prometheus.workspace = true
tokio = { workspace = true, features = ["full"] }
tokio-util.workspace = true
tracing.workspace = true

# internal dependencies
Expand Down
19 changes: 9 additions & 10 deletions crates/iota-indexer-builder/src/iota_datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@ use iota_types::{
full_checkpoint_content::{CheckpointData as IotaCheckpointData, CheckpointTransaction},
messages_checkpoint::CheckpointSequenceNumber,
};
use tokio::{
sync::{oneshot, oneshot::Sender},
task::JoinHandle,
};
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::info;

use crate::indexer_builder::{DataSender, Datasource};
Expand Down Expand Up @@ -53,11 +51,12 @@ impl Datasource<CheckpointTxnData> for IotaCheckpointDatasource {
target_checkpoint: u64,
data_sender: DataSender<CheckpointTxnData>,
) -> Result<JoinHandle<Result<(), Error>>, Error> {
let (exit_sender, exit_receiver) = oneshot::channel();
let token = CancellationToken::new();
let token_child = token.child_token();
let progress_store = PerTaskInMemProgressStore {
current_checkpoint: starting_checkpoint,
exit_checkpoint: target_checkpoint,
exit_sender: Some(exit_sender),
token: Some(token),
};
let mut executor = IndexerExecutor::new(progress_store, 1, self.metrics.clone());
let worker = IndexerWorker::new(data_sender);
Expand All @@ -76,7 +75,7 @@ impl Datasource<CheckpointTxnData> for IotaCheckpointDatasource {
Some(remote_store_url),
vec![], // optional remote store access options
ReaderOptions::default(),
exit_receiver,
token_child,
)
.await?;
Ok(())
Expand All @@ -87,7 +86,7 @@ impl Datasource<CheckpointTxnData> for IotaCheckpointDatasource {
struct PerTaskInMemProgressStore {
pub current_checkpoint: u64,
pub exit_checkpoint: u64,
pub exit_sender: Option<Sender<()>>,
pub token: Option<CancellationToken>,
}

#[async_trait]
Expand All @@ -105,8 +104,8 @@ impl ProgressStore for PerTaskInMemProgressStore {
checkpoint_number: CheckpointSequenceNumber,
) -> anyhow::Result<()> {
if checkpoint_number >= self.exit_checkpoint {
if let Some(sender) = self.exit_sender.take() {
let _ = sender.send(());
if let Some(token) = self.token.take() {
token.cancel();
}
}
self.current_checkpoint = checkpoint_number;
Expand Down
11 changes: 1 addition & 10 deletions crates/iota-indexer/src/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use iota_data_ingestion_core::{
use iota_metrics::spawn_monitored_task;
use iota_types::messages_checkpoint::CheckpointSequenceNumber;
use prometheus::Registry;
use tokio::sync::oneshot;
use tokio_util::sync::CancellationToken;
use tracing::info;

Expand Down Expand Up @@ -132,14 +131,6 @@ impl Indexer {
store.persist_protocol_configs_and_feature_flags(chain_id)?;
}

let cancel_clone = cancel.clone();
let (exit_sender, exit_receiver) = oneshot::channel();
// Spawn a task that links the cancellation token to the exit sender
spawn_monitored_task!(async move {
cancel_clone.cancelled().await;
let _ = exit_sender.send(());
});

let mut executor = IndexerExecutor::new(
ShimIndexerProgressStore::new(vec![
("primary".to_string(), primary_watermark),
Expand Down Expand Up @@ -170,7 +161,7 @@ impl Indexer {
config.remote_store_url.clone(),
vec![],
extra_reader_options,
exit_receiver,
cancel.child_token(),
)
.await?;
Ok(())
Expand Down

0 comments on commit 5dd444a

Please sign in to comment.