Skip to content

Commit

Permalink
support xgroup, xread and xreadgroup with GlideString (#1841)
Browse files Browse the repository at this point in the history
* support xgroup with GlideString

* replace REDIS_VERSION with SERVER_VERSION

* initial commit of supporting xread and xreadgroup with GlideString

* suuport xread and xreadgroup with GlideString
  • Loading branch information
alon-arenberg authored Jul 7, 2024
1 parent e26f3a0 commit e318665
Show file tree
Hide file tree
Showing 9 changed files with 1,511 additions and 46 deletions.
134 changes: 126 additions & 8 deletions java/client/src/main/java/glide/api/BaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@
import static glide.api.models.commands.bitmap.BitFieldOptions.createBitFieldArgs;
import static glide.api.models.commands.bitmap.BitFieldOptions.createBitFieldGlideStringArgs;
import static glide.api.models.commands.stream.StreamClaimOptions.JUST_ID_REDIS_API;
import static glide.api.models.commands.stream.StreamGroupOptions.ENTRIES_READ_VALKEY_API;
import static glide.api.models.commands.stream.StreamReadOptions.READ_COUNT_REDIS_API;
import static glide.api.models.commands.stream.XInfoStreamOptions.COUNT;
import static glide.api.models.commands.stream.XInfoStreamOptions.FULL;
Expand Down Expand Up @@ -244,6 +245,7 @@
import glide.api.models.commands.scan.ZScanOptions;
import glide.api.models.commands.scan.ZScanOptionsBinary;
import glide.api.models.commands.stream.StreamAddOptions;
import glide.api.models.commands.stream.StreamAddOptionsBinary;
import glide.api.models.commands.stream.StreamClaimOptions;
import glide.api.models.commands.stream.StreamGroupOptions;
import glide.api.models.commands.stream.StreamPendingOptions;
Expand Down Expand Up @@ -634,6 +636,25 @@ protected Map<String, Map<String, String[][]>> handleXReadResponse(Response resp
e -> castMapOf2DArray((Map<String, Object[][]>) e.getValue(), String.class)));
}

/**
* @param response A Protobuf response
* @return A map of a map of <code>GlideString[][]</code>
*/
protected Map<GlideString, Map<GlideString, GlideString[][]>> handleXReadResponseBinary(
Response response) throws RedisException {
Map<GlideString, Object> mapResponse = handleBinaryStringMapOrNullResponse(response);
if (mapResponse == null) {
return null;
}
return mapResponse.entrySet().stream()
.collect(
Collectors.toMap(
Map.Entry::getKey,
e ->
castMapOf2DArray(
(Map<GlideString, Object[][]>) e.getValue(), GlideString.class)));
}

