Skip to content

Commit

Permalink
Merge pull request #509 from pipecat-ai/aleix/frameprocessor-event-ha…
Browse files Browse the repository at this point in the history
…ndlers

frame processor event handlers
  • Loading branch information
aconchillo authored Sep 26, 2024
2 parents 8f2941c + f06aa30 commit d11daee
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 1 deletion.
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- All `FrameProcessors` can now register event handlers.

```
tts = SomeTTSService(...)
@tts.event_handler("on_connected"):
async def on_connected(processor):
...
```

- Added `AsyncGeneratorProcessor`. This processor can be used together with a
`FrameSerializer` as an async generator. It provides a `generator()` function
that returns an `AsyncGenerator` and that yields serialized frames.
Expand Down
30 changes: 30 additions & 0 deletions src/pipecat/processors/frame_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#

import asyncio
import inspect

from enum import Enum

Expand Down Expand Up @@ -48,6 +49,8 @@ def __init__(
self._loop: asyncio.AbstractEventLoop = loop or asyncio.get_running_loop()
self._sync = sync

self._event_handlers: dict = {}

# Clock
self._clock: BaseClock | None = None

Expand Down Expand Up @@ -169,6 +172,23 @@ async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirect
else:
await self.__push_queue.put((frame, direction))

def event_handler(self, event_name: str):
def decorator(handler):
self.add_event_handler(event_name, handler)
return handler

return decorator

def add_event_handler(self, event_name: str, handler):
if event_name not in self._event_handlers:
raise Exception(f"Event handler {event_name} not registered")
self._event_handlers[event_name].append(handler)

def _register_event_handler(self, event_name: str):
if event_name in self._event_handlers:
raise Exception(f"Event handler {event_name} already registered")
self._event_handlers[event_name] = []

#
# Handle interruptions
#
Expand Down Expand Up @@ -212,5 +232,15 @@ async def __push_frame_task_handler(self):
except asyncio.CancelledError:
break

async def _call_event_handler(self, event_name: str, *args, **kwargs):
try:
for handler in self._event_handlers[event_name]:
if inspect.iscoroutinefunction(handler):
await handler(self, *args, **kwargs)
else:
handler(self, *args, **kwargs)
except Exception as e:
logger.exception(f"Exception in event handler {event_name}: {e}")

def __str__(self):
return self.name
5 changes: 4 additions & 1 deletion src/pipecat/processors/frameworks/rtvi.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,8 @@ def __init__(
self._message_task = self.get_event_loop().create_task(self._message_task_handler())
self._message_queue = asyncio.Queue()

self._register_event_handler("on_bot_ready")

def register_action(self, action: RTVIAction):
id = self._action_id(action.service, action.action)
self._registered_actions[id] = action
Expand Down Expand Up @@ -624,8 +626,9 @@ async def _handle_action(self, request_id: str | None, data: RTVIActionRun):

async def _maybe_send_bot_ready(self):
if self._pipeline_started and self._client_ready:
await self._send_bot_ready()
await self._update_config(self._config, False)
await self._send_bot_ready()
await self._call_event_handler("on_bot_ready")

async def _send_bot_ready(self):
if not self._params.send_bot_ready:
Expand Down

0 comments on commit d11daee

Please sign in to comment.