Skip to content

Commit

Permalink
introduce input/output audio and image frames
Browse files Browse the repository at this point in the history
We now distinguish between input and output audio and image frames. We introduce
`InputAudioRawFrame`, `OutputAudioRawFrame`, `InputImageRawFrame` and
`OutputImageRawFrame` (and other subclasses of those). The input frames usually
come from an input transport and are meant to be processed inside the pipeline
to generate new frames. However, the input frames will not be sent through an
output transport. The output frames can also be processed by any frame processor
in the pipeline and they are allowed to be sent by the output transport.
  • Loading branch information
aconchillo committed Sep 20, 2024
1 parent ed409d0 commit 7e39d9a
Show file tree
Hide file tree
Showing 48 changed files with 425 additions and 275 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Changed

- We now distinguish between input and output audio and image frames. We
introduce `InputAudioRawFrame`, `OutputAudioRawFrame`, `InputImageRawFrame`
and `OutputImageRawFrame` (and other subclasses of those). The input frames
usually come from an input transport and are meant to be processed inside the
pipeline to generate new frames. However, the input frames will not be sent
through an output transport. The output frames can also be processed by any
frame processor in the pipeline and they are allowed to be sent by the output
transport.

- `ParallelTask` has been renamed to `SyncParallelPipeline`. A
`SyncParallelPipeline` is a frame processor that contains a list of different
pipelines to be executed concurrently. The difference between a
Expand Down
2 changes: 1 addition & 1 deletion examples/dialin-chatbot/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
pipecat-ai[daily,openai,silero]
pipecat-ai[daily,elevenlabs,openai,silero]
fastapi
uvicorn
python-dotenv
Expand Down
12 changes: 9 additions & 3 deletions examples/foundational/05a-local-sync-speech-and-image.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,13 @@

import tkinter as tk

from pipecat.frames.frames import AudioRawFrame, Frame, URLImageRawFrame, LLMMessagesFrame, TextFrame
from pipecat.frames.frames import (
Frame,
OutputAudioRawFrame,
TTSAudioRawFrame,
URLImageRawFrame,
LLMMessagesFrame,
TextFrame)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.sync_parallel_pipeline import SyncParallelPipeline
Expand Down Expand Up @@ -65,9 +71,9 @@ def __init__(self):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)

if isinstance(frame, AudioRawFrame):
if isinstance(frame, TTSAudioRawFrame):
self.audio.extend(frame.audio)
self.frame = AudioRawFrame(
self.frame = OutputAudioRawFrame(
bytes(self.audio), frame.sample_rate, frame.num_channels)

class ImageGrabber(FrameProcessor):
Expand Down
13 changes: 10 additions & 3 deletions examples/foundational/06a-image-sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

from PIL import Image

from pipecat.frames.frames import ImageRawFrame, Frame, SystemFrame, TextFrame
from pipecat.frames.frames import Frame, OutputImageRawFrame, SystemFrame, TextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
Expand Down Expand Up @@ -52,9 +52,16 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)

if not isinstance(frame, SystemFrame) and direction == FrameDirection.DOWNSTREAM:
await self.push_frame(ImageRawFrame(image=self._speaking_image_bytes, size=(1024, 1024), format=self._speaking_image_format))
await self.push_frame(OutputImageRawFrame(
image=self._speaking_image_bytes,
size=(1024, 1024),
format=self._speaking_image_format)
)
await self.push_frame(frame)
await self.push_frame(ImageRawFrame(image=self._waiting_image_bytes, size=(1024, 1024), format=self._waiting_image_format))
await self.push_frame(OutputImageRawFrame(
image=self._waiting_image_bytes,
size=(1024, 1024),
format=self._waiting_image_format))
else:
await self.push_frame(frame)

Expand Down
25 changes: 24 additions & 1 deletion examples/foundational/09-mirror.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@
import asyncio
import sys

from pipecat.frames.frames import Frame, InputAudioRawFrame, InputImageRawFrame, OutputAudioRawFrame, OutputImageRawFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.transports.services.daily import DailyTransport, DailyParams

from runner import configure
Expand All @@ -24,6 +26,27 @@
logger.add(sys.stderr, level="DEBUG")


class MirrorProcessor(FrameProcessor):

async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)

