Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: update to OTel PR for latest design #4

Merged
merged 17 commits into from
Jan 31, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 9 additions & 11 deletions src/lease-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,16 +123,14 @@ export class LeaseManager extends EventEmitter {
}
}
/**
* Removes ALL messages from inventory.
* Removes ALL messages from inventory, and returns the ones removed.
* @private
*/
clear(): void {
clear(): Message[] {
const wasFull = this.isFull();

this._pending = [];
this._messages.forEach(m => {
m.endParentSpan();
});
const remaining = Array.from(this._messages);
this._messages.clear();
this.bytes = 0;

Expand All @@ -141,6 +139,8 @@ export class LeaseManager extends EventEmitter {
}

this._cancelExtension();

return remaining;
}
/**
* Indicates if we're at or over capacity.
Expand All @@ -162,9 +162,6 @@ export class LeaseManager extends EventEmitter {
* @private
*/
remove(message: Message): void {
// The subscriber span ends when it leaves leasing.
message.endParentSpan();

if (!this._messages.has(message)) {
return;
}
Expand Down Expand Up @@ -269,7 +266,8 @@ export class LeaseManager extends EventEmitter {
const lifespan = (Date.now() - message.received) / (60 * 1000);

if (lifespan < this._options.maxExtensionMinutes!) {
message.subSpans.modAckStart(Duration.from({seconds: deadline}), false);
const deadlineDuration = Duration.from({seconds: deadline});
message.subSpans.modAckStart(deadlineDuration, false);

if (this._subscriber.isExactlyOnceDelivery) {
message
Expand All @@ -281,11 +279,11 @@ export class LeaseManager extends EventEmitter {
this.remove(message);
})
.finally(() => {
message.subSpans.modAckStop();
message.subSpans.modAckEnd();
});
} else {
message.modAck(deadline);
message.subSpans.modAckStop();
message.subSpans.modAckStart(deadlineDuration, false);
}
} else {
this.remove(message);
Expand Down
27 changes: 24 additions & 3 deletions src/publisher/message-batch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,19 @@

import {BATCH_LIMITS, PubsubMessage, PublishCallback} from './';
import {calculateMessageSize} from './pubsub-message';
import * as tracing from '../telemetry-tracing';

export interface BatchPublishOptions {
maxBytes?: number;
maxMessages?: number;
maxMilliseconds?: number;
}

export interface BatchResults {
messages: PubsubMessage[];
callbacks: PublishCallback[];
}

/**
* @typedef BatchPublishOptions
* @property {number} [maxBytes=1 * 1024 * 1024] The maximum number of bytes to
Expand All @@ -40,13 +46,15 @@ export interface BatchPublishOptions {
* @param {BatchPublishOptions} options The batching options.
*/
export class MessageBatch {
options: BatchPublishOptions;
messages: PubsubMessage[];
callbacks: PublishCallback[];
created: number;
bytes: number;
constructor(options: BatchPublishOptions) {
this.options = options;

constructor(
public options: BatchPublishOptions,
public topicName: string
) {
this.messages = [];
this.callbacks = [];
this.created = Date.now();
Expand All @@ -72,7 +80,18 @@ export class MessageBatch {
this.messages.push(message);
this.callbacks.push(callback);
this.bytes += calculateMessageSize(message);

tracing.PubsubSpans.createPublishSchedulerSpan(message);
}

end(): BatchResults {
this.messages.forEach(m => m.publishSchedulerSpan?.end());
return {
messages: this.messages,
callbacks: this.callbacks,
};
}

/**
* Indicates if a given message can fit in the batch.
*
Expand All @@ -86,6 +105,7 @@ export class MessageBatch {
this.bytes + calculateMessageSize(message) <= maxBytes!
);
}

/**
* Checks to see if this batch is at the maximum allowed payload size.
* When publishing ordered messages, it is ok to exceed the user configured
Expand All @@ -97,6 +117,7 @@ export class MessageBatch {
const {maxMessages, maxBytes} = BATCH_LIMITS;
return this.messages.length >= maxMessages! || this.bytes >= maxBytes!;
}

/**
* Indicates if the batch is at capacity.
*
Expand Down
26 changes: 10 additions & 16 deletions src/publisher/message-queues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,10 @@ export abstract class MessageQueue extends EventEmitter {
});
}

messages.forEach(m => {
const span = tracing.PubsubSpans.createPublishRpcSpan(m, messages.length);
if (span) {
m.rpcSpan = span;
}
});
const rpcSpan = tracing.PubsubSpans.createPublishRpcSpan(
spanMessages,
topic.name
);

const requestCallback = topic.request<google.pubsub.v1.IPublishResponse>;
const request = promisify(requestCallback.bind(topic));
Expand All @@ -144,7 +142,7 @@ export abstract class MessageQueue extends EventEmitter {
messages.forEach(m => {
// We're finished with both the RPC and the whole publish operation,
// so close out all of the related spans.
m.rpcSpan?.end();
rpcSpan?.end();
m.parentSpan?.end();
});
}
Expand All @@ -163,7 +161,7 @@ export class Queue extends MessageQueue {
batch: MessageBatch;
constructor(publisher: Publisher) {
super(publisher);
this.batch = new MessageBatch(this.batchOptions);
this.batch = new MessageBatch(this.batchOptions, this.publisher.topic.name);
}

// This needs to update our existing message batch.
Expand All @@ -186,8 +184,6 @@ export class Queue extends MessageQueue {
this.publish().catch(() => {});
}

message.batchingSpan = tracing.PubsubSpans.createPublishBatchSpan(message);

this.batch.add(message, callback);

if (this.batch.isFull()) {
Expand Down Expand Up @@ -230,17 +226,15 @@ export class Queue extends MessageQueue {
* @emits Queue#drain when all messages are sent.
*/
async _publishInternal(fullyDrain: boolean): Promise<void> {
const {messages, callbacks} = this.batch;
const {messages, callbacks} = this.batch.end();

this.batch = new MessageBatch(this.batchOptions);
this.batch = new MessageBatch(this.batchOptions, this.publisher.topic.name);

if (this.pending) {
clearTimeout(this.pending);
delete this.pending;
}

messages.forEach(m => m.batchingSpan?.end());

await this._publish(messages, callbacks);
if (this.batch.messages.length) {
// We only do the indefinite go-arounds when we're trying to do a
Expand Down Expand Up @@ -358,7 +352,7 @@ export class OrderedQueue extends MessageQueue {
* @returns {MessageBatch}
*/
createBatch(): MessageBatch {
return new MessageBatch(this.batchOptions);
return new MessageBatch(this.batchOptions, this.publisher.topic.name);
}
/**
* In the event of a publish failure, we need to cache the error in question
Expand Down Expand Up @@ -401,7 +395,7 @@ export class OrderedQueue extends MessageQueue {
delete this.pending;
}

const {messages, callbacks} = this.batches.pop()!;
const {messages, callbacks} = this.batches.pop()!.end();

try {
await this._publish(messages, callbacks);
Expand Down
8 changes: 4 additions & 4 deletions src/publisher/pubsub-message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,18 @@ export interface PubsubMessage
// don't get to control what these objects are. They come from grpc.

/**
* If tracing is enabled, track the batch span.
* If tracing is enabled, track the message span.
*
* @private
*/
batchingSpan?: tracing.Span;
messageSpan?: tracing.Span;

/**
* If tracing is enabled, track the RPC send time span.
* If tracing is enabled, track the batching (publish scheduling) period.
*
* @private
*/
rpcSpan?: tracing.Span;
publishSchedulerSpan?: tracing.Span;
}

/**
Expand Down
Loading