Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FMWK-625 Add support for reactive MRT methods #24

Merged
merged 1 commit into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<name>aerospike-client-java-reactive</name>
<description>aerospike-client-java-reactive</description>
<url>https://github.com/aerospike/aerospike-client-java-reactive</url>
<version>8.1.2</version>
<version>9.0.2</version>
<packaging>pom</packaging>

<modules>
Expand All @@ -29,7 +29,7 @@
<maven-source-plugin.version>3.2.1</maven-source-plugin.version>
<maven-gpg-plugin.version>1.6</maven-gpg-plugin.version>

<aerospike-client.version>8.1.2</aerospike-client.version>
<aerospike-client.version>9.0.2</aerospike-client.version>
<netty.version>4.1.111.Final</netty.version>
<commons-cli.version>1.5.0</commons-cli.version>
<junit.version>4.13.2</junit.version>
Expand Down
2 changes: 1 addition & 1 deletion reactor-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>com.aerospike</groupId>
<artifactId>aerospike-client-java-reactive</artifactId>
<version>8.1.2</version>
<version>9.0.2</version>
</parent>
<artifactId>aerospike-reactor-client</artifactId>
<packaging>jar</packaging>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,18 @@ public Mono<Boolean> operate(BatchPolicy policy, List<BatchRecord> records) thro
null, new ReactorBatchOperateListListener(sink), policy, records));
}

@Override
public Mono<CommitStatus> commit(Txn txn) throws AerospikeException {
return Mono.create(sink -> aerospikeClient.commit(
null, new ReactorCommitListener(sink), txn));
}

@Override
public Mono<AbortStatus> abort(Txn txn) throws AerospikeException {
return Mono.create(sink -> aerospikeClient.abort(
null, new ReactorAbortListener(sink), txn));
}

