From dbd6e424734e436966e3c2429b54f559030a460c Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Thu, 19 Dec 2024 16:26:25 -0500 Subject: [PATCH 01/10] Types changes for role and task messages --- src/pipecat_flows/types.py | 45 ++++++++++++++++---------------------- 1 file changed, 19 insertions(+), 26 deletions(-) diff --git a/src/pipecat_flows/types.py b/src/pipecat_flows/types.py index d243b3e..c378087 100644 --- a/src/pipecat_flows/types.py +++ b/src/pipecat_flows/types.py @@ -49,7 +49,7 @@ class FlowResult(TypedDict, total=False): class NodeConfigRequired(TypedDict): """Required fields for node configuration.""" - messages: List[dict] + task_messages: List[dict] functions: List[dict] @@ -57,47 +57,39 @@ class NodeConfig(NodeConfigRequired, total=False): """Configuration for a single node in the flow. Required fields: - messages: List of message dicts in provider-specific format + task_messages: List of message dicts defining the current node's objectives functions: List of function definitions in provider-specific format Optional fields: + role_messages: List of message dicts defining the bot's role/personality pre_actions: Actions to execute before LLM inference post_actions: Actions to execute after LLM inference Example: { - "messages": [ + "role_messages": [ { "role": "system", - "content": "You are handling orders..." + "content": "You are a helpful assistant..." } ], - "functions": [ + "task_messages": [ { - "type": "function", - "function": { - "name": "process_order", - "description": "Process the order", - "parameters": {...} - } - } - ], - "pre_actions": [ - { - "type": "tts_say", - "text": "Processing your order..." + "role": "system", + "content": "Ask the user for their name..." } ], - "post_actions": [ - { - "type": "update_db", - "user_id": 123, - "data": {"status": "completed"} - } - ] + "functions": [...], + "pre_actions": [...], + "post_actions": [...] } """ + role_messages: List[Dict[str, Any]] + pre_actions: List[Dict[str, Any]] + post_actions: List[Dict[str, Any]] + + role_messages: List[Dict[str, Any]] pre_actions: List[Dict[str, Any]] post_actions: List[Dict[str, Any]] @@ -114,12 +106,13 @@ class FlowConfig(TypedDict): "initial_node": "greeting", "nodes": { "greeting": { - "messages": [...], + "role_messages": [...], + "task_messages": [...], "functions": [...], "pre_actions": [...] }, "process_order": { - "messages": [...], + "task_messages": [...], "functions": [...], "post_actions": [...] } From f9f8764a8beb2269802d4c066583755d191a8fd6 Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Thu, 19 Dec 2024 17:00:35 -0500 Subject: [PATCH 02/10] Update FlowManager to include role and task messages; all node processing happens in set_node, update examples --- examples/dynamic/insurance_anthropic.py | 46 +++++++-------- examples/dynamic/insurance_gemini.py | 38 ++++++------- examples/dynamic/insurance_openai.py | 36 ++++++------ examples/static/movie_explorer_anthropic.py | 25 ++++---- examples/static/movie_explorer_gemini.py | 18 +++--- examples/static/movie_explorer_openai.py | 18 +++--- src/pipecat_flows/manager.py | 63 ++++++++------------- 7 files changed, 110 insertions(+), 134 deletions(-) diff --git a/examples/dynamic/insurance_anthropic.py b/examples/dynamic/insurance_anthropic.py index e099720..9ede461 100644 --- a/examples/dynamic/insurance_anthropic.py +++ b/examples/dynamic/insurance_anthropic.py @@ -153,7 +153,22 @@ async def end_quote() -> FlowResult: def create_initial_node() -> NodeConfig: """Create the initial node asking for age.""" return { - "messages": [ + "role_messages": [ + { + "role": "system", + "content": [ + { + "type": "text", + "text": ( + "You are a friendly insurance agent. Your responses will be " + "converted to audio, so avoid special characters. Always use " + "the available functions to progress the conversation naturally." + ), + } + ], + } + ], + "task_messages": [ { "role": "user", "content": [ @@ -186,7 +201,7 @@ def create_initial_node() -> NodeConfig: def create_marital_status_node() -> NodeConfig: """Create node for collecting marital status.""" return { - "messages": [ + "task_messages": [ { "role": "user", "content": [ @@ -217,7 +232,7 @@ def create_marital_status_node() -> NodeConfig: def create_quote_calculation_node(age: int, marital_status: str) -> NodeConfig: """Create node for calculating initial quote.""" return { - "messages": [ + "task_messages": [ { "role": "user", "content": [ @@ -255,7 +270,7 @@ def create_quote_results_node( ) -> NodeConfig: """Create node for showing quote and adjustment options.""" return { - "messages": [ + "task_messages": [ { "role": "user", "content": [ @@ -301,7 +316,7 @@ def create_quote_results_node( def create_end_node() -> NodeConfig: """Create the final node.""" return { - "messages": [ + "task_messages": [ { "role": "user", "content": [ @@ -385,24 +400,7 @@ async def main(): api_key=os.getenv("ANTHROPIC_API_KEY"), model="claude-3-5-sonnet-latest" ) - # Create initial context - messages = [ - { - "role": "user", - "content": [ - { - "type": "text", - "text": ( - "You are a friendly insurance agent. Your responses will be " - "converted to audio, so avoid special characters. Always use " - "the available functions to progress the conversation naturally." - ), - } - ], - } - ] - - context = OpenAILLMContext(messages, []) + context = OpenAILLMContext() context_aggregator = llm.create_context_aggregator(context) # Create pipeline @@ -429,7 +427,7 @@ async def main(): async def on_first_participant_joined(transport, participant): await transport.capture_participant_transcription(participant["id"]) # Initialize flow - await flow_manager.initialize(messages) + await flow_manager.initialize() # Set initial node await flow_manager.set_node("initial", create_initial_node()) await task.queue_frames([context_aggregator.user().get_context_frame()]) diff --git a/examples/dynamic/insurance_gemini.py b/examples/dynamic/insurance_gemini.py index 0c82ff6..50b1883 100644 --- a/examples/dynamic/insurance_gemini.py +++ b/examples/dynamic/insurance_gemini.py @@ -153,7 +153,18 @@ async def end_quote() -> FlowResult: def create_initial_node() -> NodeConfig: """Create the initial node asking for age.""" return { - "messages": [ + "role_messages": [ + { + "role": "system", + "content": ( + "You are a friendly insurance agent. Your responses will be " + "converted to audio, so avoid special characters. " + "Always wait for customer responses before calling functions. " + "Only call functions after receiving relevant information from the customer." + ), + } + ], + "task_messages": [ { "role": "system", "content": "Start by asking for the customer's age.", @@ -180,7 +191,7 @@ def create_initial_node() -> NodeConfig: def create_marital_status_node() -> NodeConfig: """Create node for collecting marital status.""" return { - "messages": [ + "task_messages": [ { "role": "system", "content": ( @@ -215,7 +226,7 @@ def create_marital_status_node() -> NodeConfig: def create_quote_calculation_node(age: int, marital_status: str) -> NodeConfig: """Create node for calculating initial quote.""" return { - "messages": [ + "task_messages": [ { "role": "system", "content": ( @@ -252,7 +263,7 @@ def create_quote_results_node( ) -> NodeConfig: """Create node for showing quote and adjustment options.""" return { - "messages": [ + "task_messages": [ { "role": "system", "content": ( @@ -301,7 +312,7 @@ def create_quote_results_node( def create_end_node() -> NodeConfig: """Create the final node.""" return { - "messages": [ + "task_messages": [ { "role": "system", "content": ( @@ -374,20 +385,7 @@ async def main(): tts = DeepgramTTSService(api_key=os.getenv("DEEPGRAM_API_KEY"), voice="aura-helios-en") llm = GoogleLLMService(api_key=os.getenv("GOOGLE_API_KEY"), model="gemini-2.0-flash-exp") - # Create initial context - messages = [ - { - "role": "system", - "content": ( - "You are a friendly insurance agent. Your responses will be " - "converted to audio, so avoid special characters. " - "Always wait for customer responses before calling functions. " - "Only call functions after receiving relevant information from the customer." - ), - } - ] - - context = OpenAILLMContext(messages, []) + context = OpenAILLMContext() context_aggregator = llm.create_context_aggregator(context) # Create pipeline @@ -414,7 +412,7 @@ async def main(): async def on_first_participant_joined(transport, participant): await transport.capture_participant_transcription(participant["id"]) # Initialize flow - await flow_manager.initialize(messages) + await flow_manager.initialize() # Set initial node await flow_manager.set_node("initial", create_initial_node()) await task.queue_frames([context_aggregator.user().get_context_frame()]) diff --git a/examples/dynamic/insurance_openai.py b/examples/dynamic/insurance_openai.py index 2435f31..d100e37 100644 --- a/examples/dynamic/insurance_openai.py +++ b/examples/dynamic/insurance_openai.py @@ -153,7 +153,17 @@ async def end_quote() -> FlowResult: def create_initial_node() -> NodeConfig: """Create the initial node asking for age.""" return { - "messages": [ + "role_messages": [ + { + "role": "system", + "content": ( + "You are a friendly insurance agent. Your responses will be " + "converted to audio, so avoid special characters. Always use " + "the available functions to progress the conversation naturally." + ), + } + ], + "task_messages": [ { "role": "system", "content": "Start by asking for the customer's age.", @@ -180,7 +190,7 @@ def create_initial_node() -> NodeConfig: def create_marital_status_node() -> NodeConfig: """Create node for collecting marital status.""" return { - "messages": [ + "task_messages": [ { "role": "system", "content": "Ask about the customer's marital status for premium calculation.", @@ -210,7 +220,7 @@ def create_marital_status_node() -> NodeConfig: def create_quote_calculation_node(age: int, marital_status: str) -> NodeConfig: """Create node for calculating initial quote.""" return { - "messages": [ + "task_messages": [ { "role": "system", "content": ( @@ -249,7 +259,7 @@ def create_quote_results_node( ) -> NodeConfig: """Create node for showing quote and adjustment options.""" return { - "messages": [ + "task_messages": [ { "role": "system", "content": ( @@ -296,7 +306,7 @@ def create_quote_results_node( def create_end_node() -> NodeConfig: """Create the final node.""" return { - "messages": [ + "task_messages": [ { "role": "system", "content": ( @@ -383,19 +393,7 @@ async def main(): tts = DeepgramTTSService(api_key=os.getenv("DEEPGRAM_API_KEY"), voice="aura-helios-en") llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o") - # Create initial context - messages = [ - { - "role": "system", - "content": ( - "You are a friendly insurance agent. Your responses will be " - "converted to audio, so avoid special characters. Always use " - "the available functions to progress the conversation naturally." - ), - } - ] - - context = OpenAILLMContext(messages, []) + context = OpenAILLMContext() context_aggregator = llm.create_context_aggregator(context) # Create pipeline @@ -422,7 +420,7 @@ async def main(): async def on_first_participant_joined(transport, participant): await transport.capture_participant_transcription(participant["id"]) logger.debug("Initializing flow") - await flow_manager.initialize(messages) + await flow_manager.initialize() logger.debug("Setting initial node") await flow_manager.set_node("initial", create_initial_node()) logger.debug("Queueing initial context") diff --git a/examples/static/movie_explorer_anthropic.py b/examples/static/movie_explorer_anthropic.py index e880a4a..664e477 100644 --- a/examples/static/movie_explorer_anthropic.py +++ b/examples/static/movie_explorer_anthropic.py @@ -300,23 +300,18 @@ async def get_similar_movies(args: FlowArgs) -> Union[SimilarMoviesResult, Error # Flow configuration flow_config: FlowConfig = { "initial_node": "greeting", - "initial_system_message": [ - { - "role": "system", - "content": "You are a friendly movie expert. Your responses will be converted to audio, so avoid special characters. Always use the available functions to progress the conversation naturally.", - } - ], "nodes": { "greeting": { - "messages": [ + "role_messages": [ + { + "role": "system", + "content": "You are a friendly movie expert. Your responses will be converted to audio, so avoid special characters. Always use the available functions to progress the conversation naturally.", + } + ], + "task_messages": [ { "role": "user", - "content": [ - { - "type": "text", - "text": "Start by greeting the user and asking if they'd like to know about movies currently in theaters or upcoming releases. Wait for their choice before using either get_current_movies or get_upcoming_movies.", - } - ], + "content": "Start by greeting the user and asking if they'd like to know about movies currently in theaters or upcoming releases. Wait for their choice before using either get_current_movies or get_upcoming_movies.", } ], "functions": [ @@ -337,7 +332,7 @@ async def get_similar_movies(args: FlowArgs) -> Union[SimilarMoviesResult, Error ], }, "explore_movie": { - "messages": [ + "task_messages": [ { "role": "user", "content": [ @@ -401,7 +396,7 @@ async def get_similar_movies(args: FlowArgs) -> Union[SimilarMoviesResult, Error ], }, "end": { - "messages": [ + "task_messages": [ { "role": "user", "content": [ diff --git a/examples/static/movie_explorer_gemini.py b/examples/static/movie_explorer_gemini.py index 176f20e..46e35d9 100644 --- a/examples/static/movie_explorer_gemini.py +++ b/examples/static/movie_explorer_gemini.py @@ -298,15 +298,15 @@ async def get_similar_movies(args: FlowArgs) -> Union[SimilarMoviesResult, Error # Flow configuration flow_config: FlowConfig = { "initial_node": "greeting", - "initial_system_message": [ - { - "role": "system", - "content": "You are a friendly movie expert. Your responses will be converted to audio, so avoid special characters. Always use the available functions to progress the conversation naturally.", - } - ], "nodes": { "greeting": { - "messages": [ + "role_messages": [ + { + "role": "system", + "content": "You are a friendly movie expert. Your responses will be converted to audio, so avoid special characters. Always use the available functions to progress the conversation naturally.", + } + ], + "task_messages": [ { "role": "system", "content": "Start by greeting the user and asking if they'd like to know about movies currently in theaters or upcoming releases. Wait for their choice before using either get_current_movies or get_upcoming_movies.", @@ -334,7 +334,7 @@ async def get_similar_movies(args: FlowArgs) -> Union[SimilarMoviesResult, Error ], }, "explore_movie": { - "messages": [ + "task_messages": [ { "role": "system", "content": """Help the user learn more about movies. You can: @@ -397,7 +397,7 @@ async def get_similar_movies(args: FlowArgs) -> Union[SimilarMoviesResult, Error ], }, "end": { - "messages": [ + "task_messages": [ { "role": "system", "content": "Thank the user warmly and mention they can return anytime to discover more movies.", diff --git a/examples/static/movie_explorer_openai.py b/examples/static/movie_explorer_openai.py index f1ecafa..b73c671 100644 --- a/examples/static/movie_explorer_openai.py +++ b/examples/static/movie_explorer_openai.py @@ -302,15 +302,15 @@ async def get_similar_movies(args: FlowArgs) -> Union[SimilarMoviesResult, Error # Flow configuration flow_config: FlowConfig = { "initial_node": "greeting", - "initial_system_message": [ - { - "role": "system", - "content": "You are a friendly movie expert. Your responses will be converted to audio, so avoid special characters. Always use the available functions to progress the conversation naturally.", - } - ], "nodes": { "greeting": { - "messages": [ + "role_messages": [ + { + "role": "system", + "content": "You are a friendly movie expert. Your responses will be converted to audio, so avoid special characters. Always use the available functions to progress the conversation naturally.", + } + ], + "task_messages": [ { "role": "system", "content": "Start by greeting the user and asking if they'd like to know about movies currently in theaters or upcoming releases. Wait for their choice before using either get_current_movies or get_upcoming_movies.", @@ -340,7 +340,7 @@ async def get_similar_movies(args: FlowArgs) -> Union[SimilarMoviesResult, Error ], }, "explore_movie": { - "messages": [ + "task_messages": [ { "role": "system", "content": """Help the user learn more about movies. You can: @@ -414,7 +414,7 @@ async def get_similar_movies(args: FlowArgs) -> Union[SimilarMoviesResult, Error ], }, "end": { - "messages": [ + "task_messages": [ { "role": "system", "content": "Thank the user warmly and mention they can return anytime to discover more movies.", diff --git a/src/pipecat_flows/manager.py b/src/pipecat_flows/manager.py index 35c51e3..92ea254 100644 --- a/src/pipecat_flows/manager.py +++ b/src/pipecat_flows/manager.py @@ -97,13 +97,11 @@ def __init__( if flow_config: self.nodes = flow_config["nodes"] self.initial_node = flow_config["initial_node"] - self.initial_system_message = flow_config.get("initial_system_message", []) self.transition_callback = self._handle_static_transition logger.debug("Initialized in static mode") else: self.nodes = {} self.initial_node = None - self.initial_system_message = None self.transition_callback = transition_callback logger.debug("Initialized in dynamic mode") @@ -111,42 +109,13 @@ def __init__( self.current_functions: Set[str] = set() # Track registered functions self.current_node: Optional[str] = None - async def initialize(self, initial_messages: Optional[List[dict]] = None) -> None: - """Initialize the flow. - - For static flows, sets the initial system message from config. - For dynamic flows, uses provided initial_messages. - - Args: - initial_messages: Optional initial system messages for dynamic flows. - Ignored for static flows. - - Raises: - FlowInitializationError: If initialization fails or if initial_messages - is missing for dynamic flows. - """ + async def initialize(self) -> None: + """Initialize the flow manager.""" if self.initialized: logger.warning(f"{self.__class__.__name__} already initialized") return try: - if self.nodes: # Static flow - if initial_messages: - logger.warning("Initial messages ignored for static flow configuration") - - # Set initial system message if present - if self.initial_system_message: - await self.task.queue_frame( - LLMMessagesUpdateFrame(messages=self.initial_system_message) - ) - - else: # Dynamic flow - if not initial_messages: - raise FlowInitializationError("Initial messages required for dynamic flow") - - # Initialize context with provided messages - await self.task.queue_frame(LLMMessagesUpdateFrame(messages=initial_messages)) - self.initialized = True logger.debug(f"Initialized {self.__class__.__name__}") @@ -375,9 +344,17 @@ async def set_node(self, node_id: str, node_config: NodeConfig) -> None: self._validate_node_config(node_id, node_config) logger.debug(f"Setting node: {node_id}") + # Execute pre-actions if any if pre_actions := node_config.get("pre_actions"): await self._execute_actions(pre_actions=pre_actions) + # Combine role and task messages + messages = [] + if role_messages := node_config.get("role_messages"): + messages.extend(role_messages) + messages.extend(node_config["task_messages"]) + + # Register functions and prepare tools tools = [] new_functions: Set[str] = set() @@ -414,7 +391,7 @@ async def set_node(self, node_id: str, node_config: NodeConfig) -> None: formatted_tools = self.adapter.format_functions(tools) # Update LLM context - await self._update_llm_context(node_config["messages"], formatted_tools) + await self._update_llm_context(messages, formatted_tools) logger.debug("Updated LLM context") # Execute post-actions if any @@ -437,11 +414,21 @@ async def _update_llm_context(self, messages: List[dict], functions: List[dict]) Args: messages: New messages to add to context functions: New functions to make available + + Raises: + FlowError: If context update fails """ try: + # Determine frame type based on whether this is the first node + frame_type = ( + LLMMessagesUpdateFrame if self.current_node is None else LLMMessagesAppendFrame + ) + await self.task.queue_frames( - [LLMMessagesAppendFrame(messages=messages), LLMSetToolsFrame(tools=functions)] + [frame_type(messages=messages), LLMSetToolsFrame(tools=functions)] ) + + logger.debug(f"Updated LLM context using {frame_type.__name__}") except Exception as e: logger.error(f"Failed to update LLM context: {str(e)}") raise FlowError(f"Context update failed: {str(e)}") from e @@ -464,7 +451,7 @@ def _validate_node_config(self, node_id: str, config: NodeConfig) -> None: """Validate the configuration of a conversation node. This method ensures that: - 1. Required fields (messages, functions) are present + 1. Required fields (task_messages, functions) are present 2. Functions have valid configurations based on their type: - Node functions must have either a handler or transition_to - Edge functions (matching node names) are allowed without handlers @@ -478,8 +465,8 @@ def _validate_node_config(self, node_id: str, config: NodeConfig) -> None: ValueError: If configuration is invalid """ # Check required fields - if "messages" not in config: - raise ValueError(f"Node '{node_id}' missing required 'messages' field") + if "task_messages" not in config: + raise ValueError(f"Node '{node_id}' missing required 'task_messages' field") if "functions" not in config: raise ValueError(f"Node '{node_id}' missing required 'functions' field") From 2e598f2872ed8327cd8342c14fa2369a1b82e67b Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Thu, 19 Dec 2024 17:20:21 -0500 Subject: [PATCH 03/10] Fix the gemini dynamic example --- examples/dynamic/insurance_gemini.py | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/examples/dynamic/insurance_gemini.py b/examples/dynamic/insurance_gemini.py index 50b1883..8d994c0 100644 --- a/examples/dynamic/insurance_gemini.py +++ b/examples/dynamic/insurance_gemini.py @@ -172,17 +172,18 @@ def create_initial_node() -> NodeConfig: ], "functions": [ { - "type": "function", - "function": { - "name": "collect_age", - "handler": collect_age, - "description": "Record customer's age", - "parameters": { - "type": "object", - "properties": {"age": {"type": "integer"}}, - "required": ["age"], - }, - }, + "function_declarations": [ + { + "name": "collect_age", + "handler": collect_age, + "description": "Record customer's age", + "parameters": { + "type": "object", + "properties": {"age": {"type": "integer"}}, + "required": ["age"], + }, + } + ] } ], } From f380337ea5a80b86a80c3fada867ad782e3baf2b Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Thu, 19 Dec 2024 17:29:49 -0500 Subject: [PATCH 04/10] Update changelog --- CHANGELOG.md | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e56f465..8aa68ea 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,16 +7,21 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] -### Added +### Changed -- New `initial_system_message` field in `FlowConfig`, which allows setting a - global system message for static flows. +- Nodes now have two message types to better delineate defining the role or + persona of the bot from the task it needs to accomplish. The message types are: -### Changed + - `role_message`, which defines the personality or role of the bot + - `task_message`, which defines the task to be completed for a given node + +- `role_messages` can be defined for the initial node and then inherited by + subsequent nodes. You can treat this as an LLM "system" message. - Simplified FlowManager initialization by removing the need for manual context - setup in static flows. -- Updated static examples to use the updated API. + setup in both static and dynamic flows. Now, you need to create a `FlowManager` + and initialize it to start the flow. +- All examples have been updated to align with the API changes. ## [0.0.9] - 2024-12-08 From 8ef7cca456b9311e11144c05cab96a5d58e414e0 Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Thu, 19 Dec 2024 17:59:53 -0500 Subject: [PATCH 05/10] Improve transition callbacks and handlers for dynamic examples --- examples/dynamic/insurance_anthropic.py | 67 ++++++++++--------- examples/dynamic/insurance_gemini.py | 71 ++++++++++---------- examples/dynamic/insurance_openai.py | 86 +++++++++++-------------- 3 files changed, 113 insertions(+), 111 deletions(-) diff --git a/examples/dynamic/insurance_anthropic.py b/examples/dynamic/insurance_anthropic.py index 9ede461..977d1ce 100644 --- a/examples/dynamic/insurance_anthropic.py +++ b/examples/dynamic/insurance_anthropic.py @@ -342,38 +342,47 @@ def create_end_node() -> NodeConfig: } -# Transition callback -async def handle_insurance_transition(function_name: str, args: Dict, flow_manager: FlowManager): - """Handle transitions between insurance flow states.""" - logger.debug(f"Handling transition for function: {function_name} with args: {args}") - - if function_name == "collect_age": - flow_manager.state["age"] = args["age"] - await flow_manager.set_node("marital_status", create_marital_status_node()) - - elif function_name == "collect_marital_status": - flow_manager.state["marital_status"] = args["marital_status"] - await flow_manager.set_node( - "quote_calculation", - create_quote_calculation_node( - flow_manager.state["age"], flow_manager.state["marital_status"] - ), - ) +# Transition callbacks and handlers +async def handle_age_collection(args: Dict, flow_manager: FlowManager): + flow_manager.state["age"] = args["age"] + await flow_manager.set_node("marital_status", create_marital_status_node()) + +async def handle_marital_status_collection(args: Dict, flow_manager: FlowManager): + flow_manager.state["marital_status"] = args["marital_status"] + await flow_manager.set_node( + "quote_calculation", + create_quote_calculation_node( + flow_manager.state["age"], + flow_manager.state["marital_status"] + ), + ) - elif function_name == "calculate_quote": - # Calculate the quote using the handler - quote = await calculate_quote(args) - flow_manager.state["quote"] = quote - await flow_manager.set_node("quote_results", create_quote_results_node(quote)) +async def handle_quote_calculation(args: Dict, flow_manager: FlowManager): + quote = await calculate_quote(args) + flow_manager.state["quote"] = quote + await flow_manager.set_node("quote_results", create_quote_results_node(quote)) + +async def handle_coverage_update(args: Dict, flow_manager: FlowManager): + updated_quote = await update_coverage(args) + flow_manager.state["quote"] = updated_quote + await flow_manager.set_node("quote_results", create_quote_results_node(updated_quote)) + +async def handle_end_quote(_: Dict, flow_manager: FlowManager): + await flow_manager.set_node("end", create_end_node()) + +HANDLERS = { + "collect_age": handle_age_collection, + "collect_marital_status": handle_marital_status_collection, + "calculate_quote": handle_quote_calculation, + "update_coverage": handle_coverage_update, + "end_quote": handle_end_quote, +} - elif function_name == "update_coverage": - # Calculate updated quote using the handler - updated_quote = await update_coverage(args) - flow_manager.state["quote"] = updated_quote - await flow_manager.set_node("quote_results", create_quote_results_node(updated_quote)) +async def handle_insurance_transition(function_name: str, args: Dict, flow_manager: FlowManager): + """Handle transitions between insurance flow states.""" + logger.debug(f"Processing {function_name} transition with args: {args}") + await HANDLERS[function_name](args, flow_manager) - elif function_name == "end_quote": - await flow_manager.set_node("end", create_end_node()) async def main(): diff --git a/examples/dynamic/insurance_gemini.py b/examples/dynamic/insurance_gemini.py index 8d994c0..edd60be 100644 --- a/examples/dynamic/insurance_gemini.py +++ b/examples/dynamic/insurance_gemini.py @@ -220,7 +220,6 @@ def create_marital_status_node() -> NodeConfig: ] } ], - "pre_actions": [{"type": "tts_say", "text": "Now, I'll need to know your marital status."}], } @@ -323,46 +322,50 @@ def create_end_node() -> NodeConfig: } ], "functions": [], - "pre_actions": [ - {"type": "tts_say", "text": "Thank you for getting a quote with us today!"} - ], "post_actions": [{"type": "end_conversation"}], } -# Transition callback -async def handle_insurance_transition(function_name: str, args: Dict, flow_manager: FlowManager): - """Handle transitions between insurance flow states.""" - logger.debug(f"Handling transition for function: {function_name} with args: {args}") - - if function_name == "collect_age": - flow_manager.state["age"] = args["age"] - await flow_manager.set_node("marital_status", create_marital_status_node()) - - elif function_name == "collect_marital_status": - flow_manager.state["marital_status"] = args["marital_status"] - await flow_manager.set_node( - "quote_calculation", - create_quote_calculation_node( - flow_manager.state["age"], flow_manager.state["marital_status"] - ), - ) - - elif function_name == "calculate_quote": - # Calculate the quote using the handler - quote = await calculate_quote(args) - flow_manager.state["quote"] = quote - await flow_manager.set_node("quote_results", create_quote_results_node(quote)) +# Transition callbacks and handlers +async def handle_age_collection(args: Dict, flow_manager: FlowManager): + flow_manager.state["age"] = args["age"] + await flow_manager.set_node("marital_status", create_marital_status_node()) - elif function_name == "update_coverage": - # Calculate updated quote using the handler - updated_quote = await update_coverage(args) - flow_manager.state["quote"] = updated_quote - await flow_manager.set_node("quote_results", create_quote_results_node(updated_quote)) +async def handle_marital_status_collection(args: Dict, flow_manager: FlowManager): + flow_manager.state["marital_status"] = args["marital_status"] + await flow_manager.set_node( + "quote_calculation", + create_quote_calculation_node( + flow_manager.state["age"], + flow_manager.state["marital_status"] + ), + ) - elif function_name == "end_quote": - await flow_manager.set_node("end", create_end_node()) +async def handle_quote_calculation(args: Dict, flow_manager: FlowManager): + quote = await calculate_quote(args) + flow_manager.state["quote"] = quote + await flow_manager.set_node("quote_results", create_quote_results_node(quote)) + +async def handle_coverage_update(args: Dict, flow_manager: FlowManager): + updated_quote = await update_coverage(args) + flow_manager.state["quote"] = updated_quote + await flow_manager.set_node("quote_results", create_quote_results_node(updated_quote)) + +async def handle_end_quote(_: Dict, flow_manager: FlowManager): + await flow_manager.set_node("end", create_end_node()) + +HANDLERS = { + "collect_age": handle_age_collection, + "collect_marital_status": handle_marital_status_collection, + "calculate_quote": handle_quote_calculation, + "update_coverage": handle_coverage_update, + "end_quote": handle_end_quote, +} +async def handle_insurance_transition(function_name: str, args: Dict, flow_manager: FlowManager): + """Handle transitions between insurance flow states.""" + logger.debug(f"Processing {function_name} transition with args: {args}") + await HANDLERS[function_name](args, flow_manager) async def main(): """Main function to set up and run the insurance quote bot.""" diff --git a/examples/dynamic/insurance_openai.py b/examples/dynamic/insurance_openai.py index d100e37..f03dc38 100644 --- a/examples/dynamic/insurance_openai.py +++ b/examples/dynamic/insurance_openai.py @@ -213,7 +213,6 @@ def create_marital_status_node() -> NodeConfig: }, } ], - "pre_actions": [{"type": "tts_say", "text": "Now, I'll need to know your marital status."}], } @@ -316,59 +315,50 @@ def create_end_node() -> NodeConfig: } ], "functions": [], - "pre_actions": [ - {"type": "tts_say", "text": "Thank you for getting a quote with us today!"} - ], "post_actions": [{"type": "end_conversation"}], } -# Transition callback +# Transition callbacks and handlers +async def handle_age_collection(args: Dict, flow_manager: FlowManager): + flow_manager.state["age"] = args["age"] + await flow_manager.set_node("marital_status", create_marital_status_node()) + +async def handle_marital_status_collection(args: Dict, flow_manager: FlowManager): + flow_manager.state["marital_status"] = args["marital_status"] + await flow_manager.set_node( + "quote_calculation", + create_quote_calculation_node( + flow_manager.state["age"], + flow_manager.state["marital_status"] + ), + ) + +async def handle_quote_calculation(args: Dict, flow_manager: FlowManager): + quote = await calculate_quote(args) + flow_manager.state["quote"] = quote + await flow_manager.set_node("quote_results", create_quote_results_node(quote)) + +async def handle_coverage_update(args: Dict, flow_manager: FlowManager): + updated_quote = await update_coverage(args) + flow_manager.state["quote"] = updated_quote + await flow_manager.set_node("quote_results", create_quote_results_node(updated_quote)) + +async def handle_end_quote(_: Dict, flow_manager: FlowManager): + await flow_manager.set_node("end", create_end_node()) + +HANDLERS = { + "collect_age": handle_age_collection, + "collect_marital_status": handle_marital_status_collection, + "calculate_quote": handle_quote_calculation, + "update_coverage": handle_coverage_update, + "end_quote": handle_end_quote, +} + async def handle_insurance_transition(function_name: str, args: Dict, flow_manager: FlowManager): """Handle transitions between insurance flow states.""" - logger.debug(f"Transition callback executing for function: {function_name} with args: {args}") - - if function_name == "collect_age": - logger.debug("Processing collect_age transition") - flow_manager.state["age"] = args["age"] - await flow_manager.set_node("marital_status", create_marital_status_node()) - logger.debug("Completed collect_age transition") - - elif function_name == "collect_marital_status": - logger.debug("Processing collect_marital_status transition") - flow_manager.state["marital_status"] = args["marital_status"] - await flow_manager.set_node( - "quote_calculation", - create_quote_calculation_node( - flow_manager.state["age"], flow_manager.state["marital_status"] - ), - ) - logger.debug("Completed collect_marital_status transition") - - elif function_name == "calculate_quote": - logger.debug("Processing calculate_quote transition") - quote = await calculate_quote(args) - flow_manager.state["quote"] = quote - await flow_manager.set_node( - "quote_results", - create_quote_results_node(quote), - ) - logger.debug("Completed calculate_quote transition") - - elif function_name == "update_coverage": - logger.debug("Processing update_coverage transition") - updated_quote = await update_coverage(args) - flow_manager.state["quote"] = updated_quote - await flow_manager.set_node( - "quote_results", - create_quote_results_node(updated_quote), - ) - logger.debug("Completed update_coverage transition") - - elif function_name == "end_quote": - logger.debug("Processing end_quote transition") - await flow_manager.set_node("end", create_end_node()) - logger.debug("Completed end_quote transition") + logger.debug(f"Processing {function_name} transition with args: {args}") + await HANDLERS[function_name](args, flow_manager) async def main(): From 3e8779a103718899f14397659502c5d6415cb514 Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Thu, 19 Dec 2024 18:09:10 -0500 Subject: [PATCH 06/10] Update the README --- README.md | 108 +++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 79 insertions(+), 29 deletions(-) diff --git a/README.md b/README.md index bb0203c..19a40dd 100644 --- a/README.md +++ b/README.md @@ -38,7 +38,7 @@ pip install "pipecat-ai[daily,google,deepgram,cartesia]" # For Google ## Quick Start -Here's a basic example of setting up a conversation flow: +Here's a basic example of setting up a static conversation flow: ```python from pipecat_flows import FlowManager @@ -46,18 +46,10 @@ from pipecat_flows import FlowManager # Initialize flow manager with static configuration flow_manager = FlowManager(task, llm, tts, flow_config=flow_config) -# Or with dynamic flow handling -flow_manager = FlowManager( - task, - llm, - tts, - transition_callback=handle_transitions -) - @transport.event_handler("on_first_participant_joined") async def on_first_participant_joined(transport, participant): await transport.capture_participant_transcription(participant["id"]) - await flow_manager.initialize(messages) + await flow_manager.initialize() await task.queue_frames([context_aggregator.user().get_context_frame()]) ``` @@ -71,17 +63,32 @@ Each conversation flow consists of nodes that define the conversation structure. #### Messages -Messages set the context for the LLM at each state: +Nodes use two types of messages to control the conversation: + +1. **Role Messages**: Define the bot's personality or role (optional) ```python -"messages": [ +"role_messages": [ { "role": "system", - "content": "You are handling pizza orders. Ask for size selection." + "content": "You are a friendly pizza ordering assistant. Keep responses casual and upbeat." } ] ``` +2. **Task Messages**: Define what the bot should do in the current node + +```python +"task_messages": [ + { + "role": "system", + "content": "Ask the customer which pizza size they'd like: small, medium, or large." + } +] +``` + +Role messages are typically defined in your initial node and inherited by subsequent nodes, while task messages are specific to each node's purpose. + #### Functions Functions come in two types: @@ -101,7 +108,6 @@ Functions come in two types: "size": {"type": "string", "enum": ["small", "medium", "large"]} } }, - "transition_to": "next_node" # Optional: Specify next node } } ``` @@ -113,6 +119,7 @@ Functions come in two types: "type": "function", "function": { "name": "next_step", + "handler": select_size_handler, # Optional handler "description": "Move to next state", "parameters": {"type": "object", "properties": {}}, "transition_to": "target_node" # Required: Specify target node @@ -129,7 +136,10 @@ Functions can: #### Actions -Actions execute during state transitions: +There are two types of actions available: + +- `pre_actions`: Run before the LLM inference. For long function calls, you can use a pre_action for the TTS to say something, like "Hold on a moment..." +- `post_actions`: Run after the LLM inference. This is handy for actions like ending or transferring a call. ```python "pre_actions": [ @@ -137,9 +147,16 @@ Actions execute during state transitions: "type": "tts_say", "text": "Processing your order..." } +], +"post_actions": [ + { + "type": "end_conversation" + } ] ``` +Learn more about built-in actions and defining your own action in the docs. + #### Provider-Specific Formats Pipecat Flows automatically handles format differences between LLM providers: @@ -189,15 +206,15 @@ The FlowManager handles both static and dynamic flows through a unified interfac # Define flow configuration upfront flow_config = { "initial_node": "greeting", - "initial_system_message": [ - { - "role": "system", - "content": "You are a helpful assistant. Your responses will be converted to audio." - } - ], "nodes": { "greeting": { - "messages": [ + "role_messages": [ + { + "role": "system", + "content": "You are a helpful assistant. Your responses will be converted to audio." + } + ], + "task_messages": [ { "role": "system", "content": "Start by greeting the user and asking for their name." @@ -225,16 +242,49 @@ await flow_manager.initialize() #### Dynamic Flows ```python -# Define transition handling -async def handle_transitions(function_name: str, args: Dict, flow_manager): - if function_name == "collect_age": - await flow_manager.set_node("next_step", create_next_node()) - -system_message = "You are an assistant." +def create_initial_node() -> NodeConfig: + return { + "role_messages": [ + { + "role": "system", + "content": "You are a helpful assistant." + } + ], + "task_messages": [ + { + "role": "system", + "content": "Ask the user for their age." + } + ], + "functions": [ + { + "type": "function", + "function": { + "name": "collect_age", + "handler": collect_age, + "description": "Record user's age", + "parameters": { + "type": "object", + "properties": { + "age": {"type": "integer"} + }, + "required": ["age"] + } + } + } + ] + } # Initialize with transition callback flow_manager = FlowManager(task, llm, tts, transition_callback=handle_transitions) -await flow_manager.initialize(system_message) +await flow_manager.initialize() + +@transport.event_handler("on_first_participant_joined") +async def on_first_participant_joined(transport, participant): + await transport.capture_participant_transcription(participant["id"]) + await flow_manager.initialize() + await flow_manager.set_node("initial", create_initial_node()) + await task.queue_frames([context_aggregator.user().get_context_frame()]) ``` ## Examples From 1793372e7c5c525024a1fb01b0ac72a12dd0726f Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Thu, 19 Dec 2024 18:48:26 -0500 Subject: [PATCH 07/10] Update tests --- tests/test_manager.py | 453 ++++++++++++++++++++++++------------------ 1 file changed, 256 insertions(+), 197 deletions(-) diff --git a/tests/test_manager.py b/tests/test_manager.py index ea4c960..f2b1724 100644 --- a/tests/test_manager.py +++ b/tests/test_manager.py @@ -21,7 +21,7 @@ import unittest from unittest.mock import AsyncMock, MagicMock, patch -from pipecat.frames.frames import LLMMessagesUpdateFrame +from pipecat.frames.frames import LLMMessagesAppendFrame, LLMMessagesUpdateFrame from pipecat.services.openai import OpenAILLMService from pipecat_flows.exceptions import FlowError, FlowInitializationError, FlowTransitionError @@ -46,12 +46,10 @@ async def asyncSetUp(self): self.mock_llm = MagicMock(spec=OpenAILLMService) self.mock_tts = AsyncMock() - # Sample messages for dynamic flows - self.initial_messages = [{"role": "system", "content": "Initial message"}] - # Sample node configurations - self.sample_node_config = { - "messages": [{"role": "system", "content": "Test message"}], + self.sample_node = { + "role_messages": [{"role": "system", "content": "You are a helpful test assistant."}], + "task_messages": [{"role": "system", "content": "Complete the test task."}], "functions": [ { "type": "function", @@ -68,10 +66,9 @@ async def asyncSetUp(self): # Sample static flow configuration self.static_flow_config = { "initial_node": "start", - "initial_system_message": [{"role": "system", "content": "Global system message"}], "nodes": { - "start": self.sample_node_config, - "next_node": self.sample_node_config, + "start": self.sample_node, + "next_node": self.sample_node, }, } @@ -89,110 +86,168 @@ async def test_static_flow_initialization(self): self.assertEqual(flow_manager.nodes, self.static_flow_config["nodes"]) self.assertEqual(flow_manager.transition_callback.__name__, "_handle_static_transition") - # Initialize flow (no messages needed for static flow) + # Initialize flow await flow_manager.initialize() # Verify initialization self.assertTrue(flow_manager.initialized) - # Verify the initial system message was queued - calls = self.mock_task.queue_frame.call_args_list - update_frame_calls = [ - call for call in calls if isinstance(call[0][0], LLMMessagesUpdateFrame) - ] - self.assertTrue( - any( - call[0][0].messages == self.static_flow_config["initial_system_message"] - for call in update_frame_calls - ) - ) + # Verify the initial node was set + self.assertEqual(flow_manager.current_node, "start") + + # Verify the messages were queued with UpdateFrame + calls = self.mock_task.queue_frames.call_args_list + self.assertEqual(len(calls), 1) # Should be called once + + # Get the frames from the first call + frames = calls[0][0][0] # First call, first argument, which is the list of frames + update_frames = [f for f in frames if isinstance(f, LLMMessagesUpdateFrame)] + self.assertEqual(len(update_frames), 1) + + # Verify the combined messages were sent + expected_messages = self.sample_node["role_messages"] + self.sample_node["task_messages"] + actual_messages = update_frames[0].messages + self.assertEqual(actual_messages, expected_messages) async def test_dynamic_flow_initialization(self): """Test initialization of dynamic flow.""" - - async def transition_callback(function_name, args, flow_manager): - pass + # Create mock transition callback + mock_transition_callback = AsyncMock() flow_manager = FlowManager( task=self.mock_task, llm=self.mock_llm, tts=self.mock_tts, - transition_callback=transition_callback, + transition_callback=mock_transition_callback, ) # Verify dynamic mode setup self.assertIsNone(flow_manager.initial_node) self.assertEqual(flow_manager.nodes, {}) - self.assertEqual(flow_manager.transition_callback, transition_callback) + self.assertEqual(flow_manager.transition_callback, mock_transition_callback) - # Initialize flow with required messages - await flow_manager.initialize(self.initial_messages) + # Initialize flow + await flow_manager.initialize() # Verify initialization self.assertTrue(flow_manager.initialized) - # Verify the messages were queued - calls = self.mock_task.queue_frame.call_args_list - update_frame_calls = [ - call for call in calls if isinstance(call[0][0], LLMMessagesUpdateFrame) - ] - self.assertTrue( - any(call[0][0].messages == self.initial_messages for call in update_frame_calls) - ) + # Verify no messages were queued during initialization + self.mock_task.queue_frames.assert_not_called() + + # Create and set initial node + initial_node = { + "role_messages": [{"role": "system", "content": "You are a helpful assistant."}], + "task_messages": [{"role": "system", "content": "Ask the user for their name."}], + "functions": [], + } + + # Reset mock to clear any previous calls + self.mock_task.queue_frames.reset_mock() + + # Set initial node + await flow_manager.set_node("initial", initial_node) + + # Verify frames were queued + self.mock_task.queue_frames.assert_called_once() + frames = self.mock_task.queue_frames.call_args[0][0] + + # Should have exactly one UpdateFrame (since it's first node) + update_frames = [f for f in frames if isinstance(f, LLMMessagesUpdateFrame)] + self.assertEqual(len(update_frames), 1, "Should have exactly one UpdateFrame") + + # Verify message content + expected_messages = initial_node["role_messages"] + initial_node["task_messages"] + actual_messages = update_frames[0].messages + self.assertEqual(actual_messages, expected_messages) async def test_static_flow_transitions(self): - """Test transitions in static flow.""" + """Test transitions in static flows. + + Verifies that: + 1. Static transitions correctly change the current node + 2. Node configuration is properly processed during transition + 3. Messages are sent using AppendFrame for non-initial nodes + """ + # Setup flow manager with static configuration flow_manager = FlowManager( task=self.mock_task, llm=self.mock_llm, tts=self.mock_tts, flow_config=self.static_flow_config, ) - await flow_manager.initialize([]) + + # Initialize and transition to first node + await flow_manager.initialize() + self.assertEqual(flow_manager.current_node, "start") + + # Clear mock call history to focus on transition + self.mock_task.queue_frames.reset_mock() # Test transition to next node await flow_manager._handle_static_transition("next_node", {}, flow_manager) + + # Verify node transition occurred self.assertEqual(flow_manager.current_node, "next_node") + # Verify frame handling + self.mock_task.queue_frames.assert_called_once() + frames = self.mock_task.queue_frames.call_args[0][0] + + # Should have exactly one AppendFrame and one SetToolsFrame + append_frames = [f for f in frames if isinstance(f, LLMMessagesAppendFrame)] + self.assertEqual(len(append_frames), 1, "Should have exactly one AppendFrame") + async def test_dynamic_flow_transitions(self): - """Test transitions in dynamic flow.""" - transition_called = False + """Test transitions in dynamic flow. - async def transition_callback(function_name, args, flow_manager): - nonlocal transition_called - transition_called = True - await flow_manager.set_node("dynamic_node", self.sample_node_config) + Verifies that: + 1. Transition callback is called with correct arguments + 2. Dynamic node transitions work properly + 3. State is updated correctly + """ + # Create mock transition callback + mock_transition_callback = AsyncMock() + # Initialize flow manager with mock callback flow_manager = FlowManager( task=self.mock_task, llm=self.mock_llm, tts=self.mock_tts, - transition_callback=transition_callback, + transition_callback=mock_transition_callback, ) - await flow_manager.initialize(self.initial_messages) + await flow_manager.initialize() # Set initial node - await flow_manager.set_node("start", self.sample_node_config) + await flow_manager.set_node("start", self.sample_node) self.assertEqual(flow_manager.current_node, "start") + # Reset frame tracking + self.mock_task.queue_frames.reset_mock() + # Trigger transition - await flow_manager.transition_callback("test_function", {}, flow_manager) - self.assertTrue(transition_called) - self.assertEqual(flow_manager.current_node, "dynamic_node") + test_function_name = "test_function" + test_args = {"test": "value"} + await flow_manager.transition_callback(test_function_name, test_args, flow_manager) + + # Verify callback was called with correct arguments + mock_transition_callback.assert_called_once_with( + test_function_name, test_args, flow_manager + ) async def test_node_validation(self): """Test node configuration validation.""" flow_manager = FlowManager(task=self.mock_task, llm=self.mock_llm) - await flow_manager.initialize(self.initial_messages) # Initialize first + await flow_manager.initialize() - # Test missing messages + # Test missing task_messages invalid_config = {"functions": []} with self.assertRaises(FlowError) as context: await flow_manager.set_node("test", invalid_config) - self.assertIn("missing required 'messages' field", str(context.exception)) + self.assertIn("missing required 'task_messages' field", str(context.exception)) # Test missing functions - invalid_config = {"messages": []} + invalid_config = {"task_messages": []} with self.assertRaises(FlowError) as context: await flow_manager.set_node("test", invalid_config) self.assertIn("missing required 'functions' field", str(context.exception)) @@ -200,10 +255,13 @@ async def test_node_validation(self): async def test_function_registration(self): """Test function registration with LLM.""" flow_manager = FlowManager(task=self.mock_task, llm=self.mock_llm) - await flow_manager.initialize(self.initial_messages) + await flow_manager.initialize() + + # Reset mock to clear initialization calls + self.mock_llm.register_function.reset_mock() # Set node with function - await flow_manager.set_node("test", self.sample_node_config) + await flow_manager.set_node("test", self.sample_node) # Verify function was registered self.mock_llm.register_function.assert_called_once() @@ -214,15 +272,22 @@ async def test_function_registration(self): async def test_action_execution(self): """Test execution of pre and post actions.""" flow_manager = FlowManager(task=self.mock_task, llm=self.mock_llm, tts=self.mock_tts) - await flow_manager.initialize(self.initial_messages) + await flow_manager.initialize() + + # Create node config with actions + node_with_actions = { + "role_messages": self.sample_node["role_messages"], + "task_messages": self.sample_node["task_messages"], + "functions": self.sample_node["functions"], + "pre_actions": [{"type": "tts_say", "text": "Pre action"}], + "post_actions": [{"type": "tts_say", "text": "Post action"}], + } - # Add actions to node config - node_config = self.sample_node_config.copy() - node_config["pre_actions"] = [{"type": "tts_say", "text": "Pre action"}] - node_config["post_actions"] = [{"type": "tts_say", "text": "Post action"}] + # Reset mock to clear initialization calls + self.mock_tts.say.reset_mock() # Set node with actions - await flow_manager.set_node("test", node_config) + await flow_manager.set_node("test", node_with_actions) # Verify TTS was called for both actions self.assertEqual(self.mock_tts.say.call_count, 2) @@ -230,41 +295,55 @@ async def test_action_execution(self): self.mock_tts.say.assert_any_call("Post action") async def test_error_handling(self): - """Test error handling in flow manager.""" + """Test error handling in flow manager. + + Verifies: + 1. Cannot set node before initialization + 2. Initialization fails properly when task queue fails + 3. Node setting fails when task queue fails + """ flow_manager = FlowManager(task=self.mock_task, llm=self.mock_llm) - # Test initialization before setting node + # Test setting node before initialization with self.assertRaises(FlowTransitionError): - await flow_manager.set_node("test", self.sample_node_config) + await flow_manager.set_node("test", self.sample_node) + + # Initialize normally + await flow_manager.initialize() + self.assertTrue(flow_manager.initialized) - # Test initialization error - self.mock_task.queue_frame.side_effect = Exception("Queue error") - with self.assertRaises(FlowInitializationError): - await flow_manager.initialize([]) + # Test node setting error + self.mock_task.queue_frames.side_effect = Exception("Queue error") + with self.assertRaises(FlowError): + await flow_manager.set_node("test", self.sample_node) - # Verify initialization failed - self.assertFalse(flow_manager.initialized) + # Verify flow manager remains initialized despite error + self.assertTrue(flow_manager.initialized) async def test_state_management(self): """Test state management across nodes.""" flow_manager = FlowManager(task=self.mock_task, llm=self.mock_llm) - await flow_manager.initialize(self.initial_messages) + await flow_manager.initialize() # Set state data - flow_manager.state["test_data"] = "value" + test_value = "test_value" + flow_manager.state["test_key"] = test_value + + # Reset mock to clear initialization calls + self.mock_task.queue_frames.reset_mock() # Verify state persists across node transitions - await flow_manager.set_node("test", self.sample_node_config) - self.assertEqual(flow_manager.state["test_data"], "value") + await flow_manager.set_node("test", self.sample_node) + self.assertEqual(flow_manager.state["test_key"], test_value) async def test_multiple_function_registration(self): """Test registration of multiple functions.""" flow_manager = FlowManager(task=self.mock_task, llm=self.mock_llm) - await flow_manager.initialize(self.initial_messages) + await flow_manager.initialize() # Create node config with multiple functions node_config = { - "messages": [{"role": "system", "content": "Test"}], + "task_messages": [{"role": "system", "content": "Test"}], "functions": [ { "type": "function", @@ -287,11 +366,11 @@ async def test_multiple_function_registration(self): async def test_initialize_already_initialized(self): """Test initializing an already initialized flow manager.""" flow_manager = FlowManager(task=self.mock_task, llm=self.mock_llm) - await flow_manager.initialize(self.initial_messages) + await flow_manager.initialize() # Try to initialize again with patch("loguru.logger.warning") as mock_logger: - await flow_manager.initialize([]) + await flow_manager.initialize() mock_logger.assert_called_once() async def test_register_action(self): @@ -307,7 +386,7 @@ async def custom_action(action): async def test_call_handler_variations(self): """Test different handler signature variations.""" flow_manager = FlowManager(task=self.mock_task, llm=self.mock_llm) - await flow_manager.initialize(self.initial_messages) + await flow_manager.initialize() # Test handler with args async def handler_with_args(args): @@ -326,7 +405,7 @@ async def handler_no_args(): async def test_transition_func_error_handling(self): """Test error handling in transition functions.""" flow_manager = FlowManager(task=self.mock_task, llm=self.mock_llm) - await flow_manager.initialize(self.initial_messages) + await flow_manager.initialize() async def error_handler(args): raise ValueError("Test error") @@ -352,11 +431,11 @@ async def result_callback(result): async def test_node_validation_edge_cases(self): """Test edge cases in node validation.""" flow_manager = FlowManager(task=self.mock_task, llm=self.mock_llm) - await flow_manager.initialize(self.initial_messages) + await flow_manager.initialize() # Test function with missing name invalid_config = { - "messages": [], + "task_messages": [{"role": "system", "content": "Test"}], "functions": [{"type": "function"}], # Missing name } with self.assertRaises(FlowError) as context: @@ -365,7 +444,7 @@ async def test_node_validation_edge_cases(self): # Test node function without handler or transition_to invalid_config = { - "messages": [], + "task_messages": [{"role": "system", "content": "Test"}], "functions": [ { "type": "function", @@ -393,25 +472,6 @@ def capture_warning(msg, *args, **kwargs): warning_message, ) - async def test_pre_post_actions(self): - """Test pre and post actions in set_node.""" - flow_manager = FlowManager(task=self.mock_task, llm=self.mock_llm) - await flow_manager.initialize(self.initial_messages) - - # Create node config with pre and post actions - node_config = { - "messages": [{"role": "system", "content": "Test"}], - "functions": [], - "pre_actions": [{"type": "tts_say", "text": "Pre"}], - "post_actions": [{"type": "tts_say", "text": "Post"}], - } - - await flow_manager.set_node("test", node_config) - - # Verify actions were executed in order - calls = self.mock_task.queue_frame.call_args_list - self.assertGreater(len(calls), 0) - async def test_transition_callback_error_handling(self): """Test error handling in transition callback.""" @@ -421,7 +481,7 @@ async def failing_transition(function_name, args, flow_manager): flow_manager = FlowManager( task=self.mock_task, llm=self.mock_llm, transition_callback=failing_transition ) - await flow_manager.initialize(self.initial_messages) + await flow_manager.initialize() transition_func = await flow_manager._create_transition_func( "test", None, transition_to=None @@ -435,7 +495,7 @@ async def result_callback(result): async def test_register_function_error_handling(self): """Test error handling in function registration.""" flow_manager = FlowManager(task=self.mock_task, llm=self.mock_llm) - await flow_manager.initialize(self.initial_messages) + await flow_manager.initialize() # Mock LLM to raise error on register_function flow_manager.llm.register_function.side_effect = Exception("Registration error") @@ -447,11 +507,11 @@ async def test_register_function_error_handling(self): async def test_action_execution_error_handling(self): """Test error handling in action execution.""" flow_manager = FlowManager(task=self.mock_task, llm=self.mock_llm) - await flow_manager.initialize(self.initial_messages) + await flow_manager.initialize() # Create node config with actions that will fail node_config = { - "messages": [{"role": "system", "content": "Test"}], + "task_messages": [{"role": "system", "content": "Test"}], "functions": [], "pre_actions": [{"type": "invalid_action"}], "post_actions": [{"type": "another_invalid_action"}], @@ -471,7 +531,7 @@ async def test_action_execution_error_handling(self): async def test_update_llm_context_error_handling(self): """Test error handling in LLM context updates.""" flow_manager = FlowManager(task=self.mock_task, llm=self.mock_llm) - await flow_manager.initialize(self.initial_messages) + await flow_manager.initialize() # Mock task to raise error on queue_frames flow_manager.task.queue_frames.side_effect = Exception("Queue error") @@ -484,7 +544,7 @@ async def test_update_llm_context_error_handling(self): async def test_handler_callback_completion(self): """Test handler completion callback and logging.""" flow_manager = FlowManager(task=self.mock_task, llm=self.mock_llm) - await flow_manager.initialize(self.initial_messages) + await flow_manager.initialize() async def test_handler(args): return {"status": "success", "data": "test"} @@ -505,7 +565,7 @@ async def result_callback(result): async def test_handler_removal_all_formats(self): """Test handler removal from different function configurations.""" flow_manager = FlowManager(task=self.mock_task, llm=self.mock_llm) - await flow_manager.initialize(self.initial_messages) + await flow_manager.initialize() async def dummy_handler(args): return {"status": "success"} @@ -537,14 +597,14 @@ async def dummy_handler(args): async def test_function_declarations_processing(self): """Test processing of function declarations format.""" flow_manager = FlowManager(task=self.mock_task, llm=self.mock_llm) - await flow_manager.initialize(self.initial_messages) + await flow_manager.initialize() async def test_handler(args): return {"status": "success"} # Create node config with OpenAI format for multiple functions node_config = { - "messages": [{"role": "system", "content": "Test"}], + "task_messages": [{"role": "system", "content": "Test"}], "functions": [ { "type": "function", @@ -574,40 +634,10 @@ async def test_handler(args): self.assertIn("test1", flow_manager.current_functions) self.assertIn("test2", flow_manager.current_functions) - async def test_direct_handler_format(self): - """Test processing of direct handler format.""" - flow_manager = FlowManager(task=self.mock_task, llm=self.mock_llm) - await flow_manager.initialize(self.initial_messages) - - async def test_handler(args): - return {"status": "success"} - - # Create node config with OpenAI format - node_config = { - "messages": [{"role": "system", "content": "Test"}], - "functions": [ - { - "type": "function", - "function": { - "name": "test", - "handler": test_handler, - "description": "Test function", - "parameters": {}, - }, - } - ], - } - - # Set node and verify function registration - await flow_manager.set_node("test", node_config) - - # Verify function was registered - self.assertIn("test", flow_manager.current_functions) - async def test_function_token_handling_main_module(self): """Test handling of __function__: tokens when function is in main module.""" flow_manager = FlowManager(task=self.mock_task, llm=self.mock_llm) - await flow_manager.initialize(self.initial_messages) + await flow_manager.initialize() # Define test handler in main module async def test_handler_main(args): @@ -620,7 +650,7 @@ async def test_handler_main(args): try: node_config = { - "messages": [{"role": "system", "content": "Test"}], + "task_messages": [{"role": "system", "content": "Test"}], "functions": [ { "type": "function", @@ -644,10 +674,10 @@ async def test_handler_main(args): async def test_function_token_handling_not_found(self): """Test error handling when function is not found in any module.""" flow_manager = FlowManager(task=self.mock_task, llm=self.mock_llm) - await flow_manager.initialize(self.initial_messages) + await flow_manager.initialize() node_config = { - "messages": [{"role": "system", "content": "Test"}], + "task_messages": [{"role": "system", "content": "Test"}], "functions": [ { "type": "function", @@ -669,7 +699,7 @@ async def test_function_token_handling_not_found(self): async def test_function_token_execution(self): """Test that functions registered with __function__: token work when called.""" flow_manager = FlowManager(task=self.mock_task, llm=self.mock_llm) - await flow_manager.initialize(self.initial_messages) + await flow_manager.initialize() # Define and register test handler test_called = False @@ -685,7 +715,7 @@ async def test_handler(args): try: node_config = { - "messages": [{"role": "system", "content": "Test"}], + "task_messages": [{"role": "system", "content": "Test"}], "functions": [ { "type": "function", @@ -714,64 +744,93 @@ async def callback(result): finally: delattr(sys.modules["__main__"], "test_handler") - async def test_static_flow_without_initial_system_message(self): - """Test static flow initialization without initial_system_message.""" - # Create config without initial_system_message - config = { - "initial_node": "start", - "nodes": { - "start": self.sample_node_config, - }, + async def test_role_message_inheritance(self): + """Test that role messages are properly handled between nodes. + + Verifies: + 1. Role messages are included in first node + 2. Role messages are included in subsequent nodes + 3. Messages are combined correctly + """ + flow_manager = FlowManager(task=self.mock_task, llm=self.mock_llm) + await flow_manager.initialize() + + # First node with role messages + first_node = { + "role_messages": [{"role": "system", "content": "You are a helpful assistant."}], + "task_messages": [{"role": "system", "content": "First task."}], + "functions": [], } - flow_manager = FlowManager( - task=self.mock_task, - llm=self.mock_llm, - tts=self.mock_tts, - flow_config=config, - ) + # Second node without role messages + second_node = { + "task_messages": [{"role": "system", "content": "Second task."}], + "functions": [], + } + # Set first node and verify UpdateFrame + await flow_manager.set_node("first", first_node) + first_calls = self.mock_task.queue_frames.call_args_list[-1] + first_frames = first_calls[0][0] + update_frames = [f for f in first_frames if isinstance(f, LLMMessagesUpdateFrame)] + self.assertEqual(len(update_frames), 1) + + # Verify combined messages in first node + expected_first_messages = first_node["role_messages"] + first_node["task_messages"] + self.assertEqual(update_frames[0].messages, expected_first_messages) + + # Reset mock and set second node + self.mock_task.queue_frames.reset_mock() + await flow_manager.set_node("second", second_node) + + # Verify AppendFrame for second node + second_calls = self.mock_task.queue_frames.call_args_list[-1] + second_frames = second_calls[0][0] + append_frames = [f for f in second_frames if isinstance(f, LLMMessagesAppendFrame)] + self.assertEqual(len(append_frames), 1) + + # Verify only task messages in second node + self.assertEqual(append_frames[0].messages, second_node["task_messages"]) + + async def test_frame_type_selection(self): + """Test that correct frame types are used based on node order. + + Verifies: + 1. First node uses UpdateFrame + 2. Subsequent nodes use AppendFrame + 3. Frame content is correct + """ + flow_manager = FlowManager(task=self.mock_task, llm=self.mock_llm) await flow_manager.initialize() - # Verify initialization succeeded - self.assertTrue(flow_manager.initialized) + test_node = { + "task_messages": [{"role": "system", "content": "Test task."}], + "functions": [], + } - # Verify no initial system message was queued - calls = self.mock_task.queue_frame.call_args_list - update_frame_calls = [ - call for call in calls if isinstance(call[0][0], LLMMessagesUpdateFrame) - ] - self.assertEqual( - len(update_frame_calls), - 0, - "No LLMMessagesUpdateFrame should be queued when there's no initial system message", + # First node should use UpdateFrame + await flow_manager.set_node("first", test_node) + first_frames = self.mock_task.queue_frames.call_args[0][0] + self.assertTrue( + any(isinstance(f, LLMMessagesUpdateFrame) for f in first_frames), + "First node should use UpdateFrame", ) - - # Verify node was set - self.assertEqual(flow_manager.current_node, "start") - - async def test_static_flow_ignores_initial_messages(self): - """Test that static flows ignore passed initial messages.""" - flow_manager = FlowManager( - task=self.mock_task, - llm=self.mock_llm, - tts=self.mock_tts, - flow_config=self.static_flow_config, + self.assertFalse( + any(isinstance(f, LLMMessagesAppendFrame) for f in first_frames), + "First node should not use AppendFrame", ) - # Initialize with messages that should be ignored - with patch("loguru.logger.warning") as mock_logger: - await flow_manager.initialize([{"role": "system", "content": "Ignored message"}]) - mock_logger.assert_called_once() + # Reset mock + self.mock_task.queue_frames.reset_mock() - # Verify only initial_system_message was used - calls = self.mock_task.queue_frame.call_args_list - update_frame_calls = [ - call for call in calls if isinstance(call[0][0], LLMMessagesUpdateFrame) - ] + # Second node should use AppendFrame + await flow_manager.set_node("second", test_node) + second_frames = self.mock_task.queue_frames.call_args[0][0] self.assertTrue( - any( - call[0][0].messages == self.static_flow_config["initial_system_message"] - for call in update_frame_calls - ) + any(isinstance(f, LLMMessagesAppendFrame) for f in second_frames), + "Subsequent nodes should use AppendFrame", + ) + self.assertFalse( + any(isinstance(f, LLMMessagesUpdateFrame) for f in second_frames), + "Subsequent nodes should not use UpdateFrame", ) From d5e095fd1a73a3d231c95ce501f913ccb368c11b Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Thu, 19 Dec 2024 18:51:32 -0500 Subject: [PATCH 08/10] Update remaining static demos --- examples/static/food_ordering.py | 22 +++++++++--------- examples/static/patient_intake.py | 28 +++++++++++------------ examples/static/restaurant_reservation.py | 20 ++++++++-------- examples/static/travel_planner.py | 28 +++++++++++------------ 4 files changed, 49 insertions(+), 49 deletions(-) diff --git a/examples/static/food_ordering.py b/examples/static/food_ordering.py index 29be17e..922a7f1 100644 --- a/examples/static/food_ordering.py +++ b/examples/static/food_ordering.py @@ -110,15 +110,15 @@ async def select_sushi_order(args: FlowArgs) -> SushiOrderResult: flow_config: FlowConfig = { "initial_node": "start", - "initial_system_message": [ - { - "role": "system", - "content": "You are an order-taking assistant. You must ALWAYS use the available functions to progress the conversation. This is a phone conversation and your responses will be converted to audio. Keep the conversation friendly, casual, and polite. Avoid outputting special characters and emojis.", - } - ], "nodes": { "start": { - "messages": [ + "role_messages": [ + { + "role": "system", + "content": "You are an order-taking assistant. You must ALWAYS use the available functions to progress the conversation. This is a phone conversation and your responses will be converted to audio. Keep the conversation friendly, casual, and polite. Avoid outputting special characters and emojis.", + } + ], + "task_messages": [ { "role": "system", "content": "For this step, ask the user if they want pizza or sushi, and wait for them to use a function to choose. Start off by greeting them. Be friendly and casual; you're taking an order for food over the phone.", @@ -146,7 +146,7 @@ async def select_sushi_order(args: FlowArgs) -> SushiOrderResult: ], }, "choose_pizza": { - "messages": [ + "task_messages": [ { "role": "system", "content": """You are handling a pizza order. Use the available functions: @@ -198,7 +198,7 @@ async def select_sushi_order(args: FlowArgs) -> SushiOrderResult: ], }, "choose_sushi": { - "messages": [ + "task_messages": [ { "role": "system", "content": """You are handling a sushi order. Use the available functions: @@ -249,7 +249,7 @@ async def select_sushi_order(args: FlowArgs) -> SushiOrderResult: ], }, "confirm": { - "messages": [ + "task_messages": [ { "role": "system", "content": """Read back the complete order details to the user and ask for final confirmation. Use the available functions: @@ -272,7 +272,7 @@ async def select_sushi_order(args: FlowArgs) -> SushiOrderResult: ], }, "end": { - "messages": [ + "task_messages": [ { "role": "system", "content": "Concisely end the conversation—1-3 words is appropriate. Just say 'Bye' or something similarly short.", diff --git a/examples/static/patient_intake.py b/examples/static/patient_intake.py index 90add60..cf2ebe4 100644 --- a/examples/static/patient_intake.py +++ b/examples/static/patient_intake.py @@ -165,15 +165,15 @@ async def record_visit_reasons(args: FlowArgs) -> VisitReasonRecordResult: flow_config: FlowConfig = { "initial_node": "start", - "initial_system_message": [ - { - "role": "system", - "content": "You are Jessica, an agent for Tri-County Health Services. You must ALWAYS use one of the available functions to progress the conversation. Be professional but friendly.", - } - ], "nodes": { "start": { - "messages": [ + "role_messages": [ + { + "role": "system", + "content": "You are Jessica, an agent for Tri-County Health Services. You must ALWAYS use one of the available functions to progress the conversation. Be professional but friendly.", + } + ], + "task_messages": [ { "role": "system", "content": "Start by introducing yourself to Chad Bailey, then ask for their date of birth, including the year. Once they provide their birthday, use verify_birthday to check it. If verified (1983-01-01), proceed to prescriptions.", @@ -202,7 +202,7 @@ async def record_visit_reasons(args: FlowArgs) -> VisitReasonRecordResult: ], }, "get_prescriptions": { - "messages": [ + "task_messages": [ { "role": "system", "content": "This step is for collecting prescriptions. Ask them what prescriptions they're taking, including the dosage. After recording prescriptions (or confirming none), proceed to allergies.", @@ -244,7 +244,7 @@ async def record_visit_reasons(args: FlowArgs) -> VisitReasonRecordResult: ], }, "get_allergies": { - "messages": [ + "task_messages": [ { "role": "system", "content": "Collect allergy information. Ask about any allergies they have. After recording allergies (or confirming none), proceed to medical conditions.", @@ -282,7 +282,7 @@ async def record_visit_reasons(args: FlowArgs) -> VisitReasonRecordResult: ], }, "get_conditions": { - "messages": [ + "task_messages": [ { "role": "system", "content": "Collect medical condition information. Ask about any medical conditions they have. After recording conditions (or confirming none), proceed to visit reasons.", @@ -320,7 +320,7 @@ async def record_visit_reasons(args: FlowArgs) -> VisitReasonRecordResult: ], }, "get_visit_reasons": { - "messages": [ + "task_messages": [ { "role": "system", "content": "Collect information about the reason for their visit. Ask what brings them to the doctor today. After recording their reasons, proceed to verification.", @@ -358,7 +358,7 @@ async def record_visit_reasons(args: FlowArgs) -> VisitReasonRecordResult: ], }, "verify": { - "messages": [ + "task_messages": [ { "role": "system", "content": """Review all collected information with the patient. Follow these steps: @@ -391,7 +391,7 @@ async def record_visit_reasons(args: FlowArgs) -> VisitReasonRecordResult: ], }, "confirm": { - "messages": [ + "task_messages": [ { "role": "system", "content": "Once confirmed, thank them, then use the complete_intake function to end the conversation.", @@ -410,7 +410,7 @@ async def record_visit_reasons(args: FlowArgs) -> VisitReasonRecordResult: ], }, "end": { - "messages": [ + "task_messages": [ { "role": "system", "content": "Thank them for their time and end the conversation.", diff --git a/examples/static/restaurant_reservation.py b/examples/static/restaurant_reservation.py index e79e0b7..c604db6 100644 --- a/examples/static/restaurant_reservation.py +++ b/examples/static/restaurant_reservation.py @@ -93,15 +93,15 @@ async def record_time(args: FlowArgs) -> FlowResult: flow_config: FlowConfig = { "initial_node": "start", - "initial_system_message": [ - { - "role": "system", - "content": "You are a restaurant reservation assistant for La Maison, an upscale French restaurant. You must ALWAYS use one of the available functions to progress the conversation. This is a phone conversations and your responses will be converted to audio. Avoid outputting special characters and emojis. Be causal and friendly.", - } - ], "nodes": { "start": { - "messages": [ + "role_messages": [ + { + "role": "system", + "content": "You are a restaurant reservation assistant for La Maison, an upscale French restaurant. You must ALWAYS use one of the available functions to progress the conversation. This is a phone conversations and your responses will be converted to audio. Avoid outputting special characters and emojis. Be causal and friendly.", + } + ], + "task_messages": [ { "role": "system", "content": "Warmly greet the customer and ask how many people are in their party.", @@ -127,7 +127,7 @@ async def record_time(args: FlowArgs) -> FlowResult: ], }, "get_time": { - "messages": [ + "task_messages": [ { "role": "system", "content": "Ask what time they'd like to dine. Restaurant is open 5 PM to 10 PM. After they provide a time, confirm it's within operating hours before recording. Use 24-hour format for internal recording (e.g., 17:00 for 5 PM).", @@ -157,7 +157,7 @@ async def record_time(args: FlowArgs) -> FlowResult: ], }, "confirm": { - "messages": [ + "task_messages": [ { "role": "system", "content": "Confirm the reservation details and end the conversation.", @@ -176,7 +176,7 @@ async def record_time(args: FlowArgs) -> FlowResult: ], }, "end": { - "messages": [{"role": "system", "content": "Thank them and end the conversation."}], + "task_messages": [{"role": "system", "content": "Thank them and end the conversation."}], "functions": [], "post_actions": [{"type": "end_conversation"}], }, diff --git a/examples/static/travel_planner.py b/examples/static/travel_planner.py index f9e8495..cc1d345 100644 --- a/examples/static/travel_planner.py +++ b/examples/static/travel_planner.py @@ -119,15 +119,15 @@ async def record_activities(args: FlowArgs) -> ActivitiesResult: flow_config: FlowConfig = { "initial_node": "start", - "initial_system_message": [ - { - "role": "system", - "content": "You are a travel planning assistant with Summit & Sand Getaways. You must ALWAYS use one of the available functions to progress the conversation. This is a phone conversation and your responses will be converted to audio. Avoid outputting special characters and emojis.", - } - ], "nodes": { "start": { - "messages": [ + "role_messages": [ + { + "role": "system", + "content": "You are a travel planning assistant with Summit & Sand Getaways. You must ALWAYS use one of the available functions to progress the conversation. This is a phone conversation and your responses will be converted to audio. Avoid outputting special characters and emojis.", + } + ], + "task_messages": [ { "role": "system", "content": "For this step, ask if they're interested in planning a beach vacation or a mountain retreat, and wait for them to choose. Start with an enthusiastic greeting and be conversational; you're helping them plan their dream vacation.", @@ -155,7 +155,7 @@ async def record_activities(args: FlowArgs) -> ActivitiesResult: ], }, "choose_beach": { - "messages": [ + "task_messages": [ { "role": "system", "content": "You are handling beach vacation planning. Use the available functions:\n - Use select_destination when the user chooses their preferred beach location\n - After destination is selected, dates will be collected automatically\n\nAvailable beach destinations are: 'Maui', 'Cancun', or 'Maldives'. After they choose, confirm their selection. Be enthusiastic and paint a picture of each destination.", @@ -185,7 +185,7 @@ async def record_activities(args: FlowArgs) -> ActivitiesResult: ], }, "choose_mountain": { - "messages": [ + "task_messages": [ { "role": "system", "content": "You are handling mountain retreat planning. Use the available functions:\n - Use select_destination when the user chooses their preferred mountain location\n - After destination is selected, dates will be collected automatically\n\nAvailable mountain destinations are: 'Swiss Alps', 'Rocky Mountains', or 'Himalayas'. After they choose, confirm their selection. Be enthusiastic and paint a picture of each destination.", @@ -215,7 +215,7 @@ async def record_activities(args: FlowArgs) -> ActivitiesResult: ], }, "get_dates": { - "messages": [ + "task_messages": [ { "role": "system", "content": "Handle travel date selection. Use the available functions:\n - Use record_dates when the user specifies their travel dates (can be used multiple times if they change their mind)\n - After dates are recorded, activities will be collected automatically\n\nAsk for their preferred travel dates within the next 6 months. After recording dates, confirm the selection.", @@ -250,7 +250,7 @@ async def record_activities(args: FlowArgs) -> ActivitiesResult: ], }, "get_activities": { - "messages": [ + "task_messages": [ { "role": "system", "content": "Handle activity preferences. Use the available functions:\n - Use record_activities to save their activity preferences\n - After activities are recorded, verification will happen automatically\n\nFor beach destinations, suggest: snorkeling, surfing, sunset cruise\nFor mountain destinations, suggest: hiking, skiing, mountain biking\n\nAfter they choose, confirm their selections.", @@ -282,7 +282,7 @@ async def record_activities(args: FlowArgs) -> ActivitiesResult: ], }, "verify_itinerary": { - "messages": [ + "task_messages": [ { "role": "system", "content": "Review the complete itinerary with the user. Summarize their destination, dates, and chosen activities. Use revise_plan to make changes or confirm_booking if they're happy. Be thorough in reviewing all details and ask for their confirmation.", @@ -310,7 +310,7 @@ async def record_activities(args: FlowArgs) -> ActivitiesResult: ], }, "confirm_booking": { - "messages": [ + "task_messages": [ { "role": "system", "content": "The booking is confirmed. Share some relevant tips about their chosen destination, thank them warmly, and use end to complete the conversation.", @@ -332,7 +332,7 @@ async def record_activities(args: FlowArgs) -> ActivitiesResult: ], }, "end": { - "messages": [ + "task_messages": [ { "role": "system", "content": "Wish them a wonderful trip and end the conversation.", From f8cd29db9254944f3c66b1f2940adeba3df22414 Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Thu, 19 Dec 2024 19:32:47 -0500 Subject: [PATCH 09/10] Update the Flow Editor with the message changes and update the editor JSON examples --- editor/examples/food_ordering.json | 16 +++-- editor/examples/movie_explorer.json | 14 ++-- editor/examples/patient_intake.json | 22 +++--- editor/examples/restaurant_reservation.json | 14 ++-- editor/examples/travel_planner.json | 22 +++--- editor/index.html | 21 ++++-- editor/js/editor/sidePanel.js | 50 +++++++++---- editor/js/nodes/baseNode.js | 44 ++++++------ editor/js/nodes/endNode.js | 14 +++- editor/js/nodes/flowNode.js | 14 +++- editor/js/nodes/startNode.js | 20 +++++- editor/js/types.js | 4 +- editor/js/utils/export.js | 19 +++-- editor/js/utils/import.js | 6 +- editor/js/utils/validation.js | 80 +++++++++++++++++---- 15 files changed, 271 insertions(+), 89 deletions(-) diff --git a/editor/examples/food_ordering.json b/editor/examples/food_ordering.json index 3a05d3d..9bc1429 100644 --- a/editor/examples/food_ordering.json +++ b/editor/examples/food_ordering.json @@ -2,7 +2,13 @@ "initial_node": "start", "nodes": { "start": { - "messages": [ + "role_messages": [ + { + "role": "system", + "content": "You are an order-taking assistant. You must ALWAYS use the available functions to progress the conversation. This is a phone conversation and your responses will be converted to audio. Keep the conversation friendly, casual, and polite. Avoid outputting special characters and emojis." + } + ], + "task_messages": [ { "role": "system", "content": "For this step, ask the user if they want pizza or sushi, and wait for them to use a function to choose. Start off by greeting them. Be friendly and casual; you're taking an order for food over the phone." @@ -36,7 +42,7 @@ ] }, "choose_pizza": { - "messages": [ + "task_messages": [ { "role": "system", "content": "You are handling a pizza order. Use the available functions:\n\n- Use select_pizza_order when the user specifies both size AND type\n\n- Use confirm_order when the user confirms they are satisfied with their selection\n\nPricing:\n\n- Small: $10\n\n- Medium: $15\n\n- Large: $20\n\nAfter selection, confirm both the size and type, state the price, and ask if they want to confirm their order. Remember to be friendly and casual." @@ -82,7 +88,7 @@ ] }, "choose_sushi": { - "messages": [ + "task_messages": [ { "role": "system", "content": "You are handling a sushi order. Use the available functions:\n\n- Use select_sushi_order when the user specifies both count AND type\n\n- Use confirm_order when the user confirms they are satisfied with their selection\n\nPricing:\n\n- $8 per roll\n\nAfter selection, confirm both the count and type, state the price, and ask if they want to confirm their order. Remember to be friendly and casual." @@ -129,7 +135,7 @@ ] }, "confirm": { - "messages": [ + "task_messages": [ { "role": "system", "content": "Read back the complete order details to the user and ask for final confirmation. Use the available functions:\n\n- Use complete_order when the user confirms\n\n- Use revise_order if they want to change something\n\nBe friendly and clear when reading back the order details." @@ -151,7 +157,7 @@ ] }, "end": { - "messages": [ + "task_messages": [ { "role": "system", "content": "Concisely end the conversation—1-3 words is appropriate. Just say 'Bye' or something similarly short." diff --git a/editor/examples/movie_explorer.json b/editor/examples/movie_explorer.json index b19d008..cf3165a 100644 --- a/editor/examples/movie_explorer.json +++ b/editor/examples/movie_explorer.json @@ -2,10 +2,16 @@ "initial_node": "greeting", "nodes": { "greeting": { - "messages": [ + "role_messages": [ { "role": "system", - "content": "You are a helpful movie expert. Start by greeting the user and asking if they'd like to know about movies currently in theaters or upcoming releases. Wait for their choice before using either get_current_movies or get_upcoming_movies." + "content": "You are a friendly movie expert. Your responses will be converted to audio, so avoid special characters. Always use the available functions to progress the conversation naturally." + } + ], + "task_messages": [ + { + "role": "system", + "content": "Start by greeting the user and asking if they'd like to know about movies currently in theaters or upcoming releases. Wait for their choice before using either get_current_movies or get_upcoming_movies." } ], "functions": [ @@ -38,7 +44,7 @@ ] }, "explore_movie": { - "messages": [ + "task_messages": [ { "role": "system", "content": "Help the user learn more about movies. You can:\n\n- Use get_movie_details when they express interest in a specific movie\n\n- Use get_similar_movies to show recommendations\n\n- Use get_current_movies to see what's playing now\n\n- Use get_upcoming_movies to see what's coming soon\n\n- Use end_conversation when they're done exploring\n\nAfter showing details or recommendations, ask if they'd like to explore another movie or end the conversation." @@ -120,7 +126,7 @@ ] }, "end": { - "messages": [ + "task_messages": [ { "role": "system", "content": "Thank the user warmly and mention they can return anytime to discover more movies." diff --git a/editor/examples/patient_intake.json b/editor/examples/patient_intake.json index 03d74c0..769e248 100644 --- a/editor/examples/patient_intake.json +++ b/editor/examples/patient_intake.json @@ -2,7 +2,13 @@ "initial_node": "start", "nodes": { "start": { - "messages": [ + "role_messages": [ + { + "role": "system", + "content": "You are Jessica, an agent for Tri-County Health Services. You must ALWAYS use one of the available functions to progress the conversation. Be professional but friendly." + } + ], + "task_messages": [ { "role": "system", "content": "Start by introducing yourself to Chad Bailey, then ask for their date of birth, including the year. Once they provide their birthday, use verify_birthday to check it. If verified (1983-01-01), proceed to prescriptions." @@ -31,7 +37,7 @@ ] }, "get_prescriptions": { - "messages": [ + "task_messages": [ { "role": "system", "content": "This step is for collecting prescriptions. Ask them what prescriptions they're taking, including the dosage. After recording prescriptions (or confirming none), proceed to allergies." @@ -73,7 +79,7 @@ ] }, "get_allergies": { - "messages": [ + "task_messages": [ { "role": "system", "content": "Collect allergy information. Ask about any allergies they have. After recording allergies (or confirming none), proceed to medical conditions." @@ -111,7 +117,7 @@ ] }, "get_conditions": { - "messages": [ + "task_messages": [ { "role": "system", "content": "Collect medical condition information. Ask about any medical conditions they have. After recording conditions (or confirming none), proceed to visit reasons." @@ -149,7 +155,7 @@ ] }, "get_visit_reasons": { - "messages": [ + "task_messages": [ { "role": "system", "content": "Collect information about the reason for their visit. Ask what brings them to the doctor today. After recording their reasons, proceed to verification." @@ -187,7 +193,7 @@ ] }, "verify": { - "messages": [ + "task_messages": [ { "role": "system", "content": "Review all collected information with the patient. Follow these steps:\n\n1. Summarize their prescriptions, allergies, conditions, and visit reasons\n\n2. Ask if everything is correct\n\n3. Use the appropriate function based on their response\n\nBe thorough in reviewing all details and wait for explicit confirmation." @@ -221,7 +227,7 @@ ] }, "confirm": { - "messages": [ + "task_messages": [ { "role": "system", "content": "Once confirmed, thank them, then use the complete_intake function to end the conversation." @@ -243,7 +249,7 @@ ] }, "end": { - "messages": [ + "task_messages": [ { "role": "system", "content": "Thank them for their time and end the conversation." diff --git a/editor/examples/restaurant_reservation.json b/editor/examples/restaurant_reservation.json index 53e6514..223e744 100644 --- a/editor/examples/restaurant_reservation.json +++ b/editor/examples/restaurant_reservation.json @@ -2,7 +2,13 @@ "initial_node": "start", "nodes": { "start": { - "messages": [ + "role_messages": [ + { + "role": "system", + "content": "You are a restaurant reservation assistant for La Maison, an upscale French restaurant. You must ALWAYS use one of the available functions to progress the conversation. This is a phone conversations and your responses will be converted to audio. Avoid outputting special characters and emojis. Be causal and friendly." + } + ], + "task_messages": [ { "role": "system", "content": "Warmly greet the customer and ask how many people are in their party." @@ -32,7 +38,7 @@ ] }, "get_time": { - "messages": [ + "task_messages": [ { "role": "system", "content": "Ask what time they'd like to dine. Restaurant is open 5 PM to 10 PM. After they provide a time, confirm it's within operating hours before recording. Use 24-hour format for internal recording (e.g., 17:00 for 5 PM)." @@ -62,7 +68,7 @@ ] }, "confirm": { - "messages": [ + "task_messages": [ { "role": "system", "content": "Confirm the reservation details and end the conversation." @@ -84,7 +90,7 @@ ] }, "end": { - "messages": [ + "task_messages": [ { "role": "system", "content": "Thank them and end the conversation." diff --git a/editor/examples/travel_planner.json b/editor/examples/travel_planner.json index b85826b..c2ccff6 100644 --- a/editor/examples/travel_planner.json +++ b/editor/examples/travel_planner.json @@ -2,7 +2,13 @@ "initial_node": "start", "nodes": { "start": { - "messages": [ + "role_messages": [ + { + "role": "system", + "content": "You are a travel planning assistant with Summit & Sand Getaways. You must ALWAYS use one of the available functions to progress the conversation. This is a phone conversation and your responses will be converted to audio. Avoid outputting special characters and emojis." + } + ], + "task_messages": [ { "role": "system", "content": "For this step, ask if they're interested in planning a beach vacation or a mountain retreat, and wait for them to choose. Start with an enthusiastic greeting and be conversational; you're helping them plan their dream vacation." @@ -36,7 +42,7 @@ ] }, "choose_beach": { - "messages": [ + "task_messages": [ { "role": "system", "content": "You are handling beach vacation planning. Use the available functions:\n - Use select_destination when the user chooses their preferred beach location\n - After destination is selected, dates will be collected automatically\n\nAvailable beach destinations are: 'Maui', 'Cancun', or 'Maldives'. After they choose, confirm their selection. Be enthusiastic and paint a picture of each destination." @@ -66,7 +72,7 @@ ] }, "choose_mountain": { - "messages": [ + "task_messages": [ { "role": "system", "content": "You are handling mountain retreat planning. Use the available functions:\n - Use select_destination when the user chooses their preferred mountain location\n - After destination is selected, dates will be collected automatically\n\nAvailable mountain destinations are: 'Swiss Alps', 'Rocky Mountains', or 'Himalayas'. After they choose, confirm their selection. Be enthusiastic and paint a picture of each destination." @@ -96,7 +102,7 @@ ] }, "get_dates": { - "messages": [ + "task_messages": [ { "role": "system", "content": "Handle travel date selection. Use the available functions:\n - Use record_dates when the user specifies their travel dates (can be used multiple times if they change their mind)\n - After dates are recorded, activities will be collected automatically\n\nAsk for their preferred travel dates within the next 6 months. After recording dates, confirm the selection." @@ -131,7 +137,7 @@ ] }, "get_activities": { - "messages": [ + "task_messages": [ { "role": "system", "content": "Handle activity preferences. Use the available functions:\n - Use record_activities to save their activity preferences\n - After activities are recorded, verification will happen automatically\n\nFor beach destinations, suggest: snorkeling, surfing, sunset cruise\nFor mountain destinations, suggest: hiking, skiing, mountain biking\n\nAfter they choose, confirm their selections." @@ -165,7 +171,7 @@ ] }, "verify_itinerary": { - "messages": [ + "task_messages": [ { "role": "system", "content": "Review the complete itinerary with the user. Summarize their destination, dates, and chosen activities. Use revise_plan to make changes or confirm_booking if they're happy. Be thorough in reviewing all details and ask for their confirmation." @@ -199,7 +205,7 @@ ] }, "confirm_booking": { - "messages": [ + "task_messages": [ { "role": "system", "content": "The booking is confirmed. Share some relevant tips about their chosen destination, thank them warmly, and use end to complete the conversation." @@ -227,7 +233,7 @@ ] }, "end": { - "messages": [ + "task_messages": [ { "role": "system", "content": "Wish them a wonderful trip and end the conversation." diff --git a/editor/index.html b/editor/index.html index 92365e1..383c8dc 100644 --- a/editor/index.html +++ b/editor/index.html @@ -49,13 +49,24 @@