From 7c516e0fe510331fee58927efea12c450a428b9b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Thu, 12 Dec 2024 17:40:01 -0800 Subject: [PATCH] transports: disconnect client first --- .../transports/network/websocket_server.py | 6 ++-- src/pipecat/transports/services/daily.py | 16 +++++----- src/pipecat/transports/services/livekit.py | 31 +++---------------- 3 files changed, 16 insertions(+), 37 deletions(-) diff --git a/src/pipecat/transports/network/websocket_server.py b/src/pipecat/transports/network/websocket_server.py index 711bc7596..ce9b9614d 100644 --- a/src/pipecat/transports/network/websocket_server.py +++ b/src/pipecat/transports/network/websocket_server.py @@ -69,18 +69,18 @@ def __init__( self._stop_server_event = asyncio.Event() async def start(self, frame: StartFrame): - self._server_task = self.get_event_loop().create_task(self._server_task_handler()) await super().start(frame) + self._server_task = self.get_event_loop().create_task(self._server_task_handler()) async def stop(self, frame: EndFrame): - await super().stop(frame) self._stop_server_event.set() await self._server_task + await super().stop(frame) async def cancel(self, frame: CancelFrame): - await super().cancel(frame) self._stop_server_event.set() await self._server_task + await super().cancel(frame) async def _server_task_handler(self): logger.info(f"Starting websocket server on {self._host}:{self._port}") diff --git a/src/pipecat/transports/services/daily.py b/src/pipecat/transports/services/daily.py index 7456ef816..44c9f37ee 100644 --- a/src/pipecat/transports/services/daily.py +++ b/src/pipecat/transports/services/daily.py @@ -694,8 +694,6 @@ async def start(self, frame: StartFrame): self._audio_in_task = self.get_event_loop().create_task(self._audio_in_task_handler()) async def stop(self, frame: EndFrame): - # Parent stop. - await super().stop(frame) # Leave the room. await self._client.leave() # Stop audio thread. @@ -703,10 +701,10 @@ async def stop(self, frame: EndFrame): self._audio_in_task.cancel() await self._audio_in_task self._audio_in_task = None + # Parent stop. + await super().stop(frame) async def cancel(self, frame: CancelFrame): - # Parent stop. - await super().cancel(frame) # Leave the room. await self._client.leave() # Stop audio thread. @@ -714,6 +712,8 @@ async def cancel(self, frame: CancelFrame): self._audio_in_task.cancel() await self._audio_in_task self._audio_in_task = None + # Parent stop. + await super().cancel(frame) async def cleanup(self): await super().cleanup() @@ -817,16 +817,16 @@ async def start(self, frame: StartFrame): await self._client.join() async def stop(self, frame: EndFrame): - # Parent stop. - await super().stop(frame) # Leave the room. await self._client.leave() + # Parent stop. + await super().stop(frame) async def cancel(self, frame: CancelFrame): - # Parent stop. - await super().cancel(frame) # Leave the room. await self._client.leave() + # Parent stop. + await super().cancel(frame) async def cleanup(self): await super().cleanup() diff --git a/src/pipecat/transports/services/livekit.py b/src/pipecat/transports/services/livekit.py index f53c8332f..ded16182f 100644 --- a/src/pipecat/transports/services/livekit.py +++ b/src/pipecat/transports/services/livekit.py @@ -324,28 +324,19 @@ async def start(self, frame: StartFrame): logger.info("LiveKitInputTransport started") async def stop(self, frame: EndFrame): + await self._client.disconnect() if self._audio_in_task: self._audio_in_task.cancel() - try: - await self._audio_in_task - except asyncio.CancelledError: - pass + await self._audio_in_task await super().stop(frame) - await self._client.disconnect() logger.info("LiveKitInputTransport stopped") - async def process_frame(self, frame: Frame, direction: FrameDirection): - if isinstance(frame, EndFrame): - await self.stop(frame) - else: - await super().process_frame(frame, direction) - async def cancel(self, frame: CancelFrame): - await super().cancel(frame) await self._client.disconnect() if self._audio_in_task and (self._params.audio_in_enabled or self._params.vad_enabled): self._audio_in_task.cancel() await self._audio_in_task + await super().cancel(frame) def vad_analyzer(self) -> VADAnalyzer | None: return self._vad_analyzer @@ -407,19 +398,13 @@ async def start(self, frame: StartFrame): logger.info("LiveKitOutputTransport started") async def stop(self, frame: EndFrame): - await super().stop(frame) await self._client.disconnect() + await super().stop(frame) logger.info("LiveKitOutputTransport stopped") - async def process_frame(self, frame: Frame, direction: FrameDirection): - if isinstance(frame, EndFrame): - await self.stop(frame) - else: - await super().process_frame(frame, direction) - async def cancel(self, frame: CancelFrame): - await super().cancel(frame) await self._client.disconnect() + await super().cancel(frame) async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame): if isinstance(frame, (LiveKitTransportMessageFrame, LiveKitTransportMessageUrgentFrame)): @@ -526,12 +511,6 @@ async def _on_connected(self): async def _on_disconnected(self): await self._call_event_handler("on_disconnected") - # Attempt to reconnect - try: - await self._client.connect() - await self._call_event_handler("on_connected") - except Exception as e: - logger.error(f"Failed to reconnect: {e}") async def _on_participant_connected(self, participant_id: str): await self._call_event_handler("on_participant_connected", participant_id)