-
Notifications
You must be signed in to change notification settings - Fork 435
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
6 changed files
with
223 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
*.json |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Run `python -m pipecat.workflow.workflow_test` to execute the workflow test. |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
from ..services.cartesia import CartesiaTTSService | ||
from ..services.openai import OpenAILLMService | ||
from ..services.deepgram import DeepgramSTTService | ||
from ..transports.services.daily import DailyTransport | ||
from ..processors.aggregators.openai_llm_context import OpenAILLMContext | ||
from ..processors.frame_processor import FrameProcessor | ||
|
||
# Registry mapping workflow node type strings to the pipecat processor
# classes that implement them.
WORKFLOW_MAPPING = {
    "frames/audio_input": DailyTransport,
    "frame_processors/speech_to_text": DeepgramSTTService,
    "frame_processors/llm": OpenAILLMService,
    "frame_processors/text_to_speech": CartesiaTTSService,
    "frame_processors/audio_output_transport": DailyTransport,
}


def get_processor_class(node_type: str) -> type[FrameProcessor]:
    """Resolve a workflow node type string to its processor class.

    Unknown node types fall back to the ``FrameProcessor`` base class.
    """
    try:
        return WORKFLOW_MAPPING[node_type]
    except KeyError:
        return FrameProcessor
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
import asyncio | ||
import os | ||
from dotenv import load_dotenv | ||
from ..pipeline.pipeline import Pipeline | ||
from ..pipeline.runner import PipelineRunner | ||
from ..pipeline.task import PipelineTask, PipelineParams | ||
from .workflow_translator import translate_workflow | ||
from ..frames.frames import LLMMessagesFrame | ||
from ..transports.services.daily import DailyTransport | ||
|
||
load_dotenv(override=True) | ||
|
||
|
||
async def main():
    """Translate workflow.json into a pipeline and run it end to end."""
    print("Starting workflow test")

    # workflow.json lives next to this module, so resolve it relative to
    # __file__ instead of the current working directory.
    module_dir = os.path.dirname(os.path.abspath(__file__))
    workflow_file = os.path.join(module_dir, "workflow.json")
    print(f"Workflow path: {workflow_file}")

    # Turn the declarative workflow into concrete frame processors.
    print("Translating workflow to processors")
    processors = translate_workflow(workflow_file)
    print(f"Processors created: {processors}")

    # Wire the processors into a pipeline.
    print("Creating pipeline")
    pipeline = Pipeline(processors)
    print(f"Pipeline created: {pipeline}")

    # Wrap the pipeline in a schedulable task; interruptions allowed.
    print("Creating pipeline task")
    task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
    print(f"Pipeline task created: {task}")

    # The runner drives the task on the asyncio event loop.
    print("Creating pipeline runner")
    runner = PipelineRunner()
    print(f"Pipeline runner created: {runner}")

    # Run to completion; errors are reported but deliberately not re-raised
    # so the final status line below is always printed.
    print("Running the pipeline")
    try:
        await runner.run(task)
    except Exception as e:
        print(f"Error during pipeline execution: {e}")
    else:
        print("Pipeline execution completed successfully")

    print("Workflow test completed")
|
||
|
||
# Script entry point: run the async workflow test to completion on a fresh
# event loop.
if __name__ == "__main__":
    print("Starting main execution")
    asyncio.run(main())
    print("Main execution completed")
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,136 @@ | ||
import json | ||
|
||
from typing import List, Dict, Any | ||
from .workflow_mapping import get_processor_class | ||
from ..processors.frame_processor import FrameProcessor | ||
from ..transports.services.daily import DailyParams | ||
from ..processors.aggregators.openai_llm_context import OpenAILLMContext | ||
from ..audio.vad.silero import SileroVADAnalyzer | ||
|
||
|
||
def load_workflow(file_path: str) -> Dict[str, Any]:
    """Load a workflow definition from a JSON file.

    Args:
        file_path: Path to the workflow JSON file.

    Returns:
        The parsed workflow as a dictionary.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file contents are not valid JSON.
    """
    print(f"Loading workflow from file: {file_path}")
    try:
        # JSON is UTF-8 by specification (RFC 8259); pass the encoding
        # explicitly rather than relying on the platform locale default.
        with open(file_path, "r", encoding="utf-8") as f:
            workflow = json.load(f)
        print(f"Workflow loaded successfully: {workflow}")
        return workflow
    except Exception as e:
        # Report the failure for the print-based tracing used throughout
        # this module, then propagate to the caller.
        print(f"Error loading workflow: {e}")
        raise
