Skip to content

Commit

Permalink
linter fix
Browse files Browse the repository at this point in the history
  • Loading branch information
meln1k committed Oct 26, 2023
1 parent 2d6bf28 commit 4da6759
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 16 deletions.
3 changes: 1 addition & 2 deletions fixcloudutils/redis/event_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ def __init__(
batch_size: int = 1000,
stop_on_fail: bool = False,
backoff: Optional[Backoff] = Backoff(0.1, 10, 10),
parallelism: Optional[int] = None
parallelism: Optional[int] = None,
) -> None:
"""
Create a RedisStream client.
Expand Down Expand Up @@ -211,7 +211,6 @@ def task_done_callback(task: Task[Any]) -> None:
for stream, stream_messages in messages:
log.debug(f"Handle {len(stream_messages)} messages from stream.")
for uid, data in stream_messages:

if len(self._ongoing_tasks) >= max_parallelesm: # queue is full, wait for a slot to be freed
await asyncio.wait(self._ongoing_tasks, return_when=asyncio.FIRST_COMPLETED)

Expand Down
18 changes: 4 additions & 14 deletions tests/event_stream_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ async def check_all_arrived(expected_reader: int) -> bool:
# don't leave any traces
await redis.delete("test-stream", "test-stream.listener", "test-stream.dlq")


@pytest.mark.asyncio
@pytest.mark.skipif(os.environ.get("REDIS_RUNNING") is None, reason="Redis is not running")
async def test_stream_parallel(redis: Redis) -> None:
Expand All @@ -145,13 +146,7 @@ async def handle_message(group: int, uid: int, message: Json, _: MessageContext)

# create a single listener
stream = RedisStreamListener(
redis,
"test-stream",
"group",
"id",
partial(handle_message, 1, 1),
timedelta(seconds=1),
parallelism=10
redis, "test-stream", "group", "id", partial(handle_message, 1, 1), timedelta(seconds=1), parallelism=10
)
await stream.start()

Expand Down Expand Up @@ -208,13 +203,7 @@ async def handle_message(group: int, uid: int, message: Json, _: MessageContext)

# create a single listener
stream = RedisStreamListener(
redis,
"test-stream",
"group",
"id",
partial(handle_message, 1, 1),
timedelta(seconds=1),
parallelism=1
redis, "test-stream", "group", "id", partial(handle_message, 1, 1), timedelta(seconds=1), parallelism=1
)
await stream.start()

Expand Down Expand Up @@ -254,6 +243,7 @@ async def check_all_arrived(expected_reader: int) -> bool:
# don't leave any traces
await redis.delete("test-stream", "test-stream.listener", "test-stream.dlq")


@pytest.mark.asyncio
@pytest.mark.skipif(os.environ.get("REDIS_RUNNING") is None, reason="Redis is not running")
async def test_stream_pending(redis: Redis) -> None:
Expand Down

0 comments on commit 4da6759

Please sign in to comment.