add new response frame types and vision support for anthropic
kwindla committed May 23, 2024
1 parent fd13d3c commit 681250f
Showing 3 changed files with 306 additions and 4 deletions.
95 changes: 95 additions & 0 deletions examples/foundational/07a-interruptible-anthropic.py
@@ -0,0 +1,95 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

import asyncio
import aiohttp
import os
import sys

from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.anthropic import AnthropicLLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer

from runner import configure

from loguru import logger

from dotenv import load_dotenv
load_dotenv(override=True)

logger.remove(0)
logger.add(sys.stderr, level="DEBUG")


async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
)

tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
)

llm = AnthropicLLMService(
api_key=os.getenv("ANTHROPIC_API_KEY"),
model="claude-3-opus-20240229")

# todo: think more about how to handle system prompts in a more general way. OpenAI,
# Google, and Anthropic all have slightly different approaches to providing a system
# prompt.
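        # Note: in this commit, AnthropicLLMService maps the "system" role to
        # "user" when translating the context (see
        # _get_messages_from_openai_context in src/pipecat/services/anthropic.py),
        # so this system message reaches Anthropic as a user turn rather than via
        # the SDK's top-level system parameter.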
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative, helpful, and brief way. Say hello.",
},
]

tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)

pipeline = Pipeline([
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])

task = PipelineTask(pipeline, allow_interruptions=True)

@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
await task.queue_frames([LLMMessagesFrame(messages)])

runner = PipelineRunner()

await runner.run(task)


if __name__ == "__main__":
(url, token) = configure()
asyncio.run(main(url, token))
112 changes: 112 additions & 0 deletions examples/foundational/12c-describe-video-anthropic.py
@@ -0,0 +1,112 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

import asyncio
import aiohttp
import os
import sys

from pipecat.frames.frames import Frame, TextFrame, UserImageRequestFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.user_response import UserResponseAggregator
from pipecat.processors.aggregators.vision_image_frame import VisionImageFrameAggregator
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.anthropic import AnthropicLLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer

from runner import configure

from loguru import logger

from dotenv import load_dotenv
load_dotenv(override=True)

logger.remove(0)
logger.add(sys.stderr, level="DEBUG")


class UserImageRequester(FrameProcessor):

def __init__(self, participant_id: str | None = None):
super().__init__()
self._participant_id = participant_id

def set_participant_id(self, participant_id: str):
self._participant_id = participant_id

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        # Each time the user finishes an utterance (a TextFrame arrives), request
        # a fresh video frame from the participant by pushing a request upstream
        # to the transport, so it can be paired with the text downstream.
        if self._participant_id and isinstance(frame, TextFrame):
            await self.push_frame(UserImageRequestFrame(self._participant_id), FrameDirection.UPSTREAM)
        await self.push_frame(frame, direction)


async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
transport = DailyTransport(
room_url,
token,
"Describe participant video",
DailyParams(
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
)

tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
)

user_response = UserResponseAggregator()

image_requester = UserImageRequester()

vision_aggregator = VisionImageFrameAggregator()

anthropic = AnthropicLLMService(
api_key=os.getenv("ANTHROPIC_API_KEY"),
model="claude-3-sonnet-20240229"
)

@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await tts.say("Hi there! Feel free to ask me what I see.")
transport.capture_participant_video(participant["id"], framerate=0)
transport.capture_participant_transcription(participant["id"])
image_requester.set_participant_id(participant["id"])

        pipeline = Pipeline([
            transport.input(),   # Transport user input
            user_response,       # Aggregate user transcription into an utterance
            image_requester,     # Request a video frame per user utterance
            vision_aggregator,   # Pair the text with the captured image
            anthropic,           # Vision-capable LLM
            tts,                 # TTS
            transport.output()   # Transport bot output
        ])

task = PipelineTask(pipeline)

runner = PipelineRunner()

await runner.run(task)

if __name__ == "__main__":
(url, token) = configure()
asyncio.run(main(url, token))
103 changes: 99 additions & 4 deletions src/pipecat/services/anthropic.py
@@ -4,9 +4,24 @@
# SPDX-License-Identifier: BSD 2-Clause License
#

from pipecat.frames.frames import Frame, LLMMessagesFrame, TextFrame
import os
import asyncio
import time
import base64

from pipecat.frames.frames import (
Frame,
TextFrame,
VisionImageRawFrame,
LLMMessagesFrame,
LLMFullResponseStartFrame,
LLMResponseStartFrame,
LLMResponseEndFrame,
LLMFullResponseEndFrame
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_services import LLMService
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext, OpenAILLMContextFrame

from loguru import logger

@@ -20,18 +35,98 @@


class AnthropicLLMService(LLMService):
"""This class implements inference with Anthropic's AI models
This service translates internally from OpenAILLMContext to the messages format
expected by the Anthropic Python SDK. We are using the OpenAILLMContext as a lingua
franca for all LLM services, so that it is easy to switch between different LLMs.
"""

def __init__(
self,
api_key,
model="claude-3-opus-20240229",
max_tokens=1024):
super().__init__()
self.client = AsyncAnthropic(api_key=api_key)
self.model = model
self.max_tokens = max_tokens
self._client = AsyncAnthropic(api_key=api_key)
self._model = model
self._max_tokens = max_tokens

def _get_messages_from_openai_context(
self, context: OpenAILLMContext):
openai_messages = context.get_messages()
anthropic_messages = []

for message in openai_messages:
role = message["role"]
text = message["content"]
if role == "system":
role = "user"
if message.get("mime_type") == "image/jpeg":
# vision frame
encoded_image = base64.b64encode(message["data"].getvalue()).decode("utf-8")
anthropic_messages.append({
"role": role,
"content": [{
"type": "image",
"source": {
"type": "base64",
"media_type": message.get("mime_type"),
"data": encoded_image,
}
}, {
"type": "text",
"text": text
}]
})
else:
# text frame
anthropic_messages.append({"role": role, "content": content})

return anthropic_messages

async def _process_context(self, context: OpenAILLMContext):
await self.push_frame(LLMFullResponseStartFrame())
try:
logger.debug(f"Generating chat: {context.get_messages_json()}")

messages = self._get_messages_from_openai_context(context)

start_time = time.time()
response = await self._client.messages.create(
messages=messages,
model=self._model,
max_tokens=self._max_tokens,
stream=True)
logger.debug(f"Anthropic LLM TTFB: {time.time() - start_time}")
async for event in response:
# logger.debug(f"Anthropic LLM event: {event}")
if (event.type == "content_block_delta"):
await self.push_frame(LLMResponseStartFrame())
await self.push_frame(TextFrame(event.delta.text))
await self.push_frame(LLMResponseEndFrame())

except Exception as e:
logger.error(f"Exception: {e}")
finally:
await self.push_frame(LLMFullResponseEndFrame())

async def process_frame(self, frame: Frame, direction: FrameDirection):
context = None

if isinstance(frame, OpenAILLMContextFrame):
context: OpenAILLMContext = frame.context
elif isinstance(frame, LLMMessagesFrame):
context = OpenAILLMContext.from_messages(frame.messages)
elif isinstance(frame, VisionImageRawFrame):
context = OpenAILLMContext.from_image_frame(frame)
else:
await self.push_frame(frame, direction)

if context:
await self._process_context(context)

async def x_process_frame(self, frame: Frame, direction: FrameDirection):
if isinstance(frame, LLMMessagesFrame):
stream = await self.client.messages.create(
max_tokens=self.max_tokens,
