Skip to content

Commit

Permalink
Java: Add the XGROUP CREATE and XGROUP DESTROY commands (#1557)
Browse files Browse the repository at this point in the history
* Java: Add the `XGROUP CREATE` and `XGROUP DESTROY` commands (#359)

* JAVA: Add the XGROUP CREATE and DESTROY command

Signed-off-by: Andrew Carbonetto <[email protected]>

* Fix XGROUP DESTROY

Signed-off-by: Andrew Carbonetto <[email protected]>

* Clean up for self-review

Signed-off-by: Andrew Carbonetto <[email protected]>

* cargo fmt

Signed-off-by: Andrew Carbonetto <[email protected]>

* Change builder to use constructors

Signed-off-by: Andrew Carbonetto <[email protected]>

* Add optional javadocs

Signed-off-by: Andrew Carbonetto <[email protected]>

---------

Signed-off-by: Andrew Carbonetto <[email protected]>

* Update options to use builder

Signed-off-by: Andrew Carbonetto <[email protected]>

* SPOTLESS

Signed-off-by: Andrew Carbonetto <[email protected]>

* Update doc

Signed-off-by: Andrew Carbonetto <[email protected]>

---------

Signed-off-by: Andrew Carbonetto <[email protected]>
  • Loading branch information
acarbonetto authored Jun 17, 2024
1 parent a7eccad commit fe82f1b
Show file tree
Hide file tree
Showing 9 changed files with 414 additions and 13 deletions.
5 changes: 2 additions & 3 deletions glide-core/src/client/value_conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -861,9 +861,8 @@ pub(crate) fn expected_type_for_cmd(cmd: &Cmd) -> Option<ExpectedReturnType> {
}),
b"INCRBYFLOAT" | b"HINCRBYFLOAT" | b"ZINCRBY" => Some(ExpectedReturnType::Double),
b"HEXISTS" | b"HSETNX" | b"EXPIRE" | b"EXPIREAT" | b"PEXPIRE" | b"PEXPIREAT"
| b"SISMEMBER" | b"PERSIST" | b"SMOVE" | b"RENAMENX" | b"MOVE" | b"COPY" | b"MSETNX" => {
Some(ExpectedReturnType::Boolean)
}
| b"SISMEMBER" | b"PERSIST" | b"SMOVE" | b"RENAMENX" | b"MOVE" | b"COPY"
| b"XGROUP DESTROY" | b"MSETNX" => Some(ExpectedReturnType::Boolean),
b"SMISMEMBER" => Some(ExpectedReturnType::ArrayOfBools),
b"SMEMBERS" | b"SINTER" | b"SDIFF" => Some(ExpectedReturnType::Set),
b"ZSCORE" | b"GEODIST" => Some(ExpectedReturnType::DoubleOrNull),
Expand Down
26 changes: 26 additions & 0 deletions java/client/src/main/java/glide/api/BaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@
import static redis_request.RedisRequestOuterClass.RequestType.Watch;
import static redis_request.RedisRequestOuterClass.RequestType.XAdd;
import static redis_request.RedisRequestOuterClass.RequestType.XDel;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreate;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupDestroy;
import static redis_request.RedisRequestOuterClass.RequestType.XLen;
import static redis_request.RedisRequestOuterClass.RequestType.XRange;
import static redis_request.RedisRequestOuterClass.RequestType.XRead;
Expand Down Expand Up @@ -189,6 +191,7 @@
import glide.api.models.commands.geospatial.GeoUnit;
import glide.api.models.commands.geospatial.GeospatialData;
import glide.api.models.commands.stream.StreamAddOptions;
import glide.api.models.commands.stream.StreamGroupOptions;
import glide.api.models.commands.stream.StreamRange;
import glide.api.models.commands.stream.StreamReadOptions;
import glide.api.models.commands.stream.StreamTrimOptions;
Expand Down Expand Up @@ -1414,6 +1417,29 @@ public CompletableFuture<Map<String, String[][]>> xrevrange(
response -> castMapOf2DArray(handleMapResponse(response), String.class));
}

