From 4da67599a9729ad569f6d9f6c44970981d37a5c3 Mon Sep 17 00:00:00 2001 From: Nikita Melkozerov Date: Thu, 26 Oct 2023 17:33:04 +0000 Subject: [PATCH] linter fix --- fixcloudutils/redis/event_stream.py | 3 +-- tests/event_stream_test.py | 18 ++++-------------- 2 files changed, 5 insertions(+), 16 deletions(-) diff --git a/fixcloudutils/redis/event_stream.py b/fixcloudutils/redis/event_stream.py index 1ab9064..f6d1882 100644 --- a/fixcloudutils/redis/event_stream.py +++ b/fixcloudutils/redis/event_stream.py @@ -126,7 +126,7 @@ def __init__( batch_size: int = 1000, stop_on_fail: bool = False, backoff: Optional[Backoff] = Backoff(0.1, 10, 10), - parallelism: Optional[int] = None + parallelism: Optional[int] = None, ) -> None: """ Create a RedisStream client. @@ -211,7 +211,6 @@ def task_done_callback(task: Task[Any]) -> None: for stream, stream_messages in messages: log.debug(f"Handle {len(stream_messages)} messages from stream.") for uid, data in stream_messages: - if len(self._ongoing_tasks) >= max_parallelesm: # queue is full, wait for a slot to be freed await asyncio.wait(self._ongoing_tasks, return_when=asyncio.FIRST_COMPLETED) diff --git a/tests/event_stream_test.py b/tests/event_stream_test.py index 367a3e2..89c70d2 100644 --- a/tests/event_stream_test.py +++ b/tests/event_stream_test.py @@ -127,6 +127,7 @@ async def check_all_arrived(expected_reader: int) -> bool: # don't leave any traces await redis.delete("test-stream", "test-stream.listener", "test-stream.dlq") + @pytest.mark.asyncio @pytest.mark.skipif(os.environ.get("REDIS_RUNNING") is None, reason="Redis is not running") async def test_stream_parallel(redis: Redis) -> None: @@ -145,13 +146,7 @@ async def handle_message(group: int, uid: int, message: Json, _: MessageContext) # create a single listener stream = RedisStreamListener( - redis, - "test-stream", - "group", - "id", - partial(handle_message, 1, 1), - timedelta(seconds=1), - parallelism=10 + redis, "test-stream", "group", "id", partial(handle_message, 1, 1), timedelta(seconds=1), parallelism=10 ) await stream.start() @@ -208,13 +203,7 @@ async def handle_message(group: int, uid: int, message: Json, _: MessageContext) # create a single listener stream = RedisStreamListener( - redis, - "test-stream", - "group", - "id", - partial(handle_message, 1, 1), - timedelta(seconds=1), - parallelism=1 + redis, "test-stream", "group", "id", partial(handle_message, 1, 1), timedelta(seconds=1), parallelism=1 ) await stream.start() @@ -254,6 +243,7 @@ async def check_all_arrived(expected_reader: int) -> bool: # don't leave any traces await redis.delete("test-stream", "test-stream.listener", "test-stream.dlq") + @pytest.mark.asyncio @pytest.mark.skipif(os.environ.get("REDIS_RUNNING") is None, reason="Redis is not running") async def test_stream_pending(redis: Redis) -> None: