Skip to content

Commit

Permalink
Merge pull request #65 from daily-co/app-messages
Browse files Browse the repository at this point in the history
Support for app messages
  • Loading branch information
Moishe authored Mar 18, 2024
2 parents c6dfcb6 + 58726dc commit b1ab6f9
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 12 deletions.
17 changes: 15 additions & 2 deletions src/dailyai/pipeline/frames.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,22 @@ class OpenAILLMContextFrame(Frame):
context: OpenAILLMContext


class AppMessageQueueFrame(Frame):
@dataclass()
class ReceivedAppMessageFrame(Frame):
message: Any
participantId: str
sender: str

def __str__(self):
return f"ReceivedAppMessageFrame: sender: {self.sender}, message: {self.message}"


@dataclass()
class SendAppMessageFrame(Frame):
message: Any
participantId: str | None

def __str__(self):
return f"SendAppMessageFrame: participantId: {self.participantId}, message: {self.message}"


class UserStartedSpeakingFrame(Frame):
Expand Down
23 changes: 16 additions & 7 deletions src/dailyai/services/base_transport_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@
import queue
import threading
import time
from typing import AsyncGenerator
from typing import Any, AsyncGenerator
from enum import Enum
from dailyai.pipeline.frame_processor import FrameProcessor

from dailyai.pipeline.frames import (
SendAppMessageFrame,
AudioFrame,
EndFrame,
ImageFrame,
Expand Down Expand Up @@ -317,19 +318,21 @@ def _vad(self):
self._vad_state == VADState.STARTING
and self._vad_starting_count >= self._vad_start_frames
):
asyncio.run_coroutine_threadsafe(
self.receive_queue.put(UserStartedSpeakingFrame()), self._loop
)
if self._loop:
asyncio.run_coroutine_threadsafe(
self.receive_queue.put(UserStartedSpeakingFrame()), self._loop
)
# self.interrupt()
self._vad_state = VADState.SPEAKING
self._vad_starting_count = 0
if (
self._vad_state == VADState.STOPPING
and self._vad_stopping_count >= self._vad_stop_frames
):
asyncio.run_coroutine_threadsafe(
self.receive_queue.put(UserStoppedSpeakingFrame()), self._loop
)
if self._loop:
asyncio.run_coroutine_threadsafe(
self.receive_queue.put(UserStoppedSpeakingFrame()), self._loop
)
self._vad_state = VADState.QUIET
self._vad_stopping_count = 0

Expand Down Expand Up @@ -375,6 +378,10 @@ def _set_image(self, image: bytes):
def _set_images(self, images: list[bytes], start_frame=0):
self._images = itertools.cycle(images)

def send_app_message(self, message: Any, participantId:str|None):
""" Child classes should override this to send a custom message to the room. """
pass

def _run_camera(self):
try:
while not self._stop_threads.is_set():
Expand Down Expand Up @@ -440,6 +447,8 @@ def _frame_consumer(self):
self._set_image(frame.image)
elif isinstance(frame, SpriteFrame):
self._set_images(frame.images)
elif isinstance(frame, SendAppMessageFrame):
self.send_app_message(frame.message, frame.participantId)
elif len(b):
self.write_frame_to_mic(bytes(b))
b = bytearray()
Expand Down
16 changes: 13 additions & 3 deletions src/dailyai/services/daily_transport_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
import types

from functools import partial
from typing import Any

from dailyai.pipeline.frames import (
ReceivedAppMessageFrame,
TranscriptionQueueFrame,
)

Expand Down Expand Up @@ -124,6 +126,9 @@ def write_frame_to_camera(self, frame: bytes):
def write_frame_to_mic(self, frame: bytes):
self.mic.write_frames(frame)

def send_app_message(self, message: Any, participantId: str | None):
self.client.send_app_message(message, participantId)

def read_audio_frames(self, desired_frame_count):
bytes = self._speaker.read_frames(desired_frame_count)
return bytes
Expand Down Expand Up @@ -219,7 +224,7 @@ def on_first_other_participant_joined(self):
pass

def call_joined(self, join_data, client_error):
#self._logger.info(f"Call_joined: {join_data}, {client_error}")
# self._logger.info(f"Call_joined: {join_data}, {client_error}")
pass

def dialout(self, number):
Expand All @@ -243,8 +248,13 @@ def on_participant_left(self, participant, reason):
if len(self.client.participants()) < self._min_others_count + 1:
self._stop_threads.set()

def on_app_message(self, message, sender):
pass
def on_app_message(self, message:Any, sender:str):
if self._loop:
frame = ReceivedAppMessageFrame(message, sender)
print(frame)
asyncio.run_coroutine_threadsafe(
self.receive_queue.put(frame), self._loop
)

def on_transcription_message(self, message: dict):
if self._loop:
Expand Down

0 comments on commit b1ab6f9

Please sign in to comment.