@Override
public CompletableFuture<String> xgroupCreate(
@NonNull String key, @NonNull String groupname, @NonNull String id) {
return commandManager.submitNewCommand(
XGroupCreate, new String[] {key, groupname, id}, this::handleStringResponse);
}

@Override
public CompletableFuture<String> xgroupCreate(
@NonNull String key,
@NonNull String groupname,
@NonNull String id,
@NonNull StreamGroupOptions options) {
String[] arguments = concatenateArrays(new String[] {key, groupname, id}, options.toArgs());
return commandManager.submitNewCommand(XGroupCreate, arguments, this::handleStringResponse);
}

@Override
public CompletableFuture<Boolean> xgroupDestroy(@NonNull String key, @NonNull String groupname) {
return commandManager.submitNewCommand(
XGroupDestroy, new String[] {key, groupname}, this::handleBooleanResponse);
}

@Override
public CompletableFuture<Long> pttl(@NonNull String key) {
return commandManager.submitNewCommand(PTTL, new String[] {key}, this::handleLongResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import glide.api.models.commands.stream.StreamAddOptions;
import glide.api.models.commands.stream.StreamAddOptions.StreamAddOptionsBuilder;
import glide.api.models.commands.stream.StreamGroupOptions;
import glide.api.models.commands.stream.StreamRange;
import glide.api.models.commands.stream.StreamRange.IdBound;
import glide.api.models.commands.stream.StreamRange.InfRangeBound;
Expand All @@ -23,7 +24,7 @@ public interface StreamBaseCommands {
* Adds an entry to the specified stream stored at <code>key</code>.<br>
* If the <code>key</code> doesn't exist, the stream is created.
*
* @see <a href="https://redis.io/commands/xadd/">redis.io</a> for details.
* @see <a href="https://valkey.io/commands/xadd/">valkey.io</a> for details.
* @param key The key of the stream.
* @param values Field-value pairs to be added to the entry.
* @return The id of the added entry.
Expand All @@ -39,7 +40,7 @@ public interface StreamBaseCommands {
* Adds an entry to the specified stream stored at <code>key</code>.<br>
* If the <code>key</code> doesn't exist, the stream is created.
*
* @see <a href="https://redis.io/commands/xadd/">redis.io</a> for details.
* @see <a href="https://valkey.io/commands/xadd/">valkey.io</a> for details.
* @param key The key of the stream.
* @param values Field-value pairs to be added to the entry.
* @param options Stream add options {@link StreamAddOptions}.
Expand All @@ -63,7 +64,7 @@ public interface StreamBaseCommands {
*
* @apiNote When in cluster mode, all keys in <code>keysAndIds</code> must map to the same hash
* slot.
* @see <a href="https://redis.io/commands/xread/">redis.io</a> for details.
* @see <a href="https://valkey.io/commands/xread/">valkey.io</a> for details.
* @param keysAndIds A <code>Map</code> of keys and entry ids to read from. The <code>
* Map</code> is composed of a stream's key and the id of the entry after which the stream
* will be read.
Expand All @@ -89,7 +90,7 @@ public interface StreamBaseCommands {
*
* @apiNote When in cluster mode, all keys in <code>keysAndIds</code> must map to the same hash
* slot.
* @see <a href="https://redis.io/commands/xread/">redis.io</a> for details.
* @see <a href="https://valkey.io/commands/xread/">valkey.io</a> for details.
* @param keysAndIds A <code>Map</code> of keys and entry ids to read from. The <code>
* Map</code> is composed of a stream's key and the id of the entry after which the stream
* will be read.
Expand Down Expand Up @@ -117,7 +118,7 @@ CompletableFuture<Map<String, Map<String, String[][]>>> xread(
/**
* Trims the stream by evicting older entries.
*
* @see <a href="https://redis.io/commands/xtrim/">redis.io</a> for details.
* @see <a href="https://valkey.io/commands/xtrim/">valkey.io</a> for details.
* @param key The key of the stream.
* @param options Stream trim options {@link StreamTrimOptions}.
* @return The number of entries deleted from the stream.
Expand Down Expand Up @@ -169,6 +170,7 @@ CompletableFuture<Map<String, Map<String, String[][]>>> xread(
/**
* Returns stream entries matching a given range of IDs.
*
* @see <a href="https://valkey.io/commands/xrange/">valkey.io</a> for details.
* @param key The key of the stream.
* @param start Starting stream ID bound for range.
* <ul>
Expand Down Expand Up @@ -205,6 +207,7 @@ CompletableFuture<Map<String, Map<String, String[][]>>> xread(
/**
* Returns stream entries matching a given range of IDs.
*
* @see <a href="https://valkey.io/commands/xrange/">valkey.io</a> for details.
* @param key The key of the stream.
* @param start Starting stream ID bound for range.
* <ul>
Expand Down Expand Up @@ -242,6 +245,7 @@ CompletableFuture<Map<String, String[][]>> xrange(
* Equivalent to {@link #xrange(String, StreamRange, StreamRange)} but returns the entries in
* reverse order.
*
* @see <a href="https://valkey.io/commands/xrevrange/">valkey.io</a> for details.
* @param key The key of the stream.
* @param end Ending stream ID bound for range.
* <ul>
Expand Down Expand Up @@ -281,6 +285,7 @@ CompletableFuture<Map<String, String[][]>> xrevrange(
* Equivalent to {@link #xrange(String, StreamRange, StreamRange, long)} but returns the entries
* in reverse order.
*
* @see <a href="https://valkey.io/commands/xrevrange/">valkey.io</a> for details.
* @param key The key of the stream.
* @param end Ending stream ID bound for range.
* <ul>
Expand Down Expand Up @@ -312,4 +317,59 @@ CompletableFuture<Map<String, String[][]>> xrevrange(
*/
CompletableFuture<Map<String, String[][]>> xrevrange(
String key, StreamRange end, StreamRange start, long count);

/**
* Creates a new consumer group uniquely identified by <code>groupname</code> for the stream
* stored at <code>key</code>.
*
* @see <a href="https://valkey.io/commands/xgroup-create/">valkey.io</a> for details.
* @param key The key of the stream.
* @param groupname The newly created consumer group name.
* @param id Stream entry ID that specifies the last delivered entry in the stream from the new
* group’s perspective. The special ID <code>"$"</code> can be used to specify the last entry
* in the stream.
* @return <code>OK</code>.
* @example
* <pre>{@code
* // Create the consumer group "mygroup", using zero as the starting ID:
* assert client.xgroupCreate("mystream", "mygroup", "0-0").get().equals("OK");
* }</pre>
*/
CompletableFuture<String> xgroupCreate(String key, String groupname, String id);

/**
* Creates a new consumer group uniquely identified by <code>groupname</code> for the stream
* stored at <code>key</code>.
*
* @see <a href="https://valkey.io/commands/xgroup-create/">valkey.io</a> for details.
* @param key The key of the stream.
* @param groupname The newly created consumer group name.
* @param id Stream entry ID that specifies the last delivered entry in the stream from the new
* group’s perspective. The special ID <code>"$"</code> can be used to specify the last entry
* in the stream.
* @param options The group options {@link StreamGroupOptions}.
* @return <code>OK</code>.
* @example
* <pre>{@code
* // Create the consumer group "mygroup", and the stream if it does not exist, after the last ID
* assert client.xgroupCreate("mystream", "mygroup", "$", new StreamGroupOptions(true)).get().equals("OK");
* }</pre>
*/
CompletableFuture<String> xgroupCreate(
String key, String groupname, String id, StreamGroupOptions options);

/**
* Destroys the consumer group <code>groupname</code> for the stream stored at <code>key</code>.
*
* @see <a href="https://valkey.io/commands/xgroup-destroy/">valkey.io</a> for details.
* @param key The key of the stream.
* @param groupname The newly created consumer group name.
* @return <code>true</code> if the consumer group is destroyed. Otherwise, <code>false</code>.
* @example
* <pre>{@code
* // Destroys the consumer group "mygroup"
* assert client.xgroupDestroy("mystream", "mygroup").get().equals("OK");
* }</pre>
*/
CompletableFuture<Boolean> xgroupDestroy(String key, String groupname);
}
72 changes: 67 additions & 5 deletions java/client/src/main/java/glide/api/models/BaseTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@
import static redis_request.RedisRequestOuterClass.RequestType.Unlink;
import static redis_request.RedisRequestOuterClass.RequestType.XAdd;
import static redis_request.RedisRequestOuterClass.RequestType.XDel;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreate;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupDestroy;
import static redis_request.RedisRequestOuterClass.RequestType.XLen;
import static redis_request.RedisRequestOuterClass.RequestType.XRange;
import static redis_request.RedisRequestOuterClass.RequestType.XRead;
Expand Down Expand Up @@ -224,6 +226,7 @@
import glide.api.models.commands.geospatial.GeospatialData;
import glide.api.models.commands.stream.StreamAddOptions;
import glide.api.models.commands.stream.StreamAddOptions.StreamAddOptionsBuilder;
import glide.api.models.commands.stream.StreamGroupOptions;
import glide.api.models.commands.stream.StreamRange;
import glide.api.models.commands.stream.StreamReadOptions;
import glide.api.models.commands.stream.StreamTrimOptions;
Expand Down Expand Up @@ -2721,7 +2724,7 @@ public T zinterWithScores(
* Adds an entry to the specified stream stored at <code>key</code>.<br>
* If the <code>key</code> doesn't exist, the stream is created.
*
* @see <a href="https://redis.io/commands/xadd/">redis.io</a> for details.
* @see <a href="https://valkey.io/commands/xadd/">valkey.io</a> for details.
* @param key The key of the stream.
* @param values Field-value pairs to be added to the entry.
* @return Command Response - The id of the added entry.
Expand All @@ -2734,7 +2737,7 @@ public T xadd(@NonNull String key, @NonNull Map<String, String> values) {
* Adds an entry to the specified stream stored at <code>key</code>.<br>
* If the <code>key</code> doesn't exist, the stream is created.
*
* @see <a href="https://redis.io/commands/xadd/">redis.io</a> for details.
* @see <a href="https://valkey.io/commands/xadd/">valkey.io</a> for details.
* @param key The key of the stream.
* @param values Field-value pairs to be added to the entry.
* @param options Stream add options {@link StreamAddOptions}.
Expand All @@ -2755,7 +2758,7 @@ public T xadd(
/**
* Reads entries from the given streams.
*
* @see <a href="https://redis.io/commands/xread/">redis.io</a> for details.
* @see <a href="https://valkey.io/commands/xread/">valkey.io</a> for details.
* @param keysAndIds An array of <code>Pair</code>s of keys and entry ids to read from. A <code>
* pair</code> is composed of a stream's key and the id of the entry after which the stream
* will be read.
Expand All @@ -2769,7 +2772,7 @@ public T xread(@NonNull Map<String, String> keysAndIds) {
/**
* Reads entries from the given streams.
*
* @see <a href="https://redis.io/commands/xread/">redis.io</a> for details.
* @see <a href="https://valkey.io/commands/xread/">valkey.io</a> for details.
* @param keysAndIds An array of <code>Pair</code>s of keys and entry ids to read from. A <code>
* pair</code> is composed of a stream's key and the id of the entry after which the stream
* will be read.
Expand All @@ -2785,7 +2788,7 @@ public T xread(@NonNull Map<String, String> keysAndIds, @NonNull StreamReadOptio
/**
* Trims the stream by evicting older entries.
*
* @see <a href="https://redis.io/commands/xtrim/">redis.io</a> for details.
* @see <a href="https://valkey.io/commands/xtrim/">valkey.io</a> for details.
* @param key The key of the stream.
* @param options Stream trim options {@link StreamTrimOptions}.
* @return Command Response - The number of entries deleted from the stream.
Expand Down Expand Up @@ -2828,6 +2831,7 @@ public T xdel(@NonNull String key, @NonNull String[] ids) {
/**
* Returns stream entries matching a given range of IDs.
*
* @see <a href="https://valkey.io/commands/xrange/">valkey.io</a> for details.
* @param key The key of the stream.
* @param start Starting stream ID bound for range.
* <ul>
Expand Down Expand Up @@ -2856,6 +2860,7 @@ public T xrange(@NonNull String key, @NonNull StreamRange start, @NonNull Stream
/**
* Returns stream entries matching a given range of IDs.
*
* @see <a href="https://valkey.io/commands/xrange/">valkey.io</a> for details.
* @param key The key of the stream.
* @param start Starting stream ID bound for range.
* <ul>
Expand Down Expand Up @@ -2889,6 +2894,7 @@ public T xrange(
* Equivalent to {@link #xrange(String, StreamRange, StreamRange)} but returns the entries in
* reverse order.
*
* @see <a href="https://valkey.io/commands/xrevrange/">valkey.io</a> for details.
* @param key The key of the stream.
* @param end Ending stream ID bound for range.
* <ul>
Expand Down Expand Up @@ -2919,6 +2925,7 @@ public T xrevrange(@NonNull String key, @NonNull StreamRange end, @NonNull Strea
* Equivalent to {@link #xrange(String, StreamRange, StreamRange, long)} but returns the entries
* in reverse order.
*
* @see <a href="https://valkey.io/commands/xrevrange/">valkey.io</a> for details.
* @param key The key of the stream.
* @param start Starting stream ID bound for range.
* <ul>
Expand Down Expand Up @@ -2947,6 +2954,61 @@ public T xrevrange(
return getThis();
}

/**
* Creates a new consumer group uniquely identified by <code>groupname</code> for the stream
* stored at <code>key</code>.
*
* @see <a href="https://valkey.io/commands/xgroup-create/">valkey.io</a> for details.
* @param key The key of the stream.
* @param groupname The newly created consumer group name.
* @param id Stream entry ID that specifies the last delivered entry in the stream from the new
* group’s perspective. The special ID <code>"$"</code> can be used to specify the last entry
* in the stream.
* @return Command Response - <code>OK</code>.
*/
public T xgroupCreate(@NonNull String key, @NonNull String groupname, @NonNull String id) {
protobufTransaction.addCommands(buildCommand(XGroupCreate, buildArgs(key, groupname, id)));
return getThis();
}

/**
* Creates a new consumer group uniquely identified by <code>groupname</code> for the stream
* stored at <code>key</code>.
*
* @see <a href="https://valkey.io/commands/xgroup-create/">valkey.io</a> for details.
* @param key The key of the stream.
* @param groupname The newly created consumer group name.
* @param id Stream entry ID that specifies the last delivered entry in the stream from the new
* group’s perspective. The special ID <code>"$"</code> can be used to specify the last entry
* in the stream.
* @param options The group options {@link StreamGroupOptions}.
* @return Command Response - <code>OK</code>.
*/
public T xgroupCreate(
@NonNull String key,
@NonNull String groupname,
@NonNull String id,
@NonNull StreamGroupOptions options) {
ArgsArray commandArgs =
buildArgs(concatenateArrays(new String[] {key, groupname, id}, options.toArgs()));
protobufTransaction.addCommands(buildCommand(XGroupCreate, commandArgs));
return getThis();
}

/**
* Destroys the consumer group <code>groupname</code> for the stream stored at <code>key</code>.
*
* @see <a href="https://valkey.io/commands/xgroup-destroy/">valkey.io</a> for details.
* @param key The key of the stream.
* @param groupname The newly created consumer group name.
* @return Command Response - <code>true</code> if the consumer group is destroyed. Otherwise,
* <code>false</code>.
*/
public T xgroupDestroy(@NonNull String key, @NonNull String groupname) {
protobufTransaction.addCommands(buildCommand(XGroupDestroy, buildArgs(key, groupname)));
return getThis();
}

/**
* Returns the remaining time to live of <code>key</code> that has a timeout, in milliseconds.
*
Expand Down
Loading

0 comments on commit fe82f1b

Please sign in to comment.