Skip to content

Commit

Permalink
pipeline: renamed ParallelTask to SyncParallelPipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
aconchillo committed Sep 19, 2024
1 parent bccc7bf commit 3e9ce11
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 35 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Changed

- `ParallelTask` has been renamed to `SyncParallelPipeline`. A
`SyncParallelPipeline` is a frame processor that contains a list of different
pipelines to be executed concurrently. The difference between a
`SyncParallelPipeline` and a `ParallelPipeline` is that, given an input frame,
the `SyncParallelPipeline` will wait for all the internal pipelines to
complete. This is achieved by ensuring all the processors in each of the
internal pipelines are synchronous.

- `StartFrame` is back to being a system frame so we make sure it's processed
immediately by all processors. `EndFrame` stays a control frame since it needs
to be ordered, allowing the frames in the pipeline to be processed.
Expand Down
34 changes: 15 additions & 19 deletions examples/foundational/05-sync-speech-and-image.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,18 @@
from pipecat.frames.frames import (
AppFrame,
Frame,
ImageRawFrame,
LLMFullResponseStartFrame,
LLMMessagesFrame,
TextFrame
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.sync_parallel_pipeline import SyncParallelPipeline
from pipecat.pipeline.task import PipelineTask
from pipecat.pipeline.parallel_task import ParallelTask
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.aggregators.gated import GatedAggregator
from pipecat.processors.aggregators.llm_response import LLMFullResponseAggregator
from pipecat.processors.aggregators.sentence import SentenceAggregator
from pipecat.services.cartesia import CartesiaHttpTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.fal import FalImageGenService
from pipecat.transports.services.daily import DailyParams, DailyTransport

Expand Down Expand Up @@ -88,9 +85,9 @@ async def main():
)
)

tts = ElevenLabsTTSService(
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
tts = CartesiaHttpTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)

llm = OpenAILLMService(
Expand All @@ -105,24 +102,23 @@ async def main():
key=os.getenv("FAL_KEY"),
)

gated_aggregator = GatedAggregator(
gate_open_fn=lambda frame: isinstance(frame, ImageRawFrame),
gate_close_fn=lambda frame: isinstance(frame, LLMFullResponseStartFrame),
start_open=False
)

sentence_aggregator = SentenceAggregator()
month_prepender = MonthPrepender()
llm_full_response_aggregator = LLMFullResponseAggregator()

# With `SyncParallelPipeline` we synchronize audio and images by pushing
# them basically in order (e.g. I1 A1 A1 A1 I2 A2 A2 A2 A2 I3 A3). To do
# that, each pipeline runs concurrently and `SyncParallelPipeline` will
# wait for the input frame to be processed.
#
# Note that `SyncParallelPipeline` requires all processors in it to be
# synchronous (which is the default for most processors).
pipeline = Pipeline([
llm, # LLM
sentence_aggregator, # Aggregates LLM output into full sentences
ParallelTask( # Run pipelines in parallel aggregating the result
[month_prepender, tts], # Create "Month: sentence" and output audio
[llm_full_response_aggregator, imagegen] # Aggregate full LLM response
SyncParallelPipeline( # Run pipelines in parallel aggregating the result
[month_prepender, tts], # Create "Month: sentence" and output audio
[imagegen] # Generate image
),
gated_aggregator, # Queues everything until an image is available
transport.output() # Transport output
])

Expand Down
38 changes: 25 additions & 13 deletions examples/foundational/05a-local-sync-speech-and-image.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,17 @@
import tkinter as tk

from pipecat.frames.frames import AudioRawFrame, Frame, URLImageRawFrame, LLMMessagesFrame, TextFrame
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.sync_parallel_pipeline import SyncParallelPipeline
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.llm_response import LLMFullResponseAggregator
from pipecat.processors.aggregators.sentence import SentenceAggregator
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.cartesia import CartesiaHttpTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.fal import FalImageGenService
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.local.tk import TkLocalTransport
from pipecat.transports.local.tk import TkLocalTransport, TkOutputTransport

from loguru import logger

Expand Down Expand Up @@ -60,6 +60,7 @@ class AudioGrabber(FrameProcessor):
def __init__(self):
super().__init__()
self.audio = bytearray()
self.frame = None

async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
Expand All @@ -84,9 +85,10 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")

tts = ElevenLabsTTSService(
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id=os.getenv("ELEVENLABS_VOICE_ID"))
tts = CartesiaHttpTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)

imagegen = FalImageGenService(
params=FalImageGenService.InputParams(
Expand All @@ -95,20 +97,30 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
aiohttp_session=session,
key=os.getenv("FAL_KEY"))

aggregator = LLMFullResponseAggregator()
sentence_aggregator = SentenceAggregator()

description = ImageDescription()

audio_grabber = AudioGrabber()

image_grabber = ImageGrabber()

# With `SyncParallelPipeline` we synchronize audio and images by
# pushing them basically in order (e.g. I1 A1 A1 A1 I2 A2 A2 A2 A2
# I3 A3). To do that, each pipeline runs concurrently and
# `SyncParallelPipeline` will wait for the input frame to be
# processed.
#
# Note that `SyncParallelPipeline` requires all processors in it to
# be synchronous (which is the default for most processors).
pipeline = Pipeline([
llm,
aggregator,
description,
ParallelPipeline([tts, audio_grabber],
[imagegen, image_grabber])
llm, # LLM
sentence_aggregator, # Aggregates LLM output into full sentences
description, # Store sentence
SyncParallelPipeline(
[tts, audio_grabber], # Generate and store audio for the given sentence
            [imagegen, image_grabber]  # Generate and store image for the given sentence
)
])

task = PipelineTask(pipeline)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,12 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
await self._down_queue.put(frame)


class ParallelTask(BasePipeline):
class SyncParallelPipeline(BasePipeline):
def __init__(self, *args):
super().__init__()

if len(args) == 0:
raise Exception(f"ParallelTask needs at least one argument")
raise Exception(f"SyncParallelPipeline needs at least one argument")

self._sinks = []
self._sources = []
Expand All @@ -66,7 +66,7 @@ def __init__(self, *args):
logger.debug(f"Creating {self} pipelines")
for processors in args:
if not isinstance(processors, list):
raise TypeError(f"ParallelTask argument {processors} is not a list")
raise TypeError(f"SyncParallelPipeline argument {processors} is not a list")

# We add a source at the beginning of the pipeline and a sink at the end.
source = Source(self._up_queue)
Expand Down

0 comments on commit 3e9ce11

Please sign in to comment.