Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Java: async getPubSubMessage. #1770

Closed
Closed
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/java.yml
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ jobs:
- 17
runs-on: ubuntu-latest
container: amazonlinux:latest
timeout-minutes: 15
timeout-minutes: 35
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
steps:
- name: Install git
run: |
Expand Down
21 changes: 11 additions & 10 deletions java/client/src/main/java/glide/api/BaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@

import static glide.api.models.GlideString.gs;
import static glide.api.models.commands.SortBaseOptions.STORE_COMMAND_STRING;
import static glide.api.models.commands.bitmap.BitFieldOptions.BitFieldReadOnlySubCommands;
import static glide.api.models.commands.bitmap.BitFieldOptions.BitFieldSubCommands;
import static glide.api.models.commands.bitmap.BitFieldOptions.createBitFieldArgs;
import static glide.api.models.commands.bitmap.BitFieldOptions.createBitFieldGlideStringArgs;
import static glide.api.models.commands.stream.StreamClaimOptions.JUST_ID_REDIS_API;
Expand Down Expand Up @@ -250,6 +248,7 @@
import glide.connectors.handlers.CallbackDispatcher;
import glide.connectors.handlers.ChannelHandler;
import glide.connectors.handlers.MessageHandler;
import glide.connectors.handlers.PubSubMessageQueue;
import glide.connectors.resources.Platform;
import glide.connectors.resources.ThreadPoolResource;
import glide.connectors.resources.ThreadPoolResourceAllocator;
Expand All @@ -265,14 +264,12 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.NotImplementedException;
import response.ResponseOuterClass.ConstantResponse;
import response.ResponseOuterClass.Response;

Expand All @@ -298,7 +295,7 @@ public abstract class BaseClient

protected final CommandManager commandManager;
protected final ConnectionManager connectionManager;
protected final ConcurrentLinkedDeque<PubSubMessage> messageQueue;
protected final PubSubMessageQueue messageQueue;
protected final Optional<BaseSubscriptionConfiguration> subscriptionConfiguration;

