From c547568fe96783e0c1c57efef5f4072f00127243 Mon Sep 17 00:00:00 2001 From: Jakub Wieczorek Date: Sat, 28 Sep 2024 13:13:17 +0200 Subject: [PATCH] API changes. --- Cargo.lock | 1 + Cargo.toml | 1 + src/lib.rs | 2 +- src/worker/listener.rs | 50 ++++++++++++++++++++++-------------------- src/worker/mod.rs | 2 +- src/workflow.rs | 40 ++++++++++++++++++++++++++++++--- 6 files changed, 67 insertions(+), 29 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 38e0685..dc1e54c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -437,6 +437,7 @@ checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" name = "hatchet-sdk" version = "0.1.0-alpha.1" dependencies = [ + "anyhow", "ctrlc-async", "derive_builder", "envy", diff --git a/Cargo.toml b/Cargo.toml index 22a41f7..b0cf240 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,7 @@ version = "0.1.0-alpha.1" edition = "2021" [dependencies] +anyhow = "1" ctrlc-async = "3" derive_builder = "0.20" envy = "0.4" diff --git a/src/lib.rs b/src/lib.rs index fad4bb6..eaea36a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -16,7 +16,7 @@ enum ClientTlStrategy { pub use client::Client; pub use worker::{Worker, WorkerBuilder}; -pub use workflow::{Workflow, WorkflowBuilder}; +pub use workflow::{Step, StepBuilder, Workflow, WorkflowBuilder}; #[cfg(test)] mod tests; diff --git a/src/worker/listener.rs b/src/worker/listener.rs index e95ee14..f9520d3 100644 --- a/src/worker/listener.rs +++ b/src/worker/listener.rs @@ -73,30 +73,32 @@ where .map_err(crate::Error::CouldNotDecodeActionPayload)?; // FIXME: Obviously, run this asynchronously rather than blocking the main listening loop. - match tokio::task::spawn_local(async move { action_callable(input).await }).await { - Ok(output_value) => { - dispatcher - .send_step_action_event(step_action_event( - worker_id, - &action, - StepActionEventType::StepEventTypeCompleted, - serde_json::to_string(&output_value).expect("must succeed"), - )) - .await? - .into_inner(); - } - Err(join_error) => { - dispatcher - .send_step_action_event(step_action_event( - worker_id, - &action, - StepActionEventType::StepEventTypeFailed, - join_error.to_string(), - )) - .await? - .into_inner(); - } - } + let action_event = + match tokio::task::spawn_local(async move { action_callable(input).await }).await { + Ok(Ok(output_value)) => step_action_event( + worker_id, + &action, + StepActionEventType::StepEventTypeCompleted, + serde_json::to_string(&output_value).expect("must succeed"), + ), + Ok(Err(error)) => step_action_event( + worker_id, + &action, + StepActionEventType::StepEventTypeFailed, + error.to_string(), + ), + Err(join_error) => step_action_event( + worker_id, + &action, + StepActionEventType::StepEventTypeFailed, + join_error.to_string(), + ), + }; + + dispatcher + .send_step_action_event(action_event) + .await? + .into_inner(); Ok(()) } diff --git a/src/worker/mod.rs b/src/worker/mod.rs index 3552ba0..683205e 100644 --- a/src/worker/mod.rs +++ b/src/worker/mod.rs @@ -111,7 +111,7 @@ async fn construct_endpoint( } impl<'a> Worker<'a> { - pub async fn register_workflow(&mut self, workflow: crate::workflow::Workflow) { + pub fn register_workflow(&mut self, workflow: crate::workflow::Workflow) { self.workflows.push(workflow); } diff --git a/src/workflow.rs b/src/workflow.rs index 363c6bf..daea3a8 100644 --- a/src/workflow.rs +++ b/src/workflow.rs @@ -1,12 +1,16 @@ use std::sync::Arc; -pub(crate) type StepFunction = - dyn Fn(serde_json::Value) -> futures_util::future::BoxFuture<'static, serde_json::Value>; +type StepFunction = + dyn Fn( + serde_json::Value, + ) -> futures_util::future::LocalBoxFuture<'static, anyhow::Result>; #[derive(derive_builder::Builder)] #[builder(pattern = "owned")] -pub(crate) struct Step { +pub struct Step { + #[builder(setter(into))] pub(crate) name: String, + #[builder(setter(custom))] function: Arc, #[builder(default)] pub(crate) retries: usize, @@ -16,12 +20,33 @@ pub(crate) struct Step { pub(crate) timeout: std::time::Duration, } +impl StepBuilder { + pub fn function(mut self, function: &'static F) -> Self + where + I: serde::de::DeserializeOwned, + O: serde::ser::Serialize, + Fut: std::future::Future> + 'static, + F: Fn(I) -> Fut, + { + use futures_util::FutureExt; + self.function = Some(Arc::new(|value| { + let result = function(serde_json::from_value(value).expect("must succeed")); + async { Ok(serde_json::to_value(result.await?).expect("must succeed")) }.boxed_local() + })); + self + } +} + #[derive(derive_builder::Builder)] #[builder(pattern = "owned")] pub struct Workflow { + #[builder(setter(into))] pub(crate) name: String, + #[builder(setter(into))] pub(crate) description: String, + #[builder(default, setter(into))] pub(crate) version: String, + #[builder(default, setter(custom))] pub(crate) steps: Vec, #[builder(default)] pub(crate) on_events: Vec, @@ -31,6 +56,15 @@ pub struct Workflow { pub(crate) schedule_timeout: std::time::Duration, } +impl WorkflowBuilder { + pub fn step(mut self, step: Step) -> Self { + let mut steps = self.steps.take().unwrap_or_default(); + steps.push(step); + self.steps = Some(steps); + self + } +} + impl Workflow { pub(crate) fn actions<'a>( &'a self,