ALS-7581: Implement multiple values per patient in CSV output (#122)

ramari16 authored Nov 15, 2024
1 parent 6378f5e commit 0ecb88d
Showing 6 changed files with 100 additions and 20 deletions.
File 1 of 6: PfbProcessor renamed to MultiValueQueryProcessor

@@ -8,27 +8,27 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
 
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 @Component
-public class PfbProcessor implements HpdsProcessor {
+public class MultiValueQueryProcessor implements HpdsProcessor {
 
     public static final String PATIENT_ID_FIELD_NAME = "patient_id";
-    private final int ID_BATCH_SIZE;
+    private final int idBatchSize;
     private final AbstractProcessor abstractProcessor;
 
-    private Logger log = LoggerFactory.getLogger(PfbProcessor.class);
+    private Logger log = LoggerFactory.getLogger(MultiValueQueryProcessor.class);
 
 
     @Autowired
-    public PfbProcessor(AbstractProcessor abstractProcessor) {
+    public MultiValueQueryProcessor(AbstractProcessor abstractProcessor, @Value("${ID_BATCH_SIZE:0}") int idBatchSize) {
         this.abstractProcessor = abstractProcessor;
-        ID_BATCH_SIZE = Integer.parseInt(System.getProperty("ID_BATCH_SIZE", "0"));
+        this.idBatchSize = idBatchSize;
     }
 
     @Override
@@ -43,7 +43,7 @@ public String[] getHeaderRow(Query query) {
     public void runQuery(Query query, AsyncResult result) {
         Set<Integer> idList = abstractProcessor.getPatientSubsetForQuery(query);
         log.info("Processing " + idList.size() + " rows for result " + result.getId());
-        Lists.partition(new ArrayList<>(idList), ID_BATCH_SIZE).stream()
+        Lists.partition(new ArrayList<>(idList), idBatchSize).stream()
             .forEach(patientIds -> {
                 Map<String, Map<Integer, List<String>>> pathToPatientToValueMap = buildResult(result, query, new TreeSet<>(patientIds));
                 List<List<List<String>>> fieldValuesPerPatient = patientIds.stream().map(patientId -> {
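Aside, not part of the commit: the constructor change above replaces a raw JVM system-property lookup with Spring property injection, where the ${ID_BATCH_SIZE:0} placeholder falls back to 0 if the property is unset. A minimal sketch of the same pattern (class name hypothetical):

    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;

    // Sketch only: Spring resolves ID_BATCH_SIZE from its property sources
    // (application properties, environment variables, -D system properties)
    // and injects it, replacing Integer.parseInt(System.getProperty(...)).
    @Component
    class BatchSizeExample { // hypothetical name, not in the commit
        private final int idBatchSize;

        BatchSizeExample(@Value("${ID_BATCH_SIZE:0}") int idBatchSize) {
            this.idBatchSize = idBatchSize;
        }
    }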
File 2 of 6: CsvWriter

@@ -1,5 +1,6 @@
 package edu.harvard.hms.dbmi.avillach.hpds.processing.io;
 
+import com.google.common.base.Joiner;
 import org.springframework.http.MediaType;
 
 import java.io.File;
@@ -9,6 +10,7 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.stream.Collectors;
 
 public class CsvWriter implements ResultWriter {
 
@@ -43,13 +45,27 @@ public void writeEntity(Collection<String[]> data) {
         try {
             csvWriter.write(fileWriter, data);
         } catch (IOException e) {
-            throw new RuntimeException("IOException while appending to CSV file", e);
+            throw new UncheckedIOException("IOException while appending to CSV file", e);
         }
     }
 
     @Override
     public void writeMultiValueEntity(Collection<List<List<String>>> data) {
-        throw new RuntimeException("Method not implemented");
+        List<String[]> collect = data.stream().map(line -> {
+            return line.stream()
+                    .map(cell -> {
+                        if (cell == null) {
+                            return "";
+                        }
+                        return Joiner.on('\t').join(cell);
+                    })
+                    .toArray(String[]::new);
+        }).toList();
+        try {
+            csvWriter.write(fileWriter, collect);
+        } catch (IOException e) {
+            throw new UncheckedIOException("IOException while appending to CSV file", e);
+        }
     }
 
     @Override
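Aside, not part of the commit: a standalone sketch of the flattening that the new writeMultiValueEntity performs. Each outer element is one row, each inner list one cell's values; a multi-valued cell is joined with tabs into a single CSV field, and a null cell becomes an empty string. Class and variable names here are illustrative only:

    import com.google.common.base.Joiner;

    import java.util.Arrays;
    import java.util.List;

    public class MultiValueCellSketch { // hypothetical name
        public static void main(String[] args) {
            // One row: a patient_id cell, a two-valued cell, and a missing cell.
            List<List<String>> row = Arrays.asList(
                    List.of("1001"),
                    List.of("42", "43"),
                    null);
            String[] csvFields = row.stream()
                    .map(cell -> cell == null ? "" : Joiner.on('\t').join(cell))
                    .toArray(String[]::new);
            // csvFields is {"1001", "42\t43", ""}: one CSV field per cell.
            System.out.println(Arrays.toString(csvFields));
        }
    }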
File 3 of 6: query endpoint handler (class name not shown in this excerpt)

@@ -180,8 +180,6 @@ public ResponseEntity<QueryStatus> query(@RequestBody QueryRequest queryJson) {
             } catch (IOException e) {
                 log.error("IOException caught in query processing:", e);
                 return ResponseEntity.status(500).build();
-            } catch (ClassNotFoundException e) {
-                return ResponseEntity.status(500).build();
             }
         } else {
             QueryStatus status = new QueryStatus();
File 4 of 6: QueryService

@@ -15,8 +15,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.ImmutableMap;
-
 import edu.harvard.dbmi.avillach.util.UUIDv5;
 import edu.harvard.hms.dbmi.avillach.hpds.data.query.Query;
 import edu.harvard.hms.dbmi.avillach.hpds.processing.*;
@@ -48,7 +46,7 @@ public class QueryService {
     private final QueryProcessor queryProcessor;
     private final TimeseriesProcessor timeseriesProcessor;
     private final CountProcessor countProcessor;
-    private final PfbProcessor pfbProcessor;
+    private final MultiValueQueryProcessor multiValueQueryProcessor;
 
     HashMap<String, AsyncResult> results = new HashMap<>();
 
@@ -58,15 +56,15 @@ public QueryService (AbstractProcessor abstractProcessor,
                          QueryProcessor queryProcessor,
                          TimeseriesProcessor timeseriesProcessor,
                          CountProcessor countProcessor,
-                         PfbProcessor pfbProcessor,
+                         MultiValueQueryProcessor multiValueQueryProcessor,
                          @Value("${SMALL_JOB_LIMIT}") Integer smallJobLimit,
                          @Value("${SMALL_TASK_THREADS}") Integer smallTaskThreads,
                          @Value("${LARGE_TASK_THREADS}") Integer largeTaskThreads) {
         this.abstractProcessor = abstractProcessor;
         this.queryProcessor = queryProcessor;
         this.timeseriesProcessor = timeseriesProcessor;
         this.countProcessor = countProcessor;
-        this.pfbProcessor = pfbProcessor;
+        this.multiValueQueryProcessor = multiValueQueryProcessor;
 
         SMALL_JOB_LIMIT = smallJobLimit;
         SMALL_TASK_THREADS = smallTaskThreads;
@@ -83,7 +81,7 @@ public QueryService (AbstractProcessor abstractProcessor,
         smallTaskExecutor = createExecutor(smallTaskExecutionQueue, SMALL_TASK_THREADS);
     }
 
-    public AsyncResult runQuery(Query query) throws ClassNotFoundException, IOException {
+    public AsyncResult runQuery(Query query) throws IOException {
         // Merging fields from filters into selected fields for user validation of results
         mergeFilterFieldsIntoSelectedFields(query);
 
@@ -112,11 +110,10 @@ public int runCount(Query query) throws InterruptedException, ExecutionException
         return countProcessor.runCounts(query);
     }
 
-    private AsyncResult initializeResult(Query query) throws ClassNotFoundException, FileNotFoundException, IOException {
+    private AsyncResult initializeResult(Query query) throws IOException {
 
         HpdsProcessor p;
         switch(query.getExpectedResultType()) {
-            case DATAFRAME :
             case SECRET_ADMIN_DATAFRAME:
                 p = queryProcessor;
                 break;
@@ -129,7 +126,8 @@ private AsyncResult initializeResult(Query query) throws ClassNotFoundException,
                 p = countProcessor;
                 break;
             case DATAFRAME_PFB:
-                p = pfbProcessor;
+            case DATAFRAME:
+                p = multiValueQueryProcessor;
                 break;
             default :
                 throw new RuntimeException("UNSUPPORTED RESULT TYPE");
File 5 of 6: properties configuration

@@ -1,6 +1,7 @@
 SMALL_JOB_LIMIT = 100
 SMALL_TASK_THREADS = 1
 LARGE_TASK_THREADS = 1
+ID_BATCH_SIZE=1000
 VCF_EXCERPT_ENABLED=true
 
 hpds.genomicProcessor.impl=local
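Aside, not part of the commit: ID_BATCH_SIZE must be positive wherever dataframe queries actually run, because the processor hands it to Guava's Lists.partition, which rejects non-positive sizes; the @Value default of 0 only keeps the bean constructible when the property is absent. A quick illustration (class name hypothetical):

    import com.google.common.collect.Lists;

    import java.util.List;

    public class BatchSizeSketch { // hypothetical name
        public static void main(String[] args) {
            // Fine: three ids with batch size 1000 yield one partial batch.
            System.out.println(Lists.partition(List.of(1, 2, 3), 1000)); // [[1, 2, 3]]
            // Throws IllegalArgumentException: partition size must be positive.
            Lists.partition(List.of(1, 2, 3), 0);
        }
    }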
File 6 of 6: new test QueryServiceTest

@@ -0,0 +1,67 @@
+package edu.harvard.hms.dbmi.avillach.hpds.service;
+
+import de.siegmar.fastcsv.reader.CsvContainer;
+import de.siegmar.fastcsv.reader.CsvReader;
+import edu.harvard.hms.dbmi.avillach.hpds.data.query.Query;
+import edu.harvard.hms.dbmi.avillach.hpds.data.query.ResultType;
+import edu.harvard.hms.dbmi.avillach.hpds.processing.AsyncResult;
+import edu.harvard.hms.dbmi.avillach.hpds.test.util.BuildIntegrationTestEnvironment;
+import org.apache.commons.io.IOUtils;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.ActiveProfiles;
+import org.springframework.test.context.junit.jupiter.SpringExtension;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.*;
+@ExtendWith(SpringExtension.class)
+@EnableAutoConfiguration
+@SpringBootTest(classes = edu.harvard.hms.dbmi.avillach.hpds.service.HpdsApplication.class)
+@ActiveProfiles("integration-test")
+class QueryServiceTest {
+
+    @Autowired
+    private QueryService queryService;
+
+    @BeforeAll
+    public static void beforeAll() {
+        BuildIntegrationTestEnvironment instance = BuildIntegrationTestEnvironment.INSTANCE;
+    }
+
+    @Test
+    public void dataframeMulti() throws IOException, InterruptedException {
+        Query query = new Query();
+        List<Query.VariantInfoFilter> variantInfoFilters = new ArrayList<>();
+        Query.VariantInfoFilter variantInfoFilter = new Query.VariantInfoFilter();
+        variantInfoFilter.categoryVariantInfoFilters = Map.of("Gene_with_variant", new String[]{"LOC102723996", "LOC101928576"});
+        variantInfoFilters.add(variantInfoFilter);
+        query.setVariantInfoFilters(variantInfoFilters);
+        query.setFields(List.of("\\open_access-1000Genomes\\data\\SYNTHETIC_AGE\\"));
+        query.setExpectedResultType(ResultType.DATAFRAME);
+
+        AsyncResult asyncResult = queryService.runQuery(query);
+
+        int retries = 0;
+        while ((AsyncResult.Status.RUNNING.equals(asyncResult.getStatus()) || AsyncResult.Status.PENDING.equals(asyncResult.getStatus())) && retries < 10) {
+            retries++;
+            Thread.sleep(200);
+        }
+
+        assertEquals(AsyncResult.Status.SUCCESS, asyncResult.getStatus());
+        CsvReader csvReader = new CsvReader();
+        CsvContainer csvContainer = csvReader.read(asyncResult.getFile(), StandardCharsets.UTF_8);
+        // 22 plus header
+        assertEquals(23, csvContainer.getRows().size());
+    }
+
+}
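Aside, not part of the commit: the test above waits for the async result with a hand-rolled polling loop; the same wait, factored into a helper for reuse across tests (a sketch only, names hypothetical):

    // Polls until the result leaves RUNNING/PENDING or the retry budget runs
    // out, then returns whatever status it last observed; mirrors the loop in
    // dataframeMulti above.
    static AsyncResult.Status awaitTerminalStatus(AsyncResult result, int maxRetries, long sleepMillis)
            throws InterruptedException {
        int retries = 0;
        while ((AsyncResult.Status.RUNNING.equals(result.getStatus())
                || AsyncResult.Status.PENDING.equals(result.getStatus()))
                && retries < maxRetries) {
            retries++;
            Thread.sleep(sleepMillis);
        }
        return result.getStatus();
    }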
