From 3a73c5c8b62501097b61f235ad0ceb595ead6f53 Mon Sep 17 00:00:00 2001 From: yrizhkov Date: Wed, 11 Dec 2024 15:59:35 +0200 Subject: [PATCH] FMWK-625 Add support for reactive MRT methods --- pom.xml | 4 +-- reactor-client/pom.xml | 2 +- .../reactor/AerospikeReactorClient.java | 24 +++++++++++++++- .../client/reactor/DefaultPolicyProvider.java | 4 +++ .../reactor/IAerospikeReactorClient.java | 28 +++++++++++++++++++ .../listeners/ReactorAbortListener.java | 19 +++++++++++++ .../ReactorBatchSequenceListener.java | 2 ++ .../listeners/ReactorCommitListener.java | 25 +++++++++++++++++ .../listeners/ReactorDeleteListener.java | 1 + .../listeners/ReactorExistsListener.java | 1 + .../ReactorExistsSequenceListener.java | 2 ++ .../listeners/ReactorIndexListener.java | 1 - .../listeners/ReactorRecordListener.java | 1 + .../ReactorRecordSequenceListener.java | 3 +- .../listeners/ReactorTaskStatusListener.java | 1 - .../listeners/ReactorWriteListener.java | 2 +- .../retry/AerospikeReactorRetryClient.java | 20 +++++++++++++ 17 files changed, 132 insertions(+), 8 deletions(-) create mode 100644 reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorAbortListener.java create mode 100644 reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorCommitListener.java diff --git a/pom.xml b/pom.xml index d688069..ea5c5fe 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ aerospike-client-java-reactive aerospike-client-java-reactive https://github.com/aerospike/aerospike-client-java-reactive - 8.1.2 + 9.0.2 pom @@ -29,7 +29,7 @@ 3.2.1 1.6 - 8.1.2 + 9.0.2 4.1.111.Final 1.5.0 4.13.2 diff --git a/reactor-client/pom.xml b/reactor-client/pom.xml index 014b6f6..87ee623 100644 --- a/reactor-client/pom.xml +++ b/reactor-client/pom.xml @@ -6,7 +6,7 @@ com.aerospike aerospike-client-java-reactive - 8.1.2 + 9.0.2 aerospike-reactor-client jar diff --git a/reactor-client/src/main/java/com/aerospike/client/reactor/AerospikeReactorClient.java b/reactor-client/src/main/java/com/aerospike/client/reactor/AerospikeReactorClient.java index ccb990e..6102929 100644 --- a/reactor-client/src/main/java/com/aerospike/client/reactor/AerospikeReactorClient.java +++ b/reactor-client/src/main/java/com/aerospike/client/reactor/AerospikeReactorClient.java @@ -325,6 +325,18 @@ public Mono operate(BatchPolicy policy, List records) thro null, new ReactorBatchOperateListListener(sink), policy, records)); } + @Override + public Mono commit(Txn txn) throws AerospikeException { + return Mono.create(sink -> aerospikeClient.commit( + null, new ReactorCommitListener(sink), txn)); + } + + @Override + public Mono abort(Txn txn) throws AerospikeException { + return Mono.create(sink -> aerospikeClient.abort( + null, new ReactorAbortListener(sink), txn)); + } + @Override public final Flux query(Statement statement) throws AerospikeException { return query(null, statement); @@ -404,7 +416,7 @@ public IAerospikeClient getAerospikeClient() { private Mono createIndexImpl(Policy policy, String namespace, String setName, String indexName, String binName, IndexType indexType, IndexCollectionType indexCollectionType, CTX... ctx){ - return Mono.create(sink -> aerospikeClient.createIndex(null, + return Mono.create(sink -> aerospikeClient.createIndex(null, new ReactorIndexListener(sink), policy, namespace, setName, indexName, binName, indexType, indexCollectionType, ctx)); } @@ -460,4 +472,14 @@ public BatchPolicy getBatchPolicyDefault() { public InfoPolicy getInfoPolicyDefault() { return aerospikeClient.getInfoPolicyDefault(); } + + @Override + public TxnVerifyPolicy getTxnVerifyPolicyDefault() { + return aerospikeClient.getTxnVerifyPolicyDefault(); + } + + @Override + public TxnRollPolicy getTxnRollPolicyDefault() { + return aerospikeClient.getTxnRollPolicyDefault(); + } } diff --git a/reactor-client/src/main/java/com/aerospike/client/reactor/DefaultPolicyProvider.java b/reactor-client/src/main/java/com/aerospike/client/reactor/DefaultPolicyProvider.java index 45185af..828b941 100644 --- a/reactor-client/src/main/java/com/aerospike/client/reactor/DefaultPolicyProvider.java +++ b/reactor-client/src/main/java/com/aerospike/client/reactor/DefaultPolicyProvider.java @@ -15,4 +15,8 @@ public interface DefaultPolicyProvider { BatchPolicy getBatchPolicyDefault(); InfoPolicy getInfoPolicyDefault(); + + TxnVerifyPolicy getTxnVerifyPolicyDefault(); + + TxnRollPolicy getTxnRollPolicyDefault(); } diff --git a/reactor-client/src/main/java/com/aerospike/client/reactor/IAerospikeReactorClient.java b/reactor-client/src/main/java/com/aerospike/client/reactor/IAerospikeReactorClient.java index 9c28150..9d12729 100644 --- a/reactor-client/src/main/java/com/aerospike/client/reactor/IAerospikeReactorClient.java +++ b/reactor-client/src/main/java/com/aerospike/client/reactor/IAerospikeReactorClient.java @@ -644,6 +644,34 @@ Mono operate(BatchPolicy batchPolicy, */ Mono operate(BatchPolicy policy, List records) throws AerospikeException; + /** + * Asynchronously attempt to commit the given multi-record transaction. First, the expected + * record versions are sent to the server nodes for verification. If all nodes return success, + * the transaction is committed. Otherwise, the transaction is aborted. + *

