Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Java: Pub & Sub = <3 #1662

Merged
merged 13 commits into from
Jul 1, 2024
Merged

Conversation

Yury-Fridlyand
Copy link
Collaborator

@Yury-Fridlyand Yury-Fridlyand commented Jun 26, 2024

Issue #, if available:
PubSub support for Java client
Ref: #1602, #1616, #1650, #1643 and #1741

Description of changes:

  • Configuration for pubsub
  • Receiving pubsub messages
  • Sending messages
  • UT
  • IT

Configuration with callback:

    @Test
    @SneakyThrows
    public void basic_client() {
        MessageCallback callback =
            (msg, context) -> System.out.printf("Received %s, context %s\n", msg, context);

        var regularClient =
                RedisClient.CreateClient(
                                RedisClientConfiguration.builder()
                                        .address(NodeAddress.builder().port(6379).build())
                                        .requestTimeout(3000)
                                        .subscriptionConfiguration( // subscribe happens here
                                                StandaloneSubscriptionConfiguration.builder()
                                                        .subscription(EXACT, "ch1")
                                                        .subscription(PATTERN, "chat*")
                                                        .callback(callback)
                                                        .build())
                                        .build())
                        .get();

        Thread.sleep(100500);
        regularClient.close(); // unsubscribe happens here
    }

To send messages use

client.publish(channel, message).get();
clusterClient.spublish(channel, message).get();

Configuration without callback:

    @Test
    @SneakyThrows
    public void basic_client() {
        var regularClient =
                RedisClient.CreateClient(
                                RedisClientConfiguration.builder()
                                        .address(NodeAddress.builder().port(6379).build())
                                        .requestTimeout(3000)
                                        .subscriptionConfiguration(
                                                StandaloneSubscriptionConfiguration.builder()
                                                        .subscription(EXACT, "ch1")
                                                        .subscription(PATTERN, "chat*")
                                                        .build())
                                        .build())
                        .get();

        Thread.sleep(100500);

        Message msg = null;
        while ((msg = regularClient.tryGetPubSubMessage()) != null) {
            System.out.printf("Received from queue %s\n", msg);
        }
    }

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

* Add client configuartion for subscribing to channels.

Signed-off-by: Yury-Fridlyand <[email protected]>

* CLIPPY I HATE YOU

Signed-off-by: Yury-Fridlyand <[email protected]>

* Get and store callback.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Fix tests.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Rework configuration and add docs.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Config rework.

Signed-off-by: Yury-Fridlyand <[email protected]>

* docs

Signed-off-by: Yury-Fridlyand <[email protected]>

* More TODOs for the god of TODOs.

Signed-off-by: Yury-Fridlyand <[email protected]>

---------

Signed-off-by: Yury-Fridlyand <[email protected]>
* Add `PUBLISH` and `SPUBLISH` commands.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Fix the test.

Signed-off-by: Yury-Fridlyand <[email protected]>

---------

Signed-off-by: Yury-Fridlyand <[email protected]>
* Add client configuartion for subscribing to channels.

Signed-off-by: Yury-Fridlyand <[email protected]>

* CLIPPY I HATE YOU

Signed-off-by: Yury-Fridlyand <[email protected]>

* Get and store callback.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Fix tests.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Rework configuration and add docs.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Config rework.

Signed-off-by: Yury-Fridlyand <[email protected]>

* docs

Signed-off-by: Yury-Fridlyand <[email protected]>

* Receive pushes (subscibed messages).

Signed-off-by: Yury-Fridlyand <[email protected]>

* Address PR comments.

Signed-off-by: Yury-Fridlyand <[email protected]>

* I HATE YOU SPOTLESS

Signed-off-by: Yury-Fridlyand <[email protected]>

* Rename a class.

Signed-off-by: Yury-Fridlyand <[email protected]>

---------

Signed-off-by: Yury-Fridlyand <[email protected]>
@Yury-Fridlyand Yury-Fridlyand added the java issues and fixes related to the java client label Jun 26, 2024
@Yury-Fridlyand Yury-Fridlyand requested a review from a team as a code owner June 26, 2024 06:47
@Yury-Fridlyand Yury-Fridlyand marked this pull request as draft June 26, 2024 06:48
Copy link
Collaborator

@ikolomi ikolomi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reviewed and posted comments from the beginning to java/client/src/main/java/glide/api/models/configuration/BaseClientConfiguration.java

java/client/src/main/java/glide/api/BaseClient.java Outdated Show resolved Hide resolved
+ " callback.");
}
// TODO
throw new NotImplementedException("oh no");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is this?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not implemented and won't be in the foreseeable future.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have a more meaningful error message here, along with the TODO link to documentation (perhaps explaining the limitation?)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, WIP

Copy link
Collaborator Author

@Yury-Fridlyand Yury-Fridlyand Jun 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated in 5a11ff4

? new BaseCommandResponseResolver(RedisValueResolver::valueFromPointer).apply(response)
: new BaseCommandResponseResolver(RedisValueResolver::valueFromPointerBinary)
.apply(response);
encodingUtf8 ? responseResolver.apply(response) : binaryResponseResolver.apply(response);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how its applies to pubsub?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MessageHandler uses responseResolver to get the message from received push.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this answer your question @ikolomi ?

