From 62fd371b978a1265f96d84c1e036b537d9e6cee5 Mon Sep 17 00:00:00 2001
From: Moishe Lettvin
Date: Wed, 6 Mar 2024 14:09:06 -0500
Subject: [PATCH] Remove Queue in frame names

---
 src/dailyai/pipeline/aggregators.py           | 60 +++++++--------
 src/dailyai/pipeline/frame_processor.py       | 12 +--
 src/dailyai/pipeline/frames.py                | 41 +++++-----
 src/dailyai/pipeline/pipeline.py              | 18 ++---
 src/dailyai/services/ai_services.py           | 58 +++++++-------
 .../services/base_transport_service.py        | 50 ++++++------
 src/dailyai/services/local_stt_service.py     |  6 +-
 src/dailyai/tests/test_aggregators.py         | 76 +++++++++----------
 src/dailyai/tests/test_ai_services.py         | 14 ++--
 .../tests/test_daily_transport_service.py     |  2 +-
 src/dailyai/tests/test_pipeline.py            | 24 +++---
 src/examples/foundational/01-say-one-thing.py |  2 +-
 src/examples/foundational/03-still-frame.py   |  4 +-
 src/examples/foundational/03a-image-local.py  |  4 +-
 .../foundational/04-utterance-and-speech.py   |  4 +-
 .../foundational/05-sync-speech-and-image.py  |  8 +-
 .../05a-local-sync-speech-and-text.py         |  6 +-
 .../foundational/06-listen-and-respond.py     |  2 +-
 src/examples/foundational/06a-image-sync.py   |  8 +-
 src/examples/foundational/08-bots-arguing.py  | 10 +--
 src/examples/foundational/10-wake-word.py     | 24 +++---
 src/examples/foundational/11-sound-effects.py | 14 ++--
 .../foundational/13a-whisper-local.py         |  6 +-
 src/examples/image-gen.py                     | 10 +--
 src/examples/internal/11a-dial-out.py         | 14 ++--
 25 files changed, 239 insertions(+), 238 deletions(-)
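The rename is mechanical and one-to-one: the `Queue` suffix is dropped (along with `Stream`/`Parallel` in the control-frame names), while `TranscriptionQueueFrame`, `LLMMessagesQueueFrame`, and `AppMessageQueueFrame` keep their old names in this commit. For downstream code, the full mapping reads as below; this is a hypothetical compatibility shim for illustration only — the patch itself ships no aliases:

```python
# Hypothetical shim, not part of this patch: old names on the left,
# the new names introduced by this commit on the right.
from dailyai.pipeline.frames import (
    Frame, ControlFrame, StartFrame, EndFrame, EndPipeFrame,
    LLMResponseStartFrame, LLMResponseEndFrame,
    AudioFrame, ImageFrame, SpriteFrame, TextFrame,
)

QueueFrame = Frame
ControlQueueFrame = ControlFrame
StartStreamQueueFrame = StartFrame
EndStreamQueueFrame = EndFrame
EndParallelPipeQueueFrame = EndPipeFrame
LLMResponseStartQueueFrame = LLMResponseStartFrame
LLMResponseEndQueueFrame = LLMResponseEndFrame
AudioQueueFrame = AudioFrame
ImageQueueFrame = ImageFrame
SpriteQueueFrame = SpriteFrame
TextQueueFrame = TextFrame
```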
diff --git a/src/dailyai/pipeline/aggregators.py b/src/dailyai/pipeline/aggregators.py
index 9b244ef00..fd9f5a849 100644
--- a/src/dailyai/pipeline/aggregators.py
+++ b/src/dailyai/pipeline/aggregators.py
@@ -5,13 +5,13 @@
 from dailyai.pipeline.frame_processor import FrameProcessor
 from dailyai.pipeline.frames import (
-    ControlQueueFrame,
-    EndParallelPipeQueueFrame,
-    EndStreamQueueFrame,
+    ControlFrame,
+    EndPipeFrame,
+    EndFrame,
     LLMMessagesQueueFrame,
-    LLMResponseEndQueueFrame,
-    QueueFrame,
-    TextQueueFrame,
+    LLMResponseEndFrame,
+    Frame,
+    TextFrame,
     TranscriptionQueueFrame,
 )
 from dailyai.pipeline.pipeline import Pipeline
@@ -38,10 +38,10 @@ def __init__(
         self.pass_through = pass_through
 
     async def process_frame(
-        self, frame: QueueFrame
-    ) -> AsyncGenerator[QueueFrame, None]:
+        self, frame: Frame
+    ) -> AsyncGenerator[Frame, None]:
         # We don't do anything with non-text frames, pass it along to next in the pipeline.
-        if not isinstance(frame, TextQueueFrame):
+        if not isinstance(frame, TextFrame):
             yield frame
             return
@@ -71,7 +71,7 @@ async def process_frame(
             self.messages.append({"role": self.role, "content": frame.text})
             yield LLMMessagesQueueFrame(self.messages)
 
-    async def finalize(self) -> AsyncGenerator[QueueFrame, None]:
+    async def finalize(self) -> AsyncGenerator[Frame, None]:
         # Send any dangling words that weren't finished with punctuation.
         if self.complete_sentences and self.sentence:
             self.messages.append({"role": self.role, "content": self.sentence})
@@ -106,18 +106,18 @@ def __init__(self):
         self.aggregation = ""
 
     async def process_frame(
-        self, frame: QueueFrame
-    ) -> AsyncGenerator[QueueFrame, None]:
-        if isinstance(frame, TextQueueFrame):
+        self, frame: Frame
+    ) -> AsyncGenerator[Frame, None]:
+        if isinstance(frame, TextFrame):
             m = re.search("(.*[?.!])(.*)", frame.text)
             if m:
-                yield TextQueueFrame(self.aggregation + m.group(1))
+                yield TextFrame(self.aggregation + m.group(1))
                 self.aggregation = m.group(2)
             else:
                 self.aggregation += frame.text
-        elif isinstance(frame, EndStreamQueueFrame):
+        elif isinstance(frame, EndFrame):
             if self.aggregation:
-                yield TextQueueFrame(self.aggregation)
+                yield TextFrame(self.aggregation)
             yield frame
         else:
             yield frame
@@ -128,12 +128,12 @@ def __init__(self):
         self.aggregation = ""
 
     async def process_frame(
-        self, frame: QueueFrame
-    ) -> AsyncGenerator[QueueFrame, None]:
-        if isinstance(frame, TextQueueFrame):
+        self, frame: Frame
+    ) -> AsyncGenerator[Frame, None]:
+        if isinstance(frame, TextFrame):
             self.aggregation += frame.text
-        elif isinstance(frame, LLMResponseEndQueueFrame):
-            yield TextQueueFrame(self.aggregation)
+        elif isinstance(frame, LLMResponseEndFrame):
+            yield TextFrame(self.aggregation)
             self.aggregation = ""
         else:
             yield frame
@@ -143,20 +143,20 @@ class StatelessTextTransformer(FrameProcessor):
     def __init__(self, transform_fn):
         self.transform_fn = transform_fn
 
-    async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
-        if isinstance(frame, TextQueueFrame):
+    async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
+        if isinstance(frame, TextFrame):
             result = self.transform_fn(frame.text)
             if isinstance(result, Coroutine):
                 result = await result
-            yield TextQueueFrame(result)
+            yield TextFrame(result)
         else:
             yield frame
 
 
 class ParallelPipeline(FrameProcessor):
     def __init__(self, pipeline_definitions: List[List[FrameProcessor]]):
         self.sources = [asyncio.Queue() for _ in pipeline_definitions]
-        self.sink: asyncio.Queue[QueueFrame] = asyncio.Queue()
+        self.sink: asyncio.Queue[Frame] = asyncio.Queue()
         self.pipelines: list[Pipeline] = [
             Pipeline(
                 pipeline_definition,
@@ -166,10 +166,10 @@ def __init__(self, pipeline_definitions: List[List[FrameProcessor]]):
             for source, pipeline_definition in zip(self.sources, pipeline_definitions)
         ]
 
-    async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
+    async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
         for source in self.sources:
             await source.put(frame)
-            await source.put(EndParallelPipeQueueFrame())
+            await source.put(EndPipeFrame())
 
         await asyncio.gather(*[pipeline.run_pipeline() for pipeline in self.pipelines])
@@ -186,7 +186,7 @@ async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, N
                 seen_ids.add(id(frame))
 
                 # Skip passing along EndParallelPipeQueueFrame, because we use them for our own flow control.
-                if not isinstance(frame, EndParallelPipeQueueFrame):
+                if not isinstance(frame, EndPipeFrame):
                     yield frame
 
 
 class GatedAggregator(FrameProcessor):
@@ -194,9 +194,9 @@ def __init__(self, gate_open_fn, gate_close_fn, start_open):
         self.gate_open_fn = gate_open_fn
         self.gate_close_fn = gate_close_fn
         self.gate_open = start_open
-        self.accumulator: List[QueueFrame] = []
+        self.accumulator: List[Frame] = []
 
-    async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
+    async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
         if self.gate_open:
             if self.gate_close_fn(frame):
                 self.gate_open = False
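As a sanity check of the renamed aggregator API, the following sketch mirrors `test_sentence_aggregator` further down in this patch; the behavior is inferred from the hunks above, only the names changed:

```python
import asyncio

from dailyai.pipeline.aggregators import SentenceAggregator
from dailyai.pipeline.frames import EndFrame, TextFrame


async def main():
    aggregator = SentenceAggregator()
    # Word-sized TextFrames go in; nothing comes out until punctuation.
    for word in ["Hello,", " world.", " How", " are", " you?"]:
        async for frame in aggregator.process_frame(TextFrame(word)):
            print(repr(frame.text))  # 'Hello, world.' then ' How are you?'
    # EndFrame flushes any partial sentence and is itself passed through.
    async for frame in aggregator.process_frame(EndFrame()):
        print(type(frame).__name__)  # EndFrame


asyncio.run(main())
```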
diff --git a/src/dailyai/pipeline/frame_processor.py b/src/dailyai/pipeline/frame_processor.py
index 5bebd9be2..2c5367c1b 100644
--- a/src/dailyai/pipeline/frame_processor.py
+++ b/src/dailyai/pipeline/frame_processor.py
@@ -1,7 +1,7 @@
 from abc import abstractmethod
 from typing import AsyncGenerator
 
-from dailyai.pipeline.frames import ControlQueueFrame, QueueFrame
+from dailyai.pipeline.frames import ControlFrame, Frame
 
 """
 This is the base class for all frame processors. Frame processors consume a frame
@@ -20,16 +20,16 @@ class FrameProcessor:
 
     @abstractmethod
     async def process_frame(
-        self, frame: QueueFrame
-    ) -> AsyncGenerator[QueueFrame, None]:
-        if isinstance(frame, ControlQueueFrame):
+        self, frame: Frame
+    ) -> AsyncGenerator[Frame, None]:
+        if isinstance(frame, ControlFrame):
             yield frame
 
     @abstractmethod
-    async def finalize(self) -> AsyncGenerator[QueueFrame, None]:
+    async def finalize(self) -> AsyncGenerator[Frame, None]:
         # This is a trick for the interpreter (and linter) to know that this is a generator.
         if False:
-            yield QueueFrame()
+            yield Frame()
 
     @abstractmethod
     async def interrupted(self) -> None:
diff --git a/src/dailyai/pipeline/frames.py b/src/dailyai/pipeline/frames.py
index e8a31d0eb..aa20b27ea 100644
--- a/src/dailyai/pipeline/frames.py
+++ b/src/dailyai/pipeline/frames.py
@@ -2,72 +2,73 @@
 from typing import Any
 
 
-class QueueFrame:
-    def __eq__(self, other):
-        return isinstance(other, self.__class__)
-
-
-class ControlQueueFrame(QueueFrame):
+class Frame:
     pass
 
 
+class ControlFrame(Frame):
+    # Control frames should contain no instance data, so
+    # equality is based solely on the class.
+    def __eq__(self, other):
+        return type(other) == self.__class__
+
 
-class StartStreamQueueFrame(ControlQueueFrame):
+class StartFrame(ControlFrame):
     pass
 
 
-class EndStreamQueueFrame(ControlQueueFrame):
+class EndFrame(ControlFrame):
     pass
 
 
-class EndParallelPipeQueueFrame(ControlQueueFrame):
+class EndPipeFrame(ControlFrame):
     pass
 
 
-class LLMResponseStartQueueFrame(QueueFrame):
+class LLMResponseStartFrame(ControlFrame):
     pass
 
 
-class LLMResponseEndQueueFrame(QueueFrame):
+class LLMResponseEndFrame(ControlFrame):
     pass
 
 
 @dataclass()
-class AudioQueueFrame(QueueFrame):
+class AudioFrame(Frame):
     data: bytes
 
 
 @dataclass()
-class ImageQueueFrame(QueueFrame):
+class ImageFrame(Frame):
     url: str | None
     image: bytes
 
 
 @dataclass()
-class SpriteQueueFrame(QueueFrame):
+class SpriteFrame(Frame):
     images: list[bytes]
 
 
 @dataclass()
-class TextQueueFrame(QueueFrame):
+class TextFrame(Frame):
     text: str
 
 
 @dataclass()
-class TranscriptionQueueFrame(TextQueueFrame):
+class TranscriptionQueueFrame(TextFrame):
     participantId: str
     timestamp: str
 
 
 @dataclass()
-class LLMMessagesQueueFrame(QueueFrame):
+class LLMMessagesQueueFrame(Frame):
     messages: list[dict[str, str]]
 
 
 # TODO: define this more concretely!
-class AppMessageQueueFrame(QueueFrame):
+class AppMessageQueueFrame(Frame):
     message: Any
     participantId: str
 
 
-class UserStartedSpeakingFrame(QueueFrame):
+class UserStartedSpeakingFrame(Frame):
     pass
 
 
-class UserStoppedSpeakingFrame(QueueFrame):
+class UserStoppedSpeakingFrame(Frame):
     pass
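The one behavioral nuance in this file is the `__eq__` move: equality used to live on `QueueFrame` and used `isinstance`, which made it asymmetric across subclasses; it now lives on `ControlFrame` only and is strictly class-based, while the dataclass frames keep field-by-field equality and plain `Frame` falls back to identity. A quick check of what that means in practice:

```python
from dailyai.pipeline.frames import (
    AudioFrame, EndFrame, EndPipeFrame, Frame, TextFrame)

# Control frames carry no data, so same class means equal.
assert EndFrame() == EndFrame()
assert EndFrame() != EndPipeFrame()

# Data frames are dataclasses and compare by field, as before.
assert TextFrame("hi") == TextFrame("hi")
assert AudioFrame(b"\x00") != AudioFrame(b"\x01")

# Plain Frame lost its custom __eq__, so it falls back to identity.
assert Frame() != Frame()
```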
diff --git a/src/dailyai/pipeline/pipeline.py b/src/dailyai/pipeline/pipeline.py
index 0ef2ab72a..dad4bc042 100644
--- a/src/dailyai/pipeline/pipeline.py
+++ b/src/dailyai/pipeline/pipeline.py
@@ -2,7 +2,7 @@
 from typing import AsyncGenerator, List
 
 from dailyai.pipeline.frame_processor import FrameProcessor
-from dailyai.pipeline.frames import EndParallelPipeQueueFrame, EndStreamQueueFrame, QueueFrame
+from dailyai.pipeline.frames import EndPipeFrame, EndFrame, Frame
 
 """
 This class manages a pipe of FrameProcessors, and runs them in sequence. The "source"
@@ -17,19 +17,19 @@ def __init__(
         self,
         processors: List[FrameProcessor],
         source: asyncio.Queue | None = None,
-        sink: asyncio.Queue[QueueFrame] | None = None,
+        sink: asyncio.Queue[Frame] | None = None,
     ):
         self.processors = processors
-        self.source: asyncio.Queue[QueueFrame] | None = source
-        self.sink: asyncio.Queue[QueueFrame] | None = sink
+        self.source: asyncio.Queue[Frame] | None = source
+        self.sink: asyncio.Queue[Frame] | None = sink
 
-    def set_source(self, source: asyncio.Queue[QueueFrame]):
+    def set_source(self, source: asyncio.Queue[Frame]):
         self.source = source
 
-    def set_sink(self, sink: asyncio.Queue[QueueFrame]):
+    def set_sink(self, sink: asyncio.Queue[Frame]):
         self.sink = sink
 
-    async def get_next_source_frame(self) -> AsyncGenerator[QueueFrame, None]:
+    async def get_next_source_frame(self) -> AsyncGenerator[Frame, None]:
         if self.source is None:
             raise ValueError("Source queue not set")
         yield await self.source.get()
@@ -52,9 +52,9 @@ async def run_pipeline(self):
             async for frame in frame_generator:
                 await self.sink.put(frame)
                 if isinstance(
-                    frame, EndStreamQueueFrame
+                    frame, EndFrame
                 ) or isinstance(
-                    frame, EndParallelPipeQueueFrame
+                    frame, EndPipeFrame
                 ):
                     return
         except asyncio.CancelledError:
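The pipeline shutdown contract is unchanged: `run_pipeline()` returns after relaying an `EndFrame` (or an `EndPipeFrame` inside `ParallelPipeline`) to the sink. A runnable sketch with the new names, mirroring `test_pipeline_simple` below:

```python
import asyncio

from dailyai.pipeline.aggregators import SentenceAggregator
from dailyai.pipeline.frames import EndFrame, TextFrame
from dailyai.pipeline.pipeline import Pipeline


async def main():
    source: asyncio.Queue = asyncio.Queue()
    sink: asyncio.Queue = asyncio.Queue()
    pipeline = Pipeline([SentenceAggregator()], source, sink)

    await source.put(TextFrame("Hello, "))
    await source.put(TextFrame("world."))
    await source.put(EndFrame())  # run_pipeline() exits after relaying this

    await pipeline.run_pipeline()
    print(await sink.get())  # TextFrame(text='Hello, world.')


asyncio.run(main())
```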
iterable") @@ -76,12 +76,12 @@ async def run_llm_async(self, messages) -> AsyncGenerator[str, None]: async def run_llm(self, messages) -> str: pass - async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]: + async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]: if isinstance(frame, LLMMessagesQueueFrame): - yield LLMResponseStartQueueFrame() + yield LLMResponseStartFrame() async for text_chunk in self.run_llm_async(frame.messages): - yield TextQueueFrame(text_chunk) - yield LLMResponseEndQueueFrame() + yield TextFrame(text_chunk) + yield LLMResponseEndFrame() else: yield frame @@ -103,8 +103,8 @@ async def run_tts(self, text) -> AsyncGenerator[bytes, None]: # yield empty bytes here, so linting can infer what this method does yield bytes() - async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]: - if not isinstance(frame, TextQueueFrame): + async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]: + if not isinstance(frame, TextFrame): yield frame return @@ -119,16 +119,16 @@ async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, N if text: async for audio_chunk in self.run_tts(text): - yield AudioQueueFrame(audio_chunk) + yield AudioFrame(audio_chunk) async def finalize(self): if self.current_sentence: async for audio_chunk in self.run_tts(self.current_sentence): - yield AudioQueueFrame(audio_chunk) + yield AudioFrame(audio_chunk) # Convenience function to send the audio for a sentence to the given queue async def say(self, sentence, queue: asyncio.Queue): - await self.run_to_queue(queue, [TextQueueFrame(sentence)]) + await self.run_to_queue(queue, [TextFrame(sentence)]) class ImageGenService(AIService): @@ -141,13 +141,13 @@ def __init__(self, image_size, **kwargs): async def run_image_gen(self, sentence: str) -> tuple[str, bytes]: pass - async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]: - if not isinstance(frame, TextQueueFrame): + async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]: + if not isinstance(frame, TextFrame): yield frame return (url, image_data) = await self.run_image_gen(frame.text) - yield ImageQueueFrame(url, image_data) + yield ImageFrame(url, image_data) class STTService(AIService): @@ -164,9 +164,9 @@ async def run_stt(self, audio: BinaryIO) -> str: """Returns transcript as a string""" pass - async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]: + async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]: """Processes a frame of audio data, either buffering or transcribing it.""" - if not isinstance(frame, AudioQueueFrame): + if not isinstance(frame, AudioFrame): return data = frame.data @@ -187,8 +187,8 @@ def __init__(self, prefix="Frame", **kwargs): super().__init__(**kwargs) self.prefix = prefix - async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]: - if isinstance(frame, (AudioQueueFrame, ImageQueueFrame)): + async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]: + if isinstance(frame, (AudioFrame, ImageFrame)): self.logger.info(f"{self.prefix}: {type(frame)}") else: print(f"{self.prefix}: {frame}") diff --git a/src/dailyai/services/base_transport_service.py b/src/dailyai/services/base_transport_service.py index 6ad51261d..71c45c377 100644 --- a/src/dailyai/services/base_transport_service.py +++ b/src/dailyai/services/base_transport_service.py @@ -12,12 +12,12 @@ from enum import 
diff --git a/src/dailyai/services/base_transport_service.py b/src/dailyai/services/base_transport_service.py
index 6ad51261d..71c45c377 100644
--- a/src/dailyai/services/base_transport_service.py
+++ b/src/dailyai/services/base_transport_service.py
@@ -12,12 +12,12 @@
 from enum import Enum
 
 from dailyai.pipeline.frames import (
-    AudioQueueFrame,
-    EndStreamQueueFrame,
-    ImageQueueFrame,
-    QueueFrame,
-    SpriteQueueFrame,
-    StartStreamQueueFrame,
+    AudioFrame,
+    EndFrame,
+    ImageFrame,
+    Frame,
+    SpriteFrame,
+    StartFrame,
     TranscriptionQueueFrame,
     UserStartedSpeakingFrame,
     UserStoppedSpeakingFrame
@@ -159,7 +159,7 @@ async def run(self):
         self._stop_threads.set()
 
-        await self.send_queue.put(EndStreamQueueFrame())
+        await self.send_queue.put(EndFrame())
 
         await async_output_queue_marshal_task
         await self.send_queue.join()
         self._frame_consumer_thread.join()
@@ -182,7 +182,7 @@ async def run_interruptible_pipeline(self, pipeline:Pipeline, allow_interruption
         pipeline.set_sink(self.send_queue)
         pipeline_task = asyncio.create_task(pipeline.run_pipeline())
 
-        async def yield_frame(frame:QueueFrame) -> AsyncGenerator[QueueFrame, None]:
+        async def yield_frame(frame:Frame) -> AsyncGenerator[Frame, None]:
             yield frame
 
         async def post_process(post_processor):
@@ -194,7 +194,7 @@ async def post_process(post_processor):
                 print("post-processing frame: ", frame.__class__.__name__)
                 await post_processor.process_frame(frame)
 
-                if isinstance(frame, EndStreamQueueFrame):
+                if isinstance(frame, EndFrame):
                     break
 
         post_process_task = asyncio.create_task(post_process(post_processor))
@@ -214,7 +214,7 @@ async def post_process(post_processor):
         async for frame in frame_generator:
             await source_queue.put(frame)
-            if isinstance(frame, EndStreamQueueFrame):
+            if isinstance(frame, EndFrame):
                 break
 
         await asyncio.gather(pipeline_task, post_process_task)
@@ -303,20 +303,20 @@ def _vad(self):
 
     async def _marshal_frames(self):
         while True:
-            frame: QueueFrame | list = await self.send_queue.get()
+            frame: Frame | list = await self.send_queue.get()
             self._threadsafe_send_queue.put(frame)
             self.send_queue.task_done()
-            if isinstance(frame, EndStreamQueueFrame):
+            if isinstance(frame, EndFrame):
                 break
 
     def interrupt(self):
         self._is_interrupted.set()
 
-    async def get_receive_frames(self) -> AsyncGenerator[QueueFrame, None]:
+    async def get_receive_frames(self) -> AsyncGenerator[Frame, None]:
         while True:
             frame = await self.receive_queue.get()
             yield frame
-            if isinstance(frame, EndStreamQueueFrame):
+            if isinstance(frame, EndFrame):
                 break
 
     def _receive_audio(self):
@@ -329,13 +329,13 @@ def _receive_audio(self):
         while not self._stop_threads.is_set():
             buffer = self.read_audio_frames(desired_frame_count)
             if len(buffer) > 0:
-                frame = AudioQueueFrame(buffer)
+                frame = AudioFrame(buffer)
                 asyncio.run_coroutine_threadsafe(
                     self.receive_queue.put(frame), self._loop
                 )
 
         asyncio.run_coroutine_threadsafe(
-            self.receive_queue.put(EndStreamQueueFrame()), self._loop
+            self.receive_queue.put(EndFrame()), self._loop
         )
 
     def _set_image(self, image: bytes):
@@ -363,18 +363,18 @@ def _frame_consumer(self):
         all_audio_frames = bytearray()
         while True:
             try:
-                frames_or_frame: QueueFrame | list[QueueFrame] = (
+                frames_or_frame: Frame | list[Frame] = (
                     self._threadsafe_send_queue.get()
                 )
-                if isinstance(frames_or_frame, QueueFrame):
-                    frames: list[QueueFrame] = [frames_or_frame]
+                if isinstance(frames_or_frame, Frame):
+                    frames: list[Frame] = [frames_or_frame]
                 elif isinstance(frames_or_frame, list):
-                    frames: list[QueueFrame] = frames_or_frame
+                    frames: list[Frame] = frames_or_frame
                 else:
                     raise Exception("Unknown type in output queue")
 
                 for frame in frames:
-                    if isinstance(frame, EndStreamQueueFrame):
+                    if isinstance(frame, EndFrame):
                         self._logger.info("Stopping frame consumer thread")
                         self._threadsafe_send_queue.task_done()
                         if self._loop:
@@ -386,7 +386,7 @@ def _frame_consumer(self):
                     # if interrupted, we just pull frames off the queue and discard them
                    if not self._is_interrupted.is_set():
                         if frame:
-                            if isinstance(frame, AudioQueueFrame):
+                            if isinstance(frame, AudioFrame):
                                 chunk = frame.data
 
                                 all_audio_frames.extend(chunk)
@@ -398,9 +398,9 @@ def _frame_consumer(self):
                                     if truncated_length:
                                         self.write_frame_to_mic(bytes(b[:truncated_length]))
                                         b = b[truncated_length:]
-                            elif isinstance(frame, ImageQueueFrame):
+                            elif isinstance(frame, ImageFrame):
                                 self._set_image(frame.image)
-                            elif isinstance(frame, SpriteQueueFrame):
+                            elif isinstance(frame, SpriteFrame):
                                 self._set_images(frame.images)
                             elif len(b):
                                 self.write_frame_to_mic(bytes(b))
@@ -418,7 +418,7 @@ def _frame_consumer(self):
                             self.write_frame_to_mic(bytes(b[:truncated_length]))
                             b = bytearray()
 
-                    if isinstance(frame, StartStreamQueueFrame):
+                    if isinstance(frame, StartFrame):
                         self._is_interrupted.clear()
 
                 self._threadsafe_send_queue.task_done()
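On the receive side the contract also survives the rename: `get_receive_frames()` yields an `EndFrame` and then returns on its own, so consumers need no explicit break. A hypothetical consumer, assuming an already-running `BaseTransportService` subclass such as `DailyTransportService`:

```python
from dailyai.pipeline.frames import EndFrame, TranscriptionQueueFrame


async def print_transcripts(transport):
    # The generator terminates itself after yielding the EndFrame.
    async for frame in transport.get_receive_frames():
        if isinstance(frame, TranscriptionQueueFrame):
            print(frame.participantId, frame.text)
```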
diff --git a/src/dailyai/services/local_stt_service.py b/src/dailyai/services/local_stt_service.py
index 30471af8d..727bafbc6 100644
--- a/src/dailyai/services/local_stt_service.py
+++ b/src/dailyai/services/local_stt_service.py
@@ -4,7 +4,7 @@
 import time
 from typing import AsyncGenerator
 import wave
-from dailyai.pipeline.frames import AudioQueueFrame, QueueFrame, TranscriptionQueueFrame
+from dailyai.pipeline.frames import AudioFrame, Frame, TranscriptionQueueFrame
 from dailyai.services.ai_services import STTService
 
@@ -39,9 +39,9 @@ def _new_wave(self):
         ww.setframerate(self._frame_rate)
         self._wave = ww
 
-    async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
+    async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
         """Processes a frame of audio data, either buffering or transcribing it."""
-        if not isinstance(frame, AudioQueueFrame):
+        if not isinstance(frame, AudioFrame):
             return
 
         data = frame.data
diff --git a/src/dailyai/tests/test_aggregators.py b/src/dailyai/tests/test_aggregators.py
index 5a2cfb07d..bad689e73 100644
--- a/src/dailyai/tests/test_aggregators.py
+++ b/src/dailyai/tests/test_aggregators.py
@@ -9,13 +9,13 @@
     StatelessTextTransformer,
 )
 from dailyai.pipeline.frames import (
-    AudioQueueFrame,
-    EndStreamQueueFrame,
-    ImageQueueFrame,
-    LLMResponseEndQueueFrame,
-    LLMResponseStartQueueFrame,
-    QueueFrame,
-    TextQueueFrame,
+    AudioFrame,
+    EndFrame,
+    ImageFrame,
+    LLMResponseEndFrame,
+    LLMResponseStartFrame,
+    Frame,
+    TextFrame,
 )
 from dailyai.pipeline.pipeline import Pipeline
@@ -27,46 +27,46 @@ async def test_sentence_aggregator(self):
         expected_sentences = ["Hello, world.", " How are you?", " I am fine "]
         aggregator = SentenceAggregator()
         for word in sentence.split(" "):
-            async for sentence in aggregator.process_frame(TextQueueFrame(word + " ")):
-                self.assertIsInstance(sentence, TextQueueFrame)
-                if isinstance(sentence, TextQueueFrame):
+            async for sentence in aggregator.process_frame(TextFrame(word + " ")):
+                self.assertIsInstance(sentence, TextFrame)
+                if isinstance(sentence, TextFrame):
                     self.assertEqual(sentence.text, expected_sentences.pop(0))
 
-        async for sentence in aggregator.process_frame(EndStreamQueueFrame()):
+        async for sentence in aggregator.process_frame(EndFrame()):
             if len(expected_sentences):
-                self.assertIsInstance(sentence, TextQueueFrame)
-                if isinstance(sentence, TextQueueFrame):
+                self.assertIsInstance(sentence, TextFrame)
+                if isinstance(sentence, TextFrame):
                     self.assertEqual(sentence.text, expected_sentences.pop(0))
             else:
-                self.assertIsInstance(sentence, EndStreamQueueFrame)
+                self.assertIsInstance(sentence, EndFrame)
 
         self.assertEqual(expected_sentences, [])
 
     async def test_gated_accumulator(self):
         gated_aggregator = GatedAggregator(
-            gate_open_fn=lambda frame: isinstance(frame, ImageQueueFrame),
-            gate_close_fn=lambda frame: isinstance(frame, LLMResponseStartQueueFrame),
+            gate_open_fn=lambda frame: isinstance(frame, ImageFrame),
+            gate_close_fn=lambda frame: isinstance(frame, LLMResponseStartFrame),
             start_open=False,
         )
 
         frames = [
-            LLMResponseStartQueueFrame(),
-            TextQueueFrame("Hello, "),
-            TextQueueFrame("world."),
-            AudioQueueFrame(b"hello"),
-            ImageQueueFrame("image", b"image"),
-            AudioQueueFrame(b"world"),
-            LLMResponseEndQueueFrame(),
+            LLMResponseStartFrame(),
+            TextFrame("Hello, "),
+            TextFrame("world."),
+            AudioFrame(b"hello"),
+            ImageFrame("image", b"image"),
+            AudioFrame(b"world"),
+            LLMResponseEndFrame(),
         ]
 
         expected_output_frames = [
-            ImageQueueFrame("image", b"image"),
-            LLMResponseStartQueueFrame(),
-            TextQueueFrame("Hello, "),
-            TextQueueFrame("world."),
-            AudioQueueFrame(b"hello"),
-            AudioQueueFrame(b"world"),
-            LLMResponseEndQueueFrame(),
+            ImageFrame("image", b"image"),
+            LLMResponseStartFrame(),
+            TextFrame("Hello, "),
+            TextFrame("world."),
+            AudioFrame(b"hello"),
+            AudioFrame(b"world"),
+            LLMResponseEndFrame(),
         ]
         for frame in frames:
             async for out_frame in gated_aggregator.process_frame(frame):
@@ -98,16 +98,16 @@ async def slow_add(sleep_time:float, name:str, x: str):
         )
 
         frames = [
-            TextQueueFrame("Hello, "),
-            TextQueueFrame("world."),
-            EndStreamQueueFrame()
+            TextFrame("Hello, "),
+            TextFrame("world."),
+            EndFrame()
         ]
 
-        expected_output_frames: list[QueueFrame] = [
-            TextQueueFrame(text='Hello, :pipe1.'),
-            TextQueueFrame(text='world.:pipe1.'),
-            TextQueueFrame(text='Hello, world.:pipe2.'),
-            EndStreamQueueFrame()
+        expected_output_frames: list[Frame] = [
+            TextFrame(text='Hello, :pipe1.'),
+            TextFrame(text='world.:pipe1.'),
+            TextFrame(text='Hello, world.:pipe2.'),
+            EndFrame()
         ]
 
         for frame in frames:
diff --git a/src/dailyai/tests/test_ai_services.py b/src/dailyai/tests/test_ai_services.py
index 007616eda..6e007a85a 100644
--- a/src/dailyai/tests/test_ai_services.py
+++ b/src/dailyai/tests/test_ai_services.py
@@ -3,11 +3,11 @@
 from typing import AsyncGenerator, Generator
 
 from dailyai.services.ai_services import AIService
-from dailyai.pipeline.frames import EndStreamQueueFrame, QueueFrame, TextQueueFrame
+from dailyai.pipeline.frames import EndFrame, Frame, TextFrame
 
 
 class SimpleAIService(AIService):
-    async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
+    async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
         yield frame
@@ -16,11 +16,11 @@ async def test_async_input(self):
         service = SimpleAIService()
 
         input_frames = [
-            TextQueueFrame("hello"),
-            EndStreamQueueFrame()
+            TextFrame("hello"),
+            EndFrame()
         ]
 
-        async def iterate_frames() -> AsyncGenerator[QueueFrame, None]:
+        async def iterate_frames() -> AsyncGenerator[Frame, None]:
             for frame in input_frames:
                 yield frame
@@ -33,9 +33,9 @@ async def iterate_frames() -> AsyncGenerator[QueueFrame, None]:
     async def test_nonasync_input(self):
         service = SimpleAIService()
 
-        input_frames = [TextQueueFrame("hello"), EndStreamQueueFrame()]
+        input_frames = [TextFrame("hello"), EndFrame()]
 
-        def iterate_frames() -> Generator[QueueFrame, None, None]:
+        def iterate_frames() -> Generator[Frame, None, None]:
             for frame in input_frames:
                 yield frame
diff --git a/src/dailyai/tests/test_daily_transport_service.py b/src/dailyai/tests/test_daily_transport_service.py
index 8914acb6f..71eec2000 100644
--- a/src/dailyai/tests/test_daily_transport_service.py
+++ b/src/dailyai/tests/test_daily_transport_service.py
@@ -3,7 +3,7 @@
 from unittest.mock import MagicMock, patch
 
-from dailyai.pipeline.frames import AudioQueueFrame, ImageQueueFrame
+from dailyai.pipeline.frames import AudioFrame, ImageFrame
 
 
 class TestDailyTransport(unittest.IsolatedAsyncioTestCase):
diff --git a/src/dailyai/tests/test_pipeline.py b/src/dailyai/tests/test_pipeline.py
index 2e1d4289c..b5c9edd05 100644
--- a/src/dailyai/tests/test_pipeline.py
+++ b/src/dailyai/tests/test_pipeline.py
@@ -2,7 +2,7 @@
 from doctest import OutputChecker
 import unittest
 from dailyai.pipeline.aggregators import SentenceAggregator, StatelessTextTransformer
-from dailyai.pipeline.frames import EndStreamQueueFrame, TextQueueFrame
+from dailyai.pipeline.frames import EndFrame, TextFrame
 from dailyai.pipeline.pipeline import Pipeline
 
@@ -16,14 +16,14 @@ async def test_pipeline_simple(self):
         incoming_queue = asyncio.Queue()
         pipeline = Pipeline([aggregator], incoming_queue, outgoing_queue)
 
-        await incoming_queue.put(TextQueueFrame("Hello, "))
-        await incoming_queue.put(TextQueueFrame("world."))
-        await incoming_queue.put(EndStreamQueueFrame())
+        await incoming_queue.put(TextFrame("Hello, "))
+        await incoming_queue.put(TextFrame("world."))
+        await incoming_queue.put(EndFrame())
 
         await pipeline.run_pipeline()
 
-        self.assertEqual(await outgoing_queue.get(), TextQueueFrame("Hello, world."))
-        self.assertIsInstance(await outgoing_queue.get(), EndStreamQueueFrame)
+        self.assertEqual(await outgoing_queue.get(), TextFrame("Hello, world."))
+        self.assertIsInstance(await outgoing_queue.get(), EndFrame)
 
     async def test_pipeline_multiple_stages(self):
         sentence_aggregator = SentenceAggregator()
@@ -40,21 +40,21 @@ async def test_pipeline_multiple_stages(self):
         sentence = "Hello, world. It's me, a pipeline."
         for c in sentence:
-            await incoming_queue.put(TextQueueFrame(c))
-        await incoming_queue.put(EndStreamQueueFrame())
+            await incoming_queue.put(TextFrame(c))
+        await incoming_queue.put(EndFrame())
 
         await pipeline.run_pipeline()
 
         self.assertEqual(
-            await outgoing_queue.get(), TextQueueFrame("H E L L O ,   W O R L D .")
+            await outgoing_queue.get(), TextFrame("H E L L O ,   W O R L D .")
         )
         self.assertEqual(
             await outgoing_queue.get(),
-            TextQueueFrame("   I T ' S   M E ,   A   P I P E L I N E ."),
+            TextFrame("   I T ' S   M E ,   A   P I P E L I N E ."),
         )
         # leftover little bit because of the spacing
         self.assertEqual(
             await outgoing_queue.get(),
-            TextQueueFrame("   "),
+            TextFrame("   "),
         )
-        self.assertIsInstance(await outgoing_queue.get(), EndStreamQueueFrame)
+        self.assertIsInstance(await outgoing_queue.get(), EndFrame)
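The example programs below are touched only to track the rename; the send-side idiom they share is unchanged. Trimmed down, and assuming a configured image service and transport as in 03-still-frame.py:

```python
from dailyai.pipeline.frames import TextFrame


async def send_still_frame(imagegen, transport):
    # run_to_queue() feeds TextFrames through the service and pushes the
    # resulting ImageFrame onto the transport's send queue.
    await imagegen.run_to_queue(
        transport.send_queue, [TextFrame("a cat in the style of picasso")]
    )
```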
diff --git a/src/examples/foundational/01-say-one-thing.py b/src/examples/foundational/01-say-one-thing.py
index 9df54d708..eb78810cf 100644
--- a/src/examples/foundational/01-say-one-thing.py
+++ b/src/examples/foundational/01-say-one-thing.py
@@ -28,7 +28,7 @@ async def main(room_url):
         mic_enabled=True
     )
 
-    """
     tts = ElevenLabsTTSService(
         aiohttp_session=session,
         api_key=os.getenv("ELEVENLABS_API_KEY"),
@@ -39,6 +38,7 @@ async def main(room_url):
         user_id=os.getenv("PLAY_HT_USER_ID"),
         voice_url=os.getenv("PLAY_HT_VOICE_URL"),
     )
+    """
 
     # Register an event handler so we can play the audio when the participant joins.
     @transport.event_handler("on_participant_joined")
diff --git a/src/examples/foundational/03-still-frame.py b/src/examples/foundational/03-still-frame.py
index 8fefd5cba..cdafe5132 100644
--- a/src/examples/foundational/03-still-frame.py
+++ b/src/examples/foundational/03-still-frame.py
@@ -2,7 +2,7 @@
 import aiohttp
 import os
 
-from dailyai.pipeline.frames import TextQueueFrame
+from dailyai.pipeline.frames import TextFrame
 from dailyai.services.daily_transport_service import DailyTransportService
 from dailyai.services.fal_ai_services import FalImageGenService
 from dailyai.services.open_ai_services import OpenAIImageGenService
@@ -39,7 +39,7 @@ async def main(room_url):
         image_task = asyncio.create_task(
             imagegen.run_to_queue(
                 transport.send_queue, [
-                    TextQueueFrame("a cat in the style of picasso")]))
+                    TextFrame("a cat in the style of picasso")]))
 
         @transport.event_handler("on_first_other_participant_joined")
         async def on_first_other_participant_joined(transport):
diff --git a/src/examples/foundational/03a-image-local.py b/src/examples/foundational/03a-image-local.py
index 0904a060e..05f9e82a5 100644
--- a/src/examples/foundational/03a-image-local.py
+++ b/src/examples/foundational/03a-image-local.py
@@ -4,7 +4,7 @@
 
 import tkinter as tk
 
-from dailyai.pipeline.frames import TextQueueFrame
+from dailyai.pipeline.frames import TextFrame
 from dailyai.services.fal_ai_services import FalImageGenService
 from dailyai.services.local_transport_service import LocalTransportService
 
@@ -34,7 +34,7 @@ async def main():
         )
         image_task = asyncio.create_task(
             imagegen.run_to_queue(
-                transport.send_queue, [TextQueueFrame("a cat in the style of picasso")]
+                transport.send_queue, [TextFrame("a cat in the style of picasso")]
             )
         )
diff --git a/src/examples/foundational/04-utterance-and-speech.py b/src/examples/foundational/04-utterance-and-speech.py
index 57ba3dc91..3b67fb149 100644
--- a/src/examples/foundational/04-utterance-and-speech.py
+++ b/src/examples/foundational/04-utterance-and-speech.py
@@ -6,7 +6,7 @@
 from dailyai.services.daily_transport_service import DailyTransportService
 from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
-from dailyai.pipeline.frames import EndStreamQueueFrame, LLMMessagesQueueFrame
+from dailyai.pipeline.frames import EndFrame, LLMMessagesQueueFrame
 from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
 from examples.foundational.support.runner import configure
@@ -56,7 +56,7 @@ async def buffer_to_send_queue():
             frame = await buffer_queue.get()
             await transport.send_queue.put(frame)
             buffer_queue.task_done()
-            if isinstance(frame, EndStreamQueueFrame):
+            if isinstance(frame, EndFrame):
                 break
 
     await asyncio.gather(pipeline_run_task, buffer_to_send_queue())
diff --git a/src/examples/foundational/05-sync-speech-and-image.py b/src/examples/foundational/05-sync-speech-and-image.py
index 0ab448275..ee02f6ed3 100644
--- a/src/examples/foundational/05-sync-speech-and-image.py
+++ b/src/examples/foundational/05-sync-speech-and-image.py
@@ -4,7 +4,7 @@
 import os
 
 from dailyai.pipeline.aggregators import GatedAggregator, LLMFullResponseAggregator, ParallelPipeline, SentenceAggregator
-from dailyai.pipeline.frames import AudioQueueFrame, EndStreamQueueFrame, ImageQueueFrame, LLMMessagesQueueFrame, LLMResponseStartQueueFrame
+from dailyai.pipeline.frames import AudioFrame, EndFrame, ImageFrame, LLMMessagesQueueFrame, LLMResponseStartFrame
 from dailyai.pipeline.pipeline import Pipeline
 from dailyai.services.azure_ai_services import AzureLLMService, AzureImageGenServiceREST, AzureTTSService
 from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
@@ -56,11 +56,11 @@ async def main(room_url):
         ]
 
         await source_queue.put(LLMMessagesQueueFrame(messages))
-        await source_queue.put(EndStreamQueueFrame())
+        await source_queue.put(EndFrame())
 
         gated_aggregator = GatedAggregator(
-            gate_open_fn=lambda frame: isinstance(frame, ImageQueueFrame),
-            gate_close_fn=lambda frame: isinstance(frame, LLMResponseStartQueueFrame),
+            gate_open_fn=lambda frame: isinstance(frame, ImageFrame),
+            gate_close_fn=lambda frame: isinstance(frame, LLMResponseStartFrame),
             start_open=False,
         )
diff --git a/src/examples/foundational/05a-local-sync-speech-and-text.py b/src/examples/foundational/05a-local-sync-speech-and-text.py
index 6a61dbb60..bd1e25181 100644
--- a/src/examples/foundational/05a-local-sync-speech-and-text.py
+++ b/src/examples/foundational/05a-local-sync-speech-and-text.py
@@ -4,7 +4,7 @@
 import tkinter as tk
 import os
 
-from dailyai.pipeline.frames import AudioQueueFrame, ImageQueueFrame
+from dailyai.pipeline.frames import AudioFrame, ImageFrame
 from dailyai.services.azure_ai_services import AzureLLMService
 from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
 from dailyai.services.fal_ai_services import FalImageGenService
@@ -103,8 +103,8 @@ async def show_images():
             if data:
                 await transport.send_queue.put(
                     [
-                        ImageQueueFrame(data["image_url"], data["image"]),
-                        AudioQueueFrame(data["audio"]),
+                        ImageFrame(data["image_url"], data["image"]),
+                        AudioFrame(data["audio"]),
                     ]
                 )
diff --git a/src/examples/foundational/06-listen-and-respond.py b/src/examples/foundational/06-listen-and-respond.py
index c70bc2acb..ce117a9a9 100644
--- a/src/examples/foundational/06-listen-and-respond.py
+++ b/src/examples/foundational/06-listen-and-respond.py
@@ -55,7 +55,7 @@ async def handle_transcriptions():
                 tts
             ],
         )
-        await transport.run_pipeline(pipeline)
+        await transport.run_uninterruptible_pipeline(pipeline)
 
     transport.transcription_settings["extra"]["endpointing"] = True
     transport.transcription_settings["extra"]["punctuate"] = True
diff --git a/src/examples/foundational/06a-image-sync.py b/src/examples/foundational/06a-image-sync.py
index a43e72a3e..71ac74310 100644
--- a/src/examples/foundational/06a-image-sync.py
+++ b/src/examples/foundational/06a-image-sync.py
@@ -8,7 +8,7 @@
 import urllib.parse
 from PIL import Image
 
-from dailyai.pipeline.frames import ImageQueueFrame, QueueFrame
+from dailyai.pipeline.frames import ImageFrame, Frame
 from dailyai.services.daily_transport_service import DailyTransportService
 from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
@@ -27,10 +27,10 @@ def __init__(self, speaking_path: str, waiting_path: str):
         self._waiting_image = Image.open(waiting_path)
         self._waiting_image_bytes = self._waiting_image.tobytes()
 
-    async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
-        yield ImageQueueFrame(None, self._speaking_image_bytes)
+    async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
+        yield ImageFrame(None, self._speaking_image_bytes)
         yield frame
-        yield ImageQueueFrame(None, self._waiting_image_bytes)
+        yield ImageFrame(None, self._waiting_image_bytes)
 
 
 async def main(room_url: str, token):
diff --git a/src/examples/foundational/08-bots-arguing.py b/src/examples/foundational/08-bots-arguing.py
index 49e15bd79..fea687bde 100644
--- a/src/examples/foundational/08-bots-arguing.py
+++ b/src/examples/foundational/08-bots-arguing.py
@@ -6,7 +6,7 @@
 from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
 from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
 from dailyai.services.fal_ai_services import FalImageGenService
-from dailyai.pipeline.frames import AudioQueueFrame, ImageQueueFrame
+from dailyai.pipeline.frames import AudioFrame, ImageFrame
 from examples.foundational.support.runner import configure
@@ -90,8 +90,8 @@ async def argue():
             )
             await transport.send_queue.put(
                 [
-                    ImageQueueFrame(None, image_data1[1]),
-                    AudioQueueFrame(audio1),
+                    ImageFrame(None, image_data1[1]),
+                    AudioFrame(audio1),
                 ]
             )
@@ -102,8 +102,8 @@ async def argue():
             )
             await transport.send_queue.put(
                 [
-                    ImageQueueFrame(None, image_data2[1]),
-                    AudioQueueFrame(audio2),
+                    ImageFrame(None, image_data2[1]),
+                    AudioFrame(audio2),
                 ]
             )
diff --git a/src/examples/foundational/10-wake-word.py b/src/examples/foundational/10-wake-word.py
index 00331795d..8ace28e48 100644
--- a/src/examples/foundational/10-wake-word.py
+++ b/src/examples/foundational/10-wake-word.py
@@ -11,10 +11,10 @@
 from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
 from dailyai.pipeline.aggregators import LLMUserContextAggregator, LLMAssistantContextAggregator
 from dailyai.pipeline.frames import (
-    QueueFrame,
-    TextQueueFrame,
-    ImageQueueFrame,
-    SpriteQueueFrame,
+    Frame,
+    TextFrame,
+    ImageFrame,
+    SpriteFrame,
     TranscriptionQueueFrame,
 )
 from dailyai.services.ai_services import AIService
@@ -45,11 +45,11 @@ sprites[file] = img.tobytes()
 
 # When the bot isn't talking, show a static image of the cat listening
-quiet_frame = ImageQueueFrame("", sprites["sc-listen-1.png"])
+quiet_frame = ImageFrame("", sprites["sc-listen-1.png"])
 
 # When the bot is talking, build an animation from two sprites
 talking_list = [sprites['sc-default.png'], sprites['sc-talk.png']]
 talking = [random.choice(talking_list) for x in range(30)]
-talking_frame = SpriteQueueFrame(images=talking)
+talking_frame = SpriteFrame(images=talking)
 
 # TODO: Support "thinking" as soon as we get a valid transcript, while LLM is processing
 thinking_list = [
@@ -57,14 +57,14 @@
     sprites['sc-think-2.png'],
     sprites['sc-think-3.png'],
     sprites['sc-think-4.png']]
-thinking_frame = SpriteQueueFrame(images=thinking_list)
+thinking_frame = SpriteFrame(images=thinking_list)
 
 
 class TranscriptFilter(AIService):
     def __init__(self, bot_participant_id=None):
         self.bot_participant_id = bot_participant_id
 
-    async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
+    async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
         if isinstance(frame, TranscriptionQueueFrame):
             if frame.participantId != self.bot_participant_id:
                 yield frame
@@ -75,11 +75,11 @@ def __init__(self, names: list[str]):
         self.names = names
         self.sentence = ""
 
-    async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
+    async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
         content: str = ""
 
         # TODO: split up transcription by participant
-        if isinstance(frame, TextQueueFrame):
+        if isinstance(frame, TextFrame):
             content = frame.text
 
         self.sentence += content
@@ -87,7 +87,7 @@ async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, N
         if any(name in self.sentence for name in self.names):
             out = self.sentence
             self.sentence = ""
-            yield TextQueueFrame(out)
+            yield TextFrame(out)
         else:
             out = self.sentence
             self.sentence = ""
@@ -97,7 +97,7 @@ class ImageSyncAggregator(AIService):
     def __init__(self):
         pass
 
-    async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
+    async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
         yield talking_frame
         yield frame
         yield quiet_frame
diff --git a/src/examples/foundational/11-sound-effects.py b/src/examples/foundational/11-sound-effects.py
index a75e5acd6..d8e7b27a3 100644
--- a/src/examples/foundational/11-sound-effects.py
+++ b/src/examples/foundational/11-sound-effects.py
@@ -9,7 +9,7 @@
 from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
 from dailyai.pipeline.aggregators import LLMContextAggregator, LLMUserContextAggregator, LLMAssistantContextAggregator
 from dailyai.services.ai_services import AIService, FrameLogger
-from dailyai.pipeline.frames import QueueFrame, AudioQueueFrame, LLMResponseEndQueueFrame, LLMMessagesQueueFrame
+from dailyai.pipeline.frames import Frame, AudioFrame, LLMResponseEndFrame, LLMMessagesQueueFrame
 from typing import AsyncGenerator
 from examples.foundational.support.runner import configure
@@ -40,9 +40,9 @@ class OutboundSoundEffectWrapper(AIService):
     def __init__(self):
         pass
 
-    async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
-        if isinstance(frame, LLMResponseEndQueueFrame):
-            yield AudioQueueFrame(sounds["ding1.wav"])
+    async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
+        if isinstance(frame, LLMResponseEndFrame):
+            yield AudioFrame(sounds["ding1.wav"])
             # In case anything else up the stack needs it
             yield frame
         else:
@@ -53,9 +53,9 @@ class InboundSoundEffectWrapper(AIService):
     def __init__(self):
         pass
 
-    async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
+    async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
         if isinstance(frame, LLMMessagesQueueFrame):
-            yield AudioQueueFrame(sounds["ding2.wav"])
+            yield AudioFrame(sounds["ding2.wav"])
             # In case anything else up the stack needs it
             yield frame
         else:
@@ -86,7 +86,7 @@ async def main(room_url: str, token):
         @transport.event_handler("on_first_other_participant_joined")
         async def on_first_other_participant_joined(transport):
             await tts.say("Hi, I'm listening!", transport.send_queue)
-            await transport.send_queue.put(AudioQueueFrame(sounds["ding1.wav"]))
+            await transport.send_queue.put(AudioFrame(sounds["ding1.wav"]))
 
         async def handle_transcriptions():
             messages = [
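13a-whisper-local.py below (and 04-utterance-and-speech.py above) rely on the same handoff idiom: a producer finishes with `EndFrame()` and the consumer drains until it sees one. A self-contained sketch of that pattern, with illustrative names:

```python
import asyncio

from dailyai.pipeline.frames import EndFrame, Frame, TextFrame


async def produce(queue: asyncio.Queue):
    await queue.put(TextFrame("hello"))
    # EndFrame tells the consumer there is nothing more to wait for.
    await queue.put(EndFrame())


async def consume(queue: asyncio.Queue):
    while True:
        frame: Frame = await queue.get()
        if isinstance(frame, EndFrame):
            break
        print(frame)


async def main():
    queue: asyncio.Queue = asyncio.Queue()
    await asyncio.gather(produce(queue), consume(queue))


asyncio.run(main())
```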
diff --git a/src/examples/foundational/13a-whisper-local.py b/src/examples/foundational/13a-whisper-local.py
index 4971423c7..a1a9bc66c 100644
--- a/src/examples/foundational/13a-whisper-local.py
+++ b/src/examples/foundational/13a-whisper-local.py
@@ -1,7 +1,7 @@
 import argparse
 import asyncio
 import wave
-from dailyai.pipeline.frames import EndStreamQueueFrame, TranscriptionQueueFrame
+from dailyai.pipeline.frames import EndFrame, TranscriptionQueueFrame
 from dailyai.services.local_transport_service import LocalTransportService
 from dailyai.services.whisper_ai_services import WhisperSTTService
 
@@ -30,7 +30,7 @@ async def handle_transcription():
                 print("got item from queue", item)
                 if isinstance(item, TranscriptionQueueFrame):
                     print(item.text)
-                elif isinstance(item, EndStreamQueueFrame):
+                elif isinstance(item, EndFrame):
                     break
             print("handle_transcription done")
 
@@ -38,7 +38,7 @@ async def handle_speaker():
             await stt.run_to_queue(
                 transcription_output_queue, transport.get_receive_frames()
             )
-            await transcription_output_queue.put(EndStreamQueueFrame())
+            await transcription_output_queue.put(EndFrame())
             print("handle speaker done.")
 
         async def run_until_done():
diff --git a/src/examples/image-gen.py b/src/examples/image-gen.py
index 4819e3b15..3ef5b5a28 100644
--- a/src/examples/image-gen.py
+++ b/src/examples/image-gen.py
@@ -7,7 +7,7 @@
 from dailyai.services.daily_transport_service import DailyTransportService
 from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
-from dailyai.pipeline.frames import QueueFrame, FrameType
+from dailyai.pipeline.frames import Frame, FrameType
 from dailyai.services.fal_ai_services import FalImageGenService
 from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
 
@@ -45,7 +45,7 @@ async def handle_transcriptions():
                     print(f"finder: {finder}")
                     if finder >= 0:
                         async for audio in tts.run_tts(f"Resetting."):
-                            transport.output_queue.put(QueueFrame(FrameType.AUDIO_FRAME, audio))
+                            transport.output_queue.put(Frame(FrameType.AUDIO_FRAME, audio))
                         sentence = ""
                         continue
                 # todo: we could differentiate between transcriptions from different participants
@@ -54,12 +54,12 @@ async def handle_transcriptions():
                     # TODO: Cache this audio
                     phrase = random.choice(["OK.", "Got it.", "Sure.", "You bet.", "Sure thing."])
                     async for audio in tts.run_tts(phrase):
-                        transport.output_queue.put(QueueFrame(FrameType.AUDIO_FRAME, audio))
+                        transport.output_queue.put(Frame(FrameType.AUDIO_FRAME, audio))
                     img_result = img.run_image_gen(sentence, "1024x1024")
                     awaited_img = await asyncio.gather(img_result)
                     transport.output_queue.put(
                         [
-                            QueueFrame(FrameType.IMAGE_FRAME, awaited_img[0][1]),
+                            Frame(FrameType.IMAGE_FRAME, awaited_img[0][1]),
                         ]
                     )
@@ -72,7 +72,7 @@ async def on_participant_joined(transport, participant):
             audio_generator = tts.run_tts(
                 f"Hello, {participant['info']['userName']}! Describe an image and I'll create it. To start over, just say 'start over'.")
             async for audio in audio_generator:
-                transport.output_queue.put(QueueFrame(FrameType.AUDIO_FRAME, audio))
+                transport.output_queue.put(Frame(FrameType.AUDIO_FRAME, audio))
 
         transport.transcription_settings["extra"]["punctuate"] = False
         transport.transcription_settings["extra"]["endpointing"] = False
diff --git a/src/examples/internal/11a-dial-out.py b/src/examples/internal/11a-dial-out.py
index c169d1f1e..e72b90fcb 100644
--- a/src/examples/internal/11a-dial-out.py
+++ b/src/examples/internal/11a-dial-out.py
@@ -7,7 +7,7 @@
 from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
 from dailyai.pipeline.aggregators import LLMContextAggregator
 from dailyai.services.ai_services import AIService, FrameLogger
-from dailyai.pipeline.frames import QueueFrame, AudioQueueFrame, LLMResponseEndQueueFrame, LLMMessagesQueueFrame
+from dailyai.pipeline.frames import Frame, AudioFrame, LLMResponseEndFrame, LLMMessagesQueueFrame
 from typing import AsyncGenerator
 from examples.foundational.support.runner import configure
@@ -34,9 +34,9 @@ class OutboundSoundEffectWrapper(AIService):
     def __init__(self):
         pass
 
-    async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
-        if isinstance(frame, LLMResponseEndQueueFrame):
-            yield AudioQueueFrame(sounds["ding1.wav"])
+    async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
+        if isinstance(frame, LLMResponseEndFrame):
+            yield AudioFrame(sounds["ding1.wav"])
             # In case anything else up the stack needs it
             yield frame
         else:
@@ -47,9 +47,9 @@ class InboundSoundEffectWrapper(AIService):
     def __init__(self):
         pass
 
-    async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
+    async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
         if isinstance(frame, LLMMessagesQueueFrame):
-            yield AudioQueueFrame(sounds["ding2.wav"])
+            yield AudioFrame(sounds["ding2.wav"])
             # In case anything else up the stack needs it
             yield frame
         else:
@@ -79,7 +79,7 @@ async def main(room_url: str, token, phone):
         @transport.event_handler("on_first_other_participant_joined")
         async def on_first_other_participant_joined(transport):
             await tts.say("Hi, I'm listening!", transport.send_queue)
-            await transport.send_queue.put(AudioQueueFrame(sounds["ding1.wav"]))
+            await transport.send_queue.put(AudioFrame(sounds["ding1.wav"]))
 
         async def handle_transcriptions():
             messages = [