Skip to content

Commit

Permalink
ALS-6330: Move VariantMetadataIndex to genomic processor. Implement merging of variant metadata indexes
Browse files Browse the repository at this point in the history
  • Loading branch information
ramari16 committed Jun 10, 2024
1 parent a70da0d commit 7d85f71
Show file tree
Hide file tree
Showing 9 changed files with 99 additions and 30 deletions.
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package edu.harvard.hms.dbmi.avillach.hpds.data.genotype;

import java.io.*;
import java.nio.file.Files;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

import com.google.common.base.Joiner;
import edu.harvard.hms.dbmi.avillach.hpds.storage.FileBackedJavaIndexedStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -21,23 +24,23 @@
* a fast, disk-based backing store.
*/
public class VariantMetadataIndex implements Serializable {
// todo: make this variable
public static String VARIANT_METADATA_BIN_FILE = "/opt/local/hpds/all/VariantMetadata.javabin";
public static final String VARIANT_METADATA_FILENAME = "VariantMetadata.javabin";
public static String VARIANT_METADATA_BIN_FILE = "/opt/local/hpds/all/" + VARIANT_METADATA_FILENAME;

private static final long serialVersionUID = 5917054606643971537L;
private static Logger log = LoggerFactory.getLogger(VariantMetadataIndex.class);

// (String) contig --> (Integer) Bucket --> (String) variant spec --> INFO column data[].
private Map<String, FileBackedByteIndexedStorage<Integer, ConcurrentHashMap<String, String[]>> > indexMap = new HashMap<String, FileBackedByteIndexedStorage<Integer, ConcurrentHashMap<String, String[]>> >();
private final Map<String, FileBackedByteIndexedStorage<Integer, ConcurrentHashMap<String, String[]>> > indexMap = new HashMap<>();

// todo: make this variable
private static String fileStoragePrefix = "/opt/local/hpds/all/VariantMetadataStorage";
public static final String VARIANT_METADATA_STORAGE_FILE_PREFIX = "VariantMetadataStorage";
private static String fileStoragePrefix = "/opt/local/hpds/all/" + VARIANT_METADATA_STORAGE_FILE_PREFIX;

/**
* This map allows us to load millions of variants without re-writing the fbbis each time (which would blow up the disk space).
* We need to remember to flush() between each contig this gets saved to the fbbis array.
*/
private transient Map<String, ConcurrentHashMap<Integer, ConcurrentHashMap<String, String[]>> > loadingMap = new HashMap<String, ConcurrentHashMap<Integer, ConcurrentHashMap<String, String[]>> >();
private transient Map<String, ConcurrentHashMap<Integer, ConcurrentHashMap<String, String[]>> > loadingMap = new HashMap<>();

/**
* This constructor should only be used for testing; we expect the files to be in the default locations in production
Expand Down Expand Up @@ -205,4 +208,35 @@ public static VariantMetadataIndex createInstance(String metadataIndexPath) {
return null;
}
}

/**
 * Merges two variant metadata indexes into a new index whose file-backed storage lives in
 * {@code outputDirectory}, then serializes the merged index (gzipped Java serialization) to
 * {@code outputDirectory + VARIANT_METADATA_FILENAME}.
 *
 * Both indexes must cover exactly the same set of contigs.
 *
 * @param variantMetadataIndex1 first index; its metadata wins when the same variant spec
 *                              appears in both indexes
 * @param variantMetadataIndex2 second index
 * @param outputDirectory directory (including trailing separator) for the merged files
 * @throws IOException if the merged storage or index file cannot be written
 * @throws IllegalStateException if the two indexes have different contig key sets
 */
public static void merge(VariantMetadataIndex variantMetadataIndex1, VariantMetadataIndex variantMetadataIndex2, String outputDirectory) throws IOException {
    VariantMetadataIndex merged = new VariantMetadataIndex(outputDirectory + VARIANT_METADATA_STORAGE_FILE_PREFIX);
    if (!variantMetadataIndex1.indexMap.keySet().equals(variantMetadataIndex2.indexMap.keySet())) {
        log.warn("Merging incompatible variant indexes. Index1 keys: " + Joiner.on(",").join(variantMetadataIndex1.indexMap.keySet()) + ". Index 2 keys: " + Joiner.on(",").join(variantMetadataIndex2.indexMap.keySet()));
        throw new IllegalStateException("Cannot merge variant metadata index with different contig keys");
    }
    for (String contig : variantMetadataIndex1.indexMap.keySet()) {
        String filePath = outputDirectory + VARIANT_METADATA_STORAGE_FILE_PREFIX + "_" + contig + ".bin";
        FileBackedByteIndexedStorage<Integer, ConcurrentHashMap<String, String[]>> mergedFbbis = new FileBackedJavaIndexedStorage(Integer.class, ConcurrentHashMap.class, new File(filePath));

        FileBackedByteIndexedStorage<Integer, ConcurrentHashMap<String, String[]>> fbbis1 = variantMetadataIndex1.indexMap.get(contig);
        FileBackedByteIndexedStorage<Integer, ConcurrentHashMap<String, String[]>> fbbis2 = variantMetadataIndex2.indexMap.get(contig);

        // Buckets present in both indexes must have their contents combined. The previous
        // implementation copied index1's bucket and skipped index2's bucket whenever the
        // bucket key already existed, silently dropping every variant unique to index2 in
        // that bucket. Each bucket is written exactly once (the storage is write-once per key).
        fbbis1.keys().forEach(key -> {
            ConcurrentHashMap<String, String[]> bucket = fbbis1.get(key);
            if (fbbis2.keys().contains(key)) {
                ConcurrentHashMap<String, String[]> combined = new ConcurrentHashMap<>(fbbis2.get(key));
                combined.putAll(bucket); // index1's metadata wins on shared variant specs
                bucket = combined;
            }
            mergedFbbis.put(key, bucket);
        });
        fbbis2.keys().forEach(key -> {
            if (!fbbis1.keys().contains(key)) {
                mergedFbbis.put(key, fbbis2.get(key));
            }
        });
        mergedFbbis.complete();
        merged.indexMap.put(contig, mergedFbbis);
    }

    try (ObjectOutputStream out = new ObjectOutputStream(new GZIPOutputStream(Files.newOutputStream(new File(outputDirectory + VARIANT_METADATA_FILENAME).toPath())))) {
        out.writeObject(merged);
        out.flush();
    }
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
package edu.harvard.hms.dbmi.avillach.hpds.etl.genotype;

import com.google.common.base.Preconditions;
import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.FileBackedByteIndexedInfoStore;
import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.VariableVariantMasks;
import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.VariantMasks;
import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.VariantStore;
import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.*;
import edu.harvard.hms.dbmi.avillach.hpds.storage.FileBackedJsonIndexStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -24,7 +21,6 @@ public class GenomicDatasetMergerRunner {
private static Logger log = LoggerFactory.getLogger(GenomicDatasetMerger.class);

public static final String INFO_STORE_JAVABIN_SUFFIX = "infoStore.javabin";
public static final String VARIANT_SPEC_INDEX_FILENAME = "variantSpecIndex.javabin";

private static String genomicDirectory1;
private static String genomicDirectory2;
Expand Down Expand Up @@ -56,6 +52,12 @@ public static void main(String[] args) throws IOException, ClassNotFoundExceptio
variantIndexes.values().forEach(variantIndex -> {
variantIndex.write(new File(outputDirectory + variantIndex.column_key + "_" + INFO_STORE_JAVABIN_SUFFIX));
});

// todo: merge these
VariantMetadataIndex variantMetadataIndex1 = VariantMetadataIndex.createInstance(genomicDirectory1 + "/VariantMetadata.javabin");
VariantMetadataIndex variantMetadataIndex2 = VariantMetadataIndex.createInstance(genomicDirectory2 + "/VariantMetadata.javabin");

VariantMetadataIndex.merge(variantMetadataIndex1, variantMetadataIndex2, outputDirectory);
}

private static Map<String, FileBackedByteIndexedInfoStore> loadInfoStores(String directory) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,7 @@
import reactor.core.publisher.Mono;

import java.math.BigInteger;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.*;

public interface GenomicProcessor {
Mono<VariantMask> getPatientMask(DistributableQuery distributableQuery);
Expand All @@ -31,4 +28,6 @@ public interface GenomicProcessor {
Set<String> getInfoStoreValues(String conceptPath);

List<InfoColumnMeta> getInfoColumnMeta();

Map<String, String[]> getVariantMetadata(Collection<String> variantList);
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,7 @@
import reactor.core.publisher.Mono;

import java.math.BigInteger;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.*;

public class GenomicProcessorNoOp implements GenomicProcessor {
@Override
Expand Down Expand Up @@ -58,4 +55,9 @@ public Set<String> getInfoStoreValues(String conceptPath) {
public List<InfoColumnMeta> getInfoColumnMeta() {
return null;
}

@Override
public Map<String, String[]> getVariantMetadata(Collection<String> variantList) {
    // No-op implementation: there is no metadata to return. Return an empty map rather
    // than null — callers (e.g. VariantListProcessor.runVcfExcerptQuery) use the result
    // without a null check, so null here would cause an NPE.
    return Map.of();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import com.google.common.base.Joiner;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.*;
import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.caching.VariantBucketHolder;
import edu.harvard.hms.dbmi.avillach.hpds.data.query.Filter;
Expand All @@ -13,12 +12,13 @@
import reactor.core.scheduler.Schedulers;

import java.io.*;
import java.math.BigInteger;
import java.util.*;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;

import static edu.harvard.hms.dbmi.avillach.hpds.data.genotype.VariantMetadataIndex.VARIANT_METADATA_FILENAME;

public class GenomicProcessorNodeImpl implements GenomicProcessor {

private static Logger log = LoggerFactory.getLogger(GenomicProcessorNodeImpl.class);
Expand All @@ -35,6 +35,8 @@ public class GenomicProcessorNodeImpl implements GenomicProcessor {

private final VariantService variantService;

private final VariantMetadataIndex variantMetadataIndex;


private final String HOMOZYGOUS_VARIANT = "1/1";
private final String HETEROZYGOUS_VARIANT = "0/1";
Expand All @@ -44,6 +46,7 @@ public GenomicProcessorNodeImpl(String genomicDataDirectory) {
this.genomicDataDirectory = genomicDataDirectory;
this.variantService = new VariantService(genomicDataDirectory);
this.patientVariantJoinHandler = new PatientVariantJoinHandler(variantService);
this.variantMetadataIndex = VariantMetadataIndex.createInstance(genomicDataDirectory + VARIANT_METADATA_FILENAME);

infoStores = new HashMap<>();
File genomicDataDirectoryFile = new File(this.genomicDataDirectory);
Expand Down Expand Up @@ -396,4 +399,9 @@ public List<InfoColumnMeta> getInfoColumnMeta() {
)
.collect(Collectors.toList());
}

@Override
public Map<String, String[]> getVariantMetadata(Collection<String> variantList) {
    // VariantMetadataIndex.createInstance() returns null when the index cannot be loaded,
    // so variantMetadataIndex may be null. Guard to degrade gracefully instead of throwing
    // an NPE for deployments without a variant metadata index.
    if (variantMetadataIndex == null) {
        return Map.of();
    }
    return variantMetadataIndex.findByMultipleVariantSpec(variantList);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,17 @@ public List<InfoColumnMeta> getInfoColumnMeta() {
return infoColumnsMeta;
}

@Override
public Map<String, String[]> getVariantMetadata(Collection<String> variantList) {
    // Fan the lookup out to all nodes in parallel and merge the per-node maps.
    // Mutable collect() instead of reduce(): reduce copied the accumulated map on every
    // combine step. Duplicate-key semantics are preserved (later nodes in encounter
    // order overwrite earlier ones, matching the previous putAll-based merge).
    return nodes.parallelStream()
            .map(node -> node.getVariantMetadata(variantList))
            .collect(HashMap::new, Map::putAll, Map::putAll);
}

private List<InfoColumnMeta> initInfoColumnsMeta() {
return nodes.parallelStream()
.map(GenomicProcessor::getInfoColumnMeta)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,17 @@ public List<InfoColumnMeta> getInfoColumnMeta() {
return infoColumnsMeta;
}

@Override
public Map<String, String[]> getVariantMetadata(Collection<String> variantList) {
    // Fan the lookup out to all nodes in parallel and merge the per-node maps.
    // Mutable collect() instead of reduce(): reduce copied the accumulated map on every
    // combine step. Duplicate-key semantics are preserved (later nodes in encounter
    // order overwrite earlier ones, matching the previous putAll-based merge).
    return nodes.parallelStream()
            .map(node -> node.getVariantMetadata(variantList))
            .collect(HashMap::new, Map::putAll, Map::putAll);
}

private List<InfoColumnMeta> initInfoColumnsMeta() {
return nodes.parallelStream()
.map(GenomicProcessor::getInfoColumnMeta)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
@Component
public class VariantListProcessor implements HpdsProcessor {

private final VariantMetadataIndex metadataIndex;
private final GenomicProcessor genomicProcessor;

private static Logger log = LoggerFactory.getLogger(VariantListProcessor.class);

Expand All @@ -36,9 +36,9 @@ public class VariantListProcessor implements HpdsProcessor {


@Autowired
public VariantListProcessor(AbstractProcessor abstractProcessor, @Value("${VCF_EXCERPT_ENABLED:false}") boolean vcfExcerptEnabled ) {
public VariantListProcessor(AbstractProcessor abstractProcessor, GenomicProcessor genomicProcessor, @Value("${VCF_EXCERPT_ENABLED:false}") boolean vcfExcerptEnabled ) {
this.abstractProcessor = abstractProcessor;
this.metadataIndex = VariantMetadataIndex.createInstance(VariantMetadataIndex.VARIANT_METADATA_BIN_FILE);
this.genomicProcessor = genomicProcessor;

VCF_EXCERPT_ENABLED = vcfExcerptEnabled;
//always enable aggregate queries if full queries are permitted.
Expand All @@ -52,7 +52,7 @@ public VariantListProcessor(AbstractProcessor abstractProcessor, @Value("${VCF_E

public VariantListProcessor(boolean isOnlyForTests, AbstractProcessor abstractProcessor) {
this.abstractProcessor = abstractProcessor;
this.metadataIndex = null;
this.genomicProcessor = null;

VCF_EXCERPT_ENABLED = "TRUE".equalsIgnoreCase(System.getProperty("VCF_EXCERPT_ENABLED", "FALSE"));
//always enable aggregate queries if full queries are permitted.
Expand Down Expand Up @@ -141,7 +141,7 @@ public String runVcfExcerptQuery(Query query, boolean includePatientData) throws

log.debug("variantList Size " + variantList.size());

Map<String, String[]> metadata = (metadataIndex == null ? null : metadataIndex.findByMultipleVariantSpec(variantList));
Map<String, String[]> metadata = genomicProcessor.getVariantMetadata(variantList);

log.debug("metadata size " + metadata.size());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@
import reactor.core.publisher.Mono;

import java.math.BigInteger;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.*;

public class GenomicProcessorRestClient implements GenomicProcessor {

Expand Down Expand Up @@ -117,4 +114,9 @@ public List<InfoColumnMeta> getInfoColumnMeta() {
.block();
return result;
}

@Override
public Map<String, String[]> getVariantMetadata(Collection<String> variantList) {
    // Standard exception type for an unimplemented operation. UnsupportedOperationException
    // is a RuntimeException subclass, so existing callers catching RuntimeException still work.
    throw new UnsupportedOperationException("Not implemented yet");
}
}

0 comments on commit 7d85f71

Please sign in to comment.