Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ALS-6511: Add PFB result type #116

Merged
merged 23 commits into from
Aug 19, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
d530a3d
ALS-6511: Initial commit for PFB response type
ramari16 Jul 18, 2024
c6bdaf7
ALS-6511: Fix spring config annotations
ramari16 Jul 19, 2024
0175c75
ALS-6511: Fix avro writer configuration
ramari16 Jul 19, 2024
79fee23
ALS-6511: Fix refactoring bug
ramari16 Jul 22, 2024
1a3fd09
ALS-6511: Fix refactoring bug
ramari16 Jul 22, 2024
bc1cda5
ALS-6511: Fix avro formatting issue
ramari16 Jul 22, 2024
d149826
ALS-6511: Fix null value handling
ramari16 Jul 22, 2024
bd3a945
ALS-6511: Add log message for avro file locaton
ramari16 Jul 22, 2024
1360901
ALS-6511: Attempt to fix snappy class not found exception
ramari16 Jul 22, 2024
7bc1109
ALS-6511: Attempt to fix snappy codec bug
ramari16 Jul 23, 2024
e0f9fad
ALS-6511: Attempt to fix snappy codec bug
ramari16 Jul 23, 2024
741aed8
ALS-6511: Fix pfb file writing issues
ramari16 Jul 23, 2024
75e2448
ALS-6511: Update pfb result to handle multiple values per variable
ramari16 Aug 6, 2024
068e987
ALS-6511: Fix infinite loop
ramari16 Aug 6, 2024
93630c3
ALS-6511: Handle single value variables
ramari16 Aug 7, 2024
37c739f
ALS-6511: Minor fixes and comments
ramari16 Aug 13, 2024
9e2fd0c
Merge branch 'genomic-v2' into ALS-6511
ramari16 Aug 13, 2024
7dac201
ALS-6511: Added test case, removed unused code
ramari16 Aug 13, 2024
091eab0
ALS-6551: Attempt to fix character encoding issue
ramari16 Aug 14, 2024
1be9246
ALS-6511: Attempt to fix encoding issue
ramari16 Aug 14, 2024
02f166d
ALS-6511: Attempt to fix encoding issue
ramari16 Aug 14, 2024
3321f37
ALS-6511: Revert uneccesary changes
ramari16 Aug 16, 2024
f564636
ALS-6511: Revert uneccessary testing changes
ramari16 Aug 19, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,6 @@ public String toString() {
writePartFormat("Observation Count Fields", fields, builder, true);
break;
case DATAFRAME:
case DATAFRAME_MERGED:
case SECRET_ADMIN_DATAFRAME:
writePartFormat("Data Export Fields", fields, builder, true);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,7 @@ public enum ResultType {
* Return the number of observations for included patients and
* included fields, broken up across the included cross count fields.
*/
OBSERVATION_CROSS_COUNT,
/**
* This was developed for UDN, but is completely useless and should
* be deleted.
*/
DATAFRAME_MERGED,
OBSERVATION_CROSS_COUNT,
/**
* Not completely implemented and currently dead code. Someone with
* statistics experience needs to develop a p-value based filter for
Expand Down Expand Up @@ -94,5 +89,10 @@ public enum ResultType {
* is suitable to time series analysis and/or loading into another
* instance of HPDS.
*/
DATAFRAME_TIMESERIES
DATAFRAME_TIMESERIES,
/**
* Exports data as PFB, using avro
* <a href="https://uc-cdis.github.io/pypfb/">https://uc-cdis.github.io/pypfb/</a>
*/
DATAFRAME_PFB
}
10 changes: 10 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,16 @@
<version>1.18.30</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.11.3</version>
</dependency>
<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
<version>1.1.10.5</version>
</dependency>


</dependencies>
Expand Down
10 changes: 10 additions & 0 deletions processing/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,15 @@
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty</artifactId>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.xerial.snappy/snappy-java -->
<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
</dependency>

</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
package edu.harvard.hms.dbmi.avillach.hpds.processing;

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;

import edu.harvard.hms.dbmi.avillach.hpds.processing.io.CsvWriter;
import edu.harvard.hms.dbmi.avillach.hpds.processing.io.ResultWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -17,7 +23,19 @@
public class AsyncResult implements Runnable, Comparable<AsyncResult>{
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class was a huge pain. I tried to improve it but I'm not super happy with how this turned out. But at least this supports multi value exporting, to some degree


private static Logger log = LoggerFactory.getLogger(AsyncResult.class);


public byte[] readAllBytes() {
try {
return stream.readAllBytes();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

public void closeWriter() {
stream.closeWriter();
}

public static enum Status{
SUCCESS {
@Override
Expand Down Expand Up @@ -52,29 +70,82 @@ public PicSureStatus toPicSureStatus() {
public abstract PicSureStatus toPicSureStatus();
}

public Query query;

public Status status;

public long queuedTime;

public long completedTime;

public int retryCount;

public int queueDepth;

public int positionInQueue;

public int numRows;
private Query query;

public Query getQuery() {
return query;
}

private Status status;

public Status getStatus() {
return status;
}

public AsyncResult setStatus(Status status) {
this.status = status;
return this;
}

private long queuedTime;

public long getQueuedTime() {
return queuedTime;
}

public AsyncResult setQueuedTime(long queuedTime) {
this.queuedTime = queuedTime;
return this;
}

private long completedTime;

public long getCompletedTime() {
return completedTime;
}

private int retryCount;

public int numColumns;
private int queueDepth;

public int getQueueDepth() {
return queueDepth;
}

public AsyncResult setQueueDepth(int queueDepth) {
this.queueDepth = queueDepth;
return this;
}

private int positionInQueue;

public AsyncResult setPositionInQueue(int positionInQueue) {
this.positionInQueue = positionInQueue;
return this;
}

private int numRows;

public String id;
private int numColumns;

private String id;

public String getId() {
return id;
}

public AsyncResult setId(String id) {
this.id = id;
return this;
}

@JsonIgnore
public ResultStoreStream stream;

private ResultStoreStream stream;

public ResultStoreStream getStream() {
return stream;
}

@JsonIgnore
private String[] headerRow;

Expand All @@ -86,21 +157,47 @@ public PicSureStatus toPicSureStatus() {
* The actual exception is thrown in @see ResultStore#constructor
*/
@JsonIgnore
public ExecutorService jobQueue;
private ExecutorService jobQueue;

public ExecutorService getJobQueue() {
return jobQueue;
}

public AsyncResult setJobQueue(ExecutorService jobQueue) {
this.jobQueue = jobQueue;
return this;
}

@JsonIgnore
public HpdsProcessor processor;
private HpdsProcessor processor;

public HpdsProcessor getProcessor() {
return processor;
}

public AsyncResult(Query query, String[] headerRow) {
public AsyncResult(Query query, HpdsProcessor processor, ResultWriter writer) {
this.query = query;
this.headerRow = headerRow;
this.processor = processor;
this.headerRow = processor.getHeaderRow(query);
try {
stream = new ResultStoreStream(headerRow, query.getExpectedResultType() == ResultType.DATAFRAME_MERGED);
stream = new ResultStoreStream(headerRow, writer);
} catch (IOException e) {
log.error("Exception creating result stream", e);
}
}

public void appendResults(List<String[]> dataEntries) {
stream.appendResults(dataEntries);
}
public void appendMultiValueResults(List<List<List<String>>> dataEntries) {
stream.appendMultiValueResults(dataEntries);
}

public void appendResultStore(ResultStore resultStore) {
stream.appendResultStore(resultStore);
}


@Override
public void run() {
status = AsyncResult.Status.RUNNING;
Expand All @@ -127,9 +224,15 @@ public void enqueue() {
}
}

public void open() {
stream.open();
}

@Override
public int compareTo(AsyncResult o) {
return this.query.getId().compareTo(o.query.getId());
}




}
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package edu.harvard.hms.dbmi.avillach.hpds.processing;

import com.google.common.collect.Lists;
import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.ColumnMeta;
import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.KeyAndValue;
import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.PhenoCube;
import edu.harvard.hms.dbmi.avillach.hpds.data.query.Query;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@Component
public class PfbProcessor implements HpdsProcessor {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is somewhat similar to the QueryProcessor which generates the CSVs, but since PFB needs to write row by row, we batch the rows and then transform the column-based data to row based data before writing


public static final String PATIENT_ID_FIELD_NAME = "patient_id";
private final int ID_BATCH_SIZE;
private final AbstractProcessor abstractProcessor;

private Logger log = LoggerFactory.getLogger(PfbProcessor.class);


@Autowired
public PfbProcessor(AbstractProcessor abstractProcessor) {
this.abstractProcessor = abstractProcessor;
ID_BATCH_SIZE = Integer.parseInt(System.getProperty("ID_BATCH_SIZE", "0"));
}

@Override
public String[] getHeaderRow(Query query) {
String[] header = new String[query.getFields().size()+1];
header[0] = PATIENT_ID_FIELD_NAME;
System.arraycopy(query.getFields().toArray(), 0, header, 1, query.getFields().size());
return header;
}

@Override
public void runQuery(Query query, AsyncResult result) {
Set<Integer> idList = abstractProcessor.getPatientSubsetForQuery(query);
log.info("Processing " + idList.size() + " rows for result " + result.getId());
Lists.partition(new ArrayList<>(idList), ID_BATCH_SIZE).stream()
.forEach(patientIds -> {
Map<String, Map<Integer, List<String>>> pathToPatientToValueMap = buildResult(result, query, new TreeSet<>(patientIds));
List<List<List<String>>> fieldValuesPerPatient = patientIds.stream().map(patientId -> {
List<List<String>> objectStream = Arrays.stream(getHeaderRow(query)).map(field -> {
if (PATIENT_ID_FIELD_NAME.equals(field)) {
return List.of(patientId.toString());
} else {
return pathToPatientToValueMap.get(field).get(patientId);
}
}).collect(Collectors.toList());
return objectStream;
}).collect(Collectors.toList());
result.appendMultiValueResults(fieldValuesPerPatient);
});
result.closeWriter();
}

private Map<String, Map<Integer, List<String>>> buildResult(AsyncResult result, Query query, TreeSet<Integer> ids) {
ConcurrentHashMap<String, Map<Integer, List<String>>> pathToPatientToValueMap = new ConcurrentHashMap<>();
List<ColumnMeta> columns = query.getFields().stream()
.map(abstractProcessor.getDictionary()::get)
.filter(Objects::nonNull)
.collect(Collectors.toList());
List<String> paths = columns.stream()
.map(ColumnMeta::getName)
.collect(Collectors.toList());
int columnCount = paths.size() + 1;

ArrayList<Integer> columnIndex = abstractProcessor.useResidentCubesFirst(paths, columnCount);
ResultStore results = new ResultStore(result.getId(), columns, ids);

// todo: investigate if the parallel stream will thrash the cache if the number of executors is > number of resident cubes
columnIndex.parallelStream().forEach((columnId)->{
String columnPath = paths.get(columnId-1);
Map<Integer, List<String>> patientIdToValueMap = processColumn(ids, columnPath);
pathToPatientToValueMap.put(columnPath, patientIdToValueMap);
});

return pathToPatientToValueMap;
}

private Map<Integer, List<String>> processColumn(TreeSet<Integer> patientIds, String path) {

Map<Integer, List<String>> patientIdToValueMap = new HashMap<>();
PhenoCube<?> cube = abstractProcessor.getCube(path);

KeyAndValue<?>[] cubeValues = cube.sortedByKey();

int idPointer = 0;
for(int patientId : patientIds) {
while(idPointer < cubeValues.length) {
int key = cubeValues[idPointer].getKey();
if(key < patientId) {
idPointer++;
} else if(key == patientId){
String value = getResultField(cube, cubeValues, idPointer);
patientIdToValueMap.computeIfAbsent(patientId, k -> new ArrayList<>()).add(value);
idPointer++;
} else {
break;
}
}
}
return patientIdToValueMap;
}

private String getResultField(PhenoCube<?> cube, KeyAndValue<?>[] cubeValues,
int idPointer) {
Comparable<?> value = cubeValues[idPointer].getValue();
return value.toString();
}
}
Loading
Loading