From 08a2fdb1a13d637d368218431a029df8b9662bba Mon Sep 17 00:00:00 2001 From: Megan Potter Date: Tue, 24 Mar 2020 15:20:48 -0700 Subject: [PATCH 1/6] docs: fix a typo in a comment --- src/pubsub.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pubsub.ts b/src/pubsub.ts index cb792bd62..7105422e5 100644 --- a/src/pubsub.ts +++ b/src/pubsub.ts @@ -1137,7 +1137,7 @@ export class PubSub { /*! Developer Documentation * - * These methods can be agto-paginated. + * These methods can be auto-paginated. */ paginator.extend(PubSub, ['getSnapshots', 'getSubscriptions', 'getTopics']); From 3b0ca63a6b1d8a4bbf3ee4d03200aff1ca28b5b2 Mon Sep 17 00:00:00 2001 From: Megan Potter Date: Tue, 24 Mar 2020 15:21:08 -0700 Subject: [PATCH 2/6] feat: allow manually closing the server connections in the PubSub object --- src/pubsub.ts | 58 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/src/pubsub.ts b/src/pubsub.ts index 7105422e5..e7a316043 100644 --- a/src/pubsub.ts +++ b/src/pubsub.ts @@ -247,6 +247,7 @@ export class PubSub { getTopicsStream = paginator.streamify('getTopics') as () => ObjectStream< Topic >; + isOpen = true; constructor(options?: ClientConfig) { options = options || {}; @@ -281,6 +282,33 @@ export class PubSub { } } + close(): Promise; + close(callback: EmptyCallback): void; + /** + * Closes out this object, releasing any server connections. Note that once + * you close a PubSub object, it may not be used again. Any pending operations + * (e.g. queued publish messages) will fail. If you have topic or subscription + * objects that may have pending operations, you should call close() on those + * first if you want any pending messages to be delivered correctly. The + * PubSub class doesn't track those. + * + * @callback EmptyCallback + * @returns {Promise} + */ + close(callback?: EmptyCallback): Promise | void { + const definedCallback = callback || (() => {}); + if (this.isOpen) { + this.closeAllClients_() + .then(() => { + definedCallback(null); + }) + .catch(definedCallback); + this.isOpen = false; + } else { + definedCallback(null); + } + } + createSubscription( topic: Topic | string, name: string, @@ -941,6 +969,23 @@ export class PubSub { return gaxClient; } + /** + * Close all open client objects. + * + * @private + * + * @returns {Promise} + */ + async closeAllClients_(): Promise { + const promises = []; + for (const clientConfig of Object.keys(this.api)) { + const gaxClient = this.api[clientConfig]; + promises.push(gaxClient.close()); + delete this.api[clientConfig]; + } + + await Promise.all(promises); + } /** * Funnel all API requests through this method, to be sure we have a project * ID. @@ -954,6 +999,19 @@ export class PubSub { * @param {function} [callback] The callback function. */ request(config: RequestConfig, callback: RequestCallback) { + // This prevents further requests, in case any publishers were hanging around. + if (!this.isOpen) { + const statusObject = { + code: 0, + details: 'Cannot use a closed PubSub object.', + metadata: null, + }; + const err = new Error(statusObject.details); + Object.assign(err, statusObject); + callback(err as ServiceError); + return; + } + this.getClient_(config, (err, client) => { if (err) { callback(err as ServiceError); From fe007773d8ac6d896ae13e18ca1842e2e5638a6f Mon Sep 17 00:00:00 2001 From: Megan Potter Date: Tue, 24 Mar 2020 16:15:17 -0700 Subject: [PATCH 3/6] feat: add flush() method for Topic objects --- src/publisher/index.ts | 36 +++++++++++++++++++++++++++++---- src/publisher/message-queues.ts | 9 ++++++--- src/topic.ts | 16 +++++++++++++++ 3 files changed, 54 insertions(+), 7 deletions(-) diff --git a/src/publisher/index.ts b/src/publisher/index.ts index fee6811e1..3e3ed9b17 100644 --- a/src/publisher/index.ts +++ b/src/publisher/index.ts @@ -14,14 +14,14 @@ * limitations under the License. */ -import {promisifyAll} from '@google-cloud/promisify'; +import {promisify, promisifyAll} from '@google-cloud/promisify'; import * as extend from 'extend'; import {CallOptions} from 'google-gax'; -import {MessageBatch, BatchPublishOptions} from './message-batch'; -import {Queue, OrderedQueue} from './message-queues'; +import {BatchPublishOptions} from './message-batch'; +import {Queue, OrderedQueue, PublishDone} from './message-queues'; import {Topic} from '../topic'; -import {RequestCallback} from '../pubsub'; +import {RequestCallback, EmptyCallback, EmptyResponse} from '../pubsub'; import {google} from '../../proto/pubsub'; import {defaultOptions} from '../default-options'; @@ -72,6 +72,34 @@ export class Publisher { this.queue = new Queue(this); this.orderedQueues = new Map(); } + + flush(): Promise; + flush(callback: EmptyCallback): void; + /** + * Immediately sends all remaining queued data. This is mostly useful + * if you are planning to call close() on the PubSub object that holds + * the server connections. + * + * @private + * + * @param {EmptyCallback} [callback] Callback function. + * @returns {Promise} + */ + flush(callback?: EmptyCallback): Promise | void { + const definedCallback = callback ? callback : () => {}; + + const publishes = [promisify(this.queue.publish)()]; + Array.from(this.orderedQueues.values()).forEach(q => + publishes.push(promisify(q.publish)()) + ); + const allPublishes = Promise.all(publishes); + + allPublishes + .then(() => { + definedCallback(null); + }) + .catch(definedCallback); + } publish(data: Buffer, attributes?: Attributes): Promise; publish(data: Buffer, callback: PublishCallback): void; publish( diff --git a/src/publisher/message-queues.ts b/src/publisher/message-queues.ts index 0886d3624..2f357e207 100644 --- a/src/publisher/message-queues.ts +++ b/src/publisher/message-queues.ts @@ -131,7 +131,7 @@ export class Queue extends MessageQueue { /** * Cancels any pending publishes and calls _publish immediately. */ - publish(): void { + publish(callback?: PublishDone): void { const {messages, callbacks} = this.batch; this.batch = new MessageBatch(this.batchOptions); @@ -141,7 +141,7 @@ export class Queue extends MessageQueue { delete this.pending; } - this._publish(messages, callbacks); + this._publish(messages, callbacks, callback); } } @@ -259,7 +259,8 @@ export class OrderedQueue extends MessageQueue { * * @fires OrderedQueue#drain */ - publish(): void { + publish(callback?: PublishDone): void { + const definedCallback = callback || (() => {}); this.inFlight = true; if (this.pending) { @@ -274,10 +275,12 @@ export class OrderedQueue extends MessageQueue { if (err) { this.handlePublishFailure(err); + definedCallback(err); } else if (this.batches.length) { this.beginNextPublish(); } else { this.emit('drain'); + definedCallback(null); } }); } diff --git a/src/topic.ts b/src/topic.ts index 453b31c08..43f9278bd 100644 --- a/src/topic.ts +++ b/src/topic.ts @@ -181,6 +181,22 @@ export class Topic { this.iam = new IAM(pubsub, this.name); } + flush(): Promise; + flush(callback: EmptyCallback): void; + /** + * Immediately sends all remaining queued data. This is mostly useful + * if you are planning to call close() on the PubSub object that holds + * the server connections. + * + * @param {EmptyCallback} [callback] Callback function. + * @returns {Promise} + */ + flush(callback?: EmptyCallback): Promise | void { + // It doesn't matter here if callback is undefined; the Publisher + // flush() will handle it. + this.publisher.flush(callback!); + } + create(gaxOpts?: CallOptions): Promise; create(callback: CreateTopicCallback): void; create(gaxOpts: CallOptions, callback: CreateTopicCallback): void; From 89269a4d52d03fb09062234bfc927d11d9ed2371 Mon Sep 17 00:00:00 2001 From: Megan Potter Date: Tue, 24 Mar 2020 16:17:24 -0700 Subject: [PATCH 4/6] tests: add tests for new flush() and close() methods --- test/publisher/index.ts | 49 +++++++++++++++++++++++++++++++++++++++++ test/pubsub.ts | 49 ++++++++++++++++++++++++++++++++++++++++- 2 files changed, 97 insertions(+), 1 deletion(-) diff --git a/test/publisher/index.ts b/test/publisher/index.ts index dd90fe301..59d5ae86b 100644 --- a/test/publisher/index.ts +++ b/test/publisher/index.ts @@ -47,6 +47,7 @@ class FakeQueue extends EventEmitter { this.publisher = publisher; } add(message: p.PubsubMessage, callback: p.PublishCallback): void {} + publish(callback: (err: Error | null) => void) {} } class FakeOrderedQueue extends FakeQueue { @@ -57,6 +58,7 @@ class FakeOrderedQueue extends FakeQueue { this.orderingKey = key; } resumePublishing(): void {} + publish(callback: (err: Error | null) => void) {} } describe('Publisher', () => { @@ -239,6 +241,34 @@ describe('Publisher', () => { assert.strictEqual(publisher.orderedQueues.size, 0); }); + + it('should drain any ordered queues on flush', done => { + // We have to stub out the regular queue as well, so that the flush() operation finishes. + sandbox + .stub(FakeQueue.prototype, 'publish') + .callsFake((callback: (err: Error | null) => void) => { + callback(null); + }); + + sandbox + .stub(FakeOrderedQueue.prototype, 'publish') + .callsFake((callback: (err: Error | null) => void) => { + const queue = (publisher.orderedQueues.get( + orderingKey + ) as unknown) as FakeOrderedQueue; + queue.emit('drain'); + callback(null); + }); + + publisher.orderedQueues.clear(); + publisher.publishMessage(fakeMessage, spy); + + publisher.flush(err => { + assert.strictEqual(err, null); + assert.strictEqual(publisher.orderedQueues.size, 0); + done(); + }); + }); }); }); @@ -315,4 +345,23 @@ describe('Publisher', () => { assert.strictEqual(publisher.settings.batching!.maxMessages, 1000); }); }); + + describe('flush', () => { + // The ordered queue drain test is above with the ordered queue tests. + it('should drain the main publish queue', done => { + sandbox.stub(publisher.queue, 'publish').callsFake(cb => { + if (cb) { + cb(null); + } + }); + publisher.flush(err => { + assert.strictEqual(err, null); + assert.strictEqual( + !publisher.queue.batch || publisher.queue.batch.messages.length === 0, + true + ); + done(); + }); + }); + }); }); diff --git a/test/pubsub.ts b/test/pubsub.ts index fd1dd8f26..78bb05b81 100644 --- a/test/pubsub.ts +++ b/test/pubsub.ts @@ -153,6 +153,14 @@ describe('PubSub', () => { const PUBSUB_EMULATOR_HOST = process.env.PUBSUB_EMULATOR_HOST; + interface PostCloseCallback { + (err: Error | null): void; + } + + interface PostCloseTest { + (callback: PostCloseCallback): void; + } + before(() => { delete process.env.PUBSUB_EMULATOR_HOST; PubSub = proxyquire('../src/pubsub', { @@ -243,6 +251,15 @@ describe('PubSub', () => { assert.deepStrictEqual(pubsub.api, {}); }); + it('should default to the opened state', () => { + assert.strictEqual(pubsub.isOpen, true); + }); + + it('should not be in the opened state after close()', async () => { + await pubsub.close(); + assert.strictEqual(pubsub.isOpen, false); + }); + it('should cache a local google-auth-library instance', () => { const fakeGoogleAuthInstance = {}; const options = { @@ -1041,6 +1058,21 @@ describe('PubSub', () => { }; }); + it('should throw if the PubSub is already closed', done => { + pubsub.close((err: Error | null) => { + assert.strictEqual(err, null); + + pubsub.request(CONFIG, (errInner: Error | null) => { + assert.notStrictEqual(errInner, null); + assert.strictEqual( + errInner!.message.indexOf('closed PubSub object') >= 0, + true + ); + done(); + }); + }); + }); + it('should call getClient_ with the correct config', done => { pubsub.getClient_ = config => { assert.strictEqual(config, CONFIG); @@ -1088,7 +1120,9 @@ describe('PubSub', () => { }); describe('getClientAsync_', () => { - const FAKE_CLIENT_INSTANCE = class {}; + const FAKE_CLIENT_INSTANCE = class { + close() {} + }; const CONFIG = ({ client: 'FakeClient', } as {}) as pubsubTypes.GetClientConfig; @@ -1100,6 +1134,19 @@ describe('PubSub', () => { afterEach(() => sandbox.restore()); + describe('closeAllClients_', () => { + it('should close out any open client', async () => { + // Create a client that we'll close. + const client = await pubsub.getClientAsync_(CONFIG); + + // Stub out its close method, and verify it gets called. + const stub = sandbox.stub(client, 'close').resolves(); + await pubsub.closeAllClients_(); + + assert.strictEqual(stub.called, true); + }); + }); + describe('project ID', () => { beforeEach(() => { delete pubsub.projectId; From 54b6ab07c4a594dd43ba826d8b5495a3680f54b6 Mon Sep 17 00:00:00 2001 From: Megan Potter Date: Thu, 19 Mar 2020 15:52:22 -0700 Subject: [PATCH 5/6] build: update github checkout action to v2 to fix spurious retry errors --- .github/workflows/ci.yaml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 4d36c57b1..4c9de0964 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -11,7 +11,7 @@ jobs: matrix: node: [8, 10, 12, 13] steps: - - uses: actions/checkout@v1 + - uses: actions/checkout@v2 - uses: actions/setup-node@v1 with: node-version: ${{ matrix.node }} @@ -30,7 +30,7 @@ jobs: lint: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v1 + - uses: actions/checkout@v2 - uses: actions/setup-node@v1 with: node-version: 12 @@ -39,7 +39,7 @@ jobs: docs: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v1 + - uses: actions/checkout@v2 - uses: actions/setup-node@v1 with: node-version: 12 @@ -48,7 +48,7 @@ jobs: coverage: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v1 + - uses: actions/checkout@v2 - uses: actions/setup-node@v1 with: node-version: 12 From 735c3cc0088e1f88d48cebb7fe1f51e6e3b609e3 Mon Sep 17 00:00:00 2001 From: Megan Potter Date: Wed, 25 Mar 2020 16:19:08 -0700 Subject: [PATCH 6/6] fix: set isOpen to false before trying to close it so that all usage will stop --- src/pubsub.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pubsub.ts b/src/pubsub.ts index e7a316043..4b81e2a84 100644 --- a/src/pubsub.ts +++ b/src/pubsub.ts @@ -298,12 +298,12 @@ export class PubSub { close(callback?: EmptyCallback): Promise | void { const definedCallback = callback || (() => {}); if (this.isOpen) { + this.isOpen = false; this.closeAllClients_() .then(() => { definedCallback(null); }) .catch(definedCallback); - this.isOpen = false; } else { definedCallback(null); }