@Override
public final Flux<KeyRecord> query(Statement statement) throws AerospikeException {
return query(null, statement);
Expand Down Expand Up @@ -404,7 +416,7 @@ public IAerospikeClient getAerospikeClient() {
private Mono<AsyncIndexTask> createIndexImpl(Policy policy,
String namespace, String setName, String indexName, String binName,
IndexType indexType, IndexCollectionType indexCollectionType, CTX... ctx){
return Mono.create(sink -> aerospikeClient.createIndex(null,
return Mono.create(sink -> aerospikeClient.createIndex(null,
new ReactorIndexListener(sink), policy, namespace, setName, indexName, binName,
indexType, indexCollectionType, ctx));
}
Expand Down Expand Up @@ -460,4 +472,14 @@ public BatchPolicy getBatchPolicyDefault() {
public InfoPolicy getInfoPolicyDefault() {
return aerospikeClient.getInfoPolicyDefault();
}

@Override
public TxnVerifyPolicy getTxnVerifyPolicyDefault() {
return aerospikeClient.getTxnVerifyPolicyDefault();
}

@Override
public TxnRollPolicy getTxnRollPolicyDefault() {
return aerospikeClient.getTxnRollPolicyDefault();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,8 @@ public interface DefaultPolicyProvider {
BatchPolicy getBatchPolicyDefault();

InfoPolicy getInfoPolicyDefault();

TxnVerifyPolicy getTxnVerifyPolicyDefault();

TxnRollPolicy getTxnRollPolicyDefault();
}
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,34 @@ Mono<BatchResults> operate(BatchPolicy batchPolicy,
*/
Mono<Boolean> operate(BatchPolicy policy, List<BatchRecord> records) throws AerospikeException;

/**
* Asynchronously attempt to commit the given multi-record transaction. First, the expected
* record versions are sent to the server nodes for verification. If all nodes return success,
* the transaction is committed. Otherwise, the transaction is aborted.
* <p>
* This method registers the command with an event loop and returns.
* The event loop thread will process the command and send the results to the listener.
* <p>
* Requires server version 8.0+
*
* @param txn multi-record transaction
* @throws AerospikeException if event loop registration fails
*/
Mono<CommitStatus> commit(Txn txn) throws AerospikeException;

/**
* Asynchronously abort and rollback the given multi-record transaction.
* <p>
* This method registers the command with an event loop and returns.
* The event loop thread will process the command and send the results to the listener.
* <p>
* Requires server version 8.0+
*
* @param txn multi-record transaction
* @throws AerospikeException if event loop registration fails
*/
Mono<AbortStatus> abort(Txn txn) throws AerospikeException;

/**
* Reactively execute query on all server nodes.
* This method registers the command with an event loop and returns.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.aerospike.client.reactor.listeners;

import com.aerospike.client.AbortStatus;
import com.aerospike.client.listener.AbortListener;
import reactor.core.publisher.MonoSink;

public class ReactorAbortListener implements AbortListener {

private final MonoSink<AbortStatus> sink;

public ReactorAbortListener(MonoSink<AbortStatus> sink) {
this.sink = sink;
}

@Override
public void onSuccess(AbortStatus abortStatus) {
sink.success(abortStatus);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@ public ReactorBatchSequenceListener(FluxSink<BatchRead> sink) {
public void onRecord(BatchRead record) {
sink.next(record);
}

@Override
public void onSuccess() {
sink.complete();
}

@Override
public void onFailure(AerospikeException exception) {
sink.error(exception);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.aerospike.client.reactor.listeners;

import com.aerospike.client.AerospikeException;
import com.aerospike.client.CommitStatus;
import com.aerospike.client.listener.CommitListener;
import reactor.core.publisher.MonoSink;

public class ReactorCommitListener implements CommitListener {

private final MonoSink<CommitStatus> sink;

public ReactorCommitListener(MonoSink<CommitStatus> sink) {
this.sink = sink;
}

@Override
public void onSuccess(CommitStatus commitStatus) {
sink.success(commitStatus);
}

@Override
public void onFailure(AerospikeException.Commit commit) {
sink.error(commit);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public void onSuccess(Key key, boolean existed) {
sink.success();
}
}

@Override
public void onFailure(AerospikeException exception) {
sink.error(exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public void onSuccess(Key key, boolean exists) {
sink.success();
}
}

@Override
public void onFailure(AerospikeException exception) {
sink.error(exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@ public ReactorExistsSequenceListener(FluxSink<KeyExists> sink) {
public void onExists(Key key, boolean exists) {
sink.next(new KeyExists(key, exists));
}

@Override
public void onSuccess() {
sink.complete();
}

@Override
public void onFailure(AerospikeException exception) {
sink.error(exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,4 @@ public void onSuccess(AsyncIndexTask indexTask) {
public void onFailure(AerospikeException ae) {
sink.error(ae);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public ReactorRecordListener(MonoSink<KeyRecord> sink) {
public void onSuccess(Key key, Record record) {
sink.success(new KeyRecord(key, record));
}

@Override
public void onFailure(AerospikeException exception) {
sink.error(exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,14 @@ public ReactorRecordSequenceListener(FluxSink<KeyRecord> sink) {
public void onRecord(Key key, Record record) throws AerospikeException {
sink.next(new KeyRecord(key, record));
}

@Override
public void onSuccess() {
sink.complete();
}

@Override
public void onFailure(AerospikeException exception) {
sink.error(exception);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,4 @@ public void onSuccess(int status) {
public void onFailure(AerospikeException ae) {
sink.error(ae);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ public ReactorWriteListener(MonoSink<Key> sink) {
public void onSuccess(Key key) {
sink.success(key);
}

@Override
public void onFailure(AerospikeException exception) {
sink.error(exception);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,16 @@ public Mono<Boolean> operate(BatchPolicy policy, List<BatchRecord> records) thro
return client.operate(policy, records).retryWhen(retryPolicy);
}

@Override
public Mono<CommitStatus> commit(Txn txn) throws AerospikeException {
return client.commit(txn).retryWhen(retryPolicy);
}

@Override
public Mono<AbortStatus> abort(Txn txn) throws AerospikeException {
return client.abort(txn).retryWhen(retryPolicy);
}

@Override
public final Flux<KeyRecord> query(Statement statement) throws AerospikeException {
return query(null, statement);
Expand Down Expand Up @@ -353,4 +363,14 @@ public BatchPolicy getBatchPolicyDefault() {
public InfoPolicy getInfoPolicyDefault() {
return client.getInfoPolicyDefault();
}

@Override
public TxnVerifyPolicy getTxnVerifyPolicyDefault() {
return client.getTxnVerifyPolicyDefault();
}

@Override
public TxnRollPolicy getTxnRollPolicyDefault() {
return client.getTxnRollPolicyDefault();
}
}
Loading