Fleshed out MetricsFrames and their various types to enforce better #468

Closed · wants to merge 4 commits
Changes from all commits
15 changes: 13 additions & 2 deletions examples/foundational/06-listen-and-respond.py
@@ -10,6 +10,7 @@
import sys

from pipecat.frames.frames import Frame, LLMMessagesFrame, MetricsFrame
from pipecat.metrics.metrics import TTFBMetricsData, ProcessingMetricsData, LLMUsageMetricsData, TTSUsageMetricsData
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -37,8 +38,18 @@
class MetricsLogger(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
if isinstance(frame, MetricsFrame):
print(
f"!!! MetricsFrame: {frame}, ttfb: {frame.ttfb}, processing: {frame.processing}, tokens: {frame.tokens}, characters: {frame.characters}")
for d in frame.data:
if isinstance(d, TTFBMetricsData):
print(f"!!! MetricsFrame: {frame}, ttfb: {d.value}")
elif isinstance(d, ProcessingMetricsData):
print(f"!!! MetricsFrame: {frame}, processing: {d.value}")
elif isinstance(d, LLMUsageMetricsData):
                    print(
                        f"!!! MetricsFrame: {frame}, prompt tokens: {d.prompt_tokens}, "
                        f"completion tokens: {d.completion_tokens}")
elif isinstance(d, TTSUsageMetricsData):
print(f"!!! MetricsFrame: {frame}, characters: {d.value}")
await self.push_frame(frame, direction)


38 changes: 28 additions & 10 deletions src/pipecat/frames/frames.py
@@ -4,11 +4,12 @@
# SPDX-License-Identifier: BSD 2-Clause License
#

from typing import Any, List, Mapping, Optional, Tuple
from typing import Any, List, Optional, Tuple

from dataclasses import dataclass, field

from pipecat.clocks.base_clock import BaseClock
from pipecat.metrics.metrics import MetricsData
from pipecat.transcriptions.language import Language
from pipecat.utils.time import nanoseconds_to_str
from pipecat.utils.utils import obj_count, obj_id
@@ -55,7 +56,8 @@ def __post_init__(self):

def __str__(self):
pts = format_pts(self.pts)
return f"{self.name}(pts: {pts}, size: {len(self.audio)}, frames: {self.num_frames}, sample_rate: {self.sample_rate}, channels: {self.num_channels})"
return f"{self.name}(pts: {pts}, size: {len(self.audio)}, frames: {self.num_frames}, sample_rate: {
self.sample_rate}, channels: {self.num_channels})"
Contributor:

I had the same changes from autopep8. I think they broke something: if we keep this change, the output will be messed up with carriage returns. What I do before committing is revert just these lines. Very annoying, but I haven't found a better way.

Contributor Author (@mattieruth, Sep 18, 2024):

This one actually seems right to me, or almost right: it is wrapping at 108 😕, and shouldn't it wrap at 100 chars? That said, I agree that something is up with the formatter, because it seems inconsistent.



@dataclass
@@ -96,7 +98,11 @@ class VisionImageRawFrame(ImageRawFrame):

def __str__(self):
pts = format_pts(self.pts)
return f"{self.name}(pts: {pts}, text: {self.text}, size: {self.size}, format: {self.format})"
return f"{
self.name}(pts: {pts}, text: {
self.text}, size: {
self.size}, format: {
self.format})"


@dataclass
@@ -109,7 +115,11 @@ class UserImageRawFrame(ImageRawFrame):

def __str__(self):
pts = format_pts(self.pts)
return f"{self.name}(pts: {pts}, user: {self.user_id}, size: {self.size}, format: {self.format})"
return f"{
self.name}(pts: {pts}, user: {
self.user_id}, size: {
self.size}, format: {
self.format})"


@dataclass
@@ -150,7 +160,12 @@ class TranscriptionFrame(TextFrame):
language: Language | None = None

def __str__(self):
return f"{self.name}(user: {self.user_id}, text: {self.text}, language: {self.language}, timestamp: {self.timestamp})"
return f"{
self.name}(user: {
self.user_id}, text: {
self.text}, language: {
self.language}, timestamp: {
self.timestamp})"


@dataclass
@@ -162,7 +177,12 @@ class InterimTranscriptionFrame(TextFrame):
language: Language | None = None

def __str__(self):
return f"{self.name}(user: {self.user_id}, text: {self.text}, language: {self.language}, timestamp: {self.timestamp})"
return f"{
self.name}(user: {
self.user_id}, text: {
self.text}, language: {
self.language}, timestamp: {
self.timestamp})"


@dataclass
@@ -323,10 +343,8 @@ class BotInterruptionFrame(SystemFrame):
class MetricsFrame(SystemFrame):
"""Emitted by processor that can compute metrics like latencies.
"""
ttfb: List[Mapping[str, Any]] | None = None
processing: List[Mapping[str, Any]] | None = None
tokens: List[Mapping[str, Any]] | None = None
characters: List[Mapping[str, Any]] | None = None
data: List[MetricsData]


#
# Control frames
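For readers skimming this diff, a minimal migration sketch (the processor name and values below are invented for illustration): the four optional parallel lists on MetricsFrame collapse into a single typed data list.

```python
from pipecat.frames.frames import MetricsFrame
from pipecat.metrics.metrics import ProcessingMetricsData, TTFBMetricsData

# Old shape (removed by this PR): parallel lists of plain dicts, e.g.
#   MetricsFrame(ttfb=[{"processor": "MyLLMService#0", "value": 0.012}])

# New shape: one list of typed MetricsData entries; consumers dispatch on type.
frame = MetricsFrame(data=[
    TTFBMetricsData(processor="MyLLMService#0", value=0.012),
    ProcessingMetricsData(processor="MyLLMService#0", value=0.345),
])
```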
Empty file added src/pipecat/metrics/__init__.py
34 changes: 34 additions & 0 deletions src/pipecat/metrics/metrics.py
@@ -0,0 +1,34 @@
from typing import Optional
Contributor:

Since this is a new module, we need an empty __init__.py

from pydantic import BaseModel


class MetricsData(BaseModel):
processor: str


class TTFBMetricsData(MetricsData):
value: float
model: Optional[str] = None


class ProcessingMetricsData(MetricsData):
value: float
model: Optional[str] = None


class LLMUsageMetricsData(MetricsData):
model: str
prompt_tokens: int
completion_tokens: int
total_tokens: int


class CacheUsageMetricsData(LLMUsageMetricsData):
cache_read_input_tokens: int
cache_creation_input_tokens: int


class TTSUsageMetricsData(MetricsData):
processor: str
model: Optional[str] = None
value: int
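A quick sketch of how the new models behave (assuming Pydantic v2; the processor name, model id, and token counts below are invented): usage data is validated at construction time, and subclasses such as CacheUsageMetricsData only add fields.

```python
from pipecat.metrics.metrics import CacheUsageMetricsData, LLMUsageMetricsData

usage = LLMUsageMetricsData(
    processor="MyLLMService#0",  # hypothetical processor name
    model="my-llm-model",        # hypothetical model id
    prompt_tokens=120,
    completion_tokens=48,
    total_tokens=168,
)

# CacheUsageMetricsData extends the LLM usage model with prompt-caching counters.
cached = CacheUsageMetricsData(
    **usage.model_dump(),        # Pydantic v2; use .dict() on v1
    cache_read_input_tokens=100,
    cache_creation_input_tokens=0,
)

print(cached.model_dump())
```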
9 changes: 6 additions & 3 deletions src/pipecat/pipeline/task.py
@@ -20,6 +20,7 @@
MetricsFrame,
StartFrame,
StopTaskFrame)
from pipecat.metrics.metrics import TTFBMetricsData, ProcessingMetricsData
from pipecat.pipeline.base_pipeline import BasePipeline
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.utils.utils import obj_count, obj_id
@@ -118,9 +119,11 @@ async def queue_frames(self, frames: Iterable[Frame] | AsyncIterable[Frame]):

def _initial_metrics_frame(self) -> MetricsFrame:
processors = self._pipeline.processors_with_metrics()
ttfb = [{"processor": p.name, "value": 0.0} for p in processors]
processing = [{"processor": p.name, "value": 0.0} for p in processors]
return MetricsFrame(ttfb=ttfb, processing=processing)
data = []
for p in processors:
data.append(TTFBMetricsData(processor=p.name, value=0.0))
data.append(ProcessingMetricsData(processor=p.name, value=0.0))
return MetricsFrame(data=data)

async def _process_down_queue(self):
self._clock.start()
61 changes: 25 additions & 36 deletions src/pipecat/processors/frame_processor.py
@@ -17,6 +17,7 @@
StartFrame,
StartInterruptionFrame,
UserStoppedSpeakingFrame)
from pipecat.metrics.metrics import LLMUsageMetricsData, ProcessingMetricsData, TTFBMetricsData, TTSUsageMetricsData
from pipecat.utils.utils import obj_count, obj_id

from loguru import logger
@@ -39,47 +40,37 @@ async def start_ttfb_metrics(self, report_only_initial_ttfb):
self._start_ttfb_time = time.time()
self._should_report_ttfb = not report_only_initial_ttfb

async def stop_ttfb_metrics(self):
async def stop_ttfb_metrics(self, model: str | None = None):
if self._start_ttfb_time == 0:
return None

value = time.time() - self._start_ttfb_time
logger.debug(f"{self._name} TTFB: {value}")
ttfb = {
"processor": self._name,
"value": value
}
ttfb = TTFBMetricsData(processor=self._name, value=value, model=model)
self._start_ttfb_time = 0
return MetricsFrame(ttfb=[ttfb])
return MetricsFrame(data=[ttfb])

async def start_processing_metrics(self):
self._start_processing_time = time.time()

async def stop_processing_metrics(self):
async def stop_processing_metrics(self, model: str | None = None):
if self._start_processing_time == 0:
return None

value = time.time() - self._start_processing_time
logger.debug(f"{self._name} processing time: {value}")
processing = {
"processor": self._name,
"value": value
}
processing = ProcessingMetricsData(processor=self._name, value=value, model=model)
self._start_processing_time = 0
return MetricsFrame(processing=[processing])
return MetricsFrame(data=[processing])

async def start_llm_usage_metrics(self, tokens: dict):
logger.debug(
f"{self._name} prompt tokens: {tokens['prompt_tokens']}, completion tokens: {tokens['completion_tokens']}")
return MetricsFrame(tokens=[tokens])
async def start_llm_usage_metrics(self, usage_params: LLMUsageMetricsData):
logger.debug(f"{self._name} prompt tokens: {
usage_params.prompt_tokens}, completion tokens: {usage_params.completion_tokens}")
return MetricsFrame(data=[usage_params])

async def start_tts_usage_metrics(self, text: str):
characters = {
"processor": self._name,
"value": len(text),
}
logger.debug(f"{self._name} usage characters: {characters['value']}")
return MetricsFrame(characters=[characters])
async def start_tts_usage_metrics(self, usage_params: TTSUsageMetricsData):
logger.debug(f"{self._name} usage characters: {usage_params.value}")
return MetricsFrame(data=[usage_params])


class FrameProcessor:
@@ -132,37 +123,37 @@ async def start_ttfb_metrics(self):
if self.can_generate_metrics() and self.metrics_enabled:
await self._metrics.start_ttfb_metrics(self._report_only_initial_ttfb)

async def stop_ttfb_metrics(self):
async def stop_ttfb_metrics(self, model: str | None = None):
if self.can_generate_metrics() and self.metrics_enabled:
frame = await self._metrics.stop_ttfb_metrics()
frame = await self._metrics.stop_ttfb_metrics(model)
if frame:
await self.push_frame(frame)

async def start_processing_metrics(self):
if self.can_generate_metrics() and self.metrics_enabled:
await self._metrics.start_processing_metrics()

async def stop_processing_metrics(self):
async def stop_processing_metrics(self, model: str | None = None):
if self.can_generate_metrics() and self.metrics_enabled:
frame = await self._metrics.stop_processing_metrics()
frame = await self._metrics.stop_processing_metrics(model)
if frame:
await self.push_frame(frame)

async def start_llm_usage_metrics(self, tokens: dict):
async def start_llm_usage_metrics(self, usage_params: LLMUsageMetricsData):
if self.can_generate_metrics() and self.usage_metrics_enabled:
frame = await self._metrics.start_llm_usage_metrics(tokens)
frame = await self._metrics.start_llm_usage_metrics(usage_params)
if frame:
await self.push_frame(frame)

async def start_tts_usage_metrics(self, text: str):
async def start_tts_usage_metrics(self, usage_params: TTSUsageMetricsData):
if self.can_generate_metrics() and self.usage_metrics_enabled:
frame = await self._metrics.start_tts_usage_metrics(text)
frame = await self._metrics.start_tts_usage_metrics(usage_params)
if frame:
await self.push_frame(frame)

async def stop_all_metrics(self):
await self.stop_ttfb_metrics()
await self.stop_processing_metrics()
async def stop_all_metrics(self, model: str | None = None):
await self.stop_ttfb_metrics(model)
await self.stop_processing_metrics(model)

async def cleanup(self):
pass
@@ -191,8 +182,6 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
self._enable_metrics = frame.enable_metrics
self._enable_usage_metrics = frame.enable_usage_metrics
self._report_only_initial_ttfb = frame.report_only_initial_ttfb
elif isinstance(frame, StartInterruptionFrame):
await self.stop_all_metrics()
elif isinstance(frame, UserStoppedSpeakingFrame):
self._should_report_ttfb = True

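To make the call-site change concrete, here is a hedged sketch of how a service might report metrics through the updated FrameProcessor API (the helper functions, the usage object's field names, and the name accessor are assumptions, not part of this diff): typed models replace the raw dicts, and TTFB/processing metrics can now carry a model tag.

```python
from pipecat.metrics.metrics import LLMUsageMetricsData, TTSUsageMetricsData

async def report_llm_usage(service, model: str, usage) -> None:
    """Illustrative helper: 'service' is a FrameProcessor with metrics enabled and
    'usage' is an OpenAI-style usage object (field names assumed)."""
    await service.stop_ttfb_metrics(model)  # the model argument is new in this PR
    await service.start_llm_usage_metrics(
        LLMUsageMetricsData(
            processor=service.name,  # assumes the processor exposes a name accessor
            model=model,
            prompt_tokens=usage.prompt_tokens,
            completion_tokens=usage.completion_tokens,
            total_tokens=usage.total_tokens,
        )
    )
    await service.stop_processing_metrics(model)

async def report_tts_usage(service, model: str, text: str) -> None:
    """Mirrors the old behavior: the character count is now wrapped in a typed model."""
    await service.start_tts_usage_metrics(
        TTSUsageMetricsData(processor=service.name, model=model, value=len(text))
    )
```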
22 changes: 1 addition & 21 deletions src/pipecat/services/ai_services.py
@@ -11,6 +11,7 @@
from abc import abstractmethod
from typing import AsyncGenerator, List, Optional, Tuple

from attr import has
Contributor Author:

note to self: remove (not sure why this got added)

from pipecat.frames.frames import (
AudioRawFrame,
CancelFrame,
@@ -490,17 +491,6 @@ def __init__(self, **kwargs):
async def run_image_gen(self, prompt: str) -> AsyncGenerator[Frame, None]:
pass

async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)

if isinstance(frame, TextFrame):
await self.push_frame(frame, direction)
await self.start_processing_metrics()
await self.process_generator(self.run_image_gen(frame.text))
await self.stop_processing_metrics()
else:
await self.push_frame(frame, direction)


class VisionService(AIService):
"""VisionService is a base class for vision services."""
@@ -512,13 +502,3 @@ def __init__(self, **kwargs):
@abstractmethod
async def run_vision(self, frame: VisionImageRawFrame) -> AsyncGenerator[Frame, None]:
pass

async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)

if isinstance(frame, VisionImageRawFrame):
await self.start_processing_metrics()
await self.process_generator(self.run_vision(frame))
await self.stop_processing_metrics()
else:
await self.push_frame(frame, direction)