fix(botocore): ensure streamed bedrock spans finish [backport #11499 to 2.16] (#11509)

Backport #11499 to 2.16.

Fixes #11295.

There was an issue between the langchain-aws library and our bedrock
integration: langchain-aws' ChatBedrock class was triggering a
GeneratorExit because [this
code](https://github.com/langchain-ai/langchain/blob/f173b72e35979b842933774c9c4568c329a0ae8a/libs/core/langchain_core/language_models/chat_models.py#L88-L90)
implicitly calls close(stream) under the hood in Python. This meant
that our [post-processing
code](https://github.com/DataDog/dd-trace-py/blob/a664aab0a9d53e8dbb874d68933613de4cbecb1d/ddtrace/contrib/internal/botocore/services/bedrock.py#L84-L94)
was never reached. Since GeneratorExit does not inherit from
Exception (it inherits from BaseException), it was not caught by our
`except Exception` block, so spans never went through either of our
post-processing (success or error) paths.
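
For illustration, a minimal standalone sketch (hypothetical generator, not the actual integration code) of the behavior: calling `close()` on a generator raises `GeneratorExit` at the suspended `yield`, and because `GeneratorExit` derives from `BaseException` rather than `Exception`, an `except Exception` handler never runs.

```python
def traced_stream():
    try:
        for chunk in range(10):
            yield chunk
        print("success post-processing")  # never reached if the consumer stops early
    except Exception:
        print("error post-processing")    # GeneratorExit is NOT caught here
        raise


gen = traced_stream()
next(gen)    # consume a single chunk
gen.close()  # raises GeneratorExit inside the generator; neither print() runs
```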

The solution is to move the post-processing code into a finally block
to ensure that spans are always finished. Note that a GeneratorExit is
not indicative of an actual user/system error and will not be flagged
as such: spans will not be marked with an error, and post-processing
will simply include only the information available up to the last
yielded chunk.
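
As a minimal sketch of the pattern (`FakeSpan` is a stand-in for illustration, not the real tracer API), the `finally` block runs on normal exhaustion, on genuine exceptions, and on an early `close()`, so the span is always finished:

```python
class FakeSpan:
    """Stand-in for a tracer span, for illustration only."""

    def __init__(self):
        self.error = False
        self.finished = False

    def finish(self):
        self.finished = True


def traced_stream(span, chunks):
    exception_raised = False
    try:
        for chunk in chunks:
            yield chunk
    except Exception:
        span.error = True  # genuine errors are still flagged
        exception_raised = True
        raise
    finally:
        if not exception_raised:
            # success-path post-processing would go here, using whatever
            # chunks were yielded so far
            pass
        span.finish()  # always runs, even on GeneratorExit


span = FakeSpan()
stream = traced_stream(span, range(10))
next(stream)    # consume part of the stream
stream.close()  # GeneratorExit: the except block is skipped...
assert span.finished and not span.error  # ...but finally still finishes the span
```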

## Checklist
- [x] PR author has checked that all the criteria below are met
- The PR description includes an overview of the change
- The PR description articulates the motivation for the change
- The change includes tests OR the PR description describes a testing
strategy
- The PR description notes risks associated with the change, if any
- Newly-added code is easy to change
- The change follows the [library release note
guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html)
- The change includes or references documentation updates if necessary
- Backport labels are set (if
[applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting))

## Reviewer Checklist
- [x] Reviewer has checked that all the criteria below are met 
- Title is accurate
- All changes are related to the pull request's stated goal
- Avoids breaking
[API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces)
changes
- Testing strategy adequately addresses listed risks
- Newly-added code is easy to change
- Release note makes sense to a user of the library
- If necessary, author has acknowledged and discussed the performance
implications of this PR as reported in the benchmarks PR comment
- Backport labels are set in a manner that is consistent with the
[release branch maintenance
policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)
Yun-Kim authored Nov 22, 2024
1 parent 09a4236 commit c5f71db
Showing 5 changed files with 39 additions and 100 deletions.
11 changes: 8 additions & 3 deletions ddtrace/contrib/internal/botocore/services/bedrock.py
@@ -77,10 +77,18 @@ def readlines(self):

def __iter__(self):
"""Wraps around method to tags the response data and finish the span as the user consumes the stream."""
exception_raised = False
try:
for line in self.__wrapped__:
self._body.append(json.loads(line["chunk"]["bytes"]))
yield line
except Exception:
core.dispatch("botocore.patched_bedrock_api_call.exception", [self._execution_ctx, sys.exc_info()])
exception_raised = True
raise
finally:
if exception_raised:
return
metadata = _extract_streamed_response_metadata(self._execution_ctx, self._body)
formatted_response = _extract_streamed_response(self._execution_ctx, self._body)
model_provider = self._execution_ctx["model_provider"]
@@ -92,9 +100,6 @@ def __iter__(self):
"botocore.bedrock.process_response",
[self._execution_ctx, formatted_response, metadata, self._body, should_set_choice_ids],
)
except Exception:
core.dispatch("botocore.patched_bedrock_api_call.exception", [self._execution_ctx, sys.exc_info()])
raise


def _extract_request_params(params: Dict[str, Any], provider: str) -> Dict[str, Any]:
@@ -0,0 +1,6 @@
---
fixes:
- |
botocore: This fix resolves an issue in the Bedrock integration where not consuming the full response stream would prevent spans from finishing.
- |
LLM Observability: This fix ensures bedrock spans are finished even when streamed responses are not fully consumed.
39 changes: 25 additions & 14 deletions tests/contrib/botocore/test_bedrock.py
@@ -38,6 +38,14 @@ def aws_credentials():
os.environ["AWS_DEFAULT_REGION"] = "us-east-1"


@pytest.fixture
def mock_tracer(ddtrace_global_config, bedrock_client):
pin = Pin.get_from(bedrock_client)
mock_tracer = DummyTracer(writer=DummyWriter(trace_flush_enabled=False))
pin.override(bedrock_client, tracer=mock_tracer)
yield mock_tracer


@pytest.fixture
def boto3(aws_credentials, mock_llmobs_span_writer, ddtrace_global_config):
global_config = {"_dd_api_key": "<not-a-real-api_key>"}
@@ -315,20 +323,6 @@ def test_read_error(bedrock_client, request_vcr):
response.get("body").read()


@pytest.mark.snapshot(ignores=["meta.error.stack"])
def test_read_stream_error(bedrock_client, request_vcr):
body, model = json.dumps(_REQUEST_BODIES["meta"]), _MODELS["meta"]
with request_vcr.use_cassette("meta_invoke_stream.yaml"):
response = bedrock_client.invoke_model_with_response_stream(body=body, modelId=model)
with mock.patch(
"ddtrace.contrib.internal.botocore.services.bedrock._extract_streamed_response"
) as mock_extract_response:
mock_extract_response.side_effect = Exception("test")
with pytest.raises(Exception):
for _ in response.get("body"):
pass


@pytest.mark.snapshot(ignores=["meta.error.stack"])
def test_readlines_error(bedrock_client, request_vcr):
body, model = json.dumps(_REQUEST_BODIES["meta"]), _MODELS["meta"]
@@ -358,3 +352,20 @@ def test_cohere_embedding(bedrock_client, request_vcr):
with request_vcr.use_cassette("cohere_embedding.yaml"):
response = bedrock_client.invoke_model(body=body, modelId=model)
json.loads(response.get("body").read())


def test_span_finishes_after_generator_exit(bedrock_client, request_vcr, mock_tracer):
body, model = json.dumps(_REQUEST_BODIES["anthropic_message"]), _MODELS["anthropic_message"]
with request_vcr.use_cassette("anthropic_message_invoke_stream.yaml"):
response = bedrock_client.invoke_model_with_response_stream(body=body, modelId=model)
i = 0
with pytest.raises(GeneratorExit):
for _ in response.get("body"):
if i >= 6:
raise GeneratorExit
i += 1
span = mock_tracer.pop_traces()[0][0]
assert span is not None
assert span.name == "bedrock-runtime.command"
assert span.resource == "InvokeModelWithResponseStream"
assert span.get_tag("bedrock.response.choices.0.text").startswith("Hobb")

This file was deleted.

This file was deleted.
