Skip to content

Commit

Permalink
chore: cancel commands in initialState()/reset() if not using mpsc queue
Browse files Browse the repository at this point in the history
  • Loading branch information
okg-cxf committed Aug 14, 2024
1 parent cc01006 commit e8d515a
Showing 1 changed file with 12 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ protected static void cancelCommandOnEndpointClose(RedisCommand<?, ?, ?> cmd) {

private final int batchSize;

private final boolean usesMpscQueue;

/**
* Create a new {@link AutoBatchFlushEndpoint}.
*
Expand All @@ -193,8 +195,8 @@ protected DefaultAutoBatchFlushEndpoint(ClientOptions clientOptions, ClientResou
this.rejectCommandsWhileDisconnected = isRejectCommand(clientOptions);
long endpointId = ENDPOINT_COUNTER.incrementAndGet();
this.cachedEndpointId = "0x" + Long.toHexString(endpointId);
this.taskQueue = clientOptions.getAutoBatchFlushOptions().usesMpscQueue() ? new JcToolsUnboundedMpscOfferFirstQueue<>()
: new ConcurrentLinkedOfferFirstQueue<>();
this.usesMpscQueue = clientOptions.getAutoBatchFlushOptions().usesMpscQueue();
this.taskQueue = usesMpscQueue ? new JcToolsUnboundedMpscOfferFirstQueue<>() : new ConcurrentLinkedOfferFirstQueue<>();
this.canFire = false;
this.callbackOnClose = callbackOnClose;
this.writeSpinCount = clientOptions.getAutoBatchFlushOptions().getWriteSpinCount();
Expand Down Expand Up @@ -553,7 +555,10 @@ public void reset() {
if (chan.context.initialState.isConnected()) {
chan.pipeline().fireUserEventTriggered(new ConnectionEvents.Reset());
}
// Unsafe to call cancelBufferedCommands() here.
if (!usesMpscQueue) {
cancelCommands("reset");
}
// Otherwise, unsafe to call cancelBufferedCommands() here.
}

private void resetInternal() {
Expand All @@ -574,8 +579,10 @@ private void resetInternal() {
*/
@Override
public void initialState() {
// Unsafe to call cancelCommands() here.
// No need to cancel.
if (!usesMpscQueue) {
cancelCommands("initialState");
}
// Otherwise, unsafe to call cancelBufferedCommands() here.
ContextualChannel currentChannel = this.channel;
if (currentChannel.context.initialState.isConnected()) {
ChannelFuture close = currentChannel.close();
Expand Down

0 comments on commit e8d515a

Please sign in to comment.