From 207563380941d577b998704f28759b6c8a9f869f Mon Sep 17 00:00:00 2001 From: John Blum Date: Thu, 15 Jun 2023 19:17:35 -0700 Subject: [PATCH] Upgrade to Jedis 5.0 Adapt to API changes in the Jedis 5.0 driver. Fix bzPopMaxShouldWorkCorrectly() and bzPopMinShouldWorkCorrectly() tests in JedisClusterConnectionTests. Jedis 5.0 changed the bzpopmax and bzpopmin Redis commands to no longer return an empty (Array)List internally when evaluating and popping from an empty sorted set. A NullPointerException will be thrown if either bzpopmax or bzpopmin commands are executd on an empty Redis sorted set in Jedis 5.0 (vs. Jedis 4.x): Closes #2612 Original pull request: #2716 --- pom.xml | 2 +- .../jedis/JedisClusterHashCommands.java | 11 +- .../jedis/JedisClusterServerCommands.java | 24 ++-- .../jedis/JedisClusterStreamCommands.java | 18 ++- .../jedis/JedisClusterZSetCommands.java | 50 +++---- .../connection/jedis/JedisConnection.java | 7 +- .../connection/jedis/JedisConverters.java | 50 ++++--- .../connection/jedis/JedisHashCommands.java | 17 ++- .../redis/connection/jedis/JedisInvoker.java | 31 ++--- .../connection/jedis/JedisZSetCommands.java | 34 ++--- .../redis/connection/zset/DefaultTuple.java | 9 +- .../jedis/JedisClusterConnectionTests.java | 4 +- .../jedis/JedisConnectionUnitTests.java | 16 +-- .../jedis/JedisConvertersUnitTests.java | 127 ++++++++++++++---- 14 files changed, 257 insertions(+), 143 deletions(-) diff --git a/pom.xml b/pom.xml index 778dd177dc..97ebc2565e 100644 --- a/pom.xml +++ b/pom.xml @@ -25,7 +25,7 @@ 1.4.20 2.11.1 6.2.6.RELEASE - 4.4.5 + 5.0.1 1.01 4.1.96.Final spring.data.redis diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterHashCommands.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterHashCommands.java index 06bbf2a569..1bfbfe550d 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterHashCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterHashCommands.java @@ -30,13 +30,15 @@ import org.springframework.data.redis.core.ScanCursor; import org.springframework.data.redis.core.ScanIteration; import org.springframework.data.redis.core.ScanOptions; -import org.springframework.data.util.Streamable; import org.springframework.lang.Nullable; import org.springframework.util.Assert; /** + * Cluster {@link RedisHashCommands} implementation for Jedis. + * * @author Christoph Strobl * @author Mark Paluch + * @author John Blum * @since 2.0 */ class JedisClusterHashCommands implements RedisHashCommands { @@ -160,8 +162,8 @@ public Entry hRandFieldWithValues(byte[] key) { Assert.notNull(key, "Key must not be null"); try { - Map map = connection.getCluster().hrandfieldWithValues(key, 1); - return map.isEmpty() ? null : map.entrySet().iterator().next(); + List> mapEntryList = connection.getCluster().hrandfieldWithValues(key, 1); + return mapEntryList.isEmpty() ? null : mapEntryList.get(0); } catch (Exception ex) { throw convertJedisAccessException(ex); } @@ -185,8 +187,7 @@ public List hRandField(byte[] key, long count) { public List> hRandFieldWithValues(byte[] key, long count) { try { - Map map = connection.getCluster().hrandfieldWithValues(key, count); - return Streamable.of(() -> map.entrySet().iterator()).toList(); + return connection.getCluster().hrandfieldWithValues(key, count); } catch (Exception ex) { throw convertJedisAccessException(ex); } diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterServerCommands.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterServerCommands.java index 43b83bf874..5e920cc85c 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterServerCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterServerCommands.java @@ -21,6 +21,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Map.Entry; import java.util.Properties; import java.util.concurrent.TimeUnit; @@ -249,21 +250,26 @@ public Properties getConfig(String pattern) { Assert.notNull(pattern, "Pattern must not be null"); - List>> mapResult = connection.getClusterCommandExecutor() - .executeCommandOnAllNodes((JedisClusterCommandCallback>) client -> client.configGet(pattern)) + JedisClusterCommandCallback> command = jedis -> jedis.configGet(pattern); + + List>> nodeResults = connection.getClusterCommandExecutor() + .executeCommandOnAllNodes(command) .getResults(); - List result = new ArrayList<>(); - for (NodeResult> entry : mapResult) { + Properties nodesConfiguration = new Properties(); + + for (NodeResult> nodeResult : nodeResults) { + + String prefix = nodeResult.getNode().asString(); - String prefix = entry.getNode().asString(); - int i = 0; - for (String value : entry.getValue()) { - result.add((i++ % 2 == 0 ? (prefix + ".") : "") + value); + for (Entry entry : nodeResult.getValue().entrySet()) { + String newKey = prefix.concat(".").concat(entry.getKey()); + String value = entry.getValue(); + nodesConfiguration.setProperty(newKey, value); } } - return Converters.toProperties(result); + return nodesConfiguration; } @Override diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterStreamCommands.java index 11223cb2d5..f9372ba9a4 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterStreamCommands.java @@ -17,9 +17,11 @@ import static org.springframework.data.redis.connection.jedis.StreamConverters.*; +import org.springframework.util.StringUtils; import redis.clients.jedis.BuilderFactory; import redis.clients.jedis.params.XAddParams; import redis.clients.jedis.params.XClaimParams; +import redis.clients.jedis.params.XPendingParams; import redis.clients.jedis.params.XReadGroupParams; import redis.clients.jedis.params.XReadParams; @@ -269,9 +271,19 @@ public PendingMessages xPending(byte[] key, String groupName, XPendingOptions op try { - List response = connection.getCluster().xpending(key, group, - JedisConverters.toBytes(getLowerValue(range)), JedisConverters.toBytes(getUpperValue(range)), - options.getCount().intValue(), JedisConverters.toBytes(options.getConsumerName())); + @SuppressWarnings("all") + XPendingParams pendingParams = new XPendingParams( + JedisConverters.toBytes(StreamConverters.getLowerValue(range)), + JedisConverters.toBytes(StreamConverters.getUpperValue(range)), + options.getCount().intValue()); + + String consumerName = options.getConsumerName(); + + if (StringUtils.hasText(consumerName)) { + pendingParams = pendingParams.consumer(consumerName); + } + + List response = connection.getCluster().xpending(key, group, pendingParams); return StreamConverters.toPendingMessages(groupName, range, BuilderFactory.STREAM_PENDING_ENTRY_LIST.build(response)); diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterZSetCommands.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterZSetCommands.java index 4d0923e43d..c090186086 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterZSetCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterZSetCommands.java @@ -20,6 +20,7 @@ import redis.clients.jedis.params.ZParams; import redis.clients.jedis.params.ZRangeParams; import redis.clients.jedis.resps.ScanResult; +import redis.clients.jedis.util.KeyValue; import java.util.ArrayList; import java.util.LinkedHashSet; @@ -34,7 +35,6 @@ import org.springframework.data.redis.connection.RedisZSetCommands; import org.springframework.data.redis.connection.convert.SetConverter; import org.springframework.data.redis.connection.zset.Aggregate; -import org.springframework.data.redis.connection.zset.DefaultTuple; import org.springframework.data.redis.connection.zset.Tuple; import org.springframework.data.redis.connection.zset.Weights; import org.springframework.data.redis.core.Cursor; @@ -46,18 +46,22 @@ import org.springframework.util.Assert; /** + * Cluster {@link RedisZSetCommands} implementation for Jedis. + * * @author Christoph Strobl * @author Mark Paluch * @author Clement Ong * @author Andrey Shlykov * @author Jens Deppe * @author Shyngys Sapraliyev + * @author John Blum * @since 2.0 */ class JedisClusterZSetCommands implements RedisZSetCommands { - private static final SetConverter TUPLE_SET_CONVERTER = new SetConverter<>( - JedisConverters::toTuple); + private static final SetConverter TUPLE_SET_CONVERTER = + new SetConverter<>(JedisConverters::toTuple); + private final JedisClusterConnection connection; JedisClusterZSetCommands(JedisClusterConnection connection) { @@ -818,7 +822,7 @@ public Set zDiff(byte[]... sets) { if (ClusterSlotHashUtil.isSameSlotForAllKeys(sets)) { try { - return connection.getCluster().zdiff(sets); + return JedisConverters.toSet(connection.getCluster().zdiff(sets)); } catch (Exception ex) { throw convertJedisAccessException(ex); } @@ -835,7 +839,7 @@ public Set zDiffWithScores(byte[]... sets) { if (ClusterSlotHashUtil.isSameSlotForAllKeys(sets)) { try { - return JedisConverters.toTupleSet(connection.getCluster().zdiffWithScores(sets)); + return JedisConverters.toSet(JedisConverters.toTupleList(connection.getCluster().zdiffWithScores(sets))); } catch (Exception ex) { throw convertJedisAccessException(ex); } @@ -872,7 +876,7 @@ public Set zInter(byte[]... sets) { if (ClusterSlotHashUtil.isSameSlotForAllKeys(sets)) { try { - return connection.getCluster().zinter(new ZParams(), sets); + return JedisConverters.toSet(connection.getCluster().zinter(new ZParams(), sets)); } catch (Exception ex) { throw convertJedisAccessException(ex); } @@ -889,7 +893,8 @@ public Set zInterWithScores(byte[]... sets) { if (ClusterSlotHashUtil.isSameSlotForAllKeys(sets)) { try { - return JedisConverters.toTupleSet(connection.getCluster().zinterWithScores(new ZParams(), sets)); + return JedisConverters.toSet(JedisConverters.toTupleList(connection.getCluster() + .zinterWithScores(new ZParams(), sets))); } catch (Exception ex) { throw convertJedisAccessException(ex); } @@ -909,8 +914,8 @@ public Set zInterWithScores(Aggregate aggregate, Weights weights, byte[]. if (ClusterSlotHashUtil.isSameSlotForAllKeys(sets)) { try { - return JedisConverters - .toTupleSet(connection.getCluster().zinterWithScores(toZParams(aggregate, weights), sets)); + return JedisConverters.toSet(JedisConverters.toTupleList(connection.getCluster() + .zinterWithScores(toZParams(aggregate, weights), sets))); } catch (Exception ex) { throw convertJedisAccessException(ex); } @@ -971,7 +976,7 @@ public Set zUnion(byte[]... sets) { if (ClusterSlotHashUtil.isSameSlotForAllKeys(sets)) { try { - return connection.getCluster().zunion(new ZParams(), sets); + return JedisConverters.toSet(connection.getCluster().zunion(new ZParams(), sets)); } catch (Exception ex) { throw convertJedisAccessException(ex); } @@ -988,7 +993,8 @@ public Set zUnionWithScores(byte[]... sets) { if (ClusterSlotHashUtil.isSameSlotForAllKeys(sets)) { try { - return JedisConverters.toTupleSet(connection.getCluster().zunionWithScores(new ZParams(), sets)); + return JedisConverters.toSet(JedisConverters.toTupleList(connection.getCluster() + .zunionWithScores(new ZParams(), sets))); } catch (Exception ex) { throw convertJedisAccessException(ex); } @@ -1008,10 +1014,11 @@ public Set zUnionWithScores(Aggregate aggregate, Weights weights, byte[]. if (ClusterSlotHashUtil.isSameSlotForAllKeys(sets)) { try { - return JedisConverters - .toTupleSet(connection.getCluster().zunionWithScores(toZParams(aggregate, weights), sets)); + return JedisConverters.toSet(JedisConverters.toTupleList(connection.getCluster() + .zunionWithScores(toZParams(aggregate, weights), sets))); } catch (Exception ex) { throw convertJedisAccessException(ex); + } } @@ -1126,21 +1133,14 @@ private static ZParams toZParams(Aggregate aggregate, Weights weights) { return new ZParams().weights(weights.toArray()).aggregate(ZParams.Aggregate.valueOf(aggregate.name())); } - /** - * Workaround for broken Jedis BZPOP signature. - * - * @param bytes - * @return - */ @Nullable - @SuppressWarnings("unchecked") - private static Tuple toTuple(@Nullable List bytes) { + private static Tuple toTuple(@Nullable KeyValue keyValue) { - if (bytes == null || bytes.isEmpty()) { - return null; + if (keyValue != null) { + redis.clients.jedis.resps.Tuple tuple = keyValue.getValue(); + return tuple != null ? JedisConverters.toTuple(tuple) : null; } - return new DefaultTuple((byte[]) bytes.get(1), Double.parseDouble(new String((byte[]) bytes.get(2)))); + return null; } - } diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisConnection.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisConnection.java index eaa1b5ba4e..a474bbb8fe 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisConnection.java @@ -59,9 +59,6 @@ import org.springframework.util.Assert; import org.springframework.util.CollectionUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - import redis.clients.jedis.BuilderFactory; import redis.clients.jedis.CommandArguments; import redis.clients.jedis.CommandObject; @@ -130,6 +127,7 @@ public class JedisConnection extends AbstractRedisConnection { private final Log LOGGER = LogFactory.getLog(getClass()); + @SuppressWarnings("rawtypes") private List pipelinedResults = new ArrayList<>(); private final @Nullable Pool pool; @@ -348,7 +346,6 @@ public void close() throws DataAccessException { jedis.close(); } else { - doExceptionThrowingOperationSafely(jedis::quit, "Failed to quit during close"); doExceptionThrowingOperationSafely(jedis::disconnect, "Failed to disconnect during close"); } } @@ -480,6 +477,7 @@ public void discard() { public List exec() { try { + if (transaction == null) { throw new InvalidDataAccessApiUsageException("No ongoing transaction; Did you forget to call multi"); } @@ -489,6 +487,7 @@ public List exec() { return !CollectionUtils.isEmpty(results) ? new TransactionResultConverter<>(txResults, JedisExceptionConverter.INSTANCE).convert(results) : results; + } catch (Exception cause) { throw convertJedisAccessException(cause); } finally { diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisConverters.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisConverters.java index 88b8a5eb4d..ac689c75ca 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisConverters.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisConverters.java @@ -35,6 +35,7 @@ import java.util.Collection; import java.util.Collections; import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -87,6 +88,8 @@ import org.springframework.util.ObjectUtils; import org.springframework.util.StringUtils; +import redis.clients.jedis.util.SafeEncoder; + /** * Jedis type converters. * @@ -114,12 +117,22 @@ abstract class JedisConverters extends Converters { MINUS_BYTES = toBytes("-"); POSITIVE_INFINITY_BYTES = toBytes("+inf"); NEGATIVE_INFINITY_BYTES = toBytes("-inf"); + + } + + @Nullable + static Set toSet(@Nullable List list) { + return list != null ? new LinkedHashSet<>(list) : null; } public static Converter stringToBytes() { return JedisConverters::toBytes; } + static ListConverter stringListToByteList() { + return new ListConverter<>(stringToBytes()); + } + /** * {@link ListConverter} converting jedis {@link redis.clients.jedis.resps.Tuple} to {@link Tuple}. * @@ -129,16 +142,16 @@ static ListConverter tuplesToTuples() { return new ListConverter<>(JedisConverters::toTuple); } - static ListConverter stringListToByteList() { - return new ListConverter<>(stringToBytes()); + static Tuple toTuple(redis.clients.jedis.resps.Tuple source) { + return new DefaultTuple(source.getBinaryElement(), source.getScore()); } - static Set toTupleSet(Set source) { - return new SetConverter<>(JedisConverters::toTuple).convert(source); + static List toTupleList(List source) { + return tuplesToTuples().convert(source); } - public static Tuple toTuple(redis.clients.jedis.resps.Tuple source) { - return new DefaultTuple(source.getBinaryElement(), source.getScore()); + static Set toTupleSet(Set source) { + return new SetConverter<>(JedisConverters::toTuple).convert(source); } /** @@ -255,6 +268,7 @@ public static byte[][] toByteArrays(Map source) { public static SortingParams toSortingParams(@Nullable SortParameters params) { SortingParams jedisParams = null; + if (params != null) { jedisParams = new SortingParams(); byte[] byPattern = params.getByPattern(); @@ -278,6 +292,7 @@ public static SortingParams toSortingParams(@Nullable SortParameters params) { jedisParams.alpha(); } } + return jedisParams; } @@ -386,8 +401,10 @@ public static SetParams toSetCommandExPxArgument(Expiration expiration, SetParam * @since 2.6 */ static GetExParams toGetExParams(Expiration expiration) { + return toGetExParams(expiration, new GetExParams()); + } - GetExParams params = new GetExParams(); + static GetExParams toGetExParams(Expiration expiration, GetExParams params) { if (expiration.isPersistent()) { return params.persist(); @@ -584,18 +601,14 @@ static ZAddParams toZAddParams(ZAddArgs source) { return new ZAddParams(); } - ZAddParams target = new ZAddParams() { - - { - if (source.contains(ZAddArgs.Flag.GT)) { - addParam("gt"); - } - if (source.contains(ZAddArgs.Flag.LT)) { - addParam("lt"); - } - } - }; + ZAddParams target = new ZAddParams(); + if (source.contains(ZAddArgs.Flag.GT)) { + target.gt(); + } + if (source.contains(ZAddArgs.Flag.LT)) { + target.lt(); + } if (source.contains(ZAddArgs.Flag.XX)) { target.xx(); } @@ -605,6 +618,7 @@ static ZAddParams toZAddParams(ZAddArgs source) { if (source.contains(ZAddArgs.Flag.CH)) { target.ch(); } + return target; } diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisHashCommands.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisHashCommands.java index 4efaf0c85c..419d19d3f8 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisHashCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisHashCommands.java @@ -37,8 +37,11 @@ import org.springframework.util.Assert; /** + * {@link RedisHashCommands} implementation for Jedis. + * * @author Christoph Strobl * @author Mark Paluch + * @author John Blum * @since 2.0 */ class JedisHashCommands implements RedisHashCommands { @@ -122,7 +125,7 @@ public Entry hRandFieldWithValues(byte[] key) { Assert.notNull(key, "Key must not be null"); return connection.invoke().from(Jedis::hrandfieldWithValues, PipelineBinaryCommands::hrandfieldWithValues, key, 1L) - .get(it -> it.isEmpty() ? null : it.entrySet().iterator().next()); + .get(mapEntryList -> mapEntryList.isEmpty() ? null : mapEntryList.get(0)); } @Nullable @@ -141,12 +144,16 @@ public List> hRandFieldWithValues(byte[] key, long count) Assert.notNull(key, "Key must not be null"); return connection.invoke() - .from(Jedis::hrandfieldWithValues, PipelineBinaryCommands::hrandfieldWithValues, key, count).get(it -> { + .from(Jedis::hrandfieldWithValues, PipelineBinaryCommands::hrandfieldWithValues, key, count) + .get(mapEntryList -> { + + List> convertedMapEntryList = new ArrayList<>(mapEntryList.size()); + + mapEntryList.forEach(entry -> + convertedMapEntryList.add(Converters.entryOf(entry.getKey(), entry.getValue()))); - List> entries = new ArrayList<>(it.size()); - it.forEach((k, v) -> entries.add(Converters.entryOf(k, v))); + return convertedMapEntryList; - return entries; }); } diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisInvoker.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisInvoker.java index 805efabe4b..ef1771969d 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisInvoker.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisInvoker.java @@ -17,7 +17,6 @@ import redis.clients.jedis.Jedis; import redis.clients.jedis.Pipeline; -import redis.clients.jedis.Queable; import redis.clients.jedis.Response; import redis.clients.jedis.Transaction; import redis.clients.jedis.commands.DatabasePipelineCommands; @@ -47,7 +46,7 @@ * composing a functional pipeline to transform the result using a {@link Converter}. *

