Skip to content

Commit

Permalink
Improve task termination
Browse files Browse the repository at this point in the history
In addition to attempting to kill the process via its PID, we create a
flag file right before starting the task and delete it when the task is
to be terminated. Before starting the actual test run, the task checks if
the flag file still exists. If not, the task terminates.

This provides us with an additional mechanism for shutting down the task
in its early stage when the PID might not yet have been written to file.
  • Loading branch information
jherbel committed Nov 10, 2023
1 parent 2649b40 commit 0fcfc4e
Showing 1 changed file with 41 additions and 32 deletions.
73 changes: 41 additions & 32 deletions v2/robotmk/src/bin/scheduler/sessions/schtasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,10 @@ use async_std::{future::timeout, task::sleep as async_sleep};
use camino::{Utf8Path, Utf8PathBuf};
use chrono::{Duration as ChronoDuration, Local};
use futures::executor;
use log::{debug, error};
use std::fs::{read_to_string, write};
use log::{debug, error, warn};
use std::fs::{read_to_string, remove_file, write};
use std::process::Command;
use std::str::FromStr;
use std::thread::sleep;
use std::time::Duration;
use sysinfo::Pid;

Expand All @@ -26,22 +25,19 @@ pub fn run_task(task_spec: &TaskSpec) -> Result<RunOutcome> {
let paths = Paths::from(task_spec.base_path);
create_task(task_spec, &paths)
.context(format!("Failed to create task {}", task_spec.task_name))?;

debug!("Starting task {}", task_spec.task_name);
run_schtasks(["/run", "/tn", task_spec.task_name])
.context(format!("Failed to start task {}", task_spec.task_name))?;
start_task(task_spec.task_name, &paths.run_flag)?;

if let Some(run_outcome) = match executor::block_on(timeout(
Duration::from_secs(task_spec.timeout),
wait_for_task_exit(task_spec.task_name, task_spec.termination_flag, &paths.pid),
wait_for_task_exit(task_spec.task_name, task_spec.termination_flag, &paths),
)) {
Ok(task_wait_result) => task_wait_result.map_err(|err| {
kill_and_delete_task(task_spec.task_name, &paths.pid);
kill_and_delete_task(task_spec.task_name, &paths);
err
})?,
_ => {
error!("Timed out");
kill_and_delete_task(task_spec.task_name, &paths.pid);
kill_and_delete_task(task_spec.task_name, &paths);
return Ok(RunOutcome::TimedOut);
}
} {
Expand Down Expand Up @@ -71,9 +67,10 @@ pub struct TaskSpec<'a> {
#[cfg_attr(test, derive(Debug, PartialEq))]
struct Paths {
script: Utf8PathBuf,
run_flag: Utf8PathBuf,
pid: Utf8PathBuf,
stdout: Utf8PathBuf,
stderr: Utf8PathBuf,
pid: Utf8PathBuf,
exit_code: Utf8PathBuf,
}

Expand All @@ -87,6 +84,7 @@ impl From<&Utf8Path> for Paths {
stderr: Utf8PathBuf::from(format!("{base_path}.stderr")),
pid: Utf8PathBuf::from(format!("{base_path}.pid")),
exit_code: Utf8PathBuf::from(format!("{base_path}.exit_code")),
run_flag: Utf8PathBuf::from(format!("{base_path}.run_flag")),
}
}
}
Expand Down Expand Up @@ -134,6 +132,10 @@ fn build_task_script(command_spec: &CommandSpec, paths: &Paths) -> String {
"powershell.exe (Get-WmiObject Win32_Process -Filter ProcessId=$PID).ParentProcessId > {}",
paths.pid
),
// `schtasks.exe /end ...` seems utterly useless. Hence, we employ this run flag to signal
// our task to terminate (in addition to killing the process if we were able to read the
// PID, which is not the case if the task has just started).
format!("if not exist {} exit /b 1", paths.run_flag),
format!("{command_spec} > {} 2> {}", paths.stdout, paths.stderr),
format!("echo %errorlevel% > {}", paths.exit_code),
]
Expand Down Expand Up @@ -162,17 +164,25 @@ where
))
}

/// Creates the run flag file and then triggers the scheduled task `task_name`.
///
/// The flag file is written first because the generated task script exits early
/// (`if not exist <run_flag> exit /b 1`) when the file is missing.
///
/// # Errors
///
/// Returns an error if the run flag file cannot be written or if `schtasks /run` fails.
fn start_task(task_name: &str, path_run_flag: &Utf8Path) -> Result<()> {
    debug!("Starting task {task_name}");

    let flag_creation = write(path_run_flag, "");
    flag_creation.context(format!("Failed to create run flag file {path_run_flag}"))?;

    // Only success or failure of the invocation matters here; its output is discarded.
    run_schtasks(["/run", "/tn", task_name])
        .map(|_| ())
        .context(format!("Failed to start task {task_name}"))
}

