Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gen-AI python implementation #290

Merged
merged 17 commits into from
Nov 22, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@
# TODO: Use Semantic Conventions once upgrade to 0.47b0
GEN_AI_REQUEST_MODEL: str = "gen_ai.request.model"
GEN_AI_SYSTEM: str = "gen_ai.system"
GEN_AI_REQUEST_MAX_TOKENS: str = "gen_ai.request.max_tokens"
GEN_AI_REQUEST_TEMPERATURE: str = "gen_ai.request.temperature"
GEN_AI_REQUEST_TOP_P: str = "gen_ai.request.top_p"
GEN_AI_RESPONSE_FINISH_REASONS: str = "gen_ai.response.finish_reasons"
GEN_AI_USAGE_INPUT_TOKENS: str = "gen_ai.usage.input_tokens"
GEN_AI_USAGE_OUTPUT_TOKENS: str = "gen_ai.usage.output_tokens"


# Get dialect keywords retrieved from dialect_keywords.json file.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@
# SPDX-License-Identifier: Apache-2.0
import abc
import inspect
from typing import Dict, Optional
import json
import math
from typing import Any, Dict, Optional

from botocore.response import StreamingBody

from amazon.opentelemetry.distro._aws_attribute_keys import (
AWS_BEDROCK_AGENT_ID,
Expand All @@ -11,7 +15,16 @@
AWS_BEDROCK_GUARDRAIL_ID,
AWS_BEDROCK_KNOWLEDGE_BASE_ID,
)
from amazon.opentelemetry.distro._aws_span_processing_util import GEN_AI_REQUEST_MODEL, GEN_AI_SYSTEM
from amazon.opentelemetry.distro._aws_span_processing_util import (
GEN_AI_REQUEST_MAX_TOKENS,
GEN_AI_REQUEST_MODEL,
GEN_AI_REQUEST_TEMPERATURE,
GEN_AI_REQUEST_TOP_P,
GEN_AI_RESPONSE_FINISH_REASONS,
GEN_AI_SYSTEM,
GEN_AI_USAGE_INPUT_TOKENS,
GEN_AI_USAGE_OUTPUT_TOKENS,
)
from opentelemetry.instrumentation.botocore.extensions.types import (
_AttributeMapT,
_AwsSdkCallContext,
Expand Down Expand Up @@ -240,3 +253,161 @@ def extract_attributes(self, attributes: _AttributeMapT):
model_id = self._call_context.params.get(_MODEL_ID)
if model_id:
attributes[GEN_AI_REQUEST_MODEL] = model_id

# Get the request body if it exists
body = self._call_context.params.get("body")
if body:
try:
request_body = json.loads(body)

if "amazon.titan" in model_id:
self._extract_titan_attributes(attributes, request_body)
elif "anthropic.claude" in model_id:
self._extract_claude_attributes(attributes, request_body)
elif "meta.llama" in model_id:
self._extract_llama_attributes(attributes, request_body)
elif "cohere.command" in model_id:
self._extract_cohere_attributes(attributes, request_body)
elif "ai21.jamba" in model_id:
self._extract_ai21_attributes(attributes, request_body)
elif "mistral" in model_id:
self._extract_mistral_attributes(attributes, request_body)

except json.JSONDecodeError:
print("Error: Unable to parse the body as JSON")
Jeel-mehta marked this conversation as resolved.
Show resolved Hide resolved

def _extract_titan_attributes(self, attributes, request_body):
config = request_body.get("textGenerationConfig", {})
self._set_if_not_none(attributes, GEN_AI_REQUEST_TEMPERATURE, config.get("temperature"))
self._set_if_not_none(attributes, GEN_AI_REQUEST_TOP_P, config.get("topP"))
self._set_if_not_none(attributes, GEN_AI_REQUEST_MAX_TOKENS, config.get("maxTokenCount"))

def _extract_claude_attributes(self, attributes, request_body):
self._set_if_not_none(attributes, GEN_AI_REQUEST_MAX_TOKENS, request_body.get("max_tokens"))
self._set_if_not_none(attributes, GEN_AI_REQUEST_TEMPERATURE, request_body.get("temperature"))
self._set_if_not_none(attributes, GEN_AI_REQUEST_TOP_P, request_body.get("top_p"))

def _extract_cohere_attributes(self, attributes, request_body):
prompt = request_body.get("message")
if prompt:
attributes[GEN_AI_USAGE_INPUT_TOKENS] = math.ceil(len(prompt) / 6)
self._set_if_not_none(attributes, GEN_AI_REQUEST_MAX_TOKENS, request_body.get("max_tokens"))
self._set_if_not_none(attributes, GEN_AI_REQUEST_TEMPERATURE, request_body.get("temperature"))
self._set_if_not_none(attributes, GEN_AI_REQUEST_TOP_P, request_body.get("p"))

def _extract_ai21_attributes(self, attributes, request_body):
self._set_if_not_none(attributes, GEN_AI_REQUEST_MAX_TOKENS, request_body.get("max_tokens"))
self._set_if_not_none(attributes, GEN_AI_REQUEST_TEMPERATURE, request_body.get("temperature"))
self._set_if_not_none(attributes, GEN_AI_REQUEST_TOP_P, request_body.get("top_p"))

def _extract_llama_attributes(self, attributes, request_body):
self._set_if_not_none(attributes, GEN_AI_REQUEST_MAX_TOKENS, request_body.get("max_gen_len"))
self._set_if_not_none(attributes, GEN_AI_REQUEST_TEMPERATURE, request_body.get("temperature"))
self._set_if_not_none(attributes, GEN_AI_REQUEST_TOP_P, request_body.get("top_p"))

def _extract_mistral_attributes(self, attributes, request_body):
prompt = request_body.get("prompt")
if prompt:
attributes[GEN_AI_USAGE_INPUT_TOKENS] = math.ceil(len(prompt) / 6)
self._set_if_not_none(attributes, GEN_AI_REQUEST_MAX_TOKENS, request_body.get("max_tokens"))
self._set_if_not_none(attributes, GEN_AI_REQUEST_TEMPERATURE, request_body.get("temperature"))
self._set_if_not_none(attributes, GEN_AI_REQUEST_TOP_P, request_body.get("top_p"))

@staticmethod
def _set_if_not_none(attributes, key, value):
if value is not None:
attributes[key] = value

def on_success(self, span: Span, result: Dict[str, Any]):
model_id = self._call_context.params.get(_MODEL_ID)
if not model_id:
return

if "body" in result and isinstance(result["body"], StreamingBody):
try:
# Read the entire content of the StreamingBody
body_content = result["body"].read()
# Decode the bytes to string and parse as JSON
response_body = json.loads(body_content.decode("utf-8"))
Jeel-mehta marked this conversation as resolved.
Show resolved Hide resolved

if "amazon.titan" in model_id:
self._handle_amazon_titan_response(span, response_body)
elif "anthropic.claude" in model_id:
self._handle_anthropic_claude_response(span, response_body)
elif "meta.llama" in model_id:
self._handle_meta_llama_response(span, response_body)
elif "cohere.command" in model_id:
self._handle_cohere_command_response(span, response_body)
elif "ai21.jamba" in model_id:
self._handle_ai21_jamba_response(span, response_body)
elif "mistral" in model_id:
self._handle_mistral_mistral_response(span, response_body)

except json.JSONDecodeError:
print("Error: Unable to parse the response body as JSON")
Jeel-mehta marked this conversation as resolved.
Show resolved Hide resolved
except Exception as e: # pylint: disable=broad-exception-caught, invalid-name
print(f"Error processing response: {str(e)}")
finally:
# Make sure to close the stream
result["body"].close()

# pylint: disable=no-self-use
def _handle_amazon_titan_response(self, span: Span, response_body: Dict[str, Any]):
if "inputTextTokenCount" in response_body:
span.set_attribute(GEN_AI_USAGE_INPUT_TOKENS, response_body["inputTextTokenCount"])

result = response_body["results"][0]
Jeel-mehta marked this conversation as resolved.
Show resolved Hide resolved
if "tokenCount" in result:
span.set_attribute(GEN_AI_USAGE_OUTPUT_TOKENS, result["tokenCount"])
if "completionReason" in result:
span.set_attribute(GEN_AI_RESPONSE_FINISH_REASONS, [result["completionReason"]])

# pylint: disable=no-self-use
def _handle_anthropic_claude_response(self, span: Span, response_body: Dict[str, Any]):
if "usage" in response_body:
usage = response_body["usage"]
if "input_tokens" in usage:
span.set_attribute(GEN_AI_USAGE_INPUT_TOKENS, usage["input_tokens"])
if "output_tokens" in usage:
span.set_attribute(GEN_AI_USAGE_OUTPUT_TOKENS, usage["output_tokens"])
if "stop_reason" in response_body:
span.set_attribute(GEN_AI_RESPONSE_FINISH_REASONS, [response_body["stop_reason"]])

# pylint: disable=no-self-use
def _handle_cohere_command_response(self, span: Span, response_body: Dict[str, Any]):
# Output tokens: Approximate from the response text
if "text" in response_body:
span.set_attribute(GEN_AI_USAGE_OUTPUT_TOKENS, math.ceil(len(response_body["text"]) / 6))
if "finish_reason" in response_body:
span.set_attribute(GEN_AI_RESPONSE_FINISH_REASONS, [response_body["finish_reason"]])

# pylint: disable=no-self-use
def _handle_ai21_jamba_response(self, span: Span, response_body: Dict[str, Any]):
if "usage" in response_body:
usage = response_body["usage"]
if "prompt_tokens" in usage:
span.set_attribute(GEN_AI_USAGE_INPUT_TOKENS, usage["prompt_tokens"])
if "completion_tokens" in usage:
span.set_attribute(GEN_AI_USAGE_OUTPUT_TOKENS, usage["completion_tokens"])
if "choices" in response_body:
choices = response_body["choices"][0]
if "finish_reason" in choices:
span.set_attribute(GEN_AI_RESPONSE_FINISH_REASONS, [choices["finish_reason"]])

# pylint: disable=no-self-use
def _handle_meta_llama_response(self, span: Span, response_body: Dict[str, Any]):
if "prompt_token_count" in response_body:
span.set_attribute(GEN_AI_USAGE_INPUT_TOKENS, response_body["prompt_token_count"])
if "generation_token_count" in response_body:
span.set_attribute(GEN_AI_USAGE_OUTPUT_TOKENS, response_body["generation_token_count"])
if "stop_reason" in response_body:
span.set_attribute(GEN_AI_RESPONSE_FINISH_REASONS, [response_body["stop_reason"]])

# pylint: disable=no-self-use
def _handle_mistral_mistral_response(self, span: Span, response_body: Dict[str, Any]):
if "outputs" in response_body:
outputs = response_body["outputs"][0]
if "text" in outputs:
span.set_attribute(GEN_AI_USAGE_OUTPUT_TOKENS, math.ceil(len(outputs["text"]) / 6))
if "stop_reason" in outputs:
span.set_attribute(GEN_AI_RESPONSE_FINISH_REASONS, [outputs["stop_reason"]])
Loading
Loading