From a67d8fe3479157283061bd47eb26f8378cf10515 Mon Sep 17 00:00:00 2001
From: ramari16
Date: Wed, 23 Aug 2023 10:21:27 -0400
Subject: [PATCH] [ALS-4461] Allow incremental vcf loading (#73)

* ALS-4461: Deserialize variant index from disk
* ALS-4461: Add variant index builder for VCF loading
* ALS-4461: Upgrade major version
* ALS-4461: Store variants by index instead of full variant spec. Refactoring to support incremental vcf loading
* ALS-4461: Initial commit for genomic dataset merger
* ALS-4461: Add jar with dependencies build instructions
* ALS-4461: Fix issue with hardcoded directory
* ALS-4461: Fix more issues with non-relative file paths, various refactoring
* ALS-4461: Parallelize chromosome mask merging
* ALS-4461: Updated hpds version in dockerfile
* ALS-4461: Update genomic directory on loading for variant index stores
* ALS-4461: Change type of variant index store from String (variant spec) to Integer (variant id)
* ALS-4461: Refactor duplicated variant store read/write code
* ALS-4461: Fixing thread issues at startup
* ALS-4461: Fix error handling
* ALS-4461: Clean up error handling in file backed storages
* ALS-4461: Remove IOExceptions thrown from FBBIS
* ALS-4461: Fix deserialization issue
* ALS-4461: Add comment explaining chromosome index merging
* ALS-4461: Add comments
* ALS-4461: Changes per PR
* ALS-4461: Refactor variant spec index to make testing easier
* ALS-4461: Refactor genomic dataset merger to support testing
* ALS-4461: Add validation to prevent patient id duplicates
* ALS-4461: Add main args validation
* ALS-4461: Remove unused code
* ALS-4461: Remove potential race condition
---
 client-api/pom.xml                            |   4 +-
 common/pom.xml                                |  10 +-
 .../storage/FileBackedByteIndexedStorage.java | 128 ++++----
 .../storage/FileBackedJavaIndexedStorage.java |  32 ++
 .../storage/FileBackedJsonIndexStorage.java   |  40 +++
 data/pom.xml                                  |   2 +-
 .../data/genotype/BucketIndexBySample.java    |  87 ++----
 .../hpds/data/genotype/CompressedIndex.java   |   2 +-
 .../FileBackedByteIndexedInfoStore.java       |  60 ++--
 .../hpds/data/genotype/InfoStore.java         |  25 +-
 .../hpds/data/genotype/VariantMasks.java      |  15 +
 .../data/genotype/VariantMetadataIndex.java   |   9 +-
 .../hpds/data/genotype/VariantStore.java      | 131 ++++----
 .../FileBackedStorageVariantIndexImpl.java    |  24 ++
 .../FileBackedStorageVariantMasksImpl.java    |  25 ++
 docker/pic-sure-hpds/Dockerfile               |   2 +-
 docker/pom.xml                                |   2 +-
 etl/pom.xml                                   |  24 +-
 .../data/genotype/util/RemapPatientIds.java   |   5 +-
 .../etl/genotype/GenomicDatasetMerger.java    | 289 ++++++++++++++++++
 .../genotype/GenomicDatasetMergerRunner.java  |  81 +++++
 .../hpds/etl/genotype/MultialleleCounter.java |  53 ----
 .../hpds/etl/genotype/NewVCFLoader.java       | 106 ++++---
 ...FPerPatientInfoStoreToFBBIISConverter.java |  17 +-
 .../hpds/etl/genotype/VariantCounter.java     |  41 ---
 .../etl/genotype/VariantIndexBuilder.java     |  30 ++
 .../genotype/BucketIndexBySampleTest.java     |   8 +-
 pom.xml                                       |   2 +-
 processing/pom.xml                            |   2 +-
 .../hpds/processing/AbstractProcessor.java    |   1 +
 .../hpds/processing/VariantIndexCache.java    |  16 +-
 .../hpds/processing/VariantService.java       |  96 ++----
 .../processing/AbstractProcessorTest.java     |   4 +-
 service/pom.xml                               |   2 +-
 war/pom.xml                                   |   2 +-
 35 files changed, 878 insertions(+), 499 deletions(-)
 create mode 100644 common/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/storage/FileBackedJavaIndexedStorage.java
 create mode 100644 common/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/storage/FileBackedJsonIndexStorage.java
 create mode 100644
data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/storage/FileBackedStorageVariantIndexImpl.java create mode 100644 data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/storage/FileBackedStorageVariantMasksImpl.java create mode 100644 etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/GenomicDatasetMerger.java create mode 100644 etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/GenomicDatasetMergerRunner.java delete mode 100644 etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/MultialleleCounter.java delete mode 100644 etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/VariantCounter.java create mode 100644 etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/VariantIndexBuilder.java diff --git a/client-api/pom.xml b/client-api/pom.xml index ef02dcbb..3746ec8e 100644 --- a/client-api/pom.xml +++ b/client-api/pom.xml @@ -4,12 +4,12 @@ pic-sure-hpds edu.harvard.hms.dbmi.avillach.hpds - 1.0-SNAPSHOT + 2.0.0-SNAPSHOT edu.harvard.hms.dbmi.avillach.hpds client-api - 1.0-SNAPSHOT + 2.0.0-SNAPSHOT client-api diff --git a/common/pom.xml b/common/pom.xml index 206c8a84..a92292d4 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -6,7 +6,7 @@ pic-sure-hpds edu.harvard.hms.dbmi.avillach.hpds - 1.0-SNAPSHOT + 2.0.0-SNAPSHOT common @@ -21,5 +21,13 @@ com.google.guava guava + + org.codehaus.jackson + jackson-core-asl + + + org.codehaus.jackson + jackson-mapper-asl + diff --git a/common/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/storage/FileBackedByteIndexedStorage.java b/common/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/storage/FileBackedByteIndexedStorage.java index 80db84d8..1ecc466e 100644 --- a/common/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/storage/FileBackedByteIndexedStorage.java +++ b/common/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/storage/FileBackedByteIndexedStorage.java @@ -1,28 +1,17 @@ package edu.harvard.hms.dbmi.avillach.hpds.storage; -import java.io.ByteArrayInputStream; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.io.RandomAccessFile; -import java.io.Serializable; +import java.io.*; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; -import java.util.zip.GZIPInputStream; -import java.util.zip.GZIPOutputStream; -import org.apache.commons.io.output.ByteArrayOutputStream; - -public class FileBackedByteIndexedStorage implements Serializable { +public abstract class FileBackedByteIndexedStorage implements Serializable { private static final long serialVersionUID = -7297090745384302635L; - private transient RandomAccessFile storage; - private ConcurrentHashMap index; - private File storageFile; - private boolean completed = false; - private Long maxStorageSize; //leave this in to not break serialization + protected transient RandomAccessFile storage; + protected ConcurrentHashMap index; + protected File storageFile; + protected boolean completed = false; + public FileBackedByteIndexedStorage(Class keyClass, Class valueClass, File storageFile) throws FileNotFoundException { this.index = new ConcurrentHashMap(); @@ -30,15 +19,34 @@ public FileBackedByteIndexedStorage(Class keyClass, Class valueClass, File this.storage = new RandomAccessFile(this.storageFile, "rw"); } + public void updateStorageDirectory(File storageDirectory) { + if (!storageDirectory.isDirectory()) { + throw new 
IllegalArgumentException("storageDirectory is not a directory"); + } + String currentStoreageFilename = storageFile.getName(); + storageFile = new File(storageDirectory, currentStoreageFilename); + } + public Set keys(){ return index.keySet(); } - public void put(K key, V value) throws IOException { + public void put(K key, V value) { if(completed) { throw new RuntimeException("A completed FileBackedByteIndexedStorage cannot be modified."); } - Long[] recordIndex = store(value); + Long[] recordIndex; + try (ByteArrayOutputStream out = writeObject(value)) { + recordIndex = new Long[2]; + synchronized (storage) { + storage.seek(storage.length()); + recordIndex[0] = storage.getFilePointer(); + storage.write(out.toByteArray()); + recordIndex[1] = storage.getFilePointer() - recordIndex[0]; + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } index.put(key, recordIndex); } @@ -63,60 +71,44 @@ public void complete() { this.completed = true; } - public boolean isComplete() { - return this.completed; - } - - private Long[] store(V value) throws IOException { - - ByteArrayOutputStream out = new ByteArrayOutputStream(); - ObjectOutputStream oos = new ObjectOutputStream(new GZIPOutputStream(out)); - oos.writeObject(value); - oos.flush(); - oos.close(); - - Long[] recordIndex = new Long[2]; - synchronized(storage) { - storage.seek(storage.length()); - recordIndex[0] = storage.getFilePointer(); - storage.write(out.toByteArray()); - recordIndex[1] = storage.getFilePointer() - recordIndex[0]; -// maxStorageSize = storage.getFilePointer(); - } - return recordIndex; - } - - public V get(K key) throws IOException { - if(this.storage==null) { + public V get(K key) { + try { + // todo: make this class immutable and remove this lock/check altogether synchronized(this) { - this.open(); - } - } - Long[] offsetsInStorage = index.get(key); - if(offsetsInStorage != null) { - Long offsetInStorage = index.get(key)[0]; - int offsetLength = index.get(key)[1].intValue(); - if(offsetInStorage != null && offsetLength>0) { - byte[] buffer = new byte[offsetLength]; - synchronized(storage) { - storage.seek(offsetInStorage); - storage.readFully(buffer); + if(this.storage==null) { + this.open(); } - ObjectInputStream in = new ObjectInputStream(new GZIPInputStream(new ByteArrayInputStream(buffer))); - - try { - V readObject = (V) in.readObject(); + } + Long[] offsetsInStorage = index.get(key); + if(offsetsInStorage != null) { + Long offsetInStorage = index.get(key)[0]; + int offsetLength = index.get(key)[1].intValue(); + if(offsetInStorage != null && offsetLength>0) { + byte[] buffer = new byte[offsetLength]; + synchronized(storage) { + storage.seek(offsetInStorage); + storage.readFully(buffer); + } + V readObject = readObject(buffer); return readObject; - } catch (ClassNotFoundException e) { - throw new RuntimeException("This should never happen."); - } finally { - in.close(); + }else { + return null; } - }else { + } else { return null; } - } else { - return null; + } catch (IOException e) { + throw new UncheckedIOException(e); } } + + protected abstract V readObject(byte[] buffer); + + protected abstract ByteArrayOutputStream writeObject(V value) throws IOException; + + public V getOrELse(K key, V defaultValue) { + V result = get(key); + return result == null ? 
defaultValue : result; + } + } diff --git a/common/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/storage/FileBackedJavaIndexedStorage.java b/common/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/storage/FileBackedJavaIndexedStorage.java new file mode 100644 index 00000000..26968729 --- /dev/null +++ b/common/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/storage/FileBackedJavaIndexedStorage.java @@ -0,0 +1,32 @@ +package edu.harvard.hms.dbmi.avillach.hpds.storage; + +import java.io.*; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + +public class FileBackedJavaIndexedStorage extends FileBackedByteIndexedStorage { + public FileBackedJavaIndexedStorage(Class keyClass, Class valueClass, File storageFile) throws FileNotFoundException { + super(keyClass, valueClass, storageFile); + } + + protected ByteArrayOutputStream writeObject(V value) throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(new GZIPOutputStream(out)); + oos.writeObject(value); + oos.flush(); + oos.close(); + return out; + } + + @Override + protected V readObject(byte[] buffer) { + try (ObjectInputStream in = new ObjectInputStream(new GZIPInputStream(new ByteArrayInputStream(buffer)));) { + V readObject = (V) in.readObject(); + return readObject; + } catch (IOException e) { + throw new UncheckedIOException(e); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } + } +} diff --git a/common/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/storage/FileBackedJsonIndexStorage.java b/common/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/storage/FileBackedJsonIndexStorage.java new file mode 100644 index 00000000..f8f97c6f --- /dev/null +++ b/common/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/storage/FileBackedJsonIndexStorage.java @@ -0,0 +1,40 @@ +package edu.harvard.hms.dbmi.avillach.hpds.storage; + +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.type.TypeReference; + +import java.io.*; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + +public abstract class FileBackedJsonIndexStorage extends FileBackedByteIndexedStorage { + private static final long serialVersionUID = -1086729119489479152L; + + protected transient ObjectMapper objectMapper = new ObjectMapper(); + + public FileBackedJsonIndexStorage(File storageFile) throws FileNotFoundException { + super(null, null, storageFile); + } + + protected ByteArrayOutputStream writeObject(V value) throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + objectMapper.writeValue(new GZIPOutputStream(out), value); + return out; + } + + protected V readObject(byte[] buffer) { + try { + return objectMapper.readValue(new GZIPInputStream(new ByteArrayInputStream(buffer)), getTypeReference()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + // Required to populate the objectMapper on deserialization + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + in.defaultReadObject(); + objectMapper = new ObjectMapper(); + } + + public abstract TypeReference getTypeReference(); +} diff --git a/data/pom.xml b/data/pom.xml index 298160fc..c2b27598 100644 --- a/data/pom.xml +++ b/data/pom.xml @@ -5,7 +5,7 @@ pic-sure-hpds edu.harvard.hms.dbmi.avillach.hpds - 1.0-SNAPSHOT + 2.0.0-SNAPSHOT data diff --git a/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/BucketIndexBySample.java 
b/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/BucketIndexBySample.java index 46723234..ade0cfce 100644 --- a/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/BucketIndexBySample.java +++ b/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/BucketIndexBySample.java @@ -6,6 +6,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; +import edu.harvard.hms.dbmi.avillach.hpds.storage.FileBackedJavaIndexedStorage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -86,22 +87,18 @@ public BucketIndexBySample(VariantStore variantStore, String storageDir) throws // Create a bitmask with 1 values for each patient who has any variant in this bucket BigInteger[] patientMaskForBucket = {variantStore.emptyBitmask()}; - try { - contigStore.get(bucket).values().forEach((VariantMasks masks)->{ - if(masks.heterozygousMask!=null) { - patientMaskForBucket[0] = patientMaskForBucket[0].or(masks.heterozygousMask); - } - //add hetreo no call bits to mask - if(masks.heterozygousNoCallMask!=null) { - patientMaskForBucket[0] = patientMaskForBucket[0].or(masks.heterozygousNoCallMask); - } - if(masks.homozygousMask!=null) { - patientMaskForBucket[0] = patientMaskForBucket[0].or(masks.homozygousMask); - } - }); - } catch (IOException e) { - throw new UncheckedIOException(e); - } + contigStore.get(bucket).values().forEach((VariantMasks masks)->{ + if(masks.heterozygousMask!=null) { + patientMaskForBucket[0] = patientMaskForBucket[0].or(masks.heterozygousMask); + } + //add hetreo no call bits to mask + if(masks.heterozygousNoCallMask!=null) { + patientMaskForBucket[0] = patientMaskForBucket[0].or(masks.heterozygousNoCallMask); + } + if(masks.homozygousMask!=null) { + patientMaskForBucket[0] = patientMaskForBucket[0].or(masks.homozygousMask); + } + }); // For each patient set the patientBucketCharMask entry to 0 or 1 if they have a variant in the bucket. int maxIndex = patientMaskForBucket[0].bitLength() - 1; @@ -121,37 +118,17 @@ public BucketIndexBySample(VariantStore variantStore, String storageDir) throws }); // populate patientBucketMasks with bucketMasks for each patient - patientBucketMasks = new FileBackedByteIndexedStorage(Integer.class, BigInteger.class, new File(storageFileStr)); + patientBucketMasks = new FileBackedJavaIndexedStorage(Integer.class, BigInteger.class, new File(storageFileStr)); - //the process to write out the bucket masks takes a very long time. 
- //Lets spin up another thread that occasionally logs progress int[] processedPatients = new int[1]; - processedPatients[0] = 0; - new Thread(new Runnable() { - @Override - public void run() { - log.info("writing patient bucket masks to backing store (this may take some time)."); - while(!patientBucketMasks.isComplete()) { - try { - Thread.sleep(5 * 1000 * 60); //log a message every 5 minutes - } catch (InterruptedException e) { - log.error("Thread interrupted", e); - } - log.info("wrote " + processedPatients[0] + " patient bucket masks"); - } - } - }).start(); - patientIds.parallelStream().forEach((patientId)->{ - try { - BigInteger patientMask = new BigInteger(new String(patientBucketCharMasks[patientIds.indexOf(patientId)]),2); - patientBucketMasks.put(patientId, patientMask); - }catch(NumberFormatException e) { - log.error("NFE caught for " + patientId, e); - } catch (IOException e) { - throw new UncheckedIOException(e); - } + BigInteger patientMask = new BigInteger(new String(patientBucketCharMasks[patientIds.indexOf(patientId)]),2); + patientBucketMasks.put(patientId, patientMask); processedPatients[0] += 1; + int processedPatientsCount = processedPatients[0]; + if (processedPatientsCount % 1000 == 0) { + log.info("wrote " + processedPatientsCount + " patient bucket masks"); + } }); patientBucketMasks.complete(); log.info("Done creating patient bucket masks"); @@ -174,14 +151,9 @@ public Collection filterVariantSetForPatientSet(Set variantSet, new BigInteger(new String(emptyBucketMaskChar()),2) : patientBucketMasks.get(patientSet.get(0)); BigInteger _defaultMask = patientBucketMask; - List patientBucketmasksForSet = patientSet.parallelStream().map((patientNum)->{ - try { - return patientBucketMasks.get(patientNum); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - //return _defaultMask; - }).collect(Collectors.toList()); + List patientBucketmasksForSet = patientSet.parallelStream() + .map((patientNum)-> patientBucketMasks.get(patientNum)) + .collect(Collectors.toList()); for(BigInteger patientMask : patientBucketmasksForSet) { patientBucketMask = patientMask.or(patientBucketMask); } @@ -218,17 +190,4 @@ private char[] emptyBucketMaskChar() { } return _emptyBucketMaskChar.clone(); } - - /** - * Use while debugging - */ - public void printPatientMasks() { - for(Integer patientID : patientBucketMasks.keys()) { - try { - log.info("BucketMask length for " + patientID + ":\t" + patientBucketMasks.get(patientID).toString(2).length()); - } catch (IOException e) { - log.error("FBBIS Error: ", e); - } - } - } } \ No newline at end of file diff --git a/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/CompressedIndex.java b/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/CompressedIndex.java index eb8705b9..48637410 100644 --- a/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/CompressedIndex.java +++ b/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/CompressedIndex.java @@ -32,7 +32,7 @@ public class CompressedIndex implements Serializable { private HashMap, byte[]> compressedRangeMap; private int valueCount; - public TreeMap> buildContinuousValuesMap(FileBackedByteIndexedStorage allValues) { + public TreeMap> buildContinuousValuesMap(FileBackedByteIndexedStorage allValues) { TreeMap> continuousValueMap = new TreeMap<>(); for(String key : allValues.keys()) { try{ diff --git a/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/FileBackedByteIndexedInfoStore.java 
b/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/FileBackedByteIndexedInfoStore.java index f282707b..4fd4ae37 100644 --- a/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/FileBackedByteIndexedInfoStore.java +++ b/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/FileBackedByteIndexedInfoStore.java @@ -1,8 +1,6 @@ package edu.harvard.hms.dbmi.avillach.hpds.data.genotype; -import java.io.File; -import java.io.IOException; -import java.io.Serializable; +import java.io.*; import java.util.ArrayList; import java.util.List; import java.util.TreeMap; @@ -10,8 +8,11 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListSet; import java.util.stream.Collectors; +import java.util.zip.GZIPOutputStream; +import edu.harvard.hms.dbmi.avillach.hpds.data.storage.FileBackedStorageVariantIndexImpl; import edu.harvard.hms.dbmi.avillach.hpds.storage.FileBackedByteIndexedStorage; +import edu.harvard.hms.dbmi.avillach.hpds.storage.FileBackedJavaIndexedStorage; public class FileBackedByteIndexedInfoStore implements Serializable { @@ -21,12 +22,12 @@ public class FileBackedByteIndexedInfoStore implements Serializable { public boolean isContinuous; public Float min = Float.MAX_VALUE, max = Float.MIN_VALUE; - private FileBackedByteIndexedStorage allValues; + private FileBackedByteIndexedStorage allValues; public TreeMap> continuousValueMap; public CompressedIndex continuousValueIndex; - public FileBackedByteIndexedStorage getAllValues() { + public FileBackedByteIndexedStorage getAllValues() { return allValues; } @@ -41,8 +42,8 @@ public List search(String term) { } } - public void addEntry(String value, String[] variantSpecs) throws IOException { - allValues.put(value, variantSpecs); + public void addEntry(String value, Integer[] variantIds) throws IOException { + allValues.put(value, variantIds); } @@ -51,8 +52,7 @@ public void complete() { } public FileBackedByteIndexedInfoStore(File storageFolder, InfoStore infoStore) throws IOException { - this.allValues = new FileBackedByteIndexedStorage(String.class, String[].class, - new File(storageFolder, infoStore.column_key + "_infoStoreStorage.javabin")); + this.allValues = new FileBackedStorageVariantIndexImpl(new File(storageFolder, infoStore.column_key + "_infoStoreStorage.javabin")); this.description = infoStore.description; this.column_key = infoStore.column_key; this.isContinuous = infoStore.isNumeric(); @@ -71,8 +71,8 @@ public FileBackedByteIndexedInfoStore(File storageFolder, InfoStore infoStore) t if(x%10000 == 0) { System.out.println(infoStore.column_key + " " + ((((double)x) / sortedKeys.size()) * 100) + "% done"); } - ConcurrentSkipListSet variantSpecs = infoStore.allValues.get(key); - addEntry(key, variantSpecs.toArray(new String[variantSpecs.size()])); + ConcurrentSkipListSet variantIds = infoStore.allValues.get(key); + addEntry(key, variantIds.toArray(new Integer[variantIds.size()])); x++; } } @@ -89,10 +89,10 @@ public FileBackedByteIndexedInfoStore(File storageFolder, InfoStore infoStore) t private static void normalizeNumericStore(InfoStore store) { TreeSet allKeys = new TreeSet(store.allValues.keySet()); - ConcurrentHashMap> normalizedValues = new ConcurrentHashMap<>(); + ConcurrentHashMap> normalizedValues = new ConcurrentHashMap<>(); for(String key : allKeys) { String[] keys = key.split(","); - ConcurrentSkipListSet variantSpecs = store.allValues.get(key); + ConcurrentSkipListSet variantIds = store.allValues.get(key); if(key.contentEquals(".")) { 
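// ('.' is the VCF convention for a missing INFO value, so there is nothing to index for it)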
//don't add it }else if(keyHasMultipleValues(keys)) { @@ -100,26 +100,26 @@ private static void normalizeNumericStore(InfoStore store) { if(value.contentEquals(".")) { }else { - ConcurrentSkipListSet normalizedSpecs = normalizedValues.get(value); - if(normalizedSpecs == null) { - normalizedSpecs = variantSpecs; + ConcurrentSkipListSet normalizedVariantIds = normalizedValues.get(value); + if(normalizedVariantIds == null) { + normalizedVariantIds = variantIds; }else { - normalizedSpecs.addAll(variantSpecs); + normalizedVariantIds.addAll(variantIds); } - normalizedValues.put(value, normalizedSpecs); + normalizedValues.put(value, normalizedVariantIds); } } }else { if(key.contentEquals(".")) { }else { - ConcurrentSkipListSet normalizedSpecs = normalizedValues.get(key); - if(normalizedSpecs == null) { - normalizedSpecs = variantSpecs; + ConcurrentSkipListSet normalizedVariantIds = normalizedValues.get(key); + if(normalizedVariantIds == null) { + normalizedVariantIds = variantIds; }else { - normalizedSpecs.addAll(variantSpecs); + normalizedVariantIds.addAll(variantIds); } - normalizedValues.put(key, normalizedSpecs); + normalizedValues.put(key, normalizedVariantIds); } } @@ -139,5 +139,19 @@ private static boolean keyHasMultipleValues(String[] keys) { return x>1; } + public void updateStorageDirectory(File storageDirectory) { + allValues.updateStorageDirectory(storageDirectory); + } + + public void write(File outputFile) { + try( + FileOutputStream fos = new FileOutputStream(outputFile); + GZIPOutputStream gzos = new GZIPOutputStream(fos); + ObjectOutputStream oos = new ObjectOutputStream(gzos);) { + oos.writeObject(this); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } } diff --git a/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/InfoStore.java b/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/InfoStore.java index b58373f2..a62e0591 100644 --- a/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/InfoStore.java +++ b/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/InfoStore.java @@ -14,7 +14,7 @@ public class InfoStore implements Serializable { public final String column_key; public final String description; - public ConcurrentHashMap> allValues = new ConcurrentHashMap<>(); + public ConcurrentHashMap> allValues = new ConcurrentHashMap<>(); private String prefix; public List search(String term) { @@ -28,21 +28,6 @@ public List search(String term) { } } - public void processRecord(VariantSpec spec, String[] values) { - for(String value : values) { - if(value.startsWith(column_key + "=")) { - String valueWithoutkey = value.replaceFirst(column_key + "=", ""); - ConcurrentSkipListSet entriesForValue = allValues.get(valueWithoutkey); - if(entriesForValue == null) { - entriesForValue = new ConcurrentSkipListSet<>(); - allValues.put(valueWithoutkey, entriesForValue); - } - entriesForValue.add(spec.specNotation()); - } - } - } - - public InfoStore(String description, String delimiter, String key) { this.prefix = key + "="; this.description = description; @@ -53,7 +38,7 @@ public boolean isNumeric() { int nonNumericCount = 0; int numericCount = 0; System.out.println("Testing for numeric : " + this.column_key + " : " + allValues.size() + " values"); - KeySetView> allKeys = allValues.keySet(); + KeySetView> allKeys = allValues.keySet(); for(String key : allKeys){ try { Double.parseDouble(key); @@ -84,16 +69,16 @@ public boolean isNumeric() { return false; } - public void processRecord(String specNotation, 
String[] infoValues) { + public void processRecord(Integer variantId, String[] infoValues) { for(String value : infoValues) { if(value.startsWith(prefix)) { String valueWithoutkey = value.substring(prefix.length()); - ConcurrentSkipListSet entriesForValue = allValues.get(valueWithoutkey); + ConcurrentSkipListSet entriesForValue = allValues.get(valueWithoutkey); if(entriesForValue == null) { entriesForValue = new ConcurrentSkipListSet<>(); allValues.put(valueWithoutkey, entriesForValue); } - entriesForValue.add(specNotation); + entriesForValue.add(variantId); } } } diff --git a/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/VariantMasks.java b/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/VariantMasks.java index 9d3e599a..a8e2b75d 100644 --- a/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/VariantMasks.java +++ b/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/VariantMasks.java @@ -1,5 +1,9 @@ package edu.harvard.hms.dbmi.avillach.hpds.data.genotype; +import org.codehaus.jackson.annotate.JsonProperty; +import org.codehaus.jackson.map.annotate.JsonSerialize; +import org.codehaus.jackson.map.ser.ToStringSerializer; + import java.io.Serializable; import java.math.BigInteger; @@ -166,8 +170,19 @@ public VariantMasks(char[][] maskValues) { } + public VariantMasks() { + } + + @JsonProperty("ho") + @JsonSerialize(using = ToStringSerializer.class, include=JsonSerialize.Inclusion.NON_NULL) public BigInteger homozygousMask; + @JsonProperty("he") + @JsonSerialize(using = ToStringSerializer.class, include=JsonSerialize.Inclusion.NON_NULL) public BigInteger heterozygousMask; + @JsonProperty("hon") + @JsonSerialize(using = ToStringSerializer.class, include=JsonSerialize.Inclusion.NON_NULL) public BigInteger homozygousNoCallMask; + @JsonProperty("hen") + @JsonSerialize(using = ToStringSerializer.class, include=JsonSerialize.Inclusion.NON_NULL) public BigInteger heterozygousNoCallMask; } diff --git a/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/VariantMetadataIndex.java b/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/VariantMetadataIndex.java index 3b706cbc..1f6cdf0f 100644 --- a/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/VariantMetadataIndex.java +++ b/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/VariantMetadataIndex.java @@ -6,6 +6,7 @@ import java.util.stream.Collectors; import java.util.zip.GZIPInputStream; +import edu.harvard.hms.dbmi.avillach.hpds.storage.FileBackedJavaIndexedStorage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -91,7 +92,7 @@ public String[] findBySingleVariantSpec(String variantSpec, VariantBucketHolder< log.warn("No bucket found for spec " + variantSpec + " in bucket " + chrOffset); return new String[0]; - } catch (IOException e) { + } catch (UncheckedIOException e) { log.warn("IOException caught looking up variantSpec : " + variantSpec, e); return new String[0]; } @@ -112,7 +113,6 @@ public Map findByMultipleVariantSpec(Collection varien * have to write them to disk once. The data will be written to disk only when the flush() method is called. 
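*
* For example, a loader would typically buffer the metadata for the variants in one bucket and
* then flush (illustrative calls only; the metadata payload is whatever string the loader assembles):
*
*   metadataIndex.put("chr22,10001031,A,G", metadataRow);
*   metadataIndex.flush();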
* * @param variantSpec - * @param array * @throws IOException */ public void put(String variantSpec, String metaData ) throws IOException { @@ -161,7 +161,7 @@ public synchronized void flush() throws IOException { if(contigFbbis == null) { log.info("creating new file for " + contig); String filePath = fileStoragePrefix + "_" + contig + ".bin"; - contigFbbis = new FileBackedByteIndexedStorage>(Integer.class, (Class>)(Class) ConcurrentHashMap.class, new File(filePath)); + contigFbbis = new FileBackedJavaIndexedStorage(Integer.class, (Class>)(Class) ConcurrentHashMap.class, new File(filePath)); indexMap.put(contig, contigFbbis); } @@ -200,7 +200,8 @@ public static VariantMetadataIndex createInstance(String metadataIndexPath) { return (VariantMetadataIndex) in.readObject(); } catch(Exception e) { // todo: handle exceptions better - log.error("No Metadata Index found at " + metadataIndexPath, e); + log.info("No Metadata Index found at " + metadataIndexPath); + log.debug("Error loading metadata index:", e); return null; } } diff --git a/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/VariantStore.java b/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/VariantStore.java index 4a566427..6541c9dc 100644 --- a/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/VariantStore.java +++ b/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/VariantStore.java @@ -1,60 +1,78 @@ package edu.harvard.hms.dbmi.avillach.hpds.data.genotype; +import com.google.common.collect.RangeSet; +import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.caching.VariantBucketHolder; +import edu.harvard.hms.dbmi.avillach.hpds.storage.FileBackedJsonIndexStorage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.*; import java.math.BigInteger; import java.util.*; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; -import com.google.errorprone.annotations.Var; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.slf4j.event.Level; - -import com.google.common.collect.RangeSet; - -import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.caching.VariantBucketHolder; -import edu.harvard.hms.dbmi.avillach.hpds.storage.FileBackedByteIndexedStorage; - public class VariantStore implements Serializable { private static final long serialVersionUID = -6970128712587609414L; private static Logger log = LoggerFactory.getLogger(VariantStore.class); public static final int BUCKET_SIZE = 1000; + public static final String VARIANT_SPEC_INDEX_FILE = "variantSpecIndex.javabin"; + private BigInteger emptyBitmask; private String[] patientIds; + private transient String[] variantSpecIndex; + private Integer variantStorageSize; private String[] vcfHeaders = new String[24]; - private TreeMap>> variantMaskStorage = new TreeMap<>(); + private Map>> variantMaskStorage = new TreeMap<>(); - public TreeMap>> getVariantMaskStorage() { + public Map>> getVariantMaskStorage() { return variantMaskStorage; } - public void setVariantMaskStorage(TreeMap>> variantMaskStorage) { + public void setVariantMaskStorage(Map>> variantMaskStorage) { this.variantMaskStorage = variantMaskStorage; } - public static VariantStore deserializeInstance(String genomicDataDirectory) throws IOException, ClassNotFoundException, InterruptedException 
{ - if(new File(genomicDataDirectory + "variantStore.javabin").exists()) { - ObjectInputStream ois = new ObjectInputStream(new GZIPInputStream(new FileInputStream(genomicDataDirectory + "variantStore.javabin"))); - VariantStore variantStore = (VariantStore) ois.readObject(); - ois.close(); - variantStore.open(); - return variantStore; - } else { - //we still need an object to reference when checking the variant store, even if it's empty. - VariantStore variantStore = new VariantStore(); - variantStore.setPatientIds(new String[0]); - return variantStore; + public String[] getVariantSpecIndex() { + return variantSpecIndex; + } + public void setVariantSpecIndex(String[] variantSpecIndex) { + this.variantSpecIndex = variantSpecIndex; + } + + public static VariantStore readInstance(String genomicDataDirectory) throws IOException, ClassNotFoundException { + ObjectInputStream ois = new ObjectInputStream(new GZIPInputStream(new FileInputStream(genomicDataDirectory + "variantStore.javabin"))); + VariantStore variantStore = (VariantStore) ois.readObject(); + ois.close(); + variantStore.getVariantMaskStorage().values().forEach(store -> { + store.updateStorageDirectory(new File(genomicDataDirectory)); + }); + variantStore.open(); + variantStore.setVariantSpecIndex(loadVariantIndexFromFile(genomicDataDirectory)); + return variantStore; + } + + public void writeInstance(String genomicDirectory) { + try (FileOutputStream fos = new FileOutputStream(new File(genomicDirectory, "variantStore.javabin")); + GZIPOutputStream gzos = new GZIPOutputStream(fos); + ObjectOutputStream oos = new ObjectOutputStream(gzos);) { + oos.writeObject(this); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + try (FileOutputStream fos = new FileOutputStream(new File(genomicDirectory, "variantSpecIndex.javabin")); + GZIPOutputStream gzos = new GZIPOutputStream(fos); + ObjectOutputStream oos = new ObjectOutputStream(gzos);) { + oos.writeObject(Arrays.asList(variantSpecIndex)); + } catch (IOException e) { + throw new UncheckedIOException(e); } } @@ -63,31 +81,27 @@ public Map countVariants() { TreeMap counts = new TreeMap<>(); for (String contig : variantMaskStorage.keySet()) { counts.put(contig, new int[5]); - FileBackedByteIndexedStorage> storage = variantMaskStorage + FileBackedJsonIndexStorage> storage = variantMaskStorage .get(contig); storage.keys().stream().forEach((Integer key) -> { int[] contigCounts = counts.get(contig); - try { - Collection values = storage.get(key).values(); - contigCounts[0] += values.stream().collect(Collectors.summingInt((VariantMasks masks) -> { - return masks.heterozygousMask != null ? 1 : 0; - })); - contigCounts[1] += values.stream().collect(Collectors.summingInt((VariantMasks masks) -> { - return masks.homozygousMask != null ? 1 : 0; - })); - contigCounts[2] += values.stream().collect(Collectors.summingInt((VariantMasks masks) -> { - return masks.heterozygousNoCallMask != null ? 1 : 0; - })); - contigCounts[3] += values.stream().collect(Collectors.summingInt((VariantMasks masks) -> { - return masks.homozygousNoCallMask != null ? 1 : 0; - })); - contigCounts[4] += values.stream().collect(Collectors.summingInt((VariantMasks masks) -> { - return masks.heterozygousMask != null || masks.homozygousMask != null - || masks.heterozygousNoCallMask != null || masks.homozygousNoCallMask != null ? 
1 : 0; - })); - } catch (IOException e) { - throw new UncheckedIOException(e); - } + Collection values = storage.get(key).values(); + contigCounts[0] += values.stream().collect(Collectors.summingInt((VariantMasks masks) -> { + return masks.heterozygousMask != null ? 1 : 0; + })); + contigCounts[1] += values.stream().collect(Collectors.summingInt((VariantMasks masks) -> { + return masks.homozygousMask != null ? 1 : 0; + })); + contigCounts[2] += values.stream().collect(Collectors.summingInt((VariantMasks masks) -> { + return masks.heterozygousNoCallMask != null ? 1 : 0; + })); + contigCounts[3] += values.stream().collect(Collectors.summingInt((VariantMasks masks) -> { + return masks.homozygousNoCallMask != null ? 1 : 0; + })); + contigCounts[4] += values.stream().collect(Collectors.summingInt((VariantMasks masks) -> { + return masks.heterozygousMask != null || masks.homozygousMask != null + || masks.heterozygousNoCallMask != null || masks.homozygousNoCallMask != null ? 1 : 0; + })); }); } return counts; @@ -160,7 +174,7 @@ public void setVariantStorageSize(int variantStorageSize) { public List getMasksForRangesOfChromosome(String contigForGene, List offsetsForGene, RangeSet rangeSetsForGene) throws IOException { - FileBackedByteIndexedStorage masksForChromosome = variantMaskStorage.get(contigForGene); + FileBackedJsonIndexStorage masksForChromosome = variantMaskStorage.get(contigForGene); Set bucketsForGene = offsetsForGene.stream().map((offset) -> { return offset / BUCKET_SIZE; }).collect(Collectors.toSet()); @@ -189,4 +203,19 @@ public BigInteger emptyBitmask() { return emptyBitmask; } + public static String[] loadVariantIndexFromFile(String genomicDataDirectory) { + try (ObjectInputStream objectInputStream = new ObjectInputStream(new GZIPInputStream(new FileInputStream(genomicDataDirectory + "/" + VARIANT_SPEC_INDEX_FILE)));){ + + List variants = (List) objectInputStream.readObject(); + return variants.toArray(new String[0]); + + } catch (FileNotFoundException e) { + throw new RuntimeException(e); + } catch (IOException e) { + throw new RuntimeException(e); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } + } + } diff --git a/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/storage/FileBackedStorageVariantIndexImpl.java b/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/storage/FileBackedStorageVariantIndexImpl.java new file mode 100644 index 00000000..f2ec9e48 --- /dev/null +++ b/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/storage/FileBackedStorageVariantIndexImpl.java @@ -0,0 +1,24 @@ +package edu.harvard.hms.dbmi.avillach.hpds.data.storage; + +import edu.harvard.hms.dbmi.avillach.hpds.storage.FileBackedJsonIndexStorage; +import org.codehaus.jackson.type.TypeReference; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.Serializable; + +public class FileBackedStorageVariantIndexImpl extends FileBackedJsonIndexStorage implements Serializable { + private static final long serialVersionUID = -893724459359928779L; + + public FileBackedStorageVariantIndexImpl(File storageFile) throws FileNotFoundException { + super(storageFile); + } + + private static final TypeReference typeRef + = new TypeReference() {}; + + @Override + public TypeReference getTypeReference() { + return typeRef; + } +} diff --git a/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/storage/FileBackedStorageVariantMasksImpl.java 
b/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/storage/FileBackedStorageVariantMasksImpl.java new file mode 100644 index 00000000..6d39d79a --- /dev/null +++ b/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/storage/FileBackedStorageVariantMasksImpl.java @@ -0,0 +1,25 @@ +package edu.harvard.hms.dbmi.avillach.hpds.data.storage; + +import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.VariantMasks; +import edu.harvard.hms.dbmi.avillach.hpds.storage.FileBackedJsonIndexStorage; +import org.codehaus.jackson.type.TypeReference; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.Serializable; +import java.util.concurrent.ConcurrentHashMap; + +public class FileBackedStorageVariantMasksImpl extends FileBackedJsonIndexStorage> implements Serializable { + private static final long serialVersionUID = -1086729119489479152L; + + public FileBackedStorageVariantMasksImpl(File storageFile) throws FileNotFoundException { + super(storageFile); + } + private static final TypeReference> typeRef + = new TypeReference>() {}; + + @Override + public TypeReference> getTypeReference() { + return typeRef; + } +} diff --git a/docker/pic-sure-hpds/Dockerfile b/docker/pic-sure-hpds/Dockerfile index 0b38a4de..64366119 100644 --- a/docker/pic-sure-hpds/Dockerfile +++ b/docker/pic-sure-hpds/Dockerfile @@ -6,6 +6,6 @@ RUN apk add --no-cache --purge -uU curl wget unzip RUN apk add --no-cache --purge openjdk11 -ADD hpds-war-1.0-SNAPSHOT-war-exec.jar /hpds.jar +ADD hpds-war-2.0.0-SNAPSHOT-war-exec.jar /hpds.jar EXPOSE 8080 diff --git a/docker/pom.xml b/docker/pom.xml index d19a2729..8322b603 100644 --- a/docker/pom.xml +++ b/docker/pom.xml @@ -5,7 +5,7 @@ pic-sure-hpds edu.harvard.hms.dbmi.avillach.hpds - 1.0-SNAPSHOT + 2.0.0-SNAPSHOT docker diff --git a/etl/pom.xml b/etl/pom.xml index ded9fdd5..d56669d6 100644 --- a/etl/pom.xml +++ b/etl/pom.xml @@ -6,7 +6,7 @@ pic-sure-hpds edu.harvard.hms.dbmi.avillach.hpds - 1.0-SNAPSHOT + 2.0.0-SNAPSHOT etl @@ -325,7 +325,27 @@ single - + + + GenomicDatasetMergerRunner + + + + edu.harvard.hms.dbmi.avillach.hpds.etl.genotype.GenomicDatasetMergerRunner + + + ${project.basedir}/../docker/pic-sure-hpds-etl + + jar-with-dependencies + + GenomicDatasetMergerRunner + GenomicDatasetMergerRunner + + package + + single + + diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/util/RemapPatientIds.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/util/RemapPatientIds.java index cab26774..0bd73458 100644 --- a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/util/RemapPatientIds.java +++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/util/RemapPatientIds.java @@ -51,10 +51,7 @@ public class RemapPatientIds { private static final int TEXT_VALUE = 3; public static void main(String[] args) throws ClassNotFoundException, FileNotFoundException, IOException { - - ObjectInputStream objectInputStream = new ObjectInputStream(new GZIPInputStream(new FileInputStream("/opt/local/hpds/all/variantStore.javabin"))); - VariantStore variantStore = (VariantStore) objectInputStream.readObject(); - objectInputStream.close(); + VariantStore variantStore = VariantStore.readInstance("/opt/local/hpds/all/"); String[] oldPatientIds = variantStore.getPatientIds(); String[] newPatientIds = new String[oldPatientIds.length]; diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/GenomicDatasetMerger.java 
b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/GenomicDatasetMerger.java
new file mode 100644
index 00000000..e66f0c30
--- /dev/null
+++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/GenomicDatasetMerger.java
@@ -0,0 +1,289 @@
+package edu.harvard.hms.dbmi.avillach.hpds.etl.genotype;
+
+import com.google.common.collect.Sets;
+import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.*;
+import edu.harvard.hms.dbmi.avillach.hpds.data.storage.FileBackedStorageVariantMasksImpl;
+import edu.harvard.hms.dbmi.avillach.hpds.storage.FileBackedByteIndexedStorage;
+import edu.harvard.hms.dbmi.avillach.hpds.storage.FileBackedJsonIndexStorage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.math.BigInteger;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+public class GenomicDatasetMerger {
+
+	private static Logger log = LoggerFactory.getLogger(GenomicDatasetMerger.class);
+
+	private final VariantStore variantStore1;
+	private final VariantStore variantStore2;
+
+	private final Map<String, FileBackedByteIndexedInfoStore> infoStores1;
+	private final Map<String, FileBackedByteIndexedInfoStore> infoStores2;
+
+	private final String outputDirectory;
+
+	private final VariantStore mergedVariantStore;
+
+	public GenomicDatasetMerger(VariantStore variantStore1, VariantStore variantStore2, Map<String, FileBackedByteIndexedInfoStore> infoStores1, Map<String, FileBackedByteIndexedInfoStore> infoStores2, String outputDirectory) {
+		this.variantStore1 = variantStore1;
+		this.variantStore2 = variantStore2;
+		this.mergedVariantStore = new VariantStore();
+		this.infoStores1 = infoStores1;
+		this.infoStores2 = infoStores2;
+		this.outputDirectory = outputDirectory;
+		validate();
+	}
+
+	private void validate() {
+		if (!variantStore1.getVariantMaskStorage().keySet().equals(variantStore2.getVariantMaskStorage().keySet())) {
+			log.error("Variant store chromosomes do not match:");
+			log.error(String.join(", ", variantStore1.getVariantMaskStorage().keySet()));
+			log.error(String.join(", ", variantStore2.getVariantMaskStorage().keySet()));
+			throw new IllegalStateException("Unable to merge variant stores with different sets of chromosomes");
+		}
+		Sets.SetView<String> patientIntersection = Sets.intersection(Sets.newHashSet(variantStore1.getPatientIds()), Sets.newHashSet(variantStore2.getPatientIds()));
+		if (!patientIntersection.isEmpty()) {
+			throw new IllegalStateException("Cannot merge genomic datasets containing the same patient id");
+		}
+	}
+
+	public VariantStore mergeVariantStore(Map<String, FileBackedJsonIndexStorage<Integer, ConcurrentHashMap<String, VariantMasks>>> mergedChromosomeMasks) {
+		mergedVariantStore.setVariantMaskStorage(mergedChromosomeMasks);
+		mergedVariantStore.setPatientIds(mergePatientIds());
+		return mergedVariantStore;
+	}
+
+	/**
+	 * Since the two sets of variant indexes reference different variant spec lists, we need to re-index
+	 * the values in the second set of variant indexes.
+	 *
+	 * For each variant in the second list of variants:
+	 * If it exists in list one, update any references to it by index to its index in list 1.
+	 * Otherwise, append it to list one, and update any references to it by index to its new position in list 1.
+	 *
+	 * Ex:
+	 *
+	 * variantIndex1 = ["chr1,1000,A,G", "chr1,1002,A,G", "chr1,2000,A,G"]
+	 * variantIndex2 = ["chr1,1000,A,G", "chr1,1004,A,G", "chr1,3000,A,G"]
+	 *
+	 * GeneWithVariant_store1 = [0, 1]
+	 * GeneWithVariant_store2 = [0, 1, 2]
+	 *
+	 * mergedVariantIndex = ["chr1,1000,A,G", "chr1,1002,A,G", "chr1,2000,A,G", "chr1,1004,A,G", "chr1,3000,A,G"]
+	 * GeneWithVariant_merged = [0, 1, 3, 4]
+	 *
+	 * @return the merged info stores, keyed by info column
+	 * @throws IOException
+	 */
+	public Map<String, FileBackedByteIndexedInfoStore> mergeVariantIndexes() throws IOException {
+		String[] variantIndex1 = variantStore1.getVariantSpecIndex();
+		String[] variantIndex2 = variantStore2.getVariantSpecIndex();
+
+		Map<String, Integer> variantSpecToIndexMap = new HashMap<>();
+		LinkedList<String> variantSpecList = new LinkedList<>(Arrays.asList(variantIndex1));
+		for (int i = 0; i < variantIndex1.length; i++) {
+			variantSpecToIndexMap.put(variantIndex1[i], i);
+		}
+
+		// Will contain any re-mapped indexes in the second variant index. For example, if a variant is contained in both
+		// data sets, the merged data set will use the index from dataset 1 to reference it, and any references in data
+		// set 2 for this variant need to be re-mapped. Likewise, if a variant in set 2 is new, it will be appended to
+		// the list and will also need to be re-mapped
+		Integer[] remappedIndexes = new Integer[variantIndex2.length];
+
+		for (int i = 0; i < variantIndex2.length; i++) {
+			String variantSpec = variantIndex2[i];
+			Integer variantIndex = variantSpecToIndexMap.get(variantSpec);
+			if (variantIndex != null) {
+				remappedIndexes[i] = variantIndex;
+			} else {
+				variantSpecList.add(variantSpec);
+				// the new index is now the last item in the list
+				int newVariantSpecIndex = variantSpecList.size() - 1;
+				remappedIndexes[i] = newVariantSpecIndex;
+				variantSpecToIndexMap.put(variantSpec, newVariantSpecIndex);
+			}
+		}
+
+		Map<String, FileBackedByteIndexedInfoStore> mergedInfoStores = new HashMap<>();
+
+		if (!infoStores1.keySet().equals(infoStores2.keySet())) {
+			throw new IllegalStateException("Info stores do not match");
+		}
+		for (Map.Entry<String, FileBackedByteIndexedInfoStore> infoStores1Entry : infoStores1.entrySet()) {
+			FileBackedByteIndexedInfoStore infoStore2 = infoStores2.get(infoStores1Entry.getKey());
+
+			FileBackedByteIndexedStorage<String, Integer[]> allValuesStore1 = infoStores1Entry.getValue().getAllValues();
+			FileBackedByteIndexedStorage<String, Integer[]> allValuesStore2 = infoStore2.getAllValues();
+			ConcurrentHashMap<String, ConcurrentSkipListSet<Integer>> mergedInfoStoreValues = new ConcurrentHashMap<>();
+
+			Sets.SetView<String> allKeys = Sets.union(allValuesStore1.keys(), allValuesStore2.keys());
+			for (String key : allKeys) {
+				Set<Integer> store1Values = Set.of(allValuesStore1.getOrELse(key, new Integer[]{}));
+				Set<Integer> store2Values = Set.of(allValuesStore2.getOrELse(key, new Integer[]{}));
+				Set<Integer> remappedValuesStore2 = store2Values.stream().map(value -> remappedIndexes[value]).collect(Collectors.toSet());
+
+				Set<Integer> mergedValues = Sets.union(store1Values, remappedValuesStore2);
+				mergedInfoStoreValues.put(key, new ConcurrentSkipListSet<>(mergedValues));
+			}
+
+			InfoStore infoStore = new InfoStore(infoStore2.description, null, infoStores1Entry.getKey());
+			infoStore.allValues = mergedInfoStoreValues;
+			FileBackedByteIndexedInfoStore mergedStore = new FileBackedByteIndexedInfoStore(new File(outputDirectory), infoStore);
+			mergedInfoStores.put(infoStores1Entry.getKey(), mergedStore);
+		}
+
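+		// The merged spec index is store 1's spec list with any specs unique to store 2 appended;
+		// remappedIndexes (above) translates store 2's integer variant ids into positions in this combined list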
+		mergedVariantStore.setVariantSpecIndex(variantSpecList.toArray(new String[0]));
+		return mergedInfoStores;
+	}
+
+	/**
+	 * Merge patient ids from both variant stores. We are simply appending patients from store 2 to patients from store 1.
+	 *
+	 * @return the merged patient ids
+	 */
+	private String[] mergePatientIds() {
+		return Stream.concat(Arrays.stream(variantStore1.getPatientIds()), Arrays.stream(variantStore2.getPatientIds()))
+				.toArray(String[]::new);
+	}
+
+	/**
+	 * For each chromosome, call mergeChromosomeMask to merge the masks.
+	 *
+	 * @return a map from chromosome to the merged mask storage for that chromosome
+	 */
+	public Map<String, FileBackedJsonIndexStorage<Integer, ConcurrentHashMap<String, VariantMasks>>> mergeChromosomeMasks() {
+		Map<String, FileBackedJsonIndexStorage<Integer, ConcurrentHashMap<String, VariantMasks>>> mergedMaskStorage = new ConcurrentHashMap<>();
+		variantStore1.getVariantMaskStorage().keySet().parallelStream().forEach(chromosome -> {
+			try {
+				mergedMaskStorage.put(chromosome, mergeChromosomeMask(chromosome));
+			} catch (FileNotFoundException e) {
+				throw new RuntimeException(e);
+			}
+		});
+		return mergedMaskStorage;
+	}
+
+	/**
+	 * Merge the masks for a chromosome. The logic is mostly straightforward, but it has to handle values that are
+	 * missing at various places in the maps: if a variant mask contains no matches (i.e., is 000000...), it is not stored in the data.
+	 *
+	 * Examples:
+	 * variantMaskStorage1: {
+	 *     10001 -> {
+	 *         "chr22,10001031,A,G" -> "10101010",
+	 *         "chr22,10001143,G,A" -> "10101010"
+	 *     },
+	 *     10002 -> {
+	 *         "chr22,10002031,A,G" -> "10101010",
+	 *         "chr22,10002143,G,A" -> "10101010"
+	 *     }
+	 * }
+	 * variantMaskStorage2: {
+	 *     10001 -> {
+	 *         "chr22,10001031,A,G" -> "00001111",
+	 *         "chr22,10001213,A,G" -> "00001111"
+	 *     },
+	 *     10003 -> {
+	 *         "chr22,10003031,A,G" -> "00001111",
+	 *         "chr22,10003213,A,G" -> "00001111"
+	 *     }
+	 * }
+	 *
+	 * mergedVariantMaskStorage: {
+	 *     10001 -> {
+	 *         "chr22,10001031,A,G" -> "1010101000001111",
+	 *         "chr22,10001213,A,G" -> "0000000000001111",
+	 *         "chr22,10001143,G,A" -> "1010101000000000"
+	 *     },
+	 *     10002 -> {
+	 *         "chr22,10002031,A,G" -> "1010101000000000",
+	 *         "chr22,10002143,G,A" -> "1010101000000000"
+	 *     }
+	 *     10003 -> {
+	 *         "chr22,10003031,A,G" -> "0000000000001111",
+	 *         "chr22,10003213,A,G" -> "0000000000001111"
+	 *     }
+	 * }
+	 */
+	public FileBackedJsonIndexStorage<Integer, ConcurrentHashMap<String, VariantMasks>> mergeChromosomeMask(String chromosome) throws FileNotFoundException {
+		FileBackedJsonIndexStorage<Integer, ConcurrentHashMap<String, VariantMasks>> variantMaskStorage1 = variantStore1.getVariantMaskStorage().get(chromosome);
+		FileBackedJsonIndexStorage<Integer, ConcurrentHashMap<String, VariantMasks>> variantMaskStorage2 = variantStore2.getVariantMaskStorage().get(chromosome);
+
+		FileBackedJsonIndexStorage<Integer, ConcurrentHashMap<String, VariantMasks>> merged = new FileBackedStorageVariantMasksImpl(new File(outputDirectory + chromosome + "masks.bin"));
+		variantMaskStorage1.keys().forEach(key -> {
+			Map<String, VariantMasks> masks1 = variantMaskStorage1.get(key);
+			Map<String, VariantMasks> masks2 = variantMaskStorage2.get(key);
+			if (masks2 == null) {
+				masks2 = Map.of();
+			}
+
+			ConcurrentHashMap<String, VariantMasks> mergedMasks = new ConcurrentHashMap<>();
+			for (Map.Entry<String, VariantMasks> entry : masks1.entrySet()) {
+				VariantMasks variantMasks2 = masks2.get(entry.getKey());
+				if (variantMasks2 == null) {
+					// this will have all null masks, which will result in null when
+					// appended to a null, or be replaced with an empty bitmask otherwise
+					variantMasks2 = new VariantMasks();
+				}
+				mergedMasks.put(entry.getKey(), append(entry.getValue(), variantMasks2));
+			}
+			// Any entry in the second set that is not in the merged set can be merged with an empty variant mask;
+			// if there had been a corresponding entry in set 1, it would have been merged in the previous loop
+			for (Map.Entry<String, VariantMasks> entry : masks2.entrySet()) {
+				if (!mergedMasks.containsKey(entry.getKey())) {
+					mergedMasks.put(entry.getKey(), append(new VariantMasks(), entry.getValue()));
+				}
+			}
+			merged.put(key, mergedMasks);
+		});
+
+		// Buckets that exist only in the second variant store were not covered by the loop above;
+		// merge each of their masks with an empty variant mask and store the result as well
+		variantMaskStorage2.keys().forEach(key -> {
+			if (!variantMaskStorage1.keys().contains(key)) {
+				ConcurrentHashMap<String, VariantMasks> mergedMasks = new ConcurrentHashMap<>();
+				for (Map.Entry<String, VariantMasks> entry : variantMaskStorage2.get(key).entrySet()) {
+					mergedMasks.put(entry.getKey(), append(new VariantMasks(), entry.getValue()));
+				}
+				merged.put(key, mergedMasks);
+			}
+		});
+		return merged;
+	}
+
+	public VariantMasks append(VariantMasks variantMasks1, VariantMasks variantMasks2) {
+		VariantMasks appendedMasks = new VariantMasks();
+		appendedMasks.homozygousMask = appendMask(variantMasks1.homozygousMask, variantMasks2.homozygousMask);
+		appendedMasks.heterozygousMask = appendMask(variantMasks1.heterozygousMask, variantMasks2.heterozygousMask);
+		appendedMasks.homozygousNoCallMask = appendMask(variantMasks1.homozygousNoCallMask, variantMasks2.homozygousNoCallMask);
+		appendedMasks.heterozygousNoCallMask = appendMask(variantMasks1.heterozygousNoCallMask, variantMasks2.heterozygousNoCallMask);
+		return appendedMasks;
+	}
+
+	/**
+	 * Appends one mask to another. This assumes the masks are both padded with '11' on each end
+	 * to prevent overflow issues.
+	 */
+	public BigInteger appendMask(BigInteger mask1, BigInteger mask2) {
+		if (mask1 == null && mask2 == null) {
+			return null;
+		}
+		if (mask1 == null) {
+			mask1 = variantStore1.emptyBitmask();
+		}
+		if (mask2 == null) {
+			mask2 = variantStore2.emptyBitmask();
+		}
+		String binaryMask1 = mask1.toString(2);
+		String binaryMask2 = mask2.toString(2);
+		String appendedString = binaryMask1.substring(0, binaryMask1.length() - 2)
+				+ binaryMask2.substring(2);
+		return new BigInteger(appendedString, 2);
+	}
+}
diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/GenomicDatasetMergerRunner.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/GenomicDatasetMergerRunner.java
new file mode 100644
index 00000000..70565730
--- /dev/null
+++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/GenomicDatasetMergerRunner.java
@@ -0,0 +1,81 @@
+package edu.harvard.hms.dbmi.avillach.hpds.etl.genotype;
+
+import com.google.common.base.Preconditions;
+import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.FileBackedByteIndexedInfoStore;
+import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.VariantMasks;
+import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.VariantStore;
+import edu.harvard.hms.dbmi.avillach.hpds.storage.FileBackedJsonIndexStorage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.zip.GZIPInputStream;
+
+public class GenomicDatasetMergerRunner {
+
+	private static Logger log = LoggerFactory.getLogger(GenomicDatasetMergerRunner.class);
+
+	public static final String INFO_STORE_JAVABIN_SUFFIX = "infoStore.javabin";
+	public static final String VARIANT_SPEC_INDEX_FILENAME = "variantSpecIndex.javabin";
+
+	private static String genomicDirectory1;
+	private static String genomicDirectory2;
+
+	/**
+	 * args[0]: directory containing genomic dataset 1
+	 * args[1]: directory containing genomic dataset 2
+	 * args[2]: output directory
+	 */
+	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
+		if (args.length != 3) {
diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/GenomicDatasetMergerRunner.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/GenomicDatasetMergerRunner.java
new file mode 100644
index 00000000..70565730
--- /dev/null
+++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/GenomicDatasetMergerRunner.java
@@ -0,0 +1,81 @@
+package edu.harvard.hms.dbmi.avillach.hpds.etl.genotype;
+
+import com.google.common.base.Preconditions;
+import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.FileBackedByteIndexedInfoStore;
+import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.VariantMasks;
+import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.VariantStore;
+import edu.harvard.hms.dbmi.avillach.hpds.storage.FileBackedJsonIndexStorage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.zip.GZIPInputStream;
+
+public class GenomicDatasetMergerRunner {
+
+    private static Logger log = LoggerFactory.getLogger(GenomicDatasetMergerRunner.class);
+
+    public static final String INFO_STORE_JAVABIN_SUFFIX = "infoStore.javabin";
+    public static final String VARIANT_SPEC_INDEX_FILENAME = "variantSpecIndex.javabin";
+
+    private static String genomicDirectory1;
+    private static String genomicDirectory2;
+
+    /**
+     * args[0]: directory containing genomic dataset 1
+     * args[1]: directory containing genomic dataset 2
+     * args[2]: output directory
+     */
+    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
+        if (args.length != 3) {
+            throw new IllegalArgumentException("Three arguments must be provided: source directory 1, source directory 2, output directory");
+        }
+        genomicDirectory1 = args[0];
+        genomicDirectory2 = args[1];
+        String outputDirectory = args[2];
+
+        Map<String, FileBackedByteIndexedInfoStore> infoStores1 = loadInfoStores(genomicDirectory1);
+        Map<String, FileBackedByteIndexedInfoStore> infoStores2 = loadInfoStores(genomicDirectory2);
+
+        GenomicDatasetMerger genomicDatasetMerger = new GenomicDatasetMerger(VariantStore.readInstance(genomicDirectory1), VariantStore.readInstance(genomicDirectory2), infoStores1, infoStores2, outputDirectory);
+
+        Map<String, FileBackedJsonIndexStorage<Integer, ConcurrentHashMap<String, VariantMasks>>> mergedChromosomeMasks = genomicDatasetMerger.mergeChromosomeMasks();
+        VariantStore mergedVariantStore = genomicDatasetMerger.mergeVariantStore(mergedChromosomeMasks);
+        Map<String, FileBackedByteIndexedInfoStore> variantIndexes = genomicDatasetMerger.mergeVariantIndexes();
+
+        mergedVariantStore.writeInstance(outputDirectory);
+        variantIndexes.values().forEach(variantIndex -> {
+            variantIndex.write(new File(outputDirectory + variantIndex.column_key + "_" + INFO_STORE_JAVABIN_SUFFIX));
+        });
+    }
+
+    private static Map<String, FileBackedByteIndexedInfoStore> loadInfoStores(String directory) {
+        Map<String, FileBackedByteIndexedInfoStore> infoStores = new HashMap<>();
+        File genomicDataDirectory = new File(directory);
+        if(genomicDataDirectory.exists() && genomicDataDirectory.isDirectory()) {
+            Arrays.stream(genomicDataDirectory.list((file, filename)->{return filename.endsWith(INFO_STORE_JAVABIN_SUFFIX);}))
+                .forEach((String filename)->{
+                    try (
+                        FileInputStream fis = new FileInputStream(directory + filename);
+                        GZIPInputStream gis = new GZIPInputStream(fis);
+                        ObjectInputStream ois = new ObjectInputStream(gis)
+                    ){
+                        log.info("loading " + filename);
+                        FileBackedByteIndexedInfoStore infoStore = (FileBackedByteIndexedInfoStore) ois.readObject();
+                        infoStore.updateStorageDirectory(genomicDataDirectory);
+                        infoStores.put(filename.replace("_" + INFO_STORE_JAVABIN_SUFFIX, ""), infoStore);
+                    } catch (IOException | ClassNotFoundException e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+        }
+        return infoStores;
+    }
+}
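A hypothetical invocation of the runner (the three directories are placeholders, not paths from this PR):

    public class MergeDatasets {
        public static void main(String[] args) throws Exception {
            GenomicDatasetMergerRunner.main(new String[] {
                    "/opt/local/hpds/genomic1/", // source dataset 1
                    "/opt/local/hpds/genomic2/", // source dataset 2
                    "/opt/local/hpds/merged/"    // output directory
            });
        }
    }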
diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/MultialleleCounter.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/MultialleleCounter.java
deleted file mode 100644
index 758e772f..00000000
--- a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/MultialleleCounter.java
+++ /dev/null
@@ -1,53 +0,0 @@
-package edu.harvard.hms.dbmi.avillach.hpds.etl.genotype;
-
-import java.io.*;
-import java.util.ArrayList;
-import java.util.TreeSet;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.zip.GZIPInputStream;
-
-import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.VariantMasks;
-import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.VariantSpec;
-import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.VariantStore;
-import edu.harvard.hms.dbmi.avillach.hpds.storage.FileBackedByteIndexedStorage;
-
-public class MultialleleCounter {
-
-    public static void main(String[] args) throws ClassNotFoundException, FileNotFoundException, IOException {
-        try(FileInputStream fis = new FileInputStream("/opt/local/hpds/all/variantStore.javabin");
-                ){
-            VariantStore variantStore = (VariantStore) new ObjectInputStream(new GZIPInputStream(fis)).readObject();
-            variantStore.open();
-            for(String contig : variantStore.getVariantMaskStorage().keySet()) {
-                System.out.println("Starting contig : " + contig);
-                FileBackedByteIndexedStorage<Integer, ConcurrentHashMap<String, VariantMasks>>
-                currentChromosome = variantStore.getVariantMaskStorage().get(contig);
-                currentChromosome.keys().parallelStream().forEach((offsetBucket)->{
-                    System.out.println("Starting bucket : " + offsetBucket);
-                    ConcurrentHashMap<String, VariantMasks> maskMap;
-                    try {
-                        maskMap = currentChromosome.get(offsetBucket);
-
-                        TreeSet<VariantSpec> variantsSortedByOffset = new TreeSet<VariantSpec>();
-                        for(String variant : maskMap.keySet()) {
-                            variantsSortedByOffset.add(new VariantSpec(variant));
-                        }
-                        ArrayList<VariantSpec> variantsSortedByOffsetList = new ArrayList<VariantSpec>(variantsSortedByOffset);
-                        for(int y = 1; y < variantsSortedByOffsetList.size(); y++) {
diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/NewVCFLoader.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/NewVCFLoader.java
--- a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/NewVCFLoader.java
+++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/NewVCFLoader.java
     private static HashMap<String, char[][]> zygosityMaskStrings;
-    private static TreeMap<String, FileBackedByteIndexedStorage<Integer, ConcurrentHashMap<String, VariantMasks>>> variantMaskStorage = new TreeMap<>();
+    private static TreeMap<String, FileBackedStorageVariantMasksImpl> variantMaskStorage = new TreeMap<>();
     private static long startTime;
@@ -152,7 +158,7 @@ private static void loadVCFs(File indexFile) throws IOException {
                     try {
                         walker.nextLine();
                     } catch (IOException e) {
-                        logger.error("Error reading nextline of VCF file [" + walker.vcfIndexLine.vcfPath + "]", e);
+                        throw new UncheckedIOException(e);
                     }
                 });
                 zygosityMaskStrings.put(currentSpecNotation, maskStringsForVariantSpec[0]);
@@ -165,8 +171,6 @@ private static void loadVCFs(File indexFile) throws IOException {
         shutdownChunkWriteExecutor();
 
-        saveVariantStore(store, variantMaskStorage);
-
         saveInfoStores();
 
         splitInfoStoresByColumn();
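The loadVCFs change above swaps a log-and-continue catch for an unchecked rethrow, a pattern this patch applies throughout the loader. A minimal sketch of the idiom (the readOne helper and path list are illustrative, not from the PR):

    import java.io.IOException;
    import java.io.UncheckedIOException;
    import java.util.List;

    class RethrowSketch {
        // A checked IOException cannot propagate out of a forEach lambda, so it is
        // wrapped in UncheckedIOException and fails fast instead of being swallowed.
        static void readAll(List<String> paths) {
            paths.forEach(path -> {
                try {
                    readOne(path);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        }

        static void readOne(String path) throws IOException {
            // placeholder for real I/O
        }
    }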
@@ -185,22 +189,18 @@ private static void loadVCFs(File indexFile) throws IOException {
                 chunkIds.addAll(chromosomeStorage.keys());
                 for (Integer chunkId : chunkIds) {
                     for (String variantSpec : chromosomeStorage.get(chunkId).keySet()) {
-                        try {
-                            count[0]++;
-                            VariantMasks variantMasks = chromosomeStorage.get(chunkId).get(variantSpec);
-                            if (variantMasks != null) {
-                                BigInteger heterozygousMask = variantMasks.heterozygousMask;
-                                String heteroIdList = sampleIdsForMask(allSampleIds, heterozygousMask);
-                                BigInteger homozygousMask = variantMasks.homozygousMask;
-                                String homoIdList = sampleIdsForMask(allSampleIds, homozygousMask);
-
-                                if (!heteroIdList.isEmpty() && heteroIdList.length() < 1000)
-                                    logger.debug(variantSpec + " : heterozygous : " + heteroIdList);
-                                if (!homoIdList.isEmpty() && homoIdList.length() < 1000)
-                                    logger.debug(variantSpec + " : homozygous : " + homoIdList);
-                            }
-                        } catch (IOException e) {
-                            logger.error("an error occurred", e);
+                        count[0]++;
+                        VariantMasks variantMasks = chromosomeStorage.get(chunkId).get(variantSpec);
+                        if (variantMasks != null) {
+                            BigInteger heterozygousMask = variantMasks.heterozygousMask;
+                            String heteroIdList = sampleIdsForMask(allSampleIds, heterozygousMask);
+                            BigInteger homozygousMask = variantMasks.homozygousMask;
+                            String homoIdList = sampleIdsForMask(allSampleIds, homozygousMask);
+
+                            if (!heteroIdList.isEmpty() && heteroIdList.length() < 1000)
+                                logger.debug(variantSpec + " : heterozygous : " + heteroIdList);
+                            if (!homoIdList.isEmpty() && homoIdList.length() < 1000)
+                                logger.debug(variantSpec + " : homozygous : " + homoIdList);
                         }
                     }
                     if (count[0] > 50)
@@ -211,22 +211,18 @@ private static void loadVCFs(File indexFile) throws IOException {
                 for (int x = chunkIds.size() - 1; x > 0; x--) {
                     int chunkId = chunkIds.get(x);
                     chromosomeStorage.get(chunkId).keySet().forEach((variantSpec) -> {
-                        try {
-                            count[0]++;
-                            VariantMasks variantMasks = chromosomeStorage.get(chunkId).get(variantSpec);
-                            if (variantMasks != null) {
-                                BigInteger heterozygousMask = variantMasks.heterozygousMask;
-                                String heteroIdList = sampleIdsForMask(allSampleIds, heterozygousMask);
-                                BigInteger homozygousMask = variantMasks.homozygousMask;
-                                String homoIdList = sampleIdsForMask(allSampleIds, homozygousMask);
-
-                                if (!heteroIdList.isEmpty() && heteroIdList.length() < 1000)
-                                    logger.debug(variantSpec + " : heterozygous : " + heteroIdList);
-                                if (!homoIdList.isEmpty() && homoIdList.length() < 1000)
-                                    logger.debug(variantSpec + " : homozygous : " + homoIdList);
-                            }
-                        } catch (IOException e) {
-                            logger.error("an error occurred", e);
+                        count[0]++;
+                        VariantMasks variantMasks = chromosomeStorage.get(chunkId).get(variantSpec);
+                        if (variantMasks != null) {
+                            BigInteger heterozygousMask = variantMasks.heterozygousMask;
+                            String heteroIdList = sampleIdsForMask(allSampleIds, heterozygousMask);
+                            BigInteger homozygousMask = variantMasks.homozygousMask;
+                            String homoIdList = sampleIdsForMask(allSampleIds, homozygousMask);
+
+                            if (!heteroIdList.isEmpty() && heteroIdList.length() < 1000)
+                                logger.debug(variantSpec + " : heterozygous : " + heteroIdList);
+                            if (!homoIdList.isEmpty() && homoIdList.length() < 1000)
+                                logger.debug(variantSpec + " : homozygous : " + homoIdList);
                         }
                     });
                     if (count[0] > 50)
@@ -235,6 +231,9 @@ private static void loadVCFs(File indexFile) throws IOException {
             }
         }
     }
+
+    store.setVariantSpecIndex(variantIndexBuilder.getVariantSpecIndex().toArray(new String[0]));
+    saveVariantStore(store, variantMaskStorage);
 }
 
 private static String sampleIdsForMask(String[] sampleIds, BigInteger heterozygousMask) {
@@ -279,7 +278,7 @@ private static void flipChunk(String lastContigProcessed, int lastChunkProcessed
         }
         if (!currentContig.contentEquals(lastContigProcessed) || currentChunk > lastChunkProcessed || isLastChunk) {
             // flip chunk
-            TreeMap<String, FileBackedByteIndexedStorage<Integer, ConcurrentHashMap<String, VariantMasks>>> variantMaskStorage_f = variantMaskStorage;
+            TreeMap<String, FileBackedStorageVariantMasksImpl> variantMaskStorage_f = variantMaskStorage;
             HashMap<String, char[][]> zygosityMaskStrings_f = zygosityMaskStrings;
             String lastContigProcessed_f = lastContigProcessed;
             int lastChunkProcessed_f = lastChunkProcessed;
@@ -287,17 +286,17 @@ private static void flipChunk(String lastContigProcessed, int lastChunkProcessed
                 try {
                     if (variantMaskStorage_f.get(lastContigProcessed_f) == null) {
                         String fileName = lastContigProcessed_f + "masks.bin";
-                        if ("chr".startsWith(fileName)) {
+                        if (!fileName.startsWith("chr")) {
                             fileName = "chr" + fileName;
                         }
+
                         variantMaskStorage_f.put(lastContigProcessed_f,
-                                new FileBackedByteIndexedStorage(Integer.class, ConcurrentHashMap.class,
-                                        new File(storageDir, fileName)));
+                                new FileBackedStorageVariantMasksImpl(new File(storageDir, fileName)));
                     }
                     variantMaskStorage_f.get(lastContigProcessed_f).put(lastChunkProcessed_f,
                             convertLoadingMapToMaskMap(zygosityMaskStrings_f));
                 } catch (IOException e) {
-                    logger.error("an error occurred", e);
+                    throw new UncheckedIOException(e);
                 }
             });
             if (lastChunkProcessed % 100 == 0) {
@@ -309,7 +308,7 @@
     }
 
     private static void saveVariantStore(VariantStore store,
-            TreeMap<String, FileBackedByteIndexedStorage<Integer, ConcurrentHashMap<String, VariantMasks>>> variantMaskStorage)
+            TreeMap<String, FileBackedStorageVariantMasksImpl> variantMaskStorage)
             throws IOException, FileNotFoundException {
         store.setVariantMaskStorage(variantMaskStorage);
         for (FileBackedByteIndexedStorage<Integer, ConcurrentHashMap<String, VariantMasks>> storage : variantMaskStorage
                 .values()) {
             if (storage != null)
                 storage.complete();
         }
-        try (FileOutputStream fos = new FileOutputStream(new File(storageDir, "variantStore.javabin"));
-                GZIPOutputStream gzos = new GZIPOutputStream(fos);
-                ObjectOutputStream oos = new ObjectOutputStream(gzos);) {
-            oos.writeObject(store);
-        }
+        store.writeInstance(storageDirStr);
         logger.debug("Done saving variant masks.");
     }
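The filename guard fixed in flipChunk above had its receiver inverted: the old check asked whether the literal "chr" starts with the file name, which is false for any real mask file, so the prefix was never applied. Worked values (illustrative):

    boolean old1 = "chr".startsWith("1masks.bin");     // false: prefix never added (the bug)
    boolean new1 = !"1masks.bin".startsWith("chr");    // true:  becomes "chr1masks.bin"
    boolean new2 = !"chrXmasks.bin".startsWith("chr"); // false: already prefixed, left alone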
@@ -340,8 +335,8 @@ public static void splitInfoStoresByColumn() throws FileNotFoundException, IOExc
         logger.debug("Splitting" + (System.currentTimeMillis() - startTime) + " seconds");
         try {
             VCFPerPatientInfoStoreSplitter.splitAll(storageDir, new File(mergedDirStr));
-        } catch (ClassNotFoundException | InterruptedException | ExecutionException e) {
-            logger.error("Error splitting infostore's by column", e);
+        } catch (ClassNotFoundException | ExecutionException | InterruptedException e) {
+            throw new RuntimeException(e);
         }
         logger.debug("Split" + (System.currentTimeMillis() - startTime) + " seconds");
     }
@@ -350,8 +345,8 @@ public static void convertInfoStoresToByteIndexed() throws FileNotFoundException
         logger.debug("Converting" + (System.currentTimeMillis() - startTime) + " seconds");
         try {
             VCFPerPatientInfoStoreToFBBIISConverter.convertAll(mergedDirStr, storageDirStr);
-        } catch (ClassNotFoundException | InterruptedException | ExecutionException e) {
-            logger.error("Error converting infostore to byteindexed", e);
+        } catch (ClassNotFoundException | ExecutionException | InterruptedException e) {
+            throw new RuntimeException(e);
         }
         logger.debug("Converted " + ((System.currentTimeMillis() - startTime) / 1000) + " seconds");
     }
@@ -370,7 +365,7 @@ private static void shutdownChunkWriteExecutor() {
 
     private static ConcurrentHashMap<String, VariantMasks> convertLoadingMapToMaskMap(
             HashMap<String, char[][]> zygosityMaskStrings_f) {
-        ConcurrentHashMap<String, VariantMasks> maskMap = new ConcurrentHashMap<>();
+        ConcurrentHashMap<String, VariantMasks> maskMap = new ConcurrentHashMap<>(zygosityMaskStrings_f.size());
         zygosityMaskStrings_f.entrySet().parallelStream().forEach((entry) -> {
             maskMap.put(entry.getKey(), new VariantMasks(entry.getValue()));
         });
@@ -449,7 +444,7 @@ public void updateRecords(char[][] zygosityMaskStrings, ConcurrentHashMap<Strin
             infoStores.values().parallelStream().forEach((infoStore) -> {
-                infoStore.processRecord(currentSpecNotation, infoColumns);
+                infoStore.processRecord(variantIndex, infoColumns);
             });
         }
 
@@ -650,7 +646,7 @@ private static List<VCFIndexLine> parseVCFIndex(File vcfIndexFile) {
                 }
             });
         } catch (IOException e) {
-            throw new RuntimeException("IOException caught parsing vcfIndexFile", e);
+            throw new UncheckedIOException("IOException caught parsing vcfIndexFile", e);
         }
         return new ArrayList<>(vcfSet);
     }
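The convertLoadingMapToMaskMap hunk above presizes the target map before a parallel stream fills it, which avoids rehashing during concurrent puts. A sketch of the same shape, assuming only the VariantMasks(char[][]) constructor visible in this patch:

    HashMap<String, char[][]> zygosityMaskStrings = new HashMap<>(); // filled during VCF parsing
    ConcurrentHashMap<String, VariantMasks> maskMap =
            new ConcurrentHashMap<>(zygosityMaskStrings.size());
    zygosityMaskStrings.entrySet().parallelStream()
            .forEach(entry -> maskMap.put(entry.getKey(), new VariantMasks(entry.getValue())));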
diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/VCFPerPatientInfoStoreToFBBIISConverter.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/VCFPerPatientInfoStoreToFBBIISConverter.java
index 2b2b6053..cdfe3cdd 100644
--- a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/VCFPerPatientInfoStoreToFBBIISConverter.java
+++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/VCFPerPatientInfoStoreToFBBIISConverter.java
@@ -5,7 +5,6 @@
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.zip.GZIPInputStream;
-import java.util.zip.GZIPOutputStream;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,7 +40,7 @@ public static void convert(File outputFolder, File file) {
             if (store.allValues.size() > 0) {
                 FileBackedByteIndexedInfoStore fbbiis = new FileBackedByteIndexedInfoStore(outputFolder, store);
 
-                writeStore(new File(outputFolder, file.getName()), fbbiis);
+                fbbiis.write(new File(outputFolder, file.getName()));
                 logger.info("Completed converting InfoStore file: " + file.getAbsolutePath());
             } else {
                 logger.info("Skipping empty InfoStore file: " + file.getAbsolutePath() + "");
@@ -55,18 +54,4 @@ public static void convert(File outputFolder, File file) {
         }
     }
 
-    private static synchronized void writeStore(File outputFile, FileBackedByteIndexedInfoStore fbbiis)
-            throws FileNotFoundException, IOException {
-        FileOutputStream fos = new FileOutputStream(outputFile);
-        GZIPOutputStream gzos = new GZIPOutputStream(fos);
-        ObjectOutputStream oos = new ObjectOutputStream(gzos);
-        oos.writeObject(fbbiis);
-        oos.flush();
-        oos.close();
-        gzos.flush();
-        gzos.close();
-        fos.flush();
-        fos.close();
-    }
-
 }
\ No newline at end of file
diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/VariantCounter.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/VariantCounter.java
deleted file mode 100644
index baee8fd6..00000000
--- a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/VariantCounter.java
+++ /dev/null
@@ -1,41 +0,0 @@
-package edu.harvard.hms.dbmi.avillach.hpds.etl.genotype;
-
-import java.io.*;
-import java.util.ArrayList;
-import java.util.TreeSet;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.zip.GZIPInputStream;
-
-import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.VariantMasks;
-import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.VariantSpec;
-import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.VariantStore;
-import edu.harvard.hms.dbmi.avillach.hpds.storage.FileBackedByteIndexedStorage;
-
-public class VariantCounter {
-
-    public static void main(String[] args) throws ClassNotFoundException, FileNotFoundException, IOException {
-        try(FileInputStream fis = new FileInputStream("/opt/local/hpds/all/variantStore.javabin");
-                ){
-            VariantStore variantStore = (VariantStore) new ObjectInputStream(new GZIPInputStream(fis)).readObject();
-            variantStore.open();
-            for(String contig : variantStore.getVariantMaskStorage().keySet()) {
-                int[] countOfVariants = {0};
-                FileBackedByteIndexedStorage<Integer, ConcurrentHashMap<String, VariantMasks>>
-                currentChromosome = variantStore.getVariantMaskStorage().get(contig);
-                currentChromosome.keys().parallelStream().forEach((offsetBucket)->{
-                    ConcurrentHashMap<String, VariantMasks> maskMap;
-                    try {
-                        maskMap = currentChromosome.get(offsetBucket);
-                        if(maskMap!=null) {
-                            countOfVariants[0]+=maskMap.size();
-                        }
-
-                    } catch (IOException e) {
-                        throw new UncheckedIOException(e);
-                    }
-                });
-                System.out.println(contig + "," + countOfVariants[0]);
-            }
-        }
-    }
-}
diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/VariantIndexBuilder.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/VariantIndexBuilder.java
new file mode 100644
index 00000000..71fec959
--- /dev/null
+++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/VariantIndexBuilder.java
@@ -0,0 +1,30 @@
+package edu.harvard.hms.dbmi.avillach.hpds.etl.genotype;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class VariantIndexBuilder {
+
+    private static Logger logger = LoggerFactory.getLogger(VariantIndexBuilder.class);
+
+    private final LinkedList<String> variantSpecIndex = new LinkedList<>();
+    private final Map<String, Integer> variantSpecToIndexMap = new ConcurrentHashMap<>();
+
+    public synchronized Integer getIndex(String variantSpec) {
+        Integer variantIndex = variantSpecToIndexMap.get(variantSpec);
+        if (variantIndex == null) {
+            variantIndex = variantSpecIndex.size();
+            variantSpecIndex.add(variantSpec);
+            variantSpecToIndexMap.put(variantSpec, variantIndex);
+        }
+        return variantIndex;
+    }
+
+    public LinkedList<String> getVariantSpecIndex() {
+        return variantSpecIndex;
+    }
+}
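VariantIndexBuilder above hands each distinct variant spec a dense, stable integer id; getIndex is synchronized so concurrent loader threads cannot issue duplicate ids. Hypothetical usage (the spec strings are made up):

    VariantIndexBuilder builder = new VariantIndexBuilder();
    Integer first  = builder.getIndex("4,9856624,CAAA,C"); // 0: first new spec
    Integer second = builder.getIndex("4,9856624,CA,C");   // 1: next new spec
    Integer again  = builder.getIndex("4,9856624,CAAA,C"); // 0: already indexed
    String[] specsById = builder.getVariantSpecIndex().toArray(new String[0]);
    // specsById[first] is "4,9856624,CAAA,C"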
diff --git a/etl/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/BucketIndexBySampleTest.java b/etl/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/BucketIndexBySampleTest.java
index 69fbfae3..b7474ef9 100644
--- a/etl/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/BucketIndexBySampleTest.java
+++ b/etl/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/BucketIndexBySampleTest.java
@@ -75,12 +75,8 @@ public class BucketIndexBySampleTest {
     public static void initializeBinfile() throws Exception {
         //load variant data
         NewVCFLoader.main(new String[] {VCF_INDEX_FILE, STORAGE_DIR, MERGED_DIR});
-
-        //read in variantStore object created by VCFLoader
-        ObjectInputStream ois = new ObjectInputStream(new GZIPInputStream(new FileInputStream(STORAGE_DIR + "variantStore.javabin")));
-        variantStore = (VariantStore) ois.readObject();
-        ois.close();
-        variantStore.open();
+
+        VariantStore variantStore = VariantStore.readInstance(STORAGE_DIR);
 
         //now use that object to initialize the BucketIndexBySample object
         bucketIndexBySample = new BucketIndexBySample(variantStore, STORAGE_DIR);
diff --git a/pom.xml b/pom.xml
index ab958be4..df3ca3fb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
 	<modelVersion>4.0.0</modelVersion>
 	<groupId>edu.harvard.hms.dbmi.avillach.hpds</groupId>
 	<artifactId>pic-sure-hpds</artifactId>
-	<version>1.0-SNAPSHOT</version>
+	<version>2.0.0-SNAPSHOT</version>
 	<packaging>pom</packaging>
 	<name>pic-sure-hpds</name>
diff --git a/processing/pom.xml b/processing/pom.xml
index e1da83c2..f989a9d9 100644
--- a/processing/pom.xml
+++ b/processing/pom.xml
@@ -5,7 +5,7 @@
 	<parent>
 		<artifactId>pic-sure-hpds</artifactId>
 		<groupId>edu.harvard.hms.dbmi.avillach.hpds</groupId>
-		<version>1.0-SNAPSHOT</version>
+		<version>2.0.0-SNAPSHOT</version>
 	</parent>
 
 	<artifactId>processing</artifactId>
diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/AbstractProcessor.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/AbstractProcessor.java
index e2f6c44a..95258704 100644
--- a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/AbstractProcessor.java
+++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/AbstractProcessor.java
@@ -126,6 +126,7 @@ public AbstractProcessor(PhenotypeMetaStore phenotypeMetaStore, VariantService v
 				){
 					log.info("loading " + filename);
 					FileBackedByteIndexedInfoStore infoStore = (FileBackedByteIndexedInfoStore) ois.readObject();
+					infoStore.updateStorageDirectory(genomicDataDirectory);
 					infoStores.put(filename.replace("_infoStore.javabin", ""), infoStore);
 					ois.close();
 				} catch (IOException e) {
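The test above now loads through VariantStore.readInstance instead of hand-rolled deserialization. A minimal round-trip sketch using only the methods visible in this patch (the directories are placeholders):

    public class VariantStoreRoundTrip {
        public static void main(String[] args) throws Exception {
            VariantStore store = VariantStore.readInstance("/opt/local/hpds/all/");
            // ... query masks, merge, etc. ...
            store.writeInstance("/opt/local/hpds/merged/");
        }
    }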
diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/VariantIndexCache.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/VariantIndexCache.java
index 2ca6cdfe..a08ae2fd 100644
--- a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/VariantIndexCache.java
+++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/VariantIndexCache.java
@@ -67,23 +67,21 @@ public VariantIndex load(String infoColumn_valueKey) throws IOException {
 			log.debug("Calculating value for cache for key " + infoColumn_valueKey);
 			long time = System.currentTimeMillis();
 			String[] column_and_value = infoColumn_valueKey.split(COLUMN_AND_KEY_DELIMITER);
-			String[] variantArray = infoStores.get(column_and_value[0]).getAllValues().get(column_and_value[1]);
+			Integer[] variantIndexIntArray = infoStores.get(column_and_value[0]).getAllValues().get(column_and_value[1]);
 
-			if ((double)variantArray.length / (double)variantIndex.length < MAX_SPARSE_INDEX_RATIO ) {
+			if ((double)variantIndexIntArray.length / (double)variantIndex.length < MAX_SPARSE_INDEX_RATIO ) {
 				Set<Integer> variantIds = new HashSet<>();
-				for(String variantSpec : variantArray) {
-					int variantIndexArrayIndex = Arrays.binarySearch(variantIndex, variantSpec);
-					variantIds.add(variantIndexArrayIndex);
+				for(Integer variantIndex : variantIndexIntArray) {
+					variantIds.add(variantIndex);
 				}
 				return new SparseVariantIndex(variantIds);
 			} else {
 				boolean[] variantIndexArray = new boolean[variantIndex.length];
 				int x = 0;
-				for(String variantSpec : variantArray) {
-					int variantIndexArrayIndex = Arrays.binarySearch(variantIndex, variantSpec);
+				for(Integer variantIndex : variantIndexIntArray) {
 					// todo: shouldn't this be greater than or equal to 0? 0 is a valid index
-					if (variantIndexArrayIndex > 0) {
-						variantIndexArray[variantIndexArrayIndex] = true;
+					if (variantIndex > 0) {
+						variantIndexArray[variantIndex] = true;
 					}
 				}
 				log.debug("Cache value for key " + infoColumn_valueKey + " calculated in " + (System.currentTimeMillis() - time) + " ms");
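The load method above chooses a representation by density: if the matching ids are a small fraction of the full variant index they go into a hash set, otherwise into a bit-per-variant array. Schematic sketch (MAX_SPARSE_INDEX_RATIO is defined elsewhere in VariantIndexCache; 0.1 is an assumed value):

    double maxSparseIndexRatio = 0.1;    // stand-in for MAX_SPARSE_INDEX_RATIO
    Integer[] matchingIds = {3, 17, 42}; // variant ids for one info-column value
    int totalVariants = 1_000_000;       // length of the full variant index
    if ((double) matchingIds.length / totalVariants < maxSparseIndexRatio) {
        // sparse: keep the ids in a Set<Integer> (SparseVariantIndex)
    } else {
        // dense: set one bit per matching id in a boolean[totalVariants]
    }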
Skipping variant Indexing"); - return; - } - int[] numVariants = {0}; - HashMap contigMap = new HashMap<>(); - - ExecutorService ex = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); - variantStore.getVariantMaskStorage().entrySet().forEach(entry->{ - ex.submit(()->{ - int numVariantsInContig = 0; - FileBackedByteIndexedStorage> storage = entry.getValue(); - HashMap bucketMap = new HashMap<>(); - log.info("Creating bucketMap for contig " + entry.getKey()); - for(Integer bucket: storage.keys()){ - try { - ConcurrentHashMap bucketStorage = storage.get(bucket); - numVariantsInContig += bucketStorage.size(); - bucketMap.put(bucket, bucketStorage.keySet().toArray(new String[0])); - } catch (IOException e) { - log.error("an error occurred", e); - } - }; - log.info("Completed bucketMap for contig " + entry.getKey()); - String[] variantsInContig = new String[numVariantsInContig]; - int current = 0; - for(String[] bucketList : bucketMap.values()) { - System.arraycopy(bucketList, 0, variantsInContig, current, bucketList.length); - current = current + bucketList.length; - } - bucketMap.clear(); - synchronized(numVariants) { - log.info("Found " + variantsInContig.length + " variants in contig " + entry.getKey() + "."); - contigMap.put(entry.getKey(), variantsInContig); - numVariants[0] += numVariantsInContig; - } - }); - }); - ex.shutdown(); - while(!ex.awaitTermination(10, TimeUnit.SECONDS)) { - Thread.sleep(20000); - log.info("Awaiting completion of variant index"); + return new String[0]; } - log.info("Found " + numVariants[0] + " total variants."); - - variantIndex = new String[numVariants[0]]; + String[] variantIndex = VariantStore.loadVariantIndexFromFile(genomicDataDirectory); - int current = 0; - for(String[] contigList : contigMap.values()) { - System.arraycopy(contigList, 0, variantIndex, current, contigList.length); - current = current + contigList.length; - } - contigMap.clear(); - - Arrays.sort(variantIndex); log.info("Index created with " + variantIndex.length + " total variants."); + return variantIndex; } /** @@ -141,9 +104,9 @@ private void loadGenomicCacheFiles() throws FileNotFoundException, IOException, if(variantIndex==null) { if(!new File(VARIANT_INDEX_FBBIS_FILE).exists()) { log.info("Creating new " + VARIANT_INDEX_FBBIS_FILE); - populateVariantIndex(); + this.variantIndex = loadVariantIndex(); FileBackedByteIndexedStorage fbbis = - new FileBackedByteIndexedStorage(Integer.class, String[].class, new File(VARIANT_INDEX_FBBIS_STORAGE_FILE)); + new FileBackedJavaIndexedStorage<>(Integer.class, String[].class, new File(VARIANT_INDEX_FBBIS_STORAGE_FILE)); try (ObjectOutputStream oos = new ObjectOutputStream(new GZIPOutputStream(new FileOutputStream(VARIANT_INDEX_FBBIS_FILE))); ){ @@ -182,17 +145,10 @@ private void loadGenomicCacheFiles() throws FileNotFoundException, IOException, for( int i = 0; i < bucketCount; i++) { final int _i = i; - ex.submit(new Runnable() { - @Override - public void run() { - try { - String[] variantIndexBucket = indexStore.get(_i); - System.arraycopy(variantIndexBucket, 0, _varaiantIndex2, (_i * VARIANT_INDEX_BLOCK_SIZE), variantIndexBucket.length); - log.info("loaded " + (_i * VARIANT_INDEX_BLOCK_SIZE) + " block"); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } + ex.submit(() -> { + String[] variantIndexBucket = indexStore.get(_i); + System.arraycopy(variantIndexBucket, 0, _varaiantIndex2, (_i * VARIANT_INDEX_BLOCK_SIZE), variantIndexBucket.length); + log.info("loaded " + (_i * VARIANT_INDEX_BLOCK_SIZE) + " 
block"); }); } objectInputStream.close(); diff --git a/processing/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/processing/AbstractProcessorTest.java b/processing/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/processing/AbstractProcessorTest.java index e22bea5e..9a9e63b4 100644 --- a/processing/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/processing/AbstractProcessorTest.java +++ b/processing/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/processing/AbstractProcessorTest.java @@ -42,12 +42,12 @@ public class AbstractProcessorTest { @Before public void setup() { FileBackedByteIndexedInfoStore mockInfoStore = mock(FileBackedByteIndexedInfoStore.class); - FileBackedByteIndexedStorage mockIndexedStorage = mock(FileBackedByteIndexedStorage.class); + FileBackedByteIndexedStorage mockIndexedStorage = mock(FileBackedByteIndexedStorage.class); when(mockIndexedStorage.keys()).thenReturn(new HashSet<>(EXAMPLE_GENES_WITH_VARIANT)); when(mockInfoStore.getAllValues()).thenReturn(mockIndexedStorage); FileBackedByteIndexedInfoStore mockInfoStore2 = mock(FileBackedByteIndexedInfoStore.class); - FileBackedByteIndexedStorage mockIndexedStorage2 = mock(FileBackedByteIndexedStorage.class); + FileBackedByteIndexedStorage mockIndexedStorage2 = mock(FileBackedByteIndexedStorage.class); when(mockIndexedStorage2.keys()).thenReturn(new HashSet<>(EXAMPLE_VARIANT_SEVERITIES)); when(mockInfoStore2.getAllValues()).thenReturn(mockIndexedStorage2); diff --git a/service/pom.xml b/service/pom.xml index 734e3a94..728b3af3 100644 --- a/service/pom.xml +++ b/service/pom.xml @@ -5,7 +5,7 @@ pic-sure-hpds edu.harvard.hms.dbmi.avillach.hpds - 1.0-SNAPSHOT + 2.0.0-SNAPSHOT service diff --git a/war/pom.xml b/war/pom.xml index e71e6efa..8decd8b5 100644 --- a/war/pom.xml +++ b/war/pom.xml @@ -6,7 +6,7 @@ pic-sure-hpds edu.harvard.hms.dbmi.avillach.hpds - 1.0-SNAPSHOT + 2.0.0-SNAPSHOT hpds-war war