diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/GenomicDatasetMerger.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/GenomicDatasetMerger.java
index e66f0c30..88b4fd14 100644
--- a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/GenomicDatasetMerger.java
+++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/GenomicDatasetMerger.java
@@ -118,7 +118,8 @@ public Map<String, FileBackedByteIndexedInfoStore> mergeVariantIndexes() throws
         if (!infoStores1.keySet().equals(infoStores2.keySet())) {
             throw new IllegalStateException("Info stores do not match");
         }
-        for (Map.Entry<String, FileBackedByteIndexedInfoStore> infoStores1Entry : infoStores1.entrySet()) {
+        //for (Map.Entry<String, FileBackedByteIndexedInfoStore> infoStores1Entry : infoStores1.entrySet()) {
+        infoStores1.entrySet().parallelStream().forEach(infoStores1Entry -> {
             FileBackedByteIndexedInfoStore infoStore2 = infoStores2.get(infoStores1Entry.getKey());
 
             FileBackedByteIndexedStorage allValuesStore1 = infoStores1Entry.getValue().getAllValues();
@@ -137,9 +138,14 @@ public Map<String, FileBackedByteIndexedInfoStore> mergeVariantIndexes() throws
 
             InfoStore infoStore = new InfoStore(infoStore2.description, null, infoStores1Entry.getKey());
             infoStore.allValues = mergedInfoStoreValues;
-            FileBackedByteIndexedInfoStore mergedStore = new FileBackedByteIndexedInfoStore(new File(outputDirectory), infoStore);
+            FileBackedByteIndexedInfoStore mergedStore = null;
+            try {
+                mergedStore = new FileBackedByteIndexedInfoStore(new File(outputDirectory), infoStore);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
             mergedInfoStores.put(infoStores1Entry.getKey(), mergedStore);
-        }
+        });
 
         mergedVariantStore.setVariantSpecIndex(variantSpecList.toArray(new String[0]));
         return mergedInfoStores;
@@ -161,7 +167,7 @@ private String[] mergePatientIds() {
      */
     public Map<String, FileBackedJsonIndexStorage<Integer, ConcurrentHashMap<String, VariantMasks>>> mergeChromosomeMasks() {
         Map<String, FileBackedJsonIndexStorage<Integer, ConcurrentHashMap<String, VariantMasks>>> mergedMaskStorage = new ConcurrentHashMap<>();
-        variantStore1.getVariantMaskStorage().keySet().parallelStream().forEach(chromosome -> {
+        variantStore1.getVariantMaskStorage().keySet().forEach(chromosome -> {
             try {
                 mergedMaskStorage.put(chromosome, mergeChromosomeMask(chromosome));
             } catch (FileNotFoundException e) {
@@ -218,7 +224,7 @@ public FileBackedJsonIndexStorage<Integer, ConcurrentHashMap<String, VariantMasks>> mergeChromosomeMask(String chromosome)
         FileBackedJsonIndexStorage<Integer, ConcurrentHashMap<String, VariantMasks>> variantMaskStorage2 = variantStore2.getVariantMaskStorage().get(chromosome);
         FileBackedJsonIndexStorage<Integer, ConcurrentHashMap<String, VariantMasks>> merged = new FileBackedStorageVariantMasksImpl(new File(outputDirectory + chromosome + "masks.bin"));
 
-        variantMaskStorage1.keys().forEach(key -> {
+        variantMaskStorage1.keys().parallelStream().forEach(key -> {
             Map<String, VariantMasks> masks1 = variantMaskStorage1.get(key);
             Map<String, VariantMasks> masks2 = variantMaskStorage2.get(key);
             if (masks2 == null) {
diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorConfig.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorConfig.java
index ba3e5109..baf1a4c1 100644
--- a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorConfig.java
+++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorConfig.java
@@ -5,15 +5,11 @@
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.PropertySource;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
-import java.util.ArrayList;
 import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 
 @SpringBootApplication
 public class GenomicProcessorConfig {