Skip to content

Commit

Permalink
CLIENT-2293 Convert a batch node command to a single record command w…
Browse files Browse the repository at this point in the history
…hen the batch size for that node is one.

This improves performance for small batch sizes.
  • Loading branch information
BrianNichols committed Oct 23, 2023
1 parent ff1a649 commit 70d1b80
Show file tree
Hide file tree
Showing 18 changed files with 3,384 additions and 731 deletions.
770 changes: 702 additions & 68 deletions client/src/com/aerospike/client/AerospikeClient.java

Large diffs are not rendered by default.

515 changes: 21 additions & 494 deletions client/src/com/aerospike/client/async/AsyncBatch.java

Large diffs are not rendered by default.

289 changes: 281 additions & 8 deletions client/src/com/aerospike/client/async/AsyncBatchExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,54 +16,327 @@
*/
package com.aerospike.client.async;

import java.util.List;

import com.aerospike.client.AerospikeException;
import com.aerospike.client.BatchRead;
import com.aerospike.client.BatchRecord;
import com.aerospike.client.Key;
import com.aerospike.client.Record;
import com.aerospike.client.async.AsyncBatch.AsyncBatchCommand;
import com.aerospike.client.cluster.Cluster;
import com.aerospike.client.command.BatchNodeList;
import com.aerospike.client.listener.BatchListListener;
import com.aerospike.client.listener.BatchOperateListListener;
import com.aerospike.client.listener.BatchRecordArrayListener;
import com.aerospike.client.listener.BatchRecordSequenceListener;
import com.aerospike.client.listener.BatchSequenceListener;
import com.aerospike.client.listener.ExistsArrayListener;
import com.aerospike.client.listener.ExistsSequenceListener;
import com.aerospike.client.listener.RecordArrayListener;
import com.aerospike.client.listener.RecordSequenceListener;

