diff --git a/src/dailyai/pipeline/pipeline.py b/src/dailyai/pipeline/pipeline.py index 8e6c13d2b..e755377ef 100644 --- a/src/dailyai/pipeline/pipeline.py +++ b/src/dailyai/pipeline/pipeline.py @@ -48,6 +48,16 @@ async def get_next_source_frame(self) -> AsyncGenerator[Frame, None]: raise ValueError("Source queue not set") yield await self.source.get() + async def run_pipeline_recursively( + self, initial_frame: Frame, processors: List[FrameProcessor] + ) -> AsyncGenerator[Frame, None]: + if processors: + async for frame in processors[0].process_frame(initial_frame): + async for final_frame in self.run_pipeline_recursively(frame, processors[1:]): + yield final_frame + else: + yield initial_frame + async def run_pipeline(self): """ Run the pipeline. Take each frame from the source queue, pass it to the first frame_processor, pass the output of that frame_processor to the @@ -65,23 +75,12 @@ async def run_pipeline(self): try: while True: - frame_generators = [self.get_next_source_frame()] - for processor in self.processors: - next_frame_generators = [] - for frame_generator in frame_generators: - async for frame in frame_generator: - next_frame_generators.append(processor.process_frame(frame)) - frame_generators = next_frame_generators + initial_frame = await self.source.get() + async for frame in self.run_pipeline_recursively(initial_frame, self.processors): + await self.sink.put(frame) - for frame_generator in frame_generators: - async for frame in frame_generator: - await self.sink.put(frame) - if isinstance( - frame, EndFrame - ) or isinstance( - frame, EndPipeFrame - ): - return + if isinstance(initial_frame, EndFrame) or isinstance(initial_frame, EndPipeFrame): + break except asyncio.CancelledError: # this means there's been an interruption, do any cleanup necessary here. for processor in self.processors: diff --git a/src/dailyai/tests/test_pipeline.py b/src/dailyai/tests/test_pipeline.py index b5c9edd05..0d7f43f3b 100644 --- a/src/dailyai/tests/test_pipeline.py +++ b/src/dailyai/tests/test_pipeline.py @@ -1,5 +1,4 @@ import asyncio -from doctest import OutputChecker import unittest from dailyai.pipeline.aggregators import SentenceAggregator, StatelessTextTransformer from dailyai.pipeline.frames import EndFrame, TextFrame diff --git a/src/examples/foundational/04-utterance-and-speech.py b/src/examples/foundational/04-utterance-and-speech.py index 3b67fb149..242a5c21c 100644 --- a/src/examples/foundational/04-utterance-and-speech.py +++ b/src/examples/foundational/04-utterance-and-speech.py @@ -44,7 +44,8 @@ async def main(room_url: str): buffer_queue = asyncio.Queue() source_queue = asyncio.Queue() pipeline = Pipeline(source = source_queue, sink=buffer_queue, processors=[llm, elevenlabs_tts]) - source_queue.put_nowait(LLMMessagesQueueFrame(messages)) + await source_queue.put(LLMMessagesQueueFrame(messages)) + await source_queue.put(EndFrame()) pipeline_run_task = pipeline.run_pipeline() @transport.event_handler("on_first_other_participant_joined") diff --git a/src/examples/foundational/05-sync-speech-and-image.py b/src/examples/foundational/05-sync-speech-and-image.py index ee02f6ed3..cb72642d1 100644 --- a/src/examples/foundational/05-sync-speech-and-image.py +++ b/src/examples/foundational/05-sync-speech-and-image.py @@ -47,7 +47,20 @@ async def main(room_url): source_queue = asyncio.Queue() - for month in ["January", "February"]: + for month in [ + "January", + "February", + "March", + "April", + "May", + "June", + "July", + "August", + "September", + "October", + "November", + "December", + ]: messages = [ { "role": "system", diff --git a/src/examples/foundational/06-listen-and-respond.py b/src/examples/foundational/06-listen-and-respond.py index ce117a9a9..90fcfb396 100644 --- a/src/examples/foundational/06-listen-and-respond.py +++ b/src/examples/foundational/06-listen-and-respond.py @@ -51,8 +51,8 @@ async def handle_transcriptions(): tma_in, llm, fl2, + tts, tma_out, - tts ], ) await transport.run_uninterruptible_pipeline(pipeline)