Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Java: Add the XGROUP CREATE and XGROUP DESTROY commands #1557

Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions glide-core/src/client/value_conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -700,9 +700,8 @@ pub(crate) fn expected_type_for_cmd(cmd: &Cmd) -> Option<ExpectedReturnType> {
}),
b"INCRBYFLOAT" | b"HINCRBYFLOAT" | b"ZINCRBY" => Some(ExpectedReturnType::Double),
b"HEXISTS" | b"HSETNX" | b"EXPIRE" | b"EXPIREAT" | b"PEXPIRE" | b"PEXPIREAT"
| b"SISMEMBER" | b"PERSIST" | b"SMOVE" | b"RENAMENX" | b"MOVE" | b"COPY" | b"MSETNX" => {
Some(ExpectedReturnType::Boolean)
}
| b"SISMEMBER" | b"PERSIST" | b"SMOVE" | b"RENAMENX" | b"MOVE" | b"COPY"
| b"XGROUP DESTROY" | b"MSETNX" => Some(ExpectedReturnType::Boolean),
b"SMISMEMBER" => Some(ExpectedReturnType::ArrayOfBools),
b"SMEMBERS" | b"SINTER" | b"SDIFF" => Some(ExpectedReturnType::Set),
b"ZSCORE" | b"GEODIST" => Some(ExpectedReturnType::DoubleOrNull),
Expand Down
26 changes: 26 additions & 0 deletions java/client/src/main/java/glide/api/BaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@
import static redis_request.RedisRequestOuterClass.RequestType.Unlink;
import static redis_request.RedisRequestOuterClass.RequestType.XAdd;
import static redis_request.RedisRequestOuterClass.RequestType.XDel;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreate;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupDestroy;
import static redis_request.RedisRequestOuterClass.RequestType.XLen;
import static redis_request.RedisRequestOuterClass.RequestType.XRange;
import static redis_request.RedisRequestOuterClass.RequestType.XRead;
Expand Down Expand Up @@ -182,6 +184,7 @@
import glide.api.models.commands.geospatial.GeoUnit;
import glide.api.models.commands.geospatial.GeospatialData;
import glide.api.models.commands.stream.StreamAddOptions;
import glide.api.models.commands.stream.StreamGroupOptions;
import glide.api.models.commands.stream.StreamRange;
import glide.api.models.commands.stream.StreamReadOptions;
import glide.api.models.commands.stream.StreamTrimOptions;
Expand Down Expand Up @@ -1362,6 +1365,29 @@ public CompletableFuture<Map<String, String[][]>> xrevrange(
response -> castMapOf2DArray(handleMapResponse(response), String.class));
}

@Override
public CompletableFuture<String> xgroupCreate(
@NonNull String key, @NonNull String groupname, @NonNull String id) {
return commandManager.submitNewCommand(
XGroupCreate, new String[] {key, groupname, id}, this::handleStringResponse);
}

@Override
public CompletableFuture<String> xgroupCreate(
@NonNull String key,
@NonNull String groupname,
@NonNull String id,
@NonNull StreamGroupOptions options) {
String[] arguments = concatenateArrays(new String[] {key, groupname, id}, options.toArgs());
return commandManager.submitNewCommand(XGroupCreate, arguments, this::handleStringResponse);
}

@Override
public CompletableFuture<Boolean> xgroupDestroy(@NonNull String key, @NonNull String groupname) {
return commandManager.submitNewCommand(
XGroupDestroy, new String[] {key, groupname}, this::handleBooleanResponse);
}

