diff --git a/examples/foundational/06-listen-and-respond.py b/examples/foundational/06-listen-and-respond.py
index e99a95068..6a10f927c 100644
--- a/examples/foundational/06-listen-and-respond.py
+++ b/examples/foundational/06-listen-and-respond.py
@@ -10,6 +10,7 @@ import sys
 
 from pipecat.frames.frames import Frame, LLMMessagesFrame, MetricsFrame
+from pipecat.metrics.metrics import TTFBMetricsData, ProcessingMetricsData, LLMUsageMetricsData, TTSUsageMetricsData
 from pipecat.pipeline.pipeline import Pipeline
 from pipecat.pipeline.runner import PipelineRunner
 from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -37,8 +38,19 @@ class MetricsLogger(FrameProcessor):
     async def process_frame(self, frame: Frame, direction: FrameDirection):
         if isinstance(frame, MetricsFrame):
-            print(
-                f"!!! MetricsFrame: {frame}, ttfb: {frame.ttfb}, processing: {frame.processing}, tokens: {frame.tokens}, characters: {frame.characters}")
+            for d in frame.data:
+                if isinstance(d, TTFBMetricsData):
+                    print(f"!!! MetricsFrame: {frame}, ttfb: {d.value}")
+                elif isinstance(d, ProcessingMetricsData):
+                    print(f"!!! MetricsFrame: {frame}, processing: {d.value}")
+                elif isinstance(d, LLMUsageMetricsData):
+                    tokens = d.value
+                    print(
+                        f"!!! MetricsFrame: {frame}, prompt tokens: {tokens.prompt_tokens}, "
+                        f"completion tokens: {tokens.completion_tokens}")
+                elif isinstance(d, TTSUsageMetricsData):
+                    print(f"!!! MetricsFrame: {frame}, characters: {d.value}")
         await self.push_frame(frame, direction)
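# Example (illustrative sketch, not applied by this diff): the example above keeps
# one print() branch per metrics type. Because every entry in frame.data is now a
# Pydantic model, a logger can also treat all entries uniformly; model_dump() is
# pydantic v2 API, which this patch already relies on elsewhere (see daily.py below).
from pipecat.frames.frames import Frame, MetricsFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor


class GenericMetricsLogger(FrameProcessor):
    async def process_frame(self, frame: Frame, direction: FrameDirection):
        if isinstance(frame, MetricsFrame):
            for d in frame.data:
                # e.g. "TTFBMetricsData: {'processor': 'TTSService#0', 'value': 0.087}"
                print(f"{type(d).__name__}: {d.model_dump(exclude_none=True)}")
        await self.push_frame(frame, direction)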
""" - ttfb: List[Mapping[str, Any]] | None = None - processing: List[Mapping[str, Any]] | None = None - tokens: List[Mapping[str, Any]] | None = None - characters: List[Mapping[str, Any]] | None = None + data: List[MetricsData] + # # Control frames diff --git a/src/pipecat/metrics/__init__.py b/src/pipecat/metrics/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/pipecat/metrics/metrics.py b/src/pipecat/metrics/metrics.py new file mode 100644 index 000000000..053708998 --- /dev/null +++ b/src/pipecat/metrics/metrics.py @@ -0,0 +1,31 @@ +from typing import Optional +from pydantic import BaseModel + + +class MetricsData(BaseModel): + processor: str + model: Optional[str] = None + + +class TTFBMetricsData(MetricsData): + value: float + + +class ProcessingMetricsData(MetricsData): + value: float + + +class LLMTokenUsage(BaseModel): + prompt_tokens: int + completion_tokens: int + total_tokens: int + cache_read_input_tokens: Optional[int] = None + cache_creation_input_tokens: Optional[int] = None + + +class LLMUsageMetricsData(MetricsData): + value: LLMTokenUsage + + +class TTSUsageMetricsData(MetricsData): + value: int diff --git a/src/pipecat/pipeline/task.py b/src/pipecat/pipeline/task.py index 03fd5c734..26e6e9f4f 100644 --- a/src/pipecat/pipeline/task.py +++ b/src/pipecat/pipeline/task.py @@ -20,6 +20,7 @@ MetricsFrame, StartFrame, StopTaskFrame) +from pipecat.metrics.metrics import TTFBMetricsData, ProcessingMetricsData from pipecat.pipeline.base_pipeline import BasePipeline from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.utils.utils import obj_count, obj_id @@ -118,9 +119,11 @@ async def queue_frames(self, frames: Iterable[Frame] | AsyncIterable[Frame]): def _initial_metrics_frame(self) -> MetricsFrame: processors = self._pipeline.processors_with_metrics() - ttfb = [{"processor": p.name, "value": 0.0} for p in processors] - processing = [{"processor": p.name, "value": 0.0} for p in processors] - return MetricsFrame(ttfb=ttfb, processing=processing) + data = [] + for p in processors: + data.append(TTFBMetricsData(processor=p.name, value=0.0)) + data.append(ProcessingMetricsData(processor=p.name, value=0.0)) + return MetricsFrame(data=data) async def _process_down_queue(self): self._clock.start() diff --git a/src/pipecat/processors/frame_processor.py b/src/pipecat/processors/frame_processor.py index 72924776c..e44b8b0ff 100644 --- a/src/pipecat/processors/frame_processor.py +++ b/src/pipecat/processors/frame_processor.py @@ -19,6 +19,13 @@ StartInterruptionFrame, StopInterruptionFrame, SystemFrame) +from pipecat.metrics.metrics import ( + LLMTokenUsage, + LLMUsageMetricsData, + MetricsData, + ProcessingMetricsData, + TTFBMetricsData, + TTSUsageMetricsData) from pipecat.utils.utils import obj_count, obj_id from loguru import logger @@ -31,11 +38,20 @@ class FrameDirection(Enum): class FrameProcessorMetrics: def __init__(self, name: str): - self._name = name + self._core_metrics_data = MetricsData(processor=name) self._start_ttfb_time = 0 self._start_processing_time = 0 self._should_report_ttfb = True + def _processor_name(self): + return self._core_metrics_data.processor + + def _model_name(self): + return self._core_metrics_data.model + + def set_core_metrics_data(self, data: MetricsData): + self._core_metrics_data = data + async def start_ttfb_metrics(self, report_only_initial_ttfb): if self._should_report_ttfb: self._start_ttfb_time = time.time() @@ -46,13 +62,13 @@ async def stop_ttfb_metrics(self): return None value = 
diff --git a/src/pipecat/pipeline/task.py b/src/pipecat/pipeline/task.py
index 03fd5c734..26e6e9f4f 100644
--- a/src/pipecat/pipeline/task.py
+++ b/src/pipecat/pipeline/task.py
@@ -20,6 +20,7 @@
     MetricsFrame,
     StartFrame,
     StopTaskFrame)
+from pipecat.metrics.metrics import TTFBMetricsData, ProcessingMetricsData
 from pipecat.pipeline.base_pipeline import BasePipeline
 from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
 from pipecat.utils.utils import obj_count, obj_id
@@ -118,9 +119,11 @@ async def queue_frames(self, frames: Iterable[Frame] | AsyncIterable[Frame]):
 
     def _initial_metrics_frame(self) -> MetricsFrame:
         processors = self._pipeline.processors_with_metrics()
-        ttfb = [{"processor": p.name, "value": 0.0} for p in processors]
-        processing = [{"processor": p.name, "value": 0.0} for p in processors]
-        return MetricsFrame(ttfb=ttfb, processing=processing)
+        data = []
+        for p in processors:
+            data.append(TTFBMetricsData(processor=p.name, value=0.0))
+            data.append(ProcessingMetricsData(processor=p.name, value=0.0))
+        return MetricsFrame(data=data)
 
     async def _process_down_queue(self):
         self._clock.start()
diff --git a/src/pipecat/processors/frame_processor.py b/src/pipecat/processors/frame_processor.py
index 72924776c..e44b8b0ff 100644
--- a/src/pipecat/processors/frame_processor.py
+++ b/src/pipecat/processors/frame_processor.py
@@ -19,6 +19,13 @@
     StartInterruptionFrame,
     StopInterruptionFrame,
     SystemFrame)
+from pipecat.metrics.metrics import (
+    LLMTokenUsage,
+    LLMUsageMetricsData,
+    MetricsData,
+    ProcessingMetricsData,
+    TTFBMetricsData,
+    TTSUsageMetricsData)
 from pipecat.utils.utils import obj_count, obj_id
 
 from loguru import logger
@@ -31,11 +38,20 @@ class FrameDirection(Enum):
 
 class FrameProcessorMetrics:
     def __init__(self, name: str):
-        self._name = name
+        self._core_metrics_data = MetricsData(processor=name)
         self._start_ttfb_time = 0
         self._start_processing_time = 0
         self._should_report_ttfb = True
 
+    def _processor_name(self):
+        return self._core_metrics_data.processor
+
+    def _model_name(self):
+        return self._core_metrics_data.model
+
+    def set_core_metrics_data(self, data: MetricsData):
+        self._core_metrics_data = data
+
     async def start_ttfb_metrics(self, report_only_initial_ttfb):
         if self._should_report_ttfb:
             self._start_ttfb_time = time.time()
@@ -46,13 +62,13 @@ async def stop_ttfb_metrics(self):
             return None
 
         value = time.time() - self._start_ttfb_time
-        logger.debug(f"{self._name} TTFB: {value}")
-        ttfb = {
-            "processor": self._name,
-            "value": value
-        }
+        logger.debug(f"{self._processor_name()} TTFB: {value}")
+        ttfb = TTFBMetricsData(
+            processor=self._processor_name(),
+            value=value,
+            model=self._model_name())
         self._start_ttfb_time = 0
-        return MetricsFrame(ttfb=[ttfb])
+        return MetricsFrame(data=[ttfb])
 
     async def start_processing_metrics(self):
         self._start_processing_time = time.time()
@@ -62,26 +78,28 @@ async def stop_processing_metrics(self):
             return None
 
         value = time.time() - self._start_processing_time
-        logger.debug(f"{self._name} processing time: {value}")
-        processing = {
-            "processor": self._name,
-            "value": value
-        }
+        logger.debug(f"{self._processor_name()} processing time: {value}")
+        processing = ProcessingMetricsData(
+            processor=self._processor_name(), value=value, model=self._model_name())
         self._start_processing_time = 0
-        return MetricsFrame(processing=[processing])
+        return MetricsFrame(data=[processing])
 
-    async def start_llm_usage_metrics(self, tokens: dict):
+    async def start_llm_usage_metrics(self, tokens: LLMTokenUsage):
         logger.debug(
-            f"{self._name} prompt tokens: {tokens['prompt_tokens']}, completion tokens: {tokens['completion_tokens']}")
-        return MetricsFrame(tokens=[tokens])
+            f"{self._processor_name()} prompt tokens: {tokens.prompt_tokens}, completion tokens: {tokens.completion_tokens}")
+        value = LLMUsageMetricsData(
+            processor=self._processor_name(),
+            model=self._model_name(),
+            value=tokens)
+        return MetricsFrame(data=[value])
 
     async def start_tts_usage_metrics(self, text: str):
-        characters = {
-            "processor": self._name,
-            "value": len(text),
-        }
-        logger.debug(f"{self._name} usage characters: {characters['value']}")
-        return MetricsFrame(characters=[characters])
+        characters = TTSUsageMetricsData(
+            processor=self._processor_name(),
+            model=self._model_name(),
+            value=len(text))
+        logger.debug(f"{self._processor_name()} usage characters: {characters.value}")
+        return MetricsFrame(data=[characters])
 
 
 class FrameProcessor:
@@ -140,6 +158,9 @@ def report_only_initial_ttfb(self):
     def can_generate_metrics(self) -> bool:
         return False
 
+    def set_core_metrics_data(self, data: MetricsData):
+        self._metrics.set_core_metrics_data(data)
+
     async def start_ttfb_metrics(self):
         if self.can_generate_metrics() and self.metrics_enabled:
             await self._metrics.start_ttfb_metrics(self._report_only_initial_ttfb)
@@ -160,7 +181,7 @@ async def stop_processing_metrics(self):
             if frame:
                 await self.push_frame(frame)
 
-    async def start_llm_usage_metrics(self, tokens: dict):
+    async def start_llm_usage_metrics(self, tokens: LLMTokenUsage):
         if self.can_generate_metrics() and self.usage_metrics_enabled:
             frame = await self._metrics.start_llm_usage_metrics(tokens)
             if frame:
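# Example (standalone sketch of the lifecycle FrameProcessorMetrics encapsulates;
# in practice processors go through FrameProcessor.start_ttfb_metrics() and
# stop_ttfb_metrics(), which also check can_generate_metrics()/metrics_enabled).
import asyncio

from pipecat.processors.frame_processor import FrameProcessorMetrics


async def main():
    metrics = FrameProcessorMetrics(name="DemoService#0")  # name is illustrative
    await metrics.start_ttfb_metrics(report_only_initial_ttfb=False)
    await asyncio.sleep(0.1)  # stand-in for waiting on a first token/byte
    frame = await metrics.stop_ttfb_metrics()
    # frame.data is a list holding one TTFBMetricsData tagged processor="DemoService#0".
    print(frame.data[0].model_dump(exclude_none=True))


asyncio.run(main())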
diff --git a/src/pipecat/services/ai_services.py b/src/pipecat/services/ai_services.py
index d91782e42..0207b0511 100644
--- a/src/pipecat/services/ai_services.py
+++ b/src/pipecat/services/ai_services.py
@@ -32,6 +32,7 @@
     UserImageRequestFrame,
     VisionImageRawFrame
 )
+from pipecat.metrics.metrics import MetricsData
 from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
 from pipecat.transcriptions.language import Language
 from pipecat.utils.audio import calculate_audio_volume
@@ -46,6 +47,15 @@ class AIService(FrameProcessor):
     def __init__(self, **kwargs):
         super().__init__(**kwargs)
+        self._model_name: str = ""
+
+    @property
+    def model_name(self) -> str:
+        return self._model_name
+
+    def set_model_name(self, model: str):
+        self._model_name = model
+        self.set_core_metrics_data(MetricsData(processor=self.name, model=self._model_name))
 
     async def start(self, frame: StartFrame):
         pass
@@ -158,7 +168,7 @@ def sample_rate(self) -> int:
 
     @abstractmethod
     async def set_model(self, model: str):
-        pass
+        self.set_model_name(model)
 
     @abstractmethod
     async def set_voice(self, voice: str):
@@ -367,7 +377,7 @@ def __init__(self, **kwargs):
 
     @abstractmethod
     async def set_model(self, model: str):
-        pass
+        self.set_model_name(model)
 
     @abstractmethod
     async def set_language(self, language: Language):
diff --git a/src/pipecat/services/anthropic.py b/src/pipecat/services/anthropic.py
index 329959b1f..7935691ce 100644
--- a/src/pipecat/services/anthropic.py
+++ b/src/pipecat/services/anthropic.py
@@ -29,6 +29,7 @@
     FunctionCallInProgressFrame,
     StartInterruptionFrame
 )
+from pipecat.metrics.metrics import LLMTokenUsage
 from pipecat.processors.frame_processor import FrameDirection
 from pipecat.services.ai_services import LLMService
 from pipecat.processors.aggregators.openai_llm_context import (
@@ -84,7 +85,7 @@ def __init__(
             **kwargs):
         super().__init__(**kwargs)
         self._client = AsyncAnthropic(api_key=api_key)
-        self._model = model
+        self.set_model_name(model)
         self._max_tokens = max_tokens
         self._enable_prompt_caching_beta = enable_prompt_caching_beta
@@ -137,7 +138,7 @@ async def _process_context(self, context: OpenAILLMContext):
                 tools=context.tools or [],
                 system=context.system,
                 messages=messages,
-                model=self._model,
+                model=self.model_name,
                 max_tokens=self._max_tokens,
                 stream=True)
@@ -231,7 +232,7 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
             context = AnthropicLLMContext.from_image_frame(frame)
         elif isinstance(frame, LLMModelUpdateFrame):
             logger.debug(f"Switching LLM model to: [{frame.model}]")
-            self._model = frame.model
+            self.set_model_name(frame.model)
         elif isinstance(frame, LLMEnablePromptCachingFrame):
             logger.debug(f"Setting enable prompt caching to: [{frame.enable}]")
             self._enable_prompt_caching_beta = frame.enable
@@ -251,15 +252,13 @@ async def _report_usage_metrics(
             cache_creation_input_tokens: int,
             cache_read_input_tokens: int):
         if prompt_tokens or completion_tokens or cache_creation_input_tokens or cache_read_input_tokens:
-            tokens = {
-                "processor": self.name,
-                "model": self._model,
-                "prompt_tokens": prompt_tokens,
-                "completion_tokens": completion_tokens,
-                "cache_creation_input_tokens": cache_creation_input_tokens,
-                "cache_read_input_tokens": cache_read_input_tokens,
-                "total_tokens": prompt_tokens + completion_tokens
-            }
+            tokens = LLMTokenUsage(
+                prompt_tokens=prompt_tokens,
+                completion_tokens=completion_tokens,
+                cache_creation_input_tokens=cache_creation_input_tokens,
+                cache_read_input_tokens=cache_read_input_tokens,
+                total_tokens=prompt_tokens + completion_tokens
+            )
             await self.start_llm_usage_metrics(tokens)
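# Example (values invented for illustration): how Anthropic's cache-aware counters
# fold into LLMTokenUsage, using the same prompt + completion arithmetic as
# _report_usage_metrics() above.
from pipecat.metrics.metrics import LLMTokenUsage

tokens = LLMTokenUsage(
    prompt_tokens=900,
    completion_tokens=150,
    total_tokens=900 + 150,
    cache_creation_input_tokens=0,
    cache_read_input_tokens=800)  # prompt tokens served from Anthropic's cache
print(tokens.model_dump(exclude_none=True))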
diff --git a/src/pipecat/services/azure.py b/src/pipecat/services/azure.py
index 90674fcc4..daac57119 100644
--- a/src/pipecat/services/azure.py
+++ b/src/pipecat/services/azure.py
@@ -22,6 +22,8 @@
     TTSStoppedFrame,
     TranscriptionFrame,
     URLImageRawFrame)
+from pipecat.metrics.metrics import TTSUsageMetricsData
+from pipecat.processors.frame_processor import FrameDirection
 from pipecat.services.ai_services import STTService, TTSService, ImageGenService
 from pipecat.services.openai import BaseOpenAILLMService
 from pipecat.utils.time import time_now_iso8601
@@ -190,7 +192,7 @@ def __init__(
         self._api_key = api_key
         self._azure_endpoint = endpoint
         self._api_version = api_version
-        self._model = model
+        self.set_model_name(model)
         self._image_size = image_size
         self._aiohttp_session = aiohttp_session
diff --git a/src/pipecat/services/cartesia.py b/src/pipecat/services/cartesia.py
index 078926235..ab6026052 100644
--- a/src/pipecat/services/cartesia.py
+++ b/src/pipecat/services/cartesia.py
@@ -89,7 +89,7 @@ def __init__(
         self._cartesia_version = cartesia_version
         self._url = url
         self._voice_id = voice_id
-        self._model_id = model_id
+        self.set_model_name(model_id)
         self._output_format = {
             "container": "raw",
             "encoding": encoding,
@@ -105,8 +105,8 @@ def can_generate_metrics(self) -> bool:
         return True
 
     async def set_model(self, model: str):
+        await super().set_model(model)
         logger.debug(f"Switching TTS model to: [{model}]")
-        self._model_id = model
 
     async def set_voice(self, voice: str):
         logger.debug(f"Switching TTS voice to: [{voice}]")
@@ -155,6 +155,11 @@ async def _disconnect(self):
         except Exception as e:
             logger.error(f"{self} error closing websocket: {e}")
 
+    def _get_websocket(self):
+        if self._websocket:
+            return self._websocket
+        raise Exception("Websocket not connected")
+
     async def _handle_interruption(self, frame: StartInterruptionFrame, direction: FrameDirection):
         await super()._handle_interruption(frame, direction)
         await self.stop_all_metrics()
@@ -169,7 +174,7 @@ async def flush_audio(self):
             "transcript": "",
             "continue": False,
             "context_id": self._context_id,
-            "model_id": self._model_id,
+            "model_id": self.model_name,
             "voice": {
                 "mode": "id",
                 "id": self._voice_id
@@ -182,7 +187,7 @@ async def _receive_task_handler(self):
         try:
-            async for message in self._websocket:
+            async for message in self._get_websocket():
                 msg = json.loads(message)
                 if not msg or msg["context_id"] != self._context_id:
                     continue
@@ -235,7 +240,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
             "transcript": text + " ",
             "continue": True,
             "context_id": self._context_id,
-            "model_id": self._model_id,
+            "model_id": self.model_name,
             "voice": {
                 "mode": "id",
                 "id": self._voice_id
@@ -245,7 +250,7 @@
             "add_timestamps": True,
         }
         try:
-            await self._websocket.send(json.dumps(msg))
+            await self._get_websocket().send(json.dumps(msg))
             await self.start_tts_usage_metrics(text)
         except Exception as e:
             logger.error(f"{self} error sending message: {e}")
diff --git a/src/pipecat/services/deepgram.py b/src/pipecat/services/deepgram.py
index 708c3c511..5aebdfbb3 100644
--- a/src/pipecat/services/deepgram.py
+++ b/src/pipecat/services/deepgram.py
@@ -135,6 +135,7 @@ def __init__(self,
         self._connection.on(LiveTranscriptionEvents.Transcript, self._on_message)
 
     async def set_model(self, model: str):
+        await super().set_model(model)
         logger.debug(f"Switching STT model to: [{model}]")
         self._live_options.model = model
         await self._disconnect()
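# Example (generic sketch of the _get_websocket() guard added to Cartesia above;
# Client here is hypothetical, not pipecat API): fail fast with a clear error
# instead of letting an AttributeError on None surface from send()/recv().
from typing import Optional


class Client:
    def __init__(self):
        self._websocket: Optional[object] = None

    def _get_websocket(self):
        if self._websocket:
            return self._websocket
        raise Exception("Websocket not connected")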
diff --git a/src/pipecat/services/elevenlabs.py b/src/pipecat/services/elevenlabs.py
index 081a6bf5d..40aa25865 100644
--- a/src/pipecat/services/elevenlabs.py
+++ b/src/pipecat/services/elevenlabs.py
@@ -107,7 +107,7 @@ def __init__(
 
         self._api_key = api_key
         self._voice_id = voice_id
-        self._model = model
+        self.set_model_name(model)
         self._url = url
         self._params = params
@@ -122,8 +122,8 @@ def can_generate_metrics(self) -> bool:
         return True
 
     async def set_model(self, model: str):
+        await super().set_model(model)
         logger.debug(f"Switching TTS model to: [{model}]")
-        self._model = model
         await self._disconnect()
         await self._connect()
@@ -160,7 +160,7 @@ async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirect
     async def _connect(self):
         try:
             voice_id = self._voice_id
-            model = self._model
+            model = self.model_name
             output_format = self._params.output_format
             url = f"{self._url}/v1/text-to-speech/{voice_id}/stream-input?model_id={model}&output_format={output_format}"
             self._websocket = await websockets.connect(url)
diff --git a/src/pipecat/services/fal.py b/src/pipecat/services/fal.py
index 672135d02..58768180f 100644
--- a/src/pipecat/services/fal.py
+++ b/src/pipecat/services/fal.py
@@ -46,7 +46,7 @@ def __init__(
             **kwargs
     ):
         super().__init__(**kwargs)
-        self._model = model
+        self.set_model_name(model)
         self._params = params
         self._aiohttp_session = aiohttp_session
         if key:
@@ -56,7 +56,7 @@ async def run_image_gen(self, prompt: str) -> AsyncGenerator[Frame, None]:
         logger.debug(f"Generating image from prompt: {prompt}")
 
         response = await fal_client.run_async(
-            self._model,
+            self.model_name,
             arguments={"prompt": prompt, **self._params.model_dump(exclude_none=True)}
         )
diff --git a/src/pipecat/services/fireworks.py b/src/pipecat/services/fireworks.py
index 7fa4d64e8..87fddd838 100644
--- a/src/pipecat/services/fireworks.py
+++ b/src/pipecat/services/fireworks.py
@@ -22,4 +22,4 @@ def __init__(self,
             *,
             model: str = "accounts/fireworks/models/firefunction-v1",
             base_url: str = "https://api.fireworks.ai/inference/v1"):
-        super().__init__(model, base_url)
+        super().__init__(model=model, base_url=base_url)
diff --git a/src/pipecat/services/google.py b/src/pipecat/services/google.py
index 7f20f1b8f..b72169b70 100644
--- a/src/pipecat/services/google.py
+++ b/src/pipecat/services/google.py
@@ -50,6 +50,7 @@ def can_generate_metrics(self) -> bool:
         return True
 
     def _create_client(self, model: str):
+        self.set_model_name(model)
         self._client = gai.GenerativeModel(model)
 
     def _get_messages_from_openai_context(
diff --git a/src/pipecat/services/moondream.py b/src/pipecat/services/moondream.py
index 3441aeeb9..b6391cc93 100644
--- a/src/pipecat/services/moondream.py
+++ b/src/pipecat/services/moondream.py
@@ -54,6 +54,8 @@ def __init__(
     ):
         super().__init__(**kwargs)
 
+        self.set_model_name(model)
+
         if not use_cpu:
             device, dtype = detect_device()
         else:
@@ -73,7 +75,7 @@ def __init__(
 
     async def run_vision(self, frame: VisionImageRawFrame) -> AsyncGenerator[Frame, None]:
         if not self._model:
-            logger.error(f"{self} error: Moondream model not available")
+            logger.error(f"{self} error: Moondream model not available ({self.model_name})")
             yield ErrorFrame("Moondream model not available")
             return
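# Example (hypothetical service, sketching the model-name flow the services above
# now share): set_model_name() both stores the name and refreshes the metrics
# tagging, so later TTFB/processing/usage frames carry processor + model.
from pipecat.services.ai_services import TTSService


class DemoTTSService(TTSService):  # sketch only; other abstract methods omitted
    def __init__(self, *, model: str = "demo-tts-1", **kwargs):
        super().__init__(**kwargs)
        self.set_model_name(model)  # tags metrics with processor=self.name, model

    async def set_model(self, model: str):
        await super().set_model(model)  # base implementation calls set_model_name()
        # ...reconnect or reconfigure the provider here...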
diff --git a/src/pipecat/services/openai.py b/src/pipecat/services/openai.py
index 7483e2eb5..6fe710b5e 100644
--- a/src/pipecat/services/openai.py
+++ b/src/pipecat/services/openai.py
@@ -33,6 +33,7 @@
     FunctionCallInProgressFrame,
     StartInterruptionFrame
 )
+from pipecat.metrics.metrics import LLMTokenUsage
 from pipecat.processors.aggregators.llm_response import LLMUserContextAggregator, LLMAssistantContextAggregator
 
 from pipecat.processors.aggregators.openai_llm_context import (
@@ -83,7 +84,7 @@ class BaseOpenAILLMService(LLMService):
     def __init__(self, *, model: str, api_key=None, base_url=None, **kwargs):
         super().__init__(**kwargs)
-        self._model: str = model
+        self.set_model_name(model)
         self._client = self.create_client(api_key=api_key, base_url=base_url, **kwargs)
 
     def create_client(self, api_key=None, base_url=None, **kwargs):
@@ -104,7 +105,7 @@ async def get_chat_completions(
             context: OpenAILLMContext,
             messages: List[ChatCompletionMessageParam]) -> AsyncStream[ChatCompletionChunk]:
         chunks = await self._client.chat.completions.create(
-            model=self._model,
+            model=self.model_name,
             stream=True,
             messages=messages,
             tools=context.tools,
@@ -148,13 +149,11 @@ async def _process_context(self, context: OpenAILLMContext):
 
         async for chunk in chunk_stream:
             if chunk.usage:
-                tokens = {
-                    "processor": self.name,
-                    "model": self._model,
-                    "prompt_tokens": chunk.usage.prompt_tokens,
-                    "completion_tokens": chunk.usage.completion_tokens,
-                    "total_tokens": chunk.usage.total_tokens
-                }
+                tokens = LLMTokenUsage(
+                    prompt_tokens=chunk.usage.prompt_tokens,
+                    completion_tokens=chunk.usage.completion_tokens,
+                    total_tokens=chunk.usage.total_tokens
+                )
                 await self.start_llm_usage_metrics(tokens)
 
             if len(chunk.choices) == 0:
@@ -223,7 +222,7 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
             context = OpenAILLMContext.from_image_frame(frame)
         elif isinstance(frame, LLMModelUpdateFrame):
             logger.debug(f"Switching LLM model to: [{frame.model}]")
-            self._model = frame.model
+            self.set_model_name(frame.model)
         else:
             await self.push_frame(frame, direction)
@@ -273,7 +272,7 @@ def __init__(
         model: str = "dall-e-3",
     ):
         super().__init__()
-        self._model = model
+        self.set_model_name(model)
         self._image_size = image_size
         self._client = AsyncOpenAI(api_key=api_key)
         self._aiohttp_session = aiohttp_session
@@ -283,7 +282,7 @@ async def run_image_gen(self, prompt: str) -> AsyncGenerator[Frame, None]:
 
         image = await self._client.images.generate(
             prompt=prompt,
-            model=self._model,
+            model=self.model_name,
             n=1,
             size=self._image_size
         )
@@ -325,7 +324,7 @@ def __init__(
         super().__init__(sample_rate=sample_rate, **kwargs)
 
         self._voice: ValidVoice = VALID_VOICES.get(voice, "alloy")
-        self._model = model
+        self.set_model_name(model)
         self._sample_rate = sample_rate
         self._client = AsyncOpenAI(api_key=api_key)
@@ -348,7 +347,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
         async with self._client.audio.speech.with_streaming_response.create(
                 input=text,
-                model=self._model,
+                model=self.model_name,
                 voice=self._voice,
                 response_format="pcm",
         ) as r:
diff --git a/src/pipecat/services/openpipe.py b/src/pipecat/services/openpipe.py
index ada7824fb..e4e14dc15 100644
--- a/src/pipecat/services/openpipe.py
+++ b/src/pipecat/services/openpipe.py
@@ -60,7 +60,7 @@ async def get_chat_completions(
             context: OpenAILLMContext,
             messages: List[ChatCompletionMessageParam]) -> AsyncStream[ChatCompletionChunk]:
         chunks = await self._client.chat.completions.create(
-            model=self._model,
+            model=self.model_name,
             stream=True,
             messages=messages,
             openpipe={
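# Example (standalone sketch with a stubbed usage object; the real code reads
# chunk.usage, an openai CompletionUsage, inside _process_context() above).
from dataclasses import dataclass

from pipecat.metrics.metrics import LLMTokenUsage


@dataclass
class FakeUsage:  # stand-in for openai.types.CompletionUsage
    prompt_tokens: int
    completion_tokens: int
    total_tokens: int


usage = FakeUsage(prompt_tokens=57, completion_tokens=20, total_tokens=77)
tokens = LLMTokenUsage(
    prompt_tokens=usage.prompt_tokens,
    completion_tokens=usage.completion_tokens,
    total_tokens=usage.total_tokens)
# An LLM service would now call: await self.start_llm_usage_metrics(tokens)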
diff --git a/src/pipecat/services/together.py b/src/pipecat/services/together.py
index 49759cb01..004236ac8 100644
--- a/src/pipecat/services/together.py
+++ b/src/pipecat/services/together.py
@@ -18,9 +18,7 @@
     Frame,
     LLMModelUpdateFrame,
     TextFrame,
-    VisionImageRawFrame,
     UserImageRequestFrame,
-    UserImageRawFrame,
     LLMMessagesFrame,
     LLMFullResponseStartFrame,
     LLMFullResponseEndFrame,
@@ -28,6 +26,7 @@
     FunctionCallInProgressFrame,
     StartInterruptionFrame
 )
+from pipecat.metrics.metrics import LLMTokenUsage
 from pipecat.processors.frame_processor import FrameDirection
 from pipecat.services.ai_services import LLMService
 from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext, OpenAILLMContextFrame
@@ -69,7 +68,7 @@ def __init__(
             **kwargs):
         super().__init__(**kwargs)
         self._client = AsyncTogether(api_key=api_key)
-        self._model = model
+        self.set_model_name(model)
         self._max_tokens = max_tokens
 
     def can_generate_metrics(self) -> bool:
@@ -95,7 +94,7 @@ async def _process_context(self, context: OpenAILLMContext):
 
         stream = await self._client.chat.completions.create(
             messages=context.messages,
-            model=self._model,
+            model=self.model_name,
             max_tokens=self._max_tokens,
             stream=True,
         )
@@ -108,13 +107,11 @@ async def _process_context(self, context: OpenAILLMContext):
         async for chunk in stream:
             # logger.debug(f"Together LLM event: {chunk}")
             if chunk.usage:
-                tokens = {
-                    "processor": self.name,
-                    "model": self._model,
-                    "prompt_tokens": chunk.usage.prompt_tokens,
-                    "completion_tokens": chunk.usage.completion_tokens,
-                    "total_tokens": chunk.usage.total_tokens
-                }
+                tokens = LLMTokenUsage(
+                    prompt_tokens=chunk.usage.prompt_tokens,
+                    completion_tokens=chunk.usage.completion_tokens,
+                    total_tokens=chunk.usage.total_tokens
+                )
                 await self.start_llm_usage_metrics(tokens)
 
             if len(chunk.choices) == 0:
@@ -156,7 +153,7 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
             context = TogetherLLMContext.from_messages(frame.messages)
         elif isinstance(frame, LLMModelUpdateFrame):
             logger.debug(f"Switching LLM model to: [{frame.model}]")
-            self._model = frame.model
+            self.set_model_name(frame.model)
         else:
             await self.push_frame(frame, direction)
diff --git a/src/pipecat/services/whisper.py b/src/pipecat/services/whisper.py
index 04f357a94..9f54f9ca0 100644
--- a/src/pipecat/services/whisper.py
+++ b/src/pipecat/services/whisper.py
@@ -52,7 +52,7 @@ def __init__(self,
         super().__init__(**kwargs)
         self._device: str = device
         self._compute_type = compute_type
-        self._model_name: str | Model = model
+        self.set_model_name(model if isinstance(model, str) else model.value)
         self._no_speech_prob = no_speech_prob
         self._model: WhisperModel | None = None
         self._load()
@@ -65,7 +65,7 @@ def _load(self):
         this model is being run, it will take time to download."""
         logger.debug("Loading Whisper model...")
         self._model = WhisperModel(
-            self._model_name.value if isinstance(self._model_name, Enum) else self._model_name,
+            self.model_name,
             device=self._device,
             compute_type=self._compute_type)
         logger.debug("Loaded Whisper model")
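# Example (standalone sketch of the str-or-Enum normalization Whisper now does at
# construction time; Model below is a stand-in, not the enum from whisper.py):
from enum import Enum


class Model(Enum):
    LARGE = "large-v3"


def normalize_model_name(model: "str | Model") -> str:
    return model if isinstance(model, str) else model.value


assert normalize_model_name(Model.LARGE) == "large-v3"
assert normalize_model_name("distil-medium.en") == "distil-medium.en"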
metrics["tokens"].append(d.value.model_dump(exclude_none=True)) + elif isinstance(d, TTSUsageMetricsData): + if "characters" not in metrics: + metrics["characters"] = [] + metrics["characters"].append(d.model_dump()) message = DailyTransportMessageFrame(message={ "type": "pipecat-metrics",