-
Notifications
You must be signed in to change notification settings - Fork 65
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Python: add XGROUP SETID command #1683
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5562,6 +5562,90 @@ async def test_xack( | |
with pytest.raises(RequestError): | ||
await redis_client.xack(string_key, group_name, [stream_id1_0]) | ||
|
||
@pytest.mark.parametrize("cluster_mode", [True, False]) | ||
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) | ||
async def test_xgroup_set_id( | ||
self, redis_client: TGlideClient, cluster_mode, protocol, request | ||
): | ||
key = f"{{testKey}}:{get_random_string(10)}" | ||
non_existing_key = f"{{testKey}}:{get_random_string(10)}" | ||
string_key = f"{{testKey}}:{get_random_string(10)}" | ||
group_name = get_random_string(10) | ||
consumer_name = get_random_string(10) | ||
stream_id0 = "0" | ||
stream_id1_0 = "1-0" | ||
stream_id1_1 = "1-1" | ||
stream_id1_2 = "1-2" | ||
|
||
# setup: create stream with 3 entries, create consumer group, read entries to add them to the Pending Entries | ||
# List | ||
assert ( | ||
await redis_client.xadd(key, [("f0", "v0")], StreamAddOptions(stream_id1_0)) | ||
== stream_id1_0 | ||
) | ||
assert ( | ||
await redis_client.xadd(key, [("f1", "v1")], StreamAddOptions(stream_id1_1)) | ||
== stream_id1_1 | ||
) | ||
assert ( | ||
await redis_client.xadd(key, [("f2", "v2")], StreamAddOptions(stream_id1_2)) | ||
== stream_id1_2 | ||
) | ||
assert await redis_client.xgroup_create(key, group_name, stream_id0) == OK | ||
assert await redis_client.xreadgroup({key: ">"}, group_name, consumer_name) == { | ||
key: { | ||
stream_id1_0: [["f0", "v0"]], | ||
stream_id1_1: [["f1", "v1"]], | ||
stream_id1_2: [["f2", "v2"]], | ||
} | ||
} | ||
# sanity check: xreadgroup should not return more entries since they're all already in the Pending Entries List | ||
assert ( | ||
await redis_client.xreadgroup({key: ">"}, group_name, consumer_name) is None | ||
) | ||
|
||
# reset the last delivered ID for the consumer group to "1-1" | ||
# ENTRIESREAD is only supported in Redis version 7.0.0 and above | ||
if await check_if_server_version_lt(redis_client, "7.0.0"): | ||
assert await redis_client.xgroup_set_id(key, group_name, stream_id1_1) == OK | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a separate command to call to test that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think XINFO GROUPS, we can probably test this once that gets implemented |
||
else: | ||
assert ( | ||
await redis_client.xgroup_set_id( | ||
key, group_name, stream_id1_1, entries_read_id=stream_id0 | ||
) | ||
== OK | ||
) | ||
|
||
# the entries_read_id cannot be the first, last, or zero ID. Here we pass the first ID and assert that an | ||
# error is raised. | ||
with pytest.raises(RequestError): | ||
await redis_client.xgroup_set_id( | ||
key, group_name, stream_id1_1, entries_read_id=stream_id1_0 | ||
) | ||
|
||
# xreadgroup should only return entry 1-2 since we reset the last delivered ID to 1-1 | ||
assert await redis_client.xreadgroup({key: ">"}, group_name, consumer_name) == { | ||
key: { | ||
stream_id1_2: [["f2", "v2"]], | ||
} | ||
} | ||
|
||
# an error is raised if XGROUP SETID is called with a non-existing key | ||
with pytest.raises(RequestError): | ||
await redis_client.xgroup_set_id(non_existing_key, group_name, stream_id0) | ||
|
||
# an error is raised if XGROUP SETID is called with a non-existing group | ||
with pytest.raises(RequestError): | ||
await redis_client.xgroup_set_id(key, "non_existing_group", stream_id0) | ||
|
||
# setting the ID to a non-existing ID is allowed | ||
assert await redis_client.xgroup_set_id(key, group_name, "99-99") == OK | ||
|
||
# key exists, but it is not a stream | ||
assert await redis_client.set(string_key, "foo") == OK | ||
with pytest.raises(RequestError): | ||
await redis_client.xgroup_set_id(string_key, group_name, stream_id0) | ||
|
||
@pytest.mark.parametrize("cluster_mode", [True, False]) | ||
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) | ||
async def test_pfadd(self, redis_client: TGlideClient): | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As more I read this, as less I understand it.
No complains to you, you just copied the doc. Do you understand how it changes the command behavior?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah this is pretty hard to understand from the existing Redis docs. I think this is relevant to the XINFO GROUPS command. If I understand correctly, you can pass this to tell Redis what the last entry received was for the consumer group. Then when you execute XINFO GROUPS it can return this info to you, as well as a count of how many messages have arrived since the indicated entries_read_id