From 461456b6f682833ec81bf32df754e106c44f23e0 Mon Sep 17 00:00:00 2001 From: John Blum Date: Thu, 14 Sep 2023 17:36:31 -0700 Subject: [PATCH] Adapt to API changes in the Jedis 5.0 driver. Closes #2612 --- .../jedis/JedisClusterHashCommands.java | 19 +-- .../jedis/JedisClusterServerCommands.java | 93 +++++++----- .../jedis/JedisClusterStreamCommands.java | 140 ++++++++++-------- .../jedis/JedisClusterZSetCommands.java | 93 ++++++------ .../connection/jedis/JedisConnection.java | 7 +- .../connection/jedis/JedisConverters.java | 80 +++++----- .../connection/jedis/JedisHashCommands.java | 17 ++- .../redis/connection/jedis/JedisInvoker.java | 31 ++-- .../connection/jedis/JedisZSetCommands.java | 34 ++--- .../redis/connection/zset/DefaultTuple.java | 58 ++++---- .../data/redis/connection/zset/Tuple.java | 21 +-- .../data/redis/core/types/Expiration.java | 1 - .../jedis/JedisConnectionUnitTests.java | 31 ++-- .../jedis/JedisConvertersUnitTests.java | 127 +++++++++++++--- ...faultStreamOperationsIntegrationTests.java | 11 +- 15 files changed, 444 insertions(+), 319 deletions(-) diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterHashCommands.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterHashCommands.java index 06bbf2a569..f53c6cbbb6 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterHashCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterHashCommands.java @@ -30,13 +30,15 @@ import org.springframework.data.redis.core.ScanCursor; import org.springframework.data.redis.core.ScanIteration; import org.springframework.data.redis.core.ScanOptions; -import org.springframework.data.util.Streamable; import org.springframework.lang.Nullable; import org.springframework.util.Assert; /** + * Cluster {@link RedisHashCommands} implementation for Jedis. + * * @author Christoph Strobl * @author Mark Paluch + * @author John Blum * @since 2.0 */ class JedisClusterHashCommands implements RedisHashCommands { @@ -160,10 +162,10 @@ public Entry hRandFieldWithValues(byte[] key) { Assert.notNull(key, "Key must not be null"); try { - Map map = connection.getCluster().hrandfieldWithValues(key, 1); - return map.isEmpty() ? null : map.entrySet().iterator().next(); - } catch (Exception ex) { - throw convertJedisAccessException(ex); + List> mapEntryList = connection.getCluster().hrandfieldWithValues(key, 1); + return mapEntryList.isEmpty() ? null : mapEntryList.get(0); + } catch (Exception cause) { + throw convertJedisAccessException(cause); } } @@ -185,10 +187,9 @@ public List hRandField(byte[] key, long count) { public List> hRandFieldWithValues(byte[] key, long count) { try { - Map map = connection.getCluster().hrandfieldWithValues(key, count); - return Streamable.of(() -> map.entrySet().iterator()).toList(); - } catch (Exception ex) { - throw convertJedisAccessException(ex); + return connection.getCluster().hrandfieldWithValues(key, count); + } catch (Exception cause) { + throw convertJedisAccessException(cause); } } diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterServerCommands.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterServerCommands.java index 43b83bf874..0acc10af59 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterServerCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterServerCommands.java @@ -15,12 +15,11 @@ */ package org.springframework.data.redis.connection.jedis; -import redis.clients.jedis.Jedis; - import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Map.Entry; import java.util.Properties; import java.util.concurrent.TimeUnit; @@ -38,9 +37,14 @@ import org.springframework.util.Assert; import org.springframework.util.CollectionUtils; +import redis.clients.jedis.Jedis; + /** + * {@link RedisClusterServerCommands} implementation for Jedis. + * * @author Mark Paluch * @author Dennis Neufeld + * @author John Blum * @since 2.0 */ class JedisClusterServerCommands implements RedisClusterServerCommands { @@ -82,7 +86,8 @@ public Long lastSave() { return null; } - Collections.sort(result, Collections.reverseOrder()); + result.sort(Collections.reverseOrder()); + return result.get(0); } @@ -221,7 +226,7 @@ public Properties info(RedisClusterNode node, String section) { public void shutdown() { connection.getClusterCommandExecutor().executeCommandOnAllNodes((JedisClusterCommandCallback) jedis -> { jedis.shutdown(); - return null; + return "success"; }); } @@ -229,12 +234,12 @@ public void shutdown() { public void shutdown(RedisClusterNode node) { executeCommandOnSingleNode(jedis -> { jedis.shutdown(); - return null; + return "success"; }, node); } @Override - public void shutdown(ShutdownOption option) { + public void shutdown(@Nullable ShutdownOption option) { if (option == null) { shutdown(); @@ -249,21 +254,26 @@ public Properties getConfig(String pattern) { Assert.notNull(pattern, "Pattern must not be null"); - List>> mapResult = connection.getClusterCommandExecutor() - .executeCommandOnAllNodes((JedisClusterCommandCallback>) client -> client.configGet(pattern)) + JedisClusterCommandCallback> command = jedis -> jedis.configGet(pattern); + + List>> nodeResults = connection.getClusterCommandExecutor() + .executeCommandOnAllNodes(command) .getResults(); - List result = new ArrayList<>(); - for (NodeResult> entry : mapResult) { + Properties nodesConfiguration = new Properties(); + + for (NodeResult> nodeResult : nodeResults) { + + String prefix = nodeResult.getNode().asString(); - String prefix = entry.getNode().asString(); - int i = 0; - for (String value : entry.getValue()) { - result.add((i++ % 2 == 0 ? (prefix + ".") : "") + value); + for (Entry entry : nodeResult.getValue().entrySet()) { + String newKey = prefix.concat(".").concat(entry.getKey()); + String value = entry.getValue(); + nodesConfiguration.setProperty(newKey, value); } } - return Converters.toProperties(result); + return nodesConfiguration; } @Override @@ -271,10 +281,10 @@ public Properties getConfig(RedisClusterNode node, String pattern) { Assert.notNull(pattern, "Pattern must not be null"); + JedisClusterCommandCallback command = client -> Converters.toProperties(client.configGet(pattern)); + return connection.getClusterCommandExecutor() - .executeCommandOnSingleNode( - (JedisClusterCommandCallback) client -> Converters.toProperties(client.configGet(pattern)), - node) + .executeCommandOnSingleNode(command, node) .getValue(); } @@ -322,19 +332,19 @@ public void rewriteConfig(RedisClusterNode node) { @Override public Long time(TimeUnit timeUnit) { - return convertListOfStringToTime( - connection.getClusterCommandExecutor() - .executeCommandOnArbitraryNode((JedisClusterCommandCallback>) Jedis::time).getValue(), - timeUnit); + JedisClusterCommandCallback> command = Jedis::time; + + return convertListOfStringToTime(connection.getClusterCommandExecutor() + .executeCommandOnArbitraryNode(command).getValue(), timeUnit); } @Override public Long time(RedisClusterNode node, TimeUnit timeUnit) { - return convertListOfStringToTime( - connection.getClusterCommandExecutor() - .executeCommandOnSingleNode((JedisClusterCommandCallback>) Jedis::time, node).getValue(), - timeUnit); + JedisClusterCommandCallback> command = Jedis::time; + + return convertListOfStringToTime(connection.getClusterCommandExecutor() + .executeCommandOnSingleNode(command, node).getValue(), timeUnit); } @Override @@ -343,8 +353,9 @@ public void killClient(String host, int port) { Assert.hasText(host, "Host for 'CLIENT KILL' must not be 'null' or 'empty'"); String hostAndPort = String.format("%s:%s", host, port); - connection.getClusterCommandExecutor() - .executeCommandOnAllNodes((JedisClusterCommandCallback) client -> client.clientKill(hostAndPort)); + JedisClusterCommandCallback command = jedis -> jedis.clientKill(hostAndPort); + + connection.getClusterCommandExecutor().executeCommandOnAllNodes(command); } @Override @@ -360,21 +371,26 @@ public String getClientName() { @Override public List getClientList() { + JedisClusterCommandCallback command = Jedis::clientList; + Collection map = connection.getClusterCommandExecutor() - .executeCommandOnAllNodes((JedisClusterCommandCallback) Jedis::clientList).resultsAsList(); + .executeCommandOnAllNodes(command).resultsAsList(); ArrayList result = new ArrayList<>(); + for (String infos : map) { result.addAll(JedisConverters.toListOfRedisClientInformation(infos)); } + return result; } @Override public List getClientList(RedisClusterNode node) { - return JedisConverters - .toListOfRedisClientInformation(executeCommandOnSingleNode(Jedis::clientList, node).getValue()); + JedisClusterCommandCallback command = Jedis::clientList; + + return JedisConverters.toListOfRedisClientInformation(executeCommandOnSingleNode(command, node).getValue()); } @Override @@ -403,8 +419,10 @@ public void migrate(byte[] key, RedisNode target, int dbIndex, @Nullable Migrate RedisClusterNode node = connection.getTopologyProvider().getTopology().lookup(target.getHost(), target.getPort()); - executeCommandOnSingleNode(client -> client.migrate(target.getHost(), target.getPort(), key, dbIndex, timeoutToUse), - node); + JedisClusterCommandCallback command = jedis -> + jedis.migrate(target.getHost(), target.getPort(), key, dbIndex, timeoutToUse); + + executeCommandOnSingleNode(command, node); } private Long convertListOfStringToTime(List serverTimeInformation, TimeUnit timeUnit) { @@ -416,12 +434,11 @@ private Long convertListOfStringToTime(List serverTimeInformation, TimeU return Converters.toTimeMillis(serverTimeInformation.get(0), serverTimeInformation.get(1), timeUnit); } - private NodeResult executeCommandOnSingleNode(JedisClusterCommandCallback cmd, RedisClusterNode node) { - return connection.getClusterCommandExecutor().executeCommandOnSingleNode(cmd, node); + private NodeResult executeCommandOnSingleNode(JedisClusterCommandCallback command, RedisClusterNode node) { + return connection.getClusterCommandExecutor().executeCommandOnSingleNode(command, node); } - private MultiNodeResult executeCommandOnAllNodes(JedisClusterCommandCallback cmd) { - return connection.getClusterCommandExecutor().executeCommandOnAllNodes(cmd); + private MultiNodeResult executeCommandOnAllNodes(JedisClusterCommandCallback command) { + return connection.getClusterCommandExecutor().executeCommandOnAllNodes(command); } - } diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterStreamCommands.java index 11223cb2d5..2e860cbcd1 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterStreamCommands.java @@ -15,11 +15,11 @@ */ package org.springframework.data.redis.connection.jedis; -import static org.springframework.data.redis.connection.jedis.StreamConverters.*; - import redis.clients.jedis.BuilderFactory; +import redis.clients.jedis.StreamEntryID; import redis.clients.jedis.params.XAddParams; import redis.clients.jedis.params.XClaimParams; +import redis.clients.jedis.params.XPendingParams; import redis.clients.jedis.params.XReadGroupParams; import redis.clients.jedis.params.XReadParams; @@ -43,9 +43,13 @@ import org.springframework.data.redis.connection.stream.StreamOffset; import org.springframework.data.redis.connection.stream.StreamReadOptions; import org.springframework.util.Assert; +import org.springframework.util.StringUtils; /** + * {@link RedisStreamCommands} implementation for Jedis. + * * @author Dengliming + * @author John Blum * @since 2.3 */ class JedisClusterStreamCommands implements RedisStreamCommands { @@ -65,9 +69,9 @@ public Long xAck(byte[] key, String group, RecordId... recordIds) { try { return connection.getCluster().xack(key, JedisConverters.toBytes(group), - entryIdsToBytes(Arrays.asList(recordIds))); - } catch (Exception ex) { - throw convertJedisAccessException(ex); + StreamConverters.entryIdsToBytes(Arrays.asList(recordIds))); + } catch (Exception cause) { + throw convertJedisAccessException(cause); } } @@ -80,10 +84,10 @@ public RecordId xAdd(MapRecord record, XAddOptions optio XAddParams params = StreamConverters.toXAddParams(record.getId(), options); try { - return RecordId - .of(JedisConverters.toString(connection.getCluster().xadd(record.getStream(), record.getValue(), params))); - } catch (Exception ex) { - throw convertJedisAccessException(ex); + return RecordId.of(JedisConverters.toString(connection.getCluster() + .xadd(record.getStream(), record.getValue(), params))); + } catch (Exception cause) { + throw convertJedisAccessException(cause); } } @@ -97,17 +101,21 @@ public List xClaimJustId(byte[] key, String group, String newOwner, XC long minIdleTime = options.getMinIdleTime() == null ? -1L : options.getMinIdleTime().toMillis(); XClaimParams xClaimParams = StreamConverters.toXClaimParams(options); + try { List ids = connection.getCluster().xclaimJustId(key, JedisConverters.toBytes(group), - JedisConverters.toBytes(newOwner), minIdleTime, xClaimParams, entryIdsToBytes(options.getIds())); + JedisConverters.toBytes(newOwner), minIdleTime, xClaimParams, + StreamConverters.entryIdsToBytes(options.getIds())); List recordIds = new ArrayList<>(ids.size()); + ids.forEach(it -> recordIds.add(RecordId.of(JedisConverters.toString(it)))); return recordIds; - } catch (Exception ex) { - throw convertJedisAccessException(ex); + + } catch (Exception cause) { + throw convertJedisAccessException(cause); } } @@ -121,11 +129,13 @@ public List xClaim(byte[] key, String group, String newOwner, XClaim long minIdleTime = options.getMinIdleTime() == null ? -1L : options.getMinIdleTime().toMillis(); XClaimParams xClaimParams = StreamConverters.toXClaimParams(options); + try { - return convertToByteRecord(key, connection.getCluster().xclaim(key, JedisConverters.toBytes(group), - JedisConverters.toBytes(newOwner), minIdleTime, xClaimParams, entryIdsToBytes(options.getIds()))); - } catch (Exception ex) { - throw convertJedisAccessException(ex); + return StreamConverters.convertToByteRecord(key, connection.getCluster().xclaim(key, + JedisConverters.toBytes(group), JedisConverters.toBytes(newOwner), minIdleTime, xClaimParams, + StreamConverters.entryIdsToBytes(options.getIds()))); + } catch (Exception cause) { + throw convertJedisAccessException(cause); } } @@ -136,9 +146,9 @@ public Long xDel(byte[] key, RecordId... recordIds) { Assert.notNull(recordIds, "recordIds must not be null"); try { - return connection.getCluster().xdel(key, entryIdsToBytes(Arrays.asList(recordIds))); - } catch (Exception ex) { - throw convertJedisAccessException(ex); + return connection.getCluster().xdel(key, StreamConverters.entryIdsToBytes(Arrays.asList(recordIds))); + } catch (Exception cause) { + throw convertJedisAccessException(cause); } } @@ -157,8 +167,8 @@ public String xGroupCreate(byte[] key, String groupName, ReadOffset readOffset, try { return connection.getCluster().xgroupCreate(key, JedisConverters.toBytes(groupName), JedisConverters.toBytes(readOffset.getOffset()), mkStream); - } catch (Exception ex) { - throw convertJedisAccessException(ex); + } catch (Exception cause) { + throw convertJedisAccessException(cause); } } @@ -171,8 +181,8 @@ public Boolean xGroupDelConsumer(byte[] key, Consumer consumer) { try { return connection.getCluster().xgroupDelConsumer(key, JedisConverters.toBytes(consumer.getGroup()), JedisConverters.toBytes(consumer.getName())) != 0L; - } catch (Exception ex) { - throw convertJedisAccessException(ex); + } catch (Exception cause) { + throw convertJedisAccessException(cause); } } @@ -184,8 +194,8 @@ public Boolean xGroupDestroy(byte[] key, String groupName) { try { return connection.getCluster().xgroupDestroy(key, JedisConverters.toBytes(groupName)) != 0L; - } catch (Exception ex) { - throw convertJedisAccessException(ex); + } catch (Exception cause) { + throw convertJedisAccessException(cause); } } @@ -196,8 +206,8 @@ public StreamInfo.XInfoStream xInfo(byte[] key) { try { return StreamInfo.XInfoStream.fromList((List) connection.getCluster().xinfoStream(key)); - } catch (Exception ex) { - throw convertJedisAccessException(ex); + } catch (Exception cause) { + throw convertJedisAccessException(cause); } } @@ -208,8 +218,8 @@ public StreamInfo.XInfoGroups xInfoGroups(byte[] key) { try { return StreamInfo.XInfoGroups.fromList(connection.getCluster().xinfoGroups(key)); - } catch (Exception ex) { - throw convertJedisAccessException(ex); + } catch (Exception cause) { + throw convertJedisAccessException(cause); } } @@ -222,8 +232,8 @@ public StreamInfo.XInfoConsumers xInfoConsumers(byte[] key, String groupName) { try { return StreamInfo.XInfoConsumers.fromList(groupName, connection.getCluster().xinfoConsumers(key, JedisConverters.toBytes(groupName))); - } catch (Exception ex) { - throw convertJedisAccessException(ex); + } catch (Exception cause) { + throw convertJedisAccessException(cause); } } @@ -234,8 +244,8 @@ public Long xLen(byte[] key) { try { return connection.getCluster().xlen(key); - } catch (Exception ex) { - throw convertJedisAccessException(ex); + } catch (Exception cause) { + throw convertJedisAccessException(cause); } } @@ -252,13 +262,14 @@ public PendingMessagesSummary xPending(byte[] key, String groupName) { Object response = connection.getCluster().xpending(key, group); return StreamConverters.toPendingMessagesSummary(groupName, response); - } catch (Exception ex) { - throw convertJedisAccessException(ex); + } catch (Exception cause) { + throw convertJedisAccessException(cause); } } @Override + @SuppressWarnings("unchecked") public PendingMessages xPending(byte[] key, String groupName, XPendingOptions options) { Assert.notNull(key, "Key must not be null"); @@ -269,14 +280,24 @@ public PendingMessages xPending(byte[] key, String groupName, XPendingOptions op try { - List response = connection.getCluster().xpending(key, group, - JedisConverters.toBytes(getLowerValue(range)), JedisConverters.toBytes(getUpperValue(range)), - options.getCount().intValue(), JedisConverters.toBytes(options.getConsumerName())); + XPendingParams pendingParams = new XPendingParams( + JedisConverters.toBytes(StreamConverters.getLowerValue(range)), + JedisConverters.toBytes(StreamConverters.getUpperValue(range)), + options.getCount().intValue()); + + String consumerName = options.getConsumerName(); + + if (StringUtils.hasText(consumerName)) { + pendingParams = pendingParams.consumer(consumerName); + } + + List response = connection.getCluster().xpending(key, group, pendingParams); return StreamConverters.toPendingMessages(groupName, range, BuilderFactory.STREAM_PENDING_ENTRY_LIST.build(response)); - } catch (Exception ex) { - throw convertJedisAccessException(ex); + + } catch (Exception cause) { + throw convertJedisAccessException(cause); } } @@ -290,10 +311,11 @@ public List xRange(byte[] key, Range range, Limit limit) { int count = limit.isUnlimited() ? Integer.MAX_VALUE : limit.getCount(); try { - return convertToByteRecord(key, connection.getCluster().xrange(key, JedisConverters.toBytes(getLowerValue(range)), - JedisConverters.toBytes(getUpperValue(range)), count)); - } catch (Exception ex) { - throw convertJedisAccessException(ex); + return StreamConverters.convertToByteRecord(key, connection.getCluster().xrange(key, + JedisConverters.toBytes(StreamConverters.getLowerValue(range)), + JedisConverters.toBytes(StreamConverters.getUpperValue(range)), count)); + } catch (Exception cause) { + throw convertJedisAccessException(cause); } } @@ -307,15 +329,15 @@ public List xRead(StreamReadOptions readOptions, StreamOffset xread = connection.getCluster().xread(xReadParams, toStreamOffsets(streams)); + List xread = connection.getCluster().xread(xReadParams, StreamConverters.toStreamOffsets(streams)); if (xread == null) { return Collections.emptyList(); } return StreamConverters.convertToByteRecords(xread); - } catch (Exception ex) { - throw convertJedisAccessException(ex); + } catch (Exception cause) { + throw convertJedisAccessException(cause); } } @@ -332,15 +354,15 @@ public List xReadGroup(Consumer consumer, StreamReadOptions readOpti try { List xread = connection.getCluster().xreadGroup(JedisConverters.toBytes(consumer.getGroup()), - JedisConverters.toBytes(consumer.getName()), xReadParams, toStreamOffsets(streams)); + JedisConverters.toBytes(consumer.getName()), xReadParams, StreamConverters.toStreamOffsets(streams)); if (xread == null) { return Collections.emptyList(); } return StreamConverters.convertToByteRecords(xread); - } catch (Exception ex) { - throw convertJedisAccessException(ex); + } catch (Exception cause) { + throw convertJedisAccessException(cause); } } @@ -354,10 +376,11 @@ public List xRevRange(byte[] key, Range range, Limit limit) int count = limit.isUnlimited() ? Integer.MAX_VALUE : limit.getCount(); try { - return convertToByteRecord(key, connection.getCluster().xrevrange(key, - JedisConverters.toBytes(getUpperValue(range)), JedisConverters.toBytes(getLowerValue(range)), count)); - } catch (Exception ex) { - throw convertJedisAccessException(ex); + return StreamConverters.convertToByteRecord(key, connection.getCluster().xrevrange(key, + JedisConverters.toBytes(StreamConverters.getUpperValue(range)), + JedisConverters.toBytes(StreamConverters.getLowerValue(range)), count)); + } catch (Exception cause) { + throw convertJedisAccessException(cause); } } @@ -373,13 +396,12 @@ public Long xTrim(byte[] key, long count, boolean approximateTrimming) { try { return connection.getCluster().xtrim(key, count, approximateTrimming); - } catch (Exception ex) { - throw convertJedisAccessException(ex); + } catch (Exception cause) { + throw convertJedisAccessException(cause); } } - private DataAccessException convertJedisAccessException(Exception ex) { - return connection.convertJedisAccessException(ex); + private DataAccessException convertJedisAccessException(Exception cause) { + return connection.convertJedisAccessException(cause); } - } diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterZSetCommands.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterZSetCommands.java index 4d0923e43d..a63acfedca 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterZSetCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterZSetCommands.java @@ -20,6 +20,7 @@ import redis.clients.jedis.params.ZParams; import redis.clients.jedis.params.ZRangeParams; import redis.clients.jedis.resps.ScanResult; +import redis.clients.jedis.util.KeyValue; import java.util.ArrayList; import java.util.LinkedHashSet; @@ -34,7 +35,6 @@ import org.springframework.data.redis.connection.RedisZSetCommands; import org.springframework.data.redis.connection.convert.SetConverter; import org.springframework.data.redis.connection.zset.Aggregate; -import org.springframework.data.redis.connection.zset.DefaultTuple; import org.springframework.data.redis.connection.zset.Tuple; import org.springframework.data.redis.connection.zset.Weights; import org.springframework.data.redis.core.Cursor; @@ -46,18 +46,22 @@ import org.springframework.util.Assert; /** + * Cluster {@link RedisZSetCommands} implementation for Jedis. + * * @author Christoph Strobl * @author Mark Paluch * @author Clement Ong * @author Andrey Shlykov * @author Jens Deppe * @author Shyngys Sapraliyev + * @author John Blum * @since 2.0 */ class JedisClusterZSetCommands implements RedisZSetCommands { - private static final SetConverter TUPLE_SET_CONVERTER = new SetConverter<>( - JedisConverters::toTuple); + private static final SetConverter TUPLE_SET_CONVERTER = + new SetConverter<>(JedisConverters::toTuple); + private final JedisClusterConnection connection; JedisClusterZSetCommands(JedisClusterConnection connection) { @@ -349,8 +353,8 @@ public Tuple bZPopMin(byte[] key, long timeout, TimeUnit unit) { try { return toTuple(connection.getCluster().bzpopmin(JedisConverters.toSeconds(timeout, unit), key)); - } catch (Exception ex) { - throw convertJedisAccessException(ex); + } catch (Exception cause) { + throw convertJedisAccessException(cause); } } @@ -816,11 +820,10 @@ public Set zDiff(byte[]... sets) { Assert.notNull(sets, "Sets must not be null"); if (ClusterSlotHashUtil.isSameSlotForAllKeys(sets)) { - try { - return connection.getCluster().zdiff(sets); - } catch (Exception ex) { - throw convertJedisAccessException(ex); + return JedisConverters.toSet(connection.getCluster().zdiff(sets)); + } catch (Exception cause) { + throw convertJedisAccessException(cause); } } @@ -833,11 +836,10 @@ public Set zDiffWithScores(byte[]... sets) { Assert.notNull(sets, "Sets must not be null"); if (ClusterSlotHashUtil.isSameSlotForAllKeys(sets)) { - try { - return JedisConverters.toTupleSet(connection.getCluster().zdiffWithScores(sets)); - } catch (Exception ex) { - throw convertJedisAccessException(ex); + return JedisConverters.toSet(JedisConverters.toTupleList(connection.getCluster().zdiffWithScores(sets))); + } catch (Exception cause) { + throw convertJedisAccessException(cause); } } @@ -870,11 +872,10 @@ public Set zInter(byte[]... sets) { Assert.notNull(sets, "Sets must not be null"); if (ClusterSlotHashUtil.isSameSlotForAllKeys(sets)) { - try { - return connection.getCluster().zinter(new ZParams(), sets); - } catch (Exception ex) { - throw convertJedisAccessException(ex); + return JedisConverters.toSet(connection.getCluster().zinter(new ZParams(), sets)); + } catch (Exception cause) { + throw convertJedisAccessException(cause); } } @@ -887,11 +888,11 @@ public Set zInterWithScores(byte[]... sets) { Assert.notNull(sets, "Sets must not be null"); if (ClusterSlotHashUtil.isSameSlotForAllKeys(sets)) { - try { - return JedisConverters.toTupleSet(connection.getCluster().zinterWithScores(new ZParams(), sets)); - } catch (Exception ex) { - throw convertJedisAccessException(ex); + return JedisConverters.toSet(JedisConverters.toTupleList(connection.getCluster() + .zinterWithScores(new ZParams(), sets))); + } catch (Exception cause) { + throw convertJedisAccessException(cause); } } @@ -907,12 +908,11 @@ public Set zInterWithScores(Aggregate aggregate, Weights weights, byte[]. .format("The number of weights (%d) must match the number of source sets (%d)", weights.size(), sets.length)); if (ClusterSlotHashUtil.isSameSlotForAllKeys(sets)) { - try { - return JedisConverters - .toTupleSet(connection.getCluster().zinterWithScores(toZParams(aggregate, weights), sets)); - } catch (Exception ex) { - throw convertJedisAccessException(ex); + return JedisConverters.toSet(JedisConverters.toTupleList(connection.getCluster() + .zinterWithScores(toZParams(aggregate, weights), sets))); + } catch (Exception cause) { + throw convertJedisAccessException(cause); } } @@ -969,11 +969,10 @@ public Set zUnion(byte[]... sets) { Assert.notNull(sets, "Sets must not be null"); if (ClusterSlotHashUtil.isSameSlotForAllKeys(sets)) { - try { - return connection.getCluster().zunion(new ZParams(), sets); - } catch (Exception ex) { - throw convertJedisAccessException(ex); + return JedisConverters.toSet(connection.getCluster().zunion(new ZParams(), sets)); + } catch (Exception cause) { + throw convertJedisAccessException(cause); } } @@ -986,11 +985,11 @@ public Set zUnionWithScores(byte[]... sets) { Assert.notNull(sets, "Sets must not be null"); if (ClusterSlotHashUtil.isSameSlotForAllKeys(sets)) { - try { - return JedisConverters.toTupleSet(connection.getCluster().zunionWithScores(new ZParams(), sets)); - } catch (Exception ex) { - throw convertJedisAccessException(ex); + return JedisConverters.toSet(JedisConverters.toTupleList(connection.getCluster() + .zunionWithScores(new ZParams(), sets))); + } catch (Exception cause) { + throw convertJedisAccessException(cause); } } @@ -1006,12 +1005,11 @@ public Set zUnionWithScores(Aggregate aggregate, Weights weights, byte[]. .format("The number of weights (%d) must match the number of source sets (%d)", weights.size(), sets.length)); if (ClusterSlotHashUtil.isSameSlotForAllKeys(sets)) { - try { - return JedisConverters - .toTupleSet(connection.getCluster().zunionWithScores(toZParams(aggregate, weights), sets)); - } catch (Exception ex) { - throw convertJedisAccessException(ex); + return JedisConverters.toSet(JedisConverters.toTupleList(connection.getCluster() + .zunionWithScores(toZParams(aggregate, weights), sets))); + } catch (Exception cause) { + throw convertJedisAccessException(cause); } } @@ -1126,21 +1124,14 @@ private static ZParams toZParams(Aggregate aggregate, Weights weights) { return new ZParams().weights(weights.toArray()).aggregate(ZParams.Aggregate.valueOf(aggregate.name())); } - /** - * Workaround for broken Jedis BZPOP signature. - * - * @param bytes - * @return - */ @Nullable - @SuppressWarnings("unchecked") - private static Tuple toTuple(@Nullable List bytes) { + private static Tuple toTuple(@Nullable KeyValue keyValue) { - if (bytes == null || bytes.isEmpty()) { - return null; + if (keyValue != null) { + redis.clients.jedis.resps.Tuple tuple = keyValue.getValue(); + return tuple != null ? JedisConverters.toTuple(tuple) : null; } - return new DefaultTuple((byte[]) bytes.get(1), Double.parseDouble(new String((byte[]) bytes.get(2)))); + return null; } - } diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisConnection.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisConnection.java index eaa1b5ba4e..a474bbb8fe 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisConnection.java @@ -59,9 +59,6 @@ import org.springframework.util.Assert; import org.springframework.util.CollectionUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - import redis.clients.jedis.BuilderFactory; import redis.clients.jedis.CommandArguments; import redis.clients.jedis.CommandObject; @@ -130,6 +127,7 @@ public class JedisConnection extends AbstractRedisConnection { private final Log LOGGER = LogFactory.getLog(getClass()); + @SuppressWarnings("rawtypes") private List pipelinedResults = new ArrayList<>(); private final @Nullable Pool pool; @@ -348,7 +346,6 @@ public void close() throws DataAccessException { jedis.close(); } else { - doExceptionThrowingOperationSafely(jedis::quit, "Failed to quit during close"); doExceptionThrowingOperationSafely(jedis::disconnect, "Failed to disconnect during close"); } } @@ -480,6 +477,7 @@ public void discard() { public List exec() { try { + if (transaction == null) { throw new InvalidDataAccessApiUsageException("No ongoing transaction; Did you forget to call multi"); } @@ -489,6 +487,7 @@ public List exec() { return !CollectionUtils.isEmpty(results) ? new TransactionResultConverter<>(txResults, JedisExceptionConverter.INSTANCE).convert(results) : results; + } catch (Exception cause) { throw convertJedisAccessException(cause); } finally { diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisConverters.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisConverters.java index 88b8a5eb4d..705621ac0c 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisConverters.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisConverters.java @@ -15,26 +15,12 @@ */ package org.springframework.data.redis.connection.jedis; -import redis.clients.jedis.GeoCoordinate; -import redis.clients.jedis.args.BitOP; -import redis.clients.jedis.args.FlushMode; -import redis.clients.jedis.args.GeoUnit; -import redis.clients.jedis.args.ListPosition; -import redis.clients.jedis.params.GeoRadiusParam; -import redis.clients.jedis.params.GeoSearchParam; -import redis.clients.jedis.params.GetExParams; -import redis.clients.jedis.params.ScanParams; -import redis.clients.jedis.params.SetParams; -import redis.clients.jedis.params.SortingParams; -import redis.clients.jedis.params.ZAddParams; -import redis.clients.jedis.resps.GeoRadiusResponse; -import redis.clients.jedis.util.SafeEncoder; - import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -87,6 +73,21 @@ import org.springframework.util.ObjectUtils; import org.springframework.util.StringUtils; +import redis.clients.jedis.GeoCoordinate; +import redis.clients.jedis.args.BitOP; +import redis.clients.jedis.args.FlushMode; +import redis.clients.jedis.args.GeoUnit; +import redis.clients.jedis.args.ListPosition; +import redis.clients.jedis.params.GeoRadiusParam; +import redis.clients.jedis.params.GeoSearchParam; +import redis.clients.jedis.params.GetExParams; +import redis.clients.jedis.params.ScanParams; +import redis.clients.jedis.params.SetParams; +import redis.clients.jedis.params.SortingParams; +import redis.clients.jedis.params.ZAddParams; +import redis.clients.jedis.resps.GeoRadiusResponse; +import redis.clients.jedis.util.SafeEncoder; + /** * Jedis type converters. * @@ -114,12 +115,22 @@ abstract class JedisConverters extends Converters { MINUS_BYTES = toBytes("-"); POSITIVE_INFINITY_BYTES = toBytes("+inf"); NEGATIVE_INFINITY_BYTES = toBytes("-inf"); + } - public static Converter stringToBytes() { + @Nullable + static Set toSet(@Nullable List list) { + return list != null ? new LinkedHashSet<>(list) : null; + } + + static Converter stringToBytes() { return JedisConverters::toBytes; } + static ListConverter stringListToByteList() { + return new ListConverter<>(stringToBytes()); + } + /** * {@link ListConverter} converting jedis {@link redis.clients.jedis.resps.Tuple} to {@link Tuple}. * @@ -129,16 +140,16 @@ static ListConverter tuplesToTuples() { return new ListConverter<>(JedisConverters::toTuple); } - static ListConverter stringListToByteList() { - return new ListConverter<>(stringToBytes()); + static Tuple toTuple(redis.clients.jedis.resps.Tuple source) { + return new DefaultTuple(source.getBinaryElement(), source.getScore()); } - static Set toTupleSet(Set source) { - return new SetConverter<>(JedisConverters::toTuple).convert(source); + static List toTupleList(List source) { + return tuplesToTuples().convert(source); } - public static Tuple toTuple(redis.clients.jedis.resps.Tuple source) { - return new DefaultTuple(source.getBinaryElement(), source.getScore()); + static Set toTupleSet(Set source) { + return new SetConverter<>(JedisConverters::toTuple).convert(source); } /** @@ -255,6 +266,7 @@ public static byte[][] toByteArrays(Map source) { public static SortingParams toSortingParams(@Nullable SortParameters params) { SortingParams jedisParams = null; + if (params != null) { jedisParams = new SortingParams(); byte[] byPattern = params.getByPattern(); @@ -278,6 +290,7 @@ public static SortingParams toSortingParams(@Nullable SortParameters params) { jedisParams.alpha(); } } + return jedisParams; } @@ -386,8 +399,10 @@ public static SetParams toSetCommandExPxArgument(Expiration expiration, SetParam * @since 2.6 */ static GetExParams toGetExParams(Expiration expiration) { + return toGetExParams(expiration, new GetExParams()); + } - GetExParams params = new GetExParams(); + static GetExParams toGetExParams(Expiration expiration, GetExParams params) { if (expiration.isPersistent()) { return params.persist(); @@ -584,18 +599,14 @@ static ZAddParams toZAddParams(ZAddArgs source) { return new ZAddParams(); } - ZAddParams target = new ZAddParams() { - - { - if (source.contains(ZAddArgs.Flag.GT)) { - addParam("gt"); - } - if (source.contains(ZAddArgs.Flag.LT)) { - addParam("lt"); - } - } - }; + ZAddParams target = new ZAddParams(); + if (source.contains(ZAddArgs.Flag.GT)) { + target.gt(); + } + if (source.contains(ZAddArgs.Flag.LT)) { + target.lt(); + } if (source.contains(ZAddArgs.Flag.XX)) { target.xx(); } @@ -605,6 +616,7 @@ static ZAddParams toZAddParams(ZAddArgs source) { if (source.contains(ZAddArgs.Flag.CH)) { target.ch(); } + return target; } diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisHashCommands.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisHashCommands.java index 4efaf0c85c..419d19d3f8 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisHashCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisHashCommands.java @@ -37,8 +37,11 @@ import org.springframework.util.Assert; /** + * {@link RedisHashCommands} implementation for Jedis. + * * @author Christoph Strobl * @author Mark Paluch + * @author John Blum * @since 2.0 */ class JedisHashCommands implements RedisHashCommands { @@ -122,7 +125,7 @@ public Entry hRandFieldWithValues(byte[] key) { Assert.notNull(key, "Key must not be null"); return connection.invoke().from(Jedis::hrandfieldWithValues, PipelineBinaryCommands::hrandfieldWithValues, key, 1L) - .get(it -> it.isEmpty() ? null : it.entrySet().iterator().next()); + .get(mapEntryList -> mapEntryList.isEmpty() ? null : mapEntryList.get(0)); } @Nullable @@ -141,12 +144,16 @@ public List> hRandFieldWithValues(byte[] key, long count) Assert.notNull(key, "Key must not be null"); return connection.invoke() - .from(Jedis::hrandfieldWithValues, PipelineBinaryCommands::hrandfieldWithValues, key, count).get(it -> { + .from(Jedis::hrandfieldWithValues, PipelineBinaryCommands::hrandfieldWithValues, key, count) + .get(mapEntryList -> { + + List> convertedMapEntryList = new ArrayList<>(mapEntryList.size()); + + mapEntryList.forEach(entry -> + convertedMapEntryList.add(Converters.entryOf(entry.getKey(), entry.getValue()))); - List> entries = new ArrayList<>(it.size()); - it.forEach((k, v) -> entries.add(Converters.entryOf(k, v))); + return convertedMapEntryList; - return entries; }); } diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisInvoker.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisInvoker.java index 805efabe4b..ef1771969d 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisInvoker.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisInvoker.java @@ -17,7 +17,6 @@ import redis.clients.jedis.Jedis; import redis.clients.jedis.Pipeline; -import redis.clients.jedis.Queable; import redis.clients.jedis.Response; import redis.clients.jedis.Transaction; import redis.clients.jedis.commands.DatabasePipelineCommands; @@ -47,7 +46,7 @@ * composing a functional pipeline to transform the result using a {@link Converter}. *

