diff --git a/CHANGELOG.md b/CHANGELOG.md index 0f5bac054..f557463a4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,11 +7,19 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unrelease] +### Added + +- Added `send_initial_empty_metrics` flag to `PipelineParams` to request for + initial empty metrics (zero values). True by default. + ### Fixed +- Fixed initial metrics format. It was using the wrong keys name/time instead of + processor/value. + - STT services should be using ISO 8601 time format for transcription frames. -- Fix an issue that would cause Daily transport to show a stop transcription +- Fixed an issue that would cause Daily transport to show a stop transcription error when actually none occurred. ## [0.0.37] - 2024-07-22 diff --git a/src/pipecat/pipeline/pipeline.py b/src/pipecat/pipeline/pipeline.py index 165717ad3..6805cfad0 100644 --- a/src/pipecat/pipeline/pipeline.py +++ b/src/pipecat/pipeline/pipeline.py @@ -64,7 +64,7 @@ def processors_with_metrics(self): services = [] for p in self._processors: if isinstance(p, BasePipeline): - services += p.processors_with_metrics() + services.extend(p.processors_with_metrics()) elif p.can_generate_metrics(): services.append(p) return services diff --git a/src/pipecat/pipeline/task.py b/src/pipecat/pipeline/task.py index 3424aac86..9f45f33ea 100644 --- a/src/pipecat/pipeline/task.py +++ b/src/pipecat/pipeline/task.py @@ -21,6 +21,7 @@ class PipelineParams(BaseModel): allow_interruptions: bool = False enable_metrics: bool = False + send_initial_empty_metrics: bool = True report_only_initial_ttfb: bool = False @@ -95,8 +96,8 @@ async def queue_frames(self, frames: Iterable[Frame] | AsyncIterable[Frame]): def _initial_metrics_frame(self) -> MetricsFrame: processors = self._pipeline.processors_with_metrics() - ttfb = [{"name": p.name, "time": 0.0} for p in processors] - processing = [{"name": p.name, "time": 0.0} for p in processors] + ttfb = [{"processor": p.name, "value": 0.0} for p in processors] + processing = [{"processor": p.name, "value": 0.0} for p in processors] return MetricsFrame(ttfb=ttfb, processing=processing) async def _process_down_queue(self): @@ -106,7 +107,9 @@ async def _process_down_queue(self): report_only_initial_ttfb=self._params.report_only_initial_ttfb ) await self._source.process_frame(start_frame, FrameDirection.DOWNSTREAM) - await self._source.process_frame(self._initial_metrics_frame(), FrameDirection.DOWNSTREAM) + + if self._params.send_initial_empty_metrics: + await self._source.process_frame(self._initial_metrics_frame(), FrameDirection.DOWNSTREAM) running = True should_cleanup = True diff --git a/src/pipecat/processors/frameworks/rtvi.py b/src/pipecat/processors/frameworks/rtvi.py index f9e76fb6d..986b59bcd 100644 --- a/src/pipecat/processors/frameworks/rtvi.py +++ b/src/pipecat/processors/frameworks/rtvi.py @@ -19,6 +19,7 @@ LLMMessagesAppendFrame, LLMMessagesUpdateFrame, LLMModelUpdateFrame, + MetricsFrame, StartFrame, SystemFrame, TTSSpeakFrame, @@ -456,6 +457,13 @@ async def _handle_setup(self, setup: RTVISetup | None): start_frame = dataclasses.replace(self._start_frame) await self.push_frame(start_frame) + # Send new initial metrics with the new processors + processors = parent.processors_with_metrics() + processors.extend(self._pipeline.processors_with_metrics()) + ttfb = [{"processor": p.name, "value": 0.0} for p in processors] + processing = [{"processor": p.name, "value": 0.0} for p in processors] + await self.push_frame(MetricsFrame(ttfb=ttfb, processing=processing)) + message = RTVIBotReady() frame = TransportMessageFrame(message=message.model_dump(exclude_none=True)) await self.push_frame(frame) diff --git a/src/pipecat/transports/services/daily.py b/src/pipecat/transports/services/daily.py index 142a38c02..8cfbc7530 100644 --- a/src/pipecat/transports/services/daily.py +++ b/src/pipecat/transports/services/daily.py @@ -678,12 +678,15 @@ async def send_message(self, frame: TransportMessageFrame): await self._client.send_message(frame) async def send_metrics(self, frame: MetricsFrame): + metrics = {} + if frame.ttfb: + metrics["ttfb"] = frame.ttfb + if frame.processing: + metrics["processing"] = frame.processing + message = DailyTransportMessageFrame(message={ "type": "pipecat-metrics", - "metrics": { - "ttfb": frame.ttfb or [], - "processing": frame.processing or [], - }, + "metrics": metrics }) await self._client.send_message(message)