Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

process: add mypy session for samples #551

Merged
merged 11 commits into from
Jan 17, 2022
23 changes: 22 additions & 1 deletion google/cloud/pubsub_v1/publisher/futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,14 @@

from __future__ import absolute_import

from typing import Union
import typing
from typing import Any, Callable, Union

from google.cloud.pubsub_v1 import futures

if typing.TYPE_CHECKING: # pragma: NO COVER
from google.cloud import pubsub_v1


class Future(futures.Future):
"""This future object is returned from asychronous Pub/Sub publishing
Expand Down Expand Up @@ -60,3 +64,20 @@ def result(self, timeout: Union[int, float] = None) -> str:
call execution.
"""
return super().result(timeout=timeout)

# This exists to make the type checkers happy.
def add_done_callback(
self, callback: Callable[["pubsub_v1.publisher.futures.Future"], Any]
) -> None:
"""Attache a callable that will be called when the future finishes.
tswast marked this conversation as resolved.
Show resolved Hide resolved

Args:
callback:
A callable that will be called with this future as its only
argument when the future completes or is cancelled. The callable
will always be called by a thread in the same process in which
it was added. If the future has already completed or been
cancelled then the callable will be called immediately. These
callables are called in the order that they were added.
"""
return super().add_done_callback(callback) # type: ignore
12 changes: 10 additions & 2 deletions google/cloud/pubsub_v1/subscriber/futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,27 @@ def _on_close_callback(self, manager: "StreamingPullManager", result: Any):
else:
self.set_exception(result)

def cancel(self):
def cancel(self) -> bool:
"""Stops pulling messages and shutdowns the background thread consuming
messages.

The method always returns ``True``, as the shutdown is always initiated.
However, if the background stream is already being shut down or the shutdown
has completed, this method is a no-op.

.. versionchanged:: 2.4.1
The method does not block anymore, it just triggers the shutdown and returns
immediately. To block until the background stream is terminated, call
:meth:`result()` after cancelling the future.

.. versionchanged:: 2.10.0
The method always returns ``True`` instead of ``None``.
"""
# NOTE: We circumvent the base future's self._state to track the cancellation
# state, as this state has different meaning with streaming pull futures.
self.__cancelled = True
return self.__manager.close()
self.__manager.close()
return True

def cancelled(self) -> bool:
"""
Expand Down
29 changes: 29 additions & 0 deletions noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
"blacken",
"mypy",
"pytype",
# "mypy_samples", # TODO: uncomment when the checks pass
plamut marked this conversation as resolved.
Show resolved Hide resolved
"docs",
]

Expand All @@ -64,6 +65,12 @@ def mypy(session):
# require an additional pass.
session.install("types-protobuf", "types-setuptools")

# Version 2.1.1 of google-api-core version is the first type-checked release.
# Version 2.2.0 of google-cloud-core version is the first type-checked release.
session.install(
"google-api-core[grpc]>=2.1.1", "google-cloud-core>=2.2.0",
)

# TODO: Only check the hand-written layer, the generated code does not pass
# mypy checks yet.
# https://github.com/googleapis/gapic-generator-python/issues/1092
Expand All @@ -78,6 +85,28 @@ def pytype(session):
session.run("pytype")


@nox.session(python=DEFAULT_PYTHON_VERSION)
def mypy_samples(session):
"""Run type checks with mypy."""

session.install("-e", ".[all]")

session.install("pytest")
session.install(MYPY_VERSION)

# Just install the type info directly, since "mypy --install-types" might
# require an additional pass.
session.install("types-mock", "types-protobuf", "types-setuptools")

session.run(
"mypy",
"--config-file",
str(CURRENT_DIRECTORY / "samples" / "snippets" / "mypy.ini"),
"--no-incremental", # Required by warn-unused-configs from mypy.ini to work
"samples/",
)


@nox.session(python=DEFAULT_PYTHON_VERSION)
def lint(session):
"""Run linters.
Expand Down
47 changes: 47 additions & 0 deletions owlbot.py
Original file line number Diff line number Diff line change
Expand Up @@ -447,13 +447,60 @@ def mypy(session):
# require an additional pass.
session.install("types-protobuf", "types-setuptools")

# Version 2.1.1 of google-api-core version is the first type-checked release.
# Version 2.2.0 of google-cloud-core version is the first type-checked release.
session.install(
"google-api-core[grpc]>=2.1.1",
"google-cloud-core>=2.2.0",
)

# TODO: Only check the hand-written layer, the generated code does not pass
# mypy checks yet.
# https://github.com/googleapis/gapic-generator-python/issues/1092
session.run("mypy", "google/cloud")'''
),
)


# ----------------------------------------------------------------------------
# Add mypy_samples nox session.
# ----------------------------------------------------------------------------
s.replace(
"noxfile.py",
r'"pytype",',
'\g<0>\n # "mypy_samples", # TODO: uncomment when the checks pass',
)
s.replace(
"noxfile.py",
r'session\.run\("pytype"\)',
textwrap.dedent(
''' \g<0>


@nox.session(python=DEFAULT_PYTHON_VERSION)
def mypy_samples(session):
"""Run type checks with mypy."""

session.install("-e", ".[all]")

session.install("pytest")
session.install(MYPY_VERSION)

