From 59ddb5609a9a31fb4753c9919ef66803d1de816c Mon Sep 17 00:00:00 2001 From: Andrey G Date: Tue, 26 Dec 2023 17:23:03 +0200 Subject: [PATCH] FMWK-285 Compare versions when performing delete operations (#681) * align save, insert, update and delete methods that receive documents to throw CAS exception * add deleteAll(Iterable) * GENERATION_ERROR, KEY_EXISTS_ERROR and KEY_NOT_FOUND_ERROR now cause OptimisticLockingFailureException when a versioned document is written --------- Co-authored-by: yrizhkov --- .../aerospike/core/AerospikeOperations.java | 188 ++++++++++++++---- .../aerospike/core/AerospikeTemplate.java | 127 ++++++++---- .../aerospike/core/BaseAerospikeTemplate.java | 80 ++++++-- .../core/ReactiveAerospikeOperations.java | 178 ++++++++++++++--- .../core/ReactiveAerospikeTemplate.java | 90 +++++++-- .../support/SimpleAerospikeRepository.java | 8 +- .../SimpleReactiveAerospikeRepository.java | 10 +- .../data/aerospike/utility/Utils.java | 2 +- .../core/AerospikeTemplateDeleteTests.java | 154 +++++++++++--- .../core/AerospikeTemplateInsertTests.java | 13 +- .../core/AerospikeTemplateSaveTests.java | 20 +- .../core/AerospikeTemplateUpdateTests.java | 14 +- ...veAerospikeTemplateDeleteRelatedTests.java | 125 +++++++++--- ...tiveAerospikeTemplateSaveRelatedTests.java | 13 +- .../SimpleAerospikeRepositoryTest.java | 17 +- ...SimpleReactiveAerospikeRepositoryTest.java | 13 +- .../data/aerospike/utility/IndexUtils.java | 6 +- 17 files changed, 784 insertions(+), 274 deletions(-) diff --git a/src/main/java/org/springframework/data/aerospike/core/AerospikeOperations.java b/src/main/java/org/springframework/data/aerospike/core/AerospikeOperations.java index 06a45c5ba..39131571d 100644 --- a/src/main/java/org/springframework/data/aerospike/core/AerospikeOperations.java +++ b/src/main/java/org/springframework/data/aerospike/core/AerospikeOperations.java @@ -24,6 +24,8 @@ import com.aerospike.client.query.IndexCollectionType; import com.aerospike.client.query.IndexType; import com.aerospike.client.query.ResultSet; +import org.springframework.dao.DataAccessException; +import org.springframework.dao.OptimisticLockingFailureException; import org.springframework.data.aerospike.config.AerospikeDataSettings; import org.springframework.data.aerospike.convert.MappingAerospikeConverter; import org.springframework.data.aerospike.core.model.GroupedEntities; @@ -102,6 +104,10 @@ public interface AerospikeOperations { * does not exist it will be created, otherwise updated (an "upsert"). * * @param document The document to be saved. Must not be {@literal null}. + * @throws OptimisticLockingFailureException if the document has a version attribute with a different value from + * that found on server. + * @throws DataAccessException if operation failed (see {@link DefaultAerospikeExceptionTranslator} + * for details). */ void save(T document); @@ -123,6 +129,10 @@ public interface AerospikeOperations { * * @param document The document to be saved. Must not be {@literal null}. * @param setName Set name to override the set associated with the document. + * @throws OptimisticLockingFailureException if the document has a version attribute with a different value from + * that found on server. + * @throws DataAccessException if operation failed (see {@link DefaultAerospikeExceptionTranslator} + * for details). */ void save(T document, String setName); @@ -134,10 +144,11 @@ public interface AerospikeOperations { * This operation requires Server version 6.0+. * * @param documents The documents to be saved. 
Must not be {@literal null}. - * @throws AerospikeException.BatchRecordArray if batch save succeeds, but results contain errors or null - * records - * @throws org.springframework.dao.DataAccessException if batch operation failed (see - * {@link DefaultAerospikeExceptionTranslator} for details) + * @throws AerospikeException.BatchRecordArray if batch save succeeds, but results contain errors or null records. + * @throws OptimisticLockingFailureException if at least one document has a version attribute with a different + * value from that found on server. + * @throws DataAccessException if batch operation failed (see + * {@link DefaultAerospikeExceptionTranslator} for details). */ void saveAll(Iterable documents); @@ -151,10 +162,11 @@ public interface AerospikeOperations { * * @param documents The documents to be saved. Must not be {@literal null}. * @param setName Set name to override the default set associated with the documents. - * @throws AerospikeException.BatchRecordArray if batch save succeeds, but results contain errors or null - * records - * @throws org.springframework.dao.DataAccessException if batch operation failed (see - * {@link DefaultAerospikeExceptionTranslator} for details) + * @throws AerospikeException.BatchRecordArray if batch save succeeds, but results contain errors or null records. + * @throws OptimisticLockingFailureException if at least one document has a version attribute with a different + * value from that found on server. + * @throws DataAccessException if batch operation failed (see + * {@link DefaultAerospikeExceptionTranslator} for details). */ void saveAll(Iterable documents, String setName); @@ -164,6 +176,10 @@ public interface AerospikeOperations { * If the document has version property it will be updated with the server's version after successful operation. * * @param document The document to be inserted. Must not be {@literal null}. + * @throws OptimisticLockingFailureException if the document has a version attribute with a different value from + * that found on server. + * @throws DataAccessException if batch operation failed (see + * {@link DefaultAerospikeExceptionTranslator} for details). */ void insert(T document); @@ -175,6 +191,10 @@ public interface AerospikeOperations { * * @param document The document to be inserted. Must not be {@literal null}. * @param setName Set name to override the set associated with the document. + * @throws OptimisticLockingFailureException if the document has a version attribute with a different value from + * that found on server. + * @throws DataAccessException if batch operation failed (see + * {@link DefaultAerospikeExceptionTranslator} for details). */ void insert(T document, String setName); @@ -186,10 +206,12 @@ public interface AerospikeOperations { * This operation requires Server version 6.0+. * * @param documents Documents to be inserted. Must not be {@literal null}. - * @throws AerospikeException.BatchRecordArray if batch insert succeeds, but results contain errors or null - * records - * @throws org.springframework.dao.DataAccessException if batch operation failed (see - * {@link DefaultAerospikeExceptionTranslator} for details) + * @throws AerospikeException.BatchRecordArray if batch insert succeeds, but results contain errors or null + * records. + * @throws OptimisticLockingFailureException if at least one document has a version attribute with a different + * value from that found on server. 
+ * @throws DataAccessException if batch operation failed (see + * {@link DefaultAerospikeExceptionTranslator} for details). */ void insertAll(Iterable documents); @@ -203,10 +225,12 @@ public interface AerospikeOperations { * * @param documents Documents to be inserted. Must not be {@literal null}. * @param setName Set name to override the set associated with the document. - * @throws AerospikeException.BatchRecordArray if batch insert succeeds, but results contain errors or null - * records - * @throws org.springframework.dao.DataAccessException if batch operation failed (see - * {@link DefaultAerospikeExceptionTranslator} for details) + * @throws AerospikeException.BatchRecordArray if batch insert succeeds, but results contain errors or null + * records. + * @throws OptimisticLockingFailureException if at least one document has a version attribute with a different + * value from that found on server. + * @throws DataAccessException if batch operation failed (see + * {@link DefaultAerospikeExceptionTranslator} for details). */ void insertAll(Iterable documents, String setName); @@ -216,6 +240,7 @@ public interface AerospikeOperations { * @param document The document to be persisted. Must not be {@literal null}. * @param writePolicy The Aerospike write policy for the inner Aerospike put operation. Must not be * {@literal null}. + * @throws DataAccessException if operation failed (see {@link DefaultAerospikeExceptionTranslator} for details). */ void persist(T document, WritePolicy writePolicy); @@ -227,6 +252,7 @@ public interface AerospikeOperations { * @param writePolicy The Aerospike write policy for the inner Aerospike put operation. Must not be * {@literal null}. * @param setName Set name to override the set associated with the document. + * @throws DataAccessException if operation failed (see {@link DefaultAerospikeExceptionTranslator} for details). */ void persist(T document, WritePolicy writePolicy, String setName); @@ -238,6 +264,10 @@ public interface AerospikeOperations { * If document has version property it will be updated with the server's version after successful operation. * * @param document The document that identifies the record to be updated. Must not be {@literal null}. + * @throws OptimisticLockingFailureException if the document has a version attribute with a different value from + * that found on server. + * @throws DataAccessException if operation failed (see {@link DefaultAerospikeExceptionTranslator} + * for details). */ void update(T document); @@ -251,6 +281,10 @@ public interface AerospikeOperations { * * @param document The document that identifies the record to be updated. Must not be {@literal null}. * @param setName Set name to override the set associated with the document. + * @throws OptimisticLockingFailureException if the document has a version attribute with a different value from + * that found on server. + * @throws DataAccessException if operation failed (see {@link DefaultAerospikeExceptionTranslator} + * for details). */ void update(T document, String setName); @@ -264,6 +298,10 @@ public interface AerospikeOperations { * * @param document The document that identifies the record to be updated. Must not be {@literal null}. * @param fields Specific fields to update. + * @throws OptimisticLockingFailureException if the document has a version attribute with a different value from + * that found on server. + * @throws DataAccessException if operation failed (see {@link DefaultAerospikeExceptionTranslator} + * for details). 
*/ void update(T document, Collection fields); @@ -278,6 +316,10 @@ public interface AerospikeOperations { * @param document The document that identifies the record to be updated. Must not be {@literal null}. * @param setName Set name to override the set associated with the document. * @param fields Specific fields to update. + * @throws OptimisticLockingFailureException if the document has a version attribute with a different value from + * that found on server. + * @throws DataAccessException if operation failed (see {@link DefaultAerospikeExceptionTranslator} + * for details). */ void update(T document, String setName, Collection fields); @@ -289,10 +331,12 @@ public interface AerospikeOperations { * This operation requires Server version 6.0+. * * @param documents The documents that identify the records to be updated. Must not be {@literal null}. - * @throws AerospikeException.BatchRecordArray if batch update succeeds, but results contain errors or null - * records - * @throws org.springframework.dao.DataAccessException if batch operation failed (see - * {@link DefaultAerospikeExceptionTranslator} for details) + * @throws AerospikeException.BatchRecordArray if batch update succeeds, but results contain errors or null + * records. + * @throws OptimisticLockingFailureException if at least one document has a version attribute with a different + * value from that found on server. + * @throws DataAccessException if batch operation failed (see + * {@link DefaultAerospikeExceptionTranslator} for details). */ void updateAll(Iterable documents); @@ -306,10 +350,12 @@ public interface AerospikeOperations { * * @param documents The documents that identify the records to be updated. Must not be {@literal null}. * @param setName Set name to override the set associated with the document. - * @throws AerospikeException.BatchRecordArray if batch update succeeds, but results contain errors or null - * records - * @throws org.springframework.dao.DataAccessException if batch operation failed (see - * {@link DefaultAerospikeExceptionTranslator} for details) + * @throws AerospikeException.BatchRecordArray if batch update succeeds, but results contain errors or null + * records. + * @throws OptimisticLockingFailureException if at least one document has a version attribute with a different + * value from that found on server. + * @throws DataAccessException if batch operation failed (see + * {@link DefaultAerospikeExceptionTranslator} for details). */ void updateAll(Iterable documents, String setName); @@ -326,83 +372,139 @@ public interface AerospikeOperations { * * @param id The id of the record to delete. Must not be {@literal null}. * @param entityClass The class to extract set name from. Must not be {@literal null}. - * @return whether the document existed on server before deletion. + * @return whether the record existed on server before deletion. * @deprecated since 4.6.0, use {@link AerospikeOperations#deleteById(Object, Class)} instead. */ boolean delete(Object id, Class entityClass); /** * Delete a record using the document's id. + *
+ * If the document has version property it will be compared with the corresponding record's version on server. * * @param document The document to get set name and id from. Must not be {@literal null}. - * @return Whether the document existed on server before deletion. + * @return Whether the record existed on server before deletion. + * @throws OptimisticLockingFailureException if the document has a version attribute with a different value from + * that found on server. + * @throws DataAccessException if operation failed (see {@link DefaultAerospikeExceptionTranslator} + * for details). */ boolean delete(T document); /** * Delete a record within the given set using the document's id. + *
+ * If the document has version property it will be compared with the corresponding record's version on server. * * @param document The document to get id from. Must not be {@literal null}. * @param setName Set name to use. - * @return Whether the document existed on server before deletion. + * @return Whether the record existed on server before deletion. + * @throws OptimisticLockingFailureException if the document has a version attribute with a different value from + * that found on server. + * @throws DataAccessException if operation failed (see {@link DefaultAerospikeExceptionTranslator} + * for details). */ boolean delete(T document, String setName); + + /** + * Delete multiple records in one batch request. The policies are analogous to {@link #delete(Object)}. + *
+ * The execution order is NOT preserved. + *
+ * This operation requires Server version 6.0+. + * + * @param documents The documents to be deleted. Must not be {@literal null}. + * @throws AerospikeException.BatchRecordArray if batch delete succeeds, but results contain errors or null records. + * @throws OptimisticLockingFailureException if at least one document has a version attribute with a different + * value from that found on server. + * @throws DataAccessException if batch operation failed (see + * {@link DefaultAerospikeExceptionTranslator} for details). + */ + void deleteAll(Iterable documents); + + /** + * Delete multiple records within the given set (overrides the default set associated with the documents) in one + * batch request. The policies are analogous to {@link #delete(Object)}. + *
+ * The execution order is NOT preserved. + *
+ * This operation requires Server version 6.0+. + * + * @param documents The documents to be deleted. Must not be {@literal null}. + * @param setName Set name to override the default set associated with the documents. + * @throws AerospikeException.BatchRecordArray if batch delete results contain errors. + * @throws OptimisticLockingFailureException if at least one document has a version attribute with a different + * value from that found on server. + * @throws DataAccessException if batch operation failed (see + * {@link DefaultAerospikeExceptionTranslator} for details). + */ + void deleteAll(Iterable documents, String setName); + /** * Delete a record by id, set name will be determined by the given entityClass. + *
+ * If the document has version property it is not compared with the corresponding record's version on server. * * @param id The id of the record to be deleted. Must not be {@literal null}. * @param entityClass The class to extract set name from. Must not be {@literal null}. - * @return Whether the document existed on server before deletion. + * @return Whether the record existed on server before deletion. + * @throws DataAccessException if operation failed (see {@link DefaultAerospikeExceptionTranslator} for details). */ boolean deleteById(Object id, Class entityClass); /** * Delete a record by id within the given set. + *
+ * If the document has version property it is not compared with the corresponding record's version on server. * * @param id The id of the record to be deleted. Must not be {@literal null}. * @param setName Set name to use. - * @return Whether the document existed on server before deletion. + * @return Whether the record existed on server before deletion. + * @throws DataAccessException if operation failed (see {@link DefaultAerospikeExceptionTranslator} for details). */ boolean deleteById(Object id, String setName); /** * Delete records by ids using a single batch delete operation, set name will be determined by the given - * entityClass. + * entityClass. The policies are analogous to {@link #deleteById(Object, Class)}. *
* This operation requires Server version 6.0+. * * @param ids The ids of the records to be deleted. Must not be {@literal null}. * @param entityClass The class to extract set name from. Must not be {@literal null}. - * @throws AerospikeException.BatchRecordArray if batch delete results contain errors - * @throws org.springframework.dao.DataAccessException if batch operation failed (see - * {@link DefaultAerospikeExceptionTranslator} for details) + * @throws AerospikeException.BatchRecordArray if batch delete results contain errors. + * @throws DataAccessException if batch operation failed (see + * {@link DefaultAerospikeExceptionTranslator} for details). */ void deleteByIds(Iterable ids, Class entityClass); /** - * Delete records by ids within the given set using a single batch delete operation. + * Delete records by ids within the given set using a single batch delete operation. The policies are analogous to + * {@link #deleteById(Object, String)}. *
* This operation requires Server version 6.0+. * * @param ids The ids of the records to be deleted. Must not be {@literal null}. * @param setName Set name to use. - * @throws AerospikeException.BatchRecordArray if batch delete results contain errors - * @throws org.springframework.dao.DataAccessException if batch operation failed (see - * {@link DefaultAerospikeExceptionTranslator} for details) + * @throws AerospikeException.BatchRecordArray if batch delete results contain errors. + * @throws DataAccessException if batch operation failed (see + * {@link DefaultAerospikeExceptionTranslator} for details). */ void deleteByIds(Iterable ids, String setName); /** - * Perform a single batch delete for records from different sets. + * Perform a single batch delete operation for records from different sets. *
- * This operation requires Server version 6.0+. + * Records' versions on server are not checked. + *
+ * This operation requires Server 6.0+. * * @param groupedKeys Keys grouped by document type. Must not be {@literal null}, groupedKeys.getEntitiesKeys() must * not be {@literal null}. - * @throws AerospikeException.BatchRecordArray if batch delete results contain errors - * @throws org.springframework.dao.DataAccessException if batch operation failed (see - * {@link DefaultAerospikeExceptionTranslator} for details) + * @throws AerospikeException.BatchRecordArray if batch delete results contain errors. + * @throws DataAccessException if batch operation failed (see + * {@link DefaultAerospikeExceptionTranslator} for details). */ void deleteByIds(GroupedKeys groupedKeys); @@ -410,6 +512,7 @@ public interface AerospikeOperations { * Truncate/Delete all records in the set determined by the given entity class. * * @param entityClass The class to extract set name from. Must not be {@literal null}. + * @throws DataAccessException if operation failed (see {@link DefaultAerospikeExceptionTranslator} for details). */ void deleteAll(Class entityClass); @@ -417,6 +520,7 @@ public interface AerospikeOperations { * Truncate/Delete all documents in the given set. * * @param setName Set name to truncate/delete all records in. + * @throws DataAccessException if operation failed (see {@link DefaultAerospikeExceptionTranslator} for details). */ void deleteAll(String setName); diff --git a/src/main/java/org/springframework/data/aerospike/core/AerospikeTemplate.java b/src/main/java/org/springframework/data/aerospike/core/AerospikeTemplate.java index ce5160480..638a9364b 100644 --- a/src/main/java/org/springframework/data/aerospike/core/AerospikeTemplate.java +++ b/src/main/java/org/springframework/data/aerospike/core/AerospikeTemplate.java @@ -66,6 +66,7 @@ import java.util.stream.IntStream; import java.util.stream.Stream; +import static org.springframework.data.aerospike.core.BaseAerospikeTemplate.OperationType.DELETE_OPERATION; import static org.springframework.data.aerospike.core.BaseAerospikeTemplate.OperationType.INSERT_OPERATION; import static org.springframework.data.aerospike.core.BaseAerospikeTemplate.OperationType.SAVE_OPERATION; import static org.springframework.data.aerospike.core.BaseAerospikeTemplate.OperationType.UPDATE_OPERATION; @@ -140,12 +141,12 @@ public void save(T document, String setName) { AerospikeWriteData data = writeData(document, setName); AerospikePersistentEntity entity = mappingContext.getRequiredPersistentEntity(document.getClass()); if (entity.hasVersionProperty()) { - WritePolicy policy = expectGenerationCasAwareSavePolicy(data); + WritePolicy policy = expectGenerationCasAwarePolicy(data); // mimicking REPLACE behavior by firstly deleting bins due to bin convergence feature restrictions - doPersistWithVersionAndHandleCasError(document, data, policy, true); + doPersistWithVersionAndHandleCasError(document, data, policy, true, SAVE_OPERATION); } else { - WritePolicy policy = ignoreGenerationSavePolicy(data, RecordExistsAction.UPDATE); + WritePolicy policy = ignoreGenerationPolicy(data, RecordExistsAction.UPDATE); // mimicking REPLACE behavior by firstly deleting bins due to bin convergence feature restrictions Operation[] operations = operations(data.getBinsAsArray(), Operation::put, @@ -194,6 +195,8 @@ private void batchWriteAllDocuments(List documents, String setName, Opera documents.forEach(document -> batchWriteDataList.add(getBatchWriteForInsert(document, setName))); case UPDATE_OPERATION -> documents.forEach(document -> 
batchWriteDataList.add(getBatchWriteForUpdate(document, setName))); + case DELETE_OPERATION -> + documents.forEach(document -> batchWriteDataList.add(getBatchWriteForDelete(document, setName))); default -> throw new IllegalArgumentException("Unexpected operation name: " + operationType); } @@ -202,25 +205,39 @@ private void batchWriteAllDocuments(List documents, String setName, Opera // requires server ver. >= 6.0.0 client.operate(null, batchWriteRecords); } catch (AerospikeException e) { - throw translateError(e); + throw translateError(e); // no exception is thrown for versions mismatch, only record's result code shows it } checkForErrorsAndUpdateVersion(batchWriteDataList, batchWriteRecords, operationType); } - private void checkForErrorsAndUpdateVersion(List> batchWriteDataList, - List batchWriteRecords, OperationType operationType) { + protected void checkForErrorsAndUpdateVersion(List> batchWriteDataList, + List batchWriteRecords, + OperationType operationType) { boolean errorsFound = false; + String casErrorDocumentId = null; for (BaseAerospikeTemplate.BatchWriteData data : batchWriteDataList) { if (!errorsFound && batchRecordFailed(data.batchRecord())) { errorsFound = true; } - if (data.hasVersionProperty() && !batchRecordFailed(data.batchRecord())) { - updateVersion(data.document(), data.batchRecord().record); + if (data.hasVersionProperty()) { + if (!batchRecordFailed(data.batchRecord())) { + if (operationType != DELETE_OPERATION) updateVersion(data.document(), data.batchRecord().record); + } else { + if (hasOptimisticLockingError(data.batchRecord().resultCode)) { + // ID can be a String or a primitive + casErrorDocumentId = data.batchRecord().key.userKey.toString(); + } + } } } if (errorsFound) { + if (casErrorDocumentId != null) { + throw getOptimisticLockingFailureException( + "Failed to %s the record with ID '%s' due to versions mismatch" + .formatted(operationType, casErrorDocumentId), null); + } AerospikeException e = new AerospikeException("Errors during batch " + operationType); throw new AerospikeException.BatchRecordArray(batchWriteRecords.toArray(BatchRecord[]::new), e); } @@ -238,7 +255,7 @@ public void insert(T document, String setName) { Assert.notNull(setName, "Set name must not be null!"); AerospikeWriteData data = writeData(document, setName); - WritePolicy policy = ignoreGenerationSavePolicy(data, RecordExistsAction.CREATE_ONLY); + WritePolicy policy = ignoreGenerationPolicy(data, RecordExistsAction.CREATE_ONLY); AerospikePersistentEntity entity = mappingContext.getRequiredPersistentEntity(document.getClass()); if (entity.hasVersionProperty()) { // we are ignoring generation here as insert operation should fail with DuplicateKeyException if key @@ -302,12 +319,12 @@ public void update(T document, String setName) { AerospikeWriteData data = writeData(document, setName); AerospikePersistentEntity entity = mappingContext.getRequiredPersistentEntity(document.getClass()); if (entity.hasVersionProperty()) { - WritePolicy policy = expectGenerationSavePolicy(data, RecordExistsAction.UPDATE_ONLY); + WritePolicy policy = expectGenerationPolicy(data, RecordExistsAction.UPDATE_ONLY); // mimicking REPLACE_ONLY behavior by firstly deleting bins due to bin convergence feature restrictions - doPersistWithVersionAndHandleCasError(document, data, policy, true); + doPersistWithVersionAndHandleCasError(document, data, policy, true, UPDATE_OPERATION); } else { - WritePolicy policy = ignoreGenerationSavePolicy(data, RecordExistsAction.UPDATE_ONLY); + WritePolicy policy = 
ignoreGenerationPolicy(data, RecordExistsAction.UPDATE_ONLY); // mimicking REPLACE_ONLY behavior by firstly deleting bins due to bin convergence feature restrictions Operation[] operations = Stream.concat(Stream.of(Operation.delete()), data.getBins().stream() @@ -330,11 +347,11 @@ public void update(T document, String setName, Collection fields) { AerospikeWriteData data = writeDataWithSpecificFields(document, setName, fields); AerospikePersistentEntity entity = mappingContext.getRequiredPersistentEntity(document.getClass()); if (entity.hasVersionProperty()) { - WritePolicy policy = expectGenerationSavePolicy(data, RecordExistsAction.UPDATE_ONLY); + WritePolicy policy = expectGenerationPolicy(data, RecordExistsAction.UPDATE_ONLY); - doPersistWithVersionAndHandleCasError(document, data, policy, false); + doPersistWithVersionAndHandleCasError(document, data, policy, false, UPDATE_OPERATION); } else { - WritePolicy policy = ignoreGenerationSavePolicy(data, RecordExistsAction.UPDATE_ONLY); + WritePolicy policy = ignoreGenerationPolicy(data, RecordExistsAction.UPDATE_ONLY); Operation[] operations = operations(data.getBinsAsArray(), Operation::put); doPersistAndHandleError(data, policy, operations); @@ -372,7 +389,7 @@ public boolean delete(Object id, Class entityClass) { try { Key key = getKey(id, getSetName(entityClass)); - return this.client.delete(ignoreGenerationDeletePolicy(), key); + return client.delete(ignoreGenerationPolicy(), key); } catch (AerospikeException e) { throw translateError(e); } @@ -389,10 +406,25 @@ public boolean delete(T document, String setName) { Assert.notNull(document, "Document must not be null!"); Assert.notNull(setName, "Set name must not be null!"); + AerospikeWriteData data = writeData(document, setName); + AerospikePersistentEntity entity = mappingContext.getRequiredPersistentEntity(document.getClass()); + if (entity.hasVersionProperty()) { + return doDeleteWithVersionAndHandleCasError(data); + } + return doDeleteIgnoreVersionAndTranslateError(data); + } + + private boolean doDeleteWithVersionAndHandleCasError(AerospikeWriteData data) { try { - AerospikeWriteData data = writeData(document, setName); + return client.delete(expectGenerationPolicy(data, RecordExistsAction.UPDATE_ONLY), data.getKey()); + } catch (AerospikeException e) { + throw translateCasError(e, "Failed to delete record due to versions mismatch"); + } + } - return this.client.delete(ignoreGenerationDeletePolicy(), data.getKey()); + private boolean doDeleteIgnoreVersionAndTranslateError(AerospikeWriteData data) { + try { + return client.delete(ignoreGenerationPolicy(data, RecordExistsAction.UPDATE_ONLY), data.getKey()); } catch (AerospikeException e) { throw translateError(e); } @@ -412,16 +444,41 @@ public boolean deleteById(Object id, String setName) { try { Key key = getKey(id, setName); - return this.client.delete(ignoreGenerationDeletePolicy(), key); + return client.delete(ignoreGenerationPolicy(), key); } catch (AerospikeException e) { throw translateError(e); } } + @Override + public void deleteAll(Iterable documents) { + String setName = getSetName(documents.iterator().next()); + + int batchSize = converter.getAerospikeDataSettings().getBatchWriteSize(); + List documentsList = new ArrayList<>(); + for (Object document : documents) { + if (batchWriteSizeMatch(batchSize, documentsList.size())) { + deleteAll(documentsList, setName); + documentsList.clear(); + } + documentsList.add(document); + } + if (!documentsList.isEmpty()) { + deleteAll(documentsList, setName); + } + } + + @Override + 
public void deleteAll(Iterable documents, String setName) { + Assert.notNull(setName, "Set name must not be null!"); + validateForBatchWrite(documents, "Documents for deleting"); + + applyBufferedBatchWrite(documents, setName, DELETE_OPERATION); + } + @Override public void deleteByIds(Iterable ids, Class entityClass) { Assert.notNull(entityClass, "Class must not be null!"); - validateForBatchWrite(ids, "IDs"); deleteByIds(ids, getSetName(entityClass)); } @@ -536,7 +593,7 @@ public T add(T document, String setName, Map values) { .expiration(data.getExpiration()) .build(); - Record aeroRecord = this.client.operate(writePolicy, data.getKey(), ops); + Record aeroRecord = client.operate(writePolicy, data.getKey(), ops); return mapToEntity(data.getKey(), getEntityClass(document), aeroRecord); } catch (AerospikeException e) { @@ -562,7 +619,7 @@ public T add(T document, String setName, String binName, long value) { .expiration(data.getExpiration()) .build(); - Record aeroRecord = this.client.operate(writePolicy, data.getKey(), + Record aeroRecord = client.operate(writePolicy, data.getKey(), Operation.add(new Bin(binName, value)), Operation.get()); return mapToEntity(data.getKey(), getEntityClass(document), aeroRecord); @@ -585,7 +642,7 @@ public T append(T document, String setName, Map values) { try { AerospikeWriteData data = writeData(document, setName); Operation[] ops = operations(values, Operation.Type.APPEND, Operation.get()); - Record aeroRecord = this.client.operate(null, data.getKey(), ops); + Record aeroRecord = client.operate(null, data.getKey(), ops); return mapToEntity(data.getKey(), getEntityClass(document), aeroRecord); } catch (AerospikeException e) { @@ -606,7 +663,7 @@ public T append(T document, String setName, String binName, String value) { try { AerospikeWriteData data = writeData(document, setName); - Record aeroRecord = this.client.operate(null, data.getKey(), + Record aeroRecord = client.operate(null, data.getKey(), Operation.append(new Bin(binName, value)), Operation.get(binName)); @@ -629,7 +686,7 @@ public T prepend(T document, String setName, String fieldName, String value) try { AerospikeWriteData data = writeData(document, setName); - Record aeroRecord = this.client.operate(null, data.getKey(), + Record aeroRecord = client.operate(null, data.getKey(), Operation.prepend(new Bin(fieldName, value)), Operation.get(fieldName)); @@ -653,7 +710,7 @@ public T prepend(T document, String setName, Map values) { try { AerospikeWriteData data = writeData(document, setName); Operation[] ops = operations(values, Operation.Type.PREPEND, Operation.get()); - Record aeroRecord = this.client.operate(null, data.getKey(), ops); + Record aeroRecord = client.operate(null, data.getKey(), ops); return mapToEntity(data.getKey(), getEntityClass(document), aeroRecord); } catch (AerospikeException e) { @@ -773,7 +830,7 @@ private Record getAndTouch(Key key, int expiration, String[] binNames, Query que try { if (binNames == null || binNames.length == 0) { - return this.client.operate(writePolicy, key, Operation.touch(), Operation.get()); + return client.operate(writePolicy, key, Operation.touch(), Operation.get()); } else { Operation[] operations = new Operation[binNames.length + 1]; operations[0] = Operation.touch(); @@ -781,7 +838,7 @@ private Record getAndTouch(Key key, int expiration, String[] binNames, Query que for (int i = 1; i < operations.length; i++) { operations[i] = Operation.get(binNames[i - 1]); } - return this.client.operate(writePolicy, key, operations); + return 
client.operate(writePolicy, key, operations); } } catch (AerospikeException aerospikeException) { if (aerospikeException.getResultCode() == ResultCode.KEY_NOT_FOUND_ERROR) { @@ -1051,7 +1108,7 @@ public boolean exists(Object id, String setName) { try { Key key = getKey(id, setName); - Record aeroRecord = this.client.operate(null, key, Operation.getHeader()); + Record aeroRecord = client.operate(null, key, Operation.getHeader()); return aeroRecord != null; } catch (AerospikeException e) { throw translateError(e); @@ -1154,10 +1211,10 @@ public ResultSet aggregate(Filter filter, String setName, statement.setNamespace(this.namespace); ResultSet resultSet; if (arguments != null && !arguments.isEmpty()) - resultSet = this.client.queryAggregate(null, statement, module, + resultSet = client.queryAggregate(null, statement, module, function, arguments.toArray(new Value[0])); else - resultSet = this.client.queryAggregate(null, statement); + resultSet = client.queryAggregate(null, statement); return resultSet; } @@ -1280,21 +1337,21 @@ public boolean indexExists(String indexName) { return false; } - private void doPersistAndHandleError(AerospikeWriteData data, WritePolicy policy, Operation[] operations) { + private Record doPersistAndHandleError(AerospikeWriteData data, WritePolicy policy, Operation[] operations) { try { - client.operate(policy, data.getKey(), operations); + return client.operate(policy, data.getKey(), operations); } catch (AerospikeException e) { throw translateError(e); } } private void doPersistWithVersionAndHandleCasError(T document, AerospikeWriteData data, WritePolicy policy, - boolean firstlyDeleteBins) { + boolean firstlyDeleteBins, OperationType operationType) { try { Record newAeroRecord = putAndGetHeader(data, policy, firstlyDeleteBins); updateVersion(document, newAeroRecord); } catch (AerospikeException e) { - throw translateCasError(e); + throw translateCasError(e, "Failed to " + operationType.toString() + " record due to versions mismatch"); } } diff --git a/src/main/java/org/springframework/data/aerospike/core/BaseAerospikeTemplate.java b/src/main/java/org/springframework/data/aerospike/core/BaseAerospikeTemplate.java index 50b2a1610..3ad906d1b 100644 --- a/src/main/java/org/springframework/data/aerospike/core/BaseAerospikeTemplate.java +++ b/src/main/java/org/springframework/data/aerospike/core/BaseAerospikeTemplate.java @@ -197,14 +197,23 @@ T updateVersion(T document, Record newAeroRecord) { return document; } - RuntimeException translateCasError(AerospikeException e) { - int code = e.getResultCode(); - if (code == ResultCode.KEY_EXISTS_ERROR || code == ResultCode.GENERATION_ERROR) { - return new OptimisticLockingFailureException("Save document with version value failed", e); + RuntimeException translateCasError(AerospikeException e, String errMsg) { + if (hasOptimisticLockingError(e.getResultCode())) { + return getOptimisticLockingFailureException(errMsg, e); } return translateError(e); } + protected boolean hasOptimisticLockingError(int resultCode) { + return List.of(ResultCode.GENERATION_ERROR, ResultCode.KEY_EXISTS_ERROR, ResultCode.KEY_NOT_FOUND_ERROR) + .contains(resultCode); + } + + protected OptimisticLockingFailureException getOptimisticLockingFailureException(String errMsg, + AerospikeException e) { + return new OptimisticLockingFailureException(errMsg, e); + } + RuntimeException translateError(AerospikeException e) { DataAccessException translated = exceptionTranslator.translateExceptionIfPossible(e); return translated == null ? 
e : translated; @@ -225,27 +234,27 @@ AerospikeWriteData writeDataWithSpecificFields(T document, String setName, C return data; } - WritePolicy expectGenerationCasAwareSavePolicy(AerospikeWriteData data) { + WritePolicy expectGenerationCasAwarePolicy(AerospikeWriteData data) { RecordExistsAction recordExistsAction = data.getVersion() .filter(v -> v > 0L) .map(v -> RecordExistsAction.UPDATE_ONLY) // updating existing document with generation, // cannot use REPLACE_ONLY due to bin convergence feature restrictions .orElse(RecordExistsAction.CREATE_ONLY); // create new document, // if exists we should fail with optimistic locking - return expectGenerationSavePolicy(data, recordExistsAction); + return expectGenerationPolicy(data, recordExistsAction); } - BatchWritePolicy expectGenerationCasAwareSaveBatchPolicy(AerospikeWriteData data) { + BatchWritePolicy expectGenerationCasAwareBatchPolicy(AerospikeWriteData data) { RecordExistsAction recordExistsAction = data.getVersion() .filter(v -> v > 0L) .map(v -> RecordExistsAction.UPDATE_ONLY) // updating existing document with generation, // cannot use REPLACE_ONLY due to bin convergence feature restrictions .orElse(RecordExistsAction.CREATE_ONLY); // create new document, // if exists we should fail with optimistic locking - return expectGenerationSaveBatchPolicy(data, recordExistsAction); + return expectGenerationBatchPolicy(data, recordExistsAction); } - WritePolicy expectGenerationSavePolicy(AerospikeWriteData data, RecordExistsAction recordExistsAction) { + WritePolicy expectGenerationPolicy(AerospikeWriteData data, RecordExistsAction recordExistsAction) { return WritePolicyBuilder.builder(this.writePolicyDefault) .generationPolicy(GenerationPolicy.EXPECT_GEN_EQUAL) .generation(data.getVersion().orElse(0)) @@ -254,7 +263,7 @@ WritePolicy expectGenerationSavePolicy(AerospikeWriteData data, RecordExistsActi .build(); } - BatchWritePolicy expectGenerationSaveBatchPolicy(AerospikeWriteData data, RecordExistsAction recordExistsAction) { + BatchWritePolicy expectGenerationBatchPolicy(AerospikeWriteData data, RecordExistsAction recordExistsAction) { BatchWritePolicy batchWritePolicy = new BatchWritePolicy(this.batchWritePolicyDefault); batchWritePolicy.generationPolicy = GenerationPolicy.EXPECT_GEN_EQUAL; batchWritePolicy.generation = data.getVersion().orElse(0); @@ -263,7 +272,7 @@ BatchWritePolicy expectGenerationSaveBatchPolicy(AerospikeWriteData data, Record return batchWritePolicy; } - WritePolicy ignoreGenerationSavePolicy(AerospikeWriteData data, RecordExistsAction recordExistsAction) { + WritePolicy ignoreGenerationPolicy(AerospikeWriteData data, RecordExistsAction recordExistsAction) { return WritePolicyBuilder.builder(this.writePolicyDefault) .generationPolicy(GenerationPolicy.NONE) .expiration(data.getExpiration()) @@ -271,7 +280,7 @@ WritePolicy ignoreGenerationSavePolicy(AerospikeWriteData data, RecordExistsActi .build(); } - BatchWritePolicy ignoreGenerationSaveBatchPolicy(AerospikeWriteData data, RecordExistsAction recordExistsAction) { + BatchWritePolicy ignoreGenerationBatchPolicy(AerospikeWriteData data, RecordExistsAction recordExistsAction) { BatchWritePolicy batchWritePolicy = new BatchWritePolicy(this.batchWritePolicyDefault); batchWritePolicy.generationPolicy = GenerationPolicy.NONE; batchWritePolicy.expiration = data.getExpiration(); @@ -279,12 +288,20 @@ BatchWritePolicy ignoreGenerationSaveBatchPolicy(AerospikeWriteData data, Record return batchWritePolicy; } - WritePolicy ignoreGenerationDeletePolicy() { + WritePolicy 
ignoreGenerationPolicy() { return WritePolicyBuilder.builder(this.writePolicyDefault) .generationPolicy(GenerationPolicy.NONE) .build(); } + WritePolicy expectGenerationPolicy(AerospikeWriteData data) { + return WritePolicyBuilder.builder(this.writePolicyDefault) + .generationPolicy(GenerationPolicy.EXPECT_GEN_EQUAL) + .generation(data.getVersion().orElse(0)) + .expiration(data.getExpiration()) + .build(); + } + Key getKey(Object id, AerospikePersistentEntity entity) { return getKey(id, entity.getSetName()); } @@ -382,9 +399,8 @@ protected Operation[] getPutAndGetHeaderOperations(AerospikeWriteData data, bool "Cannot put and get header on a document with no bins and \"@_class\" bin disabled."); } - return firstlyDeleteBins ? operations(bins, Operation::put, - Operation.array(Operation.delete()), Operation.array(Operation.getHeader())) - : operations(bins, Operation::put, null, Operation.array(Operation.getHeader())); + return operations(bins, Operation::put, firstlyDeleteBins ? Operation.array(Operation.delete()) : null, + Operation.array(Operation.getHeader())); } public BatchWriteData getBatchWriteForSave(T document, String setName) { @@ -396,12 +412,12 @@ public BatchWriteData getBatchWriteForSave(T document, String setName) { Operation[] operations; BatchWritePolicy policy; if (entity.hasVersionProperty()) { - policy = expectGenerationCasAwareSaveBatchPolicy(data); + policy = expectGenerationCasAwareBatchPolicy(data); // mimicking REPLACE behavior by firstly deleting bins due to bin convergence feature restrictions operations = getPutAndGetHeaderOperations(data, true); } else { - policy = ignoreGenerationSaveBatchPolicy(data, RecordExistsAction.UPDATE); + policy = ignoreGenerationBatchPolicy(data, RecordExistsAction.UPDATE); // mimicking REPLACE behavior by firstly deleting bins due to bin convergence feature restrictions operations = operations(data.getBinsAsArray(), Operation::put, @@ -419,7 +435,7 @@ public BatchWriteData getBatchWriteForInsert(T document, String setName) AerospikePersistentEntity entity = mappingContext.getRequiredPersistentEntity(document.getClass()); Operation[] operations; - BatchWritePolicy policy = ignoreGenerationSaveBatchPolicy(data, RecordExistsAction.CREATE_ONLY); + BatchWritePolicy policy = ignoreGenerationBatchPolicy(data, RecordExistsAction.CREATE_ONLY); if (entity.hasVersionProperty()) { operations = getPutAndGetHeaderOperations(data, false); } else { @@ -439,12 +455,12 @@ public BatchWriteData getBatchWriteForUpdate(T document, String setName) Operation[] operations; BatchWritePolicy policy; if (entity.hasVersionProperty()) { - policy = expectGenerationSaveBatchPolicy(data, RecordExistsAction.UPDATE_ONLY); + policy = expectGenerationBatchPolicy(data, RecordExistsAction.UPDATE_ONLY); // mimicking REPLACE_ONLY behavior by firstly deleting bins due to bin convergence feature restrictions operations = getPutAndGetHeaderOperations(data, true); } else { - policy = ignoreGenerationSaveBatchPolicy(data, RecordExistsAction.UPDATE_ONLY); + policy = ignoreGenerationBatchPolicy(data, RecordExistsAction.UPDATE_ONLY); // mimicking REPLACE_ONLY behavior by firstly deleting bins due to bin convergence feature restrictions operations = Stream.concat(Stream.of(Operation.delete()), data.getBins().stream() @@ -455,6 +471,25 @@ public BatchWriteData getBatchWriteForUpdate(T document, String setName) entity.hasVersionProperty()); } + public BatchWriteData getBatchWriteForDelete(T document, String setName) { + Assert.notNull(document, "Document must not be null!"); + + 
AerospikePersistentEntity entity = mappingContext.getRequiredPersistentEntity(document.getClass()); + Operation[] operations; + BatchWritePolicy policy; + AerospikeWriteData data = writeData(document, setName); + + if (entity.hasVersionProperty()) { + policy = expectGenerationBatchPolicy(data, RecordExistsAction.UPDATE_ONLY); + } else { + policy = ignoreGenerationBatchPolicy(data, RecordExistsAction.UPDATE_ONLY); + } + operations = Operation.array(Operation.delete()); + + return new BatchWriteData<>(document, new BatchWrite(policy, data.getKey(), operations), + entity.hasVersionProperty()); + } + protected void validateGroupedKeys(GroupedKeys groupedKeys) { Assert.notNull(groupedKeys, "Grouped keys must not be null!"); validateForBatchWrite(groupedKeys.getEntitiesKeys(), "Entities keys"); @@ -481,7 +516,8 @@ protected boolean batchWriteSupported() { protected enum OperationType { SAVE_OPERATION("save"), INSERT_OPERATION("insert"), - UPDATE_OPERATION("update"); + UPDATE_OPERATION("update"), + DELETE_OPERATION("delete"); private final String name; diff --git a/src/main/java/org/springframework/data/aerospike/core/ReactiveAerospikeOperations.java b/src/main/java/org/springframework/data/aerospike/core/ReactiveAerospikeOperations.java index 4561162e7..6299efc98 100644 --- a/src/main/java/org/springframework/data/aerospike/core/ReactiveAerospikeOperations.java +++ b/src/main/java/org/springframework/data/aerospike/core/ReactiveAerospikeOperations.java @@ -21,6 +21,8 @@ import com.aerospike.client.query.IndexCollectionType; import com.aerospike.client.query.IndexType; import com.aerospike.client.reactor.IAerospikeReactorClient; +import org.springframework.dao.DataAccessException; +import org.springframework.dao.OptimisticLockingFailureException; import org.springframework.data.aerospike.config.AerospikeDataSettings; import org.springframework.data.aerospike.convert.MappingAerospikeConverter; import org.springframework.data.aerospike.core.model.GroupedEntities; @@ -81,6 +83,10 @@ public interface ReactiveAerospikeOperations { * * @param document The document to be saved. Must not be {@literal null}. * @return A Mono of the new saved document. + * @throws OptimisticLockingFailureException if the document has a version attribute with a different value from + * that found on server. + * @throws DataAccessException if operation failed (see {@link DefaultAerospikeExceptionTranslator} + * for details). */ Mono save(T document); @@ -103,6 +109,10 @@ public interface ReactiveAerospikeOperations { * @param document The document to be saved. Must not be {@literal null}. * @param setName The set name to save the document. * @return A Mono of the new saved document. + * @throws OptimisticLockingFailureException if the document has a version attribute with a different value from + * that found on server. + * @throws DataAccessException if operation failed (see {@link DefaultAerospikeExceptionTranslator} + * for details). */ Mono save(T document, String setName); @@ -114,9 +124,12 @@ public interface ReactiveAerospikeOperations { * Requires Server version 6.0+. * * @param documents The documents to be saved. Must not be {@literal null}. - * @return A Flux of the saved documents, otherwise onError is signalled with - * {@link AerospikeException.BatchRecordArray} if batch save results contain errors, or with - * {@link org.springframework.dao.DataAccessException} if batch operation failed. 
+ * @return A Flux of the saved documents + * @throws AerospikeException.BatchRecordArray if batch save succeeds, but results contain errors or null records. + * @throws OptimisticLockingFailureException if at least one document has a version attribute with a different + * value from that found on server. + * @throws DataAccessException if batch operation failed (see + * {@link DefaultAerospikeExceptionTranslator} for details). */ Flux saveAll(Iterable documents); @@ -130,9 +143,12 @@ public interface ReactiveAerospikeOperations { * * @param documents The documents to be saved. Must not be {@literal null}. * @param setName The set name to save to documents. - * @return A Flux of the saved documents, otherwise onError is signalled with - * {@link AerospikeException.BatchRecordArray} if batch save results contain errors, or with - * {@link org.springframework.dao.DataAccessException} if batch operation failed. + * @return A Flux of the saved documents + * @throws AerospikeException.BatchRecordArray if batch save succeeds, but results contain errors or null records. + * @throws OptimisticLockingFailureException if at least one document has a version attribute with a different + * value from that found on server. + * @throws DataAccessException if batch operation failed (see + * {@link DefaultAerospikeExceptionTranslator} for details). */ Flux saveAll(Iterable documents, String setName); @@ -143,6 +159,10 @@ public interface ReactiveAerospikeOperations { * * @param document The document to be inserted. Must not be {@literal null}. * @return A Mono of the new inserted document. + * @throws OptimisticLockingFailureException if the document has a version attribute with a different value from + * that found on server. + * @throws DataAccessException if operation failed (see {@link DefaultAerospikeExceptionTranslator} + * for details). */ Mono insert(T document); @@ -155,6 +175,10 @@ public interface ReactiveAerospikeOperations { * @param document The document to be inserted. Must not be {@literal null}. * @param setName The set name to insert the document. * @return A Mono of the new inserted document. + * @throws OptimisticLockingFailureException if the document has a version attribute with a different value from + * that found on server. + * @throws DataAccessException if operation failed (see {@link DefaultAerospikeExceptionTranslator} + * for details). */ Mono insert(T document, String setName); @@ -166,9 +190,13 @@ public interface ReactiveAerospikeOperations { * Requires Server version 6.0+. * * @param documents Documents to insert. Must not be {@literal null}. - * @return A Flux of the inserted documents, otherwise onError is signalled with - * {@link AerospikeException.BatchRecordArray} if batch insert results contain errors, or with - * {@link org.springframework.dao.DataAccessException} if batch operation failed. + * @return A Flux of the inserted documents + * @throws AerospikeException.BatchRecordArray if batch insert succeeds, but results contain errors or null + * records. + * @throws OptimisticLockingFailureException if at least one document has a version attribute with a different + * value from that found on server. + * @throws DataAccessException if batch operation failed (see + * {@link DefaultAerospikeExceptionTranslator} for details). */ Flux insertAll(Iterable documents); @@ -182,9 +210,13 @@ public interface ReactiveAerospikeOperations { * * @param documents Documents to insert. Must not be {@literal null}. * @param setName The set name to insert the documents. 
- * @return A Flux of the inserted documents, otherwise onError is signalled with - * {@link AerospikeException.BatchRecordArray} if batch insert results contain errors, or with - * {@link org.springframework.dao.DataAccessException} if batch operation failed. + * @return A Flux of the inserted documents + * @throws AerospikeException.BatchRecordArray if batch insert succeeds, but results contain errors or null + * records. + * @throws OptimisticLockingFailureException if at least one document has a version attribute with a different + * value from that found on server. + * @throws DataAccessException if batch operation failed (see + * {@link DefaultAerospikeExceptionTranslator} for details). */ Flux insertAll(Iterable documents, String setName); @@ -195,6 +227,7 @@ public interface ReactiveAerospikeOperations { * @param writePolicy The Aerospike write policy for the inner Aerospike put operation. Must not be * {@literal null}. * @return A Mono of the new persisted document. + * @throws DataAccessException if operation failed (see {@link DefaultAerospikeExceptionTranslator} for details). */ Mono persist(T document, WritePolicy writePolicy); @@ -207,6 +240,7 @@ public interface ReactiveAerospikeOperations { * {@literal null}. * @param setName Set name to override the set associated with the document. * @return A Mono of the new persisted document. + * @throws DataAccessException if operation failed (see {@link DefaultAerospikeExceptionTranslator} for details). */ Mono persist(T document, WritePolicy writePolicy, String setName); @@ -220,6 +254,10 @@ public interface ReactiveAerospikeOperations { * * @param document The document that identifies the record to be updated. Must not be {@literal null}. * @return A Mono of the new updated document. + * @throws OptimisticLockingFailureException if the document has a version attribute with a different value from + * that found on server. + * @throws DataAccessException if operation failed (see {@link DefaultAerospikeExceptionTranslator} + * for details). */ Mono update(T document); @@ -234,6 +272,10 @@ public interface ReactiveAerospikeOperations { * @param document The document that identifies the record to be updated. Must not be {@literal null}. * @param setName The set name to update the document. * @return A Mono of the new updated document. + * @throws OptimisticLockingFailureException if the document has a version attribute with a different value from + * that found on server. + * @throws DataAccessException if operation failed (see {@link DefaultAerospikeExceptionTranslator} + * for details). */ Mono update(T document, String setName); @@ -247,6 +289,10 @@ public interface ReactiveAerospikeOperations { * * @param document The document that identifies the record to be updated. Must not be {@literal null}. * @return A Mono of the new updated document. + * @throws OptimisticLockingFailureException if the document has a version attribute with a different value from + * that found on server. + * @throws DataAccessException if operation failed (see {@link DefaultAerospikeExceptionTranslator} + * for details). */ Mono update(T document, Collection fields); @@ -261,6 +307,10 @@ public interface ReactiveAerospikeOperations { * @param document The document that identifies the record to be updated. Must not be {@literal null}. * @param setName The set name to update the document. * @return A Mono of the new updated document. 
+ * @throws OptimisticLockingFailureException if the document has a version attribute with a different value from + * that found on server. + * @throws DataAccessException if operation failed (see {@link DefaultAerospikeExceptionTranslator} + * for details). */ Mono update(T document, String setName, Collection fields); @@ -272,9 +322,13 @@ public interface ReactiveAerospikeOperations { * Requires Server version 6.0+. * * @param documents The documents that identify the records to be updated. Must not be {@literal null}. - * @return A Flux of the updated documents, otherwise onError is signalled with - * {@link AerospikeException.BatchRecordArray} if batch update results contain errors, or with - * {@link org.springframework.dao.DataAccessException} if batch operation failed. + * @return A Flux of the updated documents + * @throws AerospikeException.BatchRecordArray if batch update succeeds, but results contain errors or null + * records. + * @throws OptimisticLockingFailureException if at least one document has a version attribute with a different + * value from that found on server. + * @throws DataAccessException if batch operation failed (see + * {@link DefaultAerospikeExceptionTranslator} for details). */ Flux updateAll(Iterable documents); @@ -288,9 +342,13 @@ public interface ReactiveAerospikeOperations { * * @param documents The documents that identify the records to be updated. Must not be {@literal null}. * @param setName The set name to update the documents. - * @return A Flux of the updated documents, otherwise onError is signalled with - * {@link AerospikeException.BatchRecordArray} if batch update results contain errors, or with - * {@link org.springframework.dao.DataAccessException} if batch operation failed. + * @return A Flux of the updated documents + * @throws AerospikeException.BatchRecordArray if batch update succeeds, but results contain errors or null + * records. + * @throws OptimisticLockingFailureException if at least one document has a version attribute with a different + * value from that found on server. + * @throws DataAccessException if batch operation failed (see + * {@link DefaultAerospikeExceptionTranslator} for details). */ Flux updateAll(Iterable documents, String setName); @@ -314,9 +372,15 @@ public interface ReactiveAerospikeOperations { /** * Reactively delete a record using the document's id. + *
+ * If the document has version property it will be compared with the corresponding record's version on server. * * @param document The document to get set name and id from. Must not be {@literal null}. - * @return A Mono of whether the document existed on server before deletion. + * @return A Mono of whether the record existed on server before deletion. + * @throws OptimisticLockingFailureException if the document has a version attribute with a different value from + * that found on server. + * @throws DataAccessException if operation failed (see {@link DefaultAerospikeExceptionTranslator} + * for details). */ Mono delete(T document); @@ -325,63 +389,113 @@ public interface ReactiveAerospikeOperations { * * @param document The document to get id from. Must not be {@literal null}. * @param setName Set name to use. - * @return A Mono of whether the document existed on server before deletion. + * @return A Mono of whether the record existed on server before deletion. + * @throws OptimisticLockingFailureException if the document has a version attribute with a different value from + * that found on server. + * @throws DataAccessException if operation failed (see {@link DefaultAerospikeExceptionTranslator} + * for details). */ Mono delete(T document, String setName); + /** + * Reactively delete multiple records in one batch request. The policies are analogous to {@link #delete(Object)}. + *
+ * The execution order is NOT preserved. + *
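+ * A minimal usage sketch (illustrative only; {@code reactiveTemplate} stands for a configured
+ * {@code ReactiveAerospikeOperations} instance, {@code persons} for previously loaded documents and {@code log} for
+ * an application logger):
+ * <pre>{@code
+ * reactiveTemplate.deleteAll(persons)
+ *     // a version mismatch surfaces as OptimisticLockingFailureException
+ *     .doOnError(OptimisticLockingFailureException.class, e -> log.warn("stale documents", e))
+ *     .block();
+ * }</pre>
+ *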
+ * This operation requires Server version 6.0+. + * + * @param documents The documents to be deleted. Must not be {@literal null}. + * @throws AerospikeException.BatchRecordArray if batch delete succeeds, but results contain errors or null records. + * @throws OptimisticLockingFailureException if at least one document has a version attribute with a different + * value from that found on server. + * @throws DataAccessException if batch operation failed (see + * {@link DefaultAerospikeExceptionTranslator} for details). + */ + Mono deleteAll(Iterable documents); + + /** + * Reactively delete multiple records within the given set (overrides the default set associated with the documents) + * in one batch request. The policies are analogous to {@link #delete(Object)}. + *
+ * The execution order is NOT preserved. + *
+ * This operation requires Server version 6.0+. + * + * @param documents The documents to be deleted. Must not be {@literal null}. + * @param setName Set name to override the default set associated with the documents. + * @throws AerospikeException.BatchRecordArray if batch delete results contain errors. + * @throws OptimisticLockingFailureException if at least one document has a version attribute with a different + * value from that found on server. + * @throws DataAccessException if batch operation failed (see + * {@link DefaultAerospikeExceptionTranslator} for details). + */ + Mono deleteAll(Iterable documents, String setName); + /** * Reactively delete a record by id, set name will be determined by the given entity class. + *
+ * If the document has version property it is not compared with the corresponding record's version on server. * * @param id The id of the record to be deleted. Must not be {@literal null}. * @param entityClass The class to extract the Aerospike set name from. Must not be {@literal null}. - * @return A Mono of whether the document existed on server before deletion. + * @return A Mono of whether the record existed on server before deletion. + * @throws DataAccessException if operation failed (see {@link DefaultAerospikeExceptionTranslator} for details). */ Mono deleteById(Object id, Class entityClass); /** - * Reactively delete a record within the given set by id. + * Reactively delete a record by id within the given set. + *
+ * If the document has version property it is not compared with the corresponding record's version on server. * * @param id The id of the record to be deleted. Must not be {@literal null}. * @param setName Set name to use. - * @return A Mono of whether the document existed on server before deletion. + * @return A Mono of whether the record existed on server before deletion. + * @throws DataAccessException if operation failed (see {@link DefaultAerospikeExceptionTranslator} for details). */ Mono deleteById(Object id, String setName); /** * Reactively delete records using a single batch delete operation, set name will be determined by the given entity - * class. + * class. The policies are analogous to {@link #deleteById(Object, Class)}. *
* This operation requires Server version 6.0+. * * @param ids The ids of the records to find. Must not be {@literal null}. * @param entityClass The class to extract the Aerospike set name from and to map the results to. Must not be * {@literal null}. - * @return onError is signalled with {@link AerospikeException.BatchRecordArray} if batch delete results contain - * errors, or with {@link org.springframework.dao.DataAccessException} if batch operation failed. + * @throws AerospikeException.BatchRecordArray if batch delete results contain errors. + * @throws DataAccessException if batch operation failed (see + * {@link DefaultAerospikeExceptionTranslator} for details). */ Mono deleteByIds(Iterable ids, Class entityClass); /** - * Reactively delete records within the given set using a single batch delete operation. + * Reactively delete records within the given set using a single batch delete operation. The policies are analogous + * to {@link #deleteById(Object, String)}. *
* This operation requires Server version 6.0+. * * @param ids The ids of the documents to find. Must not be {@literal null}. * @param setName Set name to use. - * @return onError is signalled with {@link AerospikeException.BatchRecordArray} if batch delete results contain - * errors, or with {@link org.springframework.dao.DataAccessException} if batch operation failed. + * @throws AerospikeException.BatchRecordArray if batch delete results contain errors. + * @throws DataAccessException if batch operation failed (see + * {@link DefaultAerospikeExceptionTranslator} for details). */ Mono deleteByIds(Iterable ids, String setName); /** * Reactively delete records from different sets in a single request. *
+ * Records' versions on server are not checked. + *
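+ * A minimal sketch of grouping keys by entity type (illustrative only; the builder-style construction of
+ * {@code GroupedKeys} and the entity classes used below are assumptions of this example):
+ * <pre>{@code
+ * GroupedKeys groupedKeys = GroupedKeys.builder()
+ *     .entityKeys(Person.class, List.of(personId1, personId2))
+ *     .entityKeys(Customer.class, List.of(customerId))
+ *     .build();
+ * reactiveTemplate.deleteByIds(groupedKeys).block();
+ * }</pre>
+ *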
* This operation requires Server version 6.0+. * * @param groupedKeys Keys grouped by document type. Must not be {@literal null}, groupedKeys.getEntitiesKeys() must * not be {@literal null}. - * @return onError is signalled with {@link AerospikeException.BatchRecordArray} if batch delete results contain - * errors, or with {@link org.springframework.dao.DataAccessException} if batch operation failed. + * @throws AerospikeException.BatchRecordArray if batch delete results contain errors. + * @throws DataAccessException if batch operation failed (see + * {@link DefaultAerospikeExceptionTranslator} for details). */ Mono deleteByIds(GroupedKeys groupedKeys); @@ -389,6 +503,7 @@ public interface ReactiveAerospikeOperations { * Reactively truncate/delete all records in the set determined by the given entity class. * * @param entityClass The class to extract set name from. Must not be {@literal null}. + * @throws DataAccessException if operation failed (see {@link DefaultAerospikeExceptionTranslator} for details). */ Mono deleteAll(Class entityClass); @@ -396,6 +511,7 @@ public interface ReactiveAerospikeOperations { * Reactively truncate/delete all the documents in the given set. * * @param setName Set name to use. + * @throws DataAccessException if operation failed (see {@link DefaultAerospikeExceptionTranslator} for details). */ Mono deleteAll(String setName); diff --git a/src/main/java/org/springframework/data/aerospike/core/ReactiveAerospikeTemplate.java b/src/main/java/org/springframework/data/aerospike/core/ReactiveAerospikeTemplate.java index 4625e9e90..90e3f4882 100644 --- a/src/main/java/org/springframework/data/aerospike/core/ReactiveAerospikeTemplate.java +++ b/src/main/java/org/springframework/data/aerospike/core/ReactiveAerospikeTemplate.java @@ -65,6 +65,7 @@ import static com.aerospike.client.ResultCode.KEY_NOT_FOUND_ERROR; import static java.util.Objects.nonNull; +import static org.springframework.data.aerospike.core.BaseAerospikeTemplate.OperationType.DELETE_OPERATION; import static org.springframework.data.aerospike.core.BaseAerospikeTemplate.OperationType.INSERT_OPERATION; import static org.springframework.data.aerospike.core.BaseAerospikeTemplate.OperationType.SAVE_OPERATION; import static org.springframework.data.aerospike.core.BaseAerospikeTemplate.OperationType.UPDATE_OPERATION; @@ -124,14 +125,14 @@ public Mono save(T document, String setName) { AerospikeWriteData data = writeData(document, setName); AerospikePersistentEntity entity = mappingContext.getRequiredPersistentEntity(document.getClass()); if (entity.hasVersionProperty()) { - WritePolicy policy = expectGenerationCasAwareSavePolicy(data); + WritePolicy policy = expectGenerationCasAwarePolicy(data); // mimicking REPLACE behavior by firstly deleting bins due to bin convergence feature restrictions Operation[] operations = operations(data.getBinsAsArray(), Operation::put, Operation.array(Operation.delete())); - return doPersistWithVersionAndHandleCasError(document, data, policy, operations); + return doPersistWithVersionAndHandleCasError(document, data, policy, operations, SAVE_OPERATION); } else { - WritePolicy policy = ignoreGenerationSavePolicy(data, RecordExistsAction.UPDATE); + WritePolicy policy = ignoreGenerationPolicy(data, RecordExistsAction.UPDATE); // mimicking REPLACE behavior by firstly deleting bins due to bin convergence feature restrictions Operation[] operations = operations(data.getBinsAsArray(), Operation::put, Operation.array(Operation.delete())); @@ -184,6 +185,8 @@ private Flux 
batchWriteAllDocuments(List documents, String setName, Op documents.forEach(document -> batchWriteDataList.add(getBatchWriteForInsert(document, setName))); case UPDATE_OPERATION -> documents.forEach(document -> batchWriteDataList.add(getBatchWriteForUpdate(document, setName))); + case DELETE_OPERATION -> + documents.forEach(document -> batchWriteDataList.add(getBatchWriteForDelete(document, setName))); default -> throw new IllegalArgumentException("Unexpected operation name: " + operationType); } @@ -207,16 +210,29 @@ private Mono>> checkForErrorsAndUpdateVersion(List batchWriteRecords, OperationType operationType) { boolean errorsFound = false; + String casErrorDocumentId = null; for (BaseAerospikeTemplate.BatchWriteData data : batchWriteDataList) { if (!errorsFound && batchRecordFailed(data.batchRecord())) { errorsFound = true; } - if (data.hasVersionProperty() && !batchRecordFailed(data.batchRecord())) { - updateVersion(data.document(), data.batchRecord().record); + if (data.hasVersionProperty()) { + if (!batchRecordFailed(data.batchRecord())) { + if (operationType != DELETE_OPERATION) updateVersion(data.document(), data.batchRecord().record); + } else { + if (hasOptimisticLockingError(data.batchRecord().resultCode)) { + // ID can be a String or a primitive + casErrorDocumentId = data.batchRecord().key.userKey.toString(); + } + } } } if (errorsFound) { + if (casErrorDocumentId != null) { + return Mono.error(getOptimisticLockingFailureException( + "Failed to %s the record with ID '%s' due to versions mismatch" + .formatted(operationType, casErrorDocumentId), null)); + } AerospikeException e = new AerospikeException("Errors during batch " + operationType); return Mono.error( new AerospikeException.BatchRecordArray(batchWriteRecords.toArray(BatchRecord[]::new), e)); @@ -236,7 +252,7 @@ public Mono insert(T document, String setName) { Assert.notNull(setName, "Set name must not be null!"); AerospikeWriteData data = writeData(document, setName); - WritePolicy policy = ignoreGenerationSavePolicy(data, RecordExistsAction.CREATE_ONLY); + WritePolicy policy = ignoreGenerationPolicy(data, RecordExistsAction.CREATE_ONLY); AerospikePersistentEntity entity = mappingContext.getRequiredPersistentEntity(document.getClass()); if (entity.hasVersionProperty()) { @@ -302,14 +318,14 @@ public Mono update(T document, String setName) { AerospikeWriteData data = writeData(document, setName); AerospikePersistentEntity entity = mappingContext.getRequiredPersistentEntity(document.getClass()); if (entity.hasVersionProperty()) { - WritePolicy policy = expectGenerationSavePolicy(data, RecordExistsAction.UPDATE_ONLY); + WritePolicy policy = expectGenerationPolicy(data, RecordExistsAction.UPDATE_ONLY); // mimicking REPLACE_ONLY behavior by firstly deleting bins due to bin convergence feature restrictions Operation[] operations = operations(data.getBinsAsArray(), Operation::put, Operation.array(Operation.delete()), Operation.array(Operation.getHeader())); - return doPersistWithVersionAndHandleCasError(document, data, policy, operations); + return doPersistWithVersionAndHandleCasError(document, data, policy, operations, UPDATE_OPERATION); } else { - WritePolicy policy = ignoreGenerationSavePolicy(data, RecordExistsAction.UPDATE_ONLY); + WritePolicy policy = ignoreGenerationPolicy(data, RecordExistsAction.UPDATE_ONLY); // mimicking REPLACE_ONLY behavior by firstly deleting bins due to bin convergence feature restrictions Operation[] operations = operations(data.getBinsAsArray(), Operation::put, @@ -332,13 +348,13 @@ 
public Mono update(T document, String setName, Collection fields) AerospikeWriteData data = writeDataWithSpecificFields(document, setName, fields); AerospikePersistentEntity entity = mappingContext.getRequiredPersistentEntity(document.getClass()); if (entity.hasVersionProperty()) { - WritePolicy policy = expectGenerationSavePolicy(data, RecordExistsAction.UPDATE_ONLY); + WritePolicy policy = expectGenerationPolicy(data, RecordExistsAction.UPDATE_ONLY); Operation[] operations = operations(data.getBinsAsArray(), Operation::put, null, Operation.array(Operation.getHeader())); - return doPersistWithVersionAndHandleCasError(document, data, policy, operations); + return doPersistWithVersionAndHandleCasError(document, data, policy, operations, UPDATE_OPERATION); } else { - WritePolicy policy = ignoreGenerationSavePolicy(data, RecordExistsAction.UPDATE_ONLY); + WritePolicy policy = ignoreGenerationPolicy(data, RecordExistsAction.UPDATE_ONLY); Operation[] operations = operations(data.getBinsAsArray(), Operation::put); return doPersistAndHandleError(document, data, policy, operations); @@ -383,7 +399,7 @@ public Mono delete(Object id, Class entityClass) { AerospikePersistentEntity entity = mappingContext.getRequiredPersistentEntity(entityClass); return reactorClient - .delete(ignoreGenerationDeletePolicy(), getKey(id, entity)) + .delete(ignoreGenerationPolicy(), getKey(id, entity)) .map(k -> true) .onErrorMap(this::translateError); } @@ -399,10 +415,16 @@ public Mono delete(T document, String setName) { Assert.notNull(document, "Set name must not be null!"); AerospikeWriteData data = writeData(document, setName); - + AerospikePersistentEntity entity = mappingContext.getRequiredPersistentEntity(document.getClass()); + if (entity.hasVersionProperty()) { + return reactorClient + .delete(expectGenerationPolicy(data), data.getKey()) + .hasElement() + .onErrorMap(e -> translateCasThrowable(e, DELETE_OPERATION.toString())); + } return reactorClient - .delete(ignoreGenerationDeletePolicy(), data.getKey()) - .map(key -> true) + .delete(ignoreGenerationPolicy(), data.getKey()) + .hasElement() .onErrorMap(this::translateError); } @@ -420,11 +442,26 @@ public Mono deleteById(Object id, String setName) { Assert.notNull(setName, "Set name must not be null!"); return reactorClient - .delete(ignoreGenerationDeletePolicy(), getKey(id, setName)) + .delete(ignoreGenerationPolicy(), getKey(id, setName)) .map(k -> true) .onErrorMap(this::translateError); } + @Override + public Mono deleteAll(Iterable documents) { + validateForBatchWrite(documents, "Documents for deleting"); + + return deleteAll(documents, getSetName(documents.iterator().next())); + } + + @Override + public Mono deleteAll(Iterable documents, String setName) { + Assert.notNull(setName, "Set name must not be null!"); + validateForBatchWrite(documents, "Documents for deleting"); + + return applyBufferedBatchWrite(documents, setName, DELETE_OPERATION).then(); + } + @Override public Mono deleteByIds(Iterable ids, Class entityClass) { Assert.notNull(entityClass, "Class must not be null!"); @@ -1135,7 +1172,8 @@ public Mono indexExists(String indexName) { try { Node[] nodes = reactorClient.getAerospikeClient().getNodes(); for (Node node : nodes) { - String response = Info.request(node, "sindex-exists:ns=" + namespace + ";indexname=" + indexName); + String response = Info.request(reactorClient.getAerospikeClient().getInfoPolicyDefault(), + node, "sindex-exists:ns=" + namespace + ";indexname=" + indexName); if (response == null) throw new AerospikeException("Null 
node response"); if (response.equalsIgnoreCase("true")) { @@ -1188,10 +1226,11 @@ private Mono doPersistAndHandleError(T document, AerospikeWriteData data, } private Mono doPersistWithVersionAndHandleCasError(T document, AerospikeWriteData data, WritePolicy policy, - Operation[] operations) { + Operation[] operations, OperationType operationType) { return putAndGetHeader(data, policy, operations) .map(newRecord -> updateVersion(document, newRecord)) - .onErrorMap(AerospikeException.class, this::translateCasError); + .onErrorMap(AerospikeException.class, i -> translateCasError(i, + "Failed to " + operationType.toString() + " record due to versions mismatch")); } private Mono doPersistWithVersionAndHandleError(T document, AerospikeWriteData data, WritePolicy policy, @@ -1232,8 +1271,8 @@ private String[] getBinNamesFromTargetClass(Class targetClass) { List binNamesList = new ArrayList<>(); - targetEntity.doWithProperties((PropertyHandler) property - -> binNamesList.add(property.getFieldName())); + targetEntity.doWithProperties( + (PropertyHandler) property -> binNamesList.add(property.getFieldName())); return binNamesList.toArray(new String[0]); } @@ -1245,6 +1284,13 @@ private Throwable translateError(Throwable e) { return e; } + private Throwable translateCasThrowable(Throwable e, String operationName) { + if (e instanceof AerospikeException ae) { + return translateCasError(ae, "Failed to %s record due to versions mismatch".formatted(operationName)); + } + return e; + } + private Flux findWithPostProcessing(String setName, Class targetClass, Query query) { verifyUnsortedWithOffset(query.getSort(), query.getOffset()); Flux results = findUsingQueryWithDistinctPredicate(setName, targetClass, getDistinctPredicate(query), diff --git a/src/main/java/org/springframework/data/aerospike/repository/support/SimpleAerospikeRepository.java b/src/main/java/org/springframework/data/aerospike/repository/support/SimpleAerospikeRepository.java index 239fd3b31..8c7f36bea 100644 --- a/src/main/java/org/springframework/data/aerospike/repository/support/SimpleAerospikeRepository.java +++ b/src/main/java/org/springframework/data/aerospike/repository/support/SimpleAerospikeRepository.java @@ -27,7 +27,6 @@ import org.springframework.data.repository.core.EntityInformation; import org.springframework.util.Assert; -import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; @@ -129,12 +128,7 @@ public void deleteById(ID id) { @Override public void deleteAll(Iterable entities) { Assert.notNull(entities, "The given entities for deleting must not be null!"); - List ids = new ArrayList<>(); - entities.forEach(entity -> { - Assert.notNull(entity, "The given Iterable of entities must not contain null!"); - ids.add(entityInformation.getId(entity)); - }); - operations.deleteByIds(ids, entityInformation.getJavaType()); + operations.deleteAll(entities); } @Override diff --git a/src/main/java/org/springframework/data/aerospike/repository/support/SimpleReactiveAerospikeRepository.java b/src/main/java/org/springframework/data/aerospike/repository/support/SimpleReactiveAerospikeRepository.java index 8798c4629..ddf5e9095 100644 --- a/src/main/java/org/springframework/data/aerospike/repository/support/SimpleReactiveAerospikeRepository.java +++ b/src/main/java/org/springframework/data/aerospike/repository/support/SimpleReactiveAerospikeRepository.java @@ -26,9 +26,6 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import java.util.ArrayList; -import 
java.util.List; - /** * Stub implementation of {@link ReactiveAerospikeRepository}. * @@ -132,12 +129,7 @@ public Mono deleteAllById(Iterable ids) { @Override public Mono deleteAll(Iterable entities) { Assert.notNull(entities, "The given Iterable of entities must not be null!"); - List ids = new ArrayList<>(); - entities.forEach(entity -> { - Assert.notNull(entity, "The given Iterable of entities must not contain null!"); - ids.add(entityInformation.getId(entity)); - }); - return operations.deleteByIds(ids, entityInformation.getJavaType()); + return operations.deleteAll(entities); } @Override diff --git a/src/main/java/org/springframework/data/aerospike/utility/Utils.java b/src/main/java/org/springframework/data/aerospike/utility/Utils.java index 65f6cda3f..76e7c122b 100644 --- a/src/main/java/org/springframework/data/aerospike/utility/Utils.java +++ b/src/main/java/org/springframework/data/aerospike/utility/Utils.java @@ -50,7 +50,7 @@ public static String[] infoAll(IAerospikeClient client, String[] messages = new String[client.getNodes().length]; int index = 0; for (Node node : client.getNodes()) { - messages[index] = Info.request(node, infoString); + messages[index] = Info.request(client.getInfoPolicyDefault(), node, infoString); } return messages; } diff --git a/src/test/java/org/springframework/data/aerospike/core/AerospikeTemplateDeleteTests.java b/src/test/java/org/springframework/data/aerospike/core/AerospikeTemplateDeleteTests.java index 6861559a5..ec7dda3d4 100644 --- a/src/test/java/org/springframework/data/aerospike/core/AerospikeTemplateDeleteTests.java +++ b/src/test/java/org/springframework/data/aerospike/core/AerospikeTemplateDeleteTests.java @@ -19,6 +19,7 @@ import com.aerospike.client.policy.GenerationPolicy; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.springframework.dao.OptimisticLockingFailureException; import org.springframework.data.aerospike.BaseBlockingIntegrationTests; import org.springframework.data.aerospike.core.model.GroupedKeys; import org.springframework.data.aerospike.sample.Customer; @@ -50,22 +51,6 @@ public void beforeEach() { template.deleteAll(Customer.class); } - @Test - public void deleteByObject_ignoresDocumentVersionEvenIfDefaultGenerationPolicyIsSet() { - GenerationPolicy initialGenerationPolicy = client.getWritePolicyDefault().generationPolicy; - client.getWritePolicyDefault().generationPolicy = GenerationPolicy.EXPECT_GEN_EQUAL; - try { - VersionedClass initialDocument = new VersionedClass(id, "a"); - template.insert(initialDocument); - template.update(new VersionedClass(id, "b", initialDocument.getVersion())); - - boolean deleted = template.delete(initialDocument); - assertThat(deleted).isTrue(); - } finally { - client.getWritePolicyDefault().generationPolicy = initialGenerationPolicy; - } - } - @Test public void deleteByObject_ignoresVersionEvenIfDefaultGenerationPolicyIsSet() { GenerationPolicy initialGenerationPolicy = client.getWritePolicyDefault().generationPolicy; @@ -86,53 +71,94 @@ public void deleteByObject_ignoresVersionEvenIfDefaultGenerationPolicyIsSet() { public void deleteByObject_deletesDocument() { Person document = new Person(id, "QLastName", 21); template.insert(document); + VersionedClass versionedDocument = new VersionedClass(nextId(), "test"); + template.insert(versionedDocument); boolean deleted = template.delete(document); assertThat(deleted).isTrue(); - Person result = template.findById(id, Person.class); assertThat(result).isNull(); + + boolean deleted2 = 
template.delete(versionedDocument); + assertThat(deleted2).isTrue(); + VersionedClass result2 = template.findById(versionedDocument.getId(), VersionedClass.class); + assertThat(result2).isNull(); } @Test public void deleteByObject_deletesDocumentWithSetName() { - Person document = new Person(id, "QLastName", 21); - template.insert(document, OVERRIDE_SET_NAME); + Person person = new Person(id, "QLastName", 21); + template.insert(person, OVERRIDE_SET_NAME); + String id2 = nextId(); + VersionedClass versionedDocument = new VersionedClass(id2, "test"); + template.insert(versionedDocument, OVERRIDE_SET_NAME); - boolean deleted = template.delete(document, OVERRIDE_SET_NAME); + boolean deleted = template.delete(person, OVERRIDE_SET_NAME); assertThat(deleted).isTrue(); - Person result = template.findById(id, Person.class, OVERRIDE_SET_NAME); assertThat(result).isNull(); + + boolean deleted2 = template.delete(versionedDocument, OVERRIDE_SET_NAME); + assertThat(deleted2).isTrue(); + VersionedClass result2 = template.findById(id2, VersionedClass.class); + assertThat(result2).isNull(); + } + + @Test + public void deleteByObject_VersionsMismatch() { + VersionedClass versionedDocument = new VersionedClass(nextId(), "test"); + + template.insert(versionedDocument); + versionedDocument.setVersion(2); + assertThatThrownBy(() -> template.delete(versionedDocument)) + .isInstanceOf(OptimisticLockingFailureException.class) + .hasMessage("Failed to delete record due to versions mismatch"); } @Test public void deleteById_deletesDocument() { Person document = new Person(id, "QLastName", 21); template.insert(document); + String id2 = nextId(); + VersionedClass versionedDocument = new VersionedClass(id2, "test"); + template.insert(versionedDocument); boolean deleted = template.deleteById(id, Person.class); assertThat(deleted).isTrue(); - Person result = template.findById(id, Person.class); assertThat(result).isNull(); + + boolean deleted2 = template.deleteById(id2, VersionedClass.class); + assertThat(deleted2).isTrue(); + VersionedClass result2 = template.findById(id2, VersionedClass.class); + assertThat(result2).isNull(); } @Test public void deleteById_deletesDocumentWithSetName() { Person document = new Person(id, "QLastName", 21); template.insert(document, OVERRIDE_SET_NAME); + String id2 = nextId(); + VersionedClass versionedDocument = new VersionedClass(id2, "test"); + template.insert(versionedDocument, OVERRIDE_SET_NAME); boolean deleted = template.deleteById(id, OVERRIDE_SET_NAME); assertThat(deleted).isTrue(); - Person result = template.findById(id, Person.class, OVERRIDE_SET_NAME); assertThat(result).isNull(); + + boolean deleted2 = template.deleteById(id2, OVERRIDE_SET_NAME); + assertThat(deleted2).isTrue(); + VersionedClass result2 = template.findById(id2, VersionedClass.class, OVERRIDE_SET_NAME); + assertThat(result2).isNull(); } @Test public void deleteById_returnsFalseIfValueIsAbsent() { assertThat(template.deleteById(id, Person.class)).isFalse(); + + assertThat(template.delete(new Person(id, "QLastName", 21))).isFalse(); + assertThat(template.delete(new VersionedClass(nextId(), "test"))).isFalse(); } @Test @@ -215,7 +241,7 @@ public void deleteByType_NullTypeThrowsException() { } @Test - public void deleteAll_rejectsDuplicateIds() { + public void deleteByIds_rejectsDuplicateIds() { // batch write operations are supported starting with Server version 6.0+ if (serverVersionSupport.batchWrite()) { String id1 = nextId(); @@ -232,7 +258,7 @@ public void deleteAll_rejectsDuplicateIds() { } @Test - public 
void deleteAll_ShouldDeleteAllDocuments() { + public void deleteByIds_ShouldDeleteAllDocuments() { // batch delete operations are supported starting with Server version 6.0+ if (serverVersionSupport.batchWrite()) { String id1 = nextId(); @@ -257,7 +283,7 @@ public void deleteAll_ShouldDeleteAllDocuments() { } @Test - public void deleteAll_ShouldDeleteAllDocumentsWithSetName() { + public void deleteByIds_ShouldDeleteAllDocumentsWithSetName() { // batch delete operations are supported starting with Server version 6.0+ if (serverVersionSupport.batchWrite()) { String id1 = nextId(); @@ -271,4 +297,80 @@ public void deleteAll_ShouldDeleteAllDocumentsWithSetName() { assertThat(template.findByIds(ids, DocumentWithExpiration.class, OVERRIDE_SET_NAME)).isEmpty(); } } + + @Test + public void deleteAll_rejectsDuplicateIds() { + // batch write operations are supported starting with Server version 6.0+ + if (serverVersionSupport.batchWrite()) { + String id1 = nextId(); + DocumentWithExpiration document1 = new DocumentWithExpiration(id1); + DocumentWithExpiration document2 = new DocumentWithExpiration(id1); + template.save(document1); + template.save(document2); + + assertThatThrownBy(() -> template.deleteAll(List.of(document1, document2))) + .isInstanceOf(AerospikeException.BatchRecordArray.class) + .hasMessageContaining("Errors during batch delete"); + } + } + + @Test + public void deleteAll_ShouldDeleteAllDocuments() { + // batch delete operations are supported starting with Server version 6.0+ + if (serverVersionSupport.batchWrite()) { + String id1 = nextId(); + String id2 = nextId(); + DocumentWithExpiration document1 = new DocumentWithExpiration(id1); + DocumentWithExpiration document2 = new DocumentWithExpiration(id2); + template.save(document1); + template.save(document2); + + template.deleteAll(List.of(document1, document2)); + assertThat(template.findByIds(List.of(id1, id2), DocumentWithExpiration.class)).isEmpty(); + + List persons = additionalAerospikeTestOperations.saveGeneratedPersons(101); + template.deleteAll(persons); + List personsIds = persons.stream().map(Person::getId).toList(); + assertThat(template.findByIds(personsIds, Person.class)).isEmpty(); + + List persons2 = additionalAerospikeTestOperations.saveGeneratedPersons(1001); + template.deleteAll(persons2); + personsIds = persons2.stream().map(Person::getId).toList(); + assertThat(template.findByIds(personsIds, Person.class)).isEmpty(); + } + } + + @Test + public void deleteAll_ShouldDeleteAllDocumentsWithSetName() { + // batch delete operations are supported starting with Server version 6.0+ + if (serverVersionSupport.batchWrite()) { + String id1 = nextId(); + String id2 = nextId(); + DocumentWithExpiration document1 = new DocumentWithExpiration(id1); + DocumentWithExpiration document2 = new DocumentWithExpiration(id2); + template.saveAll(List.of(document1, document2), OVERRIDE_SET_NAME); + + template.deleteAll(List.of(document1, document2), OVERRIDE_SET_NAME); + + assertThat(template.findByIds(List.of(id1, id2), DocumentWithExpiration.class, OVERRIDE_SET_NAME)).isEmpty(); + } + } + + @Test + public void deleteAll_VersionsMismatch() { + // batch delete operations are supported starting with Server version 6.0+ + if (serverVersionSupport.batchWrite()) { + String id1 = "id1"; + VersionedClass document1 = new VersionedClass(id1, "test1"); + String id2 = "id2"; + VersionedClass document2 = new VersionedClass(id2, "test2"); + template.save(document1); + template.save(document2); + + document2.setVersion(232); + assertThatThrownBy(() 
-> template.deleteAll(List.of(document1, document2))) + .isInstanceOf(OptimisticLockingFailureException.class) + .hasMessageContaining("Failed to delete the record with ID 'id2' due to versions mismatch"); + } + } } diff --git a/src/test/java/org/springframework/data/aerospike/core/AerospikeTemplateInsertTests.java b/src/test/java/org/springframework/data/aerospike/core/AerospikeTemplateInsertTests.java index 5c6b51754..894ae5fdb 100644 --- a/src/test/java/org/springframework/data/aerospike/core/AerospikeTemplateInsertTests.java +++ b/src/test/java/org/springframework/data/aerospike/core/AerospikeTemplateInsertTests.java @@ -15,7 +15,6 @@ */ package org.springframework.data.aerospike.core; -import com.aerospike.client.AerospikeException; import com.aerospike.client.Key; import com.aerospike.client.Record; import com.aerospike.client.policy.Policy; @@ -23,6 +22,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.dao.DuplicateKeyException; +import org.springframework.dao.OptimisticLockingFailureException; import org.springframework.data.aerospike.BaseBlockingIntegrationTests; import org.springframework.data.aerospike.sample.Person; import org.springframework.data.aerospike.sample.SampleClasses.CustomCollectionClass; @@ -259,12 +259,11 @@ public void insertAllWithSetName_insertsAllDocuments() { public void insertAll_rejectsDuplicateIds() { // batch write operations are supported starting with Server version 6.0+ if (serverVersionSupport.batchWrite()) { - VersionedClass first = new VersionedClass(id, "foo"); - - assertThatThrownBy(() -> template.insertAll(List.of(first, first))) - .isInstanceOf(AerospikeException.BatchRecordArray.class) - .hasMessageContaining("Errors during batch insert"); - Assertions.assertEquals(1, (long) first.getVersion()); + VersionedClass second = new VersionedClass("as-5440", "foo"); + assertThatThrownBy(() -> template.insertAll(List.of(second, second))) + .isInstanceOf(OptimisticLockingFailureException.class) + .hasMessageContaining("Failed to insert the record with ID 'as-5440' due to versions mismatch"); + Assertions.assertEquals(1, second.getVersion()); } } diff --git a/src/test/java/org/springframework/data/aerospike/core/AerospikeTemplateSaveTests.java b/src/test/java/org/springframework/data/aerospike/core/AerospikeTemplateSaveTests.java index bdfa5278f..560f21ef5 100644 --- a/src/test/java/org/springframework/data/aerospike/core/AerospikeTemplateSaveTests.java +++ b/src/test/java/org/springframework/data/aerospike/core/AerospikeTemplateSaveTests.java @@ -15,7 +15,6 @@ */ package org.springframework.data.aerospike.core; -import com.aerospike.client.AerospikeException; import com.aerospike.client.Key; import com.aerospike.client.Record; import com.aerospike.client.policy.Policy; @@ -23,7 +22,6 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; import org.springframework.dao.ConcurrencyFailureException; -import org.springframework.dao.DataRetrievalFailureException; import org.springframework.dao.OptimisticLockingFailureException; import org.springframework.data.aerospike.BaseBlockingIntegrationTests; import org.springframework.data.aerospike.sample.Person; @@ -147,8 +145,8 @@ public void shouldSaveDocumentWithEqualVersion() { @Test public void shouldFailSaveNewDocumentWithVersionGreaterThanZero() { - assertThatThrownBy(() -> template.save(new VersionedClass(id, "foo", 5L))) - .isInstanceOf(DataRetrievalFailureException.class); + assertThatThrownBy(() -> template.save(new 
VersionedClass(nextId(), "foo", 5L))) + .isInstanceOf(OptimisticLockingFailureException.class); } @Test @@ -294,8 +292,8 @@ public void rejectsNullObjectToBeSaved() { @Test public void shouldConcurrentlyUpdateDocumentIfTouchOnReadIsTrue() { - int numberOfConcurrentUpdate = 10; - AsyncUtils.executeConcurrently(numberOfConcurrentUpdate, new Runnable() { + int numberOfConcurrentUpdates = 10; + AsyncUtils.executeConcurrently(numberOfConcurrentUpdates, new Runnable() { @Override public void run() { try { @@ -316,7 +314,7 @@ public void run() { }); DocumentWithTouchOnRead actual = template.findById(id, DocumentWithTouchOnRead.class); - assertThat(actual.getField()).isEqualTo(numberOfConcurrentUpdate); + assertThat(actual.getField()).isEqualTo(numberOfConcurrentUpdates); } @Test @@ -375,15 +373,15 @@ public void shouldSaveAllAndSetVersionWithSetName() { public void shouldSaveAllVersionedDocumentsAndSetVersionAndThrowExceptionIfAlreadyExist() { // batch write operations are supported starting with Server version 6.0+ if (serverVersionSupport.batchWrite()) { - VersionedClass first = new VersionedClass(id, "foo"); - VersionedClass second = new VersionedClass(nextId(), "foo"); + VersionedClass first = new VersionedClass("as-5279", "foo"); + VersionedClass second = new VersionedClass("as-5277", "foo"); assertThat(first.getVersion() == 0).isTrue(); assertThat(second.getVersion() == 0).isTrue(); assertThatThrownBy(() -> template.saveAll(List.of(first, first, second, second))) - .isInstanceOf(AerospikeException.BatchRecordArray.class) - .hasMessageContaining("Errors during batch save"); + .isInstanceOf(OptimisticLockingFailureException.class) + .hasMessageFindingMatch("Failed to save the record with ID .* due to versions mismatch"); assertThat(first.getVersion() == 1).isTrue(); assertThat(second.getVersion() == 1).isTrue(); diff --git a/src/test/java/org/springframework/data/aerospike/core/AerospikeTemplateUpdateTests.java b/src/test/java/org/springframework/data/aerospike/core/AerospikeTemplateUpdateTests.java index 6cceab813..6700f4788 100644 --- a/src/test/java/org/springframework/data/aerospike/core/AerospikeTemplateUpdateTests.java +++ b/src/test/java/org/springframework/data/aerospike/core/AerospikeTemplateUpdateTests.java @@ -418,8 +418,8 @@ public void updateAllShouldThrowExceptionOnUpdateForNonExistingKey() { // RecordExistsAction.UPDATE_ONLY assertThatThrownBy(() -> template.updateAll(List.of(first, second))) // An attempt to update versioned // documents without already existing DB records results in getting BatchRecordArray exception - .isInstanceOf(AerospikeException.BatchRecordArray.class) - .hasMessageContaining("Errors during batch update"); + .isInstanceOf(OptimisticLockingFailureException.class) + .hasMessageContaining("Failed to update the record with ID 'newId2' due to versions mismatch"); assertThat(first.getVersion() == 2).isTrue(); // This document's version gets updated after it is read // from the corresponding DB record assertThat(second.getVersion() == 0).isTrue(); // This document's version stays equal to zero as there is @@ -428,6 +428,16 @@ public void updateAllShouldThrowExceptionOnUpdateForNonExistingKey() { assertThat(template.findById(first.getId(), VersionedClass.class)).isEqualTo(first); assertThat(template.findById(second.getId(), VersionedClass.class)).isNull(); + Person firstPerson = new Person("newId1", "foo"); + Person secondPerson = new Person("newId2", "bar"); // + template.insert(firstPerson); + // RecordExistsAction.UPDATE_ONLY + assertThatThrownBy(() -> 
template.updateAll(List.of(firstPerson, secondPerson))) + .isInstanceOf(AerospikeException.BatchRecordArray.class) + .hasMessageContaining("Errors during batch update"); + + assertThat(template.findById(firstPerson.getId(), Person.class)).isEqualTo(firstPerson); + assertThat(template.findById(secondPerson.getId(), Person.class)).isNull(); } } diff --git a/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateDeleteRelatedTests.java b/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateDeleteRelatedTests.java index 765b671a6..092b87dd3 100644 --- a/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateDeleteRelatedTests.java +++ b/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateDeleteRelatedTests.java @@ -4,6 +4,7 @@ import com.aerospike.client.policy.GenerationPolicy; import com.aerospike.client.policy.WritePolicy; import org.junit.jupiter.api.Test; +import org.springframework.dao.OptimisticLockingFailureException; import org.springframework.data.aerospike.BaseReactiveIntegrationTests; import org.springframework.data.aerospike.core.ReactiveAerospikeTemplate; import org.springframework.data.aerospike.core.model.GroupedKeys; @@ -19,6 +20,7 @@ import java.util.Map; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.springframework.data.aerospike.query.cache.IndexRefresher.INDEX_CACHE_REFRESH_SECONDS; import static org.springframework.data.aerospike.sample.SampleClasses.VersionedClass; @@ -31,23 +33,6 @@ // this test class does not require secondary indexes created on startup public class ReactiveAerospikeTemplateDeleteRelatedTests extends BaseReactiveIntegrationTests { - @Test - public void deleteByObject_ignoresDocumentVersionEvenIfDefaultGenerationPolicyIsSet() { - WritePolicy writePolicyDefault = reactorClient.getWritePolicyDefault(); - GenerationPolicy initialGenerationPolicy = writePolicyDefault.generationPolicy; - writePolicyDefault.generationPolicy = GenerationPolicy.EXPECT_GEN_EQUAL; - try { - VersionedClass initialDocument = new VersionedClass(id, "a"); - reactiveTemplate.insert(initialDocument).block(); - reactiveTemplate.update(new VersionedClass(id, "b", initialDocument.getVersion())).block(); - - Mono deleted = reactiveTemplate.delete(initialDocument).subscribeOn(Schedulers.parallel()); - StepVerifier.create(deleted).expectNext(true).verifyComplete(); - } finally { - writePolicyDefault.generationPolicy = initialGenerationPolicy; - } - } - @Test public void deleteByObject_ignoresVersionEvenIfDefaultGenerationPolicyIsSet() { WritePolicy writePolicyDefault = reactorClient.getWritePolicyDefault(); @@ -117,13 +102,26 @@ public void simpleDeleteByObjectWithSetName() { StepVerifier.create(result).expectComplete().verify(); } + @Test + public void deleteByObject_VersionsMismatch() { + VersionedClass versionedDocument = new VersionedClass(nextId(), "test"); + + reactiveTemplate.insert(versionedDocument).block(); + versionedDocument.setVersion(2); + assertThatThrownBy(() -> reactiveTemplate.delete(versionedDocument).block()) + .isInstanceOf(OptimisticLockingFailureException.class) + .hasMessage("Failed to delete record due to versions mismatch"); + } + @Test public void deleteById_shouldReturnFalseIfValueIsAbsent() { // when Mono deleted = reactiveTemplate.deleteById(id, Person.class).subscribeOn(Schedulers.parallel()); - // then 
StepVerifier.create(deleted).expectComplete().verify(); + + assertThat(reactiveTemplate.delete(new Person(id, "QLastName", 21)).block()).isFalse(); + assertThat(reactiveTemplate.delete(new VersionedClass(nextId(), "test")).block()).isFalse(); } @Test @@ -144,11 +142,11 @@ public void deleteByObject_shouldReturnFalseIfValueIsAbsent() { Mono deleted = reactiveTemplate.delete(person).subscribeOn(Schedulers.parallel()); // then - StepVerifier.create(deleted).expectComplete().verify(); + StepVerifier.create(deleted).expectNext(false); } @Test - public void deleteAll_ShouldDeleteAllDocuments() { + public void deleteByIds_ShouldDeleteAllDocuments() { // batch delete operations are supported starting with Server version 6.0+ if (serverVersionSupport.batchWrite()) { String id1 = nextId(); @@ -176,7 +174,7 @@ public void deleteAll_ShouldDeleteAllDocuments() { } @Test - public void deleteAllWithSetName_ShouldDeleteAllDocuments() { + public void deleteByIdsWithSetName_ShouldDeleteAllDocuments() { // batch delete operations are supported starting with Server version 6.0+ if (serverVersionSupport.batchWrite()) { String id1 = nextId(); @@ -195,7 +193,7 @@ public void deleteAllWithSetName_ShouldDeleteAllDocuments() { } @Test - public void deleteAllFromDifferentSets_ShouldDeleteAllDocuments() { + public void deleteByIdsFromDifferentSets_ShouldDeleteAllDocuments() { // batch delete operations are supported starting with Server version 6.0+ if (serverVersionSupport.batchWrite()) { SampleClasses.DocumentWithExpiration entity1_1 = new SampleClasses.DocumentWithExpiration(id); @@ -238,7 +236,7 @@ public void deleteAllFromDifferentSets_ShouldDeleteAllDocuments() { } @Test - public void deleteAll_rejectsDuplicateIds() { + public void deleteByIds_rejectsDuplicateIds() { // batch write operations are supported starting with Server version 6.0+ if (serverVersionSupport.batchWrite()) { String id1 = nextId(); @@ -253,4 +251,85 @@ public void deleteAll_rejectsDuplicateIds() { .verify(); } } + + @Test + public void deleteAll_ShouldDeleteAllDocuments() { + // batch delete operations are supported starting with Server version 6.0+ + if (serverVersionSupport.batchWrite()) { + String id1 = nextId(); + String id2 = nextId(); + SampleClasses.DocumentWithExpiration document1 = new SampleClasses.DocumentWithExpiration(id1); + SampleClasses.DocumentWithExpiration document2 = new SampleClasses.DocumentWithExpiration(id2); + reactiveTemplate.saveAll(List.of(document1, document2)).blockLast(); + + List ids = List.of(id1, id2); + reactiveTemplate.deleteAll(List.of(document1, document2)).block(); + + List list = reactiveTemplate.findByIds(ids, + SampleClasses.VersionedClass.class).subscribeOn(Schedulers.parallel()).collectList().block(); + assertThat(list).isEmpty(); + + List persons = additionalAerospikeTestOperations.saveGeneratedPersons(101); + ids = persons.stream().map(Person::getId).toList(); + reactiveTemplate.deleteAll(persons).block(); + assertThat(reactiveTemplate.findByIds(ids, Person.class).collectList().block()).hasSize(0); + + List persons2 = additionalAerospikeTestOperations.saveGeneratedPersons(1001); + ids = persons2.stream().map(Person::getId).toList(); + reactiveTemplate.deleteAll(persons2).block(); + assertThat(reactiveTemplate.findByIds(ids, Person.class).collectList().block()).hasSize(0); + } + } + + @Test + public void deleteAllWithSetName_ShouldDeleteAllDocuments() { + // batch delete operations are supported starting with Server version 6.0+ + if (serverVersionSupport.batchWrite()) { + String id1 = nextId(); 
+ String id2 = nextId(); + SampleClasses.DocumentWithExpiration document1 = new SampleClasses.DocumentWithExpiration(id1); + SampleClasses.DocumentWithExpiration document2 = new SampleClasses.DocumentWithExpiration(id2); + reactiveTemplate.saveAll(List.of(document1, document2), OVERRIDE_SET_NAME).blockLast(); + + reactiveTemplate.deleteAll(List.of(document1, document2), OVERRIDE_SET_NAME).block(); + List ids = List.of(id1, id2); + List list = reactiveTemplate.findByIds(ids, + SampleClasses.DocumentWithExpiration.class, OVERRIDE_SET_NAME) + .subscribeOn(Schedulers.parallel()).collectList().block(); + assertThat(list).isEmpty(); + } + } + + @Test + public void deleteAll_rejectsDuplicateIds() { + // batch write operations are supported starting with Server version 6.0+ + if (serverVersionSupport.batchWrite()) { + String id1 = nextId(); + SampleClasses.DocumentWithExpiration document1 = new SampleClasses.DocumentWithExpiration(id1); + SampleClasses.DocumentWithExpiration document2 = new SampleClasses.DocumentWithExpiration(id1); + reactiveTemplate.saveAll(List.of(document1, document2)).blockLast(); + + StepVerifier.create(reactiveTemplate.deleteAll(List.of(document1, document2))) + .expectError(AerospikeException.BatchRecordArray.class) + .verify(); + } + } + + @Test + public void deleteAll_VersionsMismatch() { + // batch delete operations are supported starting with Server version 6.0+ + if (serverVersionSupport.batchWrite()) { + String id1 = "id1"; + VersionedClass document1 = new VersionedClass(id1, "test1"); + String id2 = "id2"; + VersionedClass document2 = new VersionedClass(id2, "test2"); + reactiveTemplate.save(document1).block(); + reactiveTemplate.save(document2).block(); + + document2.setVersion(232); + assertThatThrownBy(() -> reactiveTemplate.deleteAll(List.of(document1, document2)).block()) + .isInstanceOf(OptimisticLockingFailureException.class) + .hasMessageContaining("Failed to delete the record with ID 'id2' due to versions mismatch"); + } + } } diff --git a/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateSaveRelatedTests.java b/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateSaveRelatedTests.java index b3d598d41..bc235cc6d 100644 --- a/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateSaveRelatedTests.java +++ b/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateSaveRelatedTests.java @@ -1,10 +1,8 @@ package org.springframework.data.aerospike.core.reactive; -import com.aerospike.client.AerospikeException; import com.aerospike.client.Key; import com.aerospike.client.policy.Policy; import org.junit.jupiter.api.Test; -import org.springframework.dao.DataRetrievalFailureException; import org.springframework.dao.OptimisticLockingFailureException; import org.springframework.data.aerospike.BaseReactiveIntegrationTests; import org.springframework.data.aerospike.core.ReactiveAerospikeTemplate; @@ -75,7 +73,7 @@ public void save_shouldSaveDocumentWithEqualVersion() { public void save_shouldFailSaveNewDocumentWithVersionGreaterThanZero() { StepVerifier.create(reactiveTemplate.save(new VersionedClass(id, "foo", 5L)) .subscribeOn(Schedulers.parallel())) - .expectError(DataRetrievalFailureException.class) + .expectError(OptimisticLockingFailureException.class) .verify(); } @@ -295,9 +293,8 @@ public void saveAll_rejectsDuplicateId() { // batch delete operations are supported starting with Server version 6.0+ if 
(serverVersionSupport.batchWrite()) { VersionedClass first = new VersionedClass(id, "foo"); - StepVerifier.create(reactiveTemplate.saveAll(List.of(first, first))) - .expectError(AerospikeException.BatchRecordArray.class) + .expectError(OptimisticLockingFailureException.class) .verify(); reactiveTemplate.delete(findById(id, VersionedClass.class)).block(); // cleanup } @@ -307,10 +304,10 @@ public void saveAll_rejectsDuplicateId() { public void saveAllWithSetName_rejectsDuplicateId() { // batch delete operations are supported starting with Server version 6.0+ if (serverVersionSupport.batchWrite()) { - VersionedClass first = new VersionedClass(id, "foo"); + VersionedClass second = new VersionedClass(id, "foo"); - StepVerifier.create(reactiveTemplate.saveAll(List.of(first, first), OVERRIDE_SET_NAME)) - .expectError(AerospikeException.BatchRecordArray.class) + StepVerifier.create(reactiveTemplate.saveAll(List.of(second, second), OVERRIDE_SET_NAME)) + .expectError(OptimisticLockingFailureException.class) .verify(); reactiveTemplate.delete(findById(id, VersionedClass.class, OVERRIDE_SET_NAME), OVERRIDE_SET_NAME) .block(); // cleanup diff --git a/src/test/java/org/springframework/data/aerospike/repository/support/SimpleAerospikeRepositoryTest.java b/src/test/java/org/springframework/data/aerospike/repository/support/SimpleAerospikeRepositoryTest.java index 4a985d99c..302df0428 100644 --- a/src/test/java/org/springframework/data/aerospike/repository/support/SimpleAerospikeRepositoryTest.java +++ b/src/test/java/org/springframework/data/aerospike/repository/support/SimpleAerospikeRepositoryTest.java @@ -36,17 +36,14 @@ import org.springframework.data.repository.core.EntityInformation; import org.springframework.test.context.TestPropertySource; -import java.lang.reflect.Field; import java.util.ArrayList; import java.util.List; import java.util.Optional; import static java.util.stream.Collectors.toList; import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.anyString; import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.springframework.data.aerospike.query.cache.IndexRefresher.INDEX_CACHE_REFRESH_SECONDS; @@ -188,24 +185,14 @@ public void deleteAllById() { } @Test - public void deleteAllIterable() throws NoSuchFieldException, IllegalAccessException { - Field field = aerospikeRepository.getClass().getDeclaredField("entityInformation"); - field.setAccessible(true); - EntityInformation entityInformation = mock(EntityInformation.class); - field.set(aerospikeRepository, entityInformation); - when(entityInformation.getId(any(Person.class))).thenReturn(testPerson.getId()); - when(entityInformation.getJavaType()).thenReturn(Person.class); - + public void deleteAllIterable() { aerospikeRepository.deleteAll(List.of(testPerson)); - List ids = List.of(testPerson.getId()); - - verify(operations).deleteByIds(ids, Person.class); + verify(operations).deleteAll(List.of(testPerson)); } @Test public void deleteAll() { aerospikeRepository.deleteAll(); - verify(operations).deleteAll(Person.class); } diff --git a/src/test/java/org/springframework/data/aerospike/repository/support/SimpleReactiveAerospikeRepositoryTest.java b/src/test/java/org/springframework/data/aerospike/repository/support/SimpleReactiveAerospikeRepositoryTest.java index 67c41808e..565036760 100644 --- 
a/src/test/java/org/springframework/data/aerospike/repository/support/SimpleReactiveAerospikeRepositoryTest.java +++ b/src/test/java/org/springframework/data/aerospike/repository/support/SimpleReactiveAerospikeRepositoryTest.java @@ -224,18 +224,9 @@ public void testDeleteAllById() throws NoSuchFieldException, IllegalAccessExcept } @Test - public void testDeleteAllIterable() throws NoSuchFieldException, IllegalAccessException { - Field field = repository.getClass().getDeclaredField("entityInformation"); - field.setAccessible(true); - EntityInformation entityInformation = mock(EntityInformation.class); - field.set(repository, entityInformation); - when(entityInformation.getJavaType()).thenReturn(Customer.class); - when(entityInformation.getId(any(Customer.class))).thenReturn(testCustomer.getId()); - + public void testDeleteAllIterable() { repository.deleteAll(List.of(testCustomer)); - - List ids = List.of(testCustomer.getId()); - verify(operations).deleteByIds(ids, Customer.class); + verify(operations).deleteAll(List.of(testCustomer)); } @Test diff --git a/src/test/java/org/springframework/data/aerospike/utility/IndexUtils.java b/src/test/java/org/springframework/data/aerospike/utility/IndexUtils.java index 5ce58cba3..7848d074c 100644 --- a/src/test/java/org/springframework/data/aerospike/utility/IndexUtils.java +++ b/src/test/java/org/springframework/data/aerospike/utility/IndexUtils.java @@ -45,7 +45,8 @@ static void createIndex(IAerospikeClient client, ServerVersionSupport serverVers public static List getIndexes(IAerospikeClient client, String namespace, IndexInfoParser indexInfoParser) { Node node = client.getCluster().getRandomNode(); - String response = Info.request(node, "sindex-list:ns=" + namespace + ";b64=true"); + String response = Info.request(client.getInfoPolicyDefault(), + node, "sindex-list:ns=" + namespace + ";b64=true"); return Arrays.stream(response.split(";")) .map(indexInfoParser::parse) .collect(Collectors.toList()); @@ -57,7 +58,8 @@ public static List getIndexes(IAerospikeClient client, String namespace, */ public static boolean indexExists(IAerospikeClient client, String namespace, String indexName) { Node node = client.getCluster().getRandomNode(); - String response = Info.request(node, "sindex/" + namespace + '/' + indexName); + String response = Info.request(client.getInfoPolicyDefault(), + node, "sindex/" + namespace + '/' + indexName); return !response.startsWith("FAIL:201"); }
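The following sketch summarizes the delete semantics introduced by this patch, mirroring the integration tests above. AerospikeTemplate, VersionedClass, deleteById and OptimisticLockingFailureException come from the patch itself; the wrapper class, method name and 'template' parameter are assumptions made only for illustration.

import org.springframework.dao.OptimisticLockingFailureException;
import org.springframework.data.aerospike.core.AerospikeTemplate;
import org.springframework.data.aerospike.sample.SampleClasses.VersionedClass;

class VersionedDeleteSketch {

    // 'template' is assumed to be a configured AerospikeTemplate, as in the integration tests above
    void deleteVersionedDocument(AerospikeTemplate template, String id) {
        VersionedClass doc = new VersionedClass(id, "test");
        template.insert(doc);                           // server generation is written back into doc.version (now 1)
        doc.setVersion(2);                              // make the in-memory version stale on purpose
        try {
            template.delete(doc);                       // the document's version is compared with the record on server
        } catch (OptimisticLockingFailureException e) {
            // "Failed to delete record due to versions mismatch"
        }
        template.deleteById(id, VersionedClass.class);  // id-based delete performs no version check
    }
}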