Skip to content

Commit

Permalink
Adapt to API changes in the Jedis 5.0 driver.
Browse files Browse the repository at this point in the history
  • Loading branch information
jxblum committed Sep 15, 2023
1 parent 3db8c6d commit cfa00d7
Show file tree
Hide file tree
Showing 11 changed files with 336 additions and 242 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@
import org.springframework.data.redis.core.ScanCursor;
import org.springframework.data.redis.core.ScanIteration;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.data.util.Streamable;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;

/**
* Cluster {@link RedisHashCommands} implementation for Jedis.
*
* @author Christoph Strobl
* @author Mark Paluch
* @author John Blum
* @since 2.0
*/
class JedisClusterHashCommands implements RedisHashCommands {
Expand Down Expand Up @@ -160,10 +162,10 @@ public Entry<byte[], byte[]> hRandFieldWithValues(byte[] key) {
Assert.notNull(key, "Key must not be null");

try {
Map<byte[], byte[]> map = connection.getCluster().hrandfieldWithValues(key, 1);
return map.isEmpty() ? null : map.entrySet().iterator().next();
} catch (Exception ex) {
throw convertJedisAccessException(ex);
List<Entry<byte[], byte[]>> mapEntryList = connection.getCluster().hrandfieldWithValues(key, 1);
return mapEntryList.isEmpty() ? null : mapEntryList.get(0);
} catch (Exception cause) {
throw convertJedisAccessException(cause);
}
}

Expand All @@ -185,10 +187,9 @@ public List<byte[]> hRandField(byte[] key, long count) {
public List<Entry<byte[], byte[]>> hRandFieldWithValues(byte[] key, long count) {

try {
Map<byte[], byte[]> map = connection.getCluster().hrandfieldWithValues(key, count);
return Streamable.of(() -> map.entrySet().iterator()).toList();
} catch (Exception ex) {
throw convertJedisAccessException(ex);
return connection.getCluster().hrandfieldWithValues(key, count);
} catch (Exception cause) {
throw convertJedisAccessException(cause);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,11 @@
*/
package org.springframework.data.redis.connection.jedis;

import redis.clients.jedis.Jedis;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
Expand All @@ -38,9 +37,14 @@
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

import redis.clients.jedis.Jedis;

/**
* {@link RedisClusterServerCommands} implementation for Jedis.
*
* @author Mark Paluch
* @author Dennis Neufeld
* @author John Blum
* @since 2.0
*/
class JedisClusterServerCommands implements RedisClusterServerCommands {
Expand Down Expand Up @@ -82,7 +86,8 @@ public Long lastSave() {
return null;
}

Collections.sort(result, Collections.reverseOrder());
result.sort(Collections.reverseOrder());

return result.get(0);
}

Expand Down Expand Up @@ -221,20 +226,20 @@ public Properties info(RedisClusterNode node, String section) {
public void shutdown() {
connection.getClusterCommandExecutor().executeCommandOnAllNodes((JedisClusterCommandCallback<String>) jedis -> {
jedis.shutdown();
return null;
return "success";
});
}

@Override
public void shutdown(RedisClusterNode node) {
executeCommandOnSingleNode(jedis -> {
jedis.shutdown();
return null;
return "success";
}, node);
}

@Override
public void shutdown(ShutdownOption option) {
public void shutdown(@Nullable ShutdownOption option) {

if (option == null) {
shutdown();
Expand All @@ -249,32 +254,37 @@ public Properties getConfig(String pattern) {

Assert.notNull(pattern, "Pattern must not be null");

List<NodeResult<List<String>>> mapResult = connection.getClusterCommandExecutor()
.executeCommandOnAllNodes((JedisClusterCommandCallback<List<String>>) client -> client.configGet(pattern))
JedisClusterCommandCallback<Map<String, String>> command = jedis -> jedis.configGet(pattern);

List<NodeResult<Map<String, String>>> nodeResults = connection.getClusterCommandExecutor()
.executeCommandOnAllNodes(command)
.getResults();

List<String> result = new ArrayList<>();
for (NodeResult<List<String>> entry : mapResult) {
Properties nodesConfiguration = new Properties();

for (NodeResult<Map<String, String>> nodeResult : nodeResults) {

String prefix = nodeResult.getNode().asString();

String prefix = entry.getNode().asString();
int i = 0;
for (String value : entry.getValue()) {
result.add((i++ % 2 == 0 ? (prefix + ".") : "") + value);
for (Entry<String, String> entry : nodeResult.getValue().entrySet()) {
String newKey = prefix.concat(".").concat(entry.getKey());
String value = entry.getValue();
nodesConfiguration.setProperty(newKey, value);
}
}

return Converters.toProperties(result);
return nodesConfiguration;
}

@Override
public Properties getConfig(RedisClusterNode node, String pattern) {

Assert.notNull(pattern, "Pattern must not be null");

JedisClusterCommandCallback<Properties> command = client -> Converters.toProperties(client.configGet(pattern));

return connection.getClusterCommandExecutor()
.executeCommandOnSingleNode(
(JedisClusterCommandCallback<Properties>) client -> Converters.toProperties(client.configGet(pattern)),
node)
.executeCommandOnSingleNode(command, node)
.getValue();
}

Expand Down Expand Up @@ -322,19 +332,19 @@ public void rewriteConfig(RedisClusterNode node) {
@Override
public Long time(TimeUnit timeUnit) {

return convertListOfStringToTime(
connection.getClusterCommandExecutor()
.executeCommandOnArbitraryNode((JedisClusterCommandCallback<List<String>>) Jedis::time).getValue(),
timeUnit);
JedisClusterCommandCallback<List<String>> command = Jedis::time;

return convertListOfStringToTime(connection.getClusterCommandExecutor()
.executeCommandOnArbitraryNode(command).getValue(), timeUnit);
}

@Override
public Long time(RedisClusterNode node, TimeUnit timeUnit) {

return convertListOfStringToTime(
connection.getClusterCommandExecutor()
.executeCommandOnSingleNode((JedisClusterCommandCallback<List<String>>) Jedis::time, node).getValue(),
timeUnit);
JedisClusterCommandCallback<List<String>> command = Jedis::time;

return convertListOfStringToTime(connection.getClusterCommandExecutor()
.executeCommandOnSingleNode(command, node).getValue(), timeUnit);
}

@Override
Expand All @@ -343,8 +353,9 @@ public void killClient(String host, int port) {
Assert.hasText(host, "Host for 'CLIENT KILL' must not be 'null' or 'empty'");
String hostAndPort = String.format("%s:%s", host, port);

connection.getClusterCommandExecutor()
.executeCommandOnAllNodes((JedisClusterCommandCallback<String>) client -> client.clientKill(hostAndPort));
JedisClusterCommandCallback<String> command = jedis -> jedis.clientKill(hostAndPort);

connection.getClusterCommandExecutor().executeCommandOnAllNodes(command);
}

@Override
Expand All @@ -360,21 +371,26 @@ public String getClientName() {
@Override
public List<RedisClientInfo> getClientList() {

JedisClusterCommandCallback<String> command = Jedis::clientList;

Collection<String> map = connection.getClusterCommandExecutor()
.executeCommandOnAllNodes((JedisClusterCommandCallback<String>) Jedis::clientList).resultsAsList();
.executeCommandOnAllNodes(command).resultsAsList();

ArrayList<RedisClientInfo> result = new ArrayList<>();

for (String infos : map) {
result.addAll(JedisConverters.toListOfRedisClientInformation(infos));
}

return result;
}

@Override
public List<RedisClientInfo> getClientList(RedisClusterNode node) {

return JedisConverters
.toListOfRedisClientInformation(executeCommandOnSingleNode(Jedis::clientList, node).getValue());
JedisClusterCommandCallback<String> command = Jedis::clientList;

return JedisConverters.toListOfRedisClientInformation(executeCommandOnSingleNode(command, node).getValue());
}

@Override
Expand Down Expand Up @@ -403,8 +419,10 @@ public void migrate(byte[] key, RedisNode target, int dbIndex, @Nullable Migrate

RedisClusterNode node = connection.getTopologyProvider().getTopology().lookup(target.getHost(), target.getPort());

executeCommandOnSingleNode(client -> client.migrate(target.getHost(), target.getPort(), key, dbIndex, timeoutToUse),
node);
JedisClusterCommandCallback<String> command = jedis ->
jedis.migrate(target.getHost(), target.getPort(), key, dbIndex, timeoutToUse);

executeCommandOnSingleNode(command, node);
}

private Long convertListOfStringToTime(List<String> serverTimeInformation, TimeUnit timeUnit) {
Expand All @@ -416,12 +434,11 @@ private Long convertListOfStringToTime(List<String> serverTimeInformation, TimeU
return Converters.toTimeMillis(serverTimeInformation.get(0), serverTimeInformation.get(1), timeUnit);
}

private <T> NodeResult<T> executeCommandOnSingleNode(JedisClusterCommandCallback<T> cmd, RedisClusterNode node) {
return connection.getClusterCommandExecutor().executeCommandOnSingleNode(cmd, node);
private <T> NodeResult<T> executeCommandOnSingleNode(JedisClusterCommandCallback<T> command, RedisClusterNode node) {
return connection.getClusterCommandExecutor().executeCommandOnSingleNode(command, node);
}

private <T> MultiNodeResult<T> executeCommandOnAllNodes(JedisClusterCommandCallback<T> cmd) {
return connection.getClusterCommandExecutor().executeCommandOnAllNodes(cmd);
private <T> MultiNodeResult<T> executeCommandOnAllNodes(JedisClusterCommandCallback<T> command) {
return connection.getClusterCommandExecutor().executeCommandOnAllNodes(command);
}

}
Loading

0 comments on commit cfa00d7

Please sign in to comment.