* Usage example: - * + *

*

  * JedisInvoker invoker = …;
  *
@@ -62,6 +61,7 @@
  *
  * @author Mark Paluch
  * @author Christoph Strobl
+ * @author John Blum
  * @since 2.5
  */
 class JedisInvoker {
@@ -937,10 +937,7 @@ static class DefaultSingleInvocationSpec implements SingleInvocationSpec {
 
 		@Override
 		public  T get(Converter converter) {
-
-			Assert.notNull(converter, "Converter must not be null");
-
-			return synchronizer.invoke(parentFunction, parentPipelineFunction, converter, () -> null);
+			return getOrElse(converter, () -> null);
 		}
 
 		@Nullable
@@ -959,6 +956,7 @@ static class DefaultManyInvocationSpec implements ManyInvocationSpec {
 		private final Function>> parentPipelineFunction;
 		private final Synchronizer synchronizer;
 
+		@SuppressWarnings({ "rawtypes", "unchecked" })
 		DefaultManyInvocationSpec(Function> parentFunction,
 				Function>> parentPipelineFunction,
 				Synchronizer synchronizer) {
@@ -969,6 +967,7 @@ static class DefaultManyInvocationSpec implements ManyInvocationSpec {
 		}
 
 		@Override
+		@SuppressWarnings("all")
 		public  List toList(Converter converter) {
 
 			Assert.notNull(converter, "Converter must not be null");
@@ -981,15 +980,17 @@ public  List toList(Converter converter) {
 
 				List result = new ArrayList<>(source.size());
 
-				for (S s : source) {
-					result.add(converter.convert(s));
+				for (S element : source) {
+					result.add(converter.convert(element));
 				}
 
 				return result;
+
 			}, Collections::emptyList);
 		}
 
 		@Override
+		@SuppressWarnings("all")
 		public  Set toSet(Converter converter) {
 
 			Assert.notNull(converter, "Converter must not be null");
@@ -1002,11 +1003,12 @@ public  Set toSet(Converter converter) {
 
 				Set result = new LinkedHashSet<>(source.size());
 
-				for (S s : source) {
-					result.add(converter.convert(s));
+				for (S element : source) {
+					result.add(converter.convert(element));
 				}
 
 				return result;
+
 			}, Collections::emptySet);
 		}
 	}
@@ -1020,6 +1022,7 @@ interface Synchronizer {
 		@Nullable
 		@SuppressWarnings({ "unchecked", "rawtypes" })
 		default  T invoke(Function callFunction, Function> pipelineFunction) {
+
 			return (T) doInvoke((Function) callFunction, (Function) pipelineFunction, Converters.identityConverter(),
 					() -> null);
 		}
@@ -1046,15 +1049,13 @@ interface ResponseCommands extends PipelineBinaryCommands, DatabasePipelineComma
 	/**
 	 * Create a proxy to invoke methods dynamically on {@link Pipeline} or {@link Transaction} as those share many
 	 * commands that are not defined on a common super-type.
-	 *
-	 * @param pipelineOrTransaction
-	 * @return
 	 */
-	static ResponseCommands createCommands(Queable pipelineOrTransaction) {
+	static ResponseCommands createCommands(Object pipelineOrTransaction) {
 
 		ProxyFactory proxyFactory = new ProxyFactory(pipelineOrTransaction);
+
 		proxyFactory.addInterface(ResponseCommands.class);
+
 		return (ResponseCommands) proxyFactory.getProxy(JedisInvoker.class.getClassLoader());
 	}
-
 }
diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisZSetCommands.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisZSetCommands.java
index 1aab87de0a..a19247a8d1 100644
--- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisZSetCommands.java
+++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisZSetCommands.java
@@ -22,6 +22,7 @@
 import redis.clients.jedis.params.ZParams;
 import redis.clients.jedis.params.ZRangeParams;
 import redis.clients.jedis.resps.ScanResult;
+import redis.clients.jedis.util.KeyValue;
 
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -31,7 +32,6 @@
 import org.springframework.dao.InvalidDataAccessApiUsageException;
 import org.springframework.data.redis.connection.RedisZSetCommands;
 import org.springframework.data.redis.connection.zset.Aggregate;
-import org.springframework.data.redis.connection.zset.DefaultTuple;
 import org.springframework.data.redis.connection.zset.Tuple;
 import org.springframework.data.redis.connection.zset.Weights;
 import org.springframework.data.redis.core.Cursor;
@@ -42,11 +42,14 @@
 import org.springframework.util.Assert;
 
 /**
+ * {@link RedisZSetCommands} implementation for Jedis.
+ *
  * @author Christoph Strobl
  * @author Clement Ong
  * @author Mark Paluch
  * @author Andrey Shlykov
  * @author Shyngys Sapraliyev
+ * @author John Blum
  * @since 2.0
  */
 class JedisZSetCommands implements RedisZSetCommands {
@@ -74,8 +77,10 @@ public Long zAdd(byte[] key, Set tuples, ZAddArgs args) {
 		Assert.notNull(key, "Key must not be null");
 		Assert.notNull(tuples, "Tuples must not be null");
 
-		return connection.invoke().just(Jedis::zadd, PipelineBinaryCommands::zadd, key, JedisConverters.toTupleMap(tuples),
-				JedisConverters.toZAddParams(args));
+		Long count = connection.invoke().just(Jedis::zadd, PipelineBinaryCommands::zadd, key,
+				JedisConverters.toTupleMap(tuples), JedisConverters.toZAddParams(args));
+
+		return count != null ? count : 0L;
 	}
 
 	@Override
@@ -424,7 +429,7 @@ public Set zDiff(byte[]... sets) {
 
 		Assert.notNull(sets, "Sets must not be null");
 
-		return connection.invoke().just(Jedis::zdiff, PipelineBinaryCommands::zdiff, sets);
+		return connection.invoke().fromMany(Jedis::zdiff, PipelineBinaryCommands::zdiff, sets).toSet();
 	}
 
 	@Override
@@ -450,7 +455,7 @@ public Set zInter(byte[]... sets) {
 
 		Assert.notNull(sets, "Sets must not be null");
 
-		return connection.invoke().just(Jedis::zinter, PipelineBinaryCommands::zinter, new ZParams(), sets);
+		return connection.invoke().fromMany(Jedis::zinter, PipelineBinaryCommands::zinter, new ZParams(), sets).toSet();
 	}
 
 	@Override
@@ -504,7 +509,7 @@ public Set zUnion(byte[]... sets) {
 
 		Assert.notNull(sets, "Sets must not be null");
 
-		return connection.invoke().just(Jedis::zunion, PipelineBinaryCommands::zunion, new ZParams(), sets);
+		return connection.invoke().fromMany(Jedis::zunion, PipelineBinaryCommands::zunion, new ZParams(), sets).toSet();
 	}
 
 	@Override
@@ -772,21 +777,8 @@ static ZRangeParams toZRangeParams(Protocol.Keyword by, byte[] min, byte[] max,
 		return zRangeParams;
 	}
 
-	/**
-	 * Workaround for broken Jedis BZPOP signature.
-	 *
-	 * @param bytes
-	 * @return
-	 */
 	@Nullable
-	@SuppressWarnings("unchecked")
-	private static Tuple toTuple(List bytes) {
-
-		if (bytes.isEmpty()) {
-			return null;
-		}
-
-		return new DefaultTuple((byte[]) bytes.get(1), Double.parseDouble(new String((byte[]) bytes.get(2))));
+	private static Tuple toTuple(@Nullable KeyValue keyValue) {
+		return keyValue != null ? JedisConverters.toTuple(keyValue.getValue()) : null;
 	}
-
 }
diff --git a/src/main/java/org/springframework/data/redis/connection/zset/DefaultTuple.java b/src/main/java/org/springframework/data/redis/connection/zset/DefaultTuple.java
index eb2c198e8e..3862985bb8 100644
--- a/src/main/java/org/springframework/data/redis/connection/zset/DefaultTuple.java
+++ b/src/main/java/org/springframework/data/redis/connection/zset/DefaultTuple.java
@@ -24,17 +24,20 @@
  *
  * @author Costin Leau
  * @author Christoph Strobl
+ * @author John Blum
  */
 public class DefaultTuple implements Tuple {
 
+	private static final Double ZERO = 0.0d;
+
 	private final Double score;
 	private final byte[] value;
 
 	/**
-	 * Constructs a new DefaultTuple instance.
+	 * Constructs a new {@link DefaultTuple}.
 	 *
-	 * @param value
-	 * @param score
+	 * @param value {@link byte[]} of the member's raw value.
+	 * @param score {@link Double score} of the raw value used in sorting.
 	 */
 	public DefaultTuple(byte[] value, Double score) {
 
diff --git a/src/test/java/org/springframework/data/redis/connection/jedis/JedisClusterConnectionTests.java b/src/test/java/org/springframework/data/redis/connection/jedis/JedisClusterConnectionTests.java
index 78ffeb756e..13621d31fc 100644
--- a/src/test/java/org/springframework/data/redis/connection/jedis/JedisClusterConnectionTests.java
+++ b/src/test/java/org/springframework/data/redis/connection/jedis/JedisClusterConnectionTests.java
@@ -2279,7 +2279,7 @@ public void zPopMinShouldWorkCorrectly() {
 	@EnabledOnCommand("BZPOPMIN")
 	public void bzPopMinShouldWorkCorrectly() {
 
-		assertThat(clusterConnection.bZPopMin(KEY_1_BYTES, 10, TimeUnit.MILLISECONDS)).isNull();
+		assertThat(clusterConnection.zSetCommands().zCard(KEY_1_BYTES)).isZero();
 
 		nativeConnection.zadd(KEY_1_BYTES, 10D, VALUE_1_BYTES);
 		nativeConnection.zadd(KEY_1_BYTES, 20D, VALUE_2_BYTES);
@@ -2306,7 +2306,7 @@ public void zPopMaxShouldWorkCorrectly() {
 	@EnabledOnCommand("BZPOPMAX")
 	public void bzPopMaxShouldWorkCorrectly() {
 
-		assertThat(clusterConnection.bZPopMax(KEY_1_BYTES, 10, TimeUnit.MILLISECONDS)).isNull();
+		assertThat(clusterConnection.zSetCommands().zCard(KEY_1_BYTES)).isZero();
 
 		nativeConnection.zadd(KEY_1_BYTES, 10D, VALUE_1_BYTES);
 		nativeConnection.zadd(KEY_1_BYTES, 20D, VALUE_2_BYTES);
diff --git a/src/test/java/org/springframework/data/redis/connection/jedis/JedisConnectionUnitTests.java b/src/test/java/org/springframework/data/redis/connection/jedis/JedisConnectionUnitTests.java
index 2a39274473..7f61f9aea5 100644
--- a/src/test/java/org/springframework/data/redis/connection/jedis/JedisConnectionUnitTests.java
+++ b/src/test/java/org/springframework/data/redis/connection/jedis/JedisConnectionUnitTests.java
@@ -165,7 +165,7 @@ public void scanShouldKeepTheConnectionOpen() {
 
 			connection.scan(ScanOptions.NONE);
 
-			verify(jedisSpy, never()).quit();
+			verify(jedisSpy, never()).disconnect();
 		}
 
 		@Test // DATAREDIS-531, GH-2006
@@ -177,7 +177,7 @@ public void scanShouldCloseTheConnectionWhenCursorIsClosed() throws IOException
 			Cursor cursor = connection.scan(ScanOptions.NONE);
 			cursor.close();
 
-			verify(jedisSpy, times(1)).quit();
+			verify(jedisSpy, times(1)).disconnect();
 		}
 
 		@Test // DATAREDIS-531
@@ -188,7 +188,7 @@ public void sScanShouldKeepTheConnectionOpen() {
 
 			connection.sScan("foo".getBytes(), ScanOptions.NONE);
 
-			verify(jedisSpy, never()).quit();
+			verify(jedisSpy, never()).disconnect();
 		}
 
 		@Test // DATAREDIS-531
@@ -200,7 +200,7 @@ public void sScanShouldCloseTheConnectionWhenCursorIsClosed() throws IOException
 			Cursor cursor = connection.sScan("foo".getBytes(), ScanOptions.NONE);
 			cursor.close();
 
-			verify(jedisSpy, times(1)).quit();
+			verify(jedisSpy, times(1)).disconnect();
 		}
 
 		@Test // DATAREDIS-531
@@ -211,7 +211,7 @@ public void zScanShouldKeepTheConnectionOpen() {
 
 			connection.zScan("foo".getBytes(), ScanOptions.NONE);
 
-			verify(jedisSpy, never()).quit();
+			verify(jedisSpy, never()).disconnect();
 		}
 
 		@Test // DATAREDIS-531
@@ -223,7 +223,7 @@ public void zScanShouldCloseTheConnectionWhenCursorIsClosed() throws IOException
 			Cursor cursor = connection.zScan("foo".getBytes(), ScanOptions.NONE);
 			cursor.close();
 
-			verify(jedisSpy, times(1)).quit();
+			verify(jedisSpy, times(1)).disconnect();
 		}
 
 		@Test // DATAREDIS-531
@@ -234,7 +234,7 @@ public void hScanShouldKeepTheConnectionOpen() {
 
 			connection.hScan("foo".getBytes(), ScanOptions.NONE);
 
-			verify(jedisSpy, never()).quit();
+			verify(jedisSpy, never()).disconnect();
 		}
 
 		@Test // DATAREDIS-531
@@ -246,7 +246,7 @@ public void hScanShouldCloseTheConnectionWhenCursorIsClosed() throws IOException
 			Cursor> cursor = connection.hScan("foo".getBytes(), ScanOptions.NONE);
 			cursor.close();
 
-			verify(jedisSpy, times(1)).quit();
+			verify(jedisSpy, times(1)).disconnect();
 		}
 
 		@Test // DATAREDIS-714
diff --git a/src/test/java/org/springframework/data/redis/connection/jedis/JedisConvertersUnitTests.java b/src/test/java/org/springframework/data/redis/connection/jedis/JedisConvertersUnitTests.java
index eb37cf171d..7cbc1b6e92 100644
--- a/src/test/java/org/springframework/data/redis/connection/jedis/JedisConvertersUnitTests.java
+++ b/src/test/java/org/springframework/data/redis/connection/jedis/JedisConvertersUnitTests.java
@@ -16,7 +16,15 @@
 package org.springframework.data.redis.connection.jedis;
 
 import static org.assertj.core.api.Assertions.*;
-
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+import redis.clients.jedis.Protocol;
 import redis.clients.jedis.params.GetExParams;
 import redis.clients.jedis.params.SetParams;
 
@@ -35,12 +43,15 @@
 import org.springframework.data.redis.connection.RedisStringCommands.SetOption;
 import org.springframework.data.redis.core.types.Expiration;
 import org.springframework.data.redis.core.types.RedisClientInfo;
+import org.springframework.lang.Nullable;
+import org.springframework.test.util.ReflectionTestUtils;
 
 /**
  * Unit tests for {@link JedisConverters}.
  *
  * @author Christoph Strobl
  * @author Mark Paluch
+ * @author John Blum
  */
 class JedisConvertersUnitTests {
 
@@ -212,15 +223,31 @@ void toSetCommandExPxOptionShouldReturnEXforMilliseconds() {
 	@Test // GH-2050
 	void convertsExpirationToSetPXAT() {
 
-		assertThat(JedisConverters.toSetCommandExPxArgument(Expiration.unixTimestamp(10, TimeUnit.MILLISECONDS)))
-				.extracting(SetParams::toString).isEqualTo(SetParams.setParams().pxAt(10).toString());
+		SetParams mockSetParams = mock(SetParams.class);
+
+		doReturn(mockSetParams).when(mockSetParams).pxAt(anyLong());
+
+		Expiration expiration = Expiration.unixTimestamp(10, TimeUnit.MILLISECONDS);
+
+		assertThat(JedisConverters.toSetCommandExPxArgument(expiration, mockSetParams)).isNotNull();
+
+		verify(mockSetParams, times(1)).pxAt(eq(10L));
+		verifyNoMoreInteractions(mockSetParams);
 	}
 
 	@Test // GH-2050
 	void convertsExpirationToSetEXAT() {
 
-		assertThat(JedisConverters.toSetCommandExPxArgument(Expiration.unixTimestamp(1, TimeUnit.MINUTES)))
-				.extracting(SetParams::toString).isEqualTo(SetParams.setParams().exAt(60).toString());
+		SetParams mockSetParams = mock(SetParams.class);
+
+		doReturn(mockSetParams).when(mockSetParams).exAt(anyLong());
+
+		Expiration expiration = Expiration.unixTimestamp(1, TimeUnit.MINUTES);
+
+		assertThat(JedisConverters.toSetCommandExPxArgument(expiration, mockSetParams)).isNotNull();
+
+		verify(mockSetParams, times(1)).exAt(eq(60L));
+		verifyNoMoreInteractions(mockSetParams);
 	}
 
 	@Test // DATAREDIS-316, DATAREDIS-749
@@ -241,43 +268,91 @@ void toSetCommandNxXxOptionShouldReturnEmptyArrayforUpsert() {
 	@Test // GH-2050
 	void convertsExpirationToGetExEX() {
 
-		assertThat(JedisConverters.toGetExParams(Expiration.seconds(10))).extracting(GetExParams::toString)
-				.isEqualTo(new GetExParams().ex(10).toString());
+		GetExParams mockGetExParams = mock(GetExParams.class);
+
+		doReturn(mockGetExParams).when(mockGetExParams).ex(anyLong());
+
+		Expiration expiration = Expiration.seconds(10);
+
+		assertThat(JedisConverters.toGetExParams(expiration, mockGetExParams)).isNotNull();
+
+		verify(mockGetExParams, times(1)).ex(eq(10L));
+		verifyNoMoreInteractions(mockGetExParams);
 	}
 
 	@Test // GH-2050
 	void convertsExpirationWithTimeUnitToGetExEX() {
 
-		assertThat(JedisConverters.toGetExParams(Expiration.from(1, TimeUnit.MINUTES))).extracting(GetExParams::toString)
-				.isEqualTo(new GetExParams().ex(60).toString());
+		GetExParams mockGetExParams = mock(GetExParams.class);
+
+		doReturn(mockGetExParams).when(mockGetExParams).ex(anyLong());
+
+		Expiration expiration = Expiration.from(1, TimeUnit.MINUTES);
+
+		assertThat(JedisConverters.toGetExParams(expiration, mockGetExParams)).isNotNull();
+
+		verify(mockGetExParams, times(1)).ex(eq(60L)); // seconds
+		verifyNoMoreInteractions(mockGetExParams);
 	}
 
 	@Test // GH-2050
 	void convertsExpirationToGetExPEX() {
 
-		assertThat(JedisConverters.toGetExParams(Expiration.milliseconds(10))).extracting(GetExParams::toString)
-				.isEqualTo(new GetExParams().px(10).toString());
+		GetExParams mockGetExParams = mock(GetExParams.class);
+
+		doReturn(mockGetExParams).when(mockGetExParams).px(anyLong());
+
+		Expiration expiration = Expiration.milliseconds(10L);
+
+		assertThat(JedisConverters.toGetExParams(expiration, mockGetExParams)).isNotNull();
+
+		verify(mockGetExParams, times(1)).px(eq(10L));
+		verifyNoMoreInteractions(mockGetExParams);
 	}
 
 	@Test // GH-2050
 	void convertsExpirationToGetExEXAT() {
 
-		assertThat(JedisConverters.toGetExParams(Expiration.unixTimestamp(10, TimeUnit.SECONDS)))
-				.extracting(GetExParams::toString).isEqualTo(new GetExParams().exAt(10).toString());
+		GetExParams mockGetExParams = mock(GetExParams.class);
+
+		doReturn(mockGetExParams).when(mockGetExParams).exAt(anyLong());
+
+		Expiration expiration = Expiration.unixTimestamp(10, TimeUnit.SECONDS);
+
+		assertThat(JedisConverters.toGetExParams(expiration, mockGetExParams)).isNotNull();
+
+		verify(mockGetExParams, times(1)).exAt(eq(10L));
+		verifyNoMoreInteractions(mockGetExParams);
 	}
 
 	@Test // GH-2050
 	void convertsExpirationWithTimeUnitToGetExEXAT() {
 
-		assertThat(JedisConverters.toGetExParams(Expiration.unixTimestamp(1, TimeUnit.MINUTES)))
-				.extracting(GetExParams::toString).isEqualTo(new GetExParams().exAt(60).toString());
+		GetExParams mockGetExParams = mock(GetExParams.class);
+
+		doReturn(mockGetExParams).when(mockGetExParams).exAt(anyLong());
+
+		Expiration expiration = Expiration.unixTimestamp(1, TimeUnit.MINUTES);
+
+		assertThat(JedisConverters.toGetExParams(expiration, mockGetExParams)).isNotNull();
+
+		verify(mockGetExParams, times(1)).exAt(eq(60L));
+		verifyNoMoreInteractions(mockGetExParams);
 	}
 
 	@Test // GH-2050
 	void convertsExpirationToGetExPXAT() {
 
-		assertThat(JedisConverters.toGetExParams(Expiration.unixTimestamp(10, TimeUnit.MILLISECONDS)))
-				.extracting(GetExParams::toString).isEqualTo(new GetExParams().pxAt(10).toString());
+		GetExParams mockGetExParams = mock(GetExParams.class);
+
+		doReturn(mockGetExParams).when(mockGetExParams).pxAt(anyLong());
+
+		Expiration expiration = Expiration.unixTimestamp(10, TimeUnit.MILLISECONDS);
+
+		assertThat(JedisConverters.toGetExParams(expiration, mockGetExParams)).isNotNull();
+
+		verify(mockGetExParams, times(1)).pxAt(eq(10L));
+		verifyNoMoreInteractions(mockGetExParams);
 	}
 
 	private void verifyRedisServerInfo(RedisServer server, Map values) {
@@ -289,18 +364,22 @@ private void verifyRedisServerInfo(RedisServer server, Map value
 
 	private static String toString(SetParams setParams) {
 
-		StringBuilder builder = new StringBuilder();
+		StringBuilder stringBuilder = new StringBuilder();
 
-		for (byte[] parameter : setParams.getByteParams()) {
+		stringBuilder.append(toString((Protocol.Keyword) ReflectionTestUtils.getField(setParams, "existance")));
+		stringBuilder.append(toString((Protocol.Keyword) ReflectionTestUtils.getField(setParams, "expiration")));
 
-			if (builder.length() != 0) {
-				builder.append(' ');
-			}
+		Long expirationValue = (Long) ReflectionTestUtils.getField(setParams, "expirationValue");
 
-			builder.append(new String(parameter));
+		if (expirationValue != null) {
+			stringBuilder.append(" ").append(expirationValue);
 		}
 
-		return builder.toString();
+		return stringBuilder.toString().trim();
+	}
+
+	private static String toString(@Nullable Enum value) {
+		return value != null ? value.name().toLowerCase() : "";
 	}
 
 	private Map getRedisServerInfoMap(String name, int port) {