feat: overhaul task spawning in the whole project #433
Conversation
Force-pushed from 2be95e2 to 72ab5e1
Cargo.toml
Outdated
@@ -20,7 +20,7 @@ members = [
 # Tokio ecosystem
 tokio = { version = "1.37.0", features = ["full", "tracing"] }
 tokio-stream = "0.1.11"
-tokio-util = { version = "0.7.10", features = ["codec", "compat"] }
+tokio-util = { version = "0.7.13", features = ["codec", "compat"] }
Do we actually need this patch version?
yes - it introduces a function called `run_until_cancelled`, which we currently use in one place, but the usage can be extended to other places. this PR is already destructive as it is, but `run_until_cancelled` can very easily replace this construct:
tokio::select! {
    _ = cancellation_token.cancelled() => {}
    _ = other_async_action_like_a_long_sleep => {}
}
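(For reference, a minimal sketch of the same construct expressed through `run_until_cancelled`, assuming the `CancellationToken` from tokio-util 0.7.13+; `long_async_action` is a placeholder name, not project code:)

use std::time::Duration;
use tokio_util::sync::CancellationToken;

async fn long_async_action() {
    tokio::time::sleep(Duration::from_secs(60)).await;
}

async fn wait_or_exit(cancellation_token: &CancellationToken) {
    // Resolves to Some(output) if the wrapped future completes first,
    // or None if the token is cancelled first - the same behaviour as
    // the select! above, in a single call.
    let _ = cancellation_token.run_until_cancelled(long_async_action()).await;
}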
Maybe 0.7 will do the trick as well then, instead of juggling patch versions all the time?
-if !shutdown_for_file_writer_rx.is_empty() {
    break;
}
+if cancellation_token.is_cancelled() {
Does it mean that if cancellation is received during sleep or process_failed_proofs (which can take quite a long time, I presume) we wait for their completion?
yes, if we start the iteration it will complete at least once. that's the logic that was here before and i don't find it to be erroneous
f_ch.clear();

drop(f_ch);
if cancellation_token.is_cancelled() {
The same question as above. Does it mean all `write_record` calls happen anyway even if a shutdown signal is received?
yes, as it did before
usecase::executor::spawn(async move {
    while !cancellation_token.is_cancelled() {
        // TODO: refactor this to use parameter from storage and last slot from slot storage
        // overwhelm_backfill_gap.store(
        //     rocks_db.bubblegum_slots.iter_start().count().saturating_add(rocks_db.ingestable_slots.iter_start().count())
        //     >= consistence_backfilling_slots_threshold as usize,
        //     Ordering::Relaxed,
        // );
        //
        tokio::time::sleep(Duration::from_secs(CATCH_UP_SEQUENCES_TIMEOUT_SEC)).await;
What does it do except for sleeping?
nothing - as it did before
@@ -132,13 +132,13 @@ pub async fn run_backfill_slots<C>(
    C: BlockConsumer,
{
    loop {
-        if shutdown_token.is_cancelled() {
+        if cancellation_token.is_cancelled() {
Can shutdown be blocked for 400ms here? Is it possible to select over the whole function, e.g. select over the timer and the token, and execute the body when the timer ticks, or something like that?
- 400ms is not exactly a long time and we can easily wait IMO
- yes but this PR is big already. i'm happy to do it in a different PR
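(For the follow-up PR, a sketch of the shape suggested above - selecting over a timer and the token so the 400ms wait never delays shutdown; `backfill_step` is a hypothetical stand-in for the loop body, not project code:)

use std::time::Duration;
use tokio_util::sync::CancellationToken;

async fn run_backfill(cancellation_token: CancellationToken) {
    let mut interval = tokio::time::interval(Duration::from_millis(400));
    loop {
        tokio::select! {
            // Exit immediately when the token fires, even mid-wait.
            _ = cancellation_token.cancelled() => break,
            // Otherwise run one iteration per tick.
            _ = interval.tick() => backfill_step().await,
        }
    }
}

async fn backfill_step() {
    // placeholder for one backfill iteration
}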
@@ -182,7 +182,7 @@ where
        it.seek_to_first();
    }
    while it.valid() {
-        if shutdown_token.is_cancelled() {
+        if cancellation_token.is_cancelled() {
The same concern. In general it's not a problem if the rest of the body takes a short time to complete, because then we guarantee that a message is processed and not lost.
each iteration here seems to be complete, we don't batch it or whatever; i think what we had before is fine
    &self,
    cancellation_token: CancellationToken,
) -> Result<(), JoinError> {
    while !cancellation_token.is_cancelled() {
`get_batch_mint_to_verify` will block the shutdown
yes - it's graceful and we will complete existing work and won't start new iterations or whatever
-pub async fn process_batch_mints(&self, mut rx: Receiver<()>) {
-    while rx.is_empty() {
+pub async fn process_batch_mints(&self, cancellation_token: CancellationToken) {
+    while !cancellation_token.is_cancelled() {
shutdown will be blocked by fetch_batch_mint_for_processing, is that intended?
as it was intended before; only the current iteration will be blocked
    mut batch_mint_to_process: BatchMintWithState,
) -> Result<(), IngesterError> {
    info!("Processing {} batch_mint file", &batch_mint_to_process.file_name);
    let start_time = Instant::now();
    let (batch_mint, file_size, file_checksum) =
        self.read_batch_mint_file(&batch_mint_to_process).await?;
    let mut metadata_url = String::new();
-    while rx.is_empty() {
+    while !cancellation_token.is_cancelled() {
inside the `match`, shutdown is impossible.
i view it as a `do {} while` - we do the current stuff and then exit
let cancellation_token = cancellation_token.child_token();
async move {
    while let Ok((slot, raw_block_data)) = slot_receiver.recv().await {
        if cancellation_token.is_cancelled() {
Shutting down is impossible during the body's execution.
it's fine because we want to finish the work we started
// Increment slots_processed
let current_slots_processed =
    slots_processed.fetch_add(1, Ordering::Relaxed) + 1;
That +1 outside of the atomic operation looks unusual to me. Why not +2 inside the `fetch_add` then?
fetch_add returns the previous value (before the increment), so to get the value after the call we have to add 1
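(A tiny standalone illustration of that semantics, not project code:)

use std::sync::atomic::{AtomicU64, Ordering};

fn main() {
    let slots_processed = AtomicU64::new(41);
    // fetch_add returns the value stored *before* the increment...
    let previous = slots_processed.fetch_add(1, Ordering::Relaxed);
    // ...so the value after this call is the previous value plus one.
    let current_slots_processed = previous + 1;
    assert_eq!(previous, 41);
    assert_eq!(current_slots_processed, 42);
}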
It might overflow the current_slots_processed
pretty sure this won't be the case in the coming years
loop {
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    if cancellation_token.is_cancelled() {
The body blocks the cancellation. Isn't it possible to use select! for such cases?
this won't work for too long and IMO is fine
I would add an additional function for graceful shutdown of the PG connection pool, since it is not implemented now, so there may be connection leaks or data inconsistency on long queries.
let missed_jsons_file =
    File::create(MISSED_JSONS_FILE).expect("Failed to create file for missed jsons");
usecase::executor::spawn({
    let cancellation_token = cancellation_token.child_token();
It might be worth checking the token before we start processing?
not in the utility binaries IMO
error!("Error writing failed checks: {}", e); | ||
} | ||
usecase::executor::spawn({ | ||
let cancellation_token = cancellation_token.child_token(); |
the same - maybe check the parent cancellation before starting data processing.
not in the utility binaries IMO
// Update rate
{
    let mut rate_guard = rate.lock().unwrap();
    *rate_guard = current_rate;
}
Suggested change:
-// Update rate
-{
-    let mut rate_guard = rate.lock().unwrap();
-    *rate_guard = current_rate;
-}
+*rate.lock().unwrap() = current_rate;
Isn't it possible? Why do we need a dedicated block for it? I guess it will be freed at the end of the loop, won't it?
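(For context, a small sketch assuming `rate` is an `Arc<Mutex<f64>>`: the guard produced by the one-liner is a temporary that is dropped at the end of the statement, so the explicit block is not needed for scoping:)

use std::sync::{Arc, Mutex};

fn update_rate(rate: &Arc<Mutex<f64>>, current_rate: f64) {
    // The MutexGuard returned by lock() lives only for this statement;
    // it is released as soon as the assignment completes.
    *rate.lock().unwrap() = current_rate;
}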
this change is out of scope for this PR. i will create a task gathering such places and will submit the refactoring changes in a later PR
-while first_processed_slot.load(Ordering::Relaxed) == 0 && shutdown_rx.is_empty() {
-    tokio_sleep(Duration::from_millis(100)).await
+while first_processed_slot.load(Ordering::Relaxed) == 0
+    && !cancellation_token.is_cancelled()
From my perspective, the cancellation token does nothing here.
it does - we will exit if the first processed slot is still 0 and SIGINT is triggered
usecase::executor::spawn({
    let cancellation_token = cancellation_token.child_token();
    async move {
        let program_id = mpl_bubblegum::programs::MPL_BUBBLEGUM_ID;
        while !cancellation_token.is_cancelled() {
            match signature_fetcher
                .fetch_signatures(program_id, args.rpc_retry_interval_millis)
                .await
            {
                Ok(_) => {
                    metrics_clone.inc_run_fetch_signatures(
                        "fetch_signatures",
                        MetricStatus::SUCCESS,
                    );
                    info!(
                        "signatures sync finished successfully for program_id: {}",
                        program_id
                    );
                },
                Err(e) => {
                    metrics_clone.inc_run_fetch_signatures(
                        "fetch_signatures",
                        MetricStatus::FAILURE,
                    );
                    error!(
                        "signatures sync failed: {:?} for program_id: {}",
                        e, program_id
                    );
                },
            }
-           tokio_sleep(Duration::from_secs(60)).await;
+           tokio::select! {
+               _ = cancellation_token.cancelled() => {}
+               _ = tokio::time::sleep(Duration::from_secs(60)) => {}
+           }
        }
    }

    Ok(())
});
Suggested change (replacing the block quoted above):

usecase::executor::spawn({
    let cancellation_token = cancellation_token.child_token();
    async move {
        let program_id = mpl_bubblegum::programs::MPL_BUBBLEGUM_ID;
        tokio::select! {
            _ = cancellation_token.cancelled() => {}
            _ = async move {
                loop {
                    match signature_fetcher.fetch_signatures(program_id, args.rpc_retry_interval_millis).await {
                        Ok(_) => {
                            metrics_clone.inc_run_fetch_signatures("fetch_signatures", MetricStatus::SUCCESS);
                            info!("signatures sync finished successfully for program_id: {}", program_id);
                        }
                        Err(e) => {
                            metrics_clone.inc_run_fetch_signatures("fetch_signatures", MetricStatus::FAILURE);
                            error!("signatures sync failed: {:?} for program_id: {}", e, program_id);
                        }
                    }
                    tokio::time::sleep(Duration::from_secs(60)).await;
                }
            } => {}
        }
    }
});
Isn't it simpler?
not really, and at this point we give up all the formatting to `tokio::select!`, which rustfmt can't format
let cancellation_token = cancellation_token.child_token();
async move {
    loop {
        if cancellation_token.is_cancelled() {
The body will block the cancellation
will be reviewed in the next iterations of this PR
});

assets_iter.seek_to_first();
while assets_iter.valid() {
-    if !rx.is_empty() {
+    if cancellation_token.is_cancelled() {
The body will block the cancellation. Maybe it's intended?
will be reviewed in the next iterations of this PR
    }
    Err(e) => {
        tracing::error!("Full {:?} synchronization failed: {:?}", asset_type, e);
        sync_tasks.spawn({
a lot of `cancellation_token` handling is happening here. can't it be replaced by one `select!`?
let mut forked_slots = 0;
let mut delete_items = Vec::new();

// from this column data will be dropped by slot
// if we have any update from forked slot we have to delete it
for cl_item in self.cl_items_manager.cl_items_iter() {
-    if !rx.is_empty() {
+    if cancellation_token.is_cancelled() {
This huge function is effectively prevented from shutting down, because the cancellation check happens only at the beginning of the function.
will be reviewed in the next iterations of this PR
) {
-    while rx.is_empty() {
-        if let Err(e) = self.synchronize_nft_asset_indexes(rx, run_full_sync_threshold).await {
+    while !cancellation_token.is_cancelled() {
looks like synchronize_nft_asset_indexes already accepts the token, so I don't see why we need the `while` here. `select` may be more suitable, or even just a `loop` with a `break`/return from the function on error
each part in the subsequent call uses loops. the outer usage of cancellation token means that no new iterations will be started on this level, while the cancellation token in child functions acts to break inner loops
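(A minimal sketch of that layering, with hypothetical `run_one_round`/`sync_one_batch` helpers standing in for the real synchronization calls:)

use tokio_util::sync::CancellationToken;

// Outer level: no new synchronization rounds are started once cancelled.
async fn synchronize(cancellation_token: CancellationToken) {
    while !cancellation_token.is_cancelled() {
        run_one_round(cancellation_token.clone()).await;
    }
}

// Inner level: the same token breaks the loop inside a round early.
async fn run_one_round(cancellation_token: CancellationToken) {
    for batch in 0..100u32 {
        if cancellation_token.is_cancelled() {
            break;
        }
        sync_one_batch(batch).await;
    }
}

async fn sync_one_batch(_batch: u32) {
    // placeholder for the per-batch work
}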
-while rx.is_empty() {
-    if let Err(e) =
-        self.synchronize_fungible_asset_indexes(rx, run_full_sync_threshold).await
+while !cancellation_token.is_cancelled() {
The same concern as above
each part in the subsequent call uses loops. the outer usage of cancellation token means that no new iterations will be started on this level, while the cancellation token in child functions acts to break inner loops
-let response = json_downloader
-    .download_file(task.metadata_url.clone(), CLIENT_TIMEOUT)
-    .await;
+let response = tokio::select! {
maybe that `select` with a `loop` can replace the `while`?
will be reviewed in the next iterations of this PR
    continue;
}
usecase::executor::spawn(async move {
    loop {
I'm not sure why we need loop and sleep here simultaneously
good catch
nft_ingester/src/scheduler.rs
Outdated
) {
    usecase::executor::spawn(async move {
        tokio::select! {
            _ = cancellation_token.cancelled() => {}
Should we terminate all background processes? (as part of a new task)
they will be terminated by select!
@@ -172,7 +165,7 @@ impl<T: UnprocessedAccountsGetter> AccountsProcessor<T> {
    let mut interval = tokio::time::interval(FLUSH_INTERVAL);
    let mut batch_fill_instant = Instant::now();

-    while rx.is_empty() {
+    while !cancellation_token.is_cancelled() {
`while` can be replaced with `loop` inside of the first branch imo
-let run_transaction_processor = async move {
-    while rx.is_empty() {
+usecase::executor::spawn(async move {
+    while !cancellation_token.is_cancelled() {
Cancellation is blocked
    cloned_rocks_storage: Arc<Storage>,
    first_processed_slot_clone: Arc<AtomicU64>,
    last_saved_slot: u64,
) -> Result<(), JoinError> {
-    while cloned_rx.is_empty() {
+    while !cancellation_token.is_cancelled() {
a `while let Ok(..)` seems more suitable here
nope, don't agree, it's pretty small and descriptive IMO
@@ -337,7 +338,7 @@ impl Dumper for Storage {
            synchronizer_metrics.inc_num_of_records_written("authority", 1);
        }
    }
-    if !rx.is_empty() {
+    if cancellation_token.is_cancelled() {
the cancellation is barely reachable inside this huge function
not really, it will cancel upon subsequent iterator accesses, which are pretty fast
let start_time = chrono::Utc::now();
let mut all_keys = HashSet::new();
for (key, _) in self
    .db
    .full_iterator_cf(&self.db.cf_handle(RawBlock::NAME).unwrap(), IteratorMode::Start)
    .filter_map(Result::ok)
{
-    if !rx.is_empty() {
+    if cancellation_token.is_cancelled() {
this should run in parallel imo
it's not async so IMO not worth it
Force-pushed from 70f40a6 to 1d5d1d8
pub fn spawn<T, F: Future<Output = T> + Send + 'static>(future: F) -> AbortHandle {
    tokio::task::block_in_place(|| {
        let mut executor_guard = EXECUTOR.blocking_lock();
        let _ = executor_guard.get_or_init(JoinSet::new);
        let executor =
            executor_guard.get_mut().expect("executor join set to be initialized upon access");
        executor.spawn(async move {
            let _ = future.await;
        })
    })
}
block_in_place doesn't seem to be used correctly here. It temporarily converts the current asynchronous task into a blocking one, afaik.
wdyt of this instead?
Suggested change (replacing the function quoted above):

pub async fn spawn<T, F: Future<Output = T> + Send + 'static>(future: F) -> AbortHandle {
    let mut executor_guard = EXECUTOR.lock().await;
    let executor = executor_guard.get_or_init(JoinSet::new);
    executor.spawn(async move {
        let _ = future.await;
    })
}
- won't work as i'd like to preserve the signature of spawn being a synchronous function
- to use blocking_lock we must enclose the spawning in `block_in_place`, `spawn_blocking`, or `block_on`. IMO this is not an issue since this spawn is extremely fast, and we use a multi-threaded runtime anyway, which implies the existence of a blocking thread.
let current_timestamp = Utc::now().timestamp_millis() as u64;
for tx in txs {
    match geyser_bubblegum_updates_processor
        .process_transaction(tx.tx, false)
run_until_cancelled assumes the future is safe to cancel, but if we're cancelled in the middle of process_transaction, we'll have an issue here
i believe you're right. rolling it back to use the loop
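(To illustrate the difference, with a hypothetical `process_one` standing in for process_transaction: `run_until_cancelled` can drop the in-flight future at its next await point once the token fires, while the plain loop always lets the current item finish:)

use tokio_util::sync::CancellationToken;

async fn process_one(item: u64) {
    // stand-in for process_transaction - assumed NOT safe to cancel mid-way
    let _ = item;
}

// Cancel-eager variant: the current future can be dropped half-way through.
async fn run_cancellable(token: CancellationToken, items: Vec<u64>) {
    for item in items {
        if token.run_until_cancelled(process_one(item)).await.is_none() {
            return;
        }
    }
}

// Graceful variant: the current item runs to completion; we only stop
// starting new iterations once the token is cancelled.
async fn run_graceful(token: CancellationToken, items: Vec<u64>) {
    for item in items {
        if token.is_cancelled() {
            return;
        }
        process_one(item).await;
    }
}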
    //     Ordering::Relaxed,
    // );
    //
    tokio::time::sleep(Duration::from_secs(CATCH_UP_SEQUENCES_TIMEOUT_SEC)).await;
why not keep the tokio select and just replace `rx.recv()` with `cancellation_token.cancelled()` in such cases?
good catch, though i will replace with CancellationToken::run_until_cancelled
let stop_handle = tokio::task::spawn({
    let cancellation_token = cancellation_token.clone();
    async move {
        usecase::graceful_stop::graceful_shutdown(cancellation_token).await;
I believe it should be ok to set up those interruption handlers twice - here and below, but just double checking
wdym?
I mean we set up the interruption signal handler twice - in this method and directly in the Ctrl+C handler slightly below. That should be ok, I was just double-checking.
yeah i think it's fine
Force-pushed from 1d5d1d8 to 5b58926
Overhaul task spawning in all modules. Remove usage of the `mutexed_tasks` concept, remove usage of a broadcast channel that was used for shutdown, create a static executor to save running tasks & exit gracefully.
Spawn graceful shutdown as the first tokio task in binaries that perform it, await it at the end.
Force-pushed from 5b58926 to 0c9da15
Force-pushed from 0c9da15 to f19370b
Let's roll this out!
Overhaul task spawning in all modules. Remove usage of the `mutexed_tasks` concept, remove usage of a broadcast channel that was used for shutdown, create a static executor to save running tasks & exit gracefully.