Skip to content

Commit

Permalink
Improve task termination
Browse files Browse the repository at this point in the history
In addition to attempting to kill the process via its PID, we create a
flag file. Before starting the actual test run, the task checks if the
flag file still exists. If not, the task terminates.

This provides us with an additional mechanism for shutting down the task
in its early stage when the PID might not yet have been written to file.
  • Loading branch information
jherbel committed Nov 14, 2023
1 parent 548e2b8 commit 802f812
Showing 1 changed file with 40 additions and 31 deletions.
71 changes: 40 additions & 31 deletions v2/robotmk/src/bin/scheduler/sessions/schtasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@ use robotmk::termination::{waited, Outcome, TerminationFlag};
use anyhow::{bail, Context, Result};
use camino::{Utf8Path, Utf8PathBuf};
use chrono::{Duration as ChronoDuration, Local};
use log::{debug, error};
use std::fs::{read_to_string, write};
use log::{debug, error, warn};
use std::fs::{read_to_string, remove_file, write};
use std::process::Command;
use std::str::FromStr;
use std::thread::sleep;
use std::time::Duration;
use sysinfo::Pid;
use tokio::task::yield_now;
Expand All @@ -21,16 +20,16 @@ fn wait_for_task_exit(task: &TaskSpec, paths: &Paths) -> Result<RunOutcome> {
let queried = query(task.task_name, &paths.exit_code);
match waited(duration, task.termination_flag, queried) {
Outcome::Cancel => {
kill_and_delete_task(task.task_name, &paths.pid);
kill_and_delete_task(task.task_name, paths);
Ok(RunOutcome::TimedOut)
}
Outcome::Timeout => {
error!("Timeout");
kill_and_delete_task(task.task_name, &paths.pid);
kill_and_delete_task(task.task_name, paths);
Ok(RunOutcome::Terminated)
}
Outcome::Completed(Err(e)) => {
kill_and_delete_task(task.task_name, &paths.pid);
kill_and_delete_task(task.task_name, paths);
Err(e)
}
Outcome::Completed(Ok(code)) => {
Expand All @@ -50,10 +49,7 @@ pub fn run_task(task_spec: &TaskSpec) -> Result<RunOutcome> {
let paths = Paths::from(task_spec.base_path);
create_task(task_spec, &paths)
.context(format!("Failed to create task {}", task_spec.task_name))?;

debug!("Starting task {}", task_spec.task_name);
run_schtasks(["/run", "/tn", task_spec.task_name])
.context(format!("Failed to start task {}", task_spec.task_name))?;
start_task(task_spec.task_name, &paths.run_flag)?;

wait_for_task_exit(task_spec, &paths)
}
Expand All @@ -70,9 +66,10 @@ pub struct TaskSpec<'a> {
#[cfg_attr(test, derive(Debug, PartialEq))]
struct Paths {
script: Utf8PathBuf,
run_flag: Utf8PathBuf,
pid: Utf8PathBuf,
stdout: Utf8PathBuf,
stderr: Utf8PathBuf,
pid: Utf8PathBuf,
exit_code: Utf8PathBuf,
}

Expand All @@ -82,9 +79,10 @@ impl From<&Utf8Path> for Paths {
// .bat is important here, otherwise, the Windows task scheduler won't know how to
// execute this file.
script: Utf8PathBuf::from(format!("{base_path}.bat")),
run_flag: Utf8PathBuf::from(format!("{base_path}.run_flag")),
pid: Utf8PathBuf::from(format!("{base_path}.pid")),
stdout: Utf8PathBuf::from(format!("{base_path}.stdout")),
stderr: Utf8PathBuf::from(format!("{base_path}.stderr")),
pid: Utf8PathBuf::from(format!("{base_path}.pid")),
exit_code: Utf8PathBuf::from(format!("{base_path}.exit_code")),
}
}
Expand Down Expand Up @@ -133,6 +131,10 @@ fn build_task_script(command_spec: &CommandSpec, paths: &Paths) -> String {
"powershell.exe (Get-WmiObject Win32_Process -Filter ProcessId=$PID).ParentProcessId > {}",
paths.pid
),
// `schtasks.exe /end ...` seems utterly useless. Hence, we employ this run flag to signal
// our task to terminate (in addition to killing the process if we were able to read the
// PID, which is not the case if the task has just started).
format!("if not exist {} exit /b 1", paths.run_flag),
format!("{command_spec} > {} 2> {}", paths.stdout, paths.stderr),
format!("echo %errorlevel% > {}", paths.exit_code),
]
Expand Down Expand Up @@ -161,6 +163,14 @@ where
))
}

