Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add a close() method to PubSub, and a flush() method to Topic/Publisher #916

Merged
merged 6 commits into from
Mar 26, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:
matrix:
node: [8, 10, 12, 13]
steps:
- uses: actions/checkout@v1
- uses: actions/checkout@v2
feywind marked this conversation as resolved.
Show resolved Hide resolved
- uses: actions/setup-node@v1
with:
node-version: ${{ matrix.node }}
Expand All @@ -30,7 +30,7 @@ jobs:
lint:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v1
- uses: actions/checkout@v2
- uses: actions/setup-node@v1
with:
node-version: 12
Expand All @@ -39,7 +39,7 @@ jobs:
docs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v1
- uses: actions/checkout@v2
- uses: actions/setup-node@v1
with:
node-version: 12
Expand All @@ -48,7 +48,7 @@ jobs:
coverage:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v1
- uses: actions/checkout@v2
- uses: actions/setup-node@v1
with:
node-version: 12
Expand Down
36 changes: 32 additions & 4 deletions src/publisher/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@
* limitations under the License.
*/

import {promisifyAll} from '@google-cloud/promisify';
import {promisify, promisifyAll} from '@google-cloud/promisify';
import * as extend from 'extend';
import {CallOptions} from 'google-gax';

import {MessageBatch, BatchPublishOptions} from './message-batch';
import {Queue, OrderedQueue} from './message-queues';
import {BatchPublishOptions} from './message-batch';
import {Queue, OrderedQueue, PublishDone} from './message-queues';
import {Topic} from '../topic';
import {RequestCallback} from '../pubsub';
import {RequestCallback, EmptyCallback, EmptyResponse} from '../pubsub';
import {google} from '../../proto/pubsub';
import {defaultOptions} from '../default-options';

Expand Down Expand Up @@ -72,6 +72,34 @@ export class Publisher {
this.queue = new Queue(this);
this.orderedQueues = new Map();
}

