Skip to content

Commit

Permalink
Merge pull request #504 from pipecat-ai/aleix/rtvi-handle-frame
Browse files Browse the repository at this point in the history
rtvi: add RTVIProcessor.handle_message()
  • Loading branch information
aconchillo authored Sep 25, 2024
2 parents e89814d + 1a3de0e commit 8502c7c
Showing 1 changed file with 12 additions and 8 deletions.
20 changes: 12 additions & 8 deletions src/pipecat/processors/frameworks/rtvi.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,9 @@ async def set_client_ready(self):
self._client_ready = True
await self._maybe_send_bot_ready()

async def handle_message(self, message: RTVIMessage):
await self._message_queue.put(message)

async def handle_function_call(
self,
function_name: str,
Expand Down Expand Up @@ -492,20 +495,21 @@ async def _action_task_handler(self):
async def _message_task_handler(self):
while True:
try:
frame = await self._message_queue.get()
await self._handle_message(frame)
message = await self._message_queue.get()
await self._handle_message(message)
self._message_queue.task_done()
except asyncio.CancelledError:
break

async def _handle_message(self, frame: TransportMessageFrame):
async def _handle_transport_message(self, frame: TransportMessageFrame):
try:
message = RTVIMessage.model_validate(frame.message)
await self._message_queue.put(message)
except ValidationError as e:
await self.send_error(f"Invalid incoming message: {e}")
logger.warning(f"Invalid incoming message: {e}")
return
await self.send_error(f"Invalid RTVI transport message: {e}")
logger.warning(f"Invalid RTVI transport message: {e}")

async def _handle_message(self, message: RTVIMessage):
try:
match message.type:
case "client-ready":
Expand All @@ -531,8 +535,8 @@ async def _handle_message(self, frame: TransportMessageFrame):
await self._send_error_response(message.id, f"Unsupported type {message.type}")

except ValidationError as e:
await self._send_error_response(message.id, f"Invalid incoming message: {e}")
logger.warning(f"Invalid incoming message: {e}")
await self._send_error_response(message.id, f"Invalid message: {e}")
logger.warning(f"Invalid message: {e}")
except Exception as e:
await self._send_error_response(message.id, f"Exception processing message: {e}")
logger.warning(f"Exception processing message: {e}")
Expand Down

0 comments on commit 8502c7c

Please sign in to comment.