Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve task termination #434

Merged
merged 1 commit into from
Nov 14, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 40 additions & 31 deletions v2/robotmk/src/bin/scheduler/sessions/schtasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@ use robotmk::termination::{waited, Outcome, TerminationFlag};
use anyhow::{bail, Context, Result};
use camino::{Utf8Path, Utf8PathBuf};
use chrono::{Duration as ChronoDuration, Local};
use log::{debug, error};
use std::fs::{read_to_string, write};
use log::{debug, error, warn};
use std::fs::{read_to_string, remove_file, write};
use std::process::Command;
use std::str::FromStr;
use std::thread::sleep;
use std::time::Duration;
use sysinfo::Pid;
use tokio::task::yield_now;
Expand All @@ -21,16 +20,16 @@ fn wait_for_task_exit(task: &TaskSpec, paths: &Paths) -> Result<RunOutcome> {
let queried = query(task.task_name, &paths.exit_code);
match waited(duration, task.termination_flag, queried) {
Outcome::Cancel => {
kill_and_delete_task(task.task_name, &paths.pid);
kill_and_delete_task(task.task_name, paths);
Ok(RunOutcome::TimedOut)
}
Outcome::Timeout => {
error!("Timeout");
kill_and_delete_task(task.task_name, &paths.pid);
kill_and_delete_task(task.task_name, paths);
Ok(RunOutcome::Terminated)
}
Outcome::Completed(Err(e)) => {
kill_and_delete_task(task.task_name, &paths.pid);
kill_and_delete_task(task.task_name, paths);
Err(e)
}
Outcome::Completed(Ok(code)) => {
Expand All @@ -50,10 +49,7 @@ pub fn run_task(task_spec: &TaskSpec) -> Result<RunOutcome> {
let paths = Paths::from(task_spec.base_path);
create_task(task_spec, &paths)
.context(format!("Failed to create task {}", task_spec.task_name))?;

debug!("Starting task {}", task_spec.task_name);
run_schtasks(["/run", "/tn", task_spec.task_name])
.context(format!("Failed to start task {}", task_spec.task_name))?;
start_task(task_spec.task_name, &paths.run_flag)?;

wait_for_task_exit(task_spec, &paths)
}
Expand All @@ -70,9 +66,10 @@ pub struct TaskSpec<'a> {
#[cfg_attr(test, derive(Debug, PartialEq))]
struct Paths {
script: Utf8PathBuf,
run_flag: Utf8PathBuf,
pid: Utf8PathBuf,
stdout: Utf8PathBuf,
stderr: Utf8PathBuf,
pid: Utf8PathBuf,
exit_code: Utf8PathBuf,
}

Expand All @@ -82,9 +79,10 @@ impl From<&Utf8Path> for Paths {
// .bat is important here, otherwise, the Windows task scheduler won't know how to
// execute this file.
script: Utf8PathBuf::from(format!("{base_path}.bat")),
run_flag: Utf8PathBuf::from(format!("{base_path}.run_flag")),
pid: Utf8PathBuf::from(format!("{base_path}.pid")),
stdout: Utf8PathBuf::from(format!("{base_path}.stdout")),
stderr: Utf8PathBuf::from(format!("{base_path}.stderr")),
pid: Utf8PathBuf::from(format!("{base_path}.pid")),
exit_code: Utf8PathBuf::from(format!("{base_path}.exit_code")),
}
}
Expand Down Expand Up @@ -133,6 +131,10 @@ fn build_task_script(command_spec: &CommandSpec, paths: &Paths) -> String {
"powershell.exe (Get-WmiObject Win32_Process -Filter ProcessId=$PID).ParentProcessId > {}",
paths.pid
),
// `schtasks.exe /end ...` seems utterly useless. Hence, we employ this run flag to signal
// our task to terminate (in addition to killing the process if we were able to read the
// PID, which is not the case if the task has just started).
format!("if not exist {} exit /b 1", paths.run_flag),
format!("{command_spec} > {} 2> {}", paths.stdout, paths.stderr),
format!("echo %errorlevel% > {}", paths.exit_code),
]
Expand Down Expand Up @@ -161,6 +163,14 @@ where
))
}

fn start_task(task_name: &str, path_run_flag: &Utf8Path) -> Result<()> {
debug!("Starting task {task_name}");
write(path_run_flag, "").context(format!("Failed to create run flag file {path_run_flag}"))?;
jherbel marked this conversation as resolved.
Show resolved Hide resolved
run_schtasks(["/run", "/tn", task_name])
.context(format!("Failed to start task {task_name}"))?;
Ok(())
}