flush(): Promise<void>;
flush(callback: EmptyCallback): void;
/**
* Immediately sends all remaining queued data. This is mostly useful
* if you are planning to call close() on the PubSub object that holds
* the server connections.
*
* @private
*
* @param {EmptyCallback} [callback] Callback function.
* @returns {Promise<EmptyResponse>}
*/
flush(callback?: EmptyCallback): Promise<void> | void {
const definedCallback = callback ? callback : () => {};

const publishes = [promisify(this.queue.publish)()];
Array.from(this.orderedQueues.values()).forEach(q =>
publishes.push(promisify(q.publish)())
);
const allPublishes = Promise.all(publishes);

allPublishes
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any reason not to use async/await here? I'm at the point where I pretty much always favor it these days.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I about 350% agree... unfortunately the current code is mostly callback-based, and a few event-based bits, so pulling the async/await far up the stack was looking difficult. I'd really like to go back and do a pass of just converting everything to straightforward async/await/Promises, but I don't want to do that as part of this.

.then(() => {
definedCallback(null);
})
.catch(definedCallback);
}
publish(data: Buffer, attributes?: Attributes): Promise<string>;
publish(data: Buffer, callback: PublishCallback): void;
publish(
Expand Down
9 changes: 6 additions & 3 deletions src/publisher/message-queues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ export class Queue extends MessageQueue {
/**
* Cancels any pending publishes and calls _publish immediately.
*/
publish(): void {
publish(callback?: PublishDone): void {
const {messages, callbacks} = this.batch;

this.batch = new MessageBatch(this.batchOptions);
Expand All @@ -141,7 +141,7 @@ export class Queue extends MessageQueue {
delete this.pending;
}

this._publish(messages, callbacks);
this._publish(messages, callbacks, callback);
}
}

Expand Down Expand Up @@ -259,7 +259,8 @@ export class OrderedQueue extends MessageQueue {
*
* @fires OrderedQueue#drain
*/
publish(): void {
publish(callback?: PublishDone): void {
const definedCallback = callback || (() => {});
this.inFlight = true;

if (this.pending) {
Expand All @@ -274,10 +275,12 @@ export class OrderedQueue extends MessageQueue {

if (err) {
this.handlePublishFailure(err);
definedCallback(err);
} else if (this.batches.length) {
this.beginNextPublish();
} else {
this.emit('drain');
definedCallback(null);
}
});
}
Expand Down
60 changes: 59 additions & 1 deletion src/pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ export class PubSub {
getTopicsStream = paginator.streamify('getTopics') as () => ObjectStream<
Topic
>;
isOpen = true;

constructor(options?: ClientConfig) {
options = options || {};
Expand Down Expand Up @@ -281,6 +282,33 @@ export class PubSub {
}
}

close(): Promise<void>;
close(callback: EmptyCallback): void;
/**
* Closes out this object, releasing any server connections. Note that once
* you close a PubSub object, it may not be used again. Any pending operations
* (e.g. queued publish messages) will fail. If you have topic or subscription
* objects that may have pending operations, you should call close() on those
* first if you want any pending messages to be delivered correctly. The
* PubSub class doesn't track those.
*
* @callback EmptyCallback
* @returns {Promise<void>}
*/
close(callback?: EmptyCallback): Promise<void> | void {
const definedCallback = callback || (() => {});
if (this.isOpen) {
this.closeAllClients_()
.then(() => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same comment, as above. I can understand the argument for keeping the codebase internally consistent, if that's the thinking.

definedCallback(null);
})
.catch(definedCallback);
this.isOpen = false;
feywind marked this conversation as resolved.
Show resolved Hide resolved
} else {
definedCallback(null);
}
}

createSubscription(
topic: Topic | string,
name: string,
Expand Down Expand Up @@ -941,6 +969,23 @@ export class PubSub {

return gaxClient;
}
/**
* Close all open client objects.
*
* @private
*
* @returns {Promise}
*/
async closeAllClients_(): Promise<void> {
const promises = [];
for (const clientConfig of Object.keys(this.api)) {
const gaxClient = this.api[clientConfig];
promises.push(gaxClient.close());
delete this.api[clientConfig];
}

await Promise.all(promises);
}
/**
* Funnel all API requests through this method, to be sure we have a project
* ID.
Expand All @@ -954,6 +999,19 @@ export class PubSub {
* @param {function} [callback] The callback function.
*/
request<T, R = void>(config: RequestConfig, callback: RequestCallback<T, R>) {
// This prevents further requests, in case any publishers were hanging around.
if (!this.isOpen) {
const statusObject = {
code: 0,
details: 'Cannot use a closed PubSub object.',
metadata: null,
};
const err = new Error(statusObject.details);
Object.assign(err, statusObject);
callback(err as ServiceError);
return;
}

this.getClient_(config, (err, client) => {
if (err) {
callback(err as ServiceError);
Expand Down Expand Up @@ -1137,7 +1195,7 @@ export class PubSub {

/*! Developer Documentation
*
* These methods can be agto-paginated.
* These methods can be auto-paginated.
*/
paginator.extend(PubSub, ['getSnapshots', 'getSubscriptions', 'getTopics']);

Expand Down
16 changes: 16 additions & 0 deletions src/topic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,22 @@ export class Topic {
this.iam = new IAM(pubsub, this.name);
}

flush(): Promise<void>;
flush(callback: EmptyCallback): void;
/**
* Immediately sends all remaining queued data. This is mostly useful
* if you are planning to call close() on the PubSub object that holds
* the server connections.
*
* @param {EmptyCallback} [callback] Callback function.
* @returns {Promise<EmptyResponse>}
*/
flush(callback?: EmptyCallback): Promise<void> | void {
// It doesn't matter here if callback is undefined; the Publisher
// flush() will handle it.
this.publisher.flush(callback!);
}

create(gaxOpts?: CallOptions): Promise<CreateTopicResponse>;
create(callback: CreateTopicCallback): void;
create(gaxOpts: CallOptions, callback: CreateTopicCallback): void;
Expand Down
49 changes: 49 additions & 0 deletions test/publisher/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class FakeQueue extends EventEmitter {
this.publisher = publisher;
}
add(message: p.PubsubMessage, callback: p.PublishCallback): void {}
publish(callback: (err: Error | null) => void) {}
}

class FakeOrderedQueue extends FakeQueue {
Expand All @@ -57,6 +58,7 @@ class FakeOrderedQueue extends FakeQueue {
this.orderingKey = key;
}
resumePublishing(): void {}
publish(callback: (err: Error | null) => void) {}
}

describe('Publisher', () => {
Expand Down Expand Up @@ -239,6 +241,34 @@ describe('Publisher', () => {

assert.strictEqual(publisher.orderedQueues.size, 0);
});

it('should drain any ordered queues on flush', done => {
// We have to stub out the regular queue as well, so that the flush() operation finishes.
sandbox
.stub(FakeQueue.prototype, 'publish')
.callsFake((callback: (err: Error | null) => void) => {
callback(null);
});

sandbox
.stub(FakeOrderedQueue.prototype, 'publish')
.callsFake((callback: (err: Error | null) => void) => {
const queue = (publisher.orderedQueues.get(
orderingKey
) as unknown) as FakeOrderedQueue;
queue.emit('drain');
callback(null);
});

publisher.orderedQueues.clear();
publisher.publishMessage(fakeMessage, spy);

publisher.flush(err => {
assert.strictEqual(err, null);
assert.strictEqual(publisher.orderedQueues.size, 0);
done();
});
});
});
});

Expand Down Expand Up @@ -315,4 +345,23 @@ describe('Publisher', () => {
assert.strictEqual(publisher.settings.batching!.maxMessages, 1000);
});
});

describe('flush', () => {
// The ordered queue drain test is above with the ordered queue tests.
it('should drain the main publish queue', done => {
sandbox.stub(publisher.queue, 'publish').callsFake(cb => {
if (cb) {
cb(null);
}
});
publisher.flush(err => {
assert.strictEqual(err, null);
assert.strictEqual(
!publisher.queue.batch || publisher.queue.batch.messages.length === 0,
true
);
done();
});
});
});
});
Loading