Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

workflow_test works except for the text_input node #686

Draft
wants to merge 1 commit into
base: hyypeman/load-json-workflow
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions src/pipecat/workflow/workflow_mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,15 @@
from ..services.openai import OpenAILLMService
from ..services.deepgram import DeepgramSTTService
from ..transports.services.daily import DailyTransport
from ..processors.aggregators.openai_llm_context import OpenAILLMContext
from ..processors.frame_processor import FrameProcessor

# Map workflow types to their corresponding Python classes
WORKFLOW_MAPPING = {
"frames/audio_input": DailyTransport,
"frame_processors/speech_to_text": DeepgramSTTService,
"frame_processors/llm": OpenAILLMService,
"frame_processors/text_to_speech": CartesiaTTSService,
"frame_processors/audio_output_transport": DailyTransport,
"inputs/audio_input": DailyTransport,
"processors/speech_to_text": DeepgramSTTService,
"processors/llm": OpenAILLMService,
"processors/text_to_speech": CartesiaTTSService,
"outputs/audio_output": DailyTransport,
}


Expand Down
21 changes: 10 additions & 11 deletions src/pipecat/workflow/workflow_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
from ..pipeline.runner import PipelineRunner
from ..pipeline.task import PipelineTask, PipelineParams
from .workflow_translator import translate_workflow
from ..frames.frames import LLMMessagesFrame
from ..transports.services.daily import DailyTransport
from ..services.openai import OpenAIUserContextAggregator


load_dotenv(override=True)

Expand All @@ -21,7 +21,7 @@ async def main():

# Translate the workflow to a list of processors
print("Translating workflow to processors")
processors = translate_workflow(workflow_path)
processors, daily_transport = translate_workflow(workflow_path)
print(f"Processors created: {processors}")

# Create a pipeline from the processors
Expand All @@ -39,15 +39,14 @@ async def main():
runner = PipelineRunner()
print(f"Pipeline runner created: {runner}")

# # Add event handler
# daily_transport = next(p for p in processors if isinstance(p, DailyTransport))
user_context_aggregator = next(
p for p in processors if isinstance(p, OpenAIUserContextAggregator)
)

# @daily_transport.event_handler("on_first_participant_joined")
# async def on_first_participant_joined(transport, participant):
# transport.capture_participant_transcription(participant["id"])
# # Kick off the conversation.
# messages = [{"role": "system", "content": "Please introduce yourself to the user."}]
# await task.queue_frames([LLMMessagesFrame(messages)])
@daily_transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await task.queue_frames([user_context_aggregator.get_context_frame()])

# Run the pipeline
print("Running the pipeline")
Expand Down
76 changes: 40 additions & 36 deletions src/pipecat/workflow/workflow_translator.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import json
import os

from typing import List, Dict, Any
from typing import Any, Dict, List, Tuple
from .workflow_mapping import get_processor_class
from ..processors.frame_processor import FrameProcessor
from ..transports.services.daily import DailyParams
from ..processors.aggregators.openai_llm_context import OpenAILLMContext
from ..audio.vad.silero import SileroVADAnalyzer
from ..transports.base_transport import BaseTransport


def load_workflow(file_path: str) -> Dict[str, Any]:
Expand All @@ -27,10 +29,10 @@ def create_processor(node: Dict[str, Any], next_node: Dict[str, Any] = None) ->

# Extract relevant properties for initialization
init_params = {}
if node["type"] == "frames/audio_input":
if node["type"] == "inputs/audio_input":
init_params = {
"room_url": node["properties"]["daily_url"],
"token": None,
"room_url": os.getenv("DAILY_SAMPLE_ROOM_URL"),
"token": "",
"bot_name": "PipecatBot",
"params": DailyParams(
audio_out_enabled=True,
Expand All @@ -39,24 +41,24 @@ def create_processor(node: Dict[str, Any], next_node: Dict[str, Any] = None) ->
vad_analyzer=SileroVADAnalyzer(),
),
}
elif node["type"] == "frame_processors/speech_to_text":
elif node["type"] == "processors/speech_to_text":
init_params = {
"api_key": "sample_api_key",
"api_key": os.getenv("DEEPGRAM_API_KEY"),
}
elif node["type"] == "frame_processors/text_to_speech":
elif node["type"] == "processors/text_to_speech":
init_params = {
"api_key": node["properties"]["cartesia_api_key"],
"voice_id": node["properties"]["voice"],
"model": node["properties"]["model"],
"api_key": os.getenv("CARTESIA_API_KEY"),
"voice_id": "79a125e8-cd45-4c13-8a67-188112f4dd22",
}