/** Helper which extracts data from received {@link Response}s from GLIDE. */
Expand All @@ -322,7 +319,7 @@ protected BaseClient(ClientBuilder builder) {
protected static class ClientBuilder {
private final ConnectionManager connectionManager;
private final CommandManager commandManager;
private final ConcurrentLinkedDeque<PubSubMessage> messageQueue;
private final PubSubMessageQueue messageQueue;
private final Optional<BaseSubscriptionConfiguration> subscriptionConfiguration;
}

Expand Down Expand Up @@ -366,7 +363,7 @@ protected static <T extends BaseClient> CompletableFuture<T> CreateClient(
}

/**
* Tries to return a next pubsub message.
* A <b>blocking</b> call to return a next pubsub message.
*
* @throws ConfigurationError If client is not subscribed to any channel or if client configured
* with a callback.
Expand All @@ -383,7 +380,12 @@ public PubSubMessage tryGetPubSubMessage() {
"The operation will never complete since messages will be passed to the configured"
+ " callback.");
}
return messageQueue.poll();
try {
return messageQueue.pop().get();
} catch (Exception unreachable) {
// should be never happen
return null;
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
}
}

/**
Expand All @@ -405,8 +407,7 @@ public CompletableFuture<PubSubMessage> getPubSubMessage() {
"The operation will never complete since messages will be passed to the configured"
+ " callback.");
}
throw new NotImplementedException(
"This feature will be supported in a future release of the GLIDE java client");
return messageQueue.pop();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ public PubSubMessage(String message, String channel) {

@Override
public String toString() {
String res = String.format("%s, channel = %s", message, channel);
String res = String.format("(%s, channel = %s", message, channel);
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
if (pattern.isPresent()) {
res += ", pattern = " + pattern.get();
}
return res;
return res + ")";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
Expand All @@ -34,7 +33,7 @@ public class MessageHandler {
private final BaseResponseResolver responseResolver;

/** A message queue wrapper. */
private final ConcurrentLinkedDeque<PubSubMessage> queue = new ConcurrentLinkedDeque<>();
private final PubSubMessageQueue queue = new PubSubMessageQueue();

/** Process a push (PUBSUB) message received as a part of {@link Response} from GLIDE. */
public void handle(Response response) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.connectors.handlers;
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved

import glide.api.models.PubSubMessage;
import java.util.LinkedList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

/** A FIFO message queue for {@link PubSubMessage}. */
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
public class PubSubMessageQueue {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could be package-private.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Compilation fails - BaseClient from glide.api uses it.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That seems strange that it uses it directly. I would've thought it goes through MessageHandler.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK I see, you are creating a MessageHandler before the builder, then passing the handler in when creating the channel but passing the queue in to the client's constructor.

This is exposing a class from a non-public package to a non-private field in a public/protected class in an exposed package.

I am actually seeing that on a bunch of BaseClient classes. Let's discuss offline.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Separate from the above issue of internal classes in public APIs, I think BaseClient should not have a MessageQueue. It should always route through a MessageHandler that accesses a MessageQueue. I would MessageQueue a static inner class of MessageHandler (package-private for unit testing).

This would make it clear a MessageQueue can't really be on its own and is always tied to a MessageHandler.

// fields are protected to ease testing
/** The queue itself. */
protected final LinkedList<PubSubMessage> messageQueue = new LinkedList<>();
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved

/**
* The head of the queue stored aside as a {@link Future}. Stores a promise to the first message.
*/
protected CompletableFuture<PubSubMessage> head = new CompletableFuture<>();

/** An object to synchronize threads. */
private final Object lock = new Object();

/** State of the queue. */
private enum HeadState {
// `head` is a new CF, which was never given to a user
UNSET_UNREAD,
// `head` is a non-empty CF, which was never given to a user
SET_UNREAD,
// `head` is unset, but was given to a user
UNSET_READ,
}

private HeadState state = HeadState.UNSET_UNREAD;
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved

/** Store a new message. */
public void push(PubSubMessage message) {
synchronized (lock) {
switch (state) {
case SET_UNREAD:
messageQueue.addLast(message);
break;
case UNSET_UNREAD:
head.complete(message);
state = HeadState.SET_UNREAD;
break;
case UNSET_READ:
head.complete(message);
head = new CompletableFuture<>();
state = HeadState.UNSET_UNREAD;
break;
}
}
}

/** Get a promise for the next message. */
public CompletableFuture<PubSubMessage> pop() {
synchronized (lock) {
CompletableFuture<PubSubMessage> result = head;
switch (state) {
case UNSET_UNREAD:
state = HeadState.UNSET_READ;
break;
case SET_UNREAD:
head = new CompletableFuture<>();
if (messageQueue.isEmpty()) {
state = HeadState.UNSET_UNREAD;
break;
}
head.complete(messageQueue.pop());
// no state change
break;
case UNSET_READ:
// no state change
break;
}
return result;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.connectors.handlers;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;

import glide.api.models.PubSubMessage;
import java.util.LinkedList;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import lombok.SneakyThrows;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

@Timeout(30) // sec
public class PubSubMessageQueueTests {

private void checkFutureStatus(CompletableFuture<PubSubMessage> future, boolean shouldBeDone) {
assertEquals(shouldBeDone, future.isDone());
assertFalse(future.isCancelled());
assertFalse(future.isCompletedExceptionally());
}

@Test
@SneakyThrows
public void read_messages_then_add() {
var queue = new PubSubMessageQueue();
var promise1 = queue.pop();
var promise2 = queue.pop();

assertSame(promise1, promise2);
checkFutureStatus(promise1, false);
assertTrue(queue.messageQueue.isEmpty());

// now - add
var msg1 = new PubSubMessage("one", "one");
var msg2 = new PubSubMessage("two", "two");
var msg3 = new PubSubMessage("three", "three");
queue.push(msg1);
queue.push(msg2);
queue.push(msg3);

// promise should get resolved automagically
checkFutureStatus(promise1, true);
assertSame(msg1, promise1.get());
// and `head` too
checkFutureStatus(queue.head, true);
assertSame(msg2, queue.head.get());
// and the last message remains in the MQ
assertEquals(1, queue.messageQueue.size());
}

@Test
@SneakyThrows
public void add_messages_then_read() {
var queue = new PubSubMessageQueue();

var msg1 = new PubSubMessage("one", "one");
var msg2 = new PubSubMessage("two", "two");
var msg3 = new PubSubMessage("three", "three");
queue.push(msg1);
queue.push(msg2);
queue.push(msg3);

checkFutureStatus(queue.head, true);
// MQ stores only second and third message, first is stored in `head`
assertEquals(2, queue.messageQueue.size());
assertSame(msg1, queue.head.get());
assertSame(msg2, queue.messageQueue.peek());

// now - read
assertSame(msg1, queue.pop().get());
// second messages should be shifted to `head`
assertEquals(1, queue.messageQueue.size());
checkFutureStatus(queue.head, true);
assertSame(msg2, queue.head.get());
// keep reading
assertSame(msg2, queue.pop().get());
// MQ is empty, but `head` isn't
assertTrue(queue.messageQueue.isEmpty());
checkFutureStatus(queue.head, true);
assertSame(msg3, queue.head.get());
// read more
assertSame(msg3, queue.pop().get());
// MQ and `head` are both empty
assertTrue(queue.messageQueue.isEmpty());
checkFutureStatus(queue.head, false);
}

@Test
@SneakyThrows
public void concurrent_read_write() {
var queue = new PubSubMessageQueue();
var numMessages = 1000; // test takes ~0.5 sec
// collections aren't concurrent, since we have only 1 reader and 1 writer so far
var expected = new LinkedList<PubSubMessage>();
var actual = new LinkedList<PubSubMessage>();
var rand = new Random();
for (int i = 0; i < numMessages; i++) {
expected.add(new PubSubMessage(i + " " + UUID.randomUUID(), UUID.randomUUID().toString()));
}

Runnable writer =
() -> {
for (int i = 0; i < numMessages; i++) {
queue.push(expected.get(i));
try {
Thread.sleep(rand.nextInt(2));
} catch (InterruptedException ignored) {
}
}
};
Runnable reader =
() -> {
PubSubMessage message = null;
do {
try {
message = queue.pop().get(15, TimeUnit.MILLISECONDS);
actual.add(message);
Thread.sleep(rand.nextInt(2));
} catch (TimeoutException ignored) {
// all messages read
break;
} catch (Exception ignored) {
}
} while (message != null);
};
// start reader and writer and wait for finish
CompletableFuture.allOf(CompletableFuture.runAsync(writer), CompletableFuture.runAsync(reader))
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
.get();

// this verifies message order
assertEquals(expected, actual);
}
}
Loading
Loading