Skip to content

Commit

Permalink
chore: add pause and resume feature in workflow management
Browse files Browse the repository at this point in the history
  • Loading branch information
shanithkk committed Nov 8, 2023
1 parent d0418fd commit f48adf2
Show file tree
Hide file tree
Showing 8 changed files with 649 additions and 43 deletions.
31 changes: 20 additions & 11 deletions actions/workflow-invoker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ extern crate serde_json;

use serde_derive::{Deserialize, Serialize};
use serde_json::{Error, Value};
use types::UserData;
mod types;
use crate::types::update_with;
#[cfg(test)]
Expand Down Expand Up @@ -56,26 +57,30 @@ impl Action {
self.context.as_mut().expect("Action not Initialized!")
}

pub fn fetch_input(&mut self) -> Result<Vec<Value>, Error> {
pub fn fetch_input(&mut self) -> Result<Vec<UserData>, Error> {
let id = self.params.messages.clone()[0].topic.clone();
let data = self.get_context().get_document(&id)?;
let parsed = serde_json::from_value::<Topic>(data)?;
Ok(parsed.data)
}

pub fn invoke_trigger(&mut self, payload: &mut Vec<Value>) -> Result<Value, Error> {
pub fn invoke_trigger(&mut self, payload: &mut Vec<UserData>) -> Result<Value, Error> {
let mut failed_triggers = vec![];
for message in payload.iter_mut() {

for user in payload.iter_mut() {
let data = serde_json::from_str::<Value>(&self.params.messages[0].value).unwrap();
update_with(message, &data);
update_with(&mut user.input_data, &data);

let trigger = self.params.polkadot_payout_trigger.clone();
if self
.get_context()
.invoke_trigger(&trigger, &serde_json::json!({"data": message}))
.is_err()
{
failed_triggers.push(self.params.messages[0].value.clone());

if user.status == "active".to_string() {
if self
.get_context()
.invoke_trigger(&trigger, &serde_json::json!({"data": user.input_data}))
.is_err()
{
failed_triggers.push(self.params.messages[0].value.clone());
}
}
}
if !failed_triggers.is_empty() {
Expand Down Expand Up @@ -132,7 +137,11 @@ mod tests {
let workflow_db = action.connect_db(&action.params.db_url, &action.params.db_name);
let workflow_management_db_context = Context::new(workflow_db, None);
let doc = serde_json::json!({
"data": [{ "url": "todo!()".to_string(), "validator": "todo!()".to_string(), "owner_key": "todo!()".to_string() }]
"data": [{
"user_id" : "asdf",
"status" : "active",
"input_data" :{ "url": "todo!()".to_string(), "validator": "todo!()".to_string(), "owner_key": "todo!()".to_string() }
}]
});
let _ = workflow_management_db_context
.insert_document(&doc, Some(action.params.messages[0].topic.clone()));
Expand Down
17 changes: 17 additions & 0 deletions actions/workflow-invoker/src/types/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,20 @@ pub struct DbDatas {
pub validator: String,
pub key: String,
}

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct Topic {
#[serde(rename = "_id")]
pub id: String,
#[serde(rename = "_rev")]
pub rev: String,
pub data: Vec<UserData>,
}

#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct UserData {
pub user_id: String,
pub status: String,
pub input_data: serde_json::Value,
}
3 changes: 1 addition & 2 deletions actions/workflow-invoker/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@ pub mod source;
pub mod topic;
pub use message::{Era, Message};
pub use source::Source;
pub use topic::Topic;
mod data;
pub use data::*;
pub use data::{Topic, UserData};

pub fn update_with(dest: &mut serde_json::Value, src: &serde_json::Value) {
use serde_json::Value::{Null, Object};
Expand Down
10 changes: 0 additions & 10 deletions actions/workflow-invoker/src/types/topic.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,4 @@
use serde_derive::{Deserialize, Serialize};
use serde_json::Value;

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct Topic {
#[serde(skip_serializing, rename(deserialize = "_id"))]
pub id: String,
#[serde(skip_serializing, rename(deserialize = "_rev"))]
pub rev: String,
pub data: Vec<Value>,
}

#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
Expand Down
7 changes: 5 additions & 2 deletions actions/workflow_management/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ actions-common = { git = "https://github.com/HugoByte/aurras", rev = '1f7e117' }
reqwest = { version = "0.11", features = ["blocking", "json"] }
bcrypt = "0.13.0"
jsonwebtoken = "7.1"
chrono = { version = "0.4", features = ["serde"] }

[dev-dependencies]
actions-common = { git = "https://github.com/HugoByte/aurras", rev = '1f7e117', features = ["mock_containers"] }
tokio = { version = "1.0.0", features = ["macros"] }
actions-common = { git = "https://github.com/HugoByte/aurras", rev = '1f7e117', features = [
"mock_containers",
] }
tokio = { version = "1.0.0", features = ["macros"] }
Loading

0 comments on commit f48adf2

Please sign in to comment.