diff --git a/fixcloudutils/redis/event_stream.py b/fixcloudutils/redis/event_stream.py index cf0256a..fe59369 100644 --- a/fixcloudutils/redis/event_stream.py +++ b/fixcloudutils/redis/event_stream.py @@ -50,6 +50,7 @@ ) from attrs import define +from prometheus_client import Counter from redis.asyncio import Redis from redis.typing import StreamIdT @@ -62,6 +63,14 @@ T = TypeVar("T") Json = Dict[str, Any] CommitTimeRE = re.compile(r"(\d{13})-.*") +MessageProcessingFailed = Counter( + "redis_messages_processing_failed", "Messages failed to process", ["stream", "listener", "group", "kind"] +) +MessagesProcessed = Counter( + "redis_stream_messages_processed", "Messages processed", ["stream", "listener", "group", "kind"] +) +MessagesPublished = Counter("redis_stream_messages_published", "Messages published", ["stream", "publisher", "kind"]) +MessagesCleaned = Counter("redis_stream_messages_cleaned", "Stream messages published", ["stream", "publisher"]) def time_from_id(uid: str, default: int) -> int: @@ -235,9 +244,18 @@ async def _handle_single_message(self, message: Json) -> None: data = json.loads(message["data"]) log.debug(f"Received message {self.listener}: message {context} data: {data}") await self.backoff[kind].with_backoff(partial(self.message_processor, data, context)) + MessagesProcessed.labels(stream=self.stream, listener=self.listener, group=self.group, kind=kind).inc() else: log.warning(f"Invalid message format: {message}. Ignore.") + kind = message.get("kind", "invalid") + MessageProcessingFailed.labels( + stream=self.stream, listener=self.listener, group=self.group, kind=kind + ).inc() except Exception as e: + kind = message.get("kind", "unknown") + MessageProcessingFailed.labels( + stream=self.stream, listener=self.listener, group=self.group, kind=kind + ).inc() if self.stop_on_fail: raise e else: @@ -326,12 +344,14 @@ def __init__( redis: Redis, stream: str, publisher_name: str, - keep_unprocessed_messages_for: timedelta = timedelta(days=1), + keep_unprocessed_messages_for: timedelta = timedelta(days=7), + keep_processed_messages_for: timedelta = timedelta(hours=3), ) -> None: self.redis = redis self.stream = stream self.publisher_name = publisher_name self.keep_unprocessed_messages_for = keep_unprocessed_messages_for + self.keep_processed_messages_for = keep_processed_messages_for self.clean_process = Periodic( "clean_processed_messages", self.cleanup_processed_messages, @@ -348,6 +368,7 @@ async def publish(self, kind: str, message: Json) -> None: "data": json.dumps(message), } await self.redis.xadd(self.stream, to_send) # type: ignore + MessagesPublished.labels(stream=self.stream, publisher=self.publisher_name, kind=kind).inc() async def cleanup_processed_messages(self) -> int: log.debug("Cleaning up processed messages.") @@ -363,10 +384,9 @@ async def cleanup_processed_messages(self) -> int: last_commit = info["last-delivered-id"] latest = min(latest, time_from_id(last_commit, latest)) # in case there is an inactive reader, make sure to only keep the unprocessed message time - latest = max( - latest, - int((datetime.now() - self.keep_unprocessed_messages_for).timestamp() * 1000), - ) + latest = max(latest, int((datetime.now() - self.keep_unprocessed_messages_for).timestamp() * 1000)) + # in case all messages have been processed, keep them for a defined time + latest = min(latest, int((datetime.now() - self.keep_processed_messages_for).timestamp() * 1000)) # iterate in batches over the stream and delete all messages that are older than the latest commit last = "0" cleaned_messages = 0 @@ -386,6 +406,7 @@ async def cleanup_processed_messages(self) -> int: log.info(f"Deleting processed or old messages from stream: {len(to_delete)}") cleaned_messages += len(to_delete) await self.redis.xdel(self.stream, *to_delete) + MessagesCleaned.labels(stream=self.stream, publisher=self.publisher_name).inc(len(to_delete)) if cleaned_messages > 0: log.info(f"Cleaning up processed messages done. Cleaned {cleaned_messages} messages.") return cleaned_messages diff --git a/fixcloudutils/redis/pub_sub.py b/fixcloudutils/redis/pub_sub.py index 83f7e6f..b52189c 100644 --- a/fixcloudutils/redis/pub_sub.py +++ b/fixcloudutils/redis/pub_sub.py @@ -34,6 +34,7 @@ from datetime import datetime from typing import Any, Optional, Callable, Awaitable +from prometheus_client import Counter from redis.asyncio import Redis from redis.asyncio.client import PubSub @@ -47,6 +48,10 @@ log = logging.getLogger("fixcloudutils.redis.pub_sub") redis_wildcard = re.compile(r"(? None: @@ -70,7 +75,9 @@ async def read_messages(pubsub: PubSub) -> None: await self.handler( data["id"], parse_utc_str(data["at"]), data["publisher"], data["kind"], data["data"] ) + MessagesProcessed.labels(channel=self.channel, kind=data["kind"]).inc() except Exception as ex: + MessageProcessingFailed.labels(channel=self.channel).inc() log.exception(f"Error handling message {msg}: {ex}. Ignore.") ps = self.redis.pubsub() @@ -110,3 +117,4 @@ async def publish(self, kind: str, message: Json, channel: Optional[str] = None) "data": message, } await self.redis.publish(channel or self.channel, json.dumps(to_send)) + MessagesPublished.labels(channel=self.channel, publisher=self.publisher_name, kind=kind).inc() diff --git a/fixcloudutils/util.py b/fixcloudutils/util.py index 341695a..508bd56 100644 --- a/fixcloudutils/util.py +++ b/fixcloudutils/util.py @@ -52,7 +52,7 @@ def identity(o: T) -> T: def value_in_path_get(element: JsonElement, path_or_name: Union[List[str], str], if_none: T) -> T: result = value_in_path(element, path_or_name) - return result if result is not None and isinstance(result, type(if_none)) else if_none + return result if result is not None and isinstance(result, type(if_none)) else if_none # type: ignore def value_in_path(element: JsonElement, path_or_name: Union[List[str], str]) -> Optional[Any]: diff --git a/pyproject.toml b/pyproject.toml index f89e124..4514bfb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "fixcloudutils" -version = "1.10.0" +version = "1.11.0" authors = [{ name = "Some Engineering Inc." }] description = "Utilities for fixcloud." license = { file = "LICENSE" } diff --git a/tests/event_stream_test.py b/tests/event_stream_test.py index 88fa97f..27c8613 100644 --- a/tests/event_stream_test.py +++ b/tests/event_stream_test.py @@ -81,7 +81,7 @@ async def handle_message(group: int, uid: int, message: Json, _: MessageContext) await s.start() # publish 10 messages - publisher = RedisStreamPublisher(redis, "test-stream", "test") + publisher = RedisStreamPublisher(redis, "test-stream", "test", keep_processed_messages_for=timedelta(seconds=0)) for i in range(10): await publisher.publish("test_data", unstructure(ExampleData(i, "foo", [1, 2, 3])))