Skip to content

Commit

Permalink
wip proposal: initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
aconchillo committed Apr 25, 2024
1 parent e60b6a8 commit 2ecc923
Show file tree
Hide file tree
Showing 16 changed files with 2,080 additions and 603 deletions.
21 changes: 21 additions & 0 deletions src/dailyai/pipeline/base_id.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

from threading import Lock

_IDS = {}

_IDS_MUTEX = Lock()


def base_id(obj) -> int:
name = obj.__class__.__name__
with _IDS_MUTEX:
if name not in _IDS:
_IDS[name] = 0
else:
_IDS[name] += 1
return _IDS[name]
54 changes: 54 additions & 0 deletions src/dailyai/pipeline/events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

from dailyai.pipeline.base_id import base_id


class EventType:
LLM_STARTED_RESPONSE = 'llm/started-response'
LLM_STOPPED_RESPONSE = 'llm/stopped-response'
USER_STARTED_SPEAKING = 'user/started-speaking'
USER_STOPPED_SPEAKING = 'user/stopped-speaking'
USER_IMAGE_REQUEST = 'user/image-request'


class Event:
def __init__(self, type: str):
self.id: int = base_id(self)
self.type: str = type
self.metadata = {}

def __str__(self):
return f"{self.__class__.__name__}#{self.id}"


class LLMStartedResponseEvent(Event):
def __init__(self):
super().__init__(EventType.LLM_STARTED_RESPONSE)


class LLMStoppedResponseEvent(Event):
def __init__(self):
super().__init__(EventType.LLM_STOPPED_RESPONSE)


class UserStartedSpeakingEvent(Event):
def __init__(self):
super().__init__(EventType.USER_STARTED_SPEAKING)


class UserStoppedSpeakingEvent(Event):
def __init__(self):
super().__init__(EventType.USER_STOPPED_SPEAKING)


class UserImageRequest(Event):
def __init__(self, user_id):
super().__init__(EventType.USER_IMAGE_REQUEST)
self.metadata["user_id"] = user_id

def user_id(self):
return self.metadata["user_id"]
94 changes: 74 additions & 20 deletions src/dailyai/pipeline/frame_processor.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,88 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

from abc import abstractmethod
from typing import AsyncGenerator
from asyncio import AbstractEventLoop
from typing import List

from dailyai.pipeline.events import Event
from dailyai.pipeline.frames import Frame
from dailyai.pipeline.base_id import base_id
from dailyai.pipeline.state import State

from dailyai.pipeline.frames import ControlFrame, Frame
from loguru import logger


class FrameProcessor:
"""This is the base class for all frame processors. Frame processors consume a frame
and yield 0 or more frames. Generally frame processors are used as part of a pipeline
where frames come from a source queue, are processed by a series of frame processors,
then placed on a sink queue.

By convention, FrameProcessors should immediately yield any frames they don't process.
def __init__(self, **kwargs):
self.id: int = base_id(self)
self._prev: 'FrameProcessor' | None = None
self._next: 'FrameProcessor' | None = None
self._loop: AbstractEventLoop | None = None
self._state: State = State.NULL

Stateful FrameProcessors should watch for the EndFrame and finalize their
output, eg. yielding an unfinished sentence if they're aggregating LLM output to full
sentences. EndFrame is also a chance to clean up any services that need to
be closed, del'd, etc.
"""
@abstractmethod
def input_frames(self) -> List[str]:
pass

@abstractmethod
async def process_frame(
self, frame: Frame
) -> AsyncGenerator[Frame, None]:
"""Process a single frame and yield 0 or more frames."""
yield frame
def output_frames(self) -> List[str]:
pass

@abstractmethod
async def interrupted(self) -> None:
"""Handle any cleanup if the pipeline was interrupted."""
async def process_event(self, event: Event):
pass

@abstractmethod
async def process_frame(self, frame: Frame):
pass

def link(self, processor: 'FrameProcessor'):
self._next = processor
processor._prev = self
logger.debug(f"linking {self} -> {self._next}")

def event_loop(self) -> AbstractEventLoop:
return self._loop

def set_event_loop(self, loop: AbstractEventLoop):
self._loop = loop

def state(self) -> State:
return self._state

async def set_state(self, state: State):
if self._next:
await self._next.set_state(state)

if state != self._state:
logger.debug(f"changing {self} state from {self._state} to {state}")
self._state = state

async def push_event(self, event: Event):
if self._next:
logger.trace(f"pushing {event} from {self} to {self._next}")
await self._next.process_event(event)
else:
logger.warning(f"can't push event {event}. {self} has no downstream peer")

async def push_upstream_event(self, event: Event):
if self._prev:
logger.trace(f"pushing {event} from {self} to {self._prev}")
await self._prev.process_event(event)
else:
logger.warning(f"can't push event upstream {event}. {self} has no upstream peer")

async def push_frame(self, frame: Frame):
if self._next:
logger.trace(f"pushing {frame} from {self} to {self._next}")
await self._next.process_frame(frame)
else:
logger.warning(f"can't push frame {frame}. {self} has no downstream peer")

def __str__(self):
return self.__class__.__name__
return f"{self.__class__.__name__}#{self.id}"
Loading

0 comments on commit 2ecc923

Please sign in to comment.