diff --git a/.github/.OwlBot-hermetic.yaml b/.github/.OwlBot-hermetic.yaml
index 1757987e4..8a75909c6 100644
--- a/.github/.OwlBot-hermetic.yaml
+++ b/.github/.OwlBot-hermetic.yaml
@@ -33,6 +33,7 @@ deep-preserve-regex:
- "/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDataMatcher.java"
- "/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java"
- "/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenCensusUtilTest.java"
+- "/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java"
- "/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java"
- "/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SequentialExecutorServiceTest.java"
- "/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/StatusUtilTest.java"
@@ -51,8 +52,10 @@ deep-preserve-regex:
- "/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageReceiverWithAckResponse.java"
- "/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/ModackRequestData.java"
- "/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenCensusUtil.java"
+- "/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryPubsubTracer.java"
- "/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java"
- "/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PublisherInterface.java"
+- "/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubMessageWrapper.java"
- "/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java"
- "/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StatusUtil.java"
- "/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java"
diff --git a/google-cloud-pubsub/pom.xml b/google-cloud-pubsub/pom.xml
index 2657c64af..46fe71d26 100644
--- a/google-cloud-pubsub/pom.xml
+++ b/google-cloud-pubsub/pom.xml
@@ -100,6 +100,18 @@
google-http-client
runtime
+
+ io.opentelemetry
+ opentelemetry-api
+
+
+ io.opentelemetry
+ opentelemetry-context
+
+
+ io.opentelemetry
+ opentelemetry-semconv
+
@@ -142,6 +154,21 @@
opencensus-impl
test
+
+ io.opentelemetry
+ opentelemetry-sdk-trace
+ test
+
+
+ io.opentelemetry
+ opentelemetry-sdk-testing
+ test
+
+
+ org.assertj
+ assertj-core
+ test
+
com.google.api
@@ -174,6 +201,7 @@
com.google.auth:google-auth-library-oauth2-http:jar
io.opencensus:opencensus-impl
javax.annotation:javax.annotation-api
+ org.assertj:assertj-core
diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckRequestData.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckRequestData.java
index 3b67ce219..5cab83f49 100644
--- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckRequestData.java
+++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckRequestData.java
@@ -22,10 +22,12 @@
public class AckRequestData {
private final String ackId;
private final Optional> messageFuture;
+ private PubsubMessageWrapper messageWrapper;
protected AckRequestData(Builder builder) {
this.ackId = builder.ackId;
this.messageFuture = builder.messageFuture;
+ this.messageWrapper = builder.messageWrapper;
}
public String getAckId() {
@@ -36,6 +38,17 @@ public SettableApiFuture getMessageFutureIfExists() {
return this.messageFuture.orElse(null);
}
+ /**
+ * Returns an empty PubsubMessageWrapper with OpenTelemetry tracing disabled. This allows methods
+ * that use this method to be unit tested.
+ */
+ public PubsubMessageWrapper getMessageWrapper() {
+ if (this.messageWrapper == null) {
+ return PubsubMessageWrapper.newBuilder(null, null).build();
+ }
+ return messageWrapper;
+ }
+
public AckRequestData setResponse(AckResponse ackResponse, boolean setResponseOnSuccess) {
if (this.messageFuture.isPresent() && !this.messageFuture.get().isDone()) {
switch (ackResponse) {
@@ -68,6 +81,7 @@ public static Builder newBuilder(String ackId) {
protected static final class Builder {
private final String ackId;
private Optional> messageFuture = Optional.empty();
+ private PubsubMessageWrapper messageWrapper;
protected Builder(String ackId) {
this.ackId = ackId;
@@ -78,6 +92,11 @@ public Builder setMessageFuture(SettableApiFuture messageFuture) {
return this;
}
+ public Builder setMessageWrapper(PubsubMessageWrapper messageWrapper) {
+ this.messageWrapper = messageWrapper;
+ return this;
+ }
+
public AckRequestData build() {
return new AckRequestData(this);
}
diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java
index 1810badd2..860fcbcf9 100644
--- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java
+++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java
@@ -104,6 +104,10 @@ class MessageDispatcher {
// To keep track of number of seconds the receiver takes to process messages.
private final Distribution ackLatencyDistribution;
+ private final String subscriptionName;
+ private final boolean enableOpenTelemetryTracing;
+ private OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(null, false);
+
/** Internal representation of a reply to a Pubsub message, to be sent back to the service. */
public enum AckReply {
ACK,
@@ -157,6 +161,7 @@ public void onFailure(Throwable t) {
t);
this.ackRequestData.setResponse(AckResponse.OTHER, false);
pendingNacks.add(this.ackRequestData);
+ tracer.endSubscribeProcessSpan(this.ackRequestData.getMessageWrapper(), "nack");
forget();
}
@@ -169,9 +174,11 @@ public void onSuccess(AckReply reply) {
ackLatencyDistribution.record(
Ints.saturatedCast(
(long) Math.ceil((clock.millisTime() - receivedTimeMillis) / 1000D)));
+ tracer.endSubscribeProcessSpan(this.ackRequestData.getMessageWrapper(), "ack");
break;
case NACK:
pendingNacks.add(this.ackRequestData);
+ tracer.endSubscribeProcessSpan(this.ackRequestData.getMessageWrapper(), "nack");
break;
default:
throw new IllegalArgumentException(String.format("AckReply: %s not supported", reply));
@@ -217,6 +224,12 @@ private MessageDispatcher(Builder builder) {
jobLock = new ReentrantLock();
messagesWaiter = new Waiter();
sequentialExecutor = new SequentialExecutorService.AutoExecutor(builder.executor);
+
+ subscriptionName = builder.subscriptionName;
+ enableOpenTelemetryTracing = builder.enableOpenTelemetryTracing;
+ if (builder.tracer != null) {
+ tracer = builder.tracer;
+ }
}
private boolean shouldSetMessageFuture() {
@@ -351,13 +364,15 @@ void setMessageOrderingEnabled(boolean messageOrderingEnabled) {
}
private static class OutstandingMessage {
- private final ReceivedMessage receivedMessage;
private final AckHandler ackHandler;
- private OutstandingMessage(ReceivedMessage receivedMessage, AckHandler ackHandler) {
- this.receivedMessage = receivedMessage;
+ private OutstandingMessage(AckHandler ackHandler) {
this.ackHandler = ackHandler;
}
+
+ public PubsubMessageWrapper messageWrapper() {
+ return this.ackHandler.ackRequestData.getMessageWrapper();
+ }
}
private static class ReceiptCompleteData {
@@ -390,10 +405,20 @@ void processReceivedMessages(List messages) {
if (shouldSetMessageFuture()) {
builder.setMessageFuture(SettableApiFuture.create());
}
+ PubsubMessageWrapper messageWrapper =
+ PubsubMessageWrapper.newBuilder(
+ message.getMessage(),
+ subscriptionName,
+ message.getAckId(),
+ message.getDeliveryAttempt())
+ .build();
+ builder.setMessageWrapper(messageWrapper);
+ tracer.startSubscriberSpan(messageWrapper, this.exactlyOnceDeliveryEnabled.get());
+
AckRequestData ackRequestData = builder.build();
AckHandler ackHandler =
new AckHandler(ackRequestData, message.getMessage().getSerializedSize(), totalExpiration);
- OutstandingMessage outstandingMessage = new OutstandingMessage(message, ackHandler);
+ OutstandingMessage outstandingMessage = new OutstandingMessage(ackHandler);
if (this.exactlyOnceDeliveryEnabled.get()) {
// For exactly once deliveries we don't add to outstanding batch because we first
@@ -457,30 +482,40 @@ private void processBatch(List batch) {
for (OutstandingMessage message : batch) {
// This is a blocking flow controller. We have already incremented messagesWaiter, so
// shutdown will block on processing of all these messages anyway.
+ tracer.startSubscribeConcurrencyControlSpan(message.messageWrapper());
try {
- flowController.reserve(1, message.receivedMessage.getMessage().getSerializedSize());
+ flowController.reserve(1, message.messageWrapper().getPubsubMessage().getSerializedSize());
+ tracer.endSubscribeConcurrencyControlSpan(message.messageWrapper());
} catch (FlowControlException unexpectedException) {
// This should be a blocking flow controller and never throw an exception.
+ tracer.setSubscribeConcurrencyControlSpanException(
+ message.messageWrapper(), unexpectedException);
throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
}
- processOutstandingMessage(addDeliveryInfoCount(message.receivedMessage), message.ackHandler);
+ addDeliveryInfoCount(message.messageWrapper());
+ processOutstandingMessage(message.ackHandler);
}
}
- private PubsubMessage addDeliveryInfoCount(ReceivedMessage receivedMessage) {
- PubsubMessage originalMessage = receivedMessage.getMessage();
- int deliveryAttempt = receivedMessage.getDeliveryAttempt();
+ private void addDeliveryInfoCount(PubsubMessageWrapper messageWrapper) {
+ PubsubMessage originalMessage = messageWrapper.getPubsubMessage();
+ int deliveryAttempt = messageWrapper.getDeliveryAttempt();
// Delivery Attempt will be set to 0 if DeadLetterPolicy is not set on the subscription. In
// this case, do not populate the PubsubMessage with the delivery attempt attribute.
if (deliveryAttempt > 0) {
- return PubsubMessage.newBuilder(originalMessage)
- .putAttributes("googclient_deliveryattempt", Integer.toString(deliveryAttempt))
- .build();
+ messageWrapper.setPubsubMessage(
+ PubsubMessage.newBuilder(originalMessage)
+ .putAttributes("googclient_deliveryattempt", Integer.toString(deliveryAttempt))
+ .build());
}
- return originalMessage;
}
- private void processOutstandingMessage(final PubsubMessage message, final AckHandler ackHandler) {
+ private void processOutstandingMessage(final AckHandler ackHandler) {
+ // Get the PubsubMessageWrapper and the PubsubMessage it wraps that are stored withing the
+ // AckHandler object.
+ PubsubMessageWrapper messageWrapper = ackHandler.ackRequestData.getMessageWrapper();
+ PubsubMessage message = messageWrapper.getPubsubMessage();
+
// This future is for internal bookkeeping to be sent to the StreamingSubscriberConnection
// use below in the consumers
SettableApiFuture ackReplySettableApiFuture = SettableApiFuture.create();
@@ -499,8 +534,10 @@ public void run() {
// so it was probably sent to someone else. Don't work on it.
// Don't nack it either, because we'd be nacking someone else's message.
ackHandler.forget();
+ tracer.setSubscriberSpanExpirationResult(messageWrapper);
return;
}
+ tracer.startSubscribeProcessSpan(messageWrapper);
if (shouldSetMessageFuture()) {
// This is the message future that is propagated to the user
SettableApiFuture messageFuture =
@@ -521,7 +558,9 @@ public void run() {
if (!messageOrderingEnabled.get() || message.getOrderingKey().isEmpty()) {
executor.execute(deliverMessageTask);
} else {
+ tracer.startSubscribeSchedulerSpan(messageWrapper);
sequentialExecutor.submit(message.getOrderingKey(), deliverMessageTask);
+ tracer.endSubscribeSchedulerSpan(messageWrapper);
}
}
@@ -607,8 +646,10 @@ void processOutstandingOperations() {
List ackRequestDataReceipts = new ArrayList();
pendingReceipts.drainTo(ackRequestDataReceipts);
if (!ackRequestDataReceipts.isEmpty()) {
- modackRequestData.add(
- new ModackRequestData(this.getMessageDeadlineSeconds(), ackRequestDataReceipts));
+ ModackRequestData receiptModack =
+ new ModackRequestData(this.getMessageDeadlineSeconds(), ackRequestDataReceipts);
+ receiptModack.setIsReceiptModack(true);
+ modackRequestData.add(receiptModack);
}
logger.log(Level.FINER, "Sending {0} receipts", ackRequestDataReceipts.size());
@@ -645,6 +686,10 @@ public static final class Builder {
private ScheduledExecutorService systemExecutor;
private ApiClock clock;
+ private String subscriptionName;
+ private boolean enableOpenTelemetryTracing;
+ private OpenTelemetryPubsubTracer tracer;
+
protected Builder(MessageReceiver receiver) {
this.receiver = receiver;
}
@@ -715,6 +760,21 @@ public Builder setApiClock(ApiClock clock) {
return this;
}
+ public Builder setSubscriptionName(String subscriptionName) {
+ this.subscriptionName = subscriptionName;
+ return this;
+ }
+
+ public Builder setEnableOpenTelemetryTracing(boolean enableOpenTelemetryTracing) {
+ this.enableOpenTelemetryTracing = enableOpenTelemetryTracing;
+ return this;
+ }
+
+ public Builder setTracer(OpenTelemetryPubsubTracer tracer) {
+ this.tracer = tracer;
+ return this;
+ }
+
public MessageDispatcher build() {
return new MessageDispatcher(this);
}
diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/ModackRequestData.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/ModackRequestData.java
index b4d2dae0f..54c7436af 100644
--- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/ModackRequestData.java
+++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/ModackRequestData.java
@@ -21,6 +21,7 @@
class ModackRequestData {
private final int deadlineExtensionSeconds;
private List ackRequestData;
+ private boolean isReceiptModack;
ModackRequestData(int deadlineExtensionSeconds) {
this.deadlineExtensionSeconds = deadlineExtensionSeconds;
@@ -45,8 +46,17 @@ public List getAckRequestData() {
return ackRequestData;
}
+ public boolean getIsReceiptModack() {
+ return isReceiptModack;
+ }
+
public ModackRequestData addAckRequestData(AckRequestData ackRequestData) {
this.ackRequestData.add(ackRequestData);
return this;
}
+
+ public ModackRequestData setIsReceiptModack(boolean isReceiptModack) {
+ this.isReceiptModack = isReceiptModack;
+ return this;
+ }
}
diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryPubsubTracer.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryPubsubTracer.java
new file mode 100644
index 000000000..b946f44bf
--- /dev/null
+++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryPubsubTracer.java
@@ -0,0 +1,460 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsub.v1;
+
+import com.google.pubsub.v1.PubsubMessage;
+import com.google.pubsub.v1.SubscriptionName;
+import com.google.pubsub.v1.TopicName;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.common.AttributesBuilder;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.SpanBuilder;
+import io.opentelemetry.api.trace.SpanKind;
+import io.opentelemetry.api.trace.StatusCode;
+import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
+import java.util.List;
+
+public class OpenTelemetryPubsubTracer {
+ private final Tracer tracer;
+ private boolean enabled = false;
+
+ private static final String PUBLISH_FLOW_CONTROL_SPAN_NAME = "publisher flow control";
+ private static final String PUBLISH_BATCHING_SPAN_NAME = "publisher batching";
+ private static final String SUBSCRIBE_CONCURRENCY_CONTROL_SPAN_NAME =
+ "subscriber concurrency control";
+ private static final String SUBSCRIBE_SCHEDULER_SPAN_NAME = "subscriber scheduler";
+
+ private static final String MESSAGE_SIZE_ATTR_KEY = "messaging.message.body.size";
+ private static final String ORDERING_KEY_ATTR_KEY = "messaging.gcp_pubsub.message.ordering_key";
+ private static final String MESSAGE_ACK_ID_ATTR_KEY = "messaging.gcp_pubsub.message.ack_id";
+ private static final String MESSAGE_EXACTLY_ONCE_ATTR_KEY =
+ "messaging.gcp_pubsub.message.exactly_once_delivery";
+ private static final String MESSAGE_DELIVERY_ATTEMPT_ATTR_KEY =
+ "messaging.gcp_pubsub.message.delivery_attempt";
+ private static final String ACK_DEADLINE_ATTR_KEY = "messaging.gcp_pubsub.message.ack_deadline";
+ private static final String RECEIPT_MODACK_ATTR_KEY = "messaging.gcp_pubsub.is_receipt_modack";
+ private static final String PROJECT_ATTR_KEY = "gcp.project_id";
+ private static final String PUBLISH_RPC_SPAN_SUFFIX = " publish";
+
+ private static final String MESSAGING_SYSTEM_VALUE = "gcp_pubsub";
+
+ OpenTelemetryPubsubTracer(Tracer tracer, boolean enableOpenTelemetry) {
+ this.tracer = tracer;
+ if (this.tracer != null && enableOpenTelemetry) {
+ this.enabled = true;
+ }
+ }
+
+ /** Populates attributes that are common the publisher parent span and publish RPC span. */
+ private static final AttributesBuilder createCommonSpanAttributesBuilder(
+ String destinationName, String projectName, String codeFunction, String operation) {
+ AttributesBuilder attributesBuilder =
+ Attributes.builder()
+ .put(SemanticAttributes.MESSAGING_SYSTEM, MESSAGING_SYSTEM_VALUE)
+ .put(SemanticAttributes.MESSAGING_DESTINATION_NAME, destinationName)
+ .put(PROJECT_ATTR_KEY, projectName)
+ .put(SemanticAttributes.CODE_FUNCTION, codeFunction);
+ if (operation != null) {
+ attributesBuilder.put(SemanticAttributes.MESSAGING_OPERATION, operation);
+ }
+
+ return attributesBuilder;
+ }
+
+ private Span startChildSpan(String name, Span parent) {
+ return tracer.spanBuilder(name).setParent(Context.current().with(parent)).startSpan();
+ }
+
+ /**
+ * Creates and starts the parent span with the appropriate span attributes and injects the span
+ * context into the {@link PubsubMessage} attributes.
+ */
+ void startPublisherSpan(PubsubMessageWrapper message) {
+ if (!enabled) {
+ return;
+ }
+ AttributesBuilder attributesBuilder =
+ createCommonSpanAttributesBuilder(
+ message.getTopicName(), message.getTopicProject(), "publish", "create");
+
+ attributesBuilder.put(MESSAGE_SIZE_ATTR_KEY, message.getDataSize());
+ if (!message.getOrderingKey().isEmpty()) {
+ attributesBuilder.put(ORDERING_KEY_ATTR_KEY, message.getOrderingKey());
+ }
+
+ Span publisherSpan =
+ tracer
+ .spanBuilder(message.getTopicName() + " create")
+ .setSpanKind(SpanKind.PRODUCER)
+ .setAllAttributes(attributesBuilder.build())
+ .startSpan();
+
+ message.setPublisherSpan(publisherSpan);
+ if (publisherSpan.getSpanContext().isValid()) {
+ message.injectSpanContext();
+ }
+ }
+
+ void endPublisherSpan(PubsubMessageWrapper message) {
+ if (!enabled) {
+ return;
+ }
+ message.endPublisherSpan();
+ }
+
+ void setPublisherMessageIdSpanAttribute(PubsubMessageWrapper message, String messageId) {
+ if (!enabled) {
+ return;
+ }
+ message.setPublisherMessageIdSpanAttribute(messageId);
+ }
+
+ /** Creates a span for publish-side flow control as a child of the parent publisher span. */
+ void startPublishFlowControlSpan(PubsubMessageWrapper message) {
+ if (!enabled) {
+ return;
+ }
+ Span publisherSpan = message.getPublisherSpan();
+ if (publisherSpan != null)
+ message.setPublishFlowControlSpan(
+ startChildSpan(PUBLISH_FLOW_CONTROL_SPAN_NAME, publisherSpan));
+ }
+
+ void endPublishFlowControlSpan(PubsubMessageWrapper message) {
+ if (!enabled) {
+ return;
+ }
+ message.endPublishFlowControlSpan();
+ }
+
+ void setPublishFlowControlSpanException(PubsubMessageWrapper message, Throwable t) {
+ if (!enabled) {
+ return;
+ }
+ message.setPublishFlowControlSpanException(t);
+ }
+
+ /** Creates a span for publish message batching as a child of the parent publisher span. */
+ void startPublishBatchingSpan(PubsubMessageWrapper message) {
+ if (!enabled) {
+ return;
+ }
+ Span publisherSpan = message.getPublisherSpan();
+ if (publisherSpan != null) {
+ message.setPublishBatchingSpan(startChildSpan(PUBLISH_BATCHING_SPAN_NAME, publisherSpan));
+ }
+ }
+
+ void endPublishBatchingSpan(PubsubMessageWrapper message) {
+ if (!enabled) {
+ return;
+ }
+ message.endPublishBatchingSpan();
+ }
+
+ /**
+ * Creates, starts, and returns a publish RPC span for the given message batch. Bi-directional
+ * links with the publisher parent span are created for sampled messages in the batch.
+ */
+ Span startPublishRpcSpan(String topic, List messages) {
+ if (!enabled) {
+ return null;
+ }
+ TopicName topicName = TopicName.parse(topic);
+ Attributes attributes =
+ createCommonSpanAttributesBuilder(
+ topicName.getTopic(), topicName.getProject(), "publishCall", "publish")
+ .put(SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, messages.size())
+ .build();
+ SpanBuilder publishRpcSpanBuilder =
+ tracer
+ .spanBuilder(topicName.getTopic() + PUBLISH_RPC_SPAN_SUFFIX)
+ .setSpanKind(SpanKind.CLIENT)
+ .setAllAttributes(attributes);
+ Attributes linkAttributes =
+ Attributes.builder().put(SemanticAttributes.MESSAGING_OPERATION, "publish").build();
+ for (PubsubMessageWrapper message : messages) {
+ if (message.getPublisherSpan().getSpanContext().isSampled())
+ publishRpcSpanBuilder.addLink(message.getPublisherSpan().getSpanContext(), linkAttributes);
+ }
+ Span publishRpcSpan = publishRpcSpanBuilder.startSpan();
+
+ for (PubsubMessageWrapper message : messages) {
+ if (publishRpcSpan.getSpanContext().isSampled()) {
+ message.getPublisherSpan().addLink(publishRpcSpan.getSpanContext(), linkAttributes);
+ message.addPublishStartEvent();
+ }
+ }
+ return publishRpcSpan;
+ }
+
+ /** Ends the given publish RPC span if it exists. */
+ void endPublishRpcSpan(Span publishRpcSpan) {
+ if (!enabled) {
+ return;
+ }
+ if (publishRpcSpan != null) {
+ publishRpcSpan.end();
+ }
+ }
+
+ /**
+ * Sets an error status and records an exception when an exception is thrown when publishing the
+ * message batch.
+ */
+ void setPublishRpcSpanException(Span publishRpcSpan, Throwable t) {
+ if (!enabled) {
+ return;
+ }
+ if (publishRpcSpan != null) {
+ publishRpcSpan.setStatus(StatusCode.ERROR, "Exception thrown on publish RPC.");
+ publishRpcSpan.recordException(t);
+ publishRpcSpan.end();
+ }
+ }
+
+ void startSubscriberSpan(PubsubMessageWrapper message, boolean exactlyOnceDeliveryEnabled) {
+ if (!enabled) {
+ return;
+ }
+ AttributesBuilder attributesBuilder =
+ createCommonSpanAttributesBuilder(
+ message.getSubscriptionName(), message.getSubscriptionProject(), "onResponse", null);
+
+ attributesBuilder
+ .put(SemanticAttributes.MESSAGING_MESSAGE_ID, message.getMessageId())
+ .put(MESSAGE_SIZE_ATTR_KEY, message.getDataSize())
+ .put(MESSAGE_ACK_ID_ATTR_KEY, message.getAckId())
+ .put(MESSAGE_EXACTLY_ONCE_ATTR_KEY, exactlyOnceDeliveryEnabled);
+ if (!message.getOrderingKey().isEmpty()) {
+ attributesBuilder.put(ORDERING_KEY_ATTR_KEY, message.getOrderingKey());
+ }
+ if (message.getDeliveryAttempt() > 0) {
+ attributesBuilder.put(MESSAGE_DELIVERY_ATTEMPT_ATTR_KEY, message.getDeliveryAttempt());
+ }
+ Attributes attributes = attributesBuilder.build();
+ Context publisherSpanContext = message.extractSpanContext(attributes);
+ message.setPublisherSpan(Span.fromContextOrNull(publisherSpanContext));
+ message.setSubscriberSpan(
+ tracer
+ .spanBuilder(message.getSubscriptionName() + " subscribe")
+ .setSpanKind(SpanKind.CONSUMER)
+ .setParent(publisherSpanContext)
+ .setAllAttributes(attributes)
+ .startSpan());
+ }
+
+ void endSubscriberSpan(PubsubMessageWrapper message) {
+ if (!enabled) {
+ return;
+ }
+ message.endSubscriberSpan();
+ }
+
+ void setSubscriberSpanExpirationResult(PubsubMessageWrapper message) {
+ if (!enabled) {
+ return;
+ }
+ message.setSubscriberSpanExpirationResult();
+ }
+
+ void setSubscriberSpanException(PubsubMessageWrapper message, Throwable t, String exception) {
+ if (!enabled) {
+ return;
+ }
+ message.setSubscriberSpanException(t, exception);
+ }
+
+ /** Creates a span for subscribe concurrency control as a child of the parent subscriber span. */
+ void startSubscribeConcurrencyControlSpan(PubsubMessageWrapper message) {
+ if (!enabled) {
+ return;
+ }
+ Span subscriberSpan = message.getSubscriberSpan();
+ if (subscriberSpan != null) {
+ message.setSubscribeConcurrencyControlSpan(
+ startChildSpan(SUBSCRIBE_CONCURRENCY_CONTROL_SPAN_NAME, subscriberSpan));
+ }
+ }
+
+ void endSubscribeConcurrencyControlSpan(PubsubMessageWrapper message) {
+ if (!enabled) {
+ return;
+ }
+ message.endSubscribeConcurrencyControlSpan();
+ }
+
+ void setSubscribeConcurrencyControlSpanException(PubsubMessageWrapper message, Throwable t) {
+ if (!enabled) {
+ return;
+ }
+ message.setSubscribeConcurrencyControlSpanException(t);
+ }
+
+ /**
+ * Creates a span for subscribe ordering key scheduling as a child of the parent subscriber span.
+ */
+ void startSubscribeSchedulerSpan(PubsubMessageWrapper message) {
+ if (!enabled) {
+ return;
+ }
+ Span subscriberSpan = message.getSubscriberSpan();
+ if (subscriberSpan != null) {
+ message.setSubscribeSchedulerSpan(
+ startChildSpan(SUBSCRIBE_SCHEDULER_SPAN_NAME, subscriberSpan));
+ }
+ }
+
+ void endSubscribeSchedulerSpan(PubsubMessageWrapper message) {
+ if (!enabled) {
+ return;
+ }
+ message.endSubscribeSchedulerSpan();
+ }
+
+ /** Creates a span for subscribe message processing as a child of the parent subscriber span. */
+ void startSubscribeProcessSpan(PubsubMessageWrapper message) {
+ if (!enabled) {
+ return;
+ }
+ Span subscriberSpan = message.getSubscriberSpan();
+ if (subscriberSpan != null) {
+ Span subscribeProcessSpan =
+ startChildSpan(message.getSubscriptionName() + " process", subscriberSpan);
+ subscribeProcessSpan.setAttribute(
+ SemanticAttributes.MESSAGING_SYSTEM, MESSAGING_SYSTEM_VALUE);
+ Span publisherSpan = message.getPublisherSpan();
+ if (publisherSpan != null) {
+ subscribeProcessSpan.addLink(publisherSpan.getSpanContext());
+ }
+ message.setSubscribeProcessSpan(subscribeProcessSpan);
+ }
+ }
+
+ void endSubscribeProcessSpan(PubsubMessageWrapper message, String action) {
+ if (!enabled) {
+ return;
+ }
+ message.endSubscribeProcessSpan(action);
+ }
+
+ /**
+ * Creates, starts, and returns spans for ModAck, Nack, and Ack RPC requests. Bi-directional links
+ * to parent subscribe span for sampled messages are added.
+ */
+ Span startSubscribeRpcSpan(
+ String subscription,
+ String rpcOperation,
+ List messages,
+ int ackDeadline,
+ boolean isReceiptModack) {
+ if (!enabled) {
+ return null;
+ }
+ String codeFunction = rpcOperation == "ack" ? "sendAckOperations" : "sendModAckOperations";
+ SubscriptionName subscriptionName = SubscriptionName.parse(subscription);
+ AttributesBuilder attributesBuilder =
+ createCommonSpanAttributesBuilder(
+ subscriptionName.getSubscription(),
+ subscriptionName.getProject(),
+ codeFunction,
+ rpcOperation)
+ .put(SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, messages.size());
+
+ // Ack deadline and receipt modack are specific to the modack operation
+ if (rpcOperation == "modack") {
+ attributesBuilder
+ .put(ACK_DEADLINE_ATTR_KEY, ackDeadline)
+ .put(RECEIPT_MODACK_ATTR_KEY, isReceiptModack);
+ }
+
+ SpanBuilder rpcSpanBuilder =
+ tracer
+ .spanBuilder(subscriptionName.getSubscription() + " " + rpcOperation)
+ .setSpanKind(SpanKind.CLIENT)
+ .setAllAttributes(attributesBuilder.build());
+ Attributes linkAttributes =
+ Attributes.builder().put(SemanticAttributes.MESSAGING_OPERATION, rpcOperation).build();
+ for (PubsubMessageWrapper message : messages) {
+ if (message.getSubscriberSpan().getSpanContext().isSampled()) {
+ rpcSpanBuilder.addLink(message.getSubscriberSpan().getSpanContext(), linkAttributes);
+ }
+ }
+ Span rpcSpan = rpcSpanBuilder.startSpan();
+
+ for (PubsubMessageWrapper message : messages) {
+ if (rpcSpan.getSpanContext().isSampled()) {
+ message.getSubscriberSpan().addLink(rpcSpan.getSpanContext(), linkAttributes);
+ switch (rpcOperation) {
+ case "ack":
+ message.addAckStartEvent();
+ break;
+ case "modack":
+ message.addModAckStartEvent();
+ break;
+ case "nack":
+ message.addNackStartEvent();
+ break;
+ }
+ }
+ }
+ return rpcSpan;
+ }
+
+ /** Ends the given subscribe RPC span if it exists. */
+ void endSubscribeRpcSpan(Span rpcSpan) {
+ if (!enabled) {
+ return;
+ }
+ if (rpcSpan != null) {
+ rpcSpan.end();
+ }
+ }
+
+ /**
+ * Sets an error status and records an exception when an exception is thrown when handling a
+ * subscribe-side RPC.
+ */
+ void setSubscribeRpcSpanException(Span rpcSpan, boolean isModack, int ackDeadline, Throwable t) {
+ if (!enabled) {
+ return;
+ }
+ if (rpcSpan != null) {
+ String operation = !isModack ? "ack" : (ackDeadline == 0 ? "nack" : "modack");
+ rpcSpan.setStatus(StatusCode.ERROR, "Exception thrown on " + operation + " RPC.");
+ rpcSpan.recordException(t);
+ rpcSpan.end();
+ }
+ }
+
+ /** Adds the appropriate subscribe-side RPC end event. */
+ void addEndRpcEvent(
+ PubsubMessageWrapper message, boolean rpcSampled, boolean isModack, int ackDeadline) {
+ if (!enabled || !rpcSampled) {
+ return;
+ }
+ if (!isModack) {
+ message.addAckEndEvent();
+ } else if (ackDeadline == 0) {
+ message.addNackEndEvent();
+ } else {
+ message.addModAckEndEvent();
+ }
+ }
+}
diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java
index efaba6cf1..99d0be17b 100644
--- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java
+++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java
@@ -52,6 +52,9 @@
import com.google.pubsub.v1.TopicName;
import com.google.pubsub.v1.TopicNames;
import io.grpc.CallOptions;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.Tracer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -93,6 +96,8 @@ public class Publisher implements PublisherInterface {
private static final String GZIP_COMPRESSION = "gzip";
+ private static final String OPEN_TELEMETRY_TRACER_NAME = "com.google.cloud.pubsub.v1";
+
private final String topicName;
private final BatchingSettings batchingSettings;
@@ -124,6 +129,10 @@ public class Publisher implements PublisherInterface {
private final GrpcCallContext publishContext;
private final GrpcCallContext publishContextWithCompression;
+ private final boolean enableOpenTelemetryTracing;
+ private final OpenTelemetry openTelemetry;
+ private OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(null, false);
+
/** The maximum number of messages in one request. Defined by the API. */
public static long getApiMaxRequestElementCount() {
return 1000L;
@@ -152,6 +161,15 @@ private Publisher(Builder builder) throws IOException {
this.messageTransform = builder.messageTransform;
this.enableCompression = builder.enableCompression;
this.compressionBytesThreshold = builder.compressionBytesThreshold;
+ this.enableOpenTelemetryTracing = builder.enableOpenTelemetryTracing;
+ this.openTelemetry = builder.openTelemetry;
+ if (this.openTelemetry != null && this.enableOpenTelemetryTracing) {
+ Tracer openTelemetryTracer = builder.openTelemetry.getTracer(OPEN_TELEMETRY_TRACER_NAME);
+ if (openTelemetryTracer != null) {
+ this.tracer =
+ new OpenTelemetryPubsubTracer(openTelemetryTracer, this.enableOpenTelemetryTracing);
+ }
+ }
messagesBatches = new HashMap<>();
messagesBatchLock = new ReentrantLock();
@@ -259,17 +277,23 @@ public ApiFuture publish(PubsubMessage message) {
+ "Publisher client. Please create a Publisher client with "
+ "setEnableMessageOrdering(true) in the builder.");
- final OutstandingPublish outstandingPublish =
- new OutstandingPublish(messageTransform.apply(message));
+ PubsubMessageWrapper messageWrapper =
+ PubsubMessageWrapper.newBuilder(messageTransform.apply(message), topicName).build();
+ tracer.startPublisherSpan(messageWrapper);
+
+ final OutstandingPublish outstandingPublish = new OutstandingPublish(messageWrapper);
if (flowController != null) {
+ tracer.startPublishFlowControlSpan(messageWrapper);
try {
flowController.acquire(outstandingPublish.messageSize);
+ tracer.endPublishFlowControlSpan(messageWrapper);
} catch (FlowController.FlowControlException e) {
if (!orderingKey.isEmpty()) {
sequentialExecutor.stopPublish(orderingKey);
}
outstandingPublish.publishResult.setException(e);
+ tracer.setPublishFlowControlSpanException(messageWrapper, e);
return outstandingPublish.publishResult;
}
}
@@ -277,6 +301,7 @@ public ApiFuture publish(PubsubMessage message) {
List batchesToSend;
messagesBatchLock.lock();
try {
+ tracer.startPublishBatchingSpan(messageWrapper);
if (!orderingKey.isEmpty() && sequentialExecutor.keyHasError(orderingKey)) {
outstandingPublish.publishResult.setException(
SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION);
@@ -452,12 +477,23 @@ private ApiFuture publishCall(OutstandingBatch outstandingBatch
if (enableCompression && outstandingBatch.batchSizeBytes >= compressionBytesThreshold) {
context = publishContextWithCompression;
}
+
+ int numMessagesInBatch = outstandingBatch.size();
+ List pubsubMessagesList = new ArrayList(numMessagesInBatch);
+ List messageWrappers = outstandingBatch.getMessageWrappers();
+ for (PubsubMessageWrapper messageWrapper : messageWrappers) {
+ tracer.endPublishBatchingSpan(messageWrapper);
+ pubsubMessagesList.add(messageWrapper.getPubsubMessage());
+ }
+
+ outstandingBatch.publishRpcSpan = tracer.startPublishRpcSpan(topicName, messageWrappers);
+
return publisherStub
.publishCallable()
.futureCall(
PublishRequest.newBuilder()
.setTopic(topicName)
- .addAllMessages(outstandingBatch.getMessages())
+ .addAllMessages(pubsubMessagesList)
.build(),
context);
}
@@ -541,6 +577,7 @@ private final class OutstandingBatch {
int attempt;
int batchSizeBytes;
final String orderingKey;
+ Span publishRpcSpan;
OutstandingBatch(
List outstandingPublishes, int batchSizeBytes, String orderingKey) {
@@ -555,24 +592,29 @@ int size() {
return outstandingPublishes.size();
}
- private List getMessages() {
- List results = new ArrayList<>(outstandingPublishes.size());
+ private List getMessageWrappers() {
+ List results = new ArrayList<>(outstandingPublishes.size());
for (OutstandingPublish outstandingPublish : outstandingPublishes) {
- results.add(outstandingPublish.message);
+ results.add(outstandingPublish.messageWrapper);
}
return results;
}
private void onFailure(Throwable t) {
+ tracer.setPublishRpcSpanException(publishRpcSpan, t);
+
for (OutstandingPublish outstandingPublish : outstandingPublishes) {
if (flowController != null) {
flowController.release(outstandingPublish.messageSize);
}
outstandingPublish.publishResult.setException(t);
+ tracer.endPublisherSpan(outstandingPublish.messageWrapper);
}
}
private void onSuccess(Iterable results) {
+ tracer.endPublishRpcSpan(publishRpcSpan);
+
Iterator messagesResultsIt = outstandingPublishes.iterator();
for (String messageId : results) {
OutstandingPublish nextPublish = messagesResultsIt.next();
@@ -580,19 +622,21 @@ private void onSuccess(Iterable results) {
flowController.release(nextPublish.messageSize);
}
nextPublish.publishResult.set(messageId);
+ tracer.setPublisherMessageIdSpanAttribute(nextPublish.messageWrapper, messageId);
+ tracer.endPublisherSpan(nextPublish.messageWrapper);
}
}
}
private static final class OutstandingPublish {
final SettableApiFuture publishResult;
- final PubsubMessage message;
+ final PubsubMessageWrapper messageWrapper;
final int messageSize;
- OutstandingPublish(PubsubMessage message) {
+ OutstandingPublish(PubsubMessageWrapper messageWrapper) {
this.publishResult = SettableApiFuture.create();
- this.message = message;
- this.messageSize = message.getSerializedSize();
+ this.messageWrapper = messageWrapper;
+ this.messageSize = messageWrapper.getPubsubMessage().getSerializedSize();
}
}
@@ -749,6 +793,9 @@ public PubsubMessage apply(PubsubMessage input) {
private boolean enableCompression = DEFAULT_ENABLE_COMPRESSION;
private long compressionBytesThreshold = DEFAULT_COMPRESSION_BYTES_THRESHOLD;
+ private boolean enableOpenTelemetryTracing = false;
+ private OpenTelemetry openTelemetry = null;
+
private Builder(String topic) {
this.topicName = Preconditions.checkNotNull(topic);
}
@@ -880,6 +927,26 @@ public Builder setCompressionBytesThreshold(long compressionBytesThreshold) {
return this;
}
+ /**
+ * OpenTelemetry will be enabled if setEnableOpenTelemetry is true and and instance of
+ * OpenTelemetry has been provied. Warning: traces are subject to change. The name and
+ * attributes of a span might change without notice. Only use run traces interactively. Don't
+ * use in automation. Running non-interactive traces can cause problems if the underlying trace
+ * architecture changes without notice.
+ */
+
+ /** Gives the ability to enable Open Telemetry Tracing */
+ public Builder setEnableOpenTelemetryTracing(boolean enableOpenTelemetryTracing) {
+ this.enableOpenTelemetryTracing = enableOpenTelemetryTracing;
+ return this;
+ }
+
+ /** Sets the instance of OpenTelemetry for the Publisher class. */
+ public Builder setOpenTelemetry(OpenTelemetry openTelemetry) {
+ this.openTelemetry = openTelemetry;
+ return this;
+ }
+
/** Returns the default BatchingSettings used by the client if settings are not provided. */
public static BatchingSettings getDefaultBatchingSettings() {
return DEFAULT_BATCHING_SETTINGS;
diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubMessageWrapper.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubMessageWrapper.java
new file mode 100644
index 000000000..94fd13085
--- /dev/null
+++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubMessageWrapper.java
@@ -0,0 +1,430 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsub.v1;
+
+import com.google.pubsub.v1.PubsubMessage;
+import com.google.pubsub.v1.SubscriptionName;
+import com.google.pubsub.v1.TopicName;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.StatusCode;
+import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.propagation.TextMapGetter;
+import io.opentelemetry.context.propagation.TextMapSetter;
+import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
+
+/**
+ * A wrapper class for a {@link PubsubMessage} object that handles creation and tracking of
+ * OpenTelemetry {@link Span} objects for different operations that occur during publishing.
+ */
+public class PubsubMessageWrapper {
+ private PubsubMessage message;
+
+ private final TopicName topicName;
+ private final SubscriptionName subscriptionName;
+
+ // Attributes set only for messages received from a streaming pull response.
+ private final String ackId;
+ private final int deliveryAttempt;
+
+ private static final String PUBLISH_START_EVENT = "publish start";
+ private static final String PUBLISH_END_EVENT = "publish end";
+
+ private static final String MODACK_START_EVENT = "modack start";
+ private static final String MODACK_END_EVENT = "modack end";
+ private static final String NACK_START_EVENT = "nack start";
+ private static final String NACK_END_EVENT = "nack end";
+ private static final String ACK_START_EVENT = "ack start";
+ private static final String ACK_END_EVENT = "ack end";
+
+ private static final String GOOGCLIENT_PREFIX = "googclient_";
+
+ private static final String MESSAGE_RESULT_ATTR_KEY = "messaging.gcp_pubsub.result";
+
+ private Span publisherSpan;
+ private Span publishFlowControlSpan;
+ private Span publishBatchingSpan;
+
+ private Span subscriberSpan;
+ private Span subscribeConcurrencyControlSpan;
+ private Span subscribeSchedulerSpan;
+ private Span subscribeProcessSpan;
+
+ private PubsubMessageWrapper(Builder builder) {
+ this.message = builder.message;
+ this.topicName = builder.topicName;
+ this.subscriptionName = builder.subscriptionName;
+ this.ackId = builder.ackId;
+ this.deliveryAttempt = builder.deliveryAttempt;
+ }
+
+ static Builder newBuilder(PubsubMessage message, String topicName) {
+ return new Builder(message, topicName);
+ }
+
+ static Builder newBuilder(
+ PubsubMessage message, String subscriptionName, String ackId, int deliveryAttempt) {
+ return new Builder(message, subscriptionName, ackId, deliveryAttempt);
+ }
+
+ /** Returns the PubsubMessage associated with this wrapper. */
+ PubsubMessage getPubsubMessage() {
+ return message;
+ }
+
+ void setPubsubMessage(PubsubMessage message) {
+ this.message = message;
+ }
+
+ /** Returns the TopicName for this wrapper as a string. */
+ String getTopicName() {
+ if (topicName != null) {
+ return topicName.getTopic();
+ }
+ return "";
+ }
+
+ String getTopicProject() {
+ if (topicName != null) {
+ return topicName.getProject();
+ }
+ return "";
+ }
+
+ /** Returns the SubscriptionName for this wrapper as a string. */
+ String getSubscriptionName() {
+ if (subscriptionName != null) {
+ return subscriptionName.getSubscription();
+ }
+ return "";
+ }
+
+ String getSubscriptionProject() {
+ if (subscriptionName != null) {
+ return subscriptionName.getProject();
+ }
+ return "";
+ }
+
+ String getMessageId() {
+ return message.getMessageId();
+ }
+
+ String getAckId() {
+ return ackId;
+ }
+
+ int getDataSize() {
+ return message.getData().size();
+ }
+
+ String getOrderingKey() {
+ return message.getOrderingKey();
+ }
+
+ int getDeliveryAttempt() {
+ return deliveryAttempt;
+ }
+
+ Span getPublisherSpan() {
+ return publisherSpan;
+ }
+
+ void setPublisherSpan(Span span) {
+ this.publisherSpan = span;
+ }
+
+ void setPublishFlowControlSpan(Span span) {
+ this.publishFlowControlSpan = span;
+ }
+
+ void setPublishBatchingSpan(Span span) {
+ this.publishBatchingSpan = span;
+ }
+
+ Span getSubscriberSpan() {
+ return subscriberSpan;
+ }
+
+ void setSubscriberSpan(Span span) {
+ this.subscriberSpan = span;
+ }
+
+ void setSubscribeConcurrencyControlSpan(Span span) {
+ this.subscribeConcurrencyControlSpan = span;
+ }
+
+ void setSubscribeSchedulerSpan(Span span) {
+ this.subscribeSchedulerSpan = span;
+ }
+
+ void setSubscribeProcessSpan(Span span) {
+ this.subscribeProcessSpan = span;
+ }
+
+ /** Creates a publish start event that is tied to the publish RPC span time. */
+ void addPublishStartEvent() {
+ if (publisherSpan != null) {
+ publisherSpan.addEvent(PUBLISH_START_EVENT);
+ }
+ }
+
+ /**
+ * Sets the message ID attribute in the publisher parent span. This is called after the publish
+ * RPC returns with a message ID.
+ */
+ void setPublisherMessageIdSpanAttribute(String messageId) {
+ if (publisherSpan != null) {
+ publisherSpan.setAttribute(SemanticAttributes.MESSAGING_MESSAGE_ID, messageId);
+ }
+ }
+
+ /** Ends the publisher parent span if it exists. */
+ void endPublisherSpan() {
+ if (publisherSpan != null) {
+ publisherSpan.addEvent(PUBLISH_END_EVENT);
+ publisherSpan.end();
+ }
+ }
+
+ /** Ends the publish flow control span if it exists. */
+ void endPublishFlowControlSpan() {
+ if (publishFlowControlSpan != null) {
+ publishFlowControlSpan.end();
+ }
+ }
+
+ /** Ends the publish batching span if it exists. */
+ void endPublishBatchingSpan() {
+ if (publishBatchingSpan != null) {
+ publishBatchingSpan.end();
+ }
+ }
+
+ /**
+ * Sets an error status and records an exception when an exception is thrown during flow control.
+ */
+ void setPublishFlowControlSpanException(Throwable t) {
+ if (publishFlowControlSpan != null) {
+ publishFlowControlSpan.setStatus(
+ StatusCode.ERROR, "Exception thrown during publish flow control.");
+ publishFlowControlSpan.recordException(t);
+ endAllPublishSpans();
+ }
+ }
+
+ /**
+ * Creates start and end events for ModAcks, Nacks, and Acks that are tied to the corresponding
+ * RPC span start and end times.
+ */
+ void addModAckStartEvent() {
+ if (subscriberSpan != null) {
+ subscriberSpan.addEvent(MODACK_START_EVENT);
+ }
+ }
+
+ void addModAckEndEvent() {
+ if (subscriberSpan != null) {
+ subscriberSpan.addEvent(MODACK_END_EVENT);
+ }
+ }
+
+ void addNackStartEvent() {
+ if (subscriberSpan != null) {
+ subscriberSpan.addEvent(NACK_START_EVENT);
+ }
+ }
+
+ void addNackEndEvent() {
+ if (subscriberSpan != null) {
+ subscriberSpan.addEvent(NACK_END_EVENT);
+ }
+ }
+
+ void addAckStartEvent() {
+ if (subscriberSpan != null) {
+ subscriberSpan.addEvent(ACK_START_EVENT);
+ }
+ }
+
+ void addAckEndEvent() {
+ if (subscriberSpan != null) {
+ subscriberSpan.addEvent(ACK_END_EVENT);
+ }
+ }
+
+ /** Ends the subscriber parent span if exists. */
+ void endSubscriberSpan() {
+ if (subscriberSpan != null) {
+ subscriberSpan.end();
+ }
+ }
+
+ /** Ends the subscribe concurreny control span if exists. */
+ void endSubscribeConcurrencyControlSpan() {
+ if (subscribeConcurrencyControlSpan != null) {
+ subscribeConcurrencyControlSpan.end();
+ }
+ }
+
+ /** Ends the subscribe scheduler span if exists. */
+ void endSubscribeSchedulerSpan() {
+ if (subscribeSchedulerSpan != null) {
+ subscribeSchedulerSpan.end();
+ }
+ }
+
+ /**
+ * Ends the subscribe process span if it exists, creates an event with the appropriate result, and
+ * sets the result on the parent subscriber span.
+ */
+ void endSubscribeProcessSpan(String action) {
+ if (subscribeProcessSpan != null) {
+ subscribeProcessSpan.addEvent(action + " called");
+ subscribeProcessSpan.end();
+ subscriberSpan.setAttribute(MESSAGE_RESULT_ATTR_KEY, action);
+ }
+ }
+
+ /** Sets an exception on the subscriber span during Ack/ModAck/Nack failures */
+ void setSubscriberSpanException(Throwable t, String exception) {
+ if (subscriberSpan != null) {
+ subscriberSpan.setStatus(StatusCode.ERROR, exception);
+ subscriberSpan.recordException(t);
+ endAllSubscribeSpans();
+ }
+ }
+
+ /** Sets result of the parent subscriber span to expired and ends its. */
+ void setSubscriberSpanExpirationResult() {
+ if (subscriberSpan != null) {
+ subscriberSpan.setAttribute(MESSAGE_RESULT_ATTR_KEY, "expired");
+ endSubscriberSpan();
+ }
+ }
+
+ /**
+ * Sets an error status and records an exception when an exception is thrown subscriber
+ * concurrency control.
+ */
+ void setSubscribeConcurrencyControlSpanException(Throwable t) {
+ if (subscribeConcurrencyControlSpan != null) {
+ subscribeConcurrencyControlSpan.setStatus(
+ StatusCode.ERROR, "Exception thrown during subscribe concurrency control.");
+ subscribeConcurrencyControlSpan.recordException(t);
+ endAllSubscribeSpans();
+ }
+ }
+
+ /** Ends all publisher-side spans associated with this message wrapper. */
+ private void endAllPublishSpans() {
+ endPublishFlowControlSpan();
+ endPublishBatchingSpan();
+ endPublisherSpan();
+ }
+
+ /** Ends all subscriber-side spans associated with this message wrapper. */
+ private void endAllSubscribeSpans() {
+ endSubscribeConcurrencyControlSpan();
+ endSubscribeSchedulerSpan();
+ endSubscriberSpan();
+ }
+
+ /**
+ * Injects the span context into the attributes of a Pub/Sub message for propagation to the
+ * subscriber client.
+ */
+ void injectSpanContext() {
+ TextMapSetter injectMessageAttributes =
+ new TextMapSetter() {
+ @Override
+ public void set(PubsubMessageWrapper carrier, String key, String value) {
+ PubsubMessage newMessage =
+ PubsubMessage.newBuilder(carrier.message)
+ .putAttributes(GOOGCLIENT_PREFIX + key, value)
+ .build();
+ carrier.message = newMessage;
+ }
+ };
+ W3CTraceContextPropagator.getInstance()
+ .inject(Context.current().with(publisherSpan), this, injectMessageAttributes);
+ }
+
+ /**
+ * Extracts the span context from the attributes of a Pub/Sub message and creates the parent
+ * subscriber span using that context.
+ */
+ Context extractSpanContext(Attributes attributes) {
+ TextMapGetter extractMessageAttributes =
+ new TextMapGetter() {
+ @Override
+ public String get(PubsubMessageWrapper carrier, String key) {
+ return carrier.message.getAttributesOrDefault(GOOGCLIENT_PREFIX + key, "");
+ }
+
+ public Iterable keys(PubsubMessageWrapper carrier) {
+ return carrier.message.getAttributesMap().keySet();
+ }
+ };
+ Context context =
+ W3CTraceContextPropagator.getInstance()
+ .extract(Context.current(), this, extractMessageAttributes);
+ return context;
+ }
+
+ /** Builder of {@link PubsubMessageWrapper PubsubMessageWrapper}. */
+ static final class Builder {
+ private PubsubMessage message = null;
+ private TopicName topicName = null;
+ private SubscriptionName subscriptionName = null;
+ private String ackId = null;
+ private int deliveryAttempt = 0;
+
+ public Builder(PubsubMessage message, String topicName) {
+ this.message = message;
+ if (topicName != null) {
+ this.topicName = TopicName.parse(topicName);
+ }
+ }
+
+ public Builder(
+ PubsubMessage message, String subscriptionName, String ackId, int deliveryAttempt) {
+ this.message = message;
+ if (subscriptionName != null) {
+ this.subscriptionName = SubscriptionName.parse(subscriptionName);
+ }
+ this.ackId = ackId;
+ this.deliveryAttempt = deliveryAttempt;
+ }
+
+ public Builder(
+ PubsubMessage message,
+ SubscriptionName subscriptionName,
+ String ackId,
+ int deliveryAttempt) {
+ this.message = message;
+ this.subscriptionName = subscriptionName;
+ this.ackId = ackId;
+ this.deliveryAttempt = deliveryAttempt;
+ }
+
+ public PubsubMessageWrapper build() {
+ return new PubsubMessageWrapper(this);
+ }
+ }
+}
diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java
index 7849bdb74..60da55cee 100644
--- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java
+++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java
@@ -50,6 +50,7 @@
import com.google.rpc.ErrorInfo;
import io.grpc.Status;
import io.grpc.protobuf.StatusProto;
+import io.opentelemetry.api.trace.Span;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -118,6 +119,9 @@ final class StreamingSubscriberConnection extends AbstractApiService implements
*/
private final String clientId = UUID.randomUUID().toString();
+ private final boolean enableOpenTelemetryTracing;
+ private OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(null, false);
+
private StreamingSubscriberConnection(Builder builder) {
subscription = builder.subscription;
systemExecutor = builder.systemExecutor;
@@ -151,6 +155,11 @@ private StreamingSubscriberConnection(Builder builder) {
messageDispatcherBuilder = MessageDispatcher.newBuilder(builder.receiverWithAckResponse);
}
+ enableOpenTelemetryTracing = builder.enableOpenTelemetryTracing;
+ if (builder.tracer != null) {
+ tracer = builder.tracer;
+ }
+
messageDispatcher =
messageDispatcherBuilder
.setAckProcessor(this)
@@ -165,6 +174,9 @@ private StreamingSubscriberConnection(Builder builder) {
.setExecutor(builder.executor)
.setSystemExecutor(builder.systemExecutor)
.setApiClock(builder.clock)
+ .setSubscriptionName(subscription)
+ .setEnableOpenTelemetryTracing(enableOpenTelemetryTracing)
+ .setTracer(tracer)
.build();
flowControlSettings = builder.flowControlSettings;
@@ -432,15 +444,19 @@ private void sendAckOperations(
for (List ackRequestDataInRequestList :
Lists.partition(ackRequestDataList, MAX_PER_REQUEST_CHANGES)) {
List ackIdsInRequest = new ArrayList<>();
+ List messagesInRequest = new ArrayList<>();
for (AckRequestData ackRequestData : ackRequestDataInRequestList) {
ackIdsInRequest.add(ackRequestData.getAckId());
+ messagesInRequest.add(ackRequestData.getMessageWrapper());
if (ackRequestData.hasMessageFuture()) {
// Add to our pending requests if we care about the response
pendingRequests.add(ackRequestData);
}
}
+ // Creates an Ack span to be passed to the callback
+ Span rpcSpan = tracer.startSubscribeRpcSpan(subscription, "ack", messagesInRequest, 0, false);
ApiFutureCallback callback =
- getCallback(ackRequestDataInRequestList, 0, false, currentBackoffMillis);
+ getCallback(ackRequestDataInRequestList, 0, false, currentBackoffMillis, rpcSpan);
ApiFuture ackFuture =
subscriberStub
.acknowledgeCallable()
@@ -463,19 +479,32 @@ private void sendModackOperations(
for (List ackRequestDataInRequestList :
Lists.partition(modackRequestData.getAckRequestData(), MAX_PER_REQUEST_CHANGES)) {
List ackIdsInRequest = new ArrayList<>();
+ List messagesInRequest = new ArrayList<>();
for (AckRequestData ackRequestData : ackRequestDataInRequestList) {
ackIdsInRequest.add(ackRequestData.getAckId());
+ messagesInRequest.add(ackRequestData.getMessageWrapper());
if (ackRequestData.hasMessageFuture()) {
// Add to our pending requests if we care about the response
pendingRequests.add(ackRequestData);
}
}
+ int deadlineExtensionSeconds = modackRequestData.getDeadlineExtensionSeconds();
+ String rpcOperation = deadlineExtensionSeconds == 0 ? "nack" : "modack";
+ // Creates either a ModAck span or a Nack span depending on the given ack deadline
+ Span rpcSpan =
+ tracer.startSubscribeRpcSpan(
+ subscription,
+ rpcOperation,
+ messagesInRequest,
+ deadlineExtensionSeconds,
+ modackRequestData.getIsReceiptModack());
ApiFutureCallback callback =
getCallback(
modackRequestData.getAckRequestData(),
- modackRequestData.getDeadlineExtensionSeconds(),
+ deadlineExtensionSeconds,
true,
- currentBackoffMillis);
+ currentBackoffMillis,
+ rpcSpan);
ApiFuture modackFuture =
subscriberStub
.modifyAckDeadlineCallable()
@@ -517,22 +546,36 @@ private ApiFutureCallback getCallback(
List ackRequestDataList,
int deadlineExtensionSeconds,
boolean isModack,
- long currentBackoffMillis) {
+ long currentBackoffMillis,
+ Span rpcSpan) {
// This callback handles retries, and sets message futures
// Check if ack or nack
boolean setResponseOnSuccess = (!isModack || (deadlineExtensionSeconds == 0)) ? true : false;
+ boolean rpcSpanSampled = rpcSpan == null ? false : rpcSpan.getSpanContext().isSampled();
+
return new ApiFutureCallback() {
@Override
public void onSuccess(Empty empty) {
ackOperationsWaiter.incrementPendingCount(-1);
+
+ tracer.endSubscribeRpcSpan(rpcSpan);
+
for (AckRequestData ackRequestData : ackRequestDataList) {
// This will check if a response is needed, and if it has already been set
ackRequestData.setResponse(AckResponse.SUCCESSFUL, setResponseOnSuccess);
messageDispatcher.notifyAckSuccess(ackRequestData);
// Remove from our pending operations
pendingRequests.remove(ackRequestData);
+ tracer.addEndRpcEvent(
+ ackRequestData.getMessageWrapper(),
+ rpcSpanSampled,
+ isModack,
+ deadlineExtensionSeconds);
+ if (!isModack || deadlineExtensionSeconds == 0) {
+ tracer.endSubscriberSpan(ackRequestData.getMessageWrapper());
+ }
}
}
@@ -544,10 +587,23 @@ public void onFailure(Throwable t) {
Level level = isAlive() ? Level.WARNING : Level.FINER;
logger.log(level, "failed to send operations", t);
+ tracer.setSubscribeRpcSpanException(rpcSpan, isModack, deadlineExtensionSeconds, t);
+
if (!getExactlyOnceDeliveryEnabled()) {
+ if (enableOpenTelemetryTracing) {
+ for (AckRequestData ackRequestData : ackRequestDataList) {
+ tracer.addEndRpcEvent(
+ ackRequestData.getMessageWrapper(),
+ rpcSpanSampled,
+ isModack,
+ deadlineExtensionSeconds);
+ if (!isModack || deadlineExtensionSeconds == 0) {
+ tracer.endSubscriberSpan(ackRequestData.getMessageWrapper());
+ }
+ }
+ }
return;
}
-
List ackRequestDataArrayRetryList = new ArrayList<>();
try {
Map metadataMap = getMetadataMapFromThrowable(t);
@@ -569,14 +625,37 @@ public void onFailure(Throwable t) {
errorMessage);
ackRequestData.setResponse(AckResponse.INVALID, setResponseOnSuccess);
messageDispatcher.notifyAckFailed(ackRequestData);
+ tracer.addEndRpcEvent(
+ ackRequestData.getMessageWrapper(),
+ rpcSpanSampled,
+ isModack,
+ deadlineExtensionSeconds);
+ tracer.setSubscriberSpanException(
+ ackRequestData.getMessageWrapper(), t, "Invalid ack ID");
} else {
logger.log(Level.INFO, "Unknown error message, will not resend", errorMessage);
ackRequestData.setResponse(AckResponse.OTHER, setResponseOnSuccess);
messageDispatcher.notifyAckFailed(ackRequestData);
+ tracer.addEndRpcEvent(
+ ackRequestData.getMessageWrapper(),
+ rpcSpanSampled,
+ isModack,
+ deadlineExtensionSeconds);
+ tracer.setSubscriberSpanException(
+ ackRequestData.getMessageWrapper(), t, "Unknown error message");
+ ackRequestData
+ .getMessageWrapper()
+ .setSubscriberSpanException(t, "Unknown error message");
}
} else {
ackRequestData.setResponse(AckResponse.SUCCESSFUL, setResponseOnSuccess);
messageDispatcher.notifyAckSuccess(ackRequestData);
+ tracer.endSubscriberSpan(ackRequestData.getMessageWrapper());
+ tracer.addEndRpcEvent(
+ ackRequestData.getMessageWrapper(),
+ rpcSpanSampled,
+ isModack,
+ deadlineExtensionSeconds);
}
// Remove from our pending
pendingRequests.remove(ackRequestData);
@@ -637,6 +716,9 @@ public static final class Builder {
private ScheduledExecutorService systemExecutor;
private ApiClock clock;
+ private boolean enableOpenTelemetryTracing;
+ private OpenTelemetryPubsubTracer tracer;
+
protected Builder(MessageReceiver receiver) {
this.receiver = receiver;
}
@@ -727,6 +809,16 @@ public Builder setClock(ApiClock clock) {
return this;
}
+ public Builder setEnableOpenTelemetryTracing(boolean enableOpenTelemetryTracing) {
+ this.enableOpenTelemetryTracing = enableOpenTelemetryTracing;
+ return this;
+ }
+
+ public Builder setTracer(OpenTelemetryPubsubTracer tracer) {
+ this.tracer = tracer;
+ return this;
+ }
+
public StreamingSubscriberConnection build() {
return new StreamingSubscriberConnection(this);
}
diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java
index 1723c72b1..e9926fa58 100644
--- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java
+++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java
@@ -43,6 +43,8 @@
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.trace.Tracer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -117,6 +119,8 @@ public class Subscriber extends AbstractApiService implements SubscriberInterfac
private static final Logger logger = Logger.getLogger(Subscriber.class.getName());
+ private static final String OPEN_TELEMETRY_TRACER_NAME = "com.google.cloud.pubsub.v1";
+
private final String subscriptionName;
private final FlowControlSettings flowControlSettings;
private final boolean useLegacyFlowControl;
@@ -145,6 +149,10 @@ public class Subscriber extends AbstractApiService implements SubscriberInterfac
private final ApiClock clock;
private final List backgroundResources = new ArrayList<>();
+ private final boolean enableOpenTelemetryTracing;
+ private final OpenTelemetry openTelemetry;
+ private OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(null, false);
+
private Subscriber(Builder builder) {
receiver = builder.receiver;
receiverWithAckResponse = builder.receiverWithAckResponse;
@@ -199,6 +207,16 @@ private Subscriber(Builder builder) {
throw new IllegalStateException(e);
}
+ this.enableOpenTelemetryTracing = builder.enableOpenTelemetryTracing;
+ this.openTelemetry = builder.openTelemetry;
+ if (this.openTelemetry != null && this.enableOpenTelemetryTracing) {
+ Tracer openTelemetryTracer = builder.openTelemetry.getTracer(OPEN_TELEMETRY_TRACER_NAME);
+ if (openTelemetryTracer != null) {
+ this.tracer =
+ new OpenTelemetryPubsubTracer(openTelemetryTracer, this.enableOpenTelemetryTracing);
+ }
+ }
+
streamingSubscriberConnections = new ArrayList(numPullers);
// We regularly look up the distribution for a good subscription deadline.
@@ -386,6 +404,8 @@ private void startStreamingConnections() {
.setExecutor(executor)
.setSystemExecutor(alarmsExecutor)
.setClock(clock)
+ .setEnableOpenTelemetryTracing(enableOpenTelemetryTracing)
+ .setTracer(tracer)
.build();
streamingSubscriberConnections.add(streamingSubscriberConnection);
@@ -495,6 +515,9 @@ public static final class Builder {
private String endpoint = null;
private String universeDomain = null;
+ private boolean enableOpenTelemetryTracing = false;
+ private OpenTelemetry openTelemetry = null;
+
Builder(String subscription, MessageReceiver receiver) {
this.subscription = subscription;
this.receiver = receiver;
@@ -684,6 +707,26 @@ Builder setClock(ApiClock clock) {
return this;
}
+ /**
+ * OpenTelemetry will be enabled if setEnableOpenTelemetry is true and and instance of
+ * OpenTelemetry has been provied. Warning: traces are subject to change. The name and
+ * attributes of a span might change without notice. Only use run traces interactively. Don't
+ * use in automation. Running non-interactive traces can cause problems if the underlying trace
+ * architecture changes without notice.
+ */
+
+ /** Gives the ability to enable Open Telemetry Tracing */
+ public Builder setEnableOpenTelemetryTracing(boolean enableOpenTelemetryTracing) {
+ this.enableOpenTelemetryTracing = enableOpenTelemetryTracing;
+ return this;
+ }
+
+ /** Sets the instance of OpenTelemetry for the Publisher class. */
+ public Builder setOpenTelemetry(OpenTelemetry openTelemetry) {
+ this.openTelemetry = openTelemetry;
+ return this;
+ }
+
/** Returns the default FlowControlSettings used by the client if settings are not provided. */
public static FlowControlSettings getDefaultFlowControlSettings() {
return DEFAULT_FLOW_CONTROL_SETTINGS;
diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java
new file mode 100644
index 000000000..b4433f41e
--- /dev/null
+++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java
@@ -0,0 +1,669 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsub.v1;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.PubsubMessage;
+import com.google.pubsub.v1.SubscriptionName;
+import com.google.pubsub.v1.TopicName;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.SpanKind;
+import io.opentelemetry.api.trace.StatusCode;
+import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.sdk.testing.assertj.AttributesAssert;
+import io.opentelemetry.sdk.testing.assertj.EventDataAssert;
+import io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions;
+import io.opentelemetry.sdk.testing.assertj.SpanDataAssert;
+import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
+import io.opentelemetry.sdk.trace.data.LinkData;
+import io.opentelemetry.sdk.trace.data.SpanData;
+import io.opentelemetry.sdk.trace.data.StatusData;
+import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
+import java.util.Arrays;
+import java.util.List;
+import org.junit.Test;
+
+public class OpenTelemetryTest {
+ private static final TopicName FULL_TOPIC_NAME =
+ TopicName.parse("projects/test-project/topics/test-topic");
+ private static final SubscriptionName FULL_SUBSCRIPTION_NAME =
+ SubscriptionName.parse("projects/test-project/subscriptions/test-sub");
+ private static final String PROJECT_NAME = "test-project";
+ private static final String ORDERING_KEY = "abc";
+ private static final String MESSAGE_ID = "m0";
+ private static final String ACK_ID = "def";
+ private static final int DELIVERY_ATTEMPT = 1;
+ private static final int ACK_DEADLINE = 10;
+ private static final boolean EXACTLY_ONCE_ENABLED = true;
+
+ private static final String PUBLISHER_SPAN_NAME = FULL_TOPIC_NAME.getTopic() + " create";
+ private static final String PUBLISH_FLOW_CONTROL_SPAN_NAME = "publisher flow control";
+ private static final String PUBLISH_BATCHING_SPAN_NAME = "publisher batching";
+ private static final String PUBLISH_RPC_SPAN_NAME = FULL_TOPIC_NAME.getTopic() + " publish";
+ private static final String PUBLISH_START_EVENT = "publish start";
+ private static final String PUBLISH_END_EVENT = "publish end";
+
+ private static final String SUBSCRIBER_SPAN_NAME =
+ FULL_SUBSCRIPTION_NAME.getSubscription() + " subscribe";
+ private static final String SUBSCRIBE_CONCURRENCY_CONTROL_SPAN_NAME =
+ "subscriber concurrency control";
+ private static final String SUBSCRIBE_SCHEDULER_SPAN_NAME = "subscriber scheduler";
+ private static final String SUBSCRIBE_PROCESS_SPAN_NAME =
+ FULL_SUBSCRIPTION_NAME.getSubscription() + " process";
+ private static final String SUBSCRIBE_MODACK_RPC_SPAN_NAME =
+ FULL_SUBSCRIPTION_NAME.getSubscription() + " modack";
+ private static final String SUBSCRIBE_ACK_RPC_SPAN_NAME =
+ FULL_SUBSCRIPTION_NAME.getSubscription() + " ack";
+ private static final String SUBSCRIBE_NACK_RPC_SPAN_NAME =
+ FULL_SUBSCRIPTION_NAME.getSubscription() + " nack";
+
+ private static final String PROCESS_ACTION = "ack";
+ private static final String MODACK_START_EVENT = "modack start";
+ private static final String MODACK_END_EVENT = "modack end";
+ private static final String NACK_START_EVENT = "nack start";
+ private static final String NACK_END_EVENT = "nack end";
+ private static final String ACK_START_EVENT = "ack start";
+ private static final String ACK_END_EVENT = "ack end";
+
+ private static final String MESSAGING_SYSTEM_VALUE = "gcp_pubsub";
+ private static final String PROJECT_ATTR_KEY = "gcp.project_id";
+ private static final String MESSAGE_SIZE_ATTR_KEY = "messaging.message.body.size";
+ private static final String ORDERING_KEY_ATTR_KEY = "messaging.gcp_pubsub.message.ordering_key";
+ private static final String ACK_DEADLINE_ATTR_KEY = "messaging.gcp_pubsub.message.ack_deadline";
+ private static final String RECEIPT_MODACK_ATTR_KEY = "messaging.gcp_pubsub.is_receipt_modack";
+ private static final String MESSAGE_ACK_ID_ATTR_KEY = "messaging.gcp_pubsub.message.ack_id";
+ private static final String MESSAGE_EXACTLY_ONCE_ATTR_KEY =
+ "messaging.gcp_pubsub.message.exactly_once_delivery";
+ private static final String MESSAGE_RESULT_ATTR_KEY = "messaging.gcp_pubsub.result";
+ private static final String MESSAGE_DELIVERY_ATTEMPT_ATTR_KEY =
+ "messaging.gcp_pubsub.message.delivery_attempt";
+
+ private static final String TRACEPARENT_ATTRIBUTE = "googclient_traceparent";
+
+ private static final OpenTelemetryRule openTelemetryTesting = OpenTelemetryRule.create();
+
+ @Test
+ public void testPublishSpansSuccess() {
+ openTelemetryTesting.clearSpans();
+
+ PubsubMessageWrapper messageWrapper =
+ PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME.toString()).build();
+ List messageWrappers = Arrays.asList(messageWrapper);
+
+ long messageSize = messageWrapper.getPubsubMessage().getData().size();
+ Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test");
+ OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer, true);
+
+ // Call all span start/end methods in the expected order
+ tracer.startPublisherSpan(messageWrapper);
+ tracer.startPublishFlowControlSpan(messageWrapper);
+ tracer.endPublishFlowControlSpan(messageWrapper);
+ tracer.startPublishBatchingSpan(messageWrapper);
+ tracer.endPublishBatchingSpan(messageWrapper);
+ Span publishRpcSpan = tracer.startPublishRpcSpan(FULL_TOPIC_NAME.toString(), messageWrappers);
+ tracer.endPublishRpcSpan(publishRpcSpan);
+ tracer.setPublisherMessageIdSpanAttribute(messageWrapper, MESSAGE_ID);
+ tracer.endPublisherSpan(messageWrapper);
+
+ List allSpans = openTelemetryTesting.getSpans();
+ assertEquals(4, allSpans.size());
+ SpanData flowControlSpanData = allSpans.get(0);
+ SpanData batchingSpanData = allSpans.get(1);
+ SpanData publishRpcSpanData = allSpans.get(2);
+ SpanData publisherSpanData = allSpans.get(3);
+
+ SpanDataAssert flowControlSpanDataAssert =
+ OpenTelemetryAssertions.assertThat(flowControlSpanData);
+ flowControlSpanDataAssert
+ .hasName(PUBLISH_FLOW_CONTROL_SPAN_NAME)
+ .hasParent(publisherSpanData)
+ .hasEnded();
+
+ SpanDataAssert batchingSpanDataAssert = OpenTelemetryAssertions.assertThat(batchingSpanData);
+ batchingSpanDataAssert
+ .hasName(PUBLISH_BATCHING_SPAN_NAME)
+ .hasParent(publisherSpanData)
+ .hasEnded();
+
+ // Check span data, links, and attributes for the publish RPC span
+ SpanDataAssert publishRpcSpanDataAssert =
+ OpenTelemetryAssertions.assertThat(publishRpcSpanData);
+ publishRpcSpanDataAssert
+ .hasName(PUBLISH_RPC_SPAN_NAME)
+ .hasKind(SpanKind.CLIENT)
+ .hasNoParent()
+ .hasEnded();
+
+ List publishRpcLinks = publishRpcSpanData.getLinks();
+ assertEquals(messageWrappers.size(), publishRpcLinks.size());
+ assertEquals(publisherSpanData.getSpanContext(), publishRpcLinks.get(0).getSpanContext());
+
+ assertEquals(6, publishRpcSpanData.getAttributes().size());
+ AttributesAssert publishRpcSpanAttributesAssert =
+ OpenTelemetryAssertions.assertThat(publishRpcSpanData.getAttributes());
+ publishRpcSpanAttributesAssert
+ .containsEntry(SemanticAttributes.MESSAGING_SYSTEM, MESSAGING_SYSTEM_VALUE)
+ .containsEntry(SemanticAttributes.MESSAGING_DESTINATION_NAME, FULL_TOPIC_NAME.getTopic())
+ .containsEntry(PROJECT_ATTR_KEY, FULL_TOPIC_NAME.getProject())
+ .containsEntry(SemanticAttributes.CODE_FUNCTION, "publishCall")
+ .containsEntry(SemanticAttributes.MESSAGING_OPERATION, "publish")
+ .containsEntry(SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, messageWrappers.size());
+
+ // Check span data, events, links, and attributes for the publisher create span
+ SpanDataAssert publisherSpanDataAssert = OpenTelemetryAssertions.assertThat(publisherSpanData);
+ publisherSpanDataAssert
+ .hasName(PUBLISHER_SPAN_NAME)
+ .hasKind(SpanKind.PRODUCER)
+ .hasNoParent()
+ .hasEnded();
+
+ assertEquals(2, publisherSpanData.getEvents().size());
+ EventDataAssert startEventAssert =
+ OpenTelemetryAssertions.assertThat(publisherSpanData.getEvents().get(0));
+ startEventAssert.hasName(PUBLISH_START_EVENT);
+ EventDataAssert endEventAssert =
+ OpenTelemetryAssertions.assertThat(publisherSpanData.getEvents().get(1));
+ endEventAssert.hasName(PUBLISH_END_EVENT);
+
+ List publisherLinks = publisherSpanData.getLinks();
+ assertEquals(1, publisherLinks.size());
+ assertEquals(publishRpcSpanData.getSpanContext(), publisherLinks.get(0).getSpanContext());
+
+ assertEquals(8, publisherSpanData.getAttributes().size());
+ AttributesAssert publisherSpanAttributesAssert =
+ OpenTelemetryAssertions.assertThat(publisherSpanData.getAttributes());
+ publisherSpanAttributesAssert
+ .containsEntry(SemanticAttributes.MESSAGING_SYSTEM, MESSAGING_SYSTEM_VALUE)
+ .containsEntry(SemanticAttributes.MESSAGING_DESTINATION_NAME, FULL_TOPIC_NAME.getTopic())
+ .containsEntry(PROJECT_ATTR_KEY, PROJECT_NAME)
+ .containsEntry(SemanticAttributes.CODE_FUNCTION, "publish")
+ .containsEntry(SemanticAttributes.MESSAGING_OPERATION, "create")
+ .containsEntry(ORDERING_KEY_ATTR_KEY, ORDERING_KEY)
+ .containsEntry(MESSAGE_SIZE_ATTR_KEY, messageSize)
+ .containsEntry(SemanticAttributes.MESSAGING_MESSAGE_ID, MESSAGE_ID);
+
+ // Check that the message has the attribute containing the trace context.
+ PubsubMessage message = messageWrapper.getPubsubMessage();
+ assertEquals(1, message.getAttributesMap().size());
+ assertTrue(message.containsAttributes(TRACEPARENT_ATTRIBUTE));
+ assertTrue(
+ message
+ .getAttributesOrDefault(TRACEPARENT_ATTRIBUTE, "")
+ .contains(publisherSpanData.getTraceId()));
+ assertTrue(
+ message
+ .getAttributesOrDefault(TRACEPARENT_ATTRIBUTE, "")
+ .contains(publisherSpanData.getSpanId()));
+ }
+
+ @Test
+ public void testPublishFlowControlSpanFailure() {
+ openTelemetryTesting.clearSpans();
+
+ PubsubMessageWrapper messageWrapper =
+ PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME.toString()).build();
+
+ Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test");
+ OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer, true);
+
+ tracer.startPublisherSpan(messageWrapper);
+ tracer.startPublishFlowControlSpan(messageWrapper);
+
+ Exception e = new Exception("test-exception");
+ tracer.setPublishFlowControlSpanException(messageWrapper, e);
+
+ List allSpans = openTelemetryTesting.getSpans();
+ assertEquals(2, allSpans.size());
+ SpanData flowControlSpanData = allSpans.get(0);
+ SpanData publisherSpanData = allSpans.get(1);
+
+ SpanDataAssert flowControlSpanDataAssert =
+ OpenTelemetryAssertions.assertThat(flowControlSpanData);
+ StatusData expectedStatus =
+ StatusData.create(StatusCode.ERROR, "Exception thrown during publish flow control.");
+ flowControlSpanDataAssert
+ .hasName(PUBLISH_FLOW_CONTROL_SPAN_NAME)
+ .hasParent(publisherSpanData)
+ .hasStatus(expectedStatus)
+ .hasException(e)
+ .hasEnded();
+
+ SpanDataAssert publisherSpanDataAssert = OpenTelemetryAssertions.assertThat(publisherSpanData);
+ publisherSpanDataAssert
+ .hasName(PUBLISHER_SPAN_NAME)
+ .hasKind(SpanKind.PRODUCER)
+ .hasNoParent()
+ .hasEnded();
+ }
+
+ @Test
+ public void testPublishRpcSpanFailure() {
+ openTelemetryTesting.clearSpans();
+
+ PubsubMessageWrapper messageWrapper =
+ PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME.toString()).build();
+
+ List messageWrappers = Arrays.asList(messageWrapper);
+ Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test");
+ OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer, true);
+
+ tracer.startPublisherSpan(messageWrapper);
+ Span publishRpcSpan = tracer.startPublishRpcSpan(FULL_TOPIC_NAME.toString(), messageWrappers);
+
+ Exception e = new Exception("test-exception");
+ tracer.setPublishRpcSpanException(publishRpcSpan, e);
+ tracer.endPublisherSpan(messageWrapper);
+
+ List allSpans = openTelemetryTesting.getSpans();
+ assertEquals(2, allSpans.size());
+ SpanData rpcSpanData = allSpans.get(0);
+ SpanData publisherSpanData = allSpans.get(1);
+
+ SpanDataAssert rpcSpanDataAssert = OpenTelemetryAssertions.assertThat(rpcSpanData);
+ StatusData expectedStatus =
+ StatusData.create(StatusCode.ERROR, "Exception thrown on publish RPC.");
+ rpcSpanDataAssert
+ .hasName(PUBLISH_RPC_SPAN_NAME)
+ .hasKind(SpanKind.CLIENT)
+ .hasStatus(expectedStatus)
+ .hasException(e)
+ .hasEnded();
+
+ SpanDataAssert publisherSpanDataAssert = OpenTelemetryAssertions.assertThat(publisherSpanData);
+ publisherSpanDataAssert
+ .hasName(PUBLISHER_SPAN_NAME)
+ .hasKind(SpanKind.PRODUCER)
+ .hasNoParent()
+ .hasEnded();
+ }
+
+ @Test
+ public void testSubscribeSpansSuccess() {
+ openTelemetryTesting.clearSpans();
+
+ Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test");
+ OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer, true);
+
+ PubsubMessageWrapper publishMessageWrapper =
+ PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME.toString()).build();
+ // Initialize the Publisher span to inject the context in the message
+ tracer.startPublisherSpan(publishMessageWrapper);
+ tracer.endPublisherSpan(publishMessageWrapper);
+
+ PubsubMessage publishedMessage =
+ publishMessageWrapper.getPubsubMessage().toBuilder().setMessageId(MESSAGE_ID).build();
+ PubsubMessageWrapper subscribeMessageWrapper =
+ PubsubMessageWrapper.newBuilder(
+ publishedMessage, FULL_SUBSCRIPTION_NAME.toString(), ACK_ID, 1)
+ .build();
+ List subscribeMessageWrappers = Arrays.asList(subscribeMessageWrapper);
+
+ long messageSize = subscribeMessageWrapper.getPubsubMessage().getData().size();
+
+ // Call all span start/end methods in the expected order
+ tracer.startSubscriberSpan(subscribeMessageWrapper, EXACTLY_ONCE_ENABLED);
+ tracer.startSubscribeConcurrencyControlSpan(subscribeMessageWrapper);
+ tracer.endSubscribeConcurrencyControlSpan(subscribeMessageWrapper);
+ tracer.startSubscribeSchedulerSpan(subscribeMessageWrapper);
+ tracer.endSubscribeSchedulerSpan(subscribeMessageWrapper);
+ tracer.startSubscribeProcessSpan(subscribeMessageWrapper);
+ tracer.endSubscribeProcessSpan(subscribeMessageWrapper, PROCESS_ACTION);
+ Span subscribeModackRpcSpan =
+ tracer.startSubscribeRpcSpan(
+ FULL_SUBSCRIPTION_NAME.toString(),
+ "modack",
+ subscribeMessageWrappers,
+ ACK_DEADLINE,
+ true);
+ tracer.endSubscribeRpcSpan(subscribeModackRpcSpan);
+ tracer.addEndRpcEvent(subscribeMessageWrapper, true, true, ACK_DEADLINE);
+ Span subscribeAckRpcSpan =
+ tracer.startSubscribeRpcSpan(
+ FULL_SUBSCRIPTION_NAME.toString(), "ack", subscribeMessageWrappers, 0, false);
+ tracer.endSubscribeRpcSpan(subscribeAckRpcSpan);
+ tracer.addEndRpcEvent(subscribeMessageWrapper, true, false, 0);
+ Span subscribeNackRpcSpan =
+ tracer.startSubscribeRpcSpan(
+ FULL_SUBSCRIPTION_NAME.toString(), "nack", subscribeMessageWrappers, 0, false);
+ tracer.endSubscribeRpcSpan(subscribeNackRpcSpan);
+ tracer.addEndRpcEvent(subscribeMessageWrapper, true, true, 0);
+ tracer.endSubscriberSpan(subscribeMessageWrapper);
+
+ List allSpans = openTelemetryTesting.getSpans();
+ assertEquals(8, allSpans.size());
+
+ SpanData publisherSpanData = allSpans.get(0);
+ SpanData concurrencyControlSpanData = allSpans.get(1);
+ SpanData schedulerSpanData = allSpans.get(2);
+ SpanData processSpanData = allSpans.get(3);
+ SpanData modackRpcSpanData = allSpans.get(4);
+ SpanData ackRpcSpanData = allSpans.get(5);
+ SpanData nackRpcSpanData = allSpans.get(6);
+ SpanData subscriberSpanData = allSpans.get(7);
+
+ SpanDataAssert concurrencyControlSpanDataAssert =
+ OpenTelemetryAssertions.assertThat(concurrencyControlSpanData);
+ concurrencyControlSpanDataAssert
+ .hasName(SUBSCRIBE_CONCURRENCY_CONTROL_SPAN_NAME)
+ .hasParent(subscriberSpanData)
+ .hasEnded();
+
+ SpanDataAssert schedulerSpanDataAssert = OpenTelemetryAssertions.assertThat(schedulerSpanData);
+ schedulerSpanDataAssert
+ .hasName(SUBSCRIBE_SCHEDULER_SPAN_NAME)
+ .hasParent(subscriberSpanData)
+ .hasEnded();
+
+ SpanDataAssert processSpanDataAssert = OpenTelemetryAssertions.assertThat(processSpanData);
+ processSpanDataAssert
+ .hasName(SUBSCRIBE_PROCESS_SPAN_NAME)
+ .hasParent(subscriberSpanData)
+ .hasEnded();
+
+ assertEquals(1, processSpanData.getEvents().size());
+ EventDataAssert actionCalledEventAssert =
+ OpenTelemetryAssertions.assertThat(processSpanData.getEvents().get(0));
+ actionCalledEventAssert.hasName(PROCESS_ACTION + " called");
+
+ // Check span data, links, and attributes for the modack RPC span
+ SpanDataAssert modackRpcSpanDataAssert = OpenTelemetryAssertions.assertThat(modackRpcSpanData);
+ modackRpcSpanDataAssert
+ .hasName(SUBSCRIBE_MODACK_RPC_SPAN_NAME)
+ .hasKind(SpanKind.CLIENT)
+ .hasNoParent()
+ .hasEnded();
+
+ List modackRpcLinks = modackRpcSpanData.getLinks();
+ assertEquals(subscribeMessageWrappers.size(), modackRpcLinks.size());
+ assertEquals(subscriberSpanData.getSpanContext(), modackRpcLinks.get(0).getSpanContext());
+
+ assertEquals(8, modackRpcSpanData.getAttributes().size());
+ AttributesAssert modackRpcSpanAttributesAssert =
+ OpenTelemetryAssertions.assertThat(modackRpcSpanData.getAttributes());
+ modackRpcSpanAttributesAssert
+ .containsEntry(SemanticAttributes.MESSAGING_SYSTEM, MESSAGING_SYSTEM_VALUE)
+ .containsEntry(
+ SemanticAttributes.MESSAGING_DESTINATION_NAME, FULL_SUBSCRIPTION_NAME.getSubscription())
+ .containsEntry(PROJECT_ATTR_KEY, FULL_TOPIC_NAME.getProject())
+ .containsEntry(SemanticAttributes.CODE_FUNCTION, "sendModAckOperations")
+ .containsEntry(SemanticAttributes.MESSAGING_OPERATION, "modack")
+ .containsEntry(
+ SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, subscribeMessageWrappers.size())
+ .containsEntry(ACK_DEADLINE_ATTR_KEY, 10)
+ .containsEntry(RECEIPT_MODACK_ATTR_KEY, true);
+
+ // Check span data, links, and attributes for the ack RPC span
+ SpanDataAssert ackRpcSpanDataAssert = OpenTelemetryAssertions.assertThat(ackRpcSpanData);
+ ackRpcSpanDataAssert
+ .hasName(SUBSCRIBE_ACK_RPC_SPAN_NAME)
+ .hasKind(SpanKind.CLIENT)
+ .hasNoParent()
+ .hasEnded();
+
+ List ackRpcLinks = ackRpcSpanData.getLinks();
+ assertEquals(subscribeMessageWrappers.size(), ackRpcLinks.size());
+ assertEquals(subscriberSpanData.getSpanContext(), ackRpcLinks.get(0).getSpanContext());
+
+ assertEquals(6, ackRpcSpanData.getAttributes().size());
+ AttributesAssert ackRpcSpanAttributesAssert =
+ OpenTelemetryAssertions.assertThat(ackRpcSpanData.getAttributes());
+ ackRpcSpanAttributesAssert
+ .containsEntry(SemanticAttributes.MESSAGING_SYSTEM, MESSAGING_SYSTEM_VALUE)
+ .containsEntry(
+ SemanticAttributes.MESSAGING_DESTINATION_NAME, FULL_SUBSCRIPTION_NAME.getSubscription())
+ .containsEntry(PROJECT_ATTR_KEY, FULL_TOPIC_NAME.getProject())
+ .containsEntry(SemanticAttributes.CODE_FUNCTION, "sendAckOperations")
+ .containsEntry(SemanticAttributes.MESSAGING_OPERATION, "ack")
+ .containsEntry(
+ SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, subscribeMessageWrappers.size());
+
+ // Check span data, links, and attributes for the nack RPC span
+ SpanDataAssert nackRpcSpanDataAssert = OpenTelemetryAssertions.assertThat(nackRpcSpanData);
+ nackRpcSpanDataAssert
+ .hasName(SUBSCRIBE_NACK_RPC_SPAN_NAME)
+ .hasKind(SpanKind.CLIENT)
+ .hasNoParent()
+ .hasEnded();
+
+ List nackRpcLinks = nackRpcSpanData.getLinks();
+ assertEquals(subscribeMessageWrappers.size(), nackRpcLinks.size());
+ assertEquals(subscriberSpanData.getSpanContext(), nackRpcLinks.get(0).getSpanContext());
+
+ assertEquals(6, nackRpcSpanData.getAttributes().size());
+ AttributesAssert nackRpcSpanAttributesAssert =
+ OpenTelemetryAssertions.assertThat(nackRpcSpanData.getAttributes());
+ nackRpcSpanAttributesAssert
+ .containsEntry(SemanticAttributes.MESSAGING_SYSTEM, MESSAGING_SYSTEM_VALUE)
+ .containsEntry(
+ SemanticAttributes.MESSAGING_DESTINATION_NAME, FULL_SUBSCRIPTION_NAME.getSubscription())
+ .containsEntry(PROJECT_ATTR_KEY, FULL_TOPIC_NAME.getProject())
+ .containsEntry(SemanticAttributes.CODE_FUNCTION, "sendModAckOperations")
+ .containsEntry(SemanticAttributes.MESSAGING_OPERATION, "nack")
+ .containsEntry(
+ SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, subscribeMessageWrappers.size());
+
+ // Check span data, events, links, and attributes for the publisher create span
+ SpanDataAssert subscriberSpanDataAssert =
+ OpenTelemetryAssertions.assertThat(subscriberSpanData);
+ subscriberSpanDataAssert
+ .hasName(SUBSCRIBER_SPAN_NAME)
+ .hasKind(SpanKind.CONSUMER)
+ .hasParent(publisherSpanData)
+ .hasEnded();
+
+ assertEquals(6, subscriberSpanData.getEvents().size());
+ EventDataAssert startModackEventAssert =
+ OpenTelemetryAssertions.assertThat(subscriberSpanData.getEvents().get(0));
+ startModackEventAssert.hasName(MODACK_START_EVENT);
+ EventDataAssert endModackEventAssert =
+ OpenTelemetryAssertions.assertThat(subscriberSpanData.getEvents().get(1));
+ endModackEventAssert.hasName(MODACK_END_EVENT);
+ EventDataAssert startAckEventAssert =
+ OpenTelemetryAssertions.assertThat(subscriberSpanData.getEvents().get(2));
+ startAckEventAssert.hasName(ACK_START_EVENT);
+ EventDataAssert endAckEventAssert =
+ OpenTelemetryAssertions.assertThat(subscriberSpanData.getEvents().get(3));
+ endAckEventAssert.hasName(ACK_END_EVENT);
+ EventDataAssert startNackEventAssert =
+ OpenTelemetryAssertions.assertThat(subscriberSpanData.getEvents().get(4));
+ startNackEventAssert.hasName(NACK_START_EVENT);
+ EventDataAssert endNackEventAssert =
+ OpenTelemetryAssertions.assertThat(subscriberSpanData.getEvents().get(5));
+ endNackEventAssert.hasName(NACK_END_EVENT);
+
+ List subscriberLinks = subscriberSpanData.getLinks();
+ assertEquals(3, subscriberLinks.size());
+ assertEquals(modackRpcSpanData.getSpanContext(), subscriberLinks.get(0).getSpanContext());
+ assertEquals(ackRpcSpanData.getSpanContext(), subscriberLinks.get(1).getSpanContext());
+ assertEquals(nackRpcSpanData.getSpanContext(), subscriberLinks.get(2).getSpanContext());
+
+ assertEquals(11, subscriberSpanData.getAttributes().size());
+ AttributesAssert subscriberSpanAttributesAssert =
+ OpenTelemetryAssertions.assertThat(subscriberSpanData.getAttributes());
+ subscriberSpanAttributesAssert
+ .containsEntry(SemanticAttributes.MESSAGING_SYSTEM, MESSAGING_SYSTEM_VALUE)
+ .containsEntry(
+ SemanticAttributes.MESSAGING_DESTINATION_NAME, FULL_SUBSCRIPTION_NAME.getSubscription())
+ .containsEntry(PROJECT_ATTR_KEY, PROJECT_NAME)
+ .containsEntry(SemanticAttributes.CODE_FUNCTION, "onResponse")
+ .containsEntry(MESSAGE_SIZE_ATTR_KEY, messageSize)
+ .containsEntry(ORDERING_KEY_ATTR_KEY, ORDERING_KEY)
+ .containsEntry(MESSAGE_ACK_ID_ATTR_KEY, ACK_ID)
+ .containsEntry(MESSAGE_DELIVERY_ATTEMPT_ATTR_KEY, DELIVERY_ATTEMPT)
+ .containsEntry(MESSAGE_EXACTLY_ONCE_ATTR_KEY, EXACTLY_ONCE_ENABLED)
+ .containsEntry(MESSAGE_RESULT_ATTR_KEY, PROCESS_ACTION)
+ .containsEntry(SemanticAttributes.MESSAGING_MESSAGE_ID, MESSAGE_ID);
+ }
+
+ @Test
+ public void testSubscribeConcurrencyControlSpanFailure() {
+ openTelemetryTesting.clearSpans();
+
+ PubsubMessageWrapper messageWrapper =
+ PubsubMessageWrapper.newBuilder(
+ getPubsubMessage(), FULL_SUBSCRIPTION_NAME.toString(), ACK_ID, DELIVERY_ATTEMPT)
+ .build();
+
+ Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test");
+ OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer, true);
+
+ tracer.startSubscriberSpan(messageWrapper, EXACTLY_ONCE_ENABLED);
+ tracer.startSubscribeConcurrencyControlSpan(messageWrapper);
+
+ Exception e = new Exception("test-exception");
+ tracer.setSubscribeConcurrencyControlSpanException(messageWrapper, e);
+
+ List allSpans = openTelemetryTesting.getSpans();
+ assertEquals(2, allSpans.size());
+ SpanData concurrencyControlSpanData = allSpans.get(0);
+ SpanData subscriberSpanData = allSpans.get(1);
+
+ SpanDataAssert concurrencyControlSpanDataAssert =
+ OpenTelemetryAssertions.assertThat(concurrencyControlSpanData);
+ StatusData expectedStatus =
+ StatusData.create(
+ StatusCode.ERROR, "Exception thrown during subscribe concurrency control.");
+ concurrencyControlSpanDataAssert
+ .hasName(SUBSCRIBE_CONCURRENCY_CONTROL_SPAN_NAME)
+ .hasParent(subscriberSpanData)
+ .hasStatus(expectedStatus)
+ .hasException(e)
+ .hasEnded();
+
+ SpanDataAssert subscriberSpanDataAssert =
+ OpenTelemetryAssertions.assertThat(subscriberSpanData);
+ subscriberSpanDataAssert
+ .hasName(SUBSCRIBER_SPAN_NAME)
+ .hasKind(SpanKind.CONSUMER)
+ .hasNoParent()
+ .hasEnded();
+ }
+
+ @Test
+ public void testSubscriberSpanFailure() {
+ openTelemetryTesting.clearSpans();
+
+ PubsubMessageWrapper messageWrapper =
+ PubsubMessageWrapper.newBuilder(
+ getPubsubMessage(), FULL_SUBSCRIPTION_NAME.toString(), ACK_ID, DELIVERY_ATTEMPT)
+ .build();
+
+ Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test");
+ OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer, true);
+
+ tracer.startSubscriberSpan(messageWrapper, EXACTLY_ONCE_ENABLED);
+
+ Exception e = new Exception("test-exception");
+ tracer.setSubscriberSpanException(messageWrapper, e, "Test exception");
+
+ List allSpans = openTelemetryTesting.getSpans();
+ assertEquals(1, allSpans.size());
+ SpanData subscriberSpanData = allSpans.get(0);
+
+ StatusData expectedStatus = StatusData.create(StatusCode.ERROR, "Test exception");
+ SpanDataAssert subscriberSpanDataAssert =
+ OpenTelemetryAssertions.assertThat(subscriberSpanData);
+ subscriberSpanDataAssert
+ .hasName(SUBSCRIBER_SPAN_NAME)
+ .hasKind(SpanKind.CONSUMER)
+ .hasNoParent()
+ .hasStatus(expectedStatus)
+ .hasException(e)
+ .hasEnded();
+ }
+
+ @Test
+ public void testSubscribeRpcSpanFailures() {
+ openTelemetryTesting.clearSpans();
+
+ PubsubMessageWrapper messageWrapper =
+ PubsubMessageWrapper.newBuilder(
+ getPubsubMessage(), FULL_SUBSCRIPTION_NAME.toString(), ACK_ID, DELIVERY_ATTEMPT)
+ .build();
+ List messageWrappers = Arrays.asList(messageWrapper);
+
+ Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test");
+ OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer, true);
+
+ tracer.startSubscriberSpan(messageWrapper, EXACTLY_ONCE_ENABLED);
+ Span subscribeModackRpcSpan =
+ tracer.startSubscribeRpcSpan(
+ FULL_SUBSCRIPTION_NAME.toString(), "modack", messageWrappers, ACK_DEADLINE, true);
+ Span subscribeAckRpcSpan =
+ tracer.startSubscribeRpcSpan(
+ FULL_SUBSCRIPTION_NAME.toString(), "ack", messageWrappers, 0, false);
+ Span subscribeNackRpcSpan =
+ tracer.startSubscribeRpcSpan(
+ FULL_SUBSCRIPTION_NAME.toString(), "nack", messageWrappers, 0, false);
+
+ Exception e = new Exception("test-exception");
+ tracer.setSubscribeRpcSpanException(subscribeModackRpcSpan, true, ACK_DEADLINE, e);
+ tracer.setSubscribeRpcSpanException(subscribeAckRpcSpan, false, 0, e);
+ tracer.setSubscribeRpcSpanException(subscribeNackRpcSpan, true, 0, e);
+ tracer.endSubscriberSpan(messageWrapper);
+
+ List allSpans = openTelemetryTesting.getSpans();
+ assertEquals(4, allSpans.size());
+ SpanData modackSpanData = allSpans.get(0);
+ SpanData ackSpanData = allSpans.get(1);
+ SpanData nackSpanData = allSpans.get(2);
+ SpanData subscriberSpanData = allSpans.get(3);
+
+ StatusData expectedModackStatus =
+ StatusData.create(StatusCode.ERROR, "Exception thrown on modack RPC.");
+ SpanDataAssert modackSpanDataAssert = OpenTelemetryAssertions.assertThat(modackSpanData);
+ modackSpanDataAssert
+ .hasName(SUBSCRIBE_MODACK_RPC_SPAN_NAME)
+ .hasKind(SpanKind.CLIENT)
+ .hasNoParent()
+ .hasStatus(expectedModackStatus)
+ .hasException(e)
+ .hasEnded();
+
+ StatusData expectedAckStatus =
+ StatusData.create(StatusCode.ERROR, "Exception thrown on ack RPC.");
+ SpanDataAssert ackSpanDataAssert = OpenTelemetryAssertions.assertThat(ackSpanData);
+ ackSpanDataAssert
+ .hasName(SUBSCRIBE_ACK_RPC_SPAN_NAME)
+ .hasKind(SpanKind.CLIENT)
+ .hasNoParent()
+ .hasStatus(expectedAckStatus)
+ .hasException(e)
+ .hasEnded();
+
+ StatusData expectedNackStatus =
+ StatusData.create(StatusCode.ERROR, "Exception thrown on nack RPC.");
+ SpanDataAssert nackSpanDataAssert = OpenTelemetryAssertions.assertThat(nackSpanData);
+ nackSpanDataAssert
+ .hasName(SUBSCRIBE_NACK_RPC_SPAN_NAME)
+ .hasKind(SpanKind.CLIENT)
+ .hasNoParent()
+ .hasStatus(expectedNackStatus)
+ .hasException(e)
+ .hasEnded();
+ }
+
+ private PubsubMessage getPubsubMessage() {
+ return PubsubMessage.newBuilder()
+ .setData(ByteString.copyFromUtf8("test-data"))
+ .setOrderingKey(ORDERING_KEY)
+ .build();
+ }
+}
diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java
index 9785b7716..fedd17436 100644
--- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java
+++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java
@@ -48,6 +48,12 @@
import io.grpc.StatusException;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.trace.SpanKind;
+import io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions;
+import io.opentelemetry.sdk.testing.assertj.SpanDataAssert;
+import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
+import io.opentelemetry.sdk.trace.data.SpanData;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
@@ -74,6 +80,11 @@ public class PublisherImplTest {
private static final TransportChannelProvider TEST_CHANNEL_PROVIDER =
LocalChannelProvider.create("test-server");
+ private static final String PUBLISHER_SPAN_NAME = TEST_TOPIC.getTopic() + " create";
+ private static final String PUBLISH_FLOW_CONTROL_SPAN_NAME = "publisher flow control";
+ private static final String PUBLISH_BATCHING_SPAN_NAME = "publisher batching";
+ private static final String PUBLISH_RPC_SPAN_NAME = TEST_TOPIC.getTopic() + " publish";
+
private FakeScheduledExecutorService fakeExecutor;
private FakePublisherServiceImpl testPublisherServiceImpl;
@@ -1304,6 +1315,70 @@ public void run() {
publish4Completed.await();
}
+ @Test
+ public void testPublishOpenTelemetryTracing() throws Exception {
+ OpenTelemetryRule openTelemetryTesting = OpenTelemetryRule.create();
+ OpenTelemetry openTelemetry = openTelemetryTesting.getOpenTelemetry();
+ final Publisher publisher =
+ getTestPublisherBuilder()
+ .setBatchingSettings(
+ Publisher.Builder.DEFAULT_BATCHING_SETTINGS
+ .toBuilder()
+ .setElementCountThreshold(1L)
+ .setDelayThreshold(Duration.ofSeconds(5))
+ .setFlowControlSettings(
+ FlowControlSettings.newBuilder()
+ .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block)
+ .setMaxOutstandingElementCount(2L)
+ .setMaxOutstandingRequestBytes(100L)
+ .build())
+ .build())
+ .setOpenTelemetry(openTelemetry)
+ .setEnableOpenTelemetryTracing(true)
+ .build();
+
+ testPublisherServiceImpl.addPublishResponse(PublishResponse.newBuilder().addMessageIds("1"));
+ ApiFuture publishFuture = sendTestMessage(publisher, "A");
+ fakeExecutor.advanceTime(Duration.ofSeconds(5));
+ assertEquals("1", publishFuture.get());
+ fakeExecutor.advanceTime(Duration.ofSeconds(5));
+
+ List allSpans = openTelemetryTesting.getSpans();
+ assertEquals(4, allSpans.size());
+ SpanData flowControlSpanData = allSpans.get(0);
+ SpanData batchingSpanData = allSpans.get(1);
+ SpanData publishRpcSpanData = allSpans.get(2);
+ SpanData publisherSpanData = allSpans.get(3);
+
+ SpanDataAssert flowControlSpanDataAssert =
+ OpenTelemetryAssertions.assertThat(flowControlSpanData);
+ flowControlSpanDataAssert
+ .hasName(PUBLISH_FLOW_CONTROL_SPAN_NAME)
+ .hasParent(publisherSpanData)
+ .hasEnded();
+
+ SpanDataAssert batchingSpanDataAssert = OpenTelemetryAssertions.assertThat(batchingSpanData);
+ batchingSpanDataAssert
+ .hasName(PUBLISH_BATCHING_SPAN_NAME)
+ .hasParent(publisherSpanData)
+ .hasEnded();
+
+ SpanDataAssert publishRpcSpanDataAssert =
+ OpenTelemetryAssertions.assertThat(publishRpcSpanData);
+ publishRpcSpanDataAssert
+ .hasName(PUBLISH_RPC_SPAN_NAME)
+ .hasKind(SpanKind.CLIENT)
+ .hasNoParent()
+ .hasEnded();
+
+ SpanDataAssert publishSpanDataAssert = OpenTelemetryAssertions.assertThat(publisherSpanData);
+ publishSpanDataAssert
+ .hasName(PUBLISHER_SPAN_NAME)
+ .hasKind(SpanKind.PRODUCER)
+ .hasNoParent()
+ .hasEnded();
+ }
+
private Builder getTestPublisherBuilder() {
return Publisher.newBuilder(TEST_TOPIC)
.setExecutorProvider(FixedExecutorProvider.create(fakeExecutor))
diff --git a/pom.xml b/pom.xml
index 33ba52966..2c816aabf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -113,6 +113,12 @@
+
+ org.assertj
+ assertj-core
+ 3.26.0
+ test
+
diff --git a/samples/install-without-bom/pom.xml b/samples/install-without-bom/pom.xml
index 604b622d3..6196121dd 100644
--- a/samples/install-without-bom/pom.xml
+++ b/samples/install-without-bom/pom.xml
@@ -93,6 +93,11 @@
google-cloud-storage
2.43.1
+
+ com.google.cloud.opentelemetry
+ exporter-trace
+ 0.31.0
+
diff --git a/samples/snapshot/pom.xml b/samples/snapshot/pom.xml
index 12bf9d92c..3e5f9e9d5 100644
--- a/samples/snapshot/pom.xml
+++ b/samples/snapshot/pom.xml
@@ -92,6 +92,11 @@
google-cloud-storage
2.43.1
+
+ com.google.cloud.opentelemetry
+ exporter-trace
+ 0.31.0
+
diff --git a/samples/snippets/pom.xml b/samples/snippets/pom.xml
index 8720ef215..ba6fb65d9 100644
--- a/samples/snippets/pom.xml
+++ b/samples/snippets/pom.xml
@@ -67,6 +67,11 @@
com.google.cloud
google-cloud-storage
+
+ com.google.cloud.opentelemetry
+ exporter-trace
+ 0.31.0
+
org.apache.avro
avro