From 147e52f7b9e2c596c6c37c60c46c601ed9b3f030 Mon Sep 17 00:00:00 2001 From: John McGuigan Date: Wed, 4 Sep 2024 22:58:40 -0400 Subject: [PATCH] feat: Write stderr and stdout from Docker executor (#1) * Write stderr and stdout from Docker executor * Try to get an error code returned and clean up container --- examples/docker.rs | 5 +- src/engine/service/runner/backend/docker.rs | 125 ++++++++++++++++---- 2 files changed, 103 insertions(+), 27 deletions(-) diff --git a/examples/docker.rs b/examples/docker.rs index 3f4529b..e649014 100644 --- a/examples/docker.rs +++ b/examples/docker.rs @@ -16,11 +16,14 @@ async fn main() { .extend_executors(vec![Execution::builder() .image("ubuntu") .args(&[String::from("echo"), String::from("'hello, world!'")]) + .stdout("stdout.txt") + .stderr("stderr.txt") .try_build() .unwrap()]) .unwrap() .try_build() .unwrap(); - docker.submit(task).await; + let result = docker.submit(task).await.unwrap(); + println!("Exit code: {:?}", &result) } diff --git a/src/engine/service/runner/backend/docker.rs b/src/engine/service/runner/backend/docker.rs index 31632b0..5a7be01 100644 --- a/src/engine/service/runner/backend/docker.rs +++ b/src/engine/service/runner/backend/docker.rs @@ -1,11 +1,22 @@ //! A docker runner service. +use crate::engine::task::Execution; use crate::engine::Task; +use std::fs::File; +use std::io::Write; +use std::path::Path; + use bollard::container::Config; use bollard::container::CreateContainerOptions; +use bollard::container::LogsOptions; +use bollard::container::LogOutput; use bollard::container::StartContainerOptions; +use bollard::container::WaitContainerOptions; use bollard::errors::Error; +use futures::select; +use futures::StreamExt; +use futures::FutureExt; use random_word::Lang; /// The default runner service name. @@ -25,33 +36,95 @@ impl Docker { Ok(Self { docker }) } - /// Submits a task. - pub async fn submit(&mut self, task: Task) { + /// Submit all tasks + pub async fn submit(&mut self, task: Task) -> Result, Error> { + let mut results = Vec::new(); + for executor in task.executions() { - let name = (1..=3) - .map(|_| random_word::r#gen(Lang::En)) - .collect::>() - .join("-"); - - let options = Some(CreateContainerOptions { - name: name.clone(), - ..Default::default() - }); - - let config = Config { - image: Some(executor.image()), - cmd: Some(executor.args().into_iter().map(|s| s.as_str()).collect()), - ..Default::default() - }; - - let job = self.docker.create_container(options, config).await.unwrap(); - - println!("{job:?}"); - - self.docker - .start_container(&name, None::>) - .await - .unwrap(); + let result = self.submit_task(executor).await?; + results.push(result); + } + + Ok(results) + } + + /// Submit a single task + async fn submit_task(&self, executor: &Execution) -> Result { + let name = (1..=3) + .map(|_| random_word::r#gen(Lang::En)) + .collect::>() + .join("-"); + + let create_options = Some(CreateContainerOptions { + name: name.clone(), + ..Default::default() + }); + + let config = Config { + image: Some(executor.image()), + cmd: Some(executor.args().into_iter().map(|s| s.as_str()).collect()), + ..Default::default() + }; + + // Create docker container + let job = self.docker.create_container(create_options, config).await?; + println!("{job:?}"); + + // Start docker container + self.docker.start_container(&name, None::>).await?; + + // Setup logs + let stdout_path = executor.stdout().map(Path::new); + let stderr_path = executor.stderr().map(Path::new); + + let log_options = LogsOptions::{ + follow: true, + stdout: executor.stdout().is_some(), + stderr: executor.stderr().is_some(), + ..Default::default() + }; + + let mut stdout_file = stdout_path.map(|path| File::create(path).expect("Failed to create stdout file")); + let mut stderr_file = stderr_path.map(|path| File::create(path).expect("Failed to create stderr file")); + + let mut logs_stream = self.docker.logs(&name, Some(log_options)); + let mut wait_stream = self.docker.wait_container(&name, None::>); + + let mut exit_code = None; + + // Loop through processing the stream and any final return from the container + loop { + select! { + log_result = logs_stream.next().fuse() => match log_result { + Some(Ok(LogOutput::StdOut {message})) => { + if let Some(file) = &mut stdout_file { + file.write_all(&message).expect("Failed to write to stdout file"); + } + } + Some(Ok(LogOutput::StdErr {message})) => { + if let Some(file) = &mut stderr_file { + file.write_all(&message).expect("Failed to write to stdout file"); + } + } + Some(Ok(_)) => {} + Some(Err(e)) => eprintln!("Error reading log: {:?}", e), + None => break // stream ended + }, + wait_result = wait_stream.next().fuse() => match wait_result { + Some(Ok(wait_response)) => { + exit_code = Some(wait_response.status_code); + break; + } + Some(Err(e)) => return Err(e), + None => break, // This should not happen under normal circumstances + } + + } } + + // Cleanup + self.docker.remove_container(&name, None).await?; + + Ok(exit_code.unwrap_or(-1)) } }