diff --git a/CHANGELOG.md b/CHANGELOG.md index fb08aab2e..6c4bf3c92 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Added `AsyncGeneratorProcessor`. This processor can be used together with a + `FrameSerializer` as an async generator. It provides a `generator()` function + that returns an `AsyncGenerator` and that yields serialized frames. + - Added `EndTaskFrame` and `CancelTaskFrame`. These are new frames that are meant to be pushed upstream to tell the pipeline task to stop nicely or immediately respectively. diff --git a/src/pipecat/processors/async_generator.py b/src/pipecat/processors/async_generator.py new file mode 100644 index 000000000..66b2a3e99 --- /dev/null +++ b/src/pipecat/processors/async_generator.py @@ -0,0 +1,42 @@ +# +# Copyright (c) 2024, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import asyncio + +from typing import Any, AsyncGenerator + +from pipecat.frames.frames import ( + CancelFrame, + EndFrame, + Frame, +) +from pipecat.processors.frame_processor import FrameProcessor, FrameDirection +from pipecat.serializers.base_serializer import FrameSerializer + + +class AsyncGeneratorProcessor(FrameProcessor): + def __init__(self, *, serializer: FrameSerializer, **kwargs): + super().__init__(**kwargs) + self._serializer = serializer + self._data_queue = asyncio.Queue() + + async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + + if isinstance(frame, (CancelFrame, EndFrame)): + await self._data_queue.put(None) + else: + data = self._serializer.serialize(frame) + if data: + await self._data_queue.put(data) + + async def generator(self) -> AsyncGenerator[Any, None]: + running = True + while running: + data = await self._data_queue.get() + running = data is not None + if data: + yield data