Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove TerminationFlag #440

Merged
merged 1 commit into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions v2/robotmk/src/bin/scheduler/child_process_supervisor.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::command_spec::CommandSpec;
use super::termination::kill_process_tree;
use robotmk::termination::{waited, Outcome, TerminationFlag};
use robotmk::termination::{waited, Outcome};

use anyhow::{Context, Result};
use camino::Utf8PathBuf;
Expand All @@ -9,12 +9,13 @@ use std::process::{ExitStatus, Stdio};
use std::time::Duration;
use sysinfo::{Pid, PidExt};
use tokio::process::{Child, Command};
use tokio_util::sync::CancellationToken;

pub struct ChildProcessSupervisor<'a> {
pub command_spec: &'a CommandSpec,
pub stdio_paths: Option<StdioPaths>,
pub timeout: u64,
pub termination_flag: &'a TerminationFlag,
pub cancellation_token: &'a CancellationToken,
}

pub struct StdioPaths {
Expand All @@ -24,7 +25,7 @@ pub struct StdioPaths {

fn wait_for_child(
duration: Duration,
flag: &TerminationFlag,
flag: &CancellationToken,
child: &mut Child,
) -> Result<ChildProcessOutcome> {
match waited(duration, flag, child.wait()) {
Expand Down Expand Up @@ -64,7 +65,7 @@ impl ChildProcessSupervisor<'_> {

wait_for_child(
Duration::from_secs(self.timeout),
self.termination_flag,
self.cancellation_token,
&mut command.spawn().context("Failed to spawn subprocess")?,
)
}
Expand Down
2 changes: 1 addition & 1 deletion v2/robotmk/src/bin/scheduler/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ fn build_environment(
stderr: stdio_directory.join(format!("{}.stderr", suite.id)),
}),
timeout: build_instructions.timeout,
termination_flag: &suite.termination_flag,
cancellation_token: &suite.cancellation_token,
},
start_time,
)?;
Expand Down
19 changes: 9 additions & 10 deletions v2/robotmk/src/bin/scheduler/internal_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,18 @@ use robotmk::{
config::{Config, WorkingDirectoryCleanupConfig},
lock::Locker,
section::Host,
termination::TerminationFlag,
};

use camino::Utf8PathBuf;
use std::sync::Arc;
use std::sync::Mutex;
use tokio_util::sync::CancellationToken;

pub struct GlobalConfig {
pub working_directory: Utf8PathBuf,
pub results_directory: Utf8PathBuf,
pub rcc_binary_path: Utf8PathBuf,
pub termination_flag: TerminationFlag,
pub cancellation_token: CancellationToken,
pub results_directory_locker: Locker,
}

Expand All @@ -32,15 +32,15 @@ pub struct Suite {
pub environment: Environment,
pub session: Session,
pub working_directory_cleanup_config: WorkingDirectoryCleanupConfig,
pub termination_flag: TerminationFlag,
pub cancellation_token: CancellationToken,
pub parallelism_protection: Arc<Mutex<usize>>,
pub host: Host,
pub results_directory_locker: Locker,
}

