Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add logging to linkage process and consolidate FHIR link routes #135

Merged
merged 19 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion src/recordlinker/hl7/fhir.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,17 @@
from recordlinker import schemas


def get_first_patient_resource(bundle: dict) -> dict:
"""
Get the first patient resource from a FHIR bundle
"""
for entry in bundle.get("entry", []):
resource = entry.get("resource", {})
if resource.get("resourceType") == "Patient":
return resource
return {}


def fhir_record_to_pii_record(fhir_record: dict) -> schemas.PIIRecord:
"""
Parse the FHIR record into a PIIRecord object
Expand Down Expand Up @@ -92,4 +103,4 @@ def add_person_resource(
},
}
bundle.get("entry", []).append(person_resource)
return bundle
return bundle
63 changes: 43 additions & 20 deletions src/recordlinker/linking/link.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,24 @@

import collections
import dataclasses
import logging
import typing

from sqlalchemy import orm

from recordlinker import models
from recordlinker import schemas
from recordlinker.database import mpi_service
from recordlinker.utils.mock import MockTracer

LOGGER = logging.getLogger(__name__)
TRACER: typing.Any = None
try:
from opentelemetry import trace

TRACER = trace.get_tracer(__name__)
except ImportError:
# OpenTelemetry is an optional dependency, if its not installed use a mock tracer
from recordlinker.utils.mock import MockTracer

TRACER = MockTracer()


Expand Down Expand Up @@ -63,7 +64,7 @@ def link_record_against_mpi(
session: orm.Session,
algorithm: models.Algorithm,
external_person_id: typing.Optional[str] = None,
) -> tuple[models.Patient, models.Person | None, list[LinkResult]]:
) -> tuple[models.Patient, models.Person | None, list[LinkResult], schemas.Prediction]:
"""
Runs record linkage on a single incoming record (extracted from a FHIR
bundle) using an existing database as an MPI. Uses a flexible algorithm
Expand Down Expand Up @@ -113,36 +114,58 @@ def link_record_against_mpi(
matched_count += 1
alhayward marked this conversation as resolved.
Show resolved Hide resolved
# calculate the match ratio for this person cluster
belongingness_ratio = matched_count / len(patients)
LOGGER.info(
"cluster belongingness",
extra={
"ratio": belongingness_ratio,
ericbuckley marked this conversation as resolved.
Show resolved Hide resolved
"person.reference_id": person.reference_id,
"matched": matched_count,
"total": len(patients),
"algorithm.ratio_lower": belongingness_ratio_lower_bound,
ericbuckley marked this conversation as resolved.
Show resolved Hide resolved
"algorithm.ratio_upper": belongingness_ratio_upper_bound,
ericbuckley marked this conversation as resolved.
Show resolved Hide resolved
"algorithm.pass.blockingkeys": algorithm_pass.blocking_keys,
"algorithm.pass.evaluators": algorithm_pass.evaluator_features,
},
)
if belongingness_ratio >= belongingness_ratio_lower_bound:
# The match ratio is larger than the minimum cluster threshold,
# optionally update the max score for this person
scores[person] = max(scores[person], belongingness_ratio)

prediction: schemas.Prediction = "possible_match"
matched_person: typing.Optional[models.Person] = None
if scores:
# Find the person with the highest matching score
matched_person, _ = max(scores.items(), key=lambda i: i[1])

sorted_scores: list[LinkResult] = [LinkResult(k, v) for k, v in sorted(scores.items(), reverse=True, key=lambda item: item[1])]
if not scores:
results: list[LinkResult] = [
LinkResult(k, v) for k, v in sorted(scores.items(), reverse=True, key=lambda i: i[1])
]
if not results:
# No match
matched_person = models.Person() # Create new Person Cluster
results = []
elif sorted_scores[0].belongingness_ratio >= belongingness_ratio_upper_bound:
prediction = "no_match"
matched_person = models.Person() # Create new Person Cluster
elif results[0].belongingness_ratio >= belongingness_ratio_upper_bound:
# Match (1 or many)
matched_person = sorted_scores[0].person
results = [x for x in sorted_scores if x.belongingness_ratio >= belongingness_ratio_upper_bound] # Multiple matches
prediction = "match"
matched_person = results[0].person
# reduce results to only those that meet the upper bound threshold
results = [x for x in results if x.belongingness_ratio >= belongingness_ratio_upper_bound]
if not algorithm.include_multiple_matches:
results = [results[0]] # 1 Match (highest Belongingness Ratio)
else:
# Possible match
matched_person = None
results = sorted_scores
# reduce results to only the highest match
results = [results[0]]

with TRACER.start_as_current_span("insert"):
patient = mpi_service.insert_patient(
session, record, matched_person, record.external_id, external_person_id, commit=False
)

LOGGER.info(
"link results",
extra={
"person.reference_id": matched_person and matched_person.reference_id,
"patient.reference_id": patient.reference_id,
"result.prediction": prediction,
"result.count": len(results),
"result.scored_count": len(scores),
alhayward marked this conversation as resolved.
Show resolved Hide resolved
},
)

# return a tuple indicating whether a match was found and the person ID
return (patient, patient.person, results)
return (patient, patient.person, results, prediction)
7 changes: 7 additions & 0 deletions src/recordlinker/models/algorithm.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,13 @@ def evaluators(self, value: list[dict]):
if hasattr(self, "_bound_evaluators"):
del self._bound_evaluators

@property
def evaluator_features(self) -> list[str]:
"""
Get the features used by the evaluators for this algorithm pass.
"""
return [e["feature"] for e in self.evaluators]

def bound_evaluators(self) -> list[BoundEvaluator]:
"""
Get the evaluators for this algorithm pass, bound to the algorithm.
Expand Down
204 changes: 59 additions & 145 deletions src/recordlinker/routes/link_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import fastapi
import sqlalchemy.orm as orm

from recordlinker import models
from recordlinker import schemas
from recordlinker.database import algorithm_service
from recordlinker.database import get_session
Expand All @@ -20,119 +21,49 @@
router = fastapi.APIRouter()


@router.post("", summary="Link Record")
async def link_piirecord(
request: fastapi.Request,
input: typing.Annotated[schemas.LinkInput, fastapi.Body()],
response: fastapi.Response,
db_session: orm.Session = fastapi.Depends(get_session),
) -> schemas.LinkResponse:
def algorithm_or_422(db_session: orm.Session, label: str | None) -> models.Algorithm:
"""
Compare a PII Record with records in the Master Patient Index (MPI) to
check for matches with existing patient records If matches are found,
returns the patient and person reference id's
Get the Algorithm, or default if no label. Raise a 422 if no Algorithm can be found.
"""
if input.algorithm:
algorithm = algorithm_service.get_algorithm(db_session, input.algorithm)
else:
algorithm = algorithm_service.default_algorithm(db_session)

algorithm = (
algorithm_service.get_algorithm(db_session, label)
if label
else algorithm_service.default_algorithm(db_session)
)
cbrinson-rise8 marked this conversation as resolved.
Show resolved Hide resolved
if not algorithm:
msg = "Error: No algorithm found"
raise fastapi.HTTPException(
status_code=fastapi.status.HTTP_422_UNPROCESSABLE_ENTITY, detail=msg
status_code=fastapi.status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="No algorithm found",
)
return algorithm

# link the record
try:
# Make a copy of record_to_link so we don't modify the original
(patient, person, results) = link.link_record_against_mpi(
record=input.record,
session=db_session,
algorithm=algorithm,
external_person_id=input.external_person_id,
)
return schemas.LinkResponse(
patient_reference_id=patient.reference_id,
person_reference_id=(person and person.reference_id),
results=[schemas.LinkResult(**r.__dict__) for r in results]
)

except ValueError:
msg = "Error: Bad request"
raise fastapi.HTTPException(status_code=fastapi.status.HTTP_400_BAD_REQUEST, detail=msg)


@router.post("/dibbs", summary="Link FHIR for DIBBs")
async def link_dibbs(
@router.post("", summary="Link Record")
async def link_piirecord(
request: fastapi.Request,
input: typing.Annotated[schemas.LinkFhirInput, fastapi.Body()],
input: typing.Annotated[schemas.LinkInput, fastapi.Body()],
response: fastapi.Response,
db_session: orm.Session = fastapi.Depends(get_session),
) -> schemas.LinkFhirResponse:
) -> schemas.LinkResponse:
"""
Compare a FHIR bundle with records in the Master Patient Index (MPI) to
Compare a PII Record with records in the Master Patient Index (MPI) to
check for matches with existing patient records If matches are found,
returns the FHIR bundle with updated references to existing patients.
This is a special endpoint that allows integration into a DIBBs pipeline,
as it accepts and returns FHIR bundles.
returns the patient and person reference id's
"""
input_bundle = input.bundle
external_id = input.external_person_id

if input.algorithm:
algorithm = algorithm_service.get_algorithm(db_session, input.algorithm)
else:
algorithm = algorithm_service.default_algorithm(db_session)

if not algorithm:
raise fastapi.HTTPException(
status_code=fastapi.status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="Error: Invalid algorithm specified"
)

# Now extract the patient record we want to link
try:
record_to_link = [
entry.get("resource")
for entry in input_bundle.get("entry", [])
if entry.get("resource", {}).get("resourceType", "") == "Patient"
][0]
except IndexError:
raise fastapi.HTTPException(
status_code=fastapi.status.HTTP_400_BAD_REQUEST,
detail="Supplied bundle contains no Patient resource to link on."
)


# convert record to PII
pii_record: schemas.PIIRecord = fhir.fhir_record_to_pii_record(record_to_link)

# Now link the record
try:
(patient, person, results) = link.link_record_against_mpi(
record=pii_record,
session=db_session,
algorithm=algorithm,
external_person_id=external_id,
)
updated_bundle: dict | None = None
if person:
updated_bundle = fhir.add_person_resource(
str(person.reference_id), pii_record.external_id, input_bundle
)
return schemas.LinkFhirResponse(
patient_reference_id=patient.reference_id,
person_reference_id=(person and person.reference_id),
results=[schemas.LinkResult(**r.__dict__) for r in results],
updated_bundle=updated_bundle
)

except ValueError as err:
raise fastapi.HTTPException(
status_code=fastapi.status.HTTP_400_BAD_REQUEST,
detail=f"Could not connect to database: {err}"
)
algorithm = algorithm_or_422(db_session, input.algorithm)

(patient, person, results, prediction) = link.link_record_against_mpi(
record=input.record,
session=db_session,
algorithm=algorithm,
external_person_id=input.external_person_id,
)
return schemas.LinkResponse(
prediction=prediction,
patient_reference_id=patient.reference_id,
person_reference_id=(person and person.reference_id),
results=[schemas.LinkResult(**r.__dict__) for r in results],
)


@router.post("/fhir", summary="Link FHIR")
Expand All @@ -141,59 +72,42 @@ async def link_fhir(
input: typing.Annotated[schemas.LinkFhirInput, fastapi.Body()],
response: fastapi.Response,
db_session: orm.Session = fastapi.Depends(get_session),
) -> schemas.LinkResponse:
) -> schemas.LinkFhirResponse:
"""
Compare a FHIR bundle with records in the Master Patient Index (MPI) to
check for matches with existing patient records If matches are found,
returns the patient and person reference id's
returns the FHIR bundle with updated references to existing patients.
"""
input_bundle = input.bundle
external_id = input.external_person_id

if input.algorithm:
algorithm = algorithm_service.get_algorithm(db_session, input.algorithm)
else:
algorithm = algorithm_service.default_algorithm(db_session)

if not algorithm:
response.status_code = fastapi.status.HTTP_422_UNPROCESSABLE_ENTITY
raise fastapi.HTTPException(status_code=422, detail="Error: No algorithm found")
algorithm = algorithm_or_422(db_session, input.algorithm)

# Now extract the patient record we want to link
try:
record_to_link = [
entry.get("resource")
for entry in input_bundle.get("entry", [])
if entry.get("resource", {}).get("resourceType", "") == "Patient"
][0]
except IndexError:
response.status_code = fastapi.status.HTTP_400_BAD_REQUEST
patient: dict = fhir.get_first_patient_resource(input.bundle)
if not patient:
raise fastapi.HTTPException(
status_code=400,
detail="Error: Supplied bundle contains no Patient resource to link on.",
status_code=fastapi.status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="Supplied bundle contains no Patient resource",
)

# convert record to PII
pii_record: schemas.PIIRecord = fhir.fhir_record_to_pii_record(record_to_link)

# link the record
try:
# Make a copy of pii_record so we don't modify the original
(patient, person, results) = link.link_record_against_mpi(
record=pii_record,
session=db_session,
algorithm=algorithm,
external_person_id=external_id,
)
return schemas.LinkResponse(
patient_reference_id=patient.reference_id,
person_reference_id=(person and person.reference_id),
results=[schemas.LinkResult(**r.__dict__) for r in results]
)

record: schemas.PIIRecord = fhir.fhir_record_to_pii_record(patient)
except ValueError:
response.status_code = fastapi.status.HTTP_400_BAD_REQUEST
raise fastapi.HTTPException(
status_code=400,
detail="Error: Bad request"
)
status_code=fastapi.status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="Invalid Patient resource",
)

(patient, person, results, prediction) = link.link_record_against_mpi(
record=record,
session=db_session,
algorithm=algorithm,
external_person_id=input.external_person_id,
)
return schemas.LinkFhirResponse(
prediction=prediction,
patient_reference_id=patient.reference_id,
person_reference_id=(person and person.reference_id),
results=[schemas.LinkResult(**r.__dict__) for r in results],
updated_bundle=(
person
and fhir.add_person_resource(str(person.reference_id), record.external_id, input.bundle)
),
)
Loading
Loading