Fleshed out MetricsFrames and their various types to enforce better #468

Closed · wants to merge 4 commits · Changes from 2 commits
15 changes: 13 additions & 2 deletions examples/foundational/06-listen-and-respond.py
@@ -10,6 +10,7 @@
import sys

from pipecat.frames.frames import Frame, LLMMessagesFrame, MetricsFrame
from pipecat.metrics.metrics import TTFBMetricsData, ProcessingMetricsData, LLMUsageMetricsData, TTSUsageMetricsData
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -37,8 +38,18 @@
class MetricsLogger(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
if isinstance(frame, MetricsFrame):
print(
f"!!! MetricsFrame: {frame}, ttfb: {frame.ttfb}, processing: {frame.processing}, tokens: {frame.tokens}, characters: {frame.characters}")
for d in frame.data:
if isinstance(d, TTFBMetricsData):
print(f"!!! MetricsFrame: {frame}, ttfb: {d.value}")
elif isinstance(d, ProcessingMetricsData):
print(f"!!! MetricsFrame: {frame}, processing: {d.value}")
elif isinstance(d, LLMUsageMetricsData):
print(
f"!!! MetricsFrame: {frame}, prompt tokens: {d.prompt_tokens}, completion tokens: {d.completion_tokens}")
elif isinstance(d, TTSUsageMetricsData):
print(f"!!! MetricsFrame: {frame}, characters: {d.value}")
await self.push_frame(frame, direction)


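As a point of comparison, a processor consuming the new typed frame.data could also branch with structural pattern matching (Python 3.10+). This is a minimal sketch written against the MetricsData subclasses introduced in this PR, not part of the diff itself; MatchingMetricsLogger is an illustrative name.

    # Sketch only: assumes the MetricsData subclasses added in src/pipecat/metrics/metrics.py.
    from pipecat.frames.frames import Frame, MetricsFrame
    from pipecat.metrics.metrics import (
        LLMUsageMetricsData,
        ProcessingMetricsData,
        TTFBMetricsData,
        TTSUsageMetricsData)
    from pipecat.processors.frame_processor import FrameDirection, FrameProcessor


    class MatchingMetricsLogger(FrameProcessor):
        async def process_frame(self, frame: Frame, direction: FrameDirection):
            if isinstance(frame, MetricsFrame):
                for d in frame.data:
                    match d:
                        case LLMUsageMetricsData():
                            print(f"{d.processor} tokens: {d.prompt_tokens} prompt, {d.completion_tokens} completion")
                        case TTFBMetricsData():
                            print(f"{d.processor} ttfb: {d.value}")
                        case ProcessingMetricsData():
                            print(f"{d.processor} processing: {d.value}")
                        case TTSUsageMetricsData():
                            print(f"{d.processor} characters: {d.value}")
            await self.push_frame(frame, direction)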
38 changes: 28 additions & 10 deletions src/pipecat/frames/frames.py
@@ -4,11 +4,12 @@
# SPDX-License-Identifier: BSD 2-Clause License
#

from typing import Any, List, Mapping, Optional, Tuple
from typing import Any, List, Optional, Tuple

from dataclasses import dataclass, field

from pipecat.clocks.base_clock import BaseClock
from pipecat.metrics.metrics import MetricsData
from pipecat.transcriptions.language import Language
from pipecat.utils.time import nanoseconds_to_str
from pipecat.utils.utils import obj_count, obj_id
@@ -55,7 +56,8 @@ def __post_init__(self):

def __str__(self):
pts = format_pts(self.pts)
return f"{self.name}(pts: {pts}, size: {len(self.audio)}, frames: {self.num_frames}, sample_rate: {self.sample_rate}, channels: {self.num_channels})"
return f"{self.name}(pts: {pts}, size: {len(self.audio)}, frames: {self.num_frames}, sample_rate: {
self.sample_rate}, channels: {self.num_channels})"
Contributor:
I had the same changes from autopep8, and I think they broke something. If we keep this change, the output will be messed up with carriage returns. What I do before committing is revert just these changes. Very annoying, but I haven't found a better way.

Contributor Author (@mattieruth, Sep 18, 2024):
This one actually seems almost right to me (though it's wrapping at 108 😕). Shouldn't it wrap at 100 chars? Meanwhile, I agree: something is up with the formatter, because it seems inconsistent.



@dataclass
@@ -96,7 +98,11 @@ class VisionImageRawFrame(ImageRawFrame):

def __str__(self):
pts = format_pts(self.pts)
return f"{self.name}(pts: {pts}, text: {self.text}, size: {self.size}, format: {self.format})"
return f"{
self.name}(pts: {pts}, text: {
self.text}, size: {
self.size}, format: {
self.format})"


@dataclass
@@ -109,7 +115,11 @@ class UserImageRawFrame(ImageRawFrame):

def __str__(self):
pts = format_pts(self.pts)
return f"{self.name}(pts: {pts}, user: {self.user_id}, size: {self.size}, format: {self.format})"
return f"{
self.name}(pts: {pts}, user: {
self.user_id}, size: {
self.size}, format: {
self.format})"


@dataclass
@@ -150,7 +160,12 @@ class TranscriptionFrame(TextFrame):
language: Language | None = None

def __str__(self):
return f"{self.name}(user: {self.user_id}, text: {self.text}, language: {self.language}, timestamp: {self.timestamp})"
return f"{
self.name}(user: {
self.user_id}, text: {
self.text}, language: {
self.language}, timestamp: {
self.timestamp})"


@dataclass
@@ -162,7 +177,12 @@ class InterimTranscriptionFrame(TextFrame):
language: Language | None = None

def __str__(self):
return f"{self.name}(user: {self.user_id}, text: {self.text}, language: {self.language}, timestamp: {self.timestamp})"
return f"{
self.name}(user: {
self.user_id}, text: {
self.text}, language: {
self.language}, timestamp: {
self.timestamp})"


@dataclass
@@ -323,10 +343,8 @@ class BotInterruptionFrame(SystemFrame):
class MetricsFrame(SystemFrame):
"""Emitted by processor that can compute metrics like latencies.
"""
ttfb: List[Mapping[str, Any]] | None = None
processing: List[Mapping[str, Any]] | None = None
tokens: List[Mapping[str, Any]] | None = None
characters: List[Mapping[str, Any]] | None = None
data: List[MetricsData]


#
# Control frames
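With this change, a processor emits one MetricsFrame whose data list holds typed entries rather than separate ttfb/processing/tokens/characters fields. A minimal sketch of the producer side, using the models from this PR (the processor and model names are illustrative):

    # Sketch only: mirrors the new MetricsFrame(data=[...]) shape introduced above.
    from pipecat.frames.frames import MetricsFrame
    from pipecat.metrics.metrics import LLMUsageMetricsData, TTFBMetricsData

    frame = MetricsFrame(data=[
        TTFBMetricsData(processor="OpenAILLMService#0", value=0.42, model="gpt-4o"),
        LLMUsageMetricsData(
            processor="OpenAILLMService#0",
            model="gpt-4o",
            prompt_tokens=25,
            completion_tokens=7,
            total_tokens=32),
    ])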
34 changes: 34 additions & 0 deletions src/pipecat/metrics/metrics.py
@@ -0,0 +1,34 @@
from typing import Optional
Contributor:
Since this is a new module, we need an empty __init__.py

from pydantic import BaseModel


class MetricsData(BaseModel):
processor: str


class TTFBMetricsData(MetricsData):
value: float
model: Optional[str]
Contributor:
Maybe we can initialize this to None so we don't need to pass it.

    model: Optional[str] = None



class ProcessingMetricsData(MetricsData):
value: float
model: Optional[str]
Contributor:
Same



class LLMUsageMetricsData(MetricsData):
model: str
prompt_tokens: int
completion_tokens: int
total_tokens: int


class CacheUsageMetricsData(LLMUsageMetricsData):
cache_read_input_tokens: int
cache_creation_input_tokens: int


class TTSUsageMetricsData(MetricsData):
processor: str
model: Optional[str]
Contributor:
    processor: str
    value: int
    model: Optional[str] = None

value: int
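Assuming the review suggestions above are applied (model defaulting to None, and TTSUsageMetricsData reduced to value plus the inherited processor), constructing and serializing the models looks roughly like this; the service name is illustrative, and model_dump() assumes Pydantic v2 (use .dict() on v1):

    # Sketch only: assumes the reviewer-suggested `model: Optional[str] = None` defaults.
    ttfb = TTFBMetricsData(processor="ElevenLabsTTSService#0", value=0.31)
    usage = TTSUsageMetricsData(processor="ElevenLabsTTSService#0", value=142)

    # Pydantic provides validation plus easy serialization for logging or export.
    print(ttfb.model_dump())  # roughly: {'processor': 'ElevenLabsTTSService#0', 'value': 0.31, 'model': None}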
9 changes: 6 additions & 3 deletions src/pipecat/pipeline/task.py
@@ -20,6 +20,7 @@
MetricsFrame,
StartFrame,
StopTaskFrame)
from pipecat.metrics.metrics import TTFBMetricsData, ProcessingMetricsData
from pipecat.pipeline.base_pipeline import BasePipeline
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.utils.utils import obj_count, obj_id
@@ -118,9 +119,11 @@ async def queue_frames(self, frames: Iterable[Frame] | AsyncIterable[Frame]):

def _initial_metrics_frame(self) -> MetricsFrame:
processors = self._pipeline.processors_with_metrics()
ttfb = [{"processor": p.name, "value": 0.0} for p in processors]
processing = [{"processor": p.name, "value": 0.0} for p in processors]
return MetricsFrame(ttfb=ttfb, processing=processing)
data = []
for p in processors:
data.append(TTFBMetricsData(processor=p.name, value=0.0, model=None))
data.append(ProcessingMetricsData(processor=p.name, value=0.0, model=None))
return MetricsFrame(data=data)

async def _process_down_queue(self):
self._clock.start()
59 changes: 25 additions & 34 deletions src/pipecat/processors/frame_processor.py
@@ -17,6 +17,7 @@
StartFrame,
StartInterruptionFrame,
UserStoppedSpeakingFrame)
from pipecat.metrics.metrics import LLMUsageMetricsData, ProcessingMetricsData, TTFBMetricsData, TTSUsageMetricsData
from pipecat.utils.utils import obj_count, obj_id

from loguru import logger
@@ -39,47 +40,37 @@ async def start_ttfb_metrics(self, report_only_initial_ttfb):
self._start_ttfb_time = time.time()
self._should_report_ttfb = not report_only_initial_ttfb

async def stop_ttfb_metrics(self):
async def stop_ttfb_metrics(self, model: str | None = None):
if self._start_ttfb_time == 0:
return None

value = time.time() - self._start_ttfb_time
logger.debug(f"{self._name} TTFB: {value}")
ttfb = {
"processor": self._name,
"value": value
}
ttfb = TTFBMetricsData(processor=self._name, value=value, model=model)
self._start_ttfb_time = 0
return MetricsFrame(ttfb=[ttfb])
return MetricsFrame(data=[ttfb])

async def start_processing_metrics(self):
self._start_processing_time = time.time()

async def stop_processing_metrics(self):
async def stop_processing_metrics(self, model: str | None = None):
if self._start_processing_time == 0:
return None

value = time.time() - self._start_processing_time
logger.debug(f"{self._name} processing time: {value}")
processing = {
"processor": self._name,
"value": value
}
processing = ProcessingMetricsData(processor=self._name, value=value, model=model)
self._start_processing_time = 0
return MetricsFrame(processing=[processing])
return MetricsFrame(data=[processing])

async def start_llm_usage_metrics(self, tokens: dict):
logger.debug(
f"{self._name} prompt tokens: {tokens['prompt_tokens']}, completion tokens: {tokens['completion_tokens']}")
return MetricsFrame(tokens=[tokens])
async def start_llm_usage_metrics(self, usage_params: LLMUsageMetricsData):
logger.debug(f"{self._name} prompt tokens: {
usage_params.prompt_tokens}, completion tokens: {usage_params.completion_tokens}")
return MetricsFrame(data=[usage_params])

async def start_tts_usage_metrics(self, text: str):
characters = {
"processor": self._name,
"value": len(text),
}
logger.debug(f"{self._name} usage characters: {characters['value']}")
return MetricsFrame(characters=[characters])
async def start_tts_usage_metrics(self, usage_params: TTSUsageMetricsData):
logger.debug(f"{self._name} usage characters: {usage_params.value}")
return MetricsFrame(data=[usage_params])


class FrameProcessor:
Expand Down Expand Up @@ -132,37 +123,37 @@ async def start_ttfb_metrics(self):
if self.can_generate_metrics() and self.metrics_enabled:
await self._metrics.start_ttfb_metrics(self._report_only_initial_ttfb)

async def stop_ttfb_metrics(self):
async def stop_ttfb_metrics(self, model: str | None = None):
if self.can_generate_metrics() and self.metrics_enabled:
frame = await self._metrics.stop_ttfb_metrics()
frame = await self._metrics.stop_ttfb_metrics(model)
if frame:
await self.push_frame(frame)

async def start_processing_metrics(self):
if self.can_generate_metrics() and self.metrics_enabled:
await self._metrics.start_processing_metrics()

async def stop_processing_metrics(self):
async def stop_processing_metrics(self, model: str | None = None):
if self.can_generate_metrics() and self.metrics_enabled:
frame = await self._metrics.stop_processing_metrics()
frame = await self._metrics.stop_processing_metrics(model)
if frame:
await self.push_frame(frame)

async def start_llm_usage_metrics(self, tokens: dict):
async def start_llm_usage_metrics(self, usage_params: LLMUsageMetricsData):
if self.can_generate_metrics() and self.usage_metrics_enabled:
frame = await self._metrics.start_llm_usage_metrics(tokens)
frame = await self._metrics.start_llm_usage_metrics(usage_params)
if frame:
await self.push_frame(frame)

async def start_tts_usage_metrics(self, text: str):
async def start_tts_usage_metrics(self, usage_params: TTSUsageMetricsData):
if self.can_generate_metrics() and self.usage_metrics_enabled:
frame = await self._metrics.start_tts_usage_metrics(text)
frame = await self._metrics.start_tts_usage_metrics(usage_params)
if frame:
await self.push_frame(frame)

async def stop_all_metrics(self):
await self.stop_ttfb_metrics()
await self.stop_processing_metrics()
async def stop_all_metrics(self, model: str | None = None):
await self.stop_ttfb_metrics(model)
await self.stop_processing_metrics(model)

async def cleanup(self):
pass
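On the service side, the new signatures take a model name and typed usage objects instead of raw dicts. A hedged sketch of what a TTS service might now do (ExampleTTSService and "example-tts-v1" are illustrative, not part of this PR):

    # Sketch only: shows the new stop_ttfb_metrics(model) and start_tts_usage_metrics(TTSUsageMetricsData) calls.
    from typing import AsyncGenerator

    from pipecat.frames.frames import Frame, TTSStartedFrame, TTSStoppedFrame
    from pipecat.metrics.metrics import TTSUsageMetricsData
    from pipecat.services.ai_services import TTSService


    class ExampleTTSService(TTSService):
        async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
            await self.start_ttfb_metrics()
            yield TTSStartedFrame()
            # ... call the TTS backend and yield AudioRawFrame chunks here ...
            await self.stop_ttfb_metrics(model="example-tts-v1")
            await self.start_tts_usage_metrics(
                TTSUsageMetricsData(processor=self.name, model="example-tts-v1", value=len(text)))
            yield TTSStoppedFrame()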
5 changes: 3 additions & 2 deletions src/pipecat/services/ai_services.py
@@ -11,6 +11,7 @@
from abc import abstractmethod
from typing import AsyncGenerator, List, Optional, Tuple

from attr import has
Contributor Author:
note to self: remove (not sure why this got added)

from pipecat.frames.frames import (
AudioRawFrame,
CancelFrame,
@@ -497,7 +498,7 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
await self.push_frame(frame, direction)
await self.start_processing_metrics()
await self.process_generator(self.run_image_gen(frame.text))
await self.stop_processing_metrics()
await self.stop_processing_metrics(self._model if hasattr(self, "_model") else None)
else:
await self.push_frame(frame, direction)

@@ -519,6 +520,6 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
if isinstance(frame, VisionImageRawFrame):
await self.start_processing_metrics()
await self.process_generator(self.run_vision(frame))
await self.stop_processing_metrics()
await self.stop_processing_metrics(self._model if hasattr(self, "_model") else None)
else:
await self.push_frame(frame, direction)
23 changes: 12 additions & 11 deletions src/pipecat/services/anthropic.py
@@ -29,6 +29,7 @@
FunctionCallInProgressFrame,
StartInterruptionFrame
)
from pipecat.metrics.metrics import CacheUsageMetricsData
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_services import LLMService
from pipecat.processors.aggregators.openai_llm_context import (
@@ -141,7 +142,7 @@ async def _process_context(self, context: OpenAILLMContext):
max_tokens=self._max_tokens,
stream=True)

await self.stop_ttfb_metrics()
await self.stop_ttfb_metrics(self._model)

# Function calling
tool_use_block = None
@@ -205,7 +206,7 @@ async def _process_context(self, context: OpenAILLMContext):
except Exception as e:
logger.exception(f"{self} exception: {e}")
finally:
await self.stop_processing_metrics()
await self.stop_processing_metrics(self._model)
await self.push_frame(LLMFullResponseEndFrame())
comp_tokens = completion_tokens if not use_completion_tokens_estimate else completion_tokens_estimate
await self._report_usage_metrics(
@@ -251,15 +252,15 @@ async def _report_usage_metrics(
cache_creation_input_tokens: int,
cache_read_input_tokens: int):
if prompt_tokens or completion_tokens or cache_creation_input_tokens or cache_read_input_tokens:
tokens = {
"processor": self.name,
"model": self._model,
"prompt_tokens": prompt_tokens,
"completion_tokens": completion_tokens,
"cache_creation_input_tokens": cache_creation_input_tokens,
"cache_read_input_tokens": cache_read_input_tokens,
"total_tokens": prompt_tokens + completion_tokens
}
tokens = CacheUsageMetricsData(
processor=self.name,
model=self._model,
prompt_tokens=prompt_tokens,
completion_tokens=completion_tokens,
cache_creation_input_tokens=cache_creation_input_tokens,
cache_read_input_tokens=cache_read_input_tokens,
total_tokens=prompt_tokens + completion_tokens
)
await self.start_llm_usage_metrics(tokens)


3 changes: 2 additions & 1 deletion src/pipecat/services/azure.py
@@ -23,6 +23,7 @@
TTSStoppedFrame,
TranscriptionFrame,
URLImageRawFrame)
from pipecat.metrics.metrics import TTSUsageMetricsData
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_services import AsyncAIService, TTSService, ImageGenService
from pipecat.services.openai import BaseOpenAILLMService
@@ -105,7 +106,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
result = await asyncio.to_thread(self._speech_synthesizer.speak_ssml, (ssml))

if result.reason == ResultReason.SynthesizingAudioCompleted:
await self.start_tts_usage_metrics(text)
await self.start_tts_usage_metrics(TTSUsageMetricsData(processor=self.name, model=None, value=len(text)))
await self.stop_ttfb_metrics()
await self.push_frame(TTSStartedFrame())
# Azure always sends a 44-byte header. Strip it off.