From 55879bf36561052eccbb0d9447c2dabfc01ff5f0 Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Fri, 13 Dec 2024 15:38:59 -0500 Subject: [PATCH 01/10] Add TranscriptionProcessor --- .../07a-interruptible-anthropic.py | 42 ++++- .../28a-transcription-update-openai.py | 128 +++++++++++++++ .../28b-transcription-update-anthropic.py | 128 +++++++++++++++ .../28c-transcription-update-gemini.py | 138 ++++++++++++++++ src/pipecat/frames/frames.py | 36 ++++- .../processors/transcript_processor.py | 150 ++++++++++++++++++ 6 files changed, 613 insertions(+), 9 deletions(-) create mode 100644 examples/foundational/28a-transcription-update-openai.py create mode 100644 examples/foundational/28b-transcription-update-anthropic.py create mode 100644 examples/foundational/28c-transcription-update-gemini.py create mode 100644 src/pipecat/processors/transcript_processor.py diff --git a/examples/foundational/07a-interruptible-anthropic.py b/examples/foundational/07a-interruptible-anthropic.py index e7e680eab..25a301269 100644 --- a/examples/foundational/07a-interruptible-anthropic.py +++ b/examples/foundational/07a-interruptible-anthropic.py @@ -7,6 +7,7 @@ import asyncio import os import sys +from typing import List import aiohttp from dotenv import load_dotenv @@ -14,12 +15,13 @@ from runner import configure from pipecat.audio.vad.silero import SileroVADAnalyzer -from pipecat.frames.frames import LLMMessagesFrame +from pipecat.frames.frames import Frame, LLMMessagesFrame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext -from pipecat.services.anthropic import AnthropicLLMService +from pipecat.processors.frame_processor import FrameDirection +from pipecat.services.anthropic import AnthropicLLMContext, AnthropicLLMService from pipecat.services.cartesia import CartesiaTTSService from pipecat.transports.services.daily import DailyParams, DailyTransport @@ -29,6 +31,28 @@ logger.add(sys.stderr, level="DEBUG") +class TestAnthropicLLMService(AnthropicLLMService): + async def process_frame(self, frame: Frame, direction: FrameDirection): + if isinstance(frame, LLMMessagesFrame): + logger.info("Original OpenAI format messages:") + logger.info(frame.messages) + + # Convert to Anthropic format + context = AnthropicLLMContext.from_messages(frame.messages) + logger.info("Converted to Anthropic format:") + logger.info(context.messages) + + # Convert back to OpenAI format + openai_messages = [] + for msg in context.messages: + converted = context.to_standard_messages(msg) + openai_messages.extend(converted) + logger.info("Converted back to OpenAI format:") + logger.info(openai_messages) + + await super().process_frame(frame, direction) + + async def main(): async with aiohttp.ClientSession() as session: (room_url, token) = await configure(session) @@ -50,18 +74,24 @@ async def main(): voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady ) - llm = AnthropicLLMService( + llm = TestAnthropicLLMService( api_key=os.getenv("ANTHROPIC_API_KEY"), model="claude-3-opus-20240229" ) - # todo: think more about how to handle system prompts in a more general way. OpenAI, - # Google, and Anthropic all have slightly different approaches to providing a system - # prompt. + # Test messages including various formats messages = [ { "role": "system", "content": "You are a helpful LLM in a WebRTC call. 
Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative, helpful, and brief way. Say hello.", }, + { + "role": "assistant", + "content": [ + {"type": "text", "text": "Hello! How can I help you today?"}, + {"type": "text", "text": "I'm ready to assist."}, + ], + }, + {"role": "user", "content": "Hi there!"}, ] context = OpenAILLMContext(messages) diff --git a/examples/foundational/28a-transcription-update-openai.py b/examples/foundational/28a-transcription-update-openai.py new file mode 100644 index 000000000..ec103ff82 --- /dev/null +++ b/examples/foundational/28a-transcription-update-openai.py @@ -0,0 +1,128 @@ +# +# Copyright (c) 2024, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import asyncio +import os +import sys +from typing import List + +import aiohttp +from dotenv import load_dotenv +from loguru import logger +from runner import configure + +from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.frames.frames import LLMMessagesFrame, TranscriptionMessage, TranscriptionUpdateFrame +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext +from pipecat.processors.transcript_processor import TranscriptProcessor +from pipecat.services.cartesia import CartesiaTTSService +from pipecat.services.openai import OpenAILLMService +from pipecat.transports.services.daily import DailyParams, DailyTransport + +load_dotenv(override=True) + +logger.remove(0) +logger.add(sys.stderr, level="DEBUG") + + +class TranscriptHandler: + """Simple handler to demonstrate transcript processing.""" + + def __init__(self): + self.messages: List[TranscriptionMessage] = [] + + async def on_transcript_update( + self, processor: TranscriptProcessor, frame: TranscriptionUpdateFrame + ): + """Handle new transcript messages.""" + self.messages.extend(frame.messages) + + # Log the new messages + logger.info("New transcript messages:") + for msg in frame.messages: + logger.info(f"{msg.role}: {msg.content}") + + # Log the full transcript + logger.info("Full transcript:") + for msg in self.messages: + logger.info(f"{msg.role}: {msg.content}") + + +async def main(): + async with aiohttp.ClientSession() as session: + (room_url, token) = await configure(session) + + transport = DailyTransport( + room_url, + token, + "Respond bot", + DailyParams( + audio_out_enabled=True, + transcription_enabled=True, + vad_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + ), + ) + + tts = CartesiaTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady + ) + + llm = OpenAILLMService( + api_key=os.getenv("OPENAI_API_KEY"), + model="gpt-4o", + ) + + messages = [ + { + "role": "system", + "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative, helpful, and brief way. 
Say hello.", + }, + ] + + context = OpenAILLMContext(messages) + context_aggregator = llm.create_context_aggregator(context) + + # Create transcript processor and handler + transcript_processor = TranscriptProcessor() + transcript_handler = TranscriptHandler() + + # Register event handler for transcript updates + @transcript_processor.event_handler("on_transcript_update") + async def on_transcript_update(processor, frame): + await transcript_handler.on_transcript_update(processor, frame) + + pipeline = Pipeline( + [ + transport.input(), # Transport user input + context_aggregator.user(), # User responses + llm, # LLM + tts, # TTS + transport.output(), # Transport bot output + context_aggregator.assistant(), # Assistant spoken responses + transcript_processor, # Process transcripts + ] + ) + + task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True)) + + @transport.event_handler("on_first_participant_joined") + async def on_first_participant_joined(transport, participant): + await transport.capture_participant_transcription(participant["id"]) + # Kick off the conversation. + await task.queue_frames([LLMMessagesFrame(messages)]) + + runner = PipelineRunner() + + await runner.run(task) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/foundational/28b-transcription-update-anthropic.py b/examples/foundational/28b-transcription-update-anthropic.py new file mode 100644 index 000000000..23ee93a21 --- /dev/null +++ b/examples/foundational/28b-transcription-update-anthropic.py @@ -0,0 +1,128 @@ +# +# Copyright (c) 2024, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import asyncio +import os +import sys +from typing import List + +import aiohttp +from dotenv import load_dotenv +from loguru import logger +from runner import configure + +from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.frames.frames import LLMMessagesFrame, TranscriptionMessage, TranscriptionUpdateFrame +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext +from pipecat.processors.transcript_processor import TranscriptProcessor +from pipecat.services.anthropic import AnthropicLLMService +from pipecat.services.cartesia import CartesiaTTSService +from pipecat.transports.services.daily import DailyParams, DailyTransport + +load_dotenv(override=True) + +logger.remove(0) +logger.add(sys.stderr, level="DEBUG") + + +class TranscriptHandler: + """Simple handler to demonstrate transcript processing.""" + + def __init__(self): + self.messages: List[TranscriptionMessage] = [] + + async def on_transcript_update( + self, processor: TranscriptProcessor, frame: TranscriptionUpdateFrame + ): + """Handle new transcript messages.""" + self.messages.extend(frame.messages) + + # Log the new messages + logger.info("New transcript messages:") + for msg in frame.messages: + logger.info(f"{msg.role}: {msg.content}") + + # Log the full transcript + logger.info("Full transcript:") + for msg in self.messages: + logger.info(f"{msg.role}: {msg.content}") + + +async def main(): + async with aiohttp.ClientSession() as session: + (room_url, token) = await configure(session) + + transport = DailyTransport( + room_url, + token, + "Respond bot", + DailyParams( + audio_out_enabled=True, + transcription_enabled=True, + vad_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + ), + ) + + tts = 
CartesiaTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady + ) + + llm = AnthropicLLMService( + api_key=os.getenv("ANTHROPIC_API_KEY"), model="claude-3-5-sonnet-20241022" + ) + + messages = [ + { + "role": "system", + "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative, helpful, and brief way.", + }, + {"role": "user", "content": "Say hello."}, + ] + + context = OpenAILLMContext(messages) + context_aggregator = llm.create_context_aggregator(context) + + # Create transcript processor and handler + transcript_processor = TranscriptProcessor() + transcript_handler = TranscriptHandler() + + # Register event handler for transcript updates + @transcript_processor.event_handler("on_transcript_update") + async def on_transcript_update(processor, frame): + await transcript_handler.on_transcript_update(processor, frame) + + pipeline = Pipeline( + [ + transport.input(), # Transport user input + context_aggregator.user(), # User responses + llm, # LLM + tts, # TTS + transport.output(), # Transport bot output + context_aggregator.assistant(), # Assistant spoken responses + transcript_processor, # Process transcripts + ] + ) + + task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True)) + + @transport.event_handler("on_first_participant_joined") + async def on_first_participant_joined(transport, participant): + await transport.capture_participant_transcription(participant["id"]) + # Kick off the conversation. + await task.queue_frames([LLMMessagesFrame(messages)]) + + runner = PipelineRunner() + + await runner.run(task) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/foundational/28c-transcription-update-gemini.py b/examples/foundational/28c-transcription-update-gemini.py new file mode 100644 index 000000000..27291a7c9 --- /dev/null +++ b/examples/foundational/28c-transcription-update-gemini.py @@ -0,0 +1,138 @@ +# +# Copyright (c) 2024, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import asyncio +import os +import sys +from typing import List + +import aiohttp +from dotenv import load_dotenv +from loguru import logger +from runner import configure + +from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.frames.frames import TranscriptionMessage, TranscriptionUpdateFrame +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext +from pipecat.processors.transcript_processor import TranscriptProcessor +from pipecat.services.cartesia import CartesiaTTSService +from pipecat.services.google import GoogleLLMService +from pipecat.services.openai import OpenAILLMContext +from pipecat.transports.services.daily import DailyParams, DailyTransport + +load_dotenv(override=True) + +logger.remove(0) +logger.add(sys.stderr, level="DEBUG") + + +class TranscriptHandler: + """Simple handler to demonstrate transcript processing.""" + + def __init__(self): + self.messages: List[TranscriptionMessage] = [] + + async def on_transcript_update( + self, processor: TranscriptProcessor, frame: TranscriptionUpdateFrame + ): + """Handle new transcript messages.""" + self.messages.extend(frame.messages) + + # Log 
the new messages + logger.info("New transcript messages:") + for msg in frame.messages: + logger.info(f"{msg.role}: {msg.content}") + + # Log the full transcript + logger.info("Full transcript:") + for msg in self.messages: + logger.info(f"{msg.role}: {msg.content}") + + +async def main(): + async with aiohttp.ClientSession() as session: + (room_url, token) = await configure(session) + + transport = DailyTransport( + room_url, + token, + "Respond bot", + DailyParams( + audio_out_enabled=True, + transcription_enabled=True, + vad_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + ), + ) + + tts = CartesiaTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady + ) + + llm = GoogleLLMService( + model="models/gemini-2.0-flash-exp", + # model="gemini-exp-1114", + api_key=os.getenv("GOOGLE_API_KEY"), + ) + + messages = [ + { + "role": "system", + "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative, helpful, and brief way.", + }, + {"role": "user", "content": "Say hello."}, + ] + + context = OpenAILLMContext(messages) + context_aggregator = llm.create_context_aggregator(context) + + # Create transcript processor and handler + transcript_processor = TranscriptProcessor() + transcript_handler = TranscriptHandler() + + # Register event handler for transcript updates + @transcript_processor.event_handler("on_transcript_update") + async def on_transcript_update(processor, frame): + await transcript_handler.on_transcript_update(processor, frame) + + pipeline = Pipeline( + [ + transport.input(), + context_aggregator.user(), + llm, + tts, + transport.output(), + context_aggregator.assistant(), + transcript_processor, + ] + ) + + task = PipelineTask( + pipeline, + PipelineParams( + allow_interruptions=True, + enable_metrics=True, + enable_usage_metrics=True, + ), + ) + + @transport.event_handler("on_first_participant_joined") + async def on_first_participant_joined(transport, participant): + await transport.capture_participant_transcription(participant["id"]) + # Kick off the conversation. + await task.queue_frames([context_aggregator.user().get_context_frame()]) + + runner = PipelineRunner() + + await runner.run(task) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py index d3792f537..f74d30371 100644 --- a/src/pipecat/frames/frames.py +++ b/src/pipecat/frames/frames.py @@ -5,7 +5,7 @@ # from dataclasses import dataclass, field -from typing import Any, List, Mapping, Optional, Tuple +from typing import Any, List, Literal, Mapping, Optional, Tuple, TypeAlias from pipecat.audio.vad.vad_analyzer import VADParams from pipecat.clocks.base_clock import BaseClock @@ -195,7 +195,8 @@ def __str__(self): @dataclass class InterimTranscriptionFrame(TextFrame): """A text frame with interim transcription-specific data. Will be placed in - the transport's receive queue when a participant speaks.""" + the transport's receive queue when a participant speaks. + """ text: str user_id: str @@ -206,6 +207,34 @@ def __str__(self): return f"{self.name}(user: {self.user_id}, text: [{self.text}], language: {self.language}, timestamp: {self.timestamp})" +@dataclass +class TranscriptionMessage: + """A message in a conversation transcript containing the role and content. 
+ + Messages are in standard format with roles normalized to user/assistant. + """ + + role: Literal["user", "assistant"] + content: str + timestamp: str | None = None + + +@dataclass +class TranscriptionUpdateFrame(DataFrame): + """A frame containing new messages added to the conversation transcript. + + This frame is emitted when new messages are added to the conversation history, + containing only the newly added messages rather than the full transcript. + Messages have normalized roles (user/assistant) regardless of the LLM service used. + """ + + messages: List[TranscriptionMessage] + + def __str__(self): + pts = format_pts(self.pts) + return f"{self.name}(pts: {pts}, messages: {len(self.messages)})" + + @dataclass class LLMMessagesFrame(DataFrame): """A frame containing a list of LLM messages. Used to signal that an LLM @@ -546,7 +575,8 @@ class EndFrame(ControlFrame): @dataclass class LLMFullResponseStartFrame(ControlFrame): """Used to indicate the beginning of an LLM response. Following by one or - more TextFrame and a final LLMFullResponseEndFrame.""" + more TextFrame and a final LLMFullResponseEndFrame. + """ pass diff --git a/src/pipecat/processors/transcript_processor.py b/src/pipecat/processors/transcript_processor.py new file mode 100644 index 000000000..97173f967 --- /dev/null +++ b/src/pipecat/processors/transcript_processor.py @@ -0,0 +1,150 @@ +# +# Copyright (c) 2024, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +from typing import List + +from loguru import logger + +from pipecat.frames.frames import ( + ErrorFrame, + Frame, + TranscriptionMessage, + TranscriptionUpdateFrame, +) +from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContextFrame +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor + + +class TranscriptProcessor(FrameProcessor): + """Processes LLM context frames to generate conversation transcripts. + + This processor monitors OpenAILLMContextFrame frames and extracts conversation + content, filtering out system messages and function calls. When new messages + are detected, it emits a TranscriptionUpdateFrame containing only the new + messages. + + Each LLM context (OpenAI, Anthropic, Google) provides conversion to the standard format: + [ + { + "role": "user", + "content": [{"type": "text", "text": "Hi, how are you?"}] + }, + { + "role": "assistant", + "content": [{"type": "text", "text": "Great! And you?"}] + } + ] + + Events: + on_transcript_update: Emitted when new transcript messages are available. + Args: TranscriptionUpdateFrame containing new messages. + + Example: + ```python + transcript_processor = TranscriptProcessor() + + @transcript_processor.event_handler("on_transcript_update") + async def on_transcript_update(processor, frame): + for msg in frame.messages: + print(f"{msg.role}: {msg.content}") + ``` + """ + + def __init__(self, **kwargs): + """Initialize the transcript processor. + + Args: + **kwargs: Additional arguments passed to FrameProcessor + """ + super().__init__(**kwargs) + self._processed_messages: List[TranscriptionMessage] = [] + self._register_event_handler("on_transcript_update") + + def _extract_messages(self, messages: List[dict]) -> List[TranscriptionMessage]: + """Extract conversation messages from standard format. 
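+
+        System and tool messages are skipped, and the text parts of a
+        message's structured content are joined with spaces to form the
+        transcript text.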
+ + Args: + messages: List of messages in standard format with structured content + + Returns: + List[TranscriptionMessage]: Normalized conversation messages + """ + result = [] + for msg in messages: + # Only process user and assistant messages + if msg["role"] not in ("user", "assistant"): + continue + + content = msg.get("content", []) + if isinstance(content, list): + # Extract text from structured content + text_parts = [] + for part in content: + if isinstance(part, dict) and part.get("type") == "text": + text_parts.append(part["text"]) + + if text_parts: + result.append( + TranscriptionMessage(role=msg["role"], content=" ".join(text_parts)) + ) + + return result + + def _find_new_messages(self, current: List[TranscriptionMessage]) -> List[TranscriptionMessage]: + """Find messages in current that aren't in self._processed_messages. + + Args: + current: List of current messages + + Returns: + List[TranscriptionMessage]: New messages not yet processed + """ + if not self._processed_messages: + return current + + processed_len = len(self._processed_messages) + if len(current) <= processed_len: + return [] + + return current[processed_len:] + + async def process_frame(self, frame: Frame, direction: FrameDirection): + """Process incoming frames, watching for OpenAILLMContextFrame. + + Args: + frame: The frame to process + direction: Frame processing direction + + Raises: + ErrorFrame: If message processing fails + """ + await super().process_frame(frame, direction) + + if isinstance(frame, OpenAILLMContextFrame): + try: + # Convert context messages to standard format + standard_messages = [] + for msg in frame.context.messages: + converted = frame.context.to_standard_messages(msg) + standard_messages.extend(converted) + + # Extract and process messages + current_messages = self._extract_messages(standard_messages) + new_messages = self._find_new_messages(current_messages) + + if new_messages: + # Update state and notify listeners + self._processed_messages.extend(new_messages) + update_frame = TranscriptionUpdateFrame(messages=new_messages) + await self._call_event_handler("on_transcript_update", update_frame) + await self.push_frame(update_frame) + + except Exception as e: + logger.error(f"Error processing transcript in {self}: {e}") + await self.push_error(ErrorFrame(str(e))) + + # Always push the original frame downstream + await self.push_frame(frame, direction) From 4f2aee5fba578862c17caf2c41c239bed6094990 Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Sat, 14 Dec 2024 09:14:34 -0500 Subject: [PATCH 02/10] Update OpenAI's to_standard_messages to return the verboase message format --- .../aggregators/openai_llm_context.py | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/src/pipecat/processors/aggregators/openai_llm_context.py b/src/pipecat/processors/aggregators/openai_llm_context.py index 6e7474c17..e84584053 100644 --- a/src/pipecat/processors/aggregators/openai_llm_context.py +++ b/src/pipecat/processors/aggregators/openai_llm_context.py @@ -115,8 +115,25 @@ def get_messages_for_logging(self) -> str: def from_standard_message(self, message): return message - # convert a message in this LLM's format to one or more messages in OpenAI format def to_standard_messages(self, obj) -> list: + """Convert OpenAI message to standard structured format. 
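+
+        Messages with empty content are skipped, simple string content is
+        wrapped in a single text part, and already-structured content is
+        returned unchanged.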
+ + Args: + obj: Message in OpenAI format {"role": "user", "content": "text"} + + Returns: + List containing message with structured content: + [{"role": "user", "content": [{"type": "text", "text": "message"}]}] + """ + # Skip messages without content + if not obj.get("content"): + return [] + + # Convert simple string content to structured format + if isinstance(obj["content"], str): + return [{"role": obj["role"], "content": [{"type": "text", "text": obj["content"]}]}] + + # Return original message if content is already structured return [obj] def get_messages_for_initializing_history(self): From 51b235df4b022db807c1e3365d5f81564403aa6f Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Sat, 14 Dec 2024 09:22:33 -0500 Subject: [PATCH 03/10] Add docstrings for Google and Anthropic's to_standard_messages and from_standard_message functions --- src/pipecat/services/anthropic.py | 44 +++++++++++++++++++++++++++++++ src/pipecat/services/google.py | 40 ++++++++++++++++++++++++++++ 2 files changed, 84 insertions(+) diff --git a/src/pipecat/services/anthropic.py b/src/pipecat/services/anthropic.py index f0c033375..d8f485296 100644 --- a/src/pipecat/services/anthropic.py +++ b/src/pipecat/services/anthropic.py @@ -378,6 +378,26 @@ def set_messages(self, messages: List): # convert a message in Anthropic format into one or more messages in OpenAI format def to_standard_messages(self, obj): + """Convert Anthropic message format to standard structured format. + + Handles text content and function calls for both user and assistant messages. + + Args: + obj: Message in Anthropic format: + { + "role": "user/assistant", + "content": str | [{"type": "text/tool_use/tool_result", ...}] + } + + Returns: + List of messages in standard format: + [ + { + "role": "user/assistant/tool", + "content": [{"type": "text", "text": str}] + } + ] + """ # todo: image format (?) # tool_use role = obj.get("role") @@ -432,6 +452,30 @@ def to_standard_messages(self, obj): return messages def from_standard_message(self, message): + """Convert standard format message to Anthropic format. + + Handles conversion of text content, tool calls, and tool results. + Empty text content is converted to "(empty)". + + Args: + message: Message in standard format: + { + "role": "user/assistant/tool", + "content": str | [{"type": "text", ...}], + "tool_calls": [{"id": str, "function": {"name": str, "arguments": str}}] + } + + Returns: + Message in Anthropic format: + { + "role": "user/assistant", + "content": str | [ + {"type": "text", "text": str} | + {"type": "tool_use", "id": str, "name": str, "input": dict} | + {"type": "tool_result", "tool_use_id": str, "content": str} + ] + } + """ # todo: image messages (?) if message["role"] == "tool": return { diff --git a/src/pipecat/services/google.py b/src/pipecat/services/google.py index 5442ee91c..383dde624 100644 --- a/src/pipecat/services/google.py +++ b/src/pipecat/services/google.py @@ -412,6 +412,25 @@ def add_audio_frames_message(self, *, audio_frames: list[AudioRawFrame], text: s # self.add_message(message) def from_standard_message(self, message): + """Convert standard format message to Google Content object. + + Handles conversion of text, images, and function calls to Google's format. + System messages are stored separately and return None. 
+ + Args: + message: Message in standard format: + { + "role": "user/assistant/system/tool", + "content": str | [{"type": "text/image_url", ...}] | None, + "tool_calls": [{"function": {"name": str, "arguments": str}}] + } + + Returns: + glm.Content object with: + - role: "user" or "model" (converted from "assistant") + - parts: List[Part] containing text, inline_data, or function calls + Returns None for system messages. + """ role = message["role"] content = message.get("content", []) if role == "system": @@ -461,6 +480,27 @@ def from_standard_message(self, message): return message def to_standard_messages(self, obj) -> list: + """Convert Google Content object to standard structured format. + + Handles text, images, and function calls from Google's Content/Part objects. + + Args: + obj: Google Content object with: + - role: "model" (converted to "assistant") or "user" + - parts: List[Part] containing text, inline_data, or function calls + + Returns: + List of messages in standard format: + [ + { + "role": "user/assistant/tool", + "content": [ + {"type": "text", "text": str} | + {"type": "image_url", "image_url": {"url": str}} + ] + } + ] + """ msg = {"role": obj.role, "content": []} if msg["role"] == "model": msg["role"] = "assistant" From 77aeda36eba644bd4471f98a0fd0d11ba94fbf2f Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Sat, 14 Dec 2024 09:23:43 -0500 Subject: [PATCH 04/10] Update OpenAI's from_standard_message to convert back to OpenAI's simple format --- .../aggregators/openai_llm_context.py | 33 ++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/src/pipecat/processors/aggregators/openai_llm_context.py b/src/pipecat/processors/aggregators/openai_llm_context.py index e84584053..4adf76de0 100644 --- a/src/pipecat/processors/aggregators/openai_llm_context.py +++ b/src/pipecat/processors/aggregators/openai_llm_context.py @@ -112,7 +112,38 @@ def get_messages_for_logging(self) -> str: msgs.append(msg) return json.dumps(msgs) - def from_standard_message(self, message): + def from_standard_message(self, message) -> dict: + """Convert standard format message to OpenAI format. + + Converts structured content back to OpenAI's simple string format. 
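+
+        String content passes through unchanged; structured content has its
+        text parts joined with spaces, and any non-text parts are dropped.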
+ + Args: + message: Message in standard format: + { + "role": "user/assistant", + "content": [{"type": "text", "text": str}] + } + + Returns: + Message in OpenAI format: + { + "role": "user/assistant", + "content": str + } + """ + # If content is already a string, return as-is + if isinstance(message.get("content"), str): + return message + + # Convert structured content to string + if isinstance(message.get("content"), list): + text_parts = [] + for part in message["content"]: + if part.get("type") == "text": + text_parts.append(part["text"]) + + return {"role": message["role"], "content": " ".join(text_parts) if text_parts else ""} + return message def to_standard_messages(self, obj) -> list: From dd2703317ae125d2c1a12653476e49f7d7712f71 Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Sat, 14 Dec 2024 11:03:08 -0500 Subject: [PATCH 05/10] Add timestamp frames and include timestamps in the transcription event and frame --- .../28a-transcription-update-openai.py | 26 +++-- .../28b-transcription-update-anthropic.py | 26 +++-- .../28c-transcription-update-gemini.py | 26 +++-- src/pipecat/frames/frames.py | 14 +++ .../processors/aggregators/llm_response.py | 6 ++ .../processors/transcript_processor.py | 94 ++++++++++++------- src/pipecat/services/anthropic.py | 7 ++ src/pipecat/services/google.py | 13 +++ src/pipecat/services/openai.py | 7 ++ 9 files changed, 162 insertions(+), 57 deletions(-) diff --git a/examples/foundational/28a-transcription-update-openai.py b/examples/foundational/28a-transcription-update-openai.py index ec103ff82..390343465 100644 --- a/examples/foundational/28a-transcription-update-openai.py +++ b/examples/foundational/28a-transcription-update-openai.py @@ -32,7 +32,10 @@ class TranscriptHandler: - """Simple handler to demonstrate transcript processing.""" + """Simple handler to demonstrate transcript processing. + + Maintains a list of conversation messages and logs them with timestamps. + """ def __init__(self): self.messages: List[TranscriptionMessage] = [] @@ -40,18 +43,25 @@ def __init__(self): async def on_transcript_update( self, processor: TranscriptProcessor, frame: TranscriptionUpdateFrame ): - """Handle new transcript messages.""" + """Handle new transcript messages. + + Args: + processor: The TranscriptProcessor that emitted the update + frame: TranscriptionUpdateFrame containing new messages + """ self.messages.extend(frame.messages) # Log the new messages logger.info("New transcript messages:") for msg in frame.messages: - logger.info(f"{msg.role}: {msg.content}") - - # Log the full transcript - logger.info("Full transcript:") - for msg in self.messages: - logger.info(f"{msg.role}: {msg.content}") + timestamp = f"[{msg.timestamp}] " if msg.timestamp else "" + logger.info(f"{timestamp}{msg.role}: {msg.content}") + + # # Log the full transcript + # logger.info("Full transcript:") + # for msg in self.messages: + # timestamp = f"[{msg.timestamp}] " if msg.timestamp else "" + # logger.info(f"{timestamp}{msg.role}: {msg.content}") async def main(): diff --git a/examples/foundational/28b-transcription-update-anthropic.py b/examples/foundational/28b-transcription-update-anthropic.py index 23ee93a21..1119efad2 100644 --- a/examples/foundational/28b-transcription-update-anthropic.py +++ b/examples/foundational/28b-transcription-update-anthropic.py @@ -32,7 +32,10 @@ class TranscriptHandler: - """Simple handler to demonstrate transcript processing.""" + """Simple handler to demonstrate transcript processing. 
+ + Maintains a list of conversation messages and logs them with timestamps. + """ def __init__(self): self.messages: List[TranscriptionMessage] = [] @@ -40,18 +43,25 @@ def __init__(self): async def on_transcript_update( self, processor: TranscriptProcessor, frame: TranscriptionUpdateFrame ): - """Handle new transcript messages.""" + """Handle new transcript messages. + + Args: + processor: The TranscriptProcessor that emitted the update + frame: TranscriptionUpdateFrame containing new messages + """ self.messages.extend(frame.messages) # Log the new messages logger.info("New transcript messages:") for msg in frame.messages: - logger.info(f"{msg.role}: {msg.content}") - - # Log the full transcript - logger.info("Full transcript:") - for msg in self.messages: - logger.info(f"{msg.role}: {msg.content}") + timestamp = f"[{msg.timestamp}] " if msg.timestamp else "" + logger.info(f"{timestamp}{msg.role}: {msg.content}") + + # # Log the full transcript + # logger.info("Full transcript:") + # for msg in self.messages: + # timestamp = f"[{msg.timestamp}] " if msg.timestamp else "" + # logger.info(f"{timestamp}{msg.role}: {msg.content}") async def main(): diff --git a/examples/foundational/28c-transcription-update-gemini.py b/examples/foundational/28c-transcription-update-gemini.py index 27291a7c9..bf9448199 100644 --- a/examples/foundational/28c-transcription-update-gemini.py +++ b/examples/foundational/28c-transcription-update-gemini.py @@ -33,7 +33,10 @@ class TranscriptHandler: - """Simple handler to demonstrate transcript processing.""" + """Simple handler to demonstrate transcript processing. + + Maintains a list of conversation messages and logs them with timestamps. + """ def __init__(self): self.messages: List[TranscriptionMessage] = [] @@ -41,18 +44,25 @@ def __init__(self): async def on_transcript_update( self, processor: TranscriptProcessor, frame: TranscriptionUpdateFrame ): - """Handle new transcript messages.""" + """Handle new transcript messages. + + Args: + processor: The TranscriptProcessor that emitted the update + frame: TranscriptionUpdateFrame containing new messages + """ self.messages.extend(frame.messages) # Log the new messages logger.info("New transcript messages:") for msg in frame.messages: - logger.info(f"{msg.role}: {msg.content}") - - # Log the full transcript - logger.info("Full transcript:") - for msg in self.messages: - logger.info(f"{msg.role}: {msg.content}") + timestamp = f"[{msg.timestamp}] " if msg.timestamp else "" + logger.info(f"{timestamp}{msg.role}: {msg.content}") + + # # Log the full transcript + # logger.info("Full transcript:") + # for msg in self.messages: + # timestamp = f"[{msg.timestamp}] " if msg.timestamp else "" + # logger.info(f"{timestamp}{msg.role}: {msg.content}") async def main(): diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py index f74d30371..ab8a6f6ad 100644 --- a/src/pipecat/frames/frames.py +++ b/src/pipecat/frames/frames.py @@ -207,6 +207,20 @@ def __str__(self): return f"{self.name}(user: {self.user_id}, text: [{self.text}], language: {self.language}, timestamp: {self.timestamp})" +@dataclass +class OpenAILLMContextUserTimestampFrame(DataFrame): + """Timestamp information for user message in LLM context.""" + + timestamp: str + + +@dataclass +class OpenAILLMContextAssistantTimestampFrame(DataFrame): + """Timestamp information for assistant message in LLM context.""" + + timestamp: str + + @dataclass class TranscriptionMessage: """A message in a conversation transcript containing the role and content. 
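For orientation, a minimal sketch of how the two timestamp frames added above pair with transcript messages, using only names introduced in this patch (`time_now_iso8601` is the helper the aggregator changes below rely on):

```python
from pipecat.frames.frames import (
    OpenAILLMContextUserTimestampFrame,
    TranscriptionMessage,
    TranscriptionUpdateFrame,
)
from pipecat.utils.time import time_now_iso8601

# A user utterance, stamped when its context frame is pushed downstream.
message = TranscriptionMessage(role="user", content="Hi there!", timestamp=time_now_iso8601())

# The timestamp travels as its own frame and is matched to pending messages.
timestamp_frame = OpenAILLMContextUserTimestampFrame(timestamp=message.timestamp)

# New, timestamped messages are then emitted together in one update frame.
update_frame = TranscriptionUpdateFrame(messages=[message])
```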
diff --git a/src/pipecat/processors/aggregators/llm_response.py b/src/pipecat/processors/aggregators/llm_response.py index 479746471..612375da2 100644 --- a/src/pipecat/processors/aggregators/llm_response.py +++ b/src/pipecat/processors/aggregators/llm_response.py @@ -15,6 +15,7 @@ LLMMessagesFrame, LLMMessagesUpdateFrame, LLMSetToolsFrame, + OpenAILLMContextUserTimestampFrame, StartInterruptionFrame, TextFrame, TranscriptionFrame, @@ -26,6 +27,7 @@ OpenAILLMContextFrame, ) from pipecat.processors.frame_processor import FrameDirection, FrameProcessor +from pipecat.utils.time import time_now_iso8601 class LLMResponseAggregator(FrameProcessor): @@ -289,6 +291,10 @@ async def _push_aggregation(self): frame = OpenAILLMContextFrame(self._context) await self.push_frame(frame) + # Push timestamp frame with current time + timestamp_frame = OpenAILLMContextUserTimestampFrame(timestamp=time_now_iso8601()) + await self.push_frame(timestamp_frame) + # Reset our accumulator state. self._reset() diff --git a/src/pipecat/processors/transcript_processor.py b/src/pipecat/processors/transcript_processor.py index 97173f967..fcd4bfe52 100644 --- a/src/pipecat/processors/transcript_processor.py +++ b/src/pipecat/processors/transcript_processor.py @@ -4,13 +4,15 @@ # SPDX-License-Identifier: BSD 2-Clause License # -from typing import List +from typing import List, Optional from loguru import logger from pipecat.frames.frames import ( ErrorFrame, Frame, + OpenAILLMContextAssistantTimestampFrame, + OpenAILLMContextUserTimestampFrame, TranscriptionMessage, TranscriptionUpdateFrame, ) @@ -19,12 +21,12 @@ class TranscriptProcessor(FrameProcessor): - """Processes LLM context frames to generate conversation transcripts. + """Processes LLM context frames to generate timestamped conversation transcripts. - This processor monitors OpenAILLMContextFrame frames and extracts conversation - content, filtering out system messages and function calls. When new messages - are detected, it emits a TranscriptionUpdateFrame containing only the new - messages. + This processor monitors OpenAILLMContextFrame frames and their corresponding + timestamp frames to build a chronological conversation transcript. Messages are + stored by role until their matching timestamp frame arrives, then emitted via + TranscriptionUpdateFrame. Each LLM context (OpenAI, Anthropic, Google) provides conversion to the standard format: [ @@ -39,8 +41,8 @@ class TranscriptProcessor(FrameProcessor): ] Events: - on_transcript_update: Emitted when new transcript messages are available. - Args: TranscriptionUpdateFrame containing new messages. + on_transcript_update: Emitted when timestamped messages are available. + Args: TranscriptionUpdateFrame containing timestamped messages. Example: ```python @@ -49,7 +51,7 @@ class TranscriptProcessor(FrameProcessor): @transcript_processor.event_handler("on_transcript_update") async def on_transcript_update(processor, frame): for msg in frame.messages: - print(f"{msg.role}: {msg.content}") + print(f"[{msg.timestamp}] {msg.role}: {msg.content}") ``` """ @@ -62,6 +64,8 @@ def __init__(self, **kwargs): super().__init__(**kwargs) self._processed_messages: List[TranscriptionMessage] = [] self._register_event_handler("on_transcript_update") + self._pending_user_messages: List[TranscriptionMessage] = [] + self._pending_assistant_messages: List[TranscriptionMessage] = [] def _extract_messages(self, messages: List[dict]) -> List[TranscriptionMessage]: """Extract conversation messages from standard format. 
@@ -112,7 +116,16 @@ def _find_new_messages(self, current: List[TranscriptionMessage]) -> List[Transc return current[processed_len:] async def process_frame(self, frame: Frame, direction: FrameDirection): - """Process incoming frames, watching for OpenAILLMContextFrame. + """Process frames to build a timestamped conversation transcript. + + Handles three frame types in sequence: + 1. OpenAILLMContextFrame: Contains new messages to be timestamped + 2. OpenAILLMContextUserTimestampFrame: Timestamp for user messages + 3. OpenAILLMContextAssistantTimestampFrame: Timestamp for assistant messages + + Messages are stored by role until their corresponding timestamp frame arrives. + When a timestamp frame is received, the matching messages are timestamped and + emitted in chronological order via TranscriptionUpdateFrame. Args: frame: The frame to process @@ -124,27 +137,42 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) if isinstance(frame, OpenAILLMContextFrame): - try: - # Convert context messages to standard format - standard_messages = [] - for msg in frame.context.messages: - converted = frame.context.to_standard_messages(msg) - standard_messages.extend(converted) - - # Extract and process messages - current_messages = self._extract_messages(standard_messages) - new_messages = self._find_new_messages(current_messages) - - if new_messages: - # Update state and notify listeners - self._processed_messages.extend(new_messages) - update_frame = TranscriptionUpdateFrame(messages=new_messages) - await self._call_event_handler("on_transcript_update", update_frame) - await self.push_frame(update_frame) - - except Exception as e: - logger.error(f"Error processing transcript in {self}: {e}") - await self.push_error(ErrorFrame(str(e))) - - # Always push the original frame downstream + # Extract and store messages by role + standard_messages = [] + for msg in frame.context.messages: + converted = frame.context.to_standard_messages(msg) + standard_messages.extend(converted) + + current_messages = self._extract_messages(standard_messages) + new_messages = self._find_new_messages(current_messages) + + # Store new messages by role + for msg in new_messages: + if msg.role == "user": + self._pending_user_messages.append(msg) + elif msg.role == "assistant": + self._pending_assistant_messages.append(msg) + + elif isinstance(frame, OpenAILLMContextUserTimestampFrame): + # Process pending user messages with timestamp + if self._pending_user_messages: + for msg in self._pending_user_messages: + msg.timestamp = frame.timestamp + self._processed_messages.extend(self._pending_user_messages) + update_frame = TranscriptionUpdateFrame(messages=self._pending_user_messages) + await self._call_event_handler("on_transcript_update", update_frame) + await self.push_frame(update_frame) + self._pending_user_messages = [] + + elif isinstance(frame, OpenAILLMContextAssistantTimestampFrame): + # Process pending assistant messages with timestamp + if self._pending_assistant_messages: + for msg in self._pending_assistant_messages: + msg.timestamp = frame.timestamp + self._processed_messages.extend(self._pending_assistant_messages) + update_frame = TranscriptionUpdateFrame(messages=self._pending_assistant_messages) + await self._call_event_handler("on_transcript_update", update_frame) + await self.push_frame(update_frame) + self._pending_assistant_messages = [] + await self.push_frame(frame, direction) diff --git a/src/pipecat/services/anthropic.py 
b/src/pipecat/services/anthropic.py index d8f485296..93cff6f9e 100644 --- a/src/pipecat/services/anthropic.py +++ b/src/pipecat/services/anthropic.py @@ -26,6 +26,7 @@ LLMFullResponseStartFrame, LLMMessagesFrame, LLMUpdateSettingsFrame, + OpenAILLMContextAssistantTimestampFrame, StartInterruptionFrame, TextFrame, UserImageRawFrame, @@ -43,6 +44,7 @@ ) from pipecat.processors.frame_processor import FrameDirection from pipecat.services.ai_services import LLMService +from pipecat.utils.time import time_now_iso8601 try: from anthropic import NOT_GIVEN, AsyncAnthropic, NotGiven @@ -791,8 +793,13 @@ async def _push_aggregation(self): if run_llm: await self._user_context_aggregator.push_context_frame() + # Push context frame frame = OpenAILLMContextFrame(self._context) await self.push_frame(frame) + # Push timestamp frame with current time + timestamp_frame = OpenAILLMContextAssistantTimestampFrame(timestamp=time_now_iso8601()) + await self.push_frame(timestamp_frame) + except Exception as e: logger.error(f"Error processing frame: {e}") diff --git a/src/pipecat/services/google.py b/src/pipecat/services/google.py index 383dde624..c7d32eff3 100644 --- a/src/pipecat/services/google.py +++ b/src/pipecat/services/google.py @@ -23,6 +23,8 @@ LLMFullResponseStartFrame, LLMMessagesFrame, LLMUpdateSettingsFrame, + OpenAILLMContextAssistantTimestampFrame, + OpenAILLMContextUserTimestampFrame, TextFrame, TTSAudioRawFrame, TTSStartedFrame, @@ -41,6 +43,7 @@ OpenAIUserContextAggregator, ) from pipecat.transcriptions.language import Language +from pipecat.utils.time import time_now_iso8601 try: import google.ai.generativelanguage as glm @@ -227,9 +230,14 @@ async def _push_aggregation(self): # if the tasks gets cancelled we won't be able to clear things up. self._aggregation = "" + # Push context frame frame = OpenAILLMContextFrame(self._context) await self.push_frame(frame) + # Push timestamp frame with current time + timestamp_frame = OpenAILLMContextUserTimestampFrame(timestamp=time_now_iso8601()) + await self.push_frame(timestamp_frame) + # Reset our accumulator state. 
self._reset() @@ -300,9 +308,14 @@ async def _push_aggregation(self): if run_llm: await self._user_context_aggregator.push_context_frame() + # Push context frame frame = OpenAILLMContextFrame(self._context) await self.push_frame(frame) + # Push timestamp frame with current time + timestamp_frame = OpenAILLMContextAssistantTimestampFrame(timestamp=time_now_iso8601()) + await self.push_frame(timestamp_frame) + except Exception as e: logger.exception(f"Error processing frame: {e}") diff --git a/src/pipecat/services/openai.py b/src/pipecat/services/openai.py index 85e1a95f0..159db8d2f 100644 --- a/src/pipecat/services/openai.py +++ b/src/pipecat/services/openai.py @@ -25,6 +25,7 @@ LLMFullResponseStartFrame, LLMMessagesFrame, LLMUpdateSettingsFrame, + OpenAILLMContextAssistantTimestampFrame, StartInterruptionFrame, TextFrame, TTSAudioRawFrame, @@ -46,6 +47,7 @@ ) from pipecat.processors.frame_processor import FrameDirection from pipecat.services.ai_services import ImageGenService, LLMService, TTSService +from pipecat.utils.time import time_now_iso8601 try: from openai import ( @@ -597,8 +599,13 @@ async def _push_aggregation(self): if run_llm: await self._user_context_aggregator.push_context_frame() + # Push context frame frame = OpenAILLMContextFrame(self._context) await self.push_frame(frame) + # Push timestamp frame with current time + timestamp_frame = OpenAILLMContextAssistantTimestampFrame(timestamp=time_now_iso8601()) + await self.push_frame(timestamp_frame) + except Exception as e: logger.error(f"Error processing frame: {e}") From b5bd662fe16b7daff28d7e376714095098142beb Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Sat, 14 Dec 2024 12:08:38 -0500 Subject: [PATCH 06/10] Add changelog and rename examples --- CHANGELOG.md | 17 ++++++++++++++++- ...py => 28a-transcription-processor-openai.py} | 0 ...py => 28b-transcript-processor-anthropic.py} | 0 ...py => 28c-transcription-processor-gemini.py} | 0 4 files changed, 16 insertions(+), 1 deletion(-) rename examples/foundational/{28a-transcription-update-openai.py => 28a-transcription-processor-openai.py} (100%) rename examples/foundational/{28b-transcription-update-anthropic.py => 28b-transcript-processor-anthropic.py} (100%) rename examples/foundational/{28c-transcription-update-gemini.py => 28c-transcription-processor-gemini.py} (100%) diff --git a/CHANGELOG.md b/CHANGELOG.md index 78e069fc7..5ecba8686 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,7 +14,22 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 `audioop-lts` (https://github.com/AbstractUmbra/audioop) to provide the same functionality. -- Added support for more languages to ElevenLabs (Arabic, Croatian, Filipino, +- Added timestamped conversation transcript support: + + - New `TranscriptProcessor` factory provides access to user and assistant + transcript processors. + - `UserTranscriptProcessor` processes user speech with timestamps from + transcription. + - `AssistantTranscriptProcessor` processes assistant responses with LLM + context timestamps. + - Messages emitted with ISO 8601 timestamps indicating when they were spoken. + - Supports all LLM formats (OpenAI, Anthropic, Google) via standard message + format. 
+ - New examples: `28a-transcription-processor-openai.py`, + `28b-transcription-processor-anthropic.py`, and + `28c-transcription-processor-gemini.py` + +- Add support for more languages to ElevenLabs (Arabic, Croatian, Filipino, Tamil) and PlayHT (Afrikans, Albanian, Amharic, Arabic, Bengali, Croatian, Galician, Hebrew, Mandarin, Serbian, Tagalog, Urdu, Xhosa). diff --git a/examples/foundational/28a-transcription-update-openai.py b/examples/foundational/28a-transcription-processor-openai.py similarity index 100% rename from examples/foundational/28a-transcription-update-openai.py rename to examples/foundational/28a-transcription-processor-openai.py diff --git a/examples/foundational/28b-transcription-update-anthropic.py b/examples/foundational/28b-transcript-processor-anthropic.py similarity index 100% rename from examples/foundational/28b-transcription-update-anthropic.py rename to examples/foundational/28b-transcript-processor-anthropic.py diff --git a/examples/foundational/28c-transcription-update-gemini.py b/examples/foundational/28c-transcription-processor-gemini.py similarity index 100% rename from examples/foundational/28c-transcription-update-gemini.py rename to examples/foundational/28c-transcription-processor-gemini.py From 1f8a217cd1ef3da357cda2d968323d6648ca6560 Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Mon, 16 Dec 2024 10:17:33 -0500 Subject: [PATCH 07/10] Code review changes --- CHANGELOG.md | 2 +- .../07a-interruptible-anthropic.py | 42 ++------------ .../28a-transcription-processor-openai.py | 2 +- .../28b-transcript-processor-anthropic.py | 2 +- src/pipecat/frames/frames.py | 30 +++++++++- .../aggregators/openai_llm_context.py | 58 ++++++------------- 6 files changed, 57 insertions(+), 79 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5ecba8686..5e784cdba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,7 +27,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 format. - New examples: `28a-transcription-processor-openai.py`, `28b-transcription-processor-anthropic.py`, and - `28c-transcription-processor-gemini.py` + `28c-transcription-processor-gemini.py`. 
- Add support for more languages to ElevenLabs (Arabic, Croatian, Filipino, Tamil) and PlayHT (Afrikans, Albanian, Amharic, Arabic, Bengali, Croatian, diff --git a/examples/foundational/07a-interruptible-anthropic.py b/examples/foundational/07a-interruptible-anthropic.py index 25a301269..e7e680eab 100644 --- a/examples/foundational/07a-interruptible-anthropic.py +++ b/examples/foundational/07a-interruptible-anthropic.py @@ -7,7 +7,6 @@ import asyncio import os import sys -from typing import List import aiohttp from dotenv import load_dotenv @@ -15,13 +14,12 @@ from runner import configure from pipecat.audio.vad.silero import SileroVADAnalyzer -from pipecat.frames.frames import Frame, LLMMessagesFrame +from pipecat.frames.frames import LLMMessagesFrame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext -from pipecat.processors.frame_processor import FrameDirection -from pipecat.services.anthropic import AnthropicLLMContext, AnthropicLLMService +from pipecat.services.anthropic import AnthropicLLMService from pipecat.services.cartesia import CartesiaTTSService from pipecat.transports.services.daily import DailyParams, DailyTransport @@ -31,28 +29,6 @@ logger.add(sys.stderr, level="DEBUG") -class TestAnthropicLLMService(AnthropicLLMService): - async def process_frame(self, frame: Frame, direction: FrameDirection): - if isinstance(frame, LLMMessagesFrame): - logger.info("Original OpenAI format messages:") - logger.info(frame.messages) - - # Convert to Anthropic format - context = AnthropicLLMContext.from_messages(frame.messages) - logger.info("Converted to Anthropic format:") - logger.info(context.messages) - - # Convert back to OpenAI format - openai_messages = [] - for msg in context.messages: - converted = context.to_standard_messages(msg) - openai_messages.extend(converted) - logger.info("Converted back to OpenAI format:") - logger.info(openai_messages) - - await super().process_frame(frame, direction) - - async def main(): async with aiohttp.ClientSession() as session: (room_url, token) = await configure(session) @@ -74,24 +50,18 @@ async def main(): voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady ) - llm = TestAnthropicLLMService( + llm = AnthropicLLMService( api_key=os.getenv("ANTHROPIC_API_KEY"), model="claude-3-opus-20240229" ) - # Test messages including various formats + # todo: think more about how to handle system prompts in a more general way. OpenAI, + # Google, and Anthropic all have slightly different approaches to providing a system + # prompt. messages = [ { "role": "system", "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative, helpful, and brief way. Say hello.", }, - { - "role": "assistant", - "content": [ - {"type": "text", "text": "Hello! 
How can I help you today?"}, - {"type": "text", "text": "I'm ready to assist."}, - ], - }, - {"role": "user", "content": "Hi there!"}, ] context = OpenAILLMContext(messages) diff --git a/examples/foundational/28a-transcription-processor-openai.py b/examples/foundational/28a-transcription-processor-openai.py index 390343465..1e8463b69 100644 --- a/examples/foundational/28a-transcription-processor-openai.py +++ b/examples/foundational/28a-transcription-processor-openai.py @@ -127,7 +127,7 @@ async def on_transcript_update(processor, frame): async def on_first_participant_joined(transport, participant): await transport.capture_participant_transcription(participant["id"]) # Kick off the conversation. - await task.queue_frames([LLMMessagesFrame(messages)]) + await task.queue_frames([context_aggregator.user().get_context_frame()]) runner = PipelineRunner() diff --git a/examples/foundational/28b-transcript-processor-anthropic.py b/examples/foundational/28b-transcript-processor-anthropic.py index 1119efad2..626206c5f 100644 --- a/examples/foundational/28b-transcript-processor-anthropic.py +++ b/examples/foundational/28b-transcript-processor-anthropic.py @@ -127,7 +127,7 @@ async def on_transcript_update(processor, frame): async def on_first_participant_joined(transport, participant): await transport.capture_participant_transcription(participant["id"]) # Kick off the conversation. - await task.queue_frames([LLMMessagesFrame(messages)]) + await task.queue_frames([context_aggregator.user().get_context_frame()]) runner = PipelineRunner() diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py index ab8a6f6ad..d02112a6f 100644 --- a/src/pipecat/frames/frames.py +++ b/src/pipecat/frames/frames.py @@ -5,7 +5,7 @@ # from dataclasses import dataclass, field -from typing import Any, List, Literal, Mapping, Optional, Tuple, TypeAlias +from typing import Any, List, Literal, Mapping, Optional, Tuple from pipecat.audio.vad.vad_analyzer import VADParams from pipecat.clocks.base_clock import BaseClock @@ -240,6 +240,34 @@ class TranscriptionUpdateFrame(DataFrame): This frame is emitted when new messages are added to the conversation history, containing only the newly added messages rather than the full transcript. Messages have normalized roles (user/assistant) regardless of the LLM service used. + Messages are always in the OpenAI standard message format, which supports both: + + Simple format: + [ + { + "role": "user", + "content": "Hi, how are you?" + }, + { + "role": "assistant", + "content": "Great! And you?" + } + ] + + Content list format: + [ + { + "role": "user", + "content": [{"type": "text", "text": "Hi, how are you?"}] + }, + { + "role": "assistant", + "content": [{"type": "text", "text": "Great! And you?"}] + } + ] + + OpenAI supports both formats. Anthropic and Google messages are converted to the + content list format. """ messages: List[TranscriptionMessage] diff --git a/src/pipecat/processors/aggregators/openai_llm_context.py b/src/pipecat/processors/aggregators/openai_llm_context.py index 4adf76de0..853ac1baa 100644 --- a/src/pipecat/processors/aggregators/openai_llm_context.py +++ b/src/pipecat/processors/aggregators/openai_llm_context.py @@ -112,59 +112,39 @@ def get_messages_for_logging(self) -> str: msgs.append(msg) return json.dumps(msgs) - def from_standard_message(self, message) -> dict: - """Convert standard format message to OpenAI format. + def from_standard_message(self, message): + """Convert from OpenAI message format to OpenAI message format (passthrough). 
- Converts structured content back to OpenAI's simple string format. + OpenAI's format allows both simple string content and structured content: + - Simple: {"role": "user", "content": "Hello"} + - Structured: {"role": "user", "content": [{"type": "text", "text": "Hello"}]} + + Since OpenAI is our standard format, this is a passthrough function. Args: - message: Message in standard format: - { - "role": "user/assistant", - "content": [{"type": "text", "text": str}] - } + message (dict): Message in OpenAI format Returns: - Message in OpenAI format: - { - "role": "user/assistant", - "content": str - } + dict: Same message, unchanged """ - # If content is already a string, return as-is - if isinstance(message.get("content"), str): - return message - - # Convert structured content to string - if isinstance(message.get("content"), list): - text_parts = [] - for part in message["content"]: - if part.get("type") == "text": - text_parts.append(part["text"]) - - return {"role": message["role"], "content": " ".join(text_parts) if text_parts else ""} - return message def to_standard_messages(self, obj) -> list: - """Convert OpenAI message to standard structured format. + """Convert from OpenAI message format to OpenAI message format (passthrough). + + OpenAI's format is our standard format throughout Pipecat. This function + returns a list containing the original message to maintain consistency with + other LLM services that may need to return multiple messages. Args: - obj: Message in OpenAI format {"role": "user", "content": "text"} + obj (dict): Message in OpenAI format with either: + - Simple content: {"role": "user", "content": "Hello"} + - List content: {"role": "user", "content": [{"type": "text", "text": "Hello"}]} Returns: - List containing message with structured content: - [{"role": "user", "content": [{"type": "text", "text": "message"}]}] + list: List containing the original messages, preserving whether + the content was in simple string or structured list format """ - # Skip messages without content - if not obj.get("content"): - return [] - - # Convert simple string content to structured format - if isinstance(obj["content"], str): - return [{"role": obj["role"], "content": [{"type": "text", "text": obj["content"]}]}] - - # Return original message if content is already structured return [obj] def get_messages_for_initializing_history(self): From 4211664a77fb605f3d3a2bca2c86f3ec3b26aab0 Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Mon, 16 Dec 2024 10:33:34 -0500 Subject: [PATCH 08/10] TranscriptProcessor to handle simple and list content --- src/pipecat/processors/transcript_processor.py | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/src/pipecat/processors/transcript_processor.py b/src/pipecat/processors/transcript_processor.py index fcd4bfe52..be53cd79a 100644 --- a/src/pipecat/processors/transcript_processor.py +++ b/src/pipecat/processors/transcript_processor.py @@ -71,7 +71,9 @@ def _extract_messages(self, messages: List[dict]) -> List[TranscriptionMessage]: """Extract conversation messages from standard format. 
Args: - messages: List of messages in standard format with structured content + messages: List of messages in OpenAI format, which can be either: + - Simple format: {"role": "user", "content": "Hello"} + - Content list: {"role": "user", "content": [{"type": "text", "text": "Hello"}]} Returns: List[TranscriptionMessage]: Normalized conversation messages @@ -82,9 +84,17 @@ def _extract_messages(self, messages: List[dict]) -> List[TranscriptionMessage]: if msg["role"] not in ("user", "assistant"): continue - content = msg.get("content", []) - if isinstance(content, list): - # Extract text from structured content + if "content" not in msg: + logger.warning(f"Message missing content field: {msg}") + continue + + content = msg.get("content") + if isinstance(content, str): + # Handle simple string content + if content: + result.append(TranscriptionMessage(role=msg["role"], content=content)) + elif isinstance(content, list): + # Handle structured content text_parts = [] for part in content: if isinstance(part, dict) and part.get("type") == "text": From 1117c2148365caca2202bef57792438ba8cfb27e Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Mon, 16 Dec 2024 15:24:58 -0500 Subject: [PATCH 09/10] Refactor TranscriptProcessor into user and assistant processors --- CHANGELOG.md | 3 +- .../28a-transcription-processor-openai.py | 23 +- .../28b-transcript-processor-anthropic.py | 31 ++- .../28c-transcription-processor-gemini.py | 41 ++-- src/pipecat/frames/frames.py | 7 - .../processors/aggregators/llm_response.py | 6 - .../processors/transcript_processor.py | 221 +++++++++++------- src/pipecat/services/google.py | 5 - 8 files changed, 182 insertions(+), 155 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5e784cdba..e3f2c2dc7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,9 +25,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Messages emitted with ISO 8601 timestamps indicating when they were spoken. - Supports all LLM formats (OpenAI, Anthropic, Google) via standard message format. + - Shared event handling for both user and assistant transcript updates. - New examples: `28a-transcription-processor-openai.py`, `28b-transcription-processor-anthropic.py`, and - `28c-transcription-processor-gemini.py`. 
+ `28c-transcription-processor-gemini.py` - Add support for more languages to ElevenLabs (Arabic, Croatian, Filipino, Tamil) and PlayHT (Afrikans, Albanian, Amharic, Arabic, Bengali, Croatian, diff --git a/examples/foundational/28a-transcription-processor-openai.py b/examples/foundational/28a-transcription-processor-openai.py index 1e8463b69..0966c882f 100644 --- a/examples/foundational/28a-transcription-processor-openai.py +++ b/examples/foundational/28a-transcription-processor-openai.py @@ -15,13 +15,14 @@ from runner import configure from pipecat.audio.vad.silero import SileroVADAnalyzer -from pipecat.frames.frames import LLMMessagesFrame, TranscriptionMessage, TranscriptionUpdateFrame +from pipecat.frames.frames import TranscriptionMessage, TranscriptionUpdateFrame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext from pipecat.processors.transcript_processor import TranscriptProcessor from pipecat.services.cartesia import CartesiaTTSService +from pipecat.services.deepgram import DeepgramSTTService from pipecat.services.openai import OpenAILLMService from pipecat.transports.services.daily import DailyParams, DailyTransport @@ -57,12 +58,6 @@ async def on_transcript_update( timestamp = f"[{msg.timestamp}] " if msg.timestamp else "" logger.info(f"{timestamp}{msg.role}: {msg.content}") - # # Log the full transcript - # logger.info("Full transcript:") - # for msg in self.messages: - # timestamp = f"[{msg.timestamp}] " if msg.timestamp else "" - # logger.info(f"{timestamp}{msg.role}: {msg.content}") - async def main(): async with aiohttp.ClientSession() as session: @@ -70,16 +65,18 @@ async def main(): transport = DailyTransport( room_url, - token, + None, "Respond bot", DailyParams( audio_out_enabled=True, - transcription_enabled=True, vad_enabled=True, vad_analyzer=SileroVADAnalyzer(), + vad_audio_passthrough=True, ), ) + stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) + tts = CartesiaTTSService( api_key=os.getenv("CARTESIA_API_KEY"), voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady @@ -101,23 +98,25 @@ async def main(): context_aggregator = llm.create_context_aggregator(context) # Create transcript processor and handler - transcript_processor = TranscriptProcessor() + transcript = TranscriptProcessor() transcript_handler = TranscriptHandler() # Register event handler for transcript updates - @transcript_processor.event_handler("on_transcript_update") + @transcript.event_handler("on_transcript_update") async def on_transcript_update(processor, frame): await transcript_handler.on_transcript_update(processor, frame) pipeline = Pipeline( [ transport.input(), # Transport user input + stt, # STT + transcript.user(), # User transcripts context_aggregator.user(), # User responses llm, # LLM tts, # TTS transport.output(), # Transport bot output context_aggregator.assistant(), # Assistant spoken responses - transcript_processor, # Process transcripts + transcript.assistant(), # Assistant transcripts ] ) diff --git a/examples/foundational/28b-transcript-processor-anthropic.py b/examples/foundational/28b-transcript-processor-anthropic.py index 626206c5f..066828652 100644 --- a/examples/foundational/28b-transcript-processor-anthropic.py +++ b/examples/foundational/28b-transcript-processor-anthropic.py @@ -15,7 +15,7 @@ from runner import configure from pipecat.audio.vad.silero import 
SileroVADAnalyzer -from pipecat.frames.frames import LLMMessagesFrame, TranscriptionMessage, TranscriptionUpdateFrame +from pipecat.frames.frames import TranscriptionMessage, TranscriptionUpdateFrame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask @@ -23,6 +23,7 @@ from pipecat.processors.transcript_processor import TranscriptProcessor from pipecat.services.anthropic import AnthropicLLMService from pipecat.services.cartesia import CartesiaTTSService +from pipecat.services.deepgram import DeepgramSTTService from pipecat.transports.services.daily import DailyParams, DailyTransport load_dotenv(override=True) @@ -57,12 +58,6 @@ async def on_transcript_update( timestamp = f"[{msg.timestamp}] " if msg.timestamp else "" logger.info(f"{timestamp}{msg.role}: {msg.content}") - # # Log the full transcript - # logger.info("Full transcript:") - # for msg in self.messages: - # timestamp = f"[{msg.timestamp}] " if msg.timestamp else "" - # logger.info(f"{timestamp}{msg.role}: {msg.content}") - async def main(): async with aiohttp.ClientSession() as session: @@ -70,16 +65,18 @@ async def main(): transport = DailyTransport( room_url, - token, + None, "Respond bot", DailyParams( audio_out_enabled=True, - transcription_enabled=True, vad_enabled=True, vad_analyzer=SileroVADAnalyzer(), + vad_audio_passthrough=True, ), ) + stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) + tts = CartesiaTTSService( api_key=os.getenv("CARTESIA_API_KEY"), voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady @@ -101,23 +98,20 @@ async def main(): context_aggregator = llm.create_context_aggregator(context) # Create transcript processor and handler - transcript_processor = TranscriptProcessor() + transcript = TranscriptProcessor() transcript_handler = TranscriptHandler() - # Register event handler for transcript updates - @transcript_processor.event_handler("on_transcript_update") - async def on_transcript_update(processor, frame): - await transcript_handler.on_transcript_update(processor, frame) - pipeline = Pipeline( [ transport.input(), # Transport user input + stt, # STT + transcript.user(), # User transcripts context_aggregator.user(), # User responses llm, # LLM tts, # TTS transport.output(), # Transport bot output context_aggregator.assistant(), # Assistant spoken responses - transcript_processor, # Process transcripts + transcript.assistant(), # Assistant transcripts ] ) @@ -129,6 +123,11 @@ async def on_first_participant_joined(transport, participant): # Kick off the conversation. 
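            # Queuing the aggregator's context frame (rather than a raw
            # LLMMessagesFrame) sends the full shared context, including the
            # system message, through the pipeline.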
await task.queue_frames([context_aggregator.user().get_context_frame()]) + # Register event handler for transcript updates + @transcript.event_handler("on_transcript_update") + async def on_transcript_update(processor, frame): + await transcript_handler.on_transcript_update(processor, frame) + runner = PipelineRunner() await runner.run(task) diff --git a/examples/foundational/28c-transcription-processor-gemini.py b/examples/foundational/28c-transcription-processor-gemini.py index bf9448199..6c8118c57 100644 --- a/examples/foundational/28c-transcription-processor-gemini.py +++ b/examples/foundational/28c-transcription-processor-gemini.py @@ -22,6 +22,7 @@ from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext from pipecat.processors.transcript_processor import TranscriptProcessor from pipecat.services.cartesia import CartesiaTTSService +from pipecat.services.deepgram import DeepgramSTTService from pipecat.services.google import GoogleLLMService from pipecat.services.openai import OpenAILLMContext from pipecat.transports.services.daily import DailyParams, DailyTransport @@ -58,12 +59,6 @@ async def on_transcript_update( timestamp = f"[{msg.timestamp}] " if msg.timestamp else "" logger.info(f"{timestamp}{msg.role}: {msg.content}") - # # Log the full transcript - # logger.info("Full transcript:") - # for msg in self.messages: - # timestamp = f"[{msg.timestamp}] " if msg.timestamp else "" - # logger.info(f"{timestamp}{msg.role}: {msg.content}") - async def main(): async with aiohttp.ClientSession() as session: @@ -71,16 +66,18 @@ async def main(): transport = DailyTransport( room_url, - token, + None, "Respond bot", DailyParams( audio_out_enabled=True, - transcription_enabled=True, vad_enabled=True, vad_analyzer=SileroVADAnalyzer(), + vad_audio_passthrough=True, ), ) + stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) + tts = CartesiaTTSService( api_key=os.getenv("CARTESIA_API_KEY"), voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady @@ -104,23 +101,20 @@ async def main(): context_aggregator = llm.create_context_aggregator(context) # Create transcript processor and handler - transcript_processor = TranscriptProcessor() + transcript = TranscriptProcessor() transcript_handler = TranscriptHandler() - # Register event handler for transcript updates - @transcript_processor.event_handler("on_transcript_update") - async def on_transcript_update(processor, frame): - await transcript_handler.on_transcript_update(processor, frame) - pipeline = Pipeline( [ - transport.input(), - context_aggregator.user(), - llm, - tts, - transport.output(), - context_aggregator.assistant(), - transcript_processor, + transport.input(), # Transport user input + stt, # STT + transcript.user(), # User transcripts + context_aggregator.user(), # User responses + llm, # LLM + tts, # TTS + transport.output(), # Transport bot output + context_aggregator.assistant(), # Assistant spoken responses + transcript.assistant(), # Assistant transcripts ] ) @@ -139,6 +133,11 @@ async def on_first_participant_joined(transport, participant): # Kick off the conversation. 
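            # As in 28a and 28b, starting from the shared context keeps the
            # history consistent for the transcript processors downstream.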
await task.queue_frames([context_aggregator.user().get_context_frame()]) + # Register event handler for transcript updates + @transcript.event_handler("on_transcript_update") + async def on_transcript_update(processor, frame): + await transcript_handler.on_transcript_update(processor, frame) + runner = PipelineRunner() await runner.run(task) diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py index d02112a6f..e9d942b4e 100644 --- a/src/pipecat/frames/frames.py +++ b/src/pipecat/frames/frames.py @@ -207,13 +207,6 @@ def __str__(self): return f"{self.name}(user: {self.user_id}, text: [{self.text}], language: {self.language}, timestamp: {self.timestamp})" -@dataclass -class OpenAILLMContextUserTimestampFrame(DataFrame): - """Timestamp information for user message in LLM context.""" - - timestamp: str - - @dataclass class OpenAILLMContextAssistantTimestampFrame(DataFrame): """Timestamp information for assistant message in LLM context.""" diff --git a/src/pipecat/processors/aggregators/llm_response.py b/src/pipecat/processors/aggregators/llm_response.py index 612375da2..479746471 100644 --- a/src/pipecat/processors/aggregators/llm_response.py +++ b/src/pipecat/processors/aggregators/llm_response.py @@ -15,7 +15,6 @@ LLMMessagesFrame, LLMMessagesUpdateFrame, LLMSetToolsFrame, - OpenAILLMContextUserTimestampFrame, StartInterruptionFrame, TextFrame, TranscriptionFrame, @@ -27,7 +26,6 @@ OpenAILLMContextFrame, ) from pipecat.processors.frame_processor import FrameDirection, FrameProcessor -from pipecat.utils.time import time_now_iso8601 class LLMResponseAggregator(FrameProcessor): @@ -291,10 +289,6 @@ async def _push_aggregation(self): frame = OpenAILLMContextFrame(self._context) await self.push_frame(frame) - # Push timestamp frame with current time - timestamp_frame = OpenAILLMContextUserTimestampFrame(timestamp=time_now_iso8601()) - await self.push_frame(timestamp_frame) - # Reset our accumulator state. self._reset() diff --git a/src/pipecat/processors/transcript_processor.py b/src/pipecat/processors/transcript_processor.py index be53cd79a..a95e502a8 100644 --- a/src/pipecat/processors/transcript_processor.py +++ b/src/pipecat/processors/transcript_processor.py @@ -4,7 +4,8 @@ # SPDX-License-Identifier: BSD 2-Clause License # -from typing import List, Optional +from abc import ABC, abstractmethod +from typing import List from loguru import logger @@ -12,7 +13,7 @@ ErrorFrame, Frame, OpenAILLMContextAssistantTimestampFrame, - OpenAILLMContextUserTimestampFrame, + TranscriptionFrame, TranscriptionMessage, TranscriptionUpdateFrame, ) @@ -20,55 +21,72 @@ from pipecat.processors.frame_processor import FrameDirection, FrameProcessor -class TranscriptProcessor(FrameProcessor): - """Processes LLM context frames to generate timestamped conversation transcripts. +class BaseTranscriptProcessor(FrameProcessor, ABC): + """Base class for processing conversation transcripts. - This processor monitors OpenAILLMContextFrame frames and their corresponding - timestamp frames to build a chronological conversation transcript. Messages are - stored by role until their matching timestamp frame arrives, then emitted via - TranscriptionUpdateFrame. + Provides common functionality for handling transcript messages and updates. + """ - Each LLM context (OpenAI, Anthropic, Google) provides conversion to the standard format: - [ - { - "role": "user", - "content": [{"type": "text", "text": "Hi, how are you?"}] - }, - { - "role": "assistant", - "content": [{"type": "text", "text": "Great! 
And you?"}] - } - ] + def __init__(self, **kwargs): + """Initialize processor with empty message store.""" + super().__init__(**kwargs) + self._processed_messages: List[TranscriptionMessage] = [] + self._register_event_handler("on_transcript_update") - Events: - on_transcript_update: Emitted when timestamped messages are available. - Args: TranscriptionUpdateFrame containing timestamped messages. + async def _emit_update(self, messages: List[TranscriptionMessage]): + """Emit transcript updates for new messages. - Example: - ```python - transcript_processor = TranscriptProcessor() + Args: + messages: New messages to emit in update + """ + if messages: + self._processed_messages.extend(messages) + update_frame = TranscriptionUpdateFrame(messages=messages) + await self._call_event_handler("on_transcript_update", update_frame) + await self.push_frame(update_frame) - @transcript_processor.event_handler("on_transcript_update") - async def on_transcript_update(processor, frame): - for msg in frame.messages: - print(f"[{msg.timestamp}] {msg.role}: {msg.content}") - ``` - """ + @abstractmethod + async def process_frame(self, frame: Frame, direction: FrameDirection): + """Process incoming frames to build conversation transcript. - def __init__(self, **kwargs): - """Initialize the transcript processor. + Args: + frame: Input frame to process + direction: Frame processing direction + """ + await super().process_frame(frame, direction) + + +class UserTranscriptProcessor(BaseTranscriptProcessor): + """Processes user transcription frames into timestamped conversation messages.""" + + async def process_frame(self, frame: Frame, direction: FrameDirection): + """Process TranscriptionFrames into user conversation messages. Args: - **kwargs: Additional arguments passed to FrameProcessor + frame: Input frame to process + direction: Frame processing direction """ + await super().process_frame(frame, direction) + + if isinstance(frame, TranscriptionFrame): + message = TranscriptionMessage( + role="user", content=frame.text, timestamp=frame.timestamp + ) + await self._emit_update([message]) + + await self.push_frame(frame, direction) + + +class AssistantTranscriptProcessor(BaseTranscriptProcessor): + """Processes assistant LLM context frames into timestamped conversation messages.""" + + def __init__(self, **kwargs): + """Initialize processor with empty message stores.""" super().__init__(**kwargs) - self._processed_messages: List[TranscriptionMessage] = [] - self._register_event_handler("on_transcript_update") - self._pending_user_messages: List[TranscriptionMessage] = [] self._pending_assistant_messages: List[TranscriptionMessage] = [] def _extract_messages(self, messages: List[dict]) -> List[TranscriptionMessage]: - """Extract conversation messages from standard format. + """Extract assistant messages from the OpenAI standard message format. 
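+
+        Only assistant-role messages are extracted here; user messages are
+        emitted separately by UserTranscriptProcessor as transcription frames
+        arrive.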
Args: messages: List of messages in OpenAI format, which can be either: @@ -80,21 +98,14 @@ def _extract_messages(self, messages: List[dict]) -> List[TranscriptionMessage]: """ result = [] for msg in messages: - # Only process user and assistant messages - if msg["role"] not in ("user", "assistant"): - continue - - if "content" not in msg: - logger.warning(f"Message missing content field: {msg}") + if msg["role"] != "assistant": continue content = msg.get("content") if isinstance(content, str): - # Handle simple string content if content: - result.append(TranscriptionMessage(role=msg["role"], content=content)) + result.append(TranscriptionMessage(role="assistant", content=content)) elif isinstance(content, list): - # Handle structured content text_parts = [] for part in content: if isinstance(part, dict) and part.get("type") == "text": @@ -102,13 +113,13 @@ def _extract_messages(self, messages: List[dict]) -> List[TranscriptionMessage]: if text_parts: result.append( - TranscriptionMessage(role=msg["role"], content=" ".join(text_parts)) + TranscriptionMessage(role="assistant", content=" ".join(text_parts)) ) return result def _find_new_messages(self, current: List[TranscriptionMessage]) -> List[TranscriptionMessage]: - """Find messages in current that aren't in self._processed_messages. + """Find unprocessed messages from current list. Args: current: List of current messages @@ -126,28 +137,15 @@ def _find_new_messages(self, current: List[TranscriptionMessage]) -> List[Transc return current[processed_len:] async def process_frame(self, frame: Frame, direction: FrameDirection): - """Process frames to build a timestamped conversation transcript. - - Handles three frame types in sequence: - 1. OpenAILLMContextFrame: Contains new messages to be timestamped - 2. OpenAILLMContextUserTimestampFrame: Timestamp for user messages - 3. OpenAILLMContextAssistantTimestampFrame: Timestamp for assistant messages - - Messages are stored by role until their corresponding timestamp frame arrives. - When a timestamp frame is received, the matching messages are timestamped and - emitted in chronological order via TranscriptionUpdateFrame. + """Process frames into assistant conversation messages. 
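+
+        Extracted messages are buffered until the matching
+        OpenAILLMContextAssistantTimestampFrame arrives, then emitted with
+        that timestamp via a TranscriptionUpdateFrame.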
Args: - frame: The frame to process + frame: Input frame to process direction: Frame processing direction - - Raises: - ErrorFrame: If message processing fails """ await super().process_frame(frame, direction) if isinstance(frame, OpenAILLMContextFrame): - # Extract and store messages by role standard_messages = [] for msg in frame.context.messages: converted = frame.context.to_standard_messages(msg) @@ -155,34 +153,83 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): current_messages = self._extract_messages(standard_messages) new_messages = self._find_new_messages(current_messages) - - # Store new messages by role - for msg in new_messages: - if msg.role == "user": - self._pending_user_messages.append(msg) - elif msg.role == "assistant": - self._pending_assistant_messages.append(msg) - - elif isinstance(frame, OpenAILLMContextUserTimestampFrame): - # Process pending user messages with timestamp - if self._pending_user_messages: - for msg in self._pending_user_messages: - msg.timestamp = frame.timestamp - self._processed_messages.extend(self._pending_user_messages) - update_frame = TranscriptionUpdateFrame(messages=self._pending_user_messages) - await self._call_event_handler("on_transcript_update", update_frame) - await self.push_frame(update_frame) - self._pending_user_messages = [] + self._pending_assistant_messages.extend(new_messages) elif isinstance(frame, OpenAILLMContextAssistantTimestampFrame): - # Process pending assistant messages with timestamp if self._pending_assistant_messages: for msg in self._pending_assistant_messages: msg.timestamp = frame.timestamp - self._processed_messages.extend(self._pending_assistant_messages) - update_frame = TranscriptionUpdateFrame(messages=self._pending_assistant_messages) - await self._call_event_handler("on_transcript_update", update_frame) - await self.push_frame(update_frame) + await self._emit_update(self._pending_assistant_messages) self._pending_assistant_messages = [] await self.push_frame(frame, direction) + + +class TranscriptProcessor: + """Factory for creating and managing transcript processors. + + Provides unified access to user and assistant transcript processors + with shared event handling. + + Example: + ```python + transcript = TranscriptProcessor() + + pipeline = Pipeline( + [ + transport.input(), + stt, + transcript.user(), # User transcripts + context_aggregator.user(), + llm, + tts, + transport.output(), + context_aggregator.assistant(), + transcript.assistant(), # Assistant transcripts + ] + ) + + @transcript.event_handler("on_transcript_update") + async def handle_update(processor, frame): + print(f"New messages: {frame.messages}") + ``` + """ + + def __init__(self, **kwargs): + """Initialize factory with user and assistant processors.""" + self._user_processor = UserTranscriptProcessor(**kwargs) + self._assistant_processor = AssistantTranscriptProcessor(**kwargs) + self._event_handlers = {} + + def user(self) -> UserTranscriptProcessor: + """Get the user transcript processor.""" + return self._user_processor + + def assistant(self) -> AssistantTranscriptProcessor: + """Get the assistant transcript processor.""" + return self._assistant_processor + + def event_handler(self, event_name: str): + """Register event handler for both processors. 
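+
+        A handler registered once through this decorator is attached to both
+        the user and assistant processors, so a single callback observes the
+        whole conversation.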
+ + Args: + event_name: Name of event to handle + + Returns: + Decorator function that registers handler with both processors + """ + + def decorator(handler): + self._event_handlers[event_name] = handler + + @self._user_processor.event_handler(event_name) + async def user_handler(processor, frame): + return await handler(processor, frame) + + @self._assistant_processor.event_handler(event_name) + async def assistant_handler(processor, frame): + return await handler(processor, frame) + + return handler + + return decorator diff --git a/src/pipecat/services/google.py b/src/pipecat/services/google.py index c7d32eff3..6bbf1d000 100644 --- a/src/pipecat/services/google.py +++ b/src/pipecat/services/google.py @@ -24,7 +24,6 @@ LLMMessagesFrame, LLMUpdateSettingsFrame, OpenAILLMContextAssistantTimestampFrame, - OpenAILLMContextUserTimestampFrame, TextFrame, TTSAudioRawFrame, TTSStartedFrame, @@ -234,10 +233,6 @@ async def _push_aggregation(self): frame = OpenAILLMContextFrame(self._context) await self.push_frame(frame) - # Push timestamp frame with current time - timestamp_frame = OpenAILLMContextUserTimestampFrame(timestamp=time_now_iso8601()) - await self.push_frame(timestamp_frame) - # Reset our accumulator state. self._reset() From 8a7a61914edbd80083d4215006b29f6c22cd4c39 Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Tue, 17 Dec 2024 22:30:23 -0500 Subject: [PATCH 10/10] Code review feedback --- CHANGELOG.md | 3 +- .../processors/transcript_processor.py | 75 ++++++++++++------- 2 files changed, 47 insertions(+), 31 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e3f2c2dc7..5e784cdba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,10 +25,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Messages emitted with ISO 8601 timestamps indicating when they were spoken. - Supports all LLM formats (OpenAI, Anthropic, Google) via standard message format. - - Shared event handling for both user and assistant transcript updates. - New examples: `28a-transcription-processor-openai.py`, `28b-transcription-processor-anthropic.py`, and - `28c-transcription-processor-gemini.py` + `28c-transcription-processor-gemini.py`. - Add support for more languages to ElevenLabs (Arabic, Croatian, Filipino, Tamil) and PlayHT (Afrikans, Albanian, Amharic, Arabic, Bengali, Croatian, diff --git a/src/pipecat/processors/transcript_processor.py b/src/pipecat/processors/transcript_processor.py index a95e502a8..1e5e97c59 100644 --- a/src/pipecat/processors/transcript_processor.py +++ b/src/pipecat/processors/transcript_processor.py @@ -4,13 +4,9 @@ # SPDX-License-Identifier: BSD 2-Clause License # -from abc import ABC, abstractmethod from typing import List -from loguru import logger - from pipecat.frames.frames import ( - ErrorFrame, Frame, OpenAILLMContextAssistantTimestampFrame, TranscriptionFrame, @@ -21,7 +17,7 @@ from pipecat.processors.frame_processor import FrameDirection, FrameProcessor -class BaseTranscriptProcessor(FrameProcessor, ABC): +class BaseTranscriptProcessor(FrameProcessor): """Base class for processing conversation transcripts. Provides common functionality for handling transcript messages and updates. @@ -45,16 +41,6 @@ async def _emit_update(self, messages: List[TranscriptionMessage]): await self._call_event_handler("on_transcript_update", update_frame) await self.push_frame(update_frame) - @abstractmethod - async def process_frame(self, frame: Frame, direction: FrameDirection): - """Process incoming frames to build conversation transcript. 
- - Args: - frame: Input frame to process - direction: Frame processing direction - """ - await super().process_frame(frame, direction) - class UserTranscriptProcessor(BaseTranscriptProcessor): """Processes user transcription frames into timestamped conversation messages.""" @@ -195,18 +181,44 @@ async def handle_update(processor, frame): ``` """ - def __init__(self, **kwargs): - """Initialize factory with user and assistant processors.""" - self._user_processor = UserTranscriptProcessor(**kwargs) - self._assistant_processor = AssistantTranscriptProcessor(**kwargs) + def __init__(self): + """Initialize factory.""" + self._user_processor = None + self._assistant_processor = None self._event_handlers = {} - def user(self) -> UserTranscriptProcessor: - """Get the user transcript processor.""" + def user(self, **kwargs) -> UserTranscriptProcessor: + """Get the user transcript processor. + + Args: + **kwargs: Arguments specific to UserTranscriptProcessor + """ + if self._user_processor is None: + self._user_processor = UserTranscriptProcessor(**kwargs) + # Apply any registered event handlers + for event_name, handler in self._event_handlers.items(): + + @self._user_processor.event_handler(event_name) + async def user_handler(processor, frame): + return await handler(processor, frame) + return self._user_processor - def assistant(self) -> AssistantTranscriptProcessor: - """Get the assistant transcript processor.""" + def assistant(self, **kwargs) -> AssistantTranscriptProcessor: + """Get the assistant transcript processor. + + Args: + **kwargs: Arguments specific to AssistantTranscriptProcessor + """ + if self._assistant_processor is None: + self._assistant_processor = AssistantTranscriptProcessor(**kwargs) + # Apply any registered event handlers + for event_name, handler in self._event_handlers.items(): + + @self._assistant_processor.event_handler(event_name) + async def assistant_handler(processor, frame): + return await handler(processor, frame) + return self._assistant_processor def event_handler(self, event_name: str): @@ -222,13 +234,18 @@ def event_handler(self, event_name: str): def decorator(handler): self._event_handlers[event_name] = handler - @self._user_processor.event_handler(event_name) - async def user_handler(processor, frame): - return await handler(processor, frame) + # Apply handler to existing processors if they exist + if self._user_processor: + + @self._user_processor.event_handler(event_name) + async def user_handler(processor, frame): + return await handler(processor, frame) + + if self._assistant_processor: - @self._assistant_processor.event_handler(event_name) - async def assistant_handler(processor, frame): - return await handler(processor, frame) + @self._assistant_processor.event_handler(event_name) + async def assistant_handler(processor, frame): + return await handler(processor, frame) return handler
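
Taken together, the final API from this series looks like the minimal sketch below: one
TranscriptProcessor factory, a shared handler registered once, and the user and assistant
processors placed around the context aggregators. This is an illustration built on the modules
added in these patches; the `TranscriptHandler` log-to-file behavior and the `transcript.log`
path are illustrative additions, not part of the patch set.

```python
from typing import List

from pipecat.frames.frames import TranscriptionMessage, TranscriptionUpdateFrame
from pipecat.processors.transcript_processor import TranscriptProcessor


class TranscriptHandler:
    """Collects timestamped messages and appends them to a log file."""

    def __init__(self, log_path: str = "transcript.log"):  # illustrative path
        self.messages: List[TranscriptionMessage] = []
        self.log_path = log_path

    async def on_transcript_update(self, processor, frame: TranscriptionUpdateFrame):
        # frame.messages contains only the newly added messages
        self.messages.extend(frame.messages)
        # Naive blocking write; fine for a demo, not for production
        with open(self.log_path, "a") as f:
            for msg in frame.messages:
                timestamp = f"[{msg.timestamp}] " if msg.timestamp else ""
                f.write(f"{timestamp}{msg.role}: {msg.content}\n")


transcript = TranscriptProcessor()
handler = TranscriptHandler()


# Registered once; applied to both processors, even if they are created later
@transcript.event_handler("on_transcript_update")
async def on_transcript_update(processor, frame):
    await handler.on_transcript_update(processor, frame)

# In a pipeline, transcript.user() sits after the STT service and
# transcript.assistant() after context_aggregator.assistant(), as in
# examples 28a-28c.
```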