diff --git a/samples/listenForMessages.js b/samples/listenForMessages.js index 3f3f35547..ebfdbbeb7 100644 --- a/samples/listenForMessages.js +++ b/samples/listenForMessages.js @@ -44,7 +44,8 @@ const {PubSub} = require('@google-cloud/pubsub'); const pubSubClient = new PubSub(); function listenForMessages(subscriptionNameOrId, timeout) { - // References an existing subscription + // References an existing subscription; if you are unsure if the + // subscription will exist, try the optimisticSubscribe sample. const subscription = pubSubClient.subscription(subscriptionNameOrId); // Create an event handler to handle messages diff --git a/samples/optimisticSubscribe.js b/samples/optimisticSubscribe.js new file mode 100644 index 000000000..3d2c53272 --- /dev/null +++ b/samples/optimisticSubscribe.js @@ -0,0 +1,101 @@ +// Copyright 2019-2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// This is a generated sample, using the typeless sample bot. Please +// look for the source TypeScript sample (.ts) for modifications. +'use strict'; + +/** + * This sample demonstrates how to perform basic operations on + * subscriptions with the Google Cloud Pub/Sub API. + * + * For more information, see the README.md under /pubsub and the documentation + * at https://cloud.google.com/pubsub/docs. + */ + +// sample-metadata: +// title: Optimistic Subscribe +// description: Listens for messages from a subscription, creating it if needed. +// usage: node optimisticSubscribe.js [timeout-in-seconds] + +// [START pubsub_optimistic_subscribe] +/** + * TODO(developer): Uncomment these variables before running the sample. + */ +// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID'; +// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID'; +// const timeout = 60; + +// Imports the Google Cloud client library +const {PubSub} = require('@google-cloud/pubsub'); + +// Creates a client; cache this for further use +const pubSubClient = new PubSub(); + +function optimisticSubscribe(subscriptionNameOrId, topicNameOrId, timeout) { + // Try using an existing subscription + let subscription = pubSubClient.subscription(subscriptionNameOrId); + + // Create an event handler to handle messages + let messageCount = 0; + const messageHandler = message => { + console.log(`Received message ${message.id}:`); + console.log(`\tData: ${message.data}`); + console.log(`\tAttributes: ${message.attributes}`); + messageCount += 1; + + // "Ack" (acknowledge receipt of) the message + message.ack(); + }; + + // Set an error handler so that we're notified if the subscription doesn't + // already exist. + subscription.on('error', async e => { + // Resource Not Found + if (e.code === 5) { + console.log('Subscription not found, creating it'); + await pubSubClient.createSubscription( + topicNameOrId, + subscriptionNameOrId + ); + + // Refresh our subscriber object and re-attach the message handler. + subscription = pubSubClient.subscription(subscriptionNameOrId); + subscription.on('message', messageHandler); + } + }); + + // Listen for new messages until timeout is hit; this will attempt to + // open the actual subscriber streams. If it fails, the error handler + // above will be called. + subscription.on('message', messageHandler); + + // Wait a while for the subscription to run. (Part of the sample only.) + setTimeout(() => { + subscription.removeListener('message', messageHandler); + console.log(`${messageCount} message(s) received.`); + }, timeout * 1000); +} +// [END pubsub_optimistic_subscribe] + +function main( + subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID', + topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID', + timeout = 60 +) { + timeout = Number(timeout); + optimisticSubscribe(subscriptionNameOrId, topicNameOrId, timeout); +} + +main(...process.argv.slice(2)); diff --git a/samples/typescript/listenForMessages.ts b/samples/typescript/listenForMessages.ts index 72cb468df..582e04649 100644 --- a/samples/typescript/listenForMessages.ts +++ b/samples/typescript/listenForMessages.ts @@ -40,7 +40,8 @@ import {PubSub, Message} from '@google-cloud/pubsub'; const pubSubClient = new PubSub(); function listenForMessages(subscriptionNameOrId: string, timeout: number) { - // References an existing subscription + // References an existing subscription; if you are unsure if the + // subscription will exist, try the optimisticSubscribe sample. const subscription = pubSubClient.subscription(subscriptionNameOrId); // Create an event handler to handle messages diff --git a/samples/typescript/optimisticSubscribe.ts b/samples/typescript/optimisticSubscribe.ts new file mode 100644 index 000000000..215249311 --- /dev/null +++ b/samples/typescript/optimisticSubscribe.ts @@ -0,0 +1,101 @@ +// Copyright 2019-2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * This sample demonstrates how to perform basic operations on + * subscriptions with the Google Cloud Pub/Sub API. + * + * For more information, see the README.md under /pubsub and the documentation + * at https://cloud.google.com/pubsub/docs. + */ + +// sample-metadata: +// title: Optimistic Subscribe +// description: Listens for messages from a subscription, creating it if needed. +// usage: node optimisticSubscribe.js [timeout-in-seconds] + +// [START pubsub_optimistic_subscribe] +/** + * TODO(developer): Uncomment these variables before running the sample. + */ +// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID'; +// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID'; +// const timeout = 60; + +// Imports the Google Cloud client library +import {PubSub, Message, StatusError} from '@google-cloud/pubsub'; + +// Creates a client; cache this for further use +const pubSubClient = new PubSub(); + +function optimisticSubscribe( + subscriptionNameOrId: string, + topicNameOrId: string, + timeout: number +) { + // Try using an existing subscription + let subscription = pubSubClient.subscription(subscriptionNameOrId); + + // Create an event handler to handle messages + let messageCount = 0; + const messageHandler = (message: Message) => { + console.log(`Received message ${message.id}:`); + console.log(`\tData: ${message.data}`); + console.log(`\tAttributes: ${message.attributes}`); + messageCount += 1; + + // "Ack" (acknowledge receipt of) the message + message.ack(); + }; + + // Set an error handler so that we're notified if the subscription doesn't + // already exist. + subscription.on('error', async (e: StatusError) => { + // Resource Not Found + if (e.code === 5) { + console.log('Subscription not found, creating it'); + await pubSubClient.createSubscription( + topicNameOrId, + subscriptionNameOrId + ); + + // Refresh our subscriber object and re-attach the message handler. + subscription = pubSubClient.subscription(subscriptionNameOrId); + subscription.on('message', messageHandler); + } + }); + + // Listen for new messages until timeout is hit; this will attempt to + // open the actual subscriber streams. If it fails, the error handler + // above will be called. + subscription.on('message', messageHandler); + + // Wait a while for the subscription to run. (Part of the sample only.) + setTimeout(() => { + subscription.removeListener('message', messageHandler); + console.log(`${messageCount} message(s) received.`); + }, timeout * 1000); +} +// [END pubsub_optimistic_subscribe] + +function main( + subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID', + topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID', + timeout = 60 +) { + timeout = Number(timeout); + optimisticSubscribe(subscriptionNameOrId, topicNameOrId, timeout); +} + +main(...process.argv.slice(2)); diff --git a/src/debug.ts b/src/debug.ts index b7856dd44..807413ea1 100644 --- a/src/debug.ts +++ b/src/debug.ts @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +import {grpc} from 'google-gax'; + /** * Represents a debug message the user might want to print out for logging * while debugging or whatnot. These will always come by way of the 'error' @@ -31,6 +33,7 @@ export class DebugMessage { constructor( public message: string, - public error?: Error + public error?: Error, + public status?: grpc.StatusObject ) {} } diff --git a/src/index.ts b/src/index.ts index abc8afb08..15b5a1d32 100644 --- a/src/index.ts +++ b/src/index.ts @@ -124,7 +124,7 @@ export { SeekResponse, Snapshot, } from './snapshot'; -export {Message, SubscriberOptions} from './subscriber'; +export {Message, StatusError, SubscriberOptions} from './subscriber'; export { Schema, CreateSchemaResponse, diff --git a/src/message-stream.ts b/src/message-stream.ts index 80b714496..2d9ebc760 100644 --- a/src/message-stream.ts +++ b/src/message-stream.ts @@ -388,7 +388,9 @@ export class MessageStream extends PassThrough { this.emit( 'debug', new DebugMessage( - `Subscriber stream ${index} has ended with status ${status.code}; will be retried.` + `Subscriber stream ${index} has ended with status ${status.code}; will be retried.`, + undefined, + status ) ); if (PullRetry.resetFailures(status)) { @@ -401,7 +403,9 @@ export class MessageStream extends PassThrough { this.emit( 'debug', new DebugMessage( - `Subscriber stream ${index} has ended with status ${status.code}; will not be retried.` + `Subscriber stream ${index} has ended with status ${status.code}; will not be retried.`, + undefined, + status ) ); diff --git a/src/subscriber.ts b/src/subscriber.ts index fd3dc30a4..c20b63be6 100644 --- a/src/subscriber.ts +++ b/src/subscriber.ts @@ -30,6 +30,8 @@ import * as tracing from './telemetry-tracing'; import {Duration} from './temporal'; import {EventEmitter} from 'events'; +export {StatusError} from './message-stream'; + export type PullResponse = google.pubsub.v1.IStreamingPullResponse; export type SubscriptionProperties = google.pubsub.v1.StreamingPullResponse.ISubscriptionProperties;