diff --git a/CHANGELOG.md b/CHANGELOG.md index d5981c755..f99462d36 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,16 @@ [1]: https://www.npmjs.com/package/@google-cloud/pubsub?activeTab=versions +## [3.7.4](https://github.com/googleapis/nodejs-pubsub/compare/v3.7.3...v3.7.4) (2023-09-08) + + +### Bug Fixes + +* Always fill the topic and sub names when creating from a PubSub object ([#1816](https://github.com/googleapis/nodejs-pubsub/issues/1816)) ([ddf8b8a](https://github.com/googleapis/nodejs-pubsub/commit/ddf8b8ab288387e51db816df825594cb5190b837)) +* Make retry policy back off more aggressively for RPCs that retry RESOURCE_EXHAUSTD ([#1806](https://github.com/googleapis/nodejs-pubsub/issues/1806)) ([bfcf523](https://github.com/googleapis/nodejs-pubsub/commit/bfcf523da4786d44cc35560aa91962363f475fbb)) +* Set grpc keepalive time|outs by default ([#1814](https://github.com/googleapis/nodejs-pubsub/issues/1814)) ([dedfdea](https://github.com/googleapis/nodejs-pubsub/commit/dedfdea7a47fc19de981218a1d7502c75fdde488)) +* Simplify logic for HTTP/1.1 REST fallback option ([#1809](https://github.com/googleapis/nodejs-pubsub/issues/1809)) ([ee09b69](https://github.com/googleapis/nodejs-pubsub/commit/ee09b69092a16e2e12bc8e2647d48f627e013afd)) + ## [3.7.3](https://github.com/googleapis/nodejs-pubsub/compare/v3.7.2...v3.7.3) (2023-07-26) diff --git a/package.json b/package.json index 9da69f11f..dfb18ce34 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "@google-cloud/pubsub", "description": "Cloud Pub/Sub Client Library for Node.js", - "version": "3.7.3", + "version": "3.7.4", "license": "Apache-2.0", "author": "Google Inc.", "engines": { @@ -51,7 +51,7 @@ "@google-cloud/precise-date": "^3.0.0", "@google-cloud/projectify": "^3.0.0", "@google-cloud/promisify": "^2.0.0", - "@opentelemetry/api": "^1.0.0", + "@opentelemetry/api": "^1.6.0", "@opentelemetry/semantic-conventions": "~1.3.0", "@types/duplexify": "^3.6.0", "@types/long": "^4.0.0", diff --git a/samples/package.json b/samples/package.json index 62c162c7b..a1571ab0e 100644 --- a/samples/package.json +++ b/samples/package.json @@ -21,7 +21,7 @@ "precompile": "npm run clean" }, "dependencies": { - "@google-cloud/pubsub": "^3.7.3", + "@google-cloud/pubsub": "^3.7.4", "@opentelemetry/api": "^1.0.0", "@opentelemetry/tracing": "^0.24.0", "avro-js": "^1.10.1", diff --git a/src/pubsub.ts b/src/pubsub.ts index e2bb34594..e7fc0d819 100644 --- a/src/pubsub.ts +++ b/src/pubsub.ts @@ -272,16 +272,16 @@ export class PubSub { private schemaClient?: SchemaServiceClient; constructor(options?: ClientConfig) { - options = Object.assign({}, options || {}); - - // Needed for potentially large responses that may come from using exactly-once delivery. - // This will get passed down to grpc client objects. - const maxMetadataSize = 'grpc.max_metadata_size'; - // eslint-disable-next-line @typescript-eslint/no-explicit-any - const optionsAny = options as any; - if (optionsAny[maxMetadataSize] === undefined) { - optionsAny[maxMetadataSize] = 4 * 1024 * 1024; // 4 MiB - } + // Needed for potentially large responses that may come from using exactly-once delivery, + // as well as trying to work around silent connection failures. + // + // These will get passed down to grpc client objects. User values will overwrite these. + const grpcDefaults = { + 'grpc.max_metadata_size': 4 * 1024 * 1024, // 4 MiB + 'grpc.keepalive_time_ms': 300000, // 5 minutes + 'grpc.keepalive_timeout_ms': 20000, // 20 seconds + }; + options = Object.assign(grpcDefaults, options || {}); // Determine what scopes are needed. // It is the union of the scopes on both clients. @@ -561,6 +561,17 @@ export class PubSub { return; } subscription.metadata = resp!; + + // If this is the first call we've made, the projectId might be empty still. + if (subscription.name?.includes(PROJECT_ID_PLACEHOLDER)) { + if (subscription.metadata && subscription.metadata.name) { + subscription.name = Subscription.formatName_( + this.projectId, + subscription.metadata.name + ); + } + } + callback!(null, subscription, resp!); } ); @@ -655,6 +666,14 @@ export class PubSub { return; } topic.metadata = resp!; + + // If this is the first call we've made, the projectId might be empty still. + if (topic.name?.includes(PROJECT_ID_PLACEHOLDER)) { + if (topic.metadata && topic.metadata.name) { + topic.name = Topic.formatName_(this.projectId, topic.metadata.name); + } + } + callback!(null, topic, resp!); } ); diff --git a/test/pubsub.ts b/test/pubsub.ts index 3d56f167c..668857690 100644 --- a/test/pubsub.ts +++ b/test/pubsub.ts @@ -33,7 +33,6 @@ const PKG = require('../../package.json'); const sandbox = sinon.createSandbox(); const fakeCreds = {} as gax.grpc.ChannelCredentials; -sandbox.stub(gax.grpc.credentials, 'createInsecure').returns(fakeCreds); const subscriptionCached = subby.Subscription; @@ -49,6 +48,11 @@ function Subscription( return new overrideFn(pubsub, name, options); } +// eslint-disable-next-line @typescript-eslint/no-explicit-any +(Subscription as any).formatName_ = (): string => { + return 'formatted'; +}; + let promisified = false; const fakeUtil = Object.assign({}, util, { promisifySome( @@ -92,6 +96,10 @@ class FakeTopic { constructor(...args: Array<{}>) { this.calledWith_ = args; } + + static formatName_(): string { + return 'foo'; + } } let extended = false; @@ -187,15 +195,25 @@ describe('PubSub', () => { googleAuthOverride = null; pubsub = new PubSub(OPTIONS); pubsub.projectId = PROJECT_ID; + sandbox.stub(gax.grpc.credentials, 'createInsecure').returns(fakeCreds); + }); + + afterEach(() => { + sandbox.restore(); }); describe('instantiation', () => { const maxMetadataSizeKey = 'grpc.max_metadata_size'; + const keepaliveTimeKey = 'grpc.keepalive_time_ms'; + const keepaliveTimeoutKey = 'grpc.keepalive_timeout_ms'; + const DEFAULT_OPTIONS = { libName: 'gccl', libVersion: PKG.version, scopes: [], [maxMetadataSizeKey]: 4 * 1024 * 1024, + [keepaliveTimeKey]: 300000, + [keepaliveTimeoutKey]: 20000, }; it('should extend the correct methods', () => { @@ -216,18 +234,24 @@ describe('PubSub', () => { assert(new PubSub() instanceof PubSub); }); - it('should augment the gRPC options for metadata size', () => { + it('should augment the gRPC options', () => { let pubsub = new PubSub(); // eslint-disable-next-line @typescript-eslint/no-explicit-any let optionsAny: any = pubsub.options; assert.strictEqual(optionsAny[maxMetadataSizeKey], 4 * 1024 * 1024); + assert.strictEqual(optionsAny[keepaliveTimeKey], 300000); + assert.strictEqual(optionsAny[keepaliveTimeoutKey], 20000); optionsAny = { [maxMetadataSizeKey]: 1 * 1024 * 1024, + [keepaliveTimeKey]: 30, + [keepaliveTimeoutKey]: 100, }; pubsub = new PubSub(optionsAny); optionsAny = pubsub.options; assert.strictEqual(optionsAny[maxMetadataSizeKey], 1 * 1024 * 1024); + assert.strictEqual(optionsAny[keepaliveTimeKey], 30); + assert.strictEqual(optionsAny[keepaliveTimeoutKey], 100); }); it('should combine all required scopes', () => { @@ -543,13 +567,15 @@ describe('PubSub', () => { it('should return Subscription & resp to the callback', done => { const subscription = {}; - pubsub.subscription = () => { + sandbox.stub(pubsub, 'subscription').callsFake(() => { return subscription as subby.Subscription; - }; + }); - pubsub.request = (config, callback: Function) => { - callback(null, apiResponse); - }; + sandbox + .stub(pubsub, 'request') + .callsFake((config, callback: Function) => { + callback(null, apiResponse); + }); function callback( err?: Error | null, @@ -564,6 +590,31 @@ describe('PubSub', () => { pubsub.createSubscription?.(TOPIC_NAME, SUB_NAME, callback); }); + + it('should fill the subscription object name if projectId was empty', async () => { + const subscription = {}; + pubsub.projectId = undefined; + sandbox.stub(pubsub, 'subscription').callsFake(() => { + // Simulate the project ID not being resolved. + const sub = subscription as subby.Subscription; + sub.name = '{{projectId}}/foo/bar'; + return sub; + }); + + sandbox + .stub(pubsub, 'request') + .callsFake((config, callback: Function) => { + callback(null, apiResponse); + }); + + const [sub, resp] = await pubsub.createSubscription!( + TOPIC_NAME, + SUB_NAME + )!; + assert.strictEqual(sub, subscription); + assert.strictEqual(sub.name.includes('{{'), false); + assert.strictEqual(resp, apiResponse); + }); }); }); @@ -614,12 +665,17 @@ describe('PubSub', () => { }); describe('success', () => { - const apiResponse = {}; + const apiResponse = { + name: 'new-topic', + }; + let requestStub: sinon.SinonStub; beforeEach(() => { - pubsub.request = (config, callback: Function) => { - callback(null, apiResponse); - }; + requestStub = sandbox + .stub(pubsub, 'request') + .callsFake((config, callback: Function) => { + callback(null, apiResponse); + }); }); it('should return a Topic object', done => { @@ -645,6 +701,33 @@ describe('PubSub', () => { done(); }); }); + + it('should fill the topic object name if projectId was empty', async () => { + const topicName = 'new-topic'; + const topicInstance = {}; + + sandbox.stub(pubsub, 'topic').callsFake(name => { + assert.strictEqual(name, topicName); + + // Simulate the project ID not being resolved. + const topic = topicInstance as Topic; + topic.name = 'projects/{{projectId}}/topics/new-topic'; + return topic; + }); + + requestStub.restore(); + sandbox + .stub(pubsub, 'request') + .callsFake((config, callback: Function) => { + pubsub.projectId = 'projectId'; + callback(null, apiResponse); + }); + + const [topic, resp] = await pubsub.createTopic!(topicName)!; + assert.strictEqual(topic, topicInstance); + assert.strictEqual(topic.name.includes('{{'), false); + assert.strictEqual(resp, apiResponse); + }); }); });