From 4a4797654681e03386421d50ceb5c1e47357bb32 Mon Sep 17 00:00:00 2001 From: John Blum Date: Thu, 14 Sep 2023 17:36:31 -0700 Subject: [PATCH] Adapt to API changes in the Jedis 5.0 driver. Closes #2612 --- .../jedis/JedisClusterHashCommands.java | 19 +-- .../jedis/JedisClusterServerCommands.java | 93 +++++++----- .../jedis/JedisClusterStreamCommands.java | 134 ++++++++++-------- .../jedis/JedisClusterZSetCommands.java | 125 ++++++++++------ .../connection/jedis/JedisConnection.java | 7 +- .../connection/jedis/JedisConverters.java | 50 ++++--- .../connection/jedis/JedisHashCommands.java | 17 ++- .../redis/connection/jedis/JedisInvoker.java | 31 ++-- .../connection/jedis/JedisZSetCommands.java | 49 ++++--- .../redis/connection/zset/DefaultTuple.java | 58 ++++---- .../data/redis/connection/zset/Tuple.java | 21 +-- .../data/redis/core/types/Expiration.java | 1 - .../jedis/JedisConnectionUnitTests.java | 31 ++-- .../jedis/JedisConvertersUnitTests.java | 127 +++++++++++++---- 14 files changed, 476 insertions(+), 287 deletions(-) diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterHashCommands.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterHashCommands.java index 06bbf2a569..f53c6cbbb6 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterHashCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterHashCommands.java @@ -30,13 +30,15 @@ import org.springframework.data.redis.core.ScanCursor; import org.springframework.data.redis.core.ScanIteration; import org.springframework.data.redis.core.ScanOptions; -import org.springframework.data.util.Streamable; import org.springframework.lang.Nullable; import org.springframework.util.Assert; /** + * Cluster {@link RedisHashCommands} implementation for Jedis. + * * @author Christoph Strobl * @author Mark Paluch + * @author John Blum * @since 2.0 */ class JedisClusterHashCommands implements RedisHashCommands { @@ -160,10 +162,10 @@ public Entry hRandFieldWithValues(byte[] key) { Assert.notNull(key, "Key must not be null"); try { - Map map = connection.getCluster().hrandfieldWithValues(key, 1); - return map.isEmpty() ? null : map.entrySet().iterator().next(); - } catch (Exception ex) { - throw convertJedisAccessException(ex); + List> mapEntryList = connection.getCluster().hrandfieldWithValues(key, 1); + return mapEntryList.isEmpty() ? null : mapEntryList.get(0); + } catch (Exception cause) { + throw convertJedisAccessException(cause); } } @@ -185,10 +187,9 @@ public List hRandField(byte[] key, long count) { public List> hRandFieldWithValues(byte[] key, long count) { try { - Map map = connection.getCluster().hrandfieldWithValues(key, count); - return Streamable.of(() -> map.entrySet().iterator()).toList(); - } catch (Exception ex) { - throw convertJedisAccessException(ex); + return connection.getCluster().hrandfieldWithValues(key, count); + } catch (Exception cause) { + throw convertJedisAccessException(cause); } } diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterServerCommands.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterServerCommands.java index 43b83bf874..0acc10af59 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterServerCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterServerCommands.java @@ -15,12 +15,11 @@ */ package org.springframework.data.redis.connection.jedis; -import redis.clients.jedis.Jedis; - import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Map.Entry; import java.util.Properties; import java.util.concurrent.TimeUnit; @@ -38,9 +37,14 @@ import org.springframework.util.Assert; import org.springframework.util.CollectionUtils; +import redis.clients.jedis.Jedis; + /** + * {@link RedisClusterServerCommands} implementation for Jedis. + * * @author Mark Paluch * @author Dennis Neufeld + * @author John Blum * @since 2.0 */ class JedisClusterServerCommands implements RedisClusterServerCommands { @@ -82,7 +86,8 @@ public Long lastSave() { return null; } - Collections.sort(result, Collections.reverseOrder()); + result.sort(Collections.reverseOrder()); + return result.get(0); } @@ -221,7 +226,7 @@ public Properties info(RedisClusterNode node, String section) { public void shutdown() { connection.getClusterCommandExecutor().executeCommandOnAllNodes((JedisClusterCommandCallback) jedis -> { jedis.shutdown(); - return null; + return "success"; }); } @@ -229,12 +234,12 @@ public void shutdown() { public void shutdown(RedisClusterNode node) { executeCommandOnSingleNode(jedis -> { jedis.shutdown(); - return null; + return "success"; }, node); } @Override - public void shutdown(ShutdownOption option) { + public void shutdown(@Nullable ShutdownOption option) { if (option == null) { shutdown(); @@ -249,21 +254,26 @@ public Properties getConfig(String pattern) { Assert.notNull(pattern, "Pattern must not be null"); - List>> mapResult = connection.getClusterCommandExecutor() - .executeCommandOnAllNodes((JedisClusterCommandCallback>) client -> client.configGet(pattern)) + JedisClusterCommandCallback> command = jedis -> jedis.configGet(pattern); + + List>> nodeResults = connection.getClusterCommandExecutor() + .executeCommandOnAllNodes(command) .getResults(); - List result = new ArrayList<>(); - for (NodeResult> entry : mapResult) { + Properties nodesConfiguration = new Properties(); + + for (NodeResult> nodeResult : nodeResults) { + + String prefix = nodeResult.getNode().asString(); - String prefix = entry.getNode().asString(); - int i = 0; - for (String value : entry.getValue()) { - result.add((i++ % 2 == 0 ? (prefix + ".") : "") + value); + for (Entry entry : nodeResult.getValue().entrySet()) { + String newKey = prefix.concat(".").concat(entry.getKey()); + String value = entry.getValue(); + nodesConfiguration.setProperty(newKey, value); } } - return Converters.toProperties(result); + return nodesConfiguration; } @Override @@ -271,10 +281,10 @@ public Properties getConfig(RedisClusterNode node, String pattern) { Assert.notNull(pattern, "Pattern must not be null"); + JedisClusterCommandCallback command = client -> Converters.toProperties(client.configGet(pattern)); + return connection.getClusterCommandExecutor() - .executeCommandOnSingleNode( - (JedisClusterCommandCallback) client -> Converters.toProperties(client.configGet(pattern)), - node) + .executeCommandOnSingleNode(command, node) .getValue(); } @@ -322,19 +332,19 @@ public void rewriteConfig(RedisClusterNode node) { @Override public Long time(TimeUnit timeUnit) { - return convertListOfStringToTime( - connection.getClusterCommandExecutor() - .executeCommandOnArbitraryNode((JedisClusterCommandCallback>) Jedis::time).getValue(), - timeUnit); + JedisClusterCommandCallback> command = Jedis::time; + + return convertListOfStringToTime(connection.getClusterCommandExecutor() + .executeCommandOnArbitraryNode(command).getValue(), timeUnit); } @Override public Long time(RedisClusterNode node, TimeUnit timeUnit) { - return convertListOfStringToTime( - connection.getClusterCommandExecutor() - .executeCommandOnSingleNode((JedisClusterCommandCallback>) Jedis::time, node).getValue(), - timeUnit); + JedisClusterCommandCallback> command = Jedis::time; + + return convertListOfStringToTime(connection.getClusterCommandExecutor() + .executeCommandOnSingleNode(command, node).getValue(), timeUnit); } @Override @@ -343,8 +353,9 @@ public void killClient(String host, int port) { Assert.hasText(host, "Host for 'CLIENT KILL' must not be 'null' or 'empty'"); String hostAndPort = String.format("%s:%s", host, port); - connection.getClusterCommandExecutor() - .executeCommandOnAllNodes((JedisClusterCommandCallback) client -> client.clientKill(hostAndPort)); + JedisClusterCommandCallback command = jedis -> jedis.clientKill(hostAndPort); + + connection.getClusterCommandExecutor().executeCommandOnAllNodes(command); } @Override @@ -360,21 +371,26 @@ public String getClientName() { @Override public List getClientList() { + JedisClusterCommandCallback command = Jedis::clientList; + Collection map = connection.getClusterCommandExecutor() - .executeCommandOnAllNodes((JedisClusterCommandCallback) Jedis::clientList).resultsAsList(); + .executeCommandOnAllNodes(command).resultsAsList(); ArrayList result = new ArrayList<>(); + for (String infos : map) { result.addAll(JedisConverters.toListOfRedisClientInformation(infos)); } + return result; } @Override public List getClientList(RedisClusterNode node) { - return JedisConverters - .toListOfRedisClientInformation(executeCommandOnSingleNode(Jedis::clientList, node).getValue()); + JedisClusterCommandCallback command = Jedis::clientList; + + return JedisConverters.toListOfRedisClientInformation(executeCommandOnSingleNode(command, node).getValue()); } @Override @@ -403,8 +419,10 @@ public void migrate(byte[] key, RedisNode target, int dbIndex, @Nullable Migrate RedisClusterNode node = connection.getTopologyProvider().getTopology().lookup(target.getHost(), target.getPort()); - executeCommandOnSingleNode(client -> client.migrate(target.getHost(), target.getPort(), key, dbIndex, timeoutToUse), - node); + JedisClusterCommandCallback command = jedis -> + jedis.migrate(target.getHost(), target.getPort(), key, dbIndex, timeoutToUse); + + executeCommandOnSingleNode(command, node); } private Long convertListOfStringToTime(List serverTimeInformation, TimeUnit timeUnit) { @@ -416,12 +434,11 @@ private Long convertListOfStringToTime(List serverTimeInformation, TimeU return Converters.toTimeMillis(serverTimeInformation.get(0), serverTimeInformation.get(1), timeUnit); } - private NodeResult executeCommandOnSingleNode(JedisClusterCommandCallback cmd, RedisClusterNode node) { - return connection.getClusterCommandExecutor().executeCommandOnSingleNode(cmd, node); + private NodeResult executeCommandOnSingleNode(JedisClusterCommandCallback command, RedisClusterNode node) { + return connection.getClusterCommandExecutor().executeCommandOnSingleNode(command, node); } - private MultiNodeResult executeCommandOnAllNodes(JedisClusterCommandCallback cmd) { - return connection.getClusterCommandExecutor().executeCommandOnAllNodes(cmd); + private MultiNodeResult executeCommandOnAllNodes(JedisClusterCommandCallback command) { + return connection.getClusterCommandExecutor().executeCommandOnAllNodes(command); } - } diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterStreamCommands.java index 11223cb2d5..a73354d61c 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterStreamCommands.java @@ -15,11 +15,11 @@ */ package org.springframework.data.redis.connection.jedis; -import static org.springframework.data.redis.connection.jedis.StreamConverters.*; - import redis.clients.jedis.BuilderFactory; +import redis.clients.jedis.StreamEntryID; import redis.clients.jedis.params.XAddParams; import redis.clients.jedis.params.XClaimParams; +import redis.clients.jedis.params.XPendingParams; import redis.clients.jedis.params.XReadGroupParams; import redis.clients.jedis.params.XReadParams; @@ -45,7 +45,10 @@ import org.springframework.util.Assert; /** + * {@link RedisStreamCommands} implementation for Jedis. + * * @author Dengliming + * @author John Blum * @since 2.3 */ class JedisClusterStreamCommands implements RedisStreamCommands { @@ -65,9 +68,9 @@ public Long xAck(byte[] key, String group, RecordId... recordIds) { try { return connection.getCluster().xack(key, JedisConverters.toBytes(group), - entryIdsToBytes(Arrays.asList(recordIds))); - } catch (Exception ex) { - throw convertJedisAccessException(ex); + StreamConverters.entryIdsToBytes(Arrays.asList(recordIds))); + } catch (Exception cause) { + throw convertJedisAccessException(cause); } } @@ -80,10 +83,10 @@ public RecordId xAdd(MapRecord record, XAddOptions optio XAddParams params = StreamConverters.toXAddParams(record.getId(), options); try { - return RecordId - .of(JedisConverters.toString(connection.getCluster().xadd(record.getStream(), record.getValue(), params))); - } catch (Exception ex) { - throw convertJedisAccessException(ex); + return RecordId.of(JedisConverters.toString(connection.getCluster() + .xadd(record.getStream(), record.getValue(), params))); + } catch (Exception cause) { + throw convertJedisAccessException(cause); } } @@ -97,17 +100,21 @@ public List xClaimJustId(byte[] key, String group, String newOwner, XC long minIdleTime = options.getMinIdleTime() == null ? -1L : options.getMinIdleTime().toMillis(); XClaimParams xClaimParams = StreamConverters.toXClaimParams(options); + try { List ids = connection.getCluster().xclaimJustId(key, JedisConverters.toBytes(group), - JedisConverters.toBytes(newOwner), minIdleTime, xClaimParams, entryIdsToBytes(options.getIds())); + JedisConverters.toBytes(newOwner), minIdleTime, xClaimParams, + StreamConverters.entryIdsToBytes(options.getIds())); List recordIds = new ArrayList<>(ids.size()); + ids.forEach(it -> recordIds.add(RecordId.of(JedisConverters.toString(it)))); return recordIds; - } catch (Exception ex) { - throw convertJedisAccessException(ex); + + } catch (Exception cause) { + throw convertJedisAccessException(cause); } } @@ -121,11 +128,13 @@ public List xClaim(byte[] key, String group, String newOwner, XClaim long minIdleTime = options.getMinIdleTime() == null ? -1L : options.getMinIdleTime().toMillis(); XClaimParams xClaimParams = StreamConverters.toXClaimParams(options); + try { - return convertToByteRecord(key, connection.getCluster().xclaim(key, JedisConverters.toBytes(group), - JedisConverters.toBytes(newOwner), minIdleTime, xClaimParams, entryIdsToBytes(options.getIds()))); - } catch (Exception ex) { - throw convertJedisAccessException(ex); + return StreamConverters.convertToByteRecord(key, connection.getCluster().xclaim(key, + JedisConverters.toBytes(group), JedisConverters.toBytes(newOwner), minIdleTime, xClaimParams, + StreamConverters.entryIdsToBytes(options.getIds()))); + } catch (Exception cause) { + throw convertJedisAccessException(cause); } } @@ -136,9 +145,9 @@ public Long xDel(byte[] key, RecordId... recordIds) { Assert.notNull(recordIds, "recordIds must not be null"); try { - return connection.getCluster().xdel(key, entryIdsToBytes(Arrays.asList(recordIds))); - } catch (Exception ex) { - throw convertJedisAccessException(ex); + return connection.getCluster().xdel(key, StreamConverters.entryIdsToBytes(Arrays.asList(recordIds))); + } catch (Exception cause) { + throw convertJedisAccessException(cause); } } @@ -157,8 +166,8 @@ public String xGroupCreate(byte[] key, String groupName, ReadOffset readOffset, try { return connection.getCluster().xgroupCreate(key, JedisConverters.toBytes(groupName), JedisConverters.toBytes(readOffset.getOffset()), mkStream); - } catch (Exception ex) { - throw convertJedisAccessException(ex); + } catch (Exception cause) { + throw convertJedisAccessException(cause); } } @@ -171,8 +180,8 @@ public Boolean xGroupDelConsumer(byte[] key, Consumer consumer) { try { return connection.getCluster().xgroupDelConsumer(key, JedisConverters.toBytes(consumer.getGroup()), JedisConverters.toBytes(consumer.getName())) != 0L; - } catch (Exception ex) { - throw convertJedisAccessException(ex); + } catch (Exception cause) { + throw convertJedisAccessException(cause); } } @@ -184,8 +193,8 @@ public Boolean xGroupDestroy(byte[] key, String groupName) { try { return connection.getCluster().xgroupDestroy(key, JedisConverters.toBytes(groupName)) != 0L; - } catch (Exception ex) { - throw convertJedisAccessException(ex); + } catch (Exception cause) { + throw convertJedisAccessException(cause); } } @@ -196,8 +205,8 @@ public StreamInfo.XInfoStream xInfo(byte[] key) { try { return StreamInfo.XInfoStream.fromList((List) connection.getCluster().xinfoStream(key)); - } catch (Exception ex) { - throw convertJedisAccessException(ex); + } catch (Exception cause) { + throw convertJedisAccessException(cause); } } @@ -208,8 +217,8 @@ public StreamInfo.XInfoGroups xInfoGroups(byte[] key) { try { return StreamInfo.XInfoGroups.fromList(connection.getCluster().xinfoGroups(key)); - } catch (Exception ex) { - throw convertJedisAccessException(ex); + } catch (Exception cause) { + throw convertJedisAccessException(cause); } } @@ -222,8 +231,8 @@ public StreamInfo.XInfoConsumers xInfoConsumers(byte[] key, String groupName) { try { return StreamInfo.XInfoConsumers.fromList(groupName, connection.getCluster().xinfoConsumers(key, JedisConverters.toBytes(groupName))); - } catch (Exception ex) { - throw convertJedisAccessException(ex); + } catch (Exception cause) { + throw convertJedisAccessException(cause); } } @@ -234,8 +243,8 @@ public Long xLen(byte[] key) { try { return connection.getCluster().xlen(key); - } catch (Exception ex) { - throw convertJedisAccessException(ex); + } catch (Exception cause) { + throw convertJedisAccessException(cause); } } @@ -252,8 +261,8 @@ public PendingMessagesSummary xPending(byte[] key, String groupName) { Object response = connection.getCluster().xpending(key, group); return StreamConverters.toPendingMessagesSummary(groupName, response); - } catch (Exception ex) { - throw convertJedisAccessException(ex); + } catch (Exception cause) { + throw convertJedisAccessException(cause); } } @@ -269,14 +278,20 @@ public PendingMessages xPending(byte[] key, String groupName, XPendingOptions op try { - List response = connection.getCluster().xpending(key, group, - JedisConverters.toBytes(getLowerValue(range)), JedisConverters.toBytes(getUpperValue(range)), - options.getCount().intValue(), JedisConverters.toBytes(options.getConsumerName())); + XPendingParams pendingParams = new XPendingParams(); + + pendingParams.start(new StreamEntryID(JedisConverters.toBytes(StreamConverters.getLowerValue(range)))); + pendingParams.end(new StreamEntryID(JedisConverters.toBytes(StreamConverters.getUpperValue(range)))); + pendingParams.count(options.getCount().intValue()); + pendingParams.consumer(options.getConsumerName()); + + List response = connection.getCluster().xpending(key, group, pendingParams); return StreamConverters.toPendingMessages(groupName, range, BuilderFactory.STREAM_PENDING_ENTRY_LIST.build(response)); - } catch (Exception ex) { - throw convertJedisAccessException(ex); + + } catch (Exception cause) { + throw convertJedisAccessException(cause); } } @@ -290,10 +305,11 @@ public List xRange(byte[] key, Range range, Limit limit) { int count = limit.isUnlimited() ? Integer.MAX_VALUE : limit.getCount(); try { - return convertToByteRecord(key, connection.getCluster().xrange(key, JedisConverters.toBytes(getLowerValue(range)), - JedisConverters.toBytes(getUpperValue(range)), count)); - } catch (Exception ex) { - throw convertJedisAccessException(ex); + return StreamConverters.convertToByteRecord(key, connection.getCluster().xrange(key, + JedisConverters.toBytes(StreamConverters.getLowerValue(range)), + JedisConverters.toBytes(StreamConverters.getUpperValue(range)), count)); + } catch (Exception cause) { + throw convertJedisAccessException(cause); } } @@ -307,15 +323,15 @@ public List xRead(StreamReadOptions readOptions, StreamOffset xread = connection.getCluster().xread(xReadParams, toStreamOffsets(streams)); + List xread = connection.getCluster().xread(xReadParams, StreamConverters.toStreamOffsets(streams)); if (xread == null) { return Collections.emptyList(); } return StreamConverters.convertToByteRecords(xread); - } catch (Exception ex) { - throw convertJedisAccessException(ex); + } catch (Exception cause) { + throw convertJedisAccessException(cause); } } @@ -332,15 +348,15 @@ public List xReadGroup(Consumer consumer, StreamReadOptions readOpti try { List xread = connection.getCluster().xreadGroup(JedisConverters.toBytes(consumer.getGroup()), - JedisConverters.toBytes(consumer.getName()), xReadParams, toStreamOffsets(streams)); + JedisConverters.toBytes(consumer.getName()), xReadParams, StreamConverters.toStreamOffsets(streams)); if (xread == null) { return Collections.emptyList(); } return StreamConverters.convertToByteRecords(xread); - } catch (Exception ex) { - throw convertJedisAccessException(ex); + } catch (Exception cause) { + throw convertJedisAccessException(cause); } } @@ -354,10 +370,11 @@ public List xRevRange(byte[] key, Range range, Limit limit) int count = limit.isUnlimited() ? Integer.MAX_VALUE : limit.getCount(); try { - return convertToByteRecord(key, connection.getCluster().xrevrange(key, - JedisConverters.toBytes(getUpperValue(range)), JedisConverters.toBytes(getLowerValue(range)), count)); - } catch (Exception ex) { - throw convertJedisAccessException(ex); + return StreamConverters.convertToByteRecord(key, connection.getCluster().xrevrange(key, + JedisConverters.toBytes(StreamConverters.getUpperValue(range)), + JedisConverters.toBytes(StreamConverters.getLowerValue(range)), count)); + } catch (Exception cause) { + throw convertJedisAccessException(cause); } } @@ -373,13 +390,12 @@ public Long xTrim(byte[] key, long count, boolean approximateTrimming) { try { return connection.getCluster().xtrim(key, count, approximateTrimming); - } catch (Exception ex) { - throw convertJedisAccessException(ex); + } catch (Exception cause) { + throw convertJedisAccessException(cause); } } - private DataAccessException convertJedisAccessException(Exception ex) { - return connection.convertJedisAccessException(ex); + private DataAccessException convertJedisAccessException(Exception cause) { + return connection.convertJedisAccessException(cause); } - } diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterZSetCommands.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterZSetCommands.java index 4d0923e43d..0784e9b834 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterZSetCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterZSetCommands.java @@ -20,8 +20,10 @@ import redis.clients.jedis.params.ZParams; import redis.clients.jedis.params.ZRangeParams; import redis.clients.jedis.resps.ScanResult; +import redis.clients.jedis.util.KeyValue; import java.util.ArrayList; +import java.util.HashSet; import java.util.LinkedHashSet; import java.util.List; import java.util.Set; @@ -34,7 +36,6 @@ import org.springframework.data.redis.connection.RedisZSetCommands; import org.springframework.data.redis.connection.convert.SetConverter; import org.springframework.data.redis.connection.zset.Aggregate; -import org.springframework.data.redis.connection.zset.DefaultTuple; import org.springframework.data.redis.connection.zset.Tuple; import org.springframework.data.redis.connection.zset.Weights; import org.springframework.data.redis.core.Cursor; @@ -46,6 +47,8 @@ import org.springframework.util.Assert; /** + * Cluster {@link RedisZSetCommands} implementation for Jedis. + * * @author Christoph Strobl * @author Mark Paluch * @author Clement Ong @@ -349,8 +352,8 @@ public Tuple bZPopMin(byte[] key, long timeout, TimeUnit unit) { try { return toTuple(connection.getCluster().bzpopmin(JedisConverters.toSeconds(timeout, unit), key)); - } catch (Exception ex) { - throw convertJedisAccessException(ex); + } catch (Exception cause) { + throw convertJedisAccessException(cause); } } @@ -810,17 +813,23 @@ public Long zRemRangeByScore(byte[] key, double min, double max) { } } + // TODO: Do we want to preserve order now that JedisCluster.zdiff(..) returns a List + // and add zDiffAsList(..) to the public interface? @Override public Set zDiff(byte[]... sets) { + return new HashSet<>(zDiffAsList(sets)); + } + + @Nullable + List zDiffAsList(byte[]... sets) { Assert.notNull(sets, "Sets must not be null"); if (ClusterSlotHashUtil.isSameSlotForAllKeys(sets)) { - try { return connection.getCluster().zdiff(sets); - } catch (Exception ex) { - throw convertJedisAccessException(ex); + } catch (Exception cause) { + throw convertJedisAccessException(cause); } } @@ -829,15 +838,21 @@ public Set zDiff(byte[]... sets) { @Override public Set zDiffWithScores(byte[]... sets) { + return new HashSet<>(zDiffWithScoresAsList(sets)); + } + + // TODO: Do we want to preserve order now that JedisCluster.zdiffWithScores(..) returns a List + // and add zDiffWithScoresAsList(..) to the public interface? + List zDiffWithScoresAsList(byte[]... sets) { Assert.notNull(sets, "Sets must not be null"); if (ClusterSlotHashUtil.isSameSlotForAllKeys(sets)) { try { - return JedisConverters.toTupleSet(connection.getCluster().zdiffWithScores(sets)); - } catch (Exception ex) { - throw convertJedisAccessException(ex); + return JedisConverters.toTupleList(connection.getCluster().zdiffWithScores(sets)); + } catch (Exception cause) { + throw convertJedisAccessException(cause); } } @@ -866,6 +881,12 @@ public Long zDiffStore(byte[] destKey, byte[]... sets) { @Override public Set zInter(byte[]... sets) { + return new HashSet<>(zInterAsList(sets)); + } + + // TODO: Do we want to preserve order now that JedisCluster.zinter(..) returns a List + // and add zInterAsList(..) to the public interface? + List zInterAsList(byte[]... sets) { Assert.notNull(sets, "Sets must not be null"); @@ -873,8 +894,8 @@ public Set zInter(byte[]... sets) { try { return connection.getCluster().zinter(new ZParams(), sets); - } catch (Exception ex) { - throw convertJedisAccessException(ex); + } catch (Exception cause) { + throw convertJedisAccessException(cause); } } @@ -883,15 +904,21 @@ public Set zInter(byte[]... sets) { @Override public Set zInterWithScores(byte[]... sets) { + return new HashSet<>(zInterWithScoresAsList(sets)); + } + + // TODO: Do we want to preserve order now that JedisCluster.zinterWithScores(..) returns a List + // and add zInterWithScoresAsList(..) to the public interface? + List zInterWithScoresAsList(byte[]... sets) { Assert.notNull(sets, "Sets must not be null"); if (ClusterSlotHashUtil.isSameSlotForAllKeys(sets)) { try { - return JedisConverters.toTupleSet(connection.getCluster().zinterWithScores(new ZParams(), sets)); - } catch (Exception ex) { - throw convertJedisAccessException(ex); + return JedisConverters.toTupleList(connection.getCluster().zinterWithScores(new ZParams(), sets)); + } catch (Exception cause) { + throw convertJedisAccessException(cause); } } @@ -900,6 +927,12 @@ public Set zInterWithScores(byte[]... sets) { @Override public Set zInterWithScores(Aggregate aggregate, Weights weights, byte[]... sets) { + return new HashSet<>(zInterWithScoresAsList(aggregate, weights, sets)); + } + + // TODO: Do we want to preserve order now that JedisCluster.zinterWithScores(..) returns a List + // and add zInterWithScoresAsList(..) to the public interface? + List zInterWithScoresAsList(Aggregate aggregate, Weights weights, byte[]... sets) { Assert.notNull(sets, "Sets must not be null"); Assert.noNullElements(sets, "Source sets must not contain null elements"); @@ -907,12 +940,11 @@ public Set zInterWithScores(Aggregate aggregate, Weights weights, byte[]. .format("The number of weights (%d) must match the number of source sets (%d)", weights.size(), sets.length)); if (ClusterSlotHashUtil.isSameSlotForAllKeys(sets)) { - try { - return JedisConverters - .toTupleSet(connection.getCluster().zinterWithScores(toZParams(aggregate, weights), sets)); - } catch (Exception ex) { - throw convertJedisAccessException(ex); + return JedisConverters.toTupleList(connection.getCluster() + .zinterWithScores(toZParams(aggregate, weights), sets)); + } catch (Exception cause) { + throw convertJedisAccessException(cause); } } @@ -965,15 +997,20 @@ public Long zInterStore(byte[] destKey, Aggregate aggregate, Weights weights, by @Override public Set zUnion(byte[]... sets) { + return new HashSet<>(zUnionAsList(sets)); + } + + // TODO: Do we want to preserve order now that JedisCluster.zunion(..) returns a List + // and add zUnionAsList(..) to the public interface? + List zUnionAsList(byte[]... sets) { Assert.notNull(sets, "Sets must not be null"); if (ClusterSlotHashUtil.isSameSlotForAllKeys(sets)) { - try { return connection.getCluster().zunion(new ZParams(), sets); - } catch (Exception ex) { - throw convertJedisAccessException(ex); + } catch (Exception cause) { + throw convertJedisAccessException(cause); } } @@ -982,15 +1019,20 @@ public Set zUnion(byte[]... sets) { @Override public Set zUnionWithScores(byte[]... sets) { + return new HashSet<>(zUnionWithScoresAsList(sets)); + } + + // TODO: Do we want to preserve order now that JedisCluster.zunionWithScores(..) returns a List + // and add zUnionWithScoresAsList(..) to the public interface? + List zUnionWithScoresAsList(byte[]... sets) { Assert.notNull(sets, "Sets must not be null"); if (ClusterSlotHashUtil.isSameSlotForAllKeys(sets)) { - try { - return JedisConverters.toTupleSet(connection.getCluster().zunionWithScores(new ZParams(), sets)); - } catch (Exception ex) { - throw convertJedisAccessException(ex); + return JedisConverters.toTupleList(connection.getCluster().zunionWithScores(new ZParams(), sets)); + } catch (Exception cause) { + throw convertJedisAccessException(cause); } } @@ -999,6 +1041,12 @@ public Set zUnionWithScores(byte[]... sets) { @Override public Set zUnionWithScores(Aggregate aggregate, Weights weights, byte[]... sets) { + return new HashSet<>(zUnionWithScoresAsList(aggregate, weights, sets)); + } + + // TODO: Do we want to preserve order now that JedisCluster.zunionWithScores(..) returns a List + // and add zUnionWithScoresAsList(..) to the public interface? + List zUnionWithScoresAsList(Aggregate aggregate, Weights weights, byte[]... sets) { Assert.notNull(sets, "Sets must not be null"); Assert.noNullElements(sets, "Source sets must not contain null elements"); @@ -1008,10 +1056,10 @@ public Set zUnionWithScores(Aggregate aggregate, Weights weights, byte[]. if (ClusterSlotHashUtil.isSameSlotForAllKeys(sets)) { try { - return JedisConverters - .toTupleSet(connection.getCluster().zunionWithScores(toZParams(aggregate, weights), sets)); - } catch (Exception ex) { - throw convertJedisAccessException(ex); + return JedisConverters.toTupleList(connection.getCluster() + .zunionWithScores(toZParams(aggregate, weights), sets)); + } catch (Exception cause) { + throw convertJedisAccessException(cause); } } @@ -1126,21 +1174,14 @@ private static ZParams toZParams(Aggregate aggregate, Weights weights) { return new ZParams().weights(weights.toArray()).aggregate(ZParams.Aggregate.valueOf(aggregate.name())); } - /** - * Workaround for broken Jedis BZPOP signature. - * - * @param bytes - * @return - */ @Nullable - @SuppressWarnings("unchecked") - private static Tuple toTuple(@Nullable List bytes) { + private static Tuple toTuple(@Nullable KeyValue keyValue) { - if (bytes == null || bytes.isEmpty()) { - return null; + if (keyValue != null) { + redis.clients.jedis.resps.Tuple tuple = keyValue.getValue(); + return tuple != null ? JedisConverters.toTuple(tuple) : null; } - return new DefaultTuple((byte[]) bytes.get(1), Double.parseDouble(new String((byte[]) bytes.get(2)))); + return null; } - } diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisConnection.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisConnection.java index eaa1b5ba4e..a474bbb8fe 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisConnection.java @@ -59,9 +59,6 @@ import org.springframework.util.Assert; import org.springframework.util.CollectionUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - import redis.clients.jedis.BuilderFactory; import redis.clients.jedis.CommandArguments; import redis.clients.jedis.CommandObject; @@ -130,6 +127,7 @@ public class JedisConnection extends AbstractRedisConnection { private final Log LOGGER = LogFactory.getLog(getClass()); + @SuppressWarnings("rawtypes") private List pipelinedResults = new ArrayList<>(); private final @Nullable Pool pool; @@ -348,7 +346,6 @@ public void close() throws DataAccessException { jedis.close(); } else { - doExceptionThrowingOperationSafely(jedis::quit, "Failed to quit during close"); doExceptionThrowingOperationSafely(jedis::disconnect, "Failed to disconnect during close"); } } @@ -480,6 +477,7 @@ public void discard() { public List exec() { try { + if (transaction == null) { throw new InvalidDataAccessApiUsageException("No ongoing transaction; Did you forget to call multi"); } @@ -489,6 +487,7 @@ public List exec() { return !CollectionUtils.isEmpty(results) ? new TransactionResultConverter<>(txResults, JedisExceptionConverter.INSTANCE).convert(results) : results; + } catch (Exception cause) { throw convertJedisAccessException(cause); } finally { diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisConverters.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisConverters.java index 88b8a5eb4d..12928a0fac 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisConverters.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisConverters.java @@ -34,6 +34,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -114,12 +115,22 @@ abstract class JedisConverters extends Converters { MINUS_BYTES = toBytes("-"); POSITIVE_INFINITY_BYTES = toBytes("+inf"); NEGATIVE_INFINITY_BYTES = toBytes("-inf"); + + } + + @Nullable + static Set toSet(@Nullable List list) { + return list != null ? new HashSet<>(list) : null; } - public static Converter stringToBytes() { + static Converter stringToBytes() { return JedisConverters::toBytes; } + static ListConverter stringListToByteList() { + return new ListConverter<>(stringToBytes()); + } + /** * {@link ListConverter} converting jedis {@link redis.clients.jedis.resps.Tuple} to {@link Tuple}. * @@ -129,16 +140,16 @@ static ListConverter tuplesToTuples() { return new ListConverter<>(JedisConverters::toTuple); } - static ListConverter stringListToByteList() { - return new ListConverter<>(stringToBytes()); + static Tuple toTuple(redis.clients.jedis.resps.Tuple source) { + return new DefaultTuple(source.getBinaryElement(), source.getScore()); } - static Set toTupleSet(Set source) { - return new SetConverter<>(JedisConverters::toTuple).convert(source); + static List toTupleList(List source) { + return tuplesToTuples().convert(source); } - public static Tuple toTuple(redis.clients.jedis.resps.Tuple source) { - return new DefaultTuple(source.getBinaryElement(), source.getScore()); + static Set toTupleSet(Set source) { + return new SetConverter<>(JedisConverters::toTuple).convert(source); } /** @@ -255,6 +266,7 @@ public static byte[][] toByteArrays(Map source) { public static SortingParams toSortingParams(@Nullable SortParameters params) { SortingParams jedisParams = null; + if (params != null) { jedisParams = new SortingParams(); byte[] byPattern = params.getByPattern(); @@ -278,6 +290,7 @@ public static SortingParams toSortingParams(@Nullable SortParameters params) { jedisParams.alpha(); } } + return jedisParams; } @@ -386,8 +399,10 @@ public static SetParams toSetCommandExPxArgument(Expiration expiration, SetParam * @since 2.6 */ static GetExParams toGetExParams(Expiration expiration) { + return toGetExParams(expiration, new GetExParams()); + } - GetExParams params = new GetExParams(); + static GetExParams toGetExParams(Expiration expiration, GetExParams params) { if (expiration.isPersistent()) { return params.persist(); @@ -584,18 +599,14 @@ static ZAddParams toZAddParams(ZAddArgs source) { return new ZAddParams(); } - ZAddParams target = new ZAddParams() { - - { - if (source.contains(ZAddArgs.Flag.GT)) { - addParam("gt"); - } - if (source.contains(ZAddArgs.Flag.LT)) { - addParam("lt"); - } - } - }; + ZAddParams target = new ZAddParams(); + if (source.contains(ZAddArgs.Flag.GT)) { + target.gt(); + } + if (source.contains(ZAddArgs.Flag.LT)) { + target.lt(); + } if (source.contains(ZAddArgs.Flag.XX)) { target.xx(); } @@ -605,6 +616,7 @@ static ZAddParams toZAddParams(ZAddArgs source) { if (source.contains(ZAddArgs.Flag.CH)) { target.ch(); } + return target; } diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisHashCommands.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisHashCommands.java index 4efaf0c85c..419d19d3f8 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisHashCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisHashCommands.java @@ -37,8 +37,11 @@ import org.springframework.util.Assert; /** + * {@link RedisHashCommands} implementation for Jedis. + * * @author Christoph Strobl * @author Mark Paluch + * @author John Blum * @since 2.0 */ class JedisHashCommands implements RedisHashCommands { @@ -122,7 +125,7 @@ public Entry hRandFieldWithValues(byte[] key) { Assert.notNull(key, "Key must not be null"); return connection.invoke().from(Jedis::hrandfieldWithValues, PipelineBinaryCommands::hrandfieldWithValues, key, 1L) - .get(it -> it.isEmpty() ? null : it.entrySet().iterator().next()); + .get(mapEntryList -> mapEntryList.isEmpty() ? null : mapEntryList.get(0)); } @Nullable @@ -141,12 +144,16 @@ public List> hRandFieldWithValues(byte[] key, long count) Assert.notNull(key, "Key must not be null"); return connection.invoke() - .from(Jedis::hrandfieldWithValues, PipelineBinaryCommands::hrandfieldWithValues, key, count).get(it -> { + .from(Jedis::hrandfieldWithValues, PipelineBinaryCommands::hrandfieldWithValues, key, count) + .get(mapEntryList -> { + + List> convertedMapEntryList = new ArrayList<>(mapEntryList.size()); + + mapEntryList.forEach(entry -> + convertedMapEntryList.add(Converters.entryOf(entry.getKey(), entry.getValue()))); - List> entries = new ArrayList<>(it.size()); - it.forEach((k, v) -> entries.add(Converters.entryOf(k, v))); + return convertedMapEntryList; - return entries; }); } diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisInvoker.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisInvoker.java index 805efabe4b..ef1771969d 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisInvoker.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisInvoker.java @@ -17,7 +17,6 @@ import redis.clients.jedis.Jedis; import redis.clients.jedis.Pipeline; -import redis.clients.jedis.Queable; import redis.clients.jedis.Response; import redis.clients.jedis.Transaction; import redis.clients.jedis.commands.DatabasePipelineCommands; @@ -47,7 +46,7 @@ * composing a functional pipeline to transform the result using a {@link Converter}. *

