Skip to content

Commit

Permalink
Polishing.
Browse files Browse the repository at this point in the history
Simplify tests. Reuse existing interfaces from Spring. Remove inappropriate nullability annotations and introduce annotations where required.

Replace Future mocking with easier to maintain and to read future method overrides. Remove superfluous code and replace with infrastructure classes provided by Spring Framework.

Consistently name callbacks. Make exception collector concept explicit. Reformat code.

See #2518
Original pull request: #2719
  • Loading branch information
mp911de committed Oct 11, 2023
1 parent 781fda6 commit 8be6691
Show file tree
Hide file tree
Showing 3 changed files with 242 additions and 542 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,28 +15,13 @@
*/
package org.springframework.data.redis.connection;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -111,47 +96,46 @@ public ClusterCommandExecutor(ClusterTopologyProvider topologyProvider, ClusterN
/**
* Run {@link ClusterCommandCallback} on a random node.
*
* @param clusterCommand must not be {@literal null}.
* @param commandCallback must not be {@literal null}.
* @return never {@literal null}.
*/
public <T> NodeResult<T> executeCommandOnArbitraryNode(ClusterCommandCallback<?, T> clusterCommand) {
public <T> NodeResult<T> executeCommandOnArbitraryNode(ClusterCommandCallback<?, T> commandCallback) {

Assert.notNull(clusterCommand, "ClusterCommandCallback must not be null");
Assert.notNull(commandCallback, "ClusterCommandCallback must not be null");

List<RedisClusterNode> nodes = new ArrayList<>(getClusterTopology().getActiveNodes());

RedisClusterNode arbitraryNode = nodes.get(new Random().nextInt(nodes.size()));

return executeCommandOnSingleNode(clusterCommand, arbitraryNode);
return executeCommandOnSingleNode(commandCallback, arbitraryNode);
}

/**
* Run {@link ClusterCommandCallback} on given {@link RedisClusterNode}.
*
* @param clusterCommand must not be {@literal null}.
* @param commandCallback must not be {@literal null}.
* @param node must not be {@literal null}.
* @return the {@link NodeResult} from the single, targeted {@link RedisClusterNode}.
* @throws IllegalArgumentException in case no resource can be acquired for given node.
*/
public <S, T> NodeResult<T> executeCommandOnSingleNode(ClusterCommandCallback<S, T> clusterCommand,
public <S, T> NodeResult<T> executeCommandOnSingleNode(ClusterCommandCallback<S, T> commandCallback,
RedisClusterNode node) {

return executeCommandOnSingleNode(clusterCommand, node, 0);
return executeCommandOnSingleNode(commandCallback, node, 0);
}

private <S, T> NodeResult<T> executeCommandOnSingleNode(ClusterCommandCallback<S, T> clusterCommand,
private <S, T> NodeResult<T> executeCommandOnSingleNode(ClusterCommandCallback<S, T> commandCallback,
RedisClusterNode node, int redirectCount) {

Assert.notNull(clusterCommand, "ClusterCommandCallback must not be null");
Assert.notNull(commandCallback, "ClusterCommandCallback must not be null");
Assert.notNull(node, "RedisClusterNode must not be null");

if (redirectCount > this.maxRedirects) {

String message = String.format("Cannot follow Cluster Redirects over more than %s legs;"
+ " Please consider increasing the number of redirects to follow; Current value is: %s.",
redirectCount, this.maxRedirects);

throw new TooManyClusterRedirectionsException(message);
throw new TooManyClusterRedirectionsException(String.format(
"Cannot follow Cluster Redirects over more than %s legs; "
+ "Consider increasing the number of redirects to follow; Current value is: %s.",
redirectCount, this.maxRedirects));
}

RedisClusterNode nodeToUse = lookupNode(node);
Expand All @@ -161,15 +145,14 @@ private <S, T> NodeResult<T> executeCommandOnSingleNode(ClusterCommandCallback<S
Assert.notNull(client, "Could not acquire resource for node; Is your cluster info up to date");

try {
return new NodeResult<>(node, clusterCommand.doInCluster(client));
return new NodeResult<>(node, commandCallback.doInCluster(client));
} catch (RuntimeException cause) {

RuntimeException translatedException = convertToDataAccessException(cause);

if (translatedException instanceof ClusterRedirectException clusterRedirectException) {
return executeCommandOnSingleNode(clusterCommand, topologyProvider.getTopology()
.lookup(clusterRedirectException.getTargetHost(), clusterRedirectException.getTargetPort()),
redirectCount + 1);
return executeCommandOnSingleNode(commandCallback, topologyProvider.getTopology().lookup(
clusterRedirectException.getTargetHost(), clusterRedirectException.getTargetPort()), redirectCount + 1);
} else {
throw translatedException != null ? translatedException : cause;
}
Expand All @@ -182,7 +165,8 @@ private <S, T> NodeResult<T> executeCommandOnSingleNode(ClusterCommandCallback<S
* Lookup {@link RedisClusterNode node} from the {@link ClusterTopology topology}.
*
* @param node {@link RedisClusterNode node} to lookup; must not be {@literal null}.
* @return the resolved {@link RedisClusterNode node} from the {@link ClusterTopology topology}; never {@literal null}.
* @return the resolved {@link RedisClusterNode node} from the {@link ClusterTopology topology}; never
* {@literal null}.
* @throws IllegalArgumentException in case the node could not be resolved to a topology-known node
*/
private RedisClusterNode lookupNode(RedisClusterNode node) {
Expand All @@ -197,27 +181,27 @@ private RedisClusterNode lookupNode(RedisClusterNode node) {
/**
* Run {@link ClusterCommandCallback} on all reachable master nodes.
*
* @param clusterCommand must not be {@literal null}.
* @param commandCallback must not be {@literal null}.
* @return never {@literal null}.
* @throws ClusterCommandExecutionFailureException if a failure occurs while executing the given
* {@link ClusterCommandCallback command} on any given {@link RedisClusterNode node}.
*/
public <S, T> MultiNodeResult<T> executeCommandOnAllNodes(ClusterCommandCallback<S, T> clusterCommand) {
return executeCommandAsyncOnNodes(clusterCommand, getClusterTopology().getActiveMasterNodes());
public <S, T> MultiNodeResult<T> executeCommandOnAllNodes(ClusterCommandCallback<S, T> commandCallback) {
return executeCommandAsyncOnNodes(commandCallback, getClusterTopology().getActiveMasterNodes());
}

/**
* @param clusterCommand must not be {@literal null}.
* @param commandCallback must not be {@literal null}.
* @param nodes must not be {@literal null}.
* @return never {@literal null}.
* @throws ClusterCommandExecutionFailureException if a failure occurs while executing the given
* {@link ClusterCommandCallback command} on any given {@link RedisClusterNode node}.
* @throws IllegalArgumentException in case the node could not be resolved to a topology-known node
*/
public <S, T> MultiNodeResult<T> executeCommandAsyncOnNodes(ClusterCommandCallback<S, T> clusterCommand,
public <S, T> MultiNodeResult<T> executeCommandAsyncOnNodes(ClusterCommandCallback<S, T> commandCallback,
Iterable<RedisClusterNode> nodes) {

Assert.notNull(clusterCommand, "Callback must not be null");
Assert.notNull(commandCallback, "Callback must not be null");
Assert.notNull(nodes, "Nodes must not be null");

ClusterTopology topology = this.topologyProvider.getTopology();
Expand All @@ -234,7 +218,7 @@ public <S, T> MultiNodeResult<T> executeCommandAsyncOnNodes(ClusterCommandCallba
Map<NodeExecution, Future<NodeResult<T>>> futures = new LinkedHashMap<>();

for (RedisClusterNode node : resolvedRedisClusterNodes) {
Callable<NodeResult<T>> nodeCommandExecution = () -> executeCommandOnSingleNode(clusterCommand, node);
Callable<NodeResult<T>> nodeCommandExecution = () -> executeCommandOnSingleNode(commandCallback, node);
futures.put(new NodeExecution(node), executor.submit(nodeCommandExecution));
}

Expand All @@ -243,26 +227,22 @@ public <S, T> MultiNodeResult<T> executeCommandAsyncOnNodes(ClusterCommandCallba

<T> MultiNodeResult<T> collectResults(Map<NodeExecution, Future<NodeResult<T>>> futures) {

Map<RedisClusterNode, Throwable> exceptions = new HashMap<>();
NodeExceptionCollector exceptionCollector = new NodeExceptionCollector();
MultiNodeResult<T> result = new MultiNodeResult<>();
Set<String> safeguard = new HashSet<>();
Object placeholder = new Object();
Map<Future<NodeResult<T>>, Object> safeguard = new IdentityHashMap<>();

BiConsumer<NodeExecution, Throwable> exceptionHandler = getExceptionHandlerFunction(exceptions);

boolean done = false;

while (!done) {

done = true;
for (;;) {

boolean timeout = false;
for (Map.Entry<NodeExecution, Future<NodeResult<T>>> entry : futures.entrySet()) {

NodeExecution nodeExecution = entry.getKey();
Future<NodeResult<T>> futureNodeResult = entry.getValue();
String futureId = ObjectUtils.getIdentityHexString(futureNodeResult);

try {
if (!safeguard.contains(futureId)) {

if (!safeguard.containsKey(futureNodeResult)) {

NodeResult<T> nodeResult = futureNodeResult.get(10L, TimeUnit.MICROSECONDS);

Expand All @@ -272,39 +252,32 @@ <T> MultiNodeResult<T> collectResults(Map<NodeExecution, Future<NodeResult<T>>>
result.add(nodeResult);
}

safeguard.add(futureId);
safeguard.put(futureNodeResult, placeholder);
}
} catch (ExecutionException exception) {
safeguard.add(futureId);
exceptionHandler.accept(nodeExecution, exception.getCause());
safeguard.put(futureNodeResult, placeholder);
exceptionCollector.addException(nodeExecution, exception.getCause());
} catch (TimeoutException ignore) {
done = false;
} catch (InterruptedException cause) {
timeout = true;
} catch (InterruptedException exception) {
Thread.currentThread().interrupt();
exceptionHandler.accept(nodeExecution, cause);
exceptionCollector.addException(nodeExecution, exception);
break;
}
}

if (!timeout) {
break;
}
}

if (!exceptions.isEmpty()) {
throw new ClusterCommandExecutionFailureException(new ArrayList<>(exceptions.values()));
if (exceptionCollector.hasExceptions()) {
throw new ClusterCommandExecutionFailureException(exceptionCollector.getExceptions());
}

return result;
}

private BiConsumer<NodeExecution, Throwable> getExceptionHandlerFunction(Map<RedisClusterNode, Throwable> exceptions) {

return (nodeExecution, throwable) -> {

DataAccessException dataAccessException = convertToDataAccessException((Exception) throwable);
Throwable resolvedException = dataAccessException != null ? dataAccessException : throwable;

exceptions.putIfAbsent(nodeExecution.getNode(), resolvedException);
};
}

/**
* Run {@link MultiKeyClusterCommandCallback} with on a curated set of nodes serving one or more keys.
*
Expand All @@ -331,8 +304,8 @@ public <S, T> MultiNodeResult<T> executeMultiKeyCommand(MultiKeyClusterCommandCa

if (entry.getKey().isMaster()) {
for (PositionalKey key : entry.getValue()) {
futures.put(new NodeExecution(entry.getKey(), key), this.executor.submit(() ->
executeMultiKeyCommandOnSingleNode(commandCallback, entry.getKey(), key.getBytes())));
futures.put(new NodeExecution(entry.getKey(), key), this.executor
.submit(() -> executeMultiKeyCommandOnSingleNode(commandCallback, entry.getKey(), key.getBytes())));
}
}
}
Expand Down Expand Up @@ -458,8 +431,8 @@ boolean isPositional() {
}

/**
* {@link NodeResult} encapsulates the actual {@link T value} returned by a {@link ClusterCommandCallback}
* on a given {@link RedisClusterNode}.
* {@link NodeResult} encapsulates the actual {@link T value} returned by a {@link ClusterCommandCallback} on a given
* {@link RedisClusterNode}.
*
* @param <T> {@link Class Type} of the {@link Object value} returned in the result.
* @author Christoph Strobl
Expand All @@ -468,9 +441,9 @@ boolean isPositional() {
*/
public static class NodeResult<T> {

private RedisClusterNode node;
private ByteArrayWrapper key;
private @Nullable T value;
private final RedisClusterNode node;
private final ByteArrayWrapper key;
private final @Nullable T value;

/**
* Create a new {@link NodeResult}.
Expand Down Expand Up @@ -551,9 +524,8 @@ public boolean equals(@Nullable Object obj) {
return false;
}

return ObjectUtils.nullSafeEquals(this.getNode(), that.getNode())
&& Objects.equals(this.key, that.key)
&& Objects.equals(this.getValue(), that.getValue());
return ObjectUtils.nullSafeEquals(this.getNode(), that.getNode()) && Objects.equals(this.key, that.key)
&& Objects.equals(this.getValue(), that.getValue());
}

@Override
Expand Down Expand Up @@ -757,8 +729,7 @@ public boolean equals(@Nullable Object obj) {
if (!(obj instanceof PositionalKey that))
return false;

return this.getPosition() == that.getPosition()
&& ObjectUtils.nullSafeEquals(this.getKey(), that.getKey());
return this.getPosition() == that.getPosition() && ObjectUtils.nullSafeEquals(this.getKey(), that.getKey());
}

@Override
Expand Down Expand Up @@ -836,4 +807,34 @@ public Iterator<PositionalKey> iterator() {
return this.keys.iterator();
}
}

/**
* Collector for exceptions. Applies translation of exceptions if possible.
*/
private class NodeExceptionCollector {

private final Map<RedisClusterNode, Throwable> exceptions = new HashMap<>();

/**
* @return {@code true} if the collector contains at least one exception.
*/
public boolean hasExceptions() {
return !exceptions.isEmpty();
}

public void addException(NodeExecution execution, Throwable throwable) {

Throwable translated = throwable instanceof Exception e ? convertToDataAccessException(e) : throwable;
Throwable resolvedException = translated != null ? translated : throwable;

exceptions.putIfAbsent(execution.getNode(), resolvedException);
}

/**
* @return the collected exceptions.
*/
public List<? extends Throwable> getExceptions() {
return new ArrayList<>(exceptions.values());
}
}
}
Loading

0 comments on commit 8be6691

Please sign in to comment.