Skip to content

Commit

Permalink
Python: add XACK command (#1681)
Browse files Browse the repository at this point in the history
* Python: add XACK command

* PR suggestions
  • Loading branch information
aaron-congo authored Jun 26, 2024
1 parent 82010ea commit 27705a4
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
* Python: Added XGROUP CREATECONSUMER and XGROUP DELCONSUMER commands ([#1658](https://github.com/aws/glide-for-redis/pull/1658))
* Python: Added LOLWUT command ([#1657](https://github.com/aws/glide-for-redis/pull/1657))
* Python: Added XREADGROUP command ([#1679](https://github.com/aws/glide-for-redis/pull/1679))
* Python: Added XACK command ([#1681](https://github.com/aws/glide-for-redis/pull/1681))

### Breaking Changes
* Node: Update XREAD to return a Map of Map ([#1494](https://github.com/aws/glide-for-redis/pull/1494))
Expand Down
39 changes: 39 additions & 0 deletions python/python/glide/async_commands/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2968,6 +2968,45 @@ async def xreadgroup(
await self._execute_command(RequestType.XReadGroup, args),
)

async def xack(
self,
key: str,
group_name: str,
ids: List[str],
) -> int:
"""
Removes one or multiple messages from the Pending Entries List (PEL) of a stream consumer group.
This command should be called on pending messages so that such messages do not get processed again by the
consumer group.
See https://valkey.io/commands/xack for more details.
Args:
key (str): The key of the stream.
group_name (str): The consumer group name.
ids (List[str]): The stream entry IDs to acknowledge and consume for the given consumer group.
Returns:
int: The number of messages that were successfully acknowledged.
Examples:
>>> await client.xadd("mystream", [("field1", "value1")], StreamAddOptions(id="1-0"))
>>> await client.xgroup_create("mystream", "mygroup", "0-0")
>>> await client.xreadgroup({"mystream": ">"}, "mygroup", "myconsumer")
{
"mystream": {
"1-0": [["field1", "value1"]],
}
} # Read one stream entry, the entry is now in the Pending Entries List for "mygroup".
>>> await client.xack("mystream", "mygroup", ["1-0"])
1 # 1 pending message was acknowledged and removed from the Pending Entries List for "mygroup".
"""

return cast(
int,
await self._execute_command(RequestType.XAck, [key, group_name] + ids),
)

async def geoadd(
self,
key: str,
Expand Down
23 changes: 23 additions & 0 deletions python/python/glide/async_commands/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -2075,6 +2075,29 @@ def xreadgroup(

return self.append_command(RequestType.XReadGroup, args)

def xack(
self: TTransaction,
key: str,
group_name: str,
ids: List[str],
) -> TTransaction:
"""
Removes one or multiple messages from the Pending Entries List (PEL) of a stream consumer group.
This command should be called on pending messages so that such messages do not get processed again by the
consumer group.
See https://valkey.io/commands/xack for more details.
Args:
key (str): The key of the stream.
group_name (str): The consumer group name.
ids (List[str]): The stream entry IDs to acknowledge and consume for the given consumer group.
Command response:
int: The number of messages that were successfully acknowledged.
"""
return self.append_command(RequestType.XAck, [key, group_name] + ids)

def geoadd(
self: TTransaction,
key: str,
Expand Down
77 changes: 77 additions & 0 deletions python/python/tests/test_async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5485,6 +5485,83 @@ async def endless_xreadgroup_call():
with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(endless_xreadgroup_call(), timeout=3)

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_xack(
self, redis_client: TRedisClient, cluster_mode, protocol, request
):
key = f"{{testKey}}:{get_random_string(10)}"
non_existing_key = f"{{testKey}}:{get_random_string(10)}"
string_key = f"{{testKey}}:{get_random_string(10)}"
group_name = get_random_string(10)
consumer_name = get_random_string(10)
stream_id0 = "0"
stream_id1_0 = "1-0"
stream_id1_1 = "1-1"
stream_id1_2 = "1-2"

# setup: add 2 entries to the stream, create consumer group, read to mark them as pending
assert (
await redis_client.xadd(key, [("f0", "v0")], StreamAddOptions(stream_id1_0))
== stream_id1_0
)
assert (
await redis_client.xadd(key, [("f1", "v1")], StreamAddOptions(stream_id1_1))
== stream_id1_1
)
assert await redis_client.xgroup_create(key, group_name, stream_id0) == OK
assert await redis_client.xreadgroup({key: ">"}, group_name, consumer_name) == {
key: {
stream_id1_0: [["f0", "v0"]],
stream_id1_1: [["f1", "v1"]],
}
}

# add one more entry
assert (
await redis_client.xadd(key, [("f2", "v2")], StreamAddOptions(stream_id1_2))
== stream_id1_2
)

# acknowledge the first 2 entries
assert (
await redis_client.xack(key, group_name, [stream_id1_0, stream_id1_1]) == 2
)
# attempting to acknowledge the first 2 entries again returns 0 since they were already acknowledged
assert (
await redis_client.xack(key, group_name, [stream_id1_0, stream_id1_1]) == 0
)
# read the last, unacknowledged entry
assert await redis_client.xreadgroup({key: ">"}, group_name, consumer_name) == {
key: {stream_id1_2: [["f2", "v2"]]}
}
# deleting the consumer returns 1 since the last entry still hasn't been acknowledged
assert (
await redis_client.xgroup_del_consumer(key, group_name, consumer_name) == 1
)

# attempting to acknowledge a non-existing key returns 0
assert (
await redis_client.xack(non_existing_key, group_name, [stream_id1_0]) == 0
)
# attempting to acknowledge a non-existing group returns 0
assert await redis_client.xack(key, "non_existing_group", [stream_id1_0]) == 0
# attempting to acknowledge a non-existing ID returns 0
assert await redis_client.xack(key, group_name, ["99-99"]) == 0

# invalid arg - ID list must not be empty
with pytest.raises(RequestError):
await redis_client.xack(key, group_name, [])

# invalid arg - invalid stream ID format
with pytest.raises(RequestError):
await redis_client.xack(key, group_name, ["invalid_ID_format"])

# key exists, but it is not a stream
assert await redis_client.set(string_key, "foo") == OK
with pytest.raises(RequestError):
await redis_client.xack(string_key, group_name, [stream_id1_0])

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_pfadd(self, redis_client: TRedisClient):
Expand Down
4 changes: 3 additions & 1 deletion python/python/tests/test_transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -504,8 +504,10 @@ async def transaction_test(
{key11: ">"}, group_name1, consumer, StreamReadGroupOptions(count=5)
)
args.append({key11: {"0-2": [["foo", "bar"]]}})
transaction.xgroup_del_consumer(key11, group_name1, consumer)
transaction.xack(key11, group_name1, ["0-2"])
args.append(1)
transaction.xgroup_del_consumer(key11, group_name1, consumer)
args.append(0)
transaction.xgroup_destroy(key11, group_name1)
args.append(True)
transaction.xgroup_destroy(key11, group_name2)
Expand Down

0 comments on commit 27705a4

Please sign in to comment.