From 1a3de0e8191a7529a474ec17f49b906d78c82623 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Tue, 24 Sep 2024 19:12:06 -0700 Subject: [PATCH] rtvi: add RTVIProcessor.handle_message() --- src/pipecat/processors/frameworks/rtvi.py | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/src/pipecat/processors/frameworks/rtvi.py b/src/pipecat/processors/frameworks/rtvi.py index 49095b2e4..b75b65627 100644 --- a/src/pipecat/processors/frameworks/rtvi.py +++ b/src/pipecat/processors/frameworks/rtvi.py @@ -343,6 +343,9 @@ async def set_client_ready(self): self._client_ready = True await self._maybe_send_bot_ready() + async def handle_message(self, message: RTVIMessage): + await self._message_queue.put(message) + async def handle_function_call( self, function_name: str, @@ -492,20 +495,21 @@ async def _action_task_handler(self): async def _message_task_handler(self): while True: try: - frame = await self._message_queue.get() - await self._handle_message(frame) + message = await self._message_queue.get() + await self._handle_message(message) self._message_queue.task_done() except asyncio.CancelledError: break - async def _handle_message(self, frame: TransportMessageFrame): + async def _handle_transport_message(self, frame: TransportMessageFrame): try: message = RTVIMessage.model_validate(frame.message) + await self._message_queue.put(message) except ValidationError as e: - await self.send_error(f"Invalid incoming message: {e}") - logger.warning(f"Invalid incoming message: {e}") - return + await self.send_error(f"Invalid RTVI transport message: {e}") + logger.warning(f"Invalid RTVI transport message: {e}") + async def _handle_message(self, message: RTVIMessage): try: match message.type: case "client-ready": @@ -531,8 +535,8 @@ async def _handle_message(self, frame: TransportMessageFrame): await self._send_error_response(message.id, f"Unsupported type {message.type}") except ValidationError as e: - await self._send_error_response(message.id, f"Invalid incoming message: {e}") - logger.warning(f"Invalid incoming message: {e}") + await self._send_error_response(message.id, f"Invalid message: {e}") + logger.warning(f"Invalid message: {e}") except Exception as e: await self._send_error_response(message.id, f"Exception processing message: {e}") logger.warning(f"Exception processing message: {e}")