async fn query(task_name: &str, exit_path: &Utf8Path) -> Result<i32> {
debug!("Waiting for task {} to complete", task_name);
while query_if_task_is_running(task_name)
Expand All @@ -181,15 +191,25 @@ fn query_if_task_is_running(task_name: &str) -> Result<bool> {
Ok(schtasks_stdout.contains("Running"))
}

fn kill_and_delete_task(task_name: &str, path_pid: &Utf8Path) {
// schtasks.exe /end ... terminates the batch script, but child processes will survive ...
fn kill_and_delete_task(task_name: &str, paths: &Paths) {
error!("Killing and deleting task {task_name}");
let _ = kill_task(path_pid).map_err(log_and_return_error);
kill_task(paths);
delete_task(task_name);
}

fn kill_task(path_pid: &Utf8Path) -> Result<()> {
let raw_pid = read_pid(path_pid)?;
fn kill_task(paths: &Paths) {
let _ = remove_file(&paths.run_flag)
.context(format!("Failed to remove {}", paths.run_flag))
.map_err(log_and_return_error);
let _ = kill_task_via_pid(&paths.pid).map_err(|error| {
warn!("{:?}", error);
error
});
}

fn kill_task_via_pid(path_pid: &Utf8Path) -> Result<()> {
let raw_pid = read_until_first_whitespace(path_pid)
.context(format!("Failed to read PID from {path_pid}"))?;
kill_process_tree(
&Pid::from_str(&raw_pid).context(format!("Failed to parse {} as PID", raw_pid))?,
);
Expand All @@ -212,19 +232,6 @@ fn read_until_first_whitespace(path: &Utf8Path) -> Result<String> {
.to_string())
}

/// Reads the PID string from `path`, retrying once after a one-second pause.
///
/// If the first read fails, the error is logged via `log_and_return_error` (with a note that
/// a retry follows), the current thread sleeps for one second, and the read is attempted a
/// second time.
///
/// # Errors
///
/// Returns the error of the second attempt, with added context, if both reads fail.
fn read_pid(path: &Utf8Path) -> Result<String> {
    let first_failure = match read_until_first_whitespace(path) {
        Ok(pid) => return Ok(pid),
        Err(failure) => failure,
    };
    // Only the logging side effect is wanted here; the returned error is dropped.
    log_and_return_error(first_failure.context(format!(
        "Failed to read PID from {path}, will sleep 1s and try one more time"
    )));

    sleep(Duration::from_secs(1));
    let second_attempt = read_until_first_whitespace(path);
    second_attempt.context(format!("Failed to read PID from {path}"))
}

#[cfg(test)]
mod tests {
use super::*;
Expand All @@ -237,9 +244,10 @@ mod tests {
Paths::from(Utf8PathBuf::from("C:\\working\\suites\\my_suite\\123\\0").as_ref()),
Paths {
script: Utf8PathBuf::from("C:\\working\\suites\\my_suite\\123\\0.bat"),
run_flag: Utf8PathBuf::from("C:\\working\\suites\\my_suite\\123\\0.run_flag"),
pid: Utf8PathBuf::from("C:\\working\\suites\\my_suite\\123\\0.pid"),
stdout: Utf8PathBuf::from("C:\\working\\suites\\my_suite\\123\\0.stdout"),
stderr: Utf8PathBuf::from("C:\\working\\suites\\my_suite\\123\\0.stderr"),
pid: Utf8PathBuf::from("C:\\working\\suites\\my_suite\\123\\0.pid"),
exit_code: Utf8PathBuf::from("C:\\working\\suites\\my_suite\\123\\0.exit_code"),
}
)
Expand All @@ -261,6 +269,7 @@ mod tests {
"@echo off
powershell.exe (Get-WmiObject Win32_Process -Filter ProcessId=$PID).ParentProcessId \
> C:\\working\\suites\\my_suite\\123\\0.pid
if not exist C:\\working\\suites\\my_suite\\123\\0.run_flag exit /b 1
\"C:\\\\somewhere\\\\rcc.exe\" \"mandatory\" \"--some-flag\" \"--some-option\" \"some-value\" \
> C:\\working\\suites\\my_suite\\123\\0.stdout 2> C:\\working\\suites\\my_suite\\123\\0.stderr
echo %errorlevel% > C:\\working\\suites\\my_suite\\123\\0.exit_code"
Expand Down
Loading