Additional LLM and TTS metrics (#343)
* added llm and tts usage metrics

* Metrics debug logging

* cleanup
chadbailey59 authored Aug 7, 2024
1 parent 83a037a commit 3958bb7
Showing 11 changed files with 101 additions and 19 deletions.
29 changes: 20 additions & 9 deletions examples/foundational/06-listen-and-respond.py
@@ -9,14 +9,15 @@
import os
import sys

from pipecat.frames.frames import LLMMessagesFrame
from pipecat.frames.frames import Frame, LLMMessagesFrame, MetricsFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.logger import FrameLogger
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.openai import OpenAILLMService
@@ -34,6 +35,14 @@
logger.add(sys.stderr, level="DEBUG")


class MetricsLogger(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
if isinstance(frame, MetricsFrame):
print(
f"!!! MetricsFrame: {frame}, ttfb: {frame.ttfb}, processing: {frame.processing}, tokens: {frame.tokens}, characters: {frame.characters}")
await self.push_frame(frame, direction)


async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
@@ -58,11 +67,10 @@ async def main():

llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
model="gpt-4o"
)

fl = FrameLogger("!!! after LLM", "red")
fltts = FrameLogger("@@@ out of tts", "green")
flend = FrameLogger("### out of the end", "magenta")
ml = MetricsLogger()

messages = [
{
@@ -77,15 +85,18 @@ async def main():
transport.input(),
tma_in,
llm,
fl,
tts,
fltts,
ml,
transport.output(),
tma_out,
flend
])

task = PipelineTask(pipeline)
task = PipelineTask(pipeline, PipelineParams(
allow_interruptions=True,
enable_metrics=True,
report_only_initial_ttfb=False,
))

@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
2 changes: 2 additions & 0 deletions src/pipecat/frames/frames.py
@@ -276,6 +276,8 @@ class MetricsFrame(SystemFrame):
"""
ttfb: List[Mapping[str, Any]] | None = None
processing: List[Mapping[str, Any]] | None = None
tokens: List[Mapping[str, Any]] | None = None
characters: List[Mapping[str, Any]] | None = None

#
# Control frames
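For context, a minimal sketch of a downstream processor that reads the two new fields; the entry layouts follow the dictionaries pushed in the service changes below, and the class name here is illustrative, not part of this commit:

from pipecat.frames.frames import Frame, MetricsFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor


class UsageMetricsLogger(FrameProcessor):
    # Illustrative only: prints the new usage fields as MetricsFrames pass through.
    async def process_frame(self, frame: Frame, direction: FrameDirection):
        if isinstance(frame, MetricsFrame):
            for entry in frame.tokens or []:
                # LLM token entries are mappings keyed by the emitting processor's name.
                print(f"{entry['processor']} tokens: {entry}")
            for entry in frame.characters or []:
                # TTS character entries carry the length of the text sent to the service.
                print(f"{entry['processor']} characters: {entry['value']}")
        await self.push_frame(frame, direction)
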
6 changes: 5 additions & 1 deletion src/pipecat/processors/frameworks/rtvi.py
@@ -537,7 +537,11 @@ async def _handle_pipeline_setup(self, start_frame: StartFrame, config: RTVIConf
processors.extend(pipeline.processors_with_metrics())
ttfb = [{"processor": p.name, "value": 0.0} for p in processors]
processing = [{"processor": p.name, "value": 0.0} for p in processors]
await self.push_frame(MetricsFrame(ttfb=ttfb, processing=processing))
tokens = [{"processor": p.name, "value": {"prompt_tokens": 0,
"completion_tokens": 0,
"total_tokens": 0}} for p in processors]
characters = [{"processor": p.name, "value": 0} for p in processors]
await self.push_frame(MetricsFrame(ttfb=ttfb, processing=processing, tokens=tokens, characters=characters))

self._pipeline = pipeline

9 changes: 8 additions & 1 deletion src/pipecat/services/azure.py
@@ -18,6 +18,7 @@
EndFrame,
ErrorFrame,
Frame,
MetricsFrame,
StartFrame,
SystemFrame,
TranscriptionFrame,
@@ -87,7 +88,13 @@ async def set_voice(self, voice: str):

async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
logger.debug(f"Generating TTS: [{text}]")

if self.can_generate_metrics() and self.metrics_enabled:
characters = {
"processor": self.name,
"value": len(text),
}
logger.debug(f"{self.name} Characters: {characters['value']}")
await self.push_frame(MetricsFrame(characters=[characters]))
await self.start_ttfb_metrics()

ssml = (
8 changes: 8 additions & 0 deletions src/pipecat/services/cartesia.py
@@ -21,6 +21,7 @@
StartFrame,
EndFrame,
TextFrame,
MetricsFrame,
LLMFullResponseEndFrame
)
from pipecat.services.ai_services import TTSService
@@ -200,6 +201,13 @@ async def _context_appending_task_handler(self):

async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
logger.debug(f"Generating TTS: [{text}]")
if self.can_generate_metrics() and self.metrics_enabled:
characters = {
"processor": self.name,
"value": len(text),
}
logger.debug(f"{self.name} Characters: {characters['value']}")
await self.push_frame(MetricsFrame(characters=[characters]))

try:
if not self._websocket:
9 changes: 8 additions & 1 deletion src/pipecat/services/deepgram.py
@@ -15,6 +15,7 @@
ErrorFrame,
Frame,
InterimTranscriptionFrame,
MetricsFrame,
StartFrame,
SystemFrame,
TranscriptionFrame)
@@ -70,7 +71,13 @@ async def set_voice(self, voice: str):

async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
logger.debug(f"Generating TTS: [{text}]")

if self.can_generate_metrics() and self.metrics_enabled:
characters = {
"processor": self.name,
"value": len(text),
}
logger.debug(f"{self.name} Characters: {characters['value']}")
await self.push_frame(MetricsFrame(characters=[characters]))
base_url = self._base_url
request_url = f"{base_url}?model={self._voice}&encoding={self._encoding}&container=none&sample_rate={self._sample_rate}"
headers = {"authorization": f"token {self._api_key}"}
10 changes: 8 additions & 2 deletions src/pipecat/services/elevenlabs.py
@@ -8,7 +8,7 @@

from typing import AsyncGenerator

from pipecat.frames.frames import AudioRawFrame, ErrorFrame, Frame
from pipecat.frames.frames import AudioRawFrame, ErrorFrame, Frame, MetricsFrame
from pipecat.services.ai_services import TTSService

from loguru import logger
@@ -40,7 +40,13 @@ async def set_voice(self, voice: str):

async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
logger.debug(f"Generating TTS: [{text}]")

if self.can_generate_metrics() and self.metrics_enabled:
characters = {
"processor": self.name,
"value": len(text),
}
logger.debug(f"{self.name} Characters: {characters['value']}")
await self.push_frame(MetricsFrame(characters=[characters]))
url = f"https://api.elevenlabs.io/v1/text-to-speech/{self._voice_id}/stream"

payload = {"text": text, "model_id": self._model}
23 changes: 22 additions & 1 deletion src/pipecat/services/openai.py
@@ -23,6 +23,7 @@
LLMFullResponseStartFrame,
LLMMessagesFrame,
LLMModelUpdateFrame,
MetricsFrame,
TextFrame,
URLImageRawFrame,
VisionImageRawFrame
@@ -95,6 +96,7 @@ async def get_chat_completions(
messages=messages,
tools=context.tools,
tool_choice=context.tool_choice,
stream_options={"include_usage": True}
)
return chunks

@@ -132,6 +134,19 @@ async def _process_context(self, context: OpenAILLMContext):
)

async for chunk in chunk_stream:
if chunk.usage:
if self.can_generate_metrics() and self.metrics_enabled:
tokens = {
"processor": self.name,
"prompt_tokens": chunk.usage.prompt_tokens,
"completion_tokens": chunk.usage.completion_tokens,
"total_tokens": chunk.usage.total_tokens
}
logger.debug(
f"{self.name} prompt tokens: {tokens['prompt_tokens']}, completion tokens: {tokens['completion_tokens']}")

await self.push_frame(MetricsFrame(tokens=[tokens]))

if len(chunk.choices) == 0:
continue

@@ -323,7 +338,13 @@ async def set_voice(self, voice: str):

async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
logger.debug(f"Generating TTS: [{text}]")

if self.can_generate_metrics() and self.metrics_enabled:
characters = {
"processor": self.name,
"value": len(text),
}
logger.debug(f"{self.name} Characters: {characters['value']}")
await self.push_frame(MetricsFrame(characters=[characters]))
try:
await self.start_ttfb_metrics()

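A note on stream_options={"include_usage": True}: with the OpenAI v1.x Python client, the stream then delivers one final chunk whose choices list is empty and whose usage field carries the token counts, which is why the handler above checks chunk.usage before skipping chunks with no choices. A standalone sketch of that behavior (not part of the commit; the model and prompt are illustrative):

import asyncio

from openai import AsyncOpenAI


async def print_usage():
    client = AsyncOpenAI()  # reads OPENAI_API_KEY from the environment
    stream = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": "Say hello."}],
        stream=True,
        stream_options={"include_usage": True},
    )
    async for chunk in stream:
        if chunk.usage:
            # The usage chunk arrives last, with an empty `choices` list.
            print(chunk.usage.prompt_tokens,
                  chunk.usage.completion_tokens,
                  chunk.usage.total_tokens)


if __name__ == "__main__":
    asyncio.run(print_usage())
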
10 changes: 8 additions & 2 deletions src/pipecat/services/playht.py
@@ -9,7 +9,7 @@

from typing import AsyncGenerator

from pipecat.frames.frames import AudioRawFrame, Frame
from pipecat.frames.frames import AudioRawFrame, Frame, MetricsFrame
from pipecat.services.ai_services import TTSService

from loguru import logger
@@ -48,7 +48,13 @@ def can_generate_metrics(self) -> bool:

async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
logger.debug(f"Generating TTS: [{text}]")

if self.can_generate_metrics() and self.metrics_enabled:
characters = {
"processor": self.name,
"value": len(text),
}
logger.debug(f"{self.name} Characters: {characters['value']}")
await self.push_frame(MetricsFrame(characters=[characters]))
try:
b = bytearray()
in_header = True
10 changes: 8 additions & 2 deletions src/pipecat/services/xtts.py
@@ -8,7 +8,7 @@

from typing import Any, AsyncGenerator, Dict

from pipecat.frames.frames import AudioRawFrame, ErrorFrame, Frame, StartFrame
from pipecat.frames.frames import AudioRawFrame, ErrorFrame, Frame, MetricsFrame, StartFrame
from pipecat.services.ai_services import TTSService

from loguru import logger
@@ -70,7 +70,13 @@ async def set_voice(self, voice: str):

async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
logger.debug(f"Generating TTS: [{text}]")

if self.can_generate_metrics() and self.metrics_enabled:
characters = {
"processor": self.name,
"value": len(text),
}
logger.debug(f"{self.name} Characters: {characters['value']}")
await self.push_frame(MetricsFrame(characters=[characters]))
if not self._studio_speakers:
logger.error(f"{self} no studio speakers available")
return
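The character-count block above is repeated in each TTS service touched here (Azure, Cartesia, Deepgram, ElevenLabs, OpenAI, PlayHT, XTTS). A hypothetical helper that factors it out could look like the sketch below; the function name is an assumption and is not part of this commit:

from loguru import logger

from pipecat.frames.frames import MetricsFrame


async def push_character_metrics(service, text: str):
    # Mirrors the block added to each run_tts above; `service` is any TTSService.
    if service.can_generate_metrics() and service.metrics_enabled:
        characters = {"processor": service.name, "value": len(text)}
        logger.debug(f"{service.name} Characters: {characters['value']}")
        await service.push_frame(MetricsFrame(characters=[characters]))
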
4 changes: 4 additions & 0 deletions src/pipecat/transports/services/daily.py
@@ -699,6 +699,10 @@ async def send_metrics(self, frame: MetricsFrame):
metrics["ttfb"] = frame.ttfb
if frame.processing:
metrics["processing"] = frame.processing
if frame.tokens:
metrics["tokens"] = frame.tokens
if frame.characters:
metrics["characters"] = frame.characters

message = DailyTransportMessageFrame(message={
"type": "pipecat-metrics",
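Taken together, the metrics dict assembled in send_metrics can now carry four kinds of entries before being wrapped in the "pipecat-metrics" transport message. An illustrative shape only; the processor names and numbers below are made up, not output from this commit:

metrics = {
    "ttfb": [{"processor": "OpenAILLMService#0", "value": 0.42}],
    "processing": [{"processor": "OpenAILLMService#0", "value": 0.85}],
    "tokens": [{"processor": "OpenAILLMService#0",
                "prompt_tokens": 152, "completion_tokens": 24, "total_tokens": 176}],
    "characters": [{"processor": "ElevenLabsTTSService#0", "value": 118}],
}
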
