Skip to content

Commit

Permalink
Merge pull request #506 from pipecat-ai/aleix/async-generator-processor
Browse files Browse the repository at this point in the history
processors: add AsyncGeneratorProcessor
  • Loading branch information
aconchillo authored Sep 25, 2024
2 parents cf0ab85 + b871366 commit e123f33
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 0 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- Added `AsyncGeneratorProcessor`. This processor can be used together with a
`FrameSerializer` as an async generator. It provides a `generator()` function
that returns an `AsyncGenerator` and that yields serialized frames.

- Added `EndTaskFrame` and `CancelTaskFrame`. These are new frames that are
meant to be pushed upstream to tell the pipeline task to stop nicely or
immediately respectively.
Expand Down
42 changes: 42 additions & 0 deletions src/pipecat/processors/async_generator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

import asyncio

from typing import Any, AsyncGenerator

from pipecat.frames.frames import (
CancelFrame,
EndFrame,
Frame,
)
from pipecat.processors.frame_processor import FrameProcessor, FrameDirection
from pipecat.serializers.base_serializer import FrameSerializer


class AsyncGeneratorProcessor(FrameProcessor):
def __init__(self, *, serializer: FrameSerializer, **kwargs):
super().__init__(**kwargs)
self._serializer = serializer
self._data_queue = asyncio.Queue()

async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)

if isinstance(frame, (CancelFrame, EndFrame)):
await self._data_queue.put(None)
else:
data = self._serializer.serialize(frame)
if data:
await self._data_queue.put(data)

async def generator(self) -> AsyncGenerator[Any, None]:
running = True
while running:
data = await self._data_queue.get()
running = data is not None
if data:
yield data

0 comments on commit e123f33

Please sign in to comment.