Skip to content

Commit

Permalink
fix:workflow-macro-term
Browse files Browse the repository at this point in the history
  • Loading branch information
shanithkk committed Dec 8, 2023
1 parent 1f09715 commit fae66b1
Showing 1 changed file with 49 additions and 16 deletions.
65 changes: 49 additions & 16 deletions workflow/workflow_macro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ fn impl_workflow(ast: DeriveInput) -> TokenStream {


impl #workflow {

pub fn node_count(&self) -> usize {
self.nodes.len()
}
Expand Down Expand Up @@ -60,27 +60,60 @@ fn impl_workflow(ast: DeriveInput) -> TokenStream {
}

}
pub fn term(&mut self, task_index: Option<usize>) -> Result<Value,String> {

pub fn term(&mut self, task_index: Option<usize>) -> Result<Value, String> {
match task_index {
Some(index) => {
let previous_index = (index - 1).try_into().unwrap();
let previous_task = self.get_task(previous_index);
let previous_task_output = previous_task.get_task_output();
let current_task = self.get_task_as_mut(index);
current_task.set_output_to_task(previous_task_output);
match current_task.execute(){
Ok(()) => Ok(current_task.get_task_output()),
Err(err) => Err(err),
let mut list = Vec::new();
let edges_list = self.edges.clone();
edges_list.iter().for_each(|(source, destination)| {
if destination == &index {
list.push(source)
}
});
let mut res: Vec<Value> = Vec::new();
match list.len() {
0 => {
let current_task = self.get_task_as_mut(index);
match current_task.execute() {
Ok(()) => return Ok(current_task.get_task_output()),
Err(err) => return Err(err),
}
}
1 => {
let previous_task_output = self.get_task(*list[0]).get_task_output();
let current_task = self.get_task_as_mut(index);
current_task.set_output_to_task(previous_task_output);
match current_task.execute() {
Ok(()) => return Ok(current_task.get_task_output()),
Err(err) => return Err(err),
}
}
_ => {
res = list
.iter()
.map(|index| {
let previous_task = self.get_task(**index);
let previous_task_output = previous_task.get_task_output();
previous_task_output
})
.collect();

let s: Value = res.into();
let current_task = self.get_task_as_mut(index);
current_task.set_output_to_task(s);

match current_task.execute() {
Ok(()) => return Ok(current_task.get_task_output()),
Err(err) => return Err(err),
};
}
}

},
}
None => {
let len = self.node_count();
Ok(self.get_task(len-1).get_task_output())
},
Ok(self.get_task(len - 1).get_task_output())
}
}

}

pub fn pipe(&mut self, task_index: usize) -> Result<&mut Self,String> {
Expand Down

0 comments on commit fae66b1

Please sign in to comment.