Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: update to OTel PR for latest design #4

Merged
merged 17 commits into from
Jan 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 9 additions & 11 deletions src/lease-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,16 +123,14 @@ export class LeaseManager extends EventEmitter {
}
}
/**
* Removes ALL messages from inventory.
* Removes ALL messages from inventory, and returns the ones removed.
* @private
*/
clear(): void {
clear(): Message[] {
const wasFull = this.isFull();

this._pending = [];
this._messages.forEach(m => {
m.endParentSpan();
});
const remaining = Array.from(this._messages);
this._messages.clear();
this.bytes = 0;

Expand All @@ -141,6 +139,8 @@ export class LeaseManager extends EventEmitter {
}

this._cancelExtension();

return remaining;
}
/**
* Indicates if we're at or over capacity.
Expand All @@ -162,9 +162,6 @@ export class LeaseManager extends EventEmitter {
* @private
*/
remove(message: Message): void {
// The subscriber span ends when it leaves leasing.
message.endParentSpan();

if (!this._messages.has(message)) {
return;
}
Expand Down Expand Up @@ -269,7 +266,8 @@ export class LeaseManager extends EventEmitter {
const lifespan = (Date.now() - message.received) / (60 * 1000);

if (lifespan < this._options.maxExtensionMinutes!) {
message.subSpans.modAckStart(Duration.from({seconds: deadline}), false);
const deadlineDuration = Duration.from({seconds: deadline});
message.subSpans.modAckStart(deadlineDuration, false);

if (this._subscriber.isExactlyOnceDelivery) {
message
Expand All @@ -281,11 +279,11 @@ export class LeaseManager extends EventEmitter {
this.remove(message);
})
.finally(() => {
message.subSpans.modAckStop();
message.subSpans.modAckEnd();
});
} else {
message.modAck(deadline);
message.subSpans.modAckStop();
message.subSpans.modAckStart(deadlineDuration, false);
}
} else {
this.remove(message);
Expand Down
40 changes: 30 additions & 10 deletions src/message-queues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,19 @@ import {
import {Duration} from './temporal';
import {addToBucket} from './util';
import {DebugMessage} from './debug';
import * as tracing from './telemetry-tracing';

export interface ReducedMessage {
ackId: string;
tracingSpan?: tracing.Span;
}

/**
* @private
*/
export interface QueuedMessage {
ackId: string;
deadline?: number;
message: ReducedMessage;
deadline?: number; // seconds
responsePromise?: defer.DeferredPromise<void>;
retryCount: number;
}
Expand Down Expand Up @@ -176,10 +182,10 @@ export abstract class MessageQueue {
* Adds a message to the queue.
*
* @param {Message} message The message to add.
* @param {number} [deadline] The deadline.
* @param {number} [deadline] The deadline in seconds.
* @private
*/
add({ackId}: Message, deadline?: number): Promise<void> {
add(message: Message, deadline?: number): Promise<void> {
if (this._closed) {
if (this._subscriber.isExactlyOnceDelivery) {
throw new AckError(AckResponses.Invalid, 'Subscriber closed');
Expand All @@ -192,7 +198,10 @@ export abstract class MessageQueue {

const responsePromise = defer<void>();
this._requests.push({
ackId,
message: {
ackId: message.ackId,
tracingSpan: message.parentSpan,
},
deadline,
responsePromise,
retryCount: 0,
Expand Down Expand Up @@ -379,9 +388,9 @@ export abstract class MessageQueue {
const codes: AckErrorCodes = processAckErrorInfo(rpcError);

for (const m of batch) {
if (codes.has(m.ackId)) {
if (codes.has(m.message.ackId)) {
// This ack has an ErrorInfo entry, so use that to route it.
const code = codes.get(m.ackId)!;
const code = codes.get(m.message.ackId)!;
if (code.transient) {
// Transient errors get retried.
toRetry.push(m);
Expand All @@ -407,7 +416,7 @@ export abstract class MessageQueue {
// stream message if an unknown error happens during ack.
const others = toError.get(AckResponses.Other);
if (others?.length) {
const otherIds = others.map(e => e.ackId);
const otherIds = others.map(e => e.message.ackId);
const debugMsg = new BatchError(rpcError, otherIds, operation);
this._subscriber.emit('debug', debugMsg);
}
Expand Down Expand Up @@ -468,15 +477,20 @@ export class AckQueue extends MessageQueue {
* @return {Promise}
*/
protected async _sendBatch(batch: QueuedMessages): Promise<QueuedMessages> {
const responseSpan = tracing.PubsubSpans.createReceiveResponseRpcSpan(
batch.map(b => b.message.tracingSpan),
this._subscriber.name
);
const client = await this._subscriber.getClient();
const ackIds = batch.map(({ackId}) => ackId);
const ackIds = batch.map(({message}) => message.ackId);
const reqOpts = {subscription: this._subscriber.name, ackIds};

try {
await client.acknowledge(reqOpts, this.getCallOptions());

// It's okay if these pass through since they're successful anyway.
this.handleAckSuccesses(batch);
responseSpan?.end();
return [];
} catch (e) {
// If exactly-once delivery isn't enabled, don't do error processing. We'll
Expand All @@ -500,6 +514,7 @@ export class AckQueue extends MessageQueue {
batch.forEach(m => {
m.responsePromise?.reject(exc);
});
responseSpan?.end();
return [];
}
}
Expand All @@ -524,6 +539,10 @@ export class ModAckQueue extends MessageQueue {
* @return {Promise}
*/
protected async _sendBatch(batch: QueuedMessages): Promise<QueuedMessages> {
const responseSpan = tracing.PubsubSpans.createReceiveResponseRpcSpan(
batch.map(b => b.message.tracingSpan),
this._subscriber.name
);
const client = await this._subscriber.getClient();
const subscription = this._subscriber.name;
const modAckTable: {[index: string]: QueuedMessages} = batch.reduce(
Expand All @@ -541,7 +560,7 @@ export class ModAckQueue extends MessageQueue {
const callOptions = this.getCallOptions();
const modAckRequests = Object.keys(modAckTable).map(async deadline => {
const messages = modAckTable[deadline];
const ackIds = messages.map(m => m.ackId);
const ackIds = messages.map(m => m.message.ackId);
const ackDeadlineSeconds = Number(deadline);
const reqOpts = {subscription, ackIds, ackDeadlineSeconds};

Expand Down Expand Up @@ -575,6 +594,7 @@ export class ModAckQueue extends MessageQueue {

// This catches the sub-failures and bubbles up anything we need to bubble.
const allNewBatches: QueuedMessages[] = await Promise.all(modAckRequests);
responseSpan?.end();
return allNewBatches.reduce((p: QueuedMessage[], c: QueuedMessage[]) => [
...(p ?? []),
...c,
Expand Down
27 changes: 24 additions & 3 deletions src/publisher/message-batch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,19 @@

import {BATCH_LIMITS, PubsubMessage, PublishCallback} from './';
import {calculateMessageSize} from './pubsub-message';
import * as tracing from '../telemetry-tracing';

export interface BatchPublishOptions {
maxBytes?: number;
maxMessages?: number;
maxMilliseconds?: number;
}

export interface BatchResults {
messages: PubsubMessage[];
callbacks: PublishCallback[];
}

/**
* @typedef BatchPublishOptions
* @property {number} [maxBytes=1 * 1024 * 1024] The maximum number of bytes to
Expand All @@ -40,13 +46,15 @@ export interface BatchPublishOptions {
* @param {BatchPublishOptions} options The batching options.
*/
export class MessageBatch {
options: BatchPublishOptions;
messages: PubsubMessage[];
callbacks: PublishCallback[];
created: number;
bytes: number;
constructor(options: BatchPublishOptions) {
this.options = options;

constructor(
public options: BatchPublishOptions,
public topicName: string
) {
this.messages = [];
this.callbacks = [];
this.created = Date.now();
Expand All @@ -72,7 +80,18 @@ export class MessageBatch {
this.messages.push(message);
this.callbacks.push(callback);
this.bytes += calculateMessageSize(message);

tracing.PubsubSpans.createPublishSchedulerSpan(message);
}

end(): BatchResults {
this.messages.forEach(m => m.publishSchedulerSpan?.end());
return {
messages: this.messages,
callbacks: this.callbacks,
};
}

/**
* Indicates if a given message can fit in the batch.
*
Expand All @@ -86,6 +105,7 @@ export class MessageBatch {
this.bytes + calculateMessageSize(message) <= maxBytes!
);
}

/**
* Checks to see if this batch is at the maximum allowed payload size.
* When publishing ordered messages, it is ok to exceed the user configured
Expand All @@ -97,6 +117,7 @@ export class MessageBatch {
const {maxMessages, maxBytes} = BATCH_LIMITS;
return this.messages.length >= maxMessages! || this.bytes >= maxBytes!;
}

/**
* Indicates if the batch is at capacity.
*
Expand Down
26 changes: 10 additions & 16 deletions src/publisher/message-queues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,10 @@ export abstract class MessageQueue extends EventEmitter {
});
}

messages.forEach(m => {
const span = tracing.PubsubSpans.createPublishRpcSpan(m, messages.length);
if (span) {
m.rpcSpan = span;
}
});
const rpcSpan = tracing.PubsubSpans.createPublishRpcSpan(
spanMessages,
topic.name
);

const requestCallback = topic.request<google.pubsub.v1.IPublishResponse>;
const request = promisify(requestCallback.bind(topic));
Expand All @@ -144,7 +142,7 @@ export abstract class MessageQueue extends EventEmitter {
messages.forEach(m => {
// We're finished with both the RPC and the whole publish operation,
// so close out all of the related spans.
m.rpcSpan?.end();
rpcSpan?.end();
m.parentSpan?.end();
});
}
Expand All @@ -163,7 +161,7 @@ export class Queue extends MessageQueue {
batch: MessageBatch;
constructor(publisher: Publisher) {
super(publisher);
this.batch = new MessageBatch(this.batchOptions);
this.batch = new MessageBatch(this.batchOptions, this.publisher.topic.name);
}

// This needs to update our existing message batch.
Expand All @@ -186,8 +184,6 @@ export class Queue extends MessageQueue {
this.publish().catch(() => {});
}

message.batchingSpan = tracing.PubsubSpans.createPublishBatchSpan(message);

this.batch.add(message, callback);

if (this.batch.isFull()) {
Expand Down Expand Up @@ -230,17 +226,15 @@ export class Queue extends MessageQueue {
* @emits Queue#drain when all messages are sent.
*/
async _publishInternal(fullyDrain: boolean): Promise<void> {
const {messages, callbacks} = this.batch;
const {messages, callbacks} = this.batch.end();

this.batch = new MessageBatch(this.batchOptions);
this.batch = new MessageBatch(this.batchOptions, this.publisher.topic.name);

if (this.pending) {
clearTimeout(this.pending);
delete this.pending;
}

messages.forEach(m => m.batchingSpan?.end());

await this._publish(messages, callbacks);
if (this.batch.messages.length) {
// We only do the indefinite go-arounds when we're trying to do a
Expand Down Expand Up @@ -358,7 +352,7 @@ export class OrderedQueue extends MessageQueue {
* @returns {MessageBatch}
*/
createBatch(): MessageBatch {
return new MessageBatch(this.batchOptions);
return new MessageBatch(this.batchOptions, this.publisher.topic.name);
}
/**
* In the event of a publish failure, we need to cache the error in question
Expand Down Expand Up @@ -401,7 +395,7 @@ export class OrderedQueue extends MessageQueue {
delete this.pending;
}

const {messages, callbacks} = this.batches.pop()!;
const {messages, callbacks} = this.batches.pop()!.end();

try {
await this._publish(messages, callbacks);
Expand Down
8 changes: 4 additions & 4 deletions src/publisher/pubsub-message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,18 @@ export interface PubsubMessage
// don't get to control what these objects are. They come from grpc.

/**
* If tracing is enabled, track the batch span.
* If tracing is enabled, track the message span.
*
* @private
*/
batchingSpan?: tracing.Span;
messageSpan?: tracing.Span;

/**
* If tracing is enabled, track the RPC send time span.
* If tracing is enabled, track the batching (publish scheduling) period.
*
* @private
*/
rpcSpan?: tracing.Span;
publishSchedulerSpan?: tracing.Span;
}

/**
Expand Down
Loading