print(f"Initialization parameters: {init_params}")
processor = processor_class(**init_params)
print(f"Processor created: {processor}")

return processor


def create_pipeline(workflow: Dict[str, Any]) -> List[FrameProcessor]:
def create_pipeline(workflow: Dict[str, Any]) -> Tuple[List[FrameProcessor], BaseTransport]:
print("Creating pipeline from workflow")
nodes = {node["id"]: node for node in workflow["nodes"]}
links = workflow["links"]
Expand All @@ -74,20 +76,27 @@ def create_pipeline(workflow: Dict[str, Any]) -> List[FrameProcessor]:
for node_id, node in nodes.items():
print(f"Creating processor for node: {node_id}")

if node["type"] == "frames/audio_input":
if node["type"] == "inputs/audio_input":
daily_transport = create_processor(node)
processors[node_id] = {"processor": daily_transport, "type": node["type"]}
elif node["type"] == "frame_processors/audio_output_transport":
elif node["type"] == "outputs/audio_output":
if daily_transport is None:
raise ValueError("Audio output transport node found before audio input node")
processors[node_id] = {"processor": daily_transport, "type": node["type"]}
elif node["type"] == "frame_processors/llm":
elif node["type"] == "processors/llm":
llm_service = create_processor(node)
processors[node_id] = {"processor": llm_service, "type": node["type"]}
context = OpenAILLMContext(
[{"role": "system", "content": "You are a helpful assistant."}]
[
{
"role": "system",
"content": "You are a helpful assistant. Your name is Housecat. You are participating in a voice conversation. Keep your answers brief. For punctuation use only period, comma, and question mark.",
},
{"role": "user", "content": "Introduce yourself."},
]
)
context_aggregator = llm_service.create_context_aggregator(context)
print(f"Context aggregator created: {context_aggregator}")
else:
processors[node_id] = {"processor": create_processor(node), "type": node["type"]}

Expand All @@ -97,40 +106,35 @@ def create_pipeline(workflow: Dict[str, Any]) -> List[FrameProcessor]:
source_id, _, _, target_id, _, _ = link
print(f"Processing link: {source_id} -> {target_id}")

if source_id not in pipeline:
print(f"Adding source processor: {source_id}")
if processors[source_id]["type"] == "frames/audio_input":
if processors[source_id]["processor"] not in pipeline:
print(f"Adding source processor: {source_id}, {processors[source_id]['processor']}")
if processors[source_id]["type"] == "inputs/audio_input":
pipeline.append(processors[source_id]["processor"].input())
else:
pipeline.append(processors[source_id]["processor"])

# Add context_aggregator.user() before LLM
if processors[target_id]["type"] == "frame_processors/llm" and context_aggregator:
pipeline.append(context_aggregator.user())

if target_id not in pipeline and target_id in processors:
print(f"Adding target processor: {target_id}")
if processors[target_id]["type"] == "frame_processors/audio_output_transport":
if processors[target_id]["processor"] not in pipeline and target_id in processors:
print(f"Adding target processor: {target_id} {processors[target_id]['processor']}")
if processors[target_id]["type"] == "outputs/audio_output":
pipeline.append(processors[target_id]["processor"].output())
elif processors[target_id]["type"] == "processors/llm":
print("TRYING TO LINK AGGREGATOR")
if context_aggregator:
print("AGGREGATOR FOUND")
pipeline.append(context_aggregator.user())
pipeline.append(processors[target_id]["processor"])
else:
pipeline.append(processors[target_id]["processor"])

# Add context_aggregator.assistant() after audio output transport
if (
processors[target_id]["type"] == "frame_processors/audio_output_transport"
and context_aggregator
):
pipeline.append(context_aggregator.assistant())

print(f"Pipeline created with {len(pipeline)} processors")
print(f"Pipeline: {pipeline}")

return pipeline
return pipeline, daily_transport


def translate_workflow(file_path: str) -> List[FrameProcessor]:
def translate_workflow(file_path: str) -> Tuple[List[FrameProcessor], BaseTransport]:
print(f"Translating workflow from file: {file_path}")
workflow = load_workflow(file_path)
pipeline = create_pipeline(workflow)
pipeline, transport = create_pipeline(workflow)
print("Workflow translation completed")
return pipeline
return pipeline, transport
Loading