Skip to content

Commit

Permalink
FMWK-285 Compare versions when performing delete operations (#681)
Browse files Browse the repository at this point in the history
* align save, insert, update and delete methods that receive documents to throw CAS exception
* add deleteAll(Iterable)
* GENERATION_ERROR, KEY_EXISTS_ERROR and KEY_NOT_FOUND_ERROR now cause OptimisticLockingFailureException when a versioned document is written
---------
Co-authored-by: yrizhkov <[email protected]>
  • Loading branch information
agrgr authored Dec 26, 2023
1 parent 0066b89 commit 59ddb56
Show file tree
Hide file tree
Showing 17 changed files with 784 additions and 274 deletions.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -197,14 +197,23 @@ <T> T updateVersion(T document, Record newAeroRecord) {
return document;
}

RuntimeException translateCasError(AerospikeException e) {
int code = e.getResultCode();
if (code == ResultCode.KEY_EXISTS_ERROR || code == ResultCode.GENERATION_ERROR) {
return new OptimisticLockingFailureException("Save document with version value failed", e);
RuntimeException translateCasError(AerospikeException e, String errMsg) {
if (hasOptimisticLockingError(e.getResultCode())) {
return getOptimisticLockingFailureException(errMsg, e);
}
return translateError(e);
}

protected boolean hasOptimisticLockingError(int resultCode) {
return List.of(ResultCode.GENERATION_ERROR, ResultCode.KEY_EXISTS_ERROR, ResultCode.KEY_NOT_FOUND_ERROR)
.contains(resultCode);
}

protected OptimisticLockingFailureException getOptimisticLockingFailureException(String errMsg,
AerospikeException e) {
return new OptimisticLockingFailureException(errMsg, e);
}

RuntimeException translateError(AerospikeException e) {
DataAccessException translated = exceptionTranslator.translateExceptionIfPossible(e);
return translated == null ? e : translated;
Expand All @@ -225,27 +234,27 @@ <T> AerospikeWriteData writeDataWithSpecificFields(T document, String setName, C
return data;
}

WritePolicy expectGenerationCasAwareSavePolicy(AerospikeWriteData data) {
WritePolicy expectGenerationCasAwarePolicy(AerospikeWriteData data) {
RecordExistsAction recordExistsAction = data.getVersion()
.filter(v -> v > 0L)
.map(v -> RecordExistsAction.UPDATE_ONLY) // updating existing document with generation,
// cannot use REPLACE_ONLY due to bin convergence feature restrictions
.orElse(RecordExistsAction.CREATE_ONLY); // create new document,
// if exists we should fail with optimistic locking
return expectGenerationSavePolicy(data, recordExistsAction);
return expectGenerationPolicy(data, recordExistsAction);
}

BatchWritePolicy expectGenerationCasAwareSaveBatchPolicy(AerospikeWriteData data) {
BatchWritePolicy expectGenerationCasAwareBatchPolicy(AerospikeWriteData data) {
RecordExistsAction recordExistsAction = data.getVersion()
.filter(v -> v > 0L)
.map(v -> RecordExistsAction.UPDATE_ONLY) // updating existing document with generation,
// cannot use REPLACE_ONLY due to bin convergence feature restrictions
.orElse(RecordExistsAction.CREATE_ONLY); // create new document,
// if exists we should fail with optimistic locking
return expectGenerationSaveBatchPolicy(data, recordExistsAction);
return expectGenerationBatchPolicy(data, recordExistsAction);
}

WritePolicy expectGenerationSavePolicy(AerospikeWriteData data, RecordExistsAction recordExistsAction) {
WritePolicy expectGenerationPolicy(AerospikeWriteData data, RecordExistsAction recordExistsAction) {
return WritePolicyBuilder.builder(this.writePolicyDefault)
.generationPolicy(GenerationPolicy.EXPECT_GEN_EQUAL)
.generation(data.getVersion().orElse(0))
Expand All @@ -254,7 +263,7 @@ WritePolicy expectGenerationSavePolicy(AerospikeWriteData data, RecordExistsActi
.build();
}

BatchWritePolicy expectGenerationSaveBatchPolicy(AerospikeWriteData data, RecordExistsAction recordExistsAction) {
BatchWritePolicy expectGenerationBatchPolicy(AerospikeWriteData data, RecordExistsAction recordExistsAction) {
BatchWritePolicy batchWritePolicy = new BatchWritePolicy(this.batchWritePolicyDefault);
batchWritePolicy.generationPolicy = GenerationPolicy.EXPECT_GEN_EQUAL;
batchWritePolicy.generation = data.getVersion().orElse(0);
Expand All @@ -263,28 +272,36 @@ BatchWritePolicy expectGenerationSaveBatchPolicy(AerospikeWriteData data, Record
return batchWritePolicy;
}

WritePolicy ignoreGenerationSavePolicy(AerospikeWriteData data, RecordExistsAction recordExistsAction) {
WritePolicy ignoreGenerationPolicy(AerospikeWriteData data, RecordExistsAction recordExistsAction) {
return WritePolicyBuilder.builder(this.writePolicyDefault)
.generationPolicy(GenerationPolicy.NONE)
.expiration(data.getExpiration())
.recordExistsAction(recordExistsAction)
.build();
}

BatchWritePolicy ignoreGenerationSaveBatchPolicy(AerospikeWriteData data, RecordExistsAction recordExistsAction) {
BatchWritePolicy ignoreGenerationBatchPolicy(AerospikeWriteData data, RecordExistsAction recordExistsAction) {
BatchWritePolicy batchWritePolicy = new BatchWritePolicy(this.batchWritePolicyDefault);
batchWritePolicy.generationPolicy = GenerationPolicy.NONE;
batchWritePolicy.expiration = data.getExpiration();
batchWritePolicy.recordExistsAction = recordExistsAction;
return batchWritePolicy;
}

WritePolicy ignoreGenerationDeletePolicy() {
WritePolicy ignoreGenerationPolicy() {
return WritePolicyBuilder.builder(this.writePolicyDefault)
.generationPolicy(GenerationPolicy.NONE)
.build();
}

WritePolicy expectGenerationPolicy(AerospikeWriteData data) {
return WritePolicyBuilder.builder(this.writePolicyDefault)
.generationPolicy(GenerationPolicy.EXPECT_GEN_EQUAL)
.generation(data.getVersion().orElse(0))
.expiration(data.getExpiration())
.build();
}

Key getKey(Object id, AerospikePersistentEntity<?> entity) {
return getKey(id, entity.getSetName());
}
Expand Down Expand Up @@ -382,9 +399,8 @@ protected Operation[] getPutAndGetHeaderOperations(AerospikeWriteData data, bool
"Cannot put and get header on a document with no bins and \"@_class\" bin disabled.");
}

return firstlyDeleteBins ? operations(bins, Operation::put,
Operation.array(Operation.delete()), Operation.array(Operation.getHeader()))
: operations(bins, Operation::put, null, Operation.array(Operation.getHeader()));
return operations(bins, Operation::put, firstlyDeleteBins ? Operation.array(Operation.delete()) : null,
Operation.array(Operation.getHeader()));
}

public <T> BatchWriteData<T> getBatchWriteForSave(T document, String setName) {
Expand All @@ -396,12 +412,12 @@ public <T> BatchWriteData<T> getBatchWriteForSave(T document, String setName) {
Operation[] operations;
BatchWritePolicy policy;
if (entity.hasVersionProperty()) {
policy = expectGenerationCasAwareSaveBatchPolicy(data);
policy = expectGenerationCasAwareBatchPolicy(data);

// mimicking REPLACE behavior by firstly deleting bins due to bin convergence feature restrictions
operations = getPutAndGetHeaderOperations(data, true);
} else {
policy = ignoreGenerationSaveBatchPolicy(data, RecordExistsAction.UPDATE);
policy = ignoreGenerationBatchPolicy(data, RecordExistsAction.UPDATE);

// mimicking REPLACE behavior by firstly deleting bins due to bin convergence feature restrictions
operations = operations(data.getBinsAsArray(), Operation::put,
Expand All @@ -419,7 +435,7 @@ public <T> BatchWriteData<T> getBatchWriteForInsert(T document, String setName)

AerospikePersistentEntity<?> entity = mappingContext.getRequiredPersistentEntity(document.getClass());
Operation[] operations;
BatchWritePolicy policy = ignoreGenerationSaveBatchPolicy(data, RecordExistsAction.CREATE_ONLY);
BatchWritePolicy policy = ignoreGenerationBatchPolicy(data, RecordExistsAction.CREATE_ONLY);
if (entity.hasVersionProperty()) {
operations = getPutAndGetHeaderOperations(data, false);
} else {
Expand All @@ -439,12 +455,12 @@ public <T> BatchWriteData<T> getBatchWriteForUpdate(T document, String setName)
Operation[] operations;
BatchWritePolicy policy;
if (entity.hasVersionProperty()) {
policy = expectGenerationSaveBatchPolicy(data, RecordExistsAction.UPDATE_ONLY);
policy = expectGenerationBatchPolicy(data, RecordExistsAction.UPDATE_ONLY);

// mimicking REPLACE_ONLY behavior by firstly deleting bins due to bin convergence feature restrictions
operations = getPutAndGetHeaderOperations(data, true);
} else {
policy = ignoreGenerationSaveBatchPolicy(data, RecordExistsAction.UPDATE_ONLY);
policy = ignoreGenerationBatchPolicy(data, RecordExistsAction.UPDATE_ONLY);

// mimicking REPLACE_ONLY behavior by firstly deleting bins due to bin convergence feature restrictions
operations = Stream.concat(Stream.of(Operation.delete()), data.getBins().stream()
Expand All @@ -455,6 +471,25 @@ public <T> BatchWriteData<T> getBatchWriteForUpdate(T document, String setName)
entity.hasVersionProperty());
}

public <T> BatchWriteData<T> getBatchWriteForDelete(T document, String setName) {
Assert.notNull(document, "Document must not be null!");

AerospikePersistentEntity<?> entity = mappingContext.getRequiredPersistentEntity(document.getClass());
Operation[] operations;
BatchWritePolicy policy;
AerospikeWriteData data = writeData(document, setName);

if (entity.hasVersionProperty()) {
policy = expectGenerationBatchPolicy(data, RecordExistsAction.UPDATE_ONLY);
} else {
policy = ignoreGenerationBatchPolicy(data, RecordExistsAction.UPDATE_ONLY);
}
operations = Operation.array(Operation.delete());

return new BatchWriteData<>(document, new BatchWrite(policy, data.getKey(), operations),
entity.hasVersionProperty());
}

protected void validateGroupedKeys(GroupedKeys groupedKeys) {
Assert.notNull(groupedKeys, "Grouped keys must not be null!");
validateForBatchWrite(groupedKeys.getEntitiesKeys(), "Entities keys");
Expand All @@ -481,7 +516,8 @@ protected boolean batchWriteSupported() {
protected enum OperationType {
SAVE_OPERATION("save"),
INSERT_OPERATION("insert"),
UPDATE_OPERATION("update");
UPDATE_OPERATION("update"),
DELETE_OPERATION("delete");

private final String name;

Expand Down
Loading

0 comments on commit 59ddb56

Please sign in to comment.