Skip to content

Commit

Permalink
Revert "feat: Use MessagingIncubatingAttributes for gcp_pubsub attrib…
Browse files Browse the repository at this point in the history
…ute names"

This reverts commit 305610e.
  • Loading branch information
michaelpri10 committed Sep 30, 2024
1 parent e8dce75 commit acd208c
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 33 deletions.
5 changes: 0 additions & 5 deletions google-cloud-pubsub/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,6 @@
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-semconv</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry.semconv</groupId>
<artifactId>opentelemetry-semconv-incubating</artifactId>
<version>1.27.0-alpha</version>
</dependency>

<!-- Test dependencies -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.util.List;

Expand All @@ -41,8 +40,14 @@ public class OpenTelemetryPubsubTracer {
"subscriber concurrency control";
private static final String SUBSCRIBE_SCHEDULER_SPAN_NAME = "subscriber scheduler";

private static final String MESSAGE_SIZE_ATTR_KEY = "messaging.message.body.size";
private static final String ORDERING_KEY_ATTR_KEY = "messaging.gcp_pubsub.message.ordering_key";
private static final String MESSAGE_ACK_ID_ATTR_KEY = "messaging.gcp_pubsub.message.ack_id";
private static final String MESSAGE_EXACTLY_ONCE_ATTR_KEY =
"messaging.gcp_pubsub.message.exactly_once_delivery";
private static final String MESSAGE_DELIVERY_ATTEMPT_ATTR_KEY =
"messaging.gcp_pubsub.message.delivery_attempt";
private static final String ACK_DEADLINE_ATTR_KEY = "messaging.gcp_pubsub.message.ack_deadline";
private static final String RECEIPT_MODACK_ATTR_KEY = "messaging.gcp_pubsub.is_receipt_modack";
private static final String PROJECT_ATTR_KEY = "gcp.project_id";
private static final String PUBLISH_RPC_SPAN_SUFFIX = " publish";
Expand Down Expand Up @@ -88,12 +93,9 @@ void startPublisherSpan(PubsubMessageWrapper message) {
createCommonSpanAttributesBuilder(
message.getTopicName(), message.getTopicProject(), "publish", "create");

attributesBuilder.put(
MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE, message.getDataSize());
attributesBuilder.put(MESSAGE_SIZE_ATTR_KEY, message.getDataSize());
if (!message.getOrderingKey().isEmpty()) {
attributesBuilder.put(
MessagingIncubatingAttributes.MESSAGING_GCP_PUBSUB_MESSAGE_ORDERING_KEY,
message.getOrderingKey());
attributesBuilder.put(ORDERING_KEY_ATTR_KEY, message.getOrderingKey());
}

Span publisherSpan =
Expand Down Expand Up @@ -237,18 +239,14 @@ void startSubscriberSpan(PubsubMessageWrapper message, boolean exactlyOnceDelive

attributesBuilder
.put(SemanticAttributes.MESSAGING_MESSAGE_ID, message.getMessageId())
.put(MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE, message.getDataSize())
.put(MessagingIncubatingAttributes.MESSAGING_GCP_PUBSUB_MESSAGE_ACK_ID, message.getAckId())
.put(MESSAGE_SIZE_ATTR_KEY, message.getDataSize())
.put(MESSAGE_ACK_ID_ATTR_KEY, message.getAckId())
.put(MESSAGE_EXACTLY_ONCE_ATTR_KEY, exactlyOnceDeliveryEnabled);
if (!message.getOrderingKey().isEmpty()) {
attributesBuilder.put(
MessagingIncubatingAttributes.MESSAGING_GCP_PUBSUB_MESSAGE_ORDERING_KEY,
message.getOrderingKey());
attributesBuilder.put(ORDERING_KEY_ATTR_KEY, message.getOrderingKey());
}
if (message.getDeliveryAttempt() > 0) {
attributesBuilder.put(
MessagingIncubatingAttributes.MESSAGING_GCP_PUBSUB_MESSAGE_DELIVERY_ATTEMPT,
message.getDeliveryAttempt());
attributesBuilder.put(MESSAGE_DELIVERY_ATTEMPT_ATTR_KEY, message.getDeliveryAttempt());
}
Attributes attributes = attributesBuilder.build();
Context publisherSpanContext = message.extractSpanContext(attributes);
Expand Down Expand Up @@ -382,7 +380,7 @@ Span startSubscribeRpcSpan(
// Ack deadline and receipt modack are specific to the modack operation
if (rpcOperation == "modack") {
attributesBuilder
.put(MessagingIncubatingAttributes.MESSAGING_GCP_PUBSUB_MESSAGE_ACK_DEADLINE, ackDeadline)
.put(ACK_DEADLINE_ATTR_KEY, ackDeadline)
.put(RECEIPT_MODACK_ATTR_KEY, isReceiptModack);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -721,7 +721,7 @@ public Builder setEnableOpenTelemetryTracing(boolean enableOpenTelemetryTracing)
return this;
}

/** Sets the instance of OpenTelemetry for the Subscriber class. */
/** Sets the instance of OpenTelemetry for the Publisher class. */
public Builder setOpenTelemetry(OpenTelemetry openTelemetry) {
this.openTelemetry = openTelemetry;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import io.opentelemetry.sdk.trace.data.LinkData;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.data.StatusData;
import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -85,10 +84,16 @@ public class OpenTelemetryTest {

private static final String MESSAGING_SYSTEM_VALUE = "gcp_pubsub";
private static final String PROJECT_ATTR_KEY = "gcp.project_id";
private static final String MESSAGE_SIZE_ATTR_KEY = "messaging.message.body.size";
private static final String ORDERING_KEY_ATTR_KEY = "messaging.gcp_pubsub.message.ordering_key";
private static final String ACK_DEADLINE_ATTR_KEY = "messaging.gcp_pubsub.message.ack_deadline";
private static final String RECEIPT_MODACK_ATTR_KEY = "messaging.gcp_pubsub.is_receipt_modack";
private static final String MESSAGE_ACK_ID_ATTR_KEY = "messaging.gcp_pubsub.message.ack_id";
private static final String MESSAGE_EXACTLY_ONCE_ATTR_KEY =
"messaging.gcp_pubsub.message.exactly_once_delivery";
private static final String MESSAGE_RESULT_ATTR_KEY = "messaging.gcp_pubsub.result";
private static final String MESSAGE_DELIVERY_ATTEMPT_ATTR_KEY =
"messaging.gcp_pubsub.message.delivery_attempt";

private static final String TRACEPARENT_ATTRIBUTE = "googclient_traceparent";

Expand Down Expand Up @@ -190,9 +195,8 @@ public void testPublishSpansSuccess() {
.containsEntry(PROJECT_ATTR_KEY, PROJECT_NAME)
.containsEntry(SemanticAttributes.CODE_FUNCTION, "publish")
.containsEntry(SemanticAttributes.MESSAGING_OPERATION, "create")
.containsEntry(
MessagingIncubatingAttributes.MESSAGING_GCP_PUBSUB_MESSAGE_ORDERING_KEY, ORDERING_KEY)
.containsEntry(MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE, messageSize)
.containsEntry(ORDERING_KEY_ATTR_KEY, ORDERING_KEY)
.containsEntry(MESSAGE_SIZE_ATTR_KEY, messageSize)
.containsEntry(SemanticAttributes.MESSAGING_MESSAGE_ID, MESSAGE_ID);

// Check that the message has the attribute containing the trace context.
Expand Down Expand Up @@ -402,7 +406,7 @@ public void testSubscribeSpansSuccess() {
.containsEntry(SemanticAttributes.MESSAGING_OPERATION, "modack")
.containsEntry(
SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, subscribeMessageWrappers.size())
.containsEntry(MessagingIncubatingAttributes.MESSAGING_GCP_PUBSUB_MESSAGE_ACK_DEADLINE, 10)
.containsEntry(ACK_DEADLINE_ATTR_KEY, 10)
.containsEntry(RECEIPT_MODACK_ATTR_KEY, true);

// Check span data, links, and attributes for the ack RPC span
Expand Down Expand Up @@ -499,13 +503,10 @@ public void testSubscribeSpansSuccess() {
SemanticAttributes.MESSAGING_DESTINATION_NAME, FULL_SUBSCRIPTION_NAME.getSubscription())
.containsEntry(PROJECT_ATTR_KEY, PROJECT_NAME)
.containsEntry(SemanticAttributes.CODE_FUNCTION, "onResponse")
.containsEntry(MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE, messageSize)
.containsEntry(
MessagingIncubatingAttributes.MESSAGING_GCP_PUBSUB_MESSAGE_ORDERING_KEY, ORDERING_KEY)
.containsEntry(MessagingIncubatingAttributes.MESSAGING_GCP_PUBSUB_MESSAGE_ACK_ID, ACK_ID)
.containsEntry(
MessagingIncubatingAttributes.MESSAGING_GCP_PUBSUB_MESSAGE_DELIVERY_ATTEMPT,
DELIVERY_ATTEMPT)
.containsEntry(MESSAGE_SIZE_ATTR_KEY, messageSize)
.containsEntry(ORDERING_KEY_ATTR_KEY, ORDERING_KEY)
.containsEntry(MESSAGE_ACK_ID_ATTR_KEY, ACK_ID)
.containsEntry(MESSAGE_DELIVERY_ATTEMPT_ATTR_KEY, DELIVERY_ATTEMPT)
.containsEntry(MESSAGE_EXACTLY_ONCE_ATTR_KEY, EXACTLY_ONCE_ENABLED)
.containsEntry(MESSAGE_RESULT_ATTR_KEY, PROCESS_ACTION)
.containsEntry(SemanticAttributes.MESSAGING_MESSAGE_ID, MESSAGE_ID);
Expand Down

0 comments on commit acd208c

Please sign in to comment.