From 7d85f71dcb9aefdedefeff6e37c4a00afccad693 Mon Sep 17 00:00:00 2001 From: Ryan Amari Date: Mon, 10 Jun 2024 15:20:15 -0400 Subject: [PATCH] ALS-6330: Move VariantMetadataIndex to genomic processor. Implement merging of variant metadata indexes --- .../data/genotype/VariantMetadataIndex.java | 46 ++++++++++++++++--- .../genotype/GenomicDatasetMergerRunner.java | 12 +++-- .../hpds/processing/GenomicProcessor.java | 7 ++- .../hpds/processing/GenomicProcessorNoOp.java | 10 ++-- .../processing/GenomicProcessorNodeImpl.java | 12 ++++- .../GenomicProcessorParentImpl.java | 11 +++++ ...omicProcessorPatientMergingParentImpl.java | 11 +++++ .../hpds/processing/VariantListProcessor.java | 10 ++-- .../genomic/GenomicProcessorRestClient.java | 10 ++-- 9 files changed, 99 insertions(+), 30 deletions(-) diff --git a/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/VariantMetadataIndex.java b/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/VariantMetadataIndex.java index 1f6cdf0f..eb4fe092 100644 --- a/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/VariantMetadataIndex.java +++ b/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/VariantMetadataIndex.java @@ -1,11 +1,14 @@ package edu.harvard.hms.dbmi.avillach.hpds.data.genotype; import java.io.*; +import java.nio.file.Files; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; +import com.google.common.base.Joiner; import edu.harvard.hms.dbmi.avillach.hpds.storage.FileBackedJavaIndexedStorage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -21,23 +24,23 @@ * a fast, disk-based backing store. */ public class VariantMetadataIndex implements Serializable { - // todo: make this variable - public static String VARIANT_METADATA_BIN_FILE = "/opt/local/hpds/all/VariantMetadata.javabin"; + public static final String VARIANT_METADATA_FILENAME = "VariantMetadata.javabin"; + public static String VARIANT_METADATA_BIN_FILE = "/opt/local/hpds/all/" + VARIANT_METADATA_FILENAME; private static final long serialVersionUID = 5917054606643971537L; private static Logger log = LoggerFactory.getLogger(VariantMetadataIndex.class); // (String) contig --> (Integer) Bucket --> (String) variant spec --> INFO column data[]. - private Map> > indexMap = new HashMap> >(); + private final Map> > indexMap = new HashMap<>(); - // todo: make this variable - private static String fileStoragePrefix = "/opt/local/hpds/all/VariantMetadataStorage"; + public static final String VARIANT_METADATA_STORAGE_FILE_PREFIX = "VariantMetadataStorage"; + private static String fileStoragePrefix = "/opt/local/hpds/all/" + VARIANT_METADATA_STORAGE_FILE_PREFIX; /** * This map allows us to load millions of variants without re-writing the fbbis each time (which would blow up the disk space). * We need to remember to flush() between each contig this gets saved to the fbbis array. */ - private transient Map> > loadingMap = new HashMap> >(); + private transient Map> > loadingMap = new HashMap<>(); /** * This constructor should only be used for testing; we expect the files to be in the default locations in production @@ -205,4 +208,35 @@ public static VariantMetadataIndex createInstance(String metadataIndexPath) { return null; } } + + public static void merge(VariantMetadataIndex variantMetadataIndex1, VariantMetadataIndex variantMetadataIndex2, String outputDirectory) throws IOException { + VariantMetadataIndex merged = new VariantMetadataIndex(outputDirectory + VARIANT_METADATA_STORAGE_FILE_PREFIX); + if (!variantMetadataIndex1.indexMap.keySet().equals(variantMetadataIndex2.indexMap.keySet())) { + log.warn("Merging incompatible variant indexes. Index1 keys: " + Joiner.on(",").join(variantMetadataIndex1.indexMap.keySet()) + ". Index 2 keys: " + Joiner.on(",").join(variantMetadataIndex2.indexMap.keySet())); + throw new IllegalStateException("Cannot merge variant metadata index with different contig keys"); + } + for (String contig : variantMetadataIndex1.indexMap.keySet()) { + String filePath = outputDirectory + VARIANT_METADATA_STORAGE_FILE_PREFIX + "_" + contig + ".bin"; + FileBackedByteIndexedStorage> mergedFbbis = new FileBackedJavaIndexedStorage(Integer.class, ConcurrentHashMap.class, new File(filePath)); + + FileBackedByteIndexedStorage> fbbis1 = variantMetadataIndex1.indexMap.get(contig); + FileBackedByteIndexedStorage> fbbis2 = variantMetadataIndex2.indexMap.get(contig); + + fbbis1.keys().forEach(key -> { + mergedFbbis.put(key, fbbis1.get(key)); + }); + fbbis2.keys().forEach(key -> { + if (!mergedFbbis.keys().contains(key)) { + mergedFbbis.put(key, fbbis2.get(key)); + } + }); + mergedFbbis.complete(); + merged.indexMap.put(contig, mergedFbbis); + } + + try(ObjectOutputStream out = new ObjectOutputStream(new GZIPOutputStream(Files.newOutputStream(new File(outputDirectory + VARIANT_METADATA_FILENAME).toPath())))){ + out.writeObject(merged); + out.flush(); + } + } } \ No newline at end of file diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/GenomicDatasetMergerRunner.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/GenomicDatasetMergerRunner.java index ba3b2936..bfc38ea6 100644 --- a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/GenomicDatasetMergerRunner.java +++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/GenomicDatasetMergerRunner.java @@ -1,10 +1,7 @@ package edu.harvard.hms.dbmi.avillach.hpds.etl.genotype; import com.google.common.base.Preconditions; -import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.FileBackedByteIndexedInfoStore; -import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.VariableVariantMasks; -import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.VariantMasks; -import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.VariantStore; +import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.*; import edu.harvard.hms.dbmi.avillach.hpds.storage.FileBackedJsonIndexStorage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -24,7 +21,6 @@ public class GenomicDatasetMergerRunner { private static Logger log = LoggerFactory.getLogger(GenomicDatasetMerger.class); public static final String INFO_STORE_JAVABIN_SUFFIX = "infoStore.javabin"; - public static final String VARIANT_SPEC_INDEX_FILENAME = "variantSpecIndex.javabin"; private static String genomicDirectory1; private static String genomicDirectory2; @@ -56,6 +52,12 @@ public static void main(String[] args) throws IOException, ClassNotFoundExceptio variantIndexes.values().forEach(variantIndex -> { variantIndex.write(new File(outputDirectory + variantIndex.column_key + "_" + INFO_STORE_JAVABIN_SUFFIX)); }); + + // todo: merge these + VariantMetadataIndex variantMetadataIndex1 = VariantMetadataIndex.createInstance(genomicDirectory1 + "/VariantMetadata.javabin"); + VariantMetadataIndex variantMetadataIndex2 = VariantMetadataIndex.createInstance(genomicDirectory2 + "/VariantMetadata.javabin"); + + VariantMetadataIndex.merge(variantMetadataIndex1, variantMetadataIndex2, outputDirectory); } private static Map loadInfoStores(String directory) { diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessor.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessor.java index 8759338b..6a90255e 100644 --- a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessor.java +++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessor.java @@ -8,10 +8,7 @@ import reactor.core.publisher.Mono; import java.math.BigInteger; -import java.util.Collection; -import java.util.List; -import java.util.Optional; -import java.util.Set; +import java.util.*; public interface GenomicProcessor { Mono getPatientMask(DistributableQuery distributableQuery); @@ -31,4 +28,6 @@ public interface GenomicProcessor { Set getInfoStoreValues(String conceptPath); List getInfoColumnMeta(); + + Map getVariantMetadata(Collection variantList); } diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorNoOp.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorNoOp.java index 11013ffd..8d7ddcbe 100644 --- a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorNoOp.java +++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorNoOp.java @@ -8,10 +8,7 @@ import reactor.core.publisher.Mono; import java.math.BigInteger; -import java.util.Collection; -import java.util.List; -import java.util.Optional; -import java.util.Set; +import java.util.*; public class GenomicProcessorNoOp implements GenomicProcessor { @Override @@ -58,4 +55,9 @@ public Set getInfoStoreValues(String conceptPath) { public List getInfoColumnMeta() { return null; } + + @Override + public Map getVariantMetadata(Collection variantList) { + return null; + } } diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorNodeImpl.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorNodeImpl.java index c0f65ac3..e957e321 100644 --- a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorNodeImpl.java +++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorNodeImpl.java @@ -2,7 +2,6 @@ import com.google.common.base.Joiner; import com.google.common.collect.Range; -import com.google.common.collect.Sets; import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.*; import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.caching.VariantBucketHolder; import edu.harvard.hms.dbmi.avillach.hpds.data.query.Filter; @@ -13,12 +12,13 @@ import reactor.core.scheduler.Schedulers; import java.io.*; -import java.math.BigInteger; import java.util.*; import java.util.concurrent.ConcurrentSkipListSet; import java.util.stream.Collectors; import java.util.zip.GZIPInputStream; +import static edu.harvard.hms.dbmi.avillach.hpds.data.genotype.VariantMetadataIndex.VARIANT_METADATA_FILENAME; + public class GenomicProcessorNodeImpl implements GenomicProcessor { private static Logger log = LoggerFactory.getLogger(GenomicProcessorNodeImpl.class); @@ -35,6 +35,8 @@ public class GenomicProcessorNodeImpl implements GenomicProcessor { private final VariantService variantService; + private final VariantMetadataIndex variantMetadataIndex; + private final String HOMOZYGOUS_VARIANT = "1/1"; private final String HETEROZYGOUS_VARIANT = "0/1"; @@ -44,6 +46,7 @@ public GenomicProcessorNodeImpl(String genomicDataDirectory) { this.genomicDataDirectory = genomicDataDirectory; this.variantService = new VariantService(genomicDataDirectory); this.patientVariantJoinHandler = new PatientVariantJoinHandler(variantService); + this.variantMetadataIndex = VariantMetadataIndex.createInstance(genomicDataDirectory + VARIANT_METADATA_FILENAME); infoStores = new HashMap<>(); File genomicDataDirectoryFile = new File(this.genomicDataDirectory); @@ -396,4 +399,9 @@ public List getInfoColumnMeta() { ) .collect(Collectors.toList()); } + + @Override + public Map getVariantMetadata(Collection variantList) { + return variantMetadataIndex.findByMultipleVariantSpec(variantList); + } } diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorParentImpl.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorParentImpl.java index ea9ea853..a45f6141 100644 --- a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorParentImpl.java +++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorParentImpl.java @@ -135,6 +135,17 @@ public List getInfoColumnMeta() { return infoColumnsMeta; } + @Override + public Map getVariantMetadata(Collection variantList) { + return nodes.parallelStream() + .map(node -> node.getVariantMetadata(variantList)) + .reduce((p1, p2) -> { + Map mapCopy = new HashMap<>(p1); + mapCopy.putAll(p2); + return mapCopy; + }).orElseGet(Map::of); + } + private List initInfoColumnsMeta() { return nodes.parallelStream() .map(GenomicProcessor::getInfoColumnMeta) diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorPatientMergingParentImpl.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorPatientMergingParentImpl.java index 4d3b8a25..c3db2b11 100644 --- a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorPatientMergingParentImpl.java +++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorPatientMergingParentImpl.java @@ -155,6 +155,17 @@ public List getInfoColumnMeta() { return infoColumnsMeta; } + @Override + public Map getVariantMetadata(Collection variantList) { + return nodes.parallelStream() + .map(node -> node.getVariantMetadata(variantList)) + .reduce((p1, p2) -> { + Map mapCopy = new HashMap<>(p1); + mapCopy.putAll(p2); + return mapCopy; + }).orElseGet(Map::of); + } + private List initInfoColumnsMeta() { return nodes.parallelStream() .map(GenomicProcessor::getInfoColumnMeta) diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/VariantListProcessor.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/VariantListProcessor.java index 02d99089..2868c833 100644 --- a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/VariantListProcessor.java +++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/VariantListProcessor.java @@ -21,7 +21,7 @@ @Component public class VariantListProcessor implements HpdsProcessor { - private final VariantMetadataIndex metadataIndex; + private final GenomicProcessor genomicProcessor; private static Logger log = LoggerFactory.getLogger(VariantListProcessor.class); @@ -36,9 +36,9 @@ public class VariantListProcessor implements HpdsProcessor { @Autowired - public VariantListProcessor(AbstractProcessor abstractProcessor, @Value("${VCF_EXCERPT_ENABLED:false}") boolean vcfExcerptEnabled ) { + public VariantListProcessor(AbstractProcessor abstractProcessor, GenomicProcessor genomicProcessor, @Value("${VCF_EXCERPT_ENABLED:false}") boolean vcfExcerptEnabled ) { this.abstractProcessor = abstractProcessor; - this.metadataIndex = VariantMetadataIndex.createInstance(VariantMetadataIndex.VARIANT_METADATA_BIN_FILE); + this.genomicProcessor = genomicProcessor; VCF_EXCERPT_ENABLED = vcfExcerptEnabled; //always enable aggregate queries if full queries are permitted. @@ -52,7 +52,7 @@ public VariantListProcessor(AbstractProcessor abstractProcessor, @Value("${VCF_E public VariantListProcessor(boolean isOnlyForTests, AbstractProcessor abstractProcessor) { this.abstractProcessor = abstractProcessor; - this.metadataIndex = null; + this.genomicProcessor = null; VCF_EXCERPT_ENABLED = "TRUE".equalsIgnoreCase(System.getProperty("VCF_EXCERPT_ENABLED", "FALSE")); //always enable aggregate queries if full queries are permitted. @@ -141,7 +141,7 @@ public String runVcfExcerptQuery(Query query, boolean includePatientData) throws log.debug("variantList Size " + variantList.size()); - Map metadata = (metadataIndex == null ? null : metadataIndex.findByMultipleVariantSpec(variantList)); + Map metadata = genomicProcessor.getVariantMetadata(variantList); log.debug("metadata size " + metadata.size()); diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/genomic/GenomicProcessorRestClient.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/genomic/GenomicProcessorRestClient.java index de12dea0..fbeecbde 100644 --- a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/genomic/GenomicProcessorRestClient.java +++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/genomic/GenomicProcessorRestClient.java @@ -14,10 +14,7 @@ import reactor.core.publisher.Mono; import java.math.BigInteger; -import java.util.Collection; -import java.util.List; -import java.util.Optional; -import java.util.Set; +import java.util.*; public class GenomicProcessorRestClient implements GenomicProcessor { @@ -117,4 +114,9 @@ public List getInfoColumnMeta() { .block(); return result; } + + @Override + public Map getVariantMetadata(Collection variantList) { + throw new RuntimeException("Not implemented yet"); + } }