Skip to content

Commit

Permalink
remove TerminationFlag
Browse files Browse the repository at this point in the history
  • Loading branch information
SoloJacobs committed Nov 13, 2023
1 parent b122f1b commit 314432e
Show file tree
Hide file tree
Showing 9 changed files with 45 additions and 62 deletions.
7 changes: 4 additions & 3 deletions v2/robotmk/src/bin/scheduler/child_process_supervisor.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::command_spec::CommandSpec;
use super::termination::kill_process_tree;
use robotmk::termination::{waited, Outcome, TerminationFlag};
use robotmk::termination::{waited, Outcome};

use anyhow::{Context, Result};
use camino::Utf8PathBuf;
Expand All @@ -9,12 +9,13 @@ use std::process::{ExitStatus, Stdio};
use std::time::Duration;
use sysinfo::{Pid, PidExt};
use tokio::process::{Child, Command};
use tokio_util::sync::CancellationToken;

pub struct ChildProcessSupervisor<'a> {
pub command_spec: &'a CommandSpec,
pub stdio_paths: Option<StdioPaths>,
pub timeout: u64,
pub termination_flag: &'a TerminationFlag,
pub termination_flag: &'a CancellationToken,
}

pub struct StdioPaths {
Expand All @@ -24,7 +25,7 @@ pub struct StdioPaths {

fn wait_for_child(
duration: Duration,
flag: &TerminationFlag,
flag: &CancellationToken,
child: &mut Child,
) -> Result<ChildProcessOutcome> {
match waited(duration, flag, child.wait()) {
Expand Down
11 changes: 5 additions & 6 deletions v2/robotmk/src/bin/scheduler/internal_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,18 @@ use robotmk::{
config::{Config, WorkingDirectoryCleanupConfig},
lock::Locker,
section::Host,
termination::TerminationFlag,
};

use camino::Utf8PathBuf;
use std::sync::Arc;
use std::sync::Mutex;
use tokio_util::sync::CancellationToken;

pub struct GlobalConfig {
pub working_directory: Utf8PathBuf,
pub results_directory: Utf8PathBuf,
pub rcc_binary_path: Utf8PathBuf,
pub termination_flag: TerminationFlag,
pub termination_flag: CancellationToken,
pub results_directory_locker: Locker,
}

Expand All @@ -32,15 +32,15 @@ pub struct Suite {
pub environment: Environment,
pub session: Session,
pub working_directory_cleanup_config: WorkingDirectoryCleanupConfig,
pub termination_flag: TerminationFlag,
pub termination_flag: CancellationToken,
pub parallelism_protection: Arc<Mutex<usize>>,
pub host: Host,
pub results_directory_locker: Locker,
}

pub fn from_external_config(
external_config: Config,
termination_flag: TerminationFlag,
termination_flag: CancellationToken,
results_directory_locker: Locker,
) -> (GlobalConfig, Vec<Suite>) {
let mut suites: Vec<Suite> = external_config
Expand Down Expand Up @@ -103,7 +103,6 @@ mod tests {
};

use std::collections::HashMap;
use std::sync::{atomic::AtomicBool, Arc};

fn system_suite_config() -> SuiteConfig {
SuiteConfig {
Expand Down Expand Up @@ -151,7 +150,7 @@ mod tests {

#[test]
fn test_from_external_config() {
let termination_flag = TerminationFlag::new(Arc::new(AtomicBool::new(false)));
let termination_flag = CancellationToken::new();
let (global_config, suites) = from_external_config(
Config {
working_directory: Utf8PathBuf::from("/working"),
Expand Down
6 changes: 3 additions & 3 deletions v2/robotmk/src/bin/scheduler/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,15 @@ fn run() -> Result<()> {
Locker::new(&args.config_path, Some(&termination_flag)),
);

if global_config.termination_flag.should_terminate() {
if global_config.termination_flag.is_cancelled() {
bail!("Terminated")
}

write_state(&results::SchedulerState::Setup, &global_config)?;
let suites = setup::setup(&global_config, suites)?;
debug!("Setup completed");

if global_config.termination_flag.should_terminate() {
if global_config.termination_flag.is_cancelled() {
bail!("Terminated")
}

Expand All @@ -62,7 +62,7 @@ fn run() -> Result<()> {
let suites = environment::build_environments(&global_config, suites)?;
info!("Environment building finished");

if global_config.termination_flag.should_terminate() {
if global_config.termination_flag.is_cancelled() {
bail!("Terminated")
}

Expand Down
2 changes: 1 addition & 1 deletion v2/robotmk/src/bin/scheduler/scheduling/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub fn run_suites_and_cleanup(global_config: &GlobalConfig, suites: &[Suite]) ->
.run(move || run_cleanup_working_directories_in_new_thread(suites_for_cleanup.clone()));

loop {
if global_config.termination_flag.should_terminate() {
if global_config.termination_flag.is_cancelled() {
error!("Received termination signal while scheduling, waiting for suites to terminate");
wait_until_all_suites_have_terminated(suites);
bail!("Terminated");
Expand Down
5 changes: 3 additions & 2 deletions v2/robotmk/src/bin/scheduler/sessions/schtasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use super::session::RunOutcome;
use crate::command_spec::CommandSpec;
use crate::logging::log_and_return_error;
use crate::termination::kill_process_tree;
use robotmk::termination::{waited, Outcome, TerminationFlag};
use robotmk::termination::{waited, Outcome};

use anyhow::{bail, Context, Result};
use camino::{Utf8Path, Utf8PathBuf};
Expand All @@ -15,6 +15,7 @@ use std::thread::sleep;
use std::time::Duration;
use sysinfo::Pid;
use tokio::task::yield_now;
use tokio_util::sync::CancellationToken;

fn wait_for_task_exit(task: &TaskSpec, paths: &Paths) -> Result<RunOutcome> {
let duration = Duration::from_secs(task.timeout);
Expand Down Expand Up @@ -64,7 +65,7 @@ pub struct TaskSpec<'a> {
pub user_name: &'a str,
pub base_path: &'a Utf8Path,
pub timeout: u64,
pub termination_flag: &'a TerminationFlag,
pub termination_flag: &'a CancellationToken,
}

#[cfg_attr(test, derive(Debug, PartialEq))]
Expand Down
5 changes: 3 additions & 2 deletions v2/robotmk/src/bin/scheduler/sessions/session.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use super::schtasks::{run_task, TaskSpec};
use crate::child_process_supervisor::{ChildProcessOutcome, ChildProcessSupervisor, StdioPaths};
use crate::command_spec::CommandSpec;
use robotmk::{config::SessionConfig, termination::TerminationFlag};
use robotmk::config::SessionConfig;

use anyhow::Result;
use camino::{Utf8Path, Utf8PathBuf};
use std::fmt::{Display, Formatter, Result as FmtResult};
use tokio_util::sync::CancellationToken;

#[derive(Clone, Eq, Hash, PartialEq)]
#[cfg_attr(test, derive(Debug))]
Expand Down Expand Up @@ -72,7 +73,7 @@ pub struct RunSpec<'a> {
pub command_spec: &'a CommandSpec,
pub base_path: &'a Utf8Path,
pub timeout: u64,
pub termination_flag: &'a TerminationFlag,
pub termination_flag: &'a CancellationToken,
}

pub enum RunOutcome {
Expand Down
28 changes: 13 additions & 15 deletions v2/robotmk/src/bin/scheduler/termination.rs
Original file line number Diff line number Diff line change
@@ -1,35 +1,33 @@
use anyhow::{Context, Result};
use camino::Utf8PathBuf;
use log::debug;
use robotmk::termination::TerminationFlag;
use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{sleep, spawn};
use std::time::Duration;
use sysinfo::{Pid, Process, ProcessExt, System, SystemExt};
use tokio_util::sync::CancellationToken;

pub fn start_termination_control(run_flag_file: Option<Utf8PathBuf>) -> Result<TerminationFlag> {
let raw_flag = Arc::new(AtomicBool::new(false));
let raw_flag_clone = raw_flag.clone();
ctrlc::set_handler(move || {
raw_flag_clone.store(true, Ordering::Relaxed);
})
.context("Failed to register signal handler for CTRL+C")?;
pub fn start_termination_control(run_flag_file: Option<Utf8PathBuf>) -> Result<CancellationToken> {
let token = CancellationToken::new();
watch_ctrcl(token.clone()).context("Failed to register signal handler for CTRL+C")?;
if let Some(run_flag_file) = run_flag_file {
start_run_flag_watch_thread(run_flag_file, raw_flag.clone());
start_run_flag_watch_thread(run_flag_file, token.clone());
}
Ok(TerminationFlag::new(raw_flag))
Ok(token)
}

fn start_run_flag_watch_thread(file: Utf8PathBuf, raw_termination_flag: Arc<AtomicBool>) {
fn watch_ctrcl(token: CancellationToken) -> Result<(), ctrlc::Error> {
ctrlc::set_handler(move || token.cancel())
}

fn start_run_flag_watch_thread(file: Utf8PathBuf, token: CancellationToken) {
spawn(move || {
debug!("Watching {file}");
while file.exists() {
sleep(Duration::from_millis(250));
}
debug!("{file} not found, raising termination flag");
raw_termination_flag.store(true, Ordering::Relaxed)
token.cancel()
});
}

Expand Down Expand Up @@ -93,7 +91,7 @@ mod tests {
)?))?;
run_flag_temp_path.close()?;
sleep(Duration::from_millis(500));
assert!(termination_flag.should_terminate());
assert!(termination_flag.is_cancelled());
Ok(())
}
}
11 changes: 5 additions & 6 deletions v2/robotmk/src/lock.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,24 @@
use super::termination::TerminationFlag;

use anyhow::{bail, Context, Result};
use camino::{Utf8Path, Utf8PathBuf};
use fs4::{lock_contended_error, FileExt};
use log::debug;
use std::fs::File;
use std::io::Result as IOResult;
use std::time::Duration;
use tokio_util::sync::CancellationToken;

#[derive(Clone)]
pub struct Locker {
lock_path: Utf8PathBuf,
termination_flag: Option<TerminationFlag>,
termination_flag: Option<CancellationToken>,
}

pub struct Lock(File);

impl Locker {
pub fn new(
lock_path: impl AsRef<Utf8Path>,
termination_flag: Option<&TerminationFlag>,
termination_flag: Option<&CancellationToken>,
) -> Self {
Self {
lock_path: lock_path.as_ref().to_owned(),
Expand Down Expand Up @@ -64,10 +63,10 @@ impl Locker {

fn lock_manual_loop(
lock_tryer: &dyn Fn() -> IOResult<()>,
termination_flag: &TerminationFlag,
termination_flag: &CancellationToken,
) -> Result<()> {
loop {
if termination_flag.should_terminate() {
if termination_flag.is_cancelled() {
bail!("Terminated")
}
match lock_tryer() {
Expand Down
32 changes: 8 additions & 24 deletions v2/robotmk/src/termination.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,7 @@
use std::future::Future;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::{task::yield_now, time::sleep};

#[derive(Clone)]
pub struct TerminationFlag(Arc<AtomicBool>);

impl TerminationFlag {
pub fn new(raw_flag: Arc<AtomicBool>) -> Self {
Self(raw_flag)
}

pub fn should_terminate(&self) -> bool {
self.0.load(Ordering::Relaxed)
}
}
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;

pub enum Outcome<T> {
Cancel,
Expand All @@ -24,19 +10,17 @@ pub enum Outcome<T> {
}

#[tokio::main]
pub async fn waited<F>(duration: Duration, flag: &TerminationFlag, future: F) -> Outcome<F::Output>
pub async fn waited<F>(
duration: Duration,
flag: &CancellationToken,
future: F,
) -> Outcome<F::Output>
where
F: Future,
{
async fn cancelled(flag: &TerminationFlag) {
while !flag.should_terminate() {
yield_now().await
}
}

tokio::select! {
outcome = future => { Outcome::Completed(outcome) },
_ = cancelled(flag) => { Outcome::Cancel },
_ = flag.cancelled() => { Outcome::Cancel },
_ = sleep(duration) => { Outcome::Timeout },
}
}

0 comments on commit 314432e

Please sign in to comment.