diff --git a/dot-env.template b/dot-env.template index bb7db9e9f..d7bd82cc5 100644 --- a/dot-env.template +++ b/dot-env.template @@ -33,3 +33,6 @@ PLAY_HT_API_KEY=... # OpenAI OPENAI_API_KEY=... + +# Fireworks +FIREWORKS_API_KEY=... \ No newline at end of file diff --git a/examples/foundational/03-still-frame.py b/examples/foundational/03-still-frame.py index 51ef47de8..cd89a041d 100644 --- a/examples/foundational/03-still-frame.py +++ b/examples/foundational/03-still-frame.py @@ -7,6 +7,7 @@ from dailyai.pipeline.pipeline import Pipeline from dailyai.transports.daily_transport import DailyTransport from dailyai.services.fal_ai_services import FalImageGenService +from dailyai.services.fireworks_ai_services import FireworksImageGenService from runner import configure @@ -30,13 +31,20 @@ async def main(room_url): duration_minutes=1 ) - imagegen = FalImageGenService( - params=FalImageGenService.InputParams( - image_size="square_hd" - ), + # imagegen = FalImageGenService( + # params=FalImageGenService.InputParams( + # image_size="square_hd" + # ), + # aiohttp_session=session, + # key_id=os.getenv("FAL_KEY_ID"), + # key_secret=os.getenv("FAL_KEY_SECRET"), + # ) + + imagegen = FireworksImageGenService( aiohttp_session=session, - key=os.getenv("FAL_KEY"), - ) + api_key=os.getenv("FIREWORKS_API_KEY"), + model="accounts/fireworks/models/stable-diffusion-xl-1024-v1-0", + image_size="1024x1024") pipeline = Pipeline([imagegen]) diff --git a/examples/foundational/12a-fireworks-describe-video.py b/examples/foundational/12a-fireworks-describe-video.py new file mode 100644 index 000000000..14e3b6d42 --- /dev/null +++ b/examples/foundational/12a-fireworks-describe-video.py @@ -0,0 +1,87 @@ +import asyncio +import aiohttp +import logging +import os + +from typing import AsyncGenerator + +from dailyai.pipeline.aggregators import FrameProcessor, UserResponseAggregator, VisionImageFrameAggregator + +from dailyai.pipeline.frames import Frame, TextFrame, UserImageRequestFrame +from dailyai.pipeline.pipeline import Pipeline +from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService +from dailyai.services.open_ai_services import OpenAIVisionService +from dailyai.transports.daily_transport import DailyTransport + +from runner import configure + +from dotenv import load_dotenv +load_dotenv(override=True) + +logging.basicConfig(format=f"%(levelno)s %(asctime)s %(message)s") +logger = logging.getLogger("dailyai") +logger.setLevel(logging.DEBUG) + + +class UserImageRequester(FrameProcessor): + participant_id: str + + def set_participant_id(self, participant_id: str): + self.participant_id = participant_id + + async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]: + if self.participant_id and isinstance(frame, TextFrame): + yield UserImageRequestFrame(self.participant_id) + yield frame + + +async def main(room_url: str, token): + async with aiohttp.ClientSession() as session: + transport = DailyTransport( + room_url, + token, + "Describe participant video", + duration_minutes=5, + mic_enabled=True, + mic_sample_rate=16000, + vad_enabled=True, + start_transcription=True, + video_rendering_enabled=True + ) + + tts = ElevenLabsTTSService( + aiohttp_session=session, + api_key=os.getenv("ELEVENLABS_API_KEY"), + voice_id=os.getenv("ELEVENLABS_VOICE_ID"), + ) + + user_response = UserResponseAggregator() + + image_requester = UserImageRequester() + + vision_aggregator = VisionImageFrameAggregator() + + # If you run into weird description, try with use_cpu=True + img_desc = OpenAIVisionService( + api_key=os.getenv("OPENAI_API_KEY") + ) + + tts = ElevenLabsTTSService( + aiohttp_session=session, + api_key=os.getenv("ELEVENLABS_API_KEY"), + voice_id=os.getenv("ELEVENLABS_VOICE_ID"), + ) + + @transport.event_handler("on_first_other_participant_joined") + async def on_first_other_participant_joined(transport, participant): + await transport.say("Hi there! Feel free to ask me what I see.", tts) + transport.render_participant_video(participant["id"], framerate=0) + image_requester.set_participant_id(participant["id"]) + + pipeline = Pipeline([user_response, image_requester, vision_aggregator, img_desc, tts]) + + await transport.run(pipeline) + +if __name__ == "__main__": + (url, token) = configure() + asyncio.run(main(url, token)) diff --git a/src/dailyai/services/ai_services.py b/src/dailyai/services/ai_services.py index 5ba732acd..d4e375f60 100644 --- a/src/dailyai/services/ai_services.py +++ b/src/dailyai/services/ai_services.py @@ -109,12 +109,20 @@ def __init__(self, **kwargs): @abstractmethod async def run_vision(self, frame: VisionImageFrame) -> str: + print(f"### uh oh, abstract run vision") pass async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]: + print(f"### visionservice process frame, {frame}") if isinstance(frame, VisionImageFrame): - description = await self.run_vision(frame) - yield TextFrame(description) + print(f"### ### calling self.run_vision") + if getattr(self, "run_vision_async"): + async for frame in self.run_vision_async(frame): + yield frame + + else: + description = await self.run_vision(frame) + yield TextFrame(description) else: yield frame diff --git a/src/dailyai/services/fireworks_ai_services.py b/src/dailyai/services/fireworks_ai_services.py index e5ccbc658..a2dcc4462 100644 --- a/src/dailyai/services/fireworks_ai_services.py +++ b/src/dailyai/services/fireworks_ai_services.py @@ -1,5 +1,9 @@ +import aiohttp import os +from typing import Literal + +from dailyai.services.ai_services import ImageGenService, VisionService from dailyai.services.openai_api_llm_service import BaseOpenAILLMService @@ -16,3 +20,44 @@ class FireworksLLMService(BaseOpenAILLMService): def __init__(self, model="accounts/fireworks/models/firefunction-v1", *args, **kwargs): kwargs["base_url"] = "https://api.fireworks.ai/inference/v1" super().__init__(model, *args, **kwargs) + + +class FireworksImageGenService(ImageGenService): + + def __init__( + self, + *, + image_size: Literal["256x256", "512x512", "1024x1024", "1792x1024", "1024x1792"], + aiohttp_session: aiohttp.ClientSession, + api_key, + model="accounts/fireworks/models/stable-diffusion-xl-1024-v1-0", + ): + super().__init__() + self._model = model + self._image_size = image_size + self._client = AsyncOpenAI(api_key=api_key, + base_url="https://api.fireworks.ai/inference/v1") + self._aiohttp_session = aiohttp_session + + async def run_image_gen(self, prompt: str) -> tuple[str, bytes, tuple[int, int]]: + self.logger.info(f"Generating Fireworks image: {prompt}") + + image = await self._client.images.generate( + prompt=prompt, + model=self._model, + n=1, + size=self._image_size + ) + print(f"!!! image is {image}") + image_url = image.data[0].url + if not image_url: + raise Exception("No image provided in response", image) + + # Load the image from the url + async with self._aiohttp_session.get(image_url) as response: + image_stream = io.BytesIO(await response.content.read()) + image = Image.open(image_stream) + return (image_url, image.tobytes(), image.size) + +class FireworksVisionService(VisionService): + \ No newline at end of file diff --git a/src/dailyai/services/open_ai_services.py b/src/dailyai/services/open_ai_services.py index 9eaec5218..4683770b1 100644 --- a/src/dailyai/services/open_ai_services.py +++ b/src/dailyai/services/open_ai_services.py @@ -2,8 +2,16 @@ import aiohttp from PIL import Image import io +import base64 -from dailyai.services.ai_services import ImageGenService +from openai import AsyncOpenAI, AsyncStream + +from openai.types.chat import ( + ChatCompletionChunk, +) + +from dailyai.pipeline.frames import VisionImageFrame, TextFrame +from dailyai.services.ai_services import ImageGenService, VisionService from dailyai.services.openai_api_llm_service import BaseOpenAILLMService @@ -56,3 +64,63 @@ async def run_image_gen(self, prompt: str) -> tuple[str, bytes, tuple[int, int]] image_stream = io.BytesIO(await response.content.read()) image = Image.open(image_stream) return (image_url, image.tobytes(), image.size) + + +class OpenAIVisionService(VisionService): + def __init__( + self, + *, + model="gpt-4-vision-preview", + api_key, + ): + self._model = model + self._client = AsyncOpenAI(api_key=api_key) + super().__init__() + + async def run_vision_async(self, frame): + print(f"!!! i got a run_vision call") + print(f"!!! run vision, frame: {frame}") + prompt = frame.text + IMAGE_WIDTH = frame.size[0] + IMAGE_HEIGHT = frame.size[1] + new_image = Image.frombytes( + 'RGB', (IMAGE_WIDTH, IMAGE_HEIGHT), frame.image) + + # Uncomment these lines to write the frame to a jpg in the same directory. + # current_path = os.getcwd() + # image_path = os.path.join(current_path, "image.jpg") + # image.save(image_path, format="JPEG") + + jpeg_buffer = io.BytesIO() + + new_image.save(jpeg_buffer, format='JPEG') + + jpeg_bytes = jpeg_buffer.getvalue() + base64_image = base64.b64encode(jpeg_bytes).decode('utf-8') + messages = [ + { + "role": "user", + "content": [ + {"type": "text", "text": prompt}, + { + "type": "image_url", + "image_url": { + "url": f"data:image/jpeg;base64,{base64_image}" + }, + }, + ], + } + ] + print(f"!!! messages: {messages}") + chunks: AsyncStream[ChatCompletionChunk] = ( + await self._client.chat.completions.create( + model=self._model, + stream=True, + messages=messages, + ) + ) + async for chunk in chunks: + if len(chunk.choices) == 0: + continue + if chunk.choices[0].delta.content: + yield TextFrame(chunk.choices[0].delta.content)