diff --git a/google-cloud-pubsub/pom.xml b/google-cloud-pubsub/pom.xml
index 1115d8bfa..3604f86b4 100644
--- a/google-cloud-pubsub/pom.xml
+++ b/google-cloud-pubsub/pom.xml
@@ -103,12 +103,12 @@
io.opentelemetry
opentelemetry-api
- 1.38.0
+ 1.39.0
io.opentelemetry
opentelemetry-context
- 1.38.0
+ 1.39.0
io.opentelemetry
diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckRequestData.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckRequestData.java
index 3b67ce219..2bbdb1d75 100644
--- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckRequestData.java
+++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckRequestData.java
@@ -22,10 +22,12 @@
public class AckRequestData {
private final String ackId;
private final Optional> messageFuture;
+ private PubsubMessageWrapper messageWrapper;
protected AckRequestData(Builder builder) {
this.ackId = builder.ackId;
this.messageFuture = builder.messageFuture;
+ this.messageWrapper = builder.messageWrapper;
}
public String getAckId() {
@@ -36,6 +38,10 @@ public SettableApiFuture getMessageFutureIfExists() {
return this.messageFuture.orElse(null);
}
+ public PubsubMessageWrapper getMessageWrapper() {
+ return messageWrapper;
+ }
+
public AckRequestData setResponse(AckResponse ackResponse, boolean setResponseOnSuccess) {
if (this.messageFuture.isPresent() && !this.messageFuture.get().isDone()) {
switch (ackResponse) {
@@ -68,6 +74,7 @@ public static Builder newBuilder(String ackId) {
protected static final class Builder {
private final String ackId;
private Optional> messageFuture = Optional.empty();
+ private PubsubMessageWrapper messageWrapper;
protected Builder(String ackId) {
this.ackId = ackId;
@@ -78,6 +85,11 @@ public Builder setMessageFuture(SettableApiFuture messageFuture) {
return this;
}
+ public Builder setMessageWrapper(PubsubMessageWrapper messageWrapper) {
+ this.messageWrapper = messageWrapper;
+ return this;
+ }
+
public AckRequestData build() {
return new AckRequestData(this);
}
diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java
index 1810badd2..a766efb18 100644
--- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java
+++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java
@@ -28,6 +28,8 @@
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.ReceivedMessage;
+import com.google.pubsub.v1.SubscriptionName;
+import io.opentelemetry.api.trace.Tracer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
@@ -104,6 +106,10 @@ class MessageDispatcher {
// To keep track of number of seconds the receiver takes to process messages.
private final Distribution ackLatencyDistribution;
+ private final String subscriptionName;
+ private final boolean enableOpenTelemetryTracing;
+ private final Tracer tracer;
+
/** Internal representation of a reply to a Pubsub message, to be sent back to the service. */
public enum AckReply {
ACK,
@@ -157,6 +163,7 @@ public void onFailure(Throwable t) {
t);
this.ackRequestData.setResponse(AckResponse.OTHER, false);
pendingNacks.add(this.ackRequestData);
+ this.ackRequestData.getMessageWrapper().endSubscribeProcessSpan("nack");
forget();
}
@@ -169,9 +176,11 @@ public void onSuccess(AckReply reply) {
ackLatencyDistribution.record(
Ints.saturatedCast(
(long) Math.ceil((clock.millisTime() - receivedTimeMillis) / 1000D)));
+ this.ackRequestData.getMessageWrapper().endSubscribeProcessSpan("ack");
break;
case NACK:
pendingNacks.add(this.ackRequestData);
+ this.ackRequestData.getMessageWrapper().endSubscribeProcessSpan("nack");
break;
default:
throw new IllegalArgumentException(String.format("AckReply: %s not supported", reply));
@@ -217,6 +226,10 @@ private MessageDispatcher(Builder builder) {
jobLock = new ReentrantLock();
messagesWaiter = new Waiter();
sequentialExecutor = new SequentialExecutorService.AutoExecutor(builder.executor);
+
+ subscriptionName = builder.subscriptionName;
+ enableOpenTelemetryTracing = builder.enableOpenTelemetryTracing;
+ tracer = builder.tracer;
}
private boolean shouldSetMessageFuture() {
@@ -351,13 +364,15 @@ void setMessageOrderingEnabled(boolean messageOrderingEnabled) {
}
private static class OutstandingMessage {
- private final ReceivedMessage receivedMessage;
private final AckHandler ackHandler;
- private OutstandingMessage(ReceivedMessage receivedMessage, AckHandler ackHandler) {
- this.receivedMessage = receivedMessage;
+ private OutstandingMessage(AckHandler ackHandler) {
this.ackHandler = ackHandler;
}
+
+ public PubsubMessageWrapper messageWrapper() {
+ return this.ackHandler.ackRequestData.getMessageWrapper();
+ }
}
private static class ReceiptCompleteData {
@@ -390,10 +405,21 @@ void processReceivedMessages(List messages) {
if (shouldSetMessageFuture()) {
builder.setMessageFuture(SettableApiFuture.create());
}
+ PubsubMessageWrapper messageWrapper =
+ PubsubMessageWrapper.newBuilder(
+ message.getMessage(),
+ SubscriptionName.parse(subscriptionName),
+ message.getAckId(),
+ message.getDeliveryAttempt(),
+ enableOpenTelemetryTracing)
+ .build();
+ builder.setMessageWrapper(messageWrapper);
+ messageWrapper.startSubscriberSpan(tracer, this.exactlyOnceDeliveryEnabled.get());
+
AckRequestData ackRequestData = builder.build();
AckHandler ackHandler =
new AckHandler(ackRequestData, message.getMessage().getSerializedSize(), totalExpiration);
- OutstandingMessage outstandingMessage = new OutstandingMessage(message, ackHandler);
+ OutstandingMessage outstandingMessage = new OutstandingMessage(ackHandler);
if (this.exactlyOnceDeliveryEnabled.get()) {
// For exactly once deliveries we don't add to outstanding batch because we first
@@ -457,30 +483,39 @@ private void processBatch(List batch) {
for (OutstandingMessage message : batch) {
// This is a blocking flow controller. We have already incremented messagesWaiter, so
// shutdown will block on processing of all these messages anyway.
+ message.messageWrapper().startSubscribeConcurrencyControlSpan(tracer);
try {
- flowController.reserve(1, message.receivedMessage.getMessage().getSerializedSize());
+ flowController.reserve(1, message.messageWrapper().getPubsubMessage().getSerializedSize());
+ message.messageWrapper().endSubscribeConcurrencyControlSpan();
} catch (FlowControlException unexpectedException) {
// This should be a blocking flow controller and never throw an exception.
+ message.messageWrapper().setSubscribeConcurrencyControlSpanException(unexpectedException);
throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
}
- processOutstandingMessage(addDeliveryInfoCount(message.receivedMessage), message.ackHandler);
+ addDeliveryInfoCount(message.messageWrapper());
+ processOutstandingMessage(message.ackHandler);
}
}
- private PubsubMessage addDeliveryInfoCount(ReceivedMessage receivedMessage) {
- PubsubMessage originalMessage = receivedMessage.getMessage();
- int deliveryAttempt = receivedMessage.getDeliveryAttempt();
+ private void addDeliveryInfoCount(PubsubMessageWrapper messageWrapper) {
+ PubsubMessage originalMessage = messageWrapper.getPubsubMessage();
+ int deliveryAttempt = messageWrapper.getDeliveryAttempt();
// Delivery Attempt will be set to 0 if DeadLetterPolicy is not set on the subscription. In
// this case, do not populate the PubsubMessage with the delivery attempt attribute.
if (deliveryAttempt > 0) {
- return PubsubMessage.newBuilder(originalMessage)
- .putAttributes("googclient_deliveryattempt", Integer.toString(deliveryAttempt))
- .build();
+ messageWrapper.setPubsubMessage(
+ PubsubMessage.newBuilder(originalMessage)
+ .putAttributes("googclient_deliveryattempt", Integer.toString(deliveryAttempt))
+ .build());
}
- return originalMessage;
}
- private void processOutstandingMessage(final PubsubMessage message, final AckHandler ackHandler) {
+ private void processOutstandingMessage(final AckHandler ackHandler) {
+ // Get the PubsubMessageWrapper and the PubsubMessage it wraps that are stored withing the
+ // AckHandler object.
+ PubsubMessageWrapper messageWrapper = ackHandler.ackRequestData.getMessageWrapper();
+ PubsubMessage message = messageWrapper.getPubsubMessage();
+
// This future is for internal bookkeeping to be sent to the StreamingSubscriberConnection
// use below in the consumers
SettableApiFuture ackReplySettableApiFuture = SettableApiFuture.create();
@@ -499,8 +534,10 @@ public void run() {
// so it was probably sent to someone else. Don't work on it.
// Don't nack it either, because we'd be nacking someone else's message.
ackHandler.forget();
+ messageWrapper.setSubscriberSpanExpirationResult();
return;
}
+ messageWrapper.startSubscribeProcessSpan(tracer);
if (shouldSetMessageFuture()) {
// This is the message future that is propagated to the user
SettableApiFuture messageFuture =
@@ -521,7 +558,9 @@ public void run() {
if (!messageOrderingEnabled.get() || message.getOrderingKey().isEmpty()) {
executor.execute(deliverMessageTask);
} else {
+ messageWrapper.startSubscribeSchedulerSpan(tracer);
sequentialExecutor.submit(message.getOrderingKey(), deliverMessageTask);
+ messageWrapper.endSubscribeSchedulerSpan();
}
}
@@ -607,8 +646,10 @@ void processOutstandingOperations() {
List ackRequestDataReceipts = new ArrayList();
pendingReceipts.drainTo(ackRequestDataReceipts);
if (!ackRequestDataReceipts.isEmpty()) {
- modackRequestData.add(
- new ModackRequestData(this.getMessageDeadlineSeconds(), ackRequestDataReceipts));
+ ModackRequestData receiptModack =
+ new ModackRequestData(this.getMessageDeadlineSeconds(), ackRequestDataReceipts);
+ receiptModack.setIsReceiptModack(true);
+ modackRequestData.add(receiptModack);
}
logger.log(Level.FINER, "Sending {0} receipts", ackRequestDataReceipts.size());
@@ -645,6 +686,10 @@ public static final class Builder {
private ScheduledExecutorService systemExecutor;
private ApiClock clock;
+ private String subscriptionName;
+ private boolean enableOpenTelemetryTracing;
+ private Tracer tracer;
+
protected Builder(MessageReceiver receiver) {
this.receiver = receiver;
}
@@ -715,6 +760,21 @@ public Builder setApiClock(ApiClock clock) {
return this;
}
+ public Builder setSubscriptionName(String subscriptionName) {
+ this.subscriptionName = subscriptionName;
+ return this;
+ }
+
+ public Builder setEnableOpenTelemetryTracing(boolean enableOpenTelemetryTracing) {
+ this.enableOpenTelemetryTracing = enableOpenTelemetryTracing;
+ return this;
+ }
+
+ public Builder setTracer(Tracer tracer) {
+ this.tracer = tracer;
+ return this;
+ }
+
public MessageDispatcher build() {
return new MessageDispatcher(this);
}
diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/ModackRequestData.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/ModackRequestData.java
index b4d2dae0f..54c7436af 100644
--- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/ModackRequestData.java
+++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/ModackRequestData.java
@@ -21,6 +21,7 @@
class ModackRequestData {
private final int deadlineExtensionSeconds;
private List ackRequestData;
+ private boolean isReceiptModack;
ModackRequestData(int deadlineExtensionSeconds) {
this.deadlineExtensionSeconds = deadlineExtensionSeconds;
@@ -45,8 +46,17 @@ public List getAckRequestData() {
return ackRequestData;
}
+ public boolean getIsReceiptModack() {
+ return isReceiptModack;
+ }
+
public ModackRequestData addAckRequestData(AckRequestData ackRequestData) {
this.ackRequestData.add(ackRequestData);
return this;
}
+
+ public ModackRequestData setIsReceiptModack(boolean isReceiptModack) {
+ this.isReceiptModack = isReceiptModack;
+ return this;
+ }
}
diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryUtil.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryUtil.java
index 37cc05f7e..6ee6bbccb 100644
--- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryUtil.java
+++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryUtil.java
@@ -16,6 +16,7 @@
package com.google.cloud.pubsub.v1;
+import com.google.pubsub.v1.SubscriptionName;
import com.google.pubsub.v1.TopicName;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
@@ -28,27 +29,31 @@
public class OpenTelemetryUtil {
private static final String MESSAGING_SYSTEM_VALUE = "gcp_pubsub";
- private static final String PROJECT_ATTR_KEY = "gcp_pubsub.project_id";
+ private static final String PROJECT_ATTR_KEY = "gcp.project_id";
+ private static final String ACK_DEADLINE_ATTR_KEY = "messaging.gcp_pubsub.message.ack_deadline";
+ private static final String RECEIPT_MODACK_ATTR_KEY = "messaging.gcp_pubsub.is_receipt_modack";
private static final String PUBLISH_RPC_SPAN_SUFFIX = " publish";
/** Populates attributes that are common the publisher parent span and publish RPC span. */
- public static final AttributesBuilder createPublishSpanAttributesBuilder(
- TopicName topicName, String codeFunction, String operation) {
+ public static final AttributesBuilder createCommonSpanAttributesBuilder(
+ String destinationName, String projectName, String codeFunction, String operation) {
AttributesBuilder attributesBuilder =
Attributes.builder()
.put(SemanticAttributes.MESSAGING_SYSTEM, MESSAGING_SYSTEM_VALUE)
- .put(SemanticAttributes.MESSAGING_DESTINATION_NAME, topicName.getTopic())
- .put(PROJECT_ATTR_KEY, topicName.getProject())
- .put(SemanticAttributes.CODE_FUNCTION, codeFunction)
- .put(SemanticAttributes.MESSAGING_OPERATION, operation);
+ .put(SemanticAttributes.MESSAGING_DESTINATION_NAME, destinationName)
+ .put(PROJECT_ATTR_KEY, projectName)
+ .put(SemanticAttributes.CODE_FUNCTION, codeFunction);
+ if (operation != null) {
+ attributesBuilder.put(SemanticAttributes.MESSAGING_OPERATION, operation);
+ }
return attributesBuilder;
}
/**
* Creates, starts, and returns a publish RPC span for the given message batch. Bi-directional
- * links with the publisher parent span are created for each message in the batch.
+ * links with the publisher parent span are created for sampled messages in the batch.
*/
public static final Span startPublishRpcSpan(
Tracer tracer,
@@ -57,7 +62,8 @@ public static final Span startPublishRpcSpan(
boolean enableOpenTelemetryTracing) {
if (enableOpenTelemetryTracing && tracer != null) {
Attributes attributes =
- createPublishSpanAttributesBuilder(topicName, "Publisher.publishCall", "publish")
+ createCommonSpanAttributesBuilder(
+ topicName.getTopic(), topicName.getProject(), "Publisher.publishCall", "publish")
.put(SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, messages.size())
.build();
Span publishRpcSpan =
@@ -68,11 +74,13 @@ public static final Span startPublishRpcSpan(
.startSpan();
for (PubsubMessageWrapper message : messages) {
- Attributes linkAttributes =
- Attributes.builder().put(SemanticAttributes.MESSAGING_OPERATION, "publish").build();
- publishRpcSpan.addLink(message.getPublisherSpan().getSpanContext(), linkAttributes);
- message.getPublisherSpan().addLink(publishRpcSpan.getSpanContext(), linkAttributes);
- message.addPublishStartEvent();
+ if (message.getPublisherSpan().getSpanContext().isSampled()) {
+ Attributes linkAttributes =
+ Attributes.builder().put(SemanticAttributes.MESSAGING_OPERATION, "publish").build();
+ publishRpcSpan.addLink(message.getPublisherSpan().getSpanContext(), linkAttributes);
+ message.getPublisherSpan().addLink(publishRpcSpan.getSpanContext(), linkAttributes);
+ message.addPublishStartEvent();
+ }
}
return publishRpcSpan;
}
@@ -99,4 +107,76 @@ public static final void setPublishRpcSpanException(
endPublishRpcSpan(publishRpcSpan, enableOpenTelemetryTracing);
}
}
+
+ /**
+ * Creates, starts, and returns spans for ModAck, Nack, and Ack RPC requests. Bi-directional links
+ * to parent subscribe span for sampled messages are added.
+ */
+ public static final Span startSubscribeRpcSpan(
+ Tracer tracer,
+ SubscriptionName subscriptionName,
+ String rpcOperation,
+ List messages,
+ int ackDeadline,
+ boolean isReceiptModack,
+ boolean enableOpenTelemetryTracing) {
+ if (enableOpenTelemetryTracing && tracer != null) {
+ String codeFunction =
+ rpcOperation == "ack"
+ ? "StreamingSubscriberConnection.sendAckOperations"
+ : "StreamingSubscriberConnection.sendModAckOperations";
+ AttributesBuilder attributesBuilder =
+ createCommonSpanAttributesBuilder(
+ subscriptionName.getSubscription(),
+ subscriptionName.getProject(),
+ codeFunction,
+ rpcOperation)
+ .put(SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, messages.size());
+
+ // Ack deadline and receipt modack are specific to the modack operation
+ if (rpcOperation == "modack") {
+ attributesBuilder
+ .put(ACK_DEADLINE_ATTR_KEY, ackDeadline)
+ .put(RECEIPT_MODACK_ATTR_KEY, isReceiptModack);
+ }
+
+ Span rpcSpan =
+ tracer
+ .spanBuilder(subscriptionName.getSubscription() + " " + rpcOperation)
+ .setSpanKind(SpanKind.CLIENT)
+ .setAllAttributes(attributesBuilder.build())
+ .startSpan();
+
+ for (PubsubMessageWrapper message : messages) {
+ if (message.getSubscriberSpan().getSpanContext().isSampled()) {
+ Attributes linkAttributes =
+ Attributes.builder()
+ .put(SemanticAttributes.MESSAGING_OPERATION, rpcOperation)
+ .build();
+ rpcSpan.addLink(message.getSubscriberSpan().getSpanContext(), linkAttributes);
+ message.getSubscriberSpan().addLink(rpcSpan.getSpanContext(), linkAttributes);
+ switch (rpcOperation) {
+ case "ack":
+ message.addAckStartEvent();
+ break;
+ case "modack":
+ message.addModAckStartEvent();
+ break;
+ case "nack":
+ message.addNackStartEvent();
+ break;
+ }
+ }
+ }
+ return rpcSpan;
+ }
+ return null;
+ }
+
+ /** Ends the given subscribe RPC span if it exists. */
+ public static final void endSubscribeRpcSpan(Span rpcSpan, boolean enableOpenTelemetryTracing) {
+ if (enableOpenTelemetryTracing && rpcSpan != null) {
+ rpcSpan.end();
+ }
+ }
}
diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java
index 1701d53f2..d5547062e 100644
--- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java
+++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java
@@ -275,7 +275,9 @@ public ApiFuture publish(PubsubMessage message) {
PubsubMessageWrapper messageWrapper =
PubsubMessageWrapper.newBuilder(
- messageTransform.apply(message), topicName, enableOpenTelemetryTracing)
+ messageTransform.apply(message),
+ TopicName.parse(topicName),
+ enableOpenTelemetryTracing)
.build();
messageWrapper.startPublisherSpan(tracer);
@@ -622,7 +624,7 @@ private void onSuccess(Iterable results) {
flowController.release(nextPublish.messageSize);
}
nextPublish.publishResult.set(messageId);
- nextPublish.messageWrapper.setMessageIdSpanAttribute(messageId);
+ nextPublish.messageWrapper.setPublisherMessageIdSpanAttribute(messageId);
nextPublish.messageWrapper.endPublisherSpan();
}
}
diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubMessageWrapper.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubMessageWrapper.java
index cc00248df..78e6625fe 100644
--- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubMessageWrapper.java
+++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubMessageWrapper.java
@@ -18,7 +18,9 @@
import com.google.common.base.Preconditions;
import com.google.pubsub.v1.PubsubMessage;
+import com.google.pubsub.v1.SubscriptionName;
import com.google.pubsub.v1.TopicName;
+import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
@@ -26,6 +28,7 @@
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
import io.opentelemetry.context.Context;
+import io.opentelemetry.context.propagation.TextMapGetter;
import io.opentelemetry.context.propagation.TextMapSetter;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
@@ -35,36 +38,83 @@
*/
public class PubsubMessageWrapper {
private PubsubMessage message;
+ private final boolean enableOpenTelemetryTracing;
private final TopicName topicName;
+ private final SubscriptionName subscriptionName;
- private final boolean enableOpenTelemetryTracing;
+ // Attributes set only for messages received from a streaming pull response.
+ private final String ackId;
+ private final int deliveryAttempt;
- private final String PUBLISHER_SPAN_NAME;
+ private String PUBLISHER_SPAN_NAME;
private static final String PUBLISH_FLOW_CONTROL_SPAN_NAME = "publisher flow control";
private static final String PUBLISH_BATCHING_SPAN_NAME = "publisher batching";
private static final String PUBLISH_START_EVENT = "publish start";
private static final String PUBLISH_END_EVENT = "publish end";
+ private String SUBSCRIBER_SPAN_NAME;
+ private static final String SUBSCRIBE_CONCURRENCY_CONTROL_SPAN_NAME =
+ "subscriber concurrency control";
+ private static final String SUBSCRIBE_SCHEDULER_SPAN_NAME = "subscriber scheduler";
+ private String SUBSCRIBE_PROCESS_SPAN_NAME;
+ private static final String MODACK_START_EVENT = "modack start";
+ private static final String MODACK_END_EVENT = "modack end";
+ private static final String NACK_START_EVENT = "nack start";
+ private static final String NACK_END_EVENT = "nack end";
+ private static final String ACK_START_EVENT = "ack start";
+ private static final String ACK_END_EVENT = "ack start";
+
private static final String GOOGCLIENT_PREFIX = "googclient_";
- private static final String MESSAGE_SIZE_ATTR_KEY = "messaging.message.envelope.size";
+ private static final String MESSAGING_SYSTEM_VALUE = "gcp_pubsub";
+ private static final String MESSAGE_SIZE_ATTR_KEY = "messaging.message.body.size";
private static final String ORDERING_KEY_ATTR_KEY = "messaging.gcp_pubsub.message.ordering_key";
+ private static final String MESSAGE_ACK_ID_ATTR_KEY = "messaging.gcp_pubsub.message.ack_id";
+ private static final String MESSAGE_EXACTLY_ONCE_ATTR_KEY =
+ "messaging.gcp_pubsub.message.exactly_once_delivery";
+ private static final String MESSAGE_RESULT_ATTR_KEY = "messaging.gcp_pubsub.result";
+ private static final String MESSAGE_DELIVERY_ATTEMPT_ATTR_KEY =
+ "messaging.gcp_pubsub.message.delivery_attempt";
private Span publisherSpan;
private Span publishFlowControlSpan;
private Span publishBatchingSpan;
+ private Span subscriberSpan;
+ private Span subscribeConcurrencyControlSpan;
+ private Span subscribeSchedulerSpan;
+ private Span subscribeProcessSpan;
+
public PubsubMessageWrapper(Builder builder) {
this.message = builder.message;
this.topicName = builder.topicName;
+ this.subscriptionName = builder.subscriptionName;
+ this.ackId = builder.ackId;
+ this.deliveryAttempt = builder.deliveryAttempt;
this.enableOpenTelemetryTracing = builder.enableOpenTelemetryTracing;
- this.PUBLISHER_SPAN_NAME = builder.topicName.getTopic() + " create";
+ if (this.topicName != null) {
+ this.PUBLISHER_SPAN_NAME = builder.topicName.getTopic() + " create";
+ }
+ if (this.subscriptionName != null) {
+ this.SUBSCRIBER_SPAN_NAME = builder.subscriptionName.getSubscription() + " subscribe";
+ this.SUBSCRIBE_PROCESS_SPAN_NAME = builder.subscriptionName.getSubscription() + " process";
+ }
}
public static Builder newBuilder(
- PubsubMessage message, String fullTopicName, boolean enableOpenTelemetryTracing) {
- return new Builder(message, TopicName.parse(fullTopicName), enableOpenTelemetryTracing);
+ PubsubMessage message, TopicName topicName, boolean enableOpenTelemetryTracing) {
+ return new Builder(message, topicName, enableOpenTelemetryTracing);
+ }
+
+ public static Builder newBuilder(
+ PubsubMessage message,
+ SubscriptionName subscriptionName,
+ String ackId,
+ int deliveryAttempt,
+ boolean enableOpenTelemetryTracing) {
+ return new Builder(
+ message, subscriptionName, ackId, deliveryAttempt, enableOpenTelemetryTracing);
}
/** Returns the PubsubMessage associated with this wrapper. */
@@ -72,11 +122,26 @@ public PubsubMessage getPubsubMessage() {
return message;
}
- /** Returns the parent span for this message wrapper. */
+ /** Returns the parent publisher span for this message wrapper. */
public Span getPublisherSpan() {
return publisherSpan;
}
+ /** Returns the parent subscriber span for this message wrapper. */
+ public Span getSubscriberSpan() {
+ return subscriberSpan;
+ }
+
+ /** Returns the delivery attempt for the received PubsubMessage. */
+ public int getDeliveryAttempt() {
+ return deliveryAttempt;
+ }
+
+ /** Sets the PubsubMessage for this wrapper. */
+ public void setPubsubMessage(PubsubMessage message) {
+ this.message = message;
+ }
+
/**
* Creates and starts the parent span with the appropriate span attributes and injects the span
* context into the {@link PubsubMessage} attributes.
@@ -84,9 +149,10 @@ public Span getPublisherSpan() {
public void startPublisherSpan(Tracer tracer) {
if (enableOpenTelemetryTracing && tracer != null) {
AttributesBuilder attributesBuilder =
- OpenTelemetryUtil.createPublishSpanAttributesBuilder(
- topicName, "Publisher.publish", "create");
- attributesBuilder.put(MESSAGE_SIZE_ATTR_KEY, message.getSerializedSize());
+ OpenTelemetryUtil.createCommonSpanAttributesBuilder(
+ topicName.getTopic(), topicName.getProject(), "Publisher.publish", "create");
+
+ attributesBuilder.put(MESSAGE_SIZE_ATTR_KEY, message.getData().size());
if (!message.getOrderingKey().isEmpty()) {
attributesBuilder.put(ORDERING_KEY_ATTR_KEY, message.getOrderingKey());
}
@@ -138,7 +204,7 @@ public void addPublishEndEvent() {
* Sets the message ID attribute in the publisher parent span. This is called after the publish
* RPC returns with a message ID.
*/
- public void setMessageIdSpanAttribute(String messageId) {
+ public void setPublisherMessageIdSpanAttribute(String messageId) {
if (enableOpenTelemetryTracing && publisherSpan != null) {
publisherSpan.setAttribute(SemanticAttributes.MESSAGING_MESSAGE_ID, messageId);
}
@@ -190,44 +256,250 @@ public void setPublishBatchingSpanException(Throwable t) {
}
}
+ /**
+ * Creates the subscriber parent span using span context propagated in the message attributes and
+ * sets the appropriate span attributes.
+ */
+ public void startSubscriberSpan(Tracer tracer, boolean exactlyOnceDeliveryEnabled) {
+ if (enableOpenTelemetryTracing && tracer != null) {
+ AttributesBuilder attributesBuilder =
+ OpenTelemetryUtil.createCommonSpanAttributesBuilder(
+ subscriptionName.getSubscription(),
+ subscriptionName.getProject(),
+ "StreamingSubscriberConnection.onResponse",
+ null);
+
+ attributesBuilder
+ .put(MESSAGE_SIZE_ATTR_KEY, message.getData().size())
+ .put(MESSAGE_ACK_ID_ATTR_KEY, ackId)
+ .put(MESSAGE_EXACTLY_ONCE_ATTR_KEY, exactlyOnceDeliveryEnabled);
+ if (!message.getOrderingKey().isEmpty()) {
+ attributesBuilder.put(ORDERING_KEY_ATTR_KEY, message.getOrderingKey());
+ }
+ if (deliveryAttempt > 0) {
+ attributesBuilder.put(MESSAGE_DELIVERY_ATTEMPT_ATTR_KEY, deliveryAttempt);
+ }
+ subscriberSpan = extractSpanContext(tracer, attributesBuilder.build());
+ }
+ }
+
+ /** Creates a span for subscribe concurrency control as a child of the parent subscriber span. */
+ public void startSubscribeConcurrencyControlSpan(Tracer tracer) {
+ if (enableOpenTelemetryTracing && tracer != null) {
+ subscribeConcurrencyControlSpan =
+ startChildSpan(tracer, SUBSCRIBE_CONCURRENCY_CONTROL_SPAN_NAME, subscriberSpan);
+ }
+ }
+
+ /**
+ * Creates a span for subscribe ordering key scheduling as a child of the parent subscriber span.
+ */
+ public void startSubscribeSchedulerSpan(Tracer tracer) {
+ if (enableOpenTelemetryTracing && tracer != null) {
+ subscribeSchedulerSpan =
+ startChildSpan(tracer, SUBSCRIBE_SCHEDULER_SPAN_NAME, subscriberSpan);
+ }
+ }
+
+ /** Creates a span for subscribe message processing as a child of the parent subscriber span. */
+ public void startSubscribeProcessSpan(Tracer tracer) {
+ if (enableOpenTelemetryTracing && tracer != null) {
+ subscribeProcessSpan = startChildSpan(tracer, SUBSCRIBE_PROCESS_SPAN_NAME, subscriberSpan);
+ subscribeProcessSpan.setAttribute(
+ SemanticAttributes.MESSAGING_SYSTEM, MESSAGING_SYSTEM_VALUE);
+ if (publisherSpan != null) {
+ subscribeProcessSpan.addLink(publisherSpan.getSpanContext());
+ }
+ }
+ }
+
+ /**
+ * Creates start and end events for ModAcks, Nacks, and Acks that are tied to the corresponding
+ * RPC span start and end times.
+ */
+ public void addModAckStartEvent() {
+ if (enableOpenTelemetryTracing && subscriberSpan != null) {
+ subscriberSpan.addEvent(MODACK_START_EVENT);
+ }
+ }
+
+ public void addModAckEndEvent() {
+ if (enableOpenTelemetryTracing && subscriberSpan != null) {
+ subscriberSpan.addEvent(MODACK_END_EVENT);
+ }
+ }
+
+ public void addNackStartEvent() {
+ if (enableOpenTelemetryTracing && subscriberSpan != null) {
+ subscriberSpan.addEvent(NACK_START_EVENT);
+ }
+ }
+
+ public void addNackEndEvent() {
+ if (enableOpenTelemetryTracing && subscriberSpan != null) {
+ subscriberSpan.addEvent(NACK_END_EVENT);
+ }
+ }
+
+ public void addAckStartEvent() {
+ if (enableOpenTelemetryTracing && subscriberSpan != null) {
+ subscriberSpan.addEvent(ACK_START_EVENT);
+ }
+ }
+
+ public void addAckEndEvent() {
+ if (enableOpenTelemetryTracing && subscriberSpan != null) {
+ subscriberSpan.addEvent(ACK_END_EVENT);
+ }
+ }
+
+ public void addEndRpcEvent(boolean isModack, int ackDeadline) {
+ if (!isModack) {
+ addAckEndEvent();
+ } else if (ackDeadline == 0) {
+ addNackEndEvent();
+ } else {
+ addModAckEndEvent();
+ }
+ }
+
+ /** Ends the subscriber parent span if exists. */
+ public void endSubscriberSpan() {
+ if (enableOpenTelemetryTracing && subscriberSpan != null) {
+ subscriberSpan.end();
+ }
+ }
+
+ /** Ends the subscribe concurreny control span if exists. */
+ public void endSubscribeConcurrencyControlSpan() {
+ if (enableOpenTelemetryTracing && subscribeConcurrencyControlSpan != null) {
+ subscribeConcurrencyControlSpan.end();
+ }
+ }
+
+ /** Ends the subscribe scheduler span if exists. */
+ public void endSubscribeSchedulerSpan() {
+ if (enableOpenTelemetryTracing && subscribeSchedulerSpan != null) {
+ subscribeSchedulerSpan.end();
+ }
+ }
+
+ /**
+ * Ends the subscribe process span if it exists, creates an event with the appropriate result, and
+ * sets the result on the parent subscriber span.
+ */
+ public void endSubscribeProcessSpan(String action) {
+ if (enableOpenTelemetryTracing && subscribeProcessSpan != null) {
+ subscribeProcessSpan.addEvent(action + " called");
+ subscribeProcessSpan.end();
+ subscriberSpan.setAttribute(MESSAGE_RESULT_ATTR_KEY, action);
+ }
+ }
+
+ /** Sets an exception on the subscriber span during Ack/ModAck/Nack failures */
+ public void setSubscriberSpanException(Throwable t, String exception) {
+ if (enableOpenTelemetryTracing && subscriberSpan != null) {
+ subscriberSpan.setStatus(StatusCode.ERROR, exception);
+ subscriberSpan.recordException(t);
+ endAllSubscribeSpans();
+ }
+ }
+
+ /** Sets result of the parent subscriber span to expired and ends its. */
+ public void setSubscriberSpanExpirationResult() {
+ if (enableOpenTelemetryTracing && subscriberSpan != null) {
+ subscriberSpan.setAttribute(MESSAGE_RESULT_ATTR_KEY, "expired");
+ endSubscriberSpan();
+ }
+ }
+
+ /**
+ * Sets an error status and records an exception when an exception is thrown subscriber
+ * concurrency control.
+ */
+ public void setSubscribeConcurrencyControlSpanException(Throwable t) {
+ if (enableOpenTelemetryTracing && subscribeConcurrencyControlSpan != null) {
+ subscribeConcurrencyControlSpan.setStatus(
+ StatusCode.ERROR, "Exception thrown during subscribe concurrency control.");
+ subscribeConcurrencyControlSpan.recordException(t);
+ endAllSubscribeSpans();
+ }
+ }
+
/** Creates a child span of the given parent span. */
private Span startChildSpan(Tracer tracer, String name, Span parent) {
return tracer.spanBuilder(name).setParent(Context.current().with(parent)).startSpan();
}
- /** Ends all spans associated with this message wrapper. */
+ /** Ends all publisher-side spans associated with this message wrapper. */
private void endAllPublishSpans() {
endPublishFlowControlSpan();
endPublishBatchingSpan();
endPublisherSpan();
}
+ /** Ends all subscriber-side spans associated with this message wrapper. */
+ private void endAllSubscribeSpans() {
+ endSubscribeConcurrencyControlSpan();
+ endSubscribeSchedulerSpan();
+ endSubscriberSpan();
+ }
+
/**
* Injects the span context into the attributes of a Pub/Sub message for propagation to the
* subscriber client.
*/
private void injectSpanContext() {
- if (enableOpenTelemetryTracing && publisherSpan != null) {
- TextMapSetter injectMessageAttributes =
- new TextMapSetter() {
- @Override
- public void set(PubsubMessageWrapper carrier, String key, String value) {
- PubsubMessage newMessage =
- PubsubMessage.newBuilder(carrier.message)
- .putAttributes(GOOGCLIENT_PREFIX + key, value)
- .build();
- carrier.message = newMessage;
- }
- };
- W3CTraceContextPropagator.getInstance()
- .inject(Context.current().with(publisherSpan), this, injectMessageAttributes);
- }
+ TextMapSetter injectMessageAttributes =
+ new TextMapSetter() {
+ @Override
+ public void set(PubsubMessageWrapper carrier, String key, String value) {
+ PubsubMessage newMessage =
+ PubsubMessage.newBuilder(carrier.message)
+ .putAttributes(GOOGCLIENT_PREFIX + key, value)
+ .build();
+ carrier.message = newMessage;
+ }
+ };
+ W3CTraceContextPropagator.getInstance()
+ .inject(Context.current().with(publisherSpan), this, injectMessageAttributes);
+ }
+
+ /**
+ * Extracts the span context from the attributes of a Pub/Sub message and creates the parent
+ * subscriber span using that context.
+ */
+ private Span extractSpanContext(Tracer tracer, Attributes attributes) {
+ TextMapGetter extractMessageAttributes =
+ new TextMapGetter() {
+ @Override
+ public String get(PubsubMessageWrapper carrier, String key) {
+ return carrier.message.getAttributesOrDefault(GOOGCLIENT_PREFIX + key, "");
+ }
+
+ public Iterable keys(PubsubMessageWrapper carrier) {
+ return carrier.message.getAttributesMap().keySet();
+ }
+ };
+ Context context =
+ W3CTraceContextPropagator.getInstance()
+ .extract(Context.current(), this, extractMessageAttributes);
+ publisherSpan = Span.fromContextOrNull(context);
+ return tracer
+ .spanBuilder(SUBSCRIBER_SPAN_NAME)
+ .setSpanKind(SpanKind.CONSUMER)
+ .setParent(context)
+ .setAllAttributes(attributes)
+ .startSpan();
}
/** Builder of {@link PubsubMessageWrapper PubsubMessageWrapper}. */
protected static final class Builder {
private PubsubMessage message = null;
private TopicName topicName = null;
+ private SubscriptionName subscriptionName = null;
+ private String ackId = null;
+ private int deliveryAttempt = 0;
private boolean enableOpenTelemetryTracing = false;
public Builder(PubsubMessage message, TopicName topicName, boolean enableOpenTelemetryTracing) {
@@ -236,8 +508,21 @@ public Builder(PubsubMessage message, TopicName topicName, boolean enableOpenTel
this.enableOpenTelemetryTracing = enableOpenTelemetryTracing;
}
+ public Builder(
+ PubsubMessage message,
+ SubscriptionName subscriptionName,
+ String ackId,
+ int deliveryAttempt,
+ boolean enableOpenTelemetryTracing) {
+ this.message = message;
+ this.subscriptionName = subscriptionName;
+ this.ackId = ackId;
+ this.deliveryAttempt = deliveryAttempt;
+ this.enableOpenTelemetryTracing = enableOpenTelemetryTracing;
+ }
+
public PubsubMessageWrapper build() {
- Preconditions.checkArgument(this.topicName != null);
+ Preconditions.checkArgument(this.topicName != null || this.subscriptionName != null);
return new PubsubMessageWrapper(this);
}
}
diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java
index 7849bdb74..5824ba41b 100644
--- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java
+++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java
@@ -47,9 +47,12 @@
import com.google.pubsub.v1.ModifyAckDeadlineRequest;
import com.google.pubsub.v1.StreamingPullRequest;
import com.google.pubsub.v1.StreamingPullResponse;
+import com.google.pubsub.v1.SubscriptionName;
import com.google.rpc.ErrorInfo;
import io.grpc.Status;
import io.grpc.protobuf.StatusProto;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.Tracer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -118,6 +121,10 @@ final class StreamingSubscriberConnection extends AbstractApiService implements
*/
private final String clientId = UUID.randomUUID().toString();
+ private final String subscriptionName;
+ private final boolean enableOpenTelemetryTracing;
+ private final Tracer tracer;
+
private StreamingSubscriberConnection(Builder builder) {
subscription = builder.subscription;
systemExecutor = builder.systemExecutor;
@@ -151,6 +158,10 @@ private StreamingSubscriberConnection(Builder builder) {
messageDispatcherBuilder = MessageDispatcher.newBuilder(builder.receiverWithAckResponse);
}
+ subscriptionName = builder.subscriptionName;
+ enableOpenTelemetryTracing = builder.enableOpenTelemetryTracing;
+ tracer = builder.tracer;
+
messageDispatcher =
messageDispatcherBuilder
.setAckProcessor(this)
@@ -165,6 +176,9 @@ private StreamingSubscriberConnection(Builder builder) {
.setExecutor(builder.executor)
.setSystemExecutor(builder.systemExecutor)
.setApiClock(builder.clock)
+ .setSubscriptionName(subscriptionName)
+ .setEnableOpenTelemetryTracing(enableOpenTelemetryTracing)
+ .setTracer(tracer)
.build();
flowControlSettings = builder.flowControlSettings;
@@ -432,15 +446,27 @@ private void sendAckOperations(
for (List ackRequestDataInRequestList :
Lists.partition(ackRequestDataList, MAX_PER_REQUEST_CHANGES)) {
List ackIdsInRequest = new ArrayList<>();
+ List messagesInRequest = new ArrayList<>();
for (AckRequestData ackRequestData : ackRequestDataInRequestList) {
ackIdsInRequest.add(ackRequestData.getAckId());
+ messagesInRequest.add(ackRequestData.getMessageWrapper());
if (ackRequestData.hasMessageFuture()) {
// Add to our pending requests if we care about the response
pendingRequests.add(ackRequestData);
}
}
+ // Creates an Ack span to be passed to the callback
+ Span rpcSpan =
+ OpenTelemetryUtil.startSubscribeRpcSpan(
+ tracer,
+ SubscriptionName.parse(subscriptionName),
+ "ack",
+ messagesInRequest,
+ 0,
+ false,
+ enableOpenTelemetryTracing);
ApiFutureCallback callback =
- getCallback(ackRequestDataInRequestList, 0, false, currentBackoffMillis);
+ getCallback(ackRequestDataInRequestList, 0, false, currentBackoffMillis, rpcSpan);
ApiFuture ackFuture =
subscriberStub
.acknowledgeCallable()
@@ -463,19 +489,34 @@ private void sendModackOperations(
for (List ackRequestDataInRequestList :
Lists.partition(modackRequestData.getAckRequestData(), MAX_PER_REQUEST_CHANGES)) {
List ackIdsInRequest = new ArrayList<>();
+ List messagesInRequest = new ArrayList<>();
for (AckRequestData ackRequestData : ackRequestDataInRequestList) {
ackIdsInRequest.add(ackRequestData.getAckId());
+ messagesInRequest.add(ackRequestData.getMessageWrapper());
if (ackRequestData.hasMessageFuture()) {
// Add to our pending requests if we care about the response
pendingRequests.add(ackRequestData);
}
}
+ int deadlineExtensionSeconds = modackRequestData.getDeadlineExtensionSeconds();
+ String rpcOperation = deadlineExtensionSeconds == 0 ? "nack" : "modack";
+ // Creates either a ModAck span or a Nack span depending on the given ack deadline
+ Span rpcSpan =
+ OpenTelemetryUtil.startSubscribeRpcSpan(
+ tracer,
+ SubscriptionName.parse(subscriptionName),
+ rpcOperation,
+ messagesInRequest,
+ deadlineExtensionSeconds,
+ modackRequestData.getIsReceiptModack(),
+ enableOpenTelemetryTracing);
ApiFutureCallback callback =
getCallback(
modackRequestData.getAckRequestData(),
- modackRequestData.getDeadlineExtensionSeconds(),
+ deadlineExtensionSeconds,
true,
- currentBackoffMillis);
+ currentBackoffMillis,
+ rpcSpan);
ApiFuture modackFuture =
subscriberStub
.modifyAckDeadlineCallable()
@@ -517,7 +558,8 @@ private ApiFutureCallback getCallback(
List ackRequestDataList,
int deadlineExtensionSeconds,
boolean isModack,
- long currentBackoffMillis) {
+ long currentBackoffMillis,
+ Span rpcSpan) {
// This callback handles retries, and sets message futures
// Check if ack or nack
@@ -533,7 +575,16 @@ public void onSuccess(Empty empty) {
messageDispatcher.notifyAckSuccess(ackRequestData);
// Remove from our pending operations
pendingRequests.remove(ackRequestData);
+ if (!isModack) {
+ ackRequestData.getMessageWrapper().endSubscriberSpan();
+ ackRequestData.getMessageWrapper().addAckEndEvent();
+ } else if (deadlineExtensionSeconds == 0) {
+ ackRequestData.getMessageWrapper().addNackEndEvent();
+ } else {
+ ackRequestData.getMessageWrapper().addModAckEndEvent();
+ }
}
+ OpenTelemetryUtil.endSubscribeRpcSpan(rpcSpan, enableOpenTelemetryTracing);
}
@Override
@@ -544,10 +595,17 @@ public void onFailure(Throwable t) {
Level level = isAlive() ? Level.WARNING : Level.FINER;
logger.log(level, "failed to send operations", t);
+ OpenTelemetryUtil.endSubscribeRpcSpan(rpcSpan, enableOpenTelemetryTracing);
+
if (!getExactlyOnceDeliveryEnabled()) {
+ if (enableOpenTelemetryTracing) {
+ for (AckRequestData ackRequestData : ackRequestDataList) {
+ ackRequestData.getMessageWrapper().endSubscriberSpan();
+ ackRequestData.getMessageWrapper().addEndRpcEvent(isModack, deadlineExtensionSeconds);
+ }
+ }
return;
}
-
List ackRequestDataArrayRetryList = new ArrayList<>();
try {
Map metadataMap = getMetadataMapFromThrowable(t);
@@ -569,14 +627,30 @@ public void onFailure(Throwable t) {
errorMessage);
ackRequestData.setResponse(AckResponse.INVALID, setResponseOnSuccess);
messageDispatcher.notifyAckFailed(ackRequestData);
+ ackRequestData
+ .getMessageWrapper()
+ .setSubscriberSpanException(t, "Invalid ack ID");
+ ackRequestData
+ .getMessageWrapper()
+ .addEndRpcEvent(isModack, deadlineExtensionSeconds);
} else {
logger.log(Level.INFO, "Unknown error message, will not resend", errorMessage);
ackRequestData.setResponse(AckResponse.OTHER, setResponseOnSuccess);
messageDispatcher.notifyAckFailed(ackRequestData);
+ ackRequestData
+ .getMessageWrapper()
+ .setSubscriberSpanException(t, "Unknown error message");
+ ackRequestData
+ .getMessageWrapper()
+ .addEndRpcEvent(isModack, deadlineExtensionSeconds);
}
} else {
ackRequestData.setResponse(AckResponse.SUCCESSFUL, setResponseOnSuccess);
messageDispatcher.notifyAckSuccess(ackRequestData);
+ ackRequestData.getMessageWrapper().endSubscriberSpan();
+ ackRequestData
+ .getMessageWrapper()
+ .addEndRpcEvent(isModack, deadlineExtensionSeconds);
}
// Remove from our pending
pendingRequests.remove(ackRequestData);
@@ -637,6 +711,10 @@ public static final class Builder {
private ScheduledExecutorService systemExecutor;
private ApiClock clock;
+ private String subscriptionName;
+ private boolean enableOpenTelemetryTracing;
+ private Tracer tracer;
+
protected Builder(MessageReceiver receiver) {
this.receiver = receiver;
}
@@ -727,6 +805,21 @@ public Builder setClock(ApiClock clock) {
return this;
}
+ public Builder setSubscriptionName(String subscriptionName) {
+ this.subscriptionName = subscriptionName;
+ return this;
+ }
+
+ public Builder setEnableOpenTelemetryTracing(boolean enableOpenTelemetryTracing) {
+ this.enableOpenTelemetryTracing = enableOpenTelemetryTracing;
+ return this;
+ }
+
+ public Builder setTracer(Tracer tracer) {
+ this.tracer = tracer;
+ return this;
+ }
+
public StreamingSubscriberConnection build() {
return new StreamingSubscriberConnection(this);
}
diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java
index 1723c72b1..14b6731f1 100644
--- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java
+++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java
@@ -43,6 +43,8 @@
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.trace.Tracer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -117,6 +119,8 @@ public class Subscriber extends AbstractApiService implements SubscriberInterfac
private static final Logger logger = Logger.getLogger(Subscriber.class.getName());
+ private static final String OPEN_TELEMETRY_TRACER_NAME = "com.google.cloud.pubsub.v1";
+
private final String subscriptionName;
private final FlowControlSettings flowControlSettings;
private final boolean useLegacyFlowControl;
@@ -145,6 +149,10 @@ public class Subscriber extends AbstractApiService implements SubscriberInterfac
private final ApiClock clock;
private final List backgroundResources = new ArrayList<>();
+ private final boolean enableOpenTelemetryTracing;
+ private final OpenTelemetry openTelemetry;
+ private Tracer tracer = null;
+
private Subscriber(Builder builder) {
receiver = builder.receiver;
receiverWithAckResponse = builder.receiverWithAckResponse;
@@ -199,6 +207,12 @@ private Subscriber(Builder builder) {
throw new IllegalStateException(e);
}
+ this.enableOpenTelemetryTracing = builder.enableOpenTelemetryTracing;
+ this.openTelemetry = builder.openTelemetry;
+ if (this.openTelemetry != null) {
+ this.tracer = builder.openTelemetry.getTracer(OPEN_TELEMETRY_TRACER_NAME);
+ }
+
streamingSubscriberConnections = new ArrayList(numPullers);
// We regularly look up the distribution for a good subscription deadline.
@@ -386,6 +400,9 @@ private void startStreamingConnections() {
.setExecutor(executor)
.setSystemExecutor(alarmsExecutor)
.setClock(clock)
+ .setSubscriptionName(subscriptionName)
+ .setEnableOpenTelemetryTracing(enableOpenTelemetryTracing)
+ .setTracer(tracer)
.build();
streamingSubscriberConnections.add(streamingSubscriberConnection);
@@ -495,6 +512,9 @@ public static final class Builder {
private String endpoint = null;
private String universeDomain = null;
+ private boolean enableOpenTelemetryTracing = false;
+ private OpenTelemetry openTelemetry = null;
+
Builder(String subscription, MessageReceiver receiver) {
this.subscription = subscription;
this.receiver = receiver;
@@ -684,6 +704,18 @@ Builder setClock(ApiClock clock) {
return this;
}
+ /** Gives the ability to enable Open Telemetry Tracing */
+ public Builder setEnableOpenTelemetryTracing(boolean enableOpenTelemetryTracing) {
+ this.enableOpenTelemetryTracing = enableOpenTelemetryTracing;
+ return this;
+ }
+
+ /** Sets the instance of OpenTelemetry for the Publisher class. */
+ public Builder setOpenTelemetry(OpenTelemetry openTelemetry) {
+ this.openTelemetry = openTelemetry;
+ return this;
+ }
+
/** Returns the default FlowControlSettings used by the client if settings are not provided. */
public static FlowControlSettings getDefaultFlowControlSettings() {
return DEFAULT_FLOW_CONTROL_SETTINGS;
diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java
index a0e06303c..9c414d6a9 100644
--- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java
+++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java
@@ -67,8 +67,7 @@ public void testPublishSpansSuccess() {
openTelemetryTesting.clearSpans();
PubsubMessageWrapper messageWrapper =
- PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME.toString(), true)
- .build();
+ PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME, true).build();
List messageWrappers = Arrays.asList(messageWrapper);
long messageSize = messageWrapper.getPubsubMessage().getSerializedSize();
@@ -83,7 +82,7 @@ public void testPublishSpansSuccess() {
Span publishRpcSpan =
OpenTelemetryUtil.startPublishRpcSpan(tracer, FULL_TOPIC_NAME, messageWrappers, true);
OpenTelemetryUtil.endPublishRpcSpan(publishRpcSpan, true);
- messageWrapper.setMessageIdSpanAttribute(MESSAGE_ID);
+ messageWrapper.setPublisherMessageIdSpanAttribute(MESSAGE_ID);
messageWrapper.endPublisherSpan();
List allSpans = openTelemetryTesting.getSpans();
@@ -182,8 +181,7 @@ public void testPublishFlowControlSpanFailure() {
openTelemetryTesting.clearSpans();
PubsubMessageWrapper messageWrapper =
- PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME.toString(), true)
- .build();
+ PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME, true).build();
Tracer tracer = openTelemetryTesting.getOpenTelemetry().getTracer("test");
@@ -222,8 +220,7 @@ public void testPublishBatchingSpanFailure() {
openTelemetryTesting.clearSpans();
PubsubMessageWrapper messageWrapper =
- PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME.toString(), true)
- .build();
+ PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME, true).build();
Tracer tracer = openTelemetryTesting.getOpenTelemetry().getTracer("test");
@@ -261,8 +258,7 @@ public void testPublishRpcSpanFailure() {
openTelemetryTesting.clearSpans();
PubsubMessageWrapper messageWrapper =
- PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME.toString(), true)
- .build();
+ PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME, true).build();
List messageWrappers = Arrays.asList(messageWrapper);
Tracer tracer = openTelemetryTesting.getOpenTelemetry().getTracer("test");