Skip to content

Commit

Permalink
Merge pull request #326 from aerospike/stage
Browse files Browse the repository at this point in the history
REL-2539 Java Client 8.1.2 for JDK21
  • Loading branch information
BrianNichols authored Jun 25, 2024
2 parents 2a1549a + 4dac236 commit 3826185
Show file tree
Hide file tree
Showing 39 changed files with 247 additions and 129 deletions.
2 changes: 1 addition & 1 deletion benchmarks/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>com.aerospike</groupId>
<artifactId>aerospike-parent</artifactId>
<version>8.1.1</version>
<version>8.1.2</version>
</parent>
<artifactId>aerospike-benchmarks</artifactId>
<packaging>jar</packaging>
Expand Down
3 changes: 2 additions & 1 deletion client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>com.aerospike</groupId>
<artifactId>aerospike-parent</artifactId>
<version>8.1.1</version>
<version>8.1.2</version>
</parent>
<artifactId>aerospike-client-jdk21</artifactId>
<packaging>jar</packaging>
Expand Down Expand Up @@ -129,6 +129,7 @@
<include>com/aerospike/client/listener/*</include>
<include>com/aerospike/client/command/ParticleType.java</include>
<include>com/aerospike/client/exp/*</include>
<include>com/aerospike/client/metrics/*</include>
<include>com/aerospike/client/operation/*</include>
<include>com/aerospike/client/policy/*</include>
<include>com/aerospike/client/task/*</include>
Expand Down
33 changes: 14 additions & 19 deletions client/src/com/aerospike/client/AerospikeClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -1786,6 +1786,9 @@ public final Record[] get(BatchPolicy policy, Key[] keys, String... binNames)
policy = batchPolicyDefault;
}

int readAttr = (binNames == null || binNames.length == 0)?
Command.INFO1_READ | Command.INFO1_GET_ALL : Command.INFO1_READ;

Record[] records = new Record[keys.length];

try {
Expand All @@ -1802,7 +1805,7 @@ public final Record[] get(BatchPolicy policy, Key[] keys, String... binNames)
}
else {
commands[count++] = new Batch.GetArrayCommand(
cluster, bn, policy, keys, binNames, null, records, Command.INFO1_READ, false, status);
cluster, bn, policy, keys, binNames, null, records, readAttr, false, status);
}
}
BatchExecutor.execute(cluster, policy, commands, status);
Expand Down Expand Up @@ -1844,6 +1847,9 @@ public final void get(EventLoop eventLoop, RecordArrayListener listener, BatchPo
policy = batchPolicyDefault;
}

int readAttr = (binNames == null || binNames.length == 0)?
Command.INFO1_READ | Command.INFO1_GET_ALL : Command.INFO1_READ;

Record[] records = new Record[keys.length];
AsyncBatchExecutor.GetArray executor = new AsyncBatchExecutor.GetArray(
eventLoop, cluster, listener, keys, records);
Expand All @@ -1859,7 +1865,7 @@ public final void get(EventLoop eventLoop, RecordArrayListener listener, BatchPo
}
else {
commands[count++] = new AsyncBatch.GetArrayCommand(
executor, bn, policy, keys, binNames, null, records, Command.INFO1_READ, false);
executor, bn, policy, keys, binNames, null, records, readAttr, false);
}
}
executor.execute(commands);
Expand Down Expand Up @@ -1896,6 +1902,9 @@ public final void get(EventLoop eventLoop, RecordSequenceListener listener, Batc
policy = batchPolicyDefault;
}

int readAttr = (binNames == null || binNames.length == 0)?
Command.INFO1_READ | Command.INFO1_GET_ALL : Command.INFO1_READ;

AsyncBatchExecutor.GetSequence executor = new AsyncBatchExecutor.GetSequence(eventLoop, cluster, listener);
List<BatchNode> bns = BatchNodeList.generate(cluster, policy, keys, null, false, executor);
AsyncCommand[] commands = new AsyncCommand[bns.size()];
Expand All @@ -1909,7 +1918,7 @@ public final void get(EventLoop eventLoop, RecordSequenceListener listener, Batc
}
else {
commands[count++] = new AsyncBatch.GetSequenceCommand(
executor, bn, policy, keys, binNames, null, listener, Command.INFO1_READ, false);
executor, bn, policy, keys, binNames, null, listener, readAttr, false);
}
}
executor.execute(commands);
Expand Down Expand Up @@ -4461,21 +4470,7 @@ public void onFailure(AerospikeException ae) {
}

private static int parseIndexErrorCode(String response) {
int code = 0;

try {
String[] list = response.split(":");

if (list.length >= 2 && list[0].equals("FAIL")) {
code = Integer.parseInt(list[1]);
}
}
catch (Throwable e) {
}

if (code == 0) {
code = ResultCode.SERVER_ERROR;
}
return code;
Info.Error error = new Info.Error(response);
return (error.code == 0)? ResultCode.SERVER_ERROR : error.code;
}
}
86 changes: 64 additions & 22 deletions client/src/com/aerospike/client/Info.java
Original file line number Diff line number Diff line change
Expand Up @@ -319,30 +319,15 @@ public static int parseResultCode(String response) {
return ResultCode.OK;
}

// Error format: ERROR|FAIL[:<code>][:<message>]
try {
String[] list = response.split(":");
String s = list[0];

if (s.regionMatches(true, 0, "FAIL", 0, 4) ||
s.regionMatches(true, 0, "ERROR", 0, 5)) {
Info.Error error = new Info.Error(response);

if (list.length > 1) {
s = list[1].trim();

if (! s.isEmpty()) {
return Integer.parseInt(s);
}
}
return ResultCode.SERVER_ERROR;
}
throw new AerospikeException("Unrecognized info response: " + response);
}
catch (AerospikeException ae) {
throw ae;
if (error.code >= 0) {
// Server errors return error code.
return error.code;
}
catch (Throwable t) {
throw new AerospikeException("Unrecognized info response: " + response, t);
else {
// Client errors result in a exception.
throw new AerospikeException(error.code, "Unrecognized info response: " + response);
}
}

Expand Down Expand Up @@ -828,4 +813,61 @@ public String getStringBase64() {
return Buffer.utf8ToString(bytes, 0, bytes.length);
}
}

/**
* Info command error response.
*/
public static class Error {
public final int code;
public final String message;

/**
* Parse info command response into code and message.
* If the response is not a recognized error format, the code is set to
* {@link ResultCode#CLIENT_ERROR} and the message is set to the full
* response string.
*/
public Error(String response) {
// Error format: ERROR|FAIL[:<code>][:<message>]
int rc = ResultCode.CLIENT_ERROR;
String msg = response;

try {
String[] list = response.split(":");
String s = list[0];

if (s.regionMatches(true, 0, "FAIL", 0, 4) ||
s.regionMatches(true, 0, "ERROR", 0, 5)) {

if (list.length >= 3) {
msg = list[2].trim();
s = list[1].trim();

if (! s.isEmpty()) {
rc = Integer.parseInt(s);
}
}
else if (list.length == 2) {
s = list[1].trim();

if (! s.isEmpty()) {
try {
rc = Integer.parseInt(s);
}
catch (Throwable t) {
// Some error strings omit the code and just have a message.
msg = s;
}
}
}
}
}
catch (Throwable t) {
}
finally {
this.code = rc;
this.message = msg;
}
}
}
}
1 change: 0 additions & 1 deletion client/src/com/aerospike/client/async/AsyncBatch.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package com.aerospike.client.async;

