From 3958bb79035661a8ed9e23f2711ad3a323eba21e Mon Sep 17 00:00:00 2001 From: chadbailey59 Date: Wed, 7 Aug 2024 08:55:51 -0500 Subject: [PATCH] Additional LLM and TTS metrics (#343) * added llm and tts usage metrics * Metrics debug logging * cleanup --- .../foundational/06-listen-and-respond.py | 29 +++++++++++++------ src/pipecat/frames/frames.py | 2 ++ src/pipecat/processors/frameworks/rtvi.py | 6 +++- src/pipecat/services/azure.py | 9 +++++- src/pipecat/services/cartesia.py | 8 +++++ src/pipecat/services/deepgram.py | 9 +++++- src/pipecat/services/elevenlabs.py | 10 +++++-- src/pipecat/services/openai.py | 23 ++++++++++++++- src/pipecat/services/playht.py | 10 +++++-- src/pipecat/services/xtts.py | 10 +++++-- src/pipecat/transports/services/daily.py | 4 +++ 11 files changed, 101 insertions(+), 19 deletions(-) diff --git a/examples/foundational/06-listen-and-respond.py b/examples/foundational/06-listen-and-respond.py index af325cda1..b659615e0 100644 --- a/examples/foundational/06-listen-and-respond.py +++ b/examples/foundational/06-listen-and-respond.py @@ -9,14 +9,15 @@ import os import sys -from pipecat.frames.frames import LLMMessagesFrame +from pipecat.frames.frames import Frame, LLMMessagesFrame, MetricsFrame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner -from pipecat.pipeline.task import PipelineTask +from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.llm_response import ( LLMAssistantResponseAggregator, LLMUserResponseAggregator, ) +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.processors.logger import FrameLogger from pipecat.services.elevenlabs import ElevenLabsTTSService from pipecat.services.openai import OpenAILLMService @@ -34,6 +35,14 @@ logger.add(sys.stderr, level="DEBUG") +class MetricsLogger(FrameProcessor): + async def process_frame(self, frame: Frame, direction: FrameDirection): + if isinstance(frame, MetricsFrame): + print( + f"!!! MetricsFrame: {frame}, ttfb: {frame.ttfb}, processing: {frame.processing}, tokens: {frame.tokens}, characters: {frame.characters}") + await self.push_frame(frame, direction) + + async def main(): async with aiohttp.ClientSession() as session: (room_url, token) = await configure(session) @@ -58,11 +67,10 @@ async def main(): llm = OpenAILLMService( api_key=os.getenv("OPENAI_API_KEY"), - model="gpt-4o") + model="gpt-4o" + ) - fl = FrameLogger("!!! after LLM", "red") - fltts = FrameLogger("@@@ out of tts", "green") - flend = FrameLogger("### out of the end", "magenta") + ml = MetricsLogger() messages = [ { @@ -77,15 +85,18 @@ async def main(): transport.input(), tma_in, llm, - fl, tts, - fltts, + ml, transport.output(), tma_out, - flend ]) task = PipelineTask(pipeline) + task = PipelineTask(pipeline, PipelineParams( + allow_interruptions=True, + enable_metrics=True, + report_only_initial_ttfb=False, + )) @transport.event_handler("on_first_participant_joined") async def on_first_participant_joined(transport, participant): diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py index d634aa49a..8cd1fe8e7 100644 --- a/src/pipecat/frames/frames.py +++ b/src/pipecat/frames/frames.py @@ -276,6 +276,8 @@ class MetricsFrame(SystemFrame): """ ttfb: List[Mapping[str, Any]] | None = None processing: List[Mapping[str, Any]] | None = None + tokens: List[Mapping[str, Any]] | None = None + characters: List[Mapping[str, Any]] | None = None # # Control frames diff --git a/src/pipecat/processors/frameworks/rtvi.py b/src/pipecat/processors/frameworks/rtvi.py index cd4e2e8db..2e316f1c4 100644 --- a/src/pipecat/processors/frameworks/rtvi.py +++ b/src/pipecat/processors/frameworks/rtvi.py @@ -537,7 +537,11 @@ async def _handle_pipeline_setup(self, start_frame: StartFrame, config: RTVIConf processors.extend(pipeline.processors_with_metrics()) ttfb = [{"processor": p.name, "value": 0.0} for p in processors] processing = [{"processor": p.name, "value": 0.0} for p in processors] - await self.push_frame(MetricsFrame(ttfb=ttfb, processing=processing)) + tokens = [{"processor": p.name, "value": {"prompt_tokens": 0, + "completion_tokens": 0, + "total_tokens": 0}} for p in processors] + characters = [{"processor": p.name, "value": 0} for p in processors] + await self.push_frame(MetricsFrame(ttfb=ttfb, processing=processing, tokens=tokens, characters=characters)) self._pipeline = pipeline diff --git a/src/pipecat/services/azure.py b/src/pipecat/services/azure.py index 24de2d5b2..13a3e3b38 100644 --- a/src/pipecat/services/azure.py +++ b/src/pipecat/services/azure.py @@ -18,6 +18,7 @@ EndFrame, ErrorFrame, Frame, + MetricsFrame, StartFrame, SystemFrame, TranscriptionFrame, @@ -87,7 +88,13 @@ async def set_voice(self, voice: str): async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: logger.debug(f"Generating TTS: [{text}]") - + if self.can_generate_metrics() and self.metrics_enabled: + characters = { + "processor": self.name, + "value": len(text), + } + logger.debug(f"{self.name} Characters: {characters['value']}") + await self.push_frame(MetricsFrame(characters=[characters])) await self.start_ttfb_metrics() ssml = ( diff --git a/src/pipecat/services/cartesia.py b/src/pipecat/services/cartesia.py index 592215cd6..b9e7499db 100644 --- a/src/pipecat/services/cartesia.py +++ b/src/pipecat/services/cartesia.py @@ -21,6 +21,7 @@ StartFrame, EndFrame, TextFrame, + MetricsFrame, LLMFullResponseEndFrame ) from pipecat.services.ai_services import TTSService @@ -200,6 +201,13 @@ async def _context_appending_task_handler(self): async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: logger.debug(f"Generating TTS: [{text}]") + if self.can_generate_metrics() and self.metrics_enabled: + characters = { + "processor": self.name, + "value": len(text), + } + logger.debug(f"{self.name} Characters: {characters['value']}") + await self.push_frame(MetricsFrame(characters=[characters])) try: if not self._websocket: diff --git a/src/pipecat/services/deepgram.py b/src/pipecat/services/deepgram.py index 05e05f074..bbe5d3007 100644 --- a/src/pipecat/services/deepgram.py +++ b/src/pipecat/services/deepgram.py @@ -15,6 +15,7 @@ ErrorFrame, Frame, InterimTranscriptionFrame, + MetricsFrame, StartFrame, SystemFrame, TranscriptionFrame) @@ -70,7 +71,13 @@ async def set_voice(self, voice: str): async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: logger.debug(f"Generating TTS: [{text}]") - + if self.can_generate_metrics() and self.metrics_enabled: + characters = { + "processor": self.name, + "value": len(text), + } + logger.debug(f"{self.name} Characters: {characters['value']}") + await self.push_frame(MetricsFrame(characters=[characters])) base_url = self._base_url request_url = f"{base_url}?model={self._voice}&encoding={self._encoding}&container=none&sample_rate={self._sample_rate}" headers = {"authorization": f"token {self._api_key}"} diff --git a/src/pipecat/services/elevenlabs.py b/src/pipecat/services/elevenlabs.py index 798de2973..33590a637 100644 --- a/src/pipecat/services/elevenlabs.py +++ b/src/pipecat/services/elevenlabs.py @@ -8,7 +8,7 @@ from typing import AsyncGenerator -from pipecat.frames.frames import AudioRawFrame, ErrorFrame, Frame +from pipecat.frames.frames import AudioRawFrame, ErrorFrame, Frame, MetricsFrame from pipecat.services.ai_services import TTSService from loguru import logger @@ -40,7 +40,13 @@ async def set_voice(self, voice: str): async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: logger.debug(f"Generating TTS: [{text}]") - + if self.can_generate_metrics() and self.metrics_enabled: + characters = { + "processor": self.name, + "value": len(text), + } + logger.debug(f"{self.name} Characters: {characters['value']}") + await self.push_frame(MetricsFrame(characters=[characters])) url = f"https://api.elevenlabs.io/v1/text-to-speech/{self._voice_id}/stream" payload = {"text": text, "model_id": self._model} diff --git a/src/pipecat/services/openai.py b/src/pipecat/services/openai.py index e3113ae8b..01f5e596f 100644 --- a/src/pipecat/services/openai.py +++ b/src/pipecat/services/openai.py @@ -23,6 +23,7 @@ LLMFullResponseStartFrame, LLMMessagesFrame, LLMModelUpdateFrame, + MetricsFrame, TextFrame, URLImageRawFrame, VisionImageRawFrame @@ -95,6 +96,7 @@ async def get_chat_completions( messages=messages, tools=context.tools, tool_choice=context.tool_choice, + stream_options={"include_usage": True} ) return chunks @@ -132,6 +134,19 @@ async def _process_context(self, context: OpenAILLMContext): ) async for chunk in chunk_stream: + if chunk.usage: + if self.can_generate_metrics() and self.metrics_enabled: + tokens = { + "processor": self.name, + "prompt_tokens": chunk.usage.prompt_tokens, + "completion_tokens": chunk.usage.completion_tokens, + "total_tokens": chunk.usage.total_tokens + } + logger.debug( + f"{self.name} prompt tokens: {tokens['prompt_tokens']}, completion tokens: {tokens['completion_tokens']}") + + await self.push_frame(MetricsFrame(tokens=[tokens])) + if len(chunk.choices) == 0: continue @@ -323,7 +338,13 @@ async def set_voice(self, voice: str): async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: logger.debug(f"Generating TTS: [{text}]") - + if self.can_generate_metrics() and self.metrics_enabled: + characters = { + "processor": self.name, + "value": len(text), + } + logger.debug(f"{self.name} Characters: {characters['value']}") + await self.push_frame(MetricsFrame(characters=[characters])) try: await self.start_ttfb_metrics() diff --git a/src/pipecat/services/playht.py b/src/pipecat/services/playht.py index dc3b9e6fe..c24d8b8b3 100644 --- a/src/pipecat/services/playht.py +++ b/src/pipecat/services/playht.py @@ -9,7 +9,7 @@ from typing import AsyncGenerator -from pipecat.frames.frames import AudioRawFrame, Frame +from pipecat.frames.frames import AudioRawFrame, Frame, MetricsFrame from pipecat.services.ai_services import TTSService from loguru import logger @@ -48,7 +48,13 @@ def can_generate_metrics(self) -> bool: async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: logger.debug(f"Generating TTS: [{text}]") - + if self.can_generate_metrics() and self.metrics_enabled: + characters = { + "processor": self.name, + "value": len(text), + } + logger.debug(f"{self.name} Characters: {characters['value']}") + await self.push_frame(MetricsFrame(characters=[characters])) try: b = bytearray() in_header = True diff --git a/src/pipecat/services/xtts.py b/src/pipecat/services/xtts.py index 151a48e74..d67472891 100644 --- a/src/pipecat/services/xtts.py +++ b/src/pipecat/services/xtts.py @@ -8,7 +8,7 @@ from typing import Any, AsyncGenerator, Dict -from pipecat.frames.frames import AudioRawFrame, ErrorFrame, Frame, StartFrame +from pipecat.frames.frames import AudioRawFrame, ErrorFrame, Frame, MetricsFrame, StartFrame from pipecat.services.ai_services import TTSService from loguru import logger @@ -70,7 +70,13 @@ async def set_voice(self, voice: str): async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: logger.debug(f"Generating TTS: [{text}]") - + if self.can_generate_metrics() and self.metrics_enabled: + characters = { + "processor": self.name, + "value": len(text), + } + logger.debug(f"{self.name} Characters: {characters['value']}") + await self.push_frame(MetricsFrame(characters=[characters])) if not self._studio_speakers: logger.error(f"{self} no studio speakers available") return diff --git a/src/pipecat/transports/services/daily.py b/src/pipecat/transports/services/daily.py index 3047e4fb2..0c3cb5ccb 100644 --- a/src/pipecat/transports/services/daily.py +++ b/src/pipecat/transports/services/daily.py @@ -699,6 +699,10 @@ async def send_metrics(self, frame: MetricsFrame): metrics["ttfb"] = frame.ttfb if frame.processing: metrics["processing"] = frame.processing + if frame.tokens: + metrics["tokens"] = frame.tokens + if frame.characters: + metrics["characters"] = frame.characters message = DailyTransportMessageFrame(message={ "type": "pipecat-metrics",