Implement Sentry instrumentation for performance and error tracking (#470)

* feat: Add Sentry support in FrameProcessor

This update adds optional Sentry integration for performance tracking and error monitoring.

Key changes include:

- Add conditional Sentry import and initialization check
- Implement Sentry spans in FrameProcessorMetrics to measure TTFB (Time To First Byte) and processing time when Sentry is available
- Maintain existing metrics functionality with MetricsFrame regardless of Sentry availability
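
A condensed sketch of the conditional import and initialization check described above (it mirrors the new `processors/metrics/sentry.py` added further down in this commit):

```python
from loguru import logger

try:
    import sentry_sdk
    # Spans are only created when the embedding application has already
    # called sentry_sdk.init(); otherwise only MetricsFrame reporting runs.
    sentry_available = sentry_sdk.is_initialized()
    if not sentry_available:
        logger.warning("Sentry SDK not initialized. Sentry features will be disabled.")
except ImportError:
    sentry_available = False
    logger.warning("Sentry SDK not installed. Sentry features will be disabled.")
```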

* feat: Enable metrics in DeepgramSTTService for Sentry

This commit enhances the DeepgramSTTService class to enable metrics generation for use with Sentry.

Key changes include:

1. Enable general metrics generation:
   - Implement `can_generate_metrics` method, returning True when VAD is enabled
   - This allows metrics to be collected and used by both Sentry and the metrics system in frame_processor.py

2. Integrate Sentry-compatible performance tracking:
   - Add start_ttfb_metrics and start_processing_metrics calls in the VAD speech detection handler
   - Implement stop_ttfb_metrics call when receiving transcripts
   - Add stop_processing_metrics for final transcripts

3. Enhance VAD support for metrics:
   - Add `vad_enabled` property to check VAD event availability
   - Implement VAD-based speech detection handler for precise metric timing

These changes enable detailed performance tracking via both Sentry and the general metrics system when VAD is active, providing better monitoring and analysis of the speech-to-text process for Sentry and any other metrics consumers in the pipeline.
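
For reference, a hedged usage sketch (the keyword names follow the diff below; whether the `metrics` keyword is forwarded through the service base classes to `FrameProcessor` is an assumption):

```python
from deepgram import LiveOptions

from pipecat.processors.metrics.sentry import SentryMetrics
from pipecat.services.deepgram import DeepgramSTTService

# vad_events=True makes Deepgram emit SpeechStarted events, which is what
# can_generate_metrics() checks and what starts the TTFB/processing timers.
stt = DeepgramSTTService(
    api_key="YOUR_DEEPGRAM_API_KEY",            # placeholder
    live_options=LiveOptions(vad_events=True),  # assumed keyword name
    metrics=SentryMetrics(),                    # assumes **kwargs reach FrameProcessor
)
```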

* Update frame_processor.py

* Refactor to support flexible metrics implementation

- Modified the __init__ method to accept a metrics parameter that is either FrameProcessorMetrics or one of its subclasses
- Updated the metrics initialization to create an instance with the processor's name
- Moved all FrameProcessorMetrics-related logic to a new processors/metrics/frame_processor_metrics.py file (see the sketch below)
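
Condensed from the frame_processor.py diff below, the constructor now looks roughly like this (unrelated parameters and setup code omitted):

```python
from pipecat.processors.metrics.frame_processor_metrics import FrameProcessorMetrics


class FrameProcessor:
    def __init__(
            self,
            *,
            name: str | None = None,
            metrics: FrameProcessorMetrics | None = None,
            **kwargs):
        ...
        # Use the caller-supplied metrics implementation (e.g. SentryMetrics),
        # falling back to the default FrameProcessorMetrics, then tag it with
        # this processor's name for logging and MetricsFrame reporting.
        self._metrics = metrics or FrameProcessorMetrics()
        self._metrics.set_processor_name(self.name)
```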

* Implement flexible metrics system with Sentry integration

1. Created a new metrics module in processors/metrics/

2. Implemented the FrameProcessorMetrics base class in frame_processor_metrics.py

3. Implemented SentryMetrics class in sentry.py:
   - Inherits from FrameProcessorMetrics
   - Integrates with Sentry SDK for advanced metrics tracking
   - Implements Sentry-specific span creation and management for TTFB and processing metrics
   - Handles cases where Sentry is not available or initialized
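
Because the module evaluates `sentry_sdk.is_initialized()` at import time, the host application is expected to initialize Sentry first, roughly like this (the DSN and sample rate are placeholders):

```python
import sentry_sdk

# Initialize Sentry before importing SentryMetrics: sentry.py checks
# sentry_sdk.is_initialized() when it is first imported and silently
# skips span creation if the SDK is not set up.
sentry_sdk.init(
    dsn="https://examplePublicKey@o0.ingest.sentry.io/0",  # placeholder DSN
    traces_sample_rate=1.0,  # capture all performance traces; tune for production
)

from pipecat.processors.metrics.sentry import SentryMetrics  # noqa: E402

metrics = SentryMetrics()
```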
cyrilS-dev authored and kwindla committed Sep 25, 2024
1 parent e4f5e56 commit 33cf91b
Showing 4 changed files with 169 additions and 80 deletions.
84 changes: 9 additions & 75 deletions src/pipecat/processors/frame_processor.py
@@ -14,18 +14,14 @@
EndFrame,
ErrorFrame,
Frame,
MetricsFrame,
StartFrame,
StartInterruptionFrame,
StopInterruptionFrame,
SystemFrame)
from pipecat.metrics.metrics import (
LLMTokenUsage,
LLMUsageMetricsData,
MetricsData,
ProcessingMetricsData,
TTFBMetricsData,
TTSUsageMetricsData)
MetricsData)
from pipecat.processors.metrics.frame_processor_metrics import FrameProcessorMetrics
from pipecat.utils.utils import obj_count, obj_id

from loguru import logger
@@ -36,78 +32,13 @@ class FrameDirection(Enum):
UPSTREAM = 2


class FrameProcessorMetrics:
def __init__(self, name: str):
self._core_metrics_data = MetricsData(processor=name)
self._start_ttfb_time = 0
self._start_processing_time = 0
self._should_report_ttfb = True

def _processor_name(self):
return self._core_metrics_data.processor

def _model_name(self):
return self._core_metrics_data.model

def set_core_metrics_data(self, data: MetricsData):
self._core_metrics_data = data

async def start_ttfb_metrics(self, report_only_initial_ttfb):
if self._should_report_ttfb:
self._start_ttfb_time = time.time()
self._should_report_ttfb = not report_only_initial_ttfb

async def stop_ttfb_metrics(self):
if self._start_ttfb_time == 0:
return None

value = time.time() - self._start_ttfb_time
logger.debug(f"{self._processor_name()} TTFB: {value}")
ttfb = TTFBMetricsData(
processor=self._processor_name(),
value=value,
model=self._model_name())
self._start_ttfb_time = 0
return MetricsFrame(data=[ttfb])

async def start_processing_metrics(self):
self._start_processing_time = time.time()

async def stop_processing_metrics(self):
if self._start_processing_time == 0:
return None

value = time.time() - self._start_processing_time
logger.debug(f"{self._processor_name()} processing time: {value}")
processing = ProcessingMetricsData(
processor=self._processor_name(), value=value, model=self._model_name())
self._start_processing_time = 0
return MetricsFrame(data=[processing])

async def start_llm_usage_metrics(self, tokens: LLMTokenUsage):
logger.debug(
f"{self._processor_name()} prompt tokens: {tokens.prompt_tokens}, completion tokens: {tokens.completion_tokens}")
value = LLMUsageMetricsData(
processor=self._processor_name(),
model=self._model_name(),
value=tokens)
return MetricsFrame(data=[value])

async def start_tts_usage_metrics(self, text: str):
characters = TTSUsageMetricsData(
processor=self._processor_name(),
model=self._model_name(),
value=len(text))
logger.debug(f"{self._processor_name()} usage characters: {characters.value}")
return MetricsFrame(data=[characters])


class FrameProcessor:

def __init__(
self,
*,
name: str | None = None,
metrics: FrameProcessorMetrics | None = None,
sync: bool = True,
loop: asyncio.AbstractEventLoop | None = None,
**kwargs):
Expand All @@ -129,7 +60,8 @@ def __init__(
self._report_only_initial_ttfb = False

# Metrics
self._metrics = FrameProcessorMetrics(name=self.name)
self._metrics = metrics or FrameProcessorMetrics()
self._metrics.set_processor_name(self.name)

# Every processor in Pipecat should only output frames from a single
# task. This avoid problems like audio overlapping. System frames are
@@ -262,14 +194,16 @@ async def __internal_push_frame(self, frame: Frame, direction: FrameDirection):
logger.trace(f"Pushing {frame} from {self} to {self._next}")
await self._next.process_frame(frame, direction)
elif direction == FrameDirection.UPSTREAM and self._prev:
logger.trace(f"Pushing {frame} upstream from {self} to {self._prev}")
logger.trace(f"Pushing {frame} upstream from {
self} to {self._prev}")
await self._prev.process_frame(frame, direction)
except Exception as e:
logger.exception(f"Uncaught exception in {self}: {e}")

def __create_push_task(self):
self.__push_queue = asyncio.Queue()
self.__push_frame_task = self.get_event_loop().create_task(self.__push_frame_task_handler())
self.__push_frame_task = self.get_event_loop(
).create_task(self.__push_frame_task_handler())

async def __push_frame_task_handler(self):
running = True
81 changes: 81 additions & 0 deletions src/pipecat/processors/metrics/frame_processor_metrics.py
@@ -0,0 +1,81 @@
import time

from pipecat.frames.frames import MetricsFrame
from pipecat.metrics.metrics import (
LLMTokenUsage,
LLMUsageMetricsData,
MetricsData,
ProcessingMetricsData,
TTFBMetricsData,
TTSUsageMetricsData)

from loguru import logger


class FrameProcessorMetrics:
def __init__(self):
self._start_ttfb_time = 0
self._start_processing_time = 0
self._should_report_ttfb = True

def _processor_name(self):
return self._core_metrics_data.processor

def _model_name(self):
return self._core_metrics_data.model

def set_core_metrics_data(self, data: MetricsData):
self._core_metrics_data = data

def set_processor_name(self, name: str):
self._core_metrics_data = MetricsData(processor=name)

async def start_ttfb_metrics(self, report_only_initial_ttfb):
if self._should_report_ttfb:
self._start_ttfb_time = time.time()
self._should_report_ttfb = not report_only_initial_ttfb

async def stop_ttfb_metrics(self):
if self._start_ttfb_time == 0:
return None

value = time.time() - self._start_ttfb_time
logger.debug(f"{self._processor_name()} TTFB: {value}")
ttfb = TTFBMetricsData(
processor=self._processor_name(),
value=value,
model=self._model_name())
self._start_ttfb_time = 0
return MetricsFrame(data=[ttfb])

async def start_processing_metrics(self):
self._start_processing_time = time.time()

async def stop_processing_metrics(self):
if self._start_processing_time == 0:
return None

value = time.time() - self._start_processing_time
logger.debug(f"{self._processor_name()} processing time: {value}")
processing = ProcessingMetricsData(
processor=self._processor_name(), value=value, model=self._model_name())
self._start_processing_time = 0
return MetricsFrame(data=[processing])

async def start_llm_usage_metrics(self, tokens: LLMTokenUsage):
logger.debug(
f"{self._processor_name()} prompt tokens: {tokens.prompt_tokens}, completion tokens: {tokens.completion_tokens}")
value = LLMUsageMetricsData(
processor=self._processor_name(),
model=self._model_name(),
value=tokens)
return MetricsFrame(data=[value])

async def start_tts_usage_metrics(self, text: str):
characters = TTSUsageMetricsData(
processor=self._processor_name(),
model=self._model_name(),
value=len(text))
logger.debug(f"{self._processor_name()} usage characters: {
characters.value}")
return MetricsFrame(data=[characters])
56 changes: 56 additions & 0 deletions src/pipecat/processors/metrics/sentry.py
@@ -0,0 +1,56 @@
import time
from loguru import logger

try:
import sentry_sdk
sentry_available = sentry_sdk.is_initialized()
if not sentry_available:
logger.warning(
"Sentry SDK not initialized. Sentry features will be disabled.")
except ImportError:
sentry_available = False
logger.warning(
"Sentry SDK not installed. Sentry features will be disabled.")

from pipecat.processors.metrics.frame_processor_metrics import FrameProcessorMetrics


class SentryMetrics(FrameProcessorMetrics):
def __init__(self):
super().__init__()
self._ttfb_metrics_span = None
self._processing_metrics_span = None

async def start_ttfb_metrics(self, report_only_initial_ttfb):
if self._should_report_ttfb:
self._start_ttfb_time = time.time()
if sentry_available:
self._ttfb_metrics_span = sentry_sdk.start_span(
op="ttfb",
description=f"TTFB for {self._processor_name()}",
start_timestamp=self._start_ttfb_time
)
logger.debug(f"Sentry Span ID: {self._ttfb_metrics_span.span_id} Description: {
self._ttfb_metrics_span.description} started.")
self._should_report_ttfb = not report_only_initial_ttfb

async def stop_ttfb_metrics(self):
stop_time = time.time()
if sentry_available:
self._ttfb_metrics_span.finish(end_timestamp=stop_time)

async def start_processing_metrics(self):
self._start_processing_time = time.time()
if sentry_available:
self._processing_metrics_span = sentry_sdk.start_span(
op="processing",
description=f"Processing for {self._processor_name()}",
start_timestamp=self._start_processing_time
)
logger.debug(f"Sentry Span ID: {self._processing_metrics_span.span_id} Description: {
self._processing_metrics_span.description} started.")

async def stop_processing_metrics(self):
stop_time = time.time()
if sentry_available:
self._processing_metrics_span.finish(end_timestamp=stop_time)
28 changes: 23 additions & 5 deletions src/pipecat/services/deepgram.py
@@ -75,7 +75,8 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
logger.debug(f"Generating TTS: [{text}]")

base_url = self._base_url
request_url = f"{base_url}?model={self._voice}&encoding={self._encoding}&container=none&sample_rate={self._sample_rate}"
request_url = f"{base_url}?model={self._voice}&encoding={
self._encoding}&container=none&sample_rate={self._sample_rate}"
headers = {"authorization": f"token {self._api_key}"}
body = {"text": text}

@@ -124,6 +125,7 @@ def __init__(self,
smart_format=True,
punctuate=True,
profanity_filter=True,
vad_events=False,
),
**kwargs):
super().__init__(**kwargs)
@@ -132,8 +134,20 @@

self._client = DeepgramClient(
api_key, config=DeepgramClientOptions(url=url, options={"keepalive": "true"}))
self._connection: AsyncListenWebSocketClient = self._client.listen.asyncwebsocket.v("1")
self._connection.on(LiveTranscriptionEvents.Transcript, self._on_message)
self._connection: AsyncListenWebSocketClient = self._client.listen.asyncwebsocket.v(
"1")
self._connection.on(
LiveTranscriptionEvents.Transcript, self._on_message)
if self.vad_enabled:
self._connection.on(
LiveTranscriptionEvents.SpeechStarted, self._on_speech_started)

@property
def vad_enabled(self):
return self._live_options.vad_events

def can_generate_metrics(self) -> bool:
return self.vad_enabled

async def set_model(self, model: str):
await super().set_model(model)
@@ -161,9 +175,7 @@ async def cancel(self, frame: CancelFrame):
await self._disconnect()

async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
await self.start_processing_metrics()
await self._connection.send(audio)
await self.stop_processing_metrics()
yield None

async def _connect(self):
@@ -177,6 +189,10 @@ async def _disconnect(self):
await self._connection.finish()
logger.debug(f"{self}: Disconnected from Deepgram")

async def _on_speech_started(self, *args, **kwargs):
await self.start_ttfb_metrics()
await self.start_processing_metrics()

async def _on_message(self, *args, **kwargs):
result: LiveResultResponse = kwargs["result"]
if len(result.channel.alternatives) == 0:
@@ -188,7 +204,9 @@ async def _on_message(self, *args, **kwargs):
language = result.channel.alternatives[0].languages[0]
language = Language(language)
if len(transcript) > 0:
await self.stop_ttfb_metrics()
if is_final:
await self.push_frame(TranscriptionFrame(transcript, "", time_now_iso8601(), language))
await self.stop_processing_metrics()
else:
await self.push_frame(InterimTranscriptionFrame(transcript, "", time_now_iso8601(), language))
