From 258f1351f8d9f256332edd4ac972474a66a27868 Mon Sep 17 00:00:00 2001 From: Solomon Jacobs Date: Thu, 9 Nov 2023 10:14:03 +0100 Subject: [PATCH] VI: remove async-std: `wait_for_task_exit` to tokio We want to remove async-std, since it is no longer maintained. --- .../src/bin/scheduler/sessions/schtasks.rs | 76 +++++++++---------- 1 file changed, 36 insertions(+), 40 deletions(-) diff --git a/v2/robotmk/src/bin/scheduler/sessions/schtasks.rs b/v2/robotmk/src/bin/scheduler/sessions/schtasks.rs index 637a47fc..028ff4af 100644 --- a/v2/robotmk/src/bin/scheduler/sessions/schtasks.rs +++ b/v2/robotmk/src/bin/scheduler/sessions/schtasks.rs @@ -2,13 +2,11 @@ use super::session::RunOutcome; use crate::command_spec::CommandSpec; use crate::logging::log_and_return_error; use crate::termination::kill_process_tree; -use robotmk::termination::TerminationFlag; +use robotmk::termination::{waited, Outcome, TerminationFlag}; use anyhow::{bail, Context, Result}; -use async_std::{future::timeout, task::sleep as async_sleep}; use camino::{Utf8Path, Utf8PathBuf}; use chrono::{Duration as ChronoDuration, Local}; -use futures::executor; use log::{debug, error}; use std::fs::{read_to_string, write}; use std::process::Command; @@ -16,6 +14,32 @@ use std::str::FromStr; use std::thread::sleep; use std::time::Duration; use sysinfo::Pid; +use tokio::task::yield_now; + +fn wait_for_task_exit(task: &TaskSpec, paths: &Paths) -> Result { + let duration = Duration::from_secs(task.timeout); + let queried = query(task.task_name, &paths.exit_code); + match waited(duration, task.termination_flag, queried) { + Outcome::Cancel => { + kill_and_delete_task(task.task_name, &paths.pid); + Ok(RunOutcome::TimedOut) + } + Outcome::Timeout => { + error!("Timeout"); + kill_and_delete_task(task.task_name, &paths.pid); + Ok(RunOutcome::Terminated) + } + Outcome::Completed(Err(e)) => { + kill_and_delete_task(task.task_name, &paths.pid); + Err(e) + } + Outcome::Completed(Ok(code)) => { + debug!("Task {} completed", task.task_name); + delete_task(task.task_name); + Ok(RunOutcome::Exited(Some(code))) + } + } +} pub fn run_task(task_spec: &TaskSpec) -> Result { debug!( @@ -31,32 +55,7 @@ pub fn run_task(task_spec: &TaskSpec) -> Result { run_schtasks(["/run", "/tn", task_spec.task_name]) .context(format!("Failed to start task {}", task_spec.task_name))?; - if let Some(run_outcome) = match executor::block_on(timeout( - Duration::from_secs(task_spec.timeout), - wait_for_task_exit(task_spec.task_name, task_spec.termination_flag, &paths.pid), - )) { - Ok(task_wait_result) => task_wait_result.map_err(|err| { - kill_and_delete_task(task_spec.task_name, &paths.pid); - err - })?, - _ => { - error!("Timed out"); - kill_and_delete_task(task_spec.task_name, &paths.pid); - return Ok(RunOutcome::TimedOut); - } - } { - return Ok(run_outcome); - }; - debug!("Task {} completed", task_spec.task_name); - - delete_task(task_spec.task_name); - - let raw_exit_code = read_until_first_whitespace(&paths.exit_code)?; - Ok(RunOutcome::Exited(Some( - raw_exit_code - .parse::() - .context(format!("Failed to parse {} as i32", raw_exit_code))?, - ))) + wait_for_task_exit(task_spec, &paths) } pub struct TaskSpec<'a> { @@ -162,22 +161,19 @@ where )) } -async fn wait_for_task_exit( - task_name: &str, - termination_flag: &TerminationFlag, - path_pid: &Utf8Path, -) -> Result> { +async fn query(task_name: &str, exit_path: &Utf8Path) -> Result { debug!("Waiting for task {} to complete", task_name); while query_if_task_is_running(task_name) .context(format!("Failed to query if task {task_name} is running"))? { - if termination_flag.should_terminate() { - kill_and_delete_task(task_name, path_pid); - return Ok(Some(RunOutcome::Terminated)); - } - async_sleep(Duration::from_millis(250)).await + yield_now().await } - Ok(None) + + let raw_exit_code = read_until_first_whitespace(exit_path)?; + let exit_code: i32 = raw_exit_code + .parse() + .context(format!("Failed to parse {} as i32", raw_exit_code))?; + Ok(exit_code) } fn query_if_task_is_running(task_name: &str) -> Result {