if isinstance(frame, InputAudioRawFrame):
await self.push_frame(OutputAudioRawFrame(
audio=frame.audio,
sample_rate=frame.sample_rate,
num_channels=frame.num_channels)
)
elif isinstance(frame, InputImageRawFrame):
await self.push_frame(OutputImageRawFrame(
image=frame.image,
size=frame.size,
format=frame.format)
)
else:
await self.push_frame(frame, direction)


async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
Expand All @@ -44,7 +67,7 @@ async def main():
async def on_first_participant_joined(transport, participant):
transport.capture_participant_video(participant["id"])

pipeline = Pipeline([transport.input(), transport.output()])
pipeline = Pipeline([transport.input(), MirrorProcessor(), transport.output()])

runner = PipelineRunner()

Expand Down
23 changes: 22 additions & 1 deletion examples/foundational/09a-local-mirror.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@

import tkinter as tk

from pipecat.frames.frames import Frame, InputAudioRawFrame, InputImageRawFrame, OutputAudioRawFrame, OutputImageRawFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.local.tk import TkLocalTransport
from pipecat.transports.services.daily import DailyParams, DailyTransport
Expand All @@ -27,6 +29,25 @@
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")

class MirrorProcessor(FrameProcessor):

async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)

if isinstance(frame, InputAudioRawFrame):
await self.push_frame(OutputAudioRawFrame(
audio=frame.audio,
sample_rate=frame.sample_rate,
num_channels=frame.num_channels)
)
elif isinstance(frame, InputImageRawFrame):
await self.push_frame(OutputImageRawFrame(
image=frame.image,
size=frame.size,
format=frame.format)
)
else:
await self.push_frame(frame, direction)

async def main():
async with aiohttp.ClientSession() as session:
Expand All @@ -52,7 +73,7 @@ async def main():
async def on_first_participant_joined(transport, participant):
transport.capture_participant_video(participant["id"])

pipeline = Pipeline([daily_transport.input(), tk_transport.output()])
pipeline = Pipeline([daily_transport.input(), MirrorProcessor(), tk_transport.output()])

task = PipelineTask(pipeline)

Expand Down
6 changes: 3 additions & 3 deletions examples/foundational/11-sound-effects.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@

