From 5d111ec61a07e107f20bcac85c91e40f95c6407b Mon Sep 17 00:00:00 2001 From: agrgr Date: Tue, 10 Dec 2024 15:39:03 +0200 Subject: [PATCH] remove creating new policy instances from transaction enriching method --- .../aerospike/core/AerospikeTemplate.java | 29 +++++++------ .../core/ReactiveAerospikeTemplate.java | 35 +++++++-------- .../data/aerospike/core/TemplateUtils.java | 43 ++++++++++--------- ...AerospikeTransactionalAnnotationTests.java | 9 ++-- 4 files changed, 59 insertions(+), 57 deletions(-) diff --git a/src/main/java/org/springframework/data/aerospike/core/AerospikeTemplate.java b/src/main/java/org/springframework/data/aerospike/core/AerospikeTemplate.java index c7fc73ae..f61fceb8 100644 --- a/src/main/java/org/springframework/data/aerospike/core/AerospikeTemplate.java +++ b/src/main/java/org/springframework/data/aerospike/core/AerospikeTemplate.java @@ -111,7 +111,7 @@ public AerospikeTemplate(IAerospikeClient client, QueryEngine queryEngine, IndexRefresher indexRefresher, ServerVersionSupport serverVersionSupport) { - super(namespace, converter, mappingContext, exceptionTranslator, client.getWritePolicyDefault(), + super(namespace, converter, mappingContext, exceptionTranslator, client.copyWritePolicyDefault(), serverVersionSupport); this.client = client; this.queryEngine = queryEngine; @@ -208,7 +208,7 @@ private void batchWriteAllDocuments(List documents, String setName, Opera List batchWriteRecords = batchWriteDataList.stream().map(BatchWriteData::batchRecord).toList(); try { - BatchPolicy batchPolicy = (BatchPolicy) enrichPolicyWithTransaction(client, client.getBatchPolicyDefault()); + BatchPolicy batchPolicy = (BatchPolicy) enrichPolicyWithTransaction(client, client.copyBatchPolicyDefault()); client.operate(batchPolicy, batchWriteRecords); } catch (AerospikeException e) { throw translateError(e); // no exception is thrown for versions mismatch, only record's result code shows it @@ -308,7 +308,8 @@ public void persist(T document, WritePolicy writePolicy, String setName) { AerospikeWriteData data = writeData(document, setName); Operation[] operations = operations(data.getBinsAsArray(), Operation::put); - doPersistAndHandleError(data, writePolicy, operations); + // not using initial writePolicy instance because it can get enriched with transaction id + doPersistAndHandleError(data, new WritePolicy(writePolicy), operations); } @Override @@ -593,7 +594,7 @@ public void deleteAll(String setName, Instant beforeLastUpdate) { private void deleteAndHandleErrors(IAerospikeClient client, Key[] keys) { BatchResults results; try { - BatchPolicy batchPolicy = (BatchPolicy) enrichPolicyWithTransaction(client, client.getBatchPolicyDefault()); + BatchPolicy batchPolicy = (BatchPolicy) enrichPolicyWithTransaction(client, client.copyBatchPolicyDefault()); results = client.delete(batchPolicy, null, keys); } catch (AerospikeException e) { throw translateError(e); @@ -627,7 +628,7 @@ public T add(T document, String setName, Map values) { AerospikeWriteData data = writeData(document, setName); Operation[] ops = operations(values, Operation.Type.ADD, Operation.get()); - WritePolicy writePolicy = WritePolicyBuilder.builder(client.getWritePolicyDefault()) + WritePolicy writePolicy = WritePolicyBuilder.builder(writePolicyDefault) .expiration(data.getExpiration()) .build(); writePolicy = (WritePolicy) enrichPolicyWithTransaction(client, writePolicy); @@ -654,7 +655,7 @@ public T add(T document, String setName, String binName, long value) { try { AerospikeWriteData data = writeData(document, setName); - WritePolicy writePolicy = WritePolicyBuilder.builder(client.getWritePolicyDefault()) + WritePolicy writePolicy = WritePolicyBuilder.builder(writePolicyDefault) .expiration(data.getExpiration()) .build(); writePolicy = (WritePolicy) enrichPolicyWithTransaction(client, writePolicy); @@ -682,7 +683,7 @@ public T append(T document, String setName, Map values) { try { AerospikeWriteData data = writeData(document, setName); Operation[] ops = operations(values, Operation.Type.APPEND, Operation.get()); - WritePolicy writePolicy = (WritePolicy) enrichPolicyWithTransaction(client, client.getWritePolicyDefault()); + WritePolicy writePolicy = (WritePolicy) enrichPolicyWithTransaction(client, client.copyWritePolicyDefault()); Record aeroRecord = client.operate(writePolicy, data.getKey(), ops); return mapToEntity(data.getKey(), getEntityClass(document), aeroRecord); @@ -704,7 +705,7 @@ public T append(T document, String setName, String binName, String value) { try { AerospikeWriteData data = writeData(document, setName); - WritePolicy writePolicy = (WritePolicy) enrichPolicyWithTransaction(client, client.getWritePolicyDefault()); + WritePolicy writePolicy = (WritePolicy) enrichPolicyWithTransaction(client, client.copyWritePolicyDefault()); Record aeroRecord = client.operate(writePolicy, data.getKey(), Operation.append(new Bin(binName, value)), Operation.get(binName)); @@ -728,7 +729,7 @@ public T prepend(T document, String setName, String fieldName, String value) try { AerospikeWriteData data = writeData(document, setName); - WritePolicy writePolicy = (WritePolicy) enrichPolicyWithTransaction(client, client.getWritePolicyDefault()); + WritePolicy writePolicy = (WritePolicy) enrichPolicyWithTransaction(client, client.copyWritePolicyDefault()); Record aeroRecord = client.operate(writePolicy, data.getKey(), Operation.prepend(new Bin(fieldName, value)), Operation.get(fieldName)); @@ -753,7 +754,7 @@ public T prepend(T document, String setName, Map values) { try { AerospikeWriteData data = writeData(document, setName); Operation[] ops = operations(values, Operation.Type.PREPEND, Operation.get()); - WritePolicy writePolicy = (WritePolicy) enrichPolicyWithTransaction(client, client.getWritePolicyDefault()); + WritePolicy writePolicy = (WritePolicy) enrichPolicyWithTransaction(client, client.copyWritePolicyDefault()); Record aeroRecord = client.operate(writePolicy, data.getKey(), ops); return mapToEntity(data.getKey(), getEntityClass(document), aeroRecord); @@ -816,7 +817,7 @@ private Record getRecord(AerospikePersistentEntity entity, Key key, @Nullable private BatchPolicy getBatchPolicyFilterExp(Query query) { if (queryCriteriaIsNotNull(query)) { - BatchPolicy batchPolicy = new BatchPolicy(getAerospikeClient().getBatchPolicyDefault()); + BatchPolicy batchPolicy = getAerospikeClient().copyBatchPolicyDefault(); Qualifier qualifier = query.getCriteriaObject(); batchPolicy.filterExp = queryEngine.getFilterExpressionsBuilder().build(qualifier); return batchPolicy; @@ -855,7 +856,7 @@ private Policy getPolicyFilterExp(Query query) { } private Record getAndTouch(Key key, int expiration, String[] binNames, @Nullable Query query) { - WritePolicyBuilder writePolicyBuilder = WritePolicyBuilder.builder(client.getWritePolicyDefault()) + WritePolicyBuilder writePolicyBuilder = WritePolicyBuilder.builder(client.copyWritePolicyDefault()) .expiration(expiration); if (queryCriteriaIsNotNull(query)) { @@ -944,7 +945,7 @@ public GroupedEntities findByIds(GroupedKeys groupedKeys) { private GroupedEntities findGroupedEntitiesByGroupedKeys(GroupedKeys groupedKeys) { EntitiesKeys entitiesKeys = EntitiesKeys.of(toEntitiesKeyMap(groupedKeys)); - BatchPolicy batchPolicy = (BatchPolicy) enrichPolicyWithTransaction(client, client.getBatchPolicyDefault()); + BatchPolicy batchPolicy = (BatchPolicy) enrichPolicyWithTransaction(client, client.copyBatchPolicyDefault()); Record[] aeroRecords = client.get(batchPolicy, entitiesKeys.getKeys()); return toGroupedEntities(entitiesKeys, aeroRecords); @@ -1170,7 +1171,7 @@ public boolean exists(Object id, String setName) { try { Key key = getKey(id, setName); - WritePolicy writePolicy = (WritePolicy) enrichPolicyWithTransaction(client, client.getWritePolicyDefault()); + WritePolicy writePolicy = (WritePolicy) enrichPolicyWithTransaction(client, client.copyWritePolicyDefault()); Record aeroRecord = client.operate(writePolicy, key, Operation.getHeader()); return aeroRecord != null; } catch (AerospikeException e) { diff --git a/src/main/java/org/springframework/data/aerospike/core/ReactiveAerospikeTemplate.java b/src/main/java/org/springframework/data/aerospike/core/ReactiveAerospikeTemplate.java index 95fe6eaa..90d59d6f 100644 --- a/src/main/java/org/springframework/data/aerospike/core/ReactiveAerospikeTemplate.java +++ b/src/main/java/org/springframework/data/aerospike/core/ReactiveAerospikeTemplate.java @@ -112,8 +112,8 @@ public ReactiveAerospikeTemplate(IAerospikeReactorClient reactorClient, AerospikeExceptionTranslator exceptionTranslator, ReactorQueryEngine queryEngine, ReactorIndexRefresher reactorIndexRefresher, ServerVersionSupport serverVersionSupport) { - super(namespace, converter, mappingContext, exceptionTranslator, reactorClient.getWritePolicyDefault(), - serverVersionSupport); + super(namespace, converter, mappingContext, exceptionTranslator, + reactorClient.getAerospikeClient().copyWritePolicyDefault(), serverVersionSupport); Assert.notNull(reactorClient, "Aerospike reactor client must not be null!"); this.reactorClient = reactorClient; this.reactorQueryEngine = queryEngine; @@ -204,7 +204,7 @@ private Flux batchWriteAllDocuments(List documents, String setName, Op List batchWriteRecords = batchWriteDataList.stream().map(BatchWriteData::batchRecord).toList(); - return enrichPolicyWithTransaction(reactorClient, reactorClient.getBatchPolicyDefault()) + return enrichPolicyWithTransaction(reactorClient, reactorClient.getAerospikeClient().copyBatchPolicyDefault()) .flatMapMany(batchPolicyEnriched -> batchWriteAndCheckForErrors((BatchPolicy) batchPolicyEnriched, batchWriteRecords, batchWriteDataList, operationType)); @@ -317,7 +317,8 @@ public Mono persist(T document, WritePolicy writePolicy, String setName) AerospikeWriteData data = writeData(document, setName); Operation[] operations = operations(data.getBinsAsArray(), Operation::put); - return enrichPolicyWithTransaction(reactorClient, writePolicy) + // not using initial writePolicy instance because it can get enriched with transaction id + return enrichPolicyWithTransaction(reactorClient, new WritePolicy(writePolicy)) .flatMap(writePolicyEnriched -> doPersistAndHandleError(document, data, (WritePolicy) writePolicyEnriched, operations)); } @@ -555,7 +556,7 @@ private Mono batchDeleteAndCheckForErrors(IAerospikeReactorClient reactorC return Mono.empty(); }; - return enrichPolicyWithTransaction(reactorClient, reactorClient.getBatchPolicyDefault()) + return enrichPolicyWithTransaction(reactorClient, reactorClient.getAerospikeClient().copyBatchPolicyDefault()) .flatMap(batchPolicy -> reactorClient.delete((BatchPolicy) batchPolicy, null, keys)) .onErrorMap(this::translateError) .flatMap(checkForErrors); @@ -575,7 +576,7 @@ public Mono deleteByIds(GroupedKeys groupedKeys) { private Mono deleteEntitiesByGroupedKeys(GroupedKeys groupedKeys) { EntitiesKeys entitiesKeys = EntitiesKeys.of(toEntitiesKeyMap(groupedKeys)); - enrichPolicyWithTransaction(reactorClient, reactorClient.getBatchPolicyDefault()) + enrichPolicyWithTransaction(reactorClient, reactorClient.getAerospikeClient().copyBatchPolicyDefault()) .flatMap(batchPolicy -> reactorClient.delete((BatchPolicy) batchPolicy, null, entitiesKeys.getKeys())) .doOnError(this::translateError); @@ -682,7 +683,7 @@ public Mono append(T document, String setName, Map values AerospikeWriteData data = writeData(document, setName); Operation[] operations = operations(values, Operation.Type.APPEND, Operation.get()); - return enrichPolicyWithTransaction(reactorClient, reactorClient.getWritePolicyDefault()) + return enrichPolicyWithTransaction(reactorClient, reactorClient.getAerospikeClient().copyWritePolicyDefault()) .flatMap(writePolicyEnriched -> executeOperationsOnValue(document, data, (WritePolicy) writePolicyEnriched, operations)); } @@ -699,7 +700,7 @@ public Mono append(T document, String setName, String binName, String val AerospikeWriteData data = writeData(document, setName); Operation[] operations = {Operation.append(new Bin(binName, value)), Operation.get(binName)}; - return enrichPolicyWithTransaction(reactorClient, reactorClient.getWritePolicyDefault()) + return enrichPolicyWithTransaction(reactorClient, reactorClient.getAerospikeClient().copyWritePolicyDefault()) .flatMap(writePolicyEnriched -> executeOperationsOnValue(document, data, (WritePolicy) writePolicyEnriched, operations)); } @@ -717,7 +718,7 @@ public Mono prepend(T document, String setName, Map value AerospikeWriteData data = writeData(document, setName); Operation[] operations = operations(values, Operation.Type.PREPEND, Operation.get()); - return enrichPolicyWithTransaction(reactorClient, reactorClient.getWritePolicyDefault()) + return enrichPolicyWithTransaction(reactorClient, reactorClient.getAerospikeClient().copyWritePolicyDefault()) .flatMap(writePolicyEnriched -> executeOperationsOnValue(document, data, (WritePolicy) writePolicyEnriched, operations)); } @@ -734,7 +735,7 @@ public Mono prepend(T document, String setName, String binName, String va AerospikeWriteData data = writeData(document, setName); Operation[] operations = {Operation.prepend(new Bin(binName, value)), Operation.get(binName)}; - return enrichPolicyWithTransaction(reactorClient, reactorClient.getWritePolicyDefault()) + return enrichPolicyWithTransaction(reactorClient, reactorClient.getAerospikeClient().copyWritePolicyDefault()) .flatMap(writePolicyEnriched -> executeOperationsOnValue(document, data, (WritePolicy) writePolicyEnriched, operations)); } @@ -778,7 +779,7 @@ public Mono findById(Object id, Class entityClass, String setName) { ) .onErrorMap(this::translateError); } else { - return enrichPolicyWithTransaction(reactorClient, reactorClient.getReadPolicyDefault()) + return enrichPolicyWithTransaction(reactorClient, reactorClient.getAerospikeClient().copyReadPolicyDefault()) .flatMap(policy -> reactorClient.get(policy, key)) .filter(keyRecord -> Objects.nonNull(keyRecord.record)) .map(keyRecord -> mapToEntity(keyRecord.key, entityClass, keyRecord.record)) @@ -810,7 +811,7 @@ public Mono findById(Object id, Class entityClass, Class targetC ) .onErrorMap(this::translateError); } else { - return enrichPolicyWithTransaction(reactorClient, reactorClient.getReadPolicyDefault()) + return enrichPolicyWithTransaction(reactorClient, reactorClient.getAerospikeClient().copyReadPolicyDefault()) .flatMap(policy -> reactorClient.get(policy, key, binNames)) .filter(keyRecord -> Objects.nonNull(keyRecord.record)) .map(keyRecord -> mapToEntity(keyRecord.key, targetClass, keyRecord.record)) @@ -859,7 +860,7 @@ private Flux findByIds(Collection ids, Class targetClass, String se .map(id -> getKey(id, setName)) .toArray(Key[]::new); - return enrichPolicyWithTransaction(reactorClient, reactorClient.getBatchPolicyDefault()) + return enrichPolicyWithTransaction(reactorClient, reactorClient.getAerospikeClient().copyBatchPolicyDefault()) .flatMap(batchPolicy -> reactorClient.get((BatchPolicy) batchPolicy, keys)) .flatMap(kr -> Mono.just(kr.asMap())) .flatMapMany(keyRecordMap -> { @@ -879,7 +880,7 @@ public Mono findByIds(GroupedKeys groupedKeys) { return Mono.just(GroupedEntities.builder().build()); } - return findGroupedEntitiesByGroupedKeys(reactorClient.getBatchPolicyDefault(), groupedKeys); + return findGroupedEntitiesByGroupedKeys(reactorClient.getAerospikeClient().copyBatchPolicyDefault(), groupedKeys); } private Mono findGroupedEntitiesByGroupedKeys(BatchPolicy batchPolicy, GroupedKeys groupedKeys) { @@ -926,7 +927,7 @@ public Mono findByIdUsingQuery(Object id, Class entityClass, Class< } else { Policy policy = null; if (queryCriteriaIsNotNull(query)) { - policy = new Policy(reactorClient.getReadPolicyDefault()); + policy = reactorClient.getAerospikeClient().copyReadPolicyDefault(); Qualifier qualifier = query.getCriteriaObject(); policy.filterExp = reactorQueryEngine.getFilterExpressionsBuilder().build(qualifier); } @@ -1081,7 +1082,7 @@ public Flux findInRange(long offset, long limit, Sort sort, Class targ private BatchPolicy getBatchPolicyFilterExp(Query query) { if (queryCriteriaIsNotNull(query)) { - BatchPolicy batchPolicy = new BatchPolicy(reactorClient.getAerospikeClient().getBatchPolicyDefault()); + BatchPolicy batchPolicy = reactorClient.getAerospikeClient().copyBatchPolicyDefault(); Qualifier qualifier = query.getCriteriaObject(); batchPolicy.filterExp = reactorQueryEngine.getFilterExpressionsBuilder().build(qualifier); return batchPolicy; @@ -1113,7 +1114,7 @@ public Mono exists(Object id, String setName) { Assert.notNull(setName, "Set name must not be null!"); Key key = getKey(id, setName); - return enrichPolicyWithTransaction(reactorClient, reactorClient.getAerospikeClient().getReadPolicyDefault()) + return enrichPolicyWithTransaction(reactorClient, reactorClient.getAerospikeClient().copyReadPolicyDefault()) .flatMap(policy -> reactorClient.exists(policy, key)) .map(Objects::nonNull) .defaultIfEmpty(false) diff --git a/src/main/java/org/springframework/data/aerospike/core/TemplateUtils.java b/src/main/java/org/springframework/data/aerospike/core/TemplateUtils.java index 181b611f..fd4349c0 100644 --- a/src/main/java/org/springframework/data/aerospike/core/TemplateUtils.java +++ b/src/main/java/org/springframework/data/aerospike/core/TemplateUtils.java @@ -1,9 +1,7 @@ package org.springframework.data.aerospike.core; import com.aerospike.client.IAerospikeClient; -import com.aerospike.client.policy.BatchPolicy; import com.aerospike.client.policy.Policy; -import com.aerospike.client.policy.WritePolicy; import com.aerospike.client.reactor.IAerospikeReactorClient; import lombok.experimental.UtilityClass; import org.springframework.data.aerospike.mapping.AerospikePersistentEntity; @@ -128,29 +126,26 @@ public static String[] getBinNamesFromTargetClass(Class targetClass, return binNamesList.toArray(new String[0]); } + /** + * Enrich given Policy with transaction id if transaction is active + * + * @param client IAerospikeClient + * @param policy Policy instance, typically not default policy to avoid saving transaction id to defaults + * @return Policy with filled {@link Policy#txn} if transaction is active + */ public static Policy enrichPolicyWithTransaction(IAerospikeClient client, Policy policy) { if (TransactionSynchronizationManager.hasResource(client)) { AerospikeTransactionResourceHolder resourceHolder = (AerospikeTransactionResourceHolder) TransactionSynchronizationManager.getResource(client); - Policy newPolicy = getNewPolicy(policy); - if (resourceHolder != null) newPolicy.txn = resourceHolder.getTransaction(); - return newPolicy; + if (resourceHolder != null) policy.txn = resourceHolder.getTransaction(); + return policy; } return policy; } - private static Policy getNewPolicy(Policy policy) { - if (policy instanceof WritePolicy writePolicy) { - return new WritePolicy(writePolicy); - } else if (policy instanceof BatchPolicy batchPolicy) { - return new BatchPolicy(batchPolicy); - } - return new Policy(policy); - } - private static Policy getPolicyFilterExp(IAerospikeClient client, QueryEngine queryEngine, Query query) { if (queryCriteriaIsNotNull(query)) { - Policy policy = new Policy(client.getReadPolicyDefault()); + Policy policy = client.copyReadPolicyDefault(); Qualifier qualifier = query.getCriteriaObject(); policy.filterExp = queryEngine.getFilterExpressionsBuilder().build(qualifier); return policy; @@ -160,18 +155,24 @@ private static Policy getPolicyFilterExp(IAerospikeClient client, QueryEngine qu static Policy getPolicyFilterExpOrDefault(IAerospikeClient client, QueryEngine queryEngine, Query query) { Policy policy = getPolicyFilterExp(client, queryEngine, query); - return policy != null ? policy : new Policy(client.getReadPolicyDefault()); + return policy != null ? policy : client.copyReadPolicyDefault(); } - static Mono enrichPolicyWithTransaction(IAerospikeReactorClient reactiveClient, Policy policy) { + /** + * Enrich given Policy with transaction id if transaction is active + * + * @param reactorClient IAerospikeReactorClient + * @param policy Policy instance, typically not default policy to avoid saving transaction id to defaults + * @return Mono<Policy> with filled {@link Policy#txn} if transaction is active + */ + static Mono enrichPolicyWithTransaction(IAerospikeReactorClient reactorClient, Policy policy) { return TransactionContextManager.currentContext() .map(ctx -> { AerospikeReactiveTransactionResourceHolder resourceHolder = - (AerospikeReactiveTransactionResourceHolder) ctx.getResources().get(reactiveClient); + (AerospikeReactiveTransactionResourceHolder) ctx.getResources().get(reactorClient); if (resourceHolder != null) { - Policy newPolicy = getNewPolicy(policy); - newPolicy.txn = resourceHolder.getTransaction(); - return newPolicy; + policy.txn = resourceHolder.getTransaction(); + return policy; } return policy; }) diff --git a/src/test/java/org/springframework/data/aerospike/transaction/sync/AerospikeTransactionalAnnotationTests.java b/src/test/java/org/springframework/data/aerospike/transaction/sync/AerospikeTransactionalAnnotationTests.java index 7fd3c960..6b9e2b11 100644 --- a/src/test/java/org/springframework/data/aerospike/transaction/sync/AerospikeTransactionalAnnotationTests.java +++ b/src/test/java/org/springframework/data/aerospike/transaction/sync/AerospikeTransactionalAnnotationTests.java @@ -101,11 +101,10 @@ public void verifyTransactionExists_multipleMethods() { // only for testing purposes as performing one write in a transaction lacks sense public void verifyTransaction_oneInsert() { TestTransactionSynchronization testSync = new TestTransactionSynchronization(() -> { - var resultsList = - // findAll() is used here because it uses QueryPolicy and ignores transaction id - // it is a callback after transaction completion, typically fast enough to happen before cleanup, - // so de facto transaction id is often still findable at this point - template.findAll(SampleClasses.DocumentWithPrimitiveIntId.class).toList(); + // findAll() is used here because it uses QueryPolicy and ignores transaction id + // it is a callback after transaction completion, typically fast enough to happen before cleanup, + // so de facto transaction id is often still findable at this point + var resultsList = template.findAll(SampleClasses.DocumentWithPrimitiveIntId.class).toList(); assertThat(resultsList).hasSize(1); assertThat(resultsList.stream().map(SampleClasses.DocumentWithPrimitiveIntId::getId).toList()) .containsExactlyInAnyOrder(300);