Skip to content

Commit

Permalink
Merge pull request #474 from pipecat-ai/ruthless/improve-metrics-types-2
Browse files Browse the repository at this point in the history
Ruthless/improve metrics types 2
  • Loading branch information
mattieruth authored Sep 20, 2024
2 parents ed409d0 + a4edb3d commit 58d9c84
Show file tree
Hide file tree
Showing 21 changed files with 190 additions and 98 deletions.
16 changes: 14 additions & 2 deletions examples/foundational/06-listen-and-respond.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import sys

from pipecat.frames.frames import Frame, LLMMessagesFrame, MetricsFrame
from pipecat.metrics.metrics import TTFBMetricsData, ProcessingMetricsData, LLMUsageMetricsData, TTSUsageMetricsData
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
Expand Down Expand Up @@ -37,8 +38,19 @@
class MetricsLogger(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
if isinstance(frame, MetricsFrame):
print(
f"!!! MetricsFrame: {frame}, ttfb: {frame.ttfb}, processing: {frame.processing}, tokens: {frame.tokens}, characters: {frame.characters}")
for d in frame.data:
if isinstance(d, TTFBMetricsData):
print(f"!!! MetricsFrame: {frame}, ttfb: {d.value}")
elif isinstance(d, ProcessingMetricsData):
print(f"!!! MetricsFrame: {frame}, processing: {d.value}")
elif isinstance(d, LLMUsageMetricsData):
tokens = d.value
print(
f"!!! MetricsFrame: {frame}, tokens: {
tokens.prompt_tokens}, characters: {
tokens.completion_tokens}")
elif isinstance(d, TTSUsageMetricsData):
print(f"!!! MetricsFrame: {frame}, characters: {d.value}")
await self.push_frame(frame, direction)


Expand Down
9 changes: 4 additions & 5 deletions src/pipecat/frames/frames.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@
# SPDX-License-Identifier: BSD 2-Clause License
#

from typing import Any, List, Mapping, Optional, Tuple
from typing import Any, List, Optional, Tuple

from dataclasses import dataclass, field

from pipecat.clocks.base_clock import BaseClock
from pipecat.metrics.metrics import MetricsData
from pipecat.transcriptions.language import Language
from pipecat.utils.time import nanoseconds_to_str
from pipecat.utils.utils import obj_count, obj_id
Expand Down Expand Up @@ -333,10 +334,8 @@ class BotInterruptionFrame(SystemFrame):
class MetricsFrame(SystemFrame):
"""Emitted by processor that can compute metrics like latencies.
"""
ttfb: List[Mapping[str, Any]] | None = None
processing: List[Mapping[str, Any]] | None = None
tokens: List[Mapping[str, Any]] | None = None
characters: List[Mapping[str, Any]] | None = None
data: List[MetricsData]


#
# Control frames
Expand Down
Empty file added src/pipecat/metrics/__init__.py
Empty file.
31 changes: 31 additions & 0 deletions src/pipecat/metrics/metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from typing import Optional
from pydantic import BaseModel


class MetricsData(BaseModel):
processor: str
model: Optional[str] = None


class TTFBMetricsData(MetricsData):
value: float


class ProcessingMetricsData(MetricsData):
value: float


class LLMTokenUsage(BaseModel):
prompt_tokens: int
completion_tokens: int
total_tokens: int
cache_read_input_tokens: Optional[int] = None
cache_creation_input_tokens: Optional[int] = None


class LLMUsageMetricsData(MetricsData):
value: LLMTokenUsage


class TTSUsageMetricsData(MetricsData):
value: int
9 changes: 6 additions & 3 deletions src/pipecat/pipeline/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
MetricsFrame,
StartFrame,
StopTaskFrame)
from pipecat.metrics.metrics import TTFBMetricsData, ProcessingMetricsData
from pipecat.pipeline.base_pipeline import BasePipeline
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.utils.utils import obj_count, obj_id
Expand Down Expand Up @@ -118,9 +119,11 @@ async def queue_frames(self, frames: Iterable[Frame] | AsyncIterable[Frame]):

def _initial_metrics_frame(self) -> MetricsFrame:
processors = self._pipeline.processors_with_metrics()
ttfb = [{"processor": p.name, "value": 0.0} for p in processors]
processing = [{"processor": p.name, "value": 0.0} for p in processors]
return MetricsFrame(ttfb=ttfb, processing=processing)
data = []
for p in processors:
data.append(TTFBMetricsData(processor=p.name, value=0.0))
data.append(ProcessingMetricsData(processor=p.name, value=0.0))
return MetricsFrame(data=data)

