From e318665f88da26c36d18df9e16e381c5893fb0ff Mon Sep 17 00:00:00 2001 From: Alon Arenberg <93711356+alon-arenberg@users.noreply.github.com> Date: Sun, 7 Jul 2024 12:57:07 +0300 Subject: [PATCH] support xgroup, xread and xreadgroup with GlideString (#1841) * support xgroup with GlideString * replace REDIS_VERSION with SERVER_VERSION * initial commit of supporting xread and xreadgroup with GlideString * suuport xread and xreadgroup with GlideString --- .../src/main/java/glide/api/BaseClient.java | 134 +++- .../api/commands/StreamBaseCommands.java | 271 +++++++- .../commands/stream/StreamAddOptions.java | 3 +- .../stream/StreamAddOptionsBinary.java | 62 ++ .../stream/StreamReadGroupOptions.java | 38 ++ .../commands/stream/StreamReadOptions.java | 40 ++ .../commands/stream/StreamTrimOptions.java | 32 + .../test/java/glide/api/RedisClientTest.java | 383 ++++++++++- .../test/java/glide/SharedCommandTests.java | 594 +++++++++++++++++- 9 files changed, 1511 insertions(+), 46 deletions(-) create mode 100644 java/client/src/main/java/glide/api/models/commands/stream/StreamAddOptionsBinary.java diff --git a/java/client/src/main/java/glide/api/BaseClient.java b/java/client/src/main/java/glide/api/BaseClient.java index 5632f6cf46..f2b56f5c02 100644 --- a/java/client/src/main/java/glide/api/BaseClient.java +++ b/java/client/src/main/java/glide/api/BaseClient.java @@ -171,6 +171,7 @@ import static glide.api.models.commands.bitmap.BitFieldOptions.createBitFieldArgs; import static glide.api.models.commands.bitmap.BitFieldOptions.createBitFieldGlideStringArgs; import static glide.api.models.commands.stream.StreamClaimOptions.JUST_ID_REDIS_API; +import static glide.api.models.commands.stream.StreamGroupOptions.ENTRIES_READ_VALKEY_API; import static glide.api.models.commands.stream.StreamReadOptions.READ_COUNT_REDIS_API; import static glide.api.models.commands.stream.XInfoStreamOptions.COUNT; import static glide.api.models.commands.stream.XInfoStreamOptions.FULL; @@ -244,6 +245,7 @@ import glide.api.models.commands.scan.ZScanOptions; import glide.api.models.commands.scan.ZScanOptionsBinary; import glide.api.models.commands.stream.StreamAddOptions; +import glide.api.models.commands.stream.StreamAddOptionsBinary; import glide.api.models.commands.stream.StreamClaimOptions; import glide.api.models.commands.stream.StreamGroupOptions; import glide.api.models.commands.stream.StreamPendingOptions; @@ -634,6 +636,25 @@ protected Map> handleXReadResponse(Response resp e -> castMapOf2DArray((Map) e.getValue(), String.class))); } + /** + * @param response A Protobuf response + * @return A map of a map of GlideString[][] + */ + protected Map> handleXReadResponseBinary( + Response response) throws RedisException { + Map mapResponse = handleBinaryStringMapOrNullResponse(response); + if (mapResponse == null) { + return null; + } + return mapResponse.entrySet().stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, + e -> + castMapOf2DArray( + (Map) e.getValue(), GlideString.class))); + } + @SuppressWarnings("unchecked") // raw Set cast to Set protected Set handleSetResponse(Response response) throws RedisException { return handleRedisResponse(Set.class, EnumSet.of(ResponseFlags.ENCODING_UTF8), response); @@ -2331,7 +2352,7 @@ public CompletableFuture xadd(@NonNull String key, @NonNull Map xadd( @NonNull GlideString key, @NonNull Map values) { - return xadd(key, values, StreamAddOptions.builder().build()); + return xadd(key, values, StreamAddOptionsBinary.builder().build()); } @Override @@ -2347,13 +2368,14 @@ public CompletableFuture xadd( public CompletableFuture xadd( @NonNull GlideString key, @NonNull Map values, - @NonNull StreamAddOptions options) { - String[] toArgsString = options.toArgs(); - GlideString[] toArgs = - Arrays.stream(toArgsString).map(GlideString::gs).toArray(GlideString[]::new); + @NonNull StreamAddOptionsBinary options) { GlideString[] arguments = - ArrayUtils.addAll( - ArrayUtils.addFirst(toArgs, key), convertMapToKeyValueGlideStringArray(values)); + new ArgsBuilder() + .add(key) + .add(options.toArgs()) + .add(convertMapToKeyValueGlideStringArray(values)) + .toArray(); + return commandManager.submitNewCommand(XAdd, arguments, this::handleGlideStringOrNullResponse); } @@ -2363,6 +2385,12 @@ public CompletableFuture>> xread( return xread(keysAndIds, StreamReadOptions.builder().build()); } + @Override + public CompletableFuture>> xreadBinary( + @NonNull Map keysAndIds) { + return xreadBinary(keysAndIds, StreamReadOptions.builder().build()); + } + @Override public CompletableFuture>> xread( @NonNull Map keysAndIds, @NonNull StreamReadOptions options) { @@ -2370,6 +2398,13 @@ public CompletableFuture>> xread( return commandManager.submitNewCommand(XRead, arguments, this::handleXReadResponse); } + @Override + public CompletableFuture>> xreadBinary( + @NonNull Map keysAndIds, @NonNull StreamReadOptions options) { + GlideString[] arguments = options.toArgsBinary(keysAndIds); + return commandManager.submitNewCommand(XRead, arguments, this::handleXReadResponseBinary); + } + @Override public CompletableFuture xtrim(@NonNull String key, @NonNull StreamTrimOptions options) { String[] arguments = ArrayUtils.addFirst(options.toArgs(), key); @@ -2501,6 +2536,13 @@ public CompletableFuture xgroupCreate( XGroupCreate, new String[] {key, groupName, id}, this::handleStringResponse); } + @Override + public CompletableFuture xgroupCreate( + @NonNull GlideString key, @NonNull GlideString groupName, @NonNull GlideString id) { + return commandManager.submitNewCommand( + XGroupCreate, new GlideString[] {key, groupName, id}, this::handleStringResponse); + } + @Override public CompletableFuture xgroupCreate( @NonNull String key, @@ -2511,12 +2553,30 @@ public CompletableFuture xgroupCreate( return commandManager.submitNewCommand(XGroupCreate, arguments, this::handleStringResponse); } + @Override + public CompletableFuture xgroupCreate( + @NonNull GlideString key, + @NonNull GlideString groupName, + @NonNull GlideString id, + @NonNull StreamGroupOptions options) { + GlideString[] arguments = + new ArgsBuilder().add(key).add(groupName).add(id).add(options.toArgs()).toArray(); + return commandManager.submitNewCommand(XGroupCreate, arguments, this::handleStringResponse); + } + @Override public CompletableFuture xgroupDestroy(@NonNull String key, @NonNull String groupname) { return commandManager.submitNewCommand( XGroupDestroy, new String[] {key, groupname}, this::handleBooleanResponse); } + @Override + public CompletableFuture xgroupDestroy( + @NonNull GlideString key, @NonNull GlideString groupname) { + return commandManager.submitNewCommand( + XGroupDestroy, new GlideString[] {key, groupname}, this::handleBooleanResponse); + } + @Override public CompletableFuture xgroupCreateConsumer( @NonNull String key, @NonNull String group, @NonNull String consumer) { @@ -2524,6 +2584,15 @@ public CompletableFuture xgroupCreateConsumer( XGroupCreateConsumer, new String[] {key, group, consumer}, this::handleBooleanResponse); } + @Override + public CompletableFuture xgroupCreateConsumer( + @NonNull GlideString key, @NonNull GlideString group, @NonNull GlideString consumer) { + return commandManager.submitNewCommand( + XGroupCreateConsumer, + new GlideString[] {key, group, consumer}, + this::handleBooleanResponse); + } + @Override public CompletableFuture xgroupDelConsumer( @NonNull String key, @NonNull String group, @NonNull String consumer) { @@ -2531,6 +2600,13 @@ public CompletableFuture xgroupDelConsumer( XGroupDelConsumer, new String[] {key, group, consumer}, this::handleLongResponse); } + @Override + public CompletableFuture xgroupDelConsumer( + @NonNull GlideString key, @NonNull GlideString group, @NonNull GlideString consumer) { + return commandManager.submitNewCommand( + XGroupDelConsumer, new GlideString[] {key, group, consumer}, this::handleLongResponse); + } + @Override public CompletableFuture xgroupSetId( @NonNull String key, @NonNull String groupName, @NonNull String id) { @@ -2538,11 +2614,35 @@ public CompletableFuture xgroupSetId( XGroupSetId, new String[] {key, groupName, id}, this::handleStringResponse); } + @Override + public CompletableFuture xgroupSetId( + @NonNull GlideString key, @NonNull GlideString groupName, @NonNull GlideString id) { + return commandManager.submitNewCommand( + XGroupSetId, new GlideString[] {key, groupName, id}, this::handleStringResponse); + } + @Override public CompletableFuture xgroupSetId( @NonNull String key, @NonNull String groupName, @NonNull String id, long entriesRead) { String[] arguments = - new String[] {key, groupName, id, "ENTRIESREAD", Long.toString(entriesRead)}; + new String[] {key, groupName, id, ENTRIES_READ_VALKEY_API, Long.toString(entriesRead)}; + return commandManager.submitNewCommand(XGroupSetId, arguments, this::handleStringResponse); + } + + @Override + public CompletableFuture xgroupSetId( + @NonNull GlideString key, + @NonNull GlideString groupName, + @NonNull GlideString id, + long entriesRead) { + GlideString[] arguments = + new ArgsBuilder() + .add(key) + .add(groupName) + .add(id) + .add(ENTRIES_READ_VALKEY_API) + .add(entriesRead) + .toArray(); return commandManager.submitNewCommand(XGroupSetId, arguments, this::handleStringResponse); } @@ -2552,6 +2652,14 @@ public CompletableFuture>> xreadgroup( return xreadgroup(keysAndIds, group, consumer, StreamReadGroupOptions.builder().build()); } + @Override + public CompletableFuture>> xreadgroup( + @NonNull Map keysAndIds, + @NonNull GlideString group, + @NonNull GlideString consumer) { + return xreadgroup(keysAndIds, group, consumer, StreamReadGroupOptions.builder().build()); + } + @Override public CompletableFuture>> xreadgroup( @NonNull Map keysAndIds, @@ -2562,6 +2670,16 @@ public CompletableFuture>> xreadgroup( return commandManager.submitNewCommand(XReadGroup, arguments, this::handleXReadResponse); } + @Override + public CompletableFuture>> xreadgroup( + @NonNull Map keysAndIds, + @NonNull GlideString group, + @NonNull GlideString consumer, + @NonNull StreamReadGroupOptions options) { + GlideString[] arguments = options.toArgsBinary(group, consumer, keysAndIds); + return commandManager.submitNewCommand(XReadGroup, arguments, this::handleXReadResponseBinary); + } + @Override public CompletableFuture xack( @NonNull String key, @NonNull String group, @NonNull String[] ids) { diff --git a/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java b/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java index 217ea5b614..b6b652c34e 100644 --- a/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java +++ b/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java @@ -4,6 +4,8 @@ import glide.api.models.GlideString; import glide.api.models.commands.stream.StreamAddOptions; import glide.api.models.commands.stream.StreamAddOptions.StreamAddOptionsBuilder; +import glide.api.models.commands.stream.StreamAddOptionsBinary; +import glide.api.models.commands.stream.StreamAddOptionsBinary.StreamAddOptionsBinaryBuilder; import glide.api.models.commands.stream.StreamClaimOptions; import glide.api.models.commands.stream.StreamGroupOptions; import glide.api.models.commands.stream.StreamPendingOptions; @@ -88,12 +90,12 @@ public interface StreamBaseCommands { * @param values Field-value pairs to be added to the entry. * @param options Stream add options {@link StreamAddOptions}. * @return The id of the added entry, or null if {@link - * StreamAddOptionsBuilder#makeStream(Boolean)} is set to false and no stream - * with the matching key exists. + * StreamAddOptionsBinaryBuilder#makeStream(Boolean)} is set to false and no + * stream with the matching key exists. * @example *
{@code
      * // Option to use the existing stream, or return null if the stream doesn't already exist at "key"
-     * StreamAddOptions options = StreamAddOptions.builder().id("sid").makeStream(Boolean.FALSE).build();
+     * StreamAddOptionsBinary options = StreamAddOptions.builder().id(gs("sid")).makeStream(Boolean.FALSE).build();
      * String streamId = client.xadd(gs("key"), Map.of(gs("name"), gs("Sara"), gs("surname"), gs("OConnor")), options).get();
      * if (streamId != null) {
      *     assert streamId.equals("sid");
@@ -101,7 +103,7 @@ public interface StreamBaseCommands {
      * }
*/ CompletableFuture xadd( - GlideString key, Map values, StreamAddOptions options); + GlideString key, Map values, StreamAddOptionsBinary options); /** * Reads entries from the given streams. @@ -129,6 +131,33 @@ CompletableFuture xadd( */ CompletableFuture>> xread(Map keysAndIds); + /** + * Reads entries from the given streams. + * + * @apiNote When in cluster mode, all keys in keysAndIds must map to the same hash + * slot. + * @see valkey.io for details. + * @param keysAndIds A Map of keys and entry ids to read from. The + * Map is composed of a stream's key and the id of the entry after which the stream + * will be read. + * @return A {@literal Map>} with stream + * keys, to Map of stream-ids, to an array of pairings with format [[field, entry], [field, entry], ...]. + * @example + *
{@code
+     * Map xreadKeys = Map.of(gs("streamKey"), gs("0-0"));
+     * Map> streamReadResponse = client.xread(xreadKeys).get();
+     * for (var keyEntry : streamReadResponse.entrySet()) {
+     *     System.out.printf("Key: %s", keyEntry.getKey());
+     *     for (var streamEntry : keyEntry.getValue().entrySet()) {
+     *         Arrays.stream(streamEntry.getValue()).forEach(entity ->
+     *             System.out.printf("stream id: %s; field: %s; value: %s\n", streamEntry.getKey(), entity[0], entity[1])
+     *         );
+     *     }
+     * }
+ */ + CompletableFuture>> xreadBinary( + Map keysAndIds); + /** * Reads entries from the given streams. * @@ -159,6 +188,36 @@ CompletableFuture xadd( CompletableFuture>> xread( Map keysAndIds, StreamReadOptions options); + /** + * Reads entries from the given streams. + * + * @apiNote When in cluster mode, all keys in keysAndIds must map to the same hash + * slot. + * @see valkey.io for details. + * @param keysAndIds A Map of keys and entry ids to read from. The + * Map is composed of a stream's key and the id of the entry after which the stream + * will be read. + * @param options Options detailing how to read the stream {@link StreamReadOptions}. + * @return A {@literal Map>} with stream + * keys, to Map of stream-ids, to an array of pairings with format [[field, entry], [field, entry], ...]. + * @example + *
{@code
+     * // retrieve streamKey entries and block for 1 second if is no stream data
+     * Map xreadKeys = Map.of(gs("streamKey"), gs("0-0"));
+     * StreamReadOptions options = StreamReadOptions.builder().block(1L).build();
+     * Map> streamReadResponse = client.xread(xreadKeys, options).get();
+     * for (var keyEntry : streamReadResponse.entrySet()) {
+     *     System.out.printf("Key: %s", keyEntry.getKey());
+     *     for (var streamEntry : keyEntry.getValue().entrySet()) {
+     *         Arrays.stream(streamEntry.getValue()).forEach(entity ->
+     *             System.out.printf("stream id: %s; field: %s; value: %s\n", streamEntry.getKey(), entity[0], entity[1])
+     *         );
+     *     }
+     * }
+ */ + CompletableFuture>> xreadBinary( + Map keysAndIds, StreamReadOptions options); + /** * Trims the stream by evicting older entries. * @@ -585,6 +644,25 @@ CompletableFuture> xrevrange( */ CompletableFuture xgroupCreate(String key, String groupname, String id); + /** + * Creates a new consumer group uniquely identified by groupname for the stream + * stored at key. + * + * @see valkey.io for details. + * @param key The key of the stream. + * @param groupname The newly created consumer group name. + * @param id Stream entry ID that specifies the last delivered entry in the stream from the new + * group’s perspective. The special ID "$" can be used to specify the last entry + * in the stream. + * @return OK. + * @example + *
{@code
+     * // Create the consumer group gs("mygroup"), using zero as the starting ID:
+     * assert client.xgroupCreate(gs("mystream"), gs("mygroup"), gs("0-0")).get().equals("OK");
+     * }
+ */ + CompletableFuture xgroupCreate(GlideString key, GlideString groupname, GlideString id); + /** * Creates a new consumer group uniquely identified by groupname for the stream * stored at key. @@ -606,6 +684,27 @@ CompletableFuture> xrevrange( CompletableFuture xgroupCreate( String key, String groupName, String id, StreamGroupOptions options); + /** + * Creates a new consumer group uniquely identified by groupname for the stream + * stored at key. + * + * @see valkey.io for details. + * @param key The key of the stream. + * @param groupName The newly created consumer group name. + * @param id Stream entry ID that specifies the last delivered entry in the stream from the new + * group’s perspective. The special ID "$" can be used to specify the last entry + * in the stream. + * @param options The group options {@link StreamGroupOptions}. + * @return OK. + * @example + *
{@code
+     * // Create the consumer group gs("mygroup"), and the stream if it does not exist, after the last ID
+     * assert client.xgroupCreate(gs("mystream"), gs("mygroup"), gs("$"), new StreamGroupOptions(true)).get().equals("OK");
+     * }
+ */ + CompletableFuture xgroupCreate( + GlideString key, GlideString groupName, GlideString id, StreamGroupOptions options); + /** * Destroys the consumer group groupname for the stream stored at key. * @@ -621,6 +720,21 @@ CompletableFuture xgroupCreate( */ CompletableFuture xgroupDestroy(String key, String groupname); + /** + * Destroys the consumer group groupname for the stream stored at key. + * + * @see valkey.io for details. + * @param key The key of the stream. + * @param groupname The consumer group name to delete. + * @return true if the consumer group is destroyed. Otherwise, false. + * @example + *
{@code
+     * // Destroys the consumer group gs("mygroup")
+     * assert client.xgroupDestroy(gs("mystream"), gs("mygroup")).get().equals("OK");
+     * }
+ */ + CompletableFuture xgroupDestroy(GlideString key, GlideString groupname); + /** * Creates a consumer named consumer in the consumer group group for the * stream stored at key. @@ -638,6 +752,24 @@ CompletableFuture xgroupCreate( */ CompletableFuture xgroupCreateConsumer(String key, String group, String consumer); + /** + * Creates a consumer named consumer in the consumer group group for the + * stream stored at key. + * + * @see valkey.io for details. + * @param key The key of the stream. + * @param group The consumer group name. + * @param consumer The newly created consumer. + * @return true if the consumer is created. Otherwise, false. + * @example + *
{@code
+     * // Creates the consumer gs("myconsumer") in consumer group gs("mygroup")
+     * assert client.xgroupCreateConsumer(gs("mystream"), gs("mygroup"), gs("myconsumer")).get();
+     * }
+ */ + CompletableFuture xgroupCreateConsumer( + GlideString key, GlideString group, GlideString consumer); + /** * Deletes a consumer named consumer in the consumer group group. * @@ -656,6 +788,25 @@ CompletableFuture xgroupCreate( */ CompletableFuture xgroupDelConsumer(String key, String group, String consumer); + /** + * Deletes a consumer named consumer in the consumer group group. + * + * @see valkey.io for details. + * @param key The key of the stream. + * @param group The consumer group name. + * @param consumer The consumer to delete. + * @return The number of pending messages the consumer had before it was deleted. + * @example + *
{@code
+     * // Deletes the consumer gs("myconsumer") in consumer group gs("mygroup")
+     * Long pendingMsgCount = client.xgroupDelConsumer(gs("mystream"), gs("mygroup"), gs("myconsumer")).get();
+     * System.out.println("Consumer 'myconsumer' had " +
+     *     + pendingMsgCount + " pending messages unclaimed.");
+     * }
+ */ + CompletableFuture xgroupDelConsumer( + GlideString key, GlideString group, GlideString consumer); + /** * Sets the last delivered ID for a consumer group. * @@ -673,6 +824,23 @@ CompletableFuture xgroupCreate( */ CompletableFuture xgroupSetId(String key, String groupName, String id); + /** + * Sets the last delivered ID for a consumer group. + * + * @see valkey.io for details. + * @param key The key of the stream. + * @param groupName The consumer group name. + * @param id The stream entry ID that should be set as the last delivered ID for the consumer + * group. + * @return OK. + * @example + *
{@code
+     * // Update consumer group gs("mygroup"), to set the last delivered entry ID.
+     * assert client.xgroupSetId(gs("mystream"), gs("mygroup"), gs("0")).get().equals("OK");
+     * }
+ */ + CompletableFuture xgroupSetId(GlideString key, GlideString groupName, GlideString id); + /** * Sets the last delivered ID for a consumer group. * @@ -687,11 +855,31 @@ CompletableFuture xgroupCreate( * @example *
{@code
      * // Update consumer group "mygroup", to set the last delivered entry ID.
-     * assert client.xgroupSetId("mystream", "mygroup", "0", "1-1").get().equals("OK");
+     * assert client.xgroupSetId("mystream", "mygroup", "0", 1L).get().equals("OK");
      * }
*/ CompletableFuture xgroupSetId(String key, String groupName, String id, long entriesRead); + /** + * Sets the last delivered ID for a consumer group. + * + * @since Valkey 7.0 and above + * @see valkey.io for details. + * @param key The key of the stream. + * @param groupName The consumer group name. + * @param id The stream entry ID that should be set as the last delivered ID for the consumer + * group. + * @param entriesRead A value representing the number of stream entries already read by the group. + * @return OK. + * @example + *
{@code
+     * // Update consumer group gs("mygroup"), to set the last delivered entry ID.
+     * assert client.xgroupSetId(gs("mystream"), gs("mygroup"),gs("0"), 1L).get().equals("OK");
+     * }
+ */ + CompletableFuture xgroupSetId( + GlideString key, GlideString groupName, GlideString id, long entriesRead); + /** * Reads entries from the given streams owned by a consumer group. * @@ -726,6 +914,40 @@ CompletableFuture xgroupCreate( CompletableFuture>> xreadgroup( Map keysAndIds, String group, String consumer); + /** + * Reads entries from the given streams owned by a consumer group. + * + * @apiNote When in cluster mode, all keys in keysAndIds must map to the same hash + * slot. + * @see valkey.io for details. + * @param keysAndIds A Map of keys and entry ids to read from. The + * Map is composed of a stream's key and the id of the entry after which the stream + * will be read. Use the special id of {@literal gs(">")} to receive only new messages. + * @param group The consumer group name. + * @param consumer The consumer name. + * @return A {@literal Map>} with stream + * keys, to Map of stream-ids, to an array of pairings with format [[field, entry], [field, entry], ...]. + * Returns null if there is no stream that can be served. + * @example + *
{@code
+     * // create a new stream at gs("mystream"), with stream id gs("1-0")
+     * String streamId = client.xadd(gs("mystream"), Map.of(gs("myfield"), gs("mydata")), StreamAddOptionsBinary.builder().id(gs("1-0")).build()).get();
+     * assert client.xgroupCreate(gs("mystream"), gs("mygroup"), gs("0-0")).get().equals("OK"); // create the consumer group gs("mygroup")
+     * Map> streamReadResponse = client.xreadgroup(Map.of(gs("mystream"), gs(">")), gs("mygroup"), gs("myconsumer")).get();
+     * // Returns gs("mystream"): gs("1-0"): {{gs("myfield"), gs("mydata")}}
+     * for (var keyEntry : streamReadResponse.entrySet()) {
+     *     System.out.printf("Key: %s", keyEntry.getKey());
+     *     for (var streamEntry : keyEntry.getValue().entrySet()) {
+     *         Arrays.stream(streamEntry.getValue()).forEach(entity ->
+     *             System.out.printf("stream id: %s; field: %s; value: %s\n", streamEntry.getKey(), entity[0], entity[1])
+     *         );
+     *     }
+     * }
+     * 
+ */ + CompletableFuture>> xreadgroup( + Map keysAndIds, GlideString group, GlideString consumer); + /** * Reads entries from the given streams owned by a consumer group. * @@ -765,6 +987,45 @@ CompletableFuture>> xreadgroup( String consumer, StreamReadGroupOptions options); + /** + * Reads entries from the given streams owned by a consumer group. + * + * @apiNote When in cluster mode, all keys in keysAndIds must map to the same hash + * slot. + * @see valkey.io for details. + * @param keysAndIds A Map of keys and entry ids to read from. The + * Map is composed of a stream's key and the id of the entry after which the stream + * will be read. Use the special id of {@literal gs(">")} to receive only new messages. + * @param group The consumer group name. + * @param consumer The consumer name. + * @param options Options detailing how to read the stream {@link StreamReadGroupOptions}. + * @return A {@literal Map>} with stream + * keys, to Map of stream-ids, to an array of pairings with format [[field, entry], [field, entry], ...]. + * Returns null if there is no stream that can be served. + * @example + *
{@code
+     * // create a new stream at gs("mystream"), with stream id gs("1-0")
+     * String streamId = client.xadd(gs("mystream"), Map.of(gs("myfield"), gs("mydata")), StreamAddOptionsBinary.builder().id(gs("1-0")).build()).get();
+     * assert client.xgroupCreate(gs("mystream"), gs("mygroup"), gs("0-0")).get().equals("OK"); // create the consumer group gs("mygroup")
+     * StreamReadGroupOptions options = StreamReadGroupOptions.builder().count(1).build(); // retrieves only a single message at a time
+     * Map> streamReadResponse = client.xreadgroup(Map.of(gs("mystream"), gs(">")), gs("mygroup"), gs("myconsumer"), options).get();
+     * // Returns gs("mystream"): gs("1-0"): {{gs("myfield"), gs("mydata")}}
+     * for (var keyEntry : streamReadResponse.entrySet()) {
+     *     System.out.printf("Key: %s", keyEntry.getKey());
+     *     for (var streamEntry : keyEntry.getValue().entrySet()) {
+     *         Arrays.stream(streamEntry.getValue()).forEach(entity ->
+     *             System.out.printf("stream id: %s; field: %s; value: %s\n", streamEntry.getKey(), entity[0], entity[1])
+     *         );
+     *     }
+     * }
+     * 
+ */ + CompletableFuture>> xreadgroup( + Map keysAndIds, + GlideString group, + GlideString consumer, + StreamReadGroupOptions options); + /** * Returns the number of messages that were successfully acknowledged by the consumer group member of a stream. * This command should be called on a pending message so that such message does not get processed again. diff --git a/java/client/src/main/java/glide/api/models/commands/stream/StreamAddOptions.java b/java/client/src/main/java/glide/api/models/commands/stream/StreamAddOptions.java index f7f3c6b317..daeecdd570 100644 --- a/java/client/src/main/java/glide/api/models/commands/stream/StreamAddOptions.java +++ b/java/client/src/main/java/glide/api/models/commands/stream/StreamAddOptions.java @@ -8,8 +8,7 @@ import lombok.Builder; /** - * Optional arguments to {@link StreamBaseCommands#xadd(String, Map, StreamAddOptions)} and {@link - * StreamBaseCommands#xadd(GlideString, Map, StreamAddOptions)} + * Optional arguments to {@link StreamBaseCommands#xadd(String, Map, StreamAddOptions)} * * @see valkey.io */ diff --git a/java/client/src/main/java/glide/api/models/commands/stream/StreamAddOptionsBinary.java b/java/client/src/main/java/glide/api/models/commands/stream/StreamAddOptionsBinary.java new file mode 100644 index 0000000000..7140185b4e --- /dev/null +++ b/java/client/src/main/java/glide/api/models/commands/stream/StreamAddOptionsBinary.java @@ -0,0 +1,62 @@ +/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */ +package glide.api.models.commands.stream; + +import static glide.api.models.GlideString.gs; + +import glide.api.commands.StreamBaseCommands; +import glide.api.models.GlideString; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import lombok.Builder; + +/** + * Optional arguments to {@link StreamBaseCommands#xadd(GlideString, Map, StreamAddOptionsBinary)} + * + * @see valkey.io + */ +@Builder +public final class StreamAddOptionsBinary { + public static final GlideString NO_MAKE_STREAM_REDIS_API_GLIDE_STRING = gs("NOMKSTREAM"); + public static final GlideString ID_WILDCARD_REDIS_API_GLIDE_STRING = gs("*"); + + /** If set, the new entry will be added with this id. */ + private final GlideString id; + + /** + * If set to false, a new stream won't be created if no stream matches the given key. + *
+ * Equivalent to NOMKSTREAM in the Redis API. + */ + private final Boolean makeStream; + + /** If set, the add operation will also trim the older entries in the stream. */ + private final StreamTrimOptions trim; + + /** + * Converts options for Xadd into a GlideString[]. + * + * @return GlideString[] + */ + public GlideString[] toArgs() { + List optionArgs = new ArrayList<>(); + + if (makeStream != null && !makeStream) { + optionArgs.add(NO_MAKE_STREAM_REDIS_API_GLIDE_STRING); + } + + if (trim != null) { + optionArgs.addAll( + trim.getRedisApi().stream().map(GlideString::gs).collect(Collectors.toList())); + } + + if (id != null) { + optionArgs.add(id); + } else { + optionArgs.add(ID_WILDCARD_REDIS_API_GLIDE_STRING); + } + + return optionArgs.toArray(new GlideString[0]); + } +} diff --git a/java/client/src/main/java/glide/api/models/commands/stream/StreamReadGroupOptions.java b/java/client/src/main/java/glide/api/models/commands/stream/StreamReadGroupOptions.java index ecb41a2c0e..bc31189fb7 100644 --- a/java/client/src/main/java/glide/api/models/commands/stream/StreamReadGroupOptions.java +++ b/java/client/src/main/java/glide/api/models/commands/stream/StreamReadGroupOptions.java @@ -1,6 +1,8 @@ /** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */ package glide.api.models.commands.stream; +import static glide.api.models.GlideString.gs; + import glide.api.commands.StreamBaseCommands; import glide.api.models.GlideString; import java.util.ArrayList; @@ -12,6 +14,7 @@ /** * Optional arguments for {@link StreamBaseCommands#xreadgroup(Map, String, String, + * StreamReadGroupOptions)} and {@link StreamBaseCommands#xreadgroup(Map, GlideString, GlideString, * StreamReadGroupOptions)} * * @see valkey.io @@ -100,4 +103,39 @@ public String[] toArgs(String group, String consumer, Map stream return optionArgs.toArray(new String[0]); } + + /** + * Converts options and the key-to-id input for {@link StreamBaseCommands#xreadgroupBinary(Map, + * GlideString, GlideString, StreamReadGroupOptions)} into a GlideString[]. + * + * @return GlideString[] + */ + public GlideString[] toArgsBinary( + GlideString group, GlideString consumer, Map streams) { + List optionArgs = new ArrayList<>(); + optionArgs.add(gs(READ_GROUP_REDIS_API)); + optionArgs.add(group); + optionArgs.add(consumer); + + if (this.count != null) { + optionArgs.add(gs(READ_COUNT_REDIS_API)); + optionArgs.add(gs(count.toString())); + } + + if (this.block != null) { + optionArgs.add(gs(READ_BLOCK_REDIS_API)); + optionArgs.add(gs(block.toString())); + } + + if (this.noack) { + optionArgs.add(gs(READ_NOACK_REDIS_API)); + } + + optionArgs.add(gs(READ_STREAMS_REDIS_API)); + Set> entrySet = streams.entrySet(); + optionArgs.addAll(entrySet.stream().map(Map.Entry::getKey).collect(Collectors.toList())); + optionArgs.addAll(entrySet.stream().map(Map.Entry::getValue).collect(Collectors.toList())); + + return optionArgs.toArray(new GlideString[0]); + } } diff --git a/java/client/src/main/java/glide/api/models/commands/stream/StreamReadOptions.java b/java/client/src/main/java/glide/api/models/commands/stream/StreamReadOptions.java index da6afc6535..5a43e763a8 100644 --- a/java/client/src/main/java/glide/api/models/commands/stream/StreamReadOptions.java +++ b/java/client/src/main/java/glide/api/models/commands/stream/StreamReadOptions.java @@ -1,7 +1,10 @@ /** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */ package glide.api.models.commands.stream; +import static glide.api.models.GlideString.gs; + import glide.api.commands.StreamBaseCommands; +import glide.api.models.GlideString; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -60,6 +63,33 @@ public String[] toArgs(Map streams) { return optionArgs.toArray(new String[0]); } + /** + * Converts options and the key-to-id input for {@link StreamBaseCommands#xread(Map, + * StreamReadOptions)} into a GlideString[]. + * + * @return GlideString[] + */ + public GlideString[] toArgsBinary(Map streams) { + List optionArgs = new ArrayList<>(); + + if (this.count != null) { + optionArgs.add(gs(READ_COUNT_REDIS_API)); + optionArgs.add(gs(count.toString())); + } + + if (this.block != null) { + optionArgs.add(gs(READ_BLOCK_REDIS_API)); + optionArgs.add(gs(block.toString())); + } + + optionArgs.add(gs(READ_STREAMS_REDIS_API)); + Set> entrySet = streams.entrySet(); + optionArgs.addAll(entrySet.stream().map(Map.Entry::getKey).collect(Collectors.toList())); + optionArgs.addAll(entrySet.stream().map(Map.Entry::getValue).collect(Collectors.toList())); + + return optionArgs.toArray(new GlideString[0]); + } + /** * Converts options into a String[]. * @@ -69,4 +99,14 @@ public String[] toArgs() { Map emptyMap = new HashMap<>(); return toArgs(emptyMap); } + + /** + * Converts options into a GlideString[]. + * + * @return GlideString[] + */ + public GlideString[] toArgsBinary() { + Map emptyMap = new HashMap<>(); + return toArgsBinary(emptyMap); + } } diff --git a/java/client/src/main/java/glide/api/models/commands/stream/StreamTrimOptions.java b/java/client/src/main/java/glide/api/models/commands/stream/StreamTrimOptions.java index f61b40b509..c3dfd5618d 100644 --- a/java/client/src/main/java/glide/api/models/commands/stream/StreamTrimOptions.java +++ b/java/client/src/main/java/glide/api/models/commands/stream/StreamTrimOptions.java @@ -91,6 +91,15 @@ public MinId(@NonNull String threshold) { this.threshold = threshold; } + /** + * Create a trim option to trim stream based on stream ID. + * + * @param threshold Comparison id. + */ + public MinId(@NonNull GlideString threshold) { + this.threshold = threshold.getString(); + } + /** * Create a trim option to trim stream based on stream ID. * @@ -102,6 +111,17 @@ public MinId(boolean exact, @NonNull String threshold) { this.exact = exact; } + /** + * Create a trim option to trim stream based on stream ID. + * + * @param exact Whether to match exactly on the threshold. + * @param threshold Comparison id. + */ + public MinId(boolean exact, @NonNull GlideString threshold) { + this.threshold = threshold.getString(); + this.exact = exact; + } + /** * Create a trim option to trim stream based on stream ID. * @@ -114,6 +134,18 @@ public MinId(@NonNull String threshold, long limit) { this.limit = limit; } + /** + * Create a trim option to trim stream based on stream ID. + * + * @param threshold Comparison id. + * @param limit Max number of stream entries to be trimmed for non-exact match. + */ + public MinId(@NonNull GlideString threshold, long limit) { + this.exact = false; + this.threshold = threshold.getString(); + this.limit = limit; + } + @Override protected String getMethod() { return TRIM_MINID_REDIS_API; diff --git a/java/client/src/test/java/glide/api/RedisClientTest.java b/java/client/src/test/java/glide/api/RedisClientTest.java index 06ff9f998b..ebfb49f63a 100644 --- a/java/client/src/test/java/glide/api/RedisClientTest.java +++ b/java/client/src/test/java/glide/api/RedisClientTest.java @@ -346,6 +346,7 @@ import glide.api.models.commands.scan.ZScanOptions; import glide.api.models.commands.scan.ZScanOptionsBinary; import glide.api.models.commands.stream.StreamAddOptions; +import glide.api.models.commands.stream.StreamAddOptionsBinary; import glide.api.models.commands.stream.StreamClaimOptions; import glide.api.models.commands.stream.StreamGroupOptions; import glide.api.models.commands.stream.StreamPendingOptions; @@ -6506,8 +6507,12 @@ public void xadd_binary_with_nomakestream_maxlen_options_returns_success() { Map fieldValues = new LinkedHashMap<>(); fieldValues.put(gs("testField1"), gs("testValue1")); fieldValues.put(gs("testField2"), gs("testValue2")); - StreamAddOptions options = - StreamAddOptions.builder().id("id").makeStream(false).trim(new MaxLen(true, 5L)).build(); + StreamAddOptionsBinary options = + StreamAddOptionsBinary.builder() + .id(gs("id")) + .makeStream(false) + .trim(new MaxLen(true, 5L)) + .build(); GlideString[] arguments = new GlideString[] { @@ -6742,6 +6747,44 @@ public void xread_multiple_keys() { assertEquals(completedResult, payload); } + @SneakyThrows + @Test + public void xread_binary_multiple_keys() { + // setup + GlideString keyOne = gs("one"); + GlideString streamIdOne = gs("id-one"); + GlideString keyTwo = gs("two"); + GlideString streamIdTwo = gs("id-two"); + GlideString[][] fieldValues = {{gs("field"), gs("value")}}; + Map> completedResult = new LinkedHashMap<>(); + completedResult.put(keyOne, Map.of(streamIdOne, fieldValues)); + completedResult.put(keyTwo, Map.of(streamIdTwo, fieldValues)); + GlideString[] arguments = { + gs(READ_STREAMS_REDIS_API), keyOne, keyTwo, streamIdOne, streamIdTwo + }; + + CompletableFuture>> testResponse = + new CompletableFuture<>(); + testResponse.complete(completedResult); + + // match on protobuf request + when(commandManager.>>submitNewCommand( + eq(XRead), eq(arguments), any())) + .thenReturn(testResponse); + + // exercise + Map keysAndIds = new LinkedHashMap<>(); + keysAndIds.put(keyOne, streamIdOne); + keysAndIds.put(keyTwo, streamIdTwo); + CompletableFuture>> response = + service.xreadBinary(keysAndIds); + Map> payload = response.get(); + + // verify + assertEquals(testResponse, response); + assertEquals(completedResult, payload); + } + @SneakyThrows @Test public void xread_with_options() { @@ -6784,6 +6827,48 @@ public void xread_with_options() { assertEquals(completedResult, payload); } + @SneakyThrows + @Test + public void xread_with_options_binary() { + // setup + GlideString keyOne = gs("one"); + GlideString streamIdOne = gs("id-one"); + Long block = 2L; + Long count = 10L; + GlideString[][] fieldValues = {{gs("field"), gs("value")}}; + Map> completedResult = + Map.of(keyOne, Map.of(streamIdOne, fieldValues)); + GlideString[] arguments = { + gs(READ_COUNT_REDIS_API), + gs(count.toString()), + gs(READ_BLOCK_REDIS_API), + gs(block.toString()), + gs(READ_STREAMS_REDIS_API), + keyOne, + streamIdOne + }; + + CompletableFuture>> testResponse = + new CompletableFuture<>(); + testResponse.complete(completedResult); + + // match on protobuf request + when(commandManager.>>submitNewCommand( + eq(XRead), eq(arguments), any())) + .thenReturn(testResponse); + + // exercise + CompletableFuture>> response = + service.xreadBinary( + Map.of(keyOne, streamIdOne), + StreamReadOptions.builder().block(block).count(count).build()); + Map> payload = response.get(); + + // verify + assertEquals(testResponse, response); + assertEquals(completedResult, payload); + } + @Test @SneakyThrows public void xdel_returns_success() { @@ -7126,6 +7211,31 @@ public void xgroupCreate() { assertEquals(OK, payload); } + @SneakyThrows + @Test + public void xgroupCreate_binary() { + // setup + GlideString key = gs("testKey"); + GlideString groupName = gs("testGroupName"); + GlideString id = gs("testId"); + GlideString[] arguments = new GlideString[] {key, groupName, id}; + + CompletableFuture testResponse = new CompletableFuture<>(); + testResponse.complete(OK); + + // match on protobuf request + when(commandManager.submitNewCommand(eq(XGroupCreate), eq(arguments), any())) + .thenReturn(testResponse); + + // exercise + CompletableFuture response = service.xgroupCreate(key, groupName, id); + String payload = response.get(); + + // verify + assertEquals(testResponse, response); + assertEquals(OK, payload); + } + @SneakyThrows @Test public void xgroupCreate_withOptions() { @@ -7162,6 +7272,42 @@ public void xgroupCreate_withOptions() { assertEquals(OK, payload); } + @SneakyThrows + @Test + public void xgroupCreate_withOptions_binary() { + // setup + GlideString key = gs("testKey"); + GlideString groupName = gs("testGroupName"); + GlideString id = gs("testId"); + Long testEntry = 123L; + StreamGroupOptions options = + StreamGroupOptions.builder().makeStream().entriesRead(testEntry).build(); + GlideString[] arguments = + new ArgsBuilder() + .add(key) + .add(groupName) + .add(id) + .add(MAKE_STREAM_VALKEY_API) + .add(ENTRIES_READ_VALKEY_API) + .add(testEntry) + .toArray(); + + CompletableFuture testResponse = new CompletableFuture<>(); + testResponse.complete(OK); + + // match on protobuf request + when(commandManager.submitNewCommand(eq(XGroupCreate), eq(arguments), any())) + .thenReturn(testResponse); + + // exercise + CompletableFuture response = service.xgroupCreate(key, groupName, id, options); + String payload = response.get(); + + // verify + assertEquals(testResponse, response); + assertEquals(OK, payload); + } + @SneakyThrows @Test public void xgroupDestroy() { @@ -7186,6 +7332,30 @@ public void xgroupDestroy() { assertEquals(Boolean.TRUE, payload); } + @SneakyThrows + @Test + public void xgroupDestroy_binary() { + // setup + GlideString key = gs("testKey"); + GlideString groupName = gs("testGroupName"); + GlideString[] arguments = new GlideString[] {key, groupName}; + + CompletableFuture testResponse = new CompletableFuture<>(); + testResponse.complete(Boolean.TRUE); + + // match on protobuf request + when(commandManager.submitNewCommand(eq(XGroupDestroy), eq(arguments), any())) + .thenReturn(testResponse); + + // exercise + CompletableFuture response = service.xgroupDestroy(key, groupName); + Boolean payload = response.get(); + + // verify + assertEquals(testResponse, response); + assertEquals(Boolean.TRUE, payload); + } + @SneakyThrows @Test public void xgroupCreateConsumer() { @@ -7212,6 +7382,32 @@ public void xgroupCreateConsumer() { assertEquals(Boolean.TRUE, payload); } + @SneakyThrows + @Test + public void xgroupCreateConsumer_binary() { + // setup + GlideString key = gs("testKey"); + GlideString groupName = gs("testGroupName"); + GlideString consumerName = gs("testConsumerName"); + GlideString[] arguments = new GlideString[] {key, groupName, consumerName}; + + CompletableFuture testResponse = new CompletableFuture<>(); + testResponse.complete(Boolean.TRUE); + + // match on protobuf request + when(commandManager.submitNewCommand(eq(XGroupCreateConsumer), eq(arguments), any())) + .thenReturn(testResponse); + + // exercise + CompletableFuture response = + service.xgroupCreateConsumer(key, groupName, consumerName); + Boolean payload = response.get(); + + // verify + assertEquals(testResponse, response); + assertEquals(Boolean.TRUE, payload); + } + @SneakyThrows @Test public void xgroupDelConsumer() { @@ -7238,6 +7434,32 @@ public void xgroupDelConsumer() { assertEquals(result, payload); } + @SneakyThrows + @Test + public void xgroupDelConsumer_binary() { + // setup + GlideString key = gs("testKey"); + GlideString groupName = gs("testGroupName"); + GlideString consumerName = gs("testConsumerName"); + GlideString[] arguments = new GlideString[] {key, groupName, consumerName}; + Long result = 28L; + + CompletableFuture testResponse = new CompletableFuture<>(); + testResponse.complete(result); + + // match on protobuf request + when(commandManager.submitNewCommand(eq(XGroupDelConsumer), eq(arguments), any())) + .thenReturn(testResponse); + + // exercise + CompletableFuture response = service.xgroupDelConsumer(key, groupName, consumerName); + Long payload = response.get(); + + // verify + assertEquals(testResponse, response); + assertEquals(result, payload); + } + @SneakyThrows @Test public void xgroupSetid() { @@ -7263,6 +7485,31 @@ public void xgroupSetid() { assertEquals(OK, payload); } + @SneakyThrows + @Test + public void xgroupSetid_binary() { + // setup + GlideString key = gs("testKey"); + GlideString groupName = gs("testGroupName"); + GlideString id = gs("testId"); + GlideString[] arguments = new GlideString[] {key, groupName, id}; + + CompletableFuture testResponse = new CompletableFuture<>(); + testResponse.complete(OK); + + // match on protobuf request + when(commandManager.submitNewCommand(eq(XGroupSetId), eq(arguments), any())) + .thenReturn(testResponse); + + // exercise + CompletableFuture response = service.xgroupSetId(key, groupName, id); + String payload = response.get(); + + // verify + assertEquals(testResponse, response); + assertEquals(OK, payload); + } + @SneakyThrows @Test public void xgroupSetidWithEntriesRead() { @@ -7272,7 +7519,40 @@ public void xgroupSetidWithEntriesRead() { String id = "testId"; Long entriesRead = 1L; String[] arguments = - new String[] {key, groupName, id, "ENTRIESREAD", Long.toString(entriesRead)}; + new String[] {key, groupName, id, ENTRIES_READ_VALKEY_API, Long.toString(entriesRead)}; + + CompletableFuture testResponse = new CompletableFuture<>(); + testResponse.complete(OK); + + // match on protobuf request + when(commandManager.submitNewCommand(eq(XGroupSetId), eq(arguments), any())) + .thenReturn(testResponse); + + // exercise + CompletableFuture response = service.xgroupSetId(key, groupName, id, entriesRead); + String payload = response.get(); + + // verify + assertEquals(testResponse, response); + assertEquals(OK, payload); + } + + @SneakyThrows + @Test + public void xgroupSetidWithEntriesRead_binary() { + // setup + GlideString key = gs("testKey"); + GlideString groupName = gs("testGroupName"); + GlideString id = gs("testId"); + Long entriesRead = 1L; + GlideString[] arguments = + new ArgsBuilder() + .add(key) + .add(groupName) + .add(id) + .add(ENTRIES_READ_VALKEY_API) + .add(entriesRead) + .toArray(); CompletableFuture testResponse = new CompletableFuture<>(); testResponse.complete(OK); @@ -7337,6 +7617,53 @@ public void xreadgroup_multiple_keys() { assertEquals(completedResult, payload); } + @SneakyThrows + @Test + public void xreadgroup_binary_multiple_keys() { + // setup + GlideString keyOne = gs("one"); + GlideString streamIdOne = gs("id-one"); + GlideString keyTwo = gs("two"); + GlideString streamIdTwo = gs("id-two"); + GlideString groupName = gs("testGroup"); + GlideString consumerName = gs("consumerGroup"); + GlideString[][] fieldValues = {{gs("field"), gs("value")}}; + Map> completedResult = new LinkedHashMap<>(); + completedResult.put(keyOne, Map.of(streamIdOne, fieldValues)); + completedResult.put(keyTwo, Map.of(streamIdTwo, fieldValues)); + GlideString[] arguments = { + gs(READ_GROUP_REDIS_API), + groupName, + consumerName, + gs(READ_STREAMS_REDIS_API), + keyOne, + keyTwo, + streamIdOne, + streamIdTwo + }; + + CompletableFuture>> testResponse = + new CompletableFuture<>(); + testResponse.complete(completedResult); + + // match on protobuf request + when(commandManager.>>submitNewCommand( + eq(XReadGroup), eq(arguments), any())) + .thenReturn(testResponse); + + // exercise + Map keysAndIds = new LinkedHashMap<>(); + keysAndIds.put(keyOne, streamIdOne); + keysAndIds.put(keyTwo, streamIdTwo); + CompletableFuture>> response = + service.xreadgroup(keysAndIds, groupName, consumerName); + Map> payload = response.get(); + + // verify + assertEquals(testResponse, response); + assertEquals(completedResult, payload); + } + @SneakyThrows @Test public void xreadgroup_with_options() { @@ -7387,6 +7714,56 @@ public void xreadgroup_with_options() { assertEquals(completedResult, payload); } + @SneakyThrows + @Test + public void xreadgroup_with_options_binary() { + // setup + GlideString keyOne = gs("one"); + GlideString streamIdOne = gs("id-one"); + Long block = 2L; + Long count = 10L; + GlideString groupName = gs("testGroup"); + GlideString consumerName = gs("consumerGroup"); + GlideString[][] fieldValues = {{gs("field"), gs("value")}}; + Map> completedResult = + Map.of(keyOne, Map.of(streamIdOne, fieldValues)); + GlideString[] arguments = { + gs(READ_GROUP_REDIS_API), + groupName, + consumerName, + gs(READ_COUNT_REDIS_API), + gs(count.toString()), + gs(READ_BLOCK_REDIS_API), + gs(block.toString()), + gs(READ_NOACK_REDIS_API), + gs(READ_STREAMS_REDIS_API), + keyOne, + streamIdOne + }; + + CompletableFuture>> testResponse = + new CompletableFuture<>(); + testResponse.complete(completedResult); + + // match on protobuf request + when(commandManager.>>submitNewCommand( + eq(XReadGroup), eq(arguments), any())) + .thenReturn(testResponse); + + // exercise + CompletableFuture>> response = + service.xreadgroup( + Map.of(keyOne, streamIdOne), + groupName, + consumerName, + StreamReadGroupOptions.builder().block(block).count(count).noack().build()); + Map> payload = response.get(); + + // verify + assertEquals(testResponse, response); + assertEquals(completedResult, payload); + } + @SneakyThrows @Test public void xack_returns_success() { diff --git a/java/integTest/src/test/java/glide/SharedCommandTests.java b/java/integTest/src/test/java/glide/SharedCommandTests.java index 0139c7134b..0693d3a51a 100644 --- a/java/integTest/src/test/java/glide/SharedCommandTests.java +++ b/java/integTest/src/test/java/glide/SharedCommandTests.java @@ -89,6 +89,7 @@ import glide.api.models.commands.scan.ZScanOptions; import glide.api.models.commands.scan.ZScanOptionsBinary; import glide.api.models.commands.stream.StreamAddOptions; +import glide.api.models.commands.stream.StreamAddOptionsBinary; import glide.api.models.commands.stream.StreamClaimOptions; import glide.api.models.commands.stream.StreamGroupOptions; import glide.api.models.commands.stream.StreamPendingOptions; @@ -4974,34 +4975,31 @@ public void xadd_xlen_and_xtrim_binary(BaseClient client) { assertNull( client .xadd( - key.toString(), - Map.of(field1.toString(), "foo0", field2.toString(), "bar0"), - StreamAddOptions.builder().makeStream(Boolean.FALSE).build()) + key, + Map.of(field1, gs("foo0"), field2, gs("bar0")), + StreamAddOptionsBinary.builder().makeStream(Boolean.FALSE).build()) .get()); - String timestamp1 = "0-1"; + GlideString timestamp1 = gs("0-1"); assertEquals( timestamp1, client .xadd( - key.toString(), - Map.of(field1.toString(), "foo1", field2.toString(), "bar1"), - StreamAddOptions.builder().id(timestamp1).build()) + key, + Map.of(field1, gs("foo1"), field2, gs("bar1")), + StreamAddOptionsBinary.builder().id(timestamp1).build()) .get()); - assertNotNull( - client - .xadd(key.toString(), Map.of(field1.toString(), "foo2", field2.toString(), "bar2")) - .get()); + assertNotNull(client.xadd(key, Map.of(field1, gs("foo2"), field2, gs("bar2"))).get()); assertEquals(2L, client.xlen(key).get()); // this will trim the first entry. - String id = + GlideString id = client .xadd( - key.toString(), - Map.of(field1.toString(), "foo3", field2.toString(), "bar3"), - StreamAddOptions.builder().trim(new MaxLen(true, 2L)).build()) + key, + Map.of(field1, gs("foo3"), field2, gs("bar3")), + StreamAddOptionsBinary.builder().trim(new MaxLen(true, 2L)).build()) .get(); assertNotNull(id); assertEquals(2L, client.xlen(key).get()); @@ -5010,9 +5008,9 @@ public void xadd_xlen_and_xtrim_binary(BaseClient client) { assertNotNull( client .xadd( - key.toString(), - Map.of(field1.toString(), "foo4", field2.toString(), "bar4"), - StreamAddOptions.builder().trim(new MinId(true, id)).build()) + key, + Map.of(field1, gs("foo4"), field2, gs("bar4")), + StreamAddOptionsBinary.builder().trim(new MinId(true, id)).build()) .get()); assertEquals(2L, client.xlen(key).get()); @@ -5105,6 +5103,94 @@ public void xread(BaseClient client) { assertDeepEquals(expected_key2, result.get(key2)); } + @SneakyThrows + @ParameterizedTest(autoCloseArguments = false) + @MethodSource("getClients") + public void xread_binary(BaseClient client) { + GlideString key1 = gs("{key}:1" + UUID.randomUUID()); + GlideString key2 = gs("{key}:2" + UUID.randomUUID()); + GlideString field1 = gs("f1_"); + GlideString field2 = gs("f2_"); + GlideString field3 = gs("f3_"); + + // setup first entries in streams key1 and key2 + Map timestamp_1_1_map = new LinkedHashMap<>(); + timestamp_1_1_map.put(field1, gs(field1.toString() + "1")); + timestamp_1_1_map.put(field3, gs(field3.toString() + "1")); + GlideString timestamp_1_1 = + client + .xadd(key1, timestamp_1_1_map, StreamAddOptionsBinary.builder().id(gs("1-1")).build()) + .get(); + assertNotNull(timestamp_1_1); + + GlideString timestamp_2_1 = + client + .xadd( + key2, + Map.of(field2, gs(field2.toString() + "1")), + StreamAddOptionsBinary.builder().id(gs("2-1")).build()) + .get(); + assertNotNull(timestamp_2_1); + + // setup second entries in streams key1 and key2 + GlideString timestamp_1_2 = + client + .xadd( + key1, + Map.of(field1, gs(field1.toString() + "2")), + StreamAddOptionsBinary.builder().id(gs("1-2")).build()) + .get(); + assertNotNull(timestamp_1_2); + + GlideString timestamp_2_2 = + client + .xadd( + key2, + Map.of(field2, gs(field2.toString() + "2")), + StreamAddOptionsBinary.builder().id(gs("2-2")).build()) + .get(); + assertNotNull(timestamp_2_2); + + // setup third entries in streams key1 and key2 + Map timestamp_1_3_map = new LinkedHashMap<>(); + timestamp_1_3_map.put(field1, gs(field1.toString() + "3")); + timestamp_1_3_map.put(field3, gs(field3.toString() + "3")); + GlideString timestamp_1_3 = + client + .xadd(key1, timestamp_1_3_map, StreamAddOptionsBinary.builder().id(gs("1-3")).build()) + .get(); + assertNotNull(timestamp_1_3); + + GlideString timestamp_2_3 = + client + .xadd( + key2, + Map.of(field2, gs(field2.toString() + "3")), + StreamAddOptionsBinary.builder().id(gs("2-3")).build()) + .get(); + assertNotNull(timestamp_2_3); + + Map> result = + client.xreadBinary(Map.of(key1, timestamp_1_1, key2, timestamp_2_1)).get(); + + // check key1 + Map expected_key1 = new LinkedHashMap<>(); + expected_key1.put(timestamp_1_2, new GlideString[][] {{field1, gs(field1.toString() + "2")}}); + expected_key1.put( + timestamp_1_3, + new GlideString[][] { + {field1, gs(field1.toString() + "3")}, + {field3, gs(field3.toString() + "3")} + }); + assertDeepEquals(expected_key1, result.get(key1)); + + // check key2 + Map expected_key2 = new LinkedHashMap<>(); + expected_key2.put(timestamp_2_2, new GlideString[][] {{field2, gs(field2.toString() + "2")}}); + expected_key2.put(timestamp_2_3, new GlideString[][] {{field2, gs(field2.toString() + "3")}}); + assertDeepEquals(expected_key2, result.get(key2)); + } + @SneakyThrows @ParameterizedTest(autoCloseArguments = false) @MethodSource("getClients") @@ -5159,6 +5245,65 @@ public void xread_return_failures(BaseClient client) { } } + @SneakyThrows + @ParameterizedTest(autoCloseArguments = false) + @MethodSource("getClients") + public void xread_binary_return_failures(BaseClient client) { + GlideString key1 = gs("{key}:1" + UUID.randomUUID()); + GlideString nonStreamKey = gs("{key}:3" + UUID.randomUUID()); + GlideString field1 = gs("f1_"); + + // setup first entries in streams key1 and key2 + Map timestamp_1_1_map = new LinkedHashMap<>(); + timestamp_1_1_map.put(field1, gs(field1.toString() + "1")); + GlideString timestamp_1_1 = + client + .xadd(key1, timestamp_1_1_map, StreamAddOptionsBinary.builder().id(gs("1-1")).build()) + .get(); + assertNotNull(timestamp_1_1); + + // Key exists, but it is not a stream + assertEquals(OK, client.set(nonStreamKey, gs("bar")).get()); + ExecutionException executionException = + assertThrows( + ExecutionException.class, + () -> + client.xreadBinary(Map.of(nonStreamKey, timestamp_1_1, key1, timestamp_1_1)).get()); + assertInstanceOf(RequestException.class, executionException.getCause()); + + executionException = + assertThrows( + ExecutionException.class, + () -> + client.xreadBinary(Map.of(key1, timestamp_1_1, nonStreamKey, timestamp_1_1)).get()); + assertInstanceOf(RequestException.class, executionException.getCause()); + + try (var testClient = + client instanceof RedisClient + ? RedisClient.createClient(commonClientConfig().build()).get() + : RedisClusterClient.createClient(commonClusterClientConfig().build()).get()) { + + // ensure that commands doesn't time out even if timeout > request timeout + long oneSecondInMS = 1000L; + assertNull( + testClient + .xreadBinary( + Map.of(key1, timestamp_1_1), + StreamReadOptions.builder().block(oneSecondInMS).build()) + .get()); + + // with 0 timeout (no timeout) should never time out, + // but we wrap the test with timeout to avoid test failing or stuck forever + assertThrows( + TimeoutException.class, // <- future timeout, not command timeout + () -> + testClient + .xreadBinary( + Map.of(key1, timestamp_1_1), StreamReadOptions.builder().block(0L).build()) + .get(3, TimeUnit.SECONDS)); + } + } + @SneakyThrows @ParameterizedTest(autoCloseArguments = false) @MethodSource("getClients") @@ -5361,7 +5506,7 @@ public void xrange_and_xrevrange_binary(BaseClient client) { .xadd( key, Map.of(gs("f1"), gs("foo1"), gs("f2"), gs("bar2")), - StreamAddOptions.builder().id(streamId1).build()) + StreamAddOptionsBinary.builder().id(gs(streamId1)).build()) .get()); assertEquals( gs(streamId2), @@ -5369,7 +5514,7 @@ public void xrange_and_xrevrange_binary(BaseClient client) { .xadd( key, Map.of(gs("f1"), gs("foo1"), gs("f2"), gs("bar2")), - StreamAddOptions.builder().id(streamId2).build()) + StreamAddOptionsBinary.builder().id(gs(streamId2)).build()) .get()); assertEquals(2L, client.xlen(key).get()); @@ -5403,7 +5548,7 @@ public void xrange_and_xrevrange_binary(BaseClient client) { .xadd( key, Map.of(gs("f3"), gs("foo3"), gs("f4"), gs("bar3")), - StreamAddOptions.builder().id(streamId3).build()) + StreamAddOptionsBinary.builder().id(gs(streamId3)).build()) .get()); // get the newest entry @@ -5558,6 +5703,75 @@ public void xgroupCreate_xgroupDestroy(BaseClient client) { assertInstanceOf(RequestException.class, executionException.getCause()); } + @SneakyThrows + @ParameterizedTest(autoCloseArguments = false) + @MethodSource("getClients") + public void xgroupCreate_binary_xgroupDestroy_binary(BaseClient client) { + GlideString key = gs(UUID.randomUUID().toString()); + GlideString stringKey = gs(UUID.randomUUID().toString()); + GlideString groupName = gs("group" + UUID.randomUUID()); + GlideString streamId = gs("0-1"); + + // Stream not created results in error + Exception executionException = + assertThrows( + ExecutionException.class, () -> client.xgroupCreate(key, groupName, streamId).get()); + assertInstanceOf(RequestException.class, executionException.getCause()); + + // Stream with option to create creates stream & Group + assertEquals( + OK, + client + .xgroupCreate( + key, groupName, streamId, StreamGroupOptions.builder().makeStream().build()) + .get()); + + // ...and again results in BUSYGROUP error, because group names must be unique + executionException = + assertThrows( + ExecutionException.class, () -> client.xgroupCreate(key, groupName, streamId).get()); + assertInstanceOf(RequestException.class, executionException.getCause()); + assertTrue(executionException.getMessage().contains("BUSYGROUP")); + + // Stream Group can be destroyed returns: true + assertEquals(true, client.xgroupDestroy(key, groupName).get()); + + // ...and again results in: false + assertEquals(false, client.xgroupDestroy(key, groupName).get()); + + // ENTRIESREAD option was added in redis 7.0.0 + StreamGroupOptions entriesReadOption = StreamGroupOptions.builder().entriesRead(10L).build(); + if (SERVER_VERSION.isGreaterThanOrEqualTo("7.0.0")) { + assertEquals(OK, client.xgroupCreate(key, groupName, streamId, entriesReadOption).get()); + } else { + executionException = + assertThrows( + ExecutionException.class, + () -> client.xgroupCreate(key, groupName, streamId, entriesReadOption).get()); + assertInstanceOf(RequestException.class, executionException.getCause()); + } + + // key is a string and cannot be created as a stream + assertEquals(OK, client.set(stringKey, gs("not_a_stream")).get()); + executionException = + assertThrows( + ExecutionException.class, + () -> + client + .xgroupCreate( + stringKey, + groupName, + streamId, + StreamGroupOptions.builder().makeStream().build()) + .get()); + assertInstanceOf(RequestException.class, executionException.getCause()); + + executionException = + assertThrows( + ExecutionException.class, () -> client.xgroupDestroy(stringKey, groupName).get()); + assertInstanceOf(RequestException.class, executionException.getCause()); + } + @SneakyThrows @ParameterizedTest(autoCloseArguments = false) @MethodSource("getClients") @@ -5835,11 +6049,7 @@ public void xgroupCreateConsumer_xgroupDelConsumer_xreadgroup_xack(BaseClient cl assertNotNull(streamid_3); // xack that streamid_1, and streamid_2 was received - assertEquals( - 2L, - client - .xack(gs(key), gs(groupName), new GlideString[] {gs(streamid_1), gs(streamid_2)}) - .get()); + assertEquals(2L, client.xack(key, groupName, new String[] {streamid_1, streamid_2}).get()); // Delete the consumer group and expect 1 pending messages (one was received) assertEquals(0L, client.xgroupDelConsumer(key, groupName, consumerName).get()); @@ -5852,8 +6062,7 @@ public void xgroupCreateConsumer_xgroupDelConsumer_xreadgroup_xack(BaseClient cl assertEquals(1, result_3.get(key).size()); // wrong group, so xack streamid_3 returns 0 - assertEquals( - 0L, client.xack(gs(key), gs("not_a_group"), new GlideString[] {gs(streamid_3)}).get()); + assertEquals(0L, client.xack(key, "not_a_group", new String[] {streamid_3}).get()); // Delete the consumer group and expect the pending message assertEquals(1L, client.xgroupDelConsumer(key, groupName, consumerName).get()); @@ -5873,6 +6082,102 @@ public void xgroupCreateConsumer_xgroupDelConsumer_xreadgroup_xack(BaseClient cl assertInstanceOf(RequestException.class, executionException.getCause()); } + @SneakyThrows + @ParameterizedTest(autoCloseArguments = false) + @MethodSource("getClients") + public void xgroupCreateConsumer_xgroupDelConsumer_xreadgroup_xack_binary(BaseClient client) { + GlideString key = gs(UUID.randomUUID().toString()); + GlideString stringKey = gs(UUID.randomUUID().toString()); + GlideString groupName = gs("group" + UUID.randomUUID()); + GlideString zeroStreamId = gs("0"); + GlideString consumerName = gs("consumer" + UUID.randomUUID()); + + // create group and consumer for the group + assertEquals( + OK, + client + .xgroupCreate( + key, groupName, zeroStreamId, StreamGroupOptions.builder().makeStream().build()) + .get()); + assertTrue(client.xgroupCreateConsumer(key, groupName, consumerName).get()); + + // create consumer for group that does not exist results in a NOGROUP request error + ExecutionException executionException = + assertThrows( + ExecutionException.class, + () -> client.xgroupCreateConsumer(key, gs("not_a_group"), consumerName).get()); + assertInstanceOf(RequestException.class, executionException.getCause()); + assertTrue(executionException.getMessage().contains("NOGROUP")); + + // create consumer for group again + assertFalse(client.xgroupCreateConsumer(key, groupName, consumerName).get()); + + // Deletes a consumer that is not created yet returns 0 + assertEquals(0L, client.xgroupDelConsumer(key, groupName, gs("not_a_consumer")).get()); + + // Add two stream entries + GlideString streamid_1 = client.xadd(key, Map.of(gs("field1"), gs("value1"))).get(); + assertNotNull(streamid_1); + GlideString streamid_2 = client.xadd(key, Map.of(gs("field2"), gs("value2"))).get(); + assertNotNull(streamid_2); + + // read the entire stream for the consumer and mark messages as pending + var result_1 = client.xreadgroup(Map.of(key, gs(">")), groupName, consumerName).get(); + assertDeepEquals( + Map.of( + key, + Map.of( + streamid_1, new GlideString[][] {{gs("field1"), gs("value1")}}, + streamid_2, new GlideString[][] {{gs("field2"), gs("value2")}})), + result_1); + + // delete one of the streams + assertEquals(1L, client.xdel(key, new GlideString[] {streamid_1}).get()); + + // now xreadgroup returns one empty stream and one non-empty stream + var result_2 = client.xreadgroup(Map.of(key, gs("0")), groupName, consumerName).get(); + assertEquals(2, result_2.get(key).size()); + assertNull(result_2.get(key).get(streamid_1)); + assertArrayEquals( + new GlideString[][] {{gs("field2"), gs("value2")}}, result_2.get(key).get(streamid_2)); + + GlideString streamid_3 = client.xadd(key, Map.of(gs("field3"), gs("value3"))).get(); + assertNotNull(streamid_3); + + // xack that streamid_1, and streamid_2 was received + assertEquals(2L, client.xack(key, groupName, new GlideString[] {streamid_1, streamid_2}).get()); + + // Delete the consumer group and expect 1 pending messages (one was received) + assertEquals(0L, client.xgroupDelConsumer(key, groupName, consumerName).get()); + + // xack streamid_1, and streamid_2 already received returns 0L + assertEquals(0L, client.xack(key, groupName, new GlideString[] {streamid_1, streamid_2}).get()); + + // Consume the last message with the previously deleted consumer (creates the consumer anew) + var result_3 = client.xreadgroup(Map.of(key, gs(">")), groupName, consumerName).get(); + assertEquals(1, result_3.get(key).size()); + + // wrong group, so xack streamid_3 returns 0 + assertEquals(0L, client.xack(key, gs("not_a_group"), new GlideString[] {streamid_3}).get()); + + // Delete the consumer group and expect the pending message + assertEquals(1L, client.xgroupDelConsumer(key, groupName, consumerName).get()); + + // key is a string and cannot be created as a stream + assertEquals(OK, client.set(stringKey, gs("not_a_stream")).get()); + executionException = + assertThrows( + ExecutionException.class, + () -> client.xgroupCreateConsumer(stringKey, groupName, consumerName).get()); + assertInstanceOf(RequestException.class, executionException.getCause()); + + executionException = + assertThrows( + ExecutionException.class, + () -> client.xgroupDelConsumer(stringKey, groupName, consumerName).get()); + assertInstanceOf(RequestException.class, executionException.getCause()); + } + @SneakyThrows @ParameterizedTest(autoCloseArguments = false) @MethodSource("getClients") @@ -5959,6 +6264,102 @@ public void xgroupSetId_entriesRead(BaseClient client) { assertInstanceOf(RequestException.class, executionException.getCause()); } + @SneakyThrows + @ParameterizedTest(autoCloseArguments = false) + @MethodSource("getClients") + public void xgroupSetId_entriesRead_binary(BaseClient client) { + GlideString key = gs("testKey" + UUID.randomUUID()); + GlideString nonExistingKey = gs("group" + UUID.randomUUID()); + GlideString stringKey = gs("testKey" + UUID.randomUUID()); + GlideString groupName = gs(UUID.randomUUID().toString()); + GlideString consumerName = gs(UUID.randomUUID().toString()); + GlideString streamId0 = gs("0"); + GlideString streamId1_0 = gs("1-0"); + GlideString streamId1_1 = gs("1-1"); + GlideString streamId1_2 = gs("1-2"); + + // Setup: Create stream with 3 entries, create consumer group, read entries to add them to the + // Pending Entries List. + assertEquals( + streamId1_0, + client + .xadd( + key, + Map.of(gs("f0"), gs("v0")), + StreamAddOptionsBinary.builder().id(streamId1_0).build()) + .get()); + assertEquals( + streamId1_1, + client + .xadd( + key, + Map.of(gs("f1"), gs("v1")), + StreamAddOptionsBinary.builder().id(streamId1_1).build()) + .get()); + assertEquals( + streamId1_2, + client + .xadd( + key, + Map.of(gs("f2"), gs("v2")), + StreamAddOptionsBinary.builder().id(streamId1_2).build()) + .get()); + + assertEquals(OK, client.xgroupCreate(key, groupName, streamId0).get()); + + var result = client.xreadgroup(Map.of(key, gs(">")), groupName, consumerName).get(); + assertDeepEquals( + Map.of( + key, + Map.of( + streamId1_0, new GlideString[][] {{gs("f0"), gs("v0")}}, + streamId1_1, new GlideString[][] {{gs("f1"), gs("v1")}}, + streamId1_2, new GlideString[][] {{gs("f2"), gs("v2")}})), + result); + + // Sanity check: xreadgroup should not return more entries since they're all already in the + // Pending Entries List. + assertNull(client.xreadgroup(Map.of(key, gs(">")), groupName, consumerName).get()); + + // Reset the last delivered ID for the consumer group to "1-1". + // ENTRIESREAD is only supported in Redis version 7.0.0 and higher. + if (SERVER_VERSION.isLowerThan("7.0.0")) { + assertEquals(OK, client.xgroupSetId(key, groupName, streamId1_1).get()); + } else { + assertEquals(OK, client.xgroupSetId(key, groupName, streamId1_1, 1L).get()); + } + + // xreadgroup should only return entry 1-2 since we reset the last delivered ID to 1-1. + result = client.xreadgroup(Map.of(key, gs(">")), groupName, consumerName).get(); + assertDeepEquals( + Map.of(key, Map.of(streamId1_2, new GlideString[][] {{gs("f2"), gs("v2")}})), result); + + // An error is raised if XGROUP SETID is called with a non-existing key. + ExecutionException executionException = + assertThrows( + ExecutionException.class, + () -> client.xgroupSetId(nonExistingKey, groupName, streamId0).get()); + assertInstanceOf(RequestException.class, executionException.getCause()); + + // An error is raised if XGROUP SETID is called with a non-existing group. + executionException = + assertThrows( + ExecutionException.class, + () -> client.xgroupSetId(key, gs("non_existing_group"), streamId0).get()); + assertInstanceOf(RequestException.class, executionException.getCause()); + + // Setting the ID to a non-existing ID is allowed + assertEquals("OK", client.xgroupSetId(key, groupName, gs("99-99")).get()); + + // Key exists, but it is not a stream + assertEquals("OK", client.set(stringKey, gs("foo")).get()); + executionException = + assertThrows( + ExecutionException.class, + () -> client.xgroupSetId(stringKey, groupName, streamId0).get()); + assertInstanceOf(RequestException.class, executionException.getCause()); + } + @SneakyThrows @ParameterizedTest(autoCloseArguments = false) @MethodSource("getClients") @@ -6087,6 +6488,143 @@ public void xreadgroup_return_failures(BaseClient client) { } } + @SneakyThrows + @ParameterizedTest(autoCloseArguments = false) + @MethodSource("getClients") + public void xreadgroup_binary_return_failures(BaseClient client) { + GlideString key = gs("{key}:1" + UUID.randomUUID()); + GlideString nonStreamKey = gs("{key}:3" + UUID.randomUUID()); + GlideString groupName = gs("group" + UUID.randomUUID()); + GlideString zeroStreamId = gs("0"); + GlideString consumerName = gs("consumer" + UUID.randomUUID()); + + // setup first entries in streams key1 and key2 + GlideString timestamp_1_1 = + client + .xadd( + key, + Map.of(gs("f1"), gs("v1")), + StreamAddOptionsBinary.builder().id(gs("1-1")).build()) + .get(); + assertNotNull(timestamp_1_1); + + // create group and consumer for the group + assertEquals( + OK, + client + .xgroupCreate( + key, groupName, zeroStreamId, StreamGroupOptions.builder().makeStream().build()) + .get()); + assertTrue(client.xgroupCreateConsumer(key, groupName, consumerName).get()); + + // First key exists, but it is not a stream + assertEquals(OK, client.set(nonStreamKey, gs("bar")).get()); + ExecutionException executionException = + assertThrows( + ExecutionException.class, + () -> + client + .xreadgroup( + Map.of(nonStreamKey, timestamp_1_1, key, timestamp_1_1), + groupName, + consumerName) + .get()); + assertInstanceOf(RequestException.class, executionException.getCause()); + + // Second key exists, but it is not a stream + executionException = + assertThrows( + ExecutionException.class, + () -> + client + .xreadgroup( + Map.of(key, timestamp_1_1, nonStreamKey, timestamp_1_1), + groupName, + consumerName) + .get()); + assertInstanceOf(RequestException.class, executionException.getCause()); + + // group doesn't exists, throws a request error with "NOGROUP" + executionException = + assertThrows( + ExecutionException.class, + () -> + client + .xreadgroup(Map.of(key, timestamp_1_1), gs("not_a_group"), consumerName) + .get()); + assertInstanceOf(RequestException.class, executionException.getCause()); + assertTrue(executionException.getMessage().contains("NOGROUP")); + + // consumer doesn't exist and will be created + var emptyResult = + client.xreadgroup(Map.of(key, timestamp_1_1), groupName, gs("non_existing_consumer")).get(); + // no available pending messages + assertEquals(0, emptyResult.get(key).size()); + + try (var testClient = + client instanceof RedisClient + ? RedisClient.createClient(commonClientConfig().build()).get() + : RedisClusterClient.createClient(commonClusterClientConfig().build()).get()) { + GlideString timeoutKey = gs("{key}:2" + UUID.randomUUID()); + GlideString timeoutGroupName = gs("group" + UUID.randomUUID()); + GlideString timeoutConsumerName = gs("consumer" + UUID.randomUUID()); + + // Create a group read with the test client + // add a single stream entry and consumer + // the first call to gs(">") will return an update consumer group + // the second call to gs(">") will block waiting for new entries + // using anything other than gs(">") won't block, but will return the empty consumer result + // see: https://github.com/redis/redis/issues/6587 + assertEquals( + OK, + testClient + .xgroupCreate( + timeoutKey, + timeoutGroupName, + zeroStreamId, + StreamGroupOptions.builder().makeStream().build()) + .get()); + assertTrue( + testClient.xgroupCreateConsumer(timeoutKey, timeoutGroupName, timeoutConsumerName).get()); + GlideString streamid_1 = + testClient.xadd(timeoutKey, Map.of(gs("field1"), gs("value1"))).get(); + assertNotNull(streamid_1); + + // read the entire stream for the consumer and mark messages as pending + var result_1 = + testClient + .xreadgroup(Map.of(timeoutKey, gs(">")), timeoutGroupName, timeoutConsumerName) + .get(); + // returns a null result on the key + assertNull(result_1.get(key)); + + // subsequent calls to read ">" will block: + // ensure that command doesn't time out even if timeout > request timeout + long oneSecondInMS = 1000L; + assertNull( + testClient + .xreadgroup( + Map.of(timeoutKey, gs(">")), + timeoutGroupName, + timeoutConsumerName, + StreamReadGroupOptions.builder().block(oneSecondInMS).build()) + .get()); + + // with 0 timeout (no timeout) should never time out, + // but we wrap the test with timeout to avoid test failing or stuck forever + assertThrows( + TimeoutException.class, // <- future timeout, not command timeout + () -> + testClient + .xreadgroup( + Map.of(timeoutKey, gs(">")), + timeoutGroupName, + timeoutConsumerName, + StreamReadGroupOptions.builder().block(0L).build()) + .get(3, TimeUnit.SECONDS)); + } + } + @SneakyThrows @ParameterizedTest(autoCloseArguments = false) @MethodSource("getClients")