Skip to content

Commit

Permalink
Merge pull request #524 from pipecat-ai/aleix/everything-is-async
Browse files Browse the repository at this point in the history
all frame processors are asynchrnous
  • Loading branch information
aconchillo authored Sep 30, 2024
2 parents 9a63df1 + 4d1e370 commit 392f210
Show file tree
Hide file tree
Showing 18 changed files with 207 additions and 154 deletions.
22 changes: 8 additions & 14 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,10 @@ async def on_connected(processor):
frames. To achieve that, each frame processor should only output frames from a
single task.

In this version we introduce synchronous and asynchronous frame
processors. The synchronous processors push output frames from the same task
that they receive input frames, and therefore only pushing frames from one
task. Asynchronous frame processors can have internal tasks to perform things
asynchronously (e.g. receiving data from a websocket) but they also have a
single task where they push frames from.

By default, frame processors are synchronous. To change a frame processor to
asynchronous you only need to pass `sync=False` to the base class constructor.
In this version all the frame processors have their own task to push
frames. That is, when `push_frame()` is called the given frame will be put
into an internal queue (with the exception of system frames) and a frame
processor task will push it out.

- Added pipeline clocks. A pipeline clock is used by the output transport to
know when a frame needs to be presented. For that, all frames now have an
Expand All @@ -68,9 +63,7 @@ async def on_connected(processor):
`SystemClock`). This clock will be passed to each frame processor via the
`StartFrame`.

- Added `CartesiaHttpTTSService`. This is a synchronous frame processor
(i.e. given an input text frame it will wait for the whole output before
returning).
- Added `CartesiaHttpTTSService`.

- `DailyTransport` now supports setting the audio bitrate to improve audio
quality through the `DailyParams.audio_out_bitrate` parameter. The new
Expand Down Expand Up @@ -110,8 +103,9 @@ async def on_connected(processor):
pipelines to be executed concurrently. The difference between a
`SyncParallelPipeline` and a `ParallelPipeline` is that, given an input frame,
the `SyncParallelPipeline` will wait for all the internal pipelines to
complete. This is achieved by ensuring all the processors in each of the
internal pipelines are synchronous.
complete. This is achieved by making sure the last processor in each of the
pipelines is synchronous (e.g. an HTTP-based service that waits for the
response).

- `StartFrame` is back a system frame so we make sure it's processed immediately
by all processors. `EndFrame` stays a control frame since it needs to be
Expand Down
10 changes: 6 additions & 4 deletions examples/foundational/05-sync-speech-and-image.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,13 @@ async def main():
),
)

llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")

tts = CartesiaHttpTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)

llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")

