records) throws AerospikeException;
+ /**
+ * Asynchronously attempt to commit the given multi-record transaction. First, the expected
+ * record versions are sent to the server nodes for verification. If all nodes return success,
+ * the transaction is committed. Otherwise, the transaction is aborted.
+ *
+ * This method registers the command with an event loop and returns.
+ * The event loop thread will process the command and send the results to the listener.
+ *
+ * Requires server version 8.0+
+ *
+ * @param txn multi-record transaction
+ * @throws AerospikeException if event loop registration fails
+ */
+ Mono commit(Txn txn) throws AerospikeException;
+
+ /**
+ * Asynchronously abort and rollback the given multi-record transaction.
+ *
+ * This method registers the command with an event loop and returns.
+ * The event loop thread will process the command and send the results to the listener.
+ *
+ * Requires server version 8.0+
+ *
+ * @param txn multi-record transaction
+ * @throws AerospikeException if event loop registration fails
+ */
+ Mono abort(Txn txn) throws AerospikeException;
+
/**
* Reactively execute query on all server nodes.
* This method registers the command with an event loop and returns.
diff --git a/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorAbortListener.java b/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorAbortListener.java
new file mode 100644
index 0000000..d0fd993
--- /dev/null
+++ b/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorAbortListener.java
@@ -0,0 +1,19 @@
+package com.aerospike.client.reactor.listeners;
+
+import com.aerospike.client.AbortStatus;
+import com.aerospike.client.listener.AbortListener;
+import reactor.core.publisher.MonoSink;
+
+public class ReactorAbortListener implements AbortListener {
+
+ private final MonoSink sink;
+
+ public ReactorAbortListener(MonoSink sink) {
+ this.sink = sink;
+ }
+
+ @Override
+ public void onSuccess(AbortStatus abortStatus) {
+ sink.success(abortStatus);
+ }
+}
diff --git a/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorBatchSequenceListener.java b/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorBatchSequenceListener.java
index 14aff50..33879d6 100644
--- a/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorBatchSequenceListener.java
+++ b/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorBatchSequenceListener.java
@@ -33,10 +33,12 @@ public ReactorBatchSequenceListener(FluxSink sink) {
public void onRecord(BatchRead record) {
sink.next(record);
}
+
@Override
public void onSuccess() {
sink.complete();
}
+
@Override
public void onFailure(AerospikeException exception) {
sink.error(exception);
diff --git a/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorCommitListener.java b/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorCommitListener.java
new file mode 100644
index 0000000..95f5c04
--- /dev/null
+++ b/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorCommitListener.java
@@ -0,0 +1,25 @@
+package com.aerospike.client.reactor.listeners;
+
+import com.aerospike.client.AerospikeException;
+import com.aerospike.client.CommitStatus;
+import com.aerospike.client.listener.CommitListener;
+import reactor.core.publisher.MonoSink;
+
+public class ReactorCommitListener implements CommitListener {
+
+ private final MonoSink sink;
+
+ public ReactorCommitListener(MonoSink sink) {
+ this.sink = sink;
+ }
+
+ @Override
+ public void onSuccess(CommitStatus commitStatus) {
+ sink.success(commitStatus);
+ }
+
+ @Override
+ public void onFailure(AerospikeException.Commit commit) {
+ sink.error(commit);
+ }
+}
diff --git a/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorDeleteListener.java b/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorDeleteListener.java
index df6b4cc..eb4271f 100644
--- a/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorDeleteListener.java
+++ b/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorDeleteListener.java
@@ -37,6 +37,7 @@ public void onSuccess(Key key, boolean existed) {
sink.success();
}
}
+
@Override
public void onFailure(AerospikeException exception) {
sink.error(exception);
diff --git a/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorExistsListener.java b/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorExistsListener.java
index 1132e41..8e4533e 100644
--- a/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorExistsListener.java
+++ b/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorExistsListener.java
@@ -37,6 +37,7 @@ public void onSuccess(Key key, boolean exists) {
sink.success();
}
}
+
@Override
public void onFailure(AerospikeException exception) {
sink.error(exception);
diff --git a/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorExistsSequenceListener.java b/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorExistsSequenceListener.java
index 52851d5..791580a 100644
--- a/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorExistsSequenceListener.java
+++ b/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorExistsSequenceListener.java
@@ -34,10 +34,12 @@ public ReactorExistsSequenceListener(FluxSink sink) {
public void onExists(Key key, boolean exists) {
sink.next(new KeyExists(key, exists));
}
+
@Override
public void onSuccess() {
sink.complete();
}
+
@Override
public void onFailure(AerospikeException exception) {
sink.error(exception);
diff --git a/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorIndexListener.java b/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorIndexListener.java
index 4910009..ee6a09d 100644
--- a/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorIndexListener.java
+++ b/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorIndexListener.java
@@ -22,5 +22,4 @@ public void onSuccess(AsyncIndexTask indexTask) {
public void onFailure(AerospikeException ae) {
sink.error(ae);
}
-
}
diff --git a/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorRecordListener.java b/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorRecordListener.java
index 3275203..2778468 100644
--- a/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorRecordListener.java
+++ b/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorRecordListener.java
@@ -34,6 +34,7 @@ public ReactorRecordListener(MonoSink sink) {
public void onSuccess(Key key, Record record) {
sink.success(new KeyRecord(key, record));
}
+
@Override
public void onFailure(AerospikeException exception) {
sink.error(exception);
diff --git a/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorRecordSequenceListener.java b/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorRecordSequenceListener.java
index 6297775..dd53fbf 100644
--- a/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorRecordSequenceListener.java
+++ b/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorRecordSequenceListener.java
@@ -35,13 +35,14 @@ public ReactorRecordSequenceListener(FluxSink sink) {
public void onRecord(Key key, Record record) throws AerospikeException {
sink.next(new KeyRecord(key, record));
}
+
@Override
public void onSuccess() {
sink.complete();
}
+
@Override
public void onFailure(AerospikeException exception) {
sink.error(exception);
}
-
}
diff --git a/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorTaskStatusListener.java b/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorTaskStatusListener.java
index d63ad9d..71fe25b 100644
--- a/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorTaskStatusListener.java
+++ b/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorTaskStatusListener.java
@@ -21,5 +21,4 @@ public void onSuccess(int status) {
public void onFailure(AerospikeException ae) {
sink.error(ae);
}
-
}
diff --git a/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorWriteListener.java b/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorWriteListener.java
index c056700..a26e8bc 100644
--- a/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorWriteListener.java
+++ b/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorWriteListener.java
@@ -33,9 +33,9 @@ public ReactorWriteListener(MonoSink sink) {
public void onSuccess(Key key) {
sink.success(key);
}
+
@Override
public void onFailure(AerospikeException exception) {
sink.error(exception);
}
-
}
diff --git a/reactor-client/src/main/java/com/aerospike/client/reactor/retry/AerospikeReactorRetryClient.java b/reactor-client/src/main/java/com/aerospike/client/reactor/retry/AerospikeReactorRetryClient.java
index 2d69c3e..7556db9 100644
--- a/reactor-client/src/main/java/com/aerospike/client/reactor/retry/AerospikeReactorRetryClient.java
+++ b/reactor-client/src/main/java/com/aerospike/client/reactor/retry/AerospikeReactorRetryClient.java
@@ -265,6 +265,16 @@ public Mono operate(BatchPolicy policy, List records) thro
return client.operate(policy, records).retryWhen(retryPolicy);
}
+ @Override
+ public Mono commit(Txn txn) throws AerospikeException {
+ return client.commit(txn).retryWhen(retryPolicy);
+ }
+
+ @Override
+ public Mono abort(Txn txn) throws AerospikeException {
+ return client.abort(txn).retryWhen(retryPolicy);
+ }
+
@Override
public final Flux query(Statement statement) throws AerospikeException {
return query(null, statement);
@@ -353,4 +363,14 @@ public BatchPolicy getBatchPolicyDefault() {
public InfoPolicy getInfoPolicyDefault() {
return client.getInfoPolicyDefault();
}
+
+ @Override
+ public TxnVerifyPolicy getTxnVerifyPolicyDefault() {
+ return client.getTxnVerifyPolicyDefault();
+ }
+
+ @Override
+ public TxnRollPolicy getTxnRollPolicyDefault() {
+ return client.getTxnRollPolicyDefault();
+ }
}