fix my comments
aquamatthias committed Oct 30, 2023
1 parent 3ff9269 commit 971fd88
Showing 3 changed files with 29 additions and 21 deletions.
32 changes: 16 additions & 16 deletions fixcloudutils/redis/event_stream.py
@@ -33,6 +33,7 @@
import sys
import uuid
from asyncio import Task
+from collections import defaultdict
from contextlib import suppress
from datetime import datetime, timedelta
from functools import partial
@@ -74,6 +75,7 @@ class Backoff:
base_delay: float
maximum_delay: float
retries: int
+    log_failed_attempts: bool = True

def wait_time(self, attempt: int) -> float:
delay: float = self.base_delay * (2**attempt + random.uniform(0, 1))
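Note on the delay formula: each retry waits base_delay * (2**attempt + jitter) with jitter uniform in [0, 1]; the rest of wait_time is cut off by the hunk, but presumably caps the result at maximum_delay. A quick sketch of the resulting delay windows for the DefaultBackoff values (0.1, 10, 10) introduced below:

base_delay, maximum_delay = 0.1, 10.0  # DefaultBackoff values, see below
for attempt in range(8):
    low = base_delay * (2 ** attempt)       # jitter = 0
    high = base_delay * (2 ** attempt + 1)  # jitter = 1
    print(f"attempt {attempt}: {min(low, maximum_delay):.2f}s .. {min(high, maximum_delay):.2f}s")
# attempt 0: 0.10s .. 0.20s, attempt 4: 1.60s .. 1.70s, attempt 7 is capped at 10.00s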
@@ -85,14 +87,16 @@ async def with_backoff(self, fn: Callable[[], Awaitable[T]], attempt: int = 0) -
except Exception as e:
if attempt < self.retries:
delay = self.wait_time(attempt)
log.warning(f"Got Exception in attempt {attempt}. Retry after {delay} seconds: {e}")
if self.log_failed_attempts:
log.warning(f"Got Exception in attempt {attempt}. Retry after {delay} seconds: {e}")
await asyncio.sleep(delay)
return await self.with_backoff(fn, attempt + 1)
else:
raise


NoBackoff = Backoff(0, 0, 0)
+DefaultBackoff = Backoff(0.1, 10, 10)


@define(frozen=True, slots=True)
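A usage sketch for the retry helper (the flaky coroutine is invented; the import path is that of this file):

import asyncio

from fixcloudutils.redis.event_stream import Backoff

attempts = 0

async def flaky() -> str:
    # Invented example: fails twice, then succeeds.
    global attempts
    attempts += 1
    if attempts < 3:
        raise RuntimeError(f"boom on attempt {attempts}")
    return "ok"

async def main() -> None:
    # Retry up to 5 times with exponential delay; the new log_failed_attempts
    # flag silences the per-attempt warning, and the final failure still raises.
    quiet = Backoff(base_delay=0.1, maximum_delay=1.0, retries=5, log_failed_attempts=False)
    print(await quiet.with_backoff(flaky))  # "ok" after two silent retries

asyncio.run(main())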
@@ -125,7 +129,7 @@ def __init__(
consider_failed_after: timedelta,
batch_size: int = 1000,
stop_on_fail: bool = False,
-        backoff: Optional[Backoff] = Backoff(0.1, 10, 10),
+        backoff: Optional[Dict[str, Backoff]] = None,
parallelism: Optional[int] = None,
) -> None:
"""
@@ -140,7 +144,8 @@ def __init__(
:param consider_failed_after: The time after which a message is considered failed and will be retried.
:param batch_size: The number of events to read in one batch.
:param stop_on_fail: If True, the listener will stop if a failed event is retried too many times.
-        :param backoff: The backoff strategy to use when retrying failed events.
+        :param backoff: The backoff strategy for the defined message kind to use when retrying failed events.
+            The DefaultBackoff is used if no value is provided.
:param parallelism: If provided, messages will be processed in parallel without order.
"""
self.redis = redis
@@ -150,7 +155,7 @@ def __init__(
self.message_processor = message_processor
self.batch_size = batch_size
self.stop_on_fail = stop_on_fail
-        self.backoff = backoff or NoBackoff
+        self.backoff = defaultdict(lambda: DefaultBackoff) if backoff is None else backoff
self.__should_run = True
self.__listen_task: Optional[Task[Any]] = None
# Check for messages that are not processed for a long time by any listener. Try to claim and process them.
@@ -195,9 +200,9 @@ async def _handle_stream_messages(self, messages: List[Any]) -> None:
# acknowledge all processed messages
await self.redis.xack(self.stream, self.group, *ids)

-    async def _handle_stream_messages_parallel(self, messages: List[Any], max_parallelesm: int) -> None:
+    async def _handle_stream_messages_parallel(self, messages: List[Any], max_parallelism: int) -> None:
"""
-        Handle messages in parallel in unordered fasion. The number of parallel tasks is limited by max_parallelism.
+        Handle messages in parallel in an unordered fashion. The number of parallel tasks is limited by max_parallelism.
"""

async def handle_and_ack(msg: Any, message_id: StreamIdT) -> None:
@@ -210,26 +215,26 @@ def task_done_callback(task: Task[Any]) -> None:
for stream, stream_messages in messages:
log.debug(f"Handle {len(stream_messages)} messages from stream.")
for uid, data in stream_messages:
-                if len(self._ongoing_tasks) >= max_parallelesm: # queue is full, wait for a slot to be freed
+                while len(self._ongoing_tasks) >= max_parallelism: # queue is full, wait for a slot to be freed
await asyncio.wait(self._ongoing_tasks, return_when=asyncio.FIRST_COMPLETED)

task = asyncio.create_task(handle_and_ack(data, uid), name=f"handle_message_{uid}")
task.add_done_callback(task_done_callback)
self._ongoing_tasks.add(task)

async def _handle_single_message(self, message: Json) -> None:
try:
if "id" in message and "at" in message and "data" in message:
+                kind = message["kind"]
context = MessageContext(
id=message["id"],
-                    kind=message["kind"],
+                    kind=kind,
publisher=message["publisher"],
sent_at=parse_utc_str(message["at"]),
received_at=utc(),
)
data = json.loads(message["data"])
log.debug(f"Received message {self.listener}: message {context} data: {data}")
-                await self.backoff.with_backoff(partial(self.message_processor, data, context))
+                await self.backoff[kind].with_backoff(partial(self.message_processor, data, context))
else:
log.warning(f"Invalid message format: {message}. Ignore.")
except Exception as e:
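The if-to-while change above is subtle: the done callback (which presumably discards the finished task from self._ongoing_tasks) is scheduled via loop.call_soon, so right after asyncio.wait returns the set can still be at the limit; looping re-checks until a slot is actually free. A minimal self-contained sketch of the same bounded-concurrency pattern:

import asyncio
from typing import Any, Coroutine, Iterable, Set

async def run_bounded(coros: Iterable[Coroutine[Any, Any, None]], limit: int) -> None:
    ongoing: Set[asyncio.Task[None]] = set()
    for coro in coros:
        while len(ongoing) >= limit:  # re-check: the done callback may not have run yet
            await asyncio.wait(ongoing, return_when=asyncio.FIRST_COMPLETED)
        task = asyncio.create_task(coro)
        task.add_done_callback(ongoing.discard)  # drop the task once it finishes
        ongoing.add(task)
    if ongoing:
        await asyncio.wait(ongoing)  # drain the rest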
@@ -303,12 +308,7 @@ async def read_all() -> None:
await self.__outdated_messages_task.start()

async def stop(self) -> Any:
-        async def stop_task(task: Task[Any]) -> None:
-            task.cancel()
-            with suppress(asyncio.CancelledError):
-                await task
-
-        await asyncio.gather(*[stop_task(task) for task in self._ongoing_tasks])
+        await asyncio.gather(*[stop_running_task(task) for task in self._ongoing_tasks])
self.__should_run = False
await self.__outdated_messages_task.stop()
await stop_running_task(self.__listen_task)
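Taken together, the event_stream.py changes make the retry strategy configurable per message kind. Below is a hedged construction sketch; the stream, group, listener, and kind names are invented, and the positional arguments follow the test shown further down. Note that self.backoff[kind] is looked up for every message, so a plain dict would raise KeyError for unlisted kinds; a defaultdict supplies the fallback:

from collections import defaultdict
from datetime import timedelta
from typing import Any, Dict

from redis.asyncio import Redis

from fixcloudutils.redis.event_stream import Backoff, DefaultBackoff, RedisStreamListener

async def handle_message(message: Any, context: Any) -> None:
    ...  # process one decoded event

per_kind: Dict[str, Backoff] = defaultdict(
    lambda: DefaultBackoff,               # fallback for any other kind
    {"collect_done": Backoff(1, 60, 5)},  # invented kind-specific strategy
)

async def listen(redis: Redis) -> None:
    async with RedisStreamListener(
        redis, "events", "workers", "worker-1", handle_message,
        timedelta(seconds=30), backoff=per_kind,
    ):
        ...  # run for as long as messages should be consumed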
10 changes: 6 additions & 4 deletions tests/conftest.py
@@ -25,22 +25,24 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

-from typing import List
+from typing import List, AsyncIterator

from arango.client import ArangoClient
from attr import define
from pytest import fixture
from redis.asyncio import Redis
-from redis.backoff import ExponentialBackoff
from redis.asyncio.retry import Retry
+from redis.backoff import ExponentialBackoff

from fixcloudutils.arangodb.async_arangodb import AsyncArangoDB


@fixture
-def redis() -> Redis:
+async def redis() -> AsyncIterator[Redis]:
backoff = ExponentialBackoff() # type: ignore
-    return Redis(host="localhost", port=6379, decode_responses=True, retry=Retry(backoff, 10))
+    redis = Redis(host="localhost", port=6379, decode_responses=True, retry=Retry(backoff, 10))
+    yield redis
+    await redis.close(True)


@fixture
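With the fixture rewritten as an async generator, the Redis client is now closed on teardown instead of leaked. A sketch of a consuming test, assuming pytest-asyncio drives async fixtures and tests:

import pytest
from redis.asyncio import Redis

@pytest.mark.asyncio  # assumes pytest-asyncio is configured
async def test_redis_roundtrip(redis: Redis) -> None:
    await redis.set("key", "value")
    assert await redis.get("key") == "value"  # str, since decode_responses=True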
8 changes: 7 additions & 1 deletion tests/event_stream_test.py
@@ -298,7 +298,13 @@ async def handle_message(message: Json, context: Any) -> None:

# a new redis listener started later will receive all messages
async with RedisStreamListener(
redis, "test-stream", "t1", "l1", handle_message, timedelta(seconds=5), backoff=Backoff(0, 0, 5)
redis,
"test-stream",
"t1",
"l1",
handle_message,
timedelta(seconds=5),
backoff=defaultdict(lambda: Backoff(0, 0, 5)),
):
async with RedisStreamPublisher(redis, "test-stream", "test") as publisher:
await publisher.publish("test_data", unstructure(ExampleData(1, "foo", [1, 2, 3])))
