Skip to content

Commit

Permalink
FMWK-124 Creating secondary index based on json path (#89)
Browse files Browse the repository at this point in the history
  • Loading branch information
agrgr authored Jan 24, 2023
1 parent ae443de commit 033ff39
Show file tree
Hide file tree
Showing 13 changed files with 540 additions and 269 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,13 @@
import com.aerospike.client.policy.Policy;
import com.aerospike.client.policy.QueryPolicy;
import com.aerospike.client.policy.WritePolicy;
import com.aerospike.client.query.Filter;
import com.aerospike.client.query.KeyRecord;
import com.aerospike.documentapi.batch.BatchOperation;
import com.aerospike.documentapi.data.DocumentFilter;
import com.aerospike.documentapi.data.DocumentFilterExp;
import com.aerospike.documentapi.data.DocumentQueryStatement;
import com.aerospike.documentapi.data.DocumentFilterSecIndex;
import com.aerospike.documentapi.data.KeyResult;
import com.aerospike.documentapi.jsonpath.JsonPathObject;
import com.aerospike.documentapi.jsonpath.JsonPathParser;
Expand Down Expand Up @@ -192,12 +195,13 @@ public List<BatchRecord> batchPerform(List<BatchOperation> batchOperations, bool
}

@Override
public Stream<KeyResult> query(DocumentQueryStatement queryStatement, DocumentFilterExp... docFilters) {
public Stream<KeyResult> query(DocumentQueryStatement queryStatement, DocumentFilter... docFilters) {
QueryPolicy policy = new QueryPolicy(queryPolicy);
policy.filterExp = getFilterExp(docFilters);

Filter secIndexFilter = getSecIndexFilter(docFilters);
Stream<KeyRecord> keyRecords = StreamSupport.stream(Spliterators.spliteratorUnknownSize(
aerospikeDocumentRepository.query(policy, queryStatement.toStatement()).iterator(),
aerospikeDocumentRepository.query(policy, queryStatement.toStatement(secIndexFilter)).iterator(),
Spliterator.ORDERED
), false);

Expand All @@ -213,16 +217,29 @@ public Stream<KeyResult> query(DocumentQueryStatement queryStatement, DocumentFi
.filter(Objects::nonNull);
}

private Filter getSecIndexFilter(DocumentFilter[] docFilters) {
if (docFilters == null || docFilters.length == 0) return null;

return Arrays.stream(docFilters)
.filter(Objects::nonNull)
.filter(DocumentFilterSecIndex.class::isInstance)
.map(filterExp -> ((DocumentFilterSecIndex) filterExp).toSecIndexFilter())
.filter(Objects::nonNull)
.findFirst()
.orElse(null);
}

private KeyResult getKeyResult(Key key, Map<String, Object> results) {
return results.isEmpty() ? null : new KeyResult(key, results);
}

private Expression getFilterExp(DocumentFilterExp[] docFilters) {
private Expression getFilterExp(DocumentFilter[] docFilters) {
if (docFilters == null || docFilters.length == 0) return null;

List<Exp> filterExps = Arrays.stream(docFilters)
.filter(Objects::nonNull)
.map(DocumentFilterExp::toFilterExpression)
.filter(DocumentFilterExp.class::isInstance)
.map(filterExp -> ((DocumentFilterExp) filterExp).toFilterExp())
.filter(Objects::nonNull)
.collect(Collectors.toList());

Expand All @@ -235,7 +252,7 @@ private Expression getFilterExp(DocumentFilterExp[] docFilters) {
}

private Map<String, Object> getResults(String[] jsonPaths, Map<String, Object> bins) {
if (jsonPaths == null || jsonPaths.length == 0) return null;
if (jsonPaths == null || jsonPaths.length == 0) return Collections.emptyMap();

Map<String, Object> res = new HashMap<>();
bins.values()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
import com.aerospike.client.BatchRecord;
import com.aerospike.client.Key;
import com.aerospike.documentapi.batch.BatchOperation;
import com.aerospike.documentapi.data.DocumentFilterExp;
import com.aerospike.documentapi.data.DocFilterExp;
import com.aerospike.documentapi.data.DocumentFilter;
import com.aerospike.documentapi.data.DocumentQueryStatement;
import com.aerospike.documentapi.data.KeyResult;
import com.fasterxml.jackson.databind.JsonNode;
Expand Down Expand Up @@ -134,11 +135,11 @@ public interface IAerospikeDocumentClient {
* <li>optional json paths (inner objects less than a bin if necessary).</li>
* </ul>
*
* @param queryStatement object for building query definition, storing required bin names, json paths
* and secondary index filter
* @param documentFilterExpressions filter expressions
* @param queryStatement object for building query definition, storing required bin names and json paths
* @param documentFilters filters (can include one secondary index filter and/or one or more filter expressions;
* if there are multiple filter expressions given they are concatenated using logical AND)
* @return stream of {@link KeyResult} objects
* @throws DocumentApiException if query fails
*/
Stream<KeyResult> query(DocumentQueryStatement queryStatement, DocumentFilterExp... documentFilterExpressions);
Stream<KeyResult> query(DocumentQueryStatement queryStatement, DocumentFilter... documentFilters);
}
46 changes: 46 additions & 0 deletions src/main/java/com/aerospike/documentapi/data/DocFilterExp.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.aerospike.documentapi.data;

import com.aerospike.client.exp.Exp;
import com.aerospike.documentapi.util.ExpConverter;

public class DocFilterExp implements DocumentFilterExp {

private final String binName;
private final String jsonPath;
private final Operator operator;
private final Object value;
private Integer regexFlags = null;

public DocFilterExp(String binName, String jsonPath, Operator operator, Object value) {
this.binName = binName;
this.jsonPath = jsonPath;
this.operator = operator;
this.value = value;
}

public Exp toFilterExp() {
switch (operator) {
case LT:
return ExpConverter.lt(binName, jsonPath, value);
case GT:
return ExpConverter.gt(binName, jsonPath, value);
case LE:
return ExpConverter.le(binName, jsonPath, value);
case GE:
return ExpConverter.ge(binName, jsonPath, value);
case EQ:
return ExpConverter.eq(binName, jsonPath, value);
case NE:
return ExpConverter.ne(binName, jsonPath, value);
case REGEX:
return ExpConverter.regex(binName, jsonPath, value.toString(), regexFlags);
default:
return null;
}
}

@Override
public void setRegexFlags(int regexFlags) {
this.regexFlags = regexFlags;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package com.aerospike.documentapi.data;

import com.aerospike.client.query.Filter;
import com.aerospike.client.query.IndexCollectionType;
import com.aerospike.documentapi.util.FilterConverter;

import static com.aerospike.client.query.IndexCollectionType.DEFAULT;

public class DocFilterSecIndex implements DocumentFilterSecIndex {

private final String binName;
private final String jsonPath;
private final Operator operator;
private final Object value;
private IndexCollectionType idxCollectionType = DEFAULT;

public DocFilterSecIndex(String binName, String jsonPath, Operator operator, Object value) {
this.binName = binName;
this.jsonPath = jsonPath;
this.operator = operator;
this.value = value;
}

public Filter toSecIndexFilter() {
switch (operator) {
case LT:
return FilterConverter.lt(binName, jsonPath, value, idxCollectionType);
case GT:
return FilterConverter.gt(binName, jsonPath, value, idxCollectionType);
case LE:
return FilterConverter.le(binName, jsonPath, value, idxCollectionType);
case GE:
return FilterConverter.ge(binName, jsonPath, value, idxCollectionType);
case EQ:
return FilterConverter.eq(binName, jsonPath, value);
default:
throw new UnsupportedOperationException(String.format("'%s' secondary filter is not supported", operator));
}
}

@Override
public void setIdxCollectionType(IndexCollectionType idxCollectionType) {
this.idxCollectionType = idxCollectionType;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.aerospike.documentapi.data;

/**
* Document filter interface extended by {@link DocumentFilterExp} and {@link DocumentFilterSecIndex}.
*/
public interface DocumentFilter {
}
48 changes: 18 additions & 30 deletions src/main/java/com/aerospike/documentapi/data/DocumentFilterExp.java
Original file line number Diff line number Diff line change
@@ -1,36 +1,24 @@
package com.aerospike.documentapi.data;

import com.aerospike.client.exp.Exp;
import com.aerospike.documentapi.util.DocumentExp;

public class DocumentFilterExp {
/**
* Base interface for creating filter expression.
*
* <p>For the supported json paths see {@link com.aerospike.documentapi.util.ExpConverter}.</p>
* <p>Supported operators: </p>
* <ul>
* <li>EQ</li>
* <li>NE</li>
* <li>GT</li>
* <li>GE</li>
* <li>LT</li>
* <li>LE</li>
* <li>REGEX</li>
* </ul>
*/
public interface DocumentFilterExp extends DocumentFilter {
Exp toFilterExp();

private Exp exp;

public DocumentFilterExp(String binName, String jsonPath, Operator.Simple operator, Object value) {
switch (operator) {
case LT:
exp = DocumentExp.lt(binName, jsonPath, value);
break;
case GT:
exp = DocumentExp.gt(binName, jsonPath, value);
break;
case LTE:
exp = DocumentExp.le(binName, jsonPath, value);
break;
case GTE:
exp = DocumentExp.ge(binName, jsonPath, value);
break;
case EQ:
exp = DocumentExp.eq(binName, jsonPath, value);
break;
case NE:
exp = DocumentExp.ne(binName, jsonPath, value);
break;
}
}

public Exp toFilterExpression() {
return exp;
}
void setRegexFlags(int regexFlags);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.aerospike.documentapi.data;

import com.aerospike.client.query.Filter;
import com.aerospike.client.query.IndexCollectionType;

/**
* Base interface for creating secondary index filter.
*
* <p>For the supported json paths see {@link com.aerospike.documentapi.util.FilterConverter}.</p>
* <p>Supported operators: </p>
* <ul>
* <li>EQ</li>
* <li>GT</li>
* <li>GE</li>
* <li>LT</li>
* <li>LE</li>
* </ul>
*/
public interface DocumentFilterSecIndex extends DocumentFilter {
Filter toSecIndexFilter();

void setIdxCollectionType(IndexCollectionType idxCollectionType);
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,16 @@ public class DocumentQueryStatement {
long maxRecords;
int recordsPerSecond;
String[] jsonPaths;
Filter secondaryIndexFilter;

public Statement toStatement() {
public Statement toStatement(Filter secIndexFilter) {
Statement statement = new Statement();
statement.setNamespace(namespace);
statement.setSetName(setName);
statement.setIndexName(indexName);
statement.setBinNames(binNames);
statement.setMaxRecords(maxRecords);
statement.setRecordsPerSecond(recordsPerSecond);
statement.setFilter(secondaryIndexFilter);
statement.setFilter(secIndexFilter);
return statement;
}
}
Loading

0 comments on commit 033ff39

Please sign in to comment.