
Commit

[mq] working branch - merge e4ea23a on top of 2.17 at ea648a2
{"baseBranch":"2.17","baseCommit":"ea648a22dc880dba39e66fda6864a660fcbed412","createdAt":"2024-11-25T23:12:06.445596Z","headSha":"e4ea23a3fa4d29a8127e5fd5ff0c4b62742379c7","id":"2842b5f9-cb17-4ef4-85b1-f7c346b668c7","priority":"200","pullRequestNumber":"11449","queuedAt":"2024-11-25T23:21:14.986092Z","status":"STATUS_QUEUED"}
dd-mergequeue[bot] authored Nov 25, 2024
2 parents 8be5d36 + e4ea23a commit 9da88b9
Showing 2 changed files with 64 additions and 20 deletions.
80 changes: 60 additions & 20 deletions ddtrace/contrib/internal/grpc/aio_server_interceptor.py
@@ -172,18 +172,18 @@ def _wrap_unary_response(
         span.finish()
 
 
-def _create_span(pin, handler_call_details, method_kind):
-    # type: (Pin, grpc.HandlerCallDetails, str) -> Span
+def _create_span(pin, method, invocation_metadata, method_kind):
+    # type: (Pin, str, grpc.HandlerCallDetails, str) -> Span
     tracer = pin.tracer
-    headers = dict(handler_call_details.invocation_metadata)
 
-    trace_utils.activate_distributed_headers(tracer, int_config=config.grpc_aio_server, request_headers=headers)
+    trace_utils.activate_distributed_headers(
+        tracer, int_config=config.grpc_aio_server, request_headers=dict(invocation_metadata)
+    )
 
     span = tracer.trace(
         schematize_url_operation("grpc", protocol="grpc", direction=SpanDirection.INBOUND),
         span_type=SpanTypes.GRPC,
         service=trace_utils.int_service(pin, config.grpc_aio_server),
-        resource=handler_call_details.method,
+        resource=method,
     )
 
     span.set_tag_str(COMPONENT, config.grpc_aio_server.integration_name)
@@ -193,7 +193,7 @@ def _create_span(pin, handler_call_details, method_kind):
 
     span.set_tag(SPAN_MEASURED_KEY)
 
-    set_grpc_method_meta(span, handler_call_details.method, method_kind)
+    set_grpc_method_meta(span, method, method_kind)
     span.set_tag_str(constants.GRPC_SPAN_KIND_KEY, constants.GRPC_SPAN_KIND_VALUE_SERVER)
 
     sample_rate = config.grpc_aio_server.get_analytics_sample_rate()
@@ -211,22 +211,37 @@ def __init__(self, pin, handler_call_details, wrapped):
         # type: (Pin, grpc.HandlerCallDetails, grpc.RpcMethodHandler) -> None
         super(_TracedCoroRpcMethodHandler, self).__init__(wrapped)
         self._pin = pin
-        self._handler_call_details = handler_call_details
+        self.method = handler_call_details.method
 
     async def unary_unary(self, request: RequestType, context: aio.ServicerContext) -> ResponseType:
-        span = _create_span(self._pin, self._handler_call_details, constants.GRPC_METHOD_KIND_UNARY)
+        span = _create_span(self._pin, self.method, context.invocation_metadata(), constants.GRPC_METHOD_KIND_UNARY)
         return await _wrap_aio_unary_response(self.__wrapped__.unary_unary, request, context, span)
 
     async def unary_stream(self, request: RequestType, context: aio.ServicerContext) -> ResponseType:
-        span = _create_span(self._pin, self._handler_call_details, constants.GRPC_METHOD_KIND_SERVER_STREAMING)
+        span = _create_span(
+            self._pin,
+            self.method,
+            context.invocation_metadata(),
+            constants.GRPC_METHOD_KIND_SERVER_STREAMING,
+        )
         return await _wrap_aio_unary_response(self.__wrapped__.unary_stream, request, context, span)
 
     async def stream_unary(self, request_iterator: RequestIterableType, context: aio.ServicerContext) -> ResponseType:
-        span = _create_span(self._pin, self._handler_call_details, constants.GRPC_METHOD_KIND_CLIENT_STREAMING)
+        span = _create_span(
+            self._pin,
+            self.method,
+            context.invocation_metadata(),
+            constants.GRPC_METHOD_KIND_CLIENT_STREAMING,
+        )
         return await _wrap_aio_unary_response(self.__wrapped__.stream_unary, request_iterator, context, span)
 
     async def stream_stream(self, request_iterator: RequestIterableType, context: aio.ServicerContext) -> ResponseType:
-        span = _create_span(self._pin, self._handler_call_details, constants.GRPC_METHOD_KIND_BIDI_STREAMING)
+        span = _create_span(
+            self._pin,
+            self.method,
+            context.invocation_metadata(),
+            constants.GRPC_METHOD_KIND_BIDI_STREAMING,
+        )
         return await _wrap_aio_unary_response(self.__wrapped__.stream_stream, request_iterator, context, span)


@@ -235,17 +250,27 @@ def __init__(self, pin, handler_call_details, wrapped):
         # type: (Pin, grpc.HandlerCallDetails, grpc.RpcMethodHandler) -> None
         super(_TracedAsyncGenRpcMethodHandler, self).__init__(wrapped)
         self._pin = pin
-        self._handler_call_details = handler_call_details
+        self.method = handler_call_details.method
 
     async def unary_stream(self, request: RequestType, context: aio.ServicerContext) -> ResponseIterableType:
-        span = _create_span(self._pin, self._handler_call_details, constants.GRPC_METHOD_KIND_SERVER_STREAMING)
+        span = _create_span(
+            self._pin,
+            self.method,
+            context.invocation_metadata(),
+            constants.GRPC_METHOD_KIND_SERVER_STREAMING,
+        )
         async for response in _wrap_aio_stream_response(self.__wrapped__.unary_stream, request, context, span):
             yield response
 
     async def stream_stream(
         self, request_iterator: RequestIterableType, context: aio.ServicerContext
     ) -> ResponseIterableType:
-        span = _create_span(self._pin, self._handler_call_details, constants.GRPC_METHOD_KIND_BIDI_STREAMING)
+        span = _create_span(
+            self._pin,
+            self.method,
+            context.invocation_metadata(),
+            constants.GRPC_METHOD_KIND_BIDI_STREAMING,
+        )
         async for response in _wrap_aio_stream_response(
             self.__wrapped__.stream_stream, request_iterator, context, span
         ):
@@ -257,27 +282,42 @@ def __init__(self, pin, handler_call_details, wrapped):
         # type: (Pin, grpc.HandlerCallDetails, grpc.RpcMethodHandler) -> None
         super(_TracedRpcMethodHandler, self).__init__(wrapped)
         self._pin = pin
-        self._handler_call_details = handler_call_details
+        self.method = handler_call_details.method
 
     def unary_unary(self, request, context):
         # type: (Any, grpc.ServicerContext) -> Any
-        span = _create_span(self._pin, self._handler_call_details, constants.GRPC_METHOD_KIND_UNARY)
+        span = _create_span(self._pin, self.method, context.invocation_metadata(), constants.GRPC_METHOD_KIND_UNARY)
         return _wrap_unary_response(self.__wrapped__.unary_unary, request, context, span)
 
     def unary_stream(self, request, context):
         # type: (Any, grpc.ServicerContext) -> Iterable[Any]
-        span = _create_span(self._pin, self._handler_call_details, constants.GRPC_METHOD_KIND_SERVER_STREAMING)
+        span = _create_span(
+            self._pin,
+            self.method,
+            context.invocation_metadata(),
+            constants.GRPC_METHOD_KIND_SERVER_STREAMING,
+        )
         for response in _wrap_stream_response(self.__wrapped__.unary_stream, request, context, span):
             yield response
 
     def stream_unary(self, request_iterator, context):
         # type: (Iterable[Any], grpc.ServicerContext) -> Any
-        span = _create_span(self._pin, self._handler_call_details, constants.GRPC_METHOD_KIND_CLIENT_STREAMING)
+        span = _create_span(
+            self._pin,
+            self.method,
+            context.invocation_metadata(),
+            constants.GRPC_METHOD_KIND_CLIENT_STREAMING,
+        )
         return _wrap_unary_response(self.__wrapped__.stream_unary, request_iterator, context, span)
 
     def stream_stream(self, request_iterator, context):
         # type: (Iterable[Any], grpc.ServicerContext) -> Iterable[Any]
-        span = _create_span(self._pin, self._handler_call_details, constants.GRPC_METHOD_KIND_BIDI_STREAMING)
+        span = _create_span(
+            self._pin,
+            self.method,
+            context.invocation_metadata(),
+            constants.GRPC_METHOD_KIND_BIDI_STREAMING,
+        )
         for response in _wrap_stream_response(self.__wrapped__.stream_stream, request_iterator, context, span):
             yield response

@@ -0,0 +1,4 @@
+---
+fixes:
+  - |
+    grpcaio: Resolves a concurrency bug where distributed tracing headers were overwritten resulting in spans being assigned to the wrong trace.
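
Editor's note: the sketch below is not part of this commit; the class names, the plain-dict "metadata", and the asyncio plumbing are hypothetical stand-ins for the traced gRPC handlers and invocation metadata. It contrasts capturing request metadata once, when a handler object is built, with reading it from the per-call context, which is the pattern this diff adopts.

# Illustrative sketch only -- not from this commit. "CachedMetadataHandler"
# and "PerCallMetadataHandler" are hypothetical stand-ins.
import asyncio


class CachedMetadataHandler:
    """Captures metadata once, when the handler object is created (the old pattern)."""

    def __init__(self, creation_time_metadata):
        self._metadata = creation_time_metadata  # frozen at construction

    async def handle(self, request_metadata):
        # Ignores the current request's metadata, so a reused handler
        # attaches every request to the first request's trace headers.
        return dict(self._metadata)


class PerCallMetadataHandler:
    """Reads metadata from the per-call context (the pattern the fix adopts)."""

    async def handle(self, request_metadata):
        return dict(request_metadata)  # always the current request's headers


async def main():
    requests = [
        {"x-datadog-trace-id": "111"},
        {"x-datadog-trace-id": "222"},
    ]

    cached = CachedMetadataHandler(creation_time_metadata=requests[0])
    per_call = PerCallMetadataHandler()

    print(await asyncio.gather(*(cached.handle(r) for r in requests)))
    # [{'x-datadog-trace-id': '111'}, {'x-datadog-trace-id': '111'}]  <- second request mis-attributed
    print(await asyncio.gather(*(per_call.handle(r) for r in requests)))
    # [{'x-datadog-trace-id': '111'}, {'x-datadog-trace-id': '222'}]  <- each request keeps its own trace


if __name__ == "__main__":
    asyncio.run(main())

In the diff itself this corresponds to the handlers storing only handler_call_details.method and calling context.invocation_metadata() on every RPC, so nothing request-specific is cached on the handler object.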
