Skip to content

Commit

Permalink
wip proposal: add VAD analyzers support
Browse files Browse the repository at this point in the history
  • Loading branch information
aconchillo committed Apr 25, 2024
1 parent 32a7cfd commit 2946c35
Show file tree
Hide file tree
Showing 13 changed files with 298 additions and 277 deletions.
12 changes: 7 additions & 5 deletions examples/foundational/new-test.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from dailyai.processors.demuxer import Demuxer
from dailyai.processors.llm_response_aggregator import LLMUserResponseAggregator

from dailyai.vad.silero_vad import SileroVADAnalyzer
from runner import configure

from loguru import logger
Expand All @@ -17,19 +18,20 @@
load_dotenv(override=True)

logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
logger.add(sys.stderr, level="TRACE")


async def main(room_url, token):
transport = DailyTransport(
room_url, token,
camera_enabled=True,
camera_enabled=False,
camera_width=1280,
camera_height=720,
mic_enabled=True,
mic_enabled=False,
speaker_enabled=True,
video_capture_enabled=True,
video_capture_enabled=False,
transcription_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)

media_passthrough = Passthrough([FrameType.AUDIO_RAW, FrameType.IMAGE_RAW])
Expand All @@ -49,7 +51,7 @@ async def main(room_url, token):
@livestream_source.event_handler("on_first_participant_joined")
async def on_first_participant_joined(livestream, participant):
livestream_source.capture_participant_transcription(participant["id"])
livestream_source.capture_participant_video(participant["id"])
# livestream_source.capture_participant_video(participant["id"])

