diff --git a/.github/workflows/java.yml b/.github/workflows/java.yml index 0a773d2f08..07443075ff 100644 --- a/.github/workflows/java.yml +++ b/.github/workflows/java.yml @@ -126,7 +126,7 @@ jobs: - 17 runs-on: ubuntu-latest container: amazonlinux:latest - timeout-minutes: 20 + timeout-minutes: 35 steps: - name: Install git run: | diff --git a/java/client/src/main/java/glide/api/BaseClient.java b/java/client/src/main/java/glide/api/BaseClient.java index b09be22bb1..0be514ad93 100644 --- a/java/client/src/main/java/glide/api/BaseClient.java +++ b/java/client/src/main/java/glide/api/BaseClient.java @@ -252,6 +252,7 @@ import glide.connectors.handlers.CallbackDispatcher; import glide.connectors.handlers.ChannelHandler; import glide.connectors.handlers.MessageHandler; +import glide.connectors.handlers.PubSubMessageQueue; import glide.connectors.resources.Platform; import glide.connectors.resources.ThreadPoolResource; import glide.connectors.resources.ThreadPoolResourceAllocator; @@ -267,14 +268,12 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ExecutionException; import java.util.function.Function; import java.util.stream.Collectors; import lombok.NonNull; import lombok.RequiredArgsConstructor; import org.apache.commons.lang3.ArrayUtils; -import org.apache.commons.lang3.NotImplementedException; import response.ResponseOuterClass.ConstantResponse; import response.ResponseOuterClass.Response; @@ -300,7 +299,7 @@ public abstract class BaseClient protected final CommandManager commandManager; protected final ConnectionManager connectionManager; - protected final ConcurrentLinkedDeque messageQueue; + protected final PubSubMessageQueue messageQueue; protected final Optional subscriptionConfiguration; /** Helper which extracts data from received {@link Response}s from GLIDE. */ @@ -324,7 +323,7 @@ protected BaseClient(ClientBuilder builder) { protected static class ClientBuilder { private final ConnectionManager connectionManager; private final CommandManager commandManager; - private final ConcurrentLinkedDeque messageQueue; + private final PubSubMessageQueue messageQueue; private final Optional subscriptionConfiguration; } @@ -368,7 +367,7 @@ protected static CompletableFuture CreateClient( } /** - * Tries to return a next pubsub message. + * Return a next pubsub message if it is present. * * @throws ConfigurationError If client is not subscribed to any channel or if client configured * with a callback. @@ -385,16 +384,16 @@ public PubSubMessage tryGetPubSubMessage() { "The operation will never complete since messages will be passed to the configured" + " callback."); } - return messageQueue.poll(); + return messageQueue.popSync(); } /** - * Returns a promise for a next pubsub message. + * Returns a promise for a next pubsub message.
+ * Message gets unrecoverable lost if future is cancelled or reference to this future is lost. * - * @apiNote Not implemented! * @throws ConfigurationError If client is not subscribed to any channel or if client configured * with a callback. - * @return A Future which resolved with the next incoming message. + * @return A {@link CompletableFuture} which will asynchronously hold the next available message. */ public CompletableFuture getPubSubMessage() { if (subscriptionConfiguration.isEmpty()) { @@ -407,8 +406,7 @@ public CompletableFuture getPubSubMessage() { "The operation will never complete since messages will be passed to the configured" + " callback."); } - throw new NotImplementedException( - "This feature will be supported in a future release of the GLIDE java client"); + return messageQueue.popAsync(); } /** diff --git a/java/client/src/main/java/glide/api/models/PubSubMessage.java b/java/client/src/main/java/glide/api/models/PubSubMessage.java index c3109956f1..43d6bc1d93 100644 --- a/java/client/src/main/java/glide/api/models/PubSubMessage.java +++ b/java/client/src/main/java/glide/api/models/PubSubMessage.java @@ -32,10 +32,10 @@ public PubSubMessage(String message, String channel) { @Override public String toString() { - String res = String.format("%s, channel = %s", message, channel); + String res = String.format("(%s, channel = %s", message, channel); if (pattern.isPresent()) { res += ", pattern = " + pattern.get(); } - return res; + return res + ")"; } } diff --git a/java/client/src/main/java/glide/connectors/handlers/MessageHandler.java b/java/client/src/main/java/glide/connectors/handlers/MessageHandler.java index da2d16502a..a5c1002452 100644 --- a/java/client/src/main/java/glide/connectors/handlers/MessageHandler.java +++ b/java/client/src/main/java/glide/connectors/handlers/MessageHandler.java @@ -9,7 +9,6 @@ import java.util.Arrays; import java.util.Map; import java.util.Optional; -import java.util.concurrent.ConcurrentLinkedDeque; import java.util.stream.Collectors; import lombok.Getter; import lombok.RequiredArgsConstructor; @@ -34,7 +33,7 @@ public class MessageHandler { private final BaseResponseResolver responseResolver; /** A message queue wrapper. */ - private final ConcurrentLinkedDeque queue = new ConcurrentLinkedDeque<>(); + private final PubSubMessageQueue queue = new PubSubMessageQueue(); /** Process a push (PUBSUB) message received as a part of {@link Response} from GLIDE. */ public void handle(Response response) { diff --git a/java/client/src/main/java/glide/connectors/handlers/PubSubMessageQueue.java b/java/client/src/main/java/glide/connectors/handlers/PubSubMessageQueue.java new file mode 100644 index 0000000000..de7c516b3c --- /dev/null +++ b/java/client/src/main/java/glide/connectors/handlers/PubSubMessageQueue.java @@ -0,0 +1,65 @@ +/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */ +package glide.connectors.handlers; + +import glide.api.models.PubSubMessage; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * An asynchronous FIFO message queue for {@link PubSubMessage} backed by {@link + * ConcurrentLinkedDeque}. + */ +public class PubSubMessageQueue { + /** The queue itself. */ + final ConcurrentLinkedDeque messageQueue = new ConcurrentLinkedDeque<>(); + + /** + * A promise for the first incoming message. Returned to a user, if message queried in async + * manner, but nothing received yet. + */ + CompletableFuture firstMessagePromise = new CompletableFuture<>(); + + /** A flag whether a user already got a {@link #firstMessagePromise}. */ + private final AtomicBoolean firstMessagePromiseRequested = new AtomicBoolean(false); + + /** A private object used to synchronize {@link #push} and {@link #popAsync}. */ + private final Object lock = new Object(); + + // TODO Rework to remove or reduce `synchronized` blocks. If remove it now, some messages get + // reordered. + + /** Store a new message. */ + public void push(PubSubMessage message) { + synchronized (lock) { + if (firstMessagePromiseRequested.getAndSet(false)) { + firstMessagePromise.complete(message); + firstMessagePromise = new CompletableFuture<>(); + return; + } + + messageQueue.addLast(message); + } + } + + /** Get a promise for a next message. */ + public CompletableFuture popAsync() { + synchronized (lock) { + PubSubMessage message = messageQueue.poll(); + if (message == null) { + // this makes first incoming message to be delivered into `firstMessagePromise` instead of + // `messageQueue` + firstMessagePromiseRequested.set(true); + return firstMessagePromise; + } + var future = new CompletableFuture(); + future.complete(message); + return future; + } + } + + /** Get a new message or null if nothing stored so far. */ + public PubSubMessage popSync() { + return messageQueue.poll(); + } +} diff --git a/java/client/src/test/java/glide/connectors/handlers/PubSubMessageQueueTests.java b/java/client/src/test/java/glide/connectors/handlers/PubSubMessageQueueTests.java new file mode 100644 index 0000000000..5620e6640a --- /dev/null +++ b/java/client/src/test/java/glide/connectors/handlers/PubSubMessageQueueTests.java @@ -0,0 +1,270 @@ +/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */ +package glide.connectors.handlers; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import glide.api.models.PubSubMessage; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import lombok.SneakyThrows; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +@Timeout(30) // sec +public class PubSubMessageQueueTests { + + private void checkFutureStatus(CompletableFuture future, boolean shouldBeDone) { + assertEquals(shouldBeDone, future.isDone()); + assertFalse(future.isCancelled()); + assertFalse(future.isCompletedExceptionally()); + } + + @Test + @SneakyThrows + public void async_read_messages_then_add() { + var queue = new PubSubMessageQueue(); + + // read async - receiving promises + var promise1 = queue.popAsync(); + var promise2 = queue.popAsync(); + + // async reading from empty queue returns the same future for the same message + assertSame(promise1, promise2); + checkFutureStatus(promise1, false); + assertTrue(queue.messageQueue.isEmpty()); + + // now - add + var msg1 = new PubSubMessage("one", "one"); + var msg2 = new PubSubMessage("two", "two"); + var msg3 = new PubSubMessage("three", "three"); + queue.push(msg1); + queue.push(msg2); + queue.push(msg3); + + // promises should get resolved automagically + checkFutureStatus(promise1, true); + assertSame(msg1, promise1.get()); + // `firstMessagePromise` is a new uncompleted future + checkFutureStatus(queue.firstMessagePromise, false); + // and `msg1` isn't stored in the Q + assertEquals(2, queue.messageQueue.size()); + assertSame(msg2, queue.messageQueue.pop()); + assertSame(msg3, queue.messageQueue.pop()); + // now MQ should be empty + assertTrue(queue.messageQueue.isEmpty()); + } + + @Test + @SneakyThrows + public void sync_read_messages_then_add() { + var queue = new PubSubMessageQueue(); + + // read async - receiving nulls + assertNull(queue.popSync()); + assertNull(queue.popSync()); + + // `firstMessagePromise` remains unset and unused + checkFutureStatus(queue.firstMessagePromise, false); + // and Q is empty + assertTrue(queue.messageQueue.isEmpty()); + + // now - add + var msg1 = new PubSubMessage("one", "one"); + var msg2 = new PubSubMessage("two", "two"); + var msg3 = new PubSubMessage("three", "three"); + queue.push(msg1); + queue.push(msg2); + queue.push(msg3); + + // `firstMessagePromise` remains unset and unused + checkFutureStatus(queue.firstMessagePromise, false); + // all 3 messages are stored in the Q + assertEquals(3, queue.messageQueue.size()); + + // reading them + assertSame(msg1, queue.popSync()); + assertSame(msg2, queue.popSync()); + assertSame(msg3, queue.popSync()); + } + + @Test + @SneakyThrows + public void add_messages_then_read() { + var queue = new PubSubMessageQueue(); + + var msg1 = new PubSubMessage("one", "one"); + var msg2 = new PubSubMessage("two", "two"); + var msg3 = new PubSubMessage("three", "three"); + var msg4 = new PubSubMessage("four", "four"); + queue.push(msg1); + queue.push(msg2); + queue.push(msg3); + queue.push(msg4); + + // `firstMessagePromise` remains unset and unused + checkFutureStatus(queue.firstMessagePromise, false); + // all messages are stored in the Q + assertEquals(4, queue.messageQueue.size()); + + // now - read one async + assertSame(msg1, queue.popAsync().get()); + // `firstMessagePromise` remains unset and unused + checkFutureStatus(queue.firstMessagePromise, false); + // Q stores remaining 3 messages + assertEquals(3, queue.messageQueue.size()); + + // read sync + assertSame(msg2, queue.popSync()); + checkFutureStatus(queue.firstMessagePromise, false); + assertEquals(2, queue.messageQueue.size()); + + // keep reading + // get a future for the next message + var future = queue.popAsync(); + checkFutureStatus(future, true); + checkFutureStatus(queue.firstMessagePromise, false); + assertEquals(1, queue.messageQueue.size()); + // then read sync + assertSame(msg4, queue.popSync()); + // nothing remains in the Q + assertEquals(0, queue.messageQueue.size()); + // message 3 isn't lost - it is stored in `future` + assertSame(msg3, future.get()); + } + + @Test + @SneakyThrows + public void getting_messages_reordered_on_concurrent_async_and_sync_read() { + var queue = new PubSubMessageQueue(); + var msg1 = new PubSubMessage("one", "one"); + var msg2 = new PubSubMessage("two", "two"); + queue.push(msg1); + queue.push(msg2); + + var readMessages = new ArrayList(2); + + // assuming thread 1 started async read + var future = queue.popAsync(); + // and got raced by thread 2 which reads sync + var msg = queue.popSync(); + readMessages.add(msg); + // then thread 1 continues + msg = future.get(); + readMessages.add(msg); + + // messages get reordered since stored into a single collection (even if is a concurrent one) + assertEquals(List.of(msg2, msg1), readMessages); + + // another example + + // reading async before anything added to the queue + future = queue.popAsync(); + // queue gets 2 messages + queue.push(msg1); + queue.push(msg2); + // but inside the queue only one is stored + assertEquals(1, queue.messageQueue.size()); + // then if we read sync, we receive only second one + assertSame(msg2, queue.popSync()); + // future gets resolved by the first message + assertSame(msg1, future.get()); + } + + // Not merging `concurrent_write_async_read` and `concurrent_write_sync_read`, because + // concurrent sync and async read may reorder messages + + @Test + @SneakyThrows + public void concurrent_write_async_read() { + var queue = new PubSubMessageQueue(); + var numMessages = 1000; // test takes ~0.5 sec + // collections aren't concurrent, since we have only 1 reader and 1 writer so far + var expected = new LinkedList(); + var actual = new LinkedList(); + var rand = new Random(); + for (int i = 0; i < numMessages; i++) { + expected.add(new PubSubMessage(i + " " + UUID.randomUUID(), UUID.randomUUID().toString())); + } + + Runnable writer = + () -> { + for (var message : expected) { + queue.push(message); + try { + Thread.sleep(rand.nextInt(2)); + } catch (InterruptedException ignored) { + } + } + }; + Runnable reader = + () -> { + do { + try { + var message = queue.popAsync().get(); + actual.add(message); + Thread.sleep(rand.nextInt(2)); + } catch (Exception ignored) { + } + } while (actual.size() < expected.size()); + }; + + // start reader and writer and wait for finish + CompletableFuture.allOf(CompletableFuture.runAsync(writer), CompletableFuture.runAsync(reader)) + .get(); + + // this verifies message order + assertEquals(expected, actual); + } + + @Test + @SneakyThrows + public void concurrent_write_sync_read() { + var queue = new PubSubMessageQueue(); + var numMessages = 1000; // test takes ~0.5 sec + // collections aren't concurrent, since we have only 1 reader and 1 writer so far + var expected = new LinkedList(); + var actual = new LinkedList(); + var rand = new Random(); + for (int i = 0; i < numMessages; i++) { + expected.add(new PubSubMessage(i + " " + UUID.randomUUID(), UUID.randomUUID().toString())); + } + + Runnable writer = + () -> { + for (var message : expected) { + queue.push(message); + try { + Thread.sleep(rand.nextInt(2)); + } catch (InterruptedException ignored) { + } + } + }; + Runnable reader = + () -> { + do { + try { + var message = queue.popSync(); + if (message != null) { + actual.add(message); + } + Thread.sleep(rand.nextInt(2)); + } catch (InterruptedException ignored) { + } + } while (actual.size() < expected.size()); + }; + // start reader and writer and wait for finish + CompletableFuture.allOf(CompletableFuture.runAsync(writer), CompletableFuture.runAsync(reader)) + .get(); + + // this verifies message order + assertEquals(expected, actual); + } +} diff --git a/java/integTest/src/test/java/glide/PubSubTests.java b/java/integTest/src/test/java/glide/PubSubTests.java index 7bdb8870c2..e9c33b4a4f 100644 --- a/java/integTest/src/test/java/glide/PubSubTests.java +++ b/java/integTest/src/test/java/glide/PubSubTests.java @@ -29,11 +29,14 @@ import glide.api.models.exceptions.RequestException; import java.util.ArrayList; import java.util.HashSet; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Random; import java.util.Set; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; @@ -45,6 +48,7 @@ import org.junit.jupiter.api.Timeout; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.EnumSource; import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; @@ -130,27 +134,47 @@ public void cleanup() { pubsubMessageQueue.clear(); } + private enum MessageReadMethod { + Callback, + Async, + Sync + } + + @SneakyThrows private void verifyReceivedPubsubMessages( - Set> pubsubMessages, BaseClient listener, boolean callback) { - if (callback) { + Set> pubsubMessages, + BaseClient listener, + MessageReadMethod method) { + if (method == MessageReadMethod.Callback) { assertEquals(pubsubMessages, new HashSet<>(pubsubMessageQueue)); - } else { + } else if (method == MessageReadMethod.Async) { + var received = new HashSet(pubsubMessages.size()); + CompletableFuture messagePromise; + while ((messagePromise = listener.getPubSubMessage()).isDone()) { + received.add(messagePromise.get()); + } + assertEquals( + pubsubMessages.stream().map(Pair::getValue).collect(Collectors.toSet()), received); + } else { // Sync var received = new HashSet(pubsubMessages.size()); - PubSubMessage pubsubMessage; - while ((pubsubMessage = listener.tryGetPubSubMessage()) != null) { - received.add(pubsubMessage); + PubSubMessage message; + while ((message = listener.tryGetPubSubMessage()) != null) { + received.add(message); } assertEquals( pubsubMessages.stream().map(Pair::getValue).collect(Collectors.toSet()), received); } } - private static Stream getTwoBoolPermutations() { + /** Permute all combinations of `standalone` as bool vs {@link MessageReadMethod}. */ + private static Stream getTestScenarios() { return Stream.of( - Arguments.of(true, true), - Arguments.of(true, false), - Arguments.of(false, true), - Arguments.of(false, false)); + Arguments.of(true, MessageReadMethod.Callback), + Arguments.of(true, MessageReadMethod.Sync), + Arguments.of(true, MessageReadMethod.Async), + Arguments.of(false, MessageReadMethod.Callback), + Arguments.of(false, MessageReadMethod.Sync), + Arguments.of(false, MessageReadMethod.Async)); } private ChannelMode exact(boolean standalone) { @@ -164,24 +188,19 @@ private ChannelMode pattern(boolean standalone) { @SuppressWarnings("unchecked") private BaseClient createListener( boolean standalone, - boolean useCallback, + boolean withCallback, int clientId, Map> subscriptions) { MessageCallback callback = (msg, ctx) -> ((ConcurrentLinkedDeque>) ctx) .push(Pair.of(clientId, msg)); - return useCallback + return withCallback ? createClientWithSubscriptions( standalone, subscriptions, Optional.of(callback), Optional.of(pubsubMessageQueue)) : createClientWithSubscriptions(standalone, subscriptions); } - // TODO add following tests from https://github.com/aws/glide-for-redis/pull/1643 - // test_pubsub_exact_happy_path_coexistence - // test_pubsub_exact_happy_path_many_channels_co_existence - // test_sharded_pubsub_co_existence - // test_pubsub_pattern_co_existence // TODO tests below blocked by https://github.com/aws/glide-for-redis/issues/1649 // test_pubsub_exact_max_size_PubsubMessage // test_pubsub_sharded_max_size_PubsubMessage @@ -201,15 +220,16 @@ private void skipTestsOnMac() { /** Similar to `test_pubsub_exact_happy_path` in python client tests. */ @SneakyThrows - @ParameterizedTest(name = "standalone = {0}, use callback = {1}") - @MethodSource("getTwoBoolPermutations") - public void exact_happy_path(boolean standalone, boolean useCallback) { + @ParameterizedTest(name = "standalone = {0}, read messages via {1}") + @MethodSource("getTestScenarios") + public void exact_happy_path(boolean standalone, MessageReadMethod method) { skipTestsOnMac(); String channel = UUID.randomUUID().toString(); String message = UUID.randomUUID().toString(); var subscriptions = Map.of(exact(standalone), Set.of(channel)); - var listener = createListener(standalone, useCallback, 1, subscriptions); + var listener = + createListener(standalone, method == MessageReadMethod.Callback, 1, subscriptions); var sender = createClient(standalone); clients.addAll(List.of(listener, sender)); @@ -217,14 +237,14 @@ public void exact_happy_path(boolean standalone, boolean useCallback) { Thread.sleep(MESSAGE_DELIVERY_DELAY); // deliver the message verifyReceivedPubsubMessages( - Set.of(Pair.of(1, new PubSubMessage(message, channel))), listener, useCallback); + Set.of(Pair.of(1, new PubSubMessage(message, channel))), listener, method); } /** Similar to `test_pubsub_exact_happy_path_many_channels` in python client tests. */ @SneakyThrows - @ParameterizedTest(name = "standalone = {0}, use callback = {1}") - @MethodSource("getTwoBoolPermutations") - public void exact_happy_path_many_channels(boolean standalone, boolean useCallback) { + @ParameterizedTest(name = "standalone = {0}, read messages via {1}") + @MethodSource("getTestScenarios") + public void exact_happy_path_many_channels(boolean standalone, MessageReadMethod method) { skipTestsOnMac(); int numChannels = 256; int messagesPerChannel = 256; @@ -241,7 +261,8 @@ public void exact_happy_path_many_channels(boolean standalone, boolean useCallba } } - var listener = createListener(standalone, useCallback, 1, subscriptions); + var listener = + createListener(standalone, method == MessageReadMethod.Callback, 1, subscriptions); var sender = createClient(standalone); clients.addAll(List.of(listener, sender)); @@ -252,16 +273,14 @@ public void exact_happy_path_many_channels(boolean standalone, boolean useCallba Thread.sleep(MESSAGE_DELIVERY_DELAY); // deliver the messages verifyReceivedPubsubMessages( - messages.stream().map(m -> Pair.of(1, m)).collect(Collectors.toSet()), - listener, - useCallback); + messages.stream().map(m -> Pair.of(1, m)).collect(Collectors.toSet()), listener, method); } /** Similar to `test_sharded_pubsub` in python client tests. */ @SneakyThrows - @ParameterizedTest(name = "use callback = {0}") - @ValueSource(booleans = {true, false}) - public void sharded_pubsub(boolean useCallback) { + @ParameterizedTest + @EnumSource(MessageReadMethod.class) + public void sharded_pubsub(MessageReadMethod method) { assumeTrue(REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in redis 7"); skipTestsOnMac(); @@ -269,7 +288,7 @@ public void sharded_pubsub(boolean useCallback) { String pubsubMessage = UUID.randomUUID().toString(); var subscriptions = Map.of(PubSubClusterChannelMode.SHARDED, Set.of(channel)); - var listener = createListener(false, useCallback, 1, subscriptions); + var listener = createListener(false, method == MessageReadMethod.Callback, 1, subscriptions); var sender = (RedisClusterClient) createClient(false); clients.addAll(List.of(listener, sender)); @@ -277,14 +296,14 @@ public void sharded_pubsub(boolean useCallback) { Thread.sleep(MESSAGE_DELIVERY_DELAY); // deliver the message verifyReceivedPubsubMessages( - Set.of(Pair.of(1, new PubSubMessage(pubsubMessage, channel))), listener, useCallback); + Set.of(Pair.of(1, new PubSubMessage(pubsubMessage, channel))), listener, method); } /** Similar to `test_sharded_pubsub_many_channels` in python client tests. */ @SneakyThrows - @ParameterizedTest(name = "use callback = {0}") - @ValueSource(booleans = {true, false}) - public void sharded_pubsub_many_channels(boolean useCallback) { + @ParameterizedTest + @EnumSource(MessageReadMethod.class) + public void sharded_pubsub_many_channels(MessageReadMethod method) { assumeTrue(REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in redis 7"); skipTestsOnMac(); @@ -303,7 +322,7 @@ public void sharded_pubsub_many_channels(boolean useCallback) { } } - var listener = createListener(false, useCallback, 1, subscriptions); + var listener = createListener(false, method == MessageReadMethod.Callback, 1, subscriptions); var sender = (RedisClusterClient) createClient(false); clients.addAll(List.of(listener, sender)); @@ -317,14 +336,14 @@ public void sharded_pubsub_many_channels(boolean useCallback) { verifyReceivedPubsubMessages( pubsubMessages.stream().map(m -> Pair.of(1, m)).collect(Collectors.toSet()), listener, - useCallback); + method); } /** Similar to `test_pubsub_pattern` in python client tests. */ @SneakyThrows - @ParameterizedTest(name = "standalone = {0}, use callback = {1}") - @MethodSource("getTwoBoolPermutations") - public void pattern(boolean standalone, boolean useCallback) { + @ParameterizedTest(name = "standalone = {0}, read messages via {1}") + @MethodSource("getTestScenarios") + public void pattern(boolean standalone, MessageReadMethod method) { skipTestsOnMac(); String prefix = "channel."; String pattern = prefix + "*"; @@ -336,7 +355,8 @@ public void pattern(boolean standalone, boolean useCallback) { standalone ? PubSubChannelMode.PATTERN : PubSubClusterChannelMode.PATTERN, Set.of(pattern)); - var listener = createListener(standalone, useCallback, 1, subscriptions); + var listener = + createListener(standalone, method == MessageReadMethod.Callback, 1, subscriptions); var sender = createClient(standalone); clients.addAll(List.of(listener, sender)); @@ -353,14 +373,14 @@ public void pattern(boolean standalone, boolean useCallback) { .map(e -> Pair.of(1, new PubSubMessage(e.getValue(), e.getKey(), pattern))) .collect(Collectors.toSet()); - verifyReceivedPubsubMessages(expected, listener, useCallback); + verifyReceivedPubsubMessages(expected, listener, method); } /** Similar to `test_pubsub_pattern_many_channels` in python client tests. */ @SneakyThrows - @ParameterizedTest(name = "standalone = {0}, use callback = {1}") - @MethodSource("getTwoBoolPermutations") - public void pattern_many_channels(boolean standalone, boolean useCallback) { + @ParameterizedTest(name = "standalone = {0}, read messages via {1}") + @MethodSource("getTestScenarios") + public void pattern_many_channels(boolean standalone, MessageReadMethod method) { skipTestsOnMac(); String prefix = "channel."; String pattern = prefix + "*"; @@ -378,7 +398,8 @@ public void pattern_many_channels(boolean standalone, boolean useCallback) { } } - var listener = createListener(standalone, useCallback, 1, subscriptions); + var listener = + createListener(standalone, method == MessageReadMethod.Callback, 1, subscriptions); var sender = createClient(standalone); clients.addAll(List.of(listener, sender)); @@ -391,16 +412,14 @@ public void pattern_many_channels(boolean standalone, boolean useCallback) { Thread.sleep(MESSAGE_DELIVERY_DELAY); // deliver the messages verifyReceivedPubsubMessages( - messages.stream().map(m -> Pair.of(1, m)).collect(Collectors.toSet()), - listener, - useCallback); + messages.stream().map(m -> Pair.of(1, m)).collect(Collectors.toSet()), listener, method); } /** Similar to `test_pubsub_combined_exact_and_pattern_one_client` in python client tests. */ @SneakyThrows - @ParameterizedTest(name = "standalone = {0}, use callback = {1}") - @MethodSource("getTwoBoolPermutations") - public void combined_exact_and_pattern_one_client(boolean standalone, boolean useCallback) { + @ParameterizedTest(name = "standalone = {0}, read messages via {1}") + @MethodSource("getTestScenarios") + public void combined_exact_and_pattern_one_client(boolean standalone, MessageReadMethod method) { skipTestsOnMac(); String prefix = "channel."; String pattern = prefix + "*"; @@ -430,7 +449,8 @@ public void combined_exact_and_pattern_one_client(boolean standalone, boolean us messages.add(new PubSubMessage(pubsubMessage, channel, pattern)); } - var listener = createListener(standalone, useCallback, 1, subscriptions); + var listener = + createListener(standalone, method == MessageReadMethod.Callback, 1, subscriptions); var sender = createClient(standalone); clients.addAll(List.of(listener, sender)); @@ -441,18 +461,17 @@ public void combined_exact_and_pattern_one_client(boolean standalone, boolean us Thread.sleep(MESSAGE_DELIVERY_DELAY); // deliver the messages verifyReceivedPubsubMessages( - messages.stream().map(m -> Pair.of(1, m)).collect(Collectors.toSet()), - listener, - useCallback); + messages.stream().map(m -> Pair.of(1, m)).collect(Collectors.toSet()), listener, method); } /** * Similar to `test_pubsub_combined_exact_and_pattern_multiple_clients` in python client tests. */ @SneakyThrows - @ParameterizedTest(name = "standalone = {0}, use callback = {1}") - @MethodSource("getTwoBoolPermutations") - public void combined_exact_and_pattern_multiple_clients(boolean standalone, boolean useCallback) { + @ParameterizedTest(name = "standalone = {0}, read messages via {1}") + @MethodSource("getTestScenarios") + public void combined_exact_and_pattern_multiple_clients( + boolean standalone, MessageReadMethod method) { skipTestsOnMac(); String prefix = "channel."; String pattern = prefix + "*"; @@ -474,10 +493,12 @@ public void combined_exact_and_pattern_multiple_clients(boolean standalone, bool messages.add(new PubSubMessage(message, channel, pattern)); } - var listenerExactSub = createListener(standalone, useCallback, 1, subscriptions); + var listenerExactSub = + createListener(standalone, method == MessageReadMethod.Callback, 1, subscriptions); subscriptions = Map.of(pattern(standalone), Set.of(pattern)); - var listenerPatternSub = createListener(standalone, useCallback, 2, subscriptions); + var listenerPatternSub = + createListener(standalone, method == MessageReadMethod.Callback, 2, subscriptions); var sender = createClient(standalone); clients.addAll(List.of(listenerExactSub, listenerPatternSub, sender)); @@ -488,13 +509,13 @@ public void combined_exact_and_pattern_multiple_clients(boolean standalone, bool Thread.sleep(MESSAGE_DELIVERY_DELAY); // deliver the messages - if (useCallback) { + if (method == MessageReadMethod.Callback) { verifyReceivedPubsubMessages( messages.stream() .map(m -> Pair.of(m.getPattern().isEmpty() ? 1 : 2, m)) .collect(Collectors.toSet()), listenerExactSub, - useCallback); + method); } else { verifyReceivedPubsubMessages( messages.stream() @@ -502,14 +523,14 @@ public void combined_exact_and_pattern_multiple_clients(boolean standalone, bool .map(m -> Pair.of(1, m)) .collect(Collectors.toSet()), listenerExactSub, - useCallback); + method); verifyReceivedPubsubMessages( messages.stream() .filter(m -> m.getPattern().isPresent()) .map(m -> Pair.of(2, m)) .collect(Collectors.toSet()), listenerPatternSub, - useCallback); + method); } } @@ -517,9 +538,9 @@ public void combined_exact_and_pattern_multiple_clients(boolean standalone, bool * Similar to `test_pubsub_combined_exact_pattern_and_sharded_one_client` in python client tests. */ @SneakyThrows - @ParameterizedTest(name = "use callback = {0}") - @ValueSource(booleans = {true, false}) - public void combined_exact_pattern_and_sharded_one_client(boolean useCallback) { + @ParameterizedTest + @EnumSource(MessageReadMethod.class) + public void combined_exact_pattern_and_sharded_one_client(MessageReadMethod method) { assumeTrue(REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in redis 7"); skipTestsOnMac(); @@ -555,7 +576,7 @@ public void combined_exact_pattern_and_sharded_one_client(boolean useCallback) { messages.add(new PubSubMessage(message, channel, pattern)); } - var listener = createListener(false, useCallback, 1, subscriptions); + var listener = createListener(false, method == MessageReadMethod.Callback, 1, subscriptions); var sender = (RedisClusterClient) createClient(false); clients.addAll(List.of(listener, sender)); @@ -570,9 +591,86 @@ public void combined_exact_pattern_and_sharded_one_client(boolean useCallback) { messages.addAll(shardedMessages); verifyReceivedPubsubMessages( - messages.stream().map(m -> Pair.of(1, m)).collect(Collectors.toSet()), - listener, - useCallback); + messages.stream().map(m -> Pair.of(1, m)).collect(Collectors.toSet()), listener, method); + } + + /** This test fully covers all `test_pubsub_*_co_existence` tests in python client. */ + @SneakyThrows + @Test + public void coexistense_of_sync_and_async_read() { + assumeTrue(REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in redis 7"); + skipTestsOnMac(); + + String prefix = "channel."; + String pattern = prefix + "*"; + String shardPrefix = "{shard}"; + int numChannels = 25; // 256; + var messages = new ArrayList(numChannels * 2); + var shardedMessages = new ArrayList(numChannels); + Map> subscriptions = + Map.of( + PubSubClusterChannelMode.EXACT, new HashSet<>(), + PubSubClusterChannelMode.PATTERN, Set.of(pattern), + PubSubClusterChannelMode.SHARDED, new HashSet<>()); + + for (var i = 0; i < numChannels; i++) { + var channel = i + "-" + UUID.randomUUID(); + subscriptions.get(PubSubClusterChannelMode.EXACT).add(channel); + var message = i + "-" + UUID.randomUUID(); + messages.add(new PubSubMessage(message, channel)); + } + + for (var i = 0; i < numChannels; i++) { + var channel = shardPrefix + "-" + i + "-" + UUID.randomUUID(); + subscriptions.get(PubSubClusterChannelMode.SHARDED).add(channel); + var message = i + "-" + UUID.randomUUID(); + shardedMessages.add(new PubSubMessage(message, channel)); + } + + for (var j = 0; j < numChannels; j++) { + var message = j + "-" + UUID.randomUUID(); + var channel = prefix + "-" + j + "-" + UUID.randomUUID(); + messages.add(new PubSubMessage(message, channel, pattern)); + } + + var listener = createListener(false, false, 1, subscriptions); + var sender = (RedisClusterClient) createClient(false); + clients.addAll(List.of(listener, sender)); + + for (var pubsubMessage : messages) { + sender.publish(pubsubMessage.getChannel(), pubsubMessage.getMessage()).get(); + } + for (var pubsubMessage : shardedMessages) { + sender.spublish(pubsubMessage.getChannel(), pubsubMessage.getMessage()).get(); + } + + Thread.sleep(MESSAGE_DELIVERY_DELAY); // deliver the messages + + messages.addAll(shardedMessages); + + var received = new HashSet(messages.size()); + var rand = new Random(); + while (true) { + if (rand.nextBoolean()) { + CompletableFuture messagePromise = listener.getPubSubMessage(); + if (messagePromise.isDone()) { + received.add(messagePromise.get()); + } else { + break; // all messages read + } + } else { + var message = listener.tryGetPubSubMessage(); + if (message != null) { + received.add(message); + } else { + break; // all messages read + } + } + } + + // redis can reorder the messages, so we can't validate that the order (without big delays + // between sends) + assertEquals(new LinkedHashSet<>(messages), received); } /** @@ -580,9 +678,9 @@ public void combined_exact_pattern_and_sharded_one_client(boolean useCallback) { * tests. */ @SneakyThrows - @ParameterizedTest(name = "use callback = {0}") - @ValueSource(booleans = {true, false}) - public void combined_exact_pattern_and_sharded_multi_client(boolean useCallback) { + @ParameterizedTest + @EnumSource(MessageReadMethod.class) + public void combined_exact_pattern_and_sharded_multi_client(MessageReadMethod method) { assumeTrue(REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in redis 7"); skipTestsOnMac(); @@ -622,13 +720,22 @@ public void combined_exact_pattern_and_sharded_multi_client(boolean useCallback) var listenerExact = createListener( - false, useCallback, PubSubClusterChannelMode.EXACT.ordinal(), subscriptionsExact); + false, + method == MessageReadMethod.Callback, + PubSubClusterChannelMode.EXACT.ordinal(), + subscriptionsExact); var listenerPattern = createListener( - false, useCallback, PubSubClusterChannelMode.PATTERN.ordinal(), subscriptionsPattern); + false, + method == MessageReadMethod.Callback, + PubSubClusterChannelMode.PATTERN.ordinal(), + subscriptionsPattern); var listenerSharded = createListener( - false, useCallback, PubSubClusterChannelMode.SHARDED.ordinal(), subscriptionsSharded); + false, + method == MessageReadMethod.Callback, + PubSubClusterChannelMode.SHARDED.ordinal(), + subscriptionsSharded); var sender = (RedisClusterClient) createClient(false); clients.addAll(List.of(listenerExact, listenerPattern, listenerSharded, sender)); @@ -645,7 +752,7 @@ public void combined_exact_pattern_and_sharded_multi_client(boolean useCallback) Thread.sleep(MESSAGE_DELIVERY_DELAY); // deliver the messages - if (useCallback) { + if (method == MessageReadMethod.Callback) { var expected = new HashSet>(); expected.addAll( exactMessages.stream() @@ -660,26 +767,26 @@ public void combined_exact_pattern_and_sharded_multi_client(boolean useCallback) .map(m -> Pair.of(PubSubClusterChannelMode.SHARDED.ordinal(), m)) .collect(Collectors.toSet())); - verifyReceivedPubsubMessages(expected, listenerExact, useCallback); + verifyReceivedPubsubMessages(expected, listenerExact, method); } else { verifyReceivedPubsubMessages( exactMessages.stream() .map(m -> Pair.of(PubSubClusterChannelMode.EXACT.ordinal(), m)) .collect(Collectors.toSet()), listenerExact, - useCallback); + method); verifyReceivedPubsubMessages( patternMessages.stream() .map(m -> Pair.of(PubSubClusterChannelMode.PATTERN.ordinal(), m)) .collect(Collectors.toSet()), listenerPattern, - useCallback); + method); verifyReceivedPubsubMessages( shardedMessages.stream() .map(m -> Pair.of(PubSubClusterChannelMode.SHARDED.ordinal(), m)) .collect(Collectors.toSet()), listenerSharded, - useCallback); + method); } } @@ -688,11 +795,13 @@ public void combined_exact_pattern_and_sharded_multi_client(boolean useCallback) * tests. */ @SneakyThrows - @Test - public void three_publishing_clients_same_name_with_sharded_no_callback() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void three_publishing_clients_same_name_with_sharded_no_callback(boolean sync) { assumeTrue(REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in redis 7"); skipTestsOnMac(); + MessageReadMethod method = sync ? MessageReadMethod.Sync : MessageReadMethod.Async; String channel = UUID.randomUUID().toString(); var exactMessage = new PubSubMessage(UUID.randomUUID().toString(), channel); var patternMessage = new PubSubMessage(UUID.randomUUID().toString(), channel, channel); @@ -725,7 +834,7 @@ public void three_publishing_clients_same_name_with_sharded_no_callback() { PubSubClusterChannelMode.EXACT.ordinal(), new PubSubMessage(patternMessage.getMessage(), channel))), listenerExact, - false); + method); verifyReceivedPubsubMessages( Set.of( Pair.of(PubSubClusterChannelMode.PATTERN.ordinal(), patternMessage), @@ -733,11 +842,11 @@ public void three_publishing_clients_same_name_with_sharded_no_callback() { PubSubClusterChannelMode.PATTERN.ordinal(), new PubSubMessage(exactMessage.getMessage(), channel, channel))), listenerPattern, - false); + method); verifyReceivedPubsubMessages( Set.of(Pair.of(PubSubClusterChannelMode.SHARDED.ordinal(), shardedMessage)), listenerSharded, - false); + method); } /** @@ -792,7 +901,7 @@ public void three_publishing_clients_same_name_with_sharded_with_callback() { new PubSubMessage(exactMessage.getMessage(), channel, channel)), Pair.of(PubSubClusterChannelMode.SHARDED.ordinal(), shardedMessage)); - verifyReceivedPubsubMessages(expected, listenerExact, true); + verifyReceivedPubsubMessages(expected, listenerExact, MessageReadMethod.Callback); } @SneakyThrows @@ -829,10 +938,9 @@ public void error_cases() { } @SneakyThrows - @ParameterizedTest(name = "standalone = {0}, use callback = {1}") - @MethodSource("getTwoBoolPermutations") - public void transaction_with_all_types_of_PubsubMessages( - boolean standalone, boolean useCallback) { + @ParameterizedTest(name = "standalone = {0}, read messages via {1}") + @MethodSource("getTestScenarios") + public void transaction_with_all_types_of_messages(boolean standalone, MessageReadMethod method) { assumeTrue(REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in redis 7"); skipTestsOnMac(); assumeTrue( @@ -862,7 +970,8 @@ public void transaction_with_all_types_of_PubsubMessages( PubSubClusterChannelMode.SHARDED, Set.of(shardPrefix)); - var listener = createListener(standalone, useCallback, 1, subscriptions); + var listener = + createListener(standalone, method == MessageReadMethod.Callback, 1, subscriptions); var sender = createClient(standalone); clients.addAll(List.of(listener, sender)); @@ -888,6 +997,6 @@ public void transaction_with_all_types_of_PubsubMessages( ? Set.of(Pair.of(1, exactMessage), Pair.of(1, patternMessage)) : Set.of( Pair.of(1, exactMessage), Pair.of(1, patternMessage), Pair.of(1, shardedMessage)); - verifyReceivedPubsubMessages(expected, listener, useCallback); + verifyReceivedPubsubMessages(expected, listener, method); } }