diff --git a/CHANGELOG.md b/CHANGELOG.md index 998066fba..ea9360155 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,7 +21,22 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 `audioop-lts` (https://github.com/AbstractUmbra/audioop) to provide the same functionality. -- Added support for more languages to ElevenLabs (Arabic, Croatian, Filipino, +- Added timestamped conversation transcript support: + + - New `TranscriptProcessor` factory provides access to user and assistant + transcript processors. + - `UserTranscriptProcessor` processes user speech with timestamps from + transcription. + - `AssistantTranscriptProcessor` processes assistant responses with LLM + context timestamps. + - Messages emitted with ISO 8601 timestamps indicating when they were spoken. + - Supports all LLM formats (OpenAI, Anthropic, Google) via standard message + format. + - New examples: `28a-transcription-processor-openai.py`, + `28b-transcription-processor-anthropic.py`, and + `28c-transcription-processor-gemini.py`. + +- Add support for more languages to ElevenLabs (Arabic, Croatian, Filipino, Tamil) and PlayHT (Afrikans, Albanian, Amharic, Arabic, Bengali, Croatian, Galician, Hebrew, Mandarin, Serbian, Tagalog, Urdu, Xhosa). diff --git a/examples/foundational/28a-transcription-processor-openai.py b/examples/foundational/28a-transcription-processor-openai.py new file mode 100644 index 000000000..0966c882f --- /dev/null +++ b/examples/foundational/28a-transcription-processor-openai.py @@ -0,0 +1,137 @@ +# +# Copyright (c) 2024, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import asyncio +import os +import sys +from typing import List + +import aiohttp +from dotenv import load_dotenv +from loguru import logger +from runner import configure + +from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.frames.frames import TranscriptionMessage, TranscriptionUpdateFrame +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext +from pipecat.processors.transcript_processor import TranscriptProcessor +from pipecat.services.cartesia import CartesiaTTSService +from pipecat.services.deepgram import DeepgramSTTService +from pipecat.services.openai import OpenAILLMService +from pipecat.transports.services.daily import DailyParams, DailyTransport + +load_dotenv(override=True) + +logger.remove(0) +logger.add(sys.stderr, level="DEBUG") + + +class TranscriptHandler: + """Simple handler to demonstrate transcript processing. + + Maintains a list of conversation messages and logs them with timestamps. + """ + + def __init__(self): + self.messages: List[TranscriptionMessage] = [] + + async def on_transcript_update( + self, processor: TranscriptProcessor, frame: TranscriptionUpdateFrame + ): + """Handle new transcript messages. 
+ + Args: + processor: The TranscriptProcessor that emitted the update + frame: TranscriptionUpdateFrame containing new messages + """ + self.messages.extend(frame.messages) + + # Log the new messages + logger.info("New transcript messages:") + for msg in frame.messages: + timestamp = f"[{msg.timestamp}] " if msg.timestamp else "" + logger.info(f"{timestamp}{msg.role}: {msg.content}") + + +async def main(): + async with aiohttp.ClientSession() as session: + (room_url, token) = await configure(session) + + transport = DailyTransport( + room_url, + None, + "Respond bot", + DailyParams( + audio_out_enabled=True, + vad_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + vad_audio_passthrough=True, + ), + ) + + stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) + + tts = CartesiaTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady + ) + + llm = OpenAILLMService( + api_key=os.getenv("OPENAI_API_KEY"), + model="gpt-4o", + ) + + messages = [ + { + "role": "system", + "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative, helpful, and brief way. Say hello.", + }, + ] + + context = OpenAILLMContext(messages) + context_aggregator = llm.create_context_aggregator(context) + + # Create transcript processor and handler + transcript = TranscriptProcessor() + transcript_handler = TranscriptHandler() + + # Register event handler for transcript updates + @transcript.event_handler("on_transcript_update") + async def on_transcript_update(processor, frame): + await transcript_handler.on_transcript_update(processor, frame) + + pipeline = Pipeline( + [ + transport.input(), # Transport user input + stt, # STT + transcript.user(), # User transcripts + context_aggregator.user(), # User responses + llm, # LLM + tts, # TTS + transport.output(), # Transport bot output + context_aggregator.assistant(), # Assistant spoken responses + transcript.assistant(), # Assistant transcripts + ] + ) + + task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True)) + + @transport.event_handler("on_first_participant_joined") + async def on_first_participant_joined(transport, participant): + await transport.capture_participant_transcription(participant["id"]) + # Kick off the conversation. 
+ await task.queue_frames([context_aggregator.user().get_context_frame()]) + + runner = PipelineRunner() + + await runner.run(task) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/foundational/28b-transcript-processor-anthropic.py b/examples/foundational/28b-transcript-processor-anthropic.py new file mode 100644 index 000000000..066828652 --- /dev/null +++ b/examples/foundational/28b-transcript-processor-anthropic.py @@ -0,0 +1,137 @@ +# +# Copyright (c) 2024, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import asyncio +import os +import sys +from typing import List + +import aiohttp +from dotenv import load_dotenv +from loguru import logger +from runner import configure + +from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.frames.frames import TranscriptionMessage, TranscriptionUpdateFrame +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext +from pipecat.processors.transcript_processor import TranscriptProcessor +from pipecat.services.anthropic import AnthropicLLMService +from pipecat.services.cartesia import CartesiaTTSService +from pipecat.services.deepgram import DeepgramSTTService +from pipecat.transports.services.daily import DailyParams, DailyTransport + +load_dotenv(override=True) + +logger.remove(0) +logger.add(sys.stderr, level="DEBUG") + + +class TranscriptHandler: + """Simple handler to demonstrate transcript processing. + + Maintains a list of conversation messages and logs them with timestamps. + """ + + def __init__(self): + self.messages: List[TranscriptionMessage] = [] + + async def on_transcript_update( + self, processor: TranscriptProcessor, frame: TranscriptionUpdateFrame + ): + """Handle new transcript messages. + + Args: + processor: The TranscriptProcessor that emitted the update + frame: TranscriptionUpdateFrame containing new messages + """ + self.messages.extend(frame.messages) + + # Log the new messages + logger.info("New transcript messages:") + for msg in frame.messages: + timestamp = f"[{msg.timestamp}] " if msg.timestamp else "" + logger.info(f"{timestamp}{msg.role}: {msg.content}") + + +async def main(): + async with aiohttp.ClientSession() as session: + (room_url, token) = await configure(session) + + transport = DailyTransport( + room_url, + None, + "Respond bot", + DailyParams( + audio_out_enabled=True, + vad_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + vad_audio_passthrough=True, + ), + ) + + stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) + + tts = CartesiaTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady + ) + + llm = AnthropicLLMService( + api_key=os.getenv("ANTHROPIC_API_KEY"), model="claude-3-5-sonnet-20241022" + ) + + messages = [ + { + "role": "system", + "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. 
Respond to what the user said in a creative, helpful, and brief way.", + }, + {"role": "user", "content": "Say hello."}, + ] + + context = OpenAILLMContext(messages) + context_aggregator = llm.create_context_aggregator(context) + + # Create transcript processor and handler + transcript = TranscriptProcessor() + transcript_handler = TranscriptHandler() + + pipeline = Pipeline( + [ + transport.input(), # Transport user input + stt, # STT + transcript.user(), # User transcripts + context_aggregator.user(), # User responses + llm, # LLM + tts, # TTS + transport.output(), # Transport bot output + context_aggregator.assistant(), # Assistant spoken responses + transcript.assistant(), # Assistant transcripts + ] + ) + + task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True)) + + @transport.event_handler("on_first_participant_joined") + async def on_first_participant_joined(transport, participant): + await transport.capture_participant_transcription(participant["id"]) + # Kick off the conversation. + await task.queue_frames([context_aggregator.user().get_context_frame()]) + + # Register event handler for transcript updates + @transcript.event_handler("on_transcript_update") + async def on_transcript_update(processor, frame): + await transcript_handler.on_transcript_update(processor, frame) + + runner = PipelineRunner() + + await runner.run(task) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/foundational/28c-transcription-processor-gemini.py b/examples/foundational/28c-transcription-processor-gemini.py new file mode 100644 index 000000000..6c8118c57 --- /dev/null +++ b/examples/foundational/28c-transcription-processor-gemini.py @@ -0,0 +1,147 @@ +# +# Copyright (c) 2024, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import asyncio +import os +import sys +from typing import List + +import aiohttp +from dotenv import load_dotenv +from loguru import logger +from runner import configure + +from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.frames.frames import TranscriptionMessage, TranscriptionUpdateFrame +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext +from pipecat.processors.transcript_processor import TranscriptProcessor +from pipecat.services.cartesia import CartesiaTTSService +from pipecat.services.deepgram import DeepgramSTTService +from pipecat.services.google import GoogleLLMService +from pipecat.services.openai import OpenAILLMContext +from pipecat.transports.services.daily import DailyParams, DailyTransport + +load_dotenv(override=True) + +logger.remove(0) +logger.add(sys.stderr, level="DEBUG") + + +class TranscriptHandler: + """Simple handler to demonstrate transcript processing. + + Maintains a list of conversation messages and logs them with timestamps. + """ + + def __init__(self): + self.messages: List[TranscriptionMessage] = [] + + async def on_transcript_update( + self, processor: TranscriptProcessor, frame: TranscriptionUpdateFrame + ): + """Handle new transcript messages. 
+ + Args: + processor: The TranscriptProcessor that emitted the update + frame: TranscriptionUpdateFrame containing new messages + """ + self.messages.extend(frame.messages) + + # Log the new messages + logger.info("New transcript messages:") + for msg in frame.messages: + timestamp = f"[{msg.timestamp}] " if msg.timestamp else "" + logger.info(f"{timestamp}{msg.role}: {msg.content}") + + +async def main(): + async with aiohttp.ClientSession() as session: + (room_url, token) = await configure(session) + + transport = DailyTransport( + room_url, + None, + "Respond bot", + DailyParams( + audio_out_enabled=True, + vad_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + vad_audio_passthrough=True, + ), + ) + + stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) + + tts = CartesiaTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady + ) + + llm = GoogleLLMService( + model="models/gemini-2.0-flash-exp", + # model="gemini-exp-1114", + api_key=os.getenv("GOOGLE_API_KEY"), + ) + + messages = [ + { + "role": "system", + "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative, helpful, and brief way.", + }, + {"role": "user", "content": "Say hello."}, + ] + + context = OpenAILLMContext(messages) + context_aggregator = llm.create_context_aggregator(context) + + # Create transcript processor and handler + transcript = TranscriptProcessor() + transcript_handler = TranscriptHandler() + + pipeline = Pipeline( + [ + transport.input(), # Transport user input + stt, # STT + transcript.user(), # User transcripts + context_aggregator.user(), # User responses + llm, # LLM + tts, # TTS + transport.output(), # Transport bot output + context_aggregator.assistant(), # Assistant spoken responses + transcript.assistant(), # Assistant transcripts + ] + ) + + task = PipelineTask( + pipeline, + PipelineParams( + allow_interruptions=True, + enable_metrics=True, + enable_usage_metrics=True, + ), + ) + + @transport.event_handler("on_first_participant_joined") + async def on_first_participant_joined(transport, participant): + await transport.capture_participant_transcription(participant["id"]) + # Kick off the conversation. + await task.queue_frames([context_aggregator.user().get_context_frame()]) + + # Register event handler for transcript updates + @transcript.event_handler("on_transcript_update") + async def on_transcript_update(processor, frame): + await transcript_handler.on_transcript_update(processor, frame) + + runner = PipelineRunner() + + await runner.run(task) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py index d3792f537..e9d942b4e 100644 --- a/src/pipecat/frames/frames.py +++ b/src/pipecat/frames/frames.py @@ -5,7 +5,7 @@ # from dataclasses import dataclass, field -from typing import Any, List, Mapping, Optional, Tuple +from typing import Any, List, Literal, Mapping, Optional, Tuple from pipecat.audio.vad.vad_analyzer import VADParams from pipecat.clocks.base_clock import BaseClock @@ -195,7 +195,8 @@ def __str__(self): @dataclass class InterimTranscriptionFrame(TextFrame): """A text frame with interim transcription-specific data. 
Will be placed in - the transport's receive queue when a participant speaks.""" + the transport's receive queue when a participant speaks. + """ text: str user_id: str @@ -206,6 +207,69 @@ def __str__(self): return f"{self.name}(user: {self.user_id}, text: [{self.text}], language: {self.language}, timestamp: {self.timestamp})" +@dataclass +class OpenAILLMContextAssistantTimestampFrame(DataFrame): + """Timestamp information for assistant message in LLM context.""" + + timestamp: str + + +@dataclass +class TranscriptionMessage: + """A message in a conversation transcript containing the role and content. + + Messages are in standard format with roles normalized to user/assistant. + """ + + role: Literal["user", "assistant"] + content: str + timestamp: str | None = None + + +@dataclass +class TranscriptionUpdateFrame(DataFrame): + """A frame containing new messages added to the conversation transcript. + + This frame is emitted when new messages are added to the conversation history, + containing only the newly added messages rather than the full transcript. + Messages have normalized roles (user/assistant) regardless of the LLM service used. + Messages are always in the OpenAI standard message format, which supports both: + + Simple format: + [ + { + "role": "user", + "content": "Hi, how are you?" + }, + { + "role": "assistant", + "content": "Great! And you?" + } + ] + + Content list format: + [ + { + "role": "user", + "content": [{"type": "text", "text": "Hi, how are you?"}] + }, + { + "role": "assistant", + "content": [{"type": "text", "text": "Great! And you?"}] + } + ] + + OpenAI supports both formats. Anthropic and Google messages are converted to the + content list format. + """ + + messages: List[TranscriptionMessage] + + def __str__(self): + pts = format_pts(self.pts) + return f"{self.name}(pts: {pts}, messages: {len(self.messages)})" + + @dataclass class LLMMessagesFrame(DataFrame): """A frame containing a list of LLM messages. Used to signal that an LLM @@ -546,7 +610,8 @@ class EndFrame(ControlFrame): @dataclass class LLMFullResponseStartFrame(ControlFrame): """Used to indicate the beginning of an LLM response. Following by one or - more TextFrame and a final LLMFullResponseEndFrame.""" + more TextFrame and a final LLMFullResponseEndFrame. + """ pass diff --git a/src/pipecat/processors/aggregators/openai_llm_context.py b/src/pipecat/processors/aggregators/openai_llm_context.py index 6e7474c17..853ac1baa 100644 --- a/src/pipecat/processors/aggregators/openai_llm_context.py +++ b/src/pipecat/processors/aggregators/openai_llm_context.py @@ -113,10 +113,38 @@ def get_messages_for_logging(self) -> str: return json.dumps(msgs) def from_standard_message(self, message): + """Convert from OpenAI message format to OpenAI message format (passthrough). + + OpenAI's format allows both simple string content and structured content: + - Simple: {"role": "user", "content": "Hello"} + - Structured: {"role": "user", "content": [{"type": "text", "text": "Hello"}]} + + Since OpenAI is our standard format, this is a passthrough function. + + Args: + message (dict): Message in OpenAI format + + Returns: + dict: Same message, unchanged + """ return message - # convert a message in this LLM's format to one or more messages in OpenAI format def to_standard_messages(self, obj) -> list: + """Convert from OpenAI message format to OpenAI message format (passthrough). + + OpenAI's format is our standard format throughout Pipecat. 
This function + returns a list containing the original message to maintain consistency with + other LLM services that may need to return multiple messages. + + Args: + obj (dict): Message in OpenAI format with either: + - Simple content: {"role": "user", "content": "Hello"} + - List content: {"role": "user", "content": [{"type": "text", "text": "Hello"}]} + + Returns: + list: List containing the original messages, preserving whether + the content was in simple string or structured list format + """ return [obj] def get_messages_for_initializing_history(self): diff --git a/src/pipecat/processors/transcript_processor.py b/src/pipecat/processors/transcript_processor.py new file mode 100644 index 000000000..1e5e97c59 --- /dev/null +++ b/src/pipecat/processors/transcript_processor.py @@ -0,0 +1,252 @@ +# +# Copyright (c) 2024, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +from typing import List + +from pipecat.frames.frames import ( + Frame, + OpenAILLMContextAssistantTimestampFrame, + TranscriptionFrame, + TranscriptionMessage, + TranscriptionUpdateFrame, +) +from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContextFrame +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor + + +class BaseTranscriptProcessor(FrameProcessor): + """Base class for processing conversation transcripts. + + Provides common functionality for handling transcript messages and updates. + """ + + def __init__(self, **kwargs): + """Initialize processor with empty message store.""" + super().__init__(**kwargs) + self._processed_messages: List[TranscriptionMessage] = [] + self._register_event_handler("on_transcript_update") + + async def _emit_update(self, messages: List[TranscriptionMessage]): + """Emit transcript updates for new messages. + + Args: + messages: New messages to emit in update + """ + if messages: + self._processed_messages.extend(messages) + update_frame = TranscriptionUpdateFrame(messages=messages) + await self._call_event_handler("on_transcript_update", update_frame) + await self.push_frame(update_frame) + + +class UserTranscriptProcessor(BaseTranscriptProcessor): + """Processes user transcription frames into timestamped conversation messages.""" + + async def process_frame(self, frame: Frame, direction: FrameDirection): + """Process TranscriptionFrames into user conversation messages. + + Args: + frame: Input frame to process + direction: Frame processing direction + """ + await super().process_frame(frame, direction) + + if isinstance(frame, TranscriptionFrame): + message = TranscriptionMessage( + role="user", content=frame.text, timestamp=frame.timestamp + ) + await self._emit_update([message]) + + await self.push_frame(frame, direction) + + +class AssistantTranscriptProcessor(BaseTranscriptProcessor): + """Processes assistant LLM context frames into timestamped conversation messages.""" + + def __init__(self, **kwargs): + """Initialize processor with empty message stores.""" + super().__init__(**kwargs) + self._pending_assistant_messages: List[TranscriptionMessage] = [] + + def _extract_messages(self, messages: List[dict]) -> List[TranscriptionMessage]: + """Extract assistant messages from the OpenAI standard message format. 
+ + Args: + messages: List of messages in OpenAI format, which can be either: + - Simple format: {"role": "user", "content": "Hello"} + - Content list: {"role": "user", "content": [{"type": "text", "text": "Hello"}]} + + Returns: + List[TranscriptionMessage]: Normalized conversation messages + """ + result = [] + for msg in messages: + if msg["role"] != "assistant": + continue + + content = msg.get("content") + if isinstance(content, str): + if content: + result.append(TranscriptionMessage(role="assistant", content=content)) + elif isinstance(content, list): + text_parts = [] + for part in content: + if isinstance(part, dict) and part.get("type") == "text": + text_parts.append(part["text"]) + + if text_parts: + result.append( + TranscriptionMessage(role="assistant", content=" ".join(text_parts)) + ) + + return result + + def _find_new_messages(self, current: List[TranscriptionMessage]) -> List[TranscriptionMessage]: + """Find unprocessed messages from current list. + + Args: + current: List of current messages + + Returns: + List[TranscriptionMessage]: New messages not yet processed + """ + if not self._processed_messages: + return current + + processed_len = len(self._processed_messages) + if len(current) <= processed_len: + return [] + + return current[processed_len:] + + async def process_frame(self, frame: Frame, direction: FrameDirection): + """Process frames into assistant conversation messages. + + Args: + frame: Input frame to process + direction: Frame processing direction + """ + await super().process_frame(frame, direction) + + if isinstance(frame, OpenAILLMContextFrame): + standard_messages = [] + for msg in frame.context.messages: + converted = frame.context.to_standard_messages(msg) + standard_messages.extend(converted) + + current_messages = self._extract_messages(standard_messages) + new_messages = self._find_new_messages(current_messages) + self._pending_assistant_messages.extend(new_messages) + + elif isinstance(frame, OpenAILLMContextAssistantTimestampFrame): + if self._pending_assistant_messages: + for msg in self._pending_assistant_messages: + msg.timestamp = frame.timestamp + await self._emit_update(self._pending_assistant_messages) + self._pending_assistant_messages = [] + + await self.push_frame(frame, direction) + + +class TranscriptProcessor: + """Factory for creating and managing transcript processors. + + Provides unified access to user and assistant transcript processors + with shared event handling. + + Example: + ```python + transcript = TranscriptProcessor() + + pipeline = Pipeline( + [ + transport.input(), + stt, + transcript.user(), # User transcripts + context_aggregator.user(), + llm, + tts, + transport.output(), + context_aggregator.assistant(), + transcript.assistant(), # Assistant transcripts + ] + ) + + @transcript.event_handler("on_transcript_update") + async def handle_update(processor, frame): + print(f"New messages: {frame.messages}") + ``` + """ + + def __init__(self): + """Initialize factory.""" + self._user_processor = None + self._assistant_processor = None + self._event_handlers = {} + + def user(self, **kwargs) -> UserTranscriptProcessor: + """Get the user transcript processor. 
+ + Args: + **kwargs: Arguments specific to UserTranscriptProcessor + """ + if self._user_processor is None: + self._user_processor = UserTranscriptProcessor(**kwargs) + # Apply any registered event handlers + for event_name, handler in self._event_handlers.items(): + + @self._user_processor.event_handler(event_name) + async def user_handler(processor, frame): + return await handler(processor, frame) + + return self._user_processor + + def assistant(self, **kwargs) -> AssistantTranscriptProcessor: + """Get the assistant transcript processor. + + Args: + **kwargs: Arguments specific to AssistantTranscriptProcessor + """ + if self._assistant_processor is None: + self._assistant_processor = AssistantTranscriptProcessor(**kwargs) + # Apply any registered event handlers + for event_name, handler in self._event_handlers.items(): + + @self._assistant_processor.event_handler(event_name) + async def assistant_handler(processor, frame): + return await handler(processor, frame) + + return self._assistant_processor + + def event_handler(self, event_name: str): + """Register event handler for both processors. + + Args: + event_name: Name of event to handle + + Returns: + Decorator function that registers handler with both processors + """ + + def decorator(handler): + self._event_handlers[event_name] = handler + + # Apply handler to existing processors if they exist + if self._user_processor: + + @self._user_processor.event_handler(event_name) + async def user_handler(processor, frame): + return await handler(processor, frame) + + if self._assistant_processor: + + @self._assistant_processor.event_handler(event_name) + async def assistant_handler(processor, frame): + return await handler(processor, frame) + + return handler + + return decorator diff --git a/src/pipecat/services/anthropic.py b/src/pipecat/services/anthropic.py index f0c033375..93cff6f9e 100644 --- a/src/pipecat/services/anthropic.py +++ b/src/pipecat/services/anthropic.py @@ -26,6 +26,7 @@ LLMFullResponseStartFrame, LLMMessagesFrame, LLMUpdateSettingsFrame, + OpenAILLMContextAssistantTimestampFrame, StartInterruptionFrame, TextFrame, UserImageRawFrame, @@ -43,6 +44,7 @@ ) from pipecat.processors.frame_processor import FrameDirection from pipecat.services.ai_services import LLMService +from pipecat.utils.time import time_now_iso8601 try: from anthropic import NOT_GIVEN, AsyncAnthropic, NotGiven @@ -378,6 +380,26 @@ def set_messages(self, messages: List): # convert a message in Anthropic format into one or more messages in OpenAI format def to_standard_messages(self, obj): + """Convert Anthropic message format to standard structured format. + + Handles text content and function calls for both user and assistant messages. + + Args: + obj: Message in Anthropic format: + { + "role": "user/assistant", + "content": str | [{"type": "text/tool_use/tool_result", ...}] + } + + Returns: + List of messages in standard format: + [ + { + "role": "user/assistant/tool", + "content": [{"type": "text", "text": str}] + } + ] + """ # todo: image format (?) # tool_use role = obj.get("role") @@ -432,6 +454,30 @@ def to_standard_messages(self, obj): return messages def from_standard_message(self, message): + """Convert standard format message to Anthropic format. + + Handles conversion of text content, tool calls, and tool results. + Empty text content is converted to "(empty)". 
+ + Args: + message: Message in standard format: + { + "role": "user/assistant/tool", + "content": str | [{"type": "text", ...}], + "tool_calls": [{"id": str, "function": {"name": str, "arguments": str}}] + } + + Returns: + Message in Anthropic format: + { + "role": "user/assistant", + "content": str | [ + {"type": "text", "text": str} | + {"type": "tool_use", "id": str, "name": str, "input": dict} | + {"type": "tool_result", "tool_use_id": str, "content": str} + ] + } + """ # todo: image messages (?) if message["role"] == "tool": return { @@ -747,8 +793,13 @@ async def _push_aggregation(self): if run_llm: await self._user_context_aggregator.push_context_frame() + # Push context frame frame = OpenAILLMContextFrame(self._context) await self.push_frame(frame) + # Push timestamp frame with current time + timestamp_frame = OpenAILLMContextAssistantTimestampFrame(timestamp=time_now_iso8601()) + await self.push_frame(timestamp_frame) + except Exception as e: logger.error(f"Error processing frame: {e}") diff --git a/src/pipecat/services/google.py b/src/pipecat/services/google.py index 5442ee91c..6bbf1d000 100644 --- a/src/pipecat/services/google.py +++ b/src/pipecat/services/google.py @@ -23,6 +23,7 @@ LLMFullResponseStartFrame, LLMMessagesFrame, LLMUpdateSettingsFrame, + OpenAILLMContextAssistantTimestampFrame, TextFrame, TTSAudioRawFrame, TTSStartedFrame, @@ -41,6 +42,7 @@ OpenAIUserContextAggregator, ) from pipecat.transcriptions.language import Language +from pipecat.utils.time import time_now_iso8601 try: import google.ai.generativelanguage as glm @@ -227,6 +229,7 @@ async def _push_aggregation(self): # if the tasks gets cancelled we won't be able to clear things up. self._aggregation = "" + # Push context frame frame = OpenAILLMContextFrame(self._context) await self.push_frame(frame) @@ -300,9 +303,14 @@ async def _push_aggregation(self): if run_llm: await self._user_context_aggregator.push_context_frame() + # Push context frame frame = OpenAILLMContextFrame(self._context) await self.push_frame(frame) + # Push timestamp frame with current time + timestamp_frame = OpenAILLMContextAssistantTimestampFrame(timestamp=time_now_iso8601()) + await self.push_frame(timestamp_frame) + except Exception as e: logger.exception(f"Error processing frame: {e}") @@ -412,6 +420,25 @@ def add_audio_frames_message(self, *, audio_frames: list[AudioRawFrame], text: s # self.add_message(message) def from_standard_message(self, message): + """Convert standard format message to Google Content object. + + Handles conversion of text, images, and function calls to Google's format. + System messages are stored separately and return None. + + Args: + message: Message in standard format: + { + "role": "user/assistant/system/tool", + "content": str | [{"type": "text/image_url", ...}] | None, + "tool_calls": [{"function": {"name": str, "arguments": str}}] + } + + Returns: + glm.Content object with: + - role: "user" or "model" (converted from "assistant") + - parts: List[Part] containing text, inline_data, or function calls + Returns None for system messages. + """ role = message["role"] content = message.get("content", []) if role == "system": @@ -461,6 +488,27 @@ def from_standard_message(self, message): return message def to_standard_messages(self, obj) -> list: + """Convert Google Content object to standard structured format. + + Handles text, images, and function calls from Google's Content/Part objects. 
+ + Args: + obj: Google Content object with: + - role: "model" (converted to "assistant") or "user" + - parts: List[Part] containing text, inline_data, or function calls + + Returns: + List of messages in standard format: + [ + { + "role": "user/assistant/tool", + "content": [ + {"type": "text", "text": str} | + {"type": "image_url", "image_url": {"url": str}} + ] + } + ] + """ msg = {"role": obj.role, "content": []} if msg["role"] == "model": msg["role"] = "assistant" diff --git a/src/pipecat/services/openai.py b/src/pipecat/services/openai.py index 85e1a95f0..159db8d2f 100644 --- a/src/pipecat/services/openai.py +++ b/src/pipecat/services/openai.py @@ -25,6 +25,7 @@ LLMFullResponseStartFrame, LLMMessagesFrame, LLMUpdateSettingsFrame, + OpenAILLMContextAssistantTimestampFrame, StartInterruptionFrame, TextFrame, TTSAudioRawFrame, @@ -46,6 +47,7 @@ ) from pipecat.processors.frame_processor import FrameDirection from pipecat.services.ai_services import ImageGenService, LLMService, TTSService +from pipecat.utils.time import time_now_iso8601 try: from openai import ( @@ -597,8 +599,13 @@ async def _push_aggregation(self): if run_llm: await self._user_context_aggregator.push_context_frame() + # Push context frame frame = OpenAILLMContextFrame(self._context) await self.push_frame(frame) + # Push timestamp frame with current time + timestamp_frame = OpenAILLMContextAssistantTimestampFrame(timestamp=time_now_iso8601()) + await self.push_frame(timestamp_frame) + except Exception as e: logger.error(f"Error processing frame: {e}")
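Usage note (not part of the patch): the example handlers above only log transcript messages. Below is a minimal sketch of a handler that also persists the running transcript to disk, assuming only the `TranscriptionMessage` and `TranscriptionUpdateFrame` dataclasses added in this change; the `PersistingTranscriptHandler` name and the `transcript.json` path are illustrative.

```python
import json
from dataclasses import asdict
from typing import List

from pipecat.frames.frames import TranscriptionMessage, TranscriptionUpdateFrame


class PersistingTranscriptHandler:
    """Collects timestamped messages and writes the full transcript to disk."""

    def __init__(self, output_path: str = "transcript.json"):
        self._output_path = output_path
        self._messages: List[TranscriptionMessage] = []

    async def on_transcript_update(self, processor, frame: TranscriptionUpdateFrame):
        # Each TranscriptionUpdateFrame carries only the newly added messages.
        self._messages.extend(frame.messages)

        # asdict() serializes each message's role, content, and ISO 8601
        # timestamp. Synchronous file I/O is fine for a small demo like this.
        with open(self._output_path, "w") as f:
            json.dump([asdict(m) for m in self._messages], f, indent=2)
```

It would be wired exactly like the `TranscriptHandler` in the 28a-28c examples, i.e. registered through `@transcript.event_handler("on_transcript_update")`.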
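Design note (not part of the patch): besides firing the `on_transcript_update` event, both transcript processors also push the `TranscriptionUpdateFrame` downstream (see `_emit_update` above), so the transcript can be consumed by a custom processor placed later in the pipeline. A minimal sketch, assuming only the `FrameProcessor` API already used in this patch; the `TranscriptLogger` class is hypothetical.

```python
from loguru import logger

from pipecat.frames.frames import Frame, TranscriptionUpdateFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor


class TranscriptLogger(FrameProcessor):
    """Logs each transcript message as it flows through the pipeline."""

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)

        if isinstance(frame, TranscriptionUpdateFrame):
            for msg in frame.messages:
                prefix = f"[{msg.timestamp}] " if msg.timestamp else ""
                logger.info(f"{prefix}{msg.role}: {msg.content}")

        # Always pass frames through so the rest of the pipeline is unaffected.
        await self.push_frame(frame, direction)
```

A processor like this would sit after `transcript.user()` and/or `transcript.assistant()` in the `Pipeline` list.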