Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Sentry instrumentation for performance and error tracking #470

Merged
merged 13 commits into from
Sep 23, 2024
Merged
79 changes: 5 additions & 74 deletions src/pipecat/processors/frame_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,14 @@
EndFrame,
ErrorFrame,
Frame,
MetricsFrame,
StartFrame,
StartInterruptionFrame,
StopInterruptionFrame,
SystemFrame)
from pipecat.metrics.metrics import (
LLMTokenUsage,
LLMUsageMetricsData,
MetricsData,
ProcessingMetricsData,
TTFBMetricsData,
TTSUsageMetricsData)
MetricsData)
from pipecat.processors.metrics.frame_processor_metrics import FrameProcessorMetrics
from pipecat.utils.utils import obj_count, obj_id

from loguru import logger
Expand All @@ -35,79 +31,13 @@ class FrameDirection(Enum):
DOWNSTREAM = 1
UPSTREAM = 2


class FrameProcessorMetrics:
    """Collects per-processor performance metrics.

    Tracks time-to-first-byte (TTFB), overall processing time, LLM token
    usage and TTS character usage.  Each ``stop_*``/usage method builds a
    ``MetricsFrame`` that the caller is expected to push downstream.
    """

    def __init__(self, name: str):
        # Identity (processor/model names) used to label every metric emitted.
        self._core_metrics_data = MetricsData(processor=name)
        # 0 acts as the "no measurement in progress" sentinel for both timers.
        self._start_ttfb_time = 0
        self._start_processing_time = 0
        # Flips to False after the first report when the caller asks for
        # only-initial TTFB reporting (see start_ttfb_metrics).
        self._should_report_ttfb = True

    def _processor_name(self):
        # Name of the processor these metrics belong to.
        return self._core_metrics_data.processor

    def _model_name(self):
        # Model identifier, if the owning processor set one (may be None).
        return self._core_metrics_data.model

    def set_core_metrics_data(self, data: MetricsData):
        """Replace the identity data used to label emitted metrics."""
        self._core_metrics_data = data

    async def start_ttfb_metrics(self, report_only_initial_ttfb):
        """Start the TTFB timer (no-op after first report if
        *report_only_initial_ttfb* is true)."""
        if self._should_report_ttfb:
            self._start_ttfb_time = time.time()
            self._should_report_ttfb = not report_only_initial_ttfb

    async def stop_ttfb_metrics(self):
        """Stop the TTFB timer; return a MetricsFrame, or None if the timer
        was never started."""
        if self._start_ttfb_time == 0:
            return None

        value = time.time() - self._start_ttfb_time
        logger.debug(f"{self._processor_name()} TTFB: {value}")
        ttfb = TTFBMetricsData(
            processor=self._processor_name(),
            value=value,
            model=self._model_name())
        # Reset the sentinel so a stray second stop returns None.
        self._start_ttfb_time = 0
        return MetricsFrame(data=[ttfb])

    async def start_processing_metrics(self):
        """Start the overall processing timer."""
        self._start_processing_time = time.time()

    async def stop_processing_metrics(self):
        """Stop the processing timer; return a MetricsFrame, or None if the
        timer was never started."""
        if self._start_processing_time == 0:
            return None

        value = time.time() - self._start_processing_time
        logger.debug(f"{self._processor_name()} processing time: {value}")
        processing = ProcessingMetricsData(
            processor=self._processor_name(), value=value, model=self._model_name())
        # Reset the sentinel so a stray second stop returns None.
        self._start_processing_time = 0
        return MetricsFrame(data=[processing])

    async def start_llm_usage_metrics(self, tokens: LLMTokenUsage):
        """Wrap LLM token usage counts in a MetricsFrame."""
        logger.debug(
            f"{self._processor_name()} prompt tokens: {tokens.prompt_tokens}, completion tokens: {tokens.completion_tokens}")
        value = LLMUsageMetricsData(
            processor=self._processor_name(),
            model=self._model_name(),
            value=tokens)
        return MetricsFrame(data=[value])

    async def start_tts_usage_metrics(self, text: str):
        """Wrap the TTS character count for *text* in a MetricsFrame."""
        characters = TTSUsageMetricsData(
            processor=self._processor_name(),
            model=self._model_name(),
            value=len(text))
        logger.debug(f"{self._processor_name()} usage characters: {characters.value}")
        return MetricsFrame(data=[characters])


class FrameProcessor:

