Skip to content

Commit

Permalink
fix: use the Patient.id as the output message key for patient resources
Browse files Browse the repository at this point in the history
  • Loading branch information
chgl committed Apr 26, 2024
1 parent bc766e0 commit 2cd21b5
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 1 deletion.
2 changes: 2 additions & 0 deletions deploy/compose.dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ services:
KAFKA_CFG_LOG_CLEANUP_POLICY: compact
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CFG_INTER_BROKER_LISTENER_NAME: PLAINTEXT
ports:
- 127.0.0.1:9094:9094

kafka-connect:
#image: ghcr.io/miracum/kafka-connect-images/cricketeerone-kafka-connect:v1.1.0@sha256:172f25d8b8beed81bf2839a7164eeff1b195238d6b5aa35307eff83296d6d42f
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"dependencies": {
"hl7.fhir.r4.core": "4.0.1",
"de.dktk.oncology": "1.3.0",
"de.medizininformatikinitiative.kerndatensatz.person": "2024.0.0-ballot"
"de.medizininformatikinitiative.kerndatensatz.person": "2024.0.0"
},
"fhirVersions": [
"4.0.1"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package org.miracum.streams.ume.obdstofhir.processor;

import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.util.BundleUtil;
import java.util.*;
import java.util.function.BiFunction;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.*;
import org.hl7.fhir.r4.model.Bundle;
import org.hl7.fhir.r4.model.Patient;
import org.miracum.streams.ume.obdstofhir.FhirProperties;
import org.miracum.streams.ume.obdstofhir.mapper.*;
import org.miracum.streams.ume.obdstofhir.model.Meldeanlass;
Expand All @@ -24,6 +27,7 @@
public class ObdsProcessor extends ObdsToFhirMapper {

private static final Logger LOG = LoggerFactory.getLogger(ObdsProcessor.class);
private static final FhirContext ctx = FhirContext.forR4();

@Value("${spring.profiles.active}")
private String profile;
Expand Down Expand Up @@ -113,6 +117,7 @@ protected ObdsProcessor(
.get("out-Aggregate")
.mapValues(this.getOnkoToPatientBundleMapper())
.filter((key, value) -> value != null)
.selectKey((key, value) -> patientBundleKeySelector(value))
};
} else {
return new KStream[] {
Expand Down Expand Up @@ -206,4 +211,14 @@ public ValueMapper<Pair<MeldungExportList, Bundle>, Bundle> getOnkoToConditionBu
meldungExportList, meldungPair.getRight());
};
}

private static String patientBundleKeySelector(Bundle bundle) {
var patients = BundleUtil.toListOfResourcesOfType(ctx, bundle, Patient.class);

if (patients.isEmpty() || patients.size() > 1) {
throw new RuntimeException("A patient bundle contains more or less than 1 resource");
}
var patient = patients.get(0);
return String.format("%s/%s", patient.getResourceType(), patient.getId());
}
}

0 comments on commit 2cd21b5

Please sign in to comment.