diff --git a/CHANGELOG.md b/CHANGELOG.md index a9b78496f..2b7b3d7f4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,11 +10,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added -- Added TTFB debug logging for TTS services +- Added `enable_metrics` to `PipelineParams`. + +- Added `MetricsFrame`. The `MetricsFrame` will report different metrics in the + system. Right now, it can report TTFB (Time To First Byte) values for + different services, that is the time spent between the arrival of a `Frame` to + the processor/service until the first `DataFrame` is pushed downstream. + +- Added TTFB metrics and debug logging for TTS services. ### Fixed -- Fixed PlayHT TTS service to work properly async +- Fixed PlayHT TTS service to work properly async. ## [0.0.28] - 2024-06-05 diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py index 9a5725de1..b535d0de0 100644 --- a/src/pipecat/frames/frames.py +++ b/src/pipecat/frames/frames.py @@ -239,6 +239,13 @@ class StopInterruptionFrame(SystemFrame): pass +@dataclass +class MetricsFrame(SystemFrame): + """Emitted by processor who can compute metrics like latencies. + """ + ttfb: Mapping[str, float] + + # # Control frames # diff --git a/src/pipecat/pipeline/task.py b/src/pipecat/pipeline/task.py index b1f72c096..76d98165b 100644 --- a/src/pipecat/pipeline/task.py +++ b/src/pipecat/pipeline/task.py @@ -19,6 +19,7 @@ class PipelineParams(BaseModel): allow_interruptions: bool = False + enable_metrics: bool = False class Source(FrameProcessor): @@ -89,8 +90,12 @@ async def queue_frames(self, frames: Iterable[Frame] | AsyncIterable[Frame]): raise Exception("Frames must be an iterable or async iterable") async def _process_down_queue(self): - await self._source.process_frame( - StartFrame(allow_interruptions=self._params.allow_interruptions), FrameDirection.DOWNSTREAM) + start_frame = StartFrame( + allow_interruptions=self._params.allow_interruptions, + enable_metrics=self._params.enable_metrics, + ) + await self._source.process_frame(start_frame, FrameDirection.DOWNSTREAM) + running = True should_cleanup = True while running: diff --git a/src/pipecat/processors/frame_processor.py b/src/pipecat/processors/frame_processor.py index 0b9c71fef..e0a072477 100644 --- a/src/pipecat/processors/frame_processor.py +++ b/src/pipecat/processors/frame_processor.py @@ -5,10 +5,11 @@ # import asyncio +import time from enum import Enum -from pipecat.frames.frames import ErrorFrame, Frame, StartFrame +from pipecat.frames.frames import ErrorFrame, Frame, MetricsFrame, StartFrame from pipecat.utils.utils import obj_count, obj_id from loguru import logger @@ -32,14 +33,28 @@ def __init__(self, loop: asyncio.AbstractEventLoop | None = None): self._allow_interruptions = False self._enable_metrics = False + # Metrics + self._start_ttfb_time = 0 + @property - def allow_interruptions(self): + def interruptions_allowed(self): return self._allow_interruptions @property - def enable_metrics(self): + def metrics_enabled(self): return self._enable_metrics + async def start_ttfb_metrics(self): + if self.metrics_enabled: + self._start_ttfb_time = time.time() + + async def stop_ttfb_metrics(self): + if self.metrics_enabled and self._start_ttfb_time > 0: + ttfb = time.time() - self._start_ttfb_time + logger.debug(f"{self.name} TTFB: {ttfb}") + await self.push_frame(MetricsFrame(ttfb={self.name: ttfb})) + self._start_ttfb_time = 0 + async def cleanup(self): pass diff --git a/src/pipecat/services/anthropic.py b/src/pipecat/services/anthropic.py index 956941073..95b31f336 100644 --- a/src/pipecat/services/anthropic.py +++ b/src/pipecat/services/anthropic.py @@ -4,7 +4,6 @@ # SPDX-License-Identifier: BSD 2-Clause License # -import time import base64 from pipecat.frames.frames import ( @@ -102,13 +101,16 @@ async def _process_context(self, context: OpenAILLMContext): messages = self._get_messages_from_openai_context(context) - start_time = time.time() + await self.start_ttfb_metric() + response = await self._client.messages.create( messages=messages, model=self._model, max_tokens=self._max_tokens, stream=True) - logger.debug(f"Anthropic LLM TTFB: {time.time() - start_time}") + + await self.stop_ttfb_metric() + async for event in response: # logger.debug(f"Anthropic LLM event: {event}") if (event.type == "content_block_delta"): diff --git a/src/pipecat/services/azure.py b/src/pipecat/services/azure.py index fce3c5939..d35584303 100644 --- a/src/pipecat/services/azure.py +++ b/src/pipecat/services/azure.py @@ -7,12 +7,10 @@ import aiohttp import asyncio import io -import time from PIL import Image from typing import AsyncGenerator -from numpy import str_ from openai import AsyncAzureOpenAI from pipecat.frames.frames import AudioRawFrame, ErrorFrame, Frame, URLImageRawFrame @@ -47,10 +45,10 @@ def __init__(self, *, api_key: str, region: str, voice="en-US-SaraNeural", **kwa self._voice = voice async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: - start_time = time.time() - ttfb = None logger.debug(f"Generating TTS: {text}") + await self.start_ttfb_metrics() + ssml = ( "" @@ -64,9 +62,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: result = await asyncio.to_thread(self.speech_synthesizer.speak_ssml, (ssml)) if result.reason == ResultReason.SynthesizingAudioCompleted: - if ttfb is None: - ttfb = time.time() - start_time - logger.debug(f"TTS ttfb: {ttfb}") + await self.stop_ttfb_metrics() # Azure always sends a 44-byte header. Strip it off. yield AudioRawFrame(audio=result.audio_data[44:], sample_rate=16000, num_channels=1) elif result.reason == ResultReason.Canceled: diff --git a/src/pipecat/services/cartesia.py b/src/pipecat/services/cartesia.py index 9c730a283..474600ffa 100644 --- a/src/pipecat/services/cartesia.py +++ b/src/pipecat/services/cartesia.py @@ -3,7 +3,6 @@ # # SPDX-License-Identifier: BSD 2-Clause License # -import time from cartesia.tts import AsyncCartesiaTTS @@ -41,11 +40,11 @@ def __init__( logger.error(f"Cartesia initialization error: {e}") async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: - start_time = time.time() - ttfb = None logger.debug(f"Generating TTS: [{text}]") try: + await self.start_ttfb_metrics() + chunk_generator = await self._client.generate( stream=True, transcript=text, @@ -55,9 +54,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: ) async for chunk in chunk_generator: - if ttfb is None: - ttfb = time.time() - start_time - logger.debug(f"TTS ttfb: {ttfb}") + await self.stop_ttfb_metrics() yield AudioRawFrame(chunk["audio"], chunk["sampling_rate"], 1) except Exception as e: logger.error(f"Cartesia exception: {e}") diff --git a/src/pipecat/services/deepgram.py b/src/pipecat/services/deepgram.py index 7b19e04e2..aaf0c01ee 100644 --- a/src/pipecat/services/deepgram.py +++ b/src/pipecat/services/deepgram.py @@ -5,7 +5,6 @@ # import aiohttp -import time from typing import AsyncGenerator @@ -31,8 +30,6 @@ def __init__( self._aiohttp_session = aiohttp_session async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: - start_time = time.time() - ttfb = None logger.debug(f"Generating TTS: [{text}]") base_url = "https://api.deepgram.com/v1/speak" @@ -41,6 +38,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: body = {"text": text} try: + await self.start_ttfb_metrics() async with self._aiohttp_session.post(request_url, headers=headers, json=body) as r: if r.status != 200: text = await r.text() @@ -49,9 +47,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: return async for data in r.content: - if ttfb is None: - ttfb = time.time() - start_time - logger.debug(f"TTS ttfb: {ttfb}") + await self.stop_ttfb_metrics() frame = AudioRawFrame(audio=data, sample_rate=16000, num_channels=1) yield frame except Exception as e: diff --git a/src/pipecat/services/elevenlabs.py b/src/pipecat/services/elevenlabs.py index d5b476160..717460fd3 100644 --- a/src/pipecat/services/elevenlabs.py +++ b/src/pipecat/services/elevenlabs.py @@ -5,7 +5,6 @@ # import aiohttp -import time from typing import AsyncGenerator @@ -33,8 +32,6 @@ def __init__( self._model = model async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: - start_time = time.time() - ttfb = None logger.debug(f"Generating TTS: [{text}]") url = f"https://api.elevenlabs.io/v1/text-to-speech/{self._voice_id}/stream" @@ -50,6 +47,8 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: "Content-Type": "application/json", } + await self.start_ttfb_metrics() + async with self._aiohttp_session.post(url, json=payload, headers=headers, params=querystring) as r: if r.status != 200: text = await r.text() @@ -59,8 +58,6 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: async for chunk in r.content: if len(chunk) > 0: - if ttfb is None: - ttfb = time.time() - start_time - logger.debug(f"TTS ttfb: {ttfb}") + await self.stop_ttfb_metrics() frame = AudioRawFrame(chunk, 16000, 1) yield frame diff --git a/src/pipecat/services/google.py b/src/pipecat/services/google.py index 89ac6d83d..ffe9a9fee 100644 --- a/src/pipecat/services/google.py +++ b/src/pipecat/services/google.py @@ -1,8 +1,10 @@ +# +# Copyright (c) 2024, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# -import json -import os import asyncio -import time from typing import List @@ -81,9 +83,11 @@ async def _process_context(self, context: OpenAILLMContext): messages = self._get_messages_from_openai_context(context) - start_time = time.time() + await self.start_ttfb_metrics() + response = self._client.generate_content(messages, stream=True) - logger.debug(f"Google LLM TTFB: {time.time() - start_time}") + + await self.stop_ttfb_metrics() async for chunk in self._async_generator_wrapper(response): try: diff --git a/src/pipecat/services/openai.py b/src/pipecat/services/openai.py index 57bcc09d9..aaed7d8e7 100644 --- a/src/pipecat/services/openai.py +++ b/src/pipecat/services/openai.py @@ -3,13 +3,14 @@ # # SPDX-License-Identifier: BSD 2-Clause License # + +import aiohttp import base64 import io import json -import time + from typing import AsyncGenerator, List, Literal -import aiohttp from loguru import logger from PIL import Image @@ -94,7 +95,6 @@ async def _stream_chat_completions( del message["data"] del message["mime_type"] - start_time = time.time() chunks: AsyncStream[ChatCompletionChunk] = ( await self._client.chat.completions.create( model=self._model, @@ -105,8 +105,6 @@ async def _stream_chat_completions( ) ) - logger.debug(f"OpenAI LLM TTFB: {time.time() - start_time}") - return chunks async def _chat_completions(self, messages) -> str | None: @@ -123,6 +121,8 @@ async def _process_context(self, context: OpenAILLMContext): arguments = "" tool_call_id = "" + await self.start_ttfb_metrics() + chunk_stream: AsyncStream[ChatCompletionChunk] = ( await self._stream_chat_completions(context) ) @@ -131,6 +131,8 @@ async def _process_context(self, context: OpenAILLMContext): if len(chunk.choices) == 0: continue + await self.stop_ttfb_metrics() + if chunk.choices[0].delta.tool_calls: # We're streaming the LLM response to enable the fastest response times. # For text, we just yield each chunk as we receive it and count on consumers @@ -306,11 +308,11 @@ def __init__( self._client = AsyncOpenAI(api_key=api_key) async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: - start_time = time.time() - ttfb = None logger.debug(f"Generating TTS: [{text}]") try: + await self.start_ttfb_metrics() + async with self._client.audio.speech.with_streaming_response.create( input=text, model=self._model, @@ -324,9 +326,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: return async for chunk in r.iter_bytes(8192): if len(chunk) > 0: - if ttfb is None: - ttfb = time.time() - start_time - logger.debug(f"TTS ttfb: {ttfb}") + await self.stop_ttfb_metrics() frame = AudioRawFrame(chunk, 24_000, 1) yield frame except BadRequestError as e: diff --git a/src/pipecat/services/playht.py b/src/pipecat/services/playht.py index 8373eca7b..cf38e125d 100644 --- a/src/pipecat/services/playht.py +++ b/src/pipecat/services/playht.py @@ -6,8 +6,6 @@ import io import struct -import time -import asyncio from typing import AsyncGenerator @@ -49,21 +47,19 @@ def __del__(self): self._client.close() async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: - start_time = time.time() - ttfb = None logger.debug(f"Generating TTS: [{text}]") try: b = bytearray() in_header = True + + await self.start_ttfb_metrics() + playht_gen = self._client.tts( text, voice_engine="PlayHT2.0-turbo", options=self._options) - # need to ask Aleix about this. frames are getting pushed. - # but playback is blocked - async for chunk in playht_gen: # skip the RIFF header. if in_header: @@ -80,9 +76,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: in_header = False else: if len(chunk): - if ttfb is None: - ttfb = time.time() - start_time - logger.debug(f"TTS ttfb: {ttfb}") + await self.stop_ttfb_metrics() frame = AudioRawFrame(chunk, 16000, 1) yield frame except Exception as e: diff --git a/src/pipecat/services/whisper.py b/src/pipecat/services/whisper.py index 7884d454b..6c5dd0d1f 100644 --- a/src/pipecat/services/whisper.py +++ b/src/pipecat/services/whisper.py @@ -73,6 +73,8 @@ async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]: logger.error("Whisper model not available") return + await self.start_ttfb_metrics() + # Divide by 32768 because we have signed 16-bit data. audio_float = np.frombuffer(audio, dtype=np.int16).astype(np.float32) / 32768.0 @@ -83,4 +85,5 @@ async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]: text += f"{segment.text} " if text: + await self.stop_ttfb_metrics() yield TranscriptionFrame(text, "", int(time.time_ns() / 1000000)) diff --git a/src/pipecat/transports/base_input.py b/src/pipecat/transports/base_input.py index 3657727fb..823ab5844 100644 --- a/src/pipecat/transports/base_input.py +++ b/src/pipecat/transports/base_input.py @@ -123,7 +123,7 @@ async def _push_frame_task_handler(self): # async def _handle_interruptions(self, frame: Frame): - if self.allow_interruptions: + if self.interruptions_allowed: # Make sure we notify about interruptions quickly out-of-band if isinstance(frame, UserStartedSpeakingFrame): logger.debug("User started speaking") diff --git a/src/pipecat/transports/base_output.py b/src/pipecat/transports/base_output.py index e880c99d4..b81f2f8db 100644 --- a/src/pipecat/transports/base_output.py +++ b/src/pipecat/transports/base_output.py @@ -132,7 +132,7 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): await self._stopped_event.wait() async def _handle_interruptions(self, frame: Frame): - if not self.allow_interruptions: + if not self.interruptions_allowed: return if isinstance(frame, StartInterruptionFrame):