diff --git a/examples/foundational/05-sync-speech-and-image.py b/examples/foundational/05-sync-speech-and-image.py
index 257a6181e..747eccb63 100644
--- a/examples/foundational/05-sync-speech-and-image.py
+++ b/examples/foundational/05-sync-speech-and-image.py
@@ -9,7 +9,7 @@
 import os
 import sys
 
-import daily
+from dataclasses import dataclass
 
 from pipecat.frames.frames import (
     AppFrame,
@@ -44,20 +44,13 @@ logger.add(sys.stderr, level="DEBUG")
 
 
+@dataclass
 class MonthFrame(AppFrame):
-    def __init__(self, month):
-        super().__init__()
-        self.metadata["month"] = month
-
-    @ property
-    def month(self) -> str:
-        return self.metadata["month"]
+    month: str
 
     def __str__(self):
         return f"{self.name}(month: {self.month})"
 
-    month: str
-
 
 class MonthPrepender(FrameProcessor):
     def __init__(self):
@@ -69,7 +62,7 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
         if isinstance(frame, MonthFrame):
             self.most_recent_month = frame.month
         elif self.prepend_to_next_text_frame and isinstance(frame, TextFrame):
-            await self.push_frame(TextFrame(f"{self.most_recent_month}: {frame.data}"))
+            await self.push_frame(TextFrame(f"{self.most_recent_month}: {frame.text}"))
             self.prepend_to_next_text_frame = False
         elif isinstance(frame, LLMResponseStartFrame):
             self.prepend_to_next_text_frame = True
@@ -152,7 +145,7 @@ async def main(room_url):
                 "content": f"Describe a nature photograph suitable for use in a calendar, for the month of {month}. Include only the image description with no preamble. Limit the description to one sentence, please.",
             }
         ]
-        frames.append(MonthFrame(month))
+        frames.append(MonthFrame(month=month))
         frames.append(LLMMessagesFrame(messages))
 
     frames.append(EndFrame())
diff --git a/examples/foundational/05a-local-sync-speech-and-image.py b/examples/foundational/05a-local-sync-speech-and-image.py
index 529317a90..a5629745b 100644
--- a/examples/foundational/05a-local-sync-speech-and-image.py
+++ b/examples/foundational/05a-local-sync-speech-and-image.py
@@ -61,7 +61,7 @@ def __init__(self):
 
     async def process_frame(self, frame: Frame, direction: FrameDirection):
         if isinstance(frame, AudioRawFrame):
-            self.audio.extend(frame.data)
+            self.audio.extend(frame.audio)
             self.frame = AudioRawFrame(
                 bytes(self.audio), frame.sample_rate, frame.num_channels)
diff --git a/examples/foundational/06a-image-sync.py b/examples/foundational/06a-image-sync.py
index 6b7d219cd..73878976f 100644
--- a/examples/foundational/06a-image-sync.py
+++ b/examples/foundational/06a-image-sync.py
@@ -49,9 +49,9 @@ def __init__(self, speaking_path: str, waiting_path: str):
 
     async def process_frame(self, frame: Frame, direction: FrameDirection):
         if not isinstance(frame, SystemFrame):
-            await self.push_frame(ImageRawFrame(self._speaking_image_bytes, (1024, 1024), self._speaking_image_format))
+            await self.push_frame(ImageRawFrame(image=self._speaking_image_bytes, size=(1024, 1024), format=self._speaking_image_format))
             await self.push_frame(frame)
-            await self.push_frame(ImageRawFrame(self._waiting_image_bytes, (1024, 1024), self._waiting_image_format))
+            await self.push_frame(ImageRawFrame(image=self._waiting_image_bytes, size=(1024, 1024), format=self._waiting_image_format))
         else:
             await self.push_frame(frame)
diff --git a/examples/foundational/08-bots-arguing.py b/examples/foundational/08-bots-arguing.py
index 9e88b7f4c..3be829ba5 100644
--- a/examples/foundational/08-bots-arguing.py
+++ b/examples/foundational/08-bots-arguing.py
@@ -92,7 +92,7 @@ async def get_text_and_audio(messages) -> Tuple[str, bytearray]:
             if isinstance(frame, TextFrame):
                 message += frame.text
             elif isinstance(frame, AudioFrame):
-                all_audio.extend(frame.data)
+                all_audio.extend(frame.audio)
 
         return (message, all_audio)
diff --git a/examples/foundational/10-wake-word.py b/examples/foundational/10-wake-word.py
index a89de5b13..430f76e8c 100644
--- a/examples/foundational/10-wake-word.py
+++ b/examples/foundational/10-wake-word.py
@@ -63,7 +63,7 @@
     filename = os.path.splitext(os.path.basename(full_path))[0]
     # Open the image and convert it to bytes
     with Image.open(full_path) as img:
-        sprites[file] = ImageRawFrame(img.tobytes(), img.size, img.format)
+        sprites[file] = ImageRawFrame(image=img.tobytes(), size=img.size, format=img.format)
 
 # When the bot isn't talking, show a static image of the cat listening
 quiet_frame = sprites["sc-listen-1.png"]
@@ -99,7 +99,7 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
         # TODO: split up transcription by participant
         if isinstance(frame, TranscriptionFrame):
-            content = frame.data
+            content = frame.text
             self._sentence += content
             if self._sentence.endswith((".", "?", "!")):
                 if any(name in self._sentence for name in self._names):
diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py
index 223a35746..3f6d16bbc 100644
--- a/src/pipecat/frames/frames.py
+++ b/src/pipecat/frames/frames.py
@@ -4,214 +4,195 @@
 # SPDX-License-Identifier: BSD 2-Clause License
 #
 
-from typing import Any, List
+from typing import List, Tuple
+
+from dataclasses import dataclass, field
 
 from pipecat.utils.utils import obj_count, obj_id
 
 
+@dataclass
 class Frame:
-    def __init__(self, data=None):
+    id: int = field(init=False)
+    name: str = field(init=False)
+
+    def __post_init__(self):
         self.id: int = obj_id()
-        self.data: Any = data
-        self.metadata = {}
         self.name: str = f"{self.__class__.__name__}#{obj_count(self)}"
 
     def __str__(self):
         return self.name
 
 
+@dataclass
 class DataFrame(Frame):
-    def __init__(self, data):
-        super().__init__(data)
+    pass
 
 
+@dataclass
 class AudioRawFrame(DataFrame):
-    def __init__(self, data, sample_rate: int, num_channels: int):
-        super().__init__(data)
-        self.metadata["sample_rate"] = sample_rate
-        self.metadata["num_channels"] = num_channels
-        self.metadata["num_frames"] = int(len(data) / (num_channels * 2))
-
-    @property
-    def num_frames(self) -> int:
-        return self.metadata["num_frames"]
+    """A chunk of audio. Will be played by the transport if the transport's
+    microphone has been enabled.
 
-    @property
-    def sample_rate(self) -> int:
-        return self.metadata["sample_rate"]
+    """
+    audio: bytes
+    sample_rate: int
+    num_channels: int
 
-    @property
-    def num_channels(self) -> int:
-        return self.metadata["num_channels"]
+    def __post_init__(self):
+        super().__post_init__()
+        self.num_frames = int(len(self.audio) / (self.num_channels * 2))
 
     def __str__(self):
-        return f"{self.name}(frames: {self.num_frames}, sample_rate: {self.sample_rate}, channels: {self.num_channels})"
+        return f"{self.name}(size: {len(self.audio)}, frames: {self.num_frames}, sample_rate: {self.sample_rate}, channels: {self.num_channels})"
 
 
+@dataclass
 class ImageRawFrame(DataFrame):
-    def __init__(self, data, size: tuple[int, int], format: str):
-        super().__init__(data)
-        self.metadata["size"] = size
-        self.metadata["format"] = format
-
-    @property
-    def image(self) -> bytes:
-        return self.data
-
-    @property
-    def size(self) -> tuple[int, int]:
-        return self.metadata["size"]
+    """An image. Will be shown by the transport if the transport's camera is
+    enabled.
 
-    @property
-    def format(self) -> str:
-        return self.metadata["format"]
+    """
+    image: bytes
+    size: Tuple[int, int]
+    format: str
 
     def __str__(self):
         return f"{self.name}(size: {self.size}, format: {self.format})"
 
 
+@dataclass
 class URLImageRawFrame(ImageRawFrame):
-    def __init__(self, url: str, data, size: tuple[int, int], format: str):
-        super().__init__(data, size, format)
-        self.metadata["url"] = url
+    """An image with an associated URL. Will be shown by the transport if the
+    transport's camera is enabled.
 
-    @property
-    def url(self) -> str:
-        return self.metadata["url"]
+    """
+    url: str | None
 
     def __str__(self):
         return f"{self.name}(url: {self.url}, size: {self.size}, format: {self.format})"
 
 
+@dataclass
 class VisionImageRawFrame(ImageRawFrame):
-    def __init__(self, text: str, data, size: tuple[int, int], format: str):
-        super().__init__(data, size, format)
-        self.metadata["text"] = text
+    """An image with an associated text to ask for a description of it. Will be
+    shown by the transport if the transport's camera is enabled.
 
-    @property
-    def text(self) -> str:
-        return self.metadata["text"]
+    """
+    text: str | None
 
     def __str__(self):
         return f"{self.name}(text: {self.text}, size: {self.size}, format: {self.format})"
 
 
+@dataclass
 class UserImageRawFrame(ImageRawFrame):
-    def __init__(self, user_id: str, data, size: tuple[int, int], format: str):
-        super().__init__(data, size, format)
-        self.metadata["user_id"] = user_id
+    """An image associated with a user. Will be shown by the transport if the
+    transport's camera is enabled.
 
-    @property
-    def user_id(self) -> str:
-        return self.metadata["user_id"]
+    """
+    user_id: str
 
     def __str__(self):
         return f"{self.name}(user: {self.user_id}, size: {self.size}, format: {self.format})"
 
 
+@dataclass
 class SpriteFrame(Frame):
-    def __init__(self, data):
-        super().__init__(data)
+    """An animated sprite. Will be shown by the transport if the transport's
+    camera is enabled. Will play at the framerate specified in the transport's
+    `fps` constructor parameter.
 
-    @property
-    def images(self) -> List[ImageRawFrame]:
-        return self.data
+    """
+    images: List[ImageRawFrame]
 
     def __str__(self):
         return f"{self.name}(size: {len(self.images)})"
 
 
+@dataclass
 class TextFrame(DataFrame):
-    def __init__(self, data):
-        super().__init__(data)
+    """A chunk of text. Emitted by LLM services, consumed by TTS services, can
+    be used to send text through pipelines.
 
-    @property
-    def text(self) -> str:
-        return self.data
+    """
+    text: str
 
+    def __str__(self):
+        return f'{self.name}: "{self.text}"'
 
-class TranscriptionFrame(TextFrame):
-    def __init__(self, data, user_id: str, timestamp: int):
-        super().__init__(data)
-        self.metadata["user_id"] = user_id
-        self.metadata["timestamp"] = timestamp
 
-    @property
-    def user_id(self) -> str:
-        return self.metadata["user_id"]
+@dataclass
+class TranscriptionFrame(TextFrame):
+    """A text frame with transcription-specific data. Will be placed in the
+    transport's receive queue when a participant speaks.
 
-    @property
-    def timestamp(self) -> str:
-        return self.metadata["timestamp"]
+    """
+    user_id: str
+    timestamp: str
 
     def __str__(self):
         return f"{self.name}(user: {self.user_id}, timestamp: {self.timestamp})"
 
 
+@dataclass
 class InterimTranscriptionFrame(TextFrame):
-    def __init__(self, data, user_id: str, timestamp: int):
-        super().__init__(data)
-        self.metadata["user_id"] = user_id
-        self.metadata["timestamp"] = timestamp
-
-    @property
-    def user_id(self) -> str:
-        return self.metadata["user_id"]
-
-    @property
-    def timestamp(self) -> str:
-        return self.metadata["timestamp"]
+    """A text frame with interim transcription-specific data. Will be placed in
+    the transport's receive queue when a participant speaks."""
+    user_id: str
+    timestamp: str
 
     def __str__(self):
         return f"{self.name}(user: {self.user_id}, timestamp: {self.timestamp})"
 
 
+@dataclass
 class LLMMessagesFrame(DataFrame):
     """A frame containing a list of LLM messages. Used to signal that an LLM
-    service should run a chat completion and emit an LLM started response event,
-    text frames and an LLM stopped response event.
-    """
+    service should run a chat completion and emit an LLMResponseStartFrame,
+    TextFrames and an LLMResponseEndFrame. Note that the messages property on
+    this class is mutable, and will be updated by various ResponseAggregator
+    frame processors.
 
-    def __init__(self, messages):
-        super().__init__(messages)
+    """
+    messages: List[dict]
 
 
 #
 # App frames. Application user-defined frames.
 #
 
+@dataclass
 class AppFrame(Frame):
-    def __init__(self, data=None):
-        super().__init__(data)
-
+    pass
 
 #
 # System frames
 #
 
+
+@dataclass
 class SystemFrame(Frame):
-    def __init__(self, data=None):
-        super().__init__(data)
+    pass
 
 
+@dataclass
 class StartFrame(SystemFrame):
-    def __init__(self):
-        super().__init__()
+    """This is the first frame that should be pushed down a pipeline."""
+    pass
 
 
+@dataclass
 class CancelFrame(SystemFrame):
-    def __init__(self):
-        super().__init__()
+    """Indicates that a pipeline needs to stop right away."""
+    pass
 
 
+@dataclass
 class ErrorFrame(SystemFrame):
-    def __init__(self, data):
-        super().__init__(data)
-        self.metadata["error"] = data
-
-    @property
-    def error(self) -> str:
-        return self.metadata["error"]
+    """This is used to notify upstream that an error has occurred downstream
+    in the pipeline."""
+    error: str | None
 
     def __str__(self):
         return f"{self.name}(error: {self.error})"
@@ -221,247 +202,75 @@ def __str__(self):
 #
 
 
+@dataclass
 class ControlFrame(Frame):
-    def __init__(self, data=None):
-        super().__init__(data)
+    pass
 
 
+@dataclass
 class EndFrame(ControlFrame):
-    def __init__(self):
-        super().__init__()
+    """Indicates that a pipeline has ended and frame processors and pipelines
+    should be shut down. If the transport receives this frame, it will stop
+    sending frames to its output channel(s) and close all its threads. Note
+    that this is a control frame, which means it will be received in the order
+    it was sent (unlike system frames).
+
+    """
+    pass
 
 
+@dataclass
 class LLMResponseStartFrame(ControlFrame):
     """Used to indicate the beginning of an LLM response. Following TextFrames
     are part of the LLM response until an LLMResponseEndFrame"""
-
-    def __init__(self):
-        super().__init__()
+    pass
 
 
+@dataclass
 class LLMResponseEndFrame(ControlFrame):
     """Indicates the end of an LLM response."""
-
-    def __init__(self):
-        super().__init__()
+    pass
 
 
+@dataclass
 class UserStartedSpeakingFrame(ControlFrame):
-    def __init__(self):
-        super().__init__()
+    """Emitted by VAD to indicate that a user has started speaking. This can
+    be used for interruptions or other times when detecting that someone is
+    speaking is more important than knowing what they're saying (as you would
+    with a TranscriptionFrame).
+    """
+    pass
 
 
+@dataclass
 class UserStoppedSpeakingFrame(ControlFrame):
-    def __init__(self):
-        super().__init__()
+    """Emitted by the VAD to indicate that a user stopped speaking."""
+    pass
 
 
+@dataclass
 class TTSStartedFrame(ControlFrame):
-    def __init__(self):
-        super().__init__()
+    """Used to indicate the beginning of a TTS response. Following
+    AudioRawFrames are part of the TTS response until a TTSStoppedFrame. These
+    frames can be used for aggregating audio frames in a transport to optimize
+    the size of frames sent to the session, without needing to control this in
+    the TTS service.
+
+    """
+    pass
 
 
+@dataclass
 class TTSStoppedFrame(ControlFrame):
-    def __init__(self):
-        super().__init__()
+    """Indicates the end of a TTS response."""
+    pass
 
 
+@dataclass
 class UserImageRequestFrame(ControlFrame):
-    def __init__(self, user_id):
-        super().__init__()
-        self.metadata["user_id"] = user_id
-
-    @property
-    def user_id(self) -> str:
-        return self.metadata["user_id"]
+    """A frame used to request an image from the given user."""
+    user_id: str
 
     def __str__(self):
         return f"{self.name}, user: {self.user_id}"
-
-
-# class StartFrame(ControlFrame):
-#     """Used (but not required) to start a pipeline, and is also used to
-#     indicate that an interruption has ended and the transport should start
-#     processing frames again."""
-#     pass
-
-
-# class EndFrame(ControlFrame):
-#     """Indicates that a pipeline has ended and frame processors and pipelines
-#     should be shut down. If the transport receives this frame, it will stop
-#     sending frames to its output channel(s) and close all its threads."""
-#     pass
-
-
-# class EndPipeFrame(ControlFrame):
-#     """Indicates that a pipeline has ended but that the transport should
-#     continue processing. This frame is used in parallel pipelines and other
-#     sub-pipelines."""
-#     pass
-
-
-# class PipelineStartedFrame(ControlFrame):
-#     """
-#     Used by the transport to indicate that execution of a pipeline is starting
-#     (or restarting). It should be the first frame your app receives when it
-#     starts, or when an interruptible pipeline has been interrupted.
-#     """
-
-#     pass
-
-
-# @dataclass()
-# class URLImageFrame(ImageFrame):
-#     """An image with an associated URL. Will be shown by the transport if the
-#     transport's camera is enabled.
-
-#     """
-#     url: str | None
-
-#     def __init__(self, url, image, size):
-#         super().__init__(image, size)
-#         self.url = url
-
-#     def __str__(self):
-#         return f"{self.__class__.__name__}, url: {self.url}, image size:
-#         {self.size[0]}x{self.size[1]}, buffer size: {len(self.image)} B"
-
-
-# @dataclass()
-# class VisionImageFrame(ImageFrame):
-#     """An image with an associated text to ask for a description of it. Will be shown by the
-#     transport if the transport's camera is enabled.
-
-#     """
-#     text: str | None
-
-#     def __init__(self, text, image, size):
-#         super().__init__(image, size)
-#         self.text = text
-
-#     def __str__(self):
-#         return f"{self.__class__.__name__}, text: {self.text}, image size:
-#         {self.size[0]}x{self.size[1]}, buffer size: {len(self.image)} B"
-
-
-# @dataclass()
-# class UserImageFrame(ImageFrame):
-#     """An image associated to a user. Will be shown by the transport if the transport's camera is
-#     enabled."""
-#     user_id: str
-
-#     def __init__(self, user_id, image, size):
-#         super().__init__(image, size)
-#         self.user_id = user_id
-
-#     def __str__(self):
-#         return f"{self.__class__.__name__}, user: {self.user_id}, image size:
-#         {self.size[0]}x{self.size[1]}, buffer size: {len(self.image)} B"
-
-
-# @dataclass()
-# class UserImageRequestFrame(Frame):
-#     """A frame user to request an image from the given user."""
-#     user_id: str
-
-#     def __str__(self):
-#         return f"{self.__class__.__name__}, user: {self.user_id}"
-
-
-# @dataclass()
-# class SpriteFrame(Frame):
-#     """An animated sprite. Will be shown by the transport if the transport's
-#     camera is enabled. Will play at the framerate specified in the transport's
-#     `fps` constructor parameter."""
-#     images: list[bytes]
-
-#     def __str__(self):
-#         return f"{self.__class__.__name__}, list size: {len(self.images)}"
-
-
-# @dataclass()
-# class TextFrame(Frame):
-#     """A chunk of text. Emitted by LLM services, consumed by TTS services, can
-#     be used to send text through pipelines."""
-#     text: str
-
-#     def __str__(self):
-#         return f'{self.__class__.__name__}: "{self.text}"'
-
-
-# class TTSStartFrame(ControlFrame):
-#     """Used to indicate the beginning of a TTS response. Following AudioFrames
-#     are part of the TTS response until an TTEndFrame. These frames can be used
-#     for aggregating audio frames in a transport to optimize the size of frames
-#     sent to the session, without needing to control this in the TTS service."""
-#     pass
-
-
-# class TTSEndFrame(ControlFrame):
-#     """Indicates the end of a TTS response."""
-#     pass
-
-
-# @dataclass()
-# class LLMMessagesFrame(Frame):
-#     """A frame containing a list of LLM messages. Used to signal that an LLM
-#     service should run a chat completion and emit an LLMStartFrames, TextFrames
-#     and an LLMEndFrame.
-#     Note that the messages property on this class is mutable, and will be
-#     be updated by various ResponseAggregator frame processors."""
-#     messages: List[dict]
-
-
-# @dataclass()
-# class ReceivedAppMessageFrame(Frame):
-#     message: Any
-#     sender: str
-
-#     def __str__(self):
-#         return f"ReceivedAppMessageFrame: sender: {self.sender}, message: {self.message}"
-
-
-# @dataclass()
-# class SendAppMessageFrame(Frame):
-#     message: Any
-#     participant_id: str | None
-
-#     def __str__(self):
-#         return f"SendAppMessageFrame: participant: {self.participant_id}, message: {self.message}"
-
-
-# class UserStartedSpeakingFrame(Frame):
-#     """Emitted by VAD to indicate that a participant has started speaking.
-#     This can be used for interruptions or other times when detecting that
-#     someone is speaking is more important than knowing what they're saying
-#     (as you will with a TranscriptionFrame)"""
-#     pass
-
-
-# class UserStoppedSpeakingFrame(Frame):
-#     """Emitted by the VAD to indicate that a user stopped speaking."""
-#     pass
-
-
-# class BotStartedSpeakingFrame(Frame):
-#     pass
-
-
-# class BotStoppedSpeakingFrame(Frame):
-#     pass
-
-
-# @dataclass()
-# class LLMFunctionStartFrame(Frame):
-#     """Emitted when the LLM receives the beginning of a function call
-#     completion. A frame processor can use this frame to indicate that it should
-#     start preparing to make a function call, if it can do so in the absence of
-#     any arguments."""
-#     function_name: str
-
-
-# @dataclass()
-# class LLMFunctionCallFrame(Frame):
-#     """Emitted when the LLM has received an entire function call completion."""
-#     function_name: str
-#     arguments: str
diff --git a/src/pipecat/frames/openai_frames.py b/src/pipecat/frames/openai_frames.py
deleted file mode 100644
index e5e47b222..000000000
--- a/src/pipecat/frames/openai_frames.py
+++ /dev/null
@@ -1,15 +0,0 @@
-#
-# Copyright (c) 2024, Daily
-#
-# SPDX-License-Identifier: BSD 2-Clause License
-#
-
-from pipecat.frames.frames import Frame
-
-
-class OpenAILLMContextFrame(Frame):
-    """Like an LLMMessagesFrame, but with extra context specific to the
-    OpenAI API."""
-
-    def __init__(self, data):
-        super().__init__(data)
diff --git a/src/pipecat/processors/aggregators/llm_response.py b/src/pipecat/processors/aggregators/llm_response.py
index 9dc0e8862..c2ecd9385 100644
--- a/src/pipecat/processors/aggregators/llm_response.py
+++ b/src/pipecat/processors/aggregators/llm_response.py
@@ -26,8 +26,8 @@ def __init__(
             role: str,
             start_frame,
             end_frame,
-            accumulator_frame,
-            interim_accumulator_frame=None
+            accumulator_frame: TextFrame,
+            interim_accumulator_frame: TextFrame | None = None
     ):
         super().__init__()
 
@@ -86,7 +86,7 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
             send_aggregation = not self._aggregating
         elif isinstance(frame, self._accumulator_frame):
             if self._aggregating:
-                self._aggregation += f" {frame.data}"
+                self._aggregation += f" {frame.text}"
                 # We have recevied a complete sentence, so if we have seen the
                 # end frame and we were still aggregating, it means we should
                 # send the aggregation.
@@ -181,7 +181,7 @@ def __init__(self):
 
     async def process_frame(self, frame: Frame, direction: FrameDirection):
         if isinstance(frame, TextFrame):
-            self._aggregation += frame.data
+            self._aggregation += frame.text
         elif isinstance(frame, LLMResponseEndFrame):
             await self.push_frame(TextFrame(self._aggregation))
             await self.push_frame(frame)
diff --git a/src/pipecat/processors/aggregators/openai_llm_context.py b/src/pipecat/processors/aggregators/openai_llm_context.py
index 4e706de18..8aa827099 100644
--- a/src/pipecat/processors/aggregators/openai_llm_context.py
+++ b/src/pipecat/processors/aggregators/openai_llm_context.py
@@ -4,6 +4,8 @@
 # SPDX-License-Identifier: BSD 2-Clause License
 #
 
+from dataclasses import dataclass
+
 from typing import AsyncGenerator, Callable, List
 
 from pipecat.frames.frames import (
@@ -15,7 +17,6 @@
     UserStartedSpeakingFrame,
     UserStoppedSpeakingFrame,
 )
-from pipecat.frames.openai_frames import OpenAILLMContextFrame
 from pipecat.processors.frame_processor import FrameProcessor
 
 from openai._types import NOT_GIVEN, NotGiven
@@ -162,3 +163,13 @@ def __init__(self, context: OpenAILLMContext):
             accumulator_frame=TextFrame,
             pass_through=True,
         )
+
+
+@dataclass
+class OpenAILLMContextFrame(Frame):
+    """Like an LLMMessagesFrame, but with extra context specific to the OpenAI
+    API. The context in this message is also mutable, and will be changed by the
+    OpenAIContextAggregator frame processor.
+
+    """
+    context: OpenAILLMContext
diff --git a/src/pipecat/processors/aggregators/sentence.py b/src/pipecat/processors/aggregators/sentence.py
index cf5efdb5e..266c11e69 100644
--- a/src/pipecat/processors/aggregators/sentence.py
+++ b/src/pipecat/processors/aggregators/sentence.py
@@ -36,12 +36,12 @@ def __init__(self):
 
     async def process_frame(self, frame: Frame, direction: FrameDirection):
         if isinstance(frame, TextFrame):
-            m = re.search("(.*[?.!])(.*)", frame.data)
+            m = re.search("(.*[?.!])(.*)", frame.text)
             if m:
                 await self.push_frame(TextFrame(self._aggregation + m.group(1)))
                 self._aggregation = m.group(2)
             else:
-                self._aggregation += frame.data
+                self._aggregation += frame.text
         elif isinstance(frame, EndFrame):
             if self._aggregation:
                 await self.push_frame(TextFrame(self._aggregation))
diff --git a/src/pipecat/processors/aggregators/user_response.py b/src/pipecat/processors/aggregators/user_response.py
index 8804d5b8d..e112fd3e4 100644
--- a/src/pipecat/processors/aggregators/user_response.py
+++ b/src/pipecat/processors/aggregators/user_response.py
@@ -47,8 +47,8 @@ def __init__(
             *,
             start_frame,
             end_frame,
-            accumulator_frame,
-            interim_accumulator_frame=None
+            accumulator_frame: TextFrame,
+            interim_accumulator_frame: TextFrame | None = None
     ):
         super().__init__()
 
@@ -102,7 +102,7 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
             send_aggregation = not self._aggregating
         elif isinstance(frame, self._accumulator_frame):
             if self._aggregating:
-                self._aggregation += f" {frame.data}"
+                self._aggregation += f" {frame.text}"
                 # We have recevied a complete sentence, so if we have seen the
                 # end frame and we were still aggregating, it means we should
                 # send the aggregation.
diff --git a/src/pipecat/processors/aggregators/vision_image_frame.py b/src/pipecat/processors/aggregators/vision_image_frame.py
index c5350665f..45fd6756b 100644
--- a/src/pipecat/processors/aggregators/vision_image_frame.py
+++ b/src/pipecat/processors/aggregators/vision_image_frame.py
@@ -35,7 +35,10 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
         elif isinstance(frame, ImageRawFrame):
             if self._describe_text:
                 frame = VisionImageRawFrame(
-                    self._describe_text, frame.image, frame.size, frame.format)
+                    text=self._describe_text,
+                    image=frame.image,
+                    size=frame.size,
+                    format=frame.format)
                 await self.push_frame(frame)
                 self._describe_text = None
         else:
diff --git a/src/pipecat/processors/text_transformer.py b/src/pipecat/processors/text_transformer.py
index a71bb2f16..550b8dc1c 100644
--- a/src/pipecat/processors/text_transformer.py
+++ b/src/pipecat/processors/text_transformer.py
@@ -28,7 +28,7 @@ def __init__(self, transform_fn):
 
     async def process_frame(self, frame: Frame, direction: FrameDirection):
         if isinstance(frame, TextFrame):
-            result = self._transform_fn(frame.data)
+            result = self._transform_fn(frame.text)
             if isinstance(result, Coroutine):
                 result = await result
             await self.push_frame(result)
diff --git a/src/pipecat/processors/utils/audio.py b/src/pipecat/processors/utils/audio.py
index 0e657efd1..cb7c17052 100644
--- a/src/pipecat/processors/utils/audio.py
+++ b/src/pipecat/processors/utils/audio.py
@@ -12,10 +12,14 @@
 def maybe_split_audio_frame(frame: AudioRawFrame, largest_write_size: int) -> List[AudioRawFrame]:
     """Subdivide large audio frames to enable interruption."""
     frames: List[AudioRawFrame] = []
-    if len(frame.data) > largest_write_size:
-        for i in range(0, len(frame.data), largest_write_size):
-            chunk = frame.data[i: i + largest_write_size]
-            frames.append(AudioRawFrame(chunk, frame.sample_rate, frame.num_channels))
+    if len(frame.audio) > largest_write_size:
+        for i in range(0, len(frame.audio), largest_write_size):
+            chunk = frame.audio[i: i + largest_write_size]
+            frames.append(
+                AudioRawFrame(
+                    audio=chunk,
+                    sample_rate=frame.sample_rate,
+                    num_channels=frame.num_channels))
     else:
         frames.append(frame)
     return frames
diff --git a/src/pipecat/services/ai_services.py b/src/pipecat/services/ai_services.py
index f9946ced9..cd8d3e09a 100644
--- a/src/pipecat/services/ai_services.py
+++ b/src/pipecat/services/ai_services.py
@@ -46,14 +46,14 @@ async def run_tts(self, text: str):
         pass
 
     async def say(self, text: str):
-        await self.process_frame(TextFrame(text), FrameDirection.DOWNSTREAM)
+        await self.process_frame(TextFrame(text=text), FrameDirection.DOWNSTREAM)
 
     async def _process_text_frame(self, frame: TextFrame):
         text: str | None = None
         if not self._aggregate_sentences:
-            text = frame.data
+            text = frame.text
        else:
-            self._current_sentence += frame.data
+            self._current_sentence += frame.text
             if self._current_sentence.strip().endswith((".", "?", "!")):
                 text = self._current_sentence
                 self._current_sentence = ""
@@ -78,11 +78,13 @@ class STTService(AIService):
     def __init__(self,
                  min_rms: int = 400,
                  max_silence_frames: int = 3,
-                 sample_rate: int = 16000):
+                 sample_rate: int = 16000,
+                 num_channels: int = 1):
         super().__init__()
         self._min_rms = min_rms
         self._max_silence_frames = max_silence_frames
         self._sample_rate = sample_rate
+        self._num_channels = num_channels
         self._current_silence_frames = 0
         (self._content, self._wave) = self._new_wave()
@@ -94,8 +96,8 @@ async def run_stt(self, audio: BinaryIO):
     def _new_wave(self):
         content = io.BufferedRandom(io.BytesIO())
         ww = wave.open(content, "wb")
-        ww.setnchannels(1)
         ww.setsampwidth(2)
+        ww.setnchannels(self._num_channels)
         ww.setframerate(self._sample_rate)
         return (content, ww)
@@ -113,14 +115,14 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
             await self.push_frame(frame, direction)
             return
 
-        data = frame.data
+        audio = frame.audio
 
         # Try to filter out empty background noise
         # (Very rudimentary approach, can be improved)
-        rms = self._get_volume(data)
+        rms = self._get_volume(audio)
         if rms >= self._min_rms:
             # If volume is high enough, write new data to wave file
-            self._wave.writeframesraw(data)
+            self._wave.writeframes(audio)
 
         # If buffer is not empty and we detect a 3-frame pause in speech,
         # transcribe the audio gathered so far.
@@ -146,7 +148,7 @@ async def run_image_gen(self, prompt: str):
 
     async def process_frame(self, frame: Frame, direction: FrameDirection):
         if isinstance(frame, TextFrame):
-            await self.run_image_gen(frame.data)
+            await self.run_image_gen(frame.text)
         else:
             await self.push_frame(frame, direction)
diff --git a/src/pipecat/services/deepgram.py b/src/pipecat/services/deepgram.py
index 42de548a1..6c1884037 100644
--- a/src/pipecat/services/deepgram.py
+++ b/src/pipecat/services/deepgram.py
@@ -32,5 +32,5 @@ async def run_tts(self, text: str):
         body = {"text": text}
         async with self._aiohttp_session.post(request_url, headers=headers, json=body) as r:
             async for data in r.content:
-                frame = AudioRawFrame(data, 16000, 1)
+                frame = AudioRawFrame(audio=data, sample_rate=16000, num_channels=1)
                 await self.push_frame(frame)
diff --git a/src/pipecat/services/fal.py b/src/pipecat/services/fal.py
index ca58f0337..da0af2b89 100644
--- a/src/pipecat/services/fal.py
+++ b/src/pipecat/services/fal.py
@@ -75,5 +75,9 @@ async def run_image_gen(self, prompt: str):
 
         image_stream = io.BytesIO(await response.content.read())
         image = Image.open(image_stream)
-        frame = URLImageRawFrame(image_url, image.tobytes(), image.size, image.format)
+        frame = URLImageRawFrame(
+            url=image_url,
+            image=image.tobytes(),
+            size=image.size,
+            format=image.format)
         await self.push_frame(frame)
diff --git a/src/pipecat/services/moondream.py b/src/pipecat/services/moondream.py
index e069c98ed..6922f2f1c 100644
--- a/src/pipecat/services/moondream.py
+++ b/src/pipecat/services/moondream.py
@@ -69,7 +69,7 @@ async def run_vision(self, frame: VisionImageRawFrame):
         logger.debug(f"Analyzing image: {frame}")
 
         def get_image_description(frame: VisionImageRawFrame):
-            image = Image.frombytes(frame.format, (frame.size[0], frame.size[1]), frame.data)
+            image = Image.frombytes(frame.format, (frame.size[0], frame.size[1]), frame.image)
             image_embeds = self._model.encode_image(image)
             description = self._model.answer_question(
                 image_embeds=image_embeds,
@@ -79,4 +79,4 @@ def get_image_description(frame: VisionImageRawFrame):
 
         description = await asyncio.to_thread(get_image_description, frame)
 
-        await self.push_frame(TextFrame(description))
+        await self.push_frame(TextFrame(text=description))
diff --git a/src/pipecat/services/openai.py b/src/pipecat/services/openai.py
index 50b8b1478..7fa88e16e 100644
--- a/src/pipecat/services/openai.py
+++ b/src/pipecat/services/openai.py
@@ -14,8 +14,7 @@
     TextFrame,
     URLImageRawFrame
 )
-from pipecat.frames.openai_frames import OpenAILLMContextFrame
-from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
+from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext, OpenAILLMContextFrame
 from pipecat.processors.frame_processor import FrameDirection
 from pipecat.services.ai_services import LLMService, ImageGenService
 
@@ -137,9 +136,9 @@ async def _process_context(self, context: OpenAILLMContext):
 
     async def process_frame(self, frame: Frame, direction: FrameDirection):
         context = None
         if isinstance(frame, OpenAILLMContextFrame):
-            context: OpenAILLMContext = frame.data
+            context: OpenAILLMContext = frame.context
         elif isinstance(frame, LLMMessagesFrame):
-            context = OpenAILLMContext.from_messages(frame.data)
+            context = OpenAILLMContext.from_messages(frame.messages)
         else:
             await self.push_frame(frame, direction)
diff --git a/src/pipecat/services/whisper.py b/src/pipecat/services/whisper.py
index e0d14e903..e43bf0b65 100644
--- a/src/pipecat/services/whisper.py
+++ b/src/pipecat/services/whisper.py
@@ -54,11 +54,10 @@ def _load(self):
         """Loads the Whisper model. Note that if this is the first time
         this model is being run, it will take time to download."""
         logger.debug("Loading Whisper model...")
-        model = WhisperModel(
+        self._model = WhisperModel(
             self._model_name.value,
             device=self._device,
             compute_type=self._compute_type)
-        self._model = model
         logger.debug("Loaded Whisper model")
 
     async def run_stt(self, audio: BinaryIO):
diff --git a/src/pipecat/transports/base_input.py b/src/pipecat/transports/base_input.py
index 0ae2e7508..c21d58e23 100644
--- a/src/pipecat/transports/base_input.py
+++ b/src/pipecat/transports/base_input.py
@@ -108,7 +108,10 @@ def _audio_in_thread_handler(self):
             try:
                 audio_frames = self.read_raw_audio_frames(num_frames)
                 if len(audio_frames) > 0:
-                    frame = AudioRawFrame(audio_frames, sample_rate, num_channels)
+                    frame = AudioRawFrame(
+                        audio=audio_frames,
+                        sample_rate=sample_rate,
+                        num_channels=num_channels)
                     self._audio_in_queue.put(frame)
             except BaseException as e:
                 logger.error(f"Error reading audio frames: {e}")
@@ -124,7 +127,7 @@ def _audio_out_thread_handler(self):
             # Check VAD and push event if necessary. We just care about changes
             # from QUIET to SPEAKING and vice versa.
             if self._params.vad_enabled:
-                vad_state = self._handle_vad(frame.data, vad_state)
+                vad_state = self._handle_vad(frame.audio, vad_state)
                 audio_passthrough = self._params.vad_audio_passthrough
 
             # Push audio downstream if passthrough.
diff --git a/src/pipecat/transports/base_output.py b/src/pipecat/transports/base_output.py
index f104e36af..396e69233 100644
--- a/src/pipecat/transports/base_output.py
+++ b/src/pipecat/transports/base_output.py
@@ -115,7 +115,7 @@ def _sink_thread_handler(self):
                     future.result()
                 elif isinstance(frame, AudioRawFrame):
                     if self._params.audio_out_enabled:
-                        buffer.extend(frame.data)
+                        buffer.extend(frame.audio)
                         buffer = self._send_audio_truncated(buffer, bytes_size_10ms)
                 elif isinstance(frame, ImageRawFrame) and self._params.camera_out_enabled:
                     self._set_camera_image(frame)
diff --git a/src/pipecat/transports/local/tk.py b/src/pipecat/transports/local/tk.py
index 3d5ea9650..5a32e85d7 100644
--- a/src/pipecat/transports/local/tk.py
+++ b/src/pipecat/transports/local/tk.py
@@ -92,7 +92,7 @@ async def cleanup(self):
     async def _write_frame_to_tk(self, frame: ImageRawFrame):
         width = frame.size[0]
         height = frame.size[1]
-        data = f"P6 {width} {height} 255 ".encode() + frame.data
+        data = f"P6 {width} {height} 255 ".encode() + frame.image
         photo = tk.PhotoImage(
             width=width,
             height=height,
diff --git a/src/pipecat/transports/services/daily.py b/src/pipecat/transports/services/daily.py
index 0343c53d8..eb94a592a 100644
--- a/src/pipecat/transports/services/daily.py
+++ b/src/pipecat/transports/services/daily.py
@@ -176,7 +176,7 @@ def write_raw_audio_frames(self, frames: bytes):
         self._mic.write_frames(frames)
 
     def write_frame_to_camera(self, frame: ImageRawFrame):
-        self._camera.write_frame(frame.data)
+        self._camera.write_frame(frame.image)
 
     async def join(self):
         # Transport already joined, ignore.
@@ -498,7 +498,11 @@ def _on_participant_video_frame(self, participant_id: str, buffer, size, format)
                 render_frame = True
 
             if render_frame:
-                frame = UserImageRawFrame(participant_id, buffer, size, format)
+                frame = UserImageRawFrame(
+                    user_id=participant_id,
+                    image=buffer,
+                    size=size,
+                    format=format)
                 self._camera_in_queue.put(frame)
 
             self._video_renderers[participant_id]["timestamp"] = curr_time
diff --git a/src/pipecat/vad/silero.py b/src/pipecat/vad/silero.py
index f2438b085..cddfb9bf6 100644
--- a/src/pipecat/vad/silero.py
+++ b/src/pipecat/vad/silero.py
@@ -89,7 +89,7 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
 
     async def _analyze_audio(self, frame: AudioRawFrame):
         # Check VAD and push event if necessary. We just care about changes
         # from QUIET to SPEAKING and vice versa.
-        new_vad_state = self.analyze_audio(frame.data)
+        new_vad_state = self.analyze_audio(frame.audio)
         if new_vad_state != self._processor_vad_state and new_vad_state != VADState.STARTING and new_vad_state != VADState.STOPPING:
             new_frame = None
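
For reviewers who want to try the refactored frame API interactively, here is a minimal, non-authoritative sketch of how frames are constructed after this change. It only uses field names defined in src/pipecat/frames/frames.py above (audio, sample_rate, num_channels, text); the exact object counter in the printed name depends on runtime state and is shown only as an example.

# Minimal sketch: frames are now plain dataclasses built with keyword arguments
# instead of the old positional `data` plus `metadata` dict.
from pipecat.frames.frames import AudioRawFrame, TextFrame

audio_frame = AudioRawFrame(audio=b"\x00\x00" * 160, sample_rate=16000, num_channels=1)
print(audio_frame)             # e.g. AudioRawFrame#0(size: 320, frames: 160, sample_rate: 16000, channels: 1)
print(audio_frame.num_frames)  # 160, computed in __post_init__ as len(audio) / (num_channels * 2)

text_frame = TextFrame(text="Hello!")
print(text_frame.text)         # fields are real attributes now, not metadata["..."] entries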