Skip to content

Commit

Permalink
API changes.
Browse files Browse the repository at this point in the history
  • Loading branch information
jakubadamw committed Sep 28, 2024
1 parent 6447294 commit c547568
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 29 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ version = "0.1.0-alpha.1"
edition = "2021"

[dependencies]
anyhow = "1"
ctrlc-async = "3"
derive_builder = "0.20"
envy = "0.4"
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ enum ClientTlStrategy {

pub use client::Client;
pub use worker::{Worker, WorkerBuilder};
pub use workflow::{Workflow, WorkflowBuilder};
pub use workflow::{Step, StepBuilder, Workflow, WorkflowBuilder};

#[cfg(test)]
mod tests;
50 changes: 26 additions & 24 deletions src/worker/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,30 +73,32 @@ where
.map_err(crate::Error::CouldNotDecodeActionPayload)?;

// FIXME: Obviously, run this asynchronously rather than blocking the main listening loop.
match tokio::task::spawn_local(async move { action_callable(input).await }).await {
Ok(output_value) => {
dispatcher
.send_step_action_event(step_action_event(
worker_id,
&action,
StepActionEventType::StepEventTypeCompleted,
serde_json::to_string(&output_value).expect("must succeed"),
))
.await?
.into_inner();
}
Err(join_error) => {
dispatcher
.send_step_action_event(step_action_event(
worker_id,
&action,
StepActionEventType::StepEventTypeFailed,
join_error.to_string(),
))
.await?
.into_inner();
}
}
let action_event =
match tokio::task::spawn_local(async move { action_callable(input).await }).await {
Ok(Ok(output_value)) => step_action_event(
worker_id,
&action,
StepActionEventType::StepEventTypeCompleted,
serde_json::to_string(&output_value).expect("must succeed"),
),
Ok(Err(error)) => step_action_event(
worker_id,
&action,
StepActionEventType::StepEventTypeFailed,
error.to_string(),
),
Err(join_error) => step_action_event(
worker_id,
&action,
StepActionEventType::StepEventTypeFailed,
join_error.to_string(),
),
};

dispatcher
.send_step_action_event(action_event)
.await?
.into_inner();

Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ async fn construct_endpoint(
}

impl<'a> Worker<'a> {
pub async fn register_workflow(&mut self, workflow: crate::workflow::Workflow) {
pub fn register_workflow(&mut self, workflow: crate::workflow::Workflow) {
self.workflows.push(workflow);
}

Expand Down
40 changes: 37 additions & 3 deletions src/workflow.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
use std::sync::Arc;

pub(crate) type StepFunction =
dyn Fn(serde_json::Value) -> futures_util::future::BoxFuture<'static, serde_json::Value>;
type StepFunction =
dyn Fn(
serde_json::Value,
) -> futures_util::future::LocalBoxFuture<'static, anyhow::Result<serde_json::Value>>;

#[derive(derive_builder::Builder)]
#[builder(pattern = "owned")]
pub(crate) struct Step {
pub struct Step {
#[builder(setter(into))]
pub(crate) name: String,
#[builder(setter(custom))]
function: Arc<StepFunction>,
#[builder(default)]
pub(crate) retries: usize,
Expand All @@ -16,12 +20,33 @@ pub(crate) struct Step {
pub(crate) timeout: std::time::Duration,
}

impl StepBuilder {
pub fn function<I, O, Fut, F>(mut self, function: &'static F) -> Self
where
I: serde::de::DeserializeOwned,
O: serde::ser::Serialize,
Fut: std::future::Future<Output = anyhow::Result<O>> + 'static,
F: Fn(I) -> Fut,
{
use futures_util::FutureExt;
self.function = Some(Arc::new(|value| {
let result = function(serde_json::from_value(value).expect("must succeed"));
async { Ok(serde_json::to_value(result.await?).expect("must succeed")) }.boxed_local()
}));
self
}
}

#[derive(derive_builder::Builder)]
#[builder(pattern = "owned")]
pub struct Workflow {
#[builder(setter(into))]
pub(crate) name: String,
#[builder(setter(into))]
pub(crate) description: String,
#[builder(default, setter(into))]
pub(crate) version: String,
#[builder(default, setter(custom))]
pub(crate) steps: Vec<Step>,
#[builder(default)]
pub(crate) on_events: Vec<String>,
Expand All @@ -31,6 +56,15 @@ pub struct Workflow {
pub(crate) schedule_timeout: std::time::Duration,
}

impl WorkflowBuilder {
pub fn step(mut self, step: Step) -> Self {
let mut steps = self.steps.take().unwrap_or_default();
steps.push(step);
self.steps = Some(steps);
self
}
}

impl Workflow {
pub(crate) fn actions<'a>(
&'a self,
Expand Down

0 comments on commit c547568

Please sign in to comment.