Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

REL-2539 Java Client 8.1.2 for JDK21 #326

Merged
merged 9 commits into from
Jun 25, 2024
2 changes: 1 addition & 1 deletion benchmarks/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>com.aerospike</groupId>
<artifactId>aerospike-parent</artifactId>
<version>8.1.1</version>
<version>8.1.2</version>
</parent>
<artifactId>aerospike-benchmarks</artifactId>
<packaging>jar</packaging>
Expand Down
3 changes: 2 additions & 1 deletion client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>com.aerospike</groupId>
<artifactId>aerospike-parent</artifactId>
<version>8.1.1</version>
<version>8.1.2</version>
</parent>
<artifactId>aerospike-client-jdk21</artifactId>
<packaging>jar</packaging>
Expand Down Expand Up @@ -129,6 +129,7 @@
<include>com/aerospike/client/listener/*</include>
<include>com/aerospike/client/command/ParticleType.java</include>
<include>com/aerospike/client/exp/*</include>
<include>com/aerospike/client/metrics/*</include>
<include>com/aerospike/client/operation/*</include>
<include>com/aerospike/client/policy/*</include>
<include>com/aerospike/client/task/*</include>
Expand Down
33 changes: 14 additions & 19 deletions client/src/com/aerospike/client/AerospikeClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -1786,6 +1786,9 @@ public final Record[] get(BatchPolicy policy, Key[] keys, String... binNames)
policy = batchPolicyDefault;
}

int readAttr = (binNames == null || binNames.length == 0)?
Command.INFO1_READ | Command.INFO1_GET_ALL : Command.INFO1_READ;

Record[] records = new Record[keys.length];

try {
Expand All @@ -1802,7 +1805,7 @@ public final Record[] get(BatchPolicy policy, Key[] keys, String... binNames)
}
else {
commands[count++] = new Batch.GetArrayCommand(
cluster, bn, policy, keys, binNames, null, records, Command.INFO1_READ, false, status);
cluster, bn, policy, keys, binNames, null, records, readAttr, false, status);
}
}
BatchExecutor.execute(cluster, policy, commands, status);
Expand Down Expand Up @@ -1844,6 +1847,9 @@ public final void get(EventLoop eventLoop, RecordArrayListener listener, BatchPo
policy = batchPolicyDefault;
}

int readAttr = (binNames == null || binNames.length == 0)?
Command.INFO1_READ | Command.INFO1_GET_ALL : Command.INFO1_READ;

Record[] records = new Record[keys.length];
AsyncBatchExecutor.GetArray executor = new AsyncBatchExecutor.GetArray(
eventLoop, cluster, listener, keys, records);
Expand All @@ -1859,7 +1865,7 @@ public final void get(EventLoop eventLoop, RecordArrayListener listener, BatchPo
}
else {
commands[count++] = new AsyncBatch.GetArrayCommand(
executor, bn, policy, keys, binNames, null, records, Command.INFO1_READ, false);
executor, bn, policy, keys, binNames, null, records, readAttr, false);
}
}
executor.execute(commands);
Expand Down Expand Up @@ -1896,6 +1902,9 @@ public final void get(EventLoop eventLoop, RecordSequenceListener listener, Batc
policy = batchPolicyDefault;
}

int readAttr = (binNames == null || binNames.length == 0)?
Command.INFO1_READ | Command.INFO1_GET_ALL : Command.INFO1_READ;

AsyncBatchExecutor.GetSequence executor = new AsyncBatchExecutor.GetSequence(eventLoop, cluster, listener);
List<BatchNode> bns = BatchNodeList.generate(cluster, policy, keys, null, false, executor);
AsyncCommand[] commands = new AsyncCommand[bns.size()];
Expand All @@ -1909,7 +1918,7 @@ public final void get(EventLoop eventLoop, RecordSequenceListener listener, Batc
}
else {
commands[count++] = new AsyncBatch.GetSequenceCommand(
executor, bn, policy, keys, binNames, null, listener, Command.INFO1_READ, false);
executor, bn, policy, keys, binNames, null, listener, readAttr, false);
}
}
executor.execute(commands);
Expand Down Expand Up @@ -4461,21 +4470,7 @@ public void onFailure(AerospikeException ae) {
}

private static int parseIndexErrorCode(String response) {
int code = 0;

try {
String[] list = response.split(":");

if (list.length >= 2 && list[0].equals("FAIL")) {
code = Integer.parseInt(list[1]);
}
}
catch (Throwable e) {
}

if (code == 0) {
code = ResultCode.SERVER_ERROR;
}
return code;
Info.Error error = new Info.Error(response);
return (error.code == 0)? ResultCode.SERVER_ERROR : error.code;
}
}
86 changes: 64 additions & 22 deletions client/src/com/aerospike/client/Info.java
Original file line number Diff line number Diff line change
Expand Up @@ -319,30 +319,15 @@ public static int parseResultCode(String response) {
return ResultCode.OK;
}

// Error format: ERROR|FAIL[:<code>][:<message>]
try {
String[] list = response.split(":");
String s = list[0];

if (s.regionMatches(true, 0, "FAIL", 0, 4) ||
s.regionMatches(true, 0, "ERROR", 0, 5)) {
Info.Error error = new Info.Error(response);

if (list.length > 1) {
s = list[1].trim();

if (! s.isEmpty()) {
return Integer.parseInt(s);
}
}
return ResultCode.SERVER_ERROR;
}
throw new AerospikeException("Unrecognized info response: " + response);
}
catch (AerospikeException ae) {
throw ae;
if (error.code >= 0) {
// Server errors return error code.
return error.code;
}
catch (Throwable t) {
throw new AerospikeException("Unrecognized info response: " + response, t);
else {
// Client errors result in a exception.
throw new AerospikeException(error.code, "Unrecognized info response: " + response);
}
}

Expand Down Expand Up @@ -828,4 +813,61 @@ public String getStringBase64() {
return Buffer.utf8ToString(bytes, 0, bytes.length);
}
}

/**
* Info command error response.
*/
public static class Error {
public final int code;
public final String message;

/**
* Parse info command response into code and message.
* If the response is not a recognized error format, the code is set to
* {@link ResultCode#CLIENT_ERROR} and the message is set to the full
* response string.
*/
public Error(String response) {
// Error format: ERROR|FAIL[:<code>][:<message>]
int rc = ResultCode.CLIENT_ERROR;
String msg = response;

try {
String[] list = response.split(":");
String s = list[0];

if (s.regionMatches(true, 0, "FAIL", 0, 4) ||
s.regionMatches(true, 0, "ERROR", 0, 5)) {

if (list.length >= 3) {
msg = list[2].trim();
s = list[1].trim();

if (! s.isEmpty()) {
rc = Integer.parseInt(s);
}
}
else if (list.length == 2) {
s = list[1].trim();

if (! s.isEmpty()) {
try {
rc = Integer.parseInt(s);
}
catch (Throwable t) {
// Some error strings omit the code and just have a message.
msg = s;
}
}
}
}
}
catch (Throwable t) {
}
finally {
this.code = rc;
this.message = msg;
}
}
}
}
1 change: 0 additions & 1 deletion client/src/com/aerospike/client/async/AsyncBatch.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package com.aerospike.client.async;