pub fn from_external_config(
external_config: Config,
termination_flag: TerminationFlag,
cancellation_token: CancellationToken,
results_directory_locker: Locker,
) -> (GlobalConfig, Vec<Suite>) {
let mut suites: Vec<Suite> = external_config
Expand Down Expand Up @@ -69,7 +69,7 @@ pub fn from_external_config(
),
session: Session::new(&suite_config.session_config),
working_directory_cleanup_config: suite_config.working_directory_cleanup_config,
termination_flag: termination_flag.clone(),
cancellation_token: cancellation_token.clone(),
parallelism_protection: Arc::new(Mutex::new(0)),
host: suite_config.host,
results_directory_locker: results_directory_locker.clone(),
Expand All @@ -81,7 +81,7 @@ pub fn from_external_config(
working_directory: external_config.working_directory,
results_directory: external_config.results_directory,
rcc_binary_path: external_config.rcc_binary_path,
termination_flag,
cancellation_token,
results_directory_locker,
},
suites,
Expand All @@ -103,7 +103,6 @@ mod tests {
};

use std::collections::HashMap;
use std::sync::{atomic::AtomicBool, Arc};

fn system_suite_config() -> SuiteConfig {
SuiteConfig {
Expand Down Expand Up @@ -151,7 +150,7 @@ mod tests {

#[test]
fn test_from_external_config() {
let termination_flag = TerminationFlag::new(Arc::new(AtomicBool::new(false)));
let cancellation_token = CancellationToken::new();
let (global_config, suites) = from_external_config(
Config {
working_directory: Utf8PathBuf::from("/working"),
Expand All @@ -162,8 +161,8 @@ mod tests {
(String::from("rcc"), rcc_suite_config()),
]),
},
termination_flag.clone(),
Locker::new("/config.json", Some(&termination_flag)),
cancellation_token.clone(),
Locker::new("/config.json", Some(&cancellation_token)),
);
assert_eq!(global_config.working_directory, "/working");
assert_eq!(global_config.results_directory, "/results");
Expand Down
12 changes: 6 additions & 6 deletions v2/robotmk/src/bin/scheduler/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,17 @@ fn run() -> Result<()> {
robotmk::config::load(&args.config_path).context("Configuration loading failed")?;
debug!("Configuration loaded");

let termination_flag = termination::start_termination_control(args.run_flag)
let cancellation_token = termination::start_termination_control(args.run_flag)
.context("Failed to set up termination control")?;
debug!("Termination control set up");

let (global_config, suites) = internal_config::from_external_config(
external_config,
termination_flag.clone(),
Locker::new(&args.config_path, Some(&termination_flag)),
cancellation_token.clone(),
Locker::new(&args.config_path, Some(&cancellation_token)),
);

if global_config.termination_flag.should_terminate() {
if global_config.cancellation_token.is_cancelled() {
bail!("Terminated")
}

Expand All @@ -52,7 +52,7 @@ fn run() -> Result<()> {
let suites = setup::rcc::setup(&global_config, suites).context("RCC-specific setup failed")?;
debug!("RCC-specific setup completed");

if global_config.termination_flag.should_terminate() {
if global_config.cancellation_token.is_cancelled() {
bail!("Terminated")
}

Expand All @@ -64,7 +64,7 @@ fn run() -> Result<()> {
let suites = environment::build_environments(&global_config, suites)?;
info!("Environment building finished");

if global_config.termination_flag.should_terminate() {
if global_config.cancellation_token.is_cancelled() {
bail!("Terminated")
}

Expand Down
2 changes: 1 addition & 1 deletion v2/robotmk/src/bin/scheduler/scheduling/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub fn run_suites_and_cleanup(global_config: &GlobalConfig, suites: &[Suite]) ->
.run(move || run_cleanup_working_directories_in_new_thread(suites_for_cleanup.clone()));

loop {
if global_config.termination_flag.should_terminate() {
if global_config.cancellation_token.is_cancelled() {
error!("Received termination signal while scheduling, waiting for suites to terminate");
wait_until_all_suites_have_terminated(suites);
bail!("Terminated");
Expand Down
2 changes: 1 addition & 1 deletion v2/robotmk/src/bin/scheduler/scheduling/suites.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ fn run_attempt(
command_spec: &suite.environment.wrap(attempt.command_spec),
base_path: &output_directory.join(attempt.index.to_string()),
timeout: suite.timeout,
termination_flag: &suite.termination_flag,
cancellation_token: &suite.cancellation_token,
}) {
Ok(run_outcome) => run_outcome,
Err(error_) => {
Expand Down
7 changes: 4 additions & 3 deletions v2/robotmk/src/bin/scheduler/sessions/schtasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use super::session::RunOutcome;
use crate::command_spec::CommandSpec;
use crate::logging::log_and_return_error;
use crate::termination::kill_process_tree;
use robotmk::termination::{waited, Outcome, TerminationFlag};
use robotmk::termination::{waited, Outcome};

use anyhow::{bail, Context, Result};
use camino::{Utf8Path, Utf8PathBuf};
Expand All @@ -14,11 +14,12 @@ use std::str::FromStr;
use std::time::Duration;
use sysinfo::Pid;
use tokio::task::yield_now;
use tokio_util::sync::CancellationToken;

fn wait_for_task_exit(task: &TaskSpec, paths: &Paths) -> Result<RunOutcome> {
let duration = Duration::from_secs(task.timeout);
let queried = query(task.task_name, &paths.exit_code);
match waited(duration, task.termination_flag, queried) {
match waited(duration, task.cancellation_token, queried) {
Outcome::Cancel => {
kill_and_delete_task(task.task_name, paths);
Ok(RunOutcome::TimedOut)
Expand Down Expand Up @@ -60,7 +61,7 @@ pub struct TaskSpec<'a> {
pub user_name: &'a str,
pub base_path: &'a Utf8Path,
pub timeout: u64,
pub termination_flag: &'a TerminationFlag,
pub cancellation_token: &'a CancellationToken,
}

#[cfg_attr(test, derive(Debug, PartialEq))]
Expand Down
9 changes: 5 additions & 4 deletions v2/robotmk/src/bin/scheduler/sessions/session.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use super::schtasks::{run_task, TaskSpec};
use crate::child_process_supervisor::{ChildProcessOutcome, ChildProcessSupervisor, StdioPaths};
use crate::command_spec::CommandSpec;
use robotmk::{config::SessionConfig, termination::TerminationFlag};
use robotmk::config::SessionConfig;

use anyhow::Result;
use camino::{Utf8Path, Utf8PathBuf};
use std::fmt::{Display, Formatter, Result as FmtResult};
use tokio_util::sync::CancellationToken;

#[derive(Clone, Eq, Hash, PartialEq)]
#[cfg_attr(test, derive(Debug))]
Expand Down Expand Up @@ -72,7 +73,7 @@ pub struct RunSpec<'a> {
pub command_spec: &'a CommandSpec,
pub base_path: &'a Utf8Path,
pub timeout: u64,
pub termination_flag: &'a TerminationFlag,
pub cancellation_token: &'a CancellationToken,
}

pub enum RunOutcome {
Expand All @@ -90,7 +91,7 @@ impl CurrentSession {
stderr: Utf8PathBuf::from(format!("{}.stderr", spec.base_path)),
}),
timeout: spec.timeout,
termination_flag: spec.termination_flag,
cancellation_token: spec.cancellation_token,
}
.run())?
{
Expand All @@ -112,7 +113,7 @@ impl UserSession {
user_name: &self.user_name,
base_path: spec.base_path,
timeout: spec.timeout,
termination_flag: spec.termination_flag,
cancellation_token: spec.cancellation_token,
})
}
}
Expand Down
4 changes: 2 additions & 2 deletions v2/robotmk/src/bin/scheduler/setup/rcc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ fn run_command_spec_once_in_current_session(
command_spec,
base_path: &rcc_setup_working_directory(&global_config.working_directory).join(id),
timeout: 120,
termination_flag: &global_config.termination_flag,
cancellation_token: &global_config.cancellation_token,
},
)? {
(suites, vec![])
Expand Down Expand Up @@ -241,7 +241,7 @@ fn run_command_spec_per_session(
base_path: &rcc_setup_working_directory(&global_config.working_directory)
.join(session_id),
timeout: 120,
termination_flag: &global_config.termination_flag,
cancellation_token: &global_config.cancellation_token,
},
)? {
succesful_suites.extend(suites);
Expand Down
30 changes: 14 additions & 16 deletions v2/robotmk/src/bin/scheduler/termination.rs
Original file line number Diff line number Diff line change
@@ -1,35 +1,33 @@
use anyhow::{Context, Result};
use camino::Utf8PathBuf;
use log::debug;
use robotmk::termination::TerminationFlag;
use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{sleep, spawn};
use std::time::Duration;
use sysinfo::{Pid, Process, ProcessExt, System, SystemExt};
use tokio_util::sync::CancellationToken;

pub fn start_termination_control(run_flag_file: Option<Utf8PathBuf>) -> Result<TerminationFlag> {
let raw_flag = Arc::new(AtomicBool::new(false));
let raw_flag_clone = raw_flag.clone();
ctrlc::set_handler(move || {
raw_flag_clone.store(true, Ordering::Relaxed);
})
.context("Failed to register signal handler for CTRL+C")?;
pub fn start_termination_control(run_flag_file: Option<Utf8PathBuf>) -> Result<CancellationToken> {
let token = CancellationToken::new();
watch_ctrlc(token.clone()).context("Failed to register signal handler for CTRL+C")?;
if let Some(run_flag_file) = run_flag_file {
start_run_flag_watch_thread(run_flag_file, raw_flag.clone());
start_run_flag_watch_thread(run_flag_file, token.clone());
}
Ok(TerminationFlag::new(raw_flag))
Ok(token)
}

fn start_run_flag_watch_thread(file: Utf8PathBuf, raw_termination_flag: Arc<AtomicBool>) {
fn watch_ctrlc(token: CancellationToken) -> Result<(), ctrlc::Error> {
ctrlc::set_handler(move || token.cancel())
}

fn start_run_flag_watch_thread(file: Utf8PathBuf, token: CancellationToken) {
spawn(move || {
debug!("Watching {file}");
while file.exists() {
sleep(Duration::from_millis(250));
}
debug!("{file} not found, raising termination flag");
raw_termination_flag.store(true, Ordering::Relaxed)
token.cancel()
});
}

Expand Down Expand Up @@ -88,12 +86,12 @@ mod tests {
#[test]
fn run_flag_file() -> Result<()> {
let run_flag_temp_path = NamedTempFile::new()?.into_temp_path();
let termination_flag = start_termination_control(Some(Utf8PathBuf::try_from(
let cancellation_token = start_termination_control(Some(Utf8PathBuf::try_from(
run_flag_temp_path.to_path_buf(),
)?))?;
run_flag_temp_path.close()?;
sleep(Duration::from_millis(500));
assert!(termination_flag.should_terminate());
assert!(cancellation_token.is_cancelled());
Ok(())
}
}
Loading
Loading