From 2cd21b5de0f490c3eb6209d736d756bd974e7143 Mon Sep 17 00:00:00 2001 From: chgl Date: Fri, 26 Apr 2024 19:03:24 +0200 Subject: [PATCH] fix: use the Patient.id as the output message key for patient resources --- deploy/compose.dev.yaml | 2 ++ package.json | 2 +- .../ume/obdstofhir/processor/ObdsProcessor.java | 15 +++++++++++++++ 3 files changed, 18 insertions(+), 1 deletion(-) diff --git a/deploy/compose.dev.yaml b/deploy/compose.dev.yaml index 9195d275..52ef3cec 100644 --- a/deploy/compose.dev.yaml +++ b/deploy/compose.dev.yaml @@ -30,6 +30,8 @@ services: KAFKA_CFG_LOG_CLEANUP_POLICY: compact KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER KAFKA_CFG_INTER_BROKER_LISTENER_NAME: PLAINTEXT + ports: + - 127.0.0.1:9094:9094 kafka-connect: #image: ghcr.io/miracum/kafka-connect-images/cricketeerone-kafka-connect:v1.1.0@sha256:172f25d8b8beed81bf2839a7164eeff1b195238d6b5aa35307eff83296d6d42f diff --git a/package.json b/package.json index 6b88d8dd..cfdb3310 100644 --- a/package.json +++ b/package.json @@ -5,7 +5,7 @@ "dependencies": { "hl7.fhir.r4.core": "4.0.1", "de.dktk.oncology": "1.3.0", - "de.medizininformatikinitiative.kerndatensatz.person": "2024.0.0-ballot" + "de.medizininformatikinitiative.kerndatensatz.person": "2024.0.0" }, "fhirVersions": [ "4.0.1" diff --git a/src/main/java/org/miracum/streams/ume/obdstofhir/processor/ObdsProcessor.java b/src/main/java/org/miracum/streams/ume/obdstofhir/processor/ObdsProcessor.java index a4a96136..8643c0cf 100644 --- a/src/main/java/org/miracum/streams/ume/obdstofhir/processor/ObdsProcessor.java +++ b/src/main/java/org/miracum/streams/ume/obdstofhir/processor/ObdsProcessor.java @@ -1,5 +1,7 @@ package org.miracum.streams.ume.obdstofhir.processor; +import ca.uhn.fhir.context.FhirContext; +import ca.uhn.fhir.util.BundleUtil; import java.util.*; import java.util.function.BiFunction; import org.apache.commons.lang3.tuple.Pair; @@ -7,6 +9,7 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.*; import org.hl7.fhir.r4.model.Bundle; +import org.hl7.fhir.r4.model.Patient; import org.miracum.streams.ume.obdstofhir.FhirProperties; import org.miracum.streams.ume.obdstofhir.mapper.*; import org.miracum.streams.ume.obdstofhir.model.Meldeanlass; @@ -24,6 +27,7 @@ public class ObdsProcessor extends ObdsToFhirMapper { private static final Logger LOG = LoggerFactory.getLogger(ObdsProcessor.class); + private static final FhirContext ctx = FhirContext.forR4(); @Value("${spring.profiles.active}") private String profile; @@ -113,6 +117,7 @@ protected ObdsProcessor( .get("out-Aggregate") .mapValues(this.getOnkoToPatientBundleMapper()) .filter((key, value) -> value != null) + .selectKey((key, value) -> patientBundleKeySelector(value)) }; } else { return new KStream[] { @@ -206,4 +211,14 @@ public ValueMapper, Bundle> getOnkoToConditionBu meldungExportList, meldungPair.getRight()); }; } + + private static String patientBundleKeySelector(Bundle bundle) { + var patients = BundleUtil.toListOfResourcesOfType(ctx, bundle, Patient.class); + + if (patients.isEmpty() || patients.size() > 1) { + throw new RuntimeException("A patient bundle contains more or less than 1 resource"); + } + var patient = patients.get(0); + return String.format("%s/%s", patient.getResourceType(), patient.getId()); + } }