diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/PfbProcessor.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/MultiValueQueryProcessor.java
similarity index 90%
rename from processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/PfbProcessor.java
rename to processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/MultiValueQueryProcessor.java
index 6968a587..fd931fce 100644
--- a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/PfbProcessor.java
+++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/MultiValueQueryProcessor.java
@@ -8,27 +8,27 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
 
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 @Component
-public class PfbProcessor implements HpdsProcessor {
+public class MultiValueQueryProcessor implements HpdsProcessor {
 
     public static final String PATIENT_ID_FIELD_NAME = "patient_id";
-    private final int ID_BATCH_SIZE;
+    private final int idBatchSize;
     private final AbstractProcessor abstractProcessor;
 
-    private Logger log = LoggerFactory.getLogger(PfbProcessor.class);
+    private Logger log = LoggerFactory.getLogger(MultiValueQueryProcessor.class);
 
     @Autowired
-    public PfbProcessor(AbstractProcessor abstractProcessor) {
+    public MultiValueQueryProcessor(AbstractProcessor abstractProcessor, @Value("${ID_BATCH_SIZE:0}") int idBatchSize) {
         this.abstractProcessor = abstractProcessor;
-        ID_BATCH_SIZE = Integer.parseInt(System.getProperty("ID_BATCH_SIZE", "0"));
+        this.idBatchSize = idBatchSize;
     }
 
     @Override
@@ -43,7 +43,7 @@ public String[] getHeaderRow(Query query) {
     public void runQuery(Query query, AsyncResult result) {
         Set<Integer> idList = abstractProcessor.getPatientSubsetForQuery(query);
         log.info("Processing " + idList.size() + " rows for result " + result.getId());
-        Lists.partition(new ArrayList<>(idList), ID_BATCH_SIZE).stream()
+        Lists.partition(new ArrayList<>(idList), idBatchSize).stream()
                 .forEach(patientIds -> {
                     Map<String, Map<Integer, List<String>>> pathToPatientToValueMap = buildResult(result, query, new TreeSet<>(patientIds));
                     List<List<List<String>>> fieldValuesPerPatient = patientIds.stream().map(patientId -> {
diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/CsvWriter.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/CsvWriter.java
index 3302d06e..5da10b36 100644
--- a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/CsvWriter.java
+++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/CsvWriter.java
@@ -1,5 +1,6 @@
 package edu.harvard.hms.dbmi.avillach.hpds.processing.io;
 
+import com.google.common.base.Joiner;
 import org.springframework.http.MediaType;
 
 import java.io.File;
@@ -9,6 +10,7 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.stream.Collectors;
 
 public class CsvWriter implements ResultWriter {
 
@@ -43,13 +45,27 @@ public void writeEntity(Collection<String[]> data) {
         try {
             csvWriter.write(fileWriter, data);
         } catch (IOException e) {
-            throw new RuntimeException("IOException while appending to CSV file", e);
+            throw new UncheckedIOException("IOException while appending to CSV file", e);
         }
     }
 
     @Override
     public void writeMultiValueEntity(Collection<List<List<String>>> data) {
-        throw new RuntimeException("Method not implemented");
+        List<String[]> collect = data.stream().map(line -> {
+            return line.stream()
+                    .map(cell -> {
+                        if (cell == null) {
+                            return "";
+                        }
+                        return Joiner.on('\t').join(cell);
+                    })
+                    .toArray(String[]::new);
+        }).toList();
+        try {
+            csvWriter.write(fileWriter, collect);
+        } catch (IOException e) {
+            throw new UncheckedIOException("IOException while appending to CSV file", e);
+        }
     }
 
     @Override
diff --git a/service/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/service/PicSureService.java b/service/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/service/PicSureService.java
index 04e5863f..c58705f3 100644
--- a/service/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/service/PicSureService.java
+++ b/service/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/service/PicSureService.java
@@ -180,8 +180,6 @@ public ResponseEntity query(@RequestBody QueryRequest queryJson) {
             } catch (IOException e) {
                 log.error("IOException caught in query processing:", e);
                 return ResponseEntity.status(500).build();
-            } catch (ClassNotFoundException e) {
-                return ResponseEntity.status(500).build();
             }
         } else {
             QueryStatus status = new QueryStatus();
diff --git a/service/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/service/QueryService.java b/service/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/service/QueryService.java
index a41a94d2..a00a8ad0 100644
--- a/service/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/service/QueryService.java
+++ b/service/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/service/QueryService.java
@@ -15,8 +15,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.ImmutableMap;
-
 import edu.harvard.dbmi.avillach.util.UUIDv5;
 import edu.harvard.hms.dbmi.avillach.hpds.data.query.Query;
 import edu.harvard.hms.dbmi.avillach.hpds.processing.*;
@@ -48,7 +46,7 @@ public class QueryService {
     private final QueryProcessor queryProcessor;
     private final TimeseriesProcessor timeseriesProcessor;
     private final CountProcessor countProcessor;
-    private final PfbProcessor pfbProcessor;
+    private final MultiValueQueryProcessor multiValueQueryProcessor;
 
     HashMap<String, AsyncResult> results = new HashMap<>();
 
@@ -58,7 +56,7 @@ public QueryService (AbstractProcessor abstractProcessor,
             QueryProcessor queryProcessor,
             TimeseriesProcessor timeseriesProcessor,
             CountProcessor countProcessor,
-            PfbProcessor pfbProcessor,
+            MultiValueQueryProcessor multiValueQueryProcessor,
             @Value("${SMALL_JOB_LIMIT}") Integer smallJobLimit,
             @Value("${SMALL_TASK_THREADS}") Integer smallTaskThreads,
             @Value("${LARGE_TASK_THREADS}") Integer largeTaskThreads) {
@@ -66,7 +64,7 @@ public QueryService (AbstractProcessor abstractProcessor,
         this.queryProcessor = queryProcessor;
         this.timeseriesProcessor = timeseriesProcessor;
         this.countProcessor = countProcessor;
-        this.pfbProcessor = pfbProcessor;
+        this.multiValueQueryProcessor = multiValueQueryProcessor;
 
         SMALL_JOB_LIMIT = smallJobLimit;
         SMALL_TASK_THREADS = smallTaskThreads;
@@ -83,7 +81,7 @@ public QueryService (AbstractProcessor abstractProcessor,
         smallTaskExecutor = createExecutor(smallTaskExecutionQueue, SMALL_TASK_THREADS);
     }
 
-    public AsyncResult runQuery(Query query) throws ClassNotFoundException, IOException {
+    public AsyncResult runQuery(Query query) throws IOException {
         // Merging fields from filters into selected fields for user validation of results
         mergeFilterFieldsIntoSelectedFields(query);
 
@@ -112,11 +110,10 @@ public int runCount(Query query) throws InterruptedException, ExecutionException
         return countProcessor.runCounts(query);
     }
 
-    private AsyncResult initializeResult(Query query) throws ClassNotFoundException, FileNotFoundException, IOException {
+    private AsyncResult initializeResult(Query query) throws IOException {
 
         HpdsProcessor p;
         switch(query.getExpectedResultType()) {
-        case DATAFRAME :
         case SECRET_ADMIN_DATAFRAME:
             p = queryProcessor;
             break;
@@ -129,7 +126,8 @@ private AsyncResult initializeResult(Query query) throws ClassNotFoundException,
             p = countProcessor;
             break;
         case DATAFRAME_PFB:
-            p = pfbProcessor;
+        case DATAFRAME:
+            p = multiValueQueryProcessor;
             break;
         default :
             throw new RuntimeException("UNSUPPORTED RESULT TYPE");
diff --git a/service/src/main/resources/application-integration-test.properties b/service/src/main/resources/application-integration-test.properties
index f85994f6..90547f24 100644
--- a/service/src/main/resources/application-integration-test.properties
+++ b/service/src/main/resources/application-integration-test.properties
@@ -1,6 +1,7 @@
 SMALL_JOB_LIMIT = 100
 SMALL_TASK_THREADS = 1
 LARGE_TASK_THREADS = 1
+ID_BATCH_SIZE=1000
 
 VCF_EXCERPT_ENABLED=true
 hpds.genomicProcessor.impl=local
diff --git a/service/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/service/QueryServiceTest.java b/service/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/service/QueryServiceTest.java
new file mode 100644
index 00000000..f3aa4ef4
--- /dev/null
+++ b/service/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/service/QueryServiceTest.java
@@ -0,0 +1,67 @@
+package edu.harvard.hms.dbmi.avillach.hpds.service;
+
+import de.siegmar.fastcsv.reader.CsvContainer;
+import de.siegmar.fastcsv.reader.CsvReader;
+import edu.harvard.hms.dbmi.avillach.hpds.data.query.Query;
+import edu.harvard.hms.dbmi.avillach.hpds.data.query.ResultType;
+import edu.harvard.hms.dbmi.avillach.hpds.processing.AsyncResult;
+import edu.harvard.hms.dbmi.avillach.hpds.test.util.BuildIntegrationTestEnvironment;
+import org.apache.commons.io.IOUtils;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.ActiveProfiles;
+import org.springframework.test.context.junit.jupiter.SpringExtension;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.*;
+@ExtendWith(SpringExtension.class)
+@EnableAutoConfiguration
+@SpringBootTest(classes = edu.harvard.hms.dbmi.avillach.hpds.service.HpdsApplication.class)
+@ActiveProfiles("integration-test")
+class QueryServiceTest {
+
+    @Autowired
+    private QueryService queryService;
+
+    @BeforeAll
+    public static void beforeAll() {
+        BuildIntegrationTestEnvironment instance = BuildIntegrationTestEnvironment.INSTANCE;
+    }
+
+    @Test
+    public void dataframeMulti() throws IOException, InterruptedException {
+        Query query = new Query();
+        List<Query.VariantInfoFilter> variantInfoFilters = new ArrayList<>();
+        Query.VariantInfoFilter variantInfoFilter = new Query.VariantInfoFilter();
+        variantInfoFilter.categoryVariantInfoFilters = Map.of("Gene_with_variant", new String[]{"LOC102723996", "LOC101928576"});
+        variantInfoFilters.add(variantInfoFilter);
+        query.setVariantInfoFilters(variantInfoFilters);
+        query.setFields(List.of("\\open_access-1000Genomes\\data\\SYNTHETIC_AGE\\"));
+        query.setExpectedResultType(ResultType.DATAFRAME);
+
+        AsyncResult asyncResult = queryService.runQuery(query);
+
+        int retries = 0;
+        while ((AsyncResult.Status.RUNNING.equals(asyncResult.getStatus()) || AsyncResult.Status.PENDING.equals(asyncResult.getStatus())) && retries < 10) {
+            retries++;
+            Thread.sleep(200);
+        }
+
+        assertEquals(AsyncResult.Status.SUCCESS, asyncResult.getStatus());
+        CsvReader csvReader = new CsvReader();
+        CsvContainer csvContainer = csvReader.read(asyncResult.getFile(), StandardCharsets.UTF_8);
+        // 22 data rows plus the header row
+        assertEquals(23, csvContainer.getRows().size());
+    }
+
+}
\ No newline at end of file
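
Note for reviewers: the heart of the new CsvWriter.writeMultiValueEntity above is collapsing each multi-value cell into a single tab-delimited CSV field, with null cells written as empty strings. The following is a minimal standalone sketch of that transformation; the class and the toCsvRows helper name are mine for illustration, but the Joiner.on('\t') call and the null handling mirror the diff.

    import com.google.common.base.Joiner;

    import java.util.Arrays;
    import java.util.Collection;
    import java.util.List;

    public class MultiValueCellJoinSketch {

        // Mirrors CsvWriter.writeMultiValueEntity: each row is a list of cells,
        // each cell a list of values. A null cell becomes "", a multi-value cell
        // becomes one tab-joined string, so every row collapses to a String[]
        // that a plain CSV writer can consume.
        static List<String[]> toCsvRows(Collection<List<List<String>>> data) {
            return data.stream()
                    .map(line -> line.stream()
                            .map(cell -> cell == null ? "" : Joiner.on('\t').join(cell))
                            .toArray(String[]::new))
                    .toList();
        }

        public static void main(String[] args) {
            // Arrays.asList (unlike List.of) tolerates the null cell.
            List<List<String>> row = Arrays.asList(
                    List.of("42"),                            // patient_id
                    List.of("LOC102723996", "LOC101928576"),  // multi-value field
                    null);                                    // missing value
            // Prints: [42, LOC102723996<TAB>LOC101928576, ]
            toCsvRows(List.of(row)).forEach(r -> System.out.println(Arrays.toString(r)));
        }
    }

Using a tab as the intra-cell separator keeps multiple values clear of the CSV comma delimiter, so a downstream consumer can split a cell on '\t' without re-parsing the row.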