diff --git a/CHANGELOG.md b/CHANGELOG.md index 0fe456b4c9..7652b994b0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -56,6 +56,7 @@ * Python: Added XGROUP CREATECONSUMER and XGROUP DELCONSUMER commands ([#1658](https://github.com/aws/glide-for-redis/pull/1658)) * Python: Added LOLWUT command ([#1657](https://github.com/aws/glide-for-redis/pull/1657)) * Python: Added XREADGROUP command ([#1679](https://github.com/aws/glide-for-redis/pull/1679)) +* Python: Added XACK command ([#1681](https://github.com/aws/glide-for-redis/pull/1681)) ### Breaking Changes * Node: Update XREAD to return a Map of Map ([#1494](https://github.com/aws/glide-for-redis/pull/1494)) diff --git a/python/python/glide/async_commands/core.py b/python/python/glide/async_commands/core.py index befa5be015..330a9018d3 100644 --- a/python/python/glide/async_commands/core.py +++ b/python/python/glide/async_commands/core.py @@ -2968,6 +2968,45 @@ async def xreadgroup( await self._execute_command(RequestType.XReadGroup, args), ) + async def xack( + self, + key: str, + group_name: str, + ids: List[str], + ) -> int: + """ + Removes one or multiple messages from the Pending Entries List (PEL) of a stream consumer group. + This command should be called on pending messages so that such messages do not get processed again by the + consumer group. + + See https://valkey.io/commands/xack for more details. + + Args: + key (str): The key of the stream. + group_name (str): The consumer group name. + ids (List[str]): The stream entry IDs to acknowledge and consume for the given consumer group. + + Returns: + int: The number of messages that were successfully acknowledged. + + Examples: + >>> await client.xadd("mystream", [("field1", "value1")], StreamAddOptions(id="1-0")) + >>> await client.xgroup_create("mystream", "mygroup", "0-0") + >>> await client.xreadgroup({"mystream": ">"}, "mygroup", "myconsumer") + { + "mystream": { + "1-0": [["field1", "value1"]], + } + } # Read one stream entry, the entry is now in the Pending Entries List for "mygroup". + >>> await client.xack("mystream", "mygroup", ["1-0"]) + 1 # 1 pending message was acknowledged and removed from the Pending Entries List for "mygroup". + """ + + return cast( + int, + await self._execute_command(RequestType.XAck, [key, group_name] + ids), + ) + async def geoadd( self, key: str, diff --git a/python/python/glide/async_commands/transaction.py b/python/python/glide/async_commands/transaction.py index 209a2c501b..c28819293a 100644 --- a/python/python/glide/async_commands/transaction.py +++ b/python/python/glide/async_commands/transaction.py @@ -2075,6 +2075,29 @@ def xreadgroup( return self.append_command(RequestType.XReadGroup, args) + def xack( + self: TTransaction, + key: str, + group_name: str, + ids: List[str], + ) -> TTransaction: + """ + Removes one or multiple messages from the Pending Entries List (PEL) of a stream consumer group. + This command should be called on pending messages so that such messages do not get processed again by the + consumer group. + + See https://valkey.io/commands/xack for more details. + + Args: + key (str): The key of the stream. + group_name (str): The consumer group name. + ids (List[str]): The stream entry IDs to acknowledge and consume for the given consumer group. + + Command response: + int: The number of messages that were successfully acknowledged. + """ + return self.append_command(RequestType.XAck, [key, group_name] + ids) + def geoadd( self: TTransaction, key: str, diff --git a/python/python/tests/test_async_client.py b/python/python/tests/test_async_client.py index 72b0a6c537..1da37a04f7 100644 --- a/python/python/tests/test_async_client.py +++ b/python/python/tests/test_async_client.py @@ -5485,6 +5485,83 @@ async def endless_xreadgroup_call(): with pytest.raises(asyncio.TimeoutError): await asyncio.wait_for(endless_xreadgroup_call(), timeout=3) + @pytest.mark.parametrize("cluster_mode", [True, False]) + @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) + async def test_xack( + self, redis_client: TRedisClient, cluster_mode, protocol, request + ): + key = f"{{testKey}}:{get_random_string(10)}" + non_existing_key = f"{{testKey}}:{get_random_string(10)}" + string_key = f"{{testKey}}:{get_random_string(10)}" + group_name = get_random_string(10) + consumer_name = get_random_string(10) + stream_id0 = "0" + stream_id1_0 = "1-0" + stream_id1_1 = "1-1" + stream_id1_2 = "1-2" + + # setup: add 2 entries to the stream, create consumer group, read to mark them as pending + assert ( + await redis_client.xadd(key, [("f0", "v0")], StreamAddOptions(stream_id1_0)) + == stream_id1_0 + ) + assert ( + await redis_client.xadd(key, [("f1", "v1")], StreamAddOptions(stream_id1_1)) + == stream_id1_1 + ) + assert await redis_client.xgroup_create(key, group_name, stream_id0) == OK + assert await redis_client.xreadgroup({key: ">"}, group_name, consumer_name) == { + key: { + stream_id1_0: [["f0", "v0"]], + stream_id1_1: [["f1", "v1"]], + } + } + + # add one more entry + assert ( + await redis_client.xadd(key, [("f2", "v2")], StreamAddOptions(stream_id1_2)) + == stream_id1_2 + ) + + # acknowledge the first 2 entries + assert ( + await redis_client.xack(key, group_name, [stream_id1_0, stream_id1_1]) == 2 + ) + # attempting to acknowledge the first 2 entries again returns 0 since they were already acknowledged + assert ( + await redis_client.xack(key, group_name, [stream_id1_0, stream_id1_1]) == 0 + ) + # read the last, unacknowledged entry + assert await redis_client.xreadgroup({key: ">"}, group_name, consumer_name) == { + key: {stream_id1_2: [["f2", "v2"]]} + } + # deleting the consumer returns 1 since the last entry still hasn't been acknowledged + assert ( + await redis_client.xgroup_del_consumer(key, group_name, consumer_name) == 1 + ) + + # attempting to acknowledge a non-existing key returns 0 + assert ( + await redis_client.xack(non_existing_key, group_name, [stream_id1_0]) == 0 + ) + # attempting to acknowledge a non-existing group returns 0 + assert await redis_client.xack(key, "non_existing_group", [stream_id1_0]) == 0 + # attempting to acknowledge a non-existing ID returns 0 + assert await redis_client.xack(key, group_name, ["99-99"]) == 0 + + # invalid arg - ID list must not be empty + with pytest.raises(RequestError): + await redis_client.xack(key, group_name, []) + + # invalid arg - invalid stream ID format + with pytest.raises(RequestError): + await redis_client.xack(key, group_name, ["invalid_ID_format"]) + + # key exists, but it is not a stream + assert await redis_client.set(string_key, "foo") == OK + with pytest.raises(RequestError): + await redis_client.xack(string_key, group_name, [stream_id1_0]) + @pytest.mark.parametrize("cluster_mode", [True, False]) @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) async def test_pfadd(self, redis_client: TRedisClient): diff --git a/python/python/tests/test_transaction.py b/python/python/tests/test_transaction.py index e9b0749ab3..35381b862a 100644 --- a/python/python/tests/test_transaction.py +++ b/python/python/tests/test_transaction.py @@ -504,8 +504,10 @@ async def transaction_test( {key11: ">"}, group_name1, consumer, StreamReadGroupOptions(count=5) ) args.append({key11: {"0-2": [["foo", "bar"]]}}) - transaction.xgroup_del_consumer(key11, group_name1, consumer) + transaction.xack(key11, group_name1, ["0-2"]) args.append(1) + transaction.xgroup_del_consumer(key11, group_name1, consumer) + args.append(0) transaction.xgroup_destroy(key11, group_name1) args.append(True) transaction.xgroup_destroy(key11, group_name2)