Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

samples: add samples for Cloud Storage ingestion, and a few small fixes #1985

Merged
merged 8 commits into from
Oct 15, 2024
17 changes: 9 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,15 @@ npm install @google-cloud/pubsub

```javascript
// Imports the Google Cloud client library
const { PubSub } = require("@google-cloud/pubsub");
const {PubSub} = require('@google-cloud/pubsub');

async function quickstart(
projectId = 'your-project-id', // Your Google Cloud Platform project ID
topicNameOrId = 'my-topic', // Name for the new topic to create
subscriptionName = 'my-sub' // Name for the new subscription to create
projectId = 'your-project-id', // Your Google Cloud Platform project ID
topicNameOrId = 'my-topic', // Name for the new topic to create
subscriptionName = 'my-sub' // Name for the new subscription to create
) {
// Instantiates a client
const pubsub = new PubSub({ projectId });
const pubsub = new PubSub({projectId});

// Creates a new topic
const [topic] = await pubsub.createTopic(topicNameOrId);
Expand All @@ -84,19 +84,19 @@ subscriptionName = 'my-sub' // Name for the new subscription to create
const [subscription] = await topic.createSubscription(subscriptionName);

// Receive callbacks for new messages on the subscription
subscription.on('message', (message) => {
subscription.on('message', message => {
console.log('Received message:', message.data.toString());
process.exit(0);
});

// Receive callbacks for errors on the subscription
subscription.on('error', (error) => {
subscription.on('error', error => {
console.error('Received error:', error);
process.exit(1);
});

// Send a message to the topic
topic.publishMessage({ data: Buffer.from('Test message!') });
topic.publishMessage({data: Buffer.from('Test message!')});
}

```
Expand Down Expand Up @@ -138,6 +138,7 @@ Samples are in the [`samples/`](https://github.com/googleapis/nodejs-pubsub/tree
| Create Subscription with ordering enabled | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createSubscriptionWithOrdering.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createSubscriptionWithOrdering.js,samples/README.md) |
| Create Subscription With Retry Policy | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createSubscriptionWithRetryPolicy.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createSubscriptionWithRetryPolicy.js,samples/README.md) |
| Create Topic | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createTopic.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createTopic.js,samples/README.md) |
| Create Topic With Cloud Storage Ingestion | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createTopicWithCloudStorageIngestion.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createTopicWithCloudStorageIngestion.js,samples/README.md) |
| Create Topic With Kinesis Ingestion | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createTopicWithKinesisIngestion.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createTopicWithKinesisIngestion.js,samples/README.md) |
| Create Topic With Schema | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createTopicWithSchema.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createTopicWithSchema.js,samples/README.md) |
| Create Topic With Schema Revisions | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createTopicWithSchemaRevisions.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createTopicWithSchemaRevisions.js,samples/README.md) |
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
"prelint": "cd samples; npm link ../; npm install",
"precompile": "gts clean",
"typeless": "npx typeless-sample-bot --outputpath samples --targets samples --recursive",
"posttypeless": "cd samples && npm i && cd .. && npx eslint --ignore-pattern owl-bot-staging --fix"
"posttypeless": "cd samples && npm i && cd .. && npx eslint --ignore-pattern owl-bot-staging --fix samples"
},
"dependencies": {
"@google-cloud/paginator": "^5.0.0",
Expand Down
20 changes: 20 additions & 0 deletions samples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ guides.
* [Create Subscription with ordering enabled](#create-subscription-with-ordering-enabled)
* [Create Subscription With Retry Policy](#create-subscription-with-retry-policy)
* [Create Topic](#create-topic)
* [Create Topic With Cloud Storage Ingestion](#create-topic-with-cloud-storage-ingestion)
* [Create Topic With Kinesis Ingestion](#create-topic-with-kinesis-ingestion)
* [Create Topic With Schema](#create-topic-with-schema)
* [Create Topic With Schema Revisions](#create-topic-with-schema-revisions)
Expand Down Expand Up @@ -389,6 +390,25 @@ __Usage:__



### Create Topic With Cloud Storage Ingestion

Creates a new topic, with Cloud Storage ingestion enabled.

View the [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createTopicWithCloudStorageIngestion.js).

[![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createTopicWithCloudStorageIngestion.js,samples/README.md)

__Usage:__


`node createTopicWithCloudStorageIngestion.js <topic-name> <bucket> <input-format> <text-delimiter> <match-glob> <minimum-object-creation-time>`


-----




### Create Topic With Kinesis Ingestion

Creates a new topic, with Kinesis ingestion enabled.
Expand Down
118 changes: 118 additions & 0 deletions samples/createTopicWithCloudStorageIngestion.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// This is a generated sample, using the typeless sample bot. Please
// look for the source TypeScript sample (.ts) for modifications.
'use strict';

/**
* This sample demonstrates how to perform basic operations on topics with
* the Google Cloud Pub/Sub API.
*
* For more information, see the README.md under /pubsub and the documentation
* at https://cloud.google.com/pubsub/docs.
*/

// sample-metadata:
// title: Create Topic With Cloud Storage Ingestion
// description: Creates a new topic, with Cloud Storage ingestion enabled.
// usage: node createTopicWithCloudStorageIngestion.js <topic-name> <bucket> <input-format> <text-delimiter> <match-glob> <minimum-object-creation-time>

// [START pubsub_create_topic_with_cloud_storage_ingestion]
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const bucket = 'YOUR_BUCKET_NAME';
// const inputFormat = 'text';
// const textDelimiter = '\n';
// const matchGlob = '**.txt';
// const minimumObjectCreateTime = 'YYYY-MM-DDThh:mm:ssZ;

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithKinesisIngestion(
michaelpri10 marked this conversation as resolved.
Show resolved Hide resolved
topicNameOrId,
bucket,
inputFormat,
textDelimiter,
matchGlob,
minimumObjectCreateTime
) {
const minimumDate = Date.parse(minimumObjectCreateTime);
const topicMetadata = {
name: topicNameOrId,
ingestionDataSourceSettings: {
cloudStorage: {
bucket,
minimumObjectCreateTime: {
seconds: minimumDate / 1000,
nanos: (minimumDate % 1000) * 1000,
},
matchGlob,
},
},
};

// Make a format appropriately.
switch (inputFormat) {
case 'text':
topicMetadata.ingestionDataSourceSettings.cloudStorage.textFormat = {
delimiter: textDelimiter,
};
break;
case 'avro':
topicMetadata.ingestionDataSourceSettings.cloudStorage.avroFormat = {};
break;
case 'pubsub_avro':
topicMetadata.ingestionDataSourceSettings.cloudStorage.pubsubAvroFormat =
{};
break;
default:
console.error('inputFormat must be in ("text", "avro", "pubsub_avro")');
return;
}

// Creates a new topic with Cloud Storage ingestion.
await pubSubClient.createTopic(topicMetadata);
console.log(`Topic ${topicNameOrId} created with Cloud Storage ingestion.`);
}
// [END pubsub_create_topic_with_cloud_storage_ingestion]

function main(
topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID',
bucket = 'YOUR_BUCKET_NAME',
inputFormat = 'text',
textDelimiter = '\n',
matchGlob = '**.txt',
minimumObjectCreateTime = 'YYYY-MM-DDThh:mm:ssZ'
) {
createTopicWithKinesisIngestion(
topicNameOrId,
bucket,
inputFormat,
textDelimiter,
matchGlob,
minimumObjectCreateTime
).catch(err => {
console.error(err.message);
process.exitCode = 1;
});
}

main(...process.argv.slice(2));
3 changes: 1 addition & 2 deletions samples/createTopicWithKinesisIngestion.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ async function createTopicWithKinesisIngestion(
streamArn,
consumerArn
) {
// Creates a new topic with a schema. Note that you might also
// pass Encodings.Json or Encodings.Binary here.
// Creates a new topic with Kinesis ingestion.
await pubSubClient.createTopic({
name: topicNameOrId,
ingestionDataSourceSettings: {
Expand Down
33 changes: 33 additions & 0 deletions samples/system-test/topics.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

import {Message, PubSub, Topic, Subscription} from '@google-cloud/pubsub';
import {Bucket, Storage} from '@google-cloud/storage';
import {assert} from 'chai';
import {describe, it, after} from 'mocha';
import {execSync, commandFor} from './common';
Expand Down Expand Up @@ -52,6 +53,17 @@ describe('topics', () => {
return {t: topic, tname, s: sub};
}

async function createStorageBucket(testName: string): Promise<Bucket> {
const storage = new Storage({
projectId,
});

const name = resources.generateStorageName(testName);

const [bucket] = await storage.createBucket(name);
return bucket;
}

async function cleanSubs() {
const [subscriptions] = await pubsub.getSubscriptions();
await Promise.all(
Expand Down Expand Up @@ -121,6 +133,27 @@ describe('topics', () => {
assert.ok(exists, 'Topic was created');
});

it('should create a topic with cloud storage ingestion', async () => {
const testId = 'create-gcs-ingestion';
const name = topicName(testId);
const bucket = await createStorageBucket(testId);
const bucketName = bucket.name;

try {
const output = execSync(
`${commandFor('createTopicWithCloudStorageIngestion')} ${name} ${
bucketName
} text "\n" "**.txt' 2024-10-10T00:00:00Z`
);
assert.include(output, `Topic ${name} created with Cloud Storage ingestion.`);
const [topics] = await pubsub.getTopics();
const exists = topics.some(t => t.name === fullTopicName(name));
assert.ok(exists, 'Topic was created');
} finally {
await bucket.delete();
}
});

it('should update a topic with kinesis integration', async () => {
const pair = await createPair('update-kinesis');
const output = execSync(
Expand Down
114 changes: 114 additions & 0 deletions samples/typescript/createTopicWithCloudStorageIngestion.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/**
* This sample demonstrates how to perform basic operations on topics with
* the Google Cloud Pub/Sub API.
*
* For more information, see the README.md under /pubsub and the documentation
* at https://cloud.google.com/pubsub/docs.
*/

// sample-metadata:
// title: Create Topic With Cloud Storage Ingestion
// description: Creates a new topic, with Cloud Storage ingestion enabled.
// usage: node createTopicWithCloudStorageIngestion.js <topic-name> <bucket> <input-format> <text-delimiter> <match-glob> <minimum-object-creation-time>

// [START pubsub_create_topic_with_cloud_storage_ingestion]
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const bucket = 'YOUR_BUCKET_NAME';
// const inputFormat = 'text';
// const textDelimiter = '\n';
// const matchGlob = '**.txt';
// const minimumObjectCreateTime = 'YYYY-MM-DDThh:mm:ssZ;

// Imports the Google Cloud client library
import {PubSub, TopicMetadata} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithKinesisIngestion(
michaelpri10 marked this conversation as resolved.
Show resolved Hide resolved
topicNameOrId: string,
bucket: string,
inputFormat: string,
textDelimiter: string,
matchGlob: string,
minimumObjectCreateTime: string
) {
const minimumDate = Date.parse(minimumObjectCreateTime);
const topicMetadata: TopicMetadata = {
name: topicNameOrId,
ingestionDataSourceSettings: {
cloudStorage: {
bucket,
minimumObjectCreateTime: {
seconds: minimumDate / 1000,
nanos: (minimumDate % 1000) * 1000,
},
matchGlob,
},
},
};

// Make a format appropriately.
switch (inputFormat) {
case 'text':
topicMetadata.ingestionDataSourceSettings!.cloudStorage!.textFormat = {
delimiter: textDelimiter,
};
break;
case 'avro':
topicMetadata.ingestionDataSourceSettings!.cloudStorage!.avroFormat = {};
break;
case 'pubsub_avro':
topicMetadata.ingestionDataSourceSettings!.cloudStorage!.pubsubAvroFormat =
{};
break;
default:
console.error('inputFormat must be in ("text", "avro", "pubsub_avro")');
return;
}

// Creates a new topic with Cloud Storage ingestion.
await pubSubClient.createTopic(topicMetadata);
console.log(`Topic ${topicNameOrId} created with Cloud Storage ingestion.`);
}
// [END pubsub_create_topic_with_cloud_storage_ingestion]

function main(
topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID',
bucket = 'YOUR_BUCKET_NAME',
inputFormat = 'text',
textDelimiter = '\n',
matchGlob = '**.txt',
minimumObjectCreateTime = 'YYYY-MM-DDThh:mm:ssZ'
) {
createTopicWithKinesisIngestion(
topicNameOrId,
bucket,
inputFormat,
textDelimiter,
matchGlob,
minimumObjectCreateTime
).catch(err => {
console.error(err.message);
process.exitCode = 1;
});
}

main(...process.argv.slice(2));
Loading