diff --git a/CHANGELOG.md b/CHANGELOG.md index f09725bcdb..1bcb350c6e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -58,6 +58,7 @@ * Python: Added XREADGROUP command ([#1679](https://github.com/aws/glide-for-redis/pull/1679)) * Python: Added XACK command ([#1681](https://github.com/aws/glide-for-redis/pull/1681)) * Python: Added FLUSHDB command ([#1680](https://github.com/aws/glide-for-redis/pull/1680)) +* Python: Added XGROUP SETID command ([#1683](https://github.com/aws/glide-for-redis/pull/1683)) ### Breaking Changes * Node: Update XREAD to return a Map of Map ([#1494](https://github.com/aws/glide-for-redis/pull/1494)) diff --git a/glide-core/src/protobuf/redis_request.proto b/glide-core/src/protobuf/redis_request.proto index e3d2b01b2c..b3330959ec 100644 --- a/glide-core/src/protobuf/redis_request.proto +++ b/glide-core/src/protobuf/redis_request.proto @@ -237,6 +237,7 @@ enum RequestType { FunctionDump = 196; FunctionRestore = 197; XPending = 198; + XGroupSetId = 199; } message Command { diff --git a/glide-core/src/request_type.rs b/glide-core/src/request_type.rs index 805db2dc91..943721bfd3 100644 --- a/glide-core/src/request_type.rs +++ b/glide-core/src/request_type.rs @@ -207,6 +207,7 @@ pub enum RequestType { FunctionDump = 196, FunctionRestore = 197, XPending = 198, + XGroupSetId = 199, } fn get_two_word_command(first: &str, second: &str) -> Cmd { @@ -417,6 +418,7 @@ impl From<::protobuf::EnumOrUnknown> for RequestType { ProtobufRequestType::FunctionDump => RequestType::FunctionDump, ProtobufRequestType::FunctionRestore => RequestType::FunctionRestore, ProtobufRequestType::XPending => RequestType::XPending, + ProtobufRequestType::XGroupSetId => RequestType::XGroupSetId, } } } @@ -625,6 +627,7 @@ impl RequestType { RequestType::FunctionDump => Some(get_two_word_command("FUNCTION", "DUMP")), RequestType::FunctionRestore => Some(get_two_word_command("FUNCTION", "RESTORE")), RequestType::XPending => Some(cmd("XPENDING")), + RequestType::XGroupSetId => Some(get_two_word_command("XGROUP", "SETID")), } } } diff --git a/python/python/glide/async_commands/core.py b/python/python/glide/async_commands/core.py index 330a9018d3..d88fd8352e 100644 --- a/python/python/glide/async_commands/core.py +++ b/python/python/glide/async_commands/core.py @@ -2917,6 +2917,42 @@ async def xgroup_del_consumer( ), ) + async def xgroup_set_id( + self, + key: str, + group_name: str, + stream_id: str, + entries_read_id: Optional[str] = None, + ) -> TOK: + """ + Set the last delivered ID for a consumer group. + + See https://valkey.io/commands/xgroup-setid for more details. + + Args: + key (str): The key of the stream. + group_name (str): The consumer group name. + stream_id (str): The stream entry ID that should be set as the last delivered ID for the consumer group. + entries_read_id (Optional[str]): An arbitrary ID (that isn't the first ID, last ID, or the zero ID ("0-0")) + used to find out how many entries are between the arbitrary ID (excluding it) and the stream's last + entry. This argument can only be specified if you are using Redis version 7.0.0 or above. + + Returns: + TOK: A simple "OK" response. + + Examples: + >>> await client.xgroup_set_id("mystream", "mygroup", "0") + OK # The last delivered ID for consumer group "mygroup" was set to 0. + """ + args = [key, group_name, stream_id] + if entries_read_id is not None: + args.extend(["ENTRIESREAD", entries_read_id]) + + return cast( + TOK, + await self._execute_command(RequestType.XGroupSetId, args), + ) + async def xreadgroup( self, keys_and_ids: Mapping[str, str], diff --git a/python/python/glide/async_commands/transaction.py b/python/python/glide/async_commands/transaction.py index 36ce987ea6..71ca2f2e18 100644 --- a/python/python/glide/async_commands/transaction.py +++ b/python/python/glide/async_commands/transaction.py @@ -2040,6 +2040,35 @@ def xgroup_del_consumer( RequestType.XGroupDelConsumer, [key, group_name, consumer_name] ) + def xgroup_set_id( + self: TTransaction, + key: str, + group_name: str, + stream_id: str, + entries_read_id: Optional[str] = None, + ) -> TTransaction: + """ + Set the last delivered ID for a consumer group. + + See https://valkey.io/commands/xgroup-setid for more details. + + Args: + key (str): The key of the stream. + group_name (str): The consumer group name. + stream_id (str): The stream entry ID that should be set as the last delivered ID for the consumer group. + entries_read_id (Optional[str]): An arbitrary ID (that isn't the first ID, last ID, or the zero ID ("0-0")) + used to find out how many entries are between the arbitrary ID (excluding it) and the stream's last + entry. This argument can only be specified if you are using Redis version 7.0.0 or above. + + Command response: + TOK: A simple "OK" response. + """ + args = [key, group_name, stream_id] + if entries_read_id is not None: + args.extend(["ENTRIESREAD", entries_read_id]) + + return self.append_command(RequestType.XGroupSetId, args) + def xreadgroup( self: TTransaction, keys_and_ids: Mapping[str, str], diff --git a/python/python/tests/test_async_client.py b/python/python/tests/test_async_client.py index b5134b2248..11155191ea 100644 --- a/python/python/tests/test_async_client.py +++ b/python/python/tests/test_async_client.py @@ -5562,6 +5562,90 @@ async def test_xack( with pytest.raises(RequestError): await redis_client.xack(string_key, group_name, [stream_id1_0]) + @pytest.mark.parametrize("cluster_mode", [True, False]) + @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) + async def test_xgroup_set_id( + self, redis_client: TGlideClient, cluster_mode, protocol, request + ): + key = f"{{testKey}}:{get_random_string(10)}" + non_existing_key = f"{{testKey}}:{get_random_string(10)}" + string_key = f"{{testKey}}:{get_random_string(10)}" + group_name = get_random_string(10) + consumer_name = get_random_string(10) + stream_id0 = "0" + stream_id1_0 = "1-0" + stream_id1_1 = "1-1" + stream_id1_2 = "1-2" + + # setup: create stream with 3 entries, create consumer group, read entries to add them to the Pending Entries + # List + assert ( + await redis_client.xadd(key, [("f0", "v0")], StreamAddOptions(stream_id1_0)) + == stream_id1_0 + ) + assert ( + await redis_client.xadd(key, [("f1", "v1")], StreamAddOptions(stream_id1_1)) + == stream_id1_1 + ) + assert ( + await redis_client.xadd(key, [("f2", "v2")], StreamAddOptions(stream_id1_2)) + == stream_id1_2 + ) + assert await redis_client.xgroup_create(key, group_name, stream_id0) == OK + assert await redis_client.xreadgroup({key: ">"}, group_name, consumer_name) == { + key: { + stream_id1_0: [["f0", "v0"]], + stream_id1_1: [["f1", "v1"]], + stream_id1_2: [["f2", "v2"]], + } + } + # sanity check: xreadgroup should not return more entries since they're all already in the Pending Entries List + assert ( + await redis_client.xreadgroup({key: ">"}, group_name, consumer_name) is None + ) + + # reset the last delivered ID for the consumer group to "1-1" + # ENTRIESREAD is only supported in Redis version 7.0.0 and above + if await check_if_server_version_lt(redis_client, "7.0.0"): + assert await redis_client.xgroup_set_id(key, group_name, stream_id1_1) == OK + else: + assert ( + await redis_client.xgroup_set_id( + key, group_name, stream_id1_1, entries_read_id=stream_id0 + ) + == OK + ) + + # the entries_read_id cannot be the first, last, or zero ID. Here we pass the first ID and assert that an + # error is raised. + with pytest.raises(RequestError): + await redis_client.xgroup_set_id( + key, group_name, stream_id1_1, entries_read_id=stream_id1_0 + ) + + # xreadgroup should only return entry 1-2 since we reset the last delivered ID to 1-1 + assert await redis_client.xreadgroup({key: ">"}, group_name, consumer_name) == { + key: { + stream_id1_2: [["f2", "v2"]], + } + } + + # an error is raised if XGROUP SETID is called with a non-existing key + with pytest.raises(RequestError): + await redis_client.xgroup_set_id(non_existing_key, group_name, stream_id0) + + # an error is raised if XGROUP SETID is called with a non-existing group + with pytest.raises(RequestError): + await redis_client.xgroup_set_id(key, "non_existing_group", stream_id0) + + # setting the ID to a non-existing ID is allowed + assert await redis_client.xgroup_set_id(key, group_name, "99-99") == OK + + # key exists, but it is not a stream + assert await redis_client.set(string_key, "foo") == OK + with pytest.raises(RequestError): + await redis_client.xgroup_set_id(string_key, group_name, stream_id0) + @pytest.mark.parametrize("cluster_mode", [True, False]) @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) async def test_pfadd(self, redis_client: TGlideClient): diff --git a/python/python/tests/test_transaction.py b/python/python/tests/test_transaction.py index aa64213e8a..89673c7dd3 100644 --- a/python/python/tests/test_transaction.py +++ b/python/python/tests/test_transaction.py @@ -506,6 +506,8 @@ async def transaction_test( args.append({key11: {"0-2": [["foo", "bar"]]}}) transaction.xack(key11, group_name1, ["0-2"]) args.append(1) + transaction.xgroup_set_id(key11, group_name1, "0-2") + args.append(OK) transaction.xgroup_del_consumer(key11, group_name1, consumer) args.append(0) transaction.xgroup_destroy(key11, group_name1)