FMWK-247 Add support for batch write operations (#640)
agrgr authored Oct 8, 2023
1 parent 5cee8cd commit 37bcc71
Showing 43 changed files with 1,045 additions and 282 deletions.
@@ -1,5 +1,6 @@
package org.springframework.data.aerospike.core;

import com.aerospike.client.AerospikeException;
import org.springframework.data.aerospike.query.Qualifier;

import java.util.Collection;
@@ -42,6 +43,7 @@ <T, S> List<?> findByIdsInternal(Collection<?> ids, Class<T> entityClass, Class<
*
* @param ids The ids of the documents to delete. Must not be {@literal null}.
* @param entityClass The class to extract the Aerospike set from. Must not be {@literal null}.
* @throws AerospikeException.BatchRecordArray if batch delete results contain errors or null records
*/
<T> void deleteByIdsInternal(Collection<?> ids, Class<T> entityClass);
}
@@ -15,6 +15,7 @@
*/
package org.springframework.data.aerospike.core;

import com.aerospike.client.AerospikeException;
import com.aerospike.client.IAerospikeClient;
import com.aerospike.client.Value;
import com.aerospike.client.cdt.CTX;
@@ -73,6 +74,21 @@ public interface AerospikeOperations {
*/
<T> void insert(T document);

/**
* Insert multiple documents in one batch request. The policies are analogous to {@link #insert(Object)}.
* <p>
* The order of returned results is preserved. The execution order is NOT preserved.
* <p>
* This operation requires Server version 6.0+.
*
* @param documents Documents to insert. Must not be {@literal null}.
* @throws AerospikeException.BatchRecordArray if batch insert succeeds, but results contain errors or null
* records
* @throws org.springframework.dao.DataAccessException if batch operation failed (see
* {@link DefaultAerospikeExceptionTranslator} for details)
*/
<T> void insertAll(Iterable<? extends T> documents);
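
For illustration only, not part of this commit: a minimal caller-side sketch of the new batch insert. The Person entity, the example class and the injected AerospikeOperations bean are assumptions made for the sketch.

import com.aerospike.client.AerospikeException;
import org.springframework.dao.DataAccessException;
import org.springframework.data.aerospike.core.AerospikeOperations;
import org.springframework.data.aerospike.mapping.Document;
import org.springframework.data.annotation.Id;

import java.util.List;

public class BatchInsertExample {

    // Hypothetical mapped entity, only for the sketch.
    @Document
    public static class Person {
        @Id
        String id;
        String name;

        public Person(String id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    private final AerospikeOperations operations;

    public BatchInsertExample(AerospikeOperations operations) {
        this.operations = operations;
    }

    public void insertPeople() {
        List<Person> people = List.of(new Person("id1", "Alice"), new Person("id2", "Bob"));
        try {
            // One batch request; requires Aerospike Server version 6.0+.
            operations.insertAll(people);
        } catch (AerospikeException.BatchRecordArray e) {
            // The batch went through, but individual records failed or came back null.
            System.err.println("Batch insert had per-record errors: " + e.getMessage());
        } catch (DataAccessException e) {
            // The batch operation itself failed and was translated by DefaultAerospikeExceptionTranslator.
            throw e;
        }
    }
}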

/**
* Save a document.
* <p>
@@ -93,6 +109,21 @@ public interface AerospikeOperations {
*/
<T> void save(T document);

/**
* Save multiple documents in one batch request. The policies are analogous to {@link #save(Object)}.
* <p>
* The order of returned results is preserved. The execution order is NOT preserved.
* <p>
* This operation requires Server version 6.0+.
*
* @param documents Documents to save. Must not be {@literal null}.
* @throws AerospikeException.BatchRecordArray if batch save succeeds, but results contain errors or null
* records
* @throws org.springframework.dao.DataAccessException if batch operation failed (see
* {@link DefaultAerospikeExceptionTranslator} for details)
*/
<T> void saveAll(Iterable<T> documents);

/**
* Persist a document using specified WritePolicy.
*
@@ -102,13 +133,6 @@ public interface AerospikeOperations {
*/
<T> void persist(T document, WritePolicy writePolicy);

/**
* Insert each document of the given documents using single insert operations.
*
* @param documents The documents to insert. Must not be {@literal null}.
*/
<T> void insertAll(Collection<? extends T> documents);

/**
* Update a document using {@link com.aerospike.client.policy.RecordExistsAction#UPDATE_ONLY} policy combined with
* removing bins at first (analogous to {@link com.aerospike.client.policy.RecordExistsAction#REPLACE_ONLY}) taking
@@ -132,6 +156,21 @@ public interface AerospikeOperations {
*/
<T> void update(T document, Collection<String> fields);

/**
* Update multiple documents in one batch request. The policies are analogous to {@link #update(Object)}.
* <p>
* The order of returned results is preserved. The execution order is NOT preserved.
* <p>
* This operation requires Server version 6.0+.
*
* @param documents Documents to update. Must not be {@literal null}.
* @throws AerospikeException.BatchRecordArray if batch update succeeds, but results contain errors or null
* records
* @throws org.springframework.dao.DataAccessException if batch operation failed (see
* {@link DefaultAerospikeExceptionTranslator} for details)
*/
<T> void updateAll(Iterable<T> documents);
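
Purely illustrative, reusing the hypothetical Person entity and operations bean from the sketch above: saveAll and updateAll follow the same batch pattern, with save semantics for the former and update-only semantics for the latter.

// One batch request each; requires Aerospike Server version 6.0+.
operations.saveAll(List.of(new Person("id1", "Alice"), new Person("id2", "Bob")));
operations.updateAll(List.of(new Person("id1", "Alice Smith"), new Person("id2", "Bob Jones"))); // records must already exist

// For documents with a version property, the version field is refreshed in place after a
// successful batch write, mirroring the single-document save/update behaviour.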

/**
* Truncate/Delete all the documents in the given entity's set.
*
@@ -165,6 +204,9 @@ public interface AerospikeOperations {
* @param ids The ids of the documents to delete. Must not be {@literal null}.
* @param entityClass The class to extract the Aerospike set from and to map the documents to. Must not be
* {@literal null}.
* @throws AerospikeException.BatchRecordArray if batch delete results contain errors
* @throws org.springframework.dao.DataAccessException if batch operation failed (see
* {@link DefaultAerospikeExceptionTranslator} for details)
*/
<T> void deleteByIds(Iterable<?> ids, Class<T> entityClass);

@@ -178,6 +220,9 @@ public interface AerospikeOperations {
* This operation requires Server version 6.0+.
*
* @param groupedKeys Must not be {@literal null}.
* @throws AerospikeException.BatchRecordArray if batch delete results contain errors
* @throws org.springframework.dao.DataAccessException if batch operation failed (see
* {@link DefaultAerospikeExceptionTranslator} for details)
*/
void deleteByIds(GroupedKeys groupedKeys);
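
An illustrative sketch of the two batch delete entry points, again reusing the hypothetical Person entity; the Customer entity and the GroupedKeys builder calls are further assumptions and may not match the exact builder API.

// Delete several documents of one entity class in a single batch call.
operations.deleteByIds(List.of("id1", "id2", "id3"), Person.class);

// Delete documents that live in different sets in one call (assumed builder construction).
GroupedKeys groupedKeys = GroupedKeys.builder()
    .entityKeys(Person.class, List.of("id1", "id2"))
    .entityKeys(Customer.class, List.of("id9"))
    .build();
operations.deleteByIds(groupedKeys);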

@@ -15,15 +15,8 @@
*/
package org.springframework.data.aerospike.core;

import com.aerospike.client.AerospikeException;
import com.aerospike.client.Bin;
import com.aerospike.client.IAerospikeClient;
import com.aerospike.client.Info;
import com.aerospike.client.Key;
import com.aerospike.client.Operation;
import com.aerospike.client.Record;
import com.aerospike.client.ResultCode;
import com.aerospike.client.Value;
import com.aerospike.client.*;
import com.aerospike.client.cdt.CTX;
import com.aerospike.client.cluster.Node;
import com.aerospike.client.policy.BatchPolicy;
@@ -66,7 +59,6 @@
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.regex.Matcher;
@@ -237,6 +229,41 @@ public <T> void save(T document) {
}
}

@Override
public <T> void saveAll(Iterable<T> documents) {
Assert.notNull(documents, "Documents for saving must not be null!");

List<BatchWriteData<T>> batchWriteDataList = new ArrayList<>();
documents.forEach(document -> batchWriteDataList.add(getBatchWriteForSave(document)));

List<BatchRecord> batchWriteRecords = batchWriteDataList.stream().map(BatchWriteData::batchRecord).toList();
try {
client.operate(null, batchWriteRecords);
} catch (AerospikeException e) {
throw translateError(e);
}

checkForErrorsAndUpdateVersion(batchWriteDataList, batchWriteRecords, "save");
}

private <T> void checkForErrorsAndUpdateVersion(List<BatchWriteData<T>> batchWriteDataList,
List<BatchRecord> batchWriteRecords, String commandName) {
boolean errorsFound = false;
for (AerospikeTemplate.BatchWriteData<T> data : batchWriteDataList) {
if (!errorsFound && batchRecordFailed(data.batchRecord())) {
errorsFound = true;
}
if (data.hasVersionProperty() && !batchRecordFailed(data.batchRecord())) {
updateVersion(data.document(), data.batchRecord().record);
}
}

if (errorsFound) {
AerospikeException e = new AerospikeException("Errors during batch " + commandName);
throw new AerospikeException.BatchRecordArray(batchWriteRecords.toArray(BatchRecord[]::new), e);
}
}
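
checkForErrorsAndUpdateVersion walks the whole batch before throwing, so every per-record result stays available to the caller. An illustrative, hedged way to inspect them, assuming a template bean and a documents list; this catch block is not part of the commit:

try {
    template.saveAll(documents);
} catch (AerospikeException.BatchRecordArray e) {
    // e.records holds one BatchRecord per submitted document, in the same order as the input.
    for (BatchRecord batchRecord : e.records) {
        if (batchRecord.resultCode != ResultCode.OK || batchRecord.record == null) {
            System.err.printf("Key %s failed with result code %d%n", batchRecord.key, batchRecord.resultCode);
        }
    }
}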

@Override
public <T> void persist(T document, WritePolicy policy) {
Assert.notNull(document, "Document must not be null!");
@@ -248,13 +275,6 @@ public <T> void persist(T document, WritePolicy policy) {
doPersistAndHandleError(data, policy, operations);
}

@Override
public <T> void insertAll(Collection<? extends T> documents) {
Assert.notNull(documents, "Documents must not be null!");

documents.stream().filter(Objects::nonNull).forEach(this::insert);
}

@Override
public <T> void insert(T document) {
Assert.notNull(document, "Document must not be null!");
@@ -276,6 +296,23 @@ public <T> void insert(T document) {
}
}

@Override
public <T> void insertAll(Iterable<? extends T> documents) {
Assert.notNull(documents, "Documents for inserting must not be null!");

List<BatchWriteData<T>> batchWriteDataList = new ArrayList<>();
documents.forEach(document -> batchWriteDataList.add(getBatchWriteForInsert(document)));

List<BatchRecord> batchWriteRecords = batchWriteDataList.stream().map(BatchWriteData::batchRecord).toList();
try {
client.operate(null, batchWriteRecords);
} catch (AerospikeException e) {
throw translateError(e);
}

checkForErrorsAndUpdateVersion(batchWriteDataList, batchWriteRecords, "insert");
}

@Override
public <T> void update(T document) {
Assert.notNull(document, "Document must not be null!");
@@ -315,6 +352,23 @@ public <T> void update(T document, Collection<String> fields) {
}
}

@Override
public <T> void updateAll(Iterable<T> documents) {
Assert.notNull(documents, "Documents for updating must not be null!");

List<BatchWriteData<T>> batchWriteDataList = new ArrayList<>();
documents.forEach(document -> batchWriteDataList.add(getBatchWriteForUpdate(document)));

List<BatchRecord> batchWriteRecords = batchWriteDataList.stream().map(BatchWriteData::batchRecord).toList();
try {
client.operate(null, batchWriteRecords);
} catch (AerospikeException e) {
throw translateError(e);
}

checkForErrorsAndUpdateVersion(batchWriteDataList, batchWriteRecords, "update");
}

@Override
public <T> void delete(Class<T> entityClass) {
Assert.notNull(entityClass, "Class must not be null!");
@@ -363,6 +417,53 @@ public <T> void deleteByIds(Iterable<?> ids, Class<T> entityClass) {
deleteByIdsInternal(IterableConverter.toList(ids), entityClass);
}

@Override
public <T> void deleteByIdsInternal(Collection<?> ids, Class<T> entityClass) {
if (ids.isEmpty()) {
return;
}

AerospikePersistentEntity<?> entity = mappingContext.getRequiredPersistentEntity(entityClass);

Key[] keys = ids.stream()
.map(id -> getKey(id, entity))
.toArray(Key[]::new);

checkForErrors(client, keys);
}

private void checkForErrors(IAerospikeClient client, Key[] keys) {
BatchResults results;
try {
// requires server ver. >= 6.0.0
results = client.delete(null, null, keys);
} catch (AerospikeException e) {
throw translateError(e);
}

for (int i = 0; i < results.records.length; i++) {
BatchRecord record = results.records[i];
if (batchRecordFailed(record)) {
throw new AerospikeException.BatchRecordArray(results.records,
new AerospikeException("Errors during batch delete"));
}
}
}
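
batchRecordFailed is used above but not shown in this diff; judging by the Javadoc's mention of errors and null records, it plausibly checks both. A sketch under that assumption:

// Assumed shape of the helper used above; the actual implementation may differ.
private boolean batchRecordFailed(BatchRecord batchRecord) {
    return batchRecord.resultCode != ResultCode.OK || batchRecord.record == null;
}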

@Override
public void deleteByIds(GroupedKeys groupedKeys) {
Assert.notNull(groupedKeys, "Grouped keys must not be null!");
Assert.notNull(groupedKeys.getEntitiesKeys(), "Entities keys must not be null!");
Assert.notEmpty(groupedKeys.getEntitiesKeys(), "Entities keys must not be empty!");

deleteEntitiesByIdsInternal(groupedKeys);
}

private void deleteEntitiesByIdsInternal(GroupedKeys groupedKeys) {
EntitiesKeys entitiesKeys = EntitiesKeys.of(toEntitiesKeyMap(groupedKeys));
checkForErrors(client, entitiesKeys.getKeys());
}

@Override
public <T> boolean exists(Object id, Class<T> entityClass) {
Assert.notNull(id, "Id must not be null!");
@@ -471,26 +572,6 @@ public <T, S> List<?> findByIdsInternal(Collection<?> ids, Class<T> entityClass,
}
}

@Override
public <T> void deleteByIdsInternal(Collection<?> ids, Class<T> entityClass) {
if (ids.isEmpty()) {
return;
}

try {
AerospikePersistentEntity<?> entity = mappingContext.getRequiredPersistentEntity(entityClass);

Key[] keys = ids.stream()
.map(id -> getKey(id, entity))
.toArray(Key[]::new);

// requires server ver. >= 6.0.0
client.delete(null, null, keys);
} catch (AerospikeException e) {
throw translateError(e);
}
}

<S> Object getRecordMapToTargetClass(AerospikePersistentEntity<?> entity, Key key, Class<S> targetClass,
Qualifier... qualifiers) {
Record aeroRecord;
@@ -603,22 +684,6 @@ private GroupedEntities findEntitiesByIdsInternal(GroupedKeys groupedKeys) {
return toGroupedEntities(entitiesKeys, aeroRecords);
}

@Override
public void deleteByIds(GroupedKeys groupedKeys) {
Assert.notNull(groupedKeys, "Grouped keys must not be null!");

if (groupedKeys.getEntitiesKeys() == null || groupedKeys.getEntitiesKeys().isEmpty()) {
return;
}

deleteEntitiesByIdsInternal(groupedKeys);
}

private void deleteEntitiesByIdsInternal(GroupedKeys groupedKeys) {
EntitiesKeys entitiesKeys = EntitiesKeys.of(toEntitiesKeyMap(groupedKeys));
client.delete(null, null, entitiesKeys.getKeys());
}

@Override
public <T> ResultSet aggregate(Filter filter, Class<T> entityClass,
String module, String function, List<Value> arguments) {
@@ -887,16 +952,7 @@ private <T> void doPersistWithVersionAndHandleError(T document, AerospikeWriteData

private Record putAndGetHeader(AerospikeWriteData data, WritePolicy policy, boolean firstlyDeleteBins) {
Key key = data.getKey();
Bin[] bins = data.getBinsAsArray();

if (bins.length == 0) {
throw new AerospikeException(
"Cannot put and get header on a document with no bins and \"@_class\" bin disabled.");
}

Operation[] operations = firstlyDeleteBins ? operations(bins, Operation::put,
Operation.array(Operation.delete()), Operation.array(Operation.getHeader()))
: operations(bins, Operation::put, null, Operation.array(Operation.getHeader()));
Operation[] operations = getPutAndGetHeaderOperations(data, firstlyDeleteBins);

return client.operate(policy, key, operations);
}
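
getPutAndGetHeaderOperations is not shown in this hunk; given the inline code it replaces, removed just above, it plausibly carries the same bin check and operation assembly. A sketch under that assumption:

// Assumed shape of the extracted helper, mirroring the removed inline code; the actual method may differ.
private Operation[] getPutAndGetHeaderOperations(AerospikeWriteData data, boolean firstlyDeleteBins) {
    Bin[] bins = data.getBinsAsArray();

    if (bins.length == 0) {
        throw new AerospikeException(
            "Cannot put and get header on a document with no bins and \"@_class\" bin disabled.");
    }

    return firstlyDeleteBins
        ? operations(bins, Operation::put, Operation.array(Operation.delete()), Operation.array(Operation.getHeader()))
        : operations(bins, Operation::put, null, Operation.array(Operation.getHeader()));
}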