diff --git a/src/pipecat/workflow/workflow_mapping.py b/src/pipecat/workflow/workflow_mapping.py index fd826a7a5..1bb37fdd5 100644 --- a/src/pipecat/workflow/workflow_mapping.py +++ b/src/pipecat/workflow/workflow_mapping.py @@ -2,16 +2,15 @@ from ..services.openai import OpenAILLMService from ..services.deepgram import DeepgramSTTService from ..transports.services.daily import DailyTransport -from ..processors.aggregators.openai_llm_context import OpenAILLMContext from ..processors.frame_processor import FrameProcessor # Map workflow types to their corresponding Python classes WORKFLOW_MAPPING = { - "frames/audio_input": DailyTransport, - "frame_processors/speech_to_text": DeepgramSTTService, - "frame_processors/llm": OpenAILLMService, - "frame_processors/text_to_speech": CartesiaTTSService, - "frame_processors/audio_output_transport": DailyTransport, + "inputs/audio_input": DailyTransport, + "processors/speech_to_text": DeepgramSTTService, + "processors/llm": OpenAILLMService, + "processors/text_to_speech": CartesiaTTSService, + "outputs/audio_output": DailyTransport, } diff --git a/src/pipecat/workflow/workflow_test.py b/src/pipecat/workflow/workflow_test.py index d109cbafe..e454f20cc 100644 --- a/src/pipecat/workflow/workflow_test.py +++ b/src/pipecat/workflow/workflow_test.py @@ -5,8 +5,8 @@ from ..pipeline.runner import PipelineRunner from ..pipeline.task import PipelineTask, PipelineParams from .workflow_translator import translate_workflow -from ..frames.frames import LLMMessagesFrame -from ..transports.services.daily import DailyTransport +from ..services.openai import OpenAIUserContextAggregator + load_dotenv(override=True) @@ -21,7 +21,7 @@ async def main(): # Translate the workflow to a list of processors print("Translating workflow to processors") - processors = translate_workflow(workflow_path) + processors, daily_transport = translate_workflow(workflow_path) print(f"Processors created: {processors}") # Create a pipeline from the processors @@ -39,15 +39,14 @@ async def main(): runner = PipelineRunner() print(f"Pipeline runner created: {runner}") - # # Add event handler - # daily_transport = next(p for p in processors if isinstance(p, DailyTransport)) + user_context_aggregator = next( + p for p in processors if isinstance(p, OpenAIUserContextAggregator) + ) - # @daily_transport.event_handler("on_first_participant_joined") - # async def on_first_participant_joined(transport, participant): - # transport.capture_participant_transcription(participant["id"]) - # # Kick off the conversation. - # messages = [{"role": "system", "content": "Please introduce yourself to the user."}] - # await task.queue_frames([LLMMessagesFrame(messages)]) + @daily_transport.event_handler("on_first_participant_joined") + async def on_first_participant_joined(transport, participant): + transport.capture_participant_transcription(participant["id"]) + await task.queue_frames([user_context_aggregator.get_context_frame()]) # Run the pipeline print("Running the pipeline") diff --git a/src/pipecat/workflow/workflow_translator.py b/src/pipecat/workflow/workflow_translator.py index 5321e24dc..58c350aba 100644 --- a/src/pipecat/workflow/workflow_translator.py +++ b/src/pipecat/workflow/workflow_translator.py @@ -1,11 +1,13 @@ import json +import os -from typing import List, Dict, Any +from typing import Any, Dict, List, Tuple from .workflow_mapping import get_processor_class from ..processors.frame_processor import FrameProcessor from ..transports.services.daily import DailyParams from ..processors.aggregators.openai_llm_context import OpenAILLMContext from ..audio.vad.silero import SileroVADAnalyzer +from ..transports.base_transport import BaseTransport def load_workflow(file_path: str) -> Dict[str, Any]: @@ -27,10 +29,10 @@ def create_processor(node: Dict[str, Any], next_node: Dict[str, Any] = None) -> # Extract relevant properties for initialization init_params = {} - if node["type"] == "frames/audio_input": + if node["type"] == "inputs/audio_input": init_params = { - "room_url": node["properties"]["daily_url"], - "token": None, + "room_url": os.getenv("DAILY_SAMPLE_ROOM_URL"), + "token": "", "bot_name": "PipecatBot", "params": DailyParams( audio_out_enabled=True, @@ -39,24 +41,24 @@ def create_processor(node: Dict[str, Any], next_node: Dict[str, Any] = None) -> vad_analyzer=SileroVADAnalyzer(), ), } - elif node["type"] == "frame_processors/speech_to_text": + elif node["type"] == "processors/speech_to_text": init_params = { - "api_key": "sample_api_key", + "api_key": os.getenv("DEEPGRAM_API_KEY"), } - elif node["type"] == "frame_processors/text_to_speech": + elif node["type"] == "processors/text_to_speech": init_params = { - "api_key": node["properties"]["cartesia_api_key"], - "voice_id": node["properties"]["voice"], - "model": node["properties"]["model"], + "api_key": os.getenv("CARTESIA_API_KEY"), + "voice_id": "79a125e8-cd45-4c13-8a67-188112f4dd22", } print(f"Initialization parameters: {init_params}") processor = processor_class(**init_params) print(f"Processor created: {processor}") + return processor -def create_pipeline(workflow: Dict[str, Any]) -> List[FrameProcessor]: +def create_pipeline(workflow: Dict[str, Any]) -> Tuple[List[FrameProcessor], BaseTransport]: print("Creating pipeline from workflow") nodes = {node["id"]: node for node in workflow["nodes"]} links = workflow["links"] @@ -74,20 +76,27 @@ def create_pipeline(workflow: Dict[str, Any]) -> List[FrameProcessor]: for node_id, node in nodes.items(): print(f"Creating processor for node: {node_id}") - if node["type"] == "frames/audio_input": + if node["type"] == "inputs/audio_input": daily_transport = create_processor(node) processors[node_id] = {"processor": daily_transport, "type": node["type"]} - elif node["type"] == "frame_processors/audio_output_transport": + elif node["type"] == "outputs/audio_output": if daily_transport is None: raise ValueError("Audio output transport node found before audio input node") processors[node_id] = {"processor": daily_transport, "type": node["type"]} - elif node["type"] == "frame_processors/llm": + elif node["type"] == "processors/llm": llm_service = create_processor(node) processors[node_id] = {"processor": llm_service, "type": node["type"]} context = OpenAILLMContext( - [{"role": "system", "content": "You are a helpful assistant."}] + [ + { + "role": "system", + "content": "You are a helpful assistant. Your name is Housecat. You are participating in a voice conversation. Keep your answers brief. For punctuation use only period, comma, and question mark.", + }, + {"role": "user", "content": "Introduce yourself."}, + ] ) context_aggregator = llm_service.create_context_aggregator(context) + print(f"Context aggregator created: {context_aggregator}") else: processors[node_id] = {"processor": create_processor(node), "type": node["type"]} @@ -97,40 +106,35 @@ def create_pipeline(workflow: Dict[str, Any]) -> List[FrameProcessor]: source_id, _, _, target_id, _, _ = link print(f"Processing link: {source_id} -> {target_id}") - if source_id not in pipeline: - print(f"Adding source processor: {source_id}") - if processors[source_id]["type"] == "frames/audio_input": + if processors[source_id]["processor"] not in pipeline: + print(f"Adding source processor: {source_id}, {processors[source_id]['processor']}") + if processors[source_id]["type"] == "inputs/audio_input": pipeline.append(processors[source_id]["processor"].input()) else: pipeline.append(processors[source_id]["processor"]) - # Add context_aggregator.user() before LLM - if processors[target_id]["type"] == "frame_processors/llm" and context_aggregator: - pipeline.append(context_aggregator.user()) - - if target_id not in pipeline and target_id in processors: - print(f"Adding target processor: {target_id}") - if processors[target_id]["type"] == "frame_processors/audio_output_transport": + if processors[target_id]["processor"] not in pipeline and target_id in processors: + print(f"Adding target processor: {target_id} {processors[target_id]['processor']}") + if processors[target_id]["type"] == "outputs/audio_output": pipeline.append(processors[target_id]["processor"].output()) + elif processors[target_id]["type"] == "processors/llm": + print("TRYING TO LINK AGGREGATOR") + if context_aggregator: + print("AGGREGATOR FOUND") + pipeline.append(context_aggregator.user()) + pipeline.append(processors[target_id]["processor"]) else: pipeline.append(processors[target_id]["processor"]) - # Add context_aggregator.assistant() after audio output transport - if ( - processors[target_id]["type"] == "frame_processors/audio_output_transport" - and context_aggregator - ): - pipeline.append(context_aggregator.assistant()) - print(f"Pipeline created with {len(pipeline)} processors") print(f"Pipeline: {pipeline}") - return pipeline + return pipeline, daily_transport -def translate_workflow(file_path: str) -> List[FrameProcessor]: +def translate_workflow(file_path: str) -> Tuple[List[FrameProcessor], BaseTransport]: print(f"Translating workflow from file: {file_path}") workflow = load_workflow(file_path) - pipeline = create_pipeline(workflow) + pipeline, transport = create_pipeline(workflow) print("Workflow translation completed") - return pipeline + return pipeline, transport