From fe4753f35cf22a082c8b76c0c43d5d7ce3ff295f Mon Sep 17 00:00:00 2001 From: Mike Prieto Date: Wed, 25 Sep 2024 00:17:48 +0000 Subject: [PATCH] feat: Make OTel classes/methods package-private and remove non-generic PubsubTracer interface --- .../cloud/pubsub/v1/BasePubsubTracer.java | 21 -- .../cloud/pubsub/v1/MessageDispatcher.java | 6 +- .../pubsub/v1/OpenTelemetryPubsubTracer.java | 191 ++++++++++++------ .../cloud/pubsub/v1/OpenTelemetryUtil.java | 42 ---- .../com/google/cloud/pubsub/v1/Publisher.java | 4 +- .../cloud/pubsub/v1/PubsubMessageWrapper.java | 92 +++++---- .../google/cloud/pubsub/v1/PubsubTracer.java | 138 ------------- .../v1/StreamingSubscriberConnection.java | 6 +- .../google/cloud/pubsub/v1/Subscriber.java | 4 +- .../cloud/pubsub/v1/OpenTelemetryTest.java | 14 +- 10 files changed, 192 insertions(+), 326 deletions(-) delete mode 100644 google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/BasePubsubTracer.java delete mode 100644 google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryUtil.java delete mode 100644 google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubTracer.java diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/BasePubsubTracer.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/BasePubsubTracer.java deleted file mode 100644 index f574ee163..000000000 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/BasePubsubTracer.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Copyright 2024 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.google.cloud.pubsub.v1; - -public class BasePubsubTracer implements PubsubTracer { - BasePubsubTracer() {} -} diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java index 9695e542d..860fcbcf9 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java @@ -106,7 +106,7 @@ class MessageDispatcher { private final String subscriptionName; private final boolean enableOpenTelemetryTracing; - private PubsubTracer tracer = new BasePubsubTracer(); + private OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(null, false); /** Internal representation of a reply to a Pubsub message, to be sent back to the service. */ public enum AckReply { @@ -688,7 +688,7 @@ public static final class Builder { private String subscriptionName; private boolean enableOpenTelemetryTracing; - private PubsubTracer tracer; + private OpenTelemetryPubsubTracer tracer; protected Builder(MessageReceiver receiver) { this.receiver = receiver; @@ -770,7 +770,7 @@ public Builder setEnableOpenTelemetryTracing(boolean enableOpenTelemetryTracing) return this; } - public Builder setTracer(PubsubTracer tracer) { + public Builder setTracer(OpenTelemetryPubsubTracer tracer) { this.tracer = tracer; return this; } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryPubsubTracer.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryPubsubTracer.java index 7fb974f8e..fda28898f 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryPubsubTracer.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryPubsubTracer.java @@ -16,8 +16,6 @@ package com.google.cloud.pubsub.v1; -import com.google.api.core.InternalApi; -import com.google.common.base.Preconditions; import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.SubscriptionName; import com.google.pubsub.v1.TopicName; @@ -32,9 +30,9 @@ import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; import java.util.List; -@InternalApi("For use by the google-cloud-pubsub library only") -public class OpenTelemetryPubsubTracer implements PubsubTracer { +public class OpenTelemetryPubsubTracer { private final Tracer tracer; + private boolean enabled = false; private static final String PUBLISH_FLOW_CONTROL_SPAN_NAME = "publisher flow control"; private static final String PUBLISH_BATCHING_SPAN_NAME = "publisher batching"; @@ -51,12 +49,32 @@ public class OpenTelemetryPubsubTracer implements PubsubTracer { "messaging.gcp_pubsub.message.delivery_attempt"; private static final String ACK_DEADLINE_ATTR_KEY = "messaging.gcp_pubsub.message.ack_deadline"; private static final String RECEIPT_MODACK_ATTR_KEY = "messaging.gcp_pubsub.is_receipt_modack"; + private static final String PROJECT_ATTR_KEY = "gcp.project_id"; private static final String PUBLISH_RPC_SPAN_SUFFIX = " publish"; private static final String MESSAGING_SYSTEM_VALUE = "gcp_pubsub"; - OpenTelemetryPubsubTracer(Tracer tracer) { - this.tracer = Preconditions.checkNotNull(tracer, "OpenTelemetry tracer cannot be null"); + OpenTelemetryPubsubTracer(Tracer tracer, boolean enableOpenTelemetry) { + this.tracer = tracer; + if (this.tracer != null && enableOpenTelemetry) { + this.enabled = true; + } + } + + /** Populates attributes that are common the publisher parent span and publish RPC span. */ + private static final AttributesBuilder createCommonSpanAttributesBuilder( + String destinationName, String projectName, String codeFunction, String operation) { + AttributesBuilder attributesBuilder = + Attributes.builder() + .put(SemanticAttributes.MESSAGING_SYSTEM, MESSAGING_SYSTEM_VALUE) + .put(SemanticAttributes.MESSAGING_DESTINATION_NAME, destinationName) + .put(PROJECT_ATTR_KEY, projectName) + .put(SemanticAttributes.CODE_FUNCTION, codeFunction); + if (operation != null) { + attributesBuilder.put(SemanticAttributes.MESSAGING_OPERATION, operation); + } + + return attributesBuilder; } private Span startChildSpan(String name, Span parent) { @@ -67,10 +85,12 @@ private Span startChildSpan(String name, Span parent) { * Creates and starts the parent span with the appropriate span attributes and injects the span * context into the {@link PubsubMessage} attributes. */ - @Override - public void startPublisherSpan(PubsubMessageWrapper message) { + void startPublisherSpan(PubsubMessageWrapper message) { + if (!enabled) { + return; + } AttributesBuilder attributesBuilder = - OpenTelemetryUtil.createCommonSpanAttributesBuilder( + createCommonSpanAttributesBuilder( message.getTopicName(), message.getTopicProject(), "publish", "create"); attributesBuilder.put(MESSAGE_SIZE_ATTR_KEY, message.getDataSize()); @@ -91,44 +111,60 @@ public void startPublisherSpan(PubsubMessageWrapper message) { } } - public void endPublisherSpan(PubsubMessageWrapper message) { + void endPublisherSpan(PubsubMessageWrapper message) { + if (!enabled) { + return; + } message.endPublisherSpan(); } - public void setPublisherMessageIdSpanAttribute(PubsubMessageWrapper message, String messageId) { + void setPublisherMessageIdSpanAttribute(PubsubMessageWrapper message, String messageId) { + if (!enabled) { + return; + } message.setPublisherMessageIdSpanAttribute(messageId); } /** Creates a span for publish-side flow control as a child of the parent publisher span. */ - @Override - public void startPublishFlowControlSpan(PubsubMessageWrapper message) { + void startPublishFlowControlSpan(PubsubMessageWrapper message) { + if (!enabled) { + return; + } Span publisherSpan = message.getPublisherSpan(); if (publisherSpan != null) message.setPublishFlowControlSpan( startChildSpan(PUBLISH_FLOW_CONTROL_SPAN_NAME, publisherSpan)); } - @Override - public void endPublishFlowControlSpan(PubsubMessageWrapper message) { + void endPublishFlowControlSpan(PubsubMessageWrapper message) { + if (!enabled) { + return; + } message.endPublishFlowControlSpan(); } - @Override - public void setPublishFlowControlSpanException(PubsubMessageWrapper message, Throwable t) { + void setPublishFlowControlSpanException(PubsubMessageWrapper message, Throwable t) { + if (!enabled) { + return; + } message.setPublishFlowControlSpanException(t); } /** Creates a span for publish message batching as a child of the parent publisher span. */ - @Override - public void startPublishBatchingSpan(PubsubMessageWrapper message) { + void startPublishBatchingSpan(PubsubMessageWrapper message) { + if (!enabled) { + return; + } Span publisherSpan = message.getPublisherSpan(); if (publisherSpan != null) { message.setPublishBatchingSpan(startChildSpan(PUBLISH_BATCHING_SPAN_NAME, publisherSpan)); } } - @Override - public void endPublishBatchingSpan(PubsubMessageWrapper message) { + void endPublishBatchingSpan(PubsubMessageWrapper message) { + if (!enabled) { + return; + } message.endPublishBatchingSpan(); } @@ -136,11 +172,13 @@ public void endPublishBatchingSpan(PubsubMessageWrapper message) { * Creates, starts, and returns a publish RPC span for the given message batch. Bi-directional * links with the publisher parent span are created for sampled messages in the batch. */ - @Override - public Span startPublishRpcSpan(String topic, List messages) { + Span startPublishRpcSpan(String topic, List messages) { + if (!enabled) { + return null; + } TopicName topicName = TopicName.parse(topic); Attributes attributes = - OpenTelemetryUtil.createCommonSpanAttributesBuilder( + createCommonSpanAttributesBuilder( topicName.getTopic(), topicName.getProject(), "publishCall", "publish") .put(SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, messages.size()) .build(); @@ -167,8 +205,10 @@ public Span startPublishRpcSpan(String topic, List message } /** Ends the given publish RPC span if it exists. */ - @Override - public void endPublishRpcSpan(Span publishRpcSpan) { + void endPublishRpcSpan(Span publishRpcSpan) { + if (!enabled) { + return; + } if (publishRpcSpan != null) { publishRpcSpan.end(); } @@ -178,8 +218,10 @@ public void endPublishRpcSpan(Span publishRpcSpan) { * Sets an error status and records an exception when an exception is thrown when publishing the * message batch. */ - @Override - public void setPublishRpcSpanException(Span publishRpcSpan, Throwable t) { + void setPublishRpcSpanException(Span publishRpcSpan, Throwable t) { + if (!enabled) { + return; + } if (publishRpcSpan != null) { publishRpcSpan.setStatus(StatusCode.ERROR, "Exception thrown on publish RPC."); publishRpcSpan.recordException(t); @@ -187,11 +229,13 @@ public void setPublishRpcSpanException(Span publishRpcSpan, Throwable t) { } } - @Override - public void startSubscriberSpan( + void startSubscriberSpan( PubsubMessageWrapper message, boolean exactlyOnceDeliveryEnabled) { + if (!enabled) { + return; + } AttributesBuilder attributesBuilder = - OpenTelemetryUtil.createCommonSpanAttributesBuilder( + createCommonSpanAttributesBuilder( message.getSubscriptionName(), message.getSubscriptionProject(), "onResponse", null); attributesBuilder @@ -217,25 +261,33 @@ public void startSubscriberSpan( .startSpan()); } - @Override - public void endSubscriberSpan(PubsubMessageWrapper message) { + void endSubscriberSpan(PubsubMessageWrapper message) { + if (!enabled) { + return; + } message.endSubscriberSpan(); } - @Override - public void setSubscriberSpanExpirationResult(PubsubMessageWrapper message) { + void setSubscriberSpanExpirationResult(PubsubMessageWrapper message) { + if (!enabled) { + return; + } message.setSubscriberSpanExpirationResult(); } - @Override - public void setSubscriberSpanException( + void setSubscriberSpanException( PubsubMessageWrapper message, Throwable t, String exception) { + if (!enabled) { + return; + } message.setSubscriberSpanException(t, exception); } /** Creates a span for subscribe concurrency control as a child of the parent subscriber span. */ - @Override - public void startSubscribeConcurrencyControlSpan(PubsubMessageWrapper message) { + void startSubscribeConcurrencyControlSpan(PubsubMessageWrapper message) { + if (!enabled) { + return; + } Span subscriberSpan = message.getSubscriberSpan(); if (subscriberSpan != null) { message.setSubscribeConcurrencyControlSpan( @@ -243,22 +295,28 @@ public void startSubscribeConcurrencyControlSpan(PubsubMessageWrapper message) { } } - @Override - public void endSubscribeConcurrencyControlSpan(PubsubMessageWrapper message) { + void endSubscribeConcurrencyControlSpan(PubsubMessageWrapper message) { + if (!enabled) { + return; + } message.endSubscribeConcurrencyControlSpan(); } - @Override - public void setSubscribeConcurrencyControlSpanException( + void setSubscribeConcurrencyControlSpanException( PubsubMessageWrapper message, Throwable t) { + if (!enabled) { + return; + } message.setSubscribeConcurrencyControlSpanException(t); } /** * Creates a span for subscribe ordering key scheduling as a child of the parent subscriber span. */ - @Override - public void startSubscribeSchedulerSpan(PubsubMessageWrapper message) { + void startSubscribeSchedulerSpan(PubsubMessageWrapper message) { + if (!enabled) { + return; + } Span subscriberSpan = message.getSubscriberSpan(); if (subscriberSpan != null) { message.setSubscribeSchedulerSpan( @@ -266,14 +324,18 @@ public void startSubscribeSchedulerSpan(PubsubMessageWrapper message) { } } - @Override - public void endSubscribeSchedulerSpan(PubsubMessageWrapper message) { + void endSubscribeSchedulerSpan(PubsubMessageWrapper message) { + if (!enabled) { + return; + } message.endSubscribeSchedulerSpan(); } /** Creates a span for subscribe message processing as a child of the parent subscriber span. */ - @Override - public void startSubscribeProcessSpan(PubsubMessageWrapper message) { + void startSubscribeProcessSpan(PubsubMessageWrapper message) { + if (!enabled) { + return; + } Span subscriberSpan = message.getSubscriberSpan(); if (subscriberSpan != null) { Span subscribeProcessSpan = @@ -288,8 +350,10 @@ public void startSubscribeProcessSpan(PubsubMessageWrapper message) { } } - @Override - public void endSubscribeProcessSpan(PubsubMessageWrapper message, String action) { + void endSubscribeProcessSpan(PubsubMessageWrapper message, String action) { + if (!enabled) { + return; + } message.endSubscribeProcessSpan(action); } @@ -297,17 +361,19 @@ public void endSubscribeProcessSpan(PubsubMessageWrapper message, String action) * Creates, starts, and returns spans for ModAck, Nack, and Ack RPC requests. Bi-directional links * to parent subscribe span for sampled messages are added. */ - @Override - public Span startSubscribeRpcSpan( + Span startSubscribeRpcSpan( String subscription, String rpcOperation, List messages, int ackDeadline, boolean isReceiptModack) { + if (!enabled) { + return null; + } String codeFunction = rpcOperation == "ack" ? "sendAckOperations" : "sendModAckOperations"; SubscriptionName subscriptionName = SubscriptionName.parse(subscription); AttributesBuilder attributesBuilder = - OpenTelemetryUtil.createCommonSpanAttributesBuilder( + createCommonSpanAttributesBuilder( subscriptionName.getSubscription(), subscriptionName.getProject(), codeFunction, @@ -355,8 +421,10 @@ public Span startSubscribeRpcSpan( } /** Ends the given subscribe RPC span if it exists. */ - @Override - public void endSubscribeRpcSpan(Span rpcSpan) { + void endSubscribeRpcSpan(Span rpcSpan) { + if (!enabled) { + return; + } if (rpcSpan != null) { rpcSpan.end(); } @@ -366,9 +434,11 @@ public void endSubscribeRpcSpan(Span rpcSpan) { * Sets an error status and records an exception when an exception is thrown when handling a * subscribe-side RPC. */ - @Override - public void setSubscribeRpcSpanException( + void setSubscribeRpcSpanException( Span rpcSpan, boolean isModack, int ackDeadline, Throwable t) { + if (!enabled) { + return; + } if (rpcSpan != null) { String operation = !isModack ? "ack" : (ackDeadline == 0 ? "nack" : "modack"); rpcSpan.setStatus(StatusCode.ERROR, "Exception thrown on " + operation + " RPC."); @@ -378,10 +448,9 @@ public void setSubscribeRpcSpanException( } /** Adds the appropriate subscribe-side RPC end event. */ - @Override - public void addEndRpcEvent( + void addEndRpcEvent( PubsubMessageWrapper message, boolean rpcSampled, boolean isModack, int ackDeadline) { - if (!rpcSampled) { + if (!enabled || !rpcSampled) { return; } if (!isModack) { diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryUtil.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryUtil.java deleted file mode 100644 index c100675b5..000000000 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryUtil.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright 2024 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.google.cloud.pubsub.v1; - -import io.opentelemetry.api.common.Attributes; -import io.opentelemetry.api.common.AttributesBuilder; -import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; - -public class OpenTelemetryUtil { - private static final String MESSAGING_SYSTEM_VALUE = "gcp_pubsub"; - private static final String PROJECT_ATTR_KEY = "gcp.project_id"; - - /** Populates attributes that are common the publisher parent span and publish RPC span. */ - protected static final AttributesBuilder createCommonSpanAttributesBuilder( - String destinationName, String projectName, String codeFunction, String operation) { - AttributesBuilder attributesBuilder = - Attributes.builder() - .put(SemanticAttributes.MESSAGING_SYSTEM, MESSAGING_SYSTEM_VALUE) - .put(SemanticAttributes.MESSAGING_DESTINATION_NAME, destinationName) - .put(PROJECT_ATTR_KEY, projectName) - .put(SemanticAttributes.CODE_FUNCTION, codeFunction); - if (operation != null) { - attributesBuilder.put(SemanticAttributes.MESSAGING_OPERATION, operation); - } - - return attributesBuilder; - } -} diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index 5cd20235d..a4cfdc257 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -131,7 +131,7 @@ public class Publisher implements PublisherInterface { private final boolean enableOpenTelemetryTracing; private final OpenTelemetry openTelemetry; - private PubsubTracer tracer = new BasePubsubTracer(); + private OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(null, false); /** The maximum number of messages in one request. Defined by the API. */ public static long getApiMaxRequestElementCount() { @@ -166,7 +166,7 @@ private Publisher(Builder builder) throws IOException { if (this.openTelemetry != null && this.enableOpenTelemetryTracing) { Tracer openTelemetryTracer = builder.openTelemetry.getTracer(OPEN_TELEMETRY_TRACER_NAME); if (openTelemetryTracer != null) { - this.tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer); + this.tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer, this.enableOpenTelemetryTracing); } } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubMessageWrapper.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubMessageWrapper.java index 53cf95544..94fd13085 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubMessageWrapper.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubMessageWrapper.java @@ -16,7 +16,6 @@ package com.google.cloud.pubsub.v1; -import com.google.api.core.InternalApi; import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.SubscriptionName; import com.google.pubsub.v1.TopicName; @@ -66,8 +65,7 @@ public class PubsubMessageWrapper { private Span subscribeSchedulerSpan; private Span subscribeProcessSpan; - @InternalApi("For use by the google-cloud-pubsub library only") - public PubsubMessageWrapper(Builder builder) { + private PubsubMessageWrapper(Builder builder) { this.message = builder.message; this.topicName = builder.topicName; this.subscriptionName = builder.subscriptionName; @@ -75,33 +73,33 @@ public PubsubMessageWrapper(Builder builder) { this.deliveryAttempt = builder.deliveryAttempt; } - public static Builder newBuilder(PubsubMessage message, String topicName) { + static Builder newBuilder(PubsubMessage message, String topicName) { return new Builder(message, topicName); } - public static Builder newBuilder( + static Builder newBuilder( PubsubMessage message, String subscriptionName, String ackId, int deliveryAttempt) { return new Builder(message, subscriptionName, ackId, deliveryAttempt); } /** Returns the PubsubMessage associated with this wrapper. */ - protected PubsubMessage getPubsubMessage() { + PubsubMessage getPubsubMessage() { return message; } - protected void setPubsubMessage(PubsubMessage message) { + void setPubsubMessage(PubsubMessage message) { this.message = message; } /** Returns the TopicName for this wrapper as a string. */ - protected String getTopicName() { + String getTopicName() { if (topicName != null) { return topicName.getTopic(); } return ""; } - protected String getTopicProject() { + String getTopicProject() { if (topicName != null) { return topicName.getProject(); } @@ -109,78 +107,78 @@ protected String getTopicProject() { } /** Returns the SubscriptionName for this wrapper as a string. */ - protected String getSubscriptionName() { + String getSubscriptionName() { if (subscriptionName != null) { return subscriptionName.getSubscription(); } return ""; } - protected String getSubscriptionProject() { + String getSubscriptionProject() { if (subscriptionName != null) { return subscriptionName.getProject(); } return ""; } - protected String getMessageId() { + String getMessageId() { return message.getMessageId(); } - protected String getAckId() { + String getAckId() { return ackId; } - protected int getDataSize() { + int getDataSize() { return message.getData().size(); } - protected String getOrderingKey() { + String getOrderingKey() { return message.getOrderingKey(); } - protected int getDeliveryAttempt() { + int getDeliveryAttempt() { return deliveryAttempt; } - protected Span getPublisherSpan() { + Span getPublisherSpan() { return publisherSpan; } - protected void setPublisherSpan(Span span) { + void setPublisherSpan(Span span) { this.publisherSpan = span; } - protected void setPublishFlowControlSpan(Span span) { + void setPublishFlowControlSpan(Span span) { this.publishFlowControlSpan = span; } - protected void setPublishBatchingSpan(Span span) { + void setPublishBatchingSpan(Span span) { this.publishBatchingSpan = span; } - protected Span getSubscriberSpan() { + Span getSubscriberSpan() { return subscriberSpan; } - protected void setSubscriberSpan(Span span) { + void setSubscriberSpan(Span span) { this.subscriberSpan = span; } - protected void setSubscribeConcurrencyControlSpan(Span span) { + void setSubscribeConcurrencyControlSpan(Span span) { this.subscribeConcurrencyControlSpan = span; } - protected void setSubscribeSchedulerSpan(Span span) { + void setSubscribeSchedulerSpan(Span span) { this.subscribeSchedulerSpan = span; } - protected void setSubscribeProcessSpan(Span span) { + void setSubscribeProcessSpan(Span span) { this.subscribeProcessSpan = span; } /** Creates a publish start event that is tied to the publish RPC span time. */ - protected void addPublishStartEvent() { + void addPublishStartEvent() { if (publisherSpan != null) { publisherSpan.addEvent(PUBLISH_START_EVENT); } @@ -190,14 +188,14 @@ protected void addPublishStartEvent() { * Sets the message ID attribute in the publisher parent span. This is called after the publish * RPC returns with a message ID. */ - protected void setPublisherMessageIdSpanAttribute(String messageId) { + void setPublisherMessageIdSpanAttribute(String messageId) { if (publisherSpan != null) { publisherSpan.setAttribute(SemanticAttributes.MESSAGING_MESSAGE_ID, messageId); } } /** Ends the publisher parent span if it exists. */ - protected void endPublisherSpan() { + void endPublisherSpan() { if (publisherSpan != null) { publisherSpan.addEvent(PUBLISH_END_EVENT); publisherSpan.end(); @@ -205,14 +203,14 @@ protected void endPublisherSpan() { } /** Ends the publish flow control span if it exists. */ - protected void endPublishFlowControlSpan() { + void endPublishFlowControlSpan() { if (publishFlowControlSpan != null) { publishFlowControlSpan.end(); } } /** Ends the publish batching span if it exists. */ - protected void endPublishBatchingSpan() { + void endPublishBatchingSpan() { if (publishBatchingSpan != null) { publishBatchingSpan.end(); } @@ -221,7 +219,7 @@ protected void endPublishBatchingSpan() { /** * Sets an error status and records an exception when an exception is thrown during flow control. */ - protected void setPublishFlowControlSpanException(Throwable t) { + void setPublishFlowControlSpanException(Throwable t) { if (publishFlowControlSpan != null) { publishFlowControlSpan.setStatus( StatusCode.ERROR, "Exception thrown during publish flow control."); @@ -234,58 +232,58 @@ protected void setPublishFlowControlSpanException(Throwable t) { * Creates start and end events for ModAcks, Nacks, and Acks that are tied to the corresponding * RPC span start and end times. */ - protected void addModAckStartEvent() { + void addModAckStartEvent() { if (subscriberSpan != null) { subscriberSpan.addEvent(MODACK_START_EVENT); } } - protected void addModAckEndEvent() { + void addModAckEndEvent() { if (subscriberSpan != null) { subscriberSpan.addEvent(MODACK_END_EVENT); } } - protected void addNackStartEvent() { + void addNackStartEvent() { if (subscriberSpan != null) { subscriberSpan.addEvent(NACK_START_EVENT); } } - protected void addNackEndEvent() { + void addNackEndEvent() { if (subscriberSpan != null) { subscriberSpan.addEvent(NACK_END_EVENT); } } - protected void addAckStartEvent() { + void addAckStartEvent() { if (subscriberSpan != null) { subscriberSpan.addEvent(ACK_START_EVENT); } } - protected void addAckEndEvent() { + void addAckEndEvent() { if (subscriberSpan != null) { subscriberSpan.addEvent(ACK_END_EVENT); } } /** Ends the subscriber parent span if exists. */ - protected void endSubscriberSpan() { + void endSubscriberSpan() { if (subscriberSpan != null) { subscriberSpan.end(); } } /** Ends the subscribe concurreny control span if exists. */ - protected void endSubscribeConcurrencyControlSpan() { + void endSubscribeConcurrencyControlSpan() { if (subscribeConcurrencyControlSpan != null) { subscribeConcurrencyControlSpan.end(); } } /** Ends the subscribe scheduler span if exists. */ - protected void endSubscribeSchedulerSpan() { + void endSubscribeSchedulerSpan() { if (subscribeSchedulerSpan != null) { subscribeSchedulerSpan.end(); } @@ -295,7 +293,7 @@ protected void endSubscribeSchedulerSpan() { * Ends the subscribe process span if it exists, creates an event with the appropriate result, and * sets the result on the parent subscriber span. */ - protected void endSubscribeProcessSpan(String action) { + void endSubscribeProcessSpan(String action) { if (subscribeProcessSpan != null) { subscribeProcessSpan.addEvent(action + " called"); subscribeProcessSpan.end(); @@ -304,7 +302,7 @@ protected void endSubscribeProcessSpan(String action) { } /** Sets an exception on the subscriber span during Ack/ModAck/Nack failures */ - protected void setSubscriberSpanException(Throwable t, String exception) { + void setSubscriberSpanException(Throwable t, String exception) { if (subscriberSpan != null) { subscriberSpan.setStatus(StatusCode.ERROR, exception); subscriberSpan.recordException(t); @@ -313,7 +311,7 @@ protected void setSubscriberSpanException(Throwable t, String exception) { } /** Sets result of the parent subscriber span to expired and ends its. */ - protected void setSubscriberSpanExpirationResult() { + void setSubscriberSpanExpirationResult() { if (subscriberSpan != null) { subscriberSpan.setAttribute(MESSAGE_RESULT_ATTR_KEY, "expired"); endSubscriberSpan(); @@ -324,7 +322,7 @@ protected void setSubscriberSpanExpirationResult() { * Sets an error status and records an exception when an exception is thrown subscriber * concurrency control. */ - protected void setSubscribeConcurrencyControlSpanException(Throwable t) { + void setSubscribeConcurrencyControlSpanException(Throwable t) { if (subscribeConcurrencyControlSpan != null) { subscribeConcurrencyControlSpan.setStatus( StatusCode.ERROR, "Exception thrown during subscribe concurrency control."); @@ -351,7 +349,7 @@ private void endAllSubscribeSpans() { * Injects the span context into the attributes of a Pub/Sub message for propagation to the * subscriber client. */ - protected void injectSpanContext() { + void injectSpanContext() { TextMapSetter injectMessageAttributes = new TextMapSetter() { @Override @@ -371,7 +369,7 @@ public void set(PubsubMessageWrapper carrier, String key, String value) { * Extracts the span context from the attributes of a Pub/Sub message and creates the parent * subscriber span using that context. */ - protected Context extractSpanContext(Attributes attributes) { + Context extractSpanContext(Attributes attributes) { TextMapGetter extractMessageAttributes = new TextMapGetter() { @Override @@ -390,7 +388,7 @@ public Iterable keys(PubsubMessageWrapper carrier) { } /** Builder of {@link PubsubMessageWrapper PubsubMessageWrapper}. */ - protected static final class Builder { + static final class Builder { private PubsubMessage message = null; private TopicName topicName = null; private SubscriptionName subscriptionName = null; diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubTracer.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubTracer.java deleted file mode 100644 index 70123f98b..000000000 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubTracer.java +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Copyright 2024 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.google.cloud.pubsub.v1; - -import io.opentelemetry.api.trace.Span; -import java.util.List; - -public interface PubsubTracer { - default void startPublisherSpan(PubsubMessageWrapper message) { - // noop - } - - default void endPublisherSpan(PubsubMessageWrapper message) { - // noop - } - - default void setPublisherMessageIdSpanAttribute(PubsubMessageWrapper message, String messageId) { - // noop - } - - default void startPublishFlowControlSpan(PubsubMessageWrapper message) { - // noop - } - - default void endPublishFlowControlSpan(PubsubMessageWrapper message) { - // noop - } - - default void setPublishFlowControlSpanException(PubsubMessageWrapper message, Throwable t) { - // noop - } - - default void startPublishBatchingSpan(PubsubMessageWrapper message) { - // noop - } - - default void endPublishBatchingSpan(PubsubMessageWrapper message) { - // noop - } - - default Span startPublishRpcSpan(String topic, List messages) { - // noop - return null; - } - - default void endPublishRpcSpan(Span publishRpcSpan) { - // noop - } - - default void setPublishRpcSpanException(Span publishRpcSpan, Throwable t) { - // noop - } - - default void startSubscriberSpan( - PubsubMessageWrapper message, boolean exactlyOnceDeliveryEnabled) { - // noop - } - - default void endSubscriberSpan(PubsubMessageWrapper message) { - // noop - } - - default void setSubscriberSpanExpirationResult(PubsubMessageWrapper message) { - // noop - } - - default void setSubscriberSpanException( - PubsubMessageWrapper message, Throwable t, String exception) { - // noop - } - - default void startSubscribeConcurrencyControlSpan(PubsubMessageWrapper message) { - // noop - } - - default void endSubscribeConcurrencyControlSpan(PubsubMessageWrapper message) { - // noop - } - - default void setSubscribeConcurrencyControlSpanException( - PubsubMessageWrapper message, Throwable t) { - // noop - } - - default void startSubscribeSchedulerSpan(PubsubMessageWrapper message) { - // noop - } - - default void endSubscribeSchedulerSpan(PubsubMessageWrapper message) { - // noop - } - - default void startSubscribeProcessSpan(PubsubMessageWrapper message) { - // noop - } - - default void endSubscribeProcessSpan(PubsubMessageWrapper message, String action) { - // noop - } - - default Span startSubscribeRpcSpan( - String subscription, - String rpcOperation, - List messages, - int ackDeadline, - boolean isReceiptModack) { - // noop - return null; - } - - default void endSubscribeRpcSpan(Span rpcSpan) { - // noop - } - - default void setSubscribeRpcSpanException( - Span rpcSpan, boolean isModack, int ackDeadline, Throwable t) { - // noop - } - - default void addEndRpcEvent( - PubsubMessageWrapper message, boolean rpcSampled, boolean isModack, int ackDeadline) { - // noop - } -} diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java index f33d2243d..60da55cee 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java @@ -120,7 +120,7 @@ final class StreamingSubscriberConnection extends AbstractApiService implements private final String clientId = UUID.randomUUID().toString(); private final boolean enableOpenTelemetryTracing; - private PubsubTracer tracer = new BasePubsubTracer(); + private OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(null, false); private StreamingSubscriberConnection(Builder builder) { subscription = builder.subscription; @@ -717,7 +717,7 @@ public static final class Builder { private ApiClock clock; private boolean enableOpenTelemetryTracing; - private PubsubTracer tracer; + private OpenTelemetryPubsubTracer tracer; protected Builder(MessageReceiver receiver) { this.receiver = receiver; @@ -814,7 +814,7 @@ public Builder setEnableOpenTelemetryTracing(boolean enableOpenTelemetryTracing) return this; } - public Builder setTracer(PubsubTracer tracer) { + public Builder setTracer(OpenTelemetryPubsubTracer tracer) { this.tracer = tracer; return this; } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java index a6413389f..8f4e1a464 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java @@ -151,7 +151,7 @@ public class Subscriber extends AbstractApiService implements SubscriberInterfac private final boolean enableOpenTelemetryTracing; private final OpenTelemetry openTelemetry; - private PubsubTracer tracer = new BasePubsubTracer(); + private OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(null, false); private Subscriber(Builder builder) { receiver = builder.receiver; @@ -212,7 +212,7 @@ private Subscriber(Builder builder) { if (this.openTelemetry != null && this.enableOpenTelemetryTracing) { Tracer openTelemetryTracer = builder.openTelemetry.getTracer(OPEN_TELEMETRY_TRACER_NAME); if (openTelemetryTracer != null) { - this.tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer); + this.tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer, this.enableOpenTelemetryTracing); } } diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java index 204e12e3d..b4433f41e 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java @@ -109,7 +109,7 @@ public void testPublishSpansSuccess() { long messageSize = messageWrapper.getPubsubMessage().getData().size(); Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test"); - PubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer); + OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer, true); // Call all span start/end methods in the expected order tracer.startPublisherSpan(messageWrapper); @@ -221,7 +221,7 @@ public void testPublishFlowControlSpanFailure() { PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME.toString()).build(); Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test"); - PubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer); + OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer, true); tracer.startPublisherSpan(messageWrapper); tracer.startPublishFlowControlSpan(messageWrapper); @@ -262,7 +262,7 @@ public void testPublishRpcSpanFailure() { List messageWrappers = Arrays.asList(messageWrapper); Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test"); - PubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer); + OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer, true); tracer.startPublisherSpan(messageWrapper); Span publishRpcSpan = tracer.startPublishRpcSpan(FULL_TOPIC_NAME.toString(), messageWrappers); @@ -299,7 +299,7 @@ public void testSubscribeSpansSuccess() { openTelemetryTesting.clearSpans(); Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test"); - PubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer); + OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer, true); PubsubMessageWrapper publishMessageWrapper = PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME.toString()).build(); @@ -522,7 +522,7 @@ public void testSubscribeConcurrencyControlSpanFailure() { .build(); Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test"); - PubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer); + OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer, true); tracer.startSubscriberSpan(messageWrapper, EXACTLY_ONCE_ENABLED); tracer.startSubscribeConcurrencyControlSpan(messageWrapper); @@ -566,7 +566,7 @@ public void testSubscriberSpanFailure() { .build(); Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test"); - PubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer); + OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer, true); tracer.startSubscriberSpan(messageWrapper, EXACTLY_ONCE_ENABLED); @@ -600,7 +600,7 @@ public void testSubscribeRpcSpanFailures() { List messageWrappers = Arrays.asList(messageWrapper); Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test"); - PubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer); + OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer, true); tracer.startSubscriberSpan(messageWrapper, EXACTLY_ONCE_ENABLED); Span subscribeModackRpcSpan =