Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add reconnect test coverage #271

Merged
merged 9 commits into from
Jan 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions requirements/test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ coverage>=4.4.0
codecov>=2.0.0
black
isort~=5.10.0
freezegun==1.4.0
tests/sample_pypi_package
74 changes: 71 additions & 3 deletions tests/test_worker.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
import time
from concurrent import futures
from unittest.mock import ANY, patch
from concurrent.futures._base import FINISHED
from unittest.mock import ANY, create_autospec, patch

import pytest
from freezegun import freeze_time
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber.futures import StreamingPullFuture
from google.cloud.pubsub_v1.subscriber.scheduler import ThreadScheduler

from rele import Subscriber, Worker, sub
from rele.middleware import register_middleware
from rele.retry_policy import RetryPolicy
from rele.subscription import Callback
from rele.worker import create_and_run
from rele.worker import NotConnectionError, create_and_run


@sub(topic="some-cool-topic", prefix="rele")
Expand Down Expand Up @@ -46,6 +49,13 @@ def mock_create_subscription():
yield m


@pytest.fixture(autouse=True)
def mock_internet_connection():
with patch("rele.worker.check_internet_connection") as m:
m.return_value = True
yield m


class TestWorker:
def test_start_subscribes_and_saves_futures_when_subscriptions_given(
self, mock_consume, worker
Expand Down Expand Up @@ -116,6 +126,14 @@ def test_creates_subscription_with_custom_ack_deadline_from_environment(
assert worker._subscriber._ack_deadline == custom_ack_deadline
assert worker._subscriber._gc_project_id == "rele-test"

def test_raises_not_connection_error_during_start(
self, worker, mock_internet_connection
):
mock_internet_connection.return_value = False

with pytest.raises(NotConnectionError):
worker.start()


@pytest.mark.usefixtures("mock_create_subscription")
class TestRestartConsumer:
Expand All @@ -140,14 +158,64 @@ def test_restarts_consumption_when_future_is_cancelled(self, worker, mock_consum

assert len(mock_consume.call_args_list) == 2

def test_restarts_consumption_when_future_is_done(self, worker, mock_consume):
def test_raises_future_error_when_future_with_exception_is_cancelled(
self, worker, mock_create_subscription
):
with patch.object(Subscriber, "consume") as m:
mock_streaming_pull_future = create_autospec(
spec=StreamingPullFuture, instance=True
)
mock_streaming_pull_future.cancelled.return_value = True
mock_streaming_pull_future._state = FINISHED
m.return_value = mock_streaming_pull_future

with pytest.raises(ValueError):
worker.run_forever()

mock_streaming_pull_future.result.assert_called_once()

def test_restarts_consumption_when_future_is_done(
self, worker, mock_consume, mock_sleep
):
mock_consume.return_value.set_result(True)

with pytest.raises(ValueError):
worker.run_forever()

assert len(mock_consume.call_args_list) == 2

@freeze_time("2024-01-01 10:00:50Z")
@pytest.mark.usefixtures("mock_consume")
def test_wait_forever_if_we_have_connection_and_timestamp_module_50(
self, worker, mock_internet_connection
):
with pytest.raises(ValueError):
worker._wait_forever(1)

mock_internet_connection.assert_called_once()

@pytest.mark.usefixtures("mock_consume")
@pytest.mark.parametrize(
"timestamp_now", ["2024-01-01 10:00:49Z", "2024-01-01 10:00:51Z"]
)
def test_does_not_check_internet_connection_when_timestamp_is_not_module_50(
self, worker, mock_internet_connection, timestamp_now
):
with freeze_time(timestamp_now):
with pytest.raises(ValueError):
worker._wait_forever(1)

mock_internet_connection.assert_not_called()

@freeze_time("2024-01-01 10:00:50Z")
def test_raises_not_connection_error_during_wait_forever_if_connection_is_down_every_50_seconds( # noqa
self, worker, mock_internet_connection
):
mock_internet_connection.return_value = False

with pytest.raises(NotConnectionError):
worker._wait_forever(1)


class TestCreateAndRun:
@pytest.fixture(autouse=True)
Expand Down