workflow_test working except for text_input node
kwindla committed Nov 2, 2024
1 parent 388b3a2 commit b20687e
Showing 3 changed files with 55 additions and 53 deletions.
11 changes: 5 additions & 6 deletions src/pipecat/workflow/workflow_mapping.py
@@ -2,16 +2,15 @@
from ..services.openai import OpenAILLMService
from ..services.deepgram import DeepgramSTTService
from ..transports.services.daily import DailyTransport
from ..processors.aggregators.openai_llm_context import OpenAILLMContext
from ..processors.frame_processor import FrameProcessor

# Map workflow types to their corresponding Python classes
WORKFLOW_MAPPING = {
"frames/audio_input": DailyTransport,
"frame_processors/speech_to_text": DeepgramSTTService,
"frame_processors/llm": OpenAILLMService,
"frame_processors/text_to_speech": CartesiaTTSService,
"frame_processors/audio_output_transport": DailyTransport,
"inputs/audio_input": DailyTransport,
"processors/speech_to_text": DeepgramSTTService,
"processors/llm": OpenAILLMService,
"processors/text_to_speech": CartesiaTTSService,
"outputs/audio_output": DailyTransport,
}


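Note: workflow_translator.py (below) imports get_processor_class from this module. Its body is outside this diff, but with the renamed keys it presumably reduces to a dictionary lookup, roughly like this sketch (assumed behavior only; the error message is illustrative):

def get_processor_class(node_type: str):
    # Assumed behavior: resolve a workflow node type string to its processor class,
    # failing loudly on types that are not registered in WORKFLOW_MAPPING.
    try:
        return WORKFLOW_MAPPING[node_type]
    except KeyError:
        raise ValueError(f"Unknown workflow node type: {node_type}")
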
21 changes: 10 additions & 11 deletions src/pipecat/workflow/workflow_test.py
@@ -5,8 +5,8 @@
from ..pipeline.runner import PipelineRunner
from ..pipeline.task import PipelineTask, PipelineParams
from .workflow_translator import translate_workflow
-from ..frames.frames import LLMMessagesFrame
-from ..transports.services.daily import DailyTransport
+from ..services.openai import OpenAIUserContextAggregator


load_dotenv(override=True)

@@ -21,7 +21,7 @@ async def main():

# Translate the workflow to a list of processors
print("Translating workflow to processors")
-    processors = translate_workflow(workflow_path)
+    processors, daily_transport = translate_workflow(workflow_path)
print(f"Processors created: {processors}")

# Create a pipeline from the processors
@@ -39,15 +39,14 @@ async def main():
runner = PipelineRunner()
print(f"Pipeline runner created: {runner}")

-    # # Add event handler
-    # daily_transport = next(p for p in processors if isinstance(p, DailyTransport))
+    user_context_aggregator = next(
+        p for p in processors if isinstance(p, OpenAIUserContextAggregator)
+    )

-    # @daily_transport.event_handler("on_first_participant_joined")
-    # async def on_first_participant_joined(transport, participant):
-    # transport.capture_participant_transcription(participant["id"])
-    # # Kick off the conversation.
-    # messages = [{"role": "system", "content": "Please introduce yourself to the user."}]
-    # await task.queue_frames([LLMMessagesFrame(messages)])
+    @daily_transport.event_handler("on_first_participant_joined")
+    async def on_first_participant_joined(transport, participant):
+        transport.capture_participant_transcription(participant["id"])
+        await task.queue_frames([user_context_aggregator.get_context_frame()])

# Run the pipeline
print("Running the pipeline")
76 changes: 40 additions & 36 deletions src/pipecat/workflow/workflow_translator.py
@@ -1,11 +1,13 @@
import json
+import os

-from typing import List, Dict, Any
+from typing import Any, Dict, List, Tuple
from .workflow_mapping import get_processor_class
from ..processors.frame_processor import FrameProcessor
from ..transports.services.daily import DailyParams
from ..processors.aggregators.openai_llm_context import OpenAILLMContext
from ..audio.vad.silero import SileroVADAnalyzer
+from ..transports.base_transport import BaseTransport


def load_workflow(file_path: str) -> Dict[str, Any]:
@@ -27,10 +29,10 @@ def create_processor(node: Dict[str, Any], next_node: Dict[str, Any] = None) ->

# Extract relevant properties for initialization
init_params = {}
if node["type"] == "frames/audio_input":
if node["type"] == "inputs/audio_input":
init_params = {
"room_url": node["properties"]["daily_url"],
"token": None,
"room_url": os.getenv("DAILY_SAMPLE_ROOM_URL"),
"token": "",
"bot_name": "PipecatBot",
"params": DailyParams(
audio_out_enabled=True,
@@ -39,24 +41,24 @@ def create_processor(node: Dict[str, Any], next_node: Dict[str, Any] = None) ->
vad_analyzer=SileroVADAnalyzer(),
),
}
elif node["type"] == "frame_processors/speech_to_text":
elif node["type"] == "processors/speech_to_text":
init_params = {
"api_key": "sample_api_key",
"api_key": os.getenv("DEEPGRAM_API_KEY"),
}
elif node["type"] == "frame_processors/text_to_speech":
elif node["type"] == "processors/text_to_speech":
init_params = {
"api_key": node["properties"]["cartesia_api_key"],
"voice_id": node["properties"]["voice"],
"model": node["properties"]["model"],
"api_key": os.getenv("CARTESIA_API_KEY"),
"voice_id": "79a125e8-cd45-4c13-8a67-188112f4dd22",
}

print(f"Initialization parameters: {init_params}")
processor = processor_class(**init_params)
print(f"Processor created: {processor}")

return processor


-def create_pipeline(workflow: Dict[str, Any]) -> List[FrameProcessor]:
+def create_pipeline(workflow: Dict[str, Any]) -> Tuple[List[FrameProcessor], BaseTransport]:
print("Creating pipeline from workflow")
nodes = {node["id"]: node for node in workflow["nodes"]}
links = workflow["links"]
@@ -74,20 +76,27 @@ def create_pipeline(workflow: Dict[str, Any]) -> List[FrameProcessor]:
for node_id, node in nodes.items():
print(f"Creating processor for node: {node_id}")

if node["type"] == "frames/audio_input":
if node["type"] == "inputs/audio_input":
daily_transport = create_processor(node)
processors[node_id] = {"processor": daily_transport, "type": node["type"]}
elif node["type"] == "frame_processors/audio_output_transport":
elif node["type"] == "outputs/audio_output":
if daily_transport is None:
raise ValueError("Audio output transport node found before audio input node")
processors[node_id] = {"processor": daily_transport, "type": node["type"]}
elif node["type"] == "frame_processors/llm":
elif node["type"] == "processors/llm":
llm_service = create_processor(node)
processors[node_id] = {"processor": llm_service, "type": node["type"]}
            context = OpenAILLMContext(
-                [{"role": "system", "content": "You are a helpful assistant."}]
+                [
+                    {
+                        "role": "system",
+                        "content": "You are a helpful assistant. Your name is Housecat. You are participating in a voice conversation. Keep your answers brief. For punctuation use only period, comma, and question mark.",
+                    },
+                    {"role": "user", "content": "Introduce yourself."},
+                ]
            )
            context_aggregator = llm_service.create_context_aggregator(context)
+            print(f"Context aggregator created: {context_aggregator}")
else:
processors[node_id] = {"processor": create_processor(node), "type": node["type"]}

@@ -97,40 +106,35 @@ def create_pipeline(workflow: Dict[str, Any]) -> List[FrameProcessor]:
source_id, _, _, target_id, _, _ = link
print(f"Processing link: {source_id} -> {target_id}")

-        if source_id not in pipeline:
-            print(f"Adding source processor: {source_id}")
-            if processors[source_id]["type"] == "frames/audio_input":
+        if processors[source_id]["processor"] not in pipeline:
+            print(f"Adding source processor: {source_id}, {processors[source_id]['processor']}")
+            if processors[source_id]["type"] == "inputs/audio_input":
pipeline.append(processors[source_id]["processor"].input())
else:
pipeline.append(processors[source_id]["processor"])

-        # Add context_aggregator.user() before LLM
-        if processors[target_id]["type"] == "frame_processors/llm" and context_aggregator:
-            pipeline.append(context_aggregator.user())
-
-        if target_id not in pipeline and target_id in processors:
-            print(f"Adding target processor: {target_id}")
-            if processors[target_id]["type"] == "frame_processors/audio_output_transport":
+        if processors[target_id]["processor"] not in pipeline and target_id in processors:
+            print(f"Adding target processor: {target_id} {processors[target_id]['processor']}")
+            if processors[target_id]["type"] == "outputs/audio_output":
pipeline.append(processors[target_id]["processor"].output())
+            elif processors[target_id]["type"] == "processors/llm":
+                print("TRYING TO LINK AGGREGATOR")
+                if context_aggregator:
+                    print("AGGREGATOR FOUND")
+                    pipeline.append(context_aggregator.user())
+                pipeline.append(processors[target_id]["processor"])
else:
pipeline.append(processors[target_id]["processor"])

-        # Add context_aggregator.assistant() after audio output transport
-        if (
-            processors[target_id]["type"] == "frame_processors/audio_output_transport"
-            and context_aggregator
-        ):
-            pipeline.append(context_aggregator.assistant())

print(f"Pipeline created with {len(pipeline)} processors")
print(f"Pipeline: {pipeline}")

-    return pipeline
+    return pipeline, daily_transport


-def translate_workflow(file_path: str) -> List[FrameProcessor]:
+def translate_workflow(file_path: str) -> Tuple[List[FrameProcessor], BaseTransport]:
print(f"Translating workflow from file: {file_path}")
workflow = load_workflow(file_path)
-    pipeline = create_pipeline(workflow)
+    pipeline, transport = create_pipeline(workflow)
print("Workflow translation completed")
-    return pipeline
+    return pipeline, transport
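
For reference, create_pipeline reads a graph export with a "nodes" list and a "links" list; each link is unpacked as (source_id, _, _, target_id, _, _), so the first and fourth entries must be node ids. Credentials now come from the DAILY_SAMPLE_ROOM_URL, DEEPGRAM_API_KEY, and CARTESIA_API_KEY environment variables rather than node properties. A minimal workflow that would exercise the new node type names could look roughly like the sketch below (node ids, link entries, and the empty properties are illustrative assumptions, not a documented schema):

# Hypothetical minimal workflow, shaped like what load_workflow() returns.
EXAMPLE_WORKFLOW = {
    "nodes": [
        {"id": 1, "type": "inputs/audio_input", "properties": {}},
        {"id": 2, "type": "processors/speech_to_text", "properties": {}},
        {"id": 3, "type": "processors/llm", "properties": {}},
        {"id": 4, "type": "processors/text_to_speech", "properties": {}},
        {"id": 5, "type": "outputs/audio_output", "properties": {}},
    ],
    # Unpacked in create_pipeline as (source_id, _, _, target_id, _, _).
    "links": [
        [1, 0, 0, 2, 0, "audio"],
        [2, 0, 0, 3, 0, "text"],
        [3, 0, 0, 4, 0, "text"],
        [4, 0, 0, 5, 0, "audio"],
    ],
}

With links in that order, the resulting processor list should be roughly [transport.input(), stt, context_aggregator.user(), llm, tts, transport.output()], which is what workflow_test.py feeds into its Pipeline and runs.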
