From 43d489f4c6d87736f45aef702b028e4077ce316b Mon Sep 17 00:00:00 2001 From: Mike Prieto Date: Thu, 19 Sep 2024 07:22:10 +0000 Subject: [PATCH] feat: Add additional sampling checks to the Otel implementation --- .../pubsub/v1/OpenTelemetryPubsubTracer.java | 10 ++++--- .../google/cloud/pubsub/v1/PubsubTracer.java | 3 ++- .../v1/StreamingSubscriberConnection.java | 27 +++++++++++++++---- .../cloud/pubsub/v1/OpenTelemetryTest.java | 6 ++--- 4 files changed, 34 insertions(+), 12 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryPubsubTracer.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryPubsubTracer.java index e8ca33a6f..7fb974f8e 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryPubsubTracer.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryPubsubTracer.java @@ -158,7 +158,7 @@ public Span startPublishRpcSpan(String topic, List message Span publishRpcSpan = publishRpcSpanBuilder.startSpan(); for (PubsubMessageWrapper message : messages) { - if (message.getPublisherSpan().getSpanContext().isSampled()) { + if (publishRpcSpan.getSpanContext().isSampled()) { message.getPublisherSpan().addLink(publishRpcSpan.getSpanContext(), linkAttributes); message.addPublishStartEvent(); } @@ -336,7 +336,7 @@ public Span startSubscribeRpcSpan( Span rpcSpan = rpcSpanBuilder.startSpan(); for (PubsubMessageWrapper message : messages) { - if (message.getSubscriberSpan().getSpanContext().isSampled()) { + if (rpcSpan.getSpanContext().isSampled()) { message.getSubscriberSpan().addLink(rpcSpan.getSpanContext(), linkAttributes); switch (rpcOperation) { case "ack": @@ -379,7 +379,11 @@ public void setSubscribeRpcSpanException( /** Adds the appropriate subscribe-side RPC end event. */ @Override - public void addEndRpcEvent(PubsubMessageWrapper message, boolean isModack, int ackDeadline) { + public void addEndRpcEvent( + PubsubMessageWrapper message, boolean rpcSampled, boolean isModack, int ackDeadline) { + if (!rpcSampled) { + return; + } if (!isModack) { message.addAckEndEvent(); } else if (ackDeadline == 0) { diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubTracer.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubTracer.java index 98003235c..70123f98b 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubTracer.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubTracer.java @@ -131,7 +131,8 @@ default void setSubscribeRpcSpanException( // noop } - default void addEndRpcEvent(PubsubMessageWrapper message, boolean isModack, int ackDeadline) { + default void addEndRpcEvent( + PubsubMessageWrapper message, boolean rpcSampled, boolean isModack, int ackDeadline) { // noop } } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java index f7b4279bb..f33d2243d 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java @@ -553,6 +553,8 @@ private ApiFutureCallback getCallback( // Check if ack or nack boolean setResponseOnSuccess = (!isModack || (deadlineExtensionSeconds == 0)) ? true : false; + boolean rpcSpanSampled = rpcSpan == null ? false : rpcSpan.getSpanContext().isSampled(); + return new ApiFutureCallback() { @Override public void onSuccess(Empty empty) { @@ -567,7 +569,10 @@ public void onSuccess(Empty empty) { // Remove from our pending operations pendingRequests.remove(ackRequestData); tracer.addEndRpcEvent( - ackRequestData.getMessageWrapper(), isModack, deadlineExtensionSeconds); + ackRequestData.getMessageWrapper(), + rpcSpanSampled, + isModack, + deadlineExtensionSeconds); if (!isModack || deadlineExtensionSeconds == 0) { tracer.endSubscriberSpan(ackRequestData.getMessageWrapper()); } @@ -588,7 +593,10 @@ public void onFailure(Throwable t) { if (enableOpenTelemetryTracing) { for (AckRequestData ackRequestData : ackRequestDataList) { tracer.addEndRpcEvent( - ackRequestData.getMessageWrapper(), isModack, deadlineExtensionSeconds); + ackRequestData.getMessageWrapper(), + rpcSpanSampled, + isModack, + deadlineExtensionSeconds); if (!isModack || deadlineExtensionSeconds == 0) { tracer.endSubscriberSpan(ackRequestData.getMessageWrapper()); } @@ -618,7 +626,10 @@ public void onFailure(Throwable t) { ackRequestData.setResponse(AckResponse.INVALID, setResponseOnSuccess); messageDispatcher.notifyAckFailed(ackRequestData); tracer.addEndRpcEvent( - ackRequestData.getMessageWrapper(), isModack, deadlineExtensionSeconds); + ackRequestData.getMessageWrapper(), + rpcSpanSampled, + isModack, + deadlineExtensionSeconds); tracer.setSubscriberSpanException( ackRequestData.getMessageWrapper(), t, "Invalid ack ID"); } else { @@ -626,7 +637,10 @@ public void onFailure(Throwable t) { ackRequestData.setResponse(AckResponse.OTHER, setResponseOnSuccess); messageDispatcher.notifyAckFailed(ackRequestData); tracer.addEndRpcEvent( - ackRequestData.getMessageWrapper(), isModack, deadlineExtensionSeconds); + ackRequestData.getMessageWrapper(), + rpcSpanSampled, + isModack, + deadlineExtensionSeconds); tracer.setSubscriberSpanException( ackRequestData.getMessageWrapper(), t, "Unknown error message"); ackRequestData @@ -638,7 +652,10 @@ public void onFailure(Throwable t) { messageDispatcher.notifyAckSuccess(ackRequestData); tracer.endSubscriberSpan(ackRequestData.getMessageWrapper()); tracer.addEndRpcEvent( - ackRequestData.getMessageWrapper(), isModack, deadlineExtensionSeconds); + ackRequestData.getMessageWrapper(), + rpcSpanSampled, + isModack, + deadlineExtensionSeconds); } // Remove from our pending pendingRequests.remove(ackRequestData); diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java index 76f3aba09..204e12e3d 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java @@ -333,17 +333,17 @@ public void testSubscribeSpansSuccess() { ACK_DEADLINE, true); tracer.endSubscribeRpcSpan(subscribeModackRpcSpan); - tracer.addEndRpcEvent(subscribeMessageWrapper, true, ACK_DEADLINE); + tracer.addEndRpcEvent(subscribeMessageWrapper, true, true, ACK_DEADLINE); Span subscribeAckRpcSpan = tracer.startSubscribeRpcSpan( FULL_SUBSCRIPTION_NAME.toString(), "ack", subscribeMessageWrappers, 0, false); tracer.endSubscribeRpcSpan(subscribeAckRpcSpan); - tracer.addEndRpcEvent(subscribeMessageWrapper, false, 0); + tracer.addEndRpcEvent(subscribeMessageWrapper, true, false, 0); Span subscribeNackRpcSpan = tracer.startSubscribeRpcSpan( FULL_SUBSCRIPTION_NAME.toString(), "nack", subscribeMessageWrappers, 0, false); tracer.endSubscribeRpcSpan(subscribeNackRpcSpan); - tracer.addEndRpcEvent(subscribeMessageWrapper, true, 0); + tracer.addEndRpcEvent(subscribeMessageWrapper, true, true, 0); tracer.endSubscriberSpan(subscribeMessageWrapper); List allSpans = openTelemetryTesting.getSpans();