Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Java: Add XACK stream command #380

Merged
merged 8 commits into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions java/client/src/main/java/glide/api/BaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@
import static redis_request.RedisRequestOuterClass.RequestType.Type;
import static redis_request.RedisRequestOuterClass.RequestType.Unlink;
import static redis_request.RedisRequestOuterClass.RequestType.Watch;
import static redis_request.RedisRequestOuterClass.RequestType.XAck;
import static redis_request.RedisRequestOuterClass.RequestType.XAdd;
import static redis_request.RedisRequestOuterClass.RequestType.XDel;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreate;
Expand Down Expand Up @@ -1447,6 +1448,13 @@ public CompletableFuture<Map<String, Map<String, String[][]>>> xreadgroup(
return commandManager.submitNewCommand(XReadGroup, arguments, this::handleXReadResponse);
}

@Override
public CompletableFuture<Long> xack(
@NonNull String key, @NonNull String group, @NonNull String[] ids) {
String[] args = concatenateArrays(new String[] {key, group}, ids);
return commandManager.submitNewCommand(XAck, args, this::handleLongResponse);
}

@Override
public CompletableFuture<Long> pttl(@NonNull String key) {
return commandManager.submitNewCommand(PTTL, new String[] {key}, this::handleLongResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ CompletableFuture<String> xgroupCreate(
* @param consumer The newly created consumer.
* @return A <code>{@literal Map<String, Map<String, String[][]>>}</code> with stream
* keys, to <code>Map</code> of stream-ids, to an array of pairings with format <code>[[field, entry], [field, entry], ...]<code>.
* Returns code>null</code> if the consumer group does not exist. Returns a code>Map</code> with a value of code>null</code> if the stream is empty.
* Returns <code>null</code> if the consumer group does not exist. Returns a code>Map</code> with a value of code>null</code> if the stream is empty.
* @example
* <pre>{@code
* // create a new stream at "mystream", with stream id "1-0"
Expand Down Expand Up @@ -462,7 +462,7 @@ CompletableFuture<Map<String, Map<String, String[][]>>> xreadgroup(
* @param options Options detailing how to read the stream {@link StreamReadGroupOptions}.
* @return A <code>{@literal Map<String, Map<String, String[][]>>}</code> with stream
* keys, to <code>Map</code> of stream-ids, to an array of pairings with format <code>[[field, entry], [field, entry], ...]<code>.
* Returns code>null</code> if the consumer group does not exist. Returns a code>Map</code> with a value of code>null</code> if the stream is empty.
* Returns <code>null</code> if the consumer group does not exist. Returns a code>Map</code> with a value of code>null</code> if the stream is empty.
* @example
* <pre>{@code
* // create a new stream at "mystream", with stream id "1-0"
Expand Down Expand Up @@ -493,4 +493,23 @@ CompletableFuture<Map<String, Map<String, String[][]>>> xreadgroup(
String group,
String consumer,
StreamReadGroupOptions options);

/**
* Returns the number of messages that were successfully acknowledged by the consumer group member of a stream.
* This command should be called on a pending message so that such message does not get processed again.
*
* @param key The key of the stream.
* @param group The consumer group name.
* @param ids Stream entry ID to acknowledge and purge messages.
* @return The number of messages that were successfully acknowledged.
* @example
* <pre>{@code
* String entryId = client.xadd("mystream", Map.of("myfield", "mydata")).get();
* // read messages from streamId
* var readResult = client.xreadgroup(Map.of("mystream", entryId), "mygroup", "my0consumer").get();
* // acknowledge messages on stream
* assert 1L == client.xack("mystream", "mygroup", new String[] {entryId}).get();
* </pre>
*/
CompletableFuture<Long> xack(String key, String group, String[] ids);
}
21 changes: 19 additions & 2 deletions java/client/src/main/java/glide/api/models/BaseTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@
import static redis_request.RedisRequestOuterClass.RequestType.Touch;
import static redis_request.RedisRequestOuterClass.RequestType.Type;
import static redis_request.RedisRequestOuterClass.RequestType.Unlink;
import static redis_request.RedisRequestOuterClass.RequestType.XAck;
import static redis_request.RedisRequestOuterClass.RequestType.XAdd;
import static redis_request.RedisRequestOuterClass.RequestType.XDel;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreate;
Expand Down Expand Up @@ -3065,7 +3066,7 @@ public T xgroupDelConsumer(@NonNull String key, @NonNull String group, @NonNull
* @return Command Response - A <code>{@literal Map<String, Map<String, String[][]>>}</code> with
* stream keys, to <code>Map</code> of stream-ids, to an array of pairings with format <code>
* [[field, entry], [field, entry], ...]<code>.
* Returns code>null</code> if the consumer group does not exist. Returns a code>Map</code>
* Returns <code>null</code> if the consumer group does not exist. Returns a code>Map</code>
* with a value of code>null</code> if the stream is empty.
*/
public T xreadgroup(
Expand All @@ -3089,7 +3090,7 @@ public T xreadgroup(
* @return Command Response - A <code>{@literal Map<String, Map<String, String[][]>>}</code> with
* stream keys, to <code>Map</code> of stream-ids, to an array of pairings with format <code>
* [[field, entry], [field, entry], ...]<code>.
* Returns code>null</code> if the consumer group does not exist. Returns a code>Map</code>
* Returns <code>null</code> if the consumer group does not exist. Returns a code>Map</code>
* with a value of code>null</code> if the stream is empty.
*/
public T xreadgroup(
Expand All @@ -3102,6 +3103,22 @@ public T xreadgroup(
return getThis();
}

/**
* Returns the number of messages that were successfully acknowledged by the consumer group member
* of a stream. This command should be called on a pending message so that such message does not
* get processed again.
*
* @param key The key of the stream.
* @param group The consumer group name.
* @param ids Stream entry ID to acknowledge and purge messages.
* @return Command Response - The number of messages that were successfully acknowledged.
*/
public T xack(@NonNull String key, @NonNull String group, @NonNull String[] ids) {
String[] args = concatenateArrays(new String[] {key, group}, ids);
protobufTransaction.addCommands(buildCommand(XAck, buildArgs(args)));
return getThis();
}

/**
* Returns the remaining time to live of <code>key</code> that has a timeout, in milliseconds.
*
Expand Down
27 changes: 27 additions & 0 deletions java/client/src/test/java/glide/api/RedisClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@
import static redis_request.RedisRequestOuterClass.RequestType.UnWatch;
import static redis_request.RedisRequestOuterClass.RequestType.Unlink;
import static redis_request.RedisRequestOuterClass.RequestType.Watch;
import static redis_request.RedisRequestOuterClass.RequestType.XAck;
import static redis_request.RedisRequestOuterClass.RequestType.XAdd;
import static redis_request.RedisRequestOuterClass.RequestType.XDel;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreate;
Expand Down Expand Up @@ -4612,6 +4613,32 @@ public void xreadgroup_with_options() {
assertEquals(completedResult, payload);
}

@SneakyThrows
@Test
public void xack_returns_success() {
// setup
String key = "testKey";
String groupName = "testGroupName";
String[] ids = new String[] {"testId"};
String[] arguments = concatenateArrays(new String[] {key, groupName}, ids);
Long mockResult = 1L;

CompletableFuture<Long> testResponse = new CompletableFuture<>();
testResponse.complete(mockResult);

// match on protobuf request
when(commandManager.<Long>submitNewCommand(eq(XAck), eq(arguments), any()))
.thenReturn(testResponse);

// exercise
CompletableFuture<Long> response = service.xack(key, groupName, ids);
Long payload = response.get();

// verify
assertEquals(testResponse, response);
assertEquals(mockResult, payload);
}

@SneakyThrows
@Test
public void type_returns_success() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@
import static redis_request.RedisRequestOuterClass.RequestType.Touch;
import static redis_request.RedisRequestOuterClass.RequestType.Type;
import static redis_request.RedisRequestOuterClass.RequestType.Unlink;
import static redis_request.RedisRequestOuterClass.RequestType.XAck;
import static redis_request.RedisRequestOuterClass.RequestType.XAdd;
import static redis_request.RedisRequestOuterClass.RequestType.XDel;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreate;
Expand Down Expand Up @@ -818,6 +819,9 @@ InfScoreBound.NEGATIVE_INFINITY, new ScoreBoundary(3, false), new Limit(1, 2)),
"key",
"id")));

transaction.xack("key", "group", new String[] {"12345-1", "98765-4"});
results.add(Pair.of(XAck, buildArgs("key", "group", "12345-1", "98765-4")));

transaction.time();
results.add(Pair.of(Time, buildArgs()));

Expand Down
68 changes: 56 additions & 12 deletions java/integTest/src/test/java/glide/SharedCommandTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -3481,7 +3481,7 @@ public void xgroupCreate_xgroupDestroy(BaseClient client) {
@SneakyThrows
@ParameterizedTest(autoCloseArguments = false)
@MethodSource("getClients")
public void xgroupCreateConsumer_xreadgroup_xgroupDelConsumer(BaseClient client) {
public void xgroupCreateConsumer_xgroupDelConsumer_xreadgroup_xack(BaseClient client) {
String key = UUID.randomUUID().toString();
String stringKey = UUID.randomUUID().toString();
String groupName = "group" + UUID.randomUUID();
Expand Down Expand Up @@ -3530,7 +3530,7 @@ public void xgroupCreateConsumer_xreadgroup_xgroupDelConsumer(BaseClient client)
// delete one of the streams
assertEquals(1L, client.xdel(key, new String[] {streamid_1}).get());

// now xreadgroup yeilds one empty stream and one non-empty stream
// now xreadgroup returns one empty stream and one non-empty stream
var result_2 = client.xreadgroup(Map.of(key, "0"), groupName, consumerName).get();
assertEquals(2, result_2.get(key).size());
assertNull(result_2.get(key).get(streamid_1));
Expand All @@ -3539,13 +3539,22 @@ public void xgroupCreateConsumer_xreadgroup_xgroupDelConsumer(BaseClient client)
String streamid_3 = client.xadd(key, Map.of("field3", "value3")).get();
assertNotNull(streamid_3);

// Delete the consumer group and expect 2 pending messages
assertEquals(2L, client.xgroupDelConsumer(key, groupName, consumerName).get());
// xack that streamid_1, and streamid_2 was received
assertEquals(2L, client.xack(key, groupName, new String[] {streamid_1, streamid_2}).get());

// Delete the consumer group and expect 1 pending messages (one was received)
assertEquals(0L, client.xgroupDelConsumer(key, groupName, consumerName).get());

// xack streamid_1, and streamid_2 already received returns 0L
assertEquals(0L, client.xack(key, groupName, new String[] {streamid_1, streamid_2}).get());

// Consume the last message with the previously deleted consumer (creates the consumer anew)
var result_3 = client.xreadgroup(Map.of(key, ">"), groupName, consumerName).get();
assertEquals(1, result_3.get(key).size());

// wrong group, so xack streamid_3 returns 0
assertEquals(0L, client.xack(key, "not_a_group", new String[] {streamid_3}).get());

// Delete the consumer group and expect the pending message
assertEquals(1L, client.xgroupDelConsumer(key, groupName, consumerName).get());

Expand All @@ -3570,19 +3579,15 @@ public void xgroupCreateConsumer_xreadgroup_xgroupDelConsumer(BaseClient client)
public void xreadgroup_return_failures(BaseClient client) {
String key = "{key}:1" + UUID.randomUUID();
String nonStreamKey = "{key}:3" + UUID.randomUUID();
String field1 = "f1_";
String groupName = "group" + UUID.randomUUID();
String zeroStreamId = "0";
String consumerName = "consumer" + UUID.randomUUID();

// setup first entries in streams key1 and key2
Map<String, String> timestamp_1_1_map = new LinkedHashMap<>();
timestamp_1_1_map.put(field1, field1 + "1");
String timestamp_1_1 =
client.xadd(key, timestamp_1_1_map, StreamAddOptions.builder().id("1-1").build()).get();
client.xadd(key, Map.of("f1", "v1"), StreamAddOptions.builder().id("1-1").build()).get();
assertNotNull(timestamp_1_1);

String groupName = "group" + UUID.randomUUID();
String zeroStreamId = "0";
String consumerName = "consumer" + UUID.randomUUID();

// create group and consumer for the group
assertEquals(
OK,
Expand Down Expand Up @@ -3682,6 +3687,45 @@ public void xreadgroup_return_failures(BaseClient client) {
}
}

@SneakyThrows
@ParameterizedTest(autoCloseArguments = false)
@MethodSource("getClients")
public void xack_return_failures(BaseClient client) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should rename since it doesn't return all failures

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all the xack calls return failures?

String key = "{key}:1" + UUID.randomUUID();
String nonStreamKey = "{key}:3" + UUID.randomUUID();
String groupName = "group" + UUID.randomUUID();
String zeroStreamId = "0";
String consumerName = "consumer" + UUID.randomUUID();

// setup first entries in streams key1 and key2
String timestamp_1_1 =
client.xadd(key, Map.of("f1", "v1"), StreamAddOptions.builder().id("1-1").build()).get();
assertNotNull(timestamp_1_1);

// create group and consumer for the group
assertEquals(
OK,
client
.xgroupCreate(
key, groupName, zeroStreamId, StreamGroupOptions.builder().makeStream().build())
.get());
assertTrue(client.xgroupCreateConsumer(key, groupName, consumerName).get());

// Empty entity id list throws a RequestException
ExecutionException executionException =
assertThrows(
ExecutionException.class, () -> client.xack(key, groupName, new String[0]).get());
assertInstanceOf(RequestException.class, executionException.getCause());

// Key exists, but it is not a stream
assertEquals(OK, client.set(nonStreamKey, "bar").get());
executionException =
assertThrows(
ExecutionException.class,
() -> client.xack(nonStreamKey, groupName, new String[] {zeroStreamId}).get());
assertInstanceOf(RequestException.class, executionException.getCause());
}

@SneakyThrows
@ParameterizedTest(autoCloseArguments = false)
@MethodSource("getClients")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,6 @@ private static Object[] streamCommands(BaseTransaction<?> transaction) {
final String groupName1 = "{groupName}-1-" + UUID.randomUUID();
final String groupName2 = "{groupName}-2-" + UUID.randomUUID();
final String consumer1 = "{consumer}-1-" + UUID.randomUUID();
final String consumer2 = "{consumer}-2-" + UUID.randomUUID();

transaction
.xadd(streamKey1, Map.of("field1", "value1"), StreamAddOptions.builder().id("0-1").build())
Expand All @@ -755,6 +754,7 @@ private static Object[] streamCommands(BaseTransaction<?> transaction) {
groupName1,
consumer1,
StreamReadGroupOptions.builder().count(2L).build())
.xack(streamKey1, groupName1, new String[] {"0-3"})
.xgroupDelConsumer(streamKey1, groupName1, consumer1)
.xgroupDestroy(streamKey1, groupName1)
.xgroupDestroy(streamKey1, groupName2)
Expand Down Expand Up @@ -792,7 +792,8 @@ private static Object[] streamCommands(BaseTransaction<?> transaction) {
Map.of(
streamKey1,
Map.of()), // xreadgroup(Map.of(streamKey1, ">"), groupName1, consumer1, options);
1L, // xgroupDelConsumer(streamKey1, groupName1, consumer1)
1L, // xack(streamKey1, groupName1, new String[] {"0-3"})
0L, // xgroupDelConsumer(streamKey1, groupName1, consumer1)
true, // xgroupDestroy(streamKey1, groupName1)
true, // xgroupDestroy(streamKey1, groupName2)
1L, // .xdel(streamKey1, new String[] {"0-1", "0-5"});
Expand Down
Loading