Skip to content

Commit

Permalink
Java: Add WATCH and UNWATCH command (valkey-io#1539)
Browse files Browse the repository at this point in the history
* Java: Add `WATCH` and `UNWATCH` command

* Addressed PR comments

* Added unwatch with route and improved tests

* Fixed test based on submodule changes

* Addressed PR comments

* Added example for watch returning null

* Added crossSlot test

* Addressed comments

* Commented out a test due to a bug

* Address PR comments
  • Loading branch information
GumpacG authored and cyip10 committed Jun 24, 2024
1 parent f017e0a commit b7b9f3b
Show file tree
Hide file tree
Showing 11 changed files with 381 additions and 26 deletions.
2 changes: 2 additions & 0 deletions glide-core/src/protobuf/redis_request.proto
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,8 @@ enum RequestType {
LPos = 180;
LCS = 181;
GeoSearch = 182;
Watch = 183;
UnWatch = 184;
}

message Command {
Expand Down
6 changes: 6 additions & 0 deletions glide-core/src/request_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@ pub enum RequestType {
LPos = 180,
LCS = 181,
GeoSearch = 182,
Watch = 183,
UnWatch = 184,
}

fn get_two_word_command(first: &str, second: &str) -> Cmd {
Expand Down Expand Up @@ -383,6 +385,8 @@ impl From<::protobuf::EnumOrUnknown<ProtobufRequestType>> for RequestType {
ProtobufRequestType::LPos => RequestType::LPos,
ProtobufRequestType::LCS => RequestType::LCS,
ProtobufRequestType::GeoSearch => RequestType::GeoSearch,
ProtobufRequestType::Watch => RequestType::Watch,
ProtobufRequestType::UnWatch => RequestType::UnWatch,
}
}
}
Expand Down Expand Up @@ -572,6 +576,8 @@ impl RequestType {
RequestType::LPos => Some(cmd("LPOS")),
RequestType::LCS => Some(cmd("LCS")),
RequestType::GeoSearch => Some(cmd("GEOSEARCH")),
RequestType::Watch => Some(cmd("WATCH")),
RequestType::UnWatch => Some(cmd("UNWATCH")),
}
}
}
16 changes: 15 additions & 1 deletion java/client/src/main/java/glide/api/BaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,9 @@
import static redis_request.RedisRequestOuterClass.RequestType.TTL;
import static redis_request.RedisRequestOuterClass.RequestType.Touch;
import static redis_request.RedisRequestOuterClass.RequestType.Type;
import static redis_request.RedisRequestOuterClass.RequestType.UnWatch;
import static redis_request.RedisRequestOuterClass.RequestType.Unlink;
import static redis_request.RedisRequestOuterClass.RequestType.Watch;
import static redis_request.RedisRequestOuterClass.RequestType.XAdd;
import static redis_request.RedisRequestOuterClass.RequestType.XDel;
import static redis_request.RedisRequestOuterClass.RequestType.XLen;
Expand Down Expand Up @@ -163,6 +165,7 @@
import glide.api.commands.SortedSetBaseCommands;
import glide.api.commands.StreamBaseCommands;
import glide.api.commands.StringBaseCommands;
import glide.api.commands.TransactionsBaseCommands;
import glide.api.models.Script;
import glide.api.models.commands.ExpireOptions;
import glide.api.models.commands.LInsertOptions.InsertPosition;
Expand Down Expand Up @@ -229,7 +232,8 @@ public abstract class BaseClient
StreamBaseCommands,
HyperLogLogBaseCommands,
GeospatialIndicesBaseCommands,
ScriptingAndFunctionsBaseCommands {
ScriptingAndFunctionsBaseCommands,
TransactionsBaseCommands {

/** Redis simple string response with "OK" */
public static final String OK = ConstantResponse.OK.toString();
Expand Down Expand Up @@ -1872,4 +1876,14 @@ public CompletableFuture<Long> lcsLen(@NonNull String key1, @NonNull String key2
String[] arguments = new String[] {key1, key2, LEN_REDIS_API};
return commandManager.submitNewCommand(LCS, arguments, this::handleLongResponse);
}

@Override
public CompletableFuture<String> watch(@NonNull String[] keys) {
return commandManager.submitNewCommand(Watch, keys, this::handleStringResponse);
}

@Override
public CompletableFuture<String> unwatch() {
return commandManager.submitNewCommand(UnWatch, new String[0], this::handleStringResponse);
}
}
11 changes: 10 additions & 1 deletion java/client/src/main/java/glide/api/RedisClusterClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@
import static redis_request.RedisRequestOuterClass.RequestType.Lolwut;
import static redis_request.RedisRequestOuterClass.RequestType.Ping;
import static redis_request.RedisRequestOuterClass.RequestType.Time;
import static redis_request.RedisRequestOuterClass.RequestType.UnWatch;

import glide.api.commands.ConnectionManagementClusterCommands;
import glide.api.commands.GenericClusterCommands;
import glide.api.commands.ScriptingAndFunctionsClusterCommands;
import glide.api.commands.ServerManagementClusterCommands;
import glide.api.commands.TransactionsClusterCommands;
import glide.api.models.ClusterTransaction;
import glide.api.models.ClusterValue;
import glide.api.models.commands.FlushMode;
Expand All @@ -62,7 +64,8 @@ public class RedisClusterClient extends BaseClient
implements ConnectionManagementClusterCommands,
GenericClusterCommands,
ServerManagementClusterCommands,
ScriptingAndFunctionsClusterCommands {
ScriptingAndFunctionsClusterCommands,
TransactionsClusterCommands {

protected RedisClusterClient(ConnectionManager connectionManager, CommandManager commandManager) {
super(connectionManager, commandManager);
Expand Down Expand Up @@ -651,4 +654,10 @@ public CompletableFuture<ClusterValue<Map<String, Map<String, Object>>>> functio
route,
response -> handleFunctionStatsResponse(response, route instanceof SingleNodeRoute));
}

@Override
public CompletableFuture<String> unwatch(@NonNull Route route) {
return commandManager.submitNewCommand(
UnWatch, new String[0], route, this::handleStringResponse);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/** Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.api.commands;

import java.util.concurrent.CompletableFuture;

/**
* Supports commands for the "Transactions Commands" group for standalone and cluster clients.
*
* @see <a href="https://redis.io/commands/?group=transactions">Transactions Commands</a>
*/
public interface TransactionsBaseCommands {
/**
* Marks the given keys to be watched for conditional execution of a transaction. Transactions
* will only execute commands if the watched keys are not modified before execution of the
* transaction.
*
* @apiNote When in cluster mode, the command may route to multiple nodes when <code>keys</code>
* map to different hash slots.
* @see <a href="https://redis.io/docs/latest/commands/watch/">redis.io</a> for details.
* @param keys The keys to watch.
* @return <code>OK</code>.
* @example
* <pre>{@code
* assert client.watch(new String[] {"sampleKey"}).get().equals("OK");
* transaction.set("sampleKey", "foobar");
* Object[] result = client.exec(transaction).get();
* assert result != null; // Executes successfully and keys are unwatched.
*
* assert client.watch(new String[] {"sampleKey"}).get().equals("OK");
* transaction.set("sampleKey", "foobar");
* assert client.set("sampleKey", "hello world").get().equals("OK");
* Object[] result = client.exec(transaction).get();
* assert result == null; // null is returned when the watched key is modified before transaction execution.
* }</pre>
*/
CompletableFuture<String> watch(String[] keys);

/**
* Flushes all the previously watched keys for a transaction. Executing a transaction will
* automatically flush all previously watched keys.
*
* @see <a href="https://redis.io/docs/latest/commands/unwatch/">redis.io</a> for details.
* @return <code>OK</code>.
* @example
* <pre>{@code
* assert client.watch(new String[] {"sampleKey"}).get().equals("OK");
* assert client.unwatch().get().equals("OK"); // Flushes "sampleKey" from watched keys.
* }</pre>
*/
CompletableFuture<String> unwatch();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/** Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.api.commands;

import glide.api.models.configuration.RequestRoutingConfiguration.Route;
import java.util.concurrent.CompletableFuture;

/**
* Supports commands for the "Transactions Commands" group for cluster clients.
*
* @see <a href="https://redis.io/commands/?group=transactions">Transactions Commands</a>
*/
public interface TransactionsClusterCommands {
/**
* Flushes all the previously watched keys for a transaction. Executing a transaction will
* automatically flush all previously watched keys. The command will be routed to all primary
* nodes.
*
* @see <a href="https://redis.io/docs/latest/commands/unwatch/">redis.io</a> for details.
* @return <code>OK</code>.
* @example
* <pre>{@code
* assert client.watch(new String[] {"sampleKey"}).get().equals("OK");
* assert client.unwatch().get().equals("OK"); // Flushes "sampleKey" from watched keys.
* }</pre>
*/
CompletableFuture<String> unwatch();

/**
* Flushes all the previously watched keys for a transaction. Executing a transaction will
* automatically flush all previously watched keys.
*
* @see <a href="https://redis.io/docs/latest/commands/unwatch/">redis.io</a> for details.
* @param route Specifies the routing configuration for the command. The client will route the
* command to the nodes defined by <code>route</code>.
* @return <code>OK</code>.
* @example
* <pre>{@code
* assert client.watch(new String[] {"sampleKey"}).get().equals("OK");
* assert client.unwatch(ALL_PRIMARIES).get().equals("OK"); // Flushes "sampleKey" from watched keys for all primary nodes.
* }</pre>
*/
CompletableFuture<String> unwatch(Route route);
}
45 changes: 45 additions & 0 deletions java/client/src/test/java/glide/api/RedisClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,9 @@
import static redis_request.RedisRequestOuterClass.RequestType.Time;
import static redis_request.RedisRequestOuterClass.RequestType.Touch;
import static redis_request.RedisRequestOuterClass.RequestType.Type;
import static redis_request.RedisRequestOuterClass.RequestType.UnWatch;
import static redis_request.RedisRequestOuterClass.RequestType.Unlink;
import static redis_request.RedisRequestOuterClass.RequestType.Watch;
import static redis_request.RedisRequestOuterClass.RequestType.XAdd;
import static redis_request.RedisRequestOuterClass.RequestType.XDel;
import static redis_request.RedisRequestOuterClass.RequestType.XLen;
Expand Down Expand Up @@ -6256,4 +6258,47 @@ public void lcs_with_len_option() {
assertEquals(testResponse, response);
assertEquals(value, payload);
}

@SneakyThrows
@Test
public void watch_returns_success() {
// setup
String key1 = "testKey1";
String key2 = "testKey2";
String[] arguments = new String[] {key1, key2};
CompletableFuture<String> testResponse = new CompletableFuture<>();
testResponse.complete(OK);

// match on protobuf request
when(commandManager.<String>submitNewCommand(eq(Watch), eq(arguments), any()))
.thenReturn(testResponse);

// exercise
CompletableFuture<String> response = service.watch(arguments);
String payload = response.get();

// verify
assertEquals(testResponse, response);
assertEquals(OK, payload);
}

@SneakyThrows
@Test
public void unwatch_returns_success() {
// setup
CompletableFuture<String> testResponse = new CompletableFuture<>();
testResponse.complete(OK);

// match on protobuf request
when(commandManager.<String>submitNewCommand(eq(UnWatch), eq(new String[0]), any()))
.thenReturn(testResponse);

// exercise
CompletableFuture<String> response = service.unwatch();
String payload = response.get();

// verify
assertEquals(testResponse, response);
assertEquals(OK, payload);
}
}
41 changes: 41 additions & 0 deletions java/client/src/test/java/glide/api/RedisClusterClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import static redis_request.RedisRequestOuterClass.RequestType.Lolwut;
import static redis_request.RedisRequestOuterClass.RequestType.Ping;
import static redis_request.RedisRequestOuterClass.RequestType.Time;
import static redis_request.RedisRequestOuterClass.RequestType.UnWatch;

import glide.api.models.ClusterTransaction;
import glide.api.models.ClusterValue;
Expand Down Expand Up @@ -1453,6 +1454,46 @@ public void functionDelete_with_route_returns_success() {
assertEquals(OK, payload);
}

@SneakyThrows
@Test
public void unwatch_returns_success() {
// setup
CompletableFuture<String> testResponse = new CompletableFuture<>();
testResponse.complete(OK);

// match on protobuf request
when(commandManager.<String>submitNewCommand(eq(UnWatch), eq(new String[0]), any()))
.thenReturn(testResponse);

// exercise
CompletableFuture<String> response = service.unwatch();
String payload = response.get();

// verify
assertEquals(testResponse, response);
assertEquals(OK, payload);
}

@SneakyThrows
@Test
public void unwatch_with_route_returns_success() {
// setup
CompletableFuture<String> testResponse = new CompletableFuture<>();
testResponse.complete(OK);

// match on protobuf request
when(commandManager.<String>submitNewCommand(eq(UnWatch), eq(new String[0]), eq(RANDOM), any()))
.thenReturn(testResponse);

// exercise
CompletableFuture<String> response = service.unwatch(RANDOM);
String payload = response.get();

// verify
assertEquals(testResponse, response);
assertEquals(OK, payload);
}

@SneakyThrows
@Test
public void fcall_without_keys_and_without_args_returns_success() {
Expand Down
Loading

0 comments on commit b7b9f3b

Please sign in to comment.