fn start_task(task_name: &str, path_run_flag: &Utf8Path) -> Result<()> {
debug!("Starting task {task_name}");
write(path_run_flag, "").context(format!("Failed to create run flag file {path_run_flag}"))?;
run_schtasks(["/run", "/tn", task_name])
.context(format!("Failed to start task {task_name}"))?;
Ok(())
}

async fn query(task_name: &str, exit_path: &Utf8Path) -> Result<i32> {
debug!("Waiting for task {} to complete", task_name);
while query_if_task_is_running(task_name)
Expand All @@ -181,15 +191,25 @@ fn query_if_task_is_running(task_name: &str) -> Result<bool> {
Ok(schtasks_stdout.contains("Running"))
}

fn kill_and_delete_task(task_name: &str, path_pid: &Utf8Path) {
// schtasks.exe /end ... terminates the batch script, but child processes will survive ...
fn kill_and_delete_task(task_name: &str, paths: &Paths) {
error!("Killing and deleting task {task_name}");
let _ = kill_task(path_pid).map_err(log_and_return_error);
kill_task(paths);
delete_task(task_name);
}

fn kill_task(path_pid: &Utf8Path) -> Result<()> {
let raw_pid = read_pid(path_pid)?;
fn kill_task(paths: &Paths) {
let _ = remove_file(&paths.run_flag)
.context(format!("Failed to remove {}", paths.run_flag))
.map_err(log_and_return_error);
let _ = kill_task_via_pid(&paths.pid).map_err(|error| {
warn!("{:?}", error);
error
});
}

fn kill_task_via_pid(path_pid: &Utf8Path) -> Result<()> {
let raw_pid = read_until_first_whitespace(path_pid)
.context(format!("Failed to read PID from {path_pid}"))?;
kill_process_tree(
&Pid::from_str(&raw_pid).context(format!("Failed to parse {} as PID", raw_pid))?,
);
Expand All @@ -212,19 +232,6 @@ fn read_until_first_whitespace(path: &Utf8Path) -> Result<String> {
.to_string())
}

fn read_pid(path: &Utf8Path) -> Result<String> {
match read_until_first_whitespace(path) {
Ok(pid) => return Ok(pid),
Err(err) => {
log_and_return_error(err.context(format!(
"Failed to read PID from {path}, will sleep 1s and try one more time"
)));
}
};
sleep(Duration::from_secs(1));
read_until_first_whitespace(path).context(format!("Failed to read PID from {path}"))
}

#[cfg(test)]
mod tests {
use super::*;
Expand All @@ -237,9 +244,10 @@ mod tests {
Paths::from(Utf8PathBuf::from("C:\\working\\suites\\my_suite\\123\\0").as_ref()),
Paths {
script: Utf8PathBuf::from("C:\\working\\suites\\my_suite\\123\\0.bat"),
run_flag: Utf8PathBuf::from("C:\\working\\suites\\my_suite\\123\\0.run_flag"),
pid: Utf8PathBuf::from("C:\\working\\suites\\my_suite\\123\\0.pid"),
stdout: Utf8PathBuf::from("C:\\working\\suites\\my_suite\\123\\0.stdout"),
stderr: Utf8PathBuf::from("C:\\working\\suites\\my_suite\\123\\0.stderr"),
pid: Utf8PathBuf::from("C:\\working\\suites\\my_suite\\123\\0.pid"),
exit_code: Utf8PathBuf::from("C:\\working\\suites\\my_suite\\123\\0.exit_code"),
}
)
Expand All @@ -261,6 +269,7 @@ mod tests {
"@echo off
powershell.exe (Get-WmiObject Win32_Process -Filter ProcessId=$PID).ParentProcessId \
> C:\\working\\suites\\my_suite\\123\\0.pid
if not exist C:\\working\\suites\\my_suite\\123\\0.run_flag exit /b 1
\"C:\\\\somewhere\\\\rcc.exe\" \"mandatory\" \"--some-flag\" \"--some-option\" \"some-value\" \
> C:\\working\\suites\\my_suite\\123\\0.stdout 2> C:\\working\\suites\\my_suite\\123\\0.stderr
echo %errorlevel% > C:\\working\\suites\\my_suite\\123\\0.exit_code"
Expand Down

0 comments on commit 802f812

Please sign in to comment.