import java.util.ArrayList;
import java.util.List;

import com.aerospike.client.AerospikeClient;
Expand Down
2 changes: 2 additions & 0 deletions client/src/com/aerospike/client/cluster/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,8 @@ public void close() {
}

public static final class ReadTimeout extends RuntimeException {
private static final long serialVersionUID = 1L;

public final byte[] buffer;
public final int offset;
public final int length;
Expand Down
2 changes: 1 addition & 1 deletion client/src/com/aerospike/client/command/BatchAttr.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public BatchAttr(Policy policy, int rattr) {

public BatchAttr(Policy policy, int rattr, Operation[] ops) {
setRead(policy);
this.readAttr |= rattr;
this.readAttr = rattr;

if (ops != null) {
adjustRead(ops);
Expand Down
3 changes: 0 additions & 3 deletions client/src/com/aerospike/client/command/BatchExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import com.aerospike.client.AerospikeException;
import com.aerospike.client.cluster.Cluster;
Expand Down
63 changes: 30 additions & 33 deletions client/src/com/aerospike/client/command/Command.java
Original file line number Diff line number Diff line change
Expand Up @@ -241,53 +241,45 @@ public final void setExists(Policy policy, Key key) {
end();
}

private final void setRead(Policy policy, Key key) {
public final void setRead(Policy policy, Key key, String[] binNames) {
int readAttr = Command.INFO1_READ;
int opCount = 0;

if (binNames != null && binNames.length > 0) {
opCount = binNames.length;
}
else {
readAttr |= Command.INFO1_GET_ALL;
}

begin();
int fieldCount = estimateKeySize(policy, key);

if (policy.filterExp != null) {
dataOffset += policy.filterExp.size();
fieldCount++;
}

if (opCount != 0) {
for (String binName : binNames) {
estimateOperationSize(binName);
}
}

sizeBuffer();
writeHeaderRead(policy, serverTimeout, Command.INFO1_READ | Command.INFO1_GET_ALL, 0, 0, fieldCount, 0);
writeHeaderRead(policy, serverTimeout, readAttr, 0, 0, fieldCount, opCount);
writeKey(policy, key);

if (policy.filterExp != null) {
policy.filterExp.write(this);
}
end();
}

public final void setRead(Policy policy, Key key, String[] binNames) {
if (binNames != null) {
begin();
int fieldCount = estimateKeySize(policy, key);

if (policy.filterExp != null) {
dataOffset += policy.filterExp.size();
fieldCount++;
}

for (String binName : binNames) {
estimateOperationSize(binName);
}
sizeBuffer();
writeHeaderRead(policy, serverTimeout, Command.INFO1_READ, 0, 0, fieldCount, binNames.length);
writeKey(policy, key);

if (policy.filterExp != null) {
policy.filterExp.write(this);
}

if (opCount != 0) {
for (String binName : binNames) {
writeOperation(binName, Operation.Type.READ);
}
end();
}
else {
setRead(policy, key);
}
end();
}

public final void setRead(Policy policy, BatchRead br) {
Expand Down Expand Up @@ -851,7 +843,13 @@ public final void setBatchOperate(
}

if (br.binNames != null) {
writeBatchBinNames(key, br.binNames, attr, attr.filterExp);
if (br.binNames.length > 0) {
writeBatchBinNames(key, br.binNames, attr, attr.filterExp);
}
else {
attr.adjustRead(true);
writeBatchRead(key, attr, attr.filterExp, 0);
}
}
else if (br.ops != null) {
attr.adjustRead(br.ops);
Expand Down Expand Up @@ -1330,9 +1328,8 @@ public final void setScan(
}

// Clusters that support partition queries also support not sending partition done messages.
int infoAttr = cluster.hasPartitionQuery? Command.INFO3_PARTITION_DONE : 0;
int operationCount = (binNames == null)? 0 : binNames.length;
writeHeaderRead(policy, totalTimeout, readAttr, 0, infoAttr, fieldCount, operationCount);
writeHeaderRead(policy, totalTimeout, readAttr, 0, Command.INFO3_PARTITION_DONE, fieldCount, operationCount);

if (namespace != null) {
writeField(namespace, FieldType.NAMESPACE);
Expand Down Expand Up @@ -1582,7 +1579,7 @@ else if (qp.expectedDuration == QueryDuration.LONG_RELAX_AP) {
writeAttr |= Command.INFO2_RELAX_AP_LONG_QUERY;
}

int infoAttr = isNew? Command.INFO3_PARTITION_DONE : 0;
int infoAttr = (isNew || filter == null)? Command.INFO3_PARTITION_DONE : 0;

writeHeaderRead(policy, totalTimeout, readAttr, writeAttr, infoAttr, fieldCount, operationCount);
}
Expand Down
4 changes: 2 additions & 2 deletions client/src/com/aerospike/client/metrics/LatencyBuckets.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2012-2023 Aerospike, Inc.
* Copyright 2012-2024 Aerospike, Inc.
*
* Portions may be licensed to Aerospike, Inc. under one or more contributor
* license agreements WHICH ARE COMPATIBLE WITH THE APACHE LICENSE, VERSION 2.0.
Expand Down Expand Up @@ -33,7 +33,7 @@ public final class LatencyBuckets {
*
* @param latencyColumns number of latency buckets
* @param latencyShift power of 2 multiple between each range bucket in latency histograms starting at bucket 3.
* The first 2 buckets are "<=1ms" and ">1ms".
* The first 2 buckets are "&lt;=1ms" and "&gt;1ms".
*/
public LatencyBuckets(int latencyColumns, int latencyShift) {
this.latencyShift = latencyShift;
Expand Down
5 changes: 4 additions & 1 deletion client/src/com/aerospike/client/metrics/LatencyType.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2012-2023 Aerospike, Inc.
* Copyright 2012-2024 Aerospike, Inc.
*
* Portions may be licensed to Aerospike, Inc. under one or more contributor
* license agreements WHICH ARE COMPATIBLE WITH THE APACHE LICENSE, VERSION 2.0.
Expand All @@ -16,6 +16,9 @@
*/
package com.aerospike.client.metrics;

/**
* Latency group type.
*/
public enum LatencyType {
CONN,
WRITE,
Expand Down
2 changes: 1 addition & 1 deletion client/src/com/aerospike/client/metrics/MetricsPolicy.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public final class MetricsPolicy {

/**
* Power of 2 multiple between each range bucket in latency histograms starting at column 3. The bucket units
* are in milliseconds. The first 2 buckets are "<=1ms" and ">1ms". Examples:
* are in milliseconds. The first 2 buckets are "&lt;=1ms" and "&gt;1ms". Examples:
* <pre>{@code
* // latencyColumns=7 latencyShift=1
* <=1ms >1ms >2ms >4ms >8ms >16ms >32ms
Expand Down
Loading

0 comments on commit 3826185

Please sign in to comment.