Skip to content

Commit

Permalink
Fix async result serialization issue, clean up some genomic configura…
Browse files Browse the repository at this point in the history
…tion
  • Loading branch information
ramari16 committed Jan 22, 2024
1 parent 2c96b80 commit 08c7c29
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,25 +38,14 @@ public GenomicProcessor localDistributedGenomicProcessor() {
return new GenomicProcessorParentImpl(genomicProcessors);
}

@Bean(name = "integrationTestGenomicProcessor")
@ConditionalOnProperty(prefix = "hpds.genomicProcessor", name = "impl", havingValue = "integrationTest")
public GenomicProcessor integrationTestGenomicProcessor() {
// todo: parameterize these
return new GenomicProcessorParentImpl(List.of(
new GenomicProcessorNodeImpl("/Users/ryan/dev/pic-sure-hpds-test/data/orchestration/1040.22/all/"),
new GenomicProcessorNodeImpl("/Users/ryan/dev/pic-sure-hpds-test/data/orchestration/1040.20/all/")
));
}

@Bean(name = "remoteGenomicProcessor")
@ConditionalOnProperty(prefix = "hpds.genomicProcessor", name = "impl", havingValue = "remote")
public GenomicProcessor remoteGenomicProcessor() {
// todo: Just for testing, for now, move to a configuration file or something
String[] hosts = new String[] {"http://localhost:8090/", "http://localhost:8091/"};
List<GenomicProcessor> nodes = List.of(
new GenomicProcessorRestClient(hosts[0]),
new GenomicProcessorRestClient(hosts[1])
);
return new GenomicProcessorParentImpl(nodes);
public GenomicProcessor remoteGenomicProcessor(@Value("${hpds.genomicProcessor.remoteHosts}") List<String> remoteHosts) {
List<GenomicProcessor> genomicProcessors = Flux.fromIterable(remoteHosts)
.flatMap(remoteHost -> Mono.fromCallable(() -> (GenomicProcessor) new GenomicProcessorRestClient(remoteHost)).subscribeOn(Schedulers.boundedElastic()))
.collectList()
.block();
// todo: validate remote processors are valid
return new GenomicProcessorParentImpl(genomicProcessors);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package edu.harvard.hms.dbmi.avillach.hpds.service;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.Map.Entry;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -194,7 +197,7 @@ private QueryStatus convertToQueryStatus(AsyncResult entity) {
}

@PostMapping(value = "/query/{resourceQueryId}/result", produces = MediaType.TEXT_PLAIN_VALUE)
public ResponseEntity queryResult(@PathVariable("resourceQueryId") UUID queryId, @RequestBody QueryRequest resultRequest) {
public ResponseEntity queryResult(@PathVariable("resourceQueryId") UUID queryId, @RequestBody QueryRequest resultRequest) throws IOException {
AsyncResult result = queryService.getResultFor(queryId.toString());
if (result == null) {
// This happens sometimes when users immediately request the status for a query
Expand All @@ -215,7 +218,7 @@ public ResponseEntity queryResult(@PathVariable("resourceQueryId") UUID queryId,
result.stream.open();
return ResponseEntity.ok()
.contentType(MediaType.TEXT_PLAIN)
.body(result.stream);
.body(new String(result.stream.readAllBytes(), StandardCharsets.UTF_8));
} else {
return ResponseEntity.status(400).body("Status : " + result.status.name());
}
Expand Down Expand Up @@ -268,12 +271,6 @@ public PaginatedSearchResult<String> searchGenomicConceptValues(
return paginator.paginate(matchingValues, page, size);
}

@GetMapping("/test")
public ResponseEntity test() {
List<InfoColumnMeta> infoColumnMeta = abstractProcessor.getInfoStoreMeta();
return ResponseEntity.ok(infoColumnMeta);
}

private ResponseEntity _querySync(QueryRequest resultRequest) throws IOException {
Query incomingQuery;
incomingQuery = convertIncomingQuery(resultRequest);
Expand All @@ -297,7 +294,7 @@ private ResponseEntity _querySync(QueryRequest resultRequest) throws IOException
AsyncResult result = queryService.getResultFor(status.getResourceResultId());
if (result.status == AsyncResult.Status.SUCCESS) {
result.stream.open();
return queryOkResponse(result.stream, incomingQuery);
return queryOkResponse(new String(result.stream.readAllBytes(), StandardCharsets.UTF_8), incomingQuery, MediaType.TEXT_PLAIN);
}
return ResponseEntity.status(400).contentType(MediaType.APPLICATION_JSON).body("Status : " + result.status.name());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ SMALL_JOB_LIMIT = 100
SMALL_TASK_THREADS = 1
LARGE_TASK_THREADS = 1

hpds.genomicProcessor.impl=integrationTest
hpds.genomicProcessor.impl=localDistributed
HPDS_GENOMIC_DATA_DIRECTORY=/opt/local/hpds/phs0000946/
10 changes: 8 additions & 2 deletions service/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,11 @@ SMALL_JOB_LIMIT = 100
SMALL_TASK_THREADS = 1
LARGE_TASK_THREADS = 1

hpds.genomicProcessor.impl=local
HPDS_GENOMIC_DATA_DIRECTORY=/opt/local/hpds/all/
#hpds.genomicProcessor.impl=local
#HPDS_GENOMIC_DATA_DIRECTORY=/opt/local/hpds/all/

#hpds.genomicProcessor.impl=localDistributed
#HPDS_GENOMIC_DATA_DIRECTORY=/opt/local/hpds/phs0000946/

hpds.genomicProcessor.impl=remote
hpds.genomicProcessor.remoteHosts=http://localhost:8090/,http://localhost:8091/

0 comments on commit 08c7c29

Please sign in to comment.