+ * This method registers the command with an event loop and returns. + * The event loop thread will process the command and send the results to the listener. + *

+ * Requires server version 8.0+ + * + * @param txn multi-record transaction + * @throws AerospikeException if event loop registration fails + */ + Mono commit(Txn txn) throws AerospikeException; + + /** + * Asynchronously abort and rollback the given multi-record transaction. + *

+ * This method registers the command with an event loop and returns. + * The event loop thread will process the command and send the results to the listener. + *

+ * Requires server version 8.0+ + * + * @param txn multi-record transaction + * @throws AerospikeException if event loop registration fails + */ + Mono abort(Txn txn) throws AerospikeException; + /** * Reactively execute query on all server nodes. * This method registers the command with an event loop and returns. diff --git a/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorAbortListener.java b/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorAbortListener.java new file mode 100644 index 0000000..d0fd993 --- /dev/null +++ b/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorAbortListener.java @@ -0,0 +1,19 @@ +package com.aerospike.client.reactor.listeners; + +import com.aerospike.client.AbortStatus; +import com.aerospike.client.listener.AbortListener; +import reactor.core.publisher.MonoSink; + +public class ReactorAbortListener implements AbortListener { + + private final MonoSink sink; + + public ReactorAbortListener(MonoSink sink) { + this.sink = sink; + } + + @Override + public void onSuccess(AbortStatus abortStatus) { + sink.success(abortStatus); + } +} diff --git a/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorBatchSequenceListener.java b/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorBatchSequenceListener.java index 14aff50..33879d6 100644 --- a/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorBatchSequenceListener.java +++ b/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorBatchSequenceListener.java @@ -33,10 +33,12 @@ public ReactorBatchSequenceListener(FluxSink sink) { public void onRecord(BatchRead record) { sink.next(record); } + @Override public void onSuccess() { sink.complete(); } + @Override public void onFailure(AerospikeException exception) { sink.error(exception); diff --git a/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorCommitListener.java b/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorCommitListener.java new file mode 100644 index 0000000..95f5c04 --- /dev/null +++ b/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorCommitListener.java @@ -0,0 +1,25 @@ +package com.aerospike.client.reactor.listeners; + +import com.aerospike.client.AerospikeException; +import com.aerospike.client.CommitStatus; +import com.aerospike.client.listener.CommitListener; +import reactor.core.publisher.MonoSink; + +public class ReactorCommitListener implements CommitListener { + + private final MonoSink sink; + + public ReactorCommitListener(MonoSink sink) { + this.sink = sink; + } + + @Override + public void onSuccess(CommitStatus commitStatus) { + sink.success(commitStatus); + } + + @Override + public void onFailure(AerospikeException.Commit commit) { + sink.error(commit); + } +} diff --git a/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorDeleteListener.java b/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorDeleteListener.java index df6b4cc..eb4271f 100644 --- a/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorDeleteListener.java +++ b/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorDeleteListener.java @@ -37,6 +37,7 @@ public void onSuccess(Key key, boolean existed) { sink.success(); } } + @Override public void onFailure(AerospikeException exception) { sink.error(exception); diff --git a/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorExistsListener.java b/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorExistsListener.java index 1132e41..8e4533e 100644 --- a/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorExistsListener.java +++ b/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorExistsListener.java @@ -37,6 +37,7 @@ public void onSuccess(Key key, boolean exists) { sink.success(); } } + @Override public void onFailure(AerospikeException exception) { sink.error(exception); diff --git a/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorExistsSequenceListener.java b/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorExistsSequenceListener.java index 52851d5..791580a 100644 --- a/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorExistsSequenceListener.java +++ b/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorExistsSequenceListener.java @@ -34,10 +34,12 @@ public ReactorExistsSequenceListener(FluxSink sink) { public void onExists(Key key, boolean exists) { sink.next(new KeyExists(key, exists)); } + @Override public void onSuccess() { sink.complete(); } + @Override public void onFailure(AerospikeException exception) { sink.error(exception); diff --git a/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorIndexListener.java b/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorIndexListener.java index 4910009..ee6a09d 100644 --- a/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorIndexListener.java +++ b/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorIndexListener.java @@ -22,5 +22,4 @@ public void onSuccess(AsyncIndexTask indexTask) { public void onFailure(AerospikeException ae) { sink.error(ae); } - } diff --git a/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorRecordListener.java b/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorRecordListener.java index 3275203..2778468 100644 --- a/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorRecordListener.java +++ b/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorRecordListener.java @@ -34,6 +34,7 @@ public ReactorRecordListener(MonoSink sink) { public void onSuccess(Key key, Record record) { sink.success(new KeyRecord(key, record)); } + @Override public void onFailure(AerospikeException exception) { sink.error(exception); diff --git a/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorRecordSequenceListener.java b/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorRecordSequenceListener.java index 6297775..dd53fbf 100644 --- a/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorRecordSequenceListener.java +++ b/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorRecordSequenceListener.java @@ -35,13 +35,14 @@ public ReactorRecordSequenceListener(FluxSink sink) { public void onRecord(Key key, Record record) throws AerospikeException { sink.next(new KeyRecord(key, record)); } + @Override public void onSuccess() { sink.complete(); } + @Override public void onFailure(AerospikeException exception) { sink.error(exception); } - } diff --git a/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorTaskStatusListener.java b/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorTaskStatusListener.java index d63ad9d..71fe25b 100644 --- a/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorTaskStatusListener.java +++ b/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorTaskStatusListener.java @@ -21,5 +21,4 @@ public void onSuccess(int status) { public void onFailure(AerospikeException ae) { sink.error(ae); } - } diff --git a/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorWriteListener.java b/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorWriteListener.java index c056700..a26e8bc 100644 --- a/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorWriteListener.java +++ b/reactor-client/src/main/java/com/aerospike/client/reactor/listeners/ReactorWriteListener.java @@ -33,9 +33,9 @@ public ReactorWriteListener(MonoSink sink) { public void onSuccess(Key key) { sink.success(key); } + @Override public void onFailure(AerospikeException exception) { sink.error(exception); } - } diff --git a/reactor-client/src/main/java/com/aerospike/client/reactor/retry/AerospikeReactorRetryClient.java b/reactor-client/src/main/java/com/aerospike/client/reactor/retry/AerospikeReactorRetryClient.java index 2d69c3e..7556db9 100644 --- a/reactor-client/src/main/java/com/aerospike/client/reactor/retry/AerospikeReactorRetryClient.java +++ b/reactor-client/src/main/java/com/aerospike/client/reactor/retry/AerospikeReactorRetryClient.java @@ -265,6 +265,16 @@ public Mono operate(BatchPolicy policy, List records) thro return client.operate(policy, records).retryWhen(retryPolicy); } + @Override + public Mono commit(Txn txn) throws AerospikeException { + return client.commit(txn).retryWhen(retryPolicy); + } + + @Override + public Mono abort(Txn txn) throws AerospikeException { + return client.abort(txn).retryWhen(retryPolicy); + } + @Override public final Flux query(Statement statement) throws AerospikeException { return query(null, statement); @@ -353,4 +363,14 @@ public BatchPolicy getBatchPolicyDefault() { public InfoPolicy getInfoPolicyDefault() { return client.getInfoPolicyDefault(); } + + @Override + public TxnVerifyPolicy getTxnVerifyPolicyDefault() { + return client.getTxnVerifyPolicyDefault(); + } + + @Override + public TxnRollPolicy getTxnRollPolicyDefault() { + return client.getTxnRollPolicyDefault(); + } }