diff --git a/client-api/pom.xml b/client-api/pom.xml
index ef02dcbb..3746ec8e 100644
--- a/client-api/pom.xml
+++ b/client-api/pom.xml
@@ -4,12 +4,12 @@
 		<artifactId>pic-sure-hpds</artifactId>
 		<groupId>edu.harvard.hms.dbmi.avillach.hpds</groupId>
-		<version>1.0-SNAPSHOT</version>
+		<version>2.0.0-SNAPSHOT</version>
 	</parent>
 	<groupId>edu.harvard.hms.dbmi.avillach.hpds</groupId>
 	<artifactId>client-api</artifactId>
-	<version>1.0-SNAPSHOT</version>
+	<version>2.0.0-SNAPSHOT</version>
 	<name>client-api</name>
diff --git a/common/pom.xml b/common/pom.xml
index 206c8a84..a92292d4 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -6,7 +6,7 @@
 		<artifactId>pic-sure-hpds</artifactId>
 		<groupId>edu.harvard.hms.dbmi.avillach.hpds</groupId>
-		<version>1.0-SNAPSHOT</version>
+		<version>2.0.0-SNAPSHOT</version>
 	</parent>
 	<artifactId>common</artifactId>
@@ -21,5 +21,13 @@
 		<dependency>
 			<groupId>com.google.guava</groupId>
 			<artifactId>guava</artifactId>
 		</dependency>
+		<dependency>
+			<groupId>org.codehaus.jackson</groupId>
+			<artifactId>jackson-core-asl</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.codehaus.jackson</groupId>
+			<artifactId>jackson-mapper-asl</artifactId>
+		</dependency>
diff --git a/common/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/storage/FileBackedByteIndexedStorage.java b/common/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/storage/FileBackedByteIndexedStorage.java
index 80db84d8..1ecc466e 100644
--- a/common/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/storage/FileBackedByteIndexedStorage.java
+++ b/common/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/storage/FileBackedByteIndexedStorage.java
@@ -1,28 +1,17 @@
package edu.harvard.hms.dbmi.avillach.hpds.storage;
-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.RandomAccessFile;
-import java.io.Serializable;
+import java.io.*;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
-import java.util.zip.GZIPInputStream;
-import java.util.zip.GZIPOutputStream;
-import org.apache.commons.io.output.ByteArrayOutputStream;
-
-public class FileBackedByteIndexedStorage<K, V> implements Serializable {
+public abstract class FileBackedByteIndexedStorage<K, V> implements Serializable {
 	private static final long serialVersionUID = -7297090745384302635L;
-	private transient RandomAccessFile storage;
-	private ConcurrentHashMap<K, Long[]> index;
-	private File storageFile;
-	private boolean completed = false;
-	private Long maxStorageSize; //leave this in to not break serialization
+	protected transient RandomAccessFile storage;
+	protected ConcurrentHashMap<K, Long[]> index;
+	protected File storageFile;
+	protected boolean completed = false;
+
 	public FileBackedByteIndexedStorage(Class<K> keyClass, Class<V> valueClass, File storageFile) throws FileNotFoundException {
 		this.index = new ConcurrentHashMap<>();
@@ -30,15 +19,34 @@ public FileBackedByteIndexedStorage(Class<K> keyClass, Class<V> valueClass, File
this.storage = new RandomAccessFile(this.storageFile, "rw");
}
+ public void updateStorageDirectory(File storageDirectory) {
+ if (!storageDirectory.isDirectory()) {
+ throw new IllegalArgumentException("storageDirectory is not a directory");
+ }
+		String currentStorageFilename = storageFile.getName();
+		storageFile = new File(storageDirectory, currentStorageFilename);
+ }
+
 	public Set<K> keys(){
return index.keySet();
}
- public void put(K key, V value) throws IOException {
+ public void put(K key, V value) {
if(completed) {
throw new RuntimeException("A completed FileBackedByteIndexedStorage cannot be modified.");
}
- Long[] recordIndex = store(value);
+ Long[] recordIndex;
+ try (ByteArrayOutputStream out = writeObject(value)) {
+ recordIndex = new Long[2];
+ synchronized (storage) {
+ storage.seek(storage.length());
+ recordIndex[0] = storage.getFilePointer();
+ storage.write(out.toByteArray());
+ recordIndex[1] = storage.getFilePointer() - recordIndex[0];
+ }
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
index.put(key, recordIndex);
}
@@ -63,60 +71,44 @@ public void complete() {
this.completed = true;
}
- public boolean isComplete() {
- return this.completed;
- }
-
- private Long[] store(V value) throws IOException {
-
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- ObjectOutputStream oos = new ObjectOutputStream(new GZIPOutputStream(out));
- oos.writeObject(value);
- oos.flush();
- oos.close();
-
- Long[] recordIndex = new Long[2];
- synchronized(storage) {
- storage.seek(storage.length());
- recordIndex[0] = storage.getFilePointer();
- storage.write(out.toByteArray());
- recordIndex[1] = storage.getFilePointer() - recordIndex[0];
-// maxStorageSize = storage.getFilePointer();
- }
- return recordIndex;
- }
-
- public V get(K key) throws IOException {
- if(this.storage==null) {
+ public V get(K key) {
+ try {
+ // todo: make this class immutable and remove this lock/check altogether
synchronized(this) {
- this.open();
- }
- }
- Long[] offsetsInStorage = index.get(key);
- if(offsetsInStorage != null) {
- Long offsetInStorage = index.get(key)[0];
- int offsetLength = index.get(key)[1].intValue();
- if(offsetInStorage != null && offsetLength>0) {
- byte[] buffer = new byte[offsetLength];
- synchronized(storage) {
- storage.seek(offsetInStorage);
- storage.readFully(buffer);
+ if(this.storage==null) {
+ this.open();
}
- ObjectInputStream in = new ObjectInputStream(new GZIPInputStream(new ByteArrayInputStream(buffer)));
-
- try {
- V readObject = (V) in.readObject();
+ }
+			Long[] offsetsInStorage = index.get(key);
+			if(offsetsInStorage != null) {
+				Long offsetInStorage = offsetsInStorage[0];
+				int offsetLength = offsetsInStorage[1].intValue();
+ if(offsetInStorage != null && offsetLength>0) {
+ byte[] buffer = new byte[offsetLength];
+ synchronized(storage) {
+ storage.seek(offsetInStorage);
+ storage.readFully(buffer);
+ }
+ V readObject = readObject(buffer);
return readObject;
- } catch (ClassNotFoundException e) {
- throw new RuntimeException("This should never happen.");
- } finally {
- in.close();
+ }else {
+ return null;
}
- }else {
+ } else {
return null;
}
- } else {
- return null;
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
}
}
+
+ protected abstract V readObject(byte[] buffer);
+
+ protected abstract ByteArrayOutputStream writeObject(V value) throws IOException;
+
+	public V getOrElse(K key, V defaultValue) {
+ V result = get(key);
+ return result == null ? defaultValue : result;
+ }
+
}
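The base class is now a template: subclasses own the byte-level codec, while the base class keeps the offset index, synchronization, and RandomAccessFile handling. A minimal sketch of what a custom subclass must provide, assuming the `<K, V>` type parameters shown above; the uncompressed codec here is hypothetical and not part of this changeset:

```java
import java.io.*;
import edu.harvard.hms.dbmi.avillach.hpds.storage.FileBackedByteIndexedStorage;

// Hypothetical codec: plain (uncompressed) Java serialization.
// Only the two abstract hooks are needed; offset bookkeeping and
// file access are inherited from FileBackedByteIndexedStorage.
public class PlainJavaStorage<K, V extends Serializable> extends FileBackedByteIndexedStorage<K, V> {

	public PlainJavaStorage(Class<K> keyClass, Class<V> valueClass, File storageFile) throws FileNotFoundException {
		super(keyClass, valueClass, storageFile);
	}

	@Override
	protected ByteArrayOutputStream writeObject(V value) throws IOException {
		ByteArrayOutputStream out = new ByteArrayOutputStream();
		try (ObjectOutputStream oos = new ObjectOutputStream(out)) {
			oos.writeObject(value);
		}
		return out;
	}

	@Override
	@SuppressWarnings("unchecked")
	protected V readObject(byte[] buffer) {
		try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(buffer))) {
			return (V) in.readObject();
		} catch (IOException e) {
			throw new UncheckedIOException(e);
		} catch (ClassNotFoundException e) {
			throw new RuntimeException(e);
		}
	}
}
```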
diff --git a/common/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/storage/FileBackedJavaIndexedStorage.java b/common/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/storage/FileBackedJavaIndexedStorage.java
new file mode 100644
index 00000000..26968729
--- /dev/null
+++ b/common/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/storage/FileBackedJavaIndexedStorage.java
@@ -0,0 +1,32 @@
+package edu.harvard.hms.dbmi.avillach.hpds.storage;
+
+import java.io.*;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+public class FileBackedJavaIndexedStorage<K, V extends Serializable> extends FileBackedByteIndexedStorage<K, V> {
+	public FileBackedJavaIndexedStorage(Class<K> keyClass, Class<V> valueClass, File storageFile) throws FileNotFoundException {
+ super(keyClass, valueClass, storageFile);
+ }
+
+ protected ByteArrayOutputStream writeObject(V value) throws IOException {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(new GZIPOutputStream(out));
+ oos.writeObject(value);
+ oos.flush();
+ oos.close();
+ return out;
+ }
+
+ @Override
+ protected V readObject(byte[] buffer) {
+ try (ObjectInputStream in = new ObjectInputStream(new GZIPInputStream(new ByteArrayInputStream(buffer)));) {
+ V readObject = (V) in.readObject();
+ return readObject;
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/common/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/storage/FileBackedJsonIndexStorage.java b/common/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/storage/FileBackedJsonIndexStorage.java
new file mode 100644
index 00000000..f8f97c6f
--- /dev/null
+++ b/common/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/storage/FileBackedJsonIndexStorage.java
@@ -0,0 +1,40 @@
+package edu.harvard.hms.dbmi.avillach.hpds.storage;
+
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+
+import java.io.*;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+public abstract class FileBackedJsonIndexStorage<K, V> extends FileBackedByteIndexedStorage<K, V> {
+ private static final long serialVersionUID = -1086729119489479152L;
+
+ protected transient ObjectMapper objectMapper = new ObjectMapper();
+
+ public FileBackedJsonIndexStorage(File storageFile) throws FileNotFoundException {
+ super(null, null, storageFile);
+ }
+
+ protected ByteArrayOutputStream writeObject(V value) throws IOException {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ objectMapper.writeValue(new GZIPOutputStream(out), value);
+ return out;
+ }
+
+ protected V readObject(byte[] buffer) {
+ try {
+ return objectMapper.readValue(new GZIPInputStream(new ByteArrayInputStream(buffer)), getTypeReference());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ // Required to populate the objectMapper on deserialization
+ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+ in.defaultReadObject();
+ objectMapper = new ObjectMapper();
+ }
+
+	public abstract TypeReference<V> getTypeReference();
+}
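The put/complete/get lifecycle is unchanged by the JSON variant. A usage sketch, assuming the FileBackedStorageVariantIndexImpl subclass introduced later in this diff; the key and path are illustrative:

```java
import java.io.File;
import edu.harvard.hms.dbmi.avillach.hpds.data.storage.FileBackedStorageVariantIndexImpl;
import edu.harvard.hms.dbmi.avillach.hpds.storage.FileBackedJsonIndexStorage;

public class JsonStorageLifecycleSketch {
	public static void main(String[] args) throws Exception {
		FileBackedJsonIndexStorage<String, Integer[]> store =
				new FileBackedStorageVariantIndexImpl(new File("/tmp/variantIndex.javabin"));
		store.put("HIGH", new Integer[]{0, 1, 3}); // gzipped JSON appended to the backing file
		store.complete();                          // freeze: further put() calls throw
		Integer[] hits = store.getOrElse("HIGH", new Integer[]{});
		System.out.println(hits.length);
	}
}
```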
diff --git a/data/pom.xml b/data/pom.xml
index 298160fc..c2b27598 100644
--- a/data/pom.xml
+++ b/data/pom.xml
@@ -5,7 +5,7 @@
 		<artifactId>pic-sure-hpds</artifactId>
 		<groupId>edu.harvard.hms.dbmi.avillach.hpds</groupId>
-		<version>1.0-SNAPSHOT</version>
+		<version>2.0.0-SNAPSHOT</version>
 	</parent>
 	<artifactId>data</artifactId>
diff --git a/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/BucketIndexBySample.java b/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/BucketIndexBySample.java
index 46723234..ade0cfce 100644
--- a/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/BucketIndexBySample.java
+++ b/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/BucketIndexBySample.java
@@ -6,6 +6,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
+import edu.harvard.hms.dbmi.avillach.hpds.storage.FileBackedJavaIndexedStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -86,22 +87,18 @@ public BucketIndexBySample(VariantStore variantStore, String storageDir) throws
// Create a bitmask with 1 values for each patient who has any variant in this bucket
BigInteger[] patientMaskForBucket = {variantStore.emptyBitmask()};
- try {
- contigStore.get(bucket).values().forEach((VariantMasks masks)->{
- if(masks.heterozygousMask!=null) {
- patientMaskForBucket[0] = patientMaskForBucket[0].or(masks.heterozygousMask);
- }
- //add hetreo no call bits to mask
- if(masks.heterozygousNoCallMask!=null) {
- patientMaskForBucket[0] = patientMaskForBucket[0].or(masks.heterozygousNoCallMask);
- }
- if(masks.homozygousMask!=null) {
- patientMaskForBucket[0] = patientMaskForBucket[0].or(masks.homozygousMask);
- }
- });
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
+ contigStore.get(bucket).values().forEach((VariantMasks masks)->{
+ if(masks.heterozygousMask!=null) {
+ patientMaskForBucket[0] = patientMaskForBucket[0].or(masks.heterozygousMask);
+ }
+				//add hetero no call bits to mask
+ if(masks.heterozygousNoCallMask!=null) {
+ patientMaskForBucket[0] = patientMaskForBucket[0].or(masks.heterozygousNoCallMask);
+ }
+ if(masks.homozygousMask!=null) {
+ patientMaskForBucket[0] = patientMaskForBucket[0].or(masks.homozygousMask);
+ }
+ });
// For each patient set the patientBucketCharMask entry to 0 or 1 if they have a variant in the bucket.
int maxIndex = patientMaskForBucket[0].bitLength() - 1;
@@ -121,37 +118,17 @@ public BucketIndexBySample(VariantStore variantStore, String storageDir) throws
});
// populate patientBucketMasks with bucketMasks for each patient
-		patientBucketMasks = new FileBackedByteIndexedStorage<Integer, BigInteger>(Integer.class, BigInteger.class, new File(storageFileStr));
+		patientBucketMasks = new FileBackedJavaIndexedStorage<Integer, BigInteger>(Integer.class, BigInteger.class, new File(storageFileStr));
- //the process to write out the bucket masks takes a very long time.
- //Lets spin up another thread that occasionally logs progress
int[] processedPatients = new int[1];
- processedPatients[0] = 0;
- new Thread(new Runnable() {
- @Override
- public void run() {
- log.info("writing patient bucket masks to backing store (this may take some time).");
- while(!patientBucketMasks.isComplete()) {
- try {
- Thread.sleep(5 * 1000 * 60); //log a message every 5 minutes
- } catch (InterruptedException e) {
- log.error("Thread interrupted", e);
- }
- log.info("wrote " + processedPatients[0] + " patient bucket masks");
- }
- }
- }).start();
-
patientIds.parallelStream().forEach((patientId)->{
- try {
- BigInteger patientMask = new BigInteger(new String(patientBucketCharMasks[patientIds.indexOf(patientId)]),2);
- patientBucketMasks.put(patientId, patientMask);
- }catch(NumberFormatException e) {
- log.error("NFE caught for " + patientId, e);
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
+ BigInteger patientMask = new BigInteger(new String(patientBucketCharMasks[patientIds.indexOf(patientId)]),2);
+ patientBucketMasks.put(patientId, patientMask);
processedPatients[0] += 1;
+ int processedPatientsCount = processedPatients[0];
+ if (processedPatientsCount % 1000 == 0) {
+ log.info("wrote " + processedPatientsCount + " patient bucket masks");
+ }
});
patientBucketMasks.complete();
log.info("Done creating patient bucket masks");
@@ -174,14 +151,9 @@ public Collection<String> filterVariantSetForPatientSet(Set<String> variantSet,
new BigInteger(new String(emptyBucketMaskChar()),2) : patientBucketMasks.get(patientSet.get(0));
BigInteger _defaultMask = patientBucketMask;
-		List<BigInteger> patientBucketmasksForSet = patientSet.parallelStream().map((patientNum)->{
-			try {
-				return patientBucketMasks.get(patientNum);
-			} catch (IOException e) {
-				throw new UncheckedIOException(e);
-			}
-			//return _defaultMask;
-		}).collect(Collectors.toList());
+		List<BigInteger> patientBucketmasksForSet = patientSet.parallelStream()
+				.map((patientNum)-> patientBucketMasks.get(patientNum))
+				.collect(Collectors.toList());
for(BigInteger patientMask : patientBucketmasksForSet) {
patientBucketMask = patientMask.or(patientBucketMask);
}
@@ -218,17 +190,4 @@ private char[] emptyBucketMaskChar() {
}
return _emptyBucketMaskChar.clone();
}
-
- /**
- * Use while debugging
- */
- public void printPatientMasks() {
- for(Integer patientID : patientBucketMasks.keys()) {
- try {
- log.info("BucketMask length for " + patientID + ":\t" + patientBucketMasks.get(patientID).toString(2).length());
- } catch (IOException e) {
- log.error("FBBIS Error: ", e);
- }
- }
- }
}
\ No newline at end of file
diff --git a/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/CompressedIndex.java b/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/CompressedIndex.java
index eb8705b9..48637410 100644
--- a/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/CompressedIndex.java
+++ b/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/CompressedIndex.java
@@ -32,7 +32,7 @@ public class CompressedIndex implements Serializable {
 	private HashMap<Range<Float>, byte[]> compressedRangeMap;
private int valueCount;
-	public TreeMap<Float, TreeSet<String>> buildContinuousValuesMap(FileBackedByteIndexedStorage<String, String[]> allValues) {
+	public TreeMap<Float, TreeSet<String>> buildContinuousValuesMap(FileBackedByteIndexedStorage<String, Integer[]> allValues) {
 		TreeMap<Float, TreeSet<String>> continuousValueMap = new TreeMap<>();
for(String key : allValues.keys()) {
try{
diff --git a/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/FileBackedByteIndexedInfoStore.java b/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/FileBackedByteIndexedInfoStore.java
index f282707b..4fd4ae37 100644
--- a/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/FileBackedByteIndexedInfoStore.java
+++ b/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/FileBackedByteIndexedInfoStore.java
@@ -1,8 +1,6 @@
package edu.harvard.hms.dbmi.avillach.hpds.data.genotype;
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
+import java.io.*;
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
@@ -10,8 +8,11 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.stream.Collectors;
+import java.util.zip.GZIPOutputStream;
+import edu.harvard.hms.dbmi.avillach.hpds.data.storage.FileBackedStorageVariantIndexImpl;
import edu.harvard.hms.dbmi.avillach.hpds.storage.FileBackedByteIndexedStorage;
+import edu.harvard.hms.dbmi.avillach.hpds.storage.FileBackedJavaIndexedStorage;
public class FileBackedByteIndexedInfoStore implements Serializable {
@@ -21,12 +22,12 @@ public class FileBackedByteIndexedInfoStore implements Serializable {
public boolean isContinuous;
public Float min = Float.MAX_VALUE, max = Float.MIN_VALUE;
-	private FileBackedByteIndexedStorage<String, String[]> allValues;
+	private FileBackedByteIndexedStorage<String, Integer[]> allValues;
 	public TreeMap<Float, TreeSet<String>> continuousValueMap;
 	public CompressedIndex continuousValueIndex;
-	public FileBackedByteIndexedStorage<String, String[]> getAllValues() {
+	public FileBackedByteIndexedStorage<String, Integer[]> getAllValues() {
return allValues;
}
@@ -41,8 +42,8 @@ public List<String> search(String term) {
}
}
- public void addEntry(String value, String[] variantSpecs) throws IOException {
- allValues.put(value, variantSpecs);
+ public void addEntry(String value, Integer[] variantIds) throws IOException {
+ allValues.put(value, variantIds);
}
@@ -51,8 +52,7 @@ public void complete() {
}
public FileBackedByteIndexedInfoStore(File storageFolder, InfoStore infoStore) throws IOException {
-		this.allValues = new FileBackedByteIndexedStorage<String, String[]>(String.class, String[].class,
- new File(storageFolder, infoStore.column_key + "_infoStoreStorage.javabin"));
+ this.allValues = new FileBackedStorageVariantIndexImpl(new File(storageFolder, infoStore.column_key + "_infoStoreStorage.javabin"));
this.description = infoStore.description;
this.column_key = infoStore.column_key;
this.isContinuous = infoStore.isNumeric();
@@ -71,8 +71,8 @@ public FileBackedByteIndexedInfoStore(File storageFolder, InfoStore infoStore) t
if(x%10000 == 0) {
System.out.println(infoStore.column_key + " " + ((((double)x) / sortedKeys.size()) * 100) + "% done");
}
-			ConcurrentSkipListSet<String> variantSpecs = infoStore.allValues.get(key);
-			addEntry(key, variantSpecs.toArray(new String[variantSpecs.size()]));
+			ConcurrentSkipListSet<Integer> variantIds = infoStore.allValues.get(key);
+			addEntry(key, variantIds.toArray(new Integer[variantIds.size()]));
x++;
}
}
@@ -89,10 +89,10 @@ public FileBackedByteIndexedInfoStore(File storageFolder, InfoStore infoStore) t
private static void normalizeNumericStore(InfoStore store) {
 		TreeSet<String> allKeys = new TreeSet<String>(store.allValues.keySet());
-		ConcurrentHashMap<String, ConcurrentSkipListSet<String>> normalizedValues = new ConcurrentHashMap<>();
+		ConcurrentHashMap<String, ConcurrentSkipListSet<Integer>> normalizedValues = new ConcurrentHashMap<>();
 		for(String key : allKeys) {
 			String[] keys = key.split(",");
-			ConcurrentSkipListSet<String> variantSpecs = store.allValues.get(key);
+			ConcurrentSkipListSet<Integer> variantIds = store.allValues.get(key);
if(key.contentEquals(".")) {
//don't add it
}else if(keyHasMultipleValues(keys)) {
@@ -100,26 +100,26 @@ private static void normalizeNumericStore(InfoStore store) {
if(value.contentEquals(".")) {
}else {
-					ConcurrentSkipListSet<String> normalizedSpecs = normalizedValues.get(value);
-					if(normalizedSpecs == null) {
-						normalizedSpecs = variantSpecs;
+					ConcurrentSkipListSet<Integer> normalizedVariantIds = normalizedValues.get(value);
+					if(normalizedVariantIds == null) {
+						normalizedVariantIds = variantIds;
 					}else {
-						normalizedSpecs.addAll(variantSpecs);
+						normalizedVariantIds.addAll(variantIds);
 					}
-					normalizedValues.put(value, normalizedSpecs);
+					normalizedValues.put(value, normalizedVariantIds);
}
}
}else {
if(key.contentEquals(".")) {
}else {
-				ConcurrentSkipListSet<String> normalizedSpecs = normalizedValues.get(key);
-				if(normalizedSpecs == null) {
-					normalizedSpecs = variantSpecs;
+				ConcurrentSkipListSet<Integer> normalizedVariantIds = normalizedValues.get(key);
+				if(normalizedVariantIds == null) {
+					normalizedVariantIds = variantIds;
 				}else {
-					normalizedSpecs.addAll(variantSpecs);
+					normalizedVariantIds.addAll(variantIds);
 				}
-				normalizedValues.put(key, normalizedSpecs);
+				normalizedValues.put(key, normalizedVariantIds);
}
}
@@ -139,5 +139,19 @@ private static boolean keyHasMultipleValues(String[] keys) {
return x>1;
}
+ public void updateStorageDirectory(File storageDirectory) {
+ allValues.updateStorageDirectory(storageDirectory);
+ }
+
+ public void write(File outputFile) {
+ try(
+ FileOutputStream fos = new FileOutputStream(outputFile);
+ GZIPOutputStream gzos = new GZIPOutputStream(fos);
+ ObjectOutputStream oos = new ObjectOutputStream(gzos);) {
+ oos.writeObject(this);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
}
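The new write() method gives each info store a self-contained persistence path, which the dataset merger below relies on. A sketch of the save/relocate flow, assuming a populated InfoStore and illustrative paths:

```java
import java.io.File;
import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.FileBackedByteIndexedInfoStore;
import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.InfoStore;

public class InfoStorePersistenceSketch {
	// infoStore is assumed to be already populated from a VCF load.
	static void saveAndRelocate(InfoStore infoStore) throws Exception {
		File outputDir = new File("/tmp/out"); // illustrative path
		FileBackedByteIndexedInfoStore store = new FileBackedByteIndexedInfoStore(outputDir, infoStore);
		store.write(new File(outputDir, infoStore.column_key + "_infoStore.javabin"));

		// After the *_infoStoreStorage.javabin files are copied elsewhere,
		// re-point a deserialized store at the new directory before reading from it:
		store.updateStorageDirectory(new File("/opt/local/hpds/all"));
	}
}
```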
diff --git a/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/InfoStore.java b/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/InfoStore.java
index b58373f2..a62e0591 100644
--- a/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/InfoStore.java
+++ b/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/InfoStore.java
@@ -14,7 +14,7 @@ public class InfoStore implements Serializable {
public final String column_key;
public final String description;
-	public ConcurrentHashMap<String, ConcurrentSkipListSet<String>> allValues = new ConcurrentHashMap<>();
+	public ConcurrentHashMap<String, ConcurrentSkipListSet<Integer>> allValues = new ConcurrentHashMap<>();
private String prefix;
 	public List<String> search(String term) {
@@ -28,21 +28,6 @@ public List<String> search(String term) {
}
}
- public void processRecord(VariantSpec spec, String[] values) {
- for(String value : values) {
- if(value.startsWith(column_key + "=")) {
- String valueWithoutkey = value.replaceFirst(column_key + "=", "");
-				ConcurrentSkipListSet<String> entriesForValue = allValues.get(valueWithoutkey);
- if(entriesForValue == null) {
- entriesForValue = new ConcurrentSkipListSet<>();
- allValues.put(valueWithoutkey, entriesForValue);
- }
- entriesForValue.add(spec.specNotation());
- }
- }
- }
-
-
public InfoStore(String description, String delimiter, String key) {
this.prefix = key + "=";
this.description = description;
@@ -53,7 +38,7 @@ public boolean isNumeric() {
int nonNumericCount = 0;
int numericCount = 0;
System.out.println("Testing for numeric : " + this.column_key + " : " + allValues.size() + " values");
-		KeySetView<String, ConcurrentSkipListSet<String>> allKeys = allValues.keySet();
+		KeySetView<String, ConcurrentSkipListSet<Integer>> allKeys = allValues.keySet();
for(String key : allKeys){
try {
Double.parseDouble(key);
@@ -84,16 +69,16 @@ public boolean isNumeric() {
return false;
}
- public void processRecord(String specNotation, String[] infoValues) {
+ public void processRecord(Integer variantId, String[] infoValues) {
for(String value : infoValues) {
if(value.startsWith(prefix)) {
String valueWithoutkey = value.substring(prefix.length());
-			ConcurrentSkipListSet<String> entriesForValue = allValues.get(valueWithoutkey);
+			ConcurrentSkipListSet<Integer> entriesForValue = allValues.get(valueWithoutkey);
if(entriesForValue == null) {
entriesForValue = new ConcurrentSkipListSet<>();
allValues.put(valueWithoutkey, entriesForValue);
}
- entriesForValue.add(specNotation);
+ entriesForValue.add(variantId);
}
}
}
diff --git a/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/VariantMasks.java b/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/VariantMasks.java
index 9d3e599a..a8e2b75d 100644
--- a/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/VariantMasks.java
+++ b/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/VariantMasks.java
@@ -1,5 +1,9 @@
package edu.harvard.hms.dbmi.avillach.hpds.data.genotype;
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+import org.codehaus.jackson.map.ser.ToStringSerializer;
+
import java.io.Serializable;
import java.math.BigInteger;
@@ -166,8 +170,19 @@ public VariantMasks(char[][] maskValues) {
}
+ public VariantMasks() {
+ }
+
+ @JsonProperty("ho")
+ @JsonSerialize(using = ToStringSerializer.class, include=JsonSerialize.Inclusion.NON_NULL)
public BigInteger homozygousMask;
+ @JsonProperty("he")
+ @JsonSerialize(using = ToStringSerializer.class, include=JsonSerialize.Inclusion.NON_NULL)
public BigInteger heterozygousMask;
+ @JsonProperty("hon")
+ @JsonSerialize(using = ToStringSerializer.class, include=JsonSerialize.Inclusion.NON_NULL)
public BigInteger homozygousNoCallMask;
+ @JsonProperty("hen")
+ @JsonSerialize(using = ToStringSerializer.class, include=JsonSerialize.Inclusion.NON_NULL)
public BigInteger heterozygousNoCallMask;
}
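The annotations shrink the JSON payload: each BigInteger mask is written as a decimal string under a two- or three-letter key, and unset masks should be omitted via NON_NULL. A quick sketch of the expected shape; the mask value is illustrative and the printed output is approximate:

```java
import java.math.BigInteger;
import org.codehaus.jackson.map.ObjectMapper;
import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.VariantMasks;

public class VariantMasksJsonSketch {
	public static void main(String[] args) throws Exception {
		VariantMasks masks = new VariantMasks();
		masks.heterozygousMask = new BigInteger("11010110", 2); // 214 in decimal
		// ToStringSerializer renders the mask via BigInteger.toString(),
		// so the expected shape is roughly: {"he":"214"}
		System.out.println(new ObjectMapper().writeValueAsString(masks));
	}
}
```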
diff --git a/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/VariantMetadataIndex.java b/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/VariantMetadataIndex.java
index 3b706cbc..1f6cdf0f 100644
--- a/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/VariantMetadataIndex.java
+++ b/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/VariantMetadataIndex.java
@@ -6,6 +6,7 @@
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;
+import edu.harvard.hms.dbmi.avillach.hpds.storage.FileBackedJavaIndexedStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -91,7 +92,7 @@ public String[] findBySingleVariantSpec(String variantSpec, VariantBucketHolder<
log.warn("No bucket found for spec " + variantSpec + " in bucket " + chrOffset);
return new String[0];
- } catch (IOException e) {
+ } catch (UncheckedIOException e) {
log.warn("IOException caught looking up variantSpec : " + variantSpec, e);
return new String[0];
}
@@ -112,7 +113,6 @@ public Map<String, String[]> findByMultipleVariantSpec(Collection<String> varien
* have to write them to disk once. The data will be written to disk only when the flush() method is called.
*
* @param variantSpec
- * @param array
* @throws IOException
*/
public void put(String variantSpec, String metaData ) throws IOException {
@@ -161,7 +161,7 @@ public synchronized void flush() throws IOException {
if(contigFbbis == null) {
log.info("creating new file for " + contig);
String filePath = fileStoragePrefix + "_" + contig + ".bin";
-			contigFbbis = new FileBackedByteIndexedStorage<Integer, ConcurrentHashMap<String, String[]>>(Integer.class, (Class<ConcurrentHashMap<String, String[]>>)(Class<?>) ConcurrentHashMap.class, new File(filePath));
+			contigFbbis = new FileBackedJavaIndexedStorage<Integer, ConcurrentHashMap<String, String[]>>(Integer.class, (Class<ConcurrentHashMap<String, String[]>>)(Class<?>) ConcurrentHashMap.class, new File(filePath));
indexMap.put(contig, contigFbbis);
}
@@ -200,7 +200,8 @@ public static VariantMetadataIndex createInstance(String metadataIndexPath) {
return (VariantMetadataIndex) in.readObject();
} catch(Exception e) {
// todo: handle exceptions better
- log.error("No Metadata Index found at " + metadataIndexPath, e);
+ log.info("No Metadata Index found at " + metadataIndexPath);
+ log.debug("Error loading metadata index:", e);
return null;
}
}
diff --git a/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/VariantStore.java b/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/VariantStore.java
index 4a566427..6541c9dc 100644
--- a/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/VariantStore.java
+++ b/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/VariantStore.java
@@ -1,60 +1,78 @@
package edu.harvard.hms.dbmi.avillach.hpds.data.genotype;
+import com.google.common.collect.RangeSet;
+import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.caching.VariantBucketHolder;
+import edu.harvard.hms.dbmi.avillach.hpds.storage.FileBackedJsonIndexStorage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.*;
import java.math.BigInteger;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
-import com.google.errorprone.annotations.Var;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.slf4j.event.Level;
-
-import com.google.common.collect.RangeSet;
-
-import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.caching.VariantBucketHolder;
-import edu.harvard.hms.dbmi.avillach.hpds.storage.FileBackedByteIndexedStorage;
-
public class VariantStore implements Serializable {
private static final long serialVersionUID = -6970128712587609414L;
private static Logger log = LoggerFactory.getLogger(VariantStore.class);
public static final int BUCKET_SIZE = 1000;
+ public static final String VARIANT_SPEC_INDEX_FILE = "variantSpecIndex.javabin";
+
private BigInteger emptyBitmask;
private String[] patientIds;
+ private transient String[] variantSpecIndex;
+
private Integer variantStorageSize;
private String[] vcfHeaders = new String[24];
-	private TreeMap<String, FileBackedByteIndexedStorage<Integer, ConcurrentHashMap<String, VariantMasks>>> variantMaskStorage = new TreeMap<>();
+	private Map<String, FileBackedJsonIndexStorage<Integer, ConcurrentHashMap<String, VariantMasks>>> variantMaskStorage = new TreeMap<>();
-	public TreeMap<String, FileBackedByteIndexedStorage<Integer, ConcurrentHashMap<String, VariantMasks>>> getVariantMaskStorage() {
+	public Map<String, FileBackedJsonIndexStorage<Integer, ConcurrentHashMap<String, VariantMasks>>> getVariantMaskStorage() {
 		return variantMaskStorage;
 	}
-	public void setVariantMaskStorage(TreeMap<String, FileBackedByteIndexedStorage<Integer, ConcurrentHashMap<String, VariantMasks>>> variantMaskStorage) {
+	public void setVariantMaskStorage(Map<String, FileBackedJsonIndexStorage<Integer, ConcurrentHashMap<String, VariantMasks>>> variantMaskStorage) {
 		this.variantMaskStorage = variantMaskStorage;
 	}
- public static VariantStore deserializeInstance(String genomicDataDirectory) throws IOException, ClassNotFoundException, InterruptedException {
- if(new File(genomicDataDirectory + "variantStore.javabin").exists()) {
- ObjectInputStream ois = new ObjectInputStream(new GZIPInputStream(new FileInputStream(genomicDataDirectory + "variantStore.javabin")));
- VariantStore variantStore = (VariantStore) ois.readObject();
- ois.close();
- variantStore.open();
- return variantStore;
- } else {
- //we still need an object to reference when checking the variant store, even if it's empty.
- VariantStore variantStore = new VariantStore();
- variantStore.setPatientIds(new String[0]);
- return variantStore;
+ public String[] getVariantSpecIndex() {
+ return variantSpecIndex;
+ }
+ public void setVariantSpecIndex(String[] variantSpecIndex) {
+ this.variantSpecIndex = variantSpecIndex;
+ }
+
+ public static VariantStore readInstance(String genomicDataDirectory) throws IOException, ClassNotFoundException {
+ ObjectInputStream ois = new ObjectInputStream(new GZIPInputStream(new FileInputStream(genomicDataDirectory + "variantStore.javabin")));
+ VariantStore variantStore = (VariantStore) ois.readObject();
+ ois.close();
+ variantStore.getVariantMaskStorage().values().forEach(store -> {
+ store.updateStorageDirectory(new File(genomicDataDirectory));
+ });
+ variantStore.open();
+ variantStore.setVariantSpecIndex(loadVariantIndexFromFile(genomicDataDirectory));
+ return variantStore;
+ }
+
+ public void writeInstance(String genomicDirectory) {
+ try (FileOutputStream fos = new FileOutputStream(new File(genomicDirectory, "variantStore.javabin"));
+ GZIPOutputStream gzos = new GZIPOutputStream(fos);
+ ObjectOutputStream oos = new ObjectOutputStream(gzos);) {
+ oos.writeObject(this);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ try (FileOutputStream fos = new FileOutputStream(new File(genomicDirectory, "variantSpecIndex.javabin"));
+ GZIPOutputStream gzos = new GZIPOutputStream(fos);
+ ObjectOutputStream oos = new ObjectOutputStream(gzos);) {
+ oos.writeObject(Arrays.asList(variantSpecIndex));
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
}
}
@@ -63,31 +81,27 @@ public Map<String, int[]> countVariants() {
 		TreeMap<String, int[]> counts = new TreeMap<>();
for (String contig : variantMaskStorage.keySet()) {
counts.put(contig, new int[5]);
-			FileBackedByteIndexedStorage<Integer, ConcurrentHashMap<String, VariantMasks>> storage = variantMaskStorage
+			FileBackedJsonIndexStorage<Integer, ConcurrentHashMap<String, VariantMasks>> storage = variantMaskStorage
.get(contig);
storage.keys().stream().forEach((Integer key) -> {
int[] contigCounts = counts.get(contig);
- try {
-				Collection<VariantMasks> values = storage.get(key).values();
- contigCounts[0] += values.stream().collect(Collectors.summingInt((VariantMasks masks) -> {
- return masks.heterozygousMask != null ? 1 : 0;
- }));
- contigCounts[1] += values.stream().collect(Collectors.summingInt((VariantMasks masks) -> {
- return masks.homozygousMask != null ? 1 : 0;
- }));
- contigCounts[2] += values.stream().collect(Collectors.summingInt((VariantMasks masks) -> {
- return masks.heterozygousNoCallMask != null ? 1 : 0;
- }));
- contigCounts[3] += values.stream().collect(Collectors.summingInt((VariantMasks masks) -> {
- return masks.homozygousNoCallMask != null ? 1 : 0;
- }));
- contigCounts[4] += values.stream().collect(Collectors.summingInt((VariantMasks masks) -> {
- return masks.heterozygousMask != null || masks.homozygousMask != null
- || masks.heterozygousNoCallMask != null || masks.homozygousNoCallMask != null ? 1 : 0;
- }));
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
+			Collection<VariantMasks> values = storage.get(key).values();
+ contigCounts[0] += values.stream().collect(Collectors.summingInt((VariantMasks masks) -> {
+ return masks.heterozygousMask != null ? 1 : 0;
+ }));
+ contigCounts[1] += values.stream().collect(Collectors.summingInt((VariantMasks masks) -> {
+ return masks.homozygousMask != null ? 1 : 0;
+ }));
+ contigCounts[2] += values.stream().collect(Collectors.summingInt((VariantMasks masks) -> {
+ return masks.heterozygousNoCallMask != null ? 1 : 0;
+ }));
+ contigCounts[3] += values.stream().collect(Collectors.summingInt((VariantMasks masks) -> {
+ return masks.homozygousNoCallMask != null ? 1 : 0;
+ }));
+ contigCounts[4] += values.stream().collect(Collectors.summingInt((VariantMasks masks) -> {
+ return masks.heterozygousMask != null || masks.homozygousMask != null
+ || masks.heterozygousNoCallMask != null || masks.homozygousNoCallMask != null ? 1 : 0;
+ }));
});
}
return counts;
@@ -160,7 +174,7 @@ public void setVariantStorageSize(int variantStorageSize) {
 	public List<VariantMasks> getMasksForRangesOfChromosome(String contigForGene, List<Integer> offsetsForGene,
 			RangeSet<Integer> rangeSetsForGene) throws IOException {
-		FileBackedByteIndexedStorage<Integer, ConcurrentHashMap<String, VariantMasks>> masksForChromosome = variantMaskStorage.get(contigForGene);
+		FileBackedJsonIndexStorage<Integer, ConcurrentHashMap<String, VariantMasks>> masksForChromosome = variantMaskStorage.get(contigForGene);
 		Set<Integer> bucketsForGene = offsetsForGene.stream().map((offset) -> {
return offset / BUCKET_SIZE;
}).collect(Collectors.toSet());
@@ -189,4 +203,19 @@ public BigInteger emptyBitmask() {
return emptyBitmask;
}
+	public static String[] loadVariantIndexFromFile(String genomicDataDirectory) {
+		try (ObjectInputStream objectInputStream = new ObjectInputStream(new GZIPInputStream(new FileInputStream(genomicDataDirectory + "/" + VARIANT_SPEC_INDEX_FILE)))) {
+			List<String> variants = (List<String>) objectInputStream.readObject();
+			return variants.toArray(new String[0]);
+		} catch (IOException | ClassNotFoundException e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+
}
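readInstance and writeInstance now own the whole persistence round trip, including the new variantSpecIndex sidecar. A sketch with illustrative paths; note that readInstance concatenates the file name directly onto the directory string, so the trailing slash matters:

```java
import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.VariantStore;

public class VariantStoreRoundTripSketch {
	public static void main(String[] args) throws Exception {
		// readInstance appends "variantStore.javabin" directly to the argument,
		// so the trailing slash is required.
		VariantStore store = VariantStore.readInstance("/opt/local/hpds/all/");
		String[] specs = store.getVariantSpecIndex(); // loaded from variantSpecIndex.javabin

		// writeInstance uses new File(dir, name), so no trailing slash is needed here.
		store.writeInstance("/opt/local/hpds/merged");
	}
}
```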
diff --git a/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/storage/FileBackedStorageVariantIndexImpl.java b/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/storage/FileBackedStorageVariantIndexImpl.java
new file mode 100644
index 00000000..f2ec9e48
--- /dev/null
+++ b/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/storage/FileBackedStorageVariantIndexImpl.java
@@ -0,0 +1,24 @@
+package edu.harvard.hms.dbmi.avillach.hpds.data.storage;
+
+import edu.harvard.hms.dbmi.avillach.hpds.storage.FileBackedJsonIndexStorage;
+import org.codehaus.jackson.type.TypeReference;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.Serializable;
+
+public class FileBackedStorageVariantIndexImpl extends FileBackedJsonIndexStorage<String, Integer[]> implements Serializable {
+ private static final long serialVersionUID = -893724459359928779L;
+
+ public FileBackedStorageVariantIndexImpl(File storageFile) throws FileNotFoundException {
+ super(storageFile);
+ }
+
+	private static final TypeReference<Integer[]> typeRef
+			= new TypeReference<Integer[]>() {};
+
+	@Override
+	public TypeReference<Integer[]> getTypeReference() {
+ return typeRef;
+ }
+}
diff --git a/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/storage/FileBackedStorageVariantMasksImpl.java b/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/storage/FileBackedStorageVariantMasksImpl.java
new file mode 100644
index 00000000..6d39d79a
--- /dev/null
+++ b/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/storage/FileBackedStorageVariantMasksImpl.java
@@ -0,0 +1,25 @@
+package edu.harvard.hms.dbmi.avillach.hpds.data.storage;
+
+import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.VariantMasks;
+import edu.harvard.hms.dbmi.avillach.hpds.storage.FileBackedJsonIndexStorage;
+import org.codehaus.jackson.type.TypeReference;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.Serializable;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class FileBackedStorageVariantMasksImpl extends FileBackedJsonIndexStorage<Integer, ConcurrentHashMap<String, VariantMasks>> implements Serializable {
+ private static final long serialVersionUID = -1086729119489479152L;
+
+ public FileBackedStorageVariantMasksImpl(File storageFile) throws FileNotFoundException {
+ super(storageFile);
+ }
+	private static final TypeReference<ConcurrentHashMap<String, VariantMasks>> typeRef
+			= new TypeReference<ConcurrentHashMap<String, VariantMasks>>() {};
+
+	@Override
+	public TypeReference<ConcurrentHashMap<String, VariantMasks>> getTypeReference() {
+ return typeRef;
+ }
+}
diff --git a/docker/pic-sure-hpds/Dockerfile b/docker/pic-sure-hpds/Dockerfile
index 0b38a4de..64366119 100644
--- a/docker/pic-sure-hpds/Dockerfile
+++ b/docker/pic-sure-hpds/Dockerfile
@@ -6,6 +6,6 @@ RUN apk add --no-cache --purge -uU curl wget unzip
RUN apk add --no-cache --purge openjdk11
-ADD hpds-war-1.0-SNAPSHOT-war-exec.jar /hpds.jar
+ADD hpds-war-2.0.0-SNAPSHOT-war-exec.jar /hpds.jar
EXPOSE 8080
diff --git a/docker/pom.xml b/docker/pom.xml
index d19a2729..8322b603 100644
--- a/docker/pom.xml
+++ b/docker/pom.xml
@@ -5,7 +5,7 @@
 		<artifactId>pic-sure-hpds</artifactId>
 		<groupId>edu.harvard.hms.dbmi.avillach.hpds</groupId>
-		<version>1.0-SNAPSHOT</version>
+		<version>2.0.0-SNAPSHOT</version>
 	</parent>
 	<artifactId>docker</artifactId>
diff --git a/etl/pom.xml b/etl/pom.xml
index ded9fdd5..d56669d6 100644
--- a/etl/pom.xml
+++ b/etl/pom.xml
@@ -6,7 +6,7 @@
 		<artifactId>pic-sure-hpds</artifactId>
 		<groupId>edu.harvard.hms.dbmi.avillach.hpds</groupId>
-		<version>1.0-SNAPSHOT</version>
+		<version>2.0.0-SNAPSHOT</version>
 	</parent>
 	<artifactId>etl</artifactId>
@@ -325,7 +325,27 @@
 						<goal>single</goal>
 					</goals>
 				</execution>
-			</executions>
+				<execution>
+					<id>GenomicDatasetMergerRunner</id>
+					<configuration>
+						<archive>
+							<manifest>
+								<mainClass>edu.harvard.hms.dbmi.avillach.hpds.etl.genotype.GenomicDatasetMergerRunner</mainClass>
+							</manifest>
+						</archive>
+						<outputDirectory>${project.basedir}/../docker/pic-sure-hpds-etl</outputDirectory>
+						<descriptorRefs>
+							<descriptorRef>jar-with-dependencies</descriptorRef>
+						</descriptorRefs>
+						<finalName>GenomicDatasetMergerRunner</finalName>
+					</configuration>
+					<phase>package</phase>
+					<goals>
+						<goal>single</goal>
+					</goals>
+				</execution>
+			</executions>
diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/util/RemapPatientIds.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/util/RemapPatientIds.java
index cab26774..0bd73458 100644
--- a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/util/RemapPatientIds.java
+++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/util/RemapPatientIds.java
@@ -51,10 +51,7 @@ public class RemapPatientIds {
private static final int TEXT_VALUE = 3;
public static void main(String[] args) throws ClassNotFoundException, FileNotFoundException, IOException {
-
- ObjectInputStream objectInputStream = new ObjectInputStream(new GZIPInputStream(new FileInputStream("/opt/local/hpds/all/variantStore.javabin")));
- VariantStore variantStore = (VariantStore) objectInputStream.readObject();
- objectInputStream.close();
+ VariantStore variantStore = VariantStore.readInstance("/opt/local/hpds/all/");
String[] oldPatientIds = variantStore.getPatientIds();
String[] newPatientIds = new String[oldPatientIds.length];
diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/GenomicDatasetMerger.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/GenomicDatasetMerger.java
new file mode 100644
index 00000000..e66f0c30
--- /dev/null
+++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/GenomicDatasetMerger.java
@@ -0,0 +1,289 @@
+package edu.harvard.hms.dbmi.avillach.hpds.etl.genotype;
+
+import com.google.common.collect.Sets;
+import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.*;
+import edu.harvard.hms.dbmi.avillach.hpds.data.storage.FileBackedStorageVariantMasksImpl;
+import edu.harvard.hms.dbmi.avillach.hpds.storage.FileBackedByteIndexedStorage;
+import edu.harvard.hms.dbmi.avillach.hpds.storage.FileBackedJsonIndexStorage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.math.BigInteger;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+public class GenomicDatasetMerger {
+
+ private static Logger log = LoggerFactory.getLogger(GenomicDatasetMerger.class);
+
+ private final VariantStore variantStore1;
+ private final VariantStore variantStore2;
+
+	private final Map<String, FileBackedByteIndexedInfoStore> infoStores1;
+	private final Map<String, FileBackedByteIndexedInfoStore> infoStores2;
+
+ private final String outputDirectory;
+
+ private final VariantStore mergedVariantStore;
+
+	public GenomicDatasetMerger(VariantStore variantStore1, VariantStore variantStore2, Map<String, FileBackedByteIndexedInfoStore> infoStores1, Map<String, FileBackedByteIndexedInfoStore> infoStores2, String outputDirectory) {
+ this.variantStore1 = variantStore1;
+ this.variantStore2 = variantStore2;
+ this.mergedVariantStore = new VariantStore();
+ this.infoStores1 = infoStores1;
+ this.infoStores2 = infoStores2;
+ this.outputDirectory = outputDirectory;
+ validate();
+ }
+
+ private void validate() {
+ if (!variantStore1.getVariantMaskStorage().keySet().equals(variantStore2.getVariantMaskStorage().keySet())) {
+ log.error("Variant store chromosomes do not match:");
+ log.error(String.join(", ", variantStore1.getVariantMaskStorage().keySet()));
+ log.error(String.join(", ", variantStore2.getVariantMaskStorage().keySet()));
+ throw new IllegalStateException("Unable to merge variant stores with different numbers of chromosomes");
+ }
+		Sets.SetView<String> patientIntersection = Sets.intersection(Sets.newHashSet(variantStore1.getPatientIds()), Sets.newHashSet(variantStore2.getPatientIds()));
+ if (!patientIntersection.isEmpty()) {
+ throw new IllegalStateException("Cannot merge genomic datasets containing the same patient id");
+ }
+ }
+
+	public VariantStore mergeVariantStore(Map<String, FileBackedJsonIndexStorage<Integer, ConcurrentHashMap<String, VariantMasks>>> mergedChromosomeMasks) {
+ mergedVariantStore.setVariantMaskStorage(mergedChromosomeMasks);
+ mergedVariantStore.setPatientIds(mergePatientIds());
+ return mergedVariantStore;
+ }
+
+ /**
+ * Since both sets of variant indexes reference a different variant spec list, we need to re-index
+ * the values in the second set of variant indexes.
+ *
+ * For each variant in the second list of variants:
+	 * If it exists in list one, update any references to it by index to its index in list 1.
+	 * Otherwise, append it to list one, and update any references to it by index to its new position in list 1.
+ *
+ * Ex:
+ *
+ * variantIndex1 = ["chr1,1000,A,G", "chr1,1002,A,G", "chr1,2000,A,G"]
+ * variantIndex2 = ["chr1,1000,A,G", "chr1,1004,A,G", "chr1,3000,A,G"]
+ *
+ * GeneWithVariant_store1 = [0, 1]
+ * GeneWithVariant_store2 = [0, 1, 2]
+ *
+ * mergedVariantIndex = ["chr1,1000,A,G", "chr1,1002,A,G", "chr1,2000,A,G", "chr1,1004,A,G", "chr1,3000,A,G"]
+ * GeneWithVariant_merged = [0, 1, 3, 4]
+ *
+	 * @return the merged info stores, re-indexed against the combined variant spec list
+ * @throws IOException
+ */
+	public Map<String, FileBackedByteIndexedInfoStore> mergeVariantIndexes() throws IOException {
+ String[] variantIndex1 = variantStore1.getVariantSpecIndex();
+ String[] variantIndex2 = variantStore2.getVariantSpecIndex();
+
+		Map<String, Integer> variantSpecToIndexMap = new HashMap<>();
+		LinkedList<String> variantSpecList = new LinkedList<>(Arrays.asList(variantIndex1));
+ for (int i = 0; i < variantIndex1.length; i++) {
+ variantSpecToIndexMap.put(variantIndex1[i], i);
+ }
+
+		// Will contain any re-mapped indexes in the second variant index. For example, if a variant is contained in both
+		// data sets, the merged data set will use the index from dataset 1 to reference it, and any references to it in data
+		// set 2 need to be re-mapped. Likewise, if a variant in set 2 is new, it will be appended to
+		// the list and any references to it also need to be re-mapped
+ Integer[] remappedIndexes = new Integer[variantIndex2.length];
+
+ for (int i = 0; i < variantIndex2.length; i++) {
+ String variantSpec = variantIndex2[i];
+ Integer variantIndex = variantSpecToIndexMap.get(variantSpec);
+ if (variantIndex != null) {
+ remappedIndexes[i] = variantIndex;
+ } else {
+ variantSpecList.add(variantSpec);
+				// the new index is now the last item in the list
+ int newVariantSpecIndex = variantSpecList.size() - 1;
+ remappedIndexes[i] = newVariantSpecIndex;
+ variantSpecToIndexMap.put(variantSpec, newVariantSpecIndex);
+ }
+ }
+
+		Map<String, FileBackedByteIndexedInfoStore> mergedInfoStores = new HashMap<>();
+
+ if (!infoStores1.keySet().equals(infoStores2.keySet())) {
+ throw new IllegalStateException("Info stores do not match");
+ }
+		for (Map.Entry<String, FileBackedByteIndexedInfoStore> infoStores1Entry : infoStores1.entrySet()) {
+ FileBackedByteIndexedInfoStore infoStore2 = infoStores2.get(infoStores1Entry.getKey());
+
+			FileBackedByteIndexedStorage<String, Integer[]> allValuesStore1 = infoStores1Entry.getValue().getAllValues();
+			FileBackedByteIndexedStorage<String, Integer[]> allValuesStore2 = infoStore2.getAllValues();
+			ConcurrentHashMap<String, ConcurrentSkipListSet<Integer>> mergedInfoStoreValues = new ConcurrentHashMap<>();
+
+			Sets.SetView<String> allKeys = Sets.union(allValuesStore1.keys(), allValuesStore2.keys());
+			for (String key : allKeys) {
+				Set<Integer> store1Values = Set.of(allValuesStore1.getOrElse(key, new Integer[]{}));
+				Set<Integer> store2Values = Set.of(allValuesStore2.getOrElse(key, new Integer[]{}));
+				Set<Integer> remappedValuesStore2 = store2Values.stream().map(value -> remappedIndexes[value]).collect(Collectors.toSet());
+
+				Set<Integer> mergedValues = Sets.union(store1Values, remappedValuesStore2);
+ mergedInfoStoreValues.put(key, new ConcurrentSkipListSet<>(mergedValues));
+ }
+
+ InfoStore infoStore = new InfoStore(infoStore2.description, null, infoStores1Entry.getKey());
+ infoStore.allValues = mergedInfoStoreValues;
+ FileBackedByteIndexedInfoStore mergedStore = new FileBackedByteIndexedInfoStore(new File(outputDirectory), infoStore);
+ mergedInfoStores.put(infoStores1Entry.getKey(), mergedStore);
+ }
+
+ mergedVariantStore.setVariantSpecIndex(variantSpecList.toArray(new String[0]));
+ return mergedInfoStores;
+ }
+
+ /**
+ * Merge patient ids from both variant stores. We are simply appending patients from store 2 to patients from store 1
+ *
+ * @return the merged patient ids
+ */
+ private String[] mergePatientIds() {
+ return Stream.concat(Arrays.stream(variantStore1.getPatientIds()), Arrays.stream(variantStore2.getPatientIds()))
+ .toArray(String[]::new);
+ }
+
+ /**
+ * For each chromosome, call mergeChromosomeMask to merge the masks
+	 * @return the merged mask storage, keyed by chromosome
+ */
+	public Map<String, FileBackedJsonIndexStorage<Integer, ConcurrentHashMap<String, VariantMasks>>> mergeChromosomeMasks() {
+		Map<String, FileBackedJsonIndexStorage<Integer, ConcurrentHashMap<String, VariantMasks>>> mergedMaskStorage = new ConcurrentHashMap<>();
+ variantStore1.getVariantMaskStorage().keySet().parallelStream().forEach(chromosome -> {
+ try {
+ mergedMaskStorage.put(chromosome, mergeChromosomeMask(chromosome));
+ } catch (FileNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ return mergedMaskStorage;
+ }
+
+ /**
+	 * Merge the masks for a chromosome. The logic here is verbose but straightforward; most of it handles missing values at various places in the maps.
+ * If a variant mask contains no matches (ie, is 000000...), it is not stored in the data.
+ *
+ * Examples:
+ * variantMaskStorage1: {
+ * 10001 -> {
+ * "chr22,10001031,A,G" -> "10101010",
+ * "chr22,10001143,G,A" -> "10101010"
+ * },
+ * 10002 -> {
+ * "chr22,10002031,A,G" -> "10101010",
+ * "chr22,10002143,G,A" -> "10101010"
+ * }
+ * }
+ * variantMaskStorage2: {
+ * 10001 -> {
+ * "chr22,10001031,A,G" -> "00001111",
+ * "chr22,10001213,A,G" -> "00001111"
+ * },
+ * 10003 -> {
+ * "chr22,10003031,A,G" -> "00001111",
+ * "chr22,10003213,A,G" -> "00001111"
+ * }
+ * }
+ *
+ * mergedVariantMaskStorage: {
+ * 10001 -> {
+ * "chr22,10001031,A,G" -> "1010101000001111",
+ * "chr22,10001213,A,G" -> "0000000000001111",
+ * "chr22,10001143,G,A" -> "1010101000000000"
+ * },
+ * 10002 -> {
+ * "chr22,10002031,A,G" -> "1010101000000000",
+ * "chr22,10002143,G,A" -> "1010101000000000"
+ * }
+ * 10003 -> {
+ * "chr22,10003031,A,G" -> "0000000000001111",
+ * "chr22,10003213,A,G" -> "0000000000001111"
+ * }
+ * }
+ */
+	public FileBackedJsonIndexStorage<Integer, ConcurrentHashMap<String, VariantMasks>> mergeChromosomeMask(String chromosome) throws FileNotFoundException {
+		FileBackedJsonIndexStorage<Integer, ConcurrentHashMap<String, VariantMasks>> variantMaskStorage1 = variantStore1.getVariantMaskStorage().get(chromosome);
+		FileBackedJsonIndexStorage<Integer, ConcurrentHashMap<String, VariantMasks>> variantMaskStorage2 = variantStore2.getVariantMaskStorage().get(chromosome);
+
+		FileBackedJsonIndexStorage<Integer, ConcurrentHashMap<String, VariantMasks>> merged = new FileBackedStorageVariantMasksImpl(new File(outputDirectory + chromosome + "masks.bin"));
+		variantMaskStorage1.keys().forEach(key -> {
+			Map<String, VariantMasks> masks1 = variantMaskStorage1.get(key);
+			Map<String, VariantMasks> masks2 = variantMaskStorage2.get(key);
+ if (masks2 == null) {
+ masks2 = Map.of();
+ }
+
+			ConcurrentHashMap<String, VariantMasks> mergedMasks = new ConcurrentHashMap<>();
+			for (Map.Entry<String, VariantMasks> entry : masks1.entrySet()) {
+ VariantMasks variantMasks2 = masks2.get(entry.getKey());
+ if (variantMasks2 == null) {
+ // this will have all null masks, which will result in null when
+ // appended to a null, or be replaced with an empty bitmask otherwise
+ variantMasks2 = new VariantMasks();
+ }
+ mergedMasks.put(entry.getKey(), append(entry.getValue(), variantMasks2));
+ }
+			// Any entry in the second set that is not in the merged set can be merged with an empty variant mask;
+			// if there were a corresponding entry in set 1, it would have been merged in the previous loop
+			for (Map.Entry<String, VariantMasks> entry : masks2.entrySet()) {
+ if (!mergedMasks.containsKey(entry.getKey())) {
+ mergedMasks.put(entry.getKey(), append(new VariantMasks(), entry.getValue()));
+ }
+ }
+ merged.put(key, mergedMasks);
+ });
+
+		// Buckets that exist only in the second store still need to be copied into the merged store
+		variantMaskStorage2.keys().forEach(key -> {
+			if (variantMaskStorage1.get(key) != null) {
+				// already handled in the loop above
+				return;
+			}
+			ConcurrentHashMap<String, VariantMasks> mergedMasks = new ConcurrentHashMap<>();
+			for (Map.Entry<String, VariantMasks> entry : variantMaskStorage2.get(key).entrySet()) {
+				mergedMasks.put(entry.getKey(), append(new VariantMasks(), entry.getValue()));
+			}
+			merged.put(key, mergedMasks);
+		});
+ return merged;
+ }
+
+ public VariantMasks append(VariantMasks variantMasks1, VariantMasks variantMasks2) {
+ VariantMasks appendedMasks = new VariantMasks();
+ appendedMasks.homozygousMask = appendMask(variantMasks1.homozygousMask, variantMasks2.homozygousMask);
+ appendedMasks.heterozygousMask = appendMask(variantMasks1.heterozygousMask, variantMasks2.heterozygousMask);
+ appendedMasks.homozygousNoCallMask = appendMask(variantMasks1.homozygousNoCallMask, variantMasks2.homozygousNoCallMask);
+ appendedMasks.heterozygousNoCallMask = appendMask(variantMasks1.heterozygousNoCallMask, variantMasks2.heterozygousNoCallMask);
+ return appendedMasks;
+ }
+
+ /**
+ * Appends one mask to another. This assumes the masks are both padded with '11' on each end
+ * to prevent overflow issues.
+ */
+ public BigInteger appendMask(BigInteger mask1, BigInteger mask2) {
+ if (mask1 == null && mask2 == null) {
+ return null;
+ }
+ if (mask1 == null) {
+ mask1 = variantStore1.emptyBitmask();
+ }
+ if (mask2 == null) {
+ mask2 = variantStore2.emptyBitmask();
+ }
+ String binaryMask1 = mask1.toString(2);
+ String binaryMask2 = mask2.toString(2);
+ String appendedString = binaryMask1.substring(0, binaryMask1.length() - 2) +
+ binaryMask2.substring(2);
+ return new BigInteger(appendedString, 2);
+ }
+}
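The seam logic in appendMask is easiest to see with small masks. A worked sketch with illustrative values, mirroring the substring arithmetic above:

```java
import java.math.BigInteger;

public class AppendMaskSketch {
	public static void main(String[] args) {
		// Masks are padded with '11' on both ends so leading zeros survive BigInteger round trips.
		BigInteger mask1 = new BigInteger("11" + "1010" + "11", 2); // store 1: 4 patients
		BigInteger mask2 = new BigInteger("11" + "0001" + "11", 2); // store 2: 4 patients
		String b1 = mask1.toString(2);
		String b2 = mask2.toString(2);
		// Drop mask1's trailing pad and mask2's leading pad, exactly as appendMask does:
		String appended = b1.substring(0, b1.length() - 2) + b2.substring(2);
		System.out.println(appended); // 111010000111, i.e. 11|1010|0001|11: one 8-patient mask
	}
}
```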
diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/GenomicDatasetMergerRunner.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/GenomicDatasetMergerRunner.java
new file mode 100644
index 00000000..70565730
--- /dev/null
+++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/GenomicDatasetMergerRunner.java
@@ -0,0 +1,81 @@
+package edu.harvard.hms.dbmi.avillach.hpds.etl.genotype;
+
+import com.google.common.base.Preconditions;
+import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.FileBackedByteIndexedInfoStore;
+import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.VariantMasks;
+import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.VariantStore;
+import edu.harvard.hms.dbmi.avillach.hpds.storage.FileBackedJsonIndexStorage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.zip.GZIPInputStream;
+
+public class GenomicDatasetMergerRunner {
+
+	private static Logger log = LoggerFactory.getLogger(GenomicDatasetMergerRunner.class);
+
+ public static final String INFO_STORE_JAVABIN_SUFFIX = "infoStore.javabin";
+ public static final String VARIANT_SPEC_INDEX_FILENAME = "variantSpecIndex.javabin";
+
+ private static String genomicDirectory1;
+ private static String genomicDirectory2;
+
+ /**
+ * args[0]: directory containing genomic dataset 1
+ * args[1]: directory containing genomic dataset 2
+ * args[2]: output directory
+ */
+ public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
+ if (args.length != 3) {
+ throw new IllegalArgumentException("Three arguments must be provided: source directory 1, source directory 2, output directory");
+ }
+ genomicDirectory1 = args[0];
+ genomicDirectory2 = args[1];
+ String outputDirectory = args[2];
+
+		Map<String, FileBackedByteIndexedInfoStore> infoStores1 = loadInfoStores(genomicDirectory1);
+		Map<String, FileBackedByteIndexedInfoStore> infoStores2 = loadInfoStores(genomicDirectory2);
+
+		GenomicDatasetMerger genomicDatasetMerger = new GenomicDatasetMerger(VariantStore.readInstance(genomicDirectory1), VariantStore.readInstance(genomicDirectory2), infoStores1, infoStores2, outputDirectory);
+
+		Map<String, FileBackedJsonIndexStorage<Integer, ConcurrentHashMap<String, VariantMasks>>> mergedChromosomeMasks = genomicDatasetMerger.mergeChromosomeMasks();
+		VariantStore mergedVariantStore = genomicDatasetMerger.mergeVariantStore(mergedChromosomeMasks);
+		Map<String, FileBackedByteIndexedInfoStore> variantIndexes = genomicDatasetMerger.mergeVariantIndexes();
+
+ mergedVariantStore.writeInstance(outputDirectory);
+ variantIndexes.values().forEach(variantIndex -> {
+ variantIndex.write(new File(outputDirectory + variantIndex.column_key + "_" + INFO_STORE_JAVABIN_SUFFIX));
+ });
+ }
+
+	private static Map<String, FileBackedByteIndexedInfoStore> loadInfoStores(String directory) {
+		Map<String, FileBackedByteIndexedInfoStore> infoStores = new HashMap<>();
+ File genomicDataDirectory = new File(directory);
+ if(genomicDataDirectory.exists() && genomicDataDirectory.isDirectory()) {
+ Arrays.stream(genomicDataDirectory.list((file, filename)->{return filename.endsWith(INFO_STORE_JAVABIN_SUFFIX);}))
+ .forEach((String filename)->{
+ try (
+ FileInputStream fis = new FileInputStream(directory + filename);
+ GZIPInputStream gis = new GZIPInputStream(fis);
+ ObjectInputStream ois = new ObjectInputStream(gis)
+ ){
+ log.info("loading " + filename);
+ FileBackedByteIndexedInfoStore infoStore = (FileBackedByteIndexedInfoStore) ois.readObject();
+ infoStore.updateStorageDirectory(genomicDataDirectory);
+ infoStores.put(filename.replace("_" + INFO_STORE_JAVABIN_SUFFIX, ""), infoStore);
+ } catch (IOException | ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+ return infoStores;
+ }
+}
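A sketch of invoking the runner; the directory arguments are illustrative, and the trailing slashes matter because both VariantStore.readInstance and the merger's output paths concatenate file names directly onto these strings:

```java
import edu.harvard.hms.dbmi.avillach.hpds.etl.genotype.GenomicDatasetMergerRunner;

public class MergeTwoStudiesSketch {
	public static void main(String[] args) throws Exception {
		// Equivalent to: java -jar GenomicDatasetMergerRunner.jar <dir1> <dir2> <outDir>
		GenomicDatasetMergerRunner.main(new String[] {
				"/opt/local/hpds/study1/", // genomic dataset 1
				"/opt/local/hpds/study2/", // genomic dataset 2
				"/opt/local/hpds/merged/"  // output directory
		});
	}
}
```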
diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/MultialleleCounter.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/MultialleleCounter.java
deleted file mode 100644
index 758e772f..00000000
--- a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/MultialleleCounter.java
+++ /dev/null
@@ -1,53 +0,0 @@
-package edu.harvard.hms.dbmi.avillach.hpds.etl.genotype;
-
-import java.io.*;
-import java.util.ArrayList;
-import java.util.TreeSet;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.zip.GZIPInputStream;
-
-import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.VariantMasks;
-import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.VariantSpec;
-import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.VariantStore;
-import edu.harvard.hms.dbmi.avillach.hpds.storage.FileBackedByteIndexedStorage;
-
-public class MultialleleCounter {
-
- public static void main(String[] args) throws ClassNotFoundException, FileNotFoundException, IOException {
- try(FileInputStream fis = new FileInputStream("/opt/local/hpds/all/variantStore.javabin");
- ){
- VariantStore variantStore = (VariantStore) new ObjectInputStream(new GZIPInputStream(fis)).readObject();
- variantStore.open();
- for(String contig : variantStore.getVariantMaskStorage().keySet()) {
- System.out.println("Starting contig : " + contig);
-			FileBackedByteIndexedStorage<Integer, ConcurrentHashMap<String, VariantMasks>>
-				currentChromosome = variantStore.getVariantMaskStorage().get(contig);
- currentChromosome.keys().parallelStream().forEach((offsetBucket)->{
- System.out.println("Starting bucket : " + offsetBucket);
- ConcurrentHashMap<String, VariantMasks> maskMap;
- try {
- maskMap = currentChromosome.get(offsetBucket);
-
- TreeSet<VariantSpec> variantsSortedByOffset = new TreeSet<VariantSpec>();
- for(String variant : maskMap.keySet()) {
- variantsSortedByOffset.add(new VariantSpec(variant));
- }
- ArrayList<VariantSpec> variantsSortedByOffsetList = new ArrayList<VariantSpec>(variantsSortedByOffset);
- for(int y = 1; y < variantsSortedByOffsetList.size(); y++) {
diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/NewVCFLoader.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/NewVCFLoader.java
private static HashMap<String, char[][]> zygosityMaskStrings;
- private static TreeMap<String, FileBackedByteIndexedStorage<Integer, ConcurrentHashMap<String, VariantMasks>>> variantMaskStorage = new TreeMap<>();
+ private static TreeMap<String, FileBackedJsonIndexStorage<Integer, ConcurrentHashMap<String, VariantMasks>>> variantMaskStorage = new TreeMap<>();
private static long startTime;
@@ -152,7 +158,7 @@ private static void loadVCFs(File indexFile) throws IOException {
try {
walker.nextLine();
} catch (IOException e) {
- logger.error("Error reading nextline of VCF file [" + walker.vcfIndexLine.vcfPath + "]", e);
+ throw new UncheckedIOException(e);
}
});
zygosityMaskStrings.put(currentSpecNotation, maskStringsForVariantSpec[0]);
@@ -165,8 +171,6 @@ private static void loadVCFs(File indexFile) throws IOException {
shutdownChunkWriteExecutor();
- saveVariantStore(store, variantMaskStorage);
-
saveInfoStores();
splitInfoStoresByColumn();
@@ -185,22 +189,18 @@ private static void loadVCFs(File indexFile) throws IOException {
chunkIds.addAll(chromosomeStorage.keys());
for (Integer chunkId : chunkIds) {
for (String variantSpec : chromosomeStorage.get(chunkId).keySet()) {
- try {
- count[0]++;
- VariantMasks variantMasks = chromosomeStorage.get(chunkId).get(variantSpec);
- if (variantMasks != null) {
- BigInteger heterozygousMask = variantMasks.heterozygousMask;
- String heteroIdList = sampleIdsForMask(allSampleIds, heterozygousMask);
- BigInteger homozygousMask = variantMasks.homozygousMask;
- String homoIdList = sampleIdsForMask(allSampleIds, homozygousMask);
-
- if (!heteroIdList.isEmpty() && heteroIdList.length() < 1000)
- logger.debug(variantSpec + " : heterozygous : " + heteroIdList);
- if (!homoIdList.isEmpty() && homoIdList.length() < 1000)
- logger.debug(variantSpec + " : homozygous : " + homoIdList);
- }
- } catch (IOException e) {
- logger.error("an error occurred", e);
+ count[0]++;
+ VariantMasks variantMasks = chromosomeStorage.get(chunkId).get(variantSpec);
+ if (variantMasks != null) {
+ BigInteger heterozygousMask = variantMasks.heterozygousMask;
+ String heteroIdList = sampleIdsForMask(allSampleIds, heterozygousMask);
+ BigInteger homozygousMask = variantMasks.homozygousMask;
+ String homoIdList = sampleIdsForMask(allSampleIds, homozygousMask);
+
+ if (!heteroIdList.isEmpty() && heteroIdList.length() < 1000)
+ logger.debug(variantSpec + " : heterozygous : " + heteroIdList);
+ if (!homoIdList.isEmpty() && homoIdList.length() < 1000)
+ logger.debug(variantSpec + " : homozygous : " + homoIdList);
}
}
if (count[0] > 50)
@@ -211,22 +211,18 @@ private static void loadVCFs(File indexFile) throws IOException {
for (int x = chunkIds.size() - 1; x > 0; x--) {
int chunkId = chunkIds.get(x);
chromosomeStorage.get(chunkId).keySet().forEach((variantSpec) -> {
- try {
- count[0]++;
- VariantMasks variantMasks = chromosomeStorage.get(chunkId).get(variantSpec);
- if (variantMasks != null) {
- BigInteger heterozygousMask = variantMasks.heterozygousMask;
- String heteroIdList = sampleIdsForMask(allSampleIds, heterozygousMask);
- BigInteger homozygousMask = variantMasks.homozygousMask;
- String homoIdList = sampleIdsForMask(allSampleIds, homozygousMask);
-
- if (!heteroIdList.isEmpty() && heteroIdList.length() < 1000)
- logger.debug(variantSpec + " : heterozygous : " + heteroIdList);
- if (!homoIdList.isEmpty() && homoIdList.length() < 1000)
- logger.debug(variantSpec + " : homozygous : " + homoIdList);
- }
- } catch (IOException e) {
- logger.error("an error occurred", e);
+ count[0]++;
+ VariantMasks variantMasks = chromosomeStorage.get(chunkId).get(variantSpec);
+ if (variantMasks != null) {
+ BigInteger heterozygousMask = variantMasks.heterozygousMask;
+ String heteroIdList = sampleIdsForMask(allSampleIds, heterozygousMask);
+ BigInteger homozygousMask = variantMasks.homozygousMask;
+ String homoIdList = sampleIdsForMask(allSampleIds, homozygousMask);
+
+ if (!heteroIdList.isEmpty() && heteroIdList.length() < 1000)
+ logger.debug(variantSpec + " : heterozygous : " + heteroIdList);
+ if (!homoIdList.isEmpty() && homoIdList.length() < 1000)
+ logger.debug(variantSpec + " : homozygous : " + homoIdList);
}
});
if (count[0] > 50)
@@ -235,6 +231,9 @@ private static void loadVCFs(File indexFile) throws IOException {
}
}
}
+
+ store.setVariantSpecIndex(variantIndexBuilder.getVariantSpecIndex().toArray(new String[0]));
+ saveVariantStore(store, variantMaskStorage);
}
private static String sampleIdsForMask(String[] sampleIds, BigInteger heterozygousMask) {
@@ -279,7 +278,7 @@ private static void flipChunk(String lastContigProcessed, int lastChunkProcessed
}
if (!currentContig.contentEquals(lastContigProcessed) || currentChunk > lastChunkProcessed || isLastChunk) {
// flip chunk
- TreeMap<String, FileBackedByteIndexedStorage<Integer, ConcurrentHashMap<String, VariantMasks>>> variantMaskStorage_f = variantMaskStorage;
+ TreeMap<String, FileBackedJsonIndexStorage<Integer, ConcurrentHashMap<String, VariantMasks>>> variantMaskStorage_f = variantMaskStorage;
HashMap<String, char[][]> zygosityMaskStrings_f = zygosityMaskStrings;
String lastContigProcessed_f = lastContigProcessed;
int lastChunkProcessed_f = lastChunkProcessed;
@@ -287,17 +286,17 @@ private static void flipChunk(String lastContigProcessed, int lastChunkProcessed
try {
if (variantMaskStorage_f.get(lastContigProcessed_f) == null) {
String fileName = lastContigProcessed_f + "masks.bin";
- if ("chr".startsWith(fileName)) {
+ if (!fileName.startsWith("chr")) {
fileName = "chr" + fileName;
}
+
variantMaskStorage_f.put(lastContigProcessed_f,
- new FileBackedByteIndexedStorage(Integer.class, ConcurrentHashMap.class,
- new File(storageDir, fileName)));
+ new FileBackedStorageVariantMasksImpl(new File(storageDir, fileName)));
}
variantMaskStorage_f.get(lastContigProcessed_f).put(lastChunkProcessed_f,
convertLoadingMapToMaskMap(zygosityMaskStrings_f));
} catch (IOException e) {
- logger.error("an error occurred", e);
+ throw new UncheckedIOException(e);
}
});
if (lastChunkProcessed % 100 == 0) {
@@ -309,7 +308,7 @@ private static void flipChunk(String lastContigProcessed, int lastChunkProcessed
}
private static void saveVariantStore(VariantStore store,
- TreeMap<String, FileBackedByteIndexedStorage<Integer, ConcurrentHashMap<String, VariantMasks>>> variantMaskStorage)
+ TreeMap<String, FileBackedJsonIndexStorage<Integer, ConcurrentHashMap<String, VariantMasks>>> variantMaskStorage)
throws IOException, FileNotFoundException {
store.setVariantMaskStorage(variantMaskStorage);
for (FileBackedByteIndexedStorage<Integer, ConcurrentHashMap<String, VariantMasks>> storage : variantMaskStorage
@@ -317,11 +316,7 @@ private static void saveVariantStore(VariantStore store,
if (storage != null)
storage.complete();
}
- try (FileOutputStream fos = new FileOutputStream(new File(storageDir, "variantStore.javabin"));
- GZIPOutputStream gzos = new GZIPOutputStream(fos);
- ObjectOutputStream oos = new ObjectOutputStream(gzos);) {
- oos.writeObject(store);
- }
+ store.writeInstance(storageDirStr);
logger.debug("Done saving variant masks.");
}
@@ -340,8 +335,8 @@ public static void splitInfoStoresByColumn() throws FileNotFoundException, IOExc
logger.debug("Splitting" + (System.currentTimeMillis() - startTime) + " seconds");
try {
VCFPerPatientInfoStoreSplitter.splitAll(storageDir, new File(mergedDirStr));
- } catch (ClassNotFoundException | InterruptedException | ExecutionException e) {
- logger.error("Error splitting infostore's by column", e);
+ } catch (ClassNotFoundException | ExecutionException | InterruptedException e) {
+ throw new RuntimeException(e);
}
logger.debug("Split" + (System.currentTimeMillis() - startTime) + " seconds");
}
@@ -350,8 +345,8 @@ public static void convertInfoStoresToByteIndexed() throws FileNotFoundException
logger.debug("Converting" + (System.currentTimeMillis() - startTime) + " seconds");
try {
VCFPerPatientInfoStoreToFBBIISConverter.convertAll(mergedDirStr, storageDirStr);
- } catch (ClassNotFoundException | InterruptedException | ExecutionException e) {
- logger.error("Error converting infostore to byteindexed", e);
+ } catch (ClassNotFoundException | ExecutionException | InterruptedException e) {
+ throw new RuntimeException(e);
}
logger.debug("Converted " + ((System.currentTimeMillis() - startTime) / 1000) + " seconds");
}
@@ -370,7 +365,7 @@ private static void shutdownChunkWriteExecutor() {
private static ConcurrentHashMap<String, VariantMasks> convertLoadingMapToMaskMap(
HashMap<String, char[][]> zygosityMaskStrings_f) {
- ConcurrentHashMap<String, VariantMasks> maskMap = new ConcurrentHashMap<>();
+ ConcurrentHashMap<String, VariantMasks> maskMap = new ConcurrentHashMap<>(zygosityMaskStrings_f.size());
zygosityMaskStrings_f.entrySet().parallelStream().forEach((entry) -> {
maskMap.put(entry.getKey(), new VariantMasks(entry.getValue()));
});
@@ -449,7 +444,7 @@ public void updateRecords(char[][] zygosityMaskStrings, ConcurrentHashMap<String, InfoStore> infoStores) {
- infoStore.processRecord(currentSpecNotation, infoColumns);
+ infoStore.processRecord(variantIndex, infoColumns);
});
}
@@ -650,7 +646,7 @@ private static List<VCFIndexLine> parseVCFIndex(File vcfIndexFile) {
}
});
} catch (IOException e) {
- throw new RuntimeException("IOException caught parsing vcfIndexFile", e);
+ throw new UncheckedIOException("IOException caught parsing vcfIndexFile", e);
}
return new ArrayList<>(vcfSet);
}
diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/VCFPerPatientInfoStoreToFBBIISConverter.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/VCFPerPatientInfoStoreToFBBIISConverter.java
index 2b2b6053..cdfe3cdd 100644
--- a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/VCFPerPatientInfoStoreToFBBIISConverter.java
+++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/VCFPerPatientInfoStoreToFBBIISConverter.java
@@ -5,7 +5,6 @@
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.zip.GZIPInputStream;
-import java.util.zip.GZIPOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,7 +40,7 @@ public static void convert(File outputFolder, File file) {
if (store.allValues.size() > 0) {
FileBackedByteIndexedInfoStore fbbiis = new FileBackedByteIndexedInfoStore(outputFolder, store);
- writeStore(new File(outputFolder, file.getName()), fbbiis);
+ fbbiis.write(new File(outputFolder, file.getName()));
logger.info("Completed converting InfoStore file: " + file.getAbsolutePath());
} else {
logger.info("Skipping empty InfoStore file: " + file.getAbsolutePath() + "");
@@ -55,18 +54,4 @@ public static void convert(File outputFolder, File file) {
}
}
- private static synchronized void writeStore(File outputFile, FileBackedByteIndexedInfoStore fbbiis)
- throws FileNotFoundException, IOException {
- FileOutputStream fos = new FileOutputStream(outputFile);
- GZIPOutputStream gzos = new GZIPOutputStream(fos);
- ObjectOutputStream oos = new ObjectOutputStream(gzos);
- oos.writeObject(fbbiis);
- oos.flush();
- oos.close();
- gzos.flush();
- gzos.close();
- fos.flush();
- fos.close();
- }
-
}
\ No newline at end of file
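The removed writeStore helper appears to be folded into FileBackedByteIndexedInfoStore.write. The sketch below restates the deleted logic with try-with-resources, assuming write keeps the same gzipped Java-serialization layout; the method body here is an assumption, not the actual implementation.

    import java.io.*;
    import java.util.zip.GZIPOutputStream;

    // Sketch only: assumes fbbiis.write(File) preserves the on-disk format of
    // the deleted writeStore (ObjectOutputStream wrapped in GZIPOutputStream).
    class InfoStoreWriteSketch {
        static void write(File outputFile, Serializable store) throws IOException {
            try (FileOutputStream fos = new FileOutputStream(outputFile);
                 GZIPOutputStream gzos = new GZIPOutputStream(fos);
                 ObjectOutputStream oos = new ObjectOutputStream(gzos)) {
                oos.writeObject(store); // close() flushes each stream in turn
            }
        }
    }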
diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/VariantCounter.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/VariantCounter.java
deleted file mode 100644
index baee8fd6..00000000
--- a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/VariantCounter.java
+++ /dev/null
@@ -1,41 +0,0 @@
-package edu.harvard.hms.dbmi.avillach.hpds.etl.genotype;
-
-import java.io.*;
-import java.util.ArrayList;
-import java.util.TreeSet;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.zip.GZIPInputStream;
-
-import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.VariantMasks;
-import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.VariantSpec;
-import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.VariantStore;
-import edu.harvard.hms.dbmi.avillach.hpds.storage.FileBackedByteIndexedStorage;
-
-public class VariantCounter {
-
- public static void main(String[] args) throws ClassNotFoundException, FileNotFoundException, IOException {
- try(FileInputStream fis = new FileInputStream("/opt/local/hpds/all/variantStore.javabin");
- ){
- VariantStore variantStore = (VariantStore) new ObjectInputStream(new GZIPInputStream(fis)).readObject();
- variantStore.open();
- for(String contig : variantStore.getVariantMaskStorage().keySet()) {
- int[] countOfVariants = {0};
- FileBackedByteIndexedStorage<Integer, ConcurrentHashMap<String, VariantMasks>>
- currentChromosome = variantStore.getVariantMaskStorage().get(contig);
- currentChromosome.keys().parallelStream().forEach((offsetBucket)->{
- ConcurrentHashMap<String, VariantMasks> maskMap;
- try {
- maskMap = currentChromosome.get(offsetBucket);
- if(maskMap!=null) {
- countOfVariants[0]+=maskMap.size();
- }
-
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
- });
- System.out.println(contig + "," + countOfVariants[0]);
- }
- }
- }
-}
diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/VariantIndexBuilder.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/VariantIndexBuilder.java
new file mode 100644
index 00000000..71fec959
--- /dev/null
+++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/VariantIndexBuilder.java
@@ -0,0 +1,30 @@
+package edu.harvard.hms.dbmi.avillach.hpds.etl.genotype;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class VariantIndexBuilder {
+
+ private static Logger logger = LoggerFactory.getLogger(VariantIndexBuilder.class);
+
+ private final LinkedList<String> variantSpecIndex = new LinkedList<>();
+ private final Map<String, Integer> variantSpecToIndexMap = new ConcurrentHashMap<>();
+
+ public synchronized Integer getIndex(String variantSpec) {
+ Integer variantIndex = variantSpecToIndexMap.get(variantSpec);
+ if (variantIndex == null) {
+ variantIndex = variantSpecIndex.size();
+ variantSpecIndex.add(variantSpec);
+ variantSpecToIndexMap.put(variantSpec, variantIndex);
+ }
+ return variantIndex;
+ }
+
+ public LinkedList<String> getVariantSpecIndex() {
+ return variantSpecIndex;
+ }
+}
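The builder gives each distinct spec the next list position and reuses it on repeat lookups, so indexes are dense and first-seen ordered. A small usage sketch; the spec strings are made up, and only the class above is assumed:

    VariantIndexBuilder builder = new VariantIndexBuilder();
    int a = builder.getIndex("chr1,1000,A,T");  // 0 - first new spec
    int b = builder.getIndex("chr1,2000,G,C");  // 1 - next new spec
    int c = builder.getIndex("chr1,1000,A,T");  // 0 - existing mapping reused
    String[] index = builder.getVariantSpecIndex().toArray(new String[0]); // insertion order

Note that getIndex is synchronized, so the list append and map insert stay atomic even though lookups go through a ConcurrentHashMap.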
diff --git a/etl/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/BucketIndexBySampleTest.java b/etl/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/BucketIndexBySampleTest.java
index 69fbfae3..b7474ef9 100644
--- a/etl/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/BucketIndexBySampleTest.java
+++ b/etl/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/BucketIndexBySampleTest.java
@@ -75,12 +75,8 @@ public class BucketIndexBySampleTest {
public static void initializeBinfile() throws Exception {
//load variant data
NewVCFLoader.main(new String[] {VCF_INDEX_FILE, STORAGE_DIR, MERGED_DIR});
-
- //read in variantStore object created by VCFLoader
- ObjectInputStream ois = new ObjectInputStream(new GZIPInputStream(new FileInputStream(STORAGE_DIR + "variantStore.javabin")));
- variantStore = (VariantStore) ois.readObject();
- ois.close();
- variantStore.open();
+
+ VariantStore variantStore = VariantStore.readInstance(STORAGE_DIR);
//now use that object to initialize the BucketIndexBySample object
bucketIndexBySample = new BucketIndexBySample(variantStore, STORAGE_DIR);
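The test setup now delegates the stream plumbing to VariantStore.readInstance. Assuming it wraps the same gzip-deserialize-then-open sequence the deleted lines spelled out, the round trip used across this change reduces to:

    // Assumed behavior of the new helpers, inferred from the lines they replace;
    // STORAGE_DIR and OUTPUT_DIR are placeholders.
    VariantStore variantStore = VariantStore.readInstance(STORAGE_DIR); // deserialize + open
    // ... query or merge ...
    variantStore.writeInstance(OUTPUT_DIR); // persist next to the mask storage files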
diff --git a/pom.xml b/pom.xml
index ab958be4..df3ca3fb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
4.0.0
edu.harvard.hms.dbmi.avillach.hpds
pic-sure-hpds
- 1.0-SNAPSHOT
+ 2.0.0-SNAPSHOT
pom
pic-sure-hpds
diff --git a/processing/pom.xml b/processing/pom.xml
index e1da83c2..f989a9d9 100644
--- a/processing/pom.xml
+++ b/processing/pom.xml
@@ -5,7 +5,7 @@
pic-sure-hpds
edu.harvard.hms.dbmi.avillach.hpds
- 1.0-SNAPSHOT
+ 2.0.0-SNAPSHOT
processing
diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/AbstractProcessor.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/AbstractProcessor.java
index e2f6c44a..95258704 100644
--- a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/AbstractProcessor.java
+++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/AbstractProcessor.java
@@ -126,6 +126,7 @@ public AbstractProcessor(PhenotypeMetaStore phenotypeMetaStore, VariantService v
){
log.info("loading " + filename);
FileBackedByteIndexedInfoStore infoStore = (FileBackedByteIndexedInfoStore) ois.readObject();
+ infoStore.updateStorageDirectory(genomicDataDirectory);
infoStores.put(filename.replace("_infoStore.javabin", ""), infoStore);
ois.close();
} catch (IOException e) {
diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/VariantIndexCache.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/VariantIndexCache.java
index 2ca6cdfe..a08ae2fd 100644
--- a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/VariantIndexCache.java
+++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/VariantIndexCache.java
@@ -67,23 +67,21 @@ public VariantIndex load(String infoColumn_valueKey) throws IOException {
log.debug("Calculating value for cache for key " + infoColumn_valueKey);
long time = System.currentTimeMillis();
String[] column_and_value = infoColumn_valueKey.split(COLUMN_AND_KEY_DELIMITER);
- String[] variantArray = infoStores.get(column_and_value[0]).getAllValues().get(column_and_value[1]);
+ Integer[] variantIndexIntArray = infoStores.get(column_and_value[0]).getAllValues().get(column_and_value[1]);
- if ((double)variantArray.length / (double)variantIndex.length < MAX_SPARSE_INDEX_RATIO ) {
+ if ((double)variantIndexIntArray.length / (double)variantIndex.length < MAX_SPARSE_INDEX_RATIO ) {
Set<Integer> variantIds = new HashSet<>();
- for(String variantSpec : variantArray) {
- int variantIndexArrayIndex = Arrays.binarySearch(variantIndex, variantSpec);
- variantIds.add(variantIndexArrayIndex);
+ for(Integer variantIndex : variantIndexIntArray) {
+ variantIds.add(variantIndex);
}
return new SparseVariantIndex(variantIds);
} else {
boolean[] variantIndexArray = new boolean[variantIndex.length];
int x = 0;
- for(String variantSpec : variantArray) {
- int variantIndexArrayIndex = Arrays.binarySearch(variantIndex, variantSpec);
+ for(Integer variantIndex : variantIndexIntArray) {
// todo: shouldn't this be greater than or equal to 0? 0 is a valid index
- if (variantIndexArrayIndex > 0) {
- variantIndexArray[variantIndexArrayIndex] = true;
+ if (variantIndex > 0) {
+ variantIndexArray[variantIndex] = true;
}
}
log.debug("Cache value for key " + infoColumn_valueKey + " calculated in " + (System.currentTimeMillis() - time) + " ms");
diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/VariantService.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/VariantService.java
index 616d6f0f..f8f99c0b 100644
--- a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/VariantService.java
+++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/VariantService.java
@@ -5,6 +5,7 @@
import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.VariantStore;
import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.caching.VariantBucketHolder;
import edu.harvard.hms.dbmi.avillach.hpds.storage.FileBackedByteIndexedStorage;
+import edu.harvard.hms.dbmi.avillach.hpds.storage.FileBackedJavaIndexedStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
@@ -12,7 +13,6 @@
import java.io.*;
import java.math.BigInteger;
import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -54,13 +54,13 @@ public Collection<String> filterVariantSetForPatientSet(Set<String> variantSet,
}
}
- public VariantService() throws IOException, ClassNotFoundException, InterruptedException {
- genomicDataDirectory = System.getProperty("HPDS_DATA_HOME", "/opt/local/hpds/all/");
+ public VariantService() {
+ genomicDataDirectory = System.getProperty("HPDS_GENOMIC_DATA_DIRECTORY", "/opt/local/hpds/all/");
VARIANT_INDEX_FBBIS_STORAGE_FILE = genomicDataDirectory + "variantIndex_fbbis_storage.javabin";
VARIANT_INDEX_FBBIS_FILE = genomicDataDirectory + "variantIndex_fbbis.javabin";
BUCKET_INDEX_BY_SAMPLE_FILE = genomicDataDirectory + "BucketIndexBySample.javabin";
- variantStore = VariantStore.deserializeInstance(genomicDataDirectory);
+ variantStore = loadVariantStore();
try {
loadGenomicCacheFiles();
} catch (Exception e) {
@@ -68,66 +68,29 @@ public VariantService() throws IOException, ClassNotFoundException, InterruptedE
}
}
- public void populateVariantIndex() throws InterruptedException {
+ private VariantStore loadVariantStore() {
+ VariantStore variantStore;
+ try {
+ variantStore = VariantStore.readInstance(genomicDataDirectory);
+ } catch (Exception e) {
+ variantStore = new VariantStore();
+ variantStore.setPatientIds(new String[0]);
+ log.warn("Unable to load variant store");
+ }
+ return variantStore;
+ }
+
+ public String[] loadVariantIndex() {
//skip if we have no variants
if(variantStore.getPatientIds().length == 0) {
- variantIndex = new String[0];
log.warn("No Genomic Data found. Skipping variant Indexing");
- return;
- }
- int[] numVariants = {0};
- HashMap<String, String[]> contigMap = new HashMap<>();
-
- ExecutorService ex = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
- variantStore.getVariantMaskStorage().entrySet().forEach(entry->{
- ex.submit(()->{
- int numVariantsInContig = 0;
- FileBackedByteIndexedStorage<Integer, ConcurrentHashMap<String, VariantMasks>> storage = entry.getValue();
- HashMap<Integer, String[]> bucketMap = new HashMap<>();
- log.info("Creating bucketMap for contig " + entry.getKey());
- for(Integer bucket: storage.keys()){
- try {
- ConcurrentHashMap<String, VariantMasks> bucketStorage = storage.get(bucket);
- numVariantsInContig += bucketStorage.size();
- bucketMap.put(bucket, bucketStorage.keySet().toArray(new String[0]));
- } catch (IOException e) {
- log.error("an error occurred", e);
- }
- };
- log.info("Completed bucketMap for contig " + entry.getKey());
- String[] variantsInContig = new String[numVariantsInContig];
- int current = 0;
- for(String[] bucketList : bucketMap.values()) {
- System.arraycopy(bucketList, 0, variantsInContig, current, bucketList.length);
- current = current + bucketList.length;
- }
- bucketMap.clear();
- synchronized(numVariants) {
- log.info("Found " + variantsInContig.length + " variants in contig " + entry.getKey() + ".");
- contigMap.put(entry.getKey(), variantsInContig);
- numVariants[0] += numVariantsInContig;
- }
- });
- });
- ex.shutdown();
- while(!ex.awaitTermination(10, TimeUnit.SECONDS)) {
- Thread.sleep(20000);
- log.info("Awaiting completion of variant index");
+ return new String[0];
}
- log.info("Found " + numVariants[0] + " total variants.");
-
- variantIndex = new String[numVariants[0]];
+ String[] variantIndex = VariantStore.loadVariantIndexFromFile(genomicDataDirectory);
- int current = 0;
- for(String[] contigList : contigMap.values()) {
- System.arraycopy(contigList, 0, variantIndex, current, contigList.length);
- current = current + contigList.length;
- }
- contigMap.clear();
-
- Arrays.sort(variantIndex);
log.info("Index created with " + variantIndex.length + " total variants.");
+ return variantIndex;
}
/**
@@ -141,9 +104,9 @@ private void loadGenomicCacheFiles() throws FileNotFoundException, IOException,
if(variantIndex==null) {
if(!new File(VARIANT_INDEX_FBBIS_FILE).exists()) {
log.info("Creating new " + VARIANT_INDEX_FBBIS_FILE);
- populateVariantIndex();
+ this.variantIndex = loadVariantIndex();
FileBackedByteIndexedStorage<Integer, String[]> fbbis =
- new FileBackedByteIndexedStorage(Integer.class, String[].class, new File(VARIANT_INDEX_FBBIS_STORAGE_FILE));
+ new FileBackedJavaIndexedStorage<>(Integer.class, String[].class, new File(VARIANT_INDEX_FBBIS_STORAGE_FILE));
try (ObjectOutputStream oos = new ObjectOutputStream(new GZIPOutputStream(new FileOutputStream(VARIANT_INDEX_FBBIS_FILE)));
){
@@ -182,17 +145,10 @@ private void loadGenomicCacheFiles() throws FileNotFoundException, IOException,
for( int i = 0; i < bucketCount; i++) {
final int _i = i;
- ex.submit(new Runnable() {
- @Override
- public void run() {
- try {
- String[] variantIndexBucket = indexStore.get(_i);
- System.arraycopy(variantIndexBucket, 0, _varaiantIndex2, (_i * VARIANT_INDEX_BLOCK_SIZE), variantIndexBucket.length);
- log.info("loaded " + (_i * VARIANT_INDEX_BLOCK_SIZE) + " block");
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
- }
+ ex.submit(() -> {
+ String[] variantIndexBucket = indexStore.get(_i);
+ System.arraycopy(variantIndexBucket, 0, _varaiantIndex2, (_i * VARIANT_INDEX_BLOCK_SIZE), variantIndexBucket.length);
+ log.info("loaded " + (_i * VARIANT_INDEX_BLOCK_SIZE) + " block");
});
}
objectInputStream.close();
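Operationally, the constructor change matters for deployments: the directory now comes from HPDS_GENOMIC_DATA_DIRECTORY (default /opt/local/hpds/all/), and a missing or unreadable store degrades to an empty VariantStore rather than failing startup. A minimal sketch, with the property value illustrative:

    // Property name and default come from the diff; the value here is an example.
    System.setProperty("HPDS_GENOMIC_DATA_DIRECTORY", "/opt/local/hpds/all/");
    VariantService service = new VariantService();
    // If the javabin files are absent, loadVariantStore() logs
    // "Unable to load variant store" and serves an empty patient-id array.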
diff --git a/processing/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/processing/AbstractProcessorTest.java b/processing/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/processing/AbstractProcessorTest.java
index e22bea5e..9a9e63b4 100644
--- a/processing/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/processing/AbstractProcessorTest.java
+++ b/processing/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/processing/AbstractProcessorTest.java
@@ -42,12 +42,12 @@ public class AbstractProcessorTest {
@Before
public void setup() {
FileBackedByteIndexedInfoStore mockInfoStore = mock(FileBackedByteIndexedInfoStore.class);
- FileBackedByteIndexedStorage mockIndexedStorage = mock(FileBackedByteIndexedStorage.class);
+ FileBackedByteIndexedStorage<String, Integer[]> mockIndexedStorage = mock(FileBackedByteIndexedStorage.class);
when(mockIndexedStorage.keys()).thenReturn(new HashSet<>(EXAMPLE_GENES_WITH_VARIANT));
when(mockInfoStore.getAllValues()).thenReturn(mockIndexedStorage);
FileBackedByteIndexedInfoStore mockInfoStore2 = mock(FileBackedByteIndexedInfoStore.class);
- FileBackedByteIndexedStorage mockIndexedStorage2 = mock(FileBackedByteIndexedStorage.class);
+ FileBackedByteIndexedStorage<String, Integer[]> mockIndexedStorage2 = mock(FileBackedByteIndexedStorage.class);
when(mockIndexedStorage2.keys()).thenReturn(new HashSet<>(EXAMPLE_VARIANT_SEVERITIES));
when(mockInfoStore2.getAllValues()).thenReturn(mockIndexedStorage2);
diff --git a/service/pom.xml b/service/pom.xml
index 734e3a94..728b3af3 100644
--- a/service/pom.xml
+++ b/service/pom.xml
@@ -5,7 +5,7 @@
pic-sure-hpds
edu.harvard.hms.dbmi.avillach.hpds
- 1.0-SNAPSHOT
+ 2.0.0-SNAPSHOT
service
diff --git a/war/pom.xml b/war/pom.xml
index e71e6efa..8decd8b5 100644
--- a/war/pom.xml
+++ b/war/pom.xml
@@ -6,7 +6,7 @@
pic-sure-hpds
edu.harvard.hms.dbmi.avillach.hpds
- 1.0-SNAPSHOT
+ 2.0.0-SNAPSHOT
hpds-war
war