* Usage example: - * + *

*

  * JedisInvoker invoker = …;
  *
@@ -62,6 +61,7 @@
  *
  * @author Mark Paluch
  * @author Christoph Strobl
+ * @author John Blum
  * @since 2.5
  */
 class JedisInvoker {
@@ -937,10 +937,7 @@ static class DefaultSingleInvocationSpec implements SingleInvocationSpec {
 
 		@Override
 		public  T get(Converter converter) {
-
-			Assert.notNull(converter, "Converter must not be null");
-
-			return synchronizer.invoke(parentFunction, parentPipelineFunction, converter, () -> null);
+			return getOrElse(converter, () -> null);
 		}
 
 		@Nullable
@@ -959,6 +956,7 @@ static class DefaultManyInvocationSpec implements ManyInvocationSpec {
 		private final Function>> parentPipelineFunction;
 		private final Synchronizer synchronizer;
 
+		@SuppressWarnings({ "rawtypes", "unchecked" })
 		DefaultManyInvocationSpec(Function> parentFunction,
 				Function>> parentPipelineFunction,
 				Synchronizer synchronizer) {
@@ -969,6 +967,7 @@ static class DefaultManyInvocationSpec implements ManyInvocationSpec {
 		}
 
 		@Override
+		@SuppressWarnings("all")
 		public  List toList(Converter converter) {
 
 			Assert.notNull(converter, "Converter must not be null");
@@ -981,15 +980,17 @@ public  List toList(Converter converter) {
 
 				List result = new ArrayList<>(source.size());
 
-				for (S s : source) {
-					result.add(converter.convert(s));
+				for (S element : source) {
+					result.add(converter.convert(element));
 				}
 
 				return result;
+
 			}, Collections::emptyList);
 		}
 
 		@Override
+		@SuppressWarnings("all")
 		public  Set toSet(Converter converter) {
 
 			Assert.notNull(converter, "Converter must not be null");
@@ -1002,11 +1003,12 @@ public  Set toSet(Converter converter) {
 
 				Set result = new LinkedHashSet<>(source.size());
 
-				for (S s : source) {
-					result.add(converter.convert(s));
+				for (S element : source) {
+					result.add(converter.convert(element));
 				}
 
 				return result;
+
 			}, Collections::emptySet);
 		}
 	}
@@ -1020,6 +1022,7 @@ interface Synchronizer {
 		@Nullable
 		@SuppressWarnings({ "unchecked", "rawtypes" })
 		default  T invoke(Function callFunction, Function> pipelineFunction) {
+
 			return (T) doInvoke((Function) callFunction, (Function) pipelineFunction, Converters.identityConverter(),
 					() -> null);
 		}
@@ -1046,15 +1049,13 @@ interface ResponseCommands extends PipelineBinaryCommands, DatabasePipelineComma
 	/**
 	 * Create a proxy to invoke methods dynamically on {@link Pipeline} or {@link Transaction} as those share many
 	 * commands that are not defined on a common super-type.
-	 *
-	 * @param pipelineOrTransaction
-	 * @return
 	 */
-	static ResponseCommands createCommands(Queable pipelineOrTransaction) {
+	static ResponseCommands createCommands(Object pipelineOrTransaction) {
 
 		ProxyFactory proxyFactory = new ProxyFactory(pipelineOrTransaction);
+
 		proxyFactory.addInterface(ResponseCommands.class);
+
 		return (ResponseCommands) proxyFactory.getProxy(JedisInvoker.class.getClassLoader());
 	}
-
 }
diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisZSetCommands.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisZSetCommands.java
index 1aab87de0a..9eae7fbad3 100644
--- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisZSetCommands.java
+++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisZSetCommands.java
@@ -22,6 +22,7 @@
 import redis.clients.jedis.params.ZParams;
 import redis.clients.jedis.params.ZRangeParams;
 import redis.clients.jedis.resps.ScanResult;
+import redis.clients.jedis.util.KeyValue;
 
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -31,7 +32,6 @@
 import org.springframework.dao.InvalidDataAccessApiUsageException;
 import org.springframework.data.redis.connection.RedisZSetCommands;
 import org.springframework.data.redis.connection.zset.Aggregate;
-import org.springframework.data.redis.connection.zset.DefaultTuple;
 import org.springframework.data.redis.connection.zset.Tuple;
 import org.springframework.data.redis.connection.zset.Weights;
 import org.springframework.data.redis.core.Cursor;
@@ -42,11 +42,14 @@
 import org.springframework.util.Assert;
 
 /**
+ * {@link RedisZSetCommands} implementation for Jedis.
+ *
  * @author Christoph Strobl
  * @author Clement Ong
  * @author Mark Paluch
  * @author Andrey Shlykov
  * @author Shyngys Sapraliyev
+ * @author John Blum
  * @since 2.0
  */
 class JedisZSetCommands implements RedisZSetCommands {
@@ -74,8 +77,10 @@ public Long zAdd(byte[] key, Set tuples, ZAddArgs args) {
 		Assert.notNull(key, "Key must not be null");
 		Assert.notNull(tuples, "Tuples must not be null");
 
-		return connection.invoke().just(Jedis::zadd, PipelineBinaryCommands::zadd, key, JedisConverters.toTupleMap(tuples),
-				JedisConverters.toZAddParams(args));
+		Long count = connection.invoke().just(Jedis::zadd, PipelineBinaryCommands::zadd, key,
+				JedisConverters.toTupleMap(tuples), JedisConverters.toZAddParams(args));
+
+		return count != null ? count : 0L;
 	}
 
 	@Override
@@ -421,6 +426,13 @@ public Long zRemRangeByScore(byte[] key, org.springframework.data.domain.Range zDiff(byte[]... sets) {
+		return JedisConverters.toSet(zDiffAsList(sets));
+	}
+
+	// TODO: Do we want to preserve the order now that Jedis.zdiff(..) returns a List
+	//  and add zDiffAsList(..) to the public interface?
+	@Nullable
+	List zDiffAsList(byte[]... sets) {
 
 		Assert.notNull(sets, "Sets must not be null");
 
@@ -447,6 +459,13 @@ public Long zDiffStore(byte[] destKey, byte[]... sets) {
 
 	@Override
 	public Set zInter(byte[]... sets) {
+		return JedisConverters.toSet(zInterAsList(sets));
+	}
+
+	// TODO: Do we want to preserve the order now that Jedis.zinter(..) returns a List,
+	//  and add zInterAsList(byte[]... sets) to the public interface?
+	@Nullable
+	List zInterAsList(byte[]... sets) {
 
 		Assert.notNull(sets, "Sets must not be null");
 
@@ -501,6 +520,13 @@ public Long zInterStore(byte[] destKey, byte[]... sets) {
 
 	@Override
 	public Set zUnion(byte[]... sets) {
+		return JedisConverters.toSet(zUnionAsList(sets));
+	}
+
+	// TODO: Should we preserver order now that Jedis.zunion(..) returns a List
+	//  and add zUnionAsList(..) to the public interface?
+	@Nullable
+	List zUnionAsList(byte[]... sets) {
 
 		Assert.notNull(sets, "Sets must not be null");
 
@@ -772,21 +798,8 @@ static ZRangeParams toZRangeParams(Protocol.Keyword by, byte[] min, byte[] max,
 		return zRangeParams;
 	}
 
-	/**
-	 * Workaround for broken Jedis BZPOP signature.
-	 *
-	 * @param bytes
-	 * @return
-	 */
 	@Nullable
-	@SuppressWarnings("unchecked")
-	private static Tuple toTuple(List bytes) {
-
-		if (bytes.isEmpty()) {
-			return null;
-		}
-
-		return new DefaultTuple((byte[]) bytes.get(1), Double.parseDouble(new String((byte[]) bytes.get(2))));
+	private static Tuple toTuple(@Nullable KeyValue keyValue) {
+		return keyValue != null ? JedisConverters.toTuple(keyValue.getValue()) : null;
 	}
-
 }
diff --git a/src/main/java/org/springframework/data/redis/connection/zset/DefaultTuple.java b/src/main/java/org/springframework/data/redis/connection/zset/DefaultTuple.java
index eb2c198e8e..34e8e88454 100644
--- a/src/main/java/org/springframework/data/redis/connection/zset/DefaultTuple.java
+++ b/src/main/java/org/springframework/data/redis/connection/zset/DefaultTuple.java
@@ -18,23 +18,27 @@
 import java.util.Arrays;
 
 import org.springframework.lang.Nullable;
+import org.springframework.util.ObjectUtils;
 
 /**
  * Default implementation for {@link Tuple} interface.
  *
  * @author Costin Leau
  * @author Christoph Strobl
+ * @author John Blum
  */
 public class DefaultTuple implements Tuple {
 
+	private static final Double ZERO = 0.0d;
+
 	private final Double score;
 	private final byte[] value;
 
 	/**
-	 * Constructs a new DefaultTuple instance.
+	 * Constructs a new {@link DefaultTuple}.
 	 *
-	 * @param value
-	 * @param score
+	 * @param value {@link byte[]} of the member's raw value.
+	 * @param score {@link Double score} of the raw value used in sorting.
 	 */
 	public DefaultTuple(byte[] value, Double score) {
 
@@ -43,29 +47,25 @@ public DefaultTuple(byte[] value, Double score) {
 	}
 
 	public Double getScore() {
-		return score;
+		return this.score;
 	}
 
 	public byte[] getValue() {
-		return value;
+		return this.value;
 	}
 
 	public boolean equals(@Nullable Object obj) {
-		if (this == obj)
+
+		if (this == obj) {
 			return true;
-		if (obj == null)
-			return false;
-		if (!(obj instanceof DefaultTuple))
-			return false;
-		DefaultTuple other = (DefaultTuple) obj;
-		if (score == null) {
-			if (other.score != null)
-				return false;
-		} else if (!score.equals(other.score))
-			return false;
-		if (!Arrays.equals(value, other.value))
+		}
+
+		if (!(obj instanceof DefaultTuple that)) {
 			return false;
-		return true;
+		}
+
+		return ObjectUtils.nullSafeEquals(this.score, that.score)
+			&& Arrays.equals(this.value, that.value);
 	}
 
 	public int hashCode() {
@@ -76,19 +76,21 @@ public int hashCode() {
 		return result;
 	}
 
-	public int compareTo(Double o) {
-		Double d = (score == null ? Double.valueOf(0.0d) : score);
-		Double a = (o == null ? Double.valueOf(0.0d) : o);
-		return d.compareTo(a);
+	public int compareTo(Double value) {
+
+		Double ourScore = getScore();
+		Double thisScore = ourScore != null ? ourScore : ZERO;
+		Double thatScore = value != null ? value : ZERO;
+
+		return thisScore.compareTo(thatScore);
 	}
 
 	@Override
 	public String toString() {
-		StringBuffer sb = new StringBuffer();
-		sb.append(getClass().getSimpleName());
-		sb.append(" [score=").append(score);
-		sb.append(", value=").append(value == null ? "null" : new String(value));
-		sb.append(']');
-		return sb.toString();
+
+		return getClass().getSimpleName()
+			+ " { score=" + getScore()
+			+ ", value=" + Arrays.toString(getValue())
+			+ " }";
 	}
 }
diff --git a/src/main/java/org/springframework/data/redis/connection/zset/Tuple.java b/src/main/java/org/springframework/data/redis/connection/zset/Tuple.java
index 19c05a3385..4f35b6c25f 100644
--- a/src/main/java/org/springframework/data/redis/connection/zset/Tuple.java
+++ b/src/main/java/org/springframework/data/redis/connection/zset/Tuple.java
@@ -20,6 +20,17 @@
  */
 public interface Tuple extends Comparable {
 
+	/**
+	 * Create a new {@link Tuple} with the {@link byte[] raw value} and given {@link Double score}.
+	 *
+	 * @param value {@link byte[]} of the member's raw value.
+	 * @param score {@link Double score} given to the {@code value} used in sorting.
+	 * @return a {@link Tuple} capturing the {@link byte[] value} with its {@link Double score}.
+	 */
+	static Tuple of(byte[] value, Double score) {
+		return new DefaultTuple(value, score);
+	}
+
 	/**
 	 * @return the raw value of the member.
 	 */
@@ -30,14 +41,4 @@ public interface Tuple extends Comparable {
 	 */
 	Double getScore();
 
-	/**
-	 * Create a new {@link Tuple}.
-	 *
-	 * @param value
-	 * @param score
-	 * @return
-	 */
-	static Tuple of(byte[] value, Double score) {
-		return new DefaultTuple(value, score);
-	}
 }
diff --git a/src/main/java/org/springframework/data/redis/core/types/Expiration.java b/src/main/java/org/springframework/data/redis/core/types/Expiration.java
index c5df3ae8f7..d24a60fc89 100644
--- a/src/main/java/org/springframework/data/redis/core/types/Expiration.java
+++ b/src/main/java/org/springframework/data/redis/core/types/Expiration.java
@@ -16,7 +16,6 @@
 package org.springframework.data.redis.core.types;
 
 import java.time.Duration;
-import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 
 import org.springframework.lang.Nullable;
diff --git a/src/test/java/org/springframework/data/redis/connection/jedis/JedisConnectionUnitTests.java b/src/test/java/org/springframework/data/redis/connection/jedis/JedisConnectionUnitTests.java
index 2a39274473..51edd0ded0 100644
--- a/src/test/java/org/springframework/data/redis/connection/jedis/JedisConnectionUnitTests.java
+++ b/src/test/java/org/springframework/data/redis/connection/jedis/JedisConnectionUnitTests.java
@@ -25,7 +25,6 @@
 import redis.clients.jedis.params.ScanParams;
 import redis.clients.jedis.resps.ScanResult;
 
-import java.io.IOException;
 import java.util.Collections;
 import java.util.Map.Entry;
 
@@ -43,7 +42,10 @@
 import org.springframework.data.redis.core.ScanOptions;
 
 /**
+ * Unit tests for {@link JedisConnection}.
+ *
  * @author Christoph Strobl
+ * @author John Blum
  */
 class JedisConnectionUnitTests {
 
@@ -165,11 +167,11 @@ public void scanShouldKeepTheConnectionOpen() {
 
 			connection.scan(ScanOptions.NONE);
 
-			verify(jedisSpy, never()).quit();
+			verify(jedisSpy, never()).disconnect();
 		}
 
 		@Test // DATAREDIS-531, GH-2006
-		public void scanShouldCloseTheConnectionWhenCursorIsClosed() throws IOException {
+		public void scanShouldCloseTheConnectionWhenCursorIsClosed() {
 
 			doReturn(new ScanResult<>("0", Collections. emptyList())).when(jedisSpy).scan(any(byte[].class),
 					any(ScanParams.class));
@@ -177,7 +179,7 @@ public void scanShouldCloseTheConnectionWhenCursorIsClosed() throws IOException
 			Cursor cursor = connection.scan(ScanOptions.NONE);
 			cursor.close();
 
-			verify(jedisSpy, times(1)).quit();
+			verify(jedisSpy, times(1)).disconnect();
 		}
 
 		@Test // DATAREDIS-531
@@ -188,11 +190,11 @@ public void sScanShouldKeepTheConnectionOpen() {
 
 			connection.sScan("foo".getBytes(), ScanOptions.NONE);
 
-			verify(jedisSpy, never()).quit();
+			verify(jedisSpy, never()).disconnect();
 		}
 
 		@Test // DATAREDIS-531
-		public void sScanShouldCloseTheConnectionWhenCursorIsClosed() throws IOException {
+		public void sScanShouldCloseTheConnectionWhenCursorIsClosed() {
 
 			doReturn(new ScanResult<>("0", Collections. emptyList())).when(jedisSpy).sscan(any(byte[].class),
 					any(byte[].class), any(ScanParams.class));
@@ -200,7 +202,7 @@ public void sScanShouldCloseTheConnectionWhenCursorIsClosed() throws IOException
 			Cursor cursor = connection.sScan("foo".getBytes(), ScanOptions.NONE);
 			cursor.close();
 
-			verify(jedisSpy, times(1)).quit();
+			verify(jedisSpy, times(1)).disconnect();
 		}
 
 		@Test // DATAREDIS-531
@@ -211,11 +213,11 @@ public void zScanShouldKeepTheConnectionOpen() {
 
 			connection.zScan("foo".getBytes(), ScanOptions.NONE);
 
-			verify(jedisSpy, never()).quit();
+			verify(jedisSpy, never()).disconnect();
 		}
 
 		@Test // DATAREDIS-531
-		public void zScanShouldCloseTheConnectionWhenCursorIsClosed() throws IOException {
+		public void zScanShouldCloseTheConnectionWhenCursorIsClosed() {
 
 			doReturn(new ScanResult<>("0", Collections. emptyList())).when(jedisSpy).zscan(any(byte[].class),
 					any(byte[].class), any(ScanParams.class));
@@ -223,7 +225,7 @@ public void zScanShouldCloseTheConnectionWhenCursorIsClosed() throws IOException
 			Cursor cursor = connection.zScan("foo".getBytes(), ScanOptions.NONE);
 			cursor.close();
 
-			verify(jedisSpy, times(1)).quit();
+			verify(jedisSpy, times(1)).disconnect();
 		}
 
 		@Test // DATAREDIS-531
@@ -234,19 +236,20 @@ public void hScanShouldKeepTheConnectionOpen() {
 
 			connection.hScan("foo".getBytes(), ScanOptions.NONE);
 
-			verify(jedisSpy, never()).quit();
+			verify(jedisSpy, never()).disconnect();
 		}
 
 		@Test // DATAREDIS-531
-		public void hScanShouldCloseTheConnectionWhenCursorIsClosed() throws IOException {
+		public void hScanShouldCloseTheConnectionWhenCursorIsClosed() {
 
 			doReturn(new ScanResult<>("0", Collections. emptyList())).when(jedisSpy).hscan(any(byte[].class),
 					any(byte[].class), any(ScanParams.class));
 
 			Cursor> cursor = connection.hScan("foo".getBytes(), ScanOptions.NONE);
+
 			cursor.close();
 
-			verify(jedisSpy, times(1)).quit();
+			verify(jedisSpy, times(1)).disconnect();
 		}
 
 		@Test // DATAREDIS-714
@@ -369,7 +372,5 @@ public void hScanShouldCloseTheConnectionWhenCursorIsClosed() {
 			assertThatExceptionOfType(InvalidDataAccessApiUsageException.class)
 					.isThrownBy(() -> super.hScanShouldCloseTheConnectionWhenCursorIsClosed());
 		}
-
 	}
-
 }
diff --git a/src/test/java/org/springframework/data/redis/connection/jedis/JedisConvertersUnitTests.java b/src/test/java/org/springframework/data/redis/connection/jedis/JedisConvertersUnitTests.java
index eb37cf171d..7cbc1b6e92 100644
--- a/src/test/java/org/springframework/data/redis/connection/jedis/JedisConvertersUnitTests.java
+++ b/src/test/java/org/springframework/data/redis/connection/jedis/JedisConvertersUnitTests.java
@@ -16,7 +16,15 @@
 package org.springframework.data.redis.connection.jedis;
 
 import static org.assertj.core.api.Assertions.*;
-
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+import redis.clients.jedis.Protocol;
 import redis.clients.jedis.params.GetExParams;
 import redis.clients.jedis.params.SetParams;
 
@@ -35,12 +43,15 @@
 import org.springframework.data.redis.connection.RedisStringCommands.SetOption;
 import org.springframework.data.redis.core.types.Expiration;
 import org.springframework.data.redis.core.types.RedisClientInfo;
+import org.springframework.lang.Nullable;
+import org.springframework.test.util.ReflectionTestUtils;
 
 /**
  * Unit tests for {@link JedisConverters}.
  *
  * @author Christoph Strobl
  * @author Mark Paluch
+ * @author John Blum
  */
 class JedisConvertersUnitTests {
 
@@ -212,15 +223,31 @@ void toSetCommandExPxOptionShouldReturnEXforMilliseconds() {
 	@Test // GH-2050
 	void convertsExpirationToSetPXAT() {
 
-		assertThat(JedisConverters.toSetCommandExPxArgument(Expiration.unixTimestamp(10, TimeUnit.MILLISECONDS)))
-				.extracting(SetParams::toString).isEqualTo(SetParams.setParams().pxAt(10).toString());
+		SetParams mockSetParams = mock(SetParams.class);
+
+		doReturn(mockSetParams).when(mockSetParams).pxAt(anyLong());
+
+		Expiration expiration = Expiration.unixTimestamp(10, TimeUnit.MILLISECONDS);
+
+		assertThat(JedisConverters.toSetCommandExPxArgument(expiration, mockSetParams)).isNotNull();
+
+		verify(mockSetParams, times(1)).pxAt(eq(10L));
+		verifyNoMoreInteractions(mockSetParams);
 	}
 
 	@Test // GH-2050
 	void convertsExpirationToSetEXAT() {
 
-		assertThat(JedisConverters.toSetCommandExPxArgument(Expiration.unixTimestamp(1, TimeUnit.MINUTES)))
-				.extracting(SetParams::toString).isEqualTo(SetParams.setParams().exAt(60).toString());
+		SetParams mockSetParams = mock(SetParams.class);
+
+		doReturn(mockSetParams).when(mockSetParams).exAt(anyLong());
+
+		Expiration expiration = Expiration.unixTimestamp(1, TimeUnit.MINUTES);
+
+		assertThat(JedisConverters.toSetCommandExPxArgument(expiration, mockSetParams)).isNotNull();
+
+		verify(mockSetParams, times(1)).exAt(eq(60L));
+		verifyNoMoreInteractions(mockSetParams);
 	}
 
 	@Test // DATAREDIS-316, DATAREDIS-749
@@ -241,43 +268,91 @@ void toSetCommandNxXxOptionShouldReturnEmptyArrayforUpsert() {
 	@Test // GH-2050
 	void convertsExpirationToGetExEX() {
 
-		assertThat(JedisConverters.toGetExParams(Expiration.seconds(10))).extracting(GetExParams::toString)
-				.isEqualTo(new GetExParams().ex(10).toString());
+		GetExParams mockGetExParams = mock(GetExParams.class);
+
+		doReturn(mockGetExParams).when(mockGetExParams).ex(anyLong());
+
+		Expiration expiration = Expiration.seconds(10);
+
+		assertThat(JedisConverters.toGetExParams(expiration, mockGetExParams)).isNotNull();
+
+		verify(mockGetExParams, times(1)).ex(eq(10L));
+		verifyNoMoreInteractions(mockGetExParams);
 	}
 
 	@Test // GH-2050
 	void convertsExpirationWithTimeUnitToGetExEX() {
 
-		assertThat(JedisConverters.toGetExParams(Expiration.from(1, TimeUnit.MINUTES))).extracting(GetExParams::toString)
-				.isEqualTo(new GetExParams().ex(60).toString());
+		GetExParams mockGetExParams = mock(GetExParams.class);
+
+		doReturn(mockGetExParams).when(mockGetExParams).ex(anyLong());
+
+		Expiration expiration = Expiration.from(1, TimeUnit.MINUTES);
+
+		assertThat(JedisConverters.toGetExParams(expiration, mockGetExParams)).isNotNull();
+
+		verify(mockGetExParams, times(1)).ex(eq(60L)); // seconds
+		verifyNoMoreInteractions(mockGetExParams);
 	}
 
 	@Test // GH-2050
 	void convertsExpirationToGetExPEX() {
 
-		assertThat(JedisConverters.toGetExParams(Expiration.milliseconds(10))).extracting(GetExParams::toString)
-				.isEqualTo(new GetExParams().px(10).toString());
+		GetExParams mockGetExParams = mock(GetExParams.class);
+
+		doReturn(mockGetExParams).when(mockGetExParams).px(anyLong());
+
+		Expiration expiration = Expiration.milliseconds(10L);
+
+		assertThat(JedisConverters.toGetExParams(expiration, mockGetExParams)).isNotNull();
+
+		verify(mockGetExParams, times(1)).px(eq(10L));
+		verifyNoMoreInteractions(mockGetExParams);
 	}
 
 	@Test // GH-2050
 	void convertsExpirationToGetExEXAT() {
 
-		assertThat(JedisConverters.toGetExParams(Expiration.unixTimestamp(10, TimeUnit.SECONDS)))
-				.extracting(GetExParams::toString).isEqualTo(new GetExParams().exAt(10).toString());
+		GetExParams mockGetExParams = mock(GetExParams.class);
+
+		doReturn(mockGetExParams).when(mockGetExParams).exAt(anyLong());
+
+		Expiration expiration = Expiration.unixTimestamp(10, TimeUnit.SECONDS);
+
+		assertThat(JedisConverters.toGetExParams(expiration, mockGetExParams)).isNotNull();
+
+		verify(mockGetExParams, times(1)).exAt(eq(10L));
+		verifyNoMoreInteractions(mockGetExParams);
 	}
 
 	@Test // GH-2050
 	void convertsExpirationWithTimeUnitToGetExEXAT() {
 
-		assertThat(JedisConverters.toGetExParams(Expiration.unixTimestamp(1, TimeUnit.MINUTES)))
-				.extracting(GetExParams::toString).isEqualTo(new GetExParams().exAt(60).toString());
+		GetExParams mockGetExParams = mock(GetExParams.class);
+
+		doReturn(mockGetExParams).when(mockGetExParams).exAt(anyLong());
+
+		Expiration expiration = Expiration.unixTimestamp(1, TimeUnit.MINUTES);
+
+		assertThat(JedisConverters.toGetExParams(expiration, mockGetExParams)).isNotNull();
+
+		verify(mockGetExParams, times(1)).exAt(eq(60L));
+		verifyNoMoreInteractions(mockGetExParams);
 	}
 
 	@Test // GH-2050
 	void convertsExpirationToGetExPXAT() {
 
-		assertThat(JedisConverters.toGetExParams(Expiration.unixTimestamp(10, TimeUnit.MILLISECONDS)))
-				.extracting(GetExParams::toString).isEqualTo(new GetExParams().pxAt(10).toString());
+		GetExParams mockGetExParams = mock(GetExParams.class);
+
+		doReturn(mockGetExParams).when(mockGetExParams).pxAt(anyLong());
+
+		Expiration expiration = Expiration.unixTimestamp(10, TimeUnit.MILLISECONDS);
+
+		assertThat(JedisConverters.toGetExParams(expiration, mockGetExParams)).isNotNull();
+
+		verify(mockGetExParams, times(1)).pxAt(eq(10L));
+		verifyNoMoreInteractions(mockGetExParams);
 	}
 
 	private void verifyRedisServerInfo(RedisServer server, Map values) {
@@ -289,18 +364,22 @@ private void verifyRedisServerInfo(RedisServer server, Map value
 
 	private static String toString(SetParams setParams) {
 
-		StringBuilder builder = new StringBuilder();
+		StringBuilder stringBuilder = new StringBuilder();
 
-		for (byte[] parameter : setParams.getByteParams()) {
+		stringBuilder.append(toString((Protocol.Keyword) ReflectionTestUtils.getField(setParams, "existance")));
+		stringBuilder.append(toString((Protocol.Keyword) ReflectionTestUtils.getField(setParams, "expiration")));
 
-			if (builder.length() != 0) {
-				builder.append(' ');
-			}
+		Long expirationValue = (Long) ReflectionTestUtils.getField(setParams, "expirationValue");
 
-			builder.append(new String(parameter));
+		if (expirationValue != null) {
+			stringBuilder.append(" ").append(expirationValue);
 		}
 
-		return builder.toString();
+		return stringBuilder.toString().trim();
+	}
+
+	private static String toString(@Nullable Enum value) {
+		return value != null ? value.name().toLowerCase() : "";
 	}
 
 	private Map getRedisServerInfoMap(String name, int port) {