async def _process_down_queue(self):
self._clock.start()
Expand Down
67 changes: 44 additions & 23 deletions src/pipecat/processors/frame_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@
StartInterruptionFrame,
StopInterruptionFrame,
SystemFrame)
from pipecat.metrics.metrics import (
LLMTokenUsage,
LLMUsageMetricsData,
MetricsData,
ProcessingMetricsData,
TTFBMetricsData,
TTSUsageMetricsData)
from pipecat.utils.utils import obj_count, obj_id

from loguru import logger
Expand All @@ -31,11 +38,20 @@ class FrameDirection(Enum):

class FrameProcessorMetrics:
def __init__(self, name: str):
self._name = name
self._core_metrics_data = MetricsData(processor=name)
self._start_ttfb_time = 0
self._start_processing_time = 0
self._should_report_ttfb = True

def _processor_name(self):
return self._core_metrics_data.processor

def _model_name(self):
return self._core_metrics_data.model

def set_core_metrics_data(self, data: MetricsData):
self._core_metrics_data = data

async def start_ttfb_metrics(self, report_only_initial_ttfb):
if self._should_report_ttfb:
self._start_ttfb_time = time.time()
Expand All @@ -46,13 +62,13 @@ async def stop_ttfb_metrics(self):
return None

value = time.time() - self._start_ttfb_time
logger.debug(f"{self._name} TTFB: {value}")
ttfb = {
"processor": self._name,
"value": value
}
logger.debug(f"{self._processor_name()} TTFB: {value}")
ttfb = TTFBMetricsData(
processor=self._processor_name(),
value=value,
model=self._model_name())
self._start_ttfb_time = 0
return MetricsFrame(ttfb=[ttfb])
return MetricsFrame(data=[ttfb])

async def start_processing_metrics(self):
self._start_processing_time = time.time()
Expand All @@ -62,26 +78,28 @@ async def stop_processing_metrics(self):
return None

value = time.time() - self._start_processing_time
logger.debug(f"{self._name} processing time: {value}")
processing = {
"processor": self._name,
"value": value
}
logger.debug(f"{self._processor_name()} processing time: {value}")
processing = ProcessingMetricsData(
processor=self._processor_name(), value=value, model=self._model_name())
self._start_processing_time = 0
return MetricsFrame(processing=[processing])
return MetricsFrame(data=[processing])

async def start_llm_usage_metrics(self, tokens: dict):
async def start_llm_usage_metrics(self, tokens: LLMTokenUsage):
logger.debug(
f"{self._name} prompt tokens: {tokens['prompt_tokens']}, completion tokens: {tokens['completion_tokens']}")
return MetricsFrame(tokens=[tokens])
f"{self._processor_name()} prompt tokens: {tokens.prompt_tokens}, completion tokens: {tokens.completion_tokens}")
value = LLMUsageMetricsData(
processor=self._processor_name(),
model=self._model_name(),
value=tokens)
return MetricsFrame(data=[value])

async def start_tts_usage_metrics(self, text: str):
characters = {
"processor": self._name,
"value": len(text),
}
logger.debug(f"{self._name} usage characters: {characters['value']}")
return MetricsFrame(characters=[characters])
characters = TTSUsageMetricsData(
processor=self._processor_name(),
model=self._model_name(),
value=len(text))
logger.debug(f"{self._processor_name()} usage characters: {characters.value}")
return MetricsFrame(data=[characters])


class FrameProcessor:
Expand Down Expand Up @@ -140,6 +158,9 @@ def report_only_initial_ttfb(self):
def can_generate_metrics(self) -> bool:
return False

def set_core_metrics_data(self, data: MetricsData):
self._metrics.set_core_metrics_data(data)

async def start_ttfb_metrics(self):
if self.can_generate_metrics() and self.metrics_enabled:
await self._metrics.start_ttfb_metrics(self._report_only_initial_ttfb)
Expand All @@ -160,7 +181,7 @@ async def stop_processing_metrics(self):
if frame:
await self.push_frame(frame)

