From d257b6bb59ecbc607b1333580d615cff79898c85 Mon Sep 17 00:00:00 2001 From: Mike Prieto Date: Thu, 12 Sep 2024 06:55:50 +0000 Subject: [PATCH] feat: Refactor OpenTelemetry implementation to use a context aware wrapper for the tracer and a PubsubTracer interface --- .../cloud/pubsub/v1/AckRequestData.java | 2 +- .../cloud/pubsub/v1/MessageDispatcher.java | 33 +- .../pubsub/v1/OpenTelemetryPubsubTracer.java | 391 ++++++++++++++++++ .../cloud/pubsub/v1/OpenTelemetryUtil.java | 164 -------- .../com/google/cloud/pubsub/v1/Publisher.java | 39 +- .../cloud/pubsub/v1/PubsubMessageWrapper.java | 368 ++++++----------- .../google/cloud/pubsub/v1/PubsubTracer.java | 137 ++++++ .../v1/StreamingSubscriberConnection.java | 56 ++- .../google/cloud/pubsub/v1/Subscriber.java | 9 +- .../cloud/pubsub/v1/OpenTelemetryTest.java | 232 ++++------- 10 files changed, 799 insertions(+), 632 deletions(-) create mode 100644 google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryPubsubTracer.java create mode 100644 google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubTracer.java diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckRequestData.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckRequestData.java index 64bce6cc2..5cab83f49 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckRequestData.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckRequestData.java @@ -44,7 +44,7 @@ public SettableApiFuture getMessageFutureIfExists() { */ public PubsubMessageWrapper getMessageWrapper() { if (this.messageWrapper == null) { - return PubsubMessageWrapper.newBuilder(null, null, false).build(); + return PubsubMessageWrapper.newBuilder(null, null).build(); } return messageWrapper; } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java index 26d1f253d..3ebb99733 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java @@ -28,7 +28,6 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.ReceivedMessage; -import io.opentelemetry.api.trace.Tracer; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; @@ -107,7 +106,7 @@ class MessageDispatcher { private final String subscriptionName; private final boolean enableOpenTelemetryTracing; - private final Tracer tracer; + private final PubsubTracer tracer; /** Internal representation of a reply to a Pubsub message, to be sent back to the service. */ public enum AckReply { @@ -162,7 +161,7 @@ public void onFailure(Throwable t) { t); this.ackRequestData.setResponse(AckResponse.OTHER, false); pendingNacks.add(this.ackRequestData); - this.ackRequestData.getMessageWrapper().endSubscribeProcessSpan("nack"); + tracer.endSubscribeProcessSpan(this.ackRequestData.getMessageWrapper(), "nack"); forget(); } @@ -175,11 +174,11 @@ public void onSuccess(AckReply reply) { ackLatencyDistribution.record( Ints.saturatedCast( (long) Math.ceil((clock.millisTime() - receivedTimeMillis) / 1000D))); - this.ackRequestData.getMessageWrapper().endSubscribeProcessSpan("ack"); + tracer.endSubscribeProcessSpan(this.ackRequestData.getMessageWrapper(), "ack"); break; case NACK: pendingNacks.add(this.ackRequestData); - this.ackRequestData.getMessageWrapper().endSubscribeProcessSpan("nack"); + tracer.endSubscribeProcessSpan(this.ackRequestData.getMessageWrapper(), "nack"); break; default: throw new IllegalArgumentException(String.format("AckReply: %s not supported", reply)); @@ -409,11 +408,10 @@ void processReceivedMessages(List messages) { message.getMessage(), subscriptionName, message.getAckId(), - message.getDeliveryAttempt(), - enableOpenTelemetryTracing) + message.getDeliveryAttempt()) .build(); builder.setMessageWrapper(messageWrapper); - messageWrapper.startSubscriberSpan(tracer, this.exactlyOnceDeliveryEnabled.get()); + tracer.startSubscriberSpan(messageWrapper, this.exactlyOnceDeliveryEnabled.get()); AckRequestData ackRequestData = builder.build(); AckHandler ackHandler = @@ -482,13 +480,14 @@ private void processBatch(List batch) { for (OutstandingMessage message : batch) { // This is a blocking flow controller. We have already incremented messagesWaiter, so // shutdown will block on processing of all these messages anyway. - message.messageWrapper().startSubscribeConcurrencyControlSpan(tracer); + tracer.startSubscribeConcurrencyControlSpan(message.messageWrapper()); try { flowController.reserve(1, message.messageWrapper().getPubsubMessage().getSerializedSize()); - message.messageWrapper().endSubscribeConcurrencyControlSpan(); + tracer.endSubscribeConcurrencyControlSpan(message.messageWrapper()); } catch (FlowControlException unexpectedException) { // This should be a blocking flow controller and never throw an exception. - message.messageWrapper().setSubscribeConcurrencyControlSpanException(unexpectedException); + tracer.setSubscribeConcurrencyControlSpanException( + message.messageWrapper(), unexpectedException); throw new IllegalStateException("Flow control unexpected exception", unexpectedException); } addDeliveryInfoCount(message.messageWrapper()); @@ -533,10 +532,10 @@ public void run() { // so it was probably sent to someone else. Don't work on it. // Don't nack it either, because we'd be nacking someone else's message. ackHandler.forget(); - messageWrapper.setSubscriberSpanExpirationResult(); + tracer.setSubscriberSpanExpirationResult(messageWrapper); return; } - messageWrapper.startSubscribeProcessSpan(tracer); + tracer.startSubscribeProcessSpan(messageWrapper); if (shouldSetMessageFuture()) { // This is the message future that is propagated to the user SettableApiFuture messageFuture = @@ -557,9 +556,9 @@ public void run() { if (!messageOrderingEnabled.get() || message.getOrderingKey().isEmpty()) { executor.execute(deliverMessageTask); } else { - messageWrapper.startSubscribeSchedulerSpan(tracer); + tracer.startSubscribeSchedulerSpan(messageWrapper); sequentialExecutor.submit(message.getOrderingKey(), deliverMessageTask); - messageWrapper.endSubscribeSchedulerSpan(); + tracer.endSubscribeSchedulerSpan(messageWrapper); } } @@ -687,7 +686,7 @@ public static final class Builder { private String subscriptionName; private boolean enableOpenTelemetryTracing; - private Tracer tracer; + private PubsubTracer tracer; protected Builder(MessageReceiver receiver) { this.receiver = receiver; @@ -769,7 +768,7 @@ public Builder setEnableOpenTelemetryTracing(boolean enableOpenTelemetryTracing) return this; } - public Builder setTracer(Tracer tracer) { + public Builder setTracer(PubsubTracer tracer) { this.tracer = tracer; return this; } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryPubsubTracer.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryPubsubTracer.java new file mode 100644 index 000000000..e8ca33a6f --- /dev/null +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryPubsubTracer.java @@ -0,0 +1,391 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub.v1; + +import com.google.api.core.InternalApi; +import com.google.common.base.Preconditions; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.SubscriptionName; +import com.google.pubsub.v1.TopicName; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanBuilder; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Context; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import java.util.List; + +@InternalApi("For use by the google-cloud-pubsub library only") +public class OpenTelemetryPubsubTracer implements PubsubTracer { + private final Tracer tracer; + + private static final String PUBLISH_FLOW_CONTROL_SPAN_NAME = "publisher flow control"; + private static final String PUBLISH_BATCHING_SPAN_NAME = "publisher batching"; + private static final String SUBSCRIBE_CONCURRENCY_CONTROL_SPAN_NAME = + "subscriber concurrency control"; + private static final String SUBSCRIBE_SCHEDULER_SPAN_NAME = "subscriber scheduler"; + + private static final String MESSAGE_SIZE_ATTR_KEY = "messaging.message.body.size"; + private static final String ORDERING_KEY_ATTR_KEY = "messaging.gcp_pubsub.message.ordering_key"; + private static final String MESSAGE_ACK_ID_ATTR_KEY = "messaging.gcp_pubsub.message.ack_id"; + private static final String MESSAGE_EXACTLY_ONCE_ATTR_KEY = + "messaging.gcp_pubsub.message.exactly_once_delivery"; + private static final String MESSAGE_DELIVERY_ATTEMPT_ATTR_KEY = + "messaging.gcp_pubsub.message.delivery_attempt"; + private static final String ACK_DEADLINE_ATTR_KEY = "messaging.gcp_pubsub.message.ack_deadline"; + private static final String RECEIPT_MODACK_ATTR_KEY = "messaging.gcp_pubsub.is_receipt_modack"; + private static final String PUBLISH_RPC_SPAN_SUFFIX = " publish"; + + private static final String MESSAGING_SYSTEM_VALUE = "gcp_pubsub"; + + OpenTelemetryPubsubTracer(Tracer tracer) { + this.tracer = Preconditions.checkNotNull(tracer, "OpenTelemetry tracer cannot be null"); + } + + private Span startChildSpan(String name, Span parent) { + return tracer.spanBuilder(name).setParent(Context.current().with(parent)).startSpan(); + } + + /** + * Creates and starts the parent span with the appropriate span attributes and injects the span + * context into the {@link PubsubMessage} attributes. + */ + @Override + public void startPublisherSpan(PubsubMessageWrapper message) { + AttributesBuilder attributesBuilder = + OpenTelemetryUtil.createCommonSpanAttributesBuilder( + message.getTopicName(), message.getTopicProject(), "publish", "create"); + + attributesBuilder.put(MESSAGE_SIZE_ATTR_KEY, message.getDataSize()); + if (!message.getOrderingKey().isEmpty()) { + attributesBuilder.put(ORDERING_KEY_ATTR_KEY, message.getOrderingKey()); + } + + Span publisherSpan = + tracer + .spanBuilder(message.getTopicName() + " create") + .setSpanKind(SpanKind.PRODUCER) + .setAllAttributes(attributesBuilder.build()) + .startSpan(); + + message.setPublisherSpan(publisherSpan); + if (publisherSpan.getSpanContext().isValid()) { + message.injectSpanContext(); + } + } + + public void endPublisherSpan(PubsubMessageWrapper message) { + message.endPublisherSpan(); + } + + public void setPublisherMessageIdSpanAttribute(PubsubMessageWrapper message, String messageId) { + message.setPublisherMessageIdSpanAttribute(messageId); + } + + /** Creates a span for publish-side flow control as a child of the parent publisher span. */ + @Override + public void startPublishFlowControlSpan(PubsubMessageWrapper message) { + Span publisherSpan = message.getPublisherSpan(); + if (publisherSpan != null) + message.setPublishFlowControlSpan( + startChildSpan(PUBLISH_FLOW_CONTROL_SPAN_NAME, publisherSpan)); + } + + @Override + public void endPublishFlowControlSpan(PubsubMessageWrapper message) { + message.endPublishFlowControlSpan(); + } + + @Override + public void setPublishFlowControlSpanException(PubsubMessageWrapper message, Throwable t) { + message.setPublishFlowControlSpanException(t); + } + + /** Creates a span for publish message batching as a child of the parent publisher span. */ + @Override + public void startPublishBatchingSpan(PubsubMessageWrapper message) { + Span publisherSpan = message.getPublisherSpan(); + if (publisherSpan != null) { + message.setPublishBatchingSpan(startChildSpan(PUBLISH_BATCHING_SPAN_NAME, publisherSpan)); + } + } + + @Override + public void endPublishBatchingSpan(PubsubMessageWrapper message) { + message.endPublishBatchingSpan(); + } + + /** + * Creates, starts, and returns a publish RPC span for the given message batch. Bi-directional + * links with the publisher parent span are created for sampled messages in the batch. + */ + @Override + public Span startPublishRpcSpan(String topic, List messages) { + TopicName topicName = TopicName.parse(topic); + Attributes attributes = + OpenTelemetryUtil.createCommonSpanAttributesBuilder( + topicName.getTopic(), topicName.getProject(), "publishCall", "publish") + .put(SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, messages.size()) + .build(); + SpanBuilder publishRpcSpanBuilder = + tracer + .spanBuilder(topicName.getTopic() + PUBLISH_RPC_SPAN_SUFFIX) + .setSpanKind(SpanKind.CLIENT) + .setAllAttributes(attributes); + Attributes linkAttributes = + Attributes.builder().put(SemanticAttributes.MESSAGING_OPERATION, "publish").build(); + for (PubsubMessageWrapper message : messages) { + if (message.getPublisherSpan().getSpanContext().isSampled()) + publishRpcSpanBuilder.addLink(message.getPublisherSpan().getSpanContext(), linkAttributes); + } + Span publishRpcSpan = publishRpcSpanBuilder.startSpan(); + + for (PubsubMessageWrapper message : messages) { + if (message.getPublisherSpan().getSpanContext().isSampled()) { + message.getPublisherSpan().addLink(publishRpcSpan.getSpanContext(), linkAttributes); + message.addPublishStartEvent(); + } + } + return publishRpcSpan; + } + + /** Ends the given publish RPC span if it exists. */ + @Override + public void endPublishRpcSpan(Span publishRpcSpan) { + if (publishRpcSpan != null) { + publishRpcSpan.end(); + } + } + + /** + * Sets an error status and records an exception when an exception is thrown when publishing the + * message batch. + */ + @Override + public void setPublishRpcSpanException(Span publishRpcSpan, Throwable t) { + if (publishRpcSpan != null) { + publishRpcSpan.setStatus(StatusCode.ERROR, "Exception thrown on publish RPC."); + publishRpcSpan.recordException(t); + publishRpcSpan.end(); + } + } + + @Override + public void startSubscriberSpan( + PubsubMessageWrapper message, boolean exactlyOnceDeliveryEnabled) { + AttributesBuilder attributesBuilder = + OpenTelemetryUtil.createCommonSpanAttributesBuilder( + message.getSubscriptionName(), message.getSubscriptionProject(), "onResponse", null); + + attributesBuilder + .put(SemanticAttributes.MESSAGING_MESSAGE_ID, message.getMessageId()) + .put(MESSAGE_SIZE_ATTR_KEY, message.getDataSize()) + .put(MESSAGE_ACK_ID_ATTR_KEY, message.getAckId()) + .put(MESSAGE_EXACTLY_ONCE_ATTR_KEY, exactlyOnceDeliveryEnabled); + if (!message.getOrderingKey().isEmpty()) { + attributesBuilder.put(ORDERING_KEY_ATTR_KEY, message.getOrderingKey()); + } + if (message.getDeliveryAttempt() > 0) { + attributesBuilder.put(MESSAGE_DELIVERY_ATTEMPT_ATTR_KEY, message.getDeliveryAttempt()); + } + Attributes attributes = attributesBuilder.build(); + Context publisherSpanContext = message.extractSpanContext(attributes); + message.setPublisherSpan(Span.fromContextOrNull(publisherSpanContext)); + message.setSubscriberSpan( + tracer + .spanBuilder(message.getSubscriptionName() + " subscribe") + .setSpanKind(SpanKind.CONSUMER) + .setParent(publisherSpanContext) + .setAllAttributes(attributes) + .startSpan()); + } + + @Override + public void endSubscriberSpan(PubsubMessageWrapper message) { + message.endSubscriberSpan(); + } + + @Override + public void setSubscriberSpanExpirationResult(PubsubMessageWrapper message) { + message.setSubscriberSpanExpirationResult(); + } + + @Override + public void setSubscriberSpanException( + PubsubMessageWrapper message, Throwable t, String exception) { + message.setSubscriberSpanException(t, exception); + } + + /** Creates a span for subscribe concurrency control as a child of the parent subscriber span. */ + @Override + public void startSubscribeConcurrencyControlSpan(PubsubMessageWrapper message) { + Span subscriberSpan = message.getSubscriberSpan(); + if (subscriberSpan != null) { + message.setSubscribeConcurrencyControlSpan( + startChildSpan(SUBSCRIBE_CONCURRENCY_CONTROL_SPAN_NAME, subscriberSpan)); + } + } + + @Override + public void endSubscribeConcurrencyControlSpan(PubsubMessageWrapper message) { + message.endSubscribeConcurrencyControlSpan(); + } + + @Override + public void setSubscribeConcurrencyControlSpanException( + PubsubMessageWrapper message, Throwable t) { + message.setSubscribeConcurrencyControlSpanException(t); + } + + /** + * Creates a span for subscribe ordering key scheduling as a child of the parent subscriber span. + */ + @Override + public void startSubscribeSchedulerSpan(PubsubMessageWrapper message) { + Span subscriberSpan = message.getSubscriberSpan(); + if (subscriberSpan != null) { + message.setSubscribeSchedulerSpan( + startChildSpan(SUBSCRIBE_SCHEDULER_SPAN_NAME, subscriberSpan)); + } + } + + @Override + public void endSubscribeSchedulerSpan(PubsubMessageWrapper message) { + message.endSubscribeSchedulerSpan(); + } + + /** Creates a span for subscribe message processing as a child of the parent subscriber span. */ + @Override + public void startSubscribeProcessSpan(PubsubMessageWrapper message) { + Span subscriberSpan = message.getSubscriberSpan(); + if (subscriberSpan != null) { + Span subscribeProcessSpan = + startChildSpan(message.getSubscriptionName() + " process", subscriberSpan); + subscribeProcessSpan.setAttribute( + SemanticAttributes.MESSAGING_SYSTEM, MESSAGING_SYSTEM_VALUE); + Span publisherSpan = message.getPublisherSpan(); + if (publisherSpan != null) { + subscribeProcessSpan.addLink(publisherSpan.getSpanContext()); + } + message.setSubscribeProcessSpan(subscribeProcessSpan); + } + } + + @Override + public void endSubscribeProcessSpan(PubsubMessageWrapper message, String action) { + message.endSubscribeProcessSpan(action); + } + + /** + * Creates, starts, and returns spans for ModAck, Nack, and Ack RPC requests. Bi-directional links + * to parent subscribe span for sampled messages are added. + */ + @Override + public Span startSubscribeRpcSpan( + String subscription, + String rpcOperation, + List messages, + int ackDeadline, + boolean isReceiptModack) { + String codeFunction = rpcOperation == "ack" ? "sendAckOperations" : "sendModAckOperations"; + SubscriptionName subscriptionName = SubscriptionName.parse(subscription); + AttributesBuilder attributesBuilder = + OpenTelemetryUtil.createCommonSpanAttributesBuilder( + subscriptionName.getSubscription(), + subscriptionName.getProject(), + codeFunction, + rpcOperation) + .put(SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, messages.size()); + + // Ack deadline and receipt modack are specific to the modack operation + if (rpcOperation == "modack") { + attributesBuilder + .put(ACK_DEADLINE_ATTR_KEY, ackDeadline) + .put(RECEIPT_MODACK_ATTR_KEY, isReceiptModack); + } + + SpanBuilder rpcSpanBuilder = + tracer + .spanBuilder(subscriptionName.getSubscription() + " " + rpcOperation) + .setSpanKind(SpanKind.CLIENT) + .setAllAttributes(attributesBuilder.build()); + Attributes linkAttributes = + Attributes.builder().put(SemanticAttributes.MESSAGING_OPERATION, rpcOperation).build(); + for (PubsubMessageWrapper message : messages) { + if (message.getSubscriberSpan().getSpanContext().isSampled()) { + rpcSpanBuilder.addLink(message.getSubscriberSpan().getSpanContext(), linkAttributes); + } + } + Span rpcSpan = rpcSpanBuilder.startSpan(); + + for (PubsubMessageWrapper message : messages) { + if (message.getSubscriberSpan().getSpanContext().isSampled()) { + message.getSubscriberSpan().addLink(rpcSpan.getSpanContext(), linkAttributes); + switch (rpcOperation) { + case "ack": + message.addAckStartEvent(); + break; + case "modack": + message.addModAckStartEvent(); + break; + case "nack": + message.addNackStartEvent(); + break; + } + } + } + return rpcSpan; + } + + /** Ends the given subscribe RPC span if it exists. */ + @Override + public void endSubscribeRpcSpan(Span rpcSpan) { + if (rpcSpan != null) { + rpcSpan.end(); + } + } + + /** + * Sets an error status and records an exception when an exception is thrown when handling a + * subscribe-side RPC. + */ + @Override + public void setSubscribeRpcSpanException( + Span rpcSpan, boolean isModack, int ackDeadline, Throwable t) { + if (rpcSpan != null) { + String operation = !isModack ? "ack" : (ackDeadline == 0 ? "nack" : "modack"); + rpcSpan.setStatus(StatusCode.ERROR, "Exception thrown on " + operation + " RPC."); + rpcSpan.recordException(t); + rpcSpan.end(); + } + } + + /** Adds the appropriate subscribe-side RPC end event. */ + @Override + public void addEndRpcEvent(PubsubMessageWrapper message, boolean isModack, int ackDeadline) { + if (!isModack) { + message.addAckEndEvent(); + } else if (ackDeadline == 0) { + message.addNackEndEvent(); + } else { + message.addModAckEndEvent(); + } + } +} diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryUtil.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryUtil.java index c6760f78a..c100675b5 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryUtil.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryUtil.java @@ -16,25 +16,13 @@ package com.google.cloud.pubsub.v1; -import com.google.pubsub.v1.SubscriptionName; -import com.google.pubsub.v1.TopicName; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.common.AttributesBuilder; -import io.opentelemetry.api.trace.Span; -import io.opentelemetry.api.trace.SpanBuilder; -import io.opentelemetry.api.trace.SpanKind; -import io.opentelemetry.api.trace.StatusCode; -import io.opentelemetry.api.trace.Tracer; import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; -import java.util.List; public class OpenTelemetryUtil { private static final String MESSAGING_SYSTEM_VALUE = "gcp_pubsub"; private static final String PROJECT_ATTR_KEY = "gcp.project_id"; - private static final String ACK_DEADLINE_ATTR_KEY = "messaging.gcp_pubsub.message.ack_deadline"; - private static final String RECEIPT_MODACK_ATTR_KEY = "messaging.gcp_pubsub.is_receipt_modack"; - - private static final String PUBLISH_RPC_SPAN_SUFFIX = " publish"; /** Populates attributes that are common the publisher parent span and publish RPC span. */ protected static final AttributesBuilder createCommonSpanAttributesBuilder( @@ -51,156 +39,4 @@ protected static final AttributesBuilder createCommonSpanAttributesBuilder( return attributesBuilder; } - - /** - * Creates, starts, and returns a publish RPC span for the given message batch. Bi-directional - * links with the publisher parent span are created for sampled messages in the batch. - */ - protected static final Span startPublishRpcSpan( - Tracer tracer, - String topic, - List messages, - boolean enableOpenTelemetryTracing) { - if (enableOpenTelemetryTracing && tracer != null) { - TopicName topicName = TopicName.parse(topic); - Attributes attributes = - createCommonSpanAttributesBuilder( - topicName.getTopic(), topicName.getProject(), "publishCall", "publish") - .put(SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, messages.size()) - .build(); - SpanBuilder publishRpcSpanBuilder = - tracer - .spanBuilder(topicName.getTopic() + PUBLISH_RPC_SPAN_SUFFIX) - .setSpanKind(SpanKind.CLIENT) - .setAllAttributes(attributes); - Attributes linkAttributes = - Attributes.builder().put(SemanticAttributes.MESSAGING_OPERATION, "publish").build(); - for (PubsubMessageWrapper message : messages) { - if (message.getPublisherSpan().getSpanContext().isSampled()) - publishRpcSpanBuilder.addLink(message.getPublisherSpan().getSpanContext(), linkAttributes); - } - Span publishRpcSpan = publishRpcSpanBuilder.startSpan(); - - for (PubsubMessageWrapper message : messages) { - if (message.getPublisherSpan().getSpanContext().isSampled()) { - message.getPublisherSpan().addLink(publishRpcSpan.getSpanContext(), linkAttributes); - message.addPublishStartEvent(); - } - } - return publishRpcSpan; - } - return null; - } - - /** Ends the given publish RPC span if it exists. */ - protected static final void endPublishRpcSpan( - Span publishRpcSpan, boolean enableOpenTelemetryTracing) { - if (enableOpenTelemetryTracing && publishRpcSpan != null) { - publishRpcSpan.end(); - } - } - - /** - * Sets an error status and records an exception when an exception is thrown when publishing the - * message batch. - */ - protected static final void setPublishRpcSpanException( - Span publishRpcSpan, Throwable t, boolean enableOpenTelemetryTracing) { - if (enableOpenTelemetryTracing && publishRpcSpan != null) { - publishRpcSpan.setStatus(StatusCode.ERROR, "Exception thrown on publish RPC."); - publishRpcSpan.recordException(t); - publishRpcSpan.end(); - } - } - - /** - * Creates, starts, and returns spans for ModAck, Nack, and Ack RPC requests. Bi-directional links - * to parent subscribe span for sampled messages are added. - */ - protected static final Span startSubscribeRpcSpan( - Tracer tracer, - String subscription, - String rpcOperation, - List messages, - int ackDeadline, - boolean isReceiptModack, - boolean enableOpenTelemetryTracing) { - if (enableOpenTelemetryTracing && tracer != null) { - String codeFunction = - rpcOperation == "ack" - ? "sendAckOperations" - : "sendModAckOperations"; - SubscriptionName subscriptionName = SubscriptionName.parse(subscription); - AttributesBuilder attributesBuilder = - createCommonSpanAttributesBuilder( - subscriptionName.getSubscription(), - subscriptionName.getProject(), - codeFunction, - rpcOperation) - .put(SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, messages.size()); - - // Ack deadline and receipt modack are specific to the modack operation - if (rpcOperation == "modack") { - attributesBuilder - .put(ACK_DEADLINE_ATTR_KEY, ackDeadline) - .put(RECEIPT_MODACK_ATTR_KEY, isReceiptModack); - } - - SpanBuilder rpcSpanBuilder = - tracer - .spanBuilder(subscriptionName.getSubscription() + " " + rpcOperation) - .setSpanKind(SpanKind.CLIENT) - .setAllAttributes(attributesBuilder.build()); - Attributes linkAttributes = - Attributes.builder() - .put(SemanticAttributes.MESSAGING_OPERATION, rpcOperation) - .build(); - for (PubsubMessageWrapper message : messages) { - if (message.getSubscriberSpan().getSpanContext().isSampled()) { - rpcSpanBuilder.addLink(message.getSubscriberSpan().getSpanContext(), linkAttributes); - } - } - Span rpcSpan = rpcSpanBuilder.startSpan(); - - for (PubsubMessageWrapper message : messages) { - if (message.getSubscriberSpan().getSpanContext().isSampled()) { - message.getSubscriberSpan().addLink(rpcSpan.getSpanContext(), linkAttributes); - switch (rpcOperation) { - case "ack": - message.addAckStartEvent(); - break; - case "modack": - message.addModAckStartEvent(); - break; - case "nack": - message.addNackStartEvent(); - break; - } - } - } - return rpcSpan; - } - return null; - } - - /** Ends the given subscribe RPC span if it exists. */ - protected static final void endSubscribeRpcSpan(Span rpcSpan, boolean enableOpenTelemetryTracing) { - if (enableOpenTelemetryTracing && rpcSpan != null) { - rpcSpan.end(); - } - } - - protected static final void setSubscribeRpcSpanException( - Span rpcSpan, - boolean isModack, - int ackDeadline, - Throwable t, - boolean enableOpenTelemetryTracing) { - if (enableOpenTelemetryTracing && rpcSpan != null) { - String operation = !isModack ? "ack" : (ackDeadline == 0 ? "nack" : "modack"); - rpcSpan.setStatus(StatusCode.ERROR, "Exception thrown on " + operation + " RPC."); - rpcSpan.recordException(t); - rpcSpan.end(); - } - } } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index 451bf6e89..0aed2428d 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -131,7 +131,7 @@ public class Publisher implements PublisherInterface { private final boolean enableOpenTelemetryTracing; private final OpenTelemetry openTelemetry; - private Tracer tracer = null; + private PubsubTracer tracer = null; /** The maximum number of messages in one request. Defined by the API. */ public static long getApiMaxRequestElementCount() { @@ -163,8 +163,11 @@ private Publisher(Builder builder) throws IOException { this.compressionBytesThreshold = builder.compressionBytesThreshold; this.enableOpenTelemetryTracing = builder.enableOpenTelemetryTracing; this.openTelemetry = builder.openTelemetry; - if (this.openTelemetry != null) { - this.tracer = builder.openTelemetry.getTracer(OPEN_TELEMETRY_TRACER_NAME); + if (this.openTelemetry != null && this.enableOpenTelemetryTracing) { + Tracer openTelemetryTracer = builder.openTelemetry.getTracer(OPEN_TELEMETRY_TRACER_NAME); + if (openTelemetryTracer != null) { + this.tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer); + } } messagesBatches = new HashMap<>(); @@ -274,24 +277,22 @@ public ApiFuture publish(PubsubMessage message) { + "setEnableMessageOrdering(true) in the builder."); PubsubMessageWrapper messageWrapper = - PubsubMessageWrapper.newBuilder( - messageTransform.apply(message), topicName, enableOpenTelemetryTracing) - .build(); - messageWrapper.startPublisherSpan(tracer); + PubsubMessageWrapper.newBuilder(messageTransform.apply(message), topicName).build(); + tracer.startPublisherSpan(messageWrapper); final OutstandingPublish outstandingPublish = new OutstandingPublish(messageWrapper); if (flowController != null) { - outstandingPublish.messageWrapper.startPublishFlowControlSpan(tracer); + tracer.startPublishFlowControlSpan(messageWrapper); try { flowController.acquire(outstandingPublish.messageSize); - outstandingPublish.messageWrapper.endPublishFlowControlSpan(); + tracer.endPublishFlowControlSpan(messageWrapper); } catch (FlowController.FlowControlException e) { if (!orderingKey.isEmpty()) { sequentialExecutor.stopPublish(orderingKey); } outstandingPublish.publishResult.setException(e); - outstandingPublish.messageWrapper.setPublishFlowControlSpanException(e); + tracer.setPublishFlowControlSpanException(messageWrapper, e); return outstandingPublish.publishResult; } } @@ -299,7 +300,7 @@ public ApiFuture publish(PubsubMessage message) { List batchesToSend; messagesBatchLock.lock(); try { - outstandingPublish.messageWrapper.startPublishBatchingSpan(tracer); + tracer.startPublishBatchingSpan(messageWrapper); if (!orderingKey.isEmpty() && sequentialExecutor.keyHasError(orderingKey)) { outstandingPublish.publishResult.setException( SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION); @@ -480,13 +481,11 @@ private ApiFuture publishCall(OutstandingBatch outstandingBatch List pubsubMessagesList = new ArrayList(numMessagesInBatch); List messageWrappers = outstandingBatch.getMessageWrappers(); for (PubsubMessageWrapper messageWrapper : messageWrappers) { - messageWrapper.endPublishBatchingSpan(); + tracer.endPublishBatchingSpan(messageWrapper); pubsubMessagesList.add(messageWrapper.getPubsubMessage()); } - outstandingBatch.publishRpcSpan = - OpenTelemetryUtil.startPublishRpcSpan( - tracer, topicName, messageWrappers, enableOpenTelemetryTracing); + outstandingBatch.publishRpcSpan = tracer.startPublishRpcSpan(topicName, messageWrappers); return publisherStub .publishCallable() @@ -601,19 +600,19 @@ private List getMessageWrappers() { } private void onFailure(Throwable t) { - OpenTelemetryUtil.setPublishRpcSpanException(publishRpcSpan, t, enableOpenTelemetryTracing); + tracer.setPublishRpcSpanException(publishRpcSpan, t); for (OutstandingPublish outstandingPublish : outstandingPublishes) { if (flowController != null) { flowController.release(outstandingPublish.messageSize); } outstandingPublish.publishResult.setException(t); - outstandingPublish.messageWrapper.endPublisherSpan(); + tracer.endPublisherSpan(outstandingPublish.messageWrapper); } } private void onSuccess(Iterable results) { - OpenTelemetryUtil.endPublishRpcSpan(publishRpcSpan, enableOpenTelemetryTracing); + tracer.endPublishRpcSpan(publishRpcSpan); Iterator messagesResultsIt = outstandingPublishes.iterator(); for (String messageId : results) { @@ -622,8 +621,8 @@ private void onSuccess(Iterable results) { flowController.release(nextPublish.messageSize); } nextPublish.publishResult.set(messageId); - nextPublish.messageWrapper.setPublisherMessageIdSpanAttribute(messageId); - nextPublish.messageWrapper.endPublisherSpan(); + tracer.setPublisherMessageIdSpanAttribute(nextPublish.messageWrapper, messageId); + tracer.endPublisherSpan(nextPublish.messageWrapper); } } } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubMessageWrapper.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubMessageWrapper.java index 4fa899ad3..eefaef724 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubMessageWrapper.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubMessageWrapper.java @@ -16,16 +16,14 @@ package com.google.cloud.pubsub.v1; +import com.google.api.core.InternalApi; import com.google.common.base.Preconditions; import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.SubscriptionName; import com.google.pubsub.v1.TopicName; import io.opentelemetry.api.common.Attributes; -import io.opentelemetry.api.common.AttributesBuilder; import io.opentelemetry.api.trace.Span; -import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.api.trace.StatusCode; -import io.opentelemetry.api.trace.Tracer; import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; import io.opentelemetry.context.Context; import io.opentelemetry.context.propagation.TextMapGetter; @@ -38,7 +36,6 @@ */ public class PubsubMessageWrapper { private PubsubMessage message; - private final boolean enableOpenTelemetryTracing; private final TopicName topicName; private final SubscriptionName subscriptionName; @@ -47,17 +44,9 @@ public class PubsubMessageWrapper { private final String ackId; private final int deliveryAttempt; - private String PUBLISHER_SPAN_NAME; - private static final String PUBLISH_FLOW_CONTROL_SPAN_NAME = "publisher flow control"; - private static final String PUBLISH_BATCHING_SPAN_NAME = "publisher batching"; private static final String PUBLISH_START_EVENT = "publish start"; private static final String PUBLISH_END_EVENT = "publish end"; - private String SUBSCRIBER_SPAN_NAME; - private static final String SUBSCRIBE_CONCURRENCY_CONTROL_SPAN_NAME = - "subscriber concurrency control"; - private static final String SUBSCRIBE_SCHEDULER_SPAN_NAME = "subscriber scheduler"; - private String SUBSCRIBE_PROCESS_SPAN_NAME; private static final String MODACK_START_EVENT = "modack start"; private static final String MODACK_END_EVENT = "modack end"; private static final String NACK_START_EVENT = "nack start"; @@ -67,15 +56,7 @@ public class PubsubMessageWrapper { private static final String GOOGCLIENT_PREFIX = "googclient_"; - private static final String MESSAGING_SYSTEM_VALUE = "gcp_pubsub"; - private static final String MESSAGE_SIZE_ATTR_KEY = "messaging.message.body.size"; - private static final String ORDERING_KEY_ATTR_KEY = "messaging.gcp_pubsub.message.ordering_key"; - private static final String MESSAGE_ACK_ID_ATTR_KEY = "messaging.gcp_pubsub.message.ack_id"; - private static final String MESSAGE_EXACTLY_ONCE_ATTR_KEY = - "messaging.gcp_pubsub.message.exactly_once_delivery"; private static final String MESSAGE_RESULT_ATTR_KEY = "messaging.gcp_pubsub.result"; - private static final String MESSAGE_DELIVERY_ATTEMPT_ATTR_KEY = - "messaging.gcp_pubsub.message.delivery_attempt"; private Span publisherSpan; private Span publishFlowControlSpan; @@ -86,117 +67,123 @@ public class PubsubMessageWrapper { private Span subscribeSchedulerSpan; private Span subscribeProcessSpan; + @InternalApi("For use by the google-cloud-pubsub library only") public PubsubMessageWrapper(Builder builder) { this.message = builder.message; this.topicName = builder.topicName; this.subscriptionName = builder.subscriptionName; this.ackId = builder.ackId; this.deliveryAttempt = builder.deliveryAttempt; - this.enableOpenTelemetryTracing = builder.enableOpenTelemetryTracing; - if (this.topicName != null) { - this.PUBLISHER_SPAN_NAME = builder.topicName.getTopic() + " create"; - } - if (this.subscriptionName != null) { - this.SUBSCRIBER_SPAN_NAME = builder.subscriptionName.getSubscription() + " subscribe"; - this.SUBSCRIBE_PROCESS_SPAN_NAME = builder.subscriptionName.getSubscription() + " process"; - } } - public static Builder newBuilder( - PubsubMessage message, String topicName, boolean enableOpenTelemetryTracing) { - return new Builder(message, topicName, enableOpenTelemetryTracing); + public static Builder newBuilder(PubsubMessage message, String topicName) { + return new Builder(message, topicName); } public static Builder newBuilder( - PubsubMessage message, - String subscriptionName, - String ackId, - int deliveryAttempt, - boolean enableOpenTelemetryTracing) { - return new Builder( - message, subscriptionName, ackId, deliveryAttempt, enableOpenTelemetryTracing); + PubsubMessage message, String subscriptionName, String ackId, int deliveryAttempt) { + return new Builder(message, subscriptionName, ackId, deliveryAttempt); } /** Returns the PubsubMessage associated with this wrapper. */ - public PubsubMessage getPubsubMessage() { + protected PubsubMessage getPubsubMessage() { return message; } - /** Returns the parent publisher span for this message wrapper. */ - public Span getPublisherSpan() { - return publisherSpan; + protected void setPubsubMessage(PubsubMessage message) { + this.message = message; } - /** Returns the parent subscriber span for this message wrapper. */ - public Span getSubscriberSpan() { - return subscriberSpan; + /** Returns the TopicName for this wrapper as a string. */ + protected String getTopicName() { + if (topicName != null) { + return topicName.getTopic(); + } + return ""; + } + + protected String getTopicProject() { + if (topicName != null) { + return topicName.getProject(); + } + return ""; + } + + /** Returns the SubscriptionName for this wrapper as a string. */ + protected String getSubscriptionName() { + if (subscriptionName != null) { + return subscriptionName.getSubscription(); + } + return ""; + } + + protected String getSubscriptionProject() { + if (subscriptionName != null) { + return subscriptionName.getProject(); + } + return ""; + } + + protected String getMessageId() { + return message.getMessageId(); + } + + protected String getAckId() { + return ackId; } - /** Returns the delivery attempt for the received PubsubMessage. */ - public int getDeliveryAttempt() { + protected int getDataSize() { + return message.getData().size(); + } + + protected String getOrderingKey() { + return message.getOrderingKey(); + } + + protected int getDeliveryAttempt() { return deliveryAttempt; } - /** Sets the PubsubMessage for this wrapper. */ - public void setPubsubMessage(PubsubMessage message) { - this.message = message; + protected Span getPublisherSpan() { + return publisherSpan; } - /** - * Creates and starts the parent span with the appropriate span attributes and injects the span - * context into the {@link PubsubMessage} attributes. - */ - public void startPublisherSpan(Tracer tracer) { - if (enableOpenTelemetryTracing && tracer != null) { - AttributesBuilder attributesBuilder = - OpenTelemetryUtil.createCommonSpanAttributesBuilder( - topicName.getTopic(), topicName.getProject(), "publish", "create"); - - attributesBuilder.put(MESSAGE_SIZE_ATTR_KEY, message.getData().size()); - if (!message.getOrderingKey().isEmpty()) { - attributesBuilder.put(ORDERING_KEY_ATTR_KEY, message.getOrderingKey()); - } + protected void setPublisherSpan(Span span) { + this.publisherSpan = span; + } - publisherSpan = - tracer - .spanBuilder(PUBLISHER_SPAN_NAME) - .setSpanKind(SpanKind.PRODUCER) - .setAllAttributes(attributesBuilder.build()) - .startSpan(); + protected void setPublishFlowControlSpan(Span span) { + this.publishFlowControlSpan = span; + } - if (publisherSpan.getSpanContext().isValid()) { - injectSpanContext(); - } - } + protected void setPublishBatchingSpan(Span span) { + this.publishBatchingSpan = span; } - /** Creates a span for publish-side flow control as a child of the parent publisher span. */ - public void startPublishFlowControlSpan(Tracer tracer) { - if (enableOpenTelemetryTracing && tracer != null) { - publishFlowControlSpan = - startChildSpan(tracer, PUBLISH_FLOW_CONTROL_SPAN_NAME, publisherSpan); - } + protected Span getSubscriberSpan() { + return subscriberSpan; } - /** Creates a span for publish message batching as a child of the parent publisher span. */ - public void startPublishBatchingSpan(Tracer tracer) { - if (enableOpenTelemetryTracing && tracer != null) { - publishBatchingSpan = startChildSpan(tracer, PUBLISH_BATCHING_SPAN_NAME, publisherSpan); - } + protected void setSubscriberSpan(Span span) { + this.subscriberSpan = span; } - /** - * Creates publish start and end events that are tied to the publish RPC span start and end times. - */ - public void addPublishStartEvent() { - if (enableOpenTelemetryTracing && publisherSpan != null) { - publisherSpan.addEvent(PUBLISH_START_EVENT); - } + protected void setSubscribeConcurrencyControlSpan(Span span) { + this.subscribeConcurrencyControlSpan = span; } - public void addPublishEndEvent() { - if (enableOpenTelemetryTracing && publisherSpan != null) { - publisherSpan.addEvent(PUBLISH_END_EVENT); + protected void setSubscribeSchedulerSpan(Span span) { + this.subscribeSchedulerSpan = span; + } + + protected void setSubscribeProcessSpan(Span span) { + this.subscribeProcessSpan = span; + } + + /** Creates a publish start event that is tied to the publish RPC span time. */ + protected void addPublishStartEvent() { + if (publisherSpan != null) { + publisherSpan.addEvent(PUBLISH_START_EVENT); } } @@ -204,30 +191,30 @@ public void addPublishEndEvent() { * Sets the message ID attribute in the publisher parent span. This is called after the publish * RPC returns with a message ID. */ - public void setPublisherMessageIdSpanAttribute(String messageId) { - if (enableOpenTelemetryTracing && publisherSpan != null) { + protected void setPublisherMessageIdSpanAttribute(String messageId) { + if (publisherSpan != null) { publisherSpan.setAttribute(SemanticAttributes.MESSAGING_MESSAGE_ID, messageId); } } /** Ends the publisher parent span if it exists. */ - public void endPublisherSpan() { - if (enableOpenTelemetryTracing && publisherSpan != null) { - addPublishEndEvent(); + protected void endPublisherSpan() { + if (publisherSpan != null) { + publisherSpan.addEvent(PUBLISH_END_EVENT); publisherSpan.end(); } } /** Ends the publish flow control span if it exists. */ - public void endPublishFlowControlSpan() { - if (enableOpenTelemetryTracing && publishFlowControlSpan != null) { + protected void endPublishFlowControlSpan() { + if (publishFlowControlSpan != null) { publishFlowControlSpan.end(); } } /** Ends the publish batching span if it exists. */ - public void endPublishBatchingSpan() { - if (enableOpenTelemetryTracing && publishBatchingSpan != null) { + protected void endPublishBatchingSpan() { + if (publishBatchingSpan != null) { publishBatchingSpan.end(); } } @@ -235,8 +222,8 @@ public void endPublishBatchingSpan() { /** * Sets an error status and records an exception when an exception is thrown during flow control. */ - public void setPublishFlowControlSpanException(Throwable t) { - if (enableOpenTelemetryTracing && publishFlowControlSpan != null) { + protected void setPublishFlowControlSpanException(Throwable t) { + if (publishFlowControlSpan != null) { publishFlowControlSpan.setStatus( StatusCode.ERROR, "Exception thrown during publish flow control."); publishFlowControlSpan.recordException(t); @@ -244,143 +231,63 @@ public void setPublishFlowControlSpanException(Throwable t) { } } - /** - * Sets an error status and records an exception when an exception is thrown during message - * batching. - */ - public void setPublishBatchingSpanException(Throwable t) { - if (enableOpenTelemetryTracing && publishBatchingSpan != null) { - publishBatchingSpan.setStatus(StatusCode.ERROR, "Exception thrown during publish batching."); - publishBatchingSpan.recordException(t); - endAllPublishSpans(); - } - } - - /** - * Creates the subscriber parent span using span context propagated in the message attributes and - * sets the appropriate span attributes. - */ - public void startSubscriberSpan(Tracer tracer, boolean exactlyOnceDeliveryEnabled) { - if (enableOpenTelemetryTracing && tracer != null) { - AttributesBuilder attributesBuilder = - OpenTelemetryUtil.createCommonSpanAttributesBuilder( - subscriptionName.getSubscription(), - subscriptionName.getProject(), - "onResponse", - null); - - attributesBuilder - .put(SemanticAttributes.MESSAGING_MESSAGE_ID, message.getMessageId()) - .put(MESSAGE_SIZE_ATTR_KEY, message.getData().size()) - .put(MESSAGE_ACK_ID_ATTR_KEY, ackId) - .put(MESSAGE_EXACTLY_ONCE_ATTR_KEY, exactlyOnceDeliveryEnabled); - if (!message.getOrderingKey().isEmpty()) { - attributesBuilder.put(ORDERING_KEY_ATTR_KEY, message.getOrderingKey()); - } - if (deliveryAttempt > 0) { - attributesBuilder.put(MESSAGE_DELIVERY_ATTEMPT_ATTR_KEY, deliveryAttempt); - } - subscriberSpan = extractSpanContext(tracer, attributesBuilder.build()); - } - } - - /** Creates a span for subscribe concurrency control as a child of the parent subscriber span. */ - public void startSubscribeConcurrencyControlSpan(Tracer tracer) { - if (enableOpenTelemetryTracing && tracer != null) { - subscribeConcurrencyControlSpan = - startChildSpan(tracer, SUBSCRIBE_CONCURRENCY_CONTROL_SPAN_NAME, subscriberSpan); - } - } - - /** - * Creates a span for subscribe ordering key scheduling as a child of the parent subscriber span. - */ - public void startSubscribeSchedulerSpan(Tracer tracer) { - if (enableOpenTelemetryTracing && tracer != null) { - subscribeSchedulerSpan = - startChildSpan(tracer, SUBSCRIBE_SCHEDULER_SPAN_NAME, subscriberSpan); - } - } - - /** Creates a span for subscribe message processing as a child of the parent subscriber span. */ - public void startSubscribeProcessSpan(Tracer tracer) { - if (enableOpenTelemetryTracing && tracer != null) { - subscribeProcessSpan = startChildSpan(tracer, SUBSCRIBE_PROCESS_SPAN_NAME, subscriberSpan); - subscribeProcessSpan.setAttribute( - SemanticAttributes.MESSAGING_SYSTEM, MESSAGING_SYSTEM_VALUE); - if (publisherSpan != null) { - subscribeProcessSpan.addLink(publisherSpan.getSpanContext()); - } - } - } - /** * Creates start and end events for ModAcks, Nacks, and Acks that are tied to the corresponding * RPC span start and end times. */ - public void addModAckStartEvent() { - if (enableOpenTelemetryTracing && subscriberSpan != null) { + protected void addModAckStartEvent() { + if (subscriberSpan != null) { subscriberSpan.addEvent(MODACK_START_EVENT); } } - public void addModAckEndEvent() { - if (enableOpenTelemetryTracing && subscriberSpan != null) { + protected void addModAckEndEvent() { + if (subscriberSpan != null) { subscriberSpan.addEvent(MODACK_END_EVENT); } } - public void addNackStartEvent() { - if (enableOpenTelemetryTracing && subscriberSpan != null) { + protected void addNackStartEvent() { + if (subscriberSpan != null) { subscriberSpan.addEvent(NACK_START_EVENT); } } - public void addNackEndEvent() { - if (enableOpenTelemetryTracing && subscriberSpan != null) { + protected void addNackEndEvent() { + if (subscriberSpan != null) { subscriberSpan.addEvent(NACK_END_EVENT); } } - public void addAckStartEvent() { - if (enableOpenTelemetryTracing && subscriberSpan != null) { + protected void addAckStartEvent() { + if (subscriberSpan != null) { subscriberSpan.addEvent(ACK_START_EVENT); } } - public void addAckEndEvent() { - if (enableOpenTelemetryTracing && subscriberSpan != null) { + protected void addAckEndEvent() { + if (subscriberSpan != null) { subscriberSpan.addEvent(ACK_END_EVENT); } } - public void addEndRpcEvent(boolean isModack, int ackDeadline) { - if (!isModack) { - addAckEndEvent(); - } else if (ackDeadline == 0) { - addNackEndEvent(); - } else { - addModAckEndEvent(); - } - } - /** Ends the subscriber parent span if exists. */ - public void endSubscriberSpan() { - if (enableOpenTelemetryTracing && subscriberSpan != null) { + protected void endSubscriberSpan() { + if (subscriberSpan != null) { subscriberSpan.end(); } } /** Ends the subscribe concurreny control span if exists. */ - public void endSubscribeConcurrencyControlSpan() { - if (enableOpenTelemetryTracing && subscribeConcurrencyControlSpan != null) { + protected void endSubscribeConcurrencyControlSpan() { + if (subscribeConcurrencyControlSpan != null) { subscribeConcurrencyControlSpan.end(); } } /** Ends the subscribe scheduler span if exists. */ - public void endSubscribeSchedulerSpan() { - if (enableOpenTelemetryTracing && subscribeSchedulerSpan != null) { + protected void endSubscribeSchedulerSpan() { + if (subscribeSchedulerSpan != null) { subscribeSchedulerSpan.end(); } } @@ -389,8 +296,8 @@ public void endSubscribeSchedulerSpan() { * Ends the subscribe process span if it exists, creates an event with the appropriate result, and * sets the result on the parent subscriber span. */ - public void endSubscribeProcessSpan(String action) { - if (enableOpenTelemetryTracing && subscribeProcessSpan != null) { + protected void endSubscribeProcessSpan(String action) { + if (subscribeProcessSpan != null) { subscribeProcessSpan.addEvent(action + " called"); subscribeProcessSpan.end(); subscriberSpan.setAttribute(MESSAGE_RESULT_ATTR_KEY, action); @@ -398,8 +305,8 @@ public void endSubscribeProcessSpan(String action) { } /** Sets an exception on the subscriber span during Ack/ModAck/Nack failures */ - public void setSubscriberSpanException(Throwable t, String exception) { - if (enableOpenTelemetryTracing && subscriberSpan != null) { + protected void setSubscriberSpanException(Throwable t, String exception) { + if (subscriberSpan != null) { subscriberSpan.setStatus(StatusCode.ERROR, exception); subscriberSpan.recordException(t); endAllSubscribeSpans(); @@ -407,8 +314,8 @@ public void setSubscriberSpanException(Throwable t, String exception) { } /** Sets result of the parent subscriber span to expired and ends its. */ - public void setSubscriberSpanExpirationResult() { - if (enableOpenTelemetryTracing && subscriberSpan != null) { + protected void setSubscriberSpanExpirationResult() { + if (subscriberSpan != null) { subscriberSpan.setAttribute(MESSAGE_RESULT_ATTR_KEY, "expired"); endSubscriberSpan(); } @@ -418,8 +325,8 @@ public void setSubscriberSpanExpirationResult() { * Sets an error status and records an exception when an exception is thrown subscriber * concurrency control. */ - public void setSubscribeConcurrencyControlSpanException(Throwable t) { - if (enableOpenTelemetryTracing && subscribeConcurrencyControlSpan != null) { + protected void setSubscribeConcurrencyControlSpanException(Throwable t) { + if (subscribeConcurrencyControlSpan != null) { subscribeConcurrencyControlSpan.setStatus( StatusCode.ERROR, "Exception thrown during subscribe concurrency control."); subscribeConcurrencyControlSpan.recordException(t); @@ -427,11 +334,6 @@ public void setSubscribeConcurrencyControlSpanException(Throwable t) { } } - /** Creates a child span of the given parent span. */ - private Span startChildSpan(Tracer tracer, String name, Span parent) { - return tracer.spanBuilder(name).setParent(Context.current().with(parent)).startSpan(); - } - /** Ends all publisher-side spans associated with this message wrapper. */ private void endAllPublishSpans() { endPublishFlowControlSpan(); @@ -450,7 +352,7 @@ private void endAllSubscribeSpans() { * Injects the span context into the attributes of a Pub/Sub message for propagation to the * subscriber client. */ - private void injectSpanContext() { + protected void injectSpanContext() { TextMapSetter injectMessageAttributes = new TextMapSetter() { @Override @@ -470,7 +372,7 @@ public void set(PubsubMessageWrapper carrier, String key, String value) { * Extracts the span context from the attributes of a Pub/Sub message and creates the parent * subscriber span using that context. */ - private Span extractSpanContext(Tracer tracer, Attributes attributes) { + protected Context extractSpanContext(Attributes attributes) { TextMapGetter extractMessageAttributes = new TextMapGetter() { @Override @@ -485,13 +387,7 @@ public Iterable keys(PubsubMessageWrapper carrier) { Context context = W3CTraceContextPropagator.getInstance() .extract(Context.current(), this, extractMessageAttributes); - publisherSpan = Span.fromContextOrNull(context); - return tracer - .spanBuilder(SUBSCRIBER_SPAN_NAME) - .setSpanKind(SpanKind.CONSUMER) - .setParent(context) - .setAllAttributes(attributes) - .startSpan(); + return context; } /** Builder of {@link PubsubMessageWrapper PubsubMessageWrapper}. */ @@ -501,49 +397,37 @@ protected static final class Builder { private SubscriptionName subscriptionName = null; private String ackId = null; private int deliveryAttempt = 0; - private boolean enableOpenTelemetryTracing = false; - public Builder(PubsubMessage message, String topicName, boolean enableOpenTelemetryTracing) { + public Builder(PubsubMessage message, String topicName) { this.message = message; if (topicName != null) { this.topicName = TopicName.parse(topicName); } - this.enableOpenTelemetryTracing = enableOpenTelemetryTracing; } public Builder( - PubsubMessage message, - String subscriptionName, - String ackId, - int deliveryAttempt, - boolean enableOpenTelemetryTracing) { + PubsubMessage message, String subscriptionName, String ackId, int deliveryAttempt) { this.message = message; if (subscriptionName != null) { this.subscriptionName = SubscriptionName.parse(subscriptionName); } this.ackId = ackId; this.deliveryAttempt = deliveryAttempt; - this.enableOpenTelemetryTracing = enableOpenTelemetryTracing; } public Builder( PubsubMessage message, SubscriptionName subscriptionName, String ackId, - int deliveryAttempt, - boolean enableOpenTelemetryTracing) { + int deliveryAttempt) { this.message = message; this.subscriptionName = subscriptionName; this.ackId = ackId; this.deliveryAttempt = deliveryAttempt; - this.enableOpenTelemetryTracing = enableOpenTelemetryTracing; } public PubsubMessageWrapper build() { - Preconditions.checkArgument( - this.enableOpenTelemetryTracing == false - || this.topicName != null - || this.subscriptionName != null); + Preconditions.checkArgument(this.topicName != null || this.subscriptionName != null); return new PubsubMessageWrapper(this); } } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubTracer.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubTracer.java new file mode 100644 index 000000000..98003235c --- /dev/null +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubTracer.java @@ -0,0 +1,137 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub.v1; + +import io.opentelemetry.api.trace.Span; +import java.util.List; + +public interface PubsubTracer { + default void startPublisherSpan(PubsubMessageWrapper message) { + // noop + } + + default void endPublisherSpan(PubsubMessageWrapper message) { + // noop + } + + default void setPublisherMessageIdSpanAttribute(PubsubMessageWrapper message, String messageId) { + // noop + } + + default void startPublishFlowControlSpan(PubsubMessageWrapper message) { + // noop + } + + default void endPublishFlowControlSpan(PubsubMessageWrapper message) { + // noop + } + + default void setPublishFlowControlSpanException(PubsubMessageWrapper message, Throwable t) { + // noop + } + + default void startPublishBatchingSpan(PubsubMessageWrapper message) { + // noop + } + + default void endPublishBatchingSpan(PubsubMessageWrapper message) { + // noop + } + + default Span startPublishRpcSpan(String topic, List messages) { + // noop + return null; + } + + default void endPublishRpcSpan(Span publishRpcSpan) { + // noop + } + + default void setPublishRpcSpanException(Span publishRpcSpan, Throwable t) { + // noop + } + + default void startSubscriberSpan( + PubsubMessageWrapper message, boolean exactlyOnceDeliveryEnabled) { + // noop + } + + default void endSubscriberSpan(PubsubMessageWrapper message) { + // noop + } + + default void setSubscriberSpanExpirationResult(PubsubMessageWrapper message) { + // noop + } + + default void setSubscriberSpanException( + PubsubMessageWrapper message, Throwable t, String exception) { + // noop + } + + default void startSubscribeConcurrencyControlSpan(PubsubMessageWrapper message) { + // noop + } + + default void endSubscribeConcurrencyControlSpan(PubsubMessageWrapper message) { + // noop + } + + default void setSubscribeConcurrencyControlSpanException( + PubsubMessageWrapper message, Throwable t) { + // noop + } + + default void startSubscribeSchedulerSpan(PubsubMessageWrapper message) { + // noop + } + + default void endSubscribeSchedulerSpan(PubsubMessageWrapper message) { + // noop + } + + default void startSubscribeProcessSpan(PubsubMessageWrapper message) { + // noop + } + + default void endSubscribeProcessSpan(PubsubMessageWrapper message, String action) { + // noop + } + + default Span startSubscribeRpcSpan( + String subscription, + String rpcOperation, + List messages, + int ackDeadline, + boolean isReceiptModack) { + // noop + return null; + } + + default void endSubscribeRpcSpan(Span rpcSpan) { + // noop + } + + default void setSubscribeRpcSpanException( + Span rpcSpan, boolean isModack, int ackDeadline, Throwable t) { + // noop + } + + default void addEndRpcEvent(PubsubMessageWrapper message, boolean isModack, int ackDeadline) { + // noop + } +} diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java index 26f2e0eec..015f78c33 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java @@ -51,7 +51,6 @@ import io.grpc.Status; import io.grpc.protobuf.StatusProto; import io.opentelemetry.api.trace.Span; -import io.opentelemetry.api.trace.Tracer; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -121,7 +120,7 @@ final class StreamingSubscriberConnection extends AbstractApiService implements private final String clientId = UUID.randomUUID().toString(); private final boolean enableOpenTelemetryTracing; - private final Tracer tracer; + private final PubsubTracer tracer; private StreamingSubscriberConnection(Builder builder) { subscription = builder.subscription; @@ -453,9 +452,7 @@ private void sendAckOperations( } } // Creates an Ack span to be passed to the callback - Span rpcSpan = - OpenTelemetryUtil.startSubscribeRpcSpan( - tracer, subscription, "ack", messagesInRequest, 0, false, enableOpenTelemetryTracing); + Span rpcSpan = tracer.startSubscribeRpcSpan(subscription, "ack", messagesInRequest, 0, false); ApiFutureCallback callback = getCallback(ackRequestDataInRequestList, 0, false, currentBackoffMillis, rpcSpan); ApiFuture ackFuture = @@ -493,14 +490,12 @@ private void sendModackOperations( String rpcOperation = deadlineExtensionSeconds == 0 ? "nack" : "modack"; // Creates either a ModAck span or a Nack span depending on the given ack deadline Span rpcSpan = - OpenTelemetryUtil.startSubscribeRpcSpan( - tracer, + tracer.startSubscribeRpcSpan( subscription, rpcOperation, messagesInRequest, deadlineExtensionSeconds, - modackRequestData.getIsReceiptModack(), - enableOpenTelemetryTracing); + modackRequestData.getIsReceiptModack()); ApiFutureCallback callback = getCallback( modackRequestData.getAckRequestData(), @@ -561,7 +556,7 @@ private ApiFutureCallback getCallback( public void onSuccess(Empty empty) { ackOperationsWaiter.incrementPendingCount(-1); - OpenTelemetryUtil.endSubscribeRpcSpan(rpcSpan, enableOpenTelemetryTracing); + tracer.endSubscribeRpcSpan(rpcSpan); for (AckRequestData ackRequestData : ackRequestDataList) { // This will check if a response is needed, and if it has already been set @@ -569,9 +564,10 @@ public void onSuccess(Empty empty) { messageDispatcher.notifyAckSuccess(ackRequestData); // Remove from our pending operations pendingRequests.remove(ackRequestData); - ackRequestData.getMessageWrapper().addEndRpcEvent(isModack, deadlineExtensionSeconds); + tracer.addEndRpcEvent( + ackRequestData.getMessageWrapper(), isModack, deadlineExtensionSeconds); if (!isModack || deadlineExtensionSeconds == 0) { - ackRequestData.getMessageWrapper().endSubscriberSpan(); + tracer.endSubscriberSpan(ackRequestData.getMessageWrapper()); } } } @@ -584,15 +580,15 @@ public void onFailure(Throwable t) { Level level = isAlive() ? Level.WARNING : Level.FINER; logger.log(level, "failed to send operations", t); - OpenTelemetryUtil.setSubscribeRpcSpanException( - rpcSpan, isModack, deadlineExtensionSeconds, t, enableOpenTelemetryTracing); + tracer.setSubscribeRpcSpanException(rpcSpan, isModack, deadlineExtensionSeconds, t); if (!getExactlyOnceDeliveryEnabled()) { if (enableOpenTelemetryTracing) { for (AckRequestData ackRequestData : ackRequestDataList) { - ackRequestData.getMessageWrapper().addEndRpcEvent(isModack, deadlineExtensionSeconds); + tracer.addEndRpcEvent( + ackRequestData.getMessageWrapper(), isModack, deadlineExtensionSeconds); if (!isModack || deadlineExtensionSeconds == 0) { - ackRequestData.getMessageWrapper().endSubscriberSpan(); + tracer.endSubscriberSpan(ackRequestData.getMessageWrapper()); } } } @@ -619,19 +615,18 @@ public void onFailure(Throwable t) { errorMessage); ackRequestData.setResponse(AckResponse.INVALID, setResponseOnSuccess); messageDispatcher.notifyAckFailed(ackRequestData); - ackRequestData - .getMessageWrapper() - .addEndRpcEvent(isModack, deadlineExtensionSeconds); - ackRequestData - .getMessageWrapper() - .setSubscriberSpanException(t, "Invalid ack ID"); + tracer.addEndRpcEvent( + ackRequestData.getMessageWrapper(), isModack, deadlineExtensionSeconds); + tracer.setSubscriberSpanException( + ackRequestData.getMessageWrapper(), t, "Invalid ack ID"); } else { logger.log(Level.INFO, "Unknown error message, will not resend", errorMessage); ackRequestData.setResponse(AckResponse.OTHER, setResponseOnSuccess); messageDispatcher.notifyAckFailed(ackRequestData); - ackRequestData - .getMessageWrapper() - .addEndRpcEvent(isModack, deadlineExtensionSeconds); + tracer.addEndRpcEvent( + ackRequestData.getMessageWrapper(), isModack, deadlineExtensionSeconds); + tracer.setSubscriberSpanException( + ackRequestData.getMessageWrapper(), t, "Unknown error message"); ackRequestData .getMessageWrapper() .setSubscriberSpanException(t, "Unknown error message"); @@ -639,10 +634,9 @@ public void onFailure(Throwable t) { } else { ackRequestData.setResponse(AckResponse.SUCCESSFUL, setResponseOnSuccess); messageDispatcher.notifyAckSuccess(ackRequestData); - ackRequestData.getMessageWrapper().endSubscriberSpan(); - ackRequestData - .getMessageWrapper() - .addEndRpcEvent(isModack, deadlineExtensionSeconds); + tracer.endSubscriberSpan(ackRequestData.getMessageWrapper()); + tracer.addEndRpcEvent( + ackRequestData.getMessageWrapper(), isModack, deadlineExtensionSeconds); } // Remove from our pending pendingRequests.remove(ackRequestData); @@ -704,7 +698,7 @@ public static final class Builder { private ApiClock clock; private boolean enableOpenTelemetryTracing; - private Tracer tracer; + private PubsubTracer tracer; protected Builder(MessageReceiver receiver) { this.receiver = receiver; @@ -801,7 +795,7 @@ public Builder setEnableOpenTelemetryTracing(boolean enableOpenTelemetryTracing) return this; } - public Builder setTracer(Tracer tracer) { + public Builder setTracer(PubsubTracer tracer) { this.tracer = tracer; return this; } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java index e581875a7..aceea0c86 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java @@ -151,7 +151,7 @@ public class Subscriber extends AbstractApiService implements SubscriberInterfac private final boolean enableOpenTelemetryTracing; private final OpenTelemetry openTelemetry; - private Tracer tracer = null; + private PubsubTracer tracer = null; private Subscriber(Builder builder) { receiver = builder.receiver; @@ -209,8 +209,11 @@ private Subscriber(Builder builder) { this.enableOpenTelemetryTracing = builder.enableOpenTelemetryTracing; this.openTelemetry = builder.openTelemetry; - if (this.openTelemetry != null) { - this.tracer = builder.openTelemetry.getTracer(OPEN_TELEMETRY_TRACER_NAME); + if (this.openTelemetry != null && this.enableOpenTelemetryTracing) { + Tracer openTelemetryTracer = builder.openTelemetry.getTracer(OPEN_TELEMETRY_TRACER_NAME); + if (openTelemetryTracer != null) { + this.tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer); + } } streamingSubscriberConnections = new ArrayList(numPullers); diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java index 9106f69c4..76f3aba09 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java @@ -104,25 +104,23 @@ public void testPublishSpansSuccess() { openTelemetryTesting.clearSpans(); PubsubMessageWrapper messageWrapper = - PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME.toString(), true) - .build(); + PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME.toString()).build(); List messageWrappers = Arrays.asList(messageWrapper); long messageSize = messageWrapper.getPubsubMessage().getData().size(); - Tracer tracer = openTelemetryTesting.getOpenTelemetry().getTracer("test"); + Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test"); + PubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer); // Call all span start/end methods in the expected order - messageWrapper.startPublisherSpan(tracer); - messageWrapper.startPublishFlowControlSpan(tracer); - messageWrapper.endPublishFlowControlSpan(); - messageWrapper.startPublishBatchingSpan(tracer); - messageWrapper.endPublishBatchingSpan(); - Span publishRpcSpan = - OpenTelemetryUtil.startPublishRpcSpan( - tracer, FULL_TOPIC_NAME.toString(), messageWrappers, true); - OpenTelemetryUtil.endPublishRpcSpan(publishRpcSpan, true); - messageWrapper.setPublisherMessageIdSpanAttribute(MESSAGE_ID); - messageWrapper.endPublisherSpan(); + tracer.startPublisherSpan(messageWrapper); + tracer.startPublishFlowControlSpan(messageWrapper); + tracer.endPublishFlowControlSpan(messageWrapper); + tracer.startPublishBatchingSpan(messageWrapper); + tracer.endPublishBatchingSpan(messageWrapper); + Span publishRpcSpan = tracer.startPublishRpcSpan(FULL_TOPIC_NAME.toString(), messageWrappers); + tracer.endPublishRpcSpan(publishRpcSpan); + tracer.setPublisherMessageIdSpanAttribute(messageWrapper, MESSAGE_ID); + tracer.endPublisherSpan(messageWrapper); List allSpans = openTelemetryTesting.getSpans(); assertEquals(4, allSpans.size()); @@ -220,16 +218,16 @@ public void testPublishFlowControlSpanFailure() { openTelemetryTesting.clearSpans(); PubsubMessageWrapper messageWrapper = - PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME.toString(), true) - .build(); + PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME.toString()).build(); - Tracer tracer = openTelemetryTesting.getOpenTelemetry().getTracer("test"); + Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test"); + PubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer); - messageWrapper.startPublisherSpan(tracer); - messageWrapper.startPublishFlowControlSpan(tracer); + tracer.startPublisherSpan(messageWrapper); + tracer.startPublishFlowControlSpan(messageWrapper); Exception e = new Exception("test-exception"); - messageWrapper.setPublishFlowControlSpanException(e); + tracer.setPublishFlowControlSpanException(messageWrapper, e); List allSpans = openTelemetryTesting.getSpans(); assertEquals(2, allSpans.size()); @@ -255,64 +253,23 @@ public void testPublishFlowControlSpanFailure() { .hasEnded(); } - @Test - public void testPublishBatchingSpanFailure() { - openTelemetryTesting.clearSpans(); - - PubsubMessageWrapper messageWrapper = - PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME.toString(), true) - .build(); - - Tracer tracer = openTelemetryTesting.getOpenTelemetry().getTracer("test"); - - messageWrapper.startPublisherSpan(tracer); - messageWrapper.startPublishBatchingSpan(tracer); - - Exception e = new Exception("test-exception"); - messageWrapper.setPublishBatchingSpanException(e); - - List allSpans = openTelemetryTesting.getSpans(); - assertEquals(2, allSpans.size()); - SpanData batchingSpanData = allSpans.get(0); - SpanData publisherSpanData = allSpans.get(1); - - SpanDataAssert batchingSpanDataAssert = OpenTelemetryAssertions.assertThat(batchingSpanData); - StatusData expectedStatus = - StatusData.create(StatusCode.ERROR, "Exception thrown during publish batching."); - batchingSpanDataAssert - .hasName(PUBLISH_BATCHING_SPAN_NAME) - .hasParent(publisherSpanData) - .hasStatus(expectedStatus) - .hasException(e) - .hasEnded(); - - SpanDataAssert publisherSpanDataAssert = OpenTelemetryAssertions.assertThat(publisherSpanData); - publisherSpanDataAssert - .hasName(PUBLISHER_SPAN_NAME) - .hasKind(SpanKind.PRODUCER) - .hasNoParent() - .hasEnded(); - } - @Test public void testPublishRpcSpanFailure() { openTelemetryTesting.clearSpans(); PubsubMessageWrapper messageWrapper = - PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME.toString(), true) - .build(); + PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME.toString()).build(); List messageWrappers = Arrays.asList(messageWrapper); - Tracer tracer = openTelemetryTesting.getOpenTelemetry().getTracer("test"); + Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test"); + PubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer); - messageWrapper.startPublisherSpan(tracer); - Span publishRpcSpan = - OpenTelemetryUtil.startPublishRpcSpan( - tracer, FULL_TOPIC_NAME.toString(), messageWrappers, true); + tracer.startPublisherSpan(messageWrapper); + Span publishRpcSpan = tracer.startPublishRpcSpan(FULL_TOPIC_NAME.toString(), messageWrappers); Exception e = new Exception("test-exception"); - OpenTelemetryUtil.setPublishRpcSpanException(publishRpcSpan, e, true); - messageWrapper.endPublisherSpan(); + tracer.setPublishRpcSpanException(publishRpcSpan, e); + tracer.endPublisherSpan(messageWrapper); List allSpans = openTelemetryTesting.getSpans(); assertEquals(2, allSpans.size()); @@ -341,67 +298,53 @@ public void testPublishRpcSpanFailure() { public void testSubscribeSpansSuccess() { openTelemetryTesting.clearSpans(); - Tracer tracer = openTelemetryTesting.getOpenTelemetry().getTracer("test"); + Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test"); + PubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer); PubsubMessageWrapper publishMessageWrapper = - PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME.toString(), true) - .build(); + PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME.toString()).build(); // Initialize the Publisher span to inject the context in the message - publishMessageWrapper.startPublisherSpan(tracer); - publishMessageWrapper.endPublisherSpan(); + tracer.startPublisherSpan(publishMessageWrapper); + tracer.endPublisherSpan(publishMessageWrapper); PubsubMessage publishedMessage = publishMessageWrapper.getPubsubMessage().toBuilder().setMessageId(MESSAGE_ID).build(); PubsubMessageWrapper subscribeMessageWrapper = PubsubMessageWrapper.newBuilder( - publishedMessage, FULL_SUBSCRIPTION_NAME.toString(), ACK_ID, 1, true) + publishedMessage, FULL_SUBSCRIPTION_NAME.toString(), ACK_ID, 1) .build(); List subscribeMessageWrappers = Arrays.asList(subscribeMessageWrapper); long messageSize = subscribeMessageWrapper.getPubsubMessage().getData().size(); // Call all span start/end methods in the expected order - subscribeMessageWrapper.startSubscriberSpan(tracer, EXACTLY_ONCE_ENABLED); - subscribeMessageWrapper.startSubscribeConcurrencyControlSpan(tracer); - subscribeMessageWrapper.endSubscribeConcurrencyControlSpan(); - subscribeMessageWrapper.startSubscribeSchedulerSpan(tracer); - subscribeMessageWrapper.endSubscribeSchedulerSpan(); - subscribeMessageWrapper.startSubscribeProcessSpan(tracer); - subscribeMessageWrapper.endSubscribeProcessSpan(PROCESS_ACTION); + tracer.startSubscriberSpan(subscribeMessageWrapper, EXACTLY_ONCE_ENABLED); + tracer.startSubscribeConcurrencyControlSpan(subscribeMessageWrapper); + tracer.endSubscribeConcurrencyControlSpan(subscribeMessageWrapper); + tracer.startSubscribeSchedulerSpan(subscribeMessageWrapper); + tracer.endSubscribeSchedulerSpan(subscribeMessageWrapper); + tracer.startSubscribeProcessSpan(subscribeMessageWrapper); + tracer.endSubscribeProcessSpan(subscribeMessageWrapper, PROCESS_ACTION); Span subscribeModackRpcSpan = - OpenTelemetryUtil.startSubscribeRpcSpan( - tracer, + tracer.startSubscribeRpcSpan( FULL_SUBSCRIPTION_NAME.toString(), "modack", subscribeMessageWrappers, ACK_DEADLINE, - true, true); - OpenTelemetryUtil.endSubscribeRpcSpan(subscribeModackRpcSpan, true); - subscribeMessageWrapper.addEndRpcEvent(true, ACK_DEADLINE); + tracer.endSubscribeRpcSpan(subscribeModackRpcSpan); + tracer.addEndRpcEvent(subscribeMessageWrapper, true, ACK_DEADLINE); Span subscribeAckRpcSpan = - OpenTelemetryUtil.startSubscribeRpcSpan( - tracer, - FULL_SUBSCRIPTION_NAME.toString(), - "ack", - subscribeMessageWrappers, - 0, - false, - true); - OpenTelemetryUtil.endSubscribeRpcSpan(subscribeAckRpcSpan, true); - subscribeMessageWrapper.addEndRpcEvent(false, 0); + tracer.startSubscribeRpcSpan( + FULL_SUBSCRIPTION_NAME.toString(), "ack", subscribeMessageWrappers, 0, false); + tracer.endSubscribeRpcSpan(subscribeAckRpcSpan); + tracer.addEndRpcEvent(subscribeMessageWrapper, false, 0); Span subscribeNackRpcSpan = - OpenTelemetryUtil.startSubscribeRpcSpan( - tracer, - FULL_SUBSCRIPTION_NAME.toString(), - "nack", - subscribeMessageWrappers, - 0, - false, - true); - OpenTelemetryUtil.endSubscribeRpcSpan(subscribeNackRpcSpan, true); - subscribeMessageWrapper.addEndRpcEvent(true, 0); - subscribeMessageWrapper.endSubscriberSpan(); + tracer.startSubscribeRpcSpan( + FULL_SUBSCRIPTION_NAME.toString(), "nack", subscribeMessageWrappers, 0, false); + tracer.endSubscribeRpcSpan(subscribeNackRpcSpan); + tracer.addEndRpcEvent(subscribeMessageWrapper, true, 0); + tracer.endSubscriberSpan(subscribeMessageWrapper); List allSpans = openTelemetryTesting.getSpans(); assertEquals(8, allSpans.size()); @@ -459,8 +402,7 @@ public void testSubscribeSpansSuccess() { .containsEntry( SemanticAttributes.MESSAGING_DESTINATION_NAME, FULL_SUBSCRIPTION_NAME.getSubscription()) .containsEntry(PROJECT_ATTR_KEY, FULL_TOPIC_NAME.getProject()) - .containsEntry( - SemanticAttributes.CODE_FUNCTION, "sendModAckOperations") + .containsEntry(SemanticAttributes.CODE_FUNCTION, "sendModAckOperations") .containsEntry(SemanticAttributes.MESSAGING_OPERATION, "modack") .containsEntry( SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, subscribeMessageWrappers.size()) @@ -487,8 +429,7 @@ public void testSubscribeSpansSuccess() { .containsEntry( SemanticAttributes.MESSAGING_DESTINATION_NAME, FULL_SUBSCRIPTION_NAME.getSubscription()) .containsEntry(PROJECT_ATTR_KEY, FULL_TOPIC_NAME.getProject()) - .containsEntry( - SemanticAttributes.CODE_FUNCTION, "sendAckOperations") + .containsEntry(SemanticAttributes.CODE_FUNCTION, "sendAckOperations") .containsEntry(SemanticAttributes.MESSAGING_OPERATION, "ack") .containsEntry( SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, subscribeMessageWrappers.size()); @@ -513,8 +454,7 @@ public void testSubscribeSpansSuccess() { .containsEntry( SemanticAttributes.MESSAGING_DESTINATION_NAME, FULL_SUBSCRIPTION_NAME.getSubscription()) .containsEntry(PROJECT_ATTR_KEY, FULL_TOPIC_NAME.getProject()) - .containsEntry( - SemanticAttributes.CODE_FUNCTION, "sendModAckOperations") + .containsEntry(SemanticAttributes.CODE_FUNCTION, "sendModAckOperations") .containsEntry(SemanticAttributes.MESSAGING_OPERATION, "nack") .containsEntry( SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, subscribeMessageWrappers.size()); @@ -578,20 +518,17 @@ public void testSubscribeConcurrencyControlSpanFailure() { PubsubMessageWrapper messageWrapper = PubsubMessageWrapper.newBuilder( - getPubsubMessage(), - FULL_SUBSCRIPTION_NAME.toString(), - ACK_ID, - DELIVERY_ATTEMPT, - true) + getPubsubMessage(), FULL_SUBSCRIPTION_NAME.toString(), ACK_ID, DELIVERY_ATTEMPT) .build(); - Tracer tracer = openTelemetryTesting.getOpenTelemetry().getTracer("test"); + Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test"); + PubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer); - messageWrapper.startSubscriberSpan(tracer, EXACTLY_ONCE_ENABLED); - messageWrapper.startSubscribeConcurrencyControlSpan(tracer); + tracer.startSubscriberSpan(messageWrapper, EXACTLY_ONCE_ENABLED); + tracer.startSubscribeConcurrencyControlSpan(messageWrapper); Exception e = new Exception("test-exception"); - messageWrapper.setSubscribeConcurrencyControlSpanException(e); + tracer.setSubscribeConcurrencyControlSpanException(messageWrapper, e); List allSpans = openTelemetryTesting.getSpans(); assertEquals(2, allSpans.size()); @@ -625,19 +562,16 @@ public void testSubscriberSpanFailure() { PubsubMessageWrapper messageWrapper = PubsubMessageWrapper.newBuilder( - getPubsubMessage(), - FULL_SUBSCRIPTION_NAME.toString(), - ACK_ID, - DELIVERY_ATTEMPT, - true) + getPubsubMessage(), FULL_SUBSCRIPTION_NAME.toString(), ACK_ID, DELIVERY_ATTEMPT) .build(); - Tracer tracer = openTelemetryTesting.getOpenTelemetry().getTracer("test"); + Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test"); + PubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer); - messageWrapper.startSubscriberSpan(tracer, EXACTLY_ONCE_ENABLED); + tracer.startSubscriberSpan(messageWrapper, EXACTLY_ONCE_ENABLED); Exception e = new Exception("test-exception"); - messageWrapper.setSubscriberSpanException(e, "Test exception"); + tracer.setSubscriberSpanException(messageWrapper, e, "Test exception"); List allSpans = openTelemetryTesting.getSpans(); assertEquals(1, allSpans.size()); @@ -661,39 +595,29 @@ public void testSubscribeRpcSpanFailures() { PubsubMessageWrapper messageWrapper = PubsubMessageWrapper.newBuilder( - getPubsubMessage(), - FULL_SUBSCRIPTION_NAME.toString(), - ACK_ID, - DELIVERY_ATTEMPT, - true) + getPubsubMessage(), FULL_SUBSCRIPTION_NAME.toString(), ACK_ID, DELIVERY_ATTEMPT) .build(); List messageWrappers = Arrays.asList(messageWrapper); - Tracer tracer = openTelemetryTesting.getOpenTelemetry().getTracer("test"); + Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test"); + PubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer); - messageWrapper.startSubscriberSpan(tracer, EXACTLY_ONCE_ENABLED); + tracer.startSubscriberSpan(messageWrapper, EXACTLY_ONCE_ENABLED); Span subscribeModackRpcSpan = - OpenTelemetryUtil.startSubscribeRpcSpan( - tracer, - FULL_SUBSCRIPTION_NAME.toString(), - "modack", - messageWrappers, - ACK_DEADLINE, - true, - true); + tracer.startSubscribeRpcSpan( + FULL_SUBSCRIPTION_NAME.toString(), "modack", messageWrappers, ACK_DEADLINE, true); Span subscribeAckRpcSpan = - OpenTelemetryUtil.startSubscribeRpcSpan( - tracer, FULL_SUBSCRIPTION_NAME.toString(), "ack", messageWrappers, 0, false, true); + tracer.startSubscribeRpcSpan( + FULL_SUBSCRIPTION_NAME.toString(), "ack", messageWrappers, 0, false); Span subscribeNackRpcSpan = - OpenTelemetryUtil.startSubscribeRpcSpan( - tracer, FULL_SUBSCRIPTION_NAME.toString(), "nack", messageWrappers, 0, false, true); + tracer.startSubscribeRpcSpan( + FULL_SUBSCRIPTION_NAME.toString(), "nack", messageWrappers, 0, false); Exception e = new Exception("test-exception"); - OpenTelemetryUtil.setSubscribeRpcSpanException( - subscribeModackRpcSpan, true, ACK_DEADLINE, e, true); - OpenTelemetryUtil.setSubscribeRpcSpanException(subscribeAckRpcSpan, false, 0, e, true); - OpenTelemetryUtil.setSubscribeRpcSpanException(subscribeNackRpcSpan, true, 0, e, true); - messageWrapper.endSubscriberSpan(); + tracer.setSubscribeRpcSpanException(subscribeModackRpcSpan, true, ACK_DEADLINE, e); + tracer.setSubscribeRpcSpanException(subscribeAckRpcSpan, false, 0, e); + tracer.setSubscribeRpcSpanException(subscribeNackRpcSpan, true, 0, e); + tracer.endSubscriberSpan(messageWrapper); List allSpans = openTelemetryTesting.getSpans(); assertEquals(4, allSpans.size());