From 55a4867051fe8e17f4743d4b6f258caeb392be79 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?=
Date: Fri, 30 Aug 2024 16:24:04 -0700
Subject: [PATCH] introduce FrameProcessor single async push frame task

Pipecat has a pipeline-based architecture. The pipeline consists of frame
processors linked to each other. The elements travelling across the
pipeline are called frames.

To have deterministic behavior, the frames travelling through the pipeline
are always ordered, except for system frames, which are out-of-band frames.

To achieve ordering, each frame processor only outputs frames from a single
task. This single task is internally created in each FrameProcessor and the
developer doesn't need to know much about it, except conceptually.

Having a single output task avoids problems such as audio overlapping.
---
 .../processors/aggregators/llm_response.py    |  1 -
 .../processors/async_frame_processor.py       | 64 -------------
 src/pipecat/processors/frame_processor.py     | 51 ++++++++++-
 src/pipecat/processors/frameworks/rtvi.py     | 29 ------
 .../processors/gstreamer/pipeline_source.py   | 48 ++--------
 .../processors/idle_frame_processor.py        | 13 +--
 src/pipecat/processors/user_idle_processor.py | 10 +--
 src/pipecat/services/ai_services.py           | 27 +-----
 src/pipecat/services/azure.py                 | 22 ++---
 src/pipecat/services/deepgram.py              |  2 +-
 src/pipecat/services/gladia.py                | 33 +++----
 src/pipecat/transports/base_input.py          | 89 ++++---------------
 src/pipecat/transports/base_output.py         | 56 ++----------
 .../transports/network/websocket_server.py    |  4 +-
 src/pipecat/transports/services/daily.py      |  6 +-
 15 files changed, 112 insertions(+), 343 deletions(-)
 delete mode 100644 src/pipecat/processors/async_frame_processor.py

diff --git a/src/pipecat/processors/aggregators/llm_response.py b/src/pipecat/processors/aggregators/llm_response.py
index 7c38e62ad..f6abf2e90 100644
--- a/src/pipecat/processors/aggregators/llm_response.py
+++ b/src/pipecat/processors/aggregators/llm_response.py
@@ -4,7 +4,6 @@
 # SPDX-License-Identifier: BSD 2-Clause License
 #
 
-import sys
 from typing import List
 
 from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContextFrame, OpenAILLMContext
diff --git a/src/pipecat/processors/async_frame_processor.py b/src/pipecat/processors/async_frame_processor.py
deleted file mode 100644
index 28a27d255..000000000
--- a/src/pipecat/processors/async_frame_processor.py
+++ /dev/null
@@ -1,64 +0,0 @@
-#
-# Copyright (c) 2024, Daily
-#
-# SPDX-License-Identifier: BSD 2-Clause License
-#
-
-import asyncio
-
-from pipecat.frames.frames import EndFrame, Frame, StartInterruptionFrame
-from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
-
-
-class AsyncFrameProcessor(FrameProcessor):
-
-    def __init__(
-            self,
-            *,
-            name: str | None = None,
-            loop: asyncio.AbstractEventLoop | None = None,
-            **kwargs):
-        super().__init__(name=name, loop=loop, **kwargs)
-
-        self._create_push_task()
-
-    async def process_frame(self, frame: Frame, direction: FrameDirection):
-        await super().process_frame(frame, direction)
-
-        if isinstance(frame, StartInterruptionFrame):
-            await self._handle_interruptions(frame)
-
-    async def queue_frame(
-            self,
-            frame: Frame,
-            direction: FrameDirection = FrameDirection.DOWNSTREAM):
-        await self._push_queue.put((frame, direction))
-
-    async def cleanup(self):
-        self._push_frame_task.cancel()
-        await self._push_frame_task
-
-    async def _handle_interruptions(self, frame: Frame):
-        # Cancel the task. This will stop pushing frames downstream.
-        self._push_frame_task.cancel()
-        await self._push_frame_task
-        # Push an out-of-band frame (i.e. not using the ordered push
-        # frame task).
-        await self.push_frame(frame)
-        # Create a new queue and task.
-        self._create_push_task()
-
-    def _create_push_task(self):
-        self._push_queue = asyncio.Queue()
-        self._push_frame_task = self.get_event_loop().create_task(self._push_frame_task_handler())
-
-    async def _push_frame_task_handler(self):
-        running = True
-        while running:
-            try:
-                (frame, direction) = await self._push_queue.get()
-                await self.push_frame(frame, direction)
-                running = not isinstance(frame, EndFrame)
-                self._push_queue.task_done()
-            except asyncio.CancelledError:
-                break
diff --git a/src/pipecat/processors/frame_processor.py b/src/pipecat/processors/frame_processor.py
index 156e1c0ae..b0952d945 100644
--- a/src/pipecat/processors/frame_processor.py
+++ b/src/pipecat/processors/frame_processor.py
@@ -10,12 +10,14 @@
 from enum import Enum
 
 from pipecat.frames.frames import (
+    EndFrame,
     ErrorFrame,
     Frame,
     MetricsFrame,
     StartFrame,
     StartInterruptionFrame,
-    UserStoppedSpeakingFrame)
+    StopInterruptionFrame,
+    SystemFrame)
 from pipecat.utils.utils import obj_count, obj_id
 
 from loguru import logger
@@ -105,6 +107,13 @@ def __init__(
         # Metrics
         self._metrics = FrameProcessorMetrics(name=self.name)
 
+        # Every processor in Pipecat should only output frames from a single
+        # task. This avoids problems like audio overlapping. System frames are
+        # the exception to this rule.
+        #
+        # Here we create that task.
+        self.__create_push_task()
+
     @property
     def interruptions_allowed(self):
         return self._allow_interruptions
@@ -184,14 +193,41 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
             self._enable_usage_metrics = frame.enable_usage_metrics
             self._report_only_initial_ttfb = frame.report_only_initial_ttfb
         elif isinstance(frame, StartInterruptionFrame):
+            await self._start_interruption()
             await self.stop_all_metrics()
-        elif isinstance(frame, UserStoppedSpeakingFrame):
+        elif isinstance(frame, StopInterruptionFrame):
             self._should_report_ttfb = True
 
     async def push_error(self, error: ErrorFrame):
         await self.push_frame(error, FrameDirection.UPSTREAM)
 
     async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
+        if isinstance(frame, SystemFrame):
+            await self.__internal_push_frame(frame, direction)
+        else:
+            await self.__push_queue.put((frame, direction))
+
+    #
+    # Handle interruptions
+    #
+
+    async def _start_interruption(self):
+        # Cancel the task. This will stop pushing frames downstream.
+        self.__push_frame_task.cancel()
+        await self.__push_frame_task
+
+        # Create a new queue and task.
+        self.__create_push_task()
+
+    async def _stop_interruption(self):
+        # Nothing to do right now.
+        pass
+
+    def __create_push_task(self):
+        self.__push_queue = asyncio.Queue()
+        self.__push_frame_task = self.get_event_loop().create_task(self.__push_frame_task_handler())
+
+    async def __internal_push_frame(self, frame: Frame, direction: FrameDirection):
         try:
             if direction == FrameDirection.DOWNSTREAM and self._next:
                 logger.trace(f"Pushing {frame} from {self} to {self._next}")
@@ -202,5 +238,16 @@ async def push_frame(self, frame: Frame, direction: FrameDirect
         except Exception as e:
             logger.exception(f"Uncaught exception in {self}: {e}")
 
+    async def __push_frame_task_handler(self):
+        running = True
+        while running:
+            try:
+                (frame, direction) = await self.__push_queue.get()
+                await self.__internal_push_frame(frame, direction)
+                running = not isinstance(frame, EndFrame)
+                self.__push_queue.task_done()
+            except asyncio.CancelledError:
+                break
+
     def __str__(self):
         return self.name
diff --git a/src/pipecat/processors/frameworks/rtvi.py b/src/pipecat/processors/frameworks/rtvi.py
index dd28e7252..38547d68b 100644
--- a/src/pipecat/processors/frameworks/rtvi.py
+++ b/src/pipecat/processors/frameworks/rtvi.py
@@ -286,9 +286,6 @@ def __init__(self,
         self._registered_actions: Dict[str, RTVIAction] = {}
         self._registered_services: Dict[str, RTVIService] = {}
 
-        self._push_frame_task = self.get_event_loop().create_task(self._push_frame_task_handler())
-        self._push_queue = asyncio.Queue()
-
         self._message_task = self.get_event_loop().create_task(self._message_task_handler())
         self._message_queue = asyncio.Queue()
 
@@ -335,12 +332,6 @@ async def handle_function_call_start(
         message = RTVILLMFunctionCallStartMessage(data=fn)
         await self._push_transport_message(message, exclude_none=False)
 
-    async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
-        if isinstance(frame, SystemFrame):
-            await super().push_frame(frame, direction)
-        else:
-            await self._internal_push_frame(frame, direction)
-
     async def process_frame(self, frame: Frame, direction: FrameDirection):
         await super().process_frame(frame, direction)
 
@@ -394,30 +385,10 @@ async def _stop(self, frame: EndFrame):
         # processing EndFrames.
         self._message_task.cancel()
         await self._message_task
-        await self._push_frame_task
 
     async def _cancel(self, frame: CancelFrame):
         self._message_task.cancel()
         await self._message_task
-        self._push_frame_task.cancel()
-        await self._push_frame_task
-
-    async def _internal_push_frame(
-            self,
-            frame: Frame | None,
-            direction: FrameDirection | None = FrameDirection.DOWNSTREAM):
-        await self._push_queue.put((frame, direction))
-
-    async def _push_frame_task_handler(self):
-        running = True
-        while running:
-            try:
-                (frame, direction) = await self._push_queue.get()
-                await super().push_frame(frame, direction)
-                self._push_queue.task_done()
-                running = not isinstance(frame, EndFrame)
-            except asyncio.CancelledError:
-                break
 
     async def _push_transport_message(self, model: BaseModel, exclude_none: bool = True):
         frame = TransportMessageFrame(
diff --git a/src/pipecat/processors/gstreamer/pipeline_source.py b/src/pipecat/processors/gstreamer/pipeline_source.py
index eef0d56cc..6929559b3 100644
--- a/src/pipecat/processors/gstreamer/pipeline_source.py
+++ b/src/pipecat/processors/gstreamer/pipeline_source.py
@@ -62,10 +62,6 @@ def __init__(self, *, pipeline: str, out_params: OutputParams = OutputParams(),
         bus.add_signal_watch()
         bus.connect("message", self._on_gstreamer_message)
 
-        # Create push frame task. This is the task that will push frames in
-        # order. We also guarantee that all frames are pushed in the same task.
-        self._create_push_task()
-
     async def process_frame(self, frame: Frame, direction: FrameDirection):
         await super().process_frame(frame, direction)
 
@@ -80,60 +76,28 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
         elif isinstance(frame, StartFrame):
             # Push StartFrame before start(), because we want StartFrame to be
             # processed by every processor before any other frame is processed.
-            await self._internal_push_frame(frame, direction)
+            await self.push_frame(frame, direction)
             await self._start(frame)
         elif isinstance(frame, EndFrame):
             # Push EndFrame before stop(), because stop() waits on the task to
             # finish and the task finishes when EndFrame is processed.
-            await self._internal_push_frame(frame, direction)
+            await self.push_frame(frame, direction)
             await self._stop(frame)
         # Other frames
         else:
-            await self._internal_push_frame(frame, direction)
+            await self.push_frame(frame, direction)
 
     async def _start(self, frame: StartFrame):
         self._player.set_state(Gst.State.PLAYING)
 
     async def _stop(self, frame: EndFrame):
         self._player.set_state(Gst.State.NULL)
-        # Wait for the push frame task to finish. It will finish when the
-        # EndFrame is actually processed.
-        await self._push_frame_task
 
     async def _cancel(self, frame: CancelFrame):
         self._player.set_state(Gst.State.NULL)
-        # Cancel all the tasks and wait for them to finish.
-        self._push_frame_task.cancel()
-        await self._push_frame_task
-
-    #
-    # Push frames task
-    #
-
-    def _create_push_task(self):
-        loop = self.get_event_loop()
-        self._push_queue = asyncio.Queue()
-        self._push_frame_task = loop.create_task(self._push_frame_task_handler())
-
-    async def _internal_push_frame(
-            self,
-            frame: Frame | None,
-            direction: FrameDirection | None = FrameDirection.DOWNSTREAM):
-        await self._push_queue.put((frame, direction))
-
-    async def _push_frame_task_handler(self):
-        running = True
-        while running:
-            try:
-                (frame, direction) = await self._push_queue.get()
-                await self.push_frame(frame, direction)
-                running = not isinstance(frame, EndFrame)
-                self._push_queue.task_done()
-            except asyncio.CancelledError:
-                break
 
     #
-    # GStreaner
+    # GStreamer
     #
 
     def _on_gstreamer_message(self, bus: Gst.Bus, message: Gst.Message):
@@ -221,7 +185,7 @@ def _appsink_audio_new_sample(self, appsink: GstApp.AppSink):
         frame = AudioRawFrame(audio=info.data,
                               sample_rate=self._out_params.audio_sample_rate,
                               num_channels=self._out_params.audio_channels)
-        asyncio.run_coroutine_threadsafe(self._internal_push_frame(frame), self.get_event_loop())
+        asyncio.run_coroutine_threadsafe(self.push_frame(frame), self.get_event_loop())
         buffer.unmap(info)
         return Gst.FlowReturn.OK
 
@@ -232,6 +196,6 @@ def _appsink_video_new_sample(self, appsink: GstApp.AppSink):
             image=info.data,
             size=(self._out_params.video_width, self._out_params.video_height),
             format="RGB")
-        asyncio.run_coroutine_threadsafe(self._internal_push_frame(frame), self.get_event_loop())
+        asyncio.run_coroutine_threadsafe(self.push_frame(frame), self.get_event_loop())
         buffer.unmap(info)
         return Gst.FlowReturn.OK
diff --git a/src/pipecat/processors/idle_frame_processor.py b/src/pipecat/processors/idle_frame_processor.py
index 40304a5c6..04e3ef096 100644
--- a/src/pipecat/processors/idle_frame_processor.py
+++ b/src/pipecat/processors/idle_frame_processor.py
@@ -8,12 +8,11 @@
 
 from typing import Awaitable, Callable, List
 
-from pipecat.frames.frames import Frame, SystemFrame
-from pipecat.processors.async_frame_processor import AsyncFrameProcessor
-from pipecat.processors.frame_processor import FrameDirection
+from pipecat.frames.frames import Frame
+from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
 
 
-class IdleFrameProcessor(AsyncFrameProcessor):
+class IdleFrameProcessor(FrameProcessor):
     """This class waits to receive any frame or list of desired frames within a
     given timeout. If the timeout is reached before receiving any of those
     frames the provided callback will be called.
@@ -41,11 +40,6 @@ def __init__(
     async def process_frame(self, frame: Frame, direction: FrameDirection):
         await super().process_frame(frame, direction)
 
-        if isinstance(frame, SystemFrame):
-            await self.push_frame(frame, direction)
-        else:
-            await self.queue_frame(frame, direction)
-
         # If we are not waiting for any specific frame set the event, otherwise
         # check if we have received one of the desired frames.
         if not self._types:
@@ -55,7 +49,6 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
                 if isinstance(frame, t):
                     self._idle_event.set()
 
-        # If we are not waiting for any specific frame set the event, otherwise
     async def cleanup(self):
         self._idle_task.cancel()
         await self._idle_task
diff --git a/src/pipecat/processors/user_idle_processor.py b/src/pipecat/processors/user_idle_processor.py
index 7deda2555..50f8da0b3 100644
--- a/src/pipecat/processors/user_idle_processor.py
+++ b/src/pipecat/processors/user_idle_processor.py
@@ -14,11 +14,10 @@
     SystemFrame,
     UserStartedSpeakingFrame,
     UserStoppedSpeakingFrame)
-from pipecat.processors.async_frame_processor import AsyncFrameProcessor
-from pipecat.processors.frame_processor import FrameDirection
+from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
 
 
-class UserIdleProcessor(AsyncFrameProcessor):
+class UserIdleProcessor(FrameProcessor):
     """This class is useful to check if the user is interacting with the bot
     within a given timeout. If the timeout is reached before any interaction
     occurred the provided callback will be called.
@@ -46,11 +45,6 @@ def __init__(
     async def process_frame(self, frame: Frame, direction: FrameDirection):
         await super().process_frame(frame, direction)
 
-        if isinstance(frame, SystemFrame):
-            await self.push_frame(frame, direction)
-        else:
-            await self.queue_frame(frame, direction)
-
         # We shouldn't call the idle callback if the user or the bot are speaking.
         if isinstance(frame, UserStartedSpeakingFrame):
             self._interrupted = True
diff --git a/src/pipecat/services/ai_services.py b/src/pipecat/services/ai_services.py
index c5cb7cc0d..65a5557ea 100644
--- a/src/pipecat/services/ai_services.py
+++ b/src/pipecat/services/ai_services.py
@@ -32,7 +32,6 @@
     UserImageRequestFrame,
     VisionImageRawFrame
 )
-from pipecat.processors.async_frame_processor import AsyncFrameProcessor
 from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
 from pipecat.transcriptions.language import Language
 from pipecat.utils.audio import calculate_audio_volume
@@ -64,7 +63,7 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
         elif isinstance(frame, EndFrame):
             await self.stop(frame)
 
-    async def process_generator(self, generator: AsyncGenerator[Frame, None]):
+    async def process_generator(self, generator: AsyncGenerator[Frame | None, None]):
         async for f in generator:
             if f:
                 if isinstance(f, ErrorFrame):
@@ -73,30 +72,6 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
                     await self.push_frame(f)
 
 
-class AsyncAIService(AsyncFrameProcessor):
-    def __init__(self, **kwargs):
-        super().__init__(**kwargs)
-
-    async def start(self, frame: StartFrame):
-        pass
-
-    async def stop(self, frame: EndFrame):
-        pass
-
-    async def cancel(self, frame: CancelFrame):
-        pass
-
-    async def process_frame(self, frame: Frame, direction: FrameDirection):
-        await super().process_frame(frame, direction)
-
-        if isinstance(frame, StartFrame):
-            await self.start(frame)
-        elif isinstance(frame, CancelFrame):
-            await self.cancel(frame)
-        elif isinstance(frame, EndFrame):
-            await self.stop(frame)
-
-
 class LLMService(AIService):
     """This class is a no-op but serves as a base class for LLM services."""
 
diff --git a/src/pipecat/services/azure.py b/src/pipecat/services/azure.py
index 76e884992..b2db48dd5 100644
--- a/src/pipecat/services/azure.py
+++ b/src/pipecat/services/azure.py
@@ -18,13 +18,11 @@
     ErrorFrame,
     Frame,
     StartFrame,
-    SystemFrame,
     TTSStartedFrame,
     TTSStoppedFrame,
     TranscriptionFrame,
     URLImageRawFrame)
-from pipecat.processors.frame_processor import FrameDirection
-from pipecat.services.ai_services import AsyncAIService, TTSService, ImageGenService
+from pipecat.services.ai_services import STTService, TTSService, ImageGenService
 from pipecat.services.openai import BaseOpenAILLMService
 from pipecat.utils.time import time_now_iso8601
 
@@ -118,7 +116,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
                 logger.error(f"{self} error: {cancellation_details.error_details}")
 
 
-class AzureSTTService(AsyncAIService):
+class AzureSTTService(STTService):
     def __init__(
             self,
             *,
@@ -141,15 +139,11 @@ def __init__(
             speech_config=speech_config, audio_config=audio_config)
         self._speech_recognizer.recognized.connect(self._on_handle_recognized)
 
-    async def process_frame(self, frame: Frame, direction: FrameDirection):
-        await super().process_frame(frame, direction)
-
-        if isinstance(frame, SystemFrame):
-            await self.push_frame(frame, direction)
-        elif isinstance(frame, AudioRawFrame):
-            self._audio_stream.write(frame.audio)
-        else:
-            await self._push_queue.put((frame, direction))
+    async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
+        await self.start_processing_metrics()
+        self._audio_stream.write(audio)
+        await self.stop_processing_metrics()
+        yield None
 
     async def start(self, frame: StartFrame):
         await super().start(frame)
@@ -168,7 +162,7 @@ async def cancel(self, frame: CancelFrame):
 
     def _on_handle_recognized(self, event):
         if event.result.reason == ResultReason.RecognizedSpeech and len(event.result.text) > 0:
             frame = TranscriptionFrame(event.result.text, "", time_now_iso8601())
-            asyncio.run_coroutine_threadsafe(self.queue_frame(frame), self.get_event_loop())
+            asyncio.run_coroutine_threadsafe(self.push_frame(frame), self.get_event_loop())
 
 
 class AzureImageGenServiceREST(ImageGenService):
diff --git a/src/pipecat/services/deepgram.py b/src/pipecat/services/deepgram.py
index d899d4bdb..708c3c511 100644
--- a/src/pipecat/services/deepgram.py
+++ b/src/pipecat/services/deepgram.py
@@ -161,8 +161,8 @@ async def cancel(self, frame: CancelFrame):
     async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
         await self.start_processing_metrics()
         await self._connection.send(audio)
-        yield None
         await self.stop_processing_metrics()
+        yield None
 
     async def _connect(self):
         if await self._connection.start(self._live_options):
diff --git a/src/pipecat/services/gladia.py b/src/pipecat/services/gladia.py
index 886300897..6ffff12f9 100644
--- a/src/pipecat/services/gladia.py
+++ b/src/pipecat/services/gladia.py
@@ -7,20 +7,17 @@
 
 import base64
 import json
 
-from typing import Optional
+from typing import AsyncGenerator, Optional
 
 from pydantic.main import BaseModel
 
 from pipecat.frames.frames import (
-    AudioRawFrame,
     CancelFrame,
     EndFrame,
     Frame,
     InterimTranscriptionFrame,
     StartFrame,
-    SystemFrame,
     TranscriptionFrame)
-from pipecat.processors.frame_processor import FrameDirection
-from pipecat.services.ai_services import AsyncAIService
+from pipecat.services.ai_services import STTService
 from pipecat.utils.time import time_now_iso8601
 
 from loguru import logger
@@ -35,7 +32,7 @@
     raise Exception(f"Missing module: {e}")
 
 
-class GladiaSTTService(AsyncAIService):
+class GladiaSTTService(STTService):
     class InputParams(BaseModel):
         sample_rate: Optional[int] = 16000
         language: Optional[str] = "english"
@@ -57,16 +54,6 @@ def __init__(self,
         self._params = params
         self._confidence = confidence
 
-    async def process_frame(self, frame: Frame, direction: FrameDirection):
-        await super().process_frame(frame, direction)
-
-        if isinstance(frame, SystemFrame):
-            await self.push_frame(frame, direction)
-        elif isinstance(frame, AudioRawFrame):
-            await self._send_audio(frame)
-        else:
-            await self.queue_frame(frame, direction)
-
     async def start(self, frame: StartFrame):
         await super().start(frame)
         self._websocket = await websockets.connect(self._url)
@@ -81,6 +68,12 @@ async def cancel(self, frame: CancelFrame):
         await super().cancel(frame)
         await self._websocket.close()
 
+    async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
+        await self.start_processing_metrics()
+        await self._send_audio(audio)
+        await self.stop_processing_metrics()
+        yield None
+
     async def _setup_gladia(self):
         configuration = {
             "x_gladia_key": self._api_key,
@@ -92,9 +85,9 @@ async def _setup_gladia(self):
 
         await self._websocket.send(json.dumps(configuration))
 
-    async def _send_audio(self, frame: AudioRawFrame):
+    async def _send_audio(self, audio: bytes):
         message = {
-            'frames': base64.b64encode(frame.audio).decode("utf-8")
+            'frames': base64.b64encode(audio).decode("utf-8")
         }
         await self._websocket.send(json.dumps(message))
 
@@ -113,6 +106,6 @@ async def _receive_task_handler(self):
                 transcript = utterance["transcription"]
                 if confidence >= self._confidence:
                     if type == "final":
-                        await self.queue_frame(TranscriptionFrame(transcript, "", time_now_iso8601()))
+                        await self.push_frame(TranscriptionFrame(transcript, "", time_now_iso8601()))
                     else:
-                        await self.queue_frame(InterimTranscriptionFrame(transcript, "", time_now_iso8601()))
+                        await self.push_frame(InterimTranscriptionFrame(transcript, "", time_now_iso8601()))
diff --git a/src/pipecat/transports/base_input.py b/src/pipecat/transports/base_input.py
index 3d1d0c4d7..dda5a66ea 100644
--- a/src/pipecat/transports/base_input.py
+++ b/src/pipecat/transports/base_input.py
@@ -37,10 +37,6 @@ def __init__(self, params: TransportParams, **kwargs):
 
         self._executor = ThreadPoolExecutor(max_workers=5)
 
-        # Create push frame task. This is the task that will push frames in
-        # order. We also guarantee that all frames are pushed in the same task.
-        self._create_push_task()
-
     async def start(self, frame: StartFrame):
         # Create audio input queue and task if needed.
         if self._params.audio_in_enabled or self._params.vad_enabled:
@@ -53,10 +49,6 @@ async def stop(self, frame: EndFrame):
             self._audio_task.cancel()
             await self._audio_task
 
-        # Wait for the push frame task to finish. It will finish when the
-        # EndFrame is actually processed.
-        await self._push_frame_task
-
     async def cancel(self, frame: CancelFrame):
         # Cancel all the tasks and wait for them to finish.
 
@@ -64,9 +56,6 @@ async def cancel(self, frame: CancelFrame):
             self._audio_task.cancel()
             await self._audio_task
 
-        self._push_frame_task.cancel()
-        await self._push_frame_task
-
     def vad_analyzer(self) -> VADAnalyzer | None:
         return self._params.vad_analyzer
 
@@ -86,11 +75,8 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
             await self.cancel(frame)
             await self.push_frame(frame, direction)
         elif isinstance(frame, BotInterruptionFrame):
-            await self._handle_interruptions(frame, False)
-        elif isinstance(frame, StartInterruptionFrame):
+            logger.debug("Bot interruption")
             await self._start_interruption()
-        elif isinstance(frame, StopInterruptionFrame):
-            await self._stop_interruption()
         # All other system frames
         elif isinstance(frame, SystemFrame):
             await self.push_frame(frame, direction)
@@ -98,12 +84,12 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
         elif isinstance(frame, StartFrame):
             # Push StartFrame before start(), because we want StartFrame to be
             # processed by every processor before any other frame is processed.
-            await self._internal_push_frame(frame, direction)
+            await self.push_frame(frame, direction)
             await self.start(frame)
         elif isinstance(frame, EndFrame):
             # Push EndFrame before stop(), because stop() waits on the task to
             # finish and the task finishes when EndFrame is processed.
-            await self._internal_push_frame(frame, direction)
+            await self.push_frame(frame, direction)
             await self.stop(frame)
         elif isinstance(frame, VADParamsUpdateFrame):
             vad_analyzer = self.vad_analyzer()
@@ -111,73 +97,28 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
             if vad_analyzer:
                 vad_analyzer.set_params(frame.params)
         # Other frames
         else:
-            await self._internal_push_frame(frame, direction)
-
-    #
-    # Push frames task
-    #
-
-    def _create_push_task(self):
-        loop = self.get_event_loop()
-        self._push_queue = asyncio.Queue()
-        self._push_frame_task = loop.create_task(self._push_frame_task_handler())
-
-    async def _internal_push_frame(
-            self,
-            frame: Frame | None,
-            direction: FrameDirection | None = FrameDirection.DOWNSTREAM):
-        await self._push_queue.put((frame, direction))
-
-    async def _push_frame_task_handler(self):
-        running = True
-        while running:
-            try:
-                (frame, direction) = await self._push_queue.get()
-                await self.push_frame(frame, direction)
-                running = not isinstance(frame, EndFrame)
-                self._push_queue.task_done()
-            except asyncio.CancelledError:
-                break
+            await self.push_frame(frame, direction)
 
     #
     # Handle interruptions
     #
 
-    async def _start_interruption(self):
-        if not self.interruptions_allowed:
-            return
-
-        # Cancel the task. This will stop pushing frames downstream.
-        self._push_frame_task.cancel()
-        await self._push_frame_task
-        # Push an out-of-band frame (i.e. not using the ordered push
-        # frame task) to stop everything, specially at the output
-        # transport.
-        await self.push_frame(StartInterruptionFrame())
-        # Create a new queue and task.
-        self._create_push_task()
-
-    async def _stop_interruption(self):
-        if not self.interruptions_allowed:
-            return
-
-        await self.push_frame(StopInterruptionFrame())
-
-    async def _handle_interruptions(self, frame: Frame, push_frame: bool):
+    async def _handle_interruptions(self, frame: Frame):
         if self.interruptions_allowed:
-            # Make sure we notify about interruptions quickly out-of-band
-            if isinstance(frame, BotInterruptionFrame):
-                logger.debug("Bot interruption")
-                await self._start_interruption()
-            elif isinstance(frame, UserStartedSpeakingFrame):
+            # Make sure we notify about interruptions quickly out-of-band.
+            if isinstance(frame, UserStartedSpeakingFrame):
                 logger.debug("User started speaking")
                 await self._start_interruption()
+                # Push an out-of-band frame (i.e. not using the ordered push
+                # frame task) to stop everything, especially at the output
+                # transport.
+                await self.push_frame(StartInterruptionFrame())
             elif isinstance(frame, UserStoppedSpeakingFrame):
                 logger.debug("User stopped speaking")
                 await self._stop_interruption()
+                await self.push_frame(StopInterruptionFrame())
 
-        if push_frame:
-            await self._internal_push_frame(frame)
+        await self.push_frame(frame)
 
     #
     # Audio input
@@ -201,7 +142,7 @@ async def _handle_vad(self, audio_frames: bytes, vad_state: VADState):
                 frame = UserStoppedSpeakingFrame()
 
             if frame:
-                await self._handle_interruptions(frame, True)
+                await self._handle_interruptions(frame)
 
             vad_state = new_vad_state
         return vad_state
@@ -222,7 +163,7 @@ async def _audio_task_handler(self):
 
                 # Push audio downstream if passthrough.
                 if audio_passthrough:
-                    await self._internal_push_frame(frame)
+                    await self.push_frame(frame)
 
                 self._audio_in_queue.task_done()
             except asyncio.CancelledError:
                 break
diff --git a/src/pipecat/transports/base_output.py b/src/pipecat/transports/base_output.py
index d2c6add2b..5f9dcbe08 100644
--- a/src/pipecat/transports/base_output.py
+++ b/src/pipecat/transports/base_output.py
@@ -66,10 +66,6 @@ def __init__(self, params: TransportParams, **kwargs):
         # generating frames upstream while, for example, the audio is playing.
         self._create_sink_task()
 
-        # Create push frame task. This is the task that will push frames in
-        # order. We also guarantee that all frames are pushed in the same task.
-        self._create_push_task()
-
     async def start(self, frame: StartFrame):
         # Create camera output queue and task if needed.
         if self._params.camera_out_enabled:
@@ -91,9 +87,8 @@ async def stop(self, frame: EndFrame):
             self._audio_out_task.cancel()
             await self._audio_out_task
 
-        # Wait for the push frame and sink tasks to finish. They will finish when
-        # the EndFrame is actually processed.
-        await self._push_frame_task
+        # Wait for the sink task to finish. It will finish when the EndFrame
+        # is actually processed.
         await self._sink_task
 
     async def cancel(self, frame: CancelFrame):
@@ -103,9 +98,6 @@ async def cancel(self, frame: CancelFrame):
             self._camera_out_task.cancel()
             await self._camera_out_task
 
-        self._push_frame_task.cancel()
-        await self._push_frame_task
-
         self._sink_task.cancel()
         await self._sink_task
 
@@ -170,10 +162,6 @@ async def _handle_interruptions(self, frame: Frame):
             self._sink_task.cancel()
             await self._sink_task
             self._create_sink_task()
-            # Stop push task.
-            self._push_frame_task.cancel()
-            await self._push_frame_task
-            self._create_push_task()
 
             # Let's send a bot stopped speaking if we have to.
             if self._bot_speaking:
                 await self._bot_stopped_speaking()
 
@@ -213,7 +201,7 @@ async def _sink_task_handler(self):
                 frame = await self._sink_queue.get()
                 if isinstance(frame, AudioRawFrame):
                     await self.write_raw_audio_frames(frame.audio)
-                    await self._internal_push_frame(frame)
+                    await self.push_frame(frame)
                     await self.push_frame(BotSpeakingFrame(), FrameDirection.UPSTREAM)
                 elif isinstance(frame, ImageRawFrame):
                     await self._set_camera_image(frame)
@@ -223,12 +211,12 @@ async def _sink_task_handler(self):
                     await self.send_message(frame)
                 elif isinstance(frame, TTSStartedFrame):
                     await self._bot_started_speaking()
-                    await self._internal_push_frame(frame)
+                    await self.push_frame(frame)
                 elif isinstance(frame, TTSStoppedFrame):
                     await self._bot_stopped_speaking()
-                    await self._internal_push_frame(frame)
+                    await self.push_frame(frame)
                 else:
-                    await self._internal_push_frame(frame)
+                    await self.push_frame(frame)
 
                 running = not isinstance(frame, EndFrame)
 
@@ -241,38 +229,12 @@ async def _sink_task_handler(self):
     async def _bot_started_speaking(self):
         logger.debug("Bot started speaking")
         self._bot_speaking = True
-        await self._internal_push_frame(BotStartedSpeakingFrame(), FrameDirection.UPSTREAM)
+        await self.push_frame(BotStartedSpeakingFrame(), FrameDirection.UPSTREAM)
 
     async def _bot_stopped_speaking(self):
         logger.debug("Bot stopped speaking")
         self._bot_speaking = False
-        await self._internal_push_frame(BotStoppedSpeakingFrame(), FrameDirection.UPSTREAM)
-
-    #
-    # Push frames task
-    #
-
-    def _create_push_task(self):
-        loop = self.get_event_loop()
-        self._push_queue = asyncio.Queue()
-        self._push_frame_task = loop.create_task(self._push_frame_task_handler())
-
-    async def _internal_push_frame(
-            self,
-            frame: Frame | None,
-            direction: FrameDirection | None = FrameDirection.DOWNSTREAM):
-        await self._push_queue.put((frame, direction))
-
-    async def _push_frame_task_handler(self):
-        running = True
-        while running:
-            try:
-                (frame, direction) = await self._push_queue.get()
-                await self.push_frame(frame, direction)
-                running = not isinstance(frame, EndFrame)
-                self._push_queue.task_done()
-            except asyncio.CancelledError:
-                break
+        await self.push_frame(BotStoppedSpeakingFrame(), FrameDirection.UPSTREAM)
 
     #
     # Camera out
@@ -356,7 +318,7 @@ async def _audio_out_task_handler(self):
             try:
                 frame = await self._audio_out_queue.get()
                 await self.write_raw_audio_frames(frame.audio)
-                await self._internal_push_frame(frame)
+                await self.push_frame(frame)
                 await self.push_frame(BotSpeakingFrame(), FrameDirection.UPSTREAM)
             except asyncio.CancelledError:
                 break
diff --git a/src/pipecat/transports/network/websocket_server.py b/src/pipecat/transports/network/websocket_server.py
index 08ca9719a..819950d72 100644
--- a/src/pipecat/transports/network/websocket_server.py
+++ b/src/pipecat/transports/network/websocket_server.py
@@ -98,9 +98,9 @@ async def _client_handler(self, websocket: websockets.WebSocketServerProtocol, p
                 continue
 
             if isinstance(frame, AudioRawFrame):
-                await self.push_audio_frame(frame)
+                await self.queue_audio_frame(frame)
             else:
-                await self._internal_push_frame(frame)
+                await self.push_frame(frame)
 
         # Notify disconnection
         await self._callbacks.on_client_disconnected(websocket)
diff --git a/src/pipecat/transports/services/daily.py b/src/pipecat/transports/services/daily.py
index bb6032fa4..e9e99121c 100644
--- a/src/pipecat/transports/services/daily.py
+++ b/src/pipecat/transports/services/daily.py
@@ -619,11 +619,11 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
     #
 
     async def push_transcription_frame(self, frame: TranscriptionFrame | InterimTranscriptionFrame):
-        await self._internal_push_frame(frame)
+        await self.push_frame(frame)
 
     async def push_app_message(self, message: Any, sender: str):
         frame = DailyTransportMessageFrame(message=message, participant_id=sender)
-        await self._internal_push_frame(frame)
+        await self.push_frame(frame)
 
     #
     # Audio in
     #
@@ -686,7 +686,7 @@ async def _on_participant_video_frame(self, participant_id: str, buffer, size, f
                 image=buffer,
                 size=size,
                 format=format)
-            await self._internal_push_frame(frame)
+            await self.push_frame(frame)
 
             self._video_renderers[participant_id]["timestamp"] = curr_time
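-- 
Reviewer note: the ordering mechanism this patch centralizes in
FrameProcessor can be summarized with the standalone sketch below. The names
(SingleTaskPusher, _deliver, interrupt) are hypothetical and exist only to
illustrate the pattern; the real implementation is the __create_push_task()
/ __push_frame_task_handler() pair in the frame_processor.py hunks above.
The sketch assumes it is instantiated from inside a running asyncio event
loop.

    import asyncio

    class Frame: ...
    class SystemFrame(Frame): ...   # out-of-band, never queued
    class EndFrame(Frame): ...      # ordered, stops the push task

    class SingleTaskPusher:
        def __init__(self):
            self._create_push_task()

        async def push_frame(self, frame: Frame):
            if isinstance(frame, SystemFrame):
                # Out-of-band: deliver immediately, skipping the queue.
                await self._deliver(frame)
            else:
                # Ordered: only the single task below dequeues, so output
                # order matches enqueue order.
                await self._queue.put(frame)

        async def interrupt(self):
            # Mirror of _start_interruption(): cancel the task (discarding
            # any queued frames) and start a fresh queue/task pair.
            self._task.cancel()
            await self._task
            self._create_push_task()

        def _create_push_task(self):
            self._queue: asyncio.Queue = asyncio.Queue()
            self._task = asyncio.get_running_loop().create_task(self._handler())

        async def _handler(self):
            running = True
            while running:
                try:
                    frame = await self._queue.get()
                    await self._deliver(frame)
                    running = not isinstance(frame, EndFrame)
                    self._queue.task_done()
                except asyncio.CancelledError:
                    break

        async def _deliver(self, frame: Frame):
            # Stand-in for pushing to the next processor in the pipeline.
            print(f"delivered {type(frame).__name__}")

    async def main():
        pusher = SingleTaskPusher()
        await pusher.push_frame(Frame())        # ordered
        await pusher.push_frame(SystemFrame())  # out-of-band, may overtake
        await pusher.push_frame(EndFrame())     # ends the push task
        await pusher._task                      # wait for the queue to drain

    asyncio.run(main())

Because only one consumer task drains the queue, ordering is deterministic;
an interruption swaps in a fresh queue so stale frames are dropped, while
system frames are never delayed behind queued output.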