Skip to content

Commit

Permalink
remove creating new policy instances from transaction enriching method
Browse files Browse the repository at this point in the history
  • Loading branch information
agrgr committed Dec 10, 2024
1 parent 598c258 commit 5d111ec
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public AerospikeTemplate(IAerospikeClient client,
QueryEngine queryEngine,
IndexRefresher indexRefresher,
ServerVersionSupport serverVersionSupport) {
super(namespace, converter, mappingContext, exceptionTranslator, client.getWritePolicyDefault(),
super(namespace, converter, mappingContext, exceptionTranslator, client.copyWritePolicyDefault(),
serverVersionSupport);
this.client = client;
this.queryEngine = queryEngine;
Expand Down Expand Up @@ -208,7 +208,7 @@ private <T> void batchWriteAllDocuments(List<T> documents, String setName, Opera

List<BatchRecord> batchWriteRecords = batchWriteDataList.stream().map(BatchWriteData::batchRecord).toList();
try {
BatchPolicy batchPolicy = (BatchPolicy) enrichPolicyWithTransaction(client, client.getBatchPolicyDefault());
BatchPolicy batchPolicy = (BatchPolicy) enrichPolicyWithTransaction(client, client.copyBatchPolicyDefault());
client.operate(batchPolicy, batchWriteRecords);
} catch (AerospikeException e) {
throw translateError(e); // no exception is thrown for versions mismatch, only record's result code shows it
Expand Down Expand Up @@ -308,7 +308,8 @@ public <T> void persist(T document, WritePolicy writePolicy, String setName) {
AerospikeWriteData data = writeData(document, setName);

Operation[] operations = operations(data.getBinsAsArray(), Operation::put);
doPersistAndHandleError(data, writePolicy, operations);
// not using initial writePolicy instance because it can get enriched with transaction id
doPersistAndHandleError(data, new WritePolicy(writePolicy), operations);
}

@Override
Expand Down Expand Up @@ -593,7 +594,7 @@ public void deleteAll(String setName, Instant beforeLastUpdate) {
private void deleteAndHandleErrors(IAerospikeClient client, Key[] keys) {
BatchResults results;
try {
BatchPolicy batchPolicy = (BatchPolicy) enrichPolicyWithTransaction(client, client.getBatchPolicyDefault());
BatchPolicy batchPolicy = (BatchPolicy) enrichPolicyWithTransaction(client, client.copyBatchPolicyDefault());
results = client.delete(batchPolicy, null, keys);
} catch (AerospikeException e) {
throw translateError(e);
Expand Down Expand Up @@ -627,7 +628,7 @@ public <T> T add(T document, String setName, Map<String, Long> values) {
AerospikeWriteData data = writeData(document, setName);
Operation[] ops = operations(values, Operation.Type.ADD, Operation.get());

WritePolicy writePolicy = WritePolicyBuilder.builder(client.getWritePolicyDefault())
WritePolicy writePolicy = WritePolicyBuilder.builder(writePolicyDefault)
.expiration(data.getExpiration())
.build();
writePolicy = (WritePolicy) enrichPolicyWithTransaction(client, writePolicy);
Expand All @@ -654,7 +655,7 @@ public <T> T add(T document, String setName, String binName, long value) {
try {
AerospikeWriteData data = writeData(document, setName);

WritePolicy writePolicy = WritePolicyBuilder.builder(client.getWritePolicyDefault())
WritePolicy writePolicy = WritePolicyBuilder.builder(writePolicyDefault)
.expiration(data.getExpiration())
.build();
writePolicy = (WritePolicy) enrichPolicyWithTransaction(client, writePolicy);
Expand Down Expand Up @@ -682,7 +683,7 @@ public <T> T append(T document, String setName, Map<String, String> values) {
try {
AerospikeWriteData data = writeData(document, setName);
Operation[] ops = operations(values, Operation.Type.APPEND, Operation.get());
WritePolicy writePolicy = (WritePolicy) enrichPolicyWithTransaction(client, client.getWritePolicyDefault());
WritePolicy writePolicy = (WritePolicy) enrichPolicyWithTransaction(client, client.copyWritePolicyDefault());
Record aeroRecord = client.operate(writePolicy, data.getKey(), ops);

return mapToEntity(data.getKey(), getEntityClass(document), aeroRecord);
Expand All @@ -704,7 +705,7 @@ public <T> T append(T document, String setName, String binName, String value) {

try {
AerospikeWriteData data = writeData(document, setName);
WritePolicy writePolicy = (WritePolicy) enrichPolicyWithTransaction(client, client.getWritePolicyDefault());
WritePolicy writePolicy = (WritePolicy) enrichPolicyWithTransaction(client, client.copyWritePolicyDefault());
Record aeroRecord = client.operate(writePolicy, data.getKey(),
Operation.append(new Bin(binName, value)),
Operation.get(binName));
Expand All @@ -728,7 +729,7 @@ public <T> T prepend(T document, String setName, String fieldName, String value)

try {
AerospikeWriteData data = writeData(document, setName);
WritePolicy writePolicy = (WritePolicy) enrichPolicyWithTransaction(client, client.getWritePolicyDefault());
WritePolicy writePolicy = (WritePolicy) enrichPolicyWithTransaction(client, client.copyWritePolicyDefault());
Record aeroRecord = client.operate(writePolicy, data.getKey(),
Operation.prepend(new Bin(fieldName, value)),
Operation.get(fieldName));
Expand All @@ -753,7 +754,7 @@ public <T> T prepend(T document, String setName, Map<String, String> values) {
try {
AerospikeWriteData data = writeData(document, setName);
Operation[] ops = operations(values, Operation.Type.PREPEND, Operation.get());
WritePolicy writePolicy = (WritePolicy) enrichPolicyWithTransaction(client, client.getWritePolicyDefault());
WritePolicy writePolicy = (WritePolicy) enrichPolicyWithTransaction(client, client.copyWritePolicyDefault());
Record aeroRecord = client.operate(writePolicy, data.getKey(), ops);

return mapToEntity(data.getKey(), getEntityClass(document), aeroRecord);
Expand Down Expand Up @@ -816,7 +817,7 @@ private Record getRecord(AerospikePersistentEntity<?> entity, Key key, @Nullable

private BatchPolicy getBatchPolicyFilterExp(Query query) {
if (queryCriteriaIsNotNull(query)) {
BatchPolicy batchPolicy = new BatchPolicy(getAerospikeClient().getBatchPolicyDefault());
BatchPolicy batchPolicy = getAerospikeClient().copyBatchPolicyDefault();
Qualifier qualifier = query.getCriteriaObject();
batchPolicy.filterExp = queryEngine.getFilterExpressionsBuilder().build(qualifier);
return batchPolicy;
Expand Down Expand Up @@ -855,7 +856,7 @@ private Policy getPolicyFilterExp(Query query) {
}

private Record getAndTouch(Key key, int expiration, String[] binNames, @Nullable Query query) {
WritePolicyBuilder writePolicyBuilder = WritePolicyBuilder.builder(client.getWritePolicyDefault())
WritePolicyBuilder writePolicyBuilder = WritePolicyBuilder.builder(client.copyWritePolicyDefault())
.expiration(expiration);

if (queryCriteriaIsNotNull(query)) {
Expand Down Expand Up @@ -944,7 +945,7 @@ public GroupedEntities findByIds(GroupedKeys groupedKeys) {

private GroupedEntities findGroupedEntitiesByGroupedKeys(GroupedKeys groupedKeys) {
EntitiesKeys entitiesKeys = EntitiesKeys.of(toEntitiesKeyMap(groupedKeys));
BatchPolicy batchPolicy = (BatchPolicy) enrichPolicyWithTransaction(client, client.getBatchPolicyDefault());
BatchPolicy batchPolicy = (BatchPolicy) enrichPolicyWithTransaction(client, client.copyBatchPolicyDefault());
Record[] aeroRecords = client.get(batchPolicy, entitiesKeys.getKeys());

return toGroupedEntities(entitiesKeys, aeroRecords);
Expand Down Expand Up @@ -1170,7 +1171,7 @@ public boolean exists(Object id, String setName) {
try {
Key key = getKey(id, setName);

WritePolicy writePolicy = (WritePolicy) enrichPolicyWithTransaction(client, client.getWritePolicyDefault());
WritePolicy writePolicy = (WritePolicy) enrichPolicyWithTransaction(client, client.copyWritePolicyDefault());
Record aeroRecord = client.operate(writePolicy, key, Operation.getHeader());
return aeroRecord != null;
} catch (AerospikeException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ public ReactiveAerospikeTemplate(IAerospikeReactorClient reactorClient,
AerospikeExceptionTranslator exceptionTranslator,
ReactorQueryEngine queryEngine, ReactorIndexRefresher reactorIndexRefresher,
ServerVersionSupport serverVersionSupport) {
super(namespace, converter, mappingContext, exceptionTranslator, reactorClient.getWritePolicyDefault(),
serverVersionSupport);
super(namespace, converter, mappingContext, exceptionTranslator,
reactorClient.getAerospikeClient().copyWritePolicyDefault(), serverVersionSupport);
Assert.notNull(reactorClient, "Aerospike reactor client must not be null!");
this.reactorClient = reactorClient;
this.reactorQueryEngine = queryEngine;
Expand Down Expand Up @@ -204,7 +204,7 @@ private <T> Flux<T> batchWriteAllDocuments(List<T> documents, String setName, Op

List<BatchRecord> batchWriteRecords = batchWriteDataList.stream().map(BatchWriteData::batchRecord).toList();

return enrichPolicyWithTransaction(reactorClient, reactorClient.getBatchPolicyDefault())
return enrichPolicyWithTransaction(reactorClient, reactorClient.getAerospikeClient().copyBatchPolicyDefault())
.flatMapMany(batchPolicyEnriched ->
batchWriteAndCheckForErrors((BatchPolicy) batchPolicyEnriched, batchWriteRecords, batchWriteDataList,
operationType));
Expand Down Expand Up @@ -317,7 +317,8 @@ public <T> Mono<T> persist(T document, WritePolicy writePolicy, String setName)
AerospikeWriteData data = writeData(document, setName);

Operation[] operations = operations(data.getBinsAsArray(), Operation::put);
return enrichPolicyWithTransaction(reactorClient, writePolicy)
// not using initial writePolicy instance because it can get enriched with transaction id
return enrichPolicyWithTransaction(reactorClient, new WritePolicy(writePolicy))
.flatMap(writePolicyEnriched ->
doPersistAndHandleError(document, data, (WritePolicy) writePolicyEnriched, operations));
}
Expand Down Expand Up @@ -555,7 +556,7 @@ private Mono<Void> batchDeleteAndCheckForErrors(IAerospikeReactorClient reactorC
return Mono.empty();
};

return enrichPolicyWithTransaction(reactorClient, reactorClient.getBatchPolicyDefault())
return enrichPolicyWithTransaction(reactorClient, reactorClient.getAerospikeClient().copyBatchPolicyDefault())
.flatMap(batchPolicy -> reactorClient.delete((BatchPolicy) batchPolicy, null, keys))
.onErrorMap(this::translateError)
.flatMap(checkForErrors);
Expand All @@ -575,7 +576,7 @@ public Mono<Void> deleteByIds(GroupedKeys groupedKeys) {
private Mono<Void> deleteEntitiesByGroupedKeys(GroupedKeys groupedKeys) {
EntitiesKeys entitiesKeys = EntitiesKeys.of(toEntitiesKeyMap(groupedKeys));

enrichPolicyWithTransaction(reactorClient, reactorClient.getBatchPolicyDefault())
enrichPolicyWithTransaction(reactorClient, reactorClient.getAerospikeClient().copyBatchPolicyDefault())
.flatMap(batchPolicy -> reactorClient.delete((BatchPolicy) batchPolicy, null, entitiesKeys.getKeys()))
.doOnError(this::translateError);

Expand Down Expand Up @@ -682,7 +683,7 @@ public <T> Mono<T> append(T document, String setName, Map<String, String> values

AerospikeWriteData data = writeData(document, setName);
Operation[] operations = operations(values, Operation.Type.APPEND, Operation.get());
return enrichPolicyWithTransaction(reactorClient, reactorClient.getWritePolicyDefault())
return enrichPolicyWithTransaction(reactorClient, reactorClient.getAerospikeClient().copyWritePolicyDefault())
.flatMap(writePolicyEnriched ->
executeOperationsOnValue(document, data, (WritePolicy) writePolicyEnriched, operations));
}
Expand All @@ -699,7 +700,7 @@ public <T> Mono<T> append(T document, String setName, String binName, String val

AerospikeWriteData data = writeData(document, setName);
Operation[] operations = {Operation.append(new Bin(binName, value)), Operation.get(binName)};
return enrichPolicyWithTransaction(reactorClient, reactorClient.getWritePolicyDefault())
return enrichPolicyWithTransaction(reactorClient, reactorClient.getAerospikeClient().copyWritePolicyDefault())
.flatMap(writePolicyEnriched ->
executeOperationsOnValue(document, data, (WritePolicy) writePolicyEnriched, operations));
}
Expand All @@ -717,7 +718,7 @@ public <T> Mono<T> prepend(T document, String setName, Map<String, String> value

AerospikeWriteData data = writeData(document, setName);
Operation[] operations = operations(values, Operation.Type.PREPEND, Operation.get());
return enrichPolicyWithTransaction(reactorClient, reactorClient.getWritePolicyDefault())
return enrichPolicyWithTransaction(reactorClient, reactorClient.getAerospikeClient().copyWritePolicyDefault())
.flatMap(writePolicyEnriched -> executeOperationsOnValue(document, data,
(WritePolicy) writePolicyEnriched, operations));
}
Expand All @@ -734,7 +735,7 @@ public <T> Mono<T> prepend(T document, String setName, String binName, String va

AerospikeWriteData data = writeData(document, setName);
Operation[] operations = {Operation.prepend(new Bin(binName, value)), Operation.get(binName)};
return enrichPolicyWithTransaction(reactorClient, reactorClient.getWritePolicyDefault())
return enrichPolicyWithTransaction(reactorClient, reactorClient.getAerospikeClient().copyWritePolicyDefault())
.flatMap(writePolicyEnriched ->
executeOperationsOnValue(document, data, (WritePolicy) writePolicyEnriched, operations));
}
Expand Down Expand Up @@ -778,7 +779,7 @@ public <T> Mono<T> findById(Object id, Class<T> entityClass, String setName) {
)
.onErrorMap(this::translateError);
} else {
return enrichPolicyWithTransaction(reactorClient, reactorClient.getReadPolicyDefault())
return enrichPolicyWithTransaction(reactorClient, reactorClient.getAerospikeClient().copyReadPolicyDefault())
.flatMap(policy -> reactorClient.get(policy, key))
.filter(keyRecord -> Objects.nonNull(keyRecord.record))
.map(keyRecord -> mapToEntity(keyRecord.key, entityClass, keyRecord.record))
Expand Down Expand Up @@ -810,7 +811,7 @@ public <T, S> Mono<S> findById(Object id, Class<T> entityClass, Class<S> targetC
)
.onErrorMap(this::translateError);
} else {
return enrichPolicyWithTransaction(reactorClient, reactorClient.getReadPolicyDefault())
return enrichPolicyWithTransaction(reactorClient, reactorClient.getAerospikeClient().copyReadPolicyDefault())
.flatMap(policy -> reactorClient.get(policy, key, binNames))
.filter(keyRecord -> Objects.nonNull(keyRecord.record))
.map(keyRecord -> mapToEntity(keyRecord.key, targetClass, keyRecord.record))
Expand Down Expand Up @@ -859,7 +860,7 @@ private <T> Flux<T> findByIds(Collection<?> ids, Class<T> targetClass, String se
.map(id -> getKey(id, setName))
.toArray(Key[]::new);

return enrichPolicyWithTransaction(reactorClient, reactorClient.getBatchPolicyDefault())
return enrichPolicyWithTransaction(reactorClient, reactorClient.getAerospikeClient().copyBatchPolicyDefault())
.flatMap(batchPolicy -> reactorClient.get((BatchPolicy) batchPolicy, keys))
.flatMap(kr -> Mono.just(kr.asMap()))
.flatMapMany(keyRecordMap -> {
Expand All @@ -879,7 +880,7 @@ public Mono<GroupedEntities> findByIds(GroupedKeys groupedKeys) {
return Mono.just(GroupedEntities.builder().build());
}

return findGroupedEntitiesByGroupedKeys(reactorClient.getBatchPolicyDefault(), groupedKeys);
return findGroupedEntitiesByGroupedKeys(reactorClient.getAerospikeClient().copyBatchPolicyDefault(), groupedKeys);
}

private Mono<GroupedEntities> findGroupedEntitiesByGroupedKeys(BatchPolicy batchPolicy, GroupedKeys groupedKeys) {
Expand Down Expand Up @@ -926,7 +927,7 @@ public <T, S> Mono<?> findByIdUsingQuery(Object id, Class<T> entityClass, Class<
} else {
Policy policy = null;
if (queryCriteriaIsNotNull(query)) {
policy = new Policy(reactorClient.getReadPolicyDefault());
policy = reactorClient.getAerospikeClient().copyReadPolicyDefault();
Qualifier qualifier = query.getCriteriaObject();
policy.filterExp = reactorQueryEngine.getFilterExpressionsBuilder().build(qualifier);
}
Expand Down Expand Up @@ -1081,7 +1082,7 @@ public <T> Flux<T> findInRange(long offset, long limit, Sort sort, Class<T> targ

private BatchPolicy getBatchPolicyFilterExp(Query query) {
if (queryCriteriaIsNotNull(query)) {
BatchPolicy batchPolicy = new BatchPolicy(reactorClient.getAerospikeClient().getBatchPolicyDefault());
BatchPolicy batchPolicy = reactorClient.getAerospikeClient().copyBatchPolicyDefault();
Qualifier qualifier = query.getCriteriaObject();
batchPolicy.filterExp = reactorQueryEngine.getFilterExpressionsBuilder().build(qualifier);
return batchPolicy;
Expand Down Expand Up @@ -1113,7 +1114,7 @@ public Mono<Boolean> exists(Object id, String setName) {
Assert.notNull(setName, "Set name must not be null!");

Key key = getKey(id, setName);
return enrichPolicyWithTransaction(reactorClient, reactorClient.getAerospikeClient().getReadPolicyDefault())
return enrichPolicyWithTransaction(reactorClient, reactorClient.getAerospikeClient().copyReadPolicyDefault())
.flatMap(policy -> reactorClient.exists(policy, key))
.map(Objects::nonNull)
.defaultIfEmpty(false)
Expand Down
Loading

0 comments on commit 5d111ec

Please sign in to comment.