From b8b35db89c8547740b9bfc16e860184175fb9060 Mon Sep 17 00:00:00 2001 From: Moishe Lettvin Date: Fri, 15 Mar 2024 11:04:22 -0400 Subject: [PATCH 1/2] Remove run_to_queue and run from AIService class --- pyproject.toml | 2 +- src/dailyai/pipeline/merge_pipeline.py | 21 ++++++ src/dailyai/pipeline/pipeline.py | 74 ++++++++++--------- src/dailyai/services/ai_services.py | 51 +------------ .../services/base_transport_service.py | 28 +++++-- .../services/daily_transport_service.py | 3 +- src/dailyai/tests/test_ai_services.py | 26 +------ src/examples/foundational/01-say-one-thing.py | 21 ++---- .../foundational/02-llm-say-one-thing.py | 16 ++-- src/examples/foundational/03-still-frame.py | 20 ++--- .../foundational/04-utterance-and-speech.py | 58 +++++---------- .../foundational/05-sync-speech-and-image.py | 66 +++++++---------- .../foundational/06-listen-and-respond.py | 52 ++++++------- src/examples/foundational/07-interruptible.py | 2 +- src/examples/starter-apps/storybot.py | 2 +- 15 files changed, 182 insertions(+), 260 deletions(-) create mode 100644 src/dailyai/pipeline/merge_pipeline.py diff --git a/pyproject.toml b/pyproject.toml index c4a6cf81a..00958d83c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "dailyai" -version = "0.0.1" +version = "0.0.3" description = "An open source framework for real-time, multi-modal, conversational AI applications" license = { text = "BSD 2-Clause License" } readme = "README.md" diff --git a/src/dailyai/pipeline/merge_pipeline.py b/src/dailyai/pipeline/merge_pipeline.py new file mode 100644 index 000000000..51178aead --- /dev/null +++ b/src/dailyai/pipeline/merge_pipeline.py @@ -0,0 +1,21 @@ +from typing import List +from dailyai.pipeline.frames import EndFrame, EndPipeFrame +from dailyai.pipeline.pipeline import Pipeline + + +class SequentialMergePipeline(Pipeline): + """This class merges the sink queues from a list of pipelines. Frames from + each pipeline's sink are merged in the order of pipelines in the list.""" + def __init__(self, pipelines:List[Pipeline]): + super().__init__([]) + self.pipelines = pipelines + + async def run_pipeline(self): + for pipeline in self.pipelines: + while True: + frame = await pipeline.sink.get() + if isinstance(frame, EndFrame) or isinstance(frame, EndPipeFrame): + break + await self.sink.put(frame) + + await self.sink.put(EndFrame()) diff --git a/src/dailyai/pipeline/pipeline.py b/src/dailyai/pipeline/pipeline.py index be15330ad..b62458f3a 100644 --- a/src/dailyai/pipeline/pipeline.py +++ b/src/dailyai/pipeline/pipeline.py @@ -1,5 +1,5 @@ import asyncio -from typing import AsyncGenerator, List +from typing import AsyncGenerator, AsyncIterable, Iterable, List from dailyai.pipeline.frame_processor import FrameProcessor from dailyai.pipeline.frames import EndPipeFrame, EndFrame, Frame @@ -17,17 +17,17 @@ def __init__( self, processors: List[FrameProcessor], source: asyncio.Queue | None = None, - sink: asyncio.Queue[Frame] | None = None, + sink: asyncio.Queue[Frame] | None = None ): - """Create a new pipeline. By default neither the source nor sink - queues are set, so you'll need to pass them to this constructor or - call set_source and set_sink before using the pipeline. Note that - the transport's run_*_pipeline methods will set the source and sink - queues on the pipeline for you. + """Create a new pipeline. By default we create the sink and source queues + if they're not provided, but these can be overridden to point to other + queues. 
If this pipeline is run by a transport, its sink and source queues + will be overridden. """ - self.processors = processors - self.source: asyncio.Queue[Frame] | None = source - self.sink: asyncio.Queue[Frame] | None = sink + self.processors: List[FrameProcessor] = processors + + self.source: asyncio.Queue[Frame] = source or asyncio.Queue() + self.sink: asyncio.Queue[Frame] = sink or asyncio.Queue() def set_source(self, source: asyncio.Queue[Frame]): """Set the source queue for this pipeline. Frames from this queue @@ -44,21 +44,24 @@ async def get_next_source_frame(self) -> AsyncGenerator[Frame, None]: """Convenience function to get the next frame from the source queue. This lets us consistently have an AsyncGenerator yield frames, from either the source queue or a frame_processor.""" - if self.source is None: - raise ValueError("Source queue not set") + yield await self.source.get() - async def run_pipeline_recursively( - self, initial_frame: Frame, processors: List[FrameProcessor] - ) -> AsyncGenerator[Frame, None]: - if processors: - async for frame in processors[0].process_frame(initial_frame): - async for final_frame in self.run_pipeline_recursively( - frame, processors[1:] - ): - yield final_frame + async def queue_frames( + self, + frames: Iterable[Frame] | AsyncIterable[Frame], + ) -> None: + """Insert frames directly into a pipeline. This is typically used inside a transport + participant_joined callback to prompt a bot to start a conversation, for example.""" + + if isinstance(frames, AsyncIterable): + async for frame in frames: + await self.source.put(frame) + elif isinstance(frames, Iterable): + for frame in frames: + await self.source.put(frame) else: - yield initial_frame + raise Exception("Frames must be an iterable or async iterable") async def run_pipeline(self): """Run the pipeline. Take each frame from the source queue, pass it to @@ -73,13 +76,10 @@ async def run_pipeline(self): if it's not the last frame yielded by the last frame_processor in the pipeline.. """ - if self.source is None or self.sink is None: - raise ValueError("Source or sink queue not set") - try: while True: initial_frame = await self.source.get() - async for frame in self.run_pipeline_recursively( + async for frame in self._run_pipeline_recursively( initial_frame, self.processors ): await self.sink.put(frame) @@ -94,11 +94,17 @@ async def run_pipeline(self): await processor.interrupted() pass - async def queue_frames(self, frames: Frame | List[Frame]): - """Insert frames directly into a pipeline. This is typically used inside a transport - participant_joined callback to prompt a bot to start a conversation, for example. 
- """ - if not isinstance(frames, List): - frames = [frames] - for f in frames: - await self.source.put(f) + async def _run_pipeline_recursively( + self, initial_frame: Frame, processors: List[FrameProcessor] + ) -> AsyncGenerator[Frame, None]: + """Internal function to add frames to the pipeline as they're yielded + by each processor.""" + if processors: + async for frame in processors[0].process_frame(initial_frame): + async for final_frame in self._run_pipeline_recursively( + frame, processors[1:] + ): + yield final_frame + else: + yield initial_frame + diff --git a/src/dailyai/services/ai_services.py b/src/dailyai/services/ai_services.py index 81b43f597..7300a6c1c 100644 --- a/src/dailyai/services/ai_services.py +++ b/src/dailyai/services/ai_services.py @@ -8,6 +8,7 @@ from dailyai.pipeline.frames import ( AudioFrame, EndFrame, + EndPipeFrame, ImageFrame, LLMMessagesQueueFrame, LLMResponseEndFrame, @@ -20,53 +21,13 @@ ) from abc import abstractmethod -from typing import AsyncGenerator, AsyncIterable, BinaryIO, Iterable, List +from typing import AsyncGenerator, BinaryIO class AIService(FrameProcessor): - def __init__(self): self.logger = logging.getLogger("dailyai") - def stop(self): - pass - - async def run_to_queue( - self, queue: asyncio.Queue, frames, add_end_of_stream=False - ) -> None: - async for frame in self.run(frames): - await queue.put(frame) - - if add_end_of_stream: - await queue.put(EndFrame()) - - async def run( - self, - frames: Iterable[Frame] | AsyncIterable[Frame] | asyncio.Queue[Frame], - ) -> AsyncGenerator[Frame, None]: - try: - if isinstance(frames, AsyncIterable): - async for frame in frames: - async for output_frame in self.process_frame(frame): - yield output_frame - elif isinstance(frames, Iterable): - for frame in frames: - async for output_frame in self.process_frame(frame): - yield output_frame - elif isinstance(frames, asyncio.Queue): - while True: - frame = await frames.get() - async for output_frame in self.process_frame(frame): - yield output_frame - if isinstance(frame, EndFrame): - break - else: - raise Exception("Frames must be an iterable or async iterable") - except Exception as e: - self.logger.error("Exception occurred while running AI service", e) - raise e - - class LLMService(AIService): """This class is a no-op but serves as a base class for LLM services.""" @@ -92,7 +53,7 @@ async def run_tts(self, text) -> AsyncGenerator[bytes, None]: yield bytes() async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]: - if isinstance(frame, EndFrame): + if isinstance(frame, EndFrame) or isinstance(frame, EndPipeFrame): if self.current_sentence: async for audio_chunk in self.run_tts(self.current_sentence): yield AudioFrame(audio_chunk) @@ -118,12 +79,6 @@ async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]: # note we pass along the text frame *after* the audio, so the text frame is completed after the audio is processed. 
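             # Downstream processors can rely on this ordering: by the time a
             # sentence's TextFrame arrives, all of its AudioFrames have
             # already been yielded.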
yield TextFrame(text) - # Convenience function to send the audio for a sentence to the given queue - async def say(self, sentence, queue: asyncio.Queue): - await self.run_to_queue( - queue, [LLMResponseStartFrame(), TextFrame(sentence), LLMResponseEndFrame()] - ) - class ImageGenService(AIService): def __init__(self, image_size, **kwargs): diff --git a/src/dailyai/services/base_transport_service.py b/src/dailyai/services/base_transport_service.py index 135698ff1..c2440c586 100644 --- a/src/dailyai/services/base_transport_service.py +++ b/src/dailyai/services/base_transport_service.py @@ -20,11 +20,12 @@ PipelineStartedFrame, SpriteFrame, StartFrame, - TranscriptionQueueFrame, + TextFrame, UserStartedSpeakingFrame, UserStoppedSpeakingFrame, ) from dailyai.pipeline.pipeline import Pipeline +from dailyai.services.ai_services import TTSService torch.set_num_threads(1) @@ -125,7 +126,7 @@ def __init__( self._logger: logging.Logger = logging.getLogger() - async def run(self): + async def run(self, pipeline:Pipeline | None=None, override_pipeline_source_queue=True): self._prerun() async_output_queue_marshal_task = asyncio.create_task(self._marshal_frames()) @@ -148,6 +149,13 @@ async def run(self): self._vad_thread = threading.Thread(target=self._vad, daemon=True) self._vad_thread.start() + pipeline_task = None + if pipeline: + pipeline.set_sink(self.send_queue) + if override_pipeline_source_queue: + pipeline.set_source(self.receive_queue) + pipeline_task = asyncio.create_task(pipeline.run_pipeline()) + try: while time.time() < self._expiration and not self._stop_threads.is_set(): await asyncio.sleep(1) @@ -160,9 +168,12 @@ async def run(self): self._stop_threads.set() + if pipeline_task: + pipeline_task.cancel() + await self.send_queue.put(EndFrame()) + await async_output_queue_marshal_task - await self.send_queue.join() self._frame_consumer_thread.join() if self._speaker_enabled: @@ -171,11 +182,6 @@ async def run(self): if self._vad_enabled: self._vad_thread.join() - async def run_uninterruptible_pipeline(self, pipeline: Pipeline): - pipeline.set_sink(self.send_queue) - pipeline.set_source(self.receive_queue) - await pipeline.run_pipeline() - async def run_interruptible_pipeline( self, pipeline: Pipeline, @@ -232,6 +238,11 @@ async def post_process(post_processor: FrameProcessor): await asyncio.gather(pipeline_task, post_process_task) + async def say(self, text:str, tts:TTSService): + """Say a phrase. Use with caution; this bypasses any running pipelines.""" + async for frame in tts.process_frame(TextFrame(text)): + await self.send_queue.put(frame) + def _post_run(self): # Note that this function must be idempotent! It can be called multiple times # if, for example, a keyboard interrupt occurs. 
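
With run() and run_to_queue() gone from AIService, every example now builds a
Pipeline and hands it to transport.run(). A minimal sketch of the parallel
speech pattern from 04-utterance-and-speech.py; the helper name and the joke
prompt are illustrative, and transport, llm, azure_tts and elevenlabs_tts are
assumed to be constructed as in that example:

    import asyncio

    from dailyai.pipeline.frames import EndPipeFrame, LLMMessagesQueueFrame, TextFrame
    from dailyai.pipeline.merge_pipeline import SequentialMergePipeline
    from dailyai.pipeline.pipeline import Pipeline

    async def speak_intro_then_joke(transport, llm, azure_tts, elevenlabs_tts):
        # Illustrative prompt; any LLMMessagesQueueFrame payload works here.
        messages = [{"role": "system", "content": "Tell a one-line joke about llamas."}]

        # Each sub-pipeline gets its own default source and sink queues, and
        # is closed with an EndPipeFrame so only that pipeline's run loop stops.
        intro = Pipeline([azure_tts])
        await intro.queue_frames(
            [TextFrame("My friend the LLM will tell a joke."), EndPipeFrame()]
        )

        joke = Pipeline([llm, elevenlabs_tts])
        await joke.queue_frames([LLMMessagesQueueFrame(messages), EndPipeFrame()])

        # The merge pipeline drains intro's sink first, then joke's, dropping
        # the per-pipeline EndPipeFrames, and finally emits a single EndFrame
        # that shuts the transport down.
        merged = SequentialMergePipeline([intro, joke])

        await asyncio.gather(
            transport.run(merged),
            intro.run_pipeline(),
            joke.run_pipeline(),
        )

Because transport.run() installs its send queue as the merged pipeline's sink,
the intro line can be spoken while the LLM is still generating the joke.
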
@@ -399,6 +410,7 @@ def _frame_consumer(self): for frame in frames: if isinstance(frame, EndFrame): self._logger.info("Stopping frame consumer thread") + self._stop_threads.set() self._threadsafe_send_queue.task_done() if self._loop: asyncio.run_coroutine_threadsafe( diff --git a/src/dailyai/services/daily_transport_service.py b/src/dailyai/services/daily_transport_service.py index d29695ef0..16c8519ba 100644 --- a/src/dailyai/services/daily_transport_service.py +++ b/src/dailyai/services/daily_transport_service.py @@ -219,7 +219,8 @@ def on_first_other_participant_joined(self): pass def call_joined(self, join_data, client_error): - self._logger.info(f"Call_joined: {join_data}, {client_error}") + #self._logger.info(f"Call_joined: {join_data}, {client_error}") + pass def dialout(self, number): self.client.start_dialout({"phoneNumber": number}) diff --git a/src/dailyai/tests/test_ai_services.py b/src/dailyai/tests/test_ai_services.py index 6e007a85a..bb185c3f2 100644 --- a/src/dailyai/tests/test_ai_services.py +++ b/src/dailyai/tests/test_ai_services.py @@ -12,7 +12,7 @@ async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]: class TestBaseAIService(unittest.IsolatedAsyncioTestCase): - async def test_async_input(self): + async def test_simple_processing(self): service = SimpleAIService() input_frames = [ @@ -20,28 +20,10 @@ async def test_async_input(self): EndFrame() ] - async def iterate_frames() -> AsyncGenerator[Frame, None]: - for frame in input_frames: - yield frame - - output_frames = [] - async for frame in service.run(iterate_frames()): - output_frames.append(frame) - - self.assertEqual(input_frames, output_frames) - - async def test_nonasync_input(self): - service = SimpleAIService() - - input_frames = [TextFrame("hello"), EndFrame()] - - def iterate_frames() -> Generator[Frame, None, None]: - for frame in input_frames: - yield frame - output_frames = [] - async for frame in service.run(iterate_frames()): - output_frames.append(frame) + for input_frame in input_frames: + async for output_frame in service.process_frame(input_frame): + output_frames.append(output_frame) self.assertEqual(input_frames, output_frames) diff --git a/src/examples/foundational/01-say-one-thing.py b/src/examples/foundational/01-say-one-thing.py index 45896269c..6a0765095 100644 --- a/src/examples/foundational/01-say-one-thing.py +++ b/src/examples/foundational/01-say-one-thing.py @@ -2,6 +2,8 @@ import aiohttp import logging import os +from dailyai.pipeline.frames import EndFrame, TextFrame +from dailyai.pipeline.pipeline import Pipeline from dailyai.services.daily_transport_service import DailyTransportService from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService @@ -28,19 +30,7 @@ async def main(room_url): voice_id=os.getenv("ELEVENLABS_VOICE_ID"), ) - other_joined_event = asyncio.Event() - participant_name = '' - - async def say_hello(): - nonlocal tts - nonlocal participant_name - - await other_joined_event.wait() - await tts.say( - "Hello there, " + participant_name + "!", - transport.send_queue, - ) - await transport.stop_when_done() + pipeline = Pipeline([tts]) # Register an event handler so we can play the audio when the participant joins. 
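     # The handler queues the greeting plus an EndFrame; when the EndFrame
     # reaches the transport's frame consumer, the transport shuts itself down.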
@transport.event_handler("on_participant_joined") @@ -48,11 +38,10 @@ async def on_participant_joined(transport, participant): if participant["info"]["isLocal"]: return - nonlocal participant_name participant_name = participant["info"]["userName"] or '' - other_joined_event.set() + await pipeline.queue_frames([TextFrame("Hello there, " + participant_name + "!"), EndFrame()]) - await asyncio.gather(transport.run(), say_hello()) + await transport.run(pipeline) del tts diff --git a/src/examples/foundational/02-llm-say-one-thing.py b/src/examples/foundational/02-llm-say-one-thing.py index 370ab7d77..0737703e8 100644 --- a/src/examples/foundational/02-llm-say-one-thing.py +++ b/src/examples/foundational/02-llm-say-one-thing.py @@ -4,7 +4,8 @@ import aiohttp -from dailyai.pipeline.frames import LLMMessagesQueueFrame +from dailyai.pipeline.frames import EndFrame, LLMMessagesQueueFrame +from dailyai.pipeline.pipeline import Pipeline from dailyai.services.daily_transport_service import DailyTransportService from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService from dailyai.services.open_ai_services import OpenAILLMService @@ -42,20 +43,13 @@ async def main(room_url): } ] - other_joined_event = asyncio.Event() - async def speak_from_llm(): - await other_joined_event.wait() - await tts.run_to_queue( - transport.send_queue, - llm.run([LLMMessagesQueueFrame(messages)]) - ) - await transport.stop_when_done() + pipeline= Pipeline([llm, tts]) @transport.event_handler("on_first_other_participant_joined") async def on_first_other_participant_joined(transport): - other_joined_event.set() + await pipeline.queue_frames([LLMMessagesQueueFrame(messages), EndFrame()]) - await asyncio.gather(transport.run(), speak_from_llm()) + await transport.run(pipeline) if __name__ == "__main__": diff --git a/src/examples/foundational/03-still-frame.py b/src/examples/foundational/03-still-frame.py index 11274ea66..d5b0badb5 100644 --- a/src/examples/foundational/03-still-frame.py +++ b/src/examples/foundational/03-still-frame.py @@ -3,7 +3,8 @@ import logging import os -from dailyai.pipeline.frames import TextFrame +from dailyai.pipeline.frames import EndFrame, TextFrame +from dailyai.pipeline.pipeline import Pipeline from dailyai.services.daily_transport_service import DailyTransportService from dailyai.services.fal_ai_services import FalImageGenService @@ -33,19 +34,18 @@ async def main(room_url): key_secret=os.getenv("FAL_KEY_SECRET"), ) - other_joined_event = asyncio.Event() - - async def show_image(): - await other_joined_event.wait() - await imagegen.run_to_queue( - transport.send_queue, [TextFrame("a cat in the style of picasso")] - ) + pipeline = Pipeline([imagegen]) @transport.event_handler("on_first_other_participant_joined") async def on_first_other_participant_joined(transport): - other_joined_event.set() + # Note that we do not put an EndFrame() item in the pipeline for this demo. + # This means that the bot will stay in the channel until it times out. + # An EndFrame() in the pipeline would cause the transport to shut down. 
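+        # Without one, the transport exits on its own when it reaches its
+        # configured expiration time.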
+ await pipeline.queue_frames( + [TextFrame("a cat in the style of picasso")] + ) - await asyncio.gather(transport.run(), show_image()) + await transport.run(pipeline) if __name__ == "__main__": diff --git a/src/examples/foundational/04-utterance-and-speech.py b/src/examples/foundational/04-utterance-and-speech.py index 3bfeb5972..f056005d6 100644 --- a/src/examples/foundational/04-utterance-and-speech.py +++ b/src/examples/foundational/04-utterance-and-speech.py @@ -3,12 +3,13 @@ import os import aiohttp +from dailyai.pipeline.merge_pipeline import SequentialMergePipeline from dailyai.pipeline.pipeline import Pipeline from dailyai.services.daily_transport_service import DailyTransportService from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService from dailyai.services.deepgram_ai_services import DeepgramTTSService -from dailyai.pipeline.frames import EndFrame, LLMMessagesQueueFrame +from dailyai.pipeline.frames import EndFrame, EndPipeFrame, LLMMessagesQueueFrame, TextFrame from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService from examples.support.runner import configure @@ -53,49 +54,24 @@ async def main(room_url: str): # Start a task to run the LLM to create a joke, and convert the LLM output to audio frames. This task # will run in parallel with generating and speaking the audio for static text, so there's no delay to # speak the LLM response. - buffer_queue = asyncio.Queue() - source_queue = asyncio.Queue() - pipeline = Pipeline( - source=source_queue, sink=buffer_queue, processors=[llm, elevenlabs_tts] + llm_pipeline = Pipeline([llm, elevenlabs_tts]) + await llm_pipeline.queue_frames([LLMMessagesQueueFrame(messages), EndPipeFrame()]) + + simple_tts_pipeline = Pipeline([azure_tts]) + await simple_tts_pipeline.queue_frames( + [ + TextFrame("My friend the LLM is going to tell a joke about llamas"), + EndPipeFrame(), + ] ) - await source_queue.put(LLMMessagesQueueFrame(messages)) - await source_queue.put(EndFrame()) - pipeline_run_task = pipeline.run_pipeline() - other_participant_joined = asyncio.Event() + merge_pipeline = SequentialMergePipeline([simple_tts_pipeline, llm_pipeline]) - @transport.event_handler("on_first_other_participant_joined") - async def on_first_other_participant_joined(transport): - other_participant_joined.set() - - async def say_something(): - await other_participant_joined.wait() - - await azure_tts.say( - "My friend the LLM is now going to tell a joke about llamas.", - transport.send_queue, - ) - - # khk: deepgram_tts.say() doesn't seem to put bytes in the transport - # queue. I get a debug log line that indicates we're set up okay, but - # no further log lines or audio bytes. debug this later: - # 20 2024-03-10 13:24:46,235 Running deepgram tts for My friend the LLM is now going to tell a joke about llamas. 
- # await deepgram_tts.say( - # "My friend the LLM is now going to tell a joke about llamas.", - # transport.send_queue, - # ) - - async def buffer_to_send_queue(): - while True: - frame = await buffer_queue.get() - await transport.send_queue.put(frame) - buffer_queue.task_done() - if isinstance(frame, EndFrame): - break - - await asyncio.gather(pipeline_run_task, buffer_to_send_queue()) - - await asyncio.gather(transport.run(), say_something()) + await asyncio.gather( + transport.run(merge_pipeline), + simple_tts_pipeline.run_pipeline(), + llm_pipeline.run_pipeline(), + ) if __name__ == "__main__": diff --git a/src/examples/foundational/05-sync-speech-and-image.py b/src/examples/foundational/05-sync-speech-and-image.py index 2c2dbdee6..85b6311c8 100644 --- a/src/examples/foundational/05-sync-speech-and-image.py +++ b/src/examples/foundational/05-sync-speech-and-image.py @@ -89,33 +89,6 @@ async def main(room_url): key_secret=os.getenv("FAL_KEY_SECRET"), ) - source_queue = asyncio.Queue() - - for month in [ - "January", - "February", - "March", - "April", - "May", - "June", - "July", - "August", - "September", - "October", - "November", - "December", - ]: - messages = [ - { - "role": "system", - "content": f"Describe a nature photograph suitable for use in a calendar, for the month of {month}. Include only the image description with no preamble. Limit the description to one sentence, please.", - } - ] - await source_queue.put(MonthFrame(month)) - await source_queue.put(LLMMessagesQueueFrame(messages)) - - await source_queue.put(EndFrame()) - gated_aggregator = GatedAggregator( gate_open_fn=lambda frame: isinstance(frame, ImageFrame), gate_close_fn=lambda frame: isinstance(frame, LLMResponseStartFrame), @@ -127,8 +100,6 @@ async def main(room_url): llm_full_response_aggregator = LLMFullResponseAggregator() pipeline = Pipeline( - source=source_queue, - sink=transport.send_queue, processors=[ llm, sentence_aggregator, @@ -138,20 +109,35 @@ async def main(room_url): gated_aggregator, ], ) - pipeline_task = pipeline.run_pipeline() - - other_joined = asyncio.Event() - @transport.event_handler("on_first_other_participant_joined") - async def on_first_other_participant_joined(transport): - other_joined.set() + frames = [] + for month in [ + "January", + "February", + "March", + "April", + "May", + "June", + "July", + "August", + "September", + "October", + "November", + "December", + ]: + messages = [ + { + "role": "system", + "content": f"Describe a nature photograph suitable for use in a calendar, for the month of {month}. Include only the image description with no preamble. 
Limit the description to one sentence, please.", + } + ] + frames.append(MonthFrame(month)) + frames.append(LLMMessagesQueueFrame(messages)) - async def show_calendar(): - await other_joined.wait() - await pipeline_task - await transport.stop_when_done() + frames.append(EndFrame()) + await pipeline.queue_frames(frames) - await asyncio.gather(transport.run(), show_calendar()) + await transport.run(pipeline, override_pipeline_source_queue=False) if __name__ == "__main__": diff --git a/src/examples/foundational/06-listen-and-respond.py b/src/examples/foundational/06-listen-and-respond.py index 9c3099712..a5c3ce13c 100644 --- a/src/examples/foundational/06-listen-and-respond.py +++ b/src/examples/foundational/06-listen-and-respond.py @@ -2,6 +2,7 @@ import aiohttp import logging import os +from dailyai.pipeline.frames import LLMMessagesQueueFrame from dailyai.pipeline.pipeline import Pipeline from dailyai.services.daily_transport_service import DailyTransportService @@ -44,38 +45,37 @@ async def main(room_url: str, token): ) fl = FrameLogger("Inner") fl2 = FrameLogger("Outer") + messages = [ + { + "role": "system", + "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio. Respond to what the user said in a creative and helpful way.", + }, + ] + + tma_in = LLMUserContextAggregator(messages, transport._my_participant_id) + tma_out = LLMAssistantContextAggregator( + messages, transport._my_participant_id + ) + pipeline = Pipeline( + processors=[ + fl, + tma_in, + llm, + fl2, + tts, + tma_out, + ], + ) @transport.event_handler("on_first_other_participant_joined") async def on_first_other_participant_joined(transport): - await tts.say("Hi, I'm listening!", transport.send_queue) - - async def have_conversation(): - messages = [ - { - "role": "system", - "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio. Respond to what the user said in a creative and helpful way.", - }, - ] - - tma_in = LLMUserContextAggregator(messages, transport._my_participant_id) - tma_out = LLMAssistantContextAggregator( - messages, transport._my_participant_id - ) - pipeline = Pipeline( - processors=[ - fl, - tma_in, - llm, - fl2, - tts, - tma_out, - ], - ) - await transport.run_uninterruptible_pipeline(pipeline) + # Kick off the conversation. 
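+        # Appending a system turn and queueing the full message list makes the
+        # LLM speak first; tma_out then folds the assistant's reply back into
+        # messages, so the conversation context stays complete.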
+ messages.append({"role": "system", "content": "Please introduce yourself to the user."}) + await pipeline.queue_frames([LLMMessagesQueueFrame(messages)]) transport.transcription_settings["extra"]["endpointing"] = True transport.transcription_settings["extra"]["punctuate"] = True - await asyncio.gather(transport.run(), have_conversation()) + await transport.run(pipeline) if __name__ == "__main__": diff --git a/src/examples/foundational/07-interruptible.py b/src/examples/foundational/07-interruptible.py index 9277a98d3..7fb3759f9 100644 --- a/src/examples/foundational/07-interruptible.py +++ b/src/examples/foundational/07-interruptible.py @@ -49,7 +49,7 @@ async def main(room_url: str, token): @transport.event_handler("on_first_other_participant_joined") async def on_first_other_participant_joined(transport): - await tts.say("Hi, I'm listening!", transport.send_queue) + await transport.say("Hi, I'm listening!", tts) async def run_conversation(): messages = [ diff --git a/src/examples/starter-apps/storybot.py b/src/examples/starter-apps/storybot.py index d636d721d..4a7bcedb2 100644 --- a/src/examples/starter-apps/storybot.py +++ b/src/examples/starter-apps/storybot.py @@ -273,7 +273,7 @@ async def storytime(): lra, ] ) - await transport.run_uninterruptible_pipeline( + await transport.run_pipeline( pipeline, ) From 18bf26de14e1ef57608b51e49fc449d124859ac3 Mon Sep 17 00:00:00 2001 From: Moishe Lettvin Date: Fri, 15 Mar 2024 13:39:33 -0400 Subject: [PATCH 2/2] Update apps --- src/dailyai/services/azure_ai_services.py | 12 ++++--- .../services/base_transport_service.py | 13 +++++--- .../services/openai_api_llm_service.py | 3 ++ src/examples/starter-apps/chatbot.py | 7 +--- src/examples/starter-apps/storybot.py | 33 ++++++++++--------- 5 files changed, 38 insertions(+), 30 deletions(-) diff --git a/src/dailyai/services/azure_ai_services.py b/src/dailyai/services/azure_ai_services.py index d557579ce..9ffa52a46 100644 --- a/src/dailyai/services/azure_ai_services.py +++ b/src/dailyai/services/azure_ai_services.py @@ -64,13 +64,17 @@ async def run_tts(self, sentence) -> AsyncGenerator[bytes, None]: class AzureLLMService(BaseOpenAILLMService): def __init__(self, *, api_key, endpoint, api_version="2023-12-01-preview", model): - super().__init__(model) + self._endpoint = endpoint + self._api_version = api_version + + super().__init__(api_key=api_key, model=model) + self._model: str = model - # This overrides the client created by the super class init + def create_client(self, api_key=None, base_url=None): self._client = AsyncAzureOpenAI( api_key=api_key, - azure_endpoint=endpoint, - api_version=api_version, + azure_endpoint=self._endpoint, + api_version=self._api_version, ) diff --git a/src/dailyai/services/base_transport_service.py b/src/dailyai/services/base_transport_service.py index c2440c586..a24a5dc48 100644 --- a/src/dailyai/services/base_transport_service.py +++ b/src/dailyai/services/base_transport_service.py @@ -151,10 +151,9 @@ async def run(self, pipeline:Pipeline | None=None, override_pipeline_source_queu pipeline_task = None if pipeline: - pipeline.set_sink(self.send_queue) - if override_pipeline_source_queue: - pipeline.set_source(self.receive_queue) - pipeline_task = asyncio.create_task(pipeline.run_pipeline()) + pipeline_task = asyncio.create_task( + self.run_pipeline(pipeline, override_pipeline_source_queue) + ) try: while time.time() < self._expiration and not self._stop_threads.is_set(): @@ -182,6 +181,12 @@ async def run(self, pipeline:Pipeline | None=None, 
override_pipeline_source_queu if self._vad_enabled: self._vad_thread.join() + async def run_pipeline(self, pipeline:Pipeline, override_pipeline_source_queue=True): + pipeline.set_sink(self.send_queue) + if override_pipeline_source_queue: + pipeline.set_source(self.receive_queue) + await pipeline.run_pipeline() + async def run_interruptible_pipeline( self, pipeline: Pipeline, diff --git a/src/dailyai/services/openai_api_llm_service.py b/src/dailyai/services/openai_api_llm_service.py index 02331947f..681582c15 100644 --- a/src/dailyai/services/openai_api_llm_service.py +++ b/src/dailyai/services/openai_api_llm_service.py @@ -35,6 +35,9 @@ class BaseOpenAILLMService(LLMService): def __init__(self, model: str, api_key=None, base_url=None): super().__init__() self._model: str = model + self.create_client(api_key=api_key, base_url=base_url) + + def create_client(self, api_key=None, base_url=None): self._client = AsyncOpenAI(api_key=api_key, base_url=base_url) async def _stream_chat_completions( diff --git a/src/examples/starter-apps/chatbot.py b/src/examples/starter-apps/chatbot.py index 60dc1e6be..a78b821ce 100644 --- a/src/examples/starter-apps/chatbot.py +++ b/src/examples/starter-apps/chatbot.py @@ -6,9 +6,7 @@ from typing import AsyncGenerator from dailyai.pipeline.aggregators import ( - LLMAssistantContextAggregator, LLMResponseAggregator, - LLMUserContextAggregator, UserResponseAggregator, ) from dailyai.pipeline.frames import ( @@ -16,15 +14,12 @@ SpriteFrame, Frame, LLMResponseEndFrame, - LLMResponseStartFrame, LLMMessagesQueueFrame, - UserStartedSpeakingFrame, AudioFrame, PipelineStartedFrame, ) from dailyai.services.ai_services import AIService from dailyai.pipeline.pipeline import Pipeline -from dailyai.services.ai_services import FrameLogger from dailyai.services.daily_transport_service import DailyTransportService from dailyai.services.open_ai_services import OpenAILLMService from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService @@ -130,7 +125,7 @@ async def main(room_url: str, token): @transport.event_handler("on_first_other_participant_joined") async def on_first_other_participant_joined(transport): print(f"!!! in here, pipeline.source is {pipeline.source}") - await pipeline.queue_frames(LLMMessagesQueueFrame(messages)) + await pipeline.queue_frames([LLMMessagesQueueFrame(messages)]) async def run_conversation(): diff --git a/src/examples/starter-apps/storybot.py b/src/examples/starter-apps/storybot.py index 4a7bcedb2..f9fe3797d 100644 --- a/src/examples/starter-apps/storybot.py +++ b/src/examples/starter-apps/storybot.py @@ -27,6 +27,7 @@ ) from examples.support.runner import configure from dailyai.pipeline.frames import ( + EndPipeFrame, LLMMessagesQueueFrame, TranscriptionQueueFrame, Frame, @@ -187,10 +188,6 @@ async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]: async def main(room_url: str, token): async with aiohttp.ClientSession() as session: - global transport - global llm - global tts - messages = [ { "role": "system", @@ -235,8 +232,15 @@ async def main(room_url: str, token): vad_stop_s=1.5, ) + start_story_event = asyncio.Event() + @transport.event_handler("on_first_other_participant_joined") async def on_first_other_participant_joined(transport): + start_story_event.set() + + async def storytime(): + await start_story_event.wait() + # We're being a bit tricky here by using a special system prompt to # ask the user for a story topic. After their intial response, we'll # use a different system prompt to create story pages. 
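             # CUE_USER_TURN carries that topic-gathering system prompt; the
             # story-page prompt takes over on the next LLM turn.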
@@ -247,20 +251,17 @@ async def on_first_other_participant_joined(transport): } ] lca = LLMAssistantContextAggregator(messages) - await tts.run_to_queue( - transport.send_queue, - lca.run( - llm.run( - [ - ImageFrame(None, images["grandma-listening.png"]), - LLMMessagesQueueFrame(intro_messages), - AudioFrame(sounds["listening.wav"]), - ] - ), - ), + local_pipeline = Pipeline([llm, lca, tts], sink=transport.send_queue) + await local_pipeline.queue_frames( + [ + ImageFrame(None, images["grandma-listening.png"]), + LLMMessagesQueueFrame(intro_messages), + AudioFrame(sounds["listening.wav"]), + EndPipeFrame(), + ] ) + await local_pipeline.run_pipeline() - async def storytime(): fl = FrameLogger("### After Image Generation") pipeline = Pipeline( processors=[