From 2fc5de6afe4e53ff59c70f3099566990e508126e Mon Sep 17 00:00:00 2001 From: Moishe Lettvin Date: Tue, 26 Mar 2024 08:35:04 -0400 Subject: [PATCH 1/4] Starting refactor of transports into their own directory --- examples/foundational/01-say-one-thing.py | 4 ++-- examples/foundational/01a-local-transport.py | 4 ++-- examples/foundational/02-llm-say-one-thing.py | 4 ++-- examples/foundational/03-still-frame.py | 4 ++-- examples/foundational/03a-image-local.py | 4 ++-- examples/foundational/04-utterance-and-speech.py | 4 ++-- examples/foundational/05-sync-speech-and-image.py | 4 ++-- .../foundational/05a-local-sync-speech-and-text.py | 4 ++-- examples/foundational/06-listen-and-respond.py | 4 ++-- examples/foundational/06a-image-sync.py | 4 ++-- examples/foundational/07-interruptible.py | 4 ++-- examples/foundational/08-bots-arguing.py | 4 ++-- examples/foundational/10-wake-word.py | 4 ++-- examples/foundational/11-sound-effects.py | 4 ++-- examples/foundational/13-whisper-transcription.py | 4 ++-- examples/foundational/13a-whisper-local.py | 4 ++-- examples/foundational/websocket-server/sample.py | 2 +- examples/image-gen.py | 4 ++-- examples/internal/11a-dial-out.py | 4 ++-- examples/starter-apps/chatbot.py | 4 ++-- examples/starter-apps/patient-intake.py | 4 ++-- examples/starter-apps/storybot.py | 4 ++-- examples/starter-apps/translator.py | 4 ++-- .../base_transport.py} | 2 +- .../daily_transport.py} | 10 +++++----- .../local_transport.py} | 4 ++-- .../websocket_transport.py} | 4 ++-- tests/test_daily_transport_service.py | 4 ++-- tests/test_websocket_transport.py | 2 +- 29 files changed, 58 insertions(+), 58 deletions(-) rename src/dailyai/{services/base_transport_service.py => transports/base_transport.py} (99%) rename src/dailyai/{services/daily_transport_service.py => transports/daily_transport.py} (96%) rename src/dailyai/{services/local_transport_service.py => transports/local_transport.py} (95%) rename src/dailyai/{services/websocket_transport_service.py => transports/websocket_transport.py} (97%) diff --git a/examples/foundational/01-say-one-thing.py b/examples/foundational/01-say-one-thing.py index c92e28419..d9af16d7f 100644 --- a/examples/foundational/01-say-one-thing.py +++ b/examples/foundational/01-say-one-thing.py @@ -5,7 +5,7 @@ from dailyai.pipeline.frames import EndFrame, TextFrame from dailyai.pipeline.pipeline import Pipeline -from dailyai.services.daily_transport_service import DailyTransportService +from dailyai.transports.daily_transport import DailyTransport from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService from runner import configure @@ -20,7 +20,7 @@ async def main(room_url): async with aiohttp.ClientSession() as session: - transport = DailyTransportService( + transport = DailyTransport( room_url, None, "Say One Thing", diff --git a/examples/foundational/01a-local-transport.py b/examples/foundational/01a-local-transport.py index b54ecedbd..d653a684c 100644 --- a/examples/foundational/01a-local-transport.py +++ b/examples/foundational/01a-local-transport.py @@ -4,7 +4,7 @@ import os from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService -from dailyai.services.local_transport_service import LocalTransportService +from dailyai.transports.local_transport import LocalTransport logging.basicConfig(format=f"%(levelno)s %(asctime)s %(message)s") logger = logging.getLogger("dailyai") @@ -14,7 +14,7 @@ async def main(): async with aiohttp.ClientSession() as session: meeting_duration_minutes = 1 - transport = LocalTransportService( + transport = LocalTransport( duration_minutes=meeting_duration_minutes, mic_enabled=True ) tts = ElevenLabsTTSService( diff --git a/examples/foundational/02-llm-say-one-thing.py b/examples/foundational/02-llm-say-one-thing.py index e773b7ddb..8e185a581 100644 --- a/examples/foundational/02-llm-say-one-thing.py +++ b/examples/foundational/02-llm-say-one-thing.py @@ -6,7 +6,7 @@ from dailyai.pipeline.frames import EndFrame, LLMMessagesQueueFrame from dailyai.pipeline.pipeline import Pipeline -from dailyai.services.daily_transport_service import DailyTransportService +from dailyai.transports.daily_transport import DailyTransport from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService from dailyai.services.open_ai_services import OpenAILLMService @@ -22,7 +22,7 @@ async def main(room_url): async with aiohttp.ClientSession() as session: - transport = DailyTransportService( + transport = DailyTransport( room_url, None, "Say One Thing From an LLM", diff --git a/examples/foundational/03-still-frame.py b/examples/foundational/03-still-frame.py index e1be8f57c..12b519057 100644 --- a/examples/foundational/03-still-frame.py +++ b/examples/foundational/03-still-frame.py @@ -5,7 +5,7 @@ from dailyai.pipeline.frames import TextFrame from dailyai.pipeline.pipeline import Pipeline -from dailyai.services.daily_transport_service import DailyTransportService +from dailyai.transports.daily_transport import DailyTransport from dailyai.services.fal_ai_services import FalImageGenService from runner import configure @@ -20,7 +20,7 @@ async def main(room_url): async with aiohttp.ClientSession() as session: - transport = DailyTransportService( + transport = DailyTransport( room_url, None, "Show a still frame image", diff --git a/examples/foundational/03a-image-local.py b/examples/foundational/03a-image-local.py index d3e6b5797..eba208970 100644 --- a/examples/foundational/03a-image-local.py +++ b/examples/foundational/03a-image-local.py @@ -7,7 +7,7 @@ from dailyai.pipeline.frames import TextFrame from dailyai.services.fal_ai_services import FalImageGenService -from dailyai.services.local_transport_service import LocalTransportService +from dailyai.transports.local_transport import LocalTransport logging.basicConfig(format=f"%(levelno)s %(asctime)s %(message)s") logger = logging.getLogger("dailyai") @@ -22,7 +22,7 @@ async def main(): meeting_duration_minutes = 2 tk_root = tk.Tk() tk_root.title("Calendar") - transport = LocalTransportService( + transport = LocalTransport( tk_root=tk_root, mic_enabled=True, camera_enabled=True, diff --git a/examples/foundational/04-utterance-and-speech.py b/examples/foundational/04-utterance-and-speech.py index 7a57cc0e6..7de830df7 100644 --- a/examples/foundational/04-utterance-and-speech.py +++ b/examples/foundational/04-utterance-and-speech.py @@ -6,7 +6,7 @@ from dailyai.pipeline.merge_pipeline import SequentialMergePipeline from dailyai.pipeline.pipeline import Pipeline -from dailyai.services.daily_transport_service import DailyTransportService +from dailyai.transports.daily_transport import DailyTransport from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService from dailyai.services.deepgram_ai_services import DeepgramTTSService from dailyai.pipeline.frames import EndPipeFrame, LLMMessagesQueueFrame, TextFrame @@ -24,7 +24,7 @@ async def main(room_url: str): async with aiohttp.ClientSession() as session: - transport = DailyTransportService( + transport = DailyTransport( room_url, None, "Static And Dynamic Speech", diff --git a/examples/foundational/05-sync-speech-and-image.py b/examples/foundational/05-sync-speech-and-image.py index 35985a8d8..d9343dcd9 100644 --- a/examples/foundational/05-sync-speech-and-image.py +++ b/examples/foundational/05-sync-speech-and-image.py @@ -23,7 +23,7 @@ from dailyai.pipeline.frame_processor import FrameProcessor from dailyai.pipeline.pipeline import Pipeline -from dailyai.services.daily_transport_service import DailyTransportService +from dailyai.transports.daily_transport import DailyTransport from dailyai.services.open_ai_services import OpenAILLMService from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService from dailyai.services.fal_ai_services import FalImageGenService @@ -63,7 +63,7 @@ async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]: async def main(room_url): async with aiohttp.ClientSession() as session: - transport = DailyTransportService( + transport = DailyTransport( room_url, None, "Month Narration Bot", diff --git a/examples/foundational/05a-local-sync-speech-and-text.py b/examples/foundational/05a-local-sync-speech-and-text.py index cd21474b0..fd1076ff5 100644 --- a/examples/foundational/05a-local-sync-speech-and-text.py +++ b/examples/foundational/05a-local-sync-speech-and-text.py @@ -9,7 +9,7 @@ from dailyai.services.open_ai_services import OpenAILLMService from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService from dailyai.services.fal_ai_services import FalImageGenService -from dailyai.services.local_transport_service import LocalTransportService +from dailyai.transports.local_transport import LocalTransport logging.basicConfig(format=f"%(levelno)s %(asctime)s %(message)s") logger = logging.getLogger("dailyai") @@ -22,7 +22,7 @@ async def main(room_url): tk_root = tk.Tk() tk_root.title("Calendar") - transport = LocalTransportService( + transport = LocalTransport( mic_enabled=True, camera_enabled=True, camera_width=1024, diff --git a/examples/foundational/06-listen-and-respond.py b/examples/foundational/06-listen-and-respond.py index c1112d01f..503ab4e9c 100644 --- a/examples/foundational/06-listen-and-respond.py +++ b/examples/foundational/06-listen-and-respond.py @@ -5,7 +5,7 @@ from dailyai.pipeline.frames import LLMMessagesQueueFrame from dailyai.pipeline.pipeline import Pipeline -from dailyai.services.daily_transport_service import DailyTransportService +from dailyai.transports.daily_transport import DailyTransport from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService from dailyai.services.open_ai_services import OpenAILLMService from dailyai.services.ai_services import FrameLogger @@ -25,7 +25,7 @@ async def main(room_url: str, token): async with aiohttp.ClientSession() as session: - transport = DailyTransportService( + transport = DailyTransport( room_url, token, "Respond bot", diff --git a/examples/foundational/06a-image-sync.py b/examples/foundational/06a-image-sync.py index 37c2ac7fe..6334b46ba 100644 --- a/examples/foundational/06a-image-sync.py +++ b/examples/foundational/06a-image-sync.py @@ -6,7 +6,7 @@ from PIL import Image from dailyai.pipeline.frames import ImageFrame, Frame -from dailyai.services.daily_transport_service import DailyTransportService +from dailyai.transports.daily_transport import DailyTransport from dailyai.services.ai_services import AIService from dailyai.pipeline.aggregators import ( LLMAssistantContextAggregator, @@ -42,7 +42,7 @@ async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]: async def main(room_url: str, token): async with aiohttp.ClientSession() as session: - transport = DailyTransportService( + transport = DailyTransport( room_url, token, "Respond bot", diff --git a/examples/foundational/07-interruptible.py b/examples/foundational/07-interruptible.py index 5faf82dec..7b354988d 100644 --- a/examples/foundational/07-interruptible.py +++ b/examples/foundational/07-interruptible.py @@ -9,7 +9,7 @@ from dailyai.pipeline.pipeline import Pipeline from dailyai.services.ai_services import FrameLogger -from dailyai.services.daily_transport_service import DailyTransportService +from dailyai.transports.daily_transport import DailyTransport from dailyai.services.open_ai_services import OpenAILLMService from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService @@ -25,7 +25,7 @@ async def main(room_url: str, token): async with aiohttp.ClientSession() as session: - transport = DailyTransportService( + transport = DailyTransport( room_url, token, "Respond bot", diff --git a/examples/foundational/08-bots-arguing.py b/examples/foundational/08-bots-arguing.py index 42e4d81cc..cca6e5ae3 100644 --- a/examples/foundational/08-bots-arguing.py +++ b/examples/foundational/08-bots-arguing.py @@ -6,7 +6,7 @@ from dailyai.pipeline.aggregators import SentenceAggregator from dailyai.pipeline.pipeline import Pipeline -from dailyai.services.daily_transport_service import DailyTransportService +from dailyai.transports.daily_transport import DailyTransport from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService from dailyai.services.fal_ai_services import FalImageGenService @@ -24,7 +24,7 @@ async def main(room_url: str): async with aiohttp.ClientSession() as session: - transport = DailyTransportService( + transport = DailyTransport( room_url, None, "Respond bot", diff --git a/examples/foundational/10-wake-word.py b/examples/foundational/10-wake-word.py index cd82a59e3..565268923 100644 --- a/examples/foundational/10-wake-word.py +++ b/examples/foundational/10-wake-word.py @@ -6,7 +6,7 @@ from typing import AsyncGenerator from PIL import Image -from dailyai.services.daily_transport_service import DailyTransportService +from dailyai.transports.daily_transport import DailyTransport from dailyai.services.open_ai_services import OpenAILLMService from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService from dailyai.pipeline.aggregators import ( @@ -116,7 +116,7 @@ async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]: async def main(room_url: str, token): async with aiohttp.ClientSession() as session: - transport = DailyTransportService( + transport = DailyTransport( room_url, token, "Santa Cat", diff --git a/examples/foundational/11-sound-effects.py b/examples/foundational/11-sound-effects.py index 97101f2a8..8fe1e5166 100644 --- a/examples/foundational/11-sound-effects.py +++ b/examples/foundational/11-sound-effects.py @@ -4,7 +4,7 @@ import os import wave -from dailyai.services.daily_transport_service import DailyTransportService +from dailyai.transports.daily_transport import DailyTransport from dailyai.services.open_ai_services import OpenAILLMService from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService from dailyai.pipeline.aggregators import ( @@ -72,7 +72,7 @@ async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]: async def main(room_url: str, token): async with aiohttp.ClientSession() as session: - transport = DailyTransportService( + transport = DailyTransport( room_url, token, "Respond bot", diff --git a/examples/foundational/13-whisper-transcription.py b/examples/foundational/13-whisper-transcription.py index 89fbcdacf..701fa012a 100644 --- a/examples/foundational/13-whisper-transcription.py +++ b/examples/foundational/13-whisper-transcription.py @@ -1,7 +1,7 @@ import asyncio import logging -from dailyai.services.daily_transport_service import DailyTransportService +from dailyai.transports.daily_transport import DailyTransport from dailyai.services.whisper_ai_services import WhisperSTTService from runner import configure @@ -15,7 +15,7 @@ async def main(room_url: str): - transport = DailyTransportService( + transport = DailyTransport( room_url, None, "Transcription bot", diff --git a/examples/foundational/13a-whisper-local.py b/examples/foundational/13a-whisper-local.py index cbe751766..1fc862038 100644 --- a/examples/foundational/13a-whisper-local.py +++ b/examples/foundational/13a-whisper-local.py @@ -3,7 +3,7 @@ import logging from dailyai.pipeline.frames import EndFrame, TranscriptionQueueFrame -from dailyai.services.local_transport_service import LocalTransportService +from dailyai.transports.local_transport import LocalTransport from dailyai.services.whisper_ai_services import WhisperSTTService logging.basicConfig(format=f"%(levelno)s %(asctime)s %(message)s") @@ -16,7 +16,7 @@ async def main(room_url: str): global stt meeting_duration_minutes = 1 - transport = LocalTransportService( + transport = LocalTransport( mic_enabled=True, camera_enabled=False, speaker_enabled=True, diff --git a/examples/foundational/websocket-server/sample.py b/examples/foundational/websocket-server/sample.py index 72390831e..d82b0cfea 100644 --- a/examples/foundational/websocket-server/sample.py +++ b/examples/foundational/websocket-server/sample.py @@ -6,7 +6,7 @@ from dailyai.pipeline.frames import TextFrame, TranscriptionQueueFrame from dailyai.pipeline.pipeline import Pipeline from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService -from dailyai.services.websocket_transport_service import WebsocketTransport +from dailyai.transports.websocket_transport import WebsocketTransport from dailyai.services.whisper_ai_services import WhisperSTTService logging.basicConfig(format="%(levelno)s %(asctime)s %(message)s") diff --git a/examples/image-gen.py b/examples/image-gen.py index e1cedfcca..6b1efbf76 100644 --- a/examples/image-gen.py +++ b/examples/image-gen.py @@ -5,7 +5,7 @@ import urllib.parse import random -from dailyai.services.daily_transport_service import DailyTransportService +from dailyai.transports.daily_transport import DailyTransport from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService from dailyai.pipeline.frames import Frame, FrameType from dailyai.services.fal_ai_services import FalImageGenService @@ -17,7 +17,7 @@ async def main(room_url: str, token): global llm global tts - transport = DailyTransportService( + transport = DailyTransport( room_url, token, "Imagebot", diff --git a/examples/internal/11a-dial-out.py b/examples/internal/11a-dial-out.py index 1ed1339ee..ed1ce96f2 100644 --- a/examples/internal/11a-dial-out.py +++ b/examples/internal/11a-dial-out.py @@ -3,7 +3,7 @@ import os import wave -from dailyai.services.daily_transport_service import DailyTransportService +from dailyai.transports.daily_transport import DailyTransport from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService from dailyai.pipeline.aggregators import LLMContextAggregator from dailyai.services.ai_services import AIService, FrameLogger @@ -66,7 +66,7 @@ async def main(room_url: str, token, phone): global llm global tts - transport = DailyTransportService( + transport = DailyTransport( room_url, token, "Respond bot", diff --git a/examples/starter-apps/chatbot.py b/examples/starter-apps/chatbot.py index 8ff3cfdfe..364743413 100644 --- a/examples/starter-apps/chatbot.py +++ b/examples/starter-apps/chatbot.py @@ -20,7 +20,7 @@ ) from dailyai.services.ai_services import AIService from dailyai.pipeline.pipeline import Pipeline -from dailyai.services.daily_transport_service import DailyTransportService +from dailyai.transports.daily_transport import DailyTransport from dailyai.services.open_ai_services import OpenAILLMService from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService @@ -92,7 +92,7 @@ async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]: async def main(room_url: str, token): async with aiohttp.ClientSession() as session: - transport = DailyTransportService( + transport = DailyTransport( room_url, token, "Chatbot", diff --git a/examples/starter-apps/patient-intake.py b/examples/starter-apps/patient-intake.py index aee1d1f32..d05cbc06f 100644 --- a/examples/starter-apps/patient-intake.py +++ b/examples/starter-apps/patient-intake.py @@ -13,7 +13,7 @@ ) from dailyai.pipeline.pipeline import Pipeline -from dailyai.services.daily_transport_service import DailyTransportService +from dailyai.transports.daily_transport import DailyTransport from dailyai.services.openai_llm_context import OpenAILLMContext from dailyai.services.open_ai_services import OpenAILLMService # from dailyai.services.deepgram_ai_services import DeepgramTTSService @@ -292,7 +292,7 @@ async def main(room_url: str, token): global llm global tts - transport = DailyTransportService( + transport = DailyTransport( room_url, token, "Intake Bot", diff --git a/examples/starter-apps/storybot.py b/examples/starter-apps/storybot.py index 69070da0e..28c5caedd 100644 --- a/examples/starter-apps/storybot.py +++ b/examples/starter-apps/storybot.py @@ -11,7 +11,7 @@ from dailyai.pipeline.pipeline import Pipeline from dailyai.pipeline.frame_processor import FrameProcessor -from dailyai.services.daily_transport_service import DailyTransportService +from dailyai.transports.daily_transport import DailyTransport from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService from dailyai.services.fal_ai_services import FalImageGenService from dailyai.services.open_ai_services import OpenAILLMService @@ -214,7 +214,7 @@ async def main(room_url: str, token): sp = StoryProcessor(messages, story) sig = StoryImageGenerator(story, llm, img) - transport = DailyTransportService( + transport = DailyTransport( room_url, token, "Storybot", diff --git a/examples/starter-apps/translator.py b/examples/starter-apps/translator.py index c7dcf581a..3e691ab26 100644 --- a/examples/starter-apps/translator.py +++ b/examples/starter-apps/translator.py @@ -10,7 +10,7 @@ from dailyai.pipeline.frames import Frame, LLMMessagesQueueFrame, TextFrame from dailyai.pipeline.frame_processor import FrameProcessor from dailyai.pipeline.pipeline import Pipeline -from dailyai.services.daily_transport_service import DailyTransportService +from dailyai.transports.daily_transport import DailyTransport from dailyai.services.azure_ai_services import AzureTTSService from dailyai.services.open_ai_services import OpenAILLMService @@ -51,7 +51,7 @@ async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]: async def main(room_url: str, token): async with aiohttp.ClientSession() as session: - transport = DailyTransportService( + transport = DailyTransport( room_url, token, "Translator", diff --git a/src/dailyai/services/base_transport_service.py b/src/dailyai/transports/base_transport.py similarity index 99% rename from src/dailyai/services/base_transport_service.py rename to src/dailyai/transports/base_transport.py index c4d963007..43803edc5 100644 --- a/src/dailyai/services/base_transport_service.py +++ b/src/dailyai/transports/base_transport.py @@ -72,7 +72,7 @@ class VADState(Enum): STOPPING = 4 -class BaseTransportService: +class ThreadedTransport: def __init__( self, diff --git a/src/dailyai/services/daily_transport_service.py b/src/dailyai/transports/daily_transport.py similarity index 96% rename from src/dailyai/services/daily_transport_service.py rename to src/dailyai/transports/daily_transport.py index 9c8aa5e78..2eab8759a 100644 --- a/src/dailyai/services/daily_transport_service.py +++ b/src/dailyai/transports/daily_transport.py @@ -24,10 +24,10 @@ VirtualSpeakerDevice, ) -from dailyai.services.base_transport_service import BaseTransportService +from dailyai.transports.base_transport import ThreadedTransport -class DailyTransportService(BaseTransportService, EventHandler): +class DailyTransport(ThreadedTransport, EventHandler): _daily_initialized = False _lock = threading.Lock() @@ -140,10 +140,10 @@ def read_audio_frames(self, desired_frame_count): def _prerun(self): # Only initialize Daily once - if not DailyTransportService._daily_initialized: - with DailyTransportService._lock: + if not DailyTransport._daily_initialized: + with DailyTransport._lock: Daily.init() - DailyTransportService._daily_initialized = True + DailyTransport._daily_initialized = True self.client = CallClient(event_handler=self) if self._mic_enabled: diff --git a/src/dailyai/services/local_transport_service.py b/src/dailyai/transports/local_transport.py similarity index 95% rename from src/dailyai/services/local_transport_service.py rename to src/dailyai/transports/local_transport.py index 2d334e7cd..4c330e024 100644 --- a/src/dailyai/services/local_transport_service.py +++ b/src/dailyai/transports/local_transport.py @@ -3,10 +3,10 @@ import tkinter as tk import pyaudio -from dailyai.services.base_transport_service import BaseTransportService +from dailyai.transports.base_transport import ThreadedTransport -class LocalTransportService(BaseTransportService): +class LocalTransport(ThreadedTransport): def __init__(self, **kwargs): super().__init__(**kwargs) self._sample_width = kwargs.get("sample_width") or 2 diff --git a/src/dailyai/services/websocket_transport_service.py b/src/dailyai/transports/websocket_transport.py similarity index 97% rename from src/dailyai/services/websocket_transport_service.py rename to src/dailyai/transports/websocket_transport.py index a9df7c07d..fc42a4cb9 100644 --- a/src/dailyai/services/websocket_transport_service.py +++ b/src/dailyai/transports/websocket_transport.py @@ -7,7 +7,7 @@ from dailyai.pipeline.frames import AudioFrame, ControlFrame, EndFrame, Frame, TTSEndFrame, TTSStartFrame, TextFrame from dailyai.pipeline.pipeline import Pipeline from dailyai.serializers.protobuf_serializer import ProtobufFrameSerializer -from dailyai.services.base_transport_service import BaseTransportService +from dailyai.transports.base_transport import ThreadedTransport class WebSocketFrameProcessor(FrameProcessor): @@ -45,7 +45,7 @@ async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]: yield frame -class WebsocketTransport(BaseTransportService): +class WebsocketTransport(ThreadedTransport): def __init__(self, **kwargs): super().__init__(**kwargs) self._sample_width = kwargs.get("sample_width", 2) diff --git a/tests/test_daily_transport_service.py b/tests/test_daily_transport_service.py index fb748cc97..da4e67389 100644 --- a/tests/test_daily_transport_service.py +++ b/tests/test_daily_transport_service.py @@ -4,9 +4,9 @@ class TestDailyTransport(unittest.IsolatedAsyncioTestCase): async def test_event_handler(self): - from dailyai.services.daily_transport_service import DailyTransportService + from dailyai.transports.daily_transport import DailyTransport - transport = DailyTransportService("mock.daily.co/mock", "token", "bot") + transport = DailyTransport("mock.daily.co/mock", "token", "bot") was_called = False diff --git a/tests/test_websocket_transport.py b/tests/test_websocket_transport.py index 4b0d3d9b3..ebcf94ea6 100644 --- a/tests/test_websocket_transport.py +++ b/tests/test_websocket_transport.py @@ -4,7 +4,7 @@ from dailyai.pipeline.frames import AudioFrame, EndFrame, TextFrame, TTSEndFrame, TTSStartFrame from dailyai.pipeline.pipeline import Pipeline -from dailyai.services.websocket_transport_service import WebSocketFrameProcessor, WebsocketTransport +from dailyai.transports.websocket_transport import WebSocketFrameProcessor, WebsocketTransport class TestWebSocketTransportService(unittest.IsolatedAsyncioTestCase): From 78c80d894128732081937be6d635c15422230e99 Mon Sep 17 00:00:00 2001 From: Moishe Lettvin Date: Tue, 26 Mar 2024 15:57:19 -0400 Subject: [PATCH 2/4] some more renames --- src/dailyai/transports/abstract_transport.py | 12 ++++++++++++ .../{base_transport.py => threaded_transport.py} | 3 +-- 2 files changed, 13 insertions(+), 2 deletions(-) create mode 100644 src/dailyai/transports/abstract_transport.py rename src/dailyai/transports/{base_transport.py => threaded_transport.py} (99%) diff --git a/src/dailyai/transports/abstract_transport.py b/src/dailyai/transports/abstract_transport.py new file mode 100644 index 000000000..6b9c9549c --- /dev/null +++ b/src/dailyai/transports/abstract_transport.py @@ -0,0 +1,12 @@ +from abc import abstractmethod + +from dailyai.pipeline.pipeline import Pipeline + + +class AbstractTransport: + def __init__(self, **kwargs): + pass + + @abstractmethod + async def run(self, pipeline: Pipeline, override_pipeline_source_queue=True): + pass diff --git a/src/dailyai/transports/base_transport.py b/src/dailyai/transports/threaded_transport.py similarity index 99% rename from src/dailyai/transports/base_transport.py rename to src/dailyai/transports/threaded_transport.py index 43803edc5..50bd56cda 100644 --- a/src/dailyai/transports/base_transport.py +++ b/src/dailyai/transports/threaded_transport.py @@ -193,8 +193,7 @@ async def run_pipeline(self, pipeline: Pipeline, override_pipeline_source_queue= async def run_interruptible_pipeline( self, pipeline: Pipeline, - allow_interruptions=True, - pre_processor=None, + pre_processor: FrameProcessor | None = None, post_processor: FrameProcessor | None = None, ): pipeline.set_sink(self.send_queue) From e8a6560ac12331f435f8519ccbf2c5bfd44854b4 Mon Sep 17 00:00:00 2001 From: Moishe Lettvin Date: Tue, 26 Mar 2024 16:24:47 -0400 Subject: [PATCH 3/4] Merge forgotten files --- src/dailyai/transports/daily_transport.py | 4 ++-- src/dailyai/transports/local_transport.py | 2 +- src/dailyai/transports/websocket_transport.py | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/dailyai/transports/daily_transport.py b/src/dailyai/transports/daily_transport.py index 2eab8759a..1da114d31 100644 --- a/src/dailyai/transports/daily_transport.py +++ b/src/dailyai/transports/daily_transport.py @@ -24,7 +24,7 @@ VirtualSpeakerDevice, ) -from dailyai.transports.base_transport import ThreadedTransport +from dailyai.transports.threaded_transport import ThreadedTransport class DailyTransport(ThreadedTransport, EventHandler): @@ -48,7 +48,7 @@ def __init__( start_transcription: bool = False, **kwargs, ): - # This will call BaseTransportService.__init__ method, not EventHandler + # This will call ThreadedTransport.__init__ method, not EventHandler super().__init__(**kwargs) self._room_url: str = room_url diff --git a/src/dailyai/transports/local_transport.py b/src/dailyai/transports/local_transport.py index 4c330e024..1e791a467 100644 --- a/src/dailyai/transports/local_transport.py +++ b/src/dailyai/transports/local_transport.py @@ -3,7 +3,7 @@ import tkinter as tk import pyaudio -from dailyai.transports.base_transport import ThreadedTransport +from dailyai.transports.threaded_transport import ThreadedTransport class LocalTransport(ThreadedTransport): diff --git a/src/dailyai/transports/websocket_transport.py b/src/dailyai/transports/websocket_transport.py index fc42a4cb9..84c185a3d 100644 --- a/src/dailyai/transports/websocket_transport.py +++ b/src/dailyai/transports/websocket_transport.py @@ -7,7 +7,7 @@ from dailyai.pipeline.frames import AudioFrame, ControlFrame, EndFrame, Frame, TTSEndFrame, TTSStartFrame, TextFrame from dailyai.pipeline.pipeline import Pipeline from dailyai.serializers.protobuf_serializer import ProtobufFrameSerializer -from dailyai.transports.base_transport import ThreadedTransport +from dailyai.transports.threaded_transport import ThreadedTransport class WebSocketFrameProcessor(FrameProcessor): From 4ce140bf844add96c60b5bafb611f51e2c948e6b Mon Sep 17 00:00:00 2001 From: Moishe Lettvin Date: Wed, 27 Mar 2024 12:59:08 -0400 Subject: [PATCH 4/4] Move some things to AbstractTransport class --- src/dailyai/pipeline/merge_pipeline.py | 3 +- src/dailyai/transports/abstract_transport.py | 31 ++++++++++++++++++- src/dailyai/transports/threaded_transport.py | 22 ++----------- src/dailyai/transports/websocket_transport.py | 3 +- 4 files changed, 37 insertions(+), 22 deletions(-) diff --git a/src/dailyai/pipeline/merge_pipeline.py b/src/dailyai/pipeline/merge_pipeline.py index 736903e9d..e4b865f4a 100644 --- a/src/dailyai/pipeline/merge_pipeline.py +++ b/src/dailyai/pipeline/merge_pipeline.py @@ -12,9 +12,10 @@ def __init__(self, pipelines: List[Pipeline]): self.pipelines = pipelines async def run_pipeline(self): - for pipeline in self.pipelines: + for idx, pipeline in enumerate(self.pipelines): while True: frame = await pipeline.sink.get() + print(idx, frame) if isinstance( frame, EndFrame) or isinstance( frame, EndPipeFrame): diff --git a/src/dailyai/transports/abstract_transport.py b/src/dailyai/transports/abstract_transport.py index 6b9c9549c..f5ecc5880 100644 --- a/src/dailyai/transports/abstract_transport.py +++ b/src/dailyai/transports/abstract_transport.py @@ -1,12 +1,41 @@ from abc import abstractmethod +import asyncio +import logging +import time +from dailyai.pipeline.frame_processor import FrameProcessor from dailyai.pipeline.pipeline import Pipeline class AbstractTransport: def __init__(self, **kwargs): - pass + self.send_queue = asyncio.Queue() + self.receive_queue = asyncio.Queue() + self.completed_queue = asyncio.Queue() + + duration_minutes = kwargs.get("duration_minutes") or 10 + self._expiration = time.time() + duration_minutes * 60 + + self._mic_enabled = kwargs.get("mic_enabled") or False + self._mic_sample_rate = kwargs.get("mic_sample_rate") or 16000 + self._camera_enabled = kwargs.get("camera_enabled") or False + self._camera_width = kwargs.get("camera_width") or 1024 + self._camera_height = kwargs.get("camera_height") or 768 + self._speaker_enabled = kwargs.get("speaker_enabled") or False + self._speaker_sample_rate = kwargs.get("speaker_sample_rate") or 16000 + self._fps = kwargs.get("fps") or 8 + + self._logger: logging.Logger = logging.getLogger("dailyai.transport") @abstractmethod async def run(self, pipeline: Pipeline, override_pipeline_source_queue=True): pass + + @abstractmethod + async def run_interruptible_pipeline( + self, + pipeline: Pipeline, + pre_processor: FrameProcessor | None = None, + post_processor: FrameProcessor | None = None, + ): + pass diff --git a/src/dailyai/transports/threaded_transport.py b/src/dailyai/transports/threaded_transport.py index 50bd56cda..d529731cd 100644 --- a/src/dailyai/transports/threaded_transport.py +++ b/src/dailyai/transports/threaded_transport.py @@ -27,6 +27,7 @@ ) from dailyai.pipeline.pipeline import Pipeline from dailyai.services.ai_services import TTSService +from dailyai.transports.abstract_transport import AbstractTransport torch.set_num_threads(1) @@ -72,20 +73,13 @@ class VADState(Enum): STOPPING = 4 -class ThreadedTransport: +class ThreadedTransport(AbstractTransport): def __init__( self, **kwargs, ) -> None: - self._mic_enabled = kwargs.get("mic_enabled") or False - self._mic_sample_rate = kwargs.get("mic_sample_rate") or 16000 - self._camera_enabled = kwargs.get("camera_enabled") or False - self._camera_width = kwargs.get("camera_width") or 1024 - self._camera_height = kwargs.get("camera_height") or 768 - self._speaker_enabled = kwargs.get("speaker_enabled") or False - self._speaker_sample_rate = kwargs.get("speaker_sample_rate") or 16000 - self._fps = kwargs.get("fps") or 8 + super().__init__(**kwargs) self._vad_start_s = kwargs.get("vad_start_s") or 0.2 self._vad_stop_s = kwargs.get("vad_stop_s") or 0.8 self._context = kwargs.get("context") or [] @@ -105,14 +99,6 @@ def __init__( self._vad_state = VADState.QUIET self._user_is_speaking = False - duration_minutes = kwargs.get("duration_minutes") or 10 - self._expiration = time.time() + duration_minutes * 60 - - self.send_queue = asyncio.Queue() - self.receive_queue = asyncio.Queue() - - self.completed_queue = asyncio.Queue() - self._threadsafe_send_queue = queue.Queue() self._images = None @@ -125,8 +111,6 @@ def __init__( self._stop_threads = threading.Event() self._is_interrupted = threading.Event() - self._logger: logging.Logger = logging.getLogger() - async def run(self, pipeline: Pipeline | None = None, override_pipeline_source_queue=True): self._prerun() diff --git a/src/dailyai/transports/websocket_transport.py b/src/dailyai/transports/websocket_transport.py index 84c185a3d..0784eb89f 100644 --- a/src/dailyai/transports/websocket_transport.py +++ b/src/dailyai/transports/websocket_transport.py @@ -7,6 +7,7 @@ from dailyai.pipeline.frames import AudioFrame, ControlFrame, EndFrame, Frame, TTSEndFrame, TTSStartFrame, TextFrame from dailyai.pipeline.pipeline import Pipeline from dailyai.serializers.protobuf_serializer import ProtobufFrameSerializer +from dailyai.transports.abstract_transport import AbstractTransport from dailyai.transports.threaded_transport import ThreadedTransport @@ -45,7 +46,7 @@ async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]: yield frame -class WebsocketTransport(ThreadedTransport): +class WebsocketTransport(AbstractTransport): def __init__(self, **kwargs): super().__init__(**kwargs) self._sample_width = kwargs.get("sample_width", 2)