diff --git a/src/publisher/index.ts b/src/publisher/index.ts index fee6811e1..3e3ed9b17 100644 --- a/src/publisher/index.ts +++ b/src/publisher/index.ts @@ -14,14 +14,14 @@ * limitations under the License. */ -import {promisifyAll} from '@google-cloud/promisify'; +import {promisify, promisifyAll} from '@google-cloud/promisify'; import * as extend from 'extend'; import {CallOptions} from 'google-gax'; -import {MessageBatch, BatchPublishOptions} from './message-batch'; -import {Queue, OrderedQueue} from './message-queues'; +import {BatchPublishOptions} from './message-batch'; +import {Queue, OrderedQueue, PublishDone} from './message-queues'; import {Topic} from '../topic'; -import {RequestCallback} from '../pubsub'; +import {RequestCallback, EmptyCallback, EmptyResponse} from '../pubsub'; import {google} from '../../proto/pubsub'; import {defaultOptions} from '../default-options'; @@ -72,6 +72,34 @@ export class Publisher { this.queue = new Queue(this); this.orderedQueues = new Map(); } + + flush(): Promise; + flush(callback: EmptyCallback): void; + /** + * Immediately sends all remaining queued data. This is mostly useful + * if you are planning to call close() on the PubSub object that holds + * the server connections. + * + * @private + * + * @param {EmptyCallback} [callback] Callback function. + * @returns {Promise} + */ + flush(callback?: EmptyCallback): Promise | void { + const definedCallback = callback ? callback : () => {}; + + const publishes = [promisify(this.queue.publish)()]; + Array.from(this.orderedQueues.values()).forEach(q => + publishes.push(promisify(q.publish)()) + ); + const allPublishes = Promise.all(publishes); + + allPublishes + .then(() => { + definedCallback(null); + }) + .catch(definedCallback); + } publish(data: Buffer, attributes?: Attributes): Promise; publish(data: Buffer, callback: PublishCallback): void; publish( diff --git a/src/publisher/message-queues.ts b/src/publisher/message-queues.ts index 0886d3624..2f357e207 100644 --- a/src/publisher/message-queues.ts +++ b/src/publisher/message-queues.ts @@ -131,7 +131,7 @@ export class Queue extends MessageQueue { /** * Cancels any pending publishes and calls _publish immediately. */ - publish(): void { + publish(callback?: PublishDone): void { const {messages, callbacks} = this.batch; this.batch = new MessageBatch(this.batchOptions); @@ -141,7 +141,7 @@ export class Queue extends MessageQueue { delete this.pending; } - this._publish(messages, callbacks); + this._publish(messages, callbacks, callback); } } @@ -259,7 +259,8 @@ export class OrderedQueue extends MessageQueue { * * @fires OrderedQueue#drain */ - publish(): void { + publish(callback?: PublishDone): void { + const definedCallback = callback || (() => {}); this.inFlight = true; if (this.pending) { @@ -274,10 +275,12 @@ export class OrderedQueue extends MessageQueue { if (err) { this.handlePublishFailure(err); + definedCallback(err); } else if (this.batches.length) { this.beginNextPublish(); } else { this.emit('drain'); + definedCallback(null); } }); } diff --git a/src/pubsub.ts b/src/pubsub.ts index cb792bd62..4b81e2a84 100644 --- a/src/pubsub.ts +++ b/src/pubsub.ts @@ -247,6 +247,7 @@ export class PubSub { getTopicsStream = paginator.streamify('getTopics') as () => ObjectStream< Topic >; + isOpen = true; constructor(options?: ClientConfig) { options = options || {}; @@ -281,6 +282,33 @@ export class PubSub { } } + close(): Promise; + close(callback: EmptyCallback): void; + /** + * Closes out this object, releasing any server connections. Note that once + * you close a PubSub object, it may not be used again. Any pending operations + * (e.g. queued publish messages) will fail. If you have topic or subscription + * objects that may have pending operations, you should call close() on those + * first if you want any pending messages to be delivered correctly. The + * PubSub class doesn't track those. + * + * @callback EmptyCallback + * @returns {Promise} + */ + close(callback?: EmptyCallback): Promise | void { + const definedCallback = callback || (() => {}); + if (this.isOpen) { + this.isOpen = false; + this.closeAllClients_() + .then(() => { + definedCallback(null); + }) + .catch(definedCallback); + } else { + definedCallback(null); + } + } + createSubscription( topic: Topic | string, name: string, @@ -941,6 +969,23 @@ export class PubSub { return gaxClient; } + /** + * Close all open client objects. + * + * @private + * + * @returns {Promise} + */ + async closeAllClients_(): Promise { + const promises = []; + for (const clientConfig of Object.keys(this.api)) { + const gaxClient = this.api[clientConfig]; + promises.push(gaxClient.close()); + delete this.api[clientConfig]; + } + + await Promise.all(promises); + } /** * Funnel all API requests through this method, to be sure we have a project * ID. @@ -954,6 +999,19 @@ export class PubSub { * @param {function} [callback] The callback function. */ request(config: RequestConfig, callback: RequestCallback) { + // This prevents further requests, in case any publishers were hanging around. + if (!this.isOpen) { + const statusObject = { + code: 0, + details: 'Cannot use a closed PubSub object.', + metadata: null, + }; + const err = new Error(statusObject.details); + Object.assign(err, statusObject); + callback(err as ServiceError); + return; + } + this.getClient_(config, (err, client) => { if (err) { callback(err as ServiceError); @@ -1137,7 +1195,7 @@ export class PubSub { /*! Developer Documentation * - * These methods can be agto-paginated. + * These methods can be auto-paginated. */ paginator.extend(PubSub, ['getSnapshots', 'getSubscriptions', 'getTopics']); diff --git a/src/topic.ts b/src/topic.ts index 453b31c08..43f9278bd 100644 --- a/src/topic.ts +++ b/src/topic.ts @@ -181,6 +181,22 @@ export class Topic { this.iam = new IAM(pubsub, this.name); } + flush(): Promise; + flush(callback: EmptyCallback): void; + /** + * Immediately sends all remaining queued data. This is mostly useful + * if you are planning to call close() on the PubSub object that holds + * the server connections. + * + * @param {EmptyCallback} [callback] Callback function. + * @returns {Promise} + */ + flush(callback?: EmptyCallback): Promise | void { + // It doesn't matter here if callback is undefined; the Publisher + // flush() will handle it. + this.publisher.flush(callback!); + } + create(gaxOpts?: CallOptions): Promise; create(callback: CreateTopicCallback): void; create(gaxOpts: CallOptions, callback: CreateTopicCallback): void; diff --git a/synth.metadata b/synth.metadata index 56a176b13..a2bde0fb7 100644 --- a/synth.metadata +++ b/synth.metadata @@ -1,5 +1,5 @@ { - "updateTime": "2020-04-01T12:27:37.675305Z", + "updateTime": "2020-04-01T12:26:59.195010Z", "sources": [ { "generator": { diff --git a/test/publisher/index.ts b/test/publisher/index.ts index dd90fe301..59d5ae86b 100644 --- a/test/publisher/index.ts +++ b/test/publisher/index.ts @@ -47,6 +47,7 @@ class FakeQueue extends EventEmitter { this.publisher = publisher; } add(message: p.PubsubMessage, callback: p.PublishCallback): void {} + publish(callback: (err: Error | null) => void) {} } class FakeOrderedQueue extends FakeQueue { @@ -57,6 +58,7 @@ class FakeOrderedQueue extends FakeQueue { this.orderingKey = key; } resumePublishing(): void {} + publish(callback: (err: Error | null) => void) {} } describe('Publisher', () => { @@ -239,6 +241,34 @@ describe('Publisher', () => { assert.strictEqual(publisher.orderedQueues.size, 0); }); + + it('should drain any ordered queues on flush', done => { + // We have to stub out the regular queue as well, so that the flush() operation finishes. + sandbox + .stub(FakeQueue.prototype, 'publish') + .callsFake((callback: (err: Error | null) => void) => { + callback(null); + }); + + sandbox + .stub(FakeOrderedQueue.prototype, 'publish') + .callsFake((callback: (err: Error | null) => void) => { + const queue = (publisher.orderedQueues.get( + orderingKey + ) as unknown) as FakeOrderedQueue; + queue.emit('drain'); + callback(null); + }); + + publisher.orderedQueues.clear(); + publisher.publishMessage(fakeMessage, spy); + + publisher.flush(err => { + assert.strictEqual(err, null); + assert.strictEqual(publisher.orderedQueues.size, 0); + done(); + }); + }); }); }); @@ -315,4 +345,23 @@ describe('Publisher', () => { assert.strictEqual(publisher.settings.batching!.maxMessages, 1000); }); }); + + describe('flush', () => { + // The ordered queue drain test is above with the ordered queue tests. + it('should drain the main publish queue', done => { + sandbox.stub(publisher.queue, 'publish').callsFake(cb => { + if (cb) { + cb(null); + } + }); + publisher.flush(err => { + assert.strictEqual(err, null); + assert.strictEqual( + !publisher.queue.batch || publisher.queue.batch.messages.length === 0, + true + ); + done(); + }); + }); + }); }); diff --git a/test/pubsub.ts b/test/pubsub.ts index fd1dd8f26..78bb05b81 100644 --- a/test/pubsub.ts +++ b/test/pubsub.ts @@ -153,6 +153,14 @@ describe('PubSub', () => { const PUBSUB_EMULATOR_HOST = process.env.PUBSUB_EMULATOR_HOST; + interface PostCloseCallback { + (err: Error | null): void; + } + + interface PostCloseTest { + (callback: PostCloseCallback): void; + } + before(() => { delete process.env.PUBSUB_EMULATOR_HOST; PubSub = proxyquire('../src/pubsub', { @@ -243,6 +251,15 @@ describe('PubSub', () => { assert.deepStrictEqual(pubsub.api, {}); }); + it('should default to the opened state', () => { + assert.strictEqual(pubsub.isOpen, true); + }); + + it('should not be in the opened state after close()', async () => { + await pubsub.close(); + assert.strictEqual(pubsub.isOpen, false); + }); + it('should cache a local google-auth-library instance', () => { const fakeGoogleAuthInstance = {}; const options = { @@ -1041,6 +1058,21 @@ describe('PubSub', () => { }; }); + it('should throw if the PubSub is already closed', done => { + pubsub.close((err: Error | null) => { + assert.strictEqual(err, null); + + pubsub.request(CONFIG, (errInner: Error | null) => { + assert.notStrictEqual(errInner, null); + assert.strictEqual( + errInner!.message.indexOf('closed PubSub object') >= 0, + true + ); + done(); + }); + }); + }); + it('should call getClient_ with the correct config', done => { pubsub.getClient_ = config => { assert.strictEqual(config, CONFIG); @@ -1088,7 +1120,9 @@ describe('PubSub', () => { }); describe('getClientAsync_', () => { - const FAKE_CLIENT_INSTANCE = class {}; + const FAKE_CLIENT_INSTANCE = class { + close() {} + }; const CONFIG = ({ client: 'FakeClient', } as {}) as pubsubTypes.GetClientConfig; @@ -1100,6 +1134,19 @@ describe('PubSub', () => { afterEach(() => sandbox.restore()); + describe('closeAllClients_', () => { + it('should close out any open client', async () => { + // Create a client that we'll close. + const client = await pubsub.getClientAsync_(CONFIG); + + // Stub out its close method, and verify it gets called. + const stub = sandbox.stub(client, 'close').resolves(); + await pubsub.closeAllClients_(); + + assert.strictEqual(stub.called, true); + }); + }); + describe('project ID', () => { beforeEach(() => { delete pubsub.projectId;