From 05c77bce25751c469be76ac0acbb397237b647eb Mon Sep 17 00:00:00 2001 From: Moishe Lettvin Date: Wed, 27 Mar 2024 18:48:30 -0400 Subject: [PATCH 1/2] Add logging for pipeline --- src/dailyai/pipeline/frame_processor.py | 3 +++ src/dailyai/pipeline/pipeline.py | 27 +++++++++++++++++++++---- 2 files changed, 26 insertions(+), 4 deletions(-) diff --git a/src/dailyai/pipeline/frame_processor.py b/src/dailyai/pipeline/frame_processor.py index 3f42b6987..d8550c237 100644 --- a/src/dailyai/pipeline/frame_processor.py +++ b/src/dailyai/pipeline/frame_processor.py @@ -29,3 +29,6 @@ async def process_frame( async def interrupted(self) -> None: """Handle any cleanup if the pipeline was interrupted.""" pass + + def __str__(self): + return self.__class__.__name__ diff --git a/src/dailyai/pipeline/pipeline.py b/src/dailyai/pipeline/pipeline.py index fbb4488db..0f23b4903 100644 --- a/src/dailyai/pipeline/pipeline.py +++ b/src/dailyai/pipeline/pipeline.py @@ -1,8 +1,9 @@ import asyncio +import logging from typing import AsyncGenerator, AsyncIterable, Iterable, List from dailyai.pipeline.frame_processor import FrameProcessor -from dailyai.pipeline.frames import EndPipeFrame, EndFrame, Frame +from dailyai.pipeline.frames import AudioFrame, EndPipeFrame, EndFrame, Frame class Pipeline: @@ -29,6 +30,8 @@ def __init__( self.source: asyncio.Queue[Frame] = source or asyncio.Queue() self.sink: asyncio.Queue[Frame] = sink or asyncio.Queue() + self._logger = logging.getLogger("dailyai.pipeline") + def set_source(self, source: asyncio.Queue[Frame]): """Set the source queue for this pipeline. Frames from this queue will be processed by each frame_processor in the pipeline, or order @@ -82,9 +85,11 @@ async def run_pipeline(self): try: while True: initial_frame = await self.source.get() + self._log_frame(initial_frame, 0) async for frame in self._run_pipeline_recursively( initial_frame, self._processors ): + self._log_frame(frame, len(self._processors) + 1) await self.sink.put(frame) if isinstance(initial_frame, EndFrame) or isinstance( @@ -96,18 +101,32 @@ async def run_pipeline(self): # here. for processor in self._processors: await processor.interrupted() - pass async def _run_pipeline_recursively( - self, initial_frame: Frame, processors: List[FrameProcessor] + self, initial_frame: Frame, processors: List[FrameProcessor], depth=1 ) -> AsyncGenerator[Frame, None]: """Internal function to add frames to the pipeline as they're yielded by each processor.""" if processors: + self._log_frame(initial_frame, depth) async for frame in processors[0].process_frame(initial_frame): async for final_frame in self._run_pipeline_recursively( - frame, processors[1:] + frame, processors[1:], depth + 1 ): yield final_frame else: yield initial_frame + + def _log_frame(self, frame: Frame, depth: int): + if not isinstance(frame, AudioFrame): + if depth == 0: + source = "source" + dest = str(self._processors[depth - 1]) + elif depth == len(self._processors) + 1: + dest = "sink" + source = str(self._processors[depth - 2]) + else: + dest = str(self._processors[depth - 1]) + source = str(self._processors[depth - 2]) + + self._logger.debug(" " * (depth + 1) + " -> ".join([source, str(frame), dest])) From 5c0ba1b6f09d75b9e80262af659f13d03af3ddd5 Mon Sep 17 00:00:00 2001 From: Moishe Lettvin Date: Thu, 28 Mar 2024 08:34:34 -0400 Subject: [PATCH 2/2] Fix off by one errors, add tests and comment --- .../foundational/04-utterance-and-speech.py | 2 +- src/dailyai/pipeline/pipeline.py | 45 +++++++++++----- tests/test_pipeline.py | 52 +++++++++++++++++++ 3 files changed, 84 insertions(+), 15 deletions(-) diff --git a/examples/foundational/04-utterance-and-speech.py b/examples/foundational/04-utterance-and-speech.py index 0ce76ff39..2a033942b 100644 --- a/examples/foundational/04-utterance-and-speech.py +++ b/examples/foundational/04-utterance-and-speech.py @@ -65,7 +65,7 @@ async def main(room_url: str): simple_tts_pipeline = Pipeline([azure_tts]) await simple_tts_pipeline.queue_frames( [ - TextFrame("My friend the LLM is going to tell a joke about llamas"), + TextFrame("My friend the LLM is going to tell a joke about llamas."), EndPipeFrame(), ] ) diff --git a/src/dailyai/pipeline/pipeline.py b/src/dailyai/pipeline/pipeline.py index 0f23b4903..bc6c3d0ae 100644 --- a/src/dailyai/pipeline/pipeline.py +++ b/src/dailyai/pipeline/pipeline.py @@ -18,7 +18,8 @@ def __init__( self, processors: List[FrameProcessor], source: asyncio.Queue | None = None, - sink: asyncio.Queue[Frame] | None = None + sink: asyncio.Queue[Frame] | None = None, + name: str | None = None, ): """Create a new pipeline. By default we create the sink and source queues if they're not provided, but these can be overridden to point to other @@ -31,6 +32,9 @@ def __init__( self.sink: asyncio.Queue[Frame] = sink or asyncio.Queue() self._logger = logging.getLogger("dailyai.pipeline") + self._last_log_line = "" + self._shown_repeated_log = False + self._name = name or str(id(self)) def set_source(self, source: asyncio.Queue[Frame]): """Set the source queue for this pipeline. Frames from this queue @@ -85,7 +89,6 @@ async def run_pipeline(self): try: while True: initial_frame = await self.source.get() - self._log_frame(initial_frame, 0) async for frame in self._run_pipeline_recursively( initial_frame, self._processors ): @@ -118,15 +121,29 @@ async def _run_pipeline_recursively( yield initial_frame def _log_frame(self, frame: Frame, depth: int): - if not isinstance(frame, AudioFrame): - if depth == 0: - source = "source" - dest = str(self._processors[depth - 1]) - elif depth == len(self._processors) + 1: - dest = "sink" - source = str(self._processors[depth - 2]) - else: - dest = str(self._processors[depth - 1]) - source = str(self._processors[depth - 2]) - - self._logger.debug(" " * (depth + 1) + " -> ".join([source, str(frame), dest])) + """Log a frame as it moves through the pipeline. This is useful for debugging. + Note that this function inherits the logging level from the "dailyai" logger. + If you want debug output from dailyai in general but not this function (it is + noisy) you can silence this function by doing something like this: + + # enable debug logging for the dailyai package. + logger = logging.getLogger("dailyai") + logger.setLevel(logging.DEBUG) + + # silence the pipeline logging + logger = logging.getLogger("dailyai.pipeline") + logger.setLevel(logging.WARNING) + """ + source = str(self._processors[depth - 2]) if depth > 1 else "source" + dest = str(self._processors[depth - 1]) if depth < (len(self._processors) + 1) else "sink" + prefix = self._name + " " * depth + logline = prefix + " -> ".join([source, frame.__class__.__name__, dest]) + if logline == self._last_log_line: + if self._shown_repeated_log: + return + self._shown_repeated_log = True + self._logger.debug(prefix + "... repeated") + else: + self._shown_repeated_log = False + self._last_log_line = logline + self._logger.debug(logline) diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 0d7f43f3b..27cb947e2 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -1,6 +1,9 @@ import asyncio import unittest +from unittest.mock import Mock + from dailyai.pipeline.aggregators import SentenceAggregator, StatelessTextTransformer +from dailyai.pipeline.frame_processor import FrameProcessor from dailyai.pipeline.frames import EndFrame, TextFrame from dailyai.pipeline.pipeline import Pipeline @@ -57,3 +60,52 @@ async def test_pipeline_multiple_stages(self): TextFrame(" "), ) self.assertIsInstance(await outgoing_queue.get(), EndFrame) + + +class TestLogFrame(unittest.TestCase): + class MockProcessor(FrameProcessor): + def __init__(self, name): + self.name = name + + def __str__(self): + return self.name + + def setUp(self): + self.processor1 = self.MockProcessor('processor1') + self.processor2 = self.MockProcessor('processor2') + self.pipeline = Pipeline( + processors=[self.processor1, self.processor2]) + self.pipeline._name = 'MyClass' + self.pipeline._logger = Mock() + + def test_log_frame_from_source(self): + frame = Mock(__class__=Mock(__name__='MyFrame')) + self.pipeline._log_frame(frame, depth=1) + self.pipeline._logger.debug.assert_called_once_with( + 'MyClass source -> MyFrame -> processor1') + + def test_log_frame_to_sink(self): + frame = Mock(__class__=Mock(__name__='MyFrame')) + self.pipeline._log_frame(frame, depth=3) + self.pipeline._logger.debug.assert_called_once_with( + 'MyClass processor2 -> MyFrame -> sink') + + def test_log_frame_repeated_log(self): + frame = Mock(__class__=Mock(__name__='MyFrame')) + self.pipeline._log_frame(frame, depth=2) + self.pipeline._logger.debug.assert_called_once_with( + 'MyClass processor1 -> MyFrame -> processor2') + self.pipeline._log_frame(frame, depth=2) + self.pipeline._logger.debug.assert_called_with('MyClass ... repeated') + + def test_log_frame_reset_repeated_log(self): + frame1 = Mock(__class__=Mock(__name__='MyFrame1')) + frame2 = Mock(__class__=Mock(__name__='MyFrame2')) + self.pipeline._log_frame(frame1, depth=2) + self.pipeline._logger.debug.assert_called_once_with( + 'MyClass processor1 -> MyFrame1 -> processor2') + self.pipeline._log_frame(frame1, depth=2) + self.pipeline._logger.debug.assert_called_with('MyClass ... repeated') + self.pipeline._log_frame(frame2, depth=2) + self.pipeline._logger.debug.assert_called_with( + 'MyClass processor1 -> MyFrame2 -> processor2')