Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add pause and resume feature in workflow management #80

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 20 additions & 11 deletions actions/workflow-invoker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ extern crate serde_json;

use serde_derive::{Deserialize, Serialize};
use serde_json::{Error, Value};
use types::UserData;
mod types;
use crate::types::update_with;
#[cfg(test)]
Expand Down Expand Up @@ -56,26 +57,30 @@ impl Action {
self.context.as_mut().expect("Action not Initialized!")
}

pub fn fetch_input(&mut self) -> Result<Vec<Value>, Error> {
pub fn fetch_input(&mut self) -> Result<Vec<UserData>, Error> {
let id = self.params.messages.clone()[0].topic.clone();
let data = self.get_context().get_document(&id)?;
let parsed = serde_json::from_value::<Topic>(data)?;
Ok(parsed.data)
}

pub fn invoke_trigger(&mut self, payload: &mut Vec<Value>) -> Result<Value, Error> {
pub fn invoke_trigger(&mut self, payload: &mut Vec<UserData>) -> Result<Value, Error> {
let mut failed_triggers = vec![];
for message in payload.iter_mut() {

for user in payload.iter_mut() {
let data = serde_json::from_str::<Value>(&self.params.messages[0].value).unwrap();
update_with(message, &data);
update_with(&mut user.input_data, &data);

let trigger = self.params.polkadot_payout_trigger.clone();
if self
.get_context()
.invoke_trigger(&trigger, &serde_json::json!({"data": message}))
.is_err()
{
failed_triggers.push(self.params.messages[0].value.clone());

if user.status {
if self
.get_context()
.invoke_trigger(&trigger, &serde_json::json!({"data": user.input_data}))
.is_err()
{
failed_triggers.push(self.params.messages[0].value.clone());
}
}
}
if !failed_triggers.is_empty() {
Expand Down Expand Up @@ -132,7 +137,11 @@ mod tests {
let workflow_db = action.connect_db(&action.params.db_url, &action.params.db_name);
let workflow_management_db_context = Context::new(workflow_db, None);
let doc = serde_json::json!({
"data": [{ "url": "todo!()".to_string(), "validator": "todo!()".to_string(), "owner_key": "todo!()".to_string() }]
"data": [{
"user_id" : "asdf",
"status" : true,
"input_data" :{ "url": "todo!()".to_string(), "validator": "todo!()".to_string(), "owner_key": "todo!()".to_string() }
}]
});
let _ = workflow_management_db_context
.insert_document(&doc, Some(action.params.messages[0].topic.clone()));
Expand Down
18 changes: 13 additions & 5 deletions actions/workflow-invoker/src/types/data.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
use serde_derive::{Deserialize, Serialize};
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct Topic {
#[serde(rename = "_id")]
pub id: String,
#[serde(rename = "_rev")]
pub rev: String,
pub data: Vec<UserData>,
}

#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct DbDatas {
pub endpoint: String,
pub validator: String,
pub key: String,
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct UserData {
pub user_id: String,
pub status: bool,
pub input_data: serde_json::Value,
}
3 changes: 1 addition & 2 deletions actions/workflow-invoker/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@ pub mod source;
pub mod topic;
pub use message::{Era, Message};
pub use source::Source;
pub use topic::Topic;
mod data;
pub use data::*;
pub use data::{Topic, UserData};

pub fn update_with(dest: &mut serde_json::Value, src: &serde_json::Value) {
use serde_json::Value::{Null, Object};
Expand Down
10 changes: 0 additions & 10 deletions actions/workflow-invoker/src/types/topic.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,4 @@
use serde_derive::{Deserialize, Serialize};
use serde_json::Value;

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct Topic {
#[serde(skip_serializing, rename(deserialize = "_id"))]
pub id: String,
#[serde(skip_serializing, rename(deserialize = "_rev"))]
pub rev: String,
pub data: Vec<Value>,
}

#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
Expand Down
7 changes: 5 additions & 2 deletions actions/workflow-management/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ actions-common = { git = "https://github.com/HugoByte/aurras", rev = '1f7e117' }
reqwest = { version = "0.11", features = ["blocking", "json"] }
bcrypt = "0.13.0"
jsonwebtoken = "7.1"
chrono = { version = "0.4", features = ["serde"] }

[dev-dependencies]
actions-common = { git = "https://github.com/HugoByte/aurras", rev = '1f7e117', features = ["mock_containers"] }
tokio = { version = "1.0.0", features = ["macros"] }
actions-common = { git = "https://github.com/HugoByte/aurras", rev = '1f7e117', features = [
"mock_containers",
] }
tokio = { version = "1.0.0", features = ["macros"] }
Loading
Loading