diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/AbstractProcessor.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/AbstractProcessor.java index ec90384c..117c18ea 100644 --- a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/AbstractProcessor.java +++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/AbstractProcessor.java @@ -6,7 +6,6 @@ import java.util.concurrent.*; import java.util.stream.Collectors; import java.util.stream.Stream; -import java.util.zip.GZIPInputStream; import com.google.common.util.concurrent.UncheckedExecutionException; import org.slf4j.Logger; @@ -24,6 +23,7 @@ import edu.harvard.hms.dbmi.avillach.hpds.data.query.Query; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import reactor.core.publisher.Mono; @Component @@ -39,33 +39,12 @@ public class AbstractProcessor { private final String genomicDataDirectory; - - private List infoStoreColumns; - - private Map infoStores; - private LoadingCache> store; private final PhenotypeMetaStore phenotypeMetaStore; private final GenomicProcessor genomicProcessor; - private final LoadingCache> infoStoreValuesCache = CacheBuilder.newBuilder().build(new CacheLoader<>() { - @Override - public List load(String conceptPath) { - FileBackedByteIndexedInfoStore store = getInfoStore(conceptPath); - if (store == null) { - throw new IllegalArgumentException("Concept path: " + conceptPath + " not found"); - } else if (store.isContinuous) { - throw new IllegalArgumentException("Concept path: " + conceptPath + " is not categorical"); - } - return store.getAllValues().keys() - .stream() - .sorted(String::compareToIgnoreCase) - .collect(Collectors.toList()); - } - }); - @Autowired public AbstractProcessor( PhenotypeMetaStore phenotypeMetaStore, @@ -105,29 +84,6 @@ public AbstractProcessor( } } - infoStores = new HashMap<>(); - File genomicDataDirectory = new File(this.genomicDataDirectory); - if(genomicDataDirectory.exists() && genomicDataDirectory.isDirectory()) { - Arrays.stream(genomicDataDirectory.list((file, filename)->{return filename.endsWith("infoStore.javabin");})) - .forEach((String filename)->{ - try ( - FileInputStream fis = new FileInputStream(this.genomicDataDirectory + filename); - GZIPInputStream gis = new GZIPInputStream(fis); - ObjectInputStream ois = new ObjectInputStream(gis) - ){ - log.info("loading " + filename); - FileBackedByteIndexedInfoStore infoStore = (FileBackedByteIndexedInfoStore) ois.readObject(); - infoStore.updateStorageDirectory(genomicDataDirectory); - infoStores.put(filename.replace("_infoStore.javabin", ""), infoStore); - ois.close(); - } catch (IOException e) { - throw new UncheckedIOException(e); - } catch (ClassNotFoundException e) { - throw new RuntimeException(e); - } - }); - } - infoStoreColumns = new ArrayList<>(infoStores.keySet()); } public AbstractProcessor(PhenotypeMetaStore phenotypeMetaStore, LoadingCache> store, @@ -135,8 +91,6 @@ public AbstractProcessor(PhenotypeMetaStore phenotypeMetaStore, LoadingCache getInfoStoreColumns() { - return infoStoreColumns; + return genomicProcessor.getInfoStoreColumns(); } @@ -180,8 +134,8 @@ protected Set idSetsForEachFilter(Query query) { DistributableQuery distributableQuery = getDistributableQuery(query); if (distributableQuery.hasFilters()) { - BigInteger patientMaskForVariantInfoFilters = genomicProcessor.getPatientMask(distributableQuery); - return genomicProcessor.patientMaskToPatientIdSet(patientMaskForVariantInfoFilters); + Mono patientMaskForVariantInfoFilters = genomicProcessor.getPatientMask(distributableQuery); + return patientMaskForVariantInfoFilters.map(genomicProcessor::patientMaskToPatientIdSet).block(); } return distributableQuery.getPatientIds(); @@ -210,7 +164,7 @@ private DistributableQuery getDistributableQuery(Query query) { } else { // if there are no patient filters, use all patients. // todo: we should not have to send these - phenotypicPatientSet = Arrays.stream(genomicProcessor.getPatientIds()) + phenotypicPatientSet = genomicProcessor.getPatientIds().stream() .map(String::trim) .map(Integer::parseInt) .collect(Collectors.toSet()); @@ -306,16 +260,18 @@ private List> getIdSetsForCategoryFilters(Query query, Distributabl protected Collection getVariantList(Query query) throws IOException { DistributableQuery distributableQuery = getDistributableQuery(query); - return genomicProcessor.getVariantList(distributableQuery); + return genomicProcessor.getVariantList(distributableQuery).block(); } public FileBackedByteIndexedInfoStore getInfoStore(String column) { - return infoStores.get(column); + // todo: figure out how we want to expose inner workings of the FBBIIS + throw new RuntimeException("Not yet implemented"); + //return infoStores.get(column); } public List searchInfoConceptValues(String conceptPath, String query) { try { - return infoStoreValuesCache.getUnchecked(conceptPath).stream() + return genomicProcessor.getInfoStoreValues(conceptPath).stream() .filter(variableValue -> variableValue.toUpperCase().contains(query.toUpperCase())) .collect(Collectors.toList()); } catch (UncheckedExecutionException e) { @@ -406,7 +362,7 @@ public TreeMap getDictionary() { return phenotypeMetaStore.getMetaStore(); } - public String[] getPatientIds() { + public List getPatientIds() { return genomicProcessor.getPatientIds(); } diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessor.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessor.java index 41f3faac..6c51cb87 100644 --- a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessor.java +++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessor.java @@ -2,22 +2,28 @@ import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.VariantMasks; import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.caching.VariantBucketHolder; +import reactor.core.publisher.Mono; import java.math.BigInteger; import java.util.Collection; +import java.util.List; import java.util.Optional; import java.util.Set; public interface GenomicProcessor { - BigInteger getPatientMask(DistributableQuery distributableQuery); + Mono getPatientMask(DistributableQuery distributableQuery); Set patientMaskToPatientIdSet(BigInteger patientMask); BigInteger createMaskForPatientSet(Set patientSubset); - Collection getVariantList(DistributableQuery distributableQuery); + Mono> getVariantList(DistributableQuery distributableQuery); - String[] getPatientIds(); + List getPatientIds(); Optional getMasks(String path, VariantBucketHolder variantMasksVariantBucketHolder); + + List getInfoStoreColumns(); + + List getInfoStoreValues(String conceptPath); } diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorConfig.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorConfig.java new file mode 100644 index 00000000..96c5c595 --- /dev/null +++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorConfig.java @@ -0,0 +1,41 @@ +package edu.harvard.hms.dbmi.avillach.hpds.processing; + +import edu.harvard.hms.dbmi.avillach.hpds.processing.genomic.GenomicProcessorRestClient; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.PropertySource; + +import java.util.ArrayList; +import java.util.List; + +@SpringBootApplication +@PropertySource("classpath:application.properties") +public class GenomicProcessorConfig { + + @Value("${HPDS_GENOMIC_DATA_DIRECTORY:/opt/local/hpds/all/}") + private String hpdsGenomicDataDirectory; + + + @Bean(name = "localGenomicProcessor") + @ConditionalOnProperty(prefix = "hpds.genomicProcessor", name = "impl", havingValue = "local") + public GenomicProcessor localGenomicProcessor() { + // todo: make sure this is set as default + //System.getProperty("HPDS_GENOMIC_DATA_DIRECTORY", "/opt/local/hpds/all/"); + return new GenomicProcessorNodeImpl(hpdsGenomicDataDirectory); + } + + @Bean(name = "remoteGenomicProcessor") + @ConditionalOnProperty(prefix = "hpds.genomicProcessor", name = "impl", havingValue = "remote") + public GenomicProcessor remoteGenomicProcessor() { + // Just for testing, for now, move to a configuration file or something + List nodes = new ArrayList<>(); + String[] hosts = new String[] {"http://localhost:8090/", "http://localhost:8091/"}; + nodes = List.of( + new GenomicProcessorRestClient(hosts[0]), + new GenomicProcessorRestClient(hosts[1]) + ); + return new GenomicProcessorParentImpl(nodes); + } +} diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorNodeImpl.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorNodeImpl.java index 3e197f39..6e22c3b1 100644 --- a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorNodeImpl.java +++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorNodeImpl.java @@ -10,6 +10,7 @@ import edu.harvard.hms.dbmi.avillach.hpds.data.query.Query; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import reactor.core.publisher.Mono; import java.io.*; import java.math.BigInteger; @@ -73,7 +74,7 @@ public GenomicProcessorNodeImpl(String genomicDataDirectory) { } @Override - public BigInteger getPatientMask(DistributableQuery distributableQuery) { + public Mono getPatientMask(DistributableQuery distributableQuery) { // log.debug("filterdIDSets START size: " + filteredIdSets.size()); /* VARIANT INFO FILTER HANDLING IS MESSY */ if(distributableQuery.hasFilters()) { @@ -125,9 +126,9 @@ public BigInteger getPatientMask(DistributableQuery distributableQuery) { } } - return patientMask; + return Mono.just(patientMask); } - return createMaskForPatientSet(distributableQuery.getPatientIds()); + return Mono.fromCallable(() -> createMaskForPatientSet(distributableQuery.getPatientIds())); /* END OF VARIANT INFO FILTER HANDLING */ } @@ -224,7 +225,7 @@ private VariantIndex addVariantsForInfoFilter(VariantIndex unionOfInfoFilters, Q } @Override - public Collection getVariantList(DistributableQuery query) { + public Mono> getVariantList(DistributableQuery query) { boolean queryContainsVariantInfoFilters = query.getVariantInfoFilters().stream().anyMatch(variantInfoFilter -> !variantInfoFilter.categoryVariantInfoFilters.isEmpty() || !variantInfoFilter.numericVariantInfoFilters.isEmpty() ); @@ -253,7 +254,7 @@ public Collection getVariantList(DistributableQuery query) { // If we have all patients then no variants would be filtered, so no need to do further processing if(patientSubset.size()==variantService.getPatientIds().length) { log.info("query selects all patient IDs, returning...."); - return unionOfInfoFilters.mapToVariantSpec(variantService.getVariantIndex()); + return Mono.just(unionOfInfoFilters.mapToVariantSpec(variantService.getVariantIndex())); } BigInteger patientMasks = createMaskForPatientSet(patientSubset); @@ -276,12 +277,12 @@ public Collection getVariantList(DistributableQuery query) { } }); }); - return variantsWithPatients; + return Mono.just(variantsWithPatients); }else { - return unionOfInfoFiltersVariantSpecs; + return Mono.just(unionOfInfoFiltersVariantSpecs); } } - return new ArrayList<>(); + return Mono.just(new ArrayList<>()); } private BigInteger getIdSetForVariantSpecCategoryFilter(String[] zygosities, String key, VariantBucketHolder bucketCache) { @@ -348,8 +349,8 @@ private BigInteger calculateIndiscriminateBitmask(VariantMasks masks) { } @Override - public String[] getPatientIds() { - return variantService.getPatientIds(); + public List getPatientIds() { + return List.of(variantService.getPatientIds()); } @Override @@ -357,4 +358,16 @@ public Optional getMasks(String path, VariantBucketHolder getInfoStoreColumns() { + return infoStoreColumns; + } + + @Override + public List getInfoStoreValues(String conceptPath) { + return infoStores.get(conceptPath).getAllValues().keys() + .stream() + .sorted(String::compareToIgnoreCase) + .collect(Collectors.toList()); + } } diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorParentImpl.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorParentImpl.java index ce1d6441..0c1d820b 100644 --- a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorParentImpl.java +++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorParentImpl.java @@ -1,72 +1,111 @@ package edu.harvard.hms.dbmi.avillach.hpds.processing; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.FileBackedByteIndexedInfoStore; import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.VariantMasks; import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.caching.VariantBucketHolder; +import edu.harvard.hms.dbmi.avillach.hpds.processing.genomic.GenomicProcessorRestClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import java.math.BigInteger; -import java.util.Collection; -import java.util.List; -import java.util.Optional; -import java.util.Set; +import java.util.*; import java.util.stream.Collectors; -@Component public class GenomicProcessorParentImpl implements GenomicProcessor { private static Logger log = LoggerFactory.getLogger(GenomicProcessorParentImpl.class); private final List nodes; + private final LoadingCache> infoStoreValuesCache = CacheBuilder.newBuilder().build(new CacheLoader<>() { + @Override + public List load(String conceptPath) { + return nodes.parallelStream() + .map(node -> node.getInfoStoreValues(conceptPath)) + .flatMap(List::stream) + .sorted(String::compareToIgnoreCase) + .collect(Collectors.toList()); + } + }); + + private List patientIds; - @Autowired - public GenomicProcessorParentImpl() { - // Just for testing, for now, move to a configuration file or something - String[] paths = new String[] {"/Users/ryan/dev/pic-sure-hpds-test/data/orchestration/1040.20/all/", "/Users/ryan/dev/pic-sure-hpds-test/data/orchestration/1040.22/all/"}; - nodes = List.of( - new GenomicProcessorNodeImpl(paths[0]), - new GenomicProcessorNodeImpl(paths[1]) - ); + public GenomicProcessorParentImpl(List nodes) { + this.nodes = nodes; } @Override - public BigInteger getPatientMask(DistributableQuery distributableQuery) { - BigInteger patientMask = null; + public Mono getPatientMask(DistributableQuery distributableQuery) { + Mono result = Flux.just(nodes.toArray(GenomicProcessor[]::new)) + .flatMap(node -> node.getPatientMask(distributableQuery)) + .reduce(BigInteger::or); + return result; + /*BigInteger patientMask = null; + System.out.println("Calling all nodes loop"); for (GenomicProcessor node : nodes) { if (patientMask == null) { - patientMask = node.getPatientMask(distributableQuery); + System.out.println("Calling first node"); + patientMask = node.getPatientMask(distributableQuery).block(); } else { - patientMask = patientMask.or(node.getPatientMask(distributableQuery)); + System.out.println("Calling second node"); + patientMask = patientMask.or(node.getPatientMask(distributableQuery).block()); } - log.info("Patients: " + node.patientMaskToPatientIdSet(patientMask)); } - return patientMask; + System.out.println("Finished calling all nodes loop"); + return Mono.just(patientMask);*/ } @Override public Set patientMaskToPatientIdSet(BigInteger patientMask) { - return null; + Set ids = new HashSet<>(); + String bitmaskString = patientMask.toString(2); + for(int x = 2;x < bitmaskString.length()-2;x++) { + if('1'==bitmaskString.charAt(x)) { + String patientId = patientIds.get(x-2).trim(); + ids.add(Integer.parseInt(patientId)); + } + } + return ids; } @Override public BigInteger createMaskForPatientSet(Set patientSubset) { - return null; + throw new RuntimeException("Not implemented"); } @Override - public Collection getVariantList(DistributableQuery distributableQuery) { - return nodes.parallelStream().flatMap(node -> + public Mono> getVariantList(DistributableQuery distributableQuery) { + Mono> result = Flux.just(nodes.toArray(GenomicProcessor[]::new)) + .flatMap(node -> node.getVariantList(distributableQuery)) + .reduce((variantList1, variantList2) -> { + List mergedResult = new ArrayList<>(variantList1.size() + variantList2.size()); + mergedResult.addAll(variantList1); + mergedResult.addAll(variantList2); + return mergedResult; + }); + return result; + /*return nodes.parallelStream().flatMap(node -> node.getVariantList(distributableQuery).stream()).collect(Collectors.toList() - ); + );*/ } @Override - public String[] getPatientIds() { - // todo: verify all nodes have the same potients - return nodes.get(0).getPatientIds(); + public List getPatientIds() { + if (patientIds != null) { + return patientIds; + } else { + // todo: verify all nodes have the same potients + List result = nodes.get(0).getPatientIds(); + patientIds = result; + return result; + } } @Override @@ -79,4 +118,18 @@ public Optional getMasks(String path, VariantBucketHolder getInfoStoreColumns() { + // todo: cache this + return nodes.parallelStream() + .map(GenomicProcessor::getInfoStoreColumns) + .flatMap(List::stream) + .collect(Collectors.toList()); + } + + @Override + public List getInfoStoreValues(String conceptPath) { + return infoStoreValuesCache.getUnchecked(conceptPath); + } } diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/QueryProcessor.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/QueryProcessor.java index 8f0df4e1..18a30531 100644 --- a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/QueryProcessor.java +++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/QueryProcessor.java @@ -107,14 +107,14 @@ private void processColumn(List paths, TreeSet ids, ResultStore if(VariantUtils.pathIsVariantSpec(path)) { // todo: confirm this entire if block is even used. I don't think it is Optional masks = abstractProcessor.getMasks(path, new VariantBucketHolder<>()); - String[] patientIds = abstractProcessor.getPatientIds(); + List patientIds = abstractProcessor.getPatientIds(); int idPointer = 0; ByteBuffer doubleBuffer = ByteBuffer.allocate(Double.BYTES); int idInSubsetPointer = 0; for(int id : ids) { - while(idPointer < patientIds.length) { - int key = Integer.parseInt(patientIds[idPointer]); + while(idPointer < patientIds.size()) { + int key = Integer.parseInt(patientIds.get(idPointer)); if(key < id) { idPointer++; } else if(key == id){ diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/genomic/GenomicProcessorRestClient.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/genomic/GenomicProcessorRestClient.java index d63facda..e18dde92 100644 --- a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/genomic/GenomicProcessorRestClient.java +++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/genomic/GenomicProcessorRestClient.java @@ -2,9 +2,9 @@ import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.VariantMasks; import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.caching.VariantBucketHolder; -import edu.harvard.hms.dbmi.avillach.hpds.data.query.Query; import edu.harvard.hms.dbmi.avillach.hpds.processing.DistributableQuery; import edu.harvard.hms.dbmi.avillach.hpds.processing.GenomicProcessor; +import org.springframework.core.ParameterizedTypeReference; import org.springframework.http.HttpHeaders; import org.springframework.http.MediaType; import org.springframework.web.reactive.function.client.WebClient; @@ -12,6 +12,7 @@ import java.math.BigInteger; import java.util.Collection; +import java.util.List; import java.util.Optional; import java.util.Set; @@ -27,14 +28,14 @@ public GenomicProcessorRestClient(String serviceUrl) { } @Override - public BigInteger getPatientMask(DistributableQuery distributableQuery) { - return webClient.post() + public Mono getPatientMask(DistributableQuery distributableQuery) { + Mono result = webClient.post() .uri("/patients") .contentType(MediaType.APPLICATION_JSON) .body(Mono.just(distributableQuery), DistributableQuery.class) .retrieve() - .bodyToMono(BigInteger.class) - .block(); + .bodyToMono(BigInteger.class); + return result; } @Override @@ -47,25 +48,41 @@ public BigInteger createMaskForPatientSet(Set patientSubset) { return null; } + private static final ParameterizedTypeReference> VARIANT_LIST_TYPE_REFERENCE = new ParameterizedTypeReference<>(){}; @SuppressWarnings("unchecked") @Override - public Collection getVariantList(DistributableQuery distributableQuery) { - return webClient.post() + public Mono> getVariantList(DistributableQuery distributableQuery) { + Mono> result = webClient.post() .uri("/variants") .contentType(MediaType.APPLICATION_JSON) .body(Mono.just(distributableQuery), DistributableQuery.class) .retrieve() - .bodyToMono(Collection.class) - .block(); + .bodyToMono(VARIANT_LIST_TYPE_REFERENCE); + return result; } @Override - public String[] getPatientIds() { - return new String[0]; + public List getPatientIds() { + List result = webClient.get() + .uri("/patients/ids") + .retrieve() + .bodyToMono(List.class) + .block(); + return result; } @Override public Optional getMasks(String path, VariantBucketHolder variantMasksVariantBucketHolder) { throw new RuntimeException("Not Implemented"); } + + @Override + public List getInfoStoreColumns() { + throw new RuntimeException("Not yet implemented"); + } + + @Override + public List getInfoStoreValues(String conceptPath) { + throw new RuntimeException("Not Yet implemented"); + } } diff --git a/processing/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/processing/genomic/GenomicProcessorRestClientTest.java b/processing/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/processing/genomic/GenomicProcessorRestClientTest.java index 7a34454f..2a010e7b 100644 --- a/processing/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/processing/genomic/GenomicProcessorRestClientTest.java +++ b/processing/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/processing/genomic/GenomicProcessorRestClientTest.java @@ -33,7 +33,7 @@ public void simpleTest() { distributableQuery.setVariantInfoFilters(variantInfoFilters); distributableQuery.setPatientIds(Set.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); - BigInteger patientMaskForVariantInfoFilters = genomicProcessorRestClient.getPatientMask(distributableQuery); + BigInteger patientMaskForVariantInfoFilters = genomicProcessorRestClient.getPatientMask(distributableQuery).block(); System.out.println(patientMaskForVariantInfoFilters); } } \ No newline at end of file diff --git a/service/src/main/resources/application.properties b/service/src/main/resources/application.properties index d2dc8be4..f5a025bb 100644 --- a/service/src/main/resources/application.properties +++ b/service/src/main/resources/application.properties @@ -1,3 +1,6 @@ SMALL_JOB_LIMIT = 100 SMALL_TASK_THREADS = 1 -LARGE_TASK_THREADS = 1 \ No newline at end of file +LARGE_TASK_THREADS = 1 + +hpds.genomicProcessor.impl=local +HPDS_GENOMIC_DATA_DIRECTORY=/opt/local/hpds/all/ \ No newline at end of file diff --git a/service/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/service/util/AbstractProcessorIntegrationTest.java b/service/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/service/util/AbstractProcessorIntegrationTest.java new file mode 100644 index 00000000..2ddd4141 --- /dev/null +++ b/service/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/service/util/AbstractProcessorIntegrationTest.java @@ -0,0 +1,193 @@ +package edu.harvard.hms.dbmi.avillach.hpds.service.util; + +import com.google.common.collect.Sets; +import edu.harvard.hms.dbmi.avillach.hpds.data.query.Filter; +import edu.harvard.hms.dbmi.avillach.hpds.data.query.Query; + +import edu.harvard.hms.dbmi.avillach.hpds.processing.AbstractProcessor; +import edu.harvard.hms.dbmi.avillach.hpds.processing.CountProcessor; +import edu.harvard.hms.dbmi.avillach.hpds.service.HpdsApplication; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit.jupiter.SpringExtension; + +import java.util.*; + +import static org.junit.jupiter.api.Assertions.assertEquals; + + +@ExtendWith(SpringExtension.class) +@EnableAutoConfiguration +@SpringBootTest(classes = HpdsApplication.class) +public class AbstractProcessorIntegrationTest { + + @Autowired + private AbstractProcessor abstractProcessor; + + @Test + public void getPatientSubsetForQuery_validGeneWithVariantQuery() { + Query query = new Query(); + List variantInfoFilters = new ArrayList<>(); + Query.VariantInfoFilter variantInfoFilter = new Query.VariantInfoFilter(); + variantInfoFilter.categoryVariantInfoFilters = Map.of("Gene_with_variant", new String[]{"FRG1FP"}); + variantInfoFilters.add(variantInfoFilter); + query.setVariantInfoFilters(variantInfoFilters); + + Set idList = abstractProcessor.getPatientSubsetForQuery(query); + assertEquals(5, idList.size()); + } + + @Test + public void getPatientSubsetForQuery_validGeneWithMultipleVariantQuery() { + Query query = new Query(); + List variantInfoFilters = new ArrayList<>(); + Query.VariantInfoFilter variantInfoFilter = new Query.VariantInfoFilter(); + variantInfoFilter.categoryVariantInfoFilters = Map.of( + "Gene_with_variant", new String[]{"FRG1FP"} + ); + Query.VariantInfoFilter variantInfoFilter2 = new Query.VariantInfoFilter(); + variantInfoFilter2.categoryVariantInfoFilters = Map.of( + "Gene_with_variant", new String[]{"ACTG1P3"} + ); + variantInfoFilters.add(variantInfoFilter2); + query.setVariantInfoFilters(variantInfoFilters); + + Set idList = abstractProcessor.getPatientSubsetForQuery(query); + assertEquals(105, idList.size()); + } + + @Test + public void getPatientSubsetForQuery_validGeneWithVariantQueryAndNumericQuery() { + Query query = new Query(); + List variantInfoFilters = new ArrayList<>(); + Query.VariantInfoFilter variantInfoFilter = new Query.VariantInfoFilter(); + variantInfoFilter.categoryVariantInfoFilters = Map.of("Gene_with_variant", new String[]{"FRG1FP"}); + variantInfoFilters.add(variantInfoFilter); + query.setVariantInfoFilters(variantInfoFilters); + query.setNumericFilters(Map.of("\\laboratory\\allergen test\\", new Filter.DoubleFilter(0.0, 30.0))); + + Set idList = abstractProcessor.getPatientSubsetForQuery(query); + assertEquals(4, idList.size()); + } + + @Test + public void getPatientSubsetForQuery_validRequiredVariant() { + Query query = new Query(); + query.setRequiredFields(List.of("chr22,10942689,C,T")); + query.setNumericFilters(Map.of("\\laboratory\\allergen test\\", new Filter.DoubleFilter(0.0, 30.0))); + + Set idList = abstractProcessor.getPatientSubsetForQuery(query); + assertEquals(1, idList.size()); + + + query = new Query(); + query.setRequiredFields(List.of("chr20,1156012,G,A")); + query.setNumericFilters(Map.of("\\laboratory\\allergen test\\", new Filter.DoubleFilter(0.0, 30.0))); + + idList = abstractProcessor.getPatientSubsetForQuery(query); + assertEquals(43, idList.size()); + } + + @Test + public void getPatientSubsetForQuery_verifySeparateQueriesAreEquivalent() { + Query query = new Query(); + query.setNumericFilters(Map.of("\\laboratory\\allergen test\\", new Filter.DoubleFilter(0.0, 30.0))); + + Set numericIdList = abstractProcessor.getPatientSubsetForQuery(query); + + query = new Query(); + query.setRequiredFields(List.of("chr20,1156012,G,A")); + + Set variantIdList = abstractProcessor.getPatientSubsetForQuery(query); + + + query = new Query(); + query.setNumericFilters(Map.of("\\laboratory\\allergen test\\", new Filter.DoubleFilter(0.0, 30.0))); + query.setRequiredFields(List.of("chr20,1156012,G,A")); + + Set bothIdList = abstractProcessor.getPatientSubsetForQuery(query); + + assertEquals(Sets.intersection(numericIdList, variantIdList), bothIdList); + } + + @Test + public void getPatientSubsetForQuery_validMultipleRequiredVariant() { + Query query = new Query(); + query.setRequiredFields(List.of("chr20,1156012,G,A")); + query.setRequiredFields(List.of("chr22,10942689,C,T")); + query.setNumericFilters(Map.of("\\laboratory\\allergen test\\", new Filter.DoubleFilter(0.0, 30.0))); + + Set idList = abstractProcessor.getPatientSubsetForQuery(query); + assertEquals(1, idList.size()); + } + + + @Test + public void getVariantList_validGeneWithVariantQuery() { + Query query = new Query(); + List variantInfoFilters = new ArrayList<>(); + Query.VariantInfoFilter variantInfoFilter = new Query.VariantInfoFilter(); + variantInfoFilter.categoryVariantInfoFilters = Map.of("Gene_with_variant", new String[]{"FRG1FP"}); + variantInfoFilters.add(variantInfoFilter); + query.setVariantInfoFilters(variantInfoFilters); + + Integer variantCount = (Integer) new CountProcessor(abstractProcessor).runVariantCount(query).get("count"); + assertEquals(6, variantCount.intValue()); + } + + @Test + public void getVariantList_validGeneWithMultipleVariantQuery() { + Query query = new Query(); + List variantInfoFilters = new ArrayList<>(); + Query.VariantInfoFilter variantInfoFilter = new Query.VariantInfoFilter(); + variantInfoFilter.categoryVariantInfoFilters = Map.of( + "Gene_with_variant", new String[]{"FRG1FP"} + ); + Query.VariantInfoFilter variantInfoFilter2 = new Query.VariantInfoFilter(); + variantInfoFilter2.categoryVariantInfoFilters = Map.of( + "Gene_with_variant", new String[]{"ACTG1P3"} + ); + variantInfoFilters.add(variantInfoFilter2); + query.setVariantInfoFilters(variantInfoFilters); + + Integer variantCount = (Integer) new CountProcessor(abstractProcessor).runVariantCount(query).get("count"); + assertEquals(49, variantCount.intValue()); + } + + @Test + public void getVariantList_validGeneWithVariantQueryAndNumericQuery() { + Query query = new Query(); + List variantInfoFilters = new ArrayList<>(); + Query.VariantInfoFilter variantInfoFilter = new Query.VariantInfoFilter(); + variantInfoFilter.categoryVariantInfoFilters = Map.of("Gene_with_variant", new String[]{"FRG1FP"}); + variantInfoFilters.add(variantInfoFilter); + query.setVariantInfoFilters(variantInfoFilters); + query.setNumericFilters(Map.of("\\laboratory\\allergen test\\", new Filter.DoubleFilter(0.0, 30.0))); + + + Integer variantCount = (Integer) new CountProcessor(abstractProcessor).runVariantCount(query).get("count"); + assertEquals(5, variantCount.intValue()); + } + + @Test + @Disabled("This functionality not working") + public void getVariantList_validContinuousGenomicFilter() { + Query query = new Query(); + List variantInfoFilters = new ArrayList<>(); + Query.VariantInfoFilter variantInfoFilter = new Query.VariantInfoFilter(); + variantInfoFilter.categoryVariantInfoFilters = Map.of(); + variantInfoFilter.numericVariantInfoFilters = Map.of("Variant_frequency_in_gnomAD", new Filter.FloatFilter(0.0f, 10.0f)); + variantInfoFilters.add(variantInfoFilter); + query.setVariantInfoFilters(variantInfoFilters); + + Set idList = abstractProcessor.getPatientSubsetForQuery(query); + assertEquals(5, idList.size()); + } + + // todo: test variant filters that use the phenotipic query, and edge cases +}