From 5bfcac1f5ca383040ccfe4a34e51334b7be3404f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Tue, 17 Dec 2024 16:02:33 -0800 Subject: [PATCH] transports: call parent stop() before disconnecting This rollbacks a previous change https://github.com/pipecat-ai/pipecat/pull/855 which was trying to fix an issue in the wrong way. The reasoning behind this fix is that the parent class might be sending audio or messages (through the subclass) and if we disconnect before all the data is sent we will run into incomplete audio or even errors. Therefore, we first make sure the parent tasks stop and then it will be safe to disconnect. --- .../transports/network/websocket_server.py | 4 ++-- src/pipecat/transports/services/daily.py | 16 ++++++++-------- src/pipecat/transports/services/livekit.py | 8 ++++---- 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/src/pipecat/transports/network/websocket_server.py b/src/pipecat/transports/network/websocket_server.py index beac5a3ee..58d104038 100644 --- a/src/pipecat/transports/network/websocket_server.py +++ b/src/pipecat/transports/network/websocket_server.py @@ -72,14 +72,14 @@ async def start(self, frame: StartFrame): self._server_task = self.get_event_loop().create_task(self._server_task_handler()) async def stop(self, frame: EndFrame): + await super().stop(frame) self._stop_server_event.set() await self._server_task - await super().stop(frame) async def cancel(self, frame: CancelFrame): + await super().cancel(frame) self._stop_server_event.set() await self._server_task - await super().cancel(frame) async def _server_task_handler(self): logger.info(f"Starting websocket server on {self._host}:{self._port}") diff --git a/src/pipecat/transports/services/daily.py b/src/pipecat/transports/services/daily.py index 44c9f37ee..7456ef816 100644 --- a/src/pipecat/transports/services/daily.py +++ b/src/pipecat/transports/services/daily.py @@ -694,6 +694,8 @@ async def start(self, frame: StartFrame): self._audio_in_task = self.get_event_loop().create_task(self._audio_in_task_handler()) async def stop(self, frame: EndFrame): + # Parent stop. + await super().stop(frame) # Leave the room. await self._client.leave() # Stop audio thread. @@ -701,10 +703,10 @@ async def stop(self, frame: EndFrame): self._audio_in_task.cancel() await self._audio_in_task self._audio_in_task = None - # Parent stop. - await super().stop(frame) async def cancel(self, frame: CancelFrame): + # Parent stop. + await super().cancel(frame) # Leave the room. await self._client.leave() # Stop audio thread. @@ -712,8 +714,6 @@ async def cancel(self, frame: CancelFrame): self._audio_in_task.cancel() await self._audio_in_task self._audio_in_task = None - # Parent stop. - await super().cancel(frame) async def cleanup(self): await super().cleanup() @@ -817,16 +817,16 @@ async def start(self, frame: StartFrame): await self._client.join() async def stop(self, frame: EndFrame): - # Leave the room. - await self._client.leave() # Parent stop. await super().stop(frame) - - async def cancel(self, frame: CancelFrame): # Leave the room. await self._client.leave() + + async def cancel(self, frame: CancelFrame): # Parent stop. await super().cancel(frame) + # Leave the room. + await self._client.leave() async def cleanup(self): await super().cleanup() diff --git a/src/pipecat/transports/services/livekit.py b/src/pipecat/transports/services/livekit.py index 21a91f664..81a0ffdd1 100644 --- a/src/pipecat/transports/services/livekit.py +++ b/src/pipecat/transports/services/livekit.py @@ -323,19 +323,19 @@ async def start(self, frame: StartFrame): logger.info("LiveKitInputTransport started") async def stop(self, frame: EndFrame): + await super().stop(frame) await self._client.disconnect() if self._audio_in_task: self._audio_in_task.cancel() await self._audio_in_task - await super().stop(frame) logger.info("LiveKitInputTransport stopped") async def cancel(self, frame: CancelFrame): + await super().cancel(frame) await self._client.disconnect() if self._audio_in_task and (self._params.audio_in_enabled or self._params.vad_enabled): self._audio_in_task.cancel() await self._audio_in_task - await super().cancel(frame) def vad_analyzer(self) -> VADAnalyzer | None: return self._vad_analyzer @@ -397,13 +397,13 @@ async def start(self, frame: StartFrame): logger.info("LiveKitOutputTransport started") async def stop(self, frame: EndFrame): - await self._client.disconnect() await super().stop(frame) + await self._client.disconnect() logger.info("LiveKitOutputTransport stopped") async def cancel(self, frame: CancelFrame): - await self._client.disconnect() await super().cancel(frame) + await self._client.disconnect() async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame): if isinstance(frame, (LiveKitTransportMessageFrame, LiveKitTransportMessageUrgentFrame)):