From af5a7e9092a06862003a1794572f485ed6f86b6e Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Wed, 16 Oct 2024 11:20:02 -0400 Subject: [PATCH 1/2] Move metrics from transport to rtvi --- src/pipecat/processors/frameworks/rtvi.py | 61 +++++++++++++++++++++- src/pipecat/transports/base_output.py | 25 +++------ src/pipecat/transports/services/daily.py | 32 ------------ src/pipecat/transports/services/livekit.py | 37 ++----------- 4 files changed, 71 insertions(+), 84 deletions(-) diff --git a/src/pipecat/processors/frameworks/rtvi.py b/src/pipecat/processors/frameworks/rtvi.py index 220f18cd2..4ed63b723 100644 --- a/src/pipecat/processors/frameworks/rtvi.py +++ b/src/pipecat/processors/frameworks/rtvi.py @@ -6,7 +6,17 @@ import asyncio from dataclasses import dataclass -from typing import Any, Awaitable, Callable, Dict, List, Literal, Optional, Union +from typing import ( + Any, + Awaitable, + Callable, + Dict, + List, + Literal, + Mapping, + Optional, + Union, +) from loguru import logger from pydantic import BaseModel, Field, PrivateAttr, ValidationError @@ -24,6 +34,7 @@ InterimTranscriptionFrame, LLMFullResponseEndFrame, LLMFullResponseStartFrame, + MetricsFrame, StartFrame, SystemFrame, TextFrame, @@ -35,6 +46,12 @@ UserStartedSpeakingFrame, UserStoppedSpeakingFrame, ) +from pipecat.metrics.metrics import ( + LLMUsageMetricsData, + ProcessingMetricsData, + TTFBMetricsData, + TTSUsageMetricsData, +) from pipecat.processors.aggregators.openai_llm_context import ( OpenAILLMContext, OpenAILLMContextFrame, @@ -343,6 +360,12 @@ class RTVIBotStoppedSpeakingMessage(BaseModel): type: Literal["bot-stopped-speaking"] = "bot-stopped-speaking" +class RTVIMetricsMessage(BaseModel): + label: Literal["rtvi-ai"] = "rtvi-ai" + type: Literal["metrics"] = "metrics" + data: Mapping[str, Any] + + class RTVIProcessorParams(BaseModel): send_bot_ready: bool = True @@ -509,6 +532,42 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): await 
self._push_transport_message_urgent(message) +class RTVIMetricsProcessor(RTVIFrameProcessor): + def __init__(self, **kwargs): + super().__init__(**kwargs) + + async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + + await self.push_frame(frame, direction) + + if isinstance(frame, MetricsFrame): + await self._handle_metrics(frame) + + async def _handle_metrics(self, frame: MetricsFrame): + metrics = {} + for d in frame.data: + if isinstance(d, TTFBMetricsData): + if "ttfb" not in metrics: + metrics["ttfb"] = [] + metrics["ttfb"].append(d.model_dump(exclude_none=True)) + elif isinstance(d, ProcessingMetricsData): + if "processing" not in metrics: + metrics["processing"] = [] + metrics["processing"].append(d.model_dump(exclude_none=True)) + elif isinstance(d, LLMUsageMetricsData): + if "tokens" not in metrics: + metrics["tokens"] = [] + metrics["tokens"].append(d.value.model_dump(exclude_none=True)) + elif isinstance(d, TTSUsageMetricsData): + if "characters" not in metrics: + metrics["characters"] = [] + metrics["characters"].append(d.model_dump(exclude_none=True)) + + message = RTVIMetricsMessage(data=metrics) + await self._push_transport_message_urgent(message) + + class RTVIProcessor(FrameProcessor): def __init__( self, diff --git a/src/pipecat/transports/base_output.py b/src/pipecat/transports/base_output.py index 8b9cfe858..e61bcc448 100644 --- a/src/pipecat/transports/base_output.py +++ b/src/pipecat/transports/base_output.py @@ -6,37 +6,34 @@ import asyncio import itertools -import time import sys +import time +from typing import List +from loguru import logger from PIL import Image -from typing import List -from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.frames.frames import ( BotSpeakingFrame, BotStartedSpeakingFrame, BotStoppedSpeakingFrame, CancelFrame, - MetricsFrame, + EndFrame, + Frame, OutputAudioRawFrame, OutputImageRawFrame, SpriteFrame, 
StartFrame, - EndFrame, - Frame, StartInterruptionFrame, StopInterruptionFrame, SystemFrame, - TTSStartedFrame, - TTSStoppedFrame, TransportMessageFrame, TransportMessageUrgentFrame, + TTSStartedFrame, + TTSStoppedFrame, ) +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.transports.base_transport import TransportParams - -from loguru import logger - from pipecat.utils.time import nanoseconds_to_seconds @@ -141,9 +138,6 @@ async def cancel(self, frame: CancelFrame): async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame): pass - async def send_metrics(self, frame: MetricsFrame): - pass - async def write_frame_to_camera(self, frame: OutputImageRawFrame): pass @@ -173,9 +167,6 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): elif isinstance(frame, (StartInterruptionFrame, StopInterruptionFrame)): await self.push_frame(frame, direction) await self._handle_interruptions(frame) - elif isinstance(frame, MetricsFrame): - await self.push_frame(frame, direction) - await self.send_metrics(frame) elif isinstance(frame, TransportMessageUrgentFrame): await self.send_message(frame) elif isinstance(frame, SystemFrame): diff --git a/src/pipecat/transports/services/daily.py b/src/pipecat/transports/services/daily.py index 582c5669d..d1e7295d4 100644 --- a/src/pipecat/transports/services/daily.py +++ b/src/pipecat/transports/services/daily.py @@ -28,7 +28,6 @@ Frame, InputAudioRawFrame, InterimTranscriptionFrame, - MetricsFrame, OutputAudioRawFrame, OutputImageRawFrame, SpriteFrame, @@ -39,12 +38,6 @@ UserImageRawFrame, UserImageRequestFrame, ) -from pipecat.metrics.metrics import ( - LLMUsageMetricsData, - ProcessingMetricsData, - TTFBMetricsData, - TTSUsageMetricsData, -) from pipecat.processors.frame_processor import FrameDirection from pipecat.transcriptions.language import Language from pipecat.transports.base_input import BaseInputTransport @@ -759,31 +752,6 @@ async def 
cleanup(self): async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame): await self._messages_queue.put(frame) - async def send_metrics(self, frame: MetricsFrame): - metrics = {} - for d in frame.data: - if isinstance(d, TTFBMetricsData): - if "ttfb" not in metrics: - metrics["ttfb"] = [] - metrics["ttfb"].append(d.model_dump(exclude_none=True)) - elif isinstance(d, ProcessingMetricsData): - if "processing" not in metrics: - metrics["processing"] = [] - metrics["processing"].append(d.model_dump(exclude_none=True)) - elif isinstance(d, LLMUsageMetricsData): - if "tokens" not in metrics: - metrics["tokens"] = [] - metrics["tokens"].append(d.value.model_dump(exclude_none=True)) - elif isinstance(d, TTSUsageMetricsData): - if "characters" not in metrics: - metrics["characters"] = [] - metrics["characters"].append(d.model_dump(exclude_none=True)) - - message = DailyTransportMessageFrame( - message={"label": "rtvi-ai", "type": "metrics", "data": metrics} - ) - await self._messages_queue.put(message) - async def write_raw_audio_frames(self, frames: bytes): await self._client.write_raw_audio_frames(frames) diff --git a/src/pipecat/transports/services/livekit.py b/src/pipecat/transports/services/livekit.py index 25678db4e..73a6445a3 100644 --- a/src/pipecat/transports/services/livekit.py +++ b/src/pipecat/transports/services/livekit.py @@ -10,31 +10,25 @@ import numpy as np from loguru import logger +from pydantic import BaseModel +from scipy import signal + from pipecat.frames.frames import ( AudioRawFrame, CancelFrame, EndFrame, Frame, InputAudioRawFrame, - MetricsFrame, OutputAudioRawFrame, StartFrame, TransportMessageFrame, TransportMessageUrgentFrame, ) -from pipecat.metrics.metrics import ( - LLMUsageMetricsData, - ProcessingMetricsData, - TTFBMetricsData, - TTSUsageMetricsData, -) from pipecat.processors.frame_processor import FrameDirection from pipecat.transports.base_input import BaseInputTransport from 
pipecat.transports.base_output import BaseOutputTransport from pipecat.transports.base_transport import BaseTransport, TransportParams from pipecat.vad.vad_analyzer import VADAnalyzer -from pydantic import BaseModel -from scipy import signal try: from livekit import rtc @@ -450,31 +444,6 @@ async def send_message(self, frame: TransportMessageFrame | TransportMessageUrge else: await self._client.send_data(frame.message.encode()) - async def send_metrics(self, frame: MetricsFrame): - metrics = {} - for d in frame.data: - if isinstance(d, TTFBMetricsData): - if "ttfb" not in metrics: - metrics["ttfb"] = [] - metrics["ttfb"].append(d.model_dump(exclude_none=True)) - elif isinstance(d, ProcessingMetricsData): - if "processing" not in metrics: - metrics["processing"] = [] - metrics["processing"].append(d.model_dump(exclude_none=True)) - elif isinstance(d, LLMUsageMetricsData): - if "tokens" not in metrics: - metrics["tokens"] = [] - metrics["tokens"].append(d.value.model_dump(exclude_none=True)) - elif isinstance(d, TTSUsageMetricsData): - if "characters" not in metrics: - metrics["characters"] = [] - metrics["characters"].append(d.model_dump(exclude_none=True)) - - message = LiveKitTransportMessageFrame( - message={"type": "pipecat-metrics", "metrics": metrics} - ) - await self._client.send_data(str(message.message).encode()) - async def write_raw_audio_frames(self, frames: bytes): livekit_audio = self._convert_pipecat_audio_to_livekit(frames) await self._client.publish_audio(livekit_audio) From 5760fadb444033e0426b68eb8c50b1ee2f984f02 Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Wed, 16 Oct 2024 11:22:55 -0400 Subject: [PATCH 2/2] Update changelog --- CHANGELOG.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 07c30c002..efb53f595 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,12 @@ All notable changes to **Pipecat** will be documented in this file. 
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [Unreleased] + +### Changed + +- Metrics messages have been moved out of the transport's base output and into RTVI. + ## [0.0.44] - 2024-10-15 ### Added