# Just install the type info directly, since "mypy --install-types" might
# require an additional pass.
session.install("types-mock", "types-protobuf", "types-setuptools")

session.run(
"mypy",
"--config-file",
str(CURRENT_DIRECTORY / "samples" / "snippets" / "mypy.ini"),
"--no-incremental", # Required by warn-unused-configs from mypy.ini to work
"samples/",
)'''
),
)


# Only consider the hand-written layer when assessing the test coverage.
s.replace(
"noxfile.py", "--cov=google", "--cov=google/cloud",
Expand Down
12 changes: 6 additions & 6 deletions samples/snippets/iam_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,23 +78,23 @@ def subscription_path(
pass


def test_get_topic_policy(topic_path: str, capsys: CaptureFixture) -> None:
def test_get_topic_policy(topic_path: str, capsys: CaptureFixture[str]) -> None:
iam.get_topic_policy(PROJECT_ID, TOPIC_ID)
out, _ = capsys.readouterr()
assert topic_path in out


def test_get_subscription_policy(
subscription_path: str, capsys: CaptureFixture
subscription_path: str, capsys: CaptureFixture[str]
) -> None:
iam.get_subscription_policy(PROJECT_ID, SUBSCRIPTION_ID)
out, _ = capsys.readouterr()
assert subscription_path in out


def test_set_topic_policy(
publisher_client: pubsub_v1.PublisherClient, topic_path: str,
) -> CaptureFixture:
publisher_client: pubsub_v1.PublisherClient, topic_path: str
) -> None:
iam.set_topic_policy(PROJECT_ID, TOPIC_ID)
policy = publisher_client.get_iam_policy(request={"resource": topic_path})
assert "roles/pubsub.publisher" in str(policy)
Expand All @@ -110,15 +110,15 @@ def test_set_subscription_policy(
assert "domain:google.com" in str(policy)


def test_check_topic_permissions(topic_path: str, capsys: CaptureFixture) -> None:
def test_check_topic_permissions(topic_path: str, capsys: CaptureFixture[str]) -> None:
iam.check_topic_permissions(PROJECT_ID, TOPIC_ID)
out, _ = capsys.readouterr()
assert topic_path in out
assert "pubsub.topics.publish" in out


def test_check_subscription_permissions(
subscription_path: str, capsys: CaptureFixture,
subscription_path: str, capsys: CaptureFixture[str],
) -> None:
iam.check_subscription_permissions(PROJECT_ID, SUBSCRIPTION_ID)
out, _ = capsys.readouterr()
Expand Down
8 changes: 8 additions & 0 deletions samples/snippets/mypy.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
[mypy]
; We require type annotations in all samples.
strict = True
exclude = noxfile\.py
warn_unused_configs = True

[mypy-avro.*,backoff,flaky]
ignore_missing_imports = True
20 changes: 10 additions & 10 deletions samples/snippets/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,9 @@ def publish_messages(project_id: str, topic_id: str) -> None:
topic_path = publisher.topic_path(project_id, topic_id)

for n in range(1, 10):
data = f"Message number {n}"
data_str = f"Message number {n}"
# Data must be a bytestring
data = data.encode("utf-8")
data = data_str.encode("utf-8")
# When you publish a message, the client returns a future.
future = publisher.publish(topic_path, data)
print(future.result())
Expand All @@ -121,9 +121,9 @@ def publish_messages_with_custom_attributes(project_id: str, topic_id: str) -> N
topic_path = publisher.topic_path(project_id, topic_id)

for n in range(1, 10):
data = f"Message number {n}"
data_str = f"Message number {n}"
# Data must be a bytestring
data = data.encode("utf-8")
data = data_str.encode("utf-8")
# Add two attributes, origin and username, to the message
future = publisher.publish(
topic_path, data, origin="python-sample", username="gcp"
Expand Down Expand Up @@ -202,9 +202,9 @@ def callback(future: pubsub_v1.publisher.futures.Future) -> None:
print(message_id)

for n in range(1, 10):
data = f"Message number {n}"
data_str = f"Message number {n}"
# Data must be a bytestring
data = data.encode("utf-8")
data = data_str.encode("utf-8")
publish_future = publisher.publish(topic_path, data)
# Non-blocking. Allow the publisher client to batch multiple messages.
publish_future.add_done_callback(callback)
Expand Down Expand Up @@ -252,9 +252,9 @@ def callback(publish_future: pubsub_v1.publisher.futures.Future) -> None:
# Publish 1000 messages in quick succession may be constrained by
# publisher flow control.
for n in range(1, 1000):
data = f"Message number {n}"
data_str = f"Message number {n}"
# Data must be a bytestring
data = data.encode("utf-8")
data = data_str.encode("utf-8")
publish_future = publisher.publish(topic_path, data)
# Non-blocking. Allow the publisher client to batch messages.
publish_future.add_done_callback(callback)
Expand Down Expand Up @@ -298,9 +298,9 @@ def publish_messages_with_retry_settings(project_id: str, topic_id: str) -> None
topic_path = publisher.topic_path(project_id, topic_id)

for n in range(1, 10):
data = f"Message number {n}"
data_str = f"Message number {n}"
# Data must be a bytestring
data = data.encode("utf-8")
data = data_str.encode("utf-8")
future = publisher.publish(topic=topic_path, data=data, retry=custom_retry)
print(future.result())

Expand Down
Loading