From 6465f5a0cac4fa9d3fda7bed2c83c7076b7f298c Mon Sep 17 00:00:00 2001 From: Ryan Amari Date: Fri, 19 Jan 2024 09:32:59 -0700 Subject: [PATCH] Parallelize genomic node construction --- .../hpds/processing/GenomicProcessorConfig.java | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorConfig.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorConfig.java index 86ec80e8..68e1783e 100644 --- a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorConfig.java +++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorConfig.java @@ -6,6 +6,9 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.PropertySource; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; import java.util.ArrayList; import java.util.List; @@ -28,10 +31,11 @@ public GenomicProcessor localGenomicProcessor() { @Bean(name = "localDistributedGenomicProcessor") @ConditionalOnProperty(prefix = "hpds.genomicProcessor", name = "impl", havingValue = "localDistributed") public GenomicProcessor localDistributedGenomicProcessor() { - List processorNodes = IntStream.range(1, 23) - .mapToObj(i -> new GenomicProcessorNodeImpl(hpdsGenomicDataDirectory + "/" + i + "/")) - .collect(Collectors.toList()); - return new GenomicProcessorParentImpl(processorNodes); + List genomicProcessors = Flux.range(1, 22) + .flatMap(i -> Mono.fromCallable(() -> (GenomicProcessor) new GenomicProcessorNodeImpl(hpdsGenomicDataDirectory + "/" + i + "/")).subscribeOn(Schedulers.boundedElastic())) + .collectList() + .block(); + return new GenomicProcessorParentImpl(genomicProcessors); } @Bean(name = "integrationTestGenomicProcessor") @@ -47,7 +51,7 @@ public GenomicProcessor integrationTestGenomicProcessor() { @Bean(name = "remoteGenomicProcessor") @ConditionalOnProperty(prefix = "hpds.genomicProcessor", name = "impl", havingValue = "remote") public GenomicProcessor remoteGenomicProcessor() { - // Just for testing, for now, move to a configuration file or something + // todo: Just for testing, for now, move to a configuration file or something String[] hosts = new String[] {"http://localhost:8090/", "http://localhost:8091/"}; List nodes = List.of( new GenomicProcessorRestClient(hosts[0]),