Skip to content

Commit

Permalink
Refactor to Align with Merged PR #474
Browse files Browse the repository at this point in the history
  • Loading branch information
cyrilS-dev committed Sep 22, 2024
1 parent 3b3f1e5 commit cd13aff
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 65 deletions.
11 changes: 8 additions & 3 deletions src/pipecat/processors/frame_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#

import asyncio
import time

from enum import Enum

Expand All @@ -17,7 +18,10 @@
StartInterruptionFrame,
StopInterruptionFrame,
SystemFrame)
from pipecat.processors.metrics.base import FrameProcessorMetrics
from pipecat.metrics.metrics import (
LLMTokenUsage,
MetricsData)
from pipecat.processors.metrics.frame_processor_metrics import FrameProcessorMetrics
from pipecat.utils.utils import obj_count, obj_id

from loguru import logger
Expand All @@ -33,7 +37,7 @@ def __init__(
self,
*,
name: str | None = None,
metrics: FrameProcessorMetrics = FrameProcessorMetrics,
metrics: FrameProcessorMetrics | None = None,
sync: bool = True,
loop: asyncio.AbstractEventLoop | None = None,
**kwargs):
Expand All @@ -55,7 +59,8 @@ def __init__(
self._report_only_initial_ttfb = False

# Metrics
self._metrics = metrics(name=self.name)
self._metrics = metrics or FrameProcessorMetrics()
self._metrics.set_processor_name(self.name)

# Every processor in Pipecat should only output frames from a single
# task. This avoid problems like audio overlapping. System frames are
Expand Down
57 changes: 0 additions & 57 deletions src/pipecat/processors/metrics/base.py

This file was deleted.

79 changes: 79 additions & 0 deletions src/pipecat/processors/metrics/frame_processor_metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import time

from pipecat.frames.frames import MetricsFrame
from pipecat.metrics.metrics import (
LLMTokenUsage,
LLMUsageMetricsData,
MetricsData,
ProcessingMetricsData,
TTFBMetricsData,
TTSUsageMetricsData)

from loguru import logger

class FrameProcessorMetrics:
def __init__(self):
self._start_ttfb_time = 0
self._start_processing_time = 0
self._should_report_ttfb = True

def _processor_name(self):
return self._core_metrics_data.processor

def _model_name(self):
return self._core_metrics_data.model

def set_core_metrics_data(self, data: MetricsData):
self._core_metrics_data = data

def set_processor_name(self, name: str):
self._core_metrics_data = MetricsData(processor=name)

async def start_ttfb_metrics(self, report_only_initial_ttfb):
if self._should_report_ttfb:
self._start_ttfb_time = time.time()
self._should_report_ttfb = not report_only_initial_ttfb

async def stop_ttfb_metrics(self):
if self._start_ttfb_time == 0:
return None

value = time.time() - self._start_ttfb_time
logger.debug(f"{self._processor_name()} TTFB: {value}")
ttfb = TTFBMetricsData(
processor=self._processor_name(),
value=value,
model=self._model_name())
self._start_ttfb_time = 0
return MetricsFrame(data=[ttfb])

async def start_processing_metrics(self):
self._start_processing_time = time.time()

async def stop_processing_metrics(self):
if self._start_processing_time == 0:
return None

value = time.time() - self._start_processing_time
logger.debug(f"{self._processor_name()} processing time: {value}")
processing = ProcessingMetricsData(
processor=self._processor_name(), value=value, model=self._model_name())
self._start_processing_time = 0
return MetricsFrame(data=[processing])

async def start_llm_usage_metrics(self, tokens: LLMTokenUsage):
logger.debug(
f"{self._processor_name()} prompt tokens: {tokens.prompt_tokens}, completion tokens: {tokens.completion_tokens}")
value = LLMUsageMetricsData(
processor=self._processor_name(),
model=self._model_name(),
value=tokens)
return MetricsFrame(data=[value])

async def start_tts_usage_metrics(self, text: str):
characters = TTSUsageMetricsData(
processor=self._processor_name(),
model=self._model_name(),
value=len(text))
logger.debug(f"{self._processor_name()} usage characters: {characters.value}")
return MetricsFrame(data=[characters])
13 changes: 8 additions & 5 deletions src/pipecat/processors/metrics/sentry.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@
sentry_available = False
logger.debug("Sentry SDK not installed. Sentry features will be disabled.")

from pipecat.processors.metrics.base import FrameProcessorMetrics
from pipecat.processors.metrics.frame_processor_metrics import FrameProcessorMetrics

class SentryMetrics(FrameProcessorMetrics):
def __init__(self, name: str):
super().__init__(name)
def __init__(self):
super().__init__()
self._ttfb_metrics_span = None
self._processing_metrics_span = None

Expand All @@ -24,9 +24,10 @@ async def start_ttfb_metrics(self, report_only_initial_ttfb):
if sentry_available:
self._ttfb_metrics_span = sentry_sdk.start_span(
op="ttfb",
description=f"TTFB for {self._name}",
description=f"TTFB for {self._processor_name()}",
start_timestamp=self._start_ttfb_time
)
logger.debug(f"Sentry Span ID: {self._ttfb_metrics_span.span_id} Description: {self._ttfb_metrics_span.description} started.")
self._should_report_ttfb = not report_only_initial_ttfb

async def stop_ttfb_metrics(self):
Expand All @@ -39,9 +40,11 @@ async def start_processing_metrics(self):
if sentry_available:
self._processing_metrics_span = sentry_sdk.start_span(
op="processing",
description=f"Processing for {self._name}",
description=f"Processing for {self._processor_name()}",
start_timestamp=self._start_processing_time
)
logger.debug(f"Sentry Span ID: {self._processing_metrics_span.span_id} Description: {self._processing_metrics_span.description} started.")


async def stop_processing_metrics(self):
stop_time = time.time()
Expand Down

0 comments on commit cd13aff

Please sign in to comment.