BaseClientConfiguration config,
BiFunction<ConnectionManager, CommandManager, T> constructor) {
protected static <T extends BaseClient> CompletableFuture<T> CreateClient(
@NonNull BaseClientConfiguration config, Supplier<T> constructor) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how it is related to the pubsub

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to rework it since BaseClient got more private fields to store.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this answer your question @ikolomi ?

}
return new MessageHandler(
config.getSubscriptionConfiguration().getCallback(),
config.getSubscriptionConfiguration().getContext(),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we have a validation that context cannot be set w/o the callback and the subscriptions cannot be used with resp2, do we have it here?
https://github.com/aws/glide-for-redis/blob/e2a804c2f1d6355a797a60f28e95c2d16826a856/python/python/glide/config.py#L451

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

java client does not support RESP2 yet. I put a todo.

This comment was marked as resolved.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ikolomi , does this cover your concern?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even though this takes in both callback and context in one method, the caller could supply null to either of these. I've added an explict check for this in #1773.

* @param message The message to publish.
* @return Command response - The number of clients that received the message.
*/
public ClusterTransaction spublish(@NonNull String channel, @NonNull String message) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sharded?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

very sharded

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is done in #1773

java/client/src/main/java/glide/api/models/Message.java Outdated Show resolved Hide resolved
java/client/src/main/java/glide/api/models/Message.java Outdated Show resolved Hide resolved
java/client/src/main/java/glide/api/models/Message.java Outdated Show resolved Hide resolved
Signed-off-by: Yury-Fridlyand <[email protected]>
Signed-off-by: Yury-Fridlyand <[email protected]>
* Add some tests.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Test fixes.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Add more tests.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Address PR comments.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Experiment

Signed-off-by: Yury-Fridlyand <[email protected]>

* Add more tests.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Typo fix.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Typo fix.

Signed-off-by: Yury-Fridlyand <[email protected]>

* I HATE YOU SPOTLESS

Signed-off-by: Yury-Fridlyand <[email protected]>

* Typo fix.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Uncomment test timeout.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Address PR comments.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Typo fix.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Typo fix.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Typo fix.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Typo fix.

Signed-off-by: Yury-Fridlyand <[email protected]>

---------

Signed-off-by: Yury-Fridlyand <[email protected]>
Signed-off-by: Yury-Fridlyand <[email protected]>
@Yury-Fridlyand Yury-Fridlyand changed the title [WIP] Java: Pub & Sub = <3 Java: Pub & Sub = <3 Jun 29, 2024
@Yury-Fridlyand Yury-Fridlyand marked this pull request as ready for review June 29, 2024 20:21
java/client/src/main/java/glide/api/BaseClient.java Outdated Show resolved Hide resolved
java/client/src/main/java/glide/api/BaseClient.java Outdated Show resolved Hide resolved
@@ -45,7 +46,12 @@ public void init() {
connectionManager = mock(ConnectionManager.class);
threadPoolResource = mock(ThreadPoolResource.class);

mockedClient.when(() -> buildChannelHandler(any())).thenReturn(channelHandler);
RedisClient client = new RedisClient();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This client needs to close() right? Need a try-with-resources?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all instances are mocks here, so I think we can omit that.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RedisClient itself isn't a mock. Adding try-with-resources.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nevermind, the client is now mocked.

Copy link
Collaborator

@jonathanl-bq jonathanl-bq left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Signing off on the Rust changes with some minor comments.

java/src/lib.rs Show resolved Hide resolved
java/src/lib.rs Show resolved Hide resolved
Comment on lines +82 to +84
// Create a java `Map<String, Object>` with two keys:
// - "kind" which corresponds to the push type, stored as a `String`
// - "values" which corresponds to the array of values received, stored as `Object[]`
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we only have 2 keys, why not just return an array of 2 elements? Not a big deal, but I'm curious why we went with a Map.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea, but I'm following python client.

Signed-off-by: Yury-Fridlyand <[email protected]>
@acarbonetto acarbonetto merged commit eb2201c into valkey-io:main Jul 1, 2024
17 checks passed
@acarbonetto acarbonetto deleted the java/integ_yuryf_pubsub branch July 1, 2024 03:29
@Yury-Fridlyand Yury-Fridlyand mentioned this pull request Jul 1, 2024
Copy link
Collaborator

@ikolomi ikolomi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. The feature is not done until all the comments are resolved and explicitly approved by myself.
  2. Do reach out actively to me to in case of any questions arise.

}
return new MessageHandler(
config.getSubscriptionConfiguration().getCallback(),
config.getSubscriptionConfiguration().getContext(),

This comment was marked as resolved.

"The operation will never complete since messages will be passed to the configured"
+ " callback.");
}
throw new NotImplementedException(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has to be implemented to be on a par with python implementation

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please see #1770

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this cover the ask @ikolomi ?

* @param message The message to publish.
* @return Command response - The number of clients that received the message.
*/
public ClusterTransaction spublish(@NonNull String channel, @NonNull String message) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. implement getPubSubMessage() flavors
  2. implement the missing tests per reference ( https://github.com/aws/glide-for-redis/blob/main/python/python/tests/test_pubsub.py) :

test_pubsub_exact_happy_path_coexistence
test_pubsub_exact_happy_path_many_channels_co_existence
test_sharded_pubsub_co_existence
test_pubsub_pattern_co_existence
test_pubsub_pattern_many_channels
test_pubsub_combined_exact_and_pattern_one_client
test_pubsub_combined_exact_and_pattern_multiple_clients
test_pubsub_combined_exact_pattern_and_sharded_multi_client
test_pubsub_combined_different_channels_with_same_name
test_pubsub_two_publishing_clients_same_name
test_pubsub_exact_max_size_message
test_pubsub_sharded_max_size_message
test_pubsub_exact_max_size_message_callback
test_pubsub_sharded_max_size_message_callback
test_pubsub_resp2_raise_an_error
test_pubsub_context_with_no_callback_raise_error

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. All "coexistence" tests depend on getPubSubMessage and wait for it.
  2. All "max_size_message" tests disabled on python side due to a bug and there is no reason to implement them now until bug fixed.
  3. RESP2 is not supported by java client - no test there.
  4. All rest are implemented.

public CompletableFuture<String> publish(@NonNull String channel, @NonNull String message) {
return commandManager.submitNewCommand(
Publish,
new String[] {channel, message},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are we still using Strings?

@@ -780,6 +782,18 @@ public CompletableFuture<String> randomKey() {
RandomKey, new String[0], this::handleStringOrNullResponse);
}

@Override
public CompletableFuture<String> spublish(@NonNull String channel, @NonNull String message) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a GlideString variant for this function

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@eifrah-aws , I've removed spublish and made two overloads of publish(). They both use ArgType now. Please see: #1773.

* assert response.equals("OK");
* }</pre>
*/
CompletableFuture<String> spublish(String channel, String message);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GlideString variant

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See #1773

* assert response.equals("OK");
* }</pre>
*/
CompletableFuture<String> publish(String channel, String message);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GlideString variant

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See #1773

cyip10 pushed a commit to Bit-Quill/valkey-glide that referenced this pull request Jul 16, 2024
* Java: Add client configuration for subscribing to channels. (#381)

* Add client configuartion for subscribing to channels.

Signed-off-by: Yury-Fridlyand <[email protected]>

* CLIPPY I HATE YOU

Signed-off-by: Yury-Fridlyand <[email protected]>

* Get and store callback.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Fix tests.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Rework configuration and add docs.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Config rework.

Signed-off-by: Yury-Fridlyand <[email protected]>

* docs

Signed-off-by: Yury-Fridlyand <[email protected]>

* More TODOs for the god of TODOs.

Signed-off-by: Yury-Fridlyand <[email protected]>

---------

Signed-off-by: Yury-Fridlyand <[email protected]>

* Add `PUBLISH` and `SPUBLISH` commands. (#391)

* Add `PUBLISH` and `SPUBLISH` commands.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Fix the test.

Signed-off-by: Yury-Fridlyand <[email protected]>

---------

Signed-off-by: Yury-Fridlyand <[email protected]>

* Java client: receive pubsub messages (#385)

* Add client configuartion for subscribing to channels.

Signed-off-by: Yury-Fridlyand <[email protected]>

* CLIPPY I HATE YOU

Signed-off-by: Yury-Fridlyand <[email protected]>

* Get and store callback.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Fix tests.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Rework configuration and add docs.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Config rework.

Signed-off-by: Yury-Fridlyand <[email protected]>

* docs

Signed-off-by: Yury-Fridlyand <[email protected]>

* Receive pushes (subscibed messages).

Signed-off-by: Yury-Fridlyand <[email protected]>

* Address PR comments.

Signed-off-by: Yury-Fridlyand <[email protected]>

* I HATE YOU SPOTLESS

Signed-off-by: Yury-Fridlyand <[email protected]>

* Rename a class.

Signed-off-by: Yury-Fridlyand <[email protected]>

---------

Signed-off-by: Yury-Fridlyand <[email protected]>

* Address PR comments.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Address PR comments.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Java: add IT for pubsub (#400)

* Add some tests.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Test fixes.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Add more tests.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Address PR comments.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Experiment

Signed-off-by: Yury-Fridlyand <[email protected]>

* Add more tests.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Typo fix.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Typo fix.

Signed-off-by: Yury-Fridlyand <[email protected]>

* I HATE YOU SPOTLESS

Signed-off-by: Yury-Fridlyand <[email protected]>

* Typo fix.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Uncomment test timeout.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Address PR comments.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Typo fix.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Typo fix.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Typo fix.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Typo fix.

Signed-off-by: Yury-Fridlyand <[email protected]>

---------

Signed-off-by: Yury-Fridlyand <[email protected]>

* Update function signature.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Address PR comments.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Address PR comments.

Signed-off-by: Yury-Fridlyand <[email protected]>

---------

Signed-off-by: Yury-Fridlyand <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
java issues and fixes related to the java client
Projects
Status: No status
Development

Successfully merging this pull request may close these issues.

8 participants