diff --git a/CHANGELOG.md b/CHANGELOG.md index 4ab3fc67d..c4a005280 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `AWSTTSService` is now deprecated, use `PollyTTSService` instead. +### Fixed + +- Fixed a `BaseOutputTransport` issue that was causing non-audio frames being + processed before the previous audio frames were played. This will allow, for + example, sending a frame `A` after a `TTSSpeakFrame` and the frame `A` will + only be pushed downstream after the audio generated from `TTSSpeakFrame` has + been spoken. + ## [0.0.50] - 2024-12-11 ### Added diff --git a/src/pipecat/transports/base_output.py b/src/pipecat/transports/base_output.py index a9154f4dc..56c86076a 100644 --- a/src/pipecat/transports/base_output.py +++ b/src/pipecat/transports/base_output.py @@ -15,7 +15,6 @@ from pipecat.audio.vad.vad_analyzer import VAD_STOP_SECS from pipecat.frames.frames import ( - AudioRawFrame, BotSpeakingFrame, BotStartedSpeakingFrame, BotStoppedSpeakingFrame, @@ -52,9 +51,7 @@ def __init__(self, params: TransportParams, **kwargs): self._sink_clock_task = None # Task to write/send audio and image frames. - self._audio_out_task = None self._camera_out_task = None - self._running_out_tasks = True # These are the images that we should send to the camera at our desired # framerate. @@ -77,22 +74,31 @@ async def start(self, frame: StartFrame): # Start audio mixer. if self._params.audio_out_mixer: await self._params.audio_out_mixer.start(self._params.audio_out_sample_rate) - self._create_output_tasks() + self._create_camera_task() self._create_sink_tasks() async def stop(self, frame: EndFrame): - # We can't cancel output tasks because there might still be audio - # buffered to be played. - await self._stop_output_tasks() - # Stop audio mixer. - if self._params.audio_out_mixer: - await self._params.audio_out_mixer.stop() + # Let the sink tasks process the queue until they reach this EndFrame. + await self._sink_clock_queue.put((sys.maxsize, frame.id, frame)) + await self._sink_queue.put(frame) + + # At this point we have enqueued an EndFrame and we need to wait for + # that EndFrame to be processed by the sink tasks. We also need to wait + # for these tasks before cancelling the camera and audio tasks below + # because they might be still rendering. + if self._sink_task: + await self._sink_task + if self._sink_clock_task: + await self._sink_clock_task + + # We can now cancel the camera task. + await self._cancel_camera_task() async def cancel(self, frame: CancelFrame): # Since we are cancelling everything it doesn't matter if we cancel sink # tasks first or not. await self._cancel_sink_tasks() - await self._cancel_output_tasks() + await self._cancel_camera_task() async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame): pass @@ -103,6 +109,12 @@ async def write_frame_to_camera(self, frame: OutputImageRawFrame): async def write_raw_audio_frames(self, frames: bytes): pass + async def send_audio(self, frame: OutputAudioRawFrame): + await self.queue_frame(frame, FrameDirection.DOWNSTREAM) + + async def send_image(self, frame: OutputImageRawFrame | SpriteFrame): + await self.queue_frame(frame, FrameDirection.DOWNSTREAM) + # # Frame processor # @@ -130,11 +142,8 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): await self.push_frame(frame, direction) # Control frames. elif isinstance(frame, EndFrame): - # Process sink tasks. - await self._stop_sink_tasks(frame) - # Now we can stop. await self.stop(frame) - # We finally push EndFrame down so PipelineTask stops nicely. + # Keep pushing EndFrame down so all the pipeline stops nicely. await self.push_frame(frame, direction) elif isinstance(frame, MixerControlFrame) and self._params.audio_out_mixer: await self._params.audio_out_mixer.process_frame(frame) @@ -149,30 +158,16 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): else: await self._sink_queue.put(frame) - async def _stop_sink_tasks(self, frame: EndFrame): - # Let the sink tasks process the queue until they reach this EndFrame. - await self._sink_clock_queue.put((sys.maxsize, frame.id, frame)) - await self._sink_queue.put(frame) - - # At this point we have enqueued an EndFrame and we need to wait for - # that EndFrame to be processed by the sink tasks. We also need to wait - # for these tasks before cancelling the camera and audio tasks below - # because they might be still rendering. - if self._sink_task: - await self._sink_task - if self._sink_clock_task: - await self._sink_clock_task - async def _handle_interruptions(self, frame: Frame): if not self.interruptions_allowed: return if isinstance(frame, StartInterruptionFrame): - # Cancel sink and output tasks. + # Cancel sink and camera tasks. await self._cancel_sink_tasks() - await self._cancel_output_tasks() - # Create sink and output tasks. - self._create_output_tasks() + await self._cancel_camera_task() + # Create sink and camera tasks. + self._create_camera_task() self._create_sink_tasks() # Let's send a bot stopped speaking if we have to. await self._bot_stopped_speaking() @@ -181,19 +176,16 @@ async def _handle_audio(self, frame: OutputAudioRawFrame): if not self._params.audio_out_enabled: return - if self._params.audio_out_is_live: - await self._audio_out_queue.put(frame) - else: - cls = type(frame) - self._audio_buffer.extend(frame.audio) - while len(self._audio_buffer) >= self._audio_chunk_size: - chunk = cls( - bytes(self._audio_buffer[: self._audio_chunk_size]), - sample_rate=frame.sample_rate, - num_channels=frame.num_channels, - ) - await self._sink_queue.put(chunk) - self._audio_buffer = self._audio_buffer[self._audio_chunk_size :] + cls = type(frame) + self._audio_buffer.extend(frame.audio) + while len(self._audio_buffer) >= self._audio_chunk_size: + chunk = cls( + bytes(self._audio_buffer[: self._audio_chunk_size]), + sample_rate=frame.sample_rate, + num_channels=frame.num_channels, + ) + await self._sink_queue.put(chunk) + self._audio_buffer = self._audio_buffer[self._audio_chunk_size :] async def _handle_image(self, frame: OutputImageRawFrame | SpriteFrame): if not self._params.camera_out_enabled: @@ -242,9 +234,7 @@ async def _cancel_sink_tasks(self): self._sink_clock_task = None async def _sink_frame_handler(self, frame: Frame): - if isinstance(frame, OutputAudioRawFrame): - await self._audio_out_queue.put(frame) - elif isinstance(frame, OutputImageRawFrame): + if isinstance(frame, OutputImageRawFrame): await self._set_camera_image(frame) elif isinstance(frame, SpriteFrame): await self._set_camera_images(frame.images) @@ -254,19 +244,6 @@ async def _sink_frame_handler(self, frame: Frame): elif not isinstance(frame, EndFrame): await self.push_frame(frame) - async def _sink_task_handler(self): - running = True - while running: - try: - frame = await self._sink_queue.get() - await self._sink_frame_handler(frame) - running = not isinstance(frame, EndFrame) - self._sink_queue.task_done() - except asyncio.CancelledError: - break - except Exception as e: - logger.exception(f"{self} error processing sink queue: {e}") - async def _sink_clock_task_handler(self): running = True while running: @@ -292,48 +269,93 @@ async def _sink_clock_task_handler(self): except Exception as e: logger.exception(f"{self} error processing sink clock queue: {e}") + def _next_frame(self) -> AsyncGenerator[Frame, None]: + async def without_mixer(vad_stop_secs: float) -> AsyncGenerator[Frame, None]: + while True: + try: + frame = await asyncio.wait_for(self._sink_queue.get(), timeout=vad_stop_secs) + yield frame + except asyncio.TimeoutError: + # Notify the bot stopped speaking upstream if necessary. + await self._bot_stopped_speaking() + + async def with_mixer(vad_stop_secs: float) -> AsyncGenerator[Frame, None]: + last_frame_time = 0 + silence = b"\x00" * self._audio_chunk_size + while True: + try: + frame = self._sink_queue.get_nowait() + if isinstance(frame, OutputAudioRawFrame): + frame.audio = await self._params.audio_out_mixer.mix(frame.audio) + last_frame_time = time.time() + yield frame + except asyncio.QueueEmpty: + # Notify the bot stopped speaking upstream if necessary. + diff_time = time.time() - last_frame_time + if diff_time > vad_stop_secs: + await self._bot_stopped_speaking() + # Generate an audio frame with only the mixer's part. + frame = OutputAudioRawFrame( + audio=await self._params.audio_out_mixer.mix(silence), + sample_rate=self._params.audio_out_sample_rate, + num_channels=self._params.audio_out_channels, + ) + yield frame + + vad_stop_secs = ( + self._params.vad_analyzer.params.stop_secs + if self._params.vad_analyzer + else VAD_STOP_SECS + ) + if self._params.audio_out_mixer: + return with_mixer(vad_stop_secs) + else: + return without_mixer(vad_stop_secs) + + async def _sink_task_handler(self): + try: + async for frame in self._next_frame(): + # Notify the bot started speaking upstream if necessary and that + # it's actually speaking. + if isinstance(frame, TTSAudioRawFrame): + await self._bot_started_speaking() + await self.push_frame(BotSpeakingFrame()) + await self.push_frame(BotSpeakingFrame(), FrameDirection.UPSTREAM) + + # Handle frame. + await self._sink_frame_handler(frame) + + # Also, push frame downstream in case anyone else needs it. + await self.push_frame(frame) + + # Send audio. + if isinstance(frame, OutputAudioRawFrame): + await self.write_raw_audio_frames(frame.audio) + + if isinstance(frame, EndFrame): + break + except asyncio.CancelledError: + pass + except Exception as e: + logger.exception(f"{self} error writing to microphone: {e}") + # - # Output tasks + # Camera task # - def _create_output_tasks(self): + def _create_camera_task(self): loop = self.get_event_loop() # Create camera output queue and task if needed. if self._params.camera_out_enabled: self._camera_out_queue = asyncio.Queue() self._camera_out_task = loop.create_task(self._camera_out_task_handler()) - # Create audio output queue and task if needed. - if self._params.audio_out_enabled: - self._audio_out_queue = asyncio.Queue() - self._audio_out_task = loop.create_task(self._audio_out_task_handler()) - async def _stop_output_tasks(self): - self._running_out_tasks = False - # Stop camera output task. - if self._camera_out_task and self._params.camera_out_enabled: - await self._camera_out_task - # Stop audio output task. - if self._audio_out_task and self._params.audio_out_enabled: - await self._audio_out_task - - async def _cancel_output_tasks(self): + async def _cancel_camera_task(self): # Stop camera output task. if self._camera_out_task and self._params.camera_out_enabled: self._camera_out_task.cancel() await self._camera_out_task self._camera_out_task = None - # Stop audio output task. - if self._audio_out_task and self._params.audio_out_enabled: - self._audio_out_task.cancel() - await self._audio_out_task - self._audio_out_task = None - - # - # Camera out - # - - async def send_image(self, frame: OutputImageRawFrame | SpriteFrame): - await self.queue_frame(frame, FrameDirection.DOWNSTREAM) async def _draw_image(self, frame: OutputImageRawFrame): desired_size = (self._params.camera_out_width, self._params.camera_out_height) @@ -359,7 +381,7 @@ async def _camera_out_task_handler(self): self._camera_out_frame_index = 0 self._camera_out_frame_duration = 1 / self._params.camera_out_framerate self._camera_out_frame_reset = self._camera_out_frame_duration * 5 - while self._running_out_tasks: + while True: try: if self._params.camera_out_is_live: await self._camera_out_is_live_handler() @@ -398,74 +420,3 @@ async def _camera_out_is_live_handler(self): await self._draw_image(image) self._camera_out_queue.task_done() - - # - # Audio out - # - - async def send_audio(self, frame: OutputAudioRawFrame): - await self.queue_frame(frame, FrameDirection.DOWNSTREAM) - - def _next_audio_frame(self) -> AsyncGenerator[AudioRawFrame, None]: - async def without_mixer(vad_stop_secs: float) -> AsyncGenerator[AudioRawFrame, None]: - while self._running_out_tasks or self._bot_speaking: - try: - frame = await asyncio.wait_for( - self._audio_out_queue.get(), timeout=vad_stop_secs - ) - yield frame - except asyncio.TimeoutError: - # Notify the bot stopped speaking upstream if necessary. - await self._bot_stopped_speaking() - - async def with_mixer(vad_stop_secs: float) -> AsyncGenerator[AudioRawFrame, None]: - last_frame_time = 0 - silence = b"\x00" * self._audio_chunk_size - while self._running_out_tasks or self._bot_speaking: - try: - frame = self._audio_out_queue.get_nowait() - frame.audio = await self._params.audio_out_mixer.mix(frame.audio) - last_frame_time = time.time() - yield frame - except asyncio.QueueEmpty: - # Notify the bot stopped speaking upstream if necessary. - diff_time = time.time() - last_frame_time - if diff_time > vad_stop_secs: - await self._bot_stopped_speaking() - # Generate an audio frame with only the mixer's part. - frame = OutputAudioRawFrame( - audio=await self._params.audio_out_mixer.mix(silence), - sample_rate=self._params.audio_out_sample_rate, - num_channels=self._params.audio_out_channels, - ) - yield frame - - vad_stop_secs = ( - self._params.vad_analyzer.params.stop_secs - if self._params.vad_analyzer - else VAD_STOP_SECS - ) - if self._params.audio_out_mixer: - return with_mixer(vad_stop_secs) - else: - return without_mixer(vad_stop_secs) - - async def _audio_out_task_handler(self): - try: - async for frame in self._next_audio_frame(): - # Notify the bot started speaking upstream if necessary and that - # it's actually speaking. - if isinstance(frame, TTSAudioRawFrame): - await self._bot_started_speaking() - await self.push_frame(BotSpeakingFrame()) - await self.push_frame(BotSpeakingFrame(), FrameDirection.UPSTREAM) - - # Also, push frame downstream in case anyone else needs it. - await self.push_frame(frame) - - # Send audio. - await self.write_raw_audio_frames(frame.audio) - except asyncio.CancelledError: - pass - except Exception as e: - logger.exception(f"{self} error writing to microphone: {e}")