Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add logging to linkage process and consolidate FHIR link routes #135

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/site/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ Each setting can be configured as follows:

: URI for the Splunk HTTP Event Collector (HEC) endpoint. When set, logs will be sent to
the configured Splunk instance for analysis. The format is
`splunkhec://<token>@<host>:<port>?index=<index>&proto=<protocol>&ssl_verify=<verify>&source=<source>`
`splunkhec://<token>@<host>:<port>?index=<index>&proto=<protocol>&source=<source>`

**Docker Default**: `""`

Expand Down
13 changes: 12 additions & 1 deletion src/recordlinker/hl7/fhir.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,17 @@
from recordlinker import schemas


def get_first_patient_resource(bundle: dict) -> dict:
"""
Get the first patient resource from a FHIR bundle
"""
for entry in bundle.get("entry", []):
resource = entry.get("resource", {})
if resource.get("resourceType") == "Patient":
return resource
return {}


def fhir_record_to_pii_record(fhir_record: dict) -> schemas.PIIRecord:
"""
Parse the FHIR record into a PIIRecord object
Expand Down Expand Up @@ -92,4 +103,4 @@ def add_person_resource(
},
}
bundle.get("entry", []).append(person_resource)
return bundle
return bundle
79 changes: 58 additions & 21 deletions src/recordlinker/linking/link.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,24 @@

import collections
import dataclasses
import logging
import typing

from sqlalchemy import orm

from recordlinker import models
from recordlinker import schemas
from recordlinker.database import mpi_service
from recordlinker.utils.mock import MockTracer

LOGGER = logging.getLogger(__name__)
TRACER: typing.Any = None
try:
from opentelemetry import trace

TRACER = trace.get_tracer(__name__)
except ImportError:
# OpenTelemetry is an optional dependency, if its not installed use a mock tracer
from recordlinker.utils.mock import MockTracer

TRACER = MockTracer()


Expand All @@ -47,6 +48,7 @@ def compare(
kwargs: dict[typing.Any, typing.Any] = algorithm_pass.kwargs

results: list[float] = []
details: dict[str, typing.Any] = {"patient.reference_id": patient.reference_id}
for e in evals:
# TODO: can we do this check earlier?
feature = getattr(schemas.Feature, e.feature, None)
Expand All @@ -55,15 +57,20 @@ def compare(
# Evaluate the comparison function and append the result to the list
result: float = e.func(record, patient, feature, **kwargs) # type: ignore
results.append(result)
return matching_rule(results, **kwargs) # type: ignore
details[f"evaluator.{e.feature}.result"] = result
is_match = matching_rule(results, **kwargs)
details["rule.results"] = is_match
# TODO: this may add a lot of noise, consider moving to debug
LOGGER.info("patient comparison", extra=details)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is being logged here?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I think I see - the result of the match rule (true or false for match or no match, respectively). I was thinking more of the log odds score, similarity score, log odds ratio and fuzzy threshold as helpful metrics to log in regards to feature comparison here, to make more explainable why a prediction was made. What do you think?

Also, what do you think about logging the match rule being used alongside this match rule return value? That way, there's more context to interpret the true/false result (e.g., on what matching criteria this result was generated).

return is_match


def link_record_against_mpi(
record: schemas.PIIRecord,
session: orm.Session,
algorithm: models.Algorithm,
external_person_id: typing.Optional[str] = None,
) -> tuple[models.Patient, models.Person | None, list[LinkResult]]:
) -> tuple[models.Patient, models.Person | None, list[LinkResult], schemas.Prediction]:
"""
Runs record linkage on a single incoming record (extracted from a FHIR
bundle) using an existing database as an MPI. Uses a flexible algorithm
Expand All @@ -88,6 +95,12 @@ def link_record_against_mpi(
scores: dict[models.Person, float] = collections.defaultdict(float)
# the minimum ratio of matches needed to be considered a cluster member
belongingness_ratio_lower_bound, belongingness_ratio_upper_bound = algorithm.belongingness_ratio
# initialize counters to track evaluation results to log
result_counts: dict[str, int] = {
"patients_compared": 0,
"above_lower_bound": 0,
"above_upper_bound": 0,
}
for algorithm_pass in algorithm.passes:
with TRACER.start_as_current_span("link.pass"):
# initialize a dictionary to hold the clusters of patients for each person
Expand All @@ -111,38 +124,62 @@ def link_record_against_mpi(
with TRACER.start_as_current_span("link.compare"):
if compare(record, patient, algorithm_pass):
matched_count += 1
alhayward marked this conversation as resolved.
Show resolved Hide resolved
result_counts["patients_compared"] += len(patients)
# calculate the match ratio for this person cluster
belongingness_ratio = matched_count / len(patients)
LOGGER.info(
"cluster belongingness",
extra={
"ratio": belongingness_ratio,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"ratio": belongingness_ratio,
"belongingness_ratio": belongingness_ratio,

"person.reference_id": person.reference_id,
"matched": matched_count,
"total": len(patients),
"algorithm.ratio_lower": belongingness_ratio_lower_bound,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"algorithm.ratio_lower": belongingness_ratio_lower_bound,
"algorithm.belongingness_ratio_lower": belongingness_ratio_lower_bound,

"algorithm.ratio_upper": belongingness_ratio_upper_bound,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"algorithm.ratio_upper": belongingness_ratio_upper_bound,
"algorithm.belongingness_ratio_upper": belongingness_ratio_upper_bound,

},
)
if belongingness_ratio >= belongingness_ratio_lower_bound:
# The match ratio is larger than the minimum cluster threshold,
# optionally update the max score for this person
scores[person] = max(scores[person], belongingness_ratio)

prediction: schemas.Prediction = "possible_match"
matched_person: typing.Optional[models.Person] = None
if scores:
# Find the person with the highest matching score
matched_person, _ = max(scores.items(), key=lambda i: i[1])

sorted_scores: list[LinkResult] = [LinkResult(k, v) for k, v in sorted(scores.items(), reverse=True, key=lambda item: item[1])]
if not scores:
results: list[LinkResult] = [
LinkResult(k, v) for k, v in sorted(scores.items(), reverse=True, key=lambda i: i[1])
]
result_counts["above_lower_bound"] = len(results)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this will only return a value > 1 if include_multiple_matches=true. Is that designed as intended?

if not results:
# No match
matched_person = models.Person() # Create new Person Cluster
results = []
elif sorted_scores[0].belongingness_ratio >= belongingness_ratio_upper_bound:
prediction = "no_match"
matched_person = models.Person() # Create new Person Cluster
elif results[0].belongingness_ratio >= belongingness_ratio_upper_bound:
# Match (1 or many)
matched_person = sorted_scores[0].person
results = [x for x in sorted_scores if x.belongingness_ratio >= belongingness_ratio_upper_bound] # Multiple matches
prediction = "match"
matched_person = results[0].person
# reduce results to only those that meet the upper bound threshold
results = [x for x in results if x.belongingness_ratio >= belongingness_ratio_upper_bound]
result_counts["above_upper_bound"] = len(results)
if not algorithm.include_multiple_matches:
results = [results[0]] # 1 Match (highest Belongingness Ratio)
else:
# Possible match
matched_person = None
results = sorted_scores
# reduce results to only the highest match
results = [results[0]]

with TRACER.start_as_current_span("insert"):
patient = mpi_service.insert_patient(
session, record, matched_person, record.external_id, external_person_id, commit=False
)

LOGGER.info(
"link results",
extra={
"person.reference_id": matched_person and matched_person.reference_id,
"patient.reference_id": patient.reference_id,
"result.prediction": prediction,
"result.count_patients_compared": result_counts["patients_compared"],
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also log the number of Person clusters compared? That would put into context the number of Patients compared.

"result.count_persons_above_lower": result_counts["above_lower_bound"],
"result.count_persons_above_upper": result_counts["above_upper_bound"],
},
)

# return a tuple indicating whether a match was found and the person ID
return (patient, patient.person, results)
return (patient, patient.person, results, prediction)
Loading
Loading