Skip to content

Commit

Permalink
Rust scheduler: supervised child process execution
Browse files Browse the repository at this point in the history
CMK-1456
  • Loading branch information
jherbel committed Sep 22, 2023
1 parent 0775221 commit a2aeb89
Show file tree
Hide file tree
Showing 5 changed files with 258 additions and 0 deletions.
121 changes: 121 additions & 0 deletions v2/rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions v2/rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ log = "*"
quick-xml = { version = "0.30.0", features = ["serialize"] }
serde = { version = "1.0.188", features = ["derive"] }
serde_json = "*"
sysinfo = "*"
100 changes: 100 additions & 0 deletions v2/rust/src/child_process_supervisor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
use super::termination::TerminationFlag;
use super::timeout::Timeout;
use anyhow::{bail, Context, Result};
use log::{error, warn};
use std::collections::{HashMap, HashSet};
use std::process::Command;
use std::process::ExitStatus;
use std::thread::sleep;
use std::time::Duration;
use sysinfo::{Pid, PidExt, Process, ProcessExt, System, SystemExt};

pub struct ChildProcessSupervisor<'a> {
pub command: Command,
pub timeout: u64,
pub termination_flag: &'a TerminationFlag,
}

impl ChildProcessSupervisor<'_> {
pub fn run(mut self) -> Result<ChildProcessOutcome> {
let mut child = self.command.spawn().context("Failed to spawn subprocess")?;
let timeout = Timeout::start(self.timeout);

loop {
if let Some(exit_status) = child
.try_wait()
.context(format!(
"Failed to query exit status of process {}, killing",
child.id()
))
.map_err(|err| {
kill_process_tree(child.id());
err
})?
{
return Ok(ChildProcessOutcome::Exited(exit_status));
}

if timeout.expired() {
error!("Process timed out");
kill_process_tree(child.id());
return Ok(ChildProcessOutcome::TimedOut);
}

if self.termination_flag.should_terminate() {
warn!("Terminated");
kill_process_tree(child.id());
bail!("Terminated")
}
sleep(Duration::from_millis(250))
}
}
}

pub enum ChildProcessOutcome {
Exited(ExitStatus),
TimedOut,
}

fn kill_process_tree(raw_top_pid: u32) {
let top_pid = Pid::from_u32(raw_top_pid);
let mut system = System::new_all();
system.refresh_processes();
let processes = system.processes();

if let Some(process) = processes.get(&top_pid) {
process.kill();
}

kill_all_children(&top_pid, processes);
}

fn kill_all_children<'a>(top_pid: &'a Pid, processes: &'a HashMap<Pid, Process>) {
let mut pids_in_tree = HashSet::from([top_pid]);

loop {
let current_tree_size = pids_in_tree.len();
add_and_kill_direct_children(&mut pids_in_tree, processes);
if pids_in_tree.len() == current_tree_size {
break;
}
}
}

fn add_and_kill_direct_children<'a>(
pids_in_tree: &mut HashSet<&'a Pid>,
processes: &'a HashMap<Pid, Process>,
) {
for (pid, parent_pid, process) in processes.iter().filter_map(|(pid, process)| {
process
.parent()
.map(|parent_pid| (pid, parent_pid, process))
}) {
{
if pids_in_tree.contains(&parent_pid) {
pids_in_tree.insert(pid);
process.kill();
}
}
}
}
2 changes: 2 additions & 0 deletions v2/rust/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#![allow(dead_code)]
pub mod attempt;
mod child_process_supervisor;
mod cli;
mod config;
mod environment;
Expand All @@ -8,6 +9,7 @@ pub mod parse_xml;
mod results;
mod setup;
mod termination;
mod timeout;

use anyhow::{Context, Result};
use clap::Parser;
Expand Down
34 changes: 34 additions & 0 deletions v2/rust/src/timeout.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use std::time::Instant;

pub struct Timeout {
start_time: Instant,
timeout: u64,
}

impl Timeout {
pub fn start(timeout: u64) -> Self {
Self {
start_time: Instant::now(),
timeout,
}
}

pub fn expired(&self) -> bool {
self.start_time.elapsed().as_secs() >= self.timeout
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_timout_expired() {
assert!(Timeout::start(0).expired())
}

#[test]
fn test_timout_not_expired() {
assert!(!Timeout::start(10).expired())
}
}

0 comments on commit a2aeb89

Please sign in to comment.