* Usage example: - * + *

*

  * JedisInvoker invoker = …;
  *
@@ -62,6 +61,7 @@
  *
  * @author Mark Paluch
  * @author Christoph Strobl
+ * @author John Blum
  * @since 2.5
  */
 class JedisInvoker {
@@ -937,10 +937,7 @@ static class DefaultSingleInvocationSpec implements SingleInvocationSpec {
 
 		@Override
 		public  T get(Converter converter) {
-
-			Assert.notNull(converter, "Converter must not be null");
-
-			return synchronizer.invoke(parentFunction, parentPipelineFunction, converter, () -> null);
+			return getOrElse(converter, () -> null);
 		}
 
 		@Nullable
@@ -959,6 +956,7 @@ static class DefaultManyInvocationSpec implements ManyInvocationSpec {
 		private final Function>> parentPipelineFunction;
 		private final Synchronizer synchronizer;
 
+		@SuppressWarnings({ "rawtypes", "unchecked" })
 		DefaultManyInvocationSpec(Function> parentFunction,
 				Function>> parentPipelineFunction,
 				Synchronizer synchronizer) {
@@ -969,6 +967,7 @@ static class DefaultManyInvocationSpec implements ManyInvocationSpec {
 		}
 
 		@Override
+		@SuppressWarnings("all")
 		public  List toList(Converter converter) {
 
 			Assert.notNull(converter, "Converter must not be null");
@@ -981,15 +980,17 @@ public  List toList(Converter converter) {
 
 				List result = new ArrayList<>(source.size());
 
-				for (S s : source) {
-					result.add(converter.convert(s));
+				for (S element : source) {
+					result.add(converter.convert(element));
 				}
 
 				return result;
+
 			}, Collections::emptyList);
 		}
 
 		@Override
+		@SuppressWarnings("all")
 		public  Set toSet(Converter converter) {
 
 			Assert.notNull(converter, "Converter must not be null");
@@ -1002,11 +1003,12 @@ public  Set toSet(Converter converter) {
 
 				Set result = new LinkedHashSet<>(source.size());
 
-				for (S s : source) {
-					result.add(converter.convert(s));
+				for (S element : source) {
+					result.add(converter.convert(element));
 				}
 
 				return result;
+
 			}, Collections::emptySet);
 		}
 	}
@@ -1020,6 +1022,7 @@ interface Synchronizer {
 		@Nullable
 		@SuppressWarnings({ "unchecked", "rawtypes" })
 		default  T invoke(Function callFunction, Function> pipelineFunction) {
+
 			return (T) doInvoke((Function) callFunction, (Function) pipelineFunction, Converters.identityConverter(),
 					() -> null);
 		}
@@ -1046,15 +1049,13 @@ interface ResponseCommands extends PipelineBinaryCommands, DatabasePipelineComma
 	/**
 	 * Create a proxy to invoke methods dynamically on {@link Pipeline} or {@link Transaction} as those share many
 	 * commands that are not defined on a common super-type.
-	 *
-	 * @param pipelineOrTransaction
-	 * @return
 	 */
-	static ResponseCommands createCommands(Queable pipelineOrTransaction) {
+	static ResponseCommands createCommands(Object pipelineOrTransaction) {
 
 		ProxyFactory proxyFactory = new ProxyFactory(pipelineOrTransaction);
+
 		proxyFactory.addInterface(ResponseCommands.class);
+
 		return (ResponseCommands) proxyFactory.getProxy(JedisInvoker.class.getClassLoader());
 	}
-
 }
diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisZSetCommands.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisZSetCommands.java
index 1aab87de0a..a19247a8d1 100644
--- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisZSetCommands.java
+++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisZSetCommands.java
@@ -22,6 +22,7 @@
 import redis.clients.jedis.params.ZParams;
 import redis.clients.jedis.params.ZRangeParams;
 import redis.clients.jedis.resps.ScanResult;
+import redis.clients.jedis.util.KeyValue;
 
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -31,7 +32,6 @@
 import org.springframework.dao.InvalidDataAccessApiUsageException;
 import org.springframework.data.redis.connection.RedisZSetCommands;
 import org.springframework.data.redis.connection.zset.Aggregate;
-import org.springframework.data.redis.connection.zset.DefaultTuple;
 import org.springframework.data.redis.connection.zset.Tuple;
 import org.springframework.data.redis.connection.zset.Weights;
 import org.springframework.data.redis.core.Cursor;
@@ -42,11 +42,14 @@
 import org.springframework.util.Assert;
 
 /**
+ * {@link RedisZSetCommands} implementation for Jedis.
+ *
  * @author Christoph Strobl
  * @author Clement Ong
  * @author Mark Paluch
  * @author Andrey Shlykov
  * @author Shyngys Sapraliyev
+ * @author John Blum
  * @since 2.0
  */
 class JedisZSetCommands implements RedisZSetCommands {
@@ -74,8 +77,10 @@ public Long zAdd(byte[] key, Set tuples, ZAddArgs args) {
 		Assert.notNull(key, "Key must not be null");
 		Assert.notNull(tuples, "Tuples must not be null");
 
-		return connection.invoke().just(Jedis::zadd, PipelineBinaryCommands::zadd, key, JedisConverters.toTupleMap(tuples),
-				JedisConverters.toZAddParams(args));
+		Long count = connection.invoke().just(Jedis::zadd, PipelineBinaryCommands::zadd, key,
+				JedisConverters.toTupleMap(tuples), JedisConverters.toZAddParams(args));
+
+		return count != null ? count : 0L;
 	}
 
 	@Override
@@ -424,7 +429,7 @@ public Set zDiff(byte[]... sets) {
 
 		Assert.notNull(sets, "Sets must not be null");
 
-		return connection.invoke().just(Jedis::zdiff, PipelineBinaryCommands::zdiff, sets);
+		return connection.invoke().fromMany(Jedis::zdiff, PipelineBinaryCommands::zdiff, sets).toSet();
 	}
 
 	@Override
@@ -450,7 +455,7 @@ public Set zInter(byte[]... sets) {
 
 		Assert.notNull(sets, "Sets must not be null");
 
-		return connection.invoke().just(Jedis::zinter, PipelineBinaryCommands::zinter, new ZParams(), sets);
+		return connection.invoke().fromMany(Jedis::zinter, PipelineBinaryCommands::zinter, new ZParams(), sets).toSet();
 	}
 
 	@Override
@@ -504,7 +509,7 @@ public Set zUnion(byte[]... sets) {
 
 		Assert.notNull(sets, "Sets must not be null");
 
-		return connection.invoke().just(Jedis::zunion, PipelineBinaryCommands::zunion, new ZParams(), sets);
+		return connection.invoke().fromMany(Jedis::zunion, PipelineBinaryCommands::zunion, new ZParams(), sets).toSet();
 	}
 
 	@Override
@@ -772,21 +777,8 @@ static ZRangeParams toZRangeParams(Protocol.Keyword by, byte[] min, byte[] max,
 		return zRangeParams;
 	}
 
-	/**
-	 * Workaround for broken Jedis BZPOP signature.
-	 *
-	 * @param bytes
-	 * @return
-	 */
 	@Nullable
-	@SuppressWarnings("unchecked")
-	private static Tuple toTuple(List bytes) {
-
-		if (bytes.isEmpty()) {
-			return null;
-		}
-
-		return new DefaultTuple((byte[]) bytes.get(1), Double.parseDouble(new String((byte[]) bytes.get(2))));
+	private static Tuple toTuple(@Nullable KeyValue keyValue) {
+		return keyValue != null ? JedisConverters.toTuple(keyValue.getValue()) : null;
 	}
-
 }
diff --git a/src/main/java/org/springframework/data/redis/connection/zset/DefaultTuple.java b/src/main/java/org/springframework/data/redis/connection/zset/DefaultTuple.java
index eb2c198e8e..34e8e88454 100644
--- a/src/main/java/org/springframework/data/redis/connection/zset/DefaultTuple.java
+++ b/src/main/java/org/springframework/data/redis/connection/zset/DefaultTuple.java
@@ -18,23 +18,27 @@
 import java.util.Arrays;
 
 import org.springframework.lang.Nullable;
+import org.springframework.util.ObjectUtils;
 
 /**
  * Default implementation for {@link Tuple} interface.
  *
  * @author Costin Leau
  * @author Christoph Strobl
+ * @author John Blum
  */
 public class DefaultTuple implements Tuple {
 
+	private static final Double ZERO = 0.0d;
+
 	private final Double score;
 	private final byte[] value;
 
 	/**
-	 * Constructs a new DefaultTuple instance.
+	 * Constructs a new {@link DefaultTuple}.
 	 *
-	 * @param value
-	 * @param score
+	 * @param value {@link byte[]} of the member's raw value.
+	 * @param score {@link Double score} of the raw value used in sorting.
 	 */
 	public DefaultTuple(byte[] value, Double score) {
 
@@ -43,29 +47,25 @@ public DefaultTuple(byte[] value, Double score) {
 	}
 
 	public Double getScore() {
-		return score;
+		return this.score;
 	}
 
 	public byte[] getValue() {
-		return value;
+		return this.value;
 	}
 
 	public boolean equals(@Nullable Object obj) {
-		if (this == obj)
+
+		if (this == obj) {
 			return true;
-		if (obj == null)
-			return false;
-		if (!(obj instanceof DefaultTuple))
-			return false;
-		DefaultTuple other = (DefaultTuple) obj;
-		if (score == null) {
-			if (other.score != null)
-				return false;
-		} else if (!score.equals(other.score))
-			return false;
-		if (!Arrays.equals(value, other.value))
+		}
+
+		if (!(obj instanceof DefaultTuple that)) {
 			return false;
-		return true;
+		}
+
+		return ObjectUtils.nullSafeEquals(this.score, that.score)
+			&& Arrays.equals(this.value, that.value);
 	}
 
 	public int hashCode() {
@@ -76,19 +76,21 @@ public int hashCode() {
 		return result;
 	}
 
-	public int compareTo(Double o) {
-		Double d = (score == null ? Double.valueOf(0.0d) : score);
-		Double a = (o == null ? Double.valueOf(0.0d) : o);
-		return d.compareTo(a);
+	public int compareTo(Double value) {
+
+		Double ourScore = getScore();
+		Double thisScore = ourScore != null ? ourScore : ZERO;
+		Double thatScore = value != null ? value : ZERO;
+
+		return thisScore.compareTo(thatScore);
 	}
 
 	@Override
 	public String toString() {
-		StringBuffer sb = new StringBuffer();
-		sb.append(getClass().getSimpleName());
-		sb.append(" [score=").append(score);
-		sb.append(", value=").append(value == null ? "null" : new String(value));
-		sb.append(']');
-		return sb.toString();
+
+		return getClass().getSimpleName()
+			+ " { score=" + getScore()
+			+ ", value=" + Arrays.toString(getValue())
+			+ " }";
 	}
 }
diff --git a/src/main/java/org/springframework/data/redis/connection/zset/Tuple.java b/src/main/java/org/springframework/data/redis/connection/zset/Tuple.java
index 19c05a3385..4f35b6c25f 100644
--- a/src/main/java/org/springframework/data/redis/connection/zset/Tuple.java
+++ b/src/main/java/org/springframework/data/redis/connection/zset/Tuple.java
@@ -20,6 +20,17 @@
  */
 public interface Tuple extends Comparable {
 
+	/**
+	 * Create a new {@link Tuple} with the {@link byte[] raw value} and given {@link Double score}.
+	 *
+	 * @param value {@link byte[]} of the member's raw value.
+	 * @param score {@link Double score} given to the {@code value} used in sorting.
+	 * @return a {@link Tuple} capturing the {@link byte[] value} with its {@link Double score}.
+	 */
+	static Tuple of(byte[] value, Double score) {
+		return new DefaultTuple(value, score);
+	}
+
 	/**
 	 * @return the raw value of the member.
 	 */
@@ -30,14 +41,4 @@ public interface Tuple extends Comparable {
 	 */
 	Double getScore();
 
-	/**
-	 * Create a new {@link Tuple}.
-	 *
-	 * @param value
-	 * @param score
-	 * @return
-	 */
-	static Tuple of(byte[] value, Double score) {
-		return new DefaultTuple(value, score);
-	}
 }
diff --git a/src/main/java/org/springframework/data/redis/core/types/Expiration.java b/src/main/java/org/springframework/data/redis/core/types/Expiration.java
index c5df3ae8f7..d24a60fc89 100644
--- a/src/main/java/org/springframework/data/redis/core/types/Expiration.java
+++ b/src/main/java/org/springframework/data/redis/core/types/Expiration.java
@@ -16,7 +16,6 @@
 package org.springframework.data.redis.core.types;
 
 import java.time.Duration;
-import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 
 import org.springframework.lang.Nullable;
diff --git a/src/test/java/org/springframework/data/redis/connection/jedis/JedisConnectionUnitTests.java b/src/test/java/org/springframework/data/redis/connection/jedis/JedisConnectionUnitTests.java
index 2a39274473..51edd0ded0 100644
--- a/src/test/java/org/springframework/data/redis/connection/jedis/JedisConnectionUnitTests.java
+++ b/src/test/java/org/springframework/data/redis/connection/jedis/JedisConnectionUnitTests.java
@@ -25,7 +25,6 @@
 import redis.clients.jedis.params.ScanParams;
 import redis.clients.jedis.resps.ScanResult;
 
-import java.io.IOException;
 import java.util.Collections;
 import java.util.Map.Entry;
 
@@ -43,7 +42,10 @@
 import org.springframework.data.redis.core.ScanOptions;
 
 /**
+ * Unit tests for {@link JedisConnection}.
+ *
  * @author Christoph Strobl
+ * @author John Blum
  */
 class JedisConnectionUnitTests {
 
@@ -165,11 +167,11 @@ public void scanShouldKeepTheConnectionOpen() {
 
 			connection.scan(ScanOptions.NONE);
 
-			verify(jedisSpy, never()).quit();
+			verify(jedisSpy, never()).disconnect();
 		}
 
 		@Test // DATAREDIS-531, GH-2006
-		public void scanShouldCloseTheConnectionWhenCursorIsClosed() throws IOException {
+		public void scanShouldCloseTheConnectionWhenCursorIsClosed() {
 
 			doReturn(new ScanResult<>("0", Collections. emptyList())).when(jedisSpy).scan(any(byte[].class),
 					any(ScanParams.class));
@@ -177,7 +179,7 @@ public void scanShouldCloseTheConnectionWhenCursorIsClosed() throws IOException
 			Cursor cursor = connection.scan(ScanOptions.NONE);
 			cursor.close();
 
-			verify(jedisSpy, times(1)).quit();
+			verify(jedisSpy, times(1)).disconnect();
 		}
 
 		@Test // DATAREDIS-531
@@ -188,11 +190,11 @@ public void sScanShouldKeepTheConnectionOpen() {
 
 			connection.sScan("foo".getBytes(), ScanOptions.NONE);
 
-			verify(jedisSpy, never()).quit();
+			verify(jedisSpy, never()).disconnect();
 		}
 
 		@Test // DATAREDIS-531
-		public void sScanShouldCloseTheConnectionWhenCursorIsClosed() throws IOException {
+		public void sScanShouldCloseTheConnectionWhenCursorIsClosed() {
 
 			doReturn(new ScanResult<>("0", Collections. emptyList())).when(jedisSpy).sscan(any(byte[].class),
 					any(byte[].class), any(ScanParams.class));
@@ -200,7 +202,7 @@ public void sScanShouldCloseTheConnectionWhenCursorIsClosed() throws IOException
 			Cursor cursor = connection.sScan("foo".getBytes(), ScanOptions.NONE);
 			cursor.close();
 
-			verify(jedisSpy, times(1)).quit();
+			verify(jedisSpy, times(1)).disconnect();
 		}
 
 		@Test // DATAREDIS-531
@@ -211,11 +213,11 @@ public void zScanShouldKeepTheConnectionOpen() {
 
 			connection.zScan("foo".getBytes(), ScanOptions.NONE);
 
-			verify(jedisSpy, never()).quit();
+			verify(jedisSpy, never()).disconnect();
 		}
 
 		@Test // DATAREDIS-531
-		public void zScanShouldCloseTheConnectionWhenCursorIsClosed() throws IOException {
+		public void zScanShouldCloseTheConnectionWhenCursorIsClosed() {
 
 			doReturn(new ScanResult<>("0", Collections. emptyList())).when(jedisSpy).zscan(any(byte[].class),
 					any(byte[].class), any(ScanParams.class));
@@ -223,7 +225,7 @@ public void zScanShouldCloseTheConnectionWhenCursorIsClosed() throws IOException
 			Cursor cursor = connection.zScan("foo".getBytes(), ScanOptions.NONE);
 			cursor.close();
 
-			verify(jedisSpy, times(1)).quit();
+			verify(jedisSpy, times(1)).disconnect();
 		}
 
 		@Test // DATAREDIS-531
@@ -234,19 +236,20 @@ public void hScanShouldKeepTheConnectionOpen() {
 
 			connection.hScan("foo".getBytes(), ScanOptions.NONE);
 
-			verify(jedisSpy, never()).quit();
+			verify(jedisSpy, never()).disconnect();
 		}
 
 		@Test // DATAREDIS-531
-		public void hScanShouldCloseTheConnectionWhenCursorIsClosed() throws IOException {
+		public void hScanShouldCloseTheConnectionWhenCursorIsClosed() {
 
 			doReturn(new ScanResult<>("0", Collections. emptyList())).when(jedisSpy).hscan(any(byte[].class),
 					any(byte[].class), any(ScanParams.class));
 
 			Cursor> cursor = connection.hScan("foo".getBytes(), ScanOptions.NONE);
+
 			cursor.close();
 
-			verify(jedisSpy, times(1)).quit();
+			verify(jedisSpy, times(1)).disconnect();
 		}
 
 		@Test // DATAREDIS-714
@@ -369,7 +372,5 @@ public void hScanShouldCloseTheConnectionWhenCursorIsClosed() {
 			assertThatExceptionOfType(InvalidDataAccessApiUsageException.class)
 					.isThrownBy(() -> super.hScanShouldCloseTheConnectionWhenCursorIsClosed());
 		}
-
 	}
-
 }
diff --git a/src/test/java/org/springframework/data/redis/connection/jedis/JedisConvertersUnitTests.java b/src/test/java/org/springframework/data/redis/connection/jedis/JedisConvertersUnitTests.java
index eb37cf171d..7cbc1b6e92 100644
--- a/src/test/java/org/springframework/data/redis/connection/jedis/JedisConvertersUnitTests.java
+++ b/src/test/java/org/springframework/data/redis/connection/jedis/JedisConvertersUnitTests.java
@@ -16,7 +16,15 @@
 package org.springframework.data.redis.connection.jedis;
 
 import static org.assertj.core.api.Assertions.*;
-
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+import redis.clients.jedis.Protocol;
 import redis.clients.jedis.params.GetExParams;
 import redis.clients.jedis.params.SetParams;
 
@@ -35,12 +43,15 @@
 import org.springframework.data.redis.connection.RedisStringCommands.SetOption;
 import org.springframework.data.redis.core.types.Expiration;
 import org.springframework.data.redis.core.types.RedisClientInfo;
+import org.springframework.lang.Nullable;
+import org.springframework.test.util.ReflectionTestUtils;
 
 /**
  * Unit tests for {@link JedisConverters}.
  *
  * @author Christoph Strobl
  * @author Mark Paluch
+ * @author John Blum
  */
 class JedisConvertersUnitTests {
 
@@ -212,15 +223,31 @@ void toSetCommandExPxOptionShouldReturnEXforMilliseconds() {
 	@Test // GH-2050
 	void convertsExpirationToSetPXAT() {
 
-		assertThat(JedisConverters.toSetCommandExPxArgument(Expiration.unixTimestamp(10, TimeUnit.MILLISECONDS)))
-				.extracting(SetParams::toString).isEqualTo(SetParams.setParams().pxAt(10).toString());
+		SetParams mockSetParams = mock(SetParams.class);
+
+		doReturn(mockSetParams).when(mockSetParams).pxAt(anyLong());
+
+		Expiration expiration = Expiration.unixTimestamp(10, TimeUnit.MILLISECONDS);
+
+		assertThat(JedisConverters.toSetCommandExPxArgument(expiration, mockSetParams)).isNotNull();
+
+		verify(mockSetParams, times(1)).pxAt(eq(10L));
+		verifyNoMoreInteractions(mockSetParams);
 	}
 
 	@Test // GH-2050
 	void convertsExpirationToSetEXAT() {
 
-		assertThat(JedisConverters.toSetCommandExPxArgument(Expiration.unixTimestamp(1, TimeUnit.MINUTES)))
-				.extracting(SetParams::toString).isEqualTo(SetParams.setParams().exAt(60).toString());
+		SetParams mockSetParams = mock(SetParams.class);
+
+		doReturn(mockSetParams).when(mockSetParams).exAt(anyLong());
+
+		Expiration expiration = Expiration.unixTimestamp(1, TimeUnit.MINUTES);
+
+		assertThat(JedisConverters.toSetCommandExPxArgument(expiration, mockSetParams)).isNotNull();
+
+		verify(mockSetParams, times(1)).exAt(eq(60L));
+		verifyNoMoreInteractions(mockSetParams);
 	}
 
 	@Test // DATAREDIS-316, DATAREDIS-749
@@ -241,43 +268,91 @@ void toSetCommandNxXxOptionShouldReturnEmptyArrayforUpsert() {
 	@Test // GH-2050
 	void convertsExpirationToGetExEX() {
 
-		assertThat(JedisConverters.toGetExParams(Expiration.seconds(10))).extracting(GetExParams::toString)
-				.isEqualTo(new GetExParams().ex(10).toString());
+		GetExParams mockGetExParams = mock(GetExParams.class);
+
+		doReturn(mockGetExParams).when(mockGetExParams).ex(anyLong());
+
+		Expiration expiration = Expiration.seconds(10);
+
+		assertThat(JedisConverters.toGetExParams(expiration, mockGetExParams)).isNotNull();
+
+		verify(mockGetExParams, times(1)).ex(eq(10L));
+		verifyNoMoreInteractions(mockGetExParams);
 	}
 
 	@Test // GH-2050
 	void convertsExpirationWithTimeUnitToGetExEX() {
 
-		assertThat(JedisConverters.toGetExParams(Expiration.from(1, TimeUnit.MINUTES))).extracting(GetExParams::toString)
-				.isEqualTo(new GetExParams().ex(60).toString());
+		GetExParams mockGetExParams = mock(GetExParams.class);
+
+		doReturn(mockGetExParams).when(mockGetExParams).ex(anyLong());
+
+		Expiration expiration = Expiration.from(1, TimeUnit.MINUTES);
+
+		assertThat(JedisConverters.toGetExParams(expiration, mockGetExParams)).isNotNull();
+
+		verify(mockGetExParams, times(1)).ex(eq(60L)); // seconds
+		verifyNoMoreInteractions(mockGetExParams);
 	}
 
 	@Test // GH-2050
 	void convertsExpirationToGetExPEX() {
 
-		assertThat(JedisConverters.toGetExParams(Expiration.milliseconds(10))).extracting(GetExParams::toString)
-				.isEqualTo(new GetExParams().px(10).toString());
+		GetExParams mockGetExParams = mock(GetExParams.class);
+
+		doReturn(mockGetExParams).when(mockGetExParams).px(anyLong());
+
+		Expiration expiration = Expiration.milliseconds(10L);
+
+		assertThat(JedisConverters.toGetExParams(expiration, mockGetExParams)).isNotNull();
+
+		verify(mockGetExParams, times(1)).px(eq(10L));
+		verifyNoMoreInteractions(mockGetExParams);
 	}
 
 	@Test // GH-2050
 	void convertsExpirationToGetExEXAT() {
 
-		assertThat(JedisConverters.toGetExParams(Expiration.unixTimestamp(10, TimeUnit.SECONDS)))
-				.extracting(GetExParams::toString).isEqualTo(new GetExParams().exAt(10).toString());
+		GetExParams mockGetExParams = mock(GetExParams.class);
+
+		doReturn(mockGetExParams).when(mockGetExParams).exAt(anyLong());
+
+		Expiration expiration = Expiration.unixTimestamp(10, TimeUnit.SECONDS);
+
+		assertThat(JedisConverters.toGetExParams(expiration, mockGetExParams)).isNotNull();
+
+		verify(mockGetExParams, times(1)).exAt(eq(10L));
+		verifyNoMoreInteractions(mockGetExParams);
 	}
 
 	@Test // GH-2050
 	void convertsExpirationWithTimeUnitToGetExEXAT() {
 
-		assertThat(JedisConverters.toGetExParams(Expiration.unixTimestamp(1, TimeUnit.MINUTES)))
-				.extracting(GetExParams::toString).isEqualTo(new GetExParams().exAt(60).toString());
+		GetExParams mockGetExParams = mock(GetExParams.class);
+
+		doReturn(mockGetExParams).when(mockGetExParams).exAt(anyLong());
+
+		Expiration expiration = Expiration.unixTimestamp(1, TimeUnit.MINUTES);
+
+		assertThat(JedisConverters.toGetExParams(expiration, mockGetExParams)).isNotNull();
+
+		verify(mockGetExParams, times(1)).exAt(eq(60L));
+		verifyNoMoreInteractions(mockGetExParams);
 	}
 
 	@Test // GH-2050
 	void convertsExpirationToGetExPXAT() {
 
-		assertThat(JedisConverters.toGetExParams(Expiration.unixTimestamp(10, TimeUnit.MILLISECONDS)))
-				.extracting(GetExParams::toString).isEqualTo(new GetExParams().pxAt(10).toString());
+		GetExParams mockGetExParams = mock(GetExParams.class);
+
+		doReturn(mockGetExParams).when(mockGetExParams).pxAt(anyLong());
+
+		Expiration expiration = Expiration.unixTimestamp(10, TimeUnit.MILLISECONDS);
+
+		assertThat(JedisConverters.toGetExParams(expiration, mockGetExParams)).isNotNull();
+
+		verify(mockGetExParams, times(1)).pxAt(eq(10L));
+		verifyNoMoreInteractions(mockGetExParams);
 	}
 
 	private void verifyRedisServerInfo(RedisServer server, Map values) {
@@ -289,18 +364,22 @@ private void verifyRedisServerInfo(RedisServer server, Map value
 
 	private static String toString(SetParams setParams) {
 
-		StringBuilder builder = new StringBuilder();
+		StringBuilder stringBuilder = new StringBuilder();
 
-		for (byte[] parameter : setParams.getByteParams()) {
+		stringBuilder.append(toString((Protocol.Keyword) ReflectionTestUtils.getField(setParams, "existance")));
+		stringBuilder.append(toString((Protocol.Keyword) ReflectionTestUtils.getField(setParams, "expiration")));
 
-			if (builder.length() != 0) {
-				builder.append(' ');
-			}
+		Long expirationValue = (Long) ReflectionTestUtils.getField(setParams, "expirationValue");
 
-			builder.append(new String(parameter));
+		if (expirationValue != null) {
+			stringBuilder.append(" ").append(expirationValue);
 		}
 
-		return builder.toString();
+		return stringBuilder.toString().trim();
+	}
+
+	private static String toString(@Nullable Enum value) {
+		return value != null ? value.name().toLowerCase() : "";
 	}
 
 	private Map getRedisServerInfoMap(String name, int port) {
diff --git a/src/test/java/org/springframework/data/redis/core/DefaultStreamOperationsIntegrationTests.java b/src/test/java/org/springframework/data/redis/core/DefaultStreamOperationsIntegrationTests.java
index 58e3ee3c31..b8963b5618 100644
--- a/src/test/java/org/springframework/data/redis/core/DefaultStreamOperationsIntegrationTests.java
+++ b/src/test/java/org/springframework/data/redis/core/DefaultStreamOperationsIntegrationTests.java
@@ -78,10 +78,11 @@ public DefaultStreamOperationsIntegrationTests(RedisTemplate redisTemplate
 	public static Collection testParams() {
 
 		List params = new ArrayList<>();
+
 		params.addAll(AbstractOperationsTestParams
 				.testParams(JedisConnectionFactoryExtension.getConnectionFactory(RedisStanalone.class)));
 
-		if(RedisDetector.isClusterAvailable()) {
+		if (RedisDetector.isClusterAvailable()) {
 			params.addAll(AbstractOperationsTestParams
 					.testParams(JedisConnectionFactoryExtension.getConnectionFactory(RedisCluster.class)));
 		}
@@ -89,7 +90,7 @@ public static Collection testParams() {
 		params.addAll(AbstractOperationsTestParams
 				.testParams(LettuceConnectionFactoryExtension.getConnectionFactory(RedisStanalone.class)));
 
-		if(RedisDetector.isClusterAvailable()) {
+		if (RedisDetector.isClusterAvailable()) {
 			params.addAll(AbstractOperationsTestParams
 					.testParams(LettuceConnectionFactoryExtension.getConnectionFactory(RedisCluster.class)));
 		}
@@ -398,11 +399,11 @@ void pendingShouldReadMessageDetails() {
 
 		K key = keyFactory.instance();
 		HK hashKey = hashKeyFactory.instance();
-		HV value = hashValueFactory.instance();
+		HV hashValue = hashValueFactory.instance();
 
-		RecordId messageId = streamOps.add(key, Collections.singletonMap(hashKey, value));
-		streamOps.createGroup(key, ReadOffset.from("0-0"), "my-group");
+		RecordId messageId = streamOps.add(key, Collections.singletonMap(hashKey, hashValue));
 
+		streamOps.createGroup(key, ReadOffset.from("0-0"), "my-group");
 		streamOps.read(Consumer.from("my-group", "my-consumer"),
 				StreamOffset.create(key, ReadOffset.lastConsumed()));