Skip to content

Commit

Permalink
save
Browse files Browse the repository at this point in the history
  • Loading branch information
ryoqun committed Jul 10, 2024
1 parent 3d76375 commit 774b55b
Showing 1 changed file with 18 additions and 8 deletions.
26 changes: 18 additions & 8 deletions unified-scheduler-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,9 @@ impl TaskHandler for DefaultTaskHandler {
let pre_commit_callback = match scheduling_context.mode() {
SchedulingMode::BlockVerification => None,
SchedulingMode::BlockProduction => Some(|| {
if !scheduling_context.can_commit() {
return false;
}
let summary = handler_context
.transaction_recorder
.as_ref()
Expand Down Expand Up @@ -870,7 +873,7 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {

#[must_use]
fn accumulate_result_with_timings(
mode: SchedulingMode,
context: &SchedulingContext,
(result, timings): &mut ResultWithTimings,
executed_task: HandlerResult,
ignored_error_count: &mut usize,
Expand All @@ -879,7 +882,7 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
return None;
};
timings.accumulate(&executed_task.result_with_timings.1);
match mode {
match context.mode() {
SchedulingMode::BlockVerification => match executed_task.result_with_timings.0 {
Ok(()) => Some(executed_task),
Err(error) => {
Expand All @@ -888,7 +891,13 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
None
}
},
SchedulingMode::BlockProduction => match executed_task.result_with_timings.0 {
SchedulingMode::BlockProduction => {
if !context.can_commit() {
info!("detected max tick height at scheduler thread...");
*result = Err(TransactionError::CommitFailed);
None
}
match executed_task.result_with_timings.0 {
Ok(()) => Some(executed_task),
Err(error @ TransactionError::CommitFailed) => {
info!("maybe reached max tick height...: {error:?}");
Expand All @@ -902,7 +911,7 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
*ignored_error_count += 1;
Some(executed_task)
}
},
}},
}
}

Expand Down Expand Up @@ -1008,7 +1017,7 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
// heuristic's caveat for the first task of linearized runs, which is described above.
let mode = context.mode();
let (mut runnable_task_sender, runnable_task_receiver) =
chained_channel::unbounded::<Task, SchedulingContext>(context);
chained_channel::unbounded::<Task, SchedulingContext>(context.clone());
// Create two handler-to-scheduler channels to prioritize the finishing of blocked tasks,
// because it is more likely that a blocked task will have more blocked tasks behind it,
// which should be scheduled while minimizing the delay to clear buffered linearized runs
Expand Down Expand Up @@ -1173,7 +1182,7 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
let step_type = select! {
recv(finished_blocked_task_receiver) -> executed_task => {
let Some(executed_task) = Self::accumulate_result_with_timings(
state_machine.mode(),
&context,
&mut result_with_timings,
executed_task.expect("alive handler"),
&mut ignored_error_count,
Expand Down Expand Up @@ -1221,7 +1230,7 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
},
recv(finished_idle_task_receiver) -> executed_task => {
let Some(executed_task) = Self::accumulate_result_with_timings(
state_machine.mode(),
&context,
&mut result_with_timings,
executed_task.expect("alive handler"),
&mut ignored_error_count,
Expand Down Expand Up @@ -1268,8 +1277,9 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
log_scheduler!(info, "started");

runnable_task_sender
.send_chained_channel(new_context, handler_count)
.send_chained_channel(new_context.clone(), handler_count)
.unwrap();
context = new_context;
result_with_timings = new_result_with_timings;
}
Err(_) => {
Expand Down

0 comments on commit 774b55b

Please sign in to comment.