Skip to content

Commit

Permalink
samples: add optimisticSubscribe, plus a few small library changes to…
Browse files Browse the repository at this point in the history
… support it
  • Loading branch information
feywind committed Sep 17, 2024
1 parent c3abf92 commit 2f3919d
Show file tree
Hide file tree
Showing 8 changed files with 219 additions and 6 deletions.
3 changes: 2 additions & 1 deletion samples/listenForMessages.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ const {PubSub} = require('@google-cloud/pubsub');
const pubSubClient = new PubSub();

function listenForMessages(subscriptionNameOrId, timeout) {
// References an existing subscription
// References an existing subscription; if you are unsure if the
// subscription will exist, try the optimisticSubscribe sample.
const subscription = pubSubClient.subscription(subscriptionNameOrId);

// Create an event handler to handle messages
Expand Down
101 changes: 101 additions & 0 deletions samples/optimisticSubscribe.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright 2019-2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// This is a generated sample, using the typeless sample bot. Please
// look for the source TypeScript sample (.ts) for modifications.
'use strict';

/**
* This sample demonstrates how to perform basic operations on
* subscriptions with the Google Cloud Pub/Sub API.
*
* For more information, see the README.md under /pubsub and the documentation
* at https://cloud.google.com/pubsub/docs.
*/

// sample-metadata:
// title: Optimistic Subscribe
// description: Listens for messages from a subscription, creating it if needed.
// usage: node optimisticSubscribe.js <subscription-name-or-id> <topic-name-or-id> [timeout-in-seconds]

// [START pubsub_optimistic_subscribe]
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const timeout = 60;

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

function optimisticSubscribe(subscriptionNameOrId, topicNameOrId, timeout) {
// Try using an existing subscription
let subscription = pubSubClient.subscription(subscriptionNameOrId);

// Create an event handler to handle messages
let messageCount = 0;
const messageHandler = message => {
console.log(`Received message ${message.id}:`);
console.log(`\tData: ${message.data}`);
console.log(`\tAttributes: ${message.attributes}`);
messageCount += 1;

// "Ack" (acknowledge receipt of) the message
message.ack();
};

// Set an error handler so that we're notified if the subscription doesn't
// already exist.
subscription.on('error', async e => {
// Resource Not Found
if (e.code === 5) {
console.log('Subscription not found, creating it');
await pubSubClient.createSubscription(
topicNameOrId,
subscriptionNameOrId
);

// Refresh our subscriber object and re-attach the message handler.
subscription = pubSubClient.subscription(subscriptionNameOrId);
subscription.on('message', messageHandler);
}
});

// Listen for new messages until timeout is hit; this will attempt to
// open the actual subscriber streams. If it fails, the error handler
// above will be called.
subscription.on('message', messageHandler);

// Wait a while for the subscription to run. (Part of the sample only.)
setTimeout(() => {
subscription.removeListener('message', messageHandler);
console.log(`${messageCount} message(s) received.`);
}, timeout * 1000);
}
// [END pubsub_optimistic_subscribe]

function main(
subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID',
topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID',
timeout = 60
) {
timeout = Number(timeout);
optimisticSubscribe(subscriptionNameOrId, topicNameOrId, timeout);
}

main(...process.argv.slice(2));
3 changes: 2 additions & 1 deletion samples/typescript/listenForMessages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ import {PubSub, Message} from '@google-cloud/pubsub';
const pubSubClient = new PubSub();

function listenForMessages(subscriptionNameOrId: string, timeout: number) {
// References an existing subscription
// References an existing subscription; if you are unsure if the
// subscription will exist, try the optimisticSubscribe sample.
const subscription = pubSubClient.subscription(subscriptionNameOrId);

// Create an event handler to handle messages
Expand Down
101 changes: 101 additions & 0 deletions samples/typescript/optimisticSubscribe.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright 2019-2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/**
* This sample demonstrates how to perform basic operations on
* subscriptions with the Google Cloud Pub/Sub API.
*
* For more information, see the README.md under /pubsub and the documentation
* at https://cloud.google.com/pubsub/docs.
*/

// sample-metadata:
// title: Optimistic Subscribe
// description: Listens for messages from a subscription, creating it if needed.
// usage: node optimisticSubscribe.js <subscription-name-or-id> <topic-name-or-id> [timeout-in-seconds]

// [START pubsub_optimistic_subscribe]
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const timeout = 60;

// Imports the Google Cloud client library
import {PubSub, Message, StatusError} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

function optimisticSubscribe(
subscriptionNameOrId: string,
topicNameOrId: string,
timeout: number
) {
// Try using an existing subscription
let subscription = pubSubClient.subscription(subscriptionNameOrId);

// Create an event handler to handle messages
let messageCount = 0;
const messageHandler = (message: Message) => {
console.log(`Received message ${message.id}:`);
console.log(`\tData: ${message.data}`);
console.log(`\tAttributes: ${message.attributes}`);
messageCount += 1;

// "Ack" (acknowledge receipt of) the message
message.ack();
};

// Set an error handler so that we're notified if the subscription doesn't
// already exist.
subscription.on('error', async (e: StatusError) => {
// Resource Not Found
if (e.code === 5) {
console.log('Subscription not found, creating it');
await pubSubClient.createSubscription(
topicNameOrId,
subscriptionNameOrId
);

// Refresh our subscriber object and re-attach the message handler.
subscription = pubSubClient.subscription(subscriptionNameOrId);
subscription.on('message', messageHandler);
}
});

// Listen for new messages until timeout is hit; this will attempt to
// open the actual subscriber streams. If it fails, the error handler
// above will be called.
subscription.on('message', messageHandler);

// Wait a while for the subscription to run. (Part of the sample only.)
setTimeout(() => {
subscription.removeListener('message', messageHandler);
console.log(`${messageCount} message(s) received.`);
}, timeout * 1000);
}
// [END pubsub_optimistic_subscribe]

function main(
subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID',
topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID',
timeout = 60
) {
timeout = Number(timeout);
optimisticSubscribe(subscriptionNameOrId, topicNameOrId, timeout);
}

main(...process.argv.slice(2));
5 changes: 4 additions & 1 deletion src/debug.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

import {grpc} from 'google-gax';

/**
* Represents a debug message the user might want to print out for logging
* while debugging or whatnot. These will always come by way of the 'error'
Expand All @@ -31,6 +33,7 @@
export class DebugMessage {
constructor(
public message: string,
public error?: Error
public error?: Error,
public status?: grpc.StatusObject
) {}
}
2 changes: 1 addition & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ export {
SeekResponse,
Snapshot,
} from './snapshot';
export {Message, SubscriberOptions} from './subscriber';
export {Message, StatusError, SubscriberOptions} from './subscriber';
export {
Schema,
CreateSchemaResponse,
Expand Down
8 changes: 6 additions & 2 deletions src/message-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,9 @@ export class MessageStream extends PassThrough {
this.emit(
'debug',
new DebugMessage(
`Subscriber stream ${index} has ended with status ${status.code}; will be retried.`
`Subscriber stream ${index} has ended with status ${status.code}; will be retried.`,
undefined,
status
)
);
if (PullRetry.resetFailures(status)) {
Expand All @@ -401,7 +403,9 @@ export class MessageStream extends PassThrough {
this.emit(
'debug',
new DebugMessage(
`Subscriber stream ${index} has ended with status ${status.code}; will not be retried.`
`Subscriber stream ${index} has ended with status ${status.code}; will not be retried.`,
undefined,
status
)
);

Expand Down
2 changes: 2 additions & 0 deletions src/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import * as tracing from './telemetry-tracing';
import {Duration} from './temporal';
import {EventEmitter} from 'events';

export {StatusError} from './message-stream';

export type PullResponse = google.pubsub.v1.IStreamingPullResponse;
export type SubscriptionProperties =
google.pubsub.v1.StreamingPullResponse.ISubscriptionProperties;
Expand Down

0 comments on commit 2f3919d

Please sign in to comment.