Skip to content

Commit

Permalink
FMWK-625 Add support for reactive MRT methods (#24)
Browse files Browse the repository at this point in the history
  • Loading branch information
reugn authored Dec 11, 2024
1 parent ad13e21 commit b5ff159
Show file tree
Hide file tree
Showing 17 changed files with 132 additions and 8 deletions.
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<name>aerospike-client-java-reactive</name>
<description>aerospike-client-java-reactive</description>
<url>https://github.com/aerospike/aerospike-client-java-reactive</url>
<version>8.1.2</version>
<version>9.0.2</version>
<packaging>pom</packaging>

<modules>
Expand All @@ -29,7 +29,7 @@
<maven-source-plugin.version>3.2.1</maven-source-plugin.version>
<maven-gpg-plugin.version>1.6</maven-gpg-plugin.version>

<aerospike-client.version>8.1.2</aerospike-client.version>
<aerospike-client.version>9.0.2</aerospike-client.version>
<netty.version>4.1.111.Final</netty.version>
<commons-cli.version>1.5.0</commons-cli.version>
<junit.version>4.13.2</junit.version>
Expand Down
2 changes: 1 addition & 1 deletion reactor-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>com.aerospike</groupId>
<artifactId>aerospike-client-java-reactive</artifactId>
<version>8.1.2</version>
<version>9.0.2</version>
</parent>
<artifactId>aerospike-reactor-client</artifactId>
<packaging>jar</packaging>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,18 @@ public Mono<Boolean> operate(BatchPolicy policy, List<BatchRecord> records) thro
null, new ReactorBatchOperateListListener(sink), policy, records));
}

@Override
public Mono<CommitStatus> commit(Txn txn) throws AerospikeException {
return Mono.create(sink -> aerospikeClient.commit(
null, new ReactorCommitListener(sink), txn));
}

@Override
public Mono<AbortStatus> abort(Txn txn) throws AerospikeException {
return Mono.create(sink -> aerospikeClient.abort(
null, new ReactorAbortListener(sink), txn));
}