def __init__(
self,
*,
name: str | None = None,
metrics: FrameProcessorMetrics | None = None,
sync: bool = True,
loop: asyncio.AbstractEventLoop | None = None,
**kwargs):
Expand All @@ -129,7 +59,8 @@ def __init__(
self._report_only_initial_ttfb = False

# Metrics
self._metrics = FrameProcessorMetrics(name=self.name)
self._metrics = metrics or FrameProcessorMetrics()
self._metrics.set_processor_name(self.name)

# Every processor in Pipecat should only output frames from a single
task. This avoids problems like audio overlapping. System frames are
Expand Down
79 changes: 79 additions & 0 deletions src/pipecat/processors/metrics/frame_processor_metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import time

from pipecat.frames.frames import MetricsFrame
from pipecat.metrics.metrics import (
LLMTokenUsage,
LLMUsageMetricsData,
MetricsData,
ProcessingMetricsData,
TTFBMetricsData,
TTSUsageMetricsData)

from loguru import logger

class FrameProcessorMetrics:
    """Collects per-processor performance metrics.

    Tracks time-to-first-byte (TTFB), overall processing time, LLM token
    usage and TTS character usage.  Each ``stop_*``/usage method builds a
    ``MetricsFrame`` that the caller is expected to push downstream.
    """

    def __init__(self):
        # Fix: initialize the core metrics data so _processor_name() and
        # _model_name() are safe to call even before set_processor_name()
        # (previously the attribute did not exist until a setter ran,
        # raising AttributeError).
        self._core_metrics_data = MetricsData(processor="")
        # 0 acts as the "no measurement in progress" sentinel for both timers.
        self._start_ttfb_time = 0
        self._start_processing_time = 0
        # Flips to False after the first report when the caller asks for
        # only-initial TTFB reporting (see start_ttfb_metrics).
        self._should_report_ttfb = True

    def _processor_name(self):
        # Name of the processor these metrics belong to.
        return self._core_metrics_data.processor

    def _model_name(self):
        # Model identifier, if the owning processor set one (may be None).
        return self._core_metrics_data.model

    def set_core_metrics_data(self, data: MetricsData):
        """Replace the identity data used to label emitted metrics."""
        self._core_metrics_data = data

    def set_processor_name(self, name: str):
        """Reset the identity data to a fresh MetricsData labeled *name*."""
        self._core_metrics_data = MetricsData(processor=name)

    async def start_ttfb_metrics(self, report_only_initial_ttfb):
        """Start the TTFB timer (no-op after first report if
        *report_only_initial_ttfb* is true)."""
        if self._should_report_ttfb:
            self._start_ttfb_time = time.time()
            self._should_report_ttfb = not report_only_initial_ttfb

    async def stop_ttfb_metrics(self):
        """Stop the TTFB timer; return a MetricsFrame, or None if the timer
        was never started."""
        if self._start_ttfb_time == 0:
            return None

        value = time.time() - self._start_ttfb_time
        logger.debug(f"{self._processor_name()} TTFB: {value}")
        ttfb = TTFBMetricsData(
            processor=self._processor_name(),
            value=value,
            model=self._model_name())
        # Reset the sentinel so a stray second stop returns None.
        self._start_ttfb_time = 0
        return MetricsFrame(data=[ttfb])

    async def start_processing_metrics(self):
        """Start the overall processing timer."""
        self._start_processing_time = time.time()

    async def stop_processing_metrics(self):
        """Stop the processing timer; return a MetricsFrame, or None if the
        timer was never started."""
        if self._start_processing_time == 0:
            return None

        value = time.time() - self._start_processing_time
        logger.debug(f"{self._processor_name()} processing time: {value}")
        processing = ProcessingMetricsData(
            processor=self._processor_name(), value=value, model=self._model_name())
        # Reset the sentinel so a stray second stop returns None.
        self._start_processing_time = 0
        return MetricsFrame(data=[processing])

    async def start_llm_usage_metrics(self, tokens: LLMTokenUsage):
        """Wrap LLM token usage counts in a MetricsFrame."""
        logger.debug(
            f"{self._processor_name()} prompt tokens: {tokens.prompt_tokens}, completion tokens: {tokens.completion_tokens}")
        value = LLMUsageMetricsData(
            processor=self._processor_name(),
            model=self._model_name(),
            value=tokens)
        return MetricsFrame(data=[value])

    async def start_tts_usage_metrics(self, text: str):
        """Wrap the TTS character count for *text* in a MetricsFrame."""
        characters = TTSUsageMetricsData(
            processor=self._processor_name(),
            model=self._model_name(),
            value=len(text))
        logger.debug(f"{self._processor_name()} usage characters: {characters.value}")
        return MetricsFrame(data=[characters])
52 changes: 52 additions & 0 deletions src/pipecat/processors/metrics/sentry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import time
from loguru import logger

# Sentry is an optional dependency: the import can fail (package not
# installed), or the SDK can be importable but never configured by the host
# application. In both cases Sentry-backed metrics are silently disabled and
# SentryMetrics degrades to timer bookkeeping only.
try:
    import sentry_sdk

    sentry_available = sentry_sdk.is_initialized()
    if not sentry_available:
        # warning (not debug), per review: a present-but-unconfigured SDK is
        # an operator mistake that should be visible in default logs.
        logger.warning("Sentry SDK not initialized. Sentry features will be disabled.")
except ImportError:
    sentry_available = False
    logger.warning("Sentry SDK not installed. Sentry features will be disabled.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done ✅


from pipecat.processors.metrics.frame_processor_metrics import FrameProcessorMetrics

class SentryMetrics(FrameProcessorMetrics):
    """FrameProcessorMetrics variant that also reports TTFB and processing
    time as Sentry performance spans.

    Spans are only created when the Sentry SDK is installed and initialized
    (see the module-level ``sentry_available`` flag); otherwise the methods
    fall back to plain timer bookkeeping.
    """

    def __init__(self):
        super().__init__()
        # Currently-open spans; None means "no span in flight".
        self._ttfb_metrics_span = None
        self._processing_metrics_span = None

    async def start_ttfb_metrics(self, report_only_initial_ttfb):
        """Start the TTFB timer and open a Sentry span for it."""
        if self._should_report_ttfb:
            self._start_ttfb_time = time.time()
            if sentry_available:
                self._ttfb_metrics_span = sentry_sdk.start_span(
                    op="ttfb",
                    description=f"TTFB for {self._processor_name()}",
                    start_timestamp=self._start_ttfb_time
                )
                logger.debug(
                    f"Sentry Span ID: {self._ttfb_metrics_span.span_id} Description: {self._ttfb_metrics_span.description} started.")
            self._should_report_ttfb = not report_only_initial_ttfb

    async def stop_ttfb_metrics(self):
        """Finish the open TTFB span, if any."""
        stop_time = time.time()
        # Fix: stop may be called without a matching start (e.g. when
        # report_only_initial_ttfb suppressed span creation); finishing a
        # None span raised AttributeError. Also clear the span so a second
        # stop cannot double-finish it.
        if sentry_available and self._ttfb_metrics_span is not None:
            self._ttfb_metrics_span.finish(end_timestamp=stop_time)
            self._ttfb_metrics_span = None

    async def start_processing_metrics(self):
        """Start the processing timer and open a Sentry span for it."""
        self._start_processing_time = time.time()
        if sentry_available:
            self._processing_metrics_span = sentry_sdk.start_span(
                op="processing",
                description=f"Processing for {self._processor_name()}",
                start_timestamp=self._start_processing_time
            )
            logger.debug(
                f"Sentry Span ID: {self._processing_metrics_span.span_id} Description: {self._processing_metrics_span.description} started.")

    async def stop_processing_metrics(self):
        """Finish the open processing span, if any."""
        stop_time = time.time()
        # Same None-guard rationale as stop_ttfb_metrics.
        if sentry_available and self._processing_metrics_span is not None:
            self._processing_metrics_span.finish(end_timestamp=stop_time)
            self._processing_metrics_span = None
18 changes: 16 additions & 2 deletions src/pipecat/services/deepgram.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ def __init__(self,
smart_format=True,
punctuate=True,
profanity_filter=True,
vad_events=False,
),
**kwargs):
super().__init__(**kwargs)
Expand All @@ -134,6 +135,15 @@ def __init__(self,
api_key, config=DeepgramClientOptions(url=url, options={"keepalive": "true"}))
self._connection: AsyncListenWebSocketClient = self._client.listen.asyncwebsocket.v("1")
self._connection.on(LiveTranscriptionEvents.Transcript, self._on_message)
if self.vad_enabled:
self._connection.on(LiveTranscriptionEvents.SpeechStarted, self._on_speech_started)

    @property
    def vad_enabled(self):
        # True when the connection was configured with vad_events=True,
        # i.e. Deepgram will deliver SpeechStarted VAD events.
        return self._live_options.vad_events

    def can_generate_metrics(self) -> bool:
        """Metrics here are keyed off Deepgram's SpeechStarted VAD event,
        so they can only be generated when VAD is enabled."""
        return self.vad_enabled

async def set_model(self, model: str):
await super().set_model(model)
Expand Down Expand Up @@ -161,9 +171,7 @@ async def cancel(self, frame: CancelFrame):
await self._disconnect()

async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
await self.start_processing_metrics()
await self._connection.send(audio)
await self.stop_processing_metrics()
yield None

async def _connect(self):
Expand All @@ -176,6 +184,10 @@ async def _disconnect(self):
if self._connection.is_connected:
await self._connection.finish()
logger.debug(f"{self}: Disconnected from Deepgram")

    async def _on_speech_started(self, *args, **kwargs):
        # Deepgram fired a SpeechStarted VAD event: begin timing both TTFB
        # and overall processing for the utterance now being spoken.
        await self.start_ttfb_metrics()
        await self.start_processing_metrics()

async def _on_message(self, *args, **kwargs):
result: LiveResultResponse = kwargs["result"]
Expand All @@ -188,7 +200,9 @@ async def _on_message(self, *args, **kwargs):
language = result.channel.alternatives[0].languages[0]
language = Language(language)
if len(transcript) > 0:
await self.stop_ttfb_metrics()
if is_final:
await self.push_frame(TranscriptionFrame(transcript, "", time_now_iso8601(), language))
await self.stop_processing_metrics()
else:
await self.push_frame(InterimTranscriptionFrame(transcript, "", time_now_iso8601(), language))
Loading