From d9d41d9db47262fe8be1a1525d39fba72f4ea07e Mon Sep 17 00:00:00 2001 From: cs-metadexlabs Date: Tue, 17 Sep 2024 21:57:14 +0200 Subject: [PATCH 01/11] feat: Add Sentry support in FrameProcessor This update add optional Sentry integration for performance tracking and error monitoring. Key changes include: - Add conditional Sentry import and initialization check - Implement Sentry spans in FrameProcessorMetrics to measure TTFB (Time To First Byte) and processing time when Sentry is available - Maintain existing metrics functionality with MetricsFrame regardless of Sentry availability --- src/pipecat/processors/frame_processor.py | 34 +++++++++++++++++++++-- 1 file changed, 31 insertions(+), 3 deletions(-) diff --git a/src/pipecat/processors/frame_processor.py b/src/pipecat/processors/frame_processor.py index dfdee7d40..c2c0135b0 100644 --- a/src/pipecat/processors/frame_processor.py +++ b/src/pipecat/processors/frame_processor.py @@ -21,6 +21,14 @@ from loguru import logger +try: + import sentry_sdk + sentry_available = sentry_sdk.is_initialized() + if not sentry_available: + logger.debug("Sentry SDK not initialized. Sentry features will be disabled.") +except ImportError: + sentry_available = False + logger.debug("Sentry SDK not installed. Sentry features will be disabled.") class FrameDirection(Enum): DOWNSTREAM = 1 @@ -33,17 +41,29 @@ def __init__(self, name: str): self._start_ttfb_time = 0 self._start_processing_time = 0 self._should_report_ttfb = True + if sentry_available: + self._ttfb_metrics_span = None + self._processing_metrics_span = None async def start_ttfb_metrics(self, report_only_initial_ttfb): if self._should_report_ttfb: self._start_ttfb_time = time.time() + if sentry_available: + self._ttfb_metrics_span = sentry_sdk.start_span( + op="ttfb", + description=f"TTFB for {self._name}", + start_timestamp=self._start_ttfb_time + ) self._should_report_ttfb = not report_only_initial_ttfb async def stop_ttfb_metrics(self): if self._start_ttfb_time == 0: return None - value = time.time() - self._start_ttfb_time + stop_time = time.time() + value = stop_time - self._start_ttfb_time + if sentry_available: + self._ttfb_metrics_span.finish(end_timestamp=stop_time) logger.debug(f"{self._name} TTFB: {value}") ttfb = { "processor": self._name, @@ -54,12 +74,20 @@ async def stop_ttfb_metrics(self): async def start_processing_metrics(self): self._start_processing_time = time.time() + if sentry_available: + self._processing_metrics_span = sentry_sdk.start_span( + op="processing", + description=f"TTFB for {self._name}", + start_timestamp=self._start_processing_time + ) async def stop_processing_metrics(self): if self._start_processing_time == 0: return None - - value = time.time() - self._start_processing_time + stop_time = time.time() + value = stop_time - self._start_processing_time + if sentry_available: + self._processing_metrics_span.finish(end_timestamp=stop_time) logger.debug(f"{self._name} processing time: {value}") processing = { "processor": self._name, From 64605b3c0b0c18926f843956088b3c0b6d9dc38c Mon Sep 17 00:00:00 2001 From: cs-metadexlabs Date: Tue, 17 Sep 2024 22:05:55 +0200 Subject: [PATCH 02/11] feat: Enable metrics in DeepgramSTTService for Sentry This commit enhances the DeepgramSTTService class to enable metrics generation for use with Sentry. Key changes include: 1. Enable general metrics generation: - Implement `can_generate_metrics` method, returning True when VAD is enabled - This allows metrics to be collected and used by both Sentry and the metrics system in frame_processor.py 2. Integrate Sentry-compatible performance tracking: - Add start_ttfb_metrics and start_processing_metrics calls in the VAD speech detection handler - Implement stop_ttfb_metrics call when receiving transcripts - Add stop_processing_metrics for final transcripts 3. Enhance VAD support for metrics: - Add `vad_enabled` property to check VAD event availability - Implement VAD-based speech detection handler for precise metric timing These changes enable detailed performance tracking via both Sentry and the general metrics system when VAD is active. This allows for better monitoring and analysis of the speech-to-text process, providing valuable insights through Sentry and any other metrics consumers in the pipeline. --- src/pipecat/services/deepgram.py | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/src/pipecat/services/deepgram.py b/src/pipecat/services/deepgram.py index d899d4bdb..381ec1bda 100644 --- a/src/pipecat/services/deepgram.py +++ b/src/pipecat/services/deepgram.py @@ -123,6 +123,7 @@ def __init__(self, smart_format=True, punctuate=True, profanity_filter=True, + vad_events=False, ), **kwargs): super().__init__(**kwargs) @@ -133,6 +134,15 @@ def __init__(self, api_key, config=DeepgramClientOptions(url=url, options={"keepalive": "true"})) self._connection: AsyncListenWebSocketClient = self._client.listen.asyncwebsocket.v("1") self._connection.on(LiveTranscriptionEvents.Transcript, self._on_message) + if self.vad_enabled: + self._connection.on(LiveTranscriptionEvents.SpeechStarted, self._on_speech_started) + + @property + def vad_enabled(self): + return self._live_options.vad_events + + def can_generate_metrics(self) -> bool: + return self.vad_enabled async def set_model(self, model: str): logger.debug(f"Switching STT model to: [{model}]") @@ -159,10 +169,8 @@ async def cancel(self, frame: CancelFrame): await self._disconnect() async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]: - await self.start_processing_metrics() await self._connection.send(audio) yield None - await self.stop_processing_metrics() async def _connect(self): if await self._connection.start(self._live_options): @@ -174,6 +182,10 @@ async def _disconnect(self): if self._connection.is_connected: await self._connection.finish() logger.debug(f"{self}: Disconnected from Deepgram") + + async def _on_speech_started(self, *args, **kwargs): + await self.start_ttfb_metrics() + await self.start_processing_metrics() async def _on_message(self, *args, **kwargs): result: LiveResultResponse = kwargs["result"] @@ -186,7 +198,9 @@ async def _on_message(self, *args, **kwargs): language = result.channel.alternatives[0].languages[0] language = Language(language) if len(transcript) > 0: + await self.stop_ttfb_metrics() if is_final: await self.push_frame(TranscriptionFrame(transcript, "", time_now_iso8601(), language)) + await self.stop_processing_metrics() else: await self.push_frame(InterimTranscriptionFrame(transcript, "", time_now_iso8601(), language)) From ad97e75a533de2fe62a890eb0b7039c55fbc25ed Mon Sep 17 00:00:00 2001 From: cs-metadexlabs Date: Tue, 17 Sep 2024 23:56:34 +0200 Subject: [PATCH 03/11] Update frame_processor.py --- src/pipecat/processors/frame_processor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pipecat/processors/frame_processor.py b/src/pipecat/processors/frame_processor.py index c2c0135b0..18d203283 100644 --- a/src/pipecat/processors/frame_processor.py +++ b/src/pipecat/processors/frame_processor.py @@ -77,7 +77,7 @@ async def start_processing_metrics(self): if sentry_available: self._processing_metrics_span = sentry_sdk.start_span( op="processing", - description=f"TTFB for {self._name}", + description=f"Processing for {self._name}", start_timestamp=self._start_processing_time ) From c442c229affa30a44ddf3a6b0922b8e8447c25d0 Mon Sep 17 00:00:00 2001 From: cs-metadexlabs Date: Thu, 19 Sep 2024 13:40:28 +0200 Subject: [PATCH 04/11] Refactor to support flexible metrics implementation - Modified the __init__ method to accept a metrics parameter that is either FrameProcessorMetrics or one of its subclasses - Updated the metrics initialization to create an instance with the processor's name - Moved all FrameProcessorMetrics-related logic to a new processors\metrics\base.py file --- src/pipecat/processors/frame_processor.py | 90 +---------------------- 1 file changed, 3 insertions(+), 87 deletions(-) diff --git a/src/pipecat/processors/frame_processor.py b/src/pipecat/processors/frame_processor.py index 18d203283..a675e3e2c 100644 --- a/src/pipecat/processors/frame_processor.py +++ b/src/pipecat/processors/frame_processor.py @@ -5,7 +5,6 @@ # import asyncio -import time from enum import Enum @@ -13,109 +12,26 @@ from pipecat.frames.frames import ( ErrorFrame, Frame, - MetricsFrame, StartFrame, StartInterruptionFrame, UserStoppedSpeakingFrame) +from pipecat.processors.metrics.base import FrameProcessorMetrics from pipecat.utils.utils import obj_count, obj_id from loguru import logger -try: - import sentry_sdk - sentry_available = sentry_sdk.is_initialized() - if not sentry_available: - logger.debug("Sentry SDK not initialized. Sentry features will be disabled.") -except ImportError: - sentry_available = False - logger.debug("Sentry SDK not installed. Sentry features will be disabled.") class FrameDirection(Enum): DOWNSTREAM = 1 UPSTREAM = 2 - -class FrameProcessorMetrics: - def __init__(self, name: str): - self._name = name - self._start_ttfb_time = 0 - self._start_processing_time = 0 - self._should_report_ttfb = True - if sentry_available: - self._ttfb_metrics_span = None - self._processing_metrics_span = None - - async def start_ttfb_metrics(self, report_only_initial_ttfb): - if self._should_report_ttfb: - self._start_ttfb_time = time.time() - if sentry_available: - self._ttfb_metrics_span = sentry_sdk.start_span( - op="ttfb", - description=f"TTFB for {self._name}", - start_timestamp=self._start_ttfb_time - ) - self._should_report_ttfb = not report_only_initial_ttfb - - async def stop_ttfb_metrics(self): - if self._start_ttfb_time == 0: - return None - - stop_time = time.time() - value = stop_time - self._start_ttfb_time - if sentry_available: - self._ttfb_metrics_span.finish(end_timestamp=stop_time) - logger.debug(f"{self._name} TTFB: {value}") - ttfb = { - "processor": self._name, - "value": value - } - self._start_ttfb_time = 0 - return MetricsFrame(ttfb=[ttfb]) - - async def start_processing_metrics(self): - self._start_processing_time = time.time() - if sentry_available: - self._processing_metrics_span = sentry_sdk.start_span( - op="processing", - description=f"Processing for {self._name}", - start_timestamp=self._start_processing_time - ) - - async def stop_processing_metrics(self): - if self._start_processing_time == 0: - return None - stop_time = time.time() - value = stop_time - self._start_processing_time - if sentry_available: - self._processing_metrics_span.finish(end_timestamp=stop_time) - logger.debug(f"{self._name} processing time: {value}") - processing = { - "processor": self._name, - "value": value - } - self._start_processing_time = 0 - return MetricsFrame(processing=[processing]) - - async def start_llm_usage_metrics(self, tokens: dict): - logger.debug( - f"{self._name} prompt tokens: {tokens['prompt_tokens']}, completion tokens: {tokens['completion_tokens']}") - return MetricsFrame(tokens=[tokens]) - - async def start_tts_usage_metrics(self, text: str): - characters = { - "processor": self._name, - "value": len(text), - } - logger.debug(f"{self._name} usage characters: {characters['value']}") - return MetricsFrame(characters=[characters]) - - class FrameProcessor: def __init__( self, *, name: str | None = None, + metrics: FrameProcessorMetrics = FrameProcessorMetrics, loop: asyncio.AbstractEventLoop | None = None, **kwargs): self.id: int = obj_id() @@ -135,7 +51,7 @@ def __init__( self._report_only_initial_ttfb = False # Metrics - self._metrics = FrameProcessorMetrics(name=self.name) + self._metrics = metrics(name=self.name) @property def interruptions_allowed(self): From 34014eba6315bfe0e906a3ec1b46b8622f6a55f8 Mon Sep 17 00:00:00 2001 From: cs-metadexlabs Date: Thu, 19 Sep 2024 13:43:27 +0200 Subject: [PATCH 05/11] Implement flexible metrics system with Sentry integration 1. Created a new metrics module in processors/metrics/ 2. Implemented FrameProcessorMetrics base class in base.py: 3. Implemented SentryMetrics class in sentry.py: - Inherits from FrameProcessorMetrics - Integrates with Sentry SDK for advanced metrics tracking - Implements Sentry-specific span creation and management for TTFB and processing metrics - Handles cases where Sentry is not available or initialized --- src/pipecat/processors/metrics/base.py | 57 ++++++++++++++++++++++++ src/pipecat/processors/metrics/sentry.py | 49 ++++++++++++++++++++ 2 files changed, 106 insertions(+) create mode 100644 src/pipecat/processors/metrics/base.py create mode 100644 src/pipecat/processors/metrics/sentry.py diff --git a/src/pipecat/processors/metrics/base.py b/src/pipecat/processors/metrics/base.py new file mode 100644 index 000000000..c9df65d6d --- /dev/null +++ b/src/pipecat/processors/metrics/base.py @@ -0,0 +1,57 @@ +import time +from loguru import logger +from pipecat.frames.frames import MetricsFrame + +class FrameProcessorMetrics: + def __init__(self, name: str): + self._name = name + self._start_ttfb_time = 0 + self._start_processing_time = 0 + self._should_report_ttfb = True + + async def start_ttfb_metrics(self, report_only_initial_ttfb): + if self._should_report_ttfb: + self._start_ttfb_time = time.time() + self._should_report_ttfb = not report_only_initial_ttfb + + async def stop_ttfb_metrics(self): + if self._start_ttfb_time == 0: + return None + + value = time.time() - self._start_ttfb_time + logger.debug(f"{self._name} TTFB: {value}") + ttfb = { + "processor": self._name, + "value": value + } + self._start_ttfb_time = 0 + return MetricsFrame(ttfb=[ttfb]) + + async def start_processing_metrics(self): + self._start_processing_time = time.time() + + async def stop_processing_metrics(self): + if self._start_processing_time == 0: + return None + + value = time.time() - self._start_processing_time + logger.debug(f"{self._name} processing time: {value}") + processing = { + "processor": self._name, + "value": value + } + self._start_processing_time = 0 + return MetricsFrame(processing=[processing]) + + async def start_llm_usage_metrics(self, tokens: dict): + logger.debug( + f"{self._name} prompt tokens: {tokens['prompt_tokens']}, completion tokens: {tokens['completion_tokens']}") + return MetricsFrame(tokens=[tokens]) + + async def start_tts_usage_metrics(self, text: str): + characters = { + "processor": self._name, + "value": len(text), + } + logger.debug(f"{self._name} usage characters: {characters['value']}") + return MetricsFrame(characters=[characters]) \ No newline at end of file diff --git a/src/pipecat/processors/metrics/sentry.py b/src/pipecat/processors/metrics/sentry.py new file mode 100644 index 000000000..ebad58316 --- /dev/null +++ b/src/pipecat/processors/metrics/sentry.py @@ -0,0 +1,49 @@ +import time +from loguru import logger + +try: + import sentry_sdk + sentry_available = sentry_sdk.is_initialized() + if not sentry_available: + logger.debug("Sentry SDK not initialized. Sentry features will be disabled.") +except ImportError: + sentry_available = False + logger.debug("Sentry SDK not installed. Sentry features will be disabled.") + +from pipecat.processors.metrics.base import FrameProcessorMetrics + +class SentryMetrics(FrameProcessorMetrics): + def __init__(self, name: str): + super().__init__(name) + self._ttfb_metrics_span = None + self._processing_metrics_span = None + + async def start_ttfb_metrics(self, report_only_initial_ttfb): + if self._should_report_ttfb: + self._start_ttfb_time = time.time() + if sentry_available: + self._ttfb_metrics_span = sentry_sdk.start_span( + op="ttfb", + description=f"TTFB for {self._name}", + start_timestamp=self._start_ttfb_time + ) + self._should_report_ttfb = not report_only_initial_ttfb + + async def stop_ttfb_metrics(self): + stop_time = time.time() + if sentry_available: + self._ttfb_metrics_span.finish(end_timestamp=stop_time) + + async def start_processing_metrics(self): + self._start_processing_time = time.time() + if sentry_available: + self._processing_metrics_span = sentry_sdk.start_span( + op="processing", + description=f"Processing for {self._name}", + start_timestamp=self._start_processing_time + ) + + async def stop_processing_metrics(self): + stop_time = time.time() + if sentry_available: + self._processing_metrics_span.finish(end_timestamp=stop_time) From bbbcf4f104877fcec7792f83d6be95031a4d05a4 Mon Sep 17 00:00:00 2001 From: cs-metadexlabs Date: Thu, 19 Sep 2024 14:09:32 +0200 Subject: [PATCH 06/11] Add missing newline at end of file --- src/pipecat/processors/metrics/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pipecat/processors/metrics/base.py b/src/pipecat/processors/metrics/base.py index c9df65d6d..ddda212a3 100644 --- a/src/pipecat/processors/metrics/base.py +++ b/src/pipecat/processors/metrics/base.py @@ -54,4 +54,4 @@ async def start_tts_usage_metrics(self, text: str): "value": len(text), } logger.debug(f"{self._name} usage characters: {characters['value']}") - return MetricsFrame(characters=[characters]) \ No newline at end of file + return MetricsFrame(characters=[characters]) From cd13aff2ab809abc307455253b7243bbe51cd8c2 Mon Sep 17 00:00:00 2001 From: cs-metadexlabs Date: Sun, 22 Sep 2024 15:18:31 +0200 Subject: [PATCH 07/11] Refactor to Align with Merged PR #474 --- src/pipecat/processors/frame_processor.py | 11 ++- src/pipecat/processors/metrics/base.py | 57 ------------- .../metrics/frame_processor_metrics.py | 79 +++++++++++++++++++ src/pipecat/processors/metrics/sentry.py | 13 +-- 4 files changed, 95 insertions(+), 65 deletions(-) delete mode 100644 src/pipecat/processors/metrics/base.py create mode 100644 src/pipecat/processors/metrics/frame_processor_metrics.py diff --git a/src/pipecat/processors/frame_processor.py b/src/pipecat/processors/frame_processor.py index ffb603a58..7daa93199 100644 --- a/src/pipecat/processors/frame_processor.py +++ b/src/pipecat/processors/frame_processor.py @@ -5,6 +5,7 @@ # import asyncio +import time from enum import Enum @@ -17,7 +18,10 @@ StartInterruptionFrame, StopInterruptionFrame, SystemFrame) -from pipecat.processors.metrics.base import FrameProcessorMetrics +from pipecat.metrics.metrics import ( + LLMTokenUsage, + MetricsData) +from pipecat.processors.metrics.frame_processor_metrics import FrameProcessorMetrics from pipecat.utils.utils import obj_count, obj_id from loguru import logger @@ -33,7 +37,7 @@ def __init__( self, *, name: str | None = None, - metrics: FrameProcessorMetrics = FrameProcessorMetrics, + metrics: FrameProcessorMetrics | None = None, sync: bool = True, loop: asyncio.AbstractEventLoop | None = None, **kwargs): @@ -55,7 +59,8 @@ def __init__( self._report_only_initial_ttfb = False # Metrics - self._metrics = metrics(name=self.name) + self._metrics = metrics or FrameProcessorMetrics() + self._metrics.set_processor_name(self.name) # Every processor in Pipecat should only output frames from a single # task. This avoid problems like audio overlapping. System frames are diff --git a/src/pipecat/processors/metrics/base.py b/src/pipecat/processors/metrics/base.py deleted file mode 100644 index ddda212a3..000000000 --- a/src/pipecat/processors/metrics/base.py +++ /dev/null @@ -1,57 +0,0 @@ -import time -from loguru import logger -from pipecat.frames.frames import MetricsFrame - -class FrameProcessorMetrics: - def __init__(self, name: str): - self._name = name - self._start_ttfb_time = 0 - self._start_processing_time = 0 - self._should_report_ttfb = True - - async def start_ttfb_metrics(self, report_only_initial_ttfb): - if self._should_report_ttfb: - self._start_ttfb_time = time.time() - self._should_report_ttfb = not report_only_initial_ttfb - - async def stop_ttfb_metrics(self): - if self._start_ttfb_time == 0: - return None - - value = time.time() - self._start_ttfb_time - logger.debug(f"{self._name} TTFB: {value}") - ttfb = { - "processor": self._name, - "value": value - } - self._start_ttfb_time = 0 - return MetricsFrame(ttfb=[ttfb]) - - async def start_processing_metrics(self): - self._start_processing_time = time.time() - - async def stop_processing_metrics(self): - if self._start_processing_time == 0: - return None - - value = time.time() - self._start_processing_time - logger.debug(f"{self._name} processing time: {value}") - processing = { - "processor": self._name, - "value": value - } - self._start_processing_time = 0 - return MetricsFrame(processing=[processing]) - - async def start_llm_usage_metrics(self, tokens: dict): - logger.debug( - f"{self._name} prompt tokens: {tokens['prompt_tokens']}, completion tokens: {tokens['completion_tokens']}") - return MetricsFrame(tokens=[tokens]) - - async def start_tts_usage_metrics(self, text: str): - characters = { - "processor": self._name, - "value": len(text), - } - logger.debug(f"{self._name} usage characters: {characters['value']}") - return MetricsFrame(characters=[characters]) diff --git a/src/pipecat/processors/metrics/frame_processor_metrics.py b/src/pipecat/processors/metrics/frame_processor_metrics.py new file mode 100644 index 000000000..9f3aa3039 --- /dev/null +++ b/src/pipecat/processors/metrics/frame_processor_metrics.py @@ -0,0 +1,79 @@ +import time + +from pipecat.frames.frames import MetricsFrame +from pipecat.metrics.metrics import ( + LLMTokenUsage, + LLMUsageMetricsData, + MetricsData, + ProcessingMetricsData, + TTFBMetricsData, + TTSUsageMetricsData) + +from loguru import logger + +class FrameProcessorMetrics: + def __init__(self): + self._start_ttfb_time = 0 + self._start_processing_time = 0 + self._should_report_ttfb = True + + def _processor_name(self): + return self._core_metrics_data.processor + + def _model_name(self): + return self._core_metrics_data.model + + def set_core_metrics_data(self, data: MetricsData): + self._core_metrics_data = data + + def set_processor_name(self, name: str): + self._core_metrics_data = MetricsData(processor=name) + + async def start_ttfb_metrics(self, report_only_initial_ttfb): + if self._should_report_ttfb: + self._start_ttfb_time = time.time() + self._should_report_ttfb = not report_only_initial_ttfb + + async def stop_ttfb_metrics(self): + if self._start_ttfb_time == 0: + return None + + value = time.time() - self._start_ttfb_time + logger.debug(f"{self._processor_name()} TTFB: {value}") + ttfb = TTFBMetricsData( + processor=self._processor_name(), + value=value, + model=self._model_name()) + self._start_ttfb_time = 0 + return MetricsFrame(data=[ttfb]) + + async def start_processing_metrics(self): + self._start_processing_time = time.time() + + async def stop_processing_metrics(self): + if self._start_processing_time == 0: + return None + + value = time.time() - self._start_processing_time + logger.debug(f"{self._processor_name()} processing time: {value}") + processing = ProcessingMetricsData( + processor=self._processor_name(), value=value, model=self._model_name()) + self._start_processing_time = 0 + return MetricsFrame(data=[processing]) + + async def start_llm_usage_metrics(self, tokens: LLMTokenUsage): + logger.debug( + f"{self._processor_name()} prompt tokens: {tokens.prompt_tokens}, completion tokens: {tokens.completion_tokens}") + value = LLMUsageMetricsData( + processor=self._processor_name(), + model=self._model_name(), + value=tokens) + return MetricsFrame(data=[value]) + + async def start_tts_usage_metrics(self, text: str): + characters = TTSUsageMetricsData( + processor=self._processor_name(), + model=self._model_name(), + value=len(text)) + logger.debug(f"{self._processor_name()} usage characters: {characters.value}") + return MetricsFrame(data=[characters]) diff --git a/src/pipecat/processors/metrics/sentry.py b/src/pipecat/processors/metrics/sentry.py index ebad58316..0263a0e9e 100644 --- a/src/pipecat/processors/metrics/sentry.py +++ b/src/pipecat/processors/metrics/sentry.py @@ -10,11 +10,11 @@ sentry_available = False logger.debug("Sentry SDK not installed. Sentry features will be disabled.") -from pipecat.processors.metrics.base import FrameProcessorMetrics +from pipecat.processors.metrics.frame_processor_metrics import FrameProcessorMetrics class SentryMetrics(FrameProcessorMetrics): - def __init__(self, name: str): - super().__init__(name) + def __init__(self): + super().__init__() self._ttfb_metrics_span = None self._processing_metrics_span = None @@ -24,9 +24,10 @@ async def start_ttfb_metrics(self, report_only_initial_ttfb): if sentry_available: self._ttfb_metrics_span = sentry_sdk.start_span( op="ttfb", - description=f"TTFB for {self._name}", + description=f"TTFB for {self._processor_name()}", start_timestamp=self._start_ttfb_time ) + logger.debug(f"Sentry Span ID: {self._ttfb_metrics_span.span_id} Description: {self._ttfb_metrics_span.description} started.") self._should_report_ttfb = not report_only_initial_ttfb async def stop_ttfb_metrics(self): @@ -39,9 +40,11 @@ async def start_processing_metrics(self): if sentry_available: self._processing_metrics_span = sentry_sdk.start_span( op="processing", - description=f"Processing for {self._name}", + description=f"Processing for {self._processor_name()}", start_timestamp=self._start_processing_time ) + logger.debug(f"Sentry Span ID: {self._processing_metrics_span.span_id} Description: {self._processing_metrics_span.description} started.") + async def stop_processing_metrics(self): stop_time = time.time() From f7f4c9517860345086e9bba7e1f4416c18ecef92 Mon Sep 17 00:00:00 2001 From: cs-metadexlabs Date: Mon, 23 Sep 2024 11:56:16 +0200 Subject: [PATCH 08/11] Format with autopep8 & adjust logger level --- src/pipecat/processors/metrics/sentry.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/src/pipecat/processors/metrics/sentry.py b/src/pipecat/processors/metrics/sentry.py index 0263a0e9e..da3057e76 100644 --- a/src/pipecat/processors/metrics/sentry.py +++ b/src/pipecat/processors/metrics/sentry.py @@ -5,13 +5,16 @@ import sentry_sdk sentry_available = sentry_sdk.is_initialized() if not sentry_available: - logger.debug("Sentry SDK not initialized. Sentry features will be disabled.") + logger.warning( + "Sentry SDK not initialized. Sentry features will be disabled.") except ImportError: sentry_available = False - logger.debug("Sentry SDK not installed. Sentry features will be disabled.") + logger.warning( + "Sentry SDK not installed. Sentry features will be disabled.") from pipecat.processors.metrics.frame_processor_metrics import FrameProcessorMetrics + class SentryMetrics(FrameProcessorMetrics): def __init__(self): super().__init__() @@ -23,11 +26,12 @@ async def start_ttfb_metrics(self, report_only_initial_ttfb): self._start_ttfb_time = time.time() if sentry_available: self._ttfb_metrics_span = sentry_sdk.start_span( - op="ttfb", + op="ttfb", description=f"TTFB for {self._processor_name()}", start_timestamp=self._start_ttfb_time ) - logger.debug(f"Sentry Span ID: {self._ttfb_metrics_span.span_id} Description: {self._ttfb_metrics_span.description} started.") + logger.debug(f"Sentry Span ID: {self._ttfb_metrics_span.span_id} Description: { + self._ttfb_metrics_span.description} started.") self._should_report_ttfb = not report_only_initial_ttfb async def stop_ttfb_metrics(self): @@ -39,12 +43,12 @@ async def start_processing_metrics(self): self._start_processing_time = time.time() if sentry_available: self._processing_metrics_span = sentry_sdk.start_span( - op="processing", + op="processing", description=f"Processing for {self._processor_name()}", start_timestamp=self._start_processing_time ) - logger.debug(f"Sentry Span ID: {self._processing_metrics_span.span_id} Description: {self._processing_metrics_span.description} started.") - + logger.debug(f"Sentry Span ID: {self._processing_metrics_span.span_id} Description: { + self._processing_metrics_span.description} started.") async def stop_processing_metrics(self): stop_time = time.time() From a2adca1c4003ba22b28865e5a2fa5f98ae157aa0 Mon Sep 17 00:00:00 2001 From: cs-metadexlabs Date: Mon, 23 Sep 2024 11:56:44 +0200 Subject: [PATCH 09/11] Format with autopep8 --- src/pipecat/processors/metrics/frame_processor_metrics.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/pipecat/processors/metrics/frame_processor_metrics.py b/src/pipecat/processors/metrics/frame_processor_metrics.py index 9f3aa3039..b9de3c2b4 100644 --- a/src/pipecat/processors/metrics/frame_processor_metrics.py +++ b/src/pipecat/processors/metrics/frame_processor_metrics.py @@ -11,6 +11,7 @@ from loguru import logger + class FrameProcessorMetrics: def __init__(self): self._start_ttfb_time = 0 @@ -75,5 +76,6 @@ async def start_tts_usage_metrics(self, text: str): processor=self._processor_name(), model=self._model_name(), value=len(text)) - logger.debug(f"{self._processor_name()} usage characters: {characters.value}") + logger.debug(f"{self._processor_name()} usage characters: { + characters.value}") return MetricsFrame(data=[characters]) From 4ef47b2bb524671ecd8c3cc8e1393c8b49429844 Mon Sep 17 00:00:00 2001 From: cs-metadexlabs Date: Mon, 23 Sep 2024 11:57:25 +0200 Subject: [PATCH 10/11] Format with autopep8 --- src/pipecat/processors/frame_processor.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/pipecat/processors/frame_processor.py b/src/pipecat/processors/frame_processor.py index 7daa93199..69c957c97 100644 --- a/src/pipecat/processors/frame_processor.py +++ b/src/pipecat/processors/frame_processor.py @@ -31,6 +31,7 @@ class FrameDirection(Enum): DOWNSTREAM = 1 UPSTREAM = 2 + class FrameProcessor: def __init__( @@ -193,14 +194,16 @@ async def __internal_push_frame(self, frame: Frame, direction: FrameDirection): logger.trace(f"Pushing {frame} from {self} to {self._next}") await self._next.process_frame(frame, direction) elif direction == FrameDirection.UPSTREAM and self._prev: - logger.trace(f"Pushing {frame} upstream from {self} to {self._prev}") + logger.trace(f"Pushing {frame} upstream from { + self} to {self._prev}") await self._prev.process_frame(frame, direction) except Exception as e: logger.exception(f"Uncaught exception in {self}: {e}") def __create_push_task(self): self.__push_queue = asyncio.Queue() - self.__push_frame_task = self.get_event_loop().create_task(self.__push_frame_task_handler()) + self.__push_frame_task = self.get_event_loop( + ).create_task(self.__push_frame_task_handler()) async def __push_frame_task_handler(self): running = True From 6d6d2cdc951f6a1ab7a1420f3453bbe7d8d8b77e Mon Sep 17 00:00:00 2001 From: cs-metadexlabs Date: Mon, 23 Sep 2024 11:57:57 +0200 Subject: [PATCH 11/11] Format with autopep8 --- src/pipecat/services/deepgram.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/src/pipecat/services/deepgram.py b/src/pipecat/services/deepgram.py index 3466f485a..25f7b7a56 100644 --- a/src/pipecat/services/deepgram.py +++ b/src/pipecat/services/deepgram.py @@ -75,7 +75,8 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: logger.debug(f"Generating TTS: [{text}]") base_url = self._base_url - request_url = f"{base_url}?model={self._voice}&encoding={self._encoding}&container=none&sample_rate={self._sample_rate}" + request_url = f"{base_url}?model={self._voice}&encoding={ + self._encoding}&container=none&sample_rate={self._sample_rate}" headers = {"authorization": f"token {self._api_key}"} body = {"text": text} @@ -133,15 +134,18 @@ def __init__(self, self._client = DeepgramClient( api_key, config=DeepgramClientOptions(url=url, options={"keepalive": "true"})) - self._connection: AsyncListenWebSocketClient = self._client.listen.asyncwebsocket.v("1") - self._connection.on(LiveTranscriptionEvents.Transcript, self._on_message) + self._connection: AsyncListenWebSocketClient = self._client.listen.asyncwebsocket.v( + "1") + self._connection.on( + LiveTranscriptionEvents.Transcript, self._on_message) if self.vad_enabled: - self._connection.on(LiveTranscriptionEvents.SpeechStarted, self._on_speech_started) + self._connection.on( + LiveTranscriptionEvents.SpeechStarted, self._on_speech_started) @property def vad_enabled(self): return self._live_options.vad_events - + def can_generate_metrics(self) -> bool: return self.vad_enabled @@ -184,7 +188,7 @@ async def _disconnect(self): if self._connection.is_connected: await self._connection.finish() logger.debug(f"{self}: Disconnected from Deepgram") - + async def _on_speech_started(self, *args, **kwargs): await self.start_ttfb_metrics() await self.start_processing_metrics()