Skip to content

Commit

Permalink
Merge pull request #65 from pipecat-ai/mb/simplify-static-flows
Browse files Browse the repository at this point in the history
Simplify the static flows API
  • Loading branch information
markbackman authored Dec 18, 2024
2 parents fee8206 + e1e0fc6 commit 06b0109
Show file tree
Hide file tree
Showing 14 changed files with 239 additions and 237 deletions.
13 changes: 13 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,19 @@ All notable changes to **Pipecat Flows** will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Added

- New `initial_system_message` field in `FlowConfig`, which allows setting a
global system message for static flows.

### Changed

- Simplified FlowManager initialization by removing the need for manual context
setup in static flows.
- Updated static examples to use the new API.

## [0.0.9] - 2024-12-08

### Changed
Expand Down
16 changes: 14 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,20 @@ The FlowManager handles both static and dynamic flows through a unified interfac
# Define flow configuration upfront
flow_config = {
"initial_node": "greeting",
"initial_system_message": [
{
"role": "system",
"content": "You are a helpful assistant. Your responses will be converted to audio."
}
],
"nodes": {
"greeting": {
"messages": [...],
"messages": [
{
"role": "system",
"content": "Start by greeting the user and asking for their name."
}
],
"functions": [{
"type": "function",
"function": {
Expand All @@ -206,8 +217,9 @@ flow_config = {
}
}

# Initialize with static configuration
# Create and initialize the FlowManager
flow_manager = FlowManager(task, llm, tts, flow_config=flow_config)
await flow_manager.initialize()
```

#### Dynamic Flows
Expand Down
2 changes: 1 addition & 1 deletion examples/dynamic/insurance_anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ def create_initial_node() -> NodeConfig:
{
"type": "text",
"text": (
"You are an insurance agent. Ask the customer for their age. "
"Ask the customer for their age. "
"Wait for their response before calling collect_age. "
"Only call collect_age after the customer provides their age."
),
Expand Down
5 changes: 1 addition & 4 deletions examples/dynamic/insurance_gemini.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ def create_initial_node() -> NodeConfig:
"messages": [
{
"role": "system",
"content": "You are an insurance agent. Start by asking for the customer's age.",
"content": "Start by asking for the customer's age.",
}
],
"functions": [
Expand All @@ -174,9 +174,6 @@ def create_initial_node() -> NodeConfig:
},
}
],
"pre_actions": [
{"type": "tts_say", "text": "Welcome! Let's find the right insurance coverage for you."}
],
}


Expand Down
5 changes: 1 addition & 4 deletions examples/dynamic/insurance_openai.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ def create_initial_node() -> NodeConfig:
"messages": [
{
"role": "system",
"content": "You are an insurance agent. Start by asking for the customer's age.",
"content": "Start by asking for the customer's age.",
}
],
"functions": [
Expand All @@ -174,9 +174,6 @@ def create_initial_node() -> NodeConfig:
},
}
],
"pre_actions": [
{"type": "tts_say", "text": "Welcome! Let's find the right insurance coverage for you."}
],
}


Expand Down
17 changes: 8 additions & 9 deletions examples/static/food_ordering.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,12 @@ async def select_sushi_order(args: FlowArgs) -> SushiOrderResult:

flow_config: FlowConfig = {
"initial_node": "start",
"initial_system_message": [
{
"role": "system",
"content": "You are an order-taking assistant. You must ALWAYS use the available functions to progress the conversation. This is a phone conversation and your responses will be converted to audio. Keep the conversation friendly, casual, and polite. Avoid outputting special characters and emojis.",
}
],
"nodes": {
"start": {
"messages": [
Expand Down Expand Up @@ -304,15 +310,8 @@ async def main():
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")

# Create initial context
messages = [
{
"role": "system",
"content": "You are an order-taking assistant. You must ALWAYS use the available functions to progress the conversation. This is a phone conversation and your responses will be converted to audio. Keep the conversation friendly, casual, and polite. Avoid outputting special characters and emojis.",
}
]

context = OpenAILLMContext(messages, flow_config["nodes"]["start"]["functions"])
context = OpenAILLMContext()
context_aggregator = llm.create_context_aggregator(context)

# Create pipeline
Expand All @@ -337,7 +336,7 @@ async def main():
async def on_first_participant_joined(transport, participant):
await transport.capture_participant_transcription(participant["id"])
logger.debug("Initializing flow")
await flow_manager.initialize(messages)
await flow_manager.initialize()
logger.debug("Starting conversation")
await task.queue_frames([context_aggregator.user().get_context_frame()])

Expand Down
28 changes: 9 additions & 19 deletions examples/static/movie_explorer_anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,12 @@ async def get_similar_movies(args: FlowArgs) -> Union[SimilarMoviesResult, Error
# Flow configuration
flow_config: FlowConfig = {
"initial_node": "greeting",
"initial_system_message": [
{
"role": "system",
"content": "You are a friendly movie expert. Your responses will be converted to audio, so avoid special characters. Always use the available functions to progress the conversation naturally.",
}
],
"nodes": {
"greeting": {
"messages": [
Expand All @@ -308,7 +314,7 @@ async def get_similar_movies(args: FlowArgs) -> Union[SimilarMoviesResult, Error
"content": [
{
"type": "text",
"text": "You are a helpful movie expert. Start by greeting the user and asking if they'd like to know about movies currently in theaters or upcoming releases. Wait for their choice before using either get_current_movies or get_upcoming_movies.",
"text": "Start by greeting the user and asking if they'd like to know about movies currently in theaters or upcoming releases. Wait for their choice before using either get_current_movies or get_upcoming_movies.",
}
],
}
Expand Down Expand Up @@ -447,23 +453,7 @@ async def main():
api_key=os.getenv("ANTHROPIC_API_KEY"), model="claude-3-5-sonnet-latest"
)

# Get initial tools from the first node
initial_tools = flow_config["nodes"]["greeting"]["functions"]

# Create initial context
messages = [
{
"role": "system",
"content": [
{
"type": "text",
"text": "You are a friendly movie expert. Your responses will be converted to audio, so avoid special characters. Always use the available functions to progress the conversation naturally.",
}
],
}
]

context = OpenAILLMContext(messages, initial_tools)
context = OpenAILLMContext()
context_aggregator = llm.create_context_aggregator(context)

pipeline = Pipeline(
Expand All @@ -486,7 +476,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await transport.capture_participant_transcription(participant["id"])
await flow_manager.initialize(messages)
await flow_manager.initialize()
await task.queue_frames([context_aggregator.user().get_context_frame()])

runner = PipelineRunner()
Expand Down
31 changes: 9 additions & 22 deletions examples/static/movie_explorer_gemini.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,12 +298,18 @@ async def get_similar_movies(args: FlowArgs) -> Union[SimilarMoviesResult, Error
# Flow configuration
flow_config: FlowConfig = {
"initial_node": "greeting",
"initial_system_message": [
{
"role": "system",
"content": "You are a friendly movie expert. Your responses will be converted to audio, so avoid special characters. Always use the available functions to progress the conversation naturally.",
}
],
"nodes": {
"greeting": {
"messages": [
{
"role": "system",
"content": "You are a helpful movie expert. Start by greeting the user and asking if they'd like to know about movies currently in theaters or upcoming releases. Wait for their choice before using either get_current_movies or get_upcoming_movies.",
"content": "Start by greeting the user and asking if they'd like to know about movies currently in theaters or upcoming releases. Wait for their choice before using either get_current_movies or get_upcoming_movies.",
}
],
"functions": [
Expand Down Expand Up @@ -429,26 +435,7 @@ async def main():
)
llm = GoogleLLMService(api_key=os.getenv("GOOGLE_API_KEY"), model="gemini-2.0-flash-exp")

# Get initial tools
initial_tools = [
{
"function_declarations": [
# Extract each function from the first node's functions array
func["function_declarations"][0]
for func in flow_config["nodes"]["greeting"]["functions"]
]
}
]

# Create initial context
messages = [
{
"role": "system",
"content": "You are a friendly movie expert. Your responses will be converted to audio, so avoid special characters. Always use the available functions to progress the conversation naturally.",
}
]

context = OpenAILLMContext(messages, initial_tools)
context = OpenAILLMContext()
context_aggregator = llm.create_context_aggregator(context)

pipeline = Pipeline(
Expand All @@ -471,7 +458,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await transport.capture_participant_transcription(participant["id"])
await flow_manager.initialize(messages)
await flow_manager.initialize()
await task.queue_frames([context_aggregator.user().get_context_frame()])

runner = PipelineRunner()
Expand Down
23 changes: 9 additions & 14 deletions examples/static/movie_explorer_openai.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,12 +302,18 @@ async def get_similar_movies(args: FlowArgs) -> Union[SimilarMoviesResult, Error
# Flow configuration
flow_config: FlowConfig = {
"initial_node": "greeting",
"initial_system_message": [
{
"role": "system",
"content": "You are a friendly movie expert. Your responses will be converted to audio, so avoid special characters. Always use the available functions to progress the conversation naturally.",
}
],
"nodes": {
"greeting": {
"messages": [
{
"role": "system",
"content": "You are a helpful movie expert. Start by greeting the user and asking if they'd like to know about movies currently in theaters or upcoming releases. Wait for their choice before using either get_current_movies or get_upcoming_movies.",
"content": "Start by greeting the user and asking if they'd like to know about movies currently in theaters or upcoming releases. Wait for their choice before using either get_current_movies or get_upcoming_movies.",
}
],
"functions": [
Expand Down Expand Up @@ -446,18 +452,7 @@ async def main():
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")

# Get initial tools from the first node
initial_tools = flow_config["nodes"]["greeting"]["functions"]

# Create initial context
messages = [
{
"role": "system",
"content": "You are a friendly movie expert. Your responses will be converted to audio, so avoid special characters. Always use the available functions to progress the conversation naturally.",
}
]

context = OpenAILLMContext(messages, initial_tools)
context = OpenAILLMContext()
context_aggregator = llm.create_context_aggregator(context)

pipeline = Pipeline(
Expand All @@ -480,7 +475,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await transport.capture_participant_transcription(participant["id"])
await flow_manager.initialize(messages)
await flow_manager.initialize()
await task.queue_frames([context_aggregator.user().get_context_frame()])

runner = PipelineRunner()
Expand Down
21 changes: 8 additions & 13 deletions examples/static/patient_intake.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,12 @@ async def record_visit_reasons(args: FlowArgs) -> VisitReasonRecordResult:

flow_config: FlowConfig = {
"initial_node": "start",
"initial_system_message": [
{
"role": "system",
"content": "You are Jessica, an agent for Tri-County Health Services. You must ALWAYS use one of the available functions to progress the conversation. Be professional but friendly.",
}
],
"nodes": {
"start": {
"messages": [
Expand Down Expand Up @@ -441,18 +447,7 @@ async def main():
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")

# Get initial tools from the first node
initial_tools = flow_config["nodes"]["start"]["functions"]

# Create initial context
messages = [
{
"role": "system",
"content": "You are Jessica, an agent for Tri-County Health Services. You must ALWAYS use one of the available functions to progress the conversation. Be professional but friendly.",
}
]

context = OpenAILLMContext(messages, initial_tools)
context = OpenAILLMContext()
context_aggregator = llm.create_context_aggregator(context)

pipeline = Pipeline(
Expand All @@ -476,7 +471,7 @@ async def main():
async def on_first_participant_joined(transport, participant):
await transport.capture_participant_transcription(participant["id"])
# Initialize the flow processor
await flow_manager.initialize(messages)
await flow_manager.initialize()
# Kick off the conversation using the context aggregator
await task.queue_frames([context_aggregator.user().get_context_frame()])

Expand Down
21 changes: 8 additions & 13 deletions examples/static/restaurant_reservation.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ async def record_time(args: FlowArgs) -> FlowResult:

flow_config: FlowConfig = {
"initial_node": "start",
"initial_system_message": [
{
"role": "system",
"content": "You are a restaurant reservation assistant for La Maison, an upscale French restaurant. You must ALWAYS use one of the available functions to progress the conversation. This is a phone conversations and your responses will be converted to audio. Avoid outputting special characters and emojis. Be causal and friendly.",
}
],
"nodes": {
"start": {
"messages": [
Expand Down Expand Up @@ -201,18 +207,7 @@ async def main():
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")

# Get initial tools from the first node
initial_tools = flow_config["nodes"]["start"]["functions"]

# Create initial context
messages = [
{
"role": "system",
"content": "You are a restaurant reservation assistant for La Maison, an upscale French restaurant. You must ALWAYS use one of the available functions to progress the conversation. This is a phone conversations and your responses will be converted to audio. Avoid outputting special characters and emojis. Be causal and friendly.",
}
]

context = OpenAILLMContext(messages, initial_tools)
context = OpenAILLMContext()
context_aggregator = llm.create_context_aggregator(context)

pipeline = Pipeline(
Expand All @@ -236,7 +231,7 @@ async def main():
async def on_first_participant_joined(transport, participant):
await transport.capture_participant_transcription(participant["id"])
# Initialize the flow processor
await flow_manager.initialize(messages)
await flow_manager.initialize()
# Kick off the conversation using the context aggregator
await task.queue_frames([context_aggregator.user().get_context_frame()])

Expand Down
Loading

0 comments on commit 06b0109

Please sign in to comment.