pipeline = Pipeline([livestream_source,
Demuxer([llm_user_response],
Expand Down
41 changes: 27 additions & 14 deletions src/dailyai/pipeline/frame_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ async def process_frame(self, frame: Frame):
def link(self, processor: 'FrameProcessor'):
self._next = processor
processor._prev = self
logger.debug(f"linking {self} -> {self._next}")
logger.debug(f"Linking {self} -> {self._next}")

def event_loop(self) -> AbstractEventLoop:
    """Return the asyncio event loop this processor is bound to.

    The loop is stored in ``self._loop`` (assigned elsewhere in the class,
    outside this view); this is a plain read-only accessor.
    """
    return self._loop
Expand All @@ -56,33 +56,46 @@ def state(self) -> State:
return self._state

async def set_state(self, state: State):
if self._next:
await self._next.set_state(state)

if state != self._state:
logger.debug(f"changing {self} state from {self._state} to {state}")
logger.debug(f"Changing {self} state from {self._state} to {state}")
self._state = state

if self._next:
await self._next.set_state(state)

async def push_event(self, event: Event):
# If we are in NULL state, we are not pushing events anymore.
if self._state == State.NULL:
return

logger.trace(f"Pushing event {event} from {self} to {self._next}")

if self._next:
logger.trace(f"pushing {event} from {self} to {self._next}")
await self._next.process_event(event)
else:
logger.warning(f"can't push event {event}. {self} has no downstream peer")
await self._next.push_event(event)

async def push_event_upstream(self, event: Event):
# If we are in NULL state, we are not pushing events anymore.
if self._state == State.NULL:
return

logger.trace(f"Pushing event upstream {event} from {self} to {self._prev}")

async def push_upstream_event(self, event: Event):
if self._prev:
logger.trace(f"pushing {event} from {self} to {self._prev}")
await self._prev.process_event(event)
else:
logger.warning(f"can't push event upstream {event}. {self} has no upstream peer")
await self._prev.push_event_upstream(event)

async def push_frame(self, frame: Frame):
# If we are in NULL state, we are not pushing frames anymore.
if self._state == State.NULL:
return

logger.trace(f"Pushing {frame} from {self} to {self._next}")

if self._next:
logger.trace(f"pushing {frame} from {self} to {self._next}")
await self._next.process_frame(frame)
else:
logger.warning(f"can't push frame {frame}. {self} has no downstream peer")
logger.warning(f"Can't push frame {frame}. {self} has no downstream peer")

def __str__(self):
    """Readable identity for logging: '<ClassName>#<id>'."""
    return "{}#{}".format(type(self).__name__, self.id)
4 changes: 1 addition & 3 deletions src/dailyai/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,8 @@ async def set_state(self, state: State):
await p.set_state(state)

if state != self._state:
logger.debug(f"changing {self} state from {self._state} to {state}")
logger.debug(f"Changing {self} state from {self._state} to {state}")
self._state = state
else:
logger.debug(f"not changing state. {self} already in state {state}")

self._state = state

Expand Down
8 changes: 7 additions & 1 deletion src/dailyai/processors/demuxer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from typing import List

from dailyai.pipeline.events import Event
from dailyai.pipeline.frame_processor import FrameProcessor
from dailyai.pipeline.frames import Frame
from dailyai.pipeline.state import State
Expand Down Expand Up @@ -34,7 +35,7 @@ def __init__(self, *args, ** kwargs):
self._processors.append(processors)

logger.debug(
f"created {self} with input frames {self._input_frames} and output frames {self._output_frames}")
f"Created {self} with input frames {self._input_frames} and output frames {self._output_frames}")

#
# Frame processor
Expand All @@ -61,6 +62,11 @@ async def set_state(self, state: State):
await first.set_state(state)
await super().set_state(state)

async def process_event(self, event: Event):
    """Fan the event out to every processor of every parallel branch.

    Unlike frame processing (which enters each branch through its first
    processor only), events are delivered directly to every processor in
    ``self._processors``, in branch order.
    """
    for branch in self._processors:
        for processor in branch:
            await processor.process_event(event)

async def process_frame(self, frame: Frame):
for processors in self._processors:
first = processors[0]
Expand Down
15 changes: 2 additions & 13 deletions src/dailyai/processors/llm_response_aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,10 @@ async def process_event(self, event: Event):

send_aggregation = False

logger.debug(f"received event {event.type}")

if event.type == self._start_event:
self._seen_start_frame = True
self._aggregating = True
logger.debug(f"aggregation started")
elif event.type == self._end_event:
logger.debug(f"we just saw end event {event.type}")
self._seen_end_frame = True

# We might have received the end frame but we might still be
Expand All @@ -95,8 +91,6 @@ async def process_event(self, event: Event):
# more interim results received).
send_aggregation = not self._aggregating

logger.debug("aggregation %s", "stopped" if send_aggregation else "continues")

if send_aggregation:
await self._push_aggregation()

Expand All @@ -107,31 +101,26 @@ async def process_frame(self, frame: Frame):
send_aggregation = False

if frame.type == self._accumulator_frame:
logger.debug(f"received transcription frame {frame}")
if self._aggregating:
self._aggregation += f" {frame.data}"
# We have recevied a complete sentence, so if we have seen the
# end event and we were still aggregating, it means we should
# send the aggregation.
send_aggregation = self._seen_end_event

logger.debug(f"aggregating frame {frame}")

# We just got our final result, so let's reset interim results.
self._seen_interim_results = False
elif self._interim_accumulator_frame and frame.type == self._interim_accumulator_frame:
logger.debug(f"received interim transcription frame {frame}")
self._seen_interim_results = True

if send_aggregation:
await self._push_aggregation()

async def _push_aggregation(self):
if len(self._aggregation) > 0:
logger.debug(f"pushing aggregation")

self._messages.append({"role": self._role, "content": self._aggregation})
await self.push_frame(LLMMessagesFrame(self._messages))
frame = LLMMessagesFrame(self._messages)
await self.push_frame(frame)

# Reset
self._aggregation = ""
Expand Down
2 changes: 1 addition & 1 deletion src/dailyai/processors/passthrough.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def __init__(self, input_frames: List[str], ** kwargs):

self._input_frames = input_frames

logger.debug(f"created {self} with passthrough frames {input_frames}")
logger.debug(f"Created {self} with passthrough frames {input_frames}")

#
# Frame processor
Expand Down
84 changes: 0 additions & 84 deletions src/dailyai/processors/silero_vad_processor.py

This file was deleted.

Loading

0 comments on commit 2946c35

Please sign in to comment.