From a4edb3dab119bedb4d0ae1771a0b5b2f2f1661ca Mon Sep 17 00:00:00 2001
From: mattie ruth backman
Date: Tue, 17 Sep 2024 14:49:08 -0400
Subject: [PATCH] Cleanup on aisle METRICS.

Note: See below, this is a breaking change

1. Fleshed out MetricsFrame and broke it into a proper set of types
2. Added model_name as a property to the AIService so that it can be
   automatically included in metrics, and removed that overhead from all
   the various services themselves

Breaking change! Because of the type improvements, the MetricsFrame type
has changed. Each frame still carries a list of metrics, similar to
before, except that each item in the list now contains only one type of
metric: "ttfb", "tokens", "characters", or "processing". Previously these
fields were present in every entry but set to None if they didn't apply.
While this changes internal handling of the MetricsFrame, it does NOT
break the RTVI/daily messaging of metrics. That format remains the same.

Also, remember to use model_name for accessing a service's current model
and set_model_name for setting it.
---
 .../foundational/06-listen-and-respond.py | 16 ++++-
 src/pipecat/frames/frames.py              |  9 ++-
 src/pipecat/metrics/__init__.py           |  0
 src/pipecat/metrics/metrics.py            | 31 +++++++++
 src/pipecat/pipeline/task.py              |  9 ++-
 src/pipecat/processors/frame_processor.py | 67 ++++++++++++-------
 src/pipecat/services/ai_services.py       | 14 +++-
 src/pipecat/services/anthropic.py         | 23 +++----
 src/pipecat/services/azure.py             |  4 +-
 src/pipecat/services/cartesia.py          | 17 +++--
 src/pipecat/services/deepgram.py          |  1 +
 src/pipecat/services/elevenlabs.py        |  6 +-
 src/pipecat/services/fal.py               |  4 +-
 src/pipecat/services/fireworks.py         |  2 +-
 src/pipecat/services/google.py            |  1 +
 src/pipecat/services/moondream.py         |  4 +-
 src/pipecat/services/openai.py            | 27 ++++----
 src/pipecat/services/openpipe.py          |  2 +-
 src/pipecat/services/together.py          | 21 +++---
 src/pipecat/services/whisper.py           |  4 +-
 src/pipecat/transports/services/daily.py  | 26 ++++---
 21 files changed, 190 insertions(+), 98 deletions(-)
 create mode 100644 src/pipecat/metrics/__init__.py
 create mode 100644 src/pipecat/metrics/metrics.py

diff --git a/examples/foundational/06-listen-and-respond.py b/examples/foundational/06-listen-and-respond.py
index e99a95068..6a10f927c 100644
--- a/examples/foundational/06-listen-and-respond.py
+++ b/examples/foundational/06-listen-and-respond.py
@@ -10,6 +10,7 @@
 import sys
 
 from pipecat.frames.frames import Frame, LLMMessagesFrame, MetricsFrame
+from pipecat.metrics.metrics import TTFBMetricsData, ProcessingMetricsData, LLMUsageMetricsData, TTSUsageMetricsData
 from pipecat.pipeline.pipeline import Pipeline
 from pipecat.pipeline.runner import PipelineRunner
 from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -37,8 +38,19 @@ class MetricsLogger(FrameProcessor):
     async def process_frame(self, frame: Frame, direction: FrameDirection):
         if isinstance(frame, MetricsFrame):
