diff --git a/src/message-queues.ts b/src/message-queues.ts index 5bdc675b5..456142b1f 100644 --- a/src/message-queues.ts +++ b/src/message-queues.ts @@ -479,7 +479,8 @@ export class AckQueue extends MessageQueue { protected async _sendBatch(batch: QueuedMessages): Promise { const responseSpan = tracing.PubsubSpans.createReceiveResponseRpcSpan( batch.map(b => b.message.tracingSpan), - this._subscriber.name + this._subscriber.name, + 'AckQueue._sendBatch' ); const client = await this._subscriber.getClient(); const ackIds = batch.map(({message}) => message.ackId); @@ -541,7 +542,8 @@ export class ModAckQueue extends MessageQueue { protected async _sendBatch(batch: QueuedMessages): Promise { const responseSpan = tracing.PubsubSpans.createReceiveResponseRpcSpan( batch.map(b => b.message.tracingSpan), - this._subscriber.name + this._subscriber.name, + 'ModAckQueue._sendBatch' ); const client = await this._subscriber.getClient(); const subscription = this._subscriber.name; diff --git a/src/publisher/index.ts b/src/publisher/index.ts index 0d6adcc74..a4ab81fa6 100644 --- a/src/publisher/index.ts +++ b/src/publisher/index.ts @@ -212,7 +212,7 @@ export class Publisher { // Ensure that there's a parent span for subsequent publishes // to hang off of. - this.getParentSpan(message); + this.getParentSpan(message, 'Publisher.publishMessage'); if (!message.orderingKey) { this.queue.add(message, callback!); @@ -333,7 +333,7 @@ export class Publisher { * * @param {PubsubMessage} message The message to create a span for */ - getParentSpan(message: PubsubMessage): Span | undefined { + getParentSpan(message: PubsubMessage, caller: string): Span | undefined { const enabled = tracing.isEnabled(this.settings); if (!enabled) { return undefined; @@ -345,7 +345,8 @@ export class Publisher { const span = tracing.PubsubSpans.createPublisherSpan( message, - this.topic.name + this.topic.name, + caller ); // If the span's context is valid we should inject the propagation trace context. diff --git a/src/publisher/message-queues.ts b/src/publisher/message-queues.ts index 040ebc67e..8ed396bc8 100644 --- a/src/publisher/message-queues.ts +++ b/src/publisher/message-queues.ts @@ -117,7 +117,8 @@ export abstract class MessageQueue extends EventEmitter { const rpcSpan = tracing.PubsubSpans.createPublishRpcSpan( spanMessages, - topic.name + topic.name, + 'MessageQueue._publish' ); const requestCallback = topic.request; diff --git a/src/telemetry-tracing.ts b/src/telemetry-tracing.ts index 01a24d39f..ea24e1702 100644 --- a/src/telemetry-tracing.ts +++ b/src/telemetry-tracing.ts @@ -306,7 +306,8 @@ function isSampled(span: Span) { export class PubsubSpans { static createAttributes( params: AttributeParams, - message?: PubsubMessage + message?: PubsubMessage, + caller?: string ): SpanAttributes { const destinationName = params.topicName ?? params.subName; const destinationId = params.topicId ?? params.subId; @@ -329,6 +330,7 @@ export class PubsubSpans { ['messaging.system']: 'gcp_pubsub', ['messaging.destination.name']: destinationId ?? destinationName, ['gcp.project_id']: projectId, + ['code.function']: caller ?? 'unknown', } as SpanAttributes; if (message) { @@ -359,7 +361,8 @@ export class PubsubSpans { static createPublisherSpan( message: PubsubMessage, - topicName: string + topicName: string, + caller: string ): Span | undefined { if (!globallyEnabled) { return undefined; @@ -368,7 +371,7 @@ export class PubsubSpans { const topicInfo = getTopicInfo(topicName); const span: Span = getTracer().startSpan(`${topicName} create`, { kind: SpanKind.PRODUCER, - attributes: PubsubSpans.createAttributes(topicInfo, message), + attributes: PubsubSpans.createAttributes(topicInfo, message, caller), }); if (topicInfo.topicId) { span.updateName(`${topicInfo.topicId} create`); @@ -394,7 +397,8 @@ export class PubsubSpans { static createReceiveSpan( message: PubsubMessage, subName: string, - parent: Context | undefined + parent: Context | undefined, + caller: string ): Span | undefined { if (!globallyEnabled) { return undefined; @@ -402,7 +406,7 @@ export class PubsubSpans { const subInfo = getSubscriptionInfo(subName); const name = `${subInfo.subId ?? subName} subscribe`; - const attributes = this.createAttributes(subInfo, message); + const attributes = this.createAttributes(subInfo, message, caller); if (subInfo.subId) { attributes['messaging.destination.name'] = subInfo.subId; } @@ -459,14 +463,17 @@ export class PubsubSpans { static createPublishRpcSpan( messages: MessageWithAttributes[], - topicName: string + topicName: string, + caller: string ): Span | undefined { if (!globallyEnabled) { return undefined; } const spanAttributes = PubsubSpans.createAttributes( - getTopicInfo(topicName) + getTopicInfo(topicName), + undefined, + caller ); const links: Link[] = messages .filter(m => m.parentSpan && isSampled(m.parentSpan)) @@ -496,14 +503,17 @@ export class PubsubSpans { static createReceiveResponseRpcSpan( messageSpans: (Span | undefined)[], - subName: string + subName: string, + caller: string ): Span | undefined { if (!globallyEnabled) { return undefined; } const spanAttributes = PubsubSpans.createAttributes( - getSubscriptionInfo(subName) + getSubscriptionInfo(subName), + undefined, + caller ); const links: Link[] = messageSpans .filter(m => m && isSampled(m)) @@ -565,6 +575,7 @@ export class PubsubSpans { message: MessageWithAttributes, subName: string, type: 'modack' | 'ack' | 'nack', + caller: string, deadline?: Duration, isInitial?: boolean ): Span | undefined { @@ -572,7 +583,8 @@ export class PubsubSpans { const span = PubsubSpans.createReceiveSpan( message, `${subInfo.subId ?? subInfo.subName} ${type}`, - undefined + undefined, + caller ); if (deadline) { @@ -797,7 +809,12 @@ export function extractSpan( } } - const span = PubsubSpans.createReceiveSpan(message, subName, context); + const span = PubsubSpans.createReceiveSpan( + message, + subName, + context, + 'extractSpan' + ); message.parentSpan = span; return span; } diff --git a/test/publisher/flow-publisher.ts b/test/publisher/flow-publisher.ts index 35cad5567..c4c0ed6d4 100644 --- a/test/publisher/flow-publisher.ts +++ b/test/publisher/flow-publisher.ts @@ -59,7 +59,8 @@ describe('Flow control publisher', () => { data: Buffer.from('foo'), parentSpan: tracing.PubsubSpans.createPublisherSpan( {}, - 'projects/foo/topics/topic' + 'projects/foo/topics/topic', + 'tests' ), }; fcp.publish(message as unknown as PubsubMessage); diff --git a/test/telemetry-tracing.ts b/test/telemetry-tracing.ts index 7baa297d1..d624a6146 100644 --- a/test/telemetry-tracing.ts +++ b/test/telemetry-tracing.ts @@ -81,7 +81,8 @@ describe('OpenTelemetryTracer', () => { const message: PubsubMessage = {}; const span = otel.PubsubSpans.createPublisherSpan( message, - 'projects/test/topics/topicfoo' + 'projects/test/topics/topicfoo', + 'tests' ) as trace.Span; span.end(); @@ -99,7 +100,8 @@ describe('OpenTelemetryTracer', () => { }; const span = otel.PubsubSpans.createPublisherSpan( message, - 'projects/test/topics/topicfoo' + 'projects/test/topics/topicfoo', + 'tests' ) as trace.Span; otel.injectSpan(span, message, otel.OpenTelemetryLevel.Modern); @@ -120,7 +122,8 @@ describe('OpenTelemetryTracer', () => { }; const span = otel.PubsubSpans.createPublisherSpan( message, - 'projects/test/topics/topicfoo' + 'projects/test/topics/topicfoo', + 'tests' ); assert.ok(span); @@ -149,7 +152,8 @@ describe('OpenTelemetryTracer', () => { }; const span = otel.PubsubSpans.createPublisherSpan( message, - 'projects/test/topics/topicfoo' + 'projects/test/topics/topicfoo', + 'tests' ); assert.ok(span); @@ -217,7 +221,11 @@ describe('OpenTelemetryTracer', () => { ackId: 'ackack', }; - const topicAttrs = otel.PubsubSpans.createAttributes(topicInfo, message); + const topicAttrs = otel.PubsubSpans.createAttributes( + topicInfo, + message, + 'tests' + ); assert.deepStrictEqual(topicAttrs, { 'messaging.system': 'gcp_pubsub', 'messaging.destination.name': topicInfo.topicId, @@ -227,6 +235,7 @@ describe('OpenTelemetryTracer', () => { 'messaging.gcp_pubsub.message.exactly_once_delivery': message.isExactlyOnceDelivery, 'messaging.gcp_pubsub.message.ack_id': message.ackId, + 'code.function': 'tests', }); // Check again with no calculated size and other parameters missing. @@ -235,12 +244,17 @@ describe('OpenTelemetryTracer', () => { delete message.isExactlyOnceDelivery; delete message.ackId; - const topicAttrs2 = otel.PubsubSpans.createAttributes(topicInfo, message); + const topicAttrs2 = otel.PubsubSpans.createAttributes( + topicInfo, + message, + 'tests' + ); assert.deepStrictEqual(topicAttrs2, { 'messaging.system': 'gcp_pubsub', 'messaging.destination.name': topicInfo.topicId, 'gcp.project_id': topicInfo.projectId, 'messaging.message.envelope.size': message.data?.length, + 'code.function': 'tests', }); }); }); @@ -270,7 +284,8 @@ describe('OpenTelemetryTracer', () => { it('creates publisher spans', () => { const span = otel.PubsubSpans.createPublisherSpan( tests.message, - tests.topicInfo.topicName! + tests.topicInfo.topicName!, + 'tests' ); assert.ok(span); span.end(); @@ -294,7 +309,8 @@ describe('OpenTelemetryTracer', () => { it('updates publisher topic names', () => { const span = otel.PubsubSpans.createPublisherSpan( tests.message, - tests.topicInfo.topicName! + tests.topicInfo.topicName!, + 'tests' ); assert.ok(span); otel.PubsubSpans.updatePublisherTopicName( @@ -319,13 +335,15 @@ describe('OpenTelemetryTracer', () => { it('creates receive spans', () => { const parentSpan = otel.PubsubSpans.createPublisherSpan( tests.message, - tests.topicInfo.topicName! + tests.topicInfo.topicName!, + 'tests' ); assert.ok(parentSpan); const span = otel.PubsubSpans.createReceiveSpan( tests.message, tests.subInfo.subName!, - otel.spanContextToContext(parentSpan.spanContext()) + otel.spanContextToContext(parentSpan.spanContext()), + 'tests' ); assert.ok(span); span.end(); @@ -350,14 +368,16 @@ describe('OpenTelemetryTracer', () => { const topicName = 'projects/test/topics/topicfoo'; const span = otel.PubsubSpans.createPublisherSpan( message, - topicName + topicName, + 'test' ) as trace.Span; message.parentSpan = span; span.end(); const publishSpan = otel.PubsubSpans.createPublishRpcSpan( [message], - topicName + topicName, + 'test' ); publishSpan?.end();