wip: video image frames
chadbailey59 committed Mar 18, 2024
1 parent 6d3c52a commit 6c9425d
Showing 6 changed files with 210 additions and 8 deletions.
19 changes: 19 additions & 0 deletions src/dailyai/pipeline/frames.py
@@ -179,3 +179,22 @@ class LLMFunctionCallFrame(Frame):
    """Emitted when the LLM has received an entire function call completion."""
    function_name: str
    arguments: str


@dataclass()
class VideoImageFrame(Frame):
    """Contains a still image from a participant's video stream."""
    participantId: str
    image: bytes

    def __str__(self):
        return f"{self.__class__.__name__}, participantId: {self.participantId}, image size: {len(self.image)} B"


@dataclass()
class VisionFrame(Frame):
    """Contains a text prompt and an image to analyze with it."""
    prompt: str
    image: bytes

    def __str__(self):
        return f"{self.__class__.__name__}, prompt: {self.prompt}, image size: {len(self.image)} B"
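For orientation, a quick sketch of how these two dataclasses are meant to be used (hypothetical values; the fields are the ones defined above):

from dailyai.pipeline.frames import VideoImageFrame, VisionFrame

jpeg_bytes = b"\xff\xd8\xff"  # stand-in for a JPEG-encoded still

frame = VideoImageFrame(participantId="abc123", image=jpeg_bytes)
print(frame)  # VideoImageFrame, participantId: abc123, image size: 3 B

vision_frame = VisionFrame(prompt="What is in this image?", image=jpeg_bytes)
print(vision_frame)  # VisionFrame, prompt: What is in this image?, image size: 3 B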
17 changes: 17 additions & 0 deletions src/dailyai/services/ai_services.py
@@ -18,6 +18,7 @@
    Frame,
    TextFrame,
    TranscriptionQueueFrame,
    VisionFrame,
)

from abc import abstractmethod
@@ -133,6 +134,22 @@ async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
        yield TranscriptionQueueFrame(text, "", str(time.time()))


class VisionService(AIService):
    def __init__(self):
        super().__init__()

    # Analyzes the image with the given prompt and yields response frames.
    @abstractmethod
    async def run_vision(self, prompt: str, image: bytes) -> AsyncGenerator[Frame, None]:
        pass

    async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
        if isinstance(frame, VisionFrame):
            async for output_frame in self.run_vision(frame.prompt, frame.image):
                yield output_frame
        else:
            # Pass through frames this service doesn't handle.
            yield frame


class FrameLogger(AIService):
    def __init__(self, prefix="Frame", **kwargs):
        super().__init__(**kwargs)
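Since process_frame drives run_vision with async for, any concrete subclass must implement run_vision as an async generator. A toy sketch, for illustration only (hypothetical class):

class CannedVisionService(VisionService):
    """Hypothetical stub that ignores the image and answers with fixed text."""

    async def run_vision(self, prompt: str, image: bytes) -> AsyncGenerator[Frame, None]:
        yield TextFrame(f"(stub) got {len(image)} image bytes for prompt: {prompt}")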
3 changes: 2 additions & 1 deletion src/dailyai/services/base_transport_service.py
@@ -90,7 +90,8 @@ def __init__(
        self._vad_stop_s = kwargs.get("vad_stop_s") or 0.8
        self._context = kwargs.get("context") or []
        self._vad_enabled = kwargs.get("vad_enabled") or False
        self._receive_video = kwargs.get("receive_video") or False
        self._receive_video_fps = kwargs.get("receive_video_fps") or 1.0
        if self._vad_enabled and self._speaker_enabled:
            raise Exception(
                "Sorry, you can't use speaker_enabled and vad_enabled at the same time. Please set one to False."
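Worth noting: the kwargs.get(...) or default idiom used above (consistent with the surrounding options) treats any falsy value as unset, so receive_video_fps=0 would silently fall back to 1.0. The difference in one snippet:

kwargs = {"receive_video_fps": 0}
kwargs.get("receive_video_fps") or 1.0   # -> 1.0: falsy values are replaced
kwargs.get("receive_video_fps", 1.0)     # -> 0: only a missing key defaults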
27 changes: 22 additions & 5 deletions src/dailyai/services/daily_transport_service.py
@@ -2,6 +2,7 @@
import inspect
import logging
import signal
import time
import threading
import types

@@ -11,6 +12,7 @@
from dailyai.pipeline.frames import (
    ReceivedAppMessageFrame,
    TranscriptionQueueFrame,
    VideoImageFrame,
)

from threading import Event
@@ -62,6 +64,7 @@ def __init__(

        self._other_participant_has_joined = False
        self._my_participant_id = None
        self._participant_frame_times = {}

        self.transcription_settings = {
            "language": "en",
@@ -204,11 +207,12 @@ def _prerun(self):
        )
        self._my_participant_id = self.client.participants()["local"]["id"]

        if not self._receive_video:
            self.client.update_subscription_profiles({
                "base": {
                    "camera": "unsubscribed",
                }
            })

        if self._token and self._start_transcription:
            self.client.start_transcription(self.transcription_settings)
@@ -225,6 +229,16 @@ def _post_run(self):
        self.client.leave()
        self.client.release()

    def _handle_video_frame(self, participant_id, video_frame):
        # TODO-CB: What about multiple participants?
        # Throttle to at most self._receive_video_fps frames per second,
        # tracked per participant.
        now = time.time()
        last = self._participant_frame_times.get(participant_id)
        if last is None or now > last + 1.0 / self._receive_video_fps:
            self._participant_frame_times[participant_id] = now
            # This callback runs off the pipeline's event loop, so enqueue
            # the frame thread-safely.
            asyncio.run_coroutine_threadsafe(
                self.receive_queue.put(
                    VideoImageFrame(participant_id, video_frame)), self._loop
            )

    def on_first_other_participant_joined(self):
        pass

@@ -248,6 +262,9 @@ def on_participant_joined(self, participant):
        if not self._other_participant_has_joined and participant["id"] != self._my_participant_id:
            self._other_participant_has_joined = True
            self.on_first_other_participant_joined()
        if self._receive_video:
            self.client.set_video_renderer(
                participant["id"], self._handle_video_frame)

    def on_participant_left(self, participant, reason):
        if len(self.client.participants()) < self._min_others_count + 1:
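The run_coroutine_threadsafe call in _handle_video_frame is the important detail: the video-renderer callback presumably fires on a daily-python worker thread rather than on the pipeline's asyncio loop (which is what that call is for), so the frame has to be handed across threads. A self-contained sketch of the same pattern, with hypothetical names:

import asyncio
import threading

def worker_callback(loop: asyncio.AbstractEventLoop, queue: asyncio.Queue, item):
    # Safe to call from any thread; schedules queue.put(item) on the target loop.
    asyncio.run_coroutine_threadsafe(queue.put(item), loop)

async def main():
    queue: asyncio.Queue = asyncio.Queue()
    loop = asyncio.get_running_loop()
    threading.Thread(target=worker_callback, args=(loop, queue, "frame")).start()
    print(await queue.get())  # -> "frame"

asyncio.run(main())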
50 changes: 48 additions & 2 deletions src/dailyai/services/open_ai_services.py
@@ -2,13 +2,21 @@
from PIL import Image
import io
import time
import base64
from openai import AsyncOpenAI, AsyncStream

import json
from collections.abc import AsyncGenerator

from openai.types.chat import (
    ChatCompletion,
    ChatCompletionChunk,
    ChatCompletionMessageParam,
)

from dailyai.services.ai_services import LLMService, ImageGenService, VisionService
from dailyai.services.openai_api_llm_service import BaseOpenAILLMService
from dailyai.pipeline.frames import TextFrame


class OpenAILLMService(BaseOpenAILLMService):
@@ -50,3 +58,41 @@ async def run_image_gen(self, sentence) -> tuple[str, bytes]:
        image_stream = io.BytesIO(await response.content.read())
        image = Image.open(image_stream)
        return (image_url, image.tobytes())


class OpenAIVisionService(VisionService):
    def __init__(
        self,
        *,
        model="gpt-4-vision-preview",
        api_key,
    ):
        super().__init__()
        self._model = model
        self._client = AsyncOpenAI(api_key=api_key)

    async def run_vision(self, prompt: str, image: bytes):
        # Send the prompt and the JPEG still as a base64 data URL, then
        # stream the completion back as TextFrames.
        base64_image = base64.b64encode(image).decode('utf-8')
        messages = [
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": prompt},
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/jpeg;base64,{base64_image}"
                        },
                    },
                ],
            }
        ]
        chunks: AsyncStream[ChatCompletionChunk] = (
            await self._client.chat.completions.create(
                model=self._model,
                stream=True,
                messages=messages,
            )
        )
        async for chunk in chunks:
            if chunk.choices and chunk.choices[0].delta.content:
                yield TextFrame(chunk.choices[0].delta.content)
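A minimal driver for the new service, as a sketch (assumes a JPEG on disk and the same env var the example below uses; TextFrame is assumed to expose the streamed text as .text):

import asyncio
import os

async def describe(jpeg_bytes: bytes):
    vision = OpenAIVisionService(api_key=os.getenv("OPENAI_CHATGPT_API_KEY"))
    async for frame in vision.run_vision("What is in this image?", jpeg_bytes):
        print(frame.text, end="", flush=True)

with open("still.jpg", "rb") as f:
    asyncio.run(describe(f.read()))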
102 changes: 102 additions & 0 deletions src/examples/foundational/12-describe-video.py
@@ -0,0 +1,102 @@
import asyncio
import aiohttp
import logging
import os
from typing import AsyncGenerator

from dailyai.pipeline.frames import Frame, LLMMessagesQueueFrame
from dailyai.pipeline.pipeline import Pipeline
from dailyai.pipeline.frame_processor import FrameProcessor
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
from dailyai.services.open_ai_services import OpenAILLMService, OpenAIVisionService
from dailyai.services.ai_services import FrameLogger
from dailyai.pipeline.aggregators import (
LLMAssistantContextAggregator,
LLMUserContextAggregator,
)
from dailyai.pipeline.frames import VideoImageFrame, VisionFrame
from examples.support.runner import configure

logging.basicConfig(format="%(levelno)s %(asctime)s %(message)s")
logger = logging.getLogger("dailyai")
logger.setLevel(logging.DEBUG)


class VideoImageFrameProcessor(FrameProcessor):
    def __init__(self):
        super().__init__()

    async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
        if isinstance(frame, VideoImageFrame):
            yield VisionFrame("What is in this image?", frame.image)
        else:
            yield frame


async def main(room_url: str, token):
    async with aiohttp.ClientSession() as session:
        transport = DailyTransportService(
            room_url,
            token,
            "Respond bot",
            duration_minutes=5,
            start_transcription=True,
            mic_enabled=True,
            mic_sample_rate=16000,
            camera_enabled=False,
            vad_enabled=True,
            receive_video=True,
            receive_video_fps=1 / 10.0,  # one still every ten seconds
        )

        tts = ElevenLabsTTSService(
            aiohttp_session=session,
            api_key=os.getenv("ELEVENLABS_API_KEY"),
            voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
        )

        llm = OpenAILLMService(
            api_key=os.getenv("OPENAI_CHATGPT_API_KEY"),
            model="gpt-4-turbo-preview")
        fl = FrameLogger("!!! before VIFP")
        fl2 = FrameLogger("Outer")
        fl3 = FrameLogger("### Before VS")
        fl4 = FrameLogger("$$$ After VS")
        messages = [
            {
                "role": "system",
                "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio. Respond to what the user said in a creative and helpful way.",
            },
        ]

        tma_in = LLMUserContextAggregator(
            messages, transport._my_participant_id)
        tma_out = LLMAssistantContextAggregator(
            messages, transport._my_participant_id
        )
        vs = OpenAIVisionService(api_key=os.getenv("OPENAI_CHATGPT_API_KEY"))

        vifp = VideoImageFrameProcessor()
        pipeline = Pipeline(
            processors=[
                fl,
                vifp,
                fl3,
                vs,
                fl4,
                llm,
                fl2,
                tts,
                tma_out,
            ],
        )

        transport.transcription_settings["extra"]["endpointing"] = True
        transport.transcription_settings["extra"]["punctuate"] = True
        await transport.run(pipeline)


if __name__ == "__main__":
    (url, token) = configure()
    asyncio.run(main(url, token))
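Read end to end, the intended frame flow through this wip pipeline appears to be (one possible reading, not stated in the commit):

# DailyTransportService --VideoImageFrame--> VideoImageFrameProcessor
#   --VisionFrame--> OpenAIVisionService --TextFrame--> OpenAILLMService
#   --TextFrame--> ElevenLabsTTSService --audio--> back out through the transport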
