Skip to content

Commit

Permalink
feat: Changes to OpenTelemetry implementation to add links earlier an…
Browse files Browse the repository at this point in the history
…d prevent methods from being exposed to users
  • Loading branch information
michaelpri10 committed Aug 29, 2024
1 parent 2ab460c commit 6c5e03c
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 33 deletions.
1 change: 0 additions & 1 deletion google-cloud-pubsub/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-semconv</artifactId>
<version>1.26.0-alpha</version>
</dependency>

<!-- Test dependencies -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
Expand All @@ -36,7 +37,7 @@ public class OpenTelemetryUtil {
private static final String PUBLISH_RPC_SPAN_SUFFIX = " publish";

/** Populates attributes that are common the publisher parent span and publish RPC span. */
public static final AttributesBuilder createCommonSpanAttributesBuilder(
protected static final AttributesBuilder createCommonSpanAttributesBuilder(
String destinationName, String projectName, String codeFunction, String operation) {
AttributesBuilder attributesBuilder =
Attributes.builder()
Expand All @@ -55,7 +56,7 @@ public static final AttributesBuilder createCommonSpanAttributesBuilder(
* Creates, starts, and returns a publish RPC span for the given message batch. Bi-directional
* links with the publisher parent span are created for sampled messages in the batch.
*/
public static final Span startPublishRpcSpan(
protected static final Span startPublishRpcSpan(
Tracer tracer,
String topic,
List<PubsubMessageWrapper> messages,
Expand All @@ -64,21 +65,24 @@ public static final Span startPublishRpcSpan(
TopicName topicName = TopicName.parse(topic);
Attributes attributes =
createCommonSpanAttributesBuilder(
topicName.getTopic(), topicName.getProject(), "Publisher.publishCall", "publish")
topicName.getTopic(), topicName.getProject(), "publishCall", "publish")
.put(SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, messages.size())
.build();
Span publishRpcSpan =
SpanBuilder publishRpcSpanBuilder =
tracer
.spanBuilder(topicName.getTopic() + PUBLISH_RPC_SPAN_SUFFIX)
.setSpanKind(SpanKind.CLIENT)
.setAllAttributes(attributes)
.startSpan();
.setAllAttributes(attributes);
Attributes linkAttributes =
Attributes.builder().put(SemanticAttributes.MESSAGING_OPERATION, "publish").build();
for (PubsubMessageWrapper message : messages) {
if (message.getPublisherSpan().getSpanContext().isSampled())
publishRpcSpanBuilder.addLink(message.getPublisherSpan().getSpanContext(), linkAttributes);
}
Span publishRpcSpan = publishRpcSpanBuilder.startSpan();

for (PubsubMessageWrapper message : messages) {
if (message.getPublisherSpan().getSpanContext().isSampled()) {
Attributes linkAttributes =
Attributes.builder().put(SemanticAttributes.MESSAGING_OPERATION, "publish").build();
publishRpcSpan.addLink(message.getPublisherSpan().getSpanContext(), linkAttributes);
message.getPublisherSpan().addLink(publishRpcSpan.getSpanContext(), linkAttributes);
message.addPublishStartEvent();
}
Expand All @@ -89,7 +93,7 @@ public static final Span startPublishRpcSpan(
}

/** Ends the given publish RPC span if it exists. */
public static final void endPublishRpcSpan(
protected static final void endPublishRpcSpan(
Span publishRpcSpan, boolean enableOpenTelemetryTracing) {
if (enableOpenTelemetryTracing && publishRpcSpan != null) {
publishRpcSpan.end();
Expand All @@ -100,7 +104,7 @@ public static final void endPublishRpcSpan(
* Sets an error status and records an exception when an exception is thrown when publishing the
* message batch.
*/
public static final void setPublishRpcSpanException(
protected static final void setPublishRpcSpanException(
Span publishRpcSpan, Throwable t, boolean enableOpenTelemetryTracing) {
if (enableOpenTelemetryTracing && publishRpcSpan != null) {
publishRpcSpan.setStatus(StatusCode.ERROR, "Exception thrown on publish RPC.");
Expand All @@ -113,7 +117,7 @@ public static final void setPublishRpcSpanException(
* Creates, starts, and returns spans for ModAck, Nack, and Ack RPC requests. Bi-directional links
* to parent subscribe span for sampled messages are added.
*/
public static final Span startSubscribeRpcSpan(
protected static final Span startSubscribeRpcSpan(
Tracer tracer,
String subscription,
String rpcOperation,
Expand All @@ -124,8 +128,8 @@ public static final Span startSubscribeRpcSpan(
if (enableOpenTelemetryTracing && tracer != null) {
String codeFunction =
rpcOperation == "ack"
? "StreamingSubscriberConnection.sendAckOperations"
: "StreamingSubscriberConnection.sendModAckOperations";
? "sendAckOperations"
: "sendModAckOperations";
SubscriptionName subscriptionName = SubscriptionName.parse(subscription);
AttributesBuilder attributesBuilder =
createCommonSpanAttributesBuilder(
Expand All @@ -142,20 +146,24 @@ public static final Span startSubscribeRpcSpan(
.put(RECEIPT_MODACK_ATTR_KEY, isReceiptModack);
}

Span rpcSpan =
SpanBuilder rpcSpanBuilder =
tracer
.spanBuilder(subscriptionName.getSubscription() + " " + rpcOperation)
.setSpanKind(SpanKind.CLIENT)
.setAllAttributes(attributesBuilder.build())
.startSpan();
.setAllAttributes(attributesBuilder.build());
Attributes linkAttributes =
Attributes.builder()
.put(SemanticAttributes.MESSAGING_OPERATION, rpcOperation)
.build();
for (PubsubMessageWrapper message : messages) {
if (message.getSubscriberSpan().getSpanContext().isSampled()) {
rpcSpanBuilder.addLink(message.getSubscriberSpan().getSpanContext(), linkAttributes);
}
}
Span rpcSpan = rpcSpanBuilder.startSpan();

for (PubsubMessageWrapper message : messages) {
if (message.getSubscriberSpan().getSpanContext().isSampled()) {
Attributes linkAttributes =
Attributes.builder()
.put(SemanticAttributes.MESSAGING_OPERATION, rpcOperation)
.build();
rpcSpan.addLink(message.getSubscriberSpan().getSpanContext(), linkAttributes);
message.getSubscriberSpan().addLink(rpcSpan.getSpanContext(), linkAttributes);
switch (rpcOperation) {
case "ack":
Expand All @@ -176,13 +184,13 @@ public static final Span startSubscribeRpcSpan(
}

/** Ends the given subscribe RPC span if it exists. */
public static final void endSubscribeRpcSpan(Span rpcSpan, boolean enableOpenTelemetryTracing) {
protected static final void endSubscribeRpcSpan(Span rpcSpan, boolean enableOpenTelemetryTracing) {
if (enableOpenTelemetryTracing && rpcSpan != null) {
rpcSpan.end();
}
}

public static final void setSubscribeRpcSpanException(
protected static final void setSubscribeRpcSpanException(
Span rpcSpan,
boolean isModack,
int ackDeadline,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ public void startPublisherSpan(Tracer tracer) {
if (enableOpenTelemetryTracing && tracer != null) {
AttributesBuilder attributesBuilder =
OpenTelemetryUtil.createCommonSpanAttributesBuilder(
topicName.getTopic(), topicName.getProject(), "Publisher.publish", "create");
topicName.getTopic(), topicName.getProject(), "publish", "create");

attributesBuilder.put(MESSAGE_SIZE_ATTR_KEY, message.getData().size());
if (!message.getOrderingKey().isEmpty()) {
Expand Down Expand Up @@ -266,7 +266,7 @@ public void startSubscriberSpan(Tracer tracer, boolean exactlyOnceDeliveryEnable
OpenTelemetryUtil.createCommonSpanAttributesBuilder(
subscriptionName.getSubscription(),
subscriptionName.getProject(),
"StreamingSubscriberConnection.onResponse",
"onResponse",
null);

attributesBuilder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public void testPublishSpansSuccess() {
.containsEntry(SemanticAttributes.MESSAGING_SYSTEM, MESSAGING_SYSTEM_VALUE)
.containsEntry(SemanticAttributes.MESSAGING_DESTINATION_NAME, FULL_TOPIC_NAME.getTopic())
.containsEntry(PROJECT_ATTR_KEY, FULL_TOPIC_NAME.getProject())
.containsEntry(SemanticAttributes.CODE_FUNCTION, "Publisher.publishCall")
.containsEntry(SemanticAttributes.CODE_FUNCTION, "publishCall")
.containsEntry(SemanticAttributes.MESSAGING_OPERATION, "publish")
.containsEntry(SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, messageWrappers.size());

Expand Down Expand Up @@ -195,7 +195,7 @@ public void testPublishSpansSuccess() {
.containsEntry(SemanticAttributes.MESSAGING_SYSTEM, MESSAGING_SYSTEM_VALUE)
.containsEntry(SemanticAttributes.MESSAGING_DESTINATION_NAME, FULL_TOPIC_NAME.getTopic())
.containsEntry(PROJECT_ATTR_KEY, PROJECT_NAME)
.containsEntry(SemanticAttributes.CODE_FUNCTION, "Publisher.publish")
.containsEntry(SemanticAttributes.CODE_FUNCTION, "publish")
.containsEntry(SemanticAttributes.MESSAGING_OPERATION, "create")
.containsEntry(ORDERING_KEY_ATTR_KEY, ORDERING_KEY)
.containsEntry(MESSAGE_SIZE_ATTR_KEY, messageSize)
Expand Down Expand Up @@ -460,7 +460,7 @@ public void testSubscribeSpansSuccess() {
SemanticAttributes.MESSAGING_DESTINATION_NAME, FULL_SUBSCRIPTION_NAME.getSubscription())
.containsEntry(PROJECT_ATTR_KEY, FULL_TOPIC_NAME.getProject())
.containsEntry(
SemanticAttributes.CODE_FUNCTION, "StreamingSubscriberConnection.sendModAckOperations")
SemanticAttributes.CODE_FUNCTION, "sendModAckOperations")
.containsEntry(SemanticAttributes.MESSAGING_OPERATION, "modack")
.containsEntry(
SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, subscribeMessageWrappers.size())
Expand Down Expand Up @@ -488,7 +488,7 @@ public void testSubscribeSpansSuccess() {
SemanticAttributes.MESSAGING_DESTINATION_NAME, FULL_SUBSCRIPTION_NAME.getSubscription())
.containsEntry(PROJECT_ATTR_KEY, FULL_TOPIC_NAME.getProject())
.containsEntry(
SemanticAttributes.CODE_FUNCTION, "StreamingSubscriberConnection.sendAckOperations")
SemanticAttributes.CODE_FUNCTION, "sendAckOperations")
.containsEntry(SemanticAttributes.MESSAGING_OPERATION, "ack")
.containsEntry(
SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, subscribeMessageWrappers.size());
Expand All @@ -514,7 +514,7 @@ public void testSubscribeSpansSuccess() {
SemanticAttributes.MESSAGING_DESTINATION_NAME, FULL_SUBSCRIPTION_NAME.getSubscription())
.containsEntry(PROJECT_ATTR_KEY, FULL_TOPIC_NAME.getProject())
.containsEntry(
SemanticAttributes.CODE_FUNCTION, "StreamingSubscriberConnection.sendModAckOperations")
SemanticAttributes.CODE_FUNCTION, "sendModAckOperations")
.containsEntry(SemanticAttributes.MESSAGING_OPERATION, "nack")
.containsEntry(
SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, subscribeMessageWrappers.size());
Expand Down Expand Up @@ -562,7 +562,7 @@ public void testSubscribeSpansSuccess() {
.containsEntry(
SemanticAttributes.MESSAGING_DESTINATION_NAME, FULL_SUBSCRIPTION_NAME.getSubscription())
.containsEntry(PROJECT_ATTR_KEY, PROJECT_NAME)
.containsEntry(SemanticAttributes.CODE_FUNCTION, "StreamingSubscriberConnection.onResponse")
.containsEntry(SemanticAttributes.CODE_FUNCTION, "onResponse")
.containsEntry(MESSAGE_SIZE_ATTR_KEY, messageSize)
.containsEntry(ORDERING_KEY_ATTR_KEY, ORDERING_KEY)
.containsEntry(MESSAGE_ACK_ID_ATTR_KEY, ACK_ID)
Expand Down

0 comments on commit 6c5e03c

Please sign in to comment.