Skip to content

Commit

Permalink
enrich cql parser
Browse files Browse the repository at this point in the history
  • Loading branch information
delcroip committed Dec 9, 2022
1 parent 8bd1e34 commit b4b08db
Showing 1 changed file with 32 additions and 42 deletions.
74 changes: 32 additions & 42 deletions pyfhirsdc/serializers/librarySerializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ def generate_attached_library(resource, df_actions, type = 'pd'):
else:
resource = add_library_extentions(resource, library.id)


def generate_library(name, df_actions, type = 'pd', description = None):

id = clean_name(name)
Expand Down Expand Up @@ -227,7 +226,7 @@ def check_expression_keyword(row, keword):

# libs [{name,version,alias}]
# parameters [{name,type}]
def writeLibraryHeader(library, libs = []):
def writeLibraryHeader(library, libs = [], other_header='context Patient'):
return """/*
@author: Patrick Delcroix
@description: This library is part of the project {3}
Expand All @@ -239,15 +238,16 @@ def writeLibraryHeader(library, libs = []):
{5}
context Patient
{6}
""".format(
get_fhir_cfg().version,
library.name,
get_fhir_cfg().lib_version,
get_processor_cfg().scope,
get_include_lib(libs, library),
get_include_parameters(library.parameter))
get_include_parameters(library.parameter),
other_header)


# libs is a list {name='', version='', alias = ''}
Expand Down Expand Up @@ -305,49 +305,32 @@ def write_cql_pd(library, planDefinition):
return cql


def get_observation_cql_from_concepts(concepts, lib):
def get_code_cql_from_concepts(concepts, lib):
# write 3 cql : start, end, applicability
list_of_display = []
cql = {}
libs = [{'name':'emcarebase','alias':'B','version':get_fhir_cfg().lib_version}]
cql['header'] = writeLibraryHeader(lib, libs)
libs = []
cql['header'] = writeLibraryHeader(lib, libs,
'codesystem "{}" : \'{}\'\n'.format(get_processor_cfg().scope, get_custom_codesystem_url()) )
i = 0
if concepts is not None:
for concept in concepts:
concept.display=str(concept.display).lower()
if concept.display not in list_of_display:
list_of_display.append(concept.display)
if (concept is not None and concept.code is not None):
concept_cql = write_obsevation(concept)
concept_cql = write_code(concept)
#concept_cql = write_obsevation(concept)
if concept_cql is not None:
append_used_obs(concept.code, concept.display)
cql[i] = concept_cql
i = i+1
else:
print("Warning: obs {} is defined multiple times".format(concept.display))
print("Warning: {} :{} is defined multiple times".format(lib.name,concept.display))
return cql

def get_valueset_cql_from_concepts(concepts, lib):
# write 3 cql : start, end, applicability
cql = {}
list_of_display = []
libs = [{'name':'emcarebase','alias':'B','version':get_fhir_cfg().lib_version}]
cql['header'] = writeLibraryHeader(lib, libs)
i = 0
if concepts:
for concept in concepts:
concept.display=str(concept.display).lower()
if concept.display not in list_of_display:
list_of_display.append(concept.display)
if (concept and concept.code):
concept_cql = write_valueset(concept)
if concept_cql is not None:
append_used_valueset(concept.code,concept.display)
cql[i] = concept_cql
i = i+1
else:
print("Warning: code {} is defined multiple times".format(concept.display))
return cql



def write_valueset(concept):
cql = ""
Expand All @@ -371,6 +354,13 @@ def write_obsevation(concept):
" B.HasObs('{}')".format(concept.code, get_custom_codesystem_url()) + "\n\n"
return cql

def write_code(concept):
cql = ""
if concept.display is not None and pd.notna(concept.display):
## Output false, manual process to convert the pseudo-code to CQL
cql += "code \"{0}\": '{1}' from \"{2}\" display '{3}'\n".format(concept.display.replace('"', '\\"'),concept.code, get_processor_cfg().scope ,concept.display.replace("'", "\\'"))

return cql


def write_action_condition(action):
Expand Down Expand Up @@ -504,7 +494,7 @@ def map_to_obs_valueset(cql_exp, df):
obs_linkid = [x.lower() for x in get_used_obs().keys()]
obs_label = [x.lower() for x in get_used_obs().values()]
changed = []
matches = re.findall(r'(^|.)"([^"\.]+)"',cql_exp)
matches = re.findall(r'([a-z\.])?"([^"\.]+)"',cql_exp)
out = cql_exp
out = out.replace('HasCond', 'Base.HasCond')
out = out.replace('HasObs', 'Base.HasObs')
Expand All @@ -525,22 +515,22 @@ def map_to_obs_valueset(cql_exp, df):
elif match in ("No"):
out = out.replace('"{}"'.format(match), 'false'.format(match) )
changed.append(match)
elif match.lower() in valueset_linkid:
elif match.lower() in valueset_linkid and prefix in ('',' ','v'):
changed.append(match)
out = out.replace('"{0}"'.format(match), "Base.coding('{0}')".format(match) )
elif match.lower() in valueset_label:
linkid = list(get_used_valueset().keys())[list(valueset_label).index(match.lower())]
out = out.replace('{0}"{1}"'.format(prefix if prefix == 'v' else '',match), 'val."{0}"'.format(get_used_valueset()[match.lower()].lower()) )
elif match.lower() in valueset_label and prefix in ('',' ','v'):

changed.append(match)
out = out.replace('"{0}"'.format(match), "Base.coding('{0}')".format(linkid) )
elif match.lower() in obs_linkid:
out = out.replace('{0}"{1}"'.format(prefix if prefix == 'v' else '',match), 'val."{0}"'.format(match.lower()) )
elif match.lower() in obs_linkid and prefix in ('',' ','o'):
changed.append(match)
out = out.replace('"{0}"'.format(match), "Base.GetObsValue('{0}')".format(match.lower()) )
elif match.lower() in obs_label:
out = out.replace('{0}"{1}"'.format(prefix if prefix == 'o' else '',match), "Base.GetObsValue('{0}')".format( match) )
elif match.lower() in obs_label and prefix in ('',' ','o'):
linkid = list(get_used_obs().keys())[list(obs_label).index(match.lower())]
changed.append(match)
out = out.replace('"{0}"'.format(match), "Base.GetObsValue('{0}')".format(linkid) )
out = out.replace('{0}"{1}"'.format(prefix if prefix == 'o' else '',match), "Base.GetObsValue('{0}')".format( linkid) )
else:
print("Warning: string not translated {} ".format(match))
print("Warning: string not translated {} {} ".format(prefix,match))

# quick fix remove the select multiple
matches = re.findall(r"(Base\.GetObsValue\('([^']+)'\) *= *Base\.GetObsValue\('([^']+)'\))",out)
Expand Down

0 comments on commit b4b08db

Please sign in to comment.