Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(botocore): ensure streamed bedrock spans finish [backport #11499 to 2.16] #11509

Merged
merged 1 commit into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions ddtrace/contrib/internal/botocore/services/bedrock.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,18 @@ def readlines(self):

def __iter__(self):
"""Wraps the underlying method to tag the response data and finish the span as the user consumes the stream."""
exception_raised = False
try:
for line in self.__wrapped__:
self._body.append(json.loads(line["chunk"]["bytes"]))
yield line
except Exception:
core.dispatch("botocore.patched_bedrock_api_call.exception", [self._execution_ctx, sys.exc_info()])
exception_raised = True
raise
finally:
if exception_raised:
return
metadata = _extract_streamed_response_metadata(self._execution_ctx, self._body)
formatted_response = _extract_streamed_response(self._execution_ctx, self._body)
model_provider = self._execution_ctx["model_provider"]
Expand All @@ -92,9 +100,6 @@ def __iter__(self):
"botocore.bedrock.process_response",
[self._execution_ctx, formatted_response, metadata, self._body, should_set_choice_ids],
)
except Exception:
core.dispatch("botocore.patched_bedrock_api_call.exception", [self._execution_ctx, sys.exc_info()])
raise


def _extract_request_params(params: Dict[str, Any], provider: str) -> Dict[str, Any]:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
fixes:
- |
botocore: This fix resolves an issue in the Bedrock integration where not consuming the full response stream would prevent spans from finishing.
- |
LLM Observability: This fix ensures bedrock spans are finished even when streamed responses are not fully consumed.
39 changes: 25 additions & 14 deletions tests/contrib/botocore/test_bedrock.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ def aws_credentials():
os.environ["AWS_DEFAULT_REGION"] = "us-east-1"


@pytest.fixture
def mock_tracer(ddtrace_global_config, bedrock_client):
    """Yield a DummyTracer wired onto the bedrock client's Pin.

    The dummy writer has trace flushing disabled so finished spans stay
    queued and can be inspected with ``pop_traces()`` in tests.
    """
    tracer = DummyTracer(writer=DummyWriter(trace_flush_enabled=False))
    Pin.get_from(bedrock_client).override(bedrock_client, tracer=tracer)
    yield tracer


@pytest.fixture
def boto3(aws_credentials, mock_llmobs_span_writer, ddtrace_global_config):
global_config = {"_dd_api_key": "<not-a-real-api_key>"}
Expand Down Expand Up @@ -315,20 +323,6 @@ def test_read_error(bedrock_client, request_vcr):
response.get("body").read()


@pytest.mark.snapshot(ignores=["meta.error.stack"])
def test_read_stream_error(bedrock_client, request_vcr):
body, model = json.dumps(_REQUEST_BODIES["meta"]), _MODELS["meta"]
with request_vcr.use_cassette("meta_invoke_stream.yaml"):
response = bedrock_client.invoke_model_with_response_stream(body=body, modelId=model)
with mock.patch(
"ddtrace.contrib.internal.botocore.services.bedrock._extract_streamed_response"
) as mock_extract_response:
mock_extract_response.side_effect = Exception("test")
with pytest.raises(Exception):
for _ in response.get("body"):
pass


@pytest.mark.snapshot(ignores=["meta.error.stack"])
def test_readlines_error(bedrock_client, request_vcr):
body, model = json.dumps(_REQUEST_BODIES["meta"]), _MODELS["meta"]
Expand Down Expand Up @@ -358,3 +352,20 @@ def test_cohere_embedding(bedrock_client, request_vcr):
with request_vcr.use_cassette("cohere_embedding.yaml"):
response = bedrock_client.invoke_model(body=body, modelId=model)
json.loads(response.get("body").read())


def test_span_finishes_after_generator_exit(bedrock_client, request_vcr, mock_tracer):
    """The bedrock span must be finished even when the streamed response is
    abandoned part-way through consumption (here: a GeneratorExit raised on
    the seventh chunk)."""
    body, model = json.dumps(_REQUEST_BODIES["anthropic_message"]), _MODELS["anthropic_message"]
    with request_vcr.use_cassette("anthropic_message_invoke_stream.yaml"):
        response = bedrock_client.invoke_model_with_response_stream(body=body, modelId=model)
        with pytest.raises(GeneratorExit):
            # Consume only the first seven chunks, then bail out of the stream.
            for chunk_index, _ in enumerate(response.get("body")):
                if chunk_index >= 6:
                    raise GeneratorExit
    span = mock_tracer.pop_traces()[0][0]
    assert span is not None
    assert span.name == "bedrock-runtime.command"
    assert span.resource == "InvokeModelWithResponseStream"
    assert span.get_tag("bedrock.response.choices.0.text").startswith("Hobb")

This file was deleted.

This file was deleted.

Loading