Skip to content

Commit

Permalink
fix: groupBy just the patient id
Browse files Browse the repository at this point in the history
Ensures that all meldungen by one patient are processed consistently by the same task in the same partition.
  • Loading branch information
chgl committed May 7, 2024
1 parent 7a61b11 commit 4e257bb
Showing 1 changed file with 37 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,10 @@ protected ObdsProcessor(
.getMeldung()
.getMenge_Tumorkonferenz()
== null) // ignore tumor conferences
// group by the patient number. This ensures that all meldungen of one patient
// are processed by the same downstream consumer.
.groupBy(
(key, value) ->
KeyValue.pair(
"Struct{REFERENZ_NUMMER="
+ getPatIdFromAdt(value)
+ ",TUMOR_ID="
+ getTumorIdFromAdt(value)
+ "}",
value),
(key, meldung) -> KeyValue.pair(getPatIdFromMeldung(meldung), meldung),
Grouped.with(Serdes.String(), new MeldungExportSerde()))
.aggregate(
MeldungExportList::new,
Expand All @@ -94,52 +89,40 @@ protected ObdsProcessor(
.branch((k, v) -> v != null, Branched.as("Aggregate"))
.noDefaultBranch();

var branches = new ArrayList<KStream<String, Bundle>>();

// always map Medication, Observation, Procedure, Condition...
branches.addAll(
List.of(
output
.get("out-Aggregate")
.mapValues(this.getOnkoToMedicationStatementBundleMapper())
.filter((key, value) -> value != null),
output
.get("out-Aggregate")
.mapValues(this.getOnkoToObservationBundleMapper())
.filter((key, value) -> value != null),
output
.get("out-Aggregate")
.mapValues(this.getOnkoToProcedureBundleMapper())
.filter((key, value) -> value != null),
output
.get("out-Aggregate")
.leftJoin(stringOnkoObsBundles, Pair::of)
.mapValues(this.getOnkoToConditionBundleMapper())
.filter((key, value) -> value != null)));

// ...but only conditionally map Patient resources
if (Objects.equals(profile, "patient")) {
return new KStream[] {
output
.get("out-Aggregate")
.mapValues(this.getOnkoToMedicationStatementBundleMapper())
.filter((key, value) -> value != null),
output
.get("out-Aggregate")
.mapValues(this.getOnkoToObservationBundleMapper())
.filter((key, value) -> value != null),
output
.get("out-Aggregate")
.mapValues(this.getOnkoToProcedureBundleMapper())
.filter((key, value) -> value != null),
output
.get("out-Aggregate")
.leftJoin(stringOnkoObsBundles, Pair::of)
.mapValues(this.getOnkoToConditionBundleMapper())
.filter((key, value) -> value != null),
output
.get("out-Aggregate")
.mapValues(this.getOnkoToPatientBundleMapper())
.filter((key, value) -> value != null)
.selectKey((key, value) -> patientBundleKeySelector(value))
};
} else {
return new KStream[] {
output
.get("out-Aggregate")
.mapValues(this.getOnkoToMedicationStatementBundleMapper())
.filter((key, value) -> value != null),
output
.get("out-Aggregate")
.mapValues(this.getOnkoToObservationBundleMapper())
.filter((key, value) -> value != null),
output
.get("out-Aggregate")
.mapValues(this.getOnkoToProcedureBundleMapper())
.filter((key, value) -> value != null),
output
.get("out-Aggregate")
.leftJoin(stringOnkoObsBundles, Pair::of)
.mapValues(this.getOnkoToConditionBundleMapper())
.filter((key, value) -> value != null)
};
branches.add(
output
.get("out-Aggregate")
.mapValues(this.getOnkoToPatientBundleMapper())
.filter((key, value) -> value != null)
.selectKey((key, value) -> patientBundleKeySelector(value)));
}

return branches.toArray(new KStream[branches.size()]);
};
}

Expand Down Expand Up @@ -216,7 +199,8 @@ private static String patientBundleKeySelector(Bundle bundle) {
var patients = BundleUtil.toListOfResourcesOfType(ctx, bundle, Patient.class);

if (patients.isEmpty() || patients.size() > 1) {
throw new RuntimeException("A patient bundle contains more or less than 1 resource");
throw new RuntimeException(
String.format("A patient bundle contains %d resources instead of 1", patients.size()));
}
var patient = patients.get(0);
return String.format("%s/%s", patient.getResourceType(), patient.getId());
Expand Down

0 comments on commit 4e257bb

Please sign in to comment.