Skip to content

Commit

Permalink
transports: disconnect client first
Browse files Browse the repository at this point in the history
  • Loading branch information
aconchillo committed Dec 13, 2024
1 parent f8e69cf commit 7c516e0
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 37 deletions.
6 changes: 3 additions & 3 deletions src/pipecat/transports/network/websocket_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,18 +69,18 @@ def __init__(
self._stop_server_event = asyncio.Event()

async def start(self, frame: StartFrame):
self._server_task = self.get_event_loop().create_task(self._server_task_handler())
await super().start(frame)
self._server_task = self.get_event_loop().create_task(self._server_task_handler())

async def stop(self, frame: EndFrame):
await super().stop(frame)
self._stop_server_event.set()
await self._server_task
await super().stop(frame)

async def cancel(self, frame: CancelFrame):
await super().cancel(frame)
self._stop_server_event.set()
await self._server_task
await super().cancel(frame)

async def _server_task_handler(self):
logger.info(f"Starting websocket server on {self._host}:{self._port}")
Expand Down
16 changes: 8 additions & 8 deletions src/pipecat/transports/services/daily.py
Original file line number Diff line number Diff line change
Expand Up @@ -694,26 +694,26 @@ async def start(self, frame: StartFrame):
self._audio_in_task = self.get_event_loop().create_task(self._audio_in_task_handler())

async def stop(self, frame: EndFrame):
# Parent stop.
await super().stop(frame)
# Leave the room.
await self._client.leave()
# Stop audio thread.
if self._audio_in_task and (self._params.audio_in_enabled or self._params.vad_enabled):
self._audio_in_task.cancel()
await self._audio_in_task
self._audio_in_task = None
# Parent stop.
await super().stop(frame)

async def cancel(self, frame: CancelFrame):
# Parent stop.
await super().cancel(frame)
# Leave the room.
await self._client.leave()
# Stop audio thread.
if self._audio_in_task and (self._params.audio_in_enabled or self._params.vad_enabled):
self._audio_in_task.cancel()
await self._audio_in_task
self._audio_in_task = None
# Parent stop.
await super().cancel(frame)

async def cleanup(self):
await super().cleanup()
Expand Down Expand Up @@ -817,16 +817,16 @@ async def start(self, frame: StartFrame):
await self._client.join()

async def stop(self, frame: EndFrame):
# Parent stop.
await super().stop(frame)
# Leave the room.
await self._client.leave()
# Parent stop.
await super().stop(frame)

async def cancel(self, frame: CancelFrame):
# Parent stop.
await super().cancel(frame)
# Leave the room.
await self._client.leave()
# Parent stop.
await super().cancel(frame)

async def cleanup(self):
await super().cleanup()
Expand Down
31 changes: 5 additions & 26 deletions src/pipecat/transports/services/livekit.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,28 +324,19 @@ async def start(self, frame: StartFrame):
logger.info("LiveKitInputTransport started")

async def stop(self, frame: EndFrame):
await self._client.disconnect()
if self._audio_in_task:
self._audio_in_task.cancel()
try:
await self._audio_in_task
except asyncio.CancelledError:
pass
await self._audio_in_task
await super().stop(frame)
await self._client.disconnect()
logger.info("LiveKitInputTransport stopped")

async def process_frame(self, frame: Frame, direction: FrameDirection):
if isinstance(frame, EndFrame):
await self.stop(frame)
else:
await super().process_frame(frame, direction)

async def cancel(self, frame: CancelFrame):
await super().cancel(frame)
await self._client.disconnect()
if self._audio_in_task and (self._params.audio_in_enabled or self._params.vad_enabled):
self._audio_in_task.cancel()
await self._audio_in_task
await super().cancel(frame)

def vad_analyzer(self) -> VADAnalyzer | None:
return self._vad_analyzer
Expand Down Expand Up @@ -407,19 +398,13 @@ async def start(self, frame: StartFrame):
logger.info("LiveKitOutputTransport started")

async def stop(self, frame: EndFrame):
await super().stop(frame)
await self._client.disconnect()
await super().stop(frame)
logger.info("LiveKitOutputTransport stopped")

async def process_frame(self, frame: Frame, direction: FrameDirection):
if isinstance(frame, EndFrame):
await self.stop(frame)
else:
await super().process_frame(frame, direction)

async def cancel(self, frame: CancelFrame):
await super().cancel(frame)
await self._client.disconnect()
await super().cancel(frame)

async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
if isinstance(frame, (LiveKitTransportMessageFrame, LiveKitTransportMessageUrgentFrame)):
Expand Down Expand Up @@ -526,12 +511,6 @@ async def _on_connected(self):

async def _on_disconnected(self):
await self._call_event_handler("on_disconnected")
# Attempt to reconnect
try:
await self._client.connect()
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"Failed to reconnect: {e}")

async def _on_participant_connected(self, participant_id: str):
await self._call_event_handler("on_participant_connected", participant_id)
Expand Down

0 comments on commit 7c516e0

Please sign in to comment.