-            print(
-                f"!!! MetricsFrame: {frame}, ttfb: {frame.ttfb}, processing: {frame.processing}, tokens: {frame.tokens}, characters: {frame.characters}")
+            for d in frame.data:
+                if isinstance(d, TTFBMetricsData):
+                    print(f"!!! MetricsFrame: {frame}, ttfb: {d.value}")
+                elif isinstance(d, ProcessingMetricsData):
+                    print(f"!!! MetricsFrame: {frame}, processing: {d.value}")
+                elif isinstance(d, LLMUsageMetricsData):
+                    tokens = d.value
+                    print(
+                        f"!!! MetricsFrame: {frame}, "
+                        f"prompt tokens: {tokens.prompt_tokens}, "
+                        f"completion tokens: {tokens.completion_tokens}")
+                elif isinstance(d, TTSUsageMetricsData):
+                    print(f"!!! 
MetricsFrame: {frame}, characters: {d.value}") await self.push_frame(frame, direction) diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py index 4d207fecd..adb3f88c6 100644 --- a/src/pipecat/frames/frames.py +++ b/src/pipecat/frames/frames.py @@ -4,11 +4,12 @@ # SPDX-License-Identifier: BSD 2-Clause License # -from typing import Any, List, Mapping, Optional, Tuple +from typing import Any, List, Optional, Tuple from dataclasses import dataclass, field from pipecat.clocks.base_clock import BaseClock +from pipecat.metrics.metrics import MetricsData from pipecat.transcriptions.language import Language from pipecat.utils.time import nanoseconds_to_str from pipecat.utils.utils import obj_count, obj_id @@ -333,10 +334,8 @@ class BotInterruptionFrame(SystemFrame): class MetricsFrame(SystemFrame): """Emitted by processor that can compute metrics like latencies. """ - ttfb: List[Mapping[str, Any]] | None = None - processing: List[Mapping[str, Any]] | None = None - tokens: List[Mapping[str, Any]] | None = None - characters: List[Mapping[str, Any]] | None = None + data: List[MetricsData] + # # Control frames diff --git a/src/pipecat/metrics/__init__.py b/src/pipecat/metrics/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/pipecat/metrics/metrics.py b/src/pipecat/metrics/metrics.py new file mode 100644 index 000000000..053708998 --- /dev/null +++ b/src/pipecat/metrics/metrics.py @@ -0,0 +1,31 @@ +from typing import Optional +from pydantic import BaseModel + + +class MetricsData(BaseModel): + processor: str + model: Optional[str] = None + + +class TTFBMetricsData(MetricsData): + value: float + + +class ProcessingMetricsData(MetricsData): + value: float + + +class LLMTokenUsage(BaseModel): + prompt_tokens: int + completion_tokens: int + total_tokens: int + cache_read_input_tokens: Optional[int] = None + cache_creation_input_tokens: Optional[int] = None + + +class LLMUsageMetricsData(MetricsData): + value: LLMTokenUsage + + +class TTSUsageMetricsData(MetricsData): + value: int diff --git a/src/pipecat/pipeline/task.py b/src/pipecat/pipeline/task.py index 03fd5c734..26e6e9f4f 100644 --- a/src/pipecat/pipeline/task.py +++ b/src/pipecat/pipeline/task.py @@ -20,6 +20,7 @@ MetricsFrame, StartFrame, StopTaskFrame) +from pipecat.metrics.metrics import TTFBMetricsData, ProcessingMetricsData from pipecat.pipeline.base_pipeline import BasePipeline from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.utils.utils import obj_count, obj_id @@ -118,9 +119,11 @@ async def queue_frames(self, frames: Iterable[Frame] | AsyncIterable[Frame]): def _initial_metrics_frame(self) -> MetricsFrame: processors = self._pipeline.processors_with_metrics() - ttfb = [{"processor": p.name, "value": 0.0} for p in processors] - processing = [{"processor": p.name, "value": 0.0} for p in processors] - return MetricsFrame(ttfb=ttfb, processing=processing) + data = [] + for p in processors: + data.append(TTFBMetricsData(processor=p.name, value=0.0)) + data.append(ProcessingMetricsData(processor=p.name, value=0.0)) + return MetricsFrame(data=data) async def _process_down_queue(self): self._clock.start() diff --git a/src/pipecat/processors/frame_processor.py b/src/pipecat/processors/frame_processor.py index 72924776c..e44b8b0ff 100644 --- a/src/pipecat/processors/frame_processor.py +++ b/src/pipecat/processors/frame_processor.py @@ -19,6 +19,13 @@ StartInterruptionFrame, StopInterruptionFrame, SystemFrame) +from pipecat.metrics.metrics import ( + LLMTokenUsage, 
+ LLMUsageMetricsData, + MetricsData, + ProcessingMetricsData, + TTFBMetricsData, + TTSUsageMetricsData) from pipecat.utils.utils import obj_count, obj_id from loguru import logger @@ -31,11 +38,20 @@ class FrameDirection(Enum): class FrameProcessorMetrics: def __init__(self, name: str): - self._name = name + self._core_metrics_data = MetricsData(processor=name) self._start_ttfb_time = 0 self._start_processing_time = 0 self._should_report_ttfb = True + def _processor_name(self): + return self._core_metrics_data.processor + + def _model_name(self): + return self._core_metrics_data.model + + def set_core_metrics_data(self, data: MetricsData): + self._core_metrics_data = data + async def start_ttfb_metrics(self, report_only_initial_ttfb): if self._should_report_ttfb: self._start_ttfb_time = time.time() @@ -46,13 +62,13 @@ async def stop_ttfb_metrics(self): return None value = time.time() - self._start_ttfb_time - logger.debug(f"{self._name} TTFB: {value}") - ttfb = { - "processor": self._name, - "value": value - } + logger.debug(f"{self._processor_name()} TTFB: {value}") + ttfb = TTFBMetricsData( + processor=self._processor_name(), + value=value, + model=self._model_name()) self._start_ttfb_time = 0 - return MetricsFrame(ttfb=[ttfb]) + return MetricsFrame(data=[ttfb]) async def start_processing_metrics(self): self._start_processing_time = time.time() @@ -62,26 +78,28 @@ async def stop_processing_metrics(self): return None value = time.time() - self._start_processing_time - logger.debug(f"{self._name} processing time: {value}") - processing = { - "processor": self._name, - "value": value - } + logger.debug(f"{self._processor_name()} processing time: {value}") + processing = ProcessingMetricsData( + processor=self._processor_name(), value=value, model=self._model_name()) self._start_processing_time = 0 - return MetricsFrame(processing=[processing]) + return MetricsFrame(data=[processing]) - async def start_llm_usage_metrics(self, tokens: dict): + async def start_llm_usage_metrics(self, tokens: LLMTokenUsage): logger.debug( - f"{self._name} prompt tokens: {tokens['prompt_tokens']}, completion tokens: {tokens['completion_tokens']}") - return MetricsFrame(tokens=[tokens]) + f"{self._processor_name()} prompt tokens: {tokens.prompt_tokens}, completion tokens: {tokens.completion_tokens}") + value = LLMUsageMetricsData( + processor=self._processor_name(), + model=self._model_name(), + value=tokens) + return MetricsFrame(data=[value]) async def start_tts_usage_metrics(self, text: str): - characters = { - "processor": self._name, - "value": len(text), - } - logger.debug(f"{self._name} usage characters: {characters['value']}") - return MetricsFrame(characters=[characters]) + characters = TTSUsageMetricsData( + processor=self._processor_name(), + model=self._model_name(), + value=len(text)) + logger.debug(f"{self._processor_name()} usage characters: {characters.value}") + return MetricsFrame(data=[characters]) class FrameProcessor: @@ -140,6 +158,9 @@ def report_only_initial_ttfb(self): def can_generate_metrics(self) -> bool: return False + def set_core_metrics_data(self, data: MetricsData): + self._metrics.set_core_metrics_data(data) + async def start_ttfb_metrics(self): if self.can_generate_metrics() and self.metrics_enabled: await self._metrics.start_ttfb_metrics(self._report_only_initial_ttfb) @@ -160,7 +181,7 @@ async def stop_processing_metrics(self): if frame: await self.push_frame(frame) - async def start_llm_usage_metrics(self, tokens: dict): + async def start_llm_usage_metrics(self, tokens: 
LLMTokenUsage): if self.can_generate_metrics() and self.usage_metrics_enabled: frame = await self._metrics.start_llm_usage_metrics(tokens) if frame: diff --git a/src/pipecat/services/ai_services.py b/src/pipecat/services/ai_services.py index d91782e42..0207b0511 100644 --- a/src/pipecat/services/ai_services.py +++ b/src/pipecat/services/ai_services.py @@ -32,6 +32,7 @@ UserImageRequestFrame, VisionImageRawFrame ) +from pipecat.metrics.metrics import MetricsData from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.transcriptions.language import Language from pipecat.utils.audio import calculate_audio_volume @@ -46,6 +47,15 @@ class AIService(FrameProcessor): def __init__(self, **kwargs): super().__init__(**kwargs) + self._model_name: str = "" + + @property + def model_name(self) -> str: + return self._model_name + + def set_model_name(self, model: str): + self._model_name = model + self.set_core_metrics_data(MetricsData(processor=self.name, model=self._model_name)) async def start(self, frame: StartFrame): pass @@ -158,7 +168,7 @@ def sample_rate(self) -> int: @abstractmethod async def set_model(self, model: str): - pass + self.set_model_name(model) @abstractmethod async def set_voice(self, voice: str): @@ -367,7 +377,7 @@ def __init__(self, **kwargs): @abstractmethod async def set_model(self, model: str): - pass + self.set_model_name(model) @abstractmethod async def set_language(self, language: Language): diff --git a/src/pipecat/services/anthropic.py b/src/pipecat/services/anthropic.py index 329959b1f..7935691ce 100644 --- a/src/pipecat/services/anthropic.py +++ b/src/pipecat/services/anthropic.py @@ -29,6 +29,7 @@ FunctionCallInProgressFrame, StartInterruptionFrame ) +from pipecat.metrics.metrics import LLMTokenUsage from pipecat.processors.frame_processor import FrameDirection from pipecat.services.ai_services import LLMService from pipecat.processors.aggregators.openai_llm_context import ( @@ -84,7 +85,7 @@ def __init__( **kwargs): super().__init__(**kwargs) self._client = AsyncAnthropic(api_key=api_key) - self._model = model + self.set_model_name(model) self._max_tokens = max_tokens self._enable_prompt_caching_beta = enable_prompt_caching_beta @@ -137,7 +138,7 @@ async def _process_context(self, context: OpenAILLMContext): tools=context.tools or [], system=context.system, messages=messages, - model=self._model, + model=self.model_name, max_tokens=self._max_tokens, stream=True) @@ -231,7 +232,7 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): context = AnthropicLLMContext.from_image_frame(frame) elif isinstance(frame, LLMModelUpdateFrame): logger.debug(f"Switching LLM model to: [{frame.model}]") - self._model = frame.model + self.set_model_name(frame.model) elif isinstance(frame, LLMEnablePromptCachingFrame): logger.debug(f"Setting enable prompt caching to: [{frame.enable}]") self._enable_prompt_caching_beta = frame.enable @@ -251,15 +252,13 @@ async def _report_usage_metrics( cache_creation_input_tokens: int, cache_read_input_tokens: int): if prompt_tokens or completion_tokens or cache_creation_input_tokens or cache_read_input_tokens: - tokens = { - "processor": self.name, - "model": self._model, - "prompt_tokens": prompt_tokens, - "completion_tokens": completion_tokens, - "cache_creation_input_tokens": cache_creation_input_tokens, - "cache_read_input_tokens": cache_read_input_tokens, - "total_tokens": prompt_tokens + completion_tokens - } + tokens = LLMTokenUsage( + prompt_tokens=prompt_tokens, + 
completion_tokens=completion_tokens, + cache_creation_input_tokens=cache_creation_input_tokens, + cache_read_input_tokens=cache_read_input_tokens, + total_tokens=prompt_tokens + completion_tokens + ) await self.start_llm_usage_metrics(tokens) diff --git a/src/pipecat/services/azure.py b/src/pipecat/services/azure.py index 90674fcc4..daac57119 100644 --- a/src/pipecat/services/azure.py +++ b/src/pipecat/services/azure.py @@ -22,6 +22,8 @@ TTSStoppedFrame, TranscriptionFrame, URLImageRawFrame) +from pipecat.metrics.metrics import TTSUsageMetricsData +from pipecat.processors.frame_processor import FrameDirection from pipecat.services.ai_services import STTService, TTSService, ImageGenService from pipecat.services.openai import BaseOpenAILLMService from pipecat.utils.time import time_now_iso8601 @@ -190,7 +192,7 @@ def __init__( self._api_key = api_key self._azure_endpoint = endpoint self._api_version = api_version - self._model = model + self.set_model_name(model) self._image_size = image_size self._aiohttp_session = aiohttp_session diff --git a/src/pipecat/services/cartesia.py b/src/pipecat/services/cartesia.py index 078926235..ab6026052 100644 --- a/src/pipecat/services/cartesia.py +++ b/src/pipecat/services/cartesia.py @@ -89,7 +89,7 @@ def __init__( self._cartesia_version = cartesia_version self._url = url self._voice_id = voice_id - self._model_id = model_id + self.set_model_name(model_id) self._output_format = { "container": "raw", "encoding": encoding, @@ -105,8 +105,8 @@ def can_generate_metrics(self) -> bool: return True async def set_model(self, model: str): + await super().set_model(model) logger.debug(f"Switching TTS model to: [{model}]") - self._model_id = model async def set_voice(self, voice: str): logger.debug(f"Switching TTS voice to: [{voice}]") @@ -155,6 +155,11 @@ async def _disconnect(self): except Exception as e: logger.error(f"{self} error closing websocket: {e}") + def _get_websocket(self): + if self._websocket: + return self._websocket + raise Exception("Websocket not connected") + async def _handle_interruption(self, frame: StartInterruptionFrame, direction: FrameDirection): await super()._handle_interruption(frame, direction) await self.stop_all_metrics() @@ -169,7 +174,7 @@ async def flush_audio(self): "transcript": "", "continue": False, "context_id": self._context_id, - "model_id": self._model_id, + "model_id": self.model_name, "voice": { "mode": "id", "id": self._voice_id @@ -182,7 +187,7 @@ async def flush_audio(self): async def _receive_task_handler(self): try: - async for message in self._websocket: + async for message in self._get_websocket(): msg = json.loads(message) if not msg or msg["context_id"] != self._context_id: continue @@ -235,7 +240,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: "transcript": text + " ", "continue": True, "context_id": self._context_id, - "model_id": self._model_id, + "model_id": self.model_name, "voice": { "mode": "id", "id": self._voice_id @@ -245,7 +250,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: "add_timestamps": True, } try: - await self._websocket.send(json.dumps(msg)) + await self._get_websocket().send(json.dumps(msg)) await self.start_tts_usage_metrics(text) except Exception as e: logger.error(f"{self} error sending message: {e}") diff --git a/src/pipecat/services/deepgram.py b/src/pipecat/services/deepgram.py index 708c3c511..5aebdfbb3 100644 --- a/src/pipecat/services/deepgram.py +++ b/src/pipecat/services/deepgram.py @@ -135,6 +135,7 @@ def __init__(self, 
self._connection.on(LiveTranscriptionEvents.Transcript, self._on_message) async def set_model(self, model: str): + await super().set_model(model) logger.debug(f"Switching STT model to: [{model}]") self._live_options.model = model await self._disconnect() diff --git a/src/pipecat/services/elevenlabs.py b/src/pipecat/services/elevenlabs.py index 081a6bf5d..40aa25865 100644 --- a/src/pipecat/services/elevenlabs.py +++ b/src/pipecat/services/elevenlabs.py @@ -107,7 +107,7 @@ def __init__( self._api_key = api_key self._voice_id = voice_id - self._model = model + self.set_model_name(model) self._url = url self._params = params @@ -122,8 +122,8 @@ def can_generate_metrics(self) -> bool: return True async def set_model(self, model: str): + await super().set_model(model) logger.debug(f"Switching TTS model to: [{model}]") - self._model = model await self._disconnect() await self._connect() @@ -160,7 +160,7 @@ async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirect async def _connect(self): try: voice_id = self._voice_id - model = self._model + model = self.model_name output_format = self._params.output_format url = f"{self._url}/v1/text-to-speech/{voice_id}/stream-input?model_id={model}&output_format={output_format}" self._websocket = await websockets.connect(url) diff --git a/src/pipecat/services/fal.py b/src/pipecat/services/fal.py index 672135d02..58768180f 100644 --- a/src/pipecat/services/fal.py +++ b/src/pipecat/services/fal.py @@ -46,7 +46,7 @@ def __init__( **kwargs ): super().__init__(**kwargs) - self._model = model + self.set_model_name(model) self._params = params self._aiohttp_session = aiohttp_session if key: @@ -56,7 +56,7 @@ async def run_image_gen(self, prompt: str) -> AsyncGenerator[Frame, None]: logger.debug(f"Generating image from prompt: {prompt}") response = await fal_client.run_async( - self._model, + self.model_name, arguments={"prompt": prompt, **self._params.model_dump(exclude_none=True)} ) diff --git a/src/pipecat/services/fireworks.py b/src/pipecat/services/fireworks.py index 7fa4d64e8..87fddd838 100644 --- a/src/pipecat/services/fireworks.py +++ b/src/pipecat/services/fireworks.py @@ -22,4 +22,4 @@ def __init__(self, *, model: str = "accounts/fireworks/models/firefunction-v1", base_url: str = "https://api.fireworks.ai/inference/v1"): - super().__init__(model, base_url) + super().__init__(model=model, base_url=base_url) diff --git a/src/pipecat/services/google.py b/src/pipecat/services/google.py index 7f20f1b8f..b72169b70 100644 --- a/src/pipecat/services/google.py +++ b/src/pipecat/services/google.py @@ -50,6 +50,7 @@ def can_generate_metrics(self) -> bool: return True def _create_client(self, model: str): + self.set_model_name(model) self._client = gai.GenerativeModel(model) def _get_messages_from_openai_context( diff --git a/src/pipecat/services/moondream.py b/src/pipecat/services/moondream.py index 3441aeeb9..b6391cc93 100644 --- a/src/pipecat/services/moondream.py +++ b/src/pipecat/services/moondream.py @@ -54,6 +54,8 @@ def __init__( ): super().__init__(**kwargs) + self.set_model_name(model) + if not use_cpu: device, dtype = detect_device() else: @@ -73,7 +75,7 @@ def __init__( async def run_vision(self, frame: VisionImageRawFrame) -> AsyncGenerator[Frame, None]: if not self._model: - logger.error(f"{self} error: Moondream model not available") + logger.error(f"{self} error: Moondream model not available ({self.model_name})") yield ErrorFrame("Moondream model not available") return diff --git a/src/pipecat/services/openai.py 
b/src/pipecat/services/openai.py index 7483e2eb5..6fe710b5e 100644 --- a/src/pipecat/services/openai.py +++ b/src/pipecat/services/openai.py @@ -33,6 +33,7 @@ FunctionCallInProgressFrame, StartInterruptionFrame ) +from pipecat.metrics.metrics import LLMTokenUsage from pipecat.processors.aggregators.llm_response import LLMUserContextAggregator, LLMAssistantContextAggregator from pipecat.processors.aggregators.openai_llm_context import ( @@ -83,7 +84,7 @@ class BaseOpenAILLMService(LLMService): def __init__(self, *, model: str, api_key=None, base_url=None, **kwargs): super().__init__(**kwargs) - self._model: str = model + self.set_model_name(model) self._client = self.create_client(api_key=api_key, base_url=base_url, **kwargs) def create_client(self, api_key=None, base_url=None, **kwargs): @@ -104,7 +105,7 @@ async def get_chat_completions( context: OpenAILLMContext, messages: List[ChatCompletionMessageParam]) -> AsyncStream[ChatCompletionChunk]: chunks = await self._client.chat.completions.create( - model=self._model, + model=self.model_name, stream=True, messages=messages, tools=context.tools, @@ -148,13 +149,11 @@ async def _process_context(self, context: OpenAILLMContext): async for chunk in chunk_stream: if chunk.usage: - tokens = { - "processor": self.name, - "model": self._model, - "prompt_tokens": chunk.usage.prompt_tokens, - "completion_tokens": chunk.usage.completion_tokens, - "total_tokens": chunk.usage.total_tokens - } + tokens = LLMTokenUsage( + prompt_tokens=chunk.usage.prompt_tokens, + completion_tokens=chunk.usage.completion_tokens, + total_tokens=chunk.usage.total_tokens + ) await self.start_llm_usage_metrics(tokens) if len(chunk.choices) == 0: @@ -223,7 +222,7 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): context = OpenAILLMContext.from_image_frame(frame) elif isinstance(frame, LLMModelUpdateFrame): logger.debug(f"Switching LLM model to: [{frame.model}]") - self._model = frame.model + self.set_model_name(frame.model) else: await self.push_frame(frame, direction) @@ -273,7 +272,7 @@ def __init__( model: str = "dall-e-3", ): super().__init__() - self._model = model + self.set_model_name(model) self._image_size = image_size self._client = AsyncOpenAI(api_key=api_key) self._aiohttp_session = aiohttp_session @@ -283,7 +282,7 @@ async def run_image_gen(self, prompt: str) -> AsyncGenerator[Frame, None]: image = await self._client.images.generate( prompt=prompt, - model=self._model, + model=self.model_name, n=1, size=self._image_size ) @@ -325,7 +324,7 @@ def __init__( super().__init__(sample_rate=sample_rate, **kwargs) self._voice: ValidVoice = VALID_VOICES.get(voice, "alloy") - self._model = model + self.set_model_name(model) self._sample_rate = sample_rate self._client = AsyncOpenAI(api_key=api_key) @@ -348,7 +347,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: async with self._client.audio.speech.with_streaming_response.create( input=text, - model=self._model, + model=self.model_name, voice=self._voice, response_format="pcm", ) as r: diff --git a/src/pipecat/services/openpipe.py b/src/pipecat/services/openpipe.py index ada7824fb..e4e14dc15 100644 --- a/src/pipecat/services/openpipe.py +++ b/src/pipecat/services/openpipe.py @@ -60,7 +60,7 @@ async def get_chat_completions( context: OpenAILLMContext, messages: List[ChatCompletionMessageParam]) -> AsyncStream[ChatCompletionChunk]: chunks = await self._client.chat.completions.create( - model=self._model, + model=self.model_name, stream=True, messages=messages, openpipe={ diff 
--git a/src/pipecat/services/together.py b/src/pipecat/services/together.py index 49759cb01..004236ac8 100644 --- a/src/pipecat/services/together.py +++ b/src/pipecat/services/together.py @@ -18,9 +18,7 @@ Frame, LLMModelUpdateFrame, TextFrame, - VisionImageRawFrame, UserImageRequestFrame, - UserImageRawFrame, LLMMessagesFrame, LLMFullResponseStartFrame, LLMFullResponseEndFrame, @@ -28,6 +26,7 @@ FunctionCallInProgressFrame, StartInterruptionFrame ) +from pipecat.metrics.metrics import LLMTokenUsage from pipecat.processors.frame_processor import FrameDirection from pipecat.services.ai_services import LLMService from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext, OpenAILLMContextFrame @@ -69,7 +68,7 @@ def __init__( **kwargs): super().__init__(**kwargs) self._client = AsyncTogether(api_key=api_key) - self._model = model + self.set_model_name(model) self._max_tokens = max_tokens def can_generate_metrics(self) -> bool: @@ -95,7 +94,7 @@ async def _process_context(self, context: OpenAILLMContext): stream = await self._client.chat.completions.create( messages=context.messages, - model=self._model, + model=self.model_name, max_tokens=self._max_tokens, stream=True, ) @@ -108,13 +107,11 @@ async def _process_context(self, context: OpenAILLMContext): async for chunk in stream: # logger.debug(f"Together LLM event: {chunk}") if chunk.usage: - tokens = { - "processor": self.name, - "model": self._model, - "prompt_tokens": chunk.usage.prompt_tokens, - "completion_tokens": chunk.usage.completion_tokens, - "total_tokens": chunk.usage.total_tokens - } + tokens = LLMTokenUsage( + prompt_tokens=chunk.usage.prompt_tokens, + completion_tokens=chunk.usage.completion_tokens, + total_tokens=chunk.usage.total_tokens + ) await self.start_llm_usage_metrics(tokens) if len(chunk.choices) == 0: @@ -156,7 +153,7 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): context = TogetherLLMContext.from_messages(frame.messages) elif isinstance(frame, LLMModelUpdateFrame): logger.debug(f"Switching LLM model to: [{frame.model}]") - self._model = frame.model + self.set_model_name(frame.model) else: await self.push_frame(frame, direction) diff --git a/src/pipecat/services/whisper.py b/src/pipecat/services/whisper.py index 04f357a94..9f54f9ca0 100644 --- a/src/pipecat/services/whisper.py +++ b/src/pipecat/services/whisper.py @@ -52,7 +52,7 @@ def __init__(self, super().__init__(**kwargs) self._device: str = device self._compute_type = compute_type - self._model_name: str | Model = model + self.set_model_name(model if isinstance(model, str) else model.value) self._no_speech_prob = no_speech_prob self._model: WhisperModel | None = None self._load() @@ -65,7 +65,7 @@ def _load(self): this model is being run, it will take time to download.""" logger.debug("Loading Whisper model...") self._model = WhisperModel( - self._model_name.value if isinstance(self._model_name, Enum) else self._model_name, + self.model_name, device=self._device, compute_type=self._compute_type) logger.debug("Loaded Whisper model") diff --git a/src/pipecat/transports/services/daily.py b/src/pipecat/transports/services/daily.py index 2a45adf36..e28fe6083 100644 --- a/src/pipecat/transports/services/daily.py +++ b/src/pipecat/transports/services/daily.py @@ -35,6 +35,7 @@ TransportMessageFrame, UserImageRawFrame, UserImageRequestFrame) +from pipecat.metrics.metrics import LLMUsageMetricsData, ProcessingMetricsData, TTFBMetricsData, TTSUsageMetricsData from pipecat.processors.frame_processor import FrameDirection, 
FrameProcessor from pipecat.transcriptions.language import Language from pipecat.transports.base_input import BaseInputTransport @@ -731,14 +732,23 @@ async def send_message(self, frame: TransportMessageFrame): async def send_metrics(self, frame: MetricsFrame): metrics = {} - if frame.ttfb: - metrics["ttfb"] = frame.ttfb - if frame.processing: - metrics["processing"] = frame.processing - if frame.tokens: - metrics["tokens"] = frame.tokens - if frame.characters: - metrics["characters"] = frame.characters + for d in frame.data: + if isinstance(d, TTFBMetricsData): + if "ttfb" not in metrics: + metrics["ttfb"] = [] + metrics["ttfb"].append(d.model_dump()) + elif isinstance(d, ProcessingMetricsData): + if "processing" not in metrics: + metrics["processing"] = [] + metrics["processing"].append(d.model_dump()) + elif isinstance(d, LLMUsageMetricsData): + if "tokens" not in metrics: + metrics["tokens"] = [] + metrics["tokens"].append(d.value.model_dump(exclude_none=True)) + elif isinstance(d, TTSUsageMetricsData): + if "characters" not in metrics: + metrics["characters"] = [] + metrics["characters"].append(d.model_dump()) message = DailyTransportMessageFrame(message={ "type": "pipecat-metrics",