Skip to content

Commit

Permalink
Add reconnect test coverage (#271)
Browse files Browse the repository at this point in the history
* Add test for retry connection errors

* Test refactor

* Add test_raises_future_error_when_future_with_exception_is_cancelled

* Linting

* Isort

* Change connection mock

* refactor

* Refactor 2

* Linting

---------

Co-authored-by: Iván Garrido Tamarit <[email protected]>
  • Loading branch information
jBonoraW and Iván Garrido Tamarit authored Jan 22, 2024
1 parent 08e7a04 commit 5518cf7
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 3 deletions.
1 change: 1 addition & 0 deletions requirements/test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ coverage>=4.4.0
codecov>=2.0.0
black
isort~=5.10.0
freezegun==1.4.0
tests/sample_pypi_package
74 changes: 71 additions & 3 deletions tests/test_worker.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
import time
from concurrent import futures
from unittest.mock import ANY, patch
from concurrent.futures._base import FINISHED
from unittest.mock import ANY, create_autospec, patch

import pytest
from freezegun import freeze_time
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber.futures import StreamingPullFuture
from google.cloud.pubsub_v1.subscriber.scheduler import ThreadScheduler

from rele import Subscriber, Worker, sub
from rele.middleware import register_middleware
from rele.retry_policy import RetryPolicy
from rele.subscription import Callback
from rele.worker import create_and_run
from rele.worker import NotConnectionError, create_and_run


@sub(topic="some-cool-topic", prefix="rele")
Expand Down Expand Up @@ -46,6 +49,13 @@ def mock_create_subscription():
yield m


@pytest.fixture(autouse=True)
def mock_internet_connection():
with patch("rele.worker.check_internet_connection") as m:
m.return_value = True
yield m


class TestWorker:
def test_start_subscribes_and_saves_futures_when_subscriptions_given(
self, mock_consume, worker
Expand Down Expand Up @@ -116,6 +126,14 @@ def test_creates_subscription_with_custom_ack_deadline_from_environment(
assert worker._subscriber._ack_deadline == custom_ack_deadline
assert worker._subscriber._gc_project_id == "rele-test"

def test_raises_not_connection_error_during_start(
self, worker, mock_internet_connection
):
mock_internet_connection.return_value = False

with pytest.raises(NotConnectionError):
worker.start()


@pytest.mark.usefixtures("mock_create_subscription")
class TestRestartConsumer:
Expand All @@ -140,14 +158,64 @@ def test_restarts_consumption_when_future_is_cancelled(self, worker, mock_consum

assert len(mock_consume.call_args_list) == 2

def test_restarts_consumption_when_future_is_done(self, worker, mock_consume):
def test_raises_future_error_when_future_with_exception_is_cancelled(
self, worker, mock_create_subscription
):
with patch.object(Subscriber, "consume") as m:
mock_streaming_pull_future = create_autospec(
spec=StreamingPullFuture, instance=True
)
mock_streaming_pull_future.cancelled.return_value = True
mock_streaming_pull_future._state = FINISHED
m.return_value = mock_streaming_pull_future

with pytest.raises(ValueError):
worker.run_forever()

mock_streaming_pull_future.result.assert_called_once()

def test_restarts_consumption_when_future_is_done(
self, worker, mock_consume, mock_sleep
):
mock_consume.return_value.set_result(True)

with pytest.raises(ValueError):
worker.run_forever()

assert len(mock_consume.call_args_list) == 2

@freeze_time("2024-01-01 10:00:50Z")
@pytest.mark.usefixtures("mock_consume")
def test_wait_forever_if_we_have_connection_and_timestamp_module_50(
self, worker, mock_internet_connection
):
with pytest.raises(ValueError):
worker._wait_forever(1)

mock_internet_connection.assert_called_once()

@pytest.mark.usefixtures("mock_consume")
@pytest.mark.parametrize(
"timestamp_now", ["2024-01-01 10:00:49Z", "2024-01-01 10:00:51Z"]
)
def test_does_not_check_internet_connection_when_timestamp_is_not_module_50(
self, worker, mock_internet_connection, timestamp_now
):
with freeze_time(timestamp_now):
with pytest.raises(ValueError):
worker._wait_forever(1)

mock_internet_connection.assert_not_called()

@freeze_time("2024-01-01 10:00:50Z")
def test_raises_not_connection_error_during_wait_forever_if_connection_is_down_every_50_seconds( # noqa
self, worker, mock_internet_connection
):
mock_internet_connection.return_value = False

with pytest.raises(NotConnectionError):
worker._wait_forever(1)


class TestCreateAndRun:
@pytest.fixture(autouse=True)
Expand Down

0 comments on commit 5518cf7

Please sign in to comment.