From 337d42133885d55194493b76436aa249a05f0b0a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Thu, 12 Dec 2024 17:40:01 -0800 Subject: [PATCH 1/2] transports: disconnect client first --- pyproject.toml | 2 +- .../transports/network/websocket_server.py | 6 ++-- src/pipecat/transports/services/daily.py | 16 +++++----- src/pipecat/transports/services/livekit.py | 31 +++---------------- 4 files changed, 17 insertions(+), 38 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index f8b122580..fd0c1be56 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -55,7 +55,7 @@ gstreamer = [ "pygobject~=3.48.2" ] fireworks = [ "openai~=1.57.2" ] krisp = [ "pipecat-ai-krisp~=0.3.0" ] langchain = [ "langchain~=0.2.14", "langchain-community~=0.2.12", "langchain-openai~=0.1.20" ] -livekit = [ "livekit~=0.17.5", "livekit-api~=0.7.1", "tenacity~=8.5.0" ] +livekit = [ "livekit~=0.12.5", "livekit-api~=0.7.1", "tenacity~=8.5.0" ] lmnt = [ "lmnt~=1.1.4" ] local = [ "pyaudio~=0.2.14" ] moondream = [ "einops~=0.8.0", "timm~=1.0.8", "transformers~=4.44.0" ] diff --git a/src/pipecat/transports/network/websocket_server.py b/src/pipecat/transports/network/websocket_server.py index 711bc7596..ce9b9614d 100644 --- a/src/pipecat/transports/network/websocket_server.py +++ b/src/pipecat/transports/network/websocket_server.py @@ -69,18 +69,18 @@ def __init__( self._stop_server_event = asyncio.Event() async def start(self, frame: StartFrame): - self._server_task = self.get_event_loop().create_task(self._server_task_handler()) await super().start(frame) + self._server_task = self.get_event_loop().create_task(self._server_task_handler()) async def stop(self, frame: EndFrame): - await super().stop(frame) self._stop_server_event.set() await self._server_task + await super().stop(frame) async def cancel(self, frame: CancelFrame): - await super().cancel(frame) self._stop_server_event.set() await self._server_task + await super().cancel(frame) async def _server_task_handler(self): logger.info(f"Starting websocket server on {self._host}:{self._port}") diff --git a/src/pipecat/transports/services/daily.py b/src/pipecat/transports/services/daily.py index 7456ef816..44c9f37ee 100644 --- a/src/pipecat/transports/services/daily.py +++ b/src/pipecat/transports/services/daily.py @@ -694,8 +694,6 @@ async def start(self, frame: StartFrame): self._audio_in_task = self.get_event_loop().create_task(self._audio_in_task_handler()) async def stop(self, frame: EndFrame): - # Parent stop. - await super().stop(frame) # Leave the room. await self._client.leave() # Stop audio thread. @@ -703,10 +701,10 @@ async def stop(self, frame: EndFrame): self._audio_in_task.cancel() await self._audio_in_task self._audio_in_task = None + # Parent stop. + await super().stop(frame) async def cancel(self, frame: CancelFrame): - # Parent stop. - await super().cancel(frame) # Leave the room. await self._client.leave() # Stop audio thread. @@ -714,6 +712,8 @@ async def cancel(self, frame: CancelFrame): self._audio_in_task.cancel() await self._audio_in_task self._audio_in_task = None + # Parent stop. + await super().cancel(frame) async def cleanup(self): await super().cleanup() @@ -817,16 +817,16 @@ async def start(self, frame: StartFrame): await self._client.join() async def stop(self, frame: EndFrame): - # Parent stop. - await super().stop(frame) # Leave the room. await self._client.leave() + # Parent stop. + await super().stop(frame) async def cancel(self, frame: CancelFrame): - # Parent stop. - await super().cancel(frame) # Leave the room. await self._client.leave() + # Parent stop. + await super().cancel(frame) async def cleanup(self): await super().cleanup() diff --git a/src/pipecat/transports/services/livekit.py b/src/pipecat/transports/services/livekit.py index f53c8332f..ded16182f 100644 --- a/src/pipecat/transports/services/livekit.py +++ b/src/pipecat/transports/services/livekit.py @@ -324,28 +324,19 @@ async def start(self, frame: StartFrame): logger.info("LiveKitInputTransport started") async def stop(self, frame: EndFrame): + await self._client.disconnect() if self._audio_in_task: self._audio_in_task.cancel() - try: - await self._audio_in_task - except asyncio.CancelledError: - pass + await self._audio_in_task await super().stop(frame) - await self._client.disconnect() logger.info("LiveKitInputTransport stopped") - async def process_frame(self, frame: Frame, direction: FrameDirection): - if isinstance(frame, EndFrame): - await self.stop(frame) - else: - await super().process_frame(frame, direction) - async def cancel(self, frame: CancelFrame): - await super().cancel(frame) await self._client.disconnect() if self._audio_in_task and (self._params.audio_in_enabled or self._params.vad_enabled): self._audio_in_task.cancel() await self._audio_in_task + await super().cancel(frame) def vad_analyzer(self) -> VADAnalyzer | None: return self._vad_analyzer @@ -407,19 +398,13 @@ async def start(self, frame: StartFrame): logger.info("LiveKitOutputTransport started") async def stop(self, frame: EndFrame): - await super().stop(frame) await self._client.disconnect() + await super().stop(frame) logger.info("LiveKitOutputTransport stopped") - async def process_frame(self, frame: Frame, direction: FrameDirection): - if isinstance(frame, EndFrame): - await self.stop(frame) - else: - await super().process_frame(frame, direction) - async def cancel(self, frame: CancelFrame): - await super().cancel(frame) await self._client.disconnect() + await super().cancel(frame) async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame): if isinstance(frame, (LiveKitTransportMessageFrame, LiveKitTransportMessageUrgentFrame)): @@ -526,12 +511,6 @@ async def _on_connected(self): async def _on_disconnected(self): await self._call_event_handler("on_disconnected") - # Attempt to reconnect - try: - await self._client.connect() - await self._call_event_handler("on_connected") - except Exception as e: - logger.error(f"Failed to reconnect: {e}") async def _on_participant_connected(self, participant_id: str): await self._call_event_handler("on_participant_connected", participant_id) From ccc96994e9bd195029b0f6012ee48a35598befb0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Thu, 12 Dec 2024 19:09:36 -0800 Subject: [PATCH 2/2] pyproject: update livekit --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index fd0c1be56..862981c7b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -55,7 +55,7 @@ gstreamer = [ "pygobject~=3.48.2" ] fireworks = [ "openai~=1.57.2" ] krisp = [ "pipecat-ai-krisp~=0.3.0" ] langchain = [ "langchain~=0.2.14", "langchain-community~=0.2.12", "langchain-openai~=0.1.20" ] -livekit = [ "livekit~=0.12.5", "livekit-api~=0.7.1", "tenacity~=8.5.0" ] +livekit = [ "livekit~=0.18.2", "livekit-api~=0.8.0", "tenacity~=8.5.0" ] lmnt = [ "lmnt~=1.1.4" ] local = [ "pyaudio~=0.2.14" ] moondream = [ "einops~=0.8.0", "timm~=1.0.8", "transformers~=4.44.0" ]