Skip to content

Commit

Permalink
chore(otel): use newer messaging semantic conventions (#14373)
Browse files Browse the repository at this point in the history
  • Loading branch information
dbolduc authored Jun 26, 2024
1 parent 37a1b7f commit 04cad43
Show file tree
Hide file tree
Showing 19 changed files with 157 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span> StartPublishSpan(
{{sc::kMessagingSystem, "gcp_pubsub"},
{sc::kMessagingDestinationName, topic.topic_id()},
{"gcp.project_id", topic.project_id()},
{sc::kMessagingOperation, "create"},
{/*sc::kMessagingOperationType=*/"messaging.operation.type", "create"},
{/*sc::kMessagingMessageEnvelopeSize=*/"messaging.message.envelope.size",
static_cast<std::int64_t>(MessageSize(m))},
{sc::kCodeFunction, "pubsub::BlockingPublisher::Publish"}},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span> StartPublishSpan(
{{sc::kMessagingSystem, "gcp_pubsub"},
{sc::kMessagingDestinationName, topic.topic_id()},
{"gcp.project_id", topic.project_id()},
{sc::kMessagingOperation, "create"},
{/*sc::kMessagingOperationType=*/"messaging.operation.type", "create"},
{/*sc::kMessagingMessageEnvelopeSize=*/"messaging.message.envelope.size",
static_cast<std::int64_t>(MessageSize(m))},
{sc::kCodeFunction, "pubsub::PublisherConnection::Publish"}},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,9 @@ TEST(PublisherTracingConnectionTest, PublishSpanOnSuccess) {
OTelAttribute<std::int64_t>(/*sc::kMessagingMessageEnvelopeSize=*/
"messaging.message.envelope.size",
45),
OTelAttribute<std::string>(sc::kMessagingOperation, "create"),
OTelAttribute<std::string>(
/*sc::kMessagingOperationType=*/"messaging.operation.type",
"create"),
OTelAttribute<std::string>(sc::kMessagingMessageId, "test-id-0"),
OTelAttribute<std::string>(
sc::kCodeFunction,
Expand Down Expand Up @@ -137,7 +139,9 @@ TEST(PublisherTracingConnectionTest, PublishSpanOnError) {
OTelAttribute<std::string>(
"messaging.gcp_pubsub.message.ordering_key",
"ordering-key-0"),
OTelAttribute<std::string>(sc::kMessagingOperation, "create"),
OTelAttribute<std::string>(
/*sc::kMessagingOperationType=*/"messaging.operation.type",
"create"),
OTelAttribute<int>("gl-cpp.status_code", kErrorCode),
OTelAttribute<std::int64_t>(/*sc::kMessagingMessageEnvelopeSize=*/
"messaging.message.envelope.size",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span> StartPullSpan() {
auto span = internal::MakeSpan(
subscription.subscription_id() + " receive",
{{sc::kMessagingSystem, "gcp_pubsub"},
{sc::kMessagingOperation, "receive"},
{/*sc::kMessagingOperationType=*/"messaging.operation.type", "receive"},
{sc::kCodeFunction, "pubsub::SubscriberConnection::Pull"},
{"gcp.project_id", subscription.project_id()},
{sc::kMessagingDestinationName, subscription.subscription_id()}},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,12 @@ TEST(SubscriberTracingConnectionTest, PullAttributes) {
SpanHasAttributes(OTelAttribute<std::string>(
sc::kCodeFunction,
"pubsub::SubscriberConnection::Pull")))));
EXPECT_THAT(spans,
Contains(AllOf(SpanNamed("test-subscription receive"),
SpanHasAttributes(OTelAttribute<std::string>(
sc::kMessagingOperation, "receive")))));
EXPECT_THAT(
spans, Contains(AllOf(
SpanNamed("test-subscription receive"),
SpanHasAttributes(OTelAttribute<std::string>(
/*sc::kMessagingOperationType=*/"messaging.operation.type",
"receive")))));
EXPECT_THAT(spans,
Contains(AllOf(SpanNamed("test-subscription receive"),
SpanHasAttributes(OTelAttribute<std::string>(
Expand Down
5 changes: 3 additions & 2 deletions google/cloud/pubsub/internal/tracing_batch_callback.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span> StartSubscribeSpan(
auto span = internal::MakeSpan(
subscription.subscription_id() + " subscribe",
{{sc::kMessagingSystem, "gcp_pubsub"},
{sc::kMessagingOperation, "subscribe"},
{/*sc::kMessagingOperationType=*/"messaging.operation.type",
"subscribe"},
{"gcp.project_id", subscription.project_id()},
{sc::kMessagingDestinationName, subscription.subscription_id()},
{sc::kMessagingMessageId, m.message_id()},
Expand Down Expand Up @@ -207,7 +208,7 @@ class TracingBatchCallback : public BatchCallback {
auto span = internal::MakeSpan(
subscription_.subscription_id() + " modack",
{{sc::kMessagingSystem, "gcp_pubsub"},
{sc::kMessagingOperation, "extend"},
{/*sc::kMessagingOperationType=*/"messaging.operation.type", "extend"},
{sc::kMessagingBatchMessageCount,
static_cast<int64_t>(request.ack_ids().size())},
{"messaging.gcp_pubsub.message.ack_deadline_seconds",
Expand Down
12 changes: 9 additions & 3 deletions google/cloud/pubsub/internal/tracing_batch_callback_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,9 @@ TEST(TracingBatchCallback, StartAndEndModackSpanForOneMessage) {
SpanHasAttributes(
OTelAttribute<std::string>(sc::kMessagingSystem, "gcp_pubsub"),
OTelAttribute<std::string>("gcp.project_id", "test-project"),
OTelAttribute<std::string>(sc::kMessagingOperation, "extend"),
OTelAttribute<std::string>(
/*sc::kMessagingOperationType=*/"messaging.operation.type",
"extend"),
OTelAttribute<int64_t>(sc::kMessagingBatchMessageCount, 1),
OTelAttribute<int64_t>(
"messaging.gcp_pubsub.message.ack_deadline_seconds", 10),
Expand Down Expand Up @@ -195,7 +197,9 @@ TEST(TracingBatchCallback, StartAndEndModackSpanForMultipleMessages) {
SpanHasAttributes(
OTelAttribute<std::string>(sc::kMessagingSystem, "gcp_pubsub"),
OTelAttribute<std::string>("gcp.project_id", "test-project"),
OTelAttribute<std::string>(sc::kMessagingOperation, "extend"),
OTelAttribute<std::string>(
/*sc::kMessagingOperationType=*/"messaging.operation.type",
"extend"),
OTelAttribute<int64_t>(sc::kMessagingBatchMessageCount, 2),
OTelAttribute<int64_t>(
"messaging.gcp_pubsub.message.ack_deadline_seconds", 10),
Expand Down Expand Up @@ -246,7 +250,9 @@ TEST(TracingBatchCallback, SubscribeAttributes) {
SpanHasAttributes(
OTelAttribute<std::string>(sc::kMessagingSystem, "gcp_pubsub"),
OTelAttribute<std::string>("gcp.project_id", "test-project"),
OTelAttribute<std::string>(sc::kMessagingOperation, "subscribe"),
OTelAttribute<std::string>(
/*sc::kMessagingOperationType=*/"messaging.operation.type",
"subscribe"),
OTelAttribute<std::string>(sc::kMessagingMessageId, "id-0"),
OTelAttribute<std::string>("messaging.gcp_pubsub.message.ack_id",
"ack-id-0"),
Expand Down
4 changes: 2 additions & 2 deletions google/cloud/pubsub/internal/tracing_batch_sink.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ auto MakeParent(Links const& links, Spans const& message_spans,
{{sc::kMessagingBatchMessageCount,
static_cast<std::int64_t>(message_spans.size())},
{sc::kCodeFunction, "BatchSink::AsyncPublish"},
{/*sc::kMessagingOperation=*/
"messaging.operation", "publish"},
{/*sc::kMessagingOperationType=*/
"messaging.operation.type", "publish"},
{sc::kThreadId, internal::CurrentThreadId()},
{sc::kMessagingSystem, "gcp_pubsub"},
{/*sc::kServerAddress=*/"server.address", endpoint},
Expand Down
10 changes: 6 additions & 4 deletions google/cloud/pubsub/internal/tracing_batch_sink_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -205,10 +205,12 @@ TEST(TracingBatchSink, PublishSpanHasAttributes) {
SpanNamed("test-topic publish"),
SpanHasAttributes(OTelAttribute<std::string>(
sc::kCodeFunction, "BatchSink::AsyncPublish")))));
EXPECT_THAT(spans,
Contains(AllOf(SpanNamed("test-topic publish"),
SpanHasAttributes(OTelAttribute<std::string>(
sc::kMessagingOperation, "publish")))));
EXPECT_THAT(
spans, Contains(AllOf(
SpanNamed("test-topic publish"),
SpanHasAttributes(OTelAttribute<std::string>(
/*sc::kMessagingOperationType=*/"messaging.operation.type",
"publish")))));
EXPECT_THAT(spans,
Contains(AllOf(SpanNamed("test-topic publish"),
SpanHasAttributes(OTelAttribute<std::string>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ class TracingExactlyOnceAckHandler
{sc::kMessagingDestinationName, sub.subscription_id()},
{"messaging.gcp_pubsub.message.delivery_attempt",
static_cast<int32_t>(delivery_attempt())},
{sc::kMessagingOperation, "settle"}},
{/*sc::kMessagingOperationType=*/"messaging.operation.type",
"settle"}},
std::move(links), options);
auto scope = internal::OTelScope(span);
return internal::EndSpan(std::move(span), child_->ack());
Expand Down Expand Up @@ -104,7 +105,8 @@ class TracingExactlyOnceAckHandler
{sc::kMessagingDestinationName, sub.subscription_id()},
{"messaging.gcp_pubsub.message.delivery_attempt",
static_cast<int32_t>(delivery_attempt())},
{sc::kMessagingOperation, "settle"}},
{/*sc::kMessagingOperationType=*/"messaging.operation.type",
"settle"}},
std::move(links), options);

auto scope = internal::OTelScope(span);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,9 @@ TEST(TracingAckHandlerTest, AckAttributes) {
SpanHasAttributes(
OTelAttribute<std::string>(sc::kMessagingSystem, "gcp_pubsub"),
OTelAttribute<std::string>("gcp.project_id", "test-project"),
OTelAttribute<std::string>(sc::kMessagingOperation, "settle"),
OTelAttribute<std::string>(
/*sc::kMessagingOperationType=*/"messaging.operation.type",
"settle"),
OTelAttribute<std::string>(sc::kCodeFunction,
"pubsub::AckHandler::ack"),
OTelAttribute<std::int32_t>(
Expand Down Expand Up @@ -270,7 +272,9 @@ TEST(TracingAckHandlerTest, NackAttributes) {
SpanHasAttributes(
OTelAttribute<std::string>(sc::kMessagingSystem, "gcp_pubsub"),
OTelAttribute<std::string>("gcp.project_id", "test-project"),
OTelAttribute<std::string>(sc::kMessagingOperation, "settle"),
OTelAttribute<std::string>(
/*sc::kMessagingOperationType=*/"messaging.operation.type",
"settle"),
OTelAttribute<std::string>(sc::kCodeFunction,
"pubsub::AckHandler::nack"),
OTelAttribute<std::int32_t>(
Expand Down
6 changes: 4 additions & 2 deletions google/cloud/pubsub/internal/tracing_pull_ack_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ class TracingPullAckHandler : public pubsub::PullAckHandler::Impl {
TracingAttributes attributes = MakeSharedAttributes(ack_id, subscription);
attributes.emplace_back(
std::make_pair(sc::kCodeFunction, "pubsub::PullAckHandler::ack"));
attributes.emplace_back(std::make_pair(sc::kMessagingOperation, "ack"));
attributes.emplace_back(std::make_pair(
/*sc::kMessagingOperationType=*/"messaging.operation.type", "ack"));
auto span =
internal::MakeSpan(subscription.subscription_id() + " ack", attributes,
CreateLinks(consumer_span_context_), options);
Expand All @@ -92,7 +93,8 @@ class TracingPullAckHandler : public pubsub::PullAckHandler::Impl {
TracingAttributes attributes = MakeSharedAttributes(ack_id, subscription);
attributes.emplace_back(
std::make_pair(sc::kCodeFunction, "pubsub::PullAckHandler::nack"));
attributes.emplace_back(std::make_pair(sc::kMessagingOperation, "nack"));
attributes.emplace_back(std::make_pair(
/*sc::kMessagingOperationType=*/"messaging.operation.type", "nack"));
auto span =
internal::MakeSpan(subscription.subscription_id() + " nack", attributes,
CreateLinks(consumer_span_context_), options);
Expand Down
20 changes: 12 additions & 8 deletions google/cloud/pubsub/internal/tracing_pull_ack_handler_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,12 @@ TEST(TracingAckHandlerTest, AckAttributes) {
Contains(AllOf(SpanNamed("test-subscription ack"),
SpanHasAttributes(OTelAttribute<std::string>(
"gcp.project_id", "test-project")))));
EXPECT_THAT(spans,
Contains(AllOf(SpanNamed("test-subscription ack"),
SpanHasAttributes(OTelAttribute<std::string>(
sc::kMessagingOperation, "ack")))));
EXPECT_THAT(
spans, Contains(AllOf(
SpanNamed("test-subscription ack"),
SpanHasAttributes(OTelAttribute<std::string>(
/*sc::kMessagingOperationType=*/"messaging.operation.type",
"ack")))));
EXPECT_THAT(
spans,
Contains(AllOf(SpanNamed("test-subscription ack"),
Expand Down Expand Up @@ -182,10 +184,12 @@ TEST(TracingAckHandlerTest, NackAttributes) {
Contains(AllOf(SpanNamed("test-subscription nack"),
SpanHasAttributes(OTelAttribute<std::string>(
sc::kMessagingSystem, "gcp_pubsub")))));
EXPECT_THAT(spans,
Contains(AllOf(SpanNamed("test-subscription nack"),
SpanHasAttributes(OTelAttribute<std::string>(
sc::kMessagingOperation, "nack")))));
EXPECT_THAT(
spans, Contains(AllOf(
SpanNamed("test-subscription nack"),
SpanHasAttributes(OTelAttribute<std::string>(
/*sc::kMessagingOperationType=*/"messaging.operation.type",
"nack")))));
EXPECT_THAT(spans,
Contains(AllOf(SpanNamed("test-subscription nack"),
SpanHasAttributes(OTelAttribute<std::string>(
Expand Down
2 changes: 1 addition & 1 deletion google/cloud/pubsub/internal/tracing_pull_lease_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class TracingPullLeaseManagerImpl : public PullLeaseManagerImpl {
auto span = internal::MakeSpan(
subscription_.subscription_id() + " modack",
{{sc::kMessagingSystem, "gcp_pubsub"},
{sc::kMessagingOperation, "modack"},
{/*sc::kMessagingOperationType=*/"messaging.operation.type", "modack"},
{sc::kCodeFunction, "pubsub::PullLeaseManager::ExtendLease"},
{"messaging.gcp_pubsub.message.ack_id", ack_id_},
{"messaging.gcp_pubsub.message.ack_deadline_seconds",
Expand Down
10 changes: 6 additions & 4 deletions google/cloud/pubsub/internal/tracing_pull_lease_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,12 @@ TEST(TracingPullLeaseManagerImplTest, AsyncModifyAckDeadlineAttributes) {
Contains(AllOf(SpanNamed("test-subscription modack"),
SpanHasAttributes(OTelAttribute<std::string>(
sc::kMessagingSystem, "gcp_pubsub")))));
EXPECT_THAT(spans,
Contains(AllOf(SpanNamed("test-subscription modack"),
SpanHasAttributes(OTelAttribute<std::string>(
sc::kMessagingOperation, "modack")))));
EXPECT_THAT(
spans, Contains(AllOf(
SpanNamed("test-subscription modack"),
SpanHasAttributes(OTelAttribute<std::string>(
/*sc::kMessagingOperationType=*/"messaging.operation.type",
"modack")))));
EXPECT_THAT(spans,
Contains(AllOf(SpanNamed("test-subscription modack"),
SpanHasAttributes(OTelAttribute<std::string>(
Expand Down
28 changes: 15 additions & 13 deletions google/cloud/storage/internal/async/reader_connection_tracing.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,22 +52,24 @@ class AsyncReaderConnectionTracing
.then([count = ++count_, span = span_](auto f) -> ReadResponse {
auto r = f.get();
if (absl::holds_alternative<Status>(r)) {
span->AddEvent("gl-cpp.read",
{
{sc::kMessageType, "RECEIVED"},
{sc::kMessageId, count},
{sc::kThreadId, internal::CurrentThreadId()},
});
span->AddEvent(
"gl-cpp.read",
{
{/*sc::kRpcMessageType=*/"rpc.message.type", "RECEIVED"},
{/*sc::kRpcMessageId=*/"rpc.message.id", count},
{sc::kThreadId, internal::CurrentThreadId()},
});
return internal::EndSpan(*span, absl::get<Status>(std::move(r)));
}
auto const& payload = absl::get<storage_experimental::ReadPayload>(r);
span->AddEvent("gl-cpp.read",
{
{sc::kMessageType, "RECEIVED"},
{sc::kMessageId, count},
{sc::kThreadId, internal::CurrentThreadId()},
{"message.starting_offset", payload.offset()},
});
span->AddEvent(
"gl-cpp.read",
{
{/*sc::kRpcMessageType=*/"rpc.message.type", "RECEIVED"},
{/*sc::kRpcMessageId=*/"rpc.message.id", count},
{sc::kThreadId, internal::CurrentThreadId()},
{"message.starting_offset", payload.offset()},
});
return r;
})
.then([oc = opentelemetry::context::RuntimeContext::GetCurrent()](
Expand Down
Loading

0 comments on commit 04cad43

Please sign in to comment.