async fn wait_for_task_exit(
task_name: &str,
termination_flag: &TerminationFlag,
path_pid: &Utf8Path,
paths: &Paths,
) -> Result<Option<RunOutcome>> {
debug!("Waiting for task {} to complete", task_name);
while query_if_task_is_running(task_name)
.context(format!("Failed to query if task {task_name} is running"))?
{
if termination_flag.should_terminate() {
kill_and_delete_task(task_name, path_pid);
kill_and_delete_task(task_name, paths);
return Ok(Some(RunOutcome::Terminated));
}
async_sleep(Duration::from_millis(250)).await
Expand All @@ -185,15 +195,25 @@ fn query_if_task_is_running(task_name: &str) -> Result<bool> {
Ok(schtasks_stdout.contains("Running"))
}

fn kill_and_delete_task(task_name: &str, path_pid: &Utf8Path) {
// schtasks.exe /end ... terminates the batch script, but child processes will survive ...
fn kill_and_delete_task(task_name: &str, paths: &Paths) {
error!("Killing and deleting task {task_name}");
let _ = kill_task(path_pid).map_err(log_and_return_error);
kill_task(paths);
delete_task(task_name);
}

fn kill_task(path_pid: &Utf8Path) -> Result<()> {
let raw_pid = read_pid(path_pid)?;
fn kill_task(paths: &Paths) {
let _ = remove_file(&paths.run_flag)
.context(format!("Failed to remove {}", paths.run_flag))
.map_err(log_and_return_error);
let _ = kill_task_via_pid(&paths.pid).map_err(|error| {
warn!("{:?}", error);
error
});
}

fn kill_task_via_pid(path_pid: &Utf8Path) -> Result<()> {
let raw_pid = read_until_first_whitespace(path_pid)
.context(format!("Failed to read PID from {path_pid}"))?;
kill_process_tree(
&Pid::from_str(&raw_pid).context(format!("Failed to parse {} as PID", raw_pid))?,
);
Expand All @@ -216,19 +236,6 @@ fn read_until_first_whitespace(path: &Utf8Path) -> Result<String> {
.to_string())
}

/// Reads the PID stored in the file at `path`, retrying once after one second.
///
/// If the first read fails, the failure is logged, the current thread sleeps for one
/// second, and the read is attempted a second time.
///
/// # Errors
///
/// Returns the error of the second read attempt (with context) if both attempts fail.
fn read_pid(path: &Utf8Path) -> Result<String> {
    read_until_first_whitespace(path).or_else(|first_failure| {
        let _ = log_and_return_error(first_failure.context(format!(
            "Failed to read PID from {path}, will sleep 1s and try one more time"
        )));
        sleep(Duration::from_secs(1));
        read_until_first_whitespace(path).context(format!("Failed to read PID from {path}"))
    })
}

#[cfg(test)]
mod tests {
use super::*;
Expand All @@ -241,9 +248,10 @@ mod tests {
Paths::from(Utf8PathBuf::from("C:\\working\\suites\\my_suite\\123\\0").as_ref()),
Paths {
script: Utf8PathBuf::from("C:\\working\\suites\\my_suite\\123\\0.bat"),
run_flag: Utf8PathBuf::from("C:\\working\\suites\\my_suite\\123\\0.run_flag"),
pid: Utf8PathBuf::from("C:\\working\\suites\\my_suite\\123\\0.pid"),
stdout: Utf8PathBuf::from("C:\\working\\suites\\my_suite\\123\\0.stdout"),
stderr: Utf8PathBuf::from("C:\\working\\suites\\my_suite\\123\\0.stderr"),
pid: Utf8PathBuf::from("C:\\working\\suites\\my_suite\\123\\0.pid"),
exit_code: Utf8PathBuf::from("C:\\working\\suites\\my_suite\\123\\0.exit_code"),
}
)
Expand All @@ -265,6 +273,7 @@ mod tests {
"@echo off
powershell.exe (Get-WmiObject Win32_Process -Filter ProcessId=$PID).ParentProcessId \
> C:\\working\\suites\\my_suite\\123\\0.pid
if not exist C:\\working\\suites\\my_suite\\123\\0.run_flag exit /b 1
\"C:\\\\somewhere\\\\rcc.exe\" \"mandatory\" \"--some-flag\" \"--some-option\" \"some-value\" \
> C:\\working\\suites\\my_suite\\123\\0.stdout 2> C:\\working\\suites\\my_suite\\123\\0.stderr
echo %errorlevel% > C:\\working\\suites\\my_suite\\123\\0.exit_code"
Expand Down

0 comments on commit 0fcfc4e

Please sign in to comment.