diff --git a/src/pipecat/processors/frame_processor.py b/src/pipecat/processors/frame_processor.py
index ffb603a58..7daa93199 100644
--- a/src/pipecat/processors/frame_processor.py
+++ b/src/pipecat/processors/frame_processor.py
@@ -5,6 +5,7 @@
 #
 
 import asyncio
+import time
 
 from enum import Enum
 
@@ -17,7 +18,10 @@
     StartInterruptionFrame,
     StopInterruptionFrame,
     SystemFrame)
-from pipecat.processors.metrics.base import FrameProcessorMetrics
+from pipecat.metrics.metrics import (
+    LLMTokenUsage,
+    MetricsData)
+from pipecat.processors.metrics.frame_processor_metrics import FrameProcessorMetrics
 from pipecat.utils.utils import obj_count, obj_id
 
 from loguru import logger
@@ -33,7 +37,7 @@ def __init__(
             self,
             *,
             name: str | None = None,
-            metrics: FrameProcessorMetrics = FrameProcessorMetrics,
+            metrics: FrameProcessorMetrics | None = None,
             sync: bool = True,
             loop: asyncio.AbstractEventLoop | None = None,
             **kwargs):
@@ -55,7 +59,8 @@ def __init__(
         self._report_only_initial_ttfb = False
 
         # Metrics
-        self._metrics = metrics(name=self.name)
+        self._metrics = metrics or FrameProcessorMetrics()
+        self._metrics.set_processor_name(self.name)
 
         # Every processor in Pipecat should only output frames from a single
         # task. This avoid problems like audio overlapping. System frames are
diff --git a/src/pipecat/processors/metrics/base.py b/src/pipecat/processors/metrics/base.py
deleted file mode 100644
index ddda212a3..000000000
--- a/src/pipecat/processors/metrics/base.py
+++ /dev/null
@@ -1,57 +0,0 @@
-import time
-from loguru import logger
-from pipecat.frames.frames import MetricsFrame
-
-class FrameProcessorMetrics:
-    def __init__(self, name: str):
-        self._name = name
-        self._start_ttfb_time = 0
-        self._start_processing_time = 0
-        self._should_report_ttfb = True
-
-    async def start_ttfb_metrics(self, report_only_initial_ttfb):
-        if self._should_report_ttfb:
-            self._start_ttfb_time = time.time()
-            self._should_report_ttfb = not report_only_initial_ttfb
-
-    async def stop_ttfb_metrics(self):
-        if self._start_ttfb_time == 0:
-            return None
-
-        value = time.time() - self._start_ttfb_time
-        logger.debug(f"{self._name} TTFB: {value}")
-        ttfb = {
-            "processor": self._name,
-            "value": value
-        }
-        self._start_ttfb_time = 0
-        return MetricsFrame(ttfb=[ttfb])
-
-    async def start_processing_metrics(self):
-        self._start_processing_time = time.time()
-
-    async def stop_processing_metrics(self):
-        if self._start_processing_time == 0:
-            return None
-
-        value = time.time() - self._start_processing_time
-        logger.debug(f"{self._name} processing time: {value}")
-        processing = {
-            "processor": self._name,
-            "value": value
-        }
-        self._start_processing_time = 0
-        return MetricsFrame(processing=[processing])
-
-    async def start_llm_usage_metrics(self, tokens: dict):
-        logger.debug(
-            f"{self._name} prompt tokens: {tokens['prompt_tokens']}, completion tokens: {tokens['completion_tokens']}")
-        return MetricsFrame(tokens=[tokens])
-
-    async def start_tts_usage_metrics(self, text: str):
-        characters = {
-            "processor": self._name,
-            "value": len(text),
-        }
-        logger.debug(f"{self._name} usage characters: {characters['value']}")
-        return MetricsFrame(characters=[characters])
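The typed models imported from `pipecat.metrics.metrics` above (`LLMTokenUsage`, `MetricsData`, and the per-metric data classes used by the new file below) are not part of this diff. The following is a minimal sketch of what they would need to provide, inferred only from how this diff constructs them; the base types and defaults are assumptions, not the module's actual definitions:

```python
# Hypothetical sketch of the models in pipecat.metrics.metrics, inferred from
# how the code in this diff uses them. Field names match the diff; everything
# else (dataclasses vs. another base, defaults) is an assumption.
from dataclasses import dataclass
from typing import Optional


@dataclass
class MetricsData:
    processor: str
    model: Optional[str] = None


@dataclass
class TTFBMetricsData(MetricsData):
    value: float = 0.0


@dataclass
class ProcessingMetricsData(MetricsData):
    value: float = 0.0


@dataclass
class LLMTokenUsage:
    prompt_tokens: int
    completion_tokens: int


@dataclass
class LLMUsageMetricsData(MetricsData):
    value: Optional[LLMTokenUsage] = None


@dataclass
class TTSUsageMetricsData(MetricsData):
    value: int = 0
```

Replacing the plain dicts from the deleted `base.py` with typed objects keeps the processor/model/value shape consistent across every `MetricsFrame` payload.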
diff --git a/src/pipecat/processors/metrics/frame_processor_metrics.py b/src/pipecat/processors/metrics/frame_processor_metrics.py
new file mode 100644
index 000000000..9f3aa3039
--- /dev/null
+++ b/src/pipecat/processors/metrics/frame_processor_metrics.py
@@ -0,0 +1,79 @@
+import time
+
+from pipecat.frames.frames import MetricsFrame
+from pipecat.metrics.metrics import (
+    LLMTokenUsage,
+    LLMUsageMetricsData,
+    MetricsData,
+    ProcessingMetricsData,
+    TTFBMetricsData,
+    TTSUsageMetricsData)
+
+from loguru import logger
+
+class FrameProcessorMetrics:
+    def __init__(self):
+        self._start_ttfb_time = 0
+        self._start_processing_time = 0
+        self._should_report_ttfb = True
+
+    def _processor_name(self):
+        return self._core_metrics_data.processor
+
+    def _model_name(self):
+        return self._core_metrics_data.model
+
+    def set_core_metrics_data(self, data: MetricsData):
+        self._core_metrics_data = data
+
+    def set_processor_name(self, name: str):
+        self._core_metrics_data = MetricsData(processor=name)
+
+    async def start_ttfb_metrics(self, report_only_initial_ttfb):
+        if self._should_report_ttfb:
+            self._start_ttfb_time = time.time()
+            self._should_report_ttfb = not report_only_initial_ttfb
+
+    async def stop_ttfb_metrics(self):
+        if self._start_ttfb_time == 0:
+            return None
+
+        value = time.time() - self._start_ttfb_time
+        logger.debug(f"{self._processor_name()} TTFB: {value}")
+        ttfb = TTFBMetricsData(
+            processor=self._processor_name(),
+            value=value,
+            model=self._model_name())
+        self._start_ttfb_time = 0
+        return MetricsFrame(data=[ttfb])
+
+    async def start_processing_metrics(self):
+        self._start_processing_time = time.time()
+
+    async def stop_processing_metrics(self):
+        if self._start_processing_time == 0:
+            return None
+
+        value = time.time() - self._start_processing_time
+        logger.debug(f"{self._processor_name()} processing time: {value}")
+        processing = ProcessingMetricsData(
+            processor=self._processor_name(), value=value, model=self._model_name())
+        self._start_processing_time = 0
+        return MetricsFrame(data=[processing])
+
+    async def start_llm_usage_metrics(self, tokens: LLMTokenUsage):
+        logger.debug(
+            f"{self._processor_name()} prompt tokens: {tokens.prompt_tokens}, completion tokens: {tokens.completion_tokens}")
+        value = LLMUsageMetricsData(
+            processor=self._processor_name(),
+            model=self._model_name(),
+            value=tokens)
+        return MetricsFrame(data=[value])
+
+    async def start_tts_usage_metrics(self, text: str):
+        characters = TTSUsageMetricsData(
+            processor=self._processor_name(),
+            model=self._model_name(),
+            value=len(text))
+        logger.debug(f"{self._processor_name()} usage characters: {characters.value}")
+        return MetricsFrame(data=[characters])
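A rough usage sketch of the new `FrameProcessorMetrics`, exercising only the methods added above; the processor name and the sleep are placeholders for illustration:

```python
# Illustrative only: drives the FrameProcessorMetrics API added in this diff.
import asyncio

from pipecat.processors.metrics.frame_processor_metrics import FrameProcessorMetrics


async def demo():
    metrics = FrameProcessorMetrics()
    metrics.set_processor_name("ExampleTTSService")  # hypothetical processor name

    # TTFB is measured between start_ttfb_metrics() and stop_ttfb_metrics();
    # the stop call returns a MetricsFrame whose data list holds TTFBMetricsData.
    await metrics.start_ttfb_metrics(report_only_initial_ttfb=False)
    await asyncio.sleep(0.1)  # stand-in for waiting on a first byte/token
    frame = await metrics.stop_ttfb_metrics()
    print(frame)


asyncio.run(demo())
```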
diff --git a/src/pipecat/processors/metrics/sentry.py b/src/pipecat/processors/metrics/sentry.py
index ebad58316..0263a0e9e 100644
--- a/src/pipecat/processors/metrics/sentry.py
+++ b/src/pipecat/processors/metrics/sentry.py
@@ -10,11 +10,11 @@
     sentry_available = False
     logger.debug("Sentry SDK not installed. Sentry features will be disabled.")
 
-from pipecat.processors.metrics.base import FrameProcessorMetrics
+from pipecat.processors.metrics.frame_processor_metrics import FrameProcessorMetrics
 
 class SentryMetrics(FrameProcessorMetrics):
-    def __init__(self, name: str):
-        super().__init__(name)
+    def __init__(self):
+        super().__init__()
         self._ttfb_metrics_span = None
         self._processing_metrics_span = None
 
@@ -24,9 +24,10 @@ async def start_ttfb_metrics(self, report_only_initial_ttfb):
         if sentry_available:
             self._ttfb_metrics_span = sentry_sdk.start_span(
                 op="ttfb",
-                description=f"TTFB for {self._name}",
+                description=f"TTFB for {self._processor_name()}",
                 start_timestamp=self._start_ttfb_time
             )
+            logger.debug(f"Sentry Span ID: {self._ttfb_metrics_span.span_id} Description: {self._ttfb_metrics_span.description} started.")
         self._should_report_ttfb = not report_only_initial_ttfb
 
     async def stop_ttfb_metrics(self):
@@ -39,9 +40,11 @@ async def start_processing_metrics(self):
         if sentry_available:
             self._processing_metrics_span = sentry_sdk.start_span(
                 op="processing",
-                description=f"Processing for {self._name}",
+                description=f"Processing for {self._processor_name()}",
                 start_timestamp=self._start_processing_time
             )
+            logger.debug(f"Sentry Span ID: {self._processing_metrics_span.span_id} Description: {self._processing_metrics_span.description} started.")
+
 
     async def stop_processing_metrics(self):
         stop_time = time.time()
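Finally, a sketch of how the constructor change in `frame_processor.py` is meant to be used: a processor now receives an optional metrics instance (defaulting to `FrameProcessorMetrics()`) rather than a metrics class, and the base class attaches the processor name via `set_processor_name()`. The subclass below is hypothetical; only the `metrics` keyword and `SentryMetrics()` come from this diff:

```python
# Hypothetical processor subclass showing the new injection style: pass a
# ready-made metrics instance (here SentryMetrics) instead of a metrics class.
import asyncio

from pipecat.processors.frame_processor import FrameProcessor
from pipecat.processors.metrics.sentry import SentryMetrics


class MyProcessor(FrameProcessor):  # for illustration only
    pass


async def main():
    # Before this change the base class was handed a metrics class and called
    # metrics(name=self.name); now it takes an instance (or builds a default
    # FrameProcessorMetrics) and calls set_processor_name(self.name) on it.
    processor = MyProcessor(metrics=SentryMetrics())
    print(processor.name)


asyncio.run(main())
```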