Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

samples: add optimisticSubscribe, plus a few small library changes to support it #1973

Merged
merged 7 commits into from
Sep 24, 2024
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ Samples are in the [`samples/`](https://github.com/googleapis/nodejs-pubsub/tree
| Listen For Messages With Custom Attributes | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/listenWithCustomAttributes.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/listenWithCustomAttributes.js,samples/README.md) |
| Subscribe with OpenTelemetry Tracing | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/listenWithOpenTelemetryTracing.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/listenWithOpenTelemetryTracing.js,samples/README.md) |
| Modify Push Configuration | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/modifyPushConfig.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/modifyPushConfig.js,samples/README.md) |
| Optimistic Subscribe | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/optimisticSubscribe.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/optimisticSubscribe.js,samples/README.md) |
| Publish Avro Records to a Topic | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/publishAvroRecords.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/publishAvroRecords.js,samples/README.md) |
| Publish Batched Messages | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/publishBatchedMessages.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/publishBatchedMessages.js,samples/README.md) |
| Publish Message | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/publishMessage.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/publishMessage.js,samples/README.md) |
Expand Down
20 changes: 20 additions & 0 deletions samples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ guides.
* [Listen For Messages With Custom Attributes](#listen-for-messages-with-custom-attributes)
* [Subscribe with OpenTelemetry Tracing](#subscribe-with-opentelemetry-tracing)
* [Modify Push Configuration](#modify-push-configuration)
* [Optimistic Subscribe](#optimistic-subscribe)
* [Publish Avro Records to a Topic](#publish-avro-records-to-a-topic)
* [Publish Batched Messages](#publish-batched-messages)
* [Publish Message](#publish-message)
Expand Down Expand Up @@ -901,6 +902,25 @@ __Usage:__



### Optimistic Subscribe

Listens for messages from a subscription, creating it if needed.

View the [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/optimisticSubscribe.js).

[![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/optimisticSubscribe.js,samples/README.md)

__Usage:__


`node optimisticSubscribe.js <subscription-name-or-id> <topic-name-or-id> [timeout-in-seconds]`


-----




### Publish Avro Records to a Topic

Publishes a record in Avro to a topic with a schema.
Expand Down
3 changes: 2 additions & 1 deletion samples/listenForMessages.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ const {PubSub} = require('@google-cloud/pubsub');
const pubSubClient = new PubSub();

function listenForMessages(subscriptionNameOrId, timeout) {
// References an existing subscription
// References an existing subscription; if you are unsure if the
// subscription will exist, try the optimisticSubscribe sample.
const subscription = pubSubClient.subscription(subscriptionNameOrId);

// Create an event handler to handle messages
Expand Down
101 changes: 101 additions & 0 deletions samples/optimisticSubscribe.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// This is a generated sample, using the typeless sample bot. Please
// look for the source TypeScript sample (.ts) for modifications.
'use strict';

/**
* This sample demonstrates how to perform basic operations on
* subscriptions with the Google Cloud Pub/Sub API.
*
* For more information, see the README.md under /pubsub and the documentation
* at https://cloud.google.com/pubsub/docs.
*/

// sample-metadata:
// title: Optimistic Subscribe
// description: Listens for messages from a subscription, creating it if needed.
// usage: node optimisticSubscribe.js <subscription-name-or-id> <topic-name-or-id> [timeout-in-seconds]

// [START pubsub_optimistic_subscribe]
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const timeout = 60;

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

function optimisticSubscribe(subscriptionNameOrId, topicNameOrId, timeout) {
// Try using an existing subscription
let subscription = pubSubClient.subscription(subscriptionNameOrId);

// Create an event handler to handle messages
let messageCount = 0;
const messageHandler = message => {
console.log(`Received message ${message.id}:`);
console.log(`\tData: ${message.data}`);
console.log(`\tAttributes: ${message.attributes}`);
messageCount += 1;

// "Ack" (acknowledge receipt of) the message
message.ack();
};

// Set an error handler so that we're notified if the subscription doesn't
// already exist.
subscription.on('error', async e => {
// Resource Not Found
if (e.code === 5) {
console.log('Subscription not found, creating it');
await pubSubClient.createSubscription(
topicNameOrId,
subscriptionNameOrId
);

// Refresh our subscriber object and re-attach the message handler.
subscription = pubSubClient.subscription(subscriptionNameOrId);
subscription.on('message', messageHandler);
}
});

// Listen for new messages until timeout is hit; this will attempt to
// open the actual subscriber streams. If it fails, the error handler
// above will be called.
subscription.on('message', messageHandler);

// Wait a while for the subscription to run. (Part of the sample only.)
setTimeout(() => {
subscription.removeListener('message', messageHandler);
console.log(`${messageCount} message(s) received.`);
}, timeout * 1000);
}
// [END pubsub_optimistic_subscribe]

function main(
subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID',
topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID',
timeout = 60
) {
timeout = Number(timeout);
optimisticSubscribe(subscriptionNameOrId, topicNameOrId, timeout);
}

main(...process.argv.slice(2));
3 changes: 2 additions & 1 deletion samples/typescript/listenForMessages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ import {PubSub, Message} from '@google-cloud/pubsub';
const pubSubClient = new PubSub();

function listenForMessages(subscriptionNameOrId: string, timeout: number) {
// References an existing subscription
// References an existing subscription; if you are unsure if the
// subscription will exist, try the optimisticSubscribe sample.
const subscription = pubSubClient.subscription(subscriptionNameOrId);

// Create an event handler to handle messages
Expand Down
101 changes: 101 additions & 0 deletions samples/typescript/optimisticSubscribe.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/**
* This sample demonstrates how to perform basic operations on
* subscriptions with the Google Cloud Pub/Sub API.
*
* For more information, see the README.md under /pubsub and the documentation
* at https://cloud.google.com/pubsub/docs.
*/

// sample-metadata:
// title: Optimistic Subscribe
// description: Listens for messages from a subscription, creating it if needed.
// usage: node optimisticSubscribe.js <subscription-name-or-id> <topic-name-or-id> [timeout-in-seconds]

// [START pubsub_optimistic_subscribe]
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const timeout = 60;

// Imports the Google Cloud client library
import {PubSub, Message, StatusError} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

function optimisticSubscribe(
subscriptionNameOrId: string,
topicNameOrId: string,
timeout: number
) {
// Try using an existing subscription
let subscription = pubSubClient.subscription(subscriptionNameOrId);

// Create an event handler to handle messages
let messageCount = 0;
const messageHandler = (message: Message) => {
console.log(`Received message ${message.id}:`);
console.log(`\tData: ${message.data}`);
console.log(`\tAttributes: ${message.attributes}`);
messageCount += 1;

// "Ack" (acknowledge receipt of) the message
message.ack();
};

// Set an error handler so that we're notified if the subscription doesn't
// already exist.
subscription.on('error', async (e: StatusError) => {
// Resource Not Found
if (e.code === 5) {
console.log('Subscription not found, creating it');
await pubSubClient.createSubscription(
topicNameOrId,
subscriptionNameOrId
);

// Refresh our subscriber object and re-attach the message handler.
subscription = pubSubClient.subscription(subscriptionNameOrId);
subscription.on('message', messageHandler);
}
});

// Listen for new messages until timeout is hit; this will attempt to
// open the actual subscriber streams. If it fails, the error handler
// above will be called.
subscription.on('message', messageHandler);

// Wait a while for the subscription to run. (Part of the sample only.)
setTimeout(() => {
subscription.removeListener('message', messageHandler);
console.log(`${messageCount} message(s) received.`);
}, timeout * 1000);
}
// [END pubsub_optimistic_subscribe]

function main(
subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID',
topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID',
timeout = 60
) {
timeout = Number(timeout);
optimisticSubscribe(subscriptionNameOrId, topicNameOrId, timeout);
}

main(...process.argv.slice(2));
2 changes: 1 addition & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ export {
SeekResponse,
Snapshot,
} from './snapshot';
export {Message, SubscriberOptions} from './subscriber';
export {Message, StatusError, SubscriberOptions} from './subscriber';
export {
Schema,
CreateSchemaResponse,
Expand Down
8 changes: 6 additions & 2 deletions src/message-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -384,11 +384,14 @@ export class MessageStream extends PassThrough {
private _onEnd(index: number, status: grpc.StatusObject): void {
this._removeStream(index);

const statusError = new StatusError(status);

if (PullRetry.retry(status)) {
this.emit(
'debug',
new DebugMessage(
`Subscriber stream ${index} has ended with status ${status.code}; will be retried.`
`Subscriber stream ${index} has ended with status ${status.code}; will be retried.`,
statusError
)
);
if (PullRetry.resetFailures(status)) {
Expand All @@ -401,7 +404,8 @@ export class MessageStream extends PassThrough {
this.emit(
'debug',
new DebugMessage(
`Subscriber stream ${index} has ended with status ${status.code}; will not be retried.`
`Subscriber stream ${index} has ended with status ${status.code}; will not be retried.`,
statusError
)
);

Expand Down
2 changes: 2 additions & 0 deletions src/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import * as tracing from './telemetry-tracing';
import {Duration} from './temporal';
import {EventEmitter} from 'events';

export {StatusError} from './message-stream';

export type PullResponse = google.pubsub.v1.IStreamingPullResponse;
export type SubscriptionProperties =
google.pubsub.v1.StreamingPullResponse.ISubscriptionProperties;
Expand Down
Loading