From 68e429301a99bfa0c8c325294d97345b571abd52 Mon Sep 17 00:00:00 2001 From: feywind <57276408+feywind@users.noreply.github.com> Date: Wed, 17 Jul 2024 17:23:18 -0400 Subject: [PATCH 1/2] feat: track code.function --- src/message-queues.ts | 6 +++-- src/publisher/index.ts | 7 +++--- src/publisher/message-queues.ts | 3 ++- src/telemetry-tracing.ts | 42 +++++++++++++++++++++++--------- test/publisher/flow-publisher.ts | 3 ++- test/telemetry-tracing.ts | 38 +++++++++++++++++++++-------- 6 files changed, 71 insertions(+), 28 deletions(-) diff --git a/src/message-queues.ts b/src/message-queues.ts index 5bdc675b5..456142b1f 100644 --- a/src/message-queues.ts +++ b/src/message-queues.ts @@ -479,7 +479,8 @@ export class AckQueue extends MessageQueue { protected async _sendBatch(batch: QueuedMessages): Promise { const responseSpan = tracing.PubsubSpans.createReceiveResponseRpcSpan( batch.map(b => b.message.tracingSpan), - this._subscriber.name + this._subscriber.name, + 'AckQueue._sendBatch' ); const client = await this._subscriber.getClient(); const ackIds = batch.map(({message}) => message.ackId); @@ -541,7 +542,8 @@ export class ModAckQueue extends MessageQueue { protected async _sendBatch(batch: QueuedMessages): Promise { const responseSpan = tracing.PubsubSpans.createReceiveResponseRpcSpan( batch.map(b => b.message.tracingSpan), - this._subscriber.name + this._subscriber.name, + 'ModAckQueue._sendBatch' ); const client = await this._subscriber.getClient(); const subscription = this._subscriber.name; diff --git a/src/publisher/index.ts b/src/publisher/index.ts index c685af977..82a81ccf4 100644 --- a/src/publisher/index.ts +++ b/src/publisher/index.ts @@ -212,7 +212,7 @@ export class Publisher { // Ensure that there's a parent span for subsequent publishes // to hang off of. - this.getParentSpan(message); + this.getParentSpan(message, 'Publisher.publishMessage'); if (!message.orderingKey) { this.queue.add(message, callback!); @@ -333,7 +333,7 @@ export class Publisher { * * @param {PubsubMessage} message The message to create a span for */ - getParentSpan(message: PubsubMessage): Span | undefined { + getParentSpan(message: PubsubMessage, caller: string): Span | undefined { const enabled = tracing.isEnabled(this.settings); if (!enabled) { return undefined; @@ -345,7 +345,8 @@ export class Publisher { const span = tracing.PubsubSpans.createPublisherSpan( message, - this.topic.name + this.topic.name, + caller ); // If the span's context is valid we should inject the propagation trace context. diff --git a/src/publisher/message-queues.ts b/src/publisher/message-queues.ts index 040ebc67e..8ed396bc8 100644 --- a/src/publisher/message-queues.ts +++ b/src/publisher/message-queues.ts @@ -117,7 +117,8 @@ export abstract class MessageQueue extends EventEmitter { const rpcSpan = tracing.PubsubSpans.createPublishRpcSpan( spanMessages, - topic.name + topic.name, + 'MessageQueue._publish' ); const requestCallback = topic.request; diff --git a/src/telemetry-tracing.ts b/src/telemetry-tracing.ts index 92b1f280c..ace632193 100644 --- a/src/telemetry-tracing.ts +++ b/src/telemetry-tracing.ts @@ -276,7 +276,8 @@ export function getTopicInfo(fullName: string): AttributeParams { export class PubsubSpans { static createAttributes( params: AttributeParams, - message?: PubsubMessage + message?: PubsubMessage, + caller?: string ): SpanAttributes { const destinationName = params.topicName ?? params.subName; const destinationId = params.topicId ?? params.subId; @@ -299,6 +300,7 @@ export class PubsubSpans { ['messaging.system']: 'gcp_pubsub', ['messaging.destination.name']: destinationId ?? destinationName, ['gcp.project_id']: projectId, + ['code.function']: caller ?? 'unknown', } as SpanAttributes; if (message) { @@ -327,11 +329,15 @@ export class PubsubSpans { return spanAttributes; } - static createPublisherSpan(message: PubsubMessage, topicName: string): Span { + static createPublisherSpan( + message: PubsubMessage, + topicName: string, + caller: string + ): Span { const topicInfo = getTopicInfo(topicName); const span: Span = getTracer().startSpan(`${topicName} create`, { kind: SpanKind.PRODUCER, - attributes: PubsubSpans.createAttributes(topicInfo, message), + attributes: PubsubSpans.createAttributes(topicInfo, message, caller), }); if (topicInfo.topicId) { span.updateName(`${topicInfo.topicId} create`); @@ -357,11 +363,12 @@ export class PubsubSpans { static createReceiveSpan( message: PubsubMessage, subName: string, - parent: Context | undefined + parent: Context | undefined, + caller: string ): Span { const subInfo = getSubscriptionInfo(subName); const name = `${subInfo.subId ?? subName} subscribe`; - const attributes = this.createAttributes(subInfo, message); + const attributes = this.createAttributes(subInfo, message, caller); if (subInfo.subId) { attributes['messaging.destination.name'] = subInfo.subId; } @@ -414,10 +421,13 @@ export class PubsubSpans { static createPublishRpcSpan( messages: MessageWithAttributes[], - topicName: string + topicName: string, + caller: string ): Span { const spanAttributes = PubsubSpans.createAttributes( - getTopicInfo(topicName) + getTopicInfo(topicName), + undefined, + caller ); const links: Link[] = messages .map(m => ({context: m.parentSpan?.spanContext()}) as Link) @@ -451,10 +461,13 @@ export class PubsubSpans { static createReceiveResponseRpcSpan( messageSpans: (Span | undefined)[], - subName: string + subName: string, + caller: string ): Span { const spanAttributes = PubsubSpans.createAttributes( - getSubscriptionInfo(subName) + getSubscriptionInfo(subName), + undefined, + caller ); const links: Link[] = messageSpans .map(m => ({context: m?.spanContext()}) as Link) @@ -520,6 +533,7 @@ export class PubsubSpans { message: MessageWithAttributes, subName: string, type: 'modack' | 'ack' | 'nack', + caller: string, deadline?: Duration, isInitial?: boolean ): Span | undefined { @@ -527,7 +541,8 @@ export class PubsubSpans { const span = PubsubSpans.createReceiveSpan( message, `${subInfo.subId ?? subInfo.subName} ${type}`, - undefined + undefined, + caller ); if (deadline) { @@ -744,7 +759,12 @@ export function extractSpan( } } - const span = PubsubSpans.createReceiveSpan(message, subName, context); + const span = PubsubSpans.createReceiveSpan( + message, + subName, + context, + 'extractSpan' + ); message.parentSpan = span; return span; } diff --git a/test/publisher/flow-publisher.ts b/test/publisher/flow-publisher.ts index 6493c384c..83af435b7 100644 --- a/test/publisher/flow-publisher.ts +++ b/test/publisher/flow-publisher.ts @@ -56,7 +56,8 @@ describe('Flow control publisher', () => { data: Buffer.from('foo'), parentSpan: tracing.PubsubSpans.createPublisherSpan( {}, - 'projects/foo/topics/topic' + 'projects/foo/topics/topic', + 'tests' ), }; fcp.publish(message as unknown as PubsubMessage); diff --git a/test/telemetry-tracing.ts b/test/telemetry-tracing.ts index 60def061a..41e8c30b9 100644 --- a/test/telemetry-tracing.ts +++ b/test/telemetry-tracing.ts @@ -79,7 +79,8 @@ describe('OpenTelemetryTracer', () => { const message: PubsubMessage = {}; const span = otel.PubsubSpans.createPublisherSpan( message, - 'projects/test/topics/topicfoo' + 'projects/test/topics/topicfoo', + 'tests' ) as trace.Span; span.end(); @@ -97,7 +98,8 @@ describe('OpenTelemetryTracer', () => { }; const span = otel.PubsubSpans.createPublisherSpan( message, - 'projects/test/topics/topicfoo' + 'projects/test/topics/topicfoo', + 'tests' ) as trace.Span; otel.injectSpan(span, message, otel.OpenTelemetryLevel.Modern); @@ -118,7 +120,8 @@ describe('OpenTelemetryTracer', () => { }; const span = otel.PubsubSpans.createPublisherSpan( message, - 'projects/test/topics/topicfoo' + 'projects/test/topics/topicfoo', + 'tests' ); otel.injectSpan(span, message, otel.OpenTelemetryLevel.Legacy); @@ -146,7 +149,8 @@ describe('OpenTelemetryTracer', () => { }; const span = otel.PubsubSpans.createPublisherSpan( message, - 'projects/test/topics/topicfoo' + 'projects/test/topics/topicfoo', + 'tests' ); const warnSpy = sinon.spy(console, 'warn'); @@ -213,7 +217,11 @@ describe('OpenTelemetryTracer', () => { ackId: 'ackack', }; - const topicAttrs = otel.PubsubSpans.createAttributes(topicInfo, message); + const topicAttrs = otel.PubsubSpans.createAttributes( + topicInfo, + message, + 'tests' + ); assert.deepStrictEqual(topicAttrs, { 'messaging.system': 'gcp_pubsub', 'messaging.destination.name': topicInfo.topicId, @@ -223,6 +231,7 @@ describe('OpenTelemetryTracer', () => { 'messaging.gcp_pubsub.message.exactly_once_delivery': message.isExactlyOnceDelivery, 'messaging.gcp_pubsub.message.ack_id': message.ackId, + 'code.function': 'tests', }); // Check again with no calculated size and other parameters missing. @@ -231,12 +240,17 @@ describe('OpenTelemetryTracer', () => { delete message.isExactlyOnceDelivery; delete message.ackId; - const topicAttrs2 = otel.PubsubSpans.createAttributes(topicInfo, message); + const topicAttrs2 = otel.PubsubSpans.createAttributes( + topicInfo, + message, + 'tests' + ); assert.deepStrictEqual(topicAttrs2, { 'messaging.system': 'gcp_pubsub', 'messaging.destination.name': topicInfo.topicId, 'gcp.project_id': topicInfo.projectId, 'messaging.message.envelope.size': message.data?.length, + 'code.function': 'tests', }); }); }); @@ -266,7 +280,8 @@ describe('OpenTelemetryTracer', () => { it('creates publisher spans', () => { const span = otel.PubsubSpans.createPublisherSpan( tests.message, - tests.topicInfo.topicName! + tests.topicInfo.topicName!, + 'tests' ); span.end(); @@ -289,7 +304,8 @@ describe('OpenTelemetryTracer', () => { it('updates publisher topic names', () => { const span = otel.PubsubSpans.createPublisherSpan( tests.message, - tests.topicInfo.topicName! + tests.topicInfo.topicName!, + 'tests' ); otel.PubsubSpans.updatePublisherTopicName( span, @@ -313,12 +329,14 @@ describe('OpenTelemetryTracer', () => { it('creates receive spans', () => { const parentSpan = otel.PubsubSpans.createPublisherSpan( tests.message, - tests.topicInfo.topicName! + tests.topicInfo.topicName!, + 'tests' ); const span = otel.PubsubSpans.createReceiveSpan( tests.message, tests.subInfo.subName!, - otel.spanContextToContext(parentSpan.spanContext()) + otel.spanContextToContext(parentSpan.spanContext()), + 'tests' ); span.end(); parentSpan.end(); From b3e75249036166a6ebd2363fc25801a0cf260dff Mon Sep 17 00:00:00 2001 From: feywind <57276408+feywind@users.noreply.github.com> Date: Fri, 26 Jul 2024 15:08:03 -0400 Subject: [PATCH 2/2] chore: small merge issue --- test/telemetry-tracing.ts | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/test/telemetry-tracing.ts b/test/telemetry-tracing.ts index d95155c38..d624a6146 100644 --- a/test/telemetry-tracing.ts +++ b/test/telemetry-tracing.ts @@ -368,14 +368,16 @@ describe('OpenTelemetryTracer', () => { const topicName = 'projects/test/topics/topicfoo'; const span = otel.PubsubSpans.createPublisherSpan( message, - topicName + topicName, + 'test' ) as trace.Span; message.parentSpan = span; span.end(); const publishSpan = otel.PubsubSpans.createPublishRpcSpan( [message], - topicName + topicName, + 'test' ); publishSpan?.end();