From 6c9425d66a39ca18a7c3dc113f21761162744cc7 Mon Sep 17 00:00:00 2001
From: Chad Bailey
Date: Mon, 18 Mar 2024 22:14:02 +0000
Subject: [PATCH] wip: video image frames

---
 src/dailyai/pipeline/frames.py                |  19 ++++
 src/dailyai/services/ai_services.py           |  19 +++
 .../services/base_transport_service.py        |   3 +-
 .../services/daily_transport_service.py       |  27 ++++-
 src/dailyai/services/open_ai_services.py      |  51 ++++++++-
 .../foundational/12-describe-video.py         | 102 ++++++++++++++++++
 6 files changed, 213 insertions(+), 8 deletions(-)
 create mode 100644 src/examples/foundational/12-describe-video.py

diff --git a/src/dailyai/pipeline/frames.py b/src/dailyai/pipeline/frames.py
index 6c39fb2c7..fd709b7aa 100644
--- a/src/dailyai/pipeline/frames.py
+++ b/src/dailyai/pipeline/frames.py
@@ -179,3 +179,22 @@ class LLMFunctionCallFrame(Frame):
     """Emitted when the LLM has received an entire function call completion."""
     function_name: str
     arguments: str
+
+
+@dataclass()
+class VideoImageFrame(Frame):
+    """Contains a still image from a participant's video stream."""
+    participantId: str
+    image: bytes
+
+    def __str__(self):
+        return f"{self.__class__.__name__}, participantId: {self.participantId}, image size: {len(self.image)} B"
+
+
+@dataclass()
+class VisionFrame(Frame):
+    prompt: str
+    image: bytes
+
+    def __str__(self):
+        return f"{self.__class__.__name__}, prompt: {self.prompt}, image size: {len(self.image)} B"
diff --git a/src/dailyai/services/ai_services.py b/src/dailyai/services/ai_services.py
index 0afe8412f..cc4f9820b 100644
--- a/src/dailyai/services/ai_services.py
+++ b/src/dailyai/services/ai_services.py
@@ -18,6 +18,7 @@
     Frame,
     TextFrame,
     TranscriptionQueueFrame,
+    VisionFrame,
 )
 
 from abc import abstractmethod
@@ -133,6 +134,24 @@ async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
             yield TranscriptionQueueFrame(text, "", str(time.time()))
 
 
+class VisionService(AIService):
+    def __init__(self):
+        super().__init__()
+
+    # Takes a prompt and an image and yields response frames.
+    # TODO-CB: return type
+    @abstractmethod
+    async def run_vision(self, prompt: str, image: bytes):
+        pass
+
+    async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
+        if isinstance(frame, VisionFrame):
+            async for output_frame in self.run_vision(frame.prompt, frame.image):
+                yield output_frame
+        else:
+            yield frame
+
+
 class FrameLogger(AIService):
     def __init__(self, prefix="Frame", **kwargs):
         super().__init__(**kwargs)
diff --git a/src/dailyai/services/base_transport_service.py b/src/dailyai/services/base_transport_service.py
index c4d963007..fcba24ae7 100644
--- a/src/dailyai/services/base_transport_service.py
+++ b/src/dailyai/services/base_transport_service.py
@@ -90,7 +90,8 @@ def __init__(
         self._vad_stop_s = kwargs.get("vad_stop_s") or 0.8
         self._context = kwargs.get("context") or []
         self._vad_enabled = kwargs.get("vad_enabled") or False
-
+        self._receive_video = kwargs.get("receive_video") or False
+        self._receive_video_fps = kwargs.get("receive_video_fps") or 1.0
         if self._vad_enabled and self._speaker_enabled:
             raise Exception(
                 "Sorry, you can't use speaker_enabled and vad_enabled at the same time. Please set one to False."
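
A minimal sketch of how the pieces above are meant to compose, outside of a
transport (FakeVisionService and the demo values here are hypothetical, for
illustration only):

    import asyncio

    from dailyai.pipeline.frames import TextFrame, VisionFrame
    from dailyai.services.ai_services import VisionService

    class FakeVisionService(VisionService):
        # Stand-in backend: a real subclass would send the prompt and image to a
        # vision model and yield frames built from its response.
        async def run_vision(self, prompt: str, image: bytes):
            yield TextFrame(f"({len(image)} image bytes described for: {prompt!r})")

    async def demo():
        service = FakeVisionService()
        # Upstream, a processor would turn a VideoImageFrame from the transport
        # into a VisionFrame like this one.
        frame = VisionFrame("What is in this image?", b"fake-jpeg-bytes")
        # process_frame dispatches VisionFrames to run_vision and passes
        # everything else through unchanged.
        async for out in service.process_frame(frame):
            print(out)

    asyncio.run(demo())
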
diff --git a/src/dailyai/services/daily_transport_service.py b/src/dailyai/services/daily_transport_service.py
index 9c8aa5e78..0ece7e5af 100644
--- a/src/dailyai/services/daily_transport_service.py
+++ b/src/dailyai/services/daily_transport_service.py
@@ -2,6 +2,7 @@
 import inspect
 import logging
 import signal
+import time
 import threading
 import types
 
@@ -11,6 +12,7 @@
 from dailyai.pipeline.frames import (
     ReceivedAppMessageFrame,
     TranscriptionQueueFrame,
+    VideoImageFrame,
 )
 
 from threading import Event
@@ -62,6 +64,7 @@ def __init__(
 
         self._other_participant_has_joined = False
         self._my_participant_id = None
+        self._participant_frame_times = {}
 
         self.transcription_settings = {
             "language": "en",
@@ -204,11 +207,12 @@ def _prerun(self):
         )
 
         self._my_participant_id = self.client.participants()["local"]["id"]
-        self.client.update_subscription_profiles({
-            "base": {
-                "camera": "unsubscribed",
-            }
-        })
+        if not self._receive_video:
+            self.client.update_subscription_profiles({
+                "base": {
+                    "camera": "unsubscribed",
+                }
+            })
 
         if self._token and self._start_transcription:
             self.client.start_transcription(self.transcription_settings)
@@ -225,6 +229,16 @@ def _post_run(self):
         self.client.leave()
         self.client.release()
 
+    def _handle_video_frame(self, participant_id, video_frame):
+        # TODO-CB: What about multiple participants?
+        # Throttle to at most _receive_video_fps frames per participant.
+        if (participant_id not in self._participant_frame_times) or (time.time() > self._participant_frame_times[participant_id] + 1.0 / self._receive_video_fps):
+            self._participant_frame_times[participant_id] = time.time()
+            asyncio.run_coroutine_threadsafe(
+                self.receive_queue.put(
+                    VideoImageFrame(participant_id, video_frame)), self._loop
+            )
+
     def on_first_other_participant_joined(self):
         pass
 
@@ -248,6 +262,9 @@ def on_participant_joined(self, participant):
         if not self._other_participant_has_joined and participant["id"] != self._my_participant_id:
             self._other_participant_has_joined = True
             self.on_first_other_participant_joined()
+        if self._receive_video:
+            self.client.set_video_renderer(
+                participant["id"], self._handle_video_frame)
 
     def on_participant_left(self, participant, reason):
         if len(self.client.participants()) < self._min_others_count + 1:
diff --git a/src/dailyai/services/open_ai_services.py b/src/dailyai/services/open_ai_services.py
index b2eb3872c..23650854b 100644
--- a/src/dailyai/services/open_ai_services.py
+++ b/src/dailyai/services/open_ai_services.py
@@ -2,13 +2,21 @@
 from PIL import Image
 import io
 import time
-from openai import AsyncOpenAI
+import base64
+from openai import AsyncOpenAI, AsyncStream
 import json
 
 from collections.abc import AsyncGenerator
 
-from dailyai.services.ai_services import LLMService, ImageGenService
+from openai.types.chat import (
+    ChatCompletion,
+    ChatCompletionChunk,
+    ChatCompletionMessageParam,
+)
+
+from dailyai.services.ai_services import LLMService, ImageGenService, VisionService
 from dailyai.services.openai_api_llm_service import BaseOpenAILLMService
+from dailyai.pipeline.frames import TextFrame
 
 
 class OpenAILLMService(BaseOpenAILLMService):
@@ -50,3 +58,42 @@ async def run_image_gen(self, sentence) -> tuple[str, bytes]:
         image_stream = io.BytesIO(await response.content.read())
         image = Image.open(image_stream)
         return (image_url, image.tobytes())
+
+
+class OpenAIVisionService(VisionService):
+    def __init__(
+        self,
+        *,
+        model="gpt-4-vision-preview",
+        api_key,
+    ):
+        super().__init__()
+        self._model = model
+        self._client = AsyncOpenAI(api_key=api_key)
+
+    async def run_vision(self, prompt: str, image: bytes):
+        base64_image = base64.b64encode(image).decode('utf-8')
+        messages: list[ChatCompletionMessageParam] = [
+            {
+                "role": "user",
+                "content": [
+                    {"type": "text", "text": prompt},
+                    {
+                        "type": "image_url",
+                        "image_url": {
+                            "url": f"data:image/jpeg;base64,{base64_image}"
+                        },
+                    },
+                ],
+            }
+        ]
+        chunks: AsyncStream[ChatCompletionChunk] = (
+            await self._client.chat.completions.create(
+                model=self._model,
+                stream=True,
+                messages=messages,
+            )
+        )
+        async for chunk in chunks:
+            if chunk.choices[0].delta.content:
+                yield TextFrame(chunk.choices[0].delta.content)
diff --git a/src/examples/foundational/12-describe-video.py b/src/examples/foundational/12-describe-video.py
new file mode 100644
index 000000000..8e87d9d86
--- /dev/null
+++ b/src/examples/foundational/12-describe-video.py
@@ -0,0 +1,102 @@
+import asyncio
+import aiohttp
+import logging
+import os
+from typing import AsyncGenerator
+
+from dailyai.pipeline.frames import Frame, LLMMessagesQueueFrame
+from dailyai.pipeline.pipeline import Pipeline
+from dailyai.pipeline.frame_processor import FrameProcessor
+from dailyai.services.daily_transport_service import DailyTransportService
+from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
+from dailyai.services.open_ai_services import OpenAILLMService, OpenAIVisionService
+from dailyai.services.ai_services import FrameLogger
+from dailyai.pipeline.aggregators import (
+    LLMAssistantContextAggregator,
+    LLMUserContextAggregator,
+)
+from dailyai.pipeline.frames import VideoImageFrame, VisionFrame
+from examples.support.runner import configure
+
+logging.basicConfig(format="%(levelno)s %(asctime)s %(message)s")
+logger = logging.getLogger("dailyai")
+logger.setLevel(logging.DEBUG)
+
+
+class VideoImageFrameProcessor(FrameProcessor):
+    def __init__(self):
+        super().__init__()
+
+    async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
+        if isinstance(frame, VideoImageFrame):
+            yield VisionFrame("What is in this image?", frame.image)
+        else:
+            yield frame
+
+
+async def main(room_url: str, token):
+    async with aiohttp.ClientSession() as session:
+        transport = DailyTransportService(
+            room_url,
+            token,
+            "Respond bot",
+            duration_minutes=5,
+            start_transcription=True,
+            mic_enabled=True,
+            mic_sample_rate=16000,
+            camera_enabled=False,
+            vad_enabled=True,
+            receive_video=True,
+            receive_video_fps=1/10.0
+        )
+
+        tts = ElevenLabsTTSService(
+            aiohttp_session=session,
+            api_key=os.getenv("ELEVENLABS_API_KEY"),
+            voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
+        )
+
+        llm = OpenAILLMService(
+            api_key=os.getenv("OPENAI_CHATGPT_API_KEY"),
+            model="gpt-4-turbo-preview")
+        fl = FrameLogger("!!! before VIFP")
+        fl2 = FrameLogger("Outer")
+        fl3 = FrameLogger("### Before VS")
+        fl4 = FrameLogger("$$$ After VS")
+        messages = [
+            {
+                "role": "system",
+                "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio. Respond to what the user said in a creative and helpful way.",
+            },
+        ]
+
+        tma_in = LLMUserContextAggregator(
+            messages, transport._my_participant_id)
+        tma_out = LLMAssistantContextAggregator(
+            messages, transport._my_participant_id
+        )
+        vs = OpenAIVisionService(api_key=os.getenv("OPENAI_CHATGPT_API_KEY"))
+
+        vifp = VideoImageFrameProcessor()
+        pipeline = Pipeline(
+            processors=[
+                fl,
+                vifp,
+                fl3,
+                vs,
+                fl4,
+                llm,
+                fl2,
+                tts,
+                tma_out,
+            ],
+        )
+
+        transport.transcription_settings["extra"]["endpointing"] = True
+        transport.transcription_settings["extra"]["punctuate"] = True
+        await transport.run(pipeline)
+
+
+if __name__ == "__main__":
+    (url, token) = configure()
+    asyncio.run(main(url, token))
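
The throttle in _handle_video_frame can be sanity-checked in isolation. A
standalone sketch of the same per-participant, time-based gating (FrameGate is
hypothetical; the transport above inlines this logic):

    import time

    class FrameGate:
        """Admit at most `fps` frames per second, tracked per participant."""

        def __init__(self, fps: float):
            self._min_interval = 1.0 / fps
            self._last_admitted: dict[str, float] = {}

        def admit(self, participant_id: str) -> bool:
            now = time.time()
            last = self._last_admitted.get(participant_id)
            if last is None or now > last + self._min_interval:
                self._last_admitted[participant_id] = now
                return True
            return False

    # receive_video_fps=1/10.0 in the example means one frame every ten seconds.
    gate = FrameGate(fps=1 / 10.0)
    assert gate.admit("participant-a")      # first frame always passes
    assert not gate.admit("participant-a")  # frames inside the window are dropped
    assert gate.admit("participant-b")      # participants are gated independently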