@SuppressWarnings("unchecked") // raw Set cast to Set<String>
protected Set<String> handleSetResponse(Response response) throws RedisException {
return handleRedisResponse(Set.class, EnumSet.of(ResponseFlags.ENCODING_UTF8), response);
Expand Down Expand Up @@ -2331,7 +2352,7 @@ public CompletableFuture<String> xadd(@NonNull String key, @NonNull Map<String,
@Override
public CompletableFuture<GlideString> xadd(
@NonNull GlideString key, @NonNull Map<GlideString, GlideString> values) {
return xadd(key, values, StreamAddOptions.builder().build());
return xadd(key, values, StreamAddOptionsBinary.builder().build());
}

@Override
Expand All @@ -2347,13 +2368,14 @@ public CompletableFuture<String> xadd(
public CompletableFuture<GlideString> xadd(
@NonNull GlideString key,
@NonNull Map<GlideString, GlideString> values,
@NonNull StreamAddOptions options) {
String[] toArgsString = options.toArgs();
GlideString[] toArgs =
Arrays.stream(toArgsString).map(GlideString::gs).toArray(GlideString[]::new);
@NonNull StreamAddOptionsBinary options) {
GlideString[] arguments =
ArrayUtils.addAll(
ArrayUtils.addFirst(toArgs, key), convertMapToKeyValueGlideStringArray(values));
new ArgsBuilder()
.add(key)
.add(options.toArgs())
.add(convertMapToKeyValueGlideStringArray(values))
.toArray();

return commandManager.submitNewCommand(XAdd, arguments, this::handleGlideStringOrNullResponse);
}

Expand All @@ -2363,13 +2385,26 @@ public CompletableFuture<Map<String, Map<String, String[][]>>> xread(
return xread(keysAndIds, StreamReadOptions.builder().build());
}

@Override
public CompletableFuture<Map<GlideString, Map<GlideString, GlideString[][]>>> xreadBinary(
@NonNull Map<GlideString, GlideString> keysAndIds) {
return xreadBinary(keysAndIds, StreamReadOptions.builder().build());
}

@Override
public CompletableFuture<Map<String, Map<String, String[][]>>> xread(
@NonNull Map<String, String> keysAndIds, @NonNull StreamReadOptions options) {
String[] arguments = options.toArgs(keysAndIds);
return commandManager.submitNewCommand(XRead, arguments, this::handleXReadResponse);
}

@Override
public CompletableFuture<Map<GlideString, Map<GlideString, GlideString[][]>>> xreadBinary(
@NonNull Map<GlideString, GlideString> keysAndIds, @NonNull StreamReadOptions options) {
GlideString[] arguments = options.toArgsBinary(keysAndIds);
return commandManager.submitNewCommand(XRead, arguments, this::handleXReadResponseBinary);
}

@Override
public CompletableFuture<Long> xtrim(@NonNull String key, @NonNull StreamTrimOptions options) {
String[] arguments = ArrayUtils.addFirst(options.toArgs(), key);
Expand Down Expand Up @@ -2501,6 +2536,13 @@ public CompletableFuture<String> xgroupCreate(
XGroupCreate, new String[] {key, groupName, id}, this::handleStringResponse);
}

@Override
public CompletableFuture<String> xgroupCreate(
@NonNull GlideString key, @NonNull GlideString groupName, @NonNull GlideString id) {
return commandManager.submitNewCommand(
XGroupCreate, new GlideString[] {key, groupName, id}, this::handleStringResponse);
}

@Override
public CompletableFuture<String> xgroupCreate(
@NonNull String key,
Expand All @@ -2511,38 +2553,96 @@ public CompletableFuture<String> xgroupCreate(
return commandManager.submitNewCommand(XGroupCreate, arguments, this::handleStringResponse);
}

@Override
public CompletableFuture<String> xgroupCreate(
@NonNull GlideString key,
@NonNull GlideString groupName,
@NonNull GlideString id,
@NonNull StreamGroupOptions options) {
GlideString[] arguments =
new ArgsBuilder().add(key).add(groupName).add(id).add(options.toArgs()).toArray();
return commandManager.submitNewCommand(XGroupCreate, arguments, this::handleStringResponse);
}

@Override
public CompletableFuture<Boolean> xgroupDestroy(@NonNull String key, @NonNull String groupname) {
return commandManager.submitNewCommand(
XGroupDestroy, new String[] {key, groupname}, this::handleBooleanResponse);
}

@Override
public CompletableFuture<Boolean> xgroupDestroy(
@NonNull GlideString key, @NonNull GlideString groupname) {
return commandManager.submitNewCommand(
XGroupDestroy, new GlideString[] {key, groupname}, this::handleBooleanResponse);
}

@Override
public CompletableFuture<Boolean> xgroupCreateConsumer(
@NonNull String key, @NonNull String group, @NonNull String consumer) {
return commandManager.submitNewCommand(
XGroupCreateConsumer, new String[] {key, group, consumer}, this::handleBooleanResponse);
}

@Override
public CompletableFuture<Boolean> xgroupCreateConsumer(
@NonNull GlideString key, @NonNull GlideString group, @NonNull GlideString consumer) {
return commandManager.submitNewCommand(
XGroupCreateConsumer,
new GlideString[] {key, group, consumer},
this::handleBooleanResponse);
}

@Override
public CompletableFuture<Long> xgroupDelConsumer(
@NonNull String key, @NonNull String group, @NonNull String consumer) {
return commandManager.submitNewCommand(
XGroupDelConsumer, new String[] {key, group, consumer}, this::handleLongResponse);
}

@Override
public CompletableFuture<Long> xgroupDelConsumer(
@NonNull GlideString key, @NonNull GlideString group, @NonNull GlideString consumer) {
return commandManager.submitNewCommand(
XGroupDelConsumer, new GlideString[] {key, group, consumer}, this::handleLongResponse);
}

@Override
public CompletableFuture<String> xgroupSetId(
@NonNull String key, @NonNull String groupName, @NonNull String id) {
return commandManager.submitNewCommand(
XGroupSetId, new String[] {key, groupName, id}, this::handleStringResponse);
}

@Override
public CompletableFuture<String> xgroupSetId(
@NonNull GlideString key, @NonNull GlideString groupName, @NonNull GlideString id) {
return commandManager.submitNewCommand(
XGroupSetId, new GlideString[] {key, groupName, id}, this::handleStringResponse);
}

@Override
public CompletableFuture<String> xgroupSetId(
@NonNull String key, @NonNull String groupName, @NonNull String id, long entriesRead) {
String[] arguments =
new String[] {key, groupName, id, "ENTRIESREAD", Long.toString(entriesRead)};
new String[] {key, groupName, id, ENTRIES_READ_VALKEY_API, Long.toString(entriesRead)};
return commandManager.submitNewCommand(XGroupSetId, arguments, this::handleStringResponse);
}

@Override
public CompletableFuture<String> xgroupSetId(
@NonNull GlideString key,
@NonNull GlideString groupName,
@NonNull GlideString id,
long entriesRead) {
GlideString[] arguments =
new ArgsBuilder()
.add(key)
.add(groupName)
.add(id)
.add(ENTRIES_READ_VALKEY_API)
.add(entriesRead)
.toArray();
return commandManager.submitNewCommand(XGroupSetId, arguments, this::handleStringResponse);
}

Expand All @@ -2552,6 +2652,14 @@ public CompletableFuture<Map<String, Map<String, String[][]>>> xreadgroup(
return xreadgroup(keysAndIds, group, consumer, StreamReadGroupOptions.builder().build());
}

@Override
public CompletableFuture<Map<GlideString, Map<GlideString, GlideString[][]>>> xreadgroup(
@NonNull Map<GlideString, GlideString> keysAndIds,
@NonNull GlideString group,
@NonNull GlideString consumer) {
return xreadgroup(keysAndIds, group, consumer, StreamReadGroupOptions.builder().build());
}

@Override
public CompletableFuture<Map<String, Map<String, String[][]>>> xreadgroup(
@NonNull Map<String, String> keysAndIds,
Expand All @@ -2562,6 +2670,16 @@ public CompletableFuture<Map<String, Map<String, String[][]>>> xreadgroup(
return commandManager.submitNewCommand(XReadGroup, arguments, this::handleXReadResponse);
}

@Override
public CompletableFuture<Map<GlideString, Map<GlideString, GlideString[][]>>> xreadgroup(
@NonNull Map<GlideString, GlideString> keysAndIds,
@NonNull GlideString group,
@NonNull GlideString consumer,
@NonNull StreamReadGroupOptions options) {
GlideString[] arguments = options.toArgsBinary(group, consumer, keysAndIds);
return commandManager.submitNewCommand(XReadGroup, arguments, this::handleXReadResponseBinary);
}

@Override
public CompletableFuture<Long> xack(
@NonNull String key, @NonNull String group, @NonNull String[] ids) {
Expand Down
Loading

0 comments on commit e318665

Please sign in to comment.