diff --git a/examples/dialin-chatbot/requirements.txt b/examples/dialin-chatbot/requirements.txt index e59a9c3d2..1e15004b1 100644 --- a/examples/dialin-chatbot/requirements.txt +++ b/examples/dialin-chatbot/requirements.txt @@ -1,4 +1,4 @@ -pipecat-ai[daily,openai,silero] +pipecat-ai[daily,elevenlabs,openai,silero] fastapi uvicorn python-dotenv diff --git a/examples/foundational/05a-local-sync-speech-and-image.py b/examples/foundational/05a-local-sync-speech-and-image.py index 66a6e7f57..d9a0e792e 100644 --- a/examples/foundational/05a-local-sync-speech-and-image.py +++ b/examples/foundational/05a-local-sync-speech-and-image.py @@ -11,7 +11,13 @@ import tkinter as tk -from pipecat.frames.frames import AudioRawFrame, Frame, URLImageRawFrame, LLMMessagesFrame, TextFrame +from pipecat.frames.frames import ( + Frame, + OutputAudioRawFrame, + TTSAudioRawFrame, + URLImageRawFrame, + LLMMessagesFrame, + TextFrame) from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.sync_parallel_pipeline import SyncParallelPipeline @@ -65,9 +71,9 @@ def __init__(self): async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) - if isinstance(frame, AudioRawFrame): + if isinstance(frame, TTSAudioRawFrame): self.audio.extend(frame.audio) - self.frame = AudioRawFrame( + self.frame = OutputAudioRawFrame( bytes(self.audio), frame.sample_rate, frame.num_channels) class ImageGrabber(FrameProcessor): diff --git a/examples/foundational/06a-image-sync.py b/examples/foundational/06a-image-sync.py index 6b3e58cf7..db48b709e 100644 --- a/examples/foundational/06a-image-sync.py +++ b/examples/foundational/06a-image-sync.py @@ -11,7 +11,7 @@ from PIL import Image -from pipecat.frames.frames import ImageRawFrame, Frame, SystemFrame, TextFrame +from pipecat.frames.frames import Frame, OutputImageRawFrame, SystemFrame, TextFrame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineTask @@ -52,9 +52,16 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) if not isinstance(frame, SystemFrame) and direction == FrameDirection.DOWNSTREAM: - await self.push_frame(ImageRawFrame(image=self._speaking_image_bytes, size=(1024, 1024), format=self._speaking_image_format)) + await self.push_frame(OutputImageRawFrame( + image=self._speaking_image_bytes, + size=(1024, 1024), + format=self._speaking_image_format) + ) await self.push_frame(frame) - await self.push_frame(ImageRawFrame(image=self._waiting_image_bytes, size=(1024, 1024), format=self._waiting_image_format)) + await self.push_frame(OutputImageRawFrame( + image=self._waiting_image_bytes, + size=(1024, 1024), + format=self._waiting_image_format)) else: await self.push_frame(frame) diff --git a/examples/foundational/09-mirror.py b/examples/foundational/09-mirror.py index 8f5f1073b..bb6253deb 100644 --- a/examples/foundational/09-mirror.py +++ b/examples/foundational/09-mirror.py @@ -8,9 +8,11 @@ import asyncio import sys +from pipecat.frames.frames import Frame, InputAudioRawFrame, InputImageRawFrame, OutputAudioRawFrame, OutputImageRawFrame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineTask +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.transports.services.daily import 
DailyTransport, DailyParams from runner import configure @@ -24,6 +26,27 @@ logger.add(sys.stderr, level="DEBUG") +class MirrorProcessor(FrameProcessor): + + async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + + if isinstance(frame, InputAudioRawFrame): + await self.push_frame(OutputAudioRawFrame( + audio=frame.audio, + sample_rate=frame.sample_rate, + num_channels=frame.num_channels) + ) + elif isinstance(frame, InputImageRawFrame): + await self.push_frame(OutputImageRawFrame( + image=frame.image, + size=frame.size, + format=frame.format) + ) + else: + await self.push_frame(frame, direction) + + async def main(): async with aiohttp.ClientSession() as session: (room_url, token) = await configure(session) @@ -44,7 +67,7 @@ async def main(): async def on_first_participant_joined(transport, participant): transport.capture_participant_video(participant["id"]) - pipeline = Pipeline([transport.input(), transport.output()]) + pipeline = Pipeline([transport.input(), MirrorProcessor(), transport.output()]) runner = PipelineRunner() diff --git a/examples/foundational/09a-local-mirror.py b/examples/foundational/09a-local-mirror.py index d657a3631..afc77470d 100644 --- a/examples/foundational/09a-local-mirror.py +++ b/examples/foundational/09a-local-mirror.py @@ -10,9 +10,11 @@ import tkinter as tk +from pipecat.frames.frames import Frame, InputAudioRawFrame, InputImageRawFrame, OutputAudioRawFrame, OutputImageRawFrame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineTask +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.transports.base_transport import TransportParams from pipecat.transports.local.tk import TkLocalTransport from pipecat.transports.services.daily import DailyParams, DailyTransport @@ -27,6 +29,25 @@ logger.remove(0) logger.add(sys.stderr, level="DEBUG") +class MirrorProcessor(FrameProcessor): + + async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + + if isinstance(frame, InputAudioRawFrame): + await self.push_frame(OutputAudioRawFrame( + audio=frame.audio, + sample_rate=frame.sample_rate, + num_channels=frame.num_channels) + ) + elif isinstance(frame, InputImageRawFrame): + await self.push_frame(OutputImageRawFrame( + image=frame.image, + size=frame.size, + format=frame.format) + ) + else: + await self.push_frame(frame, direction) async def main(): async with aiohttp.ClientSession() as session: @@ -52,7 +73,7 @@ async def main(): async def on_first_participant_joined(transport, participant): transport.capture_participant_video(participant["id"]) - pipeline = Pipeline([daily_transport.input(), tk_transport.output()]) + pipeline = Pipeline([daily_transport.input(), MirrorProcessor(), tk_transport.output()]) task = PipelineTask(pipeline) diff --git a/examples/foundational/11-sound-effects.py b/examples/foundational/11-sound-effects.py index 9dc4dc99b..21b03bedf 100644 --- a/examples/foundational/11-sound-effects.py +++ b/examples/foundational/11-sound-effects.py @@ -12,9 +12,9 @@ from pipecat.frames.frames import ( Frame, - AudioRawFrame, LLMFullResponseEndFrame, LLMMessagesFrame, + OutputAudioRawFrame, ) from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner @@ -53,8 +53,8 @@ filename = os.path.splitext(os.path.basename(full_path))[0] # Open the image and convert it to bytes 
with wave.open(full_path) as audio_file: - sounds[file] = AudioRawFrame(audio_file.readframes(-1), - audio_file.getframerate(), audio_file.getnchannels()) + sounds[file] = OutputAudioRawFrame(audio_file.readframes(-1), + audio_file.getframerate(), audio_file.getnchannels()) class OutboundSoundEffectWrapper(FrameProcessor): diff --git a/examples/moondream-chatbot/bot.py b/examples/moondream-chatbot/bot.py index 32bbf7e6b..d14a5f016 100644 --- a/examples/moondream-chatbot/bot.py +++ b/examples/moondream-chatbot/bot.py @@ -13,10 +13,11 @@ from pipecat.frames.frames import ( ImageRawFrame, + OutputImageRawFrame, SpriteFrame, Frame, LLMMessagesFrame, - AudioRawFrame, + TTSAudioRawFrame, TTSStoppedFrame, TextFrame, UserImageRawFrame, @@ -59,7 +60,11 @@ # Get the filename without the extension to use as the dictionary key # Open the image and convert it to bytes with Image.open(full_path) as img: - sprites.append(ImageRawFrame(image=img.tobytes(), size=img.size, format=img.format)) + sprites.append(OutputImageRawFrame( + image=img.tobytes(), + size=img.size, + format=img.format) + ) flipped = sprites[::-1] sprites.extend(flipped) @@ -82,7 +87,7 @@ def __init__(self): async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) - if isinstance(frame, AudioRawFrame): + if isinstance(frame, TTSAudioRawFrame): if not self._is_talking: await self.push_frame(talking_frame) self._is_talking = True diff --git a/examples/moondream-chatbot/requirements.txt b/examples/moondream-chatbot/requirements.txt index 11132e136..08fd27cb7 100644 --- a/examples/moondream-chatbot/requirements.txt +++ b/examples/moondream-chatbot/requirements.txt @@ -1,4 +1,4 @@ python-dotenv fastapi[all] uvicorn -pipecat-ai[daily,moondream,openai,silero] +pipecat-ai[daily,cartesia,moondream,openai,silero] diff --git a/examples/patient-intake/bot.py b/examples/patient-intake/bot.py index 7dc404c52..33ca9e26d 100644 --- a/examples/patient-intake/bot.py +++ b/examples/patient-intake/bot.py @@ -10,7 +10,7 @@ import sys import wave -from pipecat.frames.frames import AudioRawFrame +from pipecat.frames.frames import OutputAudioRawFrame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask @@ -49,8 +49,9 @@ filename = os.path.splitext(os.path.basename(full_path))[0] # Open the sound and convert it to bytes with wave.open(full_path) as audio_file: - sounds[file] = AudioRawFrame(audio_file.readframes(-1), - audio_file.getframerate(), audio_file.getnchannels()) + sounds[file] = OutputAudioRawFrame(audio_file.readframes(-1), + audio_file.getframerate(), + audio_file.getnchannels()) class IntakeProcessor: diff --git a/examples/patient-intake/requirements.txt b/examples/patient-intake/requirements.txt index a7a8729df..e8bfcd8e4 100644 --- a/examples/patient-intake/requirements.txt +++ b/examples/patient-intake/requirements.txt @@ -1,4 +1,4 @@ python-dotenv fastapi[all] uvicorn -pipecat-ai[daily,openai,silero] +pipecat-ai[daily,cartesia,openai,silero] diff --git a/examples/simple-chatbot/bot.py b/examples/simple-chatbot/bot.py index 1664e47fb..f179dfeb5 100644 --- a/examples/simple-chatbot/bot.py +++ b/examples/simple-chatbot/bot.py @@ -16,11 +16,11 @@ from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.llm_response import LLMAssistantResponseAggregator, LLMUserResponseAggregator from pipecat.frames.frames import ( - AudioRawFrame, - 
ImageRawFrame, + OutputImageRawFrame, SpriteFrame, Frame, LLMMessagesFrame, + TTSAudioRawFrame, TTSStoppedFrame ) from pipecat.processors.frame_processor import FrameDirection, FrameProcessor @@ -49,7 +49,11 @@ # Get the filename without the extension to use as the dictionary key # Open the image and convert it to bytes with Image.open(full_path) as img: - sprites.append(ImageRawFrame(image=img.tobytes(), size=img.size, format=img.format)) + sprites.append(OutputImageRawFrame( + image=img.tobytes(), + size=img.size, + format=img.format) + ) flipped = sprites[::-1] sprites.extend(flipped) @@ -72,7 +76,7 @@ def __init__(self): async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) - if isinstance(frame, AudioRawFrame): + if isinstance(frame, TTSAudioRawFrame): if not self._is_talking: await self.push_frame(talking_frame) self._is_talking = True diff --git a/examples/simple-chatbot/requirements.txt b/examples/simple-chatbot/requirements.txt index a7a8729df..a4e6aa1db 100644 --- a/examples/simple-chatbot/requirements.txt +++ b/examples/simple-chatbot/requirements.txt @@ -1,4 +1,4 @@ python-dotenv fastapi[all] uvicorn -pipecat-ai[daily,openai,silero] +pipecat-ai[daily,elevenlabs,openai,silero] diff --git a/examples/storytelling-chatbot/requirements.txt b/examples/storytelling-chatbot/requirements.txt index 663f78a76..0cebe6edb 100644 --- a/examples/storytelling-chatbot/requirements.txt +++ b/examples/storytelling-chatbot/requirements.txt @@ -2,4 +2,4 @@ async_timeout fastapi uvicorn python-dotenv -pipecat-ai[daily,openai,fal] +pipecat-ai[daily,elevenlabs,openai,fal] diff --git a/examples/storytelling-chatbot/src/utils/helpers.py b/examples/storytelling-chatbot/src/utils/helpers.py index 2c576fdff..743a04c97 100644 --- a/examples/storytelling-chatbot/src/utils/helpers.py +++ b/examples/storytelling-chatbot/src/utils/helpers.py @@ -2,7 +2,7 @@ import wave from PIL import Image -from pipecat.frames.frames import AudioRawFrame, ImageRawFrame +from pipecat.frames.frames import OutputAudioRawFrame, OutputImageRawFrame script_dir = os.path.dirname(__file__) @@ -16,7 +16,8 @@ def load_images(image_files): filename = os.path.splitext(os.path.basename(full_path))[0] # Open the image and convert it to bytes with Image.open(full_path) as img: - images[filename] = ImageRawFrame(image=img.tobytes(), size=img.size, format=img.format) + images[filename] = OutputImageRawFrame( + image=img.tobytes(), size=img.size, format=img.format) return images @@ -30,8 +31,8 @@ def load_sounds(sound_files): filename = os.path.splitext(os.path.basename(full_path))[0] # Open the sound and convert it to bytes with wave.open(full_path) as audio_file: - sounds[filename] = AudioRawFrame(audio=audio_file.readframes(-1), - sample_rate=audio_file.getframerate(), - num_channels=audio_file.getnchannels()) + sounds[filename] = OutputAudioRawFrame(audio=audio_file.readframes(-1), + sample_rate=audio_file.getframerate(), + num_channels=audio_file.getnchannels()) return sounds diff --git a/examples/twilio-chatbot/README.md b/examples/twilio-chatbot/README.md index 5d5d2385a..fdea359f9 100644 --- a/examples/twilio-chatbot/README.md +++ b/examples/twilio-chatbot/README.md @@ -55,7 +55,7 @@ This project is a FastAPI-based chatbot that integrates with Twilio to handle We 2. **Update the Twilio Webhook**: Copy the ngrok URL and update your Twilio phone number webhook URL to `http:///start_call`. -3. **Update the streams.xml**: +3. 
**Update streams.xml**: Copy the ngrok URL and update templates/streams.xml with `wss:///ws`. ## Running the Application diff --git a/examples/twilio-chatbot/bot.py b/examples/twilio-chatbot/bot.py index b7376e084..5b83139f9 100644 --- a/examples/twilio-chatbot/bot.py +++ b/examples/twilio-chatbot/bot.py @@ -1,4 +1,3 @@ -import aiohttp import os import sys @@ -27,63 +26,62 @@ async def run_bot(websocket_client, stream_sid): - async with aiohttp.ClientSession() as session: - transport = FastAPIWebsocketTransport( - websocket=websocket_client, - params=FastAPIWebsocketParams( - audio_out_enabled=True, - add_wav_header=False, - vad_enabled=True, - vad_analyzer=SileroVADAnalyzer(), - vad_audio_passthrough=True, - serializer=TwilioFrameSerializer(stream_sid) - ) + transport = FastAPIWebsocketTransport( + websocket=websocket_client, + params=FastAPIWebsocketParams( + audio_out_enabled=True, + add_wav_header=False, + vad_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + vad_audio_passthrough=True, + serializer=TwilioFrameSerializer(stream_sid) ) - - llm = OpenAILLMService( - api_key=os.getenv("OPENAI_API_KEY"), - model="gpt-4o") - - stt = DeepgramSTTService(api_key=os.getenv('DEEPGRAM_API_KEY')) - - tts = CartesiaTTSService( - api_key=os.getenv("CARTESIA_API_KEY"), - voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady - ) - - messages = [ - { - "role": "system", - "content": "You are a helpful LLM in an audio call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.", - }, - ] - - tma_in = LLMUserResponseAggregator(messages) - tma_out = LLMAssistantResponseAggregator(messages) - - pipeline = Pipeline([ - transport.input(), # Websocket input from client - stt, # Speech-To-Text - tma_in, # User responses - llm, # LLM - tts, # Text-To-Speech - transport.output(), # Websocket output to client - tma_out # LLM responses - ]) - - task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True)) - - @transport.event_handler("on_client_connected") - async def on_client_connected(transport, client): - # Kick off the conversation. - messages.append( - {"role": "system", "content": "Please introduce yourself to the user."}) - await task.queue_frames([LLMMessagesFrame(messages)]) - - @transport.event_handler("on_client_disconnected") - async def on_client_disconnected(transport, client): - await task.queue_frames([EndFrame()]) - - runner = PipelineRunner(handle_sigint=False) - - await runner.run(task) + ) + + llm = OpenAILLMService( + api_key=os.getenv("OPENAI_API_KEY"), + model="gpt-4o") + + stt = DeepgramSTTService(api_key=os.getenv('DEEPGRAM_API_KEY')) + + tts = CartesiaTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady + ) + + messages = [ + { + "role": "system", + "content": "You are a helpful LLM in an audio call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. 
Respond to what the user said in a creative and helpful way.", + }, + ] + + tma_in = LLMUserResponseAggregator(messages) + tma_out = LLMAssistantResponseAggregator(messages) + + pipeline = Pipeline([ + transport.input(), # Websocket input from client + stt, # Speech-To-Text + tma_in, # User responses + llm, # LLM + tts, # Text-To-Speech + transport.output(), # Websocket output to client + tma_out # LLM responses + ]) + + task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True)) + + @transport.event_handler("on_client_connected") + async def on_client_connected(transport, client): + # Kick off the conversation. + messages.append( + {"role": "system", "content": "Please introduce yourself to the user."}) + await task.queue_frames([LLMMessagesFrame(messages)]) + + @transport.event_handler("on_client_disconnected") + async def on_client_disconnected(transport, client): + await task.queue_frames([EndFrame()]) + + runner = PipelineRunner(handle_sigint=False) + + await runner.run(task) diff --git a/examples/twilio-chatbot/requirements.txt b/examples/twilio-chatbot/requirements.txt index f0456fcd5..eefaca888 100644 --- a/examples/twilio-chatbot/requirements.txt +++ b/examples/twilio-chatbot/requirements.txt @@ -1,4 +1,4 @@ -pipecat-ai[daily,openai,silero,deepgram] +pipecat-ai[daily,cartesia,openai,silero,deepgram] fastapi uvicorn python-dotenv diff --git a/examples/websocket-server/bot.py b/examples/websocket-server/bot.py index 29a99614f..61d285fa8 100644 --- a/examples/websocket-server/bot.py +++ b/examples/websocket-server/bot.py @@ -4,7 +4,6 @@ # SPDX-License-Identifier: BSD 2-Clause License # -import aiohttp import asyncio import os import sys @@ -33,60 +32,59 @@ async def main(): - async with aiohttp.ClientSession() as session: - transport = WebsocketServerTransport( - params=WebsocketServerParams( - audio_out_enabled=True, - add_wav_header=True, - vad_enabled=True, - vad_analyzer=SileroVADAnalyzer(), - vad_audio_passthrough=True - ) + transport = WebsocketServerTransport( + params=WebsocketServerParams( + audio_out_enabled=True, + add_wav_header=True, + vad_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + vad_audio_passthrough=True ) - - llm = OpenAILLMService( - api_key=os.getenv("OPENAI_API_KEY"), - model="gpt-4o") - - stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) - - tts = CartesiaTTSService( - api_key=os.getenv("CARTESIA_API_KEY"), - voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady - ) - - messages = [ - { - "role": "system", - "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.", - }, - ] - - tma_in = LLMUserResponseAggregator(messages) - tma_out = LLMAssistantResponseAggregator(messages) - - pipeline = Pipeline([ - transport.input(), # Websocket input from client - stt, # Speech-To-Text - tma_in, # User responses - llm, # LLM - tts, # Text-To-Speech - transport.output(), # Websocket output to client - tma_out # LLM responses - ]) - - task = PipelineTask(pipeline) - - @transport.event_handler("on_client_connected") - async def on_client_connected(transport, client): - # Kick off the conversation. 
- messages.append( - {"role": "system", "content": "Please introduce yourself to the user."}) - await task.queue_frames([LLMMessagesFrame(messages)]) - - runner = PipelineRunner() - - await runner.run(task) + ) + + llm = OpenAILLMService( + api_key=os.getenv("OPENAI_API_KEY"), + model="gpt-4o") + + stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) + + tts = CartesiaTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady + ) + + messages = [ + { + "role": "system", + "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.", + }, + ] + + tma_in = LLMUserResponseAggregator(messages) + tma_out = LLMAssistantResponseAggregator(messages) + + pipeline = Pipeline([ + transport.input(), # Websocket input from client + stt, # Speech-To-Text + tma_in, # User responses + llm, # LLM + tts, # Text-To-Speech + transport.output(), # Websocket output to client + tma_out # LLM responses + ]) + + task = PipelineTask(pipeline) + + @transport.event_handler("on_client_connected") + async def on_client_connected(transport, client): + # Kick off the conversation. + messages.append( + {"role": "system", "content": "Please introduce yourself to the user."}) + await task.queue_frames([LLMMessagesFrame(messages)]) + + runner = PipelineRunner() + + await runner.run(task) if __name__ == "__main__": asyncio.run(main()) diff --git a/examples/websocket-server/frames.proto b/examples/websocket-server/frames.proto index 5c5d81d4d..4c58d2a34 100644 --- a/examples/websocket-server/frames.proto +++ b/examples/websocket-server/frames.proto @@ -24,6 +24,7 @@ message AudioRawFrame { bytes audio = 3; uint32 sample_rate = 4; uint32 num_channels = 5; + optional uint64 pts = 6; } message TranscriptionFrame { diff --git a/examples/websocket-server/requirements.txt b/examples/websocket-server/requirements.txt index 77e5b9e91..0815c6b8a 100644 --- a/examples/websocket-server/requirements.txt +++ b/examples/websocket-server/requirements.txt @@ -1,2 +1,2 @@ python-dotenv -pipecat-ai[openai,silero,websocket,whisper] +pipecat-ai[cartesia,openai,silero,websocket,whisper] diff --git a/src/pipecat/frames/frames.proto b/src/pipecat/frames/frames.proto index 5c5d81d4d..4c58d2a34 100644 --- a/src/pipecat/frames/frames.proto +++ b/src/pipecat/frames/frames.proto @@ -24,6 +24,7 @@ message AudioRawFrame { bytes audio = 3; uint32 sample_rate = 4; uint32 num_channels = 5; + optional uint64 pts = 6; } message TranscriptionFrame { diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py index 4d207fecd..d45487150 100644 --- a/src/pipecat/frames/frames.py +++ b/src/pipecat/frames/frames.py @@ -41,10 +41,7 @@ class DataFrame(Frame): @dataclass class AudioRawFrame(DataFrame): - """A chunk of audio. Will be played by the transport if the transport's - microphone has been enabled. - - """ + """A chunk of audio.""" audio: bytes sample_rate: int num_channels: int @@ -58,6 +55,31 @@ def __str__(self): return f"{self.name}(pts: {pts}, size: {len(self.audio)}, frames: {self.num_frames}, sample_rate: {self.sample_rate}, channels: {self.num_channels})" +@dataclass +class InputAudioRawFrame(AudioRawFrame): + """A chunk of audio usually coming from an input transport. + + """ + pass + + +@dataclass +class OutputAudioRawFrame(AudioRawFrame): + """A chunk of audio. 
Will be played by the output transport if the + transport's microphone has been enabled. + + """ + pass + + +@dataclass +class TTSAudioRawFrame(OutputAudioRawFrame): + """A chunk of output audio generated by a TTS service. + + """ + pass + + @dataclass class ImageRawFrame(DataFrame): """An image. Will be shown by the transport if the transport's camera is @@ -74,20 +96,30 @@ def __str__(self): @dataclass -class URLImageRawFrame(ImageRawFrame): - """An image with an associated URL. Will be shown by the transport if the +class InputImageRawFrame(ImageRawFrame): + pass + + +@dataclass +class OutputImageRawFrame(ImageRawFrame): + pass + + +@dataclass +class UserImageRawFrame(InputImageRawFrame): + """An image associated to a user. Will be shown by the transport if the transport's camera is enabled. """ - url: str | None + user_id: str def __str__(self): pts = format_pts(self.pts) - return f"{self.name}(pts: {pts}, url: {self.url}, size: {self.size}, format: {self.format})" + return f"{self.name}(pts: {pts}, user: {self.user_id}, size: {self.size}, format: {self.format})" @dataclass -class VisionImageRawFrame(ImageRawFrame): +class VisionImageRawFrame(InputImageRawFrame): """An image with an associated text to ask for a description of it. Will be shown by the transport if the transport's camera is enabled. @@ -100,16 +132,16 @@ def __str__(self): @dataclass -class UserImageRawFrame(ImageRawFrame): - """An image associated to a user. Will be shown by the transport if the +class URLImageRawFrame(OutputImageRawFrame): + """An image with an associated URL. Will be shown by the transport if the transport's camera is enabled. """ - user_id: str + url: str | None def __str__(self): pts = format_pts(self.pts) - return f"{self.name}(pts: {pts}, user: {self.user_id}, size: {self.size}, format: {self.format})" + return f"{self.name}(pts: {pts}, url: {self.url}, size: {self.size}, format: {self.format})" @dataclass @@ -420,10 +452,10 @@ class BotSpeakingFrame(ControlFrame): @dataclass class TTSStartedFrame(ControlFrame): """Used to indicate the beginning of a TTS response. Following - AudioRawFrames are part of the TTS response until an TTSStoppedFrame. These - frames can be used for aggregating audio frames in a transport to optimize - the size of frames sent to the session, without needing to control this in - the TTS service. + TTSAudioRawFrames are part of the TTS response until an + TTSStoppedFrame. These frames can be used for aggregating audio frames in a + transport to optimize the size of frames sent to the session, without + needing to control this in the TTS service. 
""" pass diff --git a/src/pipecat/frames/protobufs/frames_pb2.py b/src/pipecat/frames/protobufs/frames_pb2.py index 5040efc97..d58bc8baa 100644 --- a/src/pipecat/frames/protobufs/frames_pb2.py +++ b/src/pipecat/frames/protobufs/frames_pb2.py @@ -14,7 +14,7 @@ -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0c\x66rames.proto\x12\x07pipecat\"3\n\tTextFrame\x12\n\n\x02id\x18\x01 \x01(\x04\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\x0c\n\x04text\x18\x03 \x01(\t\"c\n\rAudioRawFrame\x12\n\n\x02id\x18\x01 \x01(\x04\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\r\n\x05\x61udio\x18\x03 \x01(\x0c\x12\x13\n\x0bsample_rate\x18\x04 \x01(\r\x12\x14\n\x0cnum_channels\x18\x05 \x01(\r\"`\n\x12TranscriptionFrame\x12\n\n\x02id\x18\x01 \x01(\x04\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\x0c\n\x04text\x18\x03 \x01(\t\x12\x0f\n\x07user_id\x18\x04 \x01(\t\x12\x11\n\ttimestamp\x18\x05 \x01(\t\"\x93\x01\n\x05\x46rame\x12\"\n\x04text\x18\x01 \x01(\x0b\x32\x12.pipecat.TextFrameH\x00\x12\'\n\x05\x61udio\x18\x02 \x01(\x0b\x32\x16.pipecat.AudioRawFrameH\x00\x12\x34\n\rtranscription\x18\x03 \x01(\x0b\x32\x1b.pipecat.TranscriptionFrameH\x00\x42\x07\n\x05\x66rameb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0c\x66rames.proto\x12\x07pipecat\"3\n\tTextFrame\x12\n\n\x02id\x18\x01 \x01(\x04\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\x0c\n\x04text\x18\x03 \x01(\t\"}\n\rAudioRawFrame\x12\n\n\x02id\x18\x01 \x01(\x04\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\r\n\x05\x61udio\x18\x03 \x01(\x0c\x12\x13\n\x0bsample_rate\x18\x04 \x01(\r\x12\x14\n\x0cnum_channels\x18\x05 \x01(\r\x12\x10\n\x03pts\x18\x06 \x01(\x04H\x00\x88\x01\x01\x42\x06\n\x04_pts\"`\n\x12TranscriptionFrame\x12\n\n\x02id\x18\x01 \x01(\x04\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\x0c\n\x04text\x18\x03 \x01(\t\x12\x0f\n\x07user_id\x18\x04 \x01(\t\x12\x11\n\ttimestamp\x18\x05 \x01(\t\"\x93\x01\n\x05\x46rame\x12\"\n\x04text\x18\x01 \x01(\x0b\x32\x12.pipecat.TextFrameH\x00\x12\'\n\x05\x61udio\x18\x02 \x01(\x0b\x32\x16.pipecat.AudioRawFrameH\x00\x12\x34\n\rtranscription\x18\x03 \x01(\x0b\x32\x1b.pipecat.TranscriptionFrameH\x00\x42\x07\n\x05\x66rameb\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -24,9 +24,9 @@ _globals['_TEXTFRAME']._serialized_start=25 _globals['_TEXTFRAME']._serialized_end=76 _globals['_AUDIORAWFRAME']._serialized_start=78 - _globals['_AUDIORAWFRAME']._serialized_end=177 - _globals['_TRANSCRIPTIONFRAME']._serialized_start=179 - _globals['_TRANSCRIPTIONFRAME']._serialized_end=275 - _globals['_FRAME']._serialized_start=278 - _globals['_FRAME']._serialized_end=425 + _globals['_AUDIORAWFRAME']._serialized_end=203 + _globals['_TRANSCRIPTIONFRAME']._serialized_start=205 + _globals['_TRANSCRIPTIONFRAME']._serialized_end=301 + _globals['_FRAME']._serialized_start=304 + _globals['_FRAME']._serialized_end=451 # @@protoc_insertion_point(module_scope) diff --git a/src/pipecat/processors/aggregators/llm_response.py b/src/pipecat/processors/aggregators/llm_response.py index 379394120..13920c59b 100644 --- a/src/pipecat/processors/aggregators/llm_response.py +++ b/src/pipecat/processors/aggregators/llm_response.py @@ -4,7 +4,7 @@ # SPDX-License-Identifier: BSD 2-Clause License # -from typing import List +from typing import List, Type from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContextFrame, OpenAILLMContext @@ -34,8 +34,8 @@ def __init__( role: str, start_frame, end_frame, - accumulator_frame: TextFrame, - interim_accumulator_frame: TextFrame | None = None, + 
accumulator_frame: Type[TextFrame], + interim_accumulator_frame: Type[TextFrame] | None = None, handle_interruptions: bool = False ): super().__init__() diff --git a/src/pipecat/processors/aggregators/openai_llm_context.py b/src/pipecat/processors/aggregators/openai_llm_context.py index 0d8b19a36..3d1acf32e 100644 --- a/src/pipecat/processors/aggregators/openai_llm_context.py +++ b/src/pipecat/processors/aggregators/openai_llm_context.py @@ -13,7 +13,11 @@ from PIL import Image -from pipecat.frames.frames import Frame, VisionImageRawFrame, FunctionCallInProgressFrame, FunctionCallResultFrame +from pipecat.frames.frames import ( + Frame, + VisionImageRawFrame, + FunctionCallInProgressFrame, + FunctionCallResultFrame) from pipecat.processors.frame_processor import FrameProcessor from loguru import logger diff --git a/src/pipecat/processors/aggregators/vision_image_frame.py b/src/pipecat/processors/aggregators/vision_image_frame.py index 0bbb10841..97f6b5ec8 100644 --- a/src/pipecat/processors/aggregators/vision_image_frame.py +++ b/src/pipecat/processors/aggregators/vision_image_frame.py @@ -4,13 +4,19 @@ # SPDX-License-Identifier: BSD 2-Clause License # -from pipecat.frames.frames import Frame, ImageRawFrame, TextFrame, VisionImageRawFrame +from pipecat.frames.frames import ( + Frame, + InputImageRawFrame, + TextFrame, + VisionImageRawFrame +) from pipecat.processors.frame_processor import FrameDirection, FrameProcessor class VisionImageFrameAggregator(FrameProcessor): """This aggregator waits for a consecutive TextFrame and an - ImageFrame. After the ImageFrame arrives it will output a VisionImageFrame. + InputImageRawFrame. After the InputImageRawFrame arrives it will output a + VisionImageRawFrame. >>> from pipecat.frames.frames import ImageFrame @@ -34,7 +40,7 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): if isinstance(frame, TextFrame): self._describe_text = frame.text - elif isinstance(frame, ImageRawFrame): + elif isinstance(frame, InputImageRawFrame): if self._describe_text: frame = VisionImageRawFrame( text=self._describe_text, diff --git a/src/pipecat/processors/gstreamer/pipeline_source.py b/src/pipecat/processors/gstreamer/pipeline_source.py index 8d46105a7..f852dd641 100644 --- a/src/pipecat/processors/gstreamer/pipeline_source.py +++ b/src/pipecat/processors/gstreamer/pipeline_source.py @@ -9,11 +9,11 @@ from pydantic import BaseModel from pipecat.frames.frames import ( - AudioRawFrame, CancelFrame, EndFrame, Frame, - ImageRawFrame, + OutputAudioRawFrame, + OutputImageRawFrame, StartFrame, SystemFrame) from pipecat.processors.frame_processor import FrameDirection, FrameProcessor @@ -182,9 +182,9 @@ def _decodebin_video(self, pad: Gst.Pad): def _appsink_audio_new_sample(self, appsink: GstApp.AppSink): buffer = appsink.pull_sample().get_buffer() (_, info) = buffer.map(Gst.MapFlags.READ) - frame = AudioRawFrame(audio=info.data, - sample_rate=self._out_params.audio_sample_rate, - num_channels=self._out_params.audio_channels) + frame = OutputAudioRawFrame(audio=info.data, + sample_rate=self._out_params.audio_sample_rate, + num_channels=self._out_params.audio_channels) asyncio.run_coroutine_threadsafe(self.push_frame(frame), self.get_event_loop()) buffer.unmap(info) return Gst.FlowReturn.OK @@ -192,7 +192,7 @@ def _appsink_audio_new_sample(self, appsink: GstApp.AppSink): def _appsink_video_new_sample(self, appsink: GstApp.AppSink): buffer = appsink.pull_sample().get_buffer() (_, info) = buffer.map(Gst.MapFlags.READ) - frame = ImageRawFrame( + frame = 
OutputImageRawFrame( image=info.data, size=(self._out_params.video_width, self._out_params.video_height), format="RGB") diff --git a/src/pipecat/serializers/livekit.py b/src/pipecat/serializers/livekit.py index 7a0e8afd1..fec5243f5 100644 --- a/src/pipecat/serializers/livekit.py +++ b/src/pipecat/serializers/livekit.py @@ -7,7 +7,10 @@ import ctypes import pickle -from pipecat.frames.frames import AudioRawFrame, Frame +from pipecat.frames.frames import ( + Frame, + InputAudioRawFrame, + OutputAudioRawFrame) from pipecat.serializers.base_serializer import FrameSerializer from loguru import logger @@ -22,12 +25,8 @@ class LivekitFrameSerializer(FrameSerializer): - SERIALIZABLE_TYPES = { - AudioRawFrame: "audio", - } - def serialize(self, frame: Frame) -> str | bytes | None: - if not isinstance(frame, AudioRawFrame): + if not isinstance(frame, OutputAudioRawFrame): return None audio_frame = AudioFrame( data=frame.audio, @@ -39,7 +38,7 @@ def serialize(self, frame: Frame) -> str | bytes | None: def deserialize(self, data: str | bytes) -> Frame | None: audio_frame: AudioFrame = pickle.loads(data)['frame'] - return AudioRawFrame( + return InputAudioRawFrame( audio=bytes(audio_frame.data), sample_rate=audio_frame.sample_rate, num_channels=audio_frame.num_channels, diff --git a/src/pipecat/serializers/protobuf.py b/src/pipecat/serializers/protobuf.py index 0a6dee0b1..6ae1b0c03 100644 --- a/src/pipecat/serializers/protobuf.py +++ b/src/pipecat/serializers/protobuf.py @@ -8,7 +8,11 @@ import pipecat.frames.protobufs.frames_pb2 as frame_protos -from pipecat.frames.frames import AudioRawFrame, Frame, TextFrame, TranscriptionFrame +from pipecat.frames.frames import ( + AudioRawFrame, + Frame, + TextFrame, + TranscriptionFrame) from pipecat.serializers.base_serializer import FrameSerializer from loguru import logger @@ -29,14 +33,15 @@ def __init__(self): def serialize(self, frame: Frame) -> str | bytes | None: proto_frame = frame_protos.Frame() if type(frame) not in self.SERIALIZABLE_TYPES: - raise ValueError( - f"Frame type {type(frame)} is not serializable. You may need to add it to ProtobufFrameSerializer.SERIALIZABLE_FIELDS.") + logger.warning(f"Frame type {type(frame)} is not serializable") + return None # ignoring linter errors; we check that type(frame) is in this dict above proto_optional_name = self.SERIALIZABLE_TYPES[type(frame)] # type: ignore for field in dataclasses.fields(frame): # type: ignore - setattr(getattr(proto_frame, proto_optional_name), field.name, - getattr(frame, field.name)) + value = getattr(frame, field.name) + if value: + setattr(getattr(proto_frame, proto_optional_name), field.name, value) result = proto_frame.SerializeToString() return result @@ -48,8 +53,8 @@ def deserialize(self, data: str | bytes) -> Frame | None: >>> serializer = ProtobufFrameSerializer() >>> serializer.deserialize( - ... serializer.serialize(AudioFrame(data=b'1234567890'))) - AudioFrame(data=b'1234567890') + ... serializer.serialize(OutputAudioFrame(data=b'1234567890'))) + InputAudioFrame(data=b'1234567890') >>> serializer.deserialize( ... 
serializer.serialize(TextFrame(text='hello world'))) @@ -75,10 +80,13 @@ def deserialize(self, data: str | bytes) -> Frame | None: # Remove special fields if needed id = getattr(args, "id") name = getattr(args, "name") + pts = getattr(args, "pts") if not id: del args_dict["id"] if not name: del args_dict["name"] + if not pts: + del args_dict["pts"] # Create the instance instance = class_name(**args_dict) @@ -88,5 +96,7 @@ def deserialize(self, data: str | bytes) -> Frame | None: setattr(instance, "id", getattr(args, "id")) if name: setattr(instance, "name", getattr(args, "name")) + if pts: + setattr(instance, "pts", getattr(args, "pts")) return instance diff --git a/src/pipecat/serializers/twilio.py b/src/pipecat/serializers/twilio.py index 583234ae4..ed2905a40 100644 --- a/src/pipecat/serializers/twilio.py +++ b/src/pipecat/serializers/twilio.py @@ -9,7 +9,10 @@ from pydantic import BaseModel -from pipecat.frames.frames import AudioRawFrame, Frame, StartInterruptionFrame +from pipecat.frames.frames import ( + AudioRawFrame, + Frame, + StartInterruptionFrame) from pipecat.serializers.base_serializer import FrameSerializer from pipecat.utils.audio import ulaw_to_pcm, pcm_to_ulaw @@ -19,10 +22,6 @@ class InputParams(BaseModel): twilio_sample_rate: int = 8000 sample_rate: int = 16000 - SERIALIZABLE_TYPES = { - AudioRawFrame: "audio", - } - def __init__(self, stream_sid: str, params: InputParams = InputParams()): self._stream_sid = stream_sid self._params = params diff --git a/src/pipecat/services/ai_services.py b/src/pipecat/services/ai_services.py index d91782e42..c00064e5d 100644 --- a/src/pipecat/services/ai_services.py +++ b/src/pipecat/services/ai_services.py @@ -22,6 +22,7 @@ STTModelUpdateFrame, StartFrame, StartInterruptionFrame, + TTSAudioRawFrame, TTSLanguageUpdateFrame, TTSModelUpdateFrame, TTSSpeakFrame, @@ -277,7 +278,7 @@ async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirect if self._push_stop_frames and ( isinstance(frame, StartInterruptionFrame) or isinstance(frame, TTSStartedFrame) or - isinstance(frame, AudioRawFrame) or + isinstance(frame, TTSAudioRawFrame) or isinstance(frame, TTSStoppedFrame)): await self._stop_frame_queue.put(frame) diff --git a/src/pipecat/services/azure.py b/src/pipecat/services/azure.py index 90674fcc4..2c01fff3c 100644 --- a/src/pipecat/services/azure.py +++ b/src/pipecat/services/azure.py @@ -12,12 +12,12 @@ from typing import AsyncGenerator from pipecat.frames.frames import ( - AudioRawFrame, CancelFrame, EndFrame, ErrorFrame, Frame, StartFrame, + TTSAudioRawFrame, TTSStartedFrame, TTSStoppedFrame, TranscriptionFrame, @@ -115,7 +115,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: await self.stop_ttfb_metrics() await self.push_frame(TTSStartedFrame()) # Azure always sends a 44-byte header. Strip it off. 
- yield AudioRawFrame(audio=result.audio_data[44:], sample_rate=self._sample_rate, num_channels=1) + yield TTSAudioRawFrame(audio=result.audio_data[44:], sample_rate=self._sample_rate, num_channels=1) await self.push_frame(TTSStoppedFrame()) elif result.reason == ResultReason.Canceled: cancellation_details = result.cancellation_details diff --git a/src/pipecat/services/cartesia.py b/src/pipecat/services/cartesia.py index 078926235..ac0a1b85b 100644 --- a/src/pipecat/services/cartesia.py +++ b/src/pipecat/services/cartesia.py @@ -15,10 +15,10 @@ CancelFrame, ErrorFrame, Frame, - AudioRawFrame, StartInterruptionFrame, StartFrame, EndFrame, + TTSAudioRawFrame, TTSStartedFrame, TTSStoppedFrame, LLMFullResponseEndFrame @@ -201,7 +201,7 @@ async def _receive_task_handler(self): elif msg["type"] == "chunk": await self.stop_ttfb_metrics() self.start_word_timestamps() - frame = AudioRawFrame( + frame = TTSAudioRawFrame( audio=base64.b64decode(msg["data"]), sample_rate=self._output_format["sample_rate"], num_channels=1 @@ -326,7 +326,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: await self.stop_ttfb_metrics() - frame = AudioRawFrame( + frame = TTSAudioRawFrame( audio=output["audio"], sample_rate=self._output_format["sample_rate"], num_channels=1 diff --git a/src/pipecat/services/deepgram.py b/src/pipecat/services/deepgram.py index 708c3c511..5ce17d6ee 100644 --- a/src/pipecat/services/deepgram.py +++ b/src/pipecat/services/deepgram.py @@ -9,13 +9,13 @@ from typing import AsyncGenerator from pipecat.frames.frames import ( - AudioRawFrame, CancelFrame, EndFrame, ErrorFrame, Frame, InterimTranscriptionFrame, StartFrame, + TTSAudioRawFrame, TTSStartedFrame, TTSStoppedFrame, TranscriptionFrame) @@ -101,7 +101,8 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: await self.push_frame(TTSStartedFrame()) async for data in r.content: await self.stop_ttfb_metrics() - frame = AudioRawFrame(audio=data, sample_rate=self._sample_rate, num_channels=1) + frame = TTSAudioRawFrame( + audio=data, sample_rate=self._sample_rate, num_channels=1) yield frame await self.push_frame(TTSStoppedFrame()) except Exception as e: diff --git a/src/pipecat/services/elevenlabs.py b/src/pipecat/services/elevenlabs.py index 081a6bf5d..f78eba266 100644 --- a/src/pipecat/services/elevenlabs.py +++ b/src/pipecat/services/elevenlabs.py @@ -12,12 +12,12 @@ from pydantic import BaseModel from pipecat.frames.frames import ( - AudioRawFrame, CancelFrame, EndFrame, Frame, StartFrame, StartInterruptionFrame, + TTSAudioRawFrame, TTSStartedFrame, TTSStoppedFrame) from pipecat.processors.frame_processor import FrameDirection @@ -209,7 +209,7 @@ async def _receive_task_handler(self): self.start_word_timestamps() audio = base64.b64decode(msg["audio"]) - frame = AudioRawFrame(audio, self.sample_rate, 1) + frame = TTSAudioRawFrame(audio, self.sample_rate, 1) await self.push_frame(frame) if msg.get("alignment"): diff --git a/src/pipecat/services/lmnt.py b/src/pipecat/services/lmnt.py index 60f0cb7df..9285f1583 100644 --- a/src/pipecat/services/lmnt.py +++ b/src/pipecat/services/lmnt.py @@ -10,13 +10,13 @@ from pipecat.processors.frame_processor import FrameDirection from pipecat.frames.frames import ( - AudioRawFrame, CancelFrame, EndFrame, ErrorFrame, Frame, StartFrame, StartInterruptionFrame, + TTSAudioRawFrame, TTSStartedFrame, TTSStoppedFrame, ) @@ -126,7 +126,7 @@ async def _receive_task_handler(self): await self.push_error(ErrorFrame(f'{self} error: {msg["error"]}')) elif "audio" in msg: await 
self.stop_ttfb_metrics() - frame = AudioRawFrame( + frame = TTSAudioRawFrame( audio=msg["audio"], sample_rate=self._output_format["sample_rate"], num_channels=1 diff --git a/src/pipecat/services/openai.py b/src/pipecat/services/openai.py index 7483e2eb5..8a1355916 100644 --- a/src/pipecat/services/openai.py +++ b/src/pipecat/services/openai.py @@ -17,13 +17,13 @@ from PIL import Image from pipecat.frames.frames import ( - AudioRawFrame, ErrorFrame, Frame, LLMFullResponseEndFrame, LLMFullResponseStartFrame, LLMMessagesFrame, LLMModelUpdateFrame, + TTSAudioRawFrame, TTSStartedFrame, TTSStoppedFrame, TextFrame, @@ -365,7 +365,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: async for chunk in r.iter_bytes(8192): if len(chunk) > 0: await self.stop_ttfb_metrics() - frame = AudioRawFrame(chunk, self.sample_rate, 1) + frame = TTSAudioRawFrame(chunk, self.sample_rate, 1) yield frame await self.push_frame(TTSStoppedFrame()) except BadRequestError as e: diff --git a/src/pipecat/services/playht.py b/src/pipecat/services/playht.py index c3200fee9..ae8606e91 100644 --- a/src/pipecat/services/playht.py +++ b/src/pipecat/services/playht.py @@ -9,7 +9,11 @@ from typing import AsyncGenerator -from pipecat.frames.frames import AudioRawFrame, Frame, TTSStartedFrame, TTSStoppedFrame +from pipecat.frames.frames import ( + Frame, + TTSAudioRawFrame, + TTSStartedFrame, + TTSStoppedFrame) from pipecat.services.ai_services import TTSService from loguru import logger @@ -91,7 +95,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: else: if len(chunk): await self.stop_ttfb_metrics() - frame = AudioRawFrame(chunk, 16000, 1) + frame = TTSAudioRawFrame(chunk, 16000, 1) yield frame await self.push_frame(TTSStoppedFrame()) except Exception as e: diff --git a/src/pipecat/services/together.py b/src/pipecat/services/together.py index 49759cb01..dfd4c2966 100644 --- a/src/pipecat/services/together.py +++ b/src/pipecat/services/together.py @@ -4,23 +4,19 @@ # SPDX-License-Identifier: BSD 2-Clause License # -import base64 import json -import io -import copy -from typing import List, Optional -from dataclasses import dataclass -from asyncio import CancelledError import re import uuid +from typing import List +from dataclasses import dataclass +from asyncio import CancelledError + from pipecat.frames.frames import ( Frame, LLMModelUpdateFrame, TextFrame, - VisionImageRawFrame, UserImageRequestFrame, - UserImageRawFrame, LLMMessagesFrame, LLMFullResponseStartFrame, LLMFullResponseEndFrame, diff --git a/src/pipecat/services/xtts.py b/src/pipecat/services/xtts.py index 38f0f9a64..69b754f55 100644 --- a/src/pipecat/services/xtts.py +++ b/src/pipecat/services/xtts.py @@ -9,10 +9,10 @@ from typing import Any, AsyncGenerator, Dict from pipecat.frames.frames import ( - AudioRawFrame, ErrorFrame, Frame, StartFrame, + TTSAudioRawFrame, TTSStartedFrame, TTSStoppedFrame) from pipecat.services.ai_services import TTSService @@ -128,7 +128,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: # Convert the numpy array back to bytes resampled_audio_bytes = resampled_audio.astype(np.int16).tobytes() # Create the frame with the resampled audio - frame = AudioRawFrame(resampled_audio_bytes, 16000, 1) + frame = TTSAudioRawFrame(resampled_audio_bytes, 16000, 1) yield frame # Process any remaining data in the buffer @@ -136,7 +136,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: audio_np = np.frombuffer(buffer, dtype=np.int16) resampled_audio = 
resampy.resample(audio_np, 24000, 16000) resampled_audio_bytes = resampled_audio.astype(np.int16).tobytes() - frame = AudioRawFrame(resampled_audio_bytes, 16000, 1) + frame = TTSAudioRawFrame(resampled_audio_bytes, 16000, 1) yield frame await self.push_frame(TTSStoppedFrame()) diff --git a/src/pipecat/transports/base_input.py b/src/pipecat/transports/base_input.py index de1ec8884..4e398e779 100644 --- a/src/pipecat/transports/base_input.py +++ b/src/pipecat/transports/base_input.py @@ -10,9 +10,9 @@ from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.frames.frames import ( - AudioRawFrame, BotInterruptionFrame, CancelFrame, + InputAudioRawFrame, StartFrame, EndFrame, Frame, @@ -59,7 +59,7 @@ async def cancel(self, frame: CancelFrame): def vad_analyzer(self) -> VADAnalyzer | None: return self._params.vad_analyzer - async def push_audio_frame(self, frame: AudioRawFrame): + async def push_audio_frame(self, frame: InputAudioRawFrame): if self._params.audio_in_enabled or self._params.vad_enabled: await self._audio_in_queue.put(frame) @@ -151,7 +151,7 @@ async def _audio_task_handler(self): vad_state: VADState = VADState.QUIET while True: try: - frame: AudioRawFrame = await self._audio_in_queue.get() + frame: InputAudioRawFrame = await self._audio_in_queue.get() audio_passthrough = True diff --git a/src/pipecat/transports/base_output.py b/src/pipecat/transports/base_output.py index 9b1b9c29e..263bb64f4 100644 --- a/src/pipecat/transports/base_output.py +++ b/src/pipecat/transports/base_output.py @@ -15,17 +15,17 @@ from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.frames.frames import ( - AudioRawFrame, BotSpeakingFrame, BotStartedSpeakingFrame, BotStoppedSpeakingFrame, CancelFrame, MetricsFrame, + OutputAudioRawFrame, + OutputImageRawFrame, SpriteFrame, StartFrame, EndFrame, Frame, - ImageRawFrame, StartInterruptionFrame, StopInterruptionFrame, SystemFrame, @@ -122,7 +122,7 @@ async def send_message(self, frame: TransportMessageFrame): async def send_metrics(self, frame: MetricsFrame): pass - async def write_frame_to_camera(self, frame: ImageRawFrame): + async def write_frame_to_camera(self, frame: OutputImageRawFrame): pass async def write_raw_audio_frames(self, frames: bytes): @@ -162,9 +162,9 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): await self._sink_queue.put(frame) await self.stop(frame) # Other frames. 
- elif isinstance(frame, AudioRawFrame): + elif isinstance(frame, OutputAudioRawFrame): await self._handle_audio(frame) - elif isinstance(frame, ImageRawFrame) or isinstance(frame, SpriteFrame): + elif isinstance(frame, OutputImageRawFrame) or isinstance(frame, SpriteFrame): await self._handle_image(frame) elif isinstance(frame, TransportMessageFrame) and frame.urgent: await self.send_message(frame) @@ -191,7 +191,7 @@ async def _handle_interruptions(self, frame: Frame): if self._bot_speaking: await self._bot_stopped_speaking() - async def _handle_audio(self, frame: AudioRawFrame): + async def _handle_audio(self, frame: OutputAudioRawFrame): if not self._params.audio_out_enabled: return @@ -200,12 +200,14 @@ async def _handle_audio(self, frame: AudioRawFrame): else: self._audio_buffer.extend(frame.audio) while len(self._audio_buffer) >= self._audio_chunk_size: - chunk = AudioRawFrame(bytes(self._audio_buffer[:self._audio_chunk_size]), - sample_rate=frame.sample_rate, num_channels=frame.num_channels) + chunk = OutputAudioRawFrame( + bytes(self._audio_buffer[:self._audio_chunk_size]), + sample_rate=frame.sample_rate, num_channels=frame.num_channels + ) await self._sink_queue.put(chunk) self._audio_buffer = self._audio_buffer[self._audio_chunk_size:] - async def _handle_image(self, frame: ImageRawFrame | SpriteFrame): + async def _handle_image(self, frame: OutputImageRawFrame | SpriteFrame): if not self._params.camera_out_enabled: return @@ -226,11 +228,11 @@ def _create_sink_tasks(self): self._sink_clock_task = loop.create_task(self._sink_clock_task_handler()) async def _sink_frame_handler(self, frame: Frame): - if isinstance(frame, AudioRawFrame): + if isinstance(frame, OutputAudioRawFrame): await self.write_raw_audio_frames(frame.audio) await self.push_frame(frame) await self.push_frame(BotSpeakingFrame(), FrameDirection.UPSTREAM) - elif isinstance(frame, ImageRawFrame): + elif isinstance(frame, OutputImageRawFrame): await self._set_camera_image(frame) elif isinstance(frame, SpriteFrame): await self._set_camera_images(frame.images) @@ -305,10 +307,10 @@ async def _bot_stopped_speaking(self): # Camera out # - async def send_image(self, frame: ImageRawFrame | SpriteFrame): + async def send_image(self, frame: OutputImageRawFrame | SpriteFrame): await self.process_frame(frame, FrameDirection.DOWNSTREAM) - async def _draw_image(self, frame: ImageRawFrame): + async def _draw_image(self, frame: OutputImageRawFrame): desired_size = (self._params.camera_out_width, self._params.camera_out_height) if frame.size != desired_size: @@ -316,14 +318,17 @@ async def _draw_image(self, frame: ImageRawFrame): resized_image = image.resize(desired_size) logger.warning( f"{frame} does not have the expected size {desired_size}, resizing") - frame = ImageRawFrame(resized_image.tobytes(), resized_image.size, resized_image.format) + frame = OutputImageRawFrame( + resized_image.tobytes(), + resized_image.size, + resized_image.format) await self.write_frame_to_camera(frame) - async def _set_camera_image(self, image: ImageRawFrame): + async def _set_camera_image(self, image: OutputImageRawFrame): self._camera_images = itertools.cycle([image]) - async def _set_camera_images(self, images: List[ImageRawFrame]): + async def _set_camera_images(self, images: List[OutputImageRawFrame]): self._camera_images = itertools.cycle(images) async def _camera_out_task_handler(self): @@ -375,7 +380,7 @@ async def _camera_out_is_live_handler(self): # Audio out # - async def send_audio(self, frame: AudioRawFrame): + async def 
send_audio(self, frame: OutputAudioRawFrame): await self.process_frame(frame, FrameDirection.DOWNSTREAM) async def _audio_out_task_handler(self): diff --git a/src/pipecat/transports/local/audio.py b/src/pipecat/transports/local/audio.py index cd05550a9..45d18db52 100644 --- a/src/pipecat/transports/local/audio.py +++ b/src/pipecat/transports/local/audio.py @@ -8,7 +8,7 @@ from concurrent.futures import ThreadPoolExecutor -from pipecat.frames.frames import AudioRawFrame, StartFrame +from pipecat.frames.frames import InputAudioRawFrame, StartFrame from pipecat.processors.frame_processor import FrameProcessor from pipecat.transports.base_input import BaseInputTransport from pipecat.transports.base_output import BaseOutputTransport @@ -54,9 +54,9 @@ async def cleanup(self): self._in_stream.close() def _audio_in_callback(self, in_data, frame_count, time_info, status): - frame = AudioRawFrame(audio=in_data, - sample_rate=self._params.audio_in_sample_rate, - num_channels=self._params.audio_in_channels) + frame = InputAudioRawFrame(audio=in_data, + sample_rate=self._params.audio_in_sample_rate, + num_channels=self._params.audio_in_channels) asyncio.run_coroutine_threadsafe(self.push_audio_frame(frame), self.get_event_loop()) diff --git a/src/pipecat/transports/local/tk.py b/src/pipecat/transports/local/tk.py index e7dc04902..75dd30331 100644 --- a/src/pipecat/transports/local/tk.py +++ b/src/pipecat/transports/local/tk.py @@ -11,8 +11,7 @@ import numpy as np import tkinter as tk -from pipecat.frames.frames import AudioRawFrame, ImageRawFrame, StartFrame -from pipecat.processors.frame_processor import FrameProcessor +from pipecat.frames.frames import InputAudioRawFrame, OutputImageRawFrame, StartFrame from pipecat.transports.base_input import BaseInputTransport from pipecat.transports.base_output import BaseOutputTransport from pipecat.transports.base_transport import BaseTransport, TransportParams @@ -64,9 +63,9 @@ async def cleanup(self): self._in_stream.close() def _audio_in_callback(self, in_data, frame_count, time_info, status): - frame = AudioRawFrame(audio=in_data, - sample_rate=self._params.audio_in_sample_rate, - num_channels=self._params.audio_in_channels) + frame = InputAudioRawFrame(audio=in_data, + sample_rate=self._params.audio_in_sample_rate, + num_channels=self._params.audio_in_channels) asyncio.run_coroutine_threadsafe(self.push_audio_frame(frame), self.get_event_loop()) @@ -108,10 +107,10 @@ async def cleanup(self): async def write_raw_audio_frames(self, frames: bytes): await self.get_event_loop().run_in_executor(self._executor, self._out_stream.write, frames) - async def write_frame_to_camera(self, frame: ImageRawFrame): + async def write_frame_to_camera(self, frame: OutputImageRawFrame): self.get_event_loop().call_soon(self._write_frame_to_tk, frame) - def _write_frame_to_tk(self, frame: ImageRawFrame): + def _write_frame_to_tk(self, frame: OutputImageRawFrame): width = frame.size[0] height = frame.size[1] data = f"P6 {width} {height} 255 ".encode() + frame.image diff --git a/src/pipecat/transports/network/fastapi_websocket.py b/src/pipecat/transports/network/fastapi_websocket.py index 7169c73bd..815d7c2ef 100644 --- a/src/pipecat/transports/network/fastapi_websocket.py +++ b/src/pipecat/transports/network/fastapi_websocket.py @@ -12,8 +12,16 @@ from typing import Awaitable, Callable from pydantic.main import BaseModel -from pipecat.frames.frames import AudioRawFrame, CancelFrame, EndFrame, Frame, StartFrame, StartInterruptionFrame -from pipecat.processors.frame_processor import 
FrameDirection, FrameProcessor +from pipecat.frames.frames import ( + AudioRawFrame, + CancelFrame, + EndFrame, + Frame, + InputAudioRawFrame, + StartFrame, + StartInterruptionFrame +) +from pipecat.processors.frame_processor import FrameDirection from pipecat.serializers.base_serializer import FrameSerializer from pipecat.transports.base_input import BaseInputTransport from pipecat.transports.base_output import BaseOutputTransport @@ -79,7 +87,11 @@ async def _receive_messages(self): continue if isinstance(frame, AudioRawFrame): - await self.push_audio_frame(frame) + await self.push_audio_frame(InputAudioRawFrame( + audio=frame.audio, + sample_rate=frame.sample_rate, + num_channels=frame.num_channels) + ) await self._callbacks.on_client_disconnected(self._websocket) diff --git a/src/pipecat/transports/network/websocket_server.py b/src/pipecat/transports/network/websocket_server.py index c17818898..329ae8994 100644 --- a/src/pipecat/transports/network/websocket_server.py +++ b/src/pipecat/transports/network/websocket_server.py @@ -11,8 +11,7 @@ from typing import Awaitable, Callable from pydantic.main import BaseModel -from pipecat.frames.frames import AudioRawFrame, CancelFrame, EndFrame, StartFrame -from pipecat.processors.frame_processor import FrameProcessor +from pipecat.frames.frames import AudioRawFrame, CancelFrame, EndFrame, InputAudioRawFrame, StartFrame from pipecat.serializers.base_serializer import FrameSerializer from pipecat.serializers.protobuf import ProtobufFrameSerializer from pipecat.transports.base_input import BaseInputTransport @@ -98,7 +97,11 @@ async def _client_handler(self, websocket: websockets.WebSocketServerProtocol, p continue if isinstance(frame, AudioRawFrame): - await self.queue_audio_frame(frame) + await self.push_audio_frame(InputAudioRawFrame( + audio=frame.audio, + sample_rate=frame.sample_rate, + num_channels=frame.num_channels) + ) else: await self.push_frame(frame) diff --git a/src/pipecat/transports/services/daily.py b/src/pipecat/transports/services/daily.py index 2a45adf36..d595aaceb 100644 --- a/src/pipecat/transports/services/daily.py +++ b/src/pipecat/transports/services/daily.py @@ -22,13 +22,14 @@ from pydantic.main import BaseModel from pipecat.frames.frames import ( - AudioRawFrame, CancelFrame, EndFrame, Frame, - ImageRawFrame, + InputAudioRawFrame, InterimTranscriptionFrame, MetricsFrame, + OutputAudioRawFrame, + OutputImageRawFrame, SpriteFrame, StartFrame, TranscriptionFrame, @@ -239,7 +240,7 @@ async def send_message(self, frame: TransportMessageFrame): completion=completion_callback(future)) await future - async def read_next_audio_frame(self) -> AudioRawFrame | None: + async def read_next_audio_frame(self) -> InputAudioRawFrame | None: if not self._speaker: return None @@ -252,7 +253,10 @@ async def read_next_audio_frame(self) -> AudioRawFrame | None: audio = await future if len(audio) > 0: - return AudioRawFrame(audio=audio, sample_rate=sample_rate, num_channels=num_channels) + return InputAudioRawFrame( + audio=audio, + sample_rate=sample_rate, + num_channels=num_channels) else: # If we don't read any audio it could be there's no participant # connected. 
daily-python will return immediately if that's the @@ -268,7 +272,7 @@ async def write_raw_audio_frames(self, frames: bytes): self._mic.write_frames(frames, completion=completion_callback(future)) await future - async def write_frame_to_camera(self, frame: ImageRawFrame): + async def write_frame_to_camera(self, frame: OutputImageRawFrame): if not self._camera: return None @@ -749,7 +753,7 @@ async def send_metrics(self, frame: MetricsFrame): async def write_raw_audio_frames(self, frames: bytes): await self._client.write_raw_audio_frames(frames) - async def write_frame_to_camera(self, frame: ImageRawFrame): + async def write_frame_to_camera(self, frame: OutputImageRawFrame): await self._client.write_frame_to_camera(frame) @@ -829,11 +833,11 @@ def output(self) -> DailyOutputTransport: def participant_id(self) -> str: return self._client.participant_id - async def send_image(self, frame: ImageRawFrame | SpriteFrame): + async def send_image(self, frame: OutputImageRawFrame | SpriteFrame): if self._output: await self._output.process_frame(frame, FrameDirection.DOWNSTREAM) - async def send_audio(self, frame: AudioRawFrame): + async def send_audio(self, frame: OutputAudioRawFrame): if self._output: await self._output.process_frame(frame, FrameDirection.DOWNSTREAM)
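The common thread in this patch is the split of the media frame classes: `AudioRawFrame` gains `InputAudioRawFrame` and `OutputAudioRawFrame` subclasses (with `TTSAudioRawFrame` under the output variant), and `ImageRawFrame` gains `InputImageRawFrame`/`OutputImageRawFrame`, with `UserImageRawFrame` and `VisionImageRawFrame` reparented under input and `URLImageRawFrame` under output. Below is a minimal sketch, not part of the patch, of how the hierarchy behaves after the change; the class names and constructor fields come from `src/pipecat/frames/frames.py` in this diff, while the 16 kHz mono payloads are arbitrary illustration values.

```python
# Minimal sketch (not part of the patch) of the new frame hierarchy.
# Class names and constructor fields come from src/pipecat/frames/frames.py
# in this diff; the 16 kHz mono payloads are arbitrary example values.
from pipecat.frames.frames import (
    AudioRawFrame,
    InputAudioRawFrame,
    OutputAudioRawFrame,
    TTSAudioRawFrame,
)

mic_audio = InputAudioRawFrame(audio=b"\x00\x00" * 160, sample_rate=16000, num_channels=1)
tts_audio = TTSAudioRawFrame(audio=b"\x00\x00" * 160, sample_rate=16000, num_channels=1)

# Pre-existing checks against the base class still match both directions...
assert isinstance(mic_audio, AudioRawFrame)
assert isinstance(tts_audio, AudioRawFrame)

# ...so code that should react to only one direction narrows the check, as the
# updated examples do (TalkingAnimation keys off TTSAudioRawFrame and the
# output transports only play OutputAudioRawFrame).
assert isinstance(tts_audio, OutputAudioRawFrame)
assert not isinstance(mic_audio, OutputAudioRawFrame)
```

Because the base classes are kept, old `isinstance(frame, AudioRawFrame)` checks keep matching both directions; the diff's per-file changes are largely about narrowing such checks to the new subclasses where direction matters.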
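For application code that pushes its own media (sound effects, sprites), the pattern this diff applies in `examples/patient-intake/bot.py` and `examples/foundational/11-sound-effects.py` is to build `OutputAudioRawFrame`/`OutputImageRawFrame` directly, since `BaseOutputTransport` now only plays or draws the output variants. A sketch under that assumption follows; `"clack.wav"` is a placeholder filename, and `transport`/`self` stand for an already-constructed output transport and frame processor.

```python
# Minimal sketch of queueing app-generated audio with the renamed output
# classes, following the pattern this diff applies in examples/patient-intake
# and examples/foundational/11-sound-effects.py. "clack.wav" is a placeholder
# filename; `transport` stands for an already-constructed output transport.
import wave

from pipecat.frames.frames import OutputAudioRawFrame

with wave.open("clack.wav") as audio_file:
    sound = OutputAudioRawFrame(
        audio=audio_file.readframes(-1),
        sample_rate=audio_file.getframerate(),
        num_channels=audio_file.getnchannels(),
    )

# Inside a FrameProcessor the frame is pushed downstream toward the output
# transport, which now only plays OutputAudioRawFrame (and its subclasses):
#     await self.push_frame(sound)
# or sent directly via the updated BaseOutputTransport API:
#     await transport.send_audio(sound)
```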