Commit 3c88970

ALS-4978: Incomplete commit -- begin to separate phenotypic and genomic processing

ramari16 committed Sep 5, 2023
1 parent 0b6d03e commit 3c88970

Showing 3 changed files with 104 additions and 58 deletions.
Changed file 1 of 3:

@@ -67,9 +67,9 @@ public Set<Integer> getKeysForValue(V value) {

 }
 
-	public TreeSet<Integer> getKeysForRange(V min, V max) {
+	public Set<Integer> getKeysForRange(V min, V max) {
 		KeyAndValue<V>[] entries = getEntriesForValueRange(min, max);
-		TreeSet<Integer> keys = new TreeSet<>();
+		Set<Integer> keys = new HashSet<>();
 		for(KeyAndValue<V> entry : entries) {
 			keys.add(entry.key);
 		}
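
A plausible reading of this signature change (not stated in the commit): the only caller in this commit consumes the keys as an unordered set and immediately intersects them, so a HashSet avoids TreeSet's per-insert ordering cost. The call site, repeated from AbstractProcessor below:

	Set<Integer> keysForRange = getCube(key).getKeysForRange(value.getMin(), value.getMax());
	distributableQuery.addAndClausePatients(keysForRange); // the order of keys never matters here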
Changed file 2 of 3 (AbstractProcessor):

@@ -6,6 +6,7 @@
 import java.util.Map.Entry;
 import java.util.concurrent.*;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import java.util.zip.GZIPInputStream;
 
 import com.google.common.util.concurrent.UncheckedExecutionException;

@@ -138,7 +139,6 @@ public AbstractProcessor(PhenotypeMetaStore phenotypeMetaStore, VariantService v
 		infoStoreColumns = new ArrayList<>(infoStores.keySet());
 
 		variantIndexCache = new VariantIndexCache(variantService.getVariantIndex(), infoStores);
-		warmCaches();
 	}
 
 	public AbstractProcessor(PhenotypeMetaStore phenotypeMetaStore, LoadingCache<String, PhenoCube<?>> store,

@@ -164,12 +164,6 @@ public List<String> getInfoStoreColumns() {
 		return infoStoreColumns;
 	}
 
-	private void warmCaches() {
-		//infoCache.refresh("Variant_frequency_as_text_____Rare");
-		//infoCache.refresh("Variant_frequency_as_text_____Common");
-		//infoCache.refresh("Variant_frequency_as_text_____Novel");
-	}
-
 
 	/**
 	 * Merges a list of sets of patient ids by intersection. If we implemented OR semantics

@@ -181,6 +175,10 @@ private void warmCaches() {
 	protected Set<Integer> applyBooleanLogic(List<Set<Integer>> filteredIdSets) {
 		Set<Integer>[] ids = new Set[] {filteredIdSets.get(0)};
 		filteredIdSets.forEach((keySet)->{
+			// This is likely not ideal for large numbers of sets: Sets.intersection creates a view over the 2 sets,
+			// so doing this repeatedly stacks views on views on views of the backing sets. Copying the smallest set
+			// once and calling retainAll() with each of the others (sorted by size, smallest first) should be more
+			// efficient, since retainAll() filters the copy in place rather than layering views.
 			ids[0] = Sets.intersection(ids[0], keySet);
 		});
 		return ids[0];
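
For reference, a minimal sketch (not part of the commit) of the intersection strategy the new comment proposes: copy the smallest set once, then retainAll() the rest in place. It assumes a non-empty list, as the existing code does; the class and method names are illustrative only.

	import java.util.ArrayList;
	import java.util.HashSet;
	import java.util.List;
	import java.util.Set;

	class IntersectionSketch {
		static Set<Integer> intersectAll(List<Set<Integer>> sets) {
			List<Set<Integer>> bySize = new ArrayList<>(sets);
			// Sort so we start from the smallest set and the working copy never grows.
			bySize.sort((a, b) -> Integer.compare(a.size(), b.size()));
			Set<Integer> result = new HashSet<>(bySize.get(0)); // materialize once; no stacked views
			for (int i = 1; i < bySize.size() && !result.isEmpty(); i++) {
				result.retainAll(bySize.get(i)); // mutates result in place
			}
			return result;
		}
	}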
@@ -195,24 +193,31 @@ protected Set<Integer> applyBooleanLogic(List<Set<Integer>> filteredIdSets) {
 	 * @return
 	 */
 	protected List<Set<Integer>> idSetsForEachFilter(Query query) {
-		ArrayList<Set<Integer>> filteredIdSets = new ArrayList<Set<Integer>>();
+		DistributableQuery distributableQuery = new DistributableQuery();
 
 		try {
-			addIdSetsForAnyRecordOf(query, filteredIdSets);
-			addIdSetsForRequiredFields(query, filteredIdSets);
-			addIdSetsForNumericFilters(query, filteredIdSets);
-			addIdSetsForCategoryFilters(query, filteredIdSets);
+			addIdSetsForAnyRecordOf(query, distributableQuery);
+			addIdSetsForRequiredFields(query, distributableQuery);
+			addIdSetsForNumericFilters(query, distributableQuery);
+			addIdSetsForCategoryFilters(query, distributableQuery);
 		} catch (InvalidCacheLoadException e) {
 			log.warn("Invalid query supplied: " + e.getLocalizedMessage());
-			filteredIdSets.add(new HashSet<Integer>()); // if an invalid path is supplied, no patients should match.
+			distributableQuery.addAndClausePatients(new HashSet<>()); // if an invalid path is supplied, no patients should match.
 		}
 
+		Set<Integer> phenotypicPatientSet;
 		//AND logic to make sure all patients match each filter
-		if(filteredIdSets.size()>1) {
-			filteredIdSets = new ArrayList<Set<Integer>>(List.of(applyBooleanLogic(filteredIdSets)));
+		if(distributableQuery.getPhenotypicQueryPatientSets().size()>0) {
+			phenotypicPatientSet = applyBooleanLogic(distributableQuery.getPhenotypicQueryPatientSets());
+		} else {
+			phenotypicPatientSet = Arrays.stream(variantService.getPatientIds())
+					.map(String::trim)
+					.map(Integer::parseInt)
+					.collect(Collectors.toSet());
 		}
+		// todo: use this patient id set going forward
 
-		return addIdSetsForVariantInfoFilters(query, filteredIdSets);
+		return addIdSetsForVariantInfoFilters(query, distributableQuery);
 	}
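
When no phenotypic clause was added, the else branch above seeds the patient set with every ID known to the variant service. A small illustration of that parse, using a made-up array in place of variantService.getPatientIds():

	String[] rawIds = {" 1", "2 ", " 3 "};
	Set<Integer> allPatients = Arrays.stream(rawIds)
			.map(String::trim)            // stored IDs may carry padding whitespace
			.map(Integer::parseInt)
			.collect(Collectors.toSet()); // {1, 2, 3}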

@@ -245,64 +250,61 @@ public TreeSet<Integer> getPatientSubsetForQuery(Query query) {
 		return idList;
 	}
 
-	private void addIdSetsForRequiredFields(Query query, ArrayList<Set<Integer>> filteredIdSets) {
+	private void addIdSetsForRequiredFields(Query query, DistributableQuery distributableQuery) {
 		if(!query.getRequiredFields().isEmpty()) {
-			VariantBucketHolder<VariantMasks> bucketCache = new VariantBucketHolder<>();
-			filteredIdSets.addAll(query.getRequiredFields().parallelStream().map(path->{
-				if(VariantUtils.pathIsVariantSpec(path)) {
-					TreeSet<Integer> patientsInScope = new TreeSet<>();
-					addIdSetsForVariantSpecCategoryFilters(new String[]{"0/1","1/1"}, path, patientsInScope, bucketCache);
-					return patientsInScope;
+			query.getRequiredFields().forEach(path -> {
+				if (VariantUtils.pathIsVariantSpec(path)) {
+					// todo: implement this logic in the genomic nodes
+					//VariantBucketHolder<VariantMasks> bucketCache = new VariantBucketHolder<>();
+					//TreeSet<Integer> patientsInScope = new TreeSet<>();
+					//addIdSetsForVariantSpecCategoryFilters(new String[]{"0/1", "1/1"}, path, patientsInScope, bucketCache);
+					//return patientsInScope;
+					distributableQuery.addRequiredVariantField(path);
 				} else {
-					return new TreeSet<Integer>(getCube(path).keyBasedIndex());
+					distributableQuery.addAndClausePatients(new HashSet<Integer>(getCube(path).keyBasedIndex()));
 				}
-			}).collect(Collectors.toSet()));
+			});
 		}
 	}
 
-	private void addIdSetsForAnyRecordOf(Query query, ArrayList<Set<Integer>> filteredIdSets) {
+	private void addIdSetsForAnyRecordOf(Query query, DistributableQuery distributableQuery) {
 		if(!query.getAnyRecordOf().isEmpty()) {
-			Set<Integer> patientsInScope = new ConcurrentSkipListSet<Integer>();
-			VariantBucketHolder<VariantMasks> bucketCache = new VariantBucketHolder<VariantMasks>();
-			query.getAnyRecordOf().parallelStream().forEach(path->{
-				if(patientsInScope.size()<Math.max(
-						phenotypeMetaStore.getPatientIds().size(),
-						variantService.getPatientIds().length)) {
-					if(VariantUtils.pathIsVariantSpec(path)) {
-						addIdSetsForVariantSpecCategoryFilters(new String[]{"0/1","1/1"}, path, patientsInScope, bucketCache);
-					} else {
-						patientsInScope.addAll(getCube(path).keyBasedIndex());
-					}
-				}
-			});
-			filteredIdSets.add(patientsInScope);
+			// This is an OR aggregation of anyRecordOf filters
+			Set<Integer> anyRecordOfPatientSet = query.getAnyRecordOf().parallelStream().flatMap(path -> {
+				if (VariantUtils.pathIsVariantSpec(path)) {
+					throw new IllegalArgumentException("Variant paths not allowed for anyRecordOf queries");
+				}
+				return (Stream<Integer>) getCube(path).keyBasedIndex().stream();
+			}).collect(Collectors.toSet());
+			distributableQuery.addAndClausePatients(anyRecordOfPatientSet);
 		}
 	}
 
-	private void addIdSetsForNumericFilters(Query query, ArrayList<Set<Integer>> filteredIdSets) {
+	private void addIdSetsForNumericFilters(Query query, DistributableQuery distributableQuery) {
 		if(!query.getNumericFilters().isEmpty()) {
-			filteredIdSets.addAll((Set<TreeSet<Integer>>)(query.getNumericFilters().entrySet().parallelStream().map(entry->{
-				return (TreeSet<Integer>)(getCube(entry.getKey()).getKeysForRange(entry.getValue().getMin(), entry.getValue().getMax()));
-			}).collect(Collectors.toSet())));
+			query.getNumericFilters().forEach((key, value) -> {
+				Set<Integer> keysForRange = getCube(key).getKeysForRange(value.getMin(), value.getMax());
+				distributableQuery.addAndClausePatients(keysForRange);
+			});
 		}
 	}
 
-	private void addIdSetsForCategoryFilters(Query query, ArrayList<Set<Integer>> filteredIdSets) {
+	private void addIdSetsForCategoryFilters(Query query, DistributableQuery distributableQuery) {
 		if(!query.getCategoryFilters().isEmpty()) {
-			VariantBucketHolder<VariantMasks> bucketCache = new VariantBucketHolder<VariantMasks>();
-			Set<Set<Integer>> idsThatMatchFilters = (Set<Set<Integer>>)query.getCategoryFilters().entrySet().parallelStream().map(entry->{
-				Set<Integer> ids = new TreeSet<Integer>();
-				if(VariantUtils.pathIsVariantSpec(entry.getKey())) {
-					addIdSetsForVariantSpecCategoryFilters(entry.getValue(), entry.getKey(), ids, bucketCache);
+			query.getCategoryFilters().forEach((key, categoryFilters) -> {
+				Set<Integer> ids = new TreeSet<>();
+				if (VariantUtils.pathIsVariantSpec(key)) {
+					// todo: implement this logic in the genomic nodes
+					//VariantBucketHolder<VariantMasks> bucketCache = new VariantBucketHolder<>();
+					//addIdSetsForVariantSpecCategoryFilters(categoryFilters, key, ids, bucketCache);
+					distributableQuery.addVariantSpecCategoryFilter(key, categoryFilters);
 				} else {
-					String[] categoryFilter = entry.getValue();
-					for(String category : categoryFilter) {
-						ids.addAll(getCube(entry.getKey()).getKeysForValue(category));
+					for (String category : categoryFilters) {
+						ids.addAll(getCube(key).getKeysForValue(category));
 					}
 				}
-				return ids;
-			}).collect(Collectors.toSet());
-			filteredIdSets.addAll(idsThatMatchFilters);
+				distributableQuery.addAndClausePatients(ids);
+			});
 		}
 	}
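
Illustrative only (not part of the commit): each of the four methods above contributes one AND clause, so the phenotypic result is the intersection of every set passed to addAndClausePatients().

	DistributableQuery dq = new DistributableQuery();
	dq.addAndClausePatients(Set.of(1, 2, 3)); // e.g. patients matching a required field
	dq.addAndClausePatients(Set.of(2, 3, 4)); // e.g. patients matching a numeric range
	// applyBooleanLogic(dq.getPhenotypicQueryPatientSets()) yields {2, 3}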

@@ -379,7 +381,7 @@ private BigInteger calculateIndiscriminateBitmask(VariantMasks masks) {
 		return indiscriminateVariantBitmask;
 	}
 
-	protected List<Set<Integer>> addIdSetsForVariantInfoFilters(Query query, List<Set<Integer>> filteredIdSets) {
+	protected List<Set<Integer>> addIdSetsForVariantInfoFilters(Query query, DistributableQuery distributableQuery) {
 //		log.debug("filterdIDSets START size: " + filteredIdSets.size());
 		/* VARIANT INFO FILTER HANDLING IS MESSY */
 		if(!query.getVariantInfoFilters().isEmpty()) {
Changed file 3 of 3 (new file, DistributableQuery.java):

@@ -0,0 +1,44 @@
+package edu.harvard.hms.dbmi.avillach.hpds.processing;
+
+import edu.harvard.hms.dbmi.avillach.hpds.data.query.Query;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+public class DistributableQuery {
+
+	private Query genomicQuery;
+	private List<Set<Integer>> phenotypicQueryPatientSets;
+
+	public DistributableQuery() {
+		genomicQuery = new Query();
+		phenotypicQueryPatientSets = new ArrayList<>();
+	}
+
+	public Query getGenomicQuery() {
+		return genomicQuery;
+	}
+
+	public List<Set<Integer>> getPhenotypicQueryPatientSets() {
+		return phenotypicQueryPatientSets;
+	}
+
+	public void addAndClausePatients(Set<Integer> patientSet) {
+		// Synchronize on the shared list, not the caller's set, so adds from parallel streams are safe.
+		synchronized (phenotypicQueryPatientSets) {
+			phenotypicQueryPatientSets.add(patientSet);
+		}
+	}
+
+	public void addRequiredVariantField(String path) {
+		synchronized (genomicQuery) {
+			genomicQuery.getRequiredFields().add(path);
+		}
+	}
+
+	public void addVariantSpecCategoryFilter(String key, String[] categoryFilters) {
+		synchronized (genomicQuery) {
+			genomicQuery.getCategoryFilters().put(key, categoryFilters);
+		}
+	}
+}
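
A sketch of how a caller might eventually consume the two halves of a DistributableQuery, in the spirit of the commit message; sendToGenomicNode() and the variant-spec literal are hypothetical, not defined anywhere in this commit.

	DistributableQuery dq = new DistributableQuery();
	dq.addAndClausePatients(patientsMatchingPhenotype);  // resolved locally from phenotypic cubes
	dq.addRequiredVariantField("4,9856624,CAAAAA,C");    // placeholder variant spec, deferred to genomic nodes
	Set<Integer> phenotypicPatients = applyBooleanLogic(dq.getPhenotypicQueryPatientSets());
	// Set<Integer> genomicPatients = sendToGenomicNode(dq.getGenomicQuery()); // hypothetical transport
	// Set<Integer> result = Sets.intersection(phenotypicPatients, genomicPatients);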
