From 337f048864d154dc3a70e4ae38eb3fc54f69abbe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Fri, 30 Aug 2024 16:24:04 -0700 Subject: [PATCH 01/11] introduce synchronous and asynchronous frame processors Pipecat has a pipeline-based architecture. The pipeline consists of frame processors linked to each other. The elements travelling across the pipeline are called frames. To have a deterministic behavior the frames travelling through the pipeline should always be ordered, except system frames which are out-of-band frames. To achieve that, each frame processor should only output frames from a single task. There are synchronous and asynchronous frame processors. The synchronous processors push output frames from the same task that they receive input frames, and therefore only pushing frames from one task. Asynchrnous frame processors can have internal tasks to perform things asynchrnously (e.g. receiving data from a websocket) but they also have a single task where they push frames from. --- examples/foundational/17-detect-user-idle.py | 2 +- .../processors/aggregators/llm_response.py | 1 - .../processors/async_frame_processor.py | 64 ------------- src/pipecat/processors/frame_processor.py | 55 ++++++++++- src/pipecat/processors/frameworks/rtvi.py | 34 +------ .../processors/gstreamer/pipeline_source.py | 50 ++-------- .../processors/idle_frame_processor.py | 19 +--- src/pipecat/processors/user_idle_processor.py | 16 +--- src/pipecat/services/ai_services.py | 27 +----- src/pipecat/services/azure.py | 22 ++--- src/pipecat/services/deepgram.py | 2 +- src/pipecat/services/gladia.py | 35 +++---- src/pipecat/services/lmnt.py | 2 +- src/pipecat/transports/base_input.py | 91 ++++--------------- src/pipecat/transports/base_output.py | 60 +++--------- .../transports/network/websocket_server.py | 4 +- src/pipecat/transports/services/daily.py | 6 +- 17 files changed, 130 insertions(+), 360 deletions(-) delete mode 100644 src/pipecat/processors/async_frame_processor.py diff --git a/examples/foundational/17-detect-user-idle.py b/examples/foundational/17-detect-user-idle.py index 903f35f4e..66fcfb200 100644 --- a/examples/foundational/17-detect-user-idle.py +++ b/examples/foundational/17-detect-user-idle.py @@ -70,7 +70,7 @@ async def main(): async def user_idle_callback(user_idle: UserIdleProcessor): messages.append( {"role": "system", "content": "Ask the user if they are still there and try to prompt for some input, but be short."}) - await user_idle.queue_frame(LLMMessagesFrame(messages)) + await user_idle.push_frame(LLMMessagesFrame(messages)) user_idle = UserIdleProcessor(callback=user_idle_callback, timeout=5.0) diff --git a/src/pipecat/processors/aggregators/llm_response.py b/src/pipecat/processors/aggregators/llm_response.py index ab0552578..379394120 100644 --- a/src/pipecat/processors/aggregators/llm_response.py +++ b/src/pipecat/processors/aggregators/llm_response.py @@ -4,7 +4,6 @@ # SPDX-License-Identifier: BSD 2-Clause License # -import sys from typing import List from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContextFrame, OpenAILLMContext diff --git a/src/pipecat/processors/async_frame_processor.py b/src/pipecat/processors/async_frame_processor.py deleted file mode 100644 index 28a27d255..000000000 --- a/src/pipecat/processors/async_frame_processor.py +++ /dev/null @@ -1,64 +0,0 @@ -# -# Copyright (c) 2024, Daily -# -# SPDX-License-Identifier: BSD 2-Clause License -# - -import asyncio - -from pipecat.frames.frames import EndFrame, Frame, StartInterruptionFrame -from pipecat.processors.frame_processor import FrameDirection, FrameProcessor - - -class AsyncFrameProcessor(FrameProcessor): - - def __init__( - self, - *, - name: str | None = None, - loop: asyncio.AbstractEventLoop | None = None, - **kwargs): - super().__init__(name=name, loop=loop, **kwargs) - - self._create_push_task() - - async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - - if isinstance(frame, StartInterruptionFrame): - await self._handle_interruptions(frame) - - async def queue_frame( - self, - frame: Frame, - direction: FrameDirection = FrameDirection.DOWNSTREAM): - await self._push_queue.put((frame, direction)) - - async def cleanup(self): - self._push_frame_task.cancel() - await self._push_frame_task - - async def _handle_interruptions(self, frame: Frame): - # Cancel the task. This will stop pushing frames downstream. - self._push_frame_task.cancel() - await self._push_frame_task - # Push an out-of-band frame (i.e. not using the ordered push - # frame task). - await self.push_frame(frame) - # Create a new queue and task. - self._create_push_task() - - def _create_push_task(self): - self._push_queue = asyncio.Queue() - self._push_frame_task = self.get_event_loop().create_task(self._push_frame_task_handler()) - - async def _push_frame_task_handler(self): - running = True - while running: - try: - (frame, direction) = await self._push_queue.get() - await self.push_frame(frame, direction) - running = not isinstance(frame, EndFrame) - self._push_queue.task_done() - except asyncio.CancelledError: - break diff --git a/src/pipecat/processors/frame_processor.py b/src/pipecat/processors/frame_processor.py index dfdee7d40..72924776c 100644 --- a/src/pipecat/processors/frame_processor.py +++ b/src/pipecat/processors/frame_processor.py @@ -11,12 +11,14 @@ from pipecat.clocks.base_clock import BaseClock from pipecat.frames.frames import ( + EndFrame, ErrorFrame, Frame, MetricsFrame, StartFrame, StartInterruptionFrame, - UserStoppedSpeakingFrame) + StopInterruptionFrame, + SystemFrame) from pipecat.utils.utils import obj_count, obj_id from loguru import logger @@ -88,6 +90,7 @@ def __init__( self, *, name: str | None = None, + sync: bool = True, loop: asyncio.AbstractEventLoop | None = None, **kwargs): self.id: int = obj_id() @@ -96,6 +99,7 @@ def __init__( self._prev: "FrameProcessor" | None = None self._next: "FrameProcessor" | None = None self._loop: asyncio.AbstractEventLoop = loop or asyncio.get_running_loop() + self._sync = sync # Clock self._clock: BaseClock | None = None @@ -109,6 +113,14 @@ def __init__( # Metrics self._metrics = FrameProcessorMetrics(name=self.name) + # Every processor in Pipecat should only output frames from a single + # task. This avoid problems like audio overlapping. System frames are + # the exception to this rule. + # + # This create this task. + if not self._sync: + self.__create_push_task() + @property def interruptions_allowed(self): return self._allow_interruptions @@ -192,14 +204,38 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): self._enable_usage_metrics = frame.enable_usage_metrics self._report_only_initial_ttfb = frame.report_only_initial_ttfb elif isinstance(frame, StartInterruptionFrame): + await self._start_interruption() await self.stop_all_metrics() - elif isinstance(frame, UserStoppedSpeakingFrame): + elif isinstance(frame, StopInterruptionFrame): self._should_report_ttfb = True async def push_error(self, error: ErrorFrame): await self.push_frame(error, FrameDirection.UPSTREAM) async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM): + if self._sync or isinstance(frame, SystemFrame): + await self.__internal_push_frame(frame, direction) + else: + await self.__push_queue.put((frame, direction)) + + # + # Handle interruptions + # + + async def _start_interruption(self): + if not self._sync: + # Cancel the task. This will stop pushing frames downstream. + self.__push_frame_task.cancel() + await self.__push_frame_task + + # Create a new queue and task. + self.__create_push_task() + + async def _stop_interruption(self): + # Nothing to do right now. + pass + + async def __internal_push_frame(self, frame: Frame, direction: FrameDirection): try: if direction == FrameDirection.DOWNSTREAM and self._next: logger.trace(f"Pushing {frame} from {self} to {self._next}") @@ -210,5 +246,20 @@ async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirect except Exception as e: logger.exception(f"Uncaught exception in {self}: {e}") + def __create_push_task(self): + self.__push_queue = asyncio.Queue() + self.__push_frame_task = self.get_event_loop().create_task(self.__push_frame_task_handler()) + + async def __push_frame_task_handler(self): + running = True + while running: + try: + (frame, direction) = await self.__push_queue.get() + await self.__internal_push_frame(frame, direction) + running = not isinstance(frame, EndFrame) + self.__push_queue.task_done() + except asyncio.CancelledError: + break + def __str__(self): return self.name diff --git a/src/pipecat/processors/frameworks/rtvi.py b/src/pipecat/processors/frameworks/rtvi.py index dd28e7252..d32f0f640 100644 --- a/src/pipecat/processors/frameworks/rtvi.py +++ b/src/pipecat/processors/frameworks/rtvi.py @@ -272,8 +272,9 @@ class RTVIProcessor(FrameProcessor): def __init__(self, *, config: RTVIConfig = RTVIConfig(config=[]), - params: RTVIProcessorParams = RTVIProcessorParams()): - super().__init__() + params: RTVIProcessorParams = RTVIProcessorParams(), + **kwargs): + super().__init__(sync=False, **kwargs) self._config = config self._params = params @@ -286,9 +287,6 @@ def __init__(self, self._registered_actions: Dict[str, RTVIAction] = {} self._registered_services: Dict[str, RTVIService] = {} - self._push_frame_task = self.get_event_loop().create_task(self._push_frame_task_handler()) - self._push_queue = asyncio.Queue() - self._message_task = self.get_event_loop().create_task(self._message_task_handler()) self._message_queue = asyncio.Queue() @@ -335,12 +333,6 @@ async def handle_function_call_start( message = RTVILLMFunctionCallStartMessage(data=fn) await self._push_transport_message(message, exclude_none=False) - async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM): - if isinstance(frame, SystemFrame): - await super().push_frame(frame, direction) - else: - await self._internal_push_frame(frame, direction) - async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) @@ -394,30 +386,10 @@ async def _stop(self, frame: EndFrame): # processing EndFrames. self._message_task.cancel() await self._message_task - await self._push_frame_task async def _cancel(self, frame: CancelFrame): self._message_task.cancel() await self._message_task - self._push_frame_task.cancel() - await self._push_frame_task - - async def _internal_push_frame( - self, - frame: Frame | None, - direction: FrameDirection | None = FrameDirection.DOWNSTREAM): - await self._push_queue.put((frame, direction)) - - async def _push_frame_task_handler(self): - running = True - while running: - try: - (frame, direction) = await self._push_queue.get() - await super().push_frame(frame, direction) - self._push_queue.task_done() - running = not isinstance(frame, EndFrame) - except asyncio.CancelledError: - break async def _push_transport_message(self, model: BaseModel, exclude_none: bool = True): frame = TransportMessageFrame( diff --git a/src/pipecat/processors/gstreamer/pipeline_source.py b/src/pipecat/processors/gstreamer/pipeline_source.py index eef0d56cc..5f0ee089b 100644 --- a/src/pipecat/processors/gstreamer/pipeline_source.py +++ b/src/pipecat/processors/gstreamer/pipeline_source.py @@ -41,7 +41,7 @@ class OutputParams(BaseModel): clock_sync: bool = True def __init__(self, *, pipeline: str, out_params: OutputParams = OutputParams(), **kwargs): - super().__init__(**kwargs) + super().__init__(sync=False, **kwargs) self._out_params = out_params @@ -62,10 +62,6 @@ def __init__(self, *, pipeline: str, out_params: OutputParams = OutputParams(), bus.add_signal_watch() bus.connect("message", self._on_gstreamer_message) - # Create push frame task. This is the task that will push frames in - # order. We also guarantee that all frames are pushed in the same task. - self._create_push_task() - async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) @@ -80,60 +76,28 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): elif isinstance(frame, StartFrame): # Push StartFrame before start(), because we want StartFrame to be # processed by every processor before any other frame is processed. - await self._internal_push_frame(frame, direction) + await self.push_frame(frame, direction) await self._start(frame) elif isinstance(frame, EndFrame): # Push EndFrame before stop(), because stop() waits on the task to # finish and the task finishes when EndFrame is processed. - await self._internal_push_frame(frame, direction) + await self.push_frame(frame, direction) await self._stop(frame) # Other frames else: - await self._internal_push_frame(frame, direction) + await self.push_frame(frame, direction) async def _start(self, frame: StartFrame): self._player.set_state(Gst.State.PLAYING) async def _stop(self, frame: EndFrame): self._player.set_state(Gst.State.NULL) - # Wait for the push frame task to finish. It will finish when the - # EndFrame is actually processed. - await self._push_frame_task async def _cancel(self, frame: CancelFrame): self._player.set_state(Gst.State.NULL) - # Cancel all the tasks and wait for them to finish. - self._push_frame_task.cancel() - await self._push_frame_task - - # - # Push frames task - # - - def _create_push_task(self): - loop = self.get_event_loop() - self._push_queue = asyncio.Queue() - self._push_frame_task = loop.create_task(self._push_frame_task_handler()) - - async def _internal_push_frame( - self, - frame: Frame | None, - direction: FrameDirection | None = FrameDirection.DOWNSTREAM): - await self._push_queue.put((frame, direction)) - - async def _push_frame_task_handler(self): - running = True - while running: - try: - (frame, direction) = await self._push_queue.get() - await self.push_frame(frame, direction) - running = not isinstance(frame, EndFrame) - self._push_queue.task_done() - except asyncio.CancelledError: - break # - # GStreaner + # GStreamer # def _on_gstreamer_message(self, bus: Gst.Bus, message: Gst.Message): @@ -221,7 +185,7 @@ def _appsink_audio_new_sample(self, appsink: GstApp.AppSink): frame = AudioRawFrame(audio=info.data, sample_rate=self._out_params.audio_sample_rate, num_channels=self._out_params.audio_channels) - asyncio.run_coroutine_threadsafe(self._internal_push_frame(frame), self.get_event_loop()) + asyncio.run_coroutine_threadsafe(self.push_frame(frame), self.get_event_loop()) buffer.unmap(info) return Gst.FlowReturn.OK @@ -232,6 +196,6 @@ def _appsink_video_new_sample(self, appsink: GstApp.AppSink): image=info.data, size=(self._out_params.video_width, self._out_params.video_height), format="RGB") - asyncio.run_coroutine_threadsafe(self._internal_push_frame(frame), self.get_event_loop()) + asyncio.run_coroutine_threadsafe(self.push_frame(frame), self.get_event_loop()) buffer.unmap(info) return Gst.FlowReturn.OK diff --git a/src/pipecat/processors/idle_frame_processor.py b/src/pipecat/processors/idle_frame_processor.py index 40304a5c6..42b81517e 100644 --- a/src/pipecat/processors/idle_frame_processor.py +++ b/src/pipecat/processors/idle_frame_processor.py @@ -8,19 +8,14 @@ from typing import Awaitable, Callable, List -from pipecat.frames.frames import Frame, SystemFrame -from pipecat.processors.async_frame_processor import AsyncFrameProcessor -from pipecat.processors.frame_processor import FrameDirection +from pipecat.frames.frames import Frame +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor -class IdleFrameProcessor(AsyncFrameProcessor): +class IdleFrameProcessor(FrameProcessor): """This class waits to receive any frame or list of desired frames within a given timeout. If the timeout is reached before receiving any of those frames the provided callback will be called. - - The callback can then be used to push frames downstream by using - `queue_frame()` (or `push_frame()` for system frames). - """ def __init__( @@ -30,7 +25,7 @@ def __init__( timeout: float, types: List[type] = [], **kwargs): - super().__init__(**kwargs) + super().__init__(sync=False, **kwargs) self._callback = callback self._timeout = timeout @@ -41,10 +36,7 @@ def __init__( async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) - if isinstance(frame, SystemFrame): - await self.push_frame(frame, direction) - else: - await self.queue_frame(frame, direction) + await self.push_frame(frame, direction) # If we are not waiting for any specific frame set the event, otherwise # check if we have received one of the desired frames. @@ -55,7 +47,6 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): if isinstance(frame, t): self._idle_event.set() - # If we are not waiting for any specific frame set the event, otherwise async def cleanup(self): self._idle_task.cancel() await self._idle_task diff --git a/src/pipecat/processors/user_idle_processor.py b/src/pipecat/processors/user_idle_processor.py index 7deda2555..36c394a5d 100644 --- a/src/pipecat/processors/user_idle_processor.py +++ b/src/pipecat/processors/user_idle_processor.py @@ -11,21 +11,16 @@ from pipecat.frames.frames import ( BotSpeakingFrame, Frame, - SystemFrame, UserStartedSpeakingFrame, UserStoppedSpeakingFrame) -from pipecat.processors.async_frame_processor import AsyncFrameProcessor -from pipecat.processors.frame_processor import FrameDirection +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor -class UserIdleProcessor(AsyncFrameProcessor): +class UserIdleProcessor(FrameProcessor): """This class is useful to check if the user is interacting with the bot within a given timeout. If the timeout is reached before any interaction occurred the provided callback will be called. - The callback can then be used to push frames downstream by using - `queue_frame()` (or `push_frame()` for system frames). - """ def __init__( @@ -34,7 +29,7 @@ def __init__( callback: Callable[["UserIdleProcessor"], Awaitable[None]], timeout: float, **kwargs): - super().__init__(**kwargs) + super().__init__(sync=False, **kwargs) self._callback = callback self._timeout = timeout @@ -46,10 +41,7 @@ def __init__( async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) - if isinstance(frame, SystemFrame): - await self.push_frame(frame, direction) - else: - await self.queue_frame(frame, direction) + await self.push_frame(frame, direction) # We shouldn't call the idle callback if the user or the bot are speaking. if isinstance(frame, UserStartedSpeakingFrame): diff --git a/src/pipecat/services/ai_services.py b/src/pipecat/services/ai_services.py index 7291e7db9..d7226dba3 100644 --- a/src/pipecat/services/ai_services.py +++ b/src/pipecat/services/ai_services.py @@ -32,7 +32,6 @@ UserImageRequestFrame, VisionImageRawFrame ) -from pipecat.processors.async_frame_processor import AsyncFrameProcessor from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.transcriptions.language import Language from pipecat.utils.audio import calculate_audio_volume @@ -67,7 +66,7 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): elif isinstance(frame, EndFrame): await self.stop(frame) - async def process_generator(self, generator: AsyncGenerator[Frame, None]): + async def process_generator(self, generator: AsyncGenerator[Frame | None, None]): async for f in generator: if f: if isinstance(f, ErrorFrame): @@ -76,30 +75,6 @@ async def process_generator(self, generator: AsyncGenerator[Frame, None]): await self.push_frame(f) -class AsyncAIService(AsyncFrameProcessor): - def __init__(self, **kwargs): - super().__init__(**kwargs) - - async def start(self, frame: StartFrame): - pass - - async def stop(self, frame: EndFrame): - pass - - async def cancel(self, frame: CancelFrame): - pass - - async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - - if isinstance(frame, StartFrame): - await self.start(frame) - elif isinstance(frame, CancelFrame): - await self.cancel(frame) - elif isinstance(frame, EndFrame): - await self.stop(frame) - - class LLMService(AIService): """This class is a no-op but serves as a base class for LLM services.""" diff --git a/src/pipecat/services/azure.py b/src/pipecat/services/azure.py index c2f984b75..90674fcc4 100644 --- a/src/pipecat/services/azure.py +++ b/src/pipecat/services/azure.py @@ -18,13 +18,11 @@ ErrorFrame, Frame, StartFrame, - SystemFrame, TTSStartedFrame, TTSStoppedFrame, TranscriptionFrame, URLImageRawFrame) -from pipecat.processors.frame_processor import FrameDirection -from pipecat.services.ai_services import AsyncAIService, TTSService, ImageGenService +from pipecat.services.ai_services import STTService, TTSService, ImageGenService from pipecat.services.openai import BaseOpenAILLMService from pipecat.utils.time import time_now_iso8601 @@ -126,7 +124,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: logger.error(f"{self} error: {cancellation_details.error_details}") -class AzureSTTService(AsyncAIService): +class AzureSTTService(STTService): def __init__( self, *, @@ -149,15 +147,11 @@ def __init__( speech_config=speech_config, audio_config=audio_config) self._speech_recognizer.recognized.connect(self._on_handle_recognized) - async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - - if isinstance(frame, SystemFrame): - await self.push_frame(frame, direction) - elif isinstance(frame, AudioRawFrame): - self._audio_stream.write(frame.audio) - else: - await self._push_queue.put((frame, direction)) + async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]: + await self.start_processing_metrics() + self._audio_stream.write(audio) + await self.stop_processing_metrics() + yield None async def start(self, frame: StartFrame): await super().start(frame) @@ -176,7 +170,7 @@ async def cancel(self, frame: CancelFrame): def _on_handle_recognized(self, event): if event.result.reason == ResultReason.RecognizedSpeech and len(event.result.text) > 0: frame = TranscriptionFrame(event.result.text, "", time_now_iso8601()) - asyncio.run_coroutine_threadsafe(self.queue_frame(frame), self.get_event_loop()) + asyncio.run_coroutine_threadsafe(self.push_frame(frame), self.get_event_loop()) class AzureImageGenServiceREST(ImageGenService): diff --git a/src/pipecat/services/deepgram.py b/src/pipecat/services/deepgram.py index d899d4bdb..708c3c511 100644 --- a/src/pipecat/services/deepgram.py +++ b/src/pipecat/services/deepgram.py @@ -161,8 +161,8 @@ async def cancel(self, frame: CancelFrame): async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]: await self.start_processing_metrics() await self._connection.send(audio) - yield None await self.stop_processing_metrics() + yield None async def _connect(self): if await self._connection.start(self._live_options): diff --git a/src/pipecat/services/gladia.py b/src/pipecat/services/gladia.py index 886300897..ead8f63dc 100644 --- a/src/pipecat/services/gladia.py +++ b/src/pipecat/services/gladia.py @@ -7,20 +7,17 @@ import base64 import json -from typing import Optional +from typing import AsyncGenerator, Optional from pydantic.main import BaseModel from pipecat.frames.frames import ( - AudioRawFrame, CancelFrame, EndFrame, Frame, InterimTranscriptionFrame, StartFrame, - SystemFrame, TranscriptionFrame) -from pipecat.processors.frame_processor import FrameDirection -from pipecat.services.ai_services import AsyncAIService +from pipecat.services.ai_services import STTService from pipecat.utils.time import time_now_iso8601 from loguru import logger @@ -35,7 +32,7 @@ raise Exception(f"Missing module: {e}") -class GladiaSTTService(AsyncAIService): +class GladiaSTTService(STTService): class InputParams(BaseModel): sample_rate: Optional[int] = 16000 language: Optional[str] = "english" @@ -50,23 +47,13 @@ def __init__(self, confidence: float = 0.5, params: InputParams = InputParams(), **kwargs): - super().__init__(**kwargs) + super().__init__(sync=False, **kwargs) self._api_key = api_key self._url = url self._params = params self._confidence = confidence - async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - - if isinstance(frame, SystemFrame): - await self.push_frame(frame, direction) - elif isinstance(frame, AudioRawFrame): - await self._send_audio(frame) - else: - await self.queue_frame(frame, direction) - async def start(self, frame: StartFrame): await super().start(frame) self._websocket = await websockets.connect(self._url) @@ -81,6 +68,12 @@ async def cancel(self, frame: CancelFrame): await super().cancel(frame) await self._websocket.close() + async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]: + await self.start_processing_metrics() + await self._send_audio(audio) + await self.stop_processing_metrics() + yield None + async def _setup_gladia(self): configuration = { "x_gladia_key": self._api_key, @@ -92,9 +85,9 @@ async def _setup_gladia(self): await self._websocket.send(json.dumps(configuration)) - async def _send_audio(self, frame: AudioRawFrame): + async def _send_audio(self, audio: bytes): message = { - 'frames': base64.b64encode(frame.audio).decode("utf-8") + 'frames': base64.b64encode(audio).decode("utf-8") } await self._websocket.send(json.dumps(message)) @@ -113,6 +106,6 @@ async def _receive_task_handler(self): transcript = utterance["transcription"] if confidence >= self._confidence: if type == "final": - await self.queue_frame(TranscriptionFrame(transcript, "", time_now_iso8601())) + await self.push_frame(TranscriptionFrame(transcript, "", time_now_iso8601())) else: - await self.queue_frame(InterimTranscriptionFrame(transcript, "", time_now_iso8601())) + await self.push_frame(InterimTranscriptionFrame(transcript, "", time_now_iso8601())) diff --git a/src/pipecat/services/lmnt.py b/src/pipecat/services/lmnt.py index 638e394a1..60f0cb7df 100644 --- a/src/pipecat/services/lmnt.py +++ b/src/pipecat/services/lmnt.py @@ -46,7 +46,7 @@ def __init__( **kwargs): # Let TTSService produce TTSStoppedFrames after a short delay of # no activity. - super().__init__(push_stop_frames=True, sample_rate=sample_rate, **kwargs) + super().__init__(sync=False, push_stop_frames=True, sample_rate=sample_rate, **kwargs) self._api_key = api_key self._voice_id = voice_id diff --git a/src/pipecat/transports/base_input.py b/src/pipecat/transports/base_input.py index 3d1d0c4d7..8836fbd1e 100644 --- a/src/pipecat/transports/base_input.py +++ b/src/pipecat/transports/base_input.py @@ -31,16 +31,12 @@ class BaseInputTransport(FrameProcessor): def __init__(self, params: TransportParams, **kwargs): - super().__init__(**kwargs) + super().__init__(sync=False, **kwargs) self._params = params self._executor = ThreadPoolExecutor(max_workers=5) - # Create push frame task. This is the task that will push frames in - # order. We also guarantee that all frames are pushed in the same task. - self._create_push_task() - async def start(self, frame: StartFrame): # Create audio input queue and task if needed. if self._params.audio_in_enabled or self._params.vad_enabled: @@ -53,10 +49,6 @@ async def stop(self, frame: EndFrame): self._audio_task.cancel() await self._audio_task - # Wait for the push frame task to finish. It will finish when the - # EndFrame is actually processed. - await self._push_frame_task - async def cancel(self, frame: CancelFrame): # Cancel all the tasks and wait for them to finish. @@ -64,9 +56,6 @@ async def cancel(self, frame: CancelFrame): self._audio_task.cancel() await self._audio_task - self._push_frame_task.cancel() - await self._push_frame_task - def vad_analyzer(self) -> VADAnalyzer | None: return self._params.vad_analyzer @@ -86,11 +75,8 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): await self.cancel(frame) await self.push_frame(frame, direction) elif isinstance(frame, BotInterruptionFrame): - await self._handle_interruptions(frame, False) - elif isinstance(frame, StartInterruptionFrame): + logger.debug("Bot interruption") await self._start_interruption() - elif isinstance(frame, StopInterruptionFrame): - await self._stop_interruption() # All other system frames elif isinstance(frame, SystemFrame): await self.push_frame(frame, direction) @@ -98,12 +84,12 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): elif isinstance(frame, StartFrame): # Push StartFrame before start(), because we want StartFrame to be # processed by every processor before any other frame is processed. - await self._internal_push_frame(frame, direction) + await self.push_frame(frame, direction) await self.start(frame) elif isinstance(frame, EndFrame): # Push EndFrame before stop(), because stop() waits on the task to # finish and the task finishes when EndFrame is processed. - await self._internal_push_frame(frame, direction) + await self.push_frame(frame, direction) await self.stop(frame) elif isinstance(frame, VADParamsUpdateFrame): vad_analyzer = self.vad_analyzer() @@ -111,73 +97,28 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): vad_analyzer.set_params(frame.params) # Other frames else: - await self._internal_push_frame(frame, direction) - - # - # Push frames task - # - - def _create_push_task(self): - loop = self.get_event_loop() - self._push_queue = asyncio.Queue() - self._push_frame_task = loop.create_task(self._push_frame_task_handler()) - - async def _internal_push_frame( - self, - frame: Frame | None, - direction: FrameDirection | None = FrameDirection.DOWNSTREAM): - await self._push_queue.put((frame, direction)) - - async def _push_frame_task_handler(self): - running = True - while running: - try: - (frame, direction) = await self._push_queue.get() - await self.push_frame(frame, direction) - running = not isinstance(frame, EndFrame) - self._push_queue.task_done() - except asyncio.CancelledError: - break + await self.push_frame(frame, direction) # # Handle interruptions # - async def _start_interruption(self): - if not self.interruptions_allowed: - return - - # Cancel the task. This will stop pushing frames downstream. - self._push_frame_task.cancel() - await self._push_frame_task - # Push an out-of-band frame (i.e. not using the ordered push - # frame task) to stop everything, specially at the output - # transport. - await self.push_frame(StartInterruptionFrame()) - # Create a new queue and task. - self._create_push_task() - - async def _stop_interruption(self): - if not self.interruptions_allowed: - return - - await self.push_frame(StopInterruptionFrame()) - - async def _handle_interruptions(self, frame: Frame, push_frame: bool): + async def _handle_interruptions(self, frame: Frame): if self.interruptions_allowed: - # Make sure we notify about interruptions quickly out-of-band - if isinstance(frame, BotInterruptionFrame): - logger.debug("Bot interruption") - await self._start_interruption() - elif isinstance(frame, UserStartedSpeakingFrame): + # Make sure we notify about interruptions quickly out-of-band. + if isinstance(frame, UserStartedSpeakingFrame): logger.debug("User started speaking") await self._start_interruption() + # Push an out-of-band frame (i.e. not using the ordered push + # frame task) to stop everything, specially at the output + # transport. + await self.push_frame(StartInterruptionFrame()) elif isinstance(frame, UserStoppedSpeakingFrame): logger.debug("User stopped speaking") await self._stop_interruption() + await self.push_frame(StopInterruptionFrame()) - if push_frame: - await self._internal_push_frame(frame) + await self.push_frame(frame) # # Audio input @@ -201,7 +142,7 @@ async def _handle_vad(self, audio_frames: bytes, vad_state: VADState): frame = UserStoppedSpeakingFrame() if frame: - await self._handle_interruptions(frame, True) + await self._handle_interruptions(frame) vad_state = new_vad_state return vad_state @@ -222,7 +163,7 @@ async def _audio_task_handler(self): # Push audio downstream if passthrough. if audio_passthrough: - await self._internal_push_frame(frame) + await self.push_frame(frame) self._audio_in_queue.task_done() except asyncio.CancelledError: diff --git a/src/pipecat/transports/base_output.py b/src/pipecat/transports/base_output.py index a24c9f4d2..bc683721a 100644 --- a/src/pipecat/transports/base_output.py +++ b/src/pipecat/transports/base_output.py @@ -43,7 +43,7 @@ class BaseOutputTransport(FrameProcessor): def __init__(self, params: TransportParams, **kwargs): - super().__init__(**kwargs) + super().__init__(sync=False, **kwargs) self._params = params @@ -70,10 +70,6 @@ def __init__(self, params: TransportParams, **kwargs): # generating frames upstream while, for example, the audio is playing. self._create_sink_tasks() - # Create push frame task. This is the task that will push frames in - # order. We also guarantee that all frames are pushed in the same task. - self._create_push_task() - async def start(self, frame: StartFrame): # Create camera output queue and task if needed. if self._params.camera_out_enabled: @@ -95,9 +91,8 @@ async def stop(self, frame: EndFrame): self._audio_out_task.cancel() await self._audio_out_task - # Wait for the push frame and sink tasks to finish. They will finish when - # the EndFrame is actually processed. - await self._push_frame_task + # Wait for the sink task to finish. They will finish when the EndFrame + # is actually processed. await self._sink_task async def cancel(self, frame: CancelFrame): @@ -107,9 +102,6 @@ async def cancel(self, frame: CancelFrame): self._camera_out_task.cancel() await self._camera_out_task - self._push_frame_task.cancel() - await self._push_frame_task - self._sink_task.cancel() await self._sink_task @@ -182,10 +174,6 @@ async def _handle_interruptions(self, frame: Frame): await self._sink_clock_task # Create sink tasks. self._create_sink_tasks() - # Stop push task. - self._push_frame_task.cancel() - await self._push_frame_task - self._create_push_task() # Let's send a bot stopped speaking if we have to. if self._bot_speaking: await self._bot_stopped_speaking() @@ -227,7 +215,7 @@ def _create_sink_tasks(self): async def _sink_frame_handler(self, frame: Frame): if isinstance(frame, AudioRawFrame): await self.write_raw_audio_frames(frame.audio) - await self._internal_push_frame(frame) + await self.push_frame(frame) await self.push_frame(BotSpeakingFrame(), FrameDirection.UPSTREAM) elif isinstance(frame, ImageRawFrame): await self._set_camera_image(frame) @@ -237,12 +225,12 @@ async def _sink_frame_handler(self, frame: Frame): await self.send_message(frame) elif isinstance(frame, TTSStartedFrame): await self._bot_started_speaking() - await self._internal_push_frame(frame) + await self.push_frame(frame) elif isinstance(frame, TTSStoppedFrame): await self._bot_stopped_speaking() - await self._internal_push_frame(frame) + await self.push_frame(frame) else: - await self._internal_push_frame(frame) + await self.push_frame(frame) async def _sink_task_handler(self): running = True @@ -261,7 +249,7 @@ async def _sink_clock_frame_handler(self, frame: Frame): # TODO(aleix): For now we just process TextFrame. But we should process # audio and video as well. if isinstance(frame, TextFrame): - await self._internal_push_frame(frame) + await self.push_frame(frame) async def _sink_clock_task_handler(self): running = True @@ -293,38 +281,12 @@ async def _sink_clock_task_handler(self): async def _bot_started_speaking(self): logger.debug("Bot started speaking") self._bot_speaking = True - await self._internal_push_frame(BotStartedSpeakingFrame(), FrameDirection.UPSTREAM) + await self.push_frame(BotStartedSpeakingFrame(), FrameDirection.UPSTREAM) async def _bot_stopped_speaking(self): logger.debug("Bot stopped speaking") self._bot_speaking = False - await self._internal_push_frame(BotStoppedSpeakingFrame(), FrameDirection.UPSTREAM) - - # - # Push frames task - # - - def _create_push_task(self): - loop = self.get_event_loop() - self._push_queue = asyncio.Queue() - self._push_frame_task = loop.create_task(self._push_frame_task_handler()) - - async def _internal_push_frame( - self, - frame: Frame | None, - direction: FrameDirection | None = FrameDirection.DOWNSTREAM): - await self._push_queue.put((frame, direction)) - - async def _push_frame_task_handler(self): - running = True - while running: - try: - (frame, direction) = await self._push_queue.get() - await self.push_frame(frame, direction) - running = not isinstance(frame, EndFrame) - self._push_queue.task_done() - except asyncio.CancelledError: - break + await self.push_frame(BotStoppedSpeakingFrame(), FrameDirection.UPSTREAM) # # Camera out @@ -408,7 +370,7 @@ async def _audio_out_task_handler(self): try: frame = await self._audio_out_queue.get() await self.write_raw_audio_frames(frame.audio) - await self._internal_push_frame(frame) + await self.push_frame(frame) await self.push_frame(BotSpeakingFrame(), FrameDirection.UPSTREAM) except asyncio.CancelledError: break diff --git a/src/pipecat/transports/network/websocket_server.py b/src/pipecat/transports/network/websocket_server.py index 08ca9719a..819950d72 100644 --- a/src/pipecat/transports/network/websocket_server.py +++ b/src/pipecat/transports/network/websocket_server.py @@ -98,9 +98,9 @@ async def _client_handler(self, websocket: websockets.WebSocketServerProtocol, p continue if isinstance(frame, AudioRawFrame): - await self.push_audio_frame(frame) + await self.queue_audio_frame(frame) else: - await self._internal_push_frame(frame) + await self.push_frame(frame) # Notify disconnection await self._callbacks.on_client_disconnected(websocket) diff --git a/src/pipecat/transports/services/daily.py b/src/pipecat/transports/services/daily.py index 7cf330b9e..210cc7341 100644 --- a/src/pipecat/transports/services/daily.py +++ b/src/pipecat/transports/services/daily.py @@ -625,11 +625,11 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): # async def push_transcription_frame(self, frame: TranscriptionFrame | InterimTranscriptionFrame): - await self._internal_push_frame(frame) + await self.push_frame(frame) async def push_app_message(self, message: Any, sender: str): frame = DailyTransportMessageFrame(message=message, participant_id=sender) - await self._internal_push_frame(frame) + await self.push_frame(frame) # # Audio in @@ -692,7 +692,7 @@ async def _on_participant_video_frame(self, participant_id: str, buffer, size, f image=buffer, size=size, format=format) - await self._internal_push_frame(frame) + await self.push_frame(frame) self._video_renderers[participant_id]["timestamp"] = curr_time From 0ed3d118d6cfbafe513f5f5dba5441e8ef245615 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Sun, 1 Sep 2024 15:10:08 -0700 Subject: [PATCH 02/11] services(moondream); update revision to 2024-08-26 --- CHANGELOG.md | 2 ++ src/pipecat/services/moondream.py | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a757c7ffd..f2618edab 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -40,6 +40,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed +- Updated `MoondreamService` revision to `2024-08-26`. + - `CartesiaTTSService` and `ElevenLabsTTSService` now add presentation timestamps to their text output. This allows the output transport to push the text frames downstream at almost the same time the words are spoken. We say diff --git a/src/pipecat/services/moondream.py b/src/pipecat/services/moondream.py index cff8d3172..10ac3353e 100644 --- a/src/pipecat/services/moondream.py +++ b/src/pipecat/services/moondream.py @@ -48,7 +48,7 @@ def __init__( self, *, model="vikhyatk/moondream2", - revision="2024-04-02", + revision="2024-08-26", use_cpu=False ): super().__init__() From 23d6eed5ea92dfa3bed2ecb58f21b1a10281a9c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Wed, 18 Sep 2024 22:34:24 -0700 Subject: [PATCH 03/11] transports: input()/output() return subclass instead of base class --- src/pipecat/transports/local/tk.py | 4 ++-- src/pipecat/transports/network/fastapi_websocket.py | 4 ++-- src/pipecat/transports/network/websocket_server.py | 4 ++-- src/pipecat/transports/services/daily.py | 4 ++-- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/pipecat/transports/local/tk.py b/src/pipecat/transports/local/tk.py index 588e7d2ae..e7dc04902 100644 --- a/src/pipecat/transports/local/tk.py +++ b/src/pipecat/transports/local/tk.py @@ -141,12 +141,12 @@ def __init__(self, tk_root: tk.Tk, params: TransportParams): # BaseTransport # - def input(self) -> FrameProcessor: + def input(self) -> TkInputTransport: if not self._input: self._input = TkInputTransport(self._pyaudio, self._params) return self._input - def output(self) -> FrameProcessor: + def output(self) -> TkOutputTransport: if not self._output: self._output = TkOutputTransport(self._tk_root, self._pyaudio, self._params) return self._output diff --git a/src/pipecat/transports/network/fastapi_websocket.py b/src/pipecat/transports/network/fastapi_websocket.py index 2c4bd187b..7169c73bd 100644 --- a/src/pipecat/transports/network/fastapi_websocket.py +++ b/src/pipecat/transports/network/fastapi_websocket.py @@ -164,10 +164,10 @@ def __init__( self._register_event_handler("on_client_connected") self._register_event_handler("on_client_disconnected") - def input(self) -> FrameProcessor: + def input(self) -> FastAPIWebsocketInputTransport: return self._input - def output(self) -> FrameProcessor: + def output(self) -> FastAPIWebsocketOutputTransport: return self._output async def _on_client_connected(self, websocket): diff --git a/src/pipecat/transports/network/websocket_server.py b/src/pipecat/transports/network/websocket_server.py index 819950d72..c17818898 100644 --- a/src/pipecat/transports/network/websocket_server.py +++ b/src/pipecat/transports/network/websocket_server.py @@ -190,13 +190,13 @@ def __init__( self._register_event_handler("on_client_connected") self._register_event_handler("on_client_disconnected") - def input(self) -> FrameProcessor: + def input(self) -> WebsocketServerInputTransport: if not self._input: self._input = WebsocketServerInputTransport( self._host, self._port, self._params, self._callbacks, name=self._input_name) return self._input - def output(self) -> FrameProcessor: + def output(self) -> WebsocketServerOutputTransport: if not self._output: self._output = WebsocketServerOutputTransport(self._params, name=self._output_name) return self._output diff --git a/src/pipecat/transports/services/daily.py b/src/pipecat/transports/services/daily.py index 210cc7341..2a45adf36 100644 --- a/src/pipecat/transports/services/daily.py +++ b/src/pipecat/transports/services/daily.py @@ -811,12 +811,12 @@ def __init__( # BaseTransport # - def input(self) -> FrameProcessor: + def input(self) -> DailyInputTransport: if not self._input: self._input = DailyInputTransport(self._client, self._params, name=self._input_name) return self._input - def output(self) -> FrameProcessor: + def output(self) -> DailyOutputTransport: if not self._output: self._output = DailyOutputTransport(self._client, self._params, name=self._output_name) return self._output From f078d156de65f674effa7df8418c168c00a288cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Wed, 18 Sep 2024 22:36:59 -0700 Subject: [PATCH 04/11] frames: StartFrame is now a SystemFrame --- CHANGELOG.md | 4 ++++ src/pipecat/frames/frames.py | 20 +++++++++---------- src/pipecat/processors/frameworks/rtvi.py | 12 +++++------ .../processors/gstreamer/pipeline_source.py | 12 +++++------ src/pipecat/transports/base_input.py | 12 +++++------ src/pipecat/transports/base_output.py | 10 ++++++---- 6 files changed, 38 insertions(+), 32 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f2618edab..e1c6bbadc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -40,6 +40,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed +- `StartFrame` is back a system frame so we make sure it's processed immediately + by all processors. `EndFrame` stays a control frame since it needs to be + ordered allowing the frames in the pipeline to be processed. + - Updated `MoondreamService` revision to `2024-08-26`. - `CartesiaTTSService` and `ElevenLabsTTSService` now add presentation diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py index 51770dff1..a400d68d9 100644 --- a/src/pipecat/frames/frames.py +++ b/src/pipecat/frames/frames.py @@ -248,6 +248,16 @@ class SystemFrame(Frame): pass +@dataclass +class StartFrame(SystemFrame): + """This is the first frame that should be pushed down a pipeline.""" + clock: BaseClock + allow_interruptions: bool = False + enable_metrics: bool = False + enable_usage_metrics: bool = False + report_only_initial_ttfb: bool = False + + @dataclass class CancelFrame(SystemFrame): """Indicates that a pipeline needs to stop right away.""" @@ -338,16 +348,6 @@ class ControlFrame(Frame): pass -@dataclass -class StartFrame(ControlFrame): - """This is the first frame that should be pushed down a pipeline.""" - clock: BaseClock - allow_interruptions: bool = False - enable_metrics: bool = False - enable_usage_metrics: bool = False - report_only_initial_ttfb: bool = False - - @dataclass class EndFrame(ControlFrame): """Indicates that a pipeline has ended and frame processors and pipelines diff --git a/src/pipecat/processors/frameworks/rtvi.py b/src/pipecat/processors/frameworks/rtvi.py index d32f0f640..66adb9ad0 100644 --- a/src/pipecat/processors/frameworks/rtvi.py +++ b/src/pipecat/processors/frameworks/rtvi.py @@ -337,7 +337,12 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) # Specific system frames - if isinstance(frame, CancelFrame): + if isinstance(frame, StartFrame): + # Push StartFrame before start(), because we want StartFrame to be + # processed by every processor before any other frame is processed. + await self.push_frame(frame, direction) + await self._start(frame) + elif isinstance(frame, CancelFrame): await self._cancel(frame) await self.push_frame(frame, direction) elif isinstance(frame, ErrorFrame): @@ -347,11 +352,6 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): elif isinstance(frame, SystemFrame): await self.push_frame(frame, direction) # Control frames - elif isinstance(frame, StartFrame): - # Push StartFrame before start(), because we want StartFrame to be - # processed by every processor before any other frame is processed. - await self.push_frame(frame, direction) - await self._start(frame) elif isinstance(frame, EndFrame): # Push EndFrame before stop(), because stop() waits on the task to # finish and the task finishes when EndFrame is processed. diff --git a/src/pipecat/processors/gstreamer/pipeline_source.py b/src/pipecat/processors/gstreamer/pipeline_source.py index 5f0ee089b..8d46105a7 100644 --- a/src/pipecat/processors/gstreamer/pipeline_source.py +++ b/src/pipecat/processors/gstreamer/pipeline_source.py @@ -66,18 +66,18 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) # Specific system frames - if isinstance(frame, CancelFrame): + if isinstance(frame, StartFrame): + # Push StartFrame before start(), because we want StartFrame to be + # processed by every processor before any other frame is processed. + await self.push_frame(frame, direction) + await self._start(frame) + elif isinstance(frame, CancelFrame): await self._cancel(frame) await self.push_frame(frame, direction) # All other system frames elif isinstance(frame, SystemFrame): await self.push_frame(frame, direction) # Control frames - elif isinstance(frame, StartFrame): - # Push StartFrame before start(), because we want StartFrame to be - # processed by every processor before any other frame is processed. - await self.push_frame(frame, direction) - await self._start(frame) elif isinstance(frame, EndFrame): # Push EndFrame before stop(), because stop() waits on the task to # finish and the task finishes when EndFrame is processed. diff --git a/src/pipecat/transports/base_input.py b/src/pipecat/transports/base_input.py index 8836fbd1e..de1ec8884 100644 --- a/src/pipecat/transports/base_input.py +++ b/src/pipecat/transports/base_input.py @@ -71,7 +71,12 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) # Specific system frames - if isinstance(frame, CancelFrame): + if isinstance(frame, StartFrame): + # Push StartFrame before start(), because we want StartFrame to be + # processed by every processor before any other frame is processed. + await self.push_frame(frame, direction) + await self.start(frame) + elif isinstance(frame, CancelFrame): await self.cancel(frame) await self.push_frame(frame, direction) elif isinstance(frame, BotInterruptionFrame): @@ -81,11 +86,6 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): elif isinstance(frame, SystemFrame): await self.push_frame(frame, direction) # Control frames - elif isinstance(frame, StartFrame): - # Push StartFrame before start(), because we want StartFrame to be - # processed by every processor before any other frame is processed. - await self.push_frame(frame, direction) - await self.start(frame) elif isinstance(frame, EndFrame): # Push EndFrame before stop(), because stop() waits on the task to # finish and the task finishes when EndFrame is processed. diff --git a/src/pipecat/transports/base_output.py b/src/pipecat/transports/base_output.py index bc683721a..461f0567d 100644 --- a/src/pipecat/transports/base_output.py +++ b/src/pipecat/transports/base_output.py @@ -129,7 +129,12 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): # immediately. Other frames require order so they are put in the sink # queue. # - if isinstance(frame, CancelFrame): + if isinstance(frame, StartFrame): + # Push StartFrame before start(), because we want StartFrame to be + # processed by every processor before any other frame is processed. + await self.push_frame(frame, direction) + await self.start(frame) + elif isinstance(frame, CancelFrame): await self.cancel(frame) await self.push_frame(frame, direction) elif isinstance(frame, StartInterruptionFrame) or isinstance(frame, StopInterruptionFrame): @@ -141,9 +146,6 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): elif isinstance(frame, SystemFrame): await self.push_frame(frame, direction) # Control frames. - elif isinstance(frame, StartFrame): - await self._sink_queue.put(frame) - await self.start(frame) elif isinstance(frame, EndFrame): await self._sink_clock_queue.put((sys.maxsize, frame.id, frame)) await self._sink_queue.put(frame) From fbf6eef68ff5c723d7a24a4e57f014a253f52af4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Wed, 18 Sep 2024 22:40:52 -0700 Subject: [PATCH 05/11] transports(base_output): wait for sink tasks before canceling audio/video tasks --- CHANGELOG.md | 4 ++++ src/pipecat/transports/base_output.py | 31 ++++++++++++++++++--------- 2 files changed, 25 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e1c6bbadc..2d9fcadaf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -66,6 +66,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed +- Fixed a `BaseOutputTransport` issue that would stop audio and video rendering + tasks (after receiving and `EndFrame`) before the internal queue was emptied, + causing the pipeline to finish prematurely. + - `StartFrame` should be the first frame every processor receives to avoid situations where things are not initialized (because initialization happens on `StartFrame`) and other frames come in resulting in undesired behavior. diff --git a/src/pipecat/transports/base_output.py b/src/pipecat/transports/base_output.py index 461f0567d..9b1b9c29e 100644 --- a/src/pipecat/transports/base_output.py +++ b/src/pipecat/transports/base_output.py @@ -81,6 +81,13 @@ async def start(self, frame: StartFrame): self._audio_out_task = self.get_event_loop().create_task(self._audio_out_task_handler()) async def stop(self, frame: EndFrame): + # At this point we have enqueued an EndFrame and we need to wait for + # that EndFrame to be processed by the sink tasks. We also need to wait + # for these tasks before cancelling the camera and audio tasks below + # because they might be still rendering. + await self._sink_task + await self._sink_clock_task + # Cancel and wait for the camera output task to finish. if self._params.camera_out_enabled: self._camera_out_task.cancel() @@ -91,19 +98,23 @@ async def stop(self, frame: EndFrame): self._audio_out_task.cancel() await self._audio_out_task - # Wait for the sink task to finish. They will finish when the EndFrame - # is actually processed. - await self._sink_task - async def cancel(self, frame: CancelFrame): - # Cancel all the tasks and wait for them to finish. + # Since we are cancelling everything it doesn't matter if we cancel sink + # tasks first or not. + self._sink_task.cancel() + self._sink_clock_task.cancel() + await self._sink_task + await self._sink_clock_task + # Cancel and wait for the camera output task to finish. if self._params.camera_out_enabled: self._camera_out_task.cancel() await self._camera_out_task - self._sink_task.cancel() - await self._sink_task + # Cancel and wait for the audio output task to finish. + if self._params.audio_out_enabled and self._params.audio_out_is_live: + self._audio_out_task.cancel() + await self._audio_out_task async def send_message(self, frame: TransportMessageFrame): pass @@ -259,7 +270,7 @@ async def _sink_clock_task_handler(self): try: timestamp, _, frame = await self._sink_clock_queue.get() - # If we hit an EndFrame, we cna finish right away. + # If we hit an EndFrame, we can finish right away. running = not isinstance(frame, EndFrame) # If we have a frame we check it's presentation timestamp. If it @@ -327,9 +338,9 @@ async def _camera_out_task_handler(self): elif self._camera_images: image = next(self._camera_images) await self._draw_image(image) - await asyncio.sleep(1.0 / self._params.camera_out_framerate) + await asyncio.sleep(self._camera_out_frame_duration) else: - await asyncio.sleep(1.0 / self._params.camera_out_framerate) + await asyncio.sleep(self._camera_out_frame_duration) except asyncio.CancelledError: break except Exception as e: From 8224538372d192d677313e3229a115ac86b56fc1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Wed, 18 Sep 2024 22:46:13 -0700 Subject: [PATCH 06/11] services(cartesia): added CartesiaHttpTTSService --- CHANGELOG.md | 4 ++ pyproject.toml | 2 +- src/pipecat/services/cartesia.py | 88 ++++++++++++++++++++++++++++++-- 3 files changed, 89 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2d9fcadaf..752666c07 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Added `CartesiaHttpTTSService`. This is a synchronous frame processor + (i.e. given an input text frame it will wait for the whole output before + returning). + - A clock can now be specified to `PipelineTask` (defaults to `SystemClock`). This clock will be passed to each frame processor via the `StartFrame`. diff --git a/pyproject.toml b/pyproject.toml index 8a1e3a800..170ecd326 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,7 +36,7 @@ Website = "https://pipecat.ai" [project.optional-dependencies] anthropic = [ "anthropic~=0.34.0" ] azure = [ "azure-cognitiveservices-speech~=1.40.0" ] -cartesia = [ "websockets~=12.0" ] +cartesia = [ "cartesia~=1.0.13", "websockets~=12.0" ] daily = [ "daily-python~=0.10.1" ] deepgram = [ "deepgram-sdk~=3.5.0" ] elevenlabs = [ "websockets~=12.0" ] diff --git a/src/pipecat/services/cartesia.py b/src/pipecat/services/cartesia.py index ea790fab7..078926235 100644 --- a/src/pipecat/services/cartesia.py +++ b/src/pipecat/services/cartesia.py @@ -8,7 +8,6 @@ import uuid import base64 import asyncio -import time from typing import AsyncGenerator @@ -22,17 +21,17 @@ EndFrame, TTSStartedFrame, TTSStoppedFrame, - TextFrame, LLMFullResponseEndFrame ) from pipecat.processors.frame_processor import FrameDirection from pipecat.transcriptions.language import Language -from pipecat.services.ai_services import AsyncWordTTSService +from pipecat.services.ai_services import AsyncWordTTSService, TTSService from loguru import logger # See .env.example for Cartesia configuration needed try: + from cartesia import AsyncCartesia import websockets except ModuleNotFoundError as e: logger.error(f"Exception: {e}") @@ -165,7 +164,7 @@ async def _handle_interruption(self, frame: StartInterruptionFrame, direction: F async def flush_audio(self): if not self._context_id or not self._websocket: return - logger.debug("Flushing audio") + logger.trace("Flushing audio") msg = { "transcript": "", "continue": False, @@ -257,3 +256,84 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: yield None except Exception as e: logger.error(f"{self} exception: {e}") + + +class CartesiaHttpTTSService(TTSService): + + def __init__( + self, + *, + api_key: str, + voice_id: str, + model_id: str = "sonic-english", + base_url: str = "https://api.cartesia.ai", + encoding: str = "pcm_s16le", + sample_rate: int = 16000, + language: str = "en", + **kwargs): + super().__init__(**kwargs) + + self._api_key = api_key + self._voice_id = voice_id + self._model_id = model_id + self._output_format = { + "container": "raw", + "encoding": encoding, + "sample_rate": sample_rate, + } + self._language = language + + self._client = AsyncCartesia(api_key=api_key, base_url=base_url) + + def can_generate_metrics(self) -> bool: + return True + + async def set_model(self, model: str): + logger.debug(f"Switching TTS model to: [{model}]") + self._model_id = model + + async def set_voice(self, voice: str): + logger.debug(f"Switching TTS voice to: [{voice}]") + self._voice_id = voice + + async def set_language(self, language: Language): + logger.debug(f"Switching TTS language to: [{language}]") + self._language = language_to_cartesia_language(language) + + async def stop(self, frame: EndFrame): + await super().stop(frame) + await self._client.close() + + async def cancel(self, frame: CancelFrame): + await super().cancel(frame) + await self._client.close() + + async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: + logger.debug(f"Generating TTS: [{text}]") + + await self.push_frame(TTSStartedFrame()) + await self.start_ttfb_metrics() + + try: + output = await self._client.tts.sse( + model_id=self._model_id, + transcript=text, + voice_id=self._voice_id, + output_format=self._output_format, + language=self._language, + stream=False + ) + + await self.stop_ttfb_metrics() + + frame = AudioRawFrame( + audio=output["audio"], + sample_rate=self._output_format["sample_rate"], + num_channels=1 + ) + yield frame + except Exception as e: + logger.error(f"{self} exception: {e}") + + await self.start_tts_usage_metrics(text) + await self.push_frame(TTSStoppedFrame()) From 0e8f56c7525a7abc13949b4bb35dea3bd8a88d04 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Wed, 18 Sep 2024 22:47:17 -0700 Subject: [PATCH 07/11] services: move TTSService push_stop_frames to AsyncTTSService --- src/pipecat/services/ai_services.py | 42 ++++++++++++++++------------- 1 file changed, 23 insertions(+), 19 deletions(-) diff --git a/src/pipecat/services/ai_services.py b/src/pipecat/services/ai_services.py index d7226dba3..b63188512 100644 --- a/src/pipecat/services/ai_services.py +++ b/src/pipecat/services/ai_services.py @@ -140,21 +140,15 @@ def __init__( self, *, aggregate_sentences: bool = True, - # if True, subclass is responsible for pushing TextFrames and LLMFullResponseEndFrames + # if True, TTSService will push TextFrames and LLMFullResponseEndFrames, + # otherwise subclass must do it push_text_frames: bool = True, - # if True, TTSService will push TTSStoppedFrames, otherwise subclass must do it - push_stop_frames: bool = False, - # if push_stop_frames is True, wait for this idle period before pushing TTSStoppedFrame - stop_frame_timeout_s: float = 1.0, + # TTS output sample rate sample_rate: int = 16000, **kwargs): super().__init__(**kwargs) self._aggregate_sentences: bool = aggregate_sentences self._push_text_frames: bool = push_text_frames - self._push_stop_frames: bool = push_stop_frames - self._stop_frame_timeout_s: float = stop_frame_timeout_s - self._stop_frame_task: Optional[asyncio.Task] = None - self._stop_frame_queue: asyncio.Queue = asyncio.Queue() self._current_sentence: str = "" self._sample_rate: int = sample_rate @@ -199,7 +193,7 @@ async def _process_text_frame(self, frame: TextFrame): if text: await self._push_tts_frames(text) - async def _push_tts_frames(self, text: str, text_passthrough: bool = True): + async def _push_tts_frames(self, text: str): text = text.strip() if not text: return @@ -239,6 +233,25 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): else: await self.push_frame(frame, direction) + +class AsyncTTSService(TTSService): + def __init__( + self, + # if True, TTSService will push TTSStoppedFrames, otherwise subclass must do it + push_stop_frames: bool = False, + # if push_stop_frames is True, wait for this idle period before pushing TTSStoppedFrame + stop_frame_timeout_s: float = 1.0, + **kwargs): + super().__init__(sync=False, **kwargs) + self._push_stop_frames: bool = push_stop_frames + self._stop_frame_timeout_s: float = stop_frame_timeout_s + self._stop_frame_task: Optional[asyncio.Task] = None + self._stop_frame_queue: asyncio.Queue = asyncio.Queue() + + @abstractmethod + async def flush_audio(self): + pass + async def start(self, frame: StartFrame): await super().start(frame) if self._push_stop_frames: @@ -287,15 +300,6 @@ async def _stop_frame_handler(self): pass -class AsyncTTSService(TTSService): - def __init__(self, **kwargs): - super().__init__(**kwargs) - - @abstractmethod - async def flush_audio(self): - pass - - class AsyncWordTTSService(AsyncTTSService): def __init__(self, **kwargs): super().__init__(**kwargs) From 3298f935ef5de51a9b701706535340577f7f5eec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Wed, 18 Sep 2024 22:48:06 -0700 Subject: [PATCH 08/11] services(fal,moondream): add missing **kwargs --- src/pipecat/services/fal.py | 3 ++- src/pipecat/services/moondream.py | 7 ++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/pipecat/services/fal.py b/src/pipecat/services/fal.py index 4d99f6066..672135d02 100644 --- a/src/pipecat/services/fal.py +++ b/src/pipecat/services/fal.py @@ -43,8 +43,9 @@ def __init__( aiohttp_session: aiohttp.ClientSession, model: str = "fal-ai/fast-sdxl", key: str | None = None, + **kwargs ): - super().__init__() + super().__init__(**kwargs) self._model = model self._params = params self._aiohttp_session = aiohttp_session diff --git a/src/pipecat/services/moondream.py b/src/pipecat/services/moondream.py index 10ac3353e..3441aeeb9 100644 --- a/src/pipecat/services/moondream.py +++ b/src/pipecat/services/moondream.py @@ -46,12 +46,13 @@ def detect_device(): class MoondreamService(VisionService): def __init__( self, - *, + *, model="vikhyatk/moondream2", revision="2024-08-26", - use_cpu=False + use_cpu=False, + **kwargs ): - super().__init__() + super().__init__(**kwargs) if not use_cpu: device, dtype = detect_device() From 62e9a33a70ef504d557e19ad3cf88ca07ec84fde Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Wed, 18 Sep 2024 22:51:00 -0700 Subject: [PATCH 09/11] examples: use CartesiaHttpTTSService to synchronize frames --- examples/foundational/01-say-one-thing.py | 8 ++++---- examples/foundational/02-llm-say-one-thing.py | 8 ++++---- examples/foundational/06-listen-and-respond.py | 5 ----- examples/foundational/06a-image-sync.py | 8 ++++---- examples/foundational/11-sound-effects.py | 8 ++++---- 5 files changed, 16 insertions(+), 21 deletions(-) diff --git a/examples/foundational/01-say-one-thing.py b/examples/foundational/01-say-one-thing.py index 8102518f7..fce774822 100644 --- a/examples/foundational/01-say-one-thing.py +++ b/examples/foundational/01-say-one-thing.py @@ -9,11 +9,11 @@ import os import sys -from pipecat.frames.frames import TextFrame +from pipecat.frames.frames import EndFrame, TextFrame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.task import PipelineTask from pipecat.pipeline.runner import PipelineRunner -from pipecat.services.cartesia import CartesiaTTSService +from pipecat.services.cartesia import CartesiaHttpTTSService from pipecat.transports.services.daily import DailyParams, DailyTransport from runner import configure @@ -34,7 +34,7 @@ async def main(): transport = DailyTransport( room_url, None, "Say One Thing", DailyParams(audio_out_enabled=True)) - tts = CartesiaTTSService( + tts = CartesiaHttpTTSService( api_key=os.getenv("CARTESIA_API_KEY"), voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady ) @@ -48,7 +48,7 @@ async def main(): @transport.event_handler("on_participant_joined") async def on_new_participant_joined(transport, participant): participant_name = participant["info"]["userName"] or '' - await task.queue_frame(TextFrame(f"Hello there, {participant_name}!")) + await task.queue_frames([TextFrame(f"Hello there, {participant_name}!"), EndFrame()]) await runner.run(task) diff --git a/examples/foundational/02-llm-say-one-thing.py b/examples/foundational/02-llm-say-one-thing.py index e53aee9ae..00a1e9e51 100644 --- a/examples/foundational/02-llm-say-one-thing.py +++ b/examples/foundational/02-llm-say-one-thing.py @@ -9,11 +9,11 @@ import os import sys -from pipecat.frames.frames import LLMMessagesFrame +from pipecat.frames.frames import EndFrame, LLMMessagesFrame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineTask -from pipecat.services.cartesia import CartesiaTTSService +from pipecat.services.cartesia import CartesiaHttpTTSService from pipecat.services.openai import OpenAILLMService from pipecat.transports.services.daily import DailyParams, DailyTransport @@ -38,7 +38,7 @@ async def main(): "Say One Thing From an LLM", DailyParams(audio_out_enabled=True)) - tts = CartesiaTTSService( + tts = CartesiaHttpTTSService( api_key=os.getenv("CARTESIA_API_KEY"), voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady ) @@ -59,7 +59,7 @@ async def main(): @transport.event_handler("on_first_participant_joined") async def on_first_participant_joined(transport, participant): - await task.queue_frame(LLMMessagesFrame(messages)) + await task.queue_frames([LLMMessagesFrame(messages), EndFrame()]) await runner.run(task) diff --git a/examples/foundational/06-listen-and-respond.py b/examples/foundational/06-listen-and-respond.py index 35965185f..e99a95068 100644 --- a/examples/foundational/06-listen-and-respond.py +++ b/examples/foundational/06-listen-and-respond.py @@ -90,11 +90,6 @@ async def main(): ]) task = PipelineTask(pipeline) - task = PipelineTask(pipeline, PipelineParams( - allow_interruptions=True, - enable_metrics=True, - report_only_initial_ttfb=False, - )) @transport.event_handler("on_first_participant_joined") async def on_first_participant_joined(transport, participant): diff --git a/examples/foundational/06a-image-sync.py b/examples/foundational/06a-image-sync.py index 812dab137..6b3e58cf7 100644 --- a/examples/foundational/06a-image-sync.py +++ b/examples/foundational/06a-image-sync.py @@ -20,8 +20,8 @@ LLMUserResponseAggregator, ) from pipecat.processors.frame_processor import FrameDirection, FrameProcessor +from pipecat.services.cartesia import CartesiaHttpTTSService from pipecat.services.openai import OpenAILLMService -from pipecat.services.elevenlabs import ElevenLabsTTSService from pipecat.transports.services.daily import DailyTransport from pipecat.vad.silero import SileroVADAnalyzer @@ -78,9 +78,9 @@ async def main(): ) ) - tts = ElevenLabsTTSService( - api_key=os.getenv("ELEVENLABS_API_KEY"), - voice_id=os.getenv("ELEVENLABS_VOICE_ID"), + tts = CartesiaHttpTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady ) llm = OpenAILLMService( diff --git a/examples/foundational/11-sound-effects.py b/examples/foundational/11-sound-effects.py index 146b3bd09..9dc4dc99b 100644 --- a/examples/foundational/11-sound-effects.py +++ b/examples/foundational/11-sound-effects.py @@ -25,7 +25,7 @@ ) from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.processors.logger import FrameLogger -from pipecat.services.elevenlabs import ElevenLabsTTSService +from pipecat.services.cartesia import CartesiaHttpTTSService from pipecat.services.openai import OpenAILLMService from pipecat.transports.services.daily import DailyParams, DailyTransport from pipecat.vad.silero import SileroVADAnalyzer @@ -103,9 +103,9 @@ async def main(): api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o") - tts = ElevenLabsTTSService( - api_key=os.getenv("ELEVENLABS_API_KEY"), - voice_id="ErXwobaYiN019PkySvjV", + tts = CartesiaHttpTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady ) messages = [ From 4f1b06e6b2d731eb04934f68cc15723d07d681b1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Wed, 18 Sep 2024 22:56:28 -0700 Subject: [PATCH 10/11] pipeline: renamed ParallelTask to SyncParallelPipeline --- CHANGELOG.md | 8 ++++ .../foundational/05-sync-speech-and-image.py | 34 ++++++++--------- .../05a-local-sync-speech-and-image.py | 38 ++++++++++++------- ...llel_task.py => sync_parallel_pipeline.py} | 6 +-- 4 files changed, 51 insertions(+), 35 deletions(-) rename src/pipecat/pipeline/{parallel_task.py => sync_parallel_pipeline.py} (94%) diff --git a/CHANGELOG.md b/CHANGELOG.md index 752666c07..5d33b8257 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -44,6 +44,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed +- `ParallelTask` has been renamed to `SyncParallelPipeline`. A + `SyncParallelPipeline` is a frame processor that contains a list of different + pipelines to be executed concurrently. The difference between a + `SyncParallelPipeline` and a `ParallelPipeline` is that, given an input frame, + the `SyncParallelPipeline` will wait for all the internal pipelines to + complete. This is achieved by ensuring all the processors in each of the + internal pipelines are synchronous. + - `StartFrame` is back a system frame so we make sure it's processed immediately by all processors. `EndFrame` stays a control frame since it needs to be ordered allowing the frames in the pipeline to be processed. diff --git a/examples/foundational/05-sync-speech-and-image.py b/examples/foundational/05-sync-speech-and-image.py index ca3ff9557..07e54ab8a 100644 --- a/examples/foundational/05-sync-speech-and-image.py +++ b/examples/foundational/05-sync-speech-and-image.py @@ -14,21 +14,18 @@ from pipecat.frames.frames import ( AppFrame, Frame, - ImageRawFrame, LLMFullResponseStartFrame, LLMMessagesFrame, TextFrame ) from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.sync_parallel_pipeline import SyncParallelPipeline from pipecat.pipeline.task import PipelineTask -from pipecat.pipeline.parallel_task import ParallelTask from pipecat.processors.frame_processor import FrameDirection, FrameProcessor -from pipecat.processors.aggregators.gated import GatedAggregator -from pipecat.processors.aggregators.llm_response import LLMFullResponseAggregator from pipecat.processors.aggregators.sentence import SentenceAggregator +from pipecat.services.cartesia import CartesiaHttpTTSService from pipecat.services.openai import OpenAILLMService -from pipecat.services.elevenlabs import ElevenLabsTTSService from pipecat.services.fal import FalImageGenService from pipecat.transports.services.daily import DailyParams, DailyTransport @@ -88,9 +85,9 @@ async def main(): ) ) - tts = ElevenLabsTTSService( - api_key=os.getenv("ELEVENLABS_API_KEY"), - voice_id=os.getenv("ELEVENLABS_VOICE_ID"), + tts = CartesiaHttpTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady ) llm = OpenAILLMService( @@ -105,24 +102,23 @@ async def main(): key=os.getenv("FAL_KEY"), ) - gated_aggregator = GatedAggregator( - gate_open_fn=lambda frame: isinstance(frame, ImageRawFrame), - gate_close_fn=lambda frame: isinstance(frame, LLMFullResponseStartFrame), - start_open=False - ) - sentence_aggregator = SentenceAggregator() month_prepender = MonthPrepender() - llm_full_response_aggregator = LLMFullResponseAggregator() + # With `SyncParallelPipeline` we synchronize audio and images by pushing + # them basically in order (e.g. I1 A1 A1 A1 I2 A2 A2 A2 A2 I3 A3). To do + # that, each pipeline runs concurrently and `SyncParallelPipeline` will + # wait for the input frame to be processed. + # + # Note that `SyncParallelPipeline` requires all processors in it to be + # synchronous (which is the default for most processors). pipeline = Pipeline([ llm, # LLM sentence_aggregator, # Aggregates LLM output into full sentences - ParallelTask( # Run pipelines in parallel aggregating the result - [month_prepender, tts], # Create "Month: sentence" and output audio - [llm_full_response_aggregator, imagegen] # Aggregate full LLM response + SyncParallelPipeline( # Run pipelines in parallel aggregating the result + [month_prepender, tts], # Create "Month: sentence" and output audio + [imagegen] # Generate image ), - gated_aggregator, # Queues everything until an image is available transport.output() # Transport output ]) diff --git a/examples/foundational/05a-local-sync-speech-and-image.py b/examples/foundational/05a-local-sync-speech-and-image.py index 63bcf1e9d..66a6e7f57 100644 --- a/examples/foundational/05a-local-sync-speech-and-image.py +++ b/examples/foundational/05a-local-sync-speech-and-image.py @@ -12,17 +12,17 @@ import tkinter as tk from pipecat.frames.frames import AudioRawFrame, Frame, URLImageRawFrame, LLMMessagesFrame, TextFrame -from pipecat.pipeline.parallel_pipeline import ParallelPipeline from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.sync_parallel_pipeline import SyncParallelPipeline from pipecat.pipeline.task import PipelineTask -from pipecat.processors.aggregators.llm_response import LLMFullResponseAggregator +from pipecat.processors.aggregators.sentence import SentenceAggregator from pipecat.processors.frame_processor import FrameDirection, FrameProcessor +from pipecat.services.cartesia import CartesiaHttpTTSService from pipecat.services.openai import OpenAILLMService -from pipecat.services.elevenlabs import ElevenLabsTTSService from pipecat.services.fal import FalImageGenService from pipecat.transports.base_transport import TransportParams -from pipecat.transports.local.tk import TkLocalTransport +from pipecat.transports.local.tk import TkLocalTransport, TkOutputTransport from loguru import logger @@ -60,6 +60,7 @@ class AudioGrabber(FrameProcessor): def __init__(self): super().__init__() self.audio = bytearray() + self.frame = None async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) @@ -84,9 +85,10 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o") - tts = ElevenLabsTTSService( - api_key=os.getenv("ELEVENLABS_API_KEY"), - voice_id=os.getenv("ELEVENLABS_VOICE_ID")) + tts = CartesiaHttpTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady + ) imagegen = FalImageGenService( params=FalImageGenService.InputParams( @@ -95,7 +97,7 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): aiohttp_session=session, key=os.getenv("FAL_KEY")) - aggregator = LLMFullResponseAggregator() + sentence_aggregator = SentenceAggregator() description = ImageDescription() @@ -103,12 +105,22 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): image_grabber = ImageGrabber() + # With `SyncParallelPipeline` we synchronize audio and images by + # pushing them basically in order (e.g. I1 A1 A1 A1 I2 A2 A2 A2 A2 + # I3 A3). To do that, each pipeline runs concurrently and + # `SyncParallelPipeline` will wait for the input frame to be + # processed. + # + # Note that `SyncParallelPipeline` requires all processors in it to + # be synchronous (which is the default for most processors). pipeline = Pipeline([ - llm, - aggregator, - description, - ParallelPipeline([tts, audio_grabber], - [imagegen, image_grabber]) + llm, # LLM + sentence_aggregator, # Aggregates LLM output into full sentences + description, # Store sentence + SyncParallelPipeline( + [tts, audio_grabber], # Generate and store audio for the given sentence + [imagegen, image_grabber] # Generate and storeimage for the given sentence + ) ]) task = PipelineTask(pipeline) diff --git a/src/pipecat/pipeline/parallel_task.py b/src/pipecat/pipeline/sync_parallel_pipeline.py similarity index 94% rename from src/pipecat/pipeline/parallel_task.py rename to src/pipecat/pipeline/sync_parallel_pipeline.py index 724183b3b..d922134f4 100644 --- a/src/pipecat/pipeline/parallel_task.py +++ b/src/pipecat/pipeline/sync_parallel_pipeline.py @@ -49,12 +49,12 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): await self._down_queue.put(frame) -class ParallelTask(BasePipeline): +class SyncParallelPipeline(BasePipeline): def __init__(self, *args): super().__init__() if len(args) == 0: - raise Exception(f"ParallelTask needs at least one argument") + raise Exception(f"SyncParallelPipeline needs at least one argument") self._sinks = [] self._sources = [] @@ -66,7 +66,7 @@ def __init__(self, *args): logger.debug(f"Creating {self} pipelines") for processors in args: if not isinstance(processors, list): - raise TypeError(f"ParallelTask argument {processors} is not a list") + raise TypeError(f"SyncParallelPipeline argument {processors} is not a list") # We add a source at the beginning of the pipeline and a sink at the end. source = Source(self._up_queue) From 607a2465723ceab17e9e4b72f404b8acfba295d3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Wed, 18 Sep 2024 23:08:22 -0700 Subject: [PATCH 11/11] updated CHANGELOG with sync/async frame processors --- CHANGELOG.md | 31 +++++++++++++++++++++++++------ 1 file changed, 25 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5d33b8257..985834aba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,13 +9,24 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added -- Added `CartesiaHttpTTSService`. This is a synchronous frame processor - (i.e. given an input text frame it will wait for the whole output before - returning). +- Pipecat has a pipeline-based architecture. The pipeline consists of frame + processors linked to each other. The elements traveling across the pipeline + are called frames. -- A clock can now be specified to `PipelineTask` (defaults to - `SystemClock`). This clock will be passed to each frame processor via the - `StartFrame`. + To have a deterministic behavior the frames traveling through the pipeline + should always be ordered, except system frames which are out-of-band + frames. To achieve that, each frame processor should only output frames from a + single task. + + In this version we introduce synchronous and asynchronous frame + processors. The synchronous processors push output frames from the same task + that they receive input frames, and therefore only pushing frames from one + task. Asynchronous frame processors can have internal tasks to perform things + asynchronously (e.g. receiving data from a websocket) but they also have a + single task where they push frames from. + + By default, frame processors are synchronous. To change a frame processor to + asynchronous you only need to pass `sync=False` to the base class constructor. - Added pipeline clocks. A pipeline clock is used by the output transport to know when a frame needs to be presented. For that, all frames now have an @@ -23,6 +34,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 clock implementation `SystemClock` and the `pts` field is currently only used for `TextFrame`s (audio and image frames will be next). +- A clock can now be specified to `PipelineTask` (defaults to + `SystemClock`). This clock will be passed to each frame processor via the + `StartFrame`. + +- Added `CartesiaHttpTTSService`. This is a synchronous frame processor + (i.e. given an input text frame it will wait for the whole output before + returning). + - `DailyTransport` now supports setting the audio bitrate to improve audio quality through the `DailyParams.audio_out_bitrate` parameter. The new default is 96kbps.