From c5f71db23ecbeb841c2cf0115c0f41b879a0f7d5 Mon Sep 17 00:00:00 2001 From: Yun Kim <35776586+Yun-Kim@users.noreply.github.com> Date: Fri, 22 Nov 2024 14:22:55 -0500 Subject: [PATCH] fix(botocore): ensure streamed bedrock spans finish [backport #11499 to 2.16] (#11509) Backport #11499 to 2.16. Fixes #11295. There was an issue with the langchain-aws library and our bedrock integration in that langchain-aws' ChatBedrock class was actually resulting in a GeneratorExit because of [this code](https://github.com/langchain-ai/langchain/blob/f173b72e35979b842933774c9c4568c329a0ae8a/libs/core/langchain_core/language_models/chat_models.py#L88-L90) implicitly calling close(stream) under the hood in Python. This means that any [post-processing code](https://github.com/DataDog/dd-trace-py/blob/a664aab0a9d53e8dbb874d68933613de4cbecb1d/ddtrace/contrib/internal/botocore/services/bedrock.py#L84-L94) was actually never reached. Since GeneratorExits do not inherit from Exception (they actually inherit from BaseException), this was not caught in our `except Exception` block, meaning spans never went through either of our post-processing (success or error) code paths. The solution is to move the post-processing code into a finally block, to ensure that spans will always be finished. 
Note that GeneratorExits are not indicative of actual user/system errors and will not be flagged as such (spans will not be marked with error, post processing will simply include only information available until the last yielded chunk) ## Checklist - [x] PR author has checked that all the criteria below are met - The PR description includes an overview of the change - The PR description articulates the motivation for the change - The change includes tests OR the PR description describes a testing strategy - The PR description notes risks associated with the change, if any - Newly-added code is easy to change - The change follows the [library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html) - The change includes or references documentation updates if necessary - Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)) ## Reviewer Checklist - [x] Reviewer has checked that all the criteria below are met - Title is accurate - All changes are related to the pull request's stated goal - Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes - Testing strategy adequately addresses listed risks - Newly-added code is easy to change - Release note makes sense to a user of the library - If necessary, author has acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment - Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting) --- .../internal/botocore/services/bedrock.py | 11 +++-- ...ck-early-stream-exit-81da39d97fb1b26e.yaml | 6 +++ tests/contrib/botocore/test_bedrock.py | 39 ++++++++++------ ..._cohere_invoke_stream_multiple_output.json | 44 ------------------- ...e.test_bedrock.test_read_stream_error.json | 39 ---------------- 5 files changed, 39 insertions(+), 100 
deletions(-) create mode 100644 releasenotes/notes/fix-bedrock-early-stream-exit-81da39d97fb1b26e.yaml delete mode 100644 tests/snapshots/tests.contrib.botocore.test_bedrock.test_cohere_invoke_stream_multiple_output.json delete mode 100644 tests/snapshots/tests.contrib.botocore.test_bedrock.test_read_stream_error.json diff --git a/ddtrace/contrib/internal/botocore/services/bedrock.py b/ddtrace/contrib/internal/botocore/services/bedrock.py index dba11d1d450..7c5f26b07a5 100644 --- a/ddtrace/contrib/internal/botocore/services/bedrock.py +++ b/ddtrace/contrib/internal/botocore/services/bedrock.py @@ -77,10 +77,18 @@ def readlines(self): def __iter__(self): """Wraps around method to tags the response data and finish the span as the user consumes the stream.""" + exception_raised = False try: for line in self.__wrapped__: self._body.append(json.loads(line["chunk"]["bytes"])) yield line + except Exception: + core.dispatch("botocore.patched_bedrock_api_call.exception", [self._execution_ctx, sys.exc_info()]) + exception_raised = True + raise + finally: + if exception_raised: + return metadata = _extract_streamed_response_metadata(self._execution_ctx, self._body) formatted_response = _extract_streamed_response(self._execution_ctx, self._body) model_provider = self._execution_ctx["model_provider"] @@ -92,9 +100,6 @@ def __iter__(self): "botocore.bedrock.process_response", [self._execution_ctx, formatted_response, metadata, self._body, should_set_choice_ids], ) - except Exception: - core.dispatch("botocore.patched_bedrock_api_call.exception", [self._execution_ctx, sys.exc_info()]) - raise def _extract_request_params(params: Dict[str, Any], provider: str) -> Dict[str, Any]: diff --git a/releasenotes/notes/fix-bedrock-early-stream-exit-81da39d97fb1b26e.yaml b/releasenotes/notes/fix-bedrock-early-stream-exit-81da39d97fb1b26e.yaml new file mode 100644 index 00000000000..8447cce8f65 --- /dev/null +++ b/releasenotes/notes/fix-bedrock-early-stream-exit-81da39d97fb1b26e.yaml @@ -0,0 
+1,6 @@ +--- +fixes: + - | + botocore: This fix resolves an issue in the Bedrock integration where not consuming the full response stream would prevent spans from finishing. + - | + LLM Observability: This fix ensures bedrock spans are finished even when streamed responses are not fully consumed. diff --git a/tests/contrib/botocore/test_bedrock.py b/tests/contrib/botocore/test_bedrock.py index 3c91b147a97..1001aff0dac 100644 --- a/tests/contrib/botocore/test_bedrock.py +++ b/tests/contrib/botocore/test_bedrock.py @@ -38,6 +38,14 @@ def aws_credentials(): os.environ["AWS_DEFAULT_REGION"] = "us-east-1" +@pytest.fixture +def mock_tracer(ddtrace_global_config, bedrock_client): + pin = Pin.get_from(bedrock_client) + mock_tracer = DummyTracer(writer=DummyWriter(trace_flush_enabled=False)) + pin.override(bedrock_client, tracer=mock_tracer) + yield mock_tracer + + @pytest.fixture def boto3(aws_credentials, mock_llmobs_span_writer, ddtrace_global_config): global_config = {"_dd_api_key": ""} @@ -315,20 +323,6 @@ def test_read_error(bedrock_client, request_vcr): response.get("body").read() -@pytest.mark.snapshot(ignores=["meta.error.stack"]) -def test_read_stream_error(bedrock_client, request_vcr): - body, model = json.dumps(_REQUEST_BODIES["meta"]), _MODELS["meta"] - with request_vcr.use_cassette("meta_invoke_stream.yaml"): - response = bedrock_client.invoke_model_with_response_stream(body=body, modelId=model) - with mock.patch( - "ddtrace.contrib.internal.botocore.services.bedrock._extract_streamed_response" - ) as mock_extract_response: - mock_extract_response.side_effect = Exception("test") - with pytest.raises(Exception): - for _ in response.get("body"): - pass - - @pytest.mark.snapshot(ignores=["meta.error.stack"]) def test_readlines_error(bedrock_client, request_vcr): body, model = json.dumps(_REQUEST_BODIES["meta"]), _MODELS["meta"] @@ -358,3 +352,20 @@ def test_cohere_embedding(bedrock_client, request_vcr): with request_vcr.use_cassette("cohere_embedding.yaml"): 
response = bedrock_client.invoke_model(body=body, modelId=model) json.loads(response.get("body").read()) + + +def test_span_finishes_after_generator_exit(bedrock_client, request_vcr, mock_tracer): + body, model = json.dumps(_REQUEST_BODIES["anthropic_message"]), _MODELS["anthropic_message"] + with request_vcr.use_cassette("anthropic_message_invoke_stream.yaml"): + response = bedrock_client.invoke_model_with_response_stream(body=body, modelId=model) + i = 0 + with pytest.raises(GeneratorExit): + for _ in response.get("body"): + if i >= 6: + raise GeneratorExit + i += 1 + span = mock_tracer.pop_traces()[0][0] + assert span is not None + assert span.name == "bedrock-runtime.command" + assert span.resource == "InvokeModelWithResponseStream" + assert span.get_tag("bedrock.response.choices.0.text").startswith("Hobb") diff --git a/tests/snapshots/tests.contrib.botocore.test_bedrock.test_cohere_invoke_stream_multiple_output.json b/tests/snapshots/tests.contrib.botocore.test_bedrock.test_cohere_invoke_stream_multiple_output.json deleted file mode 100644 index 4aec0662f05..00000000000 --- a/tests/snapshots/tests.contrib.botocore.test_bedrock.test_cohere_invoke_stream_multiple_output.json +++ /dev/null @@ -1,44 +0,0 @@ -[[ - { - "name": "bedrock-runtime.command", - "service": "aws.bedrock-runtime", - "resource": "InvokeModelWithResponseStream", - "trace_id": 0, - "span_id": 1, - "parent_id": 0, - "type": "", - "error": 0, - "meta": { - "_dd.base_service": "", - "_dd.p.dm": "-0", - "_dd.p.tid": "65a0359000000000", - "bedrock.request.max_tokens": "10", - "bedrock.request.model": "command-light-text-v14", - "bedrock.request.model_provider": "cohere", - "bedrock.request.n": "2", - "bedrock.request.prompt": "\\n\\nHuman: %s\\n\\nAssistant: Can you explain what a LLM chain is?", - "bedrock.request.stop_sequences": "[]", - "bedrock.request.stream": "True", - "bedrock.request.temperature": "0.9", - "bedrock.request.top_k": "0", - "bedrock.request.top_p": "1.0", - 
"bedrock.response.choices.0.finish_reason": "MAX_TOKENS", - "bedrock.response.choices.0.text": " In machine learning and natural language processing, Long Short", - "bedrock.response.choices.1.finish_reason": "MAX_TOKENS", - "bedrock.response.choices.1.text": " A large language model (LLM) chain refers", - "bedrock.response.duration": "597", - "bedrock.response.id": "2501c28f-0a40-49ec-9b59-5d58313c05f3", - "bedrock.usage.completion_tokens": "20", - "bedrock.usage.prompt_tokens": "40", - "language": "python", - "runtime-id": "e2468c635ce44f8788acce3e9e569237" - }, - "metrics": { - "_dd.top_level": 1, - "_dd.tracer_kr": 1.0, - "_sampling_priority_v1": 1, - "process_id": 21816 - }, - "duration": 980170000, - "start": 1704998288813204000 - }]] diff --git a/tests/snapshots/tests.contrib.botocore.test_bedrock.test_read_stream_error.json b/tests/snapshots/tests.contrib.botocore.test_bedrock.test_read_stream_error.json deleted file mode 100644 index abdffb55cdf..00000000000 --- a/tests/snapshots/tests.contrib.botocore.test_bedrock.test_read_stream_error.json +++ /dev/null @@ -1,39 +0,0 @@ -[[ - { - "name": "bedrock-runtime.command", - "service": "aws.bedrock-runtime", - "resource": "InvokeModelWithResponseStream", - "trace_id": 0, - "span_id": 1, - "parent_id": 0, - "type": "", - "error": 1, - "meta": { - "_dd.base_service": "", - "_dd.p.dm": "-0", - "_dd.p.tid": "659dea8d00000000", - "bedrock.request.max_tokens": "60", - "bedrock.request.model": "llama2-13b-chat-v1", - "bedrock.request.model_provider": "meta", - "bedrock.request.prompt": "What does 'lorem ipsum' mean?", - "bedrock.request.temperature": "0.9", - "bedrock.request.top_p": "1.0", - "bedrock.response.duration": "", - "bedrock.response.id": "4dcc90b7-81b8-4983-b2d3-9989798b0db1", - "bedrock.usage.completion_tokens": "", - "bedrock.usage.prompt_tokens": "", - "error.message": "test", - "error.stack": "Traceback (most recent call last):\n File 
\"/Users/yun.kim/go/src/github.com/DataDog/dd-trace-py/ddtrace/contrib/botocore/services/bedrock.py\", line 66, in __iter__\n formatted_response = _extract_streamed_response(self._datadog_span, self._body)\n File \"/Users/yun.kim/go/src/github.com/DataDog/dd-trace-py/.riot/venv_py3105_mock_pytest_pytest-mock_coverage_pytest-cov_opentracing_hypothesis6451_moto[all]_botocore_pytest-randomly_vcrpy/lib/python3.10/site-packages/mock/mock.py\", line 1178, in __call__\n return _mock_self._mock_call(*args, **kwargs)\n File \"/Users/yun.kim/go/src/github.com/DataDog/dd-trace-py/.riot/venv_py3105_mock_pytest_pytest-mock_coverage_pytest-cov_opentracing_hypothesis6451_moto[all]_botocore_pytest-randomly_vcrpy/lib/python3.10/site-packages/mock/mock.py\", line 1182, in _mock_call\n return _mock_self._execute_mock_call(*args, **kwargs)\n File \"/Users/yun.kim/go/src/github.com/DataDog/dd-trace-py/.riot/venv_py3105_mock_pytest_pytest-mock_coverage_pytest-cov_opentracing_hypothesis6451_moto[all]_botocore_pytest-randomly_vcrpy/lib/python3.10/site-packages/mock/mock.py\", line 1239, in _execute_mock_call\n raise effect\nException: test\n", - "error.type": "builtins.Exception", - "language": "python", - "runtime-id": "5e97a84a83514f5383132eed9df2755f" - }, - "metrics": { - "_dd.top_level": 1, - "_dd.tracer_kr": 1.0, - "_sampling_priority_v1": 1, - "process_id": 42139 - }, - "duration": 3661000, - "start": 1704848013372717000 - }]]