Skip to content

Commit

Permalink
VI: remove async-std: wait_for_task_exit to tokio
Browse files Browse the repository at this point in the history
We want to remove async-std, since it is no longer maintained.
  • Loading branch information
SoloJacobs committed Nov 9, 2023
1 parent d45785a commit 3b1be16
Showing 1 changed file with 35 additions and 40 deletions.
75 changes: 35 additions & 40 deletions v2/robotmk/src/bin/scheduler/sessions/schtasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,11 @@ use super::session::RunOutcome;
use crate::command_spec::CommandSpec;
use crate::logging::log_and_return_error;
use crate::termination::kill_process_tree;
use robotmk::termination::TerminationFlag;
use robotmk::termination::{waited, Outcome, TerminationFlag};

use anyhow::{bail, Context, Result};
use async_std::{future::timeout, task::sleep as async_sleep};
use camino::{Utf8Path, Utf8PathBuf};
use chrono::{Duration as ChronoDuration, Local};
use futures::executor;
use log::{debug, error};
use std::fs::{read_to_string, write};
use std::process::Command;
Expand All @@ -17,6 +15,31 @@ use std::thread::sleep;
use std::time::Duration;
use sysinfo::Pid;

fn wait_for_task_exit(task: &TaskSpec, paths: &Paths) -> Result<RunOutcome> {
let duration = Duration::from_secs(task.timeout);
let queried = query(task.task_name, &paths.exit_code);
match waited(duration, task.termination_flag, queried) {
Outcome::Cancel => {
kill_and_delete_task(task.task_name, &paths.pid);
Ok(RunOutcome::TimedOut)
}
Outcome::Timeout => {
error!("Timeout");
kill_and_delete_task(task.task_name, &paths.pid);
Ok(RunOutcome::Terminated)
}
Outcome::Completed(Err(e)) => {
kill_and_delete_task(task.task_name, &paths.pid);
Err(e)
}
Outcome::Completed(Ok(code)) => {
debug!("Task {} completed", task.task_name);
delete_task(task.task_name);
Ok(RunOutcome::Exited(Some(code)))
}
}
}

pub fn run_task(task_spec: &TaskSpec) -> Result<RunOutcome> {
debug!(
"Running the following command as task {} for user {}:\n{}\n\nBase path: {}",
Expand All @@ -31,32 +54,7 @@ pub fn run_task(task_spec: &TaskSpec) -> Result<RunOutcome> {
run_schtasks(["/run", "/tn", task_spec.task_name])
.context(format!("Failed to start task {}", task_spec.task_name))?;

if let Some(run_outcome) = match executor::block_on(timeout(
Duration::from_secs(task_spec.timeout),
wait_for_task_exit(task_spec.task_name, task_spec.termination_flag, &paths.pid),
)) {
Ok(task_wait_result) => task_wait_result.map_err(|err| {
kill_and_delete_task(task_spec.task_name, &paths.pid);
err
})?,
_ => {
error!("Timed out");
kill_and_delete_task(task_spec.task_name, &paths.pid);
return Ok(RunOutcome::TimedOut);
}
} {
return Ok(run_outcome);
};
debug!("Task {} completed", task_spec.task_name);

delete_task(task_spec.task_name);

let raw_exit_code = read_until_first_whitespace(&paths.exit_code)?;
Ok(RunOutcome::Exited(Some(
raw_exit_code
.parse::<i32>()
.context(format!("Failed to parse {} as i32", raw_exit_code))?,
)))
wait_for_task_exit(task_spec, &paths)
}

pub struct TaskSpec<'a> {
Expand Down Expand Up @@ -162,22 +160,19 @@ where
))
}

async fn wait_for_task_exit(
task_name: &str,
termination_flag: &TerminationFlag,
path_pid: &Utf8Path,
) -> Result<Option<RunOutcome>> {
async fn query(task_name: &str, exit_path: &Utf8Path) -> Result<i32> {
debug!("Waiting for task {} to complete", task_name);
while query_if_task_is_running(task_name)
.context(format!("Failed to query if task {task_name} is running"))?
{
if termination_flag.should_terminate() {
kill_and_delete_task(task_name, path_pid);
return Ok(Some(RunOutcome::Terminated));
}
async_sleep(Duration::from_millis(250)).await
std::hint::spin_loop();
}
Ok(None)

let raw_exit_code = read_until_first_whitespace(exit_path)?;
let exit_code: i32 = raw_exit_code
.parse()
.context(format!("Failed to parse {} as i32", raw_exit_code))?;
Ok(exit_code)
}

fn query_if_task_is_running(task_name: &str) -> Result<bool> {
Expand Down

0 comments on commit 3b1be16

Please sign in to comment.