import java.util.ArrayList;
import java.util.List;

import com.aerospike.client.AerospikeClient;
Expand Down
2 changes: 2 additions & 0 deletions client/src/com/aerospike/client/cluster/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,8 @@ public void close() {
}

public static final class ReadTimeout extends RuntimeException {
private static final long serialVersionUID = 1L;

public final byte[] buffer;
public final int offset;
public final int length;
Expand Down
2 changes: 1 addition & 1 deletion client/src/com/aerospike/client/command/BatchAttr.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public BatchAttr(Policy policy, int rattr) {

public BatchAttr(Policy policy, int rattr, Operation[] ops) {
setRead(policy);
this.readAttr |= rattr;
this.readAttr = rattr;

if (ops != null) {
adjustRead(ops);
Expand Down
3 changes: 0 additions & 3 deletions client/src/com/aerospike/client/command/BatchExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import com.aerospike.client.AerospikeException;
import com.aerospike.client.cluster.Cluster;
Expand Down
63 changes: 30 additions & 33 deletions client/src/com/aerospike/client/command/Command.java
Original file line number Diff line number Diff line change
Expand Up @@ -241,53 +241,45 @@ public final void setExists(Policy policy, Key key) {
end();
}

private final void setRead(Policy policy, Key key) {
public final void setRead(Policy policy, Key key, String[] binNames) {
int readAttr = Command.INFO1_READ;
int opCount = 0;

if (binNames != null && binNames.length > 0) {
opCount = binNames.length;
}
else {
readAttr |= Command.INFO1_GET_ALL;
}

begin();
int fieldCount = estimateKeySize(policy, key);

if (policy.filterExp != null) {
dataOffset += policy.filterExp.size();
fieldCount++;
}

if (opCount != 0) {
for (String binName : binNames) {
estimateOperationSize(binName);
}
}

sizeBuffer();
writeHeaderRead(policy, serverTimeout, Command.INFO1_READ | Command.INFO1_GET_ALL, 0, 0, fieldCount, 0);
writeHeaderRead(policy, serverTimeout, readAttr, 0, 0, fieldCount, opCount);
writeKey(policy, key);

if (policy.filterExp != null) {
policy.filterExp.write(this);
}
end();
}

public final void setRead(Policy policy, Key key, String[] binNames) {
if (binNames != null) {
begin();
int fieldCount = estimateKeySize(policy, key);

if (policy.filterExp != null) {
dataOffset += policy.filterExp.size();
fieldCount++;
}

for (String binName : binNames) {
estimateOperationSize(binName);
}
sizeBuffer();
writeHeaderRead(policy, serverTimeout, Command.INFO1_READ, 0, 0, fieldCount, binNames.length);
writeKey(policy, key);

if (policy.filterExp != null) {
policy.filterExp.write(this);
}

if (opCount != 0) {
for (String binName : binNames) {
writeOperation(binName, Operation.Type.READ);
}
end();
}
else {
setRead(policy, key);
}
end();
}

public final void setRead(Policy policy, BatchRead br) {
Expand Down Expand Up @@ -851,7 +843,13 @@ public final void setBatchOperate(
}

if (br.binNames != null) {
writeBatchBinNames(key, br.binNames, attr, attr.filterExp);
if (br.binNames.length > 0) {
writeBatchBinNames(key, br.binNames, attr, attr.filterExp);
}
else {
attr.adjustRead(true);
writeBatchRead(key, attr, attr.filterExp, 0);
}
}
else if (br.ops != null) {
attr.adjustRead(br.ops);
Expand Down Expand Up @@ -1330,9 +1328,8 @@ public final void setScan(
}

// Clusters that support partition queries also support not sending partition done messages.
int infoAttr = cluster.hasPartitionQuery? Command.INFO3_PARTITION_DONE : 0;
int operationCount = (binNames == null)? 0 : binNames.length;
writeHeaderRead(policy, totalTimeout, readAttr, 0, infoAttr, fieldCount, operationCount);
writeHeaderRead(policy, totalTimeout, readAttr, 0, Command.INFO3_PARTITION_DONE, fieldCount, operationCount);

if (namespace != null) {
writeField(namespace, FieldType.NAMESPACE);
Expand Down Expand Up @@ -1582,7 +1579,7 @@ else if (qp.expectedDuration == QueryDuration.LONG_RELAX_AP) {
writeAttr |= Command.INFO2_RELAX_AP_LONG_QUERY;
}

int infoAttr = isNew? Command.INFO3_PARTITION_DONE : 0;
int infoAttr = (isNew || filter == null)? Command.INFO3_PARTITION_DONE : 0;

writeHeaderRead(policy, totalTimeout, readAttr, writeAttr, infoAttr, fieldCount, operationCount);
}
Expand Down
4 changes: 2 additions & 2 deletions client/src/com/aerospike/client/metrics/LatencyBuckets.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2012-2023 Aerospike, Inc.
* Copyright 2012-2024 Aerospike, Inc.
*
* Portions may be licensed to Aerospike, Inc. under one or more contributor
* license agreements WHICH ARE COMPATIBLE WITH THE APACHE LICENSE, VERSION 2.0.
Expand Down Expand Up @@ -33,7 +33,7 @@ public final class LatencyBuckets {
*
* @param latencyColumns number of latency buckets
* @param latencyShift power of 2 multiple between each range bucket in latency histograms starting at bucket 3.
* The first 2 buckets are "<=1ms" and ">1ms".
* The first 2 buckets are "&lt;=1ms" and "&gt;1ms".
*/
public LatencyBuckets(int latencyColumns, int latencyShift) {
this.latencyShift = latencyShift;
Expand Down
5 changes: 4 additions & 1 deletion client/src/com/aerospike/client/metrics/LatencyType.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2012-2023 Aerospike, Inc.
* Copyright 2012-2024 Aerospike, Inc.
*
* Portions may be licensed to Aerospike, Inc. under one or more contributor
* license agreements WHICH ARE COMPATIBLE WITH THE APACHE LICENSE, VERSION 2.0.
Expand All @@ -16,6 +16,9 @@
*/
package com.aerospike.client.metrics;

/**
* Latency group type.
*/
public enum LatencyType {
CONN,
WRITE,
Expand Down
2 changes: 1 addition & 1 deletion client/src/com/aerospike/client/metrics/MetricsPolicy.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public final class MetricsPolicy {

/**
* Power of 2 multiple between each range bucket in latency histograms starting at column 3. The bucket units
* are in milliseconds. The first 2 buckets are "<=1ms" and ">1ms". Examples:
* are in milliseconds. The first 2 buckets are "&lt;=1ms" and "&gt;1ms". Examples:
* <pre>{@code
* // latencyColumns=7 latencyShift=1
* <=1ms >1ms >2ms >4ms >8ms >16ms >32ms
Expand Down
Loading
Loading