@Override
public CompletableFuture<Long> pttl(@NonNull String key) {
return commandManager.submitNewCommand(PTTL, new String[] {key}, this::handleLongResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import glide.api.models.commands.stream.StreamAddOptions;
import glide.api.models.commands.stream.StreamAddOptions.StreamAddOptionsBuilder;
import glide.api.models.commands.stream.StreamGroupOptions;
import glide.api.models.commands.stream.StreamRange;
import glide.api.models.commands.stream.StreamRange.IdBound;
import glide.api.models.commands.stream.StreamRange.InfRangeBound;
Expand All @@ -23,7 +24,7 @@ public interface StreamBaseCommands {
* Adds an entry to the specified stream stored at <code>key</code>.<br>
* If the <code>key</code> doesn't exist, the stream is created.
*
* @see <a href="https://redis.io/commands/xadd/">redis.io</a> for details.
* @see <a href="https://valkey.io/commands/xadd/">valkey.io</a> for details.
* @param key The key of the stream.
* @param values Field-value pairs to be added to the entry.
* @return The id of the added entry.
Expand All @@ -39,7 +40,7 @@ public interface StreamBaseCommands {
* Adds an entry to the specified stream stored at <code>key</code>.<br>
* If the <code>key</code> doesn't exist, the stream is created.
*
* @see <a href="https://redis.io/commands/xadd/">redis.io</a> for details.
* @see <a href="https://valkey.io/commands/xadd/">valkey.io</a> for details.
* @param key The key of the stream.
* @param values Field-value pairs to be added to the entry.
* @param options Stream add options {@link StreamAddOptions}.
Expand All @@ -63,7 +64,7 @@ public interface StreamBaseCommands {
*
* @apiNote When in cluster mode, all keys in <code>keysAndIds</code> must map to the same hash
* slot.
* @see <a href="https://redis.io/commands/xread/">redis.io</a> for details.
* @see <a href="https://valkey.io/commands/xread/">valkey.io</a> for details.
* @param keysAndIds A <code>Map</code> of keys and entry ids to read from. The <code>
* Map</code> is composed of a stream's key and the id of the entry after which the stream
* will be read.
Expand All @@ -89,7 +90,7 @@ public interface StreamBaseCommands {
*
* @apiNote When in cluster mode, all keys in <code>keysAndIds</code> must map to the same hash
* slot.
* @see <a href="https://redis.io/commands/xread/">redis.io</a> for details.
* @see <a href="https://valkey.io/commands/xread/">valkey.io</a> for details.
* @param keysAndIds A <code>Map</code> of keys and entry ids to read from. The <code>
* Map</code> is composed of a stream's key and the id of the entry after which the stream
* will be read.
Expand Down Expand Up @@ -117,7 +118,7 @@ CompletableFuture<Map<String, Map<String, String[][]>>> xread(
/**
* Trims the stream by evicting older entries.
*
* @see <a href="https://redis.io/commands/xtrim/">redis.io</a> for details.
* @see <a href="https://valkey.io/commands/xtrim/">valkey.io</a> for details.
* @param key The key of the stream.
* @param options Stream trim options {@link StreamTrimOptions}.
* @return The number of entries deleted from the stream.
Expand Down Expand Up @@ -169,6 +170,7 @@ CompletableFuture<Map<String, Map<String, String[][]>>> xread(
/**
* Returns stream entries matching a given range of IDs.
*
* @see <a href="https://valkey.io/commands/xrange/">valkey.io</a> for details.
* @param key The key of the stream.
* @param start Starting stream ID bound for range.
* <ul>
Expand Down Expand Up @@ -205,6 +207,7 @@ CompletableFuture<Map<String, Map<String, String[][]>>> xread(
/**
* Returns stream entries matching a given range of IDs.
*
* @see <a href="https://valkey.io/commands/xrange/">valkey.io</a> for details.
* @param key The key of the stream.
* @param start Starting stream ID bound for range.
* <ul>
Expand Down Expand Up @@ -242,6 +245,7 @@ CompletableFuture<Map<String, String[][]>> xrange(
* Equivalent to {@link #xrange(String, StreamRange, StreamRange)} but returns the entries in
* reverse order.
*
* @see <a href="https://valkey.io/commands/xrevrange/">valkey.io</a> for details.
* @param key The key of the stream.
* @param end Ending stream ID bound for range.
* <ul>
Expand Down Expand Up @@ -281,6 +285,7 @@ CompletableFuture<Map<String, String[][]>> xrevrange(
* Equivalent to {@link #xrange(String, StreamRange, StreamRange, long)} but returns the entries
* in reverse order.
*
* @see <a href="https://valkey.io/commands/xrevrange/">valkey.io</a> for details.
* @param key The key of the stream.
* @param end Ending stream ID bound for range.
* <ul>
Expand Down Expand Up @@ -312,4 +317,59 @@ CompletableFuture<Map<String, String[][]>> xrevrange(
*/
CompletableFuture<Map<String, String[][]>> xrevrange(
String key, StreamRange end, StreamRange start, long count);

/**
* Creates a new consumer group uniquely identified by <code>groupname</code> for the stream
* stored at <code>key</code>.
*
* @see <a href="https://valkey.io/commands/xgroup-create/">valkey.io</a> for details.
* @param key The key of the stream.
* @param groupname The newly created consumer group name.
* @param id Stream ID that specifies the last delivered entry in the stream from the new group’s
* perspective. The special ID <code>"$"</code> can be used to specify the last entry in the
* stream.
* @return <code>OK</code>.
* @example
* <pre>{@code
* // Create the consumer group "mygroup", using zero as the starting ID:
* assert client.xgroupCreate("mystream", "mygroup", "0-0").get().equals("OK");
* }</pre>
*/
CompletableFuture<String> xgroupCreate(String key, String groupname, String id);

/**
* Creates a new consumer group uniquely identified by <code>groupname</code> for the stream
* stored at <code>key</code>.
*
* @see <a href="https://valkey.io/commands/xgroup-create/">valkey.io</a> for details.
* @param key The key of the stream.
* @param groupname The newly created consumer group name.
* @param id Stream ID that specifies the last delivered entry in the stream from the new group’s
acarbonetto marked this conversation as resolved.
Show resolved Hide resolved
* perspective. The special ID <code>"$"</code> can be used to specify the last entry in the
* stream.
* @param options The group options {@link StreamGroupOptions}.
* @return <code>OK</code>.
* @example
* <pre>{@code
* // Create the consumer group "mygroup", and the stream if it does not exist, after the last ID
* assert client.xgroupCreate("mystream", "mygroup", "$", new StreamGroupOptions(true)).get().equals("OK");
* }</pre>
*/
CompletableFuture<String> xgroupCreate(
String key, String groupname, String id, StreamGroupOptions options);

/**
* Destroys the consumer group <code>groupname</code> for the stream stored at <code>key</code>.
*
* @see <a href="https://valkey.io/commands/xgroup-destroy/">valkey.io</a> for details.
* @param key The key of the stream.
* @param groupname The newly created consumer group name.
* @return <code>true</code> if the consumer group is destroyed. Otherwise, <code>false</code>.
* @example
* <pre>{@code
* // Destroys the consumer group "mygroup"
* assert client.xgroupDestroy("mystream", "mygroup").get().equals("OK");
* }</pre>
*/
CompletableFuture<Boolean> xgroupDestroy(String key, String groupname);
}
72 changes: 67 additions & 5 deletions java/client/src/main/java/glide/api/models/BaseTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@
import static redis_request.RedisRequestOuterClass.RequestType.Unlink;
import static redis_request.RedisRequestOuterClass.RequestType.XAdd;
import static redis_request.RedisRequestOuterClass.RequestType.XDel;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreate;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupDestroy;
import static redis_request.RedisRequestOuterClass.RequestType.XLen;
import static redis_request.RedisRequestOuterClass.RequestType.XRange;
import static redis_request.RedisRequestOuterClass.RequestType.XRead;
Expand Down Expand Up @@ -218,6 +220,7 @@
import glide.api.models.commands.geospatial.GeospatialData;
import glide.api.models.commands.stream.StreamAddOptions;
import glide.api.models.commands.stream.StreamAddOptions.StreamAddOptionsBuilder;
import glide.api.models.commands.stream.StreamGroupOptions;
import glide.api.models.commands.stream.StreamRange;
import glide.api.models.commands.stream.StreamReadOptions;
import glide.api.models.commands.stream.StreamTrimOptions;
Expand Down Expand Up @@ -2638,7 +2641,7 @@ public T zinterWithScores(
* Adds an entry to the specified stream stored at <code>key</code>.<br>
* If the <code>key</code> doesn't exist, the stream is created.
*
* @see <a href="https://redis.io/commands/xadd/">redis.io</a> for details.
* @see <a href="https://valkey.io/commands/xadd/">valkey.io</a> for details.
* @param key The key of the stream.
* @param values Field-value pairs to be added to the entry.
* @return Command Response - The id of the added entry.
Expand All @@ -2651,7 +2654,7 @@ public T xadd(@NonNull String key, @NonNull Map<String, String> values) {
* Adds an entry to the specified stream stored at <code>key</code>.<br>
* If the <code>key</code> doesn't exist, the stream is created.
*
* @see <a href="https://redis.io/commands/xadd/">redis.io</a> for details.
* @see <a href="https://valkey.io/commands/xadd/">valkey.io</a> for details.
* @param key The key of the stream.
* @param values Field-value pairs to be added to the entry.
* @param options Stream add options {@link StreamAddOptions}.
Expand All @@ -2672,7 +2675,7 @@ public T xadd(
/**
* Reads entries from the given streams.
*
* @see <a href="https://redis.io/commands/xread/">redis.io</a> for details.
* @see <a href="https://valkey.io/commands/xread/">valkey.io</a> for details.
* @param keysAndIds An array of <code>Pair</code>s of keys and entry ids to read from. A <code>
* pair</code> is composed of a stream's key and the id of the entry after which the stream
* will be read.
Expand All @@ -2686,7 +2689,7 @@ public T xread(@NonNull Map<String, String> keysAndIds) {
/**
* Reads entries from the given streams.
*
* @see <a href="https://redis.io/commands/xread/">redis.io</a> for details.
* @see <a href="https://valkey.io/commands/xread/">valkey.io</a> for details.
* @param keysAndIds An array of <code>Pair</code>s of keys and entry ids to read from. A <code>
* pair</code> is composed of a stream's key and the id of the entry after which the stream
* will be read.
Expand All @@ -2702,7 +2705,7 @@ public T xread(@NonNull Map<String, String> keysAndIds, @NonNull StreamReadOptio
/**
* Trims the stream by evicting older entries.
*
* @see <a href="https://redis.io/commands/xtrim/">redis.io</a> for details.
* @see <a href="https://valkey.io/commands/xtrim/">valkey.io</a> for details.
* @param key The key of the stream.
* @param options Stream trim options {@link StreamTrimOptions}.
* @return Command Response - The number of entries deleted from the stream.
Expand Down Expand Up @@ -2745,6 +2748,7 @@ public T xdel(@NonNull String key, @NonNull String[] ids) {
/**
* Returns stream entries matching a given range of IDs.
*
* @see <a href="https://valkey.io/commands/xrange/">valkey.io</a> for details.
* @param key The key of the stream.
* @param start Starting stream ID bound for range.
* <ul>
Expand Down Expand Up @@ -2773,6 +2777,7 @@ public T xrange(@NonNull String key, @NonNull StreamRange start, @NonNull Stream
/**
* Returns stream entries matching a given range of IDs.
*
* @see <a href="https://valkey.io/commands/xrange/">valkey.io</a> for details.
* @param key The key of the stream.
* @param start Starting stream ID bound for range.
* <ul>
Expand Down Expand Up @@ -2806,6 +2811,7 @@ public T xrange(
* Equivalent to {@link #xrange(String, StreamRange, StreamRange)} but returns the entries in
* reverse order.
*
* @see <a href="https://valkey.io/commands/xrevrange/">valkey.io</a> for details.
* @param key The key of the stream.
* @param end Ending stream ID bound for range.
* <ul>
Expand Down Expand Up @@ -2836,6 +2842,7 @@ public T xrevrange(@NonNull String key, @NonNull StreamRange end, @NonNull Strea
* Equivalent to {@link #xrange(String, StreamRange, StreamRange, long)} but returns the entries
* in reverse order.
*
* @see <a href="https://valkey.io/commands/xrevrange/">valkey.io</a> for details.
* @param key The key of the stream.
* @param start Starting stream ID bound for range.
* <ul>
Expand Down Expand Up @@ -2864,6 +2871,61 @@ public T xrevrange(
return getThis();
}

/**
* Creates a new consumer group uniquely identified by <code>groupname</code> for the stream
* stored at <code>key</code>.
*
* @see <a href="https://valkey.io/commands/xgroup-create/">valkey.io</a> for details.
* @param key The key of the stream.
* @param groupname The newly created consumer group name.
* @param id Stream ID that specifies the last delivered entry in the stream from the new group’s
* perspective. The special ID <code>"$"</code> can be used to specify the last entry in the
* stream.
* @return Command Response - <code>OK</code>.
*/
public T xgroupCreate(@NonNull String key, @NonNull String groupname, @NonNull String id) {
protobufTransaction.addCommands(buildCommand(XGroupCreate, buildArgs(key, groupname, id)));
return getThis();
}

/**
* Creates a new consumer group uniquely identified by <code>groupname</code> for the stream
* stored at <code>key</code>.
*
* @see <a href="https://valkey.io/commands/xgroup-create/">valkey.io</a> for details.
* @param key The key of the stream.
* @param groupname The newly created consumer group name.
* @param id Stream ID that specifies the last delivered entry in the stream from the new group’s
* perspective. The special ID <code>"$"</code> can be used to specify the last entry in the
* stream.
* @param options The group options {@link StreamGroupOptions}.
* @return Command Response - <code>OK</code>.
*/
public T xgroupCreate(
@NonNull String key,
@NonNull String groupname,
@NonNull String id,
@NonNull StreamGroupOptions options) {
ArgsArray commandArgs =
buildArgs(concatenateArrays(new String[] {key, groupname, id}, options.toArgs()));
protobufTransaction.addCommands(buildCommand(XGroupCreate, commandArgs));
return getThis();
}

/**
* Destroys the consumer group <code>groupname</code> for the stream stored at <code>key</code>.
*
* @see <a href="https://valkey.io/commands/xgroup-destroy/">valkey.io</a> for details.
* @param key The key of the stream.
* @param groupname The newly created consumer group name.
* @return Command Response - <code>true</code> if the consumer group is destroyed. Otherwise,
* <code>false</code>.
*/
public T xgroupDestroy(@NonNull String key, @NonNull String groupname) {
protobufTransaction.addCommands(buildCommand(XGroupDestroy, buildArgs(key, groupname)));
return getThis();
}

/**
* Returns the remaining time to live of <code>key</code> that has a timeout, in milliseconds.
*
Expand Down
Loading
Loading