diff --git a/examples/foundational/12-describe-video.py b/examples/foundational/12-describe-video.py
index f7cf851b6..8e20d6533 100644
--- a/examples/foundational/12-describe-video.py
+++ b/examples/foundational/12-describe-video.py
@@ -5,7 +5,7 @@
 
 from typing import AsyncGenerator
 
-from dailyai.pipeline.aggregators import FrameProcessor, UserResponseAggregator
+from dailyai.pipeline.aggregators import FrameProcessor, UserResponseAggregator, VisionImageFrameAggregator
 from dailyai.pipeline.frames import Frame, TextFrame, UserImageRequestFrame
 from dailyai.pipeline.pipeline import Pipeline
@@ -59,6 +59,8 @@ async def main(room_url: str, token):
 
     image_requester = UserImageRequester()
 
+    vision_aggregator = VisionImageFrameAggregator()
+
     moondream = MoondreamService()
 
     tts = ElevenLabsTTSService(
@@ -73,7 +75,7 @@ async def on_first_other_participant_joined(transport, participant):
         transport.render_participant_video(participant["id"], framerate=0)
         image_requester.set_participant_id(participant["id"])
 
-    pipeline = Pipeline([user_response, image_requester, moondream, tts])
+    pipeline = Pipeline([user_response, image_requester, vision_aggregator, moondream, tts])
 
     await transport.run(pipeline)
 
diff --git a/src/dailyai/pipeline/aggregators.py b/src/dailyai/pipeline/aggregators.py
index 12da8b6dc..9edc87384 100644
--- a/src/dailyai/pipeline/aggregators.py
+++ b/src/dailyai/pipeline/aggregators.py
@@ -7,6 +7,7 @@
     EndFrame,
     EndPipeFrame,
     Frame,
+    ImageFrame,
     LLMMessagesFrame,
     LLMResponseEndFrame,
     LLMResponseStartFrame,
@@ -14,6 +15,7 @@
     TranscriptionFrame,
     UserStartedSpeakingFrame,
     UserStoppedSpeakingFrame,
+    VisionImageFrame,
 )
 from dailyai.pipeline.pipeline import Pipeline
 from dailyai.services.ai_services import AIService
@@ -463,3 +465,37 @@ async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
                 self.accumulator = []
         else:
             self.accumulator.append(frame)
+
+
+class VisionImageFrameAggregator(FrameProcessor):
+    """This aggregator waits for a TextFrame followed by an ImageFrame. When
+    the ImageFrame arrives it outputs a VisionImageFrame.
+
+    >>> from dailyai.pipeline.frames import ImageFrame
+
+    >>> async def print_frames(aggregator, frame):
+    ...     async for frame in aggregator.process_frame(frame):
+    ...         print(frame)
+
+    >>> aggregator = VisionImageFrameAggregator()
+    >>> asyncio.run(print_frames(aggregator, TextFrame("What do you see?")))
+    >>> asyncio.run(print_frames(aggregator, ImageFrame(image=bytes([]), size=(0, 0))))
+    VisionImageFrame, text: What do you see?, image size: 0x0, buffer size: 0 B
+
+    """
+
+    def __init__(self, **kwargs):
+        super().__init__(**kwargs)
+        self._describe_text = None
+
+    async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
+        if isinstance(frame, TextFrame):
+            self._describe_text = frame.text
+        elif isinstance(frame, ImageFrame):
+            if self._describe_text:
+                yield VisionImageFrame(self._describe_text, frame.image, frame.size)
+                self._describe_text = None
+            else:
+                yield frame
+        else:
+            yield frame
diff --git a/src/dailyai/pipeline/frames.py b/src/dailyai/pipeline/frames.py
index 7117e8f88..4322e65c8 100644
--- a/src/dailyai/pipeline/frames.py
+++ b/src/dailyai/pipeline/frames.py
@@ -79,8 +79,10 @@ def __str__(self):
 
 @dataclass()
 class URLImageFrame(ImageFrame):
-    """An image. Will be shown by the transport if the transport's camera is
-    enabled."""
+    """An image with an associated URL. Will be shown by the transport if the
+    transport's camera is enabled.
+ + """ url: str | None def __init__(self, url, image, size): @@ -91,6 +93,22 @@ def __str__(self): return f"{self.__class__.__name__}, url: {self.url}, image size: {self.size[0]}x{self.size[1]}, buffer size: {len(self.image)} B" +@dataclass() +class VisionImageFrame(ImageFrame): + """An image with an associated text to ask for a description of it. Will be shown by the + transport if the transport's camera is enabled. + + """ + text: str | None + + def __init__(self, text, image, size): + super().__init__(image, size) + self.text = text + + def __str__(self): + return f"{self.__class__.__name__}, text: {self.text}, image size: {self.size[0]}x{self.size[1]}, buffer size: {len(self.image)} B" + + @dataclass() class UserImageFrame(ImageFrame): """An image associated to a user. Will be shown by the transport if the transport's camera is diff --git a/src/dailyai/services/ai_services.py b/src/dailyai/services/ai_services.py index d620be3d2..babf6be75 100644 --- a/src/dailyai/services/ai_services.py +++ b/src/dailyai/services/ai_services.py @@ -15,6 +15,7 @@ TextFrame, TranscriptionFrame, URLImageFrame, + VisionImageFrame, ) from abc import abstractmethod @@ -108,19 +109,13 @@ def __init__(self, **kwargs): self._describe_text = None @abstractmethod - async def run_vision(self, describe_text: str, frame: ImageFrame) -> str: + async def run_vision(self, frame: VisionImageFrame) -> str: pass async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]: - if isinstance(frame, TextFrame): - self._describe_text = frame.text - elif isinstance(frame, ImageFrame): - if self._describe_text: - description = await self.run_vision(self._describe_text, frame) - self._describe_text = None - yield TextFrame(description) - else: - yield frame + if isinstance(frame, VisionImageFrame): + description = await self.run_vision(frame) + yield TextFrame(description) else: yield frame diff --git a/src/dailyai/services/moondream_ai_service.py b/src/dailyai/services/moondream_ai_service.py index 2784a669e..07ff9e534 100644 --- a/src/dailyai/services/moondream_ai_service.py +++ b/src/dailyai/services/moondream_ai_service.py @@ -1,4 +1,4 @@ -from dailyai.pipeline.frames import ImageFrame +from dailyai.pipeline.frames import ImageFrame, VisionImageFrame from dailyai.services.ai_services import VisionService from PIL import Image @@ -42,11 +42,11 @@ def __init__( ).to(device=device, dtype=dtype) self._model.eval() - async def run_vision(self, describe_text: str, frame: ImageFrame) -> str: + async def run_vision(self, frame: VisionImageFrame) -> str: image = Image.frombytes("RGB", (frame.size[0], frame.size[1]), frame.image) image_embeds = self._model.encode_image(image) description = self._model.answer_question( image_embeds=image_embeds, - question=describe_text, + question=frame.text, tokenizer=self._tokenizer) return description