From db522b60f1bbec9cc1bfa0c37477044fd2f807c7 Mon Sep 17 00:00:00 2001 From: Mike Prieto Date: Mon, 30 Sep 2024 20:04:22 -0400 Subject: [PATCH] feat: Add OpenTelemetry tracing to the Publisher and Subscriber (#2086) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: Initial publish side Open Telemetry support * feat: Publish-side trace context injection * feat: Tests and improvements to publish side OTel tracing * feat: More tests and refactoring for publish-side OpenTelemetry * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: Formatting files * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: Publisher test changes * test: Fix OpenTelemetry test * Feat: Use OpenTelemetry semconv * test: Fix some dependency issues * feat: Test fix * feat: Add comment for setter in builder * Opentelemetry subscribe (#2100) * feat: Add OpenTelemetry tracing to the SubscriberClient * feat: Add link to publisher create span in the subscribe process span * feat: Add Ack/Nack/ModAck RPC spans to the subscribe * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * Opentelemetry subscribe (#2101) * feat: Add OpenTelemetry tracing to the SubscriberClient * feat: Add link to publisher create span in the subscribe process span * feat: Add Ack/Nack/ModAck RPC spans to the subscribe * fix: Fix test errors caused by otel changes * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * fix: Fix build errors in Publisher * test: Ignore org.assertj:assertj-core which is required for OTel testing assertions * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * test: Add tests for subscriber OTel functions * feat: Changes to OpenTelemetry implementation to add links earlier and prevent methods from being exposed to users * feat: Refactor OpenTelemetry implementation to use a context aware wrapper for the tracer and a PubsubTracer interface * feat: Initialize default no-op PubsubTracer in Publisher and Subscriber * feat: Ensure SubscriberStreamingConnection and MessageDispatcher have default no-op tracers by default for tests * samples: Add OpenTelemetry publisher and subscriber samples * feat: Add additional sampling checks to the Otel implementation * samples: Update pom.xml for samples with Cloud Trace exporter * feat: Make OTel classes/methods package-private and remove non-generic PubsubTracer interface * feat: Lint fixes for Pub/Sub * feat: Use MessagingIncubatingAttributes for gcp_pubsub attribute names * feat: Format OTel changes * Revert "feat: Use MessagingIncubatingAttributes for gcp_pubsub attribute names" This reverts commit 305610e5a23f4f128c0750970a9b6f86540cbabe. * feat: trigger build * chore: generate libraries at Mon Sep 30 20:37:03 UTC 2024 * feat: trigger build * feat: Fix file overwrite from bad merge * chore: generate libraries at Mon Sep 30 20:49:40 UTC 2024 * Revert "chore: generate libraries at Mon Sep 30 20:49:40 UTC 2024" This reverts commit 5ebbbf933b79f7f3c56bba5da2b3c334f544dd4d. * chore: generate libraries at Mon Sep 30 21:03:31 UTC 2024 * Revert "chore: generate libraries at Mon Sep 30 21:03:31 UTC 2024" This reverts commit 23f3a70d64f0f72cf18dd3a7640125ff9027dec7. * chore: generate libraries at Mon Sep 30 21:14:11 UTC 2024 * feat: Prevent new files for OpenTelemetry from being overwritten * feat: Revert automated file deletion for OpenTelemetry changes * feat: Remove OpenTelemetry samples as the samples use a released library version to run * chore: generate libraries at Mon Sep 30 22:11:14 UTC 2024 --------- Co-authored-by: Owl Bot Co-authored-by: cloud-java-bot --- .github/.OwlBot-hermetic.yaml | 3 + google-cloud-pubsub/pom.xml | 28 + .../cloud/pubsub/v1/AckRequestData.java | 19 + .../cloud/pubsub/v1/MessageDispatcher.java | 92 ++- .../cloud/pubsub/v1/ModackRequestData.java | 10 + .../pubsub/v1/OpenTelemetryPubsubTracer.java | 460 ++++++++++++ .../com/google/cloud/pubsub/v1/Publisher.java | 87 ++- .../cloud/pubsub/v1/PubsubMessageWrapper.java | 430 +++++++++++ .../v1/StreamingSubscriberConnection.java | 102 ++- .../google/cloud/pubsub/v1/Subscriber.java | 43 ++ .../cloud/pubsub/v1/OpenTelemetryTest.java | 669 ++++++++++++++++++ .../cloud/pubsub/v1/PublisherImplTest.java | 75 ++ pom.xml | 6 + samples/install-without-bom/pom.xml | 5 + samples/snapshot/pom.xml | 5 + samples/snippets/pom.xml | 5 + 16 files changed, 2008 insertions(+), 31 deletions(-) create mode 100644 google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryPubsubTracer.java create mode 100644 google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubMessageWrapper.java create mode 100644 google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java diff --git a/.github/.OwlBot-hermetic.yaml b/.github/.OwlBot-hermetic.yaml index 1757987e4..8a75909c6 100644 --- a/.github/.OwlBot-hermetic.yaml +++ b/.github/.OwlBot-hermetic.yaml @@ -33,6 +33,7 @@ deep-preserve-regex: - "/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDataMatcher.java" - "/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java" - "/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenCensusUtilTest.java" +- "/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java" - "/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java" - "/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SequentialExecutorServiceTest.java" - "/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/StatusUtilTest.java" @@ -51,8 +52,10 @@ deep-preserve-regex: - "/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageReceiverWithAckResponse.java" - "/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/ModackRequestData.java" - "/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenCensusUtil.java" +- "/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryPubsubTracer.java" - "/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java" - "/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PublisherInterface.java" +- "/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubMessageWrapper.java" - "/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java" - "/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StatusUtil.java" - "/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java" diff --git a/google-cloud-pubsub/pom.xml b/google-cloud-pubsub/pom.xml index 2657c64af..46fe71d26 100644 --- a/google-cloud-pubsub/pom.xml +++ b/google-cloud-pubsub/pom.xml @@ -100,6 +100,18 @@ google-http-client runtime + + io.opentelemetry + opentelemetry-api + + + io.opentelemetry + opentelemetry-context + + + io.opentelemetry + opentelemetry-semconv + @@ -142,6 +154,21 @@ opencensus-impl test + + io.opentelemetry + opentelemetry-sdk-trace + test + + + io.opentelemetry + opentelemetry-sdk-testing + test + + + org.assertj + assertj-core + test + com.google.api @@ -174,6 +201,7 @@ com.google.auth:google-auth-library-oauth2-http:jar io.opencensus:opencensus-impl javax.annotation:javax.annotation-api + org.assertj:assertj-core diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckRequestData.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckRequestData.java index 3b67ce219..5cab83f49 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckRequestData.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckRequestData.java @@ -22,10 +22,12 @@ public class AckRequestData { private final String ackId; private final Optional> messageFuture; + private PubsubMessageWrapper messageWrapper; protected AckRequestData(Builder builder) { this.ackId = builder.ackId; this.messageFuture = builder.messageFuture; + this.messageWrapper = builder.messageWrapper; } public String getAckId() { @@ -36,6 +38,17 @@ public SettableApiFuture getMessageFutureIfExists() { return this.messageFuture.orElse(null); } + /** + * Returns an empty PubsubMessageWrapper with OpenTelemetry tracing disabled. This allows methods + * that use this method to be unit tested. + */ + public PubsubMessageWrapper getMessageWrapper() { + if (this.messageWrapper == null) { + return PubsubMessageWrapper.newBuilder(null, null).build(); + } + return messageWrapper; + } + public AckRequestData setResponse(AckResponse ackResponse, boolean setResponseOnSuccess) { if (this.messageFuture.isPresent() && !this.messageFuture.get().isDone()) { switch (ackResponse) { @@ -68,6 +81,7 @@ public static Builder newBuilder(String ackId) { protected static final class Builder { private final String ackId; private Optional> messageFuture = Optional.empty(); + private PubsubMessageWrapper messageWrapper; protected Builder(String ackId) { this.ackId = ackId; @@ -78,6 +92,11 @@ public Builder setMessageFuture(SettableApiFuture messageFuture) { return this; } + public Builder setMessageWrapper(PubsubMessageWrapper messageWrapper) { + this.messageWrapper = messageWrapper; + return this; + } + public AckRequestData build() { return new AckRequestData(this); } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java index 1810badd2..860fcbcf9 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java @@ -104,6 +104,10 @@ class MessageDispatcher { // To keep track of number of seconds the receiver takes to process messages. private final Distribution ackLatencyDistribution; + private final String subscriptionName; + private final boolean enableOpenTelemetryTracing; + private OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(null, false); + /** Internal representation of a reply to a Pubsub message, to be sent back to the service. */ public enum AckReply { ACK, @@ -157,6 +161,7 @@ public void onFailure(Throwable t) { t); this.ackRequestData.setResponse(AckResponse.OTHER, false); pendingNacks.add(this.ackRequestData); + tracer.endSubscribeProcessSpan(this.ackRequestData.getMessageWrapper(), "nack"); forget(); } @@ -169,9 +174,11 @@ public void onSuccess(AckReply reply) { ackLatencyDistribution.record( Ints.saturatedCast( (long) Math.ceil((clock.millisTime() - receivedTimeMillis) / 1000D))); + tracer.endSubscribeProcessSpan(this.ackRequestData.getMessageWrapper(), "ack"); break; case NACK: pendingNacks.add(this.ackRequestData); + tracer.endSubscribeProcessSpan(this.ackRequestData.getMessageWrapper(), "nack"); break; default: throw new IllegalArgumentException(String.format("AckReply: %s not supported", reply)); @@ -217,6 +224,12 @@ private MessageDispatcher(Builder builder) { jobLock = new ReentrantLock(); messagesWaiter = new Waiter(); sequentialExecutor = new SequentialExecutorService.AutoExecutor(builder.executor); + + subscriptionName = builder.subscriptionName; + enableOpenTelemetryTracing = builder.enableOpenTelemetryTracing; + if (builder.tracer != null) { + tracer = builder.tracer; + } } private boolean shouldSetMessageFuture() { @@ -351,13 +364,15 @@ void setMessageOrderingEnabled(boolean messageOrderingEnabled) { } private static class OutstandingMessage { - private final ReceivedMessage receivedMessage; private final AckHandler ackHandler; - private OutstandingMessage(ReceivedMessage receivedMessage, AckHandler ackHandler) { - this.receivedMessage = receivedMessage; + private OutstandingMessage(AckHandler ackHandler) { this.ackHandler = ackHandler; } + + public PubsubMessageWrapper messageWrapper() { + return this.ackHandler.ackRequestData.getMessageWrapper(); + } } private static class ReceiptCompleteData { @@ -390,10 +405,20 @@ void processReceivedMessages(List messages) { if (shouldSetMessageFuture()) { builder.setMessageFuture(SettableApiFuture.create()); } + PubsubMessageWrapper messageWrapper = + PubsubMessageWrapper.newBuilder( + message.getMessage(), + subscriptionName, + message.getAckId(), + message.getDeliveryAttempt()) + .build(); + builder.setMessageWrapper(messageWrapper); + tracer.startSubscriberSpan(messageWrapper, this.exactlyOnceDeliveryEnabled.get()); + AckRequestData ackRequestData = builder.build(); AckHandler ackHandler = new AckHandler(ackRequestData, message.getMessage().getSerializedSize(), totalExpiration); - OutstandingMessage outstandingMessage = new OutstandingMessage(message, ackHandler); + OutstandingMessage outstandingMessage = new OutstandingMessage(ackHandler); if (this.exactlyOnceDeliveryEnabled.get()) { // For exactly once deliveries we don't add to outstanding batch because we first @@ -457,30 +482,40 @@ private void processBatch(List batch) { for (OutstandingMessage message : batch) { // This is a blocking flow controller. We have already incremented messagesWaiter, so // shutdown will block on processing of all these messages anyway. + tracer.startSubscribeConcurrencyControlSpan(message.messageWrapper()); try { - flowController.reserve(1, message.receivedMessage.getMessage().getSerializedSize()); + flowController.reserve(1, message.messageWrapper().getPubsubMessage().getSerializedSize()); + tracer.endSubscribeConcurrencyControlSpan(message.messageWrapper()); } catch (FlowControlException unexpectedException) { // This should be a blocking flow controller and never throw an exception. + tracer.setSubscribeConcurrencyControlSpanException( + message.messageWrapper(), unexpectedException); throw new IllegalStateException("Flow control unexpected exception", unexpectedException); } - processOutstandingMessage(addDeliveryInfoCount(message.receivedMessage), message.ackHandler); + addDeliveryInfoCount(message.messageWrapper()); + processOutstandingMessage(message.ackHandler); } } - private PubsubMessage addDeliveryInfoCount(ReceivedMessage receivedMessage) { - PubsubMessage originalMessage = receivedMessage.getMessage(); - int deliveryAttempt = receivedMessage.getDeliveryAttempt(); + private void addDeliveryInfoCount(PubsubMessageWrapper messageWrapper) { + PubsubMessage originalMessage = messageWrapper.getPubsubMessage(); + int deliveryAttempt = messageWrapper.getDeliveryAttempt(); // Delivery Attempt will be set to 0 if DeadLetterPolicy is not set on the subscription. In // this case, do not populate the PubsubMessage with the delivery attempt attribute. if (deliveryAttempt > 0) { - return PubsubMessage.newBuilder(originalMessage) - .putAttributes("googclient_deliveryattempt", Integer.toString(deliveryAttempt)) - .build(); + messageWrapper.setPubsubMessage( + PubsubMessage.newBuilder(originalMessage) + .putAttributes("googclient_deliveryattempt", Integer.toString(deliveryAttempt)) + .build()); } - return originalMessage; } - private void processOutstandingMessage(final PubsubMessage message, final AckHandler ackHandler) { + private void processOutstandingMessage(final AckHandler ackHandler) { + // Get the PubsubMessageWrapper and the PubsubMessage it wraps that are stored withing the + // AckHandler object. + PubsubMessageWrapper messageWrapper = ackHandler.ackRequestData.getMessageWrapper(); + PubsubMessage message = messageWrapper.getPubsubMessage(); + // This future is for internal bookkeeping to be sent to the StreamingSubscriberConnection // use below in the consumers SettableApiFuture ackReplySettableApiFuture = SettableApiFuture.create(); @@ -499,8 +534,10 @@ public void run() { // so it was probably sent to someone else. Don't work on it. // Don't nack it either, because we'd be nacking someone else's message. ackHandler.forget(); + tracer.setSubscriberSpanExpirationResult(messageWrapper); return; } + tracer.startSubscribeProcessSpan(messageWrapper); if (shouldSetMessageFuture()) { // This is the message future that is propagated to the user SettableApiFuture messageFuture = @@ -521,7 +558,9 @@ public void run() { if (!messageOrderingEnabled.get() || message.getOrderingKey().isEmpty()) { executor.execute(deliverMessageTask); } else { + tracer.startSubscribeSchedulerSpan(messageWrapper); sequentialExecutor.submit(message.getOrderingKey(), deliverMessageTask); + tracer.endSubscribeSchedulerSpan(messageWrapper); } } @@ -607,8 +646,10 @@ void processOutstandingOperations() { List ackRequestDataReceipts = new ArrayList(); pendingReceipts.drainTo(ackRequestDataReceipts); if (!ackRequestDataReceipts.isEmpty()) { - modackRequestData.add( - new ModackRequestData(this.getMessageDeadlineSeconds(), ackRequestDataReceipts)); + ModackRequestData receiptModack = + new ModackRequestData(this.getMessageDeadlineSeconds(), ackRequestDataReceipts); + receiptModack.setIsReceiptModack(true); + modackRequestData.add(receiptModack); } logger.log(Level.FINER, "Sending {0} receipts", ackRequestDataReceipts.size()); @@ -645,6 +686,10 @@ public static final class Builder { private ScheduledExecutorService systemExecutor; private ApiClock clock; + private String subscriptionName; + private boolean enableOpenTelemetryTracing; + private OpenTelemetryPubsubTracer tracer; + protected Builder(MessageReceiver receiver) { this.receiver = receiver; } @@ -715,6 +760,21 @@ public Builder setApiClock(ApiClock clock) { return this; } + public Builder setSubscriptionName(String subscriptionName) { + this.subscriptionName = subscriptionName; + return this; + } + + public Builder setEnableOpenTelemetryTracing(boolean enableOpenTelemetryTracing) { + this.enableOpenTelemetryTracing = enableOpenTelemetryTracing; + return this; + } + + public Builder setTracer(OpenTelemetryPubsubTracer tracer) { + this.tracer = tracer; + return this; + } + public MessageDispatcher build() { return new MessageDispatcher(this); } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/ModackRequestData.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/ModackRequestData.java index b4d2dae0f..54c7436af 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/ModackRequestData.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/ModackRequestData.java @@ -21,6 +21,7 @@ class ModackRequestData { private final int deadlineExtensionSeconds; private List ackRequestData; + private boolean isReceiptModack; ModackRequestData(int deadlineExtensionSeconds) { this.deadlineExtensionSeconds = deadlineExtensionSeconds; @@ -45,8 +46,17 @@ public List getAckRequestData() { return ackRequestData; } + public boolean getIsReceiptModack() { + return isReceiptModack; + } + public ModackRequestData addAckRequestData(AckRequestData ackRequestData) { this.ackRequestData.add(ackRequestData); return this; } + + public ModackRequestData setIsReceiptModack(boolean isReceiptModack) { + this.isReceiptModack = isReceiptModack; + return this; + } } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryPubsubTracer.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryPubsubTracer.java new file mode 100644 index 000000000..b946f44bf --- /dev/null +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryPubsubTracer.java @@ -0,0 +1,460 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub.v1; + +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.SubscriptionName; +import com.google.pubsub.v1.TopicName; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanBuilder; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Context; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import java.util.List; + +public class OpenTelemetryPubsubTracer { + private final Tracer tracer; + private boolean enabled = false; + + private static final String PUBLISH_FLOW_CONTROL_SPAN_NAME = "publisher flow control"; + private static final String PUBLISH_BATCHING_SPAN_NAME = "publisher batching"; + private static final String SUBSCRIBE_CONCURRENCY_CONTROL_SPAN_NAME = + "subscriber concurrency control"; + private static final String SUBSCRIBE_SCHEDULER_SPAN_NAME = "subscriber scheduler"; + + private static final String MESSAGE_SIZE_ATTR_KEY = "messaging.message.body.size"; + private static final String ORDERING_KEY_ATTR_KEY = "messaging.gcp_pubsub.message.ordering_key"; + private static final String MESSAGE_ACK_ID_ATTR_KEY = "messaging.gcp_pubsub.message.ack_id"; + private static final String MESSAGE_EXACTLY_ONCE_ATTR_KEY = + "messaging.gcp_pubsub.message.exactly_once_delivery"; + private static final String MESSAGE_DELIVERY_ATTEMPT_ATTR_KEY = + "messaging.gcp_pubsub.message.delivery_attempt"; + private static final String ACK_DEADLINE_ATTR_KEY = "messaging.gcp_pubsub.message.ack_deadline"; + private static final String RECEIPT_MODACK_ATTR_KEY = "messaging.gcp_pubsub.is_receipt_modack"; + private static final String PROJECT_ATTR_KEY = "gcp.project_id"; + private static final String PUBLISH_RPC_SPAN_SUFFIX = " publish"; + + private static final String MESSAGING_SYSTEM_VALUE = "gcp_pubsub"; + + OpenTelemetryPubsubTracer(Tracer tracer, boolean enableOpenTelemetry) { + this.tracer = tracer; + if (this.tracer != null && enableOpenTelemetry) { + this.enabled = true; + } + } + + /** Populates attributes that are common the publisher parent span and publish RPC span. */ + private static final AttributesBuilder createCommonSpanAttributesBuilder( + String destinationName, String projectName, String codeFunction, String operation) { + AttributesBuilder attributesBuilder = + Attributes.builder() + .put(SemanticAttributes.MESSAGING_SYSTEM, MESSAGING_SYSTEM_VALUE) + .put(SemanticAttributes.MESSAGING_DESTINATION_NAME, destinationName) + .put(PROJECT_ATTR_KEY, projectName) + .put(SemanticAttributes.CODE_FUNCTION, codeFunction); + if (operation != null) { + attributesBuilder.put(SemanticAttributes.MESSAGING_OPERATION, operation); + } + + return attributesBuilder; + } + + private Span startChildSpan(String name, Span parent) { + return tracer.spanBuilder(name).setParent(Context.current().with(parent)).startSpan(); + } + + /** + * Creates and starts the parent span with the appropriate span attributes and injects the span + * context into the {@link PubsubMessage} attributes. + */ + void startPublisherSpan(PubsubMessageWrapper message) { + if (!enabled) { + return; + } + AttributesBuilder attributesBuilder = + createCommonSpanAttributesBuilder( + message.getTopicName(), message.getTopicProject(), "publish", "create"); + + attributesBuilder.put(MESSAGE_SIZE_ATTR_KEY, message.getDataSize()); + if (!message.getOrderingKey().isEmpty()) { + attributesBuilder.put(ORDERING_KEY_ATTR_KEY, message.getOrderingKey()); + } + + Span publisherSpan = + tracer + .spanBuilder(message.getTopicName() + " create") + .setSpanKind(SpanKind.PRODUCER) + .setAllAttributes(attributesBuilder.build()) + .startSpan(); + + message.setPublisherSpan(publisherSpan); + if (publisherSpan.getSpanContext().isValid()) { + message.injectSpanContext(); + } + } + + void endPublisherSpan(PubsubMessageWrapper message) { + if (!enabled) { + return; + } + message.endPublisherSpan(); + } + + void setPublisherMessageIdSpanAttribute(PubsubMessageWrapper message, String messageId) { + if (!enabled) { + return; + } + message.setPublisherMessageIdSpanAttribute(messageId); + } + + /** Creates a span for publish-side flow control as a child of the parent publisher span. */ + void startPublishFlowControlSpan(PubsubMessageWrapper message) { + if (!enabled) { + return; + } + Span publisherSpan = message.getPublisherSpan(); + if (publisherSpan != null) + message.setPublishFlowControlSpan( + startChildSpan(PUBLISH_FLOW_CONTROL_SPAN_NAME, publisherSpan)); + } + + void endPublishFlowControlSpan(PubsubMessageWrapper message) { + if (!enabled) { + return; + } + message.endPublishFlowControlSpan(); + } + + void setPublishFlowControlSpanException(PubsubMessageWrapper message, Throwable t) { + if (!enabled) { + return; + } + message.setPublishFlowControlSpanException(t); + } + + /** Creates a span for publish message batching as a child of the parent publisher span. */ + void startPublishBatchingSpan(PubsubMessageWrapper message) { + if (!enabled) { + return; + } + Span publisherSpan = message.getPublisherSpan(); + if (publisherSpan != null) { + message.setPublishBatchingSpan(startChildSpan(PUBLISH_BATCHING_SPAN_NAME, publisherSpan)); + } + } + + void endPublishBatchingSpan(PubsubMessageWrapper message) { + if (!enabled) { + return; + } + message.endPublishBatchingSpan(); + } + + /** + * Creates, starts, and returns a publish RPC span for the given message batch. Bi-directional + * links with the publisher parent span are created for sampled messages in the batch. + */ + Span startPublishRpcSpan(String topic, List messages) { + if (!enabled) { + return null; + } + TopicName topicName = TopicName.parse(topic); + Attributes attributes = + createCommonSpanAttributesBuilder( + topicName.getTopic(), topicName.getProject(), "publishCall", "publish") + .put(SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, messages.size()) + .build(); + SpanBuilder publishRpcSpanBuilder = + tracer + .spanBuilder(topicName.getTopic() + PUBLISH_RPC_SPAN_SUFFIX) + .setSpanKind(SpanKind.CLIENT) + .setAllAttributes(attributes); + Attributes linkAttributes = + Attributes.builder().put(SemanticAttributes.MESSAGING_OPERATION, "publish").build(); + for (PubsubMessageWrapper message : messages) { + if (message.getPublisherSpan().getSpanContext().isSampled()) + publishRpcSpanBuilder.addLink(message.getPublisherSpan().getSpanContext(), linkAttributes); + } + Span publishRpcSpan = publishRpcSpanBuilder.startSpan(); + + for (PubsubMessageWrapper message : messages) { + if (publishRpcSpan.getSpanContext().isSampled()) { + message.getPublisherSpan().addLink(publishRpcSpan.getSpanContext(), linkAttributes); + message.addPublishStartEvent(); + } + } + return publishRpcSpan; + } + + /** Ends the given publish RPC span if it exists. */ + void endPublishRpcSpan(Span publishRpcSpan) { + if (!enabled) { + return; + } + if (publishRpcSpan != null) { + publishRpcSpan.end(); + } + } + + /** + * Sets an error status and records an exception when an exception is thrown when publishing the + * message batch. + */ + void setPublishRpcSpanException(Span publishRpcSpan, Throwable t) { + if (!enabled) { + return; + } + if (publishRpcSpan != null) { + publishRpcSpan.setStatus(StatusCode.ERROR, "Exception thrown on publish RPC."); + publishRpcSpan.recordException(t); + publishRpcSpan.end(); + } + } + + void startSubscriberSpan(PubsubMessageWrapper message, boolean exactlyOnceDeliveryEnabled) { + if (!enabled) { + return; + } + AttributesBuilder attributesBuilder = + createCommonSpanAttributesBuilder( + message.getSubscriptionName(), message.getSubscriptionProject(), "onResponse", null); + + attributesBuilder + .put(SemanticAttributes.MESSAGING_MESSAGE_ID, message.getMessageId()) + .put(MESSAGE_SIZE_ATTR_KEY, message.getDataSize()) + .put(MESSAGE_ACK_ID_ATTR_KEY, message.getAckId()) + .put(MESSAGE_EXACTLY_ONCE_ATTR_KEY, exactlyOnceDeliveryEnabled); + if (!message.getOrderingKey().isEmpty()) { + attributesBuilder.put(ORDERING_KEY_ATTR_KEY, message.getOrderingKey()); + } + if (message.getDeliveryAttempt() > 0) { + attributesBuilder.put(MESSAGE_DELIVERY_ATTEMPT_ATTR_KEY, message.getDeliveryAttempt()); + } + Attributes attributes = attributesBuilder.build(); + Context publisherSpanContext = message.extractSpanContext(attributes); + message.setPublisherSpan(Span.fromContextOrNull(publisherSpanContext)); + message.setSubscriberSpan( + tracer + .spanBuilder(message.getSubscriptionName() + " subscribe") + .setSpanKind(SpanKind.CONSUMER) + .setParent(publisherSpanContext) + .setAllAttributes(attributes) + .startSpan()); + } + + void endSubscriberSpan(PubsubMessageWrapper message) { + if (!enabled) { + return; + } + message.endSubscriberSpan(); + } + + void setSubscriberSpanExpirationResult(PubsubMessageWrapper message) { + if (!enabled) { + return; + } + message.setSubscriberSpanExpirationResult(); + } + + void setSubscriberSpanException(PubsubMessageWrapper message, Throwable t, String exception) { + if (!enabled) { + return; + } + message.setSubscriberSpanException(t, exception); + } + + /** Creates a span for subscribe concurrency control as a child of the parent subscriber span. */ + void startSubscribeConcurrencyControlSpan(PubsubMessageWrapper message) { + if (!enabled) { + return; + } + Span subscriberSpan = message.getSubscriberSpan(); + if (subscriberSpan != null) { + message.setSubscribeConcurrencyControlSpan( + startChildSpan(SUBSCRIBE_CONCURRENCY_CONTROL_SPAN_NAME, subscriberSpan)); + } + } + + void endSubscribeConcurrencyControlSpan(PubsubMessageWrapper message) { + if (!enabled) { + return; + } + message.endSubscribeConcurrencyControlSpan(); + } + + void setSubscribeConcurrencyControlSpanException(PubsubMessageWrapper message, Throwable t) { + if (!enabled) { + return; + } + message.setSubscribeConcurrencyControlSpanException(t); + } + + /** + * Creates a span for subscribe ordering key scheduling as a child of the parent subscriber span. + */ + void startSubscribeSchedulerSpan(PubsubMessageWrapper message) { + if (!enabled) { + return; + } + Span subscriberSpan = message.getSubscriberSpan(); + if (subscriberSpan != null) { + message.setSubscribeSchedulerSpan( + startChildSpan(SUBSCRIBE_SCHEDULER_SPAN_NAME, subscriberSpan)); + } + } + + void endSubscribeSchedulerSpan(PubsubMessageWrapper message) { + if (!enabled) { + return; + } + message.endSubscribeSchedulerSpan(); + } + + /** Creates a span for subscribe message processing as a child of the parent subscriber span. */ + void startSubscribeProcessSpan(PubsubMessageWrapper message) { + if (!enabled) { + return; + } + Span subscriberSpan = message.getSubscriberSpan(); + if (subscriberSpan != null) { + Span subscribeProcessSpan = + startChildSpan(message.getSubscriptionName() + " process", subscriberSpan); + subscribeProcessSpan.setAttribute( + SemanticAttributes.MESSAGING_SYSTEM, MESSAGING_SYSTEM_VALUE); + Span publisherSpan = message.getPublisherSpan(); + if (publisherSpan != null) { + subscribeProcessSpan.addLink(publisherSpan.getSpanContext()); + } + message.setSubscribeProcessSpan(subscribeProcessSpan); + } + } + + void endSubscribeProcessSpan(PubsubMessageWrapper message, String action) { + if (!enabled) { + return; + } + message.endSubscribeProcessSpan(action); + } + + /** + * Creates, starts, and returns spans for ModAck, Nack, and Ack RPC requests. Bi-directional links + * to parent subscribe span for sampled messages are added. + */ + Span startSubscribeRpcSpan( + String subscription, + String rpcOperation, + List messages, + int ackDeadline, + boolean isReceiptModack) { + if (!enabled) { + return null; + } + String codeFunction = rpcOperation == "ack" ? "sendAckOperations" : "sendModAckOperations"; + SubscriptionName subscriptionName = SubscriptionName.parse(subscription); + AttributesBuilder attributesBuilder = + createCommonSpanAttributesBuilder( + subscriptionName.getSubscription(), + subscriptionName.getProject(), + codeFunction, + rpcOperation) + .put(SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, messages.size()); + + // Ack deadline and receipt modack are specific to the modack operation + if (rpcOperation == "modack") { + attributesBuilder + .put(ACK_DEADLINE_ATTR_KEY, ackDeadline) + .put(RECEIPT_MODACK_ATTR_KEY, isReceiptModack); + } + + SpanBuilder rpcSpanBuilder = + tracer + .spanBuilder(subscriptionName.getSubscription() + " " + rpcOperation) + .setSpanKind(SpanKind.CLIENT) + .setAllAttributes(attributesBuilder.build()); + Attributes linkAttributes = + Attributes.builder().put(SemanticAttributes.MESSAGING_OPERATION, rpcOperation).build(); + for (PubsubMessageWrapper message : messages) { + if (message.getSubscriberSpan().getSpanContext().isSampled()) { + rpcSpanBuilder.addLink(message.getSubscriberSpan().getSpanContext(), linkAttributes); + } + } + Span rpcSpan = rpcSpanBuilder.startSpan(); + + for (PubsubMessageWrapper message : messages) { + if (rpcSpan.getSpanContext().isSampled()) { + message.getSubscriberSpan().addLink(rpcSpan.getSpanContext(), linkAttributes); + switch (rpcOperation) { + case "ack": + message.addAckStartEvent(); + break; + case "modack": + message.addModAckStartEvent(); + break; + case "nack": + message.addNackStartEvent(); + break; + } + } + } + return rpcSpan; + } + + /** Ends the given subscribe RPC span if it exists. */ + void endSubscribeRpcSpan(Span rpcSpan) { + if (!enabled) { + return; + } + if (rpcSpan != null) { + rpcSpan.end(); + } + } + + /** + * Sets an error status and records an exception when an exception is thrown when handling a + * subscribe-side RPC. + */ + void setSubscribeRpcSpanException(Span rpcSpan, boolean isModack, int ackDeadline, Throwable t) { + if (!enabled) { + return; + } + if (rpcSpan != null) { + String operation = !isModack ? "ack" : (ackDeadline == 0 ? "nack" : "modack"); + rpcSpan.setStatus(StatusCode.ERROR, "Exception thrown on " + operation + " RPC."); + rpcSpan.recordException(t); + rpcSpan.end(); + } + } + + /** Adds the appropriate subscribe-side RPC end event. */ + void addEndRpcEvent( + PubsubMessageWrapper message, boolean rpcSampled, boolean isModack, int ackDeadline) { + if (!enabled || !rpcSampled) { + return; + } + if (!isModack) { + message.addAckEndEvent(); + } else if (ackDeadline == 0) { + message.addNackEndEvent(); + } else { + message.addModAckEndEvent(); + } + } +} diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index efaba6cf1..99d0be17b 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -52,6 +52,9 @@ import com.google.pubsub.v1.TopicName; import com.google.pubsub.v1.TopicNames; import io.grpc.CallOptions; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.Tracer; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; @@ -93,6 +96,8 @@ public class Publisher implements PublisherInterface { private static final String GZIP_COMPRESSION = "gzip"; + private static final String OPEN_TELEMETRY_TRACER_NAME = "com.google.cloud.pubsub.v1"; + private final String topicName; private final BatchingSettings batchingSettings; @@ -124,6 +129,10 @@ public class Publisher implements PublisherInterface { private final GrpcCallContext publishContext; private final GrpcCallContext publishContextWithCompression; + private final boolean enableOpenTelemetryTracing; + private final OpenTelemetry openTelemetry; + private OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(null, false); + /** The maximum number of messages in one request. Defined by the API. */ public static long getApiMaxRequestElementCount() { return 1000L; @@ -152,6 +161,15 @@ private Publisher(Builder builder) throws IOException { this.messageTransform = builder.messageTransform; this.enableCompression = builder.enableCompression; this.compressionBytesThreshold = builder.compressionBytesThreshold; + this.enableOpenTelemetryTracing = builder.enableOpenTelemetryTracing; + this.openTelemetry = builder.openTelemetry; + if (this.openTelemetry != null && this.enableOpenTelemetryTracing) { + Tracer openTelemetryTracer = builder.openTelemetry.getTracer(OPEN_TELEMETRY_TRACER_NAME); + if (openTelemetryTracer != null) { + this.tracer = + new OpenTelemetryPubsubTracer(openTelemetryTracer, this.enableOpenTelemetryTracing); + } + } messagesBatches = new HashMap<>(); messagesBatchLock = new ReentrantLock(); @@ -259,17 +277,23 @@ public ApiFuture publish(PubsubMessage message) { + "Publisher client. Please create a Publisher client with " + "setEnableMessageOrdering(true) in the builder."); - final OutstandingPublish outstandingPublish = - new OutstandingPublish(messageTransform.apply(message)); + PubsubMessageWrapper messageWrapper = + PubsubMessageWrapper.newBuilder(messageTransform.apply(message), topicName).build(); + tracer.startPublisherSpan(messageWrapper); + + final OutstandingPublish outstandingPublish = new OutstandingPublish(messageWrapper); if (flowController != null) { + tracer.startPublishFlowControlSpan(messageWrapper); try { flowController.acquire(outstandingPublish.messageSize); + tracer.endPublishFlowControlSpan(messageWrapper); } catch (FlowController.FlowControlException e) { if (!orderingKey.isEmpty()) { sequentialExecutor.stopPublish(orderingKey); } outstandingPublish.publishResult.setException(e); + tracer.setPublishFlowControlSpanException(messageWrapper, e); return outstandingPublish.publishResult; } } @@ -277,6 +301,7 @@ public ApiFuture publish(PubsubMessage message) { List batchesToSend; messagesBatchLock.lock(); try { + tracer.startPublishBatchingSpan(messageWrapper); if (!orderingKey.isEmpty() && sequentialExecutor.keyHasError(orderingKey)) { outstandingPublish.publishResult.setException( SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION); @@ -452,12 +477,23 @@ private ApiFuture publishCall(OutstandingBatch outstandingBatch if (enableCompression && outstandingBatch.batchSizeBytes >= compressionBytesThreshold) { context = publishContextWithCompression; } + + int numMessagesInBatch = outstandingBatch.size(); + List pubsubMessagesList = new ArrayList(numMessagesInBatch); + List messageWrappers = outstandingBatch.getMessageWrappers(); + for (PubsubMessageWrapper messageWrapper : messageWrappers) { + tracer.endPublishBatchingSpan(messageWrapper); + pubsubMessagesList.add(messageWrapper.getPubsubMessage()); + } + + outstandingBatch.publishRpcSpan = tracer.startPublishRpcSpan(topicName, messageWrappers); + return publisherStub .publishCallable() .futureCall( PublishRequest.newBuilder() .setTopic(topicName) - .addAllMessages(outstandingBatch.getMessages()) + .addAllMessages(pubsubMessagesList) .build(), context); } @@ -541,6 +577,7 @@ private final class OutstandingBatch { int attempt; int batchSizeBytes; final String orderingKey; + Span publishRpcSpan; OutstandingBatch( List outstandingPublishes, int batchSizeBytes, String orderingKey) { @@ -555,24 +592,29 @@ int size() { return outstandingPublishes.size(); } - private List getMessages() { - List results = new ArrayList<>(outstandingPublishes.size()); + private List getMessageWrappers() { + List results = new ArrayList<>(outstandingPublishes.size()); for (OutstandingPublish outstandingPublish : outstandingPublishes) { - results.add(outstandingPublish.message); + results.add(outstandingPublish.messageWrapper); } return results; } private void onFailure(Throwable t) { + tracer.setPublishRpcSpanException(publishRpcSpan, t); + for (OutstandingPublish outstandingPublish : outstandingPublishes) { if (flowController != null) { flowController.release(outstandingPublish.messageSize); } outstandingPublish.publishResult.setException(t); + tracer.endPublisherSpan(outstandingPublish.messageWrapper); } } private void onSuccess(Iterable results) { + tracer.endPublishRpcSpan(publishRpcSpan); + Iterator messagesResultsIt = outstandingPublishes.iterator(); for (String messageId : results) { OutstandingPublish nextPublish = messagesResultsIt.next(); @@ -580,19 +622,21 @@ private void onSuccess(Iterable results) { flowController.release(nextPublish.messageSize); } nextPublish.publishResult.set(messageId); + tracer.setPublisherMessageIdSpanAttribute(nextPublish.messageWrapper, messageId); + tracer.endPublisherSpan(nextPublish.messageWrapper); } } } private static final class OutstandingPublish { final SettableApiFuture publishResult; - final PubsubMessage message; + final PubsubMessageWrapper messageWrapper; final int messageSize; - OutstandingPublish(PubsubMessage message) { + OutstandingPublish(PubsubMessageWrapper messageWrapper) { this.publishResult = SettableApiFuture.create(); - this.message = message; - this.messageSize = message.getSerializedSize(); + this.messageWrapper = messageWrapper; + this.messageSize = messageWrapper.getPubsubMessage().getSerializedSize(); } } @@ -749,6 +793,9 @@ public PubsubMessage apply(PubsubMessage input) { private boolean enableCompression = DEFAULT_ENABLE_COMPRESSION; private long compressionBytesThreshold = DEFAULT_COMPRESSION_BYTES_THRESHOLD; + private boolean enableOpenTelemetryTracing = false; + private OpenTelemetry openTelemetry = null; + private Builder(String topic) { this.topicName = Preconditions.checkNotNull(topic); } @@ -880,6 +927,26 @@ public Builder setCompressionBytesThreshold(long compressionBytesThreshold) { return this; } + /** + * OpenTelemetry will be enabled if setEnableOpenTelemetry is true and and instance of + * OpenTelemetry has been provied. Warning: traces are subject to change. The name and + * attributes of a span might change without notice. Only use run traces interactively. Don't + * use in automation. Running non-interactive traces can cause problems if the underlying trace + * architecture changes without notice. + */ + + /** Gives the ability to enable Open Telemetry Tracing */ + public Builder setEnableOpenTelemetryTracing(boolean enableOpenTelemetryTracing) { + this.enableOpenTelemetryTracing = enableOpenTelemetryTracing; + return this; + } + + /** Sets the instance of OpenTelemetry for the Publisher class. */ + public Builder setOpenTelemetry(OpenTelemetry openTelemetry) { + this.openTelemetry = openTelemetry; + return this; + } + /** Returns the default BatchingSettings used by the client if settings are not provided. */ public static BatchingSettings getDefaultBatchingSettings() { return DEFAULT_BATCHING_SETTINGS; diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubMessageWrapper.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubMessageWrapper.java new file mode 100644 index 000000000..94fd13085 --- /dev/null +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubMessageWrapper.java @@ -0,0 +1,430 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub.v1; + +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.SubscriptionName; +import com.google.pubsub.v1.TopicName; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.propagation.TextMapGetter; +import io.opentelemetry.context.propagation.TextMapSetter; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; + +/** + * A wrapper class for a {@link PubsubMessage} object that handles creation and tracking of + * OpenTelemetry {@link Span} objects for different operations that occur during publishing. + */ +public class PubsubMessageWrapper { + private PubsubMessage message; + + private final TopicName topicName; + private final SubscriptionName subscriptionName; + + // Attributes set only for messages received from a streaming pull response. + private final String ackId; + private final int deliveryAttempt; + + private static final String PUBLISH_START_EVENT = "publish start"; + private static final String PUBLISH_END_EVENT = "publish end"; + + private static final String MODACK_START_EVENT = "modack start"; + private static final String MODACK_END_EVENT = "modack end"; + private static final String NACK_START_EVENT = "nack start"; + private static final String NACK_END_EVENT = "nack end"; + private static final String ACK_START_EVENT = "ack start"; + private static final String ACK_END_EVENT = "ack end"; + + private static final String GOOGCLIENT_PREFIX = "googclient_"; + + private static final String MESSAGE_RESULT_ATTR_KEY = "messaging.gcp_pubsub.result"; + + private Span publisherSpan; + private Span publishFlowControlSpan; + private Span publishBatchingSpan; + + private Span subscriberSpan; + private Span subscribeConcurrencyControlSpan; + private Span subscribeSchedulerSpan; + private Span subscribeProcessSpan; + + private PubsubMessageWrapper(Builder builder) { + this.message = builder.message; + this.topicName = builder.topicName; + this.subscriptionName = builder.subscriptionName; + this.ackId = builder.ackId; + this.deliveryAttempt = builder.deliveryAttempt; + } + + static Builder newBuilder(PubsubMessage message, String topicName) { + return new Builder(message, topicName); + } + + static Builder newBuilder( + PubsubMessage message, String subscriptionName, String ackId, int deliveryAttempt) { + return new Builder(message, subscriptionName, ackId, deliveryAttempt); + } + + /** Returns the PubsubMessage associated with this wrapper. */ + PubsubMessage getPubsubMessage() { + return message; + } + + void setPubsubMessage(PubsubMessage message) { + this.message = message; + } + + /** Returns the TopicName for this wrapper as a string. */ + String getTopicName() { + if (topicName != null) { + return topicName.getTopic(); + } + return ""; + } + + String getTopicProject() { + if (topicName != null) { + return topicName.getProject(); + } + return ""; + } + + /** Returns the SubscriptionName for this wrapper as a string. */ + String getSubscriptionName() { + if (subscriptionName != null) { + return subscriptionName.getSubscription(); + } + return ""; + } + + String getSubscriptionProject() { + if (subscriptionName != null) { + return subscriptionName.getProject(); + } + return ""; + } + + String getMessageId() { + return message.getMessageId(); + } + + String getAckId() { + return ackId; + } + + int getDataSize() { + return message.getData().size(); + } + + String getOrderingKey() { + return message.getOrderingKey(); + } + + int getDeliveryAttempt() { + return deliveryAttempt; + } + + Span getPublisherSpan() { + return publisherSpan; + } + + void setPublisherSpan(Span span) { + this.publisherSpan = span; + } + + void setPublishFlowControlSpan(Span span) { + this.publishFlowControlSpan = span; + } + + void setPublishBatchingSpan(Span span) { + this.publishBatchingSpan = span; + } + + Span getSubscriberSpan() { + return subscriberSpan; + } + + void setSubscriberSpan(Span span) { + this.subscriberSpan = span; + } + + void setSubscribeConcurrencyControlSpan(Span span) { + this.subscribeConcurrencyControlSpan = span; + } + + void setSubscribeSchedulerSpan(Span span) { + this.subscribeSchedulerSpan = span; + } + + void setSubscribeProcessSpan(Span span) { + this.subscribeProcessSpan = span; + } + + /** Creates a publish start event that is tied to the publish RPC span time. */ + void addPublishStartEvent() { + if (publisherSpan != null) { + publisherSpan.addEvent(PUBLISH_START_EVENT); + } + } + + /** + * Sets the message ID attribute in the publisher parent span. This is called after the publish + * RPC returns with a message ID. + */ + void setPublisherMessageIdSpanAttribute(String messageId) { + if (publisherSpan != null) { + publisherSpan.setAttribute(SemanticAttributes.MESSAGING_MESSAGE_ID, messageId); + } + } + + /** Ends the publisher parent span if it exists. */ + void endPublisherSpan() { + if (publisherSpan != null) { + publisherSpan.addEvent(PUBLISH_END_EVENT); + publisherSpan.end(); + } + } + + /** Ends the publish flow control span if it exists. */ + void endPublishFlowControlSpan() { + if (publishFlowControlSpan != null) { + publishFlowControlSpan.end(); + } + } + + /** Ends the publish batching span if it exists. */ + void endPublishBatchingSpan() { + if (publishBatchingSpan != null) { + publishBatchingSpan.end(); + } + } + + /** + * Sets an error status and records an exception when an exception is thrown during flow control. + */ + void setPublishFlowControlSpanException(Throwable t) { + if (publishFlowControlSpan != null) { + publishFlowControlSpan.setStatus( + StatusCode.ERROR, "Exception thrown during publish flow control."); + publishFlowControlSpan.recordException(t); + endAllPublishSpans(); + } + } + + /** + * Creates start and end events for ModAcks, Nacks, and Acks that are tied to the corresponding + * RPC span start and end times. + */ + void addModAckStartEvent() { + if (subscriberSpan != null) { + subscriberSpan.addEvent(MODACK_START_EVENT); + } + } + + void addModAckEndEvent() { + if (subscriberSpan != null) { + subscriberSpan.addEvent(MODACK_END_EVENT); + } + } + + void addNackStartEvent() { + if (subscriberSpan != null) { + subscriberSpan.addEvent(NACK_START_EVENT); + } + } + + void addNackEndEvent() { + if (subscriberSpan != null) { + subscriberSpan.addEvent(NACK_END_EVENT); + } + } + + void addAckStartEvent() { + if (subscriberSpan != null) { + subscriberSpan.addEvent(ACK_START_EVENT); + } + } + + void addAckEndEvent() { + if (subscriberSpan != null) { + subscriberSpan.addEvent(ACK_END_EVENT); + } + } + + /** Ends the subscriber parent span if exists. */ + void endSubscriberSpan() { + if (subscriberSpan != null) { + subscriberSpan.end(); + } + } + + /** Ends the subscribe concurreny control span if exists. */ + void endSubscribeConcurrencyControlSpan() { + if (subscribeConcurrencyControlSpan != null) { + subscribeConcurrencyControlSpan.end(); + } + } + + /** Ends the subscribe scheduler span if exists. */ + void endSubscribeSchedulerSpan() { + if (subscribeSchedulerSpan != null) { + subscribeSchedulerSpan.end(); + } + } + + /** + * Ends the subscribe process span if it exists, creates an event with the appropriate result, and + * sets the result on the parent subscriber span. + */ + void endSubscribeProcessSpan(String action) { + if (subscribeProcessSpan != null) { + subscribeProcessSpan.addEvent(action + " called"); + subscribeProcessSpan.end(); + subscriberSpan.setAttribute(MESSAGE_RESULT_ATTR_KEY, action); + } + } + + /** Sets an exception on the subscriber span during Ack/ModAck/Nack failures */ + void setSubscriberSpanException(Throwable t, String exception) { + if (subscriberSpan != null) { + subscriberSpan.setStatus(StatusCode.ERROR, exception); + subscriberSpan.recordException(t); + endAllSubscribeSpans(); + } + } + + /** Sets result of the parent subscriber span to expired and ends its. */ + void setSubscriberSpanExpirationResult() { + if (subscriberSpan != null) { + subscriberSpan.setAttribute(MESSAGE_RESULT_ATTR_KEY, "expired"); + endSubscriberSpan(); + } + } + + /** + * Sets an error status and records an exception when an exception is thrown subscriber + * concurrency control. + */ + void setSubscribeConcurrencyControlSpanException(Throwable t) { + if (subscribeConcurrencyControlSpan != null) { + subscribeConcurrencyControlSpan.setStatus( + StatusCode.ERROR, "Exception thrown during subscribe concurrency control."); + subscribeConcurrencyControlSpan.recordException(t); + endAllSubscribeSpans(); + } + } + + /** Ends all publisher-side spans associated with this message wrapper. */ + private void endAllPublishSpans() { + endPublishFlowControlSpan(); + endPublishBatchingSpan(); + endPublisherSpan(); + } + + /** Ends all subscriber-side spans associated with this message wrapper. */ + private void endAllSubscribeSpans() { + endSubscribeConcurrencyControlSpan(); + endSubscribeSchedulerSpan(); + endSubscriberSpan(); + } + + /** + * Injects the span context into the attributes of a Pub/Sub message for propagation to the + * subscriber client. + */ + void injectSpanContext() { + TextMapSetter injectMessageAttributes = + new TextMapSetter() { + @Override + public void set(PubsubMessageWrapper carrier, String key, String value) { + PubsubMessage newMessage = + PubsubMessage.newBuilder(carrier.message) + .putAttributes(GOOGCLIENT_PREFIX + key, value) + .build(); + carrier.message = newMessage; + } + }; + W3CTraceContextPropagator.getInstance() + .inject(Context.current().with(publisherSpan), this, injectMessageAttributes); + } + + /** + * Extracts the span context from the attributes of a Pub/Sub message and creates the parent + * subscriber span using that context. + */ + Context extractSpanContext(Attributes attributes) { + TextMapGetter extractMessageAttributes = + new TextMapGetter() { + @Override + public String get(PubsubMessageWrapper carrier, String key) { + return carrier.message.getAttributesOrDefault(GOOGCLIENT_PREFIX + key, ""); + } + + public Iterable keys(PubsubMessageWrapper carrier) { + return carrier.message.getAttributesMap().keySet(); + } + }; + Context context = + W3CTraceContextPropagator.getInstance() + .extract(Context.current(), this, extractMessageAttributes); + return context; + } + + /** Builder of {@link PubsubMessageWrapper PubsubMessageWrapper}. */ + static final class Builder { + private PubsubMessage message = null; + private TopicName topicName = null; + private SubscriptionName subscriptionName = null; + private String ackId = null; + private int deliveryAttempt = 0; + + public Builder(PubsubMessage message, String topicName) { + this.message = message; + if (topicName != null) { + this.topicName = TopicName.parse(topicName); + } + } + + public Builder( + PubsubMessage message, String subscriptionName, String ackId, int deliveryAttempt) { + this.message = message; + if (subscriptionName != null) { + this.subscriptionName = SubscriptionName.parse(subscriptionName); + } + this.ackId = ackId; + this.deliveryAttempt = deliveryAttempt; + } + + public Builder( + PubsubMessage message, + SubscriptionName subscriptionName, + String ackId, + int deliveryAttempt) { + this.message = message; + this.subscriptionName = subscriptionName; + this.ackId = ackId; + this.deliveryAttempt = deliveryAttempt; + } + + public PubsubMessageWrapper build() { + return new PubsubMessageWrapper(this); + } + } +} diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java index 7849bdb74..60da55cee 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java @@ -50,6 +50,7 @@ import com.google.rpc.ErrorInfo; import io.grpc.Status; import io.grpc.protobuf.StatusProto; +import io.opentelemetry.api.trace.Span; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -118,6 +119,9 @@ final class StreamingSubscriberConnection extends AbstractApiService implements */ private final String clientId = UUID.randomUUID().toString(); + private final boolean enableOpenTelemetryTracing; + private OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(null, false); + private StreamingSubscriberConnection(Builder builder) { subscription = builder.subscription; systemExecutor = builder.systemExecutor; @@ -151,6 +155,11 @@ private StreamingSubscriberConnection(Builder builder) { messageDispatcherBuilder = MessageDispatcher.newBuilder(builder.receiverWithAckResponse); } + enableOpenTelemetryTracing = builder.enableOpenTelemetryTracing; + if (builder.tracer != null) { + tracer = builder.tracer; + } + messageDispatcher = messageDispatcherBuilder .setAckProcessor(this) @@ -165,6 +174,9 @@ private StreamingSubscriberConnection(Builder builder) { .setExecutor(builder.executor) .setSystemExecutor(builder.systemExecutor) .setApiClock(builder.clock) + .setSubscriptionName(subscription) + .setEnableOpenTelemetryTracing(enableOpenTelemetryTracing) + .setTracer(tracer) .build(); flowControlSettings = builder.flowControlSettings; @@ -432,15 +444,19 @@ private void sendAckOperations( for (List ackRequestDataInRequestList : Lists.partition(ackRequestDataList, MAX_PER_REQUEST_CHANGES)) { List ackIdsInRequest = new ArrayList<>(); + List messagesInRequest = new ArrayList<>(); for (AckRequestData ackRequestData : ackRequestDataInRequestList) { ackIdsInRequest.add(ackRequestData.getAckId()); + messagesInRequest.add(ackRequestData.getMessageWrapper()); if (ackRequestData.hasMessageFuture()) { // Add to our pending requests if we care about the response pendingRequests.add(ackRequestData); } } + // Creates an Ack span to be passed to the callback + Span rpcSpan = tracer.startSubscribeRpcSpan(subscription, "ack", messagesInRequest, 0, false); ApiFutureCallback callback = - getCallback(ackRequestDataInRequestList, 0, false, currentBackoffMillis); + getCallback(ackRequestDataInRequestList, 0, false, currentBackoffMillis, rpcSpan); ApiFuture ackFuture = subscriberStub .acknowledgeCallable() @@ -463,19 +479,32 @@ private void sendModackOperations( for (List ackRequestDataInRequestList : Lists.partition(modackRequestData.getAckRequestData(), MAX_PER_REQUEST_CHANGES)) { List ackIdsInRequest = new ArrayList<>(); + List messagesInRequest = new ArrayList<>(); for (AckRequestData ackRequestData : ackRequestDataInRequestList) { ackIdsInRequest.add(ackRequestData.getAckId()); + messagesInRequest.add(ackRequestData.getMessageWrapper()); if (ackRequestData.hasMessageFuture()) { // Add to our pending requests if we care about the response pendingRequests.add(ackRequestData); } } + int deadlineExtensionSeconds = modackRequestData.getDeadlineExtensionSeconds(); + String rpcOperation = deadlineExtensionSeconds == 0 ? "nack" : "modack"; + // Creates either a ModAck span or a Nack span depending on the given ack deadline + Span rpcSpan = + tracer.startSubscribeRpcSpan( + subscription, + rpcOperation, + messagesInRequest, + deadlineExtensionSeconds, + modackRequestData.getIsReceiptModack()); ApiFutureCallback callback = getCallback( modackRequestData.getAckRequestData(), - modackRequestData.getDeadlineExtensionSeconds(), + deadlineExtensionSeconds, true, - currentBackoffMillis); + currentBackoffMillis, + rpcSpan); ApiFuture modackFuture = subscriberStub .modifyAckDeadlineCallable() @@ -517,22 +546,36 @@ private ApiFutureCallback getCallback( List ackRequestDataList, int deadlineExtensionSeconds, boolean isModack, - long currentBackoffMillis) { + long currentBackoffMillis, + Span rpcSpan) { // This callback handles retries, and sets message futures // Check if ack or nack boolean setResponseOnSuccess = (!isModack || (deadlineExtensionSeconds == 0)) ? true : false; + boolean rpcSpanSampled = rpcSpan == null ? false : rpcSpan.getSpanContext().isSampled(); + return new ApiFutureCallback() { @Override public void onSuccess(Empty empty) { ackOperationsWaiter.incrementPendingCount(-1); + + tracer.endSubscribeRpcSpan(rpcSpan); + for (AckRequestData ackRequestData : ackRequestDataList) { // This will check if a response is needed, and if it has already been set ackRequestData.setResponse(AckResponse.SUCCESSFUL, setResponseOnSuccess); messageDispatcher.notifyAckSuccess(ackRequestData); // Remove from our pending operations pendingRequests.remove(ackRequestData); + tracer.addEndRpcEvent( + ackRequestData.getMessageWrapper(), + rpcSpanSampled, + isModack, + deadlineExtensionSeconds); + if (!isModack || deadlineExtensionSeconds == 0) { + tracer.endSubscriberSpan(ackRequestData.getMessageWrapper()); + } } } @@ -544,10 +587,23 @@ public void onFailure(Throwable t) { Level level = isAlive() ? Level.WARNING : Level.FINER; logger.log(level, "failed to send operations", t); + tracer.setSubscribeRpcSpanException(rpcSpan, isModack, deadlineExtensionSeconds, t); + if (!getExactlyOnceDeliveryEnabled()) { + if (enableOpenTelemetryTracing) { + for (AckRequestData ackRequestData : ackRequestDataList) { + tracer.addEndRpcEvent( + ackRequestData.getMessageWrapper(), + rpcSpanSampled, + isModack, + deadlineExtensionSeconds); + if (!isModack || deadlineExtensionSeconds == 0) { + tracer.endSubscriberSpan(ackRequestData.getMessageWrapper()); + } + } + } return; } - List ackRequestDataArrayRetryList = new ArrayList<>(); try { Map metadataMap = getMetadataMapFromThrowable(t); @@ -569,14 +625,37 @@ public void onFailure(Throwable t) { errorMessage); ackRequestData.setResponse(AckResponse.INVALID, setResponseOnSuccess); messageDispatcher.notifyAckFailed(ackRequestData); + tracer.addEndRpcEvent( + ackRequestData.getMessageWrapper(), + rpcSpanSampled, + isModack, + deadlineExtensionSeconds); + tracer.setSubscriberSpanException( + ackRequestData.getMessageWrapper(), t, "Invalid ack ID"); } else { logger.log(Level.INFO, "Unknown error message, will not resend", errorMessage); ackRequestData.setResponse(AckResponse.OTHER, setResponseOnSuccess); messageDispatcher.notifyAckFailed(ackRequestData); + tracer.addEndRpcEvent( + ackRequestData.getMessageWrapper(), + rpcSpanSampled, + isModack, + deadlineExtensionSeconds); + tracer.setSubscriberSpanException( + ackRequestData.getMessageWrapper(), t, "Unknown error message"); + ackRequestData + .getMessageWrapper() + .setSubscriberSpanException(t, "Unknown error message"); } } else { ackRequestData.setResponse(AckResponse.SUCCESSFUL, setResponseOnSuccess); messageDispatcher.notifyAckSuccess(ackRequestData); + tracer.endSubscriberSpan(ackRequestData.getMessageWrapper()); + tracer.addEndRpcEvent( + ackRequestData.getMessageWrapper(), + rpcSpanSampled, + isModack, + deadlineExtensionSeconds); } // Remove from our pending pendingRequests.remove(ackRequestData); @@ -637,6 +716,9 @@ public static final class Builder { private ScheduledExecutorService systemExecutor; private ApiClock clock; + private boolean enableOpenTelemetryTracing; + private OpenTelemetryPubsubTracer tracer; + protected Builder(MessageReceiver receiver) { this.receiver = receiver; } @@ -727,6 +809,16 @@ public Builder setClock(ApiClock clock) { return this; } + public Builder setEnableOpenTelemetryTracing(boolean enableOpenTelemetryTracing) { + this.enableOpenTelemetryTracing = enableOpenTelemetryTracing; + return this; + } + + public Builder setTracer(OpenTelemetryPubsubTracer tracer) { + this.tracer = tracer; + return this; + } + public StreamingSubscriberConnection build() { return new StreamingSubscriberConnection(this); } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java index 1723c72b1..e9926fa58 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java @@ -43,6 +43,8 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.pubsub.v1.ProjectSubscriptionName; import com.google.pubsub.v1.PubsubMessage; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.trace.Tracer; import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -117,6 +119,8 @@ public class Subscriber extends AbstractApiService implements SubscriberInterfac private static final Logger logger = Logger.getLogger(Subscriber.class.getName()); + private static final String OPEN_TELEMETRY_TRACER_NAME = "com.google.cloud.pubsub.v1"; + private final String subscriptionName; private final FlowControlSettings flowControlSettings; private final boolean useLegacyFlowControl; @@ -145,6 +149,10 @@ public class Subscriber extends AbstractApiService implements SubscriberInterfac private final ApiClock clock; private final List backgroundResources = new ArrayList<>(); + private final boolean enableOpenTelemetryTracing; + private final OpenTelemetry openTelemetry; + private OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(null, false); + private Subscriber(Builder builder) { receiver = builder.receiver; receiverWithAckResponse = builder.receiverWithAckResponse; @@ -199,6 +207,16 @@ private Subscriber(Builder builder) { throw new IllegalStateException(e); } + this.enableOpenTelemetryTracing = builder.enableOpenTelemetryTracing; + this.openTelemetry = builder.openTelemetry; + if (this.openTelemetry != null && this.enableOpenTelemetryTracing) { + Tracer openTelemetryTracer = builder.openTelemetry.getTracer(OPEN_TELEMETRY_TRACER_NAME); + if (openTelemetryTracer != null) { + this.tracer = + new OpenTelemetryPubsubTracer(openTelemetryTracer, this.enableOpenTelemetryTracing); + } + } + streamingSubscriberConnections = new ArrayList(numPullers); // We regularly look up the distribution for a good subscription deadline. @@ -386,6 +404,8 @@ private void startStreamingConnections() { .setExecutor(executor) .setSystemExecutor(alarmsExecutor) .setClock(clock) + .setEnableOpenTelemetryTracing(enableOpenTelemetryTracing) + .setTracer(tracer) .build(); streamingSubscriberConnections.add(streamingSubscriberConnection); @@ -495,6 +515,9 @@ public static final class Builder { private String endpoint = null; private String universeDomain = null; + private boolean enableOpenTelemetryTracing = false; + private OpenTelemetry openTelemetry = null; + Builder(String subscription, MessageReceiver receiver) { this.subscription = subscription; this.receiver = receiver; @@ -684,6 +707,26 @@ Builder setClock(ApiClock clock) { return this; } + /** + * OpenTelemetry will be enabled if setEnableOpenTelemetry is true and and instance of + * OpenTelemetry has been provied. Warning: traces are subject to change. The name and + * attributes of a span might change without notice. Only use run traces interactively. Don't + * use in automation. Running non-interactive traces can cause problems if the underlying trace + * architecture changes without notice. + */ + + /** Gives the ability to enable Open Telemetry Tracing */ + public Builder setEnableOpenTelemetryTracing(boolean enableOpenTelemetryTracing) { + this.enableOpenTelemetryTracing = enableOpenTelemetryTracing; + return this; + } + + /** Sets the instance of OpenTelemetry for the Publisher class. */ + public Builder setOpenTelemetry(OpenTelemetry openTelemetry) { + this.openTelemetry = openTelemetry; + return this; + } + /** Returns the default FlowControlSettings used by the client if settings are not provided. */ public static FlowControlSettings getDefaultFlowControlSettings() { return DEFAULT_FLOW_CONTROL_SETTINGS; diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java new file mode 100644 index 000000000..b4433f41e --- /dev/null +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java @@ -0,0 +1,669 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub.v1; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.SubscriptionName; +import com.google.pubsub.v1.TopicName; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.sdk.testing.assertj.AttributesAssert; +import io.opentelemetry.sdk.testing.assertj.EventDataAssert; +import io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions; +import io.opentelemetry.sdk.testing.assertj.SpanDataAssert; +import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule; +import io.opentelemetry.sdk.trace.data.LinkData; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.data.StatusData; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import java.util.Arrays; +import java.util.List; +import org.junit.Test; + +public class OpenTelemetryTest { + private static final TopicName FULL_TOPIC_NAME = + TopicName.parse("projects/test-project/topics/test-topic"); + private static final SubscriptionName FULL_SUBSCRIPTION_NAME = + SubscriptionName.parse("projects/test-project/subscriptions/test-sub"); + private static final String PROJECT_NAME = "test-project"; + private static final String ORDERING_KEY = "abc"; + private static final String MESSAGE_ID = "m0"; + private static final String ACK_ID = "def"; + private static final int DELIVERY_ATTEMPT = 1; + private static final int ACK_DEADLINE = 10; + private static final boolean EXACTLY_ONCE_ENABLED = true; + + private static final String PUBLISHER_SPAN_NAME = FULL_TOPIC_NAME.getTopic() + " create"; + private static final String PUBLISH_FLOW_CONTROL_SPAN_NAME = "publisher flow control"; + private static final String PUBLISH_BATCHING_SPAN_NAME = "publisher batching"; + private static final String PUBLISH_RPC_SPAN_NAME = FULL_TOPIC_NAME.getTopic() + " publish"; + private static final String PUBLISH_START_EVENT = "publish start"; + private static final String PUBLISH_END_EVENT = "publish end"; + + private static final String SUBSCRIBER_SPAN_NAME = + FULL_SUBSCRIPTION_NAME.getSubscription() + " subscribe"; + private static final String SUBSCRIBE_CONCURRENCY_CONTROL_SPAN_NAME = + "subscriber concurrency control"; + private static final String SUBSCRIBE_SCHEDULER_SPAN_NAME = "subscriber scheduler"; + private static final String SUBSCRIBE_PROCESS_SPAN_NAME = + FULL_SUBSCRIPTION_NAME.getSubscription() + " process"; + private static final String SUBSCRIBE_MODACK_RPC_SPAN_NAME = + FULL_SUBSCRIPTION_NAME.getSubscription() + " modack"; + private static final String SUBSCRIBE_ACK_RPC_SPAN_NAME = + FULL_SUBSCRIPTION_NAME.getSubscription() + " ack"; + private static final String SUBSCRIBE_NACK_RPC_SPAN_NAME = + FULL_SUBSCRIPTION_NAME.getSubscription() + " nack"; + + private static final String PROCESS_ACTION = "ack"; + private static final String MODACK_START_EVENT = "modack start"; + private static final String MODACK_END_EVENT = "modack end"; + private static final String NACK_START_EVENT = "nack start"; + private static final String NACK_END_EVENT = "nack end"; + private static final String ACK_START_EVENT = "ack start"; + private static final String ACK_END_EVENT = "ack end"; + + private static final String MESSAGING_SYSTEM_VALUE = "gcp_pubsub"; + private static final String PROJECT_ATTR_KEY = "gcp.project_id"; + private static final String MESSAGE_SIZE_ATTR_KEY = "messaging.message.body.size"; + private static final String ORDERING_KEY_ATTR_KEY = "messaging.gcp_pubsub.message.ordering_key"; + private static final String ACK_DEADLINE_ATTR_KEY = "messaging.gcp_pubsub.message.ack_deadline"; + private static final String RECEIPT_MODACK_ATTR_KEY = "messaging.gcp_pubsub.is_receipt_modack"; + private static final String MESSAGE_ACK_ID_ATTR_KEY = "messaging.gcp_pubsub.message.ack_id"; + private static final String MESSAGE_EXACTLY_ONCE_ATTR_KEY = + "messaging.gcp_pubsub.message.exactly_once_delivery"; + private static final String MESSAGE_RESULT_ATTR_KEY = "messaging.gcp_pubsub.result"; + private static final String MESSAGE_DELIVERY_ATTEMPT_ATTR_KEY = + "messaging.gcp_pubsub.message.delivery_attempt"; + + private static final String TRACEPARENT_ATTRIBUTE = "googclient_traceparent"; + + private static final OpenTelemetryRule openTelemetryTesting = OpenTelemetryRule.create(); + + @Test + public void testPublishSpansSuccess() { + openTelemetryTesting.clearSpans(); + + PubsubMessageWrapper messageWrapper = + PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME.toString()).build(); + List messageWrappers = Arrays.asList(messageWrapper); + + long messageSize = messageWrapper.getPubsubMessage().getData().size(); + Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test"); + OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer, true); + + // Call all span start/end methods in the expected order + tracer.startPublisherSpan(messageWrapper); + tracer.startPublishFlowControlSpan(messageWrapper); + tracer.endPublishFlowControlSpan(messageWrapper); + tracer.startPublishBatchingSpan(messageWrapper); + tracer.endPublishBatchingSpan(messageWrapper); + Span publishRpcSpan = tracer.startPublishRpcSpan(FULL_TOPIC_NAME.toString(), messageWrappers); + tracer.endPublishRpcSpan(publishRpcSpan); + tracer.setPublisherMessageIdSpanAttribute(messageWrapper, MESSAGE_ID); + tracer.endPublisherSpan(messageWrapper); + + List allSpans = openTelemetryTesting.getSpans(); + assertEquals(4, allSpans.size()); + SpanData flowControlSpanData = allSpans.get(0); + SpanData batchingSpanData = allSpans.get(1); + SpanData publishRpcSpanData = allSpans.get(2); + SpanData publisherSpanData = allSpans.get(3); + + SpanDataAssert flowControlSpanDataAssert = + OpenTelemetryAssertions.assertThat(flowControlSpanData); + flowControlSpanDataAssert + .hasName(PUBLISH_FLOW_CONTROL_SPAN_NAME) + .hasParent(publisherSpanData) + .hasEnded(); + + SpanDataAssert batchingSpanDataAssert = OpenTelemetryAssertions.assertThat(batchingSpanData); + batchingSpanDataAssert + .hasName(PUBLISH_BATCHING_SPAN_NAME) + .hasParent(publisherSpanData) + .hasEnded(); + + // Check span data, links, and attributes for the publish RPC span + SpanDataAssert publishRpcSpanDataAssert = + OpenTelemetryAssertions.assertThat(publishRpcSpanData); + publishRpcSpanDataAssert + .hasName(PUBLISH_RPC_SPAN_NAME) + .hasKind(SpanKind.CLIENT) + .hasNoParent() + .hasEnded(); + + List publishRpcLinks = publishRpcSpanData.getLinks(); + assertEquals(messageWrappers.size(), publishRpcLinks.size()); + assertEquals(publisherSpanData.getSpanContext(), publishRpcLinks.get(0).getSpanContext()); + + assertEquals(6, publishRpcSpanData.getAttributes().size()); + AttributesAssert publishRpcSpanAttributesAssert = + OpenTelemetryAssertions.assertThat(publishRpcSpanData.getAttributes()); + publishRpcSpanAttributesAssert + .containsEntry(SemanticAttributes.MESSAGING_SYSTEM, MESSAGING_SYSTEM_VALUE) + .containsEntry(SemanticAttributes.MESSAGING_DESTINATION_NAME, FULL_TOPIC_NAME.getTopic()) + .containsEntry(PROJECT_ATTR_KEY, FULL_TOPIC_NAME.getProject()) + .containsEntry(SemanticAttributes.CODE_FUNCTION, "publishCall") + .containsEntry(SemanticAttributes.MESSAGING_OPERATION, "publish") + .containsEntry(SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, messageWrappers.size()); + + // Check span data, events, links, and attributes for the publisher create span + SpanDataAssert publisherSpanDataAssert = OpenTelemetryAssertions.assertThat(publisherSpanData); + publisherSpanDataAssert + .hasName(PUBLISHER_SPAN_NAME) + .hasKind(SpanKind.PRODUCER) + .hasNoParent() + .hasEnded(); + + assertEquals(2, publisherSpanData.getEvents().size()); + EventDataAssert startEventAssert = + OpenTelemetryAssertions.assertThat(publisherSpanData.getEvents().get(0)); + startEventAssert.hasName(PUBLISH_START_EVENT); + EventDataAssert endEventAssert = + OpenTelemetryAssertions.assertThat(publisherSpanData.getEvents().get(1)); + endEventAssert.hasName(PUBLISH_END_EVENT); + + List publisherLinks = publisherSpanData.getLinks(); + assertEquals(1, publisherLinks.size()); + assertEquals(publishRpcSpanData.getSpanContext(), publisherLinks.get(0).getSpanContext()); + + assertEquals(8, publisherSpanData.getAttributes().size()); + AttributesAssert publisherSpanAttributesAssert = + OpenTelemetryAssertions.assertThat(publisherSpanData.getAttributes()); + publisherSpanAttributesAssert + .containsEntry(SemanticAttributes.MESSAGING_SYSTEM, MESSAGING_SYSTEM_VALUE) + .containsEntry(SemanticAttributes.MESSAGING_DESTINATION_NAME, FULL_TOPIC_NAME.getTopic()) + .containsEntry(PROJECT_ATTR_KEY, PROJECT_NAME) + .containsEntry(SemanticAttributes.CODE_FUNCTION, "publish") + .containsEntry(SemanticAttributes.MESSAGING_OPERATION, "create") + .containsEntry(ORDERING_KEY_ATTR_KEY, ORDERING_KEY) + .containsEntry(MESSAGE_SIZE_ATTR_KEY, messageSize) + .containsEntry(SemanticAttributes.MESSAGING_MESSAGE_ID, MESSAGE_ID); + + // Check that the message has the attribute containing the trace context. + PubsubMessage message = messageWrapper.getPubsubMessage(); + assertEquals(1, message.getAttributesMap().size()); + assertTrue(message.containsAttributes(TRACEPARENT_ATTRIBUTE)); + assertTrue( + message + .getAttributesOrDefault(TRACEPARENT_ATTRIBUTE, "") + .contains(publisherSpanData.getTraceId())); + assertTrue( + message + .getAttributesOrDefault(TRACEPARENT_ATTRIBUTE, "") + .contains(publisherSpanData.getSpanId())); + } + + @Test + public void testPublishFlowControlSpanFailure() { + openTelemetryTesting.clearSpans(); + + PubsubMessageWrapper messageWrapper = + PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME.toString()).build(); + + Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test"); + OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer, true); + + tracer.startPublisherSpan(messageWrapper); + tracer.startPublishFlowControlSpan(messageWrapper); + + Exception e = new Exception("test-exception"); + tracer.setPublishFlowControlSpanException(messageWrapper, e); + + List allSpans = openTelemetryTesting.getSpans(); + assertEquals(2, allSpans.size()); + SpanData flowControlSpanData = allSpans.get(0); + SpanData publisherSpanData = allSpans.get(1); + + SpanDataAssert flowControlSpanDataAssert = + OpenTelemetryAssertions.assertThat(flowControlSpanData); + StatusData expectedStatus = + StatusData.create(StatusCode.ERROR, "Exception thrown during publish flow control."); + flowControlSpanDataAssert + .hasName(PUBLISH_FLOW_CONTROL_SPAN_NAME) + .hasParent(publisherSpanData) + .hasStatus(expectedStatus) + .hasException(e) + .hasEnded(); + + SpanDataAssert publisherSpanDataAssert = OpenTelemetryAssertions.assertThat(publisherSpanData); + publisherSpanDataAssert + .hasName(PUBLISHER_SPAN_NAME) + .hasKind(SpanKind.PRODUCER) + .hasNoParent() + .hasEnded(); + } + + @Test + public void testPublishRpcSpanFailure() { + openTelemetryTesting.clearSpans(); + + PubsubMessageWrapper messageWrapper = + PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME.toString()).build(); + + List messageWrappers = Arrays.asList(messageWrapper); + Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test"); + OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer, true); + + tracer.startPublisherSpan(messageWrapper); + Span publishRpcSpan = tracer.startPublishRpcSpan(FULL_TOPIC_NAME.toString(), messageWrappers); + + Exception e = new Exception("test-exception"); + tracer.setPublishRpcSpanException(publishRpcSpan, e); + tracer.endPublisherSpan(messageWrapper); + + List allSpans = openTelemetryTesting.getSpans(); + assertEquals(2, allSpans.size()); + SpanData rpcSpanData = allSpans.get(0); + SpanData publisherSpanData = allSpans.get(1); + + SpanDataAssert rpcSpanDataAssert = OpenTelemetryAssertions.assertThat(rpcSpanData); + StatusData expectedStatus = + StatusData.create(StatusCode.ERROR, "Exception thrown on publish RPC."); + rpcSpanDataAssert + .hasName(PUBLISH_RPC_SPAN_NAME) + .hasKind(SpanKind.CLIENT) + .hasStatus(expectedStatus) + .hasException(e) + .hasEnded(); + + SpanDataAssert publisherSpanDataAssert = OpenTelemetryAssertions.assertThat(publisherSpanData); + publisherSpanDataAssert + .hasName(PUBLISHER_SPAN_NAME) + .hasKind(SpanKind.PRODUCER) + .hasNoParent() + .hasEnded(); + } + + @Test + public void testSubscribeSpansSuccess() { + openTelemetryTesting.clearSpans(); + + Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test"); + OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer, true); + + PubsubMessageWrapper publishMessageWrapper = + PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME.toString()).build(); + // Initialize the Publisher span to inject the context in the message + tracer.startPublisherSpan(publishMessageWrapper); + tracer.endPublisherSpan(publishMessageWrapper); + + PubsubMessage publishedMessage = + publishMessageWrapper.getPubsubMessage().toBuilder().setMessageId(MESSAGE_ID).build(); + PubsubMessageWrapper subscribeMessageWrapper = + PubsubMessageWrapper.newBuilder( + publishedMessage, FULL_SUBSCRIPTION_NAME.toString(), ACK_ID, 1) + .build(); + List subscribeMessageWrappers = Arrays.asList(subscribeMessageWrapper); + + long messageSize = subscribeMessageWrapper.getPubsubMessage().getData().size(); + + // Call all span start/end methods in the expected order + tracer.startSubscriberSpan(subscribeMessageWrapper, EXACTLY_ONCE_ENABLED); + tracer.startSubscribeConcurrencyControlSpan(subscribeMessageWrapper); + tracer.endSubscribeConcurrencyControlSpan(subscribeMessageWrapper); + tracer.startSubscribeSchedulerSpan(subscribeMessageWrapper); + tracer.endSubscribeSchedulerSpan(subscribeMessageWrapper); + tracer.startSubscribeProcessSpan(subscribeMessageWrapper); + tracer.endSubscribeProcessSpan(subscribeMessageWrapper, PROCESS_ACTION); + Span subscribeModackRpcSpan = + tracer.startSubscribeRpcSpan( + FULL_SUBSCRIPTION_NAME.toString(), + "modack", + subscribeMessageWrappers, + ACK_DEADLINE, + true); + tracer.endSubscribeRpcSpan(subscribeModackRpcSpan); + tracer.addEndRpcEvent(subscribeMessageWrapper, true, true, ACK_DEADLINE); + Span subscribeAckRpcSpan = + tracer.startSubscribeRpcSpan( + FULL_SUBSCRIPTION_NAME.toString(), "ack", subscribeMessageWrappers, 0, false); + tracer.endSubscribeRpcSpan(subscribeAckRpcSpan); + tracer.addEndRpcEvent(subscribeMessageWrapper, true, false, 0); + Span subscribeNackRpcSpan = + tracer.startSubscribeRpcSpan( + FULL_SUBSCRIPTION_NAME.toString(), "nack", subscribeMessageWrappers, 0, false); + tracer.endSubscribeRpcSpan(subscribeNackRpcSpan); + tracer.addEndRpcEvent(subscribeMessageWrapper, true, true, 0); + tracer.endSubscriberSpan(subscribeMessageWrapper); + + List allSpans = openTelemetryTesting.getSpans(); + assertEquals(8, allSpans.size()); + + SpanData publisherSpanData = allSpans.get(0); + SpanData concurrencyControlSpanData = allSpans.get(1); + SpanData schedulerSpanData = allSpans.get(2); + SpanData processSpanData = allSpans.get(3); + SpanData modackRpcSpanData = allSpans.get(4); + SpanData ackRpcSpanData = allSpans.get(5); + SpanData nackRpcSpanData = allSpans.get(6); + SpanData subscriberSpanData = allSpans.get(7); + + SpanDataAssert concurrencyControlSpanDataAssert = + OpenTelemetryAssertions.assertThat(concurrencyControlSpanData); + concurrencyControlSpanDataAssert + .hasName(SUBSCRIBE_CONCURRENCY_CONTROL_SPAN_NAME) + .hasParent(subscriberSpanData) + .hasEnded(); + + SpanDataAssert schedulerSpanDataAssert = OpenTelemetryAssertions.assertThat(schedulerSpanData); + schedulerSpanDataAssert + .hasName(SUBSCRIBE_SCHEDULER_SPAN_NAME) + .hasParent(subscriberSpanData) + .hasEnded(); + + SpanDataAssert processSpanDataAssert = OpenTelemetryAssertions.assertThat(processSpanData); + processSpanDataAssert + .hasName(SUBSCRIBE_PROCESS_SPAN_NAME) + .hasParent(subscriberSpanData) + .hasEnded(); + + assertEquals(1, processSpanData.getEvents().size()); + EventDataAssert actionCalledEventAssert = + OpenTelemetryAssertions.assertThat(processSpanData.getEvents().get(0)); + actionCalledEventAssert.hasName(PROCESS_ACTION + " called"); + + // Check span data, links, and attributes for the modack RPC span + SpanDataAssert modackRpcSpanDataAssert = OpenTelemetryAssertions.assertThat(modackRpcSpanData); + modackRpcSpanDataAssert + .hasName(SUBSCRIBE_MODACK_RPC_SPAN_NAME) + .hasKind(SpanKind.CLIENT) + .hasNoParent() + .hasEnded(); + + List modackRpcLinks = modackRpcSpanData.getLinks(); + assertEquals(subscribeMessageWrappers.size(), modackRpcLinks.size()); + assertEquals(subscriberSpanData.getSpanContext(), modackRpcLinks.get(0).getSpanContext()); + + assertEquals(8, modackRpcSpanData.getAttributes().size()); + AttributesAssert modackRpcSpanAttributesAssert = + OpenTelemetryAssertions.assertThat(modackRpcSpanData.getAttributes()); + modackRpcSpanAttributesAssert + .containsEntry(SemanticAttributes.MESSAGING_SYSTEM, MESSAGING_SYSTEM_VALUE) + .containsEntry( + SemanticAttributes.MESSAGING_DESTINATION_NAME, FULL_SUBSCRIPTION_NAME.getSubscription()) + .containsEntry(PROJECT_ATTR_KEY, FULL_TOPIC_NAME.getProject()) + .containsEntry(SemanticAttributes.CODE_FUNCTION, "sendModAckOperations") + .containsEntry(SemanticAttributes.MESSAGING_OPERATION, "modack") + .containsEntry( + SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, subscribeMessageWrappers.size()) + .containsEntry(ACK_DEADLINE_ATTR_KEY, 10) + .containsEntry(RECEIPT_MODACK_ATTR_KEY, true); + + // Check span data, links, and attributes for the ack RPC span + SpanDataAssert ackRpcSpanDataAssert = OpenTelemetryAssertions.assertThat(ackRpcSpanData); + ackRpcSpanDataAssert + .hasName(SUBSCRIBE_ACK_RPC_SPAN_NAME) + .hasKind(SpanKind.CLIENT) + .hasNoParent() + .hasEnded(); + + List ackRpcLinks = ackRpcSpanData.getLinks(); + assertEquals(subscribeMessageWrappers.size(), ackRpcLinks.size()); + assertEquals(subscriberSpanData.getSpanContext(), ackRpcLinks.get(0).getSpanContext()); + + assertEquals(6, ackRpcSpanData.getAttributes().size()); + AttributesAssert ackRpcSpanAttributesAssert = + OpenTelemetryAssertions.assertThat(ackRpcSpanData.getAttributes()); + ackRpcSpanAttributesAssert + .containsEntry(SemanticAttributes.MESSAGING_SYSTEM, MESSAGING_SYSTEM_VALUE) + .containsEntry( + SemanticAttributes.MESSAGING_DESTINATION_NAME, FULL_SUBSCRIPTION_NAME.getSubscription()) + .containsEntry(PROJECT_ATTR_KEY, FULL_TOPIC_NAME.getProject()) + .containsEntry(SemanticAttributes.CODE_FUNCTION, "sendAckOperations") + .containsEntry(SemanticAttributes.MESSAGING_OPERATION, "ack") + .containsEntry( + SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, subscribeMessageWrappers.size()); + + // Check span data, links, and attributes for the nack RPC span + SpanDataAssert nackRpcSpanDataAssert = OpenTelemetryAssertions.assertThat(nackRpcSpanData); + nackRpcSpanDataAssert + .hasName(SUBSCRIBE_NACK_RPC_SPAN_NAME) + .hasKind(SpanKind.CLIENT) + .hasNoParent() + .hasEnded(); + + List nackRpcLinks = nackRpcSpanData.getLinks(); + assertEquals(subscribeMessageWrappers.size(), nackRpcLinks.size()); + assertEquals(subscriberSpanData.getSpanContext(), nackRpcLinks.get(0).getSpanContext()); + + assertEquals(6, nackRpcSpanData.getAttributes().size()); + AttributesAssert nackRpcSpanAttributesAssert = + OpenTelemetryAssertions.assertThat(nackRpcSpanData.getAttributes()); + nackRpcSpanAttributesAssert + .containsEntry(SemanticAttributes.MESSAGING_SYSTEM, MESSAGING_SYSTEM_VALUE) + .containsEntry( + SemanticAttributes.MESSAGING_DESTINATION_NAME, FULL_SUBSCRIPTION_NAME.getSubscription()) + .containsEntry(PROJECT_ATTR_KEY, FULL_TOPIC_NAME.getProject()) + .containsEntry(SemanticAttributes.CODE_FUNCTION, "sendModAckOperations") + .containsEntry(SemanticAttributes.MESSAGING_OPERATION, "nack") + .containsEntry( + SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, subscribeMessageWrappers.size()); + + // Check span data, events, links, and attributes for the publisher create span + SpanDataAssert subscriberSpanDataAssert = + OpenTelemetryAssertions.assertThat(subscriberSpanData); + subscriberSpanDataAssert + .hasName(SUBSCRIBER_SPAN_NAME) + .hasKind(SpanKind.CONSUMER) + .hasParent(publisherSpanData) + .hasEnded(); + + assertEquals(6, subscriberSpanData.getEvents().size()); + EventDataAssert startModackEventAssert = + OpenTelemetryAssertions.assertThat(subscriberSpanData.getEvents().get(0)); + startModackEventAssert.hasName(MODACK_START_EVENT); + EventDataAssert endModackEventAssert = + OpenTelemetryAssertions.assertThat(subscriberSpanData.getEvents().get(1)); + endModackEventAssert.hasName(MODACK_END_EVENT); + EventDataAssert startAckEventAssert = + OpenTelemetryAssertions.assertThat(subscriberSpanData.getEvents().get(2)); + startAckEventAssert.hasName(ACK_START_EVENT); + EventDataAssert endAckEventAssert = + OpenTelemetryAssertions.assertThat(subscriberSpanData.getEvents().get(3)); + endAckEventAssert.hasName(ACK_END_EVENT); + EventDataAssert startNackEventAssert = + OpenTelemetryAssertions.assertThat(subscriberSpanData.getEvents().get(4)); + startNackEventAssert.hasName(NACK_START_EVENT); + EventDataAssert endNackEventAssert = + OpenTelemetryAssertions.assertThat(subscriberSpanData.getEvents().get(5)); + endNackEventAssert.hasName(NACK_END_EVENT); + + List subscriberLinks = subscriberSpanData.getLinks(); + assertEquals(3, subscriberLinks.size()); + assertEquals(modackRpcSpanData.getSpanContext(), subscriberLinks.get(0).getSpanContext()); + assertEquals(ackRpcSpanData.getSpanContext(), subscriberLinks.get(1).getSpanContext()); + assertEquals(nackRpcSpanData.getSpanContext(), subscriberLinks.get(2).getSpanContext()); + + assertEquals(11, subscriberSpanData.getAttributes().size()); + AttributesAssert subscriberSpanAttributesAssert = + OpenTelemetryAssertions.assertThat(subscriberSpanData.getAttributes()); + subscriberSpanAttributesAssert + .containsEntry(SemanticAttributes.MESSAGING_SYSTEM, MESSAGING_SYSTEM_VALUE) + .containsEntry( + SemanticAttributes.MESSAGING_DESTINATION_NAME, FULL_SUBSCRIPTION_NAME.getSubscription()) + .containsEntry(PROJECT_ATTR_KEY, PROJECT_NAME) + .containsEntry(SemanticAttributes.CODE_FUNCTION, "onResponse") + .containsEntry(MESSAGE_SIZE_ATTR_KEY, messageSize) + .containsEntry(ORDERING_KEY_ATTR_KEY, ORDERING_KEY) + .containsEntry(MESSAGE_ACK_ID_ATTR_KEY, ACK_ID) + .containsEntry(MESSAGE_DELIVERY_ATTEMPT_ATTR_KEY, DELIVERY_ATTEMPT) + .containsEntry(MESSAGE_EXACTLY_ONCE_ATTR_KEY, EXACTLY_ONCE_ENABLED) + .containsEntry(MESSAGE_RESULT_ATTR_KEY, PROCESS_ACTION) + .containsEntry(SemanticAttributes.MESSAGING_MESSAGE_ID, MESSAGE_ID); + } + + @Test + public void testSubscribeConcurrencyControlSpanFailure() { + openTelemetryTesting.clearSpans(); + + PubsubMessageWrapper messageWrapper = + PubsubMessageWrapper.newBuilder( + getPubsubMessage(), FULL_SUBSCRIPTION_NAME.toString(), ACK_ID, DELIVERY_ATTEMPT) + .build(); + + Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test"); + OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer, true); + + tracer.startSubscriberSpan(messageWrapper, EXACTLY_ONCE_ENABLED); + tracer.startSubscribeConcurrencyControlSpan(messageWrapper); + + Exception e = new Exception("test-exception"); + tracer.setSubscribeConcurrencyControlSpanException(messageWrapper, e); + + List allSpans = openTelemetryTesting.getSpans(); + assertEquals(2, allSpans.size()); + SpanData concurrencyControlSpanData = allSpans.get(0); + SpanData subscriberSpanData = allSpans.get(1); + + SpanDataAssert concurrencyControlSpanDataAssert = + OpenTelemetryAssertions.assertThat(concurrencyControlSpanData); + StatusData expectedStatus = + StatusData.create( + StatusCode.ERROR, "Exception thrown during subscribe concurrency control."); + concurrencyControlSpanDataAssert + .hasName(SUBSCRIBE_CONCURRENCY_CONTROL_SPAN_NAME) + .hasParent(subscriberSpanData) + .hasStatus(expectedStatus) + .hasException(e) + .hasEnded(); + + SpanDataAssert subscriberSpanDataAssert = + OpenTelemetryAssertions.assertThat(subscriberSpanData); + subscriberSpanDataAssert + .hasName(SUBSCRIBER_SPAN_NAME) + .hasKind(SpanKind.CONSUMER) + .hasNoParent() + .hasEnded(); + } + + @Test + public void testSubscriberSpanFailure() { + openTelemetryTesting.clearSpans(); + + PubsubMessageWrapper messageWrapper = + PubsubMessageWrapper.newBuilder( + getPubsubMessage(), FULL_SUBSCRIPTION_NAME.toString(), ACK_ID, DELIVERY_ATTEMPT) + .build(); + + Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test"); + OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer, true); + + tracer.startSubscriberSpan(messageWrapper, EXACTLY_ONCE_ENABLED); + + Exception e = new Exception("test-exception"); + tracer.setSubscriberSpanException(messageWrapper, e, "Test exception"); + + List allSpans = openTelemetryTesting.getSpans(); + assertEquals(1, allSpans.size()); + SpanData subscriberSpanData = allSpans.get(0); + + StatusData expectedStatus = StatusData.create(StatusCode.ERROR, "Test exception"); + SpanDataAssert subscriberSpanDataAssert = + OpenTelemetryAssertions.assertThat(subscriberSpanData); + subscriberSpanDataAssert + .hasName(SUBSCRIBER_SPAN_NAME) + .hasKind(SpanKind.CONSUMER) + .hasNoParent() + .hasStatus(expectedStatus) + .hasException(e) + .hasEnded(); + } + + @Test + public void testSubscribeRpcSpanFailures() { + openTelemetryTesting.clearSpans(); + + PubsubMessageWrapper messageWrapper = + PubsubMessageWrapper.newBuilder( + getPubsubMessage(), FULL_SUBSCRIPTION_NAME.toString(), ACK_ID, DELIVERY_ATTEMPT) + .build(); + List messageWrappers = Arrays.asList(messageWrapper); + + Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test"); + OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer, true); + + tracer.startSubscriberSpan(messageWrapper, EXACTLY_ONCE_ENABLED); + Span subscribeModackRpcSpan = + tracer.startSubscribeRpcSpan( + FULL_SUBSCRIPTION_NAME.toString(), "modack", messageWrappers, ACK_DEADLINE, true); + Span subscribeAckRpcSpan = + tracer.startSubscribeRpcSpan( + FULL_SUBSCRIPTION_NAME.toString(), "ack", messageWrappers, 0, false); + Span subscribeNackRpcSpan = + tracer.startSubscribeRpcSpan( + FULL_SUBSCRIPTION_NAME.toString(), "nack", messageWrappers, 0, false); + + Exception e = new Exception("test-exception"); + tracer.setSubscribeRpcSpanException(subscribeModackRpcSpan, true, ACK_DEADLINE, e); + tracer.setSubscribeRpcSpanException(subscribeAckRpcSpan, false, 0, e); + tracer.setSubscribeRpcSpanException(subscribeNackRpcSpan, true, 0, e); + tracer.endSubscriberSpan(messageWrapper); + + List allSpans = openTelemetryTesting.getSpans(); + assertEquals(4, allSpans.size()); + SpanData modackSpanData = allSpans.get(0); + SpanData ackSpanData = allSpans.get(1); + SpanData nackSpanData = allSpans.get(2); + SpanData subscriberSpanData = allSpans.get(3); + + StatusData expectedModackStatus = + StatusData.create(StatusCode.ERROR, "Exception thrown on modack RPC."); + SpanDataAssert modackSpanDataAssert = OpenTelemetryAssertions.assertThat(modackSpanData); + modackSpanDataAssert + .hasName(SUBSCRIBE_MODACK_RPC_SPAN_NAME) + .hasKind(SpanKind.CLIENT) + .hasNoParent() + .hasStatus(expectedModackStatus) + .hasException(e) + .hasEnded(); + + StatusData expectedAckStatus = + StatusData.create(StatusCode.ERROR, "Exception thrown on ack RPC."); + SpanDataAssert ackSpanDataAssert = OpenTelemetryAssertions.assertThat(ackSpanData); + ackSpanDataAssert + .hasName(SUBSCRIBE_ACK_RPC_SPAN_NAME) + .hasKind(SpanKind.CLIENT) + .hasNoParent() + .hasStatus(expectedAckStatus) + .hasException(e) + .hasEnded(); + + StatusData expectedNackStatus = + StatusData.create(StatusCode.ERROR, "Exception thrown on nack RPC."); + SpanDataAssert nackSpanDataAssert = OpenTelemetryAssertions.assertThat(nackSpanData); + nackSpanDataAssert + .hasName(SUBSCRIBE_NACK_RPC_SPAN_NAME) + .hasKind(SpanKind.CLIENT) + .hasNoParent() + .hasStatus(expectedNackStatus) + .hasException(e) + .hasEnded(); + } + + private PubsubMessage getPubsubMessage() { + return PubsubMessage.newBuilder() + .setData(ByteString.copyFromUtf8("test-data")) + .setOrderingKey(ORDERING_KEY) + .build(); + } +} diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index 9785b7716..fedd17436 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -48,6 +48,12 @@ import io.grpc.StatusException; import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions; +import io.opentelemetry.sdk.testing.assertj.SpanDataAssert; +import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule; +import io.opentelemetry.sdk.trace.data.SpanData; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -74,6 +80,11 @@ public class PublisherImplTest { private static final TransportChannelProvider TEST_CHANNEL_PROVIDER = LocalChannelProvider.create("test-server"); + private static final String PUBLISHER_SPAN_NAME = TEST_TOPIC.getTopic() + " create"; + private static final String PUBLISH_FLOW_CONTROL_SPAN_NAME = "publisher flow control"; + private static final String PUBLISH_BATCHING_SPAN_NAME = "publisher batching"; + private static final String PUBLISH_RPC_SPAN_NAME = TEST_TOPIC.getTopic() + " publish"; + private FakeScheduledExecutorService fakeExecutor; private FakePublisherServiceImpl testPublisherServiceImpl; @@ -1304,6 +1315,70 @@ public void run() { publish4Completed.await(); } + @Test + public void testPublishOpenTelemetryTracing() throws Exception { + OpenTelemetryRule openTelemetryTesting = OpenTelemetryRule.create(); + OpenTelemetry openTelemetry = openTelemetryTesting.getOpenTelemetry(); + final Publisher publisher = + getTestPublisherBuilder() + .setBatchingSettings( + Publisher.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + .setElementCountThreshold(1L) + .setDelayThreshold(Duration.ofSeconds(5)) + .setFlowControlSettings( + FlowControlSettings.newBuilder() + .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block) + .setMaxOutstandingElementCount(2L) + .setMaxOutstandingRequestBytes(100L) + .build()) + .build()) + .setOpenTelemetry(openTelemetry) + .setEnableOpenTelemetryTracing(true) + .build(); + + testPublisherServiceImpl.addPublishResponse(PublishResponse.newBuilder().addMessageIds("1")); + ApiFuture publishFuture = sendTestMessage(publisher, "A"); + fakeExecutor.advanceTime(Duration.ofSeconds(5)); + assertEquals("1", publishFuture.get()); + fakeExecutor.advanceTime(Duration.ofSeconds(5)); + + List allSpans = openTelemetryTesting.getSpans(); + assertEquals(4, allSpans.size()); + SpanData flowControlSpanData = allSpans.get(0); + SpanData batchingSpanData = allSpans.get(1); + SpanData publishRpcSpanData = allSpans.get(2); + SpanData publisherSpanData = allSpans.get(3); + + SpanDataAssert flowControlSpanDataAssert = + OpenTelemetryAssertions.assertThat(flowControlSpanData); + flowControlSpanDataAssert + .hasName(PUBLISH_FLOW_CONTROL_SPAN_NAME) + .hasParent(publisherSpanData) + .hasEnded(); + + SpanDataAssert batchingSpanDataAssert = OpenTelemetryAssertions.assertThat(batchingSpanData); + batchingSpanDataAssert + .hasName(PUBLISH_BATCHING_SPAN_NAME) + .hasParent(publisherSpanData) + .hasEnded(); + + SpanDataAssert publishRpcSpanDataAssert = + OpenTelemetryAssertions.assertThat(publishRpcSpanData); + publishRpcSpanDataAssert + .hasName(PUBLISH_RPC_SPAN_NAME) + .hasKind(SpanKind.CLIENT) + .hasNoParent() + .hasEnded(); + + SpanDataAssert publishSpanDataAssert = OpenTelemetryAssertions.assertThat(publisherSpanData); + publishSpanDataAssert + .hasName(PUBLISHER_SPAN_NAME) + .hasKind(SpanKind.PRODUCER) + .hasNoParent() + .hasEnded(); + } + private Builder getTestPublisherBuilder() { return Publisher.newBuilder(TEST_TOPIC) .setExecutorProvider(FixedExecutorProvider.create(fakeExecutor)) diff --git a/pom.xml b/pom.xml index 33ba52966..2c816aabf 100644 --- a/pom.xml +++ b/pom.xml @@ -113,6 +113,12 @@ + + org.assertj + assertj-core + 3.26.0 + test + diff --git a/samples/install-without-bom/pom.xml b/samples/install-without-bom/pom.xml index 604b622d3..6196121dd 100644 --- a/samples/install-without-bom/pom.xml +++ b/samples/install-without-bom/pom.xml @@ -93,6 +93,11 @@ google-cloud-storage 2.43.1 + + com.google.cloud.opentelemetry + exporter-trace + 0.31.0 + diff --git a/samples/snapshot/pom.xml b/samples/snapshot/pom.xml index 12bf9d92c..3e5f9e9d5 100644 --- a/samples/snapshot/pom.xml +++ b/samples/snapshot/pom.xml @@ -92,6 +92,11 @@ google-cloud-storage 2.43.1 + + com.google.cloud.opentelemetry + exporter-trace + 0.31.0 + diff --git a/samples/snippets/pom.xml b/samples/snippets/pom.xml index 8720ef215..ba6fb65d9 100644 --- a/samples/snippets/pom.xml +++ b/samples/snippets/pom.xml @@ -67,6 +67,11 @@ com.google.cloud google-cloud-storage + + com.google.cloud.opentelemetry + exporter-trace + 0.31.0 + org.apache.avro avro