diff --git a/client-api/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/query/Query.java b/client-api/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/query/Query.java
index 8e28e74e..5a8a1fb0 100644
--- a/client-api/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/query/Query.java
+++ b/client-api/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/query/Query.java
@@ -179,7 +179,6 @@ public String toString() {
 			writePartFormat("Observation Count Fields", fields, builder, true);
 			break;
 		case DATAFRAME:
-		case DATAFRAME_MERGED:
 		case SECRET_ADMIN_DATAFRAME:
 			writePartFormat("Data Export Fields", fields, builder, true);
 			break;
diff --git a/client-api/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/query/ResultType.java b/client-api/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/query/ResultType.java
index 4481190b..f7cd8165 100644
--- a/client-api/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/query/ResultType.java
+++ b/client-api/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/query/ResultType.java
@@ -49,12 +49,7 @@ public enum ResultType {
 	 * Return the number of observations for included patients and
 	 * included fields, broken up across the included cross count fields.
 	 */
-	OBSERVATION_CROSS_COUNT,
-	/**
-	 * This was developed for UDN, but is completely useless and should
-	 * be deleted.
-	 */
-	DATAFRAME_MERGED,
+	OBSERVATION_CROSS_COUNT,
 	/**
 	 * Not completely implemented and currently dead code. Someone with
 	 * statistics experience needs to develop a p-value based filter for
@@ -94,5 +89,10 @@ public enum ResultType {
 	 * is suitable to time series analysis and/or loading into another
 	 * instance of HPDS.
 	 */
-	DATAFRAME_TIMESERIES
+	DATAFRAME_TIMESERIES,
+	/**
+	 * Exports data as PFB, using Avro
+	 * https://uc-cdis.github.io/pypfb/
+	 */
+	DATAFRAME_PFB
 }
diff --git a/pom.xml b/pom.xml
index b2bbf69d..fb525de5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -310,7 +310,11 @@
 				<version>1.18.30</version>
 				<scope>provided</scope>
 			</dependency>
-
+			<dependency>
+				<groupId>org.apache.avro</groupId>
+				<artifactId>avro</artifactId>
+				<version>1.11.3</version>
+			</dependency>
diff --git a/processing/pom.xml b/processing/pom.xml
index 66d9d1e3..96d3311f 100644
--- a/processing/pom.xml
+++ b/processing/pom.xml
@@ -33,5 +33,9 @@
 		<dependency>
 			<groupId>io.projectreactor.netty</groupId>
 			<artifactId>reactor-netty</artifactId>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.avro</groupId>
+			<artifactId>avro</artifactId>
+		</dependency>
 	</dependencies>
diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/AsyncResult.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/AsyncResult.java
index e4434abf..42d9c31e 100644
--- a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/AsyncResult.java
+++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/AsyncResult.java
@@ -1,6 +1,9 @@
 package edu.harvard.hms.dbmi.avillach.hpds.processing;
 
 import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
 
@@ -17,7 +20,15 @@ public class AsyncResult implements Runnable, Comparable<AsyncResult>{
 	
 	private static Logger log = LoggerFactory.getLogger(AsyncResult.class);
-	
+
+	public byte[] readAllBytes() {
+		try {
+			return stream.readAllBytes();
+		} catch (IOException e) {
+			throw new UncheckedIOException(e);
+		}
+	}
+
 	public static enum Status{
 		SUCCESS {
 			@Override
@@ -52,29 +63,82 @@ public PicSureStatus toPicSureStatus() {
 		public abstract PicSureStatus toPicSureStatus();
 	}
 	
-	public Query query;
-	
-	public Status status;
-	
-	public long queuedTime;
-	
-	public long completedTime;
-	
-	public int retryCount;
-	
-	public int queueDepth;
-	
-	public int positionInQueue;
-	
-	public int numRows;
+	private Query query;
+
+	public Query getQuery() {
+		return query;
+	}
+
+	private Status status;
+
+	public Status getStatus() {
+		return status;
+	}
+
+	public AsyncResult setStatus(Status status) {
+		this.status = status;
+		return this;
+	}
+
+	private long queuedTime;
+
+	public long getQueuedTime() {
+		return queuedTime;
+	}
+
+	public AsyncResult setQueuedTime(long queuedTime) {
+		this.queuedTime = queuedTime;
+		return this;
+	}
+
+	private long completedTime;
+
+	public long getCompletedTime() {
+		return completedTime;
+	}
+
+	private int retryCount;
 
-	public int numColumns;
+	private int queueDepth;
+
+	public int getQueueDepth() {
+		return queueDepth;
+	}
+
+	public AsyncResult setQueueDepth(int queueDepth) {
+		this.queueDepth = queueDepth;
+		return this;
+	}
+
+	private int positionInQueue;
+
+	public AsyncResult setPositionInQueue(int positionInQueue) {
+		this.positionInQueue = positionInQueue;
+		return this;
+	}
+
+	private int numRows;
 
-	public String id;
+	private int numColumns;
 
+	private String id;
+
+	public String getId() {
+		return id;
+	}
+
+	public AsyncResult setId(String id) {
+		this.id = id;
+		return this;
+	}
+
 	@JsonIgnore
-	public ResultStoreStream stream;
-	
+	private ResultStoreStream stream;
+
+	public ResultStoreStream getStream() {
+		return stream;
+	}
+
 	@JsonIgnore
 	private String[] headerRow;
 
@@ -86,21 +150,48 @@ public PicSureStatus toPicSureStatus() {
 	 * The actual exception is thrown in @see ResultStore#constructor
 	 */
 	@JsonIgnore
-	public ExecutorService jobQueue;
+	private ExecutorService jobQueue;
+
+	public ExecutorService getJobQueue() {
+		return jobQueue;
+	}
+
+	public AsyncResult setJobQueue(ExecutorService jobQueue) {
+		this.jobQueue = jobQueue;
+		return this;
+	}
 
 	@JsonIgnore
-	public HpdsProcessor processor;
+	private HpdsProcessor processor;
+
+	public HpdsProcessor getProcessor() {
+		return processor;
+	}
+
+	public AsyncResult setProcessor(HpdsProcessor processor) {
+		this.processor = processor;
+		return this;
+	}
 
 	public AsyncResult(Query query, String[] headerRow) {
 		this.query = query;
 		this.headerRow = headerRow;
 		try {
-			stream = new ResultStoreStream(headerRow, query.getExpectedResultType() == ResultType.DATAFRAME_MERGED);
+			stream = new ResultStoreStream(headerRow);
 		} catch (IOException e) {
 			log.error("Exception creating result stream", e);
 		}
 	}
 
+	public void appendResults(List<String[]> dataEntries) {
+		stream.appendResults(dataEntries);
+	}
+
+	public void appendResultStore(ResultStore resultStore) {
+		stream.appendResultStore(resultStore);
+	}
+
 	@Override
 	public void run() {
 		status = AsyncResult.Status.RUNNING;
@@ -127,9 +218,15 @@ public void enqueue() {
 		}
 	}
 	
+	public void open() {
+		stream.open();
+	}
+
 	@Override
 	public int compareTo(AsyncResult o) {
 		return this.query.getId().compareTo(o.query.getId());
 	}
-	
+
+
+
 }
diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/PfbProcessor.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/PfbProcessor.java
new file mode 100644
index 00000000..e16ee194
--- /dev/null
+++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/PfbProcessor.java
@@ -0,0 +1,112 @@
+package edu.harvard.hms.dbmi.avillach.hpds.processing;
+
+import com.google.common.collect.Lists;
+import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.ColumnMeta;
+import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.KeyAndValue;
+import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.PhenoCube;
+import edu.harvard.hms.dbmi.avillach.hpds.data.query.Query;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+public class PfbProcessor implements HpdsProcessor {
+
+    public static final String PATIENT_ID_FIELD_NAME = "patient_id";
+    private final int ID_BATCH_SIZE;
+    private final AbstractProcessor abstractProcessor;
+
+    private Logger log = LoggerFactory.getLogger(PfbProcessor.class);
+
+
+    public PfbProcessor(AbstractProcessor abstractProcessor) {
+        this.abstractProcessor = abstractProcessor;
+        ID_BATCH_SIZE = Integer.parseInt(System.getProperty("ID_BATCH_SIZE", "0"));
+    }
+
+    @Override
+    public String[] getHeaderRow(Query query) {
+        String[] header = new String[query.getFields().size()+1];
+        header[0] = PATIENT_ID_FIELD_NAME;
+        System.arraycopy(query.getFields().toArray(), 0, header, 1, query.getFields().size());
+        return header;
+    }
+
+    @Override
+    public void runQuery(Query query, AsyncResult result) {
+        Set<Integer> idList = abstractProcessor.getPatientSubsetForQuery(query);
+        log.info("Processing " + idList.size() + " rows for result " + result.getId());
+        Lists.partition(new ArrayList<>(idList), ID_BATCH_SIZE).stream()
+                .forEach(patientIds -> {
+                    Map<String, Map<Integer, String>> pathToPatientToValueMap = buildResult(result, query, new TreeSet<>(patientIds));
+                    List<String[]> fieldValuesPerPatient = patientIds.stream().map(patientId -> {
+                        return Arrays.stream(getHeaderRow(query)).map(field -> {
+                            if (PATIENT_ID_FIELD_NAME.equals(field)) {
+                                return patientId.toString();
+                            } else {
+                                return pathToPatientToValueMap.get(field).get(patientId);
+                            }
+                        }).toArray(String[]::new);
+                    }).collect(Collectors.toList());
+                    result.appendResults(fieldValuesPerPatient);
+                });
+    }
+
+    private Map<String, Map<Integer, String>> buildResult(AsyncResult result, Query query, TreeSet<Integer> ids) {
+        ConcurrentHashMap<String, Map<Integer, String>> pathToPatientToValueMap = new ConcurrentHashMap<>();
+        List<ColumnMeta> columns = query.getFields().stream()
+                .map(abstractProcessor.getDictionary()::get)
+                .filter(Objects::nonNull)
+                .collect(Collectors.toList());
+        List<String> paths = columns.stream()
+                .map(ColumnMeta::getName)
+                .collect(Collectors.toList());
+        int columnCount = paths.size() + 1;
+
+        ArrayList<Integer> columnIndex = abstractProcessor.useResidentCubesFirst(paths, columnCount);
+        ResultStore results = new ResultStore(result.getId(), columns, ids);
+
+        // todo: investigate if the parallel stream will thrash the cache if the number of executors is > number of resident cubes
+        columnIndex.parallelStream().forEach((columnId)->{
+            String columnPath = paths.get(columnId-1);
+            Map<Integer, String> patientIdToValueMap = processColumn(ids, columnPath);
+            pathToPatientToValueMap.put(columnPath, patientIdToValueMap);
+        });
+
+        return pathToPatientToValueMap;
+    }
+
+    private Map<Integer, String> processColumn(TreeSet<Integer> patientIds, String path) {
+
+        Map<Integer, String> patientIdToValueMap = new HashMap<>();
+        PhenoCube<?> cube = abstractProcessor.getCube(path);
+
+        KeyAndValue<?>[] cubeValues = cube.sortedByKey();
+
+        int idPointer = 0;
+        for(int patientId : patientIds) {
+            while(idPointer < cubeValues.length) {
+                int key = cubeValues[idPointer].getKey();
+                if(key < patientId) {
+                    idPointer++;
+                } else if(key == patientId){
+                    String value = getResultField(cube, cubeValues, idPointer);
+                    patientIdToValueMap.put(patientId, value);
+                    idPointer++;
+                    break;
+                } else {
+                    break;
+                }
+            }
+        }
+        return patientIdToValueMap;
+    }
+
+    private String getResultField(PhenoCube<?> cube, KeyAndValue<?>[] cubeValues,
+                                  int idPointer) {
+        Comparable<?> value = cubeValues[idPointer].getValue();
+        return value.toString();
+    }
+}
diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/QueryProcessor.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/QueryProcessor.java
index 08d83bc6..f02e91f4 100644
--- a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/QueryProcessor.java
+++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/QueryProcessor.java
@@ -53,11 +53,11 @@ public String[] getHeaderRow(Query query) {
 
 	public void runQuery(Query query, AsyncResult result) {
 		Set<Integer> idList = abstractProcessor.getPatientSubsetForQuery(query);
-		log.info("Processing " + idList.size() + " rows for result " + result.id);
+		log.info("Processing " + idList.size() + " rows for result " + result.getId());
 		Lists.partition(new ArrayList<>(idList), ID_BATCH_SIZE).parallelStream()
 				.map(list -> buildResult(result, query, new TreeSet<>(list)))
 				.sequential()
-				.forEach(result.stream::appendResultStore);
+				.forEach(result::appendResultStore);
 	}
 
@@ -72,7 +72,7 @@ private ResultStore buildResult(AsyncResult result, Query query, TreeSet<Integer> ids) {
 		ArrayList<Integer> columnIndex = abstractProcessor.useResidentCubesFirst(paths, columnCount);
 
-		ResultStore results = new ResultStore(result.id, columns, ids);
+		ResultStore results = new ResultStore(result.getId(), columns, ids);
 
 		columnIndex.parallelStream().forEach((column)->{
 			clearColumn(paths, ids, results, column);
diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/ResultStore.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/ResultStore.java
index b4ec7631..4fc2f7c4 100644
--- a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/ResultStore.java
+++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/ResultStore.java
@@ -124,7 +124,7 @@ private int getFieldOffset(int row, int column) {
 	 * @param row
 	 * @throws IOException
 	 */
-	public void readRowIntoStringArray(int rowNumber, int[] columnWidths, String[] row) throws IOException {
+	public void readRowIntoStringArray(int rowNumber, int[] columnWidths, String[] row) {
 		if(wrappedResultArray == null) {
 			wrappedResultArray = ByteBuffer.wrap(resultArray);
 		}
diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/ResultStoreStream.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/ResultStoreStream.java
index 5bb23d81..302ff588 100644
--- a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/ResultStoreStream.java
+++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/ResultStoreStream.java
@@ -3,94 +3,40 @@
 import java.io.*;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.regex.Pattern;
 
+import edu.harvard.hms.dbmi.avillach.hpds.processing.io.CsvWriter;
+import edu.harvard.hms.dbmi.avillach.hpds.processing.io.ResultWriter;
 
-import org.apache.commons.csv.CSVFormat;
-import org.apache.commons.csv.CSVParser;
-import org.apache.commons.csv.CSVPrinter;
-import org.apache.commons.csv.CSVRecord;
-
-import de.siegmar.fastcsv.writer.CsvWriter;
 import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.ColumnMeta;
 
 public class ResultStoreStream extends InputStream {
 
-	private CsvWriter writer;
+	private ResultWriter writer;
 	private File tempFile;
 	private InputStream in;
 	private int value;
 	private boolean streamIsClosed = false;
 	private int numRows;
-	private String[] expandedHeader;
-	private TreeMap<String, ArrayList<Integer>> mergedColumnIndex;
 	private String[] originalHeader;
-	private boolean mergeColumns;
 
-	public ResultStoreStream(String[] header, boolean mergeColumns) throws IOException {
-		writer = new CsvWriter();
+	public ResultStoreStream(String[] header) throws IOException {
 		tempFile = File.createTempFile("result-"+ System.nanoTime(), ".sstmp");
+		writer = new CsvWriter(tempFile);
 		this.originalHeader = header;
-		if(mergeColumns) {
-			this.expandedHeader = createMergedColumns(header);
-			writeHeader(this.expandedHeader);
-		}else {
-			writeHeader(this.originalHeader);
-		}
-		this.mergeColumns = mergeColumns;
+		writeHeader(this.originalHeader);
 		numRows = 0;
 	}
 
-	private String[] createMergedColumns(String[] header) {
-		ArrayList<String> allColumns = new ArrayList<String>();
-		allColumns.add(header[0]);
-		TreeMap<String, TreeSet<String>> mergedColumns = new TreeMap<>();
-		this.mergedColumnIndex = new TreeMap<>();
-		int columnNumber = 0;
-		for(String column : header) {
-			String[] split = column.split("\\\\");
-			if(split.length > 1) {
-				String key = split[1];
-				TreeSet<String> subColumns = mergedColumns.get(key);
-				ArrayList<Integer> columnIndex = mergedColumnIndex.get(key);
-				if(subColumns == null) {
-					subColumns = new TreeSet<String>();
-					mergedColumns.put(key, subColumns);
-					allColumns.add(key);
-					columnIndex = new ArrayList<Integer>();
-					mergedColumnIndex.put(key, columnIndex);
-				}
-				columnIndex.add(columnNumber);
-				subColumns.add(column);
-			}
-			columnNumber++;
-		}
-		for(int x = 1;x
-	}
-
-	private void writeHeader(String[] header) throws IOException {
-		ArrayList<String[]> headerEntries = new ArrayList<String[]>();
-		headerEntries.add(header);
-		try(FileWriter out = new FileWriter(tempFile);){
-			writer.write(out, headerEntries);
-		}
+	private void writeHeader(String[] header) {
+		writer.writeHeader(header);
 	}
 
 	public void appendResultStore(ResultStore results) {
-		try (FileWriter out = new FileWriter(tempFile, true);){
-			int batchSize = 100;
-			List<String[]> entries = new ArrayList<String[]>(batchSize);
-			for(int x = 0;x
+		int batchSize = 100;
+		List<String[]> entries = new ArrayList<>(batchSize);
+		for(int x = 0;x
 	}
 
 	public void appendResults(List<String[]> entries) {
-		try (FileWriter out = new FileWriter(tempFile, true);){
-			writer.write(out, entries);
-			numRows += entries.size();
-		} catch (IOException e) {
-			throw new RuntimeException("IOException while appending temp file : " + tempFile.getAbsolutePath(), e);
-		}
+		writer.writeEntity(entries);
 	}
 
-	private List<String[]> writeResultsToTempFile(ResultStore results, FileWriter out, int batchSize,
-			List<String[]> entries) throws IOException {
+	private List<String[]> writeResultsToTempFile(ResultStore results, int batchSize,
+			List<String[]> entries) {
 		List<ColumnMeta> columns = results.getColumns();
 		int[] columnWidths = new int[columns.size()];
 
@@ -123,7 +64,7 @@ private List<String[]> writeResultsToTempFile(ResultStore results, FileWriter ou
 			if(rowsInBatch < batchSize) {
 				entries = entries.subList(0, rowsInBatch);
 			}
-			writer.write(out, entries);
+			writer.writeEntity(entries);
 			numRows += rowsInBatch;
 		}
 		return entries;
@@ -139,68 +80,12 @@ public void close() {
 
 	public void open() {
 		try {
-			in = new BufferedInputStream(new FileInputStream(new File(tempFile.getAbsolutePath())), 1024 * 1024 * 8);
-			if(mergeColumns) {
-				File mergedFile = File.createTempFile(tempFile.getName(), "_merged");
-				FileWriter out = new FileWriter(mergedFile);
-				CSVParser parser = CSVFormat.DEFAULT.withDelimiter(',').parse(new InputStreamReader(in));
-				CSVPrinter writer = new CSVPrinter(out, CSVFormat.DEFAULT.withDelimiter(','));
-				final boolean[] firstRow = new boolean[] {true};
-				parser.forEach((CSVRecord record)->{
-					if(firstRow[0]) {
-						try {
-							ArrayList<String> header = new ArrayList<>();
-							header.add("Patient ID");
-							header.addAll(mergedColumnIndex.keySet());
-							writer.printRecord(header);
-							firstRow[0] = false;
-						} catch (IOException e) {
-							throw new UncheckedIOException(e);
-						}
-					}else {
-						ArrayList<String> records = new ArrayList<String>();
-						records.add(record.get(0));
-						for(String column : mergedColumnIndex.keySet()) {
-							ArrayList<String> valuesToMerge = new ArrayList<>();
-							for(Integer columnNumber : mergedColumnIndex.get(column)) {
-								String value = record.get(columnNumber);
-								if( value != null && ! value.isEmpty() ) {
-									value = value.replaceAll("\"", "'");
-									String label = originalHeader[columnNumber].replaceAll("\\\\"+ column, "");
-									if(label.length()>1) {
-										label = label.substring(1, label.length()-1);
-									}else {
-										label = null;
-									}
-									if(label==null || label.trim().contentEquals(value.trim())) {
-										valuesToMerge.add(value);
-									} else {
-										valuesToMerge.add(label==null ? value : label.replaceAll("\\\\"+Pattern.quote(value), "") + " : " + value);
-									}
-								}
-							}
-							records.add(String.join(";", valuesToMerge));
-						}
-						try {
-							writer.printRecord(records);
-						} catch (IOException e) {
-							throw new UncheckedIOException(e);
-						}
-					}
-				});
-				parser.close();
-				writer.close();
-				out.close();
-				in.close();
-				in = new BufferedInputStream(new FileInputStream(mergedFile), 1024 * 1024 * 8);
-			}
+			in = new BufferedInputStream(new FileInputStream(tempFile.getAbsolutePath()), 1024 * 1024 * 8);
 			streamIsClosed = false;
 		} catch (FileNotFoundException e) {
 			throw new RuntimeException("temp file for result not found : " + tempFile.getAbsolutePath());
-		} catch (IOException e) {
-			throw new UncheckedIOException(e);
 		}
-	}
+	}
 
diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/TimeseriesProcessor.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/TimeseriesProcessor.java
index fe539659..9e01ad92 100644
--- a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/TimeseriesProcessor.java
+++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/TimeseriesProcessor.java
@@ -127,11 +127,11 @@ private void addDataForConcepts(Collection<String> pathList, Set<String> exporte
 			}
 			//batch exports so we don't take double memory (valuesForKeys + dataEntries could be a lot of data points)
 			if(dataEntries.size() >= ID_BATCH_SIZE) {
-				result.stream.appendResults(dataEntries);
+				result.appendResults(dataEntries);
 				dataEntries = new ArrayList();
 			}
 		}
-		result.stream.appendResults(dataEntries);
+		result.appendResults(dataEntries);
 		exportedConceptPaths.add(conceptPath);
 	}
 }
diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/CsvWriter.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/CsvWriter.java
new file mode 100644
index 00000000..a06b867a
--- /dev/null
+++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/CsvWriter.java
@@ -0,0 +1,48 @@
+package edu.harvard.hms.dbmi.avillach.hpds.processing.io;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+public class CsvWriter implements ResultWriter {
+
+    private final de.siegmar.fastcsv.writer.CsvWriter csvWriter;
+
+    private final FileWriter fileWriter;
+
+    public CsvWriter(File file) {
+        csvWriter = new de.siegmar.fastcsv.writer.CsvWriter();
+        try {
+            this.fileWriter = new FileWriter(file);
+        } catch (IOException e) {
+            throw new RuntimeException("IOException while appending temp file : " + file.getAbsolutePath(), e);
+        }
+    }
+
+    @Override
+    public void writeHeader(String[] data) {
+        try {
+            List<String[]> dataList = new ArrayList<>();
+            dataList.add(data);
+            csvWriter.write(fileWriter, dataList);
+        } catch (IOException e) {
+            throw new RuntimeException("IOException while appending to CSV file", e);
+        }
+    }
+
+    @Override
+    public void writeEntity(Collection<String[]> data) {
+        try {
+            csvWriter.write(fileWriter, data);
+        } catch (IOException e) {
+            throw new RuntimeException("IOException while appending to CSV file", e);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        fileWriter.close();
+    }
+}
diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/PfbResultStore.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/PfbResultStore.java
new file mode 100644
index 00000000..57312006
--- /dev/null
+++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/PfbResultStore.java
@@ -0,0 +1,4 @@
+package edu.harvard.hms.dbmi.avillach.hpds.processing.io;
+
+public class PfbResultStore {
+}
diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/PfbWriter.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/PfbWriter.java
new file mode 100644
index 00000000..0eb4570f
--- /dev/null
+++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/PfbWriter.java
@@ -0,0 +1,139 @@
+package edu.harvard.hms.dbmi.avillach.hpds.processing.io;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.*;
+import java.util.stream.Collectors;
+
+public class PfbWriter implements ResultWriter {
+
+    private final Schema metadataSchema;
+    private final Schema nodeSchema;
+    private SchemaBuilder.FieldAssembler<Schema> entityFieldAssembler;
+
+    private List<String> fields;
+    private DataFileWriter<GenericRecord> dataFileWriter;
+    private File file;
+    private Schema entitySchema;
+    private Schema patientDataSchema;
+
+    public PfbWriter() {
+        entityFieldAssembler = SchemaBuilder.record("entity")
+                .namespace("edu.harvard.dbmi")
+                .fields();
+
+        SchemaBuilder.FieldAssembler<Schema> nodeRecord = SchemaBuilder.record("nodes")
+                .fields()
+                .requiredString("name")
+                .nullableString("ontology_reference", "null")
+                .name("values").type(SchemaBuilder.map().values(SchemaBuilder.nullable().stringType())).noDefault();
+        nodeSchema = nodeRecord.endRecord();
+
+        SchemaBuilder.FieldAssembler<Schema> metadataRecord = SchemaBuilder.record("metadata")
+                .fields();
+        metadataRecord.requiredString("misc");
+        metadataRecord = metadataRecord.name("nodes").type(SchemaBuilder.array().items(nodeSchema)).noDefault();
+        metadataSchema = metadataRecord.endRecord();
+    }
+
+    @Override
+    public void writeHeader(String[] data) {
+        fields = Arrays.stream(data.clone()).map(this::formatFieldName).collect(Collectors.toList());
+        SchemaBuilder.FieldAssembler<Schema> patientRecords = SchemaBuilder.record("patientData")
+                .fields();
+
+        fields.forEach(patientRecords::requiredString);
+        patientDataSchema = patientRecords.endRecord();
+
+        Schema objectSchema = Schema.createUnion(metadataSchema, patientDataSchema);
+
+        entityFieldAssembler = entityFieldAssembler.name("object").type(objectSchema).noDefault();
+        entityFieldAssembler.nullableString("id", "null");
+        entityFieldAssembler.requiredString("name");
+        entitySchema = entityFieldAssembler.endRecord();
+
+        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(entitySchema);
+        dataFileWriter = new DataFileWriter<>(datumWriter);
+        try {
+            file = File.createTempFile("result-"+ System.nanoTime(), ".avro");
+            dataFileWriter.create(entitySchema, file);
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+
+        writeMetadata();
+    }
+
+    private String formatFieldName(String s) {
+        return s.replaceAll("\\\\", "_");
+    }
+
+    private void writeMetadata() {
+        GenericRecord entityRecord = new GenericData.Record(entitySchema);
+
+        List<GenericRecord> nodeList = new ArrayList<>();
+        for (String field : fields) {
+            GenericRecord nodeData = new GenericData.Record(nodeSchema);
+            nodeData.put("name", field);
+            nodeData.put("ontology_reference", "");
+            nodeData.put("values", Map.of());
+            nodeList.add(nodeData);
+        }
+        GenericRecord metadata = new GenericData.Record(metadataSchema);
+        metadata.put("misc", "");
+        metadata.put("nodes", nodeList);
+
+
+        entityRecord.put("object", metadata);
+        entityRecord.put("name", "metadata");
+        entityRecord.put("id", "null");
+
+        try {
+            dataFileWriter.append(entityRecord);
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    @Override
+    public void writeEntity(Collection<String[]> entities) {
+        entities.forEach(entity -> {
+            if (entity.length != fields.size()) {
+                throw new IllegalArgumentException("Entity length must match the number of fields in this document");
+            }
+            GenericRecord patientData = new GenericData.Record(patientDataSchema);
+            for(int i = 0; i < fields.size(); i++) {
+                patientData.put(fields.get(i), entity[i]);
+            }
+
+            GenericRecord entityRecord = new GenericData.Record(entitySchema);
+            entityRecord.put("object", patientData);
+            entityRecord.put("name", "patientData");
+            entityRecord.put("id", "192035");
+
+            try {
+                dataFileWriter.append(entityRecord);
+            } catch (IOException e) {
+                throw new UncheckedIOException(e);
+            }
+        });
+    }
+
+    @Override
+    public void close() {
+        try {
+            dataFileWriter.close();
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+}
diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/ResultWriter.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/ResultWriter.java
new file mode 100644
index 00000000..82f42596
--- /dev/null
+++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/ResultWriter.java
@@ -0,0 +1,12 @@
+package edu.harvard.hms.dbmi.avillach.hpds.processing.io;
+
+import java.io.IOException;
+import java.util.Collection;
+
+public interface ResultWriter {
+    void writeHeader(String[] data);
+
+    void writeEntity(Collection<String[]> data);
+
+    void close() throws IOException;
+}
diff --git a/processing/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/PfbWriterTest.java b/processing/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/PfbWriterTest.java
new file mode 100644
index 00000000..75759c68
--- /dev/null
+++ b/processing/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/PfbWriterTest.java
@@ -0,0 +1,26 @@
+package edu.harvard.hms.dbmi.avillach.hpds.processing.io;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+
+public class PfbWriterTest {
+
+    @Test
+    public void writeValidPFB() {
+        PfbWriter pfbWriter = new PfbWriter();
+
+        pfbWriter.writeHeader(new String[] {"\\demographics\\age\\", "\\phs123\\stroke\\"});
+        pfbWriter.writeEntity(List.of(new String[]{"80", "Y"},
+                new String[]{"70", "N"},
+                new String[]{"75", "N"}
+        ));
+        pfbWriter.close();
+        // todo: validate this programmatically
+    }
+}
\ No newline at end of file
diff --git a/service/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/service/PicSureService.java b/service/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/service/PicSureService.java
index def35e54..26e34403 100644
--- a/service/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/service/PicSureService.java
+++ b/service/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/service/PicSureService.java
@@ -181,17 +181,17 @@ private Query convertIncomingQuery(QueryRequest queryJson)
 
 	private QueryStatus convertToQueryStatus(AsyncResult entity) {
 		QueryStatus status = new QueryStatus();
-		status.setDuration(entity.completedTime == 0 ? 0 : entity.completedTime - entity.queuedTime);
-		status.setResourceResultId(entity.id);
-		status.setResourceStatus(entity.status.name());
-		if (entity.status == AsyncResult.Status.SUCCESS) {
-			status.setSizeInBytes(entity.stream.estimatedSize());
+		status.setDuration(entity.getCompletedTime() == 0 ? 0 : entity.getCompletedTime() - entity.getQueuedTime());
+		status.setResourceResultId(entity.getId());
+		status.setResourceStatus(entity.getStatus().name());
+		if (entity.getStatus() == AsyncResult.Status.SUCCESS) {
+			status.setSizeInBytes(entity.getStream().estimatedSize());
 		}
-		status.setStartTime(entity.queuedTime);
-		status.setStatus(entity.status.toPicSureStatus());
+		status.setStartTime(entity.getQueuedTime());
+		status.setStatus(entity.getStatus().toPicSureStatus());
 
 		Map<String, Object> metadata = new HashMap<String, Object>();
-		metadata.put("picsureQueryId", UUIDv5.UUIDFromString(entity.query.toString()));
+		metadata.put("picsureQueryId", UUIDv5.UUIDFromString(entity.getQuery().toString()));
 		status.setResultMetadata(metadata);
 		return status;
 	}
@@ -214,13 +214,13 @@ public ResponseEntity queryResult(@PathVariable("resourceQueryId") UUID queryId,
 				return ResponseEntity.status(404).build();
 			}
 		}
-		if (result.status == AsyncResult.Status.SUCCESS) {
-			result.stream.open();
+		if (result.getStatus() == AsyncResult.Status.SUCCESS) {
+			result.open();
 			return ResponseEntity.ok()
 					.contentType(MediaType.TEXT_PLAIN)
-					.body(new String(result.stream.readAllBytes(), StandardCharsets.UTF_8));
+					.body(new String(result.readAllBytes(), StandardCharsets.UTF_8));
 		} else {
-			return ResponseEntity.status(400).body("Status : " + result.status.name());
+			return ResponseEntity.status(400).body("Status : " + result.getStatus().name());
 		}
 	}
 
@@ -283,20 +283,6 @@ private ResponseEntity _querySync(QueryRequest resultRequest) throws IOException
 		case DATAFRAME:
 		case SECRET_ADMIN_DATAFRAME:
-		case DATAFRAME_MERGED:
-			QueryStatus status = query(resultRequest).getBody();
-			while (status.getResourceStatus().equalsIgnoreCase("RUNNING")
-					|| status.getResourceStatus().equalsIgnoreCase("PENDING")) {
-				status = queryStatus(UUID.fromString(status.getResourceResultId()), null);
-			}
-			log.info(status.toString());
-
-			AsyncResult result = queryService.getResultFor(status.getResourceResultId());
-			if (result.status == AsyncResult.Status.SUCCESS) {
-				result.stream.open();
-				return queryOkResponse(new String(result.stream.readAllBytes(), StandardCharsets.UTF_8), incomingQuery, MediaType.TEXT_PLAIN);
-			}
-			return ResponseEntity.status(400).contentType(MediaType.APPLICATION_JSON).body("Status : " + result.status.name());
 		case CROSS_COUNT:
 			return queryOkResponse(countProcessor.runCrossCounts(incomingQuery), incomingQuery, MediaType.APPLICATION_JSON);
diff --git a/service/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/service/QueryService.java b/service/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/service/QueryService.java
index c2fc7b78..f54290ed 100644
--- a/service/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/service/QueryService.java
+++ b/service/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/service/QueryService.java
@@ -43,6 +43,7 @@ public class QueryService {
 	private final QueryProcessor queryProcessor;
 	private final TimeseriesProcessor timeseriesProcessor;
 	private final CountProcessor countProcessor;
+	private final PfbProcessor pfbProcessor;
 
 	HashMap<String, AsyncResult> results = new HashMap<>();
 
@@ -52,6 +53,7 @@ public QueryService (AbstractProcessor abstractProcessor,
 						 QueryProcessor queryProcessor,
 						 TimeseriesProcessor timeseriesProcessor,
 						 CountProcessor countProcessor,
+						 PfbProcessor pfbProcessor,
 						 @Value("${SMALL_JOB_LIMIT}") Integer smallJobLimit,
 						 @Value("${SMALL_TASK_THREADS}") Integer smallTaskThreads,
 						 @Value("${LARGE_TASK_THREADS}") Integer largeTaskThreads) {
@@ -59,6 +61,7 @@ public QueryService (AbstractProcessor abstractProcessor,
 		this.queryProcessor = queryProcessor;
 		this.timeseriesProcessor = timeseriesProcessor;
 		this.countProcessor = countProcessor;
+		this.pfbProcessor = pfbProcessor;
 
 		SMALL_JOB_LIMIT = smallJobLimit;
 		SMALL_TASK_THREADS = smallTaskThreads;
@@ -85,17 +88,17 @@ public AsyncResult runQuery(Query query) throws ClassNotFoundException, IOExcept
 		// This is all the validation we do for now.
 		if(!ensureAllFieldsExist(query)) {
-			result.status = Status.ERROR;
+			result.setStatus(Status.ERROR);
 		}else {
 			if(query.getFields().size() > SMALL_JOB_LIMIT) {
-				result.jobQueue = largeTaskExecutor;
+				result.setJobQueue(largeTaskExecutor);
 			} else {
-				result.jobQueue = smallTaskExecutor;
+				result.setJobQueue(smallTaskExecutor);
 			}
 			result.enqueue();
 		}
 
-		return getStatusFor(result.id);
+		return getStatusFor(result.getId());
 	}
 
 	ExecutorService countExecutor = Executors.newSingleThreadExecutor();
@@ -110,7 +113,6 @@ private AsyncResult initializeResult(Query query) throws ClassNotFoundException,
 		switch(query.getExpectedResultType()) {
 		case DATAFRAME :
 		case SECRET_ADMIN_DATAFRAME:
-		case DATAFRAME_MERGED :
 			p = queryProcessor;
 			break;
 		case DATAFRAME_TIMESERIES :
@@ -121,17 +123,20 @@ private AsyncResult initializeResult(Query query) throws ClassNotFoundException,
 		case CONTINUOUS_CROSS_COUNT :
 			p = countProcessor;
 			break;
+		case DATAFRAME_PFB:
+			p = pfbProcessor;
+			break;
 		default :
 			throw new RuntimeException("UNSUPPORTED RESULT TYPE");
 		}
 
-		AsyncResult result = new AsyncResult(query, p.getHeaderRow(query));
-		result.status = AsyncResult.Status.PENDING;
-		result.queuedTime = System.currentTimeMillis();
-		result.id = UUIDv5.UUIDFromString(query.toString()).toString();
-		result.processor = p;
-		query.setId(result.id);
-		results.put(result.id, result);
+		AsyncResult result = new AsyncResult(query, p.getHeaderRow(query))
+				.setStatus(AsyncResult.Status.PENDING)
+				.setQueuedTime(System.currentTimeMillis())
+				.setId(UUIDv5.UUIDFromString(query.toString()).toString())
+				.setProcessor(p);
+		query.setId(result.getId());
+		results.put(result.getId(), result);
 		return result;
 	}
 
@@ -209,21 +214,21 @@ private List<String> includingOnlyDictionaryFields(Set<String> fields, Set<String> dictionaryFields) {
-		AsyncResult[] queue = asyncResult.query.getFields().size() > SMALL_JOB_LIMIT ?
+		AsyncResult[] queue = asyncResult.getQuery().getFields().size() > SMALL_JOB_LIMIT ?
 				largeTaskExecutionQueue.toArray(new AsyncResult[largeTaskExecutionQueue.size()]) :
 				smallTaskExecutionQueue.toArray(new AsyncResult[smallTaskExecutionQueue.size()]);
-		if(asyncResult.status == Status.PENDING) {
+		if(asyncResult.getStatus() == Status.PENDING) {
 			ArrayList queueSnapshot = new ArrayList();
 			for(int x = 0;x