From f17bc6cf164f202bcae543f7a6871bc1e95c64aa Mon Sep 17 00:00:00 2001 From: Andrew Carbonetto Date: Wed, 12 Jun 2024 15:18:12 -0700 Subject: [PATCH 1/4] Java: Add the `XGROUP CREATE` and `XGROUP DESTROY` commands (#359) * JAVA: Add the XGROUP CREATE and DESTROY command Signed-off-by: Andrew Carbonetto * Fix XGROUP DESTROY Signed-off-by: Andrew Carbonetto * Clean up for self-review Signed-off-by: Andrew Carbonetto * cargo fmt Signed-off-by: Andrew Carbonetto * Change builder to use constructors Signed-off-by: Andrew Carbonetto * Add optional javadocs Signed-off-by: Andrew Carbonetto --------- Signed-off-by: Andrew Carbonetto --- glide-core/src/client/value_conversion.rs | 5 +- .../src/main/java/glide/api/BaseClient.java | 26 ++++++ .../api/commands/StreamBaseCommands.java | 70 +++++++++++++-- .../glide/api/models/BaseTransaction.java | 72 +++++++++++++-- .../commands/stream/StreamGroupOptions.java | 89 +++++++++++++++++++ .../test/java/glide/api/RedisClientTest.java | 82 +++++++++++++++++ .../glide/api/models/TransactionTests.java | 18 ++++ .../test/java/glide/SharedCommandTests.java | 62 +++++++++++++ .../java/glide/TransactionTestUtilities.java | 11 +++ 9 files changed, 422 insertions(+), 13 deletions(-) create mode 100644 java/client/src/main/java/glide/api/models/commands/stream/StreamGroupOptions.java diff --git a/glide-core/src/client/value_conversion.rs b/glide-core/src/client/value_conversion.rs index 22bd4f7c46..843abd2122 100644 --- a/glide-core/src/client/value_conversion.rs +++ b/glide-core/src/client/value_conversion.rs @@ -700,9 +700,8 @@ pub(crate) fn expected_type_for_cmd(cmd: &Cmd) -> Option { }), b"INCRBYFLOAT" | b"HINCRBYFLOAT" | b"ZINCRBY" => Some(ExpectedReturnType::Double), b"HEXISTS" | b"HSETNX" | b"EXPIRE" | b"EXPIREAT" | b"PEXPIRE" | b"PEXPIREAT" - | b"SISMEMBER" | b"PERSIST" | b"SMOVE" | b"RENAMENX" | b"MOVE" | b"COPY" => { - Some(ExpectedReturnType::Boolean) - } + | b"SISMEMBER" | b"PERSIST" | b"SMOVE" | b"RENAMENX" | b"MOVE" | b"COPY" + | b"XGROUP DESTROY" => Some(ExpectedReturnType::Boolean), b"SMISMEMBER" => Some(ExpectedReturnType::ArrayOfBools), b"SMEMBERS" | b"SINTER" | b"SDIFF" => Some(ExpectedReturnType::Set), b"ZSCORE" | b"GEODIST" => Some(ExpectedReturnType::DoubleOrNull), diff --git a/java/client/src/main/java/glide/api/BaseClient.java b/java/client/src/main/java/glide/api/BaseClient.java index fc7fb6889b..5072cb2107 100644 --- a/java/client/src/main/java/glide/api/BaseClient.java +++ b/java/client/src/main/java/glide/api/BaseClient.java @@ -115,6 +115,8 @@ import static redis_request.RedisRequestOuterClass.RequestType.Unlink; import static redis_request.RedisRequestOuterClass.RequestType.XAdd; import static redis_request.RedisRequestOuterClass.RequestType.XDel; +import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreate; +import static redis_request.RedisRequestOuterClass.RequestType.XGroupDestroy; import static redis_request.RedisRequestOuterClass.RequestType.XLen; import static redis_request.RedisRequestOuterClass.RequestType.XRange; import static redis_request.RedisRequestOuterClass.RequestType.XRead; @@ -179,6 +181,7 @@ import glide.api.models.commands.geospatial.GeoUnit; import glide.api.models.commands.geospatial.GeospatialData; import glide.api.models.commands.stream.StreamAddOptions; +import glide.api.models.commands.stream.StreamGroupOptions; import glide.api.models.commands.stream.StreamRange; import glide.api.models.commands.stream.StreamReadOptions; import glide.api.models.commands.stream.StreamTrimOptions; @@ -1358,6 +1361,29 @@ public CompletableFuture> xrevrange( response -> castMapOf2DArray(handleMapResponse(response), String.class)); } + @Override + public CompletableFuture xgroupCreate( + @NonNull String key, @NonNull String groupname, @NonNull String id) { + return commandManager.submitNewCommand( + XGroupCreate, new String[] {key, groupname, id}, this::handleStringResponse); + } + + @Override + public CompletableFuture xgroupCreate( + @NonNull String key, + @NonNull String groupname, + @NonNull String id, + @NonNull StreamGroupOptions options) { + String[] arguments = concatenateArrays(new String[] {key, groupname, id}, options.toArgs()); + return commandManager.submitNewCommand(XGroupCreate, arguments, this::handleStringResponse); + } + + @Override + public CompletableFuture xgroupDestroy(@NonNull String key, @NonNull String groupname) { + return commandManager.submitNewCommand( + XGroupDestroy, new String[] {key, groupname}, this::handleBooleanResponse); + } + @Override public CompletableFuture pttl(@NonNull String key) { return commandManager.submitNewCommand(PTTL, new String[] {key}, this::handleLongResponse); diff --git a/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java b/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java index dbe98bc6b3..c77b455d7a 100644 --- a/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java +++ b/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java @@ -3,6 +3,7 @@ import glide.api.models.commands.stream.StreamAddOptions; import glide.api.models.commands.stream.StreamAddOptions.StreamAddOptionsBuilder; +import glide.api.models.commands.stream.StreamGroupOptions; import glide.api.models.commands.stream.StreamRange; import glide.api.models.commands.stream.StreamRange.IdBound; import glide.api.models.commands.stream.StreamRange.InfRangeBound; @@ -23,7 +24,7 @@ public interface StreamBaseCommands { * Adds an entry to the specified stream stored at key.
* If the key doesn't exist, the stream is created. * - * @see redis.io for details. + * @see valkey.io for details. * @param key The key of the stream. * @param values Field-value pairs to be added to the entry. * @return The id of the added entry. @@ -39,7 +40,7 @@ public interface StreamBaseCommands { * Adds an entry to the specified stream stored at key.
* If the key doesn't exist, the stream is created. * - * @see redis.io for details. + * @see valkey.io for details. * @param key The key of the stream. * @param values Field-value pairs to be added to the entry. * @param options Stream add options {@link StreamAddOptions}. @@ -63,7 +64,7 @@ public interface StreamBaseCommands { * * @apiNote When in cluster mode, all keys in keysAndIds must map to the same hash * slot. - * @see redis.io for details. + * @see valkey.io for details. * @param keysAndIds A Map of keys and entry ids to read from. The * Map is composed of a stream's key and the id of the entry after which the stream * will be read. @@ -89,7 +90,7 @@ public interface StreamBaseCommands { * * @apiNote When in cluster mode, all keys in keysAndIds must map to the same hash * slot. - * @see redis.io for details. + * @see valkey.io for details. * @param keysAndIds A Map of keys and entry ids to read from. The * Map is composed of a stream's key and the id of the entry after which the stream * will be read. @@ -117,7 +118,7 @@ CompletableFuture>> xread( /** * Trims the stream by evicting older entries. * - * @see redis.io for details. + * @see valkey.io for details. * @param key The key of the stream. * @param options Stream trim options {@link StreamTrimOptions}. * @return The number of entries deleted from the stream. @@ -169,6 +170,7 @@ CompletableFuture>> xread( /** * Returns stream entries matching a given range of IDs. * + * @see valkey.io for details. * @param key The key of the stream. * @param start Starting stream ID bound for range. *
    @@ -205,6 +207,7 @@ CompletableFuture>> xread( /** * Returns stream entries matching a given range of IDs. * + * @see valkey.io for details. * @param key The key of the stream. * @param start Starting stream ID bound for range. *
      @@ -242,6 +245,7 @@ CompletableFuture> xrange( * Equivalent to {@link #xrange(String, StreamRange, StreamRange)} but returns the entries in * reverse order. * + * @see valkey.io for details. * @param key The key of the stream. * @param end Ending stream ID bound for range. *
        @@ -281,6 +285,7 @@ CompletableFuture> xrevrange( * Equivalent to {@link #xrange(String, StreamRange, StreamRange, long)} but returns the entries * in reverse order. * + * @see valkey.io for details. * @param key The key of the stream. * @param end Ending stream ID bound for range. *
          @@ -312,4 +317,59 @@ CompletableFuture> xrevrange( */ CompletableFuture> xrevrange( String key, StreamRange end, StreamRange start, long count); + + /** + * Creates a new consumer group uniquely identified by groupname for the stream + * stored at key. + * + * @see valkey.io for details. + * @param key The key of the stream. + * @param groupname The newly created consumer group name. + * @param id Stream ID that specifies the last delivered entry in the stream from the new group’s + * perspective. The special ID "$" can be used to specify the last entry in the + * stream. + * @return OK. + * @example + *
          {@code
          +     * // Create the consumer group "mygroup", using zero as the starting ID:
          +     * assert client.xgroupCreate("mystream", "mygroup", "0-0").get().equals("OK");
          +     * }
          + */ + CompletableFuture xgroupCreate(String key, String groupname, String id); + + /** + * Creates a new consumer group uniquely identified by groupname for the stream + * stored at key. + * + * @see valkey.io for details. + * @param key The key of the stream. + * @param groupname The newly created consumer group name. + * @param id Stream ID that specifies the last delivered entry in the stream from the new group’s + * perspective. The special ID "$" can be used to specify the last entry in the + * stream. + * @param options The group options {@link StreamGroupOptions}. + * @return OK. + * @example + *
          {@code
          +     * // Create the consumer group "mygroup", and the stream if it does not exist, after the last ID
          +     * assert client.xgroupCreate("mystream", "mygroup", "$", new StreamGroupOptions(true)).get().equals("OK");
          +     * }
          + */ + CompletableFuture xgroupCreate( + String key, String groupname, String id, StreamGroupOptions options); + + /** + * Destroys the consumer group groupname for the stream stored at key. + * + * @see valkey.io for details. + * @param key The key of the stream. + * @param groupname The newly created consumer group name. + * @return true if the consumer group is destroyed. Otherwise, false. + * @example + *
          {@code
          +     * // Destroys the consumer group "mygroup"
          +     * assert client.xgroupDestroy("mystream", "mygroup").get().equals("OK");
          +     * }
          + */ + CompletableFuture xgroupDestroy(String key, String groupname); } diff --git a/java/client/src/main/java/glide/api/models/BaseTransaction.java b/java/client/src/main/java/glide/api/models/BaseTransaction.java index 98a89acce9..96e222f3fd 100644 --- a/java/client/src/main/java/glide/api/models/BaseTransaction.java +++ b/java/client/src/main/java/glide/api/models/BaseTransaction.java @@ -140,6 +140,8 @@ import static redis_request.RedisRequestOuterClass.RequestType.Unlink; import static redis_request.RedisRequestOuterClass.RequestType.XAdd; import static redis_request.RedisRequestOuterClass.RequestType.XDel; +import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreate; +import static redis_request.RedisRequestOuterClass.RequestType.XGroupDestroy; import static redis_request.RedisRequestOuterClass.RequestType.XLen; import static redis_request.RedisRequestOuterClass.RequestType.XRange; import static redis_request.RedisRequestOuterClass.RequestType.XRead; @@ -216,6 +218,7 @@ import glide.api.models.commands.geospatial.GeospatialData; import glide.api.models.commands.stream.StreamAddOptions; import glide.api.models.commands.stream.StreamAddOptions.StreamAddOptionsBuilder; +import glide.api.models.commands.stream.StreamGroupOptions; import glide.api.models.commands.stream.StreamRange; import glide.api.models.commands.stream.StreamReadOptions; import glide.api.models.commands.stream.StreamTrimOptions; @@ -2619,7 +2622,7 @@ public T zinterWithScores( * Adds an entry to the specified stream stored at key.
          * If the key doesn't exist, the stream is created. * - * @see redis.io for details. + * @see valkey.io for details. * @param key The key of the stream. * @param values Field-value pairs to be added to the entry. * @return Command Response - The id of the added entry. @@ -2632,7 +2635,7 @@ public T xadd(@NonNull String key, @NonNull Map values) { * Adds an entry to the specified stream stored at key.
          * If the key doesn't exist, the stream is created. * - * @see redis.io for details. + * @see valkey.io for details. * @param key The key of the stream. * @param values Field-value pairs to be added to the entry. * @param options Stream add options {@link StreamAddOptions}. @@ -2653,7 +2656,7 @@ public T xadd( /** * Reads entries from the given streams. * - * @see redis.io for details. + * @see valkey.io for details. * @param keysAndIds An array of Pairs of keys and entry ids to read from. A * pair is composed of a stream's key and the id of the entry after which the stream * will be read. @@ -2667,7 +2670,7 @@ public T xread(@NonNull Map keysAndIds) { /** * Reads entries from the given streams. * - * @see redis.io for details. + * @see valkey.io for details. * @param keysAndIds An array of Pairs of keys and entry ids to read from. A * pair is composed of a stream's key and the id of the entry after which the stream * will be read. @@ -2683,7 +2686,7 @@ public T xread(@NonNull Map keysAndIds, @NonNull StreamReadOptio /** * Trims the stream by evicting older entries. * - * @see redis.io for details. + * @see valkey.io for details. * @param key The key of the stream. * @param options Stream trim options {@link StreamTrimOptions}. * @return Command Response - The number of entries deleted from the stream. @@ -2726,6 +2729,7 @@ public T xdel(@NonNull String key, @NonNull String[] ids) { /** * Returns stream entries matching a given range of IDs. * + * @see valkey.io for details. * @param key The key of the stream. * @param start Starting stream ID bound for range. *
            @@ -2754,6 +2758,7 @@ public T xrange(@NonNull String key, @NonNull StreamRange start, @NonNull Stream /** * Returns stream entries matching a given range of IDs. * + * @see valkey.io for details. * @param key The key of the stream. * @param start Starting stream ID bound for range. *
              @@ -2787,6 +2792,7 @@ public T xrange( * Equivalent to {@link #xrange(String, StreamRange, StreamRange)} but returns the entries in * reverse order. * + * @see valkey.io for details. * @param key The key of the stream. * @param end Ending stream ID bound for range. *
                @@ -2817,6 +2823,7 @@ public T xrevrange(@NonNull String key, @NonNull StreamRange end, @NonNull Strea * Equivalent to {@link #xrange(String, StreamRange, StreamRange, long)} but returns the entries * in reverse order. * + * @see valkey.io for details. * @param key The key of the stream. * @param start Starting stream ID bound for range. *
                  @@ -2845,6 +2852,61 @@ public T xrevrange( return getThis(); } + /** + * Creates a new consumer group uniquely identified by groupname for the stream + * stored at key. + * + * @see valkey.io for details. + * @param key The key of the stream. + * @param groupname The newly created consumer group name. + * @param id Stream ID that specifies the last delivered entry in the stream from the new group’s + * perspective. The special ID "$" can be used to specify the last entry in the + * stream. + * @return Command Response - OK. + */ + public T xgroupCreate(@NonNull String key, @NonNull String groupname, @NonNull String id) { + protobufTransaction.addCommands(buildCommand(XGroupCreate, buildArgs(key, groupname, id))); + return getThis(); + } + + /** + * Creates a new consumer group uniquely identified by groupname for the stream + * stored at key. + * + * @see valkey.io for details. + * @param key The key of the stream. + * @param groupname The newly created consumer group name. + * @param id Stream ID that specifies the last delivered entry in the stream from the new group’s + * perspective. The special ID "$" can be used to specify the last entry in the + * stream. + * @param options The group options {@link StreamGroupOptions}. + * @return Command Response - OK. + */ + public T xgroupCreate( + @NonNull String key, + @NonNull String groupname, + @NonNull String id, + @NonNull StreamGroupOptions options) { + ArgsArray commandArgs = + buildArgs(concatenateArrays(new String[] {key, groupname, id}, options.toArgs())); + protobufTransaction.addCommands(buildCommand(XGroupCreate, commandArgs)); + return getThis(); + } + + /** + * Destroys the consumer group groupname for the stream stored at key. + * + * @see valkey.io for details. + * @param key The key of the stream. + * @param groupname The newly created consumer group name. + * @return Command Response - true if the consumer group is destroyed. Otherwise, + * false. + */ + public T xgroupDestroy(@NonNull String key, @NonNull String groupname) { + protobufTransaction.addCommands(buildCommand(XGroupDestroy, buildArgs(key, groupname))); + return getThis(); + } + /** * Returns the remaining time to live of key that has a timeout, in milliseconds. * diff --git a/java/client/src/main/java/glide/api/models/commands/stream/StreamGroupOptions.java b/java/client/src/main/java/glide/api/models/commands/stream/StreamGroupOptions.java new file mode 100644 index 0000000000..f1cceeed1e --- /dev/null +++ b/java/client/src/main/java/glide/api/models/commands/stream/StreamGroupOptions.java @@ -0,0 +1,89 @@ +/** Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0 */ +package glide.api.models.commands.stream; + +import glide.api.commands.StreamBaseCommands; +import java.util.ArrayList; +import java.util.List; + +/** + * Optional arguments for {@link StreamBaseCommands#xgroupCreate(String, String, String, + * StreamGroupOptions)} + * + * @see valkey.io + */ +public final class StreamGroupOptions { + + public static final String MAKE_STREAM_REDIS_API = "MKSTREAM"; + public static final String ENTRIES_READ_REDIS_API = "ENTRIESREAD"; + + /** If the stream doesn't exist, creates a new stream with a length of 0. */ + boolean makeStream; + + /** + * An arbitrary ID (that isn't the first ID, last ID, or the zero "0-0". Use it to + * find out how many entries are between the arbitrary ID (excluding it) and the stream's last + * entry. + * + * @since Redis 7.0.0 + */ + String entriesRead; + + /** + * Options for {@link StreamBaseCommands#xgroupCreate(String, String, String, StreamGroupOptions)} + * + * @param makeStream If the stream doesn't exist, creates a new stream with a length of 0 + * . + */ + public StreamGroupOptions(Boolean makeStream) { + this.makeStream = makeStream; + } + + /** + * Options for {@link StreamBaseCommands#xgroupCreate(String, String, String, StreamGroupOptions)} + * + * @param entriesRead An arbitrary ID that isn't the first ID, last ID, or the zero "0-0" + * . Use it to find out how many entries are between the arbitrary ID (excluding it) + * and the stream's last entry. + * @since ENTRIESREAD was added in Redis 7.0.0. + */ + public StreamGroupOptions(String entriesRead) { + this.makeStream = false; + this.entriesRead = entriesRead; + } + + /** + * Options for {@link StreamBaseCommands#xgroupCreate(String, String, String, StreamGroupOptions)} + * + * @param makeStream If the stream doesn't exist, creates a new stream with a length of 0 + * . + * @param entriesRead An arbitrary ID that isn't the first ID, last ID, or the zero "0-0" + * . Use it to find out how many entries are between the arbitrary ID (excluding it) + * and the stream's last entry. + * @since ENTRIESREAD was added in Redis 7.0.0. + */ + public StreamGroupOptions(Boolean makeStream, String entriesRead) { + this.makeStream = makeStream; + this.entriesRead = entriesRead; + } + + /** + * Converts options and the key-to-id input for {@link StreamBaseCommands#xgroupCreate(String, + * String, String, StreamGroupOptions)} into a String[]. + * + * @return String[] + */ + public String[] toArgs() { + List optionArgs = new ArrayList<>(); + + if (this.makeStream) { + optionArgs.add(MAKE_STREAM_REDIS_API); + } + + if (this.entriesRead != null) { + optionArgs.add(ENTRIES_READ_REDIS_API); + optionArgs.add(this.entriesRead); + } + + return optionArgs.toArray(new String[0]); + } +} diff --git a/java/client/src/test/java/glide/api/RedisClientTest.java b/java/client/src/test/java/glide/api/RedisClientTest.java index 825290341f..326a06da83 100644 --- a/java/client/src/test/java/glide/api/RedisClientTest.java +++ b/java/client/src/test/java/glide/api/RedisClientTest.java @@ -27,6 +27,8 @@ import static glide.api.models.commands.function.FunctionListOptions.WITH_CODE_REDIS_API; import static glide.api.models.commands.geospatial.GeoAddOptions.CHANGED_REDIS_API; import static glide.api.models.commands.stream.StreamAddOptions.NO_MAKE_STREAM_REDIS_API; +import static glide.api.models.commands.stream.StreamGroupOptions.ENTRIES_READ_REDIS_API; +import static glide.api.models.commands.stream.StreamGroupOptions.MAKE_STREAM_REDIS_API; import static glide.api.models.commands.stream.StreamRange.MAXIMUM_RANGE_REDIS_API; import static glide.api.models.commands.stream.StreamRange.MINIMUM_RANGE_REDIS_API; import static glide.api.models.commands.stream.StreamRange.RANGE_COUNT_REDIS_API; @@ -173,6 +175,8 @@ import static redis_request.RedisRequestOuterClass.RequestType.Unlink; import static redis_request.RedisRequestOuterClass.RequestType.XAdd; import static redis_request.RedisRequestOuterClass.RequestType.XDel; +import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreate; +import static redis_request.RedisRequestOuterClass.RequestType.XGroupDestroy; import static redis_request.RedisRequestOuterClass.RequestType.XLen; import static redis_request.RedisRequestOuterClass.RequestType.XRange; import static redis_request.RedisRequestOuterClass.RequestType.XRead; @@ -244,6 +248,7 @@ import glide.api.models.commands.geospatial.GeoUnit; import glide.api.models.commands.geospatial.GeospatialData; import glide.api.models.commands.stream.StreamAddOptions; +import glide.api.models.commands.stream.StreamGroupOptions; import glide.api.models.commands.stream.StreamRange; import glide.api.models.commands.stream.StreamRange.IdBound; import glide.api.models.commands.stream.StreamRange.InfRangeBound; @@ -4242,6 +4247,83 @@ public void xrevrange_withcount_returns_success() { assertEquals(completedResult, payload); } + @SneakyThrows + @Test + public void xgroupCreate() { + // setup + String key = "testKey"; + String groupName = "testGroupName"; + String id = "testId"; + String[] arguments = new String[] {key, groupName, id}; + + CompletableFuture testResponse = new CompletableFuture<>(); + testResponse.complete(OK); + + // match on protobuf request + when(commandManager.submitNewCommand(eq(XGroupCreate), eq(arguments), any())) + .thenReturn(testResponse); + + // exercise + CompletableFuture response = service.xgroupCreate(key, groupName, id); + String payload = response.get(); + + // verify + assertEquals(testResponse, response); + assertEquals(OK, payload); + } + + @SneakyThrows + @Test + public void xgroupCreate_withOptions() { + // setup + String key = "testKey"; + String groupName = "testGroupName"; + String id = "testId"; + String testEntry = "testEntry"; + StreamGroupOptions options = new StreamGroupOptions(true, testEntry); + String[] arguments = + new String[] {key, groupName, id, MAKE_STREAM_REDIS_API, ENTRIES_READ_REDIS_API, testEntry}; + + CompletableFuture testResponse = new CompletableFuture<>(); + testResponse.complete(OK); + + // match on protobuf request + when(commandManager.submitNewCommand(eq(XGroupCreate), eq(arguments), any())) + .thenReturn(testResponse); + + // exercise + CompletableFuture response = service.xgroupCreate(key, groupName, id, options); + String payload = response.get(); + + // verify + assertEquals(testResponse, response); + assertEquals(OK, payload); + } + + @SneakyThrows + @Test + public void xgroupDestroy() { + // setup + String key = "testKey"; + String groupName = "testGroupName"; + String[] arguments = new String[] {key, groupName}; + + CompletableFuture testResponse = new CompletableFuture<>(); + testResponse.complete(Boolean.TRUE); + + // match on protobuf request + when(commandManager.submitNewCommand(eq(XGroupDestroy), eq(arguments), any())) + .thenReturn(testResponse); + + // exercise + CompletableFuture response = service.xgroupDestroy(key, groupName); + Boolean payload = response.get(); + + // verify + assertEquals(testResponse, response); + assertEquals(Boolean.TRUE, payload); + } + @SneakyThrows @Test public void type_returns_success() { diff --git a/java/client/src/test/java/glide/api/models/TransactionTests.java b/java/client/src/test/java/glide/api/models/TransactionTests.java index 04d4b8f702..6c132ff9be 100644 --- a/java/client/src/test/java/glide/api/models/TransactionTests.java +++ b/java/client/src/test/java/glide/api/models/TransactionTests.java @@ -24,6 +24,8 @@ import static glide.api.models.commands.function.FunctionListOptions.LIBRARY_NAME_REDIS_API; import static glide.api.models.commands.function.FunctionListOptions.WITH_CODE_REDIS_API; import static glide.api.models.commands.geospatial.GeoAddOptions.CHANGED_REDIS_API; +import static glide.api.models.commands.stream.StreamGroupOptions.ENTRIES_READ_REDIS_API; +import static glide.api.models.commands.stream.StreamGroupOptions.MAKE_STREAM_REDIS_API; import static glide.api.models.commands.stream.StreamRange.MAXIMUM_RANGE_REDIS_API; import static glide.api.models.commands.stream.StreamRange.MINIMUM_RANGE_REDIS_API; import static glide.api.models.commands.stream.StreamRange.RANGE_COUNT_REDIS_API; @@ -153,6 +155,8 @@ import static redis_request.RedisRequestOuterClass.RequestType.Unlink; import static redis_request.RedisRequestOuterClass.RequestType.XAdd; import static redis_request.RedisRequestOuterClass.RequestType.XDel; +import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreate; +import static redis_request.RedisRequestOuterClass.RequestType.XGroupDestroy; import static redis_request.RedisRequestOuterClass.RequestType.XLen; import static redis_request.RedisRequestOuterClass.RequestType.XRange; import static redis_request.RedisRequestOuterClass.RequestType.XRead; @@ -217,6 +221,7 @@ import glide.api.models.commands.geospatial.GeoUnit; import glide.api.models.commands.geospatial.GeospatialData; import glide.api.models.commands.stream.StreamAddOptions; +import glide.api.models.commands.stream.StreamGroupOptions; import glide.api.models.commands.stream.StreamRange.InfRangeBound; import glide.api.models.commands.stream.StreamReadOptions; import glide.api.models.commands.stream.StreamTrimOptions.MinId; @@ -731,6 +736,19 @@ InfScoreBound.NEGATIVE_INFINITY, new ScoreBoundary(3, false), new Limit(1, 2)), RANGE_COUNT_REDIS_API, "99"))); + transaction.xgroupCreate("key", "group", "id"); + results.add(Pair.of(XGroupCreate, buildArgs("key", "group", "id"))); + + transaction.xgroupCreate("key", "group", "id", new StreamGroupOptions(true, "entry")); + results.add( + Pair.of( + XGroupCreate, + buildArgs( + "key", "group", "id", MAKE_STREAM_REDIS_API, ENTRIES_READ_REDIS_API, "entry"))); + + transaction.xgroupDestroy("key", "group"); + results.add(Pair.of(XGroupDestroy, buildArgs("key", "group"))); + transaction.time(); results.add(Pair.of(Time, buildArgs())); diff --git a/java/integTest/src/test/java/glide/SharedCommandTests.java b/java/integTest/src/test/java/glide/SharedCommandTests.java index 90c874351a..c757d8c54e 100644 --- a/java/integTest/src/test/java/glide/SharedCommandTests.java +++ b/java/integTest/src/test/java/glide/SharedCommandTests.java @@ -65,6 +65,7 @@ import glide.api.models.commands.geospatial.GeoUnit; import glide.api.models.commands.geospatial.GeospatialData; import glide.api.models.commands.stream.StreamAddOptions; +import glide.api.models.commands.stream.StreamGroupOptions; import glide.api.models.commands.stream.StreamRange.IdBound; import glide.api.models.commands.stream.StreamRange.InfRangeBound; import glide.api.models.commands.stream.StreamReadOptions; @@ -3323,6 +3324,67 @@ public void xrange_and_xrevrange(BaseClient client) { assertInstanceOf(RequestException.class, executionException.getCause()); } + @SneakyThrows + @ParameterizedTest(autoCloseArguments = false) + @MethodSource("getClients") + public void xgroupCreate_xgroupDestroy(BaseClient client) { + String key = UUID.randomUUID().toString(); + String stringKey = UUID.randomUUID().toString(); + String groupName = "group" + UUID.randomUUID(); + String streamId = "0-1"; + + // Stream not created results in error + Exception executionException = + assertThrows( + ExecutionException.class, () -> client.xgroupCreate(key, groupName, streamId).get()); + assertInstanceOf(RequestException.class, executionException.getCause()); + + // Stream with option to create creates stream & Group + assertEquals( + OK, client.xgroupCreate(key, groupName, streamId, new StreamGroupOptions(true)).get()); + + // ...and again results in BUSYGROUP error, because group names must be unique + executionException = + assertThrows( + ExecutionException.class, () -> client.xgroupCreate(key, groupName, streamId).get()); + assertInstanceOf(RequestException.class, executionException.getCause()); + assertTrue(executionException.getMessage().contains("BUSYGROUP")); + + // Stream Group can be destroyed returns: true + assertEquals(true, client.xgroupDestroy(key, groupName).get()); + + // ...and again results in: false + assertEquals(false, client.xgroupDestroy(key, groupName).get()); + + // ENTRIESREAD option was added in redis 7.0.0 + StreamGroupOptions entriesReadOption = new StreamGroupOptions("10"); + if (REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0")) { + assertEquals(OK, client.xgroupCreate(key, groupName, streamId, entriesReadOption).get()); + } else { + executionException = + assertThrows( + ExecutionException.class, + () -> client.xgroupCreate(key, groupName, streamId, entriesReadOption).get()); + assertInstanceOf(RequestException.class, executionException.getCause()); + } + + // key is a string and cannot be created as a stream + assertEquals(OK, client.set(stringKey, "not_a_stream").get()); + executionException = + assertThrows( + ExecutionException.class, + () -> + client + .xgroupCreate(stringKey, groupName, streamId, new StreamGroupOptions(true)) + .get()); + assertInstanceOf(RequestException.class, executionException.getCause()); + + executionException = + assertThrows( + ExecutionException.class, () -> client.xgroupDestroy(stringKey, groupName).get()); + assertInstanceOf(RequestException.class, executionException.getCause()); + } + @SneakyThrows @ParameterizedTest(autoCloseArguments = false) @MethodSource("getClients") diff --git a/java/integTest/src/test/java/glide/TransactionTestUtilities.java b/java/integTest/src/test/java/glide/TransactionTestUtilities.java index 51d5037ae3..76051946aa 100644 --- a/java/integTest/src/test/java/glide/TransactionTestUtilities.java +++ b/java/integTest/src/test/java/glide/TransactionTestUtilities.java @@ -34,6 +34,7 @@ import glide.api.models.commands.geospatial.GeoUnit; import glide.api.models.commands.geospatial.GeospatialData; import glide.api.models.commands.stream.StreamAddOptions; +import glide.api.models.commands.stream.StreamGroupOptions; import glide.api.models.commands.stream.StreamRange.IdBound; import glide.api.models.commands.stream.StreamTrimOptions.MinId; import java.util.HashMap; @@ -659,6 +660,8 @@ private static Object[] hyperLogLogCommands(BaseTransaction transaction) { private static Object[] streamCommands(BaseTransaction transaction) { final String streamKey1 = "{streamKey}-1-" + UUID.randomUUID(); + final String groupName1 = "{groupName}-1-" + UUID.randomUUID(); + final String groupName2 = "{groupName}-2-" + UUID.randomUUID(); transaction .xadd(streamKey1, Map.of("field1", "value1"), StreamAddOptions.builder().id("0-1").build()) @@ -671,6 +674,10 @@ private static Object[] streamCommands(BaseTransaction transaction) { .xrevrange(streamKey1, IdBound.of("0-1"), IdBound.of("0-1")) .xrevrange(streamKey1, IdBound.of("0-1"), IdBound.of("0-1"), 1L) .xtrim(streamKey1, new MinId(true, "0-2")) + .xgroupCreate(streamKey1, groupName1, "0-0") + .xgroupCreate(streamKey1, groupName2, "0-0", new StreamGroupOptions(true)) + .xgroupDestroy(streamKey1, groupName1) + .xgroupDestroy(streamKey1, groupName2) .xdel(streamKey1, new String[] {"0-3", "0-5"}); return new Object[] { @@ -687,6 +694,10 @@ private static Object[] streamCommands(BaseTransaction transaction) { Map.of( "0-1", new String[][] {{"field1", "value1"}}), // .xrevrange(streamKey1, "0-1", "0-1", 1l) 1L, // xtrim(streamKey1, new MinId(true, "0-2")) + OK, // xgroupCreate(streamKey1, groupName1, "0-0") + OK, // xgroupCreate(streamKey1, groupName1, "0-0", options) + true, // xgroupDestroy(streamKey1, groupName1) + true, // xgroupDestroy(streamKey1, groupName2) 1L, // .xdel(streamKey1, new String[] {"0-1", "0-5"}); }; } From 57ab9908ff5a4d49c8ef9478c11de522aeb94228 Mon Sep 17 00:00:00 2001 From: Andrew Carbonetto Date: Fri, 14 Jun 2024 16:42:23 -0700 Subject: [PATCH 2/4] Update options to use builder Signed-off-by: Andrew Carbonetto --- .../commands/stream/StreamGroupOptions.java | 60 ++++++------------- .../test/java/glide/api/RedisClientTest.java | 3 +- .../glide/api/models/TransactionTests.java | 6 +- .../test/java/glide/SharedCommandTests.java | 7 ++- .../java/glide/TransactionTestUtilities.java | 2 +- 5 files changed, 31 insertions(+), 47 deletions(-) diff --git a/java/client/src/main/java/glide/api/models/commands/stream/StreamGroupOptions.java b/java/client/src/main/java/glide/api/models/commands/stream/StreamGroupOptions.java index f1cceeed1e..1ed8996907 100644 --- a/java/client/src/main/java/glide/api/models/commands/stream/StreamGroupOptions.java +++ b/java/client/src/main/java/glide/api/models/commands/stream/StreamGroupOptions.java @@ -4,6 +4,7 @@ import glide.api.commands.StreamBaseCommands; import java.util.ArrayList; import java.util.List; +import lombok.Builder; /** * Optional arguments for {@link StreamBaseCommands#xgroupCreate(String, String, String, @@ -11,60 +12,37 @@ * * @see valkey.io */ +@Builder public final class StreamGroupOptions { + // Redis API String argument for makeStream public static final String MAKE_STREAM_REDIS_API = "MKSTREAM"; - public static final String ENTRIES_READ_REDIS_API = "ENTRIESREAD"; - /** If the stream doesn't exist, creates a new stream with a length of 0. */ - boolean makeStream; + // Redis API String argument for entriesRead + public static final String ENTRIES_READ_REDIS_API = "ENTRIESREAD"; /** - * An arbitrary ID (that isn't the first ID, last ID, or the zero "0-0". Use it to - * find out how many entries are between the arbitrary ID (excluding it) and the stream's last - * entry. - * - * @since Redis 7.0.0 + * If true and the stream doesn't exist, creates a new stream with a length of + * 0. */ - String entriesRead; + @Builder.Default private boolean mkStream = false; - /** - * Options for {@link StreamBaseCommands#xgroupCreate(String, String, String, StreamGroupOptions)} - * - * @param makeStream If the stream doesn't exist, creates a new stream with a length of 0 - * . - */ - public StreamGroupOptions(Boolean makeStream) { - this.makeStream = makeStream; - } + public static class StreamGroupOptionsBuilder { - /** - * Options for {@link StreamBaseCommands#xgroupCreate(String, String, String, StreamGroupOptions)} - * - * @param entriesRead An arbitrary ID that isn't the first ID, last ID, or the zero "0-0" - * . Use it to find out how many entries are between the arbitrary ID (excluding it) - * and the stream's last entry. - * @since ENTRIESREAD was added in Redis 7.0.0. - */ - public StreamGroupOptions(String entriesRead) { - this.makeStream = false; - this.entriesRead = entriesRead; + /** If the stream doesn't exist, this creates a new stream with a length of 0. */ + public StreamGroupOptionsBuilder makeStream() { + return mkStream(true); + } } /** - * Options for {@link StreamBaseCommands#xgroupCreate(String, String, String, StreamGroupOptions)} + * An arbitrary ID (that isn't the first ID, last ID, or the zero "0-0". Use it to + * find out how many entries are between the arbitrary ID (excluding it) and the stream's last + * entry. * - * @param makeStream If the stream doesn't exist, creates a new stream with a length of 0 - * . - * @param entriesRead An arbitrary ID that isn't the first ID, last ID, or the zero "0-0" - * . Use it to find out how many entries are between the arbitrary ID (excluding it) - * and the stream's last entry. - * @since ENTRIESREAD was added in Redis 7.0.0. + * @since Redis 7.0.0 */ - public StreamGroupOptions(Boolean makeStream, String entriesRead) { - this.makeStream = makeStream; - this.entriesRead = entriesRead; - } + private String entriesRead; /** * Converts options and the key-to-id input for {@link StreamBaseCommands#xgroupCreate(String, @@ -75,7 +53,7 @@ public StreamGroupOptions(Boolean makeStream, String entriesRead) { public String[] toArgs() { List optionArgs = new ArrayList<>(); - if (this.makeStream) { + if (this.mkStream) { optionArgs.add(MAKE_STREAM_REDIS_API); } diff --git a/java/client/src/test/java/glide/api/RedisClientTest.java b/java/client/src/test/java/glide/api/RedisClientTest.java index bc487c2ec0..dbab49630d 100644 --- a/java/client/src/test/java/glide/api/RedisClientTest.java +++ b/java/client/src/test/java/glide/api/RedisClientTest.java @@ -4308,7 +4308,8 @@ public void xgroupCreate_withOptions() { String groupName = "testGroupName"; String id = "testId"; String testEntry = "testEntry"; - StreamGroupOptions options = new StreamGroupOptions(true, testEntry); + StreamGroupOptions options = + StreamGroupOptions.builder().makeStream().entriesRead(testEntry).build(); String[] arguments = new String[] {key, groupName, id, MAKE_STREAM_REDIS_API, ENTRIES_READ_REDIS_API, testEntry}; diff --git a/java/client/src/test/java/glide/api/models/TransactionTests.java b/java/client/src/test/java/glide/api/models/TransactionTests.java index 20924b671d..37bef17148 100644 --- a/java/client/src/test/java/glide/api/models/TransactionTests.java +++ b/java/client/src/test/java/glide/api/models/TransactionTests.java @@ -744,7 +744,11 @@ InfScoreBound.NEGATIVE_INFINITY, new ScoreBoundary(3, false), new Limit(1, 2)), transaction.xgroupCreate("key", "group", "id"); results.add(Pair.of(XGroupCreate, buildArgs("key", "group", "id"))); - transaction.xgroupCreate("key", "group", "id", new StreamGroupOptions(true, "entry")); + transaction.xgroupCreate( + "key", + "group", + "id", + StreamGroupOptions.builder().makeStream().entriesRead("entry").build()); results.add( Pair.of( XGroupCreate, diff --git a/java/integTest/src/test/java/glide/SharedCommandTests.java b/java/integTest/src/test/java/glide/SharedCommandTests.java index fa70bd6cea..d86990e711 100644 --- a/java/integTest/src/test/java/glide/SharedCommandTests.java +++ b/java/integTest/src/test/java/glide/SharedCommandTests.java @@ -3341,7 +3341,8 @@ public void xgroupCreate_xgroupDestroy(BaseClient client) { // Stream with option to create creates stream & Group assertEquals( - OK, client.xgroupCreate(key, groupName, streamId, new StreamGroupOptions(true)).get()); + OK, client.xgroupCreate(key, groupName, streamId, StreamGroupOptions.builder().makeStream() + .build()).get()); // ...and again results in BUSYGROUP error, because group names must be unique executionException = @@ -3357,7 +3358,7 @@ public void xgroupCreate_xgroupDestroy(BaseClient client) { assertEquals(false, client.xgroupDestroy(key, groupName).get()); // ENTRIESREAD option was added in redis 7.0.0 - StreamGroupOptions entriesReadOption = new StreamGroupOptions("10"); + StreamGroupOptions entriesReadOption = StreamGroupOptions.builder().entriesRead("10").build(); if (REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0")) { assertEquals(OK, client.xgroupCreate(key, groupName, streamId, entriesReadOption).get()); } else { @@ -3375,7 +3376,7 @@ public void xgroupCreate_xgroupDestroy(BaseClient client) { ExecutionException.class, () -> client - .xgroupCreate(stringKey, groupName, streamId, new StreamGroupOptions(true)) + .xgroupCreate(stringKey, groupName, streamId, StreamGroupOptions.builder().makeStream().build()) .get()); assertInstanceOf(RequestException.class, executionException.getCause()); diff --git a/java/integTest/src/test/java/glide/TransactionTestUtilities.java b/java/integTest/src/test/java/glide/TransactionTestUtilities.java index ba905d0fc6..be6bb9035b 100644 --- a/java/integTest/src/test/java/glide/TransactionTestUtilities.java +++ b/java/integTest/src/test/java/glide/TransactionTestUtilities.java @@ -687,7 +687,7 @@ private static Object[] streamCommands(BaseTransaction transaction) { .xrevrange(streamKey1, IdBound.of("0-1"), IdBound.of("0-1"), 1L) .xtrim(streamKey1, new MinId(true, "0-2")) .xgroupCreate(streamKey1, groupName1, "0-0") - .xgroupCreate(streamKey1, groupName2, "0-0", new StreamGroupOptions(true)) + .xgroupCreate(streamKey1, groupName2, "0-0", StreamGroupOptions.builder().makeStream().build()) .xgroupDestroy(streamKey1, groupName1) .xgroupDestroy(streamKey1, groupName2) .xdel(streamKey1, new String[] {"0-3", "0-5"}); From 57b9c415c0355cce21409500ddade08043a88aa8 Mon Sep 17 00:00:00 2001 From: Andrew Carbonetto Date: Sat, 15 Jun 2024 23:24:45 -0700 Subject: [PATCH 3/4] SPOTLESS Signed-off-by: Andrew Carbonetto --- .../src/test/java/glide/SharedCommandTests.java | 13 ++++++++++--- .../test/java/glide/TransactionTestUtilities.java | 3 ++- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/java/integTest/src/test/java/glide/SharedCommandTests.java b/java/integTest/src/test/java/glide/SharedCommandTests.java index d86990e711..c7f43ae09d 100644 --- a/java/integTest/src/test/java/glide/SharedCommandTests.java +++ b/java/integTest/src/test/java/glide/SharedCommandTests.java @@ -3341,8 +3341,11 @@ public void xgroupCreate_xgroupDestroy(BaseClient client) { // Stream with option to create creates stream & Group assertEquals( - OK, client.xgroupCreate(key, groupName, streamId, StreamGroupOptions.builder().makeStream() - .build()).get()); + OK, + client + .xgroupCreate( + key, groupName, streamId, StreamGroupOptions.builder().makeStream().build()) + .get()); // ...and again results in BUSYGROUP error, because group names must be unique executionException = @@ -3376,7 +3379,11 @@ public void xgroupCreate_xgroupDestroy(BaseClient client) { ExecutionException.class, () -> client - .xgroupCreate(stringKey, groupName, streamId, StreamGroupOptions.builder().makeStream().build()) + .xgroupCreate( + stringKey, + groupName, + streamId, + StreamGroupOptions.builder().makeStream().build()) .get()); assertInstanceOf(RequestException.class, executionException.getCause()); diff --git a/java/integTest/src/test/java/glide/TransactionTestUtilities.java b/java/integTest/src/test/java/glide/TransactionTestUtilities.java index be6bb9035b..1e6e77c225 100644 --- a/java/integTest/src/test/java/glide/TransactionTestUtilities.java +++ b/java/integTest/src/test/java/glide/TransactionTestUtilities.java @@ -687,7 +687,8 @@ private static Object[] streamCommands(BaseTransaction transaction) { .xrevrange(streamKey1, IdBound.of("0-1"), IdBound.of("0-1"), 1L) .xtrim(streamKey1, new MinId(true, "0-2")) .xgroupCreate(streamKey1, groupName1, "0-0") - .xgroupCreate(streamKey1, groupName2, "0-0", StreamGroupOptions.builder().makeStream().build()) + .xgroupCreate( + streamKey1, groupName2, "0-0", StreamGroupOptions.builder().makeStream().build()) .xgroupDestroy(streamKey1, groupName1) .xgroupDestroy(streamKey1, groupName2) .xdel(streamKey1, new String[] {"0-3", "0-5"}); From 5afda49762f04a41fb69188e78daee2642206db6 Mon Sep 17 00:00:00 2001 From: Andrew Carbonetto Date: Mon, 17 Jun 2024 13:11:50 -0700 Subject: [PATCH 4/4] Update doc Signed-off-by: Andrew Carbonetto --- .../java/glide/api/commands/StreamBaseCommands.java | 12 ++++++------ .../main/java/glide/api/models/BaseTransaction.java | 12 ++++++------ 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java b/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java index c77b455d7a..6aeac8987a 100644 --- a/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java +++ b/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java @@ -325,9 +325,9 @@ CompletableFuture> xrevrange( * @see valkey.io for details. * @param key The key of the stream. * @param groupname The newly created consumer group name. - * @param id Stream ID that specifies the last delivered entry in the stream from the new group’s - * perspective. The special ID "$" can be used to specify the last entry in the - * stream. + * @param id Stream entry ID that specifies the last delivered entry in the stream from the new + * group’s perspective. The special ID "$" can be used to specify the last entry + * in the stream. * @return OK. * @example *
                  {@code
                  @@ -344,9 +344,9 @@ CompletableFuture> xrevrange(
                        * @see valkey.io for details.
                        * @param key The key of the stream.
                        * @param groupname The newly created consumer group name.
                  -     * @param id Stream ID that specifies the last delivered entry in the stream from the new group’s
                  -     *     perspective. The special ID "$" can be used to specify the last entry in the
                  -     *     stream.
                  +     * @param id Stream entry ID that specifies the last delivered entry in the stream from the new
                  +     *     group’s perspective. The special ID "$" can be used to specify the last entry
                  +     *     in the stream.
                        * @param options The group options {@link StreamGroupOptions}.
                        * @return OK.
                        * @example
                  diff --git a/java/client/src/main/java/glide/api/models/BaseTransaction.java b/java/client/src/main/java/glide/api/models/BaseTransaction.java
                  index ab804c7f78..5b9ec40d0d 100644
                  --- a/java/client/src/main/java/glide/api/models/BaseTransaction.java
                  +++ b/java/client/src/main/java/glide/api/models/BaseTransaction.java
                  @@ -2878,9 +2878,9 @@ public T xrevrange(
                        * @see valkey.io for details.
                        * @param key The key of the stream.
                        * @param groupname The newly created consumer group name.
                  -     * @param id Stream ID that specifies the last delivered entry in the stream from the new group’s
                  -     *     perspective. The special ID "$" can be used to specify the last entry in the
                  -     *     stream.
                  +     * @param id Stream entry ID that specifies the last delivered entry in the stream from the new
                  +     *     group’s perspective. The special ID "$" can be used to specify the last entry
                  +     *     in the stream.
                        * @return Command Response - OK.
                        */
                       public T xgroupCreate(@NonNull String key, @NonNull String groupname, @NonNull String id) {
                  @@ -2895,9 +2895,9 @@ public T xgroupCreate(@NonNull String key, @NonNull String groupname, @NonNull S
                        * @see valkey.io for details.
                        * @param key The key of the stream.
                        * @param groupname The newly created consumer group name.
                  -     * @param id Stream ID that specifies the last delivered entry in the stream from the new group’s
                  -     *     perspective. The special ID "$" can be used to specify the last entry in the
                  -     *     stream.
                  +     * @param id Stream entry ID that specifies the last delivered entry in the stream from the new
                  +     *     group’s perspective. The special ID "$" can be used to specify the last entry
                  +     *     in the stream.
                        * @param options The group options {@link StreamGroupOptions}.
                        * @return Command Response - OK.
                        */