Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Java: async getPubSubMessage. #1770

Closed
Closed
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/java.yml
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ jobs:
- 17
runs-on: ubuntu-latest
container: amazonlinux:latest
timeout-minutes: 20
timeout-minutes: 35
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
steps:
- name: Install git
run: |
Expand Down
20 changes: 9 additions & 11 deletions java/client/src/main/java/glide/api/BaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@
import glide.connectors.handlers.CallbackDispatcher;
import glide.connectors.handlers.ChannelHandler;
import glide.connectors.handlers.MessageHandler;
import glide.connectors.handlers.PubSubMessageQueue;
import glide.connectors.resources.Platform;
import glide.connectors.resources.ThreadPoolResource;
import glide.connectors.resources.ThreadPoolResourceAllocator;
Expand All @@ -267,14 +268,12 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.NotImplementedException;
import response.ResponseOuterClass.ConstantResponse;
import response.ResponseOuterClass.Response;

Expand All @@ -300,7 +299,7 @@ public abstract class BaseClient

protected final CommandManager commandManager;
protected final ConnectionManager connectionManager;
protected final ConcurrentLinkedDeque<PubSubMessage> messageQueue;
protected final PubSubMessageQueue messageQueue;
protected final Optional<BaseSubscriptionConfiguration> subscriptionConfiguration;

/** Helper which extracts data from received {@link Response}s from GLIDE. */
Expand All @@ -324,7 +323,7 @@ protected BaseClient(ClientBuilder builder) {
protected static class ClientBuilder {
private final ConnectionManager connectionManager;
private final CommandManager commandManager;
private final ConcurrentLinkedDeque<PubSubMessage> messageQueue;
private final PubSubMessageQueue messageQueue;
private final Optional<BaseSubscriptionConfiguration> subscriptionConfiguration;
}

Expand Down Expand Up @@ -368,7 +367,7 @@ protected static <T extends BaseClient> CompletableFuture<T> CreateClient(
}

/**
* Tries to return a next pubsub message.
* Return a next pubsub message if it is present.
*
* @throws ConfigurationError If client is not subscribed to any channel or if client configured
* with a callback.
Expand All @@ -385,16 +384,16 @@ public PubSubMessage tryGetPubSubMessage() {
"The operation will never complete since messages will be passed to the configured"
+ " callback.");
}
return messageQueue.poll();
return messageQueue.popSync();
}

/**
* Returns a promise for a next pubsub message.
* Returns a promise for a next pubsub message.<br>
* Message gets unrecoverable lost if future is cancelled or reference to this future is lost.
*
* @apiNote <b>Not implemented!</b>
* @throws ConfigurationError If client is not subscribed to any channel or if client configured
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
* with a callback.
* @return A <code>Future</code> which resolved with the next incoming message.
* @return A {@link CompletableFuture} which will asynchronously hold the next available message.
*/
public CompletableFuture<PubSubMessage> getPubSubMessage() {
if (subscriptionConfiguration.isEmpty()) {
Expand All @@ -407,8 +406,7 @@ public CompletableFuture<PubSubMessage> getPubSubMessage() {
"The operation will never complete since messages will be passed to the configured"
+ " callback.");
}
throw new NotImplementedException(
"This feature will be supported in a future release of the GLIDE java client");
return messageQueue.popAsync();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ public PubSubMessage(String message, String channel) {

@Override
public String toString() {
String res = String.format("%s, channel = %s", message, channel);
String res = String.format("(%s, channel = %s", message, channel);
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
if (pattern.isPresent()) {
res += ", pattern = " + pattern.get();
}
return res;
return res + ")";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
Expand All @@ -34,7 +33,7 @@ public class MessageHandler {
private final BaseResponseResolver responseResolver;

/** A message queue wrapper. */
private final ConcurrentLinkedDeque<PubSubMessage> queue = new ConcurrentLinkedDeque<>();
private final PubSubMessageQueue queue = new PubSubMessageQueue();

/** Process a push (PUBSUB) message received as a part of {@link Response} from GLIDE. */
public void handle(Response response) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.connectors.handlers;
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved

import glide.api.models.PubSubMessage;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* An asynchronous FIFO message queue for {@link PubSubMessage} backed by {@link
* ConcurrentLinkedDeque}.
*/
public class PubSubMessageQueue {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could be package-private.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Compilation fails - BaseClient from glide.api uses it.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That seems strange that it uses it directly. I would've thought it goes through MessageHandler.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK I see, you are creating a MessageHandler before the builder, then passing the handler in when creating the channel but passing the queue in to the client's constructor.

This is exposing a class from a non-public package to a non-private field in a public/protected class in an exposed package.

I am actually seeing that on a bunch of BaseClient classes. Let's discuss offline.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Separate from the above issue of internal classes in public APIs, I think BaseClient should not have a MessageQueue. It should always route through a MessageHandler that accesses a MessageQueue. I would MessageQueue a static inner class of MessageHandler (package-private for unit testing).

This would make it clear a MessageQueue can't really be on its own and is always tied to a MessageHandler.

// fields are protected to ease testing
/** The queue itself. */
protected final ConcurrentLinkedDeque<PubSubMessage> messageQueue = new ConcurrentLinkedDeque<>();

/**
* A promise for the first incoming message. Returned to a user, if message queried in async
* manner, but nothing received yet.
*/
protected CompletableFuture<PubSubMessage> firstMessagePromise = new CompletableFuture<>();
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved

/** A flag whether a user already got a {@link #firstMessagePromise}. */
private final AtomicBoolean firstMessagePromiseRequested = new AtomicBoolean(false);

/** A private object used to synchronize {@link #push} and {@link #popAsync}. */
private final Object lock = new Object();

// TODO Rework to remove or reduce `synchronized` blocks. If remove it now, some messages get
// reordered.

/** Store a new message. */
public void push(PubSubMessage message) {
synchronized (lock) {
if (firstMessagePromiseRequested.getAndSet(false)) {
firstMessagePromise.complete(message);
firstMessagePromise = new CompletableFuture<>();
return;
}

messageQueue.addLast(message);
}
}

/** Get a promise for a next message. */
public synchronized CompletableFuture<PubSubMessage> popAsync() {
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
synchronized (lock) {
PubSubMessage message = messageQueue.poll();
if (message == null) {
// this makes first incoming message to be delivered into `firstMessagePromise` instead of
// `messageQueue`
firstMessagePromiseRequested.set(true);
return firstMessagePromise;
}
var future = new CompletableFuture<PubSubMessage>();
future.complete(message);
return future;
}
}

/** Get a new message or null if nothing stored so far. */
public PubSubMessage popSync() {
return messageQueue.poll();
}
}
Loading
Loading