Skip to content

Commit

Permalink
ALS-4978: Changes to optionally support remote genomic processing
Browse files Browse the repository at this point in the history
  • Loading branch information
ramari16 committed Nov 16, 2023
1 parent dc95c51 commit 8d0c3a7
Show file tree
Hide file tree
Showing 10 changed files with 393 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.zip.GZIPInputStream;

import com.google.common.util.concurrent.UncheckedExecutionException;
import org.slf4j.Logger;
Expand All @@ -24,6 +23,7 @@
import edu.harvard.hms.dbmi.avillach.hpds.data.query.Query;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;


@Component
Expand All @@ -39,33 +39,12 @@ public class AbstractProcessor {
private final String genomicDataDirectory;



private List<String> infoStoreColumns;

private Map<String, FileBackedByteIndexedInfoStore> infoStores;

private LoadingCache<String, PhenoCube<?>> store;

private final PhenotypeMetaStore phenotypeMetaStore;

private final GenomicProcessor genomicProcessor;

private final LoadingCache<String, List<String>> infoStoreValuesCache = CacheBuilder.newBuilder().build(new CacheLoader<>() {
@Override
public List<String> load(String conceptPath) {
FileBackedByteIndexedInfoStore store = getInfoStore(conceptPath);
if (store == null) {
throw new IllegalArgumentException("Concept path: " + conceptPath + " not found");
} else if (store.isContinuous) {
throw new IllegalArgumentException("Concept path: " + conceptPath + " is not categorical");
}
return store.getAllValues().keys()
.stream()
.sorted(String::compareToIgnoreCase)
.collect(Collectors.toList());
}
});

@Autowired
public AbstractProcessor(
PhenotypeMetaStore phenotypeMetaStore,
Expand Down Expand Up @@ -105,38 +84,13 @@ public AbstractProcessor(
}

}
infoStores = new HashMap<>();
File genomicDataDirectory = new File(this.genomicDataDirectory);
if(genomicDataDirectory.exists() && genomicDataDirectory.isDirectory()) {
Arrays.stream(genomicDataDirectory.list((file, filename)->{return filename.endsWith("infoStore.javabin");}))
.forEach((String filename)->{
try (
FileInputStream fis = new FileInputStream(this.genomicDataDirectory + filename);
GZIPInputStream gis = new GZIPInputStream(fis);
ObjectInputStream ois = new ObjectInputStream(gis)
){
log.info("loading " + filename);
FileBackedByteIndexedInfoStore infoStore = (FileBackedByteIndexedInfoStore) ois.readObject();
infoStore.updateStorageDirectory(genomicDataDirectory);
infoStores.put(filename.replace("_infoStore.javabin", ""), infoStore);
ois.close();
} catch (IOException e) {
throw new UncheckedIOException(e);
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
});
}
infoStoreColumns = new ArrayList<>(infoStores.keySet());
}

public AbstractProcessor(PhenotypeMetaStore phenotypeMetaStore, LoadingCache<String, PhenoCube<?>> store,
Map<String, FileBackedByteIndexedInfoStore> infoStores, List<String> infoStoreColumns,
GenomicProcessor genomicProcessor) {
this.phenotypeMetaStore = phenotypeMetaStore;
this.store = store;
this.infoStores = infoStores;
this.infoStoreColumns = infoStoreColumns;
this.genomicProcessor = genomicProcessor;

CACHE_SIZE = Integer.parseInt(System.getProperty("CACHE_SIZE", "100"));
Expand All @@ -148,7 +102,7 @@ public AbstractProcessor(PhenotypeMetaStore phenotypeMetaStore, LoadingCache<Str
}

/**
 * Returns the names of the variant-annotation (info store) columns.
 * Delegates to the configured {@link GenomicProcessor}, which owns the
 * genomic metadata after the remote-processing refactor.
 *
 * @return list of info store column names
 */
public List<String> getInfoStoreColumns() {
    return genomicProcessor.getInfoStoreColumns();
}


Expand Down Expand Up @@ -180,8 +134,8 @@ protected Set<Integer> idSetsForEachFilter(Query query) {
DistributableQuery distributableQuery = getDistributableQuery(query);

if (distributableQuery.hasFilters()) {
BigInteger patientMaskForVariantInfoFilters = genomicProcessor.getPatientMask(distributableQuery);
return genomicProcessor.patientMaskToPatientIdSet(patientMaskForVariantInfoFilters);
Mono<BigInteger> patientMaskForVariantInfoFilters = genomicProcessor.getPatientMask(distributableQuery);
return patientMaskForVariantInfoFilters.map(genomicProcessor::patientMaskToPatientIdSet).block();
}

return distributableQuery.getPatientIds();
Expand Down Expand Up @@ -210,7 +164,7 @@ private DistributableQuery getDistributableQuery(Query query) {
} else {
// if there are no patient filters, use all patients.
// todo: we should not have to send these
phenotypicPatientSet = Arrays.stream(genomicProcessor.getPatientIds())
phenotypicPatientSet = genomicProcessor.getPatientIds().stream()
.map(String::trim)
.map(Integer::parseInt)
.collect(Collectors.toSet());
Expand Down Expand Up @@ -306,16 +260,18 @@ private List<Set<Integer>> getIdSetsForCategoryFilters(Query query, Distributabl

/**
 * Resolves the list of variant specs matching the query's variant filters.
 * Builds the distributable form of the query, delegates to the genomic
 * processor, and blocks on the reactive result.
 *
 * @param query the incoming query
 * @return collection of matching variant specs (empty when no variant filters apply)
 * @throws IOException if building the distributable query fails
 */
protected Collection<String> getVariantList(Query query) throws IOException {
    DistributableQuery distributableQuery = getDistributableQuery(query);
    // block(): callers of this method are synchronous; the reactive type only
    // exists so remote processor nodes can be queried concurrently upstream.
    return genomicProcessor.getVariantList(distributableQuery).block();
}

/**
 * Looks up the raw info store for the given annotation column.
 *
 * @param column the info store (annotation) column name
 * @return never returns normally
 * @throws UnsupportedOperationException always — direct access to the
 *         FileBackedByteIndexedInfoStore is no longer supported now that
 *         genomic data may live on a remote processor node
 */
public FileBackedByteIndexedInfoStore getInfoStore(String column) {
    // todo: figure out how we want to expose inner workings of the FBBIIS
    throw new UnsupportedOperationException("Not yet implemented");
}

public List<String> searchInfoConceptValues(String conceptPath, String query) {
try {
return infoStoreValuesCache.getUnchecked(conceptPath).stream()
return genomicProcessor.getInfoStoreValues(conceptPath).stream()
.filter(variableValue -> variableValue.toUpperCase().contains(query.toUpperCase()))
.collect(Collectors.toList());
} catch (UncheckedExecutionException e) {
Expand Down Expand Up @@ -406,7 +362,7 @@ public TreeMap<String, ColumnMeta> getDictionary() {
return phenotypeMetaStore.getMetaStore();
}

public String[] getPatientIds() {
public List<String> getPatientIds() {
return genomicProcessor.getPatientIds();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,28 @@

import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.VariantMasks;
import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.caching.VariantBucketHolder;
import reactor.core.publisher.Mono;

import java.math.BigInteger;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Set;

/**
 * Abstraction over genomic (variant) data processing. Implementations may run
 * against a local data directory or fan out to remote processor nodes, which
 * is why the potentially expensive operations return reactive {@link Mono}s.
 */
public interface GenomicProcessor {

    /**
     * Computes the patient bitmask selected by the query's variant filters.
     *
     * @param distributableQuery the query in distributable form
     * @return a Mono emitting the patient mask
     */
    Mono<BigInteger> getPatientMask(DistributableQuery distributableQuery);

    /** Converts a patient bitmask into the corresponding set of patient IDs. */
    Set<Integer> patientMaskToPatientIdSet(BigInteger patientMask);

    /** Builds a patient bitmask covering exactly the given patient subset. */
    BigInteger createMaskForPatientSet(Set<Integer> patientSubset);

    /**
     * Resolves the variant specs matching the query's variant filters.
     *
     * @param distributableQuery the query in distributable form
     * @return a Mono emitting the matching variant specs
     */
    Mono<Collection<String>> getVariantList(DistributableQuery distributableQuery);

    /** Returns all patient IDs known to this processor. */
    List<String> getPatientIds();

    /**
     * Looks up the variant masks for a variant spec, using the supplied bucket
     * holder as a lookup cache across calls.
     */
    Optional<VariantMasks> getMasks(String path, VariantBucketHolder<VariantMasks> variantMasksVariantBucketHolder);

    /** Returns the names of the variant-annotation (info store) columns. */
    List<String> getInfoStoreColumns();

    /**
     * Returns the sorted distinct values of a categorical annotation column.
     *
     * @param conceptPath the annotation column name
     */
    List<String> getInfoStoreValues(String conceptPath);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package edu.harvard.hms.dbmi.avillach.hpds.processing;

import edu.harvard.hms.dbmi.avillach.hpds.processing.genomic.GenomicProcessorRestClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.PropertySource;

import java.util.ArrayList;
import java.util.List;

@SpringBootApplication
@PropertySource("classpath:application.properties")
public class GenomicProcessorConfig {

    // Directory containing the genomic data files; defaults to the standard
    // HPDS layout when the environment variable is not set.
    @Value("${HPDS_GENOMIC_DATA_DIRECTORY:/opt/local/hpds/all/}")
    private String hpdsGenomicDataDirectory;

    /**
     * In-process genomic processor reading directly from the local data
     * directory. Selected when {@code hpds.genomicProcessor.impl=local}.
     */
    @Bean(name = "localGenomicProcessor")
    @ConditionalOnProperty(prefix = "hpds.genomicProcessor", name = "impl", havingValue = "local")
    public GenomicProcessor localGenomicProcessor() {
        return new GenomicProcessorNodeImpl(hpdsGenomicDataDirectory);
    }

    /**
     * Fan-out genomic processor delegating to remote processor nodes over
     * REST. Selected when {@code hpds.genomicProcessor.impl=remote}.
     */
    @Bean(name = "remoteGenomicProcessor")
    @ConditionalOnProperty(prefix = "hpds.genomicProcessor", name = "impl", havingValue = "remote")
    public GenomicProcessor remoteGenomicProcessor() {
        // TODO: node hosts are hard-coded for testing; move to a configuration
        // file or environment property before production use.
        List<GenomicProcessor> nodes = List.of(
                new GenomicProcessorRestClient("http://localhost:8090/"),
                new GenomicProcessorRestClient("http://localhost:8091/")
        );
        return new GenomicProcessorParentImpl(nodes);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import edu.harvard.hms.dbmi.avillach.hpds.data.query.Query;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

import java.io.*;
import java.math.BigInteger;
Expand Down Expand Up @@ -73,7 +74,7 @@ public GenomicProcessorNodeImpl(String genomicDataDirectory) {
}

@Override
public BigInteger getPatientMask(DistributableQuery distributableQuery) {
public Mono<BigInteger> getPatientMask(DistributableQuery distributableQuery) {
// log.debug("filterdIDSets START size: " + filteredIdSets.size());
/* VARIANT INFO FILTER HANDLING IS MESSY */
if(distributableQuery.hasFilters()) {
Expand Down Expand Up @@ -125,9 +126,9 @@ public BigInteger getPatientMask(DistributableQuery distributableQuery) {
}
}

return patientMask;
return Mono.just(patientMask);
}
return createMaskForPatientSet(distributableQuery.getPatientIds());
return Mono.fromCallable(() -> createMaskForPatientSet(distributableQuery.getPatientIds()));
/* END OF VARIANT INFO FILTER HANDLING */
}

Expand Down Expand Up @@ -224,7 +225,7 @@ private VariantIndex addVariantsForInfoFilter(VariantIndex unionOfInfoFilters, Q
}

@Override
public Collection<String> getVariantList(DistributableQuery query) {
public Mono<Collection<String>> getVariantList(DistributableQuery query) {
boolean queryContainsVariantInfoFilters = query.getVariantInfoFilters().stream().anyMatch(variantInfoFilter ->
!variantInfoFilter.categoryVariantInfoFilters.isEmpty() || !variantInfoFilter.numericVariantInfoFilters.isEmpty()
);
Expand Down Expand Up @@ -253,7 +254,7 @@ public Collection<String> getVariantList(DistributableQuery query) {
// If we have all patients then no variants would be filtered, so no need to do further processing
if(patientSubset.size()==variantService.getPatientIds().length) {
log.info("query selects all patient IDs, returning....");
return unionOfInfoFilters.mapToVariantSpec(variantService.getVariantIndex());
return Mono.just(unionOfInfoFilters.mapToVariantSpec(variantService.getVariantIndex()));
}

BigInteger patientMasks = createMaskForPatientSet(patientSubset);
Expand All @@ -276,12 +277,12 @@ public Collection<String> getVariantList(DistributableQuery query) {
}
});
});
return variantsWithPatients;
return Mono.just(variantsWithPatients);
}else {
return unionOfInfoFiltersVariantSpecs;
return Mono.just(unionOfInfoFiltersVariantSpecs);
}
}
return new ArrayList<>();
return Mono.just(new ArrayList<>());
}

private BigInteger getIdSetForVariantSpecCategoryFilter(String[] zygosities, String key, VariantBucketHolder<VariantMasks> bucketCache) {
Expand Down Expand Up @@ -348,13 +349,25 @@ private BigInteger calculateIndiscriminateBitmask(VariantMasks masks) {
}

@Override
public String[] getPatientIds() {
return variantService.getPatientIds();
public List<String> getPatientIds() {
return List.of(variantService.getPatientIds());
}

@Override
public Optional<VariantMasks> getMasks(String path, VariantBucketHolder<VariantMasks> bucketHolder) {
    // Pure delegation: the variant service owns mask lookup; the bucket
    // holder acts as a caller-supplied cache across repeated lookups.
    return variantService.getMasks(path, bucketHolder);
}

@Override
public List<String> getInfoStoreColumns() {
    // Returns the annotation column names loaded at startup.
    // NOTE(review): this exposes the internal list directly; if it is mutable,
    // callers could modify it — consider List.copyOf. TODO confirm callers.
    return infoStoreColumns;
}

/**
 * Returns the distinct values of a categorical annotation column, sorted
 * case-insensitively. Restores the validation the pre-refactor cache loader
 * performed: unknown and continuous concept paths are rejected explicitly
 * instead of surfacing as a NullPointerException.
 *
 * @param conceptPath the annotation column name
 * @return sorted list of the column's categorical values
 * @throws IllegalArgumentException if the concept path is unknown or not categorical
 */
@Override
public List<String> getInfoStoreValues(String conceptPath) {
    FileBackedByteIndexedInfoStore store = infoStores.get(conceptPath);
    if (store == null) {
        throw new IllegalArgumentException("Concept path: " + conceptPath + " not found");
    } else if (store.isContinuous) {
        throw new IllegalArgumentException("Concept path: " + conceptPath + " is not categorical");
    }
    return store.getAllValues().keys()
            .stream()
            .sorted(String::compareToIgnoreCase)
            .collect(Collectors.toList());
}
}
Loading

0 comments on commit 8d0c3a7

Please sign in to comment.