From a6f2b29dc0743785bda6823a1fc9f68b44a87150 Mon Sep 17 00:00:00 2001 From: Yury-Fridlyand Date: Tue, 2 Jul 2024 15:21:47 -0700 Subject: [PATCH 01/18] Add backend for async `getPubSubMessage`. Signed-off-by: Yury-Fridlyand --- .../src/main/java/glide/api/BaseClient.java | 21 ++--- .../connectors/handlers/MessageHandler.java | 3 +- .../handlers/PubSubMessageQueue.java | 82 +++++++++++++++++++ .../src/test/java/glide/PubSubTests.java | 7 +- 4 files changed, 98 insertions(+), 15 deletions(-) create mode 100644 java/client/src/main/java/glide/connectors/handlers/PubSubMessageQueue.java diff --git a/java/client/src/main/java/glide/api/BaseClient.java b/java/client/src/main/java/glide/api/BaseClient.java index fa3ba82abe..f9e441593a 100644 --- a/java/client/src/main/java/glide/api/BaseClient.java +++ b/java/client/src/main/java/glide/api/BaseClient.java @@ -3,8 +3,6 @@ import static glide.api.models.GlideString.gs; import static glide.api.models.commands.SortBaseOptions.STORE_COMMAND_STRING; -import static glide.api.models.commands.bitmap.BitFieldOptions.BitFieldReadOnlySubCommands; -import static glide.api.models.commands.bitmap.BitFieldOptions.BitFieldSubCommands; import static glide.api.models.commands.bitmap.BitFieldOptions.createBitFieldArgs; import static glide.api.models.commands.bitmap.BitFieldOptions.createBitFieldGlideStringArgs; import static glide.api.models.commands.stream.StreamClaimOptions.JUST_ID_REDIS_API; @@ -250,6 +248,7 @@ import glide.connectors.handlers.CallbackDispatcher; import glide.connectors.handlers.ChannelHandler; import glide.connectors.handlers.MessageHandler; +import glide.connectors.handlers.PubSubMessageQueue; import glide.connectors.resources.Platform; import glide.connectors.resources.ThreadPoolResource; import glide.connectors.resources.ThreadPoolResourceAllocator; @@ -265,14 +264,12 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ExecutionException; import java.util.function.Function; import java.util.stream.Collectors; import lombok.NonNull; import lombok.RequiredArgsConstructor; import org.apache.commons.lang3.ArrayUtils; -import org.apache.commons.lang3.NotImplementedException; import response.ResponseOuterClass.ConstantResponse; import response.ResponseOuterClass.Response; @@ -298,7 +295,7 @@ public abstract class BaseClient protected final CommandManager commandManager; protected final ConnectionManager connectionManager; - protected final ConcurrentLinkedDeque messageQueue; + protected final PubSubMessageQueue messageQueue; protected final Optional subscriptionConfiguration; /** Helper which extracts data from received {@link Response}s from GLIDE. */ @@ -322,7 +319,7 @@ protected BaseClient(ClientBuilder builder) { protected static class ClientBuilder { private final ConnectionManager connectionManager; private final CommandManager commandManager; - private final ConcurrentLinkedDeque messageQueue; + private final PubSubMessageQueue messageQueue; private final Optional subscriptionConfiguration; } @@ -366,7 +363,7 @@ protected static CompletableFuture CreateClient( } /** - * Tries to return a next pubsub message. + * A blocking call to return a next pubsub message. * * @throws ConfigurationError If client is not subscribed to any channel or if client configured * with a callback. @@ -383,7 +380,12 @@ public PubSubMessage tryGetPubSubMessage() { "The operation will never complete since messages will be passed to the configured" + " callback."); } - return messageQueue.poll(); + try { + return messageQueue.pop().get(); + } catch (Exception unreachable) { + // should be never happen + return null; + } } /** @@ -405,8 +407,7 @@ public CompletableFuture getPubSubMessage() { "The operation will never complete since messages will be passed to the configured" + " callback."); } - throw new NotImplementedException( - "This feature will be supported in a future release of the GLIDE java client"); + return messageQueue.pop(); } /** diff --git a/java/client/src/main/java/glide/connectors/handlers/MessageHandler.java b/java/client/src/main/java/glide/connectors/handlers/MessageHandler.java index da2d16502a..a5c1002452 100644 --- a/java/client/src/main/java/glide/connectors/handlers/MessageHandler.java +++ b/java/client/src/main/java/glide/connectors/handlers/MessageHandler.java @@ -9,7 +9,6 @@ import java.util.Arrays; import java.util.Map; import java.util.Optional; -import java.util.concurrent.ConcurrentLinkedDeque; import java.util.stream.Collectors; import lombok.Getter; import lombok.RequiredArgsConstructor; @@ -34,7 +33,7 @@ public class MessageHandler { private final BaseResponseResolver responseResolver; /** A message queue wrapper. */ - private final ConcurrentLinkedDeque queue = new ConcurrentLinkedDeque<>(); + private final PubSubMessageQueue queue = new PubSubMessageQueue(); /** Process a push (PUBSUB) message received as a part of {@link Response} from GLIDE. */ public void handle(Response response) { diff --git a/java/client/src/main/java/glide/connectors/handlers/PubSubMessageQueue.java b/java/client/src/main/java/glide/connectors/handlers/PubSubMessageQueue.java new file mode 100644 index 0000000000..da053ce542 --- /dev/null +++ b/java/client/src/main/java/glide/connectors/handlers/PubSubMessageQueue.java @@ -0,0 +1,82 @@ +/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */ +package glide.connectors.handlers; + +import glide.api.models.PubSubMessage; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.Future; + +/** A FIFO message queue for {@link PubSubMessage} backed by {@link ConcurrentLinkedDeque}. */ +public class PubSubMessageQueue { + /** The queue itself. */ + private final ConcurrentLinkedDeque mq = new ConcurrentLinkedDeque<>(); + + /** + * The head of the queue stored aside as a {@link Future}. Stores a promise to the first message. + */ + private CompletableFuture head = new CompletableFuture<>(); + + /** An object to synchronize threads. */ + private final Object lock = 42; + + /** State of the queue. */ + private enum HeadState { + // `head` is an empty CF which was never given to a user + UNSET_UNREAD, + // `head` is a non-empty CF, which was never given to a user + SET_UNREAD, + // `head` is unset, but was given to a user + UNSET_READ, + // `head` was set, and given to a user + SET_READ, + } + + private HeadState state = HeadState.UNSET_UNREAD; + + /** Store a new message. */ + public void push(PubSubMessage message) { + synchronized (lock) { + switch (state) { + case UNSET_UNREAD: + head.complete(message); + state = HeadState.SET_UNREAD; + break; + case SET_UNREAD: + mq.push(message); + break; + case SET_READ: + case UNSET_READ: + head.complete(message); + head = new CompletableFuture<>(); + state = HeadState.SET_UNREAD; + break; + } + } + } + + /** Get a promise for the next message. */ + public CompletableFuture pop() { + synchronized (lock) { + CompletableFuture result = head; + switch (state) { + case UNSET_UNREAD: + case SET_READ: + state = HeadState.UNSET_READ; + break; + case SET_UNREAD: + head = new CompletableFuture<>(); + if (mq.isEmpty()) { + state = HeadState.SET_READ; + break; + } + head.complete(mq.pop()); + // no state change + break; + case UNSET_READ: + // no state change + break; + } + return result; + } + } +} diff --git a/java/integTest/src/test/java/glide/PubSubTests.java b/java/integTest/src/test/java/glide/PubSubTests.java index 7bdb8870c2..7e4d25a8db 100644 --- a/java/integTest/src/test/java/glide/PubSubTests.java +++ b/java/integTest/src/test/java/glide/PubSubTests.java @@ -835,9 +835,10 @@ public void transaction_with_all_types_of_PubsubMessages( boolean standalone, boolean useCallback) { assumeTrue(REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in redis 7"); skipTestsOnMac(); - assumeTrue( - standalone, // TODO activate tests after fix - "Test doesn't work on cluster due to Cross Slot error, probably a bug in `redis-rs`"); + // assumeTrue( + // standalone, // TODO activate tests after fix + // "Test doesn't work on cluster due to Cross Slot error, probably a bug in + // `redis-rs`"); String prefix = "channel"; String pattern = prefix + "*"; From 148eaf8678ffb9744057fd436eb826e6cf8f2f46 Mon Sep 17 00:00:00 2001 From: Yury-Fridlyand Date: Tue, 2 Jul 2024 15:25:50 -0700 Subject: [PATCH 02/18] Revert some stuff. Signed-off-by: Yury-Fridlyand --- java/integTest/src/test/java/glide/PubSubTests.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/java/integTest/src/test/java/glide/PubSubTests.java b/java/integTest/src/test/java/glide/PubSubTests.java index 7e4d25a8db..7bdb8870c2 100644 --- a/java/integTest/src/test/java/glide/PubSubTests.java +++ b/java/integTest/src/test/java/glide/PubSubTests.java @@ -835,10 +835,9 @@ public void transaction_with_all_types_of_PubsubMessages( boolean standalone, boolean useCallback) { assumeTrue(REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in redis 7"); skipTestsOnMac(); - // assumeTrue( - // standalone, // TODO activate tests after fix - // "Test doesn't work on cluster due to Cross Slot error, probably a bug in - // `redis-rs`"); + assumeTrue( + standalone, // TODO activate tests after fix + "Test doesn't work on cluster due to Cross Slot error, probably a bug in `redis-rs`"); String prefix = "channel"; String pattern = prefix + "*"; From 512eaa7643be36940e5276d4ef6ed554cabee934 Mon Sep 17 00:00:00 2001 From: Yury-Fridlyand Date: Tue, 2 Jul 2024 23:22:56 -0700 Subject: [PATCH 03/18] Address PR comments. Signed-off-by: Yury-Fridlyand --- .../handlers/PubSubMessageQueue.java | 27 +++++++------------ .../src/test/java/glide/PubSubTests.java | 11 ++++---- 2 files changed, 16 insertions(+), 22 deletions(-) diff --git a/java/client/src/main/java/glide/connectors/handlers/PubSubMessageQueue.java b/java/client/src/main/java/glide/connectors/handlers/PubSubMessageQueue.java index da053ce542..bc73737fde 100644 --- a/java/client/src/main/java/glide/connectors/handlers/PubSubMessageQueue.java +++ b/java/client/src/main/java/glide/connectors/handlers/PubSubMessageQueue.java @@ -2,14 +2,14 @@ package glide.connectors.handlers; import glide.api.models.PubSubMessage; +import java.util.LinkedList; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.Future; -/** A FIFO message queue for {@link PubSubMessage} backed by {@link ConcurrentLinkedDeque}. */ +/** A FIFO message queue for {@link PubSubMessage}. */ public class PubSubMessageQueue { /** The queue itself. */ - private final ConcurrentLinkedDeque mq = new ConcurrentLinkedDeque<>(); + private final LinkedList messageQueue = new LinkedList<>(); /** * The head of the queue stored aside as a {@link Future}. Stores a promise to the first message. @@ -17,18 +17,16 @@ public class PubSubMessageQueue { private CompletableFuture head = new CompletableFuture<>(); /** An object to synchronize threads. */ - private final Object lock = 42; + private final Object lock = new Object(); /** State of the queue. */ private enum HeadState { - // `head` is an empty CF which was never given to a user + // `head` is a new CF, which was never given to a user UNSET_UNREAD, // `head` is a non-empty CF, which was never given to a user SET_UNREAD, // `head` is unset, but was given to a user UNSET_READ, - // `head` was set, and given to a user - SET_READ, } private HeadState state = HeadState.UNSET_UNREAD; @@ -37,14 +35,10 @@ private enum HeadState { public void push(PubSubMessage message) { synchronized (lock) { switch (state) { - case UNSET_UNREAD: - head.complete(message); - state = HeadState.SET_UNREAD; - break; case SET_UNREAD: - mq.push(message); + messageQueue.push(message); break; - case SET_READ: + case UNSET_UNREAD: case UNSET_READ: head.complete(message); head = new CompletableFuture<>(); @@ -60,16 +54,15 @@ public CompletableFuture pop() { CompletableFuture result = head; switch (state) { case UNSET_UNREAD: - case SET_READ: state = HeadState.UNSET_READ; break; case SET_UNREAD: head = new CompletableFuture<>(); - if (mq.isEmpty()) { - state = HeadState.SET_READ; + if (messageQueue.isEmpty()) { + state = HeadState.UNSET_UNREAD; break; } - head.complete(mq.pop()); + head.complete(messageQueue.pop()); // no state change break; case UNSET_READ: diff --git a/java/integTest/src/test/java/glide/PubSubTests.java b/java/integTest/src/test/java/glide/PubSubTests.java index 7bdb8870c2..075c231dca 100644 --- a/java/integTest/src/test/java/glide/PubSubTests.java +++ b/java/integTest/src/test/java/glide/PubSubTests.java @@ -34,6 +34,7 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; @@ -130,15 +131,16 @@ public void cleanup() { pubsubMessageQueue.clear(); } + @SneakyThrows private void verifyReceivedPubsubMessages( Set> pubsubMessages, BaseClient listener, boolean callback) { if (callback) { assertEquals(pubsubMessages, new HashSet<>(pubsubMessageQueue)); } else { var received = new HashSet(pubsubMessages.size()); - PubSubMessage pubsubMessage; - while ((pubsubMessage = listener.tryGetPubSubMessage()) != null) { - received.add(pubsubMessage); + CompletableFuture messagePromise; + while ((messagePromise = listener.getPubSubMessage()).isDone()) { + received.add(messagePromise.get()); } assertEquals( pubsubMessages.stream().map(Pair::getValue).collect(Collectors.toSet()), received); @@ -831,8 +833,7 @@ public void error_cases() { @SneakyThrows @ParameterizedTest(name = "standalone = {0}, use callback = {1}") @MethodSource("getTwoBoolPermutations") - public void transaction_with_all_types_of_PubsubMessages( - boolean standalone, boolean useCallback) { + public void transaction_with_all_types_of_messages(boolean standalone, boolean useCallback) { assumeTrue(REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in redis 7"); skipTestsOnMac(); assumeTrue( From 1586794789bc173b7c89758d1a26b23382e14ab8 Mon Sep 17 00:00:00 2001 From: Yury-Fridlyand Date: Wed, 3 Jul 2024 09:44:06 -0700 Subject: [PATCH 04/18] Test fixes. Signed-off-by: Yury-Fridlyand --- .../handlers/PubSubMessageQueue.java | 3 + .../src/test/java/glide/PubSubTests.java | 197 +++++++++++------- 2 files changed, 122 insertions(+), 78 deletions(-) diff --git a/java/client/src/main/java/glide/connectors/handlers/PubSubMessageQueue.java b/java/client/src/main/java/glide/connectors/handlers/PubSubMessageQueue.java index bc73737fde..ee047b7a65 100644 --- a/java/client/src/main/java/glide/connectors/handlers/PubSubMessageQueue.java +++ b/java/client/src/main/java/glide/connectors/handlers/PubSubMessageQueue.java @@ -39,6 +39,9 @@ public void push(PubSubMessage message) { messageQueue.push(message); break; case UNSET_UNREAD: + head.complete(message); + state = HeadState.SET_UNREAD; + break; case UNSET_READ: head.complete(message); head = new CompletableFuture<>(); diff --git a/java/integTest/src/test/java/glide/PubSubTests.java b/java/integTest/src/test/java/glide/PubSubTests.java index 075c231dca..c32e39a1b5 100644 --- a/java/integTest/src/test/java/glide/PubSubTests.java +++ b/java/integTest/src/test/java/glide/PubSubTests.java @@ -37,19 +37,21 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import java.util.stream.Stream; import lombok.SneakyThrows; import org.apache.commons.lang3.tuple.Pair; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.Timeout; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.EnumSource; import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; -@Timeout(30) // sec +// @Timeout(30) // sec public class PubSubTests { // TODO protocol version @@ -131,12 +133,20 @@ public void cleanup() { pubsubMessageQueue.clear(); } + private enum MessageReadMethod { + Callback, + Async, + Sync + } + @SneakyThrows private void verifyReceivedPubsubMessages( - Set> pubsubMessages, BaseClient listener, boolean callback) { - if (callback) { + Set> pubsubMessages, + BaseClient listener, + MessageReadMethod method) { + if (method == MessageReadMethod.Callback) { assertEquals(pubsubMessages, new HashSet<>(pubsubMessageQueue)); - } else { + } else if (method == MessageReadMethod.Async) { var received = new HashSet(pubsubMessages.size()); CompletableFuture messagePromise; while ((messagePromise = listener.getPubSubMessage()).isDone()) { @@ -144,15 +154,33 @@ private void verifyReceivedPubsubMessages( } assertEquals( pubsubMessages.stream().map(Pair::getValue).collect(Collectors.toSet()), received); + } else { // Sync + var received = new HashSet(pubsubMessages.size()); + PubSubMessage message; + do { + try { + message = + CompletableFuture.supplyAsync(listener::tryGetPubSubMessage).get(1, TimeUnit.SECONDS); + received.add(message); + } catch (TimeoutException ignored) { + // all messages read + break; + } + } while (message != null); + assertEquals( + pubsubMessages.stream().map(Pair::getValue).collect(Collectors.toSet()), received); } } - private static Stream getTwoBoolPermutations() { + /** Permute all combinations of `standalone` as bool vs {@link MessageReadMethod}. */ + private static Stream getTestScenarios() { return Stream.of( - Arguments.of(true, true), - Arguments.of(true, false), - Arguments.of(false, true), - Arguments.of(false, false)); + Arguments.of(true, MessageReadMethod.Callback), + Arguments.of(true, MessageReadMethod.Sync), + Arguments.of(true, MessageReadMethod.Async), + Arguments.of(false, MessageReadMethod.Callback), + Arguments.of(false, MessageReadMethod.Sync), + Arguments.of(false, MessageReadMethod.Async)); } private ChannelMode exact(boolean standalone) { @@ -204,14 +232,15 @@ private void skipTestsOnMac() { /** Similar to `test_pubsub_exact_happy_path` in python client tests. */ @SneakyThrows @ParameterizedTest(name = "standalone = {0}, use callback = {1}") - @MethodSource("getTwoBoolPermutations") - public void exact_happy_path(boolean standalone, boolean useCallback) { + @MethodSource("getTestScenarios") + public void exact_happy_path(boolean standalone, MessageReadMethod method) { skipTestsOnMac(); String channel = UUID.randomUUID().toString(); String message = UUID.randomUUID().toString(); var subscriptions = Map.of(exact(standalone), Set.of(channel)); - var listener = createListener(standalone, useCallback, 1, subscriptions); + var listener = + createListener(standalone, method == MessageReadMethod.Callback, 1, subscriptions); var sender = createClient(standalone); clients.addAll(List.of(listener, sender)); @@ -219,14 +248,14 @@ public void exact_happy_path(boolean standalone, boolean useCallback) { Thread.sleep(MESSAGE_DELIVERY_DELAY); // deliver the message verifyReceivedPubsubMessages( - Set.of(Pair.of(1, new PubSubMessage(message, channel))), listener, useCallback); + Set.of(Pair.of(1, new PubSubMessage(message, channel))), listener, method); } /** Similar to `test_pubsub_exact_happy_path_many_channels` in python client tests. */ @SneakyThrows @ParameterizedTest(name = "standalone = {0}, use callback = {1}") - @MethodSource("getTwoBoolPermutations") - public void exact_happy_path_many_channels(boolean standalone, boolean useCallback) { + @MethodSource("getTestScenarios") + public void exact_happy_path_many_channels(boolean standalone, MessageReadMethod method) { skipTestsOnMac(); int numChannels = 256; int messagesPerChannel = 256; @@ -243,7 +272,8 @@ public void exact_happy_path_many_channels(boolean standalone, boolean useCallba } } - var listener = createListener(standalone, useCallback, 1, subscriptions); + var listener = + createListener(standalone, method == MessageReadMethod.Callback, 1, subscriptions); var sender = createClient(standalone); clients.addAll(List.of(listener, sender)); @@ -254,16 +284,14 @@ public void exact_happy_path_many_channels(boolean standalone, boolean useCallba Thread.sleep(MESSAGE_DELIVERY_DELAY); // deliver the messages verifyReceivedPubsubMessages( - messages.stream().map(m -> Pair.of(1, m)).collect(Collectors.toSet()), - listener, - useCallback); + messages.stream().map(m -> Pair.of(1, m)).collect(Collectors.toSet()), listener, method); } /** Similar to `test_sharded_pubsub` in python client tests. */ @SneakyThrows @ParameterizedTest(name = "use callback = {0}") - @ValueSource(booleans = {true, false}) - public void sharded_pubsub(boolean useCallback) { + @EnumSource(MessageReadMethod.class) + public void sharded_pubsub(MessageReadMethod method) { assumeTrue(REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in redis 7"); skipTestsOnMac(); @@ -271,7 +299,7 @@ public void sharded_pubsub(boolean useCallback) { String pubsubMessage = UUID.randomUUID().toString(); var subscriptions = Map.of(PubSubClusterChannelMode.SHARDED, Set.of(channel)); - var listener = createListener(false, useCallback, 1, subscriptions); + var listener = createListener(false, method == MessageReadMethod.Callback, 1, subscriptions); var sender = (RedisClusterClient) createClient(false); clients.addAll(List.of(listener, sender)); @@ -279,14 +307,14 @@ public void sharded_pubsub(boolean useCallback) { Thread.sleep(MESSAGE_DELIVERY_DELAY); // deliver the message verifyReceivedPubsubMessages( - Set.of(Pair.of(1, new PubSubMessage(pubsubMessage, channel))), listener, useCallback); + Set.of(Pair.of(1, new PubSubMessage(pubsubMessage, channel))), listener, method); } /** Similar to `test_sharded_pubsub_many_channels` in python client tests. */ @SneakyThrows @ParameterizedTest(name = "use callback = {0}") - @ValueSource(booleans = {true, false}) - public void sharded_pubsub_many_channels(boolean useCallback) { + @EnumSource(MessageReadMethod.class) + public void sharded_pubsub_many_channels(MessageReadMethod method) { assumeTrue(REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in redis 7"); skipTestsOnMac(); @@ -305,7 +333,7 @@ public void sharded_pubsub_many_channels(boolean useCallback) { } } - var listener = createListener(false, useCallback, 1, subscriptions); + var listener = createListener(false, method == MessageReadMethod.Callback, 1, subscriptions); var sender = (RedisClusterClient) createClient(false); clients.addAll(List.of(listener, sender)); @@ -319,14 +347,14 @@ public void sharded_pubsub_many_channels(boolean useCallback) { verifyReceivedPubsubMessages( pubsubMessages.stream().map(m -> Pair.of(1, m)).collect(Collectors.toSet()), listener, - useCallback); + method); } /** Similar to `test_pubsub_pattern` in python client tests. */ @SneakyThrows @ParameterizedTest(name = "standalone = {0}, use callback = {1}") - @MethodSource("getTwoBoolPermutations") - public void pattern(boolean standalone, boolean useCallback) { + @MethodSource("getTestScenarios") + public void pattern(boolean standalone, MessageReadMethod method) { skipTestsOnMac(); String prefix = "channel."; String pattern = prefix + "*"; @@ -338,7 +366,8 @@ public void pattern(boolean standalone, boolean useCallback) { standalone ? PubSubChannelMode.PATTERN : PubSubClusterChannelMode.PATTERN, Set.of(pattern)); - var listener = createListener(standalone, useCallback, 1, subscriptions); + var listener = + createListener(standalone, method == MessageReadMethod.Callback, 1, subscriptions); var sender = createClient(standalone); clients.addAll(List.of(listener, sender)); @@ -355,14 +384,14 @@ public void pattern(boolean standalone, boolean useCallback) { .map(e -> Pair.of(1, new PubSubMessage(e.getValue(), e.getKey(), pattern))) .collect(Collectors.toSet()); - verifyReceivedPubsubMessages(expected, listener, useCallback); + verifyReceivedPubsubMessages(expected, listener, method); } /** Similar to `test_pubsub_pattern_many_channels` in python client tests. */ @SneakyThrows @ParameterizedTest(name = "standalone = {0}, use callback = {1}") - @MethodSource("getTwoBoolPermutations") - public void pattern_many_channels(boolean standalone, boolean useCallback) { + @MethodSource("getTestScenarios") + public void pattern_many_channels(boolean standalone, MessageReadMethod method) { skipTestsOnMac(); String prefix = "channel."; String pattern = prefix + "*"; @@ -380,7 +409,8 @@ public void pattern_many_channels(boolean standalone, boolean useCallback) { } } - var listener = createListener(standalone, useCallback, 1, subscriptions); + var listener = + createListener(standalone, method == MessageReadMethod.Callback, 1, subscriptions); var sender = createClient(standalone); clients.addAll(List.of(listener, sender)); @@ -393,16 +423,14 @@ public void pattern_many_channels(boolean standalone, boolean useCallback) { Thread.sleep(MESSAGE_DELIVERY_DELAY); // deliver the messages verifyReceivedPubsubMessages( - messages.stream().map(m -> Pair.of(1, m)).collect(Collectors.toSet()), - listener, - useCallback); + messages.stream().map(m -> Pair.of(1, m)).collect(Collectors.toSet()), listener, method); } /** Similar to `test_pubsub_combined_exact_and_pattern_one_client` in python client tests. */ @SneakyThrows @ParameterizedTest(name = "standalone = {0}, use callback = {1}") - @MethodSource("getTwoBoolPermutations") - public void combined_exact_and_pattern_one_client(boolean standalone, boolean useCallback) { + @MethodSource("getTestScenarios") + public void combined_exact_and_pattern_one_client(boolean standalone, MessageReadMethod method) { skipTestsOnMac(); String prefix = "channel."; String pattern = prefix + "*"; @@ -432,7 +460,8 @@ public void combined_exact_and_pattern_one_client(boolean standalone, boolean us messages.add(new PubSubMessage(pubsubMessage, channel, pattern)); } - var listener = createListener(standalone, useCallback, 1, subscriptions); + var listener = + createListener(standalone, method == MessageReadMethod.Callback, 1, subscriptions); var sender = createClient(standalone); clients.addAll(List.of(listener, sender)); @@ -443,9 +472,7 @@ public void combined_exact_and_pattern_one_client(boolean standalone, boolean us Thread.sleep(MESSAGE_DELIVERY_DELAY); // deliver the messages verifyReceivedPubsubMessages( - messages.stream().map(m -> Pair.of(1, m)).collect(Collectors.toSet()), - listener, - useCallback); + messages.stream().map(m -> Pair.of(1, m)).collect(Collectors.toSet()), listener, method); } /** @@ -453,8 +480,10 @@ public void combined_exact_and_pattern_one_client(boolean standalone, boolean us */ @SneakyThrows @ParameterizedTest(name = "standalone = {0}, use callback = {1}") - @MethodSource("getTwoBoolPermutations") - public void combined_exact_and_pattern_multiple_clients(boolean standalone, boolean useCallback) { + @MethodSource("getTestScenarios") + public void combined_exact_and_pattern_multiple_clients( + boolean standalone, MessageReadMethod method) { + if (!standalone) return; skipTestsOnMac(); String prefix = "channel."; String pattern = prefix + "*"; @@ -476,10 +505,12 @@ public void combined_exact_and_pattern_multiple_clients(boolean standalone, bool messages.add(new PubSubMessage(message, channel, pattern)); } - var listenerExactSub = createListener(standalone, useCallback, 1, subscriptions); + var listenerExactSub = + createListener(standalone, method == MessageReadMethod.Callback, 1, subscriptions); subscriptions = Map.of(pattern(standalone), Set.of(pattern)); - var listenerPatternSub = createListener(standalone, useCallback, 2, subscriptions); + var listenerPatternSub = + createListener(standalone, method == MessageReadMethod.Callback, 2, subscriptions); var sender = createClient(standalone); clients.addAll(List.of(listenerExactSub, listenerPatternSub, sender)); @@ -490,13 +521,13 @@ public void combined_exact_and_pattern_multiple_clients(boolean standalone, bool Thread.sleep(MESSAGE_DELIVERY_DELAY); // deliver the messages - if (useCallback) { + if (method == MessageReadMethod.Callback) { verifyReceivedPubsubMessages( messages.stream() .map(m -> Pair.of(m.getPattern().isEmpty() ? 1 : 2, m)) .collect(Collectors.toSet()), listenerExactSub, - useCallback); + method); } else { verifyReceivedPubsubMessages( messages.stream() @@ -504,14 +535,14 @@ public void combined_exact_and_pattern_multiple_clients(boolean standalone, bool .map(m -> Pair.of(1, m)) .collect(Collectors.toSet()), listenerExactSub, - useCallback); + method); verifyReceivedPubsubMessages( messages.stream() .filter(m -> m.getPattern().isPresent()) .map(m -> Pair.of(2, m)) .collect(Collectors.toSet()), listenerPatternSub, - useCallback); + method); } } @@ -520,8 +551,8 @@ public void combined_exact_and_pattern_multiple_clients(boolean standalone, bool */ @SneakyThrows @ParameterizedTest(name = "use callback = {0}") - @ValueSource(booleans = {true, false}) - public void combined_exact_pattern_and_sharded_one_client(boolean useCallback) { + @EnumSource(MessageReadMethod.class) + public void combined_exact_pattern_and_sharded_one_client(MessageReadMethod method) { assumeTrue(REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in redis 7"); skipTestsOnMac(); @@ -557,7 +588,7 @@ public void combined_exact_pattern_and_sharded_one_client(boolean useCallback) { messages.add(new PubSubMessage(message, channel, pattern)); } - var listener = createListener(false, useCallback, 1, subscriptions); + var listener = createListener(false, method == MessageReadMethod.Callback, 1, subscriptions); var sender = (RedisClusterClient) createClient(false); clients.addAll(List.of(listener, sender)); @@ -572,9 +603,7 @@ public void combined_exact_pattern_and_sharded_one_client(boolean useCallback) { messages.addAll(shardedMessages); verifyReceivedPubsubMessages( - messages.stream().map(m -> Pair.of(1, m)).collect(Collectors.toSet()), - listener, - useCallback); + messages.stream().map(m -> Pair.of(1, m)).collect(Collectors.toSet()), listener, method); } /** @@ -583,8 +612,8 @@ public void combined_exact_pattern_and_sharded_one_client(boolean useCallback) { */ @SneakyThrows @ParameterizedTest(name = "use callback = {0}") - @ValueSource(booleans = {true, false}) - public void combined_exact_pattern_and_sharded_multi_client(boolean useCallback) { + @EnumSource(MessageReadMethod.class) + public void combined_exact_pattern_and_sharded_multi_client(MessageReadMethod method) { assumeTrue(REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in redis 7"); skipTestsOnMac(); @@ -624,13 +653,22 @@ public void combined_exact_pattern_and_sharded_multi_client(boolean useCallback) var listenerExact = createListener( - false, useCallback, PubSubClusterChannelMode.EXACT.ordinal(), subscriptionsExact); + false, + method == MessageReadMethod.Callback, + PubSubClusterChannelMode.EXACT.ordinal(), + subscriptionsExact); var listenerPattern = createListener( - false, useCallback, PubSubClusterChannelMode.PATTERN.ordinal(), subscriptionsPattern); + false, + method == MessageReadMethod.Callback, + PubSubClusterChannelMode.PATTERN.ordinal(), + subscriptionsPattern); var listenerSharded = createListener( - false, useCallback, PubSubClusterChannelMode.SHARDED.ordinal(), subscriptionsSharded); + false, + method == MessageReadMethod.Callback, + PubSubClusterChannelMode.SHARDED.ordinal(), + subscriptionsSharded); var sender = (RedisClusterClient) createClient(false); clients.addAll(List.of(listenerExact, listenerPattern, listenerSharded, sender)); @@ -647,7 +685,7 @@ public void combined_exact_pattern_and_sharded_multi_client(boolean useCallback) Thread.sleep(MESSAGE_DELIVERY_DELAY); // deliver the messages - if (useCallback) { + if (method == MessageReadMethod.Callback) { var expected = new HashSet>(); expected.addAll( exactMessages.stream() @@ -662,26 +700,26 @@ public void combined_exact_pattern_and_sharded_multi_client(boolean useCallback) .map(m -> Pair.of(PubSubClusterChannelMode.SHARDED.ordinal(), m)) .collect(Collectors.toSet())); - verifyReceivedPubsubMessages(expected, listenerExact, useCallback); + verifyReceivedPubsubMessages(expected, listenerExact, method); } else { verifyReceivedPubsubMessages( exactMessages.stream() .map(m -> Pair.of(PubSubClusterChannelMode.EXACT.ordinal(), m)) .collect(Collectors.toSet()), listenerExact, - useCallback); + method); verifyReceivedPubsubMessages( patternMessages.stream() .map(m -> Pair.of(PubSubClusterChannelMode.PATTERN.ordinal(), m)) .collect(Collectors.toSet()), listenerPattern, - useCallback); + method); verifyReceivedPubsubMessages( shardedMessages.stream() .map(m -> Pair.of(PubSubClusterChannelMode.SHARDED.ordinal(), m)) .collect(Collectors.toSet()), listenerSharded, - useCallback); + method); } } @@ -690,11 +728,13 @@ public void combined_exact_pattern_and_sharded_multi_client(boolean useCallback) * tests. */ @SneakyThrows - @Test - public void three_publishing_clients_same_name_with_sharded_no_callback() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void three_publishing_clients_same_name_with_sharded_no_callback(boolean sync) { assumeTrue(REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in redis 7"); skipTestsOnMac(); + MessageReadMethod method = sync ? MessageReadMethod.Sync : MessageReadMethod.Async; String channel = UUID.randomUUID().toString(); var exactMessage = new PubSubMessage(UUID.randomUUID().toString(), channel); var patternMessage = new PubSubMessage(UUID.randomUUID().toString(), channel, channel); @@ -727,7 +767,7 @@ public void three_publishing_clients_same_name_with_sharded_no_callback() { PubSubClusterChannelMode.EXACT.ordinal(), new PubSubMessage(patternMessage.getMessage(), channel))), listenerExact, - false); + method); verifyReceivedPubsubMessages( Set.of( Pair.of(PubSubClusterChannelMode.PATTERN.ordinal(), patternMessage), @@ -735,11 +775,11 @@ public void three_publishing_clients_same_name_with_sharded_no_callback() { PubSubClusterChannelMode.PATTERN.ordinal(), new PubSubMessage(exactMessage.getMessage(), channel, channel))), listenerPattern, - false); + method); verifyReceivedPubsubMessages( Set.of(Pair.of(PubSubClusterChannelMode.SHARDED.ordinal(), shardedMessage)), listenerSharded, - false); + method); } /** @@ -794,7 +834,7 @@ public void three_publishing_clients_same_name_with_sharded_with_callback() { new PubSubMessage(exactMessage.getMessage(), channel, channel)), Pair.of(PubSubClusterChannelMode.SHARDED.ordinal(), shardedMessage)); - verifyReceivedPubsubMessages(expected, listenerExact, true); + verifyReceivedPubsubMessages(expected, listenerExact, MessageReadMethod.Callback); } @SneakyThrows @@ -832,8 +872,8 @@ public void error_cases() { @SneakyThrows @ParameterizedTest(name = "standalone = {0}, use callback = {1}") - @MethodSource("getTwoBoolPermutations") - public void transaction_with_all_types_of_messages(boolean standalone, boolean useCallback) { + @MethodSource("getTestScenarios") + public void transaction_with_all_types_of_messages(boolean standalone, MessageReadMethod method) { assumeTrue(REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in redis 7"); skipTestsOnMac(); assumeTrue( @@ -863,7 +903,8 @@ public void transaction_with_all_types_of_messages(boolean standalone, boolean u PubSubClusterChannelMode.SHARDED, Set.of(shardPrefix)); - var listener = createListener(standalone, useCallback, 1, subscriptions); + var listener = + createListener(standalone, method == MessageReadMethod.Callback, 1, subscriptions); var sender = createClient(standalone); clients.addAll(List.of(listener, sender)); @@ -889,6 +930,6 @@ public void transaction_with_all_types_of_messages(boolean standalone, boolean u ? Set.of(Pair.of(1, exactMessage), Pair.of(1, patternMessage)) : Set.of( Pair.of(1, exactMessage), Pair.of(1, patternMessage), Pair.of(1, shardedMessage)); - verifyReceivedPubsubMessages(expected, listener, useCallback); + verifyReceivedPubsubMessages(expected, listener, method); } } From 9b0cef300d0d5be097961af9056ac1af99397030 Mon Sep 17 00:00:00 2001 From: Yury-Fridlyand Date: Wed, 3 Jul 2024 09:45:10 -0700 Subject: [PATCH 05/18] Revert a test stub. Signed-off-by: Yury-Fridlyand --- java/integTest/src/test/java/glide/PubSubTests.java | 1 - 1 file changed, 1 deletion(-) diff --git a/java/integTest/src/test/java/glide/PubSubTests.java b/java/integTest/src/test/java/glide/PubSubTests.java index c32e39a1b5..b5ca8f2fb7 100644 --- a/java/integTest/src/test/java/glide/PubSubTests.java +++ b/java/integTest/src/test/java/glide/PubSubTests.java @@ -483,7 +483,6 @@ public void combined_exact_and_pattern_one_client(boolean standalone, MessageRea @MethodSource("getTestScenarios") public void combined_exact_and_pattern_multiple_clients( boolean standalone, MessageReadMethod method) { - if (!standalone) return; skipTestsOnMac(); String prefix = "channel."; String pattern = prefix + "*"; From e76d655f274b81f4ef6446a2f8e68f14ce09c047 Mon Sep 17 00:00:00 2001 From: Yury-Fridlyand Date: Wed, 3 Jul 2024 10:02:08 -0700 Subject: [PATCH 06/18] Activate tests on mac. Signed-off-by: Yury-Fridlyand --- .../src/test/java/glide/PubSubTests.java | 21 ------------------- 1 file changed, 21 deletions(-) diff --git a/java/integTest/src/test/java/glide/PubSubTests.java b/java/integTest/src/test/java/glide/PubSubTests.java index b5ca8f2fb7..6bcf8215d8 100644 --- a/java/integTest/src/test/java/glide/PubSubTests.java +++ b/java/integTest/src/test/java/glide/PubSubTests.java @@ -222,19 +222,11 @@ private BaseClient createListener( // meanwhile, all pubsubMessages are delivered. // debug this and add checks for `publish` return value - // TODO: remove once fixed - private void skipTestsOnMac() { - assumeFalse( - System.getProperty("os.name").toLowerCase().contains("mac"), - "PubSub doesn't work on mac OS"); - } - /** Similar to `test_pubsub_exact_happy_path` in python client tests. */ @SneakyThrows @ParameterizedTest(name = "standalone = {0}, use callback = {1}") @MethodSource("getTestScenarios") public void exact_happy_path(boolean standalone, MessageReadMethod method) { - skipTestsOnMac(); String channel = UUID.randomUUID().toString(); String message = UUID.randomUUID().toString(); var subscriptions = Map.of(exact(standalone), Set.of(channel)); @@ -256,7 +248,6 @@ public void exact_happy_path(boolean standalone, MessageReadMethod method) { @ParameterizedTest(name = "standalone = {0}, use callback = {1}") @MethodSource("getTestScenarios") public void exact_happy_path_many_channels(boolean standalone, MessageReadMethod method) { - skipTestsOnMac(); int numChannels = 256; int messagesPerChannel = 256; var messages = new ArrayList(numChannels * messagesPerChannel); @@ -293,7 +284,6 @@ public void exact_happy_path_many_channels(boolean standalone, MessageReadMethod @EnumSource(MessageReadMethod.class) public void sharded_pubsub(MessageReadMethod method) { assumeTrue(REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in redis 7"); - skipTestsOnMac(); String channel = UUID.randomUUID().toString(); String pubsubMessage = UUID.randomUUID().toString(); @@ -316,7 +306,6 @@ public void sharded_pubsub(MessageReadMethod method) { @EnumSource(MessageReadMethod.class) public void sharded_pubsub_many_channels(MessageReadMethod method) { assumeTrue(REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in redis 7"); - skipTestsOnMac(); int numChannels = 256; int pubsubMessagesPerChannel = 256; @@ -355,7 +344,6 @@ public void sharded_pubsub_many_channels(MessageReadMethod method) { @ParameterizedTest(name = "standalone = {0}, use callback = {1}") @MethodSource("getTestScenarios") public void pattern(boolean standalone, MessageReadMethod method) { - skipTestsOnMac(); String prefix = "channel."; String pattern = prefix + "*"; Map message2channels = @@ -392,7 +380,6 @@ public void pattern(boolean standalone, MessageReadMethod method) { @ParameterizedTest(name = "standalone = {0}, use callback = {1}") @MethodSource("getTestScenarios") public void pattern_many_channels(boolean standalone, MessageReadMethod method) { - skipTestsOnMac(); String prefix = "channel."; String pattern = prefix + "*"; int numChannels = 256; @@ -431,7 +418,6 @@ public void pattern_many_channels(boolean standalone, MessageReadMethod method) @ParameterizedTest(name = "standalone = {0}, use callback = {1}") @MethodSource("getTestScenarios") public void combined_exact_and_pattern_one_client(boolean standalone, MessageReadMethod method) { - skipTestsOnMac(); String prefix = "channel."; String pattern = prefix + "*"; int numChannels = 256; @@ -483,7 +469,6 @@ public void combined_exact_and_pattern_one_client(boolean standalone, MessageRea @MethodSource("getTestScenarios") public void combined_exact_and_pattern_multiple_clients( boolean standalone, MessageReadMethod method) { - skipTestsOnMac(); String prefix = "channel."; String pattern = prefix + "*"; int numChannels = 256; @@ -553,7 +538,6 @@ public void combined_exact_and_pattern_multiple_clients( @EnumSource(MessageReadMethod.class) public void combined_exact_pattern_and_sharded_one_client(MessageReadMethod method) { assumeTrue(REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in redis 7"); - skipTestsOnMac(); String prefix = "channel."; String pattern = prefix + "*"; @@ -614,7 +598,6 @@ public void combined_exact_pattern_and_sharded_one_client(MessageReadMethod meth @EnumSource(MessageReadMethod.class) public void combined_exact_pattern_and_sharded_multi_client(MessageReadMethod method) { assumeTrue(REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in redis 7"); - skipTestsOnMac(); String prefix = "channel."; String pattern = prefix + "*"; @@ -731,7 +714,6 @@ public void combined_exact_pattern_and_sharded_multi_client(MessageReadMethod me @ValueSource(booleans = {true, false}) public void three_publishing_clients_same_name_with_sharded_no_callback(boolean sync) { assumeTrue(REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in redis 7"); - skipTestsOnMac(); MessageReadMethod method = sync ? MessageReadMethod.Sync : MessageReadMethod.Async; String channel = UUID.randomUUID().toString(); @@ -789,7 +771,6 @@ public void three_publishing_clients_same_name_with_sharded_no_callback(boolean @Test public void three_publishing_clients_same_name_with_sharded_with_callback() { assumeTrue(REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in redis 7"); - skipTestsOnMac(); String channel = UUID.randomUUID().toString(); var exactMessage = new PubSubMessage(UUID.randomUUID().toString(), channel); @@ -839,7 +820,6 @@ public void three_publishing_clients_same_name_with_sharded_with_callback() { @SneakyThrows @Test public void error_cases() { - skipTestsOnMac(); // client isn't configured with subscriptions var client = createClient(true); assertThrows(ConfigurationError.class, client::tryGetPubSubMessage); @@ -874,7 +854,6 @@ public void error_cases() { @MethodSource("getTestScenarios") public void transaction_with_all_types_of_messages(boolean standalone, MessageReadMethod method) { assumeTrue(REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in redis 7"); - skipTestsOnMac(); assumeTrue( standalone, // TODO activate tests after fix "Test doesn't work on cluster due to Cross Slot error, probably a bug in `redis-rs`"); From e0ca0a4ec077113ee1d100891fbac23d217e4222 Mon Sep 17 00:00:00 2001 From: Yury-Fridlyand Date: Wed, 3 Jul 2024 10:17:43 -0700 Subject: [PATCH 07/18] I HATE YOU SPOTLESS Signed-off-by: Yury-Fridlyand --- java/integTest/src/test/java/glide/PubSubTests.java | 1 - 1 file changed, 1 deletion(-) diff --git a/java/integTest/src/test/java/glide/PubSubTests.java b/java/integTest/src/test/java/glide/PubSubTests.java index 6bcf8215d8..c9aa1845cf 100644 --- a/java/integTest/src/test/java/glide/PubSubTests.java +++ b/java/integTest/src/test/java/glide/PubSubTests.java @@ -10,7 +10,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; -import static org.junit.jupiter.api.Assumptions.assumeFalse; import static org.junit.jupiter.api.Assumptions.assumeTrue; import glide.api.BaseClient; From 025de96a39fc275d20b839c9631104f3bf8ae0a9 Mon Sep 17 00:00:00 2001 From: Yury-Fridlyand Date: Wed, 3 Jul 2024 10:27:49 -0700 Subject: [PATCH 08/18] Revert "Activate tests on mac." This reverts commit e76d655f274b81f4ef6446a2f8e68f14ce09c047. Signed-off-by: Yury-Fridlyand --- .../src/test/java/glide/PubSubTests.java | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/java/integTest/src/test/java/glide/PubSubTests.java b/java/integTest/src/test/java/glide/PubSubTests.java index c9aa1845cf..b5ca8f2fb7 100644 --- a/java/integTest/src/test/java/glide/PubSubTests.java +++ b/java/integTest/src/test/java/glide/PubSubTests.java @@ -10,6 +10,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; +import static org.junit.jupiter.api.Assumptions.assumeFalse; import static org.junit.jupiter.api.Assumptions.assumeTrue; import glide.api.BaseClient; @@ -221,11 +222,19 @@ private BaseClient createListener( // meanwhile, all pubsubMessages are delivered. // debug this and add checks for `publish` return value + // TODO: remove once fixed + private void skipTestsOnMac() { + assumeFalse( + System.getProperty("os.name").toLowerCase().contains("mac"), + "PubSub doesn't work on mac OS"); + } + /** Similar to `test_pubsub_exact_happy_path` in python client tests. */ @SneakyThrows @ParameterizedTest(name = "standalone = {0}, use callback = {1}") @MethodSource("getTestScenarios") public void exact_happy_path(boolean standalone, MessageReadMethod method) { + skipTestsOnMac(); String channel = UUID.randomUUID().toString(); String message = UUID.randomUUID().toString(); var subscriptions = Map.of(exact(standalone), Set.of(channel)); @@ -247,6 +256,7 @@ public void exact_happy_path(boolean standalone, MessageReadMethod method) { @ParameterizedTest(name = "standalone = {0}, use callback = {1}") @MethodSource("getTestScenarios") public void exact_happy_path_many_channels(boolean standalone, MessageReadMethod method) { + skipTestsOnMac(); int numChannels = 256; int messagesPerChannel = 256; var messages = new ArrayList(numChannels * messagesPerChannel); @@ -283,6 +293,7 @@ public void exact_happy_path_many_channels(boolean standalone, MessageReadMethod @EnumSource(MessageReadMethod.class) public void sharded_pubsub(MessageReadMethod method) { assumeTrue(REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in redis 7"); + skipTestsOnMac(); String channel = UUID.randomUUID().toString(); String pubsubMessage = UUID.randomUUID().toString(); @@ -305,6 +316,7 @@ public void sharded_pubsub(MessageReadMethod method) { @EnumSource(MessageReadMethod.class) public void sharded_pubsub_many_channels(MessageReadMethod method) { assumeTrue(REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in redis 7"); + skipTestsOnMac(); int numChannels = 256; int pubsubMessagesPerChannel = 256; @@ -343,6 +355,7 @@ public void sharded_pubsub_many_channels(MessageReadMethod method) { @ParameterizedTest(name = "standalone = {0}, use callback = {1}") @MethodSource("getTestScenarios") public void pattern(boolean standalone, MessageReadMethod method) { + skipTestsOnMac(); String prefix = "channel."; String pattern = prefix + "*"; Map message2channels = @@ -379,6 +392,7 @@ public void pattern(boolean standalone, MessageReadMethod method) { @ParameterizedTest(name = "standalone = {0}, use callback = {1}") @MethodSource("getTestScenarios") public void pattern_many_channels(boolean standalone, MessageReadMethod method) { + skipTestsOnMac(); String prefix = "channel."; String pattern = prefix + "*"; int numChannels = 256; @@ -417,6 +431,7 @@ public void pattern_many_channels(boolean standalone, MessageReadMethod method) @ParameterizedTest(name = "standalone = {0}, use callback = {1}") @MethodSource("getTestScenarios") public void combined_exact_and_pattern_one_client(boolean standalone, MessageReadMethod method) { + skipTestsOnMac(); String prefix = "channel."; String pattern = prefix + "*"; int numChannels = 256; @@ -468,6 +483,7 @@ public void combined_exact_and_pattern_one_client(boolean standalone, MessageRea @MethodSource("getTestScenarios") public void combined_exact_and_pattern_multiple_clients( boolean standalone, MessageReadMethod method) { + skipTestsOnMac(); String prefix = "channel."; String pattern = prefix + "*"; int numChannels = 256; @@ -537,6 +553,7 @@ public void combined_exact_and_pattern_multiple_clients( @EnumSource(MessageReadMethod.class) public void combined_exact_pattern_and_sharded_one_client(MessageReadMethod method) { assumeTrue(REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in redis 7"); + skipTestsOnMac(); String prefix = "channel."; String pattern = prefix + "*"; @@ -597,6 +614,7 @@ public void combined_exact_pattern_and_sharded_one_client(MessageReadMethod meth @EnumSource(MessageReadMethod.class) public void combined_exact_pattern_and_sharded_multi_client(MessageReadMethod method) { assumeTrue(REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in redis 7"); + skipTestsOnMac(); String prefix = "channel."; String pattern = prefix + "*"; @@ -713,6 +731,7 @@ public void combined_exact_pattern_and_sharded_multi_client(MessageReadMethod me @ValueSource(booleans = {true, false}) public void three_publishing_clients_same_name_with_sharded_no_callback(boolean sync) { assumeTrue(REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in redis 7"); + skipTestsOnMac(); MessageReadMethod method = sync ? MessageReadMethod.Sync : MessageReadMethod.Async; String channel = UUID.randomUUID().toString(); @@ -770,6 +789,7 @@ public void three_publishing_clients_same_name_with_sharded_no_callback(boolean @Test public void three_publishing_clients_same_name_with_sharded_with_callback() { assumeTrue(REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in redis 7"); + skipTestsOnMac(); String channel = UUID.randomUUID().toString(); var exactMessage = new PubSubMessage(UUID.randomUUID().toString(), channel); @@ -819,6 +839,7 @@ public void three_publishing_clients_same_name_with_sharded_with_callback() { @SneakyThrows @Test public void error_cases() { + skipTestsOnMac(); // client isn't configured with subscriptions var client = createClient(true); assertThrows(ConfigurationError.class, client::tryGetPubSubMessage); @@ -853,6 +874,7 @@ public void error_cases() { @MethodSource("getTestScenarios") public void transaction_with_all_types_of_messages(boolean standalone, MessageReadMethod method) { assumeTrue(REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in redis 7"); + skipTestsOnMac(); assumeTrue( standalone, // TODO activate tests after fix "Test doesn't work on cluster due to Cross Slot error, probably a bug in `redis-rs`"); From eb84c2a97ce3ac83a7e10a3469bbf1dfe0c26f34 Mon Sep 17 00:00:00 2001 From: Yury-Fridlyand Date: Wed, 3 Jul 2024 12:50:43 -0700 Subject: [PATCH 09/18] O, great god of test, accept this. Signed-off-by: Yury-Fridlyand --- .github/workflows/java.yml | 2 +- .../java/glide/api/models/PubSubMessage.java | 4 +- .../handlers/PubSubMessageQueue.java | 9 +- .../handlers/PubSubMessageQueueTests.java | 140 ++++++++++++++++++ .../src/test/java/glide/PubSubTests.java | 91 +++++++++++- 5 files changed, 234 insertions(+), 12 deletions(-) create mode 100644 java/client/src/test/java/glide/connectors/handlers/PubSubMessageQueueTests.java diff --git a/.github/workflows/java.yml b/.github/workflows/java.yml index 4964a7dbed..07443075ff 100644 --- a/.github/workflows/java.yml +++ b/.github/workflows/java.yml @@ -126,7 +126,7 @@ jobs: - 17 runs-on: ubuntu-latest container: amazonlinux:latest - timeout-minutes: 15 + timeout-minutes: 35 steps: - name: Install git run: | diff --git a/java/client/src/main/java/glide/api/models/PubSubMessage.java b/java/client/src/main/java/glide/api/models/PubSubMessage.java index c3109956f1..43d6bc1d93 100644 --- a/java/client/src/main/java/glide/api/models/PubSubMessage.java +++ b/java/client/src/main/java/glide/api/models/PubSubMessage.java @@ -32,10 +32,10 @@ public PubSubMessage(String message, String channel) { @Override public String toString() { - String res = String.format("%s, channel = %s", message, channel); + String res = String.format("(%s, channel = %s", message, channel); if (pattern.isPresent()) { res += ", pattern = " + pattern.get(); } - return res; + return res + ")"; } } diff --git a/java/client/src/main/java/glide/connectors/handlers/PubSubMessageQueue.java b/java/client/src/main/java/glide/connectors/handlers/PubSubMessageQueue.java index ee047b7a65..b28e8d503d 100644 --- a/java/client/src/main/java/glide/connectors/handlers/PubSubMessageQueue.java +++ b/java/client/src/main/java/glide/connectors/handlers/PubSubMessageQueue.java @@ -8,13 +8,14 @@ /** A FIFO message queue for {@link PubSubMessage}. */ public class PubSubMessageQueue { + // fields are protected to ease testing /** The queue itself. */ - private final LinkedList messageQueue = new LinkedList<>(); + protected final LinkedList messageQueue = new LinkedList<>(); /** * The head of the queue stored aside as a {@link Future}. Stores a promise to the first message. */ - private CompletableFuture head = new CompletableFuture<>(); + protected CompletableFuture head = new CompletableFuture<>(); /** An object to synchronize threads. */ private final Object lock = new Object(); @@ -36,7 +37,7 @@ public void push(PubSubMessage message) { synchronized (lock) { switch (state) { case SET_UNREAD: - messageQueue.push(message); + messageQueue.addLast(message); break; case UNSET_UNREAD: head.complete(message); @@ -45,7 +46,7 @@ public void push(PubSubMessage message) { case UNSET_READ: head.complete(message); head = new CompletableFuture<>(); - state = HeadState.SET_UNREAD; + state = HeadState.UNSET_UNREAD; break; } } diff --git a/java/client/src/test/java/glide/connectors/handlers/PubSubMessageQueueTests.java b/java/client/src/test/java/glide/connectors/handlers/PubSubMessageQueueTests.java new file mode 100644 index 0000000000..13b08a6cb6 --- /dev/null +++ b/java/client/src/test/java/glide/connectors/handlers/PubSubMessageQueueTests.java @@ -0,0 +1,140 @@ +/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */ +package glide.connectors.handlers; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import glide.api.models.PubSubMessage; +import java.util.LinkedList; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import lombok.SneakyThrows; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +@Timeout(30) // sec +public class PubSubMessageQueueTests { + + private void checkFutureStatus(CompletableFuture future, boolean shouldBeDone) { + assertEquals(shouldBeDone, future.isDone()); + assertFalse(future.isCancelled()); + assertFalse(future.isCompletedExceptionally()); + } + + @Test + @SneakyThrows + public void read_messages_then_add() { + var queue = new PubSubMessageQueue(); + var promise1 = queue.pop(); + var promise2 = queue.pop(); + + assertSame(promise1, promise2); + checkFutureStatus(promise1, false); + assertTrue(queue.messageQueue.isEmpty()); + + // now - add + var msg1 = new PubSubMessage("one", "one"); + var msg2 = new PubSubMessage("two", "two"); + var msg3 = new PubSubMessage("three", "three"); + queue.push(msg1); + queue.push(msg2); + queue.push(msg3); + + // promise should get resolved automagically + checkFutureStatus(promise1, true); + assertSame(msg1, promise1.get()); + // and `head` too + checkFutureStatus(queue.head, true); + assertSame(msg2, queue.head.get()); + // and the last message remains in the MQ + assertEquals(1, queue.messageQueue.size()); + } + + @Test + @SneakyThrows + public void add_messages_then_read() { + var queue = new PubSubMessageQueue(); + + var msg1 = new PubSubMessage("one", "one"); + var msg2 = new PubSubMessage("two", "two"); + var msg3 = new PubSubMessage("three", "three"); + queue.push(msg1); + queue.push(msg2); + queue.push(msg3); + + checkFutureStatus(queue.head, true); + // MQ stores only second and third message, first is stored in `head` + assertEquals(2, queue.messageQueue.size()); + assertSame(msg1, queue.head.get()); + assertSame(msg2, queue.messageQueue.peek()); + + // now - read + assertSame(msg1, queue.pop().get()); + // second messages should be shifted to `head` + assertEquals(1, queue.messageQueue.size()); + checkFutureStatus(queue.head, true); + assertSame(msg2, queue.head.get()); + // keep reading + assertSame(msg2, queue.pop().get()); + // MQ is empty, but `head` isn't + assertTrue(queue.messageQueue.isEmpty()); + checkFutureStatus(queue.head, true); + assertSame(msg3, queue.head.get()); + // read more + assertSame(msg2, queue.pop().get()); + // MQ and `head` are both empty + assertTrue(queue.messageQueue.isEmpty()); + checkFutureStatus(queue.head, false); + } + + @Test + @SneakyThrows + public void concurrent_read_write() { + var queue = new PubSubMessageQueue(); + var numMessages = 1000; // test takes ~0.5 sec + // collections aren't concurrent, since we have only 1 reader and 1 writer so far + var expected = new LinkedList(); + var actual = new LinkedList(); + var rand = new Random(); + for (int i = 0; i < numMessages; i++) { + expected.add(new PubSubMessage(i + " " + UUID.randomUUID(), UUID.randomUUID().toString())); + } + + Runnable writer = + () -> { + for (int i = 0; i < numMessages; i++) { + queue.push(expected.get(i)); + try { + Thread.sleep(rand.nextInt(2)); + } catch (InterruptedException ignored) { + } + } + }; + Runnable reader = + () -> { + PubSubMessage message = null; + do { + try { + message = queue.pop().get(15, TimeUnit.MILLISECONDS); + actual.add(message); + Thread.sleep(rand.nextInt(2)); + } catch (TimeoutException ignored) { + // all messages read + break; + } catch (Exception ignored) { + } + } while (message != null); + }; + // start reader and writer and wait for finish + CompletableFuture.allOf(CompletableFuture.runAsync(writer), CompletableFuture.runAsync(reader)) + .get(); + + // this verifies message order + assertEquals(expected, actual); + } +} diff --git a/java/integTest/src/test/java/glide/PubSubTests.java b/java/integTest/src/test/java/glide/PubSubTests.java index b5ca8f2fb7..8f30164351 100644 --- a/java/integTest/src/test/java/glide/PubSubTests.java +++ b/java/integTest/src/test/java/glide/PubSubTests.java @@ -32,6 +32,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Random; import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -45,13 +46,14 @@ import org.apache.commons.lang3.tuple.Pair; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.EnumSource; import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; -// @Timeout(30) // sec +@Timeout(30) // sec public class PubSubTests { // TODO protocol version @@ -289,7 +291,7 @@ public void exact_happy_path_many_channels(boolean standalone, MessageReadMethod /** Similar to `test_sharded_pubsub` in python client tests. */ @SneakyThrows - @ParameterizedTest(name = "use callback = {0}") + @ParameterizedTest @EnumSource(MessageReadMethod.class) public void sharded_pubsub(MessageReadMethod method) { assumeTrue(REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in redis 7"); @@ -312,7 +314,7 @@ public void sharded_pubsub(MessageReadMethod method) { /** Similar to `test_sharded_pubsub_many_channels` in python client tests. */ @SneakyThrows - @ParameterizedTest(name = "use callback = {0}") + @ParameterizedTest @EnumSource(MessageReadMethod.class) public void sharded_pubsub_many_channels(MessageReadMethod method) { assumeTrue(REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in redis 7"); @@ -549,7 +551,7 @@ public void combined_exact_and_pattern_multiple_clients( * Similar to `test_pubsub_combined_exact_pattern_and_sharded_one_client` in python client tests. */ @SneakyThrows - @ParameterizedTest(name = "use callback = {0}") + @ParameterizedTest @EnumSource(MessageReadMethod.class) public void combined_exact_pattern_and_sharded_one_client(MessageReadMethod method) { assumeTrue(REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in redis 7"); @@ -605,12 +607,91 @@ public void combined_exact_pattern_and_sharded_one_client(MessageReadMethod meth messages.stream().map(m -> Pair.of(1, m)).collect(Collectors.toSet()), listener, method); } + @SneakyThrows + @Test + public void coexistense_of_sync_and_async_read() { + assumeTrue(REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in redis 7"); + skipTestsOnMac(); + + String prefix = "channel."; + String pattern = prefix + "*"; + String shardPrefix = "{shard}"; + int numChannels = 256; + var messages = new ArrayList(numChannels * 2); + var shardedMessages = new ArrayList(numChannels); + Map> subscriptions = + Map.of( + PubSubClusterChannelMode.EXACT, new HashSet<>(), + PubSubClusterChannelMode.PATTERN, Set.of(pattern), + PubSubClusterChannelMode.SHARDED, new HashSet<>()); + + for (var i = 0; i < numChannels; i++) { + var channel = i + "-" + UUID.randomUUID(); + subscriptions.get(PubSubClusterChannelMode.EXACT).add(channel); + var message = i + "-" + UUID.randomUUID(); + messages.add(new PubSubMessage(message, channel)); + } + + for (var i = 0; i < numChannels; i++) { + var channel = shardPrefix + "-" + i + "-" + UUID.randomUUID(); + subscriptions.get(PubSubClusterChannelMode.SHARDED).add(channel); + var message = i + "-" + UUID.randomUUID(); + shardedMessages.add(new PubSubMessage(message, channel)); + } + + for (var j = 0; j < numChannels; j++) { + var message = j + "-" + UUID.randomUUID(); + var channel = prefix + "-" + j + "-" + UUID.randomUUID(); + messages.add(new PubSubMessage(message, channel, pattern)); + } + + var listener = createListener(false, false, 1, subscriptions); + var sender = (RedisClusterClient) createClient(false); + clients.addAll(List.of(listener, sender)); + + for (var pubsubMessage : messages) { + sender.publish(pubsubMessage.getChannel(), pubsubMessage.getMessage()).get(); + } + for (var pubsubMessage : shardedMessages) { + sender.spublish(pubsubMessage.getChannel(), pubsubMessage.getMessage()).get(); + } + + Thread.sleep(MESSAGE_DELIVERY_DELAY); // deliver the messages + + messages.addAll(shardedMessages); + + var received = new HashSet(messages.size()); + var rand = new Random(); + while (true) { + if (rand.nextBoolean()) { + CompletableFuture messagePromise = listener.getPubSubMessage(); + if (messagePromise.isDone()) { + received.add(messagePromise.get()); + } else { + break; // all messages read + } + } else { + try { + var message = + CompletableFuture.supplyAsync(listener::tryGetPubSubMessage).get(1, TimeUnit.SECONDS); + received.add(message); + } catch (TimeoutException ignored) { + break; // all messages read + } + } + } + + // redis can reorder the messages, so we can't validate that the order (without big delays + // between sends) + assertEquals(new HashSet<>(messages), received); + } + /** * Similar to `test_pubsub_combined_exact_pattern_and_sharded_multi_client` in python client * tests. */ @SneakyThrows - @ParameterizedTest(name = "use callback = {0}") + @ParameterizedTest @EnumSource(MessageReadMethod.class) public void combined_exact_pattern_and_sharded_multi_client(MessageReadMethod method) { assumeTrue(REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in redis 7"); From b11e444e0f8499cba175eb90e282ae9ad3c089ca Mon Sep 17 00:00:00 2001 From: Yury-Fridlyand Date: Wed, 3 Jul 2024 12:55:22 -0700 Subject: [PATCH 10/18] Code cleanup. Signed-off-by: Yury-Fridlyand --- java/integTest/src/test/java/glide/PubSubTests.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/java/integTest/src/test/java/glide/PubSubTests.java b/java/integTest/src/test/java/glide/PubSubTests.java index 8f30164351..e74e0326f1 100644 --- a/java/integTest/src/test/java/glide/PubSubTests.java +++ b/java/integTest/src/test/java/glide/PubSubTests.java @@ -209,11 +209,6 @@ private BaseClient createListener( : createClientWithSubscriptions(standalone, subscriptions); } - // TODO add following tests from https://github.com/aws/glide-for-redis/pull/1643 - // test_pubsub_exact_happy_path_coexistence - // test_pubsub_exact_happy_path_many_channels_co_existence - // test_sharded_pubsub_co_existence - // test_pubsub_pattern_co_existence // TODO tests below blocked by https://github.com/aws/glide-for-redis/issues/1649 // test_pubsub_exact_max_size_PubsubMessage // test_pubsub_sharded_max_size_PubsubMessage @@ -607,6 +602,7 @@ public void combined_exact_pattern_and_sharded_one_client(MessageReadMethod meth messages.stream().map(m -> Pair.of(1, m)).collect(Collectors.toSet()), listener, method); } + /** This test fully covers all `test_pubsub_*_co_existence` tests in python client. */ @SneakyThrows @Test public void coexistense_of_sync_and_async_read() { From 8ee4741bfa7cf5b58bdc81614af8335aa57495a9 Mon Sep 17 00:00:00 2001 From: Yury-Fridlyand Date: Wed, 3 Jul 2024 13:12:14 -0700 Subject: [PATCH 11/18] Typo fix. Signed-off-by: Yury-Fridlyand --- .../java/glide/connectors/handlers/PubSubMessageQueueTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java/client/src/test/java/glide/connectors/handlers/PubSubMessageQueueTests.java b/java/client/src/test/java/glide/connectors/handlers/PubSubMessageQueueTests.java index 13b08a6cb6..4db5fa6964 100644 --- a/java/client/src/test/java/glide/connectors/handlers/PubSubMessageQueueTests.java +++ b/java/client/src/test/java/glide/connectors/handlers/PubSubMessageQueueTests.java @@ -86,7 +86,7 @@ public void add_messages_then_read() { checkFutureStatus(queue.head, true); assertSame(msg3, queue.head.get()); // read more - assertSame(msg2, queue.pop().get()); + assertSame(msg3, queue.pop().get()); // MQ and `head` are both empty assertTrue(queue.messageQueue.isEmpty()); checkFutureStatus(queue.head, false); From 87e5739effa05d3223a82d0983659f42efa27c3e Mon Sep 17 00:00:00 2001 From: Yury-Fridlyand Date: Wed, 3 Jul 2024 16:07:28 -0700 Subject: [PATCH 12/18] Address PR comments. Signed-off-by: Yury-Fridlyand --- java/integTest/src/test/java/glide/PubSubTests.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/java/integTest/src/test/java/glide/PubSubTests.java b/java/integTest/src/test/java/glide/PubSubTests.java index e74e0326f1..d49ffb1c5d 100644 --- a/java/integTest/src/test/java/glide/PubSubTests.java +++ b/java/integTest/src/test/java/glide/PubSubTests.java @@ -29,6 +29,7 @@ import glide.api.models.exceptions.RequestException; import java.util.ArrayList; import java.util.HashSet; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -679,7 +680,7 @@ public void coexistense_of_sync_and_async_read() { // redis can reorder the messages, so we can't validate that the order (without big delays // between sends) - assertEquals(new HashSet<>(messages), received); + assertEquals(new LinkedHashSet<>(messages), received); } /** From 72da45aaaa4750e0fb05866c4ef5de90a31d554b Mon Sep 17 00:00:00 2001 From: Yury-Fridlyand Date: Wed, 3 Jul 2024 16:26:47 -0700 Subject: [PATCH 13/18] Polish the tests. Signed-off-by: Yury-Fridlyand --- .../src/test/java/glide/PubSubTests.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/java/integTest/src/test/java/glide/PubSubTests.java b/java/integTest/src/test/java/glide/PubSubTests.java index d49ffb1c5d..bc707a7e74 100644 --- a/java/integTest/src/test/java/glide/PubSubTests.java +++ b/java/integTest/src/test/java/glide/PubSubTests.java @@ -197,14 +197,14 @@ private ChannelMode pattern(boolean standalone) { @SuppressWarnings("unchecked") private BaseClient createListener( boolean standalone, - boolean useCallback, + boolean withCallback, int clientId, Map> subscriptions) { MessageCallback callback = (msg, ctx) -> ((ConcurrentLinkedDeque>) ctx) .push(Pair.of(clientId, msg)); - return useCallback + return withCallback ? createClientWithSubscriptions( standalone, subscriptions, Optional.of(callback), Optional.of(pubsubMessageQueue)) : createClientWithSubscriptions(standalone, subscriptions); @@ -229,7 +229,7 @@ private void skipTestsOnMac() { /** Similar to `test_pubsub_exact_happy_path` in python client tests. */ @SneakyThrows - @ParameterizedTest(name = "standalone = {0}, use callback = {1}") + @ParameterizedTest(name = "standalone = {0}, read messages via {1}") @MethodSource("getTestScenarios") public void exact_happy_path(boolean standalone, MessageReadMethod method) { skipTestsOnMac(); @@ -251,7 +251,7 @@ public void exact_happy_path(boolean standalone, MessageReadMethod method) { /** Similar to `test_pubsub_exact_happy_path_many_channels` in python client tests. */ @SneakyThrows - @ParameterizedTest(name = "standalone = {0}, use callback = {1}") + @ParameterizedTest(name = "standalone = {0}, read messages via {1}") @MethodSource("getTestScenarios") public void exact_happy_path_many_channels(boolean standalone, MessageReadMethod method) { skipTestsOnMac(); @@ -350,7 +350,7 @@ public void sharded_pubsub_many_channels(MessageReadMethod method) { /** Similar to `test_pubsub_pattern` in python client tests. */ @SneakyThrows - @ParameterizedTest(name = "standalone = {0}, use callback = {1}") + @ParameterizedTest(name = "standalone = {0}, read messages via {1}") @MethodSource("getTestScenarios") public void pattern(boolean standalone, MessageReadMethod method) { skipTestsOnMac(); @@ -387,7 +387,7 @@ public void pattern(boolean standalone, MessageReadMethod method) { /** Similar to `test_pubsub_pattern_many_channels` in python client tests. */ @SneakyThrows - @ParameterizedTest(name = "standalone = {0}, use callback = {1}") + @ParameterizedTest(name = "standalone = {0}, read messages via {1}") @MethodSource("getTestScenarios") public void pattern_many_channels(boolean standalone, MessageReadMethod method) { skipTestsOnMac(); @@ -426,7 +426,7 @@ public void pattern_many_channels(boolean standalone, MessageReadMethod method) /** Similar to `test_pubsub_combined_exact_and_pattern_one_client` in python client tests. */ @SneakyThrows - @ParameterizedTest(name = "standalone = {0}, use callback = {1}") + @ParameterizedTest(name = "standalone = {0}, read messages via {1}") @MethodSource("getTestScenarios") public void combined_exact_and_pattern_one_client(boolean standalone, MessageReadMethod method) { skipTestsOnMac(); @@ -477,7 +477,7 @@ public void combined_exact_and_pattern_one_client(boolean standalone, MessageRea * Similar to `test_pubsub_combined_exact_and_pattern_multiple_clients` in python client tests. */ @SneakyThrows - @ParameterizedTest(name = "standalone = {0}, use callback = {1}") + @ParameterizedTest(name = "standalone = {0}, read messages via {1}") @MethodSource("getTestScenarios") public void combined_exact_and_pattern_multiple_clients( boolean standalone, MessageReadMethod method) { @@ -948,7 +948,7 @@ public void error_cases() { } @SneakyThrows - @ParameterizedTest(name = "standalone = {0}, use callback = {1}") + @ParameterizedTest(name = "standalone = {0}, read messages via {1}") @MethodSource("getTestScenarios") public void transaction_with_all_types_of_messages(boolean standalone, MessageReadMethod method) { assumeTrue(REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in redis 7"); From 5729ca6b4635a9f4088c5c35c5a9e39e7b2a91d9 Mon Sep 17 00:00:00 2001 From: Yury-Fridlyand Date: Thu, 4 Jul 2024 17:09:54 -0700 Subject: [PATCH 14/18] Rework impl. Signed-off-by: Yury-Fridlyand --- .../src/main/java/glide/api/BaseClient.java | 13 +- .../handlers/PubSubMessageQueue.java | 88 ++++------ .../handlers/PubSubMessageQueueTests.java | 164 ++++++++++++++---- .../src/test/java/glide/PubSubTests.java | 9 +- 4 files changed, 170 insertions(+), 104 deletions(-) diff --git a/java/client/src/main/java/glide/api/BaseClient.java b/java/client/src/main/java/glide/api/BaseClient.java index f9e441593a..16fa972be6 100644 --- a/java/client/src/main/java/glide/api/BaseClient.java +++ b/java/client/src/main/java/glide/api/BaseClient.java @@ -363,7 +363,7 @@ protected static CompletableFuture CreateClient( } /** - * A blocking call to return a next pubsub message. + * Return a next pubsub message if it is present. * * @throws ConfigurationError If client is not subscribed to any channel or if client configured * with a callback. @@ -380,16 +380,11 @@ public PubSubMessage tryGetPubSubMessage() { "The operation will never complete since messages will be passed to the configured" + " callback."); } - try { - return messageQueue.pop().get(); - } catch (Exception unreachable) { - // should be never happen - return null; - } + return messageQueue.popSync(); } /** - * Returns a promise for a next pubsub message. + * Async returns a promise for a next pubsub message. * * @apiNote Not implemented! * @throws ConfigurationError If client is not subscribed to any channel or if client configured @@ -407,7 +402,7 @@ public CompletableFuture getPubSubMessage() { "The operation will never complete since messages will be passed to the configured" + " callback."); } - return messageQueue.pop(); + return messageQueue.popAsync(); } /** diff --git a/java/client/src/main/java/glide/connectors/handlers/PubSubMessageQueue.java b/java/client/src/main/java/glide/connectors/handlers/PubSubMessageQueue.java index b28e8d503d..8696c92ab3 100644 --- a/java/client/src/main/java/glide/connectors/handlers/PubSubMessageQueue.java +++ b/java/client/src/main/java/glide/connectors/handlers/PubSubMessageQueue.java @@ -2,78 +2,54 @@ package glide.connectors.handlers; import glide.api.models.PubSubMessage; -import java.util.LinkedList; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Future; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.atomic.AtomicBoolean; /** A FIFO message queue for {@link PubSubMessage}. */ public class PubSubMessageQueue { // fields are protected to ease testing /** The queue itself. */ - protected final LinkedList messageQueue = new LinkedList<>(); + protected final ConcurrentLinkedDeque messageQueue = new ConcurrentLinkedDeque<>(); /** - * The head of the queue stored aside as a {@link Future}. Stores a promise to the first message. + * A promise for the first incoming message. Returned to a user, if message queried in async + * manner, but nothing received yet. */ - protected CompletableFuture head = new CompletableFuture<>(); + protected CompletableFuture firstMessagePromise = new CompletableFuture<>(); - /** An object to synchronize threads. */ - private final Object lock = new Object(); + /** A flag whether a user already got a {@link #firstMessagePromise}. */ + private final AtomicBoolean headPromiseRequested = new AtomicBoolean(false); - /** State of the queue. */ - private enum HeadState { - // `head` is a new CF, which was never given to a user - UNSET_UNREAD, - // `head` is a non-empty CF, which was never given to a user - SET_UNREAD, - // `head` is unset, but was given to a user - UNSET_READ, - } - - private HeadState state = HeadState.UNSET_UNREAD; + // TODO Rework to remove or reduce `synchronized` blocks. If remove it now, some messages get + // reordered. /** Store a new message. */ - public void push(PubSubMessage message) { - synchronized (lock) { - switch (state) { - case SET_UNREAD: - messageQueue.addLast(message); - break; - case UNSET_UNREAD: - head.complete(message); - state = HeadState.SET_UNREAD; - break; - case UNSET_READ: - head.complete(message); - head = new CompletableFuture<>(); - state = HeadState.UNSET_UNREAD; - break; - } + public synchronized void push(PubSubMessage message) { + if (headPromiseRequested.getAndSet(false)) { + firstMessagePromise.complete(message); + firstMessagePromise = new CompletableFuture<>(); + return; } + + messageQueue.addLast(message); } - /** Get a promise for the next message. */ - public CompletableFuture pop() { - synchronized (lock) { - CompletableFuture result = head; - switch (state) { - case UNSET_UNREAD: - state = HeadState.UNSET_READ; - break; - case SET_UNREAD: - head = new CompletableFuture<>(); - if (messageQueue.isEmpty()) { - state = HeadState.UNSET_UNREAD; - break; - } - head.complete(messageQueue.pop()); - // no state change - break; - case UNSET_READ: - // no state change - break; - } - return result; + /** Get a promise for a next message. */ + public synchronized CompletableFuture popAsync() { + PubSubMessage message; + if ((message = messageQueue.poll()) != null) { + var future = new CompletableFuture(); + future.complete(message); + return future; } + + headPromiseRequested.set(true); + return firstMessagePromise; + } + + /** Get a new message or null if nothing stored so far. */ + public PubSubMessage popSync() { + return messageQueue.poll(); } } diff --git a/java/client/src/test/java/glide/connectors/handlers/PubSubMessageQueueTests.java b/java/client/src/test/java/glide/connectors/handlers/PubSubMessageQueueTests.java index 4db5fa6964..b36912e7e3 100644 --- a/java/client/src/test/java/glide/connectors/handlers/PubSubMessageQueueTests.java +++ b/java/client/src/test/java/glide/connectors/handlers/PubSubMessageQueueTests.java @@ -3,6 +3,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -28,11 +29,14 @@ private void checkFutureStatus(CompletableFuture future, boolean @Test @SneakyThrows - public void read_messages_then_add() { + public void async_read_messages_then_add() { var queue = new PubSubMessageQueue(); - var promise1 = queue.pop(); - var promise2 = queue.pop(); + // read async - receiving promises + var promise1 = queue.popAsync(); + var promise2 = queue.popAsync(); + + // async reading from empty queue returns the same future for the same message assertSame(promise1, promise2); checkFutureStatus(promise1, false); assertTrue(queue.messageQueue.isEmpty()); @@ -45,14 +49,50 @@ public void read_messages_then_add() { queue.push(msg2); queue.push(msg3); - // promise should get resolved automagically + // promises should get resolved automagically checkFutureStatus(promise1, true); assertSame(msg1, promise1.get()); - // and `head` too - checkFutureStatus(queue.head, true); - assertSame(msg2, queue.head.get()); - // and the last message remains in the MQ - assertEquals(1, queue.messageQueue.size()); + // `firstMessagePromise` is a new uncompleted future + checkFutureStatus(queue.firstMessagePromise, false); + // and `msg1` isn't stored in the Q + assertEquals(2, queue.messageQueue.size()); + assertSame(msg2, queue.messageQueue.pop()); + assertSame(msg3, queue.messageQueue.pop()); + // now MQ should be empty + assertTrue(queue.messageQueue.isEmpty()); + } + + @Test + @SneakyThrows + public void sync_read_messages_then_add() { + var queue = new PubSubMessageQueue(); + + // read async - receiving nulls + assertNull(queue.popSync()); + assertNull(queue.popSync()); + + // `firstMessagePromise` remains unset and unused + checkFutureStatus(queue.firstMessagePromise, false); + // and Q is empty + assertTrue(queue.messageQueue.isEmpty()); + + // now - add + var msg1 = new PubSubMessage("one", "one"); + var msg2 = new PubSubMessage("two", "two"); + var msg3 = new PubSubMessage("three", "three"); + queue.push(msg1); + queue.push(msg2); + queue.push(msg3); + + // `firstMessagePromise` remains unset and unused + checkFutureStatus(queue.firstMessagePromise, false); + // all 3 messages are stored in the Q + assertEquals(3, queue.messageQueue.size()); + + // reading them + assertSame(msg1, queue.popSync()); + assertSame(msg2, queue.popSync()); + assertSame(msg3, queue.popSync()); } @Test @@ -63,38 +103,49 @@ public void add_messages_then_read() { var msg1 = new PubSubMessage("one", "one"); var msg2 = new PubSubMessage("two", "two"); var msg3 = new PubSubMessage("three", "three"); + var msg4 = new PubSubMessage("four", "four"); queue.push(msg1); queue.push(msg2); queue.push(msg3); + queue.push(msg4); + + // `firstMessagePromise` remains unset and unused + checkFutureStatus(queue.firstMessagePromise, false); + // all messages are stored in the Q + assertEquals(4, queue.messageQueue.size()); - checkFutureStatus(queue.head, true); - // MQ stores only second and third message, first is stored in `head` + // now - read one async + assertSame(msg1, queue.popAsync().get()); + // `firstMessagePromise` remains unset and unused + checkFutureStatus(queue.firstMessagePromise, false); + // Q stores remaining 3 messages + assertEquals(3, queue.messageQueue.size()); + + // read sync + assertSame(msg2, queue.popSync()); + checkFutureStatus(queue.firstMessagePromise, false); assertEquals(2, queue.messageQueue.size()); - assertSame(msg1, queue.head.get()); - assertSame(msg2, queue.messageQueue.peek()); - // now - read - assertSame(msg1, queue.pop().get()); - // second messages should be shifted to `head` - assertEquals(1, queue.messageQueue.size()); - checkFutureStatus(queue.head, true); - assertSame(msg2, queue.head.get()); // keep reading - assertSame(msg2, queue.pop().get()); - // MQ is empty, but `head` isn't - assertTrue(queue.messageQueue.isEmpty()); - checkFutureStatus(queue.head, true); - assertSame(msg3, queue.head.get()); - // read more - assertSame(msg3, queue.pop().get()); - // MQ and `head` are both empty - assertTrue(queue.messageQueue.isEmpty()); - checkFutureStatus(queue.head, false); + // get a future for the next message + var future = queue.popAsync(); + checkFutureStatus(future, true); + checkFutureStatus(queue.firstMessagePromise, false); + assertEquals(1, queue.messageQueue.size()); + // then read sync + assertSame(msg4, queue.popSync()); + // nothing remains in the Q + assertEquals(0, queue.messageQueue.size()); + // message 3 isn't lost - it is stored in `future` + assertSame(msg3, future.get()); } + // Not merging `concurrent_write_async_read` and `concurrent_write_sync_read`, because + // concurrent sync and async read may reorder messages + @Test @SneakyThrows - public void concurrent_read_write() { + public void concurrent_write_async_read() { var queue = new PubSubMessageQueue(); var numMessages = 1000; // test takes ~0.5 sec // collections aren't concurrent, since we have only 1 reader and 1 writer so far @@ -107,8 +158,8 @@ public void concurrent_read_write() { Runnable writer = () -> { - for (int i = 0; i < numMessages; i++) { - queue.push(expected.get(i)); + for (var message : expected) { + queue.push(message); try { Thread.sleep(rand.nextInt(2)); } catch (InterruptedException ignored) { @@ -120,9 +171,9 @@ public void concurrent_read_write() { PubSubMessage message = null; do { try { - message = queue.pop().get(15, TimeUnit.MILLISECONDS); + message = queue.popAsync().get(15, TimeUnit.MILLISECONDS); actual.add(message); - Thread.sleep(rand.nextInt(2)); + Thread.sleep(rand.nextInt(4)); } catch (TimeoutException ignored) { // all messages read break; @@ -130,6 +181,51 @@ public void concurrent_read_write() { } } while (message != null); }; + + // start reader and writer and wait for finish + CompletableFuture.allOf(CompletableFuture.runAsync(writer), CompletableFuture.runAsync(reader)) + .get(); + + // this verifies message order + assertEquals(expected, actual); + } + + @Test + @SneakyThrows + public void concurrent_write_sync_read() { + var queue = new PubSubMessageQueue(); + var numMessages = 1000; // test takes ~0.5 sec + // collections aren't concurrent, since we have only 1 reader and 1 writer so far + var expected = new LinkedList(); + var actual = new LinkedList(); + var rand = new Random(); + for (int i = 0; i < numMessages; i++) { + expected.add(new PubSubMessage(i + " " + UUID.randomUUID(), UUID.randomUUID().toString())); + } + + Runnable writer = + () -> { + for (var message : expected) { + queue.push(message); + try { + Thread.sleep(rand.nextInt(2)); + } catch (InterruptedException ignored) { + } + } + }; + Runnable reader = + () -> { + do { + try { + var message = queue.popSync(); + if (message != null) { + actual.add(message); + } + Thread.sleep(rand.nextInt(4)); + } catch (InterruptedException ignored) { + } + } while (actual.size() < expected.size()); + }; // start reader and writer and wait for finish CompletableFuture.allOf(CompletableFuture.runAsync(writer), CompletableFuture.runAsync(reader)) .get(); diff --git a/java/integTest/src/test/java/glide/PubSubTests.java b/java/integTest/src/test/java/glide/PubSubTests.java index bc707a7e74..afdf8904a6 100644 --- a/java/integTest/src/test/java/glide/PubSubTests.java +++ b/java/integTest/src/test/java/glide/PubSubTests.java @@ -613,7 +613,7 @@ public void coexistense_of_sync_and_async_read() { String prefix = "channel."; String pattern = prefix + "*"; String shardPrefix = "{shard}"; - int numChannels = 256; + int numChannels = 25; // 256; var messages = new ArrayList(numChannels * 2); var shardedMessages = new ArrayList(numChannels); Map> subscriptions = @@ -668,11 +668,10 @@ public void coexistense_of_sync_and_async_read() { break; // all messages read } } else { - try { - var message = - CompletableFuture.supplyAsync(listener::tryGetPubSubMessage).get(1, TimeUnit.SECONDS); + var message = listener.tryGetPubSubMessage(); + if (message != null) { received.add(message); - } catch (TimeoutException ignored) { + } else { break; // all messages read } } From 42d7907c67faa25847bbdf12854348bd9bf22879 Mon Sep 17 00:00:00 2001 From: Yury-Fridlyand Date: Thu, 4 Jul 2024 17:46:41 -0700 Subject: [PATCH 15/18] Fix tests. Signed-off-by: Yury-Fridlyand --- .../handlers/PubSubMessageQueueTests.java | 40 +++++++++++++++++++ .../src/test/java/glide/PubSubTests.java | 15 ++----- 2 files changed, 43 insertions(+), 12 deletions(-) diff --git a/java/client/src/test/java/glide/connectors/handlers/PubSubMessageQueueTests.java b/java/client/src/test/java/glide/connectors/handlers/PubSubMessageQueueTests.java index b36912e7e3..b7b1d63a6d 100644 --- a/java/client/src/test/java/glide/connectors/handlers/PubSubMessageQueueTests.java +++ b/java/client/src/test/java/glide/connectors/handlers/PubSubMessageQueueTests.java @@ -8,7 +8,9 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import glide.api.models.PubSubMessage; +import java.util.ArrayList; import java.util.LinkedList; +import java.util.List; import java.util.Random; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -140,6 +142,44 @@ public void add_messages_then_read() { assertSame(msg3, future.get()); } + @Test + @SneakyThrows + public void getting_messages_reordered_on_concurrent_async_and_sync_read() { + var queue = new PubSubMessageQueue(); + var msg1 = new PubSubMessage("one", "one"); + var msg2 = new PubSubMessage("two", "two"); + queue.push(msg1); + queue.push(msg2); + + var readMessages = new ArrayList(2); + + // assuming thread 1 started async read + var future = queue.popAsync(); + // and got raced by thread 2 which reads sync + var msg = queue.popSync(); + readMessages.add(msg); + // then thread 1 continues + msg = future.get(); + readMessages.add(msg); + + // messages get reordered since stored into a single collection (even if is a concurrent one) + assertEquals(List.of(msg2, msg1), readMessages); + + // another example + + // reading async before anything added to the queue + future = queue.popAsync(); + // queue gets 2 messages + queue.push(msg1); + queue.push(msg2); + // but inside the queue only one is stored + assertEquals(1, queue.messageQueue.size()); + // then if we read sync, we receive only second one + assertSame(msg2, queue.popSync()); + // future gets resolved by the first message + assertSame(msg1, future.get()); + } + // Not merging `concurrent_write_async_read` and `concurrent_write_sync_read`, because // concurrent sync and async read may reorder messages diff --git a/java/integTest/src/test/java/glide/PubSubTests.java b/java/integTest/src/test/java/glide/PubSubTests.java index afdf8904a6..e9c33b4a4f 100644 --- a/java/integTest/src/test/java/glide/PubSubTests.java +++ b/java/integTest/src/test/java/glide/PubSubTests.java @@ -39,8 +39,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import java.util.stream.Stream; import lombok.SneakyThrows; @@ -160,16 +158,9 @@ private void verifyReceivedPubsubMessages( } else { // Sync var received = new HashSet(pubsubMessages.size()); PubSubMessage message; - do { - try { - message = - CompletableFuture.supplyAsync(listener::tryGetPubSubMessage).get(1, TimeUnit.SECONDS); - received.add(message); - } catch (TimeoutException ignored) { - // all messages read - break; - } - } while (message != null); + while ((message = listener.tryGetPubSubMessage()) != null) { + received.add(message); + } assertEquals( pubsubMessages.stream().map(Pair::getValue).collect(Collectors.toSet()), received); } From a20febd765c86a3416d13b7352749693b6266e3c Mon Sep 17 00:00:00 2001 From: Yury-Fridlyand Date: Thu, 4 Jul 2024 20:13:51 -0700 Subject: [PATCH 16/18] Address PR comments. Signed-off-by: Yury-Fridlyand --- .../src/main/java/glide/api/BaseClient.java | 6 +-- .../handlers/PubSubMessageQueue.java | 39 ++++++++++++------- .../handlers/PubSubMessageQueueTests.java | 14 ++----- 3 files changed, 32 insertions(+), 27 deletions(-) diff --git a/java/client/src/main/java/glide/api/BaseClient.java b/java/client/src/main/java/glide/api/BaseClient.java index 616f19e9c5..0be514ad93 100644 --- a/java/client/src/main/java/glide/api/BaseClient.java +++ b/java/client/src/main/java/glide/api/BaseClient.java @@ -388,12 +388,12 @@ public PubSubMessage tryGetPubSubMessage() { } /** - * Async returns a promise for a next pubsub message. + * Returns a promise for a next pubsub message.
+ * Message gets unrecoverable lost if future is cancelled or reference to this future is lost. * - * @apiNote Not implemented! * @throws ConfigurationError If client is not subscribed to any channel or if client configured * with a callback. - * @return A Future which resolved with the next incoming message. + * @return A {@link CompletableFuture} which will asynchronously hold the next available message. */ public CompletableFuture getPubSubMessage() { if (subscriptionConfiguration.isEmpty()) { diff --git a/java/client/src/main/java/glide/connectors/handlers/PubSubMessageQueue.java b/java/client/src/main/java/glide/connectors/handlers/PubSubMessageQueue.java index 8696c92ab3..36420ee013 100644 --- a/java/client/src/main/java/glide/connectors/handlers/PubSubMessageQueue.java +++ b/java/client/src/main/java/glide/connectors/handlers/PubSubMessageQueue.java @@ -6,7 +6,10 @@ import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.atomic.AtomicBoolean; -/** A FIFO message queue for {@link PubSubMessage}. */ +/** + * An asynchronous FIFO message queue for {@link PubSubMessage} backed by {@link + * ConcurrentLinkedDeque}. + */ public class PubSubMessageQueue { // fields are protected to ease testing /** The queue itself. */ @@ -19,33 +22,41 @@ public class PubSubMessageQueue { protected CompletableFuture firstMessagePromise = new CompletableFuture<>(); /** A flag whether a user already got a {@link #firstMessagePromise}. */ - private final AtomicBoolean headPromiseRequested = new AtomicBoolean(false); + private final AtomicBoolean firstMessagePromiseRequested = new AtomicBoolean(false); + + /** A private object used to synchronize {@link #push} and {@link #popAsync}. */ + private final Object lock = new Object(); // TODO Rework to remove or reduce `synchronized` blocks. If remove it now, some messages get // reordered. /** Store a new message. */ - public synchronized void push(PubSubMessage message) { - if (headPromiseRequested.getAndSet(false)) { - firstMessagePromise.complete(message); - firstMessagePromise = new CompletableFuture<>(); - return; - } + public void push(PubSubMessage message) { + synchronized (lock) { + if (firstMessagePromiseRequested.getAndSet(false)) { + firstMessagePromise.complete(message); + firstMessagePromise = new CompletableFuture<>(); + return; + } - messageQueue.addLast(message); + messageQueue.addLast(message); + } } /** Get a promise for a next message. */ public synchronized CompletableFuture popAsync() { - PubSubMessage message; - if ((message = messageQueue.poll()) != null) { + synchronized (lock) { + PubSubMessage message = messageQueue.poll(); + if (message == null) { + // this makes first incoming message to be delivered into `firstMessagePromise` instead of + // `messageQueue` + firstMessagePromiseRequested.set(true); + return firstMessagePromise; + } var future = new CompletableFuture(); future.complete(message); return future; } - - headPromiseRequested.set(true); - return firstMessagePromise; } /** Get a new message or null if nothing stored so far. */ diff --git a/java/client/src/test/java/glide/connectors/handlers/PubSubMessageQueueTests.java b/java/client/src/test/java/glide/connectors/handlers/PubSubMessageQueueTests.java index b7b1d63a6d..5620e6640a 100644 --- a/java/client/src/test/java/glide/connectors/handlers/PubSubMessageQueueTests.java +++ b/java/client/src/test/java/glide/connectors/handlers/PubSubMessageQueueTests.java @@ -14,8 +14,6 @@ import java.util.Random; import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import lombok.SneakyThrows; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -208,18 +206,14 @@ public void concurrent_write_async_read() { }; Runnable reader = () -> { - PubSubMessage message = null; do { try { - message = queue.popAsync().get(15, TimeUnit.MILLISECONDS); + var message = queue.popAsync().get(); actual.add(message); - Thread.sleep(rand.nextInt(4)); - } catch (TimeoutException ignored) { - // all messages read - break; + Thread.sleep(rand.nextInt(2)); } catch (Exception ignored) { } - } while (message != null); + } while (actual.size() < expected.size()); }; // start reader and writer and wait for finish @@ -261,7 +255,7 @@ public void concurrent_write_sync_read() { if (message != null) { actual.add(message); } - Thread.sleep(rand.nextInt(4)); + Thread.sleep(rand.nextInt(2)); } catch (InterruptedException ignored) { } } while (actual.size() < expected.size()); From 899adce0ed79bb7ca0263bd9dd1bf7b1c4b8d409 Mon Sep 17 00:00:00 2001 From: Yury-Fridlyand Date: Thu, 4 Jul 2024 20:49:27 -0700 Subject: [PATCH 17/18] Address PR comments. Signed-off-by: Yury-Fridlyand --- .../java/glide/connectors/handlers/PubSubMessageQueue.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/java/client/src/main/java/glide/connectors/handlers/PubSubMessageQueue.java b/java/client/src/main/java/glide/connectors/handlers/PubSubMessageQueue.java index 36420ee013..8664ff0ee7 100644 --- a/java/client/src/main/java/glide/connectors/handlers/PubSubMessageQueue.java +++ b/java/client/src/main/java/glide/connectors/handlers/PubSubMessageQueue.java @@ -11,15 +11,14 @@ * ConcurrentLinkedDeque}. */ public class PubSubMessageQueue { - // fields are protected to ease testing /** The queue itself. */ - protected final ConcurrentLinkedDeque messageQueue = new ConcurrentLinkedDeque<>(); + final ConcurrentLinkedDeque messageQueue = new ConcurrentLinkedDeque<>(); /** * A promise for the first incoming message. Returned to a user, if message queried in async * manner, but nothing received yet. */ - protected CompletableFuture firstMessagePromise = new CompletableFuture<>(); + CompletableFuture firstMessagePromise = new CompletableFuture<>(); /** A flag whether a user already got a {@link #firstMessagePromise}. */ private final AtomicBoolean firstMessagePromiseRequested = new AtomicBoolean(false); From 5fa4368dac87acccc97da04e4d25f63195f0b26c Mon Sep 17 00:00:00 2001 From: Yury-Fridlyand Date: Thu, 4 Jul 2024 20:53:08 -0700 Subject: [PATCH 18/18] Typo fix. Signed-off-by: Yury-Fridlyand --- .../main/java/glide/connectors/handlers/PubSubMessageQueue.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java/client/src/main/java/glide/connectors/handlers/PubSubMessageQueue.java b/java/client/src/main/java/glide/connectors/handlers/PubSubMessageQueue.java index 8664ff0ee7..de7c516b3c 100644 --- a/java/client/src/main/java/glide/connectors/handlers/PubSubMessageQueue.java +++ b/java/client/src/main/java/glide/connectors/handlers/PubSubMessageQueue.java @@ -43,7 +43,7 @@ public void push(PubSubMessage message) { } /** Get a promise for a next message. */ - public synchronized CompletableFuture popAsync() { + public CompletableFuture popAsync() { synchronized (lock) { PubSubMessage message = messageQueue.poll(); if (message == null) {