Skip to content

Commit

Permalink
Java: API layer with RedisClient (valkey-io#737)
Browse files Browse the repository at this point in the history
* Java: API layer with RedisClient to create new client (#46)

Signed-off-by: Andrew Carbonetto <[email protected]>

---------

Signed-off-by: Yury-Fridlyand <[email protected]>
Signed-off-by: Andrew Carbonetto <[email protected]>
Co-authored-by: Yury-Fridlyand <[email protected]>
Co-authored-by: SanHalacogluImproving <[email protected]>
  • Loading branch information
3 people authored Jan 11, 2024
1 parent 9d6010d commit 2f901b0
Show file tree
Hide file tree
Showing 18 changed files with 816 additions and 30 deletions.
1 change: 1 addition & 0 deletions java/client/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ dependencies {
testAnnotationProcessor 'org.projectlombok:lombok:1.18.30'

// junit
testImplementation group: 'org.mockito', name: 'mockito-inline', version: '3.12.4'
testImplementation('org.junit.jupiter:junit-jupiter:5.6.2')
testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: '3.12.4'
}
Expand Down
33 changes: 33 additions & 0 deletions java/client/src/main/java/glide/api/BaseClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package glide.api;

import glide.managers.CommandManager;
import glide.managers.ConnectionManager;
import java.util.concurrent.ExecutionException;
import lombok.AllArgsConstructor;

/** Base Client class for Redis */
@AllArgsConstructor
public abstract class BaseClient implements AutoCloseable {

protected ConnectionManager connectionManager;
protected CommandManager commandManager;

/**
* Closes this resource, relinquishing any underlying resources. This method is invoked
* automatically on objects managed by the try-with-resources statement.
*
* <p>see: <a
* href="https://docs.oracle.com/javase/8/docs/api/java/lang/AutoCloseable.html#close--">AutoCloseable::close()</a>
*/
@Override
public void close() throws ExecutionException {
try {
connectionManager.closeConnection().get();
} catch (InterruptedException interruptedException) {
// AutoCloseable classes are strongly advised to avoid throwing InterruptedExceptions
// TODO: marking resources as closed:
// https://github.com/orgs/Bit-Quill/projects/4/views/6?pane=issue&itemId=48063887
throw new RuntimeException(interruptedException);
}
}
}
50 changes: 50 additions & 0 deletions java/client/src/main/java/glide/api/RedisClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package glide.api;

import static glide.ffi.resolvers.SocketListenerResolver.getSocket;

import glide.api.models.configuration.RedisClientConfiguration;
import glide.connectors.handlers.CallbackDispatcher;
import glide.connectors.handlers.ChannelHandler;
import glide.managers.CommandManager;
import glide.managers.ConnectionManager;
import java.util.concurrent.CompletableFuture;

/**
* Async (non-blocking) client for Redis in Standalone mode. Use {@link
* #CreateClient(RedisClientConfiguration)} to request a client to Redis.
*/
public class RedisClient extends BaseClient {

/**
* Request an async (non-blocking) Redis client in Standalone mode.
*
* @param config - Redis Client Configuration
* @return a Future to connect and return a RedisClient
*/
public static CompletableFuture<RedisClient> CreateClient(RedisClientConfiguration config) {
ChannelHandler channelHandler = buildChannelHandler();
ConnectionManager connectionManager = buildConnectionManager(channelHandler);
CommandManager commandManager = buildCommandManager(channelHandler);
// TODO: Support exception throwing, including interrupted exceptions
return connectionManager
.connectToRedis(config)
.thenApply(ignore -> new RedisClient(connectionManager, commandManager));
}

protected static ChannelHandler buildChannelHandler() {
CallbackDispatcher callbackDispatcher = new CallbackDispatcher();
return new ChannelHandler(callbackDispatcher, getSocket());
}

protected static ConnectionManager buildConnectionManager(ChannelHandler channelHandler) {
return new ConnectionManager(channelHandler);
}

protected static CommandManager buildCommandManager(ChannelHandler channelHandler) {
return new CommandManager(channelHandler);
}

protected RedisClient(ConnectionManager connectionManager, CommandManager commandManager) {
super(connectionManager, commandManager);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package glide.api.models.configuration;

import lombok.Builder;
import lombok.Getter;
import lombok.NonNull;

/**
* Represents the strategy used to determine how and when to reconnect, in case of connection
* failures. The time between attempts grows exponentially, to the formula <code>rand(0 ... factor *
* (exponentBase ^ N))</code>, where <code>N</code> is the number of failed attempts.
*
* <p>Once the maximum value is reached, that will remain the time between retry attempts until a
* reconnect attempt is successful. The client will attempt to reconnect indefinitely.
*/
@Getter
@Builder
public class BackoffStrategy {
/**
* Number of retry attempts that the client should perform when disconnected from the server,
* where the time between retries increases. Once the retries have reached the maximum value, the
* time between retries will remain constant until a reconnect attempt is successful.
*/
@NonNull private final Integer numOfRetries;

/** The multiplier that will be applied to the waiting time between each retry. */
@NonNull private final Integer factor;

/** The exponent base configured for the strategy. */
@NonNull private final Integer exponentBase;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package glide.api.models.configuration;

import java.util.List;
import lombok.Builder;
import lombok.Getter;
import lombok.NonNull;
import lombok.Singular;
import lombok.experimental.SuperBuilder;

/**
* Configuration settings class for creating a Redis Client. Shared settings for standalone and
* cluster clients.
*/
@Getter
@SuperBuilder
public abstract class BaseClientConfiguration {
/**
* DNS Addresses and ports of known nodes in the cluster. If the server is in cluster mode the
* list can be partial, as the client will attempt to map out the cluster and find all nodes. If
* the server is in standalone mode, only nodes whose addresses were provided will be used by the
* client. For example: <code>[ {address:sample-address-0001.use1.cache.amazonaws.com, port:6379},
* {address: sample-address-0002.use2.cache.amazonaws.com, port:6379} ]</code>. If none are set, a
* default address localhost:6379 will be used.
*/
@Singular private final List<NodeAddress> addresses;

/**
* True if communication with the cluster should use Transport Level Security.
*
* <p>If the server/cluster requires TLS, not setting this will cause the connection attempt to
* fail.
*
* <p>If the server/cluster doesn't require TLS, setting this will also cause the connection
* attempt to fail.
*/
@Builder.Default private final boolean useTLS = false;

/** Represents the client's read from strategy. */
@NonNull @Builder.Default private final ReadFrom readFrom = ReadFrom.PRIMARY;

/**
* Credentials for authentication process. If none are set, the client will not authenticate
* itself with the server.
*/
private final RedisCredentials credentials;

/**
* The duration in milliseconds that the client should wait for a request to complete. This
* duration encompasses sending the request, awaiting for a response from the server, and any
* required reconnections or retries. If the specified timeout is exceeded for a pending request,
* it will result in a timeout error. If not set, a default value will be used.
*/
private final Integer requestTimeout;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package glide.api.models.configuration;

import lombok.Builder;
import lombok.Getter;
import lombok.NonNull;

/** Represents the address and port of a node in the cluster. */
@Getter
@Builder
public class NodeAddress {
public static String DEFAULT_HOST = "localhost";
public static Integer DEFAULT_PORT = 6379;

@NonNull @Builder.Default private final String host = DEFAULT_HOST;
@NonNull @Builder.Default private final Integer port = DEFAULT_PORT;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package glide.api.models.configuration;

/** Represents the client's read from strategy. */
public enum ReadFrom {
/** Always get from primary, in order to get the freshest data. */
PRIMARY,
/**
* Spread the requests between all replicas in a round-robin manner. If no replica is available,
* route the requests to the primary.
*/
PREFER_REPLICA
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package glide.api.models.configuration;

import lombok.Getter;
import lombok.experimental.SuperBuilder;

/** Represents the configuration settings for a Standalone Redis client. */
@Getter
@SuperBuilder
public class RedisClientConfiguration extends BaseClientConfiguration {
/** Strategy used to determine how and when to reconnect, in case of connection failures. */
private final BackoffStrategy reconnectStrategy;

/** Index of the logical database to connect to. */
private final Integer databaseId;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package glide.api.models.configuration;

import lombok.experimental.SuperBuilder;

/**
* Represents the configuration settings for a Cluster Redis client. Notes: Currently, the
* reconnection strategy in cluster mode is not configurable, and exponential backoff with fixed
* values is used.
*/
@SuperBuilder
public class RedisClusterClientConfiguration extends BaseClientConfiguration {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package glide.api.models.configuration;

import lombok.Builder;
import lombok.Getter;
import lombok.NonNull;

/** Represents the credentials for connecting to a Redis server. */
@Getter
@Builder
public class RedisCredentials {
/** The password that will be used for authenticating connections to the Redis servers. */
@NonNull private final String password;

/**
* The username that will be used for authenticating connections to the Redis servers. If not
* supplied, "default" will be used.
*/
private final String username;
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,19 @@
import glide.connectors.resources.ThreadPoolAllocator;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.unix.DomainSocketAddress;
import io.netty.channel.unix.DomainSocketChannel;
import io.netty.channel.unix.UnixChannel;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import redis_request.RedisRequestOuterClass.RedisRequest;
import response.ResponseOuterClass.Response;

/**
* Class responsible for handling calls to/from a netty.io {@link Channel}.<br>
* Uses a {@link CallbackDispatcher} to record callbacks of every request sent.
* Class responsible for handling calls to/from a netty.io {@link Channel}. Uses a {@link
* CallbackDispatcher} to record callbacks of every request sent.
*/
public class ChannelHandler {

Expand All @@ -24,15 +28,35 @@ public class ChannelHandler {

/** Open a new channel for a new client. */
public ChannelHandler(CallbackDispatcher callbackDispatcher, String socketPath) {
// TODO: add the ability to pass in group and channel from user
this(
ThreadPoolAllocator.createOrGetNettyThreadPool(THREAD_POOL_NAME, Optional.empty()),
Platform.getClientUdsNettyChannelType(),
new ProtobufSocketChannelInitializer(callbackDispatcher),
new DomainSocketAddress(socketPath),
callbackDispatcher);
}

/**
* Open a new channel for a new client and running it on the provided EventLoopGroup
*
* @param eventLoopGroup - ELG to run handler on
* @param domainSocketChannelClass - socket channel class for Handler
* @param channelInitializer - UnixChannel initializer
* @param domainSocketAddress - address to connect
* @param callbackDispatcher - dispatcher to handle callbacks
*/
public ChannelHandler(
EventLoopGroup eventLoopGroup,
Class<? extends DomainSocketChannel> domainSocketChannelClass,
ChannelInitializer<UnixChannel> channelInitializer,
DomainSocketAddress domainSocketAddress,
CallbackDispatcher callbackDispatcher) {
channel =
new Bootstrap()
// TODO let user specify the thread pool or pool size as an option
.group(
ThreadPoolAllocator.createOrGetNettyThreadPool(THREAD_POOL_NAME, Optional.empty()))
.channel(Platform.getClientUdsNettyChannelType())
.handler(new ProtobufSocketChannelInitializer(callbackDispatcher))
.connect(new DomainSocketAddress(socketPath))
.group(eventLoopGroup)
.channel(domainSocketChannelClass)
.handler(channelInitializer)
.connect(domainSocketAddress)
// TODO call here .sync() if needed or remove this comment
.channel();
this.callbackDispatcher = callbackDispatcher;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,15 @@ public class ReadHandler extends ChannelInboundHandlerAdapter {

/** Submit responses from glide to an instance {@link CallbackDispatcher} to handle them. */
@Override
public void channelRead(@NonNull ChannelHandlerContext ctx, @NonNull Object msg) {
callbackDispatcher.completeRequest((Response) msg);
public void channelRead(@NonNull ChannelHandlerContext ctx, @NonNull Object msg)
throws RuntimeException {
if (msg instanceof Response) {
Response response = (Response) msg;
callbackDispatcher.completeRequest(response);
ctx.fireChannelRead(msg);
return;
}
throw new RuntimeException("Unexpected message in socket");
}

/** Handles uncaught exceptions from {@link #channelRead(ChannelHandlerContext, Object)}. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@
import response.ResponseOuterClass.Response;

public class RedisValueResolver {

// TODO: consider lazy loading the glide_rs library
static {
System.loadLibrary("glide_rs");
}

/**
* Resolve a value received from Redis using given C-style pointer.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ public class SocketListenerResolver {
/** Make an FFI call to Glide to open a UDS socket to connect to. */
private static native String startSocketListener() throws Exception;

// TODO: consider lazy loading the glide_rs library
static {
System.loadLibrary("glide_rs");
}
Expand Down
Loading

0 comments on commit 2f901b0

Please sign in to comment.