Skip to content

Commit

Permalink
chore: pull promise-based publish out of OTel change (#1723)
Browse files Browse the repository at this point in the history
This shifts the publisher to use Promise-based logic instead of callbacks. This was pulled out of the OpenTelemetry change to make that one simpler.
  • Loading branch information
feywind authored May 12, 2023
1 parent f61a64a commit 6ea41f4
Show file tree
Hide file tree
Showing 4 changed files with 171 additions and 178 deletions.
8 changes: 3 additions & 5 deletions src/publisher/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
* limitations under the License.
*/

import {promisify} from '@google-cloud/promisify';
import * as extend from 'extend';
import {CallOptions} from 'google-gax';
import {SemanticAttributes} from '@opentelemetry/semantic-conventions';
Expand Down Expand Up @@ -123,14 +122,12 @@ export class Publisher {
// event listeners after we've completed flush().
q.removeListener('drain', flushResolver);
};
return q.on('drain', flushResolver);
q.on('drain', flushResolver);
})
)
);

const allPublishes = Promise.all(
toDrain.map(q => promisify(q.publishDrain).bind(q)())
);
const allPublishes = Promise.all(toDrain.map(q => q.publishDrain()));

allPublishes
.then(() => allDrains)
Expand All @@ -139,6 +136,7 @@ export class Publisher {
})
.catch(definedCallback);
}

/**
* Publish the provided message.
*
Expand Down
142 changes: 76 additions & 66 deletions src/publisher/message-queues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ import {PublishError} from './publish-error';
import {Publisher, PubsubMessage, PublishCallback} from './';
import {google} from '../../protos/protos';

export interface PublishDone {
(err: ServiceError | null): void;
}
import {promisify} from 'util';

/**
* Queues are used to manage publishing batches of messages.
Expand Down Expand Up @@ -73,57 +71,55 @@ export abstract class MessageQueue extends EventEmitter {
*
* @abstract
*/
abstract publish(): void;
abstract publish(): Promise<void>;

/**
* Method to finalize publishing. Does as many publishes as are needed
* to finish emptying the queues, and fires a drain event afterward.
*
* @abstract
*/
abstract publishDrain(): void;
abstract publishDrain(): Promise<void>;

