Merge branch 'ALS-6511' into ALS-6511-ALS-6330
ramari16 committed Jul 31, 2024
2 parents: b034ddc + 741aed8 · commit 4631627
Showing 17 changed files with 646 additions and 238 deletions.
@@ -179,7 +179,6 @@ public String toString() {
writePartFormat("Observation Count Fields", fields, builder, true);
break;
case DATAFRAME:
- case DATAFRAME_MERGED:
case SECRET_ADMIN_DATAFRAME:
writePartFormat("Data Export Fields", fields, builder, true);
break;
@@ -49,12 +49,7 @@ public enum ResultType {
* Return the number of observations for included patients and
* included fields, broken up across the included cross count fields.
*/
OBSERVATION_CROSS_COUNT,
- /**
-  * This was developed for UDN, but is completely useless and should
-  * be deleted.
-  */
- DATAFRAME_MERGED,
/**
* Not completely implemented and currently dead code. Someone with
* statistics experience needs to develop a p-value based filter for
@@ -94,5 +89,10 @@ public enum ResultType {
* is suitable to time series analysis and/or loading into another
* instance of HPDS.
*/
- DATAFRAME_TIMESERIES
+ DATAFRAME_TIMESERIES,
+ /**
+  * Exports data as PFB, using avro
+  * <a href="https://uc-cdis.github.io/pypfb/">https://uc-cdis.github.io/pypfb/</a>
+  */
+ DATAFRAME_PFB
}
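
The new DATAFRAME_PFB result type exports query results as PFB (Portable Format for Biomedical Data), which is built on Avro container files. The PFB writer itself lives in one of the changed files not rendered on this page; as a rough sketch of the underlying Avro mechanism only (schema and field names here are illustrative, not the actual PFB envelope schema, which defines its own Entity/Metadata records):

    import org.apache.avro.Schema;
    import org.apache.avro.file.DataFileWriter;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;

    import java.io.File;
    import java.io.IOException;

    public class AvroExportSketch {
        public static void main(String[] args) throws IOException {
            // One record per patient row; "PatientRow" is a made-up schema for this sketch.
            Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"PatientRow\",\"fields\":["
                + "{\"name\":\"patient_id\",\"type\":\"string\"},"
                + "{\"name\":\"value\",\"type\":[\"null\",\"string\"],\"default\":null}]}");

            GenericRecord row = new GenericData.Record(schema);
            row.put("patient_id", "42");
            row.put("value", "example");

            // DataFileWriter produces a self-describing Avro container file.
            try (DataFileWriter<GenericRecord> writer =
                     new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(schema))) {
                writer.create(schema, new File("export.avro"));
                writer.append(row);
            }
        }
    }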
pom.xml: 10 additions & 0 deletions
@@ -311,6 +311,16 @@
<version>1.18.30</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.11.3</version>
</dependency>
<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
<version>1.1.10.5</version>
</dependency>


</dependencies>
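
The snappy-java dependency backs Avro's optional Snappy compression codec; Avro declares snappy-java as an optional dependency, so it is not pulled in transitively. Whether the HPDS writer actually enables Snappy is not visible in this diff, but continuing the sketch above:

    // Hedged continuation of the sketch: enable Snappy block compression.
    // setCodec must be called before writer.create(); needs snappy-java at runtime.
    writer.setCodec(org.apache.avro.file.CodecFactory.snappyCodec());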
processing/pom.xml: 10 additions & 0 deletions
@@ -33,5 +33,15 @@
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty</artifactId>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.xerial.snappy/snappy-java -->
<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
</dependency>

</dependencies>
</project>
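
Note that these two entries omit <version>; presumably they resolve against the versions pinned in the root pom.xml above (avro 1.11.3, snappy-java 1.1.10.5), which would require those declarations to sit in a <dependencyManagement> section of the parent.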
@@ -1,9 +1,15 @@
package edu.harvard.hms.dbmi.avillach.hpds.processing;

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;

import edu.harvard.hms.dbmi.avillach.hpds.processing.io.CsvWriter;
import edu.harvard.hms.dbmi.avillach.hpds.processing.io.ResultWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -17,7 +23,19 @@
public class AsyncResult implements Runnable, Comparable<AsyncResult>{

private static Logger log = LoggerFactory.getLogger(AsyncResult.class);


+ public byte[] readAllBytes() {
+     try {
+         return stream.readAllBytes();
+     } catch (IOException e) {
+         throw new UncheckedIOException(e);
+     }
+ }

+ public void closeWriter() {
+     stream.closeWriter();
+ }

public static enum Status{
SUCCESS {
@Override
@@ -52,29 +70,82 @@ public PicSureStatus toPicSureStatus() {
public abstract PicSureStatus toPicSureStatus();
}

- public Query query;

- public Status status;

- public long queuedTime;

- public long completedTime;

- public int retryCount;

- public int queueDepth;

- public int positionInQueue;

- public int numRows;
+ private Query query;

+ public Query getQuery() {
+     return query;
+ }

+ private Status status;

+ public Status getStatus() {
+     return status;
+ }

+ public AsyncResult setStatus(Status status) {
+     this.status = status;
+     return this;
+ }

+ private long queuedTime;

+ public long getQueuedTime() {
+     return queuedTime;
+ }

+ public AsyncResult setQueuedTime(long queuedTime) {
+     this.queuedTime = queuedTime;
+     return this;
+ }

+ private long completedTime;

+ public long getCompletedTime() {
+     return completedTime;
+ }

+ private int retryCount;

- public int numColumns;
+ private int queueDepth;

+ public int getQueueDepth() {
+     return queueDepth;
+ }

+ public AsyncResult setQueueDepth(int queueDepth) {
+     this.queueDepth = queueDepth;
+     return this;
+ }

+ private int positionInQueue;

+ public AsyncResult setPositionInQueue(int positionInQueue) {
+     this.positionInQueue = positionInQueue;
+     return this;
+ }

+ private int numRows;

- public String id;
+ private int numColumns;

+ private String id;

+ public String getId() {
+     return id;
+ }

+ public AsyncResult setId(String id) {
+     this.id = id;
+     return this;
+ }

@JsonIgnore
- public ResultStoreStream stream;

+ private ResultStoreStream stream;

+ public ResultStoreStream getStream() {
+     return stream;
+ }

@JsonIgnore
private String[] headerRow;

@@ -86,21 +157,44 @@ public PicSureStatus toPicSureStatus() {
* The actual exception is thrown in @see ResultStore#constructor
*/
@JsonIgnore
- public ExecutorService jobQueue;
+ private ExecutorService jobQueue;

+ public ExecutorService getJobQueue() {
+     return jobQueue;
+ }

+ public AsyncResult setJobQueue(ExecutorService jobQueue) {
+     this.jobQueue = jobQueue;
+     return this;
+ }

@JsonIgnore
- public HpdsProcessor processor;
+ private HpdsProcessor processor;

+ public HpdsProcessor getProcessor() {
+     return processor;
+ }

- public AsyncResult(Query query, String[] headerRow) {
+ public AsyncResult(Query query, HpdsProcessor processor, ResultWriter writer) {
      this.query = query;
-     this.headerRow = headerRow;
+     this.processor = processor;
+     this.headerRow = processor.getHeaderRow(query);
      try {
-         stream = new ResultStoreStream(headerRow, query.getExpectedResultType() == ResultType.DATAFRAME_MERGED);
+         stream = new ResultStoreStream(headerRow, writer);
} catch (IOException e) {
log.error("Exception creating result stream", e);
}
}

+ public void appendResults(List<String[]> dataEntries) {
+     stream.appendResults(dataEntries);
+ }

+ public void appendResultStore(ResultStore resultStore) {
+     stream.appendResultStore(resultStore);
+ }


@Override
public void run() {
status = AsyncResult.Status.RUNNING;
@@ -127,9 +221,15 @@ public void enqueue() {
}
}

+ public void open() {
+     stream.open();
+ }

@Override
public int compareTo(AsyncResult o) {
return this.query.getId().compareTo(o.query.getId());
}




}
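
Net effect of the AsyncResult changes: the public mutable fields become private with getters and fluent setters (each setter returns this), and the constructor now derives the header row and result stream from the processor and writer instead of receiving a prebuilt header. A hypothetical caller would now build a result like the sketch below; queue wiring names are assumed, as is a PENDING status constant (enum values other than SUCCESS are truncated out of the diff above):

    AsyncResult result = new AsyncResult(query, processor, writer)
            .setId(queryId)
            .setStatus(AsyncResult.Status.PENDING) // assumed enum constant
            .setQueuedTime(System.currentTimeMillis())
            .setJobQueue(executorService)
            .setQueueDepth(depth)
            .setPositionInQueue(position);
    result.enqueue();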
@@ -0,0 +1,117 @@
package edu.harvard.hms.dbmi.avillach.hpds.processing;

import com.google.common.collect.Lists;
import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.ColumnMeta;
import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.KeyAndValue;
import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.PhenoCube;
import edu.harvard.hms.dbmi.avillach.hpds.data.query.Query;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

@Component
public class PfbProcessor implements HpdsProcessor {

public static final String PATIENT_ID_FIELD_NAME = "patient_id";
private final int ID_BATCH_SIZE;
private final AbstractProcessor abstractProcessor;

private Logger log = LoggerFactory.getLogger(PfbProcessor.class);


@Autowired
public PfbProcessor(AbstractProcessor abstractProcessor) {
this.abstractProcessor = abstractProcessor;
ID_BATCH_SIZE = Integer.parseInt(System.getProperty("ID_BATCH_SIZE", "0"));
}

@Override
public String[] getHeaderRow(Query query) {
String[] header = new String[query.getFields().size()+1];
header[0] = PATIENT_ID_FIELD_NAME;
System.arraycopy(query.getFields().toArray(), 0, header, 1, query.getFields().size());
return header;
}

@Override
public void runQuery(Query query, AsyncResult result) {
Set<Integer> idList = abstractProcessor.getPatientSubsetForQuery(query);
log.info("Processing " + idList.size() + " rows for result " + result.getId());
Lists.partition(new ArrayList<>(idList), ID_BATCH_SIZE).stream()
.forEach(patientIds -> {
Map<String, Map<Integer, String>> pathToPatientToValueMap = buildResult(result, query, new TreeSet<>(patientIds));
List<String[]> fieldValuesPerPatient = patientIds.stream().map(patientId -> {
return Arrays.stream(getHeaderRow(query)).map(field -> {
if (PATIENT_ID_FIELD_NAME.equals(field)) {
return patientId.toString();
} else {
return pathToPatientToValueMap.get(field).get(patientId);
}
}).toArray(String[]::new);
}).collect(Collectors.toList());
result.appendResults(fieldValuesPerPatient);
});
result.closeWriter();
}

private Map<String, Map<Integer, String>> buildResult(AsyncResult result, Query query, TreeSet<Integer> ids) {
ConcurrentHashMap<String, Map<Integer, String>> pathToPatientToValueMap = new ConcurrentHashMap<>();
List<ColumnMeta> columns = query.getFields().stream()
.map(abstractProcessor.getDictionary()::get)
.filter(Objects::nonNull)
.collect(Collectors.toList());
List<String> paths = columns.stream()
.map(ColumnMeta::getName)
.collect(Collectors.toList());
int columnCount = paths.size() + 1;

ArrayList<Integer> columnIndex = abstractProcessor.useResidentCubesFirst(paths, columnCount);
ResultStore results = new ResultStore(result.getId(), columns, ids);

// todo: investigate if the parallel stream will thrash the cache if the number of executors is > number of resident cubes
columnIndex.parallelStream().forEach((columnId)->{
String columnPath = paths.get(columnId-1);
Map<Integer, String> patientIdToValueMap = processColumn(ids, columnPath);
pathToPatientToValueMap.put(columnPath, patientIdToValueMap);
});

return pathToPatientToValueMap;
}

private Map<Integer, String> processColumn(TreeSet<Integer> patientIds, String path) {

Map<Integer, String> patientIdToValueMap = new HashMap<>();
PhenoCube<?> cube = abstractProcessor.getCube(path);

KeyAndValue<?>[] cubeValues = cube.sortedByKey();

int idPointer = 0;
for(int patientId : patientIds) {
while(idPointer < cubeValues.length) {
int key = cubeValues[idPointer].getKey();
if(key < patientId) {
idPointer++;
} else if(key == patientId){
String value = getResultField(cube, cubeValues, idPointer);
patientIdToValueMap.put(patientId, value);
idPointer++;
break;
} else {
break;
}
}
}
return patientIdToValueMap;
}

private String getResultField(PhenoCube<?> cube, KeyAndValue<?>[] cubeValues,
int idPointer) {
Comparable<?> value = cubeValues[idPointer].getValue();
return value.toString();
}
}
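
In the new PfbProcessor, processColumn resolves one column in a single linear pass: cube.sortedByKey() returns entries ordered by patient id, and the loop advances one pointer through the cube entries while iterating the sorted TreeSet of requested ids, a standard sorted-merge scan. For orientation, a hypothetical end-to-end use follows; Query construction details are assumed (its setters are not shown in this commit), as are the abstractProcessor and pfbWriter instances:

    // Hypothetical wiring; field paths and the writer are illustrative.
    Query query = new Query();
    query.setFields(List.of("\\demographics\\AGE\\", "\\demographics\\SEX\\"));

    PfbProcessor processor = new PfbProcessor(abstractProcessor);
    AsyncResult result = new AsyncResult(query, processor, pfbWriter).setId("job-1");

    // Emits one String[] per patient (patient_id first, then one column per
    // requested field), in ID_BATCH_SIZE batches, then closes the writer.
    processor.runQuery(query, result);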