From 61d7ecd638501b46f244547f7f7a3c8a2a1c9068 Mon Sep 17 00:00:00 2001 From: feywind <57276408+feywind@users.noreply.github.com> Date: Wed, 29 Nov 2023 14:32:33 -0500 Subject: [PATCH 01/15] feat: updates for the new design doc: batching, events, separate RPC span --- src/publisher/message-batch.ts | 27 ++++++- src/publisher/message-queues.ts | 26 +++--- src/publisher/pubsub-message.ts | 8 +- src/telemetry-tracing.ts | 135 +++++++++++++++++++++++++------- 4 files changed, 145 insertions(+), 51 deletions(-) diff --git a/src/publisher/message-batch.ts b/src/publisher/message-batch.ts index 07a6222ce..dcd42f78e 100644 --- a/src/publisher/message-batch.ts +++ b/src/publisher/message-batch.ts @@ -16,6 +16,7 @@ import {BATCH_LIMITS, PubsubMessage, PublishCallback} from './'; import {calculateMessageSize} from './pubsub-message'; +import * as tracing from '../telemetry-tracing'; export interface BatchPublishOptions { maxBytes?: number; @@ -23,6 +24,11 @@ export interface BatchPublishOptions { maxMilliseconds?: number; } +export interface BatchResults { + messages: PubsubMessage[]; + callbacks: PublishCallback[]; +} + /** * @typedef BatchPublishOptions * @property {number} [maxBytes=1 * 1024 * 1024] The maximum number of bytes to @@ -40,13 +46,15 @@ export interface BatchPublishOptions { * @param {BatchPublishOptions} options The batching options. */ export class MessageBatch { - options: BatchPublishOptions; messages: PubsubMessage[]; callbacks: PublishCallback[]; created: number; bytes: number; - constructor(options: BatchPublishOptions) { - this.options = options; + + constructor( + private options: BatchPublishOptions, + private topicName: string + ) { this.messages = []; this.callbacks = []; this.created = Date.now(); @@ -72,7 +80,18 @@ export class MessageBatch { this.messages.push(message); this.callbacks.push(callback); this.bytes += calculateMessageSize(message); + + tracing.PubsubSpans.createPublishSchedulerSpan(message); + } + + end(): BatchResults { + this.messages.forEach(m => m.publishSchedulerSpan?.end()); + return { + messages: this.messages, + callbacks: this.callbacks, + }; } + /** * Indicates if a given message can fit in the batch. * @@ -86,6 +105,7 @@ export class MessageBatch { this.bytes + calculateMessageSize(message) <= maxBytes! ); } + /** * Checks to see if this batch is at the maximum allowed payload size. * When publishing ordered messages, it is ok to exceed the user configured @@ -97,6 +117,7 @@ export class MessageBatch { const {maxMessages, maxBytes} = BATCH_LIMITS; return this.messages.length >= maxMessages! || this.bytes >= maxBytes!; } + /** * Indicates if the batch is at capacity. * diff --git a/src/publisher/message-queues.ts b/src/publisher/message-queues.ts index d16260419..019d584ac 100644 --- a/src/publisher/message-queues.ts +++ b/src/publisher/message-queues.ts @@ -114,12 +114,10 @@ export abstract class MessageQueue extends EventEmitter { }); } - messages.forEach(m => { - const span = tracing.PubsubSpans.createPublishRpcSpan(m, messages.length); - if (span) { - m.rpcSpan = span; - } - }); + const rpcSpan = tracing.PubsubSpans.createPublishRpcSpan( + spanMessages, + topic.name + ); const requestCallback = topic.request; const request = promisify(requestCallback.bind(topic)); @@ -144,7 +142,7 @@ export abstract class MessageQueue extends EventEmitter { messages.forEach(m => { // We're finished with both the RPC and the whole publish operation, // so close out all of the related spans. - m.rpcSpan?.end(); + rpcSpan?.end(); m.parentSpan?.end(); }); } @@ -163,7 +161,7 @@ export class Queue extends MessageQueue { batch: MessageBatch; constructor(publisher: Publisher) { super(publisher); - this.batch = new MessageBatch(this.batchOptions); + this.batch = new MessageBatch(this.batchOptions, this.publisher.topic.name); } // This needs to update our existing message batch. @@ -186,8 +184,6 @@ export class Queue extends MessageQueue { this.publish().catch(() => {}); } - message.batchingSpan = tracing.PubsubSpans.createPublishBatchSpan(message); - this.batch.add(message, callback); if (this.batch.isFull()) { @@ -230,17 +226,15 @@ export class Queue extends MessageQueue { * @emits Queue#drain when all messages are sent. */ async _publishInternal(fullyDrain: boolean): Promise { - const {messages, callbacks} = this.batch; + const {messages, callbacks} = this.batch.end(); - this.batch = new MessageBatch(this.batchOptions); + this.batch = new MessageBatch(this.batchOptions, this.publisher.topic.name); if (this.pending) { clearTimeout(this.pending); delete this.pending; } - messages.forEach(m => m.batchingSpan?.end()); - await this._publish(messages, callbacks); if (this.batch.messages.length) { // We only do the indefinite go-arounds when we're trying to do a @@ -358,7 +352,7 @@ export class OrderedQueue extends MessageQueue { * @returns {MessageBatch} */ createBatch(): MessageBatch { - return new MessageBatch(this.batchOptions); + return new MessageBatch(this.batchOptions, this.publisher.topic.name); } /** * In the event of a publish failure, we need to cache the error in question @@ -401,7 +395,7 @@ export class OrderedQueue extends MessageQueue { delete this.pending; } - const {messages, callbacks} = this.batches.pop()!; + const {messages, callbacks} = this.batches.pop()!.end(); try { await this._publish(messages, callbacks); diff --git a/src/publisher/pubsub-message.ts b/src/publisher/pubsub-message.ts index d979d6eac..3b05c1f20 100644 --- a/src/publisher/pubsub-message.ts +++ b/src/publisher/pubsub-message.ts @@ -43,18 +43,18 @@ export interface PubsubMessage // don't get to control what these objects are. They come from grpc. /** - * If tracing is enabled, track the batch span. + * If tracing is enabled, track the message span. * * @private */ - batchingSpan?: tracing.Span; + messageSpan?: tracing.Span; /** - * If tracing is enabled, track the RPC send time span. + * If tracing is enabled, track the batching (publish scheduling) period. * * @private */ - rpcSpan?: tracing.Span; + publishSchedulerSpan?: tracing.Span; } /** diff --git a/src/telemetry-tracing.ts b/src/telemetry-tracing.ts index 1181e8dcc..d49e3c74d 100644 --- a/src/telemetry-tracing.ts +++ b/src/telemetry-tracing.ts @@ -25,6 +25,7 @@ import { TextMapSetter, ROOT_CONTEXT, Context, + Link, } from '@opentelemetry/api'; import {Attributes, PubsubMessage} from './publisher/pubsub-message'; import {PublishOptions} from './publisher/index'; @@ -223,7 +224,10 @@ export const modernAttributeName = 'googclient_traceparent'; export const legacyAttributeName = 'googclient_OpenTelemetrySpanContext'; export class PubsubSpans { - static createPublisherSpan(message: PubsubMessage, topicName: string): Span { + static createAttributes( + topicName: string, + message?: PubsubMessage + ): SpanAttributes { const spanAttributes = { // Add Opentelemetry semantic convention attributes to the span, based on: // https://github.com/open-telemetry/opentelemetry-specification/blob/v1.1.0/specification/trace/semantic_conventions/messaging.md @@ -233,14 +237,26 @@ export class PubsubSpans { [SemanticAttributes.MESSAGING_DESTINATION]: topicName, [SemanticAttributes.MESSAGING_DESTINATION_KIND]: 'topic', [SemanticAttributes.MESSAGING_PROTOCOL]: 'pubsub', - [SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES]: - message.data?.length, - 'messaging.pubsub.ordering_key': message.orderingKey, } as SpanAttributes; + if (message) { + if (message.data?.length) { + spanAttributes[ + SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES + ] = message.data?.length; + } + if (message.orderingKey) { + spanAttributes['messaging.pubsub.ordering_key'] = message.orderingKey; + } + } + + return spanAttributes; + } + + static createPublisherSpan(message: PubsubMessage, topicName: string): Span { const span: Span = getTracer().startSpan(`${topicName} send`, { kind: SpanKind.PRODUCER, - attributes: spanAttributes, + attributes: PubsubSpans.createAttributes(topicName, message), }); return span; @@ -251,11 +267,7 @@ export class PubsubSpans { span.setAttribute(SemanticAttributes.MESSAGING_DESTINATION, topicName); } - static createReceiveSpan( - message: MessageWithAttributes, - subName: string, - parent: Context | undefined - ): Span { + static createReceiveSpan(subName: string, parent: Context | undefined): Span { const name = `${subName} receive`; // Mostly we want to keep the context IDs; the attributes and such @@ -277,10 +289,11 @@ export class PubsubSpans { } static createChildSpan( - message: MessageWithAttributes, - name: string + name: string, + message?: MessageWithAttributes, + parentSpan?: Span ): Span | undefined { - const parent = message.parentSpan; + const parent = message?.parentSpan ?? parentSpan; if (parent) { return getTracer().startSpan( name, @@ -296,19 +309,47 @@ export class PubsubSpans { } static createPublishFlowSpan(message: PubsubMessage): Span | undefined { - return PubsubSpans.createChildSpan(message, 'publisher flow control'); + return PubsubSpans.createChildSpan('publisher flow control', message); } - static createPublishBatchSpan(message: PubsubMessage): Span | undefined { - return PubsubSpans.createChildSpan(message, 'publish scheduler'); + static createPublishSchedulerSpan(message: PubsubMessage): Span | undefined { + return PubsubSpans.createChildSpan('publisher scheduler', message); } static createPublishRpcSpan( - message: PubsubMessage, - messageCount: number - ): Span | undefined { - const span = PubsubSpans.createChildSpan(message, 'publish'); - span?.setAttribute('messaging.pubsub.num_messages_in_batch', messageCount); + messages: MessageWithAttributes[], + topicName: string + ): Span { + const spanAttributes = PubsubSpans.createAttributes(topicName); + const links: Link[] = messages + .map(m => ({context: m.parentSpan?.spanContext()}) as Link) + .filter(l => l.context); + const span: Span = getTracer().startSpan( + `${topicName} send`, + { + kind: SpanKind.PRODUCER, + attributes: spanAttributes, + links, + }, + ROOT_CONTEXT + ); + span?.setAttribute( + 'messaging.pubsub.num_messages_in_batch', + messages.length + ); + messages.forEach(m => { + // Workaround until the JS API properly supports adding links later. + if (m.parentSpan) { + m.parentSpan.setAttribute( + 'gcp_pubsub.publish.trace_id', + span.spanContext().traceId + ); + m.parentSpan.setAttribute( + 'gcp_pubsub.publish.span_id', + span.spanContext().spanId + ); + } + }); return span; } @@ -318,7 +359,7 @@ export class PubsubSpans { deadline: Duration, initial: boolean ) { - const span = PubsubSpans.createChildSpan(message, 'modify ack deadline'); + const span = PubsubSpans.createChildSpan('modify ack deadline', message); if (span) { span.setAttributes({ 'messaging.pubsub.modack_deadline_seconds': deadline.totalOf('second'), @@ -331,20 +372,20 @@ export class PubsubSpans { static createReceiveFlowSpan( message: MessageWithAttributes ): Span | undefined { - return PubsubSpans.createChildSpan(message, 'subscriber flow control'); + return PubsubSpans.createChildSpan('subscriber flow control', message); } static createReceiveSchedulerSpan( message: MessageWithAttributes ): Span | undefined { - return PubsubSpans.createChildSpan(message, 'subscribe scheduler'); + return PubsubSpans.createChildSpan('subscribe scheduler', message); } static createReceiveProcessSpan( message: MessageWithAttributes, subName: string ): Span | undefined { - return PubsubSpans.createChildSpan(message, `${subName} process`); + return PubsubSpans.createChildSpan(`${subName} process`, message); } static setReceiveProcessResult(span: Span, isAck: boolean) { @@ -356,7 +397,7 @@ export class PubsubSpans { deadline: Duration, isInitial: boolean ): Span | undefined { - const span = PubsubSpans.createChildSpan(message, 'modify ack deadline'); + const span = PubsubSpans.createChildSpan('modify ack deadline', message); span?.setAttribute( 'messaging.pubsub.modack_deadline_seconds', deadline.totalOf('second') @@ -370,7 +411,45 @@ export class PubsubSpans { isAck: boolean ): Span | undefined { const name = isAck ? 'ack' : 'nack'; - return PubsubSpans.createChildSpan(message, name); + return PubsubSpans.createChildSpan(name, message); + } +} + +/** + * Creates and manipulates Pub/Sub-related events on spans. + */ +export class PubsubEvents { + static addEvent(text: string, message: MessageWithAttributes) { + const parent = message.parentSpan; + if (!parent) { + return; + } + + parent.addEvent(text); + } + + static publishStart(message: MessageWithAttributes) { + PubsubEvents.addEvent('publish start', message); + } + + static publishEnd(message: MessageWithAttributes) { + PubsubEvents.addEvent('publish end', message); + } + + static ackStart(message: MessageWithAttributes) { + PubsubEvents.addEvent('ack start', message); + } + + static ackEnd(message: MessageWithAttributes) { + PubsubEvents.addEvent('ack end', message); + } + + static modAckStart(message: MessageWithAttributes) { + PubsubEvents.addEvent('modack start', message); + } + + static modAckEnd(message: MessageWithAttributes) { + PubsubEvents.addEvent('modack end', message); } } @@ -484,7 +563,7 @@ export function extractSpan( } } - const span = PubsubSpans.createReceiveSpan(message, subName, context); + const span = PubsubSpans.createReceiveSpan(subName, context); message.parentSpan = span; return span; } From 9ee3066713e534e0f9317d70920f059ec8e0d4a7 Mon Sep 17 00:00:00 2001 From: feywind <57276408+feywind@users.noreply.github.com> Date: Wed, 29 Nov 2023 15:12:40 -0500 Subject: [PATCH 02/15] tests: fix / update tests for batch changes --- src/publisher/message-batch.ts | 4 ++-- test/publisher/message-batch.ts | 10 ++++++++-- test/publisher/message-queues.ts | 10 +++++++++- 3 files changed, 19 insertions(+), 5 deletions(-) diff --git a/src/publisher/message-batch.ts b/src/publisher/message-batch.ts index dcd42f78e..7baaa8866 100644 --- a/src/publisher/message-batch.ts +++ b/src/publisher/message-batch.ts @@ -52,8 +52,8 @@ export class MessageBatch { bytes: number; constructor( - private options: BatchPublishOptions, - private topicName: string + public options: BatchPublishOptions, + public topicName: string ) { this.messages = []; this.callbacks = []; diff --git a/test/publisher/message-batch.ts b/test/publisher/message-batch.ts index 161756500..f1cfe0b29 100644 --- a/test/publisher/message-batch.ts +++ b/test/publisher/message-batch.ts @@ -33,7 +33,7 @@ describe('MessageBatch', () => { }; beforeEach(() => { - batch = new MessageBatch(Object.assign({}, options)); + batch = new MessageBatch(Object.assign({}, options), 'topicName'); }); afterEach(() => { @@ -57,7 +57,7 @@ describe('MessageBatch', () => { const now = Date.now(); sandbox.stub(Date, 'now').returns(now); - batch = new MessageBatch(options); + batch = new MessageBatch(options, 'topicName'); assert.strictEqual(batch.created, now); }); @@ -198,4 +198,10 @@ describe('MessageBatch', () => { assert.strictEqual(newOptions, batch.options); }); }); + + it('returns data from end()', () => { + const output = batch.end(); + assert.strictEqual(output.messages, batch.messages); + assert.strictEqual(output.callbacks, batch.callbacks); + }); }); diff --git a/test/publisher/message-queues.ts b/test/publisher/message-queues.ts index a74b60f53..f9b92f072 100644 --- a/test/publisher/message-queues.ts +++ b/test/publisher/message-queues.ts @@ -54,11 +54,13 @@ class FakeMessageBatch { messages: p.PubsubMessage[]; options: b.BatchPublishOptions; bytes: number; - constructor(options = {} as b.BatchPublishOptions) { + topicName: string; + constructor(options = {} as b.BatchPublishOptions, topicName = 'topicName') { this.callbacks = []; this.created = Date.now(); this.messages = []; this.options = options; + this.topicName = topicName; this.bytes = 0; } // eslint-disable-next-line @typescript-eslint/no-unused-vars @@ -76,6 +78,12 @@ class FakeMessageBatch { setOptions(options: b.BatchPublishOptions) { this.options = options; } + end() { + return { + messages: this.messages, + callbacks: this.callbacks, + }; + } } class FakePublishError { From 94e1e91496c7e8cbacb81ed290d88acb0b0c3fbd Mon Sep 17 00:00:00 2001 From: feywind <57276408+feywind@users.noreply.github.com> Date: Wed, 29 Nov 2023 15:46:39 -0500 Subject: [PATCH 03/15] feat: change ack/nack/modack to be events --- src/lease-manager.ts | 7 ++-- src/subscriber.ts | 69 ++++++++++++++++++---------------------- src/telemetry-tracing.ts | 56 ++++++++++++++++---------------- test/subscriber.ts | 25 ++++++--------- 4 files changed, 72 insertions(+), 85 deletions(-) diff --git a/src/lease-manager.ts b/src/lease-manager.ts index 61394aa8d..6de85bb12 100644 --- a/src/lease-manager.ts +++ b/src/lease-manager.ts @@ -269,7 +269,8 @@ export class LeaseManager extends EventEmitter { const lifespan = (Date.now() - message.received) / (60 * 1000); if (lifespan < this._options.maxExtensionMinutes!) { - message.subSpans.modAckStart(Duration.from({seconds: deadline}), false); + const deadlineDuration = Duration.from({seconds: deadline}); + message.subSpans.modAckStart(deadlineDuration, false); if (this._subscriber.isExactlyOnceDelivery) { message @@ -281,11 +282,11 @@ export class LeaseManager extends EventEmitter { this.remove(message); }) .finally(() => { - message.subSpans.modAckStop(); + message.subSpans.modAckEnd(); }); } else { message.modAck(deadline); - message.subSpans.modAckStop(); + message.subSpans.modAckStart(deadlineDuration, false); } } else { this.remove(message); diff --git a/src/subscriber.ts b/src/subscriber.ts index a8be9948c..7a65a460f 100644 --- a/src/subscriber.ts +++ b/src/subscriber.ts @@ -90,23 +90,32 @@ export class SubscriberSpans { } } + ackStart() { + tracing.PubsubEvents.ackStart(this.parent); + } + + ackEnd() { + tracing.PubsubEvents.ackEnd(this.parent); + } + + // Start a leasing nack span if needed. + nackStart() { + tracing.PubsubEvents.nackStart(this.parent); + } + + // End any leasing nack span. + nackEnd() { + tracing.PubsubEvents.nackEnd(this.parent); + } + // Start a leasing modAck span if needed. modAckStart(deadline: Duration, isInitial: boolean) { - if (!this.modAck) { - this.modAck = tracing.PubsubSpans.createModAckSpan( - this.parent, - deadline, - isInitial - ); - } + tracing.PubsubEvents.modAckStart(this.parent, deadline, isInitial); } // End any leasing modAck span. - modAckStop() { - if (this.modAck) { - this.modAck.end(); - this.modAck = undefined; - } + modAckEnd() { + tracing.PubsubEvents.modAckEnd(this.parent); } // Start a scheduler span if needed. @@ -147,7 +156,6 @@ export class SubscriberSpans { } } - private modAck?: tracing.Span; private flow?: tracing.Span; private scheduler?: tracing.Span; private processing?: tracing.Span; @@ -694,10 +702,7 @@ export class Subscriber extends EventEmitter { const ackTimeSeconds = (Date.now() - message.received) / 1000; this.updateAckDeadline(ackTimeSeconds); - const ackSpan = tracing.PubsubSpans.createReceiveResponseSpan( - message, - true - ); + tracing.PubsubEvents.ackStart(message); // Ignore this in this version of the method (but hook catch // to avoid unhandled exceptions). @@ -706,7 +711,7 @@ export class Subscriber extends EventEmitter { await this._acks.onFlush(); - ackSpan?.end(); + tracing.PubsubEvents.ackEnd(message); this._inventory.remove(message); } @@ -723,14 +728,11 @@ export class Subscriber extends EventEmitter { const ackTimeSeconds = (Date.now() - message.received) / 1000; this.updateAckDeadline(ackTimeSeconds); - const ackSpan = tracing.PubsubSpans.createReceiveResponseSpan( - message, - true - ); + tracing.PubsubEvents.ackStart(message); await this._acks.add(message); - ackSpan?.end(); + tracing.PubsubEvents.ackEnd(message); this._inventory.remove(message); @@ -830,15 +832,9 @@ export class Subscriber extends EventEmitter { * @private */ async nack(message: Message): Promise { - const ackSpan = tracing.PubsubSpans.createReceiveResponseSpan( - message, - false - ); - + message.subSpans.nackStart(); await this.modAck(message, 0); - - ackSpan?.end(); - + message.subSpans.nackEnd(); this._inventory.remove(message); } @@ -852,12 +848,9 @@ export class Subscriber extends EventEmitter { * @private */ async nackWithResponse(message: Message): Promise { - const ackSpan = tracing.PubsubSpans.createReceiveResponseSpan( - message, - false - ); + message.subSpans.nackStart(); const response = await this.modAckWithResponse(message, 0); - ackSpan?.end(); + message.subSpans.nackEnd(); return response; } @@ -1000,7 +993,7 @@ export class Subscriber extends EventEmitter { this._discardMessage(message); }) .finally(() => { - message.subSpans.modAckStop(); + message.subSpans.modAckEnd(); }); } else { message.subSpans.modAckStart( @@ -1008,7 +1001,7 @@ export class Subscriber extends EventEmitter { true ); message.modAck(this.ackDeadline); - message.subSpans.modAckStop(); + message.subSpans.modAckEnd(); this._inventory.add(message); } } else { diff --git a/src/telemetry-tracing.ts b/src/telemetry-tracing.ts index d49e3c74d..55ff21c99 100644 --- a/src/telemetry-tracing.ts +++ b/src/telemetry-tracing.ts @@ -354,21 +354,6 @@ export class PubsubSpans { return span; } - static createModAckSpan( - message: MessageWithAttributes, - deadline: Duration, - initial: boolean - ) { - const span = PubsubSpans.createChildSpan('modify ack deadline', message); - if (span) { - span.setAttributes({ - 'messaging.pubsub.modack_deadline_seconds': deadline.totalOf('second'), - 'messaging.pubsub.is_receipt_modack': initial ? 'true' : 'false', - } as unknown as Attributes); - } - return span; - } - static createReceiveFlowSpan( message: MessageWithAttributes ): Span | undefined { @@ -405,27 +390,23 @@ export class PubsubSpans { span?.setAttribute('messaging.pubsub.is_receipt_modack', isInitial); return span; } - - static createReceiveResponseSpan( - message: MessageWithAttributes, - isAck: boolean - ): Span | undefined { - const name = isAck ? 'ack' : 'nack'; - return PubsubSpans.createChildSpan(name, message); - } } /** * Creates and manipulates Pub/Sub-related events on spans. */ export class PubsubEvents { - static addEvent(text: string, message: MessageWithAttributes) { + static addEvent( + text: string, + message: MessageWithAttributes, + attributes?: Attributes + ): void { const parent = message.parentSpan; if (!parent) { return; } - parent.addEvent(text); + parent.addEvent(text, attributes); } static publishStart(message: MessageWithAttributes) { @@ -437,19 +418,36 @@ export class PubsubEvents { } static ackStart(message: MessageWithAttributes) { - PubsubEvents.addEvent('ack start', message); + PubsubEvents.addEvent('ack', message); } static ackEnd(message: MessageWithAttributes) { PubsubEvents.addEvent('ack end', message); } - static modAckStart(message: MessageWithAttributes) { - PubsubEvents.addEvent('modack start', message); + static nackStart(message: MessageWithAttributes) { + PubsubEvents.addEvent('nack start', message); + } + + static nackEnd(message: MessageWithAttributes) { + PubsubEvents.addEvent('nack end', message); + } + + static modAckStart( + message: MessageWithAttributes, + deadline: Duration, + isInitial: boolean + ) { + PubsubEvents.addEvent('modify ack deadline start', message, { + 'messaging.pubsub.modack_deadline_seconds': `${deadline.totalOf( + 'second' + )}`, + 'messaging.pubsub.is_receipt_modack': isInitial ? 'true' : 'false', + }); } static modAckEnd(message: MessageWithAttributes) { - PubsubEvents.addEvent('modack end', message); + PubsubEvents.addEvent('modify ack deadline end', message); } } diff --git a/test/subscriber.ts b/test/subscriber.ts index 8ef946699..f02eb482f 100644 --- a/test/subscriber.ts +++ b/test/subscriber.ts @@ -927,7 +927,8 @@ describe('Subscriber', () => { message.endParentSpan(); const spans = exporter.getFinishedSpans(); - assert.strictEqual(spans.length, 2); + assert.strictEqual(spans.length, 1); + assert.strictEqual(spans[0].events.length, 2); const firstSpan = spans.pop(); assert.ok(firstSpan); assert.strictEqual(firstSpan.parentSpanId, parentSpanContext.spanId); @@ -961,7 +962,7 @@ describe('Subscriber', () => { stream.emit('data', pullResponse); message.endParentSpan(); - assert.strictEqual(exporter.getFinishedSpans().length, 2); + assert.strictEqual(exporter.getFinishedSpans().length, 1); }); }); @@ -1206,26 +1207,20 @@ describe('Subscriber', () => { assert.strictEqual(spy.calledOnce, true); }); - it('starts a modAck span', () => { - const stub = sandbox - .stub(tracing.PubsubSpans, 'createModAckSpan') - .returns(fakeSpan); + it('fires a modAck start event', () => { + const stub = sandbox.stub(tracing.PubsubEvents, 'modAckStart'); spans.modAckStart(Duration.from({seconds: 10}), true); assert.strictEqual(stub.args[0][0], message); assert.strictEqual(stub.args[0][1].totalOf('second'), 10); assert.strictEqual(stub.args[0][2], true); - spans.modAckStart(Duration.from({seconds: 20}), false); assert.strictEqual(stub.calledOnce, true); }); - it('ends a modAck span', () => { - sandbox.stub(tracing.PubsubSpans, 'createModAckSpan').returns(fakeSpan); - spans.modAckStart(Duration.from({seconds: 10}), true); - const spy = sandbox.spy(fakeSpan, 'end'); - spans.modAckStop(); - assert.strictEqual(spy.calledOnce, true); - spans.modAckStop(); - assert.strictEqual(spy.calledOnce, true); + it('fires a modAck end event', () => { + const stub = sandbox.stub(tracing.PubsubEvents, 'modAckEnd'); + spans.modAckEnd(); + assert.strictEqual(stub.args[0][0], message); + assert.strictEqual(stub.calledOnce, true); }); it('starts a scheduler span', () => { From aac3edd4b6d90a7844dd6481854e5213ae76c7fd Mon Sep 17 00:00:00 2001 From: feywind <57276408+feywind@users.noreply.github.com> Date: Wed, 29 Nov 2023 16:37:38 -0500 Subject: [PATCH 04/15] fix: fixes for previous commits --- src/telemetry-tracing.ts | 2 +- src/topic.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/telemetry-tracing.ts b/src/telemetry-tracing.ts index 55ff21c99..974aa2ba0 100644 --- a/src/telemetry-tracing.ts +++ b/src/telemetry-tracing.ts @@ -418,7 +418,7 @@ export class PubsubEvents { } static ackStart(message: MessageWithAttributes) { - PubsubEvents.addEvent('ack', message); + PubsubEvents.addEvent('ack start', message); } static ackEnd(message: MessageWithAttributes) { diff --git a/src/topic.ts b/src/topic.ts index 5109985db..e0a5d03b6 100644 --- a/src/topic.ts +++ b/src/topic.ts @@ -124,7 +124,6 @@ export class Topic { * @type {string} */ this.id_ = name; - this.publisher = new Publisher(this, options); /** * The parent {@link PubSub} instance of this topic instance. * @name Topic#pubsub @@ -136,6 +135,7 @@ export class Topic { * @type {PubSub} */ this.parent = this.pubsub = pubsub; + this.publisher = new Publisher(this, options); this.request = pubsub.request.bind(pubsub); /** * [IAM (Identity and Access From 0d64d9065a856210e3676fa0e91f1e8d631854a8 Mon Sep 17 00:00:00 2001 From: feywind <57276408+feywind@users.noreply.github.com> Date: Thu, 30 Nov 2023 17:01:00 -0500 Subject: [PATCH 05/15] fix: move span closing to subscriber, upon (n)ack --- src/lease-manager.ts | 13 +++++-------- src/subscriber.ts | 8 +++++++- test/lease-manager.ts | 18 ------------------ test/subscriber.ts | 4 +++- 4 files changed, 15 insertions(+), 28 deletions(-) diff --git a/src/lease-manager.ts b/src/lease-manager.ts index 6de85bb12..f8903c54a 100644 --- a/src/lease-manager.ts +++ b/src/lease-manager.ts @@ -123,16 +123,14 @@ export class LeaseManager extends EventEmitter { } } /** - * Removes ALL messages from inventory. + * Removes ALL messages from inventory, and returns the ones removed. * @private */ - clear(): void { + clear(): Message[] { const wasFull = this.isFull(); this._pending = []; - this._messages.forEach(m => { - m.endParentSpan(); - }); + const remaining = Array.from(this._messages); this._messages.clear(); this.bytes = 0; @@ -141,6 +139,8 @@ export class LeaseManager extends EventEmitter { } this._cancelExtension(); + + return remaining; } /** * Indicates if we're at or over capacity. @@ -162,9 +162,6 @@ export class LeaseManager extends EventEmitter { * @private */ remove(message: Message): void { - // The subscriber span ends when it leaves leasing. - message.endParentSpan(); - if (!this._messages.has(message)) { return; } diff --git a/src/subscriber.ts b/src/subscriber.ts index 7a65a460f..28d6cd276 100644 --- a/src/subscriber.ts +++ b/src/subscriber.ts @@ -712,6 +712,7 @@ export class Subscriber extends EventEmitter { await this._acks.onFlush(); tracing.PubsubEvents.ackEnd(message); + message.endParentSpan(); this._inventory.remove(message); } @@ -733,6 +734,7 @@ export class Subscriber extends EventEmitter { await this._acks.add(message); tracing.PubsubEvents.ackEnd(message); + message.endParentSpan(); this._inventory.remove(message); @@ -754,10 +756,12 @@ export class Subscriber extends EventEmitter { this.isOpen = false; this._stream.destroy(); - this._inventory.clear(); + const remaining = this._inventory.clear(); await this._waitForFlush(); + remaining.forEach(m => m.endParentSpan()); + this.emit('close'); this._acks.close(); @@ -835,6 +839,7 @@ export class Subscriber extends EventEmitter { message.subSpans.nackStart(); await this.modAck(message, 0); message.subSpans.nackEnd(); + message.endParentSpan(); this._inventory.remove(message); } @@ -851,6 +856,7 @@ export class Subscriber extends EventEmitter { message.subSpans.nackStart(); const response = await this.modAckWithResponse(message, 0); message.subSpans.nackEnd(); + message.endParentSpan(); return response; } diff --git a/test/lease-manager.ts b/test/lease-manager.ts index 503129446..ca26a4be0 100644 --- a/test/lease-manager.ts +++ b/test/lease-manager.ts @@ -388,14 +388,6 @@ describe('LeaseManager', () => { assert.strictEqual(leaseManager.size, 0); }); - it('should end all parent spans', () => { - const messages = [new FakeMessage(), new FakeMessage()]; - const spies = messages.map(m => sandbox.spy(m, 'endParentSpan')); - messages.forEach(m => leaseManager.add(m as {} as Message)); - leaseManager.clear(); - spies.forEach(s => assert.strictEqual(s.calledOnce, true)); - }); - it('should emit the free event if it was full', done => { leaseManager.setOptions({maxMessages: 1}); leaseManager.add(new FakeMessage() as {} as Message); @@ -452,16 +444,6 @@ describe('LeaseManager', () => { }); describe('remove', () => { - it('should end the span', () => { - const message = new FakeMessage(); - const spy = sandbox.spy(message, 'endParentSpan'); - - leaseManager.add(message as {} as Message); - leaseManager.remove(message as {} as Message); - - assert.strictEqual(spy.calledOnce, true); - }); - it('should noop for unknown messages', () => { const message = new FakeMessage(); diff --git a/test/subscriber.ts b/test/subscriber.ts index f02eb482f..c4e211d5f 100644 --- a/test/subscriber.ts +++ b/test/subscriber.ts @@ -93,7 +93,9 @@ class FakeLeaseManager extends EventEmitter { } // eslint-disable-next-line @typescript-eslint/no-unused-vars add(message: s.Message): void {} - clear(): void {} + clear(): s.Message[] { + return []; + } // eslint-disable-next-line @typescript-eslint/no-unused-vars remove(message: s.Message): void {} } From 97d6bef37ac1ed503e2ebf1da3b1ae001b587c77 Mon Sep 17 00:00:00 2001 From: feywind <57276408+feywind@users.noreply.github.com> Date: Thu, 30 Nov 2023 17:09:15 -0500 Subject: [PATCH 06/15] docs: update custom gcp attributes --- src/telemetry-tracing.ts | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/src/telemetry-tracing.ts b/src/telemetry-tracing.ts index 974aa2ba0..96dda481f 100644 --- a/src/telemetry-tracing.ts +++ b/src/telemetry-tracing.ts @@ -246,7 +246,8 @@ export class PubsubSpans { ] = message.data?.length; } if (message.orderingKey) { - spanAttributes['messaging.pubsub.ordering_key'] = message.orderingKey; + spanAttributes['messaging.gcp.pubsub.ordering_key'] = + message.orderingKey; } } @@ -334,18 +335,18 @@ export class PubsubSpans { ROOT_CONTEXT ); span?.setAttribute( - 'messaging.pubsub.num_messages_in_batch', + 'messaging.gcp.pubsub.num_messages_in_batch', messages.length ); messages.forEach(m => { // Workaround until the JS API properly supports adding links later. if (m.parentSpan) { m.parentSpan.setAttribute( - 'gcp_pubsub.publish.trace_id', + 'messaging.gcp.pubsub.publish.trace_id', span.spanContext().traceId ); m.parentSpan.setAttribute( - 'gcp_pubsub.publish.span_id', + 'messaging.gcp.pubsub.publish.span_id', span.spanContext().spanId ); } @@ -374,7 +375,7 @@ export class PubsubSpans { } static setReceiveProcessResult(span: Span, isAck: boolean) { - span.setAttribute('messaging.pubsub.result', isAck ? 'ack' : 'nack'); + span.setAttribute('messaging.gcp.pubsub.result', isAck ? 'ack' : 'nack'); } static createReceiveLeaseSpan( @@ -384,10 +385,10 @@ export class PubsubSpans { ): Span | undefined { const span = PubsubSpans.createChildSpan('modify ack deadline', message); span?.setAttribute( - 'messaging.pubsub.modack_deadline_seconds', + 'messaging.gcp.pubsub.modack_deadline_seconds', deadline.totalOf('second') ); - span?.setAttribute('messaging.pubsub.is_receipt_modack', isInitial); + span?.setAttribute('messaging.gcp.pubsub.is_receipt_modack', isInitial); return span; } } @@ -439,10 +440,10 @@ export class PubsubEvents { isInitial: boolean ) { PubsubEvents.addEvent('modify ack deadline start', message, { - 'messaging.pubsub.modack_deadline_seconds': `${deadline.totalOf( + 'messaging.gcp.pubsub.modack_deadline_seconds': `${deadline.totalOf( 'second' )}`, - 'messaging.pubsub.is_receipt_modack': isInitial ? 'true' : 'false', + 'messaging.gcp.pubsub.is_receipt_modack': isInitial ? 'true' : 'false', }); } From 5b9a95fd08a3308037c95e49112d22cbcab1c1af Mon Sep 17 00:00:00 2001 From: feywind <57276408+feywind@users.noreply.github.com> Date: Thu, 30 Nov 2023 17:27:33 -0500 Subject: [PATCH 07/15] docs: update subscriber comments --- src/subscriber.ts | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/subscriber.ts b/src/subscriber.ts index 28d6cd276..997b71def 100644 --- a/src/subscriber.ts +++ b/src/subscriber.ts @@ -90,30 +90,32 @@ export class SubscriberSpans { } } + // Emit an event for starting to send an ack. ackStart() { tracing.PubsubEvents.ackStart(this.parent); } + // Emit an event for the ack having been sent. ackEnd() { tracing.PubsubEvents.ackEnd(this.parent); } - // Start a leasing nack span if needed. + // Emit an event for starting to send a nack. nackStart() { tracing.PubsubEvents.nackStart(this.parent); } - // End any leasing nack span. + // Emit an event for the nack having been sent. nackEnd() { tracing.PubsubEvents.nackEnd(this.parent); } - // Start a leasing modAck span if needed. + // Emit an event for starting to send a modAck. modAckStart(deadline: Duration, isInitial: boolean) { tracing.PubsubEvents.modAckStart(this.parent, deadline, isInitial); } - // End any leasing modAck span. + // Emit an event for the modAck having been sent. modAckEnd() { tracing.PubsubEvents.modAckEnd(this.parent); } @@ -129,7 +131,7 @@ export class SubscriberSpans { } } - // End any schedular span. + // End any scheduler span. schedulerEnd() { if (this.scheduler) { this.scheduler.end(); From c54efc6b13e2265de3fa4e910ebe58e3b2b1bd77 Mon Sep 17 00:00:00 2001 From: feywind <57276408+feywind@users.noreply.github.com> Date: Thu, 30 Nov 2023 17:59:22 -0500 Subject: [PATCH 08/15] tests: add unit tests for shutdown events --- test/subscriber.ts | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/test/subscriber.ts b/test/subscriber.ts index c4e211d5f..f5e1f33c9 100644 --- a/test/subscriber.ts +++ b/test/subscriber.ts @@ -549,12 +549,15 @@ describe('Subscriber', () => { assert.strictEqual(stub.callCount, 1); }); - it('should clear the inventory', () => { + it('should clear the inventory', async () => { + const message = new Message(subscriber, RECEIVED_MESSAGE); + const shutdownStub = sandbox.stub(tracing.PubsubEvents, 'shutdown'); const inventory: FakeLeaseManager = stubs.get('inventory'); - const stub = sandbox.stub(inventory, 'clear'); + const stub = sandbox.stub(inventory, 'clear').returns([message]); - subscriber.close(); + await subscriber.close(); assert.strictEqual(stub.callCount, 1); + assert.strictEqual(shutdownStub.callCount, 1); }); it('should emit a close event', done => { @@ -565,6 +568,7 @@ describe('Subscriber', () => { it('should nack any messages that come in after', () => { const stream: FakeMessageStream = stubs.get('messageStream'); const stub = sandbox.stub(subscriber, 'nack'); + const shutdownStub = sandbox.stub(tracing.PubsubEvents, 'shutdown'); const pullResponse = {receivedMessages: [RECEIVED_MESSAGE]}; subscriber.close(); @@ -572,6 +576,7 @@ describe('Subscriber', () => { const [{ackId}] = stub.lastCall.args; assert.strictEqual(ackId, RECEIVED_MESSAGE.ackId); + assert.strictEqual(shutdownStub.callCount, 1); }); describe('flushing the queues', () => { From fb42fa4f1e8e2ef92613d7075ae49105a7a835df Mon Sep 17 00:00:00 2001 From: feywind <57276408+feywind@users.noreply.github.com> Date: Thu, 30 Nov 2023 18:02:04 -0500 Subject: [PATCH 09/15] fix: missed commits from previous --- src/subscriber.ts | 11 ++++++++++- src/telemetry-tracing.ts | 6 ++++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/src/subscriber.ts b/src/subscriber.ts index 997b71def..58094b0f2 100644 --- a/src/subscriber.ts +++ b/src/subscriber.ts @@ -158,6 +158,11 @@ export class SubscriberSpans { } } + // If we shut down before processing can finish. + shutdown() { + tracing.PubsubEvents.shutdown(this.parent); + } + private flow?: tracing.Span; private scheduler?: tracing.Span; private processing?: tracing.Span; @@ -762,7 +767,10 @@ export class Subscriber extends EventEmitter { await this._waitForFlush(); - remaining.forEach(m => m.endParentSpan()); + remaining.forEach(m => { + m.subSpans.shutdown(); + m.endParentSpan(); + }); this.emit('close'); @@ -1013,6 +1021,7 @@ export class Subscriber extends EventEmitter { this._inventory.add(message); } } else { + message.subSpans.shutdown(); message.nack(); } } diff --git a/src/telemetry-tracing.ts b/src/telemetry-tracing.ts index 96dda481f..46467094c 100644 --- a/src/telemetry-tracing.ts +++ b/src/telemetry-tracing.ts @@ -450,6 +450,12 @@ export class PubsubEvents { static modAckEnd(message: MessageWithAttributes) { PubsubEvents.addEvent('modify ack deadline end', message); } + + // Add this event any time the process is shut down before processing + // of the message can complete. + static shutdown(message: MessageWithAttributes) { + PubsubEvents.addEvent('shutdown', message); + } } /** From d70c94ba68e0c40a39f1741a1f5ac5564a3d4d2c Mon Sep 17 00:00:00 2001 From: Megan Potter <57276408+feywind@users.noreply.github.com> Date: Mon, 4 Dec 2023 15:57:35 -0500 Subject: [PATCH 10/15] fix: update span attributes for ordering key Co-authored-by: Anna Levenberg --- src/telemetry-tracing.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/telemetry-tracing.ts b/src/telemetry-tracing.ts index 46467094c..a7c0d2a16 100644 --- a/src/telemetry-tracing.ts +++ b/src/telemetry-tracing.ts @@ -246,7 +246,7 @@ export class PubsubSpans { ] = message.data?.length; } if (message.orderingKey) { - spanAttributes['messaging.gcp.pubsub.ordering_key'] = + spanAttributes['messaging.gcp_pubsub.message.ordering_key'] = message.orderingKey; } } From 1a33c99127202146baaf42b8f03c9f13d873c9db Mon Sep 17 00:00:00 2001 From: feywind <57276408+feywind@users.noreply.github.com> Date: Tue, 5 Dec 2023 16:53:40 -0500 Subject: [PATCH 11/15] fix: update gcp.pubsub to gcp_pubsub --- src/telemetry-tracing.ts | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/src/telemetry-tracing.ts b/src/telemetry-tracing.ts index a7c0d2a16..f374d91bc 100644 --- a/src/telemetry-tracing.ts +++ b/src/telemetry-tracing.ts @@ -334,19 +334,16 @@ export class PubsubSpans { }, ROOT_CONTEXT ); - span?.setAttribute( - 'messaging.gcp.pubsub.num_messages_in_batch', - messages.length - ); + span?.setAttribute('messaging.batch.message_count', messages.length); messages.forEach(m => { // Workaround until the JS API properly supports adding links later. if (m.parentSpan) { m.parentSpan.setAttribute( - 'messaging.gcp.pubsub.publish.trace_id', + 'messaging.gcp_pubsub.publish.trace_id', span.spanContext().traceId ); m.parentSpan.setAttribute( - 'messaging.gcp.pubsub.publish.span_id', + 'messaging.gcp_pubsub.publish.span_id', span.spanContext().spanId ); } @@ -375,7 +372,7 @@ export class PubsubSpans { } static setReceiveProcessResult(span: Span, isAck: boolean) { - span.setAttribute('messaging.gcp.pubsub.result', isAck ? 'ack' : 'nack'); + span.setAttribute('messaging.gcp_pubsub.result', isAck ? 'ack' : 'nack'); } static createReceiveLeaseSpan( @@ -385,10 +382,10 @@ export class PubsubSpans { ): Span | undefined { const span = PubsubSpans.createChildSpan('modify ack deadline', message); span?.setAttribute( - 'messaging.gcp.pubsub.modack_deadline_seconds', + 'messaging.gcp_pubsub.modack_deadline_seconds', deadline.totalOf('second') ); - span?.setAttribute('messaging.gcp.pubsub.is_receipt_modack', isInitial); + span?.setAttribute('messaging.gcp_pubsub.is_receipt_modack', isInitial); return span; } } @@ -440,10 +437,10 @@ export class PubsubEvents { isInitial: boolean ) { PubsubEvents.addEvent('modify ack deadline start', message, { - 'messaging.gcp.pubsub.modack_deadline_seconds': `${deadline.totalOf( + 'messaging.gcp_pubsub.modack_deadline_seconds': `${deadline.totalOf( 'second' )}`, - 'messaging.gcp.pubsub.is_receipt_modack': isInitial ? 'true' : 'false', + 'messaging.gcp_pubsub.is_receipt_modack': isInitial ? 'true' : 'false', }); } From 579d2c9083faadb105d6e53fe1c47ebaf96fc0e1 Mon Sep 17 00:00:00 2001 From: feywind <57276408+feywind@users.noreply.github.com> Date: Tue, 5 Dec 2023 17:25:46 -0500 Subject: [PATCH 12/15] feat: add back in receive response spans --- src/message-queues.ts | 36 ++++++++++++++++++++------ src/telemetry-tracing.ts | 56 ++++++++++++++++++++++++++++++++++++---- test/message-queues.ts | 4 +-- 3 files changed, 81 insertions(+), 15 deletions(-) diff --git a/src/message-queues.ts b/src/message-queues.ts index e40be2265..5eb892f31 100644 --- a/src/message-queues.ts +++ b/src/message-queues.ts @@ -33,12 +33,18 @@ import { import {Duration} from './temporal'; import {addToBucket} from './util'; import {DebugMessage} from './debug'; +import * as tracing from './telemetry-tracing'; + +export interface ReducedMessage { + ackId: string; + tracingSpan?: tracing.Span; +} /** * @private */ export interface QueuedMessage { - ackId: string; + message: ReducedMessage; deadline?: number; responsePromise?: defer.DeferredPromise; retryCount: number; @@ -179,7 +185,7 @@ export abstract class MessageQueue { * @param {number} [deadline] The deadline. * @private */ - add({ackId}: Message, deadline?: number): Promise { + add(message: Message, deadline?: number): Promise { if (this._closed) { if (this._subscriber.isExactlyOnceDelivery) { throw new AckError(AckResponses.Invalid, 'Subscriber closed'); @@ -192,7 +198,10 @@ export abstract class MessageQueue { const responsePromise = defer(); this._requests.push({ - ackId, + message: { + ackId: message.ackId, + tracingSpan: message.parentSpan, + }, deadline, responsePromise, retryCount: 0, @@ -379,9 +388,9 @@ export abstract class MessageQueue { const codes: AckErrorCodes = processAckErrorInfo(rpcError); for (const m of batch) { - if (codes.has(m.ackId)) { + if (codes.has(m.message.ackId)) { // This ack has an ErrorInfo entry, so use that to route it. - const code = codes.get(m.ackId)!; + const code = codes.get(m.message.ackId)!; if (code.transient) { // Transient errors get retried. toRetry.push(m); @@ -407,7 +416,7 @@ export abstract class MessageQueue { // stream message if an unknown error happens during ack. const others = toError.get(AckResponses.Other); if (others?.length) { - const otherIds = others.map(e => e.ackId); + const otherIds = others.map(e => e.message.ackId); const debugMsg = new BatchError(rpcError, otherIds, operation); this._subscriber.emit('debug', debugMsg); } @@ -468,8 +477,12 @@ export class AckQueue extends MessageQueue { * @return {Promise} */ protected async _sendBatch(batch: QueuedMessages): Promise { + const responseSpan = tracing.PubsubSpans.createReceiveResponseRpcSpan( + batch.map(b => b.message.tracingSpan), + this._subscriber.name + ); const client = await this._subscriber.getClient(); - const ackIds = batch.map(({ackId}) => ackId); + const ackIds = batch.map(({message}) => message.ackId); const reqOpts = {subscription: this._subscriber.name, ackIds}; try { @@ -477,6 +490,7 @@ export class AckQueue extends MessageQueue { // It's okay if these pass through since they're successful anyway. this.handleAckSuccesses(batch); + responseSpan?.end(); return []; } catch (e) { // If exactly-once delivery isn't enabled, don't do error processing. We'll @@ -500,6 +514,7 @@ export class AckQueue extends MessageQueue { batch.forEach(m => { m.responsePromise?.reject(exc); }); + responseSpan?.end(); return []; } } @@ -524,6 +539,10 @@ export class ModAckQueue extends MessageQueue { * @return {Promise} */ protected async _sendBatch(batch: QueuedMessages): Promise { + const responseSpan = tracing.PubsubSpans.createReceiveResponseRpcSpan( + batch.map(b => b.message.tracingSpan), + this._subscriber.name + ); const client = await this._subscriber.getClient(); const subscription = this._subscriber.name; const modAckTable: {[index: string]: QueuedMessages} = batch.reduce( @@ -541,7 +560,7 @@ export class ModAckQueue extends MessageQueue { const callOptions = this.getCallOptions(); const modAckRequests = Object.keys(modAckTable).map(async deadline => { const messages = modAckTable[deadline]; - const ackIds = messages.map(m => m.ackId); + const ackIds = messages.map(m => m.message.ackId); const ackDeadlineSeconds = Number(deadline); const reqOpts = {subscription, ackIds, ackDeadlineSeconds}; @@ -575,6 +594,7 @@ export class ModAckQueue extends MessageQueue { // This catches the sub-failures and bubbles up anything we need to bubble. const allNewBatches: QueuedMessages[] = await Promise.all(modAckRequests); + responseSpan?.end(); return allNewBatches.reduce((p: QueuedMessage[], c: QueuedMessage[]) => [ ...(p ?? []), ...c, diff --git a/src/telemetry-tracing.ts b/src/telemetry-tracing.ts index f374d91bc..7a26728a2 100644 --- a/src/telemetry-tracing.ts +++ b/src/telemetry-tracing.ts @@ -223,19 +223,30 @@ export const modernAttributeName = 'googclient_traceparent'; */ export const legacyAttributeName = 'googclient_OpenTelemetrySpanContext'; +export interface AttributeParams { + topicName?: string; + subName?: string; +} + export class PubsubSpans { static createAttributes( - topicName: string, + params: AttributeParams, message?: PubsubMessage ): SpanAttributes { + const destinationName = params.topicName ?? params.subName; + if (!destinationName || (params.topicName && params.subName)) { + throw new Error('One of topicName or subName must be specified'); + } + const destinationKind = params.topicName ? 'topic' : 'subscription'; + const spanAttributes = { // Add Opentelemetry semantic convention attributes to the span, based on: // https://github.com/open-telemetry/opentelemetry-specification/blob/v1.1.0/specification/trace/semantic_conventions/messaging.md [SemanticAttributes.MESSAGING_TEMP_DESTINATION]: false, [SemanticAttributes.MESSAGING_SYSTEM]: 'pubsub', [SemanticAttributes.MESSAGING_OPERATION]: 'send', - [SemanticAttributes.MESSAGING_DESTINATION]: topicName, - [SemanticAttributes.MESSAGING_DESTINATION_KIND]: 'topic', + [SemanticAttributes.MESSAGING_DESTINATION]: destinationName, + [SemanticAttributes.MESSAGING_DESTINATION_KIND]: destinationKind, [SemanticAttributes.MESSAGING_PROTOCOL]: 'pubsub', } as SpanAttributes; @@ -257,7 +268,7 @@ export class PubsubSpans { static createPublisherSpan(message: PubsubMessage, topicName: string): Span { const span: Span = getTracer().startSpan(`${topicName} send`, { kind: SpanKind.PRODUCER, - attributes: PubsubSpans.createAttributes(topicName, message), + attributes: PubsubSpans.createAttributes({topicName}, message), }); return span; @@ -321,7 +332,7 @@ export class PubsubSpans { messages: MessageWithAttributes[], topicName: string ): Span { - const spanAttributes = PubsubSpans.createAttributes(topicName); + const spanAttributes = PubsubSpans.createAttributes({topicName}); const links: Link[] = messages .map(m => ({context: m.parentSpan?.spanContext()}) as Link) .filter(l => l.context); @@ -352,6 +363,41 @@ export class PubsubSpans { return span; } + static createReceiveResponseRpcSpan( + messageSpans: (Span | undefined)[], + subName: string + ): Span { + const spanAttributes = PubsubSpans.createAttributes({subName}); + const links: Link[] = messageSpans + .map(m => ({context: m?.spanContext()}) as Link) + .filter(l => l.context); + const span: Span = getTracer().startSpan( + `${subName} response batch`, + { + kind: SpanKind.CONSUMER, + attributes: spanAttributes, + links, + }, + ROOT_CONTEXT + ); + span?.setAttribute('messaging.batch.message_count', messageSpans.length); + messageSpans.forEach(m => { + // Workaround until the JS API properly supports adding links later. + if (m) { + m.setAttribute( + 'messaging.gcp_pubsub.receive.trace_id', + span.spanContext().traceId + ); + m.setAttribute( + 'messaging.gcp_pubsub.receive.span_id', + span.spanContext().spanId + ); + } + }); + + return span; + } + static createReceiveFlowSpan( message: MessageWithAttributes ): Span | undefined { diff --git a/test/message-queues.ts b/test/message-queues.ts index 6fe7f829f..095da46f6 100644 --- a/test/message-queues.ts +++ b/test/message-queues.ts @@ -252,7 +252,7 @@ describe('MessageQueues', () => { messageQueue.flush(); const [batch] = messageQueue.batches; - assert.strictEqual(batch[0].ackId, message.ackId); + assert.strictEqual(batch[0].message.ackId, message.ackId); assert.strictEqual(batch[0].deadline, deadline); assert.ok(batch[0].responsePromise?.resolve); }); @@ -593,7 +593,7 @@ describe('MessageQueues', () => { clock.tick(1000); assert.strictEqual(ackQueue.requests.length, 1); - assert.strictEqual(ackQueue.requests[0].ackId, message.ackId); + assert.strictEqual(ackQueue.requests[0].message.ackId, message.ackId); assert.strictEqual(ackQueue.numInRetryRequests, 0); assert.strictEqual(ackQueue.numPendingRequests, 1); }); From 7e00b101a35a34e97e20d9336f041bb441622de5 Mon Sep 17 00:00:00 2001 From: feywind <57276408+feywind@users.noreply.github.com> Date: Tue, 5 Dec 2023 17:56:45 -0500 Subject: [PATCH 13/15] docs: fix inadequate comments for deadline --- src/message-queues.ts | 4 ++-- src/subscriber.ts | 29 ++++++++++++++++++++++++++++- 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/src/message-queues.ts b/src/message-queues.ts index 5eb892f31..5bdc675b5 100644 --- a/src/message-queues.ts +++ b/src/message-queues.ts @@ -45,7 +45,7 @@ export interface ReducedMessage { */ export interface QueuedMessage { message: ReducedMessage; - deadline?: number; + deadline?: number; // seconds responsePromise?: defer.DeferredPromise; retryCount: number; } @@ -182,7 +182,7 @@ export abstract class MessageQueue { * Adds a message to the queue. * * @param {Message} message The message to add. - * @param {number} [deadline] The deadline. + * @param {number} [deadline] The deadline in seconds. * @private */ add(message: Message, deadline?: number): Promise { diff --git a/src/subscriber.ts b/src/subscriber.ts index 58094b0f2..4b846c87d 100644 --- a/src/subscriber.ts +++ b/src/subscriber.ts @@ -100,6 +100,13 @@ export class SubscriberSpans { tracing.PubsubEvents.ackEnd(this.parent); } + // Emit an event for calling ack. + ackCall() { + if (this.processing) { + tracing.PubsubEvents.ackCalled(this.processing); + } + } + // Emit an event for starting to send a nack. nackStart() { tracing.PubsubEvents.nackStart(this.parent); @@ -110,6 +117,13 @@ export class SubscriberSpans { tracing.PubsubEvents.nackEnd(this.parent); } + // Emit an event for calling nack. + nackCall() { + if (this.processing) { + tracing.PubsubEvents.nackCalled(this.processing); + } + } + // Emit an event for starting to send a modAck. modAckStart(deadline: Duration, isInitial: boolean) { tracing.PubsubEvents.modAckStart(this.parent, deadline, isInitial); @@ -120,6 +134,13 @@ export class SubscriberSpans { tracing.PubsubEvents.modAckEnd(this.parent); } + // Emit an event for calling modAck. + modAckCall(deadline: Duration) { + if (this.processing) { + tracing.PubsubEvents.modAckCalled(this.processing, deadline); + } + } + // Start a scheduler span if needed. // Note: This is not currently used in Node, because there is no // scheduler process, due to the way messages are delivered one at a time. @@ -360,6 +381,7 @@ export class Message implements tracing.MessageWithAttributes { ack(): void { if (!this._handled) { this._handled = true; + this.subSpans.ackCall(); this._subscriber.ack(this); } } @@ -387,6 +409,7 @@ export class Message implements tracing.MessageWithAttributes { if (!this._handled) { this._handled = true; + this.subSpans.ackCall(); try { return await this._subscriber.ackWithResponse(this); } catch (e) { @@ -406,6 +429,7 @@ export class Message implements tracing.MessageWithAttributes { */ modAck(deadline: number): void { if (!this._handled) { + this.subSpans.modAckCall(Duration.from({seconds: deadline})); this._subscriber.modAck(this, deadline); } } @@ -428,6 +452,7 @@ export class Message implements tracing.MessageWithAttributes { } if (!this._handled) { + this.subSpans.modAckCall(Duration.from({seconds: deadline})); try { return await this._subscriber.modAckWithResponse(this, deadline); } catch (e) { @@ -452,6 +477,7 @@ export class Message implements tracing.MessageWithAttributes { nack(): void { if (!this._handled) { this._handled = true; + this.subSpans.nackCall(); this._subscriber.nack(this); } } @@ -480,6 +506,7 @@ export class Message implements tracing.MessageWithAttributes { if (!this._handled) { this._handled = true; + this.subSpans.nackCall(); try { return await this._subscriber.nackWithResponse(this); } catch (e) { @@ -797,7 +824,7 @@ export class Subscriber extends EventEmitter { * Modifies the acknowledge deadline for the provided message. * * @param {Message} message The message to modify. - * @param {number} deadline The deadline. + * @param {number} deadline The deadline in seconds. * @returns {Promise} * @private */ From e70516b1683df8c412834aac640914acc27f6aa6 Mon Sep 17 00:00:00 2001 From: feywind <57276408+feywind@users.noreply.github.com> Date: Tue, 5 Dec 2023 17:57:09 -0500 Subject: [PATCH 14/15] feat: add ack/nack/modack events in processing --- src/subscription.ts | 12 ++++++------ src/telemetry-tracing.ts | 18 ++++++++++++++++++ test/subscription.ts | 7 ++++++- 3 files changed, 30 insertions(+), 7 deletions(-) diff --git a/src/subscription.ts b/src/subscription.ts index 3e9392d89..e666a00d1 100644 --- a/src/subscription.ts +++ b/src/subscription.ts @@ -359,19 +359,19 @@ export class Subscription extends WrappingEmitter { if (eventName !== 'message') { return listener(...args); } else { - const span = tracing.PubsubSpans.createReceiveProcessSpan( - args[0] as Message, - this.name - ); + const message = args[0] as Message; + message.subSpans.processingStart(this.name); // If the user returned a Promise, that means they used an async handler. // In that case, we need to tag on to their Promise to end the span. // Otherwise, the listener chain is sync, and we can close out sync. const result = listener(...args) as unknown as Promise; if (result && typeof result.then === 'function') { - result.then(() => span?.end()); + result.then(() => { + message.subSpans.processingEnd(); + }); } else { - span?.end(); + message.subSpans.processingEnd(); } } } diff --git a/src/telemetry-tracing.ts b/src/telemetry-tracing.ts index 7a26728a2..6567e8750 100644 --- a/src/telemetry-tracing.ts +++ b/src/telemetry-tracing.ts @@ -477,6 +477,24 @@ export class PubsubEvents { PubsubEvents.addEvent('nack end', message); } + static ackCalled(span: Span) { + span.addEvent('ack called'); + } + + static nackCalled(span: Span) { + span.addEvent('nack called'); + } + + static modAckCalled(span: Span, deadline: Duration) { + // User-called modAcks are never initial ones. + span.addEvent('modack called', { + 'messaging.gcp_pubsub.modack_deadline_seconds': `${deadline.totalOf( + 'second' + )}`, + 'messaging.gcp_pubsub.is_receipt_modack': 'false', + }); + } + static modAckStart( message: MessageWithAttributes, deadline: Duration, diff --git a/test/subscription.ts b/test/subscription.ts index 9c68d2956..77717f617 100644 --- a/test/subscription.ts +++ b/test/subscription.ts @@ -200,7 +200,12 @@ describe('Subscription', () => { }); it('should emit messages', done => { - const message = {}; + const message = { + subSpans: { + processingStart() {}, + processingEnd() {}, + }, + }; subscription.on?.('message', (msg: Message) => { assert.strictEqual(msg, message); From f3b407084e9284a2e6265755a4cbf1853426dce8 Mon Sep 17 00:00:00 2001 From: Megan Potter <57276408+feywind@users.noreply.github.com> Date: Tue, 5 Dec 2023 17:59:46 -0500 Subject: [PATCH 15/15] fix: publisher -> publish scheduler Co-authored-by: Anna Levenberg --- src/telemetry-tracing.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/telemetry-tracing.ts b/src/telemetry-tracing.ts index 6567e8750..9c60bc154 100644 --- a/src/telemetry-tracing.ts +++ b/src/telemetry-tracing.ts @@ -325,7 +325,7 @@ export class PubsubSpans { } static createPublishSchedulerSpan(message: PubsubMessage): Span | undefined { - return PubsubSpans.createChildSpan('publisher scheduler', message); + return PubsubSpans.createChildSpan('publish scheduler', message); } static createPublishRpcSpan(