Skip to content

Commit 305610e

Browse files
committed
feat: Use MessagingIncubatingAttributes for gcp_pubsub attribute names
1 parent a08d169 commit 305610e

File tree

5 files changed

+39
-27
lines changed

5 files changed

+39
-27
lines changed

google-cloud-pubsub/pom.xml

+5
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,11 @@
112112
<groupId>io.opentelemetry</groupId>
113113
<artifactId>opentelemetry-semconv</artifactId>
114114
</dependency>
115+
<dependency>
116+
<groupId>io.opentelemetry.semconv</groupId>
117+
<artifactId>opentelemetry-semconv-incubating</artifactId>
118+
<version>1.27.0-alpha</version>
119+
</dependency>
115120

116121
<!-- Test dependencies -->
117122
<dependency>

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryPubsubTracer.java

+8-13
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import io.opentelemetry.api.trace.StatusCode;
2828
import io.opentelemetry.api.trace.Tracer;
2929
import io.opentelemetry.context.Context;
30+
import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes;
3031
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
3132
import java.util.List;
3233

@@ -40,14 +41,8 @@ public class OpenTelemetryPubsubTracer {
4041
"subscriber concurrency control";
4142
private static final String SUBSCRIBE_SCHEDULER_SPAN_NAME = "subscriber scheduler";
4243

43-
private static final String MESSAGE_SIZE_ATTR_KEY = "messaging.message.body.size";
44-
private static final String ORDERING_KEY_ATTR_KEY = "messaging.gcp_pubsub.message.ordering_key";
45-
private static final String MESSAGE_ACK_ID_ATTR_KEY = "messaging.gcp_pubsub.message.ack_id";
4644
private static final String MESSAGE_EXACTLY_ONCE_ATTR_KEY =
4745
"messaging.gcp_pubsub.message.exactly_once_delivery";
48-
private static final String MESSAGE_DELIVERY_ATTEMPT_ATTR_KEY =
49-
"messaging.gcp_pubsub.message.delivery_attempt";
50-
private static final String ACK_DEADLINE_ATTR_KEY = "messaging.gcp_pubsub.message.ack_deadline";
5146
private static final String RECEIPT_MODACK_ATTR_KEY = "messaging.gcp_pubsub.is_receipt_modack";
5247
private static final String PROJECT_ATTR_KEY = "gcp.project_id";
5348
private static final String PUBLISH_RPC_SPAN_SUFFIX = " publish";
@@ -93,9 +88,9 @@ void startPublisherSpan(PubsubMessageWrapper message) {
9388
createCommonSpanAttributesBuilder(
9489
message.getTopicName(), message.getTopicProject(), "publish", "create");
9590

96-
attributesBuilder.put(MESSAGE_SIZE_ATTR_KEY, message.getDataSize());
91+
attributesBuilder.put(MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE, message.getDataSize());
9792
if (!message.getOrderingKey().isEmpty()) {
98-
attributesBuilder.put(ORDERING_KEY_ATTR_KEY, message.getOrderingKey());
93+
attributesBuilder.put(MessagingIncubatingAttributes.MESSAGING_GCP_PUBSUB_MESSAGE_ORDERING_KEY, message.getOrderingKey());
9994
}
10095

10196
Span publisherSpan =
@@ -239,14 +234,14 @@ void startSubscriberSpan(PubsubMessageWrapper message, boolean exactlyOnceDelive
239234

240235
attributesBuilder
241236
.put(SemanticAttributes.MESSAGING_MESSAGE_ID, message.getMessageId())
242-
.put(MESSAGE_SIZE_ATTR_KEY, message.getDataSize())
243-
.put(MESSAGE_ACK_ID_ATTR_KEY, message.getAckId())
237+
.put(MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE, message.getDataSize())
238+
.put(MessagingIncubatingAttributes.MESSAGING_GCP_PUBSUB_MESSAGE_ACK_ID, message.getAckId())
244239
.put(MESSAGE_EXACTLY_ONCE_ATTR_KEY, exactlyOnceDeliveryEnabled);
245240
if (!message.getOrderingKey().isEmpty()) {
246-
attributesBuilder.put(ORDERING_KEY_ATTR_KEY, message.getOrderingKey());
241+
attributesBuilder.put(MessagingIncubatingAttributes.MESSAGING_GCP_PUBSUB_MESSAGE_ORDERING_KEY, message.getOrderingKey());
247242
}
248243
if (message.getDeliveryAttempt() > 0) {
249-
attributesBuilder.put(MESSAGE_DELIVERY_ATTEMPT_ATTR_KEY, message.getDeliveryAttempt());
244+
attributesBuilder.put(MessagingIncubatingAttributes.MESSAGING_GCP_PUBSUB_MESSAGE_DELIVERY_ATTEMPT, message.getDeliveryAttempt());
250245
}
251246
Attributes attributes = attributesBuilder.build();
252247
Context publisherSpanContext = message.extractSpanContext(attributes);
@@ -380,7 +375,7 @@ Span startSubscribeRpcSpan(
380375
// Ack deadline and receipt modack are specific to the modack operation
381376
if (rpcOperation == "modack") {
382377
attributesBuilder
383-
.put(ACK_DEADLINE_ATTR_KEY, ackDeadline)
378+
.put(MessagingIncubatingAttributes.MESSAGING_GCP_PUBSUB_MESSAGE_ACK_DEADLINE, ackDeadline)
384379
.put(RECEIPT_MODACK_ATTR_KEY, isReceiptModack);
385380
}
386381

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java

+9
Original file line numberDiff line numberDiff line change
@@ -927,6 +927,15 @@ public Builder setCompressionBytesThreshold(long compressionBytesThreshold) {
927927
return this;
928928
}
929929

930+
931+
/**
932+
* OpenTelemetry will be enabled if setEnableOpenTelemetry is true and and instance of OpenTelemetry has been provied.
933+
Warning: traces are subject to change. The name and attributes of a span might
934+
change without notice. Only use run traces interactively. Don't use in
935+
automation. Running non-interactive traces can cause problems if the underlying
936+
trace architecture changes without notice.
937+
*/
938+
930939
/** Gives the ability to enable Open Telemetry Tracing */
931940
public Builder setEnableOpenTelemetryTracing(boolean enableOpenTelemetryTracing) {
932941
this.enableOpenTelemetryTracing = enableOpenTelemetryTracing;

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -707,13 +707,21 @@ Builder setClock(ApiClock clock) {
707707
return this;
708708
}
709709

710+
/**
711+
* OpenTelemetry will be enabled if setEnableOpenTelemetry is true and and instance of OpenTelemetry has been provied.
712+
Warning: traces are subject to change. The name and attributes of a span might
713+
change without notice. Only use run traces interactively. Don't use in
714+
automation. Running non-interactive traces can cause problems if the underlying
715+
trace architecture changes without notice.
716+
*/
717+
710718
/** Gives the ability to enable Open Telemetry Tracing */
711719
public Builder setEnableOpenTelemetryTracing(boolean enableOpenTelemetryTracing) {
712720
this.enableOpenTelemetryTracing = enableOpenTelemetryTracing;
713721
return this;
714722
}
715723

716-
/** Sets the instance of OpenTelemetry for the Publisher class. */
724+
/** Sets the instance of OpenTelemetry for the Subscriber class. */
717725
public Builder setOpenTelemetry(OpenTelemetry openTelemetry) {
718726
this.openTelemetry = openTelemetry;
719727
return this;

google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java

+8-13
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import io.opentelemetry.sdk.trace.data.LinkData;
3636
import io.opentelemetry.sdk.trace.data.SpanData;
3737
import io.opentelemetry.sdk.trace.data.StatusData;
38+
import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes;
3839
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
3940
import java.util.Arrays;
4041
import java.util.List;
@@ -84,16 +85,10 @@ public class OpenTelemetryTest {
8485

8586
private static final String MESSAGING_SYSTEM_VALUE = "gcp_pubsub";
8687
private static final String PROJECT_ATTR_KEY = "gcp.project_id";
87-
private static final String MESSAGE_SIZE_ATTR_KEY = "messaging.message.body.size";
88-
private static final String ORDERING_KEY_ATTR_KEY = "messaging.gcp_pubsub.message.ordering_key";
89-
private static final String ACK_DEADLINE_ATTR_KEY = "messaging.gcp_pubsub.message.ack_deadline";
9088
private static final String RECEIPT_MODACK_ATTR_KEY = "messaging.gcp_pubsub.is_receipt_modack";
91-
private static final String MESSAGE_ACK_ID_ATTR_KEY = "messaging.gcp_pubsub.message.ack_id";
9289
private static final String MESSAGE_EXACTLY_ONCE_ATTR_KEY =
9390
"messaging.gcp_pubsub.message.exactly_once_delivery";
9491
private static final String MESSAGE_RESULT_ATTR_KEY = "messaging.gcp_pubsub.result";
95-
private static final String MESSAGE_DELIVERY_ATTEMPT_ATTR_KEY =
96-
"messaging.gcp_pubsub.message.delivery_attempt";
9792

9893
private static final String TRACEPARENT_ATTRIBUTE = "googclient_traceparent";
9994

@@ -195,8 +190,8 @@ public void testPublishSpansSuccess() {
195190
.containsEntry(PROJECT_ATTR_KEY, PROJECT_NAME)
196191
.containsEntry(SemanticAttributes.CODE_FUNCTION, "publish")
197192
.containsEntry(SemanticAttributes.MESSAGING_OPERATION, "create")
198-
.containsEntry(ORDERING_KEY_ATTR_KEY, ORDERING_KEY)
199-
.containsEntry(MESSAGE_SIZE_ATTR_KEY, messageSize)
193+
.containsEntry(MessagingIncubatingAttributes.MESSAGING_GCP_PUBSUB_MESSAGE_ORDERING_KEY, ORDERING_KEY)
194+
.containsEntry(MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE, messageSize)
200195
.containsEntry(SemanticAttributes.MESSAGING_MESSAGE_ID, MESSAGE_ID);
201196

202197
// Check that the message has the attribute containing the trace context.
@@ -406,7 +401,7 @@ public void testSubscribeSpansSuccess() {
406401
.containsEntry(SemanticAttributes.MESSAGING_OPERATION, "modack")
407402
.containsEntry(
408403
SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, subscribeMessageWrappers.size())
409-
.containsEntry(ACK_DEADLINE_ATTR_KEY, 10)
404+
.containsEntry(MessagingIncubatingAttributes.MESSAGING_GCP_PUBSUB_MESSAGE_ACK_DEADLINE, 10)
410405
.containsEntry(RECEIPT_MODACK_ATTR_KEY, true);
411406

412407
// Check span data, links, and attributes for the ack RPC span
@@ -503,10 +498,10 @@ public void testSubscribeSpansSuccess() {
503498
SemanticAttributes.MESSAGING_DESTINATION_NAME, FULL_SUBSCRIPTION_NAME.getSubscription())
504499
.containsEntry(PROJECT_ATTR_KEY, PROJECT_NAME)
505500
.containsEntry(SemanticAttributes.CODE_FUNCTION, "onResponse")
506-
.containsEntry(MESSAGE_SIZE_ATTR_KEY, messageSize)
507-
.containsEntry(ORDERING_KEY_ATTR_KEY, ORDERING_KEY)
508-
.containsEntry(MESSAGE_ACK_ID_ATTR_KEY, ACK_ID)
509-
.containsEntry(MESSAGE_DELIVERY_ATTEMPT_ATTR_KEY, DELIVERY_ATTEMPT)
501+
.containsEntry(MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE, messageSize)
502+
.containsEntry(MessagingIncubatingAttributes.MESSAGING_GCP_PUBSUB_MESSAGE_ORDERING_KEY, ORDERING_KEY)
503+
.containsEntry(MessagingIncubatingAttributes.MESSAGING_GCP_PUBSUB_MESSAGE_ACK_ID, ACK_ID)
504+
.containsEntry(MessagingIncubatingAttributes.MESSAGING_GCP_PUBSUB_MESSAGE_DELIVERY_ATTEMPT, DELIVERY_ATTEMPT)
510505
.containsEntry(MESSAGE_EXACTLY_ONCE_ATTR_KEY, EXACTLY_ONCE_ENABLED)
511506
.containsEntry(MESSAGE_RESULT_ATTR_KEY, PROCESS_ACTION)
512507
.containsEntry(SemanticAttributes.MESSAGING_MESSAGE_ID, MESSAGE_ID);

0 commit comments

Comments
 (0)