Merge pull request #436 from pipecat-ai/aleix/frameprocessor-single-task

introduce synchronous and asynchronous frame processors

aconchillo authored Sep 19, 2024
2 parents 6f3c421 + 607a246 commit 1790fa4

Showing 33 changed files with 407 additions and 494 deletions.
47 changes: 44 additions & 3 deletions CHANGELOG.md
@@ -9,16 +9,39 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

-- A clock can now be specified to `PipelineTask` (defaults to
-  `SystemClock`). This clock will be passed to each frame processor via the
-  `StartFrame`.
+- Pipecat has a pipeline-based architecture. The pipeline consists of frame
+  processors linked to each other. The elements traveling across the pipeline
+  are called frames.

+  For deterministic behavior, the frames traveling through the pipeline should
+  always be ordered, except for system frames, which are out-of-band. To
+  achieve that, each frame processor should only output frames from a single
+  task.

+  This version introduces synchronous and asynchronous frame processors.
+  Synchronous processors push output frames from the same task in which they
+  receive input frames, and therefore only push frames from one task.
+  Asynchronous frame processors can have internal tasks to perform work
+  asynchronously (e.g. receiving data from a websocket), but they also push
+  frames from a single task.

+  By default, frame processors are synchronous. To make a frame processor
+  asynchronous, pass `sync=False` to the base class constructor (see the
+  sketch below).
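
For illustration, a minimal sketch of a custom asynchronous passthrough processor (the class name and body are hypothetical; `process_frame` and `push_frame` follow the base class API used in the examples below):

```python
from pipecat.frames.frames import Frame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor


class MyAsyncProcessor(FrameProcessor):
    """Hypothetical processor that opts into asynchronous mode."""

    def __init__(self):
        # `sync=False` marks this processor as asynchronous; it may spawn
        # internal tasks, but it still pushes output frames from one task.
        super().__init__(sync=False)

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)
        # This sketch simply passes frames through unchanged.
        await self.push_frame(frame, direction)
```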

+- Added pipeline clocks. A pipeline clock is used by the output transport to
+  know when a frame needs to be presented. For that, all frames now have an
+  optional `pts` field (presentation timestamp). There's currently just one
+  clock implementation, `SystemClock`, and the `pts` field is currently only
+  used for `TextFrame`s (audio and image frames will be next).
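
A sketch of the idea (the `SystemClock` import path and `get_time()` accessor are assumptions, not taken from this diff):

```python
from pipecat.clocks.system_clock import SystemClock  # assumed module path
from pipecat.frames.frames import TextFrame

clock = SystemClock()
frame = TextFrame("Hello there!")
# `pts` is the new optional presentation timestamp field; `get_time()` is
# an assumed accessor for the clock's current time.
frame.pts = clock.get_time()
```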

+- A clock can now be specified to `PipelineTask` (defaults to
+  `SystemClock`). This clock will be passed to each frame processor via the
+  `StartFrame`.
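
Presumably along these lines (the `clock` keyword name and import path are assumptions based on the description above; `pipeline` is built as in the examples below):

```python
from pipecat.clocks.system_clock import SystemClock  # assumed module path
from pipecat.pipeline.task import PipelineTask

# The `clock` keyword is an assumed name; it defaults to `SystemClock`.
task = PipelineTask(pipeline, clock=SystemClock())
```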

- Added `CartesiaHttpTTSService`. This is a synchronous frame processor
(i.e. given an input text frame it will wait for the whole output before
returning).

- `DailyTransport` now supports setting the audio bitrate to improve audio
quality through the `DailyParams.audio_out_bitrate` parameter. The new
default is 96kbps.
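
For example (the parameter unit is an assumption; the entry only names `DailyParams.audio_out_bitrate` and a 96kbps default):

```python
from pipecat.transports.services.daily import DailyParams

# 96000 bps matches the stated 96kbps default, assuming bits per second.
params = DailyParams(audio_out_enabled=True, audio_out_bitrate=96000)
```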
@@ -40,6 +63,20 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Changed

+- `ParallelTask` has been renamed to `SyncParallelPipeline`. A
+  `SyncParallelPipeline` is a frame processor that contains a list of different
+  pipelines to be executed concurrently. The difference between a
+  `SyncParallelPipeline` and a `ParallelPipeline` is that, given an input frame,
+  the `SyncParallelPipeline` will wait for all the internal pipelines to
+  complete. This is achieved by ensuring all the processors in each of the
+  internal pipelines are synchronous.
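
The shape of the new usage, mirroring the updated `05-sync-speech-and-image.py` below (`llm`, `tts`, `imagegen`, and `transport` stand in for services constructed as in that example):

```python
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.sync_parallel_pipeline import SyncParallelPipeline

# Each inner list is a pipeline run concurrently; for every input frame,
# SyncParallelPipeline waits until all branches have finished processing it.
pipeline = Pipeline([
    llm,
    SyncParallelPipeline(
        [tts],       # branch 1: speech
        [imagegen],  # branch 2: images
    ),
    transport.output(),
])
```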

+- `StartFrame` is back to being a system frame, so we make sure it's processed
+  immediately by all processors. `EndFrame` stays a control frame since it
+  needs to be ordered, allowing the frames already in the pipeline to be
+  processed.

+- Updated `MoondreamService` revision to `2024-08-26`.

- `CartesiaTTSService` and `ElevenLabsTTSService` now add presentation
timestamps to their text output. This allows the output transport to push the
text frames downstream at almost the same time the words are spoken. We say
@@ -60,6 +97,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Fixed

+- Fixed a `BaseOutputTransport` issue that would stop audio and video rendering
+  tasks (after receiving an `EndFrame`) before the internal queue was emptied,
+  causing the pipeline to finish prematurely.

- `StartFrame` should be the first frame every processor receives to avoid
situations where things are not initialized (because initialization happens on
`StartFrame`) and other frames come in resulting in undesired behavior.
8 changes: 4 additions & 4 deletions examples/foundational/01-say-one-thing.py
@@ -9,11 +9,11 @@
import os
import sys

-from pipecat.frames.frames import TextFrame
+from pipecat.frames.frames import EndFrame, TextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineTask
from pipecat.pipeline.runner import PipelineRunner
-from pipecat.services.cartesia import CartesiaTTSService
+from pipecat.services.cartesia import CartesiaHttpTTSService
from pipecat.transports.services.daily import DailyParams, DailyTransport

from runner import configure
@@ -34,7 +34,7 @@ async def main():
transport = DailyTransport(
room_url, None, "Say One Thing", DailyParams(audio_out_enabled=True))

-tts = CartesiaTTSService(
+tts = CartesiaHttpTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
@@ -48,7 +48,7 @@ async def main():
@transport.event_handler("on_participant_joined")
async def on_new_participant_joined(transport, participant):
participant_name = participant["info"]["userName"] or ''
-await task.queue_frame(TextFrame(f"Hello there, {participant_name}!"))
+await task.queue_frames([TextFrame(f"Hello there, {participant_name}!"), EndFrame()])

await runner.run(task)

8 changes: 4 additions & 4 deletions examples/foundational/02-llm-say-one-thing.py
@@ -9,11 +9,11 @@
import os
import sys

-from pipecat.frames.frames import LLMMessagesFrame
+from pipecat.frames.frames import EndFrame, LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
-from pipecat.services.cartesia import CartesiaTTSService
+from pipecat.services.cartesia import CartesiaHttpTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport

@@ -38,7 +38,7 @@ async def main():
"Say One Thing From an LLM",
DailyParams(audio_out_enabled=True))

-tts = CartesiaTTSService(
+tts = CartesiaHttpTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
@@ -59,7 +59,7 @@

@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
-await task.queue_frame(LLMMessagesFrame(messages))
+await task.queue_frames([LLMMessagesFrame(messages), EndFrame()])

await runner.run(task)

34 changes: 15 additions & 19 deletions examples/foundational/05-sync-speech-and-image.py
@@ -14,21 +14,18 @@
from pipecat.frames.frames import (
AppFrame,
Frame,
-ImageRawFrame,
LLMFullResponseStartFrame,
LLMMessagesFrame,
TextFrame
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
+from pipecat.pipeline.sync_parallel_pipeline import SyncParallelPipeline
from pipecat.pipeline.task import PipelineTask
-from pipecat.pipeline.parallel_task import ParallelTask
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
-from pipecat.processors.aggregators.gated import GatedAggregator
-from pipecat.processors.aggregators.llm_response import LLMFullResponseAggregator
from pipecat.processors.aggregators.sentence import SentenceAggregator
+from pipecat.services.cartesia import CartesiaHttpTTSService
from pipecat.services.openai import OpenAILLMService
-from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.fal import FalImageGenService
from pipecat.transports.services.daily import DailyParams, DailyTransport

@@ -88,9 +85,9 @@ async def main():
)
)

-tts = ElevenLabsTTSService(
-api_key=os.getenv("ELEVENLABS_API_KEY"),
-voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
+tts = CartesiaHttpTTSService(
+api_key=os.getenv("CARTESIA_API_KEY"),
+voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)

llm = OpenAILLMService(
@@ -105,24 +102,23 @@ async def main():
key=os.getenv("FAL_KEY"),
)

-gated_aggregator = GatedAggregator(
-gate_open_fn=lambda frame: isinstance(frame, ImageRawFrame),
-gate_close_fn=lambda frame: isinstance(frame, LLMFullResponseStartFrame),
-start_open=False
-)

sentence_aggregator = SentenceAggregator()
month_prepender = MonthPrepender()
-llm_full_response_aggregator = LLMFullResponseAggregator()

+# With `SyncParallelPipeline` we synchronize audio and images by pushing
+# them basically in order (e.g. I1 A1 A1 A1 I2 A2 A2 A2 A2 I3 A3). To do
+# that, each pipeline runs concurrently and `SyncParallelPipeline` will
+# wait for the input frame to be processed.
+#
+# Note that `SyncParallelPipeline` requires all processors in it to be
+# synchronous (which is the default for most processors).
pipeline = Pipeline([
llm, # LLM
sentence_aggregator, # Aggregates LLM output into full sentences
-ParallelTask( # Run pipelines in parallel aggregating the result
-[month_prepender, tts], # Create "Month: sentence" and output audio
-[llm_full_response_aggregator, imagegen] # Aggregate full LLM response
+SyncParallelPipeline( # Run pipelines in parallel aggregating the result
+[month_prepender, tts], # Create "Month: sentence" and output audio
+[imagegen] # Generate image
),
-gated_aggregator, # Queues everything until an image is available
transport.output() # Transport output
])

38 changes: 25 additions & 13 deletions examples/foundational/05a-local-sync-speech-and-image.py
Expand Up @@ -12,17 +12,17 @@
import tkinter as tk

from pipecat.frames.frames import AudioRawFrame, Frame, URLImageRawFrame, LLMMessagesFrame, TextFrame
-from pipecat.pipeline.parallel_pipeline import ParallelPipeline
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
+from pipecat.pipeline.sync_parallel_pipeline import SyncParallelPipeline
from pipecat.pipeline.task import PipelineTask
-from pipecat.processors.aggregators.llm_response import LLMFullResponseAggregator
+from pipecat.processors.aggregators.sentence import SentenceAggregator
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
+from pipecat.services.cartesia import CartesiaHttpTTSService
from pipecat.services.openai import OpenAILLMService
-from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.fal import FalImageGenService
from pipecat.transports.base_transport import TransportParams
-from pipecat.transports.local.tk import TkLocalTransport
+from pipecat.transports.local.tk import TkLocalTransport, TkOutputTransport

from loguru import logger

@@ -60,6 +60,7 @@ class AudioGrabber(FrameProcessor):
def __init__(self):
super().__init__()
self.audio = bytearray()
+self.frame = None

async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
@@ -84,9 +85,10 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")

-tts = ElevenLabsTTSService(
-api_key=os.getenv("ELEVENLABS_API_KEY"),
-voice_id=os.getenv("ELEVENLABS_VOICE_ID"))
+tts = CartesiaHttpTTSService(
+api_key=os.getenv("CARTESIA_API_KEY"),
+voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
+)

imagegen = FalImageGenService(
params=FalImageGenService.InputParams(
@@ -95,20 +97,30 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
aiohttp_session=session,
key=os.getenv("FAL_KEY"))

-aggregator = LLMFullResponseAggregator()
+sentence_aggregator = SentenceAggregator()

description = ImageDescription()

audio_grabber = AudioGrabber()

image_grabber = ImageGrabber()

+# With `SyncParallelPipeline` we synchronize audio and images by
+# pushing them basically in order (e.g. I1 A1 A1 A1 I2 A2 A2 A2 A2
+# I3 A3). To do that, each pipeline runs concurrently and
+# `SyncParallelPipeline` will wait for the input frame to be
+# processed.
+#
+# Note that `SyncParallelPipeline` requires all processors in it to
+# be synchronous (which is the default for most processors).
pipeline = Pipeline([
-llm,
-aggregator,
-description,
-ParallelPipeline([tts, audio_grabber],
-[imagegen, image_grabber])
+llm, # LLM
+sentence_aggregator, # Aggregates LLM output into full sentences
+description, # Store sentence
+SyncParallelPipeline(
+[tts, audio_grabber], # Generate and store audio for the given sentence
+[imagegen, image_grabber] # Generate and store image for the given sentence
+)
])

task = PipelineTask(pipeline)
5 changes: 0 additions & 5 deletions examples/foundational/06-listen-and-respond.py
@@ -90,11 +90,6 @@ async def main():
])

task = PipelineTask(pipeline)
-task = PipelineTask(pipeline, PipelineParams(
-allow_interruptions=True,
-enable_metrics=True,
-report_only_initial_ttfb=False,
-))

@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
8 changes: 4 additions & 4 deletions examples/foundational/06a-image-sync.py
@@ -20,8 +20,8 @@
LLMUserResponseAggregator,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
+from pipecat.services.cartesia import CartesiaHttpTTSService
from pipecat.services.openai import OpenAILLMService
-from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.transports.services.daily import DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer

@@ -78,9 +78,9 @@ async def main():
)
)

-tts = ElevenLabsTTSService(
-api_key=os.getenv("ELEVENLABS_API_KEY"),
-voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
+tts = CartesiaHttpTTSService(
+api_key=os.getenv("CARTESIA_API_KEY"),
+voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)

llm = OpenAILLMService(
8 changes: 4 additions & 4 deletions examples/foundational/11-sound-effects.py
@@ -25,7 +25,7 @@
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.logger import FrameLogger
-from pipecat.services.elevenlabs import ElevenLabsTTSService
+from pipecat.services.cartesia import CartesiaHttpTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
@@ -103,9 +103,9 @@ async def main():
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")

-tts = ElevenLabsTTSService(
-api_key=os.getenv("ELEVENLABS_API_KEY"),
-voice_id="ErXwobaYiN019PkySvjV",
+tts = CartesiaHttpTTSService(
+api_key=os.getenv("CARTESIA_API_KEY"),
+voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)

messages = [
2 changes: 1 addition & 1 deletion examples/foundational/17-detect-user-idle.py
@@ -70,7 +70,7 @@ async def main():
async def user_idle_callback(user_idle: UserIdleProcessor):
messages.append(
{"role": "system", "content": "Ask the user if they are still there and try to prompt for some input, but be short."})
-await user_idle.queue_frame(LLMMessagesFrame(messages))
+await user_idle.push_frame(LLMMessagesFrame(messages))

user_idle = UserIdleProcessor(callback=user_idle_callback, timeout=5.0)

2 changes: 1 addition & 1 deletion pyproject.toml
@@ -36,7 +36,7 @@ Website = "https://pipecat.ai"
[project.optional-dependencies]
anthropic = [ "anthropic~=0.34.0" ]
azure = [ "azure-cognitiveservices-speech~=1.40.0" ]
cartesia = [ "websockets~=12.0" ]
cartesia = [ "cartesia~=1.0.13", "websockets~=12.0" ]
daily = [ "daily-python~=0.10.1" ]
deepgram = [ "deepgram-sdk~=3.5.0" ]
elevenlabs = [ "websockets~=12.0" ]