Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

transports: call parent stop() before disconnecting #882

Merged
merged 1 commit into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/pipecat/transports/network/websocket_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,14 @@ async def start(self, frame: StartFrame):
self._server_task = self.get_event_loop().create_task(self._server_task_handler())

async def stop(self, frame: EndFrame):
await super().stop(frame)
self._stop_server_event.set()
await self._server_task
await super().stop(frame)

async def cancel(self, frame: CancelFrame):
await super().cancel(frame)
self._stop_server_event.set()
await self._server_task
await super().cancel(frame)

async def _server_task_handler(self):
logger.info(f"Starting websocket server on {self._host}:{self._port}")
Expand Down
16 changes: 8 additions & 8 deletions src/pipecat/transports/services/daily.py
Original file line number Diff line number Diff line change
Expand Up @@ -694,26 +694,26 @@ async def start(self, frame: StartFrame):
self._audio_in_task = self.get_event_loop().create_task(self._audio_in_task_handler())

async def stop(self, frame: EndFrame):
# Parent stop.
await super().stop(frame)
# Leave the room.
await self._client.leave()
# Stop audio thread.
if self._audio_in_task and (self._params.audio_in_enabled or self._params.vad_enabled):
self._audio_in_task.cancel()
await self._audio_in_task
self._audio_in_task = None
# Parent stop.
await super().stop(frame)

async def cancel(self, frame: CancelFrame):
# Parent stop.
await super().cancel(frame)
# Leave the room.
await self._client.leave()
# Stop audio thread.
if self._audio_in_task and (self._params.audio_in_enabled or self._params.vad_enabled):
self._audio_in_task.cancel()
await self._audio_in_task
self._audio_in_task = None
# Parent stop.
await super().cancel(frame)

async def cleanup(self):
await super().cleanup()
Expand Down Expand Up @@ -817,16 +817,16 @@ async def start(self, frame: StartFrame):
await self._client.join()

async def stop(self, frame: EndFrame):
# Leave the room.
await self._client.leave()
# Parent stop.
await super().stop(frame)

async def cancel(self, frame: CancelFrame):
# Leave the room.
await self._client.leave()

async def cancel(self, frame: CancelFrame):
# Parent stop.
await super().cancel(frame)
# Leave the room.
await self._client.leave()

async def cleanup(self):
await super().cleanup()
Expand Down
8 changes: 4 additions & 4 deletions src/pipecat/transports/services/livekit.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,19 +323,19 @@ async def start(self, frame: StartFrame):
logger.info("LiveKitInputTransport started")

async def stop(self, frame: EndFrame):
await super().stop(frame)
await self._client.disconnect()
if self._audio_in_task:
self._audio_in_task.cancel()
await self._audio_in_task
await super().stop(frame)
logger.info("LiveKitInputTransport stopped")

async def cancel(self, frame: CancelFrame):
await super().cancel(frame)
await self._client.disconnect()
if self._audio_in_task and (self._params.audio_in_enabled or self._params.vad_enabled):
self._audio_in_task.cancel()
await self._audio_in_task
await super().cancel(frame)

def vad_analyzer(self) -> VADAnalyzer | None:
return self._vad_analyzer
Expand Down Expand Up @@ -397,13 +397,13 @@ async def start(self, frame: StartFrame):
logger.info("LiveKitOutputTransport started")

async def stop(self, frame: EndFrame):
await self._client.disconnect()
await super().stop(frame)
await self._client.disconnect()
logger.info("LiveKitOutputTransport stopped")

async def cancel(self, frame: CancelFrame):
await self._client.disconnect()
await super().cancel(frame)
await self._client.disconnect()

async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
if isinstance(frame, (LiveKitTransportMessageFrame, LiveKitTransportMessageUrgentFrame)):
Expand Down
Loading