From 230a03aebb8843059781c67cc1843d061375bb7b Mon Sep 17 00:00:00 2001 From: leisurelyrcxf <145091193+okg-cxf@users.noreply.github.com> Date: Fri, 6 Dec 2024 21:51:00 +0800 Subject: [PATCH] fix: prevent blocking event loop thread by replacing ArrayDeque with HashIndexedQueue (#2953) * fix: prevent long stall during reconnection/disconnection by providing LinkedHashSetQueue instead of ArrayDeque for CommandHandler#stack * Polishing * Missed out a file --------- Co-authored-by: Tihomir Mateev --- .../java/io/lettuce/core/ClientOptions.java | 39 ++- .../datastructure/queue/HashIndexedQueue.java | 266 ++++++++++++++++++ .../lettuce/core/protocol/CommandHandler.java | 10 +- .../queue/HashIndexedQueueTests.java | 185 ++++++++++++ 4 files changed, 491 insertions(+), 9 deletions(-) create mode 100644 src/main/java/io/lettuce/core/datastructure/queue/HashIndexedQueue.java create mode 100644 src/test/java/io/lettuce/core/datastructure/queue/HashIndexedQueueTests.java diff --git a/src/main/java/io/lettuce/core/ClientOptions.java b/src/main/java/io/lettuce/core/ClientOptions.java index 9f1f1c33d9..52a5a950da 100644 --- a/src/main/java/io/lettuce/core/ClientOptions.java +++ b/src/main/java/io/lettuce/core/ClientOptions.java @@ -85,6 +85,8 @@ public class ClientOptions implements Serializable { public static final TimeoutOptions DEFAULT_TIMEOUT_OPTIONS = TimeoutOptions.enabled(); + public static final boolean DEFAULT_USE_HASH_INDEX_QUEUE = true; + private final boolean autoReconnect; private final boolean cancelCommandsOnReconnectFailure; @@ -115,6 +117,8 @@ public class ClientOptions implements Serializable { private final TimeoutOptions timeoutOptions; + private final boolean useHashIndexedQueue; + protected ClientOptions(Builder builder) { this.autoReconnect = builder.autoReconnect; this.cancelCommandsOnReconnectFailure = builder.cancelCommandsOnReconnectFailure; @@ -131,6 +135,7 @@ protected ClientOptions(Builder builder) { this.sslOptions = builder.sslOptions; this.suspendReconnectOnProtocolFailure = builder.suspendReconnectOnProtocolFailure; this.timeoutOptions = builder.timeoutOptions; + this.useHashIndexedQueue = builder.useHashIndexedQueue; } protected ClientOptions(ClientOptions original) { @@ -149,6 +154,7 @@ protected ClientOptions(ClientOptions original) { this.sslOptions = original.getSslOptions(); this.suspendReconnectOnProtocolFailure = original.isSuspendReconnectOnProtocolFailure(); this.timeoutOptions = original.getTimeoutOptions(); + this.useHashIndexedQueue = original.isUseHashIndexedQueue(); } /** @@ -214,6 +220,8 @@ public static class Builder { private TimeoutOptions timeoutOptions = DEFAULT_TIMEOUT_OPTIONS; + private boolean useHashIndexedQueue = DEFAULT_USE_HASH_INDEX_QUEUE; + protected Builder() { } @@ -269,8 +277,8 @@ public Builder bufferUsageRatio(int bufferUsageRatio) { * * @param policy the policy to use in {@link io.lettuce.core.protocol.CommandHandler} * @return {@code this} - * @since 6.0 * @see DecodeBufferPolicies + * @since 6.0 */ public Builder decodeBufferPolicy(DecodeBufferPolicy policy) { @@ -317,8 +325,8 @@ public Builder pingBeforeActivateConnection(boolean pingBeforeActivateConnection * * @param protocolVersion version to use. * @return {@code this} - * @since 6.0 * @see ProtocolVersion#newestSupported() + * @since 6.0 */ public Builder protocolVersion(ProtocolVersion protocolVersion) { @@ -337,9 +345,9 @@ public Builder protocolVersion(ProtocolVersion protocolVersion) { * * @param publishOnScheduler true/false * @return {@code this} - * @since 5.2 * @see org.reactivestreams.Subscriber#onNext(Object) * @see ClientResources#eventExecutorGroup() + * @since 5.2 */ public Builder publishOnScheduler(boolean publishOnScheduler) { this.publishOnScheduler = publishOnScheduler; @@ -459,6 +467,20 @@ public Builder timeoutOptions(TimeoutOptions timeoutOptions) { return this; } + /** + * Use hash indexed queue, which provides O(1) remove(Object) thus won't cause blocking issues. + * + * @param useHashIndexedQueue true/false + * @return {@code this} + * @see io.lettuce.core.protocol.CommandHandler.AddToStack + * @since 6.6 + */ + @SuppressWarnings("JavadocReference") + public Builder useHashIndexQueue(boolean useHashIndexedQueue) { + this.useHashIndexedQueue = useHashIndexedQueue; + return this; + } + /** * Create a new instance of {@link ClientOptions}. * @@ -476,7 +498,6 @@ public ClientOptions build() { * * @return a {@link ClientOptions.Builder} to create new {@link ClientOptions} whose settings are replicated from the * current {@link ClientOptions}. - * * @since 5.1 */ public ClientOptions.Builder mutate() { @@ -535,7 +556,6 @@ public DecodeBufferPolicy getDecodeBufferPolicy() { * * @return zero. * @since 5.2 - * * @deprecated since 6.0 in favor of {@link DecodeBufferPolicy}. */ @Deprecated @@ -684,6 +704,15 @@ public TimeoutOptions getTimeoutOptions() { return timeoutOptions; } + /** + * Whether we should use hash indexed queue, which provides O(1) remove(Object) + * + * @return if hash indexed queue should be used + */ + public boolean isUseHashIndexedQueue() { + return useHashIndexedQueue; + } + /** * Behavior of connections in disconnected state. */ diff --git a/src/main/java/io/lettuce/core/datastructure/queue/HashIndexedQueue.java b/src/main/java/io/lettuce/core/datastructure/queue/HashIndexedQueue.java new file mode 100644 index 0000000000..b265ba456a --- /dev/null +++ b/src/main/java/io/lettuce/core/datastructure/queue/HashIndexedQueue.java @@ -0,0 +1,266 @@ +/* + * Copyright 2011-Present, Redis Ltd. and Contributors + * All rights reserved. + * + * Licensed under the MIT License. + */ +package io.lettuce.core.datastructure.queue; + +import java.util.AbstractQueue; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; + +import io.lettuce.core.internal.LettuceAssert; +import org.jetbrains.annotations.NotNull; + +/** + * A queue implementation that supports O(1) removal of elements. The queue is backed by a hash map and a doubly linked list. + * + * @author chenxiaofan + */ +@SuppressWarnings("unchecked") +public class HashIndexedQueue extends AbstractQueue { + + private final Map map; // Object can be Node or List> + + private Node head; + + private Node tail; + + private int size; + + private static class Node { + + E value; + + Node next; + + Node prev; + + Node(E value) { + this.value = value; + } + + } + + /** + * Create a new instance of the {@link HashIndexedQueue}. + */ + public HashIndexedQueue() { + map = new HashMap<>(); + size = 0; + } + + @Override + public boolean add(E e) { + return offer(e); + } + + @Override + public boolean offer(E e) { + final Node newNode = new Node<>(e); + if (tail == null) { + head = tail = newNode; + } else { + tail.next = newNode; + newNode.prev = tail; + tail = newNode; + } + + if (!map.containsKey(e)) { + map.put(e, newNode); + } else { + Object current = map.get(e); + if (current instanceof Node) { + List> nodes = new ArrayList<>(); + nodes.add((Node) current); + nodes.add(newNode); + map.put(e, nodes); + } else { + ((List>) current).add(newNode); + } + } + size++; + return true; + } + + @Override + public E poll() { + if (head == null) { + return null; + } + E value = head.value; + removeNodeFromMap(head); + head = head.next; + if (head == null) { + tail = null; + } else { + head.prev = null; + } + size--; + return value; + } + + @Override + public E peek() { + if (head == null) { + return null; + } + return head.value; + } + + @Override + public boolean remove(Object o) { + return removeFirstOccurrence(o); + } + + @Override + public int size() { + return size; + } + + @Override + public boolean contains(Object o) { + return map.containsKey(o); + } + + public class Iterator implements java.util.Iterator { + + private Node current; + + private Node prev; + + private Iterator() { + current = HashIndexedQueue.this.head; + prev = null; + } + + @Override + public boolean hasNext() { + return current != null; + } + + @Override + public E next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + E value = current.value; + prev = current; + current = current.next; + return value; + } + + @Override + public void remove() { + if (prev != null) { + removeNodeFromMap(prev); + removeNode(prev); + size--; + // remove once + prev = null; + } + } + + } + + @NotNull + @Override + public Iterator iterator() { + return new Iterator(); + } + + @Override + public boolean removeAll(Collection c) { + boolean modified = false; + for (Object e : c) { + if (removeAllOccurrences(e)) { + modified = true; + } + } + return modified; + } + + @Override + public void clear() { + head = null; + tail = null; + map.clear(); + size = 0; + } + + private boolean removeFirstOccurrence(Object element) { + Object current = map.get(element); + if (current == null) { + return false; + } + if (current instanceof Node) { + Node node = (Node) current; + removeNode(node); + map.remove(element); + } else { + List> nodes = (List>) current; + Node node = nodes.remove(0); + if (nodes.isEmpty()) { + map.remove(element); + } + removeNode(node); + } + size--; + return true; + } + + private boolean removeAllOccurrences(Object element) { + Object current = map.get(element); + if (current == null) { + return false; + } + if (current instanceof Node) { + final Node node = (Node) current; + removeNode(node); + size--; + } else { + final List> nodes = (List>) current; + for (Node node : nodes) { + removeNode(node); + size--; + } + } + map.remove(element); + return true; + } + + private void removeNode(Node node) { + if (node.prev != null) { + node.prev.next = node.next; + } else { + head = node.next; + } + if (node.next != null) { + node.next.prev = node.prev; + } else { + tail = node.prev; + } + } + + private void removeNodeFromMap(Node node) { + E value = node.value; + Object current = map.get(value); + if (current instanceof Node) { + LettuceAssert.assertState(current == node, "current != node"); + map.remove(value); + } else { + List> nodes = (List>) current; + final boolean removed = nodes.remove(node); + LettuceAssert.assertState(removed, "!nodes.remove(node)"); + if (nodes.isEmpty()) { + map.remove(value); + } + } + } + +} diff --git a/src/main/java/io/lettuce/core/protocol/CommandHandler.java b/src/main/java/io/lettuce/core/protocol/CommandHandler.java index 063a0ab218..6791be04c4 100644 --- a/src/main/java/io/lettuce/core/protocol/CommandHandler.java +++ b/src/main/java/io/lettuce/core/protocol/CommandHandler.java @@ -43,6 +43,7 @@ import io.lettuce.core.RedisURI; import io.lettuce.core.api.push.PushListener; import io.lettuce.core.api.push.PushMessage; +import io.lettuce.core.datastructure.queue.HashIndexedQueue; import io.lettuce.core.internal.LettuceAssert; import io.lettuce.core.internal.LettuceSets; import io.lettuce.core.metrics.CommandLatencyRecorder; @@ -98,7 +99,7 @@ public class CommandHandler extends ChannelDuplexHandler implements HasQueuedCom private final Endpoint endpoint; - private final ArrayDeque> stack = new ArrayDeque<>(); + private final Queue> stack; private final long commandHandlerId = COMMAND_HANDLER_COUNTER.incrementAndGet(); @@ -157,6 +158,7 @@ public CommandHandler(ClientOptions clientOptions, ClientResources clientResourc this.commandLatencyRecorder = clientResources.commandLatencyRecorder(); this.latencyMetricsEnabled = commandLatencyRecorder.isEnabled(); this.boundedQueues = clientOptions.getRequestQueueSize() != Integer.MAX_VALUE; + this.stack = clientOptions.isUseHashIndexedQueue() ? new HashIndexedQueue<>() : new ArrayDeque<>(); Tracing tracing = clientResources.tracing(); @@ -1041,7 +1043,7 @@ protected AddToStack newObject(Handle handle) { private final Recycler.Handle handle; - private ArrayDeque stack; + private Queue> stack; private RedisCommand command; @@ -1057,11 +1059,11 @@ protected AddToStack newObject(Handle handle) { * @return */ @SuppressWarnings("unchecked") - static AddToStack newInstance(ArrayDeque stack, RedisCommand command) { + static AddToStack newInstance(Queue> stack, RedisCommand command) { AddToStack entry = RECYCLER.get(); - entry.stack = (ArrayDeque) stack; + entry.stack = stack; entry.command = command; return entry; diff --git a/src/test/java/io/lettuce/core/datastructure/queue/HashIndexedQueueTests.java b/src/test/java/io/lettuce/core/datastructure/queue/HashIndexedQueueTests.java new file mode 100644 index 0000000000..8d837c048f --- /dev/null +++ b/src/test/java/io/lettuce/core/datastructure/queue/HashIndexedQueueTests.java @@ -0,0 +1,185 @@ +package io.lettuce.core.datastructure.queue; + +import java.util.Arrays; +import java.util.Collections; +import java.util.NoSuchElementException; + +import io.lettuce.test.LettuceExtension; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.extension.ExtendWith; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@SuppressWarnings("ConstantValue") +@ExtendWith(LettuceExtension.class) +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +class HashIndexedQueueTests { + + private HashIndexedQueue queue; + + @BeforeEach + void setUp() { + queue = new HashIndexedQueue<>(); + } + + @Test + void testInitialization() { + assertTrue(queue.isEmpty()); + assertEquals(0, queue.size()); + } + + @Test + void testAddAndOffer() { + assertTrue(queue.add(1)); + assertTrue(queue.offer(2)); + assertEquals(2, queue.size()); + assertTrue(queue.contains(1)); + assertTrue(queue.contains(2)); + } + + @Test + void testRemoveAndPoll() { + queue.add(1); + queue.add(2); + assertEquals(1, queue.remove()); + assertEquals(1, queue.size()); + assertEquals(2, queue.poll()); + assertTrue(queue.isEmpty()); + } + + @Test + void testPeekAndElement() { + queue.add(1); + queue.add(2); + assertEquals(1, queue.peek()); + assertEquals(1, queue.element()); + queue.remove(); + assertEquals(2, queue.peek()); + assertEquals(2, queue.element()); + } + + @Test + void testContains() { + queue.add(1); + assertTrue(queue.contains(1)); + assertFalse(queue.contains(2)); + } + + @Test + void testSizeAndIsEmpty() { + assertTrue(queue.isEmpty()); + queue.add(1); + assertFalse(queue.isEmpty()); + assertEquals(1, queue.size()); + } + + @Test + void testClear() { + queue.add(1); + queue.add(2); + queue.clear(); + assertTrue(queue.isEmpty()); + assertEquals(0, queue.size()); + } + + @Test + void testRemove() { + queue.add(1); + queue.add(1); + queue.add(2); + queue.add(1); + assertTrue(queue.remove(1)); + assertEquals(3, queue.size()); + assertTrue(queue.contains(1)); + assertTrue(queue.contains(2)); + assertTrue(queue.remove(1)); + assertEquals(2, queue.size()); + assertTrue(queue.remove(1)); + assertEquals(1, queue.size()); + assertTrue(queue.contains(2)); + } + + @Test + void testRemoveAll() { + queue.add(1); + queue.add(1); + queue.add(2); + queue.add(1); + assertTrue(queue.removeAll(Collections.singletonList(1))); + assertEquals(1, queue.size()); + assertTrue(queue.contains(2)); + } + + @Test + void testRemoveAll2() { + queue.add(1); + queue.add(1); + queue.add(1); + queue.add(1); + assertTrue(queue.removeAll(Collections.singletonList(1))); + assertEquals(0, queue.size()); + } + + @Test + void testRemoveAllAndRetainAll() { + queue.addAll(Arrays.asList(1, 2, 3, 4)); + queue.removeAll(Arrays.asList(1, 2)); + assertEquals(2, queue.size()); + assertFalse(queue.contains(1)); + assertFalse(queue.contains(2)); + queue.retainAll(Collections.singletonList(3)); + assertEquals(1, queue.size()); + assertTrue(queue.contains(3)); + assertFalse(queue.contains(4)); + } + + @Test + void testRetainAll() { + queue.addAll(Arrays.asList(1, 2, 3, 4, 1, 2, 3, 4)); + queue.retainAll(Arrays.asList(1, 2)); + assertEquals(4, queue.size()); + HashIndexedQueue.Iterator iterator = queue.iterator(); + iterator.remove(); + assertEquals(1, iterator.next()); + assertEquals(2, iterator.next()); + assertEquals(1, iterator.next()); + assertEquals(2, iterator.next()); + iterator.remove(); + // no effect + iterator.remove(); + iterator.remove(); + + iterator = queue.iterator(); + assertEquals(1, iterator.next()); + assertEquals(2, iterator.next()); + assertEquals(1, iterator.next()); + assertFalse(iterator.hasNext()); + } + + @Test + void testRemoveNonExistentElement() { + assertThrows(NoSuchElementException.class, () -> queue.remove()); + } + + @Test + void testPollEmptyQueue() { + assertNull(queue.poll()); + } + + @Test + void testElementEmptyQueue() { + assertThrows(NoSuchElementException.class, () -> queue.element()); + } + + @Test + void testPeekEmptyQueue() { + assertNull(queue.peek()); + } + +}