From d654208b623b484b7017fcc7d48633a7b5ca06d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Thu, 19 Sep 2024 19:31:56 -0700 Subject: [PATCH] introduce input/output audio and image frames --- .../05a-local-sync-speech-and-image.py | 12 +++- examples/foundational/06a-image-sync.py | 13 +++- examples/foundational/11-sound-effects.py | 6 +- examples/moondream-chatbot/bot.py | 11 +++- examples/patient-intake/bot.py | 7 +- examples/simple-chatbot/bot.py | 12 ++-- .../storytelling-chatbot/src/utils/helpers.py | 11 ++-- src/pipecat/frames/frames.py | 66 ++++++++++++++----- .../aggregators/openai_llm_context.py | 6 +- .../aggregators/vision_image_frame.py | 12 +++- .../processors/gstreamer/pipeline_source.py | 12 ++-- src/pipecat/serializers/livekit.py | 13 ++-- src/pipecat/serializers/protobuf.py | 29 +++++--- src/pipecat/serializers/twilio.py | 14 ++-- src/pipecat/services/ai_services.py | 3 +- src/pipecat/services/azure.py | 4 +- src/pipecat/services/cartesia.py | 6 +- src/pipecat/services/deepgram.py | 5 +- src/pipecat/services/elevenlabs.py | 4 +- src/pipecat/services/lmnt.py | 4 +- src/pipecat/services/openai.py | 4 +- src/pipecat/services/playht.py | 8 ++- src/pipecat/services/together.py | 12 ++-- src/pipecat/services/xtts.py | 6 +- src/pipecat/transports/base_input.py | 5 +- src/pipecat/transports/base_output.py | 39 ++++++----- src/pipecat/transports/local/audio.py | 8 +-- src/pipecat/transports/local/tk.py | 13 ++-- src/pipecat/transports/services/daily.py | 20 +++--- 29 files changed, 226 insertions(+), 139 deletions(-) diff --git a/examples/foundational/05a-local-sync-speech-and-image.py b/examples/foundational/05a-local-sync-speech-and-image.py index 66a6e7f57..d9a0e792e 100644 --- a/examples/foundational/05a-local-sync-speech-and-image.py +++ b/examples/foundational/05a-local-sync-speech-and-image.py @@ -11,7 +11,13 @@ import tkinter as tk -from pipecat.frames.frames import AudioRawFrame, Frame, URLImageRawFrame, LLMMessagesFrame, TextFrame +from pipecat.frames.frames import ( + Frame, + OutputAudioRawFrame, + TTSAudioRawFrame, + URLImageRawFrame, + LLMMessagesFrame, + TextFrame) from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.sync_parallel_pipeline import SyncParallelPipeline @@ -65,9 +71,9 @@ def __init__(self): async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) - if isinstance(frame, AudioRawFrame): + if isinstance(frame, TTSAudioRawFrame): self.audio.extend(frame.audio) - self.frame = AudioRawFrame( + self.frame = OutputAudioRawFrame( bytes(self.audio), frame.sample_rate, frame.num_channels) class ImageGrabber(FrameProcessor): diff --git a/examples/foundational/06a-image-sync.py b/examples/foundational/06a-image-sync.py index 6b3e58cf7..db48b709e 100644 --- a/examples/foundational/06a-image-sync.py +++ b/examples/foundational/06a-image-sync.py @@ -11,7 +11,7 @@ from PIL import Image -from pipecat.frames.frames import ImageRawFrame, Frame, SystemFrame, TextFrame +from pipecat.frames.frames import Frame, OutputImageRawFrame, SystemFrame, TextFrame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineTask @@ -52,9 +52,16 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) if not isinstance(frame, SystemFrame) and direction == FrameDirection.DOWNSTREAM: - 
await self.push_frame(ImageRawFrame(image=self._speaking_image_bytes, size=(1024, 1024), format=self._speaking_image_format)) + await self.push_frame(OutputImageRawFrame( + image=self._speaking_image_bytes, + size=(1024, 1024), + format=self._speaking_image_format) + ) await self.push_frame(frame) - await self.push_frame(ImageRawFrame(image=self._waiting_image_bytes, size=(1024, 1024), format=self._waiting_image_format)) + await self.push_frame(OutputImageRawFrame( + image=self._waiting_image_bytes, + size=(1024, 1024), + format=self._waiting_image_format)) else: await self.push_frame(frame) diff --git a/examples/foundational/11-sound-effects.py b/examples/foundational/11-sound-effects.py index 9dc4dc99b..21b03bedf 100644 --- a/examples/foundational/11-sound-effects.py +++ b/examples/foundational/11-sound-effects.py @@ -12,9 +12,9 @@ from pipecat.frames.frames import ( Frame, - AudioRawFrame, LLMFullResponseEndFrame, LLMMessagesFrame, + OutputAudioRawFrame, ) from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner @@ -53,8 +53,8 @@ filename = os.path.splitext(os.path.basename(full_path))[0] # Open the image and convert it to bytes with wave.open(full_path) as audio_file: - sounds[file] = AudioRawFrame(audio_file.readframes(-1), - audio_file.getframerate(), audio_file.getnchannels()) + sounds[file] = OutputAudioRawFrame(audio_file.readframes(-1), + audio_file.getframerate(), audio_file.getnchannels()) class OutboundSoundEffectWrapper(FrameProcessor): diff --git a/examples/moondream-chatbot/bot.py b/examples/moondream-chatbot/bot.py index 32bbf7e6b..d14a5f016 100644 --- a/examples/moondream-chatbot/bot.py +++ b/examples/moondream-chatbot/bot.py @@ -13,10 +13,11 @@ from pipecat.frames.frames import ( ImageRawFrame, + OutputImageRawFrame, SpriteFrame, Frame, LLMMessagesFrame, - AudioRawFrame, + TTSAudioRawFrame, TTSStoppedFrame, TextFrame, UserImageRawFrame, @@ -59,7 +60,11 @@ # Get the filename without the extension to use as the dictionary key # Open the image and convert it to bytes with Image.open(full_path) as img: - sprites.append(ImageRawFrame(image=img.tobytes(), size=img.size, format=img.format)) + sprites.append(OutputImageRawFrame( + image=img.tobytes(), + size=img.size, + format=img.format) + ) flipped = sprites[::-1] sprites.extend(flipped) @@ -82,7 +87,7 @@ def __init__(self): async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) - if isinstance(frame, AudioRawFrame): + if isinstance(frame, TTSAudioRawFrame): if not self._is_talking: await self.push_frame(talking_frame) self._is_talking = True diff --git a/examples/patient-intake/bot.py b/examples/patient-intake/bot.py index 7dc404c52..33ca9e26d 100644 --- a/examples/patient-intake/bot.py +++ b/examples/patient-intake/bot.py @@ -10,7 +10,7 @@ import sys import wave -from pipecat.frames.frames import AudioRawFrame +from pipecat.frames.frames import OutputAudioRawFrame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask @@ -49,8 +49,9 @@ filename = os.path.splitext(os.path.basename(full_path))[0] # Open the sound and convert it to bytes with wave.open(full_path) as audio_file: - sounds[file] = AudioRawFrame(audio_file.readframes(-1), - audio_file.getframerate(), audio_file.getnchannels()) + sounds[file] = OutputAudioRawFrame(audio_file.readframes(-1), + audio_file.getframerate(), + audio_file.getnchannels()) class 
IntakeProcessor: diff --git a/examples/simple-chatbot/bot.py b/examples/simple-chatbot/bot.py index 1664e47fb..f179dfeb5 100644 --- a/examples/simple-chatbot/bot.py +++ b/examples/simple-chatbot/bot.py @@ -16,11 +16,11 @@ from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.llm_response import LLMAssistantResponseAggregator, LLMUserResponseAggregator from pipecat.frames.frames import ( - AudioRawFrame, - ImageRawFrame, + OutputImageRawFrame, SpriteFrame, Frame, LLMMessagesFrame, + TTSAudioRawFrame, TTSStoppedFrame ) from pipecat.processors.frame_processor import FrameDirection, FrameProcessor @@ -49,7 +49,11 @@ # Get the filename without the extension to use as the dictionary key # Open the image and convert it to bytes with Image.open(full_path) as img: - sprites.append(ImageRawFrame(image=img.tobytes(), size=img.size, format=img.format)) + sprites.append(OutputImageRawFrame( + image=img.tobytes(), + size=img.size, + format=img.format) + ) flipped = sprites[::-1] sprites.extend(flipped) @@ -72,7 +76,7 @@ def __init__(self): async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) - if isinstance(frame, AudioRawFrame): + if isinstance(frame, TTSAudioRawFrame): if not self._is_talking: await self.push_frame(talking_frame) self._is_talking = True diff --git a/examples/storytelling-chatbot/src/utils/helpers.py b/examples/storytelling-chatbot/src/utils/helpers.py index 2c576fdff..743a04c97 100644 --- a/examples/storytelling-chatbot/src/utils/helpers.py +++ b/examples/storytelling-chatbot/src/utils/helpers.py @@ -2,7 +2,7 @@ import wave from PIL import Image -from pipecat.frames.frames import AudioRawFrame, ImageRawFrame +from pipecat.frames.frames import OutputAudioRawFrame, OutputImageRawFrame script_dir = os.path.dirname(__file__) @@ -16,7 +16,8 @@ def load_images(image_files): filename = os.path.splitext(os.path.basename(full_path))[0] # Open the image and convert it to bytes with Image.open(full_path) as img: - images[filename] = ImageRawFrame(image=img.tobytes(), size=img.size, format=img.format) + images[filename] = OutputImageRawFrame( + image=img.tobytes(), size=img.size, format=img.format) return images @@ -30,8 +31,8 @@ def load_sounds(sound_files): filename = os.path.splitext(os.path.basename(full_path))[0] # Open the sound and convert it to bytes with wave.open(full_path) as audio_file: - sounds[filename] = AudioRawFrame(audio=audio_file.readframes(-1), - sample_rate=audio_file.getframerate(), - num_channels=audio_file.getnchannels()) + sounds[filename] = OutputAudioRawFrame(audio=audio_file.readframes(-1), + sample_rate=audio_file.getframerate(), + num_channels=audio_file.getnchannels()) return sounds diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py index 4d207fecd..d45487150 100644 --- a/src/pipecat/frames/frames.py +++ b/src/pipecat/frames/frames.py @@ -41,10 +41,7 @@ class DataFrame(Frame): @dataclass class AudioRawFrame(DataFrame): - """A chunk of audio. Will be played by the transport if the transport's - microphone has been enabled. - - """ + """A chunk of audio.""" audio: bytes sample_rate: int num_channels: int @@ -58,6 +55,31 @@ def __str__(self): return f"{self.name}(pts: {pts}, size: {len(self.audio)}, frames: {self.num_frames}, sample_rate: {self.sample_rate}, channels: {self.num_channels})" +@dataclass +class InputAudioRawFrame(AudioRawFrame): + """A chunk of audio usually coming from an input transport. 
+ + """ + pass + + +@dataclass +class OutputAudioRawFrame(AudioRawFrame): + """A chunk of audio. Will be played by the output transport if the + transport's microphone has been enabled. + + """ + pass + + +@dataclass +class TTSAudioRawFrame(OutputAudioRawFrame): + """A chunk of output audio generated by a TTS service. + + """ + pass + + @dataclass class ImageRawFrame(DataFrame): """An image. Will be shown by the transport if the transport's camera is @@ -74,20 +96,30 @@ def __str__(self): @dataclass -class URLImageRawFrame(ImageRawFrame): - """An image with an associated URL. Will be shown by the transport if the +class InputImageRawFrame(ImageRawFrame): + pass + + +@dataclass +class OutputImageRawFrame(ImageRawFrame): + pass + + +@dataclass +class UserImageRawFrame(InputImageRawFrame): + """An image associated to a user. Will be shown by the transport if the transport's camera is enabled. """ - url: str | None + user_id: str def __str__(self): pts = format_pts(self.pts) - return f"{self.name}(pts: {pts}, url: {self.url}, size: {self.size}, format: {self.format})" + return f"{self.name}(pts: {pts}, user: {self.user_id}, size: {self.size}, format: {self.format})" @dataclass -class VisionImageRawFrame(ImageRawFrame): +class VisionImageRawFrame(InputImageRawFrame): """An image with an associated text to ask for a description of it. Will be shown by the transport if the transport's camera is enabled. @@ -100,16 +132,16 @@ def __str__(self): @dataclass -class UserImageRawFrame(ImageRawFrame): - """An image associated to a user. Will be shown by the transport if the +class URLImageRawFrame(OutputImageRawFrame): + """An image with an associated URL. Will be shown by the transport if the transport's camera is enabled. """ - user_id: str + url: str | None def __str__(self): pts = format_pts(self.pts) - return f"{self.name}(pts: {pts}, user: {self.user_id}, size: {self.size}, format: {self.format})" + return f"{self.name}(pts: {pts}, url: {self.url}, size: {self.size}, format: {self.format})" @dataclass @@ -420,10 +452,10 @@ class BotSpeakingFrame(ControlFrame): @dataclass class TTSStartedFrame(ControlFrame): """Used to indicate the beginning of a TTS response. Following - AudioRawFrames are part of the TTS response until an TTSStoppedFrame. These - frames can be used for aggregating audio frames in a transport to optimize - the size of frames sent to the session, without needing to control this in - the TTS service. + TTSAudioRawFrames are part of the TTS response until an + TTSStoppedFrame. These frames can be used for aggregating audio frames in a + transport to optimize the size of frames sent to the session, without + needing to control this in the TTS service. 
""" pass diff --git a/src/pipecat/processors/aggregators/openai_llm_context.py b/src/pipecat/processors/aggregators/openai_llm_context.py index 0d8b19a36..3d1acf32e 100644 --- a/src/pipecat/processors/aggregators/openai_llm_context.py +++ b/src/pipecat/processors/aggregators/openai_llm_context.py @@ -13,7 +13,11 @@ from PIL import Image -from pipecat.frames.frames import Frame, VisionImageRawFrame, FunctionCallInProgressFrame, FunctionCallResultFrame +from pipecat.frames.frames import ( + Frame, + VisionImageRawFrame, + FunctionCallInProgressFrame, + FunctionCallResultFrame) from pipecat.processors.frame_processor import FrameProcessor from loguru import logger diff --git a/src/pipecat/processors/aggregators/vision_image_frame.py b/src/pipecat/processors/aggregators/vision_image_frame.py index 0bbb10841..97f6b5ec8 100644 --- a/src/pipecat/processors/aggregators/vision_image_frame.py +++ b/src/pipecat/processors/aggregators/vision_image_frame.py @@ -4,13 +4,19 @@ # SPDX-License-Identifier: BSD 2-Clause License # -from pipecat.frames.frames import Frame, ImageRawFrame, TextFrame, VisionImageRawFrame +from pipecat.frames.frames import ( + Frame, + InputImageRawFrame, + TextFrame, + VisionImageRawFrame +) from pipecat.processors.frame_processor import FrameDirection, FrameProcessor class VisionImageFrameAggregator(FrameProcessor): """This aggregator waits for a consecutive TextFrame and an - ImageFrame. After the ImageFrame arrives it will output a VisionImageFrame. + InputImageRawFrame. After the InputImageRawFrame arrives it will output a + VisionImageRawFrame. >>> from pipecat.frames.frames import ImageFrame @@ -34,7 +40,7 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): if isinstance(frame, TextFrame): self._describe_text = frame.text - elif isinstance(frame, ImageRawFrame): + elif isinstance(frame, InputImageRawFrame): if self._describe_text: frame = VisionImageRawFrame( text=self._describe_text, diff --git a/src/pipecat/processors/gstreamer/pipeline_source.py b/src/pipecat/processors/gstreamer/pipeline_source.py index 8d46105a7..f852dd641 100644 --- a/src/pipecat/processors/gstreamer/pipeline_source.py +++ b/src/pipecat/processors/gstreamer/pipeline_source.py @@ -9,11 +9,11 @@ from pydantic import BaseModel from pipecat.frames.frames import ( - AudioRawFrame, CancelFrame, EndFrame, Frame, - ImageRawFrame, + OutputAudioRawFrame, + OutputImageRawFrame, StartFrame, SystemFrame) from pipecat.processors.frame_processor import FrameDirection, FrameProcessor @@ -182,9 +182,9 @@ def _decodebin_video(self, pad: Gst.Pad): def _appsink_audio_new_sample(self, appsink: GstApp.AppSink): buffer = appsink.pull_sample().get_buffer() (_, info) = buffer.map(Gst.MapFlags.READ) - frame = AudioRawFrame(audio=info.data, - sample_rate=self._out_params.audio_sample_rate, - num_channels=self._out_params.audio_channels) + frame = OutputAudioRawFrame(audio=info.data, + sample_rate=self._out_params.audio_sample_rate, + num_channels=self._out_params.audio_channels) asyncio.run_coroutine_threadsafe(self.push_frame(frame), self.get_event_loop()) buffer.unmap(info) return Gst.FlowReturn.OK @@ -192,7 +192,7 @@ def _appsink_audio_new_sample(self, appsink: GstApp.AppSink): def _appsink_video_new_sample(self, appsink: GstApp.AppSink): buffer = appsink.pull_sample().get_buffer() (_, info) = buffer.map(Gst.MapFlags.READ) - frame = ImageRawFrame( + frame = OutputImageRawFrame( image=info.data, size=(self._out_params.video_width, self._out_params.video_height), format="RGB") diff --git 
a/src/pipecat/serializers/livekit.py b/src/pipecat/serializers/livekit.py index 7a0e8afd1..fec5243f5 100644 --- a/src/pipecat/serializers/livekit.py +++ b/src/pipecat/serializers/livekit.py @@ -7,7 +7,10 @@ import ctypes import pickle -from pipecat.frames.frames import AudioRawFrame, Frame +from pipecat.frames.frames import ( + Frame, + InputAudioRawFrame, + OutputAudioRawFrame) from pipecat.serializers.base_serializer import FrameSerializer from loguru import logger @@ -22,12 +25,8 @@ class LivekitFrameSerializer(FrameSerializer): - SERIALIZABLE_TYPES = { - AudioRawFrame: "audio", - } - def serialize(self, frame: Frame) -> str | bytes | None: - if not isinstance(frame, AudioRawFrame): + if not isinstance(frame, OutputAudioRawFrame): return None audio_frame = AudioFrame( data=frame.audio, @@ -39,7 +38,7 @@ def serialize(self, frame: Frame) -> str | bytes | None: def deserialize(self, data: str | bytes) -> Frame | None: audio_frame: AudioFrame = pickle.loads(data)['frame'] - return AudioRawFrame( + return InputAudioRawFrame( audio=bytes(audio_frame.data), sample_rate=audio_frame.sample_rate, num_channels=audio_frame.num_channels, diff --git a/src/pipecat/serializers/protobuf.py b/src/pipecat/serializers/protobuf.py index 0a6dee0b1..f0cc32c1d 100644 --- a/src/pipecat/serializers/protobuf.py +++ b/src/pipecat/serializers/protobuf.py @@ -8,7 +8,12 @@ import pipecat.frames.protobufs.frames_pb2 as frame_protos -from pipecat.frames.frames import AudioRawFrame, Frame, TextFrame, TranscriptionFrame +from pipecat.frames.frames import ( + Frame, + InputAudioRawFrame, + OutputAudioRawFrame, + TextFrame, + TranscriptionFrame) from pipecat.serializers.base_serializer import FrameSerializer from loguru import logger @@ -17,11 +22,17 @@ class ProtobufFrameSerializer(FrameSerializer): SERIALIZABLE_TYPES = { TextFrame: "text", - AudioRawFrame: "audio", + OutputAudioRawFrame: "audio", TranscriptionFrame: "transcription" } - SERIALIZABLE_FIELDS = {v: k for k, v in SERIALIZABLE_TYPES.items()} + DESERIALIZABLE_TYPES = { + TextFrame: "text", + InputAudioRawFrame: "audio", + TranscriptionFrame: "transcription" + } + + DESERIALIZABLE_FIELDS = {v: k for k, v in DESERIALIZABLE_TYPES.items()} def __init__(self): pass @@ -29,8 +40,8 @@ def __init__(self): def serialize(self, frame: Frame) -> str | bytes | None: proto_frame = frame_protos.Frame() if type(frame) not in self.SERIALIZABLE_TYPES: - raise ValueError( - f"Frame type {type(frame)} is not serializable. You may need to add it to ProtobufFrameSerializer.SERIALIZABLE_FIELDS.") + logger.warning(f"Frame type {type(frame)} is not serializable") + return None # ignoring linter errors; we check that type(frame) is in this dict above proto_optional_name = self.SERIALIZABLE_TYPES[type(frame)] # type: ignore @@ -48,8 +59,8 @@ def deserialize(self, data: str | bytes) -> Frame | None: >>> serializer = ProtobufFrameSerializer() >>> serializer.deserialize( - ... serializer.serialize(AudioFrame(data=b'1234567890'))) - AudioFrame(data=b'1234567890') + ... serializer.serialize(OutputAudioFrame(data=b'1234567890'))) + InputAudioFrame(data=b'1234567890') >>> serializer.deserialize( ... 
serializer.serialize(TextFrame(text='hello world'))) @@ -62,11 +73,11 @@ def deserialize(self, data: str | bytes) -> Frame | None: proto = frame_protos.Frame.FromString(data) which = proto.WhichOneof("frame") - if which not in self.SERIALIZABLE_FIELDS: + if which not in self.DESERIALIZABLE_FIELDS: logger.error("Unable to deserialize a valid frame") return None - class_name = self.SERIALIZABLE_FIELDS[which] + class_name = self.DESERIALIZABLE_FIELDS[which] args = getattr(proto, which) args_dict = {} for field in proto.DESCRIPTOR.fields_by_name[which].message_type.fields: diff --git a/src/pipecat/serializers/twilio.py b/src/pipecat/serializers/twilio.py index 583234ae4..23ec0d828 100644 --- a/src/pipecat/serializers/twilio.py +++ b/src/pipecat/serializers/twilio.py @@ -9,7 +9,11 @@ from pydantic import BaseModel -from pipecat.frames.frames import AudioRawFrame, Frame, StartInterruptionFrame +from pipecat.frames.frames import ( + Frame, + InputAudioRawFrame, + OutputAudioRawFrame, + StartInterruptionFrame) from pipecat.serializers.base_serializer import FrameSerializer from pipecat.utils.audio import ulaw_to_pcm, pcm_to_ulaw @@ -19,16 +23,12 @@ class InputParams(BaseModel): twilio_sample_rate: int = 8000 sample_rate: int = 16000 - SERIALIZABLE_TYPES = { - AudioRawFrame: "audio", - } - def __init__(self, stream_sid: str, params: InputParams = InputParams()): self._stream_sid = stream_sid self._params = params def serialize(self, frame: Frame) -> str | bytes | None: - if isinstance(frame, AudioRawFrame): + if isinstance(frame, OutputAudioRawFrame): data = frame.audio serialized_data = pcm_to_ulaw( @@ -61,7 +61,7 @@ def deserialize(self, data: str | bytes) -> Frame | None: payload, self._params.twilio_sample_rate, self._params.sample_rate) - audio_frame = AudioRawFrame( + audio_frame = InputAudioRawFrame( audio=deserialized_data, num_channels=1, sample_rate=self._params.sample_rate) diff --git a/src/pipecat/services/ai_services.py b/src/pipecat/services/ai_services.py index d91782e42..c00064e5d 100644 --- a/src/pipecat/services/ai_services.py +++ b/src/pipecat/services/ai_services.py @@ -22,6 +22,7 @@ STTModelUpdateFrame, StartFrame, StartInterruptionFrame, + TTSAudioRawFrame, TTSLanguageUpdateFrame, TTSModelUpdateFrame, TTSSpeakFrame, @@ -277,7 +278,7 @@ async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirect if self._push_stop_frames and ( isinstance(frame, StartInterruptionFrame) or isinstance(frame, TTSStartedFrame) or - isinstance(frame, AudioRawFrame) or + isinstance(frame, TTSAudioRawFrame) or isinstance(frame, TTSStoppedFrame)): await self._stop_frame_queue.put(frame) diff --git a/src/pipecat/services/azure.py b/src/pipecat/services/azure.py index 90674fcc4..2c01fff3c 100644 --- a/src/pipecat/services/azure.py +++ b/src/pipecat/services/azure.py @@ -12,12 +12,12 @@ from typing import AsyncGenerator from pipecat.frames.frames import ( - AudioRawFrame, CancelFrame, EndFrame, ErrorFrame, Frame, StartFrame, + TTSAudioRawFrame, TTSStartedFrame, TTSStoppedFrame, TranscriptionFrame, @@ -115,7 +115,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: await self.stop_ttfb_metrics() await self.push_frame(TTSStartedFrame()) # Azure always sends a 44-byte header. Strip it off. 
- yield AudioRawFrame(audio=result.audio_data[44:], sample_rate=self._sample_rate, num_channels=1) + yield TTSAudioRawFrame(audio=result.audio_data[44:], sample_rate=self._sample_rate, num_channels=1) await self.push_frame(TTSStoppedFrame()) elif result.reason == ResultReason.Canceled: cancellation_details = result.cancellation_details diff --git a/src/pipecat/services/cartesia.py b/src/pipecat/services/cartesia.py index 078926235..ac0a1b85b 100644 --- a/src/pipecat/services/cartesia.py +++ b/src/pipecat/services/cartesia.py @@ -15,10 +15,10 @@ CancelFrame, ErrorFrame, Frame, - AudioRawFrame, StartInterruptionFrame, StartFrame, EndFrame, + TTSAudioRawFrame, TTSStartedFrame, TTSStoppedFrame, LLMFullResponseEndFrame @@ -201,7 +201,7 @@ async def _receive_task_handler(self): elif msg["type"] == "chunk": await self.stop_ttfb_metrics() self.start_word_timestamps() - frame = AudioRawFrame( + frame = TTSAudioRawFrame( audio=base64.b64decode(msg["data"]), sample_rate=self._output_format["sample_rate"], num_channels=1 @@ -326,7 +326,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: await self.stop_ttfb_metrics() - frame = AudioRawFrame( + frame = TTSAudioRawFrame( audio=output["audio"], sample_rate=self._output_format["sample_rate"], num_channels=1 diff --git a/src/pipecat/services/deepgram.py b/src/pipecat/services/deepgram.py index 708c3c511..5ce17d6ee 100644 --- a/src/pipecat/services/deepgram.py +++ b/src/pipecat/services/deepgram.py @@ -9,13 +9,13 @@ from typing import AsyncGenerator from pipecat.frames.frames import ( - AudioRawFrame, CancelFrame, EndFrame, ErrorFrame, Frame, InterimTranscriptionFrame, StartFrame, + TTSAudioRawFrame, TTSStartedFrame, TTSStoppedFrame, TranscriptionFrame) @@ -101,7 +101,8 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: await self.push_frame(TTSStartedFrame()) async for data in r.content: await self.stop_ttfb_metrics() - frame = AudioRawFrame(audio=data, sample_rate=self._sample_rate, num_channels=1) + frame = TTSAudioRawFrame( + audio=data, sample_rate=self._sample_rate, num_channels=1) yield frame await self.push_frame(TTSStoppedFrame()) except Exception as e: diff --git a/src/pipecat/services/elevenlabs.py b/src/pipecat/services/elevenlabs.py index 081a6bf5d..f78eba266 100644 --- a/src/pipecat/services/elevenlabs.py +++ b/src/pipecat/services/elevenlabs.py @@ -12,12 +12,12 @@ from pydantic import BaseModel from pipecat.frames.frames import ( - AudioRawFrame, CancelFrame, EndFrame, Frame, StartFrame, StartInterruptionFrame, + TTSAudioRawFrame, TTSStartedFrame, TTSStoppedFrame) from pipecat.processors.frame_processor import FrameDirection @@ -209,7 +209,7 @@ async def _receive_task_handler(self): self.start_word_timestamps() audio = base64.b64decode(msg["audio"]) - frame = AudioRawFrame(audio, self.sample_rate, 1) + frame = TTSAudioRawFrame(audio, self.sample_rate, 1) await self.push_frame(frame) if msg.get("alignment"): diff --git a/src/pipecat/services/lmnt.py b/src/pipecat/services/lmnt.py index 60f0cb7df..9285f1583 100644 --- a/src/pipecat/services/lmnt.py +++ b/src/pipecat/services/lmnt.py @@ -10,13 +10,13 @@ from pipecat.processors.frame_processor import FrameDirection from pipecat.frames.frames import ( - AudioRawFrame, CancelFrame, EndFrame, ErrorFrame, Frame, StartFrame, StartInterruptionFrame, + TTSAudioRawFrame, TTSStartedFrame, TTSStoppedFrame, ) @@ -126,7 +126,7 @@ async def _receive_task_handler(self): await self.push_error(ErrorFrame(f'{self} error: {msg["error"]}')) elif "audio" in msg: await 
self.stop_ttfb_metrics() - frame = AudioRawFrame( + frame = TTSAudioRawFrame( audio=msg["audio"], sample_rate=self._output_format["sample_rate"], num_channels=1 diff --git a/src/pipecat/services/openai.py b/src/pipecat/services/openai.py index 7483e2eb5..8a1355916 100644 --- a/src/pipecat/services/openai.py +++ b/src/pipecat/services/openai.py @@ -17,13 +17,13 @@ from PIL import Image from pipecat.frames.frames import ( - AudioRawFrame, ErrorFrame, Frame, LLMFullResponseEndFrame, LLMFullResponseStartFrame, LLMMessagesFrame, LLMModelUpdateFrame, + TTSAudioRawFrame, TTSStartedFrame, TTSStoppedFrame, TextFrame, @@ -365,7 +365,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: async for chunk in r.iter_bytes(8192): if len(chunk) > 0: await self.stop_ttfb_metrics() - frame = AudioRawFrame(chunk, self.sample_rate, 1) + frame = TTSAudioRawFrame(chunk, self.sample_rate, 1) yield frame await self.push_frame(TTSStoppedFrame()) except BadRequestError as e: diff --git a/src/pipecat/services/playht.py b/src/pipecat/services/playht.py index c3200fee9..ae8606e91 100644 --- a/src/pipecat/services/playht.py +++ b/src/pipecat/services/playht.py @@ -9,7 +9,11 @@ from typing import AsyncGenerator -from pipecat.frames.frames import AudioRawFrame, Frame, TTSStartedFrame, TTSStoppedFrame +from pipecat.frames.frames import ( + Frame, + TTSAudioRawFrame, + TTSStartedFrame, + TTSStoppedFrame) from pipecat.services.ai_services import TTSService from loguru import logger @@ -91,7 +95,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: else: if len(chunk): await self.stop_ttfb_metrics() - frame = AudioRawFrame(chunk, 16000, 1) + frame = TTSAudioRawFrame(chunk, 16000, 1) yield frame await self.push_frame(TTSStoppedFrame()) except Exception as e: diff --git a/src/pipecat/services/together.py b/src/pipecat/services/together.py index 49759cb01..dfd4c2966 100644 --- a/src/pipecat/services/together.py +++ b/src/pipecat/services/together.py @@ -4,23 +4,19 @@ # SPDX-License-Identifier: BSD 2-Clause License # -import base64 import json -import io -import copy -from typing import List, Optional -from dataclasses import dataclass -from asyncio import CancelledError import re import uuid +from typing import List +from dataclasses import dataclass +from asyncio import CancelledError + from pipecat.frames.frames import ( Frame, LLMModelUpdateFrame, TextFrame, - VisionImageRawFrame, UserImageRequestFrame, - UserImageRawFrame, LLMMessagesFrame, LLMFullResponseStartFrame, LLMFullResponseEndFrame, diff --git a/src/pipecat/services/xtts.py b/src/pipecat/services/xtts.py index 38f0f9a64..69b754f55 100644 --- a/src/pipecat/services/xtts.py +++ b/src/pipecat/services/xtts.py @@ -9,10 +9,10 @@ from typing import Any, AsyncGenerator, Dict from pipecat.frames.frames import ( - AudioRawFrame, ErrorFrame, Frame, StartFrame, + TTSAudioRawFrame, TTSStartedFrame, TTSStoppedFrame) from pipecat.services.ai_services import TTSService @@ -128,7 +128,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: # Convert the numpy array back to bytes resampled_audio_bytes = resampled_audio.astype(np.int16).tobytes() # Create the frame with the resampled audio - frame = AudioRawFrame(resampled_audio_bytes, 16000, 1) + frame = TTSAudioRawFrame(resampled_audio_bytes, 16000, 1) yield frame # Process any remaining data in the buffer @@ -136,7 +136,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: audio_np = np.frombuffer(buffer, dtype=np.int16) resampled_audio = 
resampy.resample(audio_np, 24000, 16000) resampled_audio_bytes = resampled_audio.astype(np.int16).tobytes() - frame = AudioRawFrame(resampled_audio_bytes, 16000, 1) + frame = TTSAudioRawFrame(resampled_audio_bytes, 16000, 1) yield frame await self.push_frame(TTSStoppedFrame()) diff --git a/src/pipecat/transports/base_input.py b/src/pipecat/transports/base_input.py index de1ec8884..cd78f65e6 100644 --- a/src/pipecat/transports/base_input.py +++ b/src/pipecat/transports/base_input.py @@ -13,6 +13,7 @@ AudioRawFrame, BotInterruptionFrame, CancelFrame, + InputAudioRawFrame, StartFrame, EndFrame, Frame, @@ -59,7 +60,7 @@ async def cancel(self, frame: CancelFrame): def vad_analyzer(self) -> VADAnalyzer | None: return self._params.vad_analyzer - async def push_audio_frame(self, frame: AudioRawFrame): + async def push_audio_frame(self, frame: InputAudioRawFrame): if self._params.audio_in_enabled or self._params.vad_enabled: await self._audio_in_queue.put(frame) @@ -151,7 +152,7 @@ async def _audio_task_handler(self): vad_state: VADState = VADState.QUIET while True: try: - frame: AudioRawFrame = await self._audio_in_queue.get() + frame: InputAudioRawFrame = await self._audio_in_queue.get() audio_passthrough = True diff --git a/src/pipecat/transports/base_output.py b/src/pipecat/transports/base_output.py index 9b1b9c29e..263bb64f4 100644 --- a/src/pipecat/transports/base_output.py +++ b/src/pipecat/transports/base_output.py @@ -15,17 +15,17 @@ from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.frames.frames import ( - AudioRawFrame, BotSpeakingFrame, BotStartedSpeakingFrame, BotStoppedSpeakingFrame, CancelFrame, MetricsFrame, + OutputAudioRawFrame, + OutputImageRawFrame, SpriteFrame, StartFrame, EndFrame, Frame, - ImageRawFrame, StartInterruptionFrame, StopInterruptionFrame, SystemFrame, @@ -122,7 +122,7 @@ async def send_message(self, frame: TransportMessageFrame): async def send_metrics(self, frame: MetricsFrame): pass - async def write_frame_to_camera(self, frame: ImageRawFrame): + async def write_frame_to_camera(self, frame: OutputImageRawFrame): pass async def write_raw_audio_frames(self, frames: bytes): @@ -162,9 +162,9 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): await self._sink_queue.put(frame) await self.stop(frame) # Other frames. 
- elif isinstance(frame, AudioRawFrame): + elif isinstance(frame, OutputAudioRawFrame): await self._handle_audio(frame) - elif isinstance(frame, ImageRawFrame) or isinstance(frame, SpriteFrame): + elif isinstance(frame, OutputImageRawFrame) or isinstance(frame, SpriteFrame): await self._handle_image(frame) elif isinstance(frame, TransportMessageFrame) and frame.urgent: await self.send_message(frame) @@ -191,7 +191,7 @@ async def _handle_interruptions(self, frame: Frame): if self._bot_speaking: await self._bot_stopped_speaking() - async def _handle_audio(self, frame: AudioRawFrame): + async def _handle_audio(self, frame: OutputAudioRawFrame): if not self._params.audio_out_enabled: return @@ -200,12 +200,14 @@ async def _handle_audio(self, frame: AudioRawFrame): else: self._audio_buffer.extend(frame.audio) while len(self._audio_buffer) >= self._audio_chunk_size: - chunk = AudioRawFrame(bytes(self._audio_buffer[:self._audio_chunk_size]), - sample_rate=frame.sample_rate, num_channels=frame.num_channels) + chunk = OutputAudioRawFrame( + bytes(self._audio_buffer[:self._audio_chunk_size]), + sample_rate=frame.sample_rate, num_channels=frame.num_channels + ) await self._sink_queue.put(chunk) self._audio_buffer = self._audio_buffer[self._audio_chunk_size:] - async def _handle_image(self, frame: ImageRawFrame | SpriteFrame): + async def _handle_image(self, frame: OutputImageRawFrame | SpriteFrame): if not self._params.camera_out_enabled: return @@ -226,11 +228,11 @@ def _create_sink_tasks(self): self._sink_clock_task = loop.create_task(self._sink_clock_task_handler()) async def _sink_frame_handler(self, frame: Frame): - if isinstance(frame, AudioRawFrame): + if isinstance(frame, OutputAudioRawFrame): await self.write_raw_audio_frames(frame.audio) await self.push_frame(frame) await self.push_frame(BotSpeakingFrame(), FrameDirection.UPSTREAM) - elif isinstance(frame, ImageRawFrame): + elif isinstance(frame, OutputImageRawFrame): await self._set_camera_image(frame) elif isinstance(frame, SpriteFrame): await self._set_camera_images(frame.images) @@ -305,10 +307,10 @@ async def _bot_stopped_speaking(self): # Camera out # - async def send_image(self, frame: ImageRawFrame | SpriteFrame): + async def send_image(self, frame: OutputImageRawFrame | SpriteFrame): await self.process_frame(frame, FrameDirection.DOWNSTREAM) - async def _draw_image(self, frame: ImageRawFrame): + async def _draw_image(self, frame: OutputImageRawFrame): desired_size = (self._params.camera_out_width, self._params.camera_out_height) if frame.size != desired_size: @@ -316,14 +318,17 @@ async def _draw_image(self, frame: ImageRawFrame): resized_image = image.resize(desired_size) logger.warning( f"{frame} does not have the expected size {desired_size}, resizing") - frame = ImageRawFrame(resized_image.tobytes(), resized_image.size, resized_image.format) + frame = OutputImageRawFrame( + resized_image.tobytes(), + resized_image.size, + resized_image.format) await self.write_frame_to_camera(frame) - async def _set_camera_image(self, image: ImageRawFrame): + async def _set_camera_image(self, image: OutputImageRawFrame): self._camera_images = itertools.cycle([image]) - async def _set_camera_images(self, images: List[ImageRawFrame]): + async def _set_camera_images(self, images: List[OutputImageRawFrame]): self._camera_images = itertools.cycle(images) async def _camera_out_task_handler(self): @@ -375,7 +380,7 @@ async def _camera_out_is_live_handler(self): # Audio out # - async def send_audio(self, frame: AudioRawFrame): + async def 
send_audio(self, frame: OutputAudioRawFrame): await self.process_frame(frame, FrameDirection.DOWNSTREAM) async def _audio_out_task_handler(self): diff --git a/src/pipecat/transports/local/audio.py b/src/pipecat/transports/local/audio.py index cd05550a9..45d18db52 100644 --- a/src/pipecat/transports/local/audio.py +++ b/src/pipecat/transports/local/audio.py @@ -8,7 +8,7 @@ from concurrent.futures import ThreadPoolExecutor -from pipecat.frames.frames import AudioRawFrame, StartFrame +from pipecat.frames.frames import InputAudioRawFrame, StartFrame from pipecat.processors.frame_processor import FrameProcessor from pipecat.transports.base_input import BaseInputTransport from pipecat.transports.base_output import BaseOutputTransport @@ -54,9 +54,9 @@ async def cleanup(self): self._in_stream.close() def _audio_in_callback(self, in_data, frame_count, time_info, status): - frame = AudioRawFrame(audio=in_data, - sample_rate=self._params.audio_in_sample_rate, - num_channels=self._params.audio_in_channels) + frame = InputAudioRawFrame(audio=in_data, + sample_rate=self._params.audio_in_sample_rate, + num_channels=self._params.audio_in_channels) asyncio.run_coroutine_threadsafe(self.push_audio_frame(frame), self.get_event_loop()) diff --git a/src/pipecat/transports/local/tk.py b/src/pipecat/transports/local/tk.py index e7dc04902..75dd30331 100644 --- a/src/pipecat/transports/local/tk.py +++ b/src/pipecat/transports/local/tk.py @@ -11,8 +11,7 @@ import numpy as np import tkinter as tk -from pipecat.frames.frames import AudioRawFrame, ImageRawFrame, StartFrame -from pipecat.processors.frame_processor import FrameProcessor +from pipecat.frames.frames import InputAudioRawFrame, OutputImageRawFrame, StartFrame from pipecat.transports.base_input import BaseInputTransport from pipecat.transports.base_output import BaseOutputTransport from pipecat.transports.base_transport import BaseTransport, TransportParams @@ -64,9 +63,9 @@ async def cleanup(self): self._in_stream.close() def _audio_in_callback(self, in_data, frame_count, time_info, status): - frame = AudioRawFrame(audio=in_data, - sample_rate=self._params.audio_in_sample_rate, - num_channels=self._params.audio_in_channels) + frame = InputAudioRawFrame(audio=in_data, + sample_rate=self._params.audio_in_sample_rate, + num_channels=self._params.audio_in_channels) asyncio.run_coroutine_threadsafe(self.push_audio_frame(frame), self.get_event_loop()) @@ -108,10 +107,10 @@ async def cleanup(self): async def write_raw_audio_frames(self, frames: bytes): await self.get_event_loop().run_in_executor(self._executor, self._out_stream.write, frames) - async def write_frame_to_camera(self, frame: ImageRawFrame): + async def write_frame_to_camera(self, frame: OutputImageRawFrame): self.get_event_loop().call_soon(self._write_frame_to_tk, frame) - def _write_frame_to_tk(self, frame: ImageRawFrame): + def _write_frame_to_tk(self, frame: OutputImageRawFrame): width = frame.size[0] height = frame.size[1] data = f"P6 {width} {height} 255 ".encode() + frame.image diff --git a/src/pipecat/transports/services/daily.py b/src/pipecat/transports/services/daily.py index 2a45adf36..d595aaceb 100644 --- a/src/pipecat/transports/services/daily.py +++ b/src/pipecat/transports/services/daily.py @@ -22,13 +22,14 @@ from pydantic.main import BaseModel from pipecat.frames.frames import ( - AudioRawFrame, CancelFrame, EndFrame, Frame, - ImageRawFrame, + InputAudioRawFrame, InterimTranscriptionFrame, MetricsFrame, + OutputAudioRawFrame, + OutputImageRawFrame, SpriteFrame, StartFrame, 
TranscriptionFrame, @@ -239,7 +240,7 @@ async def send_message(self, frame: TransportMessageFrame): completion=completion_callback(future)) await future - async def read_next_audio_frame(self) -> AudioRawFrame | None: + async def read_next_audio_frame(self) -> InputAudioRawFrame | None: if not self._speaker: return None @@ -252,7 +253,10 @@ async def read_next_audio_frame(self) -> AudioRawFrame | None: audio = await future if len(audio) > 0: - return AudioRawFrame(audio=audio, sample_rate=sample_rate, num_channels=num_channels) + return InputAudioRawFrame( + audio=audio, + sample_rate=sample_rate, + num_channels=num_channels) else: # If we don't read any audio it could be there's no participant # connected. daily-python will return immediately if that's the @@ -268,7 +272,7 @@ async def write_raw_audio_frames(self, frames: bytes): self._mic.write_frames(frames, completion=completion_callback(future)) await future - async def write_frame_to_camera(self, frame: ImageRawFrame): + async def write_frame_to_camera(self, frame: OutputImageRawFrame): if not self._camera: return None @@ -749,7 +753,7 @@ async def send_metrics(self, frame: MetricsFrame): async def write_raw_audio_frames(self, frames: bytes): await self._client.write_raw_audio_frames(frames) - async def write_frame_to_camera(self, frame: ImageRawFrame): + async def write_frame_to_camera(self, frame: OutputImageRawFrame): await self._client.write_frame_to_camera(frame) @@ -829,11 +833,11 @@ def output(self) -> DailyOutputTransport: def participant_id(self) -> str: return self._client.participant_id - async def send_image(self, frame: ImageRawFrame | SpriteFrame): + async def send_image(self, frame: OutputImageRawFrame | SpriteFrame): if self._output: await self._output.process_frame(frame, FrameDirection.DOWNSTREAM) - async def send_audio(self, frame: AudioRawFrame): + async def send_audio(self, frame: OutputAudioRawFrame): if self._output: await self._output.process_frame(frame, FrameDirection.DOWNSTREAM)
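
For reference, a minimal usage sketch (not part of the patch) of the new frame split, mirroring the patterns the patch introduces in the examples: output frames (OutputAudioRawFrame, OutputImageRawFrame) are pushed toward a transport's speaker/camera, InputAudioRawFrame is what input transports produce, and TTSAudioRawFrame is the output-audio subclass emitted by TTS services. The file names below are placeholders.

    import wave

    from PIL import Image

    from pipecat.frames.frames import (
        InputAudioRawFrame,
        OutputAudioRawFrame,
        OutputImageRawFrame,
        TTSAudioRawFrame,
    )

    # Load a WAV file as an output frame, as in examples/patient-intake/bot.py
    # ("ding.wav" is a placeholder filename).
    with wave.open("ding.wav") as audio_file:
        sound = OutputAudioRawFrame(
            audio=audio_file.readframes(-1),
            sample_rate=audio_file.getframerate(),
            num_channels=audio_file.getnchannels(),
        )

    # Load an image as an output frame, as in examples/simple-chatbot/bot.py
    # ("robot.png" is a placeholder filename).
    with Image.open("robot.png") as img:
        image = OutputImageRawFrame(image=img.tobytes(), size=img.size, format=img.format)

    # TTS services now emit TTSAudioRawFrame, which is still an OutputAudioRawFrame,
    # so output transports and processors that only handle playable audio keep working.
    assert issubclass(TTSAudioRawFrame, OutputAudioRawFrame)

    # Input transports wrap captured audio in InputAudioRawFrame instead of the old
    # AudioRawFrame; here, 10 ms of 16 kHz mono silence as an illustrative payload.
    captured = InputAudioRawFrame(audio=b"\x00" * 320, sample_rate=16000, num_channels=1)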