
Merge pull request #91 from daily-co/pipeline-logging
Add logging for pipeline
Moishe authored Mar 28, 2024
2 parents fef1366 + 5c0ba1b commit a2295b6
Showing 4 changed files with 97 additions and 6 deletions.
2 changes: 1 addition & 1 deletion examples/foundational/04-utterance-and-speech.py
@@ -65,7 +65,7 @@ async def main(room_url: str):
simple_tts_pipeline = Pipeline([azure_tts])
await simple_tts_pipeline.queue_frames(
[
TextFrame("My friend the LLM is going to tell a joke about llamas"),
TextFrame("My friend the LLM is going to tell a joke about llamas."),
EndPipeFrame(),
]
)
3 changes: 3 additions & 0 deletions src/dailyai/pipeline/frame_processor.py
@@ -29,3 +29,6 @@ async def process_frame(
async def interrupted(self) -> None:
"""Handle any cleanup if the pipeline was interrupted."""
pass

def __str__(self):
return self.__class__.__name__
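
With this default, each processor identifies itself in pipeline log lines by its class name, and a subclass can override __str__ to use a friendlier label. A minimal sketch, assuming the usual async-generator process_frame contract; the UppercaseProcessor name and behavior are invented for illustration:

from typing import AsyncGenerator

from dailyai.pipeline.frame_processor import FrameProcessor
from dailyai.pipeline.frames import Frame, TextFrame


class UppercaseProcessor(FrameProcessor):
    """Hypothetical processor that upper-cases text frames."""

    async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
        if isinstance(frame, TextFrame):
            yield TextFrame(frame.text.upper())
        else:
            yield frame

    def __str__(self):
        # Shown in pipeline debug lines instead of the default class name.
        return "uppercase"
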
46 changes: 41 additions & 5 deletions src/dailyai/pipeline/pipeline.py
@@ -1,8 +1,9 @@
import asyncio
import logging
from typing import AsyncGenerator, AsyncIterable, Iterable, List
from dailyai.pipeline.frame_processor import FrameProcessor

from dailyai.pipeline.frames import EndPipeFrame, EndFrame, Frame
from dailyai.pipeline.frames import AudioFrame, EndPipeFrame, EndFrame, Frame


class Pipeline:
@@ -17,7 +18,8 @@ def __init__(
self,
processors: List[FrameProcessor],
source: asyncio.Queue | None = None,
sink: asyncio.Queue[Frame] | None = None
sink: asyncio.Queue[Frame] | None = None,
name: str | None = None,
):
"""Create a new pipeline. By default we create the sink and source queues
if they're not provided, but these can be overridden to point to other
@@ -29,6 +31,11 @@ def __init__(
self.source: asyncio.Queue[Frame] = source or asyncio.Queue()
self.sink: asyncio.Queue[Frame] = sink or asyncio.Queue()

self._logger = logging.getLogger("dailyai.pipeline")
self._last_log_line = ""
self._shown_repeated_log = False
self._name = name or str(id(self))

def set_source(self, source: asyncio.Queue[Frame]):
"""Set the source queue for this pipeline. Frames from this queue
will be processed by each frame_processor in the pipeline, in order
@@ -85,6 +92,7 @@ async def run_pipeline(self):
async for frame in self._run_pipeline_recursively(
initial_frame, self._processors
):
self._log_frame(frame, len(self._processors) + 1)
await self.sink.put(frame)

if isinstance(initial_frame, EndFrame) or isinstance(
@@ -96,18 +104,46 @@ async def run_pipeline(self):
# here.
for processor in self._processors:
await processor.interrupted()
pass

async def _run_pipeline_recursively(
self, initial_frame: Frame, processors: List[FrameProcessor]
self, initial_frame: Frame, processors: List[FrameProcessor], depth=1
) -> AsyncGenerator[Frame, None]:
"""Internal function to add frames to the pipeline as they're yielded
by each processor."""
if processors:
self._log_frame(initial_frame, depth)
async for frame in processors[0].process_frame(initial_frame):
async for final_frame in self._run_pipeline_recursively(
frame, processors[1:]
frame, processors[1:], depth + 1
):
yield final_frame
else:
yield initial_frame

def _log_frame(self, frame: Frame, depth: int):
"""Log a frame as it moves through the pipeline. This is useful for debugging.
Note that this function inherits the logging level from the "dailyai" logger.
If you want debug output from dailyai in general but not from this function (it is
noisy), you can silence it by doing something like this:
# enable debug logging for the dailyai package.
logger = logging.getLogger("dailyai")
logger.setLevel(logging.DEBUG)
# silence the pipeline logging
logger = logging.getLogger("dailyai.pipeline")
logger.setLevel(logging.WARNING)
"""
source = str(self._processors[depth - 2]) if depth > 1 else "source"
dest = str(self._processors[depth - 1]) if depth < (len(self._processors) + 1) else "sink"
prefix = self._name + " " * depth
logline = prefix + " -> ".join([source, frame.__class__.__name__, dest])
if logline == self._last_log_line:
if self._shown_repeated_log:
return
self._shown_repeated_log = True
self._logger.debug(prefix + "... repeated")
else:
self._shown_repeated_log = False
self._last_log_line = logline
self._logger.debug(logline)
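
For reference, the configuration described in the _log_frame docstring above might look like this in an application. The basicConfig handler setup is an assumption; the logger names ("dailyai" and "dailyai.pipeline") come from the code:

import logging

# Make sure log records have a handler to go to.
logging.basicConfig()

# Enable debug output for the dailyai package as a whole...
logging.getLogger("dailyai").setLevel(logging.DEBUG)

# ...but silence the (noisy) per-frame pipeline logging.
logging.getLogger("dailyai.pipeline").setLevel(logging.WARNING)
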
52 changes: 52 additions & 0 deletions tests/test_pipeline.py
@@ -1,6 +1,9 @@
import asyncio
import unittest
from unittest.mock import Mock

from dailyai.pipeline.aggregators import SentenceAggregator, StatelessTextTransformer
from dailyai.pipeline.frame_processor import FrameProcessor
from dailyai.pipeline.frames import EndFrame, TextFrame

from dailyai.pipeline.pipeline import Pipeline
@@ -57,3 +60,52 @@ async def test_pipeline_multiple_stages(self):
TextFrame(" "),
)
self.assertIsInstance(await outgoing_queue.get(), EndFrame)


class TestLogFrame(unittest.TestCase):
class MockProcessor(FrameProcessor):
def __init__(self, name):
self.name = name

def __str__(self):
return self.name

def setUp(self):
self.processor1 = self.MockProcessor('processor1')
self.processor2 = self.MockProcessor('processor2')
self.pipeline = Pipeline(
processors=[self.processor1, self.processor2])
self.pipeline._name = 'MyClass'
self.pipeline._logger = Mock()

def test_log_frame_from_source(self):
frame = Mock(__class__=Mock(__name__='MyFrame'))
self.pipeline._log_frame(frame, depth=1)
self.pipeline._logger.debug.assert_called_once_with(
'MyClass source -> MyFrame -> processor1')

def test_log_frame_to_sink(self):
frame = Mock(__class__=Mock(__name__='MyFrame'))
self.pipeline._log_frame(frame, depth=3)
self.pipeline._logger.debug.assert_called_once_with(
'MyClass processor2 -> MyFrame -> sink')

def test_log_frame_repeated_log(self):
frame = Mock(__class__=Mock(__name__='MyFrame'))
self.pipeline._log_frame(frame, depth=2)
self.pipeline._logger.debug.assert_called_once_with(
'MyClass processor1 -> MyFrame -> processor2')
self.pipeline._log_frame(frame, depth=2)
self.pipeline._logger.debug.assert_called_with('MyClass ... repeated')

def test_log_frame_reset_repeated_log(self):
frame1 = Mock(__class__=Mock(__name__='MyFrame1'))
frame2 = Mock(__class__=Mock(__name__='MyFrame2'))
self.pipeline._log_frame(frame1, depth=2)
self.pipeline._logger.debug.assert_called_once_with(
'MyClass processor1 -> MyFrame1 -> processor2')
self.pipeline._log_frame(frame1, depth=2)
self.pipeline._logger.debug.assert_called_with('MyClass ... repeated')
self.pipeline._log_frame(frame2, depth=2)
self.pipeline._logger.debug.assert_called_with(
'MyClass processor1 -> MyFrame2 -> processor2')
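
Beyond the mocked assertions above, a quick end-to-end check of the new logging could look like the sketch below. The Passthrough processor and the "demo" pipeline name are invented for illustration; Pipeline, queue_frames, run_pipeline, and the frame classes are those used elsewhere in this change:

import asyncio
import logging
from typing import AsyncGenerator

from dailyai.pipeline.frame_processor import FrameProcessor
from dailyai.pipeline.frames import EndPipeFrame, Frame, TextFrame
from dailyai.pipeline.pipeline import Pipeline

# Route debug records to the console so the per-frame log lines are visible.
logging.basicConfig(level=logging.DEBUG)


class Passthrough(FrameProcessor):
    """Hypothetical no-op processor; it exists only to give the pipeline a hop to log."""

    async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
        yield frame


async def main():
    # Naming the pipeline makes its log lines easy to pick out.
    pipeline = Pipeline([Passthrough()], name="demo")
    await pipeline.queue_frames([TextFrame("Hello there."), EndPipeFrame()])
    await pipeline.run_pipeline()


asyncio.run(main())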
