From 57cd1f8f2fa359cd911de57b3283830aa17010c0 Mon Sep 17 00:00:00 2001 From: Brian Nichols Date: Tue, 19 Nov 2024 15:21:43 -0500 Subject: [PATCH 1/9] Fix AerospikeException.Commit api doc reference. --- client/src/com/aerospike/client/AerospikeException.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/src/com/aerospike/client/AerospikeException.java b/client/src/com/aerospike/client/AerospikeException.java index 7a092a086..2f1153cb9 100644 --- a/client/src/com/aerospike/client/AerospikeException.java +++ b/client/src/com/aerospike/client/AerospikeException.java @@ -496,7 +496,7 @@ public Backoff(int resultCode) { } /** - * Exception thrown when {@link AerospikeClient#commit(com.aerospike.client.Tran)} fails. + * Exception thrown when a multi-record transaction commit fails. */ public static final class Commit extends AerospikeException { private static final long serialVersionUID = 1L; From 4df73e76bc25b81bedfd4f0b63d8bb123b2c1982 Mon Sep 17 00:00:00 2001 From: Brian Nichols Date: Wed, 20 Nov 2024 13:50:26 -0500 Subject: [PATCH 2/9] CLIENT-3173 Default client MRT timeout to zero. This means use the server configuration mrt-duration as the MRT timeout. The default mrt-duration is 10 seconds. --- client/src/com/aerospike/client/Txn.java | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/client/src/com/aerospike/client/Txn.java b/client/src/com/aerospike/client/Txn.java index d8e0e0330..a5601d985 100644 --- a/client/src/com/aerospike/client/Txn.java +++ b/client/src/com/aerospike/client/Txn.java @@ -50,19 +50,24 @@ public static enum State { /** * Create MRT, assign random transaction id and initialize reads/writes hashmaps with default - * capacities. The default MRT timeout is 10 seconds. + * capacities. + *

+ * The default client MRT timeout is zero. This means use the server configuration mrt-duration + * as the MRT timeout. The default mrt-duration is 10 seconds. */ public Txn() { id = createId(); reads = new ConcurrentHashMap<>(); writes = ConcurrentHashMap.newKeySet(); state = Txn.State.OPEN; - timeout = 10; // seconds } /** * Create MRT, assign random transaction id and initialize reads/writes hashmaps with given - * capacities. The default MRT timeout is 10 seconds. + * capacities. + *

+ * The default client MRT timeout is zero. This means use the server configuration mrt-duration + * as the MRT timeout. The default mrt-duration is 10 seconds. * * @param readsCapacity expected number of record reads in the MRT. Minimum value is 16. * @param writesCapacity expected number of record writes in the MRT. Minimum value is 16. @@ -80,7 +85,6 @@ public Txn(int readsCapacity, int writesCapacity) { reads = new ConcurrentHashMap<>(readsCapacity); writes = ConcurrentHashMap.newKeySet(writesCapacity); state = Txn.State.OPEN; - timeout = 10; // seconds } private static long createId() { @@ -110,6 +114,9 @@ public long getId() { * Set MRT timeout in seconds. The timer starts when the MRT monitor record is created. * This occurs when the first command in the MRT is executed. If the timeout is reached before * a commit or abort is called, the server will expire and rollback the MRT. + *

+ * If the MRT timeout is zero, the server configuration mrt-duration is used. + * The default mrt-duration is 10 seconds. */ public void setTimeout(int timeout) { this.timeout = timeout; From 6baf9f7ded03a99c67dade0259cddd9a1f8c36e8 Mon Sep 17 00:00:00 2001 From: Brian Nichols Date: Thu, 21 Nov 2024 14:30:03 -0500 Subject: [PATCH 3/9] CLIENT-3185 Add ResultCode.MRT_TOO_MANY_WRITES --- client/src/com/aerospike/client/ResultCode.java | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/client/src/com/aerospike/client/ResultCode.java b/client/src/com/aerospike/client/ResultCode.java index 5d454f80b..2505a83b6 100644 --- a/client/src/com/aerospike/client/ResultCode.java +++ b/client/src/com/aerospike/client/ResultCode.java @@ -269,6 +269,11 @@ public final class ResultCode { */ public static final int MRT_ABORTED = 34; + /** + * MRT write command limit (4096) exceeded. + */ + public static final int MRT_TOO_MANY_WRITES = 35; + /** * There are no more records left for query. */ @@ -631,13 +636,16 @@ public static String getResultString(int resultCode) { case XDR_KEY_BUSY: return "Write can't complete until XDR finishes shipping"; - + case MRT_COMMITTED: return "MRT already committed"; - + case MRT_ABORTED: return "MRT already aborted"; - + + case MRT_TOO_MANY_WRITES: + return "MRT write command limit exceeded"; + case QUERY_END: return "Query end"; From 08f040b6bc2679bdfdeca933732e51e61fa35b82 Mon Sep 17 00:00:00 2001 From: Brian Nichols Date: Mon, 2 Dec 2024 10:03:17 -0500 Subject: [PATCH 4/9] CLIENT-3197 Change ResultCode values for multi-record transactions. --- .../src/com/aerospike/client/ResultCode.java | 98 +++++++++---------- 1 file changed, 49 insertions(+), 49 deletions(-) diff --git a/client/src/com/aerospike/client/ResultCode.java b/client/src/com/aerospike/client/ResultCode.java index 2505a83b6..d6e0a7eb8 100644 --- a/client/src/com/aerospike/client/ResultCode.java +++ b/client/src/com/aerospike/client/ResultCode.java @@ -238,42 +238,11 @@ public final class ResultCode { */ public static final int LOST_CONFLICT = 28; - /** - * MRT record blocked by a different transaction. - */ - public static final int MRT_BLOCKED = 29; - - /** - * MRT read version mismatch identified during commit. - * Some other command changed the record outside of the transaction. - */ - public static final int MRT_VERSION_MISMATCH = 30; - - /** - * MRT deadline reached without a successful commit or abort. - */ - public static final int MRT_EXPIRED = 31; - /** * Write can't complete until XDR finishes shipping. */ public static final int XDR_KEY_BUSY = 32; - /** - * MRT was already committed. - */ - public static final int MRT_COMMITTED = 33; - - /** - * MRT was already aborted. - */ - public static final int MRT_ABORTED = 34; - - /** - * MRT write command limit (4096) exceeded. - */ - public static final int MRT_TOO_MANY_WRITES = 35; - /** * There are no more records left for query. */ @@ -399,6 +368,37 @@ public final class ResultCode { */ public static final int UDF_BAD_RESPONSE = 100; + /** + * MRT record blocked by a different transaction. + */ + public static final int MRT_BLOCKED = 120; + + /** + * MRT read version mismatch identified during commit. + * Some other command changed the record outside of the transaction. + */ + public static final int MRT_VERSION_MISMATCH = 121; + + /** + * MRT deadline reached without a successful commit or abort. + */ + public static final int MRT_EXPIRED = 122; + + /** + * MRT write command limit (4096) exceeded. + */ + public static final int MRT_TOO_MANY_WRITES = 123; + + /** + * MRT was already committed. + */ + public static final int MRT_COMMITTED = 124; + + /** + * MRT was already aborted. + */ + public static final int MRT_ABORTED = 125; + /** * Batch functionality has been disabled. */ @@ -625,27 +625,9 @@ public static String getResultString(int resultCode) { case LOST_CONFLICT: return "Command failed due to conflict with XDR"; - case MRT_BLOCKED: - return "MRT record blocked by a different transaction"; - - case MRT_VERSION_MISMATCH: - return "MRT version mismatch"; - - case MRT_EXPIRED: - return "MRT expired"; - case XDR_KEY_BUSY: return "Write can't complete until XDR finishes shipping"; - case MRT_COMMITTED: - return "MRT already committed"; - - case MRT_ABORTED: - return "MRT already aborted"; - - case MRT_TOO_MANY_WRITES: - return "MRT write command limit exceeded"; - case QUERY_END: return "Query end"; @@ -721,6 +703,24 @@ public static String getResultString(int resultCode) { case UDF_BAD_RESPONSE: return "UDF returned error"; + case MRT_BLOCKED: + return "MRT record blocked by a different transaction"; + + case MRT_VERSION_MISMATCH: + return "MRT version mismatch"; + + case MRT_EXPIRED: + return "MRT expired"; + + case MRT_TOO_MANY_WRITES: + return "MRT write command limit exceeded"; + + case MRT_COMMITTED: + return "MRT already committed"; + + case MRT_ABORTED: + return "MRT already aborted"; + case BATCH_DISABLED: return "Batch functionality has been disabled"; From 01792e4eebee0fd9b2f8b46ba7ea67c7a1521524 Mon Sep 17 00:00:00 2001 From: Brian Nichols Date: Mon, 2 Dec 2024 10:43:08 -0500 Subject: [PATCH 5/9] CLIENT-3198 Restore default policy getters to perform a get instead of a copy. --- .../com/aerospike/client/AerospikeClient.java | 86 ++++++++++++------- .../aerospike/client/IAerospikeClient.java | 62 ++++++++----- 2 files changed, 94 insertions(+), 54 deletions(-) diff --git a/client/src/com/aerospike/client/AerospikeClient.java b/client/src/com/aerospike/client/AerospikeClient.java index c22a1a925..861ad251d 100644 --- a/client/src/com/aerospike/client/AerospikeClient.java +++ b/client/src/com/aerospike/client/AerospikeClient.java @@ -367,154 +367,176 @@ protected AerospikeClient(ClientPolicy policy) { //------------------------------------------------------- /** - * Copy read policy default to avoid problems if this shared instance is later modified. + * Return read policy default. Use when the policy will not be modified. */ public final Policy getReadPolicyDefault() { - return new Policy(readPolicyDefault); + return readPolicyDefault; } /** - * Copy read policy default. + * Copy read policy default. Use when the policy will be modified for use in a specific command. */ public final Policy copyReadPolicyDefault() { return new Policy(readPolicyDefault); } /** - * Copy write policy default to avoid problems if this shared instance is later modified. + * Return write policy default. Use when the policy will not be modified. */ public final WritePolicy getWritePolicyDefault() { - return new WritePolicy(writePolicyDefault); + return writePolicyDefault; } /** - * Copy write policy default. + * Copy write policy default. Use when the policy will be modified for use in a specific command. */ public final WritePolicy copyWritePolicyDefault() { return new WritePolicy(writePolicyDefault); } /** - * Copy scan policy default to avoid problems if this shared instance is later modified. + * Return scan policy default. Use when the policy will not be modified. */ public final ScanPolicy getScanPolicyDefault() { - return new ScanPolicy(scanPolicyDefault); + return scanPolicyDefault; } /** - * Copy scan policy default. + * Copy scan policy default. Use when the policy will be modified for use in a specific command. */ public final ScanPolicy copyScanPolicyDefault() { return new ScanPolicy(scanPolicyDefault); } /** - * Copy query policy default to avoid problems if this shared instance is later modified. + * Return query policy default. Use when the policy will not be modified. */ public final QueryPolicy getQueryPolicyDefault() { - return new QueryPolicy(queryPolicyDefault); + return queryPolicyDefault; } /** - * Copy query policy default. + * Copy query policy default. Use when the policy will be modified for use in a specific command. */ public final QueryPolicy copyQueryPolicyDefault() { return new QueryPolicy(queryPolicyDefault); } /** - * Copy batch header read policy default to avoid problems if this shared instance is later modified. + * Return batch header read policy default. Use when the policy will not be modified. */ public final BatchPolicy getBatchPolicyDefault() { - return new BatchPolicy(batchPolicyDefault); + return batchPolicyDefault; } /** - * Copy batch header read policy default. + * Copy batch header read policy default. Use when the policy will be modified for use in a + * specific command. */ public final BatchPolicy copyBatchPolicyDefault() { return new BatchPolicy(batchPolicyDefault); } /** - * Copy batch header write policy default to avoid problems if this shared instance is later modified. + * Return batch header write policy default. Use when the policy will not be modified. */ public final BatchPolicy getBatchParentPolicyWriteDefault() { - return new BatchPolicy(batchParentPolicyWriteDefault); + return batchParentPolicyWriteDefault; } /** - * Copy batch header write policy default. + * Copy batch header write policy default. Use when the policy will be modified for use in a + * specific command. */ public final BatchPolicy copyBatchParentPolicyWriteDefault() { return new BatchPolicy(batchParentPolicyWriteDefault); } /** - * Copy batch detail write policy default to avoid problems if this shared instance is later modified. + * Return batch detail write policy default. Use when the policy will not be modified. */ public final BatchWritePolicy getBatchWritePolicyDefault() { - return new BatchWritePolicy(batchWritePolicyDefault); + return batchWritePolicyDefault; } /** - * Copy batch detail write policy default. + * Copy batch detail write policy default. Use when the policy will be modified for use in a + * specific command. */ public final BatchWritePolicy copyBatchWritePolicyDefault() { return new BatchWritePolicy(batchWritePolicyDefault); } /** - * Copy batch detail delete policy default to avoid problems if this shared instance is later modified. + * Return batch detail delete policy default. Use when the policy will not be modified. */ public final BatchDeletePolicy getBatchDeletePolicyDefault() { - return new BatchDeletePolicy(batchDeletePolicyDefault); + return batchDeletePolicyDefault; } /** - * Copy batch detail delete policy default. + * Copy batch detail delete policy default. Use when the policy will be modified for use in a + * specific command. */ public final BatchDeletePolicy copyBatchDeletePolicyDefault() { return new BatchDeletePolicy(batchDeletePolicyDefault); } /** - * Copy batch detail UDF policy default to avoid problems if this shared instance is later modified. + * Return batch detail UDF policy default. Use when the policy will not be modified. */ public final BatchUDFPolicy getBatchUDFPolicyDefault() { - return new BatchUDFPolicy(batchUDFPolicyDefault); + return batchUDFPolicyDefault; } /** - * Copy batch detail UDF policy default. + * Copy batch detail UDF policy default. Use when the policy will be modified for use in a + * specific command. */ public final BatchUDFPolicy copyBatchUDFPolicyDefault() { return new BatchUDFPolicy(batchUDFPolicyDefault); } /** - * Copy info command policy default to avoid problems if this shared instance is later modified. + * Return info command policy default. Use when the policy will not be modified. */ public final InfoPolicy getInfoPolicyDefault() { - return new InfoPolicy(infoPolicyDefault); + return infoPolicyDefault; } /** - * Copy info command policy default. + * Copy info command policy default. Use when the policy will be modified for use in a + * specific command. */ public final InfoPolicy copyInfoPolicyDefault() { return new InfoPolicy(infoPolicyDefault); } /** - * Copy MRT record version verify policy default. + * Return MRT record version verify policy default. Use when the policy will not be modified. + */ + public final TxnVerifyPolicy getTxnVerifyPolicyDefault() { + return txnVerifyPolicyDefault; + } + + /** + * Copy MRT record version verify policy default. Use when the policy will be modified for use + * in a specific command. */ public final TxnVerifyPolicy copyTxnVerifyPolicyDefault() { return new TxnVerifyPolicy(txnVerifyPolicyDefault); } /** - * Copy MRT roll forward/back policy default. + * Return MRT roll forward/back policy default. Use when the policy will not be modified. + */ + public final TxnRollPolicy getTxnRollPolicyDefault() { + return txnRollPolicyDefault; + } + + /** + * Copy MRT roll forward/back policy default. Use when the policy will be modified for use + * in a specific command. */ public final TxnRollPolicy copyTxnRollPolicyDefault() { return new TxnRollPolicy(txnRollPolicyDefault); diff --git a/client/src/com/aerospike/client/IAerospikeClient.java b/client/src/com/aerospike/client/IAerospikeClient.java index bf3af145f..9f3205eed 100644 --- a/client/src/com/aerospike/client/IAerospikeClient.java +++ b/client/src/com/aerospike/client/IAerospikeClient.java @@ -82,112 +82,130 @@ public interface IAerospikeClient extends Closeable { //------------------------------------------------------- /** - * Copy read policy default to avoid problems if this shared instance is later modified. + * Return read policy default. Use when the policy will not be modified. */ public Policy getReadPolicyDefault(); /** - * Copy read policy default. + * Copy read policy default. Use when the policy will be modified for use in a specific command. */ public Policy copyReadPolicyDefault(); /** - * Copy write policy default to avoid problems if this shared instance is later modified. + * Return write policy default. Use when the policy will not be modified. */ public WritePolicy getWritePolicyDefault(); /** - * Copy write policy default. + * Copy write policy default. Use when the policy will be modified for use in a specific command. */ public WritePolicy copyWritePolicyDefault(); /** - * Copy scan policy default to avoid problems if this shared instance is later modified. + * Return scan policy default. Use when the policy will not be modified. */ public ScanPolicy getScanPolicyDefault(); /** - * Copy scan policy default. + * Copy scan policy default. Use when the policy will be modified for use in a specific command. */ public ScanPolicy copyScanPolicyDefault(); /** - * Copy query policy default to avoid problems if this shared instance is later modified. + * Return query policy default. Use when the policy will not be modified. */ public QueryPolicy getQueryPolicyDefault(); /** - * Copy query policy default. + * Copy query policy default. Use when the policy will be modified for use in a specific command. */ public QueryPolicy copyQueryPolicyDefault(); /** - * Copy batch header read policy default to avoid problems if this shared instance is later modified. + * Return batch header read policy default. Use when the policy will not be modified. */ public BatchPolicy getBatchPolicyDefault(); /** - * Copy batch header read policy default. + * Copy batch header read policy default. Use when the policy will be modified for use in a + * specific command. */ public BatchPolicy copyBatchPolicyDefault(); /** - * Copy batch header write policy default to avoid problems if this shared instance is later modified. + * Return batch header write policy default. Use when the policy will not be modified. */ public BatchPolicy getBatchParentPolicyWriteDefault(); /** - * Copy batch header write policy default. + * Copy batch header write policy default. Use when the policy will be modified for use in a + * specific command. */ public BatchPolicy copyBatchParentPolicyWriteDefault(); /** - * Copy batch detail write policy default to avoid problems if this shared instance is later modified. + * Return batch detail write policy default. Use when the policy will not be modified. */ public BatchWritePolicy getBatchWritePolicyDefault(); /** - * Copy batch detail write policy default. + * Copy batch detail write policy default. Use when the policy will be modified for use in a + * specific command. */ public BatchWritePolicy copyBatchWritePolicyDefault(); /** - * Copy batch detail delete policy default to avoid problems if this shared instance is later modified. + * Return batch detail delete policy default. Use when the policy will not be modified. */ public BatchDeletePolicy getBatchDeletePolicyDefault(); /** - * Copy batch detail delete policy default. + * Copy batch detail delete policy default. Use when the policy will be modified for use in a + * specific command. */ public BatchDeletePolicy copyBatchDeletePolicyDefault(); /** - * Copy batch detail UDF policy default to avoid problems if this shared instance is later modified. + * Return batch detail UDF policy default. Use when the policy will not be modified. */ public BatchUDFPolicy getBatchUDFPolicyDefault(); /** - * Copy batch detail UDF policy default. + * Copy batch detail UDF policy default. Use when the policy will be modified for use in a + * specific command. */ public BatchUDFPolicy copyBatchUDFPolicyDefault(); /** - * Copy info command policy default to avoid problems if this shared instance is later modified. + * Return info command policy default. Use when the policy will not be modified. */ public InfoPolicy getInfoPolicyDefault(); /** - * Copy info command policy default. + * Copy info command policy default. Use when the policy will be modified for use in a + * specific command. */ public InfoPolicy copyInfoPolicyDefault(); /** - * Copy MRT record version verify policy default. + * Return MRT record version verify policy default. Use when the policy will not be modified. + */ + public TxnVerifyPolicy getTxnVerifyPolicyDefault(); + + /** + * Copy MRT record version verify policy default. Use when the policy will be modified for use + * in a specific command. */ public TxnVerifyPolicy copyTxnVerifyPolicyDefault(); /** - * Copy MRT roll forward/back policy default. + * Return MRT roll forward/back policy default. Use when the policy will not be modified. + */ + public TxnRollPolicy getTxnRollPolicyDefault(); + + /** + * Copy MRT roll forward/back policy default. Use when the policy will be modified for use + * in a specific command. */ public TxnRollPolicy copyTxnRollPolicyDefault(); From 1dc2a59b9af5bc4eac56c77dbd46dbc88c6fefa9 Mon Sep 17 00:00:00 2001 From: Brian Nichols Date: Mon, 2 Dec 2024 18:06:03 -0500 Subject: [PATCH 6/9] CLIENT-3199 Do not close/delete MRT monitor record on abort/commit when a write command in that MRT fails and is inDoubt. --- client/src/com/aerospike/client/Txn.java | 16 +++++----------- .../aerospike/client/async/AsyncTxnAddKeys.java | 6 ------ .../com/aerospike/client/async/AsyncTxnRoll.java | 4 ++-- .../com/aerospike/client/command/TxnAddKeys.java | 6 ------ .../com/aerospike/client/command/TxnRoll.java | 6 +++--- .../src/com/aerospike/examples/AsyncExample.java | 4 ++-- 6 files changed, 12 insertions(+), 30 deletions(-) diff --git a/client/src/com/aerospike/client/Txn.java b/client/src/com/aerospike/client/Txn.java index a5601d985..37246adb0 100644 --- a/client/src/com/aerospike/client/Txn.java +++ b/client/src/com/aerospike/client/Txn.java @@ -45,7 +45,7 @@ public static enum State { private String namespace; private int timeout; private int deadline; - private boolean monitorInDoubt; + private boolean writeInDoubt; private boolean inDoubt; /** @@ -204,6 +204,7 @@ public void onWrite(Key key, Long version, int resultCode) { * Add key to write hash when write command is in doubt (usually caused by timeout). */ public void onWriteInDoubt(Key key) { + writeInDoubt = true; reads.remove(key); writes.add(key); } @@ -273,17 +274,10 @@ public int getDeadline() { } /** - * Set that the MRT monitor existence is in doubt. For internal use only. + * Return if the MRT monitor record should be closed/deleted. For internal use only. */ - public void setMonitorInDoubt() { - this.monitorInDoubt = true; - } - - /** - * Does MRT monitor record exist or is in doubt. - */ - public boolean monitorMightExist() { - return deadline != 0 || monitorInDoubt; + public boolean closeMonitor() { + return deadline != 0 && !writeInDoubt; } /** diff --git a/client/src/com/aerospike/client/async/AsyncTxnAddKeys.java b/client/src/com/aerospike/client/async/AsyncTxnAddKeys.java index 3ac91732c..987ab5a97 100644 --- a/client/src/com/aerospike/client/async/AsyncTxnAddKeys.java +++ b/client/src/com/aerospike/client/async/AsyncTxnAddKeys.java @@ -51,12 +51,6 @@ protected boolean parseResult() { throw new AerospikeException(rp.resultCode); } - @Override - void onInDoubt() { - // The MRT monitor record might exist if AsyncTxnAddKeys command is inDoubt. - policy.txn.setMonitorInDoubt(); - } - @Override protected void onSuccess() { if (listener != null) { diff --git a/client/src/com/aerospike/client/async/AsyncTxnRoll.java b/client/src/com/aerospike/client/async/AsyncTxnRoll.java index 0e0f20686..e983005d8 100644 --- a/client/src/com/aerospike/client/async/AsyncTxnRoll.java +++ b/client/src/com/aerospike/client/async/AsyncTxnRoll.java @@ -315,7 +315,7 @@ private void roll(BatchRecordArrayListener rollListener, int txnAttr) { } private void closeOnCommit(boolean verified) { - if (! txn.monitorMightExist()) { + if (! txn.closeMonitor()) { // There is no MRT monitor to remove. if (verified) { notifyCommitSuccess(CommitStatus.OK); @@ -365,7 +365,7 @@ public void onFailure(AerospikeException ae) { } private void closeOnAbort() { - if (! txn.monitorMightExist()) { + if (! txn.closeMonitor()) { // There is no MRT monitor record to remove. notifyAbortSuccess(AbortStatus.OK); return; diff --git a/client/src/com/aerospike/client/command/TxnAddKeys.java b/client/src/com/aerospike/client/command/TxnAddKeys.java index af155384e..dfb103f32 100644 --- a/client/src/com/aerospike/client/command/TxnAddKeys.java +++ b/client/src/com/aerospike/client/command/TxnAddKeys.java @@ -48,10 +48,4 @@ protected void parseResult(Connection conn) throws IOException { throw new AerospikeException(rp.resultCode); } - - @Override - protected void onInDoubt() { - // The MRT monitor record might exist if TxnAddKeys command is inDoubt. - policy.txn.setMonitorInDoubt(); - } } diff --git a/client/src/com/aerospike/client/command/TxnRoll.java b/client/src/com/aerospike/client/command/TxnRoll.java index da21392ce..ee5d13748 100644 --- a/client/src/com/aerospike/client/command/TxnRoll.java +++ b/client/src/com/aerospike/client/command/TxnRoll.java @@ -60,7 +60,7 @@ public void verify(BatchPolicy verifyPolicy, BatchPolicy rollPolicy) { throw createCommitException(CommitError.VERIFY_FAIL_ABORT_ABANDONED, t); } - if (txn.monitorMightExist()) { + if (txn.closeMonitor()) { try { WritePolicy writePolicy = new WritePolicy(rollPolicy); Key txnKey = TxnMonitor.getTxnMonitorKey(txn); @@ -130,7 +130,7 @@ else if (ae.getInDoubt()){ return CommitStatus.ROLL_FORWARD_ABANDONED; } - if (txn.monitorMightExist()) { + if (txn.closeMonitor()) { // Remove MRT monitor. try { close(writePolicy, txnKey); @@ -165,7 +165,7 @@ public AbortStatus abort(BatchPolicy rollPolicy) { return AbortStatus.ROLL_BACK_ABANDONED; } - if (txn.monitorMightExist()) { + if (txn.closeMonitor()) { try { WritePolicy writePolicy = new WritePolicy(rollPolicy); Key txnKey = TxnMonitor.getTxnMonitorKey(txn); diff --git a/examples/src/com/aerospike/examples/AsyncExample.java b/examples/src/com/aerospike/examples/AsyncExample.java index 3b7bf4900..0fc0e3e57 100644 --- a/examples/src/com/aerospike/examples/AsyncExample.java +++ b/examples/src/com/aerospike/examples/AsyncExample.java @@ -130,8 +130,8 @@ public static void runExample(String exampleName, IAerospikeClient client, Event AsyncExample example = (AsyncExample)ctor.newInstance(); example.console = console; example.params = params; - example.writePolicy = client.getWritePolicyDefault(); - example.policy = client.getReadPolicyDefault(); + example.writePolicy = client.copyWritePolicyDefault(); + example.policy = client.copyReadPolicyDefault(); example.run(client, eventLoop); } else { From 1bd4bec715a34e0bbd9b93fda0f891a5ed64dfa3 Mon Sep 17 00:00:00 2001 From: Brian Nichols Date: Mon, 2 Dec 2024 18:49:50 -0500 Subject: [PATCH 7/9] CLIENT-3174 Add sync and async touched() methods. --- .../com/aerospike/client/AerospikeClient.java | 62 +++++++++++++++++- .../aerospike/client/IAerospikeClient.java | 37 ++++++++++- .../aerospike/client/async/AsyncTouch.java | 38 +++++++++-- .../client/command/TouchCommand.java | 20 +++++- .../aerospike/test/sync/basic/TestTouch.java | 63 +++++++++++++++++-- 5 files changed, 204 insertions(+), 16 deletions(-) diff --git a/client/src/com/aerospike/client/AerospikeClient.java b/client/src/com/aerospike/client/AerospikeClient.java index 861ad251d..13b06b429 100644 --- a/client/src/com/aerospike/client/AerospikeClient.java +++ b/client/src/com/aerospike/client/AerospikeClient.java @@ -1335,7 +1335,8 @@ public final void truncate(InfoPolicy policy, String ns, String set, Calendar be /** * Reset record's time to expiration using the policy's expiration. - * Fail if the record does not exist. + * If the record does not exist, it can't be created because the server deletes empty records. + * Throw an exception if the record does not exist. * * @param policy write configuration parameters, pass in null for defaults * @param key unique record identifier @@ -1351,12 +1352,14 @@ public final void touch(WritePolicy policy, Key key) TxnMonitor.addKey(cluster, policy, key); } - TouchCommand command = new TouchCommand(cluster, policy, key); + TouchCommand command = new TouchCommand(cluster, policy, key, true); command.execute(); } /** * Asynchronously reset record's time to expiration using the policy's expiration. + * If the record does not exist, it can't be created because the server deletes empty records. + *

* This method registers the command with an event loop and returns. * The event loop thread will process the command and send the results to the listener. *

@@ -1383,6 +1386,61 @@ public final void touch(EventLoop eventLoop, WriteListener listener, WritePolicy AsyncTxnMonitor.execute(eventLoop, cluster, policy, command); } + /** + * Reset record's time to expiration using the policy's expiration. + * If the record does not exist, it can't be created because the server deletes empty records. + * Return true if the record exists and is touched. Return false if the record does not exist. + * + * @param policy write configuration parameters, pass in null for defaults + * @param key unique record identifier + * @throws AerospikeException if touch fails + */ + public final boolean touched(WritePolicy policy, Key key) + throws AerospikeException { + if (policy == null) { + policy = writePolicyDefault; + } + + if (policy.txn != null) { + TxnMonitor.addKey(cluster, policy, key); + } + + TouchCommand command = new TouchCommand(cluster, policy, key, false); + command.execute(); + return command.getTouched(); + } + + /** + * Asynchronously reset record's time to expiration using the policy's expiration. + * If the record does not exist, it can't be created because the server deletes empty records. + *

+ * This method registers the command with an event loop and returns. + * The event loop thread will process the command and send the results to the listener. + *

+ * If the record does not exist, send a value of false to + * {@link com.aerospike.client.listener.ExistsListener#onSuccess(Key, boolean)} + * + * @param eventLoop event loop that will process the command. If NULL, the event + * loop will be chosen by round-robin. + * @param listener where to send results, pass in null for fire and forget + * @param policy write configuration parameters, pass in null for defaults + * @param key unique record identifier + * @throws AerospikeException if event loop registration fails + */ + public final void touched(EventLoop eventLoop, ExistsListener listener, WritePolicy policy, Key key) + throws AerospikeException { + if (eventLoop == null) { + eventLoop = cluster.eventLoops.next(); + } + + if (policy == null) { + policy = writePolicyDefault; + } + + AsyncTouch command = new AsyncTouch(cluster, listener, policy, key); + AsyncTxnMonitor.execute(eventLoop, cluster, policy, command); + } + //------------------------------------------------------- // Existence-Check Operations //------------------------------------------------------- diff --git a/client/src/com/aerospike/client/IAerospikeClient.java b/client/src/com/aerospike/client/IAerospikeClient.java index 9f3205eed..c3fac8e85 100644 --- a/client/src/com/aerospike/client/IAerospikeClient.java +++ b/client/src/com/aerospike/client/IAerospikeClient.java @@ -615,7 +615,8 @@ public void truncate(InfoPolicy policy, String ns, String set, Calendar beforeLa /** * Reset record's time to expiration using the policy's expiration. - * Fail if the record does not exist. + * If the record does not exist, it can't be created because the server deletes empty records. + * Throw an exception if the record does not exist. * * @param policy write configuration parameters, pass in null for defaults * @param key unique record identifier @@ -626,6 +627,8 @@ public void touch(WritePolicy policy, Key key) /** * Asynchronously reset record's time to expiration using the policy's expiration. + * If the record does not exist, it can't be created because the server deletes empty records. + *

* This method registers the command with an event loop and returns. * The event loop thread will process the command and send the results to the listener. *

@@ -641,6 +644,38 @@ public void touch(WritePolicy policy, Key key) public void touch(EventLoop eventLoop, WriteListener listener, WritePolicy policy, Key key) throws AerospikeException; + /** + * Reset record's time to expiration using the policy's expiration. + * If the record does not exist, it can't be created because the server deletes empty records. + * Return true if the record exists and is touched. Return false if the record does not exist. + * + * @param policy write configuration parameters, pass in null for defaults + * @param key unique record identifier + * @throws AerospikeException if touch fails + */ + public boolean touched(WritePolicy policy, Key key) + throws AerospikeException; + + /** + * Asynchronously reset record's time to expiration using the policy's expiration. + * If the record does not exist, it can't be created because the server deletes empty records. + *

+ * This method registers the command with an event loop and returns. + * The event loop thread will process the command and send the results to the listener. + *

+ * If the record does not exist, send a value of false to + * {@link com.aerospike.client.listener.ExistsListener#onSuccess(Key, boolean)} + * + * @param eventLoop event loop that will process the command. If NULL, the event + * loop will be chosen by round-robin. + * @param listener where to send results, pass in null for fire and forget + * @param policy write configuration parameters, pass in null for defaults + * @param key unique record identifier + * @throws AerospikeException if event loop registration fails + */ + public void touched(EventLoop eventLoop, ExistsListener listener, WritePolicy policy, Key key) + throws AerospikeException; + //------------------------------------------------------- // Existence-Check Operations //------------------------------------------------------- diff --git a/client/src/com/aerospike/client/async/AsyncTouch.java b/client/src/com/aerospike/client/async/AsyncTouch.java index ff24608a3..2cdf57b1f 100644 --- a/client/src/com/aerospike/client/async/AsyncTouch.java +++ b/client/src/com/aerospike/client/async/AsyncTouch.java @@ -20,15 +20,25 @@ import com.aerospike.client.Key; import com.aerospike.client.ResultCode; import com.aerospike.client.cluster.Cluster; +import com.aerospike.client.listener.ExistsListener; import com.aerospike.client.listener.WriteListener; import com.aerospike.client.policy.WritePolicy; public final class AsyncTouch extends AsyncWriteBase { - private final WriteListener listener; + private final WriteListener writeListener; + private final ExistsListener existsListener; + private boolean touched; public AsyncTouch(Cluster cluster, WriteListener listener, WritePolicy writePolicy, Key key) { super(cluster, writePolicy, key); - this.listener = listener; + this.writeListener = listener; + this.existsListener = null; + } + + public AsyncTouch(Cluster cluster, ExistsListener listener, WritePolicy writePolicy, Key key) { + super(cluster, writePolicy, key); + this.writeListener = null; + this.existsListener = listener; } @Override @@ -41,6 +51,15 @@ protected boolean parseResult() { int resultCode = parseHeader(); if (resultCode == ResultCode.OK) { + touched = true; + return true; + } + + if (resultCode == ResultCode.KEY_NOT_FOUND_ERROR) { + if (existsListener == null) { + throw new AerospikeException(resultCode); + } + touched = false; return true; } @@ -48,6 +67,7 @@ protected boolean parseResult() { if (policy.failOnFilteredOut) { throw new AerospikeException(resultCode); } + touched = false; return true; } @@ -56,15 +76,21 @@ protected boolean parseResult() { @Override protected void onSuccess() { - if (listener != null) { - listener.onSuccess(key); + if (writeListener != null) { + writeListener.onSuccess(key); + } + else if (existsListener != null) { + existsListener.onSuccess(key, touched); } } @Override protected void onFailure(AerospikeException e) { - if (listener != null) { - listener.onFailure(e); + if (writeListener != null) { + writeListener.onFailure(e); + } + else if (existsListener != null) { + existsListener.onFailure(e); } } } diff --git a/client/src/com/aerospike/client/command/TouchCommand.java b/client/src/com/aerospike/client/command/TouchCommand.java index 4432e5efa..63007526f 100644 --- a/client/src/com/aerospike/client/command/TouchCommand.java +++ b/client/src/com/aerospike/client/command/TouchCommand.java @@ -26,8 +26,12 @@ import com.aerospike.client.policy.WritePolicy; public final class TouchCommand extends SyncWriteCommand { - public TouchCommand(Cluster cluster, WritePolicy writePolicy, Key key) { + private boolean failOnNotFound; + private boolean touched; + + public TouchCommand(Cluster cluster, WritePolicy writePolicy, Key key, boolean failOnNotFound) { super(cluster, writePolicy, key); + this.failOnNotFound = failOnNotFound; } @Override @@ -40,6 +44,15 @@ protected void parseResult(Connection conn) throws IOException { int resultCode = parseHeader(conn); if (resultCode == ResultCode.OK) { + touched = true; + return; + } + + if (resultCode == ResultCode.KEY_NOT_FOUND_ERROR) { + if (failOnNotFound) { + throw new AerospikeException(resultCode); + } + touched = false; return; } @@ -47,9 +60,14 @@ protected void parseResult(Connection conn) throws IOException { if (writePolicy.failOnFilteredOut) { throw new AerospikeException(resultCode); } + touched = false; return; } throw new AerospikeException(resultCode); } + + public boolean getTouched() { + return touched; + } } diff --git a/test/src/com/aerospike/test/sync/basic/TestTouch.java b/test/src/com/aerospike/test/sync/basic/TestTouch.java index a4df049ab..68760c40f 100644 --- a/test/src/com/aerospike/test/sync/basic/TestTouch.java +++ b/test/src/com/aerospike/test/sync/basic/TestTouch.java @@ -16,6 +16,7 @@ */ package com.aerospike.test.sync.basic; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNull; @@ -31,31 +32,81 @@ public class TestTouch extends TestSync { @Test - public void touch() { + public void touchOperate() { if (! args.hasTtl) { return; } - Key key = new Key(args.namespace, args.set, "touchkey"); + Key key = new Key(args.namespace, args.set, "touchOperate"); Bin bin = new Bin("touchbin", "touchvalue"); WritePolicy writePolicy = new WritePolicy(); - writePolicy.expiration = 2; + writePolicy.expiration = 1; client.put(writePolicy, key, bin); - writePolicy.expiration = 5; + writePolicy.expiration = 2; Record record = client.operate(writePolicy, key, Operation.touch(), Operation.getHeader()); assertRecordFound(key, record); assertNotEquals(0, record.expiration); - Util.sleep(3000); + Util.sleep(1000); record = client.get(null, key, bin.name); assertRecordFound(key, record); - Util.sleep(4000); + Util.sleep(3000); record = client.get(null, key, bin.name); assertNull(record); } + + @Test + public void touch() { + if (! args.hasTtl) { + return; + } + + Key key = new Key(args.namespace, args.set, "touch"); + Bin bin = new Bin("touchbin", "touchvalue"); + + WritePolicy writePolicy = new WritePolicy(); + writePolicy.expiration = 1; + client.put(writePolicy, key, bin); + + writePolicy.expiration = 2; + client.touch(writePolicy, key); + + Util.sleep(1000); + + Record record = client.getHeader(writePolicy, key); + assertRecordFound(key, record); + assertNotEquals(0, record.expiration); + + Util.sleep(3000); + + record = client.getHeader(null, key); + assertNull(record); + } + + @Test + public void touched() { + if (! args.hasTtl) { + return; + } + + Key key = new Key(args.namespace, args.set, "touched"); + + client.delete(null, key); + + WritePolicy writePolicy = new WritePolicy(); + writePolicy.expiration = 10; + boolean rv = client.touched(writePolicy, key); + assertEquals(false, rv); + + Bin bin = new Bin("touchbin", "touchvalue"); + client.put(writePolicy, key, bin); + + rv = client.touched(writePolicy, key); + assertEquals(true, rv); + } } From c3359f827a69bb7555c454e41a769bbfb5f4a192 Mon Sep 17 00:00:00 2001 From: Brian Nichols Date: Tue, 3 Dec 2024 11:55:14 -0500 Subject: [PATCH 8/9] CLIENT-3201 Skip MRT tests if namespace is not configured as strong consistency. --- test/src/com/aerospike/test/async/TestAsyncTxn.java | 8 +++++--- test/src/com/aerospike/test/sync/basic/TestTxn.java | 2 ++ test/src/com/aerospike/test/util/Args.java | 9 +++++++++ 3 files changed, 16 insertions(+), 3 deletions(-) diff --git a/test/src/com/aerospike/test/async/TestAsyncTxn.java b/test/src/com/aerospike/test/async/TestAsyncTxn.java index 468384dba..f22f57aaa 100644 --- a/test/src/com/aerospike/test/async/TestAsyncTxn.java +++ b/test/src/com/aerospike/test/async/TestAsyncTxn.java @@ -16,8 +16,8 @@ */ package com.aerospike.test.async; -import org.junit.Test; import org.junit.BeforeClass; +import org.junit.Test; import com.aerospike.client.AbortStatus; import com.aerospike.client.AerospikeException; @@ -31,13 +31,13 @@ import com.aerospike.client.ResultCode; import com.aerospike.client.Txn; import com.aerospike.client.Value; +import com.aerospike.client.listener.AbortListener; import com.aerospike.client.listener.BatchRecordArrayListener; +import com.aerospike.client.listener.CommitListener; import com.aerospike.client.listener.DeleteListener; import com.aerospike.client.listener.ExecuteListener; import com.aerospike.client.listener.RecordArrayListener; import com.aerospike.client.listener.RecordListener; -import com.aerospike.client.listener.AbortListener; -import com.aerospike.client.listener.CommitListener; import com.aerospike.client.listener.WriteListener; import com.aerospike.client.policy.BatchPolicy; import com.aerospike.client.policy.Policy; @@ -50,6 +50,8 @@ public class TestAsyncTxn extends TestAsync { @BeforeClass public static void register() { + // Multi-record transactions require strong consistency namespaces. + org.junit.Assume.assumeTrue(args.scMode); RegisterTask task = client.register(null, TestUDF.class.getClassLoader(), "udf/record_example.lua", "record_example.lua", Language.LUA); task.waitTillComplete(); } diff --git a/test/src/com/aerospike/test/sync/basic/TestTxn.java b/test/src/com/aerospike/test/sync/basic/TestTxn.java index 6c12dbbc9..629b25f8a 100644 --- a/test/src/com/aerospike/test/sync/basic/TestTxn.java +++ b/test/src/com/aerospike/test/sync/basic/TestTxn.java @@ -45,6 +45,8 @@ public class TestTxn extends TestSync { @BeforeClass public static void register() { + // Multi-record transactions require strong consistency namespaces. + org.junit.Assume.assumeTrue(args.scMode); RegisterTask task = client.register(null, TestUDF.class.getClassLoader(), "udf/record_example.lua", "record_example.lua", Language.LUA); task.waitTillComplete(); } diff --git a/test/src/com/aerospike/test/util/Args.java b/test/src/com/aerospike/test/util/Args.java index 23014ccb4..05ff67680 100644 --- a/test/src/com/aerospike/test/util/Args.java +++ b/test/src/com/aerospike/test/util/Args.java @@ -34,6 +34,7 @@ import com.aerospike.client.Log.Level; import com.aerospike.client.async.EventLoopType; import com.aerospike.client.cluster.Node; +import com.aerospike.client.cluster.Partitions; import com.aerospike.client.policy.AuthMode; import com.aerospike.client.policy.ClientPolicy; import com.aerospike.client.policy.Policy; @@ -57,6 +58,7 @@ public class Args { public int totalTimeout = 1000; public boolean enterprise; public boolean hasTtl; + public boolean scMode; public Args() { host = "127.0.0.1"; @@ -248,7 +250,14 @@ private static void logUsage(Options options) { /** * Some database calls need to know how the server is configured. */ + @SuppressWarnings("resource") public void setServerSpecific(IAerospikeClient client) { + Partitions partitions = client.getCluster().partitionMap.get(namespace); + + if (partitions != null) { + scMode = partitions.scMode; + } + Node node = client.getNodes()[0]; String editionFilter = "edition"; String namespaceFilter = "namespace/" + namespace; From b747cd928211dd954337043a5b563cce5a1fb732 Mon Sep 17 00:00:00 2001 From: Brian Nichols Date: Tue, 3 Dec 2024 12:07:42 -0500 Subject: [PATCH 9/9] Update version 9.0.1 --- benchmarks/pom.xml | 2 +- client/pom.xml | 2 +- examples/pom.xml | 2 +- pom.xml | 2 +- test/pom.xml | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/benchmarks/pom.xml b/benchmarks/pom.xml index 15c28e7c3..20e3946e9 100644 --- a/benchmarks/pom.xml +++ b/benchmarks/pom.xml @@ -6,7 +6,7 @@ com.aerospike aerospike-parent - 9.0.0 + 9.0.1 aerospike-benchmarks jar diff --git a/client/pom.xml b/client/pom.xml index 889ef685e..6da90c54a 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -6,7 +6,7 @@ com.aerospike aerospike-parent - 9.0.0 + 9.0.1 aerospike-client-jdk8 jar diff --git a/examples/pom.xml b/examples/pom.xml index 6c36cb6b6..daec43f27 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -6,7 +6,7 @@ com.aerospike aerospike-parent - 9.0.0 + 9.0.1 aerospike-examples jar diff --git a/pom.xml b/pom.xml index 3bc7d6c80..4ef2d37b8 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ com.aerospike aerospike-parent aerospike-parent - 9.0.0 + 9.0.1 pom https://github.com/aerospike/aerospike-client-java diff --git a/test/pom.xml b/test/pom.xml index f7190c953..7f7b4dfa7 100644 --- a/test/pom.xml +++ b/test/pom.xml @@ -6,7 +6,7 @@ com.aerospike aerospike-parent - 9.0.0 + 9.0.1 aerospike-client-test jar