Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gen-AI python implementation #290

Merged
merged 17 commits into from
Nov 22, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,12 @@
# TODO: Use Semantic Conventions once upgrade to 0.47b0
GEN_AI_REQUEST_MODEL: str = "gen_ai.request.model"
GEN_AI_SYSTEM: str = "gen_ai.system"

GEN_AI_REQUEST_MAX_TOKENS: str = "gen_ai.request.max_tokens"
GEN_AI_REQUEST_TEMPERATURE: str = "gen_ai.request.temperature"
GEN_AI_REQUEST_TOP_P: str = "gen_ai.request.top_p"
GEN_AI_RESPONSE_FINISH_REASONS: str = "gen_ai.response.finish_reasons"
GEN_AI_USAGE_INPUT_TOKENS: str = "gen_ai.usage.input_tokens"
GEN_AI_USAGE_OUTPUT_TOKENS: str = "gen_ai.usage.output_tokens"

# Get dialect keywords retrieved from dialect_keywords.json file.
# Only meant to be invoked by SQL_KEYWORD_PATTERN and unit tests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
# SPDX-License-Identifier: Apache-2.0
import abc
import inspect
from typing import Dict, Optional
from typing import Any, Dict, Optional
import json
from botocore.response import StreamingBody
import math

from amazon.opentelemetry.distro._aws_attribute_keys import (
AWS_BEDROCK_AGENT_ID,
Expand All @@ -11,7 +14,7 @@
AWS_BEDROCK_GUARDRAIL_ID,
AWS_BEDROCK_KNOWLEDGE_BASE_ID,
)
from amazon.opentelemetry.distro._aws_span_processing_util import GEN_AI_REQUEST_MODEL, GEN_AI_SYSTEM
from amazon.opentelemetry.distro._aws_span_processing_util import GEN_AI_REQUEST_MODEL, GEN_AI_SYSTEM, GEN_AI_REQUEST_MAX_TOKENS, GEN_AI_REQUEST_TEMPERATURE, GEN_AI_REQUEST_TOP_P, GEN_AI_RESPONSE_FINISH_REASONS, GEN_AI_USAGE_INPUT_TOKENS, GEN_AI_USAGE_OUTPUT_TOKENS
from opentelemetry.instrumentation.botocore.extensions.types import (
_AttributeMapT,
_AwsSdkCallContext,
Expand Down Expand Up @@ -238,5 +241,164 @@ def extract_attributes(self, attributes: _AttributeMapT):
attributes[GEN_AI_SYSTEM] = _AWS_BEDROCK_SYSTEM

model_id = self._call_context.params.get(_MODEL_ID)
#attributes["Testing"]= "Test"
if model_id:
attributes[GEN_AI_REQUEST_MODEL] = model_id
attributes[GEN_AI_REQUEST_MODEL] = model_id

# Get the request body if it exists
body = self._call_context.params.get('body')
#print("This is the body :",body)
if body:
try:
request_body = json.loads(body)

if 'amazon.titan' in model_id:
self._extract_titan_attributes(attributes, request_body)
elif 'anthropic.claude' in model_id:
self._extract_claude_attributes(attributes, request_body)
elif 'meta.llama' in model_id:
self._extract_llama_attributes(attributes, request_body)
elif 'cohere.command' in model_id:
self._extract_cohere_attributes(attributes, request_body)
elif 'ai21.jamba' in model_id:
self._extract_ai21_attributes(attributes, request_body)
elif 'mistral' in model_id:
self._extract_mistral_attributes(attributes, request_body)

except json.JSONDecodeError:
print("Error: Unable to parse the body as JSON")
Jeel-mehta marked this conversation as resolved.
Show resolved Hide resolved
def _extract_titan_attributes(self, attributes, request_body):
config = request_body.get('textGenerationConfig', {})
self._set_if_not_none(attributes, GEN_AI_REQUEST_TEMPERATURE, config.get('temperature'))
self._set_if_not_none(attributes, GEN_AI_REQUEST_TOP_P, config.get('topP'))
self._set_if_not_none(attributes, GEN_AI_REQUEST_MAX_TOKENS, config.get('maxTokenCount'))

def _extract_claude_attributes(self, attributes, request_body):
self._set_if_not_none(attributes, GEN_AI_REQUEST_MAX_TOKENS, request_body.get('max_tokens'))
self._set_if_not_none(attributes, GEN_AI_REQUEST_TEMPERATURE, request_body.get('temperature'))
self._set_if_not_none(attributes, GEN_AI_REQUEST_TOP_P, request_body.get('top_p'))

def _extract_cohere_attributes(self, attributes, request_body):
self._set_if_not_none(attributes, GEN_AI_REQUEST_MAX_TOKENS, request_body.get('max_tokens'))
self._set_if_not_none(attributes, GEN_AI_REQUEST_TEMPERATURE, request_body.get('temperature'))
self._set_if_not_none(attributes, GEN_AI_REQUEST_TOP_P, request_body.get('p'))

def _extract_ai21_attributes(self, attributes, request_body):
self._set_if_not_none(attributes, GEN_AI_REQUEST_MAX_TOKENS, request_body.get('max_tokens'))
self._set_if_not_none(attributes, GEN_AI_REQUEST_TEMPERATURE, request_body.get('temperature'))
self._set_if_not_none(attributes, GEN_AI_REQUEST_TOP_P, request_body.get('top_p'))

def _extract_llama_attributes(self, attributes, request_body):
self._set_if_not_none(attributes, GEN_AI_REQUEST_MAX_TOKENS, request_body.get('max_gen_len'))
self._set_if_not_none(attributes, GEN_AI_REQUEST_TEMPERATURE, request_body.get('temperature'))
self._set_if_not_none(attributes, GEN_AI_REQUEST_TOP_P, request_body.get('top_p'))

def _extract_mistral_attributes(self, attributes, request_body):
prompt = request_body.get('prompt')
if prompt:
attributes[GEN_AI_USAGE_INPUT_TOKENS] = math.ceil(len(prompt) / 6)
self._set_if_not_none(attributes, GEN_AI_REQUEST_MAX_TOKENS, request_body.get('max_tokens'))
self._set_if_not_none(attributes, GEN_AI_REQUEST_TEMPERATURE, request_body.get('temperature'))
self._set_if_not_none(attributes, GEN_AI_REQUEST_TOP_P, request_body.get('top_p'))

@staticmethod
def _set_if_not_none(attributes, key, value):
if value is not None:
attributes[key] = value

def on_success(self, span: Span, result: Dict[str, Any]):
super().on_success(span, result)
Jeel-mehta marked this conversation as resolved.
Show resolved Hide resolved

model_id = self._call_context.params.get(_MODEL_ID)
if not model_id:
return

if 'body' in result and isinstance(result['body'], StreamingBody):
try:
# Read the entire content of the StreamingBody
body_content = result['body'].read()
# Decode the bytes to string and parse as JSON
response_body = json.loads(body_content.decode('utf-8'))

if 'amazon.titan' in model_id:
self._handle_amazon_titan_response(span, response_body)
elif 'anthropic.claude' in model_id:
self._handle_anthropic_claude_response(span, response_body)
elif 'meta.llama' in model_id:
self._handle_meta_llama_response(span, response_body)
elif 'cohere.command' in model_id:
self._handle_cohere_command_response(span, response_body)
elif 'ai21.jamba' in model_id:
self._handle_ai21_jamba_response(span, response_body)
elif 'mistral' in model_id:
self._handle_mistral_mistral_response(span, response_body)

except json.JSONDecodeError:
print("Error: Unable to parse the response body as JSON")
Jeel-mehta marked this conversation as resolved.
Show resolved Hide resolved
except Exception as e:
print(f"Error processing response: {str(e)}")
finally:
# Make sure to close the stream
result['body'].close()

def _handle_amazon_titan_response(self, span: Span, response_body: Dict[str, Any]):
if 'inputTextTokenCount' in response_body:
span.set_attribute(GEN_AI_USAGE_INPUT_TOKENS, response_body['inputTextTokenCount'])

result = response_body['results'][0]
if 'tokenCount' in result:
span.set_attribute(GEN_AI_USAGE_OUTPUT_TOKENS, result['tokenCount'])
if 'completionReason' in result:
span.set_attribute(GEN_AI_RESPONSE_FINISH_REASONS, [result['completionReason']])

def _handle_anthropic_claude_response(self, span: Span, response_body: Dict[str, Any]):
if 'usage' in response_body:
usage = response_body['usage']
if 'input_tokens' in usage:
span.set_attribute(GEN_AI_USAGE_INPUT_TOKENS, usage['input_tokens'])
if 'output_tokens' in usage:
span.set_attribute(GEN_AI_USAGE_OUTPUT_TOKENS, usage['output_tokens'])
if 'stop_reason' in response_body:
span.set_attribute(GEN_AI_RESPONSE_FINISH_REASONS, [response_body['stop_reason']])

def _handle_cohere_command_response(self, span: Span, response_body: Dict[str, Any]):
# Input tokens: Approximate from the user's message in chat history
if 'chat_history' in response_body:
user_messages = [msg['message'] for msg in response_body['chat_history'] if msg['role'] == 'USER']
input_text = ' '.join(user_messages)
span.set_attribute(GEN_AI_USAGE_INPUT_TOKENS, math.ceil(len(input_text) / 6))
# Output tokens: Approximate from the response text
if 'text' in response_body:
span.set_attribute(GEN_AI_USAGE_OUTPUT_TOKENS, math.ceil(len(response_body['text']) / 6))
if 'finish_reason' in response_body:
span.set_attribute(GEN_AI_RESPONSE_FINISH_REASONS, [response_body['finish_reason']])

def _handle_ai21_jamba_response(self, span: Span, response_body: Dict[str, Any]):
print("This is the response body :", response_body)
if 'usage' in response_body:
usage = response_body['usage']
if 'prompt_tokens' in usage:
span.set_attribute(GEN_AI_USAGE_INPUT_TOKENS, usage['prompt_tokens'])
if 'completion_tokens' in usage:
span.set_attribute(GEN_AI_USAGE_OUTPUT_TOKENS, usage['completion_tokens'])
if 'choices' in response_body:
choices = response_body['choices'][0]
if 'finish_reason' in choices:
span.set_attribute(GEN_AI_RESPONSE_FINISH_REASONS, [choices['finish_reason']])

def _handle_meta_llama_response(self, span: Span, response_body: Dict[str, Any]):
#print("This is the response body :", response_body)
if 'prompt_token_count' in response_body:
span.set_attribute(GEN_AI_USAGE_INPUT_TOKENS, response_body['prompt_token_count'])
if 'generation_token_count' in response_body:
span.set_attribute(GEN_AI_USAGE_OUTPUT_TOKENS, response_body['generation_token_count'])
if 'stop_reason' in response_body:
span.set_attribute(GEN_AI_RESPONSE_FINISH_REASONS, response_body['stop_reason'])
Jeel-mehta marked this conversation as resolved.
Show resolved Hide resolved

def _handle_mistral_mistral_response(self, span: Span, response_body: Dict[str, Any]):
if "outputs" in response_body:
outputs = response_body["outputs"][0]
if "text" in outputs:
span.set_attribute(GEN_AI_USAGE_OUTPUT_TOKENS, math.ceil(len(outputs["text"]) / 6))
if 'stop_reason' in outputs:
span.set_attribute(GEN_AI_RESPONSE_FINISH_REASONS, [outputs['stop_reason']])
Loading