@Override
public final Flux<KeyRecord> query(Statement statement) throws AerospikeException {
return query(null, statement);
Expand Down Expand Up @@ -404,7 +416,7 @@ public IAerospikeClient getAerospikeClient() {
private Mono<AsyncIndexTask> createIndexImpl(Policy policy,
String namespace, String setName, String indexName, String binName,
IndexType indexType, IndexCollectionType indexCollectionType, CTX... ctx){
return Mono.create(sink -> aerospikeClient.createIndex(null,
return Mono.create(sink -> aerospikeClient.createIndex(null,
new ReactorIndexListener(sink), policy, namespace, setName, indexName, binName,
indexType, indexCollectionType, ctx));
}
Expand Down Expand Up @@ -460,4 +472,14 @@ public BatchPolicy getBatchPolicyDefault() {
public InfoPolicy getInfoPolicyDefault() {
return aerospikeClient.getInfoPolicyDefault();
}

@Override
public TxnVerifyPolicy getTxnVerifyPolicyDefault() {
return aerospikeClient.getTxnVerifyPolicyDefault();
}

@Override
public TxnRollPolicy getTxnRollPolicyDefault() {
return aerospikeClient.getTxnRollPolicyDefault();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,8 @@ public interface DefaultPolicyProvider {
BatchPolicy getBatchPolicyDefault();

InfoPolicy getInfoPolicyDefault();

TxnVerifyPolicy getTxnVerifyPolicyDefault();

TxnRollPolicy getTxnRollPolicyDefault();
}
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,34 @@ Mono<BatchResults> operate(BatchPolicy batchPolicy,
*/
Mono<Boolean> operate(BatchPolicy policy, List<BatchRecord> records) throws AerospikeException;

/**
* Asynchronously attempt to commit the given multi-record transaction. First, the expected
* record versions are sent to the server nodes for verification. If all nodes return success,
* the transaction is committed. Otherwise, the transaction is aborted.
* <p>
* This method registers the command with an event loop and returns.
* The event loop thread will process the command and send the results to the listener.
* <p>
* Requires server version 8.0+
*
* @param txn multi-record transaction
* @throws AerospikeException if event loop registration fails
*/
Mono<CommitStatus> commit(Txn txn) throws AerospikeException;

/**
* Asynchronously abort and rollback the given multi-record transaction.
* <p>
* This method registers the command with an event loop and returns.
* The event loop thread will process the command and send the results to the listener.
* <p>
* Requires server version 8.0+
*
* @param txn multi-record transaction
* @throws AerospikeException if event loop registration fails
*/
Mono<AbortStatus> abort(Txn txn) throws AerospikeException;

/**
* Reactively execute query on all server nodes.
* This method registers the command with an event loop and returns.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.aerospike.client.reactor.listeners;

import com.aerospike.client.AbortStatus;
import com.aerospike.client.listener.AbortListener;
import reactor.core.publisher.MonoSink;

public class ReactorAbortListener implements AbortListener {

private final MonoSink<AbortStatus> sink;

public ReactorAbortListener(MonoSink<AbortStatus> sink) {
this.sink = sink;
}

@Override
public void onSuccess(AbortStatus abortStatus) {
sink.success(abortStatus);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@ public ReactorBatchSequenceListener(FluxSink<BatchRead> sink) {
public void onRecord(BatchRead record) {
sink.next(record);
}

@Override
public void onSuccess() {
sink.complete();
}

@Override
public void onFailure(AerospikeException exception) {
sink.error(exception);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.aerospike.client.reactor.listeners;

import com.aerospike.client.AerospikeException;
import com.aerospike.client.CommitStatus;
import com.aerospike.client.listener.CommitListener;
import reactor.core.publisher.MonoSink;

public class ReactorCommitListener implements CommitListener {

private final MonoSink<CommitStatus> sink;

public ReactorCommitListener(MonoSink<CommitStatus> sink) {
this.sink = sink;
}

@Override
public void onSuccess(CommitStatus commitStatus) {
sink.success(commitStatus);
}

@Override
public void onFailure(AerospikeException.Commit commit) {
sink.error(commit);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public void onSuccess(Key key, boolean existed) {
sink.success();
}
}

@Override
public void onFailure(AerospikeException exception) {
sink.error(exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public void onSuccess(Key key, boolean exists) {
sink.success();
}
}

@Override
public void onFailure(AerospikeException exception) {
sink.error(exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@ public ReactorExistsSequenceListener(FluxSink<KeyExists> sink) {
public void onExists(Key key, boolean exists) {
sink.next(new KeyExists(key, exists));
}

@Override
public void onSuccess() {
sink.complete();
}

@Override
public void onFailure(AerospikeException exception) {
sink.error(exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,4 @@ public void onSuccess(AsyncIndexTask indexTask) {
public void onFailure(AerospikeException ae) {
sink.error(ae);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public ReactorRecordListener(MonoSink<KeyRecord> sink) {
public void onSuccess(Key key, Record record) {
sink.success(new KeyRecord(key, record));
}

@Override
public void onFailure(AerospikeException exception) {
sink.error(exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,14 @@ public ReactorRecordSequenceListener(FluxSink<KeyRecord> sink) {
public void onRecord(Key key, Record record) throws AerospikeException {
sink.next(new KeyRecord(key, record));
}

@Override
public void onSuccess() {
sink.complete();
}

@Override
public void onFailure(AerospikeException exception) {
sink.error(exception);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,4 @@ public void onSuccess(int status) {
public void onFailure(AerospikeException ae) {
sink.error(ae);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ public ReactorWriteListener(MonoSink<Key> sink) {
public void onSuccess(Key key) {
sink.success(key);
}

@Override
public void onFailure(AerospikeException exception) {
sink.error(exception);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,16 @@ public Mono<Boolean> operate(BatchPolicy policy, List<BatchRecord> records) thro
return client.operate(policy, records).retryWhen(retryPolicy);
}

@Override
public Mono<CommitStatus> commit(Txn txn) throws AerospikeException {
return client.commit(txn).retryWhen(retryPolicy);
}

@Override
public Mono<AbortStatus> abort(Txn txn) throws AerospikeException {
return client.abort(txn).retryWhen(retryPolicy);
}

@Override
public final Flux<KeyRecord> query(Statement statement) throws AerospikeException {
return query(null, statement);
Expand Down Expand Up @@ -353,4 +363,14 @@ public BatchPolicy getBatchPolicyDefault() {
public InfoPolicy getInfoPolicyDefault() {
return client.getInfoPolicyDefault();
}

@Override
public TxnVerifyPolicy getTxnVerifyPolicyDefault() {
return client.getTxnVerifyPolicyDefault();
}

@Override
public TxnRollPolicy getTxnRollPolicyDefault() {
return client.getTxnRollPolicyDefault();
}
}

0 comments on commit b5ff159

Please sign in to comment.