Skip to content

Commit

Permalink
Working input
Browse files Browse the repository at this point in the history
  • Loading branch information
jrm5100 committed Sep 6, 2024
1 parent c2d12ff commit 25220a1
Show file tree
Hide file tree
Showing 5 changed files with 153 additions and 26 deletions.
36 changes: 36 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ random_word = { version = "0.4.3", features = ["en"] }
reqwest = "0.12.7"
serde = { version = "1.0.209", features = ["derive"] }
serde_json = "1.0.128"
tar = "0.4.41"
tempfile = "3.12.0"
tes-client = { path = "./tes-client", version = "0.1.0" }
tokio = { version = "1.40.0", features = ["full", "time"] }
Expand Down
42 changes: 34 additions & 8 deletions examples/docker.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
//! An example for runner a task using the Docker backend service.
use crankshaft::engine::task::input;
use crankshaft::engine::task::Execution;
use crankshaft::engine::task::Input;
use crankshaft::engine::Engine;
use crankshaft::engine::Task;
use std::io::Write;
use tempfile::NamedTempFile;
use tracing_subscriber::fmt;
use tracing_subscriber::layer::SubscriberExt as _;
use tracing_subscriber::util::SubscriberInitExt as _;
Expand All @@ -17,18 +20,41 @@ async fn main() {

let mut engine = Engine::default();

let mut temp_file = NamedTempFile::new().unwrap();
writeln!(temp_file, "Hello, world from an input").unwrap();

// Get the path to the temp file
let temp_path = temp_file.path().to_path_buf();
let input = Input::builder()
.contents(temp_path)
.path("/volA/test_input.txt")
.r#type(input::Type::File)
.try_build()
.unwrap();

let task = Task::builder()
.name("my-example-task")
.unwrap()
.description("a longer description")
.unwrap()
.extend_executors(vec![Execution::builder()
.image("ubuntu")
.args(&[String::from("echo"), String::from("'hello, world!'")])
.stdout("stdout.txt")
.stderr("stderr.txt")
.try_build()
.unwrap()])
.extend_inputs(vec![input])
.unwrap()
.extend_executors(vec![
Execution::builder()
.image("ubuntu")
.args(&[
String::from("bash"),
String::from("-c"),
String::from("ls /volA"),
])
.try_build()
.unwrap(),
Execution::builder()
.image("ubuntu")
.args(&[String::from("cat"), String::from("/volA/test_input.txt")])
.try_build()
.unwrap(),
])
.unwrap()
.extend_volumes(vec!["/volA".to_string(), "/volB".to_string()])
.unwrap()
Expand Down
66 changes: 48 additions & 18 deletions src/engine/service/runner/backend/docker.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! A docker runner service.
use std::io::Cursor;
use std::str::FromStr;
use std::sync::Arc;

Expand All @@ -9,6 +10,7 @@ use bollard::container::CreateContainerOptions;
use bollard::container::LogOutput;
use bollard::container::RemoveContainerOptions;
use bollard::container::StartContainerOptions;
use bollard::container::UploadToContainerOptions;
use bollard::errors::Error;
use bollard::exec::CreateExecOptions;
use bollard::exec::StartExecResults;
Expand All @@ -20,14 +22,14 @@ use futures::FutureExt;
use futures::TryStreamExt;
use nonempty::NonEmpty;
use random_word::Lang;
use tempfile::TempDir;
use tmp_mount::TmpMount;
use tokio::sync::oneshot::Sender;

use crate::engine::service::runner::backend::Backend;
use crate::engine::service::runner::backend::ExecutionResult;
use crate::engine::service::runner::backend::Reply;
use crate::engine::task::Execution;
use crate::engine::task::Input;
use crate::engine::task::Resources;
use crate::engine::Task;

Expand Down Expand Up @@ -83,24 +85,20 @@ impl Backend for Runner {
for execution in task.executions() {
let name = random_name();

// Create a temporary working directory
let temp_dir = TempDir::new().unwrap();
let temp_path = temp_dir.path().to_str().unwrap();

// Create the container
container_create(
&name,
execution,
task.resources(),
&mut client,
temp_path,
&mounts[..],
)
.await;
container_create(&name, execution, task.resources(), &mut client, &mounts[..])
.await;

// Start the container
container_start(&name, &mut client).await;

// Insert inputs
if let Some(inputs) = task.inputs() {
for input in inputs {
insert_input(&name, &mut client, input).await;
}
};

// Run a command
let exec_result = container_exec(&name, execution, &mut client).await;

Expand Down Expand Up @@ -153,13 +151,10 @@ async fn container_create(
execution: &Execution,
resources: Option<&Resources>,
client: &mut Arc<Docker>,
temp_workdir: &str,
mounts: &[Mount],
) {
// Configure Docker to use all mounts
let container_workdir: &str = execution.workdir().map(String::as_str).unwrap_or(WORKDIR);
let host_config = HostConfig {
binds: Some(vec![format!("{}:{}", temp_workdir, container_workdir)]),
mounts: Some(mounts.to_vec()),
..resources.map(HostConfig::from).unwrap_or_default()
};
Expand All @@ -173,7 +168,7 @@ async fn container_create(
image: Some(execution.image()),
tty: Some(true),
host_config: Some(host_config),
working_dir: execution.workdir().map(String::as_str).or(Some(WORKDIR)),
working_dir: execution.workdir().map(String::as_str),
..Default::default()
};

Expand All @@ -188,6 +183,41 @@ async fn container_start(name: &str, client: &mut Arc<Docker>) {
.unwrap();
}

/// Puts input files into the container
async fn insert_input(name: &str, client: &mut Arc<Docker>, input: &Input) {
let mut tar = tar::Builder::new(Vec::new());

let content = input.fetch().await.unwrap();

let tar_path = input.path().trim_start_matches('/');

// Create a header with the full path
let mut header = tar::Header::new_gnu();
header.set_path(tar_path).unwrap();
header.set_size(content.len() as u64);
header.set_mode(0o644); // Set appropriate permissions
header.set_cksum();

// Append the file to the tar archive
tar.append_data(&mut header, tar_path, Cursor::new(content))
.unwrap();

let tar_contents = tar.into_inner().unwrap();

// Upload to the root of the container
client
.upload_to_container(
name,
Some(UploadToContainerOptions {
path: "/",
..Default::default()
}),
tar_contents.into(),
)
.await
.unwrap();
}

/// Execute a command in container, returning an ExecutionResult
async fn container_exec(
name: &str,
Expand Down
34 changes: 34 additions & 0 deletions src/engine/task/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@
mod builder;

use std::path::PathBuf;

pub use builder::Builder;

use tokio::{fs::File, io::AsyncReadExt};
use url::Url;

/// A type of input.
Expand All @@ -26,6 +29,13 @@ pub enum Contents {
Literal(String),
}

impl From<PathBuf> for Contents {
fn from(value: PathBuf) -> Self {
let url = Url::from_file_path(value).unwrap_or_else(|_| panic!("Invalid path"));
Contents::URL(url)
}
}

/// An input to a task.
#[derive(Clone, Debug)]
pub struct Input {
Expand All @@ -46,6 +56,11 @@ pub struct Input {
}

impl Input {
/// Gets a new builder for an [`Input`].
pub fn builder() -> Builder {
Builder::default()
}

/// The name of the input (if it exists).
pub fn name(&self) -> Option<&str> {
self.name.as_deref()
Expand All @@ -70,4 +85,23 @@ impl Input {
pub fn r#type(&self) -> &Type {
&self.r#type
}

/// Fetch file contents
pub async fn fetch(&self) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
match &self.contents {
Contents::Literal(content) => Ok(content.as_bytes().to_vec()),
Contents::URL(url) => match url.scheme() {
"file" => {
let path = url.to_file_path().map_err(|_| "Invalid file path")?;
let mut file = File::open(path).await?;
let mut contents = Vec::new();
file.read_to_end(&mut contents).await?;
Ok(contents)
}
"http" | "https" => todo!("HTTP(S) URL support not implemented"),
"s3" => todo!("S3 URL support not implemented"),
_ => Err("Unsupported URL scheme".into()),
},
}
}
}

0 comments on commit 25220a1

Please sign in to comment.