|
||
|
||
def create_processor(node: Dict[str, Any], next_node: Dict[str, Any] = None) -> FrameProcessor:
    """Instantiate the pipecat processor for a single workflow node.

    Args:
        node: Workflow node definition; must contain "id" and "type", and
            for some types a "properties" mapping with service settings.
        next_node: Currently unused; kept for backward compatibility with
            callers that pass the downstream node.

    Returns:
        A configured processor instance for the node. Unknown node types
        get the class returned by ``get_processor_class`` with no arguments.
    """
    # Local import keeps this module's top-level import block unchanged.
    import os

    print(f"Creating processor for node: {node['id']} of type: {node['type']}")
    processor_class = get_processor_class(node["type"])
    print(f"Processor class: {processor_class}")

    # Extract relevant properties for initialization
    init_params: Dict[str, Any] = {}
    if node["type"] == "frames/audio_input":
        init_params = {
            "room_url": node["properties"]["daily_url"],
            "token": None,
            "bot_name": "PipecatBot",
            "params": DailyParams(
                audio_out_enabled=True,
                vad_enabled=True,
                vad_audio_passthrough=True,
                vad_analyzer=SileroVADAnalyzer(),
            ),
        }
    elif node["type"] == "frame_processors/speech_to_text":
        init_params = {
            # Was hard-coded to the "sample_api_key" placeholder, so a real
            # Deepgram key could never be used. Prefer the environment
            # (populated by dotenv in workflow_test) and keep the placeholder
            # as a backward-compatible fallback.
            "api_key": os.environ.get("DEEPGRAM_API_KEY", "sample_api_key"),
        }
    elif node["type"] == "frame_processors/text_to_speech":
        init_params = {
            "api_key": node["properties"]["cartesia_api_key"],
            "voice_id": node["properties"]["voice"],
            "model": node["properties"]["model"],
        }

    print(f"Initialization parameters: {init_params}")
    processor = processor_class(**init_params)
    print(f"Processor created: {processor}")
    return processor
|
||
|
||
def create_pipeline(workflow: Dict[str, Any]) -> List[FrameProcessor]:
    """Build an ordered list of frame processors from a workflow graph.

    Args:
        workflow: Parsed workflow with "nodes" (each having "id" and "type")
            and "links" (sequences whose 1st and 4th entries are the source
            and target node ids — assumed; TODO confirm against the
            workflow.json schema).

    Returns:
        Processors ordered by link traversal, with the LLM context
        aggregators spliced in around the LLM and the audio output.

    Raises:
        ValueError: If an audio output node appears before the audio input
            node that provides the shared Daily transport.
    """
    print("Creating pipeline from workflow")
    nodes = {node["id"]: node for node in workflow["nodes"]}
    links = workflow["links"]

    print(f"Nodes: {nodes}")
    print(f"Links: {links}")

    # Create a dictionary to store processors, keyed by node id.
    processors = {}
    daily_transport = None
    llm_service = None
    context_aggregator = None

    # Create processors for each node.
    for node_id, node in nodes.items():
        print(f"Creating processor for node: {node_id}")

        if node["type"] == "frames/audio_input":
            daily_transport = create_processor(node)
            processors[node_id] = {"processor": daily_transport, "type": node["type"]}
        elif node["type"] == "frame_processors/audio_output_transport":
            # Output reuses the same Daily transport instance as the input.
            if daily_transport is None:
                raise ValueError("Audio output transport node found before audio input node")
            processors[node_id] = {"processor": daily_transport, "type": node["type"]}
        elif node["type"] == "frame_processors/llm":
            llm_service = create_processor(node)
            processors[node_id] = {"processor": llm_service, "type": node["type"]}
            context = OpenAILLMContext(
                [{"role": "system", "content": "You are a helpful assistant."}]
            )
            context_aggregator = llm_service.create_context_aggregator(context)
        else:
            processors[node_id] = {"processor": create_processor(node), "type": node["type"]}

    # Create the pipeline based on the links.
    pipeline = []
    # BUG FIX: the previous code tested `source_id not in pipeline`, but
    # `pipeline` holds processor *instances*, never node ids, so the check was
    # always true and any node involved in more than one link was appended
    # repeatedly (the context aggregators likewise, once per matching link).
    # Track added node ids explicitly and insert each aggregator only once.
    added_ids = set()
    user_aggregator_added = False
    assistant_aggregator_added = False
    for link in links:
        source_id, _, _, target_id, _, _ = link
        print(f"Processing link: {source_id} -> {target_id}")

        if source_id not in added_ids and source_id in processors:
            print(f"Adding source processor: {source_id}")
            if processors[source_id]["type"] == "frames/audio_input":
                pipeline.append(processors[source_id]["processor"].input())
            else:
                pipeline.append(processors[source_id]["processor"])
            added_ids.add(source_id)

        # Guarded lookup: the original indexed processors[target_id] here
        # before checking membership, raising KeyError on a dangling link.
        target_type = processors.get(target_id, {}).get("type")

        # Add context_aggregator.user() before the LLM (once).
        if target_type == "frame_processors/llm" and context_aggregator and not user_aggregator_added:
            pipeline.append(context_aggregator.user())
            user_aggregator_added = True

        if target_id not in added_ids and target_id in processors:
            print(f"Adding target processor: {target_id}")
            if target_type == "frame_processors/audio_output_transport":
                pipeline.append(processors[target_id]["processor"].output())
            else:
                pipeline.append(processors[target_id]["processor"])
            added_ids.add(target_id)

        # Add context_aggregator.assistant() after the audio output (once).
        if (
            target_type == "frame_processors/audio_output_transport"
            and context_aggregator
            and not assistant_aggregator_added
        ):
            pipeline.append(context_aggregator.assistant())
            assistant_aggregator_added = True

    print(f"Pipeline created with {len(pipeline)} processors")
    print(f"Pipeline: {pipeline}")

    return pipeline
|
||
|
||
def translate_workflow(file_path: str) -> List[FrameProcessor]:
    """Read a workflow JSON file and convert it into a processor list."""
    print(f"Translating workflow from file: {file_path}")
    processors = create_pipeline(load_workflow(file_path))
    print("Workflow translation completed")
    return processors