Skip to content

Commit

Permalink
- Create endpoint for writing data to file
Browse files Browse the repository at this point in the history
- Create service for writing out to file
- Add configuration to specify where to write
- Add PIC-SURE uuid to query object
  - This is how we namespace file writes
- Make abstract processor intersect ID sets more proactively
  - Memory issues
- Most of this work will be done by another service, but
  • Loading branch information
Luke Sikina committed Jan 8, 2024
1 parent 3d460e9 commit 9e6d766
Show file tree
Hide file tree
Showing 13 changed files with 524 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public Query(Query query) {
});
}
this.id = query.id;
this.picSureId = query.picSureId;
}

private ResultType expectedResultType = ResultType.COUNT;
Expand All @@ -43,6 +44,7 @@ public Query(Query query) {
private List<VariantInfoFilter> variantInfoFilters = new ArrayList<>();
private String id;

private String picSureId;

public ResultType getExpectedResultType() {
return expectedResultType;
Expand Down Expand Up @@ -127,6 +129,14 @@ public void setId(String id) {
this.id = id;
}

public String getPicSureId() {
return picSureId;
}

public void setPicSureId(String picSureId) {
this.picSureId = picSureId;
}

public static class VariantInfoFilter {
public VariantInfoFilter() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;


@Component
public class AbstractProcessor {
Expand Down Expand Up @@ -187,6 +190,16 @@ protected Set<Integer> applyBooleanLogic(List<Set<Integer>> filteredIdSets) {
return ids[0];
}

private class IntersectingSetContainer<T> {
Set<T> set = null;
boolean initialized = false;

void intersect(@NotNull Set<T> toIntersect) {
initialized = true;
set = set == null ? toIntersect : Sets.intersection(set, toIntersect);
}
}

/**
* For each filter in the query, return a set of patient ids that match. The order of these sets in the
* returned list of sets does not matter and cannot currently be tied back to the filter that generated
Expand All @@ -196,26 +209,27 @@ protected Set<Integer> applyBooleanLogic(List<Set<Integer>> filteredIdSets) {
* @return
*/
protected List<Set<Integer>> idSetsForEachFilter(Query query) {
final ArrayList<Set<Integer>> filteredIdSets = new ArrayList<>();
IntersectingSetContainer<Integer> ids = new IntersectingSetContainer<>();

try {
query.getAllAnyRecordOf().forEach(anyRecordOfFilterList -> {
addIdSetsForAnyRecordOf(anyRecordOfFilterList, filteredIdSets);
});
addIdSetsForRequiredFields(query, filteredIdSets);
addIdSetsForNumericFilters(query, filteredIdSets);
addIdSetsForCategoryFilters(query, filteredIdSets);
for (List<String> anyRecordOfFilterList : query.getAllAnyRecordOf()) {
ids = addIdSetsForAnyRecordOf(anyRecordOfFilterList, ids);
}
ids = addIdSetsForRequiredFields(query, ids);
ids = addIdSetsForNumericFilters(query, ids);
ids = addIdSetsForCategoryFilters(query, ids);
} catch (InvalidCacheLoadException e) {
log.warn("Invalid query supplied: " + e.getLocalizedMessage());
filteredIdSets.add(new HashSet<>()); // if an invalid path is supplied, no patients should match.
return List.of(new HashSet<>());
}



//AND logic to make sure all patients match each filter
if(filteredIdSets.size()>1) {
List<Set<Integer>> processedFilteredIdSets = new ArrayList<>(List.of(applyBooleanLogic(filteredIdSets)));
return addIdSetsForVariantInfoFilters(query, processedFilteredIdSets);
if (ids.initialized) {
return addIdSetsForVariantInfoFilters(query, new ArrayList<>(List.of(ids.set)));
} else {
return addIdSetsForVariantInfoFilters(query, filteredIdSets);
return addIdSetsForVariantInfoFilters(query, new ArrayList<>());
}
}

Expand Down Expand Up @@ -249,22 +263,23 @@ public TreeSet<Integer> getPatientSubsetForQuery(Query query) {
return idList;
}

private void addIdSetsForRequiredFields(Query query, ArrayList<Set<Integer>> filteredIdSets) {
private IntersectingSetContainer<Integer> addIdSetsForRequiredFields(Query query, IntersectingSetContainer<Integer> filteredIdSets) {
if(!query.getRequiredFields().isEmpty()) {
VariantBucketHolder<VariantMasks> bucketCache = new VariantBucketHolder<>();
filteredIdSets.addAll(query.getRequiredFields().parallelStream().map(path->{
if(VariantUtils.pathIsVariantSpec(path)) {
query.getRequiredFields().parallelStream().map(path -> {
if (VariantUtils.pathIsVariantSpec(path)) {
TreeSet<Integer> patientsInScope = new TreeSet<>();
addIdSetsForVariantSpecCategoryFilters(new String[]{"0/1","1/1"}, path, patientsInScope, bucketCache);
addIdSetsForVariantSpecCategoryFilters(new String[]{"0/1", "1/1"}, path, patientsInScope, bucketCache);
return patientsInScope;
} else {
return new TreeSet<Integer>(getCube(path).keyBasedIndex());
return (Set<Integer>) new TreeSet<Integer>(getCube(path).keyBasedIndex());
}
}).collect(Collectors.toSet()));
}).forEach(filteredIdSets::intersect);
}
return filteredIdSets;
}

private void addIdSetsForAnyRecordOf(List<String> anyRecordOfFilters, ArrayList<Set<Integer>> filteredIdSets) {
private IntersectingSetContainer<Integer> addIdSetsForAnyRecordOf(List<String> anyRecordOfFilters, IntersectingSetContainer<Integer> filteredIdSets) {
if(!anyRecordOfFilters.isEmpty()) {
VariantBucketHolder<VariantMasks> bucketCache = new VariantBucketHolder<>();
Set<Integer> anyRecordOfPatientSet = anyRecordOfFilters.parallelStream().flatMap(path -> {
Expand All @@ -281,35 +296,37 @@ private void addIdSetsForAnyRecordOf(List<String> anyRecordOfFilters, ArrayList<
}
}
}).collect(Collectors.toSet());
filteredIdSets.add(anyRecordOfPatientSet);
filteredIdSets.intersect(anyRecordOfPatientSet);
}
return filteredIdSets;
}

private void addIdSetsForNumericFilters(Query query, ArrayList<Set<Integer>> filteredIdSets) {
private IntersectingSetContainer<Integer> addIdSetsForNumericFilters(Query query, IntersectingSetContainer<Integer> filteredIdSets) {
if(!query.getNumericFilters().isEmpty()) {
filteredIdSets.addAll((Set<TreeSet<Integer>>)(query.getNumericFilters().entrySet().parallelStream().map(entry->{
return (TreeSet<Integer>)(getCube(entry.getKey()).getKeysForRange(entry.getValue().getMin(), entry.getValue().getMax()));
}).collect(Collectors.toSet())));
query.getNumericFilters().entrySet().parallelStream().map(entry-> {
return (Set<Integer>)(getCube(entry.getKey()).getKeysForRange(entry.getValue().getMin(), entry.getValue().getMax()));
}).forEach(filteredIdSets::intersect);
}
return filteredIdSets;
}

private void addIdSetsForCategoryFilters(Query query, ArrayList<Set<Integer>> filteredIdSets) {
if(!query.getCategoryFilters().isEmpty()) {
VariantBucketHolder<VariantMasks> bucketCache = new VariantBucketHolder<>();
Set<Set<Integer>> idsThatMatchFilters = query.getCategoryFilters().entrySet().parallelStream().map(entry->{
Set<Integer> ids = new TreeSet<>();
if(VariantUtils.pathIsVariantSpec(entry.getKey())) {
addIdSetsForVariantSpecCategoryFilters(entry.getValue(), entry.getKey(), ids, bucketCache);
} else {
String[] categoryFilter = entry.getValue();
for(String category : categoryFilter) {
ids.addAll(getCube(entry.getKey()).getKeysForValue(category));
}
private IntersectingSetContainer<Integer> addIdSetsForCategoryFilters(
Query query, IntersectingSetContainer<Integer> startingIds
) {
VariantBucketHolder<VariantMasks> bucketCache = new VariantBucketHolder<>();
query.getCategoryFilters().entrySet().parallelStream().map(entry->{
Set<Integer> ids = new TreeSet<>();
if(VariantUtils.pathIsVariantSpec(entry.getKey())) {
addIdSetsForVariantSpecCategoryFilters(entry.getValue(), entry.getKey(), ids, bucketCache);
} else {
String[] categoryFilter = entry.getValue();
for(String category : categoryFilter) {
ids.addAll(getCube(entry.getKey()).getKeysForValue(category));
}
return ids;
}).collect(Collectors.toSet());
filteredIdSets.addAll(idsThatMatchFilters);
}
}
return ids;
}).forEach(startingIds::intersect);
return startingIds;
}

private void addIdSetsForVariantSpecCategoryFilters(String[] zygosities, String key, Set<Integer> ids, VariantBucketHolder<VariantMasks> bucketCache) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package edu.harvard.hms.dbmi.avillach.hpds.processing;

import java.io.IOException;
import java.nio.file.Path;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;

Expand Down Expand Up @@ -131,5 +132,9 @@ public void enqueue() {
public int compareTo(AsyncResult o) {
return this.query.getId().compareTo(o.query.getId());
}

public Path getTempFilePath() {
return stream.getTempFilePath();
}

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package edu.harvard.hms.dbmi.avillach.hpds.processing;

import java.io.*;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
Expand Down Expand Up @@ -225,4 +226,8 @@ public long estimatedSize() {
return tempFile.length();
}

public Path getTempFilePath() {
return Path.of(tempFile.getAbsolutePath());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public void runQuery(Query query, AsyncResult result) {
* @throws IOException
*/
private void exportTimeData(Query query, AsyncResult result, TreeSet<Integer> idList) throws IOException {

log.info("Starting export for time series data of query {} (HPDS ID {})", query.getPicSureId(), query.getId());
Set<String> exportedConceptPaths = new HashSet<String>();
//get a list of all fields mentioned in the query; export all data associated with any included field
List<String> pathList = new LinkedList<String>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@
import javax.ws.rs.core.Response.ResponseBuilder;
import javax.ws.rs.core.Response.Status;

import edu.harvard.hms.dbmi.avillach.hpds.data.query.ResultType;
import edu.harvard.hms.dbmi.avillach.hpds.service.filesharing.FileSharingService;
import edu.harvard.hms.dbmi.avillach.hpds.service.util.Paginator;
import edu.harvard.hms.dbmi.avillach.hpds.service.util.QueryDecorator;
import org.apache.http.entity.ContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -27,13 +30,13 @@

import edu.harvard.dbmi.avillach.domain.*;
import edu.harvard.dbmi.avillach.service.IResourceRS;
import edu.harvard.dbmi.avillach.util.UUIDv5;
import edu.harvard.hms.dbmi.avillach.hpds.crypto.Crypto;
import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.FileBackedByteIndexedInfoStore;
import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.ColumnMeta;
import edu.harvard.hms.dbmi.avillach.hpds.data.query.Query;
import edu.harvard.hms.dbmi.avillach.hpds.processing.*;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.RequestBody;

@Path("PIC-SURE")
@Produces("application/json")
Expand All @@ -42,13 +45,17 @@ public class PicSureService implements IResourceRS {

@Autowired
public PicSureService(QueryService queryService, TimelineProcessor timelineProcessor, CountProcessor countProcessor,
VariantListProcessor variantListProcessor, AbstractProcessor abstractProcessor, Paginator paginator) {
VariantListProcessor variantListProcessor, AbstractProcessor abstractProcessor,
Paginator paginator, FileSharingService fileSystemService, QueryDecorator queryDecorator
) {
this.queryService = queryService;
this.timelineProcessor = timelineProcessor;
this.countProcessor = countProcessor;
this.variantListProcessor = variantListProcessor;
this.abstractProcessor = abstractProcessor;
this.paginator = paginator;
this.fileSystemService = fileSystemService;
this.queryDecorator = queryDecorator;
Crypto.loadDefaultKey();
}

Expand All @@ -68,6 +75,10 @@ public PicSureService(QueryService queryService, TimelineProcessor timelineProce

private final Paginator paginator;

private final FileSharingService fileSystemService;

private final QueryDecorator queryDecorator;

private static final String QUERY_METADATA_FIELD = "queryMetadata";
private static final int RESPONSE_CACHE_SIZE = 50;

Expand Down Expand Up @@ -216,7 +227,8 @@ private QueryStatus convertToQueryStatus(AsyncResult entity) {
status.setStatus(entity.status.toPicSureStatus());

Map<String, Object> metadata = new HashMap<String, Object>();
metadata.put("picsureQueryId", UUIDv5.UUIDFromString(entity.query.toString()));
queryDecorator.setId(entity.query);
metadata.put("picsureQueryId", entity.query.getId());
status.setResultMetadata(metadata);
return status;
}
Expand Down Expand Up @@ -250,6 +262,56 @@ public Response queryResult(@PathParam("resourceQueryId") UUID queryId, QueryReq
}
}

@POST
@Path("/write/{dataType}")
public Response writeQueryResult(
@RequestBody() Query query, @PathParam("dataType") String datatype
) {
if (query.getExpectedResultType() != ResultType.DATAFRAME_TIMESERIES) {
return Response
.status(400, "The write endpoint only writes time series dataframes. Fix result type.")
.build();
}
String hpdsQueryID;
try {
QueryStatus queryStatus = convertToQueryStatus(queryService.runQuery(query));
String status = queryStatus.getResourceStatus();
hpdsQueryID = queryStatus.getResourceResultId();
while ("RUNNING".equalsIgnoreCase(status) || "PENDING".equalsIgnoreCase(status)) {
Thread.sleep(10000); // Yea, this is not restful. Sorry.
status = convertToQueryStatus(queryService.getStatusFor(hpdsQueryID)).getResourceStatus();
}
} catch (ClassNotFoundException | IOException | InterruptedException e) {
log.warn("Error waiting for response", e);
return Response.serverError().build();
}

AsyncResult result = queryService.getResultFor(hpdsQueryID);
// the queryResult has this DIY retry logic that blocks a system thread.
// I'm not going to do that here. If the service can't find it, you get a 404.
// Retry it client side.
if (result == null) {
return Response.status(404).build();
}
if (result.status == AsyncResult.Status.ERROR) {
return Response.status(500).build();
}
if (result.status != AsyncResult.Status.SUCCESS) {
return Response.status(503).build(); // 503 = unavailable
}

// at least for now, this is going to block until we finish writing
// Not very restful, but it will make this API very easy to consume
boolean success = false;
query.setId(hpdsQueryID);
if ("phenotypic".equals(datatype)) {
success = fileSystemService.createPhenotypicData(query);
} else if ("genomic".equals(datatype)) {
success = fileSystemService.createGenomicData(query);
}
return success ? Response.ok().build() : Response.serverError().build();
}

@POST
@Path("/query/{resourceQueryId}/status")
@Override
Expand All @@ -266,7 +328,7 @@ public Response queryFormat(QueryRequest resultRequest) {
return Response.ok().entity(convertIncomingQuery(resultRequest).toString()).build();
} catch (IOException e) {
return Response.ok()
.entity("An error occurred formatting the query for display: " + e.getLocalizedMessage()).build();
.entity("An error occurred formatting the query for display: " + e.getLocalizedMessage()).build();
}
}

Expand All @@ -276,7 +338,7 @@ public Response queryFormat(QueryRequest resultRequest) {
public Response querySync(QueryRequest resultRequest) {
if (Crypto.hasKey(Crypto.DEFAULT_KEY_NAME)) {
try {
return _querySync(resultRequest);
return submitQueryAndWaitForCompletion(resultRequest);
} catch (IOException e) {
log.error("IOException caught: ", e);
return Response.serverError().build();
Expand Down Expand Up @@ -305,7 +367,7 @@ public PaginatedSearchResult<String> searchGenomicConceptValues(
return paginator.paginate(matchingValues, page, size);
}

private Response _querySync(QueryRequest resultRequest) throws IOException {
private Response submitQueryAndWaitForCompletion(QueryRequest resultRequest) throws IOException {
Query incomingQuery;
incomingQuery = convertIncomingQuery(resultRequest);
log.info("Query Converted");
Expand All @@ -324,6 +386,7 @@ private Response _querySync(QueryRequest resultRequest) throws IOException {

case DATAFRAME:
case SECRET_ADMIN_DATAFRAME:
case DATAFRAME_TIMESERIES:
case DATAFRAME_MERGED:
QueryStatus status = query(resultRequest);
while (status.getResourceStatus().equalsIgnoreCase("RUNNING")
Expand Down Expand Up @@ -386,6 +449,7 @@ private Response _querySync(QueryRequest resultRequest) throws IOException {
}

private ResponseBuilder queryOkResponse(Object obj, Query incomingQuery) {
return Response.ok(obj).header(QUERY_METADATA_FIELD, UUIDv5.UUIDFromString(incomingQuery.toString()));
queryDecorator.setId(incomingQuery);
return Response.ok(obj).header(QUERY_METADATA_FIELD, incomingQuery.getId());
}
}
Loading

0 comments on commit 9e6d766

Please sign in to comment.