From 935eb088607dfa132a8b15da9ac6e37c272232bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Wed, 15 May 2024 23:35:15 -0700 Subject: [PATCH] add missing queue task_done() --- src/pipecat/frames/frames.py | 6 +++--- src/pipecat/transports/base_input.py | 2 ++ src/pipecat/transports/base_output.py | 3 +++ 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py index 7fcb0b6c2..ed7ac75e8 100644 --- a/src/pipecat/frames/frames.py +++ b/src/pipecat/frames/frames.py @@ -119,7 +119,7 @@ class TextFrame(DataFrame): text: str def __str__(self): - return f'{self.name}: "{self.text}"' + return f"{self.name}(text: {self.text})" @dataclass @@ -132,7 +132,7 @@ class TranscriptionFrame(TextFrame): timestamp: str def __str__(self): - return f"{self.name}(user: {self.user_id}, timestamp: {self.timestamp})" + return f"{self.name}(user: {self.user_id}, text: {self.text}, timestamp: {self.timestamp})" @dataclass @@ -143,7 +143,7 @@ class InterimTranscriptionFrame(TextFrame): timestamp: str def __str__(self): - return f"{self.name}(user: {self.user_id}, timestamp: {self.timestamp})" + return f"{self.name}(user: {self.user_id}, text: {self.text}, timestamp: {self.timestamp})" @dataclass diff --git a/src/pipecat/transports/base_input.py b/src/pipecat/transports/base_input.py index 5345eda6f..603a10b88 100644 --- a/src/pipecat/transports/base_input.py +++ b/src/pipecat/transports/base_input.py @@ -135,6 +135,8 @@ def _audio_out_thread_handler(self): future = asyncio.run_coroutine_threadsafe( self.push_frame(frame), self.get_event_loop()) future.result() + + self._audio_in_queue.task_done() except queue.Empty: pass except BaseException as e: diff --git a/src/pipecat/transports/base_output.py b/src/pipecat/transports/base_output.py index 520372803..ff9040406 100644 --- a/src/pipecat/transports/base_output.py +++ b/src/pipecat/transports/base_output.py @@ -169,6 +169,8 @@ def _sink_thread_handler(self): # Send any remaining audio self._send_audio_truncated(buffer, bytes_size_10ms) buffer = bytearray() + + self._sink_queue.task_done() except queue.Empty: pass except BaseException as e: @@ -208,6 +210,7 @@ def _camera_out_thread_handler(self): if self._params.camera_out_is_live: image = self._camera_out_queue.get(timeout=1) self._draw_image(image) + self._camera_out_queue.task_done() elif self._camera_images: image = next(self._camera_images) self._draw_image(image)