diff --git a/v2/robotmk/src/bin/scheduler/child_process_supervisor.rs b/v2/robotmk/src/bin/scheduler/child_process_supervisor.rs index a276fa1d..4fbf5f73 100644 --- a/v2/robotmk/src/bin/scheduler/child_process_supervisor.rs +++ b/v2/robotmk/src/bin/scheduler/child_process_supervisor.rs @@ -1,6 +1,6 @@ use super::command_spec::CommandSpec; use super::termination::kill_process_tree; -use robotmk::termination::{waited, Outcome, TerminationFlag}; +use robotmk::termination::{waited, Outcome}; use anyhow::{Context, Result}; use camino::Utf8PathBuf; @@ -9,12 +9,13 @@ use std::process::{ExitStatus, Stdio}; use std::time::Duration; use sysinfo::{Pid, PidExt}; use tokio::process::{Child, Command}; +use tokio_util::sync::CancellationToken; pub struct ChildProcessSupervisor<'a> { pub command_spec: &'a CommandSpec, pub stdio_paths: Option, pub timeout: u64, - pub termination_flag: &'a TerminationFlag, + pub termination_flag: &'a CancellationToken, } pub struct StdioPaths { @@ -24,7 +25,7 @@ pub struct StdioPaths { fn wait_for_child( duration: Duration, - flag: &TerminationFlag, + flag: &CancellationToken, child: &mut Child, ) -> Result { match waited(duration, flag, child.wait()) { diff --git a/v2/robotmk/src/bin/scheduler/internal_config.rs b/v2/robotmk/src/bin/scheduler/internal_config.rs index 627b71c0..9755ed2a 100644 --- a/v2/robotmk/src/bin/scheduler/internal_config.rs +++ b/v2/robotmk/src/bin/scheduler/internal_config.rs @@ -6,18 +6,18 @@ use robotmk::{ config::{Config, WorkingDirectoryCleanupConfig}, lock::Locker, section::Host, - termination::TerminationFlag, }; use camino::Utf8PathBuf; use std::sync::Arc; use std::sync::Mutex; +use tokio_util::sync::CancellationToken; pub struct GlobalConfig { pub working_directory: Utf8PathBuf, pub results_directory: Utf8PathBuf, pub rcc_binary_path: Utf8PathBuf, - pub termination_flag: TerminationFlag, + pub termination_flag: CancellationToken, pub results_directory_locker: Locker, } @@ -32,7 +32,7 @@ pub struct Suite { pub environment: Environment, pub session: Session, pub working_directory_cleanup_config: WorkingDirectoryCleanupConfig, - pub termination_flag: TerminationFlag, + pub termination_flag: CancellationToken, pub parallelism_protection: Arc>, pub host: Host, pub results_directory_locker: Locker, @@ -40,7 +40,7 @@ pub struct Suite { pub fn from_external_config( external_config: Config, - termination_flag: TerminationFlag, + termination_flag: CancellationToken, results_directory_locker: Locker, ) -> (GlobalConfig, Vec) { let mut suites: Vec = external_config @@ -103,7 +103,6 @@ mod tests { }; use std::collections::HashMap; - use std::sync::{atomic::AtomicBool, Arc}; fn system_suite_config() -> SuiteConfig { SuiteConfig { @@ -151,7 +150,7 @@ mod tests { #[test] fn test_from_external_config() { - let termination_flag = TerminationFlag::new(Arc::new(AtomicBool::new(false))); + let termination_flag = CancellationToken::new(); let (global_config, suites) = from_external_config( Config { working_directory: Utf8PathBuf::from("/working"), diff --git a/v2/robotmk/src/bin/scheduler/main.rs b/v2/robotmk/src/bin/scheduler/main.rs index f95d737d..454a75c5 100644 --- a/v2/robotmk/src/bin/scheduler/main.rs +++ b/v2/robotmk/src/bin/scheduler/main.rs @@ -42,7 +42,7 @@ fn run() -> Result<()> { Locker::new(&args.config_path, Some(&termination_flag)), ); - if global_config.termination_flag.should_terminate() { + if global_config.termination_flag.is_cancelled() { bail!("Terminated") } @@ -50,7 +50,7 @@ fn run() -> Result<()> { let suites = setup::setup(&global_config, suites)?; debug!("Setup completed"); - if global_config.termination_flag.should_terminate() { + if global_config.termination_flag.is_cancelled() { bail!("Terminated") } @@ -62,7 +62,7 @@ fn run() -> Result<()> { let suites = environment::build_environments(&global_config, suites)?; info!("Environment building finished"); - if global_config.termination_flag.should_terminate() { + if global_config.termination_flag.is_cancelled() { bail!("Terminated") } diff --git a/v2/robotmk/src/bin/scheduler/scheduling/scheduler.rs b/v2/robotmk/src/bin/scheduler/scheduling/scheduler.rs index 76dfbc55..1e298c21 100644 --- a/v2/robotmk/src/bin/scheduler/scheduling/scheduler.rs +++ b/v2/robotmk/src/bin/scheduler/scheduling/scheduler.rs @@ -25,7 +25,7 @@ pub fn run_suites_and_cleanup(global_config: &GlobalConfig, suites: &[Suite]) -> .run(move || run_cleanup_working_directories_in_new_thread(suites_for_cleanup.clone())); loop { - if global_config.termination_flag.should_terminate() { + if global_config.termination_flag.is_cancelled() { error!("Received termination signal while scheduling, waiting for suites to terminate"); wait_until_all_suites_have_terminated(suites); bail!("Terminated"); diff --git a/v2/robotmk/src/bin/scheduler/sessions/schtasks.rs b/v2/robotmk/src/bin/scheduler/sessions/schtasks.rs index 028ff4af..6fe5e9fb 100644 --- a/v2/robotmk/src/bin/scheduler/sessions/schtasks.rs +++ b/v2/robotmk/src/bin/scheduler/sessions/schtasks.rs @@ -2,7 +2,7 @@ use super::session::RunOutcome; use crate::command_spec::CommandSpec; use crate::logging::log_and_return_error; use crate::termination::kill_process_tree; -use robotmk::termination::{waited, Outcome, TerminationFlag}; +use robotmk::termination::{waited, Outcome}; use anyhow::{bail, Context, Result}; use camino::{Utf8Path, Utf8PathBuf}; @@ -15,6 +15,7 @@ use std::thread::sleep; use std::time::Duration; use sysinfo::Pid; use tokio::task::yield_now; +use tokio_util::sync::CancellationToken; fn wait_for_task_exit(task: &TaskSpec, paths: &Paths) -> Result { let duration = Duration::from_secs(task.timeout); @@ -64,7 +65,7 @@ pub struct TaskSpec<'a> { pub user_name: &'a str, pub base_path: &'a Utf8Path, pub timeout: u64, - pub termination_flag: &'a TerminationFlag, + pub termination_flag: &'a CancellationToken, } #[cfg_attr(test, derive(Debug, PartialEq))] diff --git a/v2/robotmk/src/bin/scheduler/sessions/session.rs b/v2/robotmk/src/bin/scheduler/sessions/session.rs index d5927ec2..740e1ec7 100644 --- a/v2/robotmk/src/bin/scheduler/sessions/session.rs +++ b/v2/robotmk/src/bin/scheduler/sessions/session.rs @@ -1,11 +1,12 @@ use super::schtasks::{run_task, TaskSpec}; use crate::child_process_supervisor::{ChildProcessOutcome, ChildProcessSupervisor, StdioPaths}; use crate::command_spec::CommandSpec; -use robotmk::{config::SessionConfig, termination::TerminationFlag}; +use robotmk::config::SessionConfig; use anyhow::Result; use camino::{Utf8Path, Utf8PathBuf}; use std::fmt::{Display, Formatter, Result as FmtResult}; +use tokio_util::sync::CancellationToken; #[derive(Clone, Eq, Hash, PartialEq)] #[cfg_attr(test, derive(Debug))] @@ -72,7 +73,7 @@ pub struct RunSpec<'a> { pub command_spec: &'a CommandSpec, pub base_path: &'a Utf8Path, pub timeout: u64, - pub termination_flag: &'a TerminationFlag, + pub termination_flag: &'a CancellationToken, } pub enum RunOutcome { diff --git a/v2/robotmk/src/bin/scheduler/termination.rs b/v2/robotmk/src/bin/scheduler/termination.rs index 33d92d0b..505d5962 100644 --- a/v2/robotmk/src/bin/scheduler/termination.rs +++ b/v2/robotmk/src/bin/scheduler/termination.rs @@ -1,35 +1,33 @@ use anyhow::{Context, Result}; use camino::Utf8PathBuf; use log::debug; -use robotmk::termination::TerminationFlag; use std::collections::{HashMap, HashSet}; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; use std::thread::{sleep, spawn}; use std::time::Duration; use sysinfo::{Pid, Process, ProcessExt, System, SystemExt}; +use tokio_util::sync::CancellationToken; -pub fn start_termination_control(run_flag_file: Option) -> Result { - let raw_flag = Arc::new(AtomicBool::new(false)); - let raw_flag_clone = raw_flag.clone(); - ctrlc::set_handler(move || { - raw_flag_clone.store(true, Ordering::Relaxed); - }) - .context("Failed to register signal handler for CTRL+C")?; +pub fn start_termination_control(run_flag_file: Option) -> Result { + let token = CancellationToken::new(); + watch_ctrcl(token.clone()).context("Failed to register signal handler for CTRL+C")?; if let Some(run_flag_file) = run_flag_file { - start_run_flag_watch_thread(run_flag_file, raw_flag.clone()); + start_run_flag_watch_thread(run_flag_file, token.clone()); } - Ok(TerminationFlag::new(raw_flag)) + Ok(token) } -fn start_run_flag_watch_thread(file: Utf8PathBuf, raw_termination_flag: Arc) { +fn watch_ctrcl(token: CancellationToken) -> Result<(), ctrlc::Error> { + ctrlc::set_handler(move || token.cancel()) +} + +fn start_run_flag_watch_thread(file: Utf8PathBuf, token: CancellationToken) { spawn(move || { debug!("Watching {file}"); while file.exists() { sleep(Duration::from_millis(250)); } debug!("{file} not found, raising termination flag"); - raw_termination_flag.store(true, Ordering::Relaxed) + token.cancel() }); } @@ -93,7 +91,7 @@ mod tests { )?))?; run_flag_temp_path.close()?; sleep(Duration::from_millis(500)); - assert!(termination_flag.should_terminate()); + assert!(termination_flag.is_cancelled()); Ok(()) } } diff --git a/v2/robotmk/src/lock.rs b/v2/robotmk/src/lock.rs index 6b8cef4e..96667d6b 100644 --- a/v2/robotmk/src/lock.rs +++ b/v2/robotmk/src/lock.rs @@ -1,5 +1,3 @@ -use super::termination::TerminationFlag; - use anyhow::{bail, Context, Result}; use camino::{Utf8Path, Utf8PathBuf}; use fs4::{lock_contended_error, FileExt}; @@ -7,11 +5,12 @@ use log::debug; use std::fs::File; use std::io::Result as IOResult; use std::time::Duration; +use tokio_util::sync::CancellationToken; #[derive(Clone)] pub struct Locker { lock_path: Utf8PathBuf, - termination_flag: Option, + termination_flag: Option, } pub struct Lock(File); @@ -19,7 +18,7 @@ pub struct Lock(File); impl Locker { pub fn new( lock_path: impl AsRef, - termination_flag: Option<&TerminationFlag>, + termination_flag: Option<&CancellationToken>, ) -> Self { Self { lock_path: lock_path.as_ref().to_owned(), @@ -64,10 +63,10 @@ impl Locker { fn lock_manual_loop( lock_tryer: &dyn Fn() -> IOResult<()>, - termination_flag: &TerminationFlag, + termination_flag: &CancellationToken, ) -> Result<()> { loop { - if termination_flag.should_terminate() { + if termination_flag.is_cancelled() { bail!("Terminated") } match lock_tryer() { diff --git a/v2/robotmk/src/termination.rs b/v2/robotmk/src/termination.rs index 38c92989..5e287f96 100644 --- a/v2/robotmk/src/termination.rs +++ b/v2/robotmk/src/termination.rs @@ -1,21 +1,7 @@ use std::future::Future; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; use std::time::Duration; -use tokio::{task::yield_now, time::sleep}; - -#[derive(Clone)] -pub struct TerminationFlag(Arc); - -impl TerminationFlag { - pub fn new(raw_flag: Arc) -> Self { - Self(raw_flag) - } - - pub fn should_terminate(&self) -> bool { - self.0.load(Ordering::Relaxed) - } -} +use tokio::time::sleep; +use tokio_util::sync::CancellationToken; pub enum Outcome { Cancel, @@ -24,19 +10,17 @@ pub enum Outcome { } #[tokio::main] -pub async fn waited(duration: Duration, flag: &TerminationFlag, future: F) -> Outcome +pub async fn waited( + duration: Duration, + flag: &CancellationToken, + future: F, +) -> Outcome where F: Future, { - async fn cancelled(flag: &TerminationFlag) { - while !flag.should_terminate() { - yield_now().await - } - } - tokio::select! { outcome = future => { Outcome::Completed(outcome) }, - _ = cancelled(flag) => { Outcome::Cancel }, + _ = flag.cancelled() => { Outcome::Cancel }, _ = sleep(duration) => { Outcome::Timeout }, } }