
feat: overhaul task spawning in the whole project #433

Merged
merged 5 commits into develop on Mar 3, 2025

Conversation

armyhaylenko
Contributor

Overhaul task spawning in all modules. Remove usage of the mutexed_tasks concept, remove usage of a broadcast channel that was used for shutdown, create a static executor to save running tasks & exit gracefully.

@armyhaylenko armyhaylenko force-pushed the feature/MTG-1216-refactor-shutdown branch 3 times, most recently from 2be95e2 to 72ab5e1 Compare February 28, 2025 10:51
Cargo.toml Outdated
@@ -20,7 +20,7 @@ members = [
# Tokio ecosystem
tokio = { version = "1.37.0", features = ["full", "tracing"] }
tokio-stream = "0.1.11"
tokio-util = { version = "0.7.10", features = ["codec", "compat"] }
tokio-util = { version = "0.7.13", features = ["codec", "compat"] }
Contributor
Do we actually need this patch version?

Contributor Author
yes - it introduces a function called run_until_cancelled, which we use currently in one place, but the usage can be extended to other places. this PR is already destructive as it is, but run_until_cancelled can very easily replace this construct:

tokio::select! {
  _ = cancellation_token.cancelled() => {}
  _ = other_async_action_like_a_long_sleep => {}
}
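For reference, a minimal sketch of the replacement, assuming tokio-util's CancellationToken::run_until_cancelled (it resolves to None when the token is cancelled before the wrapped future completes):

if cancellation_token
    .run_until_cancelled(other_async_action_like_a_long_sleep)
    .await
    .is_none()
{
    // cancelled before the action finished
}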

Contributor
Maybe 0.7 will do the trick as well then? Instead of juggling patch versions all the time

if !shutdown_for_file_writer_rx.is_empty() {
break;
}
if cancellation_token.is_cancelled() {
Contributor

Does it mean that if cancellation is received during the sleep or process_failed_proofs (which I presume can take quite a long time), we wait for their completion?

Contributor Author
yes, if we start the iteration it will complete at least once. that's the logic that was here before and i don't find it to be erroneous

f_ch.clear();

drop(f_ch);
if cancellation_token.is_cancelled() {
Contributor

The same question as above. Does it mean all write_record calls happen anyway even if the shutdown signal is received?

Contributor Author
yes, as it did before

Comment on lines 43 to 52
usecase::executor::spawn(async move {
while !cancellation_token.is_cancelled() {
// TODO: refactor this to use parameter from storage and last slot from slot storage
// overwhelm_backfill_gap.store(
// rocks_db.bubblegum_slots.iter_start().count().saturating_add(rocks_db.ingestable_slots.iter_start().count())
// >= consistence_backfilling_slots_threshold as usize,
// Ordering::Relaxed,
// );
//
tokio::time::sleep(Duration::from_secs(CATCH_UP_SEQUENCES_TIMEOUT_SEC)).await;
Contributor
What does it do except for sleeping?

Contributor Author
nothing - as it did before

@@ -132,13 +132,13 @@ pub async fn run_backfill_slots<C>(
C: BlockConsumer,
{
loop {
-    if shutdown_token.is_cancelled() {
+    if cancellation_token.is_cancelled() {
Contributor

Can shutdown be blocked for 400ms here? Is it possible to select over the whole function, e.g. select over the timer and the token, and execute the body when the timer ticks, or something like that?

Contributor Author
  1. 400ms is not exactly a long time and we can easily wait IMO
  2. yes but this PR is big already. i'm happy to do it in a different PR
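For completeness, a minimal standalone sketch of the suggested shape, assuming a tokio-util CancellationToken and a 400ms pause between iterations (function and helper names here are illustrative, not this repo's code):

use std::time::Duration;
use tokio_util::sync::CancellationToken;

async fn run_backfill_loop(cancellation_token: CancellationToken) {
    loop {
        tokio::select! {
            // shutdown reacts immediately instead of waiting out the sleep
            _ = cancellation_token.cancelled() => break,
            _ = tokio::time::sleep(Duration::from_millis(400)) => {
                // run one backfill iteration here
            }
        }
    }
}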

@@ -182,7 +182,7 @@ where
it.seek_to_first();
}
while it.valid() {
-    if shutdown_token.is_cancelled() {
+    if cancellation_token.is_cancelled() {
Contributor

The same concern. In general it's not a problem if the rest of the body takes a short time to complete, since then we guarantee that a message is processed and not lost.

Contributor Author
each iteration here seems to be complete, we don't batch it or whatever; i think what we had before is fine

&self,
cancellation_token: CancellationToken,
) -> Result<(), JoinError> {
while !cancellation_token.is_cancelled() {
Contributor
get_batch_mint_to_verify will block the shutdown

Contributor Author
yes - it's graceful and we will complete existing work and won't start new iterations or whatever

-pub async fn process_batch_mints(&self, mut rx: Receiver<()>) {
-    while rx.is_empty() {
+pub async fn process_batch_mints(&self, cancellation_token: CancellationToken) {
+    while !cancellation_token.is_cancelled() {
Contributor
shutdown will be blocked by fetch_batch_mint_for_processing, is it intended?

Contributor Author
as it was intended before; only the current iteration will be blocked

mut batch_mint_to_process: BatchMintWithState,
) -> Result<(), IngesterError> {
info!("Processing {} batch_mint file", &batch_mint_to_process.file_name);
let start_time = Instant::now();
let (batch_mint, file_size, file_checksum) =
self.read_batch_mint_file(&batch_mint_to_process).await?;
let mut metadata_url = String::new();
-    while rx.is_empty() {
+    while !cancellation_token.is_cancelled() {
Contributor
inside the match shutdown is impossible.

Contributor Author
i view it as do {} while, we do the current stuff and then exit

let cancellation_token = cancellation_token.child_token();
async move {
while let Ok((slot, raw_block_data)) = slot_receiver.recv().await {
if cancellation_token.is_cancelled() {
Contributor
During the body execution shutting down is impossible

Contributor Author
it's fine because we want to finish the work we started


// Increment slots_processed
let current_slots_processed =
slots_processed.fetch_add(1, Ordering::Relaxed) + 1;
Contributor

That +1 outside of the atomic operation looks unusual to me. Why not +2 inside the fetch_add, then?

Contributor Author
fetch_add returns the previous value (before the increment), so to get the value after the call we have to add 1
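A tiny standalone illustration of that behaviour:

use std::sync::atomic::{AtomicU64, Ordering};

fn main() {
    let slots_processed = AtomicU64::new(41);
    // fetch_add returns the value held *before* the increment,
    // so the post-increment count is the returned value plus one
    let current_slots_processed = slots_processed.fetch_add(1, Ordering::Relaxed) + 1;
    assert_eq!(current_slots_processed, 42);
}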

Contributor
It might overflow the current_slots_processed

Contributor Author
pretty sure this won't be the case in the coming years
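For scale: the counter is a u64, which tops out at 2^64 - 1 (about 1.8 x 10^19), so even at a million increments per second it would take roughly 580,000 years to overflow.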


loop {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
if cancellation_token.is_cancelled() {
Contributor

The body is blocked. Isn't it possible to use select! for such cases?

Contributor Author

this won't run for too long and IMO is fine

@andrii-kl (Collaborator) commented Feb 28, 2025

I would add an additional function for graceful shutdown of the PG connection pool, since it is not implemented now; otherwise there may be connection leaks or data inconsistency on long queries. pool.close().await;
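A minimal sketch of what that could look like, assuming an sqlx PgPool and the cancellation token already used elsewhere in this PR (the helper name is hypothetical):

use sqlx::PgPool;
use tokio_util::sync::CancellationToken;

// hypothetical helper: once shutdown is signalled, drain the pool so
// in-flight queries finish and no new connections are handed out
async fn close_pg_pool_on_shutdown(pool: PgPool, cancellation_token: CancellationToken) {
    cancellation_token.cancelled().await;
    pool.close().await;
}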

let missed_jsons_file =
File::create(MISSED_JSONS_FILE).expect("Failed to create file for missed jsons");
usecase::executor::spawn({
let cancellation_token = cancellation_token.child_token();
Collaborator

It might be worth checking the token before we start processing?

Contributor Author
not in the utility binaries IMO

error!("Error writing failed checks: {}", e);
}
usecase::executor::spawn({
let cancellation_token = cancellation_token.child_token();
Collaborator

The same; it may be worth checking the parent cancellation before starting data processing.

Contributor Author
not in the utility binaries IMO

Comment on lines +270 to +274
// Update rate
{
let mut rate_guard = rate.lock().unwrap();
*rate_guard = current_rate;
}
Contributor
Suggested change
-// Update rate
-{
-    let mut rate_guard = rate.lock().unwrap();
-    *rate_guard = current_rate;
-}
+*rate.lock().unwrap() = current_rate;

Isn't it possible? Why do we need a dedicated block for it? I guess it will be freed at the end of the loop, won't it?

Contributor Author
this change is out of scope for this PR. i will create a task gathering such places and will submit the refactoring changes in a later PR
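For context on the suggestion above: the temporary MutexGuard produced by rate.lock().unwrap() is dropped at the end of the statement, not at the end of the loop, so the one-liner holds the lock for the same short span as the explicit block:

// both forms release the lock as soon as the assignment completes
*rate.lock().unwrap() = current_rate;
// equivalent to the original:
// {
//     let mut rate_guard = rate.lock().unwrap();
//     *rate_guard = current_rate;
// }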

while first_processed_slot.load(Ordering::Relaxed) == 0 && shutdown_rx.is_empty() {
tokio_sleep(Duration::from_millis(100)).await
while first_processed_slot.load(Ordering::Relaxed) == 0
&& !cancellation_token.is_cancelled()
Contributor
From my perspective, the cancellation token does nothing here.

Contributor Author
it does - we will exit if the first processed slot is still 0 and SIGINT is triggered

Comment on lines +547 to 584
usecase::executor::spawn({
let cancellation_token = cancellation_token.child_token();
async move {
let program_id = mpl_bubblegum::programs::MPL_BUBBLEGUM_ID;
while !cancellation_token.is_cancelled() {
match signature_fetcher
.fetch_signatures(program_id, args.rpc_retry_interval_millis)
.await
{
Ok(_) => {
metrics_clone.inc_run_fetch_signatures(
"fetch_signatures",
MetricStatus::SUCCESS,
);
info!(
"signatures sync finished successfully for program_id: {}",
program_id
);
},
Err(e) => {
metrics_clone.inc_run_fetch_signatures(
"fetch_signatures",
MetricStatus::FAILURE,
);
error!(
"signatures sync failed: {:?} for program_id: {}",
e, program_id
);
},
}

tokio_sleep(Duration::from_secs(60)).await;
tokio::select! {
_ = cancellation_token.cancelled() => {}
_ = tokio::time::sleep(Duration::from_secs(60)) => {}
}
}
}

Ok(())
});
Contributor
Suggested change
usecase::executor::spawn({
let cancellation_token = cancellation_token.child_token();
async move {
let program_id = mpl_bubblegum::programs::MPL_BUBBLEGUM_ID;
while !cancellation_token.is_cancelled() {
match signature_fetcher
.fetch_signatures(program_id, args.rpc_retry_interval_millis)
.await
{
Ok(_) => {
metrics_clone.inc_run_fetch_signatures(
"fetch_signatures",
MetricStatus::SUCCESS,
);
info!(
"signatures sync finished successfully for program_id: {}",
program_id
);
},
Err(e) => {
metrics_clone.inc_run_fetch_signatures(
"fetch_signatures",
MetricStatus::FAILURE,
);
error!(
"signatures sync failed: {:?} for program_id: {}",
e, program_id
);
},
}
tokio_sleep(Duration::from_secs(60)).await;
tokio::select! {
_ = cancellation_token.cancelled() => {}
_ = tokio::time::sleep(Duration::from_secs(60)) => {}
}
}
}
Ok(())
});
usecase::executor::spawn({
let cancellation_token = cancellation_token.child_token();
async move {
let program_id = mpl_bubblegum::programs::MPL_BUBBLEGUM_ID;
tokio::select! {
_ = cancellation_token.cancelled() => {}
_ = async move {
loop {
match signature_fetcher.fetch_signatures(program_id, args.rpc_retry_interval_millis).await {
Ok(_) => {
metrics_clone.inc_run_fetch_signatures("fetch_signatures", MetricStatus::SUCCESS);
info!("signatures sync finished successfully for program_id: {}", program_id);
}
Err(e) => {
metrics_clone.inc_run_fetch_signatures("fetch_signatures", MetricStatus::FAILURE);
error!("signatures sync failed: {:?} for program_id: {}", e, program_id);
}
}
tokio::time::sleep(Duration::from_secs(60)).await;
}
} => {}
}
}
});

Isn't it simpler?

Contributor Author
not really, and at this point we give up all the formatting to tokio::select, which is unformattable

let cancellation_token = cancellation_token.child_token();
async move {
loop {
if cancellation_token.is_cancelled() {
Contributor
The body will be blocked from the cancellation

Contributor Author
will be reviewed in the next iterations of this PR

});

assets_iter.seek_to_first();
while assets_iter.valid() {
-    if !rx.is_empty() {
+    if cancellation_token.is_cancelled() {
@kstepanovdev (Contributor) Feb 28, 2025

The body will block the cancellation. Maybe it's intended?

Contributor Author
will be reviewed in the next iterations of this PR

}
Err(e) => {
tracing::error!("Full {:?} synchronization failed: {:?}", asset_type, e);
sync_tasks.spawn({
@kstepanovdev (Contributor) Feb 28, 2025

a lot of cancellation_token handling is happening here. can't it be replaced by one select!?


let mut forked_slots = 0;
let mut delete_items = Vec::new();

// from this column data will be dropped by slot
// if we have any update from forked slot we have to delete it
for cl_item in self.cl_items_manager.cl_items_iter() {
-    if !rx.is_empty() {
+    if cancellation_token.is_cancelled() {
Contributor

This huge function is effectively prevented from shutting down, because the cancellation check happens only at the beginning of the function.

Contributor Author
will be reviewed in the next iterations of this PR

) {
while rx.is_empty() {
if let Err(e) = self.synchronize_nft_asset_indexes(rx, run_full_sync_threshold).await {
while !cancellation_token.is_cancelled() {
Contributor

looks like synchronize_nft_asset_indexes already accepts the token, so I don't see why we need the while here. select may be more suitable, or even just a plain loop that breaks when the function returns an error

Contributor Author
each part in the subsequent call uses loops. the outer usage of cancellation token means that no new iterations will be started on this level, while the cancellation token in child functions acts to break inner loops
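For readers unfamiliar with the layering described here, a minimal standalone sketch of how child tokens propagate cancellation (tokio-util API, not this repo's code):

use tokio_util::sync::CancellationToken;

fn main() {
    let parent = CancellationToken::new();
    // each nesting level can hold its own child token
    let child = parent.child_token();
    // cancelling the parent cancels every child: the outer loop stops
    // starting new iterations and inner loops can break out as well
    parent.cancel();
    assert!(child.is_cancelled());
}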

while rx.is_empty() {
if let Err(e) =
self.synchronize_fungible_asset_indexes(rx, run_full_sync_threshold).await
while !cancellation_token.is_cancelled() {
Contributor
The same concern as above

Contributor Author
each part in the subsequent call uses loops. the outer usage of cancellation token means that no new iterations will be started on this level, while the cancellation token in child functions acts to break inner loops

-let response = json_downloader
-    .download_file(task.metadata_url.clone(), CLIENT_TIMEOUT)
-    .await;
+let response = tokio::select! {
Contributor
maybe, that select with loop can replace the while?

Contributor Author
will be reviewed in the next iterations of this PR

continue;
}
usecase::executor::spawn(async move {
loop {
Contributor
I'm not sure why we need loop and sleep here simultaneously

Contributor Author
good catch

) {
usecase::executor::spawn(async move {
tokio::select! {
_ = cancellation_token.cancelled() => {}
Collaborator
Should we terminate all background processes? (as part of a new task)

Contributor Author
they will be terminated by select!

@@ -172,7 +165,7 @@ impl<T: UnprocessedAccountsGetter> AccountsProcessor<T> {
let mut interval = tokio::time::interval(FLUSH_INTERVAL);
let mut batch_fill_instant = Instant::now();

-    while rx.is_empty() {
+    while !cancellation_token.is_cancelled() {
Contributor
while can be replaced with loop inside of the first branch imo

-let run_transaction_processor = async move {
-    while rx.is_empty() {
+usecase::executor::spawn(async move {
+    while !cancellation_token.is_cancelled() {
Contributor
Cancellation is blocked

cloned_rocks_storage: Arc<Storage>,
first_processed_slot_clone: Arc<AtomicU64>,
last_saved_slot: u64,
) -> Result<(), JoinError> {
-    while cloned_rx.is_empty() {
+    while !cancellation_token.is_cancelled() {
Contributor
while Result(Ok) seems more suitable here

Contributor Author
nope, don't agree, it's pretty small and descriptive IMO

@@ -337,7 +338,7 @@ impl Dumper for Storage {
synchronizer_metrics.inc_num_of_records_written("authority", 1);
}
}
-    if !rx.is_empty() {
+    if cancellation_token.is_cancelled() {
Contributor
the cancellation is barely reachable inside this huge function

Contributor Author
not really, it will cancel upon subsequent iterator accesses, which are pretty fast

let start_time = chrono::Utc::now();
let mut all_keys = HashSet::new();
for (key, _) in self
.db
.full_iterator_cf(&self.db.cf_handle(RawBlock::NAME).unwrap(), IteratorMode::Start)
.filter_map(Result::ok)
{
-    if !rx.is_empty() {
+    if cancellation_token.is_cancelled() {
Contributor
this should run in parallel imo

Contributor Author
it's not async so IMO not worth it

@armyhaylenko armyhaylenko force-pushed the feature/MTG-1216-refactor-shutdown branch from 70f40a6 to 1d5d1d8 Compare February 28, 2025 14:45
Comment on lines +10 to +20
pub fn spawn<T, F: Future<Output = T> + Send + 'static>(future: F) -> AbortHandle {
tokio::task::block_in_place(|| {
let mut executor_guard = EXECUTOR.blocking_lock();
let _ = executor_guard.get_or_init(JoinSet::new);
let executor =
executor_guard.get_mut().expect("executor join set to be initialized upon access");
executor.spawn(async move {
let _ = future.await;
})
})
}
Collaborator

block_in_place doesn't seem to be used correctly here. It temporarily converts the current asynchronous task into a blocking one, afaik.
wdyt of this instead?

Suggested change
pub fn spawn<T, F: Future<Output = T> + Send + 'static>(future: F) -> AbortHandle {
tokio::task::block_in_place(|| {
let mut executor_guard = EXECUTOR.blocking_lock();
let _ = executor_guard.get_or_init(JoinSet::new);
let executor =
executor_guard.get_mut().expect("executor join set to be initialized upon access");
executor.spawn(async move {
let _ = future.await;
})
})
}
pub async fn spawn<T, F: Future<Output = T> + Send + 'static>(future: F) -> AbortHandle {
let mut executor_guard = EXECUTOR.lock().await;
let executor = executor_guard.get_or_init(JoinSet::new);
executor.spawn(async move {
let _ = future.await;
})
}

@armyhaylenko (Contributor Author) Feb 28, 2025
  1. won't work as i'd like to preserve the signature of spawn being a synchronous function
  2. to use blocking_lock we must enclose the spawning in block_in_place or spawn_blocking or block_on. IMO this is not an issue since this spawn is extremely fast, and we use a multi-threaded runtime anyway, which implies the existence of a blocking thread.

let current_timestamp = Utc::now().timestamp_millis() as u64;
for tx in txs {
match geyser_bubblegum_updates_processor
.process_transaction(tx.tx, false)
Collaborator

run_until_cancelled assumes the future is safe to cancel, but if we're cancelled in the middle of process_transaction we'll have an issue here

Contributor Author
i believe you're right. rolling it back to use the loop
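A minimal sketch of the distinction (helper names are hypothetical): run_until_cancelled drops the wrapped future the moment the token fires, while checking the token between iterations lets the in-flight call run to completion:

// cancel-safe only if process_transaction may be dropped at any await point:
// cancellation_token.run_until_cancelled(process_transaction(tx)).await;

// the rolled-back variant: finish the current transaction, then stop
while !cancellation_token.is_cancelled() {
    let Some(tx) = next_transaction().await else { break; }; // hypothetical source
    process_transaction(tx).await; // never abandoned mid-way
}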

// Ordering::Relaxed,
// );
//
tokio::time::sleep(Duration::from_secs(CATCH_UP_SEQUENCES_TIMEOUT_SEC)).await;
Collaborator

why not keep the tokio select and just replace rx.recv() with cancellation_token.cancelled() in such cases?

Contributor Author
good catch, though i will replace with CancellationToken::run_until_cancelled

let stop_handle = tokio::task::spawn({
let cancellation_token = cancellation_token.clone();
async move {
usecase::graceful_stop::graceful_shutdown(cancellation_token).await;
Collaborator
I believe it should be ok to set up those interruption handlers twice - here and below, but just double checking

Contributor Author
wdym?

Collaborator

I mean we set up the interrupt signal handler twice: in this method and directly in the Ctrl+C handler slightly below. That should be ok, I was just double-checking.

Contributor Author
yeah i think it's fine
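For context, a typical shape for such a handler with the token-based approach (a sketch only; the repo's graceful_shutdown may differ):

use tokio_util::sync::CancellationToken;

// hypothetical sketch: translate Ctrl+C / SIGINT into token cancellation
async fn graceful_shutdown(cancellation_token: CancellationToken) {
    if let Err(e) = tokio::signal::ctrl_c().await {
        tracing::error!("failed to listen for the interrupt signal: {e}");
    }
    cancellation_token.cancel();
}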

@armyhaylenko armyhaylenko force-pushed the feature/MTG-1216-refactor-shutdown branch from 1d5d1d8 to 5b58926 Compare March 3, 2025 13:46
Overhaul task spawning in all modules. Remove usage of
the `mutexed_tasks` concept, remove usage of a broadcast
channel that was used for shutdown, create a static
executor to save running tasks & exit gracefully.
Spawn graceful shutdown as the first tokio task in binaries that perform
it, await it at the end.
@armyhaylenko armyhaylenko force-pushed the feature/MTG-1216-refactor-shutdown branch from 5b58926 to 0c9da15 Compare March 3, 2025 14:29
@armyhaylenko armyhaylenko force-pushed the feature/MTG-1216-refactor-shutdown branch from 0c9da15 to f19370b Compare March 3, 2025 14:34
@StanChe (Collaborator) left a comment

Let's roll this out!

@armyhaylenko armyhaylenko merged commit 427cc6d into develop Mar 3, 2025
9 of 10 checks passed
@armyhaylenko armyhaylenko deleted the feature/MTG-1216-refactor-shutdown branch March 3, 2025 15:36
@github-actions github-actions bot mentioned this pull request Mar 10, 2025