From 83640e7c7f99019534af59677dc2e5f7ad2f19dc Mon Sep 17 00:00:00 2001 From: Ryan Amari Date: Mon, 18 Dec 2023 10:01:31 -0500 Subject: [PATCH] Fix thread blocking issue, confirmed with integration tests --- .../hpds/processing/GenomicProcessorConfig.java | 10 ++++++++++ .../hpds/processing/GenomicProcessorNodeImpl.java | 2 +- .../hpds/processing/GenomicProcessorParentImpl.java | 1 - .../resources/application-integration-test.properties | 5 +++++ .../service/util/AbstractProcessorIntegrationTest.java | 3 ++- 5 files changed, 18 insertions(+), 3 deletions(-) create mode 100644 service/src/main/resources/application-integration-test.properties diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorConfig.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorConfig.java index fbd5b7b6..9087a088 100644 --- a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorConfig.java +++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorConfig.java @@ -38,6 +38,16 @@ public GenomicProcessor localDistributedGenomicProcessor() { return new GenomicProcessorParentImpl(processorNodes); } + @Bean(name = "integrationTestGenomicProcessor") + @ConditionalOnProperty(prefix = "hpds.genomicProcessor", name = "impl", havingValue = "integrationTest") + public GenomicProcessor integrationTestGenomicProcessor() { + // todo: parameterize these + return new GenomicProcessorParentImpl(List.of( + new GenomicProcessorNodeImpl("/Users/ryan/dev/pic-sure-hpds-test/data/orchestration/1040.22/all/"), + new GenomicProcessorNodeImpl("/Users/ryan/dev/pic-sure-hpds-test/data/orchestration/1040.20/all/") + )); + } + @Bean(name = "remoteGenomicProcessor") @ConditionalOnProperty(prefix = "hpds.genomicProcessor", name = "impl", havingValue = "remote") public GenomicProcessor remoteGenomicProcessor() { diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorNodeImpl.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorNodeImpl.java index 1edfc982..ae5b242c 100644 --- a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorNodeImpl.java +++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorNodeImpl.java @@ -78,7 +78,7 @@ public GenomicProcessorNodeImpl(String genomicDataDirectory) { @Override public Mono getPatientMask(DistributableQuery distributableQuery) { - return Mono.fromCallable(() -> runGetPatientMask(distributableQuery)); + return Mono.fromCallable(() -> runGetPatientMask(distributableQuery)).subscribeOn(Schedulers.boundedElastic()); } public BigInteger runGetPatientMask(DistributableQuery distributableQuery) { // log.debug("filterdIDSets START size: " + filteredIdSets.size()); diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorParentImpl.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorParentImpl.java index 43c96a95..22be3f74 100644 --- a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorParentImpl.java +++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorParentImpl.java @@ -48,7 +48,6 @@ public GenomicProcessorParentImpl(List nodes) { @Override public Mono getPatientMask(DistributableQuery distributableQuery) { Mono result = Flux.just(nodes.toArray(GenomicProcessor[]::new)) - .publishOn(Schedulers.boundedElastic()) .flatMap(node -> node.getPatientMask(distributableQuery)) .reduce(BigInteger::or); return result; diff --git a/service/src/main/resources/application-integration-test.properties b/service/src/main/resources/application-integration-test.properties new file mode 100644 index 00000000..b2b8b32e --- /dev/null +++ b/service/src/main/resources/application-integration-test.properties @@ -0,0 +1,5 @@ +SMALL_JOB_LIMIT = 100 +SMALL_TASK_THREADS = 1 +LARGE_TASK_THREADS = 1 + +hpds.genomicProcessor.impl=integrationTest \ No newline at end of file diff --git a/service/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/service/util/AbstractProcessorIntegrationTest.java b/service/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/service/util/AbstractProcessorIntegrationTest.java index 2ddd4141..bd771cda 100644 --- a/service/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/service/util/AbstractProcessorIntegrationTest.java +++ b/service/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/service/util/AbstractProcessorIntegrationTest.java @@ -13,7 +13,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.ActiveProfiles; import org.springframework.test.context.junit.jupiter.SpringExtension; import java.util.*; @@ -24,6 +24,7 @@ @ExtendWith(SpringExtension.class) @EnableAutoConfiguration @SpringBootTest(classes = HpdsApplication.class) +@ActiveProfiles("integration-test") public class AbstractProcessorIntegrationTest { @Autowired