Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
agrgr committed Nov 20, 2023
1 parent 78adfb4 commit afa5f62
Show file tree
Hide file tree
Showing 9 changed files with 60 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
import static org.springframework.data.aerospike.core.CoreUtils.verifyUnsortedWithOffset;
import static org.springframework.data.aerospike.core.TemplateUtils.excludeIdQualifier;
import static org.springframework.data.aerospike.core.TemplateUtils.getIdValue;
import static org.springframework.data.aerospike.query.QualifierUtils.getOneIdQualifier;
import static org.springframework.data.aerospike.query.QualifierUtils.getIdQualifier;
import static org.springframework.data.aerospike.query.QualifierUtils.queryCriteriaIsNotNull;

/**
Expand Down Expand Up @@ -1062,12 +1062,11 @@ public long count(Query query, String setName) {
}

private Stream<KeyRecord> countRecordsUsingQuery(String setName, Query query) {
Assert.notNull(query, "Query must not be null!");
Assert.notNull(setName, "Set name must not be null!");

Qualifier qualifier = queryCriteriaIsNotNull(query) ? query.getQualifier() : null;
if (qualifier != null) {
Qualifier idQualifier = getOneIdQualifier(qualifier);
Qualifier idQualifier = getIdQualifier(qualifier);
if (idQualifier != null) {
// a separate flow for a query with id
return findByIdsWithoutMapping(getIdValue(idQualifier), setName, null,
Expand Down Expand Up @@ -1307,7 +1306,7 @@ private <T> Stream<T> applyPostProcessingOnResults(Stream<T> results, Sort sort,
private <T> Stream<KeyRecord> findRecordsUsingQuery(String setName, Class<T> targetClass, Query query) {
Qualifier qualifier = queryCriteriaIsNotNull(query) ? query.getQualifier() : null;
if (qualifier != null) {
Qualifier idQualifier = getOneIdQualifier(qualifier);
Qualifier idQualifier = getIdQualifier(qualifier);
if (idQualifier != null) {
// a separate flow for a query with id
return findByIdsWithoutMapping(getIdValue(idQualifier), setName, targetClass,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
import static org.springframework.data.aerospike.core.CoreUtils.operations;
import static org.springframework.data.aerospike.core.TemplateUtils.excludeIdQualifier;
import static org.springframework.data.aerospike.core.TemplateUtils.getIdValue;
import static org.springframework.data.aerospike.query.QualifierUtils.getOneIdQualifier;
import static org.springframework.data.aerospike.query.QualifierUtils.getIdQualifier;
import static org.springframework.data.aerospike.query.QualifierUtils.queryCriteriaIsNotNull;

/**
Expand Down Expand Up @@ -332,7 +332,7 @@ public <T> Mono<Void> delete(Class<T> entityClass) {
try {
String set = getSetName(entityClass);
return Mono.fromRunnable(
() -> reactorClient.getAerospikeClient().truncate(null, this.namespace, set, null));
() -> reactorClient.getAerospikeClient().truncate(null, namespace, set, null));
} catch (AerospikeException e) {
throw translateError(e);
}
Expand Down Expand Up @@ -473,7 +473,7 @@ public Mono<Void> deleteAll(String setName) {

try {
return Mono.fromRunnable(
() -> reactorClient.getAerospikeClient().truncate(null, this.namespace, setName, null));
() -> reactorClient.getAerospikeClient().truncate(null, namespace, setName, null));
} catch (AerospikeException e) {
throw translateError(e);
}
Expand All @@ -500,7 +500,7 @@ public <T> Mono<T> add(T document, String setName, Map<String, Long> values) {
}
operations[x] = Operation.get();

WritePolicy writePolicy = WritePolicyBuilder.builder(this.writePolicyDefault)
WritePolicy writePolicy = WritePolicyBuilder.builder(writePolicyDefault)
.expiration(data.getExpiration())
.build();

Expand All @@ -520,7 +520,7 @@ public <T> Mono<T> add(T document, String setName, String binName, long value) {

AerospikeWriteData data = writeData(document, setName);

WritePolicy writePolicy = WritePolicyBuilder.builder(this.writePolicyDefault)
WritePolicy writePolicy = WritePolicyBuilder.builder(writePolicyDefault)
.expiration(data.getExpiration())
.build();

Expand Down Expand Up @@ -952,45 +952,42 @@ public Mono<Long> count(String setName) {
private long countSet(String setName) {
Node[] nodes = reactorClient.getAerospikeClient().getNodes();

int replicationFactor = Utils.getReplicationFactor(nodes, this.namespace);
int replicationFactor = Utils.getReplicationFactor(nodes, namespace);

long totalObjects = Arrays.stream(nodes)
.mapToLong(node -> Utils.getObjectsCount(node, this.namespace, setName))
.mapToLong(node -> Utils.getObjectsCount(node, namespace, setName))
.sum();

return (nodes.length > 1) ? (totalObjects / replicationFactor) : totalObjects;
}

@Override
public <T> Mono<Long> count(Query query, Class<T> entityClass) {
Assert.notNull(query, "Query must not be null!");
Assert.notNull(entityClass, "Class must not be null!");

return count(query, getSetName(entityClass));
}

@Override
public Mono<Long> count(Query query, String setName) {
Assert.notNull(query, "Query must not be null!");
Assert.notNull(setName, "Set for count must not be null!");

return countRecordsUsingQuery(setName, query).count();
}

private Flux<KeyRecord> countRecordsUsingQuery(String setName, Query query) {
Assert.notNull(query, "Query must not be null!");
Assert.notNull(setName, "Set name must not be null!");

Qualifier qualifier = queryCriteriaIsNotNull(query) ? query.getQualifier() : null;
if (qualifier != null) {
Qualifier idQualifier = getOneIdQualifier(qualifier);
Qualifier idQualifier = getIdQualifier(qualifier);
if (idQualifier != null) {
// a separate flow for a query with id
return findByIdsWithoutMapping(getIdValue(idQualifier), setName, null,
new Query(excludeIdQualifier(qualifier)));
}
}
return this.reactorQueryEngine.selectForCount(this.namespace, setName, query);
return reactorQueryEngine.selectForCount(namespace, setName, query);
}

@Override
Expand Down Expand Up @@ -1035,7 +1032,7 @@ public Mono<Void> createIndex(String setName, String indexName,
Assert.notNull(indexCollectionType, "Index collection type must not be null!");
Assert.notNull(ctx, "Ctx must not be null!");

return reactorClient.createIndex(null, this.namespace,
return reactorClient.createIndex(null, namespace,
setName, indexName, binName, indexType, indexCollectionType, ctx)
.then(reactorIndexRefresher.refreshIndexes())
.onErrorMap(this::translateError);
Expand All @@ -1052,7 +1049,7 @@ public Mono<Void> deleteIndex(String setName, String indexName) {
Assert.notNull(setName, "Set name must not be null!");
Assert.notNull(indexName, "Index name must not be null!");

return reactorClient.dropIndex(null, this.namespace, setName, indexName)
return reactorClient.dropIndex(null, namespace, setName, indexName)
.then(reactorIndexRefresher.refreshIndexes())
.onErrorMap(this::translateError);
}
Expand Down Expand Up @@ -1136,7 +1133,7 @@ private Mono<Record> putAndGetHeader(AerospikeWriteData data, WritePolicy policy
}

private Mono<KeyRecord> getAndTouch(Key key, int expiration, String[] binNames, Query query) {
WritePolicyBuilder writePolicyBuilder = WritePolicyBuilder.builder(this.writePolicyDefault)
WritePolicyBuilder writePolicyBuilder = WritePolicyBuilder.builder(writePolicyDefault)
.expiration(expiration);

if (queryCriteriaIsNotNull(query)) {
Expand Down Expand Up @@ -1252,7 +1249,7 @@ private <T> Flux<T> findUsingQueryWithDistinctPredicate(String setName, Class<T>
private <T> Flux<KeyRecord> findRecordsUsingQuery(String setName, Class<T> targetClass, Query query) {
Qualifier qualifier = queryCriteriaIsNotNull(query) ? query.getQualifier() : null;
if (qualifier != null) {
Qualifier idQualifier = getOneIdQualifier(qualifier);
Qualifier idQualifier = getIdQualifier(qualifier);
if (idQualifier != null) {
// a separate flow for a query with id
return findByIdsWithoutMapping(getIdValue(idQualifier), setName, targetClass,
Expand All @@ -1262,10 +1259,9 @@ private <T> Flux<KeyRecord> findRecordsUsingQuery(String setName, Class<T> targe

if (targetClass != null) {
String[] binNames = getBinNamesFromTargetClass(targetClass);
return this.reactorQueryEngine.select(this.namespace, setName, binNames, query);
} else {
return this.reactorQueryEngine.select(this.namespace, setName, null, query);
return reactorQueryEngine.select(namespace, setName, binNames, query);
}
return reactorQueryEngine.select(namespace, setName, null, query);
}

private <T> Flux<KeyRecord> findByIdsWithoutMapping(Collection<?> ids, String setName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,14 @@
@UtilityClass
public class QualifierUtils {

public static Qualifier getIdQualifier(Qualifier qualifier) {
return getOneIdQualifier(qualifier);
}

/**
* Find id qualifier.
*
* @param qualifier {@link Qualifier} to search through
* @return The only id qualifier or null
* @throws IllegalArgumentException if more than one id qualifier given
*/
public static Qualifier getOneIdQualifier(Qualifier qualifier) {
public static Qualifier getIdQualifier(Qualifier qualifier) {
if (qualifier != null) {
List<Qualifier> idQualifiers = getIdQualifiers(qualifier);
if (idQualifiers.size() > 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public KeyRecordIterator select(String namespace, String set, @Nullable Query qu
/**
* Select records filtered by a query
*
* @param namespace Namespace to storing the data
* @param namespace Namespace to store the data
* @param set Set storing the data
* @param binNames Bin names to return from the query
* @param query {@link Query} for filtering results
Expand Down Expand Up @@ -111,8 +111,7 @@ public KeyRecordIterator select(String namespace, String set, String[] binNames,
*/
Statement statement = statementBuilder.build(namespace, set, query, binNames);
statement.setMaxRecords(queryMaxRecords);
QueryPolicy localQueryPolicy = new QueryPolicy(queryPolicy);
localQueryPolicy.filterExp = filterExpressionsBuilder.build(query);
QueryPolicy localQueryPolicy = getQueryPolicy(query, true);

if (!scansEnabled && statement.getFilter() == null) {
throw new IllegalStateException(SCANS_DISABLED_MESSAGE);
Expand All @@ -125,17 +124,15 @@ public KeyRecordIterator select(String namespace, String set, String[] binNames,
/**
* Select records filtered by a query to be counted
*
* @param namespace Namespace to storing the data
* @param namespace Namespace to store the data
* @param set Set storing the data
* @param query {@link Query} for filtering results
* @return A KeyRecordIterator to iterate over the results
* @return A KeyRecordIterator for counting
*/
public KeyRecordIterator selectForCount(String namespace, String set, @Nullable Query query) {
Statement statement = statementBuilder.build(namespace, set, query);
statement.setMaxRecords(queryMaxRecords);
QueryPolicy localQueryPolicy = new QueryPolicy(queryPolicy);
localQueryPolicy.filterExp = filterExpressionsBuilder.build(query);
localQueryPolicy.includeBinData = false;
QueryPolicy localQueryPolicy = getQueryPolicy(query, false);

if (!scansEnabled && statement.getFilter() == null) {
throw new IllegalStateException(SCANS_DISABLED_MESSAGE);
Expand All @@ -153,6 +150,13 @@ private Record getRecord(Policy policy, Key key, String[] binNames) {
return client.get(policy, key, binNames);
}

private QueryPolicy getQueryPolicy(Query query, boolean includeBins) {
QueryPolicy queryPolicy = new QueryPolicy(this.queryPolicy);
queryPolicy.filterExp = filterExpressionsBuilder.build(query);
queryPolicy.includeBinData = includeBins;
return queryPolicy;
}

@Deprecated(since = "4.6.0", forRemoval = true)
public enum Meta {
KEY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public Flux<KeyRecord> select(String namespace, String set, @Nullable Query quer
/**
* Select records filtered by a Filter and Qualifiers
*
* @param namespace Namespace to storing the data
* @param namespace Namespace to store the data
* @param set Set storing the data
* @param binNames Bin names to return from the query
* @param query {@link Query} for filtering results
Expand All @@ -102,8 +102,7 @@ public Flux<KeyRecord> select(String namespace, String set, String[] binNames, @
*/
Statement statement = statementBuilder.build(namespace, set, query, binNames);
statement.setMaxRecords(queryMaxRecords);
QueryPolicy localQueryPolicy = new QueryPolicy(queryPolicy);
localQueryPolicy.filterExp = filterExpressionsBuilder.build(query);
QueryPolicy localQueryPolicy = getQueryPolicy(query, true);

if (!scansEnabled && statement.getFilter() == null) {
return Flux.error(new IllegalStateException(QueryEngine.SCANS_DISABLED_MESSAGE));
Expand All @@ -115,17 +114,15 @@ public Flux<KeyRecord> select(String namespace, String set, String[] binNames, @
/**
* Select records filtered by a query to be counted
*
* @param namespace Namespace to storing the data
* @param namespace Namespace to store the data
* @param set Set storing the data
* @param query {@link Query} for filtering results
* @return A Flux<KeyRecord> to iterate over the results
* @return A Flux<KeyRecord> for counting
*/
public Flux<KeyRecord> selectForCount(String namespace, String set, @Nullable Query query) {
Statement statement = statementBuilder.build(namespace, set, query);
statement.setMaxRecords(queryMaxRecords);
QueryPolicy localQueryPolicy = new QueryPolicy(queryPolicy);
localQueryPolicy.filterExp = filterExpressionsBuilder.build(query);
localQueryPolicy.includeBinData = false;
QueryPolicy localQueryPolicy = getQueryPolicy(query, false);

if (!scansEnabled && statement.getFilter() == null) {
return Flux.error(new IllegalStateException(QueryEngine.SCANS_DISABLED_MESSAGE));
Expand All @@ -134,6 +131,13 @@ public Flux<KeyRecord> selectForCount(String namespace, String set, @Nullable Qu
return client.query(localQueryPolicy, statement);
}

private QueryPolicy getQueryPolicy(Query query, boolean includeBins) {
QueryPolicy queryPolicy = new QueryPolicy(this.queryPolicy);
queryPolicy.filterExp = filterExpressionsBuilder.build(query);
queryPolicy.includeBinData = includeBins;
return queryPolicy;
}

@SuppressWarnings("SameParameterValue")
private Mono<KeyRecord> getRecord(Policy policy, Key key, String[] binNames) {
if (binNames == null || binNames.length == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,8 @@ private Object processPaginatedQuery(Class<?> targetClass, Pageable pageable, Qu
operations.findUsingQueryWithoutPostProcessing(entityClass, targetClass, query);
if (queryMethod.isSliceQuery()) {
return processSliceQuery(unprocessedResultsStream, pageable, query);
} else {
return processPageQuery(unprocessedResultsStream, pageable, query);
}
return processPageQuery(unprocessedResultsStream, pageable, query);
}

private Object processSliceQuery(Stream<?> unprocessedResultsStream, Pageable pageable, Query query) {
Expand All @@ -121,7 +120,6 @@ private Object processPageQuery(Stream<?> unprocessedResultsStream, Pageable pag
// and configuration parameter AerospikeDataSettings.queryMaxRecords is less than Integer.MAX_VALUE
List<?> unprocessedResults = unprocessedResultsStream.toList();
numberOfAllResults = unprocessedResults.size();

resultsPage = pageable.isUnpaged() ? unprocessedResults : applyPostProcessing(unprocessedResults.stream(),
query).toList();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,39 +85,35 @@ public Object execute(Object[] parameters) {
return sizeMono.flatMap(size ->
unprocessedResultsListMono.map(list -> getPage(list, size, pageable, query))
);
} else {
return sizeMono.map(size -> {
if (pageable.isUnpaged()) {
Mono<? extends List<?>> unprocessedResultsListMono = unprocessedResults.collectList();
return unprocessedResultsListMono.map(list -> getPage(list, size, pageable, query));
}
return getPage(unprocessedResults, size, pageable, query);
});
}
return sizeMono.map(size -> {
if (pageable.isUnpaged()) {
Mono<? extends List<?>> unprocessedResultsListMono = unprocessedResults.collectList();
return unprocessedResultsListMono.map(list -> getPage(list, size, pageable, query));
}
return getPage(unprocessedResults, size, pageable, query);
});
}

return findByQuery(query, targetClass);
}

public Object getPage(List<?> unprocessedResults, long overallSize, Pageable pageable,
Query query) {
public Object getPage(List<?> unprocessedResults, long overallSize, Pageable pageable, Query query) {
if (queryMethod.isSliceQuery()) {
return processSliceQuery(unprocessedResults, overallSize, pageable, query);
} else {
return processPageQuery(unprocessedResults, overallSize, pageable, query);
}
}

public Object getPage(Flux<?> unprocessedResults, long overallSize, Pageable pageable,
Query query) {
public Object getPage(Flux<?> unprocessedResults, long overallSize, Pageable pageable, Query query) {
if (queryMethod.isSliceQuery()) {
List<?> resultsPaginated = applyPostProcessing(unprocessedResults, query).toList();
boolean hasNext = overallSize > pageable.getPageSize() * (pageable.getOffset() + 1);
return new SliceImpl<>(resultsPaginated, pageable, hasNext);
} else {
List<?> resultsPaginated = applyPostProcessing(unprocessedResults, query).toList();
return new PageImpl<>(resultsPaginated, pageable, overallSize);
}
List<?> resultsPaginated = applyPostProcessing(unprocessedResults, query).toList();
return new PageImpl<>(resultsPaginated, pageable, overallSize);
}

private Object processSliceQuery(List<?> unprocessedResults, long overallSize, Pageable pageable, Query query) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ public void countFindsAllItemsByGivenCriteriaAndRespectsIgnoreCase() {

assertThat(template.count(query2, Person.class)).isEqualTo(1);

assertThat(template.count(null, Person.class)).isEqualTo(3);

template.delete(template.findById(id, Person.class));
template.delete(template.findById(id2, Person.class));
template.delete(template.findById(id3, Person.class));
Expand Down
Loading

0 comments on commit afa5f62

Please sign in to comment.