Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Python: add XGROUP SETID command #1683

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
* Python: Added XREADGROUP command ([#1679](https://github.com/aws/glide-for-redis/pull/1679))
* Python: Added XACK command ([#1681](https://github.com/aws/glide-for-redis/pull/1681))
* Python: Added FLUSHDB command ([#1680](https://github.com/aws/glide-for-redis/pull/1680))
* Python: Added XGROUP SETID command ([#1683](https://github.com/aws/glide-for-redis/pull/1683))

### Breaking Changes
* Node: Update XREAD to return a Map of Map ([#1494](https://github.com/aws/glide-for-redis/pull/1494))
Expand Down
1 change: 1 addition & 0 deletions glide-core/src/protobuf/redis_request.proto
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ enum RequestType {
FunctionDump = 196;
FunctionRestore = 197;
XPending = 198;
XGroupSetId = 199;
}

message Command {
Expand Down
3 changes: 3 additions & 0 deletions glide-core/src/request_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ pub enum RequestType {
FunctionDump = 196,
FunctionRestore = 197,
XPending = 198,
XGroupSetId = 199,
}

fn get_two_word_command(first: &str, second: &str) -> Cmd {
Expand Down Expand Up @@ -417,6 +418,7 @@ impl From<::protobuf::EnumOrUnknown<ProtobufRequestType>> for RequestType {
ProtobufRequestType::FunctionDump => RequestType::FunctionDump,
ProtobufRequestType::FunctionRestore => RequestType::FunctionRestore,
ProtobufRequestType::XPending => RequestType::XPending,
ProtobufRequestType::XGroupSetId => RequestType::XGroupSetId,
}
}
}
Expand Down Expand Up @@ -625,6 +627,7 @@ impl RequestType {
RequestType::FunctionDump => Some(get_two_word_command("FUNCTION", "DUMP")),
RequestType::FunctionRestore => Some(get_two_word_command("FUNCTION", "RESTORE")),
RequestType::XPending => Some(cmd("XPENDING")),
RequestType::XGroupSetId => Some(get_two_word_command("XGROUP", "SETID")),
}
}
}
36 changes: 36 additions & 0 deletions python/python/glide/async_commands/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2917,6 +2917,42 @@ async def xgroup_del_consumer(
),
)

async def xgroup_set_id(
self,
key: str,
group_name: str,
stream_id: str,
entries_read_id: Optional[str] = None,
) -> TOK:
"""
Set the last delivered ID for a consumer group.

See https://valkey.io/commands/xgroup-setid for more details.

Args:
key (str): The key of the stream.
group_name (str): The consumer group name.
stream_id (str): The stream entry ID that should be set as the last delivered ID for the consumer group.
entries_read_id (Optional[str]): An arbitrary ID (that isn't the first ID, last ID, or the zero ID ("0-0"))
used to find out how many entries are between the arbitrary ID (excluding it) and the stream's last
entry. This argument can only be specified if you are using Redis version 7.0.0 or above.
Comment on lines +2936 to +2938
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As more I read this, as less I understand it.
No complains to you, you just copied the doc. Do you understand how it changes the command behavior?

Copy link
Collaborator Author

@aaron-congo aaron-congo Jun 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah this is pretty hard to understand from the existing Redis docs. I think this is relevant to the XINFO GROUPS command. If I understand correctly, you can pass this to tell Redis what the last entry received was for the consumer group. Then when you execute XINFO GROUPS it can return this info to you, as well as a count of how many messages have arrived since the indicated entries_read_id


Returns:
TOK: A simple "OK" response.

Examples:
>>> await client.xgroup_set_id("mystream", "mygroup", "0")
OK # The last delivered ID for consumer group "mygroup" was set to 0.
"""
args = [key, group_name, stream_id]
if entries_read_id is not None:
args.extend(["ENTRIESREAD", entries_read_id])

return cast(
TOK,
await self._execute_command(RequestType.XGroupSetId, args),
)

async def xreadgroup(
self,
keys_and_ids: Mapping[str, str],
Expand Down
29 changes: 29 additions & 0 deletions python/python/glide/async_commands/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -2040,6 +2040,35 @@ def xgroup_del_consumer(
RequestType.XGroupDelConsumer, [key, group_name, consumer_name]
)

def xgroup_set_id(
self: TTransaction,
key: str,
group_name: str,
stream_id: str,
entries_read_id: Optional[str] = None,
) -> TTransaction:
"""
Set the last delivered ID for a consumer group.

See https://valkey.io/commands/xgroup-setid for more details.

Args:
key (str): The key of the stream.
group_name (str): The consumer group name.
stream_id (str): The stream entry ID that should be set as the last delivered ID for the consumer group.
entries_read_id (Optional[str]): An arbitrary ID (that isn't the first ID, last ID, or the zero ID ("0-0"))
used to find out how many entries are between the arbitrary ID (excluding it) and the stream's last
entry. This argument can only be specified if you are using Redis version 7.0.0 or above.

Command response:
TOK: A simple "OK" response.
"""
args = [key, group_name, stream_id]
if entries_read_id is not None:
args.extend(["ENTRIESREAD", entries_read_id])

return self.append_command(RequestType.XGroupSetId, args)

def xreadgroup(
self: TTransaction,
keys_and_ids: Mapping[str, str],
Expand Down
84 changes: 84 additions & 0 deletions python/python/tests/test_async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5562,6 +5562,90 @@ async def test_xack(
with pytest.raises(RequestError):
await redis_client.xack(string_key, group_name, [stream_id1_0])

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_xgroup_set_id(
self, redis_client: TGlideClient, cluster_mode, protocol, request
):
key = f"{{testKey}}:{get_random_string(10)}"
non_existing_key = f"{{testKey}}:{get_random_string(10)}"
string_key = f"{{testKey}}:{get_random_string(10)}"
group_name = get_random_string(10)
consumer_name = get_random_string(10)
stream_id0 = "0"
stream_id1_0 = "1-0"
stream_id1_1 = "1-1"
stream_id1_2 = "1-2"

# setup: create stream with 3 entries, create consumer group, read entries to add them to the Pending Entries
# List
assert (
await redis_client.xadd(key, [("f0", "v0")], StreamAddOptions(stream_id1_0))
== stream_id1_0
)
assert (
await redis_client.xadd(key, [("f1", "v1")], StreamAddOptions(stream_id1_1))
== stream_id1_1
)
assert (
await redis_client.xadd(key, [("f2", "v2")], StreamAddOptions(stream_id1_2))
== stream_id1_2
)
assert await redis_client.xgroup_create(key, group_name, stream_id0) == OK
assert await redis_client.xreadgroup({key: ">"}, group_name, consumer_name) == {
key: {
stream_id1_0: [["f0", "v0"]],
stream_id1_1: [["f1", "v1"]],
stream_id1_2: [["f2", "v2"]],
}
}
# sanity check: xreadgroup should not return more entries since they're all already in the Pending Entries List
assert (
await redis_client.xreadgroup({key: ">"}, group_name, consumer_name) is None
)

# reset the last delivered ID for the consumer group to "1-1"
# ENTRIESREAD is only supported in Redis version 7.0.0 and above
if await check_if_server_version_lt(redis_client, "7.0.0"):
assert await redis_client.xgroup_set_id(key, group_name, stream_id1_1) == OK
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a separate command to call to test that ENTRIESREAD has an affects the group?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think XINFO GROUPS, we can probably test this once that gets implemented

else:
assert (
await redis_client.xgroup_set_id(
key, group_name, stream_id1_1, entries_read_id=stream_id0
)
== OK
)

# the entries_read_id cannot be the first, last, or zero ID. Here we pass the first ID and assert that an
# error is raised.
with pytest.raises(RequestError):
await redis_client.xgroup_set_id(
key, group_name, stream_id1_1, entries_read_id=stream_id1_0
)

# xreadgroup should only return entry 1-2 since we reset the last delivered ID to 1-1
assert await redis_client.xreadgroup({key: ">"}, group_name, consumer_name) == {
key: {
stream_id1_2: [["f2", "v2"]],
}
}

# an error is raised if XGROUP SETID is called with a non-existing key
with pytest.raises(RequestError):
await redis_client.xgroup_set_id(non_existing_key, group_name, stream_id0)

# an error is raised if XGROUP SETID is called with a non-existing group
with pytest.raises(RequestError):
await redis_client.xgroup_set_id(key, "non_existing_group", stream_id0)

# setting the ID to a non-existing ID is allowed
assert await redis_client.xgroup_set_id(key, group_name, "99-99") == OK

# key exists, but it is not a stream
assert await redis_client.set(string_key, "foo") == OK
with pytest.raises(RequestError):
await redis_client.xgroup_set_id(string_key, group_name, stream_id0)

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_pfadd(self, redis_client: TGlideClient):
Expand Down
2 changes: 2 additions & 0 deletions python/python/tests/test_transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,8 @@ async def transaction_test(
args.append({key11: {"0-2": [["foo", "bar"]]}})
transaction.xack(key11, group_name1, ["0-2"])
args.append(1)
transaction.xgroup_set_id(key11, group_name1, "0-2")
args.append(OK)
transaction.xgroup_del_consumer(key11, group_name1, consumer)
args.append(0)
transaction.xgroup_destroy(key11, group_name1)
Expand Down
Loading