Skip to content

Commit

Permalink
samples: create BigQuery subscription (#722)
Browse files Browse the repository at this point in the history
* chore: Remove notes about ordering keys being experimental.

* Revert "chore: Remove notes about ordering keys being experimental."

This reverts commit 38b2a3e.

* feat: Add support for server-side flow control

* Add unit test for flow control

* samples: create BigQuery subscription

* samples: update BigQuery subscription test

* Fix linter error for PR 722

* samples: create BigQuery subscription fix unused variable

Co-authored-by: Anna Cocuzzo <[email protected]>
  • Loading branch information
kamalaboulhosn and acocuzzo authored Jul 7, 2022
1 parent f14930c commit 7d31d1d
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 3 deletions.
3 changes: 2 additions & 1 deletion samples/snippets/requirements-test.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
backoff==2.0.1
pytest==7.1.2
mock==4.0.3
flaky==3.7.0
flaky==3.7.0
google-cloud-bigquery==1.28.0
2 changes: 1 addition & 1 deletion samples/snippets/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
google-cloud-pubsub==2.12.1
google-cloud-pubsub==2.13.0
avro==1.11.0
51 changes: 51 additions & 0 deletions samples/snippets/subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,42 @@ def create_subscription_with_exactly_once_delivery(
# [END pubsub_create_subscription_with_exactly_once_delivery]


def create_bigquery_subscription(
project_id: str, topic_id: str, subscription_id: str, bigquery_table_id: str
) -> None:
"""Create a new BigQuery subscription on the given topic."""
# [START pubsub_create_bigquery_subscription]
from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"
# bigquery_table_id = "your-project.your-dataset.your-table"

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

bigquery_config = pubsub_v1.types.BigQueryConfig(table=bigquery_table_id, write_metadata=True)

# Wrap the subscriber in a 'with' block to automatically call close() to
# close the underlying gRPC channel when done.
with subscriber:
subscription = subscriber.create_subscription(
request={
"name": subscription_path,
"topic": topic_path,
"bigquery_config": bigquery_config,
}
)

print(f"BigQuery subscription created: {subscription}.")
print(f"Table for subscription is: {bigquery_table_id}")
# [END pubsub_create_bigquery_subscription]


def delete_subscription(project_id: str, subscription_id: str) -> None:
"""Deletes an existing Pub/Sub topic."""
# [START pubsub_delete_subscription]
Expand Down Expand Up @@ -922,6 +958,14 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None:
"subscription_id"
)

create_bigquery_subscription_parser = subparsers.add_parser(
"create-biquery",
help=create_bigquery_subscription.__doc__,
)
create_bigquery_subscription_parser.add_argument("topic_id")
create_bigquery_subscription_parser.add_argument("subscription_id")
create_bigquery_subscription_parser.add_argument("bigquery_table_id")

delete_parser = subparsers.add_parser("delete", help=delete_subscription.__doc__)
delete_parser.add_argument("subscription_id")

Expand Down Expand Up @@ -1050,6 +1094,13 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None:
create_subscription_with_exactly_once_delivery(
args.project_id, args.topic_id, args.subscription_id
)
elif args.command == "create-bigquery":
create_bigquery_subscription(
args.project_id,
args.topic_id,
args.subscription_id,
args.bigquery_table_id,
)
elif args.command == "delete":
delete_subscription(args.project_id, args.subscription_id)
elif args.command == "update-push":
Expand Down
60 changes: 59 additions & 1 deletion samples/snippets/subscriber_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,15 @@
import backoff
from flaky import flaky
from google.api_core.exceptions import NotFound
from google.cloud import pubsub_v1
from google.cloud import bigquery, pubsub_v1
import pytest

import subscriber

# This uuid is shared across tests which run in parallel.
UUID = uuid.uuid4().hex
PY_VERSION = f"{sys.version_info.major}.{sys.version_info.minor}"
UNDERSCORE_PY_VERSION = PY_VERSION.replace(".", "_")
PROJECT_ID = os.environ["GOOGLE_CLOUD_PROJECT"]
TOPIC = f"subscription-test-topic-{PY_VERSION}-{UUID}"
DEAD_LETTER_TOPIC = f"subscription-test-dead-letter-topic-{PY_VERSION}-{UUID}"
Expand All @@ -42,6 +43,8 @@
DEFAULT_MAX_DELIVERY_ATTEMPTS = 5
UPDATED_MAX_DELIVERY_ATTEMPTS = 20
FILTER = 'attributes.author="unknown"'
BIGQUERY_DATASET_ID = f"python_samples_dataset_{UNDERSCORE_PY_VERSION}_{UUID}"
BIGQUERY_TABLE_ID = f"python_samples_table_{UNDERSCORE_PY_VERSION}_{UUID}"

C = TypeVar("C", bound=Callable[..., Any])

Expand Down Expand Up @@ -545,6 +548,61 @@ def test_update_push_subscription(
subscriber_client.delete_subscription(request={"subscription": subscription_path})


@pytest.fixture(scope="module")
def bigquery_table() -> Generator[str, None, None]:
client = bigquery.Client()
dataset = bigquery.Dataset(f"{PROJECT_ID}.{BIGQUERY_DATASET_ID}")
dataset.location = "US"
dataset = client.create_dataset(dataset)

table_id = f"{PROJECT_ID}.{BIGQUERY_DATASET_ID}.{BIGQUERY_TABLE_ID}"
schema = [
bigquery.SchemaField("data", "STRING", mode="REQUIRED"),
bigquery.SchemaField("message_id", "STRING", mode="REQUIRED"),
bigquery.SchemaField("attributes", "STRING", mode="REQUIRED"),
bigquery.SchemaField("subscription_name", "STRING", mode="REQUIRED"),
bigquery.SchemaField("publish_time", "TIMESTAMP", mode="REQUIRED"),
]

table = bigquery.Table(table_id, schema=schema)
table = client.create_table(table)

yield table_id

client.delete_dataset(dataset, delete_contents=True)


def test_create_bigquery_subscription(
subscriber_client: pubsub_v1.SubscriberClient,
topic: str,
bigquery_table: str,
capsys: CaptureFixture[str],
) -> None:
bigquery_subscription_for_create_name = (
f"subscription-test-subscription-bigquery-for-create-{PY_VERSION}-{UUID}"
)

subscription_path = subscriber_client.subscription_path(
PROJECT_ID, bigquery_subscription_for_create_name
)
try:
subscriber_client.delete_subscription(
request={"subscription": subscription_path}
)
except NotFound:
pass

subscriber.create_bigquery_subscription(
PROJECT_ID, TOPIC, bigquery_subscription_for_create_name, bigquery_table
)

out, _ = capsys.readouterr()
assert f"{bigquery_subscription_for_create_name}" in out

# Clean up.
subscriber_client.delete_subscription(request={"subscription": subscription_path})


def test_delete_subscription(
subscriber_client: pubsub_v1.SubscriberClient,
topic: str,
Expand Down

0 comments on commit 7d31d1d

Please sign in to comment.