[feat] Redis metrics (#20)
aquamatthias authored Nov 14, 2023
1 parent 8151d49 commit 319848f
Showing 5 changed files with 37 additions and 8 deletions.
31 changes: 26 additions & 5 deletions fixcloudutils/redis/event_stream.py
@@ -50,6 +50,7 @@
)

from attrs import define
+from prometheus_client import Counter
from redis.asyncio import Redis
from redis.typing import StreamIdT

@@ -62,6 +63,14 @@
T = TypeVar("T")
Json = Dict[str, Any]
CommitTimeRE = re.compile(r"(\d{13})-.*")
+MessageProcessingFailed = Counter(
+    "redis_messages_processing_failed", "Messages failed to process", ["stream", "listener", "group", "kind"]
+)
+MessagesProcessed = Counter(
+    "redis_stream_messages_processed", "Messages processed", ["stream", "listener", "group", "kind"]
+)
+MessagesPublished = Counter("redis_stream_messages_published", "Messages published", ["stream", "publisher", "kind"])
+MessagesCleaned = Counter("redis_stream_messages_cleaned", "Stream messages cleaned", ["stream", "publisher"])


def time_from_id(uid: str, default: int) -> int:
@@ -235,9 +244,18 @@ async def _handle_single_message(self, message: Json) -> None:
data = json.loads(message["data"])
log.debug(f"Received message {self.listener}: message {context} data: {data}")
await self.backoff[kind].with_backoff(partial(self.message_processor, data, context))
+MessagesProcessed.labels(stream=self.stream, listener=self.listener, group=self.group, kind=kind).inc()
else:
log.warning(f"Invalid message format: {message}. Ignore.")
kind = message.get("kind", "invalid")
MessageProcessingFailed.labels(
stream=self.stream, listener=self.listener, group=self.group, kind=kind
).inc()
except Exception as e:
kind = message.get("kind", "unknown")
MessageProcessingFailed.labels(
stream=self.stream, listener=self.listener, group=self.group, kind=kind
).inc()
if self.stop_on_fail:
raise e
else:
@@ -326,12 +344,14 @@ def __init__(
redis: Redis,
stream: str,
publisher_name: str,
-keep_unprocessed_messages_for: timedelta = timedelta(days=1),
+keep_unprocessed_messages_for: timedelta = timedelta(days=7),
+keep_processed_messages_for: timedelta = timedelta(hours=3),
) -> None:
self.redis = redis
self.stream = stream
self.publisher_name = publisher_name
self.keep_unprocessed_messages_for = keep_unprocessed_messages_for
+self.keep_processed_messages_for = keep_processed_messages_for
self.clean_process = Periodic(
"clean_processed_messages",
self.cleanup_processed_messages,
@@ -348,6 +368,7 @@ async def publish(self, kind: str, message: Json) -> None:
"data": json.dumps(message),
}
await self.redis.xadd(self.stream, to_send) # type: ignore
+MessagesPublished.labels(stream=self.stream, publisher=self.publisher_name, kind=kind).inc()

async def cleanup_processed_messages(self) -> int:
log.debug("Cleaning up processed messages.")
@@ -363,10 +384,9 @@ async def cleanup_processed_messages(self) -> int:
last_commit = info["last-delivered-id"]
latest = min(latest, time_from_id(last_commit, latest))
# in case there is an inactive reader, make sure to only keep the unprocessed message time
-latest = max(
-    latest,
-    int((datetime.now() - self.keep_unprocessed_messages_for).timestamp() * 1000),
-)
+latest = max(latest, int((datetime.now() - self.keep_unprocessed_messages_for).timestamp() * 1000))
+# in case all messages have been processed, keep them for a defined time
+latest = min(latest, int((datetime.now() - self.keep_processed_messages_for).timestamp() * 1000))
# iterate in batches over the stream and delete all messages that are older than the latest commit
last = "0"
cleaned_messages = 0
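Both retention clamps operate on millisecond timestamps recovered from Redis stream IDs, whose `<ms>-<seq>` shape is exactly what CommitTimeRE captures. Below is a standalone sketch of the cutoff arithmetic for a single consumer group; the stream ID and retention windows are hypothetical, and time_from_id's body is re-derived from CommitTimeRE for illustration, since only its signature appears in this diff:

```python
# Standalone sketch of the cleanup cutoff computed above, for a single
# consumer group; the stream ID and retention windows are hypothetical,
# and time_from_id is re-derived here from CommitTimeRE.
import re
from datetime import datetime, timedelta

CommitTimeRE = re.compile(r"(\d{13})-.*")

def time_from_id(uid: str, default: int) -> int:
    match = CommitTimeRE.match(uid)
    return int(match.group(1)) if match else default

now_ms = int(datetime.now().timestamp() * 1000)
# everything at or before the group's last-delivered-id has been processed
latest = time_from_id("1699960000000-0", now_ms)
# inactive-reader guard: never keep unprocessed messages beyond this window
latest = max(latest, now_ms - int(timedelta(days=7).total_seconds() * 1000))
# keep already-processed messages around for a short grace period
latest = min(latest, now_ms - int(timedelta(hours=3).total_seconds() * 1000))
print(f"xdel entries whose id timestamp is < {latest}")
```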
@@ -386,6 +406,7 @@
log.info(f"Deleting processed or old messages from stream: {len(to_delete)}")
cleaned_messages += len(to_delete)
await self.redis.xdel(self.stream, *to_delete)
+MessagesCleaned.labels(stream=self.stream, publisher=self.publisher_name).inc(len(to_delete))
if cleaned_messages > 0:
log.info(f"Cleaning up processed messages done. Cleaned {cleaned_messages} messages.")
return cleaned_messages
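All four stream counters follow the usual prometheus_client pattern: a module-level Counter with label names, incremented via .labels(...).inc(), and exposed by whatever metrics endpoint the host process runs. A minimal, self-contained sketch of that pattern — the metric name, port, label values, and serving loop are illustrative assumptions, not part of this commit:

```python
# Minimal sketch of the prometheus_client pattern used above; the metric is
# renamed to avoid clashing with the library's own registration, and the
# port and label values are illustrative assumptions.
import time

from prometheus_client import Counter, start_http_server

demo_published = Counter(
    "demo_stream_messages_published", "Messages published", ["stream", "publisher", "kind"]
)

if __name__ == "__main__":
    start_http_server(9090)  # serves the default registry on http://localhost:9090/metrics
    demo_published.labels(stream="test-stream", publisher="test", kind="test_data").inc()
    # the exposition now contains a line like:
    # demo_stream_messages_published_total{kind="test_data",publisher="test",stream="test-stream"} 1.0
    time.sleep(60)
```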
8 changes: 8 additions & 0 deletions fixcloudutils/redis/pub_sub.py
@@ -34,6 +34,7 @@
from datetime import datetime
from typing import Any, Optional, Callable, Awaitable

+from prometheus_client import Counter
from redis.asyncio import Redis
from redis.asyncio.client import PubSub

@@ -47,6 +48,10 @@
log = logging.getLogger("fixcloudutils.redis.pub_sub")
redis_wildcard = re.compile(r"(?<!\\)[*?\[]")

MessageProcessingFailed = Counter("redis_pubsub_processing_failed", "Messages failed to process", ["channel"])
MessagesProcessed = Counter("redis_pubsub_messages_processed", "Messages processed", ["channel", "kind"])
MessagesPublished = Counter("redis_pubsub_messages_published", "Messages published", ["channel", "publisher", "kind"])


class RedisPubSubListener(Service):
def __init__(self, redis: Redis, channel: str, handler: MessageHandler) -> None:
@@ -70,7 +75,9 @@ async def read_messages(pubsub: PubSub) -> None:
await self.handler(
data["id"], parse_utc_str(data["at"]), data["publisher"], data["kind"], data["data"]
)
+MessagesProcessed.labels(channel=self.channel, kind=data["kind"]).inc()
except Exception as ex:
+MessageProcessingFailed.labels(channel=self.channel).inc()
log.exception(f"Error handling message {msg}: {ex}. Ignore.")

ps = self.redis.pubsub()
@@ -110,3 +117,4 @@ async def publish(self, kind: str, message: Json, channel: Optional[str] = None)
"data": message,
}
await self.redis.publish(channel or self.channel, json.dumps(to_send))
+MessagesPublished.labels(channel=self.channel, publisher=self.publisher_name, kind=kind).inc()
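One detail worth noting near the new counters in this file is the redis_wildcard regex: it detects unescaped Redis glob metacharacters in a channel name, presumably so the listener can choose between a plain subscribe and a pattern psubscribe. A quick demonstration of what it does and does not match, with made-up channel names:

```python
# Behavior sketch of the redis_wildcard regex from pub_sub.py; the channel
# names are invented for illustration.
import re

redis_wildcard = re.compile(r"(?<!\\)[*?\[]")

assert redis_wildcard.search("events.*")        # unescaped "*": pattern channel
assert redis_wildcard.search("user-?-signup")   # unescaped "?" counts too
assert not redis_wildcard.search("events")      # plain channel name
assert not redis_wildcard.search(r"literal\*")  # escaped "*" is treated as literal
```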
2 changes: 1 addition & 1 deletion fixcloudutils/util.py
@@ -52,7 +52,7 @@ def identity(o: T) -> T:

def value_in_path_get(element: JsonElement, path_or_name: Union[List[str], str], if_none: T) -> T:
result = value_in_path(element, path_or_name)
-return result if result is not None and isinstance(result, type(if_none)) else if_none
+return result if result is not None and isinstance(result, type(if_none)) else if_none  # type: ignore


def value_in_path(element: JsonElement, path_or_name: Union[List[str], str]) -> Optional[Any]:
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[project]
name = "fixcloudutils"
version = "1.10.0"
version = "1.11.0"
authors = [{ name = "Some Engineering Inc." }]
description = "Utilities for fixcloud."
license = { file = "LICENSE" }
2 changes: 1 addition & 1 deletion tests/event_stream_test.py
@@ -81,7 +81,7 @@ async def handle_message(group: int, uid: int, message: Json, _: MessageContext)
await s.start()

# publish 10 messages
-publisher = RedisStreamPublisher(redis, "test-stream", "test")
+publisher = RedisStreamPublisher(redis, "test-stream", "test", keep_processed_messages_for=timedelta(seconds=0))
for i in range(10):
await publisher.publish("test_data", unstructure(ExampleData(i, "foo", [1, 2, 3])))

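The only test change wires the publisher with keep_processed_messages_for=timedelta(seconds=0), which collapses the processed-message cutoff to "now", so cleanup may delete stream entries as soon as they are processed. The arithmetic, checked in isolation using the same expression as cleanup_processed_messages:

```python
# Sanity check of the zero-retention cutoff used by the test.
from datetime import datetime, timedelta

now_ms = int(datetime.now().timestamp() * 1000)
cutoff = int((datetime.now() - timedelta(seconds=0)).timestamp() * 1000)
# a zero window means the cutoff is "now": every already-processed entry
# is at least as old as the cutoff and therefore eligible for deletion
assert cutoff >= now_ms
```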
