Skip to content

Commit

Permalink
FMWK-301 Support beforeLastUpdate parameter in deleteAll (#682)
Browse files Browse the repository at this point in the history
* add support for beforeLastUpdate parameter in deleteAll()
  • Loading branch information
agrgr authored Dec 28, 2023
1 parent 59ddb56 commit efa5691
Show file tree
Hide file tree
Showing 11 changed files with 328 additions and 52 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,10 @@
import org.springframework.data.util.StreamUtils;
import org.springframework.util.Assert;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
Expand Down Expand Up @@ -538,15 +540,28 @@ private void deleteGroupedEntitiesByGroupedKeys(GroupedKeys groupedKeys) {
@Override
public <T> void deleteAll(Class<T> entityClass) {
Assert.notNull(entityClass, "Class must not be null!");
deleteAll(getSetName(entityClass));
deleteAll(entityClass, null);
}

@Override
public <T> void deleteAll(Class<T> entityClass, Instant beforeLastUpdate) {
Assert.notNull(entityClass, "Class must not be null!");
deleteAll(getSetName(entityClass), beforeLastUpdate);
}

@Override
public void deleteAll(String setName) {
Assert.notNull(setName, "Set name must not be null!");
deleteAll(setName, null);
}

@Override
public void deleteAll(String setName, Instant beforeLastUpdate) {
Assert.notNull(setName, "Set name must not be null!");
Calendar beforeLastUpdateCalendar = convertToCalendar(beforeLastUpdate);

try {
client.truncate(null, getNamespace(), setName, null);
client.truncate(null, getNamespace(), setName, beforeLastUpdateCalendar);
} catch (AerospikeException e) {
throw translateError(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,11 @@
import org.springframework.data.mapping.model.ConvertingPropertyAccessor;
import org.springframework.util.Assert;

import java.time.Instant;
import java.util.Calendar;
import java.util.Collection;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -391,6 +394,26 @@ private <S> S convertIfNecessary(Object source, Class<S> type) {
: converter.getConversionService().convert(source, type);
}

protected Instant convertToInstant(Long millis) {
if (millis == null) return null;

if (millis >= Instant.now().toEpochMilli())
throw new IllegalArgumentException("Last update time (%d) must be less than the current time"
.formatted(millis));
return Instant.ofEpochMilli(millis);
}

protected Calendar convertToCalendar(Instant instant) {
if (instant == null) return null;

Calendar calendar = Calendar.getInstance();
if (instant.toEpochMilli() > calendar.getTimeInMillis())
throw new IllegalArgumentException("Last update time (%d) must be less than the current time"
.formatted(instant.toEpochMilli()));
calendar.setTime(Date.from(instant));
return calendar;
}

protected Operation[] getPutAndGetHeaderOperations(AerospikeWriteData data, boolean firstlyDeleteBins) {
Bin[] bins = data.getBinsAsArray();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Instant;
import java.util.Collection;
import java.util.Map;
import java.util.function.Supplier;
Expand Down Expand Up @@ -507,6 +508,16 @@ public interface ReactiveAerospikeOperations {
*/
<T> Mono<Void> deleteAll(Class<T> entityClass);

/**
* Reactively truncate/delete all records in the set determined by the given entity class.
*
* @param entityClass The class to extract set name from. Must not be {@literal null}.
* @param beforeLastUpdate Delete records before the specified time (must be earlier than the current time at
* millisecond resolution).
* @throws DataAccessException If operation failed (see {@link DefaultAerospikeExceptionTranslator} for details).
*/
<T> Mono<Void> deleteAll(Class<T> entityClass, Instant beforeLastUpdate);

/**
* Reactively truncate/delete all the documents in the given set.
*
Expand All @@ -515,6 +526,16 @@ public interface ReactiveAerospikeOperations {
*/
Mono<Void> deleteAll(String setName);

/**
* Reactively truncate/delete all documents in the given set.
*
* @param setName Set name to truncate/delete all records in.
* @param beforeLastUpdate Delete records before the specified time (must be earlier than the current time at
* millisecond resolution).
* @throws DataAccessException If operation failed (see {@link DefaultAerospikeExceptionTranslator} for details).
*/
Mono<Void> deleteAll(String setName, Instant beforeLastUpdate);

/**
* Find an existing record matching the document's class and id, add map values to the corresponding bins of the
* record and return the modified record mapped to the document's class.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,10 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
Expand Down Expand Up @@ -548,16 +550,31 @@ private Mono<Void> deleteEntitiesByGroupedKeys(GroupedKeys groupedKeys) {
public <T> Mono<Void> deleteAll(Class<T> entityClass) {
Assert.notNull(entityClass, "Class must not be null!");

return deleteAll(getSetName(entityClass));
return deleteAll(getSetName(entityClass), null);
}

@Override
public <T> Mono<Void> deleteAll(Class<T> entityClass, Instant beforeLastUpdate) {
Assert.notNull(entityClass, "Class must not be null!");

return deleteAll(getSetName(entityClass), beforeLastUpdate);
}

@Override
public Mono<Void> deleteAll(String setName) {
Assert.notNull(setName, "Set name must not be null!");

return deleteAll(setName, null);
}

@Override
public Mono<Void> deleteAll(String setName, Instant beforeLastUpdate) {
Assert.notNull(setName, "Set name must not be null!");
Calendar beforeLastUpdateCalendar = convertToCalendar(beforeLastUpdate);

try {
return Mono.fromRunnable(
() -> reactorClient.getAerospikeClient().truncate(null, namespace, setName, null));
() -> reactorClient.getAerospikeClient().truncate(null, namespace, setName, beforeLastUpdateCalendar));
} catch (AerospikeException e) {
throw translateError(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,18 @@
import org.springframework.data.aerospike.config.BlockingTestConfig;
import org.springframework.data.aerospike.config.CommonTestConfig;
import org.springframework.data.aerospike.core.AerospikeTemplate;
import org.springframework.data.aerospike.query.FilterOperation;
import org.springframework.data.aerospike.query.Qualifier;
import org.springframework.data.aerospike.query.QueryEngine;
import org.springframework.data.aerospike.query.cache.IndexRefresher;
import org.springframework.data.aerospike.query.cache.IndexesCache;
import org.springframework.data.aerospike.repository.query.Query;
import org.springframework.data.aerospike.server.version.ServerVersionSupport;

import java.util.Collection;
import java.util.List;

import static org.springframework.data.aerospike.repository.query.CriteriaDefinition.AerospikeMetadata.LAST_UPDATE_TIME;

@SpringBootTest(
classes = {BlockingTestConfig.class, CommonTestConfig.class},
Expand Down Expand Up @@ -43,4 +49,14 @@ protected <T> void deleteOneByOne(Collection<T> collection) {
protected <T> void deleteOneByOne(Collection<T> collection, String setName) {
collection.forEach(item -> template.delete(item, setName));
}

protected <T> List<T> runLastUpdateTimeQuery(long lastUpdateTimeMillis, FilterOperation operation,
Class<T> entityClass) {
Qualifier lastUpdateTimeLtMillis = Qualifier.metadataBuilder()
.setMetadataField(LAST_UPDATE_TIME)
.setFilterOperation(operation)
.setValue1AsObj(lastUpdateTimeMillis * MILLIS_TO_NANO)
.build();
return template.find(new Query(lastUpdateTimeLtMillis), entityClass).toList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ public abstract class BaseIntegrationTests {

public static final String DEFAULT_SET_NAME = "aerospike";
public static final String OVERRIDE_SET_NAME = "testSet1";
protected static final int MILLIS_TO_NANO = 1_000_000;

@Value("${embedded.aerospike.namespace}")
protected String namespace;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,17 @@
import org.springframework.data.aerospike.config.CommonTestConfig;
import org.springframework.data.aerospike.config.ReactiveTestConfig;
import org.springframework.data.aerospike.core.ReactiveAerospikeTemplate;
import org.springframework.data.aerospike.query.FilterOperation;
import org.springframework.data.aerospike.query.Qualifier;
import org.springframework.data.aerospike.query.cache.ReactorIndexRefresher;
import org.springframework.data.aerospike.repository.query.Query;
import org.springframework.data.aerospike.server.version.ServerVersionSupport;
import reactor.core.publisher.Flux;

import java.io.Serializable;
import java.util.List;

import static org.springframework.data.aerospike.repository.query.CriteriaDefinition.AerospikeMetadata.LAST_UPDATE_TIME;

@SpringBootTest(
classes = {ReactiveTestConfig.class, CommonTestConfig.class},
Expand Down Expand Up @@ -46,4 +52,14 @@ protected <T> void deleteAll(Iterable<T> iterable) {
protected <T> void deleteAll(Iterable<T> iterable, String setName) {
Flux.fromIterable(iterable).flatMap(item -> reactiveTemplate.delete(item, setName)).blockLast();
}

protected <T> List<T> runLastUpdateTimeQuery(long lastUpdateTimeMillis, FilterOperation operation,
Class<T> entityClass) {
Qualifier lastUpdateTimeLtMillis = Qualifier.metadataBuilder()
.setMetadataField(LAST_UPDATE_TIME)
.setFilterOperation(operation)
.setValue1AsObj(lastUpdateTimeMillis * MILLIS_TO_NANO)
.build();
return reactiveTemplate.find(new Query(lastUpdateTimeLtMillis), entityClass).collectList().block();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,24 @@
import org.springframework.dao.OptimisticLockingFailureException;
import org.springframework.data.aerospike.BaseBlockingIntegrationTests;
import org.springframework.data.aerospike.core.model.GroupedKeys;
import org.springframework.data.aerospike.query.FilterOperation;
import org.springframework.data.aerospike.sample.Customer;
import org.springframework.data.aerospike.sample.Person;
import org.springframework.data.aerospike.sample.SampleClasses.CollectionOfObjects;
import org.springframework.data.aerospike.sample.SampleClasses.CustomCollectionClassToDelete;
import org.springframework.data.aerospike.sample.SampleClasses.DocumentWithExpiration;
import org.springframework.data.aerospike.sample.SampleClasses.VersionedClass;
import org.springframework.data.aerospike.utility.AwaitilityUtils;
import org.springframework.test.context.TestPropertySource;

import java.time.Instant;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.awaitility.Awaitility.await;
Expand All @@ -49,6 +54,8 @@ public class AerospikeTemplateDeleteTests extends BaseBlockingIntegrationTests {
public void beforeEach() {
template.deleteAll(Person.class);
template.deleteAll(Customer.class);
template.deleteAll(VersionedClass.class);
template.deleteAll(CollectionOfObjects.class);
}

@Test
Expand Down Expand Up @@ -208,8 +215,7 @@ public void deleteByType_ShouldDeleteAllDocumentsWithCustomSetName() {

template.deleteAll(CustomCollectionClassToDelete.class);

// truncate is async operation that is why we need to wait until
// it completes
// truncate is async operation that is why we need to wait until it completes
await().atMost(TEN_SECONDS)
.untilAsserted(() -> assertThat(template.findByIds(Arrays.asList(id1, id2),
CustomCollectionClassToDelete.class)).isEmpty());
Expand Down Expand Up @@ -356,6 +362,58 @@ public void deleteAll_ShouldDeleteAllDocumentsWithSetName() {
}
}

@Test
public void deleteAll_ShouldDeleteAllDocumentsBeforeGivenLastUpdateTime() {
// batch delete operations are supported starting with Server version 6.0+
if (serverVersionSupport.batchWrite()) {
String id1 = nextId();
String id2 = nextId();
CollectionOfObjects document1 = new CollectionOfObjects(id1, List.of("test1"));
CollectionOfObjects document2 = new CollectionOfObjects(id2, List.of("test2"));

template.save(document1);
AwaitilityUtils.wait(1, MILLISECONDS);

Instant lastUpdateTime = Instant.now();
Instant inFuture = Instant.ofEpochMilli(lastUpdateTime.toEpochMilli() + 10000);
template.save(document2);

// make sure document1 has lastUpdateTime less than specified millis
List<CollectionOfObjects> resultsWithLutLtMillis =
runLastUpdateTimeQuery(lastUpdateTime.toEpochMilli(), FilterOperation.LT, CollectionOfObjects.class);
assertThat(resultsWithLutLtMillis.get(0).getId()).isEqualTo(document1.getId());
assertThat(resultsWithLutLtMillis.get(0).getCollection().iterator().next())
.isEqualTo(document1.getCollection().iterator().next());

assertThatThrownBy(() -> template.deleteAll(CollectionOfObjects.class, inFuture))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageMatching("Last update time (.*) must be less than the current time");

template.deleteAll(CollectionOfObjects.class, lastUpdateTime);
assertThat(template.findByIds(List.of(id1, id2), CollectionOfObjects.class)).hasSize(1);
CollectionOfObjects result = template.findByIds(List.of(id1, id2), CollectionOfObjects.class).get(0);
assertThat(result.getId()).isEqualTo(document2.getId());
assertThat(result.getCollection().iterator().next()).isEqualTo(document2.getCollection().iterator().next());

List<Person> persons = additionalAerospikeTestOperations.saveGeneratedPersons(101);
AwaitilityUtils.wait(1, MILLISECONDS);
lastUpdateTime = Instant.now();
AwaitilityUtils.wait(1, MILLISECONDS);
Person newPerson = new Person(nextId(), "testFirstName");
template.save(newPerson);
persons.add(newPerson);

template.deleteAll(template.getSetName(Person.class), lastUpdateTime);
List<String> personsIds = persons.stream().map(Person::getId).toList();
assertThat(template.findByIds(personsIds, Person.class)).contains(newPerson);

List<Person> persons2 = additionalAerospikeTestOperations.saveGeneratedPersons(1001);
template.deleteAll(Person.class, lastUpdateTime); // persons2 were saved after the given time
personsIds = persons2.stream().map(Person::getId).toList();
assertThat(template.findByIds(personsIds, Person.class)).containsExactlyElementsOf(persons2);
}
}

@Test
public void deleteAll_VersionsMismatch() {
// batch delete operations are supported starting with Server version 6.0+
Expand Down
Loading

0 comments on commit efa5691

Please sign in to comment.