diff --git a/CHANGELOG.md b/CHANGELOG.md index de4a16a5d..84ee390e4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -44,6 +44,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed +- `ParallelTask` has been renamed to `SyncParallelPipeline`. A + `SyncParallelPipeline` is a frame processor that contains a list of different + pipelines to be executed concurrently. The difference between a + `SyncParallelPipeline` and a `ParallelPipeline` is that, given an input frame, + the `SyncParallelPipeline` will wait for all the internal pipelines to + complete. This is achieved by ensuring all the processors in each of the + internal pipelines are synchronous. + - `StartFrame` is back a system frame so we make sure it's processed immediately by all processors. `EndFrame` stays a control frame since it needs to be ordered allowing the frames in the pipeline to be processed. diff --git a/examples/foundational/05-sync-speech-and-image.py b/examples/foundational/05-sync-speech-and-image.py index ca3ff9557..07e54ab8a 100644 --- a/examples/foundational/05-sync-speech-and-image.py +++ b/examples/foundational/05-sync-speech-and-image.py @@ -14,21 +14,18 @@ from pipecat.frames.frames import ( AppFrame, Frame, - ImageRawFrame, LLMFullResponseStartFrame, LLMMessagesFrame, TextFrame ) from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.sync_parallel_pipeline import SyncParallelPipeline from pipecat.pipeline.task import PipelineTask -from pipecat.pipeline.parallel_task import ParallelTask from pipecat.processors.frame_processor import FrameDirection, FrameProcessor -from pipecat.processors.aggregators.gated import GatedAggregator -from pipecat.processors.aggregators.llm_response import LLMFullResponseAggregator from pipecat.processors.aggregators.sentence import SentenceAggregator +from pipecat.services.cartesia import CartesiaHttpTTSService from pipecat.services.openai import 
OpenAILLMService -from pipecat.services.elevenlabs import ElevenLabsTTSService from pipecat.services.fal import FalImageGenService from pipecat.transports.services.daily import DailyParams, DailyTransport @@ -88,9 +85,9 @@ async def main(): ) ) - tts = ElevenLabsTTSService( - api_key=os.getenv("ELEVENLABS_API_KEY"), - voice_id=os.getenv("ELEVENLABS_VOICE_ID"), + tts = CartesiaHttpTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady ) llm = OpenAILLMService( @@ -105,24 +102,23 @@ async def main(): key=os.getenv("FAL_KEY"), ) - gated_aggregator = GatedAggregator( - gate_open_fn=lambda frame: isinstance(frame, ImageRawFrame), - gate_close_fn=lambda frame: isinstance(frame, LLMFullResponseStartFrame), - start_open=False - ) - sentence_aggregator = SentenceAggregator() month_prepender = MonthPrepender() - llm_full_response_aggregator = LLMFullResponseAggregator() + # With `SyncParallelPipeline` we synchronize audio and images by pushing + # them basically in order (e.g. I1 A1 A1 A1 I2 A2 A2 A2 A2 I3 A3). To do + # that, each pipeline runs concurrently and `SyncParallelPipeline` will + # wait for the input frame to be processed. + # + # Note that `SyncParallelPipeline` requires all processors in it to be + # synchronous (which is the default for most processors). 
pipeline = Pipeline([ llm, # LLM sentence_aggregator, # Aggregates LLM output into full sentences - ParallelTask( # Run pipelines in parallel aggregating the result - [month_prepender, tts], # Create "Month: sentence" and output audio - [llm_full_response_aggregator, imagegen] # Aggregate full LLM response + SyncParallelPipeline( # Run pipelines in parallel aggregating the result + [month_prepender, tts], # Create "Month: sentence" and output audio + [imagegen] # Generate image ), - gated_aggregator, # Queues everything until an image is available transport.output() # Transport output ]) diff --git a/examples/foundational/05a-local-sync-speech-and-image.py b/examples/foundational/05a-local-sync-speech-and-image.py index 63bcf1e9d..66a6e7f57 100644 --- a/examples/foundational/05a-local-sync-speech-and-image.py +++ b/examples/foundational/05a-local-sync-speech-and-image.py @@ -12,17 +12,17 @@ import tkinter as tk from pipecat.frames.frames import AudioRawFrame, Frame, URLImageRawFrame, LLMMessagesFrame, TextFrame -from pipecat.pipeline.parallel_pipeline import ParallelPipeline from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.sync_parallel_pipeline import SyncParallelPipeline from pipecat.pipeline.task import PipelineTask -from pipecat.processors.aggregators.llm_response import LLMFullResponseAggregator +from pipecat.processors.aggregators.sentence import SentenceAggregator from pipecat.processors.frame_processor import FrameDirection, FrameProcessor +from pipecat.services.cartesia import CartesiaHttpTTSService from pipecat.services.openai import OpenAILLMService -from pipecat.services.elevenlabs import ElevenLabsTTSService from pipecat.services.fal import FalImageGenService from pipecat.transports.base_transport import TransportParams -from pipecat.transports.local.tk import TkLocalTransport +from pipecat.transports.local.tk import TkLocalTransport, TkOutputTransport from loguru import logger @@ 
-60,6 +60,7 @@ class AudioGrabber(FrameProcessor): def __init__(self): super().__init__() self.audio = bytearray() + self.frame = None async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) @@ -84,9 +85,10 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o") - tts = ElevenLabsTTSService( - api_key=os.getenv("ELEVENLABS_API_KEY"), - voice_id=os.getenv("ELEVENLABS_VOICE_ID")) + tts = CartesiaHttpTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady + ) imagegen = FalImageGenService( params=FalImageGenService.InputParams( @@ -95,7 +97,7 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): aiohttp_session=session, key=os.getenv("FAL_KEY")) - aggregator = LLMFullResponseAggregator() + sentence_aggregator = SentenceAggregator() description = ImageDescription() @@ -103,12 +105,22 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): image_grabber = ImageGrabber() + # With `SyncParallelPipeline` we synchronize audio and images by + # pushing them basically in order (e.g. I1 A1 A1 A1 I2 A2 A2 A2 A2 + # I3 A3). To do that, each pipeline runs concurrently and + # `SyncParallelPipeline` will wait for the input frame to be + # processed. + # + # Note that `SyncParallelPipeline` requires all processors in it to + # be synchronous (which is the default for most processors). 
pipeline = Pipeline([ - llm, - aggregator, - description, - ParallelPipeline([tts, audio_grabber], - [imagegen, image_grabber]) + llm, # LLM + sentence_aggregator, # Aggregates LLM output into full sentences + description, # Store sentence + SyncParallelPipeline( + [tts, audio_grabber], # Generate and store audio for the given sentence + [imagegen, image_grabber] # Generate and store image for the given sentence + ) ]) task = PipelineTask(pipeline) diff --git a/src/pipecat/pipeline/parallel_task.py b/src/pipecat/pipeline/sync_parallel_pipeline.py similarity index 94% rename from src/pipecat/pipeline/parallel_task.py rename to src/pipecat/pipeline/sync_parallel_pipeline.py index 724183b3b..d922134f4 100644 --- a/src/pipecat/pipeline/parallel_task.py +++ b/src/pipecat/pipeline/sync_parallel_pipeline.py @@ -49,12 +49,12 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): await self._down_queue.put(frame) -class ParallelTask(BasePipeline): +class SyncParallelPipeline(BasePipeline): def __init__(self, *args): super().__init__() if len(args) == 0: - raise Exception(f"ParallelTask needs at least one argument") + raise Exception(f"SyncParallelPipeline needs at least one argument") self._sinks = [] self._sources = [] @@ -66,7 +66,7 @@ def __init__(self, *args): logger.debug(f"Creating {self} pipelines") for processors in args: if not isinstance(processors, list): - raise TypeError(f"ParallelTask argument {processors} is not a list") + raise TypeError(f"SyncParallelPipeline argument {processors} is not a list") # We add a source at the beginning of the pipeline and a sink at the end. source = Source(self._up_queue)