Skip to content

Commit

Permalink
fix: keep grouping by referenz+tumor_id for non-patient processors
Browse files Browse the repository at this point in the history
  • Loading branch information
chgl committed May 20, 2024
1 parent fbb7b56 commit 3963c7d
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 16 deletions.
18 changes: 11 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ This project contains a Kafka Stream processor that creates FHIR resources from

See [package.json](package.json) for a list of used packages and their versions.

## Observations
### Observations

- Histologie (<https://simplifier.net/oncology/histologie>)
- Grading (<https://simplifier.net/oncology/grading>)
Expand All @@ -15,31 +15,35 @@ See [package.json](package.json) for a list of used packages and their versions.
- Fernmetastasen (<https://simplifier.net/oncology/fernmetastasen-duplicate-2>)
- Tod Ursache (<https://simplifier.net/oncology/todursache>)

## Condition
### Condition

- Primärdiagnose (<https://simplifier.net/oncology/primaerdiagnose>)

## Procedure
### Procedure

- Operation (<https://simplifier.net/oncology/operation>)
- Strahlentherapie (<https://simplifier.net/oncology/strahlentherapie>)

## MedicationStatement
### MedicationStatement

- Systemtherapie (<https://simplifier.net/oncology/systemtherapie>)

## Patient
### Patient

- Patient (<https://simplifier.net/medizininformatikinitiative-modulperson>)

## Dev

### Topology

![Stream Topology generated via https://zz85.github.io/kafka-streams-viz/ using http://localhost:8080/actuator/kafkastreamstopology](docs/img/obds-to-fhir-topology.png)

### Dev Stack

- Kafka Broker: `$DOCKER_HOST_IP:9094`
- Kafka Connect: `$DOCKER_HOST_IP:8083`
- AKHQ: `$DOCKER_HOST_IP:8080`
- Oracle DB: `jdbc:oracle:thin:@//$DOCKER_HOST_IP:1521/FREEPDB1` (Use credentials from init script)
- AKHQ: `$DOCKER_HOST_IP:8084`
- Oracle DB: `jdbc:oracle:thin:@//$DOCKER_HOST_IP:1521/FREEPDB1` (User: `DWH_ROUTINE`, Password: `devPassword`)

### Run with

Expand Down
Binary file added docs/img/obds-to-fhir-topology.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ protected ObdsProcessor(

return (stringOnkoMeldungExpTable, stringOnkoObsBundles) -> {
// return (stringOnkoMeldungExpTable) ->
var output =
var filtered =
stringOnkoMeldungExpTable
// only process adt v2.x.x
.filter((key, value) -> value.getXml_daten().getSchema_Version().matches("^2\\..*"))
Expand All @@ -73,11 +73,14 @@ protected ObdsProcessor(
.getMenge_Meldung()
.getMeldung()
.getMenge_Tumorkonferenz()
== null) // ignore tumor conferences
// group by the patient number. This ensures that all meldungen of one patient
// are processed by the same downstream consumer.
== null); // ignore tumor conferences

var output =
filtered
.groupBy(
(key, meldung) -> KeyValue.pair(getPatIdFromMeldung(meldung), meldung),
(key, meldung) ->
KeyValue.pair(
getPatIdFromMeldung(meldung) + "-" + getTumorIdFromAdt(meldung), meldung),
Grouped.with(Serdes.String(), new MeldungExportSerde()))
.aggregate(
MeldungExportList::new,
Expand Down Expand Up @@ -114,12 +117,24 @@ protected ObdsProcessor(

// ...but only conditionally map Patient resources
if (Objects.equals(profile, "patient")) {
branches.add(
output
.get("out-Aggregate")
var patientStream =
filtered
// group by the patient number. This ensures that all meldungen of one patient
// are processed by the same downstream consumer.
.groupBy(
(key, meldung) -> KeyValue.pair(getPatIdFromMeldung(meldung), meldung),
Grouped.with(Serdes.String(), new MeldungExportSerde()))
.aggregate(
MeldungExportList::new,
(key, value, aggregate) -> aggregate.addElement(value),
(key, value, aggregate) -> aggregate.removeElement(value),
Materialized.with(Serdes.String(), new MeldungExportListSerde()))
.toStream()
.mapValues(this.getOnkoToPatientBundleMapper())
.filter((key, value) -> value != null)
.selectKey((key, value) -> patientBundleKeySelector(value)));
.selectKey((key, value) -> patientBundleKeySelector(value));

branches.add(patientStream);
}

return branches.toArray(new KStream[branches.size()]);
Expand Down

0 comments on commit 3963c7d

Please sign in to comment.