Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

botocore: add basic tracing for Bedrock InvokeModelWithResponseStream #3206

Merged
merged 3 commits into from
Jan 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#3200](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3200))
- `opentelemetry-instrumentation-botocore` Add basic support for GenAI attributes for AWS Bedrock ConverseStream API
  ([#3204](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3204))
- `opentelemetry-instrumentation-botocore` Add basic support for GenAI attributes for AWS Bedrock InvokeModelWithResponseStream API
  ([#3206](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3206))
- `opentelemetry-instrumentation-pymssql` Add pymssql instrumentation
([#394](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/394))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ Available examples
- `converse.py` uses `bedrock-runtime` `Converse API <https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_Converse.html>_`.
- `converse_stream.py` uses `bedrock-runtime` `ConverseStream API <https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_ConverseStream.html>_`.
- `invoke_model.py` uses `bedrock-runtime` `InvokeModel API <https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_InvokeModel.html>_`.
- `invoke_model_stream.py` uses `bedrock-runtime` `InvokeModelWithResponseStream API <https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_InvokeModelWithResponseStream.html>_`.

Setup
-----
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import json
import os

import boto3


def main():
    """Stream a completion for a short prompt from a Bedrock model.

    The model is chosen via the CHAT_MODEL environment variable (defaults
    to ``amazon.titan-text-lite-v1``). The request body format depends on
    the model family, so only the three families handled below are
    supported.

    Raises:
        ValueError: if CHAT_MODEL names an unsupported model family.
    """
    chat_model = os.getenv("CHAT_MODEL", "amazon.titan-text-lite-v1")
    prompt = "Write a short poem on OpenTelemetry."
    if "amazon.titan" in chat_model:
        body = {
            "inputText": prompt,
            "textGenerationConfig": {},
        }
    elif "amazon.nova" in chat_model:
        body = {
            "messages": [{"role": "user", "content": [{"text": prompt}]}],
            "schemaVersion": "messages-v1",
        }
    elif "anthropic.claude" in chat_model:
        body = {
            "messages": [
                {"role": "user", "content": [{"text": prompt, "type": "text"}]}
            ],
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 200,
        }
    else:
        # Fail loudly with context instead of a bare ValueError().
        raise ValueError(f"Unsupported CHAT_MODEL: {chat_model!r}")
    client = boto3.client("bedrock-runtime")
    response = client.invoke_model_with_response_stream(
        modelId=chat_model,
        body=json.dumps(body),
    )

    answer = ""
    for event in response["body"]:
        json_bytes = event.get("chunk", {}).get("bytes", b"")
        if not json_bytes:
            # Skip non-"chunk" events; json.loads("") would raise.
            continue
        decoded = json_bytes.decode("utf-8")
        chunk = json.loads(decoded)
        if "outputText" in chunk:
            # amazon.titan chunk shape
            answer += chunk["outputText"]
        elif "completion" in chunk:
            # anthropic.claude text-completions chunk shape
            answer += chunk["completion"]
        elif "contentBlockDelta" in chunk:
            # amazon.nova / messages-style chunk shape
            answer += chunk["contentBlockDelta"]["delta"]["text"]
    print(answer)


if __name__ == "__main__":
    main()
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

from opentelemetry.instrumentation.botocore.extensions.bedrock_utils import (
ConverseStreamWrapper,
InvokeModelWithResponseStreamWrapper,
)
from opentelemetry.instrumentation.botocore.extensions.types import (
_AttributeMapT,
Expand Down Expand Up @@ -66,8 +67,16 @@ class _BedrockRuntimeExtension(_AwsSdkExtension):
Amazon Bedrock Runtime</a>.
"""

_HANDLED_OPERATIONS = {"Converse", "ConverseStream", "InvokeModel"}
_DONT_CLOSE_SPAN_ON_END_OPERATIONS = {"ConverseStream"}
_HANDLED_OPERATIONS = {
"Converse",
"ConverseStream",
"InvokeModel",
"InvokeModelWithResponseStream",
}
_DONT_CLOSE_SPAN_ON_END_OPERATIONS = {
"ConverseStream",
"InvokeModelWithResponseStream",
}

def should_end_span_on_exit(self):
return (
Expand Down Expand Up @@ -288,6 +297,20 @@ def stream_done_callback(response):
# InvokeModel
if "body" in result and isinstance(result["body"], StreamingBody):
self._invoke_model_on_success(span, result, model_id)
return

# InvokeModelWithResponseStream
if "body" in result and isinstance(result["body"], EventStream):

def invoke_model_stream_done_callback(response):
# the callback gets data formatted as the simpler converse API
self._converse_on_success(span, response)
span.end()

result["body"] = InvokeModelWithResponseStreamWrapper(
result["body"], invoke_model_stream_done_callback, model_id
)
return

# pylint: disable=no-self-use
def _handle_amazon_titan_response(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

from __future__ import annotations

import json

from botocore.eventstream import EventStream
from wrapt import ObjectProxy

Expand Down Expand Up @@ -46,20 +48,21 @@ def __iter__(self):
def _process_event(self, event):
if "messageStart" in event:
# {'messageStart': {'role': 'assistant'}}
pass
return

if "contentBlockDelta" in event:
# {'contentBlockDelta': {'delta': {'text': "Hello"}, 'contentBlockIndex': 0}}
pass
return

if "contentBlockStop" in event:
# {'contentBlockStop': {'contentBlockIndex': 0}}
pass
return

if "messageStop" in event:
# {'messageStop': {'stopReason': 'end_turn'}}
if stop_reason := event["messageStop"].get("stopReason"):
self._response["stopReason"] = stop_reason
return

if "metadata" in event:
# {'metadata': {'usage': {'inputTokens': 12, 'outputTokens': 15, 'totalTokens': 27}, 'metrics': {'latencyMs': 2980}}}
Expand All @@ -72,3 +75,136 @@ def _process_event(self, event):
self._response["usage"]["outputTokens"] = output_tokens

self._stream_done_callback(self._response)
return


# pylint: disable=abstract-method
class InvokeModelWithResponseStreamWrapper(ObjectProxy):
    """Wrapper for botocore.eventstream.EventStream

    Transparently proxies the body stream of a Bedrock
    InvokeModelWithResponseStream response, inspecting each event as the
    caller consumes it. Token usage and stop reason are accumulated in the
    simpler Converse API shape and handed to ``stream_done_callback`` once
    the model signals the end of the stream.
    """

    def __init__(
        self,
        stream: EventStream,
        stream_done_callback,
        model_id: str,
    ):
        super().__init__(stream)

        # Called once with the accumulated Converse-shaped response dict
        # when the model reports the stream is done.
        self._stream_done_callback = stream_done_callback
        # Used to select the vendor-specific chunk parser below.
        self._model_id = model_id

        # accumulating things in the same shape of the Converse API
        # {"usage": {"inputTokens": 0, "outputTokens": 0}, "stopReason": "finish"}
        self._response = {}

    def __iter__(self):
        # Observe each event without disturbing the caller's iteration.
        for event in self.__wrapped__:
            self._process_event(event)
            yield event

    def _process_event(self, event):
        """Decode one stream event and dispatch to a model-family parser."""
        if "chunk" not in event:
            return

        json_bytes = event["chunk"].get("bytes", b"")
        decoded = json_bytes.decode("utf-8")
        try:
            chunk = json.loads(decoded)
        except json.JSONDecodeError:
            # Ignore undecodable payloads rather than break iteration.
            return

        # Dispatch on model id substring; unknown families are ignored.
        if "amazon.titan" in self._model_id:
            self._process_amazon_titan_chunk(chunk)
        elif "amazon.nova" in self._model_id:
            self._process_amazon_nova_chunk(chunk)
        elif "anthropic.claude" in self._model_id:
            self._process_anthropic_claude_chunk(chunk)

    def _process_invocation_metrics(self, invocation_metrics):
        """Copy token counts from bedrock invocation metrics into the
        Converse-shaped ``usage`` accumulator."""
        self._response["usage"] = {}
        if input_tokens := invocation_metrics.get("inputTokenCount"):
            self._response["usage"]["inputTokens"] = input_tokens

        if output_tokens := invocation_metrics.get("outputTokenCount"):
            self._response["usage"]["outputTokens"] = output_tokens

    def _process_amazon_titan_chunk(self, chunk):
        """Handle a Titan chunk; the metrics chunk ends the stream."""
        if (stop_reason := chunk.get("completionReason")) is not None:
            self._response["stopReason"] = stop_reason

        if invocation_metrics := chunk.get("amazon-bedrock-invocationMetrics"):
            # "amazon-bedrock-invocationMetrics":{
            #   "inputTokenCount":9,"outputTokenCount":128,"invocationLatency":3569,"firstByteLatency":2180
            # }
            # Metrics arrive on the final chunk, so report the span now.
            self._process_invocation_metrics(invocation_metrics)
            self._stream_done_callback(self._response)

    def _process_amazon_nova_chunk(self, chunk):
        """Handle a Nova chunk; the metadata chunk ends the stream."""
        if "messageStart" in chunk:
            # {'messageStart': {'role': 'assistant'}}
            return

        if "contentBlockDelta" in chunk:
            # {'contentBlockDelta': {'delta': {'text': "Hello"}, 'contentBlockIndex': 0}}
            return

        if "contentBlockStop" in chunk:
            # {'contentBlockStop': {'contentBlockIndex': 0}}
            return

        if "messageStop" in chunk:
            # {'messageStop': {'stopReason': 'end_turn'}}
            if stop_reason := chunk["messageStop"].get("stopReason"):
                self._response["stopReason"] = stop_reason
            return

        if "metadata" in chunk:
            # {'metadata': {'usage': {'inputTokens': 8, 'outputTokens': 117}, 'metrics': {}, 'trace': {}}}
            # Metadata is the terminal chunk, so report the span now.
            if usage := chunk["metadata"].get("usage"):
                self._response["usage"] = {}
                if input_tokens := usage.get("inputTokens"):
                    self._response["usage"]["inputTokens"] = input_tokens

                if output_tokens := usage.get("outputTokens"):
                    self._response["usage"]["outputTokens"] = output_tokens

            self._stream_done_callback(self._response)
            return

    def _process_anthropic_claude_chunk(self, chunk):
        """Handle a Claude messages-API chunk; ``message_stop`` ends the stream."""
        # pylint: disable=too-many-return-statements
        if not (message_type := chunk.get("type")):
            return

        if message_type == "message_start":
            # {'type': 'message_start', 'message': {'id': 'id', 'type': 'message', 'role': 'assistant', 'model': 'claude-2.0', 'content': [], 'stop_reason': None, 'stop_sequence': None, 'usage': {'input_tokens': 18, 'output_tokens': 1}}}
            return

        if message_type == "content_block_start":
            # {'type': 'content_block_start', 'index': 0, 'content_block': {'type': 'text', 'text': ''}}
            return

        if message_type == "content_block_delta":
            # {'type': 'content_block_delta', 'index': 0, 'delta': {'type': 'text_delta', 'text': 'Here'}}
            return

        if message_type == "content_block_stop":
            # {'type': 'content_block_stop', 'index': 0}
            return

        if message_type == "message_delta":
            # {'type': 'message_delta', 'delta': {'stop_reason': 'end_turn', 'stop_sequence': None}, 'usage': {'output_tokens': 123}}
            if (
                stop_reason := chunk.get("delta", {}).get("stop_reason")
            ) is not None:
                self._response["stopReason"] = stop_reason
            return

        if message_type == "message_stop":
            # {'type': 'message_stop', 'amazon-bedrock-invocationMetrics': {'inputTokenCount': 18, 'outputTokenCount': 123, 'invocationLatency': 5250, 'firstByteLatency': 290}}
            # Terminal chunk: collect metrics (if present) and report the span.
            if invocation_metrics := chunk.get(
                "amazon-bedrock-invocationMetrics"
            ):
                self._process_invocation_metrics(invocation_metrics)
            self._stream_done_callback(self._response)
            return
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ def assert_converse_completion_attributes(
)


def assert_converse_stream_completion_attributes(
def assert_stream_completion_attributes(
span: ReadableSpan,
request_model: str,
input_tokens: int | None = None,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
interactions:
- request:
body: null
headers:
Content-Length:
- '0'
User-Agent:
- !!binary |
Qm90bzMvMS4zNS41NiBtZC9Cb3RvY29yZSMxLjM1LjU2IHVhLzIuMCBvcy9saW51eCM2LjEuMC0x
MDM0LW9lbSBtZC9hcmNoI3g4Nl82NCBsYW5nL3B5dGhvbiMzLjEwLjEyIG1kL3B5aW1wbCNDUHl0
aG9uIGNmZy9yZXRyeS1tb2RlI2xlZ2FjeSBCb3RvY29yZS8xLjM1LjU2
X-Amz-Date:
- !!binary |
MjAyNTAxMjRUMTM0NDM5Wg==
X-Amz-Security-Token:
- test_aws_security_token
X-Amzn-Trace-Id:
- !!binary |
Um9vdD0xLTFlMjljM2Y1LTU2MzZhOWI4MmViYTYxOTFiOTcwOTI2YTtQYXJlbnQ9NzA1NzBlZjUy
YzJkZjliYjtTYW1wbGVkPTE=
amz-sdk-invocation-id:
- !!binary |
ZDg2MjFlMzAtNTk3Yi00ZWM3LWJlNGEtMThkMDQwZTRhMzcw
amz-sdk-request:
- !!binary |
YXR0ZW1wdD0x
authorization:
- Bearer test_aws_authorization
method: POST
uri: https://bedrock-runtime.us-east-1.amazonaws.com/model/does-not-exist/invoke-with-response-stream
response:
body:
string: '{"message":"The provided model identifier is invalid."}'
headers:
Connection:
- keep-alive
Content-Length:
- '55'
Content-Type:
- application/json
Date:
- Fri, 24 Jan 2025 13:44:40 GMT
Set-Cookie: test_set_cookie
x-amzn-ErrorType:
- ValidationException:http://internal.amazon.com/coral/com.amazon.bedrock/
x-amzn-RequestId:
- 6460a108-875d-4e26-bcdf-f03c4c815f74
status:
code: 400
message: Bad Request
version: 1
Loading
Loading