Skip to content

Commit

Permalink
FMWK-116 Support for query with filtering and response parsing (#88)
Browse files Browse the repository at this point in the history
Co-authored-by: yrizhkov <[email protected]>
  • Loading branch information
agrgr and reugn authored Jan 19, 2023
1 parent c0b4001 commit ae443de
Show file tree
Hide file tree
Showing 38 changed files with 1,310 additions and 297 deletions.
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,37 @@
import com.aerospike.client.BatchRecord;
import com.aerospike.client.IAerospikeClient;
import com.aerospike.client.Key;
import com.aerospike.client.exp.Exp;
import com.aerospike.client.exp.Expression;
import com.aerospike.client.policy.BatchPolicy;
import com.aerospike.client.policy.Policy;
import com.aerospike.client.policy.QueryPolicy;
import com.aerospike.client.policy.WritePolicy;
import com.aerospike.client.query.KeyRecord;
import com.aerospike.documentapi.batch.BatchOperation;
import com.aerospike.documentapi.data.DocumentFilterExp;
import com.aerospike.documentapi.data.DocumentQueryStatement;
import com.aerospike.documentapi.data.KeyResult;
import com.aerospike.documentapi.jsonpath.JsonPathObject;
import com.aerospike.documentapi.jsonpath.JsonPathParser;
import com.aerospike.documentapi.jsonpath.JsonPathQuery;
import com.aerospike.documentapi.policy.DocumentPolicy;
import com.aerospike.documentapi.util.Lut;
import com.fasterxml.jackson.databind.JsonNode;
import net.minidev.json.JSONArray;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/**
* Primary object for accessing and mutating documents.
Expand All @@ -31,19 +44,22 @@ public class AerospikeDocumentClient implements IAerospikeDocumentClient {
private final Policy readPolicy;
private final WritePolicy writePolicy;
private final BatchPolicy batchPolicy;
private final QueryPolicy queryPolicy;

public AerospikeDocumentClient(IAerospikeClient client) {
this.aerospikeDocumentRepository = new AerospikeDocumentRepository(client);
this.readPolicy = client.getReadPolicyDefault();
this.writePolicy = client.getWritePolicyDefault();
this.batchPolicy = client.getBatchPolicyDefault();
this.queryPolicy = client.getQueryPolicyDefault();
}

public AerospikeDocumentClient(IAerospikeClient client, DocumentPolicy documentPolicy) {
this.aerospikeDocumentRepository = new AerospikeDocumentRepository(client);
this.readPolicy = documentPolicy.getReadPolicy();
this.writePolicy = documentPolicy.getWritePolicy();
this.batchPolicy = documentPolicy.getBatchPolicy();
this.queryPolicy = documentPolicy.getQueryPolicy();
}

@Override
Expand Down Expand Up @@ -175,6 +191,66 @@ public List<BatchRecord> batchPerform(List<BatchOperation> batchOperations, bool
.collect(Collectors.toList());
}

@Override
public Stream<KeyResult> query(DocumentQueryStatement queryStatement, DocumentFilterExp... docFilters) {
QueryPolicy policy = new QueryPolicy(queryPolicy);
policy.filterExp = getFilterExp(docFilters);

Stream<KeyRecord> keyRecords = StreamSupport.stream(Spliterators.spliteratorUnknownSize(
aerospikeDocumentRepository.query(policy, queryStatement.toStatement()).iterator(),
Spliterator.ORDERED
), false);

// no need to parse if there is no jsonPath given
if (queryStatement.getJsonPaths() == null || queryStatement.getJsonPaths().length == 0) {
return keyRecords.map(keyRecord -> new KeyResult(keyRecord.key, keyRecord.record));
}

// parsing KeyRecords to return the required objects
return keyRecords
.map(keyRecord -> getKeyResult(keyRecord.key,
getResults(queryStatement.getJsonPaths(), keyRecord.record.bins)))
.filter(Objects::nonNull);
}

private KeyResult getKeyResult(Key key, Map<String, Object> results) {
return results.isEmpty() ? null : new KeyResult(key, results);
}

private Expression getFilterExp(DocumentFilterExp[] docFilters) {
if (docFilters == null || docFilters.length == 0) return null;

List<Exp> filterExps = Arrays.stream(docFilters)
.filter(Objects::nonNull)
.map(DocumentFilterExp::toFilterExpression)
.filter(Objects::nonNull)
.collect(Collectors.toList());

if (filterExps.isEmpty()) return null;

Exp expResult = filterExps.size() == 1 ?
filterExps.get(0)
: Exp.and(filterExps.toArray(new Exp[0]));
return Exp.build(expResult);
}

private Map<String, Object> getResults(String[] jsonPaths, Map<String, Object> bins) {
if (jsonPaths == null || jsonPaths.length == 0) return null;

Map<String, Object> res = new HashMap<>();
bins.values()
.forEach(binValue -> Arrays.stream(jsonPaths)
.forEach(jsonPath -> addNonNull(jsonPath, JsonPathQuery.read(binValue, jsonPath), res))
);
return res;
}

private void addNonNull(String jsonPath, Object readRes, Map<String, Object> res) {
if (readRes == null || (readRes instanceof JSONArray && ((JSONArray) readRes).isEmpty())) return;

res.put(jsonPath, readRes);
}

private WritePolicy getLutPolicy(Map<String, Object> result) {
return Lut.setLutPolicy(new WritePolicy(writePolicy), (long) result.get(Lut.LUT_BIN));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@
import com.aerospike.client.cdt.MapOperation;
import com.aerospike.client.policy.BatchPolicy;
import com.aerospike.client.policy.Policy;
import com.aerospike.client.policy.QueryPolicy;
import com.aerospike.client.policy.WritePolicy;
import com.aerospike.client.query.RecordSet;
import com.aerospike.client.query.Statement;
import com.aerospike.documentapi.jsonpath.JsonPathObject;
import com.aerospike.documentapi.jsonpath.PathDetails;
import com.aerospike.documentapi.util.Lut;
Expand All @@ -24,6 +27,7 @@
import java.util.Map;
import java.util.stream.Collectors;

import static com.aerospike.documentapi.util.Utils.createBin;
import static com.aerospike.documentapi.util.Utils.getPathDetails;

class AerospikeDocumentRepository implements IAerospikeDocumentRepository {
Expand All @@ -45,7 +49,7 @@ public Map<String, Object> get(Policy readPolicy, Key key, Collection<String> bi
JsonPathObject jsonPathObject, boolean withLut) {
Map<String, Object> results = new HashMap<>();
// If there are no parts, retrieve the full document
if (jsonPathObject.getPathParts().isEmpty()) {
if (jsonPathObject.getTokensNotRequiringSecondStepQuery().isEmpty()) {
List<Operation> operations = new ArrayList<>();
for (String binName : binNames) {
operations.add(Operation.get(binName));
Expand All @@ -59,10 +63,10 @@ public Map<String, Object> get(Policy readPolicy, Key key, Collection<String> bi
results.putAll(rec.bins);
}
} else { // else retrieve using pure contexts
PathDetails pathDetails = getPathDetails(jsonPathObject.getPathParts(), true);
PathDetails pathDetails = getPathDetails(jsonPathObject.getTokensNotRequiringSecondStepQuery(), true);

List<Operation> operations = binNames.stream()
.map(binName -> pathDetails.getFinalPathPart().toAerospikeGetOperation(
.map(binName -> pathDetails.getFinalToken().toAerospikeGetOperation(
binName,
pathDetails.getCtxArray())
).collect(Collectors.toList());
Expand Down Expand Up @@ -100,20 +104,20 @@ public void put(WritePolicy writePolicy, Key key, Collection<String> binNames, O
JsonPathObject jsonPathObject) {
Operation[] operations;
// If there are no parts, put the full document
if (jsonPathObject.getPathParts().isEmpty()) {
if (jsonPathObject.getTokensNotRequiringSecondStepQuery().isEmpty()) {
operations = binNames.stream()
.map(bn -> {
Bin bin = new Bin(bn, jsonObject);
.map(binName -> {
Bin bin = createBin(binName, jsonObject);
return Operation.put(bin);
})
.toArray(Operation[]::new);
client.operate(writePolicy, key, operations);
} else { // else put using contexts
PathDetails pathDetails = getPathDetails(jsonPathObject.getPathParts(), true);
PathDetails pathDetails = getPathDetails(jsonPathObject.getTokensNotRequiringSecondStepQuery(), true);

try {
operations = binNames.stream()
.map(binName -> pathDetails.getFinalPathPart().toAerospikePutOperation(
.map(binName -> pathDetails.getFinalToken().toAerospikePutOperation(
binName,
jsonObject,
pathDetails.getCtxArray())
Expand All @@ -129,20 +133,20 @@ public void put(WritePolicy writePolicy, Key key, Collection<String> binNames, O
public void put(WritePolicy writePolicy, Key key, Map<String, Object> queryResults, JsonPathObject jsonPathObject) {
Operation[] operations;
// If there are no parts, put the full document
if (jsonPathObject.getPathParts().isEmpty()) {
if (jsonPathObject.getTokensNotRequiringSecondStepQuery().isEmpty()) {
operations = queryResults.entrySet().stream()
.map(e -> {
Bin bin = new Bin(e.getKey(), e.getValue());
Bin bin = createBin(e.getKey(), e.getValue());
return Operation.put(bin);
})
.toArray(Operation[]::new);
client.operate(writePolicy, key, operations);
} else { // else put using contexts
PathDetails pathDetails = getPathDetails(jsonPathObject.getPathParts(), true);
PathDetails pathDetails = getPathDetails(jsonPathObject.getTokensNotRequiringSecondStepQuery(), true);

try {
operations = queryResults.entrySet().stream()
.map(entry -> pathDetails.getFinalPathPart().toAerospikePutOperation(
.map(entry -> pathDetails.getFinalToken().toAerospikePutOperation(
entry.getKey(),
entry.getValue(),
pathDetails.getCtxArray())
Expand All @@ -158,14 +162,14 @@ public void put(WritePolicy writePolicy, Key key, Map<String, Object> queryResul
public void append(WritePolicy writePolicy, Key key, Collection<String> binNames, String jsonPath,
Object jsonObject, JsonPathObject jsonPathObject) {
// If there are no parts, you can't append
if (jsonPathObject.getPathParts().isEmpty()) {
if (jsonPathObject.getTokensNotRequiringSecondStepQuery().isEmpty()) {
throw new DocumentApiException.JsonAppendException(jsonPath);
} else {
PathDetails pathDetails = getPathDetails(jsonPathObject.getPathParts(), false);
PathDetails pathDetails = getPathDetails(jsonPathObject.getTokensNotRequiringSecondStepQuery(), false);

try {
Operation[] operations = binNames.stream()
.map(binName -> pathDetails.getFinalPathPart().toAerospikeAppendOperation(
.map(binName -> pathDetails.getFinalToken().toAerospikeAppendOperation(
binName,
jsonObject,
pathDetails.getCtxArray())
Expand All @@ -180,17 +184,17 @@ public void append(WritePolicy writePolicy, Key key, Collection<String> binNames
@Override
public void delete(WritePolicy writePolicy, Key key, Collection<String> binNames, JsonPathObject jsonPathObject) {
// If there are no parts, put an empty map in each given bin
if (jsonPathObject.getPathParts().isEmpty()) {
if (jsonPathObject.getTokensNotRequiringSecondStepQuery().isEmpty()) {
Operation[] operations = binNames.stream()
.map(MapOperation::clear)
.toArray(Operation[]::new);
client.operate(writePolicy, key, operations);
} else {
PathDetails pathDetails = getPathDetails(jsonPathObject.getPathParts(), true);
PathDetails pathDetails = getPathDetails(jsonPathObject.getTokensNotRequiringSecondStepQuery(), true);

try {
Operation[] operations = binNames.stream()
.map(bName -> pathDetails.getFinalPathPart().toAerospikeDeleteOperation(
.map(bName -> pathDetails.getFinalToken().toAerospikeDeleteOperation(
bName,
pathDetails.getCtxArray())
).toArray(Operation[]::new);
Expand All @@ -209,4 +213,13 @@ public boolean batchPerform(BatchPolicy batchPolicy, List<BatchRecord> batchReco
throw DocumentApiException.toDocumentException(e);
}
}

@Override
public RecordSet query(QueryPolicy policy, Statement statement) {
try {
return client.query(policy, statement);
} catch (AerospikeException e) {
throw DocumentApiException.toDocumentException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public JsonPrefixException(String jsonString) {
*/
public static class JsonPathException extends DocumentApiException {
public JsonPathException(String jsonString) {
super(String.format("'%s' does not match key[number] format", jsonString));
super(String.format("'%s' does not match JsonPath format", jsonString));
}
}

Expand All @@ -63,4 +63,13 @@ public JsonAppendException(String jsonString) {
super(String.format("Cannot append to '%s'", jsonString));
}
}

