From 70d1b80ea7c6f92e4359ea458dc7c3047cb4c02d Mon Sep 17 00:00:00 2001 From: Brian Nichols Date: Mon, 23 Oct 2023 14:59:49 -0700 Subject: [PATCH] CLIENT-2293 Convert a batch node command to a single record command when the batch size for that node is one. This improves performance for small batch sizes. --- .../com/aerospike/client/AerospikeClient.java | 770 ++++++++++-- .../aerospike/client/async/AsyncBatch.java | 515 +------- .../client/async/AsyncBatchExecutor.java | 289 ++++- .../client/async/AsyncBatchSingle.java | 1046 +++++++++++++++++ .../aerospike/client/cluster/Partition.java | 4 +- .../com/aerospike/client/command/Batch.java | 66 +- .../aerospike/client/command/BatchAttr.java | 13 +- .../client/command/BatchExecutor.java | 33 +- .../aerospike/client/command/BatchSingle.java | 460 ++++++++ .../com/aerospike/client/command/Command.java | 173 +++ .../client/command/IBatchCommand.java | 23 + .../aerospike/client/command/OperateArgs.java | 1 - .../aerospike/client/command/ReadCommand.java | 110 +- .../client/command/RecordParser.java | 209 ++++ .../client/proxy/AerospikeClientProxy.java | 2 +- .../aerospike/test/async/TestAsyncBatch.java | 320 ++++- .../aerospike/test/async/TestAsyncUDF.java | 47 +- .../aerospike/test/sync/basic/TestPutGet.java | 34 +- 18 files changed, 3384 insertions(+), 731 deletions(-) create mode 100644 client/src/com/aerospike/client/async/AsyncBatchSingle.java create mode 100644 client/src/com/aerospike/client/command/BatchSingle.java create mode 100644 client/src/com/aerospike/client/command/IBatchCommand.java create mode 100644 client/src/com/aerospike/client/command/RecordParser.java diff --git a/client/src/com/aerospike/client/AerospikeClient.java b/client/src/com/aerospike/client/AerospikeClient.java index 5e8f4b022..4d86d1863 100644 --- a/client/src/com/aerospike/client/AerospikeClient.java +++ b/client/src/com/aerospike/client/AerospikeClient.java @@ -29,6 +29,9 @@ import com.aerospike.client.admin.Role; import 
com.aerospike.client.admin.User; import com.aerospike.client.async.AsyncBatch; +import com.aerospike.client.async.AsyncBatchExecutor; +import com.aerospike.client.async.AsyncBatchSingle; +import com.aerospike.client.async.AsyncCommand; import com.aerospike.client.async.AsyncDelete; import com.aerospike.client.async.AsyncExecute; import com.aerospike.client.async.AsyncExists; @@ -49,11 +52,11 @@ import com.aerospike.client.cluster.Connection; import com.aerospike.client.cluster.Node; import com.aerospike.client.command.Batch; -import com.aerospike.client.command.Batch.BatchCommand; import com.aerospike.client.command.BatchAttr; import com.aerospike.client.command.BatchExecutor; import com.aerospike.client.command.BatchNode; import com.aerospike.client.command.BatchNodeList; +import com.aerospike.client.command.BatchSingle; import com.aerospike.client.command.BatchStatus; import com.aerospike.client.command.Buffer; import com.aerospike.client.command.Command; @@ -61,6 +64,7 @@ import com.aerospike.client.command.ExecuteCommand; import com.aerospike.client.command.Executor; import com.aerospike.client.command.ExistsCommand; +import com.aerospike.client.command.IBatchCommand; import com.aerospike.client.command.OperateArgs; import com.aerospike.client.command.OperateCommand; import com.aerospike.client.command.ReadCommand; @@ -271,7 +275,8 @@ public AerospikeClient(ClientPolicy policy, Host... hosts) // TODO: Reintroduce requirement in the next major client release. /* if (! Log.isSet()) { - throw new AerospikeException("Log.setCallback() or Log.setCallbackStandard() must be called." + System.lineSeparator() + + throw new AerospikeException( + "Log.setCallback() or Log.setCallbackStandard() must be called." 
+ System.lineSeparator() + "See https://developer.aerospike.com/client/java/usage/logging for details."); } */ @@ -763,14 +768,21 @@ public final BatchResults delete(BatchPolicy batchPolicy, BatchDeletePolicy dele try { BatchStatus status = new BatchStatus(true); - List batchNodes = BatchNodeList.generate(cluster, batchPolicy, keys, records, attr.hasWrite, status); - BatchCommand[] commands = new BatchCommand[batchNodes.size()]; + List bns = BatchNodeList.generate(cluster, batchPolicy, keys, records, attr.hasWrite, status); + IBatchCommand[] commands = new IBatchCommand[bns.size()]; int count = 0; - for (BatchNode batchNode : batchNodes) { - commands[count++] = new Batch.OperateArrayCommand(cluster, batchNode, batchPolicy, keys, null, records, attr, status); + for (BatchNode bn : bns) { + if (bn.offsetsSize == 1) { + int i = bn.offsets[0]; + commands[count++] = new BatchSingle.Delete( + cluster, batchPolicy, attr, records[i], status, bn.node); + } + else { + commands[count++] = new Batch.OperateArrayCommand( + cluster, bn, batchPolicy, keys, null, records, attr, status); + } } - BatchExecutor.execute(cluster, batchPolicy, commands, status); return new BatchResults(records, status.getStatus()); } @@ -825,7 +837,30 @@ public final void delete( BatchAttr attr = new BatchAttr(); attr.setDelete(deletePolicy); - new AsyncBatch.OperateRecordArrayExecutor(eventLoop, cluster, batchPolicy, listener, keys, null, attr); + BatchRecord[] records = new BatchRecord[keys.length]; + + for (int i = 0; i < keys.length; i++) { + records[i] = new BatchRecord(keys[i], attr.hasWrite); + } + + AsyncBatchExecutor.BatchRecordArray executor = new AsyncBatchExecutor.BatchRecordArray( + eventLoop, cluster, listener, records); + List bns = BatchNodeList.generate(cluster, batchPolicy, keys, records, attr.hasWrite, executor); + AsyncCommand[] commands = new AsyncCommand[bns.size()]; + int count = 0; + + for (BatchNode bn : bns) { + if (bn.offsetsSize == 1) { + int i = bn.offsets[0]; + 
commands[count++] = new AsyncBatchSingle.Delete( + executor, cluster, batchPolicy, attr, records[i], bn.node); + } + else { + commands[count++] = new AsyncBatch.OperateRecordArrayCommand( + executor, bn, batchPolicy, keys, null, records, attr); + } + } + executor.execute(commands); } /** @@ -874,7 +909,25 @@ public final void delete( BatchAttr attr = new BatchAttr(); attr.setDelete(deletePolicy); - new AsyncBatch.OperateRecordSequenceExecutor(eventLoop, cluster, batchPolicy, listener, keys, null, attr); + boolean[] sent = new boolean[keys.length]; + AsyncBatchExecutor.BatchRecordSequence executor = new AsyncBatchExecutor.BatchRecordSequence( + eventLoop, cluster, listener, sent); + List bns = BatchNodeList.generate(cluster, batchPolicy, keys, null, attr.hasWrite, executor); + AsyncCommand[] commands = new AsyncCommand[bns.size()]; + int count = 0; + + for (BatchNode bn : bns) { + if (bn.offsetsSize == 1) { + int i = bn.offsets[0]; + commands[count++] = new AsyncBatchSingle.DeleteSequenceSent( + executor, cluster, batchPolicy, keys[i], attr, bn.node, listener, i); + } + else { + commands[count++] = new AsyncBatch.OperateRecordSequenceCommand( + executor, bn, batchPolicy, keys, null, sent, listener, attr); + } + } + executor.execute(commands); } /** @@ -1051,12 +1104,20 @@ public final boolean[] exists(BatchPolicy policy, Key[] keys) try { BatchStatus status = new BatchStatus(false); - List batchNodes = BatchNodeList.generate(cluster, policy, keys, null, false, status); - BatchCommand[] commands = new BatchCommand[batchNodes.size()]; + List bns = BatchNodeList.generate(cluster, policy, keys, null, false, status); + IBatchCommand[] commands = new IBatchCommand[bns.size()]; int count = 0; - for (BatchNode batchNode : batchNodes) { - commands[count++] = new Batch.ExistsArrayCommand(cluster, batchNode, policy, keys, existsArray, status); + for (BatchNode bn : bns) { + if (bn.offsetsSize == 1) { + int i = bn.offsets[0]; + commands[count++] = new BatchSingle.Exists( + 
cluster, policy, keys[i], existsArray, i, status, bn.node); + } + else { + commands[count++] = new Batch.ExistsArrayCommand( + cluster, bn, policy, keys, existsArray, status); + } } BatchExecutor.execute(cluster, policy, commands, status); return existsArray; @@ -1094,7 +1155,26 @@ public final void exists(EventLoop eventLoop, ExistsArrayListener listener, Batc if (policy == null) { policy = batchPolicyDefault; } - new AsyncBatch.ExistsArrayExecutor(eventLoop, cluster, policy, keys, listener); + + boolean[] existsArray = new boolean[keys.length]; + AsyncBatchExecutor.ExistsArray executor = new AsyncBatchExecutor.ExistsArray( + eventLoop, cluster, listener, keys, existsArray); + List bns = BatchNodeList.generate(cluster, policy, keys, null, false, executor); + AsyncCommand[] commands = new AsyncCommand[bns.size()]; + int count = 0; + + for (BatchNode bn : bns) { + if (bn.offsetsSize == 1) { + int i = bn.offsets[0]; + commands[count++] = new AsyncBatchSingle.Exists( + executor, cluster, policy, keys[i], bn.node, existsArray, i); + } + else { + commands[count++] = new AsyncBatch.ExistsArrayCommand( + executor, bn, policy, keys, existsArray); + } + } + executor.execute(commands); } /** @@ -1125,7 +1205,25 @@ public final void exists(EventLoop eventLoop, ExistsSequenceListener listener, B if (policy == null) { policy = batchPolicyDefault; } - new AsyncBatch.ExistsSequenceExecutor(eventLoop, cluster, policy, keys, listener); + + AsyncBatchExecutor.ExistsSequence executor = new AsyncBatchExecutor.ExistsSequence( + eventLoop, cluster, listener); + List bns = BatchNodeList.generate(cluster, policy, keys, null, false, executor); + AsyncCommand[] commands = new AsyncCommand[bns.size()]; + int count = 0; + + for (BatchNode bn : bns) { + if (bn.offsetsSize == 1) { + int i = bn.offsets[0]; + commands[count++] = new AsyncBatchSingle.ExistsSequence( + executor, cluster, policy, keys[i], bn.node, listener); + } + else { + commands[count++] = new AsyncBatch.ExistsSequenceCommand( + 
executor, bn, policy, keys, listener); + } + } + executor.execute(commands); } //------------------------------------------------------- @@ -1299,12 +1397,18 @@ public final boolean get(BatchPolicy policy, List records) } BatchStatus status = new BatchStatus(true); - List batchNodes = BatchNodeList.generate(cluster, policy, records, status); - BatchCommand[] commands = new BatchCommand[batchNodes.size()]; + List bns = BatchNodeList.generate(cluster, policy, records, status); + IBatchCommand[] commands = new IBatchCommand[bns.size()]; int count = 0; - for (BatchNode batchNode : batchNodes) { - commands[count++] = new Batch.ReadListCommand(cluster, batchNode, policy, records, status); + for (BatchNode bn : bns) { + if (bn.offsetsSize == 1) { + int i = bn.offsets[0]; + commands[count++] = new BatchSingle.ReadRecord(cluster, policy, records.get(i), status, bn.node); + } + else { + commands[count++] = new Batch.ReadListCommand(cluster, bn, policy, records, status); + } } BatchExecutor.execute(cluster, policy, commands, status); return status.getStatus(); @@ -1341,7 +1445,22 @@ public final void get(EventLoop eventLoop, BatchListListener listener, BatchPoli if (policy == null) { policy = batchPolicyDefault; } - new AsyncBatch.ReadListExecutor(eventLoop, cluster, policy, listener, records); + + AsyncBatchExecutor.ReadList executor = new AsyncBatchExecutor.ReadList(eventLoop, cluster, listener, records); + List bns = BatchNodeList.generate(cluster, policy, records, executor); + AsyncCommand[] commands = new AsyncCommand[bns.size()]; + int count = 0; + + for (BatchNode bn : bns) { + if (bn.offsetsSize == 1) { + int i = bn.offsets[0]; + commands[count++] = new AsyncBatchSingle.Read(executor, cluster, policy, records.get(i), bn.node); + } + else { + commands[count++] = new AsyncBatch.ReadListCommand(executor, bn, policy, records); + } + } + executor.execute(commands); } /** @@ -1375,7 +1494,24 @@ public final void get(EventLoop eventLoop, BatchSequenceListener listener, Batch 
if (policy == null) { policy = batchPolicyDefault; } - new AsyncBatch.ReadSequenceExecutor(eventLoop, cluster, policy, listener, records); + + AsyncBatchExecutor.ReadSequence executor = new AsyncBatchExecutor.ReadSequence(eventLoop, cluster, listener); + List bns = BatchNodeList.generate(cluster, policy, records, executor); + AsyncCommand[] commands = new AsyncCommand[bns.size()]; + int count = 0; + + for (BatchNode bn : bns) { + if (bn.offsetsSize == 1) { + int i = bn.offsets[0]; + commands[count++] = new AsyncBatchSingle.ReadGetSequence( + executor, cluster, policy, records.get(i), bn.node, listener); + } + else { + commands[count++] = new AsyncBatch.ReadSequenceCommand( + executor, bn, policy, listener, records); + } + } + executor.execute(commands); } /** @@ -1402,12 +1538,21 @@ public final Record[] get(BatchPolicy policy, Key[] keys) try { BatchStatus status = new BatchStatus(false); - List batchNodes = BatchNodeList.generate(cluster, policy, keys, null, false, status); - BatchCommand[] commands = new BatchCommand[batchNodes.size()]; + List bns = BatchNodeList.generate(cluster, policy, keys, null, false, status); + IBatchCommand[] commands = new IBatchCommand[bns.size()]; int count = 0; - for (BatchNode batchNode : batchNodes) { - commands[count++] = new Batch.GetArrayCommand(cluster, batchNode, policy, keys, null, null, records, Command.INFO1_READ | Command.INFO1_GET_ALL, false, status); + for (BatchNode bn : bns) { + if (bn.offsetsSize == 1) { + int i = bn.offsets[0]; + commands[count++] = new BatchSingle.Read( + cluster, policy, keys[i], null, records, i, status, bn.node, false); + } + else { + commands[count++] = new Batch.GetArrayCommand( + cluster, bn, policy, keys, null, null, records, Command.INFO1_READ | Command.INFO1_GET_ALL, + false, status); + } } BatchExecutor.execute(cluster, policy, commands, status); return records; @@ -1446,7 +1591,26 @@ public final void get(EventLoop eventLoop, RecordArrayListener listener, BatchPo if (policy == null) { 
policy = batchPolicyDefault; } - new AsyncBatch.GetArrayExecutor(eventLoop, cluster, policy, listener, keys, null, null, Command.INFO1_READ | Command.INFO1_GET_ALL, false); + + Record[] records = new Record[keys.length]; + AsyncBatchExecutor.GetArray executor = new AsyncBatchExecutor.GetArray( + eventLoop, cluster, listener, keys, records); + List bns = BatchNodeList.generate(cluster, policy, keys, null, false, executor); + AsyncCommand[] commands = new AsyncCommand[bns.size()]; + int count = 0; + + for (BatchNode bn : bns) { + if (bn.offsetsSize == 1) { + int i = bn.offsets[0]; + commands[count++] = new AsyncBatchSingle.Get( + executor, cluster, policy, keys[i], null, records, bn.node, i, false); + } + else { + commands[count++] = new AsyncBatch.GetArrayCommand( + executor, bn, policy, keys, null, null, records, Command.INFO1_READ | Command.INFO1_GET_ALL, false); + } + } + executor.execute(commands); } /** @@ -1478,7 +1642,25 @@ public final void get(EventLoop eventLoop, RecordSequenceListener listener, Batc if (policy == null) { policy = batchPolicyDefault; } - new AsyncBatch.GetSequenceExecutor(eventLoop, cluster, policy, listener, keys, null, null, Command.INFO1_READ | Command.INFO1_GET_ALL, false); + + AsyncBatchExecutor.GetSequence executor = new AsyncBatchExecutor.GetSequence(eventLoop, cluster, listener); + List bns = BatchNodeList.generate(cluster, policy, keys, null, false, executor); + AsyncCommand[] commands = new AsyncCommand[bns.size()]; + int count = 0; + + for (BatchNode bn : bns) { + if (bn.offsetsSize == 1) { + int i = bn.offsets[0]; + commands[count++] = new AsyncBatchSingle.GetSequence( + executor, cluster, policy, listener, keys[i], null, bn.node, false); + } + else { + commands[count++] = new AsyncBatch.GetSequenceCommand( + executor, bn, policy, keys, null, null, listener, Command.INFO1_READ | Command.INFO1_GET_ALL, + false); + } + } + executor.execute(commands); } /** @@ -1506,12 +1688,20 @@ public final Record[] get(BatchPolicy policy, 
Key[] keys, String... binNames) try { BatchStatus status = new BatchStatus(false); - List batchNodes = BatchNodeList.generate(cluster, policy, keys, null, false, status); - BatchCommand[] commands = new BatchCommand[batchNodes.size()]; + List bns = BatchNodeList.generate(cluster, policy, keys, null, false, status); + IBatchCommand[] commands = new IBatchCommand[bns.size()]; int count = 0; - for (BatchNode batchNode : batchNodes) { - commands[count++] = new Batch.GetArrayCommand(cluster, batchNode, policy, keys, binNames, null, records, Command.INFO1_READ, false, status); + for (BatchNode bn : bns) { + if (bn.offsetsSize == 1) { + int i = bn.offsets[0]; + commands[count++] = new BatchSingle.Read( + cluster, policy, keys[i], binNames, records, i, status, bn.node, false); + } + else { + commands[count++] = new Batch.GetArrayCommand( + cluster, bn, policy, keys, binNames, null, records, Command.INFO1_READ, false, status); + } } BatchExecutor.execute(cluster, policy, commands, status); return records; @@ -1551,7 +1741,26 @@ public final void get(EventLoop eventLoop, RecordArrayListener listener, BatchPo if (policy == null) { policy = batchPolicyDefault; } - new AsyncBatch.GetArrayExecutor(eventLoop, cluster, policy, listener, keys, binNames, null, Command.INFO1_READ, false); + + Record[] records = new Record[keys.length]; + AsyncBatchExecutor.GetArray executor = new AsyncBatchExecutor.GetArray( + eventLoop, cluster, listener, keys, records); + List bns = BatchNodeList.generate(cluster, policy, keys, null, false, executor); + AsyncCommand[] commands = new AsyncCommand[bns.size()]; + int count = 0; + + for (BatchNode bn : bns) { + if (bn.offsetsSize == 1) { + int i = bn.offsets[0]; + commands[count++] = new AsyncBatchSingle.Get( + executor, cluster, policy, keys[i], binNames, records, bn.node, i, false); + } + else { + commands[count++] = new AsyncBatch.GetArrayCommand( + executor, bn, policy, keys, binNames, null, records, Command.INFO1_READ, false); + } + } + 
executor.execute(commands); } /** @@ -1584,7 +1793,24 @@ public final void get(EventLoop eventLoop, RecordSequenceListener listener, Batc if (policy == null) { policy = batchPolicyDefault; } - new AsyncBatch.GetSequenceExecutor(eventLoop, cluster, policy, listener, keys, binNames, null, Command.INFO1_READ, false); + + AsyncBatchExecutor.GetSequence executor = new AsyncBatchExecutor.GetSequence(eventLoop, cluster, listener); + List bns = BatchNodeList.generate(cluster, policy, keys, null, false, executor); + AsyncCommand[] commands = new AsyncCommand[bns.size()]; + int count = 0; + + for (BatchNode bn : bns) { + if (bn.offsetsSize == 1) { + int i = bn.offsets[0]; + commands[count++] = new AsyncBatchSingle.GetSequence( + executor, cluster, policy, listener, keys[i], binNames, bn.node, false); + } + else { + commands[count++] = new AsyncBatch.GetSequenceCommand( + executor, bn, policy, keys, binNames, null, listener, Command.INFO1_READ, false); + } + } + executor.execute(commands); } /** @@ -1612,12 +1838,20 @@ public final Record[] get(BatchPolicy policy, Key[] keys, Operation... 
ops) try { BatchStatus status = new BatchStatus(false); - List batchNodes = BatchNodeList.generate(cluster, policy, keys, null, false, status); - BatchCommand[] commands = new BatchCommand[batchNodes.size()]; + List bns = BatchNodeList.generate(cluster, policy, keys, null, false, status); + IBatchCommand[] commands = new IBatchCommand[bns.size()]; int count = 0; - for (BatchNode batchNode : batchNodes) { - commands[count++] = new Batch.GetArrayCommand(cluster, batchNode, policy, keys, null, ops, records, Command.INFO1_READ, true, status); + for (BatchNode bn : bns) { + if (bn.offsetsSize == 1) { + int i = bn.offsets[0]; + commands[count++] = new BatchSingle.OperateRead( + cluster, policy, keys[i], ops, records, i, status, bn.node); + } + else { + commands[count++] = new Batch.GetArrayCommand( + cluster, bn, policy, keys, null, ops, records, Command.INFO1_READ, true, status); + } } BatchExecutor.execute(cluster, policy, commands, status); return records; @@ -1657,7 +1891,26 @@ public final void get(EventLoop eventLoop, RecordArrayListener listener, BatchPo if (policy == null) { policy = batchPolicyDefault; } - new AsyncBatch.GetArrayExecutor(eventLoop, cluster, policy, listener, keys, null, ops, Command.INFO1_READ, true); + + Record[] records = new Record[keys.length]; + AsyncBatchExecutor.GetArray executor = new AsyncBatchExecutor.GetArray( + eventLoop, cluster, listener, keys, records); + List bns = BatchNodeList.generate(cluster, policy, keys, null, false, executor); + AsyncCommand[] commands = new AsyncCommand[bns.size()]; + int count = 0; + + for (BatchNode bn : bns) { + if (bn.offsetsSize == 1) { + int i = bn.offsets[0]; + commands[count++] = new AsyncBatchSingle.OperateGet( + executor, cluster, policy, keys[i], ops, records, bn.node, i); + } + else { + commands[count++] = new AsyncBatch.GetArrayCommand( + executor, bn, policy, keys, null, ops, records, Command.INFO1_READ, true); + } + } + executor.execute(commands); } /** @@ -1690,7 +1943,24 @@ public final 
void get(EventLoop eventLoop, RecordSequenceListener listener, Batc if (policy == null) { policy = batchPolicyDefault; } - new AsyncBatch.GetSequenceExecutor(eventLoop, cluster, policy, listener, keys, null, ops, Command.INFO1_READ, true); + + AsyncBatchExecutor.GetSequence executor = new AsyncBatchExecutor.GetSequence(eventLoop, cluster, listener); + List bns = BatchNodeList.generate(cluster, policy, keys, null, false, executor); + AsyncCommand[] commands = new AsyncCommand[bns.size()]; + int count = 0; + + for (BatchNode bn : bns) { + if (bn.offsetsSize == 1) { + int i = bn.offsets[0]; + commands[count++] = new AsyncBatchSingle.OperateGetSequence( + executor, cluster, policy, listener, keys[i], ops, bn.node); + } + else { + commands[count++] = new AsyncBatch.GetSequenceCommand( + executor, bn, policy, keys, null, ops, listener, Command.INFO1_READ, true); + } + } + executor.execute(commands); } /** @@ -1717,12 +1987,21 @@ public final Record[] getHeader(BatchPolicy policy, Key[] keys) try { BatchStatus status = new BatchStatus(false); - List batchNodes = BatchNodeList.generate(cluster, policy, keys, null, false, status); - BatchCommand[] commands = new BatchCommand[batchNodes.size()]; + List bns = BatchNodeList.generate(cluster, policy, keys, null, false, status); + IBatchCommand[] commands = new IBatchCommand[bns.size()]; int count = 0; - for (BatchNode batchNode : batchNodes) { - commands[count++] = new Batch.GetArrayCommand(cluster, batchNode, policy, keys, null, null, records, Command.INFO1_READ | Command.INFO1_NOBINDATA, false, status); + for (BatchNode bn : bns) { + if (bn.offsetsSize == 1) { + int i = bn.offsets[0]; + commands[count++] = new BatchSingle.ReadHeader( + cluster, policy, keys[i], records, i, status, bn.node); + } + else { + commands[count++] = new Batch.GetArrayCommand( + cluster, bn, policy, keys, null, null, records, Command.INFO1_READ | Command.INFO1_NOBINDATA, + false, status); + } } BatchExecutor.execute(cluster, policy, commands, status); 
return records; @@ -1761,7 +2040,27 @@ public final void getHeader(EventLoop eventLoop, RecordArrayListener listener, B if (policy == null) { policy = batchPolicyDefault; } - new AsyncBatch.GetArrayExecutor(eventLoop, cluster, policy, listener, keys, null, null, Command.INFO1_READ | Command.INFO1_NOBINDATA, false); + + Record[] records = new Record[keys.length]; + AsyncBatchExecutor.GetArray executor = new AsyncBatchExecutor.GetArray( + eventLoop, cluster, listener, keys, records); + List bns = BatchNodeList.generate(cluster, policy, keys, null, false, executor); + AsyncCommand[] commands = new AsyncCommand[bns.size()]; + int count = 0; + + for (BatchNode bn : bns) { + if (bn.offsetsSize == 1) { + int i = bn.offsets[0]; + commands[count++] = new AsyncBatchSingle.ReadHeader( + executor, cluster, policy, keys[i], records, bn.node, i); + } + else { + commands[count++] = new AsyncBatch.GetArrayCommand( + executor, bn, policy, keys, null, null, records, Command.INFO1_READ | Command.INFO1_NOBINDATA, + false); + } + } + executor.execute(commands); } /** @@ -1793,7 +2092,25 @@ public final void getHeader(EventLoop eventLoop, RecordSequenceListener listener if (policy == null) { policy = batchPolicyDefault; } - new AsyncBatch.GetSequenceExecutor(eventLoop, cluster, policy, listener, keys, null, null, Command.INFO1_READ | Command.INFO1_NOBINDATA, false); + + AsyncBatchExecutor.GetSequence executor = new AsyncBatchExecutor.GetSequence(eventLoop, cluster, listener); + List bns = BatchNodeList.generate(cluster, policy, keys, null, false, executor); + AsyncCommand[] commands = new AsyncCommand[bns.size()]; + int count = 0; + + for (BatchNode bn : bns) { + if (bn.offsetsSize == 1) { + int i = bn.offsets[0]; + commands[count++] = new AsyncBatchSingle.ReadHeaderSequence( + executor, cluster, policy, keys[i], bn.node, listener); + } + else { + commands[count++] = new AsyncBatch.GetSequenceCommand( + executor, bn, policy, keys, null, null, listener, Command.INFO1_READ | 
Command.INFO1_NOBINDATA, + false); + } + } + executor.execute(commands); } //------------------------------------------------------- @@ -1820,7 +2137,7 @@ public final void getHeader(EventLoop eventLoop, RecordSequenceListener listener */ public final Record operate(WritePolicy policy, Key key, Operation... operations) throws AerospikeException { - OperateArgs args = new OperateArgs(policy, writePolicyDefault, operatePolicyReadDefault, key, operations); + OperateArgs args = new OperateArgs(policy, writePolicyDefault, operatePolicyReadDefault, operations); OperateCommand command = new OperateCommand(cluster, key, args); command.execute(); return command.getRecord(); @@ -1855,7 +2172,7 @@ public final void operate(EventLoop eventLoop, RecordListener listener, WritePol eventLoop = cluster.eventLoops.next(); } - OperateArgs args = new OperateArgs(policy, writePolicyDefault, operatePolicyReadDefault, key, operations); + OperateArgs args = new OperateArgs(policy, writePolicyDefault, operatePolicyReadDefault, operations); AsyncOperate command = new AsyncOperate(cluster, listener, key, args); eventLoop.execute(cluster, command); } @@ -1890,12 +2207,77 @@ public final boolean operate(BatchPolicy policy, List records) } BatchStatus status = new BatchStatus(true); - List batchNodes = BatchNodeList.generate(cluster, policy, records, status); - BatchCommand[] commands = new BatchCommand[batchNodes.size()]; + List bns = BatchNodeList.generate(cluster, policy, records, status); + IBatchCommand[] commands = new IBatchCommand[bns.size()]; int count = 0; - for (BatchNode batchNode : batchNodes) { - commands[count++] = new Batch.OperateListCommand(cluster, batchNode, policy, records, status); + for (BatchNode bn : bns) { + if (bn.offsetsSize == 1) { + int i = bn.offsets[0]; + BatchRecord record = records.get(i); + + switch (record.getType()) { + case BATCH_READ: { + BatchRead br = (BatchRead)record; + commands[count++] = new BatchSingle.ReadRecord(cluster, policy, br, status, 
bn.node); + break; + } + + case BATCH_WRITE: { + BatchWrite bw = (BatchWrite)record; + BatchAttr attr = new BatchAttr(); + + if (bw.policy != null) { + attr.setWrite(bw.policy); + } + else { + attr.setWrite(policy); + } + attr.adjustWrite(bw.ops); + attr.setOpSize(bw.ops); + commands[count++] = new BatchSingle.OperateBatchRecord( + cluster, policy, bw.ops, attr, record, status, bn.node); + break; + } + + case BATCH_UDF: { + BatchUDF bu = (BatchUDF)record; + BatchAttr attr = new BatchAttr(); + + if (bu.policy != null) { + attr.setUDF(bu.policy); + } + else { + attr.setUDF(policy); + } + commands[count++] = new BatchSingle.UDF( + cluster, policy, bu.packageName, bu.functionName, bu.functionArgs, attr, record, status, + bn.node); + break; + } + + case BATCH_DELETE: { + BatchDelete bd = (BatchDelete)record; + BatchAttr attr = new BatchAttr(); + + if (bd.policy != null) { + attr.setDelete(bd.policy); + } + else { + attr.setDelete(policy); + } + commands[count++] = new BatchSingle.Delete(cluster, policy, attr, record, status, bn.node); + break; + } + + default: { + throw new AerospikeException("Invalid batch type: " + record.getType()); + } + } + } + else { + commands[count++] = new Batch.OperateListCommand(cluster, bn, policy, records, status); + } } BatchExecutor.execute(cluster, policy, commands, status); return status.getStatus(); @@ -1939,7 +2321,80 @@ public final void operate( if (policy == null) { policy = batchParentPolicyWriteDefault; } - new AsyncBatch.OperateListExecutor(eventLoop, cluster, policy, listener, records); + + AsyncBatchExecutor.OperateList executor = new AsyncBatchExecutor.OperateList( + eventLoop, cluster, listener, records); + List bns = BatchNodeList.generate(cluster, policy, records, executor); + AsyncCommand[] commands = new AsyncCommand[bns.size()]; + int count = 0; + + for (BatchNode bn : bns) { + if (bn.offsetsSize == 1) { + int i = bn.offsets[0]; + BatchRecord record = records.get(i); + + switch (record.getType()) { + case BATCH_READ: { 
+ BatchRead br = (BatchRead)record; + commands[count++] = new AsyncBatchSingle.Read(executor, cluster, policy, br, bn.node); + break; + } + + case BATCH_WRITE: { + BatchWrite bw = (BatchWrite)record; + BatchAttr attr = new BatchAttr(); + + if (bw.policy != null) { + attr.setWrite(bw.policy); + } + else { + attr.setWrite(policy); + } + attr.adjustWrite(bw.ops); + attr.setOpSize(bw.ops); + commands[count++] = new AsyncBatchSingle.Write(executor, cluster, policy, attr, bw, bn.node); + break; + } + + case BATCH_UDF: { + BatchUDF bu = (BatchUDF)record; + BatchAttr attr = new BatchAttr(); + + if (bu.policy != null) { + attr.setUDF(bu.policy); + } + else { + attr.setUDF(policy); + } + commands[count++] = new AsyncBatchSingle.UDF(executor, cluster, policy, attr, bu, bn.node); + break; + } + + case BATCH_DELETE: { + BatchDelete bd = (BatchDelete)record; + BatchAttr attr = new BatchAttr(); + + if (bd.policy != null) { + attr.setDelete(bd.policy); + } + else { + attr.setDelete(policy); + } + commands[count++] = new AsyncBatchSingle.Delete(executor, cluster, policy, attr, record, + bn.node); + break; + } + + default: { + throw new AerospikeException("Invalid batch type: " + record.getType()); + } + } + } + else { + commands[count++] = new AsyncBatch.OperateListCommand(executor, bn, policy, records); + } + } + executor.execute(commands); } /** @@ -1980,7 +2435,83 @@ public final void operate( if (policy == null) { policy = batchParentPolicyWriteDefault; } - new AsyncBatch.OperateSequenceExecutor(eventLoop, cluster, policy, listener, records); + + AsyncBatchExecutor.OperateSequence executor = new AsyncBatchExecutor.OperateSequence( + eventLoop, cluster, listener); + List bns = BatchNodeList.generate(cluster, policy, records, executor); + AsyncCommand[] commands = new AsyncCommand[bns.size()]; + int count = 0; + + for (BatchNode bn : bns) { + if (bn.offsetsSize == 1) { + int i = bn.offsets[0]; + BatchRecord record = records.get(i); + + switch (record.getType()) { + case 
BATCH_READ: { + BatchRead br = (BatchRead)record; + commands[count++] = new AsyncBatchSingle.ReadSequence( + executor, cluster, policy, br, bn.node, listener, i); + break; + } + + case BATCH_WRITE: { + BatchWrite bw = (BatchWrite)record; + BatchAttr attr = new BatchAttr(); + + if (bw.policy != null) { + attr.setWrite(bw.policy); + } + else { + attr.setWrite(policy); + } + attr.adjustWrite(bw.ops); + attr.setOpSize(bw.ops); + commands[count++] = new AsyncBatchSingle.WriteSequence( + executor, cluster, policy, attr, bw, bn.node, listener, i); + break; + } + + case BATCH_UDF: { + BatchUDF bu = (BatchUDF)record; + BatchAttr attr = new BatchAttr(); + + if (bu.policy != null) { + attr.setUDF(bu.policy); + } + else { + attr.setUDF(policy); + } + commands[count++] = new AsyncBatchSingle.UDFSequence( + executor, cluster, policy, attr, bu, bn.node, listener, i); + break; + } + + case BATCH_DELETE: { + BatchDelete bd = (BatchDelete)record; + BatchAttr attr = new BatchAttr(); + + if (bd.policy != null) { + attr.setDelete(bd.policy); + } + else { + attr.setDelete(policy); + } + commands[count++] = new AsyncBatchSingle.DeleteSequence( + executor, cluster, policy, attr, bd, bn.node, listener, i); + break; + } + + default: { + throw new AerospikeException("Invalid batch type: " + record.getType()); + } + } + } + else { + commands[count++] = new AsyncBatch.OperateSequenceCommand(executor, bn, policy, listener, records); + } + } + executor.execute(commands); } /** @@ -2025,14 +2556,27 @@ public final BatchResults operate( try { BatchStatus status = new BatchStatus(true); - List batchNodes = BatchNodeList.generate(cluster, batchPolicy, keys, records, attr.hasWrite, status); - BatchCommand[] commands = new BatchCommand[batchNodes.size()]; + List bns = BatchNodeList.generate(cluster, batchPolicy, keys, records, attr.hasWrite, status); + IBatchCommand[] commands = new IBatchCommand[bns.size()]; int count = 0; - - for (BatchNode batchNode : batchNodes) { - commands[count++] = new 
Batch.OperateArrayCommand(cluster, batchNode, batchPolicy, keys, ops, records, attr, status); + boolean opSizeSet = false; + + for (BatchNode bn : bns) { + if (bn.offsetsSize == 1) { + if (! opSizeSet) { + attr.setOpSize(ops); + opSizeSet = true; + } + + int i = bn.offsets[0]; + commands[count++] = new BatchSingle.OperateBatchRecord( + cluster, batchPolicy, ops, attr, records[i], status, bn.node); + } + else { + commands[count++] = new Batch.OperateArrayCommand( + cluster, bn, batchPolicy, keys, ops, records, attr, status); + } } - BatchExecutor.execute(cluster, batchPolicy, commands, status); return new BatchResults(records, status.getStatus()); } @@ -2089,7 +2633,30 @@ public final void operate( } BatchAttr attr = new BatchAttr(batchPolicy, writePolicy, ops); - new AsyncBatch.OperateRecordArrayExecutor(eventLoop, cluster, batchPolicy, listener, keys, ops, attr); + BatchRecord[] records = new BatchRecord[keys.length]; + + for (int i = 0; i < keys.length; i++) { + records[i] = new BatchRecord(keys[i], attr.hasWrite); + } + + AsyncBatchExecutor.BatchRecordArray executor = new AsyncBatchExecutor.BatchRecordArray( + eventLoop, cluster, listener, records); + List bns = BatchNodeList.generate(cluster, batchPolicy, keys, records, attr.hasWrite, executor); + AsyncCommand[] commands = new AsyncCommand[bns.size()]; + int count = 0; + + for (BatchNode bn : bns) { + if (bn.offsetsSize == 1) { + int i = bn.offsets[0]; + commands[count++] = new AsyncBatchSingle.Operate( + executor, cluster, batchPolicy, attr, records[i], ops, bn.node); + } + else { + commands[count++] = new AsyncBatch.OperateRecordArrayCommand( + executor, bn, batchPolicy, keys, ops, records, attr); + } + } + executor.execute(commands); } /** @@ -2141,7 +2708,25 @@ public final void operate( } BatchAttr attr = new BatchAttr(batchPolicy, writePolicy, ops); - new AsyncBatch.OperateRecordSequenceExecutor(eventLoop, cluster, batchPolicy, listener, keys, ops, attr); + boolean[] sent = new boolean[keys.length]; + 
AsyncBatchExecutor.BatchRecordSequence executor = new AsyncBatchExecutor.BatchRecordSequence( + eventLoop, cluster, listener, sent); + List bns = BatchNodeList.generate(cluster, batchPolicy, keys, null, attr.hasWrite, executor); + AsyncCommand[] commands = new AsyncCommand[bns.size()]; + int count = 0; + + for (BatchNode bn : bns) { + if (bn.offsetsSize == 1) { + int i = bn.offsets[0]; + commands[count++] = new AsyncBatchSingle.OperateSequence( + executor, cluster, batchPolicy, keys[i], attr, ops, bn.node, listener, i); + } + else { + commands[count++] = new AsyncBatch.OperateRecordSequenceCommand( + executor, bn, batchPolicy, keys, ops, sent, listener, attr); + } + } + executor.execute(commands); } //------------------------------------------------------- @@ -2561,14 +3146,22 @@ public final BatchResults execute( try { BatchStatus status = new BatchStatus(true); - List batchNodes = BatchNodeList.generate(cluster, batchPolicy, keys, records, attr.hasWrite, status); - BatchCommand[] commands = new BatchCommand[batchNodes.size()]; + List bns = BatchNodeList.generate(cluster, batchPolicy, keys, records, attr.hasWrite, status); + IBatchCommand[] commands = new IBatchCommand[bns.size()]; int count = 0; - for (BatchNode batchNode : batchNodes) { - commands[count++] = new Batch.UDFCommand(cluster, batchNode, batchPolicy, keys, packageName, functionName, argBytes, records, attr, status); + for (BatchNode bn : bns) { + if (bn.offsetsSize == 1) { + int i = bn.offsets[0]; + commands[count++] = new BatchSingle.UDF( + cluster, batchPolicy, packageName, functionName, functionArgs, attr, records[i], status, + bn.node); + } + else { + commands[count++] = new Batch.UDFCommand( + cluster, bn, batchPolicy, keys, packageName, functionName, argBytes, records, attr, status); + } } - BatchExecutor.execute(cluster, batchPolicy, commands, status); return new BatchResults(records, status.getStatus()); } @@ -2632,7 +3225,30 @@ public final void execute( BatchAttr attr = new BatchAttr(); 
attr.setUDF(udfPolicy); - new AsyncBatch.UDFArrayExecutor(eventLoop, cluster, batchPolicy, listener, keys, packageName, functionName, argBytes, attr); + BatchRecord[] records = new BatchRecord[keys.length]; + + for (int i = 0; i < keys.length; i++) { + records[i] = new BatchRecord(keys[i], attr.hasWrite); + } + + AsyncBatchExecutor.BatchRecordArray executor = new AsyncBatchExecutor.BatchRecordArray( + eventLoop, cluster, listener, records); + List bns = BatchNodeList.generate(cluster, batchPolicy, keys, records, attr.hasWrite, executor); + AsyncCommand[] commands = new AsyncCommand[bns.size()]; + int count = 0; + + for (BatchNode bn : bns) { + if (bn.offsetsSize == 1) { + int i = bn.offsets[0]; + commands[count++] = new AsyncBatchSingle.UDFCommand( + executor, cluster, batchPolicy, attr, records[i], packageName, functionName, argBytes, bn.node); + } + else { + commands[count++] = new AsyncBatch.UDFArrayCommand( + executor, bn, batchPolicy, keys, packageName, functionName, argBytes, records, attr); + } + } + executor.execute(commands); } /** @@ -2690,7 +3306,25 @@ public final void execute( BatchAttr attr = new BatchAttr(); attr.setUDF(udfPolicy); - new AsyncBatch.UDFSequenceExecutor(eventLoop, cluster, batchPolicy, listener, keys, packageName, functionName, argBytes, attr); + boolean[] sent = new boolean[keys.length]; + AsyncBatchExecutor.BatchRecordSequence executor = new AsyncBatchExecutor.BatchRecordSequence( + eventLoop, cluster, listener, sent); + List bns = BatchNodeList.generate(cluster, batchPolicy, keys, null, attr.hasWrite, executor); + AsyncCommand[] commands = new AsyncCommand[bns.size()]; + int count = 0; + + for (BatchNode bn : bns) { + if (bn.offsetsSize == 1) { + int i = bn.offsets[0]; + commands[count++] = new AsyncBatchSingle.UDFSequenceCommand( + executor, cluster, batchPolicy, keys[i], attr, packageName, functionName, argBytes, bn.node, listener, i); + } + else { + commands[count++] = new AsyncBatch.UDFSequenceCommand( + executor, bn, 
batchPolicy, keys, packageName, functionName, argBytes, sent, listener, attr); + } + } + executor.execute(commands); } //---------------------------------------------------------- diff --git a/client/src/com/aerospike/client/async/AsyncBatch.java b/client/src/com/aerospike/client/async/AsyncBatch.java index e4ae780d5..734918c17 100644 --- a/client/src/com/aerospike/client/async/AsyncBatch.java +++ b/client/src/com/aerospike/client/async/AsyncBatch.java @@ -26,19 +26,13 @@ import com.aerospike.client.Operation; import com.aerospike.client.Record; import com.aerospike.client.ResultCode; -import com.aerospike.client.cluster.Cluster; import com.aerospike.client.command.BatchAttr; import com.aerospike.client.command.BatchNode; import com.aerospike.client.command.BatchNodeList; import com.aerospike.client.command.Command; -import com.aerospike.client.listener.BatchListListener; -import com.aerospike.client.listener.BatchOperateListListener; -import com.aerospike.client.listener.BatchRecordArrayListener; import com.aerospike.client.listener.BatchRecordSequenceListener; import com.aerospike.client.listener.BatchSequenceListener; -import com.aerospike.client.listener.ExistsArrayListener; import com.aerospike.client.listener.ExistsSequenceListener; -import com.aerospike.client.listener.RecordArrayListener; import com.aerospike.client.listener.RecordSequenceListener; import com.aerospike.client.metrics.LatencyType; import com.aerospike.client.policy.BatchPolicy; @@ -51,43 +45,7 @@ public final class AsyncBatch { // ReadList //------------------------------------------------------- - public static final class ReadListExecutor extends AsyncBatchExecutor { - private final BatchListListener listener; - private final List records; - - public ReadListExecutor( - EventLoop eventLoop, - Cluster cluster, - BatchPolicy policy, - BatchListListener listener, - List records - ) { - super(eventLoop, cluster, true); - this.listener = listener; - this.records = records; - - // Create 
commands. - List batchNodes = BatchNodeList.generate(cluster, policy, records, this); - AsyncBatchCommand[] tasks = new AsyncBatchCommand[batchNodes.size()]; - int count = 0; - - for (BatchNode batchNode : batchNodes) { - tasks[count++] = new ReadListCommand(this, batchNode, policy, records); - } - // Dispatch commands to nodes. - execute(tasks); - } - - protected void onSuccess() { - listener.onSuccess(records); - } - - protected void onFailure(AerospikeException ae) { - listener.onFailure(ae); - } - } - - private static final class ReadListCommand extends AsyncBatchCommand { + public static final class ReadListCommand extends AsyncBatchCommand { private final List records; public ReadListCommand( @@ -139,41 +97,7 @@ protected List generateBatchNodes() { // ReadSequence //------------------------------------------------------- - public static final class ReadSequenceExecutor extends AsyncBatchExecutor { - private final BatchSequenceListener listener; - - public ReadSequenceExecutor( - EventLoop eventLoop, - Cluster cluster, - BatchPolicy policy, - BatchSequenceListener listener, - List records - ) { - super(eventLoop, cluster, true); - this.listener = listener; - - // Create commands. - List batchNodes = BatchNodeList.generate(cluster, policy, records, this); - AsyncBatchCommand[] tasks = new AsyncBatchCommand[batchNodes.size()]; - int count = 0; - - for (BatchNode batchNode : batchNodes) { - tasks[count++] = new ReadSequenceCommand(this, batchNode, policy, listener, records); - } - // Dispatch commands to nodes. 
- execute(tasks); - } - - protected void onSuccess() { - listener.onSuccess(); - } - - protected void onFailure(AerospikeException ae) { - listener.onFailure(ae); - } - } - - private static final class ReadSequenceCommand extends AsyncBatchCommand { + public static final class ReadSequenceCommand extends AsyncBatchCommand { private final BatchSequenceListener listener; private final List records; @@ -229,49 +153,7 @@ protected List generateBatchNodes() { // GetArray //------------------------------------------------------- - public static final class GetArrayExecutor extends AsyncBatchExecutor { - private final RecordArrayListener listener; - private final Key[] keys; - private final Record[] recordArray; - - public GetArrayExecutor( - EventLoop eventLoop, - Cluster cluster, - BatchPolicy policy, - RecordArrayListener listener, - Key[] keys, - String[] binNames, - Operation[] ops, - int readAttr, - boolean isOperation - ) { - super(eventLoop, cluster, false); - this.listener = listener; - this.keys = keys; - this.recordArray = new Record[keys.length]; - - // Create commands. - List batchNodes = BatchNodeList.generate(cluster, policy, keys, null, false, this); - AsyncBatchCommand[] tasks = new AsyncBatchCommand[batchNodes.size()]; - int count = 0; - - for (BatchNode batchNode : batchNodes) { - tasks[count++] = new GetArrayCommand(this, batchNode, policy, keys, binNames, ops, recordArray, readAttr, isOperation); - } - // Dispatch commands to nodes. 
- execute(tasks); - } - - protected void onSuccess() { - listener.onSuccess(keys, recordArray); - } - - protected void onFailure(AerospikeException ae) { - listener.onFailure(new AerospikeException.BatchRecords(recordArray, ae)); - } - } - - private static final class GetArrayCommand extends AsyncBatchCommand { + public static final class GetArrayCommand extends AsyncBatchCommand { private final Key[] keys; private final String[] binNames; private final Operation[] ops; @@ -332,47 +214,7 @@ protected List generateBatchNodes() { // GetSequence //------------------------------------------------------- - public static final class GetSequenceExecutor extends AsyncBatchExecutor { - private final RecordSequenceListener listener; - - public GetSequenceExecutor( - EventLoop eventLoop, - Cluster cluster, - BatchPolicy policy, - RecordSequenceListener listener, - Key[] keys, - String[] binNames, - Operation[] ops, - int readAttr, - boolean isOperation - ) { - super(eventLoop, cluster, false); - this.listener = listener; - - // Create commands. - List batchNodes = BatchNodeList.generate(cluster, policy, keys, null, false, this); - AsyncBatchCommand[] tasks = new AsyncBatchCommand[batchNodes.size()]; - int count = 0; - - for (BatchNode batchNode : batchNodes) { - tasks[count++] = new GetSequenceCommand(this, batchNode, policy, keys, binNames, ops, listener, readAttr, isOperation); - } - // Dispatch commands to nodes. 
- execute(tasks); - } - - @Override - protected void onSuccess() { - listener.onSuccess(); - } - - @Override - protected void onFailure(AerospikeException ae) { - listener.onFailure(ae); - } - } - - private static final class GetSequenceCommand extends AsyncBatchCommand { + public static final class GetSequenceCommand extends AsyncBatchCommand { private final Key[] keys; private final String[] binNames; private final Operation[] ops; @@ -439,45 +281,7 @@ protected List generateBatchNodes() { // ExistsArray //------------------------------------------------------- - public static final class ExistsArrayExecutor extends AsyncBatchExecutor { - private final ExistsArrayListener listener; - private final Key[] keys; - private final boolean[] existsArray; - - public ExistsArrayExecutor( - EventLoop eventLoop, - Cluster cluster, - BatchPolicy policy, - Key[] keys, - ExistsArrayListener listener - ) { - super(eventLoop, cluster, false); - this.listener = listener; - this.keys = keys; - this.existsArray = new boolean[keys.length]; - - // Create commands. - List batchNodes = BatchNodeList.generate(cluster, policy, keys, null, false, this); - AsyncBatchCommand[] tasks = new AsyncBatchCommand[batchNodes.size()]; - int count = 0; - - for (BatchNode batchNode : batchNodes) { - tasks[count++] = new ExistsArrayCommand(this, batchNode, policy, keys, existsArray); - } - // Dispatch commands to nodes. 
- execute(tasks); - } - - protected void onSuccess() { - listener.onSuccess(keys, existsArray); - } - - protected void onFailure(AerospikeException ae) { - listener.onFailure(new AerospikeException.BatchExists(existsArray, ae)); - } - } - - private static final class ExistsArrayCommand extends AsyncBatchCommand { + public static final class ExistsArrayCommand extends AsyncBatchCommand { private final Key[] keys; private final boolean[] existsArray; @@ -530,41 +334,7 @@ protected List generateBatchNodes() { // ExistsSequence //------------------------------------------------------- - public static final class ExistsSequenceExecutor extends AsyncBatchExecutor { - private final ExistsSequenceListener listener; - - public ExistsSequenceExecutor( - EventLoop eventLoop, - Cluster cluster, - BatchPolicy policy, - Key[] keys, - ExistsSequenceListener listener - ) { - super(eventLoop, cluster, false); - this.listener = listener; - - // Create commands. - List batchNodes = BatchNodeList.generate(cluster, policy, keys, null, false, this); - AsyncBatchCommand[] tasks = new AsyncBatchCommand[batchNodes.size()]; - int count = 0; - - for (BatchNode batchNode : batchNodes) { - tasks[count++] = new ExistsSequenceCommand(this, batchNode, policy, keys, listener); - } - // Dispatch commands to nodes. 
- execute(tasks); - } - - protected void onSuccess() { - listener.onSuccess(); - } - - protected void onFailure(AerospikeException ae) { - listener.onFailure(ae); - } - } - - private static final class ExistsSequenceCommand extends AsyncBatchCommand { + public static final class ExistsSequenceCommand extends AsyncBatchCommand { private final Key[] keys; private final ExistsSequenceListener listener; @@ -618,43 +388,7 @@ protected List generateBatchNodes() { // OperateList //------------------------------------------------------- - public static final class OperateListExecutor extends AsyncBatchExecutor { - private final BatchOperateListListener listener; - private final List records; - - public OperateListExecutor( - EventLoop eventLoop, - Cluster cluster, - BatchPolicy policy, - BatchOperateListListener listener, - List records - ) { - super(eventLoop, cluster, true); - this.listener = listener; - this.records = records; - - // Create commands. - List batchNodes = BatchNodeList.generate(cluster, policy, records, this); - AsyncBatchCommand[] tasks = new AsyncBatchCommand[batchNodes.size()]; - int count = 0; - - for (BatchNode batchNode : batchNodes) { - tasks[count++] = new OperateListCommand(this, batchNode, policy, records); - } - // Dispatch commands to nodes. 
- execute(tasks); - } - - protected void onSuccess() { - listener.onSuccess(records, getStatus()); - } - - protected void onFailure(AerospikeException ae) { - listener.onFailure(ae); - } - } - - private static final class OperateListCommand extends AsyncBatchCommand { + public static final class OperateListCommand extends AsyncBatchCommand { private final List records; public OperateListCommand( @@ -738,41 +472,7 @@ protected List generateBatchNodes() { // OperateSequence //------------------------------------------------------- - public static final class OperateSequenceExecutor extends AsyncBatchExecutor { - private final BatchRecordSequenceListener listener; - - public OperateSequenceExecutor( - EventLoop eventLoop, - Cluster cluster, - BatchPolicy policy, - BatchRecordSequenceListener listener, - List records - ) { - super(eventLoop, cluster, true); - this.listener = listener; - - // Create commands. - List batchNodes = BatchNodeList.generate(cluster, policy, records, this); - AsyncBatchCommand[] tasks = new AsyncBatchCommand[batchNodes.size()]; - int count = 0; - - for (BatchNode batchNode : batchNodes) { - tasks[count++] = new OperateSequenceCommand(this, batchNode, policy, listener, records); - } - // Dispatch commands to nodes. 
- execute(tasks); - } - - protected void onSuccess() { - listener.onSuccess(); - } - - protected void onFailure(AerospikeException ae) { - listener.onFailure(ae); - } - } - - private static final class OperateSequenceCommand extends AsyncBatchCommand { + public static final class OperateSequenceCommand extends AsyncBatchCommand { private final BatchRecordSequenceListener listener; private final List records; @@ -861,49 +561,7 @@ protected List generateBatchNodes() { // OperateRecordArray //------------------------------------------------------- - public static final class OperateRecordArrayExecutor extends AsyncBatchExecutor { - private final BatchRecordArrayListener listener; - private final BatchRecord[] records; - - public OperateRecordArrayExecutor( - EventLoop eventLoop, - Cluster cluster, - BatchPolicy policy, - BatchRecordArrayListener listener, - Key[] keys, - Operation[] ops, - BatchAttr attr - ) { - super(eventLoop, cluster, true); - this.listener = listener; - this.records = new BatchRecord[keys.length]; - - for (int i = 0; i < keys.length; i++) { - this.records[i] = new BatchRecord(keys[i], attr.hasWrite); - } - - // Create commands. - List batchNodes = BatchNodeList.generate(cluster, policy, keys, records, attr.hasWrite, this); - AsyncBatchCommand[] tasks = new AsyncBatchCommand[batchNodes.size()]; - int count = 0; - - for (BatchNode batchNode : batchNodes) { - tasks[count++] = new OperateRecordArrayCommand(this, batchNode, policy, keys, ops, records, attr); - } - // Dispatch commands to nodes. 
- execute(tasks); - } - - protected void onSuccess() { - listener.onSuccess(records, getStatus()); - } - - protected void onFailure(AerospikeException ae) { - listener.onFailure(records, ae); - } - } - - private static final class OperateRecordArrayCommand extends AsyncBatchCommand { + public static final class OperateRecordArrayCommand extends AsyncBatchCommand { private final Key[] keys; private final Operation[] ops; private final BatchRecord[] records; @@ -980,54 +638,7 @@ protected List generateBatchNodes() { // OperateRecordSequence //------------------------------------------------------- - public static final class OperateRecordSequenceExecutor extends AsyncBatchExecutor { - private final BatchRecordSequenceListener listener; - private final boolean[] sent; - - public OperateRecordSequenceExecutor( - EventLoop eventLoop, - Cluster cluster, - BatchPolicy policy, - BatchRecordSequenceListener listener, - Key[] keys, - Operation[] ops, - BatchAttr attr - ) { - super(eventLoop, cluster, true); - this.listener = listener; - this.sent = new boolean[keys.length]; - - // Create commands. - List batchNodes = BatchNodeList.generate(cluster, policy, keys, null, attr.hasWrite, this); - AsyncBatchCommand[] tasks = new AsyncBatchCommand[batchNodes.size()]; - int count = 0; - - for (BatchNode batchNode : batchNodes) { - tasks[count++] = new OperateRecordSequenceCommand(this, batchNode, policy, keys, ops, sent, listener, attr); - } - // Dispatch commands to nodes. 
- execute(tasks); - } - - @Override - public void batchKeyError(Key key, int index, AerospikeException ae, boolean inDoubt, boolean hasWrite) { - BatchRecord record = new BatchRecord(key, null, ae.getResultCode(), inDoubt, hasWrite); - sent[index] = true; - AsyncBatch.onRecord(listener, record, index); - } - - @Override - protected void onSuccess() { - listener.onSuccess(); - } - - @Override - protected void onFailure(AerospikeException ae) { - listener.onFailure(ae); - } - } - - private static final class OperateRecordSequenceCommand extends AsyncBatchCommand { + public static final class OperateRecordSequenceCommand extends AsyncBatchCommand { private final Key[] keys; private final Operation[] ops; private final boolean[] sent; @@ -1107,50 +718,6 @@ protected List generateBatchNodes() { // UDFArray //------------------------------------------------------- - public static final class UDFArrayExecutor extends AsyncBatchExecutor { - private final BatchRecordArrayListener listener; - private final BatchRecord[] recordArray; - - public UDFArrayExecutor( - EventLoop eventLoop, - Cluster cluster, - BatchPolicy policy, - BatchRecordArrayListener listener, - Key[] keys, - String packageName, - String functionName, - byte[] argBytes, - BatchAttr attr - ) { - super(eventLoop, cluster, true); - this.listener = listener; - this.recordArray = new BatchRecord[keys.length]; - - for (int i = 0; i < keys.length; i++) { - this.recordArray[i] = new BatchRecord(keys[i], attr.hasWrite); - } - - // Create commands. - List batchNodes = BatchNodeList.generate(cluster, policy, keys, recordArray, attr.hasWrite, this); - AsyncBatchCommand[] tasks = new AsyncBatchCommand[batchNodes.size()]; - int count = 0; - - for (BatchNode batchNode : batchNodes) { - tasks[count++] = new UDFArrayCommand(this, batchNode, policy, keys, packageName, functionName, argBytes, recordArray, attr); - } - // Dispatch commands to nodes. 
- execute(tasks); - } - - protected void onSuccess() { - listener.onSuccess(recordArray, getStatus()); - } - - protected void onFailure(AerospikeException ae) { - listener.onFailure(recordArray, ae); - } - } - public static final class UDFArrayCommand extends AsyncBatchCommand { private final Key[] keys; private final String packageName; @@ -1248,56 +815,7 @@ protected List generateBatchNodes() { // UDFSequence //------------------------------------------------------- - public static final class UDFSequenceExecutor extends AsyncBatchExecutor { - private final BatchRecordSequenceListener listener; - private final boolean[] sent; - - public UDFSequenceExecutor( - EventLoop eventLoop, - Cluster cluster, - BatchPolicy policy, - BatchRecordSequenceListener listener, - Key[] keys, - String packageName, - String functionName, - byte[] argBytes, - BatchAttr attr - ) { - super(eventLoop, cluster, true); - this.listener = listener; - this.sent = new boolean[keys.length]; - - // Create commands. - List batchNodes = BatchNodeList.generate(cluster, policy, keys, null, attr.hasWrite, this); - AsyncBatchCommand[] tasks = new AsyncBatchCommand[batchNodes.size()]; - int count = 0; - - for (BatchNode batchNode : batchNodes) { - tasks[count++] = new UDFSequenceCommand(this, batchNode, policy, keys, packageName, functionName, argBytes, sent, listener, attr); - } - // Dispatch commands to nodes. 
- execute(tasks); - } - - @Override - public void batchKeyError(Key key, int index, AerospikeException ae, boolean inDoubt, boolean hasWrite) { - BatchRecord record = new BatchRecord(key, null, ae.getResultCode(), inDoubt, hasWrite); - sent[index] = true; - AsyncBatch.onRecord(listener, record, index); - } - - @Override - protected void onSuccess() { - listener.onSuccess(); - } - - @Override - protected void onFailure(AerospikeException ae) { - listener.onFailure(ae); - } - } - - private static final class UDFSequenceCommand extends AsyncBatchCommand { + public static final class UDFSequenceCommand extends AsyncBatchCommand { private final Key[] keys; private final String packageName; private final String functionName; @@ -1473,7 +991,16 @@ protected void setInDoubt(boolean inDoubt) { abstract List generateBatchNodes(); } - private static void onRecord(BatchRecordSequenceListener listener, BatchRecord record, int index) { + public static void onRecord(RecordSequenceListener listener, Key key, Record record) { + try { + listener.onRecord(key, record); + } + catch (Throwable e) { + Log.error("Unexpected exception from onRecord(): " + Util.getErrorMessage(e)); + } + } + + public static void onRecord(BatchRecordSequenceListener listener, BatchRecord record, int index) { try { listener.onRecord(record, index); } diff --git a/client/src/com/aerospike/client/async/AsyncBatchExecutor.java b/client/src/com/aerospike/client/async/AsyncBatchExecutor.java index cb90c27ab..5f1f81d4f 100644 --- a/client/src/com/aerospike/client/async/AsyncBatchExecutor.java +++ b/client/src/com/aerospike/client/async/AsyncBatchExecutor.java @@ -16,54 +16,327 @@ */ package com.aerospike.client.async; +import java.util.List; + import com.aerospike.client.AerospikeException; +import com.aerospike.client.BatchRead; +import com.aerospike.client.BatchRecord; import com.aerospike.client.Key; +import com.aerospike.client.Record; import com.aerospike.client.async.AsyncBatch.AsyncBatchCommand; import 
com.aerospike.client.cluster.Cluster; import com.aerospike.client.command.BatchNodeList; +import com.aerospike.client.listener.BatchListListener; +import com.aerospike.client.listener.BatchOperateListListener; +import com.aerospike.client.listener.BatchRecordArrayListener; +import com.aerospike.client.listener.BatchRecordSequenceListener; +import com.aerospike.client.listener.BatchSequenceListener; +import com.aerospike.client.listener.ExistsArrayListener; +import com.aerospike.client.listener.ExistsSequenceListener; +import com.aerospike.client.listener.RecordArrayListener; +import com.aerospike.client.listener.RecordSequenceListener; public abstract class AsyncBatchExecutor implements BatchNodeList.IBatchStatus { + public static final class BatchRecordArray extends AsyncBatchExecutor { + private final BatchRecordArrayListener listener; + private final BatchRecord[] records; + + public BatchRecordArray( + EventLoop eventLoop, + Cluster cluster, + BatchRecordArrayListener listener, + BatchRecord[] records + ) { + super(eventLoop, cluster, true); + this.listener = listener; + this.records = records; + } + + protected void onSuccess() { + listener.onSuccess(records, getStatus()); + } + + protected void onFailure(AerospikeException ae) { + listener.onFailure(records, ae); + } + } + + public static final class BatchRecordSequence extends AsyncBatchExecutor { + private final BatchRecordSequenceListener listener; + private final boolean[] sent; + + public BatchRecordSequence( + EventLoop eventLoop, + Cluster cluster, + BatchRecordSequenceListener listener, + boolean[] sent + ) { + super(eventLoop, cluster, true); + this.listener = listener; + this.sent = sent; + } + + public void setSent(int index) { + sent[index] = true; + } + + public boolean exchangeSent(int index) { + boolean prev = sent[index]; + sent[index] = true; + return prev; + } + + @Override + public void batchKeyError(Key key, int index, AerospikeException ae, boolean inDoubt, boolean hasWrite) { + 
sent[index] = true; + BatchRecord record = new BatchRecord(key, null, ae.getResultCode(), inDoubt, hasWrite); + AsyncBatch.onRecord(listener, record, index); + } + + @Override + protected void onSuccess() { + listener.onSuccess(); + } + + @Override + protected void onFailure(AerospikeException ae) { + listener.onFailure(ae); + } + } + + public static final class ExistsArray extends AsyncBatchExecutor { + private final ExistsArrayListener listener; + private final Key[] keys; + private final boolean[] existsArray; + + public ExistsArray( + EventLoop eventLoop, + Cluster cluster, + ExistsArrayListener listener, + Key[] keys, + boolean[] existsArray + ) { + super(eventLoop, cluster, false); + this.listener = listener; + this.keys = keys; + this.existsArray = existsArray; + } + + protected void onSuccess() { + listener.onSuccess(keys, existsArray); + } + + protected void onFailure(AerospikeException ae) { + listener.onFailure(new AerospikeException.BatchExists(existsArray, ae)); + } + } + + public static final class ExistsSequence extends AsyncBatchExecutor { + private final ExistsSequenceListener listener; + + public ExistsSequence( + EventLoop eventLoop, + Cluster cluster, + ExistsSequenceListener listener + ) { + super(eventLoop, cluster, false); + this.listener = listener; + } + + protected void onSuccess() { + listener.onSuccess(); + } + + protected void onFailure(AerospikeException ae) { + listener.onFailure(ae); + } + } + + public static final class ReadList extends AsyncBatchExecutor { + private final BatchListListener listener; + private final List records; + + public ReadList( + EventLoop eventLoop, + Cluster cluster, + BatchListListener listener, + List records + ) { + super(eventLoop, cluster, true); + this.listener = listener; + this.records = records; + } + + protected void onSuccess() { + listener.onSuccess(records); + } + + protected void onFailure(AerospikeException ae) { + listener.onFailure(ae); + } + } + + public static final class ReadSequence 
extends AsyncBatchExecutor { + private final BatchSequenceListener listener; + + public ReadSequence( + EventLoop eventLoop, + Cluster cluster, + BatchSequenceListener listener + ) { + super(eventLoop, cluster, true); + this.listener = listener; + } + + protected void onSuccess() { + listener.onSuccess(); + } + + protected void onFailure(AerospikeException ae) { + listener.onFailure(ae); + } + } + + public static final class GetArray extends AsyncBatchExecutor { + private final RecordArrayListener listener; + private final Key[] keys; + private final Record[] records; + + public GetArray( + EventLoop eventLoop, + Cluster cluster, + RecordArrayListener listener, + Key[] keys, + Record[] records + ) { + super(eventLoop, cluster, false); + this.listener = listener; + this.keys = keys; + this.records = records; + } + + protected void onSuccess() { + listener.onSuccess(keys, records); + } + + protected void onFailure(AerospikeException ae) { + listener.onFailure(new AerospikeException.BatchRecords(records, ae)); + } + } + + public static final class GetSequence extends AsyncBatchExecutor { + private final RecordSequenceListener listener; + + public GetSequence( + EventLoop eventLoop, + Cluster cluster, + RecordSequenceListener listener + ) { + super(eventLoop, cluster, false); + this.listener = listener; + } + + @Override + protected void onSuccess() { + listener.onSuccess(); + } + + @Override + protected void onFailure(AerospikeException ae) { + listener.onFailure(ae); + } + } + + public static final class OperateList extends AsyncBatchExecutor { + private final BatchOperateListListener listener; + private final List records; + + public OperateList( + EventLoop eventLoop, + Cluster cluster, + BatchOperateListListener listener, + List records + ) { + super(eventLoop, cluster, true); + this.listener = listener; + this.records = records; + } + + protected void onSuccess() { + listener.onSuccess(records, getStatus()); + } + + protected void onFailure(AerospikeException ae) 
{ + listener.onFailure(ae); + } + } + + public static final class OperateSequence extends AsyncBatchExecutor { + private final BatchRecordSequenceListener listener; + + public OperateSequence( + EventLoop eventLoop, + Cluster cluster, + BatchRecordSequenceListener listener + ) { + super(eventLoop, cluster, true); + this.listener = listener; + } + + protected void onSuccess() { + listener.onSuccess(); + } + + protected void onFailure(AerospikeException ae) { + listener.onFailure(ae); + } + } + + //------------------------------------------------------- + // Base Executor + //------------------------------------------------------- + final EventLoop eventLoop; final Cluster cluster; private AerospikeException exception; - private AsyncBatchCommand[] commands; + private AsyncCommand[] commands; private int completedCount; // Not atomic because all commands run on same event loop thread. private final boolean hasResultCode; boolean done; boolean error; - public AsyncBatchExecutor(EventLoop eventLoop, Cluster cluster, boolean hasResultCode) { + private AsyncBatchExecutor(EventLoop eventLoop, Cluster cluster, boolean hasResultCode) { this.eventLoop = eventLoop; this.cluster = cluster; this.hasResultCode = hasResultCode; cluster.addTran(); } - public void execute(AsyncBatchCommand[] cmds) { + public void execute(AsyncCommand[] cmds) { this.commands = cmds; - for (AsyncBatchCommand cmd : cmds) { + for (AsyncCommand cmd : cmds) { eventLoop.execute(cluster, cmd); } } public void executeBatchRetry(AsyncBatchCommand[] cmds, AsyncBatchCommand orig, Runnable other, long deadline) { // Create new commands array. 
- AsyncBatchCommand[] target = new AsyncBatchCommand[commands.length + cmds.length - 1]; + AsyncCommand[] target = new AsyncCommand[commands.length + cmds.length - 1]; int count = 0; - for (AsyncBatchCommand cmd : commands) { + for (AsyncCommand cmd : commands) { if (cmd != orig) { target[count++] = cmd; } } - for (AsyncBatchCommand cmd : cmds) { + for (AsyncCommand cmd : cmds) { target[count++] = cmd; } commands = target; - for (AsyncBatchCommand cmd : cmds) { + for (AsyncCommand cmd : cmds) { eventLoop.executeBatchRetry(other, cmd, deadline); } } diff --git a/client/src/com/aerospike/client/async/AsyncBatchSingle.java b/client/src/com/aerospike/client/async/AsyncBatchSingle.java new file mode 100644 index 000000000..783605718 --- /dev/null +++ b/client/src/com/aerospike/client/async/AsyncBatchSingle.java @@ -0,0 +1,1046 @@ +/* + * Copyright 2012-2023 Aerospike, Inc. + * + * Portions may be licensed to Aerospike, Inc. under one or more contributor + * license agreements WHICH ARE COMPATIBLE WITH THE APACHE LICENSE, VERSION 2.0. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package com.aerospike.client.async; + +import com.aerospike.client.AerospikeException; +import com.aerospike.client.BatchRead; +import com.aerospike.client.BatchRecord; +import com.aerospike.client.BatchUDF; +import com.aerospike.client.BatchWrite; +import com.aerospike.client.Key; +import com.aerospike.client.Log; +import com.aerospike.client.Operation; +import com.aerospike.client.Record; +import com.aerospike.client.ResultCode; +import com.aerospike.client.async.AsyncBatchExecutor.BatchRecordSequence; +import com.aerospike.client.cluster.Cluster; +import com.aerospike.client.cluster.Node; +import com.aerospike.client.cluster.Partition; +import com.aerospike.client.command.BatchAttr; +import com.aerospike.client.command.Command; +import com.aerospike.client.command.RecordParser; +import com.aerospike.client.listener.BatchRecordSequenceListener; +import com.aerospike.client.listener.BatchSequenceListener; +import com.aerospike.client.listener.ExistsSequenceListener; +import com.aerospike.client.listener.RecordSequenceListener; +import com.aerospike.client.metrics.LatencyType; +import com.aerospike.client.policy.BatchPolicy; +import com.aerospike.client.policy.Policy; +import com.aerospike.client.util.Util; + +public final class AsyncBatchSingle { + //------------------------------------------------------- + // Read + //------------------------------------------------------- + + public static final class ReadGetSequence extends Read { + private final BatchSequenceListener listener; + + public ReadGetSequence( + AsyncBatchExecutor executor, + Cluster cluster, + BatchPolicy policy, + BatchRead record, + Node node, + BatchSequenceListener listener + ) { + super(executor, cluster, policy, record, node); + this.listener = listener; + } + + @Override + protected final boolean parseResult() { + super.parseResult(); + + try { + listener.onRecord(record); + } + catch (Throwable e) { + Log.error("Unexpected exception from onRecord(): " + Util.getErrorMessage(e)); + } + 
return true; + } + } + + public static final class ReadSequence extends Read { + private final BatchRecordSequenceListener listener; + private final int index; + + public ReadSequence( + AsyncBatchExecutor executor, + Cluster cluster, + BatchPolicy policy, + BatchRead record, + Node node, + BatchRecordSequenceListener listener, + int index + ) { + super(executor, cluster, policy, record, node); + this.listener = listener; + this.index = index; + } + + @Override + protected boolean parseResult() { + super.parseResult(); + AsyncBatch.onRecord(listener, record, index); + return true; + } + } + + public static class Read extends AsyncBaseCommand { + final BatchRead record; + + public Read( + AsyncBatchExecutor executor, + Cluster cluster, + BatchPolicy policy, + BatchRead record, + Node node + ) { + super(executor, cluster, policy, record.key, node, false); + this.record = record; + } + + @Override + protected void writeBuffer() { + setRead(policy, record); + } + + @Override + protected boolean parseResult() { + RecordParser rp = new RecordParser(dataBuffer, dataOffset, receiveSize); + + if (rp.resultCode == ResultCode.OK) { + record.setRecord(rp.parseRecord(record.ops != null)); + } + else { + record.setError(rp.resultCode, false); + executor.setRowError(); + } + return true; + } + } + + //------------------------------------------------------- + // Operate/Get + //------------------------------------------------------- + + public static final class OperateGet extends Get { + private final Operation[] ops; + + public OperateGet( + AsyncBatchExecutor executor, + Cluster cluster, + BatchPolicy policy, + Key key, + Operation[] ops, + Record[] records, + Node node, + int index + ) { + super(executor, cluster, policy, key, null, records, node, index, true); + this.ops = ops; + } + + @Override + protected void writeBuffer() { + setRead(policy, key, ops); + } + } + + public static class Get extends AsyncBaseCommand { + private final String[] binNames; + private final 
Record[] records; + private final int index; + private final boolean isOperation; + + public Get( + AsyncBatchExecutor executor, + Cluster cluster, + BatchPolicy policy, + Key key, + String[] binNames, + Record[] records, + Node node, + int index, + boolean isOperation + ) { + super(executor, cluster, policy, key, node, false); + this.binNames = binNames; + this.records = records; + this.index = index; + this.isOperation = isOperation; + } + + @Override + protected void writeBuffer() { + setRead(policy, key, binNames); + } + + @Override + protected final boolean parseResult() { + RecordParser rp = new RecordParser(dataBuffer, dataOffset, receiveSize); + + if (rp.resultCode == ResultCode.OK) { + records[index] = rp.parseRecord(isOperation); + } + return true; + } + } + + public static final class OperateGetSequence extends GetSequence { + private final Operation[] ops; + + public OperateGetSequence( + AsyncBatchExecutor executor, + Cluster cluster, + BatchPolicy policy, + RecordSequenceListener listener, + Key key, + Operation[] ops, + Node node + ) { + super(executor, cluster, policy, listener, key, null, node, true); + this.ops = ops; + } + + @Override + protected void writeBuffer() { + setRead(policy, key, ops); + } + } + + public static class GetSequence extends AsyncBaseCommand { + private final RecordSequenceListener listener; + private final String[] binNames; + private final boolean isOperation; + + public GetSequence( + AsyncBatchExecutor executor, + Cluster cluster, + BatchPolicy policy, + RecordSequenceListener listener, + Key key, + String[] binNames, + Node node, + boolean isOperation + ) { + super(executor, cluster, policy, key, node, false); + this.listener = listener; + this.binNames = binNames; + this.isOperation = isOperation; + } + + @Override + protected void writeBuffer() { + setRead(policy, key, binNames); + } + + @Override + protected final boolean parseResult() { + RecordParser rp = new RecordParser(dataBuffer, dataOffset, receiveSize); + 
Record record = null; + + if (rp.resultCode == ResultCode.OK) { + record = rp.parseRecord(isOperation); + } + AsyncBatch.onRecord(listener, key, record); + return true; + } + } + + //------------------------------------------------------- + // Read Header + //------------------------------------------------------- + + public static class ReadHeaderSequence extends AsyncBaseCommand { + private final RecordSequenceListener listener; + + public ReadHeaderSequence( + AsyncBatchExecutor executor, + Cluster cluster, + BatchPolicy policy, + Key key, + Node node, + RecordSequenceListener listener + ) { + super(executor, cluster, policy, key, node, false); + this.listener = listener; + } + + @Override + protected void writeBuffer() { + setReadHeader(policy, key); + } + + @Override + protected final boolean parseResult() { + RecordParser rp = new RecordParser(dataBuffer, dataOffset, receiveSize); + Record record = null; + + if (rp.resultCode == ResultCode.OK) { + record = rp.parseRecord(false); + } + AsyncBatch.onRecord(listener, key, record); + return true; + } + } + + public static class ReadHeader extends AsyncBaseCommand { + private final Record[] records; + private final int index; + + public ReadHeader( + AsyncBatchExecutor executor, + Cluster cluster, + BatchPolicy policy, + Key key, + Record[] records, + Node node, + int index + ) { + super(executor, cluster, policy, key, node, false); + this.records = records; + this.index = index; + } + + @Override + protected void writeBuffer() { + setReadHeader(policy, key); + } + + @Override + protected final boolean parseResult() { + RecordParser rp = new RecordParser(dataBuffer, dataOffset, receiveSize); + + if (rp.resultCode == ResultCode.OK) { + records[index] = rp.parseRecord(false); + } + return true; + } + } + + //------------------------------------------------------- + // Exists + //------------------------------------------------------- + + public static final class ExistsSequence extends AsyncBaseCommand { + private 
final ExistsSequenceListener listener; + + public ExistsSequence( + AsyncBatchExecutor executor, + Cluster cluster, + BatchPolicy policy, + Key key, + Node node, + ExistsSequenceListener listener + ) { + super(executor, cluster, policy, key, node, false); + this.listener = listener; + } + + @Override + protected void writeBuffer() { + setExists(policy, key); + } + + @Override + protected boolean parseResult() { + RecordParser rp = new RecordParser(dataBuffer, dataOffset, receiveSize); + + if (rp.opCount > 0) { + throw new AerospikeException.Parse("Received bins that were not requested!"); + } + + try { + listener.onExists(key, rp.resultCode == 0); + } + catch (Throwable e) { + Log.error("Unexpected exception from onExists(): " + Util.getErrorMessage(e)); + } + return true; + } + } + + public static final class Exists extends AsyncBaseCommand { + private final boolean[] existsArray; + private final int index; + + public Exists( + AsyncBatchExecutor executor, + Cluster cluster, + BatchPolicy policy, + Key key, + Node node, + boolean[] existsArray, + int index + ) { + super(executor, cluster, policy, key, node, false); + this.existsArray = existsArray; + this.index = index; + } + + @Override + protected void writeBuffer() { + setExists(policy, key); + } + + @Override + protected boolean parseResult() { + RecordParser rp = new RecordParser(dataBuffer, dataOffset, receiveSize); + + if (rp.opCount > 0) { + throw new AerospikeException.Parse("Received bins that were not requested!"); + } + + existsArray[index] = rp.resultCode == 0; + return true; + } + } + + //------------------------------------------------------- + // Operate + //------------------------------------------------------- + + public static final class OperateSequence extends AsyncBaseCommand { + private final BatchRecordSequence parent; + private final BatchRecordSequenceListener listener; + private final BatchAttr attr; + private final Operation[] ops; + private final int index; + + public OperateSequence( 
+ BatchRecordSequence executor, + Cluster cluster, + BatchPolicy policy, + Key key, + BatchAttr attr, + Operation[] ops, + Node node, + BatchRecordSequenceListener listener, + int index + ) { + super(executor, cluster, policy, key, node, attr.hasWrite); + this.parent = executor; + this.listener = listener; + this.attr = attr; + this.ops = ops; + this.index = index; + } + + @Override + protected void writeBuffer() { + setOperate(policy, attr, key, ops); + } + + @Override + protected boolean parseResult() { + RecordParser rp = new RecordParser(dataBuffer, dataOffset, receiveSize); + BatchRecord record; + + if (rp.resultCode == 0) { + record = new BatchRecord(key, rp.parseRecord(true), attr.hasWrite); + } + else { + record = new BatchRecord(key, null, rp.resultCode, + Command.batchInDoubt(attr.hasWrite, commandSentCounter), attr.hasWrite); + executor.setRowError(); + } + parent.setSent(index); + AsyncBatch.onRecord(listener, record, index); + return true; + } + + @Override + public void setInDoubt() { + if (! 
parent.exchangeSent(index)) { + BatchRecord record = new BatchRecord(key, null, ResultCode.NO_RESPONSE, true, attr.hasWrite); + AsyncBatch.onRecord(listener, record, index); + } + } + } + + public static final class Operate extends AsyncBaseCommand { + private final BatchAttr attr; + private final BatchRecord record; + private final Operation[] ops; + + public Operate( + AsyncBatchExecutor executor, + Cluster cluster, + BatchPolicy policy, + BatchAttr attr, + BatchRecord record, + Operation[] ops, + Node node + ) { + super(executor, cluster, policy, record.key, node, attr.hasWrite); + this.attr = attr; + this.record = record; + this.ops = ops; + } + + @Override + protected void writeBuffer() { + setOperate(policy, attr, record.key, ops); + } + + @Override + protected boolean parseResult() { + RecordParser rp = new RecordParser(dataBuffer, dataOffset, receiveSize); + + if (rp.resultCode == ResultCode.OK) { + record.setRecord(rp.parseRecord(true)); + } + else { + record.setError(rp.resultCode, Command.batchInDoubt(attr.hasWrite, commandSentCounter)); + executor.setRowError(); + } + return true; + } + + @Override + public void setInDoubt() { + if (record.resultCode == ResultCode.NO_RESPONSE) { + record.inDoubt = true; + } + } + } + + //------------------------------------------------------- + // Write + //------------------------------------------------------- + + public static final class WriteSequence extends Write { + private final BatchRecordSequenceListener listener; + private final int index; + + public WriteSequence( + AsyncBatchExecutor executor, + Cluster cluster, + BatchPolicy policy, + BatchAttr attr, + BatchWrite record, + Node node, + BatchRecordSequenceListener listener, + int index + ) { + super(executor, cluster, policy, attr, record, node); + this.listener = listener; + this.index = index; + } + + @Override + protected boolean parseResult() { + super.parseResult(); + AsyncBatch.onRecord(listener, record, index); + return true; + } + + // setInDoubt() 
is not overridden to call onRecord() because user already has access to full + // BatchRecord list and can examine each record for inDoubt when the exception occurs. + } + + public static class Write extends AsyncBaseCommand { + private final BatchAttr attr; + final BatchWrite record; + + public Write( + AsyncBatchExecutor executor, + Cluster cluster, + BatchPolicy policy, + BatchAttr attr, + BatchWrite record, + Node node + ) { + super(executor, cluster, policy, record.key, node, true); + this.attr = attr; + this.record = record; + } + + @Override + protected void writeBuffer() { + setOperate(policy, attr, record.key, record.ops); + } + + @Override + protected boolean parseResult() { + RecordParser rp = new RecordParser(dataBuffer, dataOffset, receiveSize); + + if (rp.resultCode == ResultCode.OK) { + record.setRecord(rp.parseRecord(true)); + } + else { + record.setError(rp.resultCode, Command.batchInDoubt(true, commandSentCounter)); + executor.setRowError(); + } + return true; + } + + @Override + public void setInDoubt() { + if (record.resultCode == ResultCode.NO_RESPONSE) { + record.inDoubt = true; + } + } + } + + //------------------------------------------------------- + // UDF + //------------------------------------------------------- + + public static final class UDFSequence extends UDF { + private final BatchRecordSequenceListener listener; + private final int index; + + public UDFSequence( + AsyncBatchExecutor executor, + Cluster cluster, + BatchPolicy policy, + BatchAttr attr, + BatchUDF record, + Node node, + BatchRecordSequenceListener listener, + int index + ) { + super(executor, cluster, policy, attr, record, node); + this.listener = listener; + this.index = index; + } + + @Override + protected boolean parseResult() { + super.parseResult(); + AsyncBatch.onRecord(listener, record, index); + return true; + } + + // setInDoubt() is not overridden to call onRecord() because user already has access to full + // BatchRecord list and can examine each record 
for inDoubt when the exception occurs. + } + + public static class UDF extends AsyncBaseCommand { + private final BatchAttr attr; + final BatchUDF record; + + public UDF( + AsyncBatchExecutor executor, + Cluster cluster, + BatchPolicy policy, + BatchAttr attr, + BatchUDF record, + Node node + ) { + super(executor, cluster, policy, record.key, node, true); + this.attr = attr; + this.record = record; + } + + @Override + protected void writeBuffer() { + setUdf(policy, attr, record.key, record.packageName, record.functionName, record.functionArgs); + } + + @Override + protected boolean parseResult() { + RecordParser rp = new RecordParser(dataBuffer, dataOffset, receiveSize); + + if (rp.resultCode == ResultCode.OK) { + record.setRecord(rp.parseRecord(false)); + } + else if (rp.resultCode == ResultCode.UDF_BAD_RESPONSE) { + Record r = rp.parseRecord(false); + String m = r.getString("FAILURE"); + + if (m != null) { + // Need to store record because failure bin contains an error message. + record.record = r; + } + record.resultCode = rp.resultCode; + record.inDoubt = Command.batchInDoubt(true, commandSentCounter); + executor.setRowError(); + } + else { + record.setError(rp.resultCode, Command.batchInDoubt(true, commandSentCounter)); + executor.setRowError(); + } + return true; + } + + @Override + public void setInDoubt() { + if (record.resultCode == ResultCode.NO_RESPONSE) { + record.inDoubt = true; + } + } + } + + public static final class UDFSequenceCommand extends AsyncBaseCommand { + private final BatchRecordSequence parent; + private final BatchRecordSequenceListener listener; + private final BatchAttr attr; + private final String packageName; + private final String functionName; + private final byte[] argBytes; + private final int index; + + public UDFSequenceCommand( + BatchRecordSequence executor, + Cluster cluster, + BatchPolicy policy, + Key key, + BatchAttr attr, + String packageName, + String functionName, + byte[] argBytes, + Node node, + 
BatchRecordSequenceListener listener, + int index + ) { + super(executor, cluster, policy, key, node, true); + this.parent = executor; + this.listener = listener; + this.attr = attr; + this.packageName = packageName; + this.functionName = functionName; + this.argBytes = argBytes; + this.index = index; + } + + @Override + protected void writeBuffer() { + setUdf(policy, attr, key, packageName, functionName, argBytes); + } + + @Override + protected boolean parseResult() { + RecordParser rp = new RecordParser(dataBuffer, dataOffset, receiveSize); + BatchRecord record; + + if (rp.resultCode == ResultCode.OK) { + record = new BatchRecord(key, rp.parseRecord(true), attr.hasWrite); + } + else if (rp.resultCode == ResultCode.UDF_BAD_RESPONSE) { + Record r = rp.parseRecord(false); + String m = r.getString("FAILURE"); + + if (m != null) { + // Need to store record because failure bin contains an error message. + record = new BatchRecord(key, r, rp.resultCode, Command.batchInDoubt(true, commandSentCounter), true); + } + else { + record = new BatchRecord(key, null, rp.resultCode, Command.batchInDoubt(true, commandSentCounter), true); + } + executor.setRowError(); + } + else { + record = new BatchRecord(key, null, rp.resultCode, Command.batchInDoubt(true, commandSentCounter), true); + executor.setRowError(); + } + parent.setSent(index); + AsyncBatch.onRecord(listener, record, index); + return true; + } + + @Override + public void setInDoubt() { + if (! 
parent.exchangeSent(index)) { + BatchRecord record = new BatchRecord(key, null, ResultCode.NO_RESPONSE, true, true); + AsyncBatch.onRecord(listener, record, index); + } + } + } + + public static class UDFCommand extends AsyncBaseCommand { + private final BatchAttr attr; + private final BatchRecord record; + private final String packageName; + private final String functionName; + private final byte[] argBytes; + + public UDFCommand( + AsyncBatchExecutor executor, + Cluster cluster, + BatchPolicy policy, + BatchAttr attr, + BatchRecord record, + String packageName, + String functionName, + byte[] argBytes, + Node node + ) { + super(executor, cluster, policy, record.key, node, true); + this.attr = attr; + this.record = record; + this.packageName = packageName; + this.functionName = functionName; + this.argBytes = argBytes; + } + + @Override + protected void writeBuffer() { + setUdf(policy, attr, key, packageName, functionName, argBytes); + } + + @Override + protected boolean parseResult() { + RecordParser rp = new RecordParser(dataBuffer, dataOffset, receiveSize); + + if (rp.resultCode == ResultCode.OK) { + record.setRecord(rp.parseRecord(false)); + } + else if (rp.resultCode == ResultCode.UDF_BAD_RESPONSE) { + Record r = rp.parseRecord(false); + String m = r.getString("FAILURE"); + + if (m != null) { + // Need to store record because failure bin contains an error message. 
+ record.record = r; + } + record.resultCode = rp.resultCode; + record.inDoubt = Command.batchInDoubt(true, commandSentCounter); + executor.setRowError(); + } + else { + record.setError(rp.resultCode, Command.batchInDoubt(true, commandSentCounter)); + executor.setRowError(); + } + return true; + } + + @Override + public void setInDoubt() { + if (record.resultCode == ResultCode.NO_RESPONSE) { + record.inDoubt = true; + } + } + } + + //------------------------------------------------------- + // Delete + //------------------------------------------------------- + + public static final class DeleteSequenceSent extends AsyncBaseCommand { + private final BatchRecordSequence parent; + private final BatchRecordSequenceListener listener; + private final BatchAttr attr; + private final int index; + + public DeleteSequenceSent( + BatchRecordSequence executor, + Cluster cluster, + BatchPolicy policy, + Key key, + BatchAttr attr, + Node node, + BatchRecordSequenceListener listener, + int index + ) { + super(executor, cluster, policy, key, node, true); + this.parent = executor; + this.listener = listener; + this.attr = attr; + this.index = index; + } + + @Override + protected void writeBuffer() { + setDelete(policy, key, attr); + } + + @Override + protected boolean parseResult() { + RecordParser rp = new RecordParser(dataBuffer, dataOffset, receiveSize); + BatchRecord record; + + if (rp.resultCode == 0) { + record = new BatchRecord(key, new Record(null, rp.generation, rp.expiration), true); + } + else { + record = new BatchRecord(key, null, rp.resultCode, Command.batchInDoubt(true, commandSentCounter), true); + executor.setRowError(); + } + parent.setSent(index); + AsyncBatch.onRecord(listener, record, index); + return true; + } + + @Override + public void setInDoubt() { + if (! 
parent.exchangeSent(index)) { + BatchRecord record = new BatchRecord(key, null, ResultCode.NO_RESPONSE, true, true); + AsyncBatch.onRecord(listener, record, index); + } + } + } + + public static final class DeleteSequence extends Delete { + private final BatchRecordSequenceListener listener; + private final int index; + + public DeleteSequence( + AsyncBatchExecutor executor, + Cluster cluster, + BatchPolicy policy, + BatchAttr attr, + BatchRecord record, + Node node, + BatchRecordSequenceListener listener, + int index + ) { + super(executor, cluster, policy, attr, record, node); + this.listener = listener; + this.index = index; + } + + @Override + protected boolean parseResult() { + super.parseResult(); + AsyncBatch.onRecord(listener, record, index); + return true; + } + + // setInDoubt() is not overridden to call onRecord() because user already has access to full + // BatchRecord list and can examine each record for inDoubt when the exception occurs. + } + + public static class Delete extends AsyncBaseCommand { + private final BatchAttr attr; + final BatchRecord record; + + public Delete( + AsyncBatchExecutor executor, + Cluster cluster, + BatchPolicy policy, + BatchAttr attr, + BatchRecord record, + Node node + ) { + super(executor, cluster, policy, record.key, node, true); + this.attr = attr; + this.record = record; + } + + @Override + protected void writeBuffer() { + setDelete(policy, record.key, attr); + } + + @Override + protected boolean parseResult() { + RecordParser rp = new RecordParser(dataBuffer, dataOffset, receiveSize); + + if (rp.resultCode == 0) { + record.setRecord(new Record(null, rp.generation, rp.expiration)); + } + else { + record.setError(rp.resultCode, Command.batchInDoubt(true, commandSentCounter)); + executor.setRowError(); + } + return true; + } + + @Override + public void setInDoubt() { + if (record.resultCode == ResultCode.NO_RESPONSE) { + record.inDoubt = true; + } + } + } + + //------------------------------------------------------- + 
// Async Batch Base Command + //------------------------------------------------------- + + static abstract class AsyncBaseCommand extends AsyncCommand { + final AsyncBatchExecutor executor; + final Cluster cluster; + final Key key; + Node node; + int sequence; + final boolean hasWrite; + + public AsyncBaseCommand( + AsyncBatchExecutor executor, + Cluster cluster, + Policy policy, + Key key, + Node node, + boolean hasWrite + ) { + super(policy, true); + this.executor = executor; + this.cluster = cluster; + this.key = key; + this.node = node; + this.hasWrite = hasWrite; + } + + @Override + boolean isWrite() { + return hasWrite; + } + + @Override + protected Node getNode(Cluster cluster) { + return node; + } + + @Override + protected LatencyType getLatencyType() { + return LatencyType.BATCH; + } + + @Override + protected boolean prepareRetry(boolean timeout) { + if (hasWrite) { + Partition p = Partition.write(cluster, policy, key); + p.sequence = sequence; + p.prevNode = node; + p.prepareRetryWrite(timeout); + node = p.getNodeWrite(cluster); + sequence = p.sequence; + } + else { + Partition p = Partition.read(cluster, policy, key); + p.sequence = sequence; + p.prevNode = node; + p.prepareRetryRead(timeout); + node = p.getNodeRead(cluster); + sequence = p.sequence; + } + return true; + } + + @Override + protected void onSuccess() { + executor.childSuccess(); + } + + @Override + protected void onFailure(AerospikeException e) { + if (e.getInDoubt()) { + setInDoubt(); + } + executor.childFailure(e); + } + + public void setInDoubt() { + } + } +} diff --git a/client/src/com/aerospike/client/cluster/Partition.java b/client/src/com/aerospike/client/cluster/Partition.java index e26d6dcd1..ef18d56ba 100644 --- a/client/src/com/aerospike/client/cluster/Partition.java +++ b/client/src/com/aerospike/client/cluster/Partition.java @@ -139,9 +139,9 @@ public static Node getNodeBatchRead( private Partitions partitions; private final String namespace; private final Replica replica; - 
private Node prevNode; + public Node prevNode; private int partitionId; - private int sequence; + public int sequence; private final boolean linearize; private Partition(Partitions partitions, Key key, Replica replica, Node prevNode, boolean linearize) { diff --git a/client/src/com/aerospike/client/command/Batch.java b/client/src/com/aerospike/client/command/Batch.java index 98f6e7f45..934de41fd 100644 --- a/client/src/com/aerospike/client/command/Batch.java +++ b/client/src/com/aerospike/client/command/Batch.java @@ -266,11 +266,7 @@ protected boolean parseRow() { } @Override - protected void setInDoubt(boolean inDoubt) { - if (!inDoubt) { - return; - } - + protected void inDoubt() { for (int index : batch.offsets) { BatchRecord record = records.get(index); @@ -345,8 +341,8 @@ protected boolean parseRow() { } @Override - protected void setInDoubt(boolean inDoubt) { - if (!inDoubt || !attr.hasWrite) { + protected void inDoubt() { + if (!attr.hasWrite) { return; } @@ -354,7 +350,7 @@ protected void setInDoubt(boolean inDoubt) { BatchRecord record = records[index]; if (record.resultCode == ResultCode.NO_RESPONSE) { - record.inDoubt = inDoubt; + record.inDoubt = true; } } } @@ -444,8 +440,8 @@ protected boolean parseRow() { } @Override - protected void setInDoubt(boolean inDoubt) { - if (!inDoubt || !attr.hasWrite) { + protected void inDoubt() { + if (!attr.hasWrite) { return; } @@ -453,7 +449,7 @@ protected void setInDoubt(boolean inDoubt) { BatchRecord record = records[index]; if (record.resultCode == ResultCode.NO_RESPONSE) { - record.inDoubt = inDoubt; + record.inDoubt = true; } } } @@ -473,7 +469,7 @@ protected List generateBatchNodes() { // Batch Base Command //------------------------------------------------------- - public static abstract class BatchCommand extends MultiCommand implements Runnable { + public static abstract class BatchCommand extends MultiCommand implements IBatchCommand { final BatchNode batch; final BatchPolicy batchPolicy; final BatchStatus 
status; @@ -495,31 +491,28 @@ public BatchCommand( this.status = status; } + @Override + public void setParent(BatchExecutor parent) { + this.parent = parent; + } + @Override public void run() { try { execute(); } catch (AerospikeException ae) { - // Set error/inDoubt for keys associated this batch command when - // the command was not retried and split. If a split retry occurred, - // those new subcommands have already set error/inDoubt on the affected - // subset of keys. - if (! splitRetry) { - setInDoubt(ae.getInDoubt()); + if (ae.getInDoubt()) { + setInDoubt(); } status.setException(ae); } catch (RuntimeException re) { - if (! splitRetry) { - setInDoubt(true); - } + setInDoubt(); status.setException(re); } catch (Throwable e) { - if (! splitRetry) { - setInDoubt(true); - } + setInDoubt(); status.setException(new RuntimeException(e)); } finally { @@ -584,8 +577,8 @@ protected boolean retryBatch( command.executeCommand(); } catch (AerospikeException ae) { - if (! command.splitRetry) { - command.setInDoubt(ae.getInDoubt()); + if (ae.getInDoubt()) { + command.setInDoubt(); } status.setException(ae); @@ -594,9 +587,7 @@ protected boolean retryBatch( } } catch (RuntimeException re) { - if (! command.splitRetry) { - command.setInDoubt(true); - } + command.setInDoubt(); status.setException(re); if (!batchPolicy.respondAllKeys) { @@ -604,9 +595,7 @@ protected boolean retryBatch( } } catch (Throwable e) { - if (! command.splitRetry) { - command.setInDoubt(true); - } + command.setInDoubt(); status.setException(new RuntimeException(e)); if (!batchPolicy.respondAllKeys) { @@ -617,7 +606,18 @@ protected boolean retryBatch( return true; } - protected void setInDoubt(boolean inDoubt) { + @Override + public void setInDoubt() { + // Set error/inDoubt for keys associated with this batch command when + // the command was not retried and split. If a split retry occurred, + // those new subcommands have already set inDoubt on the affected + // subset of keys. + if (!
splitRetry) { + inDoubt(); + } + } + + protected void inDoubt() { // Do nothing by default. Batch writes will override this method. } diff --git a/client/src/com/aerospike/client/command/BatchAttr.java b/client/src/com/aerospike/client/command/BatchAttr.java index 87cc8f1dd..74b832e39 100644 --- a/client/src/com/aerospike/client/command/BatchAttr.java +++ b/client/src/com/aerospike/client/command/BatchAttr.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2022 Aerospike, Inc. + * Copyright 2012-2023 Aerospike, Inc. * * Portions may be licensed to Aerospike, Inc. under one or more contributor * license agreements WHICH ARE COMPATIBLE WITH THE APACHE LICENSE, VERSION 2.0. @@ -33,6 +33,7 @@ public final class BatchAttr { public int writeAttr; public int infoAttr; public int expiration; + public int opSize; public short generation; public boolean hasWrite; public boolean sendKey; @@ -343,4 +344,14 @@ public void setDelete(BatchDeletePolicy dp) { infoAttr |= Command.INFO3_COMMIT_MASTER; } } + + public void setOpSize(Operation[] ops) { + int dataOffset = 0; + + for (Operation op : ops) { + dataOffset += Buffer.estimateSizeUtf8(op.binName) + Command.OPERATION_HEADER_SIZE; + dataOffset += op.value.estimateSize(); + } + opSize = dataOffset; + } } diff --git a/client/src/com/aerospike/client/command/BatchExecutor.java b/client/src/com/aerospike/client/command/BatchExecutor.java index 386d7a4d7..ebde63874 100644 --- a/client/src/com/aerospike/client/command/BatchExecutor.java +++ b/client/src/com/aerospike/client/command/BatchExecutor.java @@ -22,27 +22,22 @@ import com.aerospike.client.AerospikeException; import com.aerospike.client.cluster.Cluster; -import com.aerospike.client.command.Batch.BatchCommand; import com.aerospike.client.policy.BatchPolicy; public final class BatchExecutor { - public static void execute(Cluster cluster, BatchPolicy policy, BatchCommand[] commands, BatchStatus status) { + public static void execute(Cluster cluster, BatchPolicy policy, IBatchCommand[] 
commands, BatchStatus status) { cluster.addTran(); if (policy.maxConcurrentThreads == 1 || commands.length <= 1) { // Run batch requests sequentially in same thread. - for (BatchCommand command : commands) { + for (IBatchCommand command : commands) { try { command.execute(); } catch (AerospikeException ae) { - // Set error/inDoubt for keys associated this batch command when - // the command was not retried and split. If a split retry occurred, - // those new subcommands have already set error/inDoubt on the affected - // subset of keys. - if (! command.splitRetry) { - command.setInDoubt(ae.getInDoubt()); + if (ae.getInDoubt()) { + command.setInDoubt(); } status.setException(ae); @@ -51,9 +46,7 @@ public static void execute(Cluster cluster, BatchPolicy policy, BatchCommand[] c } } catch (RuntimeException re) { - if (! command.splitRetry) { - command.setInDoubt(true); - } + command.setInDoubt(); status.setException(re); if (!policy.respondAllKeys) { @@ -61,9 +54,7 @@ public static void execute(Cluster cluster, BatchPolicy policy, BatchCommand[] c } } catch (Throwable e) { - if (! 
command.splitRetry) { - command.setInDoubt(true); - } + command.setInDoubt(); status.setException(new RuntimeException(e)); if (!policy.respondAllKeys) { @@ -84,11 +75,11 @@ public static void execute(Cluster cluster, BatchPolicy policy, BatchCommand[] c private final ExecutorService threadPool; private final AtomicBoolean done; private final AtomicInteger completedCount; - private final BatchCommand[] commands; + private final IBatchCommand[] commands; private final int maxConcurrentThreads; private boolean completed; - private BatchExecutor(Cluster cluster, BatchPolicy policy, BatchCommand[] commands, BatchStatus status) { + private BatchExecutor(Cluster cluster, BatchPolicy policy, IBatchCommand[] commands, BatchStatus status) { this.commands = commands; this.status = status; this.threadPool = cluster.getThreadPool(); @@ -101,8 +92,8 @@ private BatchExecutor(Cluster cluster, BatchPolicy policy, BatchCommand[] comman void execute() { // Start threads. for (int i = 0; i < maxConcurrentThreads; i++) { - BatchCommand cmd = commands[i]; - cmd.parent = this; + IBatchCommand cmd = commands[i]; + cmd.setParent(this); threadPool.execute(cmd); } @@ -123,8 +114,8 @@ void onComplete() { // Determine if a new thread needs to be started. if (nextThread < commands.length && ! done.get()) { // Start new thread. - BatchCommand cmd = commands[nextThread]; - cmd.parent = this; + IBatchCommand cmd = commands[nextThread]; + cmd.setParent(this); threadPool.execute(cmd); } } diff --git a/client/src/com/aerospike/client/command/BatchSingle.java b/client/src/com/aerospike/client/command/BatchSingle.java new file mode 100644 index 000000000..4dd05cf7a --- /dev/null +++ b/client/src/com/aerospike/client/command/BatchSingle.java @@ -0,0 +1,460 @@ +/* + * Copyright 2012-2023 Aerospike, Inc. + * + * Portions may be licensed to Aerospike, Inc. under one or more contributor + * license agreements WHICH ARE COMPATIBLE WITH THE APACHE LICENSE, VERSION 2.0. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.aerospike.client.command; + +import java.io.IOException; + +import com.aerospike.client.AerospikeException; +import com.aerospike.client.BatchRead; +import com.aerospike.client.BatchRecord; +import com.aerospike.client.Key; +import com.aerospike.client.Operation; +import com.aerospike.client.Record; +import com.aerospike.client.ResultCode; +import com.aerospike.client.Value; +import com.aerospike.client.cluster.Cluster; +import com.aerospike.client.cluster.Connection; +import com.aerospike.client.cluster.Node; +import com.aerospike.client.cluster.Partition; +import com.aerospike.client.metrics.LatencyType; +import com.aerospike.client.policy.BatchPolicy; +import com.aerospike.client.policy.Policy; + +public final class BatchSingle { + public static final class OperateRead extends Read { + private final Operation[] ops; + + public OperateRead( + Cluster cluster, + BatchPolicy policy, + Key key, + Operation[] ops, + Record[] records, + int index, + BatchStatus status, + Node node + ) { + super(cluster, policy, key, null, records, index, status, node, true); + this.ops = ops; + } + + @Override + protected void writeBuffer() { + setRead(policy, key, ops); + } + } + + public static class Read extends BaseCommand { + protected final Key key; + private final String[] binNames; + private final Record[] records; + private final int index; + private final boolean isOperation; + + public Read( + Cluster 
cluster, + Policy policy, + Key key, + String[] binNames, + Record[] records, + int index, + BatchStatus status, + Node node, + boolean isOperation + ) { + super(cluster, policy, status, key, node, false); + this.key = key; + this.binNames = binNames; + this.records = records; + this.index = index; + this.isOperation = isOperation; + } + + @Override + protected void writeBuffer() { + setRead(policy, key, binNames); + } + + @Override + protected void parseResult(Connection conn) throws IOException { + RecordParser rp = new RecordParser(conn, dataBuffer); + + if (rp.resultCode == ResultCode.OK) { + records[index] = rp.parseRecord(isOperation); + } + } + } + + public static final class ReadHeader extends BaseCommand { + private final Key key; + private final Record[] records; + private final int index; + + public ReadHeader( + Cluster cluster, + Policy policy, + Key key, + Record[] records, + int index, + BatchStatus status, + Node node + ) { + super(cluster, policy, status, key, node, false); + this.key = key; + this.records = records; + this.index = index; + } + + @Override + protected void writeBuffer() { + setReadHeader(policy, key); + } + + @Override + protected void parseResult(Connection conn) throws IOException { + conn.readFully(dataBuffer, Command.MSG_TOTAL_HEADER_SIZE, Command.STATE_READ_HEADER); + conn.updateLastUsed(); + + int resultCode = dataBuffer[13] & 0xFF; + + if (resultCode == 0) { + int generation = Buffer.bytesToInt(dataBuffer, 14); + int expiration = Buffer.bytesToInt(dataBuffer, 18); + records[index] = new Record(null, generation, expiration); + } + } + } + + public static class ReadRecord extends BaseCommand { + private final BatchRead record; + + public ReadRecord( + Cluster cluster, + Policy policy, + BatchRead record, + BatchStatus status, + Node node + ) { + super(cluster, policy, status, record.key, node, false); + this.record = record; + } + + @Override + protected void writeBuffer() { + setRead(policy, record); + } + + @Override + 
protected void parseResult(Connection conn) throws IOException { + RecordParser rp = new RecordParser(conn, dataBuffer); + + if (rp.resultCode == ResultCode.OK) { + record.setRecord(rp.parseRecord(true)); + } + else { + record.setError(rp.resultCode, false); + status.setRowError(); + } + } + } + + public static final class Exists extends BaseCommand { + private final Key key; + private final boolean[] existsArray; + private final int index; + + public Exists( + Cluster cluster, + Policy policy, + Key key, + boolean[] existsArray, + int index, + BatchStatus status, + Node node + ) { + super(cluster, policy, status, key, node, false); + this.key = key; + this.existsArray = existsArray; + this.index = index; + } + + @Override + protected void writeBuffer() { + setExists(policy, key); + } + + @Override + protected void parseResult(Connection conn) throws IOException { + // Read header. + conn.readFully(dataBuffer, Command.MSG_TOTAL_HEADER_SIZE, Command.STATE_READ_HEADER); + conn.updateLastUsed(); + + int resultCode = dataBuffer[13] & 0xFF; + existsArray[index] = resultCode == 0; + } + } + + public static final class OperateBatchRecord extends BaseCommand { + private final Operation[] ops; + private final BatchAttr attr; + private final BatchRecord record; + + public OperateBatchRecord( + Cluster cluster, + BatchPolicy policy, + Operation[] ops, + BatchAttr attr, + BatchRecord record, + BatchStatus status, + Node node + ) { + super(cluster, policy, status, record.key, node, attr.hasWrite); + this.ops = ops; + this.attr = attr; + this.record = record; + } + + @Override + protected void writeBuffer() { + setOperate(policy, attr, record.key, ops); + } + + @Override + protected void parseResult(Connection conn) throws IOException { + RecordParser rp = new RecordParser(conn, dataBuffer); + + if (rp.resultCode == ResultCode.OK) { + record.setRecord(rp.parseRecord(true)); + } + else { + record.setError(rp.resultCode, Command.batchInDoubt(attr.hasWrite, commandSentCounter)); + 
status.setRowError(); + } + } + + @Override + public void setInDoubt() { + if (record.resultCode == ResultCode.NO_RESPONSE) { + record.inDoubt = true; + } + } + } + + public static final class Delete extends BaseCommand { + private final BatchAttr attr; + private final BatchRecord record; + + public Delete( + Cluster cluster, + BatchPolicy policy, + BatchAttr attr, + BatchRecord record, + BatchStatus status, + Node node + ) { + super(cluster, policy, status, record.key, node, true); + this.attr = attr; + this.record = record; + } + + @Override + protected void writeBuffer() { + setDelete(policy, record.key, attr); + } + + @Override + protected void parseResult(Connection conn) throws IOException { + // Read header. + conn.readFully(dataBuffer, Command.MSG_TOTAL_HEADER_SIZE, Command.STATE_READ_HEADER); + conn.updateLastUsed(); + + int resultCode = dataBuffer[13] & 0xFF; + int generation = Buffer.bytesToInt(dataBuffer, 14); + int expiration = Buffer.bytesToInt(dataBuffer, 18); + + if (resultCode == ResultCode.OK || resultCode == ResultCode.KEY_NOT_FOUND_ERROR) { + record.setRecord(new Record(null, generation, expiration)); + } + else { + record.setError(resultCode, Command.batchInDoubt(true, commandSentCounter)); + status.setRowError(); + } + } + + @Override + public void setInDoubt() { + if (record.resultCode == ResultCode.NO_RESPONSE) { + record.inDoubt = true; + } + } + } + + public static final class UDF extends BaseCommand { + private final String packageName; + private final String functionName; + private final Value[] args; + private final BatchAttr attr; + private final BatchRecord record; + + public UDF( + Cluster cluster, + BatchPolicy policy, + String packageName, + String functionName, + Value[] args, + BatchAttr attr, + BatchRecord record, + BatchStatus status, + Node node + ) { + super(cluster, policy, status, record.key, node, true); + this.packageName = packageName; + this.functionName = functionName; + this.args = args; + this.attr = attr; + 
this.record = record; + } + + @Override + protected void writeBuffer() { + setUdf(policy, attr, record.key, packageName, functionName, args); + } + + @Override + protected void parseResult(Connection conn) throws IOException { + RecordParser rp = new RecordParser(conn, dataBuffer); + + if (rp.resultCode == ResultCode.OK) { + record.setRecord(rp.parseRecord(false)); + } + else if (rp.resultCode == ResultCode.UDF_BAD_RESPONSE) { + Record r = rp.parseRecord(false); + String m = r.getString("FAILURE"); + + if (m != null) { + // Need to store record because failure bin contains an error message. + record.record = r; + record.resultCode = rp.resultCode; + record.inDoubt = Command.batchInDoubt(true, commandSentCounter); + status.setRowError(); + } + } + else { + record.setError(rp.resultCode, Command.batchInDoubt(true, commandSentCounter)); + status.setRowError(); + } + } + + @Override + public void setInDoubt() { + if (record.resultCode == ResultCode.NO_RESPONSE) { + record.inDoubt = true; + } + } + } + + public static abstract class BaseCommand extends SyncCommand implements IBatchCommand { + BatchExecutor parent; + BatchStatus status; + Key key; + Node node; + int sequence; + boolean hasWrite; + + public BaseCommand( + Cluster cluster, + Policy policy, + BatchStatus status, + Key key, + Node node, + boolean hasWrite + ) { + super(cluster, policy); + this.status = status; + this.key = key; + this.node = node; + this.hasWrite = hasWrite; + } + + public void setParent(BatchExecutor parent) { + this.parent = parent; + } + + @Override + public void run() { + try { + execute(); + } + catch (AerospikeException ae) { + if (ae.getInDoubt()) { + setInDoubt(); + } + status.setException(ae); + } + catch (RuntimeException re) { + setInDoubt(); + status.setException(re); + } + catch (Throwable e) { + setInDoubt(); + status.setException(new RuntimeException(e)); + } + finally { + parent.onComplete(); + } + } + + @Override + protected boolean isWrite() { + return hasWrite; + } + + 
@Override + protected Node getNode() { + return node; + } + + @Override + protected LatencyType getLatencyType() { + return LatencyType.BATCH; + } + + @Override + protected boolean prepareRetry(boolean timeout) { + if (hasWrite) { + Partition p = Partition.write(cluster, policy, key); + p.sequence = sequence; + p.prevNode = node; + p.prepareRetryWrite(timeout); + node = p.getNodeWrite(cluster); + sequence = p.sequence; + } + else { + Partition p = Partition.read(cluster, policy, key); + p.sequence = sequence; + p.prevNode = node; + p.prepareRetryRead(timeout); + node = p.getNodeRead(cluster); + sequence = p.sequence; + } + return true; + } + + public void setInDoubt() { + } + } +} diff --git a/client/src/com/aerospike/client/command/Command.java b/client/src/com/aerospike/client/command/Command.java index 85c0de589..59ace4b6a 100644 --- a/client/src/com/aerospike/client/command/Command.java +++ b/client/src/com/aerospike/client/command/Command.java @@ -37,6 +37,7 @@ import com.aerospike.client.cluster.Cluster; import com.aerospike.client.exp.Expression; import com.aerospike.client.policy.BatchPolicy; +import com.aerospike.client.policy.BatchReadPolicy; import com.aerospike.client.policy.CommitLevel; import com.aerospike.client.policy.Policy; import com.aerospike.client.policy.QueryPolicy; @@ -184,6 +185,15 @@ public void setDelete(WritePolicy policy, Key key) { end(); } + public void setDelete(Policy policy, Key key, BatchAttr attr) { + begin(); + Expression exp = getBatchExpression(policy, attr); + int fieldCount = estimateKeyAttrSize(key, attr, exp); + sizeBuffer(); + writeKeyAttr(key, attr, exp, fieldCount, 0); + end(); + } + public final void setTouch(WritePolicy policy, Key key) { begin(); int fieldCount = estimateKeySize(policy, key); @@ -275,6 +285,89 @@ public final void setRead(Policy policy, Key key, String[] binNames) { } } + public final void setRead(Policy policy, BatchRead br) { + begin(); + + BatchReadPolicy rp = br.policy; + BatchAttr attr = new 
BatchAttr(); + Expression exp; + int opCount; + + if (rp != null) { + attr.setRead(rp); + exp = (rp.filterExp != null) ? rp.filterExp : policy.filterExp; + } + else { + attr.setRead(policy); + exp = policy.filterExp; + } + + if (br.binNames != null) { + opCount = br.binNames.length; + + for (String binName : br.binNames) { + estimateOperationSize(binName); + } + } + else if (br.ops != null) { + attr.adjustRead(br.ops); + opCount = br.ops.length; + + for (Operation op : br.ops) { + if (op.type.isWrite) { + throw new AerospikeException(ResultCode.PARAMETER_ERROR, "Write operations not allowed in read"); + } + estimateOperationSize(op); + } + } + else { + attr.adjustRead(br.readAllBins); + opCount = 0; + } + + int fieldCount = estimateKeyAttrSize(br.key, attr, exp); + + sizeBuffer(); + writeKeyAttr(br.key, attr, exp, fieldCount, opCount); + + if (br.binNames != null) { + for (String binName : br.binNames) { + writeOperation(binName, Operation.Type.READ); + } + } + else if (br.ops != null) { + for (Operation op : br.ops) { + writeOperation(op); + } + } + end(); + } + + public final void setRead(Policy policy, Key key, Operation[] ops) { + begin(); + + BatchAttr attr = new BatchAttr(); + attr.setRead(policy); + attr.adjustRead(ops); + + int fieldCount = estimateKeyAttrSize(key, attr, policy.filterExp); + + for (Operation op : ops) { + if (op.type.isWrite) { + throw new AerospikeException(ResultCode.PARAMETER_ERROR, "Write operations not allowed in read"); + } + estimateOperationSize(op); + } + + sizeBuffer(); + writeKeyAttr(key, attr, policy.filterExp, fieldCount, ops.length); + + for (Operation op : ops) { + writeOperation(op); + } + end(); + } + public final void setReadHeader(Policy policy, Key key) { begin(); int fieldCount = estimateKeySize(policy, key); @@ -323,6 +416,22 @@ public final void setOperate(WritePolicy policy, Key key, OperateArgs args) { compress(policy); } + public final void setOperate(Policy policy, BatchAttr attr, Key key, Operation[] ops) { + 
begin(); + Expression exp = getBatchExpression(policy, attr); + int fieldCount = estimateKeyAttrSize(key, attr, exp); + + dataOffset += attr.opSize; + sizeBuffer(); + writeKeyAttr(key, attr, exp, fieldCount, ops.length); + + for (Operation op : ops) { + writeOperation(op); + } + end(); + compress(policy); + } + //-------------------------------------------------- // UDF //-------------------------------------------------- @@ -354,6 +463,26 @@ public final void setUdf(WritePolicy policy, Key key, String packageName, String compress(policy); } + public final void setUdf(Policy policy, BatchAttr attr, Key key, String packageName, String functionName, Value[] args) { + byte[] argBytes = Packer.pack(args); + setUdf(policy, attr, key, packageName, functionName, argBytes); + } + + public final void setUdf(Policy policy, BatchAttr attr, Key key, String packageName, String functionName, byte[] argBytes) { + begin(); + Expression exp = getBatchExpression(policy, attr); + int fieldCount = estimateKeyAttrSize(key, attr, exp); + fieldCount += estimateUdfSize(packageName, functionName, argBytes); + + sizeBuffer(); + writeKeyAttr(key, attr, exp, fieldCount, 0); + writeField(packageName, FieldType.UDF_PACKAGE_NAME); + writeField(functionName, FieldType.UDF_FUNCTION); + writeField(argBytes, FieldType.UDF_ARGLIST); + end(); + compress(policy); + } + //-------------------------------------------------- // Batch Read Only //-------------------------------------------------- @@ -1559,6 +1688,21 @@ else if (binNames != null && (isNew || filter == null)) { // Command Sizing //-------------------------------------------------- + private final int estimateKeyAttrSize(Key key, BatchAttr attr, Expression filterExp) { + int fieldCount = estimateKeySize(key); + + if (attr.sendKey) { + dataOffset += key.userKey.estimateSize() + FIELD_HEADER_SIZE + 1; + fieldCount++; + } + + if (filterExp != null) { + dataOffset += filterExp.size(); + fieldCount++; + } + return fieldCount; + } + private final 
int estimateKeySize(Policy policy, Key key) { int fieldCount = estimateKeySize(key); @@ -1870,6 +2014,35 @@ private final void writeHeaderReadHeader(Policy policy, int readAttr, int fieldC dataOffset = MSG_TOTAL_HEADER_SIZE; } + /** + * Header write for batch single commands. + */ + private final void writeKeyAttr(Key key, BatchAttr attr, Expression filterExp, int fieldCount, int operationCount) { + // Write all header data except total size which must be written last. + dataBuffer[8] = MSG_REMAINING_HEADER_SIZE; // Message header length. + dataBuffer[9] = (byte)attr.readAttr; + dataBuffer[10] = (byte)attr.writeAttr; + dataBuffer[11] = (byte)attr.infoAttr; + dataBuffer[12] = 0; // unused + dataBuffer[13] = 0; // clear the result code + Buffer.intToBytes(attr.generation, dataBuffer, 14); + Buffer.intToBytes(attr.expiration, dataBuffer, 18); + Buffer.intToBytes(serverTimeout, dataBuffer, 22); + Buffer.shortToBytes(fieldCount, dataBuffer, 26); + Buffer.shortToBytes(operationCount, dataBuffer, 28); + dataOffset = MSG_TOTAL_HEADER_SIZE; + + writeKey(key); + + if (attr.sendKey) { + writeField(key.userKey, FieldType.KEY); + } + + if (filterExp != null) { + filterExp.write(this); + } + } + private final void writeKey(Policy policy, Key key) { writeKey(key); diff --git a/client/src/com/aerospike/client/command/IBatchCommand.java b/client/src/com/aerospike/client/command/IBatchCommand.java new file mode 100644 index 000000000..c00339cf9 --- /dev/null +++ b/client/src/com/aerospike/client/command/IBatchCommand.java @@ -0,0 +1,23 @@ +/* + * Copyright 2012-2023 Aerospike, Inc. + * + * Portions may be licensed to Aerospike, Inc. under one or more contributor + * license agreements WHICH ARE COMPATIBLE WITH THE APACHE LICENSE, VERSION 2.0. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. 
You may obtain a copy of + * the License at http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.aerospike.client.command; + +public interface IBatchCommand extends Runnable { + void setParent(BatchExecutor parent); + void execute(); + void setInDoubt(); +} diff --git a/client/src/com/aerospike/client/command/OperateArgs.java b/client/src/com/aerospike/client/command/OperateArgs.java index 13c77d2cb..c79f4e64e 100644 --- a/client/src/com/aerospike/client/command/OperateArgs.java +++ b/client/src/com/aerospike/client/command/OperateArgs.java @@ -34,7 +34,6 @@ public OperateArgs( WritePolicy policy, WritePolicy writeDefault, WritePolicy readDefault, - Key key, Operation[] operations ) { this.operations = operations; diff --git a/client/src/com/aerospike/client/command/ReadCommand.java b/client/src/com/aerospike/client/command/ReadCommand.java index 6d78e128a..0bbccc12f 100644 --- a/client/src/com/aerospike/client/command/ReadCommand.java +++ b/client/src/com/aerospike/client/command/ReadCommand.java @@ -17,8 +17,6 @@ package com.aerospike.client.command; import java.io.IOException; -import java.util.zip.DataFormatException; -import java.util.zip.Inflater; import com.aerospike.client.AerospikeException; import com.aerospike.client.Key; @@ -82,118 +80,32 @@ protected void writeBuffer() { @Override protected void parseResult(Connection conn) throws IOException { - // Read header. 
- conn.readFully(dataBuffer, 8, Command.STATE_READ_HEADER); + RecordParser rp = new RecordParser(conn, dataBuffer); - long sz = Buffer.bytesToLong(dataBuffer, 0); - int receiveSize = (int)(sz & 0xFFFFFFFFFFFFL); - - if (receiveSize <= 0) { - throw new AerospikeException("Invalid receive size: " + receiveSize); - } - - /* - byte version = (byte) (((int)(sz >> 56)) & 0xff); - if (version != MSG_VERSION) { - if (Log.debugEnabled()) { - Log.debug("read header: incorrect version."); - } - } - - if (type != MSG_TYPE) { - if (Log.debugEnabled()) { - Log.debug("read header: incorrect message type, aborting receive"); - } - } - - if (headerLength != MSG_REMAINING_HEADER_SIZE) { - if (Log.debugEnabled()) { - Log.debug("read header: unexpected header size, aborting"); - } - }*/ - - // Read remaining message bytes. - sizeBuffer(receiveSize); - conn.readFully(dataBuffer, receiveSize, Command.STATE_READ_DETAIL); - conn.updateLastUsed(); - - long type = (sz >> 48) & 0xff; - - if (type == Command.AS_MSG_TYPE) { - dataOffset = 5; - } - else if (type == Command.MSG_TYPE_COMPRESSED) { - int usize = (int)Buffer.bytesToLong(dataBuffer, 0); - byte[] buf = new byte[usize]; - - Inflater inf = new Inflater(); - try { - inf.setInput(dataBuffer, 8, receiveSize - 8); - int rsize; - - try { - rsize = inf.inflate(buf); - } - catch (DataFormatException dfe) { - throw new AerospikeException.Serialize(dfe); - } - - if (rsize != usize) { - throw new AerospikeException("Decompressed size " + rsize + " is not expected " + usize); - } - - dataBuffer = buf; - dataOffset = 13; - } finally { - inf.end(); - } - } - else { - throw new AerospikeException("Invalid proto type: " + type + " Expected: " + Command.AS_MSG_TYPE); - } - - int resultCode = dataBuffer[dataOffset] & 0xFF; - dataOffset++; - int generation = Buffer.bytesToInt(dataBuffer, dataOffset); - dataOffset += 4; - int expiration = Buffer.bytesToInt(dataBuffer, dataOffset); - dataOffset += 8; - int fieldCount = Buffer.bytesToShort(dataBuffer, 
dataOffset); - dataOffset += 2; - int opCount = Buffer.bytesToShort(dataBuffer, dataOffset); - dataOffset += 2; - - if (resultCode == 0) { - if (opCount == 0) { - // Bin data was not returned. - record = new Record(null, generation, expiration); - return; - } - skipKey(fieldCount); - record = parseRecord(opCount, generation, expiration, isOperation); + if (rp.resultCode == ResultCode.OK) { + this.record = rp.parseRecord(isOperation); return; } - if (resultCode == ResultCode.KEY_NOT_FOUND_ERROR) { - handleNotFound(resultCode); + if (rp.resultCode == ResultCode.KEY_NOT_FOUND_ERROR) { + handleNotFound(rp.resultCode); return; } - if (resultCode == ResultCode.FILTERED_OUT) { + if (rp.resultCode == ResultCode.FILTERED_OUT) { if (policy.failOnFilteredOut) { - throw new AerospikeException(resultCode); + throw new AerospikeException(rp.resultCode); } return; } - if (resultCode == ResultCode.UDF_BAD_RESPONSE) { - skipKey(fieldCount); - record = parseRecord(opCount, generation, expiration, isOperation); - handleUdfError(resultCode); + if (rp.resultCode == ResultCode.UDF_BAD_RESPONSE) { + this.record = rp.parseRecord(isOperation); + handleUdfError(rp.resultCode); return; } - throw new AerospikeException(resultCode); + throw new AerospikeException(rp.resultCode); } @Override diff --git a/client/src/com/aerospike/client/command/RecordParser.java b/client/src/com/aerospike/client/command/RecordParser.java new file mode 100644 index 000000000..75d18784a --- /dev/null +++ b/client/src/com/aerospike/client/command/RecordParser.java @@ -0,0 +1,209 @@ +/* + * Copyright 2012-2023 Aerospike, Inc. + * + * Portions may be licensed to Aerospike, Inc. under one or more contributor + * license agreements WHICH ARE COMPATIBLE WITH THE APACHE LICENSE, VERSION 2.0. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. 
You may obtain a copy of + * the License at http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.aerospike.client.command; + +import java.io.IOException; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.zip.DataFormatException; +import java.util.zip.Inflater; + +import com.aerospike.client.AerospikeException; +import com.aerospike.client.Record; +import com.aerospike.client.cluster.Connection; +import com.aerospike.client.command.Command.OpResults; +import com.aerospike.client.util.ThreadLocalData; + +public final class RecordParser { + public final byte[] dataBuffer; + public final int resultCode; + public final int generation; + public final int expiration; + public final int fieldCount; + public final int opCount; + public int dataOffset; + + /** + * Sync record parser. + */ + public RecordParser(Connection conn, byte[] buffer) throws IOException { + // Read header. + conn.readFully(buffer, 8, Command.STATE_READ_HEADER); + + long sz = Buffer.bytesToLong(buffer, 0); + int receiveSize = (int)(sz & 0xFFFFFFFFFFFFL); + + if (receiveSize <= 0) { + throw new AerospikeException("Invalid receive size: " + receiveSize); + } + + /* + byte version = (byte) (((int)(sz >> 56)) & 0xff); + if (version != MSG_VERSION) { + if (Log.debugEnabled()) { + Log.debug("read header: incorrect version."); + } + } + + if (type != MSG_TYPE) { + if (Log.debugEnabled()) { + Log.debug("read header: incorrect message type, aborting receive"); + } + } + + if (headerLength != MSG_REMAINING_HEADER_SIZE) { + if (Log.debugEnabled()) { + Log.debug("read header: unexpected header size, aborting"); + } + }*/ + + // Read remaining message bytes. 
+ if (receiveSize > buffer.length) { + buffer = ThreadLocalData.resizeBuffer(receiveSize); + } + + conn.readFully(buffer, receiveSize, Command.STATE_READ_DETAIL); + conn.updateLastUsed(); + + long type = (sz >> 48) & 0xff; + int offset; + + if (type == Command.AS_MSG_TYPE) { + offset = 5; + } + else if (type == Command.MSG_TYPE_COMPRESSED) { + int usize = (int)Buffer.bytesToLong(buffer, 0); + byte[] buf = new byte[usize]; + + Inflater inf = new Inflater(); + try { + inf.setInput(buffer, 8, receiveSize - 8); + int rsize; + + try { + rsize = inf.inflate(buf); + } + catch (DataFormatException dfe) { + throw new AerospikeException.Serialize(dfe); + } + + if (rsize != usize) { + throw new AerospikeException("Decompressed size " + rsize + " is not expected " + usize); + } + + buffer = buf; + offset = 13; + } finally { + inf.end(); + } + } + else { + throw new AerospikeException("Invalid proto type: " + type + " Expected: " + Command.AS_MSG_TYPE); + } + + this.resultCode = buffer[offset] & 0xFF; + offset++; + this.generation = Buffer.bytesToInt(buffer, offset); + offset += 4; + this.expiration = Buffer.bytesToInt(buffer, offset); + offset += 8; + this.fieldCount = Buffer.bytesToShort(buffer, offset); + offset += 2; + this.opCount = Buffer.bytesToShort(buffer, offset); + offset += 2; + this.dataOffset = offset; + this.dataBuffer = buffer; + } + + /** + * Async record parser. 
+ */ + public RecordParser(byte[] buffer, int offset, int receiveSize) { + if (receiveSize < Command.MSG_REMAINING_HEADER_SIZE) { + throw new AerospikeException.Parse("Invalid receive size: " + receiveSize); + } + + offset += 5; + this.resultCode = buffer[offset] & 0xFF; + offset++; + this.generation = Buffer.bytesToInt(buffer, offset); + offset += 4; + this.expiration = Buffer.bytesToInt(buffer, offset); + offset += 8; + this.fieldCount = Buffer.bytesToShort(buffer, offset); + offset += 2; + this.opCount = Buffer.bytesToShort(buffer, offset); + offset += 2; + this.dataOffset = offset; + this.dataBuffer = buffer; + } + + public Record parseRecord(boolean isOperation) { + if (opCount == 0) { + // Bin data was not returned. + return new Record(null, generation, expiration); + } + + // Skip key. + for (int i = 0; i < fieldCount; i++) { + int fieldlen = Buffer.bytesToInt(dataBuffer, dataOffset); + dataOffset += 4 + fieldlen; + } + + // Parse record. + Map bins = new LinkedHashMap<>(); + + for (int i = 0 ; i < opCount; i++) { + int opSize = Buffer.bytesToInt(dataBuffer, dataOffset); + byte particleType = dataBuffer[dataOffset + 5]; + byte nameSize = dataBuffer[dataOffset + 7]; + String name = Buffer.utf8ToString(dataBuffer, dataOffset + 8, nameSize); + dataOffset += 4 + 4 + nameSize; + + int particleBytesSize = opSize - (4 + nameSize); + Object value = Buffer.bytesToParticle(particleType, dataBuffer, dataOffset, particleBytesSize); + dataOffset += particleBytesSize; + + if (isOperation) { + if (bins.containsKey(name)) { + // Multiple values returned for the same bin. + Object prev = bins.get(name); + + if (prev instanceof OpResults) { + // List already exists. Add to it. + OpResults list = (OpResults)prev; + list.add(value); + } + else { + // Make a list to store all values. 
+ OpResults list = new OpResults(); + list.add(prev); + list.add(value); + bins.put(name, list); + } + } + else { + bins.put(name, value); + } + } + else { + bins.put(name, value); + } + } + return new Record(bins, generation, expiration); + } +} diff --git a/proxy/src/com/aerospike/client/proxy/AerospikeClientProxy.java b/proxy/src/com/aerospike/client/proxy/AerospikeClientProxy.java index ba1208ea7..32305f660 100644 --- a/proxy/src/com/aerospike/client/proxy/AerospikeClientProxy.java +++ b/proxy/src/com/aerospike/client/proxy/AerospikeClientProxy.java @@ -1460,7 +1460,7 @@ public Record operate(WritePolicy policy, Key key, Operation... operations) { */ @Override public void operate(EventLoop eventLoop, RecordListener listener, WritePolicy policy, Key key, Operation... operations) { - OperateArgs args = new OperateArgs(policy, writePolicyDefault, operatePolicyReadDefault, key, operations); + OperateArgs args = new OperateArgs(policy, writePolicyDefault, operatePolicyReadDefault, operations); OperateCommandProxy command = new OperateCommandProxy(executor, listener, args.writePolicy, key, args); command.execute(); } diff --git a/test/src/com/aerospike/test/async/TestAsyncBatch.java b/test/src/com/aerospike/test/async/TestAsyncBatch.java index 31fdaed4f..f6ffbf0b8 100644 --- a/test/src/com/aerospike/test/async/TestAsyncBatch.java +++ b/test/src/com/aerospike/test/async/TestAsyncBatch.java @@ -44,6 +44,7 @@ import com.aerospike.client.listener.BatchOperateListListener; import com.aerospike.client.listener.BatchRecordArrayListener; import com.aerospike.client.listener.BatchRecordSequenceListener; +import com.aerospike.client.listener.BatchSequenceListener; import com.aerospike.client.listener.ExistsArrayListener; import com.aerospike.client.listener.ExistsSequenceListener; import com.aerospike.client.listener.RecordArrayListener; @@ -63,6 +64,7 @@ public class TestAsyncBatch extends TestAsync { private static final int Size = 8; private static Key[] sendKeys; private 
static Key[] deleteKeys; + private static Key[] deleteKeysSequence; @BeforeClass public static void initialize() { @@ -76,6 +78,10 @@ public static void initialize() { deleteKeys[0] = new Key(args.namespace, args.set, 10000); deleteKeys[1] = new Key(args.namespace, args.set, 10001); + deleteKeysSequence = new Key[2]; + deleteKeysSequence[0] = new Key(args.namespace, args.set, 11000); + deleteKeysSequence[1] = new Key(args.namespace, args.set, 11001); + AsyncMonitor monitor = new AsyncMonitor(); WriteListener listener = new WriteListener() { @@ -84,7 +90,7 @@ public static void initialize() { public void onSuccess(final Key key) { // Use non-atomic increment because all writes are performed // in the same event loop thread. - if (++count == Size + 3) { + if (++count == Size + 5) { monitor.notifyComplete(); } } @@ -139,6 +145,8 @@ public void onFailure(AerospikeException e) { client.put(eventLoop, listener, policy, deleteKeys[0], new Bin(BinName, 10000)); client.put(eventLoop, listener, policy, deleteKeys[1], new Bin(BinName, 10001)); client.put(eventLoop, listener, policy, new Key(args.namespace, args.set, 10002), new Bin(BinName, 10002)); + client.put(eventLoop, listener, policy, deleteKeysSequence[0], new Bin(BinName, 11000)); + client.put(eventLoop, listener, policy, deleteKeysSequence[1], new Bin(BinName, 11001)); monitor.waitTillComplete(); } @@ -214,6 +222,36 @@ public void onFailure(AerospikeException e) { waitTillComplete(); } + @Test + public void asyncBatchGetArrayBinName() throws Exception { + client.get(eventLoop, new RecordArrayListener() { + public void onSuccess(Key[] keys, Record[] records) { + if (assertEquals(Size, records.length)) { + for (int i = 0; i < records.length; i++) { + if (i != 5) { + if (! assertBinEqual(keys[i], records[i], BinName, ValuePrefix + (i + 1))) { + break; + } + } + else { + if (! 
assertBinEqual(keys[i], records[i], BinName, i + 1)) { + break; + } + } + } + } + notifyComplete(); + } + + public void onFailure(AerospikeException e) { + setError(e); + notifyComplete(); + } + }, null, sendKeys, BinName); + + waitTillComplete(); + } + @Test public void asyncBatchGetSequence() throws Exception { client.get(eventLoop, new RecordSequenceListener() { @@ -237,6 +275,29 @@ public void onFailure(AerospikeException e) { waitTillComplete(); } + @Test + public void asyncBatchGetSequenceBinName() throws Exception { + client.get(eventLoop, new RecordSequenceListener() { + public void onRecord(Key key, Record record) { + if (assertRecordFound(key, record)) { + Object value = record.getValue(BinName); + assertNotNull(value); + } + } + + public void onSuccess() { + notifyComplete(); + } + + public void onFailure(AerospikeException e) { + setError(e); + notifyComplete(); + } + }, null, sendKeys, BinName); + + waitTillComplete(); + } + @Test public void asyncBatchGetHeaders() throws Exception { client.getHeader(eventLoop, new RecordArrayListener() { @@ -270,6 +331,60 @@ public void onFailure(AerospikeException e) { waitTillComplete(); } + @Test + public void asyncBatchGetHeadersSeq() throws Exception { + client.getHeader(eventLoop, new RecordSequenceListener() { + int count; + + public void onRecord(Key key, Record record) { + count++; + + int index = getKeyIndex(key); + + if (!assertTrue(index >= 0)) { + notifyComplete(); + return; + } + + if (! assertRecordFound(sendKeys[index], record)) { + notifyComplete(); + return; + } + + if (! 
assertGreaterThanZero(record.generation)) { + notifyComplete(); + return; + } + + if (args.hasTtl && !assertGreaterThanZero(record.expiration)) { + notifyComplete(); + return; + } + } + + public void onSuccess() { + assertEquals(Size, count); + notifyComplete(); + } + + public void onFailure(AerospikeException e) { + setError(e); + notifyComplete(); + } + + private int getKeyIndex(Key key) { + for (int i = 0; i < sendKeys.length; i++) { + if (key == sendKeys[i]) { + return i; + } + } + return -1; + } + }, null, sendKeys); + + waitTillComplete(); + } + @Test public void asyncBatchReadComplex() throws Exception { // Batch gets into one call. @@ -348,6 +463,106 @@ public void onFailure(AerospikeException e) { waitTillComplete(); } + @Test + public void asyncBatchReadComplexSeq() throws Exception { + Expression exp = Exp.build(Exp.mul(Exp.intBin(BinName), Exp.val(8))); + Operation[] ops = Operation.array(ExpOperation.read(BinName, exp, ExpReadFlags.DEFAULT)); + + String[] bins = new String[] {BinName}; + + Key[] keys = new Key[9]; + keys[0] = new Key(args.namespace, args.set, KeyPrefix + 1); + keys[1] = new Key(args.namespace, args.set, KeyPrefix + 2); + keys[2] = new Key(args.namespace, args.set, KeyPrefix + 3); + keys[3] = new Key(args.namespace, args.set, KeyPrefix + 4); + keys[4] = new Key(args.namespace, args.set, KeyPrefix + 5); + keys[5] = new Key(args.namespace, args.set, KeyPrefix + 6); + keys[6] = new Key(args.namespace, args.set, KeyPrefix + 7); + keys[7] = new Key(args.namespace, args.set, KeyPrefix + 8); + keys[8] = new Key(args.namespace, args.set, "keynotfound"); + + List records = new ArrayList(); + records.add(new BatchRead(keys[0], bins)); + records.add(new BatchRead(keys[1], true)); + records.add(new BatchRead(keys[2], true)); + records.add(new BatchRead(keys[3], false)); + records.add(new BatchRead(keys[4], true)); + records.add(new BatchRead(keys[5], ops)); + records.add(new BatchRead(keys[6], bins)); + + // This record should be found, but the 
requested bin will not be found. + records.add(new BatchRead(keys[7], new String[] {"binnotfound"})); + + // This record should not be found. + records.add(new BatchRead(keys[8], bins)); + + // Execute batch. + client.get(eventLoop, new BatchSequenceListener() { + private int found; + + public void onRecord(BatchRead record) { + Record rec = record.record; + + if (rec != null) { + found++; + + int index = getKeyIndex(record.key); + + if (!assertTrue(index >= 0)) { + notifyComplete(); + return; + } + + if (index != 3 && index != 5 && index <= 6) { + Object value = rec.getValue(BinName); + + if (!assertEquals(ValuePrefix + (index+1), value)) { + notifyComplete(); + return; + } + } + else if (index == 5) { + int value = rec.getInt(BinName); + + if (!assertEquals(48, value)) { + notifyComplete(); + return; + } + } + else { + Object value = rec.getValue(BinName); + + if (!assertNull(value)) { + notifyComplete(); + return; + } + } + } + } + + private int getKeyIndex(Key key) { + for (int i = 0; i < keys.length; i++) { + if (key == keys[i]) { + return i; + } + } + return -1; + } + + public void onSuccess() { + assertEquals(8, found); + notifyComplete(); + } + + public void onFailure(AerospikeException e) { + setError(e); + notifyComplete(); + } + }, null, records); + + waitTillComplete(); + } + @Test public void asyncBatchListReadOperate() throws Exception { client.get(eventLoop, new RecordArrayListener() { @@ -378,6 +593,62 @@ public void onFailure(AerospikeException e) { waitTillComplete(); } + @Test + public void asyncBatchListReadOperateSeq() throws Exception { + client.get(eventLoop, new RecordSequenceListener() { + int count; + + public void onRecord(Key key, Record record) { + count++; + + int index = getKeyIndex(key); + + if (!assertTrue(index >= 0)) { + notifyComplete(); + return; + } + + List results = record.getList(ListBin); + long size = (Long)results.get(0); + long val = (Long)results.get(1); + + if (! 
assertEquals(index + 1, size)) { + notifyComplete(); + return; + } + + if (! assertEquals(index * (index + 1), val)) { + notifyComplete(); + return; + } + } + + public void onSuccess() { + assertEquals(Size, count); + notifyComplete(); + } + + public void onFailure(AerospikeException e) { + setError(e); + notifyComplete(); + } + + private int getKeyIndex(Key key) { + for (int i = 0; i < sendKeys.length; i++) { + if (key == sendKeys[i]) { + return i; + } + } + return -1; + } + }, null, sendKeys, + ListOperation.size(ListBin), + ListOperation.getByIndex(ListBin, -1, ListReturnType.VALUE) + ); + + waitTillComplete(); + } + @Test public void asyncBatchListWriteOperate() { client.operate(eventLoop, new BatchRecordArrayListener() { @@ -618,4 +889,51 @@ public void onFailure(AerospikeException e) { waitTillComplete(); } + + @Test + public void asyncBatchDeleteSequence() { + // Ensure keys exists + client.exists(eventLoop, new ExistsArrayListener() { + public void onSuccess(Key[] keys, boolean[] exists) { + assertEquals(true, exists[0]); + assertEquals(true, exists[1]); + + // Delete keys + client.delete(eventLoop, new BatchRecordSequenceListener() { + public void onRecord(BatchRecord record, int index) { + assertEquals(ResultCode.OK, record.resultCode); + assertTrue(index <= 1); + } + + public void onSuccess() { + // Ensure keys do not exist + client.exists(eventLoop, new ExistsArrayListener() { + public void onSuccess(Key[] keys, boolean[] exists) { + assertEquals(false, exists[0]); + assertEquals(false, exists[1]); + notifyComplete(); + } + + public void onFailure(AerospikeException e) { + setError(e); + notifyComplete(); + } + }, null, deleteKeysSequence); + } + + public void onFailure(AerospikeException e) { + setError(e); + notifyComplete(); + } + }, null, null, keys); + } + + public void onFailure(AerospikeException e) { + setError(e); + notifyComplete(); + } + }, null, deleteKeysSequence); + + waitTillComplete(); + } } diff --git 
a/test/src/com/aerospike/test/async/TestAsyncUDF.java b/test/src/com/aerospike/test/async/TestAsyncUDF.java index a5c75d108..937cb8e9f 100644 --- a/test/src/com/aerospike/test/async/TestAsyncUDF.java +++ b/test/src/com/aerospike/test/async/TestAsyncUDF.java @@ -97,7 +97,7 @@ public void onFailure(AerospikeException e) { public void asyncBatchUDF() { Key[] keys = new Key[] { new Key(args.namespace, args.set, 20000), - new Key(args.namespace, args.set, 20001) + new Key(args.namespace, args.set, 20003) }; client.delete(null, null, keys); @@ -129,6 +129,51 @@ public void onFailure(BatchRecord[] records, AerospikeException ae) { waitTillComplete(); } + @Test + public void asyncBatchUDFSeq() { + Key[] keys = new Key[] { + new Key(args.namespace, args.set, 20000), + new Key(args.namespace, args.set, 20003) + }; + + client.delete(null, null, keys); + + client.execute(null, new BatchRecordSequenceListener() { + int count; + + public void onRecord(BatchRecord record, int index) { + count++; + + if (!assertTrue(index >= 0 && index <= 1)) { + notifyComplete(); + return; + } + + if (!assertTrue(record.resultCode == 0)) { + notifyComplete(); + return; + } + + if (!assertNotNull(record.record)) { + notifyComplete(); + return; + } + } + + public void onSuccess() { + assertEquals(2, count); + notifyComplete(); + } + + public void onFailure(AerospikeException ae) { + setError(ae); + notifyComplete(); + } + }, null, null, keys, "record_example", "writeBin", Value.get("B5"), Value.get("value5")); + + waitTillComplete(); + } + @Test public void asyncBatchUDFComplex() { List records = new ArrayList(); diff --git a/test/src/com/aerospike/test/sync/basic/TestPutGet.java b/test/src/com/aerospike/test/sync/basic/TestPutGet.java index a5faa0d65..e2f99cd42 100644 --- a/test/src/com/aerospike/test/sync/basic/TestPutGet.java +++ b/test/src/com/aerospike/test/sync/basic/TestPutGet.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2021 Aerospike, Inc. + * Copyright 2012-2023 Aerospike, Inc. 
* * Portions may be licensed to Aerospike, Inc. under one or more contributor * license agreements WHICH ARE COMPATIBLE WITH THE APACHE LICENSE, VERSION 2.0. @@ -16,6 +16,7 @@ */ package com.aerospike.test.sync.basic; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -25,6 +26,8 @@ import com.aerospike.client.Bin; import com.aerospike.client.Key; import com.aerospike.client.Record; +import com.aerospike.client.policy.Policy; +import com.aerospike.client.policy.WritePolicy; import com.aerospike.test.sync.TestSync; public class TestPutGet extends TestSync { @@ -73,4 +76,33 @@ public void putGetBool() { b = record.getBoolean(bin4.name); assertTrue(b); } + + @Test + public void putGetCompress() { + Key key = new Key(args.namespace, args.set, "pgc"); + byte[] bytes = new byte[2000]; + + for (int i = 0; i < bytes.length; i++) { + bytes[i] = (byte)(i % 256); + } + + Bin bin = new Bin("bb", bytes); + + WritePolicy wp = new WritePolicy(); + wp.compress = true; + + client.put(wp, key, bin); + + Policy p = new Policy(); + p.compress = true; + + Record record = client.get(p, key); + byte[] rcv = record.getBytes("bb"); + assertEquals(2000, rcv.length); + + for (int i = 0; i < rcv.length; i++) { + byte b = (byte)(i % 256); + assertEquals(b, rcv[i]); + } + } }