Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bump redis-rs + Route Function Stats to all nodes #2117

Merged
merged 9 commits into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,14 @@
* Node: Added PUBSUB * commands ([#2090](https://github.com/valkey-io/valkey-glide/pull/2090))
* Python: Added PUBSUB * commands ([#2043](https://github.com/valkey-io/valkey-glide/pull/2043))
* Node: Added XGROUP CREATE & XGROUP DESTROY commands ([#2084](https://github.com/valkey-io/valkey-glide/pull/2084))
* Node: Added BZPOPMAX & BZPOPMIN command ([#2077]((https://github.com/valkey-io/valkey-glide/pull/2077))
* Node: Added BZPOPMAX & BZPOPMIN command ([#2077](https://github.com/valkey-io/valkey-glide/pull/2077))
* Node: Added XGROUP CREATECONSUMER & XGROUP DELCONSUMER commands ([#2088](https://github.com/valkey-io/valkey-glide/pull/2088))
* Node: Added GETEX command ([#2107]((https://github.com/valkey-io/valkey-glide/pull/2107))
* Node: Added ZINTER and ZUNION commands ([#2146](https://github.com/aws/glide-for-redis/pull/2146))

#### Breaking Changes
* Node: (Refactor) Convert classes to types ([#2005](https://github.com/valkey-io/valkey-glide/pull/2005))
* Core: Change FUNCTION STATS command to return multi node response for standalone mode ([#2117](https://github.com/valkey-io/valkey-glide/pull/2117))

#### Fixes
* Java: Add overloads for XADD to allow duplicate entry keys ([#1970](https://github.com/valkey-io/valkey-glide/pull/1970))
Expand Down
2 changes: 1 addition & 1 deletion glide-core/src/client/reconnecting_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ impl ReconnectingConnection {
create_connection(backend, connection_retry_strategy, push_sender).await
}

fn node_address(&self) -> String {
pub(crate) fn node_address(&self) -> String {
shohamazon marked this conversation as resolved.
Show resolved Hide resolved
self.inner
.backend
.connection_info
Expand Down
17 changes: 16 additions & 1 deletion glide-core/src/client/standalone_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,22 @@ impl StandaloneClient {
Some(ResponsePolicy::CombineMaps) => future::try_join_all(requests)
.await
.and_then(cluster_routing::combine_map_results),
Some(ResponsePolicy::Special) | None => {
Some(ResponsePolicy::Special) => {
// Await all futures and collect results
let results = future::try_join_all(requests).await?;
// Create key-value pairs where the key is the node address and the value is the corresponding result
let node_result_pairs = self
.inner
.nodes
.iter()
.zip(results)
.map(|(node, result)| (Value::BulkString(node.node_address().into()), result))
.collect();
shohamazon marked this conversation as resolved.
Show resolved Hide resolved

Ok(Value::Map(node_result_pairs))
}
shohamazon marked this conversation as resolved.
Show resolved Hide resolved

None => {
// This is our assumption - if there's no coherent way to aggregate the responses, we just collect them in an array, and pass it to the user.
// TODO - once Value::Error is merged, we can use join_all and report separate errors and also pass successes.
future::try_join_all(requests).await.map(Value::Array)
Expand Down
35 changes: 33 additions & 2 deletions java/client/src/main/java/glide/api/BaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@
import glide.api.commands.StreamBaseCommands;
import glide.api.commands.StringBaseCommands;
import glide.api.commands.TransactionsBaseCommands;
import glide.api.models.ClusterValue;
import glide.api.models.GlideString;
import glide.api.models.PubSubMessage;
import glide.api.models.Script;
Expand Down Expand Up @@ -696,7 +697,7 @@ protected Map<GlideString, Object>[] handleFunctionListResponseBinary(Object[] r
return data;
}

/** Process a <code>FUNCTION STATS</code> standalone response. */
/** Process a <code>FUNCTION STATS</code> response from one node. */
protected Map<String, Map<String, Object>> handleFunctionStatsResponse(
Map<String, Map<String, Object>> response) {
Map<String, Object> runningScriptInfo = response.get("running_script");
Expand All @@ -707,7 +708,7 @@ protected Map<String, Map<String, Object>> handleFunctionStatsResponse(
return response;
}

/** Process a <code>FUNCTION STATS</code> standalone response. */
/** Process a <code>FUNCTION STATS</code> response from one node. */
protected Map<GlideString, Map<GlideString, Object>> handleFunctionStatsBinaryResponse(
Map<GlideString, Map<GlideString, Object>> response) {
Map<GlideString, Object> runningScriptInfo = response.get(gs("running_script"));
Expand All @@ -718,6 +719,36 @@ protected Map<GlideString, Map<GlideString, Object>> handleFunctionStatsBinaryRe
return response;
}

/** Process a <code>FUNCTION STATS</code> cluster response. */
protected ClusterValue<Map<String, Map<String, Object>>> handleFunctionStatsResponse(
Response response, boolean isSingleValue) {
if (isSingleValue) {
return ClusterValue.ofSingleValue(handleFunctionStatsResponse(handleMapResponse(response)));
} else {
Map<String, Map<String, Map<String, Object>>> data = handleMapResponse(response);
for (var nodeInfo : data.entrySet()) {
nodeInfo.setValue(handleFunctionStatsResponse(nodeInfo.getValue()));
}
return ClusterValue.ofMultiValue(data);
}
}

/** Process a <code>FUNCTION STATS</code> cluster response. */
protected ClusterValue<Map<GlideString, Map<GlideString, Object>>>
handleFunctionStatsBinaryResponse(Response response, boolean isSingleValue) {
if (isSingleValue) {
return ClusterValue.ofSingleValue(
handleFunctionStatsBinaryResponse(handleBinaryStringMapResponse(response)));
} else {
Map<GlideString, Map<GlideString, Map<GlideString, Object>>> data =
handleBinaryStringMapResponse(response);
for (var nodeInfo : data.entrySet()) {
nodeInfo.setValue(handleFunctionStatsBinaryResponse(nodeInfo.getValue()));
}
return ClusterValue.ofMultiValueBinary(data);
}
}

/** Process a <code>LCS key1 key2 IDX</code> response */
protected Map<String, Object> handleLcsIdxResponse(Map<String, Object> response)
throws GlideException {
Expand Down
9 changes: 5 additions & 4 deletions java/client/src/main/java/glide/api/GlideClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -444,19 +444,20 @@ public CompletableFuture<String> functionKill() {
}

@Override
public CompletableFuture<Map<String, Map<String, Object>>> functionStats() {
public CompletableFuture<Map<String, Map<String, Map<String, Object>>>> functionStats() {
return commandManager.submitNewCommand(
FunctionStats,
new String[0],
response -> handleFunctionStatsResponse(handleMapResponse(response)));
response -> handleFunctionStatsResponse(response, false).getMultiValue());
}

@Override
public CompletableFuture<Map<GlideString, Map<GlideString, Object>>> functionStatsBinary() {
public CompletableFuture<Map<String, Map<GlideString, Map<GlideString, Object>>>>
functionStatsBinary() {
return commandManager.submitNewCommand(
FunctionStats,
new GlideString[0],
response -> handleFunctionStatsBinaryResponse(handleBinaryStringMapResponse(response)));
response -> handleFunctionStatsBinaryResponse(response, false).getMultiValue());
}

@Override
Expand Down
30 changes: 0 additions & 30 deletions java/client/src/main/java/glide/api/GlideClusterClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -946,36 +946,6 @@ public CompletableFuture<String> functionKill(@NonNull Route route) {
FunctionKill, new String[0], route, this::handleStringResponse);
}

/** Process a <code>FUNCTION STATS</code> cluster response. */
protected ClusterValue<Map<String, Map<String, Object>>> handleFunctionStatsResponse(
Response response, boolean isSingleValue) {
if (isSingleValue) {
return ClusterValue.ofSingleValue(handleFunctionStatsResponse(handleMapResponse(response)));
} else {
Map<String, Map<String, Map<String, Object>>> data = handleMapResponse(response);
for (var nodeInfo : data.entrySet()) {
nodeInfo.setValue(handleFunctionStatsResponse(nodeInfo.getValue()));
}
return ClusterValue.ofMultiValue(data);
}
}

/** Process a <code>FUNCTION STATS</code> cluster response. */
protected ClusterValue<Map<GlideString, Map<GlideString, Object>>>
handleFunctionStatsBinaryResponse(Response response, boolean isSingleValue) {
if (isSingleValue) {
return ClusterValue.ofSingleValue(
handleFunctionStatsBinaryResponse(handleBinaryStringMapResponse(response)));
} else {
Map<GlideString, Map<GlideString, Map<GlideString, Object>>> data =
handleBinaryStringMapResponse(response);
for (var nodeInfo : data.entrySet()) {
nodeInfo.setValue(handleFunctionStatsBinaryResponse(nodeInfo.getValue()));
}
return ClusterValue.ofMultiValueBinary(data);
}
}

@Override
public CompletableFuture<ClusterValue<Map<String, Map<String, Object>>>> functionStats() {
return commandManager.submitNewCommand(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -865,7 +865,7 @@ CompletableFuture<ClusterValue<Object>> fcallReadOnly(
/**
* Kills a function that is currently executing.<br>
* <code>FUNCTION KILL</code> terminates read-only functions only.<br>
* The command will be routed to all primary nodes.
* The command will be routed to all nodes.
*
* @since Valkey 7.0 and above.
* @see <a href="https://valkey.io/commands/function-kill/">valkey.io</a> for details.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,8 @@ CompletableFuture<Map<GlideString, Object>[]> functionListBinary(

/**
* Kills a function that is currently executing.<br>
* <code>FUNCTION KILL</code> terminates read-only functions only.
* <code>FUNCTION KILL</code> terminates read-only functions only. <code>FUNCTION KILL</code> runs
* on all nodes of the server, including primary and replicas.
*
* @since Valkey 7.0 and above.
* @see <a href="https://valkey.io/commands/function-kill/">valkey.io</a> for details.
Expand All @@ -343,63 +344,73 @@ CompletableFuture<Map<GlideString, Object>[]> functionListBinary(

/**
* Returns information about the function that's currently running and information about the
* available execution engines.
* available execution engines.<br>
* <code>FUNCTION STATS</code> runs on all nodes of the server, including primary and replicas.
* The response includes a mapping from node address to the command response for that node.
*
* @since Valkey 7.0 and above.
* @see <a href="https://valkey.io/commands/function-stats/">valkey.io</a> for details.
* @return A <code>Map</code> with two keys:
* @return A <code>Map</code> from node address to the command response for that node, where the
* command contains a <code>Map</code> with two keys:
* <ul>
* <li><code>running_script</code> with information about the running script.
* <li><code>engines</code> with information about available engines and their stats.
* </ul>
* See example for more details.
* @example
* <pre>{@code
* Map<String, Map<String, Object>> response = client.functionStats().get();
* Map<String, Object> runningScriptInfo = response.get("running_script");
* if (runningScriptInfo != null) {
* String[] commandLine = (String[]) runningScriptInfo.get("command");
* System.out.printf("Server is currently running function '%s' with command line '%s', which has been running for %d ms%n",
* runningScriptInfo.get("name"), String.join(" ", commandLine), (long)runningScriptInfo.get("duration_ms"));
* }
* Map<String, Object> enginesInfo = response.get("engines");
* for (String engineName : enginesInfo.keySet()) {
* Map<String, Long> engine = (Map<String, Long>) enginesInfo.get(engineName);
* System.out.printf("Server supports engine '%s', which has %d libraries and %d functions in total%n",
* engineName, engine.get("libraries_count"), engine.get("functions_count"));
* Map<String, Map<String, Map<String, Object>>> response = client.functionStats().get();
* for (String node : response.keySet()) {
* Map<String, Object> runningScriptInfo = response.get(node).get("running_script");
* if (runningScriptInfo != null) {
* String[] commandLine = (String[]) runningScriptInfo.get("command");
* System.out.printf("Node '%s' is currently running function '%s' with command line '%s', which has been running for %d ms%n",
* node, runningScriptInfo.get("name"), String.join(" ", commandLine), (long)runningScriptInfo.get("duration_ms"));
* }
* Map<String, Object> enginesInfo = response.get(node).get("engines");
* for (String engineName : enginesInfo.keySet()) {
* Map<String, Long> engine = (Map<String, Long>) enginesInfo.get(engineName);
* System.out.printf("Node '%s' supports engine '%s', which has %d libraries and %d functions in total%n",
* node, engineName, engine.get("libraries_count"), engine.get("functions_count"));
* }
* }
* }</pre>
*/
CompletableFuture<Map<String, Map<String, Object>>> functionStats();
CompletableFuture<Map<String, Map<String, Map<String, Object>>>> functionStats();

/**
* Returns information about the function that's currently running and information about the
* available execution engines.
* available execution engines.<br>
* <code>FUNCTION STATS</code> runs on all nodes of the server, including primary and replicas.
* The response includes a mapping from node address to the command response for that node.
*
* @since Valkey 7.0 and above.
* @see <a href="https://valkey.io/commands/function-stats/">valkey.io</a> for details.
* @return A <code>Map</code> with two keys:
* @return A <code>Map</code> from node address to the command response for that node, where the
* command contains a <code>Map</code> with two keys:
* <ul>
* <li><code>running_script</code> with information about the running script.
* <li><code>engines</code> with information about available engines and their stats.
* </ul>
* See example for more details.
* @example
* <pre>{@code
* Map<GlideString, Map<GlideString, Object>> response = client.functionStats().get();
* Map<GlideString, Object> runningScriptInfo = response.get(gs("running_script"));
* if (runningScriptInfo != null) {
* GlideString[] commandLine = (GlideString[]) runningScriptInfo.get(gs("command"));
* System.out.printf("Server is currently running function '%s' with command line '%s', which has been running for %d ms%n",
* runningScriptInfo.get(gs("name")), String.join(" ", Arrays.toString(commandLine)), (long)runningScriptInfo.get(gs("duration_ms")));
* }
* Map<GlideString, Object> enginesInfo = response.get(gs("engines"));
* for (GlideString engineName : enginesInfo.keySet()) {
* Map<GlideString, Long> engine = (Map<GlideString, Long>) enginesInfo.get(gs(engineName));
* System.out.printf("Server supports engine '%s', which has %d libraries and %d functions in total%n",
* engineName, engine.get(gs("libraries_count")), engine.get(gs("functions_count")));
* Map<GlideString, Map<GlideString, Map<GlideString, Object>>> response = client.functionStats().get();
* for (String node : response.keySet()) {
* Map<GlideString, Object> runningScriptInfo = response.get(gs(node)).get(gs("running_script"));
* if (runningScriptInfo != null) {
* GlideString[] commandLine = (GlideString[]) runningScriptInfo.get(gs("command"));
* System.out.printf("Node '%s' is currently running function '%s' with command line '%s', which has been running for %d ms%n",
* node, runningScriptInfo.get(gs("name")), String.join(" ", Arrays.toString(commandLine)), (long)runningScriptInfo.get(gs("duration_ms")));
* }
* Map<GlideString, Object> enginesInfo = response.get(gs(node)).get(gs("engines"));
* for (GlideString engineName : enginesInfo.keySet()) {
* Map<GlideString, Long> engine = (Map<GlideString, Long>) enginesInfo.get(gs(engineName));
* System.out.printf("Node '%s' supports engine '%s', which has %d libraries and %d functions in total%n",
* node, engineName, engine.get(gs("libraries_count")), engine.get(gs("functions_count")));
* }
* }
* }</pre>
*/
CompletableFuture<Map<GlideString, Map<GlideString, Object>>> functionStatsBinary();
CompletableFuture<Map<String, Map<GlideString, Map<GlideString, Object>>>> functionStatsBinary();
}
24 changes: 14 additions & 10 deletions java/client/src/test/java/glide/api/GlideClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -11485,18 +11485,21 @@ public void functionKill_returns_success() {
public void functionStats_returns_success() {
// setup
String[] args = new String[0];
Map<String, Map<String, Object>> value = Map.of("1", Map.of("2", 2));
CompletableFuture<Map<String, Map<String, Object>>> testResponse = new CompletableFuture<>();
Map<String, Map<String, Map<String, Object>>> value =
Map.of("::1", Map.of("1", Map.of("2", 2)));
CompletableFuture<Map<String, Map<String, Map<String, Object>>>> testResponse =
new CompletableFuture<>();
testResponse.complete(value);

// match on protobuf request
when(commandManager.<Map<String, Map<String, Object>>>submitNewCommand(
when(commandManager.<Map<String, Map<String, Map<String, Object>>>>submitNewCommand(
eq(FunctionStats), eq(args), any()))
.thenReturn(testResponse);

// exercise
CompletableFuture<Map<String, Map<String, Object>>> response = service.functionStats();
Map<String, Map<String, Object>> payload = response.get();
CompletableFuture<Map<String, Map<String, Map<String, Object>>>> response =
service.functionStats();
Map<String, Map<String, Map<String, Object>>> payload = response.get();

// verify
assertEquals(testResponse, response);
Expand All @@ -11508,20 +11511,21 @@ public void functionStats_returns_success() {
public void functionStatsBinary_returns_success() {
// setup
GlideString[] args = new GlideString[0];
Map<GlideString, Map<GlideString, Object>> value = Map.of(gs("1"), Map.of(gs("2"), 2));
CompletableFuture<Map<GlideString, Map<GlideString, Object>>> testResponse =
Map<String, Map<GlideString, Map<GlideString, Object>>> value =
Map.of("::1", Map.of(gs("1"), Map.of(gs("2"), 2)));
CompletableFuture<Map<String, Map<GlideString, Map<GlideString, Object>>>> testResponse =
new CompletableFuture<>();
testResponse.complete(value);

// match on protobuf request
when(commandManager.<Map<GlideString, Map<GlideString, Object>>>submitNewCommand(
when(commandManager.<Map<String, Map<GlideString, Map<GlideString, Object>>>>submitNewCommand(
eq(FunctionStats), eq(args), any()))
.thenReturn(testResponse);

// exercise
CompletableFuture<Map<GlideString, Map<GlideString, Object>>> response =
CompletableFuture<Map<String, Map<GlideString, Map<GlideString, Object>>>> response =
service.functionStatsBinary();
Map<GlideString, Map<GlideString, Object>> payload = response.get();
Map<String, Map<GlideString, Map<GlideString, Object>>> payload = response.get();

// verify
assertEquals(testResponse, response);
Expand Down
Loading
Loading