From 5c0ba1b6f09d75b9e80262af659f13d03af3ddd5 Mon Sep 17 00:00:00 2001 From: Moishe Lettvin Date: Thu, 28 Mar 2024 08:34:34 -0400 Subject: [PATCH] Fix off by one errors, add tests and comment --- .../foundational/04-utterance-and-speech.py | 2 +- src/dailyai/pipeline/pipeline.py | 45 +++++++++++----- tests/test_pipeline.py | 52 +++++++++++++++++++ 3 files changed, 84 insertions(+), 15 deletions(-) diff --git a/examples/foundational/04-utterance-and-speech.py b/examples/foundational/04-utterance-and-speech.py index 0ce76ff39..2a033942b 100644 --- a/examples/foundational/04-utterance-and-speech.py +++ b/examples/foundational/04-utterance-and-speech.py @@ -65,7 +65,7 @@ async def main(room_url: str): simple_tts_pipeline = Pipeline([azure_tts]) await simple_tts_pipeline.queue_frames( [ - TextFrame("My friend the LLM is going to tell a joke about llamas"), + TextFrame("My friend the LLM is going to tell a joke about llamas."), EndPipeFrame(), ] ) diff --git a/src/dailyai/pipeline/pipeline.py b/src/dailyai/pipeline/pipeline.py index 0f23b4903..bc6c3d0ae 100644 --- a/src/dailyai/pipeline/pipeline.py +++ b/src/dailyai/pipeline/pipeline.py @@ -18,7 +18,8 @@ def __init__( self, processors: List[FrameProcessor], source: asyncio.Queue | None = None, - sink: asyncio.Queue[Frame] | None = None + sink: asyncio.Queue[Frame] | None = None, + name: str | None = None, ): """Create a new pipeline. By default we create the sink and source queues if they're not provided, but these can be overridden to point to other @@ -31,6 +32,9 @@ def __init__( self.sink: asyncio.Queue[Frame] = sink or asyncio.Queue() self._logger = logging.getLogger("dailyai.pipeline") + self._last_log_line = "" + self._shown_repeated_log = False + self._name = name or str(id(self)) def set_source(self, source: asyncio.Queue[Frame]): """Set the source queue for this pipeline. Frames from this queue @@ -85,7 +89,6 @@ async def run_pipeline(self): try: while True: initial_frame = await self.source.get() - self._log_frame(initial_frame, 0) async for frame in self._run_pipeline_recursively( initial_frame, self._processors ): @@ -118,15 +121,29 @@ async def _run_pipeline_recursively( yield initial_frame def _log_frame(self, frame: Frame, depth: int): - if not isinstance(frame, AudioFrame): - if depth == 0: - source = "source" - dest = str(self._processors[depth - 1]) - elif depth == len(self._processors) + 1: - dest = "sink" - source = str(self._processors[depth - 2]) - else: - dest = str(self._processors[depth - 1]) - source = str(self._processors[depth - 2]) - - self._logger.debug(" " * (depth + 1) + " -> ".join([source, str(frame), dest])) + """Log a frame as it moves through the pipeline. This is useful for debugging. + Note that this function inherits the logging level from the "dailyai" logger. + If you want debug output from dailyai in general but not this function (it is + noisy) you can silence this function by doing something like this: + + # enable debug logging for the dailyai package. + logger = logging.getLogger("dailyai") + logger.setLevel(logging.DEBUG) + + # silence the pipeline logging + logger = logging.getLogger("dailyai.pipeline") + logger.setLevel(logging.WARNING) + """ + source = str(self._processors[depth - 2]) if depth > 1 else "source" + dest = str(self._processors[depth - 1]) if depth < (len(self._processors) + 1) else "sink" + prefix = self._name + " " * depth + logline = prefix + " -> ".join([source, frame.__class__.__name__, dest]) + if logline == self._last_log_line: + if self._shown_repeated_log: + return + self._shown_repeated_log = True + self._logger.debug(prefix + "... repeated") + else: + self._shown_repeated_log = False + self._last_log_line = logline + self._logger.debug(logline) diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 0d7f43f3b..27cb947e2 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -1,6 +1,9 @@ import asyncio import unittest +from unittest.mock import Mock + from dailyai.pipeline.aggregators import SentenceAggregator, StatelessTextTransformer +from dailyai.pipeline.frame_processor import FrameProcessor from dailyai.pipeline.frames import EndFrame, TextFrame from dailyai.pipeline.pipeline import Pipeline @@ -57,3 +60,52 @@ async def test_pipeline_multiple_stages(self): TextFrame(" "), ) self.assertIsInstance(await outgoing_queue.get(), EndFrame) + + +class TestLogFrame(unittest.TestCase): + class MockProcessor(FrameProcessor): + def __init__(self, name): + self.name = name + + def __str__(self): + return self.name + + def setUp(self): + self.processor1 = self.MockProcessor('processor1') + self.processor2 = self.MockProcessor('processor2') + self.pipeline = Pipeline( + processors=[self.processor1, self.processor2]) + self.pipeline._name = 'MyClass' + self.pipeline._logger = Mock() + + def test_log_frame_from_source(self): + frame = Mock(__class__=Mock(__name__='MyFrame')) + self.pipeline._log_frame(frame, depth=1) + self.pipeline._logger.debug.assert_called_once_with( + 'MyClass source -> MyFrame -> processor1') + + def test_log_frame_to_sink(self): + frame = Mock(__class__=Mock(__name__='MyFrame')) + self.pipeline._log_frame(frame, depth=3) + self.pipeline._logger.debug.assert_called_once_with( + 'MyClass processor2 -> MyFrame -> sink') + + def test_log_frame_repeated_log(self): + frame = Mock(__class__=Mock(__name__='MyFrame')) + self.pipeline._log_frame(frame, depth=2) + self.pipeline._logger.debug.assert_called_once_with( + 'MyClass processor1 -> MyFrame -> processor2') + self.pipeline._log_frame(frame, depth=2) + self.pipeline._logger.debug.assert_called_with('MyClass ... repeated') + + def test_log_frame_reset_repeated_log(self): + frame1 = Mock(__class__=Mock(__name__='MyFrame1')) + frame2 = Mock(__class__=Mock(__name__='MyFrame2')) + self.pipeline._log_frame(frame1, depth=2) + self.pipeline._logger.debug.assert_called_once_with( + 'MyClass processor1 -> MyFrame1 -> processor2') + self.pipeline._log_frame(frame1, depth=2) + self.pipeline._logger.debug.assert_called_with('MyClass ... repeated') + self.pipeline._log_frame(frame2, depth=2) + self.pipeline._logger.debug.assert_called_with( + 'MyClass processor1 -> MyFrame2 -> processor2')