From 8e61fe8e3603bf37d9046bdf18e72790ccc8d50f Mon Sep 17 00:00:00 2001 From: Moishe Lettvin Date: Mon, 18 Mar 2024 10:08:41 -0400 Subject: [PATCH 1/2] Support for app messages --- src/dailyai/pipeline/frames.py | 14 +++++++++-- .../services/base_transport_service.py | 24 +++++++++++++------ .../services/daily_transport_service.py | 16 ++++++++++--- 3 files changed, 42 insertions(+), 12 deletions(-) diff --git a/src/dailyai/pipeline/frames.py b/src/dailyai/pipeline/frames.py index d0d279bd9..09f206c87 100644 --- a/src/dailyai/pipeline/frames.py +++ b/src/dailyai/pipeline/frames.py @@ -95,9 +95,19 @@ class OpenAILLMContextFrame(Frame): context: OpenAILLMContext -class AppMessageQueueFrame(Frame): +@dataclass() +class ReceivedAppMessageFrame(Frame): message: Any - participantId: str + sender: str + + def __str__(self): + return f"ReceivedAppMessageFrame: sender: {self.sender}, message: {self.message}" + + +@dataclass() +class SendAppMessageFrame(Frame): + message: Any + participantId: str | None class UserStartedSpeakingFrame(Frame): diff --git a/src/dailyai/services/base_transport_service.py b/src/dailyai/services/base_transport_service.py index a24a5dc48..316ba7be2 100644 --- a/src/dailyai/services/base_transport_service.py +++ b/src/dailyai/services/base_transport_service.py @@ -8,11 +8,13 @@ import queue import threading import time -from typing import AsyncGenerator +from typing import Any, AsyncGenerator from enum import Enum from dailyai.pipeline.frame_processor import FrameProcessor from dailyai.pipeline.frames import ( + ReceivedAppMessageFrame, + SendAppMessageFrame, AudioFrame, EndFrame, ImageFrame, @@ -317,9 +319,10 @@ def _vad(self): self._vad_state == VADState.STARTING and self._vad_starting_count >= self._vad_start_frames ): - asyncio.run_coroutine_threadsafe( - self.receive_queue.put(UserStartedSpeakingFrame()), self._loop - ) + if self._loop: + asyncio.run_coroutine_threadsafe( + self.receive_queue.put(UserStartedSpeakingFrame()), self._loop + ) # self.interrupt() self._vad_state = VADState.SPEAKING self._vad_starting_count = 0 @@ -327,9 +330,10 @@ def _vad(self): self._vad_state == VADState.STOPPING and self._vad_stopping_count >= self._vad_stop_frames ): - asyncio.run_coroutine_threadsafe( - self.receive_queue.put(UserStoppedSpeakingFrame()), self._loop - ) + if self._loop: + asyncio.run_coroutine_threadsafe( + self.receive_queue.put(UserStoppedSpeakingFrame()), self._loop + ) self._vad_state = VADState.QUIET self._vad_stopping_count = 0 @@ -375,6 +379,10 @@ def _set_image(self, image: bytes): def _set_images(self, images: list[bytes], start_frame=0): self._images = itertools.cycle(images) + def send_app_message(self, message: Any, participantId:str|None): + """ Child classes should override this to send a custom message to the room. """ + pass + def _run_camera(self): try: while not self._stop_threads.is_set(): @@ -440,6 +448,8 @@ def _frame_consumer(self): self._set_image(frame.image) elif isinstance(frame, SpriteFrame): self._set_images(frame.images) + elif isinstance(frame, SendAppMessageFrame): + self.send_app_message(frame.message, frame.participantId) elif len(b): self.write_frame_to_mic(bytes(b)) b = bytearray() diff --git a/src/dailyai/services/daily_transport_service.py b/src/dailyai/services/daily_transport_service.py index 16c8519ba..aacb74a3c 100644 --- a/src/dailyai/services/daily_transport_service.py +++ b/src/dailyai/services/daily_transport_service.py @@ -6,8 +6,10 @@ import types from functools import partial +from typing import Any from dailyai.pipeline.frames import ( + ReceivedAppMessageFrame, TranscriptionQueueFrame, ) @@ -124,6 +126,9 @@ def write_frame_to_camera(self, frame: bytes): def write_frame_to_mic(self, frame: bytes): self.mic.write_frames(frame) + def send_app_message(self, message: Any, participantId: str | None): + self.client.send_app_message(message, participantId) + def read_audio_frames(self, desired_frame_count): bytes = self._speaker.read_frames(desired_frame_count) return bytes @@ -219,7 +224,7 @@ def on_first_other_participant_joined(self): pass def call_joined(self, join_data, client_error): - #self._logger.info(f"Call_joined: {join_data}, {client_error}") + # self._logger.info(f"Call_joined: {join_data}, {client_error}") pass def dialout(self, number): @@ -243,8 +248,13 @@ def on_participant_left(self, participant, reason): if len(self.client.participants()) < self._min_others_count + 1: self._stop_threads.set() - def on_app_message(self, message, sender): - pass + def on_app_message(self, message:Any, sender:str): + if self._loop: + frame = ReceivedAppMessageFrame(message, sender) + print(frame) + asyncio.run_coroutine_threadsafe( + self.receive_queue.put(frame), self._loop + ) def on_transcription_message(self, message: dict): if self._loop: From 58726dc20db116c79585559a51615b566b87b2a7 Mon Sep 17 00:00:00 2001 From: Moishe Lettvin Date: Mon, 18 Mar 2024 10:14:51 -0400 Subject: [PATCH 2/2] clean up imports --- src/dailyai/pipeline/frames.py | 3 +++ src/dailyai/services/base_transport_service.py | 1 - 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/dailyai/pipeline/frames.py b/src/dailyai/pipeline/frames.py index 09f206c87..c3dbe1d25 100644 --- a/src/dailyai/pipeline/frames.py +++ b/src/dailyai/pipeline/frames.py @@ -109,6 +109,9 @@ class SendAppMessageFrame(Frame): message: Any participantId: str | None + def __str__(self): + return f"SendAppMessageFrame: participantId: {self.participantId}, message: {self.message}" + class UserStartedSpeakingFrame(Frame): pass diff --git a/src/dailyai/services/base_transport_service.py b/src/dailyai/services/base_transport_service.py index 316ba7be2..b05c7b1f3 100644 --- a/src/dailyai/services/base_transport_service.py +++ b/src/dailyai/services/base_transport_service.py @@ -13,7 +13,6 @@ from dailyai.pipeline.frame_processor import FrameProcessor from dailyai.pipeline.frames import ( - ReceivedAppMessageFrame, SendAppMessageFrame, AudioFrame, EndFrame,