diff --git a/benchmarks/pom.xml b/benchmarks/pom.xml
index 15c28e7c3..20e3946e9 100644
--- a/benchmarks/pom.xml
+++ b/benchmarks/pom.xml
@@ -6,7 +6,7 @@
* This method registers the command with an event loop and returns. * The event loop thread will process the command and send the results to the listener. *
@@ -1361,6 +1386,61 @@ public final void touch(EventLoop eventLoop, WriteListener listener, WritePolicy AsyncTxnMonitor.execute(eventLoop, cluster, policy, command); } + /** + * Reset record's time to expiration using the policy's expiration. + * If the record does not exist, it can't be created because the server deletes empty records. + * Return true if the record exists and is touched. Return false if the record does not exist. + * + * @param policy write configuration parameters, pass in null for defaults + * @param key unique record identifier + * @throws AerospikeException if touch fails + */ + public final boolean touched(WritePolicy policy, Key key) + throws AerospikeException { + if (policy == null) { + policy = writePolicyDefault; + } + + if (policy.txn != null) { + TxnMonitor.addKey(cluster, policy, key); + } + + TouchCommand command = new TouchCommand(cluster, policy, key, false); + command.execute(); + return command.getTouched(); + } + + /** + * Asynchronously reset record's time to expiration using the policy's expiration. + * If the record does not exist, it can't be created because the server deletes empty records. + *
+ * This method registers the command with an event loop and returns. + * The event loop thread will process the command and send the results to the listener. + *
+ * If the record does not exist, send a value of false to + * {@link com.aerospike.client.listener.ExistsListener#onSuccess(Key, boolean)} + * + * @param eventLoop event loop that will process the command. If NULL, the event + * loop will be chosen by round-robin. + * @param listener where to send results, pass in null for fire and forget + * @param policy write configuration parameters, pass in null for defaults + * @param key unique record identifier + * @throws AerospikeException if event loop registration fails + */ + public final void touched(EventLoop eventLoop, ExistsListener listener, WritePolicy policy, Key key) + throws AerospikeException { + if (eventLoop == null) { + eventLoop = cluster.eventLoops.next(); + } + + if (policy == null) { + policy = writePolicyDefault; + } + + AsyncTouch command = new AsyncTouch(cluster, listener, policy, key); + AsyncTxnMonitor.execute(eventLoop, cluster, policy, command); + } + //------------------------------------------------------- // Existence-Check Operations //------------------------------------------------------- diff --git a/client/src/com/aerospike/client/AerospikeException.java b/client/src/com/aerospike/client/AerospikeException.java index 7a092a086..2f1153cb9 100644 --- a/client/src/com/aerospike/client/AerospikeException.java +++ b/client/src/com/aerospike/client/AerospikeException.java @@ -496,7 +496,7 @@ public Backoff(int resultCode) { } /** - * Exception thrown when {@link AerospikeClient#commit(com.aerospike.client.Tran)} fails. + * Exception thrown when a multi-record transaction commit fails. */ public static final class Commit extends AerospikeException { private static final long serialVersionUID = 1L; diff --git a/client/src/com/aerospike/client/IAerospikeClient.java b/client/src/com/aerospike/client/IAerospikeClient.java index bf3af145f..c3fac8e85 100644 --- a/client/src/com/aerospike/client/IAerospikeClient.java +++ b/client/src/com/aerospike/client/IAerospikeClient.java @@ -82,112 +82,130 @@ public interface IAerospikeClient extends Closeable { //------------------------------------------------------- /** - * Copy read policy default to avoid problems if this shared instance is later modified. + * Return read policy default. Use when the policy will not be modified. */ public Policy getReadPolicyDefault(); /** - * Copy read policy default. + * Copy read policy default. Use when the policy will be modified for use in a specific command. */ public Policy copyReadPolicyDefault(); /** - * Copy write policy default to avoid problems if this shared instance is later modified. + * Return write policy default. Use when the policy will not be modified. */ public WritePolicy getWritePolicyDefault(); /** - * Copy write policy default. + * Copy write policy default. Use when the policy will be modified for use in a specific command. */ public WritePolicy copyWritePolicyDefault(); /** - * Copy scan policy default to avoid problems if this shared instance is later modified. + * Return scan policy default. Use when the policy will not be modified. */ public ScanPolicy getScanPolicyDefault(); /** - * Copy scan policy default. + * Copy scan policy default. Use when the policy will be modified for use in a specific command. */ public ScanPolicy copyScanPolicyDefault(); /** - * Copy query policy default to avoid problems if this shared instance is later modified. + * Return query policy default. Use when the policy will not be modified. */ public QueryPolicy getQueryPolicyDefault(); /** - * Copy query policy default. + * Copy query policy default. Use when the policy will be modified for use in a specific command. */ public QueryPolicy copyQueryPolicyDefault(); /** - * Copy batch header read policy default to avoid problems if this shared instance is later modified. + * Return batch header read policy default. Use when the policy will not be modified. */ public BatchPolicy getBatchPolicyDefault(); /** - * Copy batch header read policy default. + * Copy batch header read policy default. Use when the policy will be modified for use in a + * specific command. */ public BatchPolicy copyBatchPolicyDefault(); /** - * Copy batch header write policy default to avoid problems if this shared instance is later modified. + * Return batch header write policy default. Use when the policy will not be modified. */ public BatchPolicy getBatchParentPolicyWriteDefault(); /** - * Copy batch header write policy default. + * Copy batch header write policy default. Use when the policy will be modified for use in a + * specific command. */ public BatchPolicy copyBatchParentPolicyWriteDefault(); /** - * Copy batch detail write policy default to avoid problems if this shared instance is later modified. + * Return batch detail write policy default. Use when the policy will not be modified. */ public BatchWritePolicy getBatchWritePolicyDefault(); /** - * Copy batch detail write policy default. + * Copy batch detail write policy default. Use when the policy will be modified for use in a + * specific command. */ public BatchWritePolicy copyBatchWritePolicyDefault(); /** - * Copy batch detail delete policy default to avoid problems if this shared instance is later modified. + * Return batch detail delete policy default. Use when the policy will not be modified. */ public BatchDeletePolicy getBatchDeletePolicyDefault(); /** - * Copy batch detail delete policy default. + * Copy batch detail delete policy default. Use when the policy will be modified for use in a + * specific command. */ public BatchDeletePolicy copyBatchDeletePolicyDefault(); /** - * Copy batch detail UDF policy default to avoid problems if this shared instance is later modified. + * Return batch detail UDF policy default. Use when the policy will not be modified. */ public BatchUDFPolicy getBatchUDFPolicyDefault(); /** - * Copy batch detail UDF policy default. + * Copy batch detail UDF policy default. Use when the policy will be modified for use in a + * specific command. */ public BatchUDFPolicy copyBatchUDFPolicyDefault(); /** - * Copy info command policy default to avoid problems if this shared instance is later modified. + * Return info command policy default. Use when the policy will not be modified. */ public InfoPolicy getInfoPolicyDefault(); /** - * Copy info command policy default. + * Copy info command policy default. Use when the policy will be modified for use in a + * specific command. */ public InfoPolicy copyInfoPolicyDefault(); /** - * Copy MRT record version verify policy default. + * Return MRT record version verify policy default. Use when the policy will not be modified. + */ + public TxnVerifyPolicy getTxnVerifyPolicyDefault(); + + /** + * Copy MRT record version verify policy default. Use when the policy will be modified for use + * in a specific command. */ public TxnVerifyPolicy copyTxnVerifyPolicyDefault(); /** - * Copy MRT roll forward/back policy default. + * Return MRT roll forward/back policy default. Use when the policy will not be modified. + */ + public TxnRollPolicy getTxnRollPolicyDefault(); + + /** + * Copy MRT roll forward/back policy default. Use when the policy will be modified for use + * in a specific command. */ public TxnRollPolicy copyTxnRollPolicyDefault(); @@ -597,7 +615,8 @@ public void truncate(InfoPolicy policy, String ns, String set, Calendar beforeLa /** * Reset record's time to expiration using the policy's expiration. - * Fail if the record does not exist. + * If the record does not exist, it can't be created because the server deletes empty records. + * Throw an exception if the record does not exist. * * @param policy write configuration parameters, pass in null for defaults * @param key unique record identifier @@ -608,6 +627,8 @@ public void touch(WritePolicy policy, Key key) /** * Asynchronously reset record's time to expiration using the policy's expiration. + * If the record does not exist, it can't be created because the server deletes empty records. + *
* This method registers the command with an event loop and returns. * The event loop thread will process the command and send the results to the listener. *
@@ -623,6 +644,38 @@ public void touch(WritePolicy policy, Key key) public void touch(EventLoop eventLoop, WriteListener listener, WritePolicy policy, Key key) throws AerospikeException; + /** + * Reset record's time to expiration using the policy's expiration. + * If the record does not exist, it can't be created because the server deletes empty records. + * Return true if the record exists and is touched. Return false if the record does not exist. + * + * @param policy write configuration parameters, pass in null for defaults + * @param key unique record identifier + * @throws AerospikeException if touch fails + */ + public boolean touched(WritePolicy policy, Key key) + throws AerospikeException; + + /** + * Asynchronously reset record's time to expiration using the policy's expiration. + * If the record does not exist, it can't be created because the server deletes empty records. + *
+ * This method registers the command with an event loop and returns. + * The event loop thread will process the command and send the results to the listener. + *
+ * If the record does not exist, send a value of false to + * {@link com.aerospike.client.listener.ExistsListener#onSuccess(Key, boolean)} + * + * @param eventLoop event loop that will process the command. If NULL, the event + * loop will be chosen by round-robin. + * @param listener where to send results, pass in null for fire and forget + * @param policy write configuration parameters, pass in null for defaults + * @param key unique record identifier + * @throws AerospikeException if event loop registration fails + */ + public void touched(EventLoop eventLoop, ExistsListener listener, WritePolicy policy, Key key) + throws AerospikeException; + //------------------------------------------------------- // Existence-Check Operations //------------------------------------------------------- diff --git a/client/src/com/aerospike/client/ResultCode.java b/client/src/com/aerospike/client/ResultCode.java index 5d454f80b..d6e0a7eb8 100644 --- a/client/src/com/aerospike/client/ResultCode.java +++ b/client/src/com/aerospike/client/ResultCode.java @@ -238,37 +238,11 @@ public final class ResultCode { */ public static final int LOST_CONFLICT = 28; - /** - * MRT record blocked by a different transaction. - */ - public static final int MRT_BLOCKED = 29; - - /** - * MRT read version mismatch identified during commit. - * Some other command changed the record outside of the transaction. - */ - public static final int MRT_VERSION_MISMATCH = 30; - - /** - * MRT deadline reached without a successful commit or abort. - */ - public static final int MRT_EXPIRED = 31; - /** * Write can't complete until XDR finishes shipping. */ public static final int XDR_KEY_BUSY = 32; - /** - * MRT was already committed. - */ - public static final int MRT_COMMITTED = 33; - - /** - * MRT was already aborted. - */ - public static final int MRT_ABORTED = 34; - /** * There are no more records left for query. */ @@ -394,6 +368,37 @@ public final class ResultCode { */ public static final int UDF_BAD_RESPONSE = 100; + /** + * MRT record blocked by a different transaction. + */ + public static final int MRT_BLOCKED = 120; + + /** + * MRT read version mismatch identified during commit. + * Some other command changed the record outside of the transaction. + */ + public static final int MRT_VERSION_MISMATCH = 121; + + /** + * MRT deadline reached without a successful commit or abort. + */ + public static final int MRT_EXPIRED = 122; + + /** + * MRT write command limit (4096) exceeded. + */ + public static final int MRT_TOO_MANY_WRITES = 123; + + /** + * MRT was already committed. + */ + public static final int MRT_COMMITTED = 124; + + /** + * MRT was already aborted. + */ + public static final int MRT_ABORTED = 125; + /** * Batch functionality has been disabled. */ @@ -620,24 +625,9 @@ public static String getResultString(int resultCode) { case LOST_CONFLICT: return "Command failed due to conflict with XDR"; - case MRT_BLOCKED: - return "MRT record blocked by a different transaction"; - - case MRT_VERSION_MISMATCH: - return "MRT version mismatch"; - - case MRT_EXPIRED: - return "MRT expired"; - case XDR_KEY_BUSY: return "Write can't complete until XDR finishes shipping"; - - case MRT_COMMITTED: - return "MRT already committed"; - - case MRT_ABORTED: - return "MRT already aborted"; - + case QUERY_END: return "Query end"; @@ -713,6 +703,24 @@ public static String getResultString(int resultCode) { case UDF_BAD_RESPONSE: return "UDF returned error"; + case MRT_BLOCKED: + return "MRT record blocked by a different transaction"; + + case MRT_VERSION_MISMATCH: + return "MRT version mismatch"; + + case MRT_EXPIRED: + return "MRT expired"; + + case MRT_TOO_MANY_WRITES: + return "MRT write command limit exceeded"; + + case MRT_COMMITTED: + return "MRT already committed"; + + case MRT_ABORTED: + return "MRT already aborted"; + case BATCH_DISABLED: return "Batch functionality has been disabled"; diff --git a/client/src/com/aerospike/client/Txn.java b/client/src/com/aerospike/client/Txn.java index d8e0e0330..37246adb0 100644 --- a/client/src/com/aerospike/client/Txn.java +++ b/client/src/com/aerospike/client/Txn.java @@ -45,24 +45,29 @@ public static enum State { private String namespace; private int timeout; private int deadline; - private boolean monitorInDoubt; + private boolean writeInDoubt; private boolean inDoubt; /** * Create MRT, assign random transaction id and initialize reads/writes hashmaps with default - * capacities. The default MRT timeout is 10 seconds. + * capacities. + *
+ * The default client MRT timeout is zero. This means use the server configuration mrt-duration + * as the MRT timeout. The default mrt-duration is 10 seconds. */ public Txn() { id = createId(); reads = new ConcurrentHashMap<>(); writes = ConcurrentHashMap.newKeySet(); state = Txn.State.OPEN; - timeout = 10; // seconds } /** * Create MRT, assign random transaction id and initialize reads/writes hashmaps with given - * capacities. The default MRT timeout is 10 seconds. + * capacities. + *
+ * The default client MRT timeout is zero. This means use the server configuration mrt-duration + * as the MRT timeout. The default mrt-duration is 10 seconds. * * @param readsCapacity expected number of record reads in the MRT. Minimum value is 16. * @param writesCapacity expected number of record writes in the MRT. Minimum value is 16. @@ -80,7 +85,6 @@ public Txn(int readsCapacity, int writesCapacity) { reads = new ConcurrentHashMap<>(readsCapacity); writes = ConcurrentHashMap.newKeySet(writesCapacity); state = Txn.State.OPEN; - timeout = 10; // seconds } private static long createId() { @@ -110,6 +114,9 @@ public long getId() { * Set MRT timeout in seconds. The timer starts when the MRT monitor record is created. * This occurs when the first command in the MRT is executed. If the timeout is reached before * a commit or abort is called, the server will expire and rollback the MRT. + *
+ * If the MRT timeout is zero, the server configuration mrt-duration is used.
+ * The default mrt-duration is 10 seconds.
*/
public void setTimeout(int timeout) {
this.timeout = timeout;
@@ -197,6 +204,7 @@ public void onWrite(Key key, Long version, int resultCode) {
* Add key to write hash when write command is in doubt (usually caused by timeout).
*/
public void onWriteInDoubt(Key key) {
+ writeInDoubt = true;
reads.remove(key);
writes.add(key);
}
@@ -266,17 +274,10 @@ public int getDeadline() {
}
/**
- * Set that the MRT monitor existence is in doubt. For internal use only.
+ * Return if the MRT monitor record should be closed/deleted. For internal use only.
*/
- public void setMonitorInDoubt() {
- this.monitorInDoubt = true;
- }
-
- /**
- * Does MRT monitor record exist or is in doubt.
- */
- public boolean monitorMightExist() {
- return deadline != 0 || monitorInDoubt;
+ public boolean closeMonitor() {
+ return deadline != 0 && !writeInDoubt;
}
/**
diff --git a/client/src/com/aerospike/client/async/AsyncTouch.java b/client/src/com/aerospike/client/async/AsyncTouch.java
index ff24608a3..2cdf57b1f 100644
--- a/client/src/com/aerospike/client/async/AsyncTouch.java
+++ b/client/src/com/aerospike/client/async/AsyncTouch.java
@@ -20,15 +20,25 @@
import com.aerospike.client.Key;
import com.aerospike.client.ResultCode;
import com.aerospike.client.cluster.Cluster;
+import com.aerospike.client.listener.ExistsListener;
import com.aerospike.client.listener.WriteListener;
import com.aerospike.client.policy.WritePolicy;
public final class AsyncTouch extends AsyncWriteBase {
- private final WriteListener listener;
+ private final WriteListener writeListener;
+ private final ExistsListener existsListener;
+ private boolean touched;
public AsyncTouch(Cluster cluster, WriteListener listener, WritePolicy writePolicy, Key key) {
super(cluster, writePolicy, key);
- this.listener = listener;
+ this.writeListener = listener;
+ this.existsListener = null;
+ }
+
+ public AsyncTouch(Cluster cluster, ExistsListener listener, WritePolicy writePolicy, Key key) {
+ super(cluster, writePolicy, key);
+ this.writeListener = null;
+ this.existsListener = listener;
}
@Override
@@ -41,6 +51,15 @@ protected boolean parseResult() {
int resultCode = parseHeader();
if (resultCode == ResultCode.OK) {
+ touched = true;
+ return true;
+ }
+
+ if (resultCode == ResultCode.KEY_NOT_FOUND_ERROR) {
+ if (existsListener == null) {
+ throw new AerospikeException(resultCode);
+ }
+ touched = false;
return true;
}
@@ -48,6 +67,7 @@ protected boolean parseResult() {
if (policy.failOnFilteredOut) {
throw new AerospikeException(resultCode);
}
+ touched = false;
return true;
}
@@ -56,15 +76,21 @@ protected boolean parseResult() {
@Override
protected void onSuccess() {
- if (listener != null) {
- listener.onSuccess(key);
+ if (writeListener != null) {
+ writeListener.onSuccess(key);
+ }
+ else if (existsListener != null) {
+ existsListener.onSuccess(key, touched);
}
}
@Override
protected void onFailure(AerospikeException e) {
- if (listener != null) {
- listener.onFailure(e);
+ if (writeListener != null) {
+ writeListener.onFailure(e);
+ }
+ else if (existsListener != null) {
+ existsListener.onFailure(e);
}
}
}
diff --git a/client/src/com/aerospike/client/async/AsyncTxnAddKeys.java b/client/src/com/aerospike/client/async/AsyncTxnAddKeys.java
index 3ac91732c..987ab5a97 100644
--- a/client/src/com/aerospike/client/async/AsyncTxnAddKeys.java
+++ b/client/src/com/aerospike/client/async/AsyncTxnAddKeys.java
@@ -51,12 +51,6 @@ protected boolean parseResult() {
throw new AerospikeException(rp.resultCode);
}
- @Override
- void onInDoubt() {
- // The MRT monitor record might exist if AsyncTxnAddKeys command is inDoubt.
- policy.txn.setMonitorInDoubt();
- }
-
@Override
protected void onSuccess() {
if (listener != null) {
diff --git a/client/src/com/aerospike/client/async/AsyncTxnRoll.java b/client/src/com/aerospike/client/async/AsyncTxnRoll.java
index 0e0f20686..e983005d8 100644
--- a/client/src/com/aerospike/client/async/AsyncTxnRoll.java
+++ b/client/src/com/aerospike/client/async/AsyncTxnRoll.java
@@ -315,7 +315,7 @@ private void roll(BatchRecordArrayListener rollListener, int txnAttr) {
}
private void closeOnCommit(boolean verified) {
- if (! txn.monitorMightExist()) {
+ if (! txn.closeMonitor()) {
// There is no MRT monitor to remove.
if (verified) {
notifyCommitSuccess(CommitStatus.OK);
@@ -365,7 +365,7 @@ public void onFailure(AerospikeException ae) {
}
private void closeOnAbort() {
- if (! txn.monitorMightExist()) {
+ if (! txn.closeMonitor()) {
// There is no MRT monitor record to remove.
notifyAbortSuccess(AbortStatus.OK);
return;
diff --git a/client/src/com/aerospike/client/command/TouchCommand.java b/client/src/com/aerospike/client/command/TouchCommand.java
index 4432e5efa..63007526f 100644
--- a/client/src/com/aerospike/client/command/TouchCommand.java
+++ b/client/src/com/aerospike/client/command/TouchCommand.java
@@ -26,8 +26,12 @@
import com.aerospike.client.policy.WritePolicy;
public final class TouchCommand extends SyncWriteCommand {
- public TouchCommand(Cluster cluster, WritePolicy writePolicy, Key key) {
+ private boolean failOnNotFound;
+ private boolean touched;
+
+ public TouchCommand(Cluster cluster, WritePolicy writePolicy, Key key, boolean failOnNotFound) {
super(cluster, writePolicy, key);
+ this.failOnNotFound = failOnNotFound;
}
@Override
@@ -40,6 +44,15 @@ protected void parseResult(Connection conn) throws IOException {
int resultCode = parseHeader(conn);
if (resultCode == ResultCode.OK) {
+ touched = true;
+ return;
+ }
+
+ if (resultCode == ResultCode.KEY_NOT_FOUND_ERROR) {
+ if (failOnNotFound) {
+ throw new AerospikeException(resultCode);
+ }
+ touched = false;
return;
}
@@ -47,9 +60,14 @@ protected void parseResult(Connection conn) throws IOException {
if (writePolicy.failOnFilteredOut) {
throw new AerospikeException(resultCode);
}
+ touched = false;
return;
}
throw new AerospikeException(resultCode);
}
+
+ public boolean getTouched() {
+ return touched;
+ }
}
diff --git a/client/src/com/aerospike/client/command/TxnAddKeys.java b/client/src/com/aerospike/client/command/TxnAddKeys.java
index af155384e..dfb103f32 100644
--- a/client/src/com/aerospike/client/command/TxnAddKeys.java
+++ b/client/src/com/aerospike/client/command/TxnAddKeys.java
@@ -48,10 +48,4 @@ protected void parseResult(Connection conn) throws IOException {
throw new AerospikeException(rp.resultCode);
}
-
- @Override
- protected void onInDoubt() {
- // The MRT monitor record might exist if TxnAddKeys command is inDoubt.
- policy.txn.setMonitorInDoubt();
- }
}
diff --git a/client/src/com/aerospike/client/command/TxnRoll.java b/client/src/com/aerospike/client/command/TxnRoll.java
index da21392ce..ee5d13748 100644
--- a/client/src/com/aerospike/client/command/TxnRoll.java
+++ b/client/src/com/aerospike/client/command/TxnRoll.java
@@ -60,7 +60,7 @@ public void verify(BatchPolicy verifyPolicy, BatchPolicy rollPolicy) {
throw createCommitException(CommitError.VERIFY_FAIL_ABORT_ABANDONED, t);
}
- if (txn.monitorMightExist()) {
+ if (txn.closeMonitor()) {
try {
WritePolicy writePolicy = new WritePolicy(rollPolicy);
Key txnKey = TxnMonitor.getTxnMonitorKey(txn);
@@ -130,7 +130,7 @@ else if (ae.getInDoubt()){
return CommitStatus.ROLL_FORWARD_ABANDONED;
}
- if (txn.monitorMightExist()) {
+ if (txn.closeMonitor()) {
// Remove MRT monitor.
try {
close(writePolicy, txnKey);
@@ -165,7 +165,7 @@ public AbortStatus abort(BatchPolicy rollPolicy) {
return AbortStatus.ROLL_BACK_ABANDONED;
}
- if (txn.monitorMightExist()) {
+ if (txn.closeMonitor()) {
try {
WritePolicy writePolicy = new WritePolicy(rollPolicy);
Key txnKey = TxnMonitor.getTxnMonitorKey(txn);
diff --git a/examples/pom.xml b/examples/pom.xml
index 6c36cb6b6..daec43f27 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -6,7 +6,7 @@