/**
* Accepts a batch of messages and publishes them to the API.
*
* @param {object[]} messages The messages to publish.
* @param {PublishCallback[]} callbacks The corresponding callback functions.
* @param {function} [callback] Callback to be fired when publish is done.
*/
_publish(
async _publish(
messages: PubsubMessage[],
callbacks: PublishCallback[],
callback?: PublishDone
): void {
callbacks: PublishCallback[]
): Promise<void> {
const {topic, settings} = this.publisher;
const reqOpts = {
topic: topic.name,
messages,
};
if (messages.length === 0) {
if (typeof callback === 'function') {
// Do this on the next tick to avoid Zalgo with the publish request below.
process.nextTick(() => callback(null));
}
return;
}

topic.request<google.pubsub.v1.IPublishResponse>(
{
const requestCallback = topic.request<google.pubsub.v1.IPublishResponse>;
const request = promisify(requestCallback.bind(topic));
try {
const resp = await request({
client: 'PublisherClient',
method: 'publish',
reqOpts,
gaxOpts: settings.gaxOpts!,
},
(err, resp) => {
const messageIds = (resp && resp.messageIds) || [];
callbacks.forEach((callback, i) => callback(err, messageIds[i]));

if (typeof callback === 'function') {
callback(err);
}
});

if (resp) {
const messageIds = resp.messageIds || [];
callbacks.forEach((callback, i) => callback(null, messageIds[i]));
}
);
} catch (e) {
const err = e as ServiceError;
callbacks.forEach(callback => callback(err));

throw e;
}
}
}

Expand Down Expand Up @@ -156,16 +152,25 @@ export class Queue extends MessageQueue {
*/
add(message: PubsubMessage, callback: PublishCallback): void {
if (!this.batch.canFit(message)) {
this.publish();
// Make a background best-effort attempt to clear out the
// queue. If this fails, we'll basically just be overloaded
// for a bit.
this.publish().catch(() => {});
}

this.batch.add(message, callback);

if (this.batch.isFull()) {
this.publish();
// See comment above - best effort.
this.publish().catch(() => {});
} else if (!this.pending) {
const {maxMilliseconds} = this.batchOptions;
this.pending = setTimeout(() => this.publish(), maxMilliseconds!);
this.pending = setTimeout(() => {
// See comment above - we are basically making a best effort
// to start clearing out the queue if nothing else happens
// before the batch timeout.
this.publish().catch(() => {});
}, maxMilliseconds!);
}
}

Expand All @@ -176,26 +181,25 @@ export class Queue extends MessageQueue {
*
* @emits Queue#drain when all messages are sent.
*/
publishDrain(callback?: PublishDone): void {
this._publishInternal(true, callback);
async publishDrain(): Promise<void> {
await this._publishInternal(true);
}

/**
* Cancels any pending publishes and calls _publish immediately.
*
* Does _not_ attempt to further drain after one batch is sent.
*/
publish(callback?: PublishDone): void {
this._publishInternal(false, callback);
async publish(): Promise<void> {
await this._publishInternal(false);
}

/**
* Cancels any pending publishes and calls _publish immediately.
*
* @emits Queue#drain when all messages are sent.
*/
_publishInternal(fullyDrain: boolean, callback?: PublishDone): void {
const definedCallback = callback || (() => {});
async _publishInternal(fullyDrain: boolean): Promise<void> {
const {messages, callbacks} = this.batch;

this.batch = new MessageBatch(this.batchOptions);
Expand All @@ -205,21 +209,17 @@ export class Queue extends MessageQueue {
delete this.pending;
}

this._publish(messages, callbacks, (err: null | ServiceError) => {
if (err) {
definedCallback(err);
} else if (this.batch.messages.length) {
// We only do the indefinite go-arounds when we're trying to do a
// final drain for flush(). In all other cases, we want to leave
// subsequent batches alone so that they can time out as needed.
if (fullyDrain) {
this._publishInternal(true, callback);
}
} else {
this.emit('drain');
definedCallback(null);
await this._publish(messages, callbacks);
if (this.batch.messages.length) {
// We only do the indefinite go-arounds when we're trying to do a
// final drain for flush(). In all other cases, we want to leave
// subsequent batches alone so that they can time out as needed.
if (fullyDrain) {
await this._publishInternal(true);
}
});
} else {
this.emit('drain');
}
}
}

Expand Down Expand Up @@ -286,7 +286,10 @@ export class OrderedQueue extends MessageQueue {
}

if (!this.currentBatch.canFit(message)) {
this.publish();
// Make a best-effort attempt to clear out the publish queue,
// to make more space for the new batch. If this fails, we'll
// just be overfilled for a bit.
this.publish().catch(() => {});
}

this.currentBatch.add(message, callback);
Expand All @@ -295,7 +298,8 @@ export class OrderedQueue extends MessageQueue {
// check again here
if (!this.inFlight) {
if (this.currentBatch.isFull()) {
this.publish();
// See comment above - best-effort.
this.publish().catch(() => {});
} else if (!this.pending) {
this.beginNextPublish();
}
Expand All @@ -309,7 +313,12 @@ export class OrderedQueue extends MessageQueue {
const timeWaiting = Date.now() - this.currentBatch.created;
const delay = Math.max(0, maxMilliseconds - timeWaiting);

this.pending = setTimeout(() => this.publish(), delay);
this.pending = setTimeout(() => {
// Make a best-effort attempt to start a publish request. If
// this fails, we'll catch it again later, eventually, when more
// messages try to enter the queue.
this.publish().catch(() => {});
}, delay);
}
/**
* Creates a new {@link MessageBatch} instance.
Expand Down Expand Up @@ -344,8 +353,7 @@ export class OrderedQueue extends MessageQueue {
*
* @fires OrderedQueue#drain
*/
publish(callback?: PublishDone): void {
const definedCallback = callback || (() => {});
async publish(): Promise<void> {
this.inFlight = true;

if (this.pending) {
Expand All @@ -355,28 +363,30 @@ export class OrderedQueue extends MessageQueue {

const {messages, callbacks} = this.batches.pop()!;

this._publish(messages, callbacks, (err: null | ServiceError) => {
try {
await this._publish(messages, callbacks);
} catch (e) {
const err = e as ServiceError;
this.inFlight = false;
this.handlePublishFailure(err);
} finally {
this.inFlight = false;
}

if (err) {
this.handlePublishFailure(err);
definedCallback(err);
} else if (this.batches.length) {
this.beginNextPublish();
} else {
this.emit('drain');
definedCallback(null);
}
});
if (this.batches.length) {
this.beginNextPublish();
} else {
this.emit('drain');
}
}

/**
* For ordered queues, this does exactly the same thing as `publish()`.
*
* @fires OrderedQueue#drain
*/
publishDrain(callback?: PublishDone): void {
this.publish(callback);
async publishDrain(): Promise<void> {
await this.publish();
}

/**
Expand Down
Loading

0 comments on commit 6ea41f4

Please sign in to comment.