Skip to content

Commit

Permalink
Refactor into a single FlowManager
Browse files Browse the repository at this point in the history
  • Loading branch information
markbackman committed Dec 2, 2024
1 parent 17375e6 commit 781f454
Show file tree
Hide file tree
Showing 22 changed files with 944 additions and 2,022 deletions.
292 changes: 161 additions & 131 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,23 @@

[![PyPI](https://img.shields.io/pypi/v/pipecat-ai-flows)](https://pypi.org/project/pipecat-ai-flows) [![Discord](https://img.shields.io/discord/1239284677165056021)](https://discord.gg/pipecat)

Pipecat Flows provides a framework for building structured conversations in your AI applications. It is comprised of:
# Pipecat Flows

- A [python module](#pipecat-flows-package) for building conversation flows with Pipecat
- A [visual editor](#pipecat-flows-editor) for visualizing conversations and exporting into flow_configs
## Overview

The framework offers two approaches to managing conversation flows:
Pipecat Flows provides a framework for building structured conversations in your AI applications. It enables you to create both predefined conversation paths and dynamically generated flows while handling the complexities of state management and LLM interactions.

1. **Static Flows**: Configuration-driven conversations with predefined paths. Ideal for flows where the entire conversation structure can be defined upfront, from simple scripts to complex decision trees.
The framework consists of:

2. **Dynamic Flows**: Runtime-determined conversations where paths are created or modified during execution. Perfect for scenarios where flow structure depends on external data, business logic, or needs to adapt during the conversation.
- A Python module for building conversation flows with Pipecat
- A visual editor for designing and exporting flow configurations

To learn more about building with Pipecat Flows, [check out the guide](https://docs.pipecat.ai/guides/pipecat-flows).
### When to Use Pipecat Flows

## Pipecat Flows Package
- **Static Flows**: When your conversation structure is known upfront and follows predefined paths. Perfect for customer service scripts, intake forms, or guided experiences.
- **Dynamic Flows**: When conversation paths need to be determined at runtime based on user input, external data, or business logic. Ideal for personalized experiences or complex decision trees.

A Python package for managing conversation flows in Pipecat applications.

### Installation
## Installation

If you're already using Pipecat:

Expand All @@ -35,167 +34,197 @@ If you're starting fresh:
# Basic installation
pip install pipecat-ai-flows

# Install Pipecat with required options
# For example, to use Daily, OpenAI, and Deepgram:
pip install "pipecat-ai[daily, openai,deepgram]"
# Install Pipecat with specific LLM provider options:
pip install "pipecat-ai[daily,openai,deepgram]" # For OpenAI
pip install "pipecat-ai[daily,anthropic,deepgram]" # For Anthropic
pip install "pipecat-ai[daily,google,deepgram]" # For Google
```

Learn more about the available options with [Pipecat](https://github.com/pipecat-ai/pipecat).

## Static Flows

Static flows use a JSON configuration to define the complete conversation structure upfront.

### Configuration
## Quick Start

Each node in your flow consists of:

- Messages that set context for the LLM
- Available functions for that state
- Optional pre/post actions

### Basic Usage
Here's a basic example of setting up a conversation flow:

```python
from pipecat_flows import StaticFlowManager # When developing with the repository
# or
from pipecat.flows import StaticFlowManager # When installed via pip

# Initialize context and tools
initial_tools = flow_config["nodes"]["start"]["functions"] # Available functions for starting state
context = OpenAILLMContext(messages, initial_tools) # Create LLM context with initial state
context_aggregator = llm.create_context_aggregator(context)

# Create your pipeline: No new processors are required
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
from pipecat_flows import FlowManager

# Create the Pipecat task
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
# Initialize flow manager with static configuration
flow_manager = FlowManager(task, llm, tts, flow_config=flow_config)

# Initialize flow management
flow_manager = StaticFlowManager(flow_config, task, llm, tts) # Create flow manager
# Or with dynamic flow handling
flow_manager = FlowManager(
task,
llm,
tts,
transition_callback=handle_transitions
)

# Initialize with starting messages
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await transport.capture_participant_transcription(participant["id"])
# Initialize the flow processor
await flow_manager.initialize(messages)
# Kick off the conversation using the context aggregator
await task.queue_frames([context_aggregator.user().get_context_frame()])
```

## Dynamic Flows
For more detailed examples and guides, visit our [documentation](https://docs.pipecat.ai/guides/pipecat-flows).

## Core Concepts

### Flow Configuration

Each conversation flow consists of nodes that define the conversation structure. A node includes:

Dynamic flows allow for runtime creation and modification of conversation paths based on data or business logic.
#### Messages

### Configuration
Messages set the context for the LLM at each state:

Each node consists of the same components as static flows:
```python
"messages": [
{
"role": "system",
"content": "You are handling pizza orders. Ask for size selection."
}
]
```

- Messages that set context for the LLM
- Available functions for that state
- Optional pre/post actions
#### Functions

The key difference is that nodes are created programmatically rather than defined in a JSON configuration.
Functions come in two types:

### Basic Usage
1. **Node Functions**: Execute operations within the current state

```python
from pipecat_flows import DynamicFlowManager # When developing with the repository
# or
from pipecat.flows import DynamicFlowManager # When installed via pip
{
"type": "function",
"function": {
"name": "select_size",
"handler": select_size_handler, # Required for node functions
"description": "Select pizza size",
"parameters": {
"type": "object",
"properties": {
"size": {"type": "string", "enum": ["small", "medium", "large"]}
}
}
}
}
```

# Define your transition callback
async def handle_transitions(function_name: str, args: Dict[str, Any], flow_manager):
if function_name == "collect_age":
# Create next node based on age
if args["age"] < 25:
await flow_manager.set_node("young_adult", create_young_adult_node())
else:
await flow_manager.set_node("standard", create_standard_node())

# Initialize context and tools
context = OpenAILLMContext(messages, []) # Start with empty tools
context_aggregator = llm.create_context_aggregator(context)

# Create your pipeline: No new processors are required
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
2. **Edge Functions**: Create transitions between states

# Create the Pipecat task
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
```python
{
"type": "function",
"function": {
"name": "next_node", # Must match a node name
"description": "Move to next state",
"parameters": {"type": "object", "properties": {}}
}
}
```

# Initialize flow management
flow_manager = DynamicFlowManager(
task,
llm,
tts,
transition_callback=handle_transitions
) # Create flow manager
#### Actions

# Initialize with starting messages
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await transport.capture_participant_transcription(participant["id"])
# Register function handlers
await flow_manager.register_functions({
"collect_age": collect_age_handler,
"process_data": process_data_handler
})
# Initialize the flow processor
await flow_manager.initialize(messages)
# Set initial node
await flow_manager.set_node("initial", create_initial_node())
# Kick off the conversation using the context aggregator
await task.queue_frames([context_aggregator.user().get_context_frame()])
Actions execute during state transitions:

```python
"pre_actions": [
{
"type": "tts_say",
"text": "Processing your order..."
}
]
```

The key differences from static flows are:
#### Provider-Specific Formats

Pipecat Flows automatically handles format differences between LLM providers:

**OpenAI Format**

```python
"functions": [{
"type": "function",
"function": {
"name": "function_name",
"description": "description",
"parameters": {...}
}
}]
```

1. The transition callback that determines flow progression
2. Function registration through `register_functions()`
3. Node creation and setting through `set_node()`
4. No upfront flow configuration required
**Anthropic Format**

```python
"functions": [{
"name": "function_name",
"description": "description",
"input_schema": {...}
}]
```

**Google (Gemini) Format**

```python
"functions": [{
"function_declarations": [{
"name": "function_name",
"description": "description",
"parameters": {...}
}]
}]
```

### Running Examples
### Flow Management

The FlowManager handles both static and dynamic flows through a unified interface:

#### Static Flows

```python
# Define flow configuration upfront
flow_config = {
"initial_node": "greeting",
"nodes": {
"greeting": {
"messages": [...],
"functions": [...]
}
}
}

# Initialize with static configuration
flow_manager = FlowManager(task, llm, tts, flow_config=flow_config)
```

#### Dynamic Flows

```python
# Define transition handling
async def handle_transitions(function_name: str, args: Dict, flow_manager):
if function_name == "collect_age":
await flow_manager.set_node("next_step", create_next_node())

# Initialize with transition callback
flow_manager = FlowManager(task, llm, tts, transition_callback=handle_transitions)
```

## Examples

The repository includes several complete example implementations in the `examples/` directory.

#### Static
### Static

In the `examples/static` directory, you'll find these examples:

- `food_ordering.py` - A restaurant order flow demonstrating node and edge functions
- `movie_booking.py` - A movie ticket booking system with date-based branching
- `movie_explorer_openai.py` - Movie information bot demonstrating real API integration with TMDB
- `movie_explorer_anthropic.py` - The same movie information demo adapted for Anthropic's format
- `movie_explorer_gemini.py` - The same movie explorer demo adapted for Google Gemini's format
- `patient_intake.py` - A medical intake system showing complex state management
- `restaurant_reservation.py` - A reservation system with availability checking
- `travel_planner_openai.py` - A vacation planning assistant with parallel paths
- `travel_planner_gemini.py` - The same vacation planning assistant adapted for Google Gemini's format
- `travel_planner.py` - A vacation planning assistant with parallel paths

#### Dynamic
### Dynamic

In the `examples/dynamic` directory, you'll find these examples:

Expand Down Expand Up @@ -250,6 +279,7 @@ To run these examples:
- DEEPGRAM_API_KEY
- OPENAI_API_KEY
- ANTHROPIC_API_KEY
- GOOGLE_API_KEY
- DAILY_API_KEY

Looking for a Daily API key and room URL? Sign up on the [Daily Dashboard](https://dashboard.daily.co).
Expand All @@ -259,11 +289,11 @@ To run these examples:
python examples/static/food_ordering.py -u YOUR_DAILY_ROOM_URL
```

### Running Tests
## Tests

The package includes a comprehensive test suite covering the core functionality.

#### Setup Test Environment
### Setup Test Environment

1. **Create Virtual Environment**:

Expand All @@ -279,7 +309,7 @@ The package includes a comprehensive test suite covering the core functionality.
pip install -e .
```

#### Running Tests
### Running Tests

Run all tests:

Expand Down
Loading

0 comments on commit 781f454

Please sign in to comment.