diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml new file mode 100644 index 0000000..fc15a6e --- /dev/null +++ b/.github/workflows/build.yml @@ -0,0 +1,23 @@ +# This workflow will build the Java project with Maven + +name: Build + +on: + push: + branches: [ "dev" ] + pull_request: + branches: [ "dev" ] + +jobs: + build: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Set up JDK + uses: actions/setup-java@v4 + with: + java-version: '21' + distribution: 'zulu' + cache: maven + - name: Build with Maven + run: mvn package diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml new file mode 100644 index 0000000..b2f79de --- /dev/null +++ b/.github/workflows/release.yml @@ -0,0 +1,49 @@ +# This workflow will build the Java project with Maven and create a Github release + +name: Release +on: + push: + branches: [ "main" ] + pull_request: + branches: [ "main" ] + +permissions: + contents: read + +jobs: + build: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Set up JDK + uses: actions/setup-java@v4 + with: + java-version: '21' + distribution: 'zulu' + cache: maven + - name: Build with Maven + run: mvn package + - name: Upload jar + uses: actions/upload-artifact@v4 + with: + name: maven-build-jar + path: target/aerospike-circuit-breaker-*-with-dependencies.jar + release: + runs-on: ubuntu-latest + needs: + - build + permissions: + contents: write + steps: + - uses: actions/checkout@v4 + - name: Download jar + uses: actions/download-artifact@v4 + with: + name: maven-build-jar + path: target/aerospike-circuit-breaker-*-with-dependencies.jar + - name: Release to Github + uses: ncipollo/release-action@v1 + with: + artifacts: "target/aerospike-circuit-breaker-*-with-dependencies.jar" + generateReleaseNotes: true + draft: true diff --git a/.gitignore b/.gitignore index 524f096..2140a37 100644 --- a/.gitignore +++ b/.gitignore @@ -1,24 +1,32 @@ -# Compiled class file -*.class +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/**/target/ +!**/src/test/**/target/ -# Log file -*.log +### IntelliJ IDEA ### +.idea/ -# BlueJ files -*.ctxt +### Eclipse ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache -# Mobile Tools for Java (J2ME) -.mtj.tmp/ +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ +!**/src/main/**/build/ +!**/src/test/**/build/ -# Package Files # -*.jar -*.war -*.nar -*.ear -*.zip -*.tar.gz -*.rar +### VS Code ### +.vscode/ -# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml -hs_err_pid* -replay_pid* +### Mac OS ### +.DS_Store \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..26ae974 --- /dev/null +++ b/README.md @@ -0,0 +1,153 @@ +Aerospike Circuit Breaker in Java +================================= + +[![Aerospike Enterprise](https://img.shields.io/badge/Aerospike-Enterprise_Editition-C22127?labelColor=white&logo=aerospike&logoColor=C22127&style=flat)](https://aerospike.com/download/#aerospike-server-enterprise-edition) +[![Aerospike Community](https://img.shields.io/badge/Aerospike-Community_Editition-C22127?labelColor=white&logo=aerospike&logoColor=C22127&style=flat)](https://aerospike.com/download/#aerospike-server-community-edition) +[![Aerospike Java](https://img.shields.io/badge/Aerospike-Java_Client-C22127?labelColor=white&logo=aerospike&logoColor=C22127&style=flat)](https://aerospike.com/download/#aerospike-clients-java-client-library) + +[Aerospike](http://www.aerospike.com) is a low-latency distributed NoSQL database. This project is an example Java application that demonstrates how the "circuit breaker" design pattern is implemented in Aerospike. See my blog post [Aerospike Circuit Breaker Pattern](https://aerospike.com/blog/) for a complete discussion. + +This project uses the asynchronous API of the Aerospike Java Client. You can read about how that works in [Understanding Asynchronous Operations](https://aerospike.com/developer/tutorials/java/async_ops) on Aerospike Developer Hub. + +Questions, comments, feedback? Find me in the `#ask-the-community` channel in the [Aerospike Developer Discord server](https://discord.com/invite/NfC93wJEJU). + +Set Up +------ + +Clone the repo and build the package: + +```bash +git clone https://github.com/aerospike-examples/aerospike-circuit-breaker-java.git +cd aerospike-circuit-breaker-java +mvn package +``` + +For local testing you can run [Aerospike Database Enterprise in Docker](https://aerospike.com/docs/deploy_guides/docker/). Aerospike Enterprise comes with a free developer license for single-node configuration which is used for this example project. + +The following command will run Aerospike Database Enterprise using the free developer license in a container listening on port + +```bash +docker run -tid --name aerospike -p 3000:3000 \ +-v "$(pwd)/docker/opt/aerospike/conf":/opt/aerospike/conf \ +-e "FEATURE_KEY_FILE=/etc/aerospike/features.conf" \ +aerospike/aerospike-server-enterprise \ +--config-file /opt/aerospike/conf/aerospike.conf +``` + +Run the jar file passing in 3 command-line arguments: `HOST:PORT`, `MAX_ERROR_RATE`, and `WRITE_OPS`. The application will connect to the Aerospike Database running in Docker, set the [maxErrorRate](https://javadoc.io/doc/com.aerospike/aerospike-client/latest/com/aerospike/client/policy/ClientPolicy.html#maxErrorRate) policy to `MAX_ERROR_RATE`, and perform the number asynchronous write operations indicated by `WRITE_OPS`. + +For example, run the application against the Aerospike Database running in docker at `172.17.0.2` on port `3000`, set [maxErrorRate](https://javadoc.io/doc/com.aerospike/aerospike-client/latest/com/aerospike/client/policy/ClientPolicy.html#maxErrorRate) to `100`, and perform 10,000 write operations: + +```bash +java -jar \ +target/aerospike-circuit-breaker-java-1.0-SNAPSHOT-jar-with-dependencies.jar \ +172.17.0.2:3000 100 10000 +``` + +Output +------ + +The application will output a summary at the end of the run: + +``` +Run time: 0.283 seconds +Successful writes: 10000 +Connection errors: 0 +Timeout errors: 0 +Max errors exceeded: 0 +Node unavailable errors: 0 +Other errors: 0 +Connections opened: 96 +Connections closed: 0 +``` + +Demonstrating the Circuit Breaker +--------------------------------- + +### Step 1 - Run a steady-state baseline + +Perform 5 million write operations with `maxErrorRate=0` (disabled): + +```bash +java -jar \ +target/aerospike-circuit-breaker-java-1.0-SNAPSHOT-jar-with-dependencies.jar \ +172.17.0.2:3000 0 5000000 +``` + +The output shows that all write operations were successful: + +``` +Run time: 20.354 seconds +Successful writes: 5000000 +Connection errors: 0 +Timeout errors: 0 +Max errors exceeded: 0 +Node unavailable errors: 0 +Other errors: 0 +Connections opened: 96 +Connections closed: 0 +``` + +### Step 2 - Run with a 10 second period of latency without circuit breaker + +Perform 5 million write operations with `maxErrorRate=0` (disabled). __While the application is running__, add 20ms of latency to the Docker network interface, wait ~10 seconds, then remove the latency: + +_Terminal 1_ +```bash +java -jar \ +target/aerospike-circuit-breaker-java-1.0-SNAPSHOT-jar-with-dependencies.jar \ +172.17.0.2:3000 0 5000000 +``` + +_Terminal 2_ +```bash +sudo tc qdisc add dev docker0 root netem delay 20ms; +sleep 10; +sudo tc qdisc del dev docker0 root netem delay 20ms +``` + +The output now shows that the application slowed down while waiting on sockets (lower TPS), many timeout errors, and that each of those timeouts resulted in a churned connection: + +``` +Run time: 31.616 seconds +Successful writes: 4991677 +Connection errors: 0 +Timeout errors: 8323 +Max errors exceeded: 0 +Node unavailable errors: 0 +Other errors: 0 +Connections opened: 8419 +Connections closed: 8323 +``` + +### Step 3 - Run with a 10 second period of latency with circuit breaker + +Perform 5 million write operations with `maxErrorRate=100` (default). __While the application is running__, add 20ms of latency to the Docker network interface, wait 10 seconds, then remove the latency: + +_Terminal 1_ +```bash +java -jar \ +target/aerospike-circuit-breaker-java-1.0-SNAPSHOT-jar-with-dependencies.jar \ +172.17.0.2:3000 100 5000000 +``` + +_Terminal 2_ +```bash +sudo tc qdisc add dev docker0 root netem delay 20ms; +sleep 10; +sudo tc qdisc del dev docker0 root netem delay 20ms +``` + +The output now shows that the application didn't slow down waiting on sockets (same or better TPS), timeout errors were capped and thus connections churned were capped. + +``` +Run time: 16.205 seconds +Successful writes: 1373172 +Connection errors: 0 +Timeout errors: 1428 +Max errors exceeded: 3625400 +Node unavailable errors: 0 +Other errors: 0 +Connections opened: 1524 +Connections closed: 1428 +``` \ No newline at end of file diff --git a/docker/opt/aerospike/conf/aerospike.conf b/docker/opt/aerospike/conf/aerospike.conf new file mode 100644 index 0000000..a72c6c7 --- /dev/null +++ b/docker/opt/aerospike/conf/aerospike.conf @@ -0,0 +1,37 @@ +# Aerospike database developer configuration file. + +service { + proto-fd-max 1024 + cluster-name example + disable-udf-execution true +} + +logging { + console { + context any info + } +} + +network { + service { + address any + port 3000 + } + + heartbeat { + mode mesh + port 3002 + } + + fabric { + port 3001 + } +} + +namespace testNamespace { + replication-factor 1 + + storage-engine memory { + data-size 1G + } +} \ No newline at end of file diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..4fdc449 --- /dev/null +++ b/pom.xml @@ -0,0 +1,59 @@ + + + 4.0.0 + + org.example + aerospike-circuit-breaker-java + 1.0-SNAPSHOT + + + 21 + 21 + UTF-8 + + + + + + org.apache.maven.plugins + maven-assembly-plugin + 3.6.0 + + + + com.aerospike.examples.Example + + + + jar-with-dependencies + + + + + assemble-all + package + + single + + + + + + + + + + com.aerospike + aerospike-client + 8.0.0 + + + com.aerospike + aerospike-proxy-client + 8.1.0 + + + + \ No newline at end of file diff --git a/src/main/java/com/aerospike/examples/Example.java b/src/main/java/com/aerospike/examples/Example.java new file mode 100644 index 0000000..d253264 --- /dev/null +++ b/src/main/java/com/aerospike/examples/Example.java @@ -0,0 +1,233 @@ +package com.aerospike.examples; + +import com.aerospike.client.IAerospikeClient; +import com.aerospike.client.Host; +import com.aerospike.client.Key; +import com.aerospike.client.Bin; +import com.aerospike.client.Info; +import com.aerospike.client.cluster.Node; +import com.aerospike.client.policy.ClientPolicy; +import com.aerospike.client.policy.WritePolicy; +import com.aerospike.client.proxy.AerospikeClientFactory; +import com.aerospike.client.AerospikeException; +import com.aerospike.client.ResultCode; +import com.aerospike.client.async.EventPolicy; +import com.aerospike.client.async.EventLoop; +import com.aerospike.client.async.EventLoops; +import com.aerospike.client.async.NettyEventLoops; +import com.aerospike.client.async.Monitor; +import com.aerospike.client.async.Throttles; +import com.aerospike.client.listener.WriteListener; + +import io.netty.channel.nio.NioEventLoopGroup; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +public class Example { + private final AtomicInteger currentCount = new AtomicInteger(); + private final AtomicInteger successCount = new AtomicInteger(); + private final AtomicInteger connectErrorCounter = new AtomicInteger(); + private final AtomicInteger maxErrorCounter = new AtomicInteger(); + private final AtomicInteger timeoutErrorCounter = new AtomicInteger(); + private final AtomicInteger otherErrorCounter = new AtomicInteger(); + private final AtomicInteger nodeUnavailableCounter = new AtomicInteger(); + private Throttles throttles; + + public void runExample(Host[] hosts, int maxErrorRate, int write_ops) { + + ClientPolicy policy = new ClientPolicy(); + EventPolicy eventPolicy = new EventPolicy(); + + // the maxErrorRate acts as the "circuit breaker" + policy.maxErrorRate = maxErrorRate; + + // use the Aerospike client in async mode + int numLoops = Runtime.getRuntime().availableProcessors(); + int maxCommands = 96 / numLoops; + eventPolicy.maxCommandsInProcess = maxCommands; + eventPolicy.maxCommandsInQueue = write_ops; + throttles = new Throttles(numLoops, maxCommands); + NioEventLoopGroup eventGroup = new NioEventLoopGroup(numLoops); + EventLoops eventLoops = new NettyEventLoops(eventPolicy, eventGroup); + policy.eventLoops = eventLoops; + + System.out.println("Loops=" + numLoops + ", Max Commands=" + maxCommands); + + IAerospikeClient client = AerospikeClientFactory.getClient(policy, false, hosts); + + // check the number of opened and closed connections before and after running the operations + // to track "connection churn" + int startConnsOpened = 0; + int startConnsClosed = 0; + int stopConnsOpened = 0; + int stopConnsClosed = 0; + Node node = client.getNodes()[0]; + + try { + startConnsOpened = Integer.parseInt(getStat(node, "client_connections_opened")); + startConnsClosed = Integer.parseInt(getStat(node, "client_connections_closed")); + } catch (Exception e) { + System.out.println(e.getMessage()); + } + + // lower the socket timeout in the write policy to 10ms so that introducing a latency + // greater than 10ms results in a timeout error + WritePolicy writePolicy = new WritePolicy(client.getWritePolicyDefault()); + writePolicy.connectTimeout = 100; // 1s + writePolicy.socketTimeout = 10; // 10ms + + // submit n async write operations where n is the "write_ops" passed in as an argument + Monitor monitor = new Monitor(); + + // track the total run time + long startTime = System.nanoTime(); + + for (int i = 1; i <= write_ops; i++) { + Key key = new Key("testNamespace", "testSet", i); + Bin bin = new Bin("testBin", "testValue"); + + EventLoop eventLoop = eventLoops.next(); + int loopIndex = eventLoop.getIndex(); + + if (throttles.waitForSlot(loopIndex, 1)) { + try { + client.put(eventLoop, new WriteHandler(monitor, loopIndex, write_ops), writePolicy, key, bin); + } + catch (AerospikeException e) { + throttles.addSlot(loopIndex, 1); + System.out.println(e.getMessage()); + } + } + } + + monitor.waitTillComplete(); + + // calculate total run time + long endTime = System.nanoTime(); + double runTime = TimeUnit.MILLISECONDS.convert((endTime - startTime), TimeUnit.NANOSECONDS) / 1000.0; + + // store the number of opened and closed connections after running the operations + try { + stopConnsOpened = Integer.parseInt(getStat(node, "client_connections_opened")); + stopConnsClosed = Integer.parseInt(getStat(node, "client_connections_closed")); + } catch (Exception e) { + System.out.println(e.getMessage()); + } + + System.out.println(); + System.out.println("Run time: " + runTime + " seconds"); + System.out.println("Successful writes: " + successCount); + System.out.println("Connection errors: " + connectErrorCounter); + System.out.println("Timeout errors: " + timeoutErrorCounter); + System.out.println("Max errors exceeded: " + maxErrorCounter); + System.out.println("Node unavailable errors: " + nodeUnavailableCounter); + System.out.println("Other errors: " + otherErrorCounter); + System.out.println("Connections opened: " + (stopConnsOpened - startConnsOpened)); + System.out.println("Connections closed: " + (stopConnsClosed - startConnsClosed)); + + eventLoops.close(); + client.close(); + } + + private class WriteHandler implements WriteListener { + private final Monitor monitor; + private final int loopIndex; + private final int totalCount; + + public WriteHandler(Monitor monitor, int loopIndex, int totalCount) { + this.monitor = monitor; + this.loopIndex = loopIndex; + this.totalCount = totalCount; + } + + public void onSuccess(Key key) { + throttles.addSlot(loopIndex, 1); + currentCount.getAndIncrement(); + successCount.getAndIncrement(); + showProgress(); + notifyIfComplete(); + } + + public void onFailure(AerospikeException e) { + throttles.addSlot(loopIndex, 1); + currentCount.getAndIncrement(); + + switch (e.getResultCode()) { + case ResultCode.SERVER_NOT_AVAILABLE: + // A connection error occurred which would cause the connection to close. + connectErrorCounter.getAndIncrement(); + //System.out.println(ae.getMessage()); + break; + case ResultCode.TIMEOUT: + // A socket timeout occurred which would cause the connection to close. + timeoutErrorCounter.getAndIncrement(); + break; + case ResultCode.MAX_ERROR_RATE: + // The maximum number of errors per tend interval (~1s) occurred which WILL NOT cause the + // connection to close. + maxErrorCounter.getAndIncrement(); + break; + case ResultCode.INVALID_NODE_ERROR: + // There are no nodes available in the cluster which would be the case in this single-node + // example when the network to that one node is broken. + nodeUnavailableCounter.getAndIncrement(); + break; + default: + System.out.println(e.getMessage()); + otherErrorCounter.getAndIncrement(); + } + + showProgress(); + notifyIfComplete(); + } + + private void showProgress() { + // progress indicator + int tenPercent = Math.floorDiv(totalCount, 10); + int current = currentCount.get(); + if (current % tenPercent == 0) { + System.out.println(current + " of " + totalCount + " records written"); + } + } + + private void notifyIfComplete() { + if (currentCount.get() >= totalCount) { + monitor.notifyComplete(); + } + } + } + + private static String getStat(Node node, String stat) throws Exception { + String statsLine = Info.request(null, node, "statistics"); + String[] stats = statsLine.split(";"); + + for (String value : stats) { + String[] tokens = value.split("="); + if (tokens[0].equals(stat)) return tokens[1]; + } + + throw new Exception("Stat not found: " + stat); + } + + public static void main(String[] args) { + + if (args.length < 3) { + usage(); + System.exit(-1); + } + + // parse command-line arguments + Host[] hosts = Host.parseHosts(args[0], 3000); + int maxErrorRate = Integer.parseInt(args[1]); + int write_ops = Integer.parseInt(args[2]); + + Example example = new Example(); + example.runExample(hosts, maxErrorRate, write_ops); + } + + private static void usage() { + System.out.println("Arguments:HOST:PORT MAX_ERROR_RATE WRITE_OPS"); + System.out.println("Example: 172.17.0.2:3000 5 10000"); + } +}