From 75e2448323bae52e6525290b0fb97da2c16f521f Mon Sep 17 00:00:00 2001
From: Ryan Amari
Date: Tue, 6 Aug 2024 08:18:09 -0400
Subject: [PATCH] ALS-6511: Update pfb result to handle multiple values per
 variable

---
 .../avillach/hpds/processing/AsyncResult.java |  3 ++
 .../hpds/processing/PfbProcessor.java         | 28 ++++++++---------
 .../hpds/processing/ResultStoreStream.java    |  9 +++++-
 .../hpds/processing/io/CsvWriter.java         |  5 +++
 .../hpds/processing/io/PfbWriter.java         | 31 +++++++++++++++++--
 .../hpds/processing/io/ResultWriter.java      |  2 ++
 6 files changed, 61 insertions(+), 17 deletions(-)

diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/AsyncResult.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/AsyncResult.java
index bbce0020..9f0e414e 100644
--- a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/AsyncResult.java
+++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/AsyncResult.java
@@ -189,6 +189,9 @@ public AsyncResult(Query query, HpdsProcessor processor, ResultWriter writer) {
     public void appendResults(List<String[]> dataEntries) {
         stream.appendResults(dataEntries);
     }
+    public void appendMultiValueResults(List<List<List<String>>> dataEntries) {
+        stream.appendMultiValueResults(dataEntries);
+    }
 
     public void appendResultStore(ResultStore resultStore) {
         stream.appendResultStore(resultStore);
diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/PfbProcessor.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/PfbProcessor.java
index 58df38df..6a2b3118 100644
--- a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/PfbProcessor.java
+++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/PfbProcessor.java
@@ -13,6 +13,7 @@
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 @Component
 public class PfbProcessor implements HpdsProcessor {
@@ -44,23 +45,24 @@ public void runQuery(Query query, AsyncResult result) {
         log.info("Processing " + idList.size() + " rows for result " + result.getId());
         Lists.partition(new ArrayList<>(idList), ID_BATCH_SIZE).stream()
                 .forEach(patientIds -> {
-                    Map<String, Map<Integer, String>> pathToPatientToValueMap = buildResult(result, query, new TreeSet<>(patientIds));
-                    List<String[]> fieldValuesPerPatient = patientIds.stream().map(patientId -> {
-                        return Arrays.stream(getHeaderRow(query)).map(field -> {
+                    Map<String, Map<Integer, List<String>>> pathToPatientToValueMap = buildResult(result, query, new TreeSet<>(patientIds));
+                    List<List<List<String>>> fieldValuesPerPatient = patientIds.stream().map(patientId -> {
+                        List<List<String>> objectStream = Arrays.stream(getHeaderRow(query)).map(field -> {
                             if (PATIENT_ID_FIELD_NAME.equals(field)) {
-                                return patientId.toString();
+                                return List.of(patientId.toString());
                             } else {
                                 return pathToPatientToValueMap.get(field).get(patientId);
                             }
-                        }).toArray(String[]::new);
+                        }).collect(Collectors.toList());
+                        return objectStream;
                     }).collect(Collectors.toList());
-                    result.appendResults(fieldValuesPerPatient);
+                    result.appendMultiValueResults(fieldValuesPerPatient);
                 });
         result.closeWriter();
     }
 
-    private Map<String, Map<Integer, String>> buildResult(AsyncResult result, Query query, TreeSet<Integer> ids) {
-        ConcurrentHashMap<String, Map<Integer, String>> pathToPatientToValueMap = new ConcurrentHashMap<>();
+    private Map<String, Map<Integer, List<String>>> buildResult(AsyncResult result, Query query, TreeSet<Integer> ids) {
+        ConcurrentHashMap<String, Map<Integer, List<String>>> pathToPatientToValueMap = new ConcurrentHashMap<>();
         List<ColumnMeta> columns = query.getFields().stream()
                 .map(abstractProcessor.getDictionary()::get)
                 .filter(Objects::nonNull)
@@ -76,16 +78,16 @@ private Map<String, Map<Integer, String>> buildResult(AsyncResult result, Query
         // todo: investigate if the parallel stream will thrash the cache if the number of executors is > number of resident cubes
         columnIndex.parallelStream().forEach((columnId)->{
             String columnPath = paths.get(columnId-1);
-            Map<Integer, String> patientIdToValueMap = processColumn(ids, columnPath);
+            Map<Integer, List<String>> patientIdToValueMap = processColumn(ids, columnPath);
             pathToPatientToValueMap.put(columnPath, patientIdToValueMap);
         });
 
         return pathToPatientToValueMap;
     }
 
-    private Map<Integer, String> processColumn(TreeSet<Integer> patientIds, String path) {
+    private Map<Integer, List<String>> processColumn(TreeSet<Integer> patientIds, String path) {
 
-        Map<Integer, String> patientIdToValueMap = new HashMap<>();
+        Map<Integer, List<String>> patientIdToValueMap = new HashMap<>();
         PhenoCube<?> cube = abstractProcessor.getCube(path);
 
         KeyAndValue<?>[] cubeValues = cube.sortedByKey();
@@ -98,9 +100,7 @@ private Map<Integer, String> processColumn(TreeSet<Integer> patientIds, String p
                 idPointer++;
             } else if(key == patientId){
                 String value = getResultField(cube, cubeValues, idPointer);
-                patientIdToValueMap.put(patientId, value);
-                idPointer++;
-                break;
+                patientIdToValueMap.computeIfAbsent(patientId, k -> new ArrayList<>()).add(value);
             } else {
                 break;
             }
diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/ResultStoreStream.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/ResultStoreStream.java
index 3fe6cbb2..38daf2f3 100644
--- a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/ResultStoreStream.java
+++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/ResultStoreStream.java
@@ -36,7 +36,7 @@ public void appendResultStore(ResultStore results) {
         }
         writeResultsToTempFile(results, batchSize, entries);
     }
- 
+
     /**
     * A more compact method to append data to the temp file without making assumptions about the composition.
     * @param entries
@@ -44,6 +44,13 @@ public void appendResultStore(ResultStore results) {
     */
     public void appendResults(List<String[]> entries) {
         writer.writeEntity(entries);
     }
+    /**
+    * A more compact method to append data to the temp file without making assumptions about the composition.
+    * @param entries
+    */
+    public void appendMultiValueResults(List<List<List<String>>> entries) {
+        writer.writeMultiValueEntity(entries);
+    }
 
     private List<String[]> writeResultsToTempFile(ResultStore results, int batchSize, List<String[]> entries) {
diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/CsvWriter.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/CsvWriter.java
index 5d09b93d..a615094e 100644
--- a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/CsvWriter.java
+++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/CsvWriter.java
@@ -45,6 +45,11 @@ public void writeEntity(Collection<String[]> data) {
         }
     }
 
+    @Override
+    public void writeMultiValueEntity(Collection<List<List<String>>> data) {
+        throw new RuntimeException("Method not implemented");
+    }
+
     @Override
     public File getFile() {
         return file;
diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/PfbWriter.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/PfbWriter.java
index 673df4d5..883f8667 100644
--- a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/PfbWriter.java
+++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/PfbWriter.java
@@ -58,7 +58,7 @@ public void writeHeader(String[] data) {
         SchemaBuilder.FieldAssembler<Schema> patientRecords = SchemaBuilder.record("patientData")
                 .fields();
 
-        fields.forEach(field -> patientRecords.nullableString(field, "null"));
+        fields.forEach(field -> patientRecords.name(field).type(SchemaBuilder.array().items(SchemaBuilder.nullable().stringType())).noDefault());
         patientDataSchema = patientRecords.endRecord();
 
         Schema objectSchema = Schema.createUnion(metadataSchema, patientDataSchema);
@@ -124,7 +124,8 @@ public void writeEntity(Collection<String[]> entities) {
             }
             GenericRecord patientData = new GenericData.Record(patientDataSchema);
             for(int i = 0; i < fields.size(); i++) {
-                patientData.put(fields.get(i), entity[i]);
+                List<String> fieldValue = entity[i] != null ? List.of(entity[i]) : List.of();
+                patientData.put(fields.get(i), fieldValue);
             }
 
             GenericRecord entityRecord = new GenericData.Record(entitySchema);
@@ -140,6 +141,32 @@ public void writeEntity(Collection<String[]> entities) {
         });
     }
 
+    @Override
+    public void writeMultiValueEntity(Collection<List<List<String>>> entities) {
+        entities.forEach(entity -> {
+            if (entity.size() != fields.size()) {
+                throw new IllegalArgumentException("Entity length much match the number of fields in this document");
+            }
+            GenericRecord patientData = new GenericData.Record(patientDataSchema);
+            for(int i = 0; i < fields.size(); i++) {
+                List<String> fieldValue = entity.get(i) != null ? entity.get(i) : List.of();
+                patientData.put(fields.get(i), fieldValue);
+            }
+
+
+            GenericRecord entityRecord = new GenericData.Record(entitySchema);
+            entityRecord.put("object", patientData);
+            entityRecord.put("name", "patientData");
+            entityRecord.put("id", "192035");
+
+            try {
+                dataFileWriter.append(entityRecord);
+            } catch (IOException e) {
+                throw new UncheckedIOException(e);
+            }
+        });
+    }
+
     @Override
     public void close() {
         try {
diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/ResultWriter.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/ResultWriter.java
index 75cbd0ee..6f2f5d64 100644
--- a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/ResultWriter.java
+++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/ResultWriter.java
@@ -3,11 +3,13 @@
 import java.io.File;
 import java.io.IOException;
 import java.util.Collection;
+import java.util.List;
 
 public interface ResultWriter {
     void writeHeader(String[] data);
 
     void writeEntity(Collection<String[]> data);
+    void writeMultiValueEntity(Collection<List<List<String>>> data);
 
     File getFile();
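
Reviewer note: the crux of this patch is the schema change in PfbWriter.writeHeader. Each
phenotype field moves from a nullable string to an Avro array of nullable strings, so one
patient can carry several values for the same variable; single values ride along as
one-element lists (writeEntity now wraps entity[i] in List.of). The sketch below is not part
of the patch. It is a minimal standalone illustration of the resulting Avro shape and of the
List<List<List<String>>> batch structure handed to appendMultiValueResults; it assumes only
org.apache.avro on the classpath, and the field names and sample values are invented.

    import java.util.List;

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;

    public class MultiValueShapeSketch {
        public static void main(String[] args) {
            // Mirrors the new writeHeader logic: every field is an array of nullable strings.
            Schema patientDataSchema = SchemaBuilder.record("patientData").fields()
                    .name("patient_id")
                    .type(SchemaBuilder.array().items(SchemaBuilder.nullable().stringType())).noDefault()
                    .name("blood_pressure")
                    .type(SchemaBuilder.array().items(SchemaBuilder.nullable().stringType())).noDefault()
                    .endRecord();

            // One patient record: patient_id stays single-valued (a one-element list),
            // while blood_pressure carries two observations of the same variable.
            GenericRecord patientData = new GenericData.Record(patientDataSchema);
            patientData.put("patient_id", List.of("12345"));
            patientData.put("blood_pressure", List.of("120/80", "118/76"));
            System.out.println(patientData);

            // Shape handed to AsyncResult.appendMultiValueResults: outer list = patients in
            // the batch, middle list = fields in header order, inner list = values per field.
            List<List<List<String>>> batch = List.of(
                    List.of(List.of("12345"), List.of("120/80", "118/76")));
            System.out.println(batch);
        }
    }

Because both writer paths emit the same array-typed schema, CSV-style single-value results
and the new multi-value PFB results stay compatible with one reader; only CsvWriter opts out
by throwing on writeMultiValueEntity.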