From 37bcc71782862066403b4cbf7f77f6169b4e5111 Mon Sep 17 00:00:00 2001 From: Andrey G Date: Sun, 8 Oct 2023 16:58:24 +0300 Subject: [PATCH] FMWK-247 Add support for batch write operations (#640) --- .../core/AerospikeInternalOperations.java | 2 + .../aerospike/core/AerospikeOperations.java | 59 +++++- .../aerospike/core/AerospikeTemplate.java | 180 ++++++++++++------ .../aerospike/core/BaseAerospikeTemplate.java | 138 ++++++++++++++ .../DefaultAerospikeExceptionTranslator.java | 39 ++-- .../core/ReactiveAerospikeOperations.java | 58 +++++- .../core/ReactiveAerospikeTemplate.java | 106 +++++++++-- .../IndexAlreadyExistsException.java | 2 +- .../IndexNotFoundException.java | 4 +- ...dAerospikeDataAccessApiUsageException.java | 2 +- .../QualifierException.java | 2 +- ...erospikePersistenceEntityIndexCreator.java | 2 +- ...erospikePersistenceEntityIndexCreator.java | 2 +- .../data/aerospike/query/ExpiryQualifier.java | 1 + .../query/LatestUpdateQualifier.java | 1 + .../support/SimpleAerospikeRepository.java | 20 +- .../SimpleReactiveAerospikeRepository.java | 2 +- .../BaseBlockingIntegrationTests.java | 2 +- ...activeBlockingAerospikeTestOperations.java | 28 ++- .../core/AerospikeTemplateDeleteTests.java | 38 +++- ...ikeTemplateFindByQueryProjectionTests.java | 14 +- .../AerospikeTemplateFindByQueryTests.java | 21 +- .../core/AerospikeTemplateIndexTests.java | 14 +- .../core/AerospikeTemplateInsertTests.java | 58 ++++-- .../core/AerospikeTemplateSaveTests.java | 85 ++++++++- .../core/AerospikeTemplateUpdateTests.java | 41 ++++ ...faultAerospikeExceptionTranslatorTest.java | 4 +- ...veAerospikeTemplateDeleteRelatedTests.java | 65 +++++++ .../ReactiveAerospikeTemplateIndexTests.java | 18 +- .../ReactiveAerospikeTemplateInsertTests.java | 43 +++-- ...tiveAerospikeTemplateSaveRelatedTests.java | 33 ++++ .../ReactiveAerospikeTemplateUpdateTests.java | 41 ++++ ...pikePersistenceEntityIndexCreatorTest.java | 2 +- ...pikePersistenceEntityIndexCreatorTest.java | 2 +- .../query/IndexedQualifierTests.java | 4 +- ...tiveIndexedPersonRepositoryQueryTests.java | 12 +- .../IndexedPersonRepositoryQueryTests.java | 6 +- .../PersonRepositoryQueryTests.java | 61 +++--- .../SimpleAerospikeRepositoryTest.java | 9 +- ...SimpleReactiveAerospikeRepositoryTest.java | 6 +- .../AdditionalAerospikeTestOperations.java | 24 +++ .../data/aerospike/utility/IndexUtils.java | 36 +--- .../aerospike/utility/ServerVersionUtils.java | 40 ++++ 43 files changed, 1045 insertions(+), 282 deletions(-) rename src/main/java/org/springframework/data/aerospike/{ => exceptions}/IndexAlreadyExistsException.java (95%) rename src/main/java/org/springframework/data/aerospike/{ => exceptions}/IndexNotFoundException.java (93%) rename src/main/java/org/springframework/data/aerospike/{ => exceptions}/InvalidAerospikeDataAccessApiUsageException.java (95%) rename src/main/java/org/springframework/data/aerospike/{query => exceptions}/QualifierException.java (94%) create mode 100644 src/test/java/org/springframework/data/aerospike/utility/ServerVersionUtils.java diff --git a/src/main/java/org/springframework/data/aerospike/core/AerospikeInternalOperations.java b/src/main/java/org/springframework/data/aerospike/core/AerospikeInternalOperations.java index 47868ab2f..1d14e9d75 100644 --- a/src/main/java/org/springframework/data/aerospike/core/AerospikeInternalOperations.java +++ b/src/main/java/org/springframework/data/aerospike/core/AerospikeInternalOperations.java @@ -1,5 +1,6 @@ package org.springframework.data.aerospike.core; +import com.aerospike.client.AerospikeException; import org.springframework.data.aerospike.query.Qualifier; import java.util.Collection; @@ -42,6 +43,7 @@ List findByIdsInternal(Collection ids, Class entityClass, Class< * * @param ids The ids of the documents to delete. Must not be {@literal null}. * @param entityClass The class to extract the Aerospike set from. Must not be {@literal null}. + * @throws AerospikeException.BatchRecordArray if batch delete results contain errors or null records */ void deleteByIdsInternal(Collection ids, Class entityClass); } diff --git a/src/main/java/org/springframework/data/aerospike/core/AerospikeOperations.java b/src/main/java/org/springframework/data/aerospike/core/AerospikeOperations.java index 120c9cd42..58dca39ef 100644 --- a/src/main/java/org/springframework/data/aerospike/core/AerospikeOperations.java +++ b/src/main/java/org/springframework/data/aerospike/core/AerospikeOperations.java @@ -15,6 +15,7 @@ */ package org.springframework.data.aerospike.core; +import com.aerospike.client.AerospikeException; import com.aerospike.client.IAerospikeClient; import com.aerospike.client.Value; import com.aerospike.client.cdt.CTX; @@ -73,6 +74,21 @@ public interface AerospikeOperations { */ void insert(T document); + /** + * Insert multiple documents in one batch request. The policies are analogous to {@link #insert(Object)}. + *

+ * The order of returned results is preserved. The execution order is NOT preserved. + *

+ * This operation requires Server version 6.0+. + * + * @param documents Documents to insert. Must not be {@literal null}. + * @throws AerospikeException.BatchRecordArray if batch insert succeeds, but results contain errors or null + * records + * @throws org.springframework.dao.DataAccessException if batch operation failed (see + * {@link DefaultAerospikeExceptionTranslator} for details) + */ + void insertAll(Iterable documents); + /** * Save a document. *

@@ -93,6 +109,21 @@ public interface AerospikeOperations { */ void save(T document); + /** + * Save multiple documents in one batch request. The policies are analogous to {@link #save(Object)}. + *

+ * The order of returned results is preserved. The execution order is NOT preserved. + *

+ * This operation requires Server version 6.0+. + * + * @param documents Documents to save. Must not be {@literal null}. + * @throws AerospikeException.BatchRecordArray if batch save succeeds, but results contain errors or null + * records + * @throws org.springframework.dao.DataAccessException if batch operation failed (see + * {@link DefaultAerospikeExceptionTranslator} for details) + */ + void saveAll(Iterable documents); + /** * Persist a document using specified WritePolicy. * @@ -102,13 +133,6 @@ public interface AerospikeOperations { */ void persist(T document, WritePolicy writePolicy); - /** - * Insert each document of the given documents using single insert operations. - * - * @param documents The documents to insert. Must not be {@literal null}. - */ - void insertAll(Collection documents); - /** * Update a document using {@link com.aerospike.client.policy.RecordExistsAction#UPDATE_ONLY} policy combined with * removing bins at first (analogous to {@link com.aerospike.client.policy.RecordExistsAction#REPLACE_ONLY}) taking @@ -132,6 +156,21 @@ public interface AerospikeOperations { */ void update(T document, Collection fields); + /** + * Update multiple documents in one batch request. The policies are analogous to {@link #update(Object)}. + *

+ * The order of returned results is preserved. The execution order is NOT preserved. + *

+ * This operation requires Server version 6.0+. + * + * @param documents Documents to update. Must not be {@literal null}. + * @throws AerospikeException.BatchRecordArray if batch update succeeds, but results contain errors or null + * records + * @throws org.springframework.dao.DataAccessException if batch operation failed (see + * {@link DefaultAerospikeExceptionTranslator} for details) + */ + void updateAll(Iterable documents); + /** * Truncate/Delete all the documents in the given entity's set. * @@ -165,6 +204,9 @@ public interface AerospikeOperations { * @param ids The ids of the documents to delete. Must not be {@literal null}. * @param entityClass The class to extract the Aerospike set from and to map the documents to. Must not be * {@literal null}. + * @throws AerospikeException.BatchRecordArray if batch delete results contain errors + * @throws org.springframework.dao.DataAccessException if batch operation failed (see + * {@link DefaultAerospikeExceptionTranslator} for details) */ void deleteByIds(Iterable ids, Class entityClass); @@ -178,6 +220,9 @@ public interface AerospikeOperations { * This operation requires Server version 6.0+. * * @param groupedKeys Must not be {@literal null}. + * @throws AerospikeException.BatchRecordArray if batch delete results contain errors + * @throws org.springframework.dao.DataAccessException if batch operation failed (see + * {@link DefaultAerospikeExceptionTranslator} for details) */ void deleteByIds(GroupedKeys groupedKeys); diff --git a/src/main/java/org/springframework/data/aerospike/core/AerospikeTemplate.java b/src/main/java/org/springframework/data/aerospike/core/AerospikeTemplate.java index fb8f7806d..f99040400 100644 --- a/src/main/java/org/springframework/data/aerospike/core/AerospikeTemplate.java +++ b/src/main/java/org/springframework/data/aerospike/core/AerospikeTemplate.java @@ -15,15 +15,8 @@ */ package org.springframework.data.aerospike.core; -import com.aerospike.client.AerospikeException; -import com.aerospike.client.Bin; -import com.aerospike.client.IAerospikeClient; -import com.aerospike.client.Info; -import com.aerospike.client.Key; -import com.aerospike.client.Operation; import com.aerospike.client.Record; -import com.aerospike.client.ResultCode; -import com.aerospike.client.Value; +import com.aerospike.client.*; import com.aerospike.client.cdt.CTX; import com.aerospike.client.cluster.Node; import com.aerospike.client.policy.BatchPolicy; @@ -66,7 +59,6 @@ import java.util.Comparator; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.function.Predicate; import java.util.function.Supplier; import java.util.regex.Matcher; @@ -237,6 +229,41 @@ public void save(T document) { } } + @Override + public void saveAll(Iterable documents) { + Assert.notNull(documents, "Documents for saving must not be null!"); + + List> batchWriteDataList = new ArrayList<>(); + documents.forEach(document -> batchWriteDataList.add(getBatchWriteForSave(document))); + + List batchWriteRecords = batchWriteDataList.stream().map(BatchWriteData::batchRecord).toList(); + try { + client.operate(null, batchWriteRecords); + } catch (AerospikeException e) { + throw translateError(e); + } + + checkForErrorsAndUpdateVersion(batchWriteDataList, batchWriteRecords, "save"); + } + + private void checkForErrorsAndUpdateVersion(List> batchWriteDataList, + List batchWriteRecords, String commandName) { + boolean errorsFound = false; + for (AerospikeTemplate.BatchWriteData data : batchWriteDataList) { + if (!errorsFound && batchRecordFailed(data.batchRecord())) { + errorsFound = true; + } + if (data.hasVersionProperty() && !batchRecordFailed(data.batchRecord())) { + updateVersion(data.document(), data.batchRecord().record); + } + } + + if (errorsFound) { + AerospikeException e = new AerospikeException("Errors during batch " + commandName); + throw new AerospikeException.BatchRecordArray(batchWriteRecords.toArray(BatchRecord[]::new), e); + } + } + @Override public void persist(T document, WritePolicy policy) { Assert.notNull(document, "Document must not be null!"); @@ -248,13 +275,6 @@ public void persist(T document, WritePolicy policy) { doPersistAndHandleError(data, policy, operations); } - @Override - public void insertAll(Collection documents) { - Assert.notNull(documents, "Documents must not be null!"); - - documents.stream().filter(Objects::nonNull).forEach(this::insert); - } - @Override public void insert(T document) { Assert.notNull(document, "Document must not be null!"); @@ -276,6 +296,23 @@ public void insert(T document) { } } + @Override + public void insertAll(Iterable documents) { + Assert.notNull(documents, "Documents for inserting must not be null!"); + + List> batchWriteDataList = new ArrayList<>(); + documents.forEach(document -> batchWriteDataList.add(getBatchWriteForInsert(document))); + + List batchWriteRecords = batchWriteDataList.stream().map(BatchWriteData::batchRecord).toList(); + try { + client.operate(null, batchWriteRecords); + } catch (AerospikeException e) { + throw translateError(e); + } + + checkForErrorsAndUpdateVersion(batchWriteDataList, batchWriteRecords, "insert"); + } + @Override public void update(T document) { Assert.notNull(document, "Document must not be null!"); @@ -315,6 +352,23 @@ public void update(T document, Collection fields) { } } + @Override + public void updateAll(Iterable documents) { + Assert.notNull(documents, "Documents for inserting must not be null!"); + + List> batchWriteDataList = new ArrayList<>(); + documents.forEach(document -> batchWriteDataList.add(getBatchWriteForUpdate(document))); + + List batchWriteRecords = batchWriteDataList.stream().map(BatchWriteData::batchRecord).toList(); + try { + client.operate(null, batchWriteRecords); + } catch (AerospikeException e) { + throw translateError(e); + } + + checkForErrorsAndUpdateVersion(batchWriteDataList, batchWriteRecords, "update"); + } + @Override public void delete(Class entityClass) { Assert.notNull(entityClass, "Class must not be null!"); @@ -363,6 +417,53 @@ public void deleteByIds(Iterable ids, Class entityClass) { deleteByIdsInternal(IterableConverter.toList(ids), entityClass); } + @Override + public void deleteByIdsInternal(Collection ids, Class entityClass) { + if (ids.isEmpty()) { + return; + } + + AerospikePersistentEntity entity = mappingContext.getRequiredPersistentEntity(entityClass); + + Key[] keys = ids.stream() + .map(id -> getKey(id, entity)) + .toArray(Key[]::new); + + checkForErrors(client, keys); + } + + private void checkForErrors(IAerospikeClient client, Key[] keys) { + BatchResults results; + try { + // requires server ver. >= 6.0.0 + results = client.delete(null, null, keys); + } catch (AerospikeException e) { + throw translateError(e); + } + + for (int i = 0; i < results.records.length; i++) { + BatchRecord record = results.records[i]; + if (batchRecordFailed(record)) { + throw new AerospikeException.BatchRecordArray(results.records, + new AerospikeException("Errors during batch delete")); + } + } + } + + @Override + public void deleteByIds(GroupedKeys groupedKeys) { + Assert.notNull(groupedKeys, "Grouped keys must not be null!"); + Assert.notNull(groupedKeys.getEntitiesKeys(), "Entities keys must not be null!"); + Assert.notEmpty(groupedKeys.getEntitiesKeys(), "Entities keys must not be empty!"); + + deleteEntitiesByIdsInternal(groupedKeys); + } + + private void deleteEntitiesByIdsInternal(GroupedKeys groupedKeys) { + EntitiesKeys entitiesKeys = EntitiesKeys.of(toEntitiesKeyMap(groupedKeys)); + checkForErrors(client, entitiesKeys.getKeys()); + } + @Override public boolean exists(Object id, Class entityClass) { Assert.notNull(id, "Id must not be null!"); @@ -471,26 +572,6 @@ public List findByIdsInternal(Collection ids, Class entityClass, } } - @Override - public void deleteByIdsInternal(Collection ids, Class entityClass) { - if (ids.isEmpty()) { - return; - } - - try { - AerospikePersistentEntity entity = mappingContext.getRequiredPersistentEntity(entityClass); - - Key[] keys = ids.stream() - .map(id -> getKey(id, entity)) - .toArray(Key[]::new); - - // requires server ver. >= 6.0.0 - client.delete(null, null, keys); - } catch (AerospikeException e) { - throw translateError(e); - } - } - Object getRecordMapToTargetClass(AerospikePersistentEntity entity, Key key, Class targetClass, Qualifier... qualifiers) { Record aeroRecord; @@ -603,22 +684,6 @@ private GroupedEntities findEntitiesByIdsInternal(GroupedKeys groupedKeys) { return toGroupedEntities(entitiesKeys, aeroRecords); } - @Override - public void deleteByIds(GroupedKeys groupedKeys) { - Assert.notNull(groupedKeys, "Grouped keys must not be null!"); - - if (groupedKeys.getEntitiesKeys() == null || groupedKeys.getEntitiesKeys().isEmpty()) { - return; - } - - deleteEntitiesByIdsInternal(groupedKeys); - } - - private void deleteEntitiesByIdsInternal(GroupedKeys groupedKeys) { - EntitiesKeys entitiesKeys = EntitiesKeys.of(toEntitiesKeyMap(groupedKeys)); - client.delete(null, null, entitiesKeys.getKeys()); - } - @Override public ResultSet aggregate(Filter filter, Class entityClass, String module, String function, List arguments) { @@ -887,16 +952,7 @@ private void doPersistWithVersionAndHandleError(T document, AerospikeWriteDa private Record putAndGetHeader(AerospikeWriteData data, WritePolicy policy, boolean firstlyDeleteBins) { Key key = data.getKey(); - Bin[] bins = data.getBinsAsArray(); - - if (bins.length == 0) { - throw new AerospikeException( - "Cannot put and get header on a document with no bins and \"@_class\" bin disabled."); - } - - Operation[] operations = firstlyDeleteBins ? operations(bins, Operation::put, - Operation.array(Operation.delete()), Operation.array(Operation.getHeader())) - : operations(bins, Operation::put, null, Operation.array(Operation.getHeader())); + Operation[] operations = getPutAndGetHeaderOperations(data, firstlyDeleteBins); return client.operate(policy, key, operations); } diff --git a/src/main/java/org/springframework/data/aerospike/core/BaseAerospikeTemplate.java b/src/main/java/org/springframework/data/aerospike/core/BaseAerospikeTemplate.java index 508b2014a..4d555a6da 100644 --- a/src/main/java/org/springframework/data/aerospike/core/BaseAerospikeTemplate.java +++ b/src/main/java/org/springframework/data/aerospike/core/BaseAerospikeTemplate.java @@ -16,10 +16,15 @@ package org.springframework.data.aerospike.core; import com.aerospike.client.AerospikeException; +import com.aerospike.client.BatchRecord; +import com.aerospike.client.BatchWrite; +import com.aerospike.client.Bin; import com.aerospike.client.Key; import com.aerospike.client.Log; +import com.aerospike.client.Operation; import com.aerospike.client.Record; import com.aerospike.client.ResultCode; +import com.aerospike.client.policy.BatchWritePolicy; import com.aerospike.client.policy.GenerationPolicy; import com.aerospike.client.policy.RecordExistsAction; import com.aerospike.client.policy.WritePolicy; @@ -54,6 +59,9 @@ import java.util.Objects; import java.util.stream.Collectors; import java.util.stream.IntStream; +import java.util.stream.Stream; + +import static org.springframework.data.aerospike.core.CoreUtils.operations; /** * Base class for creation Aerospike templates @@ -69,6 +77,7 @@ abstract class BaseAerospikeTemplate { protected final String namespace; protected final AerospikeExceptionTranslator exceptionTranslator; protected final WritePolicy writePolicyDefault; + protected final BatchWritePolicy batchWritePolicyDefault; BaseAerospikeTemplate(String namespace, MappingAerospikeConverter converter, @@ -84,10 +93,23 @@ abstract class BaseAerospikeTemplate { this.namespace = namespace; this.mappingContext = mappingContext; this.writePolicyDefault = writePolicyDefault; + this.batchWritePolicyDefault = getFromWritePolicy(writePolicyDefault); loggerSetup(); } + private BatchWritePolicy getFromWritePolicy(WritePolicy writePolicy) { + BatchWritePolicy batchWritePolicy = new BatchWritePolicy(); + batchWritePolicy.commitLevel = writePolicy.commitLevel; + batchWritePolicy.durableDelete = writePolicy.durableDelete; + batchWritePolicy.generationPolicy = writePolicy.generationPolicy; + batchWritePolicy.expiration = writePolicy.expiration; + batchWritePolicy.sendKey = writePolicy.sendKey; + batchWritePolicy.recordExistsAction = writePolicy.recordExistsAction; + batchWritePolicy.filterExp = writePolicy.filterExp; + return batchWritePolicy; + } + private void loggerSetup() { Logger log = LoggerFactory.getLogger("com.aerospike.client"); Log.setCallback((level, message) -> { @@ -195,6 +217,16 @@ WritePolicy expectGenerationCasAwareSavePolicy(AerospikeWriteData data) { return expectGenerationSavePolicy(data, recordExistsAction); } + BatchWritePolicy expectGenerationCasAwareSaveBatchPolicy(AerospikeWriteData data) { + RecordExistsAction recordExistsAction = data.getVersion() + .filter(v -> v > 0L) + .map(v -> RecordExistsAction.UPDATE_ONLY) // updating existing document with generation, + // cannot use REPLACE_ONLY due to bin convergence feature restrictions + .orElse(RecordExistsAction.CREATE_ONLY); // create new document, + // if exists we should fail with optimistic locking + return expectGenerationSaveBatchPolicy(data, recordExistsAction); + } + WritePolicy expectGenerationSavePolicy(AerospikeWriteData data, RecordExistsAction recordExistsAction) { return WritePolicyBuilder.builder(this.writePolicyDefault) .generationPolicy(GenerationPolicy.EXPECT_GEN_EQUAL) @@ -204,6 +236,15 @@ WritePolicy expectGenerationSavePolicy(AerospikeWriteData data, RecordExistsActi .build(); } + BatchWritePolicy expectGenerationSaveBatchPolicy(AerospikeWriteData data, RecordExistsAction recordExistsAction) { + BatchWritePolicy batchWritePolicy = new BatchWritePolicy(this.batchWritePolicyDefault); + batchWritePolicy.generationPolicy = GenerationPolicy.EXPECT_GEN_EQUAL; + batchWritePolicy.generation = data.getVersion().orElse(0); + batchWritePolicy.expiration = data.getExpiration(); + batchWritePolicy.recordExistsAction = recordExistsAction; + return batchWritePolicy; + } + WritePolicy ignoreGenerationSavePolicy(AerospikeWriteData data, RecordExistsAction recordExistsAction) { return WritePolicyBuilder.builder(this.writePolicyDefault) .generationPolicy(GenerationPolicy.NONE) @@ -212,6 +253,14 @@ WritePolicy ignoreGenerationSavePolicy(AerospikeWriteData data, RecordExistsActi .build(); } + BatchWritePolicy ignoreGenerationSaveBatchPolicy(AerospikeWriteData data, RecordExistsAction recordExistsAction) { + BatchWritePolicy batchWritePolicy = new BatchWritePolicy(this.batchWritePolicyDefault); + batchWritePolicy.generationPolicy = GenerationPolicy.NONE; + batchWritePolicy.expiration = data.getExpiration(); + batchWritePolicy.recordExistsAction = recordExistsAction; + return batchWritePolicy; + } + WritePolicy ignoreGenerationDeletePolicy() { return WritePolicyBuilder.builder(this.writePolicyDefault) .generationPolicy(GenerationPolicy.NONE) @@ -287,4 +336,93 @@ private S convertIfNecessary(Object source, Class type) { return type.isAssignableFrom(source.getClass()) ? (S) source : converter.getConversionService().convert(source, type); } + + protected record BatchWriteData(T document, BatchRecord batchRecord, boolean hasVersionProperty) { + + } + + protected Operation[] getPutAndGetHeaderOperations(AerospikeWriteData data, boolean firstlyDeleteBins) { + Bin[] bins = data.getBinsAsArray(); + + if (bins.length == 0) { + throw new AerospikeException( + "Cannot put and get header on a document with no bins and \"@_class\" bin disabled."); + } + + return firstlyDeleteBins ? operations(bins, Operation::put, + Operation.array(Operation.delete()), Operation.array(Operation.getHeader())) + : operations(bins, Operation::put, null, Operation.array(Operation.getHeader())); + } + + public BatchWriteData getBatchWriteForSave(T document) { + Assert.notNull(document, "Document must not be null!"); + + AerospikeWriteData data = writeData(document); + + AerospikePersistentEntity entity = mappingContext.getRequiredPersistentEntity(document.getClass()); + Operation[] operations; + BatchWritePolicy policy; + if (entity.hasVersionProperty()) { + policy = expectGenerationCasAwareSaveBatchPolicy(data); + + // mimicking REPLACE behavior by firstly deleting bins due to bin convergence feature restrictions + operations = getPutAndGetHeaderOperations(data, true); + } else { + policy = ignoreGenerationSaveBatchPolicy(data, RecordExistsAction.UPDATE); + + // mimicking REPLACE behavior by firstly deleting bins due to bin convergence feature restrictions + operations = operations(data.getBinsAsArray(), Operation::put, + Operation.array(Operation.delete())); + } + + return new BatchWriteData<>(document, new BatchWrite(policy, data.getKey(), operations), + entity.hasVersionProperty()); + } + + public BatchWriteData getBatchWriteForInsert(T document) { + Assert.notNull(document, "Document must not be null!"); + + AerospikeWriteData data = writeData(document); + + AerospikePersistentEntity entity = mappingContext.getRequiredPersistentEntity(document.getClass()); + Operation[] operations; + BatchWritePolicy policy = ignoreGenerationSaveBatchPolicy(data, RecordExistsAction.CREATE_ONLY); + if (entity.hasVersionProperty()) { + operations = getPutAndGetHeaderOperations(data, false); + } else { + operations = operations(data.getBinsAsArray(), Operation::put); + } + + return new BatchWriteData(document, new BatchWrite(policy, data.getKey(), operations), + entity.hasVersionProperty()); + } + + public BatchWriteData getBatchWriteForUpdate(T document) { + Assert.notNull(document, "Document must not be null!"); + + AerospikeWriteData data = writeData(document); + + AerospikePersistentEntity entity = mappingContext.getRequiredPersistentEntity(document.getClass()); + Operation[] operations; + BatchWritePolicy policy; + if (entity.hasVersionProperty()) { + policy = expectGenerationSaveBatchPolicy(data, RecordExistsAction.UPDATE_ONLY); + + // mimicking REPLACE_ONLY behavior by firstly deleting bins due to bin convergence feature restrictions + operations = getPutAndGetHeaderOperations(data, true); + } else { + policy = ignoreGenerationSaveBatchPolicy(data, RecordExistsAction.UPDATE_ONLY); + + // mimicking REPLACE_ONLY behavior by firstly deleting bins due to bin convergence feature restrictions + operations = Stream.concat(Stream.of(Operation.delete()), data.getBins().stream() + .map(Operation::put)).toArray(Operation[]::new); + } + + return new BatchWriteData(document, new BatchWrite(policy, data.getKey(), operations), + entity.hasVersionProperty()); + } + + protected boolean batchRecordFailed(BatchRecord batchRecord) { + return batchRecord.resultCode != ResultCode.OK || batchRecord.record == null; + } } diff --git a/src/main/java/org/springframework/data/aerospike/core/DefaultAerospikeExceptionTranslator.java b/src/main/java/org/springframework/data/aerospike/core/DefaultAerospikeExceptionTranslator.java index b2ed46e90..ad8f31125 100644 --- a/src/main/java/org/springframework/data/aerospike/core/DefaultAerospikeExceptionTranslator.java +++ b/src/main/java/org/springframework/data/aerospike/core/DefaultAerospikeExceptionTranslator.java @@ -24,8 +24,8 @@ import org.springframework.dao.QueryTimeoutException; import org.springframework.dao.RecoverableDataAccessException; import org.springframework.dao.TransientDataAccessResourceException; -import org.springframework.data.aerospike.IndexAlreadyExistsException; -import org.springframework.data.aerospike.IndexNotFoundException; +import org.springframework.data.aerospike.exceptions.IndexAlreadyExistsException; +import org.springframework.data.aerospike.exceptions.IndexNotFoundException; /** * This class translates the AerospikeException and result code to a DataAccessException. @@ -48,30 +48,17 @@ public DataAccessException translateExceptionIfPossible(RuntimeException cause) return new QueryTimeoutException(msg, cause); } } - switch (resultCode) { - /* - * Future enhancements will be more elaborate - */ - case ResultCode.PARAMETER_ERROR: - return new InvalidDataAccessApiUsageException(msg, cause); - case ResultCode.KEY_EXISTS_ERROR: - return new DuplicateKeyException(msg, cause); - case ResultCode.KEY_NOT_FOUND_ERROR: - return new DataRetrievalFailureException(msg, cause); - case ResultCode.INDEX_NOTFOUND: - return new IndexNotFoundException(msg, cause); - case ResultCode.INDEX_ALREADY_EXISTS: - return new IndexAlreadyExistsException(msg, cause); - case ResultCode.TIMEOUT: - case ResultCode.QUERY_TIMEOUT: - return new QueryTimeoutException(msg, cause); - case ResultCode.DEVICE_OVERLOAD: - case ResultCode.NO_MORE_CONNECTIONS: - case ResultCode.KEY_BUSY: - return new TransientDataAccessResourceException(msg, cause); - default: - return new RecoverableDataAccessException(msg, cause); - } + return switch (resultCode) { + case ResultCode.PARAMETER_ERROR -> new InvalidDataAccessApiUsageException(msg, cause); + case ResultCode.KEY_EXISTS_ERROR -> new DuplicateKeyException(msg, cause); + case ResultCode.KEY_NOT_FOUND_ERROR -> new DataRetrievalFailureException(msg, cause); + case ResultCode.INDEX_NOTFOUND -> new IndexNotFoundException(msg, cause); + case ResultCode.INDEX_ALREADY_EXISTS -> new IndexAlreadyExistsException(msg, cause); + case ResultCode.TIMEOUT, ResultCode.QUERY_TIMEOUT -> new QueryTimeoutException(msg, cause); + case ResultCode.DEVICE_OVERLOAD, ResultCode.NO_MORE_CONNECTIONS, ResultCode.KEY_BUSY -> + new TransientDataAccessResourceException(msg, cause); + default -> new RecoverableDataAccessException(msg, cause); + }; } // we should not convert exceptions that spring-data-aerospike does not recognise. return null; diff --git a/src/main/java/org/springframework/data/aerospike/core/ReactiveAerospikeOperations.java b/src/main/java/org/springframework/data/aerospike/core/ReactiveAerospikeOperations.java index a2cdec23a..3b69c2901 100644 --- a/src/main/java/org/springframework/data/aerospike/core/ReactiveAerospikeOperations.java +++ b/src/main/java/org/springframework/data/aerospike/core/ReactiveAerospikeOperations.java @@ -15,6 +15,7 @@ */ package org.springframework.data.aerospike.core; +import com.aerospike.client.AerospikeException; import com.aerospike.client.cdt.CTX; import com.aerospike.client.query.IndexCollectionType; import com.aerospike.client.query.IndexType; @@ -55,14 +56,14 @@ public interface ReactiveAerospikeOperations { * deciding whether to create new record or update existing. If version is set to zero - new record will be created, * creation will fail is such record already exists. If version is greater than zero - existing record will be * updated with {@link com.aerospike.client.policy.RecordExistsAction#UPDATE_ONLY} policy combined with removing - * bins at first (analogous to {@link com.aerospike.client.policy.RecordExistsAction#REPLACE_ONLY}) taking - * into consideration the version property of the document. Version property will be updated with the server's - * version after successful operation. + * bins at first (analogous to {@link com.aerospike.client.policy.RecordExistsAction#REPLACE_ONLY}) taking into + * consideration the version property of the document. Version property will be updated with the server's version + * after successful operation. *

* If document does not have version property - record is updated with * {@link com.aerospike.client.policy.RecordExistsAction#UPDATE} policy combined with removing bins at first - * (analogous to {@link com.aerospike.client.policy.RecordExistsAction#REPLACE}). This means that when such - * record does not exist it will be created, otherwise updated - an "upsert". + * (analogous to {@link com.aerospike.client.policy.RecordExistsAction#REPLACE}). This means that when such record + * does not exist it will be created, otherwise updated - an "upsert". * * @param document The document to save. Must not be {@literal null}. * @return A Mono of the new saved document. @@ -70,12 +71,18 @@ public interface ReactiveAerospikeOperations { Mono save(T document); /** - * Reactively insert each document of the given documents using single insert operations. + * Reactively save documents using one batch request. The policies are analogous to {@link #save(Object)}. + *

+ * The order of returned results is preserved. The execution order is NOT preserved. + *

+ * Requires Server version 6.0+. * - * @param documents The documents to insert. Must not be {@literal null}. - * @return A Flux of the new inserted documents. + * @param documents documents to save. Must not be {@literal null}. + * @return A Flux of the saved documents, otherwise onError is signalled with + * {@link AerospikeException.BatchRecordArray} if batch save results contain errors, or with + * {@link org.springframework.dao.DataAccessException} if batch operation failed. */ - Flux insertAll(Collection documents); + Flux saveAll(Iterable documents); /** * Reactively insert document using {@link com.aerospike.client.policy.RecordExistsAction#CREATE_ONLY} policy. @@ -87,6 +94,20 @@ public interface ReactiveAerospikeOperations { */ Mono insert(T document); + /** + * Reactively insert documents using one batch request. The policies are analogous to {@link #insert(Object)}. + *

+ * The order of returned results is preserved. The execution order is NOT preserved. + *

+ * Requires Server version 6.0+. + * + * @param documents Documents to insert. Must not be {@literal null}. + * @return A Flux of the inserted documents, otherwise onError is signalled with + * {@link AerospikeException.BatchRecordArray} if batch insert results contain errors, or with + * {@link org.springframework.dao.DataAccessException} if batch operation failed. + */ + Flux insertAll(Iterable documents); + /** * Reactively update document using {@link com.aerospike.client.policy.RecordExistsAction#UPDATE_ONLY} policy * combined with removing bins at first (analogous to @@ -113,6 +134,20 @@ public interface ReactiveAerospikeOperations { */ Mono update(T document, Collection fields); + /** + * Reactively update documents using one batch request. The policies are analogous to {@link #update(Object)}. + *

+ * The order of returned results is preserved. The execution order is NOT preserved. + *

+ * Requires Server version 6.0+. + * + * @param documents Documents to update. Must not be {@literal null}. + * @return A Flux of the updated documents, otherwise onError is signalled with + * {@link AerospikeException.BatchRecordArray} if batch update results contain errors, or with + * {@link org.springframework.dao.DataAccessException} if batch operation failed. + */ + Flux updateAll(Iterable documents); + /** * Reactively add integer/double bin values to existing document bin values, read the new modified document and map * it back the given document class type. @@ -411,6 +446,8 @@ public interface ReactiveAerospikeOperations { * @param ids The ids of the documents to find. Must not be {@literal null}. * @param entityClass The class to extract the Aerospike set from and to map the documents to. Must not be * {@literal null}. + * @return onError is signalled with {@link AerospikeException.BatchRecordArray} if batch delete results contain + * errors, or with {@link org.springframework.dao.DataAccessException} if batch operation failed. */ Mono deleteByIds(Iterable ids, Class entityClass); @@ -424,7 +461,8 @@ public interface ReactiveAerospikeOperations { * This operation requires Server version 6.0+. * * @param groupedKeys Must not be {@literal null}. - * @return + * @return onError is signalled with {@link AerospikeException.BatchRecordArray} if batch delete results contain + * errors, or with {@link org.springframework.dao.DataAccessException} if batch operation failed. */ Mono deleteByIds(GroupedKeys groupedKeys); diff --git a/src/main/java/org/springframework/data/aerospike/core/ReactiveAerospikeTemplate.java b/src/main/java/org/springframework/data/aerospike/core/ReactiveAerospikeTemplate.java index 5ed2ecf44..e4ef0c107 100644 --- a/src/main/java/org/springframework/data/aerospike/core/ReactiveAerospikeTemplate.java +++ b/src/main/java/org/springframework/data/aerospike/core/ReactiveAerospikeTemplate.java @@ -15,14 +15,8 @@ */ package org.springframework.data.aerospike.core; -import com.aerospike.client.AerospikeException; -import com.aerospike.client.Bin; -import com.aerospike.client.Info; -import com.aerospike.client.Key; -import com.aerospike.client.Operation; import com.aerospike.client.Record; -import com.aerospike.client.ResultCode; -import com.aerospike.client.Value; +import com.aerospike.client.*; import com.aerospike.client.cdt.CTX; import com.aerospike.client.cluster.Node; import com.aerospike.client.policy.BatchPolicy; @@ -61,6 +55,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; import java.util.regex.Matcher; @@ -125,9 +120,47 @@ public Mono save(T document) { } @Override - public Flux insertAll(Collection documents) { - return Flux.fromIterable(documents) - .flatMap(this::insert); + public Flux saveAll(Iterable documents) { + Assert.notNull(documents, "Documents for saving must not be null!"); + + List> batchWriteDataList = new ArrayList<>(); + documents.forEach(document -> batchWriteDataList.add(getBatchWriteForSave(document))); + + List batchWriteRecords = batchWriteDataList.stream().map(BatchWriteData::batchRecord).toList(); + + return batchWriteAndCheckForErrors(batchWriteRecords, batchWriteDataList, "save"); + } + + private Flux batchWriteAndCheckForErrors(List batchWriteRecords, + List> batchWriteDataList, String commandName) { + // requires server ver. >= 6.0.0 + return reactorClient.operate(null, batchWriteRecords) + .onErrorMap(this::translateError) + .flatMap(ignore -> checkForErrorsAndUpdateVersion(batchWriteDataList, batchWriteRecords, commandName)) + .flux() + .flatMapIterable(list -> list.stream().map(BatchWriteData::document).toList()); + } + + private Mono>> checkForErrorsAndUpdateVersion(List> batchWriteDataList, + List batchWriteRecords, + String commandName) { + boolean errorsFound = false; + for (AerospikeTemplate.BatchWriteData data : batchWriteDataList) { + if (!errorsFound && batchRecordFailed(data.batchRecord())) { + errorsFound = true; + } + if (data.hasVersionProperty() && !batchRecordFailed(data.batchRecord())) { + updateVersion(data.document(), data.batchRecord().record); + } + } + + if (errorsFound) { + AerospikeException e = new AerospikeException("Errors during batch " + commandName); + return Mono.error( + new AerospikeException.BatchRecordArray(batchWriteRecords.toArray(BatchRecord[]::new), e)); + } + + return Mono.just(batchWriteDataList); } @Override @@ -154,6 +187,18 @@ public Mono insert(T document) { } } + @Override + public Flux insertAll(Iterable documents) { + Assert.notNull(documents, "Documents for insert must not be null!"); + + List> batchWriteDataList = new ArrayList<>(); + documents.forEach(document -> batchWriteDataList.add(getBatchWriteForInsert(document))); + + List batchWriteRecords = batchWriteDataList.stream().map(BatchWriteData::batchRecord).toList(); + + return batchWriteAndCheckForErrors(batchWriteRecords, batchWriteDataList, "insert"); + } + @Override public Mono update(T document) { Assert.notNull(document, "Document must not be null!"); @@ -197,6 +242,18 @@ public Mono update(T document, Collection fields) { } } + @Override + public Flux updateAll(Iterable documents) { + Assert.notNull(documents, "Documents for saving must not be null!"); + + List> batchWriteDataList = new ArrayList<>(); + documents.forEach(document -> batchWriteDataList.add(getBatchWriteForUpdate(document))); + + List batchWriteRecords = batchWriteDataList.stream().map(BatchWriteData::batchRecord).toList(); + + return batchWriteAndCheckForErrors(batchWriteRecords, batchWriteDataList, "update"); + } + @SuppressWarnings("unchecked") @Override public Flux findAll(Class entityClass) { @@ -634,7 +691,7 @@ public Mono delete(T document) { AerospikeWriteData data = writeData(document); - return this.reactorClient + return reactorClient .delete(ignoreGenerationDeletePolicy(), data.getKey()) .map(key -> true) .onErrorMap(this::translateError); @@ -651,16 +708,31 @@ public Mono deleteByIds(Iterable ids, Class entityClass) { .map(id -> getKey(id, entity)) .toArray(Key[]::new); - return reactorClient.delete(null, null, keys).then(); + return batchDeleteAndCheckForErrors(reactorClient, keys); + } + + private Mono batchDeleteAndCheckForErrors(IAerospikeReactorClient reactorClient, Key[] keys) { + Function> checkForErrors = results -> { + for (BatchRecord record : results.records) { + if (batchRecordFailed(record)) { + return Mono.error(new AerospikeException.BatchRecordArray(results.records, + new AerospikeException("Errors during batch delete"))); + } + } + return Mono.empty(); + }; + + // requires server ver. >= 6.0.0 + return reactorClient.delete(null, null, keys) + .onErrorMap(this::translateError) + .flatMap(checkForErrors); } @Override public Mono deleteByIds(GroupedKeys groupedKeys) { Assert.notNull(groupedKeys, "Grouped keys must not be null!"); - - if (groupedKeys.getEntitiesKeys() == null || groupedKeys.getEntitiesKeys().isEmpty()) { - return Mono.empty(); - } + Assert.notNull(groupedKeys.getEntitiesKeys(), "Entities keys must not be null!"); + Assert.notEmpty(groupedKeys.getEntitiesKeys(), "Entities keys must not be empty!"); return deleteEntitiesByIdsInternal(groupedKeys); } @@ -671,7 +743,7 @@ private Mono deleteEntitiesByIdsInternal(GroupedKeys groupedKeys) { reactorClient.delete(null, null, entitiesKeys.getKeys()) .doOnError(this::translateError); - return Mono.empty(); + return batchDeleteAndCheckForErrors(reactorClient, entitiesKeys.getKeys()); } @Override diff --git a/src/main/java/org/springframework/data/aerospike/IndexAlreadyExistsException.java b/src/main/java/org/springframework/data/aerospike/exceptions/IndexAlreadyExistsException.java similarity index 95% rename from src/main/java/org/springframework/data/aerospike/IndexAlreadyExistsException.java rename to src/main/java/org/springframework/data/aerospike/exceptions/IndexAlreadyExistsException.java index cc34cd04b..9e2efb92d 100644 --- a/src/main/java/org/springframework/data/aerospike/IndexAlreadyExistsException.java +++ b/src/main/java/org/springframework/data/aerospike/exceptions/IndexAlreadyExistsException.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.springframework.data.aerospike; +package org.springframework.data.aerospike.exceptions; import org.springframework.dao.InvalidDataAccessResourceUsageException; diff --git a/src/main/java/org/springframework/data/aerospike/IndexNotFoundException.java b/src/main/java/org/springframework/data/aerospike/exceptions/IndexNotFoundException.java similarity index 93% rename from src/main/java/org/springframework/data/aerospike/IndexNotFoundException.java rename to src/main/java/org/springframework/data/aerospike/exceptions/IndexNotFoundException.java index 3f4294b71..55da857b1 100644 --- a/src/main/java/org/springframework/data/aerospike/IndexNotFoundException.java +++ b/src/main/java/org/springframework/data/aerospike/exceptions/IndexNotFoundException.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.springframework.data.aerospike; +package org.springframework.data.aerospike.exceptions; import org.springframework.dao.InvalidDataAccessResourceUsageException; @@ -22,4 +22,4 @@ public class IndexNotFoundException extends InvalidDataAccessResourceUsageExcept public IndexNotFoundException(String msg, Throwable cause) { super(msg, cause); } -} \ No newline at end of file +} diff --git a/src/main/java/org/springframework/data/aerospike/InvalidAerospikeDataAccessApiUsageException.java b/src/main/java/org/springframework/data/aerospike/exceptions/InvalidAerospikeDataAccessApiUsageException.java similarity index 95% rename from src/main/java/org/springframework/data/aerospike/InvalidAerospikeDataAccessApiUsageException.java rename to src/main/java/org/springframework/data/aerospike/exceptions/InvalidAerospikeDataAccessApiUsageException.java index 2b43725be..87b90492e 100644 --- a/src/main/java/org/springframework/data/aerospike/InvalidAerospikeDataAccessApiUsageException.java +++ b/src/main/java/org/springframework/data/aerospike/exceptions/InvalidAerospikeDataAccessApiUsageException.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.springframework.data.aerospike; +package org.springframework.data.aerospike.exceptions; import org.springframework.dao.InvalidDataAccessApiUsageException; diff --git a/src/main/java/org/springframework/data/aerospike/query/QualifierException.java b/src/main/java/org/springframework/data/aerospike/exceptions/QualifierException.java similarity index 94% rename from src/main/java/org/springframework/data/aerospike/query/QualifierException.java rename to src/main/java/org/springframework/data/aerospike/exceptions/QualifierException.java index 69868ee58..04f506634 100644 --- a/src/main/java/org/springframework/data/aerospike/query/QualifierException.java +++ b/src/main/java/org/springframework/data/aerospike/exceptions/QualifierException.java @@ -14,7 +14,7 @@ * License for the specific language governing permissions and limitations under * the License. */ -package org.springframework.data.aerospike.query; +package org.springframework.data.aerospike.exceptions; import java.io.Serial; diff --git a/src/main/java/org/springframework/data/aerospike/index/AerospikePersistenceEntityIndexCreator.java b/src/main/java/org/springframework/data/aerospike/index/AerospikePersistenceEntityIndexCreator.java index 25eb7c671..ece07a1ab 100644 --- a/src/main/java/org/springframework/data/aerospike/index/AerospikePersistenceEntityIndexCreator.java +++ b/src/main/java/org/springframework/data/aerospike/index/AerospikePersistenceEntityIndexCreator.java @@ -17,8 +17,8 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.ObjectProvider; -import org.springframework.data.aerospike.IndexAlreadyExistsException; import org.springframework.data.aerospike.core.AerospikeTemplate; +import org.springframework.data.aerospike.exceptions.IndexAlreadyExistsException; import org.springframework.data.aerospike.mapping.AerospikeMappingContext; import java.util.Set; diff --git a/src/main/java/org/springframework/data/aerospike/index/ReactiveAerospikePersistenceEntityIndexCreator.java b/src/main/java/org/springframework/data/aerospike/index/ReactiveAerospikePersistenceEntityIndexCreator.java index e4348bf77..8e29d85f3 100644 --- a/src/main/java/org/springframework/data/aerospike/index/ReactiveAerospikePersistenceEntityIndexCreator.java +++ b/src/main/java/org/springframework/data/aerospike/index/ReactiveAerospikePersistenceEntityIndexCreator.java @@ -17,8 +17,8 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.ObjectProvider; -import org.springframework.data.aerospike.IndexAlreadyExistsException; import org.springframework.data.aerospike.core.ReactiveAerospikeTemplate; +import org.springframework.data.aerospike.exceptions.IndexAlreadyExistsException; import org.springframework.data.aerospike.mapping.AerospikeMappingContext; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; diff --git a/src/main/java/org/springframework/data/aerospike/query/ExpiryQualifier.java b/src/main/java/org/springframework/data/aerospike/query/ExpiryQualifier.java index 6822ebc6a..866763b94 100644 --- a/src/main/java/org/springframework/data/aerospike/query/ExpiryQualifier.java +++ b/src/main/java/org/springframework/data/aerospike/query/ExpiryQualifier.java @@ -18,6 +18,7 @@ import com.aerospike.client.Value; import com.aerospike.client.command.ParticleType; +import org.springframework.data.aerospike.exceptions.QualifierException; import java.io.Serial; diff --git a/src/main/java/org/springframework/data/aerospike/query/LatestUpdateQualifier.java b/src/main/java/org/springframework/data/aerospike/query/LatestUpdateQualifier.java index 85f54998d..050ac683b 100644 --- a/src/main/java/org/springframework/data/aerospike/query/LatestUpdateQualifier.java +++ b/src/main/java/org/springframework/data/aerospike/query/LatestUpdateQualifier.java @@ -18,6 +18,7 @@ import com.aerospike.client.Value; import com.aerospike.client.command.ParticleType; +import org.springframework.data.aerospike.exceptions.QualifierException; import java.io.Serial; diff --git a/src/main/java/org/springframework/data/aerospike/repository/support/SimpleAerospikeRepository.java b/src/main/java/org/springframework/data/aerospike/repository/support/SimpleAerospikeRepository.java index 8b130f4e5..6e3dd272f 100644 --- a/src/main/java/org/springframework/data/aerospike/repository/support/SimpleAerospikeRepository.java +++ b/src/main/java/org/springframework/data/aerospike/repository/support/SimpleAerospikeRepository.java @@ -50,21 +50,25 @@ public Optional findById(ID id) { @Override public S save(S entity) { - Assert.notNull(entity, "Cannot save NULL entity"); + Assert.notNull(entity, "Entity for save must not be null!"); operations.save(entity); return entity; } + /** + * Requires Server version 6.0+. + * + * @param entities must not be {@literal null} nor must it contain {@literal null}. + * @return List of entities + */ public List saveAll(Iterable entities) { - Assert.notNull(entities, "The given Iterable of entities not be null!"); + Assert.notNull(entities, "Entities for save must not be null!"); - List result = IterableConverter.toList(entities); - for (S entity : result) { - save(entity); - } + List entitiesList = IterableConverter.toList(entities); + operations.saveAll(entitiesList); - return result; + return entitiesList; } @Override @@ -123,7 +127,7 @@ public void deleteById(ID id) { @Override public void deleteAll(Iterable entities) { - Assert.notNull(entities, "The given ids must not be null!"); + Assert.notNull(entities, "The given entities for deleting must not be null!"); List ids = new ArrayList<>(); entities.forEach(entity -> ids.add(entityInformation.getId(entity))); operations.deleteByIds(ids, entityInformation.getJavaType()); diff --git a/src/main/java/org/springframework/data/aerospike/repository/support/SimpleReactiveAerospikeRepository.java b/src/main/java/org/springframework/data/aerospike/repository/support/SimpleReactiveAerospikeRepository.java index c08f1ff45..1ac3bc45a 100644 --- a/src/main/java/org/springframework/data/aerospike/repository/support/SimpleReactiveAerospikeRepository.java +++ b/src/main/java/org/springframework/data/aerospike/repository/support/SimpleReactiveAerospikeRepository.java @@ -45,7 +45,7 @@ public Mono save(S entity) { @Override public Flux saveAll(Iterable entities) { Assert.notNull(entities, "The given Iterable of entities must not be null!"); - return Flux.fromIterable(entities).flatMap(this::save); + return operations.saveAll(entities); } @Override diff --git a/src/test/java/org/springframework/data/aerospike/BaseBlockingIntegrationTests.java b/src/test/java/org/springframework/data/aerospike/BaseBlockingIntegrationTests.java index 7928660d3..e4e5a5012 100644 --- a/src/test/java/org/springframework/data/aerospike/BaseBlockingIntegrationTests.java +++ b/src/test/java/org/springframework/data/aerospike/BaseBlockingIntegrationTests.java @@ -33,7 +33,7 @@ public abstract class BaseBlockingIntegrationTests extends BaseIntegrationTests @Autowired protected IndexRefresher indexRefresher; - protected void deleteAll(Collection collection) { + protected void deleteOneByOne(Collection collection) { collection.forEach(item -> template.delete(item)); } } diff --git a/src/test/java/org/springframework/data/aerospike/ReactiveBlockingAerospikeTestOperations.java b/src/test/java/org/springframework/data/aerospike/ReactiveBlockingAerospikeTestOperations.java index 6d50d034a..3762b1452 100644 --- a/src/test/java/org/springframework/data/aerospike/ReactiveBlockingAerospikeTestOperations.java +++ b/src/test/java/org/springframework/data/aerospike/ReactiveBlockingAerospikeTestOperations.java @@ -1,13 +1,17 @@ package org.springframework.data.aerospike; +import com.aerospike.client.AerospikeException; import com.aerospike.client.IAerospikeClient; import org.springframework.data.aerospike.core.ReactiveAerospikeTemplate; import org.springframework.data.aerospike.query.cache.IndexInfoParser; -import org.springframework.data.aerospike.utility.AdditionalAerospikeTestOperations; +import org.springframework.data.aerospike.repository.ReactiveAerospikeRepository; import org.springframework.data.aerospike.sample.Customer; import org.springframework.data.aerospike.sample.Person; +import org.springframework.data.aerospike.utility.AdditionalAerospikeTestOperations; +import org.springframework.data.aerospike.utility.ServerVersionUtils; import org.testcontainers.containers.GenericContainer; +import java.util.Collection; import java.util.List; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -64,4 +68,26 @@ public List generatePersons(int count) { .peek(template::save) .collect(Collectors.toList()); } + + public void deleteAll(ReactiveAerospikeRepository repository, Collection entities) { + // batch write operations are supported starting with Server version 6.0+ + if (ServerVersionUtils.isBatchWriteSupported(template.getAerospikeReactorClient().getAerospikeClient())) { + try { + repository.deleteAll(entities).block(); + } catch (AerospikeException.BatchRecordArray ignored) { + // KEY_NOT_FOUND ResultCode causes exception if there are no entities + } + } else { + entities.forEach(entity -> repository.delete(entity).block()); + } + } + + public void saveAll(ReactiveAerospikeRepository repository, Collection entities) { + // batch write operations are supported starting with Server version 6.0+ + if (ServerVersionUtils.isBatchWriteSupported(template.getAerospikeReactorClient().getAerospikeClient())) { + repository.saveAll(entities).blockLast(); + } else { + entities.forEach(entity -> repository.save(entity).block()); + } + } } diff --git a/src/test/java/org/springframework/data/aerospike/core/AerospikeTemplateDeleteTests.java b/src/test/java/org/springframework/data/aerospike/core/AerospikeTemplateDeleteTests.java index 5df97298b..a17c51d65 100644 --- a/src/test/java/org/springframework/data/aerospike/core/AerospikeTemplateDeleteTests.java +++ b/src/test/java/org/springframework/data/aerospike/core/AerospikeTemplateDeleteTests.java @@ -15,6 +15,7 @@ */ package org.springframework.data.aerospike.core; +import com.aerospike.client.AerospikeException; import com.aerospike.client.policy.GenerationPolicy; import org.junit.jupiter.api.Test; import org.springframework.data.aerospike.BaseBlockingIntegrationTests; @@ -24,7 +25,7 @@ import org.springframework.data.aerospike.core.model.GroupedKeys; import org.springframework.data.aerospike.sample.Customer; import org.springframework.data.aerospike.sample.Person; -import org.springframework.data.aerospike.utility.IndexUtils; +import org.springframework.data.aerospike.utility.ServerVersionUtils; import java.util.Arrays; import java.util.Collection; @@ -102,7 +103,7 @@ public void deleteById_returnsFalseIfValueIsAbsent() { @Test public void deleteByGroupedKeys() { - if (IndexUtils.isBatchWriteSupported(client)) { + if (ServerVersionUtils.isBatchWriteSupported(client)) { List persons = additionalAerospikeTestOperations.generatePersons(5); List personsIds = persons.stream().map(Person::getId).toList(); List customers = additionalAerospikeTestOperations.generateCustomers(3); @@ -178,4 +179,37 @@ public void deleteByType_NullTypeThrowsException() { .isInstanceOf(IllegalArgumentException.class) .hasMessage("Class must not be null!"); } + + @Test + public void deleteAll_rejectsDuplicateIds() { + // batch write operations are supported starting with Server version 6.0+ + if (ServerVersionUtils.isBatchWriteSupported(client)) { + String id1 = nextId(); + DocumentWithExpiration document1 = new DocumentWithExpiration(id1); + DocumentWithExpiration document2 = new DocumentWithExpiration(id1); + template.save(document1); + template.save(document2); + + List ids = List.of(id1, id1); + assertThatThrownBy(() -> template.deleteByIds(ids, DocumentWithExpiration.class)) + .isInstanceOf(AerospikeException.BatchRecordArray.class) + .hasMessageContaining("Errors during batch delete"); + } + } + + @Test + public void deleteAll_ShouldDeleteAllDocuments() { + // batch delete operations are supported starting with Server version 6.0+ + if (ServerVersionUtils.isBatchWriteSupported(client)) { + String id1 = nextId(); + String id2 = nextId(); + template.save(new DocumentWithExpiration(id1)); + template.save(new DocumentWithExpiration(id2)); + + List ids = List.of(id1, id2); + template.deleteByIds(ids, DocumentWithExpiration.class); + + assertThat(template.findByIds(ids, DocumentWithExpiration.class)).isEmpty(); + } + } } diff --git a/src/test/java/org/springframework/data/aerospike/core/AerospikeTemplateFindByQueryProjectionTests.java b/src/test/java/org/springframework/data/aerospike/core/AerospikeTemplateFindByQueryProjectionTests.java index c8fb45401..65370f3be 100644 --- a/src/test/java/org/springframework/data/aerospike/core/AerospikeTemplateFindByQueryProjectionTests.java +++ b/src/test/java/org/springframework/data/aerospike/core/AerospikeTemplateFindByQueryProjectionTests.java @@ -11,6 +11,7 @@ import org.springframework.data.aerospike.sample.Person; import org.springframework.data.aerospike.sample.PersonSomeFields; import org.springframework.data.aerospike.utility.QueryUtils; +import org.springframework.data.aerospike.utility.ServerVersionUtils; import org.springframework.data.domain.Sort; import java.util.Arrays; @@ -49,8 +50,15 @@ public class AerospikeTemplateFindByQueryProjectionTests extends BaseBlockingInt @BeforeAll public void beforeAllSetUp() { - deleteAll(allPersons); - template.insertAll(allPersons); + deleteOneByOne(allPersons); + + // batch write operations are supported starting with Server version 6.0+ + if (ServerVersionUtils.isBatchWriteSupported(client)) { + template.insertAll(allPersons); + } else { + allPersons.forEach(person -> template.insert(person)); + } + additionalAerospikeTestOperations.createIndex(Person.class, "person_age_index", "age", IndexType.NUMERIC); additionalAerospikeTestOperations.createIndex(Person.class, "person_first_name_index", "firstName" @@ -67,7 +75,7 @@ public void setUp() { @AfterAll public void afterAll() { - deleteAll(allPersons); + deleteOneByOne(allPersons); additionalAerospikeTestOperations.dropIndex(Person.class, "person_age_index"); additionalAerospikeTestOperations.dropIndex(Person.class, "person_first_name_index"); additionalAerospikeTestOperations.dropIndex(Person.class, "person_last_name_index"); diff --git a/src/test/java/org/springframework/data/aerospike/core/AerospikeTemplateFindByQueryTests.java b/src/test/java/org/springframework/data/aerospike/core/AerospikeTemplateFindByQueryTests.java index 84b7ec6d8..97cdce5fe 100644 --- a/src/test/java/org/springframework/data/aerospike/core/AerospikeTemplateFindByQueryTests.java +++ b/src/test/java/org/springframework/data/aerospike/core/AerospikeTemplateFindByQueryTests.java @@ -31,6 +31,7 @@ import org.springframework.data.aerospike.sample.Person; import org.springframework.data.aerospike.utility.CollectionUtils; import org.springframework.data.aerospike.utility.QueryUtils; +import org.springframework.data.aerospike.utility.ServerVersionUtils; import org.springframework.data.domain.Sort; import java.util.Arrays; @@ -77,8 +78,15 @@ public class AerospikeTemplateFindByQueryTests extends BaseBlockingIntegrationTe @BeforeAll public void beforeAllSetUp() { - deleteAll(allPersons); - template.insertAll(allPersons); + deleteOneByOne(allPersons); + + // batch write operations are supported starting with Server version 6.0+ + if (ServerVersionUtils.isBatchWriteSupported(client)) { + template.insertAll(allPersons); + } else { + allPersons.forEach(person -> template.insert(person)); + } + additionalAerospikeTestOperations.createIndex(Person.class, "person_age_index", "age", IndexType.NUMERIC); additionalAerospikeTestOperations.createIndex(Person.class, "person_first_name_index", "firstName", @@ -95,7 +103,7 @@ public void setUp() { @AfterAll public void afterAll() { - deleteAll(allPersons); + deleteOneByOne(allPersons); additionalAerospikeTestOperations.dropIndex(Person.class, "person_age_index"); additionalAerospikeTestOperations.dropIndex(Person.class, "person_first_name_index"); additionalAerospikeTestOperations.dropIndex(Person.class, "person_last_name_index"); @@ -226,7 +234,12 @@ public void findAll_findNothing() { Stream result = template.findAll(Person.class); assertThat(result).isEmpty(); - template.insertAll(allPersons); + // batch write operations are supported starting with Server version 6.0+ + if (ServerVersionUtils.isBatchWriteSupported(client)) { + template.insertAll(allPersons); + } else { + allPersons.forEach(person -> template.insert(person)); + } } @Test diff --git a/src/test/java/org/springframework/data/aerospike/core/AerospikeTemplateIndexTests.java b/src/test/java/org/springframework/data/aerospike/core/AerospikeTemplateIndexTests.java index b35810cae..10be2e8ac 100644 --- a/src/test/java/org/springframework/data/aerospike/core/AerospikeTemplateIndexTests.java +++ b/src/test/java/org/springframework/data/aerospike/core/AerospikeTemplateIndexTests.java @@ -7,11 +7,11 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.data.aerospike.BaseBlockingIntegrationTests; -import org.springframework.data.aerospike.IndexAlreadyExistsException; +import org.springframework.data.aerospike.exceptions.IndexAlreadyExistsException; import org.springframework.data.aerospike.mapping.Document; import org.springframework.data.aerospike.query.model.Index; import org.springframework.data.aerospike.utility.AsyncUtils; -import org.springframework.data.aerospike.utility.IndexUtils; +import org.springframework.data.aerospike.utility.ServerVersionUtils; import org.springframework.test.context.TestPropertySource; import java.util.List; @@ -88,7 +88,7 @@ public void createIndex_createsIndex() { // for Aerospike Server ver. >= 6.1.0.1 @Test public void createIndex_shouldNotThrowExceptionIfIndexAlreadyExists() { - if (IndexUtils.isDropCreateBehaviorUpdated(client)) { + if (ServerVersionUtils.isDropCreateBehaviorUpdated(client)) { template.createIndex(IndexedDocument.class, INDEX_TEST_1, "stringField", IndexType.STRING); awaitTenSecondsUntil(() -> assertThat(template.indexExists(INDEX_TEST_1)).isTrue()); @@ -138,7 +138,7 @@ public void createIndex_createsIndexForDifferentTypes() { // for Aerospike Server ver. >= 6.1.0.1 @Test public void deleteIndex_doesNotThrowExceptionIfIndexDoesNotExist() { - if (IndexUtils.isDropCreateBehaviorUpdated(client)) { + if (ServerVersionUtils.isDropCreateBehaviorUpdated(client)) { assertThatCode(() -> template.deleteIndex(IndexedDocument.class, "not-existing-index")) .doesNotThrowAnyException(); } @@ -147,7 +147,7 @@ public void deleteIndex_doesNotThrowExceptionIfIndexDoesNotExist() { // for Aerospike Server ver. >= 6.1.0.1 @Test public void createIndex_createsIndexOnNestedList() { - if (IndexUtils.isDropCreateBehaviorUpdated(client)) { + if (ServerVersionUtils.isDropCreateBehaviorUpdated(client)) { String setName = template.getSetName(IndexedDocument.class); template.createIndex(IndexedDocument.class, INDEX_TEST_1, "nestedList", IndexType.STRING, IndexCollectionType.LIST, CTX.listIndex(1)); @@ -167,7 +167,7 @@ public void createIndex_createsIndexOnNestedList() { // for Aerospike Server ver. >= 6.1.0.1 @Test public void createIndex_createsIndexOnNestedListContextRank() { - if (IndexUtils.isDropCreateBehaviorUpdated(client)) { + if (ServerVersionUtils.isDropCreateBehaviorUpdated(client)) { String setName = template.getSetName(IndexedDocument.class); template.createIndex(IndexedDocument.class, INDEX_TEST_1, "nestedList", IndexType.STRING, IndexCollectionType.LIST, CTX.listRank(-1)); @@ -187,7 +187,7 @@ public void createIndex_createsIndexOnNestedListContextRank() { // for Aerospike Server ver. >= 6.1.0.1 @Test public void createIndex_createsIndexOnMapOfMapsContext() { - if (IndexUtils.isDropCreateBehaviorUpdated(client)) { + if (ServerVersionUtils.isDropCreateBehaviorUpdated(client)) { String setName = template.getSetName(IndexedDocument.class); CTX[] ctx = new CTX[]{ diff --git a/src/test/java/org/springframework/data/aerospike/core/AerospikeTemplateInsertTests.java b/src/test/java/org/springframework/data/aerospike/core/AerospikeTemplateInsertTests.java index a1d827151..90fc1d8ba 100644 --- a/src/test/java/org/springframework/data/aerospike/core/AerospikeTemplateInsertTests.java +++ b/src/test/java/org/springframework/data/aerospike/core/AerospikeTemplateInsertTests.java @@ -15,9 +15,11 @@ */ package org.springframework.data.aerospike.core; +import com.aerospike.client.AerospikeException; import com.aerospike.client.Key; import com.aerospike.client.Record; import com.aerospike.client.policy.Policy; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.springframework.dao.DuplicateKeyException; import org.springframework.data.aerospike.BaseBlockingIntegrationTests; @@ -25,6 +27,7 @@ import org.springframework.data.aerospike.SampleClasses.DocumentWithByteArray; import org.springframework.data.aerospike.sample.Person; import org.springframework.data.aerospike.utility.AsyncUtils; +import org.springframework.data.aerospike.utility.ServerVersionUtils; import java.util.Arrays; import java.util.Collections; @@ -35,6 +38,7 @@ import java.util.stream.IntStream; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatNoException; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.springframework.data.aerospike.SampleClasses.VersionedClass; @@ -107,6 +111,8 @@ public void insertsDocumentWithZeroVersionIfThereIsNoDocumentWithSameKey() { @Test public void insertsDocumentWithVersionGreaterThanZeroIfThereIsNoDocumentWithSameKey() { VersionedClass document = new VersionedClass(id, "any", 5L); + // initially given versions are ignored + // RecordExistsAction.CREATE_ONLY is set template.insert(document); assertThat(document.getVersion()).isEqualTo(1); @@ -173,16 +179,6 @@ public void insertsOnlyFirstDocumentAndNextAttemptsShouldFailWithDuplicateKeyExc template.delete(template.findById(id, Person.class)); // cleanup } - @Test - public void insertAll_rejectsDuplicateIds() { - Person person = Person.builder().id(id).build(); - List records = Arrays.asList(person, person); - - assertThatThrownBy(() -> template.insertAll(records)) - .isInstanceOf(DuplicateKeyException.class); - template.delete(person); // cleanup - } - @Test public void insertAll_insertsAllDocuments() { List persons = IntStream.range(1, 10) @@ -190,7 +186,13 @@ public void insertAll_insertsAllDocuments() { .firstName("Gregor") .age(age).build()) .collect(Collectors.toList()); - template.insertAll(persons); + + // batch write operations are supported starting with Server version 6.0+ + if (ServerVersionUtils.isBatchWriteSupported(client)) { + template.insertAll(persons); + } else { + persons.forEach(person -> template.insert(person)); + } List result = template.findByIds(persons.stream().map(Person::getId) .collect(Collectors.toList()), Person.class); @@ -200,4 +202,38 @@ public void insertAll_insertsAllDocuments() { template.delete(person); // cleanup } } + + @Test + public void insertAll_rejectsDuplicateIds() { + // batch write operations are supported starting with Server version 6.0+ + if (ServerVersionUtils.isBatchWriteSupported(client)) { + VersionedClass first = new VersionedClass(id, "foo"); + + assertThatThrownBy(() -> template.insertAll(List.of(first, first))) + .isInstanceOf(AerospikeException.BatchRecordArray.class) + .hasMessageContaining("Errors during batch insert"); + Assertions.assertEquals(1, (long) first.getVersion()); + template.delete(first); // cleanup + } + } + + @Test + public void shouldInsertAllVersionedDocuments() { + // batch write operations are supported starting with Server version 6.0+ + if (ServerVersionUtils.isBatchWriteSupported(client)) { + VersionedClass first = new VersionedClass(id, "foo"); + VersionedClass second = new VersionedClass(nextId(), "foo", 1L); + VersionedClass third = new VersionedClass(nextId(), "foo", 2L); + template.insertAll(List.of(first)); + + // initially given versions are ignored + // RecordExistsAction.CREATE_ONLY is set + assertThatNoException().isThrownBy(() -> template.insertAll( + List.of(second, third))); + + assertThat(first.getVersion() == 1).isTrue(); + assertThat(second.getVersion() == 1).isTrue(); + assertThat(third.getVersion() == 1).isTrue(); + } + } } diff --git a/src/test/java/org/springframework/data/aerospike/core/AerospikeTemplateSaveTests.java b/src/test/java/org/springframework/data/aerospike/core/AerospikeTemplateSaveTests.java index fde54fba2..c9c207025 100644 --- a/src/test/java/org/springframework/data/aerospike/core/AerospikeTemplateSaveTests.java +++ b/src/test/java/org/springframework/data/aerospike/core/AerospikeTemplateSaveTests.java @@ -15,6 +15,7 @@ */ package org.springframework.data.aerospike.core; +import com.aerospike.client.AerospikeException; import com.aerospike.client.Key; import com.aerospike.client.Record; import com.aerospike.client.policy.Policy; @@ -25,10 +26,13 @@ import org.springframework.data.aerospike.BaseBlockingIntegrationTests; import org.springframework.data.aerospike.sample.Person; import org.springframework.data.aerospike.utility.AsyncUtils; +import org.springframework.data.aerospike.utility.ServerVersionUtils; +import java.util.List; import java.util.concurrent.atomic.AtomicLong; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatNoException; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.springframework.data.aerospike.SampleClasses.CustomCollectionClass; import static org.springframework.data.aerospike.SampleClasses.DocumentWithByteArray; @@ -62,19 +66,36 @@ public void shouldSaveAndSetVersion() { } @Test - public void shouldNotSaveDocumentIfItAlreadyExists() { + public void shouldNotSaveVersionedDocumentIfItAlreadyExists() { template.save(new VersionedClass(id, "foo")); assertThatThrownBy(() -> template.save(new VersionedClass(id, "foo"))) .isInstanceOf(OptimisticLockingFailureException.class); } + @Test + public void shouldUpdateNotVersionedDocumentIfItAlreadyExists() { + Person person = new Person(id, "Amol"); + person.setAge(28); + template.save(person); + + assertThatNoException().isThrownBy(() -> template.save(person)); + } + @Test public void shouldSaveDocumentWithEqualVersion() { - template.save(new VersionedClass(id, "foo", 0L)); + // if an object has version property, GenerationPolicy.EXPECT_GEN_EQUAL is set + VersionedClass first = new VersionedClass(id, "foo", 0L); + VersionedClass second = new VersionedClass(id, "foo", 1L); + VersionedClass third = new VersionedClass(id, "foo", 2L); - template.save(new VersionedClass(id, "foo", 1L)); - template.save(new VersionedClass(id, "foo", 2L)); + template.save(first); + template.save(second); + template.save(third); + + assertThat(first.getVersion() == 1).isTrue(); + assertThat(second.getVersion() == 2).isTrue(); + assertThat(third.getVersion() == 3).isTrue(); } @Test @@ -87,9 +108,11 @@ public void shouldFailSaveNewDocumentWithVersionGreaterThanZero() { public void shouldUpdateNullField() { VersionedClass versionedClass = new VersionedClass(id, null); template.save(versionedClass); + assertThat(versionedClass.getVersion() == 1).isTrue(); VersionedClass saved = template.findById(id, VersionedClass.class); template.save(saved); + assertThat(saved.getVersion() == 2).isTrue(); } @Test @@ -163,7 +186,7 @@ public void shouldUpdateAlreadyExistingDocument() { try { template.save(messageData); saved = true; - } catch (OptimisticLockingFailureException e) { + } catch (OptimisticLockingFailureException ignored) { } } }); @@ -259,4 +282,56 @@ public void shouldSaveAndFindDocumentWithByteArrayField() { assertThat(result).isEqualTo(document); } + + @Test + public void shouldSaveAllAndSetVersion() { + VersionedClass first = new VersionedClass(id, "foo"); + VersionedClass second = new VersionedClass(nextId(), "foo"); + // batch write operations are supported starting with Server version 6.0+ + if (ServerVersionUtils.isBatchWriteSupported(client)) { + template.saveAll(List.of(first, second)); + } else { + List.of(first, second).forEach(person -> template.save(person)); + } + + assertThat(first.version).isEqualTo(1); + assertThat(second.version).isEqualTo(1); + assertThat(template.findById(id, VersionedClass.class).version).isEqualTo(1); + template.delete(first); // cleanup + template.delete(second); // cleanup + } + + @Test + public void shouldSaveAllVersionedDocumentsAndSetVersionAndThrowExceptionIfAlreadyExist() { + // batch write operations are supported starting with Server version 6.0+ + if (ServerVersionUtils.isBatchWriteSupported(client)) { + VersionedClass first = new VersionedClass(id, "foo"); + VersionedClass second = new VersionedClass(nextId(), "foo"); + + assertThatThrownBy(() -> template.saveAll(List.of(first, first, second, second))) + .isInstanceOf(AerospikeException.BatchRecordArray.class) + .hasMessageContaining("Errors during batch save"); + + assertThat(first.getVersion() == 1).isTrue(); + assertThat(second.getVersion() == 1).isTrue(); + + template.delete(first); // cleanup + template.delete(second); // cleanup + } + } + + @Test + public void shouldSaveAllNotVersionedDocumentsIfAlreadyExist() { + // batch write operations are supported starting with Server version 6.0+ + if (ServerVersionUtils.isBatchWriteSupported(client)) { + Person person = new Person(id, "Amol"); + person.setAge(28); + template.save(person); + + // If an object has no version property, RecordExistsAction.UPDATE is set + assertThatNoException().isThrownBy(() -> template.saveAll(List.of(person, person))); + + template.delete(person); // cleanup + } + } } diff --git a/src/test/java/org/springframework/data/aerospike/core/AerospikeTemplateUpdateTests.java b/src/test/java/org/springframework/data/aerospike/core/AerospikeTemplateUpdateTests.java index e4da06446..98fc172a6 100644 --- a/src/test/java/org/springframework/data/aerospike/core/AerospikeTemplateUpdateTests.java +++ b/src/test/java/org/springframework/data/aerospike/core/AerospikeTemplateUpdateTests.java @@ -15,6 +15,7 @@ */ package org.springframework.data.aerospike.core; +import com.aerospike.client.AerospikeException; import com.aerospike.client.Key; import com.aerospike.client.Record; import com.aerospike.client.policy.Policy; @@ -25,6 +26,7 @@ import org.springframework.data.aerospike.BaseBlockingIntegrationTests; import org.springframework.data.aerospike.sample.Person; import org.springframework.data.aerospike.utility.AsyncUtils; +import org.springframework.data.aerospike.utility.ServerVersionUtils; import java.util.ArrayList; import java.util.HashMap; @@ -40,6 +42,7 @@ public class AerospikeTemplateUpdateTests extends BaseBlockingIntegrationTests { @Test public void shouldThrowExceptionOnUpdateForNonExistingKey() { + // RecordExistsAction.UPDATE_ONLY assertThatThrownBy(() -> template.update(new Person(id, "svenfirstName", 11))) .isInstanceOf(DataRetrievalFailureException.class); } @@ -374,4 +377,42 @@ public void TestAddToMapSpecifyingMapFieldOnly() { assertThat(personWithList2.getStringMap().get("key4")).isEqualTo("Added something new"); template.delete(personWithList2); // cleanup } + + @Test + public void updateAllShouldThrowExceptionOnUpdateForNonExistingKey() { + // batch write operations are supported starting with Server version 6.0+ + if (ServerVersionUtils.isBatchWriteSupported(client)) { + Person person1 = new Person(id, "svenfirstName", 11); + Person person2 = new Person(nextId(), "svenfirstName", 11); + Person person3 = new Person(nextId(), "svenfirstName", 11); + template.save(person3); + // RecordExistsAction.UPDATE_ONLY + assertThatThrownBy(() -> template.updateAll(List.of(person1, person2))) + .isInstanceOf(AerospikeException.BatchRecordArray.class); + + assertThat(template.findById(person1.getId(), Person.class)).isNull(); + assertThat(template.findById(person2.getId(), Person.class)).isNull(); + assertThat(template.findById(person3.getId(), Person.class)).isEqualTo(person3); + } + } + + @Test + public void updateAllIfDocumentsNotChanged() { + // batch write operations are supported starting with Server version 6.0+ + if (ServerVersionUtils.isBatchWriteSupported(client)) { + int age1 = 140335200; + int age2 = 177652800; + Person person1 = new Person(id, "Wolfgang", age1); + Person person2 = new Person(nextId(), "Johann", age2); + template.insertAll(List.of(person1, person2)); + template.updateAll(List.of(person1, person2)); + + Person result1 = template.findById(person1.getId(), Person.class); + Person result2 = template.findById(person2.getId(), Person.class); + assertThat(result1.getAge()).isEqualTo(age1); + assertThat(result2.getAge()).isEqualTo(age2); + template.delete(result1); // cleanup + template.delete(result2); // cleanup + } + } } diff --git a/src/test/java/org/springframework/data/aerospike/core/DefaultAerospikeExceptionTranslatorTest.java b/src/test/java/org/springframework/data/aerospike/core/DefaultAerospikeExceptionTranslatorTest.java index e119ad9ec..cf6c7f77c 100644 --- a/src/test/java/org/springframework/data/aerospike/core/DefaultAerospikeExceptionTranslatorTest.java +++ b/src/test/java/org/springframework/data/aerospike/core/DefaultAerospikeExceptionTranslatorTest.java @@ -24,8 +24,8 @@ import org.springframework.dao.QueryTimeoutException; import org.springframework.dao.RecoverableDataAccessException; import org.springframework.dao.TransientDataAccessResourceException; -import org.springframework.data.aerospike.IndexAlreadyExistsException; -import org.springframework.data.aerospike.IndexNotFoundException; +import org.springframework.data.aerospike.exceptions.IndexAlreadyExistsException; +import org.springframework.data.aerospike.exceptions.IndexNotFoundException; import static org.assertj.core.api.Assertions.assertThat; diff --git a/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateDeleteRelatedTests.java b/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateDeleteRelatedTests.java index 0711ced45..11fd71e38 100644 --- a/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateDeleteRelatedTests.java +++ b/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateDeleteRelatedTests.java @@ -1,15 +1,22 @@ package org.springframework.data.aerospike.core.reactive; +import com.aerospike.client.AerospikeException; import com.aerospike.client.policy.GenerationPolicy; import com.aerospike.client.policy.WritePolicy; import org.junit.jupiter.api.Test; import org.springframework.data.aerospike.BaseReactiveIntegrationTests; +import org.springframework.data.aerospike.SampleClasses; import org.springframework.data.aerospike.core.ReactiveAerospikeTemplate; +import org.springframework.data.aerospike.core.model.GroupedKeys; import org.springframework.data.aerospike.sample.Person; +import org.springframework.data.aerospike.utility.ServerVersionUtils; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; import reactor.test.StepVerifier; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; import static org.springframework.data.aerospike.SampleClasses.VersionedClass; /** @@ -107,4 +114,62 @@ public void deleteByObject_shouldReturnFalseIfValueIsAbsent() { // then StepVerifier.create(deleted).expectComplete().verify(); } + + @Test + public void deleteAll_ShouldDeleteAllDocuments() { + // batch delete operations are supported starting with Server version 6.0+ + if (ServerVersionUtils.isBatchWriteSupported(reactorClient.getAerospikeClient())) { + String id1 = nextId(); + String id2 = nextId(); + reactiveTemplate.save(new SampleClasses.DocumentWithExpiration(id1)); + reactiveTemplate.save(new SampleClasses.DocumentWithExpiration(id2)); + + List ids = List.of(id1, id2); + reactiveTemplate.deleteByIds(ids, SampleClasses.DocumentWithExpiration.class); + + List list = reactiveTemplate.findByIds(ids, + SampleClasses.DocumentWithExpiration.class).subscribeOn(Schedulers.parallel()).collectList().block(); + assertThat(list).isEmpty(); + } + } + + @Test + public void deleteAllFromDifferentSets_ShouldDeleteAllDocuments() { + // batch delete operations are supported starting with Server version 6.0+ + if (ServerVersionUtils.isBatchWriteSupported(reactorClient.getAerospikeClient())) { + SampleClasses.DocumentWithExpiration entity1 = new SampleClasses.DocumentWithExpiration(id); + SampleClasses.VersionedClass entity2 = new SampleClasses.VersionedClass(nextId(), "test"); + reactiveTemplate.save(entity1); + reactiveTemplate.save(entity2); + + reactiveTemplate.deleteByIds(GroupedKeys.builder() + .entityKeys(SampleClasses.DocumentWithExpiration.class, List.of(entity1.getId())).build()); + reactiveTemplate.deleteByIds(GroupedKeys.builder() + .entityKeys(SampleClasses.VersionedClass.class, List.of(entity2.getId())).build()); + + List list1 = reactiveTemplate.findByIds(List.of(entity1.getId()), + SampleClasses.DocumentWithExpiration.class).subscribeOn(Schedulers.parallel()).collectList().block(); + assertThat(list1).isEmpty(); + List list2 = reactiveTemplate.findByIds(List.of(entity2.getId()), + SampleClasses.VersionedClass.class).subscribeOn(Schedulers.parallel()).collectList().block(); + assertThat(list2).isEmpty(); + } + } + + @Test + public void deleteAll_rejectsDuplicateIds() { + // batch write operations are supported starting with Server version 6.0+ + if (ServerVersionUtils.isBatchWriteSupported(reactorClient.getAerospikeClient())) { + String id1 = nextId(); + SampleClasses.DocumentWithExpiration document1 = new SampleClasses.DocumentWithExpiration(id1); + SampleClasses.DocumentWithExpiration document2 = new SampleClasses.DocumentWithExpiration(id1); + reactiveTemplate.save(document1); + reactiveTemplate.save(document2); + + List ids = List.of(id1, id1); + StepVerifier.create(reactiveTemplate.deleteByIds(ids, SampleClasses.DocumentWithExpiration.class)) + .expectError(AerospikeException.BatchRecordArray.class) + .verify(); + } + } } diff --git a/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateIndexTests.java b/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateIndexTests.java index 236315891..3e32cf9fe 100644 --- a/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateIndexTests.java +++ b/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateIndexTests.java @@ -7,12 +7,12 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.data.aerospike.BaseReactiveIntegrationTests; -import org.springframework.data.aerospike.IndexAlreadyExistsException; -import org.springframework.data.aerospike.IndexNotFoundException; import org.springframework.data.aerospike.core.AerospikeTemplateIndexTests; +import org.springframework.data.aerospike.exceptions.IndexAlreadyExistsException; +import org.springframework.data.aerospike.exceptions.IndexNotFoundException; import org.springframework.data.aerospike.mapping.Document; import org.springframework.data.aerospike.query.model.Index; -import org.springframework.data.aerospike.utility.IndexUtils; +import org.springframework.data.aerospike.utility.ServerVersionUtils; import reactor.core.publisher.Mono; import java.util.Objects; @@ -39,7 +39,7 @@ public void setUp() { // for Aerospike Server ver. >= 6.1.0.1 @Test public void createIndex_shouldNotThrowExceptionIfIndexAlreadyExists() { - if (IndexUtils.isDropCreateBehaviorUpdated(reactorClient.getAerospikeClient())) { + if (ServerVersionUtils.isDropCreateBehaviorUpdated(reactorClient.getAerospikeClient())) { reactiveTemplate.createIndex(IndexedDocument.class, INDEX_TEST_1, "stringField", IndexType.STRING).block(); assertThatCode(() -> reactiveTemplate.createIndex(IndexedDocument.class, INDEX_TEST_1, "stringField", @@ -52,7 +52,7 @@ public void createIndex_shouldNotThrowExceptionIfIndexAlreadyExists() { // for Aerospike Server ver. < 6.1.0.1 @Test public void createIndex_throwsExceptionIfIndexAlreadyExists() { - if (!IndexUtils.isDropCreateBehaviorUpdated(reactorClient.getAerospikeClient())) { + if (!ServerVersionUtils.isDropCreateBehaviorUpdated(reactorClient.getAerospikeClient())) { reactiveTemplate.createIndex(IndexedDocument.class, INDEX_TEST_1, "stringField", IndexType.STRING).block(); assertThatThrownBy(() -> reactiveTemplate.createIndex(IndexedDocument.class, INDEX_TEST_1, "stringField", @@ -139,7 +139,7 @@ public void createIndex_createsIndexForDifferentTypes() { // for Aerospike Server ver. >= 6.1.0.1 @Test public void createIndex_createsIndexOnNestedList() { - if (IndexUtils.isDropCreateBehaviorUpdated(reactorClient.getAerospikeClient())) { + if (ServerVersionUtils.isDropCreateBehaviorUpdated(reactorClient.getAerospikeClient())) { String setName = reactiveTemplate.getSetName(AerospikeTemplateIndexTests.IndexedDocument.class); reactiveTemplate.createIndex( AerospikeTemplateIndexTests.IndexedDocument.class, INDEX_TEST_1, "nestedList", @@ -160,7 +160,7 @@ public void createIndex_createsIndexOnNestedList() { // for Aerospike Server ver. >= 6.1.0.1 @Test public void createIndex_createsIndexOnMapOfMapsContext() { - if (IndexUtils.isDropCreateBehaviorUpdated(reactorClient.getAerospikeClient())) { + if (ServerVersionUtils.isDropCreateBehaviorUpdated(reactorClient.getAerospikeClient())) { String setName = reactiveTemplate.getSetName(AerospikeTemplateIndexTests.IndexedDocument.class); CTX[] ctx = new CTX[]{ @@ -191,7 +191,7 @@ public void createIndex_createsIndexOnMapOfMapsContext() { // for Aerospike Server ver. >= 6.1.0.1 @Test public void deleteIndex_doesNotThrowExceptionIfIndexDoesNotExist() { - if (IndexUtils.isDropCreateBehaviorUpdated(reactorClient.getAerospikeClient())) { + if (ServerVersionUtils.isDropCreateBehaviorUpdated(reactorClient.getAerospikeClient())) { assertThatCode(() -> reactiveTemplate.deleteIndex(IndexedDocument.class, "not-existing-index") .block()) .doesNotThrowAnyException(); @@ -201,7 +201,7 @@ public void deleteIndex_doesNotThrowExceptionIfIndexDoesNotExist() { // for Aerospike Server ver. < 6.1.0.1 @Test public void deleteIndex_throwsExceptionIfIndexDoesNotExist() { - if (!IndexUtils.isDropCreateBehaviorUpdated(reactorClient.getAerospikeClient())) { + if (!ServerVersionUtils.isDropCreateBehaviorUpdated(reactorClient.getAerospikeClient())) { assertThatThrownBy(() -> reactiveTemplate.deleteIndex(IndexedDocument.class, "not-existing-index").block()) .isInstanceOf(IndexNotFoundException.class); } diff --git a/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateInsertTests.java b/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateInsertTests.java index d3f0744fc..9a2e9ace4 100644 --- a/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateInsertTests.java +++ b/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateInsertTests.java @@ -1,5 +1,6 @@ package org.springframework.data.aerospike.core.reactive; +import com.aerospike.client.AerospikeException; import com.aerospike.client.Key; import com.aerospike.client.policy.Policy; import org.junit.jupiter.api.Test; @@ -10,6 +11,7 @@ import org.springframework.data.aerospike.SampleClasses.VersionedClass; import org.springframework.data.aerospike.sample.Person; import org.springframework.data.aerospike.utility.AsyncUtils; +import org.springframework.data.aerospike.utility.ServerVersionUtils; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; import reactor.test.StepVerifier; @@ -20,7 +22,6 @@ import java.util.List; import java.util.concurrent.atomic.AtomicLong; -import static java.util.Arrays.asList; import static org.assertj.core.api.Assertions.assertThat; public class ReactiveAerospikeTemplateInsertTests extends BaseReactiveIntegrationTests { @@ -170,28 +171,30 @@ public void insertsOnlyFirstDocumentAndNextAttemptsShouldFailWithDuplicateKeyExc @Test public void insertAll_shouldInsertAllDocuments() { - Person customer1 = new Person(nextId(), "Dave"); - Person customer2 = new Person(nextId(), "James"); - reactiveTemplate.insertAll(asList(customer1, customer2)).blockLast(); - - Person result1 = findById(customer1.getId(), Person.class); - Person result2 = findById(customer2.getId(), Person.class); - assertThat(result1).isEqualTo(customer1); - assertThat(result2).isEqualTo(customer2); - reactiveTemplate.delete(result1).block(); // cleanup - reactiveTemplate.delete(result2).block(); // cleanup + if (ServerVersionUtils.isBatchWriteSupported(reactorClient.getAerospikeClient())) { + Person customer1 = new Person(nextId(), "Dave"); + Person customer2 = new Person(nextId(), "James"); + reactiveTemplate.insertAll(List.of(customer1, customer2)).blockLast(); + + Person result1 = findById(customer1.getId(), Person.class); + Person result2 = findById(customer2.getId(), Person.class); + assertThat(result1).isEqualTo(customer1); + assertThat(result2).isEqualTo(customer2); + reactiveTemplate.delete(result1).block(); // cleanup + reactiveTemplate.delete(result2).block(); // cleanup + } } @Test public void insertAll_rejectsDuplicateId() { - Person person = new Person(id, "Amol"); - person.setAge(28); - reactiveTemplate.insert(person).block(); - assertThat(findById(id, Person.class)).isEqualTo(person); - - StepVerifier.create(reactiveTemplate.insertAll(List.of(person))) - .expectError(DuplicateKeyException.class) - .verify(); - reactiveTemplate.delete(findById(id, Person.class)).block(); // cleanup + if (ServerVersionUtils.isBatchWriteSupported(reactorClient.getAerospikeClient())) { + Person person = new Person(id, "Amol"); + person.setAge(28); + + StepVerifier.create(reactiveTemplate.insertAll(List.of(person, person))) + .expectError(AerospikeException.BatchRecordArray.class) + .verify(); + reactiveTemplate.delete(findById(id, Person.class)).block(); // cleanup + } } } diff --git a/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateSaveRelatedTests.java b/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateSaveRelatedTests.java index 84ae6e86a..554a4c93a 100644 --- a/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateSaveRelatedTests.java +++ b/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateSaveRelatedTests.java @@ -1,5 +1,6 @@ package org.springframework.data.aerospike.core.reactive; +import com.aerospike.client.AerospikeException; import com.aerospike.client.Key; import com.aerospike.client.policy.Policy; import org.junit.jupiter.api.Test; @@ -11,10 +12,12 @@ import org.springframework.data.aerospike.core.ReactiveAerospikeTemplate; import org.springframework.data.aerospike.sample.Person; import org.springframework.data.aerospike.utility.AsyncUtils; +import org.springframework.data.aerospike.utility.ServerVersionUtils; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; import reactor.test.StepVerifier; +import java.util.List; import java.util.concurrent.atomic.AtomicLong; import static org.assertj.core.api.Assertions.assertThat; @@ -217,4 +220,34 @@ public void save_rejectsNullObjectToBeSaved() { assertThatThrownBy(() -> reactiveTemplate.save(null).block()) .isInstanceOf(IllegalArgumentException.class); } + + @Test + public void saveAll_shouldSaveAllDocuments() { + // batch delete operations are supported starting with Server version 6.0+ + if (ServerVersionUtils.isBatchWriteSupported(reactorClient.getAerospikeClient())) { + Person customer1 = new Person(nextId(), "Dave"); + Person customer2 = new Person(nextId(), "James"); + reactiveTemplate.saveAll(List.of(customer1, customer2)).blockLast(); + + Person result1 = findById(customer1.getId(), Person.class); + Person result2 = findById(customer2.getId(), Person.class); + assertThat(result1).isEqualTo(customer1); + assertThat(result2).isEqualTo(customer2); + reactiveTemplate.delete(result1).block(); // cleanup + reactiveTemplate.delete(result2).block(); // cleanup + } + } + + @Test + public void saveAll_rejectsDuplicateId() { + // batch delete operations are supported starting with Server version 6.0+ + if (ServerVersionUtils.isBatchWriteSupported(reactorClient.getAerospikeClient())) { + VersionedClass first = new VersionedClass(id, "foo"); + + StepVerifier.create(reactiveTemplate.saveAll(List.of(first, first))) + .expectError(AerospikeException.BatchRecordArray.class) + .verify(); + reactiveTemplate.delete(findById(id, VersionedClass.class)).block(); // cleanup + } + } } diff --git a/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateUpdateTests.java b/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateUpdateTests.java index 7c6dc59c9..ddaf29508 100644 --- a/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateUpdateTests.java +++ b/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateUpdateTests.java @@ -1,5 +1,6 @@ package org.springframework.data.aerospike.core.reactive; +import com.aerospike.client.AerospikeException; import com.aerospike.client.Key; import com.aerospike.client.policy.Policy; import org.junit.jupiter.api.Test; @@ -10,6 +11,7 @@ import org.springframework.data.aerospike.SampleClasses.VersionedClass; import org.springframework.data.aerospike.sample.Person; import org.springframework.data.aerospike.utility.AsyncUtils; +import org.springframework.data.aerospike.utility.ServerVersionUtils; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -27,6 +29,7 @@ public class ReactiveAerospikeTemplateUpdateTests extends BaseReactiveIntegratio @Test public void shouldThrowExceptionOnUpdateForNonExistingKey() { + // RecordExistsAction.UPDATE_ONLY create(reactiveTemplate.update(new Person(id, "svenfirstName", 11))) .expectError(DataRetrievalFailureException.class) .verify(); @@ -325,4 +328,42 @@ public void TestAddToMapSpecifyingMapFieldOnly() { assertThat(personWithList2.getStringMap().get("key4")).isEqualTo("Added something new"); reactiveTemplate.delete(findById(id, Person.class)).block(); // cleanup } + + @Test + public void updateAllShouldThrowExceptionOnUpdateForNonExistingKey() { + // batch write operations are supported starting with Server version 6.0+ + if (ServerVersionUtils.isBatchWriteSupported(reactorClient.getAerospikeClient())) { + Person person1 = new Person(id, "svenfirstName", 11); + Person person2 = new Person(nextId(), "svenfirstName", 11); + Person person3 = new Person(nextId(), "svenfirstName", 11); + reactiveTemplate.save(person3).block(); + // RecordExistsAction.UPDATE_ONLY + assertThatThrownBy(() -> reactiveTemplate.updateAll(List.of(person1, person2)).blockLast()) + .isInstanceOf(AerospikeException.BatchRecordArray.class); + + assertThat(reactiveTemplate.findById(person1.getId(), Person.class).block()).isNull(); + assertThat(reactiveTemplate.findById(person2.getId(), Person.class).block()).isNull(); + assertThat(reactiveTemplate.findById(person3.getId(), Person.class).block()).isEqualTo(person3); + } + } + + @Test + public void updateAllIfDocumentsNotChanged() { + // batch write operations are supported starting with Server version 6.0+ + if (ServerVersionUtils.isBatchWriteSupported(reactorClient.getAerospikeClient())) { + int age1 = 140335200; + int age2 = 177652800; + Person person1 = new Person(id, "Wolfgang", age1); + Person person2 = new Person(nextId(), "Johann", age2); + reactiveTemplate.insertAll(List.of(person1, person2)).blockLast(); + reactiveTemplate.updateAll(List.of(person1, person2)).blockLast(); + + Person result1 = reactiveTemplate.findById(person1.getId(), Person.class).block(); + Person result2 = reactiveTemplate.findById(person2.getId(), Person.class).block(); + assertThat(result1.getAge()).isEqualTo(age1); + assertThat(result2.getAge()).isEqualTo(age2); + reactiveTemplate.delete(result1).block(); // cleanup + reactiveTemplate.delete(result2).block(); // cleanup + } + } } diff --git a/src/test/java/org/springframework/data/aerospike/index/AerospikePersistenceEntityIndexCreatorTest.java b/src/test/java/org/springframework/data/aerospike/index/AerospikePersistenceEntityIndexCreatorTest.java index 6dcdd532b..3c4669b8a 100644 --- a/src/test/java/org/springframework/data/aerospike/index/AerospikePersistenceEntityIndexCreatorTest.java +++ b/src/test/java/org/springframework/data/aerospike/index/AerospikePersistenceEntityIndexCreatorTest.java @@ -3,8 +3,8 @@ import com.aerospike.client.query.IndexCollectionType; import com.aerospike.client.query.IndexType; import org.junit.jupiter.api.Test; -import org.springframework.data.aerospike.IndexAlreadyExistsException; import org.springframework.data.aerospike.core.AerospikeTemplate; +import org.springframework.data.aerospike.exceptions.IndexAlreadyExistsException; import org.springframework.data.aerospike.sample.AutoIndexedDocument; import org.springframework.data.aerospike.utility.MockObjectProvider; diff --git a/src/test/java/org/springframework/data/aerospike/index/ReactiveAerospikePersistenceEntityIndexCreatorTest.java b/src/test/java/org/springframework/data/aerospike/index/ReactiveAerospikePersistenceEntityIndexCreatorTest.java index a5ab23a88..cd705fe1e 100644 --- a/src/test/java/org/springframework/data/aerospike/index/ReactiveAerospikePersistenceEntityIndexCreatorTest.java +++ b/src/test/java/org/springframework/data/aerospike/index/ReactiveAerospikePersistenceEntityIndexCreatorTest.java @@ -3,8 +3,8 @@ import com.aerospike.client.query.IndexCollectionType; import com.aerospike.client.query.IndexType; import org.junit.jupiter.api.Test; -import org.springframework.data.aerospike.IndexAlreadyExistsException; import org.springframework.data.aerospike.core.ReactiveAerospikeTemplate; +import org.springframework.data.aerospike.exceptions.IndexAlreadyExistsException; import org.springframework.data.aerospike.sample.AutoIndexedDocument; import org.springframework.data.aerospike.utility.MockObjectProvider; import reactor.core.publisher.Mono; diff --git a/src/test/java/org/springframework/data/aerospike/query/IndexedQualifierTests.java b/src/test/java/org/springframework/data/aerospike/query/IndexedQualifierTests.java index b1610ec6e..eb8d5cc07 100644 --- a/src/test/java/org/springframework/data/aerospike/query/IndexedQualifierTests.java +++ b/src/test/java/org/springframework/data/aerospike/query/IndexedQualifierTests.java @@ -22,7 +22,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.springframework.data.aerospike.utility.CollectionUtils; -import org.springframework.data.aerospike.utility.IndexUtils; +import org.springframework.data.aerospike.utility.ServerVersionUtils; import java.util.Map; import java.util.stream.Collectors; @@ -185,7 +185,7 @@ public void selectOnIndexedStringEQQualifier() { @Test public void selectWithGeoWithin() { - if (IndexUtils.isDropCreateBehaviorUpdated(client)) { + if (ServerVersionUtils.isDropCreateBehaviorUpdated(client)) { withIndex(namespace, INDEXED_GEO_SET, "geo_index", GEO_BIN_NAME, IndexType.GEO2DSPHERE, () -> { double lon = -122.0; double lat = 37.5; diff --git a/src/test/java/org/springframework/data/aerospike/query/reactive/ReactiveIndexedPersonRepositoryQueryTests.java b/src/test/java/org/springframework/data/aerospike/query/reactive/ReactiveIndexedPersonRepositoryQueryTests.java index 66572b047..423a71d72 100644 --- a/src/test/java/org/springframework/data/aerospike/query/reactive/ReactiveIndexedPersonRepositoryQueryTests.java +++ b/src/test/java/org/springframework/data/aerospike/query/reactive/ReactiveIndexedPersonRepositoryQueryTests.java @@ -8,6 +8,7 @@ import org.junit.jupiter.api.TestInstance; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.aerospike.BaseReactiveIntegrationTests; +import org.springframework.data.aerospike.ReactiveBlockingAerospikeTestOperations; import org.springframework.data.aerospike.repository.query.CriteriaDefinition; import org.springframework.data.aerospike.sample.Address; import org.springframework.data.aerospike.sample.IndexedPerson; @@ -47,11 +48,13 @@ public class ReactiveIndexedPersonRepositoryQueryTests extends BaseReactiveInteg .intMap(of("key1", 0, "key2", 1)).ints(Arrays.asList(450, 550, 990)).build(); public static final List allIndexedPersons = Arrays.asList(alain, luc, lilly, daniel, petra, emilien); + @Autowired + ReactiveBlockingAerospikeTestOperations reactiveBlockingAerospikeTestOperations; @BeforeAll public void beforeAll() { - reactiveRepository.deleteAll(allIndexedPersons).block(); - reactiveRepository.saveAll(allIndexedPersons).subscribeOn(Schedulers.parallel()).collectList().block(); + reactiveBlockingAerospikeTestOperations.deleteAll(reactiveRepository, allIndexedPersons); + reactiveBlockingAerospikeTestOperations.saveAll(reactiveRepository, allIndexedPersons); reactiveTemplate.createIndex(IndexedPerson.class, "indexed_person_first_name_index", "firstName", IndexType.STRING).block(); reactiveTemplate.createIndex(IndexedPerson.class, "indexed_person_last_name_index", "lastName", @@ -73,12 +76,12 @@ public void beforeAll() { IndexType.STRING, IndexCollectionType.MAPKEYS).block(); reactiveTemplate.createIndex(IndexedPerson.class, "indexed_person_address_values_index", "address", IndexType.STRING, IndexCollectionType.MAPVALUES).block(); - reactorIndexRefresher.refreshIndexes(); } @AfterAll public void afterAll() { - reactiveRepository.deleteAll(allIndexedPersons).block(); + reactiveBlockingAerospikeTestOperations.deleteAll(reactiveRepository, allIndexedPersons); + additionalAerospikeTestOperations.dropIndex(IndexedPerson.class, "indexed_person_first_name_index"); additionalAerospikeTestOperations.dropIndex(IndexedPerson.class, "indexed_person_last_name_index"); additionalAerospikeTestOperations.dropIndex(IndexedPerson.class, "indexed_person_strings_index"); @@ -91,7 +94,6 @@ public void afterAll() { additionalAerospikeTestOperations.dropIndex(IndexedPerson.class, "indexed_person_int_map_values_index"); additionalAerospikeTestOperations.dropIndex(IndexedPerson.class, "indexed_person_address_keys_index"); additionalAerospikeTestOperations.dropIndex(IndexedPerson.class, "indexed_person_address_values_index"); - reactorIndexRefresher.refreshIndexes(); } @Test diff --git a/src/test/java/org/springframework/data/aerospike/repository/IndexedPersonRepositoryQueryTests.java b/src/test/java/org/springframework/data/aerospike/repository/IndexedPersonRepositoryQueryTests.java index 5a508fb17..f1aaacb1e 100644 --- a/src/test/java/org/springframework/data/aerospike/repository/IndexedPersonRepositoryQueryTests.java +++ b/src/test/java/org/springframework/data/aerospike/repository/IndexedPersonRepositoryQueryTests.java @@ -58,8 +58,8 @@ public class IndexedPersonRepositoryQueryTests extends BaseBlockingIntegrationTe @BeforeAll public void beforeAll() { - repository.deleteAll(allIndexedPersons); - repository.saveAll(allIndexedPersons); + additionalAerospikeTestOperations.deleteAll(repository, allIndexedPersons); + additionalAerospikeTestOperations.saveAll(repository, allIndexedPersons); List newIndexes = new ArrayList<>(); newIndexes.add(Index.builder().set(template.getSetName(IndexedPerson.class)) @@ -107,7 +107,7 @@ public void beforeAll() { @AfterAll public void afterAll() { - repository.deleteAll(allIndexedPersons); + additionalAerospikeTestOperations.deleteAll(repository, allIndexedPersons); List dropIndexes = new ArrayList<>(); dropIndexes.add(Index.builder().set(template.getSetName(IndexedPerson.class)) diff --git a/src/test/java/org/springframework/data/aerospike/repository/PersonRepositoryQueryTests.java b/src/test/java/org/springframework/data/aerospike/repository/PersonRepositoryQueryTests.java index 40d871b08..e3184662d 100644 --- a/src/test/java/org/springframework/data/aerospike/repository/PersonRepositoryQueryTests.java +++ b/src/test/java/org/springframework/data/aerospike/repository/PersonRepositoryQueryTests.java @@ -11,7 +11,7 @@ import org.springframework.data.aerospike.sample.Person; import org.springframework.data.aerospike.sample.PersonRepository; import org.springframework.data.aerospike.sample.PersonSomeFields; -import org.springframework.data.aerospike.utility.IndexUtils; +import org.springframework.data.aerospike.utility.ServerVersionUtils; import org.springframework.data.aerospike.utility.TestUtils; import org.springframework.data.domain.Page; import org.springframework.data.domain.PageRequest; @@ -69,13 +69,13 @@ public class PersonRepositoryQueryTests extends BaseBlockingIntegrationTests { @BeforeAll public void beforeAll() { template.refreshIndexesCache(); - repository.deleteAll(allPersons); - repository.saveAll(allPersons); + additionalAerospikeTestOperations.deleteAll(repository, allPersons); + additionalAerospikeTestOperations.saveAll(repository, allPersons); } @AfterAll public void afterAll() { - repository.deleteAll(allPersons); + additionalAerospikeTestOperations.deleteAll(repository, allPersons); } @Test @@ -507,7 +507,7 @@ void findByMapOfListsKeyValueEquals() { @Test void findByAddressesMapKeyValueEquals() { - if (IndexUtils.isFindByPojoSupported(client)) { + if (ServerVersionUtils.isFindByPojoSupported(client)) { Address address1 = new Address("Foo Street 1", 1, "C0123", "Bar"); Address address2 = new Address("Foo Street 2", 1, "C0123", "Bar"); Address address3 = new Address("Foo Street 2", 1, "C0124", "Bar"); @@ -720,7 +720,7 @@ void findByMapOfListsKeyValueNotEqual() { @Test void findByAddressesMapKeyValueNotEqual() { - if (IndexUtils.isFindByPojoSupported(client)) { + if (ServerVersionUtils.isFindByPojoSupported(client)) { Address address1 = new Address("Foo Street 1", 1, "C0123", "Bar"); Address address2 = new Address("Foo Street 2", 1, "C0123", "Bar"); Address address3 = new Address("Foo Street 2", 1, "C0124", "Bar"); @@ -819,7 +819,7 @@ void findByMapKeyValueLessThanOrEqual() { @Test void findByMapKeyValueGreaterThanList() { - if (IndexUtils.isFindByPojoSupported(client)) { + if (ServerVersionUtils.isFindByPojoSupported(client)) { Map> mapOfLists1 = Map.of("0", List.of(100), "1", List.of(200), "2", List.of(300), "3", List.of(400)); Map> mapOfLists2 = Map.of("0", List.of(101), "1", List.of(201), "2", List.of(301), @@ -854,7 +854,7 @@ void findByMapKeyValueGreaterThanList() { @Test void findByMapKeyValueLessThanAddress() { - if (IndexUtils.isFindByPojoSupported(client)) { + if (ServerVersionUtils.isFindByPojoSupported(client)) { Map mapOfAddresses1 = Map.of("a", new Address("Foo Street 1", 1, "C0123", "Bar")); Map mapOfAddresses2 = Map.of("b", new Address("Foo Street 2", 1, "C0123", "Bar")); Map mapOfAddresses3 = Map.of("c", new Address("Foo Street 2", 1, "C0124", "Bar")); @@ -912,7 +912,7 @@ void findByIntMapKeyValueBetween() { @Test void findByIntMapBetween() { - if (IndexUtils.isFindByPojoSupported(client)) { + if (ServerVersionUtils.isFindByPojoSupported(client)) { assertThat(carter.getIntMap()).isEqualTo(Map.of("key1", 0, "key2", 1)); Map map1 = Map.of("key1", -1, "key2", 0); @@ -926,7 +926,7 @@ void findByIntMapBetween() { @Test void findByMapOfListsBetween() { - if (IndexUtils.isFindByPojoSupported(client)) { + if (ServerVersionUtils.isFindByPojoSupported(client)) { Map> mapOfLists1 = Map.of("0", List.of(100), "1", List.of(200)); Map> mapOfLists2 = Map.of("2", List.of(301), "3", List.of(401)); Map> mapOfLists3 = Map.of("1", List.of(102), "2", List.of(202)); @@ -1141,7 +1141,7 @@ public void deletePersonById() { @Test public void deletePersonsByIds() { - if (IndexUtils.isBatchWriteSupported(client)) { + if (ServerVersionUtils.isBatchWriteSupported(client)) { // batch delete requires server ver. >= 6.0.0 repository.deleteAllById(List.of(dave.getId(), carter.getId())); @@ -1154,15 +1154,16 @@ public void deletePersonsByIds() { @Test public void deleteAllPersonsFromList() { - if (IndexUtils.isBatchWriteSupported(client)) { + if (ServerVersionUtils.isBatchWriteSupported(client)) { // batch delete requires server ver. >= 6.0.0 repository.deleteAll(List.of(dave, carter)); - assertThat(repository.findAllById(List.of(dave.getId(), carter.getId()))).isEmpty(); - - repository.save(dave); // cleanup - repository.save(carter); // cleanup + } else { + List.of(dave, carter).forEach(repository::delete); } + assertThat(repository.findAllById(List.of(dave.getId(), carter.getId()))).isEmpty(); + repository.save(dave); // cleanup + repository.save(carter); // cleanup } @Test @@ -1646,7 +1647,7 @@ public void findPersonInRangeCorrectly() { it = repository.findByFirstNameBetween("Dave", "David"); assertThat(it).hasSize(1).contains(dave); - if (IndexUtils.isFindByPojoSupported(client)) { + if (ServerVersionUtils.isFindByPojoSupported(client)) { assertThat(dave.getAddress()).isEqualTo(new Address("Foo Street 1", 1, "C0123", "Bar")); Address address1 = new Address("Foo Street 1", 0, "C0123", "Bar"); Address address2 = new Address("Foo Street 2", 2, "C0124", "Bar"); @@ -1694,7 +1695,7 @@ public void findPersonsByFriendsInAgeRangeCorrectly() { @Test public void findPersonsByStringsList() { - if (IndexUtils.isFindByPojoSupported(client)) { + if (ServerVersionUtils.isFindByPojoSupported(client)) { List listToCompareWith = List.of("str0", "str1", "str2"); assertThat(dave.getStrings()).isEqualTo(listToCompareWith); @@ -1709,7 +1710,7 @@ public void findPersonsByStringsList() { @Test public void findPersonsByStringsListNotEqual() { - if (IndexUtils.isFindByPojoSupported(client)) { + if (ServerVersionUtils.isFindByPojoSupported(client)) { List listToCompareWith = List.of("str0", "str1", "str2"); assertThat(dave.getStrings()).isEqualTo(listToCompareWith); assertThat(donny.getStrings()).isNotEmpty(); @@ -1722,7 +1723,7 @@ public void findPersonsByStringsListNotEqual() { @Test public void findPersonsByStringsListLessThan() { - if (IndexUtils.isFindByPojoSupported(client)) { + if (ServerVersionUtils.isFindByPojoSupported(client)) { List davesStrings = dave.getStrings(); List listToCompareWith = List.of("str1", "str2", "str3"); List listWithFewerElements = List.of("str1", "str2"); @@ -1742,7 +1743,7 @@ public void findPersonsByStringsListLessThan() { @Test public void findPersonsByStringsListGreaterThanOrEqual() { - if (IndexUtils.isFindByPojoSupported(client)) { + if (ServerVersionUtils.isFindByPojoSupported(client)) { Set setToCompareWith = Set.of(0, 1, 2, 3, 4); dave.setIntSet(setToCompareWith); repository.save(dave); @@ -1755,7 +1756,7 @@ public void findPersonsByStringsListGreaterThanOrEqual() { @Test public void findPersonsByStringMap() { - if (IndexUtils.isFindByPojoSupported(client)) { + if (ServerVersionUtils.isFindByPojoSupported(client)) { Map mapToCompareWith = Map.of("key1", "val1", "key2", "val2"); assertThat(boyd.getStringMap()).isEqualTo(mapToCompareWith); @@ -1770,7 +1771,7 @@ public void findPersonsByStringMap() { @Test public void findPersonsByAddress() { - if (IndexUtils.isFindByPojoSupported(client)) { + if (ServerVersionUtils.isFindByPojoSupported(client)) { Address address = new Address("Foo Street 1", 1, "C0123", "Bar"); dave.setAddress(address); repository.save(dave); @@ -1782,7 +1783,7 @@ public void findPersonsByAddress() { @Test public void findPersonsByAddressNotEqual() { - if (IndexUtils.isFindByPojoSupported(client)) { + if (ServerVersionUtils.isFindByPojoSupported(client)) { Address address = new Address("Foo Street 1", 1, "C0123", "Bar"); assertThat(dave.getAddress()).isEqualTo(address); assertThat(carter.getAddress()).isNotNull(); @@ -1797,7 +1798,7 @@ public void findPersonsByAddressNotEqual() { @Test public void findPersonsByIntMapNotEqual() { - if (IndexUtils.isFindByPojoSupported(client)) { + if (ServerVersionUtils.isFindByPojoSupported(client)) { Map mapToCompareWith = Map.of("key1", 0, "key2", 1); assertThat(carter.getIntMap()).isEqualTo(mapToCompareWith); assertThat(boyd.getIntMap()).isNullOrEmpty(); @@ -1815,7 +1816,7 @@ public void findPersonsByIntMapNotEqual() { @Test public void findPersonsByAddressLessThan() { - if (IndexUtils.isFindByPojoSupported(client)) { + if (ServerVersionUtils.isFindByPojoSupported(client)) { Address address = new Address("Foo Street 2", 2, "C0124", "C0123"); assertThat(dave.getAddress()).isNotEqualTo(address); assertThat(carter.getAddress()).isEqualTo(address); @@ -1827,7 +1828,7 @@ public void findPersonsByAddressLessThan() { @Test public void findPersonsByStringMapGreaterThan() { - if (IndexUtils.isFindByPojoSupported(client)) { + if (ServerVersionUtils.isFindByPojoSupported(client)) { assertThat(boyd.getStringMap()).isNotEmpty(); assertThat(donny.getStringMap()).isNotEmpty(); @@ -1839,7 +1840,7 @@ public void findPersonsByStringMapGreaterThan() { @Test public void findPersonsByFriend() { - if (IndexUtils.isFindByPojoSupported(client)) { + if (ServerVersionUtils.isFindByPojoSupported(client)) { alicia.setAddress(new Address("Foo Street 1", 1, "C0123", "Bar")); repository.save(alicia); oliver.setFriend(alicia); @@ -1855,7 +1856,7 @@ public void findPersonsByFriend() { @Test public void findPersonsByFriendAddress() { - if (IndexUtils.isFindByPojoSupported(client)) { + if (ServerVersionUtils.isFindByPojoSupported(client)) { Address address = new Address("Foo Street 1", 1, "C0123", "Bar"); dave.setAddress(address); repository.save(dave); @@ -1994,7 +1995,7 @@ public void findPersonsByFriendFriendFriendFriendFriendFriendFriendFriendBestFri @Test // find by deeply nested POJO public void findPersonsByFriendFriendFriendFriendFriendFriendFriendFriendBestFriendBestFriendAddress() { - if (IndexUtils.isFindByPojoSupported(client)) { + if (ServerVersionUtils.isFindByPojoSupported(client)) { Address address = new Address("Foo Street 1", 1, "C0123", "Bar"); dave.setAddress(address); repository.save(dave); diff --git a/src/test/java/org/springframework/data/aerospike/repository/support/SimpleAerospikeRepositoryTest.java b/src/test/java/org/springframework/data/aerospike/repository/support/SimpleAerospikeRepositoryTest.java index d8951701a..ba8fb419d 100644 --- a/src/test/java/org/springframework/data/aerospike/repository/support/SimpleAerospikeRepositoryTest.java +++ b/src/test/java/org/springframework/data/aerospike/repository/support/SimpleAerospikeRepositoryTest.java @@ -24,8 +24,10 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.aerospike.core.AerospikeOperations; import org.springframework.data.aerospike.sample.Person; +import org.springframework.data.aerospike.utility.AdditionalAerospikeTestOperations; import org.springframework.data.domain.Page; import org.springframework.data.domain.PageImpl; import org.springframework.data.domain.PageRequest; @@ -60,6 +62,8 @@ public class SimpleAerospikeRepositoryTest { AerospikeOperations operations; @InjectMocks SimpleAerospikeRepository aerospikeRepository; + @Autowired + AdditionalAerospikeTestOperations additionalAerospikeTestOperations; Person testPerson; List testPersons; @@ -95,6 +99,7 @@ public void save() { @Test public void saveIterableOfS() { + // batch write operations are supported starting with Server version 6.0+ List result = aerospikeRepository.saveAll(testPersons); assertThat(result).isEqualTo(testPersons); @@ -179,8 +184,8 @@ public void deleteAllById() { @Test public void deleteIterableOfQExtendsT() { - aerospikeRepository.deleteAll(testPersons); - + // batch write operations are supported starting with Server version 6.0+ + additionalAerospikeTestOperations.deleteAll(aerospikeRepository, testPersons); verify(operations, times(testPersons.size())).delete(any(Person.class)); } diff --git a/src/test/java/org/springframework/data/aerospike/repository/support/SimpleReactiveAerospikeRepositoryTest.java b/src/test/java/org/springframework/data/aerospike/repository/support/SimpleReactiveAerospikeRepositoryTest.java index 3bdb38a33..80d35cb5f 100644 --- a/src/test/java/org/springframework/data/aerospike/repository/support/SimpleReactiveAerospikeRepositoryTest.java +++ b/src/test/java/org/springframework/data/aerospike/repository/support/SimpleReactiveAerospikeRepositoryTest.java @@ -22,6 +22,8 @@ import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.aerospike.ReactiveBlockingAerospikeTestOperations; import org.springframework.data.aerospike.core.ReactiveAerospikeOperations; import org.springframework.data.aerospike.sample.Customer; import org.springframework.data.repository.core.EntityInformation; @@ -56,6 +58,8 @@ public class SimpleReactiveAerospikeRepositoryTest { private Customer testCustomer; private List testCustomers; + @Autowired + ReactiveBlockingAerospikeTestOperations reactiveBlockingAerospikeTestOperations; @BeforeEach public void setUp() { @@ -201,7 +205,7 @@ public void testDelete() { public void testDeleteAllIterable() { when(operations.delete(any(Customer.class))).thenReturn(Mono.just(true)); - repository.deleteAll(testCustomers).block(); + reactiveBlockingAerospikeTestOperations.deleteAll(repository, testCustomers); verify(operations, times(testCustomers.size())).delete(any(Customer.class)); } diff --git a/src/test/java/org/springframework/data/aerospike/utility/AdditionalAerospikeTestOperations.java b/src/test/java/org/springframework/data/aerospike/utility/AdditionalAerospikeTestOperations.java index 2619328d7..61e699e20 100644 --- a/src/test/java/org/springframework/data/aerospike/utility/AdditionalAerospikeTestOperations.java +++ b/src/test/java/org/springframework/data/aerospike/utility/AdditionalAerospikeTestOperations.java @@ -1,5 +1,6 @@ package org.springframework.data.aerospike.utility; +import com.aerospike.client.AerospikeException; import com.aerospike.client.Bin; import com.aerospike.client.IAerospikeClient; import com.aerospike.client.Key; @@ -20,6 +21,7 @@ import org.springframework.data.aerospike.index.IndexesCacheRefresher; import org.springframework.data.aerospike.query.cache.IndexInfoParser; import org.springframework.data.aerospike.query.model.Index; +import org.springframework.data.aerospike.repository.AerospikeRepository; import org.springframework.data.aerospike.sample.Customer; import org.springframework.data.aerospike.sample.Person; import org.testcontainers.containers.Container; @@ -160,6 +162,28 @@ public void dropIndexes(Collection indexesToBeDropped) { indexesRefresher.refreshIndexesCache(); } + public void deleteAll(AerospikeRepository repository, Collection entities) { + // batch write operations are supported starting with Server version 6.0+ + if (ServerVersionUtils.isBatchWriteSupported(client)) { + try { + repository.deleteAll(entities); + } catch (AerospikeException.BatchRecordArray ignored) { + // KEY_NOT_FOUND ResultCode causes exception if there are no entities + } + } else { + entities.forEach(repository::delete); + } + } + + public void saveAll(AerospikeRepository repository, Collection entities) { + // batch write operations are supported starting with Server version 6.0+ + if (ServerVersionUtils.isBatchWriteSupported(client)) { + repository.saveAll(entities); + } else { + entities.forEach(repository::save); + } + } + /** * @deprecated since Aerospike Server ver. 6.1.0.1. Use * {@link org.springframework.data.aerospike.core.AerospikeTemplate#indexExists(String)} diff --git a/src/test/java/org/springframework/data/aerospike/utility/IndexUtils.java b/src/test/java/org/springframework/data/aerospike/utility/IndexUtils.java index 6f2010ab8..c09ccaabb 100644 --- a/src/test/java/org/springframework/data/aerospike/utility/IndexUtils.java +++ b/src/test/java/org/springframework/data/aerospike/utility/IndexUtils.java @@ -12,7 +12,6 @@ import org.springframework.data.aerospike.query.cache.IndexInfoParser; import org.springframework.data.aerospike.query.model.Index; -import java.lang.module.ModuleDescriptor; import java.util.Arrays; import java.util.List; import java.util.function.Supplier; @@ -20,12 +19,8 @@ public class IndexUtils { - private static final ModuleDescriptor.Version SERVER_VERSION_6_0_0_0 = ModuleDescriptor.Version.parse("6.0.0.0"); - private static final ModuleDescriptor.Version SERVER_VERSION_6_1_0_1 = ModuleDescriptor.Version.parse("6.1.0.1"); - private static final ModuleDescriptor.Version SERVER_VERSION_6_3_0_0 = ModuleDescriptor.Version.parse("6.3.0.0"); - static void dropIndex(IAerospikeClient client, String namespace, String setName, String indexName) { - if (IndexUtils.isDropCreateBehaviorUpdated(client)) { + if (ServerVersionUtils.isDropCreateBehaviorUpdated(client)) { waitTillComplete(() -> client.dropIndex(null, namespace, setName, indexName)); } else { // ignoring ResultCode.INDEX_NOTFOUND for Aerospike Server prior to ver. 6.1.0.1 @@ -36,7 +31,7 @@ static void dropIndex(IAerospikeClient client, String namespace, String setName, static void createIndex(IAerospikeClient client, String namespace, String setName, String indexName, String binName, IndexType indexType, IndexCollectionType collectionType, CTX[] ctx) { - if (IndexUtils.isDropCreateBehaviorUpdated(client)) { + if (ServerVersionUtils.isDropCreateBehaviorUpdated(client)) { waitTillComplete(() -> client.createIndex(null, namespace, setName, indexName, binName, indexType, collectionType, ctx)); } else { @@ -64,33 +59,6 @@ public static boolean indexExists(IAerospikeClient client, String namespace, Str return !response.startsWith("FAIL:201"); } - public static String getServerVersion(IAerospikeClient client) { - String versionString = Info.request(client.getCluster().getRandomNode(), "version"); - return versionString.substring(versionString.lastIndexOf(' ') + 1); - } - - /** - * Since Aerospike Server ver. 6.1.0.1 attempting to create a secondary index which already exists or to drop a - * non-existing secondary index now returns success/OK instead of an error. - */ - public static boolean isDropCreateBehaviorUpdated(IAerospikeClient client) { - return ModuleDescriptor.Version.parse(IndexUtils.getServerVersion(client)) - .compareTo(SERVER_VERSION_6_1_0_1) >= 0; - } - - /** - * Since Aerospike Server ver. 6.3.0.0 find by POJO is supported. - */ - public static boolean isFindByPojoSupported(IAerospikeClient client) { - return ModuleDescriptor.Version.parse(IndexUtils.getServerVersion(client)) - .compareTo(SERVER_VERSION_6_3_0_0) >= 0; - } - - public static boolean isBatchWriteSupported(IAerospikeClient client) { - return ModuleDescriptor.Version.parse(IndexUtils.getServerVersion(client)) - .compareTo(SERVER_VERSION_6_0_0_0) >= 0; - } - private static void waitTillComplete(Supplier supplier) { IndexTask task = supplier.get(); if (task == null) { diff --git a/src/test/java/org/springframework/data/aerospike/utility/ServerVersionUtils.java b/src/test/java/org/springframework/data/aerospike/utility/ServerVersionUtils.java new file mode 100644 index 000000000..e5ceac184 --- /dev/null +++ b/src/test/java/org/springframework/data/aerospike/utility/ServerVersionUtils.java @@ -0,0 +1,40 @@ +package org.springframework.data.aerospike.utility; + +import com.aerospike.client.IAerospikeClient; +import com.aerospike.client.Info; + +import java.lang.module.ModuleDescriptor; + +public class ServerVersionUtils { + + private static final ModuleDescriptor.Version SERVER_VERSION_6_0_0_0 = ModuleDescriptor.Version.parse("6.0.0.0"); + private static final ModuleDescriptor.Version SERVER_VERSION_6_1_0_1 = ModuleDescriptor.Version.parse("6.1.0.1"); + private static final ModuleDescriptor.Version SERVER_VERSION_6_3_0_0 = ModuleDescriptor.Version.parse("6.3.0.0"); + + public static String getServerVersion(IAerospikeClient client) { + String versionString = Info.request(client.getCluster().getRandomNode(), "version"); + return versionString.substring(versionString.lastIndexOf(' ') + 1); + } + + /** + * Since Aerospike Server ver. 6.1.0.1 attempting to create a secondary index which already exists or to drop a + * non-existing secondary index now returns success/OK instead of an error. + */ + public static boolean isDropCreateBehaviorUpdated(IAerospikeClient client) { + return ModuleDescriptor.Version.parse(ServerVersionUtils.getServerVersion(client)) + .compareTo(SERVER_VERSION_6_1_0_1) >= 0; + } + + /** + * Since Aerospike Server ver. 6.3.0.0 find by POJO is supported. + */ + public static boolean isFindByPojoSupported(IAerospikeClient client) { + return ModuleDescriptor.Version.parse(ServerVersionUtils.getServerVersion(client)) + .compareTo(SERVER_VERSION_6_3_0_0) >= 0; + } + + public static boolean isBatchWriteSupported(IAerospikeClient client) { + return ModuleDescriptor.Version.parse(ServerVersionUtils.getServerVersion(client)) + .compareTo(SERVER_VERSION_6_0_0_0) >= 0; + } +}