From 781f454a6f98baf6e833d04ff5ee40ed2b568488 Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Mon, 2 Dec 2024 15:37:00 -0500 Subject: [PATCH] Refactor into a single FlowManager --- README.md | 292 ++++++------ editor/examples/movie_booking.json | 189 -------- examples/dynamic/insurance_anthropic.py | 38 +- examples/dynamic/insurance_gemini.py | 42 +- examples/dynamic/insurance_openai.py | 146 +++--- examples/static/food_ordering.py | 70 ++- examples/static/movie_booking_openai.py | 314 ------------- examples/static/movie_explorer_anthropic.py | 38 +- examples/static/movie_explorer_gemini.py | 41 +- ...e_explorer.py => movie_explorer_openai.py} | 36 +- examples/static/patient_intake.py | 103 ++--- examples/static/restaurant_reservation.py | 44 +- ...el_planner_openai.py => travel_planner.py} | 66 ++- examples/static/travel_planner_gemini.py | 391 ---------------- src/pipecat_flows/__init__.py | 103 +++-- src/pipecat_flows/adapters.py | 26 +- src/pipecat_flows/base.py | 146 ------ src/pipecat_flows/dynamic.py | 247 ----------- src/pipecat_flows/manager.py | 419 ++++++++++++++++++ src/pipecat_flows/state.py | 2 +- src/pipecat_flows/static.py | 203 --------- src/pipecat_flows/{config.py => types.py} | 10 + 22 files changed, 944 insertions(+), 2022 deletions(-) delete mode 100644 editor/examples/movie_booking.json delete mode 100644 examples/static/movie_booking_openai.py rename examples/static/{movie_explorer.py => movie_explorer_openai.py} (92%) rename examples/static/{travel_planner_openai.py => travel_planner.py} (94%) delete mode 100644 examples/static/travel_planner_gemini.py delete mode 100644 src/pipecat_flows/base.py delete mode 100644 src/pipecat_flows/dynamic.py create mode 100644 src/pipecat_flows/manager.py delete mode 100644 src/pipecat_flows/static.py rename src/pipecat_flows/{config.py => types.py} (94%) diff --git a/README.md b/README.md index 253b60b..c1980ab 100644 --- a/README.md +++ b/README.md @@ -4,24 +4,23 @@ [![PyPI](https://img.shields.io/pypi/v/pipecat-ai-flows)](https://pypi.org/project/pipecat-ai-flows) [![Discord](https://img.shields.io/discord/1239284677165056021)](https://discord.gg/pipecat) -Pipecat Flows provides a framework for building structured conversations in your AI applications. It is comprised of: +# Pipecat Flows -- A [python module](#pipecat-flows-package) for building conversation flows with Pipecat -- A [visual editor](#pipecat-flows-editor) for visualizing conversations and exporting into flow_configs +## Overview -The framework offers two approaches to managing conversation flows: +Pipecat Flows provides a framework for building structured conversations in your AI applications. It enables you to create both predefined conversation paths and dynamically generated flows while handling the complexities of state management and LLM interactions. -1. **Static Flows**: Configuration-driven conversations with predefined paths. Ideal for flows where the entire conversation structure can be defined upfront, from simple scripts to complex decision trees. +The framework consists of: -2. **Dynamic Flows**: Runtime-determined conversations where paths are created or modified during execution. Perfect for scenarios where flow structure depends on external data, business logic, or needs to adapt during the conversation. +- A Python module for building conversation flows with Pipecat +- A visual editor for designing and exporting flow configurations -To learn more about building with Pipecat Flows, [check out the guide](https://docs.pipecat.ai/guides/pipecat-flows). +### When to Use Pipecat Flows -## Pipecat Flows Package +- **Static Flows**: When your conversation structure is known upfront and follows predefined paths. Perfect for customer service scripts, intake forms, or guided experiences. +- **Dynamic Flows**: When conversation paths need to be determined at runtime based on user input, external data, or business logic. Ideal for personalized experiences or complex decision trees. -A Python package for managing conversation flows in Pipecat applications. - -### Installation +## Installation If you're already using Pipecat: @@ -35,167 +34,197 @@ If you're starting fresh: # Basic installation pip install pipecat-ai-flows -# Install Pipecat with required options -# For example, to use Daily, OpenAI, and Deepgram: -pip install "pipecat-ai[daily, openai,deepgram]" +# Install Pipecat with specific LLM provider options: +pip install "pipecat-ai[daily,openai,deepgram]" # For OpenAI +pip install "pipecat-ai[daily,anthropic,deepgram]" # For Anthropic +pip install "pipecat-ai[daily,google,deepgram]" # For Google ``` -Learn more about the available options with [Pipecat](https://github.com/pipecat-ai/pipecat). - -## Static Flows - -Static flows use a JSON configuration to define the complete conversation structure upfront. - -### Configuration +## Quick Start -Each node in your flow consists of: - -- Messages that set context for the LLM -- Available functions for that state -- Optional pre/post actions - -### Basic Usage +Here's a basic example of setting up a conversation flow: ```python -from pipecat_flows import StaticFlowManager # When developing with the repository -# or -from pipecat.flows import StaticFlowManager # When installed via pip - -# Initialize context and tools -initial_tools = flow_config["nodes"]["start"]["functions"] # Available functions for starting state -context = OpenAILLMContext(messages, initial_tools) # Create LLM context with initial state -context_aggregator = llm.create_context_aggregator(context) - -# Create your pipeline: No new processors are required -pipeline = Pipeline( - [ - transport.input(), # Transport user input - stt, # STT - context_aggregator.user(), # User responses - llm, # LLM - tts, # TTS - transport.output(), # Transport bot output - context_aggregator.assistant(), # Assistant spoken responses - ] -) +from pipecat_flows import FlowManager -# Create the Pipecat task -task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True)) +# Initialize flow manager with static configuration +flow_manager = FlowManager(task, llm, tts, flow_config=flow_config) -# Initialize flow management -flow_manager = StaticFlowManager(flow_config, task, llm, tts) # Create flow manager +# Or with dynamic flow handling +flow_manager = FlowManager( + task, + llm, + tts, + transition_callback=handle_transitions +) -# Initialize with starting messages @transport.event_handler("on_first_participant_joined") async def on_first_participant_joined(transport, participant): await transport.capture_participant_transcription(participant["id"]) - # Initialize the flow processor await flow_manager.initialize(messages) - # Kick off the conversation using the context aggregator await task.queue_frames([context_aggregator.user().get_context_frame()]) ``` -## Dynamic Flows +For more detailed examples and guides, visit our [documentation](https://docs.pipecat.ai/guides/pipecat-flows). + +## Core Concepts + +### Flow Configuration + +Each conversation flow consists of nodes that define the conversation structure. A node includes: -Dynamic flows allow for runtime creation and modification of conversation paths based on data or business logic. +#### Messages -### Configuration +Messages set the context for the LLM at each state: -Each node consists of the same components as static flows: +```python +"messages": [ + { + "role": "system", + "content": "You are handling pizza orders. Ask for size selection." + } +] +``` -- Messages that set context for the LLM -- Available functions for that state -- Optional pre/post actions +#### Functions -The key difference is that nodes are created programmatically rather than defined in a JSON configuration. +Functions come in two types: -### Basic Usage +1. **Node Functions**: Execute operations within the current state ```python -from pipecat_flows import DynamicFlowManager # When developing with the repository -# or -from pipecat.flows import DynamicFlowManager # When installed via pip +{ + "type": "function", + "function": { + "name": "select_size", + "handler": select_size_handler, # Required for node functions + "description": "Select pizza size", + "parameters": { + "type": "object", + "properties": { + "size": {"type": "string", "enum": ["small", "medium", "large"]} + } + } + } +} +``` -# Define your transition callback -async def handle_transitions(function_name: str, args: Dict[str, Any], flow_manager): - if function_name == "collect_age": - # Create next node based on age - if args["age"] < 25: - await flow_manager.set_node("young_adult", create_young_adult_node()) - else: - await flow_manager.set_node("standard", create_standard_node()) - -# Initialize context and tools -context = OpenAILLMContext(messages, []) # Start with empty tools -context_aggregator = llm.create_context_aggregator(context) - -# Create your pipeline: No new processors are required -pipeline = Pipeline( - [ - transport.input(), # Transport user input - stt, # STT - context_aggregator.user(), # User responses - llm, # LLM - tts, # TTS - transport.output(), # Transport bot output - context_aggregator.assistant(), # Assistant spoken responses - ] -) +2. **Edge Functions**: Create transitions between states -# Create the Pipecat task -task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True)) +```python +{ + "type": "function", + "function": { + "name": "next_node", # Must match a node name + "description": "Move to next state", + "parameters": {"type": "object", "properties": {}} + } +} +``` -# Initialize flow management -flow_manager = DynamicFlowManager( - task, - llm, - tts, - transition_callback=handle_transitions -) # Create flow manager +#### Actions -# Initialize with starting messages -@transport.event_handler("on_first_participant_joined") -async def on_first_participant_joined(transport, participant): - await transport.capture_participant_transcription(participant["id"]) - # Register function handlers - await flow_manager.register_functions({ - "collect_age": collect_age_handler, - "process_data": process_data_handler - }) - # Initialize the flow processor - await flow_manager.initialize(messages) - # Set initial node - await flow_manager.set_node("initial", create_initial_node()) - # Kick off the conversation using the context aggregator - await task.queue_frames([context_aggregator.user().get_context_frame()]) +Actions execute during state transitions: + +```python +"pre_actions": [ + { + "type": "tts_say", + "text": "Processing your order..." + } +] ``` -The key differences from static flows are: +#### Provider-Specific Formats + +Pipecat Flows automatically handles format differences between LLM providers: + +**OpenAI Format** + +```python +"functions": [{ + "type": "function", + "function": { + "name": "function_name", + "description": "description", + "parameters": {...} + } +}] +``` -1. The transition callback that determines flow progression -2. Function registration through `register_functions()` -3. Node creation and setting through `set_node()` -4. No upfront flow configuration required +**Anthropic Format** + +```python +"functions": [{ + "name": "function_name", + "description": "description", + "input_schema": {...} +}] +``` + +**Google (Gemini) Format** + +```python +"functions": [{ + "function_declarations": [{ + "name": "function_name", + "description": "description", + "parameters": {...} + }] +}] +``` -### Running Examples +### Flow Management + +The FlowManager handles both static and dynamic flows through a unified interface: + +#### Static Flows + +```python +# Define flow configuration upfront +flow_config = { + "initial_node": "greeting", + "nodes": { + "greeting": { + "messages": [...], + "functions": [...] + } + } +} + +# Initialize with static configuration +flow_manager = FlowManager(task, llm, tts, flow_config=flow_config) +``` + +#### Dynamic Flows + +```python +# Define transition handling +async def handle_transitions(function_name: str, args: Dict, flow_manager): + if function_name == "collect_age": + await flow_manager.set_node("next_step", create_next_node()) + +# Initialize with transition callback +flow_manager = FlowManager(task, llm, tts, transition_callback=handle_transitions) +``` + +## Examples The repository includes several complete example implementations in the `examples/` directory. -#### Static +### Static In the `examples/static` directory, you'll find these examples: - `food_ordering.py` - A restaurant order flow demonstrating node and edge functions -- `movie_booking.py` - A movie ticket booking system with date-based branching - `movie_explorer_openai.py` - Movie information bot demonstrating real API integration with TMDB - `movie_explorer_anthropic.py` - The same movie information demo adapted for Anthropic's format - `movie_explorer_gemini.py` - The same movie explorer demo adapted for Google Gemini's format - `patient_intake.py` - A medical intake system showing complex state management - `restaurant_reservation.py` - A reservation system with availability checking -- `travel_planner_openai.py` - A vacation planning assistant with parallel paths -- `travel_planner_gemini.py` - The same vacation planning assistant adapted for Google Gemini's format +- `travel_planner.py` - A vacation planning assistant with parallel paths -#### Dynamic +### Dynamic In the `examples/dynamic` directory, you'll find these examples: @@ -250,6 +279,7 @@ To run these examples: - DEEPGRAM_API_KEY - OPENAI_API_KEY - ANTHROPIC_API_KEY + - GOOGLE_API_KEY - DAILY_API_KEY Looking for a Daily API key and room URL? Sign up on the [Daily Dashboard](https://dashboard.daily.co). @@ -259,11 +289,11 @@ To run these examples: python examples/static/food_ordering.py -u YOUR_DAILY_ROOM_URL ``` -### Running Tests +## Tests The package includes a comprehensive test suite covering the core functionality. -#### Setup Test Environment +### Setup Test Environment 1. **Create Virtual Environment**: @@ -279,7 +309,7 @@ The package includes a comprehensive test suite covering the core functionality. pip install -e . ``` -#### Running Tests +### Running Tests Run all tests: diff --git a/editor/examples/movie_booking.json b/editor/examples/movie_booking.json deleted file mode 100644 index e40539f..0000000 --- a/editor/examples/movie_booking.json +++ /dev/null @@ -1,189 +0,0 @@ -{ - "initial_node": "start", - "nodes": { - "start": { - "messages": [ - { - "role": "system", - "content": "For this step, ask if they want to see what's playing today or tomorrow, and wait for them to choose. Start with a warm greeting and be helpful and enthusiastic; you're helping them plan their entertainment." - } - ], - "functions": [ - { - "type": "function", - "function": { - "name": "check_today", - "description": "User wants to see today's movies", - "parameters": { - "type": "object", - "properties": {} - } - } - }, - { - "type": "function", - "function": { - "name": "check_tomorrow", - "description": "User wants to see tomorrow's movies", - "parameters": { - "type": "object", - "properties": {} - } - } - } - ], - "pre_actions": [ - { - "type": "tts_say", - "text": "Welcome to MoviePlex! Let me help you book some tickets." - } - ] - }, - "check_today": { - "messages": [ - { - "role": "system", - "content": "You are handling today's movie selection. Use the available functions:\n - Use select_movie when the user chooses a movie (can be used multiple times if they change their mind)\n - Use select_showtime after they've chosen a movie to pick their preferred time\n - Use the end function ONLY when the user confirms their final selection\n\nAfter each selection, confirm their choice and ask about the next step. Remember to be enthusiastic and helpful.\n\nStart by telling them today's available movies: 'Jurassic Park' at 3:00 PM and 7:00 PM, or 'The Matrix' at 4:00 PM and 8:00 PM." - } - ], - "functions": [ - { - "type": "function", - "function": { - "name": "select_movie", - "description": "Record the selected movie", - "parameters": { - "type": "object", - "properties": { - "movie": { - "type": "string", - "enum": ["Jurassic Park", "The Matrix"], - "description": "Selected movie" - } - }, - "required": ["movie"] - } - } - }, - { - "type": "function", - "function": { - "name": "select_showtime", - "description": "Record the selected showtime", - "parameters": { - "type": "object", - "properties": { - "time": { - "type": "string", - "enum": ["3:00 PM", "4:00 PM", "7:00 PM", "8:00 PM"], - "description": "Selected showtime" - } - }, - "required": ["time"] - } - } - }, - { - "type": "function", - "function": { - "name": "end", - "description": "Complete the booking (use only after user confirms)", - "parameters": { - "type": "object", - "properties": {} - } - } - } - ], - "pre_actions": [ - { - "type": "tts_say", - "text": "Let me show you what's playing today..." - } - ] - }, - "check_tomorrow": { - "messages": [ - { - "role": "system", - "content": "You are handling tomorrow's movie selection. Use the available functions:\n - Use select_movie when the user chooses a movie (can be used multiple times if they change their mind)\n - Use select_showtime after they've chosen a movie to pick their preferred time\n - Use the end function ONLY when the user confirms their final selection\n\nAfter each selection, confirm their choice and ask about the next step. Remember to be enthusiastic and helpful.\n\nStart by telling them tomorrow's available movies: 'The Lion King' at 2:00 PM and 6:00 PM, or 'Inception' at 3:00 PM and 7:00 PM." - } - ], - "functions": [ - { - "type": "function", - "function": { - "name": "select_movie", - "description": "Record the selected movie", - "parameters": { - "type": "object", - "properties": { - "movie": { - "type": "string", - "enum": ["The Lion King", "Inception"], - "description": "Selected movie" - } - }, - "required": ["movie"] - } - } - }, - { - "type": "function", - "function": { - "name": "select_showtime", - "description": "Record the selected showtime", - "parameters": { - "type": "object", - "properties": { - "time": { - "type": "string", - "enum": ["2:00 PM", "3:00 PM", "6:00 PM", "7:00 PM"], - "description": "Selected showtime" - } - }, - "required": ["time"] - } - } - }, - { - "type": "function", - "function": { - "name": "end", - "description": "Complete the booking (use only after user confirms)", - "parameters": { - "type": "object", - "properties": {} - } - } - } - ], - "pre_actions": [ - { - "type": "tts_say", - "text": "Let me show you what's playing tomorrow..." - } - ] - }, - "end": { - "messages": [ - { - "role": "system", - "content": "The booking is complete. Thank the user enthusiastically and end the conversation." - } - ], - "functions": [], - "pre_actions": [ - { - "type": "tts_say", - "text": "Your tickets are confirmed! Enjoy the show!" - } - ], - "post_actions": [ - { - "type": "end_conversation" - } - ] - } - } -} diff --git a/examples/dynamic/insurance_anthropic.py b/examples/dynamic/insurance_anthropic.py index 7efee6d..78c3934 100644 --- a/examples/dynamic/insurance_anthropic.py +++ b/examples/dynamic/insurance_anthropic.py @@ -47,7 +47,7 @@ sys.path.append(str(Path(__file__).parent.parent)) from runner import configure -from pipecat_flows import DynamicFlowManager +from pipecat_flows import FlowArgs, FlowManager, FlowResult load_dotenv(override=True) @@ -72,21 +72,21 @@ class InsuranceQuote(TypedDict): # Function handlers -async def collect_age(args: Dict[str, Any]) -> Dict[str, Any]: +async def collect_age(args: FlowArgs) -> FlowResult: """Process age collection.""" age = args["age"] logger.debug(f"Processing age: {age}") return {"age": age} -async def collect_marital_status(args: Dict[str, Any]) -> Dict[str, Any]: +async def collect_marital_status(args: FlowArgs) -> FlowResult: """Process marital status collection.""" status = args["marital_status"] logger.debug(f"Processing marital status: {status}") return {"marital_status": status} -async def calculate_quote(args: Dict[str, Any]) -> Dict[str, Any]: +async def calculate_quote(args: FlowArgs) -> FlowResult: """Calculate insurance quote based on age and marital status.""" age = args["age"] marital_status = args["marital_status"] @@ -109,7 +109,7 @@ async def calculate_quote(args: Dict[str, Any]) -> Dict[str, Any]: return quote -async def update_coverage(args: Dict[str, Any]) -> Dict[str, Any]: +async def update_coverage(args: FlowArgs) -> FlowResult: """Update coverage options and recalculate premium.""" coverage_amount = args["coverage_amount"] deductible = args["deductible"] @@ -130,7 +130,7 @@ async def update_coverage(args: Dict[str, Any]) -> Dict[str, Any]: return adjusted_quote -async def end_quote(args: Dict[str, Any]) -> Dict[str, Any]: +async def end_quote() -> FlowResult: """Handle quote completion.""" logger.debug("Completing quote process") return {"status": "completed"} @@ -158,6 +158,7 @@ def create_initial_node(): "functions": [ { "name": "collect_age", + "handler": collect_age, "description": "Record customer's age after they provide it", "input_schema": { "type": "object", @@ -189,6 +190,7 @@ def create_marital_status_node(): "functions": [ { "name": "collect_marital_status", + "handler": collect_marital_status, "description": "Record marital status", "input_schema": { "type": "object", @@ -224,6 +226,7 @@ def create_quote_calculation_node(age: int, marital_status: str): "functions": [ { "name": "calculate_quote", + "handler": calculate_quote, "description": "Calculate initial insurance quote", "input_schema": { "type": "object", @@ -263,6 +266,7 @@ def create_quote_results_node(quote: Dict[str, Any]): "functions": [ { "name": "update_coverage", + "handler": update_coverage, "description": "Update coverage options", "input_schema": { "type": "object", @@ -275,6 +279,7 @@ def create_quote_results_node(quote: Dict[str, Any]): }, { "name": "end_quote", + "handler": end_quote, "description": "Complete the quote process", "input_schema": {"type": "object", "properties": {}}, }, @@ -308,9 +313,7 @@ def create_end_node(): # Transition callback -async def handle_insurance_transition( - function_name: str, args: Dict, flow_manager: DynamicFlowManager -): +async def handle_insurance_transition(function_name: str, args: Dict, flow_manager: FlowManager): """Handle transitions between insurance flow states.""" logger.debug(f"Handling transition for function: {function_name} with args: {args}") @@ -367,15 +370,6 @@ async def main(): api_key=os.getenv("ANTHROPIC_API_KEY"), model="claude-3-5-sonnet-latest" ) - # Create function handlers dictionary - function_handlers = { - "collect_age": collect_age, - "collect_marital_status": collect_marital_status, - "calculate_quote": calculate_quote, - "update_coverage": update_coverage, - "end_quote": end_quote, - } - # Create initial context messages = [ { @@ -412,13 +406,7 @@ async def main(): task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True)) # Initialize flow manager with transition callback - flow_manager = DynamicFlowManager( - task, llm, tts, transition_callback=handle_insurance_transition - ) - flow_manager.state = {} # Initialize state storage - - # Register all functions - await flow_manager.register_functions(function_handlers) + flow_manager = FlowManager(task, llm, tts, transition_callback=handle_insurance_transition) @transport.event_handler("on_first_participant_joined") async def on_first_participant_joined(transport, participant): diff --git a/examples/dynamic/insurance_gemini.py b/examples/dynamic/insurance_gemini.py index 087835a..70615ac 100644 --- a/examples/dynamic/insurance_gemini.py +++ b/examples/dynamic/insurance_gemini.py @@ -47,7 +47,7 @@ sys.path.append(str(Path(__file__).parent.parent)) from runner import configure -from pipecat_flows import DynamicFlowManager +from pipecat_flows import FlowArgs, FlowManager, FlowResult load_dotenv(override=True) @@ -72,21 +72,21 @@ class InsuranceQuote(TypedDict): # Function handlers -async def collect_age(args: Dict[str, Any]) -> Dict[str, Any]: +async def collect_age(args: FlowArgs) -> FlowResult: """Process age collection.""" age = args["age"] logger.debug(f"Processing age: {age}") return {"age": age} -async def collect_marital_status(args: Dict[str, Any]) -> Dict[str, Any]: +async def collect_marital_status(args: FlowArgs) -> FlowResult: """Process marital status collection.""" status = args["marital_status"] logger.debug(f"Processing marital status: {status}") return {"marital_status": status} -async def calculate_quote(args: Dict[str, Any]) -> Dict[str, Any]: +async def calculate_quote(args: FlowArgs) -> FlowResult: """Calculate insurance quote based on age and marital status.""" age = args["age"] marital_status = args["marital_status"] @@ -109,7 +109,7 @@ async def calculate_quote(args: Dict[str, Any]) -> Dict[str, Any]: return quote -async def update_coverage(args: Dict[str, Any]) -> Dict[str, Any]: +async def update_coverage(args: FlowArgs) -> FlowResult: """Update coverage options and recalculate premium.""" coverage_amount = args["coverage_amount"] deductible = args["deductible"] @@ -130,7 +130,7 @@ async def update_coverage(args: Dict[str, Any]) -> Dict[str, Any]: return adjusted_quote -async def end_quote(args: Dict[str, Any]) -> Dict[str, Any]: +async def end_quote() -> FlowResult: """Handle quote completion.""" logger.debug("Completing quote process") return {"status": "completed"} @@ -155,6 +155,7 @@ def create_initial_node(): "function_declarations": [ { "name": "collect_age", + "handler": collect_age, "description": "Record customer's age after they provide it", "parameters": { "type": "object", @@ -189,6 +190,7 @@ def create_marital_status_node(): "function_declarations": [ { "name": "collect_marital_status", + "handler": collect_marital_status, "description": "Record marital status after customer provides it", "parameters": { "type": "object", @@ -223,6 +225,7 @@ def create_quote_calculation_node(age: int, marital_status: str): "function_declarations": [ { "name": "calculate_quote", + "handler": calculate_quote, "description": "Calculate initial insurance quote", "parameters": { "type": "object", @@ -261,6 +264,7 @@ def create_quote_results_node(quote: Dict[str, Any]): "function_declarations": [ { "name": "update_coverage", + "handler": update_coverage, "description": "Update coverage options when customer requests changes", "parameters": { "type": "object", @@ -273,12 +277,11 @@ def create_quote_results_node(quote: Dict[str, Any]): }, { "name": "end_quote", + "handler": end_quote, "description": "Complete the quote process when customer is satisfied", "parameters": { "type": "object", - "properties": { - "status": {"type": "string", "enum": ["completed"]} # Add property - }, + "properties": {"status": {"type": "string", "enum": ["completed"]}}, "required": ["status"], }, }, @@ -309,9 +312,7 @@ def create_end_node(): # Transition callback -async def handle_insurance_transition( - function_name: str, args: Dict, flow_manager: DynamicFlowManager -): +async def handle_insurance_transition(function_name: str, args: Dict, flow_manager: FlowManager): """Handle transitions between insurance flow states.""" logger.debug(f"Handling transition for function: {function_name} with args: {args}") @@ -366,15 +367,6 @@ async def main(): tts = DeepgramTTSService(api_key=os.getenv("DEEPGRAM_API_KEY"), voice="aura-helios-en") llm = GoogleLLMService(api_key=os.getenv("GOOGLE_API_KEY"), model="gemini-1.5-flash-latest") - # Create function handlers dictionary - function_handlers = { - "collect_age": collect_age, - "collect_marital_status": collect_marital_status, - "calculate_quote": calculate_quote, - "update_coverage": update_coverage, - "end_quote": end_quote, - } - # Create initial context messages = [ { @@ -407,13 +399,7 @@ async def main(): task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True)) # Initialize flow manager with transition callback - flow_manager = DynamicFlowManager( - task, llm, tts, transition_callback=handle_insurance_transition - ) - flow_manager.state = {} # Initialize state storage - - # Register all functions - await flow_manager.register_functions(function_handlers) + flow_manager = FlowManager(task, llm, tts, transition_callback=handle_insurance_transition) @transport.event_handler("on_first_participant_joined") async def on_first_participant_joined(transport, participant): diff --git a/examples/dynamic/insurance_openai.py b/examples/dynamic/insurance_openai.py index 131a708..3b8d536 100644 --- a/examples/dynamic/insurance_openai.py +++ b/examples/dynamic/insurance_openai.py @@ -44,7 +44,7 @@ from pipecat.services.openai import OpenAILLMService from pipecat.transports.services.daily import DailyParams, DailyTransport -from pipecat_flows import DynamicFlowManager +from pipecat_flows import FlowArgs, FlowManager, FlowResult sys.path.append(str(Path(__file__).parent.parent)) from runner import configure @@ -72,25 +72,25 @@ class InsuranceQuote(TypedDict): # Function handlers -async def collect_age(args: Dict[str, Any]) -> Dict[str, Any]: +async def collect_age(args: FlowArgs) -> FlowResult: """Process age collection.""" age = args["age"] - logger.debug(f"Processing age: {age}") + logger.debug(f"collect_age handler executing with age: {age}") return {"age": age} -async def collect_marital_status(args: Dict[str, Any]) -> Dict[str, Any]: +async def collect_marital_status(args: FlowArgs) -> FlowResult: """Process marital status collection.""" status = args["marital_status"] - logger.debug(f"Processing marital status: {status}") + logger.debug(f"collect_marital_status handler executing with status: {status}") return {"marital_status": status} -async def calculate_quote(args: Dict[str, Any]) -> Dict[str, Any]: +async def calculate_quote(args: FlowArgs) -> FlowResult: """Calculate insurance quote based on age and marital status.""" age = args["age"] marital_status = args["marital_status"] - logger.debug(f"Calculating quote for age: {age}, status: {marital_status}") + logger.debug(f"calculate_quote handler executing with age: {age}, status: {marital_status}") # Determine rate category age_category = "young" if age < 25 else "adult" @@ -109,11 +109,13 @@ async def calculate_quote(args: Dict[str, Any]) -> Dict[str, Any]: return quote -async def update_coverage(args: Dict[str, Any]) -> Dict[str, Any]: +async def update_coverage(args: FlowArgs) -> FlowResult: """Update coverage options and recalculate premium.""" coverage_amount = args["coverage_amount"] deductible = args["deductible"] - logger.debug(f"Updating coverage: amount={coverage_amount}, deductible={deductible}") + logger.debug( + f"update_coverage handler executing with amount: {coverage_amount}, deductible: {deductible}" + ) # Calculate adjusted quote monthly_premium = (coverage_amount / 250000) * 100 @@ -130,9 +132,9 @@ async def update_coverage(args: Dict[str, Any]) -> Dict[str, Any]: return adjusted_quote -async def end_quote(args: Dict[str, Any]) -> Dict[str, Any]: +async def end_quote() -> FlowResult: """Handle quote completion.""" - logger.debug("Completing quote process") + logger.debug("end_quote handler executing") return {"status": "completed"} @@ -148,13 +150,16 @@ def create_initial_node(): ], "functions": [ { - "name": "collect_age", - "handler": collect_age, - "description": "Record customer's age", - "parameters": { - "type": "object", - "properties": {"age": {"type": "integer"}}, - "required": ["age"], + "type": "function", + "function": { + "name": "collect_age", + "handler": collect_age, + "description": "Record customer's age", + "parameters": { + "type": "object", + "properties": {"age": {"type": "integer"}}, + "required": ["age"], + }, }, } ], @@ -175,15 +180,18 @@ def create_marital_status_node(): ], "functions": [ { - "name": "collect_marital_status", - "handler": collect_marital_status, - "description": "Record marital status", - "parameters": { - "type": "object", - "properties": { - "marital_status": {"type": "string", "enum": ["single", "married"]} + "type": "function", + "function": { + "name": "collect_marital_status", + "handler": collect_marital_status, + "description": "Record marital status", + "parameters": { + "type": "object", + "properties": { + "marital_status": {"type": "string", "enum": ["single", "married"]} + }, + "required": ["marital_status"], }, - "required": ["marital_status"], }, } ], @@ -206,19 +214,22 @@ def create_quote_calculation_node(age: int, marital_status: str): ], "functions": [ { - "name": "calculate_quote", - "handler": calculate_quote, - "description": "Calculate initial insurance quote", - "parameters": { - "type": "object", - "properties": { - "age": {"type": "integer"}, - "marital_status": { - "type": "string", - "enum": ["single", "married"], + "type": "function", + "function": { + "name": "calculate_quote", + "handler": calculate_quote, + "description": "Calculate initial insurance quote", + "parameters": { + "type": "object", + "properties": { + "age": {"type": "integer"}, + "marital_status": { + "type": "string", + "enum": ["single", "married"], + }, }, + "required": ["age", "marital_status"], }, - "required": ["age", "marital_status"], }, } ], @@ -244,25 +255,28 @@ def create_quote_results_node(quote: Dict[str, Any]): ], "functions": [ { - "name": "update_coverage", - "handler": update_coverage, - "description": "Update coverage options", - "parameters": { - "type": "object", - "properties": { - "coverage_amount": {"type": "integer"}, - "deductible": {"type": "integer"}, + "type": "function", + "function": { + "name": "update_coverage", + "handler": update_coverage, + "description": "Update coverage options", + "parameters": { + "type": "object", + "properties": { + "coverage_amount": {"type": "integer"}, + "deductible": {"type": "integer"}, + }, + "required": ["coverage_amount", "deductible"], }, - "required": ["coverage_amount", "deductible"], }, }, { - "name": "end_quote", - "handler": end_quote, - "description": "Complete the quote process", - "parameters": { - "type": "object", - "properties": {}, + "type": "function", + "function": { + "name": "end_quote", + "handler": end_quote, + "description": "Complete the quote process", + "parameters": {"type": "object", "properties": {}}, }, }, ], @@ -290,17 +304,18 @@ def create_end_node(): # Transition callback -async def handle_insurance_transition( - function_name: str, args: Dict, flow_manager: DynamicFlowManager -): +async def handle_insurance_transition(function_name: str, args: Dict, flow_manager: FlowManager): """Handle transitions between insurance flow states.""" - logger.debug(f"Handling transition for function: {function_name} with args: {args}") + logger.debug(f"Transition callback executing for function: {function_name} with args: {args}") if function_name == "collect_age": + logger.debug("Processing collect_age transition") flow_manager.state["age"] = args["age"] await flow_manager.set_node("marital_status", create_marital_status_node()) + logger.debug("Completed collect_age transition") elif function_name == "collect_marital_status": + logger.debug("Processing collect_marital_status transition") flow_manager.state["marital_status"] = args["marital_status"] await flow_manager.set_node( "quote_calculation", @@ -308,27 +323,32 @@ async def handle_insurance_transition( flow_manager.state["age"], flow_manager.state["marital_status"] ), ) + logger.debug("Completed collect_marital_status transition") elif function_name == "calculate_quote": - # Calculate the quote using the handler + logger.debug("Processing calculate_quote transition") quote = await calculate_quote(args) flow_manager.state["quote"] = quote await flow_manager.set_node( "quote_results", create_quote_results_node(quote), ) + logger.debug("Completed calculate_quote transition") elif function_name == "update_coverage": - # Calculate updated quote using the handler + logger.debug("Processing update_coverage transition") updated_quote = await update_coverage(args) flow_manager.state["quote"] = updated_quote await flow_manager.set_node( "quote_results", create_quote_results_node(updated_quote), ) + logger.debug("Completed update_coverage transition") elif function_name == "end_quote": + logger.debug("Processing end_quote transition") await flow_manager.set_node("end", create_end_node()) + logger.debug("Completed end_quote transition") async def main(): @@ -384,18 +404,16 @@ async def main(): task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True)) # Initialize flow manager with transition callback - flow_manager = DynamicFlowManager( - task, llm, tts, transition_callback=handle_insurance_transition - ) - flow_manager.state = {} # Initialize state storage + flow_manager = FlowManager(task, llm, tts, transition_callback=handle_insurance_transition) @transport.event_handler("on_first_participant_joined") async def on_first_participant_joined(transport, participant): await transport.capture_participant_transcription(participant["id"]) - # Initialize flow + logger.debug("Initializing flow") await flow_manager.initialize(messages) - # Set initial node + logger.debug("Setting initial node") await flow_manager.set_node("initial", create_initial_node()) + logger.debug("Queueing initial context") await task.queue_frames([context_aggregator.user().get_context_frame()]) # Run the pipeline diff --git a/examples/static/food_ordering.py b/examples/static/food_ordering.py index 6c529e1..bff4a1d 100644 --- a/examples/static/food_ordering.py +++ b/examples/static/food_ordering.py @@ -24,7 +24,7 @@ sys.path.append(str(Path(__file__).parent.parent)) from runner import configure -from pipecat_flows import StaticFlowManager +from pipecat_flows import FlowArgs, FlowConfig, FlowManager, FlowResult load_dotenv(override=True) @@ -60,7 +60,21 @@ # - Pre-action: Farewell message # - Post-action: Ends conversation -flow_config = { + +# Function handlers +async def select_pizza_size(args: FlowArgs) -> FlowResult: + """Handle pizza size selection.""" + size = args["size"] + return {"status": "success", "size": size} + + +async def select_roll_count(args: FlowArgs) -> FlowResult: + """Handle sushi roll count selection.""" + count = args["count"] + return {"status": "success", "count": count} + + +flow_config: FlowConfig = { "initial_node": "start", "nodes": { "start": { @@ -101,6 +115,7 @@ "type": "function", "function": { "name": "select_pizza_size", + "handler": select_pizza_size, "description": "Record the selected pizza size", "parameters": { "type": "object", @@ -140,6 +155,7 @@ "type": "function", "function": { "name": "select_roll_count", + "handler": select_roll_count, "description": "Record the number of sushi rolls", "parameters": { "type": "object", @@ -183,28 +199,12 @@ } -# Node function handlers -async def select_pizza_size_handler( - function_name, tool_call_id, args, llm, context, result_callback -): - size = args["size"] - # In a real app, this would store the selection - await result_callback({"status": "success", "size": size}) - - -async def select_roll_count_handler( - function_name, tool_call_id, args, llm, context, result_callback -): - count = args["count"] - # In a real app, this would store the selection - await result_callback({"status": "success", "count": count}) - - async def main(): """Main function to set up and run the food ordering bot.""" async with aiohttp.ClientSession() as session: (room_url, _) = await configure(session) + # Initialize services transport = DailyTransport( room_url, None, @@ -221,13 +221,6 @@ async def main(): tts = DeepgramTTSService(api_key=os.getenv("DEEPGRAM_API_KEY"), voice="aura-helios-en") llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o") - # Register node function handlers with LLM - llm.register_function("select_pizza_size", select_pizza_size_handler) - llm.register_function("select_roll_count", select_roll_count_handler) - - # Get initial tools from the first node - initial_tools = flow_config["nodes"]["start"]["functions"] - # Create initial context messages = [ { @@ -236,32 +229,33 @@ async def main(): } ] - context = OpenAILLMContext(messages, initial_tools) + context = OpenAILLMContext(messages, flow_config["nodes"]["start"]["functions"]) context_aggregator = llm.create_context_aggregator(context) + # Create pipeline pipeline = Pipeline( [ - transport.input(), # Transport user input - stt, # STT - context_aggregator.user(), # User responses - llm, # LLM - tts, # TTS - transport.output(), # Transport bot output - context_aggregator.assistant(), # Assistant spoken responses + transport.input(), + stt, + context_aggregator.user(), + llm, + tts, + transport.output(), + context_aggregator.assistant(), ] ) task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True)) - # Initialize flow manager with LLM - flow_manager = StaticFlowManager(flow_config, task, llm, tts) + # Initialize flow manager in static mode + flow_manager = FlowManager(task, llm, tts, flow_config=flow_config) @transport.event_handler("on_first_participant_joined") async def on_first_participant_joined(transport, participant): await transport.capture_participant_transcription(participant["id"]) - # Initialize the flow processor + logger.debug("Initializing flow") await flow_manager.initialize(messages) - # Kick off the conversation using the context aggregator + logger.debug("Starting conversation") await task.queue_frames([context_aggregator.user().get_context_frame()]) runner = PipelineRunner() diff --git a/examples/static/movie_booking_openai.py b/examples/static/movie_booking_openai.py deleted file mode 100644 index d0370ce..0000000 --- a/examples/static/movie_booking_openai.py +++ /dev/null @@ -1,314 +0,0 @@ -# -# Copyright (c) 2024, Daily -# -# SPDX-License-Identifier: BSD 2-Clause License -# - -import asyncio -import os -import sys -from pathlib import Path - -import aiohttp -from dotenv import load_dotenv -from loguru import logger -from pipecat.audio.vad.silero import SileroVADAnalyzer -from pipecat.pipeline.pipeline import Pipeline -from pipecat.pipeline.runner import PipelineRunner -from pipecat.pipeline.task import PipelineParams, PipelineTask -from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext -from pipecat.services.deepgram import DeepgramSTTService, DeepgramTTSService -from pipecat.services.openai import OpenAILLMService -from pipecat.transports.services.daily import DailyParams, DailyTransport - -sys.path.append(str(Path(__file__).parent.parent)) -from runner import configure - -from pipecat_flows import StaticFlowManager - -load_dotenv(override=True) - -logger.remove(0) -logger.add(sys.stderr, level="DEBUG") - -# Flow Configuration - Movie Booking -# -# This configuration defines a movie ticket booking system with the following states: -# -# 1. start -# - Initial state where user chooses between today or tomorrow's showings -# - Functions: check_today, check_tomorrow -# - Pre-action: Welcome message -# - Transitions to: check_today or check_tomorrow -# -# 2. check_today -# - Handles movie selection for today's showings -# - Functions: -# * select_movie (node function with today's movies) -# * select_showtime (node function with available times) -# * end (transitions to end node after confirmation) -# - Pre-action: Today's movie listing message -# -# 3. check_tomorrow -# - Handles movie selection for tomorrow's showings -# - Functions: -# * select_movie (node function with tomorrow's movies) -# * select_showtime (node function with available times) -# * end (transitions to end node after confirmation) -# - Pre-action: Tomorrow's movie listing message -# -# 4. end -# - Final state that closes the conversation -# - No functions available -# - Pre-action: Ticket confirmation message -# - Post-action: Ends conversation -# -# Note: Both check_today and check_tomorrow allow multiple selections -# until the user confirms their final choice - -flow_config = { - "initial_node": "start", - "nodes": { - "start": { - "messages": [ - { - "role": "system", - "content": "For this step, ask if they want to see what's playing today or tomorrow, and wait for them to choose. Start with a warm greeting and be helpful and enthusiastic; you're helping them plan their entertainment.", - } - ], - "functions": [ - { - "type": "function", - "function": { - "name": "check_today", - "description": "User wants to see today's movies", - "parameters": {"type": "object", "properties": {}}, - }, - }, - { - "type": "function", - "function": { - "name": "check_tomorrow", - "description": "User wants to see tomorrow's movies", - "parameters": {"type": "object", "properties": {}}, - }, - }, - ], - "pre_actions": [ - { - "type": "tts_say", - "text": "Welcome to MoviePlex! Let me help you book some tickets.", - } - ], - }, - "check_today": { - "messages": [ - { - "role": "system", - "content": "You are handling today's movie selection. Use the available functions:\n - Use select_movie when the user chooses a movie (can be used multiple times if they change their mind)\n - Use select_showtime after they've chosen a movie to pick their preferred time\n - Use the end function ONLY when the user confirms their final selection\n\nAfter each selection, confirm their choice and ask about the next step. Remember to be enthusiastic and helpful.\n\nStart by telling them today's available movies: 'Jurassic Park' at 3:00 PM and 7:00 PM, or 'The Matrix' at 4:00 PM and 8:00 PM.", - } - ], - "functions": [ - { - "type": "function", - "function": { - "name": "select_movie", - "description": "Record the selected movie", - "parameters": { - "type": "object", - "properties": { - "movie": { - "type": "string", - "enum": ["Jurassic Park", "The Matrix"], - "description": "Selected movie", - } - }, - "required": ["movie"], - }, - }, - }, - { - "type": "function", - "function": { - "name": "select_showtime", - "description": "Record the selected showtime", - "parameters": { - "type": "object", - "properties": { - "time": { - "type": "string", - "enum": ["3:00 PM", "4:00 PM", "7:00 PM", "8:00 PM"], - "description": "Selected showtime", - } - }, - "required": ["time"], - }, - }, - }, - { - "type": "function", - "function": { - "name": "end", - "description": "Complete the booking (use only after user confirms)", - "parameters": {"type": "object", "properties": {}}, - }, - }, - ], - "pre_actions": [{"type": "tts_say", "text": "Let me show you what's playing today..."}], - }, - "check_tomorrow": { - "messages": [ - { - "role": "system", - "content": "You are handling tomorrow's movie selection. Use the available functions:\n - Use select_movie when the user chooses a movie (can be used multiple times if they change their mind)\n - Use select_showtime after they've chosen a movie to pick their preferred time\n - Use the end function ONLY when the user confirms their final selection\n\nAfter each selection, confirm their choice and ask about the next step. Remember to be enthusiastic and helpful.\n\nStart by telling them tomorrow's available movies: 'The Lion King' at 2:00 PM and 6:00 PM, or 'Inception' at 3:00 PM and 7:00 PM.", - } - ], - "functions": [ - { - "type": "function", - "function": { - "name": "select_movie", - "description": "Record the selected movie", - "parameters": { - "type": "object", - "properties": { - "movie": { - "type": "string", - "enum": ["The Lion King", "Inception"], - "description": "Selected movie", - } - }, - "required": ["movie"], - }, - }, - }, - { - "type": "function", - "function": { - "name": "select_showtime", - "description": "Record the selected showtime", - "parameters": { - "type": "object", - "properties": { - "time": { - "type": "string", - "enum": ["2:00 PM", "3:00 PM", "6:00 PM", "7:00 PM"], - "description": "Selected showtime", - } - }, - "required": ["time"], - }, - }, - }, - { - "type": "function", - "function": { - "name": "end", - "description": "Complete the booking (use only after user confirms)", - "parameters": {"type": "object", "properties": {}}, - }, - }, - ], - "pre_actions": [ - {"type": "tts_say", "text": "Let me show you what's playing tomorrow..."} - ], - }, - "end": { - "messages": [ - { - "role": "system", - "content": "The booking is complete. Thank the user enthusiastically and end the conversation.", - } - ], - "functions": [], - "pre_actions": [ - {"type": "tts_say", "text": "Your tickets are confirmed! Enjoy the show!"} - ], - "post_actions": [{"type": "end_conversation"}], - }, - }, -} - - -# Node function handlers -async def select_movie_handler(function_name, tool_call_id, args, llm, context, result_callback): - movie = args["movie"] - # In a real app, this would store the selection - await result_callback({"status": "success", "movie": movie}) - - -async def select_showtime_handler(function_name, tool_call_id, args, llm, context, result_callback): - time = args["time"] - # In a real app, this would store the selection - await result_callback({"status": "success", "time": time}) - - -async def main(): - async with aiohttp.ClientSession() as session: - (room_url, _) = await configure(session) - - transport = DailyTransport( - room_url, - None, - "Movie booking bot", - DailyParams( - audio_out_enabled=True, - vad_enabled=True, - vad_analyzer=SileroVADAnalyzer(), - vad_audio_passthrough=True, - ), - ) - - stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) - tts = DeepgramTTSService(api_key=os.getenv("DEEPGRAM_API_KEY"), voice="aura-helios-en") - llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o") - - # Register node function handlers with LLM - llm.register_function("select_movie", select_movie_handler) - llm.register_function("select_showtime", select_showtime_handler) - - # Get initial tools from the first node - initial_tools = flow_config["nodes"]["start"]["functions"] - - # Create initial context - messages = [ - { - "role": "system", - "content": "You are a movie ticket booking assistant. You must ALWAYS use one of the available functions to progress the conversation. This is a phone conversations and your responses will be converted to audio. Avoid outputting special characters and emojis.", - } - ] - - context = OpenAILLMContext(messages, initial_tools) - context_aggregator = llm.create_context_aggregator(context) - - pipeline = Pipeline( - [ - transport.input(), # Transport user input - stt, # STT - context_aggregator.user(), # User responses - llm, # LLM - tts, # TTS - transport.output(), # Transport bot output - context_aggregator.assistant(), # Assistant spoken responses - ] - ) - - task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True)) - - # Initialize flow manager - flow_manager = StaticFlowManager(flow_config, task, llm, tts) - - @transport.event_handler("on_first_participant_joined") - async def on_first_participant_joined(transport, participant): - await transport.capture_participant_transcription(participant["id"]) - # Initialize the flow processor - await flow_manager.initialize(messages) - # Kick off the conversation using the context aggregator - await task.queue_frames([context_aggregator.user().get_context_frame()]) - - runner = PipelineRunner() - await runner.run(task) - - -if __name__ == "__main__": - asyncio.run(main()) diff --git a/examples/static/movie_explorer_anthropic.py b/examples/static/movie_explorer_anthropic.py index 22c8747..dff5d0b 100644 --- a/examples/static/movie_explorer_anthropic.py +++ b/examples/static/movie_explorer_anthropic.py @@ -43,7 +43,7 @@ sys.path.append(str(Path(__file__).parent.parent)) from runner import configure -from pipecat_flows import StaticFlowManager +from pipecat_flows import FlowArgs, FlowConfig, FlowManager, FlowResult load_dotenv(override=True) @@ -195,22 +195,22 @@ async def fetch_similar_movies( # Function handlers for the LLM # These are node functions that perform operations without changing conversation state -async def get_movies_handler(function_name, tool_call_id, args, llm, context, result_callback): +# Function handlers for the LLM +# These are node functions that perform operations without changing conversation state +async def get_movies() -> FlowResult: """Handler for fetching current movies.""" logger.debug("Calling TMDB API: get_movies") async with aiohttp.ClientSession() as session: try: movies = await tmdb_api.fetch_current_movies(session) logger.debug(f"TMDB API Response: {movies}") - await result_callback({"movies": movies}) + return {"movies": movies} except Exception as e: logger.error(f"TMDB API Error: {e}") - await result_callback({"error": "Failed to fetch movies"}) + return {"error": "Failed to fetch movies"} -async def get_movie_details_handler( - function_name, tool_call_id, args, llm, context, result_callback -): +async def get_movie_details(args: FlowArgs) -> FlowResult: """Handler for fetching movie details including cast.""" movie_id = args["movie_id"] logger.debug(f"Calling TMDB API: get_movie_details for ID {movie_id}") @@ -218,15 +218,13 @@ async def get_movie_details_handler( try: details = await tmdb_api.fetch_movie_details(session, movie_id) logger.debug(f"TMDB API Response: {details}") - await result_callback(details) + return details except Exception as e: logger.error(f"TMDB API Error: {e}") - await result_callback({"error": f"Failed to fetch details for movie {movie_id}"}) + return {"error": f"Failed to fetch details for movie {movie_id}"} -async def get_similar_movies_handler( - function_name, tool_call_id, args, llm, context, result_callback -): +async def get_similar_movies(args: FlowArgs) -> FlowResult: """Handler for fetching similar movies.""" movie_id = args["movie_id"] logger.debug(f"Calling TMDB API: get_similar_movies for ID {movie_id}") @@ -234,14 +232,14 @@ async def get_similar_movies_handler( try: similar = await tmdb_api.fetch_similar_movies(session, movie_id) logger.debug(f"TMDB API Response: {similar}") - await result_callback({"movies": similar}) + return {"movies": similar} except Exception as e: logger.error(f"TMDB API Error: {e}") - await result_callback({"error": f"Failed to fetch similar movies for {movie_id}"}) + return {"error": f"Failed to fetch similar movies for {movie_id}"} # Flow configuration -flow_config = { +flow_config: FlowConfig = { "initial_node": "greeting", "nodes": { "greeting": { @@ -259,6 +257,7 @@ async def get_similar_movies_handler( "functions": [ { "name": "get_movies", + "handler": get_movies, "description": "Fetch currently playing movies", "input_schema": {"type": "object", "properties": {}}, }, @@ -290,6 +289,7 @@ async def get_similar_movies_handler( "functions": [ { "name": "get_movie_details", + "handler": get_movie_details, "description": "Get details about a specific movie including cast", "input_schema": { "type": "object", @@ -301,6 +301,7 @@ async def get_similar_movies_handler( }, { "name": "get_similar_movies", + "handler": get_similar_movies, "description": "Get similar movies as recommendations", "input_schema": { "type": "object", @@ -364,11 +365,6 @@ async def main(): api_key=os.getenv("ANTHROPIC_API_KEY"), model="claude-3-5-sonnet-latest" ) - # Register node function handlers first - llm.register_function("get_movies", get_movies_handler) - llm.register_function("get_movie_details", get_movie_details_handler) - llm.register_function("get_similar_movies", get_similar_movies_handler) - # Get initial tools from the first node initial_tools = flow_config["nodes"]["greeting"]["functions"] @@ -403,7 +399,7 @@ async def main(): task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True)) # Initialize flow manager - flow_manager = StaticFlowManager(flow_config, task, llm, tts) + flow_manager = FlowManager(task, llm, tts, flow_config) @transport.event_handler("on_first_participant_joined") async def on_first_participant_joined(transport, participant): diff --git a/examples/static/movie_explorer_gemini.py b/examples/static/movie_explorer_gemini.py index a2f9116..cfb6fe0 100644 --- a/examples/static/movie_explorer_gemini.py +++ b/examples/static/movie_explorer_gemini.py @@ -43,7 +43,7 @@ sys.path.append(str(Path(__file__).parent.parent)) from runner import configure -from pipecat_flows import StaticFlowManager +from pipecat_flows import FlowArgs, FlowConfig, FlowManager, FlowResult load_dotenv(override=True) @@ -195,22 +195,20 @@ async def fetch_similar_movies( # Function handlers for the LLM # These are node functions that perform operations without changing conversation state -async def get_movies_handler(function_name, tool_call_id, args, llm, context, result_callback): +async def get_movies() -> FlowResult: """Handler for fetching current movies.""" logger.debug("Calling TMDB API: get_movies") async with aiohttp.ClientSession() as session: try: movies = await tmdb_api.fetch_current_movies(session) logger.debug(f"TMDB API Response: {movies}") - await result_callback({"movies": movies}) + return {"movies": movies} except Exception as e: logger.error(f"TMDB API Error: {e}") - await result_callback({"error": "Failed to fetch movies"}) + return {"error": "Failed to fetch movies"} -async def get_movie_details_handler( - function_name, tool_call_id, args, llm, context, result_callback -): +async def get_movie_details(args: FlowArgs) -> FlowResult: """Handler for fetching movie details including cast.""" movie_id = args["movie_id"] logger.debug(f"Calling TMDB API: get_movie_details for ID {movie_id}") @@ -218,15 +216,13 @@ async def get_movie_details_handler( try: details = await tmdb_api.fetch_movie_details(session, movie_id) logger.debug(f"TMDB API Response: {details}") - await result_callback(details) + return details except Exception as e: logger.error(f"TMDB API Error: {e}") - await result_callback({"error": f"Failed to fetch details for movie {movie_id}"}) + return {"error": f"Failed to fetch details for movie {movie_id}"} -async def get_similar_movies_handler( - function_name, tool_call_id, args, llm, context, result_callback -): +async def get_similar_movies(args: FlowArgs) -> FlowResult: """Handler for fetching similar movies.""" movie_id = args["movie_id"] logger.debug(f"Calling TMDB API: get_similar_movies for ID {movie_id}") @@ -234,14 +230,14 @@ async def get_similar_movies_handler( try: similar = await tmdb_api.fetch_similar_movies(session, movie_id) logger.debug(f"TMDB API Response: {similar}") - await result_callback({"movies": similar}) + return {"movies": similar} except Exception as e: logger.error(f"TMDB API Error: {e}") - await result_callback({"error": f"Failed to fetch similar movies for {movie_id}"}) + return {"error": f"Failed to fetch similar movies for {movie_id}"} # Flow configuration -flow_config = { +flow_config: FlowConfig = { "initial_node": "greeting", "nodes": { "greeting": { @@ -254,7 +250,11 @@ async def get_similar_movies_handler( "functions": [ { "function_declarations": [ - {"name": "get_movies", "description": "Fetch currently playing movies"}, + { + "name": "get_movies", + "handler": get_movies, + "description": "Fetch currently playing movies", + }, {"name": "explore_movie", "description": "Move to movie exploration"}, ] } @@ -278,6 +278,7 @@ async def get_similar_movies_handler( "function_declarations": [ { "name": "get_movie_details", + "handler": get_movie_details, "description": "Get details about a specific movie including cast", "parameters": { "type": "object", @@ -289,6 +290,7 @@ async def get_similar_movies_handler( }, { "name": "get_similar_movies", + "handler": get_similar_movies, "description": "Get similar movies as recommendations", "parameters": { "type": "object", @@ -337,11 +339,6 @@ async def main(): tts = DeepgramTTSService(api_key=os.getenv("DEEPGRAM_API_KEY"), voice="aura-helios-en") llm = GoogleLLMService(api_key=os.getenv("GOOGLE_API_KEY"), model="gemini-1.5-flash-latest") - # Register node function handlers first - llm.register_function("get_movies", get_movies_handler) - llm.register_function("get_movie_details", get_movie_details_handler) - llm.register_function("get_similar_movies", get_similar_movies_handler) - # Get initial tools initial_tools = [ { @@ -379,7 +376,7 @@ async def main(): task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True)) # Initialize flow manager - flow_manager = StaticFlowManager(flow_config, task, llm, tts) + flow_manager = FlowManager(task, llm, tts, flow_config) @transport.event_handler("on_first_participant_joined") async def on_first_participant_joined(transport, participant): diff --git a/examples/static/movie_explorer.py b/examples/static/movie_explorer_openai.py similarity index 92% rename from examples/static/movie_explorer.py rename to examples/static/movie_explorer_openai.py index f368c69..edc5f63 100644 --- a/examples/static/movie_explorer.py +++ b/examples/static/movie_explorer_openai.py @@ -43,7 +43,7 @@ sys.path.append(str(Path(__file__).parent.parent)) from runner import configure -from pipecat_flows import StaticFlowManager +from pipecat_flows import FlowArgs, FlowConfig, FlowManager, FlowResult load_dotenv(override=True) @@ -195,22 +195,20 @@ async def fetch_similar_movies( # Function handlers for the LLM # These are node functions that perform operations without changing conversation state -async def get_movies_handler(function_name, tool_call_id, args, llm, context, result_callback): +async def get_movies() -> FlowResult: """Handler for fetching current movies.""" logger.debug("Calling TMDB API: get_movies") async with aiohttp.ClientSession() as session: try: movies = await tmdb_api.fetch_current_movies(session) logger.debug(f"TMDB API Response: {movies}") - await result_callback({"movies": movies}) + return {"movies": movies} except Exception as e: logger.error(f"TMDB API Error: {e}") - await result_callback({"error": "Failed to fetch movies"}) + return {"error": "Failed to fetch movies"} -async def get_movie_details_handler( - function_name, tool_call_id, args, llm, context, result_callback -): +async def get_movie_details(args: FlowArgs) -> FlowResult: """Handler for fetching movie details including cast.""" movie_id = args["movie_id"] logger.debug(f"Calling TMDB API: get_movie_details for ID {movie_id}") @@ -218,15 +216,13 @@ async def get_movie_details_handler( try: details = await tmdb_api.fetch_movie_details(session, movie_id) logger.debug(f"TMDB API Response: {details}") - await result_callback(details) + return details except Exception as e: logger.error(f"TMDB API Error: {e}") - await result_callback({"error": f"Failed to fetch details for movie {movie_id}"}) + return {"error": f"Failed to fetch details for movie {movie_id}"} -async def get_similar_movies_handler( - function_name, tool_call_id, args, llm, context, result_callback -): +async def get_similar_movies(args: FlowArgs) -> FlowResult: """Handler for fetching similar movies.""" movie_id = args["movie_id"] logger.debug(f"Calling TMDB API: get_similar_movies for ID {movie_id}") @@ -234,14 +230,14 @@ async def get_similar_movies_handler( try: similar = await tmdb_api.fetch_similar_movies(session, movie_id) logger.debug(f"TMDB API Response: {similar}") - await result_callback({"movies": similar}) + return {"movies": similar} except Exception as e: logger.error(f"TMDB API Error: {e}") - await result_callback({"error": f"Failed to fetch similar movies for {movie_id}"}) + return {"error": f"Failed to fetch similar movies for {movie_id}"} # Flow configuration -flow_config = { +flow_config: FlowConfig = { "initial_node": "greeting", "nodes": { "greeting": { @@ -256,6 +252,7 @@ async def get_similar_movies_handler( "type": "function", "function": { "name": "get_movies", + "handler": get_movies, "description": "Fetch currently playing movies", "parameters": {"type": "object", "properties": {}}, }, @@ -288,6 +285,7 @@ async def get_similar_movies_handler( "type": "function", "function": { "name": "get_movie_details", + "handler": get_movie_details, "description": "Get details about a specific movie including cast", "parameters": { "type": "object", @@ -302,6 +300,7 @@ async def get_similar_movies_handler( "type": "function", "function": { "name": "get_similar_movies", + "handler": get_similar_movies, "description": "Get similar movies as recommendations", "parameters": { "type": "object", @@ -363,11 +362,6 @@ async def main(): tts = DeepgramTTSService(api_key=os.getenv("DEEPGRAM_API_KEY"), voice="aura-helios-en") llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o") - # Register node function handlers first - llm.register_function("get_movies", get_movies_handler) - llm.register_function("get_movie_details", get_movie_details_handler) - llm.register_function("get_similar_movies", get_similar_movies_handler) - # Get initial tools from the first node initial_tools = flow_config["nodes"]["greeting"]["functions"] @@ -397,7 +391,7 @@ async def main(): task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True)) # Initialize flow manager - flow_manager = StaticFlowManager(flow_config, task, llm, tts) + flow_manager = FlowManager(task, llm, tts, flow_config) @transport.event_handler("on_first_participant_joined") async def on_first_participant_joined(transport, participant): diff --git a/examples/static/patient_intake.py b/examples/static/patient_intake.py index 59ce3af..06009e6 100644 --- a/examples/static/patient_intake.py +++ b/examples/static/patient_intake.py @@ -24,7 +24,7 @@ sys.path.append(str(Path(__file__).parent.parent)) from runner import configure -from pipecat_flows import StaticFlowManager +from pipecat_flows import FlowArgs, FlowConfig, FlowManager, FlowResult load_dotenv(override=True) @@ -84,7 +84,44 @@ # - Pre-action: Thank you message # - Post-action: Ends conversation -flow_config = { + +async def verify_birthday(args: FlowArgs) -> FlowResult: + """Handler for birthday verification.""" + birthday = args["birthday"] + # In a real app, this would verify against patient records + is_valid = birthday == "1983-01-01" + return {"status": "success", "verified": is_valid} + + +async def record_prescriptions(args: FlowArgs) -> FlowResult: + """Handler for recording prescriptions.""" + prescriptions = args["prescriptions"] + # In a real app, this would store in patient records + return {"status": "success", "count": len(prescriptions)} + + +async def record_allergies(args: FlowArgs) -> FlowResult: + """Handler for recording allergies.""" + allergies = args["allergies"] + # In a real app, this would store in patient records + return {"status": "success", "count": len(allergies)} + + +async def record_conditions(args: FlowArgs) -> FlowResult: + """Handler for recording medical conditions.""" + conditions = args["conditions"] + # In a real app, this would store in patient records + return {"status": "success", "count": len(conditions)} + + +async def record_visit_reasons(args: FlowArgs) -> FlowResult: + """Handler for recording visit reasons.""" + visit_reasons = args["visit_reasons"] + # In a real app, this would store in patient records + return {"status": "success", "count": len(visit_reasons)} + + +flow_config: FlowConfig = { "initial_node": "start", "nodes": { "start": { @@ -99,6 +136,7 @@ "type": "function", "function": { "name": "verify_birthday", + "handler": verify_birthday, "description": "Verify the user has provided their correct birthday", "parameters": { "type": "object", @@ -121,9 +159,6 @@ }, }, ], - "pre_actions": [ - {"type": "tts_say", "text": "Hello, I'm Jessica from Tri-County Health Services."} - ], }, "get_prescriptions": { "messages": [ @@ -137,6 +172,7 @@ "type": "function", "function": { "name": "record_prescriptions", + "handler": record_prescriptions, "description": "Record the user's prescriptions", "parameters": { "type": "object", @@ -185,6 +221,7 @@ "type": "function", "function": { "name": "record_allergies", + "handler": record_allergies, "description": "Record the user's allergies", "parameters": { "type": "object", @@ -229,6 +266,7 @@ "type": "function", "function": { "name": "record_conditions", + "handler": record_conditions, "description": "Record the user's medical conditions", "parameters": { "type": "object", @@ -273,6 +311,7 @@ "type": "function", "function": { "name": "record_visit_reasons", + "handler": record_visit_reasons, "description": "Record the reasons for their visit", "parameters": { "type": "object", @@ -375,51 +414,6 @@ } -# Node function handlers -async def verify_birthday_handler(function_name, tool_call_id, args, llm, context, result_callback): - """Handler for birthday verification.""" - birthday = args["birthday"] - # In a real app, this would verify against patient records - is_valid = birthday == "1983-01-01" - await result_callback({"verified": is_valid}) - - -async def record_prescriptions_handler( - function_name, tool_call_id, args, llm, context, result_callback -): - """Handler for recording prescriptions.""" - prescriptions = args["prescriptions"] - # In a real app, this would store in patient records - await result_callback({"status": "recorded", "count": len(prescriptions)}) - - -async def record_allergies_handler( - function_name, tool_call_id, args, llm, context, result_callback -): - """Handler for recording allergies.""" - allergies = args["allergies"] - # In a real app, this would store in patient records - await result_callback({"status": "recorded", "count": len(allergies)}) - - -async def record_conditions_handler( - function_name, tool_call_id, args, llm, context, result_callback -): - """Handler for recording medical conditions.""" - conditions = args["conditions"] - # In a real app, this would store in patient records - await result_callback({"status": "recorded", "count": len(conditions)}) - - -async def record_visit_reasons_handler( - function_name, tool_call_id, args, llm, context, result_callback -): - """Handler for recording visit reasons.""" - visit_reasons = args["visit_reasons"] - # In a real app, this would store in patient records - await result_callback({"status": "recorded", "count": len(visit_reasons)}) - - async def main(): """Main function to set up and run the patient intake bot.""" async with aiohttp.ClientSession() as session: @@ -441,13 +435,6 @@ async def main(): tts = DeepgramTTSService(api_key=os.getenv("DEEPGRAM_API_KEY"), voice="aura-asteria-en") llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o") - # Register node function handlers with LLM - llm.register_function("verify_birthday", verify_birthday_handler) - llm.register_function("record_prescriptions", record_prescriptions_handler) - llm.register_function("record_allergies", record_allergies_handler) - llm.register_function("record_conditions", record_conditions_handler) - llm.register_function("record_visit_reasons", record_visit_reasons_handler) - # Get initial tools from the first node initial_tools = flow_config["nodes"]["start"]["functions"] @@ -477,7 +464,7 @@ async def main(): task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True)) # Initialize flow manager with LLM - flow_manager = StaticFlowManager(flow_config, task, llm, tts) + flow_manager = FlowManager(task, llm, tts, flow_config) @transport.event_handler("on_first_participant_joined") async def on_first_participant_joined(transport, participant): diff --git a/examples/static/restaurant_reservation.py b/examples/static/restaurant_reservation.py index 2991b31..e4b0ab8 100644 --- a/examples/static/restaurant_reservation.py +++ b/examples/static/restaurant_reservation.py @@ -24,7 +24,7 @@ sys.path.append(str(Path(__file__).parent.parent)) from runner import configure -from pipecat_flows import StaticFlowManager +from pipecat_flows import FlowArgs, FlowConfig, FlowManager, FlowResult load_dotenv(override=True) @@ -65,7 +65,22 @@ # a state) and edge functions (which transition between states), while maintaining a # clear and efficient reservation process. -flow_config = { + +async def record_party_size(args: FlowArgs) -> FlowResult: + """Handler for recording party size.""" + size = args["size"] + # In a real app, this would store the reservation details + return {"status": "success", "size": size} + + +async def record_time(args: FlowArgs) -> FlowResult: + """Handler for recording reservation time.""" + time = args["time"] + # In a real app, this would validate availability and store the time + return {"status": "success", "time": time} + + +flow_config: FlowConfig = { "initial_node": "start", "nodes": { "start": { @@ -80,6 +95,7 @@ "type": "function", "function": { "name": "record_party_size", + "handler": record_party_size, "description": "Record the number of people in the party", "parameters": { "type": "object", @@ -112,6 +128,7 @@ "type": "function", "function": { "name": "record_time", + "handler": record_time, "description": "Record the requested time", "parameters": { "type": "object", @@ -162,23 +179,6 @@ } -# Node function handlers -async def record_party_size_handler( - function_name, tool_call_id, args, llm, context, result_callback -): - """Handler for recording party size.""" - size = args["size"] - # In a real app, this would store the reservation details - await result_callback({"status": "success", "size": size}) - - -async def record_time_handler(function_name, tool_call_id, args, llm, context, result_callback): - """Handler for recording reservation time.""" - time = args["time"] - # In a real app, this would validate availability and store the time - await result_callback({"status": "success", "time": time}) - - async def main(): async with aiohttp.ClientSession() as session: (room_url, _) = await configure(session) @@ -199,10 +199,6 @@ async def main(): tts = DeepgramTTSService(api_key=os.getenv("DEEPGRAM_API_KEY"), voice="aura-helios-en") llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o") - # Register node function handlers with LLM - llm.register_function("record_party_size", record_party_size_handler) - llm.register_function("record_time", record_time_handler) - # Get initial tools from the first node initial_tools = flow_config["nodes"]["start"]["functions"] @@ -232,7 +228,7 @@ async def main(): task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True)) # Initialize flow manager with LLM - flow_manager = StaticFlowManager(flow_config, task, llm, tts) + flow_manager = FlowManager(task, llm, tts, flow_config) @transport.event_handler("on_first_participant_joined") async def on_first_participant_joined(transport, participant): diff --git a/examples/static/travel_planner_openai.py b/examples/static/travel_planner.py similarity index 94% rename from examples/static/travel_planner_openai.py rename to examples/static/travel_planner.py index ca313ea..96322a3 100644 --- a/examples/static/travel_planner_openai.py +++ b/examples/static/travel_planner.py @@ -24,7 +24,7 @@ sys.path.append(str(Path(__file__).parent.parent)) from runner import configure -from pipecat_flows import StaticFlowManager +from pipecat_flows import FlowArgs, FlowConfig, FlowManager, FlowResult load_dotenv(override=True) @@ -76,7 +76,31 @@ # - No functions available # - Post-action: Ends conversation -flow_config = { + +# Node function handlers +async def select_destination(args: FlowArgs) -> FlowResult: + """Handler for destination selection.""" + destination = args["destination"] + # In a real app, this would store the selection + return {"status": "success", "destination": destination} + + +async def record_dates(args: FlowArgs) -> FlowResult: + """Handler for travel date recording.""" + check_in = args["check_in"] + check_out = args["check_out"] + # In a real app, this would validate and store the dates + return {"status": "success", "check_in": check_in, "check_out": check_out} + + +async def record_activities(args: FlowArgs) -> FlowResult: + """Handler for activity selection.""" + activities = args["activities"] + # In a real app, this would validate and store the activities + return {"status": "success", "activities": activities} + + +flow_config: FlowConfig = { "initial_node": "start", "nodes": { "start": { @@ -123,6 +147,7 @@ "type": "function", "function": { "name": "select_destination", + "handler": select_destination, "description": "Record the selected beach destination", "parameters": { "type": "object", @@ -162,6 +187,7 @@ "type": "function", "function": { "name": "select_destination", + "handler": select_destination, "description": "Record the selected mountain destination", "parameters": { "type": "object", @@ -201,6 +227,7 @@ "type": "function", "function": { "name": "record_dates", + "handler": record_dates, "description": "Record the selected travel dates", "parameters": { "type": "object", @@ -242,6 +269,7 @@ "type": "function", "function": { "name": "record_activities", + "handler": record_activities, "description": "Record selected activities", "parameters": { "type": "object", @@ -329,33 +357,6 @@ } -# Node function handlers -async def select_destination_handler( - function_name, tool_call_id, args, llm, context, result_callback -): - """Handler for destination selection.""" - destination = args["destination"] - # In a real app, this would store the selection - await result_callback({"status": "success", "destination": destination}) - - -async def record_dates_handler(function_name, tool_call_id, args, llm, context, result_callback): - """Handler for travel date recording.""" - check_in = args["check_in"] - check_out = args["check_out"] - # In a real app, this would validate and store the dates - await result_callback({"status": "success", "check_in": check_in, "check_out": check_out}) - - -async def record_activities_handler( - function_name, tool_call_id, args, llm, context, result_callback -): - """Handler for activity selection.""" - activities = args["activities"] - # In a real app, this would validate and store the activities - await result_callback({"status": "success", "activities": activities}) - - async def main(): """Main function to set up and run the travel planning bot.""" async with aiohttp.ClientSession() as session: @@ -377,11 +378,6 @@ async def main(): tts = DeepgramTTSService(api_key=os.getenv("DEEPGRAM_API_KEY"), voice="aura-helios-en") llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o") - # Register node function handlers with LLM - llm.register_function("select_destination", select_destination_handler) - llm.register_function("record_dates", record_dates_handler) - llm.register_function("record_activities", record_activities_handler) - # Get initial tools from the first node initial_tools = flow_config["nodes"]["start"]["functions"] @@ -411,7 +407,7 @@ async def main(): task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True)) # Initialize flow manager with LLM - flow_manager = StaticFlowManager(flow_config, task, llm, tts) + flow_manager = FlowManager(task, llm, tts, flow_config) @transport.event_handler("on_first_participant_joined") async def on_first_participant_joined(transport, participant): diff --git a/examples/static/travel_planner_gemini.py b/examples/static/travel_planner_gemini.py deleted file mode 100644 index 2f4e1f8..0000000 --- a/examples/static/travel_planner_gemini.py +++ /dev/null @@ -1,391 +0,0 @@ -# -# Copyright (c) 2024, Daily -# -# SPDX-License-Identifier: BSD 2-Clause License -# -# Requirements: -# - TMDB API key (https://www.themoviedb.org/documentation/api) -# - Daily room URL -# - Google API key (also, pip install pipecat-ai[google]) -# - Deepgram API key - -import asyncio -import os -import sys -from pathlib import Path - -import aiohttp -from dotenv import load_dotenv -from loguru import logger -from pipecat.audio.vad.silero import SileroVADAnalyzer -from pipecat.pipeline.pipeline import Pipeline -from pipecat.pipeline.runner import PipelineRunner -from pipecat.pipeline.task import PipelineParams, PipelineTask -from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext -from pipecat.services.deepgram import DeepgramSTTService, DeepgramTTSService -from pipecat.services.google import GoogleLLMService -from pipecat.transports.services.daily import DailyParams, DailyTransport - -sys.path.append(str(Path(__file__).parent.parent)) -from runner import configure - -from pipecat_flows import StaticFlowManager - -load_dotenv(override=True) - -logger.remove(0) -logger.add(sys.stderr, level="DEBUG") - -# Flow Configuration - Travel Planner -# -# This configuration defines a vacation planning system with the following states: -# -# 1. start -# - Initial state where user chooses between beach or mountain vacation -# - Functions: choose_beach, choose_mountain -# - Pre-action: Welcome message -# - Transitions to: choose_beach or choose_mountain -# -# 2. choose_beach/choose_mountain -# - Handles destination selection for chosen vacation type -# - Functions: -# * select_destination (node function with location-specific options) -# * get_dates (transitions to date selection) -# - Pre-action: Destination-specific welcome message -# -# 3. get_dates -# - Handles travel date selection -# - Functions: -# * record_dates (node function, can be modified) -# * get_activities (transitions to activity selection) -# -# 4. get_activities -# - Handles activity preference selection -# - Functions: -# * record_activities (node function, array-based selection) -# * verify_itinerary (transitions to verification) -# -# 5. verify_itinerary -# - Reviews complete vacation plan -# - Functions: -# * revise_plan (loops back to get_dates) -# * confirm_booking (transitions to confirmation) -# -# 6. confirm_booking -# - Handles final confirmation and tips -# - Functions: end -# - Pre-action: Confirmation message -# -# 7. end -# - Final state that closes the conversation -# - No functions available -# - Post-action: Ends conversation - -flow_config = { - "initial_node": "start", - "nodes": { - "start": { - "messages": [ - { - "role": "user", - "content": "For this step, ask if they're interested in planning a beach vacation or a mountain retreat, and wait for them to choose. Start with an enthusiastic greeting and be conversational; you're helping them plan their dream vacation.", - } - ], - "functions": [ - { - "function_declarations": [ - { - "name": "choose_beach", - "description": "User wants to plan a beach vacation", - }, - { - "name": "choose_mountain", - "description": "User wants to plan a mountain retreat", - }, - ] - } - ], - "pre_actions": [ - { - "type": "tts_say", - "text": "Welcome to Dream Vacations! I'll help you plan your perfect getaway.", - } - ], - }, - "choose_beach": { - "messages": [ - { - "role": "user", - "content": "You are handling beach vacation planning. Use the available functions:\n - Use select_destination when the user chooses their preferred beach location\n - Use get_dates once they've selected a destination\n\nAvailable beach destinations are: 'Maui', 'Cancun', or 'Maldives'. After they choose, confirm their selection and proceed to dates. Be enthusiastic and paint a picture of each destination.", - } - ], - "functions": [ - { - "function_declarations": [ - { - "name": "select_destination", - "description": "Record the selected beach destination", - "parameters": { - "type": "object", - "properties": { - "destination": { - "type": "string", - "enum": ["Maui", "Cancun", "Maldives"], - "description": "Selected beach destination", - } - }, - "required": ["destination"], - }, - }, - {"name": "get_dates", "description": "Proceed to date selection"}, - ] - } - ], - "pre_actions": [ - {"type": "tts_say", "text": "Let's find your perfect beach paradise..."} - ], - }, - "choose_mountain": { - "messages": [ - { - "role": "user", - "content": "You are handling mountain retreat planning. Use the available functions:\n - Use select_destination when the user chooses their preferred mountain location\n - Use get_dates once they've selected a destination\n\nAvailable mountain destinations are: 'Swiss Alps', 'Rocky Mountains', or 'Himalayas'. After they choose, confirm their selection and proceed to dates. Be enthusiastic and paint a picture of each destination.", - } - ], - "functions": [ - { - "function_declarations": [ - { - "name": "select_destination", - "description": "Record the selected mountain destination", - "parameters": { - "type": "object", - "properties": { - "destination": { - "type": "string", - "enum": ["Swiss Alps", "Rocky Mountains", "Himalayas"], - "description": "Selected mountain destination", - } - }, - "required": ["destination"], - }, - }, - {"name": "get_dates", "description": "Proceed to date selection"}, - ] - } - ], - "pre_actions": [ - {"type": "tts_say", "text": "Let's find your perfect mountain getaway..."} - ], - }, - "get_dates": { - "messages": [ - { - "role": "user", - "content": "Handle travel date selection. Use the available functions:\n - Use record_dates when the user specifies their travel dates (can be used multiple times if they change their mind)\n - Use get_activities once dates are confirmed\n\nAsk for their preferred travel dates within the next 6 months. After recording dates, confirm the selection and proceed to activities.", - } - ], - "functions": [ - { - "function_declarations": [ - { - "name": "record_dates", - "description": "Record the selected travel dates", - "parameters": { - "type": "object", - "properties": { - "check_in": { - "type": "string", - "description": "Check-in date in YYYY-MM-DD format", - }, - "check_out": { - "type": "string", - "description": "Check-out date in YYYY-MM-DD format", - }, - }, - "required": ["check_in", "check_out"], - }, - }, - {"name": "get_activities", "description": "Proceed to activity selection"}, - ] - } - ], - }, - "get_activities": { - "messages": [ - { - "role": "user", - "content": "Handle activity preferences. Use the available functions:\n - Use record_activities to save their activity preferences\n - Use verify_itinerary once activities are selected\n\nFor beach destinations, suggest: snorkeling, surfing, sunset cruise\nFor mountain destinations, suggest: hiking, skiing, mountain biking\n\nAfter they choose, confirm their selections and proceed to verification.", - } - ], - "functions": [ - { - "function_declarations": [ - { - "name": "record_activities", - "description": "Record selected activities (choose 1-3 activities)", - "parameters": { - "type": "object", - "properties": { - "activities": { - "type": "array", - "items": {"type": "string"}, - "description": "Selected activities (1-3 choices)", - } - }, - "required": ["activities"], - }, - }, - { - "name": "verify_itinerary", - "description": "Proceed to itinerary verification", - }, - ] - } - ], - }, - "verify_itinerary": { - "messages": [ - { - "role": "user", - "content": "Review the complete itinerary with the user. Summarize their destination, dates, and chosen activities. Use the available functions:\n - Use get_dates if they want to make changes\n - Use confirm_booking if they're happy with everything\n\nBe thorough in reviewing all details and ask for their confirmation.", - } - ], - "functions": [ - { - "function_declarations": [ - { - "name": "get_dates", - "description": "Return to date selection to revise the plan", - }, - { - "name": "confirm_booking", - "description": "Confirm the booking and proceed to end", - }, - ] - } - ], - }, - "confirm_booking": { - "messages": [ - { - "role": "user", - "content": "The booking is confirmed. Share some relevant tips about their chosen destination, thank them warmly, and use end to complete the conversation.", - } - ], - "functions": [ - {"function_declarations": [{"name": "end", "description": "End the conversation"}]} - ], - "pre_actions": [ - {"type": "tts_say", "text": "Fantastic! Your dream vacation is confirmed!"} - ], - }, - "end": { - "messages": [ - {"role": "user", "content": "Wish them a wonderful trip and end the conversation."} - ], - "functions": [], - "post_actions": [{"type": "end_conversation"}], - }, - }, -} - - -# Node function handlers -async def select_destination_handler( - function_name, tool_call_id, args, llm, context, result_callback -): - """Handler for destination selection.""" - destination = args["destination"] - # In a real app, this would store the selection - await result_callback({"status": "success", "destination": destination}) - - -async def record_dates_handler(function_name, tool_call_id, args, llm, context, result_callback): - """Handler for travel date recording.""" - check_in = args["check_in"] - check_out = args["check_out"] - # In a real app, this would validate and store the dates - await result_callback({"status": "success", "check_in": check_in, "check_out": check_out}) - - -async def record_activities_handler( - function_name, tool_call_id, args, llm, context, result_callback -): - """Handler for activity selection.""" - activities = args["activities"] - # In a real app, this would validate and store the activities - await result_callback({"status": "success", "activities": activities}) - - -async def main(): - """Main function to set up and run the travel planning bot.""" - async with aiohttp.ClientSession() as session: - (room_url, _) = await configure(session) - - transport = DailyTransport( - room_url, - None, - "Planner Bot", - DailyParams( - audio_out_enabled=True, - vad_enabled=True, - vad_analyzer=SileroVADAnalyzer(), - vad_audio_passthrough=True, - ), - ) - - stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) - tts = DeepgramTTSService(api_key=os.getenv("DEEPGRAM_API_KEY"), voice="aura-helios-en") - llm = GoogleLLMService(api_key=os.getenv("GOOGLE_API_KEY"), model="gemini-1.5-flash-latest") - - # Register node function handlers with LLM - llm.register_function("select_destination", select_destination_handler) - llm.register_function("record_dates", record_dates_handler) - llm.register_function("record_activities", record_activities_handler) - - # Get initial tools from the first node - initial_tools = flow_config["nodes"]["start"]["functions"] - - # Create initial context - messages = [ - { - "role": "system", - "content": "You are a travel planning assistant. You must ALWAYS use one of the available functions to progress the conversation. This is a phone conversation and your responses will be converted to audio. Avoid outputting special characters and emojis.", - } - ] - - context = OpenAILLMContext(messages, initial_tools) - context_aggregator = llm.create_context_aggregator(context) - - pipeline = Pipeline( - [ - transport.input(), # Transport user input - stt, # STT - context_aggregator.user(), # User responses - llm, # LLM - tts, # TTS - transport.output(), # Transport bot output - context_aggregator.assistant(), # Assistant spoken responses - ] - ) - - task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True)) - - # Initialize flow manager with LLM - flow_manager = StaticFlowManager(flow_config, task, llm, tts) - - @transport.event_handler("on_first_participant_joined") - async def on_first_participant_joined(transport, participant): - await transport.capture_participant_transcription(participant["id"]) - # Initialize the flow processor - await flow_manager.initialize(messages) - # Kick off the conversation using the context aggregator - await task.queue_frames([context_aggregator.user().get_context_frame()]) - - runner = PipelineRunner() - await runner.run(task) - - -if __name__ == "__main__": - asyncio.run(main()) diff --git a/src/pipecat_flows/__init__.py b/src/pipecat_flows/__init__.py index 40d56e3..52030fe 100644 --- a/src/pipecat_flows/__init__.py +++ b/src/pipecat_flows/__init__.py @@ -6,34 +6,56 @@ """ Pipecat Flows -This package provides two approaches to managing conversation flows in Pipecat: +This package provides a framework for building structured conversations in Pipecat. +The FlowManager can handle both static and dynamic conversation flows: -1. Static Flows (StaticFlowManager): - - Predefined conversation paths - - Configuration-driven - - Best for straightforward flows +1. Static Flows: + - Configuration-driven conversations with predefined paths + - Entire flow structure defined upfront + - Example: + from pipecat_flows import FlowArgs, FlowResult -Example: - flow_manager = StaticFlowManager(flow_config, task, llm) - await flow_manager.initialize(messages) + async def collect_name(args: FlowArgs) -> FlowResult: + name = args["name"] + return {"status": "success", "name": name} -2. Dynamic Flows (DynamicFlowManager): - - Runtime-determined paths - - Callback-driven - - Best for complex, data-driven flows + flow_config = { + "initial_node": "greeting", + "nodes": { + "greeting": { + "messages": [...], + "functions": [{ + "type": "function", + "function": { + "name": "collect_name", + "handler": collect_name, + "description": "...", + "parameters": {...} + } + }] + } + } + } + flow_manager = FlowManager(task, llm, flow_config=flow_config) -Example: - flow_manager = DynamicFlowManager(task, llm, transition_callback=handle_transitions) - await flow_manager.register_functions(handlers) -""" +2. Dynamic Flows: + - Runtime-determined conversations + - Nodes created or modified during execution + - Example: + from pipecat_flows import FlowArgs, FlowResult + + async def collect_age(args: FlowArgs) -> FlowResult: + age = args["age"] + return {"status": "success", "age": age} -import warnings -from typing import Any + async def handle_transitions(function_name: str, args: Dict, flow_manager): + if function_name == "collect_age": + await flow_manager.set_node("next_step", create_next_node()) + + flow_manager = FlowManager(task, llm, transition_callback=handle_transitions) +""" from .adapters import LLMProvider -from .base import BaseFlowManager -from .config import FlowConfig, NodeConfig -from .dynamic import DynamicFlowManager from .exceptions import ( ActionError, FlowError, @@ -41,41 +63,16 @@ FlowTransitionError, InvalidFunctionError, ) -from .static import StaticFlowManager - - -def FlowManager(*args: Any, **kwargs: Any) -> StaticFlowManager: - """Deprecated: Use StaticFlowManager instead. - - This is a temporary compatibility wrapper that will be removed in a future version. - Please update your imports to use StaticFlowManager. - - Example: - # Old - from pipecat_flows import FlowManager - - # New - from pipecat_flows import StaticFlowManager - """ - warnings.warn( - "FlowManager has been renamed to StaticFlowManager and will be removed in a future version. " - "Please update your imports to use StaticFlowManager instead.", - DeprecationWarning, - stacklevel=2, - ) - return StaticFlowManager(*args, **kwargs) - +from .manager import FlowManager +from .types import FlowArgs, FlowConfig, FlowResult __all__ = [ - # Base Classes - "BaseFlowManager", - # Flow Managers - "StaticFlowManager", - "DynamicFlowManager", - "FlowManager", # Temporary backward compatibility - # Configuration + # Flow Manager + "FlowManager", + # Types + "FlowArgs", "FlowConfig", - "NodeConfig", + "FlowResult", # Exceptions "FlowError", "FlowInitializationError", diff --git a/src/pipecat_flows/adapters.py b/src/pipecat_flows/adapters.py index 64962a1..2fade68 100644 --- a/src/pipecat_flows/adapters.py +++ b/src/pipecat_flows/adapters.py @@ -8,6 +8,7 @@ from enum import Enum from typing import Any, Dict, List +from loguru import logger from pipecat.services.anthropic import AnthropicLLMService from pipecat.services.google import GoogleLLMService from pipecat.services.openai import OpenAILLMService @@ -98,8 +99,12 @@ class GeminiAdapter(LLMAdapter): def get_function_name(self, function_def: Dict[str, Any]) -> str: """Extract function name from provider-specific function definition.""" - if "name" in function_def: - return function_def["name"] + logger.debug(f"Getting function name from: {function_def}") + if "function_declarations" in function_def: + declarations = function_def["function_declarations"] + if declarations and isinstance(declarations, list): + # Return name of current function being processed + return declarations[0]["name"] return "" def get_function_args(self, function_call: Dict[str, Any]) -> dict: @@ -115,7 +120,14 @@ def format_functions(self, functions: List[Dict[str, Any]]) -> List[Dict[str, An all_declarations = [] for func in functions: if "function_declarations" in func: - all_declarations.extend(func["function_declarations"]) + # Process each declaration separately + for decl in func["function_declarations"]: + formatted_decl = { + "name": decl["name"], + "description": decl.get("description", ""), + "parameters": decl.get("parameters", {"type": "object", "properties": {}}), + } + all_declarations.append(formatted_decl) elif "function" in func: all_declarations.append( { @@ -124,14 +136,6 @@ def format_functions(self, functions: List[Dict[str, Any]]) -> List[Dict[str, An "parameters": func["function"].get("parameters", {}), } ) - else: - all_declarations.append( - { - "name": func["name"], - "description": func.get("description", ""), - "parameters": func.get("parameters", {}), - } - ) return [{"function_declarations": all_declarations}] if all_declarations else [] diff --git a/src/pipecat_flows/base.py b/src/pipecat_flows/base.py deleted file mode 100644 index d3d1e9a..0000000 --- a/src/pipecat_flows/base.py +++ /dev/null @@ -1,146 +0,0 @@ -# -# Copyright (c) 2024, Daily -# -# SPDX-License-Identifier: BSD 2-Clause License -# -""" -Base Flow Manager for Pipecat Flows - -This module provides the base functionality shared between static and dynamic -flow managers. It handles common operations like: -- Action management -- LLM context updates -- Basic initialization -- Frame queueing - -Both StaticFlowManager and DynamicFlowManager build upon this base. -""" - -from abc import ABC -from typing import Any, Callable, List, Optional - -from loguru import logger -from pipecat.frames.frames import ( - LLMMessagesAppendFrame, - LLMSetToolsFrame, -) -from pipecat.pipeline.task import PipelineTask - -from .actions import ActionManager -from .exceptions import FlowTransitionError - - -class BaseFlowManager(ABC): - """Base class for Pipecat flow managers. - - This abstract base class provides common functionality for both static and - dynamic flow managers. It handles basic operations like action management, - LLM context updates, and initialization. - - Attributes: - task: PipelineTask instance for queueing frames - llm: LLM service instance - tts: Optional TTS service - action_manager: Manages execution of pre/post actions - initialized: Whether the manager has been initialized - current_node: ID of current conversation node - - The base manager provides: - - Action registration and execution - - LLM context management - - Frame queueing - - Basic state tracking - """ - - def __init__(self, task: PipelineTask, llm: Any, tts: Optional[Any] = None) -> None: - """Initialize base flow manager. - - Args: - task: PipelineTask instance for queueing frames - llm: LLM service instance (e.g., OpenAI, Anthropic) - tts: Optional TTS service for voice actions - """ - self.task = task - self.llm = llm - self.tts = tts - self.action_manager = ActionManager(task, tts) - self.initialized = False - self.current_node: Optional[str] = None - - async def initialize(self, initial_messages: List[dict]) -> None: - """Initialize the flow with starting messages. - - This base implementation sets up the initial LLM context. - Subclasses should call super().initialize() before adding - their specific initialization logic. - - Args: - initial_messages: Initial system messages for the LLM - - Raises: - FlowInitializationError: If initialization fails - """ - if self.initialized: - logger.warning(f"{self.__class__.__name__} already initialized") - return - - self.initialized = True - logger.debug(f"Initialized {self.__class__.__name__}") - - def register_action(self, action_type: str, handler: Callable) -> None: - """Register a handler for a specific action type. - - Args: - action_type: String identifier for the action (e.g., "tts_say") - handler: Async or sync function that handles the action - - Example: - async def custom_notification(action: dict): - text = action.get("text", "") - await notify_user(text) - - flow_manager.register_action("notify", custom_notification) - """ - self.action_manager._register_action(action_type, handler) - - async def _update_llm_context(self, messages: List[dict], functions: List[dict]) -> None: - """Update LLM context with new messages and functions. - - Internal utility method for updating the LLM's context. Handles - queueing the appropriate frames in the correct order. - - Args: - messages: New messages to add to context - functions: New functions to make available - """ - await self.task.queue_frames( - [LLMMessagesAppendFrame(messages=messages), LLMSetToolsFrame(tools=functions)] - ) - - async def _execute_actions( - self, pre_actions: Optional[List[dict]] = None, post_actions: Optional[List[dict]] = None - ) -> None: - """Execute pre and post actions. - - Internal utility method for executing actions in the correct order. - - Args: - pre_actions: Actions to execute before context update - post_actions: Actions to execute after context update - """ - if pre_actions: - await self.action_manager.execute_actions(pre_actions) - if post_actions: - await self.action_manager.execute_actions(post_actions) - - async def _validate_initialization(self) -> None: - """Validate that the manager is properly initialized. - - This abstract method should be implemented by subclasses to - perform any necessary initialization validation. - - Raises: - FlowError: If validation fails - """ - if not self.initialized: - raise FlowTransitionError(f"{self.__class__.__name__} must be initialized first") diff --git a/src/pipecat_flows/dynamic.py b/src/pipecat_flows/dynamic.py deleted file mode 100644 index 79558cc..0000000 --- a/src/pipecat_flows/dynamic.py +++ /dev/null @@ -1,247 +0,0 @@ -# -# Copyright (c) 2024, Daily -# -# SPDX-License-Identifier: BSD 2-Clause License -# - -from typing import Any, Awaitable, Callable, Dict, List, Optional, Set - -from loguru import logger -from pipecat.frames.frames import ( - LLMMessagesUpdateFrame, - LLMSetToolsFrame, -) -from pipecat.pipeline.task import PipelineTask - -from .base import BaseFlowManager -from .exceptions import FlowError, FlowInitializationError - - -def create_handler_wrapper( - handler: Callable[[Dict[str, Any]], Awaitable[Dict[str, Any]]], -) -> Callable: - """Creates a Pipecat-compatible handler from a simplified function. - - Args: - handler: Simple async function that takes args dict and returns result dict - - Returns: - Wrapped handler compatible with Pipecat's LLM function calling system - """ - - async def wrapped( - function_name: str, - tool_call_id: str, - args: Dict[str, Any], - llm: Any, - context: Any, - result_callback: Callable, - ) -> None: - logger.debug(f"Handler called for {function_name} with args: {args}") - result = await handler(args) - await result_callback(result) - logger.debug(f"Handler completed for {function_name}") - - return wrapped - - -class DynamicFlowManager(BaseFlowManager): - """Manages dynamically created conversation flows. - - Designed for flows where nodes and functions are created during runtime. - Each node specifies its available functions, and the flow manager handles - registration and state transitions. - - Example: - async def handle_transitions( - function_name: str, - args: Dict[str, Any], - flow_manager: "DynamicFlowManager" - ) -> None: - # Query business logic - next_step = await get_next_step(args) - - # Create new node based on results - await flow_manager.set_node( - "next_step", - create_node_for_step(next_step) - ) - """ - - def __init__( - self, - task: PipelineTask, - llm: Any, - tts: Optional[Any] = None, - transition_callback: Optional[ - Callable[[str, Dict[str, Any], "DynamicFlowManager"], Awaitable[None]] - ] = None, - ): - """Initialize the dynamic flow manager. - - Args: - task: PipelineTask instance for queueing frames - llm: LLM service instance - tts: Optional TTS service for voice actions - transition_callback: Optional callback for handling transitions - """ - super().__init__(task, llm, tts) - self.transition_callback = transition_callback - self.state: Dict[str, Any] = {} - self.current_functions: Set[str] = set() - - async def initialize(self, initial_messages: List[dict]) -> None: - """Initialize the flow with starting messages. - - Args: - initial_messages: Initial system messages for the LLM - - Raises: - FlowInitializationError: If initialization fails - """ - await super().initialize(initial_messages) - - if not self.initialized: - return - - try: - # Set initial context with no tools - await self.task.queue_frame(LLMMessagesUpdateFrame(messages=initial_messages)) - await self.task.queue_frame(LLMSetToolsFrame(tools=[])) - logger.debug("Initialized dynamic flow manager") - except Exception as e: - self.initialized = False - raise FlowInitializationError(f"Failed to initialize flow: {str(e)}") from e - - async def set_node(self, node_id: str, node_config: Dict[str, Any]) -> None: - """Set up a new conversation node. - - Handles: - 1. Function registration for the node - 2. Pre-actions execution - 3. LLM context updates - 4. Post-actions execution - - Args: - node_id: Identifier for the new node - node_config: Node configuration including: - - messages: List of messages for LLM context - - functions: List of function configurations - - pre_actions: Optional actions to execute before transition - - post_actions: Optional actions to execute after transition - - Example: - await flow_manager.set_node( - "collect_info", - { - "messages": [{ - "role": "system", - "content": "Collect user information" - }], - "functions": [ - { - "name": "save_info", - "handler": save_info_handler, - "description": "Save user information", - "parameters": { - "type": "object", - "properties": { - "name": {"type": "string"}, - "age": {"type": "integer"} - } - } - } - ] - } - ) - """ - await self._validate_initialization() - - try: - self._validate_node_config(node_id, node_config) - - # Execute pre-actions if any - if pre_actions := node_config.get("pre_actions"): - await self._execute_actions(pre_actions=pre_actions) - - # Register functions and create tools list - tools = [] - new_functions: Set[str] = set() - - for func_config in node_config["functions"]: - name = func_config["name"] - if name not in self.current_functions: - # Register new function - await self.llm.register_function( - name, create_handler_wrapper(func_config["handler"]) - ) - new_functions.add(name) - - # Create function definition in provider-specific format - function_def = { - "name": name, - "description": func_config["description"], - "parameters": func_config["parameters"], - } - - # Let the adapter format it correctly - tools.extend(self.adapter.format_functions([function_def])) - - # Update LLM context with new messages and tools - await self._update_llm_context(node_config["messages"], tools) - - # Execute post-actions if any - if post_actions := node_config.get("post_actions"): - await self._execute_actions(post_actions=post_actions) - - # Update state - self.current_node = node_id - self.current_functions = new_functions - - logger.debug(f"Successfully set node: {node_id}") - - except Exception as e: - raise FlowError(f"Failed to set node {node_id}: {str(e)}") from e - - async def handle_function_call(self, function_name: str, args: Dict[str, Any]) -> None: - """Handle function calls and transitions. - - This method: - 1. Executes the function - 2. Calls the transition callback if provided - 3. Updates state based on results - - Args: - function_name: Name of the called function - args: Arguments passed to the function - """ - try: - # Execute transition callback if provided - if self.transition_callback: - await self.transition_callback(function_name, args, self) - - logger.debug(f"Handled function call: {function_name}") - - except Exception as e: - raise FlowError(f"Error handling function {function_name}: {str(e)}") from e - - def _validate_node_config(self, node_id: str, config: Dict[str, Any]) -> None: - """Validate node configuration structure. - - Args: - node_id: Identifier for the node being validated - config: Node configuration to validate - - Raises: - ValueError: If configuration is invalid - """ - if "messages" not in config: - raise ValueError(f"Node '{node_id}' missing required 'messages' field") - if "functions" not in config: - raise ValueError(f"Node '{node_id}' missing required 'functions' field") - - for func in config["functions"]: - required = {"name", "handler", "description", "parameters"} - missing = required - set(func.keys()) - if missing: - raise ValueError(f"Function in node '{node_id}' missing required fields: {missing}") diff --git a/src/pipecat_flows/manager.py b/src/pipecat_flows/manager.py new file mode 100644 index 0000000..39d18bd --- /dev/null +++ b/src/pipecat_flows/manager.py @@ -0,0 +1,419 @@ +# +# Copyright (c) 2024, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import copy +import inspect +from typing import Any, Awaitable, Callable, Dict, List, Optional, Set, Union + +from loguru import logger +from pipecat.frames.frames import ( + LLMMessagesAppendFrame, + LLMMessagesUpdateFrame, + LLMSetToolsFrame, +) +from pipecat.pipeline.task import PipelineTask +from pipecat.services.anthropic import AnthropicLLMService +from pipecat.services.google import GoogleLLMService +from pipecat.services.openai import OpenAILLMService + +from .actions import ActionManager +from .adapters import create_adapter +from .exceptions import FlowError, FlowInitializationError, FlowTransitionError +from .types import FlowArgs, FlowResult + + +class FlowManager: + """Manages conversation flows, supporting both static and dynamic configurations. + + This class provides a unified approach to managing conversation flows, whether + they are predefined (static) or created at runtime (dynamic). + + Static flows use a configuration that defines all nodes upfront: + flow_config = { + "initial_node": "greeting", + "nodes": { + "greeting": { + "messages": [...], + "functions": [{ + "type": "function", + "function": { + "name": "collect_name", + "handler": collect_name_handler, + "description": "Record user's name", + "parameters": {...} + } + }] + } + } + } + flow_manager = FlowManager(task, llm, flow_config=flow_config) + + Dynamic flows create nodes at runtime: + async def handle_transitions(function_name: str, args: Dict, flow_manager): + if function_name == "collect_age": + flow_manager.state["age"] = args["age"] + await flow_manager.set_node("next_step", create_next_node()) + + flow_manager = FlowManager(task, llm, transition_callback=handle_transitions) + + Each node in the flow consists of: + - messages: List of messages for LLM context + - functions: List of available functions with their handlers + - pre_actions: Optional actions to execute before transition + - post_actions: Optional actions to execute after transition + + Functions in nodes can be: + 1. Node functions (require handlers): + { + "type": "function", + "function": { + "name": "collect_age", + "handler": collect_age_handler, # Required + "description": "Record age", + "parameters": {...} + } + } + + 2. Edge functions (handlers optional): + { + "type": "function", + "function": { + "name": "next_node", # Matches a node name + "description": "Go to next node", + "parameters": {...} + } + } + """ + + def __init__( + self, + task: PipelineTask, + llm: Union[OpenAILLMService, AnthropicLLMService, GoogleLLMService], + tts: Optional[Any] = None, + flow_config: Optional[Dict[str, Any]] = None, + transition_callback: Optional[ + Callable[[str, Dict[str, Any], "FlowManager"], Awaitable[None]] + ] = None, + ): + """Initialize the flow manager. + + Args: + task: PipelineTask instance for queueing frames + llm: LLM service instance (e.g., OpenAI, Anthropic) + tts: Optional TTS service for voice actions + flow_config: Optional static flow configuration. If provided, + operates in static mode with predefined nodes + transition_callback: Optional callback for handling transitions. + Required for dynamic flows, ignored for static flows + in favor of static transitions + """ + self.task = task + self.llm = llm + self.tts = tts + self.action_manager = ActionManager(task, tts) + self.adapter = create_adapter(llm) + self.initialized = False + + # Set up static or dynamic mode + if flow_config: + self.nodes = flow_config["nodes"] + self.initial_node = flow_config["initial_node"] + self.transition_callback = self._handle_static_transition + logger.debug("Initialized in static mode") + else: + self.nodes = {} + self.initial_node = None + self.transition_callback = transition_callback + logger.debug("Initialized in dynamic mode") + + self.state: Dict[str, Any] = {} # Shared state across nodes + self.current_functions: Set[str] = set() # Track registered functions + self.current_node: Optional[str] = None + + async def initialize(self, initial_messages: List[dict]) -> None: + """Initialize the flow with starting messages. + + For static flows, also sets the initial node from config. + + Args: + initial_messages: Initial system messages for the LLM + + Raises: + FlowInitializationError: If initialization fails + """ + if self.initialized: + logger.warning(f"{self.__class__.__name__} already initialized") + return + + try: + # Set initial context with no tools + await self.task.queue_frame(LLMMessagesUpdateFrame(messages=initial_messages)) + await self.task.queue_frame(LLMSetToolsFrame(tools=[])) + + self.initialized = True + logger.debug(f"Initialized {self.__class__.__name__}") + + # If in static mode, set initial node + if self.initial_node: + logger.debug(f"Setting initial node: {self.initial_node}") + await self.set_node(self.initial_node, self.nodes[self.initial_node]) + + except Exception as e: + self.initialized = False + raise FlowInitializationError(f"Failed to initialize flow: {str(e)}") from e + + def register_action(self, action_type: str, handler: Callable) -> None: + """Register a handler for a specific action type. + + Args: + action_type: String identifier for the action (e.g., "tts_say") + handler: Async or sync function that handles the action + + Example: + async def custom_notification(action: dict): + text = action.get("text", "") + await notify_user(text) + + flow_manager.register_action("notify", custom_notification) + """ + self.action_manager._register_action(action_type, handler) + + async def _call_handler(self, handler: Callable, args: FlowArgs) -> FlowResult: + """Call handler with or without args based on its signature. + + Args: + handler: The function to call + args: Arguments dictionary + + Returns: + Dict[str, Any]: Handler result + """ + sig = inspect.signature(handler) + if "args" in sig.parameters: + return await handler(args) + return await handler() + + async def _create_transition_func(self, name: str, handler: Optional[Callable]) -> Callable: + """Create a transition function for the given name and handler. + + Args: + name: Function name + handler: Optional handler for node functions + + Returns: + Callable: Transition function that handles both node and edge functions + """ + + async def transition_func( + function_name: str, + tool_call_id: str, + args: Dict[str, Any], + llm: Any, + context: Any, + result_callback: Callable, + ) -> None: + if handler: + # Node function with handler + result = await self._call_handler(handler, args) + await result_callback(result) + logger.debug(f"Handler completed for {name}") + else: + # Edge function without handler + await result_callback({"status": "acknowledged"}) + logger.debug(f"Edge function called: {name}") + + # Execute transition callback if provided + if self.transition_callback: + logger.debug(f"Executing transition for {name}") + await self.transition_callback(function_name, args, self) + + return transition_func + + async def _register_function( + self, name: str, handler: Optional[Callable], new_functions: Set[str] + ) -> None: + """Register a function with the LLM if not already registered. + + Args: + name: Function name + handler: Optional function handler + new_functions: Set to track newly registered functions + """ + if name not in self.current_functions: + self.llm.register_function(name, await self._create_transition_func(name, handler)) + new_functions.add(name) + logger.debug(f"Registered function: {name}") + + def _remove_handlers(self, tool_config: Dict[str, Any]) -> None: + """Remove handlers from tool configuration. + + Args: + tool_config: Function configuration to clean + """ + if "function" in tool_config and "handler" in tool_config["function"]: + del tool_config["function"]["handler"] + elif "handler" in tool_config: + del tool_config["handler"] + elif "function_declarations" in tool_config: + for decl in tool_config["function_declarations"]: + if "handler" in decl: + del decl["handler"] + + async def set_node(self, node_id: str, node_config: Dict[str, Any]) -> None: + """Set up a new conversation node. + + This method: + 1. Validates the node configuration + 2. Executes any pre-actions + 3. Registers functions with the LLM + 4. Updates the LLM context + 5. Executes any post-actions + 6. Updates the flow state + + Args: + node_id: Identifier for the new node + node_config: Node configuration including: + - messages: List of messages for LLM context + - functions: List of available functions + - pre_actions: Optional actions to execute before transition + - post_actions: Optional actions to execute after transition + + Raises: + FlowError: If node setup fails + """ + if not self.initialized: + raise FlowTransitionError(f"{self.__class__.__name__} must be initialized first") + + try: + self._validate_node_config(node_id, node_config) + logger.debug(f"Setting node: {node_id}") + + if pre_actions := node_config.get("pre_actions"): + await self._execute_actions(pre_actions=pre_actions) + + tools = [] + new_functions: Set[str] = set() + + for func_config in node_config["functions"]: + # Handle Gemini's nested function declarations + if "function_declarations" in func_config: + for declaration in func_config["function_declarations"]: + name = declaration["name"] + handler = declaration.get("handler") + logger.debug(f"Processing function: {name}") + await self._register_function(name, handler, new_functions) + else: + name = self.adapter.get_function_name(func_config) + logger.debug(f"Processing function: {name}") + + handler = None + if "function" in func_config: + handler = func_config["function"].get("handler") + elif "handler" in func_config: + handler = func_config.get("handler") + + await self._register_function(name, handler, new_functions) + + # Create tool config + tool_config = copy.deepcopy(func_config) + self._remove_handlers(tool_config) + tools.append(tool_config) + + # Let adapter format tools for provider + formatted_tools = self.adapter.format_functions(tools) + + # Update LLM context + await self._update_llm_context(node_config["messages"], formatted_tools) + logger.debug("Updated LLM context") + + # Execute post-actions if any + if post_actions := node_config.get("post_actions"): + await self._execute_actions(post_actions=post_actions) + + # Update state + self.current_node = node_id + self.current_functions = new_functions + + logger.debug(f"Successfully set node: {node_id}") + + except Exception as e: + logger.error(f"Error setting node {node_id}: {str(e)}") + raise FlowError(f"Failed to set node {node_id}: {str(e)}") from e + + async def _handle_static_transition( + self, + function_name: str, + args: Dict[str, Any], + flow_manager: "FlowManager", + ) -> None: + """Handle transitions for static flows. + + In static flows, transitions occur when a function name matches + a node name in the configuration. + + Args: + function_name: Name of the called function + args: Arguments passed to the function + flow_manager: Reference to this instance + """ + if function_name in self.nodes: + logger.debug(f"Static transition to node: {function_name}") + await self.set_node(function_name, self.nodes[function_name]) + + async def _update_llm_context(self, messages: List[dict], functions: List[dict]) -> None: + """Update LLM context with new messages and functions. + + Args: + messages: New messages to add to context + functions: New functions to make available + """ + await self.task.queue_frames( + [LLMMessagesAppendFrame(messages=messages), LLMSetToolsFrame(tools=functions)] + ) + + async def _execute_actions( + self, pre_actions: Optional[List[dict]] = None, post_actions: Optional[List[dict]] = None + ) -> None: + """Execute pre and post actions. + + Args: + pre_actions: Actions to execute before context update + post_actions: Actions to execute after context update + """ + if pre_actions: + await self.action_manager.execute_actions(pre_actions) + if post_actions: + await self.action_manager.execute_actions(post_actions) + + def _validate_node_config(self, node_id: str, config: Dict[str, Any]) -> None: + """Validate node configuration structure.""" + if "messages" not in config: + raise ValueError(f"Node '{node_id}' missing required 'messages' field") + if "functions" not in config: + raise ValueError(f"Node '{node_id}' missing required 'functions' field") + + # Validate each function configuration + for func in config["functions"]: + # Get function name based on provider format + try: + name = self.adapter.get_function_name(func) + except KeyError: + raise ValueError(f"Function in node '{node_id}' missing name field") + + # Node functions (not matching node names) require handlers + if name not in self.nodes: + # Check for handler in all formats + has_handler = ( + ("function" in func and "handler" in func["function"]) # OpenAI format + or "handler" in func # Anthropic format + or ( # Gemini format + "function_declarations" in func + and func["function_declarations"] + and "handler" in func["function_declarations"][0] + ) + ) + if not has_handler: + raise ValueError(f"Node function '{name}' in node '{node_id}' missing handler") diff --git a/src/pipecat_flows/state.py b/src/pipecat_flows/state.py index d889b20..d51dcbf 100644 --- a/src/pipecat_flows/state.py +++ b/src/pipecat_flows/state.py @@ -9,7 +9,7 @@ from loguru import logger from .adapters import GeminiAdapter, create_adapter -from .config import FlowConfig, NodeConfig +from .types import FlowConfig, NodeConfig class FlowState: diff --git a/src/pipecat_flows/static.py b/src/pipecat_flows/static.py deleted file mode 100644 index 0cdec12..0000000 --- a/src/pipecat_flows/static.py +++ /dev/null @@ -1,203 +0,0 @@ -# -# Copyright (c) 2024, Daily -# -# SPDX-License-Identifier: BSD 2-Clause License -# -""" -Static Flow Manager for Pipecat Flows - -This module provides the StaticFlowManager for handling predefined conversation flows. -It's designed for cases where the entire conversation structure is known in advance -and defined through a configuration file. - -Example: - flow_config = { - "initial_node": "greeting", - "nodes": { - "greeting": { - "messages": [{ - "role": "system", - "content": "Greet the user" - }], - "functions": [{ - "type": "function", - "function": { - "name": "collect_name", - "description": "Record user's name" - } - }] - } - } - } - - flow_manager = StaticFlowManager(flow_config, task, llm) - await flow_manager.initialize(initial_messages) -""" - -from typing import Dict, List, Set - -from loguru import logger -from pipecat.frames.frames import LLMMessagesAppendFrame, LLMMessagesUpdateFrame, LLMSetToolsFrame - -from .base import BaseFlowManager -from .config import FlowConfig -from .exceptions import ( - FlowInitializationError, - FlowTransitionError, - InvalidFunctionError, -) -from .state import FlowState - - -class StaticFlowManager(BaseFlowManager): - """Manages predefined conversation flows in Pipecat applications. - - The StaticFlowManager handles conversations where the flow is defined upfront - through a configuration. It manages: - - State transitions between predefined nodes - - Function availability based on current state - - Automatic registration of edge functions - - Pre/post action execution during transitions - - The flow is defined by a configuration that specifies: - - Initial node - - Available nodes and their configurations - - Transitions between nodes via function calls - - Function Types: - - Node Functions: Operations within a state (registered directly with LLM) - - Edge Functions: Trigger transitions (registered automatically by manager) - - Example: - flow_manager = StaticFlowManager(flow_config, task, llm) - - # Register node functions - llm.register_function("collect_name", collect_name_handler) - - # Initialize flow - await flow_manager.initialize(initial_messages) - """ - - def __init__(self, flow_config: FlowConfig, task, llm, tts=None): - """Initialize the static flow manager. - - Args: - flow_config: Complete flow configuration defining all nodes - task: PipelineTask instance for queueing frames - llm: LLM service instance - tts: Optional TTS service for voice actions - """ - super().__init__(task, llm, tts) - self.flow = FlowState(flow_config, llm) - - async def initialize(self, initial_messages: List[dict]) -> None: - """Initialize the flow with starting messages and functions. - - This method: - 1. Registers edge functions with the LLM - 2. Sets up initial context with system messages - 3. Configures available functions for initial node - - Args: - initial_messages: Initial system messages for the LLM - - Raises: - FlowInitializationError: If initialization fails - """ - await super().initialize(initial_messages) - - try: - await self._register_edge_functions() - - # Queue node-specific frames individually - messages = initial_messages + self.flow.get_current_messages() - functions = self.flow.get_current_functions() - - await self.task.queue_frame(LLMMessagesUpdateFrame(messages=messages)) - await self.task.queue_frame(LLMSetToolsFrame(tools=functions)) - - self.current_node = self.flow.get_current_node() - logger.debug(f"Initialized static flow at node: {self.current_node}") - except Exception as e: - self.initialized = False - raise FlowInitializationError(f"Failed to initialize static flow: {str(e)}") from e - - async def _register_edge_functions(self) -> None: - """Register edge functions from the flow configuration. - - Edge functions are those whose names match node names in the configuration. - These functions trigger transitions between nodes. - """ - registered_handlers: Set[str] = set() - - async def handle_edge_function( - function_name: str, tool_call_id: str, arguments: Dict, llm, context, result_callback - ) -> None: - await self.handle_transition(function_name) - await result_callback("Acknowledged") - - # Get all available functions across nodes - all_functions = self.flow.get_all_available_function_names() - - for function_name in all_functions: - if function_name in registered_handlers: - continue - - # Edge functions are those that match node names - is_edge_function = function_name in self.flow.nodes - - if is_edge_function: - self.llm.register_function(function_name, handle_edge_function) - logger.debug(f"Registered edge function: {function_name}") - registered_handlers.add(function_name) - - async def handle_transition(self, function_name: str) -> None: - """Handle transitions between nodes. - - This method: - 1. Validates the transition is allowed - 2. Executes pre-actions of new node - 3. Updates LLM context - 4. Executes post-actions of new node - - Args: - function_name: Name of the edge function triggering transition - - Raises: - FlowTransitionError: If transition fails - InvalidFunctionError: If function not available in current node - """ - await self._validate_initialization() - - available_functions = self.flow.get_available_function_names() - if function_name not in available_functions: - raise InvalidFunctionError( - f"Function '{function_name}' not available in node '{self.flow.current_node}'" - ) - - try: - new_node = self.flow.transition(function_name) - if new_node is not None: - # Execute pre-actions - if pre_actions := self.flow.get_current_pre_actions(): - await self._execute_actions(pre_actions=pre_actions) - - # Queue context updates individually - await self.task.queue_frame( - LLMMessagesAppendFrame(messages=self.flow.get_current_messages()) - ) - await self.task.queue_frame( - LLMSetToolsFrame(tools=self.flow.get_current_functions()) - ) - - # Execute post-actions - if post_actions := self.flow.get_current_post_actions(): - await self._execute_actions(post_actions=post_actions) - - self.current_node = new_node - logger.debug(f"Completed transition to node {new_node}") - else: - logger.debug(f"Node function {function_name} executed without transition") - - except Exception as e: - raise FlowTransitionError(f"Failed to execute transition: {str(e)}") from e diff --git a/src/pipecat_flows/config.py b/src/pipecat_flows/types.py similarity index 94% rename from src/pipecat_flows/config.py rename to src/pipecat_flows/types.py index d4532cd..3491887 100644 --- a/src/pipecat_flows/config.py +++ b/src/pipecat_flows/types.py @@ -7,6 +7,16 @@ from typing import Any, Dict, List, Optional, TypedDict +class FlowResult(TypedDict, total=False): + """Base type for function results.""" + + status: str + error: str + + +FlowArgs = Dict[str, Any] + + class NodeConfig(TypedDict): """Configuration for a single node in the flow.