services: use start/stop_ttfb_metrics to report TTFB metrics
aconchillo committed Jun 6, 2024
1 parent e765a29 commit 390582d
Showing 15 changed files with 85 additions and 62 deletions.
11 changes: 9 additions & 2 deletions CHANGELOG.md
@@ -10,11 +10,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

### Added

-- Added TTFB debug logging for TTS services
+- Added `enable_metrics` to `PipelineParams`.

+- Added `MetricsFrame`. The `MetricsFrame` will report different metrics in the
+  system. Right now, it can report TTFB (Time To First Byte) values for
+  different services, that is, the time spent from the arrival of a `Frame` at
+  the processor/service until the first `DataFrame` is pushed downstream.

+- Added TTFB metrics and debug logging for TTS services.

### Fixed

-- Fixed PlayHT TTS service to work properly async
+- Fixed PlayHT TTS service to work properly async.

## [0.0.28] - 2024-06-05

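The changelog entries above fit together as follows — a minimal sketch, assuming `PipelineTask` takes the params as its second argument as in this commit's `task.py` (the `pipeline` variable is a placeholder for any existing pipeline, not part of this commit):

```python
from pipecat.pipeline.task import PipelineParams, PipelineTask

# Metrics are disabled by default. Passing enable_metrics=True makes the
# StartFrame that PipelineTask pushes carry the flag to every processor.
task = PipelineTask(pipeline, PipelineParams(enable_metrics=True))
```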
7 changes: 7 additions & 0 deletions src/pipecat/frames/frames.py
@@ -239,6 +239,13 @@ class StopInterruptionFrame(SystemFrame):
    pass


+@dataclass
+class MetricsFrame(SystemFrame):
+    """Emitted by processors that can compute metrics, such as latencies."""
+    ttfb: Mapping[str, float]


#
# Control frames
#
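Since a `MetricsFrame` travels downstream like any other frame, any processor in the pipeline can observe it. A minimal consumer sketch — `MetricsLogger` is hypothetical, not part of this commit:

```python
from pipecat.frames.frames import Frame, MetricsFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor


class MetricsLogger(FrameProcessor):
    """Hypothetical sink that prints every TTFB report it sees."""

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)  # base class bookkeeping
        if isinstance(frame, MetricsFrame):
            for name, ttfb in frame.ttfb.items():
                print(f"{name} TTFB: {ttfb:.3f}s")
        await self.push_frame(frame, direction)
```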
9 changes: 7 additions & 2 deletions src/pipecat/pipeline/task.py
@@ -19,6 +19,7 @@

class PipelineParams(BaseModel):
    allow_interruptions: bool = False
+    enable_metrics: bool = False


class Source(FrameProcessor):
@@ -89,8 +90,12 @@ async def queue_frames(self, frames: Iterable[Frame] | AsyncIterable[Frame]):
            raise Exception("Frames must be an iterable or async iterable")

    async def _process_down_queue(self):
-        await self._source.process_frame(
-            StartFrame(allow_interruptions=self._params.allow_interruptions), FrameDirection.DOWNSTREAM)
+        start_frame = StartFrame(
+            allow_interruptions=self._params.allow_interruptions,
+            enable_metrics=self._params.enable_metrics,
+        )
+        await self._source.process_frame(start_frame, FrameDirection.DOWNSTREAM)

        running = True
        should_cleanup = True
        while running:
21 changes: 18 additions & 3 deletions src/pipecat/processors/frame_processor.py
@@ -5,10 +5,11 @@
#

import asyncio
+import time

from enum import Enum

-from pipecat.frames.frames import ErrorFrame, Frame, StartFrame
+from pipecat.frames.frames import ErrorFrame, Frame, MetricsFrame, StartFrame
from pipecat.utils.utils import obj_count, obj_id

from loguru import logger
@@ -32,14 +33,28 @@ def __init__(self, loop: asyncio.AbstractEventLoop | None = None):
        self._allow_interruptions = False
        self._enable_metrics = False

+        # Metrics
+        self._start_ttfb_time = 0

    @property
-    def allow_interruptions(self):
+    def interruptions_allowed(self):
        return self._allow_interruptions

    @property
-    def enable_metrics(self):
+    def metrics_enabled(self):
        return self._enable_metrics

+    async def start_ttfb_metrics(self):
+        if self.metrics_enabled:
+            self._start_ttfb_time = time.time()
+
+    async def stop_ttfb_metrics(self):
+        if self.metrics_enabled and self._start_ttfb_time > 0:
+            ttfb = time.time() - self._start_ttfb_time
+            logger.debug(f"{self.name} TTFB: {ttfb}")
+            await self.push_frame(MetricsFrame(ttfb={self.name: ttfb}))
+            self._start_ttfb_time = 0

    async def cleanup(self):
        pass

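Every service change below applies the same bracketing pattern: `start_ttfb_metrics()` right before the remote request, `stop_ttfb_metrics()` when the first result arrives. Sketched standalone — `FakeStreamingService` is hypothetical, with `asyncio.sleep` standing in for the remote call:

```python
import asyncio

from pipecat.frames.frames import Frame, TextFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor


class FakeStreamingService(FrameProcessor):
    """Hypothetical service showing where the TTFB timer starts and stops."""

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)
        await self.start_ttfb_metrics()  # arms the timer; no-op if metrics are off
        await asyncio.sleep(0.2)         # stands in for the remote API request
        await self.stop_ttfb_metrics()   # logs TTFB and pushes a MetricsFrame
        await self.push_frame(TextFrame("first chunk"), direction)
```

Because `stop_ttfb_metrics()` resets the start time and ignores calls while the timer is unarmed, the streaming services below can safely call it on every chunk; only the first chunk after each start produces a report.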
8 changes: 5 additions & 3 deletions src/pipecat/services/anthropic.py
@@ -4,7 +4,6 @@
# SPDX-License-Identifier: BSD 2-Clause License
#

-import time
import base64

from pipecat.frames.frames import (
@@ -102,13 +101,16 @@ async def _process_context(self, context: OpenAILLMContext):

        messages = self._get_messages_from_openai_context(context)

-        start_time = time.time()
+        await self.start_ttfb_metrics()

        response = await self._client.messages.create(
            messages=messages,
            model=self._model,
            max_tokens=self._max_tokens,
            stream=True)
-        logger.debug(f"Anthropic LLM TTFB: {time.time() - start_time}")
+
+        await self.stop_ttfb_metrics()

        async for event in response:
            # logger.debug(f"Anthropic LLM event: {event}")
            if (event.type == "content_block_delta"):
10 changes: 3 additions & 7 deletions src/pipecat/services/azure.py
@@ -7,12 +7,10 @@
import aiohttp
import asyncio
import io
-import time

from PIL import Image
from typing import AsyncGenerator

-from numpy import str_
from openai import AsyncAzureOpenAI

from pipecat.frames.frames import AudioRawFrame, ErrorFrame, Frame, URLImageRawFrame
@@ -47,10 +45,10 @@ def __init__(self, *, api_key: str, region: str, voice="en-US-SaraNeural", **kwa
        self._voice = voice

    async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
-        start_time = time.time()
-        ttfb = None
        logger.debug(f"Generating TTS: {text}")

+        await self.start_ttfb_metrics()

        ssml = (
            "<speak version='1.0' xml:lang='en-US' xmlns='http://www.w3.org/2001/10/synthesis' "
            "xmlns:mstts='http://www.w3.org/2001/mstts'>"
@@ -64,9 +62,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
        result = await asyncio.to_thread(self.speech_synthesizer.speak_ssml, (ssml))

        if result.reason == ResultReason.SynthesizingAudioCompleted:
-            if ttfb is None:
-                ttfb = time.time() - start_time
-                logger.debug(f"TTS ttfb: {ttfb}")
+            await self.stop_ttfb_metrics()
            # Azure always sends a 44-byte header. Strip it off.
            yield AudioRawFrame(audio=result.audio_data[44:], sample_rate=16000, num_channels=1)
        elif result.reason == ResultReason.Canceled:
9 changes: 3 additions & 6 deletions src/pipecat/services/cartesia.py
@@ -3,7 +3,6 @@
#
# SPDX-License-Identifier: BSD 2-Clause License
#
-import time

from cartesia.tts import AsyncCartesiaTTS

@@ -41,11 +40,11 @@ def __init__(
logger.error(f"Cartesia initialization error: {e}")

async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
start_time = time.time()
ttfb = None
logger.debug(f"Generating TTS: [{text}]")

try:
await self.start_ttfb_metrics()

chunk_generator = await self._client.generate(
stream=True,
transcript=text,
@@ -55,9 +54,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
            )

            async for chunk in chunk_generator:
-                if ttfb is None:
-                    ttfb = time.time() - start_time
-                    logger.debug(f"TTS ttfb: {ttfb}")
+                await self.stop_ttfb_metrics()
                yield AudioRawFrame(chunk["audio"], chunk["sampling_rate"], 1)
        except Exception as e:
            logger.error(f"Cartesia exception: {e}")
8 changes: 2 additions & 6 deletions src/pipecat/services/deepgram.py
@@ -5,7 +5,6 @@
#

import aiohttp
-import time

from typing import AsyncGenerator

@@ -31,8 +30,6 @@ def __init__(
        self._aiohttp_session = aiohttp_session

    async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
-        start_time = time.time()
-        ttfb = None
        logger.debug(f"Generating TTS: [{text}]")

        base_url = "https://api.deepgram.com/v1/speak"
@@ -41,6 +38,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
        body = {"text": text}

        try:
+            await self.start_ttfb_metrics()
            async with self._aiohttp_session.post(request_url, headers=headers, json=body) as r:
                if r.status != 200:
                    text = await r.text()
@@ -49,9 +47,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
                    return

                async for data in r.content:
-                    if ttfb is None:
-                        ttfb = time.time() - start_time
-                        logger.debug(f"TTS ttfb: {ttfb}")
+                    await self.stop_ttfb_metrics()
                    frame = AudioRawFrame(audio=data, sample_rate=16000, num_channels=1)
                    yield frame
        except Exception as e:
9 changes: 3 additions & 6 deletions src/pipecat/services/elevenlabs.py
@@ -5,7 +5,6 @@
#

import aiohttp
-import time

from typing import AsyncGenerator

@@ -33,8 +32,6 @@ def __init__(
        self._model = model

    async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
-        start_time = time.time()
-        ttfb = None
        logger.debug(f"Generating TTS: [{text}]")

        url = f"https://api.elevenlabs.io/v1/text-to-speech/{self._voice_id}/stream"
@@ -50,6 +47,8 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
"Content-Type": "application/json",
}

await self.start_ttfb_metrics()

async with self._aiohttp_session.post(url, json=payload, headers=headers, params=querystring) as r:
if r.status != 200:
text = await r.text()
@@ -59,8 +58,6 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:

            async for chunk in r.content:
                if len(chunk) > 0:
-                    if ttfb is None:
-                        ttfb = time.time() - start_time
-                        logger.debug(f"TTS ttfb: {ttfb}")
+                    await self.stop_ttfb_metrics()
                    frame = AudioRawFrame(chunk, 16000, 1)
                    yield frame
14 changes: 9 additions & 5 deletions src/pipecat/services/google.py
@@ -1,8 +1,10 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

import json
import os
import asyncio
-import time

from typing import List

@@ -81,9 +83,11 @@ async def _process_context(self, context: OpenAILLMContext):

        messages = self._get_messages_from_openai_context(context)

-        start_time = time.time()
+        await self.start_ttfb_metrics()

        response = self._client.generate_content(messages, stream=True)
-        logger.debug(f"Google LLM TTFB: {time.time() - start_time}")
+
+        await self.stop_ttfb_metrics()

        async for chunk in self._async_generator_wrapper(response):
            try:
20 changes: 10 additions & 10 deletions src/pipecat/services/openai.py
@@ -3,13 +3,14 @@
#
# SPDX-License-Identifier: BSD 2-Clause License
#

+import aiohttp
import base64
import io
import json
-import time

from typing import AsyncGenerator, List, Literal

-import aiohttp
from loguru import logger
from PIL import Image

@@ -94,7 +95,6 @@ async def _stream_chat_completions(
del message["data"]
del message["mime_type"]

start_time = time.time()
chunks: AsyncStream[ChatCompletionChunk] = (
await self._client.chat.completions.create(
model=self._model,
@@ -105,8 +105,6 @@ async def _stream_chat_completions(
            )
        )

-        logger.debug(f"OpenAI LLM TTFB: {time.time() - start_time}")
-
        return chunks

    async def _chat_completions(self, messages) -> str | None:
@@ -123,6 +121,8 @@ async def _process_context(self, context: OpenAILLMContext):
        arguments = ""
        tool_call_id = ""

+        await self.start_ttfb_metrics()
+
        chunk_stream: AsyncStream[ChatCompletionChunk] = (
            await self._stream_chat_completions(context)
        )
@@ -131,6 +131,8 @@ async def _process_context(self, context: OpenAILLMContext):
            if len(chunk.choices) == 0:
                continue

+            await self.stop_ttfb_metrics()
+
            if chunk.choices[0].delta.tool_calls:
                # We're streaming the LLM response to enable the fastest response times.
                # For text, we just yield each chunk as we receive it and count on consumers
@@ -306,11 +308,11 @@ def __init__(
        self._client = AsyncOpenAI(api_key=api_key)

    async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
-        start_time = time.time()
-        ttfb = None
        logger.debug(f"Generating TTS: [{text}]")

        try:
+            await self.start_ttfb_metrics()
+
            async with self._client.audio.speech.with_streaming_response.create(
                input=text,
                model=self._model,
@@ -324,9 +326,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
                    return
                async for chunk in r.iter_bytes(8192):
                    if len(chunk) > 0:
-                        if ttfb is None:
-                            ttfb = time.time() - start_time
-                            logger.debug(f"TTS ttfb: {ttfb}")
+                        await self.stop_ttfb_metrics()
                        frame = AudioRawFrame(chunk, 24_000, 1)
                        yield frame
        except BadRequestError as e: