From e52d18e42d1218b96dde70031b77af265180c8a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Tue, 15 Oct 2024 13:39:31 -0700 Subject: [PATCH 1/5] processors(audiobuffer): make functions public --- .../audio/audio_buffer_processor.py | 19 +++++----- src/pipecat/services/canonical.py | 38 ++++++++++--------- src/pipecat/transports/services/livekit.py | 6 +++ 3 files changed, 35 insertions(+), 28 deletions(-) diff --git a/src/pipecat/processors/audio/audio_buffer_processor.py b/src/pipecat/processors/audio/audio_buffer_processor.py index 0c07d6815..25a1f0237 100644 --- a/src/pipecat/processors/audio/audio_buffer_processor.py +++ b/src/pipecat/processors/audio/audio_buffer_processor.py @@ -1,18 +1,17 @@ +# +# Copyright (c) 2024, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + import wave from io import BytesIO from pipecat.frames.frames import ( AudioRawFrame, - BotInterruptionFrame, - BotStartedSpeakingFrame, - BotStoppedSpeakingFrame, Frame, InputAudioRawFrame, OutputAudioRawFrame, - StartInterruptionFrame, - StopInterruptionFrame, - UserStartedSpeakingFrame, - UserStoppedSpeakingFrame, ) from pipecat.processors.frame_processor import FrameDirection, FrameProcessor @@ -39,18 +38,18 @@ def __init__(self, **kwargs): def _buffer_has_audio(self, buffer: bytearray): return buffer is not None and len(buffer) > 0 - def _has_audio(self): + def has_audio(self): return ( self._buffer_has_audio(self._user_audio_buffer) and self._buffer_has_audio(self._assistant_audio_buffer) and self._sample_rate is not None ) - def _reset_audio_buffer(self): + def reset_audio_buffer(self): self._user_audio_buffer = bytearray() self._assistant_audio_buffer = bytearray() - def _merge_audio_buffers(self): + def merge_audio_buffers(self): with BytesIO() as buffer: with wave.open(buffer, "wb") as wf: wf.setnchannels(2) diff --git a/src/pipecat/services/canonical.py b/src/pipecat/services/canonical.py index 0f4fb6a2f..048a6a4ee 100644 --- 
a/src/pipecat/services/canonical.py +++ b/src/pipecat/services/canonical.py @@ -1,9 +1,21 @@ +# +# Copyright (c) 2024, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import aiohttp import os import uuid + from datetime import datetime from typing import Dict, List, Tuple -import aiohttp +from pipecat.frames.frames import CancelFrame, EndFrame, Frame +from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor +from pipecat.processors.frame_processor import FrameDirection +from pipecat.services.ai_services import AIService + from loguru import logger try: @@ -18,27 +30,18 @@ raise Exception(f"Missing module: {e}") -from pipecat.frames.frames import CancelFrame, EndFrame, Frame -from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor -from pipecat.processors.frame_processor import FrameDirection -from pipecat.services.ai_services import AIService - # Multipart upload part size in bytes, cannot be smaller than 5MB PART_SIZE = 1024 * 1024 * 5 -""" -This class extends AudioBufferProcessor to handle audio processing and uploading -for the Canonical Voice API. -""" class CanonicalMetricsService(AIService): - """ - Initialize a CanonicalAudioProcessor instance. + """Initialize a CanonicalAudioProcessor instance. - This class extends AudioBufferProcessor to handle audio processing and uploading - for the Canonical Voice API. + This class uses an AudioBufferProcessor to get the conversation audio and + uploads it to Canonical Voice API for audio processing. Args: + call_id (str): Your unique identifier for the call. This is used to match the call in the Canonical Voice system to the call in your system. assistant (str): Identifier for the AI assistant. This can be whatever you want, it's intended for you convenience so you can distinguish between different assistants and a grouping mechanism for calls. 
@@ -52,7 +55,6 @@ class CanonicalMetricsService(AIService): output_dir (str): Directory path for saving temporary audio files. The constructor also ensures that the output directory exists. - This class requires a Canonical API key to be set in the CANONICAL_API_KEY environment variable. """ def __init__( @@ -90,17 +92,17 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): async def _process_audio(self): pipeline = self._audio_buffer_processor - if pipeline._has_audio(): + if pipeline.has_audio(): os.makedirs(self._output_dir, exist_ok=True) filename = self._get_output_filename() - wave_data = pipeline._merge_audio_buffers() + wave_data = pipeline.merge_audio_buffers() async with aiofiles.open(filename, "wb") as file: await file.write(wave_data) try: await self._multipart_upload(filename) - pipeline._reset_audio_buffer() + pipeline.reset_audio_buffer() await aiofiles.os.remove(filename) except FileNotFoundError: pass diff --git a/src/pipecat/transports/services/livekit.py b/src/pipecat/transports/services/livekit.py index d8aff2ecb..25678db4e 100644 --- a/src/pipecat/transports/services/livekit.py +++ b/src/pipecat/transports/services/livekit.py @@ -1,3 +1,9 @@ +# +# Copyright (c) 2024, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + import asyncio from dataclasses import dataclass from typing import Any, Awaitable, Callable, List From 04da51c7d8a83beb27ded242859d97d498148f48 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Tue, 15 Oct 2024 17:46:48 -0700 Subject: [PATCH 2/5] transport(base_output): push EndFrame downstream at the right time --- src/pipecat/transports/base_output.py | 33 +++++++++++++++++---------- 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/src/pipecat/transports/base_output.py b/src/pipecat/transports/base_output.py index 9bd508f1d..8b9cfe858 100644 --- a/src/pipecat/transports/base_output.py +++ b/src/pipecat/transports/base_output.py @@ -95,15 +95,6 @@ 
async def start(self, frame: StartFrame): self._audio_out_task = self.get_event_loop().create_task(self._audio_out_task_handler()) async def stop(self, frame: EndFrame): - # At this point we have enqueued an EndFrame and we need to wait for - # that EndFrame to be processed by the sink tasks. We also need to wait - # for these tasks before cancelling the camera and audio tasks below - # because they might be still rendering. - if self._sink_task: - await self._sink_task - if self._sink_clock_task: - await self._sink_clock_task - # Cancel and wait for the camera output task to finish. if self._camera_out_task and self._params.camera_out_enabled: self._camera_out_task.cancel() @@ -191,9 +182,12 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): await self.push_frame(frame, direction) # Control frames. elif isinstance(frame, EndFrame): - await self._sink_clock_queue.put((sys.maxsize, frame.id, frame)) - await self._sink_queue.put(frame) + # Process sink tasks. + await self._stop_sink_tasks(frame) + # Now we can stop. await self.stop(frame) + # We finally push EndFrame down so PipelineTask stops nicely. + await self.push_frame(frame, direction) # Other frames. elif isinstance(frame, OutputAudioRawFrame): await self._handle_audio(frame) @@ -205,6 +199,20 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): else: await self._sink_queue.put(frame) + async def _stop_sink_tasks(self, frame: EndFrame): + # Let the sink tasks process the queue until they reach this EndFrame. + await self._sink_clock_queue.put((sys.maxsize, frame.id, frame)) + await self._sink_queue.put(frame) + + # At this point we have enqueued an EndFrame and we need to wait for + # that EndFrame to be processed by the sink tasks. We also need to wait + # for these tasks before cancelling the camera and audio tasks below + # because they might be still rendering. 
+ if self._sink_task: + await self._sink_task + if self._sink_clock_task: + await self._sink_clock_task + async def _handle_interruptions(self, frame: Frame): if not self.interruptions_allowed: return @@ -278,7 +286,8 @@ async def _sink_frame_handler(self, frame: Frame): elif isinstance(frame, TTSStoppedFrame): await self._bot_stopped_speaking() await self.push_frame(frame) - else: + # We will push EndFrame later. + elif not isinstance(frame, EndFrame): await self.push_frame(frame) async def _sink_task_handler(self): From 713dcb7a4dedcc52163408e794bea404660722f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Tue, 15 Oct 2024 17:47:12 -0700 Subject: [PATCH 3/5] transports(daily): cancel messages task when canceling --- src/pipecat/transports/services/daily.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/pipecat/transports/services/daily.py b/src/pipecat/transports/services/daily.py index 7c124e376..7c537bd53 100644 --- a/src/pipecat/transports/services/daily.py +++ b/src/pipecat/transports/services/daily.py @@ -740,13 +740,17 @@ async def stop(self, frame: EndFrame): self._messages_task.cancel() await self._messages_task self._messages_task = None - self._messages_task = None # Leave the room. await self._client.leave() async def cancel(self, frame: CancelFrame): # Parent stop. await super().cancel(frame) + # Cancel messages task + if self._messages_task: + self._messages_task.cancel() + await self._messages_task + self._messages_task = None # Leave the room. 
await self._client.leave() From 3910aeb4decc16d5895ed7974afbae67201047f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Tue, 15 Oct 2024 17:47:29 -0700 Subject: [PATCH 4/5] transports(daily): don't send messages if not joined --- src/pipecat/transports/services/daily.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pipecat/transports/services/daily.py b/src/pipecat/transports/services/daily.py index 7c537bd53..f28efdcb8 100644 --- a/src/pipecat/transports/services/daily.py +++ b/src/pipecat/transports/services/daily.py @@ -241,7 +241,7 @@ def set_callbacks(self, callbacks: DailyCallbacks): self._callbacks = callbacks async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame): - if not self._client: + if not self._joined or self._leaving: return participant_id = None From 4eb2c95b63f8bd0017c844973a3eca9d7ea3ac11 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Tue, 15 Oct 2024 13:40:05 -0700 Subject: [PATCH 5/5] update CHANGELOG for 0.0.44 --- CHANGELOG.md | 36 +++++++++++++++++++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3e21b4655..07c30c002 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,10 +5,14 @@ All notable changes to **Pipecat** will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## [Unreleased] +## [0.0.44] - 2024-10-15 ### Added +- Added support for OpenAI Realtime API with the new + `OpenAILLMServiceRealtimeBeta` processor. + (see https://platform.openai.com/docs/guides/realtime/overview) + - Added `RTVIBotTranscriptionProcessor` which will send the RTVI `bot-transcription` protocol message. These are TTS text aggregated (into sentences) messages. 
@@ -17,14 +21,44 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 `filter_code` to filter code from text and `filter_tables` to filter tables from text. +- Added `CanonicalMetricsService`. This processor uses the new + `AudioBufferProcessor` to capture conversation audio and later send it to + Canonical AI. + (see https://canonical.chat/) + +- Added `AudioBufferProcessor`. This processor can be used to buffer mixed user and + bot audio. This can later be saved into an audio file or processed by some + audio analyzer. + +- Added `on_first_participant_joined` event to `LiveKitTransport`. + +### Changed + +- LLM text responses are now logged properly as unicode characters. + +- `UserStartedSpeakingFrame`, `UserStoppedSpeakingFrame`, + `BotStartedSpeakingFrame`, `BotStoppedSpeakingFrame`, `BotSpeakingFrame` and + `UserImageRequestFrame` are now based on `SystemFrame`. + ### Fixed - Merge `RTVIBotLLMProcessor`/`RTVIBotLLMTextProcessor` and `RTVIBotTTSProcessor`/`RTVIBotTTSTextProcessor` to avoid out of order issues. +- Fixed an issue in RTVI protocol that could cause a `bot-llm-stopped` or + `bot-tts-stopped` message to be sent before a `bot-llm-text` or `bot-tts-text` + message. + +- Fixed `DeepgramSTTService` constructor settings not being merged with default + ones. + - Fixed an issue in Daily transport that would cause tasks to be hanging if urgent transport messages were being sent from a transport event handler. +- Fixed an issue in `BaseOutputTransport` that would cause `EndFrame` to be + pushed down too early and call `FrameProcessor.cleanup()` before letting the + transport stop properly. + ## [0.0.43] - 2024-10-10 ### Added