Skip to content

Commit

Permalink
feat: add a close() method to PubSub, and a flush() method to Topic/P…
Browse files Browse the repository at this point in the history
…ublisher (#916)

4097995
commit 4097995
Author: Megan Potter <[email protected]>
Date:   Thu Mar 26 11:03:26 2020 -0700

    feat: add a close() method to PubSub, and a flush() method to Topic/Publisher (#916)

    * docs: fix a typo in a comment

    * feat: allow manually closing the server connections in the PubSub object

    * feat: add flush() method for Topic objects

    * tests: add tests for new flush() and close() methods

    * build: update github checkout action to v2 to fix spurious retry errors

    * fix: set isOpen to false before trying to close it so that all usage will stop
  • Loading branch information
yoshi-automation committed Apr 1, 2020
1 parent c77aac4 commit a56e5c4
Show file tree
Hide file tree
Showing 7 changed files with 211 additions and 10 deletions.
36 changes: 32 additions & 4 deletions src/publisher/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@
* limitations under the License.
*/

import {promisifyAll} from '@google-cloud/promisify';
import {promisify, promisifyAll} from '@google-cloud/promisify';
import * as extend from 'extend';
import {CallOptions} from 'google-gax';

import {MessageBatch, BatchPublishOptions} from './message-batch';
import {Queue, OrderedQueue} from './message-queues';
import {BatchPublishOptions} from './message-batch';
import {Queue, OrderedQueue, PublishDone} from './message-queues';
import {Topic} from '../topic';
import {RequestCallback} from '../pubsub';
import {RequestCallback, EmptyCallback, EmptyResponse} from '../pubsub';
import {google} from '../../proto/pubsub';
import {defaultOptions} from '../default-options';

Expand Down Expand Up @@ -72,6 +72,34 @@ export class Publisher {
this.queue = new Queue(this);
this.orderedQueues = new Map();
}

flush(): Promise<void>;
flush(callback: EmptyCallback): void;
/**
* Immediately sends all remaining queued data. This is mostly useful
* if you are planning to call close() on the PubSub object that holds
* the server connections.
*
* @private
*
* @param {EmptyCallback} [callback] Callback function.
* @returns {Promise<EmptyResponse>}
*/
flush(callback?: EmptyCallback): Promise<void> | void {
const definedCallback = callback ? callback : () => {};

const publishes = [promisify(this.queue.publish)()];
Array.from(this.orderedQueues.values()).forEach(q =>
publishes.push(promisify(q.publish)())
);
const allPublishes = Promise.all(publishes);

allPublishes
.then(() => {
definedCallback(null);
})
.catch(definedCallback);
}
publish(data: Buffer, attributes?: Attributes): Promise<string>;
publish(data: Buffer, callback: PublishCallback): void;
publish(
Expand Down
9 changes: 6 additions & 3 deletions src/publisher/message-queues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ export class Queue extends MessageQueue {
/**
* Cancels any pending publishes and calls _publish immediately.
*/
publish(): void {
publish(callback?: PublishDone): void {
const {messages, callbacks} = this.batch;

this.batch = new MessageBatch(this.batchOptions);
Expand All @@ -141,7 +141,7 @@ export class Queue extends MessageQueue {
delete this.pending;
}

this._publish(messages, callbacks);
this._publish(messages, callbacks, callback);
}
}

Expand Down Expand Up @@ -259,7 +259,8 @@ export class OrderedQueue extends MessageQueue {
*
* @fires OrderedQueue#drain
*/
publish(): void {
publish(callback?: PublishDone): void {
const definedCallback = callback || (() => {});
this.inFlight = true;

if (this.pending) {
Expand All @@ -274,10 +275,12 @@ export class OrderedQueue extends MessageQueue {

if (err) {
this.handlePublishFailure(err);
definedCallback(err);
} else if (this.batches.length) {
this.beginNextPublish();
} else {
this.emit('drain');
definedCallback(null);
}
});
}
Expand Down
60 changes: 59 additions & 1 deletion src/pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ export class PubSub {
getTopicsStream = paginator.streamify('getTopics') as () => ObjectStream<
Topic
>;
isOpen = true;

constructor(options?: ClientConfig) {
options = options || {};
Expand Down Expand Up @@ -281,6 +282,33 @@ export class PubSub {
}
}

close(): Promise<void>;
close(callback: EmptyCallback): void;
/**
* Closes out this object, releasing any server connections. Note that once
* you close a PubSub object, it may not be used again. Any pending operations
* (e.g. queued publish messages) will fail. If you have topic or subscription
* objects that may have pending operations, you should call close() on those
* first if you want any pending messages to be delivered correctly. The
* PubSub class doesn't track those.
*
* @callback EmptyCallback
* @returns {Promise<void>}
*/
close(callback?: EmptyCallback): Promise<void> | void {
const definedCallback = callback || (() => {});
if (this.isOpen) {
this.isOpen = false;
this.closeAllClients_()
.then(() => {
definedCallback(null);
})
.catch(definedCallback);
} else {
definedCallback(null);
}
}

createSubscription(
topic: Topic | string,
name: string,
Expand Down Expand Up @@ -941,6 +969,23 @@ export class PubSub {

return gaxClient;
}
/**
* Close all open client objects.
*
* @private
*
* @returns {Promise}
*/
async closeAllClients_(): Promise<void> {
const promises = [];
for (const clientConfig of Object.keys(this.api)) {
const gaxClient = this.api[clientConfig];
promises.push(gaxClient.close());
delete this.api[clientConfig];
}

await Promise.all(promises);
}
/**
* Funnel all API requests through this method, to be sure we have a project
* ID.
Expand All @@ -954,6 +999,19 @@ export class PubSub {
* @param {function} [callback] The callback function.
*/
request<T, R = void>(config: RequestConfig, callback: RequestCallback<T, R>) {
// This prevents further requests, in case any publishers were hanging around.
if (!this.isOpen) {
const statusObject = {
code: 0,
details: 'Cannot use a closed PubSub object.',
metadata: null,
};
const err = new Error(statusObject.details);
Object.assign(err, statusObject);
callback(err as ServiceError);
return;
}

this.getClient_(config, (err, client) => {
if (err) {
callback(err as ServiceError);
Expand Down Expand Up @@ -1137,7 +1195,7 @@ export class PubSub {

/*! Developer Documentation
*
* These methods can be agto-paginated.
* These methods can be auto-paginated.
*/
paginator.extend(PubSub, ['getSnapshots', 'getSubscriptions', 'getTopics']);

Expand Down
16 changes: 16 additions & 0 deletions src/topic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,22 @@ export class Topic {
this.iam = new IAM(pubsub, this.name);
}

flush(): Promise<void>;
flush(callback: EmptyCallback): void;
/**
* Immediately sends all remaining queued data. This is mostly useful
* if you are planning to call close() on the PubSub object that holds
* the server connections.
*
* @param {EmptyCallback} [callback] Callback function.
* @returns {Promise<EmptyResponse>}
*/
flush(callback?: EmptyCallback): Promise<void> | void {
// It doesn't matter here if callback is undefined; the Publisher
// flush() will handle it.
this.publisher.flush(callback!);
}

create(gaxOpts?: CallOptions): Promise<CreateTopicResponse>;
create(callback: CreateTopicCallback): void;
create(gaxOpts: CallOptions, callback: CreateTopicCallback): void;
Expand Down
2 changes: 1 addition & 1 deletion synth.metadata
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"updateTime": "2020-04-01T12:27:37.675305Z",
"updateTime": "2020-04-01T12:26:59.195010Z",
"sources": [
{
"generator": {
Expand Down
49 changes: 49 additions & 0 deletions test/publisher/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class FakeQueue extends EventEmitter {
this.publisher = publisher;
}
add(message: p.PubsubMessage, callback: p.PublishCallback): void {}
publish(callback: (err: Error | null) => void) {}
}

class FakeOrderedQueue extends FakeQueue {
Expand All @@ -57,6 +58,7 @@ class FakeOrderedQueue extends FakeQueue {
this.orderingKey = key;
}
resumePublishing(): void {}
publish(callback: (err: Error | null) => void) {}
}

describe('Publisher', () => {
Expand Down Expand Up @@ -239,6 +241,34 @@ describe('Publisher', () => {

assert.strictEqual(publisher.orderedQueues.size, 0);
});

it('should drain any ordered queues on flush', done => {
// We have to stub out the regular queue as well, so that the flush() operation finishes.
sandbox
.stub(FakeQueue.prototype, 'publish')
.callsFake((callback: (err: Error | null) => void) => {
callback(null);
});

sandbox
.stub(FakeOrderedQueue.prototype, 'publish')
.callsFake((callback: (err: Error | null) => void) => {
const queue = (publisher.orderedQueues.get(
orderingKey
) as unknown) as FakeOrderedQueue;
queue.emit('drain');
callback(null);
});

publisher.orderedQueues.clear();
publisher.publishMessage(fakeMessage, spy);

publisher.flush(err => {
assert.strictEqual(err, null);
assert.strictEqual(publisher.orderedQueues.size, 0);
done();
});
});
});
});

Expand Down Expand Up @@ -315,4 +345,23 @@ describe('Publisher', () => {
assert.strictEqual(publisher.settings.batching!.maxMessages, 1000);
});
});

describe('flush', () => {
// The ordered queue drain test is above with the ordered queue tests.
it('should drain the main publish queue', done => {
sandbox.stub(publisher.queue, 'publish').callsFake(cb => {
if (cb) {
cb(null);
}
});
publisher.flush(err => {
assert.strictEqual(err, null);
assert.strictEqual(
!publisher.queue.batch || publisher.queue.batch.messages.length === 0,
true
);
done();
});
});
});
});
49 changes: 48 additions & 1 deletion test/pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,14 @@ describe('PubSub', () => {

const PUBSUB_EMULATOR_HOST = process.env.PUBSUB_EMULATOR_HOST;

interface PostCloseCallback {
(err: Error | null): void;
}

interface PostCloseTest {
(callback: PostCloseCallback): void;
}

before(() => {
delete process.env.PUBSUB_EMULATOR_HOST;
PubSub = proxyquire('../src/pubsub', {
Expand Down Expand Up @@ -243,6 +251,15 @@ describe('PubSub', () => {
assert.deepStrictEqual(pubsub.api, {});
});

it('should default to the opened state', () => {
assert.strictEqual(pubsub.isOpen, true);
});

it('should not be in the opened state after close()', async () => {
await pubsub.close();
assert.strictEqual(pubsub.isOpen, false);
});

it('should cache a local google-auth-library instance', () => {
const fakeGoogleAuthInstance = {};
const options = {
Expand Down Expand Up @@ -1041,6 +1058,21 @@ describe('PubSub', () => {
};
});

it('should throw if the PubSub is already closed', done => {
pubsub.close((err: Error | null) => {
assert.strictEqual(err, null);

pubsub.request(CONFIG, (errInner: Error | null) => {
assert.notStrictEqual(errInner, null);
assert.strictEqual(
errInner!.message.indexOf('closed PubSub object') >= 0,
true
);
done();
});
});
});

it('should call getClient_ with the correct config', done => {
pubsub.getClient_ = config => {
assert.strictEqual(config, CONFIG);
Expand Down Expand Up @@ -1088,7 +1120,9 @@ describe('PubSub', () => {
});

describe('getClientAsync_', () => {
const FAKE_CLIENT_INSTANCE = class {};
const FAKE_CLIENT_INSTANCE = class {
close() {}
};
const CONFIG = ({
client: 'FakeClient',
} as {}) as pubsubTypes.GetClientConfig;
Expand All @@ -1100,6 +1134,19 @@ describe('PubSub', () => {

afterEach(() => sandbox.restore());

describe('closeAllClients_', () => {
it('should close out any open client', async () => {
// Create a client that we'll close.
const client = await pubsub.getClientAsync_(CONFIG);

// Stub out its close method, and verify it gets called.
const stub = sandbox.stub(client, 'close').resolves();
await pubsub.closeAllClients_();

assert.strictEqual(stub.called, true);
});
});

describe('project ID', () => {
beforeEach(() => {
delete pubsub.projectId;
Expand Down

0 comments on commit a56e5c4

Please sign in to comment.