From 6c5e03cf1d83fb88184641c80c7d1451d52e57e9 Mon Sep 17 00:00:00 2001 From: Mike Prieto Date: Thu, 29 Aug 2024 00:03:00 +0000 Subject: [PATCH] feat: Changes to OpenTelemetry implementation to add links earlier and prevent methods from being exposed to users --- google-cloud-pubsub/pom.xml | 1 - .../cloud/pubsub/v1/OpenTelemetryUtil.java | 56 +++++++++++-------- .../cloud/pubsub/v1/PubsubMessageWrapper.java | 4 +- .../cloud/pubsub/v1/OpenTelemetryTest.java | 12 ++-- 4 files changed, 40 insertions(+), 33 deletions(-) diff --git a/google-cloud-pubsub/pom.xml b/google-cloud-pubsub/pom.xml index 454076012..b40f4ddde 100644 --- a/google-cloud-pubsub/pom.xml +++ b/google-cloud-pubsub/pom.xml @@ -111,7 +111,6 @@ io.opentelemetry opentelemetry-semconv - 1.26.0-alpha diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryUtil.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryUtil.java index e6d275935..c6760f78a 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryUtil.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryUtil.java @@ -21,6 +21,7 @@ import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.common.AttributesBuilder; import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanBuilder; import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.api.trace.Tracer; @@ -36,7 +37,7 @@ public class OpenTelemetryUtil { private static final String PUBLISH_RPC_SPAN_SUFFIX = " publish"; /** Populates attributes that are common the publisher parent span and publish RPC span. */ - public static final AttributesBuilder createCommonSpanAttributesBuilder( + protected static final AttributesBuilder createCommonSpanAttributesBuilder( String destinationName, String projectName, String codeFunction, String operation) { AttributesBuilder attributesBuilder = Attributes.builder() @@ -55,7 +56,7 @@ public static final AttributesBuilder createCommonSpanAttributesBuilder( * Creates, starts, and returns a publish RPC span for the given message batch. Bi-directional * links with the publisher parent span are created for sampled messages in the batch. */ - public static final Span startPublishRpcSpan( + protected static final Span startPublishRpcSpan( Tracer tracer, String topic, List messages, @@ -64,21 +65,24 @@ public static final Span startPublishRpcSpan( TopicName topicName = TopicName.parse(topic); Attributes attributes = createCommonSpanAttributesBuilder( - topicName.getTopic(), topicName.getProject(), "Publisher.publishCall", "publish") + topicName.getTopic(), topicName.getProject(), "publishCall", "publish") .put(SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, messages.size()) .build(); - Span publishRpcSpan = + SpanBuilder publishRpcSpanBuilder = tracer .spanBuilder(topicName.getTopic() + PUBLISH_RPC_SPAN_SUFFIX) .setSpanKind(SpanKind.CLIENT) - .setAllAttributes(attributes) - .startSpan(); + .setAllAttributes(attributes); + Attributes linkAttributes = + Attributes.builder().put(SemanticAttributes.MESSAGING_OPERATION, "publish").build(); + for (PubsubMessageWrapper message : messages) { + if (message.getPublisherSpan().getSpanContext().isSampled()) + publishRpcSpanBuilder.addLink(message.getPublisherSpan().getSpanContext(), linkAttributes); + } + Span publishRpcSpan = publishRpcSpanBuilder.startSpan(); for (PubsubMessageWrapper message : messages) { if (message.getPublisherSpan().getSpanContext().isSampled()) { - Attributes linkAttributes = - Attributes.builder().put(SemanticAttributes.MESSAGING_OPERATION, "publish").build(); - publishRpcSpan.addLink(message.getPublisherSpan().getSpanContext(), linkAttributes); message.getPublisherSpan().addLink(publishRpcSpan.getSpanContext(), linkAttributes); message.addPublishStartEvent(); } @@ -89,7 +93,7 @@ public static final Span startPublishRpcSpan( } /** Ends the given publish RPC span if it exists. */ - public static final void endPublishRpcSpan( + protected static final void endPublishRpcSpan( Span publishRpcSpan, boolean enableOpenTelemetryTracing) { if (enableOpenTelemetryTracing && publishRpcSpan != null) { publishRpcSpan.end(); @@ -100,7 +104,7 @@ public static final void endPublishRpcSpan( * Sets an error status and records an exception when an exception is thrown when publishing the * message batch. */ - public static final void setPublishRpcSpanException( + protected static final void setPublishRpcSpanException( Span publishRpcSpan, Throwable t, boolean enableOpenTelemetryTracing) { if (enableOpenTelemetryTracing && publishRpcSpan != null) { publishRpcSpan.setStatus(StatusCode.ERROR, "Exception thrown on publish RPC."); @@ -113,7 +117,7 @@ public static final void setPublishRpcSpanException( * Creates, starts, and returns spans for ModAck, Nack, and Ack RPC requests. Bi-directional links * to parent subscribe span for sampled messages are added. */ - public static final Span startSubscribeRpcSpan( + protected static final Span startSubscribeRpcSpan( Tracer tracer, String subscription, String rpcOperation, @@ -124,8 +128,8 @@ public static final Span startSubscribeRpcSpan( if (enableOpenTelemetryTracing && tracer != null) { String codeFunction = rpcOperation == "ack" - ? "StreamingSubscriberConnection.sendAckOperations" - : "StreamingSubscriberConnection.sendModAckOperations"; + ? "sendAckOperations" + : "sendModAckOperations"; SubscriptionName subscriptionName = SubscriptionName.parse(subscription); AttributesBuilder attributesBuilder = createCommonSpanAttributesBuilder( @@ -142,20 +146,24 @@ public static final Span startSubscribeRpcSpan( .put(RECEIPT_MODACK_ATTR_KEY, isReceiptModack); } - Span rpcSpan = + SpanBuilder rpcSpanBuilder = tracer .spanBuilder(subscriptionName.getSubscription() + " " + rpcOperation) .setSpanKind(SpanKind.CLIENT) - .setAllAttributes(attributesBuilder.build()) - .startSpan(); + .setAllAttributes(attributesBuilder.build()); + Attributes linkAttributes = + Attributes.builder() + .put(SemanticAttributes.MESSAGING_OPERATION, rpcOperation) + .build(); + for (PubsubMessageWrapper message : messages) { + if (message.getSubscriberSpan().getSpanContext().isSampled()) { + rpcSpanBuilder.addLink(message.getSubscriberSpan().getSpanContext(), linkAttributes); + } + } + Span rpcSpan = rpcSpanBuilder.startSpan(); for (PubsubMessageWrapper message : messages) { if (message.getSubscriberSpan().getSpanContext().isSampled()) { - Attributes linkAttributes = - Attributes.builder() - .put(SemanticAttributes.MESSAGING_OPERATION, rpcOperation) - .build(); - rpcSpan.addLink(message.getSubscriberSpan().getSpanContext(), linkAttributes); message.getSubscriberSpan().addLink(rpcSpan.getSpanContext(), linkAttributes); switch (rpcOperation) { case "ack": @@ -176,13 +184,13 @@ public static final Span startSubscribeRpcSpan( } /** Ends the given subscribe RPC span if it exists. */ - public static final void endSubscribeRpcSpan(Span rpcSpan, boolean enableOpenTelemetryTracing) { + protected static final void endSubscribeRpcSpan(Span rpcSpan, boolean enableOpenTelemetryTracing) { if (enableOpenTelemetryTracing && rpcSpan != null) { rpcSpan.end(); } } - public static final void setSubscribeRpcSpanException( + protected static final void setSubscribeRpcSpanException( Span rpcSpan, boolean isModack, int ackDeadline, diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubMessageWrapper.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubMessageWrapper.java index ed35ebb6d..4fa899ad3 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubMessageWrapper.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubMessageWrapper.java @@ -150,7 +150,7 @@ public void startPublisherSpan(Tracer tracer) { if (enableOpenTelemetryTracing && tracer != null) { AttributesBuilder attributesBuilder = OpenTelemetryUtil.createCommonSpanAttributesBuilder( - topicName.getTopic(), topicName.getProject(), "Publisher.publish", "create"); + topicName.getTopic(), topicName.getProject(), "publish", "create"); attributesBuilder.put(MESSAGE_SIZE_ATTR_KEY, message.getData().size()); if (!message.getOrderingKey().isEmpty()) { @@ -266,7 +266,7 @@ public void startSubscriberSpan(Tracer tracer, boolean exactlyOnceDeliveryEnable OpenTelemetryUtil.createCommonSpanAttributesBuilder( subscriptionName.getSubscription(), subscriptionName.getProject(), - "StreamingSubscriberConnection.onResponse", + "onResponse", null); attributesBuilder diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java index aeb21cce4..9106f69c4 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java @@ -164,7 +164,7 @@ public void testPublishSpansSuccess() { .containsEntry(SemanticAttributes.MESSAGING_SYSTEM, MESSAGING_SYSTEM_VALUE) .containsEntry(SemanticAttributes.MESSAGING_DESTINATION_NAME, FULL_TOPIC_NAME.getTopic()) .containsEntry(PROJECT_ATTR_KEY, FULL_TOPIC_NAME.getProject()) - .containsEntry(SemanticAttributes.CODE_FUNCTION, "Publisher.publishCall") + .containsEntry(SemanticAttributes.CODE_FUNCTION, "publishCall") .containsEntry(SemanticAttributes.MESSAGING_OPERATION, "publish") .containsEntry(SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, messageWrappers.size()); @@ -195,7 +195,7 @@ public void testPublishSpansSuccess() { .containsEntry(SemanticAttributes.MESSAGING_SYSTEM, MESSAGING_SYSTEM_VALUE) .containsEntry(SemanticAttributes.MESSAGING_DESTINATION_NAME, FULL_TOPIC_NAME.getTopic()) .containsEntry(PROJECT_ATTR_KEY, PROJECT_NAME) - .containsEntry(SemanticAttributes.CODE_FUNCTION, "Publisher.publish") + .containsEntry(SemanticAttributes.CODE_FUNCTION, "publish") .containsEntry(SemanticAttributes.MESSAGING_OPERATION, "create") .containsEntry(ORDERING_KEY_ATTR_KEY, ORDERING_KEY) .containsEntry(MESSAGE_SIZE_ATTR_KEY, messageSize) @@ -460,7 +460,7 @@ public void testSubscribeSpansSuccess() { SemanticAttributes.MESSAGING_DESTINATION_NAME, FULL_SUBSCRIPTION_NAME.getSubscription()) .containsEntry(PROJECT_ATTR_KEY, FULL_TOPIC_NAME.getProject()) .containsEntry( - SemanticAttributes.CODE_FUNCTION, "StreamingSubscriberConnection.sendModAckOperations") + SemanticAttributes.CODE_FUNCTION, "sendModAckOperations") .containsEntry(SemanticAttributes.MESSAGING_OPERATION, "modack") .containsEntry( SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, subscribeMessageWrappers.size()) @@ -488,7 +488,7 @@ public void testSubscribeSpansSuccess() { SemanticAttributes.MESSAGING_DESTINATION_NAME, FULL_SUBSCRIPTION_NAME.getSubscription()) .containsEntry(PROJECT_ATTR_KEY, FULL_TOPIC_NAME.getProject()) .containsEntry( - SemanticAttributes.CODE_FUNCTION, "StreamingSubscriberConnection.sendAckOperations") + SemanticAttributes.CODE_FUNCTION, "sendAckOperations") .containsEntry(SemanticAttributes.MESSAGING_OPERATION, "ack") .containsEntry( SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, subscribeMessageWrappers.size()); @@ -514,7 +514,7 @@ public void testSubscribeSpansSuccess() { SemanticAttributes.MESSAGING_DESTINATION_NAME, FULL_SUBSCRIPTION_NAME.getSubscription()) .containsEntry(PROJECT_ATTR_KEY, FULL_TOPIC_NAME.getProject()) .containsEntry( - SemanticAttributes.CODE_FUNCTION, "StreamingSubscriberConnection.sendModAckOperations") + SemanticAttributes.CODE_FUNCTION, "sendModAckOperations") .containsEntry(SemanticAttributes.MESSAGING_OPERATION, "nack") .containsEntry( SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, subscribeMessageWrappers.size()); @@ -562,7 +562,7 @@ public void testSubscribeSpansSuccess() { .containsEntry( SemanticAttributes.MESSAGING_DESTINATION_NAME, FULL_SUBSCRIPTION_NAME.getSubscription()) .containsEntry(PROJECT_ATTR_KEY, PROJECT_NAME) - .containsEntry(SemanticAttributes.CODE_FUNCTION, "StreamingSubscriberConnection.onResponse") + .containsEntry(SemanticAttributes.CODE_FUNCTION, "onResponse") .containsEntry(MESSAGE_SIZE_ATTR_KEY, messageSize) .containsEntry(ORDERING_KEY_ATTR_KEY, ORDERING_KEY) .containsEntry(MESSAGE_ACK_ID_ATTR_KEY, ACK_ID)