Skip to content

Commit

Permalink
revise: multiple updates to the backend execution engine
Browse files Browse the repository at this point in the history
  • Loading branch information
claymcleod committed Sep 5, 2024
1 parent 147e52f commit d20d8de
Show file tree
Hide file tree
Showing 8 changed files with 442 additions and 107 deletions.
154 changes: 154 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ edition = "2021"

[dependencies]
bollard = "0.17.1"
bytes = "1.7.1"
clap = { version = "4.5.16", features = ["derive"] }
futures = "0.3.30"
indexmap = "2.5.0"
Expand All @@ -13,6 +14,8 @@ paste = "1.0.15"
rand = "0.8.5"
random_word = { version = "0.4.3", features = ["en"] }
tokio = { version = "1.40.0", features = ["full", "time"] }
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
url = "2.5.2"

[lints.rust]
Expand Down
24 changes: 20 additions & 4 deletions examples/docker.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,21 @@
//! An example for runner a task using the Docker backend service.
use crankshaft::engine::service::runner::backend::Docker;
use crankshaft::engine::task::Execution;
use crankshaft::engine::Engine;
use crankshaft::engine::Task;
use tracing_subscriber::fmt;
use tracing_subscriber::layer::SubscriberExt as _;
use tracing_subscriber::util::SubscriberInitExt as _;
use tracing_subscriber::EnvFilter;

#[tokio::main]
async fn main() {
let mut docker = Docker::try_new().unwrap();
tracing_subscriber::registry()
.with(fmt::layer())
.with(EnvFilter::from_default_env())
.init();

let mut engine = Engine::default();

let task = Task::builder()
.name("my-example-task")
Expand All @@ -24,6 +33,13 @@ async fn main() {
.try_build()
.unwrap();

let result = docker.submit(task).await.unwrap();
println!("Exit code: {:?}", &result)
let receivers = (0..10)
.map(|_| engine.submit(task.clone()).callback)
.collect::<Vec<_>>();

engine.run().await;

for rx in receivers {
println!("Reply: {:?}", rx.await.unwrap());
}
}
36 changes: 36 additions & 0 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,40 @@
pub mod service;
pub mod task;

use futures::future::join_all;
pub use task::Task;
use tracing::debug;

use crate::engine::service::runner::Handle;
use crate::engine::service::runner::Runner;

/// An engine.
#[derive(Debug)]
pub struct Engine {
/// The task runner.
runner: Runner,
}

impl Engine {
/// Submits a [`Task`] to be executed.
///
/// A [`Handle`] is returned, which contains a channel that can be awaited
/// for the result of the job.
pub fn submit(&mut self, task: Task) -> Handle {
debug!(task = ?task);
self.runner.submit(task)
}

/// Runs all of the tasks scheduled in the engine.
pub async fn run(self) {
join_all(self.runner.tasks).await;
}
}

impl Default for Engine {
fn default() -> Self {
Self {
runner: Runner::docker(),
}
}
}
47 changes: 47 additions & 0 deletions src/engine/service/runner.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,50 @@
//! Task runner services.
use futures::future::BoxFuture;
use futures::stream::FuturesUnordered;
use tokio::sync::oneshot::Receiver;

use crate::engine::service::runner::backend::docker;
use crate::engine::service::runner::backend::Reply;
use crate::engine::Task;

pub mod backend;

/// A submitted task handle.
#[derive(Debug)]
pub struct Handle {
/// The callback that is executed when a task is completed.
pub callback: Receiver<Reply>,
}

/// A generic task runner.
#[derive(Debug)]
pub struct Runner {
/// The task runner itself.
runner: docker::Runner,

/// The list of submitted tasks.
pub tasks: FuturesUnordered<BoxFuture<'static, ()>>,
}

impl Runner {
/// Creates a Docker-backed [`Runner`].
///
/// # Panics
///
/// If initialization of the [`bollard`](bollard) client fails.
pub fn docker() -> Self {
Self {
runner: docker::Runner::try_new().unwrap(),
tasks: Default::default(),
}
}

/// Submits a task to be executed by the backend.
pub fn submit(&self, task: Task) -> Handle {
let (tx, rx) = tokio::sync::oneshot::channel();
self.tasks.push(Box::pin(self.runner.run(task, tx)));

Handle { callback: rx }
}
}
Loading

0 comments on commit d20d8de

Please sign in to comment.