Skip to content

Commit

Permalink
add support for findByQualifiers() for reactive flow, updates for not…
Browse files Browse the repository at this point in the history
… setting statement filter if EXCLUDE_FILTER == true and for reusing Qualifiers
  • Loading branch information
agrgr committed Oct 11, 2023
1 parent 46e7738 commit b6cbd15
Show file tree
Hide file tree
Showing 12 changed files with 256 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -557,5 +557,13 @@ <T> void createIndex(Class<T> entityClass, String indexName, String binName,
*/
boolean indexExists(String indexName);

/**
* Find all documents in the given entityClass's set using provided qualifiers.
*
* @param entityClass The class to extract the Aerospike set from. Must not be {@literal null}.
* @param filter Secondary index filter.
* @param qualifiers Qualifiers to build filter expressions from.
* @return Stream of entities
*/
<T> Stream<?> findAllUsingQuery(Class<T> entityClass, Filter filter, Qualifier... qualifiers);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@

import com.aerospike.client.AerospikeException;
import com.aerospike.client.cdt.CTX;
import com.aerospike.client.query.Filter;
import com.aerospike.client.query.IndexCollectionType;
import com.aerospike.client.query.IndexType;
import com.aerospike.client.reactor.IAerospikeReactorClient;
import org.springframework.data.aerospike.core.model.GroupedEntities;
import org.springframework.data.aerospike.core.model.GroupedKeys;
import org.springframework.data.aerospike.query.Qualifier;
import org.springframework.data.aerospike.repository.query.Query;
import org.springframework.data.domain.Sort;
import org.springframework.data.mapping.context.MappingContext;
Expand Down Expand Up @@ -517,4 +519,14 @@ <T> Mono<Void> createIndex(Class<T> entityClass, String indexName, String binNam
* @return true if exists.
*/
Mono<Boolean> indexExists(String indexName);

/**
* Find all documents in the given entityClass's set using provided {@link Qualifier}s.
*
* @param entityClass The class to extract the Aerospike set from. Must not be {@literal null}.
* @param filter Secondary index filter.
* @param qualifiers Qualifiers to build filter expressions from.
* @return Flux of entities.
*/
<T> Flux<T> findAllUsingQuery(Class<T> entityClass, Filter filter, Qualifier... qualifiers);
}
Original file line number Diff line number Diff line change
Expand Up @@ -964,6 +964,13 @@ <T, S> Object mapToEntity(KeyRecord keyRecord, Class<T> entityClass, Class<S> ta
return mapToEntity(keyRecord.key, entityClass, keyRecord.record);
}

@Override
public <T> Flux<T> findAllUsingQuery(Class<T> entityClass, Filter filter,
Qualifier... qualifiers) {
return findAllRecordsUsingQuery(entityClass, null, filter, qualifiers)
.map(keyRecord -> mapToEntity(keyRecord.key, entityClass, keyRecord.record));
}

<T, S> Flux<?> findAllUsingQuery(Class<T> entityClass, Class<S> targetClass, Filter filter,
Qualifier... qualifiers) {
return findAllRecordsUsingQuery(entityClass, targetClass, filter, qualifiers)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,14 @@ public Expression build(Qualifier[] qualifiers) {
* sIndexFilter and FilterExpression. The filter is irrelevant for AND operation (nested qualifiers)
*/
private boolean excludeIrrelevantFilters(Qualifier qualifier) {
return !qualifier.queryAsFilter() ||
(qualifier.queryAsFilter() && FilterOperation.dualFilterOperations.contains(qualifier.getOperation()));
if (!qualifier.queryAsFilter()) {
return true;
} else if (qualifier.queryAsFilter() && FilterOperation.dualFilterOperations.contains(qualifier.getOperation())) {
qualifier.setQueryAsFilter(false); // clear the flag in case if the same Qualifier is going to be reused
return true;
} else {
qualifier.setQueryAsFilter(false); // clear the flag in case if the same Qualifier is going to be reused
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public class Qualifier implements Map<String, Object>, Serializable {
protected static final String QUALIFIERS = "qualifiers";
protected static final String OPERATION = "operation";
protected static final String AS_FILTER = "queryAsFilter";
protected static final String EXCLUDE_FILTER = "excludeFilter";
@Serial
private static final long serialVersionUID = -2689196529952712849L;
protected final Map<String, Object> internalMap;
Expand All @@ -77,14 +78,22 @@ public CriteriaDefinition.AerospikeMetadata getMetadataField() {
return (CriteriaDefinition.AerospikeMetadata) internalMap.get(METADATA_FIELD);
}

public void asFilter(Boolean queryAsFilter) {
public void setQueryAsFilter(Boolean queryAsFilter) {
internalMap.put(AS_FILTER, queryAsFilter);
}

public Boolean queryAsFilter() {
return internalMap.containsKey(AS_FILTER) && (Boolean) internalMap.get(AS_FILTER);
}

public boolean getExcludeFilter() {
return internalMap.containsKey(EXCLUDE_FILTER) && (Boolean) internalMap.get(EXCLUDE_FILTER);
}

public void setExcludeFilter(boolean excludeFilter) {
internalMap.put(EXCLUDE_FILTER, excludeFilter);
}

public Qualifier[] getQualifiers() {
return (Qualifier[]) internalMap.get(QUALIFIERS);
}
Expand All @@ -109,7 +118,7 @@ public String getDotPath() {
return (String) internalMap.get(DOT_PATH);
}

public Filter asFilter() {
public Filter setQueryAsFilter() {
return FilterOperation.valueOf(getOperation().toString()).sIndexFilter(internalMap);
}

Expand Down Expand Up @@ -309,6 +318,11 @@ public QualifierBuilder setQualifiers(Qualifier... qualifiers) {
return this;
}

public QualifierBuilder setExcludeFilter(boolean excludeFilter) {
this.map.put(EXCLUDE_FILTER, excludeFilter);
return this;
}

public QualifierBuilder setValue1(Value value1) {
this.map.put(VALUE1, value1);
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,15 @@ public Statement build(String namespace, String set, Filter filter, Qualifier[]
}
if (filter != null) {
stmt.setFilter(filter);
}
if (qualifiers != null && qualifiers.length != 0) {
updateStatement(stmt, qualifiers);
} else if (qualifiers != null && qualifiers.length != 0) {
// statement's filter is set based on the first processed qualifier's filter
// if the qualifier doesn't have EXCLUDE_FILTER set to true
setStatementFilterFromQualifiers(stmt, qualifiers);
}
return stmt;
}

private void updateStatement(Statement stmt, Qualifier[] qualifiers) {
private void setStatementFilterFromQualifiers(Statement stmt, Qualifier[] qualifiers) {
/*
* query with filters
*/
Expand All @@ -65,28 +66,29 @@ private void updateStatement(Statement stmt, Qualifier[] qualifiers) {
if (qualifier.getOperation() == FilterOperation.AND) {
// no sense to use secondary index in case of OR
// as it requires to enlarge selection to more than 1 field
for (Qualifier q : qualifier.getQualifiers()) {
if (q != null && isIndexedBin(stmt, q)) {
Filter filter = q.asFilter();
for (Qualifier innerQualifier : qualifier.getQualifiers()) {
if (innerQualifier != null && !innerQualifier.getExcludeFilter()
&& isIndexedBin(stmt, innerQualifier)) {
Filter filter = innerQualifier.setQueryAsFilter();
if (filter != null) {
stmt.setFilter(filter);
q.asFilter(true);
break;
innerQualifier.setQueryAsFilter(true);
break; // the first processed filter becomes statement filter
}
}
}
} else if (isIndexedBin(stmt, qualifier)) {
Filter filter = qualifier.asFilter();
} else if (!qualifier.getExcludeFilter() && isIndexedBin(stmt, qualifier)) {
Filter filter = qualifier.setQueryAsFilter();
if (filter != null) {
stmt.setFilter(filter);
qualifier.asFilter(true);
qualifier.setQueryAsFilter(true);
/* If this was the only qualifier, we do not need to do anymore work, just return
* the query iterator.
*/
if (qualifiers.length == 1) {
return;
}
break;
break; // the first processed filter becomes statement filter
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ public interface AerospikeRepository<T, ID> extends PagingAndSortingRepository<T
/**
* Create an index with the specified name.
*
* @param domainType The class to extract the Aerospike set from. Must not be {@literal null}
* @param indexName The index name. Must not be {@literal null}
* @param binName The bin name to create the index on. Must not be {@literal null}
* @param indexType The type of the index. Must not be {@literal null}
* @param domainType The class to extract the Aerospike set from. Must not be {@literal null}.
* @param indexName The index name. Must not be {@literal null}.
* @param binName The bin name to create the index on. Must not be {@literal null}.
* @param indexType The type of the index. Must not be {@literal null}.
*/
<E> void createIndex(Class<E> domainType, String indexName, String binName, IndexType indexType);

Expand All @@ -53,20 +53,20 @@ public interface AerospikeRepository<T, ID> extends PagingAndSortingRepository<T
* Checks whether an index with the specified name exists in Aerospike.
*
* @param indexName The Aerospike index name.
* @return true if exists
* @return true if exists.
*/
boolean indexExists(String indexName);

/**
* Run a query to find entities by providing qualifiers.
* Run a query to find entities by providing {@link Qualifier}s.
* <p>
* If multiple qualifiers are given, they are combined using AND.
* <p>
* Each qualifier itself might contain internal qualifiers and combine them using either {@link FilterOperation#AND}
* Each qualifier itself may contain internal qualifiers and combine them using either {@link FilterOperation#AND}
* or {@link FilterOperation#OR}.
*
* @param qualifiers One or more qualifiers representing expressions
* @return Iterable of entities
* @param qualifiers One or more qualifiers representing expressions.
* @return Iterable of entities.
*/
Iterable<T> findByQualifiers(Qualifier... qualifiers);
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,29 @@
*/
package org.springframework.data.aerospike.repository;

import org.springframework.data.aerospike.query.FilterOperation;
import org.springframework.data.aerospike.query.Qualifier;
import org.springframework.data.repository.Repository;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import reactor.core.publisher.Flux;

/**
* Aerospike specific {@link Repository} interface with reactive support.
*
* @author Igor Ermolenko
*/
public interface ReactiveAerospikeRepository<T, ID> extends ReactiveCrudRepository<T, ID> {

/**
* Run a query to find entities by providing {@link Qualifier}s.
* <p>
* If multiple qualifiers are given, they are combined using AND.
* <p>
* Each qualifier itself may contain internal qualifiers and combine them using either {@link FilterOperation#AND}
* or {@link FilterOperation#OR}.
*
* @param qualifiers One or more qualifiers representing expressions.
* @return Flux of entities.
*/
Flux<T> findByQualifiers(Qualifier... qualifiers);
}
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,10 @@ public boolean indexExists(String indexName) {

@Override
public Iterable<T> findByQualifiers(Qualifier... qualifiers) {
Arrays.stream(qualifiers).forEach(Qualifier::validateQualifier);
Arrays.stream(qualifiers).forEach(qualifier -> {
qualifier.setExcludeFilter(true);
Qualifier.validateQualifier(qualifier);
});
return (Iterable<T>) operations.findAllUsingQuery(entityInformation.getJavaType(), null, qualifiers).toList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@
import lombok.RequiredArgsConstructor;
import org.reactivestreams.Publisher;
import org.springframework.data.aerospike.core.ReactiveAerospikeOperations;
import org.springframework.data.aerospike.query.Qualifier;
import org.springframework.data.aerospike.repository.ReactiveAerospikeRepository;
import org.springframework.data.repository.core.EntityInformation;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.Arrays;

/**
* Stub implementation of {@link ReactiveAerospikeRepository}.
*
Expand Down Expand Up @@ -151,4 +154,10 @@ public void createIndex(Class<T> domainType, String indexName, String binName, I
public void deleteIndex(Class<T> domainType, String indexName) {
operations.deleteIndex(domainType, indexName);
}

@Override
public Flux<T> findByQualifiers(Qualifier... qualifiers) {
Arrays.stream(qualifiers).forEach(Qualifier::validateQualifier);
return operations.findAllUsingQuery(entityInformation.getJavaType(), null, qualifiers);
}
}
Loading

0 comments on commit b6cbd15

Please sign in to comment.