Skip to content

Commit

Permalink
Fix off by one errors, add tests and comment
Browse files Browse the repository at this point in the history
  • Loading branch information
Moishe committed Mar 28, 2024
1 parent 05c77bc commit 5c0ba1b
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 15 deletions.
2 changes: 1 addition & 1 deletion examples/foundational/04-utterance-and-speech.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ async def main(room_url: str):
simple_tts_pipeline = Pipeline([azure_tts])
await simple_tts_pipeline.queue_frames(
[
TextFrame("My friend the LLM is going to tell a joke about llamas"),
TextFrame("My friend the LLM is going to tell a joke about llamas."),
EndPipeFrame(),
]
)
Expand Down
45 changes: 31 additions & 14 deletions src/dailyai/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ def __init__(
self,
processors: List[FrameProcessor],
source: asyncio.Queue | None = None,
sink: asyncio.Queue[Frame] | None = None
sink: asyncio.Queue[Frame] | None = None,
name: str | None = None,
):
"""Create a new pipeline. By default we create the sink and source queues
if they're not provided, but these can be overridden to point to other
Expand All @@ -31,6 +32,9 @@ def __init__(
self.sink: asyncio.Queue[Frame] = sink or asyncio.Queue()

self._logger = logging.getLogger("dailyai.pipeline")
self._last_log_line = ""
self._shown_repeated_log = False
self._name = name or str(id(self))

def set_source(self, source: asyncio.Queue[Frame]):
"""Set the source queue for this pipeline. Frames from this queue
Expand Down Expand Up @@ -85,7 +89,6 @@ async def run_pipeline(self):
try:
while True:
initial_frame = await self.source.get()
self._log_frame(initial_frame, 0)
async for frame in self._run_pipeline_recursively(
initial_frame, self._processors
):
Expand Down Expand Up @@ -118,15 +121,29 @@ async def _run_pipeline_recursively(
yield initial_frame

def _log_frame(self, frame: Frame, depth: int):
if not isinstance(frame, AudioFrame):
if depth == 0:
source = "source"
dest = str(self._processors[depth - 1])
elif depth == len(self._processors) + 1:
dest = "sink"
source = str(self._processors[depth - 2])
else:
dest = str(self._processors[depth - 1])
source = str(self._processors[depth - 2])

self._logger.debug(" " * (depth + 1) + " -> ".join([source, str(frame), dest]))
"""Log a frame as it moves through the pipeline. This is useful for debugging.
Note that this function inherits the logging level from the "dailyai" logger.
If you want debug output from dailyai in general but not this function (it is
noisy) you can silence this function by doing something like this:
# enable debug logging for the dailyai package.
logger = logging.getLogger("dailyai")
logger.setLevel(logging.DEBUG)
# silence the pipeline logging
logger = logging.getLogger("dailyai.pipeline")
logger.setLevel(logging.WARNING)
"""
source = str(self._processors[depth - 2]) if depth > 1 else "source"
dest = str(self._processors[depth - 1]) if depth < (len(self._processors) + 1) else "sink"
prefix = self._name + " " * depth
logline = prefix + " -> ".join([source, frame.__class__.__name__, dest])
if logline == self._last_log_line:
if self._shown_repeated_log:
return
self._shown_repeated_log = True
self._logger.debug(prefix + "... repeated")
else:
self._shown_repeated_log = False
self._last_log_line = logline
self._logger.debug(logline)
52 changes: 52 additions & 0 deletions tests/test_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import asyncio
import unittest
from unittest.mock import Mock

from dailyai.pipeline.aggregators import SentenceAggregator, StatelessTextTransformer
from dailyai.pipeline.frame_processor import FrameProcessor
from dailyai.pipeline.frames import EndFrame, TextFrame

from dailyai.pipeline.pipeline import Pipeline
Expand Down Expand Up @@ -57,3 +60,52 @@ async def test_pipeline_multiple_stages(self):
TextFrame(" "),
)
self.assertIsInstance(await outgoing_queue.get(), EndFrame)


class TestLogFrame(unittest.TestCase):
class MockProcessor(FrameProcessor):
def __init__(self, name):
self.name = name

def __str__(self):
return self.name

def setUp(self):
self.processor1 = self.MockProcessor('processor1')
self.processor2 = self.MockProcessor('processor2')
self.pipeline = Pipeline(
processors=[self.processor1, self.processor2])
self.pipeline._name = 'MyClass'
self.pipeline._logger = Mock()

def test_log_frame_from_source(self):
frame = Mock(__class__=Mock(__name__='MyFrame'))
self.pipeline._log_frame(frame, depth=1)
self.pipeline._logger.debug.assert_called_once_with(
'MyClass source -> MyFrame -> processor1')

def test_log_frame_to_sink(self):
frame = Mock(__class__=Mock(__name__='MyFrame'))
self.pipeline._log_frame(frame, depth=3)
self.pipeline._logger.debug.assert_called_once_with(
'MyClass processor2 -> MyFrame -> sink')

def test_log_frame_repeated_log(self):
frame = Mock(__class__=Mock(__name__='MyFrame'))
self.pipeline._log_frame(frame, depth=2)
self.pipeline._logger.debug.assert_called_once_with(
'MyClass processor1 -> MyFrame -> processor2')
self.pipeline._log_frame(frame, depth=2)
self.pipeline._logger.debug.assert_called_with('MyClass ... repeated')

def test_log_frame_reset_repeated_log(self):
frame1 = Mock(__class__=Mock(__name__='MyFrame1'))
frame2 = Mock(__class__=Mock(__name__='MyFrame2'))
self.pipeline._log_frame(frame1, depth=2)
self.pipeline._logger.debug.assert_called_once_with(
'MyClass processor1 -> MyFrame1 -> processor2')
self.pipeline._log_frame(frame1, depth=2)
self.pipeline._logger.debug.assert_called_with('MyClass ... repeated')
self.pipeline._log_frame(frame2, depth=2)
self.pipeline._logger.debug.assert_called_with(
'MyClass processor1 -> MyFrame2 -> processor2')

0 comments on commit 5c0ba1b

Please sign in to comment.