Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Anthropic client fixes #2981

Merged
merged 12 commits into from
Jun 21, 2024
259 changes: 163 additions & 96 deletions autogen/oai/anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,20 @@
import inspect
import json
import os
import time
import warnings
from typing import Any, Dict, List, Tuple, Union

from anthropic import Anthropic
from anthropic import __version__ as anthropic_version
from anthropic.types import Completion, Message
from client_utils import validate_parameter
from anthropic.types import Completion, Message, TextBlock, ToolUseBlock
from openai.types.chat import ChatCompletion, ChatCompletionMessageToolCall
from openai.types.chat.chat_completion import ChatCompletionMessage, Choice
from openai.types.completion_usage import CompletionUsage
from typing_extensions import Annotated

from autogen.oai.client_utils import validate_parameter

TOOL_ENABLED = anthropic_version >= "0.23.1"
if TOOL_ENABLED:
from anthropic.types.tool_use_block_param import (
Expand All @@ -43,6 +46,7 @@


ANTHROPIC_PRICING_1k = {
"claude-3-5-sonnet-20240620": (0.003, 0.015),
"claude-3-sonnet-20240229": (0.003, 0.015),
"claude-3-opus-20240229": (0.015, 0.075),
"claude-2.0": (0.008, 0.024),
Expand Down Expand Up @@ -104,49 +108,14 @@ def api_key(self):
return self._api_key

def create(self, params: Dict[str, Any]) -> Completion:
"""Create a completion for a given config.

Args:
params: The params for the completion.

Returns:
The completion.
"""
if "tools" in params:
converted_functions = self.convert_tools_to_functions(params["tools"])
params["functions"] = params.get("functions", []) + converted_functions

raw_contents = params["messages"]
# Convert AutoGen messages to Together.AI messages
anthropic_messages = oai_messages_to_together_messages(params)
Hk669 marked this conversation as resolved.
Show resolved Hide resolved
anthropic_params = self.load_config(params)

processed_messages = []
for message in raw_contents:

if message["role"] == "system":
params["system"] = message["content"]
elif message["role"] == "function":
processed_messages.append(self.return_function_call_result(message["content"]))
elif "function_call" in message:
processed_messages.append(self.restore_last_tooluse_status())
elif message["content"] == "":
message["content"] = "I'm done. Please send TERMINATE" # Not sure about this one.
processed_messages.append(message)
else:
processed_messages.append(message)

# Check for interleaving roles and correct, for Anthropic must be: user, assistant, user, etc.
for i, message in enumerate(processed_messages):
if message["role"] is not ("user" if i % 2 == 0 else "assistant"):
message["role"] = "user" if i % 2 == 0 else "assistant"

# Note: When using reflection_with_llm we may end up with an "assistant" message as the last message
if processed_messages[-1]["role"] != "user":
# If the last role is not user, add a continue message at the end
continue_message = {"content": "continue", "role": "user"}
processed_messages.append(continue_message)

params["messages"] = processed_messages

# TODO: support stream
params = params.copy()
if "functions" in params:
Expand All @@ -157,7 +126,7 @@ def create(self, params: Dict[str, Any]) -> Completion:
# Anthropic doesn't accept None values, so we need to use keyword argument unpacking instead of setting parameters.
# Copy params we need into anthropic_params
# Remove any that don't have values
anthropic_params["messages"] = params["messages"]
anthropic_params["messages"] = anthropic_messages
if "system" in params:
anthropic_params["system"] = params["system"]
if "tools" in params:
Expand All @@ -174,61 +143,65 @@ def create(self, params: Dict[str, Any]) -> Completion:
# Calculate and save the cost onto the response
prompt_tokens = response.usage.input_tokens
completion_tokens = response.usage.output_tokens
response.cost = _calculate_cost(prompt_tokens, completion_tokens, anthropic_params["model"])

return response

def message_retrieval(self, response: Union[Message]) -> Union[List[str], List[ChatCompletionMessage]]:
"""Retrieve the messages from the response."""
messages = response.content
if len(messages) == 0:
return [None]
res = []
if TOOL_ENABLED:
for choice in messages:
if choice.type == "tool_use":
res.insert(0, self.response_to_openai_message(choice))
self._last_tooluse_status["tool_use"] = choice.model_dump()
else:
res.append(choice.text)
self._last_tooluse_status["think"] = choice.text

return res

else:
return [ # type: ignore [return-value]
choice.text if choice.message.function_call is not None else choice.message.content # type: ignore [union-attr]
for choice in messages
]

def response_to_openai_message(self, response) -> ChatCompletionMessage:
"""Convert the client response to OpenAI ChatCompletion Message"""
dict_response = response.model_dump()
return ChatCompletionMessage(
content=None,
message_text = ""
if response is not None:
# If we have tool use as the response, populate completed tool calls for our return OAI response
if response.stop_reason == "tool_use":
anthropic_finish = "tool_calls"
tool_calls = []
for content in response.content:
if type(content) == ToolUseBlock:
tool_calls.append(
ChatCompletionMessageToolCall(
id=content.id,
function={"name": content.name, "arguments": json.dumps(content.input)},
type="function",
)
)
else:
anthropic_finish = "stop"
tool_calls = None

# Retrieve any text content from the response
for content in response.content:
if type(content) == TextBlock:
message_text = content.text
break

# Convert output back to AutoGen response format
message = ChatCompletionMessage(
role="assistant",
function_call={"name": dict_response["name"], "arguments": json.dumps(dict_response["input"])},
content=message_text,
function_call=None,
tool_calls=tool_calls,
)
choices = [Choice(finish_reason=anthropic_finish, index=0, message=message)]

response_oai = ChatCompletion(
id=response.id,
model=anthropic_params["model"],
created=int(time.time() * 1000),
object="chat.completion",
choices=choices,
usage=CompletionUsage(
prompt_tokens=prompt_tokens,
completion_tokens=completion_tokens,
total_tokens=prompt_tokens + completion_tokens,
),
cost=_calculate_cost(prompt_tokens, completion_tokens, anthropic_params["model"]),
)

def restore_last_tooluse_status(self) -> Dict:
cached_content = []
if "think" in self._last_tooluse_status:
cached_content.append({"type": "text", "text": self._last_tooluse_status["think"]})
cached_content.append(self._last_tooluse_status["tool_use"])
res = {"role": "assistant", "content": cached_content}
return res
return response_oai

def return_function_call_result(self, result: str) -> Dict:
return {
"role": "user",
"content": [
{
"type": "tool_result",
"tool_use_id": self._last_tooluse_status["tool_use"]["id"],
"content": result,
}
],
}
def message_retrieval(self, response) -> List:
"""
Retrieve and return a list of strings or a list of Choice.Message from the response.

NOTE: if a list of Choice.Message is returned, it currently needs to contain the fields of OpenAI's ChatCompletion Message object,
since that is expected for function or tool calling in the rest of the codebase at the moment, unless a custom agent is being used.
"""
return [choice.message for choice in response.choices]

@staticmethod
def openai_func_to_anthropic(openai_func: dict) -> dict:
Expand All @@ -237,14 +210,12 @@ def openai_func_to_anthropic(openai_func: dict) -> dict:
return res

@staticmethod
def get_usage(response: Message) -> Dict:
def get_usage(response: ChatCompletion) -> Dict:
"""Get the usage of tokens and their cost information."""
return {
"prompt_tokens": response.usage.input_tokens if response.usage is not None else 0,
"completion_tokens": response.usage.output_tokens if response.usage is not None else 0,
"total_tokens": (
response.usage.input_tokens + response.usage.output_tokens if response.usage is not None else 0
),
"prompt_tokens": response.usage.prompt_tokens if response.usage is not None else 0,
"completion_tokens": response.usage.completion_tokens if response.usage is not None else 0,
"total_tokens": response.usage.total_tokens if response.usage is not None else 0,
"cost": response.cost if hasattr(response, "cost") else 0.0,
"model": response.model,
}
Expand All @@ -259,6 +230,102 @@ def convert_tools_to_functions(tools: List) -> List:
return functions


def oai_messages_to_together_messages(params: Dict[str, Any]) -> list[dict[str, Any]]:
Hk669 marked this conversation as resolved.
Show resolved Hide resolved
"""Convert messages from OAI format to Anthropic format.
We correct for any specific role orders and types, etc.
"""

# Track whether we have tools passed in. If not, tool use / result messages should be converted to text messages.
# Anthropic requires a tools parameter with the tools listed, if there are other messages with tool use or tool results.
# This can occur when we don't need tool calling, such as for group chat speaker selection.
has_tools = "tools" in params

# Convert messages to Anthropic compliant format
processed_messages = []
tool_use_messages = 0
tool_result_messages = 0
last_tool_use_index = -1
for message in params["messages"]:
if message["role"] == "system":
params["system"] = message["content"]
elif "tool_calls" in message:
# Map the tool call options to Anthropic's ToolUseBlock
tool_uses = []
tool_names = []
for tool_call in message["tool_calls"]:
tool_uses.append(
ToolUseBlock(
type="tool_use",
id=tool_call["id"],
name=tool_call["function"]["name"],
input=json.loads(tool_call["function"]["arguments"]),
)
)
tool_names.append(tool_call["function"]["name"])

if has_tools:
processed_messages.append({"role": "assistant", "content": tool_uses})
tool_use_messages += 1
last_tool_use_index = len(processed_messages) - 1
else:
# Not using tools, so put in a plain text message
processed_messages.append(
{
"role": "assistant",
"content": f"Some internal function(s) that could be used: [{', '.join(tool_names)}]",
}
)
elif "tool_call_id" in message:
if has_tools:
# Map the tool usage call to tool_result for Anthropic
processed_messages.append(
{
"role": "user",
"content": [
{
"type": "tool_result",
"tool_use_id": message["tool_call_id"],
"content": message["content"],
}
],
}
)
tool_result_messages += 1
else:
# Not using tools, so put in a plain text message
processed_messages.append(
{"role": "user", "content": f"Running the function returned: {message['content']}"}
)
elif message["content"] == "":
message["content"] = (
"I'm done. Please send TERMINATE" # TODO: Determine why we would be getting a blank response. Typically this is because 'assistant' is the last message role.
)
processed_messages.append(message)
else:
processed_messages.append(message)

# We'll drop the last tool_use if there's no tool_result (occurs if we finish the conversation before running the function)
if tool_use_messages != tool_result_messages:
# Too many tool_use messages, drop the last one as we haven't run it.
processed_messages.pop(last_tool_use_index)

# Check for interleaving roles and correct, for Anthropic must be: user, assistant, user, etc.
for i, message in enumerate(processed_messages):
if message["role"] is not ("user" if i % 2 == 0 else "assistant"):
message["role"] = "user" if i % 2 == 0 else "assistant"

# Also remove name key from message as it is not supported
message.pop("name", None)

# Note: When using reflection_with_llm we may end up with an "assistant" message as the last message and that may cause a blank response
if processed_messages[-1]["role"] != "user":
# If the last role is not user, add a continue message at the end
continue_message = {"content": "continue", "role": "user"}
processed_messages.append(continue_message)

return processed_messages


def _calculate_cost(input_tokens: int, output_tokens: int, model: str) -> float:
"""Calculate the cost of the completion using the Anthropic pricing."""
total = 0.0
Expand Down
Loading