diff --git a/glide-core/src/client/value_conversion.rs b/glide-core/src/client/value_conversion.rs index 43c2e74bd0..6b8e93a69a 100644 --- a/glide-core/src/client/value_conversion.rs +++ b/glide-core/src/client/value_conversion.rs @@ -791,6 +791,7 @@ fn convert_to_array_of_pairs( value_expected_return_type: Option, ) -> RedisResult { match response { + Value::Nil => Ok(response), Value::Array(ref array) if array.is_empty() || matches!(array[0], Value::Array(_)) => { // The server response is an empty array or a RESP3 array of pairs. In RESP3, the values in the pairs are // already of the correct type, so we do not need to convert them and `response` is in the correct format. @@ -852,7 +853,7 @@ pub(crate) fn expected_type_for_cmd(cmd: &Cmd) -> Option { key_type: &Some(ExpectedReturnType::BulkString), value_type: &Some(ExpectedReturnType::ArrayOfPairs), }), - b"XREAD" => Some(ExpectedReturnType::Map { + b"XREAD" | b"XREADGROUP" => Some(ExpectedReturnType::Map { key_type: &Some(ExpectedReturnType::BulkString), value_type: &Some(ExpectedReturnType::Map { key_type: &Some(ExpectedReturnType::BulkString), @@ -1205,6 +1206,28 @@ mod tests { )); } + #[test] + fn convert_xreadgroup() { + assert!(matches!( + expected_type_for_cmd( + redis::cmd("XREADGROUP") + .arg("GROUP") + .arg("group") + .arg("consumer") + .arg("streams") + .arg("key") + .arg("id") + ), + Some(ExpectedReturnType::Map { + key_type: &Some(ExpectedReturnType::BulkString), + value_type: &Some(ExpectedReturnType::Map { + key_type: &Some(ExpectedReturnType::BulkString), + value_type: &Some(ExpectedReturnType::ArrayOfPairs), + }), + }) + )); + } + #[test] fn test_convert_empty_array_to_map_is_nil() { let mut cmd = redis::cmd("XREAD"); diff --git a/java/client/src/main/java/glide/api/BaseClient.java b/java/client/src/main/java/glide/api/BaseClient.java index 0c72491cbd..9471efad8c 100644 --- a/java/client/src/main/java/glide/api/BaseClient.java +++ b/java/client/src/main/java/glide/api/BaseClient.java @@ -129,6 +129,7 @@ import static redis_request.RedisRequestOuterClass.RequestType.XLen; import static redis_request.RedisRequestOuterClass.RequestType.XRange; import static redis_request.RedisRequestOuterClass.RequestType.XRead; +import static redis_request.RedisRequestOuterClass.RequestType.XReadGroup; import static redis_request.RedisRequestOuterClass.RequestType.XRevRange; import static redis_request.RedisRequestOuterClass.RequestType.XTrim; import static redis_request.RedisRequestOuterClass.RequestType.ZAdd; @@ -195,6 +196,7 @@ import glide.api.models.commands.stream.StreamAddOptions; import glide.api.models.commands.stream.StreamGroupOptions; import glide.api.models.commands.stream.StreamRange; +import glide.api.models.commands.stream.StreamReadGroupOptions; import glide.api.models.commands.stream.StreamReadOptions; import glide.api.models.commands.stream.StreamTrimOptions; import glide.api.models.configuration.BaseClientConfiguration; @@ -1429,6 +1431,22 @@ public CompletableFuture xgroupDelConsumer( XGroupDelConsumer, new String[] {key, group, consumer}, this::handleLongResponse); } + @Override + public CompletableFuture>> xreadgroup( + @NonNull Map keysAndIds, @NonNull String group, @NonNull String consumer) { + return xreadgroup(keysAndIds, group, consumer, StreamReadGroupOptions.builder().build()); + } + + @Override + public CompletableFuture>> xreadgroup( + @NonNull Map keysAndIds, + @NonNull String group, + @NonNull String consumer, + @NonNull StreamReadGroupOptions options) { + String[] arguments = options.toArgs(group, consumer, keysAndIds); + return commandManager.submitNewCommand(XReadGroup, arguments, this::handleXReadResponse); + } + @Override public CompletableFuture pttl(@NonNull String key) { return commandManager.submitNewCommand(PTTL, new String[] {key}, this::handleLongResponse); diff --git a/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java b/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java index d51aeadd18..1c10c2b992 100644 --- a/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java +++ b/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java @@ -7,6 +7,7 @@ import glide.api.models.commands.stream.StreamRange; import glide.api.models.commands.stream.StreamRange.IdBound; import glide.api.models.commands.stream.StreamRange.InfRangeBound; +import glide.api.models.commands.stream.StreamReadGroupOptions; import glide.api.models.commands.stream.StreamReadOptions; import glide.api.models.commands.stream.StreamTrimOptions; import java.util.Map; @@ -68,7 +69,7 @@ public interface StreamBaseCommands { * @param keysAndIds A Map of keys and entry ids to read from. The * Map is composed of a stream's key and the id of the entry after which the stream * will be read. - * @return A {@literal Map>} with stream + * @return A {@literal Map>} with stream * keys, to Map of stream-ids, to an array of pairings with format [[field, entry], [field, entry], ...]. * @example *
{@code
@@ -95,7 +96,7 @@ public interface StreamBaseCommands {
      *     Map is composed of a stream's key and the id of the entry after which the stream
      *     will be read.
      * @param options Options detailing how to read the stream {@link StreamReadOptions}.
-     * @return A {@literal Map>} with stream
+     * @return A {@literal Map>} with stream
      *     keys, to Map of stream-ids, to an array of pairings with format [[field, entry], [field, entry], ...].
      * @example
      *     
{@code
@@ -407,4 +408,89 @@ CompletableFuture xgroupCreate(
      * }
*/ CompletableFuture xgroupDelConsumer(String key, String group, String consumer); + + /** + * Reads entries from the given streams owned by a consumer group. + * + * @apiNote When in cluster mode, all keys in keysAndIds must map to the same hash + * slot. + * @see valkey.io for details. + * @param keysAndIds A Map of keys and entry ids to read from. The + * Map is composed of a stream's key and the id of the entry after which the stream + * will be read. Use the special id of {@literal ">"} to receive only new messages. + * @param group The consumer group name. + * @param consumer The newly created consumer. + * @return A {@literal Map>} with stream + * keys, to Map of stream-ids, to an array of pairings with format [[field, entry], [field, entry], ...]. + * Returns code>null if the consumer group does not exist. Returns a code>Map with a value of code>null
if the stream is empty. + * @example + *
{@code
+     * // create a new stream at "mystream", with stream id "1-0"
+     * Map xreadKeys = Map.of("myfield", "mydata");
+     * String streamId = client.xadd("mystream", Map.of("myfield", "mydata"), StreamAddOptions.builder().id("1-0").build()).get();
+     * assert client.xgroupCreate("mystream", "mygroup").get().equals("OK"); // create the consumer group "mygroup"
+     * Map> streamReadResponse = client.xreadgroup(Map.of("mystream", ">"), "mygroup", "myconsumer").get();
+     * // Returns "mystream": "1-0": {{"myfield", "mydata"}}
+     * for (var keyEntry : streamReadResponse.entrySet()) {
+     *     System.out.printf("Key: %s", keyEntry.getKey());
+     *     for (var streamEntry : keyEntry.getValue().entrySet()) {
+     *         Arrays.stream(streamEntry.getValue()).forEach(entity ->
+     *             System.out.printf("stream id: %s; field: %s; value: %s\n", streamEntry.getKey(), entity[0], entity[1])
+     *         );
+     *     }
+     * }
+     * assert client.xdel("mystream", "1-0").get() == 1L;
+     * client.xreadgroup(Map.of("mystream", "0"), "mygroup", "myconsumer").get();
+     * // Returns "mystream": "1-0": null
+     * assert streamReadResponse.get("mystream").get("1-0") == null;
+     * 
+ */ + CompletableFuture>> xreadgroup( + Map keysAndIds, String group, String consumer); + + /** + * Reads entries from the given streams owned by a consumer group. + * + * @apiNote When in cluster mode, all keys in keysAndIds must map to the same hash + * slot. + * @see valkey.io for details. + * @param keysAndIds A Map of keys and entry ids to read from. The + * Map is composed of a stream's key and the id of the entry after which the stream + * will be read. Use the special id of {@literal ">"} to receive only new messages. + * @param group The consumer group name. + * @param consumer The newly created consumer. + * @param options Options detailing how to read the stream {@link StreamReadGroupOptions}. + * @return A {@literal Map>} with stream + * keys, to Map of stream-ids, to an array of pairings with format [[field, entry], [field, entry], ...]. + * Returns code>null if the consumer group does not exist. Returns a code>Map with a value of code>null if the stream is empty. + * @example + *
{@code
+     * // create a new stream at "mystream", with stream id "1-0"
+     * Map xreadKeys = Map.of("myfield", "mydata");
+     * String streamId = client.xadd("mystream", Map.of("myfield", "mydata"), StreamAddOptions.builder().id("1-0").build()).get();
+     * assert client.xgroupCreate("mystream", "mygroup").get().equals("OK"); // create the consumer group "mygroup"
+     * StreamReadGroupOptions options = StreamReadGroupOptions.builder().count(1).build(); // retrieves only a single message at a time
+     * Map> streamReadResponse = client.xreadgroup(Map.of("mystream", ">"), "mygroup", "myconsumer", options).get();
+     * // Returns "mystream": "1-0": {{"myfield", "mydata"}}
+     * for (var keyEntry : streamReadResponse.entrySet()) {
+     *     System.out.printf("Key: %s", keyEntry.getKey());
+     *     for (var streamEntry : keyEntry.getValue().entrySet()) {
+     *         Arrays.stream(streamEntry.getValue()).forEach(entity ->
+     *             System.out.printf("stream id: %s; field: %s; value: %s\n", streamEntry.getKey(), entity[0], entity[1])
+     *         );
+     *     }
+     * }
+     * assert client.xdel("mystream", "1-0").get() == 1L;
+     * // read the first 10 items and acknowledge (ACK) them:
+     * StreamReadGroupOptions options = StreamReadGroupOptions.builder().count(10L).noack().build();
+     * streamReadResponse = client.xreadgroup(Map.of("mystream", "0"), "mygroup", "myconsumer", options).get();
+     * // Returns "mystream": "1-0": null
+     * assert streamReadResponse.get("mystream").get("1-0") == null;
+     * 
+ */ + CompletableFuture>> xreadgroup( + Map keysAndIds, + String group, + String consumer, + StreamReadGroupOptions options); } diff --git a/java/client/src/main/java/glide/api/models/BaseTransaction.java b/java/client/src/main/java/glide/api/models/BaseTransaction.java index 66e75b457f..1f1255a27c 100644 --- a/java/client/src/main/java/glide/api/models/BaseTransaction.java +++ b/java/client/src/main/java/glide/api/models/BaseTransaction.java @@ -157,6 +157,7 @@ import static redis_request.RedisRequestOuterClass.RequestType.XLen; import static redis_request.RedisRequestOuterClass.RequestType.XRange; import static redis_request.RedisRequestOuterClass.RequestType.XRead; +import static redis_request.RedisRequestOuterClass.RequestType.XReadGroup; import static redis_request.RedisRequestOuterClass.RequestType.XRevRange; import static redis_request.RedisRequestOuterClass.RequestType.XTrim; import static redis_request.RedisRequestOuterClass.RequestType.ZAdd; @@ -233,6 +234,7 @@ import glide.api.models.commands.stream.StreamAddOptions.StreamAddOptionsBuilder; import glide.api.models.commands.stream.StreamGroupOptions; import glide.api.models.commands.stream.StreamRange; +import glide.api.models.commands.stream.StreamReadGroupOptions; import glide.api.models.commands.stream.StreamReadOptions; import glide.api.models.commands.stream.StreamTrimOptions; import glide.api.models.configuration.ReadFrom; @@ -2767,7 +2769,7 @@ public T xadd( * @param keysAndIds An array of Pairs of keys and entry ids to read from. A * pair is composed of a stream's key and the id of the entry after which the stream * will be read. - * @return Command Response - A {@literal Map>} with stream + * @return Command Response - A {@literal Map>} with stream * keys, to Map of stream-ids, to an array of pairings with format [[field, entry], [field, entry], ...]. */ public T xread(@NonNull Map keysAndIds) { @@ -2782,7 +2784,7 @@ public T xread(@NonNull Map keysAndIds) { * pair is composed of a stream's key and the id of the entry after which the stream * will be read. * @param options options detailing how to read the stream {@link StreamReadOptions}. - * @return Command Response - A {@literal Map>} with stream + * @return Command Response - A {@literal Map>} with stream * keys, to Map of stream-ids, to an array of pairings with format [[field, entry], [field, entry], ...]. */ public T xread(@NonNull Map keysAndIds, @NonNull StreamReadOptions options) { @@ -3048,6 +3050,58 @@ public T xgroupDelConsumer(@NonNull String key, @NonNull String group, @NonNull return getThis(); } + /** + * Reads entries from the given streams owned by a consumer group. + * + * @apiNote When in cluster mode, all keys in keysAndIds must map to the same hash + * slot. + * @see valkey.io for details. + * @param keysAndIds A Map of keys and entry ids to read from. The + * Map is composed of a stream's key and the id of the entry after which the stream + * will be read. Use the special id of {@literal Map>} + * to receive only new messages. + * @param group The consumer group name. + * @param consumer The newly created consumer. + * @return Command Response - A {@literal Map>} with + * stream keys, to Map of stream-ids, to an array of pairings with format + * [[field, entry], [field, entry], ...]. + * Returns code>null if the consumer group does not exist. Returns a code>Map + * with a value of code>null if the stream is empty. + */ + public T xreadgroup( + @NonNull Map keysAndIds, @NonNull String group, @NonNull String consumer) { + return xreadgroup(keysAndIds, group, consumer, StreamReadGroupOptions.builder().build()); + } + + /** + * Reads entries from the given streams owned by a consumer group. + * + * @apiNote When in cluster mode, all keys in keysAndIds must map to the same hash + * slot. + * @see valkey.io for details. + * @param keysAndIds A Map of keys and entry ids to read from. The + * Map is composed of a stream's key and the id of the entry after which the stream + * will be read. Use the special id of {@literal Map>} + * to receive only new messages. + * @param group The consumer group name. + * @param consumer The newly created consumer. + * @param options Options detailing how to read the stream {@link StreamReadGroupOptions}. + * @return Command Response - A {@literal Map>} with + * stream keys, to Map of stream-ids, to an array of pairings with format + * [[field, entry], [field, entry], ...]. + * Returns code>null if the consumer group does not exist. Returns a code>Map + * with a value of code>null if the stream is empty. + */ + public T xreadgroup( + @NonNull Map keysAndIds, + @NonNull String group, + @NonNull String consumer, + @NonNull StreamReadGroupOptions options) { + protobufTransaction.addCommands( + buildCommand(XReadGroup, buildArgs(options.toArgs(group, consumer, keysAndIds)))); + return getThis(); + } + /** * Returns the remaining time to live of key that has a timeout, in milliseconds. * diff --git a/java/client/src/main/java/glide/api/models/commands/stream/StreamReadGroupOptions.java b/java/client/src/main/java/glide/api/models/commands/stream/StreamReadGroupOptions.java new file mode 100644 index 0000000000..246b4e1128 --- /dev/null +++ b/java/client/src/main/java/glide/api/models/commands/stream/StreamReadGroupOptions.java @@ -0,0 +1,72 @@ +/** Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0 */ +package glide.api.models.commands.stream; + +import glide.api.commands.StreamBaseCommands; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import lombok.experimental.SuperBuilder; + +/** + * Optional arguments for {@link StreamBaseCommands#xreadgroup(Map, String, String, + * StreamReadGroupOptions)} + * + * @see redis.io + */ +@SuperBuilder +public final class StreamReadGroupOptions extends StreamReadOptions { + + public static final String READ_GROUP_REDIS_API = "GROUP"; + public static final String READ_NOACK_REDIS_API = "NOACK"; + + /** + * If set, messages are not added to the Pending Entries List (PEL). This is equivalent to + * acknowledging the message when it is read. + */ + private boolean noack; + + public abstract static class StreamReadGroupOptionsBuilder< + C extends StreamReadGroupOptions, B extends StreamReadGroupOptionsBuilder> + extends StreamReadOptions.StreamReadOptionsBuilder { + public B noack() { + this.noack = true; + return self(); + } + } + + /** + * Converts options and the key-to-id input for {@link StreamBaseCommands#xreadgroup(Map, String, + * String, StreamReadGroupOptions)} into a String[]. + * + * @return String[] + */ + public String[] toArgs(String group, String consumer, Map streams) { + List optionArgs = new ArrayList<>(); + optionArgs.add(READ_GROUP_REDIS_API); + optionArgs.add(group); + optionArgs.add(consumer); + + if (this.count != null) { + optionArgs.add(READ_COUNT_REDIS_API); + optionArgs.add(count.toString()); + } + + if (this.block != null) { + optionArgs.add(READ_BLOCK_REDIS_API); + optionArgs.add(block.toString()); + } + + if (this.noack) { + optionArgs.add(READ_NOACK_REDIS_API); + } + + optionArgs.add(READ_STREAMS_REDIS_API); + Set> entrySet = streams.entrySet(); + optionArgs.addAll(entrySet.stream().map(Map.Entry::getKey).collect(Collectors.toList())); + optionArgs.addAll(entrySet.stream().map(Map.Entry::getValue).collect(Collectors.toList())); + + return optionArgs.toArray(new String[0]); + } +} diff --git a/java/client/src/main/java/glide/api/models/commands/stream/StreamReadOptions.java b/java/client/src/main/java/glide/api/models/commands/stream/StreamReadOptions.java index 7baad14121..ad3d0fe421 100644 --- a/java/client/src/main/java/glide/api/models/commands/stream/StreamReadOptions.java +++ b/java/client/src/main/java/glide/api/models/commands/stream/StreamReadOptions.java @@ -7,15 +7,15 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; -import lombok.Builder; +import lombok.experimental.SuperBuilder; /** * Optional arguments for {@link StreamBaseCommands#xread(Map, StreamReadOptions)} * * @see redis.io */ -@Builder -public final class StreamReadOptions { +@SuperBuilder +public class StreamReadOptions { public static final String READ_COUNT_REDIS_API = "COUNT"; public static final String READ_BLOCK_REDIS_API = "BLOCK"; @@ -25,12 +25,12 @@ public final class StreamReadOptions { * If set, the request will be blocked for the set amount of milliseconds or until the server has * the required number of entries. Equivalent to BLOCK in the Redis API. */ - Long block; + protected Long block; /** * The maximal number of elements requested. Equivalent to COUNT in the Redis API. */ - Long count; + protected Long count; /** * Converts options and the key-to-id input for {@link StreamBaseCommands#xread(Map, diff --git a/java/client/src/main/java/glide/utils/ArrayTransformUtils.java b/java/client/src/main/java/glide/utils/ArrayTransformUtils.java index a251693293..8fea005cae 100644 --- a/java/client/src/main/java/glide/utils/ArrayTransformUtils.java +++ b/java/client/src/main/java/glide/utils/ArrayTransformUtils.java @@ -5,6 +5,7 @@ import glide.api.models.commands.geospatial.GeospatialData; import java.lang.reflect.Array; import java.util.Arrays; +import java.util.HashMap; import java.util.Map; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -128,7 +129,10 @@ public static Map castMapOf2DArray( return null; } return mapOfArrays.entrySet().stream() - .collect(Collectors.toMap(Map.Entry::getKey, e -> castArrayofArrays(e.getValue(), clazz))); + .collect( + HashMap::new, + (m, e) -> m.put(e.getKey(), castArrayofArrays(e.getValue(), clazz)), + HashMap::putAll); } /** diff --git a/java/client/src/test/java/glide/api/RedisClientTest.java b/java/client/src/test/java/glide/api/RedisClientTest.java index 57fdedca9f..b93b321904 100644 --- a/java/client/src/test/java/glide/api/RedisClientTest.java +++ b/java/client/src/test/java/glide/api/RedisClientTest.java @@ -33,6 +33,8 @@ import static glide.api.models.commands.stream.StreamRange.MAXIMUM_RANGE_REDIS_API; import static glide.api.models.commands.stream.StreamRange.MINIMUM_RANGE_REDIS_API; import static glide.api.models.commands.stream.StreamRange.RANGE_COUNT_REDIS_API; +import static glide.api.models.commands.stream.StreamReadGroupOptions.READ_GROUP_REDIS_API; +import static glide.api.models.commands.stream.StreamReadGroupOptions.READ_NOACK_REDIS_API; import static glide.api.models.commands.stream.StreamReadOptions.READ_BLOCK_REDIS_API; import static glide.api.models.commands.stream.StreamReadOptions.READ_COUNT_REDIS_API; import static glide.api.models.commands.stream.StreamReadOptions.READ_STREAMS_REDIS_API; @@ -195,6 +197,7 @@ import static redis_request.RedisRequestOuterClass.RequestType.XLen; import static redis_request.RedisRequestOuterClass.RequestType.XRange; import static redis_request.RedisRequestOuterClass.RequestType.XRead; +import static redis_request.RedisRequestOuterClass.RequestType.XReadGroup; import static redis_request.RedisRequestOuterClass.RequestType.XRevRange; import static redis_request.RedisRequestOuterClass.RequestType.XTrim; import static redis_request.RedisRequestOuterClass.RequestType.ZAdd; @@ -268,6 +271,7 @@ import glide.api.models.commands.stream.StreamRange; import glide.api.models.commands.stream.StreamRange.IdBound; import glide.api.models.commands.stream.StreamRange.InfRangeBound; +import glide.api.models.commands.stream.StreamReadGroupOptions; import glide.api.models.commands.stream.StreamReadOptions; import glide.api.models.commands.stream.StreamTrimOptions; import glide.api.models.commands.stream.StreamTrimOptions.MaxLen; @@ -4511,6 +4515,103 @@ public void xgroupDelConsumer() { assertEquals(result, payload); } + @SneakyThrows + @Test + public void xreadgroup_multiple_keys() { + // setup + String keyOne = "one"; + String streamIdOne = "id-one"; + String keyTwo = "two"; + String streamIdTwo = "id-two"; + String groupName = "testGroup"; + String consumerName = "consumerGroup"; + String[][] fieldValues = {{"field", "value"}}; + Map> completedResult = new LinkedHashMap<>(); + completedResult.put(keyOne, Map.of(streamIdOne, fieldValues)); + completedResult.put(keyTwo, Map.of(streamIdTwo, fieldValues)); + String[] arguments = { + READ_GROUP_REDIS_API, + groupName, + consumerName, + READ_STREAMS_REDIS_API, + keyOne, + keyTwo, + streamIdOne, + streamIdTwo + }; + + CompletableFuture>> testResponse = + new CompletableFuture<>(); + testResponse.complete(completedResult); + + // match on protobuf request + when(commandManager.>>submitNewCommand( + eq(XReadGroup), eq(arguments), any())) + .thenReturn(testResponse); + + // exercise + Map keysAndIds = new LinkedHashMap<>(); + keysAndIds.put(keyOne, streamIdOne); + keysAndIds.put(keyTwo, streamIdTwo); + CompletableFuture>> response = + service.xreadgroup(keysAndIds, groupName, consumerName); + Map> payload = response.get(); + + // verify + assertEquals(testResponse, response); + assertEquals(completedResult, payload); + } + + @SneakyThrows + @Test + public void xreadgroup_with_options() { + // setup + String keyOne = "one"; + String streamIdOne = "id-one"; + Long block = 2L; + Long count = 10L; + String groupName = "testGroup"; + String consumerName = "consumerGroup"; + String[][] fieldValues = {{"field", "value"}}; + Map> completedResult = + Map.of(keyOne, Map.of(streamIdOne, fieldValues)); + String[] arguments = { + READ_GROUP_REDIS_API, + groupName, + consumerName, + READ_COUNT_REDIS_API, + count.toString(), + READ_BLOCK_REDIS_API, + block.toString(), + READ_NOACK_REDIS_API, + READ_STREAMS_REDIS_API, + keyOne, + streamIdOne + }; + + CompletableFuture>> testResponse = + new CompletableFuture<>(); + testResponse.complete(completedResult); + + // match on protobuf request + when(commandManager.>>submitNewCommand( + eq(XReadGroup), eq(arguments), any())) + .thenReturn(testResponse); + + // exercise + CompletableFuture>> response = + service.xreadgroup( + Map.of(keyOne, streamIdOne), + groupName, + consumerName, + StreamReadGroupOptions.builder().block(block).count(count).noack().build()); + Map> payload = response.get(); + + // verify + assertEquals(testResponse, response); + assertEquals(completedResult, payload); + } + @SneakyThrows @Test public void type_returns_success() { diff --git a/java/client/src/test/java/glide/api/models/TransactionTests.java b/java/client/src/test/java/glide/api/models/TransactionTests.java index 12b7d7e10f..2641d424f1 100644 --- a/java/client/src/test/java/glide/api/models/TransactionTests.java +++ b/java/client/src/test/java/glide/api/models/TransactionTests.java @@ -29,6 +29,8 @@ import static glide.api.models.commands.stream.StreamRange.MAXIMUM_RANGE_REDIS_API; import static glide.api.models.commands.stream.StreamRange.MINIMUM_RANGE_REDIS_API; import static glide.api.models.commands.stream.StreamRange.RANGE_COUNT_REDIS_API; +import static glide.api.models.commands.stream.StreamReadGroupOptions.READ_GROUP_REDIS_API; +import static glide.api.models.commands.stream.StreamReadGroupOptions.READ_NOACK_REDIS_API; import static glide.api.models.commands.stream.StreamReadOptions.READ_BLOCK_REDIS_API; import static glide.api.models.commands.stream.StreamReadOptions.READ_COUNT_REDIS_API; import static glide.api.models.commands.stream.StreamReadOptions.READ_STREAMS_REDIS_API; @@ -171,6 +173,7 @@ import static redis_request.RedisRequestOuterClass.RequestType.XLen; import static redis_request.RedisRequestOuterClass.RequestType.XRange; import static redis_request.RedisRequestOuterClass.RequestType.XRead; +import static redis_request.RedisRequestOuterClass.RequestType.XReadGroup; import static redis_request.RedisRequestOuterClass.RequestType.XRevRange; import static redis_request.RedisRequestOuterClass.RequestType.XTrim; import static redis_request.RedisRequestOuterClass.RequestType.ZAdd; @@ -235,6 +238,7 @@ import glide.api.models.commands.stream.StreamAddOptions; import glide.api.models.commands.stream.StreamGroupOptions; import glide.api.models.commands.stream.StreamRange.InfRangeBound; +import glide.api.models.commands.stream.StreamReadGroupOptions; import glide.api.models.commands.stream.StreamReadOptions; import glide.api.models.commands.stream.StreamTrimOptions.MinId; import java.util.ArrayList; @@ -786,6 +790,34 @@ InfScoreBound.NEGATIVE_INFINITY, new ScoreBoundary(3, false), new Limit(1, 2)), transaction.xgroupDelConsumer("key", "group", "consumer"); results.add(Pair.of(XGroupDelConsumer, buildArgs("key", "group", "consumer"))); + transaction.xreadgroup(Map.of("key", "id"), "group", "consumer"); + results.add( + Pair.of( + XReadGroup, + buildArgs( + READ_GROUP_REDIS_API, "group", "consumer", READ_STREAMS_REDIS_API, "key", "id"))); + + transaction.xreadgroup( + Map.of("key", "id"), + "group", + "consumer", + StreamReadGroupOptions.builder().block(1L).count(2L).noack().build()); + results.add( + Pair.of( + XReadGroup, + buildArgs( + READ_GROUP_REDIS_API, + "group", + "consumer", + READ_COUNT_REDIS_API, + "2", + READ_BLOCK_REDIS_API, + "1", + READ_NOACK_REDIS_API, + READ_STREAMS_REDIS_API, + "key", + "id"))); + transaction.time(); results.add(Pair.of(Time, buildArgs())); diff --git a/java/integTest/src/test/java/glide/SharedCommandTests.java b/java/integTest/src/test/java/glide/SharedCommandTests.java index aea6359eb4..7ba5ddff22 100644 --- a/java/integTest/src/test/java/glide/SharedCommandTests.java +++ b/java/integTest/src/test/java/glide/SharedCommandTests.java @@ -69,6 +69,7 @@ import glide.api.models.commands.stream.StreamGroupOptions; import glide.api.models.commands.stream.StreamRange.IdBound; import glide.api.models.commands.stream.StreamRange.InfRangeBound; +import glide.api.models.commands.stream.StreamReadGroupOptions; import glide.api.models.commands.stream.StreamReadOptions; import glide.api.models.commands.stream.StreamTrimOptions.MaxLen; import glide.api.models.commands.stream.StreamTrimOptions.MinId; @@ -3480,7 +3481,7 @@ public void xgroupCreate_xgroupDestroy(BaseClient client) { @SneakyThrows @ParameterizedTest(autoCloseArguments = false) @MethodSource("getClients") - public void xgroupCreateConsumer_xgroupDelConsumer(BaseClient client) { + public void xgroupCreateConsumer_xreadgroup_xgroupDelConsumer(BaseClient client) { String key = UUID.randomUUID().toString(); String stringKey = UUID.randomUUID().toString(); String groupName = "group" + UUID.randomUUID(); @@ -3510,8 +3511,43 @@ public void xgroupCreateConsumer_xgroupDelConsumer(BaseClient client) { // Deletes a consumer that is not created yet returns 0 assertEquals(0L, client.xgroupDelConsumer(key, groupName, "not_a_consumer").get()); - // TODO use XREADGROUP to mark pending messages for the consumer so that we get non-zero return - assertEquals(0L, client.xgroupDelConsumer(key, groupName, consumerName).get()); + // Add two stream entries + String streamid_1 = client.xadd(key, Map.of("field1", "value1")).get(); + assertNotNull(streamid_1); + String streamid_2 = client.xadd(key, Map.of("field2", "value2")).get(); + assertNotNull(streamid_2); + + // read the entire stream for the consumer and mark messages as pending + var result_1 = client.xreadgroup(Map.of(key, ">"), groupName, consumerName).get(); + assertDeepEquals( + Map.of( + key, + Map.of( + streamid_1, new String[][] {{"field1", "value1"}}, + streamid_2, new String[][] {{"field2", "value2"}})), + result_1); + + // delete one of the streams + assertEquals(1L, client.xdel(key, new String[] {streamid_1}).get()); + + // now xreadgroup yeilds one empty stream and one non-empty stream + var result_2 = client.xreadgroup(Map.of(key, "0"), groupName, consumerName).get(); + assertEquals(2, result_2.get(key).size()); + assertNull(result_2.get(key).get(streamid_1)); + assertArrayEquals(new String[][] {{"field2", "value2"}}, result_2.get(key).get(streamid_2)); + + String streamid_3 = client.xadd(key, Map.of("field3", "value3")).get(); + assertNotNull(streamid_3); + + // Delete the consumer group and expect 2 pending messages + assertEquals(2L, client.xgroupDelConsumer(key, groupName, consumerName).get()); + + // Consume the last message with the previously deleted consumer (creates the consumer anew) + var result_3 = client.xreadgroup(Map.of(key, ">"), groupName, consumerName).get(); + assertEquals(1, result_3.get(key).size()); + + // Delete the consumer group and expect the pending message + assertEquals(1L, client.xgroupDelConsumer(key, groupName, consumerName).get()); // key is a string and cannot be created as a stream assertEquals(OK, client.set(stringKey, "not_a_stream").get()); @@ -3528,6 +3564,124 @@ public void xgroupCreateConsumer_xgroupDelConsumer(BaseClient client) { assertInstanceOf(RequestException.class, executionException.getCause()); } + @SneakyThrows + @ParameterizedTest(autoCloseArguments = false) + @MethodSource("getClients") + public void xreadgroup_return_failures(BaseClient client) { + String key = "{key}:1" + UUID.randomUUID(); + String nonStreamKey = "{key}:3" + UUID.randomUUID(); + String field1 = "f1_"; + + // setup first entries in streams key1 and key2 + Map timestamp_1_1_map = new LinkedHashMap<>(); + timestamp_1_1_map.put(field1, field1 + "1"); + String timestamp_1_1 = + client.xadd(key, timestamp_1_1_map, StreamAddOptions.builder().id("1-1").build()).get(); + assertNotNull(timestamp_1_1); + + String groupName = "group" + UUID.randomUUID(); + String zeroStreamId = "0"; + String consumerName = "consumer" + UUID.randomUUID(); + + // create group and consumer for the group + assertEquals( + OK, + client + .xgroupCreate( + key, groupName, zeroStreamId, StreamGroupOptions.builder().makeStream().build()) + .get()); + assertTrue(client.xgroupCreateConsumer(key, groupName, consumerName).get()); + + // First key exists, but it is not a stream + assertEquals(OK, client.set(nonStreamKey, "bar").get()); + ExecutionException executionException = + assertThrows( + ExecutionException.class, + () -> + client + .xreadgroup( + Map.of(nonStreamKey, timestamp_1_1, key, timestamp_1_1), + groupName, + consumerName) + .get()); + assertInstanceOf(RequestException.class, executionException.getCause()); + + // Second key exists, but it is not a stream + executionException = + assertThrows( + ExecutionException.class, + () -> + client + .xreadgroup( + Map.of(key, timestamp_1_1, nonStreamKey, timestamp_1_1), + groupName, + consumerName) + .get()); + assertInstanceOf(RequestException.class, executionException.getCause()); + + try (var testClient = + client instanceof RedisClient + ? RedisClient.CreateClient(commonClientConfig().build()).get() + : RedisClusterClient.CreateClient(commonClusterClientConfig().build()).get()) { + String timeoutKey = "{key}:2" + UUID.randomUUID(); + String timeoutGroupName = "group" + UUID.randomUUID(); + String timeoutConsumerName = "consumer" + UUID.randomUUID(); + + // Create a group read with the test client + // add a single stream entry and consumer + // the first call to ">" will return an update consumer group + // the second call to ">" will block waiting for new entries + // using anything other than ">" won't block, but will return the empty consumer result + // see: https://github.com/redis/redis/issues/6587 + assertEquals( + OK, + testClient + .xgroupCreate( + timeoutKey, + timeoutGroupName, + zeroStreamId, + StreamGroupOptions.builder().makeStream().build()) + .get()); + assertTrue( + testClient.xgroupCreateConsumer(timeoutKey, timeoutGroupName, timeoutConsumerName).get()); + String streamid_1 = testClient.xadd(timeoutKey, Map.of("field1", "value1")).get(); + assertNotNull(streamid_1); + + // read the entire stream for the consumer and mark messages as pending + var result_1 = + testClient + .xreadgroup(Map.of(timeoutKey, ">"), timeoutGroupName, timeoutConsumerName) + .get(); + // returns an null result on the key + assertNull(result_1.get(key)); + + // subsequent calls to read ">" will block: + // ensure that command doesn't time out even if timeout > request timeout + long oneSecondInMS = 1000L; + assertNull( + testClient + .xreadgroup( + Map.of(timeoutKey, ">"), + timeoutGroupName, + timeoutConsumerName, + StreamReadGroupOptions.builder().block(oneSecondInMS).build()) + .get()); + + // with 0 timeout (no timeout) should never time out, + // but we wrap the test with timeout to avoid test failing or stuck forever + assertThrows( + TimeoutException.class, // <- future timeout, not command timeout + () -> + testClient + .xreadgroup( + Map.of(timeoutKey, ">"), + timeoutGroupName, + timeoutConsumerName, + StreamReadGroupOptions.builder().block(0L).build()) + .get(3, TimeUnit.SECONDS)); + } + } + @SneakyThrows @ParameterizedTest(autoCloseArguments = false) @MethodSource("getClients") diff --git a/java/integTest/src/test/java/glide/TransactionTestUtilities.java b/java/integTest/src/test/java/glide/TransactionTestUtilities.java index 7db848ae4d..2ead3c6ca3 100644 --- a/java/integTest/src/test/java/glide/TransactionTestUtilities.java +++ b/java/integTest/src/test/java/glide/TransactionTestUtilities.java @@ -38,6 +38,8 @@ import glide.api.models.commands.stream.StreamAddOptions; import glide.api.models.commands.stream.StreamGroupOptions; import glide.api.models.commands.stream.StreamRange.IdBound; +import glide.api.models.commands.stream.StreamReadGroupOptions; +import glide.api.models.commands.stream.StreamReadOptions; import glide.api.models.commands.stream.StreamTrimOptions.MinId; import java.util.HashMap; import java.util.Map; @@ -737,15 +739,22 @@ private static Object[] streamCommands(BaseTransaction transaction) { .xadd(streamKey1, Map.of("field3", "value3"), StreamAddOptions.builder().id("0-3").build()) .xlen(streamKey1) .xread(Map.of(streamKey1, "0-2")) + .xread(Map.of(streamKey1, "0-2"), StreamReadOptions.builder().count(1L).build()) .xrange(streamKey1, IdBound.of("0-1"), IdBound.of("0-1")) .xrange(streamKey1, IdBound.of("0-1"), IdBound.of("0-1"), 1L) .xrevrange(streamKey1, IdBound.of("0-1"), IdBound.of("0-1")) .xrevrange(streamKey1, IdBound.of("0-1"), IdBound.of("0-1"), 1L) .xtrim(streamKey1, new MinId(true, "0-2")) - .xgroupCreate(streamKey1, groupName1, "0-0") + .xgroupCreate(streamKey1, groupName1, "0-2") .xgroupCreate( streamKey1, groupName2, "0-0", StreamGroupOptions.builder().makeStream().build()) .xgroupCreateConsumer(streamKey1, groupName1, consumer1) + .xreadgroup(Map.of(streamKey1, ">"), groupName1, consumer1) + .xreadgroup( + Map.of(streamKey1, "0-3"), + groupName1, + consumer1, + StreamReadGroupOptions.builder().count(2L).build()) .xgroupDelConsumer(streamKey1, groupName1, consumer1) .xgroupDestroy(streamKey1, groupName1) .xgroupDestroy(streamKey1, groupName2) @@ -759,6 +768,11 @@ private static Object[] streamCommands(BaseTransaction transaction) { Map.of( streamKey1, Map.of("0-3", new String[][] {{"field3", "value3"}})), // xread(Map.of(key9, "0-2")); + Map.of( + streamKey1, + Map.of( + "0-3", + new String[][] {{"field3", "value3"}})), // xread(Map.of(key9, "0-2"), options); Map.of("0-1", new String[][] {{"field1", "value1"}}), // .xrange(streamKey1, "0-1", "0-1") Map.of("0-1", new String[][] {{"field1", "value1"}}), // .xrange(streamKey1, "0-1", "0-1", 1l) Map.of("0-1", new String[][] {{"field1", "value1"}}), // .xrevrange(streamKey1, "0-1", "0-1") @@ -768,7 +782,17 @@ private static Object[] streamCommands(BaseTransaction transaction) { OK, // xgroupCreate(streamKey1, groupName1, "0-0") OK, // xgroupCreate(streamKey1, groupName1, "0-0", options) true, // xgroupCreateConsumer(streamKey1, groupName1, consumer1) - 0L, // xgroupDelConsumer(streamKey1, groupName1, consumer1) + Map.of( + streamKey1, + Map.of( + "0-3", + new String[][] { + {"field3", "value3"} + })), // xreadgroup(Map.of(streamKey1, ">"), groupName1, consumer1); + Map.of( + streamKey1, + Map.of()), // xreadgroup(Map.of(streamKey1, ">"), groupName1, consumer1, options); + 1L, // xgroupDelConsumer(streamKey1, groupName1, consumer1) true, // xgroupDestroy(streamKey1, groupName1) true, // xgroupDestroy(streamKey1, groupName2) 1L, // .xdel(streamKey1, new String[] {"0-1", "0-5"});