Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Small improvements #645

Merged
merged 2 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions src/bin/scheduler/logging.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use anyhow::Error;
use camino::Utf8PathBuf;
use flexi_logger::{
Age, Cleanup, Criterion, DeferredNow, FileSpec, FlexiLoggerError, LogSpecification, Logger,
LoggerHandle, Naming, Record,
};
use log::error;
use std::fmt::Debug;

pub const TIMESTAMP_FORMAT: &str = "%Y-%m-%dT%H.%M.%S%.f%z";

Expand Down Expand Up @@ -43,7 +43,10 @@ pub fn init(
.start()
}

pub fn log_and_return_error(error: Error) -> Error {
pub fn log_and_return_error<T>(error: T) -> T
where
T: Debug,
{
error!("{error:?}");
error
}
12 changes: 7 additions & 5 deletions src/bin/scheduler/scheduling/plans.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ use crate::logging::TIMESTAMP_FORMAT;
use robotmk::plans::run_attempts_with_rebot;
use robotmk::results::{AttemptsConfig, PlanExecutionReport};

use anyhow::{Context, Result as AnyhowResult};
use anyhow::Context;
use chrono::Utc;
use log::info;
use robotmk::section::WritePiggybackSection;
use robotmk::termination::{ContextUnrecoverable, Terminate};
use std::fs::create_dir_all;

pub fn run_plan(plan: &Plan) -> AnyhowResult<()> {
pub fn run_plan(plan: &Plan) -> Result<(), Terminate> {
info!(
"Running plan {} ({})",
&plan.id,
Expand All @@ -21,7 +22,7 @@ pub fn run_plan(plan: &Plan) -> AnyhowResult<()> {
plan.host.clone(),
&plan.results_directory_locker,
)
.context("Reporting plan results failed")?;
.context_unrecoverable("Reporting plan results failed")?;
info!("Plan {} finished", &plan.id);

Ok(())
Expand All @@ -48,7 +49,7 @@ fn format_source_for_logging(source: &Source) -> String {
}
}

fn produce_plan_results(plan: &Plan) -> AnyhowResult<PlanExecutionReport> {
fn produce_plan_results(plan: &Plan) -> Result<PlanExecutionReport, Terminate> {
let timestamp = Utc::now();
let output_directory = plan
.working_directory
Expand All @@ -68,7 +69,8 @@ fn produce_plan_results(plan: &Plan) -> AnyhowResult<PlanExecutionReport> {
&plan.cancellation_token,
&output_directory,
)
.context("Received termination signal while running plan")?;
.map_err(|cancelled| cancelled.into())
.context_unrecoverable("Received termination signal while running plan")?;

Ok(PlanExecutionReport {
plan_id: plan.id.clone(),
Expand Down
27 changes: 23 additions & 4 deletions src/bin/scheduler/scheduling/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ use super::plans::run_plan;
use crate::internal_config::{GlobalConfig, Plan};
use crate::logging::log_and_return_error;

use anyhow::anyhow;
use chrono::Utc;
use log::{error, info};
use log::info;
use robotmk::termination::Terminate;
use std::collections::HashMap;
use std::time::Duration;
use tokio::task::{spawn_blocking, JoinSet};
Expand Down Expand Up @@ -43,7 +45,7 @@ pub async fn run_plans_and_cleanup(global_config: &GlobalConfig, plans: &[Plan])
info!("Received termination signal while scheduling, waiting for plans to terminate");
while let Some(outcome) = join_set.join_next().await {
if let Err(error) = outcome {
error!("{error:?}");
log_and_return_error(error);
jherbel marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Expand All @@ -67,7 +69,18 @@ async fn run_sequential_plan_group_scheduler(
_ = cancellation_token.cancelled() => { return }
};
for plan in plans.clone() {
let _ = spawn_blocking(move || run_plan(&plan).map_err(log_and_return_error)).await;
let plan_id = plan.id.clone();
match spawn_blocking(move || run_plan(&plan).map_err(log_and_return_error)).await {
Ok(Err(Terminate::Cancelled)) => {
return;
}
Err(error) => {
SoloJacobs marked this conversation as resolved.
Show resolved Hide resolved
log_and_return_error(anyhow!(error).context(format!(
"Task for plan {plan_id} failed to execute to completion"
)));
}
_ => {}
}
}
}
}
Expand All @@ -80,7 +93,13 @@ async fn run_cleanup_job(cancellation_token: CancellationToken, plans: Vec<Plan>
_ = clock.tick() => { }
_ = cancellation_token.cancelled() => { return }
};
spawn_blocking(move || cleanup_working_directories(plans.iter()));
jherbel marked this conversation as resolved.
Show resolved Hide resolved
let _ = spawn_blocking(move || cleanup_working_directories(plans.iter()))
.await
.map_err(|err| {
log_and_return_error(
jherbel marked this conversation as resolved.
Show resolved Hide resolved
anyhow!(err).context("Cleanup task failed to execute to completion"),
)
});
}
}

Expand Down
8 changes: 4 additions & 4 deletions src/bin/scheduler/setup/directories.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ use robotmk::environment::Environment;
use robotmk::fs::{create_dir_all, remove_dir_all, remove_file};
use robotmk::results::{plan_results_directory, SetupFailure};
use robotmk::session::Session;
use robotmk::termination::Cancelled;
use robotmk::termination::Terminate;
use robotmk::termination::{Cancelled, ContextUnrecoverable, Terminate};
use std::collections::HashSet;

pub fn setup(
Expand Down Expand Up @@ -61,10 +60,11 @@ fn setup_managed_directory(managed_directory: &Utf8Path) -> AnyhowResult<()> {
Ok(())
}

fn setup_results_directory(global_config: &GlobalConfig, plans: &[Plan]) -> AnyhowResult<()> {
fn setup_results_directory(global_config: &GlobalConfig, plans: &[Plan]) -> Result<(), Terminate> {
create_dir_all(&global_config.results_directory)?;
create_dir_all(plan_results_directory(&global_config.results_directory))?;
clean_up_results_directory(global_config, plans).context("Failed to clean up results directory")
clean_up_results_directory(global_config, plans)
.context_unrecoverable("Failed to clean up results directory")
}

fn clean_up_results_directory(
Expand Down
19 changes: 19 additions & 0 deletions src/termination.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::collections::HashSet;
use std::fmt::Display;
use std::future::Future;
use std::time::Duration;
use sysinfo::{Pid, Process, ProcessesToUpdate, System};
Expand All @@ -14,6 +15,24 @@ pub enum Terminate {
Unrecoverable(#[from] anyhow::Error),
}

pub trait ContextUnrecoverable<T> {
fn context_unrecoverable<C>(self, context: C) -> Result<T, Terminate>
where
C: Display + Send + Sync + 'static;
}

impl<T> ContextUnrecoverable<T> for Result<T, Terminate> {
fn context_unrecoverable<C>(self, context: C) -> Result<T, Terminate>
where
C: Display + Send + Sync + 'static,
{
self.map_err(|err| match err {
Terminate::Unrecoverable(any) => Terminate::Unrecoverable(any.context(context)),
Terminate::Cancelled => Terminate::Cancelled,
})
}
}

#[derive(Error, Debug, Clone)]
#[error("Terminated")]
pub struct Cancelled;
Expand Down