From 82042a96623af1e75d4d9a6957f7f285a4445b3b Mon Sep 17 00:00:00 2001 From: Andrew Carbonetto Date: Wed, 19 Jun 2024 15:34:04 -0700 Subject: [PATCH 1/8] Java: Add the `XREADGROUP` command (#376) * Add XGROUP CreateConsumer, DelConsumer Signed-off-by: Andrew Carbonetto * Add XREADGROUP command Signed-off-by: Andrew Carbonetto * Udpate IT tests Signed-off-by: Andrew Carbonetto * Fix IT tests Signed-off-by: Andrew Carbonetto * SPOTLESS & merge conflict fix Signed-off-by: Andrew Carbonetto * Update for review comments Signed-off-by: Andrew Carbonetto --------- Signed-off-by: Andrew Carbonetto --- .../api/commands/StreamBaseCommands.java | 4 +- .../glide/api/models/BaseTransaction.java | 4 +- .../test/java/glide/SharedCommandTests.java | 204 ++++++++++++++++++ 3 files changed, 208 insertions(+), 4 deletions(-) diff --git a/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java b/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java index 1c10c2b992..270878843e 100644 --- a/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java +++ b/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java @@ -422,7 +422,7 @@ CompletableFuture xgroupCreate( * @param consumer The newly created consumer. * @return A {@literal Map>} with stream * keys, to Map of stream-ids, to an array of pairings with format [[field, entry], [field, entry], ...]. - * Returns code>null if the consumer group does not exist. Returns a code>Map with a value of code>null if the stream is empty. + * Returns null if the consumer group does not exist. Returns a code>Map with a value of code>null if the stream is empty. * @example *
{@code
      * // create a new stream at "mystream", with stream id "1-0"
@@ -462,7 +462,7 @@ CompletableFuture>> xreadgroup(
      * @param options Options detailing how to read the stream {@link StreamReadGroupOptions}.
      * @return A {@literal Map>} with stream
      *      keys, to Map of stream-ids, to an array of pairings with format [[field, entry], [field, entry], ...].
-     *      Returns code>null if the consumer group does not exist. Returns a code>Map with a value of code>null if the stream is empty.
+     *      Returns null if the consumer group does not exist. Returns a code>Map with a value of code>null if the stream is empty.
      * @example
      *     
{@code
      * // create a new stream at "mystream", with stream id "1-0"
diff --git a/java/client/src/main/java/glide/api/models/BaseTransaction.java b/java/client/src/main/java/glide/api/models/BaseTransaction.java
index 1f1255a27c..f20b20fd6f 100644
--- a/java/client/src/main/java/glide/api/models/BaseTransaction.java
+++ b/java/client/src/main/java/glide/api/models/BaseTransaction.java
@@ -3065,7 +3065,7 @@ public T xgroupDelConsumer(@NonNull String key, @NonNull String group, @NonNull
      * @return Command Response - A {@literal Map>} with
      *     stream keys, to Map of stream-ids, to an array of pairings with format 
      *     [[field, entry], [field, entry], ...].
-     *     Returns code>null if the consumer group does not exist. Returns a code>Map
+     *     Returns null if the consumer group does not exist. Returns a code>Map
      *     with a value of code>null if the stream is empty.
      */
     public T xreadgroup(
@@ -3089,7 +3089,7 @@ public T xreadgroup(
      * @return Command Response - A {@literal Map>} with
      *     stream keys, to Map of stream-ids, to an array of pairings with format 
      *     [[field, entry], [field, entry], ...].
-     *     Returns code>null if the consumer group does not exist. Returns a code>Map
+     *     Returns null if the consumer group does not exist. Returns a code>Map
      *     with a value of code>null if the stream is empty.
      */
     public T xreadgroup(
diff --git a/java/integTest/src/test/java/glide/SharedCommandTests.java b/java/integTest/src/test/java/glide/SharedCommandTests.java
index 7ba5ddff22..7c068b6b5f 100644
--- a/java/integTest/src/test/java/glide/SharedCommandTests.java
+++ b/java/integTest/src/test/java/glide/SharedCommandTests.java
@@ -3564,6 +3564,210 @@ public void xgroupCreateConsumer_xreadgroup_xgroupDelConsumer(BaseClient client)
         assertInstanceOf(RequestException.class, executionException.getCause());
     }
 
+    @SneakyThrows
+    @ParameterizedTest(autoCloseArguments = false)
+    @MethodSource("getClients")
+    public void xgroupCreateConsumer_xreadgroup_xgroupDelConsumer(BaseClient client) {
+        String key = UUID.randomUUID().toString();
+        String stringKey = UUID.randomUUID().toString();
+        String groupName = "group" + UUID.randomUUID();
+        String zeroStreamId = "0";
+        String consumerName = "consumer" + UUID.randomUUID();
+
+        // create group and consumer for the group
+        assertEquals(
+                OK,
+                client
+                        .xgroupCreate(
+                                key, groupName, zeroStreamId, StreamGroupOptions.builder().makeStream().build())
+                        .get());
+        assertTrue(client.xgroupCreateConsumer(key, groupName, consumerName).get());
+
+        // create consumer for group that does not exist results in a NOGROUP request error
+        ExecutionException executionException =
+                assertThrows(
+                        ExecutionException.class,
+                        () -> client.xgroupCreateConsumer(key, "not_a_group", consumerName).get());
+        assertInstanceOf(RequestException.class, executionException.getCause());
+        assertTrue(executionException.getMessage().contains("NOGROUP"));
+
+        // create consumer for group again
+        assertFalse(client.xgroupCreateConsumer(key, groupName, consumerName).get());
+
+        // Deletes a consumer that is not created yet returns 0
+        assertEquals(0L, client.xgroupDelConsumer(key, groupName, "not_a_consumer").get());
+
+        // Add two stream entries
+        String streamid_1 = client.xadd(key, Map.of("field1", "value1")).get();
+        assertNotNull(streamid_1);
+        String streamid_2 = client.xadd(key, Map.of("field2", "value2")).get();
+        assertNotNull(streamid_2);
+
+        // read the entire stream for the consumer and mark messages as pending
+        var result_1 = client.xreadgroup(Map.of(key, ">"), groupName, consumerName).get();
+        assertDeepEquals(
+                Map.of(
+                        key,
+                        Map.of(
+                                streamid_1, new String[][] {{"field1", "value1"}},
+                                streamid_2, new String[][] {{"field2", "value2"}})),
+                result_1);
+
+        // delete one of the streams
+        assertEquals(1L, client.xdel(key, new String[] {streamid_1}).get());
+
+        // now xreadgroup yeilds one empty stream and one non-empty stream
+        var result_2 = client.xreadgroup(Map.of(key, "0"), groupName, consumerName).get();
+        assertEquals(2, result_2.get(key).size());
+        assertNull(result_2.get(key).get(streamid_1));
+        assertArrayEquals(new String[][] {{"field2", "value2"}}, result_2.get(key).get(streamid_2));
+
+        String streamid_3 = client.xadd(key, Map.of("field3", "value3")).get();
+        assertNotNull(streamid_3);
+
+        // Delete the consumer group and expect 2 pending messages
+        assertEquals(2L, client.xgroupDelConsumer(key, groupName, consumerName).get());
+
+        // Consume the last message with the previously deleted consumer (creates the consumer anew)
+        var result_3 = client.xreadgroup(Map.of(key, ">"), groupName, consumerName).get();
+        assertEquals(1, result_3.get(key).size());
+
+        // Delete the consumer group and expect the pending message
+        assertEquals(1L, client.xgroupDelConsumer(key, groupName, consumerName).get());
+
+        // key is a string and cannot be created as a stream
+        assertEquals(OK, client.set(stringKey, "not_a_stream").get());
+        executionException =
+                assertThrows(
+                        ExecutionException.class,
+                        () -> client.xgroupCreateConsumer(stringKey, groupName, consumerName).get());
+        assertInstanceOf(RequestException.class, executionException.getCause());
+
+        executionException =
+                assertThrows(
+                        ExecutionException.class,
+                        () -> client.xgroupDelConsumer(stringKey, groupName, consumerName).get());
+        assertInstanceOf(RequestException.class, executionException.getCause());
+    }
+
+    @SneakyThrows
+    @ParameterizedTest(autoCloseArguments = false)
+    @MethodSource("getClients")
+    public void xreadgroup_return_failures(BaseClient client) {
+        String key = "{key}:1" + UUID.randomUUID();
+        String nonStreamKey = "{key}:3" + UUID.randomUUID();
+        String field1 = "f1_";
+
+        // setup first entries in streams key1 and key2
+        Map timestamp_1_1_map = new LinkedHashMap<>();
+        timestamp_1_1_map.put(field1, field1 + "1");
+        String timestamp_1_1 =
+                client.xadd(key, timestamp_1_1_map, StreamAddOptions.builder().id("1-1").build()).get();
+        assertNotNull(timestamp_1_1);
+
+        String groupName = "group" + UUID.randomUUID();
+        String zeroStreamId = "0";
+        String consumerName = "consumer" + UUID.randomUUID();
+
+        // create group and consumer for the group
+        assertEquals(
+                OK,
+                client
+                        .xgroupCreate(
+                                key, groupName, zeroStreamId, StreamGroupOptions.builder().makeStream().build())
+                        .get());
+        assertTrue(client.xgroupCreateConsumer(key, groupName, consumerName).get());
+
+        // First key exists, but it is not a stream
+        assertEquals(OK, client.set(nonStreamKey, "bar").get());
+        ExecutionException executionException =
+                assertThrows(
+                        ExecutionException.class,
+                        () ->
+                                client
+                                        .xreadgroup(
+                                                Map.of(nonStreamKey, timestamp_1_1, key, timestamp_1_1),
+                                                groupName,
+                                                consumerName)
+                                        .get());
+        assertInstanceOf(RequestException.class, executionException.getCause());
+
+        // Second key exists, but it is not a stream
+        executionException =
+                assertThrows(
+                        ExecutionException.class,
+                        () ->
+                                client
+                                        .xreadgroup(
+                                                Map.of(key, timestamp_1_1, nonStreamKey, timestamp_1_1),
+                                                groupName,
+                                                consumerName)
+                                        .get());
+        assertInstanceOf(RequestException.class, executionException.getCause());
+
+        try (var testClient =
+                client instanceof RedisClient
+                        ? RedisClient.CreateClient(commonClientConfig().build()).get()
+                        : RedisClusterClient.CreateClient(commonClusterClientConfig().build()).get()) {
+            String timeoutKey = "{key}:2" + UUID.randomUUID();
+            String timeoutGroupName = "group" + UUID.randomUUID();
+            String timeoutConsumerName = "consumer" + UUID.randomUUID();
+
+            // Create a group read with the test client
+            // add a single stream entry and consumer
+            // the first call to ">" will return an update consumer group
+            // the second call to ">" will block waiting for new entries
+            // using anything other than ">" won't block, but will return the empty consumer result
+            // see: https://github.com/redis/redis/issues/6587
+            assertEquals(
+                    OK,
+                    testClient
+                            .xgroupCreate(
+                                    timeoutKey,
+                                    timeoutGroupName,
+                                    zeroStreamId,
+                                    StreamGroupOptions.builder().makeStream().build())
+                            .get());
+            assertTrue(
+                    testClient.xgroupCreateConsumer(timeoutKey, timeoutGroupName, timeoutConsumerName).get());
+            String streamid_1 = testClient.xadd(timeoutKey, Map.of("field1", "value1")).get();
+            assertNotNull(streamid_1);
+
+            // read the entire stream for the consumer and mark messages as pending
+            var result_1 =
+                    testClient
+                            .xreadgroup(Map.of(timeoutKey, ">"), timeoutGroupName, timeoutConsumerName)
+                            .get();
+            // returns an null result on the key
+            assertNull(result_1.get(key));
+
+            // subsequent calls to read ">" will block:
+            // ensure that command doesn't time out even if timeout > request timeout
+            long oneSecondInMS = 1000L;
+            assertNull(
+                    testClient
+                            .xreadgroup(
+                                    Map.of(timeoutKey, ">"),
+                                    timeoutGroupName,
+                                    timeoutConsumerName,
+                                    StreamReadGroupOptions.builder().block(oneSecondInMS).build())
+                            .get());
+
+            // with 0 timeout (no timeout) should never time out,
+            // but we wrap the test with timeout to avoid test failing or stuck forever
+            assertThrows(
+                    TimeoutException.class, // <- future timeout, not command timeout
+                    () ->
+                            testClient
+                                    .xreadgroup(
+                                            Map.of(timeoutKey, ">"),
+                                            timeoutGroupName,
+                                            timeoutConsumerName,
+                                            StreamReadGroupOptions.builder().block(0L).build())
+                                    .get(3, TimeUnit.SECONDS));
+        }
+    }
+
     @SneakyThrows
     @ParameterizedTest(autoCloseArguments = false)
     @MethodSource("getClients")

From d334f0cffe15ca53eed221711615cafa21b622b4 Mon Sep 17 00:00:00 2001
From: Guian Gumpac 
Date: Mon, 17 Jun 2024 11:11:57 -0700
Subject: [PATCH 2/8] Added implementation and incomplete transactiontests

---
 .../src/main/java/glide/api/BaseClient.java   |  7 +++
 .../api/commands/StreamBaseCommands.java      | 10 ++++
 .../glide/api/models/BaseTransaction.java     | 15 ++++++
 .../test/java/glide/api/RedisClientTest.java  | 27 ++++++++++
 .../glide/api/models/TransactionTests.java    |  4 ++
 .../test/java/glide/SharedCommandTests.java   | 49 +++++++++++++++++++
 .../java/glide/TransactionTestUtilities.java  | 13 ++++-
 7 files changed, 124 insertions(+), 1 deletion(-)

diff --git a/java/client/src/main/java/glide/api/BaseClient.java b/java/client/src/main/java/glide/api/BaseClient.java
index 9471efad8c..47da9bc506 100644
--- a/java/client/src/main/java/glide/api/BaseClient.java
+++ b/java/client/src/main/java/glide/api/BaseClient.java
@@ -119,6 +119,7 @@
 import static redis_request.RedisRequestOuterClass.RequestType.Touch;
 import static redis_request.RedisRequestOuterClass.RequestType.Type;
 import static redis_request.RedisRequestOuterClass.RequestType.Unlink;
+import static redis_request.RedisRequestOuterClass.RequestType.XAck;
 import static redis_request.RedisRequestOuterClass.RequestType.Watch;
 import static redis_request.RedisRequestOuterClass.RequestType.XAdd;
 import static redis_request.RedisRequestOuterClass.RequestType.XDel;
@@ -1447,6 +1448,12 @@ public CompletableFuture>> xreadgroup(
         return commandManager.submitNewCommand(XReadGroup, arguments, this::handleXReadResponse);
     }
 
+    @Override
+    public CompletableFuture xack(@NonNull String key, @NonNull String groupname, @NonNull String[] ids) {
+        String[] args = concatenateArrays(new String[]{key, groupname}, ids);
+        return commandManager.submitNewCommand(XAck, args, this::handleLongResponse);
+    }
+
     @Override
     public CompletableFuture pttl(@NonNull String key) {
         return commandManager.submitNewCommand(PTTL, new String[] {key}, this::handleLongResponse);
diff --git a/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java b/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java
index 270878843e..2727edc497 100644
--- a/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java
+++ b/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java
@@ -493,4 +493,14 @@ CompletableFuture>> xreadgroup(
             String group,
             String consumer,
             StreamReadGroupOptions options);
+
+    /**
+     * TODO
+     *
+     * @param key
+     * @param groupname
+     * @param ids
+     * @return
+     */
+    CompletableFuture xack(String key, String groupname, String[] ids);
 }
diff --git a/java/client/src/main/java/glide/api/models/BaseTransaction.java b/java/client/src/main/java/glide/api/models/BaseTransaction.java
index f20b20fd6f..423a2b172c 100644
--- a/java/client/src/main/java/glide/api/models/BaseTransaction.java
+++ b/java/client/src/main/java/glide/api/models/BaseTransaction.java
@@ -148,6 +148,7 @@
 import static redis_request.RedisRequestOuterClass.RequestType.Touch;
 import static redis_request.RedisRequestOuterClass.RequestType.Type;
 import static redis_request.RedisRequestOuterClass.RequestType.Unlink;
+import static redis_request.RedisRequestOuterClass.RequestType.XAck;
 import static redis_request.RedisRequestOuterClass.RequestType.XAdd;
 import static redis_request.RedisRequestOuterClass.RequestType.XDel;
 import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreate;
@@ -3102,6 +3103,20 @@ public T xreadgroup(
         return getThis();
     }
 
+    /**
+      * TODO
+     *
+     * @param key
+     * @param groupname
+     * @param ids
+     * @return
+     */
+    public T xack(@NonNull String key, @NonNull String groupname, @NonNull String[] ids) {
+        String[] args = concatenateArrays(new String[]{key, groupname}, ids);
+        protobufTransaction.addCommands(buildCommand(XAck, buildArgs(args)));
+        return getThis();
+    }
+
     /**
      * Returns the remaining time to live of key that has a timeout, in milliseconds.
      *
diff --git a/java/client/src/test/java/glide/api/RedisClientTest.java b/java/client/src/test/java/glide/api/RedisClientTest.java
index b93b321904..41fdcdf618 100644
--- a/java/client/src/test/java/glide/api/RedisClientTest.java
+++ b/java/client/src/test/java/glide/api/RedisClientTest.java
@@ -187,6 +187,7 @@
 import static redis_request.RedisRequestOuterClass.RequestType.Type;
 import static redis_request.RedisRequestOuterClass.RequestType.UnWatch;
 import static redis_request.RedisRequestOuterClass.RequestType.Unlink;
+import static redis_request.RedisRequestOuterClass.RequestType.XAck;
 import static redis_request.RedisRequestOuterClass.RequestType.Watch;
 import static redis_request.RedisRequestOuterClass.RequestType.XAdd;
 import static redis_request.RedisRequestOuterClass.RequestType.XDel;
@@ -4612,6 +4613,32 @@ public void xreadgroup_with_options() {
         assertEquals(completedResult, payload);
     }
 
+    @SneakyThrows
+    @Test
+    public void xack_returns_success() {
+        // setup
+        String key = "testKey";
+        String groupName = "testGroupName";
+        String[] ids = new String[] {"testId"};
+        String[] arguments = concatenateArrays(new String[] {key, groupName}, ids);
+        Long mockResult = 1L;
+
+        CompletableFuture testResponse = new CompletableFuture<>();
+        testResponse.complete(mockResult);
+
+        // match on protobuf request
+        when(commandManager.submitNewCommand(eq(XAck), eq(arguments), any()))
+            .thenReturn(testResponse);
+
+        // exercise
+        CompletableFuture response = service.xack(key, groupName, ids);
+        Long payload = response.get();
+
+        // verify
+        assertEquals(testResponse, response);
+        assertEquals(mockResult, payload);
+    }
+
     @SneakyThrows
     @Test
     public void type_returns_success() {
diff --git a/java/client/src/test/java/glide/api/models/TransactionTests.java b/java/client/src/test/java/glide/api/models/TransactionTests.java
index 2641d424f1..cc52690245 100644
--- a/java/client/src/test/java/glide/api/models/TransactionTests.java
+++ b/java/client/src/test/java/glide/api/models/TransactionTests.java
@@ -164,6 +164,7 @@
 import static redis_request.RedisRequestOuterClass.RequestType.Touch;
 import static redis_request.RedisRequestOuterClass.RequestType.Type;
 import static redis_request.RedisRequestOuterClass.RequestType.Unlink;
+import static redis_request.RedisRequestOuterClass.RequestType.XAck;
 import static redis_request.RedisRequestOuterClass.RequestType.XAdd;
 import static redis_request.RedisRequestOuterClass.RequestType.XDel;
 import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreate;
@@ -818,6 +819,9 @@ InfScoreBound.NEGATIVE_INFINITY, new ScoreBoundary(3, false), new Limit(1, 2)),
                                 "key",
                                 "id")));
 
+        transaction.xack("key", "group", new String[] {"12345-1", "98765-4"});
+        results.add(Pair.of(XAck, buildArgs("key", "group", "12345-1", "98765-4")));
+
         transaction.time();
         results.add(Pair.of(Time, buildArgs()));
 
diff --git a/java/integTest/src/test/java/glide/SharedCommandTests.java b/java/integTest/src/test/java/glide/SharedCommandTests.java
index 7c068b6b5f..35884de9cf 100644
--- a/java/integTest/src/test/java/glide/SharedCommandTests.java
+++ b/java/integTest/src/test/java/glide/SharedCommandTests.java
@@ -3886,6 +3886,55 @@ public void xreadgroup_return_failures(BaseClient client) {
         }
     }
 
+    @SneakyThrows
+    @ParameterizedTest(autoCloseArguments = false)
+    @MethodSource("getClients")
+    public void xack(BaseClient client) {
+        String key = "race:italy";
+        String stringKey = UUID.randomUUID().toString();
+        String groupName = "italy_riders";
+        String streamId1 = "0-1";
+        String streamId2 = "0-2";
+        String streamId3 = "0-3";
+        String[] streamIds = new String[] {streamId1, streamId2, streamId3};
+
+        // No messages are acknowledged as nothing is in the stream
+        assertEquals(0, client.xack(key, groupName, streamIds).get());
+
+        // TODO: fails when running the XREADGROUP command
+        if (client instanceof RedisClient) {
+            assertEquals(
+                OK, client.xgroupCreate(key, groupName, "$", StreamGroupOptions.builder().makeStream()
+                    .build()).get());
+            assertNotNull(((RedisClient) client).customCommand(new String[] {"XREADGROUP", "GROUP", groupName, "Alice", "COUNT", "1", "STREAMS", key, ">"}).get());
+            assertEquals(1, client.xack(key, groupName, streamIds).get());
+            // 0 is returned when calling XACK again as the message is removed from the Pending Entries List (PEL)
+            assertEquals(0, client.xack(key, groupName, streamIds).get());
+//            assertEquals(
+//                OK, client.xgroupCreate(key, groupName, "$", StreamGroupOptions.builder().makeStream()
+//                    .build()).get());
+//            streamIds[0] = client.xadd(key, Map.of("rider", "Castilla")).get();
+//
+//            ((RedisClient) client).customCommand(new String[] {"XREADGROUP", "GROUP", groupName, "Alice", "COUNT", "1", "STREAMS", key, ">"}).get();
+//            assertEquals(1, client.xack(key, groupName, streamIds).get());
+        }
+
+
+        // Exceptions
+        // Exception is thrown due to key holding a value with the wrong type
+        client.set(stringKey, "test").get();
+        Exception executionException =
+            assertThrows(
+                ExecutionException.class, () -> client.xack(stringKey, groupName, streamIds).get());
+        assertInstanceOf(RequestException.class, executionException.getCause());
+
+        // Exception is thrown due to empty stream ids
+        executionException =
+            assertThrows(
+                ExecutionException.class, () -> client.xack(key, groupName, new String[0]).get());
+        assertInstanceOf(RequestException.class, executionException.getCause());
+    }
+
     @SneakyThrows
     @ParameterizedTest(autoCloseArguments = false)
     @MethodSource("getClients")
diff --git a/java/integTest/src/test/java/glide/TransactionTestUtilities.java b/java/integTest/src/test/java/glide/TransactionTestUtilities.java
index 2ead3c6ca3..c6f1d912ad 100644
--- a/java/integTest/src/test/java/glide/TransactionTestUtilities.java
+++ b/java/integTest/src/test/java/glide/TransactionTestUtilities.java
@@ -758,7 +758,11 @@ private static Object[] streamCommands(BaseTransaction transaction) {
                 .xgroupDelConsumer(streamKey1, groupName1, consumer1)
                 .xgroupDestroy(streamKey1, groupName1)
                 .xgroupDestroy(streamKey1, groupName2)
-                .xdel(streamKey1, new String[] {"0-3", "0-5"});
+                .xdel(streamKey1, new String[] {"0-3", "0-5"})
+                .xgroupCreate(streamKey1, groupName1, "$", StreamGroupOptions.builder().makeStream().build())
+                .xadd(streamKey1, Map.of("rider", "Castilla"), StreamAddOptions.builder().id("1-0").build())
+                .customCommand(new String[] {"XREADGROUP", "GROUP", groupName1, "Alice", "COUNT", "1", "STREAMS", streamKey1, ">"})
+                .xack(streamKey1, groupName1, new String[] {"1-0"});;
 
         return new Object[] {
             "0-1", // xadd(streamKey1, Map.of("field1", "value1"), ... .id("0-1").build());
@@ -796,6 +800,13 @@ private static Object[] streamCommands(BaseTransaction transaction) {
             true, // xgroupDestroy(streamKey1, groupName1)
             true, // xgroupDestroy(streamKey1, groupName2)
             1L, // .xdel(streamKey1, new String[] {"0-1", "0-5"});
+            OK,
+            "1-0",
+            // TODO: fix expected
+            Map.of(
+                streamKey1,
+                Map.of("1-0", new String[][] {{"rider", "bob"}})),
+            1L,
         };
     }
 

From dc49832d9f04c246371665f1aa63fc1e33609731 Mon Sep 17 00:00:00 2001
From: Andrew Carbonetto 
Date: Wed, 19 Jun 2024 15:56:45 -0700
Subject: [PATCH 3/8] Update test

Signed-off-by: Andrew Carbonetto 
---
 .../test/java/glide/SharedCommandTests.java   | 19 ++++++++++++-------
 1 file changed, 12 insertions(+), 7 deletions(-)

diff --git a/java/integTest/src/test/java/glide/SharedCommandTests.java b/java/integTest/src/test/java/glide/SharedCommandTests.java
index 35884de9cf..571ca38b29 100644
--- a/java/integTest/src/test/java/glide/SharedCommandTests.java
+++ b/java/integTest/src/test/java/glide/SharedCommandTests.java
@@ -3481,7 +3481,7 @@ public void xgroupCreate_xgroupDestroy(BaseClient client) {
     @SneakyThrows
     @ParameterizedTest(autoCloseArguments = false)
     @MethodSource("getClients")
-    public void xgroupCreateConsumer_xreadgroup_xgroupDelConsumer(BaseClient client) {
+    public void xgroupCreateConsumer_xgroupDelConsumer_xreadgroup_xack(BaseClient client) {
         String key = UUID.randomUUID().toString();
         String stringKey = UUID.randomUUID().toString();
         String groupName = "group" + UUID.randomUUID();
@@ -3530,22 +3530,29 @@ public void xgroupCreateConsumer_xreadgroup_xgroupDelConsumer(BaseClient client)
         // delete one of the streams
         assertEquals(1L, client.xdel(key, new String[] {streamid_1}).get());
 
-        // now xreadgroup yeilds one empty stream and one non-empty stream
+        // now xreadgroup returns one empty stream and one non-empty stream
         var result_2 = client.xreadgroup(Map.of(key, "0"), groupName, consumerName).get();
         assertEquals(2, result_2.get(key).size());
-        assertNull(result_2.get(key).get(streamid_1));
+        assertNull(result_2.get(key).get(streamid_2));
         assertArrayEquals(new String[][] {{"field2", "value2"}}, result_2.get(key).get(streamid_2));
 
+        // add another stream
         String streamid_3 = client.xadd(key, Map.of("field3", "value3")).get();
         assertNotNull(streamid_3);
 
+        // acknowledge receiving streamid_1
+        assertEquals(1L, client.xack(key, groupName, new String[] {streamid_1}).get());
+
         // Delete the consumer group and expect 2 pending messages
-        assertEquals(2L, client.xgroupDelConsumer(key, groupName, consumerName).get());
+        assertEquals(1L, client.xgroupDelConsumer(key, groupName, consumerName).get());
 
         // Consume the last message with the previously deleted consumer (creates the consumer anew)
         var result_3 = client.xreadgroup(Map.of(key, ">"), groupName, consumerName).get();
         assertEquals(1, result_3.get(key).size());
 
+        // acknowledge receiving streamid_2 and streamid_3 - although streamid_2 was deleted
+        assertEquals(1L, client.xack(key, groupName, new String[] {streamid_2, streamid_3}).get());
+
         // Delete the consumer group and expect the pending message
         assertEquals(1L, client.xgroupDelConsumer(key, groupName, consumerName).get());
 
@@ -3889,7 +3896,7 @@ public void xreadgroup_return_failures(BaseClient client) {
     @SneakyThrows
     @ParameterizedTest(autoCloseArguments = false)
     @MethodSource("getClients")
-    public void xack(BaseClient client) {
+    public void xack_return_failures(BaseClient client) {
         String key = "race:italy";
         String stringKey = UUID.randomUUID().toString();
         String groupName = "italy_riders";
@@ -3901,8 +3908,6 @@ public void xack(BaseClient client) {
         // No messages are acknowledged as nothing is in the stream
         assertEquals(0, client.xack(key, groupName, streamIds).get());
 
-        // TODO: fails when running the XREADGROUP command
-        if (client instanceof RedisClient) {
             assertEquals(
                 OK, client.xgroupCreate(key, groupName, "$", StreamGroupOptions.builder().makeStream()
                     .build()).get());

From a4558f7580d9491381b31c6f7670cad289de2f12 Mon Sep 17 00:00:00 2001
From: Andrew Carbonetto 
Date: Wed, 19 Jun 2024 16:56:48 -0700
Subject: [PATCH 4/8] Add XACK test

Signed-off-by: Andrew Carbonetto 
---
 .../src/main/java/glide/api/BaseClient.java   |  7 +-
 .../api/commands/StreamBaseCommands.java      | 20 ++++--
 .../glide/api/models/BaseTransaction.java     | 16 +++--
 .../test/java/glide/api/RedisClientTest.java  |  4 +-
 .../test/java/glide/SharedCommandTests.java   | 68 ++++++-------------
 .../java/glide/TransactionTestUtilities.java  | 18 ++---
 6 files changed, 53 insertions(+), 80 deletions(-)

diff --git a/java/client/src/main/java/glide/api/BaseClient.java b/java/client/src/main/java/glide/api/BaseClient.java
index 47da9bc506..66da0b4bf3 100644
--- a/java/client/src/main/java/glide/api/BaseClient.java
+++ b/java/client/src/main/java/glide/api/BaseClient.java
@@ -119,8 +119,8 @@
 import static redis_request.RedisRequestOuterClass.RequestType.Touch;
 import static redis_request.RedisRequestOuterClass.RequestType.Type;
 import static redis_request.RedisRequestOuterClass.RequestType.Unlink;
-import static redis_request.RedisRequestOuterClass.RequestType.XAck;
 import static redis_request.RedisRequestOuterClass.RequestType.Watch;
+import static redis_request.RedisRequestOuterClass.RequestType.XAck;
 import static redis_request.RedisRequestOuterClass.RequestType.XAdd;
 import static redis_request.RedisRequestOuterClass.RequestType.XDel;
 import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreate;
@@ -1449,8 +1449,9 @@ public CompletableFuture>> xreadgroup(
     }
 
     @Override
-    public CompletableFuture xack(@NonNull String key, @NonNull String groupname, @NonNull String[] ids) {
-        String[] args = concatenateArrays(new String[]{key, groupname}, ids);
+    public CompletableFuture xack(
+            @NonNull String key, @NonNull String group, @NonNull String[] ids) {
+        String[] args = concatenateArrays(new String[] {key, group}, ids);
         return commandManager.submitNewCommand(XAck, args, this::handleLongResponse);
     }
 
diff --git a/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java b/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java
index 2727edc497..4e69719f56 100644
--- a/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java
+++ b/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java
@@ -495,12 +495,20 @@ CompletableFuture>> xreadgroup(
             StreamReadGroupOptions options);
 
     /**
-     * TODO
+     * Returns the number of messages that were successfully acknowledged by the consumer group member of a stream.
+     * This command should be called on a pending message so that such message does not get processed again.
      *
-     * @param key
-     * @param groupname
-     * @param ids
-     * @return
+     * @param key The key of the stream.
+     * @param group The consumer group name.
+     * @param ids Stream entry ID to acknowledge and purge messages.
+     * @return The number of messages that were successfully acknowledged.
+     * @example
+     *     
{@code
+     * String streamId = client.xadd("mystream", Map.of("myfield", "mydata")).get();
+     * // read and process messages from streamId
+     * assert 1L == client.xack("mystream", "mygroup", new String[] {streamId}).get();
+     * // messages purged from stream
+     * 
*/ - CompletableFuture xack(String key, String groupname, String[] ids); + CompletableFuture xack(String key, String group, String[] ids); } diff --git a/java/client/src/main/java/glide/api/models/BaseTransaction.java b/java/client/src/main/java/glide/api/models/BaseTransaction.java index 423a2b172c..01cabaa189 100644 --- a/java/client/src/main/java/glide/api/models/BaseTransaction.java +++ b/java/client/src/main/java/glide/api/models/BaseTransaction.java @@ -3104,15 +3104,17 @@ public T xreadgroup( } /** - * TODO + * Returns the number of messages that were successfully acknowledged by the consumer group member + * of a stream. This command should be called on a pending message so that such message does not + * get processed again. * - * @param key - * @param groupname - * @param ids - * @return + * @param key The key of the stream. + * @param group The consumer group name. + * @param ids Stream entry ID to acknowledge and purge messages. + * @return Command Response - The number of messages that were successfully acknowledged. */ - public T xack(@NonNull String key, @NonNull String groupname, @NonNull String[] ids) { - String[] args = concatenateArrays(new String[]{key, groupname}, ids); + public T xack(@NonNull String key, @NonNull String group, @NonNull String[] ids) { + String[] args = concatenateArrays(new String[] {key, group}, ids); protobufTransaction.addCommands(buildCommand(XAck, buildArgs(args))); return getThis(); } diff --git a/java/client/src/test/java/glide/api/RedisClientTest.java b/java/client/src/test/java/glide/api/RedisClientTest.java index 41fdcdf618..e0d7a7283d 100644 --- a/java/client/src/test/java/glide/api/RedisClientTest.java +++ b/java/client/src/test/java/glide/api/RedisClientTest.java @@ -187,8 +187,8 @@ import static redis_request.RedisRequestOuterClass.RequestType.Type; import static redis_request.RedisRequestOuterClass.RequestType.UnWatch; import static redis_request.RedisRequestOuterClass.RequestType.Unlink; -import static redis_request.RedisRequestOuterClass.RequestType.XAck; import static redis_request.RedisRequestOuterClass.RequestType.Watch; +import static redis_request.RedisRequestOuterClass.RequestType.XAck; import static redis_request.RedisRequestOuterClass.RequestType.XAdd; import static redis_request.RedisRequestOuterClass.RequestType.XDel; import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreate; @@ -4628,7 +4628,7 @@ public void xack_returns_success() { // match on protobuf request when(commandManager.submitNewCommand(eq(XAck), eq(arguments), any())) - .thenReturn(testResponse); + .thenReturn(testResponse); // exercise CompletableFuture response = service.xack(key, groupName, ids); diff --git a/java/integTest/src/test/java/glide/SharedCommandTests.java b/java/integTest/src/test/java/glide/SharedCommandTests.java index 571ca38b29..b61d5e353c 100644 --- a/java/integTest/src/test/java/glide/SharedCommandTests.java +++ b/java/integTest/src/test/java/glide/SharedCommandTests.java @@ -3533,25 +3533,27 @@ public void xgroupCreateConsumer_xgroupDelConsumer_xreadgroup_xack(BaseClient cl // now xreadgroup returns one empty stream and one non-empty stream var result_2 = client.xreadgroup(Map.of(key, "0"), groupName, consumerName).get(); assertEquals(2, result_2.get(key).size()); - assertNull(result_2.get(key).get(streamid_2)); + assertNull(result_2.get(key).get(streamid_1)); assertArrayEquals(new String[][] {{"field2", "value2"}}, result_2.get(key).get(streamid_2)); - // add another stream - String streamid_3 = client.xadd(key, Map.of("field3", "value3")).get(); + String streamid_3 = client.xadd(key, Map.of("field3", "field3")).get(); assertNotNull(streamid_3); - // acknowledge receiving streamid_1 - assertEquals(1L, client.xack(key, groupName, new String[] {streamid_1}).get()); + // xack that streamid_1, and streamid_2 was received + assertEquals(2L, client.xack(key, groupName, new String[] {streamid_1, streamid_2}).get()); - // Delete the consumer group and expect 2 pending messages - assertEquals(1L, client.xgroupDelConsumer(key, groupName, consumerName).get()); + // Delete the consumer group and expect 1 pending messages (one was received) + assertEquals(0L, client.xgroupDelConsumer(key, groupName, consumerName).get()); + + // xack streamid_1, and streamid_2 already received returns 0L + assertEquals(0L, client.xack(key, groupName, new String[] {streamid_1, streamid_2}).get()); // Consume the last message with the previously deleted consumer (creates the consumer anew) var result_3 = client.xreadgroup(Map.of(key, ">"), groupName, consumerName).get(); assertEquals(1, result_3.get(key).size()); - // acknowledge receiving streamid_2 and streamid_3 - although streamid_2 was deleted - assertEquals(1L, client.xack(key, groupName, new String[] {streamid_2, streamid_3}).get()); + // wrong group, so xack streamid_3 returns 0 + assertEquals(0L, client.xack(key, "not_a_group", new String[] {streamid_3}).get()); // Delete the consumer group and expect the pending message assertEquals(1L, client.xgroupDelConsumer(key, groupName, consumerName).get()); @@ -3897,46 +3899,16 @@ public void xreadgroup_return_failures(BaseClient client) { @ParameterizedTest(autoCloseArguments = false) @MethodSource("getClients") public void xack_return_failures(BaseClient client) { - String key = "race:italy"; - String stringKey = UUID.randomUUID().toString(); - String groupName = "italy_riders"; - String streamId1 = "0-1"; - String streamId2 = "0-2"; - String streamId3 = "0-3"; - String[] streamIds = new String[] {streamId1, streamId2, streamId3}; - - // No messages are acknowledged as nothing is in the stream - assertEquals(0, client.xack(key, groupName, streamIds).get()); - - assertEquals( - OK, client.xgroupCreate(key, groupName, "$", StreamGroupOptions.builder().makeStream() - .build()).get()); - assertNotNull(((RedisClient) client).customCommand(new String[] {"XREADGROUP", "GROUP", groupName, "Alice", "COUNT", "1", "STREAMS", key, ">"}).get()); - assertEquals(1, client.xack(key, groupName, streamIds).get()); - // 0 is returned when calling XACK again as the message is removed from the Pending Entries List (PEL) - assertEquals(0, client.xack(key, groupName, streamIds).get()); -// assertEquals( -// OK, client.xgroupCreate(key, groupName, "$", StreamGroupOptions.builder().makeStream() -// .build()).get()); -// streamIds[0] = client.xadd(key, Map.of("rider", "Castilla")).get(); -// -// ((RedisClient) client).customCommand(new String[] {"XREADGROUP", "GROUP", groupName, "Alice", "COUNT", "1", "STREAMS", key, ">"}).get(); -// assertEquals(1, client.xack(key, groupName, streamIds).get()); - } - - - // Exceptions - // Exception is thrown due to key holding a value with the wrong type - client.set(stringKey, "test").get(); - Exception executionException = - assertThrows( - ExecutionException.class, () -> client.xack(stringKey, groupName, streamIds).get()); - assertInstanceOf(RequestException.class, executionException.getCause()); + String nonStreamKey = "{key}:3" + UUID.randomUUID(); + String groupName = "group" + UUID.randomUUID(); + String zeroStreamId = "0"; - // Exception is thrown due to empty stream ids - executionException = - assertThrows( - ExecutionException.class, () -> client.xack(key, groupName, new String[0]).get()); + // Key exists, but it is not a stream + assertEquals(OK, client.set(nonStreamKey, "bar").get()); + ExecutionException executionException = + assertThrows( + ExecutionException.class, + () -> client.xack(nonStreamKey, groupName, new String[] {zeroStreamId}).get()); assertInstanceOf(RequestException.class, executionException.getCause()); } diff --git a/java/integTest/src/test/java/glide/TransactionTestUtilities.java b/java/integTest/src/test/java/glide/TransactionTestUtilities.java index c6f1d912ad..d0682de0f5 100644 --- a/java/integTest/src/test/java/glide/TransactionTestUtilities.java +++ b/java/integTest/src/test/java/glide/TransactionTestUtilities.java @@ -731,7 +731,6 @@ private static Object[] streamCommands(BaseTransaction transaction) { final String groupName1 = "{groupName}-1-" + UUID.randomUUID(); final String groupName2 = "{groupName}-2-" + UUID.randomUUID(); final String consumer1 = "{consumer}-1-" + UUID.randomUUID(); - final String consumer2 = "{consumer}-2-" + UUID.randomUUID(); transaction .xadd(streamKey1, Map.of("field1", "value1"), StreamAddOptions.builder().id("0-1").build()) @@ -755,14 +754,11 @@ private static Object[] streamCommands(BaseTransaction transaction) { groupName1, consumer1, StreamReadGroupOptions.builder().count(2L).build()) + .xack(streamKey1, groupName1, new String[] {"0-3"}) .xgroupDelConsumer(streamKey1, groupName1, consumer1) .xgroupDestroy(streamKey1, groupName1) .xgroupDestroy(streamKey1, groupName2) - .xdel(streamKey1, new String[] {"0-3", "0-5"}) - .xgroupCreate(streamKey1, groupName1, "$", StreamGroupOptions.builder().makeStream().build()) - .xadd(streamKey1, Map.of("rider", "Castilla"), StreamAddOptions.builder().id("1-0").build()) - .customCommand(new String[] {"XREADGROUP", "GROUP", groupName1, "Alice", "COUNT", "1", "STREAMS", streamKey1, ">"}) - .xack(streamKey1, groupName1, new String[] {"1-0"});; + .xdel(streamKey1, new String[] {"0-3", "0-5"}); return new Object[] { "0-1", // xadd(streamKey1, Map.of("field1", "value1"), ... .id("0-1").build()); @@ -796,17 +792,11 @@ private static Object[] streamCommands(BaseTransaction transaction) { Map.of( streamKey1, Map.of()), // xreadgroup(Map.of(streamKey1, ">"), groupName1, consumer1, options); - 1L, // xgroupDelConsumer(streamKey1, groupName1, consumer1) + 1L, // xack(streamKey1, groupName1, new String[] {"0-3"}) + 0L, // xgroupDelConsumer(streamKey1, groupName1, consumer1) true, // xgroupDestroy(streamKey1, groupName1) true, // xgroupDestroy(streamKey1, groupName2) 1L, // .xdel(streamKey1, new String[] {"0-1", "0-5"}); - OK, - "1-0", - // TODO: fix expected - Map.of( - streamKey1, - Map.of("1-0", new String[][] {{"rider", "bob"}})), - 1L, }; } From acdccf23222966116ec4f663d9bc4ddba45967f2 Mon Sep 17 00:00:00 2001 From: Andrew Carbonetto Date: Wed, 19 Jun 2024 20:42:20 -0700 Subject: [PATCH 5/8] Remove extra test Signed-off-by: Andrew Carbonetto --- .../test/java/glide/SharedCommandTests.java | 118 ------------------ 1 file changed, 118 deletions(-) diff --git a/java/integTest/src/test/java/glide/SharedCommandTests.java b/java/integTest/src/test/java/glide/SharedCommandTests.java index b61d5e353c..e98cc4fa14 100644 --- a/java/integTest/src/test/java/glide/SharedCommandTests.java +++ b/java/integTest/src/test/java/glide/SharedCommandTests.java @@ -3777,124 +3777,6 @@ public void xreadgroup_return_failures(BaseClient client) { } } - @SneakyThrows - @ParameterizedTest(autoCloseArguments = false) - @MethodSource("getClients") - public void xreadgroup_return_failures(BaseClient client) { - String key = "{key}:1" + UUID.randomUUID(); - String nonStreamKey = "{key}:3" + UUID.randomUUID(); - String field1 = "f1_"; - - // setup first entries in streams key1 and key2 - Map timestamp_1_1_map = new LinkedHashMap<>(); - timestamp_1_1_map.put(field1, field1 + "1"); - String timestamp_1_1 = - client.xadd(key, timestamp_1_1_map, StreamAddOptions.builder().id("1-1").build()).get(); - assertNotNull(timestamp_1_1); - - String groupName = "group" + UUID.randomUUID(); - String zeroStreamId = "0"; - String consumerName = "consumer" + UUID.randomUUID(); - - // create group and consumer for the group - assertEquals( - OK, - client - .xgroupCreate( - key, groupName, zeroStreamId, StreamGroupOptions.builder().makeStream().build()) - .get()); - assertTrue(client.xgroupCreateConsumer(key, groupName, consumerName).get()); - - // First key exists, but it is not a stream - assertEquals(OK, client.set(nonStreamKey, "bar").get()); - ExecutionException executionException = - assertThrows( - ExecutionException.class, - () -> - client - .xreadgroup( - Map.of(nonStreamKey, timestamp_1_1, key, timestamp_1_1), - groupName, - consumerName) - .get()); - assertInstanceOf(RequestException.class, executionException.getCause()); - - // Second key exists, but it is not a stream - executionException = - assertThrows( - ExecutionException.class, - () -> - client - .xreadgroup( - Map.of(key, timestamp_1_1, nonStreamKey, timestamp_1_1), - groupName, - consumerName) - .get()); - assertInstanceOf(RequestException.class, executionException.getCause()); - - try (var testClient = - client instanceof RedisClient - ? RedisClient.CreateClient(commonClientConfig().build()).get() - : RedisClusterClient.CreateClient(commonClusterClientConfig().build()).get()) { - String timeoutKey = "{key}:2" + UUID.randomUUID(); - String timeoutGroupName = "group" + UUID.randomUUID(); - String timeoutConsumerName = "consumer" + UUID.randomUUID(); - - // Create a group read with the test client - // add a single stream entry and consumer - // the first call to ">" will return an update consumer group - // the second call to ">" will block waiting for new entries - // using anything other than ">" won't block, but will return the empty consumer result - // see: https://github.com/redis/redis/issues/6587 - assertEquals( - OK, - testClient - .xgroupCreate( - timeoutKey, - timeoutGroupName, - zeroStreamId, - StreamGroupOptions.builder().makeStream().build()) - .get()); - assertTrue( - testClient.xgroupCreateConsumer(timeoutKey, timeoutGroupName, timeoutConsumerName).get()); - String streamid_1 = testClient.xadd(timeoutKey, Map.of("field1", "value1")).get(); - assertNotNull(streamid_1); - - // read the entire stream for the consumer and mark messages as pending - var result_1 = - testClient - .xreadgroup(Map.of(timeoutKey, ">"), timeoutGroupName, timeoutConsumerName) - .get(); - // returns an null result on the key - assertNull(result_1.get(key)); - - // subsequent calls to read ">" will block: - // ensure that command doesn't time out even if timeout > request timeout - long oneSecondInMS = 1000L; - assertNull( - testClient - .xreadgroup( - Map.of(timeoutKey, ">"), - timeoutGroupName, - timeoutConsumerName, - StreamReadGroupOptions.builder().block(oneSecondInMS).build()) - .get()); - - // with 0 timeout (no timeout) should never time out, - // but we wrap the test with timeout to avoid test failing or stuck forever - assertThrows( - TimeoutException.class, // <- future timeout, not command timeout - () -> - testClient - .xreadgroup( - Map.of(timeoutKey, ">"), - timeoutGroupName, - timeoutConsumerName, - StreamReadGroupOptions.builder().block(0L).build()) - .get(3, TimeUnit.SECONDS)); - } - } - @SneakyThrows @ParameterizedTest(autoCloseArguments = false) @MethodSource("getClients") From 9822432abc0b922dd6718882f63ea3e97bbf67cf Mon Sep 17 00:00:00 2001 From: Andrew Carbonetto Date: Wed, 19 Jun 2024 20:46:17 -0700 Subject: [PATCH 6/8] More merge conflict cleanup Signed-off-by: Andrew Carbonetto --- .../api/commands/StreamBaseCommands.java | 6 +- .../test/java/glide/SharedCommandTests.java | 86 ------------------- 2 files changed, 3 insertions(+), 89 deletions(-) diff --git a/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java b/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java index 4e69719f56..ff953fb8de 100644 --- a/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java +++ b/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java @@ -504,10 +504,10 @@ CompletableFuture>> xreadgroup( * @return The number of messages that were successfully acknowledged. * @example *
{@code
-     * String streamId = client.xadd("mystream", Map.of("myfield", "mydata")).get();
+     * String entryId = client.xadd("mystream", Map.of("myfield", "mydata")).get();
      * // read and process messages from streamId
-     * assert 1L == client.xack("mystream", "mygroup", new String[] {streamId}).get();
-     * // messages purged from stream
+     * assert 1L == client.xack("mystream", "mygroup", new String[] {entryId}).get();
+     * // message purged from stream
      * 
*/ CompletableFuture xack(String key, String group, String[] ids); diff --git a/java/integTest/src/test/java/glide/SharedCommandTests.java b/java/integTest/src/test/java/glide/SharedCommandTests.java index e98cc4fa14..f878976345 100644 --- a/java/integTest/src/test/java/glide/SharedCommandTests.java +++ b/java/integTest/src/test/java/glide/SharedCommandTests.java @@ -3573,92 +3573,6 @@ public void xgroupCreateConsumer_xgroupDelConsumer_xreadgroup_xack(BaseClient cl assertInstanceOf(RequestException.class, executionException.getCause()); } - @SneakyThrows - @ParameterizedTest(autoCloseArguments = false) - @MethodSource("getClients") - public void xgroupCreateConsumer_xreadgroup_xgroupDelConsumer(BaseClient client) { - String key = UUID.randomUUID().toString(); - String stringKey = UUID.randomUUID().toString(); - String groupName = "group" + UUID.randomUUID(); - String zeroStreamId = "0"; - String consumerName = "consumer" + UUID.randomUUID(); - - // create group and consumer for the group - assertEquals( - OK, - client - .xgroupCreate( - key, groupName, zeroStreamId, StreamGroupOptions.builder().makeStream().build()) - .get()); - assertTrue(client.xgroupCreateConsumer(key, groupName, consumerName).get()); - - // create consumer for group that does not exist results in a NOGROUP request error - ExecutionException executionException = - assertThrows( - ExecutionException.class, - () -> client.xgroupCreateConsumer(key, "not_a_group", consumerName).get()); - assertInstanceOf(RequestException.class, executionException.getCause()); - assertTrue(executionException.getMessage().contains("NOGROUP")); - - // create consumer for group again - assertFalse(client.xgroupCreateConsumer(key, groupName, consumerName).get()); - - // Deletes a consumer that is not created yet returns 0 - assertEquals(0L, client.xgroupDelConsumer(key, groupName, "not_a_consumer").get()); - - // Add two stream entries - String streamid_1 = client.xadd(key, Map.of("field1", "value1")).get(); - assertNotNull(streamid_1); - String streamid_2 = client.xadd(key, Map.of("field2", "value2")).get(); - assertNotNull(streamid_2); - - // read the entire stream for the consumer and mark messages as pending - var result_1 = client.xreadgroup(Map.of(key, ">"), groupName, consumerName).get(); - assertDeepEquals( - Map.of( - key, - Map.of( - streamid_1, new String[][] {{"field1", "value1"}}, - streamid_2, new String[][] {{"field2", "value2"}})), - result_1); - - // delete one of the streams - assertEquals(1L, client.xdel(key, new String[] {streamid_1}).get()); - - // now xreadgroup yeilds one empty stream and one non-empty stream - var result_2 = client.xreadgroup(Map.of(key, "0"), groupName, consumerName).get(); - assertEquals(2, result_2.get(key).size()); - assertNull(result_2.get(key).get(streamid_1)); - assertArrayEquals(new String[][] {{"field2", "value2"}}, result_2.get(key).get(streamid_2)); - - String streamid_3 = client.xadd(key, Map.of("field3", "value3")).get(); - assertNotNull(streamid_3); - - // Delete the consumer group and expect 2 pending messages - assertEquals(2L, client.xgroupDelConsumer(key, groupName, consumerName).get()); - - // Consume the last message with the previously deleted consumer (creates the consumer anew) - var result_3 = client.xreadgroup(Map.of(key, ">"), groupName, consumerName).get(); - assertEquals(1, result_3.get(key).size()); - - // Delete the consumer group and expect the pending message - assertEquals(1L, client.xgroupDelConsumer(key, groupName, consumerName).get()); - - // key is a string and cannot be created as a stream - assertEquals(OK, client.set(stringKey, "not_a_stream").get()); - executionException = - assertThrows( - ExecutionException.class, - () -> client.xgroupCreateConsumer(stringKey, groupName, consumerName).get()); - assertInstanceOf(RequestException.class, executionException.getCause()); - - executionException = - assertThrows( - ExecutionException.class, - () -> client.xgroupDelConsumer(stringKey, groupName, consumerName).get()); - assertInstanceOf(RequestException.class, executionException.getCause()); - } - @SneakyThrows @ParameterizedTest(autoCloseArguments = false) @MethodSource("getClients") From 387c74d1a42618cbd66d6f4625ce8e7d643c7855 Mon Sep 17 00:00:00 2001 From: Andrew Carbonetto Date: Thu, 20 Jun 2024 09:46:32 -0700 Subject: [PATCH 7/8] Process review comments Signed-off-by: Andrew Carbonetto --- .../api/commands/StreamBaseCommands.java | 5 +-- .../test/java/glide/SharedCommandTests.java | 33 ++++++++++++++----- 2 files changed, 27 insertions(+), 11 deletions(-) diff --git a/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java b/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java index ff953fb8de..018bf1edb3 100644 --- a/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java +++ b/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java @@ -505,9 +505,10 @@ CompletableFuture>> xreadgroup( * @example *
{@code
      * String entryId = client.xadd("mystream", Map.of("myfield", "mydata")).get();
-     * // read and process messages from streamId
+     * // read messages from streamId
+     * var readResult = client.xreadgroup(Map.of("mystream", entryId), "mygroup", "my0consumer").get();
+     * // acknowledge messages on stream
      * assert 1L == client.xack("mystream", "mygroup", new String[] {entryId}).get();
-     * // message purged from stream
      * 
*/ CompletableFuture xack(String key, String group, String[] ids); diff --git a/java/integTest/src/test/java/glide/SharedCommandTests.java b/java/integTest/src/test/java/glide/SharedCommandTests.java index f878976345..73e488ccde 100644 --- a/java/integTest/src/test/java/glide/SharedCommandTests.java +++ b/java/integTest/src/test/java/glide/SharedCommandTests.java @@ -3536,7 +3536,7 @@ public void xgroupCreateConsumer_xgroupDelConsumer_xreadgroup_xack(BaseClient cl assertNull(result_2.get(key).get(streamid_1)); assertArrayEquals(new String[][] {{"field2", "value2"}}, result_2.get(key).get(streamid_2)); - String streamid_3 = client.xadd(key, Map.of("field3", "field3")).get(); + String streamid_3 = client.xadd(key, Map.of("field3", "value3")).get(); assertNotNull(streamid_3); // xack that streamid_1, and streamid_2 was received @@ -3579,19 +3579,15 @@ public void xgroupCreateConsumer_xgroupDelConsumer_xreadgroup_xack(BaseClient cl public void xreadgroup_return_failures(BaseClient client) { String key = "{key}:1" + UUID.randomUUID(); String nonStreamKey = "{key}:3" + UUID.randomUUID(); - String field1 = "f1_"; + String groupName = "group" + UUID.randomUUID(); + String zeroStreamId = "0"; + String consumerName = "consumer" + UUID.randomUUID(); // setup first entries in streams key1 and key2 - Map timestamp_1_1_map = new LinkedHashMap<>(); - timestamp_1_1_map.put(field1, field1 + "1"); String timestamp_1_1 = - client.xadd(key, timestamp_1_1_map, StreamAddOptions.builder().id("1-1").build()).get(); + client.xadd(key, Map.of("f1", "v1"), StreamAddOptions.builder().id("1-1").build()).get(); assertNotNull(timestamp_1_1); - String groupName = "group" + UUID.randomUUID(); - String zeroStreamId = "0"; - String consumerName = "consumer" + UUID.randomUUID(); - // create group and consumer for the group assertEquals( OK, @@ -3695,9 +3691,28 @@ public void xreadgroup_return_failures(BaseClient client) { @ParameterizedTest(autoCloseArguments = false) @MethodSource("getClients") public void xack_return_failures(BaseClient client) { + String key = "{key}:1" + UUID.randomUUID(); String nonStreamKey = "{key}:3" + UUID.randomUUID(); String groupName = "group" + UUID.randomUUID(); String zeroStreamId = "0"; + String consumerName = "consumer" + UUID.randomUUID(); + + // setup first entries in streams key1 and key2 + String timestamp_1_1 = + client.xadd(key, Map.of("f1", "v1"), StreamAddOptions.builder().id("1-1").build()).get(); + assertNotNull(timestamp_1_1); + + // create group and consumer for the group + assertEquals( + OK, + client + .xgroupCreate( + key, groupName, zeroStreamId, StreamGroupOptions.builder().makeStream().build()) + .get()); + assertTrue(client.xgroupCreateConsumer(key, groupName, consumerName).get()); + + // Empty entity id list + assertNull(client.xack(key, groupName, new String[0]).get()); // Key exists, but it is not a stream assertEquals(OK, client.set(nonStreamKey, "bar").get()); From f7cd353dd42bd29a91efc530ce9e06a627b66e7d Mon Sep 17 00:00:00 2001 From: Andrew Carbonetto Date: Thu, 20 Jun 2024 11:52:31 -0700 Subject: [PATCH 8/8] XACK add failure tests Signed-off-by: Andrew Carbonetto --- .../test/java/glide/SharedCommandTests.java | 21 +++++++++++-------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/java/integTest/src/test/java/glide/SharedCommandTests.java b/java/integTest/src/test/java/glide/SharedCommandTests.java index 73e488ccde..4a903f3e6d 100644 --- a/java/integTest/src/test/java/glide/SharedCommandTests.java +++ b/java/integTest/src/test/java/glide/SharedCommandTests.java @@ -3699,24 +3699,27 @@ public void xack_return_failures(BaseClient client) { // setup first entries in streams key1 and key2 String timestamp_1_1 = - client.xadd(key, Map.of("f1", "v1"), StreamAddOptions.builder().id("1-1").build()).get(); + client.xadd(key, Map.of("f1", "v1"), StreamAddOptions.builder().id("1-1").build()).get(); assertNotNull(timestamp_1_1); // create group and consumer for the group assertEquals( - OK, - client - .xgroupCreate( - key, groupName, zeroStreamId, StreamGroupOptions.builder().makeStream().build()) - .get()); + OK, + client + .xgroupCreate( + key, groupName, zeroStreamId, StreamGroupOptions.builder().makeStream().build()) + .get()); assertTrue(client.xgroupCreateConsumer(key, groupName, consumerName).get()); - // Empty entity id list - assertNull(client.xack(key, groupName, new String[0]).get()); + // Empty entity id list throws a RequestException + ExecutionException executionException = + assertThrows( + ExecutionException.class, () -> client.xack(key, groupName, new String[0]).get()); + assertInstanceOf(RequestException.class, executionException.getCause()); // Key exists, but it is not a stream assertEquals(OK, client.set(nonStreamKey, "bar").get()); - ExecutionException executionException = + executionException = assertThrows( ExecutionException.class, () -> client.xack(nonStreamKey, groupName, new String[] {zeroStreamId}).get());