From 842fc235f71e2484a96697d5f3fa569118d89444 Mon Sep 17 00:00:00 2001 From: Ryan Amari Date: Wed, 20 Nov 2024 08:08:45 -0500 Subject: [PATCH] ALS-7810: Finalize data dictionary pfb output --- .../hpds/processing/dictionary/Concept.java | 2 +- .../hpds/processing/io/PfbWriter.java | 54 +++++++++++++------ .../hpds/processing/io/PfbWriterTest.java | 2 +- 3 files changed, 40 insertions(+), 18 deletions(-) diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/dictionary/Concept.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/dictionary/Concept.java index 59fe903d..6fe4f706 100644 --- a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/dictionary/Concept.java +++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/dictionary/Concept.java @@ -2,5 +2,5 @@ import java.util.Map; -public record Concept(String conceptPath, String name, Map meta) { +public record Concept(String type, String conceptPath, String name, String display, String dataset, String description, Map meta) { } diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/PfbWriter.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/PfbWriter.java index d29b91f7..4e1c4df9 100644 --- a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/PfbWriter.java +++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/PfbWriter.java @@ -1,5 +1,7 @@ package edu.harvard.hms.dbmi.avillach.hpds.processing.io; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import edu.harvard.hms.dbmi.avillach.hpds.processing.dictionary.Concept; import edu.harvard.hms.dbmi.avillach.hpds.processing.dictionary.DictionaryService; import org.apache.avro.Schema; @@ -24,7 +26,7 @@ public class PfbWriter implements ResultWriter { public static final String PATIENT_TABLE_PREFIX = "pic-sure-"; - public static final String DRS_URL_TABLE_PREFIX = "drs-url-"; + public static final String DATA_DICTIONARY_TABLE_PREFIX = "data-dictionary-"; private Logger log = LoggerFactory.getLogger(PfbWriter.class); private final DictionaryService dictionaryService; @@ -35,7 +37,7 @@ public class PfbWriter implements ResultWriter { private final String queryId; private final String patientTableName; - private final String drsUrlTableName; + private final String dataDictionaryTableName; private SchemaBuilder.FieldAssembler entityFieldAssembler; private List originalFields; @@ -44,7 +46,7 @@ public class PfbWriter implements ResultWriter { private File file; private Schema entitySchema; private Schema patientDataSchema; - private Schema drsUriSchema; + private Schema dataDictionarySchema; private Schema relationSchema; private static final Set SINGULAR_FIELDS = Set.of("patient_id"); @@ -54,7 +56,7 @@ public PfbWriter(File tempFile, String queryId, DictionaryService dictionaryServ this.queryId = queryId; this.dictionaryService = dictionaryService; this.patientTableName = formatFieldName(PATIENT_TABLE_PREFIX + queryId); - this.drsUrlTableName = formatFieldName(DRS_URL_TABLE_PREFIX + queryId); + this.dataDictionaryTableName = formatFieldName(DATA_DICTIONARY_TABLE_PREFIX + queryId); entityFieldAssembler = SchemaBuilder.record("entity") .namespace("edu.harvard.dbmi") .fields(); @@ -85,10 +87,14 @@ public void writeHeader(String[] data) { originalFields = List.of(data); formattedFields = originalFields.stream().map(this::formatFieldName).collect(Collectors.toList()); - drsUriSchema = SchemaBuilder.record(drsUrlTableName) + dataDictionarySchema = SchemaBuilder.record(dataDictionaryTableName) .fields() .requiredString("concept_path") .name("drs_uri").type(SchemaBuilder.array().items(SchemaBuilder.nullable().stringType())).noDefault() + .nullableString("type", "null") + .nullableString("display", "null") + .nullableString("dataset", "null") + .nullableString("description", "null") .endRecord(); SchemaBuilder.FieldAssembler patientRecords = SchemaBuilder.record(patientTableName) @@ -103,7 +109,7 @@ public void writeHeader(String[] data) { }); patientDataSchema = patientRecords.endRecord(); - Schema objectSchema = Schema.createUnion(metadataSchema, patientDataSchema, drsUriSchema); + Schema objectSchema = Schema.createUnion(metadataSchema, patientDataSchema, dataDictionarySchema); entityFieldAssembler = entityFieldAssembler.name("object").type(objectSchema).noDefault(); entityFieldAssembler.nullableString("id", "null"); @@ -122,36 +128,52 @@ public void writeHeader(String[] data) { } writeMetadata(); - writeDrsUris(); + writeDataDictionary(); } - private void writeDrsUris() { + private void writeDataDictionary() { GenericRecord entityRecord = new GenericData.Record(entitySchema);; Map conceptMap = Map.of(); try { conceptMap = dictionaryService.getConcepts(originalFields).stream() .collect(Collectors.toMap(Concept::conceptPath, Function.identity())); } catch (RuntimeException e) { - log.error("Error fetching DRS URIs from dictionary service", e); + log.error("Error fetching concepts from dictionary service", e); + return; } for (int i = 0; i < formattedFields.size(); i++) { - GenericRecord drsUriData = new GenericData.Record(drsUriSchema); - drsUriData.put("concept_path", formattedFields.get(i)); + String formattedField = formattedFields.get(i); + if ("patient_id".equals(formattedField)) { + continue; + } + GenericRecord dataDictionaryData = new GenericData.Record(dataDictionarySchema); + dataDictionaryData.put("concept_path", formattedField); Concept concept = conceptMap.get(originalFields.get(i)); List drsUris = List.of(); if (concept != null) { Map meta = concept.meta(); if (meta != null) { - drsUris = new ArrayList<>(meta.keySet().stream().toList()); - drsUris.addAll(meta.values().stream().toList()); + String drsUriJson = meta.get("drs_uri"); + if (drsUriJson != null) { + try { + String[] drsUriArray = new ObjectMapper().readValue(drsUriJson, String[].class); + drsUris = List.of(drsUriArray); + } catch (JsonProcessingException e) { + log.error("Error parsing drs_uri as json: " + drsUriJson); + } + } } + dataDictionaryData.put("type", concept.type()); + dataDictionaryData.put("display", concept.display()); + dataDictionaryData.put("dataset", concept.dataset()); + dataDictionaryData.put("description", concept.description()); } - drsUriData.put("drs_uri", drsUris); + dataDictionaryData.put("drs_uri", drsUris); - entityRecord.put("object", drsUriData); - entityRecord.put("name", drsUrlTableName); + entityRecord.put("object", dataDictionaryData); + entityRecord.put("name", dataDictionaryTableName); entityRecord.put("id", "null"); entityRecord.put("relations", List.of()); diff --git a/processing/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/PfbWriterTest.java b/processing/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/PfbWriterTest.java index aaf37986..1167a090 100644 --- a/processing/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/PfbWriterTest.java +++ b/processing/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/PfbWriterTest.java @@ -28,7 +28,7 @@ public void writeValidPFB() { PfbWriter pfbWriter = new PfbWriter(new File("target/test-result.avro"), UUID.randomUUID().toString(), dictionaryService); Mockito.when(dictionaryService.getConcepts(List.of("patient_id", "\\demographics\\age\\", "\\phs123\\stroke\\"))) - .thenReturn(List.of(new Concept("\\demographics\\age\\", "age", Map.of("drs_uri", "a-drs.uri")))); + .thenReturn(List.of(new Concept("Categorical", "\\demographics\\age\\", "age", "AGE", "demographics", "patient age", Map.of("drs_uri", "a-drs.uri")))); pfbWriter.writeHeader(new String[] {"patient_id", "\\demographics\\age\\", "\\phs123\\stroke\\"}); List> nullableList = new ArrayList<>();