public abstract class AsyncBatchExecutor implements BatchNodeList.IBatchStatus {
public static final class BatchRecordArray extends AsyncBatchExecutor {
private final BatchRecordArrayListener listener;
private final BatchRecord[] records;

public BatchRecordArray(
EventLoop eventLoop,
Cluster cluster,
BatchRecordArrayListener listener,
BatchRecord[] records
) {
super(eventLoop, cluster, true);
this.listener = listener;
this.records = records;
}

protected void onSuccess() {
listener.onSuccess(records, getStatus());
}

protected void onFailure(AerospikeException ae) {
listener.onFailure(records, ae);
}
}

public static final class BatchRecordSequence extends AsyncBatchExecutor {
private final BatchRecordSequenceListener listener;
private final boolean[] sent;

public BatchRecordSequence(
EventLoop eventLoop,
Cluster cluster,
BatchRecordSequenceListener listener,
boolean[] sent
) {
super(eventLoop, cluster, true);
this.listener = listener;
this.sent = sent;
}

public void setSent(int index) {
sent[index] = true;
}

public boolean exchangeSent(int index) {
boolean prev = sent[index];
sent[index] = true;
return prev;
}

@Override
public void batchKeyError(Key key, int index, AerospikeException ae, boolean inDoubt, boolean hasWrite) {
sent[index] = true;
BatchRecord record = new BatchRecord(key, null, ae.getResultCode(), inDoubt, hasWrite);
AsyncBatch.onRecord(listener, record, index);
}

@Override
protected void onSuccess() {
listener.onSuccess();
}

@Override
protected void onFailure(AerospikeException ae) {
listener.onFailure(ae);
}
}

public static final class ExistsArray extends AsyncBatchExecutor {
private final ExistsArrayListener listener;
private final Key[] keys;
private final boolean[] existsArray;

public ExistsArray(
EventLoop eventLoop,
Cluster cluster,
ExistsArrayListener listener,
Key[] keys,
boolean[] existsArray
) {
super(eventLoop, cluster, false);
this.listener = listener;
this.keys = keys;
this.existsArray = existsArray;
}

protected void onSuccess() {
listener.onSuccess(keys, existsArray);
}

protected void onFailure(AerospikeException ae) {
listener.onFailure(new AerospikeException.BatchExists(existsArray, ae));
}
}

public static final class ExistsSequence extends AsyncBatchExecutor {
private final ExistsSequenceListener listener;

public ExistsSequence(
EventLoop eventLoop,
Cluster cluster,
ExistsSequenceListener listener
) {
super(eventLoop, cluster, false);
this.listener = listener;
}

protected void onSuccess() {
listener.onSuccess();
}

protected void onFailure(AerospikeException ae) {
listener.onFailure(ae);
}
}

public static final class ReadList extends AsyncBatchExecutor {
private final BatchListListener listener;
private final List<BatchRead> records;

public ReadList(
EventLoop eventLoop,
Cluster cluster,
BatchListListener listener,
List<BatchRead> records
) {
super(eventLoop, cluster, true);
this.listener = listener;
this.records = records;
}

protected void onSuccess() {
listener.onSuccess(records);
}

protected void onFailure(AerospikeException ae) {
listener.onFailure(ae);
}
}

public static final class ReadSequence extends AsyncBatchExecutor {
private final BatchSequenceListener listener;

public ReadSequence(
EventLoop eventLoop,
Cluster cluster,
BatchSequenceListener listener
) {
super(eventLoop, cluster, true);
this.listener = listener;
}

protected void onSuccess() {
listener.onSuccess();
}

protected void onFailure(AerospikeException ae) {
listener.onFailure(ae);
}
}

public static final class GetArray extends AsyncBatchExecutor {
private final RecordArrayListener listener;
private final Key[] keys;
private final Record[] records;

public GetArray(
EventLoop eventLoop,
Cluster cluster,
RecordArrayListener listener,
Key[] keys,
Record[] records
) {
super(eventLoop, cluster, false);
this.listener = listener;
this.keys = keys;
this.records = records;
}

protected void onSuccess() {
listener.onSuccess(keys, records);
}

protected void onFailure(AerospikeException ae) {
listener.onFailure(new AerospikeException.BatchRecords(records, ae));
}
}

public static final class GetSequence extends AsyncBatchExecutor {
private final RecordSequenceListener listener;

public GetSequence(
EventLoop eventLoop,
Cluster cluster,
RecordSequenceListener listener
) {
super(eventLoop, cluster, false);
this.listener = listener;
}

@Override
protected void onSuccess() {
listener.onSuccess();
}

@Override
protected void onFailure(AerospikeException ae) {
listener.onFailure(ae);
}
}

public static final class OperateList extends AsyncBatchExecutor {
private final BatchOperateListListener listener;
private final List<BatchRecord> records;

public OperateList(
EventLoop eventLoop,
Cluster cluster,
BatchOperateListListener listener,
List<BatchRecord> records
) {
super(eventLoop, cluster, true);
this.listener = listener;
this.records = records;
}

protected void onSuccess() {
listener.onSuccess(records, getStatus());
}

protected void onFailure(AerospikeException ae) {
listener.onFailure(ae);
}
}

public static final class OperateSequence extends AsyncBatchExecutor {
private final BatchRecordSequenceListener listener;

public OperateSequence(
EventLoop eventLoop,
Cluster cluster,
BatchRecordSequenceListener listener
) {
super(eventLoop, cluster, true);
this.listener = listener;
}

protected void onSuccess() {
listener.onSuccess();
}

protected void onFailure(AerospikeException ae) {
listener.onFailure(ae);
}
}

//-------------------------------------------------------
// Base Executor
//-------------------------------------------------------

final EventLoop eventLoop;
final Cluster cluster;
private AerospikeException exception;
private AsyncBatchCommand[] commands;
private AsyncCommand[] commands;
private int completedCount; // Not atomic because all commands run on same event loop thread.
private final boolean hasResultCode;
boolean done;
boolean error;

public AsyncBatchExecutor(EventLoop eventLoop, Cluster cluster, boolean hasResultCode) {
private AsyncBatchExecutor(EventLoop eventLoop, Cluster cluster, boolean hasResultCode) {
this.eventLoop = eventLoop;
this.cluster = cluster;
this.hasResultCode = hasResultCode;
cluster.addTran();
}

public void execute(AsyncBatchCommand[] cmds) {
public void execute(AsyncCommand[] cmds) {
this.commands = cmds;

for (AsyncBatchCommand cmd : cmds) {
for (AsyncCommand cmd : cmds) {
eventLoop.execute(cluster, cmd);
}
}

public void executeBatchRetry(AsyncBatchCommand[] cmds, AsyncBatchCommand orig, Runnable other, long deadline) {
// Create new commands array.
AsyncBatchCommand[] target = new AsyncBatchCommand[commands.length + cmds.length - 1];
AsyncCommand[] target = new AsyncCommand[commands.length + cmds.length - 1];
int count = 0;

for (AsyncBatchCommand cmd : commands) {
for (AsyncCommand cmd : commands) {
if (cmd != orig) {
target[count++] = cmd;
}
}

for (AsyncBatchCommand cmd : cmds) {
for (AsyncCommand cmd : cmds) {
target[count++] = cmd;
}
commands = target;

for (AsyncBatchCommand cmd : cmds) {
for (AsyncCommand cmd : cmds) {
eventLoop.executeBatchRetry(other, cmd, deadline);
}
}
Expand Down
Loading

0 comments on commit 70d1b80

Please sign in to comment.