Skip to content

Commit

Permalink
FMWK-618 Copy policies for transactional queries (#798)
Browse files Browse the repository at this point in the history
  • Loading branch information
agrgr authored Dec 10, 2024
1 parent 6d5dcc9 commit 1fcaa06
Show file tree
Hide file tree
Showing 7 changed files with 149 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@
import static org.springframework.data.aerospike.core.CoreUtils.getDistinctPredicate;
import static org.springframework.data.aerospike.core.CoreUtils.operations;
import static org.springframework.data.aerospike.core.CoreUtils.verifyUnsortedWithOffset;
import static org.springframework.data.aerospike.core.TemplateUtils.checkForTransaction;
import static org.springframework.data.aerospike.core.TemplateUtils.enrichPolicyWithTransaction;
import static org.springframework.data.aerospike.core.TemplateUtils.excludeIdQualifier;
import static org.springframework.data.aerospike.core.TemplateUtils.getBinNamesFromTargetClass;
import static org.springframework.data.aerospike.core.TemplateUtils.getIdValue;
Expand Down Expand Up @@ -111,7 +111,7 @@ public AerospikeTemplate(IAerospikeClient client,
QueryEngine queryEngine,
IndexRefresher indexRefresher,
ServerVersionSupport serverVersionSupport) {
super(namespace, converter, mappingContext, exceptionTranslator, client.getWritePolicyDefault(),
super(namespace, converter, mappingContext, exceptionTranslator, client.copyWritePolicyDefault(),
serverVersionSupport);
this.client = client;
this.queryEngine = queryEngine;
Expand Down Expand Up @@ -208,8 +208,8 @@ private <T> void batchWriteAllDocuments(List<T> documents, String setName, Opera

List<BatchRecord> batchWriteRecords = batchWriteDataList.stream().map(BatchWriteData::batchRecord).toList();
try {
BatchPolicy bPolicy = (BatchPolicy) checkForTransaction(client, client.getBatchPolicyDefault());
client.operate(bPolicy, batchWriteRecords);
BatchPolicy batchPolicy = (BatchPolicy) enrichPolicyWithTransaction(client, client.copyBatchPolicyDefault());
client.operate(batchPolicy, batchWriteRecords);
} catch (AerospikeException e) {
throw translateError(e); // no exception is thrown for versions mismatch, only record's result code shows it
}
Expand Down Expand Up @@ -308,7 +308,8 @@ public <T> void persist(T document, WritePolicy writePolicy, String setName) {
AerospikeWriteData data = writeData(document, setName);

Operation[] operations = operations(data.getBinsAsArray(), Operation::put);
doPersistAndHandleError(data, writePolicy, operations);
// not using initial writePolicy instance because it can get enriched with transaction id
doPersistAndHandleError(data, new WritePolicy(writePolicy), operations);
}

@Override
Expand Down Expand Up @@ -401,7 +402,7 @@ public <T> boolean delete(T document, String setName) {
private boolean doDeleteWithVersionAndHandleCasError(AerospikeWriteData data) {
try {
WritePolicy writePolicy = expectGenerationPolicy(data, RecordExistsAction.UPDATE_ONLY);
writePolicy = (WritePolicy) checkForTransaction(client, writePolicy);
writePolicy = (WritePolicy) enrichPolicyWithTransaction(client, writePolicy);
return client.delete(writePolicy, data.getKey());
} catch (AerospikeException e) {
throw translateCasError(e, "Failed to delete record due to versions mismatch");
Expand All @@ -411,7 +412,7 @@ private boolean doDeleteWithVersionAndHandleCasError(AerospikeWriteData data) {
private boolean doDeleteIgnoreVersionAndTranslateError(AerospikeWriteData data) {
try {
WritePolicy writePolicy = ignoreGenerationPolicy(data, RecordExistsAction.UPDATE_ONLY);
writePolicy = (WritePolicy) checkForTransaction(client, writePolicy);
writePolicy = (WritePolicy) enrichPolicyWithTransaction(client, writePolicy);
return client.delete(writePolicy, data.getKey());
} catch (AerospikeException e) {
throw translateError(e);
Expand All @@ -431,7 +432,7 @@ public boolean deleteById(Object id, String setName) {

try {
Key key = getKey(id, setName);
WritePolicy writePolicy = (WritePolicy) checkForTransaction(client, ignoreGenerationPolicy());
WritePolicy writePolicy = (WritePolicy) enrichPolicyWithTransaction(client, ignoreGenerationPolicy());
return client.delete(writePolicy, key);
} catch (AerospikeException e) {
throw translateError(e);
Expand Down Expand Up @@ -593,7 +594,7 @@ public void deleteAll(String setName, Instant beforeLastUpdate) {
private void deleteAndHandleErrors(IAerospikeClient client, Key[] keys) {
BatchResults results;
try {
BatchPolicy batchPolicy = (BatchPolicy) checkForTransaction(client, client.getBatchPolicyDefault());
BatchPolicy batchPolicy = (BatchPolicy) enrichPolicyWithTransaction(client, client.copyBatchPolicyDefault());
results = client.delete(batchPolicy, null, keys);
} catch (AerospikeException e) {
throw translateError(e);
Expand Down Expand Up @@ -627,10 +628,10 @@ public <T> T add(T document, String setName, Map<String, Long> values) {
AerospikeWriteData data = writeData(document, setName);
Operation[] ops = operations(values, Operation.Type.ADD, Operation.get());

WritePolicy writePolicy = WritePolicyBuilder.builder(client.getWritePolicyDefault())
WritePolicy writePolicy = WritePolicyBuilder.builder(writePolicyDefault)
.expiration(data.getExpiration())
.build();
writePolicy = (WritePolicy) checkForTransaction(client, writePolicy);
writePolicy = (WritePolicy) enrichPolicyWithTransaction(client, writePolicy);

Record aeroRecord = client.operate(writePolicy, data.getKey(), ops);

Expand All @@ -654,10 +655,10 @@ public <T> T add(T document, String setName, String binName, long value) {
try {
AerospikeWriteData data = writeData(document, setName);

WritePolicy writePolicy = WritePolicyBuilder.builder(client.getWritePolicyDefault())
WritePolicy writePolicy = WritePolicyBuilder.builder(writePolicyDefault)
.expiration(data.getExpiration())
.build();
writePolicy = (WritePolicy) checkForTransaction(client, writePolicy);
writePolicy = (WritePolicy) enrichPolicyWithTransaction(client, writePolicy);

Record aeroRecord = client.operate(writePolicy, data.getKey(),
Operation.add(new Bin(binName, value)), Operation.get());
Expand All @@ -682,7 +683,7 @@ public <T> T append(T document, String setName, Map<String, String> values) {
try {
AerospikeWriteData data = writeData(document, setName);
Operation[] ops = operations(values, Operation.Type.APPEND, Operation.get());
WritePolicy writePolicy = (WritePolicy) checkForTransaction(client, client.getWritePolicyDefault());
WritePolicy writePolicy = (WritePolicy) enrichPolicyWithTransaction(client, client.copyWritePolicyDefault());
Record aeroRecord = client.operate(writePolicy, data.getKey(), ops);

return mapToEntity(data.getKey(), getEntityClass(document), aeroRecord);
Expand All @@ -704,7 +705,7 @@ public <T> T append(T document, String setName, String binName, String value) {

try {
AerospikeWriteData data = writeData(document, setName);
WritePolicy writePolicy = (WritePolicy) checkForTransaction(client, client.getWritePolicyDefault());
WritePolicy writePolicy = (WritePolicy) enrichPolicyWithTransaction(client, client.copyWritePolicyDefault());
Record aeroRecord = client.operate(writePolicy, data.getKey(),
Operation.append(new Bin(binName, value)),
Operation.get(binName));
Expand All @@ -728,7 +729,7 @@ public <T> T prepend(T document, String setName, String fieldName, String value)

try {
AerospikeWriteData data = writeData(document, setName);
WritePolicy writePolicy = (WritePolicy) checkForTransaction(client, client.getWritePolicyDefault());
WritePolicy writePolicy = (WritePolicy) enrichPolicyWithTransaction(client, client.copyWritePolicyDefault());
Record aeroRecord = client.operate(writePolicy, data.getKey(),
Operation.prepend(new Bin(fieldName, value)),
Operation.get(fieldName));
Expand All @@ -753,7 +754,7 @@ public <T> T prepend(T document, String setName, Map<String, String> values) {
try {
AerospikeWriteData data = writeData(document, setName);
Operation[] ops = operations(values, Operation.Type.PREPEND, Operation.get());
WritePolicy writePolicy = (WritePolicy) checkForTransaction(client, client.getWritePolicyDefault());
WritePolicy writePolicy = (WritePolicy) enrichPolicyWithTransaction(client, client.copyWritePolicyDefault());
Record aeroRecord = client.operate(writePolicy, data.getKey(), ops);

return mapToEntity(data.getKey(), getEntityClass(document), aeroRecord);
Expand Down Expand Up @@ -808,15 +809,15 @@ private Record getRecord(AerospikePersistentEntity<?> entity, Key key, @Nullable
Assert.state(!entity.hasExpirationProperty(), "Touch on read is not supported for expiration property");
aeroRecord = getAndTouch(key, entity.getExpiration(), null, null);
} else {
Policy policy = checkForTransaction(client, getPolicyFilterExpOrDefault(client, queryEngine, query));
Policy policy = enrichPolicyWithTransaction(client, getPolicyFilterExpOrDefault(client, queryEngine, query));
aeroRecord = client.get(policy, key);
}
return aeroRecord;
}

private BatchPolicy getBatchPolicyFilterExp(Query query) {
if (queryCriteriaIsNotNull(query)) {
BatchPolicy batchPolicy = new BatchPolicy(getAerospikeClient().getBatchPolicyDefault());
BatchPolicy batchPolicy = getAerospikeClient().copyBatchPolicyDefault();
Qualifier qualifier = query.getCriteriaObject();
batchPolicy.filterExp = queryEngine.getFilterExpressionsBuilder().build(qualifier);
return batchPolicy;
Expand All @@ -838,7 +839,7 @@ private <S> Object getRecordMapToTargetClass(AerospikePersistentEntity<?> entity
Assert.state(!entity.hasExpirationProperty(), "Touch on read is not supported for expiration property");
aeroRecord = getAndTouch(key, entity.getExpiration(), binNames, query);
} else {
Policy policy = checkForTransaction(client, getPolicyFilterExpOrDefault(client, queryEngine, query));
Policy policy = enrichPolicyWithTransaction(client, getPolicyFilterExpOrDefault(client, queryEngine, query));
aeroRecord = client.get(policy, key, binNames);
}
return mapToEntity(key, targetClass, aeroRecord);
Expand All @@ -855,15 +856,15 @@ private Policy getPolicyFilterExp(Query query) {
}

private Record getAndTouch(Key key, int expiration, String[] binNames, @Nullable Query query) {
WritePolicyBuilder writePolicyBuilder = WritePolicyBuilder.builder(client.getWritePolicyDefault())
WritePolicyBuilder writePolicyBuilder = WritePolicyBuilder.builder(client.copyWritePolicyDefault())
.expiration(expiration);

if (queryCriteriaIsNotNull(query)) {
Qualifier qualifier = query.getCriteriaObject();
writePolicyBuilder.filterExp(queryEngine.getFilterExpressionsBuilder().build(qualifier));
}
WritePolicy writePolicy = writePolicyBuilder.build();
writePolicy = (WritePolicy) checkForTransaction(client, writePolicy);
writePolicy = (WritePolicy) enrichPolicyWithTransaction(client, writePolicy);

try {
if (binNames == null || binNames.length == 0) {
Expand Down Expand Up @@ -944,8 +945,8 @@ public GroupedEntities findByIds(GroupedKeys groupedKeys) {

private GroupedEntities findGroupedEntitiesByGroupedKeys(GroupedKeys groupedKeys) {
EntitiesKeys entitiesKeys = EntitiesKeys.of(toEntitiesKeyMap(groupedKeys));
BatchPolicy bPolicy = (BatchPolicy) checkForTransaction(client, client.getBatchPolicyDefault());
Record[] aeroRecords = client.get(bPolicy, entitiesKeys.getKeys());
BatchPolicy batchPolicy = (BatchPolicy) enrichPolicyWithTransaction(client, client.copyBatchPolicyDefault());
Record[] aeroRecords = client.get(batchPolicy, entitiesKeys.getKeys());

return toGroupedEntities(entitiesKeys, aeroRecords);
}
Expand Down Expand Up @@ -1000,7 +1001,7 @@ public <T, S> List<?> findByIdsUsingQuery(Collection<?> ids, Class<T> entityClas
.map(id -> getKey(id, setName))
.toArray(Key[]::new);

BatchPolicy batchPolicy = (BatchPolicy) checkForTransaction(client, getBatchPolicyFilterExp(query));
BatchPolicy batchPolicy = (BatchPolicy) enrichPolicyWithTransaction(client, getBatchPolicyFilterExp(query));
Class<?> target;
Record[] aeroRecords;
if (targetClass != null && targetClass != entityClass) {
Expand Down Expand Up @@ -1170,7 +1171,7 @@ public boolean exists(Object id, String setName) {
try {
Key key = getKey(id, setName);

WritePolicy writePolicy = (WritePolicy) checkForTransaction(client, client.getWritePolicyDefault());
WritePolicy writePolicy = (WritePolicy) enrichPolicyWithTransaction(client, client.copyWritePolicyDefault());
Record aeroRecord = client.operate(writePolicy, key, Operation.getHeader());
return aeroRecord != null;
} catch (AerospikeException e) {
Expand Down Expand Up @@ -1426,7 +1427,7 @@ public boolean indexExists(String indexName) {

private Record doPersistAndHandleError(AerospikeWriteData data, WritePolicy writePolicy, Operation[] operations) {
try {
writePolicy = (WritePolicy) checkForTransaction(client, writePolicy);
writePolicy = (WritePolicy) enrichPolicyWithTransaction(client, writePolicy);
return client.operate(writePolicy, data.getKey(), operations);
} catch (AerospikeException e) {
throw translateError(e);
Expand Down Expand Up @@ -1455,7 +1456,7 @@ private <T> void doPersistWithVersionAndHandleError(T document, AerospikeWriteDa
private Record putAndGetHeader(AerospikeWriteData data, WritePolicy writePolicy, boolean firstlyDeleteBins) {
Key key = data.getKey();
Operation[] operations = getPutAndGetHeaderOperations(data, firstlyDeleteBins);
writePolicy = (WritePolicy) checkForTransaction(client, writePolicy);
writePolicy = (WritePolicy) enrichPolicyWithTransaction(client, writePolicy);

return client.operate(writePolicy, key, operations);
}
Expand Down Expand Up @@ -1540,13 +1541,13 @@ private List<KeyRecord> findByIdsWithoutMapping(Collection<?> ids, String setNam
try {
Key[] keys = getKeys(ids, setName);

BatchPolicy bPolicy = (BatchPolicy) checkForTransaction(client, getBatchPolicyFilterExp(query));
BatchPolicy batchPolicy = (BatchPolicy) enrichPolicyWithTransaction(client, getBatchPolicyFilterExp(query));
Record[] aeroRecords;
if (targetClass != null) {
String[] binNames = getBinNamesFromTargetClass(targetClass, mappingContext);
aeroRecords = client.get(bPolicy, keys, binNames);
aeroRecords = client.get(batchPolicy, keys, binNames);
} else {
aeroRecords = client.get(bPolicy, keys);
aeroRecords = client.get(batchPolicy, keys);
}

return IntStream.range(0, keys.length)
Expand Down
Loading

0 comments on commit 1fcaa06

Please sign in to comment.