Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Java: Add WATCH and UNWATCH command #1539

Merged
merged 10 commits into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions glide-core/src/protobuf/redis_request.proto
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,8 @@ enum RequestType {
LPos = 180;
LCS = 181;
GeoSearch = 182;
Watch = 183;
UnWatch = 184;
}

message Command {
Expand Down
6 changes: 6 additions & 0 deletions glide-core/src/request_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@ pub enum RequestType {
LPos = 180,
LCS = 181,
GeoSearch = 182,
Watch = 183,
UnWatch = 184,
}

fn get_two_word_command(first: &str, second: &str) -> Cmd {
Expand Down Expand Up @@ -383,6 +385,8 @@ impl From<::protobuf::EnumOrUnknown<ProtobufRequestType>> for RequestType {
ProtobufRequestType::LPos => RequestType::LPos,
ProtobufRequestType::LCS => RequestType::LCS,
ProtobufRequestType::GeoSearch => RequestType::GeoSearch,
ProtobufRequestType::Watch => RequestType::Watch,
ProtobufRequestType::UnWatch => RequestType::UnWatch,
}
}
}
Expand Down Expand Up @@ -572,6 +576,8 @@ impl RequestType {
RequestType::LPos => Some(cmd("LPOS")),
RequestType::LCS => Some(cmd("LCS")),
RequestType::GeoSearch => Some(cmd("GEOSEARCH")),
RequestType::Watch => Some(cmd("WATCH")),
RequestType::UnWatch => Some(cmd("UNWATCH")),
}
}
}
16 changes: 15 additions & 1 deletion java/client/src/main/java/glide/api/BaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,9 @@
import static redis_request.RedisRequestOuterClass.RequestType.TTL;
import static redis_request.RedisRequestOuterClass.RequestType.Touch;
import static redis_request.RedisRequestOuterClass.RequestType.Type;
import static redis_request.RedisRequestOuterClass.RequestType.UnWatch;
import static redis_request.RedisRequestOuterClass.RequestType.Unlink;
import static redis_request.RedisRequestOuterClass.RequestType.Watch;
import static redis_request.RedisRequestOuterClass.RequestType.XAdd;
import static redis_request.RedisRequestOuterClass.RequestType.XDel;
import static redis_request.RedisRequestOuterClass.RequestType.XLen;
Expand Down Expand Up @@ -163,6 +165,7 @@
import glide.api.commands.SortedSetBaseCommands;
import glide.api.commands.StreamBaseCommands;
import glide.api.commands.StringBaseCommands;
import glide.api.commands.TransactionsBaseCommands;
import glide.api.models.Script;
import glide.api.models.commands.ExpireOptions;
import glide.api.models.commands.LInsertOptions.InsertPosition;
Expand Down Expand Up @@ -229,7 +232,8 @@ public abstract class BaseClient
StreamBaseCommands,
HyperLogLogBaseCommands,
GeospatialIndicesBaseCommands,
ScriptingAndFunctionsBaseCommands {
ScriptingAndFunctionsBaseCommands,
TransactionsBaseCommands {

/** Redis simple string response with "OK" */
public static final String OK = ConstantResponse.OK.toString();
Expand Down Expand Up @@ -1872,4 +1876,14 @@ public CompletableFuture<Long> lcsLen(@NonNull String key1, @NonNull String key2
String[] arguments = new String[] {key1, key2, LEN_REDIS_API};
return commandManager.submitNewCommand(LCS, arguments, this::handleLongResponse);
}

@Override
public CompletableFuture<String> watch(@NonNull String[] keys) {
return commandManager.submitNewCommand(Watch, keys, this::handleStringResponse);
}

@Override
public CompletableFuture<String> unwatch() {
return commandManager.submitNewCommand(UnWatch, new String[0], this::handleStringResponse);
}
}
11 changes: 10 additions & 1 deletion java/client/src/main/java/glide/api/RedisClusterClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@
import static redis_request.RedisRequestOuterClass.RequestType.Lolwut;
import static redis_request.RedisRequestOuterClass.RequestType.Ping;
import static redis_request.RedisRequestOuterClass.RequestType.Time;
import static redis_request.RedisRequestOuterClass.RequestType.UnWatch;

import glide.api.commands.ConnectionManagementClusterCommands;
import glide.api.commands.GenericClusterCommands;
import glide.api.commands.ScriptingAndFunctionsClusterCommands;
import glide.api.commands.ServerManagementClusterCommands;
import glide.api.commands.TransactionsClusterCommands;
import glide.api.models.ClusterTransaction;
import glide.api.models.ClusterValue;
import glide.api.models.commands.FlushMode;
Expand All @@ -62,7 +64,8 @@ public class RedisClusterClient extends BaseClient
implements ConnectionManagementClusterCommands,
GenericClusterCommands,
ServerManagementClusterCommands,
ScriptingAndFunctionsClusterCommands {
ScriptingAndFunctionsClusterCommands,
TransactionsClusterCommands {

protected RedisClusterClient(ConnectionManager connectionManager, CommandManager commandManager) {
super(connectionManager, commandManager);
Expand Down Expand Up @@ -651,4 +654,10 @@ public CompletableFuture<ClusterValue<Map<String, Map<String, Object>>>> functio
route,
response -> handleFunctionStatsResponse(response, route instanceof SingleNodeRoute));
}

@Override
public CompletableFuture<String> unwatch(@NonNull Route route) {
Copy link
Collaborator

@Yury-Fridlyand Yury-Fridlyand Jun 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TBD do we need this? Does user need this?

return commandManager.submitNewCommand(
UnWatch, new String[0], route, this::handleStringResponse);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/** Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.api.commands;

import java.util.concurrent.CompletableFuture;

/**
* Supports commands for the "Transactions Commands" group for standalone and cluster clients.
*
* @see <a href="https://redis.io/commands/?group=transactions">Transactions Commands</a>
*/
public interface TransactionsBaseCommands {
/**
* Marks the given keys to be watched for conditional execution of a transaction. Transactions
* will only execute commands if the watched keys are not modified before execution of the
* transaction.
*
* @apiNote When in cluster mode, the command may route to multiple nodes when <code>keys</code>
* map to different hash slots.
* @see <a href="https://redis.io/docs/latest/commands/watch/">redis.io</a> for details.
* @param keys The keys to watch.
* @return <code>OK</code>.
* @example
* <pre>{@code
* assert client.watch(new String[] {"sampleKey"}).get().equals("OK");
* transaction.set("sampleKey", "foobar");
* Object[] result = client.exec(transaction).get();
* assert result != null; // Executes successfully and keys are unwatched.
*
* assert client.watch(new String[] {"sampleKey"}).get().equals("OK");
* transaction.set("sampleKey", "foobar");
* assert client.set("sampleKey", "hello world").get().equals("OK");
* Object[] result = client.exec(transaction).get();
* assert result == null; // null is returned when the watched key is modified before transaction execution.
* }</pre>
*/
CompletableFuture<String> watch(String[] keys);

/**
* Flushes all the previously watched keys for a transaction. Executing a transaction will
* automatically flush all previously watched keys.
*
GumpacG marked this conversation as resolved.
Show resolved Hide resolved
* @see <a href="https://redis.io/docs/latest/commands/unwatch/">redis.io</a> for details.
* @return <code>OK</code>.
* @example
* <pre>{@code
* assert client.watch(new String[] {"sampleKey"}).get().equals("OK");
* assert client.unwatch().get().equals("OK"); // Flushes "sampleKey" from watched keys.
* }</pre>
*/
CompletableFuture<String> unwatch();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/** Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.api.commands;

import glide.api.models.configuration.RequestRoutingConfiguration.Route;
import java.util.concurrent.CompletableFuture;

/**
* Supports commands for the "Transactions Commands" group for cluster clients.
*
* @see <a href="https://redis.io/commands/?group=transactions">Transactions Commands</a>
*/
public interface TransactionsClusterCommands {
/**
* Flushes all the previously watched keys for a transaction. Executing a transaction will
* automatically flush all previously watched keys. The command will be routed to all primary
* nodes.
*
* @see <a href="https://redis.io/docs/latest/commands/unwatch/">redis.io</a> for details.
* @return <code>OK</code>.
* @example
* <pre>{@code
* assert client.watch(new String[] {"sampleKey"}).get().equals("OK");
* assert client.unwatch().get().equals("OK"); // Flushes "sampleKey" from watched keys.
* }</pre>
*/
CompletableFuture<String> unwatch();

/**
* Flushes all the previously watched keys for a transaction. Executing a transaction will
* automatically flush all previously watched keys.
*
* @see <a href="https://redis.io/docs/latest/commands/unwatch/">redis.io</a> for details.
* @param route Specifies the routing configuration for the command. The client will route the
* command to the nodes defined by <code>route</code>.
* @return <code>OK</code>.
* @example
* <pre>{@code
* assert client.watch(new String[] {"sampleKey"}).get().equals("OK");
* assert client.unwatch(ALL_PRIMARIES).get().equals("OK"); // Flushes "sampleKey" from watched keys for all primary nodes.
* }</pre>
*/
CompletableFuture<String> unwatch(Route route);
}
45 changes: 45 additions & 0 deletions java/client/src/test/java/glide/api/RedisClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,9 @@
import static redis_request.RedisRequestOuterClass.RequestType.Time;
import static redis_request.RedisRequestOuterClass.RequestType.Touch;
import static redis_request.RedisRequestOuterClass.RequestType.Type;
import static redis_request.RedisRequestOuterClass.RequestType.UnWatch;
import static redis_request.RedisRequestOuterClass.RequestType.Unlink;
import static redis_request.RedisRequestOuterClass.RequestType.Watch;
import static redis_request.RedisRequestOuterClass.RequestType.XAdd;
import static redis_request.RedisRequestOuterClass.RequestType.XDel;
import static redis_request.RedisRequestOuterClass.RequestType.XLen;
Expand Down Expand Up @@ -6256,4 +6258,47 @@ public void lcs_with_len_option() {
assertEquals(testResponse, response);
assertEquals(value, payload);
}

@SneakyThrows
@Test
public void watch_returns_success() {
// setup
String key1 = "testKey1";
String key2 = "testKey2";
String[] arguments = new String[] {key1, key2};
CompletableFuture<String> testResponse = new CompletableFuture<>();
testResponse.complete(OK);

// match on protobuf request
when(commandManager.<String>submitNewCommand(eq(Watch), eq(arguments), any()))
.thenReturn(testResponse);

// exercise
CompletableFuture<String> response = service.watch(arguments);
String payload = response.get();

// verify
assertEquals(testResponse, response);
assertEquals(OK, payload);
}

@SneakyThrows
@Test
public void unwatch_returns_success() {
// setup
CompletableFuture<String> testResponse = new CompletableFuture<>();
testResponse.complete(OK);

// match on protobuf request
when(commandManager.<String>submitNewCommand(eq(UnWatch), eq(new String[0]), any()))
.thenReturn(testResponse);

// exercise
CompletableFuture<String> response = service.unwatch();
String payload = response.get();

// verify
assertEquals(testResponse, response);
assertEquals(OK, payload);
}
}
41 changes: 41 additions & 0 deletions java/client/src/test/java/glide/api/RedisClusterClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import static redis_request.RedisRequestOuterClass.RequestType.Lolwut;
import static redis_request.RedisRequestOuterClass.RequestType.Ping;
import static redis_request.RedisRequestOuterClass.RequestType.Time;
import static redis_request.RedisRequestOuterClass.RequestType.UnWatch;

import glide.api.models.ClusterTransaction;
import glide.api.models.ClusterValue;
Expand Down Expand Up @@ -1453,6 +1454,46 @@ public void functionDelete_with_route_returns_success() {
assertEquals(OK, payload);
}

@SneakyThrows
@Test
public void unwatch_returns_success() {
// setup
CompletableFuture<String> testResponse = new CompletableFuture<>();
testResponse.complete(OK);

// match on protobuf request
when(commandManager.<String>submitNewCommand(eq(UnWatch), eq(new String[0]), any()))
.thenReturn(testResponse);

// exercise
CompletableFuture<String> response = service.unwatch();
String payload = response.get();

// verify
assertEquals(testResponse, response);
assertEquals(OK, payload);
}

@SneakyThrows
@Test
public void unwatch_with_route_returns_success() {
// setup
CompletableFuture<String> testResponse = new CompletableFuture<>();
testResponse.complete(OK);

// match on protobuf request
when(commandManager.<String>submitNewCommand(eq(UnWatch), eq(new String[0]), eq(RANDOM), any()))
.thenReturn(testResponse);

// exercise
CompletableFuture<String> response = service.unwatch(RANDOM);
String payload = response.get();

// verify
assertEquals(testResponse, response);
assertEquals(OK, payload);
}

@SneakyThrows
@Test
public void fcall_without_keys_and_without_args_returns_success() {
Expand Down
Loading
Loading