From 7e39d9ad3d8ee7b38cc32911cb8e97b64a42a8c1 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?=
Date: Thu, 19 Sep 2024 19:31:56 -0700
Subject: [PATCH] introduce input/output audio and image frames

We now distinguish between input and output audio and image frames. We
introduce `InputAudioRawFrame`, `OutputAudioRawFrame`, `InputImageRawFrame`
and `OutputImageRawFrame` (plus further subclasses of these). Input frames
usually come from an input transport and are meant to be processed inside
the pipeline to generate new frames, but they are never sent through an
output transport. Output frames can also be processed by any frame
processor in the pipeline and, in addition, may be sent by an output
transport.
---
 CHANGELOG.md                                  |   9 ++
 examples/dialin-chatbot/requirements.txt      |   2 +-
 .../05a-local-sync-speech-and-image.py        |  12 +-
 examples/foundational/06a-image-sync.py       |  13 +-
 examples/foundational/09-mirror.py            |  25 +++-
 examples/foundational/09a-local-mirror.py     |  23 +++-
 examples/foundational/11-sound-effects.py     |   6 +-
 examples/moondream-chatbot/bot.py             |  11 +-
 examples/moondream-chatbot/requirements.txt   |   2 +-
 examples/patient-intake/bot.py                |   7 +-
 examples/patient-intake/requirements.txt      |   2 +-
 examples/simple-chatbot/bot.py                |  12 +-
 examples/simple-chatbot/requirements.txt      |   2 +-
 .../storytelling-chatbot/requirements.txt     |   2 +-
 .../storytelling-chatbot/src/utils/helpers.py |  11 +-
 examples/twilio-chatbot/README.md             |   2 +-
 examples/twilio-chatbot/bot.py                | 118 +++++++++---------
 examples/twilio-chatbot/requirements.txt      |   2 +-
 examples/websocket-server/bot.py              | 106 ++++++++--------
 examples/websocket-server/frames.proto        |   1 +
 examples/websocket-server/requirements.txt    |   2 +-
 src/pipecat/frames/frames.proto               |   1 +
 src/pipecat/frames/frames.py                  |  66 +++++++---
 src/pipecat/frames/protobufs/frames_pb2.py    |  12 +-
 .../processors/aggregators/llm_response.py    |   6 +-
 .../aggregators/openai_llm_context.py         |   6 +-
 .../aggregators/vision_image_frame.py         |  12 +-
 .../processors/gstreamer/pipeline_source.py   |  12 +-
 src/pipecat/serializers/livekit.py            |  13 +-
 src/pipecat/serializers/protobuf.py           |  24 ++--
 src/pipecat/serializers/twilio.py             |   9 +-
 src/pipecat/services/ai_services.py           |   3 +-
 src/pipecat/services/azure.py                 |   4 +-
 src/pipecat/services/cartesia.py              |   6 +-
 src/pipecat/services/deepgram.py              |   5 +-
 src/pipecat/services/elevenlabs.py            |   4 +-
 src/pipecat/services/lmnt.py                  |   4 +-
 src/pipecat/services/openai.py                |   4 +-
 src/pipecat/services/playht.py                |   8 +-
 src/pipecat/services/together.py              |  12 +-
 src/pipecat/services/xtts.py                  |   6 +-
 src/pipecat/transports/base_input.py          |   6 +-
 src/pipecat/transports/base_output.py         |  39 +++---
 src/pipecat/transports/local/audio.py         |   8 +-
 src/pipecat/transports/local/tk.py            |  13 +-
 .../transports/network/fastapi_websocket.py   |  18 ++-
 .../transports/network/websocket_server.py    |   9 +-
 src/pipecat/transports/services/daily.py      |  20 +--
 48 files changed, 425 insertions(+), 275 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 985834aba..1a89a97b8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -63,6 +63,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ### Changed
 
+- We now distinguish between input and output audio and image frames. We
+  introduce `InputAudioRawFrame`, `OutputAudioRawFrame`, `InputImageRawFrame`
+  and `OutputImageRawFrame` (and other subclasses of those). The input frames
+  usually come from an input transport and are meant to be processed inside the
+  pipeline to generate new frames. 
However, the input frames will not be sent + through an output transport. The output frames can also be processed by any + frame processor in the pipeline and they are allowed to be sent by the output + transport. + - `ParallelTask` has been renamed to `SyncParallelPipeline`. A `SyncParallelPipeline` is a frame processor that contains a list of different pipelines to be executed concurrently. The difference between a diff --git a/examples/dialin-chatbot/requirements.txt b/examples/dialin-chatbot/requirements.txt index e59a9c3d2..1e15004b1 100644 --- a/examples/dialin-chatbot/requirements.txt +++ b/examples/dialin-chatbot/requirements.txt @@ -1,4 +1,4 @@ -pipecat-ai[daily,openai,silero] +pipecat-ai[daily,elevenlabs,openai,silero] fastapi uvicorn python-dotenv diff --git a/examples/foundational/05a-local-sync-speech-and-image.py b/examples/foundational/05a-local-sync-speech-and-image.py index 66a6e7f57..d9a0e792e 100644 --- a/examples/foundational/05a-local-sync-speech-and-image.py +++ b/examples/foundational/05a-local-sync-speech-and-image.py @@ -11,7 +11,13 @@ import tkinter as tk -from pipecat.frames.frames import AudioRawFrame, Frame, URLImageRawFrame, LLMMessagesFrame, TextFrame +from pipecat.frames.frames import ( + Frame, + OutputAudioRawFrame, + TTSAudioRawFrame, + URLImageRawFrame, + LLMMessagesFrame, + TextFrame) from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.sync_parallel_pipeline import SyncParallelPipeline @@ -65,9 +71,9 @@ def __init__(self): async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) - if isinstance(frame, AudioRawFrame): + if isinstance(frame, TTSAudioRawFrame): self.audio.extend(frame.audio) - self.frame = AudioRawFrame( + self.frame = OutputAudioRawFrame( bytes(self.audio), frame.sample_rate, frame.num_channels) class ImageGrabber(FrameProcessor): diff --git a/examples/foundational/06a-image-sync.py b/examples/foundational/06a-image-sync.py index 6b3e58cf7..db48b709e 100644 --- a/examples/foundational/06a-image-sync.py +++ b/examples/foundational/06a-image-sync.py @@ -11,7 +11,7 @@ from PIL import Image -from pipecat.frames.frames import ImageRawFrame, Frame, SystemFrame, TextFrame +from pipecat.frames.frames import Frame, OutputImageRawFrame, SystemFrame, TextFrame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineTask @@ -52,9 +52,16 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) if not isinstance(frame, SystemFrame) and direction == FrameDirection.DOWNSTREAM: - await self.push_frame(ImageRawFrame(image=self._speaking_image_bytes, size=(1024, 1024), format=self._speaking_image_format)) + await self.push_frame(OutputImageRawFrame( + image=self._speaking_image_bytes, + size=(1024, 1024), + format=self._speaking_image_format) + ) await self.push_frame(frame) - await self.push_frame(ImageRawFrame(image=self._waiting_image_bytes, size=(1024, 1024), format=self._waiting_image_format)) + await self.push_frame(OutputImageRawFrame( + image=self._waiting_image_bytes, + size=(1024, 1024), + format=self._waiting_image_format)) else: await self.push_frame(frame) diff --git a/examples/foundational/09-mirror.py b/examples/foundational/09-mirror.py index 8f5f1073b..bb6253deb 100644 --- a/examples/foundational/09-mirror.py +++ b/examples/foundational/09-mirror.py @@ -8,9 +8,11 @@ 
import asyncio import sys +from pipecat.frames.frames import Frame, InputAudioRawFrame, InputImageRawFrame, OutputAudioRawFrame, OutputImageRawFrame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineTask +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.transports.services.daily import DailyTransport, DailyParams from runner import configure @@ -24,6 +26,27 @@ logger.add(sys.stderr, level="DEBUG") +class MirrorProcessor(FrameProcessor): + + async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + + if isinstance(frame, InputAudioRawFrame): + await self.push_frame(OutputAudioRawFrame( + audio=frame.audio, + sample_rate=frame.sample_rate, + num_channels=frame.num_channels) + ) + elif isinstance(frame, InputImageRawFrame): + await self.push_frame(OutputImageRawFrame( + image=frame.image, + size=frame.size, + format=frame.format) + ) + else: + await self.push_frame(frame, direction) + + async def main(): async with aiohttp.ClientSession() as session: (room_url, token) = await configure(session) @@ -44,7 +67,7 @@ async def main(): async def on_first_participant_joined(transport, participant): transport.capture_participant_video(participant["id"]) - pipeline = Pipeline([transport.input(), transport.output()]) + pipeline = Pipeline([transport.input(), MirrorProcessor(), transport.output()]) runner = PipelineRunner() diff --git a/examples/foundational/09a-local-mirror.py b/examples/foundational/09a-local-mirror.py index d657a3631..afc77470d 100644 --- a/examples/foundational/09a-local-mirror.py +++ b/examples/foundational/09a-local-mirror.py @@ -10,9 +10,11 @@ import tkinter as tk +from pipecat.frames.frames import Frame, InputAudioRawFrame, InputImageRawFrame, OutputAudioRawFrame, OutputImageRawFrame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineTask +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.transports.base_transport import TransportParams from pipecat.transports.local.tk import TkLocalTransport from pipecat.transports.services.daily import DailyParams, DailyTransport @@ -27,6 +29,25 @@ logger.remove(0) logger.add(sys.stderr, level="DEBUG") +class MirrorProcessor(FrameProcessor): + + async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + + if isinstance(frame, InputAudioRawFrame): + await self.push_frame(OutputAudioRawFrame( + audio=frame.audio, + sample_rate=frame.sample_rate, + num_channels=frame.num_channels) + ) + elif isinstance(frame, InputImageRawFrame): + await self.push_frame(OutputImageRawFrame( + image=frame.image, + size=frame.size, + format=frame.format) + ) + else: + await self.push_frame(frame, direction) async def main(): async with aiohttp.ClientSession() as session: @@ -52,7 +73,7 @@ async def main(): async def on_first_participant_joined(transport, participant): transport.capture_participant_video(participant["id"]) - pipeline = Pipeline([daily_transport.input(), tk_transport.output()]) + pipeline = Pipeline([daily_transport.input(), MirrorProcessor(), tk_transport.output()]) task = PipelineTask(pipeline) diff --git a/examples/foundational/11-sound-effects.py b/examples/foundational/11-sound-effects.py index 9dc4dc99b..21b03bedf 100644 --- 
a/examples/foundational/11-sound-effects.py +++ b/examples/foundational/11-sound-effects.py @@ -12,9 +12,9 @@ from pipecat.frames.frames import ( Frame, - AudioRawFrame, LLMFullResponseEndFrame, LLMMessagesFrame, + OutputAudioRawFrame, ) from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner @@ -53,8 +53,8 @@ filename = os.path.splitext(os.path.basename(full_path))[0] # Open the image and convert it to bytes with wave.open(full_path) as audio_file: - sounds[file] = AudioRawFrame(audio_file.readframes(-1), - audio_file.getframerate(), audio_file.getnchannels()) + sounds[file] = OutputAudioRawFrame(audio_file.readframes(-1), + audio_file.getframerate(), audio_file.getnchannels()) class OutboundSoundEffectWrapper(FrameProcessor): diff --git a/examples/moondream-chatbot/bot.py b/examples/moondream-chatbot/bot.py index 32bbf7e6b..d14a5f016 100644 --- a/examples/moondream-chatbot/bot.py +++ b/examples/moondream-chatbot/bot.py @@ -13,10 +13,11 @@ from pipecat.frames.frames import ( ImageRawFrame, + OutputImageRawFrame, SpriteFrame, Frame, LLMMessagesFrame, - AudioRawFrame, + TTSAudioRawFrame, TTSStoppedFrame, TextFrame, UserImageRawFrame, @@ -59,7 +60,11 @@ # Get the filename without the extension to use as the dictionary key # Open the image and convert it to bytes with Image.open(full_path) as img: - sprites.append(ImageRawFrame(image=img.tobytes(), size=img.size, format=img.format)) + sprites.append(OutputImageRawFrame( + image=img.tobytes(), + size=img.size, + format=img.format) + ) flipped = sprites[::-1] sprites.extend(flipped) @@ -82,7 +87,7 @@ def __init__(self): async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) - if isinstance(frame, AudioRawFrame): + if isinstance(frame, TTSAudioRawFrame): if not self._is_talking: await self.push_frame(talking_frame) self._is_talking = True diff --git a/examples/moondream-chatbot/requirements.txt b/examples/moondream-chatbot/requirements.txt index 11132e136..08fd27cb7 100644 --- a/examples/moondream-chatbot/requirements.txt +++ b/examples/moondream-chatbot/requirements.txt @@ -1,4 +1,4 @@ python-dotenv fastapi[all] uvicorn -pipecat-ai[daily,moondream,openai,silero] +pipecat-ai[daily,cartesia,moondream,openai,silero] diff --git a/examples/patient-intake/bot.py b/examples/patient-intake/bot.py index 7dc404c52..33ca9e26d 100644 --- a/examples/patient-intake/bot.py +++ b/examples/patient-intake/bot.py @@ -10,7 +10,7 @@ import sys import wave -from pipecat.frames.frames import AudioRawFrame +from pipecat.frames.frames import OutputAudioRawFrame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask @@ -49,8 +49,9 @@ filename = os.path.splitext(os.path.basename(full_path))[0] # Open the sound and convert it to bytes with wave.open(full_path) as audio_file: - sounds[file] = AudioRawFrame(audio_file.readframes(-1), - audio_file.getframerate(), audio_file.getnchannels()) + sounds[file] = OutputAudioRawFrame(audio_file.readframes(-1), + audio_file.getframerate(), + audio_file.getnchannels()) class IntakeProcessor: diff --git a/examples/patient-intake/requirements.txt b/examples/patient-intake/requirements.txt index a7a8729df..e8bfcd8e4 100644 --- a/examples/patient-intake/requirements.txt +++ b/examples/patient-intake/requirements.txt @@ -1,4 +1,4 @@ python-dotenv fastapi[all] uvicorn -pipecat-ai[daily,openai,silero] 
+pipecat-ai[daily,cartesia,openai,silero] diff --git a/examples/simple-chatbot/bot.py b/examples/simple-chatbot/bot.py index 1664e47fb..f179dfeb5 100644 --- a/examples/simple-chatbot/bot.py +++ b/examples/simple-chatbot/bot.py @@ -16,11 +16,11 @@ from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.llm_response import LLMAssistantResponseAggregator, LLMUserResponseAggregator from pipecat.frames.frames import ( - AudioRawFrame, - ImageRawFrame, + OutputImageRawFrame, SpriteFrame, Frame, LLMMessagesFrame, + TTSAudioRawFrame, TTSStoppedFrame ) from pipecat.processors.frame_processor import FrameDirection, FrameProcessor @@ -49,7 +49,11 @@ # Get the filename without the extension to use as the dictionary key # Open the image and convert it to bytes with Image.open(full_path) as img: - sprites.append(ImageRawFrame(image=img.tobytes(), size=img.size, format=img.format)) + sprites.append(OutputImageRawFrame( + image=img.tobytes(), + size=img.size, + format=img.format) + ) flipped = sprites[::-1] sprites.extend(flipped) @@ -72,7 +76,7 @@ def __init__(self): async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) - if isinstance(frame, AudioRawFrame): + if isinstance(frame, TTSAudioRawFrame): if not self._is_talking: await self.push_frame(talking_frame) self._is_talking = True diff --git a/examples/simple-chatbot/requirements.txt b/examples/simple-chatbot/requirements.txt index a7a8729df..a4e6aa1db 100644 --- a/examples/simple-chatbot/requirements.txt +++ b/examples/simple-chatbot/requirements.txt @@ -1,4 +1,4 @@ python-dotenv fastapi[all] uvicorn -pipecat-ai[daily,openai,silero] +pipecat-ai[daily,elevenlabs,openai,silero] diff --git a/examples/storytelling-chatbot/requirements.txt b/examples/storytelling-chatbot/requirements.txt index 663f78a76..0cebe6edb 100644 --- a/examples/storytelling-chatbot/requirements.txt +++ b/examples/storytelling-chatbot/requirements.txt @@ -2,4 +2,4 @@ async_timeout fastapi uvicorn python-dotenv -pipecat-ai[daily,openai,fal] +pipecat-ai[daily,elevenlabs,openai,fal] diff --git a/examples/storytelling-chatbot/src/utils/helpers.py b/examples/storytelling-chatbot/src/utils/helpers.py index 2c576fdff..743a04c97 100644 --- a/examples/storytelling-chatbot/src/utils/helpers.py +++ b/examples/storytelling-chatbot/src/utils/helpers.py @@ -2,7 +2,7 @@ import wave from PIL import Image -from pipecat.frames.frames import AudioRawFrame, ImageRawFrame +from pipecat.frames.frames import OutputAudioRawFrame, OutputImageRawFrame script_dir = os.path.dirname(__file__) @@ -16,7 +16,8 @@ def load_images(image_files): filename = os.path.splitext(os.path.basename(full_path))[0] # Open the image and convert it to bytes with Image.open(full_path) as img: - images[filename] = ImageRawFrame(image=img.tobytes(), size=img.size, format=img.format) + images[filename] = OutputImageRawFrame( + image=img.tobytes(), size=img.size, format=img.format) return images @@ -30,8 +31,8 @@ def load_sounds(sound_files): filename = os.path.splitext(os.path.basename(full_path))[0] # Open the sound and convert it to bytes with wave.open(full_path) as audio_file: - sounds[filename] = AudioRawFrame(audio=audio_file.readframes(-1), - sample_rate=audio_file.getframerate(), - num_channels=audio_file.getnchannels()) + sounds[filename] = OutputAudioRawFrame(audio=audio_file.readframes(-1), + sample_rate=audio_file.getframerate(), + num_channels=audio_file.getnchannels()) return sounds diff --git 
a/examples/twilio-chatbot/README.md b/examples/twilio-chatbot/README.md index 5d5d2385a..fdea359f9 100644 --- a/examples/twilio-chatbot/README.md +++ b/examples/twilio-chatbot/README.md @@ -55,7 +55,7 @@ This project is a FastAPI-based chatbot that integrates with Twilio to handle We 2. **Update the Twilio Webhook**: Copy the ngrok URL and update your Twilio phone number webhook URL to `http:///start_call`. -3. **Update the streams.xml**: +3. **Update streams.xml**: Copy the ngrok URL and update templates/streams.xml with `wss:///ws`. ## Running the Application diff --git a/examples/twilio-chatbot/bot.py b/examples/twilio-chatbot/bot.py index b7376e084..5b83139f9 100644 --- a/examples/twilio-chatbot/bot.py +++ b/examples/twilio-chatbot/bot.py @@ -1,4 +1,3 @@ -import aiohttp import os import sys @@ -27,63 +26,62 @@ async def run_bot(websocket_client, stream_sid): - async with aiohttp.ClientSession() as session: - transport = FastAPIWebsocketTransport( - websocket=websocket_client, - params=FastAPIWebsocketParams( - audio_out_enabled=True, - add_wav_header=False, - vad_enabled=True, - vad_analyzer=SileroVADAnalyzer(), - vad_audio_passthrough=True, - serializer=TwilioFrameSerializer(stream_sid) - ) + transport = FastAPIWebsocketTransport( + websocket=websocket_client, + params=FastAPIWebsocketParams( + audio_out_enabled=True, + add_wav_header=False, + vad_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + vad_audio_passthrough=True, + serializer=TwilioFrameSerializer(stream_sid) ) - - llm = OpenAILLMService( - api_key=os.getenv("OPENAI_API_KEY"), - model="gpt-4o") - - stt = DeepgramSTTService(api_key=os.getenv('DEEPGRAM_API_KEY')) - - tts = CartesiaTTSService( - api_key=os.getenv("CARTESIA_API_KEY"), - voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady - ) - - messages = [ - { - "role": "system", - "content": "You are a helpful LLM in an audio call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.", - }, - ] - - tma_in = LLMUserResponseAggregator(messages) - tma_out = LLMAssistantResponseAggregator(messages) - - pipeline = Pipeline([ - transport.input(), # Websocket input from client - stt, # Speech-To-Text - tma_in, # User responses - llm, # LLM - tts, # Text-To-Speech - transport.output(), # Websocket output to client - tma_out # LLM responses - ]) - - task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True)) - - @transport.event_handler("on_client_connected") - async def on_client_connected(transport, client): - # Kick off the conversation. - messages.append( - {"role": "system", "content": "Please introduce yourself to the user."}) - await task.queue_frames([LLMMessagesFrame(messages)]) - - @transport.event_handler("on_client_disconnected") - async def on_client_disconnected(transport, client): - await task.queue_frames([EndFrame()]) - - runner = PipelineRunner(handle_sigint=False) - - await runner.run(task) + ) + + llm = OpenAILLMService( + api_key=os.getenv("OPENAI_API_KEY"), + model="gpt-4o") + + stt = DeepgramSTTService(api_key=os.getenv('DEEPGRAM_API_KEY')) + + tts = CartesiaTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady + ) + + messages = [ + { + "role": "system", + "content": "You are a helpful LLM in an audio call. Your goal is to demonstrate your capabilities in a succinct way. 
Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.", + }, + ] + + tma_in = LLMUserResponseAggregator(messages) + tma_out = LLMAssistantResponseAggregator(messages) + + pipeline = Pipeline([ + transport.input(), # Websocket input from client + stt, # Speech-To-Text + tma_in, # User responses + llm, # LLM + tts, # Text-To-Speech + transport.output(), # Websocket output to client + tma_out # LLM responses + ]) + + task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True)) + + @transport.event_handler("on_client_connected") + async def on_client_connected(transport, client): + # Kick off the conversation. + messages.append( + {"role": "system", "content": "Please introduce yourself to the user."}) + await task.queue_frames([LLMMessagesFrame(messages)]) + + @transport.event_handler("on_client_disconnected") + async def on_client_disconnected(transport, client): + await task.queue_frames([EndFrame()]) + + runner = PipelineRunner(handle_sigint=False) + + await runner.run(task) diff --git a/examples/twilio-chatbot/requirements.txt b/examples/twilio-chatbot/requirements.txt index f0456fcd5..eefaca888 100644 --- a/examples/twilio-chatbot/requirements.txt +++ b/examples/twilio-chatbot/requirements.txt @@ -1,4 +1,4 @@ -pipecat-ai[daily,openai,silero,deepgram] +pipecat-ai[daily,cartesia,openai,silero,deepgram] fastapi uvicorn python-dotenv diff --git a/examples/websocket-server/bot.py b/examples/websocket-server/bot.py index 29a99614f..61d285fa8 100644 --- a/examples/websocket-server/bot.py +++ b/examples/websocket-server/bot.py @@ -4,7 +4,6 @@ # SPDX-License-Identifier: BSD 2-Clause License # -import aiohttp import asyncio import os import sys @@ -33,60 +32,59 @@ async def main(): - async with aiohttp.ClientSession() as session: - transport = WebsocketServerTransport( - params=WebsocketServerParams( - audio_out_enabled=True, - add_wav_header=True, - vad_enabled=True, - vad_analyzer=SileroVADAnalyzer(), - vad_audio_passthrough=True - ) + transport = WebsocketServerTransport( + params=WebsocketServerParams( + audio_out_enabled=True, + add_wav_header=True, + vad_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + vad_audio_passthrough=True ) - - llm = OpenAILLMService( - api_key=os.getenv("OPENAI_API_KEY"), - model="gpt-4o") - - stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) - - tts = CartesiaTTSService( - api_key=os.getenv("CARTESIA_API_KEY"), - voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady - ) - - messages = [ - { - "role": "system", - "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.", - }, - ] - - tma_in = LLMUserResponseAggregator(messages) - tma_out = LLMAssistantResponseAggregator(messages) - - pipeline = Pipeline([ - transport.input(), # Websocket input from client - stt, # Speech-To-Text - tma_in, # User responses - llm, # LLM - tts, # Text-To-Speech - transport.output(), # Websocket output to client - tma_out # LLM responses - ]) - - task = PipelineTask(pipeline) - - @transport.event_handler("on_client_connected") - async def on_client_connected(transport, client): - # Kick off the conversation. 
- messages.append( - {"role": "system", "content": "Please introduce yourself to the user."}) - await task.queue_frames([LLMMessagesFrame(messages)]) - - runner = PipelineRunner() - - await runner.run(task) + ) + + llm = OpenAILLMService( + api_key=os.getenv("OPENAI_API_KEY"), + model="gpt-4o") + + stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) + + tts = CartesiaTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady + ) + + messages = [ + { + "role": "system", + "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.", + }, + ] + + tma_in = LLMUserResponseAggregator(messages) + tma_out = LLMAssistantResponseAggregator(messages) + + pipeline = Pipeline([ + transport.input(), # Websocket input from client + stt, # Speech-To-Text + tma_in, # User responses + llm, # LLM + tts, # Text-To-Speech + transport.output(), # Websocket output to client + tma_out # LLM responses + ]) + + task = PipelineTask(pipeline) + + @transport.event_handler("on_client_connected") + async def on_client_connected(transport, client): + # Kick off the conversation. + messages.append( + {"role": "system", "content": "Please introduce yourself to the user."}) + await task.queue_frames([LLMMessagesFrame(messages)]) + + runner = PipelineRunner() + + await runner.run(task) if __name__ == "__main__": asyncio.run(main()) diff --git a/examples/websocket-server/frames.proto b/examples/websocket-server/frames.proto index 5c5d81d4d..4c58d2a34 100644 --- a/examples/websocket-server/frames.proto +++ b/examples/websocket-server/frames.proto @@ -24,6 +24,7 @@ message AudioRawFrame { bytes audio = 3; uint32 sample_rate = 4; uint32 num_channels = 5; + optional uint64 pts = 6; } message TranscriptionFrame { diff --git a/examples/websocket-server/requirements.txt b/examples/websocket-server/requirements.txt index 77e5b9e91..0815c6b8a 100644 --- a/examples/websocket-server/requirements.txt +++ b/examples/websocket-server/requirements.txt @@ -1,2 +1,2 @@ python-dotenv -pipecat-ai[openai,silero,websocket,whisper] +pipecat-ai[cartesia,openai,silero,websocket,whisper] diff --git a/src/pipecat/frames/frames.proto b/src/pipecat/frames/frames.proto index 5c5d81d4d..4c58d2a34 100644 --- a/src/pipecat/frames/frames.proto +++ b/src/pipecat/frames/frames.proto @@ -24,6 +24,7 @@ message AudioRawFrame { bytes audio = 3; uint32 sample_rate = 4; uint32 num_channels = 5; + optional uint64 pts = 6; } message TranscriptionFrame { diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py index 4d207fecd..d45487150 100644 --- a/src/pipecat/frames/frames.py +++ b/src/pipecat/frames/frames.py @@ -41,10 +41,7 @@ class DataFrame(Frame): @dataclass class AudioRawFrame(DataFrame): - """A chunk of audio. Will be played by the transport if the transport's - microphone has been enabled. - - """ + """A chunk of audio.""" audio: bytes sample_rate: int num_channels: int @@ -58,6 +55,31 @@ def __str__(self): return f"{self.name}(pts: {pts}, size: {len(self.audio)}, frames: {self.num_frames}, sample_rate: {self.sample_rate}, channels: {self.num_channels})" +@dataclass +class InputAudioRawFrame(AudioRawFrame): + """A chunk of audio usually coming from an input transport. + + """ + pass + + +@dataclass +class OutputAudioRawFrame(AudioRawFrame): + """A chunk of audio. 
Will be played by the output transport if the + transport's microphone has been enabled. + + """ + pass + + +@dataclass +class TTSAudioRawFrame(OutputAudioRawFrame): + """A chunk of output audio generated by a TTS service. + + """ + pass + + @dataclass class ImageRawFrame(DataFrame): """An image. Will be shown by the transport if the transport's camera is @@ -74,20 +96,30 @@ def __str__(self): @dataclass -class URLImageRawFrame(ImageRawFrame): - """An image with an associated URL. Will be shown by the transport if the +class InputImageRawFrame(ImageRawFrame): + pass + + +@dataclass +class OutputImageRawFrame(ImageRawFrame): + pass + + +@dataclass +class UserImageRawFrame(InputImageRawFrame): + """An image associated to a user. Will be shown by the transport if the transport's camera is enabled. """ - url: str | None + user_id: str def __str__(self): pts = format_pts(self.pts) - return f"{self.name}(pts: {pts}, url: {self.url}, size: {self.size}, format: {self.format})" + return f"{self.name}(pts: {pts}, user: {self.user_id}, size: {self.size}, format: {self.format})" @dataclass -class VisionImageRawFrame(ImageRawFrame): +class VisionImageRawFrame(InputImageRawFrame): """An image with an associated text to ask for a description of it. Will be shown by the transport if the transport's camera is enabled. @@ -100,16 +132,16 @@ def __str__(self): @dataclass -class UserImageRawFrame(ImageRawFrame): - """An image associated to a user. Will be shown by the transport if the +class URLImageRawFrame(OutputImageRawFrame): + """An image with an associated URL. Will be shown by the transport if the transport's camera is enabled. """ - user_id: str + url: str | None def __str__(self): pts = format_pts(self.pts) - return f"{self.name}(pts: {pts}, user: {self.user_id}, size: {self.size}, format: {self.format})" + return f"{self.name}(pts: {pts}, url: {self.url}, size: {self.size}, format: {self.format})" @dataclass @@ -420,10 +452,10 @@ class BotSpeakingFrame(ControlFrame): @dataclass class TTSStartedFrame(ControlFrame): """Used to indicate the beginning of a TTS response. Following - AudioRawFrames are part of the TTS response until an TTSStoppedFrame. These - frames can be used for aggregating audio frames in a transport to optimize - the size of frames sent to the session, without needing to control this in - the TTS service. + TTSAudioRawFrames are part of the TTS response until an + TTSStoppedFrame. These frames can be used for aggregating audio frames in a + transport to optimize the size of frames sent to the session, without + needing to control this in the TTS service. 
""" pass diff --git a/src/pipecat/frames/protobufs/frames_pb2.py b/src/pipecat/frames/protobufs/frames_pb2.py index 5040efc97..d58bc8baa 100644 --- a/src/pipecat/frames/protobufs/frames_pb2.py +++ b/src/pipecat/frames/protobufs/frames_pb2.py @@ -14,7 +14,7 @@ -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0c\x66rames.proto\x12\x07pipecat\"3\n\tTextFrame\x12\n\n\x02id\x18\x01 \x01(\x04\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\x0c\n\x04text\x18\x03 \x01(\t\"c\n\rAudioRawFrame\x12\n\n\x02id\x18\x01 \x01(\x04\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\r\n\x05\x61udio\x18\x03 \x01(\x0c\x12\x13\n\x0bsample_rate\x18\x04 \x01(\r\x12\x14\n\x0cnum_channels\x18\x05 \x01(\r\"`\n\x12TranscriptionFrame\x12\n\n\x02id\x18\x01 \x01(\x04\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\x0c\n\x04text\x18\x03 \x01(\t\x12\x0f\n\x07user_id\x18\x04 \x01(\t\x12\x11\n\ttimestamp\x18\x05 \x01(\t\"\x93\x01\n\x05\x46rame\x12\"\n\x04text\x18\x01 \x01(\x0b\x32\x12.pipecat.TextFrameH\x00\x12\'\n\x05\x61udio\x18\x02 \x01(\x0b\x32\x16.pipecat.AudioRawFrameH\x00\x12\x34\n\rtranscription\x18\x03 \x01(\x0b\x32\x1b.pipecat.TranscriptionFrameH\x00\x42\x07\n\x05\x66rameb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0c\x66rames.proto\x12\x07pipecat\"3\n\tTextFrame\x12\n\n\x02id\x18\x01 \x01(\x04\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\x0c\n\x04text\x18\x03 \x01(\t\"}\n\rAudioRawFrame\x12\n\n\x02id\x18\x01 \x01(\x04\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\r\n\x05\x61udio\x18\x03 \x01(\x0c\x12\x13\n\x0bsample_rate\x18\x04 \x01(\r\x12\x14\n\x0cnum_channels\x18\x05 \x01(\r\x12\x10\n\x03pts\x18\x06 \x01(\x04H\x00\x88\x01\x01\x42\x06\n\x04_pts\"`\n\x12TranscriptionFrame\x12\n\n\x02id\x18\x01 \x01(\x04\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\x0c\n\x04text\x18\x03 \x01(\t\x12\x0f\n\x07user_id\x18\x04 \x01(\t\x12\x11\n\ttimestamp\x18\x05 \x01(\t\"\x93\x01\n\x05\x46rame\x12\"\n\x04text\x18\x01 \x01(\x0b\x32\x12.pipecat.TextFrameH\x00\x12\'\n\x05\x61udio\x18\x02 \x01(\x0b\x32\x16.pipecat.AudioRawFrameH\x00\x12\x34\n\rtranscription\x18\x03 \x01(\x0b\x32\x1b.pipecat.TranscriptionFrameH\x00\x42\x07\n\x05\x66rameb\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -24,9 +24,9 @@ _globals['_TEXTFRAME']._serialized_start=25 _globals['_TEXTFRAME']._serialized_end=76 _globals['_AUDIORAWFRAME']._serialized_start=78 - _globals['_AUDIORAWFRAME']._serialized_end=177 - _globals['_TRANSCRIPTIONFRAME']._serialized_start=179 - _globals['_TRANSCRIPTIONFRAME']._serialized_end=275 - _globals['_FRAME']._serialized_start=278 - _globals['_FRAME']._serialized_end=425 + _globals['_AUDIORAWFRAME']._serialized_end=203 + _globals['_TRANSCRIPTIONFRAME']._serialized_start=205 + _globals['_TRANSCRIPTIONFRAME']._serialized_end=301 + _globals['_FRAME']._serialized_start=304 + _globals['_FRAME']._serialized_end=451 # @@protoc_insertion_point(module_scope) diff --git a/src/pipecat/processors/aggregators/llm_response.py b/src/pipecat/processors/aggregators/llm_response.py index 379394120..13920c59b 100644 --- a/src/pipecat/processors/aggregators/llm_response.py +++ b/src/pipecat/processors/aggregators/llm_response.py @@ -4,7 +4,7 @@ # SPDX-License-Identifier: BSD 2-Clause License # -from typing import List +from typing import List, Type from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContextFrame, OpenAILLMContext @@ -34,8 +34,8 @@ def __init__( role: str, start_frame, end_frame, - accumulator_frame: TextFrame, - interim_accumulator_frame: TextFrame | None = None, + 
accumulator_frame: Type[TextFrame], + interim_accumulator_frame: Type[TextFrame] | None = None, handle_interruptions: bool = False ): super().__init__() diff --git a/src/pipecat/processors/aggregators/openai_llm_context.py b/src/pipecat/processors/aggregators/openai_llm_context.py index 0d8b19a36..3d1acf32e 100644 --- a/src/pipecat/processors/aggregators/openai_llm_context.py +++ b/src/pipecat/processors/aggregators/openai_llm_context.py @@ -13,7 +13,11 @@ from PIL import Image -from pipecat.frames.frames import Frame, VisionImageRawFrame, FunctionCallInProgressFrame, FunctionCallResultFrame +from pipecat.frames.frames import ( + Frame, + VisionImageRawFrame, + FunctionCallInProgressFrame, + FunctionCallResultFrame) from pipecat.processors.frame_processor import FrameProcessor from loguru import logger diff --git a/src/pipecat/processors/aggregators/vision_image_frame.py b/src/pipecat/processors/aggregators/vision_image_frame.py index 0bbb10841..97f6b5ec8 100644 --- a/src/pipecat/processors/aggregators/vision_image_frame.py +++ b/src/pipecat/processors/aggregators/vision_image_frame.py @@ -4,13 +4,19 @@ # SPDX-License-Identifier: BSD 2-Clause License # -from pipecat.frames.frames import Frame, ImageRawFrame, TextFrame, VisionImageRawFrame +from pipecat.frames.frames import ( + Frame, + InputImageRawFrame, + TextFrame, + VisionImageRawFrame +) from pipecat.processors.frame_processor import FrameDirection, FrameProcessor class VisionImageFrameAggregator(FrameProcessor): """This aggregator waits for a consecutive TextFrame and an - ImageFrame. After the ImageFrame arrives it will output a VisionImageFrame. + InputImageRawFrame. After the InputImageRawFrame arrives it will output a + VisionImageRawFrame. >>> from pipecat.frames.frames import ImageFrame @@ -34,7 +40,7 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): if isinstance(frame, TextFrame): self._describe_text = frame.text - elif isinstance(frame, ImageRawFrame): + elif isinstance(frame, InputImageRawFrame): if self._describe_text: frame = VisionImageRawFrame( text=self._describe_text, diff --git a/src/pipecat/processors/gstreamer/pipeline_source.py b/src/pipecat/processors/gstreamer/pipeline_source.py index 8d46105a7..f852dd641 100644 --- a/src/pipecat/processors/gstreamer/pipeline_source.py +++ b/src/pipecat/processors/gstreamer/pipeline_source.py @@ -9,11 +9,11 @@ from pydantic import BaseModel from pipecat.frames.frames import ( - AudioRawFrame, CancelFrame, EndFrame, Frame, - ImageRawFrame, + OutputAudioRawFrame, + OutputImageRawFrame, StartFrame, SystemFrame) from pipecat.processors.frame_processor import FrameDirection, FrameProcessor @@ -182,9 +182,9 @@ def _decodebin_video(self, pad: Gst.Pad): def _appsink_audio_new_sample(self, appsink: GstApp.AppSink): buffer = appsink.pull_sample().get_buffer() (_, info) = buffer.map(Gst.MapFlags.READ) - frame = AudioRawFrame(audio=info.data, - sample_rate=self._out_params.audio_sample_rate, - num_channels=self._out_params.audio_channels) + frame = OutputAudioRawFrame(audio=info.data, + sample_rate=self._out_params.audio_sample_rate, + num_channels=self._out_params.audio_channels) asyncio.run_coroutine_threadsafe(self.push_frame(frame), self.get_event_loop()) buffer.unmap(info) return Gst.FlowReturn.OK @@ -192,7 +192,7 @@ def _appsink_audio_new_sample(self, appsink: GstApp.AppSink): def _appsink_video_new_sample(self, appsink: GstApp.AppSink): buffer = appsink.pull_sample().get_buffer() (_, info) = buffer.map(Gst.MapFlags.READ) - frame = ImageRawFrame( + frame = 
OutputImageRawFrame( image=info.data, size=(self._out_params.video_width, self._out_params.video_height), format="RGB") diff --git a/src/pipecat/serializers/livekit.py b/src/pipecat/serializers/livekit.py index 7a0e8afd1..fec5243f5 100644 --- a/src/pipecat/serializers/livekit.py +++ b/src/pipecat/serializers/livekit.py @@ -7,7 +7,10 @@ import ctypes import pickle -from pipecat.frames.frames import AudioRawFrame, Frame +from pipecat.frames.frames import ( + Frame, + InputAudioRawFrame, + OutputAudioRawFrame) from pipecat.serializers.base_serializer import FrameSerializer from loguru import logger @@ -22,12 +25,8 @@ class LivekitFrameSerializer(FrameSerializer): - SERIALIZABLE_TYPES = { - AudioRawFrame: "audio", - } - def serialize(self, frame: Frame) -> str | bytes | None: - if not isinstance(frame, AudioRawFrame): + if not isinstance(frame, OutputAudioRawFrame): return None audio_frame = AudioFrame( data=frame.audio, @@ -39,7 +38,7 @@ def serialize(self, frame: Frame) -> str | bytes | None: def deserialize(self, data: str | bytes) -> Frame | None: audio_frame: AudioFrame = pickle.loads(data)['frame'] - return AudioRawFrame( + return InputAudioRawFrame( audio=bytes(audio_frame.data), sample_rate=audio_frame.sample_rate, num_channels=audio_frame.num_channels, diff --git a/src/pipecat/serializers/protobuf.py b/src/pipecat/serializers/protobuf.py index 0a6dee0b1..6ae1b0c03 100644 --- a/src/pipecat/serializers/protobuf.py +++ b/src/pipecat/serializers/protobuf.py @@ -8,7 +8,11 @@ import pipecat.frames.protobufs.frames_pb2 as frame_protos -from pipecat.frames.frames import AudioRawFrame, Frame, TextFrame, TranscriptionFrame +from pipecat.frames.frames import ( + AudioRawFrame, + Frame, + TextFrame, + TranscriptionFrame) from pipecat.serializers.base_serializer import FrameSerializer from loguru import logger @@ -29,14 +33,15 @@ def __init__(self): def serialize(self, frame: Frame) -> str | bytes | None: proto_frame = frame_protos.Frame() if type(frame) not in self.SERIALIZABLE_TYPES: - raise ValueError( - f"Frame type {type(frame)} is not serializable. You may need to add it to ProtobufFrameSerializer.SERIALIZABLE_FIELDS.") + logger.warning(f"Frame type {type(frame)} is not serializable") + return None # ignoring linter errors; we check that type(frame) is in this dict above proto_optional_name = self.SERIALIZABLE_TYPES[type(frame)] # type: ignore for field in dataclasses.fields(frame): # type: ignore - setattr(getattr(proto_frame, proto_optional_name), field.name, - getattr(frame, field.name)) + value = getattr(frame, field.name) + if value: + setattr(getattr(proto_frame, proto_optional_name), field.name, value) result = proto_frame.SerializeToString() return result @@ -48,8 +53,8 @@ def deserialize(self, data: str | bytes) -> Frame | None: >>> serializer = ProtobufFrameSerializer() >>> serializer.deserialize( - ... serializer.serialize(AudioFrame(data=b'1234567890'))) - AudioFrame(data=b'1234567890') + ... serializer.serialize(OutputAudioFrame(data=b'1234567890'))) + InputAudioFrame(data=b'1234567890') >>> serializer.deserialize( ... 
serializer.serialize(TextFrame(text='hello world'))) @@ -75,10 +80,13 @@ def deserialize(self, data: str | bytes) -> Frame | None: # Remove special fields if needed id = getattr(args, "id") name = getattr(args, "name") + pts = getattr(args, "pts") if not id: del args_dict["id"] if not name: del args_dict["name"] + if not pts: + del args_dict["pts"] # Create the instance instance = class_name(**args_dict) @@ -88,5 +96,7 @@ def deserialize(self, data: str | bytes) -> Frame | None: setattr(instance, "id", getattr(args, "id")) if name: setattr(instance, "name", getattr(args, "name")) + if pts: + setattr(instance, "pts", getattr(args, "pts")) return instance diff --git a/src/pipecat/serializers/twilio.py b/src/pipecat/serializers/twilio.py index 583234ae4..ed2905a40 100644 --- a/src/pipecat/serializers/twilio.py +++ b/src/pipecat/serializers/twilio.py @@ -9,7 +9,10 @@ from pydantic import BaseModel -from pipecat.frames.frames import AudioRawFrame, Frame, StartInterruptionFrame +from pipecat.frames.frames import ( + AudioRawFrame, + Frame, + StartInterruptionFrame) from pipecat.serializers.base_serializer import FrameSerializer from pipecat.utils.audio import ulaw_to_pcm, pcm_to_ulaw @@ -19,10 +22,6 @@ class InputParams(BaseModel): twilio_sample_rate: int = 8000 sample_rate: int = 16000 - SERIALIZABLE_TYPES = { - AudioRawFrame: "audio", - } - def __init__(self, stream_sid: str, params: InputParams = InputParams()): self._stream_sid = stream_sid self._params = params diff --git a/src/pipecat/services/ai_services.py b/src/pipecat/services/ai_services.py index d91782e42..c00064e5d 100644 --- a/src/pipecat/services/ai_services.py +++ b/src/pipecat/services/ai_services.py @@ -22,6 +22,7 @@ STTModelUpdateFrame, StartFrame, StartInterruptionFrame, + TTSAudioRawFrame, TTSLanguageUpdateFrame, TTSModelUpdateFrame, TTSSpeakFrame, @@ -277,7 +278,7 @@ async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirect if self._push_stop_frames and ( isinstance(frame, StartInterruptionFrame) or isinstance(frame, TTSStartedFrame) or - isinstance(frame, AudioRawFrame) or + isinstance(frame, TTSAudioRawFrame) or isinstance(frame, TTSStoppedFrame)): await self._stop_frame_queue.put(frame) diff --git a/src/pipecat/services/azure.py b/src/pipecat/services/azure.py index 90674fcc4..2c01fff3c 100644 --- a/src/pipecat/services/azure.py +++ b/src/pipecat/services/azure.py @@ -12,12 +12,12 @@ from typing import AsyncGenerator from pipecat.frames.frames import ( - AudioRawFrame, CancelFrame, EndFrame, ErrorFrame, Frame, StartFrame, + TTSAudioRawFrame, TTSStartedFrame, TTSStoppedFrame, TranscriptionFrame, @@ -115,7 +115,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: await self.stop_ttfb_metrics() await self.push_frame(TTSStartedFrame()) # Azure always sends a 44-byte header. Strip it off. 
- yield AudioRawFrame(audio=result.audio_data[44:], sample_rate=self._sample_rate, num_channels=1) + yield TTSAudioRawFrame(audio=result.audio_data[44:], sample_rate=self._sample_rate, num_channels=1) await self.push_frame(TTSStoppedFrame()) elif result.reason == ResultReason.Canceled: cancellation_details = result.cancellation_details diff --git a/src/pipecat/services/cartesia.py b/src/pipecat/services/cartesia.py index 078926235..ac0a1b85b 100644 --- a/src/pipecat/services/cartesia.py +++ b/src/pipecat/services/cartesia.py @@ -15,10 +15,10 @@ CancelFrame, ErrorFrame, Frame, - AudioRawFrame, StartInterruptionFrame, StartFrame, EndFrame, + TTSAudioRawFrame, TTSStartedFrame, TTSStoppedFrame, LLMFullResponseEndFrame @@ -201,7 +201,7 @@ async def _receive_task_handler(self): elif msg["type"] == "chunk": await self.stop_ttfb_metrics() self.start_word_timestamps() - frame = AudioRawFrame( + frame = TTSAudioRawFrame( audio=base64.b64decode(msg["data"]), sample_rate=self._output_format["sample_rate"], num_channels=1 @@ -326,7 +326,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: await self.stop_ttfb_metrics() - frame = AudioRawFrame( + frame = TTSAudioRawFrame( audio=output["audio"], sample_rate=self._output_format["sample_rate"], num_channels=1 diff --git a/src/pipecat/services/deepgram.py b/src/pipecat/services/deepgram.py index 708c3c511..5ce17d6ee 100644 --- a/src/pipecat/services/deepgram.py +++ b/src/pipecat/services/deepgram.py @@ -9,13 +9,13 @@ from typing import AsyncGenerator from pipecat.frames.frames import ( - AudioRawFrame, CancelFrame, EndFrame, ErrorFrame, Frame, InterimTranscriptionFrame, StartFrame, + TTSAudioRawFrame, TTSStartedFrame, TTSStoppedFrame, TranscriptionFrame) @@ -101,7 +101,8 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: await self.push_frame(TTSStartedFrame()) async for data in r.content: await self.stop_ttfb_metrics() - frame = AudioRawFrame(audio=data, sample_rate=self._sample_rate, num_channels=1) + frame = TTSAudioRawFrame( + audio=data, sample_rate=self._sample_rate, num_channels=1) yield frame await self.push_frame(TTSStoppedFrame()) except Exception as e: diff --git a/src/pipecat/services/elevenlabs.py b/src/pipecat/services/elevenlabs.py index 081a6bf5d..f78eba266 100644 --- a/src/pipecat/services/elevenlabs.py +++ b/src/pipecat/services/elevenlabs.py @@ -12,12 +12,12 @@ from pydantic import BaseModel from pipecat.frames.frames import ( - AudioRawFrame, CancelFrame, EndFrame, Frame, StartFrame, StartInterruptionFrame, + TTSAudioRawFrame, TTSStartedFrame, TTSStoppedFrame) from pipecat.processors.frame_processor import FrameDirection @@ -209,7 +209,7 @@ async def _receive_task_handler(self): self.start_word_timestamps() audio = base64.b64decode(msg["audio"]) - frame = AudioRawFrame(audio, self.sample_rate, 1) + frame = TTSAudioRawFrame(audio, self.sample_rate, 1) await self.push_frame(frame) if msg.get("alignment"): diff --git a/src/pipecat/services/lmnt.py b/src/pipecat/services/lmnt.py index 60f0cb7df..9285f1583 100644 --- a/src/pipecat/services/lmnt.py +++ b/src/pipecat/services/lmnt.py @@ -10,13 +10,13 @@ from pipecat.processors.frame_processor import FrameDirection from pipecat.frames.frames import ( - AudioRawFrame, CancelFrame, EndFrame, ErrorFrame, Frame, StartFrame, StartInterruptionFrame, + TTSAudioRawFrame, TTSStartedFrame, TTSStoppedFrame, ) @@ -126,7 +126,7 @@ async def _receive_task_handler(self): await self.push_error(ErrorFrame(f'{self} error: {msg["error"]}')) elif "audio" in msg: await 
self.stop_ttfb_metrics() - frame = AudioRawFrame( + frame = TTSAudioRawFrame( audio=msg["audio"], sample_rate=self._output_format["sample_rate"], num_channels=1 diff --git a/src/pipecat/services/openai.py b/src/pipecat/services/openai.py index 7483e2eb5..8a1355916 100644 --- a/src/pipecat/services/openai.py +++ b/src/pipecat/services/openai.py @@ -17,13 +17,13 @@ from PIL import Image from pipecat.frames.frames import ( - AudioRawFrame, ErrorFrame, Frame, LLMFullResponseEndFrame, LLMFullResponseStartFrame, LLMMessagesFrame, LLMModelUpdateFrame, + TTSAudioRawFrame, TTSStartedFrame, TTSStoppedFrame, TextFrame, @@ -365,7 +365,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: async for chunk in r.iter_bytes(8192): if len(chunk) > 0: await self.stop_ttfb_metrics() - frame = AudioRawFrame(chunk, self.sample_rate, 1) + frame = TTSAudioRawFrame(chunk, self.sample_rate, 1) yield frame await self.push_frame(TTSStoppedFrame()) except BadRequestError as e: diff --git a/src/pipecat/services/playht.py b/src/pipecat/services/playht.py index c3200fee9..ae8606e91 100644 --- a/src/pipecat/services/playht.py +++ b/src/pipecat/services/playht.py @@ -9,7 +9,11 @@ from typing import AsyncGenerator -from pipecat.frames.frames import AudioRawFrame, Frame, TTSStartedFrame, TTSStoppedFrame +from pipecat.frames.frames import ( + Frame, + TTSAudioRawFrame, + TTSStartedFrame, + TTSStoppedFrame) from pipecat.services.ai_services import TTSService from loguru import logger @@ -91,7 +95,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: else: if len(chunk): await self.stop_ttfb_metrics() - frame = AudioRawFrame(chunk, 16000, 1) + frame = TTSAudioRawFrame(chunk, 16000, 1) yield frame await self.push_frame(TTSStoppedFrame()) except Exception as e: diff --git a/src/pipecat/services/together.py b/src/pipecat/services/together.py index 49759cb01..dfd4c2966 100644 --- a/src/pipecat/services/together.py +++ b/src/pipecat/services/together.py @@ -4,23 +4,19 @@ # SPDX-License-Identifier: BSD 2-Clause License # -import base64 import json -import io -import copy -from typing import List, Optional -from dataclasses import dataclass -from asyncio import CancelledError import re import uuid +from typing import List +from dataclasses import dataclass +from asyncio import CancelledError + from pipecat.frames.frames import ( Frame, LLMModelUpdateFrame, TextFrame, - VisionImageRawFrame, UserImageRequestFrame, - UserImageRawFrame, LLMMessagesFrame, LLMFullResponseStartFrame, LLMFullResponseEndFrame, diff --git a/src/pipecat/services/xtts.py b/src/pipecat/services/xtts.py index 38f0f9a64..69b754f55 100644 --- a/src/pipecat/services/xtts.py +++ b/src/pipecat/services/xtts.py @@ -9,10 +9,10 @@ from typing import Any, AsyncGenerator, Dict from pipecat.frames.frames import ( - AudioRawFrame, ErrorFrame, Frame, StartFrame, + TTSAudioRawFrame, TTSStartedFrame, TTSStoppedFrame) from pipecat.services.ai_services import TTSService @@ -128,7 +128,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: # Convert the numpy array back to bytes resampled_audio_bytes = resampled_audio.astype(np.int16).tobytes() # Create the frame with the resampled audio - frame = AudioRawFrame(resampled_audio_bytes, 16000, 1) + frame = TTSAudioRawFrame(resampled_audio_bytes, 16000, 1) yield frame # Process any remaining data in the buffer @@ -136,7 +136,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: audio_np = np.frombuffer(buffer, dtype=np.int16) resampled_audio = 
resampy.resample(audio_np, 24000, 16000)
             resampled_audio_bytes = resampled_audio.astype(np.int16).tobytes()
-            frame = AudioRawFrame(resampled_audio_bytes, 16000, 1)
+            frame = TTSAudioRawFrame(resampled_audio_bytes, 16000, 1)
             yield frame

         await self.push_frame(TTSStoppedFrame())
diff --git a/src/pipecat/transports/base_input.py b/src/pipecat/transports/base_input.py
index de1ec8884..4e398e779 100644
--- a/src/pipecat/transports/base_input.py
+++ b/src/pipecat/transports/base_input.py
@@ -10,9 +10,9 @@
 from pipecat.processors.frame_processor import FrameDirection, FrameProcessor

 from pipecat.frames.frames import (
-    AudioRawFrame,
     BotInterruptionFrame,
     CancelFrame,
+    InputAudioRawFrame,
     StartFrame,
     EndFrame,
     Frame,
@@ -59,7 +59,7 @@ async def cancel(self, frame: CancelFrame):
     def vad_analyzer(self) -> VADAnalyzer | None:
         return self._params.vad_analyzer

-    async def push_audio_frame(self, frame: AudioRawFrame):
+    async def push_audio_frame(self, frame: InputAudioRawFrame):
         if self._params.audio_in_enabled or self._params.vad_enabled:
             await self._audio_in_queue.put(frame)
@@ -151,7 +151,7 @@ async def _audio_task_handler(self):
         vad_state: VADState = VADState.QUIET
         while True:
             try:
-                frame: AudioRawFrame = await self._audio_in_queue.get()
+                frame: InputAudioRawFrame = await self._audio_in_queue.get()

                 audio_passthrough = True
diff --git a/src/pipecat/transports/base_output.py b/src/pipecat/transports/base_output.py
index 9b1b9c29e..263bb64f4 100644
--- a/src/pipecat/transports/base_output.py
+++ b/src/pipecat/transports/base_output.py
@@ -15,17 +15,17 @@
 from pipecat.processors.frame_processor import FrameDirection, FrameProcessor

 from pipecat.frames.frames import (
-    AudioRawFrame,
     BotSpeakingFrame,
     BotStartedSpeakingFrame,
     BotStoppedSpeakingFrame,
     CancelFrame,
     MetricsFrame,
+    OutputAudioRawFrame,
+    OutputImageRawFrame,
     SpriteFrame,
     StartFrame,
     EndFrame,
     Frame,
-    ImageRawFrame,
     StartInterruptionFrame,
     StopInterruptionFrame,
     SystemFrame,
@@ -122,7 +122,7 @@ async def send_message(self, frame: TransportMessageFrame):
     async def send_metrics(self, frame: MetricsFrame):
         pass

-    async def write_frame_to_camera(self, frame: ImageRawFrame):
+    async def write_frame_to_camera(self, frame: OutputImageRawFrame):
         pass

     async def write_raw_audio_frames(self, frames: bytes):
@@ -162,9 +162,9 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
             await self._sink_queue.put(frame)
             await self.stop(frame)
         # Other frames.
-        elif isinstance(frame, AudioRawFrame):
+        elif isinstance(frame, OutputAudioRawFrame):
             await self._handle_audio(frame)
-        elif isinstance(frame, ImageRawFrame) or isinstance(frame, SpriteFrame):
+        elif isinstance(frame, OutputImageRawFrame) or isinstance(frame, SpriteFrame):
             await self._handle_image(frame)
         elif isinstance(frame, TransportMessageFrame) and frame.urgent:
             await self.send_message(frame)
@@ -191,7 +191,7 @@ async def _handle_interruptions(self, frame: Frame):
             if self._bot_speaking:
                 await self._bot_stopped_speaking()

-    async def _handle_audio(self, frame: AudioRawFrame):
+    async def _handle_audio(self, frame: OutputAudioRawFrame):
         if not self._params.audio_out_enabled:
             return
@@ -200,12 +200,14 @@ async def _handle_audio(self, frame: AudioRawFrame):
         else:
             self._audio_buffer.extend(frame.audio)
             while len(self._audio_buffer) >= self._audio_chunk_size:
-                chunk = AudioRawFrame(bytes(self._audio_buffer[:self._audio_chunk_size]),
-                                      sample_rate=frame.sample_rate, num_channels=frame.num_channels)
+                chunk = OutputAudioRawFrame(
+                    bytes(self._audio_buffer[:self._audio_chunk_size]),
+                    sample_rate=frame.sample_rate, num_channels=frame.num_channels
+                )
                 await self._sink_queue.put(chunk)
                 self._audio_buffer = self._audio_buffer[self._audio_chunk_size:]

-    async def _handle_image(self, frame: ImageRawFrame | SpriteFrame):
+    async def _handle_image(self, frame: OutputImageRawFrame | SpriteFrame):
         if not self._params.camera_out_enabled:
             return
@@ -226,11 +228,11 @@ def _create_sink_tasks(self):
         self._sink_clock_task = loop.create_task(self._sink_clock_task_handler())

     async def _sink_frame_handler(self, frame: Frame):
-        if isinstance(frame, AudioRawFrame):
+        if isinstance(frame, OutputAudioRawFrame):
             await self.write_raw_audio_frames(frame.audio)
             await self.push_frame(frame)
             await self.push_frame(BotSpeakingFrame(), FrameDirection.UPSTREAM)
-        elif isinstance(frame, ImageRawFrame):
+        elif isinstance(frame, OutputImageRawFrame):
             await self._set_camera_image(frame)
         elif isinstance(frame, SpriteFrame):
             await self._set_camera_images(frame.images)
@@ -305,10 +307,10 @@ async def _bot_stopped_speaking(self):
     # Camera out
     #

-    async def send_image(self, frame: ImageRawFrame | SpriteFrame):
+    async def send_image(self, frame: OutputImageRawFrame | SpriteFrame):
         await self.process_frame(frame, FrameDirection.DOWNSTREAM)

-    async def _draw_image(self, frame: ImageRawFrame):
+    async def _draw_image(self, frame: OutputImageRawFrame):
         desired_size = (self._params.camera_out_width, self._params.camera_out_height)

         if frame.size != desired_size:
@@ -316,14 +318,17 @@ async def _draw_image(self, frame: ImageRawFrame):
             resized_image = image.resize(desired_size)
             logger.warning(
                 f"{frame} does not have the expected size {desired_size}, resizing")
-            frame = ImageRawFrame(resized_image.tobytes(), resized_image.size, resized_image.format)
+            frame = OutputImageRawFrame(
+                resized_image.tobytes(),
+                resized_image.size,
+                resized_image.format)

         await self.write_frame_to_camera(frame)

-    async def _set_camera_image(self, image: ImageRawFrame):
+    async def _set_camera_image(self, image: OutputImageRawFrame):
         self._camera_images = itertools.cycle([image])

-    async def _set_camera_images(self, images: List[ImageRawFrame]):
+    async def _set_camera_images(self, images: List[OutputImageRawFrame]):
         self._camera_images = itertools.cycle(images)

     async def _camera_out_task_handler(self):
@@ -375,7 +380,7 @@ async def _camera_out_is_live_handler(self):
     # Audio out
     #

-    async def send_audio(self, frame: AudioRawFrame):
+    async def send_audio(self, frame: OutputAudioRawFrame):
         await self.process_frame(frame, FrameDirection.DOWNSTREAM)

     async def _audio_out_task_handler(self):
diff --git a/src/pipecat/transports/local/audio.py b/src/pipecat/transports/local/audio.py
index cd05550a9..45d18db52 100644
--- a/src/pipecat/transports/local/audio.py
+++ b/src/pipecat/transports/local/audio.py
@@ -8,7 +8,7 @@

 from concurrent.futures import ThreadPoolExecutor

-from pipecat.frames.frames import AudioRawFrame, StartFrame
+from pipecat.frames.frames import InputAudioRawFrame, StartFrame
 from pipecat.processors.frame_processor import FrameProcessor
 from pipecat.transports.base_input import BaseInputTransport
 from pipecat.transports.base_output import BaseOutputTransport
@@ -54,9 +54,9 @@ async def cleanup(self):
             self._in_stream.close()

     def _audio_in_callback(self, in_data, frame_count, time_info, status):
-        frame = AudioRawFrame(audio=in_data,
-                              sample_rate=self._params.audio_in_sample_rate,
-                              num_channels=self._params.audio_in_channels)
+        frame = InputAudioRawFrame(audio=in_data,
+                                   sample_rate=self._params.audio_in_sample_rate,
+                                   num_channels=self._params.audio_in_channels)

         asyncio.run_coroutine_threadsafe(self.push_audio_frame(frame), self.get_event_loop())
diff --git a/src/pipecat/transports/local/tk.py b/src/pipecat/transports/local/tk.py
index e7dc04902..75dd30331 100644
--- a/src/pipecat/transports/local/tk.py
+++ b/src/pipecat/transports/local/tk.py
@@ -11,8 +11,7 @@
 import numpy as np
 import tkinter as tk

-from pipecat.frames.frames import AudioRawFrame, ImageRawFrame, StartFrame
-from pipecat.processors.frame_processor import FrameProcessor
+from pipecat.frames.frames import InputAudioRawFrame, OutputImageRawFrame, StartFrame
 from pipecat.transports.base_input import BaseInputTransport
 from pipecat.transports.base_output import BaseOutputTransport
 from pipecat.transports.base_transport import BaseTransport, TransportParams
@@ -64,9 +63,9 @@ async def cleanup(self):
             self._in_stream.close()

     def _audio_in_callback(self, in_data, frame_count, time_info, status):
-        frame = AudioRawFrame(audio=in_data,
-                              sample_rate=self._params.audio_in_sample_rate,
-                              num_channels=self._params.audio_in_channels)
+        frame = InputAudioRawFrame(audio=in_data,
+                                   sample_rate=self._params.audio_in_sample_rate,
+                                   num_channels=self._params.audio_in_channels)

         asyncio.run_coroutine_threadsafe(self.push_audio_frame(frame), self.get_event_loop())
@@ -108,10 +107,10 @@ async def cleanup(self):
     async def write_raw_audio_frames(self, frames: bytes):
         await self.get_event_loop().run_in_executor(self._executor, self._out_stream.write, frames)

-    async def write_frame_to_camera(self, frame: ImageRawFrame):
+    async def write_frame_to_camera(self, frame: OutputImageRawFrame):
         self.get_event_loop().call_soon(self._write_frame_to_tk, frame)

-    def _write_frame_to_tk(self, frame: ImageRawFrame):
+    def _write_frame_to_tk(self, frame: OutputImageRawFrame):
         width = frame.size[0]
         height = frame.size[1]
         data = f"P6 {width} {height} 255 ".encode() + frame.image
diff --git a/src/pipecat/transports/network/fastapi_websocket.py b/src/pipecat/transports/network/fastapi_websocket.py
index 7169c73bd..815d7c2ef 100644
--- a/src/pipecat/transports/network/fastapi_websocket.py
+++ b/src/pipecat/transports/network/fastapi_websocket.py
@@ -12,8 +12,16 @@
 from typing import Awaitable, Callable
 from pydantic.main import BaseModel

-from pipecat.frames.frames import AudioRawFrame, CancelFrame, EndFrame, Frame, StartFrame, StartInterruptionFrame
-from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
+from pipecat.frames.frames import (
+    AudioRawFrame,
+    CancelFrame,
+    EndFrame,
+    Frame,
+    InputAudioRawFrame,
+    StartFrame,
+    StartInterruptionFrame
+)
+from pipecat.processors.frame_processor import FrameDirection
 from pipecat.serializers.base_serializer import FrameSerializer
 from pipecat.transports.base_input import BaseInputTransport
 from pipecat.transports.base_output import BaseOutputTransport
@@ -79,7 +87,11 @@ async def _receive_messages(self):
                 continue

             if isinstance(frame, AudioRawFrame):
-                await self.push_audio_frame(frame)
+                await self.push_audio_frame(InputAudioRawFrame(
+                    audio=frame.audio,
+                    sample_rate=frame.sample_rate,
+                    num_channels=frame.num_channels)
+                )

         await self._callbacks.on_client_disconnected(self._websocket)
diff --git a/src/pipecat/transports/network/websocket_server.py b/src/pipecat/transports/network/websocket_server.py
index c17818898..329ae8994 100644
--- a/src/pipecat/transports/network/websocket_server.py
+++ b/src/pipecat/transports/network/websocket_server.py
@@ -11,8 +11,7 @@
 from typing import Awaitable, Callable
 from pydantic.main import BaseModel

-from pipecat.frames.frames import AudioRawFrame, CancelFrame, EndFrame, StartFrame
-from pipecat.processors.frame_processor import FrameProcessor
+from pipecat.frames.frames import AudioRawFrame, CancelFrame, EndFrame, InputAudioRawFrame, StartFrame
 from pipecat.serializers.base_serializer import FrameSerializer
 from pipecat.serializers.protobuf import ProtobufFrameSerializer
 from pipecat.transports.base_input import BaseInputTransport
@@ -98,7 +97,11 @@ async def _client_handler(self, websocket: websockets.WebSocketServerProtocol, p
                 continue

             if isinstance(frame, AudioRawFrame):
-                await self.queue_audio_frame(frame)
+                await self.push_audio_frame(InputAudioRawFrame(
+                    audio=frame.audio,
+                    sample_rate=frame.sample_rate,
+                    num_channels=frame.num_channels)
+                )
             else:
                 await self.push_frame(frame)
diff --git a/src/pipecat/transports/services/daily.py b/src/pipecat/transports/services/daily.py
index 2a45adf36..d595aaceb 100644
--- a/src/pipecat/transports/services/daily.py
+++ b/src/pipecat/transports/services/daily.py
@@ -22,13 +22,14 @@
 from pydantic.main import BaseModel

 from pipecat.frames.frames import (
-    AudioRawFrame,
     CancelFrame,
     EndFrame,
     Frame,
-    ImageRawFrame,
+    InputAudioRawFrame,
     InterimTranscriptionFrame,
     MetricsFrame,
+    OutputAudioRawFrame,
+    OutputImageRawFrame,
     SpriteFrame,
     StartFrame,
     TranscriptionFrame,
@@ -239,7 +240,7 @@ async def send_message(self, frame: TransportMessageFrame):
             completion=completion_callback(future))
         await future

-    async def read_next_audio_frame(self) -> AudioRawFrame | None:
+    async def read_next_audio_frame(self) -> InputAudioRawFrame | None:
         if not self._speaker:
             return None
@@ -252,7 +253,10 @@ async def read_next_audio_frame(self) -> AudioRawFrame | None:
         audio = await future

         if len(audio) > 0:
-            return AudioRawFrame(audio=audio, sample_rate=sample_rate, num_channels=num_channels)
+            return InputAudioRawFrame(
+                audio=audio,
+                sample_rate=sample_rate,
+                num_channels=num_channels)
         else:
             # If we don't read any audio it could be there's no participant
             # connected. daily-python will return immediately if that's the
@@ -268,7 +272,7 @@ async def write_raw_audio_frames(self, frames: bytes):
         self._mic.write_frames(frames, completion=completion_callback(future))
         await future

-    async def write_frame_to_camera(self, frame: ImageRawFrame):
+    async def write_frame_to_camera(self, frame: OutputImageRawFrame):
         if not self._camera:
             return None
@@ -749,7 +753,7 @@ async def send_metrics(self, frame: MetricsFrame):
     async def write_raw_audio_frames(self, frames: bytes):
         await self._client.write_raw_audio_frames(frames)

-    async def write_frame_to_camera(self, frame: ImageRawFrame):
+    async def write_frame_to_camera(self, frame: OutputImageRawFrame):
         await self._client.write_frame_to_camera(frame)

@@ -829,11 +833,11 @@ def output(self) -> DailyOutputTransport:
     def participant_id(self) -> str:
         return self._client.participant_id

-    async def send_image(self, frame: ImageRawFrame | SpriteFrame):
+    async def send_image(self, frame: OutputImageRawFrame | SpriteFrame):
         if self._output:
             await self._output.process_frame(frame, FrameDirection.DOWNSTREAM)

-    async def send_audio(self, frame: AudioRawFrame):
+    async def send_audio(self, frame: OutputAudioRawFrame):
         if self._output:
             await self._output.process_frame(frame, FrameDirection.DOWNSTREAM)
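
A minimal usage sketch of the new split (illustrative, not part of the patch): input transports now emit `InputAudioRawFrame`/`InputImageRawFrame`, and output transports only write `OutputAudioRawFrame`/`OutputImageRawFrame`, so a processor that wants captured media echoed back must convert explicitly, as the updated 09-mirror example does. The `MirrorProcessor` name below is hypothetical; the audio fields follow the constructors used in this patch, and `InputImageRawFrame` is assumed to carry the same `image`/`size`/`format` fields as its output counterpart.

    from pipecat.frames.frames import (
        Frame,
        InputAudioRawFrame,
        InputImageRawFrame,
        OutputAudioRawFrame,
        OutputImageRawFrame,
    )
    from pipecat.processors.frame_processor import FrameDirection, FrameProcessor


    class MirrorProcessor(FrameProcessor):
        """Re-emit captured audio/images as output frames so an output transport sends them."""

        async def process_frame(self, frame: Frame, direction: FrameDirection):
            await super().process_frame(frame, direction)

            if isinstance(frame, InputAudioRawFrame):
                # Copy the payload into an output frame; plain input frames are
                # not written by the output transport.
                await self.push_frame(OutputAudioRawFrame(
                    audio=frame.audio,
                    sample_rate=frame.sample_rate,
                    num_channels=frame.num_channels))
            elif isinstance(frame, InputImageRawFrame):
                await self.push_frame(OutputImageRawFrame(
                    image=frame.image,
                    size=frame.size,
                    format=frame.format))
            else:
                await self.push_frame(frame)

Placed between a transport's input and output in a pipeline, such a processor forwards everything else untouched while promoting captured media to sendable output frames.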