diff --git a/CHANGELOG.md b/CHANGELOG.md index a757c7ffd..985834aba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,9 +9,24 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added -- A clock can now be specified to `PipelineTask` (defaults to - `SystemClock`). This clock will be passed to each frame processor via the - `StartFrame`. +- Pipecat has a pipeline-based architecture. The pipeline consists of frame + processors linked to each other. The elements traveling across the pipeline + are called frames. + + For deterministic behavior, the frames traveling through the pipeline + should always be ordered, except for system frames, which are out-of-band + frames. To achieve that, each frame processor should only output frames from a + single task. + + In this version we introduce synchronous and asynchronous frame + processors. Synchronous processors push output frames from the same task + in which they receive input frames, and therefore only push frames from one + task. Asynchronous frame processors can have internal tasks to perform work + asynchronously (e.g. receiving data from a websocket), but they still have a + single task from which they push frames. + + By default, frame processors are synchronous. To make a frame processor + asynchronous, simply pass `sync=False` to the base class constructor. - Added pipeline clocks. A pipeline clock is used by the output transport to know when a frame needs to be presented. For that, all frames now have an @@ -19,6 +34,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 clock implementation `SystemClock` and the `pts` field is currently only used for `TextFrame`s (audio and image frames will be next). +- A clock can now be specified to `PipelineTask` (defaults to + `SystemClock`). This clock will be passed to each frame processor via the + `StartFrame`. + +- Added `CartesiaHttpTTSService`. This is a synchronous frame processor + (i.e. given an input text frame it will wait for the whole output before + returning). + - `DailyTransport` now supports setting the audio bitrate to improve audio quality through the `DailyParams.audio_out_bitrate` parameter. The new default is 96kbps. @@ -40,6 +63,20 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed +- `ParallelTask` has been renamed to `SyncParallelPipeline`. A + `SyncParallelPipeline` is a frame processor that contains a list of different + pipelines to be executed concurrently. The difference between a + `SyncParallelPipeline` and a `ParallelPipeline` is that, given an input frame, + the `SyncParallelPipeline` will wait for all the internal pipelines to + complete. This is achieved by ensuring all the processors in each of the + internal pipelines are synchronous. + +- `StartFrame` is a system frame again, so we make sure it's processed + immediately by all processors. `EndFrame` stays a control frame since it needs + to be ordered, allowing all frames already in the pipeline to be processed. + +- Updated `MoondreamService` revision to `2024-08-26`. + - `CartesiaTTSService` and `ElevenLabsTTSService` now add presentation timestamps to their text output. This allows the output transport to push the text frames downstream at almost the same time the words are spoken. 
We say @@ -60,6 +97,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed +- Fixed a `BaseOutputTransport` issue that would stop audio and video rendering + tasks (after receiving an `EndFrame`) before the internal queue was emptied, + causing the pipeline to finish prematurely. + - `StartFrame` should be the first frame every processor receives to avoid situations where things are not initialized (because initialization happens on `StartFrame`) and other frames come in resulting in undesired behavior. diff --git a/examples/foundational/01-say-one-thing.py b/examples/foundational/01-say-one-thing.py index 8102518f7..fce774822 100644 --- a/examples/foundational/01-say-one-thing.py +++ b/examples/foundational/01-say-one-thing.py @@ -9,11 +9,11 @@ import os import sys -from pipecat.frames.frames import TextFrame +from pipecat.frames.frames import EndFrame, TextFrame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.task import PipelineTask from pipecat.pipeline.runner import PipelineRunner -from pipecat.services.cartesia import CartesiaTTSService +from pipecat.services.cartesia import CartesiaHttpTTSService from pipecat.transports.services.daily import DailyParams, DailyTransport from runner import configure @@ -34,7 +34,7 @@ async def main(): transport = DailyTransport( room_url, None, "Say One Thing", DailyParams(audio_out_enabled=True)) - tts = CartesiaTTSService( + tts = CartesiaHttpTTSService( api_key=os.getenv("CARTESIA_API_KEY"), voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady ) @@ -48,7 +48,7 @@ async def main(): @transport.event_handler("on_participant_joined") async def on_new_participant_joined(transport, participant): participant_name = participant["info"]["userName"] or '' - await task.queue_frame(TextFrame(f"Hello there, {participant_name}!")) + await task.queue_frames([TextFrame(f"Hello there, {participant_name}!"), EndFrame()]) await runner.run(task) diff --git a/examples/foundational/02-llm-say-one-thing.py b/examples/foundational/02-llm-say-one-thing.py index e53aee9ae..00a1e9e51 100644 --- a/examples/foundational/02-llm-say-one-thing.py +++ b/examples/foundational/02-llm-say-one-thing.py @@ -9,11 +9,11 @@ import os import sys -from pipecat.frames.frames import LLMMessagesFrame +from pipecat.frames.frames import EndFrame, LLMMessagesFrame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineTask -from pipecat.services.cartesia import CartesiaTTSService +from pipecat.services.cartesia import CartesiaHttpTTSService from pipecat.services.openai import OpenAILLMService from pipecat.transports.services.daily import DailyParams, DailyTransport @@ -38,7 +38,7 @@ async def main(): "Say One Thing From an LLM", DailyParams(audio_out_enabled=True)) - tts = CartesiaTTSService( + tts = CartesiaHttpTTSService( api_key=os.getenv("CARTESIA_API_KEY"), voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady ) @@ -59,7 +59,7 @@ async def main(): @transport.event_handler("on_first_participant_joined") async def on_first_participant_joined(transport, participant): - await task.queue_frame(LLMMessagesFrame(messages)) + await task.queue_frames([LLMMessagesFrame(messages), EndFrame()]) await runner.run(task) diff --git a/examples/foundational/05-sync-speech-and-image.py b/examples/foundational/05-sync-speech-and-image.py index ca3ff9557..07e54ab8a 100644 --- a/examples/foundational/05-sync-speech-and-image.py +++ 
b/examples/foundational/05-sync-speech-and-image.py @@ -14,21 +14,18 @@ from pipecat.frames.frames import ( AppFrame, Frame, - ImageRawFrame, LLMFullResponseStartFrame, LLMMessagesFrame, TextFrame ) from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.sync_parallel_pipeline import SyncParallelPipeline from pipecat.pipeline.task import PipelineTask -from pipecat.pipeline.parallel_task import ParallelTask from pipecat.processors.frame_processor import FrameDirection, FrameProcessor -from pipecat.processors.aggregators.gated import GatedAggregator -from pipecat.processors.aggregators.llm_response import LLMFullResponseAggregator from pipecat.processors.aggregators.sentence import SentenceAggregator +from pipecat.services.cartesia import CartesiaHttpTTSService from pipecat.services.openai import OpenAILLMService -from pipecat.services.elevenlabs import ElevenLabsTTSService from pipecat.services.fal import FalImageGenService from pipecat.transports.services.daily import DailyParams, DailyTransport @@ -88,9 +85,9 @@ async def main(): ) ) - tts = ElevenLabsTTSService( - api_key=os.getenv("ELEVENLABS_API_KEY"), - voice_id=os.getenv("ELEVENLABS_VOICE_ID"), + tts = CartesiaHttpTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady ) llm = OpenAILLMService( @@ -105,24 +102,23 @@ async def main(): key=os.getenv("FAL_KEY"), ) - gated_aggregator = GatedAggregator( - gate_open_fn=lambda frame: isinstance(frame, ImageRawFrame), - gate_close_fn=lambda frame: isinstance(frame, LLMFullResponseStartFrame), - start_open=False - ) - sentence_aggregator = SentenceAggregator() month_prepender = MonthPrepender() - llm_full_response_aggregator = LLMFullResponseAggregator() + # With `SyncParallelPipeline` we synchronize audio and images by pushing + # them basically in order (e.g. I1 A1 A1 A1 I2 A2 A2 A2 A2 I3 A3). To do + # that, each pipeline runs concurrently and `SyncParallelPipeline` will + # wait for the input frame to be processed. + # + # Note that `SyncParallelPipeline` requires all processors in it to be + # synchronous (which is the default for most processors). 
pipeline = Pipeline([ llm, # LLM sentence_aggregator, # Aggregates LLM output into full sentences - ParallelTask( # Run pipelines in parallel aggregating the result - [month_prepender, tts], # Create "Month: sentence" and output audio - [llm_full_response_aggregator, imagegen] # Aggregate full LLM response + SyncParallelPipeline( # Run pipelines in parallel aggregating the result + [month_prepender, tts], # Create "Month: sentence" and output audio + [imagegen] # Generate image ), - gated_aggregator, # Queues everything until an image is available transport.output() # Transport output ]) diff --git a/examples/foundational/05a-local-sync-speech-and-image.py b/examples/foundational/05a-local-sync-speech-and-image.py index 63bcf1e9d..66a6e7f57 100644 --- a/examples/foundational/05a-local-sync-speech-and-image.py +++ b/examples/foundational/05a-local-sync-speech-and-image.py @@ -12,17 +12,17 @@ import tkinter as tk from pipecat.frames.frames import AudioRawFrame, Frame, URLImageRawFrame, LLMMessagesFrame, TextFrame -from pipecat.pipeline.parallel_pipeline import ParallelPipeline from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.sync_parallel_pipeline import SyncParallelPipeline from pipecat.pipeline.task import PipelineTask -from pipecat.processors.aggregators.llm_response import LLMFullResponseAggregator +from pipecat.processors.aggregators.sentence import SentenceAggregator from pipecat.processors.frame_processor import FrameDirection, FrameProcessor +from pipecat.services.cartesia import CartesiaHttpTTSService from pipecat.services.openai import OpenAILLMService -from pipecat.services.elevenlabs import ElevenLabsTTSService from pipecat.services.fal import FalImageGenService from pipecat.transports.base_transport import TransportParams -from pipecat.transports.local.tk import TkLocalTransport +from pipecat.transports.local.tk import TkLocalTransport, TkOutputTransport from loguru import logger @@ -60,6 +60,7 @@ class AudioGrabber(FrameProcessor): def __init__(self): super().__init__() self.audio = bytearray() + self.frame = None async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) @@ -84,9 +85,10 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o") - tts = ElevenLabsTTSService( - api_key=os.getenv("ELEVENLABS_API_KEY"), - voice_id=os.getenv("ELEVENLABS_VOICE_ID")) + tts = CartesiaHttpTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady + ) imagegen = FalImageGenService( params=FalImageGenService.InputParams( @@ -95,7 +97,7 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): aiohttp_session=session, key=os.getenv("FAL_KEY")) - aggregator = LLMFullResponseAggregator() + sentence_aggregator = SentenceAggregator() description = ImageDescription() @@ -103,12 +105,22 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): image_grabber = ImageGrabber() + # With `SyncParallelPipeline` we synchronize audio and images by + # pushing them basically in order (e.g. I1 A1 A1 A1 I2 A2 A2 A2 A2 + # I3 A3). To do that, each pipeline runs concurrently and + # `SyncParallelPipeline` will wait for the input frame to be + # processed. + # + # Note that `SyncParallelPipeline` requires all processors in it to + # be synchronous (which is the default for most processors). 
pipeline = Pipeline([ - llm, - aggregator, - description, - ParallelPipeline([tts, audio_grabber], - [imagegen, image_grabber]) + llm, # LLM + sentence_aggregator, # Aggregates LLM output into full sentences + description, # Store sentence + SyncParallelPipeline( + [tts, audio_grabber], # Generate and store audio for the given sentence + [imagegen, image_grabber] # Generate and store image for the given sentence + ) ]) task = PipelineTask(pipeline) diff --git a/examples/foundational/06-listen-and-respond.py b/examples/foundational/06-listen-and-respond.py index 35965185f..e99a95068 100644 --- a/examples/foundational/06-listen-and-respond.py +++ b/examples/foundational/06-listen-and-respond.py @@ -90,11 +90,6 @@ async def main(): ]) task = PipelineTask(pipeline) - task = PipelineTask(pipeline, PipelineParams( - allow_interruptions=True, - enable_metrics=True, - report_only_initial_ttfb=False, - )) @transport.event_handler("on_first_participant_joined") async def on_first_participant_joined(transport, participant): diff --git a/examples/foundational/06a-image-sync.py b/examples/foundational/06a-image-sync.py index 812dab137..6b3e58cf7 100644 --- a/examples/foundational/06a-image-sync.py +++ b/examples/foundational/06a-image-sync.py @@ -20,8 +20,8 @@ LLMUserResponseAggregator, ) from pipecat.processors.frame_processor import FrameDirection, FrameProcessor +from pipecat.services.cartesia import CartesiaHttpTTSService from pipecat.services.openai import OpenAILLMService -from pipecat.services.elevenlabs import ElevenLabsTTSService from pipecat.transports.services.daily import DailyTransport from pipecat.vad.silero import SileroVADAnalyzer @@ -78,9 +78,9 @@ async def main(): ) ) - tts = ElevenLabsTTSService( - api_key=os.getenv("ELEVENLABS_API_KEY"), - voice_id=os.getenv("ELEVENLABS_VOICE_ID"), + tts = CartesiaHttpTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady ) llm = OpenAILLMService( diff --git a/examples/foundational/11-sound-effects.py b/examples/foundational/11-sound-effects.py index 146b3bd09..9dc4dc99b 100644 --- a/examples/foundational/11-sound-effects.py +++ b/examples/foundational/11-sound-effects.py @@ -25,7 +25,7 @@ ) from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.processors.logger import FrameLogger -from pipecat.services.elevenlabs import ElevenLabsTTSService +from pipecat.services.cartesia import CartesiaHttpTTSService from pipecat.services.openai import OpenAILLMService from pipecat.transports.services.daily import DailyParams, DailyTransport from pipecat.vad.silero import SileroVADAnalyzer @@ -103,9 +103,9 @@ async def main(): api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o") - tts = ElevenLabsTTSService( - api_key=os.getenv("ELEVENLABS_API_KEY"), - voice_id="ErXwobaYiN019PkySvjV", + tts = CartesiaHttpTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady ) messages = [ diff --git a/examples/foundational/17-detect-user-idle.py b/examples/foundational/17-detect-user-idle.py index 903f35f4e..66fcfb200 100644 --- a/examples/foundational/17-detect-user-idle.py +++ b/examples/foundational/17-detect-user-idle.py @@ -70,7 +70,7 @@ async def main(): async def user_idle_callback(user_idle: UserIdleProcessor): messages.append( {"role": "system", "content": "Ask the user if they are still there and try to prompt for some input, but be short."}) - await user_idle.queue_frame(LLMMessagesFrame(messages)) + await 
user_idle.push_frame(LLMMessagesFrame(messages)) user_idle = UserIdleProcessor(callback=user_idle_callback, timeout=5.0) diff --git a/pyproject.toml b/pyproject.toml index 8a1e3a800..170ecd326 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,7 +36,7 @@ Website = "https://pipecat.ai" [project.optional-dependencies] anthropic = [ "anthropic~=0.34.0" ] azure = [ "azure-cognitiveservices-speech~=1.40.0" ] -cartesia = [ "websockets~=12.0" ] +cartesia = [ "cartesia~=1.0.13", "websockets~=12.0" ] daily = [ "daily-python~=0.10.1" ] deepgram = [ "deepgram-sdk~=3.5.0" ] elevenlabs = [ "websockets~=12.0" ] diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py index 51770dff1..a400d68d9 100644 --- a/src/pipecat/frames/frames.py +++ b/src/pipecat/frames/frames.py @@ -248,6 +248,16 @@ class SystemFrame(Frame): pass +@dataclass +class StartFrame(SystemFrame): + """This is the first frame that should be pushed down a pipeline.""" + clock: BaseClock + allow_interruptions: bool = False + enable_metrics: bool = False + enable_usage_metrics: bool = False + report_only_initial_ttfb: bool = False + + @dataclass class CancelFrame(SystemFrame): """Indicates that a pipeline needs to stop right away.""" @@ -338,16 +348,6 @@ class ControlFrame(Frame): pass -@dataclass -class StartFrame(ControlFrame): - """This is the first frame that should be pushed down a pipeline.""" - clock: BaseClock - allow_interruptions: bool = False - enable_metrics: bool = False - enable_usage_metrics: bool = False - report_only_initial_ttfb: bool = False - - @dataclass class EndFrame(ControlFrame): """Indicates that a pipeline has ended and frame processors and pipelines diff --git a/src/pipecat/pipeline/parallel_task.py b/src/pipecat/pipeline/sync_parallel_pipeline.py similarity index 94% rename from src/pipecat/pipeline/parallel_task.py rename to src/pipecat/pipeline/sync_parallel_pipeline.py index 724183b3b..d922134f4 100644 --- a/src/pipecat/pipeline/parallel_task.py +++ b/src/pipecat/pipeline/sync_parallel_pipeline.py @@ -49,12 +49,12 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): await self._down_queue.put(frame) -class ParallelTask(BasePipeline): +class SyncParallelPipeline(BasePipeline): def __init__(self, *args): super().__init__() if len(args) == 0: - raise Exception(f"ParallelTask needs at least one argument") + raise Exception(f"SyncParallelPipeline needs at least one argument") self._sinks = [] self._sources = [] @@ -66,7 +66,7 @@ def __init__(self, *args): logger.debug(f"Creating {self} pipelines") for processors in args: if not isinstance(processors, list): - raise TypeError(f"ParallelTask argument {processors} is not a list") + raise TypeError(f"SyncParallelPipeline argument {processors} is not a list") # We add a source at the beginning of the pipeline and a sink at the end. 
source = Source(self._up_queue) diff --git a/src/pipecat/processors/aggregators/llm_response.py b/src/pipecat/processors/aggregators/llm_response.py index ab0552578..379394120 100644 --- a/src/pipecat/processors/aggregators/llm_response.py +++ b/src/pipecat/processors/aggregators/llm_response.py @@ -4,7 +4,6 @@ # SPDX-License-Identifier: BSD 2-Clause License # -import sys from typing import List from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContextFrame, OpenAILLMContext diff --git a/src/pipecat/processors/async_frame_processor.py b/src/pipecat/processors/async_frame_processor.py deleted file mode 100644 index 28a27d255..000000000 --- a/src/pipecat/processors/async_frame_processor.py +++ /dev/null @@ -1,64 +0,0 @@ -# -# Copyright (c) 2024, Daily -# -# SPDX-License-Identifier: BSD 2-Clause License -# - -import asyncio - -from pipecat.frames.frames import EndFrame, Frame, StartInterruptionFrame -from pipecat.processors.frame_processor import FrameDirection, FrameProcessor - - -class AsyncFrameProcessor(FrameProcessor): - - def __init__( - self, - *, - name: str | None = None, - loop: asyncio.AbstractEventLoop | None = None, - **kwargs): - super().__init__(name=name, loop=loop, **kwargs) - - self._create_push_task() - - async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - - if isinstance(frame, StartInterruptionFrame): - await self._handle_interruptions(frame) - - async def queue_frame( - self, - frame: Frame, - direction: FrameDirection = FrameDirection.DOWNSTREAM): - await self._push_queue.put((frame, direction)) - - async def cleanup(self): - self._push_frame_task.cancel() - await self._push_frame_task - - async def _handle_interruptions(self, frame: Frame): - # Cancel the task. This will stop pushing frames downstream. - self._push_frame_task.cancel() - await self._push_frame_task - # Push an out-of-band frame (i.e. not using the ordered push - # frame task). - await self.push_frame(frame) - # Create a new queue and task. 
- self._create_push_task() - - def _create_push_task(self): - self._push_queue = asyncio.Queue() - self._push_frame_task = self.get_event_loop().create_task(self._push_frame_task_handler()) - - async def _push_frame_task_handler(self): - running = True - while running: - try: - (frame, direction) = await self._push_queue.get() - await self.push_frame(frame, direction) - running = not isinstance(frame, EndFrame) - self._push_queue.task_done() - except asyncio.CancelledError: - break diff --git a/src/pipecat/processors/frame_processor.py b/src/pipecat/processors/frame_processor.py index dfdee7d40..72924776c 100644 --- a/src/pipecat/processors/frame_processor.py +++ b/src/pipecat/processors/frame_processor.py @@ -11,12 +11,14 @@ from pipecat.clocks.base_clock import BaseClock from pipecat.frames.frames import ( + EndFrame, ErrorFrame, Frame, MetricsFrame, StartFrame, StartInterruptionFrame, - UserStoppedSpeakingFrame) + StopInterruptionFrame, + SystemFrame) from pipecat.utils.utils import obj_count, obj_id from loguru import logger @@ -88,6 +90,7 @@ def __init__( self, *, name: str | None = None, + sync: bool = True, loop: asyncio.AbstractEventLoop | None = None, **kwargs): self.id: int = obj_id() @@ -96,6 +99,7 @@ def __init__( self._prev: "FrameProcessor" | None = None self._next: "FrameProcessor" | None = None self._loop: asyncio.AbstractEventLoop = loop or asyncio.get_running_loop() + self._sync = sync # Clock self._clock: BaseClock | None = None @@ -109,6 +113,14 @@ def __init__( # Metrics self._metrics = FrameProcessorMetrics(name=self.name) + # Every processor in Pipecat should only output frames from a single + # task. This avoids problems such as audio overlapping. System frames + # are the exception to this rule. + # + # Here we create that task. + if not self._sync: + self.__create_push_task() + @property def interruptions_allowed(self): return self._allow_interruptions @@ -192,14 +204,38 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): self._enable_usage_metrics = frame.enable_usage_metrics self._report_only_initial_ttfb = frame.report_only_initial_ttfb elif isinstance(frame, StartInterruptionFrame): + await self._start_interruption() await self.stop_all_metrics() - elif isinstance(frame, UserStoppedSpeakingFrame): + elif isinstance(frame, StopInterruptionFrame): self._should_report_ttfb = True async def push_error(self, error: ErrorFrame): await self.push_frame(error, FrameDirection.UPSTREAM) async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM): + if self._sync or isinstance(frame, SystemFrame): + await self.__internal_push_frame(frame, direction) + else: + await self.__push_queue.put((frame, direction)) + + # + # Handle interruptions + # + + async def _start_interruption(self): + if not self._sync: + # Cancel the task. This will stop pushing frames downstream. + self.__push_frame_task.cancel() + await self.__push_frame_task + + # Create a new queue and task. + self.__create_push_task() + + async def _stop_interruption(self): + # Nothing to do right now. 
+ pass + + async def __internal_push_frame(self, frame: Frame, direction: FrameDirection): try: if direction == FrameDirection.DOWNSTREAM and self._next: logger.trace(f"Pushing {frame} from {self} to {self._next}") @@ -210,5 +246,20 @@ async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirect except Exception as e: logger.exception(f"Uncaught exception in {self}: {e}") + def __create_push_task(self): + self.__push_queue = asyncio.Queue() + self.__push_frame_task = self.get_event_loop().create_task(self.__push_frame_task_handler()) + + async def __push_frame_task_handler(self): + running = True + while running: + try: + (frame, direction) = await self.__push_queue.get() + await self.__internal_push_frame(frame, direction) + running = not isinstance(frame, EndFrame) + self.__push_queue.task_done() + except asyncio.CancelledError: + break + def __str__(self): return self.name diff --git a/src/pipecat/processors/frameworks/rtvi.py b/src/pipecat/processors/frameworks/rtvi.py index dd28e7252..66adb9ad0 100644 --- a/src/pipecat/processors/frameworks/rtvi.py +++ b/src/pipecat/processors/frameworks/rtvi.py @@ -272,8 +272,9 @@ class RTVIProcessor(FrameProcessor): def __init__(self, *, config: RTVIConfig = RTVIConfig(config=[]), - params: RTVIProcessorParams = RTVIProcessorParams()): - super().__init__() + params: RTVIProcessorParams = RTVIProcessorParams(), + **kwargs): + super().__init__(sync=False, **kwargs) self._config = config self._params = params @@ -286,9 +287,6 @@ def __init__(self, self._registered_actions: Dict[str, RTVIAction] = {} self._registered_services: Dict[str, RTVIService] = {} - self._push_frame_task = self.get_event_loop().create_task(self._push_frame_task_handler()) - self._push_queue = asyncio.Queue() - self._message_task = self.get_event_loop().create_task(self._message_task_handler()) self._message_queue = asyncio.Queue() @@ -335,17 +333,16 @@ async def handle_function_call_start( message = RTVILLMFunctionCallStartMessage(data=fn) await self._push_transport_message(message, exclude_none=False) - async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM): - if isinstance(frame, SystemFrame): - await super().push_frame(frame, direction) - else: - await self._internal_push_frame(frame, direction) - async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) # Specific system frames - if isinstance(frame, CancelFrame): + if isinstance(frame, StartFrame): + # Push StartFrame before start(), because we want StartFrame to be + # processed by every processor before any other frame is processed. + await self.push_frame(frame, direction) + await self._start(frame) + elif isinstance(frame, CancelFrame): await self._cancel(frame) await self.push_frame(frame, direction) elif isinstance(frame, ErrorFrame): @@ -355,11 +352,6 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): elif isinstance(frame, SystemFrame): await self.push_frame(frame, direction) # Control frames - elif isinstance(frame, StartFrame): - # Push StartFrame before start(), because we want StartFrame to be - # processed by every processor before any other frame is processed. - await self.push_frame(frame, direction) - await self._start(frame) elif isinstance(frame, EndFrame): # Push EndFrame before stop(), because stop() waits on the task to # finish and the task finishes when EndFrame is processed. @@ -394,30 +386,10 @@ async def _stop(self, frame: EndFrame): # processing EndFrames. 
self._message_task.cancel() await self._message_task - await self._push_frame_task async def _cancel(self, frame: CancelFrame): self._message_task.cancel() await self._message_task - self._push_frame_task.cancel() - await self._push_frame_task - - async def _internal_push_frame( - self, - frame: Frame | None, - direction: FrameDirection | None = FrameDirection.DOWNSTREAM): - await self._push_queue.put((frame, direction)) - - async def _push_frame_task_handler(self): - running = True - while running: - try: - (frame, direction) = await self._push_queue.get() - await super().push_frame(frame, direction) - self._push_queue.task_done() - running = not isinstance(frame, EndFrame) - except asyncio.CancelledError: - break async def _push_transport_message(self, model: BaseModel, exclude_none: bool = True): frame = TransportMessageFrame( diff --git a/src/pipecat/processors/gstreamer/pipeline_source.py b/src/pipecat/processors/gstreamer/pipeline_source.py index eef0d56cc..8d46105a7 100644 --- a/src/pipecat/processors/gstreamer/pipeline_source.py +++ b/src/pipecat/processors/gstreamer/pipeline_source.py @@ -41,7 +41,7 @@ class OutputParams(BaseModel): clock_sync: bool = True def __init__(self, *, pipeline: str, out_params: OutputParams = OutputParams(), **kwargs): - super().__init__(**kwargs) + super().__init__(sync=False, **kwargs) self._out_params = out_params @@ -62,78 +62,42 @@ def __init__(self, *, pipeline: str, out_params: OutputParams = OutputParams(), bus.add_signal_watch() bus.connect("message", self._on_gstreamer_message) - # Create push frame task. This is the task that will push frames in - # order. We also guarantee that all frames are pushed in the same task. - self._create_push_task() - async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) # Specific system frames - if isinstance(frame, CancelFrame): + if isinstance(frame, StartFrame): + # Push StartFrame before start(), because we want StartFrame to be + # processed by every processor before any other frame is processed. + await self.push_frame(frame, direction) + await self._start(frame) + elif isinstance(frame, CancelFrame): await self._cancel(frame) await self.push_frame(frame, direction) # All other system frames elif isinstance(frame, SystemFrame): await self.push_frame(frame, direction) # Control frames - elif isinstance(frame, StartFrame): - # Push StartFrame before start(), because we want StartFrame to be - # processed by every processor before any other frame is processed. - await self._internal_push_frame(frame, direction) - await self._start(frame) elif isinstance(frame, EndFrame): # Push EndFrame before stop(), because stop() waits on the task to # finish and the task finishes when EndFrame is processed. - await self._internal_push_frame(frame, direction) + await self.push_frame(frame, direction) await self._stop(frame) # Other frames else: - await self._internal_push_frame(frame, direction) + await self.push_frame(frame, direction) async def _start(self, frame: StartFrame): self._player.set_state(Gst.State.PLAYING) async def _stop(self, frame: EndFrame): self._player.set_state(Gst.State.NULL) - # Wait for the push frame task to finish. It will finish when the - # EndFrame is actually processed. - await self._push_frame_task async def _cancel(self, frame: CancelFrame): self._player.set_state(Gst.State.NULL) - # Cancel all the tasks and wait for them to finish. 
- self._push_frame_task.cancel() - await self._push_frame_task - - # - # Push frames task - # - - def _create_push_task(self): - loop = self.get_event_loop() - self._push_queue = asyncio.Queue() - self._push_frame_task = loop.create_task(self._push_frame_task_handler()) - - async def _internal_push_frame( - self, - frame: Frame | None, - direction: FrameDirection | None = FrameDirection.DOWNSTREAM): - await self._push_queue.put((frame, direction)) - - async def _push_frame_task_handler(self): - running = True - while running: - try: - (frame, direction) = await self._push_queue.get() - await self.push_frame(frame, direction) - running = not isinstance(frame, EndFrame) - self._push_queue.task_done() - except asyncio.CancelledError: - break # - # GStreaner + # GStreamer # def _on_gstreamer_message(self, bus: Gst.Bus, message: Gst.Message): @@ -221,7 +185,7 @@ def _appsink_audio_new_sample(self, appsink: GstApp.AppSink): frame = AudioRawFrame(audio=info.data, sample_rate=self._out_params.audio_sample_rate, num_channels=self._out_params.audio_channels) - asyncio.run_coroutine_threadsafe(self._internal_push_frame(frame), self.get_event_loop()) + asyncio.run_coroutine_threadsafe(self.push_frame(frame), self.get_event_loop()) buffer.unmap(info) return Gst.FlowReturn.OK @@ -232,6 +196,6 @@ def _appsink_video_new_sample(self, appsink: GstApp.AppSink): image=info.data, size=(self._out_params.video_width, self._out_params.video_height), format="RGB") - asyncio.run_coroutine_threadsafe(self._internal_push_frame(frame), self.get_event_loop()) + asyncio.run_coroutine_threadsafe(self.push_frame(frame), self.get_event_loop()) buffer.unmap(info) return Gst.FlowReturn.OK diff --git a/src/pipecat/processors/idle_frame_processor.py b/src/pipecat/processors/idle_frame_processor.py index 40304a5c6..42b81517e 100644 --- a/src/pipecat/processors/idle_frame_processor.py +++ b/src/pipecat/processors/idle_frame_processor.py @@ -8,19 +8,14 @@ from typing import Awaitable, Callable, List -from pipecat.frames.frames import Frame, SystemFrame -from pipecat.processors.async_frame_processor import AsyncFrameProcessor -from pipecat.processors.frame_processor import FrameDirection +from pipecat.frames.frames import Frame +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor -class IdleFrameProcessor(AsyncFrameProcessor): +class IdleFrameProcessor(FrameProcessor): """This class waits to receive any frame or list of desired frames within a given timeout. If the timeout is reached before receiving any of those frames the provided callback will be called. - - The callback can then be used to push frames downstream by using - `queue_frame()` (or `push_frame()` for system frames). - """ def __init__( @@ -30,7 +25,7 @@ def __init__( timeout: float, types: List[type] = [], **kwargs): - super().__init__(**kwargs) + super().__init__(sync=False, **kwargs) self._callback = callback self._timeout = timeout @@ -41,10 +36,7 @@ def __init__( async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) - if isinstance(frame, SystemFrame): - await self.push_frame(frame, direction) - else: - await self.queue_frame(frame, direction) + await self.push_frame(frame, direction) # If we are not waiting for any specific frame set the event, otherwise # check if we have received one of the desired frames. 
@@ -55,7 +47,6 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): if isinstance(frame, t): self._idle_event.set() - # If we are not waiting for any specific frame set the event, otherwise async def cleanup(self): self._idle_task.cancel() await self._idle_task diff --git a/src/pipecat/processors/user_idle_processor.py b/src/pipecat/processors/user_idle_processor.py index 7deda2555..36c394a5d 100644 --- a/src/pipecat/processors/user_idle_processor.py +++ b/src/pipecat/processors/user_idle_processor.py @@ -11,21 +11,16 @@ from pipecat.frames.frames import ( BotSpeakingFrame, Frame, - SystemFrame, UserStartedSpeakingFrame, UserStoppedSpeakingFrame) -from pipecat.processors.async_frame_processor import AsyncFrameProcessor -from pipecat.processors.frame_processor import FrameDirection +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor -class UserIdleProcessor(AsyncFrameProcessor): +class UserIdleProcessor(FrameProcessor): """This class is useful to check if the user is interacting with the bot within a given timeout. If the timeout is reached before any interaction occurred the provided callback will be called. - The callback can then be used to push frames downstream by using - `queue_frame()` (or `push_frame()` for system frames). - """ def __init__( @@ -34,7 +29,7 @@ def __init__( callback: Callable[["UserIdleProcessor"], Awaitable[None]], timeout: float, **kwargs): - super().__init__(**kwargs) + super().__init__(sync=False, **kwargs) self._callback = callback self._timeout = timeout @@ -46,10 +41,7 @@ def __init__( async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) - if isinstance(frame, SystemFrame): - await self.push_frame(frame, direction) - else: - await self.queue_frame(frame, direction) + await self.push_frame(frame, direction) # We shouldn't call the idle callback if the user or the bot are speaking. 
if isinstance(frame, UserStartedSpeakingFrame): diff --git a/src/pipecat/services/ai_services.py b/src/pipecat/services/ai_services.py index 7291e7db9..b63188512 100644 --- a/src/pipecat/services/ai_services.py +++ b/src/pipecat/services/ai_services.py @@ -32,7 +32,6 @@ UserImageRequestFrame, VisionImageRawFrame ) -from pipecat.processors.async_frame_processor import AsyncFrameProcessor from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.transcriptions.language import Language from pipecat.utils.audio import calculate_audio_volume @@ -67,7 +66,7 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): elif isinstance(frame, EndFrame): await self.stop(frame) - async def process_generator(self, generator: AsyncGenerator[Frame, None]): + async def process_generator(self, generator: AsyncGenerator[Frame | None, None]): async for f in generator: if f: if isinstance(f, ErrorFrame): @@ -76,30 +75,6 @@ async def process_generator(self, generator: AsyncGenerator[Frame, None]): await self.push_frame(f) -class AsyncAIService(AsyncFrameProcessor): - def __init__(self, **kwargs): - super().__init__(**kwargs) - - async def start(self, frame: StartFrame): - pass - - async def stop(self, frame: EndFrame): - pass - - async def cancel(self, frame: CancelFrame): - pass - - async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - - if isinstance(frame, StartFrame): - await self.start(frame) - elif isinstance(frame, CancelFrame): - await self.cancel(frame) - elif isinstance(frame, EndFrame): - await self.stop(frame) - - class LLMService(AIService): """This class is a no-op but serves as a base class for LLM services.""" @@ -165,21 +140,15 @@ def __init__( self, *, aggregate_sentences: bool = True, - # if True, subclass is responsible for pushing TextFrames and LLMFullResponseEndFrames + # if True, TTSService will push TextFrames and LLMFullResponseEndFrames, + # otherwise subclass must do it push_text_frames: bool = True, - # if True, TTSService will push TTSStoppedFrames, otherwise subclass must do it - push_stop_frames: bool = False, - # if push_stop_frames is True, wait for this idle period before pushing TTSStoppedFrame - stop_frame_timeout_s: float = 1.0, + # TTS output sample rate sample_rate: int = 16000, **kwargs): super().__init__(**kwargs) self._aggregate_sentences: bool = aggregate_sentences self._push_text_frames: bool = push_text_frames - self._push_stop_frames: bool = push_stop_frames - self._stop_frame_timeout_s: float = stop_frame_timeout_s - self._stop_frame_task: Optional[asyncio.Task] = None - self._stop_frame_queue: asyncio.Queue = asyncio.Queue() self._current_sentence: str = "" self._sample_rate: int = sample_rate @@ -224,7 +193,7 @@ async def _process_text_frame(self, frame: TextFrame): if text: await self._push_tts_frames(text) - async def _push_tts_frames(self, text: str, text_passthrough: bool = True): + async def _push_tts_frames(self, text: str): text = text.strip() if not text: return @@ -264,6 +233,25 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): else: await self.push_frame(frame, direction) + +class AsyncTTSService(TTSService): + def __init__( + self, + # if True, TTSService will push TTSStoppedFrames, otherwise subclass must do it + push_stop_frames: bool = False, + # if push_stop_frames is True, wait for this idle period before pushing TTSStoppedFrame + stop_frame_timeout_s: float = 1.0, + **kwargs): + 
super().__init__(sync=False, **kwargs) + self._push_stop_frames: bool = push_stop_frames + self._stop_frame_timeout_s: float = stop_frame_timeout_s + self._stop_frame_task: Optional[asyncio.Task] = None + self._stop_frame_queue: asyncio.Queue = asyncio.Queue() + + @abstractmethod + async def flush_audio(self): + pass + async def start(self, frame: StartFrame): await super().start(frame) if self._push_stop_frames: @@ -312,15 +300,6 @@ async def _stop_frame_handler(self): pass -class AsyncTTSService(TTSService): - def __init__(self, **kwargs): - super().__init__(**kwargs) - - @abstractmethod - async def flush_audio(self): - pass - - class AsyncWordTTSService(AsyncTTSService): def __init__(self, **kwargs): super().__init__(**kwargs) diff --git a/src/pipecat/services/azure.py b/src/pipecat/services/azure.py index c2f984b75..90674fcc4 100644 --- a/src/pipecat/services/azure.py +++ b/src/pipecat/services/azure.py @@ -18,13 +18,11 @@ ErrorFrame, Frame, StartFrame, - SystemFrame, TTSStartedFrame, TTSStoppedFrame, TranscriptionFrame, URLImageRawFrame) -from pipecat.processors.frame_processor import FrameDirection -from pipecat.services.ai_services import AsyncAIService, TTSService, ImageGenService +from pipecat.services.ai_services import STTService, TTSService, ImageGenService from pipecat.services.openai import BaseOpenAILLMService from pipecat.utils.time import time_now_iso8601 @@ -126,7 +124,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: logger.error(f"{self} error: {cancellation_details.error_details}") -class AzureSTTService(AsyncAIService): +class AzureSTTService(STTService): def __init__( self, *, @@ -149,15 +147,11 @@ def __init__( speech_config=speech_config, audio_config=audio_config) self._speech_recognizer.recognized.connect(self._on_handle_recognized) - async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - - if isinstance(frame, SystemFrame): - await self.push_frame(frame, direction) - elif isinstance(frame, AudioRawFrame): - self._audio_stream.write(frame.audio) - else: - await self._push_queue.put((frame, direction)) + async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]: + await self.start_processing_metrics() + self._audio_stream.write(audio) + await self.stop_processing_metrics() + yield None async def start(self, frame: StartFrame): await super().start(frame) @@ -176,7 +170,7 @@ async def cancel(self, frame: CancelFrame): def _on_handle_recognized(self, event): if event.result.reason == ResultReason.RecognizedSpeech and len(event.result.text) > 0: frame = TranscriptionFrame(event.result.text, "", time_now_iso8601()) - asyncio.run_coroutine_threadsafe(self.queue_frame(frame), self.get_event_loop()) + asyncio.run_coroutine_threadsafe(self.push_frame(frame), self.get_event_loop()) class AzureImageGenServiceREST(ImageGenService): diff --git a/src/pipecat/services/cartesia.py b/src/pipecat/services/cartesia.py index ea790fab7..078926235 100644 --- a/src/pipecat/services/cartesia.py +++ b/src/pipecat/services/cartesia.py @@ -8,7 +8,6 @@ import uuid import base64 import asyncio -import time from typing import AsyncGenerator @@ -22,17 +21,17 @@ EndFrame, TTSStartedFrame, TTSStoppedFrame, - TextFrame, LLMFullResponseEndFrame ) from pipecat.processors.frame_processor import FrameDirection from pipecat.transcriptions.language import Language -from pipecat.services.ai_services import AsyncWordTTSService +from pipecat.services.ai_services import AsyncWordTTSService, TTSService from 
loguru import logger # See .env.example for Cartesia configuration needed try: + from cartesia import AsyncCartesia import websockets except ModuleNotFoundError as e: logger.error(f"Exception: {e}") @@ -165,7 +164,7 @@ async def _handle_interruption(self, frame: StartInterruptionFrame, direction: F async def flush_audio(self): if not self._context_id or not self._websocket: return - logger.debug("Flushing audio") + logger.trace("Flushing audio") msg = { "transcript": "", "continue": False, @@ -257,3 +256,84 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: yield None except Exception as e: logger.error(f"{self} exception: {e}") + + +class CartesiaHttpTTSService(TTSService): + + def __init__( + self, + *, + api_key: str, + voice_id: str, + model_id: str = "sonic-english", + base_url: str = "https://api.cartesia.ai", + encoding: str = "pcm_s16le", + sample_rate: int = 16000, + language: str = "en", + **kwargs): + super().__init__(**kwargs) + + self._api_key = api_key + self._voice_id = voice_id + self._model_id = model_id + self._output_format = { + "container": "raw", + "encoding": encoding, + "sample_rate": sample_rate, + } + self._language = language + + self._client = AsyncCartesia(api_key=api_key, base_url=base_url) + + def can_generate_metrics(self) -> bool: + return True + + async def set_model(self, model: str): + logger.debug(f"Switching TTS model to: [{model}]") + self._model_id = model + + async def set_voice(self, voice: str): + logger.debug(f"Switching TTS voice to: [{voice}]") + self._voice_id = voice + + async def set_language(self, language: Language): + logger.debug(f"Switching TTS language to: [{language}]") + self._language = language_to_cartesia_language(language) + + async def stop(self, frame: EndFrame): + await super().stop(frame) + await self._client.close() + + async def cancel(self, frame: CancelFrame): + await super().cancel(frame) + await self._client.close() + + async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: + logger.debug(f"Generating TTS: [{text}]") + + await self.push_frame(TTSStartedFrame()) + await self.start_ttfb_metrics() + + try: + output = await self._client.tts.sse( + model_id=self._model_id, + transcript=text, + voice_id=self._voice_id, + output_format=self._output_format, + language=self._language, + stream=False + ) + + await self.stop_ttfb_metrics() + + frame = AudioRawFrame( + audio=output["audio"], + sample_rate=self._output_format["sample_rate"], + num_channels=1 + ) + yield frame + except Exception as e: + logger.error(f"{self} exception: {e}") + + await self.start_tts_usage_metrics(text) + await self.push_frame(TTSStoppedFrame()) diff --git a/src/pipecat/services/deepgram.py b/src/pipecat/services/deepgram.py index d899d4bdb..708c3c511 100644 --- a/src/pipecat/services/deepgram.py +++ b/src/pipecat/services/deepgram.py @@ -161,8 +161,8 @@ async def cancel(self, frame: CancelFrame): async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]: await self.start_processing_metrics() await self._connection.send(audio) - yield None await self.stop_processing_metrics() + yield None async def _connect(self): if await self._connection.start(self._live_options): diff --git a/src/pipecat/services/fal.py b/src/pipecat/services/fal.py index 4d99f6066..672135d02 100644 --- a/src/pipecat/services/fal.py +++ b/src/pipecat/services/fal.py @@ -43,8 +43,9 @@ def __init__( aiohttp_session: aiohttp.ClientSession, model: str = "fal-ai/fast-sdxl", key: str | None = None, + **kwargs ): - super().__init__() + 
super().__init__(**kwargs) self._model = model self._params = params self._aiohttp_session = aiohttp_session diff --git a/src/pipecat/services/gladia.py b/src/pipecat/services/gladia.py index 886300897..ead8f63dc 100644 --- a/src/pipecat/services/gladia.py +++ b/src/pipecat/services/gladia.py @@ -7,20 +7,17 @@ import base64 import json -from typing import Optional +from typing import AsyncGenerator, Optional from pydantic.main import BaseModel from pipecat.frames.frames import ( - AudioRawFrame, CancelFrame, EndFrame, Frame, InterimTranscriptionFrame, StartFrame, - SystemFrame, TranscriptionFrame) -from pipecat.processors.frame_processor import FrameDirection -from pipecat.services.ai_services import AsyncAIService +from pipecat.services.ai_services import STTService from pipecat.utils.time import time_now_iso8601 from loguru import logger @@ -35,7 +32,7 @@ raise Exception(f"Missing module: {e}") -class GladiaSTTService(AsyncAIService): +class GladiaSTTService(STTService): class InputParams(BaseModel): sample_rate: Optional[int] = 16000 language: Optional[str] = "english" @@ -50,23 +47,13 @@ def __init__(self, confidence: float = 0.5, params: InputParams = InputParams(), **kwargs): - super().__init__(**kwargs) + super().__init__(sync=False, **kwargs) self._api_key = api_key self._url = url self._params = params self._confidence = confidence - async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - - if isinstance(frame, SystemFrame): - await self.push_frame(frame, direction) - elif isinstance(frame, AudioRawFrame): - await self._send_audio(frame) - else: - await self.queue_frame(frame, direction) - async def start(self, frame: StartFrame): await super().start(frame) self._websocket = await websockets.connect(self._url) @@ -81,6 +68,12 @@ async def cancel(self, frame: CancelFrame): await super().cancel(frame) await self._websocket.close() + async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]: + await self.start_processing_metrics() + await self._send_audio(audio) + await self.stop_processing_metrics() + yield None + async def _setup_gladia(self): configuration = { "x_gladia_key": self._api_key, @@ -92,9 +85,9 @@ async def _setup_gladia(self): await self._websocket.send(json.dumps(configuration)) - async def _send_audio(self, frame: AudioRawFrame): + async def _send_audio(self, audio: bytes): message = { - 'frames': base64.b64encode(frame.audio).decode("utf-8") + 'frames': base64.b64encode(audio).decode("utf-8") } await self._websocket.send(json.dumps(message)) @@ -113,6 +106,6 @@ async def _receive_task_handler(self): transcript = utterance["transcription"] if confidence >= self._confidence: if type == "final": - await self.queue_frame(TranscriptionFrame(transcript, "", time_now_iso8601())) + await self.push_frame(TranscriptionFrame(transcript, "", time_now_iso8601())) else: - await self.queue_frame(InterimTranscriptionFrame(transcript, "", time_now_iso8601())) + await self.push_frame(InterimTranscriptionFrame(transcript, "", time_now_iso8601())) diff --git a/src/pipecat/services/lmnt.py b/src/pipecat/services/lmnt.py index 638e394a1..60f0cb7df 100644 --- a/src/pipecat/services/lmnt.py +++ b/src/pipecat/services/lmnt.py @@ -46,7 +46,7 @@ def __init__( **kwargs): # Let TTSService produce TTSStoppedFrames after a short delay of # no activity. 
- super().__init__(push_stop_frames=True, sample_rate=sample_rate, **kwargs) + super().__init__(sync=False, push_stop_frames=True, sample_rate=sample_rate, **kwargs) self._api_key = api_key self._voice_id = voice_id diff --git a/src/pipecat/services/moondream.py b/src/pipecat/services/moondream.py index cff8d3172..3441aeeb9 100644 --- a/src/pipecat/services/moondream.py +++ b/src/pipecat/services/moondream.py @@ -46,12 +46,13 @@ def detect_device(): class MoondreamService(VisionService): def __init__( self, - *, + *, model="vikhyatk/moondream2", - revision="2024-04-02", - use_cpu=False + revision="2024-08-26", + use_cpu=False, + **kwargs ): - super().__init__() + super().__init__(**kwargs) if not use_cpu: device, dtype = detect_device() diff --git a/src/pipecat/transports/base_input.py b/src/pipecat/transports/base_input.py index 3d1d0c4d7..de1ec8884 100644 --- a/src/pipecat/transports/base_input.py +++ b/src/pipecat/transports/base_input.py @@ -31,16 +31,12 @@ class BaseInputTransport(FrameProcessor): def __init__(self, params: TransportParams, **kwargs): - super().__init__(**kwargs) + super().__init__(sync=False, **kwargs) self._params = params self._executor = ThreadPoolExecutor(max_workers=5) - # Create push frame task. This is the task that will push frames in - # order. We also guarantee that all frames are pushed in the same task. - self._create_push_task() - async def start(self, frame: StartFrame): # Create audio input queue and task if needed. if self._params.audio_in_enabled or self._params.vad_enabled: @@ -53,10 +49,6 @@ async def stop(self, frame: EndFrame): self._audio_task.cancel() await self._audio_task - # Wait for the push frame task to finish. It will finish when the - # EndFrame is actually processed. - await self._push_frame_task - async def cancel(self, frame: CancelFrame): # Cancel all the tasks and wait for them to finish. @@ -64,9 +56,6 @@ async def cancel(self, frame: CancelFrame): self._audio_task.cancel() await self._audio_task - self._push_frame_task.cancel() - await self._push_frame_task - def vad_analyzer(self) -> VADAnalyzer | None: return self._params.vad_analyzer @@ -82,28 +71,25 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) # Specific system frames - if isinstance(frame, CancelFrame): + if isinstance(frame, StartFrame): + # Push StartFrame before start(), because we want StartFrame to be + # processed by every processor before any other frame is processed. + await self.push_frame(frame, direction) + await self.start(frame) + elif isinstance(frame, CancelFrame): await self.cancel(frame) await self.push_frame(frame, direction) elif isinstance(frame, BotInterruptionFrame): - await self._handle_interruptions(frame, False) - elif isinstance(frame, StartInterruptionFrame): + logger.debug("Bot interruption") await self._start_interruption() - elif isinstance(frame, StopInterruptionFrame): - await self._stop_interruption() # All other system frames elif isinstance(frame, SystemFrame): await self.push_frame(frame, direction) # Control frames - elif isinstance(frame, StartFrame): - # Push StartFrame before start(), because we want StartFrame to be - # processed by every processor before any other frame is processed. - await self._internal_push_frame(frame, direction) - await self.start(frame) elif isinstance(frame, EndFrame): # Push EndFrame before stop(), because stop() waits on the task to # finish and the task finishes when EndFrame is processed. 
- await self._internal_push_frame(frame, direction) + await self.push_frame(frame, direction) await self.stop(frame) elif isinstance(frame, VADParamsUpdateFrame): vad_analyzer = self.vad_analyzer() @@ -111,73 +97,28 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): vad_analyzer.set_params(frame.params) # Other frames else: - await self._internal_push_frame(frame, direction) - - # - # Push frames task - # - - def _create_push_task(self): - loop = self.get_event_loop() - self._push_queue = asyncio.Queue() - self._push_frame_task = loop.create_task(self._push_frame_task_handler()) - - async def _internal_push_frame( - self, - frame: Frame | None, - direction: FrameDirection | None = FrameDirection.DOWNSTREAM): - await self._push_queue.put((frame, direction)) - - async def _push_frame_task_handler(self): - running = True - while running: - try: - (frame, direction) = await self._push_queue.get() - await self.push_frame(frame, direction) - running = not isinstance(frame, EndFrame) - self._push_queue.task_done() - except asyncio.CancelledError: - break + await self.push_frame(frame, direction) # # Handle interruptions # - async def _start_interruption(self): - if not self.interruptions_allowed: - return - - # Cancel the task. This will stop pushing frames downstream. - self._push_frame_task.cancel() - await self._push_frame_task - # Push an out-of-band frame (i.e. not using the ordered push - # frame task) to stop everything, specially at the output - # transport. - await self.push_frame(StartInterruptionFrame()) - # Create a new queue and task. - self._create_push_task() - - async def _stop_interruption(self): - if not self.interruptions_allowed: - return - - await self.push_frame(StopInterruptionFrame()) - - async def _handle_interruptions(self, frame: Frame, push_frame: bool): + async def _handle_interruptions(self, frame: Frame): if self.interruptions_allowed: - # Make sure we notify about interruptions quickly out-of-band - if isinstance(frame, BotInterruptionFrame): - logger.debug("Bot interruption") - await self._start_interruption() - elif isinstance(frame, UserStartedSpeakingFrame): + # Make sure we notify about interruptions quickly out-of-band. + if isinstance(frame, UserStartedSpeakingFrame): logger.debug("User started speaking") await self._start_interruption() + # Push an out-of-band frame (i.e. not using the ordered push + # frame task) to stop everything, especially at the output + # transport. + await self.push_frame(StartInterruptionFrame()) elif isinstance(frame, UserStoppedSpeakingFrame): logger.debug("User stopped speaking") await self._stop_interruption() + await self.push_frame(StopInterruptionFrame()) - if push_frame: - await self._internal_push_frame(frame) + await self.push_frame(frame) # # Audio input @@ -201,7 +142,7 @@ async def _handle_vad(self, audio_frames: bytes, vad_state: VADState): frame = UserStoppedSpeakingFrame() if frame: - await self._handle_interruptions(frame, True) + await self._handle_interruptions(frame) vad_state = new_vad_state return vad_state @@ -222,7 +163,7 @@ async def _audio_task_handler(self): # Push audio downstream if passthrough. 
if audio_passthrough: - await self._internal_push_frame(frame) + await self.push_frame(frame) self._audio_in_queue.task_done() except asyncio.CancelledError: diff --git a/src/pipecat/transports/base_output.py b/src/pipecat/transports/base_output.py index a24c9f4d2..9b1b9c29e 100644 --- a/src/pipecat/transports/base_output.py +++ b/src/pipecat/transports/base_output.py @@ -43,7 +43,7 @@ class BaseOutputTransport(FrameProcessor): def __init__(self, params: TransportParams, **kwargs): - super().__init__(**kwargs) + super().__init__(sync=False, **kwargs) self._params = params @@ -70,10 +70,6 @@ def __init__(self, params: TransportParams, **kwargs): # generating frames upstream while, for example, the audio is playing. self._create_sink_tasks() - # Create push frame task. This is the task that will push frames in - # order. We also guarantee that all frames are pushed in the same task. - self._create_push_task() - async def start(self, frame: StartFrame): # Create camera output queue and task if needed. if self._params.camera_out_enabled: @@ -85,6 +81,13 @@ async def start(self, frame: StartFrame): self._audio_out_task = self.get_event_loop().create_task(self._audio_out_task_handler()) async def stop(self, frame: EndFrame): + # At this point we have enqueued an EndFrame and we need to wait for + # that EndFrame to be processed by the sink tasks. We also need to wait + # for these tasks before cancelling the camera and audio tasks below + # because they might still be rendering. + await self._sink_task + await self._sink_clock_task + # Cancel and wait for the camera output task to finish. if self._params.camera_out_enabled: self._camera_out_task.cancel() @@ -95,23 +98,23 @@ async def stop(self, frame: EndFrame): self._audio_out_task.cancel() await self._audio_out_task - # Wait for the push frame and sink tasks to finish. They will finish when - # the EndFrame is actually processed. - await self._push_frame_task - await self._sink_task - async def cancel(self, frame: CancelFrame): - # Cancel all the tasks and wait for them to finish. + # Since we are cancelling everything, it doesn't matter if we cancel sink + # tasks first or not. + self._sink_task.cancel() + self._sink_clock_task.cancel() + await self._sink_task + await self._sink_clock_task + # Cancel and wait for the camera output task to finish. if self._params.camera_out_enabled: self._camera_out_task.cancel() await self._camera_out_task - self._push_frame_task.cancel() - await self._push_frame_task - - self._sink_task.cancel() - await self._sink_task + # Cancel and wait for the audio output task to finish. + if self._params.audio_out_enabled and self._params.audio_out_is_live: + self._audio_out_task.cancel() + await self._audio_out_task async def send_message(self, frame: TransportMessageFrame): pass @@ -137,7 +140,12 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): # immediately. Other frames require order so they are put in the sink # queue. # - if isinstance(frame, CancelFrame): + if isinstance(frame, StartFrame): + # Push StartFrame before start(), because we want StartFrame to be + # processed by every processor before any other frame is processed.
+ await self.push_frame(frame, direction) + await self.start(frame) + elif isinstance(frame, CancelFrame): await self.cancel(frame) await self.push_frame(frame, direction) elif isinstance(frame, StartInterruptionFrame) or isinstance(frame, StopInterruptionFrame): @@ -149,9 +157,6 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): elif isinstance(frame, SystemFrame): await self.push_frame(frame, direction) # Control frames. - elif isinstance(frame, StartFrame): - await self._sink_queue.put(frame) - await self.start(frame) elif isinstance(frame, EndFrame): await self._sink_clock_queue.put((sys.maxsize, frame.id, frame)) await self._sink_queue.put(frame) @@ -182,10 +187,6 @@ async def _handle_interruptions(self, frame: Frame): await self._sink_clock_task # Create sink tasks. self._create_sink_tasks() - # Stop push task. - self._push_frame_task.cancel() - await self._push_frame_task - self._create_push_task() # Let's send a bot stopped speaking if we have to. if self._bot_speaking: await self._bot_stopped_speaking() @@ -227,7 +228,7 @@ def _create_sink_tasks(self): async def _sink_frame_handler(self, frame: Frame): if isinstance(frame, AudioRawFrame): await self.write_raw_audio_frames(frame.audio) - await self._internal_push_frame(frame) + await self.push_frame(frame) await self.push_frame(BotSpeakingFrame(), FrameDirection.UPSTREAM) elif isinstance(frame, ImageRawFrame): await self._set_camera_image(frame) @@ -237,12 +238,12 @@ async def _sink_frame_handler(self, frame: Frame): await self.send_message(frame) elif isinstance(frame, TTSStartedFrame): await self._bot_started_speaking() - await self._internal_push_frame(frame) + await self.push_frame(frame) elif isinstance(frame, TTSStoppedFrame): await self._bot_stopped_speaking() - await self._internal_push_frame(frame) + await self.push_frame(frame) else: - await self._internal_push_frame(frame) + await self.push_frame(frame) async def _sink_task_handler(self): running = True @@ -261,7 +262,7 @@ async def _sink_clock_frame_handler(self, frame: Frame): # TODO(aleix): For now we just process TextFrame. But we should process # audio and video as well. if isinstance(frame, TextFrame): - await self._internal_push_frame(frame) + await self.push_frame(frame) async def _sink_clock_task_handler(self): running = True @@ -269,7 +270,7 @@ async def _sink_clock_task_handler(self): try: timestamp, _, frame = await self._sink_clock_queue.get() - # If we hit an EndFrame, we cna finish right away. + # If we hit an EndFrame, we can finish right away. running = not isinstance(frame, EndFrame) # If we have a frame we check it's presentation timestamp. 
If it @@ -293,38 +294,12 @@ async def _sink_clock_task_handler(self): async def _bot_started_speaking(self): logger.debug("Bot started speaking") self._bot_speaking = True - await self._internal_push_frame(BotStartedSpeakingFrame(), FrameDirection.UPSTREAM) + await self.push_frame(BotStartedSpeakingFrame(), FrameDirection.UPSTREAM) async def _bot_stopped_speaking(self): logger.debug("Bot stopped speaking") self._bot_speaking = False - await self._internal_push_frame(BotStoppedSpeakingFrame(), FrameDirection.UPSTREAM) - - # - # Push frames task - # - - def _create_push_task(self): - loop = self.get_event_loop() - self._push_queue = asyncio.Queue() - self._push_frame_task = loop.create_task(self._push_frame_task_handler()) - - async def _internal_push_frame( - self, - frame: Frame | None, - direction: FrameDirection | None = FrameDirection.DOWNSTREAM): - await self._push_queue.put((frame, direction)) - - async def _push_frame_task_handler(self): - running = True - while running: - try: - (frame, direction) = await self._push_queue.get() - await self.push_frame(frame, direction) - running = not isinstance(frame, EndFrame) - self._push_queue.task_done() - except asyncio.CancelledError: - break + await self.push_frame(BotStoppedSpeakingFrame(), FrameDirection.UPSTREAM) # # Camera out @@ -363,9 +338,9 @@ async def _camera_out_task_handler(self): elif self._camera_images: image = next(self._camera_images) await self._draw_image(image) - await asyncio.sleep(1.0 / self._params.camera_out_framerate) + await asyncio.sleep(self._camera_out_frame_duration) else: - await asyncio.sleep(1.0 / self._params.camera_out_framerate) + await asyncio.sleep(self._camera_out_frame_duration) except asyncio.CancelledError: break except Exception as e: @@ -408,7 +383,7 @@ async def _audio_out_task_handler(self): try: frame = await self._audio_out_queue.get() await self.write_raw_audio_frames(frame.audio) - await self._internal_push_frame(frame) + await self.push_frame(frame) await self.push_frame(BotSpeakingFrame(), FrameDirection.UPSTREAM) except asyncio.CancelledError: break diff --git a/src/pipecat/transports/local/tk.py b/src/pipecat/transports/local/tk.py index 588e7d2ae..e7dc04902 100644 --- a/src/pipecat/transports/local/tk.py +++ b/src/pipecat/transports/local/tk.py @@ -141,12 +141,12 @@ def __init__(self, tk_root: tk.Tk, params: TransportParams): # BaseTransport # - def input(self) -> FrameProcessor: + def input(self) -> TkInputTransport: if not self._input: self._input = TkInputTransport(self._pyaudio, self._params) return self._input - def output(self) -> FrameProcessor: + def output(self) -> TkOutputTransport: if not self._output: self._output = TkOutputTransport(self._tk_root, self._pyaudio, self._params) return self._output diff --git a/src/pipecat/transports/network/fastapi_websocket.py b/src/pipecat/transports/network/fastapi_websocket.py index 2c4bd187b..7169c73bd 100644 --- a/src/pipecat/transports/network/fastapi_websocket.py +++ b/src/pipecat/transports/network/fastapi_websocket.py @@ -164,10 +164,10 @@ def __init__( self._register_event_handler("on_client_connected") self._register_event_handler("on_client_disconnected") - def input(self) -> FrameProcessor: + def input(self) -> FastAPIWebsocketInputTransport: return self._input - def output(self) -> FrameProcessor: + def output(self) -> FastAPIWebsocketOutputTransport: return self._output async def _on_client_connected(self, websocket): diff --git a/src/pipecat/transports/network/websocket_server.py 
b/src/pipecat/transports/network/websocket_server.py index 08ca9719a..c17818898 100644 --- a/src/pipecat/transports/network/websocket_server.py +++ b/src/pipecat/transports/network/websocket_server.py @@ -98,9 +98,9 @@ async def _client_handler(self, websocket: websockets.WebSocketServerProtocol, p continue if isinstance(frame, AudioRawFrame): - await self.push_audio_frame(frame) + await self.queue_audio_frame(frame) else: - await self._internal_push_frame(frame) + await self.push_frame(frame) # Notify disconnection await self._callbacks.on_client_disconnected(websocket) @@ -190,13 +190,13 @@ def __init__( self._register_event_handler("on_client_connected") self._register_event_handler("on_client_disconnected") - def input(self) -> FrameProcessor: + def input(self) -> WebsocketServerInputTransport: if not self._input: self._input = WebsocketServerInputTransport( self._host, self._port, self._params, self._callbacks, name=self._input_name) return self._input - def output(self) -> FrameProcessor: + def output(self) -> WebsocketServerOutputTransport: if not self._output: self._output = WebsocketServerOutputTransport(self._params, name=self._output_name) return self._output diff --git a/src/pipecat/transports/services/daily.py b/src/pipecat/transports/services/daily.py index 7cf330b9e..2a45adf36 100644 --- a/src/pipecat/transports/services/daily.py +++ b/src/pipecat/transports/services/daily.py @@ -625,11 +625,11 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): # async def push_transcription_frame(self, frame: TranscriptionFrame | InterimTranscriptionFrame): - await self._internal_push_frame(frame) + await self.push_frame(frame) async def push_app_message(self, message: Any, sender: str): frame = DailyTransportMessageFrame(message=message, participant_id=sender) - await self._internal_push_frame(frame) + await self.push_frame(frame) # # Audio in @@ -692,7 +692,7 @@ async def _on_participant_video_frame(self, participant_id: str, buffer, size, f image=buffer, size=size, format=format) - await self._internal_push_frame(frame) + await self.push_frame(frame) self._video_renderers[participant_id]["timestamp"] = curr_time @@ -811,12 +811,12 @@ def __init__( # BaseTransport # - def input(self) -> FrameProcessor: + def input(self) -> DailyInputTransport: if not self._input: self._input = DailyInputTransport(self._client, self._params, name=self._input_name) return self._input - def output(self) -> FrameProcessor: + def output(self) -> DailyOutputTransport: if not self._output: self._output = DailyOutputTransport(self._client, self._params, name=self._output_name) return self._output
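The transport diffs above all delete per-class `_create_push_task()`/`_internal_push_frame()` machinery and instead pass `sync=False` to the `FrameProcessor` base class, which now owns the single ordered push task. A minimal sketch of an asynchronous processor under the new model (the `TickerProcessor` class and its "tick" payload are hypothetical, shown only to illustrate the pattern):

```python
import asyncio

from pipecat.frames.frames import CancelFrame, EndFrame, Frame, StartFrame, TextFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor


class TickerProcessor(FrameProcessor):
    """Hypothetical asynchronous processor. It emits frames from an internal
    task, so it opts out of synchronous mode with sync=False and relies on
    the base class to serialize its output through a single push task."""

    def __init__(self, **kwargs):
        super().__init__(sync=False, **kwargs)
        self._ticker_task = None

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)
        if isinstance(frame, StartFrame):
            # Start the internal task once the pipeline is running.
            self._ticker_task = self.get_event_loop().create_task(self._ticker_handler())
        elif isinstance(frame, (CancelFrame, EndFrame)) and self._ticker_task:
            # Mirror the transports above: cancel the internal task and wait
            # for it to finish before letting the frame continue.
            self._ticker_task.cancel()
            await self._ticker_task
        await self.push_frame(frame, direction)

    async def _ticker_handler(self):
        while True:
            try:
                await asyncio.sleep(1.0)
                # Safe to call from an internal task: with sync=False the base
                # class funnels every pushed frame through one ordered task.
                await self.push_frame(TextFrame("tick"))
            except asyncio.CancelledError:
                break
```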
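For context on the `_sink_clock_task_handler` changes in `base_output.py`: frames carrying a presentation timestamp are held until the pipeline clock catches up, and `EndFrame` is enqueued with `sys.maxsize` so it always sorts after every timestamped frame. A rough sketch of the waiting step, assuming the clock exposes a `get_time()` that reports nanoseconds (as `SystemClock` does) and with `wait_until_pts` as a made-up helper name:

```python
import asyncio

# Hold a frame until the pipeline clock reaches its presentation timestamp.
# Assumes nanosecond clock resolution; both names here are illustrative.
async def wait_until_pts(clock, timestamp: int):
    current_time = clock.get_time()
    if timestamp > current_time:
        await asyncio.sleep((timestamp - current_time) / 1_000_000_000)
```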
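One practical effect of the `input()`/`output()` return-type changes across the transports: callers and type checkers now see the concrete transport subclass instead of a generic `FrameProcessor`, so no casts are needed to reach subclass-specific functionality. A small illustrative snippet (the room URL is a placeholder):

```python
from pipecat.transports.services.daily import (
    DailyInputTransport,
    DailyParams,
    DailyTransport,
)

room_url = "https://example.daily.co/room"  # placeholder

transport = DailyTransport(room_url, None, "Typed Bot", DailyParams(audio_in_enabled=True))

# The annotation below is now redundant rather than a cast: input() is
# declared to return DailyInputTransport, so the concrete type flows through.
daily_input: DailyInputTransport = transport.input()
```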