Skip to content

Commit

Permalink
use new Policy instances when enriching with transaction id
Browse files Browse the repository at this point in the history
  • Loading branch information
agrgr committed Dec 10, 2024
1 parent 6d5dcc9 commit 33a81f8
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@
import static org.springframework.data.aerospike.core.CoreUtils.getDistinctPredicate;
import static org.springframework.data.aerospike.core.CoreUtils.operations;
import static org.springframework.data.aerospike.core.CoreUtils.verifyUnsortedWithOffset;
import static org.springframework.data.aerospike.core.TemplateUtils.checkForTransaction;
import static org.springframework.data.aerospike.core.TemplateUtils.enrichPolicyWithTransaction;
import static org.springframework.data.aerospike.core.TemplateUtils.excludeIdQualifier;
import static org.springframework.data.aerospike.core.TemplateUtils.getBinNamesFromTargetClass;
import static org.springframework.data.aerospike.core.TemplateUtils.getIdValue;
Expand Down Expand Up @@ -208,7 +208,7 @@ private <T> void batchWriteAllDocuments(List<T> documents, String setName, Opera

List<BatchRecord> batchWriteRecords = batchWriteDataList.stream().map(BatchWriteData::batchRecord).toList();
try {
BatchPolicy bPolicy = (BatchPolicy) checkForTransaction(client, client.getBatchPolicyDefault());
BatchPolicy bPolicy = (BatchPolicy) enrichPolicyWithTransaction(client, client.getBatchPolicyDefault());
client.operate(bPolicy, batchWriteRecords);
} catch (AerospikeException e) {
throw translateError(e); // no exception is thrown for versions mismatch, only record's result code shows it
Expand Down Expand Up @@ -401,7 +401,7 @@ public <T> boolean delete(T document, String setName) {
private boolean doDeleteWithVersionAndHandleCasError(AerospikeWriteData data) {
try {
WritePolicy writePolicy = expectGenerationPolicy(data, RecordExistsAction.UPDATE_ONLY);
writePolicy = (WritePolicy) checkForTransaction(client, writePolicy);
writePolicy = (WritePolicy) enrichPolicyWithTransaction(client, writePolicy);
return client.delete(writePolicy, data.getKey());
} catch (AerospikeException e) {
throw translateCasError(e, "Failed to delete record due to versions mismatch");
Expand All @@ -411,7 +411,7 @@ private boolean doDeleteWithVersionAndHandleCasError(AerospikeWriteData data) {
private boolean doDeleteIgnoreVersionAndTranslateError(AerospikeWriteData data) {
try {
WritePolicy writePolicy = ignoreGenerationPolicy(data, RecordExistsAction.UPDATE_ONLY);
writePolicy = (WritePolicy) checkForTransaction(client, writePolicy);
writePolicy = (WritePolicy) enrichPolicyWithTransaction(client, writePolicy);
return client.delete(writePolicy, data.getKey());
} catch (AerospikeException e) {
throw translateError(e);
Expand All @@ -431,7 +431,7 @@ public boolean deleteById(Object id, String setName) {

try {
Key key = getKey(id, setName);
WritePolicy writePolicy = (WritePolicy) checkForTransaction(client, ignoreGenerationPolicy());
WritePolicy writePolicy = (WritePolicy) enrichPolicyWithTransaction(client, ignoreGenerationPolicy());
return client.delete(writePolicy, key);
} catch (AerospikeException e) {
throw translateError(e);
Expand Down Expand Up @@ -593,7 +593,7 @@ public void deleteAll(String setName, Instant beforeLastUpdate) {
private void deleteAndHandleErrors(IAerospikeClient client, Key[] keys) {
BatchResults results;
try {
BatchPolicy batchPolicy = (BatchPolicy) checkForTransaction(client, client.getBatchPolicyDefault());
BatchPolicy batchPolicy = (BatchPolicy) enrichPolicyWithTransaction(client, client.getBatchPolicyDefault());
results = client.delete(batchPolicy, null, keys);
} catch (AerospikeException e) {
throw translateError(e);
Expand Down Expand Up @@ -630,7 +630,7 @@ public <T> T add(T document, String setName, Map<String, Long> values) {
WritePolicy writePolicy = WritePolicyBuilder.builder(client.getWritePolicyDefault())
.expiration(data.getExpiration())
.build();
writePolicy = (WritePolicy) checkForTransaction(client, writePolicy);
writePolicy = (WritePolicy) enrichPolicyWithTransaction(client, writePolicy);

Record aeroRecord = client.operate(writePolicy, data.getKey(), ops);

Expand All @@ -657,7 +657,7 @@ public <T> T add(T document, String setName, String binName, long value) {
WritePolicy writePolicy = WritePolicyBuilder.builder(client.getWritePolicyDefault())
.expiration(data.getExpiration())
.build();
writePolicy = (WritePolicy) checkForTransaction(client, writePolicy);
writePolicy = (WritePolicy) enrichPolicyWithTransaction(client, writePolicy);

Record aeroRecord = client.operate(writePolicy, data.getKey(),
Operation.add(new Bin(binName, value)), Operation.get());
Expand All @@ -682,7 +682,7 @@ public <T> T append(T document, String setName, Map<String, String> values) {
try {
AerospikeWriteData data = writeData(document, setName);
Operation[] ops = operations(values, Operation.Type.APPEND, Operation.get());
WritePolicy writePolicy = (WritePolicy) checkForTransaction(client, client.getWritePolicyDefault());
WritePolicy writePolicy = (WritePolicy) enrichPolicyWithTransaction(client, client.getWritePolicyDefault());
Record aeroRecord = client.operate(writePolicy, data.getKey(), ops);

return mapToEntity(data.getKey(), getEntityClass(document), aeroRecord);
Expand All @@ -704,7 +704,7 @@ public <T> T append(T document, String setName, String binName, String value) {

try {
AerospikeWriteData data = writeData(document, setName);
WritePolicy writePolicy = (WritePolicy) checkForTransaction(client, client.getWritePolicyDefault());
WritePolicy writePolicy = (WritePolicy) enrichPolicyWithTransaction(client, client.getWritePolicyDefault());
Record aeroRecord = client.operate(writePolicy, data.getKey(),
Operation.append(new Bin(binName, value)),
Operation.get(binName));
Expand All @@ -728,7 +728,7 @@ public <T> T prepend(T document, String setName, String fieldName, String value)

try {
AerospikeWriteData data = writeData(document, setName);
WritePolicy writePolicy = (WritePolicy) checkForTransaction(client, client.getWritePolicyDefault());
WritePolicy writePolicy = (WritePolicy) enrichPolicyWithTransaction(client, client.getWritePolicyDefault());
Record aeroRecord = client.operate(writePolicy, data.getKey(),
Operation.prepend(new Bin(fieldName, value)),
Operation.get(fieldName));
Expand All @@ -753,7 +753,7 @@ public <T> T prepend(T document, String setName, Map<String, String> values) {
try {
AerospikeWriteData data = writeData(document, setName);
Operation[] ops = operations(values, Operation.Type.PREPEND, Operation.get());
WritePolicy writePolicy = (WritePolicy) checkForTransaction(client, client.getWritePolicyDefault());
WritePolicy writePolicy = (WritePolicy) enrichPolicyWithTransaction(client, client.getWritePolicyDefault());
Record aeroRecord = client.operate(writePolicy, data.getKey(), ops);

return mapToEntity(data.getKey(), getEntityClass(document), aeroRecord);
Expand Down Expand Up @@ -808,7 +808,7 @@ private Record getRecord(AerospikePersistentEntity<?> entity, Key key, @Nullable
Assert.state(!entity.hasExpirationProperty(), "Touch on read is not supported for expiration property");
aeroRecord = getAndTouch(key, entity.getExpiration(), null, null);
} else {
Policy policy = checkForTransaction(client, getPolicyFilterExpOrDefault(client, queryEngine, query));
Policy policy = enrichPolicyWithTransaction(client, getPolicyFilterExpOrDefault(client, queryEngine, query));
aeroRecord = client.get(policy, key);
}
return aeroRecord;
Expand Down Expand Up @@ -838,7 +838,7 @@ private <S> Object getRecordMapToTargetClass(AerospikePersistentEntity<?> entity
Assert.state(!entity.hasExpirationProperty(), "Touch on read is not supported for expiration property");
aeroRecord = getAndTouch(key, entity.getExpiration(), binNames, query);
} else {
Policy policy = checkForTransaction(client, getPolicyFilterExpOrDefault(client, queryEngine, query));
Policy policy = enrichPolicyWithTransaction(client, getPolicyFilterExpOrDefault(client, queryEngine, query));
aeroRecord = client.get(policy, key, binNames);
}
return mapToEntity(key, targetClass, aeroRecord);
Expand All @@ -863,7 +863,7 @@ private Record getAndTouch(Key key, int expiration, String[] binNames, @Nullable
writePolicyBuilder.filterExp(queryEngine.getFilterExpressionsBuilder().build(qualifier));
}
WritePolicy writePolicy = writePolicyBuilder.build();
writePolicy = (WritePolicy) checkForTransaction(client, writePolicy);
writePolicy = (WritePolicy) enrichPolicyWithTransaction(client, writePolicy);

try {
if (binNames == null || binNames.length == 0) {
Expand Down Expand Up @@ -944,7 +944,7 @@ public GroupedEntities findByIds(GroupedKeys groupedKeys) {

private GroupedEntities findGroupedEntitiesByGroupedKeys(GroupedKeys groupedKeys) {
EntitiesKeys entitiesKeys = EntitiesKeys.of(toEntitiesKeyMap(groupedKeys));
BatchPolicy bPolicy = (BatchPolicy) checkForTransaction(client, client.getBatchPolicyDefault());
BatchPolicy bPolicy = (BatchPolicy) enrichPolicyWithTransaction(client, client.getBatchPolicyDefault());
Record[] aeroRecords = client.get(bPolicy, entitiesKeys.getKeys());

return toGroupedEntities(entitiesKeys, aeroRecords);
Expand Down Expand Up @@ -1000,7 +1000,7 @@ public <T, S> List<?> findByIdsUsingQuery(Collection<?> ids, Class<T> entityClas
.map(id -> getKey(id, setName))
.toArray(Key[]::new);

BatchPolicy batchPolicy = (BatchPolicy) checkForTransaction(client, getBatchPolicyFilterExp(query));
BatchPolicy batchPolicy = (BatchPolicy) enrichPolicyWithTransaction(client, getBatchPolicyFilterExp(query));
Class<?> target;
Record[] aeroRecords;
if (targetClass != null && targetClass != entityClass) {
Expand Down Expand Up @@ -1170,7 +1170,7 @@ public boolean exists(Object id, String setName) {
try {
Key key = getKey(id, setName);

WritePolicy writePolicy = (WritePolicy) checkForTransaction(client, client.getWritePolicyDefault());
WritePolicy writePolicy = (WritePolicy) enrichPolicyWithTransaction(client, client.getWritePolicyDefault());
Record aeroRecord = client.operate(writePolicy, key, Operation.getHeader());
return aeroRecord != null;
} catch (AerospikeException e) {
Expand Down Expand Up @@ -1426,7 +1426,7 @@ public boolean indexExists(String indexName) {

private Record doPersistAndHandleError(AerospikeWriteData data, WritePolicy writePolicy, Operation[] operations) {
try {
writePolicy = (WritePolicy) checkForTransaction(client, writePolicy);
writePolicy = (WritePolicy) enrichPolicyWithTransaction(client, writePolicy);
return client.operate(writePolicy, data.getKey(), operations);
} catch (AerospikeException e) {
throw translateError(e);
Expand Down Expand Up @@ -1455,7 +1455,7 @@ private <T> void doPersistWithVersionAndHandleError(T document, AerospikeWriteDa
private Record putAndGetHeader(AerospikeWriteData data, WritePolicy writePolicy, boolean firstlyDeleteBins) {
Key key = data.getKey();
Operation[] operations = getPutAndGetHeaderOperations(data, firstlyDeleteBins);
writePolicy = (WritePolicy) checkForTransaction(client, writePolicy);
writePolicy = (WritePolicy) enrichPolicyWithTransaction(client, writePolicy);

return client.operate(writePolicy, key, operations);
}
Expand Down Expand Up @@ -1540,7 +1540,7 @@ private List<KeyRecord> findByIdsWithoutMapping(Collection<?> ids, String setNam
try {
Key[] keys = getKeys(ids, setName);

BatchPolicy bPolicy = (BatchPolicy) checkForTransaction(client, getBatchPolicyFilterExp(query));
BatchPolicy bPolicy = (BatchPolicy) enrichPolicyWithTransaction(client, getBatchPolicyFilterExp(query));
Record[] aeroRecords;
if (targetClass != null) {
String[] binNames = getBinNamesFromTargetClass(targetClass, mappingContext);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package org.springframework.data.aerospike.core;

import com.aerospike.client.IAerospikeClient;
import com.aerospike.client.policy.BatchPolicy;
import com.aerospike.client.policy.Policy;
import com.aerospike.client.policy.WritePolicy;
import com.aerospike.client.reactor.IAerospikeReactorClient;
import lombok.experimental.UtilityClass;
import org.springframework.data.aerospike.mapping.AerospikePersistentEntity;
Expand Down Expand Up @@ -126,16 +128,27 @@ public static String[] getBinNamesFromTargetClass(Class<?> targetClass,
return binNamesList.toArray(new String[0]);
}

public static Policy checkForTransaction(IAerospikeClient client, Policy policy) {
public static Policy enrichPolicyWithTransaction(IAerospikeClient client, Policy policy) {
if (TransactionSynchronizationManager.hasResource(client)) {
AerospikeTransactionResourceHolder resourceHolder =
(AerospikeTransactionResourceHolder) TransactionSynchronizationManager.getResource(client);
if (resourceHolder != null) policy.txn = resourceHolder.getTransaction();
return policy;
Policy newPolicy = getNewPolicy(policy);
if (resourceHolder != null) newPolicy.txn = resourceHolder.getTransaction();
return newPolicy;
}
return policy;
}

private static Policy getNewPolicy(Policy policy) {
if (policy instanceof WritePolicy writePolicy) {
return new WritePolicy(writePolicy);
} else if (policy instanceof BatchPolicy batchPolicy) {
return new BatchPolicy(batchPolicy);
} else {
return new Policy(policy);
}
}

private static Policy getPolicyFilterExp(IAerospikeClient client, QueryEngine queryEngine, Query query) {
if (queryCriteriaIsNotNull(query)) {
Policy policy = new Policy(client.getReadPolicyDefault());
Expand All @@ -148,15 +161,19 @@ private static Policy getPolicyFilterExp(IAerospikeClient client, QueryEngine qu

static Policy getPolicyFilterExpOrDefault(IAerospikeClient client, QueryEngine queryEngine, Query query) {
Policy policy = getPolicyFilterExp(client, queryEngine, query);
return checkForTransaction(client, policy != null ? policy : client.getReadPolicyDefault());
return policy != null ? policy : new Policy(client.getReadPolicyDefault());
}

static Mono<Policy> enrichPolicyWithTransaction(IAerospikeReactorClient reactiveClient, Policy policy) {
return TransactionContextManager.currentContext()
.map(ctx -> {
AerospikeReactiveTransactionResourceHolder resourceHolder =
(AerospikeReactiveTransactionResourceHolder) ctx.getResources().get(reactiveClient);
if (resourceHolder != null) policy.txn = resourceHolder.getTransaction();
if (resourceHolder != null) {
Policy newPolicy = getNewPolicy(policy);
newPolicy.txn = resourceHolder.getTransaction();
return newPolicy;
}
return policy;
})
.onErrorResume(NoTransactionException.class, ignored ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import java.util.concurrent.atomic.AtomicInteger;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.Mockito.mock;

@Slf4j
Expand Down Expand Up @@ -131,8 +130,8 @@ public void verifyMultipleWritesInTransaction() {
@Test
public void verifyMultipleWritesInTransactionWithTimeout() {
// Multi-record transactions are supported starting with Server version 8.0+
SampleClasses.DocumentWithIntegerId document1 = new SampleClasses.DocumentWithIntegerId(501, "test1");
SampleClasses.DocumentWithIntegerId document2 = new SampleClasses.DocumentWithIntegerId(501, "test2");
SampleClasses.DocumentWithIntegerId document1 = new SampleClasses.DocumentWithIntegerId(520, "test1");
SampleClasses.DocumentWithIntegerId document2 = new SampleClasses.DocumentWithIntegerId(520, "test2");

reactiveTemplate.insert(document1)
// wait less than the specified timeout for this transactional operator
Expand All @@ -144,7 +143,7 @@ public void verifyMultipleWritesInTransactionWithTimeout() {
.verifyComplete();

reactiveTemplate
.findById(501, SampleClasses.DocumentWithIntegerId.class)
.findById(520, SampleClasses.DocumentWithIntegerId.class)
.as(StepVerifier::create)
.consumeNextWith(result -> assertThat(result.getContent().equals("test2")).isTrue())
.verifyComplete();
Expand All @@ -153,8 +152,8 @@ public void verifyMultipleWritesInTransactionWithTimeout() {
@Test
public void verifyMultipleWritesInTransactionWithTimeoutExpired() {
// Multi-record transactions are supported starting with Server version 8.0+
SampleClasses.DocumentWithIntegerId document1 = new SampleClasses.DocumentWithIntegerId(501, "test1");
SampleClasses.DocumentWithIntegerId document2 = new SampleClasses.DocumentWithIntegerId(501, "test2");
SampleClasses.DocumentWithIntegerId document1 = new SampleClasses.DocumentWithIntegerId(521, "test1");
SampleClasses.DocumentWithIntegerId document2 = new SampleClasses.DocumentWithIntegerId(521, "test2");

reactiveTemplate.insert(document1)
// wait more than the specified timeout for this transactional operator
Expand All @@ -171,6 +170,27 @@ public void verifyMultipleWritesInTransactionWithTimeoutExpired() {
});
}

@Test
public void verifyMultipleWritesInTransactionWithDefaultTimeoutExpired() {
// Multi-record transactions are supported starting with Server version 8.0+
SampleClasses.DocumentWithIntegerId document1 = new SampleClasses.DocumentWithIntegerId(522, "test1");
SampleClasses.DocumentWithIntegerId document2 = new SampleClasses.DocumentWithIntegerId(522, "test2");

reactiveTemplate.insert(document1)
// wait more than the specified timeout for this transactional operator
.delayElement(Duration.ofSeconds(15))
.then(reactiveTemplate.save(document2))
.then()
.as(transactionalOperator::transactional)
.as(StepVerifier::create)
.verifyErrorMatches(throwable -> {
if (throwable instanceof RecoverableDataAccessException) {
return throwable.getMessage().contains("MRT expired");
}
return false;
});
}

@Test
public void oneWriteInTransaction_manual_transactional() {
// Multi-record transactions are supported starting with Server version 8.0+
Expand Down
Loading

0 comments on commit 33a81f8

Please sign in to comment.