diff --git a/src/bin/scheduler/scheduling/scheduler.rs b/src/bin/scheduler/scheduling/scheduler.rs index 48dc85a6..4ba1f085 100644 --- a/src/bin/scheduler/scheduling/scheduler.rs +++ b/src/bin/scheduler/scheduling/scheduler.rs @@ -3,8 +3,10 @@ use super::plans::run_plan; use crate::internal_config::{GlobalConfig, Plan}; use crate::logging::log_and_return_error; +use anyhow::anyhow; use chrono::Utc; -use log::{error, info}; +use log::info; +use robotmk::termination::Terminate; use std::collections::HashMap; use std::time::Duration; use tokio::task::{spawn_blocking, JoinSet}; @@ -43,7 +45,7 @@ pub async fn run_plans_and_cleanup(global_config: &GlobalConfig, plans: &[Plan]) info!("Received termination signal while scheduling, waiting for plans to terminate"); while let Some(outcome) = join_set.join_next().await { if let Err(error) = outcome { - error!("{error:?}"); + log_and_return_error(error); } } } @@ -67,7 +69,18 @@ async fn run_sequential_plan_group_scheduler( _ = cancellation_token.cancelled() => { return } }; for plan in plans.clone() { - let _ = spawn_blocking(move || run_plan(&plan).map_err(log_and_return_error)).await; + let plan_id = plan.id.clone(); + match spawn_blocking(move || run_plan(&plan).map_err(log_and_return_error)).await { + Ok(Err(Terminate::Cancelled)) => { + return; + } + Err(error) => { + log_and_return_error(anyhow!(error).context(format!( + "Task for plan {plan_id} failed to execute to completion" + ))); + } + _ => {} + } } } } @@ -80,7 +93,13 @@ async fn run_cleanup_job(cancellation_token: CancellationToken, plans: Vec _ = clock.tick() => { } _ = cancellation_token.cancelled() => { return } }; - spawn_blocking(move || cleanup_working_directories(plans.iter())); + let _ = spawn_blocking(move || cleanup_working_directories(plans.iter())) + .await + .map_err(|err| { + log_and_return_error( + anyhow!(err).context("Cleanup task failed to execute to completion"), + ) + }); } }