/**
* Exception to be thrown in case of invalid json path.
*/
public static class JsonPathParseException extends DocumentApiException {
public JsonPathParseException(String jsonPathPart) {
super(String.format("Unable to parse '%s' as jsonPath token", jsonPathPart));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,15 @@
import com.aerospike.client.BatchRecord;
import com.aerospike.client.Key;
import com.aerospike.documentapi.batch.BatchOperation;
import com.aerospike.documentapi.data.DocumentFilterExp;
import com.aerospike.documentapi.data.DocumentQueryStatement;
import com.aerospike.documentapi.data.KeyResult;
import com.fasterxml.jackson.databind.JsonNode;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

public interface IAerospikeDocumentClient {

Expand Down Expand Up @@ -119,4 +123,22 @@ public interface IAerospikeDocumentClient {
* @throws IllegalArgumentException if the batch has multiple two-step operations with the same key.
*/
List<BatchRecord> batchPerform(List<BatchOperation> batchOperations, boolean parallel);

/**
* Perform query.
* <p>Filtering can be done by setting one or more of the following items:</p>
* <ul>
* <li>optional secondary index filter (record level),</li>
* <li>optional document filter expressions (record level),</li>
* <li>optional bin names (bin level),</li>
* <li>optional json paths (inner objects less than a bin if necessary).</li>
* </ul>
*
* @param queryStatement object for building query definition, storing required bin names, json paths
* and secondary index filter
* @param documentFilterExpressions filter expressions
* @return stream of {@link KeyResult} objects
* @throws DocumentApiException if query fails
*/
Stream<KeyResult> query(DocumentQueryStatement queryStatement, DocumentFilterExp... documentFilterExpressions);
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
import com.aerospike.client.Key;
import com.aerospike.client.policy.BatchPolicy;
import com.aerospike.client.policy.Policy;
import com.aerospike.client.policy.QueryPolicy;
import com.aerospike.client.policy.WritePolicy;
import com.aerospike.client.query.RecordSet;
import com.aerospike.client.query.Statement;
import com.aerospike.documentapi.jsonpath.JsonPathObject;
import com.fasterxml.jackson.databind.JsonNode;

Expand Down Expand Up @@ -33,4 +36,6 @@ void append(WritePolicy writePolicy, Key key, Collection<String> binNames, Strin
void delete(WritePolicy writePolicy, Key key, Collection<String> binNames, JsonPathObject jsonPathObject);

boolean batchPerform(BatchPolicy batchPolicy, List<BatchRecord> batchRecords);

RecordSet query(QueryPolicy policy, Statement statement);
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ protected AbstractBatchOperation(Key key, Collection<String> binNames, String js

@Override
public void setFirstStepRecord() {
final PathDetails pathDetails = getPathDetails(jsonPathObject.getPathParts(), true);
final PathDetails pathDetails = getPathDetails(jsonPathObject.getTokensNotRequiringSecondStepQuery(), true);
List<Operation> batchOperations = binNames.stream()
.map(binName -> pathDetails.getFinalPathPart()
.map(binName -> pathDetails.getFinalToken()
.toAerospikeGetOperation(binName, pathDetails.getCtxArray()))
.collect(Collectors.toList());

Expand Down Expand Up @@ -96,7 +96,7 @@ protected Object firstStepJsonPathQuery(Map.Entry<String, Object> entry) {

protected Operation toPutOperation(String binName, Object objToPut, PathDetails pathDetails) {
try {
return pathDetails.getFinalPathPart()
return pathDetails.getFinalToken()
.toAerospikePutOperation(binName, objToPut, pathDetails.getCtxArray());
} catch (IllegalArgumentException e) {
errorBinName = binName;
Expand Down
Loading

0 comments on commit ae443de

Please sign in to comment.