From 3c88970d07f7c283f3dc8e4bbd3ebce11e3752a5 Mon Sep 17 00:00:00 2001
From: Ryan Amari
Date: Tue, 5 Sep 2023 10:55:14 -0400
Subject: [PATCH] ALS-4978: Incomplete commit -- begin to separate phenotypic
 and genomic processing

---
 .../hpds/data/phenotype/PhenoCube.java        |   4 +-
 .../hpds/processing/AbstractProcessor.java    | 114 +++++++++---------
 .../hpds/processing/DistributableQuery.java   |  44 +++++++
 3 files changed, 104 insertions(+), 58 deletions(-)
 create mode 100644 processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/DistributableQuery.java

diff --git a/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/phenotype/PhenoCube.java b/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/phenotype/PhenoCube.java
index 97bdcab5..1380b05c 100644
--- a/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/phenotype/PhenoCube.java
+++ b/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/phenotype/PhenoCube.java
@@ -67,9 +67,9 @@ public Set<Integer> getKeysForValue(V value) {
 	}
 
-	public TreeSet<Integer> getKeysForRange(V min, V max) {
+	public Set<Integer> getKeysForRange(V min, V max) {
 		KeyAndValue<V>[] entries = getEntriesForValueRange(min, max);
-		TreeSet<Integer> keys = new TreeSet<>();
+		Set<Integer> keys = new HashSet<>();
 		for(KeyAndValue<V> entry : entries) {
 			keys.add(entry.key);
 		}
diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/AbstractProcessor.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/AbstractProcessor.java
index e2f6c44a..8152ac30 100644
--- a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/AbstractProcessor.java
+++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/AbstractProcessor.java
@@ -6,6 +6,7 @@
 import java.util.Map.Entry;
 import java.util.concurrent.*;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import java.util.zip.GZIPInputStream;
 
 import com.google.common.util.concurrent.UncheckedExecutionException;
@@ -138,7 +139,6 @@ public AbstractProcessor(PhenotypeMetaStore phenotypeMetaStore, VariantService variantService)
 		infoStoreColumns = new ArrayList<>(infoStores.keySet());
 
 		variantIndexCache = new VariantIndexCache(variantService.getVariantIndex(), infoStores);
-		warmCaches();
 	}
 
 	public AbstractProcessor(PhenotypeMetaStore phenotypeMetaStore, LoadingCache<String, PhenoCube<?>> store,
@@ -164,12 +164,6 @@ public List<String> getInfoStoreColumns() {
 		return infoStoreColumns;
 	}
 
-	private void warmCaches() {
-		//infoCache.refresh("Variant_frequency_as_text_____Rare");
-		//infoCache.refresh("Variant_frequency_as_text_____Common");
-		//infoCache.refresh("Variant_frequency_as_text_____Novel");
-	}
-
 	/**
 	 * Merges a list of sets of patient ids by intersection. If we implemented OR semantics
@@ -181,6 +175,10 @@
 	protected Set<Integer> applyBooleanLogic(List<Set<Integer>> filteredIdSets) {
 		Set<Integer>[] ids = new Set[] {filteredIdSets.get(0)};
 		filteredIdSets.forEach((keySet)->{
+			// I believe this is not ideal for large numbers of sets. Sets.intersection creates a view of the 2 sets, so
+			// doing this repeatedly would create views on views on views of the backing sets. retainAll() instead removes
+			// the non-matching elements from one concrete set in place, and if we sort by set size and start with the
+			// smallest I believe that will be more efficient
 			ids[0] = Sets.intersection(ids[0], keySet);
 		});
 		return ids[0];
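
Note on the comment above: a minimal sketch (not part of this patch) of the sort-by-size retainAll() intersection it proposes, assuming Integer patient ids; the class and method names are illustrative only.

    import java.util.*;

    class PatientSetIntersection {
        // Copy the smallest set, then retainAll() the rest in place. Each pass
        // iterates only the current (shrinking) result, never the larger sets.
        static Set<Integer> intersect(List<Set<Integer>> sets) {
            List<Set<Integer>> bySize = new ArrayList<>(sets);
            bySize.sort(Comparator.comparingInt(Set::size));
            Set<Integer> result = new HashSet<>(bySize.get(0)); // defensive copy; retainAll mutates
            for (Set<Integer> next : bySize.subList(1, bySize.size())) {
                result.retainAll(next);
                if (result.isEmpty()) break; // intersection can only shrink
            }
            return result;
        }
    }

Unlike chained Sets.intersection() views, each membership test here hits one concrete HashSet rather than a stack of layered views.
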
@@ -195,24 +193,31 @@ protected Set<Integer> applyBooleanLogic(List<Set<Integer>> filteredIdSets) {
 	 * @return
 	 */
 	protected List<Set<Integer>> idSetsForEachFilter(Query query) {
-		ArrayList<Set<Integer>> filteredIdSets = new ArrayList<Set<Integer>>();
+		DistributableQuery distributableQuery = new DistributableQuery();
 		try {
-			addIdSetsForAnyRecordOf(query, filteredIdSets);
-			addIdSetsForRequiredFields(query, filteredIdSets);
-			addIdSetsForNumericFilters(query, filteredIdSets);
-			addIdSetsForCategoryFilters(query, filteredIdSets);
+			addIdSetsForAnyRecordOf(query, distributableQuery);
+			addIdSetsForRequiredFields(query, distributableQuery);
+			addIdSetsForNumericFilters(query, distributableQuery);
+			addIdSetsForCategoryFilters(query, distributableQuery);
 		} catch (InvalidCacheLoadException e) {
 			log.warn("Invalid query supplied: " + e.getLocalizedMessage());
-			filteredIdSets.add(new HashSet()); // if an invalid path is supplied, no patients should match.
+			distributableQuery.addAndClausePatients(new HashSet<>()); // if an invalid path is supplied, no patients should match.
 		}
 
+		Set<Integer> phenotypicPatientSet;
 		//AND logic to make sure all patients match each filter
-		if(filteredIdSets.size()>1) {
-			filteredIdSets = new ArrayList<Set<Integer>>(List.of(applyBooleanLogic(filteredIdSets)));
+		if(distributableQuery.getPhenotypicQueryPatientSets().size()>0) {
+			phenotypicPatientSet = applyBooleanLogic(distributableQuery.getPhenotypicQueryPatientSets());
+		} else {
+			phenotypicPatientSet = Arrays.stream(variantService.getPatientIds())
+					.map(String::trim)
+					.map(Integer::parseInt)
+					.collect(Collectors.toSet());
 		}
 
+		// todo: use this patient id set going forward
-		return addIdSetsForVariantInfoFilters(query, filteredIdSets);
+		return addIdSetsForVariantInfoFilters(query, distributableQuery);
 	}
 
 	/**
@@ -245,64 +250,61 @@ public TreeSet<Integer> getPatientSubsetForQuery(Query query) {
 		return idList;
 	}
 
-	private void addIdSetsForRequiredFields(Query query, ArrayList<Set<Integer>> filteredIdSets) {
+	private void addIdSetsForRequiredFields(Query query, DistributableQuery distributableQuery) {
 		if(!query.getRequiredFields().isEmpty()) {
-			VariantBucketHolder<VariantMasks> bucketCache = new VariantBucketHolder<>();
-			filteredIdSets.addAll(query.getRequiredFields().parallelStream().map(path->{
-				if(VariantUtils.pathIsVariantSpec(path)) {
-					TreeSet<Integer> patientsInScope = new TreeSet<>();
-					addIdSetsForVariantSpecCategoryFilters(new String[]{"0/1","1/1"}, path, patientsInScope, bucketCache);
-					return patientsInScope;
+			query.getRequiredFields().forEach(path -> {
+				if (VariantUtils.pathIsVariantSpec(path)) {
+					// todo: implement this logic in the genomic nodes
+					//VariantBucketHolder<VariantMasks> bucketCache = new VariantBucketHolder<>();
+					//TreeSet<Integer> patientsInScope = new TreeSet<>();
+					//addIdSetsForVariantSpecCategoryFilters(new String[]{"0/1", "1/1"}, path, patientsInScope, bucketCache);
+					//return patientsInScope;
+					distributableQuery.addRequiredVariantField(path);
 				} else {
-					return new TreeSet<Integer>(getCube(path).keyBasedIndex());
+					distributableQuery.addAndClausePatients(new HashSet<Integer>(getCube(path).keyBasedIndex()));
 				}
-			}).collect(Collectors.toSet()));
+			});
 		}
 	}
 
-	private void addIdSetsForAnyRecordOf(Query query, ArrayList<Set<Integer>> filteredIdSets) {
+	private void addIdSetsForAnyRecordOf(Query query, DistributableQuery distributableQuery) {
 		if(!query.getAnyRecordOf().isEmpty()) {
-			Set<Integer> patientsInScope = new ConcurrentSkipListSet<Integer>();
-			VariantBucketHolder<VariantMasks> bucketCache = new VariantBucketHolder<VariantMasks>();
-			query.getAnyRecordOf().parallelStream().forEach(path->{
-				if(patientsInScope.size()<variantService.getPatientIds().length) {
-					if(VariantUtils.pathIsVariantSpec(path)) {
-						addIdSetsForVariantSpecCategoryFilters(new String[]{"0/1","1/1"}, path, patientsInScope, bucketCache);
-					} else {
-						patientsInScope.addAll(getCube(path).keyBasedIndex());
-					}
+			Set<Integer> anyRecordOfPatientSet = query.getAnyRecordOf().parallelStream().flatMap(path -> {
+				if (VariantUtils.pathIsVariantSpec(path)) {
+					throw new IllegalArgumentException("Variant paths not allowed for anyRecordOf queries");
 				}
-			});
-			filteredIdSets.add(patientsInScope);
+				return (Stream<Integer>) getCube(path).keyBasedIndex().stream();
+			}).collect(Collectors.toSet());
+			distributableQuery.addAndClausePatients(anyRecordOfPatientSet);
 		}
 	}
 
-	private void addIdSetsForNumericFilters(Query query, ArrayList<Set<Integer>> filteredIdSets) {
+	private void addIdSetsForNumericFilters(Query query, DistributableQuery distributableQuery) {
 		if(!query.getNumericFilters().isEmpty()) {
-			filteredIdSets.addAll((Set<TreeSet<Integer>>)(query.getNumericFilters().entrySet().parallelStream().map(entry->{
-				return (TreeSet<Integer>)(getCube(entry.getKey()).getKeysForRange(entry.getValue().getMin(), entry.getValue().getMax()));
-			}).collect(Collectors.toSet())));
+			query.getNumericFilters().forEach((key, value) -> {
+				Set<Integer> keysForRange = getCube(key).getKeysForRange(value.getMin(), value.getMax());
+				distributableQuery.addAndClausePatients(keysForRange);
+			});
 		}
 	}
 
-	private void addIdSetsForCategoryFilters(Query query, ArrayList<Set<Integer>> filteredIdSets) {
+	private void addIdSetsForCategoryFilters(Query query, DistributableQuery distributableQuery) {
 		if(!query.getCategoryFilters().isEmpty()) {
-			VariantBucketHolder<VariantMasks> bucketCache = new VariantBucketHolder<VariantMasks>();
-			Set<Set<Integer>> idsThatMatchFilters = (Set<Set<Integer>>)query.getCategoryFilters().entrySet().parallelStream().map(entry->{
-				Set<Integer> ids = new TreeSet<Integer>();
-				if(VariantUtils.pathIsVariantSpec(entry.getKey())) {
-					addIdSetsForVariantSpecCategoryFilters(entry.getValue(), entry.getKey(), ids, bucketCache);
+			query.getCategoryFilters().forEach((key, categoryFilters) -> {
+				Set<Integer> ids = new TreeSet<>();
+				if (VariantUtils.pathIsVariantSpec(key)) {
+					// todo: implement this logic in the genomic nodes
+					//VariantBucketHolder<VariantMasks> bucketCache = new VariantBucketHolder<>();
+					//addIdSetsForVariantSpecCategoryFilters(categoryFilters, key, ids, bucketCache);
+					distributableQuery.addVariantSpecCategoryFilter(key, categoryFilters);
 				} else {
-					String[] categoryFilter = entry.getValue();
-					for(String category : categoryFilter) {
-						ids.addAll(getCube(entry.getKey()).getKeysForValue(category));
+					for (String category : categoryFilters) {
+						ids.addAll(getCube(key).getKeysForValue(category));
 					}
 				}
-				return ids;
-			}).collect(Collectors.toSet());
-			filteredIdSets.addAll(idsThatMatchFilters);
+				distributableQuery.addAndClausePatients(ids);
+			});
 		}
 	}
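
A toy walk-through (not in this patch) of the routing the hunk above establishes: phenotypic filters resolve locally into AND-clause patient sets, while genomic filters are deferred into the genomic sub-query for the genomic nodes. The patient ids and the variant spec string are made-up examples.

    import java.util.Set;

    class RoutingSketch {
        public static void main(String[] args) {
            DistributableQuery dq = new DistributableQuery();
            dq.addAndClausePatients(Set.of(10, 11, 12)); // e.g. matches of a numeric filter
            dq.addAndClausePatients(Set.of(11, 12, 13)); // e.g. matches of a category filter
            dq.addVariantSpecCategoryFilter("4,9856624,CAAAAA,C",      // hypothetical variant spec
                    new String[]{"0/1", "1/1"});                       // deferred to genomic nodes
            System.out.println(dq.getPhenotypicQueryPatientSets().size()); // 2 sets to AND locally
            System.out.println(dq.getGenomicQuery().getCategoryFilters().size()); // 1 deferred filter
        }
    }
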
@@ -379,7 +381,7 @@ private BigInteger calculateIndiscriminateBitmask(VariantMasks masks) {
 		return indiscriminateVariantBitmask;
 	}
 
-	protected List<Set<Integer>> addIdSetsForVariantInfoFilters(Query query, List<Set<Integer>> filteredIdSets) {
+	protected List<Set<Integer>> addIdSetsForVariantInfoFilters(Query query, DistributableQuery distributableQuery) {
 //		log.debug("filterdIDSets START size: " + filteredIdSets.size());
 		/* VARIANT INFO FILTER HANDLING IS MESSY */
 		if(!query.getVariantInfoFilters().isEmpty()) {
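
The signature change above is the seam for the split: the variant-info path still returns the old List<Set<Integer>> shape, and the commit message marks the wiring as incomplete. One plausible combination step, assuming a client that resolves the genomic half of a DistributableQuery to a patient id set (the interface and method below are hypothetical, not part of this patch):

    import java.util.HashSet;
    import java.util.Set;

    class CombineSketch {
        // Hypothetical: resolves distributableQuery.getGenomicQuery() on the genomic nodes.
        interface GenomicNodeClient {
            Set<Integer> resolve(DistributableQuery distributableQuery);
        }

        static Set<Integer> combine(Set<Integer> phenotypicPatientSet,
                                    DistributableQuery distributableQuery,
                                    GenomicNodeClient client) {
            Set<Integer> result = new HashSet<>(phenotypicPatientSet);
            result.retainAll(client.resolve(distributableQuery)); // final AND of the two halves
            return result;
        }
    }
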
diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/DistributableQuery.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/DistributableQuery.java
new file mode 100644
index 00000000..cddfcf97
--- /dev/null
+++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/DistributableQuery.java
@@ -0,0 +1,44 @@
+package edu.harvard.hms.dbmi.avillach.hpds.processing;
+
+import edu.harvard.hms.dbmi.avillach.hpds.data.query.Query;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+public class DistributableQuery {
+
+	private Query genomicQuery;
+	private List<Set<Integer>> phenotypicQueryPatientSets;
+
+	public DistributableQuery() {
+		genomicQuery = new Query();
+		phenotypicQueryPatientSets = new ArrayList<>();
+	}
+
+	public Query getGenomicQuery() {
+		return genomicQuery;
+	}
+
+	public List<Set<Integer>> getPhenotypicQueryPatientSets() {
+		return phenotypicQueryPatientSets;
+	}
+
+	public void addAndClausePatients(Set<Integer> patientSet) {
+		// callers add from parallel streams, so lock the shared list, not the argument
+		synchronized (phenotypicQueryPatientSets) {
+			phenotypicQueryPatientSets.add(patientSet);
+		}
+	}
+
+	public void addRequiredVariantField(String path) {
+		synchronized (genomicQuery) {
+			genomicQuery.getRequiredFields().add(path);
+		}
+	}
+
+	public void addVariantSpecCategoryFilter(String key, String[] categoryFilters) {
+		synchronized (genomicQuery) {
+			genomicQuery.getCategoryFilters().put(key, categoryFilters);
+		}
+	}
+}
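
DistributableQuery is mutated from the parallel streams in AbstractProcessor, hence the synchronized blocks around the shared list and the genomic Query. A minimal concurrent usage sketch, with made-up concept paths and toy one-element patient sets:

    import java.util.List;
    import java.util.Set;

    class ConcurrentAddSketch {
        public static void main(String[] args) {
            DistributableQuery dq = new DistributableQuery();
            List.of("\\demographics\\AGE\\", "\\demographics\\SEX\\", "\\labs\\A1C\\")
                    .parallelStream()
                    .forEach(path -> dq.addAndClausePatients(Set.of(path.length())));
            System.out.println(dq.getPhenotypicQueryPatientSets().size()); // always 3, regardless of interleaving
        }
    }
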