Skip to content

Commit

Permalink
[feat] small improvements (#25)
Browse files Browse the repository at this point in the history
  • Loading branch information
aquamatthias authored Apr 3, 2024
1 parent 23d12cb commit ec1fb85
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 12 deletions.
25 changes: 24 additions & 1 deletion fixcloudutils/redis/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
import pickle
from asyncio import Task, Queue, CancelledError, create_task
from datetime import datetime, timedelta
from typing import Any, Optional, TypeVar, Callable, ParamSpec, NewType, Hashable
from functools import wraps
from typing import Any, Optional, TypeVar, Callable, ParamSpec, NewType, Hashable, Awaitable

from attr import frozen
from prometheus_client import Counter
Expand Down Expand Up @@ -113,6 +114,28 @@ async def evict(self, key: str) -> None:
log.debug(f"{self.key}: Evict {key}")
await self.queue.put(RedisCacheEvict(self._redis_key(key)))

def cache(
self, key: str, *, ttl_memory: Optional[timedelta] = None, ttl_redis: Optional[timedelta] = None
) -> Callable[[Callable[P, T]], Callable[P, Awaitable[T]]]:
"""
Use it as a decorator.
```
rc = RedisCache(...)
@rc.cache("my_key")
def my_function():
...
```
"""

def decorator(fn: Callable[P, T]) -> Callable[P, Awaitable[T]]:
@wraps(fn)
async def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
return await self.call(fn, key, ttl_memory=ttl_memory, ttl_redis=ttl_redis)(*args, **kwargs) # type: ignore # noqa

return wrapper

return decorator

def call(
self,
fn: Callable[P, T],
Expand Down
21 changes: 13 additions & 8 deletions fixcloudutils/redis/event_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import random
import re
import sys
import traceback
import uuid
from asyncio import Task
from collections import defaultdict
Expand Down Expand Up @@ -186,13 +187,13 @@ async def _listen(self) -> None:
self.group, self.listener, {self.stream: self.__readpos}, count=self.batch_size, block=1000
)
self.__readpos = ">"
if not messages:
continue
if self.parallelism:
await self._handle_stream_messages_parallel(messages, self.parallelism)
else:
await self._handle_stream_messages(messages)
except Exception as e:
if isinstance(e, RuntimeError) and len(e.args) and e.args[0] == "no running event loop":
raise e
log.error(f"Failed to read from stream {self.stream}: {e}", exc_info=True)
if self.stop_on_fail:
raise
Expand Down Expand Up @@ -273,6 +274,7 @@ async def _handle_single_message(self, message: Json) -> None:
{
"listener": self.listener,
"error": str(e),
"stack": traceback.format_exc(),
"message": json.dumps(message),
},
)
Expand Down Expand Up @@ -302,19 +304,22 @@ async def _handle_pending_messages(
count=self.batch_size,
idle=min_idle,
)
if len(pending_messages) == 0:
break

message_ids = [
pm["message_id"] for pm in pending_messages if (pm["times_delivered"] < 10 or ignore_delivery_count)
]
if not message_ids:
break

log.debug(f"Found {len(pending_messages)} pending messages and {len(message_ids)} for this listener.")
log.info(f"Found {len(pending_messages)} pending messages and {len(message_ids)} for this listener.")

# it is possible that claiming the message fails
with suppress(Exception):
try:
messages = await self.redis.xclaim(self.stream, self.group, self.listener, min_idle or 0, message_ids)
await self._handle_stream_messages([(self.stream, messages)])
except Exception as e:
log.warning(f"Failed to claim pending messages: {e}. Wait for next cycle.", exc_info=True)
break
# process claimed messages
await self._handle_stream_messages([(self.stream, messages)])

async def start(self) -> Any:
self.__should_run = True
Expand Down
2 changes: 1 addition & 1 deletion fixcloudutils/redis/pub_sub.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ async def read_messages(pubsub: PubSub) -> None:
async def stop(self) -> None:
await stop_running_task(self.reader)
if self.pubsub:
await self.pubsub.close()
await self.pubsub.aclose() # type: ignore
self.pubsub = None


Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "fixcloudutils"
version = "1.13.2"
version = "1.13.3"
authors = [{ name = "Some Engineering Inc." }]
description = "Utilities for fixcloud."
license = { file = "LICENSE" }
Expand Down
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ async def redis() -> AsyncIterator[Redis]:
redis = Redis(host="localhost", port=6379, db=0, decode_responses=True, retry=Retry(backoff, 10))
await redis.flushdb() # wipe redis
yield redis
await redis.close(True)
await redis.aclose(True)


@fixture
Expand Down

0 comments on commit ec1fb85

Please sign in to comment.