From 496ef8ad40a1f2aa6f59ebc6560a9c27ddcac3d0 Mon Sep 17 00:00:00 2001 From: Mike Prieto Date: Tue, 2 Jul 2024 06:52:37 +0000 Subject: [PATCH] feat: Add Ack/Nack/ModAck RPC spans to the subscribe --- .../cloud/pubsub/v1/MessageDispatcher.java | 6 +- .../cloud/pubsub/v1/ModackRequestData.java | 10 ++ .../cloud/pubsub/v1/OpenTelemetryUtil.java | 91 +++++++++++++++++-- .../cloud/pubsub/v1/PubsubMessageWrapper.java | 72 ++++++++++++++- .../v1/StreamingSubscriberConnection.java | 72 +++++++++++++-- 5 files changed, 233 insertions(+), 18 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java index 2b327fb80..a766efb18 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java @@ -646,8 +646,10 @@ void processOutstandingOperations() { List ackRequestDataReceipts = new ArrayList(); pendingReceipts.drainTo(ackRequestDataReceipts); if (!ackRequestDataReceipts.isEmpty()) { - modackRequestData.add( - new ModackRequestData(this.getMessageDeadlineSeconds(), ackRequestDataReceipts)); + ModackRequestData receiptModack = + new ModackRequestData(this.getMessageDeadlineSeconds(), ackRequestDataReceipts); + receiptModack.setIsReceiptModack(true); + modackRequestData.add(receiptModack); } logger.log(Level.FINER, "Sending {0} receipts", ackRequestDataReceipts.size()); diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/ModackRequestData.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/ModackRequestData.java index b4d2dae0f..54c7436af 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/ModackRequestData.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/ModackRequestData.java @@ -21,6 +21,7 @@ class ModackRequestData { private final int deadlineExtensionSeconds; private List ackRequestData; + private boolean isReceiptModack; ModackRequestData(int deadlineExtensionSeconds) { this.deadlineExtensionSeconds = deadlineExtensionSeconds; @@ -45,8 +46,17 @@ public List getAckRequestData() { return ackRequestData; } + public boolean getIsReceiptModack() { + return isReceiptModack; + } + public ModackRequestData addAckRequestData(AckRequestData ackRequestData) { this.ackRequestData.add(ackRequestData); return this; } + + public ModackRequestData setIsReceiptModack(boolean isReceiptModack) { + this.isReceiptModack = isReceiptModack; + return this; + } } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryUtil.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryUtil.java index 51fdc4873..6ee6bbccb 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryUtil.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryUtil.java @@ -16,6 +16,7 @@ package com.google.cloud.pubsub.v1; +import com.google.pubsub.v1.SubscriptionName; import com.google.pubsub.v1.TopicName; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.common.AttributesBuilder; @@ -28,7 +29,9 @@ public class OpenTelemetryUtil { private static final String MESSAGING_SYSTEM_VALUE = "gcp_pubsub"; - private static final String PROJECT_ATTR_KEY = "gcp_pubsub.project_id"; + private static final String PROJECT_ATTR_KEY = "gcp.project_id"; + private static final String ACK_DEADLINE_ATTR_KEY = "messaging.gcp_pubsub.message.ack_deadline"; + private static final String RECEIPT_MODACK_ATTR_KEY = "messaging.gcp_pubsub.is_receipt_modack"; private static final String PUBLISH_RPC_SPAN_SUFFIX = " publish"; @@ -50,7 +53,7 @@ public static final AttributesBuilder createCommonSpanAttributesBuilder( /** * Creates, starts, and returns a publish RPC span for the given message batch. Bi-directional - * links with the publisher parent span are created for each message in the batch. + * links with the publisher parent span are created for sampled messages in the batch. */ public static final Span startPublishRpcSpan( Tracer tracer, @@ -71,11 +74,13 @@ public static final Span startPublishRpcSpan( .startSpan(); for (PubsubMessageWrapper message : messages) { - Attributes linkAttributes = - Attributes.builder().put(SemanticAttributes.MESSAGING_OPERATION, "publish").build(); - publishRpcSpan.addLink(message.getPublisherSpan().getSpanContext(), linkAttributes); - message.getPublisherSpan().addLink(publishRpcSpan.getSpanContext(), linkAttributes); - message.addPublishStartEvent(); + if (message.getPublisherSpan().getSpanContext().isSampled()) { + Attributes linkAttributes = + Attributes.builder().put(SemanticAttributes.MESSAGING_OPERATION, "publish").build(); + publishRpcSpan.addLink(message.getPublisherSpan().getSpanContext(), linkAttributes); + message.getPublisherSpan().addLink(publishRpcSpan.getSpanContext(), linkAttributes); + message.addPublishStartEvent(); + } } return publishRpcSpan; } @@ -102,4 +107,76 @@ public static final void setPublishRpcSpanException( endPublishRpcSpan(publishRpcSpan, enableOpenTelemetryTracing); } } + + /** + * Creates, starts, and returns spans for ModAck, Nack, and Ack RPC requests. Bi-directional links + * to parent subscribe span for sampled messages are added. + */ + public static final Span startSubscribeRpcSpan( + Tracer tracer, + SubscriptionName subscriptionName, + String rpcOperation, + List messages, + int ackDeadline, + boolean isReceiptModack, + boolean enableOpenTelemetryTracing) { + if (enableOpenTelemetryTracing && tracer != null) { + String codeFunction = + rpcOperation == "ack" + ? "StreamingSubscriberConnection.sendAckOperations" + : "StreamingSubscriberConnection.sendModAckOperations"; + AttributesBuilder attributesBuilder = + createCommonSpanAttributesBuilder( + subscriptionName.getSubscription(), + subscriptionName.getProject(), + codeFunction, + rpcOperation) + .put(SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, messages.size()); + + // Ack deadline and receipt modack are specific to the modack operation + if (rpcOperation == "modack") { + attributesBuilder + .put(ACK_DEADLINE_ATTR_KEY, ackDeadline) + .put(RECEIPT_MODACK_ATTR_KEY, isReceiptModack); + } + + Span rpcSpan = + tracer + .spanBuilder(subscriptionName.getSubscription() + " " + rpcOperation) + .setSpanKind(SpanKind.CLIENT) + .setAllAttributes(attributesBuilder.build()) + .startSpan(); + + for (PubsubMessageWrapper message : messages) { + if (message.getSubscriberSpan().getSpanContext().isSampled()) { + Attributes linkAttributes = + Attributes.builder() + .put(SemanticAttributes.MESSAGING_OPERATION, rpcOperation) + .build(); + rpcSpan.addLink(message.getSubscriberSpan().getSpanContext(), linkAttributes); + message.getSubscriberSpan().addLink(rpcSpan.getSpanContext(), linkAttributes); + switch (rpcOperation) { + case "ack": + message.addAckStartEvent(); + break; + case "modack": + message.addModAckStartEvent(); + break; + case "nack": + message.addNackStartEvent(); + break; + } + } + } + return rpcSpan; + } + return null; + } + + /** Ends the given subscribe RPC span if it exists. */ + public static final void endSubscribeRpcSpan(Span rpcSpan, boolean enableOpenTelemetryTracing) { + if (enableOpenTelemetryTracing && rpcSpan != null) { + rpcSpan.end(); + } + } } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubMessageWrapper.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubMessageWrapper.java index b8446fd43..78e6625fe 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubMessageWrapper.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubMessageWrapper.java @@ -58,6 +58,12 @@ public class PubsubMessageWrapper { "subscriber concurrency control"; private static final String SUBSCRIBE_SCHEDULER_SPAN_NAME = "subscriber scheduler"; private String SUBSCRIBE_PROCESS_SPAN_NAME; + private static final String MODACK_START_EVENT = "modack start"; + private static final String MODACK_END_EVENT = "modack end"; + private static final String NACK_START_EVENT = "nack start"; + private static final String NACK_END_EVENT = "nack end"; + private static final String ACK_START_EVENT = "ack start"; + private static final String ACK_END_EVENT = "ack start"; private static final String GOOGCLIENT_PREFIX = "googclient_"; @@ -116,11 +122,16 @@ public PubsubMessage getPubsubMessage() { return message; } - /** Returns the parent span for this message wrapper. */ + /** Returns the parent publisher span for this message wrapper. */ public Span getPublisherSpan() { return publisherSpan; } + /** Returns the parent subscriber span for this message wrapper. */ + public Span getSubscriberSpan() { + return subscriberSpan; + } + /** Returns the delivery attempt for the received PubsubMessage. */ public int getDeliveryAttempt() { return deliveryAttempt; @@ -302,6 +313,56 @@ public void startSubscribeProcessSpan(Tracer tracer) { } } + /** + * Creates start and end events for ModAcks, Nacks, and Acks that are tied to the corresponding + * RPC span start and end times. + */ + public void addModAckStartEvent() { + if (enableOpenTelemetryTracing && subscriberSpan != null) { + subscriberSpan.addEvent(MODACK_START_EVENT); + } + } + + public void addModAckEndEvent() { + if (enableOpenTelemetryTracing && subscriberSpan != null) { + subscriberSpan.addEvent(MODACK_END_EVENT); + } + } + + public void addNackStartEvent() { + if (enableOpenTelemetryTracing && subscriberSpan != null) { + subscriberSpan.addEvent(NACK_START_EVENT); + } + } + + public void addNackEndEvent() { + if (enableOpenTelemetryTracing && subscriberSpan != null) { + subscriberSpan.addEvent(NACK_END_EVENT); + } + } + + public void addAckStartEvent() { + if (enableOpenTelemetryTracing && subscriberSpan != null) { + subscriberSpan.addEvent(ACK_START_EVENT); + } + } + + public void addAckEndEvent() { + if (enableOpenTelemetryTracing && subscriberSpan != null) { + subscriberSpan.addEvent(ACK_END_EVENT); + } + } + + public void addEndRpcEvent(boolean isModack, int ackDeadline) { + if (!isModack) { + addAckEndEvent(); + } else if (ackDeadline == 0) { + addNackEndEvent(); + } else { + addModAckEndEvent(); + } + } + /** Ends the subscriber parent span if exists. */ public void endSubscriberSpan() { if (enableOpenTelemetryTracing && subscriberSpan != null) { @@ -335,6 +396,15 @@ public void endSubscribeProcessSpan(String action) { } } + /** Sets an exception on the subscriber span during Ack/ModAck/Nack failures */ + public void setSubscriberSpanException(Throwable t, String exception) { + if (enableOpenTelemetryTracing && subscriberSpan != null) { + subscriberSpan.setStatus(StatusCode.ERROR, exception); + subscriberSpan.recordException(t); + endAllSubscribeSpans(); + } + } + /** Sets result of the parent subscriber span to expired and ends its. */ public void setSubscriberSpanExpirationResult() { if (enableOpenTelemetryTracing && subscriberSpan != null) { diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java index e35968777..5824ba41b 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java @@ -47,9 +47,11 @@ import com.google.pubsub.v1.ModifyAckDeadlineRequest; import com.google.pubsub.v1.StreamingPullRequest; import com.google.pubsub.v1.StreamingPullResponse; +import com.google.pubsub.v1.SubscriptionName; import com.google.rpc.ErrorInfo; import io.grpc.Status; import io.grpc.protobuf.StatusProto; +import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.Tracer; import java.util.ArrayList; import java.util.Collections; @@ -444,15 +446,27 @@ private void sendAckOperations( for (List ackRequestDataInRequestList : Lists.partition(ackRequestDataList, MAX_PER_REQUEST_CHANGES)) { List ackIdsInRequest = new ArrayList<>(); + List messagesInRequest = new ArrayList<>(); for (AckRequestData ackRequestData : ackRequestDataInRequestList) { ackIdsInRequest.add(ackRequestData.getAckId()); + messagesInRequest.add(ackRequestData.getMessageWrapper()); if (ackRequestData.hasMessageFuture()) { // Add to our pending requests if we care about the response pendingRequests.add(ackRequestData); } } + // Creates an Ack span to be passed to the callback + Span rpcSpan = + OpenTelemetryUtil.startSubscribeRpcSpan( + tracer, + SubscriptionName.parse(subscriptionName), + "ack", + messagesInRequest, + 0, + false, + enableOpenTelemetryTracing); ApiFutureCallback callback = - getCallback(ackRequestDataInRequestList, 0, false, currentBackoffMillis); + getCallback(ackRequestDataInRequestList, 0, false, currentBackoffMillis, rpcSpan); ApiFuture ackFuture = subscriberStub .acknowledgeCallable() @@ -475,19 +489,34 @@ private void sendModackOperations( for (List ackRequestDataInRequestList : Lists.partition(modackRequestData.getAckRequestData(), MAX_PER_REQUEST_CHANGES)) { List ackIdsInRequest = new ArrayList<>(); + List messagesInRequest = new ArrayList<>(); for (AckRequestData ackRequestData : ackRequestDataInRequestList) { ackIdsInRequest.add(ackRequestData.getAckId()); + messagesInRequest.add(ackRequestData.getMessageWrapper()); if (ackRequestData.hasMessageFuture()) { // Add to our pending requests if we care about the response pendingRequests.add(ackRequestData); } } + int deadlineExtensionSeconds = modackRequestData.getDeadlineExtensionSeconds(); + String rpcOperation = deadlineExtensionSeconds == 0 ? "nack" : "modack"; + // Creates either a ModAck span or a Nack span depending on the given ack deadline + Span rpcSpan = + OpenTelemetryUtil.startSubscribeRpcSpan( + tracer, + SubscriptionName.parse(subscriptionName), + rpcOperation, + messagesInRequest, + deadlineExtensionSeconds, + modackRequestData.getIsReceiptModack(), + enableOpenTelemetryTracing); ApiFutureCallback callback = getCallback( modackRequestData.getAckRequestData(), - modackRequestData.getDeadlineExtensionSeconds(), + deadlineExtensionSeconds, true, - currentBackoffMillis); + currentBackoffMillis, + rpcSpan); ApiFuture modackFuture = subscriberStub .modifyAckDeadlineCallable() @@ -529,7 +558,8 @@ private ApiFutureCallback getCallback( List ackRequestDataList, int deadlineExtensionSeconds, boolean isModack, - long currentBackoffMillis) { + long currentBackoffMillis, + Span rpcSpan) { // This callback handles retries, and sets message futures // Check if ack or nack @@ -547,8 +577,14 @@ public void onSuccess(Empty empty) { pendingRequests.remove(ackRequestData); if (!isModack) { ackRequestData.getMessageWrapper().endSubscriberSpan(); + ackRequestData.getMessageWrapper().addAckEndEvent(); + } else if (deadlineExtensionSeconds == 0) { + ackRequestData.getMessageWrapper().addNackEndEvent(); + } else { + ackRequestData.getMessageWrapper().addModAckEndEvent(); } } + OpenTelemetryUtil.endSubscribeRpcSpan(rpcSpan, enableOpenTelemetryTracing); } @Override @@ -559,10 +595,17 @@ public void onFailure(Throwable t) { Level level = isAlive() ? Level.WARNING : Level.FINER; logger.log(level, "failed to send operations", t); + OpenTelemetryUtil.endSubscribeRpcSpan(rpcSpan, enableOpenTelemetryTracing); + if (!getExactlyOnceDeliveryEnabled()) { + if (enableOpenTelemetryTracing) { + for (AckRequestData ackRequestData : ackRequestDataList) { + ackRequestData.getMessageWrapper().endSubscriberSpan(); + ackRequestData.getMessageWrapper().addEndRpcEvent(isModack, deadlineExtensionSeconds); + } + } return; } - List ackRequestDataArrayRetryList = new ArrayList<>(); try { Map metadataMap = getMetadataMapFromThrowable(t); @@ -584,20 +627,33 @@ public void onFailure(Throwable t) { errorMessage); ackRequestData.setResponse(AckResponse.INVALID, setResponseOnSuccess); messageDispatcher.notifyAckFailed(ackRequestData); + ackRequestData + .getMessageWrapper() + .setSubscriberSpanException(t, "Invalid ack ID"); + ackRequestData + .getMessageWrapper() + .addEndRpcEvent(isModack, deadlineExtensionSeconds); } else { logger.log(Level.INFO, "Unknown error message, will not resend", errorMessage); ackRequestData.setResponse(AckResponse.OTHER, setResponseOnSuccess); messageDispatcher.notifyAckFailed(ackRequestData); + ackRequestData + .getMessageWrapper() + .setSubscriberSpanException(t, "Unknown error message"); + ackRequestData + .getMessageWrapper() + .addEndRpcEvent(isModack, deadlineExtensionSeconds); } } else { ackRequestData.setResponse(AckResponse.SUCCESSFUL, setResponseOnSuccess); messageDispatcher.notifyAckSuccess(ackRequestData); + ackRequestData.getMessageWrapper().endSubscriberSpan(); + ackRequestData + .getMessageWrapper() + .addEndRpcEvent(isModack, deadlineExtensionSeconds); } // Remove from our pending pendingRequests.remove(ackRequestData); - if (!isModack) { - ackRequestData.getMessageWrapper().endSubscriberSpan(); - } }); } catch (InvalidProtocolBufferException e) { // If we fail to parse out the errorInfo, we should retry all