introduce FrameProcessor single async push frame task
Pipecat has a pipeline-based architecture. The pipeline consists of frame
processors linked to each other. The elements travelling across the pipeline are
called frames.

To have deterministic behavior, the frames travelling through the pipeline are
always ordered, except for system frames, which are out-of-band.

To achieve ordering, each frame processor only outputs frames from a single
task. This task is created internally by each FrameProcessor, so the developer
doesn't need to know much about it beyond the concept.

Having a single output task avoids problems such as audio overlapping.
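
A minimal sketch of the idea (simplified and not the actual Pipecat
implementation; OrderedPusher and its helpers are made-up names): each
processor owns one queue and one consumer task, ordered frames go through the
queue, and system frames bypass it.

import asyncio


# Stand-ins for pipecat.frames.frames, for illustration only.
class Frame: ...
class SystemFrame(Frame): ...
class EndFrame(Frame): ...


class OrderedPusher:
    """Illustrative only: all non-system frames leave through one task."""

    def __init__(self):
        # Assumes construction happens inside a running event loop.
        self._queue: asyncio.Queue = asyncio.Queue()
        self._task = asyncio.create_task(self._run())

    async def push_frame(self, frame: Frame):
        if isinstance(frame, SystemFrame):
            # System frames are out-of-band: delivered immediately.
            await self._deliver(frame)
        else:
            # Everything else is queued; the single task delivers it in
            # order, which avoids problems such as overlapping audio.
            await self._queue.put(frame)

    async def _run(self):
        running = True
        while running:
            try:
                frame = await self._queue.get()
                await self._deliver(frame)
                running = not isinstance(frame, EndFrame)
                self._queue.task_done()
            except asyncio.CancelledError:
                break

    async def _deliver(self, frame: Frame):
        print(f"delivering {type(frame).__name__}")


async def main():
    pusher = OrderedPusher()
    await pusher.push_frame(Frame())
    await pusher.push_frame(SystemFrame())  # jumps the queue
    await pusher.push_frame(EndFrame())
    await pusher._task


asyncio.run(main())
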
aconchillo committed Sep 1, 2024
1 parent 7c342f7 commit 9bbb824
Showing 16 changed files with 115 additions and 350 deletions.
2 changes: 1 addition & 1 deletion examples/foundational/17-detect-user-idle.py
@@ -70,7 +70,7 @@ async def main():
async def user_idle_callback(user_idle: UserIdleProcessor):
messages.append(
{"role": "system", "content": "Ask the user if they are still there and try to prompt for some input, but be short."})
await user_idle.queue_frame(LLMMessagesFrame(messages))
await user_idle.push_frame(LLMMessagesFrame(messages))

user_idle = UserIdleProcessor(callback=user_idle_callback, timeout=5.0)

1 change: 0 additions & 1 deletion src/pipecat/processors/aggregators/llm_response.py
@@ -4,7 +4,6 @@
# SPDX-License-Identifier: BSD 2-Clause License
#

import sys
from typing import List

from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContextFrame, OpenAILLMContext
64 changes: 0 additions & 64 deletions src/pipecat/processors/async_frame_processor.py

This file was deleted.

51 changes: 49 additions & 2 deletions src/pipecat/processors/frame_processor.py
@@ -10,12 +10,14 @@
from enum import Enum

from pipecat.frames.frames import (
EndFrame,
ErrorFrame,
Frame,
MetricsFrame,
StartFrame,
StartInterruptionFrame,
UserStoppedSpeakingFrame)
StopInterruptionFrame,
SystemFrame)
from pipecat.utils.utils import obj_count, obj_id

from loguru import logger
@@ -105,6 +107,13 @@ def __init__(
# Metrics
self._metrics = FrameProcessorMetrics(name=self.name)

# Every processor in Pipecat should only output frames from a single
# task. This avoids problems like audio overlapping. System frames are
# the exception to this rule.
#
# This creates the task.
self.__create_push_task()

@property
def interruptions_allowed(self):
return self._allow_interruptions
@@ -184,14 +193,41 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
self._enable_usage_metrics = frame.enable_usage_metrics
self._report_only_initial_ttfb = frame.report_only_initial_ttfb
elif isinstance(frame, StartInterruptionFrame):
await self._start_interruption()
await self.stop_all_metrics()
elif isinstance(frame, UserStoppedSpeakingFrame):
elif isinstance(frame, StopInterruptionFrame):
self._should_report_ttfb = True

async def push_error(self, error: ErrorFrame):
await self.push_frame(error, FrameDirection.UPSTREAM)

async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
if isinstance(frame, SystemFrame):
await self.__internal_push_frame(frame, direction)
else:
await self.__push_queue.put((frame, direction))

#
# Handle interruptions
#

async def _start_interruption(self):
# Cancel the task. This will stop pushing frames downstream.
self.__push_frame_task.cancel()
await self.__push_frame_task

# Create a new queue and task.
self.__create_push_task()

async def _stop_interruption(self):
# Nothing to do right now.
pass

def __create_push_task(self):
self.__push_queue = asyncio.Queue()
self.__push_frame_task = self.get_event_loop().create_task(self.__push_frame_task_handler())

async def __internal_push_frame(self, frame: Frame, direction: FrameDirection):
try:
if direction == FrameDirection.DOWNSTREAM and self._next:
logger.trace(f"Pushing {frame} from {self} to {self._next}")
@@ -202,5 +238,16 @@ async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirect
except Exception as e:
logger.exception(f"Uncaught exception in {self}: {e}")

async def __push_frame_task_handler(self):
running = True
while running:
try:
(frame, direction) = await self.__push_queue.get()
await self.__internal_push_frame(frame, direction)
running = not isinstance(frame, EndFrame)
self.__push_queue.task_done()
except asyncio.CancelledError:
break

def __str__(self):
return self.name
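
With the queue and task now owned by FrameProcessor, subclasses no longer need
their own push-task plumbing or a separate queue_frame() call; calling
push_frame() is enough, which is what the remaining files in this commit switch
to. A rough sketch of a custom processor after this change (PassthroughProcessor
is a hypothetical name, mirroring the IdleFrameProcessor/UserIdleProcessor
updates below):

from pipecat.frames.frames import Frame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor


class PassthroughProcessor(FrameProcessor):
    # Hypothetical example: ordering and system-frame handling now live in
    # the FrameProcessor base class.
    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)
        # No SystemFrame special-casing or queue_frame() needed anymore:
        # push_frame() queues ordered frames and passes system frames through.
        await self.push_frame(frame, direction)
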
29 changes: 0 additions & 29 deletions src/pipecat/processors/frameworks/rtvi.py
@@ -286,9 +286,6 @@ def __init__(self,
self._registered_actions: Dict[str, RTVIAction] = {}
self._registered_services: Dict[str, RTVIService] = {}

self._push_frame_task = self.get_event_loop().create_task(self._push_frame_task_handler())
self._push_queue = asyncio.Queue()

self._message_task = self.get_event_loop().create_task(self._message_task_handler())
self._message_queue = asyncio.Queue()

@@ -335,12 +332,6 @@ async def handle_function_call_start(
message = RTVILLMFunctionCallStartMessage(data=fn)
await self._push_transport_message(message, exclude_none=False)

async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
if isinstance(frame, SystemFrame):
await super().push_frame(frame, direction)
else:
await self._internal_push_frame(frame, direction)

async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)

@@ -394,30 +385,10 @@ async def _stop(self, frame: EndFrame):
# processing EndFrames.
self._message_task.cancel()
await self._message_task
await self._push_frame_task

async def _cancel(self, frame: CancelFrame):
self._message_task.cancel()
await self._message_task
self._push_frame_task.cancel()
await self._push_frame_task

async def _internal_push_frame(
self,
frame: Frame | None,
direction: FrameDirection | None = FrameDirection.DOWNSTREAM):
await self._push_queue.put((frame, direction))

async def _push_frame_task_handler(self):
running = True
while running:
try:
(frame, direction) = await self._push_queue.get()
await super().push_frame(frame, direction)
self._push_queue.task_done()
running = not isinstance(frame, EndFrame)
except asyncio.CancelledError:
break

async def _push_transport_message(self, model: BaseModel, exclude_none: bool = True):
frame = TransportMessageFrame(
48 changes: 6 additions & 42 deletions src/pipecat/processors/gstreamer/pipeline_source.py
@@ -62,10 +62,6 @@ def __init__(self, *, pipeline: str, out_params: OutputParams = OutputParams(),
bus.add_signal_watch()
bus.connect("message", self._on_gstreamer_message)

# Create push frame task. This is the task that will push frames in
# order. We also guarantee that all frames are pushed in the same task.
self._create_push_task()

async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)

@@ -80,60 +76,28 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
elif isinstance(frame, StartFrame):
# Push StartFrame before start(), because we want StartFrame to be
# processed by every processor before any other frame is processed.
await self._internal_push_frame(frame, direction)
await self.push_frame(frame, direction)
await self._start(frame)
elif isinstance(frame, EndFrame):
# Push EndFrame before stop(), because stop() waits on the task to
# finish and the task finishes when EndFrame is processed.
await self._internal_push_frame(frame, direction)
await self.push_frame(frame, direction)
await self._stop(frame)
# Other frames
else:
await self._internal_push_frame(frame, direction)
await self.push_frame(frame, direction)

async def _start(self, frame: StartFrame):
self._player.set_state(Gst.State.PLAYING)

async def _stop(self, frame: EndFrame):
self._player.set_state(Gst.State.NULL)
# Wait for the push frame task to finish. It will finish when the
# EndFrame is actually processed.
await self._push_frame_task

async def _cancel(self, frame: CancelFrame):
self._player.set_state(Gst.State.NULL)
# Cancel all the tasks and wait for them to finish.
self._push_frame_task.cancel()
await self._push_frame_task

#
# Push frames task
#

def _create_push_task(self):
loop = self.get_event_loop()
self._push_queue = asyncio.Queue()
self._push_frame_task = loop.create_task(self._push_frame_task_handler())

async def _internal_push_frame(
self,
frame: Frame | None,
direction: FrameDirection | None = FrameDirection.DOWNSTREAM):
await self._push_queue.put((frame, direction))

async def _push_frame_task_handler(self):
running = True
while running:
try:
(frame, direction) = await self._push_queue.get()
await self.push_frame(frame, direction)
running = not isinstance(frame, EndFrame)
self._push_queue.task_done()
except asyncio.CancelledError:
break

#
# GStreaner
# GStreamer
#

def _on_gstreamer_message(self, bus: Gst.Bus, message: Gst.Message):
@@ -221,7 +185,7 @@ def _appsink_audio_new_sample(self, appsink: GstApp.AppSink):
frame = AudioRawFrame(audio=info.data,
sample_rate=self._out_params.audio_sample_rate,
num_channels=self._out_params.audio_channels)
asyncio.run_coroutine_threadsafe(self._internal_push_frame(frame), self.get_event_loop())
asyncio.run_coroutine_threadsafe(self.push_frame(frame), self.get_event_loop())
buffer.unmap(info)
return Gst.FlowReturn.OK

@@ -232,6 +196,6 @@ def _appsink_video_new_sample(self, appsink: GstApp.AppSink):
image=info.data,
size=(self._out_params.video_width, self._out_params.video_height),
format="RGB")
asyncio.run_coroutine_threadsafe(self._internal_push_frame(frame), self.get_event_loop())
asyncio.run_coroutine_threadsafe(self.push_frame(frame), self.get_event_loop())
buffer.unmap(info)
return Gst.FlowReturn.OK
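
The appsink callbacks above run on a GStreamer thread rather than on the
asyncio event loop, which is why frames are handed off with
asyncio.run_coroutine_threadsafe(). A minimal, standalone sketch of that
handoff pattern (names here are made up for illustration):

import asyncio
import threading


def producer_thread(loop: asyncio.AbstractEventLoop, push_frame):
    # Runs on a non-asyncio thread (like a GStreamer appsink callback).
    # run_coroutine_threadsafe schedules the coroutine on the processor's
    # event loop, so the single push task still sees frames in order.
    future = asyncio.run_coroutine_threadsafe(push_frame("audio chunk"), loop)
    future.result()  # optional: block this thread until the push completes


async def main():
    queue: asyncio.Queue = asyncio.Queue()

    async def push_frame(frame):
        await queue.put(frame)

    loop = asyncio.get_running_loop()
    worker = threading.Thread(target=producer_thread, args=(loop, push_frame))
    worker.start()
    print(await queue.get())  # prints: audio chunk
    worker.join()


asyncio.run(main())
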
17 changes: 4 additions & 13 deletions src/pipecat/processors/idle_frame_processor.py
@@ -8,19 +8,14 @@

from typing import Awaitable, Callable, List

from pipecat.frames.frames import Frame, SystemFrame
from pipecat.processors.async_frame_processor import AsyncFrameProcessor
from pipecat.processors.frame_processor import FrameDirection
from pipecat.frames.frames import Frame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor


class IdleFrameProcessor(AsyncFrameProcessor):
class IdleFrameProcessor(FrameProcessor):
"""This class waits to receive any frame or list of desired frames within a
given timeout. If the timeout is reached before receiving any of those
frames the provided callback will be called.
The callback can then be used to push frames downstream by using
`queue_frame()` (or `push_frame()` for system frames).
"""

def __init__(
@@ -41,10 +36,7 @@ def __init__(
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)

if isinstance(frame, SystemFrame):
await self.push_frame(frame, direction)
else:
await self.queue_frame(frame, direction)
await self.push_frame(frame, direction)

# If we are not waiting for any specific frame set the event, otherwise
# check if we have received one of the desired frames.
@@ -55,7 +47,6 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
if isinstance(frame, t):
self._idle_event.set()

# If we are not waiting for any specific frame set the event, otherwise
async def cleanup(self):
self._idle_task.cancel()
await self._idle_task
14 changes: 3 additions & 11 deletions src/pipecat/processors/user_idle_processor.py
@@ -11,21 +11,16 @@
from pipecat.frames.frames import (
BotSpeakingFrame,
Frame,
SystemFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame)
from pipecat.processors.async_frame_processor import AsyncFrameProcessor
from pipecat.processors.frame_processor import FrameDirection
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor


class UserIdleProcessor(AsyncFrameProcessor):
class UserIdleProcessor(FrameProcessor):
"""This class is useful to check if the user is interacting with the bot
within a given timeout. If the timeout is reached before any interaction
occurred the provided callback will be called.
The callback can then be used to push frames downstream by using
`queue_frame()` (or `push_frame()` for system frames).
"""

def __init__(
@@ -46,10 +41,7 @@ def __init__(
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)

if isinstance(frame, SystemFrame):
await self.push_frame(frame, direction)
else:
await self.queue_frame(frame, direction)
await self.push_frame(frame, direction)

# We shouldn't call the idle callback if the user or the bot are speaking.
if isinstance(frame, UserStartedSpeakingFrame):