async def start_llm_usage_metrics(self, tokens: dict):
async def start_llm_usage_metrics(self, tokens: LLMTokenUsage):
if self.can_generate_metrics() and self.usage_metrics_enabled:
frame = await self._metrics.start_llm_usage_metrics(tokens)
if frame:
Expand Down
14 changes: 12 additions & 2 deletions src/pipecat/services/ai_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
UserImageRequestFrame,
VisionImageRawFrame
)
from pipecat.metrics.metrics import MetricsData
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.transcriptions.language import Language
from pipecat.utils.audio import calculate_audio_volume
Expand All @@ -46,6 +47,15 @@
class AIService(FrameProcessor):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._model_name: str = ""

@property
def model_name(self) -> str:
return self._model_name

def set_model_name(self, model: str):
self._model_name = model
self.set_core_metrics_data(MetricsData(processor=self.name, model=self._model_name))

async def start(self, frame: StartFrame):
pass
Expand Down Expand Up @@ -158,7 +168,7 @@ def sample_rate(self) -> int:

@abstractmethod
async def set_model(self, model: str):
pass
self.set_model_name(model)

@abstractmethod
async def set_voice(self, voice: str):
Expand Down Expand Up @@ -367,7 +377,7 @@ def __init__(self, **kwargs):

@abstractmethod
async def set_model(self, model: str):
pass
self.set_model_name(model)

@abstractmethod
async def set_language(self, language: Language):
Expand Down
23 changes: 11 additions & 12 deletions src/pipecat/services/anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
FunctionCallInProgressFrame,
StartInterruptionFrame
)
from pipecat.metrics.metrics import LLMTokenUsage
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_services import LLMService
from pipecat.processors.aggregators.openai_llm_context import (
Expand Down Expand Up @@ -84,7 +85,7 @@ def __init__(
**kwargs):
super().__init__(**kwargs)
self._client = AsyncAnthropic(api_key=api_key)
self._model = model
self.set_model_name(model)
self._max_tokens = max_tokens
self._enable_prompt_caching_beta = enable_prompt_caching_beta

Expand Down Expand Up @@ -137,7 +138,7 @@ async def _process_context(self, context: OpenAILLMContext):
tools=context.tools or [],
system=context.system,
messages=messages,
model=self._model,
model=self.model_name,
max_tokens=self._max_tokens,
stream=True)

Expand Down Expand Up @@ -231,7 +232,7 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
context = AnthropicLLMContext.from_image_frame(frame)
elif isinstance(frame, LLMModelUpdateFrame):
logger.debug(f"Switching LLM model to: [{frame.model}]")
self._model = frame.model
self.set_model_name(frame.model)
elif isinstance(frame, LLMEnablePromptCachingFrame):
logger.debug(f"Setting enable prompt caching to: [{frame.enable}]")
self._enable_prompt_caching_beta = frame.enable
Expand All @@ -251,15 +252,13 @@ async def _report_usage_metrics(
cache_creation_input_tokens: int,
cache_read_input_tokens: int):
if prompt_tokens or completion_tokens or cache_creation_input_tokens or cache_read_input_tokens:
tokens = {
"processor": self.name,
"model": self._model,
"prompt_tokens": prompt_tokens,
"completion_tokens": completion_tokens,
"cache_creation_input_tokens": cache_creation_input_tokens,
"cache_read_input_tokens": cache_read_input_tokens,
"total_tokens": prompt_tokens + completion_tokens
}
tokens = LLMTokenUsage(
prompt_tokens=prompt_tokens,
completion_tokens=completion_tokens,
cache_creation_input_tokens=cache_creation_input_tokens,
cache_read_input_tokens=cache_read_input_tokens,
total_tokens=prompt_tokens + completion_tokens
)
await self.start_llm_usage_metrics(tokens)


Expand Down
4 changes: 3 additions & 1 deletion src/pipecat/services/azure.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
TTSStoppedFrame,
TranscriptionFrame,
URLImageRawFrame)
from pipecat.metrics.metrics import TTSUsageMetricsData
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_services import STTService, TTSService, ImageGenService
from pipecat.services.openai import BaseOpenAILLMService
from pipecat.utils.time import time_now_iso8601
Expand Down Expand Up @@ -190,7 +192,7 @@ def __init__(
self._api_key = api_key
self._azure_endpoint = endpoint
self._api_version = api_version
self._model = model
self.set_model_name(model)
self._image_size = image_size
self._aiohttp_session = aiohttp_session

Expand Down
Loading

0 comments on commit 58d9c84

Please sign in to comment.