Skip to content

Commit

Permalink
FMWK-125 Rollback for query support (#90)
Browse files Browse the repository at this point in the history
  • Loading branch information
agrgr authored Jan 25, 2023
1 parent 033ff39 commit c2fe9ec
Show file tree
Hide file tree
Showing 15 changed files with 0 additions and 1,230 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,40 +3,24 @@
import com.aerospike.client.BatchRecord;
import com.aerospike.client.IAerospikeClient;
import com.aerospike.client.Key;
import com.aerospike.client.exp.Exp;
import com.aerospike.client.exp.Expression;
import com.aerospike.client.policy.BatchPolicy;
import com.aerospike.client.policy.Policy;
import com.aerospike.client.policy.QueryPolicy;
import com.aerospike.client.policy.WritePolicy;
import com.aerospike.client.query.Filter;
import com.aerospike.client.query.KeyRecord;
import com.aerospike.documentapi.batch.BatchOperation;
import com.aerospike.documentapi.data.DocumentFilter;
import com.aerospike.documentapi.data.DocumentFilterExp;
import com.aerospike.documentapi.data.DocumentQueryStatement;
import com.aerospike.documentapi.data.DocumentFilterSecIndex;
import com.aerospike.documentapi.data.KeyResult;
import com.aerospike.documentapi.jsonpath.JsonPathObject;
import com.aerospike.documentapi.jsonpath.JsonPathParser;
import com.aerospike.documentapi.jsonpath.JsonPathQuery;
import com.aerospike.documentapi.policy.DocumentPolicy;
import com.aerospike.documentapi.util.Lut;
import com.fasterxml.jackson.databind.JsonNode;
import net.minidev.json.JSONArray;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/**
* Primary object for accessing and mutating documents.
Expand All @@ -47,22 +31,19 @@ public class AerospikeDocumentClient implements IAerospikeDocumentClient {
private final Policy readPolicy;
private final WritePolicy writePolicy;
private final BatchPolicy batchPolicy;
private final QueryPolicy queryPolicy;

public AerospikeDocumentClient(IAerospikeClient client) {
this.aerospikeDocumentRepository = new AerospikeDocumentRepository(client);
this.readPolicy = client.getReadPolicyDefault();
this.writePolicy = client.getWritePolicyDefault();
this.batchPolicy = client.getBatchPolicyDefault();
this.queryPolicy = client.getQueryPolicyDefault();
}

public AerospikeDocumentClient(IAerospikeClient client, DocumentPolicy documentPolicy) {
this.aerospikeDocumentRepository = new AerospikeDocumentRepository(client);
this.readPolicy = documentPolicy.getReadPolicy();
this.writePolicy = documentPolicy.getWritePolicy();
this.batchPolicy = documentPolicy.getBatchPolicy();
this.queryPolicy = documentPolicy.getQueryPolicy();
}

@Override
Expand Down Expand Up @@ -194,80 +175,6 @@ public List<BatchRecord> batchPerform(List<BatchOperation> batchOperations, bool
.collect(Collectors.toList());
}

@Override
public Stream<KeyResult> query(DocumentQueryStatement queryStatement, DocumentFilter... docFilters) {
QueryPolicy policy = new QueryPolicy(queryPolicy);
policy.filterExp = getFilterExp(docFilters);

Filter secIndexFilter = getSecIndexFilter(docFilters);
Stream<KeyRecord> keyRecords = StreamSupport.stream(Spliterators.spliteratorUnknownSize(
aerospikeDocumentRepository.query(policy, queryStatement.toStatement(secIndexFilter)).iterator(),
Spliterator.ORDERED
), false);

// no need to parse if there is no jsonPath given
if (queryStatement.getJsonPaths() == null || queryStatement.getJsonPaths().length == 0) {
return keyRecords.map(keyRecord -> new KeyResult(keyRecord.key, keyRecord.record));
}

// parsing KeyRecords to return the required objects
return keyRecords
.map(keyRecord -> getKeyResult(keyRecord.key,
getResults(queryStatement.getJsonPaths(), keyRecord.record.bins)))
.filter(Objects::nonNull);
}

private Filter getSecIndexFilter(DocumentFilter[] docFilters) {
if (docFilters == null || docFilters.length == 0) return null;

return Arrays.stream(docFilters)
.filter(Objects::nonNull)
.filter(DocumentFilterSecIndex.class::isInstance)
.map(filterExp -> ((DocumentFilterSecIndex) filterExp).toSecIndexFilter())
.filter(Objects::nonNull)
.findFirst()
.orElse(null);
}

private KeyResult getKeyResult(Key key, Map<String, Object> results) {
return results.isEmpty() ? null : new KeyResult(key, results);
}

private Expression getFilterExp(DocumentFilter[] docFilters) {
if (docFilters == null || docFilters.length == 0) return null;

List<Exp> filterExps = Arrays.stream(docFilters)
.filter(Objects::nonNull)
.filter(DocumentFilterExp.class::isInstance)
.map(filterExp -> ((DocumentFilterExp) filterExp).toFilterExp())
.filter(Objects::nonNull)
.collect(Collectors.toList());

if (filterExps.isEmpty()) return null;

Exp expResult = filterExps.size() == 1 ?
filterExps.get(0)
: Exp.and(filterExps.toArray(new Exp[0]));
return Exp.build(expResult);
}

private Map<String, Object> getResults(String[] jsonPaths, Map<String, Object> bins) {
if (jsonPaths == null || jsonPaths.length == 0) return Collections.emptyMap();

Map<String, Object> res = new HashMap<>();
bins.values()
.forEach(binValue -> Arrays.stream(jsonPaths)
.forEach(jsonPath -> addNonNull(jsonPath, JsonPathQuery.read(binValue, jsonPath), res))
);
return res;
}

private void addNonNull(String jsonPath, Object readRes, Map<String, Object> res) {
if (readRes == null || (readRes instanceof JSONArray && ((JSONArray) readRes).isEmpty())) return;

res.put(jsonPath, readRes);
}

private WritePolicy getLutPolicy(Map<String, Object> result) {
return Lut.setLutPolicy(new WritePolicy(writePolicy), (long) result.get(Lut.LUT_BIN));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,7 @@
import com.aerospike.client.cdt.MapOperation;
import com.aerospike.client.policy.BatchPolicy;
import com.aerospike.client.policy.Policy;
import com.aerospike.client.policy.QueryPolicy;
import com.aerospike.client.policy.WritePolicy;
import com.aerospike.client.query.RecordSet;
import com.aerospike.client.query.Statement;
import com.aerospike.documentapi.jsonpath.JsonPathObject;
import com.aerospike.documentapi.jsonpath.PathDetails;
import com.aerospike.documentapi.util.Lut;
Expand Down Expand Up @@ -213,13 +210,4 @@ public boolean batchPerform(BatchPolicy batchPolicy, List<BatchRecord> batchReco
throw DocumentApiException.toDocumentException(e);
}
}

@Override
public RecordSet query(QueryPolicy policy, Statement statement) {
try {
return client.query(policy, statement);
} catch (AerospikeException e) {
throw DocumentApiException.toDocumentException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,11 @@
import com.aerospike.client.BatchRecord;
import com.aerospike.client.Key;
import com.aerospike.documentapi.batch.BatchOperation;
import com.aerospike.documentapi.data.DocFilterExp;
import com.aerospike.documentapi.data.DocumentFilter;
import com.aerospike.documentapi.data.DocumentQueryStatement;
import com.aerospike.documentapi.data.KeyResult;
import com.fasterxml.jackson.databind.JsonNode;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

public interface IAerospikeDocumentClient {

Expand Down Expand Up @@ -124,22 +119,4 @@ public interface IAerospikeDocumentClient {
* @throws IllegalArgumentException if the batch has multiple two-step operations with the same key.
*/
List<BatchRecord> batchPerform(List<BatchOperation> batchOperations, boolean parallel);

/**
* Perform query.
* <p>Filtering can be done by setting one or more of the following items:</p>
* <ul>
* <li>optional secondary index filter (record level),</li>
* <li>optional document filter expressions (record level),</li>
* <li>optional bin names (bin level),</li>
* <li>optional json paths (inner objects less than a bin if necessary).</li>
* </ul>
*
* @param queryStatement object for building query definition, storing required bin names and json paths
* @param documentFilters filters (can include one secondary index filter and/or one or more filter expressions;
* if there are multiple filter expressions given they are concatenated using logical AND)
* @return stream of {@link KeyResult} objects
* @throws DocumentApiException if query fails
*/
Stream<KeyResult> query(DocumentQueryStatement queryStatement, DocumentFilter... documentFilters);
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@
import com.aerospike.client.Key;
import com.aerospike.client.policy.BatchPolicy;
import com.aerospike.client.policy.Policy;
import com.aerospike.client.policy.QueryPolicy;
import com.aerospike.client.policy.WritePolicy;
import com.aerospike.client.query.RecordSet;
import com.aerospike.client.query.Statement;
import com.aerospike.documentapi.jsonpath.JsonPathObject;
import com.fasterxml.jackson.databind.JsonNode;

Expand Down Expand Up @@ -36,6 +33,4 @@ void append(WritePolicy writePolicy, Key key, Collection<String> binNames, Strin
void delete(WritePolicy writePolicy, Key key, Collection<String> binNames, JsonPathObject jsonPathObject);

boolean batchPerform(BatchPolicy batchPolicy, List<BatchRecord> batchRecords);

RecordSet query(QueryPolicy policy, Statement statement);
}
46 changes: 0 additions & 46 deletions src/main/java/com/aerospike/documentapi/data/DocFilterExp.java

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

Loading

0 comments on commit c2fe9ec

Please sign in to comment.