Add parallelization to genomic dataset merger
ramari16 committed Jan 24, 2024
1 parent 08c7c29 commit 1dd5b56
Showing 2 changed files with 11 additions and 9 deletions.
@@ -118,7 +118,8 @@ public Map<String, FileBackedByteIndexedInfoStore> mergeVariantIndexes() throws
         if (!infoStores1.keySet().equals(infoStores2.keySet())) {
             throw new IllegalStateException("Info stores do not match");
         }
-        for (Map.Entry<String, FileBackedByteIndexedInfoStore> infoStores1Entry : infoStores1.entrySet()) {
+        //for (Map.Entry<String, FileBackedByteIndexedInfoStore> infoStores1Entry : infoStores1.entrySet()) {
+        infoStores1.entrySet().parallelStream().forEach(infoStores1Entry -> {
             FileBackedByteIndexedInfoStore infoStore2 = infoStores2.get(infoStores1Entry.getKey());

             FileBackedByteIndexedStorage<String, Integer[]> allValuesStore1 = infoStores1Entry.getValue().getAllValues();
@@ -137,9 +138,14 @@ public Map<String, FileBackedByteIndexedInfoStore> mergeVariantIndexes() throws

             InfoStore infoStore = new InfoStore(infoStore2.description, null, infoStores1Entry.getKey());
             infoStore.allValues = mergedInfoStoreValues;
-            FileBackedByteIndexedInfoStore mergedStore = new FileBackedByteIndexedInfoStore(new File(outputDirectory), infoStore);
+            FileBackedByteIndexedInfoStore mergedStore = null;
+            try {
+                mergedStore = new FileBackedByteIndexedInfoStore(new File(outputDirectory), infoStore);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
             mergedInfoStores.put(infoStores1Entry.getKey(), mergedStore);
-        }
+        });

         mergedVariantStore.setVariantSpecIndex(variantSpecList.toArray(new String[0]));
         return mergedInfoStores;
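
A note on the two hunks above. Stream.forEach takes a java.util.function.Consumer, and Consumer.accept declares no checked exceptions, so once the loop body becomes a lambda the IOException thrown by the FileBackedByteIndexedInfoStore constructor can no longer propagate; it has to be caught and rethrown unchecked, which is why the try/catch appears. The map collecting results from parallel workers must also be thread-safe. A minimal sketch of the same pattern, assuming a ConcurrentHashMap target; the buildStore helper and its string values are hypothetical stand-ins, not the project's API:

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ParallelMergeSketch {

    // Hypothetical stand-in for a store constructor that declares
    // a checked IOException, like FileBackedByteIndexedInfoStore.
    static String buildStore(String key) throws IOException {
        return "store:" + key;
    }

    public static void main(String[] args) {
        Map<String, String> infoStores = Map.of("AF", "a", "AC", "b", "AN", "c");
        // Thread-safe target: parallel workers call put() concurrently.
        Map<String, String> merged = new ConcurrentHashMap<>();

        infoStores.entrySet().parallelStream().forEach(entry -> {
            try {
                merged.put(entry.getKey(), buildStore(entry.getKey()));
            } catch (IOException e) {
                // Consumer.accept cannot throw checked exceptions,
                // so rethrow unchecked, as the diff above does.
                throw new RuntimeException(e);
            }
        });

        System.out.println(merged); // e.g. {AC=store:AC, AF=store:AF, AN=store:AN}
    }
}

The trade-off: callers now see an unchecked RuntimeException instead of the original checked IOException, but the stream pipeline compiles and errors still surface at the call site.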
@@ -161,7 +167,7 @@ private String[] mergePatientIds() {
      */
     public Map<String, FileBackedJsonIndexStorage<Integer, ConcurrentHashMap<String, VariantMasks>>> mergeChromosomeMasks() {
         Map<String, FileBackedJsonIndexStorage<Integer, ConcurrentHashMap<String, VariantMasks>>> mergedMaskStorage = new ConcurrentHashMap<>();
-        variantStore1.getVariantMaskStorage().keySet().parallelStream().forEach(chromosome -> {
+        variantStore1.getVariantMaskStorage().keySet().forEach(chromosome -> {
             try {
                 mergedMaskStorage.put(chromosome, mergeChromosomeMask(chromosome));
             } catch (FileNotFoundException e) {
@@ -218,7 +224,7 @@ public FileBackedJsonIndexStorage<Integer, ConcurrentHashMap<String, VariantMask
         FileBackedJsonIndexStorage<Integer, ConcurrentHashMap<String, VariantMasks>> variantMaskStorage2 = variantStore2.getVariantMaskStorage().get(chromosome);

         FileBackedJsonIndexStorage<Integer, ConcurrentHashMap<String, VariantMasks>> merged = new FileBackedStorageVariantMasksImpl(new File(outputDirectory + chromosome + "masks.bin"));
-        variantMaskStorage1.keys().forEach(key -> {
+        variantMaskStorage1.keys().parallelStream().forEach(key -> {
             Map<String, VariantMasks> masks1 = variantMaskStorage1.get(key);
             Map<String, VariantMasks> masks2 = variantMaskStorage2.get(key);
             if (masks2 == null) {
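Note that these last two hunks move parallelism down a level rather than adding more of it: the outer loop over chromosomes loses its parallelStream() and the inner loop over variant-mask buckets gains one. A plausible reading, not stated in the commit: a genome has only about two dozen chromosomes of very uneven size, so chromosome-sized tasks balance poorly across cores, while the many small bucket keys spread evenly; nesting both levels would not help anyway, since parallel streams share the common ForkJoinPool. A sketch of the resulting shape, with hypothetical data and names:

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class GranularitySketch {

    public static void main(String[] args) {
        // Hypothetical input: a few unevenly sized outer groups
        // (chromosomes), each holding many small inner buckets (mask keys).
        Map<String, List<Integer>> chromosomes = Map.of(
                "chr1", List.of(1, 2, 3, 4, 5, 6, 7, 8),
                "chr21", List.of(1, 2));

        Map<String, Integer> merged = new ConcurrentHashMap<>();

        // Sequential outer loop, parallel inner loop: the shape the
        // commit moves to. Many small inner tasks keep cores busy even
        // when one chromosome dominates the total work.
        chromosomes.forEach((chromosome, keys) ->
                keys.parallelStream().forEach(key ->
                        // Stand-in for merging one bucket of variant masks.
                        merged.put(chromosome + ":" + key, key)));

        System.out.println(merged.size()); // 10 buckets merged
    }
}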
GenomicProcessorConfig.java
@@ -5,15 +5,11 @@
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.PropertySource;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

@SpringBootApplication
public class GenomicProcessorConfig {
