diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index 4323008687baa0..7c5e6dc8804235 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -460,6 +460,9 @@ impl TaskHandler for DefaultTaskHandler { let pre_commit_callback = match scheduling_context.mode() { SchedulingMode::BlockVerification => None, SchedulingMode::BlockProduction => Some(|| { + if !scheduling_context.can_commit() { + return false; + } let summary = handler_context .transaction_recorder .as_ref() @@ -870,7 +873,7 @@ impl, TH: TaskHandler> ThreadManager { #[must_use] fn accumulate_result_with_timings( - mode: SchedulingMode, + context: &SchedulingContext, (result, timings): &mut ResultWithTimings, executed_task: HandlerResult, ignored_error_count: &mut usize, @@ -879,7 +882,7 @@ impl, TH: TaskHandler> ThreadManager { return None; }; timings.accumulate(&executed_task.result_with_timings.1); - match mode { + match context.mode() { SchedulingMode::BlockVerification => match executed_task.result_with_timings.0 { Ok(()) => Some(executed_task), Err(error) => { @@ -888,7 +891,13 @@ impl, TH: TaskHandler> ThreadManager { None } }, - SchedulingMode::BlockProduction => match executed_task.result_with_timings.0 { + SchedulingMode::BlockProduction => { + if !context.can_commit() { + info!("detected max tick height at scheduler thread..."); + *result = Err(TransactionError::CommitFailed); + None + } + match executed_task.result_with_timings.0 { Ok(()) => Some(executed_task), Err(error @ TransactionError::CommitFailed) => { info!("maybe reached max tick height...: {error:?}"); @@ -902,7 +911,7 @@ impl, TH: TaskHandler> ThreadManager { *ignored_error_count += 1; Some(executed_task) } - }, + }}, } } @@ -1008,7 +1017,7 @@ impl, TH: TaskHandler> ThreadManager { // heuristic's caveat for the first task of linearized runs, which is described above. let mode = context.mode(); let (mut runnable_task_sender, runnable_task_receiver) = - chained_channel::unbounded::(context); + chained_channel::unbounded::(context.clone()); // Create two handler-to-scheduler channels to prioritize the finishing of blocked tasks, // because it is more likely that a blocked task will have more blocked tasks behind it, // which should be scheduled while minimizing the delay to clear buffered linearized runs @@ -1173,7 +1182,7 @@ impl, TH: TaskHandler> ThreadManager { let step_type = select! { recv(finished_blocked_task_receiver) -> executed_task => { let Some(executed_task) = Self::accumulate_result_with_timings( - state_machine.mode(), + &context, &mut result_with_timings, executed_task.expect("alive handler"), &mut ignored_error_count, @@ -1221,7 +1230,7 @@ impl, TH: TaskHandler> ThreadManager { }, recv(finished_idle_task_receiver) -> executed_task => { let Some(executed_task) = Self::accumulate_result_with_timings( - state_machine.mode(), + &context, &mut result_with_timings, executed_task.expect("alive handler"), &mut ignored_error_count, @@ -1268,8 +1277,9 @@ impl, TH: TaskHandler> ThreadManager { log_scheduler!(info, "started"); runnable_task_sender - .send_chained_channel(new_context, handler_count) + .send_chained_channel(new_context.clone(), handler_count) .unwrap(); + context = new_context; result_with_timings = new_result_with_timings; } Err(_) => {