Skip to content

Commit

Permalink
Apply additional polishing to ClusterCommandExecutor.
Browse files Browse the repository at this point in the history
  • Loading branch information
jxblum committed Oct 12, 2023
1 parent 07cebd4 commit 9bd8fda
Showing 1 changed file with 16 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package org.springframework.data.redis.connection;

import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -227,48 +226,39 @@ public <S, T> MultiNodeResult<T> executeCommandAsyncOnNodes(ClusterCommandCallba

<T> MultiNodeResult<T> collectResults(Map<NodeExecution, Future<NodeResult<T>>> futures) {

NodeExceptionCollector exceptionCollector = new NodeExceptionCollector();
MultiNodeResult<T> result = new MultiNodeResult<>();
Object placeholder = new Object();
Map<Future<NodeResult<T>>, Object> safeguard = new IdentityHashMap<>();
NodeExceptionCollector exceptionCollector = new NodeExceptionCollector();

OUT: while (!futures.isEmpty()) {

for (;;) {
Iterator<Map.Entry<NodeExecution, Future<NodeResult<T>>>> entryIterator = futures.entrySet().iterator();

boolean timeout = false;
for (Map.Entry<NodeExecution, Future<NodeResult<T>>> entry : futures.entrySet()) {
while (entryIterator.hasNext()) {

Map.Entry<NodeExecution, Future<NodeResult<T>>> entry = entryIterator.next();
NodeExecution nodeExecution = entry.getKey();
Future<NodeResult<T>> futureNodeResult = entry.getValue();

try {
NodeResult<T> nodeResult = futureNodeResult.get(10L, TimeUnit.MICROSECONDS);

if (!safeguard.containsKey(futureNodeResult)) {

NodeResult<T> nodeResult = futureNodeResult.get(10L, TimeUnit.MICROSECONDS);

if (nodeExecution.isPositional()) {
result.add(nodeExecution.getPositionalKey(), nodeResult);
} else {
result.add(nodeResult);
}

safeguard.put(futureNodeResult, placeholder);
if (nodeExecution.isPositional()) {
result.add(nodeExecution.getPositionalKey(), nodeResult);
} else {
result.add(nodeResult);
}

entryIterator.remove();
} catch (ExecutionException exception) {
safeguard.put(futureNodeResult, placeholder);
entryIterator.remove();
exceptionCollector.addException(nodeExecution, exception.getCause());
} catch (TimeoutException ignore) {
timeout = true;
} catch (InterruptedException exception) {
Thread.currentThread().interrupt();
exceptionCollector.addException(nodeExecution, exception);
break;
break OUT;
}
}

if (!timeout) {
break;
}
}

if (exceptionCollector.hasExceptions()) {
Expand Down Expand Up @@ -300,7 +290,7 @@ public <S, T> MultiNodeResult<T> executeMultiKeyCommand(MultiKeyClusterCommandCa

Map<NodeExecution, Future<NodeResult<T>>> futures = new LinkedHashMap<>();

for (Entry<RedisClusterNode, PositionalKeys> entry : nodeKeyMap.entrySet()) {
for (Map.Entry<RedisClusterNode, PositionalKeys> entry : nodeKeyMap.entrySet()) {

if (entry.getKey().isMaster()) {
for (PositionalKey key : entry.getValue()) {
Expand Down

0 comments on commit 9bd8fda

Please sign in to comment.