diff --git a/pom.xml b/pom.xml
index 13d61713..2768a90e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -29,9 +29,10 @@
3.3.0
3.3.0
4.1.2
+ 6.1.11
3.3.0
1.6
- 8.1.2
+ 9.0.0
8.1.2
8.1.2
3.6.8
@@ -193,6 +194,11 @@
${springdata.spring-boot}
compile
+
+ org.springframework
+ spring-tx
+ ${spring-tx}
+
com.aerospike
aerospike-client-jdk8
diff --git a/src/main/asciidoc/reference/transactions.adoc b/src/main/asciidoc/reference/transactions.adoc
new file mode 100644
index 00000000..e7932589
--- /dev/null
+++ b/src/main/asciidoc/reference/transactions.adoc
@@ -0,0 +1,149 @@
+[[transactions]]
+= Transactions
+
+In the context of database operations, a transaction is a sequence of statements that are executed as a single unit of work. Transactions typically follow the A.C.I.D. principle:
+[arabic]
+. **Atomicity** ensures that a transaction is treated as a single, indivisible unit; either all operations within the
+transaction are completed successfully, or none of them are applied.
+. **Consistency** ensures that a transaction brings the database from one valid state to another, maintaining all
+predefined rules and constraints.
+. **Isolation** ensures that transactions operate independently of one another, so that intermediate states of a
+transaction are not visible to others.
+. **Durability** guarantees that once a transaction has been committed, its changes are permanent.
+
+== Choosing Transaction Management Model
+
+Spring offers two models of transaction management: **declarative** and **programmatic**. When choosing between them,
+consider the complexity and requirements of your application.
+
+**Declarative transaction management** is typically preferred for its simplicity and ease of maintenance, as it allows
+to define transaction boundaries using annotations without altering the business logic code.
+This model suits for most applications where transaction boundaries are straightforward and the business logic
+does not require intricate transaction control.
+
+**Programmatic transaction management** is chosen when you need more fine-grained control over transactions,
+such as handling complex transaction scenarios.
+This approach is useful in situations where specific transaction behavior needs to be dynamically adjusted
+or when integrating with legacy code that requires explicit transaction management. When using this approach,
+it is possible to explicitly start, commit, and rollback transactions within the code if needed.
+
+In general, declarative management is more straightforward and reduces boilerplate code,
+while programmatic management offers more control but at the cost of increased complexity.
+
+== Declarative Transaction Management
+
+Declarative transaction management uses annotations to define transaction boundaries and behavior without changing
+the business logic code. It’s usually more common in Spring applications due to its simplicity and ease of use.
+
+You can annotate methods and/or classes with `@Transactional` to automatically handle transactions, including
+committing or rolling back based on execution.
+
+Couple other things needed to start working with transactions using declarative approach:
+[arabic]
+. A transaction manager must be specified in your Spring Configuration.
+. Spring Configuration must be annotated with the `@EnableTransactionManagement` annotation.
+
+=== Example
+
+Here is an example that shows applying `@Transactional` to a method.
+It ensures that the entire method runs within a transaction context, and Spring manages the transaction lifecycle
+(automatically committing the transaction if the method succeeds or rolling back if it encounters an exception).
+
+[source,java]
+----
+@Configuration
+@EnableTransactionManagement
+public class Config {
+
+ @Bean
+ public AerospikeTransactionManager aerospikeTransactionManager(IAerospikeClient client) {
+ return new AerospikeTransactionManager(client);
+ }
+
+ // Other configuration
+}
+
+@Service
+public class MyService {
+
+ @Transactional
+ public void performDatabaseOperations() {
+ // Perform database operations
+ }
+}
+----
+
+== Programmatic Transaction Management
+
+Programmatic transaction management gives developers fine-grained control over transactions through code.
+This approach involves manually managing transactions using Spring’s API.
+
+The Spring Framework offers two ways for programmatic transaction management:
+
+[arabic]
+. Using `TransactionTemplate` or `TransactionalOperator` which use callback approach
+(for programmatic transaction management in imperative code it is typically recommended to use `TransactionTemplate`;
+for reactive code, `TransactionalOperator` is preferred).
+. Directly using a `TransactionManager` implementation.
+
+=== Example
+
+Here is an example that shows using a programmatic transaction in a method.
+You would use `TransactionTemplate` to wrap your database operations in a transaction block,
+ensuring the transaction is automatically committed if successful or rolled back if an exception occurs.
+
+[source,java]
+----
+@Configuration
+public class Config {
+
+ @Bean
+ public AerospikeTransactionManager aerospikeTransactionManager(IAerospikeClient client) {
+ return new AerospikeTransactionManager(client);
+ }
+
+ @Bean
+ public TransactionTemplate transactionTemplate(AerospikeTransactionManager transactionManager) {
+ return new TransactionTemplate(transactionManager);
+ }
+
+ // Other configuration
+}
+
+@Service
+public class MyService {
+
+ @Autowired
+ TransactionTemplate transactionTemplate;
+
+ public void performDatabaseOperations() {
+ transactionTemplate.executeWithoutResult(status -> {
+ // Perform database operations
+ });
+ }
+}
+----
+
+== Aerospike Operations Support
+
+Behind the curtains Aerospike transaction manager uses MRTs (multi-record transactions)
+which is an Aerospike feature allowing to group together multiple Aerospike operation requests
+into a single transaction.
+
+NOTE: Not all of the Aerospike operations can participate in transactions.
+
+Here is a list of Aerospike operations that participate in transactions:
+
+[arabic]
+. all single record operations (`insert`, `save`, `update`, `add`, `append`, `persist`, `findById`, `exists`, `delete`)
+. all batch operations without query (`insertAll`, `saveAll`, `findByIds`, `deleteAll`)
+. queries that include `id` (e.g., repository queries like `findByIdAndName`)
+
+The following operations do not participate in transactions
+(will not become part of a transaction if included into it):
+
+[arabic]
+. `truncate`
+. queries that do not include `id` (e.g., repository queries like `findByName`)
+. operations that perform info commands (e.g., `indexExists`)
+. operations that perform scans (using ScanPolicy)
diff --git a/src/main/java/org/springframework/data/aerospike/config/AbstractAerospikeDataConfiguration.java b/src/main/java/org/springframework/data/aerospike/config/AbstractAerospikeDataConfiguration.java
index bdff2148..db198c87 100644
--- a/src/main/java/org/springframework/data/aerospike/config/AbstractAerospikeDataConfiguration.java
+++ b/src/main/java/org/springframework/data/aerospike/config/AbstractAerospikeDataConfiguration.java
@@ -72,7 +72,7 @@ public QueryEngine queryEngine(IAerospikeClient aerospikeClient,
return queryEngine;
}
- @Bean(name = "aerospikePersistenceEntityIndexCreator")
+ @Bean
public AerospikePersistenceEntityIndexCreator aerospikePersistenceEntityIndexCreator(
ObjectProvider aerospikeMappingContext,
AerospikeIndexResolver aerospikeIndexResolver,
diff --git a/src/main/java/org/springframework/data/aerospike/config/AbstractReactiveAerospikeDataConfiguration.java b/src/main/java/org/springframework/data/aerospike/config/AbstractReactiveAerospikeDataConfiguration.java
index 4da5b1eb..501512d3 100644
--- a/src/main/java/org/springframework/data/aerospike/config/AbstractReactiveAerospikeDataConfiguration.java
+++ b/src/main/java/org/springframework/data/aerospike/config/AbstractReactiveAerospikeDataConfiguration.java
@@ -105,8 +105,8 @@ protected ClientPolicy getClientPolicy() {
return clientPolicy;
}
- @Bean(name = "reactiveAerospikePersistenceEntityIndexCreator")
- public ReactiveAerospikePersistenceEntityIndexCreator reactiveAerospikePersistenceEntityIndexCreator(
+ @Bean
+ public ReactiveAerospikePersistenceEntityIndexCreator aerospikePersistenceEntityIndexCreator(
ObjectProvider aerospikeMappingContext,
AerospikeIndexResolver aerospikeIndexResolver,
ObjectProvider template, AerospikeSettings settings) {
diff --git a/src/main/java/org/springframework/data/aerospike/core/AerospikeTemplate.java b/src/main/java/org/springframework/data/aerospike/core/AerospikeTemplate.java
index 7b1aea89..4961674c 100644
--- a/src/main/java/org/springframework/data/aerospike/core/AerospikeTemplate.java
+++ b/src/main/java/org/springframework/data/aerospike/core/AerospikeTemplate.java
@@ -76,9 +76,11 @@
import static org.springframework.data.aerospike.core.CoreUtils.getDistinctPredicate;
import static org.springframework.data.aerospike.core.CoreUtils.operations;
import static org.springframework.data.aerospike.core.CoreUtils.verifyUnsortedWithOffset;
+import static org.springframework.data.aerospike.core.TemplateUtils.checkForTransaction;
import static org.springframework.data.aerospike.core.TemplateUtils.excludeIdQualifier;
import static org.springframework.data.aerospike.core.TemplateUtils.getBinNamesFromTargetClass;
import static org.springframework.data.aerospike.core.TemplateUtils.getIdValue;
+import static org.springframework.data.aerospike.core.TemplateUtils.getPolicyFilterExpOrDefault;
import static org.springframework.data.aerospike.query.QualifierUtils.getIdQualifier;
import static org.springframework.data.aerospike.query.QualifierUtils.queryCriteriaIsNotNull;
@@ -145,17 +147,17 @@ public void save(T document, String setName) {
AerospikeWriteData data = writeData(document, setName);
AerospikePersistentEntity> entity = mappingContext.getRequiredPersistentEntity(document.getClass());
if (entity.hasVersionProperty()) {
- WritePolicy policy = expectGenerationCasAwarePolicy(data);
+ WritePolicy writePolicy = expectGenerationCasAwarePolicy(data);
// mimicking REPLACE behavior by firstly deleting bins due to bin convergence feature restrictions
- doPersistWithVersionAndHandleCasError(document, data, policy, true, SAVE_OPERATION);
+ doPersistWithVersionAndHandleCasError(document, data, writePolicy, true, SAVE_OPERATION);
} else {
- WritePolicy policy = ignoreGenerationPolicy(data, RecordExistsAction.UPDATE);
+ WritePolicy writePolicy = ignoreGenerationPolicy(data, RecordExistsAction.UPDATE);
// mimicking REPLACE behavior by firstly deleting bins due to bin convergence feature restrictions
Operation[] operations = operations(data.getBinsAsArray(), Operation::put,
Operation.array(Operation.delete()));
- doPersistAndHandleError(data, policy, operations);
+ doPersistAndHandleError(data, writePolicy, operations);
}
}
@@ -206,8 +208,8 @@ private void batchWriteAllDocuments(List documents, String setName, Opera
List batchWriteRecords = batchWriteDataList.stream().map(BatchWriteData::batchRecord).toList();
try {
- // requires server ver. >= 6.0.0
- client.operate(null, batchWriteRecords);
+ BatchPolicy bPolicy = (BatchPolicy) checkForTransaction(client, client.getBatchPolicyDefault());
+ client.operate(bPolicy, batchWriteRecords);
} catch (AerospikeException e) {
throw translateError(e); // no exception is thrown for versions mismatch, only record's result code shows it
}
@@ -259,7 +261,7 @@ public void insert(T document, String setName) {
Assert.notNull(setName, "Set name must not be null!");
AerospikeWriteData data = writeData(document, setName);
- WritePolicy policy = ignoreGenerationPolicy(data, RecordExistsAction.CREATE_ONLY);
+ WritePolicy writePolicy = ignoreGenerationPolicy(data, RecordExistsAction.CREATE_ONLY);
AerospikePersistentEntity> entity = mappingContext.getRequiredPersistentEntity(document.getClass());
if (entity.hasVersionProperty()) {
// we are ignoring generation here as insert operation should fail with DuplicateKeyException if key
@@ -268,10 +270,10 @@ public void insert(T document, String setName) {
// in the original document
// also we do not want to handle aerospike error codes as cas aware error codes as we are ignoring
// generation
- doPersistWithVersionAndHandleError(document, data, policy);
+ doPersistWithVersionAndHandleError(document, data, writePolicy);
} else {
Operation[] operations = operations(data.getBinsAsArray(), Operation::put);
- doPersistAndHandleError(data, policy, operations);
+ doPersistAndHandleError(data, writePolicy, operations);
}
}
@@ -291,22 +293,22 @@ public void insertAll(Iterable extends T> documents, String setName) {
}
@Override
- public void persist(T document, WritePolicy policy) {
+ public void persist(T document, WritePolicy writePolicy) {
Assert.notNull(document, "Document must not be null!");
- Assert.notNull(policy, "Policy must not be null!");
- persist(document, policy, getSetName(document));
+ Assert.notNull(writePolicy, "Policy must not be null!");
+ persist(document, writePolicy, getSetName(document));
}
@Override
- public void persist(T document, WritePolicy policy, String setName) {
+ public void persist(T document, WritePolicy writePolicy, String setName) {
Assert.notNull(document, "Document must not be null!");
- Assert.notNull(policy, "Policy must not be null!");
+ Assert.notNull(writePolicy, "Policy must not be null!");
Assert.notNull(setName, "Set name must not be null!");
AerospikeWriteData data = writeData(document, setName);
Operation[] operations = operations(data.getBinsAsArray(), Operation::put);
- doPersistAndHandleError(data, policy, operations);
+ doPersistAndHandleError(data, writePolicy, operations);
}
@Override
@@ -323,17 +325,17 @@ public void update(T document, String setName) {
AerospikeWriteData data = writeData(document, setName);
AerospikePersistentEntity> entity = mappingContext.getRequiredPersistentEntity(document.getClass());
if (entity.hasVersionProperty()) {
- WritePolicy policy = expectGenerationPolicy(data, RecordExistsAction.UPDATE_ONLY);
+ WritePolicy writePolicy = expectGenerationPolicy(data, RecordExistsAction.UPDATE_ONLY);
// mimicking REPLACE_ONLY behavior by firstly deleting bins due to bin convergence feature restrictions
- doPersistWithVersionAndHandleCasError(document, data, policy, true, UPDATE_OPERATION);
+ doPersistWithVersionAndHandleCasError(document, data, writePolicy, true, UPDATE_OPERATION);
} else {
- WritePolicy policy = ignoreGenerationPolicy(data, RecordExistsAction.UPDATE_ONLY);
+ WritePolicy writePolicy = ignoreGenerationPolicy(data, RecordExistsAction.UPDATE_ONLY);
// mimicking REPLACE_ONLY behavior by firstly deleting bins due to bin convergence feature restrictions
Operation[] operations = Stream.concat(Stream.of(Operation.delete()), data.getBins().stream()
.map(Operation::put)).toArray(Operation[]::new);
- doPersistAndHandleError(data, policy, operations);
+ doPersistAndHandleError(data, writePolicy, operations);
}
}
@@ -351,14 +353,14 @@ public void update(T document, String setName, Collection fields) {
AerospikeWriteData data = writeDataWithSpecificFields(document, setName, fields);
AerospikePersistentEntity> entity = mappingContext.getRequiredPersistentEntity(document.getClass());
if (entity.hasVersionProperty()) {
- WritePolicy policy = expectGenerationPolicy(data, RecordExistsAction.UPDATE_ONLY);
+ WritePolicy writePolicy = expectGenerationPolicy(data, RecordExistsAction.UPDATE_ONLY);
- doPersistWithVersionAndHandleCasError(document, data, policy, false, UPDATE_OPERATION);
+ doPersistWithVersionAndHandleCasError(document, data, writePolicy, false, UPDATE_OPERATION);
} else {
- WritePolicy policy = ignoreGenerationPolicy(data, RecordExistsAction.UPDATE_ONLY);
+ WritePolicy writePolicy = ignoreGenerationPolicy(data, RecordExistsAction.UPDATE_ONLY);
Operation[] operations = operations(data.getBinsAsArray(), Operation::put);
- doPersistAndHandleError(data, policy, operations);
+ doPersistAndHandleError(data, writePolicy, operations);
}
}
@@ -398,7 +400,9 @@ public boolean delete(T document, String setName) {
private boolean doDeleteWithVersionAndHandleCasError(AerospikeWriteData data) {
try {
- return client.delete(expectGenerationPolicy(data, RecordExistsAction.UPDATE_ONLY), data.getKey());
+ WritePolicy writePolicy = expectGenerationPolicy(data, RecordExistsAction.UPDATE_ONLY);
+ writePolicy = (WritePolicy) checkForTransaction(client, writePolicy);
+ return client.delete(writePolicy, data.getKey());
} catch (AerospikeException e) {
throw translateCasError(e, "Failed to delete record due to versions mismatch");
}
@@ -406,7 +410,9 @@ private boolean doDeleteWithVersionAndHandleCasError(AerospikeWriteData data) {
private boolean doDeleteIgnoreVersionAndTranslateError(AerospikeWriteData data) {
try {
- return client.delete(ignoreGenerationPolicy(data, RecordExistsAction.UPDATE_ONLY), data.getKey());
+ WritePolicy writePolicy = ignoreGenerationPolicy(data, RecordExistsAction.UPDATE_ONLY);
+ writePolicy = (WritePolicy) checkForTransaction(client, writePolicy);
+ return client.delete(writePolicy, data.getKey());
} catch (AerospikeException e) {
throw translateError(e);
}
@@ -425,8 +431,8 @@ public boolean deleteById(Object id, String setName) {
try {
Key key = getKey(id, setName);
-
- return client.delete(ignoreGenerationPolicy(), key);
+ WritePolicy writePolicy = (WritePolicy) checkForTransaction(client, ignoreGenerationPolicy());
+ return client.delete(writePolicy, key);
} catch (AerospikeException e) {
throw translateError(e);
}
@@ -587,8 +593,8 @@ public void deleteAll(String setName, Instant beforeLastUpdate) {
private void deleteAndHandleErrors(IAerospikeClient client, Key[] keys) {
BatchResults results;
try {
- // requires server ver. >= 6.0.0
- results = client.delete(null, null, keys);
+ BatchPolicy batchPolicy = (BatchPolicy) checkForTransaction(client, client.getBatchPolicyDefault());
+ results = client.delete(batchPolicy, null, keys);
} catch (AerospikeException e) {
throw translateError(e);
}
@@ -624,6 +630,7 @@ public T add(T document, String setName, Map values) {
WritePolicy writePolicy = WritePolicyBuilder.builder(client.getWritePolicyDefault())
.expiration(data.getExpiration())
.build();
+ writePolicy = (WritePolicy) checkForTransaction(client, writePolicy);
Record aeroRecord = client.operate(writePolicy, data.getKey(), ops);
@@ -650,6 +657,7 @@ public T add(T document, String setName, String binName, long value) {
WritePolicy writePolicy = WritePolicyBuilder.builder(client.getWritePolicyDefault())
.expiration(data.getExpiration())
.build();
+ writePolicy = (WritePolicy) checkForTransaction(client, writePolicy);
Record aeroRecord = client.operate(writePolicy, data.getKey(),
Operation.add(new Bin(binName, value)), Operation.get());
@@ -674,7 +682,8 @@ public T append(T document, String setName, Map values) {
try {
AerospikeWriteData data = writeData(document, setName);
Operation[] ops = operations(values, Operation.Type.APPEND, Operation.get());
- Record aeroRecord = client.operate(null, data.getKey(), ops);
+ WritePolicy writePolicy = (WritePolicy) checkForTransaction(client, client.getWritePolicyDefault());
+ Record aeroRecord = client.operate(writePolicy, data.getKey(), ops);
return mapToEntity(data.getKey(), getEntityClass(document), aeroRecord);
} catch (AerospikeException e) {
@@ -695,7 +704,8 @@ public T append(T document, String setName, String binName, String value) {
try {
AerospikeWriteData data = writeData(document, setName);
- Record aeroRecord = client.operate(null, data.getKey(),
+ WritePolicy writePolicy = (WritePolicy) checkForTransaction(client, client.getWritePolicyDefault());
+ Record aeroRecord = client.operate(writePolicy, data.getKey(),
Operation.append(new Bin(binName, value)),
Operation.get(binName));
@@ -718,7 +728,8 @@ public T prepend(T document, String setName, String fieldName, String value)
try {
AerospikeWriteData data = writeData(document, setName);
- Record aeroRecord = client.operate(null, data.getKey(),
+ WritePolicy writePolicy = (WritePolicy) checkForTransaction(client, client.getWritePolicyDefault());
+ Record aeroRecord = client.operate(writePolicy, data.getKey(),
Operation.prepend(new Bin(fieldName, value)),
Operation.get(fieldName));
@@ -742,7 +753,8 @@ public T prepend(T document, String setName, Map values) {
try {
AerospikeWriteData data = writeData(document, setName);
Operation[] ops = operations(values, Operation.Type.PREPEND, Operation.get());
- Record aeroRecord = client.operate(null, data.getKey(), ops);
+ WritePolicy writePolicy = (WritePolicy) checkForTransaction(client, client.getWritePolicyDefault());
+ Record aeroRecord = client.operate(writePolicy, data.getKey(), ops);
return mapToEntity(data.getKey(), getEntityClass(document), aeroRecord);
} catch (AerospikeException e) {
@@ -790,24 +802,24 @@ public S findById(Object id, Class entityClass, Class targetClass,
return (S) findByIdUsingQuery(id, entityClass, targetClass, setName, null);
}
- private Record getRecord(AerospikePersistentEntity> entity, Key key, Query query) {
+ private Record getRecord(AerospikePersistentEntity> entity, Key key, @Nullable Query query) {
Record aeroRecord;
if (entity.isTouchOnRead()) {
Assert.state(!entity.hasExpirationProperty(), "Touch on read is not supported for expiration property");
aeroRecord = getAndTouch(key, entity.getExpiration(), null, null);
} else {
- Policy policy = getPolicyFilterExp(query);
- aeroRecord = getAerospikeClient().get(policy, key);
+ Policy policy = checkForTransaction(client, getPolicyFilterExpOrDefault(client, queryEngine, query));
+ aeroRecord = client.get(policy, key);
}
return aeroRecord;
}
private BatchPolicy getBatchPolicyFilterExp(Query query) {
if (queryCriteriaIsNotNull(query)) {
- BatchPolicy policy = new BatchPolicy(getAerospikeClient().getBatchPolicyDefault());
+ BatchPolicy batchPolicy = new BatchPolicy(getAerospikeClient().getBatchPolicyDefault());
Qualifier qualifier = query.getCriteriaObject();
- policy.filterExp = queryEngine.getFilterExpressionsBuilder().build(qualifier);
- return policy;
+ batchPolicy.filterExp = queryEngine.getFilterExpressionsBuilder().build(qualifier);
+ return batchPolicy;
}
return null;
}
@@ -819,15 +831,15 @@ private Key[] getKeys(Collection> ids, String setName) {
}
private Object getRecordMapToTargetClass(AerospikePersistentEntity> entity, Key key, Class targetClass,
- Query query) {
+ @Nullable Query query) {
Record aeroRecord;
String[] binNames = getBinNamesFromTargetClass(targetClass, mappingContext);
if (entity.isTouchOnRead()) {
Assert.state(!entity.hasExpirationProperty(), "Touch on read is not supported for expiration property");
aeroRecord = getAndTouch(key, entity.getExpiration(), binNames, query);
} else {
- Policy policy = getPolicyFilterExp(query);
- aeroRecord = getAerospikeClient().get(policy, key, binNames);
+ Policy policy = checkForTransaction(client, getPolicyFilterExpOrDefault(client, queryEngine, query));
+ aeroRecord = client.get(policy, key, binNames);
}
return mapToEntity(key, targetClass, aeroRecord);
}
@@ -842,7 +854,7 @@ private Policy getPolicyFilterExp(Query query) {
return null;
}
- private Record getAndTouch(Key key, int expiration, String[] binNames, Query query) {
+ private Record getAndTouch(Key key, int expiration, String[] binNames, @Nullable Query query) {
WritePolicyBuilder writePolicyBuilder = WritePolicyBuilder.builder(client.getWritePolicyDefault())
.expiration(expiration);
@@ -851,6 +863,7 @@ private Record getAndTouch(Key key, int expiration, String[] binNames, Query que
writePolicyBuilder.filterExp(queryEngine.getFilterExpressionsBuilder().build(qualifier));
}
WritePolicy writePolicy = writePolicyBuilder.build();
+ writePolicy = (WritePolicy) checkForTransaction(client, writePolicy);
try {
if (binNames == null || binNames.length == 0) {
@@ -931,7 +944,8 @@ public GroupedEntities findByIds(GroupedKeys groupedKeys) {
private GroupedEntities findGroupedEntitiesByGroupedKeys(GroupedKeys groupedKeys) {
EntitiesKeys entitiesKeys = EntitiesKeys.of(toEntitiesKeyMap(groupedKeys));
- Record[] aeroRecords = client.get(null, entitiesKeys.getKeys());
+ BatchPolicy bPolicy = (BatchPolicy) checkForTransaction(client, client.getBatchPolicyDefault());
+ Record[] aeroRecords = client.get(bPolicy, entitiesKeys.getKeys());
return toGroupedEntities(entitiesKeys, aeroRecords);
}
@@ -946,7 +960,7 @@ public Object findByIdUsingQuery(Object id, Class entityClass, Class Object findByIdUsingQuery(Object id, Class entityClass, Class targetClass, String setName,
- Query query) {
+ @Nullable Query query) {
Assert.notNull(id, "Id must not be null!");
Assert.notNull(entityClass, "Entity class must not be null!");
Assert.notNull(setName, "Set name must not be null!");
@@ -986,24 +1000,22 @@ public List> findByIdsUsingQuery(Collection> ids, Class entityClas
.map(id -> getKey(id, setName))
.toArray(Key[]::new);
- BatchPolicy policy = getBatchPolicyFilterExp(query);
-
+ BatchPolicy batchPolicy = (BatchPolicy) checkForTransaction(client, getBatchPolicyFilterExp(query));
Class> target;
Record[] aeroRecords;
if (targetClass != null && targetClass != entityClass) {
String[] binNames = getBinNamesFromTargetClass(targetClass, mappingContext);
- aeroRecords = getAerospikeClient().get(policy, keys, binNames);
+ aeroRecords = client.get(batchPolicy, keys, binNames);
target = targetClass;
} else {
- aeroRecords = getAerospikeClient().get(policy, keys);
+ aeroRecords = client.get(batchPolicy, keys);
target = entityClass;
}
- Stream> results = IntStream.range(0, keys.length)
+ return IntStream.range(0, keys.length)
.filter(index -> aeroRecords[index] != null)
- .mapToObj(index -> mapToEntity(keys[index], target, aeroRecords[index]));
-
- return applyPostProcessingOnResults(results, query).collect(Collectors.toList());
+ .mapToObj(index -> mapToEntity(keys[index], target, aeroRecords[index]))
+ .collect(Collectors.toList());
} catch (AerospikeException e) {
throw translateError(e);
}
@@ -1022,10 +1034,10 @@ public IntStream findByIdsUsingQueryWithoutMapping(Collection> ids, String set
.toArray(Key[]::new);
}
- BatchPolicy policy = getBatchPolicyFilterExp(query);
+ BatchPolicy batchPolicy = getBatchPolicyFilterExp(query);
Record[] aeroRecords;
- aeroRecords = getAerospikeClient().get(policy, keys);
+ aeroRecords = getAerospikeClient().get(batchPolicy, keys);
return IntStream.range(0, keys.length)
.filter(index -> aeroRecords[index] != null);
@@ -1158,7 +1170,8 @@ public boolean exists(Object id, String setName) {
try {
Key key = getKey(id, setName);
- Record aeroRecord = client.operate(null, key, Operation.getHeader());
+ WritePolicy writePolicy = (WritePolicy) checkForTransaction(client, client.getWritePolicyDefault());
+ Record aeroRecord = client.operate(writePolicy, key, Operation.getHeader());
return aeroRecord != null;
} catch (AerospikeException e) {
throw translateError(e);
@@ -1411,38 +1424,40 @@ public boolean indexExists(String indexName) {
return false;
}
- private Record doPersistAndHandleError(AerospikeWriteData data, WritePolicy policy, Operation[] operations) {
+ private Record doPersistAndHandleError(AerospikeWriteData data, WritePolicy writePolicy, Operation[] operations) {
try {
- return client.operate(policy, data.getKey(), operations);
+ writePolicy = (WritePolicy) checkForTransaction(client, writePolicy);
+ return client.operate(writePolicy, data.getKey(), operations);
} catch (AerospikeException e) {
throw translateError(e);
}
}
- private void doPersistWithVersionAndHandleCasError(T document, AerospikeWriteData data, WritePolicy policy,
+ private void doPersistWithVersionAndHandleCasError(T document, AerospikeWriteData data, WritePolicy writePolicy,
boolean firstlyDeleteBins, OperationType operationType) {
try {
- Record newAeroRecord = putAndGetHeader(data, policy, firstlyDeleteBins);
+ Record newAeroRecord = putAndGetHeader(data, writePolicy, firstlyDeleteBins);
updateVersion(document, newAeroRecord);
} catch (AerospikeException e) {
throw translateCasError(e, "Failed to " + operationType.toString() + " record due to versions mismatch");
}
}
- private void doPersistWithVersionAndHandleError(T document, AerospikeWriteData data, WritePolicy policy) {
+ private void doPersistWithVersionAndHandleError(T document, AerospikeWriteData data, WritePolicy writePolicy) {
try {
- Record newAeroRecord = putAndGetHeader(data, policy, false);
+ Record newAeroRecord = putAndGetHeader(data, writePolicy, false);
updateVersion(document, newAeroRecord);
} catch (AerospikeException e) {
throw translateError(e);
}
}
- private Record putAndGetHeader(AerospikeWriteData data, WritePolicy policy, boolean firstlyDeleteBins) {
+ private Record putAndGetHeader(AerospikeWriteData data, WritePolicy writePolicy, boolean firstlyDeleteBins) {
Key key = data.getKey();
Operation[] operations = getPutAndGetHeaderOperations(data, firstlyDeleteBins);
+ writePolicy = (WritePolicy) checkForTransaction(client, writePolicy);
- return client.operate(policy, key, operations);
+ return client.operate(writePolicy, key, operations);
}
@SuppressWarnings("SameParameterValue")
@@ -1525,14 +1540,13 @@ private List findByIdsWithoutMapping(Collection> ids, String setNam
try {
Key[] keys = getKeys(ids, setName);
- BatchPolicy policy = getBatchPolicyFilterExp(query);
-
+ BatchPolicy bPolicy = (BatchPolicy) checkForTransaction(client, getBatchPolicyFilterExp(query));
Record[] aeroRecords;
if (targetClass != null) {
String[] binNames = getBinNamesFromTargetClass(targetClass, mappingContext);
- aeroRecords = getAerospikeClient().get(policy, keys, binNames);
+ aeroRecords = client.get(bPolicy, keys, binNames);
} else {
- aeroRecords = getAerospikeClient().get(policy, keys);
+ aeroRecords = client.get(bPolicy, keys);
}
return IntStream.range(0, keys.length)
diff --git a/src/main/java/org/springframework/data/aerospike/core/ReactiveAerospikeTemplate.java b/src/main/java/org/springframework/data/aerospike/core/ReactiveAerospikeTemplate.java
index bf29f13f..8d68ecc0 100644
--- a/src/main/java/org/springframework/data/aerospike/core/ReactiveAerospikeTemplate.java
+++ b/src/main/java/org/springframework/data/aerospike/core/ReactiveAerospikeTemplate.java
@@ -82,6 +82,7 @@
import static org.springframework.data.aerospike.core.BaseAerospikeTemplate.OperationType.UPDATE_OPERATION;
import static org.springframework.data.aerospike.core.CoreUtils.getDistinctPredicate;
import static org.springframework.data.aerospike.core.CoreUtils.operations;
+import static org.springframework.data.aerospike.core.TemplateUtils.enrichPolicyWithTransaction;
import static org.springframework.data.aerospike.core.TemplateUtils.excludeIdQualifier;
import static org.springframework.data.aerospike.core.TemplateUtils.getIdValue;
import static org.springframework.data.aerospike.query.QualifierUtils.getIdQualifier;
@@ -136,19 +137,19 @@ public Mono save(T document, String setName) {
AerospikeWriteData data = writeData(document, setName);
AerospikePersistentEntity> entity = mappingContext.getRequiredPersistentEntity(document.getClass());
if (entity.hasVersionProperty()) {
- WritePolicy policy = expectGenerationCasAwarePolicy(data);
+ WritePolicy writePolicy = expectGenerationCasAwarePolicy(data);
// mimicking REPLACE behavior by firstly deleting bins due to bin convergence feature restrictions
Operation[] operations = operations(data.getBinsAsArray(), Operation::put,
Operation.array(Operation.delete()));
- return doPersistWithVersionAndHandleCasError(document, data, policy, operations, SAVE_OPERATION);
+ return doPersistWithVersionAndHandleCasError(document, data, writePolicy, operations, SAVE_OPERATION);
} else {
- WritePolicy policy = ignoreGenerationPolicy(data, RecordExistsAction.UPDATE);
+ WritePolicy writePolicy = ignoreGenerationPolicy(data, RecordExistsAction.UPDATE);
// mimicking REPLACE behavior by firstly deleting bins due to bin convergence feature restrictions
Operation[] operations = operations(data.getBinsAsArray(), Operation::put,
Operation.array(Operation.delete()));
- return doPersistAndHandleError(document, data, policy, operations);
+ return doPersistAndHandleError(document, data, writePolicy, operations);
}
}
@@ -203,14 +204,17 @@ private Flux batchWriteAllDocuments(List documents, String setName, Op
List batchWriteRecords = batchWriteDataList.stream().map(BatchWriteData::batchRecord).toList();
- return batchWriteAndCheckForErrors(batchWriteRecords, batchWriteDataList, operationType);
+ return enrichPolicyWithTransaction(reactorClient, reactorClient.getBatchPolicyDefault())
+ .flatMapMany(batchPolicyEnriched ->
+ batchWriteAndCheckForErrors((BatchPolicy) batchPolicyEnriched, batchWriteRecords, batchWriteDataList,
+ operationType));
}
- private Flux batchWriteAndCheckForErrors(List batchWriteRecords,
+ private Flux batchWriteAndCheckForErrors(BatchPolicy batchPolicy, List batchWriteRecords,
List> batchWriteDataList,
OperationType operationType) {
- // requires server ver. >= 6.0.0
- return reactorClient.operate(null, batchWriteRecords)
+ return reactorClient
+ .operate(batchPolicy, batchWriteRecords)
.onErrorMap(this::translateError)
.flatMap(ignore -> checkForErrorsAndUpdateVersion(batchWriteDataList, batchWriteRecords, operationType))
.flux()
@@ -263,7 +267,7 @@ public Mono insert(T document, String setName) {
Assert.notNull(setName, "Set name must not be null!");
AerospikeWriteData data = writeData(document, setName);
- WritePolicy policy = ignoreGenerationPolicy(data, RecordExistsAction.CREATE_ONLY);
+ WritePolicy writePolicy = ignoreGenerationPolicy(data, RecordExistsAction.CREATE_ONLY);
AerospikePersistentEntity> entity = mappingContext.getRequiredPersistentEntity(document.getClass());
if (entity.hasVersionProperty()) {
@@ -275,10 +279,10 @@ public Mono insert(T document, String setName) {
// generation
Operation[] operations = operations(data.getBinsAsArray(), Operation::put, null,
Operation.array(Operation.getHeader()));
- return doPersistWithVersionAndHandleError(document, data, policy, operations);
+ return doPersistWithVersionAndHandleError(document, data, writePolicy, operations);
} else {
Operation[] operations = operations(data.getBinsAsArray(), Operation::put);
- return doPersistAndHandleError(document, data, policy, operations);
+ return doPersistAndHandleError(document, data, writePolicy, operations);
}
}
@@ -298,22 +302,24 @@ public Flux insertAll(Iterable extends T> documents, String setName) {
}
@Override
- public Mono persist(T document, WritePolicy policy) {
+ public Mono persist(T document, WritePolicy writePolicy) {
Assert.notNull(document, "Document must not be null!");
- Assert.notNull(policy, "Policy must not be null!");
- return persist(document, policy, getSetName(document));
+ Assert.notNull(writePolicy, "Policy must not be null!");
+ return persist(document, writePolicy, getSetName(document));
}
@Override
- public Mono persist(T document, WritePolicy policy, String setName) {
+ public Mono persist(T document, WritePolicy writePolicy, String setName) {
Assert.notNull(document, "Document must not be null!");
- Assert.notNull(policy, "Policy must not be null!");
+ Assert.notNull(writePolicy, "Policy must not be null!");
Assert.notNull(setName, "Set name must not be null!");
AerospikeWriteData data = writeData(document, setName);
Operation[] operations = operations(data.getBinsAsArray(), Operation::put);
- return doPersistAndHandleError(document, data, policy, operations);
+ return enrichPolicyWithTransaction(reactorClient, writePolicy)
+ .flatMap(writePolicyEnriched ->
+ doPersistAndHandleError(document, data, (WritePolicy) writePolicyEnriched, operations));
}
@Override
@@ -329,19 +335,19 @@ public Mono update(T document, String setName) {
AerospikeWriteData data = writeData(document, setName);
AerospikePersistentEntity> entity = mappingContext.getRequiredPersistentEntity(document.getClass());
if (entity.hasVersionProperty()) {
- WritePolicy policy = expectGenerationPolicy(data, RecordExistsAction.UPDATE_ONLY);
+ WritePolicy writePolicy = expectGenerationPolicy(data, RecordExistsAction.UPDATE_ONLY);
// mimicking REPLACE_ONLY behavior by firstly deleting bins due to bin convergence feature restrictions
Operation[] operations = operations(data.getBinsAsArray(), Operation::put,
Operation.array(Operation.delete()), Operation.array(Operation.getHeader()));
- return doPersistWithVersionAndHandleCasError(document, data, policy, operations, UPDATE_OPERATION);
+ return doPersistWithVersionAndHandleCasError(document, data, writePolicy, operations, UPDATE_OPERATION);
} else {
- WritePolicy policy = ignoreGenerationPolicy(data, RecordExistsAction.UPDATE_ONLY);
+ WritePolicy writePolicy = ignoreGenerationPolicy(data, RecordExistsAction.UPDATE_ONLY);
// mimicking REPLACE_ONLY behavior by firstly deleting bins due to bin convergence feature restrictions
Operation[] operations = operations(data.getBinsAsArray(), Operation::put,
Operation.array(Operation.delete()));
- return doPersistAndHandleError(document, data, policy, operations);
+ return doPersistAndHandleError(document, data, writePolicy, operations);
}
}
@@ -359,16 +365,16 @@ public Mono update(T document, String setName, Collection fields)
AerospikeWriteData data = writeDataWithSpecificFields(document, setName, fields);
AerospikePersistentEntity> entity = mappingContext.getRequiredPersistentEntity(document.getClass());
if (entity.hasVersionProperty()) {
- WritePolicy policy = expectGenerationPolicy(data, RecordExistsAction.UPDATE_ONLY);
+ WritePolicy writePolicy = expectGenerationPolicy(data, RecordExistsAction.UPDATE_ONLY);
Operation[] operations = operations(data.getBinsAsArray(), Operation::put, null,
Operation.array(Operation.getHeader()));
- return doPersistWithVersionAndHandleCasError(document, data, policy, operations, UPDATE_OPERATION);
+ return doPersistWithVersionAndHandleCasError(document, data, writePolicy, operations, UPDATE_OPERATION);
} else {
- WritePolicy policy = ignoreGenerationPolicy(data, RecordExistsAction.UPDATE_ONLY);
+ WritePolicy writePolicy = ignoreGenerationPolicy(data, RecordExistsAction.UPDATE_ONLY);
Operation[] operations = operations(data.getBinsAsArray(), Operation::put);
- return doPersistAndHandleError(document, data, policy, operations);
+ return doPersistAndHandleError(document, data, writePolicy, operations);
}
}
@@ -400,13 +406,13 @@ public Mono delete(T document, String setName) {
AerospikeWriteData data = writeData(document, setName);
AerospikePersistentEntity> entity = mappingContext.getRequiredPersistentEntity(document.getClass());
if (entity.hasVersionProperty()) {
- return reactorClient
- .delete(expectGenerationPolicy(data), data.getKey())
+ return enrichPolicyWithTransaction(reactorClient, expectGenerationPolicy(data))
+ .flatMap(writePolicyEnriched -> reactorClient.delete((WritePolicy) writePolicyEnriched, data.getKey()))
.hasElement()
.onErrorMap(e -> translateCasThrowable(e, DELETE_OPERATION.toString()));
}
- return reactorClient
- .delete(ignoreGenerationPolicy(), data.getKey())
+ return enrichPolicyWithTransaction(reactorClient, ignoreGenerationPolicy())
+ .flatMap(writePolicyEnriched -> reactorClient.delete((WritePolicy) writePolicyEnriched, data.getKey()))
.hasElement()
.onErrorMap(this::translateError);
}
@@ -422,7 +428,7 @@ public Mono delete(Query query, Class entityClass, String setName)
return findQueryResults.flatMap(list -> {
if (!list.isEmpty()) {
- return deleteAll(list);
+ return deleteAll(list);
}
return Mono.empty();
}
@@ -471,8 +477,9 @@ public Mono deleteById(Object id, String setName) {
Assert.notNull(id, "Id must not be null!");
Assert.notNull(setName, "Set name must not be null!");
- return reactorClient
- .delete(ignoreGenerationPolicy(), getKey(id, setName))
+ return enrichPolicyWithTransaction(reactorClient, ignoreGenerationPolicy())
+ .flatMap(writePolicyEnriched ->
+ reactorClient.delete((WritePolicy) writePolicyEnriched, getKey(id, setName)))
.map(k -> true)
.onErrorMap(this::translateError);
}
@@ -548,8 +555,8 @@ private Mono batchDeleteAndCheckForErrors(IAerospikeReactorClient reactorC
return Mono.empty();
};
- // requires server ver. >= 6.0.0
- return reactorClient.delete(null, null, keys)
+ return enrichPolicyWithTransaction(reactorClient, reactorClient.getBatchPolicyDefault())
+ .flatMap(batchPolicy -> reactorClient.delete((BatchPolicy) batchPolicy, null, keys))
.onErrorMap(this::translateError)
.flatMap(checkForErrors);
}
@@ -568,7 +575,8 @@ public Mono deleteByIds(GroupedKeys groupedKeys) {
private Mono deleteEntitiesByGroupedKeys(GroupedKeys groupedKeys) {
EntitiesKeys entitiesKeys = EntitiesKeys.of(toEntitiesKeyMap(groupedKeys));
- reactorClient.delete(null, null, entitiesKeys.getKeys())
+ enrichPolicyWithTransaction(reactorClient, reactorClient.getBatchPolicyDefault())
+ .flatMap(batchPolicy -> reactorClient.delete((BatchPolicy) batchPolicy, null, entitiesKeys.getKeys()))
.doOnError(this::translateError);
return batchDeleteAndCheckForErrors(reactorClient, entitiesKeys.getKeys());
@@ -633,7 +641,9 @@ public Mono add(T document, String setName, Map values) {
.expiration(data.getExpiration())
.build();
- return executeOperationsOnValue(document, data, operations, writePolicy);
+ return enrichPolicyWithTransaction(reactorClient, writePolicy)
+ .flatMap(writePolicyEnriched ->
+ executeOperationsOnValue(document, data, (WritePolicy) writePolicyEnriched, operations));
}
@Override
@@ -654,7 +664,9 @@ public Mono add(T document, String setName, String binName, long value) {
.build();
Operation[] operations = {Operation.add(new Bin(binName, value)), Operation.get(binName)};
- return executeOperationsOnValue(document, data, operations, writePolicy);
+ return enrichPolicyWithTransaction(reactorClient, writePolicy)
+ .flatMap(writePolicyEnriched ->
+ executeOperationsOnValue(document, data, (WritePolicy) writePolicyEnriched, operations));
}
@Override
@@ -670,7 +682,9 @@ public Mono append(T document, String setName, Map values
AerospikeWriteData data = writeData(document, setName);
Operation[] operations = operations(values, Operation.Type.APPEND, Operation.get());
- return executeOperationsOnValue(document, data, operations, null);
+ return enrichPolicyWithTransaction(reactorClient, reactorClient.getWritePolicyDefault())
+ .flatMap(writePolicyEnriched ->
+ executeOperationsOnValue(document, data, (WritePolicy) writePolicyEnriched, operations));
}
@Override
@@ -685,7 +699,9 @@ public Mono append(T document, String setName, String binName, String val
AerospikeWriteData data = writeData(document, setName);
Operation[] operations = {Operation.append(new Bin(binName, value)), Operation.get(binName)};
- return executeOperationsOnValue(document, data, operations, null);
+ return enrichPolicyWithTransaction(reactorClient, reactorClient.getWritePolicyDefault())
+ .flatMap(writePolicyEnriched ->
+ executeOperationsOnValue(document, data, (WritePolicy) writePolicyEnriched, operations));
}
@Override
@@ -701,7 +717,9 @@ public Mono prepend(T document, String setName, Map value
AerospikeWriteData data = writeData(document, setName);
Operation[] operations = operations(values, Operation.Type.PREPEND, Operation.get());
- return executeOperationsOnValue(document, data, operations, null);
+ return enrichPolicyWithTransaction(reactorClient, reactorClient.getWritePolicyDefault())
+ .flatMap(writePolicyEnriched -> executeOperationsOnValue(document, data,
+ (WritePolicy) writePolicyEnriched, operations));
}
@Override
@@ -716,11 +734,13 @@ public Mono prepend(T document, String setName, String binName, String va
AerospikeWriteData data = writeData(document, setName);
Operation[] operations = {Operation.prepend(new Bin(binName, value)), Operation.get(binName)};
- return executeOperationsOnValue(document, data, operations, null);
+ return enrichPolicyWithTransaction(reactorClient, reactorClient.getWritePolicyDefault())
+ .flatMap(writePolicyEnriched ->
+ executeOperationsOnValue(document, data, (WritePolicy) writePolicyEnriched, operations));
}
- private Mono executeOperationsOnValue(T document, AerospikeWriteData data, Operation[] operations,
- WritePolicy writePolicy) {
+ private Mono executeOperationsOnValue(T document, AerospikeWriteData data, WritePolicy writePolicy,
+ Operation[] operations) {
return reactorClient.operate(writePolicy, data.getKey(), operations)
.filter(keyRecord -> Objects.nonNull(keyRecord.record))
.map(keyRecord -> mapToEntity(keyRecord.key, getEntityClass(document), keyRecord.record))
@@ -758,7 +778,8 @@ public Mono findById(Object id, Class entityClass, String setName) {
)
.onErrorMap(this::translateError);
} else {
- return reactorClient.get(key)
+ return enrichPolicyWithTransaction(reactorClient, reactorClient.getReadPolicyDefault())
+ .flatMap(policy -> reactorClient.get(policy, key))
.filter(keyRecord -> Objects.nonNull(keyRecord.record))
.map(keyRecord -> mapToEntity(keyRecord.key, entityClass, keyRecord.record))
.onErrorMap(this::translateError);
@@ -789,7 +810,8 @@ public Mono findById(Object id, Class entityClass, Class targetC
)
.onErrorMap(this::translateError);
} else {
- return reactorClient.get(null, key, binNames)
+ return enrichPolicyWithTransaction(reactorClient, reactorClient.getReadPolicyDefault())
+ .flatMap(policy -> reactorClient.get(policy, key, binNames))
.filter(keyRecord -> Objects.nonNull(keyRecord.record))
.map(keyRecord -> mapToEntity(keyRecord.key, targetClass, keyRecord.record))
.onErrorMap(this::translateError);
@@ -837,7 +859,8 @@ private Flux findByIds(Collection> ids, Class targetClass, String se
.map(id -> getKey(id, setName))
.toArray(Key[]::new);
- return reactorClient.get(null, keys)
+ return enrichPolicyWithTransaction(reactorClient, reactorClient.getBatchPolicyDefault())
+ .flatMap(batchPolicy -> reactorClient.get((BatchPolicy) batchPolicy, keys))
.flatMap(kr -> Mono.just(kr.asMap()))
.flatMapMany(keyRecordMap -> {
List entities = keyRecordMap.entrySet().stream()
@@ -856,13 +879,14 @@ public Mono findByIds(GroupedKeys groupedKeys) {
return Mono.just(GroupedEntities.builder().build());
}
- return findGroupedEntitiesByGroupedKeys(groupedKeys);
+ return findGroupedEntitiesByGroupedKeys(reactorClient.getBatchPolicyDefault(), groupedKeys);
}
- private Mono findGroupedEntitiesByGroupedKeys(GroupedKeys groupedKeys) {
+ private Mono findGroupedEntitiesByGroupedKeys(BatchPolicy batchPolicy, GroupedKeys groupedKeys) {
EntitiesKeys entitiesKeys = EntitiesKeys.of(toEntitiesKeyMap(groupedKeys));
- return reactorClient.get(null, entitiesKeys.getKeys())
+ return enrichPolicyWithTransaction(reactorClient, batchPolicy)
+ .flatMap(bPolicy -> reactorClient.get((BatchPolicy) bPolicy, entitiesKeys.getKeys()))
.map(item -> toGroupedEntities(entitiesKeys, item.records))
.onErrorMap(this::translateError);
}
@@ -906,7 +930,8 @@ public Mono> findByIdUsingQuery(Object id, Class entityClass, Class<
Qualifier qualifier = query.getCriteriaObject();
policy.filterExp = reactorQueryEngine.getFilterExpressionsBuilder().build(qualifier);
}
- return reactorClient.get(policy, key, binNames)
+ return enrichPolicyWithTransaction(reactorClient, policy)
+ .flatMap(rPolicy -> reactorClient.get(rPolicy, key, binNames))
.filter(keyRecord -> Objects.nonNull(keyRecord.record))
.map(keyRecord -> mapToEntity(keyRecord.key, target, keyRecord.record))
.onErrorMap(this::translateError);
@@ -930,7 +955,7 @@ public Flux> findByIdsUsingQuery(Collection> ids, Class entityClas
return Flux.empty();
}
- BatchPolicy policy = getBatchPolicyFilterExp(query);
+ BatchPolicy batchPolicy = getBatchPolicyFilterExp(query);
Class> target;
if (targetClass != null && targetClass != entityClass) {
@@ -941,7 +966,7 @@ public Flux> findByIdsUsingQuery(Collection> ids, Class entityClas
Flux> results = Flux.fromIterable(ids)
.map(id -> getKey(id, setName))
- .flatMap(key -> getFromClient(policy, key, targetClass))
+ .flatMap(key -> getFromClient(batchPolicy, key, targetClass))
.filter(keyRecord -> nonNull(keyRecord.record))
.map(keyRecord -> mapToEntity(keyRecord.key, target, keyRecord.record));
@@ -956,11 +981,11 @@ private Flux> findByIdsUsingQueryWithoutMapping(Collection> ids, String setN
return Flux.empty();
}
- BatchPolicy policy = getBatchPolicyFilterExp(query);
+ BatchPolicy batchPolicy = getBatchPolicyFilterExp(query);
return Flux.fromIterable(ids)
.map(id -> getKey(id, setName))
- .flatMap(key -> getFromClient(policy, key, null))
+ .flatMap(key -> getFromClient(batchPolicy, key, null))
.filter(keyRecord -> nonNull(keyRecord.record));
}
@@ -1056,20 +1081,22 @@ public Flux findInRange(long offset, long limit, Sort sort, Class targ
private BatchPolicy getBatchPolicyFilterExp(Query query) {
if (queryCriteriaIsNotNull(query)) {
- BatchPolicy policy = new BatchPolicy(reactorClient.getAerospikeClient().getBatchPolicyDefault());
+ BatchPolicy batchPolicy = new BatchPolicy(reactorClient.getAerospikeClient().getBatchPolicyDefault());
Qualifier qualifier = query.getCriteriaObject();
- policy.filterExp = reactorQueryEngine.getFilterExpressionsBuilder().build(qualifier);
- return policy;
+ batchPolicy.filterExp = reactorQueryEngine.getFilterExpressionsBuilder().build(qualifier);
+ return batchPolicy;
}
return null;
}
- private Mono getFromClient(BatchPolicy finalPolicy, Key key, Class> targetClass) {
+ private Mono getFromClient(BatchPolicy batchPolicy, Key key, Class> targetClass) {
if (targetClass != null) {
String[] binNames = getBinNamesFromTargetClass(targetClass);
- return reactorClient.get(finalPolicy, key, binNames);
+ return enrichPolicyWithTransaction(reactorClient, batchPolicy)
+ .flatMap(rPolicy -> reactorClient.get(rPolicy, key, binNames));
} else {
- return reactorClient.get(finalPolicy, key);
+ return enrichPolicyWithTransaction(reactorClient, batchPolicy)
+ .flatMap(rPolicy -> reactorClient.get(rPolicy, key));
}
}
@@ -1086,7 +1113,8 @@ public Mono exists(Object id, String setName) {
Assert.notNull(setName, "Set name must not be null!");
Key key = getKey(id, setName);
- return reactorClient.exists(key)
+ return enrichPolicyWithTransaction(reactorClient, reactorClient.getAerospikeClient().getReadPolicyDefault())
+ .flatMap(policy -> reactorClient.exists(policy, key))
.map(Objects::nonNull)
.defaultIfEmpty(false)
.onErrorMap(this::translateError);
@@ -1306,31 +1334,35 @@ public long getQueryMaxRecords() {
return reactorQueryEngine.getQueryMaxRecords();
}
- private Mono doPersistAndHandleError(T document, AerospikeWriteData data, WritePolicy policy,
+ private Mono doPersistAndHandleError(T document, AerospikeWriteData data, WritePolicy writePolicy,
Operation[] operations) {
- return reactorClient
- .operate(policy, data.getKey(), operations)
+ return enrichPolicyWithTransaction(reactorClient, writePolicy)
+ .flatMap(writePolicyEnriched ->
+ reactorClient.operate((WritePolicy) writePolicyEnriched, data.getKey(), operations))
.map(docKey -> document)
.onErrorMap(this::translateError);
}
- private Mono doPersistWithVersionAndHandleCasError(T document, AerospikeWriteData data, WritePolicy policy,
- Operation[] operations, OperationType operationType) {
- return putAndGetHeader(data, policy, operations)
+ private Mono doPersistWithVersionAndHandleCasError(T document, AerospikeWriteData data,
+ WritePolicy writePolicy, Operation[] operations,
+ OperationType operationType) {
+ return enrichPolicyWithTransaction(reactorClient, writePolicy)
+ .flatMap(writePolicyEnriched -> putAndGetHeader(data, (WritePolicy) writePolicyEnriched, operations))
.map(newRecord -> updateVersion(document, newRecord))
.onErrorMap(AerospikeException.class, i -> translateCasError(i,
"Failed to " + operationType.toString() + " record due to versions mismatch"));
}
- private Mono doPersistWithVersionAndHandleError(T document, AerospikeWriteData data, WritePolicy policy,
+ private Mono doPersistWithVersionAndHandleError(T document, AerospikeWriteData data, WritePolicy writePolicy,
Operation[] operations) {
- return putAndGetHeader(data, policy, operations)
+ return enrichPolicyWithTransaction(reactorClient, writePolicy)
+ .flatMap(writePolicyEnriched -> putAndGetHeader(data, (WritePolicy) writePolicyEnriched, operations))
.map(newRecord -> updateVersion(document, newRecord))
.onErrorMap(AerospikeException.class, this::translateError);
}
- private Mono putAndGetHeader(AerospikeWriteData data, WritePolicy policy, Operation[] operations) {
- return reactorClient.operate(policy, data.getKey(), operations)
+ private Mono putAndGetHeader(AerospikeWriteData data, WritePolicy writePolicy, Operation[] operations) {
+ return reactorClient.operate(writePolicy, data.getKey(), operations)
.map(keyRecord -> keyRecord.record);
}
@@ -1345,7 +1377,9 @@ private Mono getAndTouch(Key key, int expiration, String[] binNames,
WritePolicy writePolicy = writePolicyBuilder.build();
if (binNames == null || binNames.length == 0) {
- return reactorClient.operate(writePolicy, key, Operation.touch(), Operation.get());
+ return enrichPolicyWithTransaction(reactorClient, writePolicy)
+ .flatMap(writePolicyEnriched ->
+ reactorClient.operate((WritePolicy) writePolicyEnriched, key, Operation.touch(), Operation.get()));
}
Operation[] operations = new Operation[binNames.length + 1];
operations[0] = Operation.touch();
@@ -1353,7 +1387,8 @@ private Mono getAndTouch(Key key, int expiration, String[] binNames,
for (int i = 1; i < operations.length; i++) {
operations[i] = Operation.get(binNames[i - 1]);
}
- return reactorClient.operate(writePolicy, key, operations);
+ return enrichPolicyWithTransaction(reactorClient, writePolicy)
+ .flatMap(writePolicyEnriched -> reactorClient.operate((WritePolicy) writePolicyEnriched, key, operations));
}
private String[] getBinNamesFromTargetClass(Class> targetClass) {
@@ -1419,18 +1454,16 @@ private void verifyUnsortedWithOffset(Sort sort, long offset) {
}
private Flux applyPostProcessingOnResults(Flux results, Query query) {
- if (query != null) {
- if (query.getSort() != null && query.getSort().isSorted()) {
- Comparator comparator = getComparator(query);
- results = results.sort(comparator);
- }
+ if (query.getSort() != null && query.getSort().isSorted()) {
+ Comparator comparator = getComparator(query);
+ results = results.sort(comparator);
+ }
- if (query.hasOffset()) {
- results = results.skip(query.getOffset());
- }
- if (query.hasRows()) {
- results = results.take(query.getRows());
- }
+ if (query.hasOffset()) {
+ results = results.skip(query.getOffset());
+ }
+ if (query.hasRows()) {
+ results = results.take(query.getRows());
}
return results;
}
@@ -1490,11 +1523,11 @@ private Flux findByIdsWithoutMapping(Collection> ids, String se
return Flux.empty();
}
- BatchPolicy policy = getBatchPolicyFilterExp(query);
+ BatchPolicy batchPolicy = getBatchPolicyFilterExp(query);
return Flux.fromIterable(ids)
.map(id -> getKey(id, setName))
- .flatMap(key -> getFromClient(policy, key, targetClass))
+ .flatMap(key -> getFromClient(batchPolicy, key, targetClass))
.filter(keyRecord -> nonNull(keyRecord.record));
}
}
diff --git a/src/main/java/org/springframework/data/aerospike/core/TemplateUtils.java b/src/main/java/org/springframework/data/aerospike/core/TemplateUtils.java
index 9f2d9f0a..da734c69 100644
--- a/src/main/java/org/springframework/data/aerospike/core/TemplateUtils.java
+++ b/src/main/java/org/springframework/data/aerospike/core/TemplateUtils.java
@@ -1,14 +1,25 @@
package org.springframework.data.aerospike.core;
+import com.aerospike.client.IAerospikeClient;
+import com.aerospike.client.policy.Policy;
+import com.aerospike.client.reactor.IAerospikeReactorClient;
import lombok.experimental.UtilityClass;
import org.springframework.data.aerospike.mapping.AerospikePersistentEntity;
import org.springframework.data.aerospike.mapping.AerospikePersistentProperty;
import org.springframework.data.aerospike.mapping.BasicAerospikePersistentEntity;
import org.springframework.data.aerospike.query.FilterOperation;
+import org.springframework.data.aerospike.query.QueryEngine;
import org.springframework.data.aerospike.query.qualifier.Qualifier;
+import org.springframework.data.aerospike.repository.query.Query;
+import org.springframework.data.aerospike.transaction.reactive.AerospikeReactiveTransactionResourceHolder;
+import org.springframework.data.aerospike.transaction.sync.AerospikeTransactionResourceHolder;
import org.springframework.data.mapping.PropertyHandler;
import org.springframework.data.mapping.context.MappingContext;
+import org.springframework.transaction.NoTransactionException;
+import org.springframework.transaction.reactive.TransactionContextManager;
+import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.Assert;
+import reactor.core.publisher.Mono;
import java.util.ArrayList;
import java.util.Arrays;
@@ -17,14 +28,13 @@
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
+import static org.springframework.data.aerospike.query.QualifierUtils.queryCriteriaIsNotNull;
import static org.springframework.data.aerospike.query.qualifier.Qualifier.and;
import static org.springframework.data.aerospike.query.qualifier.Qualifier.or;
@UtilityClass
public class TemplateUtils {
- final String SERVER_VERSION_6 = "6.0.0";
-
public static List