diff --git a/.github/workflows/java.yml b/.github/workflows/java.yml index 74b828d696..0357de48c0 100644 --- a/.github/workflows/java.yml +++ b/.github/workflows/java.yml @@ -42,15 +42,14 @@ jobs: distribution: "temurin" java-version: ${{ matrix.java }} - - name: Install and run protoc (protobuf) + - name: Install protoc (protobuf) run: | sudo apt update sudo apt install -y protobuf-compiler - mkdir -p java/client/src/main/java/org/babushka/javababushka/generated - protoc -Iprotobuf=babushka-core/src/protobuf/ --java_out=java/client/src/main/java/org/babushka/javababushka/generated babushka-core/src/protobuf/*.proto + - name: Build rust part working-directory: java - run: cargo build + run: cargo build --release - name: Start Redis run: docker run -p 6379:6379 -p 8001:8001 -d redis/redis-stack diff --git a/.vscode/launch.json b/.vscode/launch.json deleted file mode 100644 index d13870c92e..0000000000 --- a/.vscode/launch.json +++ /dev/null @@ -1,42 +0,0 @@ -{ - "version": "0.2.0", - "configurations": [ - { - // Use IntelliSense to find out which attributes exist for C# debugging - // Use hover for the description of the existing attributes - // For further information visit https://github.com/OmniSharp/omnisharp-vscode/blob/master/debugger-launchjson.md - "name": ".NET Core Launch (console)", - "type": "coreclr", - "request": "launch", - "preLaunchTask": "build", - // If you have changed target frameworks, make sure to update the program path. - "program": "${workspaceFolder}/csharp/tests/bin/Debug/net6.0/tests.dll", - "args": [], - "cwd": "${workspaceFolder}/csharp/tests", - // For more information about the 'console' field, see https://aka.ms/VSCode-CS-LaunchJson-Console - "console": "internalConsole", - "stopAtEntry": false - }, - { - "name": ".NET Core Attach", - "type": "coreclr", - "request": "attach" - }, - { - "name": "C# benchmark Launch (console)", - "type": "coreclr", - "request": "launch", - "preLaunchTask": "build", - "program": "${workspaceFolder}/benchmarks/csharp/bin/Debug/net6.0/csharp_benchmark.dll", - "args": [], - "cwd": "${workspaceFolder}/benchmarks/csharp", - "console": "internalConsole", - "stopAtEntry": true - }, - { - "name": ".NET Core Attach", - "type": "coreclr", - "request": "attach" - } - ] -} diff --git a/.vscode/settings.json b/.vscode/settings.json deleted file mode 100644 index 70ac6dd315..0000000000 --- a/.vscode/settings.json +++ /dev/null @@ -1,37 +0,0 @@ -{ - "editor.insertSpaces": true, - "editor.tabSize": 4, - "editor.codeActionsOnSave": { - "source.organizeImports": true - }, - "editor.formatOnSave": true, - "files.insertFinalNewline": true, - "files.trimFinalNewlines": true, - "python.formatting.provider": "black", - "rust-analyzer.linkedProjects": [ - "babushka-core/Cargo.toml", - "python/Cargo.toml", - "node/rust-client/Cargo.toml", - "logger_core/Cargo.toml", - "csharp/lib/Cargo.toml", - "submodules/redis-rs/Cargo.toml", - "benchmarks/rust/Cargo.toml" - ], - "rust-analyzer.runnableEnv": { - "REDISRS_SERVER_TYPE": "tcp" - }, - "python.testing.pytestArgs": ["python"], - "python.testing.unittestEnabled": false, - "python.testing.pytestEnabled": true, - "[yaml]": { - "editor.tabSize": 4 - }, - "python.linting.flake8Enabled": false, - "python.linting.enabled": true, - "python.linting.flake8Args": [ - "--extend-ignore=E203", - "--max-line-length=127" - ], - "python.formatting.blackArgs": ["--target-version", "py36"], - "isort.args": ["--profile", "black"] -} diff --git a/.vscode/tasks.json b/.vscode/tasks.json deleted file mode 100644 index f5f11a93a9..0000000000 --- a/.vscode/tasks.json +++ /dev/null @@ -1,41 +0,0 @@ -{ - "version": "2.0.0", - "tasks": [ - { - "label": "build", - "command": "dotnet", - "type": "process", - "args": [ - "build", - "${workspaceFolder}/csharp/tests/tests.csproj", - "/property:GenerateFullPaths=true", - "/consoleloggerparameters:NoSummary" - ], - "problemMatcher": "$msCompile" - }, - { - "label": "publish", - "command": "dotnet", - "type": "process", - "args": [ - "publish", - "${workspaceFolder}/csharp/tests/tests.csproj", - "/property:GenerateFullPaths=true", - "/consoleloggerparameters:NoSummary" - ], - "problemMatcher": "$msCompile" - }, - { - "label": "watch", - "command": "dotnet", - "type": "process", - "args": [ - "watch", - "run", - "--project", - "${workspaceFolder}/csharp/tests/tests.csproj" - ], - "problemMatcher": "$msCompile" - } - ] -} \ No newline at end of file diff --git a/benchmarks/csharp/Program.cs b/benchmarks/csharp/Program.cs index f37c4baf74..6144c5244e 100644 --- a/benchmarks/csharp/Program.cs +++ b/benchmarks/csharp/Program.cs @@ -226,7 +226,7 @@ int num_of_concurrent_tasks {"num_of_tasks", num_of_concurrent_tasks}, {"data_size", data_size}, {"tps", tps}, - {"clientCount", clients.Length}, + {"client_count", clients.Length}, {"is_cluster", "false"} }; result = result diff --git a/benchmarks/install_and_test.sh b/benchmarks/install_and_test.sh index 2a7853a412..de70794752 100755 --- a/benchmarks/install_and_test.sh +++ b/benchmarks/install_and_test.sh @@ -24,13 +24,16 @@ runAllBenchmarks=1 runPython=0 runNode=0 runCsharp=0 +runJava=0 runRust=0 concurrentTasks="1 10 100 1000" dataSize="100 4000" clientCount="1" chosenClients="all" host="localhost" +port=6379 tlsFlag="--tls" +javaTlsFlag="-tls" function runPythonBenchmark(){ # generate protobuf files @@ -68,6 +71,12 @@ function runCSharpBenchmark(){ dotnet run --configuration Release --resultsFile=../$1 --dataSize $2 --concurrentTasks $concurrentTasks --clients $chosenClients --host $host --clientCount $clientCount $tlsFlag $portFlag } +function runJavaBenchmark(){ + cd ${BENCH_FOLDER}/../java + echo "./gradlew run --args=\"-resultsFile ${BENCH_FOLDER}/$1 -dataSize \"$2\" -concurrentTasks \"$concurrentTasks\" -clientCount \"$clientCount\" -clients $chosenClients -host $host $javaPortFlag $javaTlsFlag $javaClusterFlag\"" + ./gradlew run --args="-resultsFile \"${BENCH_FOLDER}/$1\" -dataSize \"$2\" -concurrentTasks \"$concurrentTasks\" -clients \"$chosenClients\" -host $host $javaPortFlag -clientCount \"$clientCount\" $javaTlsFlag $javaClusterFlag" +} + function runRustBenchmark(){ rustConcurrentTasks= for value in $concurrentTasks @@ -109,7 +118,7 @@ function resultFileName() { function Help() { echo Running the script without any arguments runs all benchmarks. - echo Pass -node, -csharp, -python as arguments in order to run the node, csharp, or python benchmarks accordingly. + echo Pass -node, -csharp, -python, -java as arguments in order to run the node, csharp, python, or java benchmarks accordingly. echo Multiple such flags can be passed. echo Pass -no-csv to skip analysis of the results. echo @@ -185,6 +194,21 @@ do runAllBenchmarks=0 runNode=1 ;; + -java) + runAllBenchmarks=0 + runJava=1 + chosenClients="Babushka" + ;; + -lettuce) + runAllBenchmarks=0 + runJava=1 + chosenClients="lettuce_async" + ;; + -jedis) + runAllBenchmarks=0 + runJava=1 + chosenClients="Jedis" + ;; -csharp) runAllBenchmarks=0 runCsharp=1 @@ -205,12 +229,15 @@ do -no-csv) writeResultsCSV=0 ;; -no-tls) tlsFlag= + javaTlsFlag= ;; -is-cluster) clusterFlag="--clusterModeEnabled" + javaClusterFlag="-clusterModeEnabled" ;; -port) portFlag="--port "$2 + javaPortFlag="-port "$2 shift ;; esac @@ -242,6 +269,13 @@ do runCSharpBenchmark $csharpResults $currentDataSize fi + if [ $runAllBenchmarks == 1 ] || [ $runJava == 1 ]; + then + javaResults=$(resultFileName java $currentDataSize) + resultFiles+=$javaResults" " + runJavaBenchmark $javaResults $currentDataSize + fi + if [ $runAllBenchmarks == 1 ] || [ $runRust == 1 ]; then rustResults=$(resultFileName rust $currentDataSize) @@ -250,8 +284,6 @@ do fi done - - flushDB if [ $writeResultsCSV == 1 ]; diff --git a/benchmarks/node/node_benchmark.ts b/benchmarks/node/node_benchmark.ts index c2a493b517..5921f9cf0e 100644 --- a/benchmarks/node/node_benchmark.ts +++ b/benchmarks/node/node_benchmark.ts @@ -167,7 +167,7 @@ async function run_clients( num_of_tasks: num_of_concurrent_tasks, data_size, tps, - clientCount: clients.length, + client_count: clients.length, is_cluster, ...set_latency_results, ...get_existing_latency_results, diff --git a/benchmarks/python/python_benchmark.py b/benchmarks/python/python_benchmark.py index c70d7af7d8..b189b689c2 100644 --- a/benchmarks/python/python_benchmark.py +++ b/benchmarks/python/python_benchmark.py @@ -226,7 +226,7 @@ async def run_clients( "num_of_tasks": num_of_concurrent_tasks, "data_size": data_size, "tps": tps, - "clientCount": len(clients), + "client_count": len(clients), "is_cluster": is_cluster, }, **get_existing_latency_results, diff --git a/benchmarks/rust/src/main.rs b/benchmarks/rust/src/main.rs index 08910deafc..a52d3c8f8a 100644 --- a/benchmarks/rust/src/main.rs +++ b/benchmarks/rust/src/main.rs @@ -130,7 +130,7 @@ async fn perform_benchmark(args: Args) { Value::Number((number_of_operations as i64 * 1000 / stopwatch.elapsed_ms()).into()), ); results_json.insert( - "clientCount".to_string(), + "client_count".to_string(), Value::Number(args.client_count.into()), ); results_json.insert( diff --git a/benchmarks/utilities/csv_exporter.py b/benchmarks/utilities/csv_exporter.py old mode 100644 new mode 100755 index a9f1131729..753df2e5ab --- a/benchmarks/utilities/csv_exporter.py +++ b/benchmarks/utilities/csv_exporter.py @@ -1,3 +1,5 @@ +#!/bin/python3 + import csv import json import os @@ -12,7 +14,7 @@ "is_cluster", "num_of_tasks", "data_size", - "clientCount", + "client_count", "tps", "get_non_existing_p50_latency", "get_non_existing_p90_latency", @@ -39,7 +41,7 @@ json_file_name = os.path.basename(json_file_full_path) - languages = ["csharp", "node", "python", "rust"] + languages = ["csharp", "node", "python", "rust", "java"] language = next( (language for language in languages if language in json_file_name), None ) diff --git a/java/.cargo/config.toml b/java/.cargo/config.toml index 6b50eaf7c2..12fc36fc94 100644 --- a/java/.cargo/config.toml +++ b/java/.cargo/config.toml @@ -1,4 +1,3 @@ [env] BABUSHKA_NAME = { value = "javababushka", force = true } BABUSHKA_VERSION = "0.1.0" - diff --git a/java/Cargo.toml b/java/Cargo.toml index 3313f92419..4e9ac83ef1 100644 --- a/java/Cargo.toml +++ b/java/Cargo.toml @@ -1,4 +1,3 @@ - [package] name = "javababushka" version = "0.1.0" @@ -17,6 +16,8 @@ babushka = { path = "../babushka-core" } tokio = { version = "^1", features = ["rt", "macros", "rt-multi-thread", "time"] } logger_core = {path = "../logger_core"} tracing-subscriber = "0.3.16" +jni = "0.21.1" +log = "0.4.20" [profile.release] lto = true diff --git a/java/README.md b/java/README.md index f7c3216c2f..6fb456a9b4 100644 --- a/java/README.md +++ b/java/README.md @@ -30,7 +30,7 @@ You can run benchmarks using `./gradlew run`. You can set arguments using the ar ```shell ./gradlew run --args="-help" -./gradlew run --args="-resultsFile=output.csv -dataSize \"100 1000\" -concurrentTasks \"10 100\" -clients all -host localhost -port 6279 -clientCount \"1 5\" -tls" +./gradlew run --args="-resultsFile=output.json -dataSize \"100 1000\" -concurrentTasks \"10 100\" -clients all -host localhost -port 6279 -clientCount \"1 5\" -tls" ``` The following arguments are accepted: @@ -44,7 +44,6 @@ The following arguments are accepted: ### Troubleshooting -* Connection Timeout: +* Connection Timeout: * If you're unable to connect to redis, check that you are connecting to the correct host, port, and TLS configuration. * Only server-side certificates are supported by the TLS configured redis. - diff --git a/java/benchmarks/build.gradle b/java/benchmarks/build.gradle index 8d9e500284..6ade7532b0 100644 --- a/java/benchmarks/build.gradle +++ b/java/benchmarks/build.gradle @@ -1,6 +1,7 @@ plugins { // Apply the application plugin to add support for building a CLI application in Java. id 'application' + id 'io.freefair.lombok' } repositories { @@ -9,6 +10,8 @@ repositories { } dependencies { + implementation project(':client') + // Use JUnit test framework. testImplementation 'org.junit.jupiter:junit-jupiter:5.9.2' @@ -18,6 +21,10 @@ dependencies { implementation 'io.lettuce:lettuce-core:6.2.6.RELEASE' implementation 'commons-cli:commons-cli:1.5.0' implementation group: 'org.apache.commons', name: 'commons-lang3', version: '3.13.0' + implementation group: 'com.google.code.gson', name: 'gson', version: '2.10.1' + + compileOnly 'org.projectlombok:lombok:1.18.30' + annotationProcessor 'org.projectlombok:lombok:1.18.30' } // Apply a specific Java toolchain to ease working on different environments. @@ -30,12 +37,14 @@ java { application { // Define the main class for the application. mainClass = 'javababushka.benchmarks.BenchmarkingApp' + applicationDefaultJvmArgs += "-Djava.library.path=${projectDir}/../target/release:${projectDir}/../target/debug" } -tasks.withType(Test) { - testLogging { - exceptionFormat "full" - events "started", "skipped", "passed", "failed" - showStandardStreams true - } +tasks.withType(Test) { + testLogging { + exceptionFormat "full" + events "started", "skipped", "passed", "failed" + showStandardStreams true + } + jvmArgs "-Djava.library.path=${projectDir}/../target/release:${projectDir}/../target/debug" } diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/BenchmarkingApp.java b/java/benchmarks/src/main/java/javababushka/benchmarks/BenchmarkingApp.java index ae87c269f7..b401488584 100644 --- a/java/benchmarks/src/main/java/javababushka/benchmarks/BenchmarkingApp.java +++ b/java/benchmarks/src/main/java/javababushka/benchmarks/BenchmarkingApp.java @@ -1,10 +1,16 @@ package javababushka.benchmarks; -import java.io.FileWriter; -import java.io.IOException; +import static javababushka.benchmarks.utils.Benchmarking.testClientSetGet; + import java.util.Arrays; import java.util.Optional; import java.util.stream.Stream; +import javababushka.benchmarks.clients.babushka.JniNettyClient; +import javababushka.benchmarks.clients.jedis.JedisClient; +import javababushka.benchmarks.clients.jedis.JedisPseudoAsyncClient; +import javababushka.benchmarks.clients.lettuce.LettuceAsyncClient; +import javababushka.benchmarks.clients.lettuce.LettuceAsyncClusterClient; +import javababushka.benchmarks.clients.lettuce.LettuceClient; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.DefaultParser; @@ -44,33 +50,30 @@ public static void main(String[] args) { switch (client) { case JEDIS: // run testClientSetGet on JEDIS sync client - System.out.println("Run JEDIS sync client"); + testClientSetGet(JedisClient::new, runConfiguration, false); break; case JEDIS_ASYNC: // run testClientSetGet on JEDIS pseudo-async client - System.out.println("Run JEDIS pseudo-async client"); + testClientSetGet(JedisPseudoAsyncClient::new, runConfiguration, true); break; case LETTUCE: - // run testClientSetGet on LETTUCE sync client - System.out.println("Run LETTUCE sync client"); + testClientSetGet(LettuceClient::new, runConfiguration, false); break; case LETTUCE_ASYNC: - // run testClientSetGet on LETTUCE async client - System.out.println("Run LETTUCE async client"); + if (runConfiguration.clusterModeEnabled) { + testClientSetGet(LettuceAsyncClusterClient::new, runConfiguration, true); + } else { + testClientSetGet(LettuceAsyncClient::new, runConfiguration, true); + } + break; + case BABUSHKA: + testClientSetGet(() -> new JniNettyClient(false), runConfiguration, false); break; case BABUSHKA_ASYNC: - System.out.println("Babushka async not yet configured"); + testClientSetGet(() -> new JniNettyClient(true), runConfiguration, true); break; } } - - if (runConfiguration.resultsFile.isPresent()) { - try { - runConfiguration.resultsFile.get().close(); - } catch (IOException ioException) { - System.out.println("Error closing results file: " + ioException.getLocalizedMessage()); - } - } } private static Options getOptions() { @@ -96,15 +99,21 @@ private static Options getOptions() { Option.builder("clients") .hasArg(true) .desc( - "one of:" - + " all|jedis|jedis_async|lettuce|lettuce_async|babushka_async|all_async|all_sync" - + " [all]") + "one of: all|jedis|jedis_async|lettuce|lettuce_async|" + + "babushka|babushka_async|all_async|all_sync") .build()); options.addOption(Option.builder("host").hasArg(true).desc("Hostname [localhost]").build()); options.addOption(Option.builder("port").hasArg(true).desc("Port number [6379]").build()); options.addOption( Option.builder("clientCount").hasArg(true).desc("Number of clients to run [1]").build()); options.addOption(Option.builder("tls").hasArg(false).desc("TLS [false]").build()); + options.addOption( + Option.builder("clusterModeEnabled") + .hasArg(false) + .desc("Is cluster-mode enabled, other standalone mode is used [false]") + .build()); + options.addOption( + Option.builder("debugLogging").hasArg(false).desc("Verbose logs [false]").build()); return options; } @@ -123,16 +132,11 @@ private static RunConfiguration verifyOptions(CommandLine line) throws ParseExce } if (line.hasOption("resultsFile")) { - String resultsFileName = line.getOptionValue("resultsFile"); - try { - runConfiguration.resultsFile = Optional.of(new FileWriter(resultsFileName)); - } catch (IOException ioException) { - throw new ParseException( - "Unable to write to resultsFile (" - + resultsFileName - + "): " - + ioException.getMessage()); - } + runConfiguration.resultsFile = Optional.ofNullable(line.getOptionValue("resultsFile")); + } + + if (line.hasOption("dataSize")) { + runConfiguration.dataSize = parseIntListOption(line.getOptionValue("dataSize")); } if (line.hasOption("concurrentTasks")) { @@ -151,19 +155,17 @@ private static RunConfiguration verifyOptions(CommandLine line) throws ParseExce return Stream.of( ClientName.JEDIS, ClientName.JEDIS_ASYNC, - // ClientName.BABUSHKA_ASYNC, + ClientName.BABUSHKA, + ClientName.BABUSHKA_ASYNC, ClientName.LETTUCE, ClientName.LETTUCE_ASYNC); case ALL_ASYNC: return Stream.of( ClientName.JEDIS_ASYNC, - // ClientName.BABUSHKA_ASYNC, + ClientName.BABUSHKA_ASYNC, ClientName.LETTUCE_ASYNC); case ALL_SYNC: - return Stream.of( - ClientName.JEDIS, - // ClientName.BABUSHKA_ASYNC, - ClientName.LETTUCE); + return Stream.of(ClientName.JEDIS, ClientName.LETTUCE, ClientName.BABUSHKA); default: return Stream.of(e); } @@ -184,6 +186,8 @@ private static RunConfiguration verifyOptions(CommandLine line) throws ParseExce } runConfiguration.tls = line.hasOption("tls"); + runConfiguration.clusterModeEnabled = line.hasOption("clusterModeEnabled"); + runConfiguration.debugLogging = line.hasOption("debugLogging"); return runConfiguration; } @@ -195,8 +199,12 @@ private static int[] parseIntListOption(String line) throws ParseException { if (lineValue.startsWith("[") && lineValue.endsWith("]")) { lineValue = lineValue.substring(1, lineValue.length() - 1); } + + // trim whitespace + lineValue = lineValue.trim(); + // check if it's the correct format - if (!lineValue.matches("\\d+(\\s+\\d+)?")) { + if (!lineValue.matches("\\d+(\\s+\\d+)*")) { throw new ParseException("Invalid option: " + line); } // split the string into a list of integers @@ -209,6 +217,7 @@ public enum ClientName { LETTUCE("Lettuce"), LETTUCE_ASYNC("Lettuce async"), BABUSHKA_ASYNC("Babushka async"), + BABUSHKA("Babushka"), ALL("All"), ALL_SYNC("All sync"), ALL_ASYNC("All async"); @@ -231,7 +240,7 @@ public boolean isEqual(String other) { public static class RunConfiguration { public String configuration; - public Optional resultsFile; + public Optional resultsFile; public int[] dataSize; public int[] concurrentTasks; public ClientName[] clients; @@ -239,6 +248,7 @@ public static class RunConfiguration { public int port; public int[] clientCount; public boolean tls; + public boolean clusterModeEnabled; public boolean debugLogging = false; public RunConfiguration() { @@ -248,13 +258,15 @@ public RunConfiguration() { concurrentTasks = new int[] {100, 1000}; clients = new ClientName[] { - // ClientName.BABUSHKA_ASYNC, - ClientName.JEDIS, ClientName.JEDIS_ASYNC, ClientName.LETTUCE, ClientName.LETTUCE_ASYNC + // ClientName.LETTUCE, + // ClientName.LETTUCE_ASYNC, + ClientName.BABUSHKA_ASYNC, ClientName.BABUSHKA, }; host = "localhost"; port = 6379; - clientCount = new int[] {1}; + clientCount = new int[] {1, 2}; tls = false; + clusterModeEnabled = false; } } } diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/clients/AsyncClient.java b/java/benchmarks/src/main/java/javababushka/benchmarks/clients/AsyncClient.java new file mode 100644 index 0000000000..deae2ceee5 --- /dev/null +++ b/java/benchmarks/src/main/java/javababushka/benchmarks/clients/AsyncClient.java @@ -0,0 +1,29 @@ +package javababushka.benchmarks.clients; + +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import javababushka.benchmarks.utils.ConnectionSettings; + +/** A Redis client with async capabilities */ +public interface AsyncClient extends Client { + + long DEFAULT_TIMEOUT_MILLISECOND = 1000; + + Future asyncConnectToRedis(ConnectionSettings connectionSettings); + + Future asyncSet(String key, String value); + + Future asyncGet(String key); + + default T waitForResult(Future future) { + return waitForResult(future, DEFAULT_TIMEOUT_MILLISECOND); + } + + default T waitForResult(Future future, long timeout) { + try { + return future.get(timeout, TimeUnit.MILLISECONDS); + } catch (Exception ignored) { + return null; + } + } +} diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/clients/Client.java b/java/benchmarks/src/main/java/javababushka/benchmarks/clients/Client.java new file mode 100644 index 0000000000..d95f31f25d --- /dev/null +++ b/java/benchmarks/src/main/java/javababushka/benchmarks/clients/Client.java @@ -0,0 +1,14 @@ +package javababushka.benchmarks.clients; + +import javababushka.benchmarks.utils.ConnectionSettings; + +/** A Redis client interface */ +public interface Client { + void connectToRedis(); + + void connectToRedis(ConnectionSettings connectionSettings); + + default void closeConnection() {} + + String getName(); +} diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/clients/SyncClient.java b/java/benchmarks/src/main/java/javababushka/benchmarks/clients/SyncClient.java new file mode 100644 index 0000000000..603f91e936 --- /dev/null +++ b/java/benchmarks/src/main/java/javababushka/benchmarks/clients/SyncClient.java @@ -0,0 +1,8 @@ +package javababushka.benchmarks.clients; + +/** A Redis client with sync capabilities */ +public interface SyncClient extends Client { + void set(String key, String value); + + String get(String key); +} diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/clients/babushka/JniNettyClient.java b/java/benchmarks/src/main/java/javababushka/benchmarks/clients/babushka/JniNettyClient.java new file mode 100644 index 0000000000..a4fc37f4d9 --- /dev/null +++ b/java/benchmarks/src/main/java/javababushka/benchmarks/clients/babushka/JniNettyClient.java @@ -0,0 +1,69 @@ +package javababushka.benchmarks.clients.babushka; + +import static response.ResponseOuterClass.Response; + +import java.util.concurrent.Future; +import javababushka.Client; +import javababushka.benchmarks.clients.AsyncClient; +import javababushka.benchmarks.clients.SyncClient; +import javababushka.benchmarks.utils.ConnectionSettings; + +public class JniNettyClient implements SyncClient, AsyncClient { + + private final Client testClient; + private String name = "JNI Netty"; + + public JniNettyClient(boolean async) { + name += async ? " async" : " sync"; + testClient = new Client(); + } + + @Override + public String getName() { + return name; + } + + @Override + public void closeConnection() { + testClient.closeConnection(); + } + + @Override + public void connectToRedis() { + connectToRedis(new ConnectionSettings("localhost", 6379, false, false)); + } + + @Override + public void connectToRedis(ConnectionSettings connectionSettings) { + waitForResult(asyncConnectToRedis(connectionSettings)); + } + + @Override + public Future asyncConnectToRedis(ConnectionSettings connectionSettings) { + return testClient.asyncConnectToRedis( + connectionSettings.host, + connectionSettings.port, + connectionSettings.useSsl, + connectionSettings.clusterMode); + } + + @Override + public Future asyncSet(String key, String value) { + return testClient.asyncSet(key, value); + } + + @Override + public Future asyncGet(String key) { + return testClient.asyncGet(key); + } + + @Override + public void set(String key, String value) { + testClient.set(key, value); + } + + @Override + public String get(String key) { + return testClient.get(key); + } +} diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/clients/jedis/JedisClient.java b/java/benchmarks/src/main/java/javababushka/benchmarks/clients/jedis/JedisClient.java new file mode 100644 index 0000000000..f088d5ac07 --- /dev/null +++ b/java/benchmarks/src/main/java/javababushka/benchmarks/clients/jedis/JedisClient.java @@ -0,0 +1,62 @@ +package javababushka.benchmarks.clients.jedis; + +import javababushka.benchmarks.clients.SyncClient; +import javababushka.benchmarks.utils.ConnectionSettings; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisPool; + +/** A Jedis client with sync capabilities. See: https://github.com/redis/jedis */ +public class JedisClient implements SyncClient { + + public static final String DEFAULT_HOST = "localhost"; + public static final int DEFAULT_PORT = 6379; + + protected Jedis jedisResource; + + @Override + public void connectToRedis() { + JedisPool pool = new JedisPool(DEFAULT_HOST, DEFAULT_PORT); + jedisResource = pool.getResource(); + } + + @Override + public void closeConnection() { + try { + jedisResource.close(); + } catch (Exception ignored) { + } + } + + @Override + public String getName() { + return "Jedis"; + } + + @Override + public void connectToRedis(ConnectionSettings connectionSettings) { + jedisResource = + new Jedis(connectionSettings.host, connectionSettings.port, connectionSettings.useSsl); + jedisResource.connect(); + if (!jedisResource.isConnected()) { + throw new RuntimeException("failed to connect to jedis"); + } + } + + public String info() { + return jedisResource.info(); + } + + public String info(String section) { + return jedisResource.info(section); + } + + @Override + public void set(String key, String value) { + jedisResource.set(key, value); + } + + @Override + public String get(String key) { + return jedisResource.get(key); + } +} diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/clients/jedis/JedisPseudoAsyncClient.java b/java/benchmarks/src/main/java/javababushka/benchmarks/clients/jedis/JedisPseudoAsyncClient.java new file mode 100644 index 0000000000..3970826a33 --- /dev/null +++ b/java/benchmarks/src/main/java/javababushka/benchmarks/clients/jedis/JedisPseudoAsyncClient.java @@ -0,0 +1,34 @@ +package javababushka.benchmarks.clients.jedis; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; +import javababushka.benchmarks.clients.AsyncClient; +import javababushka.benchmarks.utils.ConnectionSettings; + +/** + * A Jedis client with pseudo-async capabilities. Jedis doesn't provide async API + * https://github.com/redis/jedis/issues/241 + * + *

See: https://github.com/redis/jedis + */ +public class JedisPseudoAsyncClient extends JedisClient implements AsyncClient { + @Override + public Future asyncConnectToRedis(ConnectionSettings connectionSettings) { + return CompletableFuture.runAsync(() -> super.connectToRedis(connectionSettings)); + } + + @Override + public Future asyncSet(String key, String value) { + return CompletableFuture.runAsync(() -> super.set(key, value)); + } + + @Override + public Future asyncGet(String key) { + return CompletableFuture.supplyAsync(() -> super.get(key)); + } + + @Override + public String getName() { + return "Jedis pseudo-async"; + } +} diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/clients/lettuce/LettuceAsyncClient.java b/java/benchmarks/src/main/java/javababushka/benchmarks/clients/lettuce/LettuceAsyncClient.java new file mode 100644 index 0000000000..ded154b3c5 --- /dev/null +++ b/java/benchmarks/src/main/java/javababushka/benchmarks/clients/lettuce/LettuceAsyncClient.java @@ -0,0 +1,74 @@ +package javababushka.benchmarks.clients.lettuce; + +import io.lettuce.core.RedisClient; +import io.lettuce.core.RedisFuture; +import io.lettuce.core.RedisURI; +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.api.async.RedisAsyncCommands; +import io.lettuce.core.codec.StringCodec; +import java.util.concurrent.Future; +import javababushka.benchmarks.clients.AsyncClient; +import javababushka.benchmarks.utils.ConnectionSettings; + +/** A Lettuce client with async capabilities see: https://lettuce.io/ */ +public class LettuceAsyncClient implements AsyncClient { + + private RedisClient client; + private RedisAsyncCommands asyncCommands; + private StatefulRedisConnection connection; + + @Override + public void connectToRedis() { + connectToRedis(new ConnectionSettings("localhost", 6379, false, false)); + } + + @Override + public void connectToRedis(ConnectionSettings connectionSettings) { + client = + RedisClient.create( + String.format( + "%s://%s:%d", + connectionSettings.useSsl ? "rediss" : "redis", + connectionSettings.host, + connectionSettings.port)); + connection = client.connect(); + asyncCommands = connection.async(); + } + + @Override + public Future asyncConnectToRedis(ConnectionSettings connectionSettings) { + client = RedisClient.create(); + var asyncConnection = + client.connectAsync( + new StringCodec(), + RedisURI.create( + String.format( + "%s://%s:%d", + connectionSettings.useSsl ? "rediss" : "redis", + connectionSettings.host, + connectionSettings.port))); + asyncConnection.whenComplete((connection, exception) -> asyncCommands = connection.async()); + return asyncConnection.thenApply((connection) -> "OK"); + } + + @Override + public RedisFuture asyncSet(String key, String value) { + return asyncCommands.set(key, value); + } + + @Override + public RedisFuture asyncGet(String key) { + return asyncCommands.get(key); + } + + @Override + public void closeConnection() { + connection.close(); + client.shutdown(); + } + + @Override + public String getName() { + return "Lettuce Async"; + } +} diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/clients/lettuce/LettuceAsyncClusterClient.java b/java/benchmarks/src/main/java/javababushka/benchmarks/clients/lettuce/LettuceAsyncClusterClient.java new file mode 100644 index 0000000000..6e5ed09ce3 --- /dev/null +++ b/java/benchmarks/src/main/java/javababushka/benchmarks/clients/lettuce/LettuceAsyncClusterClient.java @@ -0,0 +1,57 @@ +/* + * This Java source file was generated by the Gradle 'init' task. + */ +package javababushka.benchmarks.clients.lettuce; + +import io.lettuce.core.RedisFuture; +import io.lettuce.core.RedisURI; +import io.lettuce.core.cluster.RedisClusterClient; +import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; +import io.lettuce.core.cluster.api.async.RedisAdvancedClusterAsyncCommands; +import javababushka.benchmarks.utils.ConnectionSettings; + +public class LettuceAsyncClusterClient extends LettuceAsyncClient { + + private RedisClusterClient clusterClient; + private RedisAdvancedClusterAsyncCommands clusterAsyncCommands; + private StatefulRedisClusterConnection clusterConnection; + + @Override + public void connectToRedis() { + connectToRedis(new ConnectionSettings("localhost", 6379, false, true)); + } + + @Override + public void connectToRedis(ConnectionSettings connectionSettings) { + RedisURI uri = + RedisURI.builder() + .withHost(connectionSettings.host) + .withPort(connectionSettings.port) + .withSsl(connectionSettings.useSsl) + .build(); + clusterClient = RedisClusterClient.create(uri); + clusterConnection = clusterClient.connect(); + clusterAsyncCommands = clusterConnection.async(); + } + + @Override + public RedisFuture asyncSet(String key, String value) { + return clusterAsyncCommands.set(key, value); + } + + @Override + public RedisFuture asyncGet(String key) { + return clusterAsyncCommands.get(key); + } + + @Override + public void closeConnection() { + clusterConnection.close(); + clusterClient.shutdown(); + } + + @Override + public String getName() { + return "Lettuce Cluster Async"; + } +} diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/clients/lettuce/LettuceClient.java b/java/benchmarks/src/main/java/javababushka/benchmarks/clients/lettuce/LettuceClient.java new file mode 100644 index 0000000000..87d7bc9d2e --- /dev/null +++ b/java/benchmarks/src/main/java/javababushka/benchmarks/clients/lettuce/LettuceClient.java @@ -0,0 +1,54 @@ +package javababushka.benchmarks.clients.lettuce; + +import io.lettuce.core.RedisClient; +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.api.sync.RedisStringCommands; +import javababushka.benchmarks.clients.SyncClient; +import javababushka.benchmarks.utils.ConnectionSettings; + +/** A Lettuce client with sync capabilities see: https://lettuce.io/ */ +public class LettuceClient implements SyncClient { + + RedisClient client; + RedisStringCommands syncCommands; + StatefulRedisConnection connection; + + @Override + public void connectToRedis() { + connectToRedis(new ConnectionSettings("localhost", 6379, false, false)); + } + + @Override + public void connectToRedis(ConnectionSettings connectionSettings) { + client = + RedisClient.create( + String.format( + "%s://%s:%d", + connectionSettings.useSsl ? "rediss" : "redis", + connectionSettings.host, + connectionSettings.port)); + connection = client.connect(); + syncCommands = connection.sync(); + } + + @Override + public void set(String key, String value) { + syncCommands.set(key, value); + } + + @Override + public String get(String key) { + return (String) syncCommands.get(key); + } + + @Override + public void closeConnection() { + connection.close(); + client.shutdown(); + } + + @Override + public String getName() { + return "Lettuce"; + } +} diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/utils/Benchmarking.java b/java/benchmarks/src/main/java/javababushka/benchmarks/utils/Benchmarking.java new file mode 100644 index 0000000000..4d826c427b --- /dev/null +++ b/java/benchmarks/src/main/java/javababushka/benchmarks/utils/Benchmarking.java @@ -0,0 +1,302 @@ +package javababushka.benchmarks.utils; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; +import javababushka.benchmarks.BenchmarkingApp; +import javababushka.benchmarks.clients.AsyncClient; +import javababushka.benchmarks.clients.Client; +import javababushka.benchmarks.clients.SyncClient; +import org.apache.commons.lang3.tuple.Pair; + +/** Class to calculate latency on client-actions */ +public class Benchmarking { + static final double PROB_GET = 0.8; + static final double PROB_GET_EXISTING_KEY = 0.8; + static final int SIZE_GET_KEYSPACE = 3750000; + static final int SIZE_SET_KEYSPACE = 3000000; + static final int ASYNC_OPERATION_TIMEOUT_SEC = 1; + static final double LATENCY_NORMALIZATION = 1000000.0; + static final int LATENCY_MIN = 100000; + static final int LATENCY_MAX = 10000000; + static final int LATENCY_MULTIPLIER = 10000; + static final double TPS_NORMALIZATION = 1000000000.0; // nano to seconds + // measurements are done in nano-seconds, but it should be converted to seconds later + static final double SECONDS_IN_NANO = 1e-9; + public static final double NANO_TO_SECONDS = 1e9; + + private static ChosenAction randomAction() { + if (Math.random() > PROB_GET) { + return ChosenAction.SET; + } + if (Math.random() > PROB_GET_EXISTING_KEY) { + return ChosenAction.GET_NON_EXISTING; + } + return ChosenAction.GET_EXISTING; + } + + public static String generateKeyGet() { + int range = SIZE_GET_KEYSPACE - SIZE_SET_KEYSPACE; + return Math.floor(Math.random() * range + SIZE_SET_KEYSPACE + 1) + ""; + } + + public static String generateKeySet() { + return (Math.floor(Math.random() * SIZE_SET_KEYSPACE) + 1) + ""; + } + + public interface Operation { + void go() throws Exception; + } + + public static Pair measurePerformance(Map actions) { + var action = randomAction(); + long before = System.nanoTime(); + try { + actions.get(action).go(); + } catch (Exception e) { + // timed out - exception from Future::get + return null; + } + long after = System.nanoTime(); + return Pair.of(action, after - before); + } + + // Assumption: latencies is sorted in ascending order + private static Long percentile(List latencies, int percentile) { + int N = latencies.size(); + double n = (N - 1) * percentile / 100. + 1; + if (n == 1d) return latencies.get(0); + else if (n == N) return latencies.get(N - 1); + int k = (int) n; + double d = n - k; + return Math.round(latencies.get(k - 1) + d * (latencies.get(k) - latencies.get(k - 1))); + } + + private static double stdDeviation(List latencies, Double avgLatency) { + double stdDeviation = + latencies.stream() + .mapToDouble(Long::doubleValue) + .reduce(0.0, (stdDev, latency) -> stdDev + Math.pow(latency - avgLatency, 2)); + return Math.sqrt(stdDeviation / latencies.size()); + } + + // This has the side-effect of sorting each latencies ArrayList + public static Map calculateResults( + Map> actionLatencies) { + Map results = new HashMap<>(); + + for (Map.Entry> entry : actionLatencies.entrySet()) { + ChosenAction action = entry.getKey(); + List latencies = entry.getValue(); + + if (latencies.size() == 0) { + results.put(action, new LatencyResults(0, 0, 0, 0, 0, 0)); + } else { + double avgLatency = + SECONDS_IN_NANO + * latencies.stream().mapToLong(Long::longValue).sum() + / latencies.size(); + + Collections.sort(latencies); + results.put( + action, + new LatencyResults( + avgLatency, + SECONDS_IN_NANO * percentile(latencies, 50), + SECONDS_IN_NANO * percentile(latencies, 90), + SECONDS_IN_NANO * percentile(latencies, 99), + SECONDS_IN_NANO * stdDeviation(latencies, avgLatency), + latencies.size())); + } + } + + return results; + } + + public static void printResults( + Map resultsMap, double duration, int iterations) { + System.out.printf("Runtime s: %f%n", duration); + System.out.printf("Iterations: %d%n", iterations); + System.out.printf("TPS: %f%n", iterations / duration); + int totalHits = 0; + for (Map.Entry entry : resultsMap.entrySet()) { + ChosenAction action = entry.getKey(); + LatencyResults results = entry.getValue(); + + System.out.printf("===> %s <===%n", action); + System.out.printf("avg. time ms: %f%n", results.avgLatency); + System.out.printf("std dev ms: %f%n", results.stdDeviation); + System.out.printf("p50 latency ms: %f%n", results.p50Latency); + System.out.printf("p90 latency ms: %f%n", results.p90Latency); + System.out.printf("p99 latency ms: %f%n", results.p99Latency); + System.out.printf("Total hits: %d%n", results.totalHits); + totalHits += results.totalHits; + } + System.out.println("Total hits: " + totalHits); + } + + public static void testClientSetGet( + Supplier clientCreator, BenchmarkingApp.RunConfiguration config, boolean async) { + for (int concurrentNum : config.concurrentTasks) { + int iterations = Math.min(Math.max(100000, concurrentNum * 10000), 10000000); + for (int clientCount : config.clientCount) { + for (int dataSize : config.dataSize) { + // create clients + List clients = new LinkedList<>(); + for (int cc = 0; cc < clientCount; cc++) { + Client newClient = clientCreator.get(); + newClient.connectToRedis( + new ConnectionSettings( + config.host, config.port, config.tls, config.clusterModeEnabled)); + clients.add(newClient); + } + + var clientName = clients.get(0).getName(); + + System.out.printf( + "%n =====> %s <===== %d clients %d concurrent %n%n", + clientName, clientCount, concurrentNum); + AtomicInteger iterationCounter = new AtomicInteger(0); + + long started = System.nanoTime(); + List>>> asyncTasks = + new ArrayList<>(); + for (int taskNum = 0; taskNum < concurrentNum; taskNum++) { + final int taskNumDebugging = taskNum; + asyncTasks.add( + CompletableFuture.supplyAsync( + () -> { + Map> taskActionResults = + Map.of( + ChosenAction.GET_EXISTING, new ArrayList<>(), + ChosenAction.GET_NON_EXISTING, new ArrayList<>(), + ChosenAction.SET, new ArrayList<>()); + int iterationIncrement = iterationCounter.getAndIncrement(); + int clientIndex = iterationIncrement % clients.size(); + + if (config.debugLogging) { + System.out.printf( + "%n concurrent = %d/%d, client# = %d/%d%n", + taskNumDebugging, concurrentNum, clientIndex + 1, clientCount); + } + while (iterationIncrement < iterations) { + if (config.debugLogging) { + System.out.printf( + "> iteration = %d/%d, client# = %d/%d%n", + iterationIncrement + 1, iterations, clientIndex + 1, clientCount); + } + + var actions = getActionMap(clients.get(clientIndex), dataSize, async); + // operate and calculate tik-tok + Pair result = measurePerformance(actions); + if (result != null) { + taskActionResults.get(result.getLeft()).add(result.getRight()); + } + + iterationIncrement = iterationCounter.getAndIncrement(); + clientIndex = iterationIncrement % clients.size(); + } + return taskActionResults; + })); + } + if (config.debugLogging) { + System.out.printf("%s client Benchmarking: %n", clientName); + System.out.printf( + "===> concurrentNum = %d, clientNum = %d, tasks = %d%n", + concurrentNum, clientCount, asyncTasks.size()); + } + + // This will start execution of all the concurrent tasks asynchronously + CompletableFuture>>[] completableAsyncTaskArray = + asyncTasks.toArray(new CompletableFuture[asyncTasks.size()]); + try { + // wait for all futures to complete + CompletableFuture.allOf(completableAsyncTaskArray).get(); + } catch (InterruptedException | ExecutionException e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + long after = System.nanoTime(); + + // Map to save latency results separately for each action + Map> actionResults = + Map.of( + ChosenAction.GET_EXISTING, new ArrayList<>(), + ChosenAction.GET_NON_EXISTING, new ArrayList<>(), + ChosenAction.SET, new ArrayList<>()); + + // for each task, call future.get() to retrieve & save the result in the map + asyncTasks.forEach( + future -> { + try { + var futureResult = future.get(); + futureResult.forEach( + (action, result) -> actionResults.get(action).addAll(result)); + } catch (Exception e) { + e.printStackTrace(); + } + }); + var calculatedResults = calculateResults(actionResults); + + clients.forEach(Client::closeConnection); + + if (config.resultsFile.isPresent()) { + double tps = iterationCounter.get() * NANO_TO_SECONDS / (after - started); + JsonWriter.Write( + calculatedResults, + config.resultsFile.get(), + config.clusterModeEnabled, + dataSize, + clientName, + clientCount, + concurrentNum, + tps); + } + printResults(calculatedResults, (after - started) / TPS_NORMALIZATION, iterations); + } + } + } + + System.out.println(); + } + + public static Map getActionMap( + Client client, int dataSize, boolean async) { + + String value = "0".repeat(dataSize); + Map actions = new HashMap<>(); + actions.put( + ChosenAction.GET_EXISTING, + async + ? () -> + ((AsyncClient) client) + .asyncGet(generateKeySet()) + .get(ASYNC_OPERATION_TIMEOUT_SEC, TimeUnit.SECONDS) + : () -> ((SyncClient) client).get(generateKeySet())); + actions.put( + ChosenAction.GET_NON_EXISTING, + async + ? () -> + ((AsyncClient) client) + .asyncGet(generateKeyGet()) + .get(ASYNC_OPERATION_TIMEOUT_SEC, TimeUnit.SECONDS) + : () -> ((SyncClient) client).get(generateKeyGet())); + actions.put( + ChosenAction.SET, + async + ? () -> + ((AsyncClient) client) + .asyncSet(generateKeySet(), value) + .get(ASYNC_OPERATION_TIMEOUT_SEC, TimeUnit.SECONDS) + : () -> ((SyncClient) client).set(generateKeySet(), value)); + return actions; + } +} diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/utils/ChosenAction.java b/java/benchmarks/src/main/java/javababushka/benchmarks/utils/ChosenAction.java new file mode 100644 index 0000000000..bd9f01fd90 --- /dev/null +++ b/java/benchmarks/src/main/java/javababushka/benchmarks/utils/ChosenAction.java @@ -0,0 +1,7 @@ +package javababushka.benchmarks.utils; + +public enum ChosenAction { + GET_NON_EXISTING, + GET_EXISTING, + SET +} diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/utils/ConnectionSettings.java b/java/benchmarks/src/main/java/javababushka/benchmarks/utils/ConnectionSettings.java new file mode 100644 index 0000000000..91c11c76a8 --- /dev/null +++ b/java/benchmarks/src/main/java/javababushka/benchmarks/utils/ConnectionSettings.java @@ -0,0 +1,12 @@ +package javababushka.benchmarks.utils; + +import lombok.AllArgsConstructor; + +/** Redis-client settings */ +@AllArgsConstructor +public class ConnectionSettings { + public final String host; + public final int port; + public final boolean useSsl; + public final boolean clusterMode; +} diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/utils/JsonWriter.java b/java/benchmarks/src/main/java/javababushka/benchmarks/utils/JsonWriter.java new file mode 100644 index 0000000000..24e658f56b --- /dev/null +++ b/java/benchmarks/src/main/java/javababushka/benchmarks/utils/JsonWriter.java @@ -0,0 +1,104 @@ +package javababushka.benchmarks.utils; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.reflect.TypeToken; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Map; +import lombok.Builder; +import lombok.Getter; + +public class JsonWriter { + + public static void Write( + Map calculatedResults, + String resultsFile, + boolean isCluster, + int dataSize, + String client, + int clientCount, + int numOfTasks, + double tps) { + + try { + Gson gson = new GsonBuilder().setPrettyPrinting().serializeNulls().create(); + Collection recordings = new ArrayList<>(); + + Path path = Path.of(resultsFile); + if (Files.exists(path)) { + TypeToken> collectionType = new TypeToken<>() {}; + var json = new String(Files.readAllBytes(path)); + recordings = gson.fromJson(json, collectionType); + } + var data = + new Measurements.MeasurementsBuilder() + .is_cluster(isCluster) + .data_size(dataSize) + .client(client) + .client_count(clientCount) + .num_of_tasks(numOfTasks) + .tps(tps) + .get_existing_average_latency( + calculatedResults.get(ChosenAction.GET_EXISTING).avgLatency) + .get_existing_p50_latency(calculatedResults.get(ChosenAction.GET_EXISTING).p50Latency) + .get_existing_p90_latency(calculatedResults.get(ChosenAction.GET_EXISTING).p90Latency) + .get_existing_p99_latency(calculatedResults.get(ChosenAction.GET_EXISTING).p99Latency) + .get_existing_std_dev(calculatedResults.get(ChosenAction.GET_EXISTING).stdDeviation) + .get_non_existing_average_latency( + calculatedResults.get(ChosenAction.GET_NON_EXISTING).avgLatency) + .get_non_existing_p50_latency( + calculatedResults.get(ChosenAction.GET_NON_EXISTING).p50Latency) + .get_non_existing_p90_latency( + calculatedResults.get(ChosenAction.GET_NON_EXISTING).p90Latency) + .get_non_existing_p99_latency( + calculatedResults.get(ChosenAction.GET_NON_EXISTING).p99Latency) + .get_non_existing_std_dev( + calculatedResults.get(ChosenAction.GET_NON_EXISTING).stdDeviation) + .set_average_latency(calculatedResults.get(ChosenAction.SET).avgLatency) + .set_p50_latency(calculatedResults.get(ChosenAction.SET).p50Latency) + .set_p90_latency(calculatedResults.get(ChosenAction.SET).p90Latency) + .set_p99_latency(calculatedResults.get(ChosenAction.SET).p99Latency) + .set_std_dev(calculatedResults.get(ChosenAction.SET).stdDeviation) + .build(); + + recordings.add(data); + + Files.write(path, gson.toJson(recordings).getBytes()); + } catch (IOException e) { + System.out.printf( + "Failed to write measurement results into a file '%s': %s%n", + resultsFile, e.getMessage()); + e.printStackTrace(); + } + } + + @Getter + @Builder + public static class Measurements { + private String client; + private int client_count; + private int data_size; + private double get_existing_average_latency; + private double get_existing_p50_latency; + private double get_existing_p90_latency; + private double get_existing_p99_latency; + private double get_existing_std_dev; + private double get_non_existing_average_latency; + private double get_non_existing_p50_latency; + private double get_non_existing_p90_latency; + private double get_non_existing_p99_latency; + private double get_non_existing_std_dev; + private boolean is_cluster; + private int num_of_tasks; + private double set_average_latency; + private double set_p50_latency; + private double set_p90_latency; + private double set_p99_latency; + private double set_std_dev; + private double tps; + } +} diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/utils/LatencyResults.java b/java/benchmarks/src/main/java/javababushka/benchmarks/utils/LatencyResults.java new file mode 100644 index 0000000000..5d25d72a67 --- /dev/null +++ b/java/benchmarks/src/main/java/javababushka/benchmarks/utils/LatencyResults.java @@ -0,0 +1,14 @@ +package javababushka.benchmarks.utils; + +import lombok.AllArgsConstructor; + +/** Raw timing results in nanoseconds */ +@AllArgsConstructor +public class LatencyResults { + public final double avgLatency; + public final double p50Latency; + public final double p90Latency; + public final double p99Latency; + public final double stdDeviation; + public final int totalHits; +} diff --git a/java/benchmarks/src/test/java/javababushka/benchmarks/jedis/JedisClientIT.java b/java/benchmarks/src/test/java/javababushka/benchmarks/jedis/JedisClientIT.java new file mode 100644 index 0000000000..9a6dbbe93e --- /dev/null +++ b/java/benchmarks/src/test/java/javababushka/benchmarks/jedis/JedisClientIT.java @@ -0,0 +1,67 @@ +/* + * This Java source file was generated by the Gradle 'init' task. + */ +package javababushka.benchmarks.jedis; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import javababushka.benchmarks.clients.jedis.JedisClient; +import javababushka.benchmarks.utils.Benchmarking; +import javababushka.benchmarks.utils.ChosenAction; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +public class JedisClientIT { + + private static JedisClient jedisClient; + + @BeforeAll + static void initializeJedisClient() { + jedisClient = new JedisClient(); + jedisClient.connectToRedis(); + } + + @Test + public void testResourceInfo() { + String result = jedisClient.info(); + + assertTrue(result.length() > 0); + } + + @Test + public void testResourceInfoBySection() { + String section = "Server"; + String result = jedisClient.info(section); + + assertTrue(result.length() > 0); + assertTrue(result.startsWith("# " + section)); + } + + @Test + public void testResourceSetGet() { + int iterations = 100000; + String value = "my-value"; + + Map actions = new HashMap<>(); + actions.put(ChosenAction.GET_EXISTING, () -> jedisClient.get(Benchmarking.generateKeySet())); + actions.put( + ChosenAction.GET_NON_EXISTING, () -> jedisClient.get(Benchmarking.generateKeyGet())); + actions.put(ChosenAction.SET, () -> jedisClient.set(Benchmarking.generateKeySet(), value)); + + Map> latencies = + Map.of( + ChosenAction.GET_EXISTING, new ArrayList<>(), + ChosenAction.GET_NON_EXISTING, new ArrayList<>(), + ChosenAction.SET, new ArrayList<>()); + for (int i = 0; i < iterations; i++) { + var latency = Benchmarking.measurePerformance(actions); + latencies.get(latency.getKey()).add(latency.getValue()); + } + + Benchmarking.printResults(Benchmarking.calculateResults(latencies), 0, iterations); + } +} diff --git a/java/benchmarks/src/test/java/javababushka/benchmarks/lettuce/LettuceAsyncClientIT.java b/java/benchmarks/src/test/java/javababushka/benchmarks/lettuce/LettuceAsyncClientIT.java new file mode 100644 index 0000000000..92f81df2da --- /dev/null +++ b/java/benchmarks/src/test/java/javababushka/benchmarks/lettuce/LettuceAsyncClientIT.java @@ -0,0 +1,78 @@ +/* + * This Java source file was generated by the Gradle 'init' task. + */ +package javababushka.benchmarks.lettuce; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.fail; + +import io.lettuce.core.RedisFuture; +import javababushka.benchmarks.clients.lettuce.LettuceAsyncClient; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +public class LettuceAsyncClientIT { + + private static LettuceAsyncClient lettuceClient; + + private static LettuceAsyncClient otherLettuceClient; + + @BeforeAll + static void initializeLettuceClient() { + lettuceClient = new LettuceAsyncClient(); + lettuceClient.connectToRedis(); + + otherLettuceClient = new LettuceAsyncClient(); + otherLettuceClient.connectToRedis(); + } + + @AfterAll + static void closeConnection() { + lettuceClient.closeConnection(); + otherLettuceClient.closeConnection(); + } + + @Test + public void testResourceSetGet() { + String key = "key1"; + String value = "my-value-1"; + + String otherKey = "key2"; + String otherValue = "my-value-2"; + + RedisFuture setResult = lettuceClient.asyncSet(key, value); + RedisFuture otherSetResult = otherLettuceClient.asyncSet(otherKey, otherValue); + + // and wait for both clients + try { + lettuceClient.waitForResult(setResult); + } catch (Exception e) { + fail("SET result failed with exception " + e); + } + try { + otherLettuceClient.waitForResult(otherSetResult); + } catch (Exception e) { + fail("SET result on other client failed with exception " + e); + } + + RedisFuture getResult = lettuceClient.asyncGet(key); + RedisFuture otherGetResult = otherLettuceClient.asyncGet(otherKey); + String result = "invalid"; + String otherResult = "invalid"; + try { + result = (String) lettuceClient.waitForResult(getResult); + } catch (Exception e) { + fail("GET result failed with exception " + e); + } + + try { + otherResult = (String) otherLettuceClient.waitForResult(otherGetResult); + } catch (Exception e) { + fail("GET result on other client failed with exception " + e); + } + + assertEquals(value, result); + assertEquals(otherValue, otherResult); + } +} diff --git a/java/benchmarks/src/test/java/javababushka/benchmarks/lettuce/LettuceClientIT.java b/java/benchmarks/src/test/java/javababushka/benchmarks/lettuce/LettuceClientIT.java new file mode 100644 index 0000000000..e9670e8dbb --- /dev/null +++ b/java/benchmarks/src/test/java/javababushka/benchmarks/lettuce/LettuceClientIT.java @@ -0,0 +1,40 @@ +/* + * This Java source file was generated by the Gradle 'init' task. + */ +package javababushka.benchmarks.lettuce; + +import java.util.HashMap; +import javababushka.benchmarks.clients.lettuce.LettuceClient; +import javababushka.benchmarks.utils.Benchmarking; +import javababushka.benchmarks.utils.ChosenAction; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +public class LettuceClientIT { + + private static LettuceClient lettuceClient; + + @BeforeAll + static void initializeLettuceClient() { + lettuceClient = new LettuceClient(); + lettuceClient.connectToRedis(); + } + + @AfterAll + static void closeConnection() { + lettuceClient.closeConnection(); + } + + @Test + public void testResourceSetGet() { + int iterations = 100000; + String value = "my-value"; + + HashMap actions = new HashMap<>(); + actions.put(ChosenAction.GET_EXISTING, () -> lettuceClient.get(Benchmarking.generateKeySet())); + actions.put( + ChosenAction.GET_NON_EXISTING, () -> lettuceClient.get(Benchmarking.generateKeyGet())); + actions.put(ChosenAction.SET, () -> lettuceClient.set(Benchmarking.generateKeySet(), value)); + } +} diff --git a/java/build.gradle b/java/build.gradle index 390053f28d..340f2c98cb 100644 --- a/java/build.gradle +++ b/java/build.gradle @@ -98,4 +98,3 @@ spotless { } } // End of Spotless section - diff --git a/java/client/build.gradle b/java/client/build.gradle index a8de4b1b6e..8dd4e5b7c5 100644 --- a/java/client/build.gradle +++ b/java/client/build.gradle @@ -10,23 +10,37 @@ repositories { dependencies { implementation group: 'com.google.protobuf', name: 'protobuf-java', version: '3.24.3' + implementation group: 'org.apache.commons', name: 'commons-lang3', version: '3.13.0' + + implementation group: 'io.netty', name: 'netty-handler', version: '4.1.100.Final' + // https://github.com/netty/netty/wiki/Native-transports + // Windows is not supported, because babushka does not support windows, because tokio does not support windows, because ... 42 + implementation group: 'io.netty', name: 'netty-transport-native-epoll', version: '4.1.100.Final', classifier: 'linux-x86_64' + implementation group: 'io.netty', name: 'netty-transport-native-kqueue', version: '4.1.100.Final', classifier: 'osx-x86_64' + implementation group: 'io.netty', name: 'netty-transport-native-kqueue', version: '4.1.100.Final', classifier: 'osx-aarch_64' } tasks.register('protobuf', Exec) { doFirst { - project.mkdir(Paths.get(project.projectDir.path, 'src/main/java/org/babushka/javababushka/generated').toString()) + project.mkdir(Paths.get(project.projectDir.path, 'src/main/java/javababushka/generated').toString()) } commandLine 'protoc', '-Iprotobuf=babushka-core/src/protobuf/', - '--java_out=java/client/src/main/java/org/babushka/javababushka/generated', + '--java_out=java/client/src/main/java/javababushka/generated', 'babushka-core/src/protobuf/connection_request.proto', 'babushka-core/src/protobuf/redis_request.proto', 'babushka-core/src/protobuf/response.proto' - workingDir Paths.get(project.rootDir.path, '../..').toFile() + workingDir Paths.get(project.rootDir.path, '..').toFile() +} + +tasks.register('cleanProtobuf') { + doFirst { + project.delete(Paths.get(project.projectDir.path, 'src/main/java/javababushka/generated').toString()) + } } tasks.register('buildRust', Exec) { - commandLine 'cargo', 'build' + commandLine 'cargo', 'build', '--release' workingDir project.rootDir } @@ -45,3 +59,14 @@ tasks.register('buildAll') { finalizedBy 'build' } +compileJava.dependsOn('protobuf') +clean.dependsOn('cleanProtobuf') + +tasks.withType(Test) { + testLogging { + exceptionFormat "full" + events "started", "skipped", "passed", "failed" + showStandardStreams true + } + jvmArgs "-Djava.library.path=${projectDir}/../target/release:${projectDir}/../target/debug" +} diff --git a/java/client/src/main/java/javababushka/BabushkaCoreNativeDefinitions.java b/java/client/src/main/java/javababushka/BabushkaCoreNativeDefinitions.java new file mode 100644 index 0000000000..3f26ebef91 --- /dev/null +++ b/java/client/src/main/java/javababushka/BabushkaCoreNativeDefinitions.java @@ -0,0 +1,11 @@ +package javababushka; + +public class BabushkaCoreNativeDefinitions { + public static native String startSocketListenerExternal() throws Exception; + + public static native Object valueFromPointer(long pointer); + + static { + System.loadLibrary("javababushka"); + } +} diff --git a/java/client/src/main/java/javababushka/Client.java b/java/client/src/main/java/javababushka/Client.java new file mode 100644 index 0000000000..d6cb16e591 --- /dev/null +++ b/java/client/src/main/java/javababushka/Client.java @@ -0,0 +1,405 @@ +package javababushka; + +import static connection_request.ConnectionRequestOuterClass.AddressInfo; +import static connection_request.ConnectionRequestOuterClass.AuthenticationInfo; +import static connection_request.ConnectionRequestOuterClass.ConnectionRequest; +import static connection_request.ConnectionRequestOuterClass.ConnectionRetryStrategy; +import static connection_request.ConnectionRequestOuterClass.ReadFromReplicaStrategy; +import static connection_request.ConnectionRequestOuterClass.TlsMode; +import static redis_request.RedisRequestOuterClass.Command; +import static redis_request.RedisRequestOuterClass.Command.ArgsArray; +import static redis_request.RedisRequestOuterClass.RedisRequest; +import static redis_request.RedisRequestOuterClass.RequestType; +import static redis_request.RedisRequestOuterClass.Routes; +import static redis_request.RedisRequestOuterClass.SimpleRoutes; +import static response.ResponseOuterClass.Response; + +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelOutboundHandlerAdapter; +import io.netty.channel.ChannelPromise; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.WriteBufferWaterMark; +import io.netty.channel.epoll.EpollDomainSocketChannel; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.kqueue.KQueue; +import io.netty.channel.kqueue.KQueueDomainSocketChannel; +import io.netty.channel.kqueue.KQueueEventLoopGroup; +import io.netty.channel.unix.DomainSocketAddress; +import io.netty.channel.unix.UnixChannel; +import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; +import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; +import io.netty.util.internal.logging.InternalLoggerFactory; +import io.netty.util.internal.logging.Slf4JLoggerFactory; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.lang3.tuple.Pair; + +public class Client implements AutoCloseable { + + private static final int RESPONSE_TIMEOUT_MILLISECONDS = 250; + private static final int CLIENT_CREATION_TIMEOUT_MILLISECONDS = 250; + private static final int HIGH_WRITE_WATERMARK = 4096; + private static final int LOW_WRITE_WATERMARK = 1024; + private static final long DEFAULT_TIMEOUT_MILLISECONDS = 1000; + public static boolean ALWAYS_FLUSH_ON_WRITE = true; + + // https://netty.io/3.6/api/org/jboss/netty/handler/queue/BufferedWriteHandler.html + // Flush every N bytes if !ALWAYS_FLUSH_ON_WRITE + public static int AUTO_FLUSH_THRESHOLD_BYTES = 512; // 1024; + private final AtomicInteger nonFlushedBytesCounter = new AtomicInteger(0); + + // Flush every N writes if !ALWAYS_FLUSH_ON_WRITE + public static int AUTO_FLUSH_THRESHOLD_WRITES = 10; + private final AtomicInteger nonFlushedWritesCounter = new AtomicInteger(0); + + // If !ALWAYS_FLUSH_ON_WRITE and a command has no response in N millis, flush (probably it wasn't + // send) + public static int AUTO_FLUSH_RESPONSE_TIMEOUT_MILLIS = 100; + // If !ALWAYS_FLUSH_ON_WRITE flush on timer (like a cron) + public static int AUTO_FLUSH_TIMER_MILLIS = 200; + + public static int PENDING_RESPONSES_ON_CLOSE_TIMEOUT_MILLIS = 1000; + + // Futures to handle responses. Index is callback id, starting from 1 (0 index is for connection + // request always). + // Is it not a concurrent nor sync collection, but it is synced on adding. No removes. + private final List> responses = new ArrayList<>(); + // Unique offset for every client to avoid having multiple commands with the same id at a time. + // For debugging replace with: new Random().nextInt(1000) * 1000 + private final int callbackOffset = new Random().nextInt(); + + // TODO move to a [static] constructor. + private final String unixSocket = getSocket(); + + private static String getSocket() { + try { + return BabushkaCoreNativeDefinitions.startSocketListenerExternal(); + } catch (Exception | UnsatisfiedLinkError e) { + System.err.printf("Failed to get UDS from babushka and dedushka: %s%n%n", e); + throw new RuntimeException(e); + } + } + + private Channel channel = null; + private EventLoopGroup group = null; + + // We support MacOS and Linux only, because Babushka does not support Windows, because tokio does + // not support it. + // Probably we should use NIO (NioEventLoopGroup) for Windows. + private static final boolean isMacOs = isMacOs(); + + private static boolean isMacOs() { + try { + Class.forName("io.netty.channel.kqueue.KQueue"); + return KQueue.isAvailable(); + } catch (ClassNotFoundException e) { + return false; + } + } + + static { + // TODO fix: netty still doesn't use slf4j nor log4j + InternalLoggerFactory.setDefaultFactory(Slf4JLoggerFactory.INSTANCE); + } + + private void createChannel() { + // TODO maybe move to constructor or to static? + try { + channel = + new Bootstrap() + .option( + ChannelOption.WRITE_BUFFER_WATER_MARK, + new WriteBufferWaterMark(LOW_WRITE_WATERMARK, HIGH_WRITE_WATERMARK)) + .option(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT) + .group(group = isMacOs ? new KQueueEventLoopGroup() : new EpollEventLoopGroup()) + .channel(isMacOs ? KQueueDomainSocketChannel.class : EpollDomainSocketChannel.class) + .handler( + new ChannelInitializer() { + @Override + public void initChannel(UnixChannel ch) throws Exception { + ch.pipeline() + .addLast("logger", new LoggingHandler(LogLevel.DEBUG)) + // https://netty.io/4.1/api/io/netty/handler/codec/protobuf/ProtobufEncoder.html + .addLast("protobufDecoder", new ProtobufVarint32FrameDecoder()) + .addLast("protobufEncoder", new ProtobufVarint32LengthFieldPrepender()) + .addLast( + new ChannelInboundHandlerAdapter() { + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) + throws Exception { + // System.out.printf("=== channelRead %s %s %n", ctx, msg); + var buf = (ByteBuf) msg; + var bytes = new byte[buf.readableBytes()]; + buf.readBytes(bytes); + // TODO surround parsing with try-catch, set error to future if + // parsing failed. + var response = Response.parseFrom(bytes); + int callbackId = response.getCallbackIdx(); + if (callbackId != 0) { + // connection request has hardcoded callback id = 0 + // https://github.com/aws/babushka/issues/600 + callbackId -= callbackOffset; + } + // System.out.printf("== Received response with callback %d%n", + // response.getCallbackIdx()); + responses.get(callbackId).complete(response); + responses.set(callbackId, null); + super.channelRead(ctx, bytes); + } + + @Override + public void exceptionCaught( + ChannelHandlerContext ctx, Throwable cause) throws Exception { + System.out.printf("=== exceptionCaught %s %s %n", ctx, cause); + cause.printStackTrace(); + super.exceptionCaught(ctx, cause); + } + }) + .addLast( + new ChannelOutboundHandlerAdapter() { + @Override + public void write( + ChannelHandlerContext ctx, Object msg, ChannelPromise promise) + throws Exception { + // System.out.printf("=== write %s %s %s %n", ctx, msg, promise); + var bytes = (byte[]) msg; + + boolean needFlush = false; + if (!ALWAYS_FLUSH_ON_WRITE) { + synchronized (nonFlushedBytesCounter) { + if (nonFlushedBytesCounter.addAndGet(bytes.length) + >= AUTO_FLUSH_THRESHOLD_BYTES + || nonFlushedWritesCounter.incrementAndGet() + >= AUTO_FLUSH_THRESHOLD_WRITES) { + nonFlushedBytesCounter.set(0); + nonFlushedWritesCounter.set(0); + needFlush = true; + } + } + } + super.write(ctx, Unpooled.copiedBuffer(bytes), promise); + if (needFlush) { + // flush outside the sync block + flush(ctx); + // System.out.println("-- auto flush - buffer"); + } + } + }); + } + }) + .connect(new DomainSocketAddress(unixSocket)) + .sync() + .channel(); + + } catch (Exception e) { + System.err.printf( + "Failed to create a channel %s: %s%n", e.getClass().getSimpleName(), e.getMessage()); + e.printStackTrace(System.err); + } + + if (!ALWAYS_FLUSH_ON_WRITE) { + new Timer(true) + .scheduleAtFixedRate( + new TimerTask() { + @Override + public void run() { + channel.flush(); + nonFlushedBytesCounter.set(0); + nonFlushedWritesCounter.set(0); + } + }, + 0, + AUTO_FLUSH_TIMER_MILLIS); + } + } + + public void closeConnection() { + + // flush and close the channel + channel.flush(); + channel.close(); + // TODO: check that the channel is closed + + // shutdown the event loop group gracefully by waiting for the remaining response + // and then shutting down the connection + try { + long waitStarted = System.nanoTime(); + long waitUntil = + waitStarted + PENDING_RESPONSES_ON_CLOSE_TIMEOUT_MILLIS * 100_000; // in nanos + for (var responseFuture : responses) { + if (responseFuture == null || responseFuture.isDone()) { + continue; + } + try { + responseFuture.get(waitUntil - System.nanoTime(), TimeUnit.NANOSECONDS); + } catch (InterruptedException | ExecutionException ignored) { + // TODO: print warning + } catch (TimeoutException e) { + responseFuture.cancel(true); + // TODO: cancel the rest + break; + } + } + } finally { + var shuttingDown = group.shutdownGracefully(); + try { + shuttingDown.get(); + } catch (InterruptedException | ExecutionException e) { + e.printStackTrace(); + } + assert group.isShutdown() : "Redis connection did not shutdown gracefully"; + } + } + + public void set(String key, String value) { + waitForResult(asyncSet(key, value)); + // TODO parse response and rethrow an exception if there is an error + } + + public String get(String key) { + return waitForResult(asyncGet(key)); + // TODO support non-strings + } + + private synchronized Pair> getNextCallback() { + var future = new CompletableFuture(); + responses.add(future); + return Pair.of(responses.size() - 1, future); + } + + @Override + public void close() throws Exception { + closeConnection(); + } + + public Future asyncConnectToRedis( + String host, int port, boolean useSsl, boolean clusterMode) { + createChannel(); + + var request = + ConnectionRequest.newBuilder() + .addAddresses(AddressInfo.newBuilder().setHost(host).setPort(port).build()) + .setTlsMode( + useSsl // TODO: secure or insecure TLS? + ? TlsMode.SecureTls + : TlsMode.NoTls) + .setClusterModeEnabled(clusterMode) + .setResponseTimeout(RESPONSE_TIMEOUT_MILLISECONDS) + .setClientCreationTimeout(CLIENT_CREATION_TIMEOUT_MILLISECONDS) + .setReadFromReplicaStrategy(ReadFromReplicaStrategy.AlwaysFromPrimary) + .setConnectionRetryStrategy( + ConnectionRetryStrategy.newBuilder() + .setNumberOfRetries(1) + .setFactor(1) + .setExponentBase(1) + .build()) + .setAuthenticationInfo( + AuthenticationInfo.newBuilder().setPassword("").setUsername("default").build()) + .setDatabaseId(0) + .build(); + + var future = new CompletableFuture(); + responses.add(future); + channel.writeAndFlush(request.toByteArray()); + return future; + } + + private CompletableFuture submitNewCommand(RequestType command, List args) { + var commandId = getNextCallback(); + // System.out.printf("== %s(%s), callback %d%n", command, String.join(", ", args), commandId); + + return CompletableFuture.supplyAsync( + () -> { + var commandArgs = ArgsArray.newBuilder(); + for (var arg : args) { + commandArgs.addArgs(arg); + } + + RedisRequest request = + RedisRequest.newBuilder() + .setCallbackIdx(commandId.getKey() + callbackOffset) + .setSingleCommand( + Command.newBuilder() + .setRequestType(command) + .setArgsArray(commandArgs.build()) + .build()) + .setRoute(Routes.newBuilder().setSimpleRoutes(SimpleRoutes.AllNodes).build()) + .build(); + if (ALWAYS_FLUSH_ON_WRITE) { + channel.writeAndFlush(request.toByteArray()); + return commandId.getRight(); + } + channel.write(request.toByteArray()); + return autoFlushFutureWrapper(commandId.getRight()); + }) + .thenCompose(f -> f); + } + + private CompletableFuture autoFlushFutureWrapper(Future future) { + return CompletableFuture.supplyAsync( + () -> { + try { + return future.get(AUTO_FLUSH_RESPONSE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } catch (TimeoutException e) { + // System.out.println("-- auto flush - timeout"); + channel.flush(); + nonFlushedBytesCounter.set(0); + nonFlushedWritesCounter.set(0); + } + try { + return future.get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + }); + } + + public Future asyncSet(String key, String value) { + // System.out.printf("== set(%s, %s), callback %d%n", key, value, callbackId); + return submitNewCommand(RequestType.SetString, List.of(key, value)); + } + + public Future asyncGet(String key) { + // System.out.printf("== get(%s), callback %d%n", key, callbackId); + return submitNewCommand(RequestType.GetString, List.of(key)) + .thenApply( + response -> + response.getRespPointer() != 0 + ? BabushkaCoreNativeDefinitions.valueFromPointer(response.getRespPointer()) + .toString() + : null); + } + + public T waitForResult(Future future) { + return waitForResult(future, DEFAULT_TIMEOUT_MILLISECONDS); + } + + public T waitForResult(Future future, long timeout) { + try { + return future.get(timeout, TimeUnit.MILLISECONDS); + } catch (Exception ignored) { + return null; + } + } +} diff --git a/java/gradlew.bat b/java/gradlew.bat old mode 100644 new mode 100755 diff --git a/java/src/lib.rs b/java/src/lib.rs index e69de29bb2..468d8797e7 100644 --- a/java/src/lib.rs +++ b/java/src/lib.rs @@ -0,0 +1,98 @@ +use babushka::start_socket_listener; + +use jni::objects::{JClass, JObject, JThrowable}; +use jni::JNIEnv; +use jni::sys::jlong; +use std::sync::mpsc; +use log::error; +use logger_core::Level; +use redis::Value; + +fn redis_value_to_java(mut env: JNIEnv, val: Value) -> JObject { + match val { + Value::Nil => JObject::null(), + Value::Status(str) => JObject::from(env.new_string(str).unwrap()), + Value::Okay => JObject::from(env.new_string("OK").unwrap()), + // TODO use primitive integer + Value::Int(num) => env.new_object("java/lang/Integer", "(I)V", &[num.into()]).unwrap(), + Value::Data(data) => match std::str::from_utf8(data.as_ref()) { + Ok(val) => JObject::from(env.new_string(val).unwrap()), + Err(_err) => { + let _ = env.throw("Error decoding Unicode data"); + JObject::null() + }, + }, + Value::Bulk(_bulk) => { + let _ = env.throw("Not implemented"); + JObject::null() + /* + let elements: &PyList = PyList::new( + py, + bulk.into_iter() + .map(|item| redis_value_to_py(py, item).unwrap()), + ); + Ok(elements.into_py(py)) + */ + } + } +} + +#[no_mangle] +pub extern "system" fn Java_javababushka_BabushkaCoreNativeDefinitions_valueFromPointer<'local>( + mut env: JNIEnv<'local>, + _class: JClass<'local>, + pointer: jlong +) -> JObject<'local> { + let value = unsafe { Box::from_raw(pointer as *mut Value) }; + redis_value_to_java(env, *value) +} + +#[no_mangle] +pub extern "system" fn Java_javababushka_BabushkaCoreNativeDefinitions_startSocketListenerExternal<'local>( + mut env: JNIEnv<'local>, + _class: JClass<'local> +) -> JObject<'local> { + let (tx, rx) = mpsc::channel::>(); + + //logger_core::init(Some(Level::Trace), None); + + start_socket_listener(move |socket_path : Result| { + // Signals that thread has started + let _ = tx.send(socket_path); + }); + + // Wait until the thread has started + let socket_path = rx.recv(); + + match socket_path { + Ok(Ok(path)) => { + env.new_string(path).unwrap().into() + }, + Ok(Err(error_message)) => { + throw_java_exception(env, error_message); + JObject::null() + }, + Err(error) => { + throw_java_exception(env, error.to_string()); + JObject::null() + } + } +} + +fn throw_java_exception(mut env: JNIEnv, message: String) { + let res = env.new_object( + "java/lang/Exception", + "(Ljava/lang/String;)V", + &[ + (&env.new_string(message.clone()).unwrap()).into(), + ]); + + match res { + Ok(res) => { + let _ = env.throw(JThrowable::from(res)); + }, + Err(err) => { + error!("Failed to create exception with string {}: {}", message, err.to_string()); + } + }; +}