From c69383088833056704ea942a59da762cc0d230d6 Mon Sep 17 00:00:00 2001 From: Kamal Aboul-Hosn Date: Mon, 17 Aug 2020 16:33:45 -0400 Subject: [PATCH] docs: Add and improve ordering keys examples (#1071) * chore: Remove notes about ordering keys being experimental. * feat: Add support for server-side flow control * Revert "chore: Remove notes about ordering keys being experimental." This reverts commit d02f328896628067ef071031ea3c56ca6b3c9da6. * docs: Add and improve ordering keys samples * Fix test format and documentation * Lint fixes Co-authored-by: Megan Potter <57276408+feywind@users.noreply.github.com> --- protos/protos.js | 144 +++++----------------- samples/README.md | 40 +++++- samples/createSubscriptionWithOrdering.js | 64 ++++++++++ samples/publishOrderedMessage.js | 14 ++- samples/resumePublish.js | 85 +++++++++++++ samples/system-test/subscriptions.test.js | 18 +++ samples/system-test/topics.test.js | 41 +++--- 7 files changed, 270 insertions(+), 136 deletions(-) create mode 100644 samples/createSubscriptionWithOrdering.js create mode 100644 samples/resumePublish.js diff --git a/protos/protos.js b/protos/protos.js index d24ee2495..374b6140d 100644 --- a/protos/protos.js +++ b/protos/protos.js @@ -715,7 +715,7 @@ Topic.decode = function decode(reader, length) { if (!(reader instanceof $Reader)) reader = $Reader.create(reader); - var end = length === undefined ? reader.len : reader.pos + length, message = new $root.google.pubsub.v1.Topic(), key, value; + var end = length === undefined ? reader.len : reader.pos + length, message = new $root.google.pubsub.v1.Topic(), key; while (reader.pos < end) { var tag = reader.uint32(); switch (tag >>> 3) { @@ -723,26 +723,12 @@ message.name = reader.string(); break; case 2: + reader.skip().pos++; if (message.labels === $util.emptyObject) message.labels = {}; - var end2 = reader.uint32() + reader.pos; - key = ""; - value = ""; - while (reader.pos < end2) { - var tag2 = reader.uint32(); - switch (tag2 >>> 3) { - case 1: - key = reader.string(); - break; - case 2: - value = reader.string(); - break; - default: - reader.skipType(tag2 & 7); - break; - } - } - message.labels[key] = value; + key = reader.string(); + reader.pos++; + message.labels[key] = reader.string(); break; case 3: message.messageStoragePolicy = $root.google.pubsub.v1.MessageStoragePolicy.decode(reader, reader.uint32()); @@ -1021,7 +1007,7 @@ PubsubMessage.decode = function decode(reader, length) { if (!(reader instanceof $Reader)) reader = $Reader.create(reader); - var end = length === undefined ? reader.len : reader.pos + length, message = new $root.google.pubsub.v1.PubsubMessage(), key, value; + var end = length === undefined ? reader.len : reader.pos + length, message = new $root.google.pubsub.v1.PubsubMessage(), key; while (reader.pos < end) { var tag = reader.uint32(); switch (tag >>> 3) { @@ -1029,26 +1015,12 @@ message.data = reader.bytes(); break; case 2: + reader.skip().pos++; if (message.attributes === $util.emptyObject) message.attributes = {}; - var end2 = reader.uint32() + reader.pos; - key = ""; - value = ""; - while (reader.pos < end2) { - var tag2 = reader.uint32(); - switch (tag2 >>> 3) { - case 1: - key = reader.string(); - break; - case 2: - value = reader.string(); - break; - default: - reader.skipType(tag2 & 7); - break; - } - } - message.attributes[key] = value; + key = reader.string(); + reader.pos++; + message.attributes[key] = reader.string(); break; case 3: message.messageId = reader.string(); @@ -4752,7 +4724,7 @@ Subscription.decode = function decode(reader, length) { if (!(reader instanceof $Reader)) reader = $Reader.create(reader); - var end = length === undefined ? reader.len : reader.pos + length, message = new $root.google.pubsub.v1.Subscription(), key, value; + var end = length === undefined ? reader.len : reader.pos + length, message = new $root.google.pubsub.v1.Subscription(), key; while (reader.pos < end) { var tag = reader.uint32(); switch (tag >>> 3) { @@ -4775,26 +4747,12 @@ message.messageRetentionDuration = $root.google.protobuf.Duration.decode(reader, reader.uint32()); break; case 9: + reader.skip().pos++; if (message.labels === $util.emptyObject) message.labels = {}; - var end2 = reader.uint32() + reader.pos; - key = ""; - value = ""; - while (reader.pos < end2) { - var tag2 = reader.uint32(); - switch (tag2 >>> 3) { - case 1: - key = reader.string(); - break; - case 2: - value = reader.string(); - break; - default: - reader.skipType(tag2 & 7); - break; - } - } - message.labels[key] = value; + key = reader.string(); + reader.pos++; + message.labels[key] = reader.string(); break; case 10: message.enableMessageOrdering = reader.bool(); @@ -5791,7 +5749,7 @@ PushConfig.decode = function decode(reader, length) { if (!(reader instanceof $Reader)) reader = $Reader.create(reader); - var end = length === undefined ? reader.len : reader.pos + length, message = new $root.google.pubsub.v1.PushConfig(), key, value; + var end = length === undefined ? reader.len : reader.pos + length, message = new $root.google.pubsub.v1.PushConfig(), key; while (reader.pos < end) { var tag = reader.uint32(); switch (tag >>> 3) { @@ -5799,26 +5757,12 @@ message.pushEndpoint = reader.string(); break; case 2: + reader.skip().pos++; if (message.attributes === $util.emptyObject) message.attributes = {}; - var end2 = reader.uint32() + reader.pos; - key = ""; - value = ""; - while (reader.pos < end2) { - var tag2 = reader.uint32(); - switch (tag2 >>> 3) { - case 1: - key = reader.string(); - break; - case 2: - value = reader.string(); - break; - default: - reader.skipType(tag2 & 7); - break; - } - } - message.attributes[key] = value; + key = reader.string(); + reader.pos++; + message.attributes[key] = reader.string(); break; case 3: message.oidcToken = $root.google.pubsub.v1.PushConfig.OidcToken.decode(reader, reader.uint32()); @@ -9339,7 +9283,7 @@ CreateSnapshotRequest.decode = function decode(reader, length) { if (!(reader instanceof $Reader)) reader = $Reader.create(reader); - var end = length === undefined ? reader.len : reader.pos + length, message = new $root.google.pubsub.v1.CreateSnapshotRequest(), key, value; + var end = length === undefined ? reader.len : reader.pos + length, message = new $root.google.pubsub.v1.CreateSnapshotRequest(), key; while (reader.pos < end) { var tag = reader.uint32(); switch (tag >>> 3) { @@ -9350,26 +9294,12 @@ message.subscription = reader.string(); break; case 3: + reader.skip().pos++; if (message.labels === $util.emptyObject) message.labels = {}; - var end2 = reader.uint32() + reader.pos; - key = ""; - value = ""; - while (reader.pos < end2) { - var tag2 = reader.uint32(); - switch (tag2 >>> 3) { - case 1: - key = reader.string(); - break; - case 2: - value = reader.string(); - break; - default: - reader.skipType(tag2 & 7); - break; - } - } - message.labels[key] = value; + key = reader.string(); + reader.pos++; + message.labels[key] = reader.string(); break; default: reader.skipType(tag & 7); @@ -9838,7 +9768,7 @@ Snapshot.decode = function decode(reader, length) { if (!(reader instanceof $Reader)) reader = $Reader.create(reader); - var end = length === undefined ? reader.len : reader.pos + length, message = new $root.google.pubsub.v1.Snapshot(), key, value; + var end = length === undefined ? reader.len : reader.pos + length, message = new $root.google.pubsub.v1.Snapshot(), key; while (reader.pos < end) { var tag = reader.uint32(); switch (tag >>> 3) { @@ -9852,26 +9782,12 @@ message.expireTime = $root.google.protobuf.Timestamp.decode(reader, reader.uint32()); break; case 4: + reader.skip().pos++; if (message.labels === $util.emptyObject) message.labels = {}; - var end2 = reader.uint32() + reader.pos; - key = ""; - value = ""; - while (reader.pos < end2) { - var tag2 = reader.uint32(); - switch (tag2 >>> 3) { - case 1: - key = reader.string(); - break; - case 2: - value = reader.string(); - break; - default: - reader.skipType(tag2 & 7); - break; - } - } - message.labels[key] = value; + key = reader.string(); + reader.pos++; + message.labels[key] = reader.string(); break; default: reader.skipType(tag & 7); diff --git a/samples/README.md b/samples/README.md index e2905f5da..a812b16f3 100644 --- a/samples/README.md +++ b/samples/README.md @@ -23,6 +23,7 @@ guides. * [Create Push Subscription](#create-push-subscription) * [Create Subscription](#create-subscription) * [Create Subscription With Dead Letter Policy](#create-subscription-with-dead-letter-policy) + * [Create Subscription With Ordering Enabled](#create-subscription-with-ordering) * [Create Topic](#create-topic) * [Delete Subscription](#delete-subscription) * [Delete Topic](#delete-topic) @@ -126,6 +127,24 @@ __Usage:__ +### Create Subscription With Ordering Enabled + +Creates a new subscription With Ordering Enabled. + +View the [source code](https://github.com/googleapis/nodejs-pubsub/blob/master/samples/createSubscriptionWithOrdering.js). + +[![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createSubscriptionWithOrdering.js,samples/README.md) + +__Usage:__ + + +`node createSubscriptionWithOrdering.js ` + + +----- + + + ### Create Topic Creates a new topic. @@ -460,7 +479,26 @@ View the [source code](https://github.com/googleapis/nodejs-pubsub/blob/master/s __Usage:__ -`node publishOrderedMessage.js ` +`node publishOrderedMessage.js ` + + +----- + + + + +### Resume Publishing + +Demonstrates how to resume publishing for an ordering key after a publish fails. + +View the [source code](https://github.com/googleapis/nodejs-pubsub/blob/master/samples/resumePublish.js). + +[![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/resumePublish.js,samples/README.md) + +__Usage:__ + + +`node resumePublish.js ` ----- diff --git a/samples/createSubscriptionWithOrdering.js b/samples/createSubscriptionWithOrdering.js new file mode 100644 index 000000000..9e04464c8 --- /dev/null +++ b/samples/createSubscriptionWithOrdering.js @@ -0,0 +1,64 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * This application demonstrates how to perform basic operations on + * subscriptions with the Google Cloud Pub/Sub API. + * + * For more information, see the README.md under /pubsub and the documentation + * at https://cloud.google.com/pubsub/docs. + */ + +'use strict'; + +// sample-metadata: +// title: Create Subscription with ordering enabled +// description: Creates a new subscription with ordering enabled. +// usage: node createSubscriptionWithOrdering.js + +function main( + topicName = 'YOUR_TOPIC_NAME', + subscriptionName = 'YOUR_SUBSCRIPTION_NAME' +) { + // [START pubsub_ordering_keys_create_subscription] + /** + * TODO(developer): Uncomment these variables before running the sample. + */ + // const topicName = 'YOUR_TOPIC_NAME'; + // const subscriptionName = 'YOUR_SUBSCRIPTION_NAME'; + + // Imports the Google Cloud client library + const {PubSub} = require('@google-cloud/pubsub'); + + // Creates a client; cache this for further use + const pubSubClient = new PubSub(); + + async function createSubscriptionWithOrdering() { + // Creates a new subscription + await pubSubClient.topic(topicName).createSubscription(subscriptionName, { + enableMessageOrdering: true, + }); + console.log( + `Created subscription ${subscriptionName} with ordering enabled.` + ); + console.log( + 'To process messages in order, remember to add an ordering key to your messages.' + ); + } + + createSubscriptionWithOrdering().catch(console.error); + // [END pubsub_ordering_keys_create_subscription] +} + +main(...process.argv.slice(2)); diff --git a/samples/publishOrderedMessage.js b/samples/publishOrderedMessage.js index 70a645d99..6a65c2b1b 100644 --- a/samples/publishOrderedMessage.js +++ b/samples/publishOrderedMessage.js @@ -41,7 +41,8 @@ function setPublishCounterValue(value) { async function main( topicName = 'YOUR_TOPIC_NAME', - data = JSON.stringify({foo: 'bar'}) + data = JSON.stringify({foo: 'bar'}), + orderingKey = 'key1' ) { // [START pubsub_publish_ordered_message] /** @@ -49,6 +50,7 @@ async function main( */ // const topicName = 'YOUR_TOPIC_NAME'; // const data = JSON.stringify({foo: 'bar'}); + // const orderingKey = 'key1'; // Imports the Google Cloud client library const {PubSub} = require('@google-cloud/pubsub'); @@ -65,10 +67,16 @@ async function main( counterId: `${getPublishCounterValue()}`, }; + const message = { + data: dataBuffer, + attributes: attributes, + orderingKey: orderingKey, + }; + // Publishes the message const messageId = await pubSubClient - .topic(topicName) - .publish(dataBuffer, attributes); + .topic(topicName, {enableMessageOrdering: true}) + .publishMessage(message); // Update the counter value setPublishCounterValue(parseInt(attributes.counterId, 10) + 1); diff --git a/samples/resumePublish.js b/samples/resumePublish.js new file mode 100644 index 000000000..5cb9e9cc7 --- /dev/null +++ b/samples/resumePublish.js @@ -0,0 +1,85 @@ +// Copyright 2019-2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * This sample demonstrates how to perform basic operations on topics with + * the Google Cloud Pub/Sub API. + * + * For more information, see the README.md under /pubsub and the documentation + * at https://cloud.google.com/pubsub/docs. + */ + +'use strict'; + +// sample-metadata: +// title: Resume Publish +// description: Demonstrates how to resume publishing on an ordering key if +// publishing fails for a message. +// usage: node resumePublish.js + +async function main( + topicName = 'YOUR_TOPIC_NAME', + data = JSON.stringify({foo: 'bar'}), + orderingKey = 'key1' +) { + // [START pubsub_resume_publish] + /** + * TODO(developer): Uncomment these variables before running the sample. + */ + // const topicName = 'YOUR_TOPIC_NAME'; + // const data = JSON.stringify({foo: 'bar'}); + // const orderingKey = 'key1'; + + // Imports the Google Cloud client library + const {PubSub} = require('@google-cloud/pubsub'); + + // Creates a client; cache this for further use + const pubSubClient = new PubSub(); + + async function resumePublish() { + // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject) + const dataBuffer = Buffer.from(data); + + // Publishes the message + const publisher = pubSubClient.topic(topicName, { + enableMessageOrdering: true, + }); + try { + const message = { + data: dataBuffer, + orderingKey: orderingKey, + }; + const messageId = await publisher.publishMessage(message); + console.log(`Message ${messageId} published.`); + + return messageId; + } catch (e) { + console.log(`Could not publish: ${e}`); + publisher.resumePublishing(orderingKey); + return null; + } + } + + return await resumePublish(); + // [END pubsub_resume_publish] +} + +// This needs to be exported directly so that the system tests can find it. +module.exports = { + resumePublish: main, +}; + +if (require.main === module) { + main(...process.argv.slice(2)).catch(console.error); +} diff --git a/samples/system-test/subscriptions.test.js b/samples/system-test/subscriptions.test.js index 2f883c1a1..4699612e9 100644 --- a/samples/system-test/subscriptions.test.js +++ b/samples/system-test/subscriptions.test.js @@ -36,6 +36,7 @@ describe('subscriptions', () => { const subscriptionNameFive = `sub5-${runId}`; const subscriptionNameSix = `sub6-${runId}`; const subscriptionNameSeven = `sub7-${runId}`; + const subscriptionNameEight = `sub8-${runId}`; const subscriptionNameDetach = `testdetachsubsxyz-${runId}`; const fullTopicNameOne = `projects/${projectId}/topics/${topicNameOne}`; const fullSubscriptionNameOne = `projects/${projectId}/subscriptions/${subscriptionNameOne}`; @@ -393,4 +394,21 @@ describe('subscriptions', () => { .get(); assert.isNull(subscription.metadata.deadLetterPolicy); }); + + it('should create a subscription with ordering enabled.', async () => { + const output = execSync( + `${commandFor( + 'createSubscriptionWithOrdering' + )} ${topicNameTwo} ${subscriptionNameEight} ${topicNameThree}` + ); + assert.include( + output, + `Created subscription ${subscriptionNameEight} with ordering enabled.` + ); + const [subscription] = await pubsub + .topic(topicNameTwo) + .subscription(subscriptionNameEight) + .get(); + assert.strictEqual(subscription.metadata.enableMessageOrdering, true); + }); }); diff --git a/samples/system-test/topics.test.js b/samples/system-test/topics.test.js index bf80b8f28..8ff644d44 100644 --- a/samples/system-test/topics.test.js +++ b/samples/system-test/topics.test.js @@ -33,6 +33,7 @@ describe('topics', () => { const subscriptionNameTwo = `sub2-${runId}`; const subscriptionNameThree = `sub3-${runId}`; const subscriptionNameFour = `sub4-${runId}`; + const subscriptionNameFive = `sub5-${runId}`; const fullTopicNameOne = `projects/${projectId}/topics/${topicNameOne}`; const expectedMessage = {data: 'Hello, world!'}; @@ -135,31 +136,19 @@ describe('topics', () => { }); it('should publish ordered messages', async () => { - const topics = require('../publishOrderedMessage'); - const [subscription] = await pubsub .topic(topicNameTwo) .subscription(subscriptionNameTwo) .get({autoCreate: true}); - let messageId = await topics.publishOrderedMessage( - topicNameTwo, - expectedMessage.data - ); - let message = await _pullOneMessage(subscription); - assert.strictEqual(message.id, messageId); - assert.strictEqual(message.data.toString(), expectedMessage.data); - assert.strictEqual(message.attributes.counterId, '1'); - - messageId = await topics.publishOrderedMessage( - topicNameTwo, - expectedMessage.data + execSync( + `${commandFor('publishOrderedMessage')} ${topicNameTwo} "${ + expectedMessage.data + }" my-key` ); - message = await _pullOneMessage(subscription); - assert.strictEqual(message.id, messageId); + const message = await _pullOneMessage(subscription); + assert.strictEqual(message.orderingKey, 'my-key'); assert.strictEqual(message.data.toString(), expectedMessage.data); - assert.strictEqual(message.attributes.counterId, '2'); - await topics.publishOrderedMessage(topicNameTwo, expectedMessage.data); }); it('should publish with specific batch settings', async () => { @@ -188,6 +177,22 @@ describe('topics', () => { ); }); + it('should resume publish', async () => { + const [subscription] = await pubsub + .topic(topicNameTwo) + .subscription(subscriptionNameFive) + .get({autoCreate: true}); + + execSync( + `${commandFor('resumePublish')} ${topicNameTwo} "${ + expectedMessage.data + }" my-key` + ); + const message = await _pullOneMessage(subscription); + assert.strictEqual(message.orderingKey, 'my-key'); + assert.strictEqual(message.data.toString(), expectedMessage.data); + }); + it('should publish with retry settings', async () => { const [subscription] = await pubsub .topic(topicNameOne)