From 24a77213433dddf3a9f55391b1b1cba2a142397f Mon Sep 17 00:00:00 2001 From: andthattoo Date: Tue, 5 Nov 2024 17:03:46 +0300 Subject: [PATCH] added multiple returnable results --- examples/execute_workflow.rs | 19 --- examples/import_test.rs | 29 ++++ examples/workflows/search.json | 253 ------------------------------- src/memory/types.rs | 14 ++ src/program/atomics.rs | 9 +- src/program/executor.rs | 13 +- tests/run.rs | 13 +- tests/test_workflows/simple.json | 21 ++- 8 files changed, 81 insertions(+), 290 deletions(-) delete mode 100644 examples/execute_workflow.rs create mode 100644 examples/import_test.rs delete mode 100644 examples/workflows/search.json diff --git a/examples/execute_workflow.rs b/examples/execute_workflow.rs deleted file mode 100644 index d9c5cfb..0000000 --- a/examples/execute_workflow.rs +++ /dev/null @@ -1,19 +0,0 @@ -use dotenv::dotenv; -use env_logger::Env; -use ollama_workflows::{Entry, Executor, Model, ProgramMemory, Workflow}; - -#[tokio::main] -async fn main() { - dotenv().ok(); - let env = Env::default().filter_or("LOG_LEVEL", "info"); - env_logger::Builder::from_env(env).init(); - let exe = Executor::new(Model::GPT4Turbo); - let workflow = Workflow::new_from_json("./workflows/search.json").unwrap(); - let mut memory = ProgramMemory::new(); - let input = Entry::try_value_or_str("How would does reiki work?"); - let return_value = exe.execute(Some(&input), &workflow, &mut memory).await; - match return_value { - Ok(value) => println!("{}", value), - Err(err) => eprintln!("Error: {:?}", err), - } -} diff --git a/examples/import_test.rs b/examples/import_test.rs new file mode 100644 index 0000000..46d0ffd --- /dev/null +++ b/examples/import_test.rs @@ -0,0 +1,29 @@ +use dotenv::dotenv; +use env_logger::Env; +use ollama_workflows::{Executor, Model, ProgramMemory, Workflow}; + +fn main() { + // Initialize environment + dotenv().ok(); + let env = Env::default().filter_or("LOG_LEVEL", "info"); + env_logger::Builder::from_env(env).init(); + + // Create runtime for async execution + let rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(async { + // Initialize the executor with desired model + let exe = Executor::new(Model::Phi3Medium); + + // Load workflow from JSON file + let workflow = Workflow::new_from_json("./tests/test_workflows/simple.json").unwrap(); + + // Initialize program memory + let mut memory = ProgramMemory::new(); + + // Execute workflow and handle the result + match exe.execute(None, &workflow, &mut memory).await { + Ok(result) => println!("Generated poem:\n{}", result), + Err(err) => eprintln!("Error executing workflow: {:?}", err), + } + }); +} diff --git a/examples/workflows/search.json b/examples/workflows/search.json deleted file mode 100644 index 4b330c8..0000000 --- a/examples/workflows/search.json +++ /dev/null @@ -1,253 +0,0 @@ -{ - "name": "Searching Agent", - "description": "Try to answer a given question using data from the web with grounded data.", - "config": { - "max_steps": 35, - "max_time": 600, - "tools": ["ALL"] - }, - "tasks": [ - { - "id": "A", - "name": "Web Search Query", - "description": "Write a web search query to collect useful information for the given question", - "prompt": "You are tasked with generating a single search query to gather useful information for answering a given question. Your goal is to create a concise, clear, and creative query that avoids repeating previously asked questions.\n\nHere is the query you need to address:\n\n{{query}}\n\n\nHere are the previous questions that have been asked:\n\n{{history}}\n\n\nWhen creating your search query, follow these guidelines:\n1. Analyze the given query and identify key concepts or terms.\n2. Review the previous questions to avoid repetition.\n3. Be creative in your approach, considering alternative phrasings or related concepts.\n4. Keep the query concise and clear, typically no more than 5-7 words.\n5. Focus on gathering information that will be most useful in answering the original query.\n\nProvide your search query inside tags. Do not include any explanation or additional text outside these tags.", - "inputs": [ - { - "name": "query", - "value": { - "type": "input", - "key": "" - }, - "required": true - }, - { - "name": "history", - "value": { - "type": "get_all", - "key": "history" - }, - "required": false - } - ], - "operator": "generation", - "outputs": [ - { - "type": "write", - "key": "web_search_query", - "value": "__result" - }, - { - "type": "push", - "key": "history", - "value": "__result" - } - ] - }, - { - "id": "B", - "name": "Web Search", - "description": "Search the web with the given query", - "prompt": "{query}", - "inputs": [ - { - "name": "query", - "value": { - "type": "read", - "key": "web_search_query" - }, - "required": true - } - ], - "operator": "function_calling", - "outputs": [ - { - "type": "write", - "key": "web_search_result", - "value": "__result" - }, - { - "type": "push", - "key": "web_search_result", - "value": "__result" - } - ] - }, - { - "id": "C", - "name": "Candidate Website", - "description": "Pick the most useful link from web search results for the given query", - "prompt": "Pick the most useful URL to scrape information for Query: {query} \n\n ###Sarch Results:{web_results} \n\n Only output the selected URL: ###Selected URL:", - "inputs": [ - { - "name": "web_results", - "value": { - "type": "read", - "key": "web_search_result" - }, - "required": true - }, - { - "name": "query", - "value": { - "type": "input", - "key": "" - }, - "required": true - } - ], - "operator": "generation", - "outputs": [ - { - "type": "write", - "key": "search_url", - "value": "__result" - } - ] - }, - { - "id": "D", - "name": "Scrape Website", - "description": "Scrape the selected website", - "prompt": "scrape {search_url}", - "inputs": [ - { - "name": "search_url", - "value": { - "type": "read", - "key": "search_url", - "required": true - }, - "required": true - } - ], - "operator": "function_calling", - "outputs": [ - { - "type": "push", - "key": "content", - "value": "__result" - } - ] - }, - { - "id": "E", - "name": "Summarize Website", - "description": "Summarize website content", - "prompt": "Answer given question completely based on following context. Don't assume anything. Use the provided information to answer the question. If context is not enough, say 'I dont know.' ###Content: {content} \n\n ###Question: {query} \n\n ###Answer:", - "inputs": [ - { - "name": "content", - "value": { - "type": "pop", - "key": "content", - "required": true - }, - "required": true - }, - { - "name": "query", - "value": { - "type": "input", - "key": "" - }, - "required": true - } - ], - "operator": "generation", - "outputs": [ - { - "type": "push", - "key": "answers", - "value": "__result" - } - ] - }, - { - "id": "F", - "name": "Evaluate Result", - "description": "Evaluate if the result is satisfying", - "prompt": "Evaluate the result if it answers the query. Write 'Yes' if valid, if not 'No'.Write nothing else but strictly 'Yes'|'No'. query:{query} result:{answer}. ###Evaluation:", - "inputs": [ - { - "name": "answer", - "value": { - "type": "get_all", - "key": "answers" - }, - "required": true - }, - { - "name": "query", - "value": { - "type": "input", - "key": "" - }, - "required": true - } - ], - "operator": "generation", - "outputs": [ - { - "type": "write", - "key": "final_result", - "value": "__result" - } - ] - }, - { - "id": "__end", - "name": "end", - "description": "End of the task", - "prompt": "End of the task", - "inputs": [], - "operator": "end", - "outputs": [] - } - ], - "steps": [ - { - "source": "A", - "target": "B", - "fallback": "H" - }, - { - "source": "B", - "target": "C", - "fallback": "A" - }, - { - "source": "C", - "target": "D" - }, - { - "source": "D", - "target": "E", - "fallback": "A" - }, - { - "source": "E", - "target": "F" - }, - { - "source": "F", - "target": "end", - "condition": { - "input":{ - "type":"read", - "key": "final_result" - }, - "expression": "Equal", - "expected": "Yes", - "target_if_not": "A" - } - } - ], - "return_value": { - "input": { - "type": "read", - "key": "final_result" - } - } - } - \ No newline at end of file diff --git a/src/memory/types.rs b/src/memory/types.rs index 20fca98..f9dc970 100644 --- a/src/memory/types.rs +++ b/src/memory/types.rs @@ -51,6 +51,7 @@ pub enum MemoryReturnType { //EntryRef(Option<&'a Entry>), Entry(Option), EntryVec(Option>), + Multiple(Vec), } impl MemoryReturnType { @@ -59,6 +60,7 @@ impl MemoryReturnType { //MemoryReturnType::EntryRef(entry_ref) => entry_ref.is_none(), MemoryReturnType::Entry(entry) => entry.is_none(), MemoryReturnType::EntryVec(entry_vec) => entry_vec.is_none(), + MemoryReturnType::Multiple(returns) => returns.is_empty(), } } @@ -76,6 +78,10 @@ impl MemoryReturnType { Err(_) => None, } } + MemoryReturnType::Multiple(returns) => { + let values: Vec = returns.iter().map(|ret| ret.to_string()).collect(); + serde_json::to_string(&values).ok() + } _ => None, } } @@ -100,6 +106,14 @@ impl fmt::Display for MemoryReturnType { } write!(f, "{}", result) } + MemoryReturnType::Multiple(returns) => { + let mut result = String::new(); + for ret in returns { + result.push_str(&ret.to_string()); + result.push_str(" \n"); + } + write!(f, "{}", result) + } } } } diff --git a/src/program/atomics.rs b/src/program/atomics.rs index d636cae..b3e6ffd 100644 --- a/src/program/atomics.rs +++ b/src/program/atomics.rs @@ -95,9 +95,16 @@ pub struct Task { pub outputs: Vec, } +#[derive(Debug, Deserialize)] +#[serde(untagged)] +pub enum TaskOutputInput { + Single(InputValue), + Multiple(Vec), +} + #[derive(Debug, Deserialize)] pub struct TaskOutput { - pub input: InputValue, + pub input: TaskOutputInput, pub to_json: Option, pub post_process: Option>, } diff --git a/src/program/executor.rs b/src/program/executor.rs index 262c272..ecb9405 100644 --- a/src/program/executor.rs +++ b/src/program/executor.rs @@ -192,7 +192,18 @@ impl Executor { )); } let rv = workflow.get_return_value(); - let return_value = self.handle_input(&rv.input, memory).await; + + //let return_value = self.handle_input(&rv.input, memory).await; + let return_value = match &rv.input { + TaskOutputInput::Single(input) => self.handle_input(input, memory).await, + TaskOutputInput::Multiple(inputs) => { + let mut results = Vec::new(); + for input in inputs { + results.push(self.handle_input(input, memory).await); + } + MemoryReturnType::Multiple(results) + } + }; let mut return_string = return_value.to_string().clone(); if rv.to_json.is_some() && rv.to_json.unwrap() { diff --git a/tests/run.rs b/tests/run.rs index 339bed3..08e31e8 100644 --- a/tests/run.rs +++ b/tests/run.rs @@ -33,8 +33,14 @@ macro_rules! workflow_test { let workflow = Workflow::new_from_json($workflow).unwrap(); let mut memory = ProgramMemory::new(); let input = Entry::try_value_or_str($input); - if let Err(e) = exe.execute(Some(&input), &workflow, &mut memory).await { - log::error!("Execution failed: {}", e); + match exe.execute(Some(&input), &workflow, &mut memory).await { + Ok(result) => { + log::info!("Execution completed successfully"); + log::info!("Result: {:?}", result); + } + Err(e) => { + log::error!("Execution failed: {}", e); + } }; } }; @@ -68,8 +74,7 @@ mod simple_workflow_tests { workflow_test!( gemini_simple_workflow, Model::Gemini15Flash, - SIMPLE_WORKFLOW_PATH, - "How does reiki work?" + SIMPLE_WORKFLOW_PATH ); workflow_test!(simple_coder, Model::Qwen2_5Coder1_5B, CODER_PATH); diff --git a/tests/test_workflows/simple.json b/tests/test_workflows/simple.json index 5568664..23f449b 100644 --- a/tests/test_workflows/simple.json +++ b/tests/test_workflows/simple.json @@ -36,22 +36,19 @@ "steps":[ { "source":"A", - "target":"end", - "condition": { - "input":{ - "type":"read", - "key": "poem" - }, - "expression": "Equal", - "expected": "Yes", - "target_if_not": "A" - } + "target":"end" } ], - "return_value":{ - "input":{ + "return_value": + { + "input":[{ + "type": "read", + "key": "poem" + }, + { "type": "read", "key": "poem" } + ] } } \ No newline at end of file