imagegen = FalImageGenService(
params=FalImageGenService.InputParams(image_size="square_hd"),
aiohttp_session=session,
Expand All @@ -107,8 +107,10 @@ async def main():
# that, each pipeline runs concurrently and `SyncParallelPipeline` will
# wait for the input frame to be processed.
#
# Note that `SyncParallelPipeline` requires all processors in it to be
# synchronous (which is the default for most processors).
# Note that `SyncParallelPipeline` requires the last processor in each
# of the pipelines to be synchronous. In this case, we use
# `CartesiaHttpTTSService` and `FalImageGenService` which make HTTP
# requests and wait for the response.
pipeline = Pipeline(
[
llm, # LLM
Expand Down
8 changes: 6 additions & 2 deletions examples/foundational/05a-local-sync-speech-and-image.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
self.frame = OutputAudioRawFrame(
bytes(self.audio), frame.sample_rate, frame.num_channels
)
await self.push_frame(frame, direction)

class ImageGrabber(FrameProcessor):
def __init__(self):
Expand All @@ -93,6 +94,7 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):

if isinstance(frame, URLImageRawFrame):
self.frame = frame
await self.push_frame(frame, direction)

llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")

Expand Down Expand Up @@ -121,8 +123,10 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
# `SyncParallelPipeline` will wait for the input frame to be
# processed.
#
# Note that `SyncParallelPipeline` requires all processors in it to
# be synchronous (which is the default for most processors).
# Note that `SyncParallelPipeline` requires the last processor in
# each of the pipelines to be synchronous. In this case, we use
# `CartesiaHttpTTSService` and `FalImageGenService` which make HTTP
# requests and wait for the response.
pipeline = Pipeline(
[
llm, # LLM
Expand Down
66 changes: 56 additions & 10 deletions src/pipecat/pipeline/sync_parallel_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,25 @@

import asyncio

from dataclasses import dataclass
from itertools import chain
from typing import List

from pipecat.frames.frames import ControlFrame, EndFrame, Frame, SystemFrame
from pipecat.pipeline.base_pipeline import BasePipeline
from pipecat.pipeline.pipeline import Pipeline
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.frames.frames import Frame

from loguru import logger


@dataclass
class SyncFrame(ControlFrame):
"""This frame is used to know when the internal pipelines have finished."""

pass


class Source(FrameProcessor):
def __init__(self, upstream_queue: asyncio.Queue):
super().__init__()
Expand Down Expand Up @@ -67,13 +75,16 @@ def __init__(self, *args):
raise TypeError(f"SyncParallelPipeline argument {processors} is not a list")

# We add a source at the beginning of the pipeline and a sink at the end.
source = Source(self._up_queue)
sink = Sink(self._down_queue)
up_queue = asyncio.Queue()
down_queue = asyncio.Queue()
source = Source(up_queue)
sink = Sink(down_queue)
processors: List[FrameProcessor] = [source] + processors + [sink]

# Keep track of sources and sinks.
self._sources.append(source)
self._sinks.append(sink)
# Keep track of sources and sinks. We also keep the output queue of
# the source and the sinks so we can use it later.
self._sources.append({"processor": source, "queue": down_queue})
self._sinks.append({"processor": sink, "queue": up_queue})

# Create pipeline
pipeline = Pipeline(processors)
Expand All @@ -94,25 +105,60 @@ def processors_with_metrics(self) -> List[FrameProcessor]:
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)

# The last processor of each pipeline needs to be synchronous otherwise
# this element won't work. Since, we know it should be synchronous we
# push a SyncFrame. Since frames are ordered we know this frame will be
# pushed after the synchronous processor has pushed its data allowing us
# to synchrnonize all the internal pipelines by waiting for the
# SyncFrame in all of them.
async def wait_for_sync(
obj, main_queue: asyncio.Queue, frame: Frame, direction: FrameDirection
):
processor = obj["processor"]
queue = obj["queue"]

await processor.process_frame(frame, direction)

if isinstance(frame, (SystemFrame, EndFrame)):
new_frame = await queue.get()
if isinstance(new_frame, (SystemFrame, EndFrame)):
await main_queue.put(new_frame)
else:
while not isinstance(new_frame, (SystemFrame, EndFrame)):
await main_queue.put(new_frame)
queue.task_done()
new_frame = await queue.get()
else:
await processor.process_frame(SyncFrame(), direction)
new_frame = await queue.get()
while not isinstance(new_frame, SyncFrame):
await main_queue.put(new_frame)
queue.task_done()
new_frame = await queue.get()

if direction == FrameDirection.UPSTREAM:
# If we get an upstream frame we process it in each sink.
await asyncio.gather(*[s.process_frame(frame, direction) for s in self._sinks])
await asyncio.gather(
*[wait_for_sync(s, self._up_queue, frame, direction) for s in self._sinks]
)
elif direction == FrameDirection.DOWNSTREAM:
# If we get a downstream frame we process it in each source.
await asyncio.gather(*[s.process_frame(frame, direction) for s in self._sources])
await asyncio.gather(
*[wait_for_sync(s, self._down_queue, frame, direction) for s in self._sources]
)

seen_ids = set()
while not self._up_queue.empty():
frame = await self._up_queue.get()
if frame and frame.id not in seen_ids:
if frame.id not in seen_ids:
await self.push_frame(frame, FrameDirection.UPSTREAM)
seen_ids.add(frame.id)
self._up_queue.task_done()

seen_ids = set()
while not self._down_queue.empty():
frame = await self._down_queue.get()
if frame and frame.id not in seen_ids:
if frame.id not in seen_ids:
await self.push_frame(frame, FrameDirection.DOWNSTREAM)
seen_ids.add(frame.id)
self._down_queue.task_done()
43 changes: 34 additions & 9 deletions src/pipecat/pipeline/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,19 @@ async def _handle_upstream_frame(self, frame: Frame):
await self._up_queue.put(StopTaskFrame())


class Sink(FrameProcessor):
def __init__(self, down_queue: asyncio.Queue):
super().__init__()
self._down_queue = down_queue

async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)

# We really just want to know when the EndFrame reached the sink.
if isinstance(frame, EndFrame):
await self._down_queue.put(frame)


class PipelineTask:
def __init__(
self,
Expand All @@ -84,12 +97,16 @@ def __init__(
self._params = params
self._finished = False

self._down_queue = asyncio.Queue()
self._up_queue = asyncio.Queue()
self._down_queue = asyncio.Queue()
self._push_queue = asyncio.Queue()

self._source = Source(self._up_queue)
self._source.link(pipeline)

self._sink = Sink(self._down_queue)
pipeline.link(self._sink)

def has_finished(self):
return self._finished

Expand All @@ -103,19 +120,19 @@ async def cancel(self):
# out-of-band from the main streaming task which is what we want since
# we want to cancel right away.
await self._source.push_frame(CancelFrame())
self._process_down_task.cancel()
self._process_push_task.cancel()
self._process_up_task.cancel()
await self._process_down_task
await self._process_push_task
await self._process_up_task

async def run(self):
self._process_up_task = asyncio.create_task(self._process_up_queue())
self._process_down_task = asyncio.create_task(self._process_down_queue())
await asyncio.gather(self._process_up_task, self._process_down_task)
self._process_push_task = asyncio.create_task(self._process_push_queue())
await asyncio.gather(self._process_up_task, self._process_push_task)
self._finished = True

async def queue_frame(self, frame: Frame):
await self._down_queue.put(frame)
await self._push_queue.put(frame)

async def queue_frames(self, frames: Iterable[Frame] | AsyncIterable[Frame]):
if isinstance(frames, AsyncIterable):
Expand All @@ -133,7 +150,7 @@ def _initial_metrics_frame(self) -> MetricsFrame:
data.append(ProcessingMetricsData(processor=p.name, value=0.0))
return MetricsFrame(data=data)

async def _process_down_queue(self):
async def _process_push_queue(self):
self._clock.start()

start_frame = StartFrame(
Expand All @@ -154,11 +171,13 @@ async def _process_down_queue(self):
should_cleanup = True
while running:
try:
frame = await self._down_queue.get()
frame = await self._push_queue.get()
await self._source.process_frame(frame, FrameDirection.DOWNSTREAM)
if isinstance(frame, EndFrame):
await self._wait_for_endframe()
running = not (isinstance(frame, StopTaskFrame) or isinstance(frame, EndFrame))
should_cleanup = not isinstance(frame, StopTaskFrame)
self._down_queue.task_done()
self._push_queue.task_done()
except asyncio.CancelledError:
break
# Cleanup only if we need to.
Expand All @@ -169,6 +188,12 @@ async def _process_down_queue(self):
self._process_up_task.cancel()
await self._process_up_task

async def _wait_for_endframe(self):
# NOTE(aleix): the Sink element just pushes EndFrames to the down queue,
# so just wait for it. In the future we might do something else here,
# but for now this is fine.
await self._down_queue.get()

async def _process_up_queue(self):
while True:
try:
Expand Down
22 changes: 8 additions & 14 deletions src/pipecat/processors/frame_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ def __init__(
*,
name: str | None = None,
metrics: FrameProcessorMetrics | None = None,
sync: bool = True,
loop: asyncio.AbstractEventLoop | None = None,
**kwargs,
):
Expand All @@ -47,7 +46,6 @@ def __init__(
self._prev: "FrameProcessor" | None = None
self._next: "FrameProcessor" | None = None
self._loop: asyncio.AbstractEventLoop = loop or asyncio.get_running_loop()
self._sync = sync

self._event_handlers: dict = {}

Expand All @@ -66,11 +64,8 @@ def __init__(

# Every processor in Pipecat should only output frames from a single
# task. This avoid problems like audio overlapping. System frames are
# the exception to this rule.
#
# This create this task.
if not self._sync:
self.__create_push_task()
# the exception to this rule. This create this task.
self.__create_push_task()

@property
def interruptions_allowed(self):
Expand Down Expand Up @@ -167,7 +162,7 @@ async def push_error(self, error: ErrorFrame):
await self.push_frame(error, FrameDirection.UPSTREAM)

async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
if self._sync or isinstance(frame, SystemFrame):
if isinstance(frame, SystemFrame):
await self.__internal_push_frame(frame, direction)
else:
await self.__push_queue.put((frame, direction))
Expand All @@ -194,13 +189,12 @@ def _register_event_handler(self, event_name: str):
#

async def _start_interruption(self):
if not self._sync:
# Cancel the task. This will stop pushing frames downstream.
self.__push_frame_task.cancel()
await self.__push_frame_task
# Cancel the task. This will stop pushing frames downstream.
self.__push_frame_task.cancel()
await self.__push_frame_task

# Create a new queue and task.
self.__create_push_task()
# Create a new queue and task.
self.__create_push_task()

async def _stop_interruption(self):
# Nothing to do right now.
Expand Down
2 changes: 1 addition & 1 deletion src/pipecat/processors/frameworks/rtvi.py
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ def __init__(
params: RTVIProcessorParams = RTVIProcessorParams(),
**kwargs,
):
super().__init__(sync=False, **kwargs)
super().__init__(**kwargs)
self._config = config
self._params = params

Expand Down
2 changes: 1 addition & 1 deletion src/pipecat/processors/gstreamer/pipeline_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class OutputParams(BaseModel):
clock_sync: bool = True

def __init__(self, *, pipeline: str, out_params: OutputParams = OutputParams(), **kwargs):
super().__init__(sync=False, **kwargs)
super().__init__(**kwargs)

self._out_params = out_params

Expand Down
Loading

0 comments on commit 392f210

Please sign in to comment.