From b337e984b34558ba7d98dc4385574123bb34ba86 Mon Sep 17 00:00:00 2001 From: Kwindla Hultman Kramer Date: Thu, 16 May 2024 20:48:24 -0700 Subject: [PATCH 1/7] Initial commit of Google Gemini LLM service. Gemini text input works. We translate from OpenAILLMContext format on the fly in the GoogleLLMService implementation. This commit also implements image input (vision) in both the GoogleLLMService and in the OpenAILLMService. Image input is a hack and needs to be revisited. OpenAI expects images to be uploaded as base64-encoded JPEGs. Google does not require the base64 encoding. Other than for images, we use the OpenAI format as our standard, but base64-encoding the images and then unencoding them in the GoogleLLMService feels wasteful. --- .../12a-describe-video-gemini-flash.py | 103 +++++++++++++++ .../foundational/12b-describe-video-gpt-4o.py | 106 +++++++++++++++ macos-py3.10-requirements.txt | 121 +++++++++++++----- pyproject.toml | 1 + .../aggregators/openai_llm_context.py | 30 ++++- src/pipecat/services/google.py | 96 ++++++++++++++ src/pipecat/services/openai.py | 23 +++- 7 files changed, 444 insertions(+), 36 deletions(-) create mode 100644 examples/foundational/12a-describe-video-gemini-flash.py create mode 100644 examples/foundational/12b-describe-video-gpt-4o.py create mode 100644 src/pipecat/services/google.py diff --git a/examples/foundational/12a-describe-video-gemini-flash.py b/examples/foundational/12a-describe-video-gemini-flash.py new file mode 100644 index 000000000..33240dd13 --- /dev/null +++ b/examples/foundational/12a-describe-video-gemini-flash.py @@ -0,0 +1,103 @@ +# +# Copyright (c) 2024, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import asyncio +import aiohttp +import os +import sys + +from pipecat.frames.frames import Frame, TextFrame, UserImageRequestFrame +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineTask +from pipecat.processors.aggregators.user_response import UserResponseAggregator +from pipecat.processors.aggregators.vision_image_frame import VisionImageFrameAggregator +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor +from pipecat.services.elevenlabs import ElevenLabsTTSService +from pipecat.services.google import GoogleLLMService +from pipecat.transports.services.daily import DailyParams, DailyTransport +from pipecat.vad.silero import SileroVAD + +from runner import configure + +from loguru import logger + +from dotenv import load_dotenv +load_dotenv(override=True) + +logger.remove(0) +logger.add(sys.stderr, level="DEBUG") + + +class UserImageRequester(FrameProcessor): + + def __init__(self, participant_id: str | None = None): + super().__init__() + self._participant_id = participant_id + + def set_participant_id(self, participant_id: str): + self._participant_id = participant_id + + async def process_frame(self, frame: Frame, direction: FrameDirection): + if self._participant_id and isinstance(frame, TextFrame): + await self.push_frame(UserImageRequestFrame(self._participant_id), FrameDirection.UPSTREAM) + await self.push_frame(frame, direction) + + +async def main(room_url: str, token): + async with aiohttp.ClientSession() as session: + transport = DailyTransport( + room_url, + token, + "Describe participant video", + DailyParams( + audio_in_enabled=True, # This is so Silero VAD can get audio data + audio_out_enabled=True, + transcription_enabled=True + ) + ) + + vad = SileroVAD() + + tts = 
ElevenLabsTTSService( + aiohttp_session=session, + api_key=os.getenv("ELEVENLABS_API_KEY"), + voice_id=os.getenv("ELEVENLABS_VOICE_ID"), + ) + + user_response = UserResponseAggregator() + + image_requester = UserImageRequester() + + vision_aggregator = VisionImageFrameAggregator() + + google = GoogleLLMService(model="gemini-1.5-flash-latest") + + tts = ElevenLabsTTSService( + aiohttp_session=session, + api_key=os.getenv("ELEVENLABS_API_KEY"), + voice_id=os.getenv("ELEVENLABS_VOICE_ID"), + ) + + @transport.event_handler("on_first_participant_joined") + async def on_first_participant_joined(transport, participant): + await tts.say("Hi there! Feel free to ask me what I see.") + transport.capture_participant_video(participant["id"], framerate=0) + transport.capture_participant_transcription(participant["id"]) + image_requester.set_participant_id(participant["id"]) + + pipeline = Pipeline([transport.input(), vad, user_response, image_requester, + vision_aggregator, google, tts, transport.output()]) + + task = PipelineTask(pipeline) + + runner = PipelineRunner() + + await runner.run(task) + +if __name__ == "__main__": + (url, token) = configure() + asyncio.run(main(url, token)) diff --git a/examples/foundational/12b-describe-video-gpt-4o.py b/examples/foundational/12b-describe-video-gpt-4o.py new file mode 100644 index 000000000..dd386c8b4 --- /dev/null +++ b/examples/foundational/12b-describe-video-gpt-4o.py @@ -0,0 +1,106 @@ +# +# Copyright (c) 2024, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import asyncio +import aiohttp +import os +import sys + +from pipecat.frames.frames import Frame, TextFrame, UserImageRequestFrame +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineTask +from pipecat.processors.aggregators.user_response import UserResponseAggregator +from pipecat.processors.aggregators.vision_image_frame import VisionImageFrameAggregator +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor +from pipecat.services.elevenlabs import ElevenLabsTTSService +from pipecat.services.openai import OpenAILLMService +from pipecat.transports.services.daily import DailyParams, DailyTransport +from pipecat.vad.silero import SileroVAD + +from runner import configure + +from loguru import logger + +from dotenv import load_dotenv +load_dotenv(override=True) + +logger.remove(0) +logger.add(sys.stderr, level="DEBUG") + + +class UserImageRequester(FrameProcessor): + + def __init__(self, participant_id: str | None = None): + super().__init__() + self._participant_id = participant_id + + def set_participant_id(self, participant_id: str): + self._participant_id = participant_id + + async def process_frame(self, frame: Frame, direction: FrameDirection): + if self._participant_id and isinstance(frame, TextFrame): + await self.push_frame(UserImageRequestFrame(self._participant_id), FrameDirection.UPSTREAM) + await self.push_frame(frame, direction) + + +async def main(room_url: str, token): + async with aiohttp.ClientSession() as session: + transport = DailyTransport( + room_url, + token, + "Describe participant video", + DailyParams( + audio_in_enabled=True, # This is so Silero VAD can get audio data + audio_out_enabled=True, + transcription_enabled=True + ) + ) + + vad = SileroVAD() + + tts = ElevenLabsTTSService( + aiohttp_session=session, + api_key=os.getenv("ELEVENLABS_API_KEY"), + voice_id=os.getenv("ELEVENLABS_VOICE_ID"), + ) + + user_response = UserResponseAggregator() + 
+ image_requester = UserImageRequester() + + vision_aggregator = VisionImageFrameAggregator() + + google = OpenAILLMService( + api_key=os.getenv("OPENAI_API_KEY"), + model="gpt-4o" + ) + + tts = ElevenLabsTTSService( + aiohttp_session=session, + api_key=os.getenv("ELEVENLABS_API_KEY"), + voice_id=os.getenv("ELEVENLABS_VOICE_ID"), + ) + + @transport.event_handler("on_first_participant_joined") + async def on_first_participant_joined(transport, participant): + await tts.say("Hi there! Feel free to ask me what I see.") + transport.capture_participant_video(participant["id"], framerate=0) + transport.capture_participant_transcription(participant["id"]) + image_requester.set_participant_id(participant["id"]) + + pipeline = Pipeline([transport.input(), vad, user_response, image_requester, + vision_aggregator, google, tts, transport.output()]) + + task = PipelineTask(pipeline) + + runner = PipelineRunner() + + await runner.run(task) + +if __name__ == "__main__": + (url, token) = configure() + asyncio.run(main(url, token)) diff --git a/macos-py3.10-requirements.txt b/macos-py3.10-requirements.txt index 24b51521c..334b01d3d 100644 --- a/macos-py3.10-requirements.txt +++ b/macos-py3.10-requirements.txt @@ -1,32 +1,33 @@ +WARNING: --strip-extras is becoming the default in version 8.0.0. To silence this warning, either use --strip-extras to opt into the new default or use --no-strip-extras to retain the existing behavior. # -# This file is autogenerated by pip-compile with Python 3.10 +# This file is autogenerated by pip-compile with Python 3.11 # by the following command: # # pip-compile --all-extras pyproject.toml # aiohttp==3.9.5 - # via pipecat (pyproject.toml) + # via pipecat-ai (pyproject.toml) aiosignal==1.3.1 # via aiohttp annotated-types==0.6.0 # via pydantic -anthropic==0.25.8 - # via pipecat (pyproject.toml) +anthropic==0.25.9 + # via pipecat-ai (pyproject.toml) anyio==4.3.0 # via # anthropic # httpx # openai -async-timeout==4.0.3 - # via aiohttp attrs==23.2.0 # via aiohttp av==12.0.0 # via faster-whisper azure-cognitiveservices-speech==1.37.0 - # via pipecat (pyproject.toml) + # via pipecat-ai (pyproject.toml) blinker==1.8.2 # via flask +cachetools==5.3.3 + # via google-auth certifi==2024.2.2 # via # httpcore @@ -41,19 +42,17 @@ coloredlogs==15.0.1 ctranslate2==4.2.1 # via faster-whisper daily-python==0.7.4 - # via pipecat (pyproject.toml) + # via pipecat-ai (pyproject.toml) distro==1.9.0 # via # anthropic # openai einops==0.8.0 - # via pipecat (pyproject.toml) -exceptiongroup==1.2.1 - # via anyio + # via pipecat-ai (pyproject.toml) fal-client==0.4.0 - # via pipecat (pyproject.toml) + # via pipecat-ai (pyproject.toml) faster-whisper==1.0.2 - # via pipecat (pyproject.toml) + # via pipecat-ai (pyproject.toml) filelock==3.14.0 # via # huggingface-hub @@ -63,25 +62,58 @@ filelock==3.14.0 flask==3.0.3 # via # flask-cors - # pipecat (pyproject.toml) + # pipecat-ai (pyproject.toml) flask-cors==4.0.1 - # via pipecat (pyproject.toml) + # via pipecat-ai (pyproject.toml) flatbuffers==24.3.25 # via onnxruntime frozenlist==1.4.1 # via # aiohttp # aiosignal -fsspec==2024.3.1 +fsspec==2024.5.0 # via # huggingface-hub # torch +google-ai-generativelanguage==0.6.3 + # via google-generativeai +google-api-core[grpc]==2.19.0 + # via + # google-ai-generativelanguage + # google-api-python-client + # google-generativeai +google-api-python-client==2.129.0 + # via google-generativeai +google-auth==2.29.0 + # via + # google-ai-generativelanguage + # google-api-core + # google-api-python-client + # 
google-auth-httplib2 + # google-generativeai +google-auth-httplib2==0.2.0 + # via google-api-python-client +google-generativeai==0.5.3 + # via pipecat-ai (pyproject.toml) +googleapis-common-protos==1.63.0 + # via + # google-api-core + # grpcio-status grpcio==1.63.0 - # via pyht + # via + # google-api-core + # grpcio-status + # pyht +grpcio-status==1.62.2 + # via google-api-core h11==0.14.0 # via httpcore httpcore==1.0.5 # via httpx +httplib2==0.22.0 + # via + # google-api-python-client + # google-auth-httplib2 httpx==0.27.0 # via # anthropic @@ -110,7 +142,7 @@ jinja2==3.1.4 # flask # torch loguru==0.7.2 - # via pipecat (pyproject.toml) + # via pipecat-ai (pyproject.toml) markupsafe==2.1.5 # via # jinja2 @@ -127,13 +159,13 @@ numpy==1.26.4 # via # ctranslate2 # onnxruntime - # pipecat (pyproject.toml) + # pipecat-ai (pyproject.toml) # torchvision # transformers onnxruntime==1.17.3 # via faster-whisper openai==1.26.0 - # via pipecat (pyproject.toml) + # via pipecat-ai (pyproject.toml) packaging==24.0 # via # huggingface-hub @@ -141,37 +173,59 @@ packaging==24.0 # transformers pillow==10.3.0 # via - # pipecat (pyproject.toml) + # pipecat-ai (pyproject.toml) # torchvision +proto-plus==1.23.0 + # via + # google-ai-generativelanguage + # google-api-core protobuf==4.25.3 # via + # google-ai-generativelanguage + # google-api-core + # google-generativeai + # googleapis-common-protos + # grpcio-status # onnxruntime + # proto-plus # pyht +pyasn1==0.6.0 + # via + # pyasn1-modules + # rsa +pyasn1-modules==0.4.0 + # via google-auth pyaudio==0.2.14 - # via pipecat (pyproject.toml) + # via pipecat-ai (pyproject.toml) pydantic==2.7.1 # via # anthropic + # google-generativeai # openai pydantic-core==2.18.2 # via pydantic pyht==0.0.28 - # via pipecat (pyproject.toml) + # via pipecat-ai (pyproject.toml) +pyparsing==3.1.2 + # via httplib2 python-dotenv==1.0.1 - # via pipecat (pyproject.toml) + # via pipecat-ai (pyproject.toml) pyyaml==6.0.1 # via # ctranslate2 # huggingface-hub # timm # transformers -regex==2024.5.10 +regex==2024.5.15 # via transformers requests==2.31.0 # via + # google-api-core # huggingface-hub # pyht # transformers +rsa==4.9 + # via google-auth safetensors==0.4.3 # via # timm @@ -187,7 +241,7 @@ sympy==1.12 # onnxruntime # torch timm==0.9.16 - # via pipecat (pyproject.toml) + # via pipecat-ai (pyproject.toml) tokenizers==0.19.1 # via # anthropic @@ -195,35 +249,38 @@ tokenizers==0.19.1 # transformers torch==2.3.0 # via - # pipecat (pyproject.toml) + # pipecat-ai (pyproject.toml) # timm # torchaudio # torchvision torchaudio==2.3.0 - # via pipecat (pyproject.toml) + # via pipecat-ai (pyproject.toml) torchvision==0.18.0 # via timm tqdm==4.66.4 # via + # google-generativeai # huggingface-hub # openai # transformers transformers==4.40.2 - # via pipecat (pyproject.toml) + # via pipecat-ai (pyproject.toml) typing-extensions==4.11.0 # via # anthropic - # anyio + # google-generativeai # huggingface-hub # openai - # pipecat (pyproject.toml) + # pipecat-ai (pyproject.toml) # pydantic # pydantic-core # torch +uritemplate==4.1.1 + # via google-api-python-client urllib3==2.2.1 # via requests websockets==12.0 - # via pipecat (pyproject.toml) + # via pipecat-ai (pyproject.toml) werkzeug==3.0.3 # via flask yarl==1.9.4 diff --git a/pyproject.toml b/pyproject.toml index 33d61424e..983a7b4c4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -37,6 +37,7 @@ azure = [ "azure-cognitiveservices-speech~=1.37.0" ] daily = [ "daily-python~=0.7.4" ] examples = [ "python-dotenv~=1.0.0", "flask~=3.0.3", 
"flask_cors~=4.0.1" ] fal = [ "fal-client~=0.4.0" ] +google = [ "google-generativeai~=0.5.3" ] fireworks = [ "openai~=1.26.0" ] local = [ "pyaudio~=0.2.0" ] moondream = [ "einops~=0.8.0", "timm~=0.9.16", "transformers~=4.40.2" ] diff --git a/src/pipecat/processors/aggregators/openai_llm_context.py b/src/pipecat/processors/aggregators/openai_llm_context.py index c446e732e..e44c22e3a 100644 --- a/src/pipecat/processors/aggregators/openai_llm_context.py +++ b/src/pipecat/processors/aggregators/openai_llm_context.py @@ -5,10 +5,13 @@ # from dataclasses import dataclass +import io from typing import List -from pipecat.frames.frames import Frame +from PIL import Image + +from pipecat.frames.frames import Frame, VisionImageRawFrame from openai._types import NOT_GIVEN, NotGiven @@ -43,6 +46,31 @@ def from_messages(messages: List[dict]) -> "OpenAILLMContext": }) return context + @staticmethod + def from_image_frame(frame: VisionImageRawFrame) -> "OpenAILLMContext": + """ + For images, we are deviating from the OpenAI messages shape. OpenAI + expects images to be base64 encoded, but other vision models may not. + So we'll store the image as bytes and do the base64 encoding as needed + in the LLM service. + """ + context = OpenAILLMContext() + buffer = io.BytesIO() + Image.frombytes( + frame.format, + frame.size, + frame.image + ).save( + buffer, + format="JPEG") + context.add_message({ + "content": frame.text, + "role": "user", + "data": buffer.getvalue(), + "mime_type": "image/jpeg" + }) + return context + def add_message(self, message: ChatCompletionMessageParam): self.messages.append(message) diff --git a/src/pipecat/services/google.py b/src/pipecat/services/google.py new file mode 100644 index 000000000..74059afbc --- /dev/null +++ b/src/pipecat/services/google.py @@ -0,0 +1,96 @@ +import google.generativeai as gai +import google.ai.generativelanguage as glm +import os +import asyncio + +from typing import List + +from pipecat.frames.frames import ( + Frame, + TextFrame, + VisionImageRawFrame, + LLMMessagesFrame, + LLMResponseStartFrame, + LLMResponseEndFrame) +from pipecat.processors.frame_processor import FrameDirection +from pipecat.services.ai_services import LLMService +from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext, OpenAILLMContextFrame + +from loguru import logger + + +class GoogleLLMService(LLMService): + """This class implements inference with Google's AI models + + This service translates internally from OpenAILLMContext to the messages format + expected by the Google AI model. We are using the OpenAILLMContext as a lingua + franca for all LLM services, so that it is easy to switch between different LLMs. 
+ """ + + def __init__(self, model="gemini-1.5-flash-latest", api_key=None, **kwargs): + super().__init__(**kwargs) + self.model = model + gai.configure(api_key=api_key or os.environ["GOOGLE_API_KEY"]) + self.create_client() + + def create_client(self): + self._client = gai.GenerativeModel(self.model) + + def _get_messages_from_openai_context( + self, context: OpenAILLMContext) -> List[glm.Content]: + openai_messages = context.get_messages() + google_messages = [] + + for message in openai_messages: + role = message["role"] + content = message["content"] + if role == "system": + role = "user" + elif role == "assistant": + role = "model" + + parts = [glm.Part(text=content)] + if "mime_type" in message: + parts.append( + glm.Part(inline_data=glm.Blob( + mime_type=message["mime_type"], + data=message["data"] + ))) + google_messages.append({"role": role, "parts": parts}) + + return google_messages + + async def _async_generator_wrapper(self, sync_generator): + for item in sync_generator: + yield item + await asyncio.sleep(0) + + async def _process_context(self, context: OpenAILLMContext): + try: + messages = self._get_messages_from_openai_context(context) + + await self.push_frame(LLMResponseStartFrame()) + response = self._client.generate_content(messages, stream=True) + + async for chunk in self._async_generator_wrapper(response): + logger.debug(f"Pushing inference text: {chunk.text}") + await self.push_frame(TextFrame(chunk.text)) + + await self.push_frame(LLMResponseEndFrame()) + except Exception as e: + logger.error(f"Exception: {e}") + + async def process_frame(self, frame: Frame, direction: FrameDirection): + context = None + + if isinstance(frame, OpenAILLMContextFrame): + context: OpenAILLMContext = frame.context + elif isinstance(frame, LLMMessagesFrame): + context = OpenAILLMContext.from_messages(frame.messages) + elif isinstance(frame, VisionImageRawFrame): + context = OpenAILLMContext.from_image_frame(frame) + else: + await self.push_frame(frame, direction) + + if context: + await self._process_context(context) diff --git a/src/pipecat/services/openai.py b/src/pipecat/services/openai.py index 56224e8fe..94753c513 100644 --- a/src/pipecat/services/openai.py +++ b/src/pipecat/services/openai.py @@ -8,6 +8,7 @@ import json import time import aiohttp +import base64 from PIL import Image @@ -22,7 +23,8 @@ LLMResponseEndFrame, LLMResponseStartFrame, TextFrame, - URLImageRawFrame + URLImageRawFrame, + VisionImageRawFrame ) from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext, OpenAILLMContextFrame from pipecat.processors.frame_processor import FrameDirection @@ -67,8 +69,21 @@ async def _stream_chat_completions( self, context: OpenAILLMContext ) -> AsyncStream[ChatCompletionChunk]: messages: List[ChatCompletionMessageParam] = context.get_messages() - messages_for_log = json.dumps(messages) - logger.debug(f"Generating chat: {messages_for_log}") + + # base64 encode any images + for message in messages: + if message.get("mime_type") == "image/jpeg": + encoded_image = base64.b64encode(message["data"]).decode("utf-8") + text = message["content"] + message["content"] = [ + {"type": "text", "text": text}, + {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{encoded_image}"}} + ] + del message["data"] + del message["mime_type"] + + # messages_for_log = json.dumps(messages) + # logger.debug(f"Generating chat: {messages_for_log}") start_time = time.time() chunks: AsyncStream[ChatCompletionChunk] = ( @@ -151,6 +166,8 @@ async def process_frame(self, 
frame: Frame, direction: FrameDirection): context: OpenAILLMContext = frame.context elif isinstance(frame, LLMMessagesFrame): context = OpenAILLMContext.from_messages(frame.messages) + elif isinstance(frame, VisionImageRawFrame): + context = OpenAILLMContext.from_image_frame(frame) else: await self.push_frame(frame, direction) From d83f0aabca6d925781cfb3cd25bae1d17a785b50 Mon Sep 17 00:00:00 2001 From: Kwindla Hultman Kramer Date: Sun, 19 May 2024 10:53:50 -0700 Subject: [PATCH 2/7] generate macos-py3.10-requirements.txt with Python 3.10 --- macos-py3.10-requirements.txt | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/macos-py3.10-requirements.txt b/macos-py3.10-requirements.txt index 334b01d3d..e4545f67d 100644 --- a/macos-py3.10-requirements.txt +++ b/macos-py3.10-requirements.txt @@ -1,6 +1,5 @@ -WARNING: --strip-extras is becoming the default in version 8.0.0. To silence this warning, either use --strip-extras to opt into the new default or use --no-strip-extras to retain the existing behavior. # -# This file is autogenerated by pip-compile with Python 3.11 +# This file is autogenerated by pip-compile with Python 3.10 # by the following command: # # pip-compile --all-extras pyproject.toml @@ -18,6 +17,8 @@ anyio==4.3.0 # anthropic # httpx # openai +async-timeout==4.0.3 + # via aiohttp attrs==23.2.0 # via aiohttp av==12.0.0 @@ -49,6 +50,8 @@ distro==1.9.0 # openai einops==0.8.0 # via pipecat-ai (pyproject.toml) +exceptiongroup==1.2.1 + # via anyio fal-client==0.4.0 # via pipecat-ai (pyproject.toml) faster-whisper==1.0.2 @@ -268,6 +271,7 @@ transformers==4.40.2 typing-extensions==4.11.0 # via # anthropic + # anyio # google-generativeai # huggingface-hub # openai From cf597a2f6b16d5c11fdbbf0a167f69d8451c9845 Mon Sep 17 00:00:00 2001 From: Kwindla Hultman Kramer Date: Sun, 19 May 2024 11:08:38 -0700 Subject: [PATCH 3/7] add back in debug log line in openai.py --- src/pipecat/services/openai.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pipecat/services/openai.py b/src/pipecat/services/openai.py index 94753c513..ea59b7a54 100644 --- a/src/pipecat/services/openai.py +++ b/src/pipecat/services/openai.py @@ -83,7 +83,7 @@ async def _stream_chat_completions( del message["mime_type"] # messages_for_log = json.dumps(messages) - # logger.debug(f"Generating chat: {messages_for_log}") + logger.debug(f"Generating chat: {messages_for_log}") start_time = time.time() chunks: AsyncStream[ChatCompletionChunk] = ( From e5ddaf14f4a73de25ed78668547fef1585e583f5 Mon Sep 17 00:00:00 2001 From: Kwindla Hultman Kramer Date: Sun, 19 May 2024 11:09:30 -0700 Subject: [PATCH 4/7] add google and deepgram to README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index d2cd573c4..51f14f390 100644 --- a/README.md +++ b/README.md @@ -39,7 +39,7 @@ pip install "pipecat-ai[option,...]" Your project may or may not need these, so they're made available as optional requirements. 
Here is a list: -- **AI services**: `anthropic`, `azure`, `fal`, `moondream`, `openai`, `playht`, `silero`, `whisper` +- **AI services**: `anthropic`, `azure`, `deepgram`, `google`, `fal`, `moondream`, `openai`, `playht`, `silero`, `whisper` - **Transports**: `local`, `websocket`, `daily` ## Code examples From e507686cefc89b0ea9a67774cab00c9a21c333aa Mon Sep 17 00:00:00 2001 From: Kwindla Hultman Kramer Date: Sun, 19 May 2024 11:13:39 -0700 Subject: [PATCH 5/7] oops, fix openai.py --- src/pipecat/services/openai.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pipecat/services/openai.py b/src/pipecat/services/openai.py index ea59b7a54..b27b8d975 100644 --- a/src/pipecat/services/openai.py +++ b/src/pipecat/services/openai.py @@ -82,7 +82,7 @@ async def _stream_chat_completions( del message["data"] del message["mime_type"] - # messages_for_log = json.dumps(messages) + messages_for_log = json.dumps(messages) logger.debug(f"Generating chat: {messages_for_log}") start_time = time.time() From 66377954cbc2f5419dc7304d6a4bc14a6586b4f8 Mon Sep 17 00:00:00 2001 From: Kwindla Hultman Kramer Date: Sun, 19 May 2024 12:33:57 -0700 Subject: [PATCH 6/7] fix up openai vision and gemini implementation --- .../12a-describe-video-gemini-flash.py | 19 +++++++---- .../foundational/12b-describe-video-gpt-4o.py | 22 ++++++++----- .../aggregators/openai_llm_context.py | 18 +++++++++- src/pipecat/services/google.py | 33 +++++++++++++++---- src/pipecat/services/openai.py | 11 ++----- 5 files changed, 73 insertions(+), 30 deletions(-) diff --git a/examples/foundational/12a-describe-video-gemini-flash.py b/examples/foundational/12a-describe-video-gemini-flash.py index 33240dd13..0b5a7893a 100644 --- a/examples/foundational/12a-describe-video-gemini-flash.py +++ b/examples/foundational/12a-describe-video-gemini-flash.py @@ -19,7 +19,7 @@ from pipecat.services.elevenlabs import ElevenLabsTTSService from pipecat.services.google import GoogleLLMService from pipecat.transports.services.daily import DailyParams, DailyTransport -from pipecat.vad.silero import SileroVAD +from pipecat.vad.silero import SileroVADAnalyzer from runner import configure @@ -56,12 +56,12 @@ async def main(room_url: str, token): DailyParams( audio_in_enabled=True, # This is so Silero VAD can get audio data audio_out_enabled=True, - transcription_enabled=True + transcription_enabled=True, + vad_enabled=True, + vad_analyzer=SileroVADAnalyzer() ) ) - vad = SileroVAD() - tts = ElevenLabsTTSService( aiohttp_session=session, api_key=os.getenv("ELEVENLABS_API_KEY"), @@ -89,8 +89,15 @@ async def on_first_participant_joined(transport, participant): transport.capture_participant_transcription(participant["id"]) image_requester.set_participant_id(participant["id"]) - pipeline = Pipeline([transport.input(), vad, user_response, image_requester, - vision_aggregator, google, tts, transport.output()]) + pipeline = Pipeline([ + transport.input(), + user_response, + image_requester, + vision_aggregator, + google, + tts, + transport.output() + ]) task = PipelineTask(pipeline) diff --git a/examples/foundational/12b-describe-video-gpt-4o.py b/examples/foundational/12b-describe-video-gpt-4o.py index dd386c8b4..2d1e82959 100644 --- a/examples/foundational/12b-describe-video-gpt-4o.py +++ b/examples/foundational/12b-describe-video-gpt-4o.py @@ -19,7 +19,7 @@ from pipecat.services.elevenlabs import ElevenLabsTTSService from pipecat.services.openai import OpenAILLMService from pipecat.transports.services.daily import DailyParams, DailyTransport -from 
pipecat.vad.silero import SileroVAD +from pipecat.vad.silero import SileroVADAnalyzer from runner import configure @@ -54,14 +54,13 @@ async def main(room_url: str, token): token, "Describe participant video", DailyParams( - audio_in_enabled=True, # This is so Silero VAD can get audio data audio_out_enabled=True, - transcription_enabled=True + transcription_enabled=True, + vad_enabled=True, + vad_analyzer=SileroVADAnalyzer() ) ) - vad = SileroVAD() - tts = ElevenLabsTTSService( aiohttp_session=session, api_key=os.getenv("ELEVENLABS_API_KEY"), @@ -74,7 +73,7 @@ async def main(room_url: str, token): vision_aggregator = VisionImageFrameAggregator() - google = OpenAILLMService( + openai = OpenAILLMService( api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o" ) @@ -92,8 +91,15 @@ async def on_first_participant_joined(transport, participant): transport.capture_participant_transcription(participant["id"]) image_requester.set_participant_id(participant["id"]) - pipeline = Pipeline([transport.input(), vad, user_response, image_requester, - vision_aggregator, google, tts, transport.output()]) + pipeline = Pipeline([ + transport.input(), + user_response, + image_requester, + vision_aggregator, + openai, + tts, + transport.output() + ]) task = PipelineTask(pipeline) diff --git a/src/pipecat/processors/aggregators/openai_llm_context.py b/src/pipecat/processors/aggregators/openai_llm_context.py index e44c22e3a..94b60baac 100644 --- a/src/pipecat/processors/aggregators/openai_llm_context.py +++ b/src/pipecat/processors/aggregators/openai_llm_context.py @@ -6,6 +6,7 @@ from dataclasses import dataclass import io +import json from typing import List @@ -21,6 +22,17 @@ ChatCompletionMessageParam ) +# JSON custom encoder to handle bytes arrays so that we can log contexts +# with images to the console. 
+ + +class CustomEncoder(json.JSONEncoder): + def default(self, obj): + if isinstance(obj, io.BytesIO): + # Convert the first 8 bytes to an ASCII hex string + return (f"{obj.getbuffer()[0:8].hex()}...") + return super().default(obj) + class OpenAILLMContext: @@ -66,7 +78,7 @@ def from_image_frame(frame: VisionImageRawFrame) -> "OpenAILLMContext": context.add_message({ "content": frame.text, "role": "user", - "data": buffer.getvalue(), + "data": buffer, "mime_type": "image/jpeg" }) return context @@ -77,6 +89,10 @@ def add_message(self, message: ChatCompletionMessageParam): def get_messages(self) -> List[ChatCompletionMessageParam]: return self.messages + def get_messages_json(self) -> str: + return json.dumps(self.messages, cls=CustomEncoder) + # return json.dumps(self.messages) + def set_tool_choice( self, tool_choice: ChatCompletionToolChoiceOptionParam | NotGiven ): diff --git a/src/pipecat/services/google.py b/src/pipecat/services/google.py index 74059afbc..f21de30c2 100644 --- a/src/pipecat/services/google.py +++ b/src/pipecat/services/google.py @@ -1,7 +1,8 @@ -import google.generativeai as gai -import google.ai.generativelanguage as glm + +import json import os import asyncio +import time from typing import List @@ -10,14 +11,26 @@ TextFrame, VisionImageRawFrame, LLMMessagesFrame, + LLMFullResponseStartFrame, LLMResponseStartFrame, - LLMResponseEndFrame) + LLMResponseEndFrame, + LLMFullResponseEndFrame +) from pipecat.processors.frame_processor import FrameDirection from pipecat.services.ai_services import LLMService from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext, OpenAILLMContextFrame from loguru import logger +try: + import google.generativeai as gai + import google.ai.generativelanguage as glm +except ModuleNotFoundError as e: + logger.error(f"Exception: {e}") + logger.error( + "In order to use Google AI, you need to `pip install pipecat-ai[google]`. 
Also, set `GOOGLE_API_KEY` environment variable.") + raise Exception(f"Missing module: {e}") + class GoogleLLMService(LLMService): """This class implements inference with Google's AI models @@ -54,7 +67,7 @@ def _get_messages_from_openai_context( parts.append( glm.Part(inline_data=glm.Blob( mime_type=message["mime_type"], - data=message["data"] + data=message["data"].getvalue() ))) google_messages.append({"role": role, "parts": parts}) @@ -66,19 +79,25 @@ async def _async_generator_wrapper(self, sync_generator): await asyncio.sleep(0) async def _process_context(self, context: OpenAILLMContext): + await self.push_frame(LLMFullResponseStartFrame()) try: + logger.debug(f"Generating chat: {context.get_messages_json()}") + messages = self._get_messages_from_openai_context(context) - await self.push_frame(LLMResponseStartFrame()) + start_time = time.time() response = self._client.generate_content(messages, stream=True) + logger.debug(f"Google LLM TTFB: {time.time() - start_time}") async for chunk in self._async_generator_wrapper(response): - logger.debug(f"Pushing inference text: {chunk.text}") + await self.push_frame(LLMResponseStartFrame()) await self.push_frame(TextFrame(chunk.text)) + await self.push_frame(LLMResponseEndFrame()) - await self.push_frame(LLMResponseEndFrame()) except Exception as e: logger.error(f"Exception: {e}") + finally: + await self.push_frame(LLMFullResponseEndFrame()) async def process_frame(self, frame: Frame, direction: FrameDirection): context = None diff --git a/src/pipecat/services/openai.py b/src/pipecat/services/openai.py index b27b8d975..86f2ec158 100644 --- a/src/pipecat/services/openai.py +++ b/src/pipecat/services/openai.py @@ -68,12 +68,14 @@ def create_client(self, api_key=None, base_url=None): async def _stream_chat_completions( self, context: OpenAILLMContext ) -> AsyncStream[ChatCompletionChunk]: + logger.debug(f"Generating chat: {context.get_messages_json()}") + messages: List[ChatCompletionMessageParam] = context.get_messages() # base64 encode any images for message in messages: if message.get("mime_type") == "image/jpeg": - encoded_image = base64.b64encode(message["data"]).decode("utf-8") + encoded_image = base64.b64encode(message["data"].getvalue()).decode("utf-8") text = message["content"] message["content"] = [ {"type": "text", "text": text}, @@ -82,9 +84,6 @@ async def _stream_chat_completions( del message["data"] del message["mime_type"] - messages_for_log = json.dumps(messages) - logger.debug(f"Generating chat: {messages_for_log}") - start_time = time.time() chunks: AsyncStream[ChatCompletionChunk] = ( await self._client.chat.completions.create( @@ -101,10 +100,6 @@ async def _stream_chat_completions( return chunks async def _chat_completions(self, messages) -> str | None: - messages_for_log = json.dumps(messages) - - logger.debug(f"Generating chat: {messages_for_log}") - response: ChatCompletion = await self._client.chat.completions.create( model=self._model, stream=False, messages=messages ) From 7ffb10d7f599b502af9536259bafb6e090fadb17 Mon Sep 17 00:00:00 2001 From: Kwindla Hultman Kramer Date: Sun, 19 May 2024 12:44:45 -0700 Subject: [PATCH 7/7] add to CHANGELOG.md --- CHANGELOG.md | 10 ++++++++++ .../processors/aggregators/openai_llm_context.py | 1 - 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 10b964a30..9fcdd4fea 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Added 
`google.generativeai` model support, including vision. This new `google` service defaults to using
+  `gemini-1.5-flash-latest`. Example in `examples/foundational/12a-describe-video-gemini-flash.py`.
+
+- Added vision support to `openai` service. Example in
+  `examples/foundational/12b-describe-video-gpt-4o.py`.
+
+## [Unreleased]
+
+### Added
+
 - Added initial interruptions support. The assistant contexts (or aggregators) should now
   be placed after the output transport. This way, only the completed spoken context is
   added to the assistant context.
diff --git a/src/pipecat/processors/aggregators/openai_llm_context.py b/src/pipecat/processors/aggregators/openai_llm_context.py
index 94b60baac..65c8da6ad 100644
--- a/src/pipecat/processors/aggregators/openai_llm_context.py
+++ b/src/pipecat/processors/aggregators/openai_llm_context.py
@@ -91,7 +91,6 @@ def get_messages(self) -> List[ChatCompletionMessageParam]:
 
     def get_messages_json(self) -> str:
         return json.dumps(self.messages, cls=CustomEncoder)
-        # return json.dumps(self.messages)
 
     def set_tool_choice(
         self, tool_choice: ChatCompletionToolChoiceOptionParam | NotGiven