Skip to content
This repository has been archived by the owner on Jan 14, 2025. It is now read-only.

Commit

Permalink
feat: added support for custom mappers (#13)
Browse files Browse the repository at this point in the history
Includes uklfr examples.
  • Loading branch information
uklft authored Jul 5, 2021
1 parent b8516e5 commit 01771fa
Show file tree
Hide file tree
Showing 9 changed files with 581 additions and 4 deletions.
176 changes: 176 additions & 0 deletions ahd2fhir/mappers/ahd_to_observation_kidney_stone.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
import datetime
import uuid
from typing import List

from fhir.resources.codeableconcept import CodeableConcept
from fhir.resources.coding import Coding
from fhir.resources.documentreference import DocumentReference
from fhir.resources.fhirtypes import DateTime, QuantityType, ReferenceType
from fhir.resources.meta import Meta
from fhir.resources.observation import Observation
from structlog import get_logger

log = get_logger()

CLINICAL_STATUS_MAPPING = {"ACTIVE": "active", "RESOLVED": "resolved"}
SIDE_MAPPING = {
"LEFT": ("7771000", "Left"),
"RIGHT": ("24028007", "Right"),
"BOTH": ("51440002", "Right and left"),
}

UNIT_MAPPING = {"cm": "258672001", "mm": "258673006"}

OBSERVATION_PROFILE = (
"https://www.medizininformatik-initiative.de/"
+ "fhir/core/StructureDefinition/Observation"
)
UKLFR_TYPE_KIDNEY_STONE = "de.uklfr.KidneyStoneAnnotator.KidneyStoneInfo"
AHD_TYPE = UKLFR_TYPE_KIDNEY_STONE

OBSERVATION_CATEGORY_SYSTEM = (
"http://terminology.hl7.org/CodeSystem/observation-category"
)

STONE_DIMENSION_MAP = {
"width": {
"code": "9805-3",
"name": "Width of Stone",
"display": "Width (Stone) [Length]",
},
"length": {
"code": "9799-8",
"name": "Length of Stone",
"display": "Length (Stone) [Length]",
},
}


def get_fhir_resources(
ahd_response_entry, document_reference: DocumentReference
) -> List[Observation]:
return get_kidney_stone_from_annotation(
annotation=ahd_response_entry,
date=document_reference.date,
doc_ref=document_reference,
)


def fhirdate_now() -> DateTime:
return DateTime.validate(datetime.datetime.now(datetime.timezone.utc))


def get_kidney_stone_from_annotation(
annotation, date, doc_ref: DocumentReference
) -> List[Observation]:
# Observation details
observation = Observation.construct()
observation.status = "final"
observation.meta = Meta.construct()
observation.meta.profile = [OBSERVATION_PROFILE]
observation.subject = doc_ref.subject
observation.effectiveDateTime = date or fhirdate_now()
observation.id = str(uuid.uuid4())

# Coding + Code
observation_coding = Coding.construct()
observation_coding.system = "http://snomed.info/sct"
observation_coding.code = "95570007"
observation_coding.display = "Kidney stone (disorder)"
observation_coding.userSelected = False
observation_code = CodeableConcept.construct()
observation_code.coding = [observation_coding]
observation_code.text = "Kidney stone"
observation.code = observation_code

# Category
observation_category = Coding.construct()
observation_category.system = OBSERVATION_CATEGORY_SYSTEM
observation_category.code = "imaging"
observation_category.display = "Imaging"
category = CodeableConcept.construct()
category.coding = [observation_category]
observation.category = [category]

# Method
observation_method = Coding.construct()
observation_method.system = "http://snomed.info/sct"
observation_method.code = "363680008"
observation_method.display = " Radiographic imaging procedure"
method = CodeableConcept.construct()
method.coding = [observation_method]
observation.method = method

# valueCodeableConcept
value_codeable_concept = CodeableConcept()
value_coding = Coding.construct()
value_coding.system = "http://snomed.info/sct"
value_coding.code = "56381008"
value_codeable_concept.coding = [value_coding]
value_codeable_concept.text = "Calculus (morphologic abnormality)"
observation.valueCodeableConcept = value_codeable_concept

observations = [observation]

# Create Observation for each Dimension (X/Y)
if (stone_size := annotation["size"]) is not None:
observation.hasMember = []
stone_unit = stone_size["unit"]["coveredText"]
stone_len = stone_size["value1"]
stone_len_obs = stone_dimension_observation(
observation, "length", stone_len, unit=stone_unit
)
observation.hasMember.append(stone_len_obs[1])
observations.append(stone_len_obs[0])

stone_width = stone_size["value2"]
if stone_width in [0, "0"]:
stone_width = stone_len
stone_width_obs = stone_dimension_observation(
observation, "width", stone_width, unit=stone_unit
)
observation.hasMember.append(stone_width_obs[1])
observations.append(stone_width_obs[0])

return observations


def stone_dimension_observation(
parent: Observation, dimension: str, value: float, unit: str
):
dimension_type = STONE_DIMENSION_MAP[dimension]
# Observation details
observation = Observation.construct()
observation.status = "final"
observation.meta = Meta.construct()
observation.meta.profile = [OBSERVATION_PROFILE]
observation.subject = parent.subject
observation.effectiveDateTime = parent.effectiveDateTime
observation.id = str(uuid.uuid4())

# Coding
observation_coding = Coding.construct()
observation_coding.system = "http://loinc.org"
observation_coding.code = dimension_type["code"]
observation_coding.display = dimension_type["display"]
observation_coding.userSelected = False
# Code
observation_code = CodeableConcept.construct()
observation_code.coding = [observation_coding]
observation_code.text = dimension_type["name"]
observation.code = observation_code

# valueCodeableConcept
value_quantity = QuantityType()
value_quantity["system"] = "http://unitsofmeasure.org"
value_quantity["code"] = unit
value_quantity["value"] = value
value_quantity["unit"] = {"mm": "Millimeter", "cm": "Centimeter"}[unit]
observation.valueQuantity = value_quantity

# Create Reference to Observation
observation_reference = ReferenceType()
observation_reference["reference"] = f"Observation/{observation.id}"
observation_reference["display"] = observation.code.coding[0].display

return observation, observation_reference
120 changes: 120 additions & 0 deletions ahd2fhir/mappers/ahd_to_observation_smkstat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
import datetime
import uuid
from typing import List

from fhir.resources.codeableconcept import CodeableConcept
from fhir.resources.coding import Coding
from fhir.resources.documentreference import DocumentReference
from fhir.resources.fhirtypes import DateTime
from fhir.resources.meta import Meta
from fhir.resources.observation import Observation
from structlog import get_logger

log = get_logger()

OBSERVATION_PROFILE = (
"https://www.medizininformatik-initiative.de/"
+ "fhir/core/StructureDefinition/Observation"
)
UKLFR_TYPE_SMKSTAT = "de.medunifreiburg.imbi.mds.extraction.types.Smoking"
AHD_TYPE = UKLFR_TYPE_SMKSTAT

OBSERVATION_CATEGORY_SYSTEM = (
"http://terminology.hl7.org/CodeSystem/observation-category"
)

SNOMED_LOINC_MAPPING = {
"PAST-SMOKER": {"code": "LA15920-4", "text": "Former smoker"},
"CURRENT-SMOKER": {"code": "LA18976-3", "text": "Current every day smoker"},
"CURRENT-NON-SMOKER": {"code": "LA15920-4", "text": "Former smoker"},
"NEVER-SMOKER": {"code": "LA18978-9", "text": "Never smoker"},
"CURRENT-OR-PAST-SMOKER": {
"code": "LA18979-7",
"text": "Smoker, current status unknown",
},
"UNKNOW": {"code": "LA18980-5", "text": "Unknown if ever smoked"},
}


def get_fhir_resources(
ahd_response_entry, document_reference: DocumentReference
) -> List[Observation]:
return get_smoking_status_observation_from_annotation(
annotation=ahd_response_entry,
date=document_reference.date,
doc_ref=document_reference,
)


def fhirdate_now():
return DateTime.validate(datetime.datetime.now(datetime.timezone.utc))


def get_smoking_status_observation_from_annotation(
annotation, date, doc_ref: DocumentReference
):
# Observation details
observation = Observation.construct()
observation.status = "final"
observation.meta = Meta.construct()
observation.meta.profile = [OBSERVATION_PROFILE]
observation.subject = doc_ref.subject
observation.effectiveDateTime = date or fhirdate_now()
observation.id = str(uuid.uuid4())

# Coding
observation_coding = Coding.construct()
observation_coding.system = "http://loinc.org"
observation_coding.code = "72166-2"
observation_coding.display = "Tobacco smoking status"
observation_coding.userSelected = False

# Code
observation_code = CodeableConcept.construct()
observation_code.coding = [observation_coding]
observation_code.text = "Tobacco smoking status"
observation.code = observation_code

# Category
observation_category = Coding.construct()
observation_category.system = OBSERVATION_CATEGORY_SYSTEM
observation_category.code = "social-history"
observation_category.display = "Social History"
category = CodeableConcept.construct()
category.coding = [observation_category]
observation.category = [category]

smkstat = SNOMED_LOINC_MAPPING[annotation["smokingStatus"]]
# valueCodeableConcept
value_codeable_concept = CodeableConcept()
value_coding = Coding.construct()
value_coding.system = "http://loinc.org"
value_coding.code = smkstat["code"]

value_coding_snomed = Coding.construct()
value_coding_snomed.system = "http://snomed.info/sct"
value_coding_snomed.code = annotation["sctid"]

value_codeable_concept.coding = [value_coding, value_coding_snomed]
value_codeable_concept.text = smkstat["text"]
observation.valueCodeableConcept = value_codeable_concept

# SNOMED concept
# value_codeable_concept = CodeableConcept()
# value_coding_snomed = Coding.construct()
# value_coding_snomed.system = "http://snomed.info/sct"
# value_coding_snomed.code = annotation["sctid"]
# value_codeable_concept.coding = [value_coding]
# value_codeable_concept.text = annotation["smokingStatus"]
# observation.valueCodeableConcept = value_codeable_concept

# if pack_years := annotation["packYears"] != "null":
# valueCodeableConcept
# value_quantity = QuantityType()
# value_quantity["value"] = pack_years
# value_quantity.system = "http://snomed.info/sct"
# value_quantity.code = "401201003"
# value_quantity.text = "401201003"
# observation.valueQuantity = value_quantity

return observation
17 changes: 17 additions & 0 deletions ahd2fhir/utils/custom_mappers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from fhir.resources.documentreference import DocumentReference

from ahd2fhir.mappers import ahd_to_observation_kidney_stone as ks
from ahd2fhir.mappers import ahd_to_observation_smkstat as smk

mapper_functions = {
smk.AHD_TYPE: [smk.get_fhir_resources],
ks.AHD_TYPE: [ks.get_fhir_resources],
}


def custom_mappers(val: dict, document_reference: DocumentReference) -> list:
results = []
if (mappers := mapper_functions.get(val["type"], None)) is not None:
for mapper in mappers:
results.extend(mapper(val, document_reference))
return results
22 changes: 20 additions & 2 deletions ahd2fhir/utils/resource_handler.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import base64
import datetime
import logging
import os
import time
from typing import List, Tuple

Expand All @@ -21,6 +22,7 @@

from ahd2fhir.mappers import ahd_to_condition, ahd_to_medication_statement
from ahd2fhir.utils.bundle_builder import BundleBuilder
from ahd2fhir.utils.custom_mappers import custom_mappers, mapper_functions
from ahd2fhir.utils.device_builder import build_device
from ahd2fhir.utils.fhir_utils import sha256_of_identifier

Expand Down Expand Up @@ -238,6 +240,10 @@ def _process_documentreference(self, document_reference: DocumentReference):
if statement is not None:
medication_statement_lists.append(statement)

# if custom_mappers_enabled
if os.getenv("CUSTOM_MAPPERS_ENABLED", "False").lower() in ["true", "1"]:
total_results.extend(custom_mappers(val, document_reference))

medication_results = []
medication_statement_results = []
for medication_statement_list in medication_statement_lists:
Expand Down Expand Up @@ -299,10 +305,22 @@ def _extract_text_from_resource(
def _perform_text_analysis(
self, text: str, mime_type: str = "text/plain", lang: str = None
):
types = ",".join(
[
AHD_TYPE_DIAGNOSIS,
AHD_TYPE_MEDICATION,
AHD_TYPE_DOCUMENT_ANNOTATION,
*mapper_functions.keys(),
]
)
if mime_type == "text/html":
return self.pipeline.analyse_html(text, language=lang)
return self.pipeline.analyse_html(
text, language=lang, annotation_types=types
)
else:
return self.pipeline.analyse_text(text, language=lang)
return self.pipeline.analyse_text(
text, language=lang, annotation_types=types
)

def _build_composition_identifier_from_documentreference(
self,
Expand Down
Loading

0 comments on commit 01771fa

Please sign in to comment.