from pipecat.frames.frames import (
Frame,
AudioRawFrame,
LLMFullResponseEndFrame,
LLMMessagesFrame,
OutputAudioRawFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
Expand Down Expand Up @@ -53,8 +53,8 @@
filename = os.path.splitext(os.path.basename(full_path))[0]
# Open the image and convert it to bytes
with wave.open(full_path) as audio_file:
sounds[file] = AudioRawFrame(audio_file.readframes(-1),
audio_file.getframerate(), audio_file.getnchannels())
sounds[file] = OutputAudioRawFrame(audio_file.readframes(-1),
audio_file.getframerate(), audio_file.getnchannels())


class OutboundSoundEffectWrapper(FrameProcessor):
Expand Down
11 changes: 8 additions & 3 deletions examples/moondream-chatbot/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@

from pipecat.frames.frames import (
ImageRawFrame,
OutputImageRawFrame,
SpriteFrame,
Frame,
LLMMessagesFrame,
AudioRawFrame,
TTSAudioRawFrame,
TTSStoppedFrame,
TextFrame,
UserImageRawFrame,
Expand Down Expand Up @@ -59,7 +60,11 @@
# Get the filename without the extension to use as the dictionary key
# Open the image and convert it to bytes
with Image.open(full_path) as img:
sprites.append(ImageRawFrame(image=img.tobytes(), size=img.size, format=img.format))
sprites.append(OutputImageRawFrame(
image=img.tobytes(),
size=img.size,
format=img.format)
)

flipped = sprites[::-1]
sprites.extend(flipped)
Expand All @@ -82,7 +87,7 @@ def __init__(self):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)

if isinstance(frame, AudioRawFrame):
if isinstance(frame, TTSAudioRawFrame):
if not self._is_talking:
await self.push_frame(talking_frame)
self._is_talking = True
Expand Down
2 changes: 1 addition & 1 deletion examples/moondream-chatbot/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
python-dotenv
fastapi[all]
uvicorn
pipecat-ai[daily,moondream,openai,silero]
pipecat-ai[daily,cartesia,moondream,openai,silero]
7 changes: 4 additions & 3 deletions examples/patient-intake/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import sys
import wave

from pipecat.frames.frames import AudioRawFrame
from pipecat.frames.frames import OutputAudioRawFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
Expand Down Expand Up @@ -49,8 +49,9 @@
filename = os.path.splitext(os.path.basename(full_path))[0]
# Open the sound and convert it to bytes
with wave.open(full_path) as audio_file:
sounds[file] = AudioRawFrame(audio_file.readframes(-1),
audio_file.getframerate(), audio_file.getnchannels())
sounds[file] = OutputAudioRawFrame(audio_file.readframes(-1),
audio_file.getframerate(),
audio_file.getnchannels())


class IntakeProcessor:
Expand Down
2 changes: 1 addition & 1 deletion examples/patient-intake/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
python-dotenv
fastapi[all]
uvicorn
pipecat-ai[daily,openai,silero]
pipecat-ai[daily,cartesia,openai,silero]
12 changes: 8 additions & 4 deletions examples/simple-chatbot/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import LLMAssistantResponseAggregator, LLMUserResponseAggregator
from pipecat.frames.frames import (
AudioRawFrame,
ImageRawFrame,
OutputImageRawFrame,
SpriteFrame,
Frame,
LLMMessagesFrame,
TTSAudioRawFrame,
TTSStoppedFrame
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
Expand Down Expand Up @@ -49,7 +49,11 @@
# Get the filename without the extension to use as the dictionary key
# Open the image and convert it to bytes
with Image.open(full_path) as img:
sprites.append(ImageRawFrame(image=img.tobytes(), size=img.size, format=img.format))
sprites.append(OutputImageRawFrame(
image=img.tobytes(),
size=img.size,
format=img.format)
)

flipped = sprites[::-1]
sprites.extend(flipped)
Expand All @@ -72,7 +76,7 @@ def __init__(self):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)

if isinstance(frame, AudioRawFrame):
if isinstance(frame, TTSAudioRawFrame):
if not self._is_talking:
await self.push_frame(talking_frame)
self._is_talking = True
Expand Down
2 changes: 1 addition & 1 deletion examples/simple-chatbot/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
python-dotenv
fastapi[all]
uvicorn
pipecat-ai[daily,openai,silero]
pipecat-ai[daily,elevenlabs,openai,silero]
2 changes: 1 addition & 1 deletion examples/storytelling-chatbot/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ async_timeout
fastapi
uvicorn
python-dotenv
pipecat-ai[daily,openai,fal]
pipecat-ai[daily,elevenlabs,openai,fal]
11 changes: 6 additions & 5 deletions examples/storytelling-chatbot/src/utils/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import wave
from PIL import Image

from pipecat.frames.frames import AudioRawFrame, ImageRawFrame
from pipecat.frames.frames import OutputAudioRawFrame, OutputImageRawFrame

script_dir = os.path.dirname(__file__)

Expand All @@ -16,7 +16,8 @@ def load_images(image_files):
filename = os.path.splitext(os.path.basename(full_path))[0]
# Open the image and convert it to bytes
with Image.open(full_path) as img:
images[filename] = ImageRawFrame(image=img.tobytes(), size=img.size, format=img.format)
images[filename] = OutputImageRawFrame(
image=img.tobytes(), size=img.size, format=img.format)
return images


Expand All @@ -30,8 +31,8 @@ def load_sounds(sound_files):
filename = os.path.splitext(os.path.basename(full_path))[0]
# Open the sound and convert it to bytes
with wave.open(full_path) as audio_file:
sounds[filename] = AudioRawFrame(audio=audio_file.readframes(-1),
sample_rate=audio_file.getframerate(),
num_channels=audio_file.getnchannels())
sounds[filename] = OutputAudioRawFrame(audio=audio_file.readframes(-1),
sample_rate=audio_file.getframerate(),
num_channels=audio_file.getnchannels())

return sounds
2 changes: 1 addition & 1 deletion examples/twilio-chatbot/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ This project is a FastAPI-based chatbot that integrates with Twilio to handle We
2. **Update the Twilio Webhook**:
Copy the ngrok URL and update your Twilio phone number webhook URL to `http://<ngrok_url>/start_call`.

3. **Update the streams.xml**:
3. **Update streams.xml**:
Copy the ngrok URL and update templates/streams.xml with `wss://<ngrok_url>/ws`.

## Running the Application
Expand Down
Loading

0 comments on commit 7e39d9a

Please sign in to comment.