diff --git a/src/pipecat/processors/frame_processor.py b/src/pipecat/processors/frame_processor.py
index e44b8b0ff..69c957c97 100644
--- a/src/pipecat/processors/frame_processor.py
+++ b/src/pipecat/processors/frame_processor.py
@@ -14,18 +14,14 @@
     EndFrame,
     ErrorFrame,
     Frame,
-    MetricsFrame,
     StartFrame,
     StartInterruptionFrame,
     StopInterruptionFrame,
     SystemFrame)
 from pipecat.metrics.metrics import (
     LLMTokenUsage,
-    LLMUsageMetricsData,
-    MetricsData,
-    ProcessingMetricsData,
-    TTFBMetricsData,
-    TTSUsageMetricsData)
+    MetricsData)
+from pipecat.processors.metrics.frame_processor_metrics import FrameProcessorMetrics
 from pipecat.utils.utils import obj_count, obj_id
 
 from loguru import logger
@@ -36,78 +32,13 @@ class FrameDirection(Enum):
     UPSTREAM = 2
 
 
-class FrameProcessorMetrics:
-    def __init__(self, name: str):
-        self._core_metrics_data = MetricsData(processor=name)
-        self._start_ttfb_time = 0
-        self._start_processing_time = 0
-        self._should_report_ttfb = True
-
-    def _processor_name(self):
-        return self._core_metrics_data.processor
-
-    def _model_name(self):
-        return self._core_metrics_data.model
-
-    def set_core_metrics_data(self, data: MetricsData):
-        self._core_metrics_data = data
-
-    async def start_ttfb_metrics(self, report_only_initial_ttfb):
-        if self._should_report_ttfb:
-            self._start_ttfb_time = time.time()
-            self._should_report_ttfb = not report_only_initial_ttfb
-
-    async def stop_ttfb_metrics(self):
-        if self._start_ttfb_time == 0:
-            return None
-
-        value = time.time() - self._start_ttfb_time
-        logger.debug(f"{self._processor_name()} TTFB: {value}")
-        ttfb = TTFBMetricsData(
-            processor=self._processor_name(),
-            value=value,
-            model=self._model_name())
-        self._start_ttfb_time = 0
-        return MetricsFrame(data=[ttfb])
-
-    async def start_processing_metrics(self):
-        self._start_processing_time = time.time()
-
-    async def stop_processing_metrics(self):
-        if self._start_processing_time == 0:
-            return None
-
-        value = time.time() - self._start_processing_time
-        logger.debug(f"{self._processor_name()} processing time: {value}")
-        processing = ProcessingMetricsData(
-            processor=self._processor_name(), value=value, model=self._model_name())
-        self._start_processing_time = 0
-        return MetricsFrame(data=[processing])
-
-    async def start_llm_usage_metrics(self, tokens: LLMTokenUsage):
-        logger.debug(
-            f"{self._processor_name()} prompt tokens: {tokens.prompt_tokens}, completion tokens: {tokens.completion_tokens}")
-        value = LLMUsageMetricsData(
-            processor=self._processor_name(),
-            model=self._model_name(),
-            value=tokens)
-        return MetricsFrame(data=[value])
-
-    async def start_tts_usage_metrics(self, text: str):
-        characters = TTSUsageMetricsData(
-            processor=self._processor_name(),
-            model=self._model_name(),
-            value=len(text))
-        logger.debug(f"{self._processor_name()} usage characters: {characters.value}")
-        return MetricsFrame(data=[characters])
-
-
 class FrameProcessor:
 
     def __init__(
             self,
             *,
             name: str | None = None,
+            metrics: FrameProcessorMetrics | None = None,
             sync: bool = True,
             loop: asyncio.AbstractEventLoop | None = None,
             **kwargs):
@@ -129,7 +60,8 @@ def __init__(
         self._report_only_initial_ttfb = False
 
         # Metrics
-        self._metrics = FrameProcessorMetrics(name=self.name)
+        self._metrics = metrics or FrameProcessorMetrics()
+        self._metrics.set_processor_name(self.name)
 
         # Every processor in Pipecat should only output frames from a single
         # task. This avoid problems like audio overlapping. System frames are
@@ -262,14 +194,16 @@ async def __internal_push_frame(self, frame: Frame, direction: FrameDirection):
                 logger.trace(f"Pushing {frame} from {self} to {self._next}")
                 await self._next.process_frame(frame, direction)
             elif direction == FrameDirection.UPSTREAM and self._prev:
-                logger.trace(f"Pushing {frame} upstream from {self} to {self._prev}")
+                logger.trace(f"Pushing {frame} upstream from {
+                    self} to {self._prev}")
                 await self._prev.process_frame(frame, direction)
         except Exception as e:
             logger.exception(f"Uncaught exception in {self}: {e}")
 
     def __create_push_task(self):
         self.__push_queue = asyncio.Queue()
-        self.__push_frame_task = self.get_event_loop().create_task(self.__push_frame_task_handler())
+        self.__push_frame_task = self.get_event_loop(
+        ).create_task(self.__push_frame_task_handler())
 
     async def __push_frame_task_handler(self):
         running = True
diff --git a/src/pipecat/processors/metrics/frame_processor_metrics.py b/src/pipecat/processors/metrics/frame_processor_metrics.py
new file mode 100644
index 000000000..b9de3c2b4
--- /dev/null
+++ b/src/pipecat/processors/metrics/frame_processor_metrics.py
@@ -0,0 +1,81 @@
+import time
+
+from pipecat.frames.frames import MetricsFrame
+from pipecat.metrics.metrics import (
+    LLMTokenUsage,
+    LLMUsageMetricsData,
+    MetricsData,
+    ProcessingMetricsData,
+    TTFBMetricsData,
+    TTSUsageMetricsData)
+
+from loguru import logger
+
+
+class FrameProcessorMetrics:
+    def __init__(self):
+        self._start_ttfb_time = 0
+        self._start_processing_time = 0
+        self._should_report_ttfb = True
+
+    def _processor_name(self):
+        return self._core_metrics_data.processor
+
+    def _model_name(self):
+        return self._core_metrics_data.model
+
+    def set_core_metrics_data(self, data: MetricsData):
+        self._core_metrics_data = data
+
+    def set_processor_name(self, name: str):
+        self._core_metrics_data = MetricsData(processor=name)
+
+    async def start_ttfb_metrics(self, report_only_initial_ttfb):
+        if self._should_report_ttfb:
+            self._start_ttfb_time = time.time()
+            self._should_report_ttfb = not report_only_initial_ttfb
+
+    async def stop_ttfb_metrics(self):
+        if self._start_ttfb_time == 0:
+            return None
+
+        value = time.time() - self._start_ttfb_time
+        logger.debug(f"{self._processor_name()} TTFB: {value}")
+        ttfb = TTFBMetricsData(
+            processor=self._processor_name(),
+            value=value,
+            model=self._model_name())
+        self._start_ttfb_time = 0
+        return MetricsFrame(data=[ttfb])
+
+    async def start_processing_metrics(self):
+        self._start_processing_time = time.time()
+
+    async def stop_processing_metrics(self):
+        if self._start_processing_time == 0:
+            return None
+
+        value = time.time() - self._start_processing_time
+        logger.debug(f"{self._processor_name()} processing time: {value}")
+        processing = ProcessingMetricsData(
+            processor=self._processor_name(), value=value, model=self._model_name())
+        self._start_processing_time = 0
+        return MetricsFrame(data=[processing])
+
+    async def start_llm_usage_metrics(self, tokens: LLMTokenUsage):
+        logger.debug(
+            f"{self._processor_name()} prompt tokens: {tokens.prompt_tokens}, completion tokens: {tokens.completion_tokens}")
+        value = LLMUsageMetricsData(
+            processor=self._processor_name(),
+            model=self._model_name(),
+            value=tokens)
+        return MetricsFrame(data=[value])
+
+    async def start_tts_usage_metrics(self, text: str):
+        characters = TTSUsageMetricsData(
+            processor=self._processor_name(),
+            model=self._model_name(),
+            value=len(text))
+        logger.debug(f"{self._processor_name()} usage characters: {
+                     characters.value}")
+        return MetricsFrame(data=[characters])
diff --git a/src/pipecat/processors/metrics/sentry.py b/src/pipecat/processors/metrics/sentry.py
new file mode 100644
index 000000000..da3057e76
--- /dev/null
+++ b/src/pipecat/processors/metrics/sentry.py
@@ -0,0 +1,56 @@
+import time
+from loguru import logger
+
+try:
+    import sentry_sdk
+    sentry_available = sentry_sdk.is_initialized()
+    if not sentry_available:
+        logger.warning(
+            "Sentry SDK not initialized. Sentry features will be disabled.")
+except ImportError:
+    sentry_available = False
+    logger.warning(
+        "Sentry SDK not installed. Sentry features will be disabled.")
+
+from pipecat.processors.metrics.frame_processor_metrics import FrameProcessorMetrics
+
+
+class SentryMetrics(FrameProcessorMetrics):
+    def __init__(self):
+        super().__init__()
+        self._ttfb_metrics_span = None
+        self._processing_metrics_span = None
+
+    async def start_ttfb_metrics(self, report_only_initial_ttfb):
+        if self._should_report_ttfb:
+            self._start_ttfb_time = time.time()
+            if sentry_available:
+                self._ttfb_metrics_span = sentry_sdk.start_span(
+                    op="ttfb",
+                    description=f"TTFB for {self._processor_name()}",
+                    start_timestamp=self._start_ttfb_time
+                )
+                logger.debug(f"Sentry Span ID: {self._ttfb_metrics_span.span_id} Description: {
+                             self._ttfb_metrics_span.description} started.")
+            self._should_report_ttfb = not report_only_initial_ttfb
+
+    async def stop_ttfb_metrics(self):
+        stop_time = time.time()
+        if sentry_available:
+            self._ttfb_metrics_span.finish(end_timestamp=stop_time)
+
+    async def start_processing_metrics(self):
+        self._start_processing_time = time.time()
+        if sentry_available:
+            self._processing_metrics_span = sentry_sdk.start_span(
+                op="processing",
+                description=f"Processing for {self._processor_name()}",
+                start_timestamp=self._start_processing_time
+            )
+            logger.debug(f"Sentry Span ID: {self._processing_metrics_span.span_id} Description: {
+                         self._processing_metrics_span.description} started.")
+
+    async def stop_processing_metrics(self):
+        stop_time = time.time()
+        if sentry_available:
+            self._processing_metrics_span.finish(end_timestamp=stop_time)
diff --git a/src/pipecat/services/deepgram.py b/src/pipecat/services/deepgram.py
index e3089dd0c..25f7b7a56 100644
--- a/src/pipecat/services/deepgram.py
+++ b/src/pipecat/services/deepgram.py
@@ -75,7 +75,8 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
         logger.debug(f"Generating TTS: [{text}]")
 
         base_url = self._base_url
-        request_url = f"{base_url}?model={self._voice}&encoding={self._encoding}&container=none&sample_rate={self._sample_rate}"
+        request_url = f"{base_url}?model={self._voice}&encoding={
+            self._encoding}&container=none&sample_rate={self._sample_rate}"
         headers = {"authorization": f"token {self._api_key}"}
         body = {"text": text}
 
@@ -124,6 +125,7 @@ def __init__(self,
                      smart_format=True,
                      punctuate=True,
                      profanity_filter=True,
+                     vad_events=False,
                  ),
                  **kwargs):
         super().__init__(**kwargs)
@@ -132,8 +134,20 @@ def __init__(self,
 
         self._client = DeepgramClient(
             api_key, config=DeepgramClientOptions(url=url, options={"keepalive": "true"}))
-        self._connection: AsyncListenWebSocketClient = self._client.listen.asyncwebsocket.v("1")
-        self._connection.on(LiveTranscriptionEvents.Transcript, self._on_message)
+        self._connection: AsyncListenWebSocketClient = self._client.listen.asyncwebsocket.v(
+            "1")
+        self._connection.on(
+            LiveTranscriptionEvents.Transcript, self._on_message)
+        if self.vad_enabled:
+            self._connection.on(
+                LiveTranscriptionEvents.SpeechStarted, self._on_speech_started)
+
+    @property
+    def vad_enabled(self):
+        return self._live_options.vad_events
+
+    def can_generate_metrics(self) -> bool:
+        return self.vad_enabled
 
     async def set_model(self, model: str):
         await super().set_model(model)
@@ -161,9 +175,7 @@ async def cancel(self, frame: CancelFrame):
         await self._disconnect()
 
     async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
-        await self.start_processing_metrics()
         await self._connection.send(audio)
-        await self.stop_processing_metrics()
         yield None
 
     async def _connect(self):
@@ -177,6 +189,10 @@ async def _disconnect(self):
             await self._connection.finish()
             logger.debug(f"{self}: Disconnected from Deepgram")
 
+    async def _on_speech_started(self, *args, **kwargs):
+        await self.start_ttfb_metrics()
+        await self.start_processing_metrics()
+
     async def _on_message(self, *args, **kwargs):
         result: LiveResultResponse = kwargs["result"]
         if len(result.channel.alternatives) == 0:
@@ -188,7 +204,9 @@ async def _on_message(self, *args, **kwargs):
             language = result.channel.alternatives[0].languages[0]
             language = Language(language)
         if len(transcript) > 0:
+            await self.stop_ttfb_metrics()
             if is_final:
                 await self.push_frame(TranscriptionFrame(transcript, "", time_now_iso8601(), language))
+                await self.stop_processing_metrics()
             else:
                 await self.push_frame(InterimTranscriptionFrame(transcript, "", time_now_iso8601(), language))
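
Usage note (not part of the patch): with this change a processor's metrics backend becomes injectable through the new `metrics` constructor argument in frame_processor.py, with the plain FrameProcessorMetrics as the fallback. The snippet below is a minimal sketch of wiring in SentryMetrics; it assumes the service forwards its keyword arguments down to FrameProcessor and that sentry-sdk is installed. The DSN and API key are placeholders.

    import sentry_sdk

    # Initialize Sentry before importing SentryMetrics: sentry.py evaluates
    # sentry_sdk.is_initialized() at import time and disables itself otherwise.
    sentry_sdk.init(dsn="<your-sentry-dsn>", traces_sample_rate=1.0)

    from pipecat.processors.metrics.sentry import SentryMetrics
    from pipecat.services.deepgram import DeepgramSTTService

    # `metrics` is the new injection point; omitting it keeps the default
    # FrameProcessorMetrics behavior.
    stt = DeepgramSTTService(
        api_key="<your-deepgram-api-key>",
        metrics=SentryMetrics(),
    )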
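The Deepgram STT metrics in this patch only fire when Deepgram VAD events are enabled: _on_speech_started starts the TTFB and processing timers, _on_message stops them, and can_generate_metrics() reports availability from the vad_events option. A sketch of enabling it follows; the LiveOptions fields shown are only the ones visible in this diff, and in practice the remaining audio options (encoding, sample rate, etc.) from the service defaults would need to be carried over as well.

    from deepgram import LiveOptions
    from pipecat.services.deepgram import DeepgramSTTService

    # vad_events=True makes Deepgram emit SpeechStarted, which this change uses
    # to start the TTFB/processing timers before the first transcript arrives.
    stt = DeepgramSTTService(
        api_key="<your-deepgram-api-key>",
        live_options=LiveOptions(
            smart_format=True,
            punctuate=True,
            profanity_filter=True,
            vad_events=True,
        ),
    )
    assert stt.can_generate_metrics()  # True only when vad_events is enabled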