Skip to content

Commit

Permalink
introduce FrameProcessor single async push frame task
Browse files Browse the repository at this point in the history
Pipecat has a pipeline-based architecture. The pipeline consists of frame
processors linked to each other. The elements travelling across the pipeline are
called frames.

To have deterministic behavior, the frames travelling through the pipeline are
always ordered, except system frames, which are out-of-band frames.

To achieve ordering each frame processor only outputs frames from a single
task. This single task is internally created in each FrameProcessor and the
developer doesn't need to know much about it, except conceptually.

Having a single output task avoids problems such as audio overlapping.
  • Loading branch information
aconchillo committed Sep 1, 2024
1 parent 7c342f7 commit 55a4867
Show file tree
Hide file tree
Showing 15 changed files with 112 additions and 343 deletions.
1 change: 0 additions & 1 deletion src/pipecat/processors/aggregators/llm_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
# SPDX-License-Identifier: BSD 2-Clause License
#

import sys
from typing import List

from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContextFrame, OpenAILLMContext
Expand Down
64 changes: 0 additions & 64 deletions src/pipecat/processors/async_frame_processor.py

This file was deleted.

51 changes: 49 additions & 2 deletions src/pipecat/processors/frame_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@
from enum import Enum

from pipecat.frames.frames import (
EndFrame,
ErrorFrame,
Frame,
MetricsFrame,
StartFrame,
StartInterruptionFrame,
UserStoppedSpeakingFrame)
StopInterruptionFrame,
SystemFrame)
from pipecat.utils.utils import obj_count, obj_id

