Skip to content

Commit

Permalink
Merge pull request #855 from pipecat-ai/aleix/transport-services-disc…
Browse files Browse the repository at this point in the history
…onnect-fixes

transports(services): disconnect client first
  • Loading branch information
aconchillo authored Dec 13, 2024
2 parents 3d96369 + ccc9699 commit 2b8c35c
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 38 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ gstreamer = [ "pygobject~=3.48.2" ]
fireworks = [ "openai~=1.57.2" ]
krisp = [ "pipecat-ai-krisp~=0.3.0" ]
langchain = [ "langchain~=0.2.14", "langchain-community~=0.2.12", "langchain-openai~=0.1.20" ]
livekit = [ "livekit~=0.17.5", "livekit-api~=0.7.1", "tenacity~=8.5.0" ]
livekit = [ "livekit~=0.18.2", "livekit-api~=0.8.0", "tenacity~=8.5.0" ]
lmnt = [ "lmnt~=1.1.4" ]
local = [ "pyaudio~=0.2.14" ]
moondream = [ "einops~=0.8.0", "timm~=1.0.8", "transformers~=4.44.0" ]
Expand Down
6 changes: 3 additions & 3 deletions src/pipecat/transports/network/websocket_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,18 +69,18 @@ def __init__(
self._stop_server_event = asyncio.Event()

async def start(self, frame: StartFrame):
self._server_task = self.get_event_loop().create_task(self._server_task_handler())
await super().start(frame)
self._server_task = self.get_event_loop().create_task(self._server_task_handler())

async def stop(self, frame: EndFrame):
await super().stop(frame)
self._stop_server_event.set()
await self._server_task
await super().stop(frame)

async def cancel(self, frame: CancelFrame):
await super().cancel(frame)
self._stop_server_event.set()
await self._server_task
await super().cancel(frame)

async def _server_task_handler(self):
logger.info(f"Starting websocket server on {self._host}:{self._port}")
Expand Down
16 changes: 8 additions & 8 deletions src/pipecat/transports/services/daily.py
Original file line number Diff line number Diff line change
Expand Up @@ -694,26 +694,26 @@ async def start(self, frame: StartFrame):
self._audio_in_task = self.get_event_loop().create_task(self._audio_in_task_handler())

async def stop(self, frame: EndFrame):
# Parent stop.
await super().stop(frame)
# Leave the room.
await self._client.leave()
# Stop audio thread.
if self._audio_in_task and (self._params.audio_in_enabled or self._params.vad_enabled):
self._audio_in_task.cancel()
await self._audio_in_task
self._audio_in_task = None
# Parent stop.
await super().stop(frame)

async def cancel(self, frame: CancelFrame):
# Parent stop.
await super().cancel(frame)
# Leave the room.
await self._client.leave()
# Stop audio thread.
if self._audio_in_task and (self._params.audio_in_enabled or self._params.vad_enabled):
self._audio_in_task.cancel()
await self._audio_in_task
self._audio_in_task = None
# Parent stop.
await super().cancel(frame)

async def cleanup(self):
await super().cleanup()
Expand Down Expand Up @@ -817,16 +817,16 @@ async def start(self, frame: StartFrame):
await self._client.join()

async def stop(self, frame: EndFrame):
# Parent stop.
await super().stop(frame)
# Leave the room.
await self._client.leave()
# Parent stop.
await super().stop(frame)

async def cancel(self, frame: CancelFrame):
# Parent stop.
await super().cancel(frame)
# Leave the room.
await self._client.leave()
# Parent stop.
await super().cancel(frame)

async def cleanup(self):
await super().cleanup()
Expand Down
31 changes: 5 additions & 26 deletions src/pipecat/transports/services/livekit.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,28 +324,19 @@ async def start(self, frame: StartFrame):
logger.info("LiveKitInputTransport started")

async def stop(self, frame: EndFrame):
await self._client.disconnect()
if self._audio_in_task:
self._audio_in_task.cancel()
try:
await self._audio_in_task
except asyncio.CancelledError:
pass
await self._audio_in_task
await super().stop(frame)
await self._client.disconnect()
logger.info("LiveKitInputTransport stopped")

async def process_frame(self, frame: Frame, direction: FrameDirection):
if isinstance(frame, EndFrame):
await self.stop(frame)
else:
await super().process_frame(frame, direction)

async def cancel(self, frame: CancelFrame):
await super().cancel(frame)
await self._client.disconnect()
if self._audio_in_task and (self._params.audio_in_enabled or self._params.vad_enabled):
self._audio_in_task.cancel()
await self._audio_in_task
await super().cancel(frame)

def vad_analyzer(self) -> VADAnalyzer | None:
return self._vad_analyzer
Expand Down Expand Up @@ -407,19 +398,13 @@ async def start(self, frame: StartFrame):
logger.info("LiveKitOutputTransport started")

async def stop(self, frame: EndFrame):
await super().stop(frame)
await self._client.disconnect()
await super().stop(frame)
logger.info("LiveKitOutputTransport stopped")

async def process_frame(self, frame: Frame, direction: FrameDirection):
if isinstance(frame, EndFrame):
await self.stop(frame)
else:
await super().process_frame(frame, direction)

async def cancel(self, frame: CancelFrame):
await super().cancel(frame)
await self._client.disconnect()
await super().cancel(frame)

async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
if isinstance(frame, (LiveKitTransportMessageFrame, LiveKitTransportMessageUrgentFrame)):
Expand Down Expand Up @@ -526,12 +511,6 @@ async def _on_connected(self):

async def _on_disconnected(self):
await self._call_event_handler("on_disconnected")
# Attempt to reconnect
try:
await self._client.connect()
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"Failed to reconnect: {e}")

async def _on_participant_connected(self, participant_id: str):
await self._call_event_handler("on_participant_connected", participant_id)
Expand Down

0 comments on commit 2b8c35c

Please sign in to comment.