Skip to content

Commit

Permalink
Merge pull request #41 from daily-co/optimize-pipeline
Browse files Browse the repository at this point in the history
Optimize pipeline processing
  • Loading branch information
Moishe authored Mar 8, 2024
2 parents 3c5f480 + 196279e commit 192b46b
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 20 deletions.
31 changes: 15 additions & 16 deletions src/dailyai/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,16 @@ async def get_next_source_frame(self) -> AsyncGenerator[Frame, None]:
raise ValueError("Source queue not set")
yield await self.source.get()

async def run_pipeline_recursively(
self, initial_frame: Frame, processors: List[FrameProcessor]
) -> AsyncGenerator[Frame, None]:
if processors:
async for frame in processors[0].process_frame(initial_frame):
async for final_frame in self.run_pipeline_recursively(frame, processors[1:]):
yield final_frame
else:
yield initial_frame

async def run_pipeline(self):
""" Run the pipeline. Take each frame from the source queue, pass it to
the first frame_processor, pass the output of that frame_processor to the
Expand All @@ -65,23 +75,12 @@ async def run_pipeline(self):

try:
while True:
frame_generators = [self.get_next_source_frame()]
for processor in self.processors:
next_frame_generators = []
for frame_generator in frame_generators:
async for frame in frame_generator:
next_frame_generators.append(processor.process_frame(frame))
frame_generators = next_frame_generators
initial_frame = await self.source.get()
async for frame in self.run_pipeline_recursively(initial_frame, self.processors):
await self.sink.put(frame)

for frame_generator in frame_generators:
async for frame in frame_generator:
await self.sink.put(frame)
if isinstance(
frame, EndFrame
) or isinstance(
frame, EndPipeFrame
):
return
if isinstance(initial_frame, EndFrame) or isinstance(initial_frame, EndPipeFrame):
break
except asyncio.CancelledError:
# this means there's been an interruption, do any cleanup necessary here.
for processor in self.processors:
Expand Down
1 change: 0 additions & 1 deletion src/dailyai/tests/test_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import asyncio
from doctest import OutputChecker
import unittest
from dailyai.pipeline.aggregators import SentenceAggregator, StatelessTextTransformer
from dailyai.pipeline.frames import EndFrame, TextFrame
Expand Down
3 changes: 2 additions & 1 deletion src/examples/foundational/04-utterance-and-speech.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ async def main(room_url: str):
buffer_queue = asyncio.Queue()
source_queue = asyncio.Queue()
pipeline = Pipeline(source = source_queue, sink=buffer_queue, processors=[llm, elevenlabs_tts])
source_queue.put_nowait(LLMMessagesQueueFrame(messages))
await source_queue.put(LLMMessagesQueueFrame(messages))
await source_queue.put(EndFrame())
pipeline_run_task = pipeline.run_pipeline()

@transport.event_handler("on_first_other_participant_joined")
Expand Down
15 changes: 14 additions & 1 deletion src/examples/foundational/05-sync-speech-and-image.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,20 @@ async def main(room_url):

source_queue = asyncio.Queue()

for month in ["January", "February"]:
for month in [
"January",
"February",
"March",
"April",
"May",
"June",
"July",
"August",
"September",
"October",
"November",
"December",
]:
messages = [
{
"role": "system",
Expand Down
2 changes: 1 addition & 1 deletion src/examples/foundational/06-listen-and-respond.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ async def handle_transcriptions():
tma_in,
llm,
fl2,
tts,
tma_out,
tts
],
)
await transport.run_uninterruptible_pipeline(pipeline)
Expand Down

0 comments on commit 192b46b

Please sign in to comment.