diff --git a/src/main/java/io/lettuce/core/protocol/DefaultAutoBatchFlushEndpoint.java b/src/main/java/io/lettuce/core/protocol/DefaultAutoBatchFlushEndpoint.java index f9a83cb98b..8adf3d1851 100644 --- a/src/main/java/io/lettuce/core/protocol/DefaultAutoBatchFlushEndpoint.java +++ b/src/main/java/io/lettuce/core/protocol/DefaultAutoBatchFlushEndpoint.java @@ -170,6 +170,8 @@ protected static void cancelCommandOnEndpointClose(RedisCommand cmd) { private final int batchSize; + private final boolean usesMpscQueue; + /** * Create a new {@link AutoBatchFlushEndpoint}. * @@ -193,8 +195,8 @@ protected DefaultAutoBatchFlushEndpoint(ClientOptions clientOptions, ClientResou this.rejectCommandsWhileDisconnected = isRejectCommand(clientOptions); long endpointId = ENDPOINT_COUNTER.incrementAndGet(); this.cachedEndpointId = "0x" + Long.toHexString(endpointId); - this.taskQueue = clientOptions.getAutoBatchFlushOptions().usesMpscQueue() ? new JcToolsUnboundedMpscOfferFirstQueue<>() - : new ConcurrentLinkedOfferFirstQueue<>(); + this.usesMpscQueue = clientOptions.getAutoBatchFlushOptions().usesMpscQueue(); + this.taskQueue = usesMpscQueue ? new JcToolsUnboundedMpscOfferFirstQueue<>() : new ConcurrentLinkedOfferFirstQueue<>(); this.canFire = false; this.callbackOnClose = callbackOnClose; this.writeSpinCount = clientOptions.getAutoBatchFlushOptions().getWriteSpinCount(); @@ -553,7 +555,10 @@ public void reset() { if (chan.context.initialState.isConnected()) { chan.pipeline().fireUserEventTriggered(new ConnectionEvents.Reset()); } - // Unsafe to call cancelBufferedCommands() here. + if (!usesMpscQueue) { + cancelCommands("reset"); + } + // Otherwise, unsafe to call cancelBufferedCommands() here. } private void resetInternal() { @@ -574,8 +579,10 @@ private void resetInternal() { */ @Override public void initialState() { - // Unsafe to call cancelCommands() here. - // No need to cancel. + if (!usesMpscQueue) { + cancelCommands("initialState"); + } + // Otherwise, unsafe to call cancelBufferedCommands() here. ContextualChannel currentChannel = this.channel; if (currentChannel.context.initialState.isConnected()) { ChannelFuture close = currentChannel.close();