From 0e803567e969a33e4b702688b6f0b7038acabd59 Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Mon, 16 Dec 2024 10:17:33 -0500 Subject: [PATCH] Code review changes --- CHANGELOG.md | 4 +- .../07a-interruptible-anthropic.py | 42 ++------------ .../28a-transcription-processor-openai.py | 2 +- .../28b-transcript-processor-anthropic.py | 2 +- src/pipecat/frames/frames.py | 27 ++++++++- .../aggregators/openai_llm_context.py | 58 ++++++------------- 6 files changed, 55 insertions(+), 80 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2822ff076..9f232ce4f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,8 +18,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Supports all LLM formats (OpenAI, Anthropic, Google) via standard message format. - New examples: `28a-transcription-processor-openai.py`, - `28a-transcription-processor-openai.py`, and - `28a-transcription-processor-openai.py`. + `28b-transcription-processor-anthropic.py`, and + `28c-transcription-processor-gemini.py`. - Add support for more languages to ElevenLabs (Arabic, Croatian, Filipino, Tamil) and PlayHT (Afrikans, Albanian, Amharic, Arabic, Bengali, Croatian, diff --git a/examples/foundational/07a-interruptible-anthropic.py b/examples/foundational/07a-interruptible-anthropic.py index 25a301269..e7e680eab 100644 --- a/examples/foundational/07a-interruptible-anthropic.py +++ b/examples/foundational/07a-interruptible-anthropic.py @@ -7,7 +7,6 @@ import asyncio import os import sys -from typing import List import aiohttp from dotenv import load_dotenv @@ -15,13 +14,12 @@ from runner import configure from pipecat.audio.vad.silero import SileroVADAnalyzer -from pipecat.frames.frames import Frame, LLMMessagesFrame +from pipecat.frames.frames import LLMMessagesFrame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext -from pipecat.processors.frame_processor import FrameDirection -from pipecat.services.anthropic import AnthropicLLMContext, AnthropicLLMService +from pipecat.services.anthropic import AnthropicLLMService from pipecat.services.cartesia import CartesiaTTSService from pipecat.transports.services.daily import DailyParams, DailyTransport @@ -31,28 +29,6 @@ logger.add(sys.stderr, level="DEBUG") -class TestAnthropicLLMService(AnthropicLLMService): - async def process_frame(self, frame: Frame, direction: FrameDirection): - if isinstance(frame, LLMMessagesFrame): - logger.info("Original OpenAI format messages:") - logger.info(frame.messages) - - # Convert to Anthropic format - context = AnthropicLLMContext.from_messages(frame.messages) - logger.info("Converted to Anthropic format:") - logger.info(context.messages) - - # Convert back to OpenAI format - openai_messages = [] - for msg in context.messages: - converted = context.to_standard_messages(msg) - openai_messages.extend(converted) - logger.info("Converted back to OpenAI format:") - logger.info(openai_messages) - - await super().process_frame(frame, direction) - - async def main(): async with aiohttp.ClientSession() as session: (room_url, token) = await configure(session) @@ -74,24 +50,18 @@ async def main(): voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady ) - llm = TestAnthropicLLMService( + llm = AnthropicLLMService( api_key=os.getenv("ANTHROPIC_API_KEY"), model="claude-3-opus-20240229" ) - # Test messages including various formats + # todo: think more about how to handle system prompts in a more general way. OpenAI, + # Google, and Anthropic all have slightly different approaches to providing a system + # prompt. messages = [ { "role": "system", "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative, helpful, and brief way. Say hello.", }, - { - "role": "assistant", - "content": [ - {"type": "text", "text": "Hello! How can I help you today?"}, - {"type": "text", "text": "I'm ready to assist."}, - ], - }, - {"role": "user", "content": "Hi there!"}, ] context = OpenAILLMContext(messages) diff --git a/examples/foundational/28a-transcription-processor-openai.py b/examples/foundational/28a-transcription-processor-openai.py index 390343465..1e8463b69 100644 --- a/examples/foundational/28a-transcription-processor-openai.py +++ b/examples/foundational/28a-transcription-processor-openai.py @@ -127,7 +127,7 @@ async def on_transcript_update(processor, frame): async def on_first_participant_joined(transport, participant): await transport.capture_participant_transcription(participant["id"]) # Kick off the conversation. - await task.queue_frames([LLMMessagesFrame(messages)]) + await task.queue_frames([context_aggregator.user().get_context_frame()]) runner = PipelineRunner() diff --git a/examples/foundational/28b-transcript-processor-anthropic.py b/examples/foundational/28b-transcript-processor-anthropic.py index 1119efad2..626206c5f 100644 --- a/examples/foundational/28b-transcript-processor-anthropic.py +++ b/examples/foundational/28b-transcript-processor-anthropic.py @@ -127,7 +127,7 @@ async def on_transcript_update(processor, frame): async def on_first_participant_joined(transport, participant): await transport.capture_participant_transcription(participant["id"]) # Kick off the conversation. - await task.queue_frames([LLMMessagesFrame(messages)]) + await task.queue_frames([context_aggregator.user().get_context_frame()]) runner = PipelineRunner() diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py index ab8a6f6ad..12b7a3549 100644 --- a/src/pipecat/frames/frames.py +++ b/src/pipecat/frames/frames.py @@ -5,7 +5,7 @@ # from dataclasses import dataclass, field -from typing import Any, List, Literal, Mapping, Optional, Tuple, TypeAlias +from typing import Any, List, Literal, Mapping, Optional, Tuple from pipecat.audio.vad.vad_analyzer import VADParams from pipecat.clocks.base_clock import BaseClock @@ -240,6 +240,31 @@ class TranscriptionUpdateFrame(DataFrame): This frame is emitted when new messages are added to the conversation history, containing only the newly added messages rather than the full transcript. Messages have normalized roles (user/assistant) regardless of the LLM service used. + Messages are always in the OpenAI standard message format: + + OpenAI (simple format): + [ + { + "role": "user", + "content": "Hi, how are you?" + }, + { + "role": "assistant", + "content": "Great! And you?" + } + ] + + Anthropic & Google (content list): + [ + { + "role": "user", + "content": [{"type": "text", "text": "Hi, how are you?"}] + }, + { + "role": "assistant", + "content": [{"type": "text", "text": "Great! And you?"}] + } + ] """ messages: List[TranscriptionMessage] diff --git a/src/pipecat/processors/aggregators/openai_llm_context.py b/src/pipecat/processors/aggregators/openai_llm_context.py index 4adf76de0..853ac1baa 100644 --- a/src/pipecat/processors/aggregators/openai_llm_context.py +++ b/src/pipecat/processors/aggregators/openai_llm_context.py @@ -112,59 +112,39 @@ def get_messages_for_logging(self) -> str: msgs.append(msg) return json.dumps(msgs) - def from_standard_message(self, message) -> dict: - """Convert standard format message to OpenAI format. + def from_standard_message(self, message): + """Convert from OpenAI message format to OpenAI message format (passthrough). - Converts structured content back to OpenAI's simple string format. + OpenAI's format allows both simple string content and structured content: + - Simple: {"role": "user", "content": "Hello"} + - Structured: {"role": "user", "content": [{"type": "text", "text": "Hello"}]} + + Since OpenAI is our standard format, this is a passthrough function. Args: - message: Message in standard format: - { - "role": "user/assistant", - "content": [{"type": "text", "text": str}] - } + message (dict): Message in OpenAI format Returns: - Message in OpenAI format: - { - "role": "user/assistant", - "content": str - } + dict: Same message, unchanged """ - # If content is already a string, return as-is - if isinstance(message.get("content"), str): - return message - - # Convert structured content to string - if isinstance(message.get("content"), list): - text_parts = [] - for part in message["content"]: - if part.get("type") == "text": - text_parts.append(part["text"]) - - return {"role": message["role"], "content": " ".join(text_parts) if text_parts else ""} - return message def to_standard_messages(self, obj) -> list: - """Convert OpenAI message to standard structured format. + """Convert from OpenAI message format to OpenAI message format (passthrough). + + OpenAI's format is our standard format throughout Pipecat. This function + returns a list containing the original message to maintain consistency with + other LLM services that may need to return multiple messages. Args: - obj: Message in OpenAI format {"role": "user", "content": "text"} + obj (dict): Message in OpenAI format with either: + - Simple content: {"role": "user", "content": "Hello"} + - List content: {"role": "user", "content": [{"type": "text", "text": "Hello"}]} Returns: - List containing message with structured content: - [{"role": "user", "content": [{"type": "text", "text": "message"}]}] + list: List containing the original messages, preserving whether + the content was in simple string or structured list format """ - # Skip messages without content - if not obj.get("content"): - return [] - - # Convert simple string content to structured format - if isinstance(obj["content"], str): - return [{"role": obj["role"], "content": [{"type": "text", "text": obj["content"]}]}] - - # Return original message if content is already structured return [obj] def get_messages_for_initializing_history(self):