from loguru import logger
Expand Down Expand Up @@ -105,6 +107,13 @@ def __init__(
# Metrics
self._metrics = FrameProcessorMetrics(name=self.name)

# Every processor in Pipecat should only output frames from a single
# task. This avoids problems like audio overlapping. System frames are
# the exception to this rule.
#
# Create that task here.
self.__create_push_task()

@property
def interruptions_allowed(self):
return self._allow_interruptions
Expand Down Expand Up @@ -184,14 +193,41 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
self._enable_usage_metrics = frame.enable_usage_metrics
self._report_only_initial_ttfb = frame.report_only_initial_ttfb
elif isinstance(frame, StartInterruptionFrame):
await self._start_interruption()
await self.stop_all_metrics()
elif isinstance(frame, UserStoppedSpeakingFrame):
elif isinstance(frame, StopInterruptionFrame):
self._should_report_ttfb = True

async def push_error(self, error: ErrorFrame):
    """Send an error frame towards the start of the pipeline.

    Args:
        error: The error frame to push; it is handed to ``push_frame`` with
            the upstream direction.
    """
    upstream = FrameDirection.UPSTREAM
    await self.push_frame(error, upstream)

async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
    """Push a frame out of this processor.

    Non-system frames are placed on the internal push queue together with
    their direction, so they are emitted by the push task in the order they
    were queued. System frames bypass the queue and are pushed immediately.

    Args:
        frame: The frame to push.
        direction: The direction in which the frame travels; defaults to
            downstream.
    """
    if not isinstance(frame, SystemFrame):
        # Ordered path: the push task will pick this up from the queue.
        await self.__push_queue.put((frame, direction))
        return

    # Out-of-band path: system frames are pushed right away.
    await self.__internal_push_frame(frame, direction)

#
# Handle interruptions
#

async def _start_interruption(self):
    """Stop pushing queued frames and start over with an empty queue.

    The current push task is cancelled and awaited, then a brand new queue
    and push task are created, which discards any frame still waiting in the
    old queue.

    Raises:
        asyncio.CancelledError: If the cancellation observed while waiting
            did not come from the push task itself being cancelled.
    """
    # Cancel the task. This will stop pushing frames downstream.
    self.__push_frame_task.cancel()
    try:
        await self.__push_frame_task
    except asyncio.CancelledError:
        # The push task handler normally absorbs the cancellation itself.
        # However, a task that is cancelled before it has run its first step
        # never reaches the handler's try-block, so the CancelledError
        # surfaces here instead. Only swallow it in that case; otherwise the
        # cancellation is not ours to hide.
        if not self.__push_frame_task.cancelled():
            raise

    # Create a new queue and task.
    self.__create_push_task()

async def _stop_interruption(self):
    """Hook for the end of an interruption; there is nothing to do right now."""
    return None

def __create_push_task(self):
    """Create a fresh push queue and the task that drains it."""
    # The queue must exist before the handler task starts reading from it.
    self.__push_queue = asyncio.Queue()

    loop = self.get_event_loop()
    handler = self.__push_frame_task_handler()
    self.__push_frame_task = loop.create_task(handler)

async def __internal_push_frame(self, frame: Frame, direction: FrameDirection):
try:
if direction == FrameDirection.DOWNSTREAM and self._next:
logger.trace(f"Pushing {frame} from {self} to {self._next}")
Expand All @@ -202,5 +238,16 @@ async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirect
except Exception as e:
logger.exception(f"Uncaught exception in {self}: {e}")

async def __push_frame_task_handler(self):
    """Drain the push queue, pushing each frame in the order it was queued.

    The loop ends after an ``EndFrame`` has been pushed, or as soon as the
    task is cancelled.
    """
    try:
        while True:
            frame, direction = await self.__push_queue.get()
            await self.__internal_push_frame(frame, direction)
            self.__push_queue.task_done()
            # EndFrame is the last frame this task pushes.
            if isinstance(frame, EndFrame):
                break
    except asyncio.CancelledError:
        # Cancellation simply terminates the task; nothing to clean up.
        pass

def __str__(self):
    """Return the processor's name as its string representation."""
    return self.name
29 changes: 0 additions & 29 deletions src/pipecat/processors/frameworks/rtvi.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,9 +286,6 @@ def __init__(self,
self._registered_actions: Dict[str, RTVIAction] = {}
self._registered_services: Dict[str, RTVIService] = {}

self._push_frame_task = self.get_event_loop().create_task(self._push_frame_task_handler())
self._push_queue = asyncio.Queue()

self._message_task = self.get_event_loop().create_task(self._message_task_handler())
self._message_queue = asyncio.Queue()

Expand Down Expand Up @@ -335,12 +332,6 @@ async def handle_function_call_start(
message = RTVILLMFunctionCallStartMessage(data=fn)
await self._push_transport_message(message, exclude_none=False)

async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
if isinstance(frame, SystemFrame):
await super().push_frame(frame, direction)
else:
await self._internal_push_frame(frame, direction)

async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)

Expand Down Expand Up @@ -394,30 +385,10 @@ async def _stop(self, frame: EndFrame):
# processing EndFrames.
self._message_task.cancel()
await self._message_task
await self._push_frame_task

async def _cancel(self, frame: CancelFrame):
self._message_task.cancel()
await self._message_task
self._push_frame_task.cancel()
await self._push_frame_task

async def _internal_push_frame(
self,
frame: Frame | None,
direction: FrameDirection | None = FrameDirection.DOWNSTREAM):
await self._push_queue.put((frame, direction))

async def _push_frame_task_handler(self):
running = True
while running:
try:
(frame, direction) = await self._push_queue.get()
await super().push_frame(frame, direction)
self._push_queue.task_done()
running = not isinstance(frame, EndFrame)
except asyncio.CancelledError:
break

async def _push_transport_message(self, model: BaseModel, exclude_none: bool = True):
frame = TransportMessageFrame(
Expand Down
48 changes: 6 additions & 42 deletions src/pipecat/processors/gstreamer/pipeline_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,6 @@ def __init__(self, *, pipeline: str, out_params: OutputParams = OutputParams(),
bus.add_signal_watch()
bus.connect("message", self._on_gstreamer_message)

# Create push frame task. This is the task that will push frames in
# order. We also guarantee that all frames are pushed in the same task.
self._create_push_task()

async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)

Expand All @@ -80,60 +76,28 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
elif isinstance(frame, StartFrame):
# Push StartFrame before start(), because we want StartFrame to be
# processed by every processor before any other frame is processed.
await self._internal_push_frame(frame, direction)
await self.push_frame(frame, direction)
await self._start(frame)
elif isinstance(frame, EndFrame):
# Push EndFrame before stop(), because stop() waits on the task to
# finish and the task finishes when EndFrame is processed.
await self._internal_push_frame(frame, direction)
await self.push_frame(frame, direction)
await self._stop(frame)
# Other frames
else:
await self._internal_push_frame(frame, direction)
await self.push_frame(frame, direction)

async def _start(self, frame: StartFrame):
    """Switch the GStreamer player to the PLAYING state.

    Args:
        frame: The start frame being processed; it is not read here.
    """
    playing = Gst.State.PLAYING
    self._player.set_state(playing)

async def _stop(self, frame: EndFrame):
self._player.set_state(Gst.State.NULL)
# Wait for the push frame task to finish. It will finish when the
# EndFrame is actually processed.
await self._push_frame_task

async def _cancel(self, frame: CancelFrame):
self._player.set_state(Gst.State.NULL)
# Cancel all the tasks and wait for them to finish.
self._push_frame_task.cancel()
await self._push_frame_task

#
# Push frames task
#

def _create_push_task(self):
loop = self.get_event_loop()
self._push_queue = asyncio.Queue()
self._push_frame_task = loop.create_task(self._push_frame_task_handler())

async def _internal_push_frame(
self,
frame: Frame | None,
direction: FrameDirection | None = FrameDirection.DOWNSTREAM):
await self._push_queue.put((frame, direction))

async def _push_frame_task_handler(self):
running = True
while running:
try:
(frame, direction) = await self._push_queue.get()
await self.push_frame(frame, direction)
running = not isinstance(frame, EndFrame)
self._push_queue.task_done()
except asyncio.CancelledError:
break

#
# GStreaner
# GStreamer
#

def _on_gstreamer_message(self, bus: Gst.Bus, message: Gst.Message):
Expand Down Expand Up @@ -221,7 +185,7 @@ def _appsink_audio_new_sample(self, appsink: GstApp.AppSink):
frame = AudioRawFrame(audio=info.data,
sample_rate=self._out_params.audio_sample_rate,
num_channels=self._out_params.audio_channels)
asyncio.run_coroutine_threadsafe(self._internal_push_frame(frame), self.get_event_loop())
asyncio.run_coroutine_threadsafe(self.push_frame(frame), self.get_event_loop())
buffer.unmap(info)
return Gst.FlowReturn.OK

Expand All @@ -232,6 +196,6 @@ def _appsink_video_new_sample(self, appsink: GstApp.AppSink):
image=info.data,
size=(self._out_params.video_width, self._out_params.video_height),
format="RGB")
asyncio.run_coroutine_threadsafe(self._internal_push_frame(frame), self.get_event_loop())
asyncio.run_coroutine_threadsafe(self.push_frame(frame), self.get_event_loop())
buffer.unmap(info)
return Gst.FlowReturn.OK
13 changes: 3 additions & 10 deletions src/pipecat/processors/idle_frame_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,11 @@

from typing import Awaitable, Callable, List

from pipecat.frames.frames import Frame, SystemFrame
from pipecat.processors.async_frame_processor import AsyncFrameProcessor
from pipecat.processors.frame_processor import FrameDirection
from pipecat.frames.frames import Frame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor


class IdleFrameProcessor(AsyncFrameProcessor):
class IdleFrameProcessor(FrameProcessor):
"""This class waits to receive any frame or list of desired frames within a
given timeout. If the timeout is reached before receiving any of those
frames the provided callback will be called.
Expand Down Expand Up @@ -41,11 +40,6 @@ def __init__(
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)

if isinstance(frame, SystemFrame):
await self.push_frame(frame, direction)
else:
await self.queue_frame(frame, direction)

# If we are not waiting for any specific frame set the event, otherwise
# check if we have received one of the desired frames.
if not self._types:
Expand All @@ -55,7 +49,6 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
if isinstance(frame, t):
self._idle_event.set()

# If we are not waiting for any specific frame set the event, otherwise
async def cleanup(self):
self._idle_task.cancel()
await self._idle_task
Expand Down
10 changes: 2 additions & 8 deletions src/pipecat/processors/user_idle_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@
SystemFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame)
from pipecat.processors.async_frame_processor import AsyncFrameProcessor
from pipecat.processors.frame_processor import FrameDirection
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor


class UserIdleProcessor(AsyncFrameProcessor):
class UserIdleProcessor(FrameProcessor):
"""This class is useful to check if the user is interacting with the bot
within a given timeout. If the timeout is reached before any interaction
occurred the provided callback will be called.
Expand Down Expand Up @@ -46,11 +45,6 @@ def __init__(
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)

if isinstance(frame, SystemFrame):
await self.push_frame(frame, direction)
else:
await self.queue_frame(frame, direction)

# We shouldn't call the idle callback if the user or the bot are speaking.
if isinstance(frame, UserStartedSpeakingFrame):
self._interrupted = True
Expand Down
Loading

0 comments on commit 55a4867

Please sign in to comment.