diff --git a/minique/utils/redis_list.py b/minique/utils/redis_list.py index 753ac91..e3ed360 100644 --- a/minique/utils/redis_list.py +++ b/minique/utils/redis_list.py @@ -1,4 +1,4 @@ -from typing import Iterable, Optional +from typing import Iterable, Optional, Callable from redis import Redis @@ -8,6 +8,7 @@ def read_list( key: str, *, chunk_size: int = 4096, + filter_fn: Optional[Callable[[bytes], bool]] = None, last_n: Optional[int] = None, ) -> Iterable[bytes]: """ @@ -18,7 +19,8 @@ def read_list( :param redis_conn: Redis connection :param key: Key :param chunk_size: How many lines to read per request. - :param last_n: Attempt to only read the last N lines. + :param filter_fn: Only include lines that pass this filter. + :param last_n: How many of the last lines to return; filtering is applied before limiting. :return: """ # LLEN returns zero for a non-existent key, @@ -31,17 +33,30 @@ def read_list( if chunk_size <= 0: chunk_size = 4096 - if last_n and last_n > 0: - offset = max(0, list_len - last_n) - else: - offset = 0 + results = [] - while offset < list_len: - # Regarding that - 1 there, see this from https://redis.io/commands/lrange: - # > Note that if you have a list of numbers from 0 to 100, LRANGE list 0 10 - # > will return 11 elements, that is, the rightmost item is included. - chunk = redis_conn.lrange(key, offset, offset + chunk_size - 1) or [] + # Regarding that - 1 there, see this from https://redis.io/commands/lrange: + # > Note that if you have a list of numbers from 0 to 100, LRANGE list 0 10 + # > will return 11 elements, that is, the rightmost item is included. 
+ end_index = list_len - 1 + remaining_needed = last_n if last_n and last_n > 0 else list_len + + # read until we have enough items, or we run out of the list + while end_index >= 0 and remaining_needed > 0: + start_index = max(0, end_index - chunk_size + 1) + + chunk = redis_conn.lrange(key, start_index, end_index) if not chunk: break - yield from chunk - offset += chunk_size + + for item in reversed(chunk): + if not filter_fn or filter_fn(item): + results.append(item) + remaining_needed -= 1 + if remaining_needed == 0: + break + + # move the reading window further back + end_index = start_index - 1 + + yield from reversed(results) diff --git a/minique_tests/test_redis_list.py b/minique_tests/test_redis_list.py index 919f100..4263b17 100644 --- a/minique_tests/test_redis_list.py +++ b/minique_tests/test_redis_list.py @@ -3,8 +3,49 @@ from minique.utils.redis_list import read_list -def test_redis_list(redis: Redis, random_queue_name: str): +def test_read_list(redis: Redis, random_queue_name: str): data = [str(x).encode() for x in range(500)] redis.rpush(random_queue_name, *data) assert list(read_list(redis, random_queue_name, chunk_size=7)) == data redis.delete(random_queue_name) + + +def test_read_list_last_n(redis: Redis, random_queue_name: str): + data = [str(x).encode() for x in range(6)] + redis.rpush(random_queue_name, *data) + + last_two = read_list(redis, random_queue_name, last_n=2) + assert list(last_two) == [b"4", b"5"] + + last_four = read_list(redis, random_queue_name, last_n=4) + assert list(last_four) == [b"2", b"3", b"4", b"5"] + + last_everything = read_list(redis, random_queue_name, last_n=999_999) + assert list(last_everything) == [b"0", b"1", b"2", b"3", b"4", b"5"] + + redis.delete(random_queue_name) + + +def test_read_list_filter(redis: Redis, random_queue_name: str): + data = [str(x).encode() for x in range(101)] + redis.rpush(random_queue_name, *data) + + last_four_without_nines = read_list( + redis, + random_queue_name, + chunk_size=2, + 
last_n=4, + filter_fn=lambda line: b"9" not in line, + ) + assert list(last_four_without_nines) == [b"86", b"87", b"88", b"100"] + + last_four_starting_with_one = read_list( + redis, + random_queue_name, + chunk_size=5, + last_n=4, + filter_fn=lambda line: line.startswith(b"1"), + ) + assert list(last_four_starting_with_one) == [b"17", b"18", b"19", b"100"] + + redis.delete(random_queue_name)