From dad6a9bafbf2666ec9b74093f993272b082dab5f Mon Sep 17 00:00:00 2001 From: Clay McLeod Date: Fri, 6 Sep 2024 10:21:47 -0500 Subject: [PATCH 01/16] revise: removes results from `task::Builder` --- crankshaft/examples/docker.rs | 6 +- crankshaft/examples/lsf.rs | 5 +- crankshaft/examples/multi.rs | 42 ++++++++++ crankshaft/examples/tes.rs | 6 +- .../engine/service/runner/backend/config.rs | 1 - crankshaft/src/engine/task/builder.rs | 79 +++++++++---------- 6 files changed, 84 insertions(+), 55 deletions(-) create mode 100644 crankshaft/examples/multi.rs diff --git a/crankshaft/examples/docker.rs b/crankshaft/examples/docker.rs index 03c4d52..6ed240f 100644 --- a/crankshaft/examples/docker.rs +++ b/crankshaft/examples/docker.rs @@ -19,19 +19,15 @@ async fn main() { let task = Task::builder() .name("my-example-task") - .unwrap() .description("a longer description") - .unwrap() - .extend_executors(vec![Execution::builder() + .extend_executions(vec![Execution::builder() .image("ubuntu") .args(&[String::from("echo"), String::from("'hello, world!'")]) .stdout("stdout.txt") .stderr("stderr.txt") .try_build() .unwrap()]) - .unwrap() .extend_volumes(vec!["/volA".to_string(), "/volB".to_string()]) - .unwrap() .try_build() .unwrap(); diff --git a/crankshaft/examples/lsf.rs b/crankshaft/examples/lsf.rs index 7888a0f..d0592cf 100644 --- a/crankshaft/examples/lsf.rs +++ b/crankshaft/examples/lsf.rs @@ -32,10 +32,8 @@ async fn main() { let task = Task::builder() .name("my-example-task") - .unwrap() .description("a longer description") - .unwrap() - .extend_executors(vec![Execution::builder() + .extend_executions(vec![Execution::builder() .working_directory(".") .image("ubuntu") .args(&[String::from("echo"), String::from("'hello, world!'")]) @@ -43,7 +41,6 @@ async fn main() { .stderr("stderr.txt") .try_build() .unwrap()]) - .unwrap() .try_build() .unwrap(); diff --git a/crankshaft/examples/multi.rs b/crankshaft/examples/multi.rs new file mode 100644 index 0000000..c8b339a --- /dev/null +++ b/crankshaft/examples/multi.rs @@ -0,0 +1,42 @@ +//! An example for runner a task using multiple backend services. + +use crankshaft::engine::task::Execution; +use crankshaft::engine::Engine; +use crankshaft::engine::Task; +use tracing_subscriber::fmt; +use tracing_subscriber::layer::SubscriberExt as _; +use tracing_subscriber::util::SubscriberInitExt as _; +use tracing_subscriber::EnvFilter; + +#[tokio::main] +async fn main() { + tracing_subscriber::registry() + .with(fmt::layer()) + .with(EnvFilter::from_default_env()) + .init(); + + let mut engine = Engine::with_default_tes(); + + let task = Task::builder() + .name("my-example-task") + .description("a longer description") + .extend_executions(vec![Execution::builder() + .image("ubuntu") + .args(&[String::from("echo"), String::from("'hello, world!'")]) + .stdout("stdout.txt") + .stderr("stderr.txt") + .try_build() + .unwrap()]) + .try_build() + .unwrap(); + + let receivers = (0..10) + .map(|_| engine.submit(task.clone()).callback) + .collect::>(); + + engine.run().await; + + for rx in receivers { + println!("Reply: {:?}", rx.await.unwrap()); + } +} diff --git a/crankshaft/examples/tes.rs b/crankshaft/examples/tes.rs index e5f01e9..e3d26f6 100644 --- a/crankshaft/examples/tes.rs +++ b/crankshaft/examples/tes.rs @@ -16,21 +16,17 @@ async fn main() { .init(); let mut engine = Engine::with_default_tes(); - dbg!(&engine); let task = Task::builder() .name("my-example-task") - .unwrap() .description("a longer description") - .unwrap() - .extend_executors(vec![Execution::builder() + .extend_executions(vec![Execution::builder() .image("ubuntu") .args(&[String::from("echo"), String::from("'hello, world!'")]) .stdout("stdout.txt") .stderr("stderr.txt") .try_build() .unwrap()]) - .unwrap() .try_build() .unwrap(); diff --git a/crankshaft/src/engine/service/runner/backend/config.rs b/crankshaft/src/engine/service/runner/backend/config.rs index eb7f742..4e02573 100644 --- a/crankshaft/src/engine/service/runner/backend/config.rs +++ b/crankshaft/src/engine/service/runner/backend/config.rs @@ -18,7 +18,6 @@ pub(crate) fn substitute_placeholders(s: &str, substitutions: &HashMap>(mut self, name: S) -> Result { - let name = name.into(); - - if self.name.is_some() { - return Err(Error::Multiple("name")); - } - - self.name = Some(name); - Ok(self) + /// Adds a name to the [`Builder`]. + /// + /// # Notes + /// + /// This will silently overwrite any previous name declarations provided to + /// the builder. + pub fn name>(mut self, name: S) -> Self { + self.name = Some(name.into()); + self } - /// Attempts to add a description to the [`Builder`]. - pub fn description>(mut self, description: S) -> Result { - let description = description.into(); - - if self.description.is_some() { - return Err(Error::Multiple("description")); - } - - self.description = Some(description); - Ok(self) + /// Adds a description to the [`Builder`]. + /// + /// # Notes + /// + /// This will silently overwrite any previous description declarations + /// provided to the builder. + pub fn description>(mut self, description: S) -> Self { + self.description = Some(description.into()); + self } - /// Attempts to extend inputs within the [`Builder`]. - pub fn extend_inputs(mut self, inputs: Iter) -> Result + /// Extends the set of inputs within the [`Builder`]. + pub fn extend_inputs(mut self, inputs: Iter) -> Self where Iter: IntoIterator, { @@ -109,11 +107,11 @@ impl Builder { } }; - Ok(self) + self } - /// Attempts to extend outputs within the [`Builder`]. - pub fn extend_outputs(mut self, outputs: Iter) -> Result + /// Extends the set of outputs within the [`Builder`]. + pub fn extend_outputs(mut self, outputs: Iter) -> Self where Iter: IntoIterator, { @@ -135,21 +133,22 @@ impl Builder { } }; - Ok(self) + self } - /// Attempts to add a resources to the [`Builder`]. - pub fn resources(mut self, resources: Resources) -> Result { - if self.resources.is_some() { - return Err(Error::Multiple("resources")); - } - - self.resources = Some(resources); - Ok(self) + /// Adds a set of requested resources to the [`Builder`]. + /// + /// # Notes + /// + /// This will silently overwrite any previous description declarations + /// provided to the builder. + pub fn resources>(mut self, resources: R) -> Self { + self.resources = Some(resources.into()); + self } - /// Attempts to extend executors within the [`Builder`]. - pub fn extend_executors(mut self, executors: Iter) -> Result + /// Extends the set of executions within the [`Builder`]. + pub fn extend_executions(mut self, executors: Iter) -> Self where Iter: IntoIterator, { @@ -171,11 +170,11 @@ impl Builder { } }; - Ok(self) + self } - /// Attempts to extend volumes within the [`Builder`]. - pub fn extend_volumes(mut self, volumes: Iter) -> Result + /// Extends the set of volumes within the [`Builder`]. + pub fn extend_volumes(mut self, volumes: Iter) -> Self where Iter: IntoIterator, { @@ -197,7 +196,7 @@ impl Builder { } }; - Ok(self) + self } /// Consumes `self` and attempts to return a built [`Task`]. From 6cfdc8b591357e09353b3c6c9141e5dcaffad554 Mon Sep 17 00:00:00 2001 From: Peter Huene Date: Fri, 6 Sep 2024 10:36:53 -0500 Subject: [PATCH 02/16] feat: create a `sprocket` CLI (a stand-in for an eventual `sprocket run` command) (#14) * added dependencies in cargo.toml * changes file names to match readme * added unwanted exe not to track * feat: move `main_wdl` into its own bin. (#13) * chore: add a root `Cargo.toml` for a workspace. * chore: cleanup `main` files. * chore: run `cargo fmt`. * chore: run `cargo clippy` and apply fixes. --------- Co-authored-by: Suchitra Chavan --- .gitignore | 3 ++ Cargo.toml | 3 ++ crankshaft/Cargo.toml | 7 ++- crankshaft/README.md | 92 ++++++++++++++++++++++++++++++++ crankshaft/src/bin/crankshaft.rs | 60 +++++++++++++++++++++ crankshaft/src/bin/sprocket.rs | 44 +++++++++++++++ crankshaft/src/example_task.json | 5 ++ crankshaft/src/example_task.yaml | 3 ++ crankshaft/src/my.wdl | 35 ++++++++++++ 9 files changed, 251 insertions(+), 1 deletion(-) create mode 100644 Cargo.toml create mode 100644 crankshaft/README.md create mode 100644 crankshaft/src/bin/crankshaft.rs create mode 100644 crankshaft/src/bin/sprocket.rs create mode 100644 crankshaft/src/example_task.json create mode 100644 crankshaft/src/example_task.yaml create mode 100644 crankshaft/src/my.wdl diff --git a/.gitignore b/.gitignore index 8a20901..6999fe8 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,6 @@ Cargo.lock stderr.txt stdout.txt +/target +rustup-init.exe +src/main_worked.rs diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..6c2bdf7 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,3 @@ +[workspace] +resolver = "2" +members = ["crankshaft", "tes"] diff --git a/crankshaft/Cargo.toml b/crankshaft/Cargo.toml index 7f30477..9cef9b3 100644 --- a/crankshaft/Cargo.toml +++ b/crankshaft/Cargo.toml @@ -22,12 +22,17 @@ serde = { version = "1.0.209", features = ["derive"] } serde_json = "1.0.128" tempfile = "3.12.0" tes = { path = "../tes", version = "0.1.0" } +serde_yaml = "0.8" # Optional, if you want YAML support tokio = { version = "1.40.0", features = ["full", "time"] } toml = "0.8.19" tracing = "0.1.40" tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } url = "2.5.2" - +wdl-grammar = "0.7.0" +wdl-ast = "0.6.0" +wdl-analysis = "0.2.0" +anyhow = "1.0.86" + [lints.rust] missing_docs = "warn" nonstandard-style = "warn" diff --git a/crankshaft/README.md b/crankshaft/README.md new file mode 100644 index 0000000..3957eb9 --- /dev/null +++ b/crankshaft/README.md @@ -0,0 +1,92 @@ +# Crankshaft - A Simple Task Runner CLI + +Crankshaft is a command-line interface (CLI) tool designed to execute tasks defined in JSON or YAML files. It allows users to automate commands via the system shell, making it easy to run predefined scripts or repetitive tasks. + +## Features + +- Supports task definitions in both JSON and YAML formats. +- Executes commands directly through the system's shell. +- Compatible with Windows and Unix-like systems( Linux, macOS). + +## Requirements + +- Rust installed on your system. Install Rust from [rust-lang.org](https://www.rust-lang.org/). +- A compatible shell available. + +## Setup + +### 1. Clone the Repository + +``` +git clone +cd KIDS24-team15 +``` +### 2. Build the Project + +Compile the project using Cargo, Rust's package manager: + +``` +cargo build + +``` +### How to Use + +### 1. Create a Task File + +- Define your task in a JSON or YAML file. Each task file should include a name and a command to execute. + +- Example JSON file (example_task.json) + +{ + "name": "Sample Task", + "command": "echo Hello, json file input!" +} + +- Example YAML file (example_task.yaml) + +name: Sample Task +command: echo Hello, yaml file! + +### 2 Run a Task + +- Use the following command to run a task defined in your JSON or YAML file: + +``` +cargo run --bin crankshaft -- +``` + +- Example Command + +``` +cargo run --bin crankshaft -- ./example_task.json + +``` +### 3 Expected Output + +The command specified in the task file will be executed via the system shell. + +For the examples provided, the expected output should be "Hello, Json file input!". when using json file. + +### Troubleshooting +- No Output: Verify that the command in your task file works independently in your terminal. +- JSON/YAML Errors: Ensure that your task files are correctly formatted with valid JSON or YAML syntax. +- Command Execution Errors: Check that the shell command is valid and your system environment is set up correctly. + +### References + +- 1. Rust programming language: +- 2. Rust Libraries Used: + - Clap (Command-line Argument Parser) + - Serde (Serialization and Deserialization) +- 3. Workflow Description Language (WDL): + +### Contributors + +- Peter Huene +- Clay McLeod +- John McGuigan +- Andrew Frantz +- Braden Everson +- Jared Andrews +- Michael Gattas +- Suchitra Chavan diff --git a/crankshaft/src/bin/crankshaft.rs b/crankshaft/src/bin/crankshaft.rs new file mode 100644 index 0000000..abb17ec --- /dev/null +++ b/crankshaft/src/bin/crankshaft.rs @@ -0,0 +1,60 @@ +//! A command to evaluate WDL. + +use clap::{Arg, Command}; +use serde::{Deserialize, Serialize}; +use std::fs; +use std::process::Command as ProcessCommand; + +/// A simple task representation for JSON/YAML serialization. +#[derive(Debug, Deserialize, Serialize)] +struct Task { + /// The name of the task. + name: String, + /// The command to run. + command: String, +} + +fn main() { + let matches = Command::new("crankshaft") + .version("1.0") + .about("A simple task runner CLI using JSON and YAML") + .arg( + Arg::new("file") + .help("Task definition file (JSON or YAML)") + .required(true), + ) + .get_matches(); + + if let Some(task_file) = matches.get_one::("file") { + if let Ok(content) = fs::read_to_string(task_file) { + let task: Result> = + if task_file.ends_with(".yaml") || task_file.ends_with(".yml") { + serde_yaml::from_str(&content) + .map_err(|e| Box::new(e) as Box) + } else if task_file.ends_with(".json") { + serde_json::from_str(&content) + .map_err(|e| Box::new(e) as Box) + } else { + Err("Unsupported file format".into()) + }; + + if let Ok(task) = task { + let shell = if cfg!(target_os = "windows") { + "cmd" + } else { + "sh" + }; + let arg = if cfg!(target_os = "windows") { + "/C" + } else { + "-c" + }; + + let _ = ProcessCommand::new(shell) + .arg(arg) + .arg(&task.command) + .status(); + } + } + } +} diff --git a/crankshaft/src/bin/sprocket.rs b/crankshaft/src/bin/sprocket.rs new file mode 100644 index 0000000..1a26c56 --- /dev/null +++ b/crankshaft/src/bin/sprocket.rs @@ -0,0 +1,44 @@ +//! A testing implementation for a `sprocket run` command. + +use anyhow::Result; +use clap::{Arg, Command}; +use std::path::PathBuf; +use wdl_analysis::Analyzer; + +#[tokio::main] +async fn main() -> Result<()> { + let matches = Command::new("sprocket") + .version("1.0") + .about("Runs a WDL task") + .subcommand( + Command::new("run").about("Runs a WDL task").arg( + Arg::new("PATH") + .help("The path to the WDL file defining the task to run") + .required(true), + ), + ) + .arg_required_else_help(true) + .get_matches(); + + if let Some(matches) = matches.subcommand_matches("run") { + let task_file = matches.get_one::("PATH").unwrap(); + analyze_wdl(PathBuf::from(task_file)).await?; + } + + Ok(()) +} + +/// Analyzes the given WDL document. +async fn analyze_wdl(wdl_path: PathBuf) -> Result<()> { + let analyzer = Analyzer::new(|_: (), _, _, _| async {}); + analyzer.add_documents(vec![wdl_path]).await?; + let results = analyzer.analyze(()).await?; + + for result in results { + for diagnostic in result.diagnostics() { + println!("{:?}: {}", diagnostic.severity(), diagnostic.message()); + } + } + + Ok(()) +} diff --git a/crankshaft/src/example_task.json b/crankshaft/src/example_task.json new file mode 100644 index 0000000..ca42ca6 --- /dev/null +++ b/crankshaft/src/example_task.json @@ -0,0 +1,5 @@ +{ + "name": "Sample Task", + "command": "echo Hello, json unput!" +} + diff --git a/crankshaft/src/example_task.yaml b/crankshaft/src/example_task.yaml new file mode 100644 index 0000000..8cb23e2 --- /dev/null +++ b/crankshaft/src/example_task.yaml @@ -0,0 +1,3 @@ +name: Sample Task +command: echo Hello, YAML file input! + diff --git a/crankshaft/src/my.wdl b/crankshaft/src/my.wdl new file mode 100644 index 0000000..bbd71eb --- /dev/null +++ b/crankshaft/src/my.wdl @@ -0,0 +1,35 @@ +version 1.0 + +task foo { + input { + String name + Int x = 5 + } + + command <<< + echo hello ~{name} + echo x is ~{x} + >>> + + output { + File stdout = stdout() + } +} + +workflow exampleWorkflow { + input { + String wf_name + Int wf_x + } + + call foo { + input: + name = wf_name, + x = wf_x + } + + output { + File foo_stdout = foo.stdout + } +} + From 186cc5dafa71ffde3de9d2e776c406166220209f Mon Sep 17 00:00:00 2001 From: Peter Huene Date: Fri, 6 Sep 2024 10:50:07 -0500 Subject: [PATCH 03/16] feat: implement the basis of a WDL runtime. (#15) --- Cargo.toml | 2 +- crankshaft/src/lib.rs | 1 - wdl-runtime/Cargo.toml | 19 + wdl-runtime/src/expr.rs | 3 + wdl-runtime/src/expr/v1.rs | 453 +++++++++++++++ wdl-runtime/src/lib.rs | 19 + wdl-runtime/src/runtime.rs | 311 +++++++++++ wdl-runtime/src/stdlib.rs | 21 + wdl-runtime/src/task.rs | 525 ++++++++++++++++++ .../wdl_util.rs => wdl-runtime/src/util.rs | 0 10 files changed, 1352 insertions(+), 2 deletions(-) create mode 100644 wdl-runtime/Cargo.toml create mode 100644 wdl-runtime/src/expr.rs create mode 100644 wdl-runtime/src/expr/v1.rs create mode 100644 wdl-runtime/src/lib.rs create mode 100644 wdl-runtime/src/runtime.rs create mode 100644 wdl-runtime/src/stdlib.rs create mode 100644 wdl-runtime/src/task.rs rename crankshaft/src/wdl_util.rs => wdl-runtime/src/util.rs (100%) diff --git a/Cargo.toml b/Cargo.toml index 6c2bdf7..0044ffc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,3 +1,3 @@ [workspace] resolver = "2" -members = ["crankshaft", "tes"] +members = ["crankshaft", "tes", "wdl-runtime"] diff --git a/crankshaft/src/lib.rs b/crankshaft/src/lib.rs index bcc8cf0..2b8b64b 100644 --- a/crankshaft/src/lib.rs +++ b/crankshaft/src/lib.rs @@ -1,7 +1,6 @@ //! Crankshaft. pub mod engine; -pub mod wdl_util; /// A boxed [`std::error::Error`]. pub type BoxedError = Box; diff --git a/wdl-runtime/Cargo.toml b/wdl-runtime/Cargo.toml new file mode 100644 index 0000000..d8fef09 --- /dev/null +++ b/wdl-runtime/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "wdl-runtime" +version = "0.1.0" +edition = "2021" + +[dependencies] +anyhow = "1.0.86" +id-arena = "2.2.1" +indexmap = "2.5.0" +ordered-float = "4.2.2" +paste = "1.0.15" +petgraph = "0.6.5" +string-interner = "0.17.0" +wdl-analysis = { git = "https://github.com/peterhuene/wdl", branch = "hackathon" } +wdl-ast = { git = "https://github.com/peterhuene/wdl", branch = "hackathon" } + +[dev-dependencies] +tempfile = "3.12.0" +tokio = "1.40.0" diff --git a/wdl-runtime/src/expr.rs b/wdl-runtime/src/expr.rs new file mode 100644 index 0000000..8e39f64 --- /dev/null +++ b/wdl-runtime/src/expr.rs @@ -0,0 +1,3 @@ +//! Module for expression evaluation implementation. + +pub mod v1; diff --git a/wdl-runtime/src/expr/v1.rs b/wdl-runtime/src/expr/v1.rs new file mode 100644 index 0000000..43436e5 --- /dev/null +++ b/wdl-runtime/src/expr/v1.rs @@ -0,0 +1,453 @@ +//! Implementation of an expression evaluator for 1.x WDL documents. + +use std::collections::HashMap; + +use wdl_analysis::diagnostics::{ + ambiguous_argument, argument_type_mismatch, cannot_coerce_to_string, map_key_not_primitive, + too_few_arguments, too_many_arguments, type_mismatch, unknown_function, unknown_name, + unsupported_function, +}; +use wdl_analysis::stdlib::FunctionBindError; +use wdl_analysis::types::{Coercible, Type}; +use wdl_ast::v1::{ + CallExpr, Expr, LiteralArray, LiteralExpr, LiteralMap, LiteralObject, LiteralPair, + LiteralStringKind, LiteralStruct, Placeholder, StringPart, +}; +use wdl_ast::{AstNode, AstNodeExt, AstToken, Diagnostic, Ident, Span, SyntaxKind, TokenStrHash}; + +use crate::util::strip_leading_whitespace; +use crate::{read_string, Runtime, Value}; +use std::fmt::Write; + +/// Creates an "integer not in range" diagnostic +fn integer_not_in_range(span: Span) -> Diagnostic { + Diagnostic::error(format!( + "literal integer exceeds the range for a 64-bit signed integer ({min}..={max})", + min = i64::MIN, + max = i64::MAX, + )) + .with_label("this literal integer is not in range", span) +} + +/// Creates a "float not in range" diagnostic +fn float_not_in_range(span: Span) -> Diagnostic { + Diagnostic::error(format!( + "literal float exceeds the range for a 64-bit float ({min:+e}..={max:+e})", + min = f64::MIN, + max = f64::MAX, + )) + .with_label("this literal float is not in range", span) +} + +/// Creates a "cannot call" diagnostic. +fn cannot_call(target: &Ident) -> Diagnostic { + Diagnostic::error(format!( + "function `{target}` can only be called from task outputs", + target = target.as_str() + )) + .with_highlight(target.span()) +} + +/// Creates a "call failed" diagnostic. +fn call_failed(target: &Ident, error: &anyhow::Error) -> Diagnostic { + Diagnostic::error(format!( + "function `{target}` failed: {error:#}", + target = target.as_str() + )) + .with_highlight(target.span()) +} + +/// Represents a WDL expression evaluator. +#[derive(Debug)] +pub struct ExprEvaluator<'a> { + /// The scope to use for the evaluation. + scope: &'a HashMap, Value>, + /// The value to return from a call to `stdout`. + /// + /// This is `Some` only when evaluating task outputs. + stdout: Option, + /// The value to return from a call to `stderr`. + /// + /// This is `Some` only when evaluating task outputs. + stderr: Option, +} + +impl<'a> ExprEvaluator<'a> { + /// Creates a new expression evaluator. + pub fn new(scope: &'a HashMap, Value>) -> Self { + Self { + scope, + stdout: None, + stderr: None, + } + } + + /// Creates a new expression evaluator with the given stdout/stderr output. + pub fn new_with_output( + scope: &'a HashMap, Value>, + stdout: Value, + stderr: Value, + ) -> Self { + Self { + scope, + stdout: Some(stdout), + stderr: Some(stderr), + } + } + + /// Evaluates the given expression. + pub fn evaluate_expr( + &self, + runtime: &mut Runtime<'_>, + expr: &Expr, + ) -> Result { + match expr { + Expr::Literal(expr) => self.evaluate_literal_expr(runtime, expr), + Expr::Name(r) => { + let name = r.name(); + self.scope + .get(name.as_str()) + .copied() + .ok_or_else(|| unknown_name(name.as_str(), name.span())) + } + Expr::Parenthesized(expr) => self.evaluate_expr(runtime, &expr.inner()), + Expr::If(_) => todo!(), + Expr::LogicalNot(_) => todo!(), + Expr::Negation(_) => todo!(), + Expr::LogicalOr(_) => todo!(), + Expr::LogicalAnd(_) => todo!(), + Expr::Equality(_) => todo!(), + Expr::Inequality(_) => todo!(), + Expr::Less(_) => todo!(), + Expr::LessEqual(_) => todo!(), + Expr::Greater(_) => todo!(), + Expr::GreaterEqual(_) => todo!(), + Expr::Addition(_) => todo!(), + Expr::Subtraction(_) => todo!(), + Expr::Multiplication(_) => todo!(), + Expr::Division(_) => todo!(), + Expr::Modulo(_) => todo!(), + Expr::Exponentiation(_) => todo!(), + Expr::Call(expr) => self.evaluate_call_expr(runtime, expr), + Expr::Index(_) => todo!(), + Expr::Access(_) => todo!(), + } + } + + /// Evaluates a literal expression. + fn evaluate_literal_expr( + &self, + runtime: &mut Runtime<'_>, + expr: &LiteralExpr, + ) -> Result { + match expr { + LiteralExpr::Boolean(lit) => Ok(lit.value().into()), + LiteralExpr::Integer(lit) => { + let parent = lit + .syntax() + .parent() + .expect("should have a parent expression"); + + // Check to see if this literal is a direct child of a negation expression; if so, we want to negate the literal + let (value, span) = if parent.kind() == SyntaxKind::NegationExprNode { + let start = parent.text_range().start().into(); + (lit.negate(), Span::new(start, lit.span().end() - start)) + } else { + (lit.value(), lit.span()) + }; + + Ok(value.ok_or_else(|| integer_not_in_range(span))?.into()) + } + LiteralExpr::Float(lit) => Ok(lit + .value() + .ok_or_else(|| float_not_in_range(lit.span()))? + .into()), + LiteralExpr::String(lit) => { + // An optimization if the literal is just text; don't bother building a new string + if let Some(text) = lit.text() { + return Ok(runtime.new_string(text.as_str())); + } + + let mut s = String::new(); + for p in lit.parts() { + match p { + StringPart::Text(t) => s.push_str(t.as_str()), + StringPart::Placeholder(placeholder) => { + self.evaluate_placeholder(runtime, &placeholder, &mut s)?; + } + } + } + + if let LiteralStringKind::Multiline = lit.kind() { + s = strip_leading_whitespace(&s, false); + } + + Ok(runtime.new_string(s)) + } + LiteralExpr::Array(lit) => self.evaluate_literal_array(runtime, lit), + LiteralExpr::Pair(lit) => self.evaluate_literal_pair(runtime, lit), + LiteralExpr::Map(lit) => self.evaluate_literal_map(runtime, lit), + LiteralExpr::Object(lit) => self.evaluate_literal_object(runtime, lit), + LiteralExpr::Struct(lit) => self.evaluate_literal_struct(runtime, lit), + LiteralExpr::None(_) => Ok(Value::None), + LiteralExpr::Hints(_) | LiteralExpr::Input(_) | LiteralExpr::Output(_) => { + todo!("implement for WDL 1.2 support") + } + } + } + + /// Evaluates a placeholder into the given string buffer. + pub(crate) fn evaluate_placeholder( + &self, + runtime: &mut Runtime<'_>, + placeholder: &Placeholder, + buffer: &mut String, + ) -> Result<(), Diagnostic> { + let expr = placeholder.expr(); + match self.evaluate_expr(runtime, &expr)? { + Value::Boolean(v) => buffer.push_str(if v { "true" } else { "false" }), + Value::Integer(v) => write!(buffer, "{v}").unwrap(), + Value::Float(v) => write!(buffer, "{v}").unwrap(), + Value::String(v) | Value::File(v) | Value::Directory(v) => { + buffer.push_str(runtime.resolve_str(v)) + } + Value::None => {} + Value::Stored(ty, _) => { + return Err(cannot_coerce_to_string(runtime.types(), ty, expr.span())); + } + } + + Ok(()) + } + + /// Evaluates a literal array expression. + fn evaluate_literal_array( + &self, + runtime: &mut Runtime<'_>, + expr: &LiteralArray, + ) -> Result { + // Look at the first array element to determine the element type + // The remaining elements must match the first type + let mut elements = expr.elements(); + match elements.next() { + Some(expr) => { + let mut values = Vec::new(); + let value = self.evaluate_expr(runtime, &expr)?; + let expected = value.ty(); + let expected_span = expr.span(); + values.push(value); + + // Ensure the remaining element types are the same as the first + for expr in elements { + let value = self.evaluate_expr(runtime, &expr)?; + let actual = value.ty(); + values.push(value); + + if !actual.is_coercible_to(runtime.types(), &expected) { + return Err(type_mismatch( + runtime.types(), + expected, + expected_span, + actual, + expr.span(), + )); + } + } + + Ok(runtime.new_array(values)) + } + None => Ok(runtime.new_array(Vec::new())), + } + } + + /// Evaluates a literal pair expression. + fn evaluate_literal_pair( + &self, + runtime: &mut Runtime<'_>, + expr: &LiteralPair, + ) -> Result { + let (left, right) = expr.exprs(); + let left = self.evaluate_expr(runtime, &left)?; + let right = self.evaluate_expr(runtime, &right)?; + Ok(runtime.new_pair(left, right)) + } + + /// Evaluates a literal map expression. + fn evaluate_literal_map( + &self, + runtime: &mut Runtime<'_>, + expr: &LiteralMap, + ) -> Result { + let mut items = expr.items(); + match items.next() { + Some(item) => { + let mut elements = HashMap::new(); + let (key, value) = item.key_value(); + let expected_key = self.evaluate_expr(runtime, &key)?; + let expected_key_span = key.span(); + let expected_value = self.evaluate_expr(runtime, &value)?; + let expected_value_span = value.span(); + match expected_key.ty() { + Type::Primitive(_) => { + // OK + } + ty => { + return Err(map_key_not_primitive(runtime.types(), ty, key.span())); + } + } + + elements.insert(expected_key, expected_value); + + // Ensure the remaining items types are the same as the first + for item in items { + let (key, value) = item.key_value(); + let actual_key = self.evaluate_expr(runtime, &key)?; + let actual_value = self.evaluate_expr(runtime, &value)?; + + if !actual_key + .ty() + .is_coercible_to(runtime.types(), &expected_key.ty()) + { + return Err(type_mismatch( + runtime.types(), + expected_key.ty(), + expected_key_span, + actual_key.ty(), + key.span(), + )); + } + + if !actual_value + .ty() + .is_coercible_to(runtime.types(), &expected_value.ty()) + { + return Err(type_mismatch( + runtime.types(), + expected_value.ty(), + expected_value_span, + actual_value.ty(), + value.span(), + )); + } + + elements.insert(actual_key, actual_value); + } + + Ok(runtime.new_map(elements)) + } + // Treat as `Map[Union, Union]` + None => Ok(runtime.new_map(HashMap::new())), + } + } + + /// Evaluates a literal object expression. + fn evaluate_literal_object( + &self, + runtime: &mut Runtime<'_>, + expr: &LiteralObject, + ) -> Result { + let items = expr + .items() + .map(|item| { + let (name, value) = item.name_value(); + Ok(( + name.as_str().to_string(), + self.evaluate_expr(runtime, &value)?, + )) + }) + .collect::, _>>()?; + + Ok(runtime.new_object(items)) + } + + /// Evaluates a literal struct expression. + fn evaluate_literal_struct( + &self, + _runtime: &mut Runtime<'_>, + _expr: &LiteralStruct, + ) -> Result { + todo!() + } + + /// Evaluates a call expression. + fn evaluate_call_expr( + &self, + runtime: &mut Runtime<'_>, + expr: &CallExpr, + ) -> Result { + let target = expr.target(); + match wdl_analysis::stdlib::STDLIB.function(target.as_str()) { + Some(f) => { + let minimum_version = f.minimum_version(); + if minimum_version + > runtime + .document() + .version() + .expect("document should have a version") + { + return Err(unsupported_function( + minimum_version, + target.as_str(), + target.span(), + )); + } + + let (arguments, types) = expr.arguments().try_fold( + (Vec::new(), Vec::new()), + |(mut args, mut types), expr| { + let value = self.evaluate_expr(runtime, &expr)?; + types.push(value.ty()); + args.push(value); + Ok((args, types)) + }, + )?; + + // TODO: implement a `can_bind` which doesn't mutate the types collection + match f.bind(runtime.types_mut(), &types) { + Ok(_) => { + // TODO: dispatch the function call in a better way + let r = match target.as_str() { + "read_string" => read_string(runtime, &arguments), + "stdout" => return self.stdout.ok_or_else(|| cannot_call(&target)), + "stderr" => return self.stderr.ok_or_else(|| cannot_call(&target)), + _ => unreachable!("unknown function"), + }; + + r.map_err(|e| call_failed(&target, &e)) + } + Err(FunctionBindError::TooFewArguments(minimum)) => Err(too_few_arguments( + target.as_str(), + target.span(), + minimum, + arguments.len(), + )), + Err(FunctionBindError::TooManyArguments(maximum)) => Err(too_many_arguments( + target.as_str(), + target.span(), + maximum, + arguments.len(), + expr.arguments().skip(maximum).map(|e| e.span()), + )), + Err(FunctionBindError::ArgumentTypeMismatch { index, expected }) => { + Err(argument_type_mismatch( + runtime.types(), + &expected, + types[index], + expr.arguments() + .nth(index) + .map(|e| e.span()) + .expect("should have span"), + )) + } + Err(FunctionBindError::Ambiguous { first, second }) => Err(ambiguous_argument( + target.as_str(), + target.span(), + &first, + &second, + )), + } + } + None => Err(unknown_function(target.as_str(), target.span())), + } + } +} diff --git a/wdl-runtime/src/lib.rs b/wdl-runtime/src/lib.rs new file mode 100644 index 0000000..6f427c5 --- /dev/null +++ b/wdl-runtime/src/lib.rs @@ -0,0 +1,19 @@ +//! A create for evaluating WDL documents. + +#![warn(missing_docs)] +#![warn(rust_2018_idioms)] +#![warn(rust_2021_compatibility)] +#![warn(missing_debug_implementations)] +#![warn(clippy::missing_docs_in_private_items)] +#![warn(rustdoc::broken_intra_doc_links)] + +mod expr; +mod runtime; +mod stdlib; +mod task; +mod util; + +pub use expr::*; +pub use runtime::*; +pub use stdlib::*; +pub use task::*; diff --git a/wdl-runtime/src/runtime.rs b/wdl-runtime/src/runtime.rs new file mode 100644 index 0000000..b8776f1 --- /dev/null +++ b/wdl-runtime/src/runtime.rs @@ -0,0 +1,311 @@ +//! Implementation of the WDL runtime and values. + +use std::collections::HashMap; + +use id_arena::{Arena, Id}; +use ordered_float::OrderedFloat; +use string_interner::{symbol::SymbolU32, DefaultStringInterner}; +use wdl_analysis::{ + diagnostics::unknown_type, + scope::DocumentScope, + types::{ArrayType, MapType, PairType, PrimitiveTypeKind, Type, TypeEq, Types}, +}; +use wdl_ast::{AstToken, Diagnostic, Ident}; + +/// Represents a WDL runtime value. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum Value { + /// The value is a `Boolean`. + Boolean(bool), + /// The value is an `Int`. + Integer(i64), + /// The value is a `Float`. + Float(OrderedFloat), + /// The value is a `String`. + String(SymbolU32), + /// The value is a `File`. + File(SymbolU32), + /// The value is a `Directory`. + Directory(SymbolU32), + /// The value is a literal `None` value. + None, + /// The value is stored in the runtime. + Stored(Type, StoredValueId), +} + +impl Value { + /// Gets the type of the value. + pub fn ty(&self) -> Type { + match self { + Value::Boolean(_) => PrimitiveTypeKind::Boolean.into(), + Value::Integer(_) => PrimitiveTypeKind::Integer.into(), + Value::Float(_) => PrimitiveTypeKind::Float.into(), + Value::String(_) => PrimitiveTypeKind::String.into(), + Value::File(_) => PrimitiveTypeKind::File.into(), + Value::Directory(_) => PrimitiveTypeKind::Directory.into(), + Value::None => Type::None, + Value::Stored(ty, _) => *ty, + } + } + + /// Unwraps the value into a boolean. + /// + /// # Panics + /// + /// Panics if the value is not a boolean. + pub fn unwrap_boolean(self) -> bool { + match self { + Self::Boolean(v) => v, + _ => panic!("value is not a boolean"), + } + } + + /// Unwraps the value into an integer. + /// + /// # Panics + /// + /// Panics if the value is not an integer. + pub fn unwrap_integer(self) -> i64 { + match self { + Self::Integer(v) => v, + _ => panic!("value is not an integer"), + } + } + + /// Unwraps the value into a float. + /// + /// # Panics + /// + /// Panics if the value is not a float. + pub fn unwrap_float(self) -> f64 { + match self { + Self::Float(v) => v.into(), + _ => panic!("value is not a float"), + } + } + + /// Unwraps the value into a string. + /// + /// # Panics + /// + /// Panics if the value is not a string. + pub fn unwrap_string<'a>(self, runtime: &'a Runtime<'_>) -> &'a str { + match self { + Self::String(sym) => runtime.resolve_str(sym), + _ => panic!("value is not a string"), + } + } + + /// Unwraps the value into a file. + /// + /// # Panics + /// + /// Panics if the value is not a file. + pub fn unwrap_file<'a>(self, runtime: &'a Runtime<'_>) -> &'a str { + match self { + Self::File(sym) => runtime.resolve_str(sym), + _ => panic!("value is not a file"), + } + } + + /// Unwraps the value into a directory. + /// + /// # Panics + /// + /// Panics if the value is not a directory. + pub fn unwrap_directory<'a>(self, runtime: &'a Runtime<'_>) -> &'a str { + match self { + Self::Directory(sym) => runtime.resolve_str(sym), + _ => panic!("value is not a directory"), + } + } + + /// Coerces the value into the given type. + /// + /// Returns `None` if the coercion is not supported. + pub fn coerce(&self, runtime: &mut Runtime<'_>, ty: Type) -> Option { + if self.ty().type_eq(runtime.types(), &ty) { + return Some(*self); + } + + match (self, ty) { + (Value::String(sym), ty) => { + if let Some(ty) = ty.as_primitive() { + match ty.kind() { + PrimitiveTypeKind::File => Some(Self::File(*sym)), + PrimitiveTypeKind::Directory => Some(Self::Directory(*sym)), + _ => None, + } + } else { + None + } + } + (Value::Integer(v), ty) => { + if let Some(ty) = ty.as_primitive() { + match ty.kind() { + PrimitiveTypeKind::Float => Some(Self::Float((*v as f64).into())), + _ => None, + } + } else { + None + } + } + _ => todo!("implement the remainder coercions"), + } + } +} + +impl From for Value { + fn from(value: bool) -> Self { + Self::Boolean(value) + } +} + +impl From for Value { + fn from(value: i64) -> Self { + Self::Integer(value) + } +} + +impl From for Value { + fn from(value: f64) -> Self { + Self::Float(value.into()) + } +} + +/// Represents a value stored in the runtime. +#[derive(Debug)] +pub enum StoredValue { + /// The value is a `Pair` of values. + Pair(Value, Value), + /// The value is an `Array` of values. + Array(Vec), + /// The value is a `Map` of values. + Map(HashMap), + /// The value is an `Object.` + Object(HashMap), + /// The value is a struct. + Struct(Vec), +} + +/// Represents an identifier of a stored value. +pub type StoredValueId = Id; + +/// Represents a WDL runtime. +/// +/// The runtime is responsible for storing complex value types and interning strings. +#[derive(Debug)] +pub struct Runtime<'a> { + /// The reference to the document scope being evaluated. + document: &'a DocumentScope, + /// The types collection for values. + types: Types, + /// The storage arena for values. + values: Arena, + /// The string interner used to intern string/file/directory values. + interner: DefaultStringInterner, + /// The map of known structs to already imported types. + structs: HashMap, +} + +impl<'a> Runtime<'a> { + /// Constructs a new runtime for the given document being evaluated. + pub fn new(document: &'a DocumentScope) -> Self { + Self { + document, + types: Types::default(), + values: Arena::default(), + interner: DefaultStringInterner::default(), + structs: HashMap::default(), + } + } + + /// Gets the document associated with the runtime. + pub fn document(&self) -> &DocumentScope { + self.document + } + + /// Gets the types collection associated with the runtime. + pub fn types(&self) -> &Types { + &self.types + } + + /// Gets the mutable types collection associated with the runtime. + pub fn types_mut(&mut self) -> &mut Types { + &mut self.types + } + + /// Creates a new `String` value. + pub fn new_string(&mut self, s: impl AsRef) -> Value { + Value::String(self.interner.get_or_intern(s)) + } + + /// Creates a new `File` value. + pub fn new_file(&mut self, s: impl AsRef) -> Value { + Value::File(self.interner.get_or_intern(s)) + } + + /// Creates a new `Directory` value. + pub fn new_directory(&mut self, s: impl AsRef) -> Value { + Value::Directory(self.interner.get_or_intern(s)) + } + + /// Creates a new `Pair` value. + pub fn new_pair(&mut self, left: Value, right: Value) -> Value { + let id = self.values.alloc(StoredValue::Pair(left, right)); + let ty = self.types.add_pair(PairType::new(left.ty(), right.ty())); + Value::Stored(ty, id) + } + + /// Creates a new `Array` value. + /// + /// Note that this expects that the array elements are homogenous. + pub fn new_array(&mut self, elements: Vec) -> Value { + let element_type = elements.first().map(Value::ty).unwrap_or(Type::Union); + let id = self.values.alloc(StoredValue::Array(elements)); + let ty = self.types.add_array(ArrayType::new(element_type)); + Value::Stored(ty, id) + } + + /// Creates a new `Map` value. + /// + /// Note that this expects the item keys and values to be homogenous, respectively. + pub fn new_map(&mut self, items: HashMap) -> Value { + let mut iter = items.iter().map(|(k, v)| (k.ty(), v.ty())); + let (key_type, value_type) = iter.next().unwrap_or((Type::Union, Type::Union)); + let id = self.values.alloc(StoredValue::Map(items)); + let ty = self.types.add_map(MapType::new(key_type, value_type)); + Value::Stored(ty, id) + } + + /// Creates a new `Object` value. + pub fn new_object(&mut self, items: HashMap) -> Value { + let id = self.values.alloc(StoredValue::Object(items)); + Value::Stored(Type::Object, id) + } + + /// Creates a new struct value. + pub fn new_struct(&mut self, name: &Ident, members: Vec) -> Result { + // Import the struct type from the document scope if needed + let ty = if let Some(ty) = self.structs.get(name.as_str()) { + *ty + } else { + let ty = self + .document + .struct_by_name(name.as_str()) + .and_then(|s| s.ty()) + .ok_or_else(|| unknown_type(name.as_str(), name.span()))?; + let ty = self.types.import(self.document.types(), ty); + self.structs.insert(name.as_str().to_string(), ty); + ty + }; + + let id = self.values.alloc(StoredValue::Struct(members)); + Ok(Value::Stored(ty, id)) + } + + /// Resolves a previously interned string from a symbol. + pub fn resolve_str(&self, sym: SymbolU32) -> &str { + self.interner.resolve(sym).expect("should have symbol") + } +} diff --git a/wdl-runtime/src/stdlib.rs b/wdl-runtime/src/stdlib.rs new file mode 100644 index 0000000..2dfd21e --- /dev/null +++ b/wdl-runtime/src/stdlib.rs @@ -0,0 +1,21 @@ +//! Implementation of stdlib functions. + +use std::fs; + +use anyhow::Result; + +use anyhow::Context; +use wdl_analysis::types::PrimitiveTypeKind; + +use crate::{Runtime, Value}; + +/// Implements the `read_string` stdlib function. +pub fn read_string(runtime: &mut Runtime<'_>, args: &[Value]) -> Result { + let path = args[0] + .coerce(runtime, PrimitiveTypeKind::File.into()) + .unwrap() + .unwrap_file(runtime); + Ok(runtime.new_string( + fs::read_to_string(path).with_context(|| format!("failed to read file `{path}`"))?, + )) +} diff --git a/wdl-runtime/src/task.rs b/wdl-runtime/src/task.rs new file mode 100644 index 0000000..c030c5f --- /dev/null +++ b/wdl-runtime/src/task.rs @@ -0,0 +1,525 @@ +//! Representation of task evaluation. + +use std::{collections::HashMap, path::Path}; + +use indexmap::IndexMap; +use petgraph::{ + algo::{has_path_connecting, toposort}, + graph::{DiGraph, NodeIndex}, +}; +use wdl_ast::{ + v1::{ + CommandPart, CommandSection, Decl, HintsSection, NameRef, RequirementsSection, + RuntimeSection, TaskDefinition, TaskItem, + }, + AstNode, AstToken, Diagnostic, Ident, SyntaxNode, TokenStrHash, +}; + +use crate::{util::strip_leading_whitespace, v1::ExprEvaluator, Runtime, Value}; + +/// Creates a "missing input" diagnostic. +fn missing_input(task: &str, input: &Ident) -> Diagnostic { + Diagnostic::error(format!( + "missing input `{input}` for task `{task}`", + input = input.as_str() + )) + .with_label("a value must be specified for this input", input.span()) +} + +/// Represents a node in an evaluation graph. +#[derive(Debug, Clone)] +pub enum GraphNode { + /// The node is an input. + Input(Decl), + /// The node is a private decl. + Decl(Decl), + /// The node is an output decl. + Output(Decl), + /// The node is the task's command. + Command(CommandSection), + /// The node is a runtime section. + Runtime(RuntimeSection), + /// The node is a requirements section. + Requirements(RequirementsSection), + /// The node is a hints section. + Hints(HintsSection), +} + +/// Represents a task evaluation graph. +/// +/// This is used to evaluate declarations and sections in topological order. +#[derive(Debug, Default)] +pub struct TaskEvaluationGraph { + /// The inner directed graph. + /// + /// Note that edges in this graph are in *reverse* dependency ordering (implies "depended upon by" relationships). + inner: DiGraph, + /// The map of declaration names to node indexes in the graph. + names: IndexMap, NodeIndex>, + /// The command node index. + command: Option, + /// The runtime node index. + runtime: Option, + /// The requirements node index. + requirements: Option, + /// The hints node index. + hints: Option, +} + +impl TaskEvaluationGraph { + /// Constructs a new task evaluation graph. + pub fn new(task: &TaskDefinition) -> Self { + // Populate the declaration types and build a name reference graph + let mut saw_inputs = false; + let mut outputs = None; + let mut graph = Self::default(); + for item in task.items() { + match item { + TaskItem::Input(section) if !saw_inputs => { + saw_inputs = true; + for decl in section.declarations() { + graph.add_decl_node(decl, GraphNode::Input); + } + } + TaskItem::Output(section) if outputs.is_none() => { + outputs = Some(section); + } + TaskItem::Declaration(decl) => { + graph.add_decl_node(Decl::Bound(decl), GraphNode::Decl); + } + TaskItem::Command(section) if graph.command.is_none() => { + graph.command = Some(graph.inner.add_node(GraphNode::Command(section))); + } + TaskItem::Runtime(section) + if graph.runtime.is_none() && graph.requirements.is_none() => + { + graph.runtime = Some(graph.inner.add_node(GraphNode::Runtime(section))); + } + TaskItem::Requirements(section) + if graph.requirements.is_none() && graph.runtime.is_none() => + { + graph.requirements = + Some(graph.inner.add_node(GraphNode::Requirements(section))); + } + TaskItem::Hints(section) if graph.hints.is_none() && graph.runtime.is_none() => { + graph.hints = Some(graph.inner.add_node(GraphNode::Hints(section))); + } + _ => continue, + } + } + + // Add name reference edges before adding the outputs + graph.add_reference_edges(None); + + let count = graph.inner.node_count(); + + if let Some(section) = outputs { + for decl in section.declarations() { + if let Some(index) = graph.add_decl_node(Decl::Bound(decl), GraphNode::Output) { + // Add an edge to the command node as all outputs depend on the command + if let Some(command) = graph.command { + graph.inner.update_edge(command, index, ()); + } + } + } + } + + // Add reference edges again, but only for the output declaration nodes + graph.add_reference_edges(Some(count)); + + // Finally, add edges from the command to runtime/requirements/hints + if let Some(command) = graph.command { + if let Some(runtime) = graph.runtime { + graph.inner.update_edge(runtime, command, ()); + } + + if let Some(requirements) = graph.requirements { + graph.inner.update_edge(requirements, command, ()); + } + + if let Some(hints) = graph.hints { + graph.inner.update_edge(hints, command, ()); + } + } + + graph + } + + /// Performs a topological sort of the graph nodes. + pub fn toposort(&self) -> Vec { + toposort(&self.inner, None) + .expect("graph should be acyclic") + .into_iter() + .map(|i| self.inner[i].clone()) + .collect() + } + + /// Adds a declaration node to the graph. + fn add_decl_node(&mut self, decl: Decl, map: F) -> Option + where + F: FnOnce(Decl) -> GraphNode, + { + let name = decl.name(); + + // Ignore duplicate nodes + if self.names.contains_key(name.as_str()) { + return None; + } + + let index = self.inner.add_node(map(decl)); + self.names.insert(TokenStrHash::new(name), index); + Some(index) + } + + /// Adds edges from task sections to declarations. + fn add_section_edges( + &mut self, + from: NodeIndex, + descendants: impl Iterator, + ) { + // Add edges for any descendant name references + for r in descendants.filter_map(NameRef::cast) { + let name = r.name(); + + // Look up the name; we don't check for cycles here as decls can't + // reference a section. + if let Some(to) = self.names.get(name.as_str()) { + self.inner.update_edge(*to, from, ()); + } + } + } + + /// Adds name reference edges to the graph. + fn add_reference_edges(&mut self, skip: Option) { + let mut space = Default::default(); + + // Populate edges for any nodes that reference other nodes by name + for from in self.inner.node_indices().skip(skip.unwrap_or(0)) { + match &self.inner[from] { + GraphNode::Input(decl) | GraphNode::Decl(decl) | GraphNode::Output(decl) => { + let expr = decl.expr(); + if let Some(expr) = expr { + for r in expr.syntax().descendants().filter_map(NameRef::cast) { + let name = r.name(); + + // Only add an edge if the name is known to us + if let Some(to) = self.names.get(name.as_str()) { + // Ignore edges that form cycles; evaluation will later treat this as an unknown name reference. + if has_path_connecting(&self.inner, from, *to, Some(&mut space)) { + continue; + } + + self.inner.update_edge(*to, from, ()); + } + } + } + } + GraphNode::Command(section) => { + // Add name references from the command section to any decls in scope + let section = section.clone(); + for part in section.parts() { + if let CommandPart::Placeholder(p) = part { + self.add_section_edges(from, p.syntax().descendants()); + } + } + } + GraphNode::Runtime(section) => { + // Add name references from the runtime section to any decls in scope + let section = section.clone(); + for item in section.items() { + self.add_section_edges(from, item.syntax().descendants()); + } + } + GraphNode::Requirements(section) => { + // Add name references from the requirements section to any decls in scope + let section = section.clone(); + for item in section.items() { + self.add_section_edges(from, item.syntax().descendants()); + } + } + GraphNode::Hints(section) => { + // Add name references from the hints section to any decls in scope + let section = section.clone(); + for item in section.items() { + self.add_section_edges(from, item.syntax().descendants()); + } + } + } + } + } +} + +/// Represents an evaluated task. +#[derive(Debug)] +pub struct EvaluatedTask<'a> { + /// The evaluated command text (i.e. bash script) to use for executing the task. + command: String, + /// The evaluated requirements for running the command. + requirements: IndexMap, + /// The evaluated hints for running the command. + hints: IndexMap, + /// The map from input paths to localized paths within the execution environment. + paths: IndexMap, + /// The evaluation scope for evaluating the task so far. + scope: HashMap, Value>, + /// The evaluation nodes; this is used to evaluate the outputs after the task is executed. + nodes: &'a Vec, + /// The index of the start of the outputs in the set of nodes. + outputs: Option, +} + +impl<'a> EvaluatedTask<'a> { + /// Constructs a new evaluated task given the graph nodes in topological order. + fn new(nodes: &'a Vec) -> Self { + Self { + command: String::new(), + requirements: Default::default(), + hints: Default::default(), + paths: Default::default(), + scope: Default::default(), + nodes, + outputs: None, + } + } + + /// Gets the command (i.e. bash script) to use for executing the task. + pub fn command(&self) -> &str { + &self.command + } + + /// The localized paths used by the command. + /// + /// The key is the local path and the value is the localized path. + pub fn paths(&self) -> &IndexMap { + &self.paths + } + + /// The evaluated requirements for running the command. + pub fn requirements(&self) -> &IndexMap { + &self.requirements + } + + /// The evaluated hints for running the command. + pub fn hints(&self) -> &IndexMap { + &self.hints + } + + /// Evaluates the outputs of the task given the stdout and stderr from task execution. + pub fn outputs( + &self, + runtime: &mut Runtime<'_>, + stdout: impl AsRef, + stderr: impl AsRef, + ) -> Result, Value>, Diagnostic> { + let mut outputs = HashMap::default(); + if let Some(index) = self.outputs { + let stdout = runtime.new_file(stdout.as_ref().to_string_lossy()); + let stderr = runtime.new_file(stderr.as_ref().to_string_lossy()); + + let evaluator = ExprEvaluator::new_with_output(&self.scope, stdout, stderr); + for node in &self.nodes[index..] { + match node { + GraphNode::Output(decl) => { + let name = decl.name(); + let expr = decl.expr().expect("decl should be bound"); + let value = evaluator.evaluate_expr(runtime, &expr)?; + outputs.insert(TokenStrHash::new(name), value); + } + _ => panic!("only output nodes should follow the command"), + } + } + } + + Ok(outputs) + } +} + +/// Represents a task evaluator. +#[derive(Debug)] +pub struct TaskEvaluator { + /// The name of the task being evaluated. + name: Ident, + /// The task evaluation nodes in topological order. + nodes: Vec, +} + +impl TaskEvaluator { + /// Constructs a new task based on a definition and its inputs. + pub fn new(definition: TaskDefinition) -> Self { + let graph = TaskEvaluationGraph::new(&definition); + let nodes = graph.toposort(); + Self { + name: definition.name(), + nodes, + } + } + + /// Evaluates the task with the given base path to use for file localization. + pub fn evaluate<'a>( + &'a self, + runtime: &mut Runtime<'_>, + inputs: &HashMap, + _base: impl AsRef, + ) -> Result, Diagnostic> { + let mut evaluated = EvaluatedTask::new(&self.nodes); + + // Start by walking the nodes looking for input decls to populate the scope + for node in &self.nodes { + match node { + GraphNode::Input(decl) => { + let name = decl.name(); + if let Some(value) = inputs.get(name.as_str()) { + evaluated.scope.insert(TokenStrHash::new(name), *value); + } else { + // Check to see if the declaration was unbound; if so, it may be required if the declared type is not optional + if let Decl::Unbound(decl) = decl { + let ty = decl.ty(); + if ty.is_optional() { + evaluated.scope.insert(TokenStrHash::new(name), Value::None); + } else { + // The input is required + return Err(missing_input(self.name.as_str(), &name)); + } + } + } + } + GraphNode::Decl(_) => continue, + _ => break, + } + } + + // Walk the nodes again and evaluate them + for (index, node) in self.nodes.iter().enumerate() { + match node { + GraphNode::Input(decl) | GraphNode::Decl(decl) => { + let name = decl.name(); + if evaluated.scope.contains_key(name.as_str()) { + // Skip evaluating the input as we already have the value in scope + continue; + } + + let expr = decl.expr().expect("declaration should be bound"); + let evaluator = ExprEvaluator::new(&evaluated.scope); + let value = evaluator.evaluate_expr(runtime, &expr)?; + evaluated.scope.insert(TokenStrHash::new(name), value); + } + GraphNode::Requirements(_) => { + // TODO: implement + } + GraphNode::Runtime(_) => { + // TODO: implement + } + GraphNode::Hints(_) => { + // TODO: implement + } + GraphNode::Command(section) => { + // TODO: set `task` variable in scope for 1.2 documents + let evaluator = ExprEvaluator::new(&evaluated.scope); + for part in section.parts() { + match part { + CommandPart::Text(text) => evaluated.command.push_str(text.as_str()), + CommandPart::Placeholder(placeholder) => evaluator + .evaluate_placeholder( + runtime, + &placeholder, + &mut evaluated.command, + )?, + } + } + + evaluated.command = strip_leading_whitespace(&evaluated.command, true); + } + GraphNode::Output(_) => { + evaluated.outputs = Some(index); + break; + } + } + } + + Ok(evaluated) + } +} + +#[cfg(test)] +mod test { + use super::*; + use std::fs; + + use tempfile::TempDir; + use wdl_analysis::Analyzer; + + #[tokio::test] + async fn it_works() { + let dir = TempDir::new().expect("failed to create temporary directory"); + let path = dir.path().join("foo.wdl"); + fs::write( + &path, + r#"version 1.1 + +task test { + input { + String name = "Peter" + } + + command <<< + echo Hi, ~{name}! + >>> + + output { + String message = "stdout was: ~{read_string(stdout())}" + } +} +"#, + ) + .expect("failed to create test file"); + + let stdout = dir.path().join("stdout"); + fs::write(&stdout, r#"Hi, Peter!"#).expect("failed to create test file"); + + let stderr = dir.path().join("stderr"); + fs::write(&stderr, r#""#).expect("failed to create test file"); + + let analyzer = Analyzer::new(|_: (), _, _, _| async {}); + analyzer + .add_documents(vec![dir.path().to_path_buf()]) + .await + .expect("should add documents"); + + let results = analyzer.analyze(()).await.expect("should succeed"); + assert_eq!(results.len(), 1); + assert!(results[0].diagnostics().is_empty()); + + let document = results[0] + .parse_result() + .document() + .expect("should have a document"); + + let task = document + .ast() + .as_v1() + .expect("should be a V1 AST") + .tasks() + .find(|t| t.name().as_str() == "test") + .expect("should have task"); + + let mut runtime = Runtime::new(results[0].scope()); + let inputs = HashMap::new(); + let evaluator = TaskEvaluator::new(task); + let evaluated = evaluator + .evaluate(&mut runtime, &inputs, "/tmp") + .expect("should evaluate"); + + let outputs = evaluated + .outputs(&mut runtime, stdout, stderr) + .expect("should evaluate"); + for (k, v) in outputs { + assert_eq!(k.as_ref().as_str(), "message"); + match v { + Value::String(sym) => { + assert_eq!(runtime.resolve_str(sym), "stdout was: Hi, Peter!") + } + _ => panic!("expected a string value, found {v:?}"), + } + } + } +} diff --git a/crankshaft/src/wdl_util.rs b/wdl-runtime/src/util.rs similarity index 100% rename from crankshaft/src/wdl_util.rs rename to wdl-runtime/src/util.rs From d7521eb0a22f84c8d540ee3d500b72e9091704c0 Mon Sep 17 00:00:00 2001 From: Andrew Frantz Date: Fri, 6 Sep 2024 12:00:44 -0400 Subject: [PATCH 04/16] Update lsf.rs (#18) * Update lsf.rs * Update lsf.rs --- crankshaft/examples/lsf.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/crankshaft/examples/lsf.rs b/crankshaft/examples/lsf.rs index d0592cf..ede825b 100644 --- a/crankshaft/examples/lsf.rs +++ b/crankshaft/examples/lsf.rs @@ -15,10 +15,11 @@ async fn main() { .with(EnvFilter::from_default_env()) .init(); - let config = crankshaft::engine::config::Config::new("configs/lsf.toml") - .expect("Load from example config") - .backends[0] - .clone(); + let config = + crankshaft::engine::config::Config::new("crankshaft/test/fixtures/config/lsf.toml") + .expect("Load from example config") + .backends[0] + .clone(); let backend = crankshaft::engine::service::runner::backend::generic::GenericBackend::from_config(config) From 0cf2965c4e4150d9340198b34d4dbc480ec8d068 Mon Sep 17 00:00:00 2001 From: Peter Huene Date: Fri, 6 Sep 2024 11:07:08 -0500 Subject: [PATCH 05/16] chore: move dependencies into workspace deps. (#16) --- Cargo.toml | 35 +++++++++++++++++++++++++++ crankshaft/Cargo.toml | 54 +++++++++++++++++++++--------------------- tes/Cargo.toml | 10 ++++---- wdl-runtime/Cargo.toml | 17 +++++++------ 4 files changed, 75 insertions(+), 41 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 0044ffc..7a9da2f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,3 +1,38 @@ [workspace] resolver = "2" members = ["crankshaft", "tes", "wdl-runtime"] + +[workspace.dependencies] +anyhow = "1.0.86" +id-arena = "2.2.1" +indexmap = "2.5.0" +ordered-float = "4.2.2" +paste = "1.0.15" +petgraph = "0.6.5" +string-interner = "0.17.0" +wdl-analysis = { git = "https://github.com/peterhuene/wdl", branch = "hackathon" } +wdl-ast = { git = "https://github.com/peterhuene/wdl", branch = "hackathon" } +wdl-grammar = { git = "https://github.com/peterhuene/wdl", branch = "hackathon" } +async-trait = "0.1.82" +bollard = "0.17.1" +bytes = "1.7.1" +clap = { version = "4.5.16", features = ["derive"] } +config = "0.14.0" +dirs = "5.0.1" +futures = "0.3.30" +nonempty = "0.10.0" +rand = "0.8.5" +random_word = { version = "0.4.3", features = ["en"] } +regex = "1.10.6" +reqwest = "0.12.7" +serde = { version = "1.0.209", features = ["derive"] } +serde_json = "1.0.128" +tempfile = "3.12.0" +tes = { path = "../tes", version = "0.1.0" } +serde_yaml = "0.8" # Optional, if you want YAML support +tokio = { version = "1.40.0", features = ["full", "time"] } +toml = "0.8.19" +tracing = "0.1.40" +tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } +url = "2.5.2" +chrono = { version = "0.4.38", features = ["serde"] } diff --git a/crankshaft/Cargo.toml b/crankshaft/Cargo.toml index 9cef9b3..a541078 100644 --- a/crankshaft/Cargo.toml +++ b/crankshaft/Cargo.toml @@ -4,34 +4,34 @@ version = "0.1.0" edition = "2021" [dependencies] -async-trait = "0.1.82" -bollard = "0.17.1" -bytes = "1.7.1" -clap = { version = "4.5.16", features = ["derive"] } -config = "0.14.0" -dirs = "5.0.1" -futures = "0.3.30" -indexmap = "2.5.0" -nonempty = "0.10.0" -paste = "1.0.15" -rand = "0.8.5" -random_word = { version = "0.4.3", features = ["en"] } -regex = "1.10.6" -reqwest = "0.12.7" -serde = { version = "1.0.209", features = ["derive"] } -serde_json = "1.0.128" -tempfile = "3.12.0" tes = { path = "../tes", version = "0.1.0" } -serde_yaml = "0.8" # Optional, if you want YAML support -tokio = { version = "1.40.0", features = ["full", "time"] } -toml = "0.8.19" -tracing = "0.1.40" -tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } -url = "2.5.2" -wdl-grammar = "0.7.0" -wdl-ast = "0.6.0" -wdl-analysis = "0.2.0" -anyhow = "1.0.86" +async-trait = { workspace = true } +bollard = { workspace = true } +bytes = { workspace = true } +clap = { workspace = true } +config = { workspace = true } +dirs = { workspace = true } +futures = { workspace = true } +indexmap = { workspace = true } +nonempty = { workspace = true } +paste = { workspace = true } +rand = { workspace = true } +random_word = { workspace = true } +regex = { workspace = true } +reqwest = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +tempfile = { workspace = true } +serde_yaml = { workspace = true } +tokio = { workspace = true } +toml = { workspace = true } +tracing = { workspace = true } +tracing-subscriber ={ workspace = true } +url = { workspace = true } +wdl-grammar = { workspace = true } +wdl-ast = { workspace = true } +wdl-analysis = { workspace = true } +anyhow = { workspace = true } [lints.rust] missing_docs = "warn" diff --git a/tes/Cargo.toml b/tes/Cargo.toml index 1ab1f88..1877715 100644 --- a/tes/Cargo.toml +++ b/tes/Cargo.toml @@ -4,8 +4,8 @@ version = "0.1.0" edition = "2021" [dependencies] -chrono = { version = "0.4.38", features = ["serde"] } -reqwest = "0.12.7" -serde = { version = "1.0.209", features = ["derive"] } -serde_json = "1.0.128" -tokio = { version = "1.40.0", features = ["full"] } +chrono = { workspace = true } +reqwest = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +tokio ={ workspace = true } diff --git a/wdl-runtime/Cargo.toml b/wdl-runtime/Cargo.toml index d8fef09..0fe130e 100644 --- a/wdl-runtime/Cargo.toml +++ b/wdl-runtime/Cargo.toml @@ -4,15 +4,14 @@ version = "0.1.0" edition = "2021" [dependencies] -anyhow = "1.0.86" -id-arena = "2.2.1" -indexmap = "2.5.0" -ordered-float = "4.2.2" -paste = "1.0.15" -petgraph = "0.6.5" -string-interner = "0.17.0" -wdl-analysis = { git = "https://github.com/peterhuene/wdl", branch = "hackathon" } -wdl-ast = { git = "https://github.com/peterhuene/wdl", branch = "hackathon" } +anyhow = { workspace = true } +id-arena = { workspace = true } +indexmap = { workspace = true } +ordered-float = { workspace = true } +petgraph = { workspace = true } +string-interner = { workspace = true } +wdl-analysis = { workspace = true } +wdl-ast = { workspace = true } [dev-dependencies] tempfile = "3.12.0" From 512598fd48972a1b80d9b849dfd5fca709a1dbec Mon Sep 17 00:00:00 2001 From: Braden Everson Date: Fri, 6 Sep 2024 11:26:00 -0500 Subject: [PATCH 06/16] Add Progress Bar when Engine is Run (#17) * Add progress bar to engine run * Add dependencies to workspace * Remove println in favor of debug printing on task submit --- Cargo.toml | 3 ++- crankshaft/Cargo.toml | 5 +++-- crankshaft/src/engine.rs | 25 ++++++++++++++++++++++--- 3 files changed, 27 insertions(+), 6 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 7a9da2f..140f57c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ indexmap = "2.5.0" ordered-float = "4.2.2" paste = "1.0.15" petgraph = "0.6.5" +indicatif = "0.17.8" string-interner = "0.17.0" wdl-analysis = { git = "https://github.com/peterhuene/wdl", branch = "hackathon" } wdl-ast = { git = "https://github.com/peterhuene/wdl", branch = "hackathon" } @@ -29,7 +30,7 @@ serde = { version = "1.0.209", features = ["derive"] } serde_json = "1.0.128" tempfile = "3.12.0" tes = { path = "../tes", version = "0.1.0" } -serde_yaml = "0.8" # Optional, if you want YAML support +serde_yaml = "0.8" # Optional, if you want YAML support tokio = { version = "1.40.0", features = ["full", "time"] } toml = "0.8.19" tracing = "0.1.40" diff --git a/crankshaft/Cargo.toml b/crankshaft/Cargo.toml index a541078..f5e1541 100644 --- a/crankshaft/Cargo.toml +++ b/crankshaft/Cargo.toml @@ -4,6 +4,7 @@ version = "0.1.0" edition = "2021" [dependencies] +indicatif = { workspace = true } tes = { path = "../tes", version = "0.1.0" } async-trait = { workspace = true } bollard = { workspace = true } @@ -26,13 +27,13 @@ serde_yaml = { workspace = true } tokio = { workspace = true } toml = { workspace = true } tracing = { workspace = true } -tracing-subscriber ={ workspace = true } +tracing-subscriber = { workspace = true } url = { workspace = true } wdl-grammar = { workspace = true } wdl-ast = { workspace = true } wdl-analysis = { workspace = true } anyhow = { workspace = true } - + [lints.rust] missing_docs = "warn" nonstandard-style = "warn" diff --git a/crankshaft/src/engine.rs b/crankshaft/src/engine.rs index 26e9aae..52a9c2c 100644 --- a/crankshaft/src/engine.rs +++ b/crankshaft/src/engine.rs @@ -4,7 +4,9 @@ pub mod config; pub mod service; pub mod task; -use futures::future::join_all; +use futures::StreamExt; +use indicatif::ProgressBar; +use indicatif::ProgressStyle; pub use task::Task; use tracing::debug; @@ -47,8 +49,25 @@ impl Engine { } /// Runs all of the tasks scheduled in the engine. - pub async fn run(self) { - join_all(self.runner.tasks).await; + pub async fn run(&mut self) { + let task_completion_bar = ProgressBar::new(self.runner.tasks.len() as u64); + task_completion_bar.set_style( + ProgressStyle::with_template( + "{spinner:.cyan/blue} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {pos:>7}/{len:7} {msg}", + ) + .unwrap() + .progress_chars("#>-"), + ); + + let mut count = 1; + + while let Some(()) = self.runner.tasks.next().await { + task_completion_bar.set_message(format!("task #{}", count)); + task_completion_bar.inc(1); + count += 1; + } + + task_completion_bar.finish_with_message("All jobs complete."); } } From 6eef880ebe9ccaa22ef198d544592657476aeca8 Mon Sep 17 00:00:00 2001 From: Andrew Frantz Date: Fri, 6 Sep 2024 14:13:41 -0400 Subject: [PATCH 07/16] Update lsf.rs --- crankshaft/examples/lsf.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/crankshaft/examples/lsf.rs b/crankshaft/examples/lsf.rs index ede825b..94ab4da 100644 --- a/crankshaft/examples/lsf.rs +++ b/crankshaft/examples/lsf.rs @@ -1,4 +1,4 @@ -//! An example for runner a task using the Docker backend service. +//! An example for runner a task using the generic LSF backend service. use crankshaft::engine::task::Execution; use crankshaft::engine::Engine; @@ -45,8 +45,11 @@ async fn main() { .try_build() .unwrap(); - let receivers = (0..10) - .map(|_| engine.submit(task.clone()).callback) + let receivers = (0..1000) + .map(|i| { + println!("Submitting task number: {}", i); + engine.submit(task.clone()).callback + }) .collect::>(); engine.run().await; From 983a95a1045727fa1c6d40714764b1921a0ec17d Mon Sep 17 00:00:00 2001 From: Andrew Frantz Date: Fri, 6 Sep 2024 14:15:55 -0400 Subject: [PATCH 08/16] Update lsf.rs --- crankshaft/examples/lsf.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crankshaft/examples/lsf.rs b/crankshaft/examples/lsf.rs index 94ab4da..59c58f4 100644 --- a/crankshaft/examples/lsf.rs +++ b/crankshaft/examples/lsf.rs @@ -47,7 +47,7 @@ async fn main() { let receivers = (0..1000) .map(|i| { - println!("Submitting task number: {}", i); + println!("Submitting task number: {}", i + 1); engine.submit(task.clone()).callback }) .collect::>(); From ced18afe97c9952790f7dd3ca7be1992fa17d802 Mon Sep 17 00:00:00 2001 From: Braden Everson Date: Fri, 6 Sep 2024 13:22:32 -0500 Subject: [PATCH 09/16] Set progressbar to 0 initially to remove gap between running and progressbar actually showing. Remove debug statements from lsf example --- crankshaft/examples/lsf.rs | 5 +---- crankshaft/src/engine.rs | 1 + 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/crankshaft/examples/lsf.rs b/crankshaft/examples/lsf.rs index 59c58f4..a847ab2 100644 --- a/crankshaft/examples/lsf.rs +++ b/crankshaft/examples/lsf.rs @@ -46,10 +46,7 @@ async fn main() { .unwrap(); let receivers = (0..1000) - .map(|i| { - println!("Submitting task number: {}", i + 1); - engine.submit(task.clone()).callback - }) + .map(|_| engine.submit(task.clone()).callback) .collect::>(); engine.run().await; diff --git a/crankshaft/src/engine.rs b/crankshaft/src/engine.rs index 52a9c2c..ce9e630 100644 --- a/crankshaft/src/engine.rs +++ b/crankshaft/src/engine.rs @@ -60,6 +60,7 @@ impl Engine { ); let mut count = 1; + task_completion_bar.inc(0); while let Some(()) = self.runner.tasks.next().await { task_completion_bar.set_message(format!("task #{}", count)); From 75f71088f877857f4627ba86793233450c4ef904 Mon Sep 17 00:00:00 2001 From: Braden Everson Date: Fri, 6 Sep 2024 13:29:28 -0500 Subject: [PATCH 10/16] Enable steady ticking for progressbar --- crankshaft/src/engine.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/crankshaft/src/engine.rs b/crankshaft/src/engine.rs index ce9e630..2567043 100644 --- a/crankshaft/src/engine.rs +++ b/crankshaft/src/engine.rs @@ -4,6 +4,8 @@ pub mod config; pub mod service; pub mod task; +use std::time::Duration; + use futures::StreamExt; use indicatif::ProgressBar; use indicatif::ProgressStyle; @@ -61,6 +63,7 @@ impl Engine { let mut count = 1; task_completion_bar.inc(0); + task_completion_bar.enable_steady_tick(Duration::from_millis(100)); while let Some(()) = self.runner.tasks.next().await { task_completion_bar.set_message(format!("task #{}", count)); From 51e275c35b9e642dcedab16fd08a67dc3858e558 Mon Sep 17 00:00:00 2001 From: John McGuigan Date: Fri, 6 Sep 2024 14:38:37 -0400 Subject: [PATCH 11/16] Refactor docker usage and support input files (#20) * Refactor to separate container launch from running the command * Correctly set a temp dir for working directory * Working input * rebase * rm Cargo.lock in root folder --- Cargo.toml | 1 + crankshaft/Cargo.toml | 1 + crankshaft/examples/docker.rs | 41 +++- .../engine/service/runner/backend/docker.rs | 223 ++++++++++-------- crankshaft/src/engine/task/input.rs | 34 +++ 5 files changed, 197 insertions(+), 103 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 140f57c..996f5a9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,7 @@ regex = "1.10.6" reqwest = "0.12.7" serde = { version = "1.0.209", features = ["derive"] } serde_json = "1.0.128" +tar = "0.4.41" tempfile = "3.12.0" tes = { path = "../tes", version = "0.1.0" } serde_yaml = "0.8" # Optional, if you want YAML support diff --git a/crankshaft/Cargo.toml b/crankshaft/Cargo.toml index f5e1541..c9a04ea 100644 --- a/crankshaft/Cargo.toml +++ b/crankshaft/Cargo.toml @@ -33,6 +33,7 @@ wdl-grammar = { workspace = true } wdl-ast = { workspace = true } wdl-analysis = { workspace = true } anyhow = { workspace = true } +tar.workspace = true [lints.rust] missing_docs = "warn" diff --git a/crankshaft/examples/docker.rs b/crankshaft/examples/docker.rs index 6ed240f..5611a57 100644 --- a/crankshaft/examples/docker.rs +++ b/crankshaft/examples/docker.rs @@ -1,8 +1,11 @@ //! An example for runner a task using the Docker backend service. - +use crankshaft::engine::task::input; use crankshaft::engine::task::Execution; +use crankshaft::engine::task::Input; use crankshaft::engine::Engine; use crankshaft::engine::Task; +use std::io::Write; +use tempfile::NamedTempFile; use tracing_subscriber::fmt; use tracing_subscriber::layer::SubscriberExt as _; use tracing_subscriber::util::SubscriberInitExt as _; @@ -17,16 +20,38 @@ async fn main() { let mut engine = Engine::default(); + let mut temp_file = NamedTempFile::new().unwrap(); + writeln!(temp_file, "Hello, world from an input").unwrap(); + + // Get the path to the temp file + let temp_path = temp_file.path().to_path_buf(); + let input = Input::builder() + .contents(temp_path) + .path("/volA/test_input.txt") + .r#type(input::Type::File) + .try_build() + .unwrap(); + let task = Task::builder() .name("my-example-task") .description("a longer description") - .extend_executions(vec![Execution::builder() - .image("ubuntu") - .args(&[String::from("echo"), String::from("'hello, world!'")]) - .stdout("stdout.txt") - .stderr("stderr.txt") - .try_build() - .unwrap()]) + .extend_inputs(vec![input]) + .extend_executions(vec![ + Execution::builder() + .image("ubuntu") + .args(&[ + String::from("bash"), + String::from("-c"), + String::from("ls /volA"), + ]) + .try_build() + .unwrap(), + Execution::builder() + .image("ubuntu") + .args(&[String::from("cat"), String::from("/volA/test_input.txt")]) + .try_build() + .unwrap(), + ]) .extend_volumes(vec!["/volA".to_string(), "/volB".to_string()]) .try_build() .unwrap(); diff --git a/crankshaft/src/engine/service/runner/backend/docker.rs b/crankshaft/src/engine/service/runner/backend/docker.rs index 345dc28..5f3e3fb 100644 --- a/crankshaft/src/engine/service/runner/backend/docker.rs +++ b/crankshaft/src/engine/service/runner/backend/docker.rs @@ -1,5 +1,6 @@ //! A docker runner service. +use std::io::Cursor; use std::str::FromStr; use std::sync::Arc; @@ -7,22 +8,20 @@ use async_trait::async_trait; use bollard::container::Config; use bollard::container::CreateContainerOptions; use bollard::container::LogOutput; -use bollard::container::LogsOptions; +use bollard::container::RemoveContainerOptions; use bollard::container::StartContainerOptions; -use bollard::container::WaitContainerOptions; +use bollard::container::UploadToContainerOptions; use bollard::errors::Error; +use bollard::exec::CreateExecOptions; +use bollard::exec::StartExecResults; use bollard::models::HostConfig; use bollard::models::Mount; -use bollard::models::MountTypeEnum; -use bollard::secret::ContainerWaitResponse; use bollard::Docker; use futures::future::BoxFuture; use futures::FutureExt; -use futures::StreamExt; use futures::TryStreamExt; use nonempty::NonEmpty; use random_word::Lang; -use tempfile::TempDir; use tmp_mount::TmpMount; use tokio::sync::oneshot::Sender; @@ -30,6 +29,7 @@ use crate::engine::service::runner::backend::Backend; use crate::engine::service::runner::backend::ExecutionResult; use crate::engine::service::runner::backend::Reply; use crate::engine::task::Execution; +use crate::engine::task::Input; use crate::engine::task::Resources; use crate::engine::Task; @@ -85,72 +85,43 @@ impl Backend for Runner { for execution in task.executions() { let name = random_name(); - let tmp_dir = TempDir::new().unwrap(); - let workdir_path = tmp_dir.path().to_str().unwrap(); - - container_create( - &name, - execution, - task.resources(), - &mut client, - workdir_path, - &mounts[..], - ) - .await; + // Create the container + container_create(&name, execution, task.resources(), &mut client, &mounts[..]) + .await; + + // Start the container container_start(&name, &mut client).await; - let logs = configure_logs(&name, execution, &mut client); - let mut wait = configure_wait(&name, &mut client); - - // Process logs until they stop when container stops - let (stdout, stderr) = logs - .try_fold( - (String::with_capacity(1 << 8), String::with_capacity(1 << 8)), - |(mut stdout, mut stderr), log| async move { - match log { - LogOutput::StdOut { message } => { - stdout.push_str(&String::from_utf8_lossy(&message)); - } - LogOutput::StdErr { message } => { - stderr.push_str(&String::from_utf8_lossy(&message)); - } - _ => {} - } - Ok((stdout, stderr)) - }, + // Insert inputs + if let Some(inputs) = task.inputs() { + for input in inputs { + insert_input(&name, &mut client, input).await; + } + }; + + // Run a command + let exec_result = container_exec(&name, execution, &mut client).await; + + // Export outputs + + // remove the container + client + .remove_container( + &name, + Some(RemoveContainerOptions { + force: true, + ..Default::default() + }), ) .await - .unwrap_or_else(|e| { - eprintln!("Error collecting logs: {:?}", e); - (String::new(), String::new()) - }); - - // Process container stop - let status = wait - .next() - .await - .transpose() - .unwrap_or_else(|e| { - eprintln!("Error waiting for container: {:?}", e); - None - }) - .map(|response| response.status_code) - .unwrap_or(-1); - - client.remove_container(&name, None).await.unwrap(); - - let result = ExecutionResult { - status, - stdout, - stderr, - }; + .unwrap(); results = match results { Some(mut results) => { - results.push(result); + results.push(exec_result); Some(results) } - None => Some(NonEmpty::new(result)), + None => Some(NonEmpty::new(exec_result)), } } @@ -180,23 +151,11 @@ async fn container_create( execution: &Execution, resources: Option<&Resources>, client: &mut Arc, - workdir_path: &str, mounts: &[Mount], ) { - // Create a local tmpdir mount for the working directory - let workdir_mount = Mount { - target: Some(WORKDIR.to_string()), - source: Some(workdir_path.to_string()), - typ: Some(MountTypeEnum::BIND), - ..Default::default() - }; - // Configure Docker to use all mounts - let mut final_mounts = Vec::with_capacity(1 + mounts.len()); - final_mounts.push(workdir_mount); - final_mounts.extend_from_slice(mounts); let host_config = HostConfig { - mounts: Some(final_mounts), + mounts: Some(mounts.to_vec()), ..resources.map(HostConfig::from).unwrap_or_default() }; @@ -207,9 +166,9 @@ async fn container_create( let config = Config { image: Some(execution.image()), - cmd: Some(execution.args().into_iter().map(|s| s.as_str()).collect()), + tty: Some(true), host_config: Some(host_config), - working_dir: Some(WORKDIR), + working_dir: execution.workdir().map(String::as_str), ..Default::default() }; @@ -224,26 +183,100 @@ async fn container_start(name: &str, client: &mut Arc) { .unwrap(); } -/// Configures the log stream for the Docker container. -fn configure_logs( - name: &str, - execution: &Execution, - client: &mut Arc, -) -> impl futures::Stream> { - let options = LogsOptions:: { - follow: true, - stdout: execution.stdout().is_some(), - stderr: execution.stderr().is_some(), - ..Default::default() - }; +/// Puts input files into the container +async fn insert_input(name: &str, client: &mut Arc, input: &Input) { + let mut tar = tar::Builder::new(Vec::new()); + + let content = input.fetch().await.unwrap(); - client.logs(name, Some(options)) + let tar_path = input.path().trim_start_matches('/'); + + // Create a header with the full path + let mut header = tar::Header::new_gnu(); + header.set_path(tar_path).unwrap(); + header.set_size(content.len() as u64); + header.set_mode(0o644); // Set appropriate permissions + header.set_cksum(); + + // Append the file to the tar archive + tar.append_data(&mut header, tar_path, Cursor::new(content)) + .unwrap(); + + let tar_contents = tar.into_inner().unwrap(); + + // Upload to the root of the container + client + .upload_to_container( + name, + Some(UploadToContainerOptions { + path: "/", + ..Default::default() + }), + tar_contents.into(), + ) + .await + .unwrap(); } -/// Configures the waiting stream for the Docker container. -fn configure_wait( +/// Execute a command in container, returning an ExecutionResult +async fn container_exec( name: &str, + execution: &Execution, client: &mut Arc, -) -> impl futures::Stream> { - client.wait_container(name, None::>) +) -> ExecutionResult { + let exec_id = client + .create_exec( + name, + CreateExecOptions { + attach_stdout: Some(true), + attach_stderr: Some(true), + cmd: Some(execution.args().into_iter().map(|s| s.as_str()).collect()), + ..Default::default() + }, + ) + .await + .unwrap() + .id; + + let log_stream = if let StartExecResults::Attached { output, .. } = + client.start_exec(&exec_id, None).await.unwrap() + { + output + } else { + unreachable!(); + }; + + // Process logs + let (stdout, stderr) = log_stream + .try_fold( + (String::with_capacity(1 << 8), String::with_capacity(1 << 8)), + |(mut stdout, mut stderr), log| async move { + match log { + LogOutput::StdOut { message } => { + stdout.push_str(&String::from_utf8_lossy(&message)); + } + LogOutput::StdErr { message } => { + stderr.push_str(&String::from_utf8_lossy(&message)); + } + _ => {} + } + Ok((stdout, stderr)) + }, + ) + .await + .unwrap_or_else(|e| { + eprintln!("Error collecting logs: {:?}", e); + (String::new(), String::new()) + }); + + // Get return code + // Get the exit code + let exec_inspect = client.inspect_exec(&exec_id).await.unwrap(); + let status = exec_inspect.exit_code.unwrap_or(-1); + + ExecutionResult { + status, + stdout, + stderr, + } } diff --git a/crankshaft/src/engine/task/input.rs b/crankshaft/src/engine/task/input.rs index a0a9a60..d95bc25 100644 --- a/crankshaft/src/engine/task/input.rs +++ b/crankshaft/src/engine/task/input.rs @@ -2,8 +2,11 @@ mod builder; +use std::path::PathBuf; + pub use builder::Builder; +use tokio::{fs::File, io::AsyncReadExt}; use url::Url; /// A type of input. @@ -26,6 +29,13 @@ pub enum Contents { Literal(String), } +impl From for Contents { + fn from(value: PathBuf) -> Self { + let url = Url::from_file_path(value).unwrap_or_else(|_| panic!("Invalid path")); + Contents::URL(url) + } +} + /// An input to a task. #[derive(Clone, Debug)] pub struct Input { @@ -46,6 +56,11 @@ pub struct Input { } impl Input { + /// Gets a new builder for an [`Input`]. + pub fn builder() -> Builder { + Builder::default() + } + /// The name of the input (if it exists). pub fn name(&self) -> Option<&str> { self.name.as_deref() @@ -70,4 +85,23 @@ impl Input { pub fn r#type(&self) -> &Type { &self.r#type } + + /// Fetch file contents + pub async fn fetch(&self) -> Result, Box> { + match &self.contents { + Contents::Literal(content) => Ok(content.as_bytes().to_vec()), + Contents::URL(url) => match url.scheme() { + "file" => { + let path = url.to_file_path().map_err(|_| "Invalid file path")?; + let mut file = File::open(path).await?; + let mut contents = Vec::new(); + file.read_to_end(&mut contents).await?; + Ok(contents) + } + "http" | "https" => todo!("HTTP(S) URL support not implemented"), + "s3" => todo!("S3 URL support not implemented"), + _ => Err("Unsupported URL scheme".into()), + }, + } + } } From 0cf3e9c477cea12a436c55c098a72a7103126896 Mon Sep 17 00:00:00 2001 From: Clay McLeod Date: Fri, 6 Sep 2024 13:51:40 -0500 Subject: [PATCH 12/16] revise: final push --- Cargo.toml | 26 +++--- crankshaft/examples/docker.rs | 8 +- crankshaft/examples/full.rs | 80 ++++++++++++++++ crankshaft/examples/lsf.rs | 30 +++--- crankshaft/examples/multi.rs | 42 --------- crankshaft/examples/tes.rs | 14 ++- crankshaft/src/engine.rs | 92 +++++++++++++------ crankshaft/src/engine/service/runner.rs | 23 ++++- .../src/engine/service/runner/backend.rs | 10 +- .../engine/service/runner/backend/docker.rs | 43 ++++----- .../engine/service/runner/backend/generic.rs | 71 +++++++------- .../src/engine/service/runner/backend/tes.rs | 86 ++++++++++------- tes/Cargo.toml | 4 +- tes/src/lib.rs | 20 +++- tes/src/task.rs | 5 +- tes/src/task/executor.rs | 2 +- 16 files changed, 357 insertions(+), 199 deletions(-) create mode 100644 crankshaft/examples/full.rs delete mode 100644 crankshaft/examples/multi.rs diff --git a/Cargo.toml b/Cargo.toml index 996f5a9..f34cbc1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,37 +4,39 @@ members = ["crankshaft", "tes", "wdl-runtime"] [workspace.dependencies] anyhow = "1.0.86" -id-arena = "2.2.1" -indexmap = "2.5.0" -ordered-float = "4.2.2" -paste = "1.0.15" -petgraph = "0.6.5" -indicatif = "0.17.8" -string-interner = "0.17.0" -wdl-analysis = { git = "https://github.com/peterhuene/wdl", branch = "hackathon" } -wdl-ast = { git = "https://github.com/peterhuene/wdl", branch = "hackathon" } -wdl-grammar = { git = "https://github.com/peterhuene/wdl", branch = "hackathon" } async-trait = "0.1.82" bollard = "0.17.1" bytes = "1.7.1" +chrono = { version = "0.4.38", features = ["serde"] } clap = { version = "4.5.16", features = ["derive"] } config = "0.14.0" dirs = "5.0.1" futures = "0.3.30" +id-arena = "2.2.1" +indexmap = "2.5.0" +indicatif = "0.17.8" nonempty = "0.10.0" +ordered-float = "4.2.2" +paste = "1.0.15" +petgraph = "0.6.5" rand = "0.8.5" random_word = { version = "0.4.3", features = ["en"] } regex = "1.10.6" reqwest = "0.12.7" +reqwest-middleware = "0.3.3" +reqwest-retry = "0.6.1" serde = { version = "1.0.209", features = ["derive"] } serde_json = "1.0.128" +serde_yaml = "0.8" # Optional, if you want YAML support +string-interner = "0.17.0" tar = "0.4.41" tempfile = "3.12.0" tes = { path = "../tes", version = "0.1.0" } -serde_yaml = "0.8" # Optional, if you want YAML support tokio = { version = "1.40.0", features = ["full", "time"] } toml = "0.8.19" tracing = "0.1.40" tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } url = "2.5.2" -chrono = { version = "0.4.38", features = ["serde"] } +wdl-analysis = { git = "https://github.com/peterhuene/wdl", branch = "hackathon" } +wdl-ast = { git = "https://github.com/peterhuene/wdl", branch = "hackathon" } +wdl-grammar = { git = "https://github.com/peterhuene/wdl", branch = "hackathon" } diff --git a/crankshaft/examples/docker.rs b/crankshaft/examples/docker.rs index 5611a57..b72f0b0 100644 --- a/crankshaft/examples/docker.rs +++ b/crankshaft/examples/docker.rs @@ -1,11 +1,13 @@ //! An example for runner a task using the Docker backend service. +use std::io::Write; + use crankshaft::engine::task::input; use crankshaft::engine::task::Execution; use crankshaft::engine::task::Input; use crankshaft::engine::Engine; use crankshaft::engine::Task; -use std::io::Write; use tempfile::NamedTempFile; +use tracing::info; use tracing_subscriber::fmt; use tracing_subscriber::layer::SubscriberExt as _; use tracing_subscriber::util::SubscriberInitExt as _; @@ -57,12 +59,12 @@ async fn main() { .unwrap(); let receivers = (0..10) - .map(|_| engine.submit(task.clone()).callback) + .map(|_| engine.submit("docker", task.clone()).callback) .collect::>(); engine.run().await; for rx in receivers { - println!("Reply: {:?}", rx.await.unwrap()); + info!(runner = "Docker", reply = ?rx.await.unwrap()); } } diff --git a/crankshaft/examples/full.rs b/crankshaft/examples/full.rs new file mode 100644 index 0000000..64762d4 --- /dev/null +++ b/crankshaft/examples/full.rs @@ -0,0 +1,80 @@ +//! An example for runner that uses multiple backends at the same time. + +use crankshaft::engine::config::Config; +use crankshaft::engine::service::runner::backend::config::BackendType; +use crankshaft::engine::service::runner::backend::generic::GenericBackend; +use crankshaft::engine::service::runner::backend::tes::TesBackend; +use crankshaft::engine::task::Execution; +use crankshaft::engine::Engine; +use crankshaft::engine::Task; +use tracing::info; +use tracing_subscriber::fmt; +use tracing_subscriber::layer::SubscriberExt as _; +use tracing_subscriber::util::SubscriberInitExt as _; +use tracing_subscriber::EnvFilter; + +/// The environment variable name for the token. +const TOKEN_ENV_NAME: &str = "TOKEN"; + +#[tokio::main] +async fn main() { + tracing_subscriber::registry() + .with(fmt::layer()) + .with(EnvFilter::from_default_env()) + .init(); + + let token = std::env::var(TOKEN_ENV_NAME).ok(); + + let url = std::env::args().nth(1).expect("no url provided"); + let config = Config::new(std::env::args().nth(2).expect("no config provided")) + .expect("could not load from config file") + .backends + .into_iter() + .find(|backend| matches!(backend.kind, BackendType::Generic(_))) + .expect("at least one generic backend config to be present in the config"); + + let mut engine = Engine::empty() + .with_docker(false) + .expect("docker daemon to be alive and reachable") + .with_backend("tes", TesBackend::new(url, token)) + .with_backend( + "lsf", + GenericBackend::try_from(config) + .expect("parsing the backend configuration") + .to_runner(), + ); + + let task = Task::builder() + .name("my-example-task") + .description("a longer description") + .extend_executions(vec![Execution::builder() + .image("ubuntu") + .args(&[String::from("echo"), String::from("'hello, world!'")]) + .stdout("stdout.txt") + .stderr("stderr.txt") + .try_build() + .unwrap()]) + .try_build() + .unwrap(); + + let mut receivers = Vec::new(); + let runners = engine.runners().map(|s| s.to_owned()).collect::>(); + + for runner in &runners { + if runner == "lsf" { + continue; + } + + info!("creating jobs within {runner}"); + + for _ in 0..10 { + receivers.push(engine.submit(runner, task.clone()).callback); + } + } + + engine.run().await; + + for rx in receivers { + info!(reply = ?rx.await.unwrap()); + } +} diff --git a/crankshaft/examples/lsf.rs b/crankshaft/examples/lsf.rs index a847ab2..464fb66 100644 --- a/crankshaft/examples/lsf.rs +++ b/crankshaft/examples/lsf.rs @@ -1,8 +1,12 @@ //! An example for runner a task using the generic LSF backend service. +use crankshaft::engine::config::Config; +use crankshaft::engine::service::runner::backend::config::BackendType; +use crankshaft::engine::service::runner::backend::generic::GenericBackend; use crankshaft::engine::task::Execution; use crankshaft::engine::Engine; use crankshaft::engine::Task; +use tracing::info; use tracing_subscriber::fmt; use tracing_subscriber::layer::SubscriberExt as _; use tracing_subscriber::util::SubscriberInitExt as _; @@ -15,21 +19,15 @@ async fn main() { .with(EnvFilter::from_default_env()) .init(); - let config = - crankshaft::engine::config::Config::new("crankshaft/test/fixtures/config/lsf.toml") - .expect("Load from example config") - .backends[0] - .clone(); + let config = Config::new(std::env::args().nth(1).expect("no config provided")) + .expect("could not load from config file") + .backends + .into_iter() + .find(|backend| matches!(backend.kind, BackendType::Generic(_))) + .expect("at least one generic backend config to be present in the config"); - let backend = - crankshaft::engine::service::runner::backend::generic::GenericBackend::from_config(config) - .expect("Get backend from config"); - - let generic_runner = - crankshaft::engine::service::runner::backend::generic::GenericRunner::new(backend); - let runner = crankshaft::engine::service::runner::Runner::new(generic_runner); - - let mut engine = Engine::with_runner(runner); + let backend = GenericBackend::try_from(config).expect("parsing the backend configuration"); + let mut engine = Engine::default().with_backend("generic", backend.to_runner()); let task = Task::builder() .name("my-example-task") @@ -46,12 +44,12 @@ async fn main() { .unwrap(); let receivers = (0..1000) - .map(|_| engine.submit(task.clone()).callback) + .map(|_| engine.submit("generic", task.clone()).callback) .collect::>(); engine.run().await; for rx in receivers { - println!("Reply: {:?}", rx.await.unwrap()); + info!(runner = "LSF", reply = ?rx.await.unwrap()); } } diff --git a/crankshaft/examples/multi.rs b/crankshaft/examples/multi.rs deleted file mode 100644 index c8b339a..0000000 --- a/crankshaft/examples/multi.rs +++ /dev/null @@ -1,42 +0,0 @@ -//! An example for runner a task using multiple backend services. - -use crankshaft::engine::task::Execution; -use crankshaft::engine::Engine; -use crankshaft::engine::Task; -use tracing_subscriber::fmt; -use tracing_subscriber::layer::SubscriberExt as _; -use tracing_subscriber::util::SubscriberInitExt as _; -use tracing_subscriber::EnvFilter; - -#[tokio::main] -async fn main() { - tracing_subscriber::registry() - .with(fmt::layer()) - .with(EnvFilter::from_default_env()) - .init(); - - let mut engine = Engine::with_default_tes(); - - let task = Task::builder() - .name("my-example-task") - .description("a longer description") - .extend_executions(vec![Execution::builder() - .image("ubuntu") - .args(&[String::from("echo"), String::from("'hello, world!'")]) - .stdout("stdout.txt") - .stderr("stderr.txt") - .try_build() - .unwrap()]) - .try_build() - .unwrap(); - - let receivers = (0..10) - .map(|_| engine.submit(task.clone()).callback) - .collect::>(); - - engine.run().await; - - for rx in receivers { - println!("Reply: {:?}", rx.await.unwrap()); - } -} diff --git a/crankshaft/examples/tes.rs b/crankshaft/examples/tes.rs index e3d26f6..76a2700 100644 --- a/crankshaft/examples/tes.rs +++ b/crankshaft/examples/tes.rs @@ -1,13 +1,18 @@ //! An example for runner a task using the TES backend service. +use crankshaft::engine::service::runner::backend::tes::TesBackend; use crankshaft::engine::task::Execution; use crankshaft::engine::Engine; use crankshaft::engine::Task; +use tracing::info; use tracing_subscriber::fmt; use tracing_subscriber::layer::SubscriberExt as _; use tracing_subscriber::util::SubscriberInitExt as _; use tracing_subscriber::EnvFilter; +/// The environment variable name for the token. +const TOKEN_ENV_NAME: &str = "TOKEN"; + #[tokio::main] async fn main() { tracing_subscriber::registry() @@ -15,7 +20,10 @@ async fn main() { .with(EnvFilter::from_default_env()) .init(); - let mut engine = Engine::with_default_tes(); + let token = std::env::var(TOKEN_ENV_NAME).unwrap(); + + let url = std::env::args().nth(1).expect("no url provided"); + let mut engine = Engine::new_with_backend("tes", TesBackend::new(url, Some(token))); let task = Task::builder() .name("my-example-task") @@ -31,12 +39,12 @@ async fn main() { .unwrap(); let receivers = (0..10) - .map(|_| engine.submit(task.clone()).callback) + .map(|_| engine.submit("tes", task.clone()).callback) .collect::>(); engine.run().await; for rx in receivers { - println!("Reply: {:?}", rx.await.unwrap()); + info!(runner = "TES", reply = ?rx.await.unwrap()); } } diff --git a/crankshaft/src/engine.rs b/crankshaft/src/engine.rs index 2567043..64811ea 100644 --- a/crankshaft/src/engine.rs +++ b/crankshaft/src/engine.rs @@ -1,58 +1,98 @@ //! Engine. -pub mod config; -pub mod service; -pub mod task; - use std::time::Duration; +use futures::stream::FuturesUnordered; use futures::StreamExt; +use indexmap::IndexMap; use indicatif::ProgressBar; use indicatif::ProgressStyle; -pub use task::Task; -use tracing::debug; use crate::engine::service::runner::backend::docker; -use crate::engine::service::runner::backend::tes; +use crate::engine::service::runner::backend::docker::DockerBackend; +use crate::engine::service::runner::backend::Backend; use crate::engine::service::runner::Handle; use crate::engine::service::runner::Runner; +pub mod config; +pub mod service; +pub mod task; + +pub use task::Task; + +/// The runners stored within the engine. +type Runners = IndexMap; + /// An engine. #[derive(Debug)] pub struct Engine { - /// The task runner. - runner: Runner, + /// The task runner(s). + runners: Runners, } impl Engine { - /// Gets an engine with a generic [`Runner`]. - pub fn with_runner(runner: Runner) -> Self { - Self { runner } + /// Creates an empty engine. + pub fn empty() -> Self { + Self { + runners: Default::default(), + } + } + + /// Adds a [`Backend`] to the engine. + pub fn with_backend(mut self, name: impl Into, backend: impl Backend) -> Self { + let name = name.into(); + self.runners + .insert(name.clone(), Runner::new(name, backend)); + self + } + + /// Gets a new engine with a backend. + pub fn new_with_backend(name: impl Into, backend: impl Backend) -> Self { + Self::empty().with_backend(name, backend) + } + + /// Adds a docker backend to a [`Engine`]. + pub fn with_docker(self, cleanup: bool) -> docker::Result { + let backend = DockerBackend::try_new(cleanup)?; + Ok(self.with_backend(backend.default_name(), backend)) } - /// Gets an engine with a Docker backend. - pub fn with_docker() -> docker::Result { - let docker = docker::Runner::try_new()?; - Ok(Self::with_runner(Runner::new(docker))) + /// Gets a new engine with a default Docker backend. + pub fn new_with_docker(cleanup: bool) -> docker::Result { + Ok(Self::empty() + .with_docker(cleanup) + .expect("docker client to connect")) } - /// Gets an engine with a default TES backend. - pub fn with_default_tes() -> Self { - Self::with_runner(Runner::new(tes::Tes::default())) + /// Gets the names of the runners. + pub fn runners(&self) -> impl Iterator { + self.runners.keys().map(|key| key.as_ref()) } /// Submits a [`Task`] to be executed. /// /// A [`Handle`] is returned, which contains a channel that can be awaited /// for the result of the job. - pub fn submit(&mut self, task: Task) -> Handle { - debug!(task = ?task); - self.runner.submit(task) + pub fn submit(&mut self, name: impl AsRef, task: Task) -> Handle { + let name = name.as_ref(); + + let backend = self + .runners + .get(name) + .unwrap_or_else(|| panic!("backend not found: {name}")); + + backend.submit(task) } /// Runs all of the tasks scheduled in the engine. - pub async fn run(&mut self) { - let task_completion_bar = ProgressBar::new(self.runner.tasks.len() as u64); + pub async fn run(self) { + let mut futures = FuturesUnordered::new(); + + for (_, runner) in self.runners { + futures.extend(runner.tasks()); + } + + let task_completion_bar = ProgressBar::new(futures.len() as u64); task_completion_bar.set_style( ProgressStyle::with_template( "{spinner:.cyan/blue} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {pos:>7}/{len:7} {msg}", @@ -65,7 +105,7 @@ impl Engine { task_completion_bar.inc(0); task_completion_bar.enable_steady_tick(Duration::from_millis(100)); - while let Some(()) = self.runner.tasks.next().await { + while let Some(()) = futures.next().await { task_completion_bar.set_message(format!("task #{}", count)); task_completion_bar.inc(1); count += 1; @@ -77,6 +117,6 @@ impl Engine { impl Default for Engine { fn default() -> Self { - Self::with_docker().expect("could not initialize engine") + Self::empty().with_docker(true).unwrap() } } diff --git a/crankshaft/src/engine/service/runner.rs b/crankshaft/src/engine/service/runner.rs index 6af2172..9defbcb 100644 --- a/crankshaft/src/engine/service/runner.rs +++ b/crankshaft/src/engine/service/runner.rs @@ -1,8 +1,10 @@ //! Task runner services. +use futures::future::join_all; use futures::future::BoxFuture; use futures::stream::FuturesUnordered; use tokio::sync::oneshot::Receiver; +use tracing::trace; use crate::engine::service::runner::backend::Backend; use crate::engine::service::runner::backend::Reply; @@ -20,6 +22,9 @@ pub struct Handle { /// A generic task runner. #[derive(Debug)] pub struct Runner { + /// The name of the backend. + name: String, + /// The task runner itself. backend: Box, @@ -29,8 +34,9 @@ pub struct Runner { impl Runner { /// Creates a new [`Runner`]. - pub fn new(backend: impl Backend) -> Self { + pub fn new(name: String, backend: impl Backend) -> Self { Self { + name, backend: Box::new(backend), tasks: Default::default(), } @@ -38,9 +44,22 @@ impl Runner { /// Submits a task to be executed by the backend. pub fn submit(&self, task: Task) -> Handle { + trace!(backend = ?self.backend, task = ?task); + let (tx, rx) = tokio::sync::oneshot::channel(); - self.tasks.push(Box::pin(self.backend.run(task, tx))); + self.tasks + .push(Box::pin(self.backend.run(self.name.clone(), task, tx))); Handle { callback: rx } } + + /// Gets the tasks from the runner. + pub fn tasks(self) -> impl Iterator> { + self.tasks.into_iter() + } + + /// Runs all of the tasks scheduled in the [`Runner`]. + pub async fn run(self) { + join_all(self.tasks).await; + } } diff --git a/crankshaft/src/engine/service/runner/backend.rs b/crankshaft/src/engine/service/runner/backend.rs index 1182469..09b6aee 100644 --- a/crankshaft/src/engine/service/runner/backend.rs +++ b/crankshaft/src/engine/service/runner/backend.rs @@ -24,7 +24,7 @@ pub type Result = std::result::Result; #[derive(Debug)] pub struct ExecutionResult { /// The exit code. - pub status: i64, + pub status: u64, /// The contents of standard out. pub stdout: String, @@ -36,6 +36,9 @@ pub struct ExecutionResult { /// A reply from a backend when a task is completed. #[derive(Debug)] pub struct Reply { + /// The name of the backend that ran this. + pub backend: String, + /// The results from each execution. pub executions: Option>, } @@ -43,6 +46,9 @@ pub struct Reply { /// An execution backend. #[async_trait] pub trait Backend: Debug + Send + 'static { + /// Gets the default name for the backend. + fn default_name(&self) -> &'static str; + /// Runs a task in a backend; - fn run(&self, task: Task, cb: Sender) -> BoxFuture<'static, ()>; + fn run(&self, name: String, task: Task, cb: Sender) -> BoxFuture<'static, ()>; } diff --git a/crankshaft/src/engine/service/runner/backend/docker.rs b/crankshaft/src/engine/service/runner/backend/docker.rs index 5f3e3fb..66f8ffb 100644 --- a/crankshaft/src/engine/service/runner/backend/docker.rs +++ b/crankshaft/src/engine/service/runner/backend/docker.rs @@ -8,7 +8,6 @@ use async_trait::async_trait; use bollard::container::Config; use bollard::container::CreateContainerOptions; use bollard::container::LogOutput; -use bollard::container::RemoveContainerOptions; use bollard::container::StartContainerOptions; use bollard::container::UploadToContainerOptions; use bollard::errors::Error; @@ -49,25 +48,36 @@ pub type Result = std::result::Result; /// A local execution backend. #[derive(Debug)] -pub struct Runner { +pub struct DockerBackend { /// A handle to the inner docker client. client: Arc, + + /// Whether or not to clean up containers. + cleanup: bool, } -impl Runner { +impl DockerBackend { /// Attempts to create a new [`Docker`]. /// /// Note that, currently, we connect [using defaults](Docker::connect_with_defaults). - pub fn try_new() -> Result { + pub fn try_new(cleanup: bool) -> Result { let inner = Docker::connect_with_defaults().map(Arc::new)?; - Ok(Self { client: inner }) + Ok(Self { + client: inner, + cleanup, + }) } } #[async_trait] -impl Backend for Runner { - fn run(&self, task: Task, cb: Sender) -> BoxFuture<'static, ()> { +impl Backend for DockerBackend { + fn default_name(&self) -> &'static str { + "docker" + } + + fn run(&self, name: String, task: Task, cb: Sender) -> BoxFuture<'static, ()> { let mut client = self.client.clone(); + let cleanup = self.cleanup; async move { let mut results: Option> = None; @@ -102,19 +112,9 @@ impl Backend for Runner { // Run a command let exec_result = container_exec(&name, execution, &mut client).await; - // Export outputs - - // remove the container - client - .remove_container( - &name, - Some(RemoveContainerOptions { - force: true, - ..Default::default() - }), - ) - .await - .unwrap(); + if cleanup { + client.remove_container(&name, None).await.unwrap(); + } results = match results { Some(mut results) => { @@ -130,6 +130,7 @@ impl Backend for Runner { // client wasn't interested in the response, so we don't care about // this error. let _ = cb.send(Reply { + backend: name, executions: Some(results.expect("at least one execution to be run")), }); } @@ -272,7 +273,7 @@ async fn container_exec( // Get return code // Get the exit code let exec_inspect = client.inspect_exec(&exec_id).await.unwrap(); - let status = exec_inspect.exit_code.unwrap_or(-1); + let status = exec_inspect.exit_code.unwrap_or(-1) as u64; ExecutionResult { status, diff --git a/crankshaft/src/engine/service/runner/backend/generic.rs b/crankshaft/src/engine/service/runner/backend/generic.rs index 08716b4..303e96f 100644 --- a/crankshaft/src/engine/service/runner/backend/generic.rs +++ b/crankshaft/src/engine/service/runner/backend/generic.rs @@ -8,15 +8,15 @@ use nonempty::NonEmpty; use regex; use tokio::sync::oneshot::Sender; -use crate::engine::service::runner::backend; use crate::engine::service::runner::backend::config::substitute_placeholders; use crate::engine::service::runner::backend::config::BackendType; use crate::engine::service::runner::backend::Backend; +use crate::engine::service::runner::backend::Config; use crate::engine::service::runner::backend::ExecutionResult; use crate::engine::service::runner::backend::Reply; use crate::engine::Task; -/// A generic backend +/// A generic backend. #[derive(Debug)] pub struct GenericBackend { /// All runtime attributes @@ -41,25 +41,6 @@ pub struct GenericBackend { } impl GenericBackend { - /// Generates a generic backend from its designated configuration - pub fn from_config(conf: backend::Config) -> Option { - // If BackendConfig is not of type generic, return None(Or Err) - if let BackendType::Generic(generic_backend) = conf.kind { - Some(Self { - runtime_attributes: conf.runtime_attrs, - default_cpu: conf.default_cpu, - default_ram_mb: conf.default_ram, - submit: generic_backend.submit, - job_id_regex: generic_backend.job_id_regex, - monitor: generic_backend.monitor, - monitor_frequency: generic_backend.monitor_frequency, - kill: generic_backend.kill, - }) - } else { - None - } - } - /// Generates a process result from an incoming task pub async fn process_command( &self, @@ -120,33 +101,49 @@ impl GenericBackend { // TODO: collect job output. In meantime, just return the status code // and the stdout/stderr of the submit command Some(ExecutionResult { - status: submit_output.status.code()? as i64, + status: submit_output.status.code()? as u64, stdout: submit_stdout, stderr: String::from_utf8(submit_output.stderr).ok()?, }) } /// Wraps the GenericBackend in an Arc and returns the GenericRunner from it - pub fn to_runner(self) -> GenericRunner { - GenericRunner { + pub fn to_runner(self) -> Runner { + Runner { client: Arc::new(self), } } +} + +impl TryFrom for GenericBackend { + type Error = (); - /// Generates a generic backend from a config file path - pub fn from_config_file() { - todo!() + fn try_from(value: Config) -> Result { + if let BackendType::Generic(generic_backend) = value.kind { + Ok(Self { + runtime_attributes: value.runtime_attrs, + default_cpu: value.default_cpu, + default_ram_mb: value.default_ram, + submit: generic_backend.submit, + job_id_regex: generic_backend.job_id_regex, + monitor: generic_backend.monitor, + monitor_frequency: generic_backend.monitor_frequency, + kill: generic_backend.kill, + }) + } else { + Err(()) + } } } /// A generic backend runner #[derive(Debug)] -pub struct GenericRunner { - /// An Arc to the underlying Backend +pub struct Runner { + /// The underlying backend. client: Arc, } -impl GenericRunner { +impl Runner { /// Creates a new GenericRunner from a GenericBackend pub fn new(client: GenericBackend) -> Self { Self { @@ -156,8 +153,17 @@ impl GenericRunner { } #[async_trait] -impl Backend for GenericRunner { - fn run(&self, task: Task, cb: Sender) -> futures::future::BoxFuture<'static, ()> { +impl Backend for Runner { + fn default_name(&self) -> &'static str { + unimplemented!("you must provide a backend name for a generic runner!") + } + + fn run( + &self, + name: String, + task: Task, + cb: Sender, + ) -> futures::future::BoxFuture<'static, ()> { let client = self.client.clone(); async move { @@ -202,6 +208,7 @@ impl Backend for GenericRunner { } let _ = cb.send(Reply { + backend: name, executions: Some(results.expect("at least one execution to be run")), }); } diff --git a/crankshaft/src/engine/service/runner/backend/tes.rs b/crankshaft/src/engine/service/runner/backend/tes.rs index cd9205a..5508f46 100644 --- a/crankshaft/src/engine/service/runner/backend/tes.rs +++ b/crankshaft/src/engine/service/runner/backend/tes.rs @@ -6,11 +6,13 @@ use std::time::Duration; use async_trait::async_trait; use futures::future::BoxFuture; use futures::FutureExt as _; +use nonempty::NonEmpty; use reqwest::header; use tes::Client; use tokio::sync::oneshot::Sender; use crate::engine::service::runner::backend::Backend; +use crate::engine::service::runner::backend::ExecutionResult; use crate::engine::service::runner::backend::Reply; use crate::engine::Task; use crate::BoxedError; @@ -26,51 +28,53 @@ pub type Result = std::result::Result; /// A local execution backend. #[derive(Debug)] -pub struct Tes { +pub struct TesBackend { /// A handle to the inner TES client. client: Arc, } -impl Tes { - /// Attempts to create a new [`Tes`]. - /// - /// Note that, currently, we connect [using defaults](Docker::connect_with_defaults). - pub fn try_new(url: impl Into) -> Result { +impl TesBackend { + /// Creates a new [`TesBackend`]. + pub fn new(url: impl Into, token: Option>) -> Self { let url = url.into(); let mut headers = header::HeaderMap::new(); - headers.insert( - "X-Pinggy-No-Screen", - header::HeaderValue::from_static("value"), - ); + + if let Some(token) = token { + headers.insert( + "Authorization", + header::HeaderValue::from_str(&format!("Basic {}", token.into())).unwrap(), + ); + } let inner = Client::new(&url, headers).unwrap(); - Ok(Self { + Self { client: Arc::new(inner), - }) + } } } -impl Default for Tes { - fn default() -> Self { - Self::try_new("http://localhost:8080/ga4gh/tes/v1/").unwrap() +#[async_trait] +impl Backend for TesBackend { + fn default_name(&self) -> &'static str { + unimplemented!("you must provide a backend name for a TES runner!") } -} -#[async_trait] -impl Backend for Tes { - fn run(&self, _: Task, cb: Sender) -> BoxFuture<'static, ()> { + fn run(&self, name: String, task: Task, cb: Sender) -> BoxFuture<'static, ()> { let client = self.client.clone(); let task = tes::Task { - name: Some("Hello World".to_string()), - description: Some("Hello World, inspired by Funnel's most basic example".to_string()), - executors: vec![tes::task::Executor { - image: "alpine".to_string(), - command: vec!["echo".to_string(), "TESK says: Hello World".to_string()], - ..Default::default() - }], + name: task.name().map(|v| v.to_owned()), + description: task.description().map(|v| v.to_owned()), + executors: task + .executions() + .map(|execution| tes::task::Executor { + image: execution.image().to_owned(), + command: execution.args().into_iter().cloned().collect::>(), + ..Default::default() + }) + .collect::>(), ..Default::default() }; @@ -78,20 +82,36 @@ impl Backend for Tes { let task_id = client.create_task(task).await.unwrap(); loop { - if let Ok(result) = client.get_task(&task_id).await { - if let Some(ref state) = result.state { + if let Ok(task) = client.get_task(&task_id).await { + if let Some(ref state) = task.state { if !state.is_executing() { - break; + let mut results = task + .logs + .unwrap() + .into_iter() + .flat_map(|task| task.logs) + .map(|log| ExecutionResult { + status: log.exit_code.unwrap_or_default() as u64, + stdout: log.stdout.unwrap_or_default(), + stderr: log.stderr.unwrap_or_default(), + }); + + let mut executions = NonEmpty::new(results.next().unwrap()); + executions.extend(results); + + let reply = Reply { + backend: name, + executions: Some(executions), + }; + + let _ = cb.send(reply); + return; } tokio::time::sleep(Duration::from_millis(200)).await; } } } - - let reply = Reply { executions: None }; - - let _ = cb.send(reply); } .boxed() } diff --git a/tes/Cargo.toml b/tes/Cargo.toml index 1877715..792940a 100644 --- a/tes/Cargo.toml +++ b/tes/Cargo.toml @@ -6,6 +6,8 @@ edition = "2021" [dependencies] chrono = { workspace = true } reqwest = { workspace = true } +reqwest-middleware = { workspace = true } +reqwest-retry = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } -tokio ={ workspace = true } +tokio = { workspace = true } diff --git a/tes/src/lib.rs b/tes/src/lib.rs index 197eac9..feab577 100644 --- a/tes/src/lib.rs +++ b/tes/src/lib.rs @@ -2,8 +2,9 @@ use reqwest::header; use reqwest::header::HeaderMap; -use reqwest::Error; use reqwest::StatusCode; +use reqwest_middleware::ClientWithMiddleware; +use reqwest_middleware::Error; pub mod responses; pub mod task; @@ -20,7 +21,7 @@ pub struct Client { url: String, /// The client. - client: reqwest::Client, + client: ClientWithMiddleware, } impl Client { @@ -29,10 +30,18 @@ impl Client { let url = url.into(); let headers = headers.into(); - let client = reqwest::Client::builder() + let client = reqwest::ClientBuilder::new() .default_headers(headers) .build()?; + let retry_policy = + reqwest_retry::policies::ExponentialBackoff::builder().build_with_max_retries(3); + let client = reqwest_middleware::ClientBuilder::new(client) + .with(reqwest_retry::RetryTransientMiddleware::new_with_policy( + retry_policy, + )) + .build(); + Ok(Self { url, client }) } @@ -70,6 +79,7 @@ impl Client { .await?; let text = &res.text().await?; + Ok(serde_json::from_str::(text) .unwrap() .id) @@ -79,7 +89,9 @@ impl Client { pub async fn get_task(&self, id: &str) -> Result { let url = format!("{}tasks/{}?view=FULL", self.url, id); let res = self.client.get(&url).send().await?; - let task: Task = serde_json::from_str(&res.text().await?).unwrap(); + let text = &res.text().await?; + let task: Task = serde_json::from_str(text).unwrap(); + Ok(task) } } diff --git a/tes/src/task.rs b/tes/src/task.rs index fac7fe0..203c7ba 100644 --- a/tes/src/task.rs +++ b/tes/src/task.rs @@ -149,7 +149,7 @@ pub struct TaskLog { pub end_time: Option>, /// The output file logs. - pub outputs: Vec, + pub outputs: Option>, /// The system logs. pub system_logs: Option, @@ -187,4 +187,7 @@ pub struct Task { /// The tags. pub tags: Option>, + + /// The logs. + pub logs: Option>, } diff --git a/tes/src/task/executor.rs b/tes/src/task/executor.rs index 7019fdb..4ebfb07 100644 --- a/tes/src/task/executor.rs +++ b/tes/src/task/executor.rs @@ -50,5 +50,5 @@ pub struct Log { pub stderr: Option, /// The exit code. - pub exit_code: u32, + pub exit_code: Option, } From 515b01f8e3259e92e5c2fcbc5d17bc8d6f3269d5 Mon Sep 17 00:00:00 2001 From: Braden Everson Date: Fri, 6 Sep 2024 13:53:18 -0500 Subject: [PATCH 13/16] Remove Progress Bar on All Tasks Completing (#22) Remove progress bar on task run completion --- crankshaft/src/engine.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/crankshaft/src/engine.rs b/crankshaft/src/engine.rs index 64811ea..4e12564 100644 --- a/crankshaft/src/engine.rs +++ b/crankshaft/src/engine.rs @@ -110,8 +110,6 @@ impl Engine { task_completion_bar.inc(1); count += 1; } - - task_completion_bar.finish_with_message("All jobs complete."); } } From 829261ecb89977e8f4a5bba7628c217145c2dc31 Mon Sep 17 00:00:00 2001 From: Peter Huene Date: Fri, 6 Sep 2024 14:52:45 -0500 Subject: [PATCH 14/16] feat: implement more of the `sprocket run` command. (#24) --- Cargo.toml | 6 +- crankshaft/Cargo.toml | 3 + crankshaft/demo.json | 3 + crankshaft/demo.wdl | 21 ++ crankshaft/src/bin/sprocket.rs | 323 +++++++++++++++++- .../engine/service/runner/backend/docker.rs | 5 + wdl-runtime/src/runtime.rs | 37 ++ wdl-runtime/src/task.rs | 65 +++- 8 files changed, 437 insertions(+), 26 deletions(-) create mode 100644 crankshaft/demo.json create mode 100644 crankshaft/demo.wdl diff --git a/Cargo.toml b/Cargo.toml index f34cbc1..1b8d6df 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,5 +38,7 @@ tracing = "0.1.40" tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } url = "2.5.2" wdl-analysis = { git = "https://github.com/peterhuene/wdl", branch = "hackathon" } -wdl-ast = { git = "https://github.com/peterhuene/wdl", branch = "hackathon" } -wdl-grammar = { git = "https://github.com/peterhuene/wdl", branch = "hackathon" } +wdl-ast = { git = "https://github.com/peterhuene/wdl", branch = "hackathon", features = ["codespan"] } +wdl-grammar = { git = "https://github.com/peterhuene/wdl", branch = "hackathon", features = ["codespan"] } +codespan-reporting = "0.11.1" +colored = "2.1.0" diff --git a/crankshaft/Cargo.toml b/crankshaft/Cargo.toml index c9a04ea..8915f17 100644 --- a/crankshaft/Cargo.toml +++ b/crankshaft/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" [dependencies] indicatif = { workspace = true } tes = { path = "../tes", version = "0.1.0" } +wdl-runtime = { path = "../wdl-runtime", version = "0.1.0" } async-trait = { workspace = true } bollard = { workspace = true } bytes = { workspace = true } @@ -34,6 +35,8 @@ wdl-ast = { workspace = true } wdl-analysis = { workspace = true } anyhow = { workspace = true } tar.workspace = true +codespan-reporting = { workspace = true } +colored = { workspace = true } [lints.rust] missing_docs = "warn" diff --git a/crankshaft/demo.json b/crankshaft/demo.json new file mode 100644 index 0000000..bd9a9d5 --- /dev/null +++ b/crankshaft/demo.json @@ -0,0 +1,3 @@ +{ + "url": "https://www.encodeproject.org/files/ENCFF863PGO/@@download/ENCFF863PGO.bam" +} diff --git a/crankshaft/demo.wdl b/crankshaft/demo.wdl new file mode 100644 index 0000000..20af475 --- /dev/null +++ b/crankshaft/demo.wdl @@ -0,0 +1,21 @@ +## This is a demonstration of evaluating WDL and running a task using crankshaft. + +version 1.2 + +task samtools_flagstat { + input { + String url + } + + command <<< + samtools flagstat <(wget -O - -q '~{url}' | samtools view -h | head -n 100000) + >>> + + requirements { + container: "quay.io/biocontainers/samtools:1.19.2--h50ea8bc_0" + } + + output { + String stats = read_string(stdout()) + } +} diff --git a/crankshaft/src/bin/sprocket.rs b/crankshaft/src/bin/sprocket.rs index 1a26c56..1d069cf 100644 --- a/crankshaft/src/bin/sprocket.rs +++ b/crankshaft/src/bin/sprocket.rs @@ -1,44 +1,333 @@ //! A testing implementation for a `sprocket run` command. -use anyhow::Result; +use anyhow::{anyhow, bail, Context, Result}; use clap::{Arg, Command}; -use std::path::PathBuf; -use wdl_analysis::Analyzer; +use codespan_reporting::{ + files::SimpleFile, + term::{ + emit, + termcolor::{ColorChoice, StandardStream}, + Config, + }, +}; +use colored::Colorize; +use crankshaft::engine::{ + task::{ + input::{self, Contents}, + Execution, Input, + }, + Engine, Task, +}; +use std::{borrow::Cow, collections::HashMap, fs, io::IsTerminal, path::PathBuf}; +use tempfile::tempdir; +use wdl_analysis::{AnalysisResult, Analyzer}; +use wdl_ast::{AstToken, Diagnostic, Severity, SyntaxNode}; +use wdl_runtime::{Runtime, TaskEvaluator, Value}; + +/// Emits the given diagnostics to the output stream. +/// +/// The use of color is determined by the presence of a terminal. +/// +/// In the future, we might want the color choice to be a CLI argument. +fn emit_diagnostics(path: &str, source: &str, diagnostics: &[Diagnostic]) -> Result<()> { + let file = SimpleFile::new(path, source); + let mut stream = StandardStream::stdout(if std::io::stdout().is_terminal() { + ColorChoice::Auto + } else { + ColorChoice::Never + }); + for diagnostic in diagnostics.iter() { + emit( + &mut stream, + &Config::default(), + &file, + &diagnostic.to_codespan(), + ) + .context("failed to emit diagnostic")?; + } + + Ok(()) +} #[tokio::main] -async fn main() -> Result<()> { +async fn main() { + if let Err(e) = inner_main().await { + eprintln!( + "{error}: {e:?}", + error = if std::io::stderr().is_terminal() { + "error".red().bold() + } else { + "error".normal() + } + ); + std::process::exit(1); + } +} + +/// An inner main that returns result. +/// +/// This exists so we can do custom error handling instead of returning `Result` from `main`. +async fn inner_main() -> Result<()> { let matches = Command::new("sprocket") .version("1.0") .about("Runs a WDL task") .subcommand( - Command::new("run").about("Runs a WDL task").arg( - Arg::new("PATH") - .help("The path to the WDL file defining the task to run") - .required(true), - ), + Command::new("run") + .about("Runs a WDL task") + .arg( + Arg::new("PATH") + .help("The path to the WDL file defining the task to run") + .required(true), + ) + .arg( + Arg::new("TASK") + .long("task") + .help("The name of the task to run") + .required(true), + ) + .arg( + Arg::new("INPUTS") + .long("inputs") + .help("The inputs JSON file"), + ), ) .arg_required_else_help(true) .get_matches(); if let Some(matches) = matches.subcommand_matches("run") { let task_file = matches.get_one::("PATH").unwrap(); - analyze_wdl(PathBuf::from(task_file)).await?; + let task_name = matches.get_one::("TASK").unwrap(); + let inputs_file = matches.get_one::("INPUTS"); + let result = analyze_wdl(PathBuf::from(task_file)).await?; + + let document = result + .parse_result() + .document() + .expect("should have a parsed document"); + + match document.ast() { + wdl_ast::Ast::Unsupported => { + panic!("should not have parsed an unsupported document without error") + } + wdl_ast::Ast::V1(ast) => { + let task = ast + .tasks() + .find(|t| t.name().as_str() == task_name) + .ok_or_else(|| { + anyhow!("document does not contain a task named `{task_name}`") + })?; + let mut runtime = Runtime::new(result.scope()); + let evaluator = TaskEvaluator::new(task); + + let inputs = if let Some(inputs_file) = inputs_file { + read_inputs(&mut runtime, inputs_file)? + } else { + Default::default() + }; + + match evaluator.evaluate(&mut runtime, &inputs, "/tmp") { + Ok(evaluated) => { + let container = match evaluated + .requirements() + .get("container") + .or_else(|| evaluated.requirements().get("docker")) + { + Some(container) => container.unwrap_string(&runtime), + None => { + bail!("task `{task_name}` is missing a `container` requirement"); + } + }; + + let input = Input::builder() + .contents(Contents::Literal(evaluated.command().to_string())) + .path("/exec/command") + .r#type(input::Type::File) + .try_build() + .unwrap(); + + let mut engine = Engine::default(); + let task = Task::builder() + .name(task_name) + .extend_inputs([input]) + .extend_executions([Execution::builder() + .image(container) + .args(["bash", "-C", "/exec/command"]) + .stdout("stdout.txt") + .stderr("stderr.txt") + .try_build() + .context("failed to build execution definition")?]) + .try_build() + .context("failed to build task definition")?; + + let receivers = (0..1) + .map(|_| engine.submit("docker", task.clone()).callback) + .collect::>(); + + engine.run().await; + + for rx in receivers { + let reply = rx.await.expect("failed to receive reply"); + let exec_result = + &reply.executions.expect("should have execution result")[0]; + if exec_result.status != 0 { + bail!( + "task failed with exit code {status}:\n{stderr}", + status = exec_result.status, + stderr = exec_result.stderr + ); + } + + let dir = tempdir().context("failed to create temp directory")?; + let stdout = dir.path().join("stdout"); + fs::write(&stdout, &exec_result.stdout).with_context(|| { + format!( + "failed to write stdout to `{stdout}`", + stdout = stdout.display() + ) + })?; + + let stderr = dir.path().join("stderr"); + fs::write(&stderr, &exec_result.stderr).with_context(|| { + format!( + "failed to write stderr to `{stderr}`", + stderr = stderr.display() + ) + })?; + + match evaluated.outputs(&mut runtime, stdout, stderr) { + Ok(outputs) => { + for (name, value) in outputs { + println!( + "Output `{name}`:\n{value}", + name = name.as_ref().as_str(), + value = value.display(&runtime) + ); + } + } + Err(diagnostic) => { + emit_diagnostics( + task_file, + &result + .parse_result() + .root() + .map(|n| { + SyntaxNode::new_root(n.clone()).text().to_string() + }) + .unwrap_or(String::new()), + &[diagnostic], + )?; + + bail!("aborting due to evaluation error"); + } + } + } + } + Err(diagnostic) => { + emit_diagnostics( + task_file, + &result + .parse_result() + .root() + .map(|n| SyntaxNode::new_root(n.clone()).text().to_string()) + .unwrap_or(String::new()), + &[diagnostic], + )?; + + bail!("aborting due to evaluation error"); + } + } + } + } } Ok(()) } +/// Reads task inputs from a given JSON file. +fn read_inputs(runtime: &mut Runtime<'_>, inputs_file: &str) -> Result> { + let contents = &fs::read_to_string(inputs_file) + .with_context(|| format!("failed to read inputs file `{inputs_file}`"))?; + let inputs: serde_json::Value = serde_json::from_str(contents) + .with_context(|| format!("failed to deserialize JSON inputs file `{inputs_file}`"))?; + let object = inputs + .as_object() + .with_context(|| format!("inputs file `{inputs_file}` is not a JSON object"))?; + + let mut inputs = HashMap::new(); + for (name, value) in object.iter() { + let value = match value { + serde_json::Value::Bool(v) => (*v).into(), + serde_json::Value::Number(v) => v + .as_i64() + .with_context(|| { + format!("input value `{name}` cannot be represented as a 64-bit signed integer") + })? + .into(), + serde_json::Value::String(s) => runtime.new_string(s), + _ => bail!("input value `{name}` has an unsupported type"), + }; + + inputs.insert(name.clone(), value); + } + + Ok(inputs) +} + /// Analyzes the given WDL document. -async fn analyze_wdl(wdl_path: PathBuf) -> Result<()> { +async fn analyze_wdl(wdl_path: PathBuf) -> Result { let analyzer = Analyzer::new(|_: (), _, _, _| async {}); - analyzer.add_documents(vec![wdl_path]).await?; - let results = analyzer.analyze(()).await?; + analyzer.add_documents(vec![wdl_path.clone()]).await?; + let mut results = analyzer.analyze(()).await?; - for result in results { - for diagnostic in result.diagnostics() { - println!("{:?}: {}", diagnostic.severity(), diagnostic.message()); + let mut result_index = None; + let mut error_count = 0; + let cwd = std::env::current_dir().ok(); + for (index, result) in results.iter().enumerate() { + let path = result.uri().to_file_path().ok(); + + // Attempt to strip the CWD from the result path + let path = match (&cwd, &path) { + // Use the id itself if there is no path + (_, None) => result.uri().as_str().into(), + // Use just the path if there's no CWD + (None, Some(path)) => path.to_string_lossy(), + // Strip the CWD from the path + (Some(cwd), Some(path)) => path.strip_prefix(cwd).unwrap_or(path).to_string_lossy(), + }; + + if path == wdl_path.to_string_lossy() { + result_index = Some(index); + } + + let diagnostics: Cow<'_, [Diagnostic]> = match result.parse_result().error() { + Some(e) => vec![Diagnostic::error(format!("failed to read `{path}`: {e:#}"))].into(), + None => result.diagnostics().into(), + }; + + if !diagnostics.is_empty() { + emit_diagnostics( + &path, + &result + .parse_result() + .root() + .map(|n| SyntaxNode::new_root(n.clone()).text().to_string()) + .unwrap_or(String::new()), + &diagnostics, + )?; + + error_count += diagnostics + .iter() + .filter(|d| d.severity() == Severity::Error) + .count(); } } - Ok(()) + if error_count > 0 { + bail!( + "aborting due to previous {error_count} error{s}", + s = if error_count == 1 { "" } else { "s" } + ); + } + + Ok(results.swap_remove(result_index.expect("should have seen result for requested file"))) } diff --git a/crankshaft/src/engine/service/runner/backend/docker.rs b/crankshaft/src/engine/service/runner/backend/docker.rs index 66f8ffb..1f1e162 100644 --- a/crankshaft/src/engine/service/runner/backend/docker.rs +++ b/crankshaft/src/engine/service/runner/backend/docker.rs @@ -7,6 +7,7 @@ use std::sync::Arc; use async_trait::async_trait; use bollard::container::Config; use bollard::container::CreateContainerOptions; +use bollard::container::KillContainerOptions; use bollard::container::LogOutput; use bollard::container::StartContainerOptions; use bollard::container::UploadToContainerOptions; @@ -113,6 +114,10 @@ impl Backend for DockerBackend { let exec_result = container_exec(&name, execution, &mut client).await; if cleanup { + client + .kill_container(&name, None::>) + .await + .unwrap(); client.remove_container(&name, None).await.unwrap(); } diff --git a/wdl-runtime/src/runtime.rs b/wdl-runtime/src/runtime.rs index b8776f1..7623fda 100644 --- a/wdl-runtime/src/runtime.rs +++ b/wdl-runtime/src/runtime.rs @@ -1,6 +1,7 @@ //! Implementation of the WDL runtime and values. use std::collections::HashMap; +use std::fmt; use id_arena::{Arena, Id}; use ordered_float::OrderedFloat; @@ -153,6 +154,37 @@ impl Value { _ => todo!("implement the remainder coercions"), } } + + /// Used to display the value. + pub fn display<'a>(&'a self, runtime: &'a Runtime<'_>) -> impl fmt::Display + 'a { + /// Helper type for implementing display. + struct Display<'a> { + /// A reference to the runtime. + runtime: &'a Runtime<'a>, + /// The value to display. + value: Value, + } + + impl fmt::Display for Display<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self.value { + Value::Boolean(v) => write!(f, "{v}"), + Value::Integer(v) => write!(f, "{v}"), + Value::Float(v) => write!(f, "{v}"), + Value::String(sym) | Value::File(sym) | Value::Directory(sym) => { + write!(f, "{v}", v = self.runtime.resolve_str(sym)) + } + Value::None => write!(f, "None"), + Value::Stored(_, _) => todo!("implement display of compound types"), + } + } + } + + Display { + runtime, + value: *self, + } + } } impl From for Value { @@ -308,4 +340,9 @@ impl<'a> Runtime<'a> { pub fn resolve_str(&self, sym: SymbolU32) -> &str { self.interner.resolve(sym).expect("should have symbol") } + + /// Imports a type from the document types collection. + pub(crate) fn import_type(&mut self, ty: Type) -> Type { + self.types.import(self.document.types(), ty) + } } diff --git a/wdl-runtime/src/task.rs b/wdl-runtime/src/task.rs index c030c5f..62ff898 100644 --- a/wdl-runtime/src/task.rs +++ b/wdl-runtime/src/task.rs @@ -7,12 +7,13 @@ use petgraph::{ algo::{has_path_connecting, toposort}, graph::{DiGraph, NodeIndex}, }; +use wdl_analysis::types::{Coercible, Type, Types}; use wdl_ast::{ v1::{ CommandPart, CommandSection, Decl, HintsSection, NameRef, RequirementsSection, RuntimeSection, TaskDefinition, TaskItem, }, - AstNode, AstToken, Diagnostic, Ident, SyntaxNode, TokenStrHash, + AstNode, AstNodeExt, AstToken, Diagnostic, Ident, Span, SyntaxNode, TokenStrHash, }; use crate::{util::strip_leading_whitespace, v1::ExprEvaluator, Runtime, Value}; @@ -26,6 +27,22 @@ fn missing_input(task: &str, input: &Ident) -> Diagnostic { .with_label("a value must be specified for this input", input.span()) } +/// Creates a "input type mismatch" diagnostic. +fn input_type_mismatch( + types: &Types, + name: &str, + expected: Type, + actual: Type, + span: Span, +) -> Diagnostic { + Diagnostic::error(format!( + "type mismatch for input `{name}`: expected type `{expected}`, but found type `{actual}`", + expected = expected.display(types), + actual = actual.display(types), + )) + .with_label("the expected input type", span) +} + /// Represents a node in an evaluation graph. #[derive(Debug, Clone)] pub enum GraphNode { @@ -255,7 +272,7 @@ pub struct EvaluatedTask<'a> { /// The evaluated command text (i.e. bash script) to use for executing the task. command: String, /// The evaluated requirements for running the command. - requirements: IndexMap, + requirements: IndexMap, Value>, /// The evaluated hints for running the command. hints: IndexMap, /// The map from input paths to localized paths within the execution environment. @@ -295,7 +312,7 @@ impl<'a> EvaluatedTask<'a> { } /// The evaluated requirements for running the command. - pub fn requirements(&self) -> &IndexMap { + pub fn requirements(&self) -> &IndexMap, Value> { &self.requirements } @@ -369,7 +386,32 @@ impl TaskEvaluator { GraphNode::Input(decl) => { let name = decl.name(); if let Some(value) = inputs.get(name.as_str()) { - evaluated.scope.insert(TokenStrHash::new(name), *value); + if let Some(n) = runtime + .document() + .task_by_name(self.name.as_str()) + .expect("should have task scope") + .lookup(name.as_str()) + { + if let Some(ty) = n.ty() { + let ty = runtime.import_type(ty); + if !value.ty().is_coercible_to(runtime.types(), &ty) { + return Err(input_type_mismatch( + runtime.types(), + name.as_str(), + ty, + value.ty(), + decl.ty().span(), + )); + } + + evaluated.scope.insert(TokenStrHash::new(name), *value); + } else { + todo!("handle unknown type"); + } + } else { + // Handle specified input that is not a task input + todo!("handle extra specified input"); + } } else { // Check to see if the declaration was unbound; if so, it may be required if the declared type is not optional if let Decl::Unbound(decl) = decl { @@ -384,7 +426,7 @@ impl TaskEvaluator { } } GraphNode::Decl(_) => continue, - _ => break, + _ => continue, } } @@ -403,8 +445,17 @@ impl TaskEvaluator { let value = evaluator.evaluate_expr(runtime, &expr)?; evaluated.scope.insert(TokenStrHash::new(name), value); } - GraphNode::Requirements(_) => { - // TODO: implement + GraphNode::Requirements(section) => { + for item in section.items() { + let name = item.name(); + let expr = item.expr(); + + let evaluator = ExprEvaluator::new(&evaluated.scope); + let value = evaluator.evaluate_expr(runtime, &expr)?; + evaluated + .requirements + .insert(TokenStrHash::new(name), value); + } } GraphNode::Runtime(_) => { // TODO: implement From a886a62883b7700e05b2612d53b51b134bf730de Mon Sep 17 00:00:00 2001 From: Peter Huene Date: Fri, 6 Sep 2024 14:56:59 -0500 Subject: [PATCH 15/16] fix: add `count` input to the demo WDL. (#25) --- crankshaft/demo.json | 3 ++- crankshaft/demo.wdl | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/crankshaft/demo.json b/crankshaft/demo.json index bd9a9d5..c347c8d 100644 --- a/crankshaft/demo.json +++ b/crankshaft/demo.json @@ -1,3 +1,4 @@ { - "url": "https://www.encodeproject.org/files/ENCFF863PGO/@@download/ENCFF863PGO.bam" + "url": "https://www.encodeproject.org/files/ENCFF863PGO/@@download/ENCFF863PGO.bam", + "count": 100000 } diff --git a/crankshaft/demo.wdl b/crankshaft/demo.wdl index 20af475..6679d4d 100644 --- a/crankshaft/demo.wdl +++ b/crankshaft/demo.wdl @@ -5,10 +5,11 @@ version 1.2 task samtools_flagstat { input { String url + Int count = 100000 } command <<< - samtools flagstat <(wget -O - -q '~{url}' | samtools view -h | head -n 100000) + samtools flagstat <(wget -O - -q '~{url}' | samtools view -h | head -n ~{count}) >>> requirements { From fb3c076a425b184fc2ab70a5b6a71b0c8a13ca08 Mon Sep 17 00:00:00 2001 From: Andrew Frantz Date: Fri, 6 Sep 2024 16:58:37 -0400 Subject: [PATCH 16/16] feat: hammer lsf (#26) * Update lsf.rs * Update lsf.rs * Update lsf.rs * Update lsf.rs --- crankshaft/examples/lsf.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crankshaft/examples/lsf.rs b/crankshaft/examples/lsf.rs index 464fb66..f331e9d 100644 --- a/crankshaft/examples/lsf.rs +++ b/crankshaft/examples/lsf.rs @@ -27,7 +27,7 @@ async fn main() { .expect("at least one generic backend config to be present in the config"); let backend = GenericBackend::try_from(config).expect("parsing the backend configuration"); - let mut engine = Engine::default().with_backend("generic", backend.to_runner()); + let mut engine = Engine::empty().with_backend("generic", backend.to_runner()); let task = Task::builder() .name("my-example-task") @@ -35,7 +35,7 @@ async fn main() { .extend_executions(vec![Execution::builder() .working_directory(".") .image("ubuntu") - .args(&[String::from("echo"), String::from("'hello, world!'")]) + .args(&[String::from("echo"), String::from("'hello world from LSF'")]) .stdout("stdout.txt") .stderr("stderr.txt") .try_build() @@ -43,7 +43,7 @@ async fn main() { .try_build() .unwrap(); - let receivers = (0..1000) + let receivers = (0..10000) .map(|_| engine.submit("generic", task.clone()).callback) .collect::>();