Skip to content

Commit

Permalink
Merge pull request #34986 from dimagi/nh/pmi_evolve_2024
Browse files Browse the repository at this point in the history
New columns for PMI Evolve Supervisory Report
  • Loading branch information
kaapstorm authored Aug 16, 2024
2 parents 6e72ec5 + 4c897cd commit 9a791d9
Show file tree
Hide file tree
Showing 4 changed files with 371 additions and 110 deletions.
28 changes: 28 additions & 0 deletions custom/abt/reports/data_sources/supervisory_v2020.json
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,34 @@
"is_primary_key": false,
"transform": {},
"type": "expression"
},
{
"column_id": "description",
"datatype": "string",
"display_name": "description",
"expression": {
"datatype": null,
"property_name": "description",
"type": "property_name"
},
"is_nullable": true,
"is_primary_key": false,
"transform": {},
"type": "expression"
},
{
"column_id": "time_spent",
"datatype": "string",
"display_name": "time_spent",
"expression": {
"datatype": null,
"property_name": "time_spent",
"type": "property_name"
},
"is_nullable": true,
"is_primary_key": false,
"transform": {},
"type": "expression"
}
],
"description": "",
Expand Down
262 changes: 152 additions & 110 deletions custom/abt/reports/expressions.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,23 @@ def _get_flag_name(cls, item, spec):
def _get_responsible_follow_up(self, spec):
return spec.get('responsible_follow_up', "")

@classmethod
def _get_description(cls, item, spec):
description = ()
question = spec.get('description')
if question:
path = spec['base_path'] + question
description = cls._get_val(item, path)
return description if description != () else ''

@classmethod
def _get_time_spent(cls, item, spec):
time_spent = ()
path = spec.get('time_spent')
if path:
time_spent = cls._get_val(item, path)
return time_spent if time_spent != () else ''

def __call__(self, item, evaluation_context=None):
"""
Given a document (item), return a list of documents representing each
Expand Down Expand Up @@ -230,120 +247,145 @@ def __call__(self, item, evaluation_context=None):
form_value = self._get_val(partial, spec['question'])
warning_type = spec.get("warning_type", None)

if warning_type == "unchecked" and form_value:
# Don't raise flag if no answer given
ignore = spec.get("ignore", [])
section = spec.get("section", "data")
unchecked = self._get_unchecked(
item,
spec.get('base_path', []) + spec['question'],
form_value,
ignore,
section
)
if unchecked:
# Raise a flag because there are unchecked answers.
docs.append({
'flag': self._get_flag_name(item, spec),
'warning': self._get_warning(spec, item).format(msg=", ".join(unchecked)),
'comments': self._get_comments(
partial if not self.comment_from_root else item,
spec
),
'names': names,
'form_name': self._get_form_name(item),
'responsible_follow_up': self._get_responsible_follow_up(spec)
})

elif warning_type == "unchecked_special" and form_value:
# Don't raise flag if no answer given
ignore = spec.get("ignore", [])
section = spec.get("section", "data")
master_value = self._get_val(item, ['insecticide_prep_grp', 'Q10', 'sop_full_ppe'])
second_unchecked = self._get_unchecked(
item,
spec.get('base_path', []) + spec['question'],
form_value,
ignore,
section
)
if not master_value and second_unchecked:
# Raise a flag because master question is not answered but duplicate question is.
docs.append({
'flag': self._get_flag_name(item, spec),
'warning': self._get_warning(spec, item).format(msg=", ".join(second_unchecked)),
'comments': self._get_comments(
partial if not self.comment_from_root else item,
spec
),
'names': names,
'form_name': self._get_form_name(item),
'responsible_follow_up': self._get_responsible_follow_up(spec)
})

elif warning_type == "q3_special" and form_value:
# One of the questions doesn't follow the same format as the
# others, hence this special case.
missing_items = ""
if form_value == "only_license":
missing_items = "cell"
if form_value == "only_cell":
missing_items = "license"
if form_value == "none":
missing_items = "cell, license"
if missing_items:
docs.append({
'flag': self._get_flag_name(item, spec),
'warning': self._get_warning(spec, item).format(msg=missing_items),
'comments': self._get_comments(
partial if not self.comment_from_root else item,
spec
),
'names': names,
'form_name': self._get_form_name(item),
'responsible_follow_up': self._get_responsible_follow_up(spec)
})
elif warning_type == "not_selected" and form_value:
value = spec.get("value", "")
if form_value and value not in form_value:
warning_question_data = partial if not spec.get('warning_question_root', False) else item
docs.append({
'flag': self._get_flag_name(item, spec),
'warning': self._get_warning(spec, item).format(
msg=self._get_val(warning_question_data, spec.get('warning_question', None)) or ""
),
'comments': self._get_comments(
partial if not self.comment_from_root else item,
spec
),
'names': names,
'form_name': self._get_form_name(item),
'responsible_follow_up': self._get_responsible_follow_up(spec)
})

flag_doc_methods = {
'unchecked': self._get_unchecked_flag_doc,
'unchecked_special': self._get_unchecked_special_flag_doc,
'q3_special': self._get_q3_special_flag_doc,
'not_selected': self._get_not_selected_flag_doc,
}
if warning_type in flag_doc_methods and form_value:
method = flag_doc_methods[warning_type]
flag_doc = method(item, spec, partial, names, form_value)
else:
danger_value = spec.get('answer', [])
if form_value == danger_value or (
self._question_answered(form_value) and
self._raise_for_any_answer(danger_value)
):
warning_question_data = partial if not spec.get('warning_question_root', False) else item
docs.append({
'flag': self._get_flag_name(item, spec),
'warning': self._get_warning(spec, item).format(
msg=self._get_val(warning_question_data, spec.get('warning_question', None)) or ""
),
'comments': self._get_comments(
partial if not self.comment_from_root else item,
spec
),
'names': names,
'form_name': self._get_form_name(item),
'responsible_follow_up': self._get_responsible_follow_up(spec)
})
flag_doc = self._get_answer_flag_doc(item, spec, partial, names, form_value)
if flag_doc:
docs.append(flag_doc)

return docs

def _get_unchecked_flag_doc(self, item, spec, partial, names, form_value):
# Don't raise flag if no answer given
ignore = spec.get("ignore", [])
section = spec.get("section", "data")
unchecked = self._get_unchecked(
item,
spec.get('base_path', []) + spec['question'],
form_value,
ignore,
section
)
if unchecked:
# Raise a flag because there are unchecked answers.
return {
'flag': self._get_flag_name(item, spec),
'warning': self._get_warning(spec, item).format(msg=", ".join(unchecked)),
'comments': self._get_comments(
partial if not self.comment_from_root else item,
spec
),
'names': names,
'form_name': self._get_form_name(item),
'responsible_follow_up': self._get_responsible_follow_up(spec),
'description': self._get_description(item, spec),
'time_spent': self._get_time_spent(item, spec),
}

def _get_unchecked_special_flag_doc(self, item, spec, partial, names, form_value):
# Don't raise flag if no answer given
ignore = spec.get("ignore", [])
section = spec.get("section", "data")
master_value = self._get_val(item, ['insecticide_prep_grp', 'Q10', 'sop_full_ppe'])
second_unchecked = self._get_unchecked(
item,
spec.get('base_path', []) + spec['question'],
form_value,
ignore,
section
)
if not master_value and second_unchecked:
# Raise a flag because master question is not answered but duplicate question is.
return {
'flag': self._get_flag_name(item, spec),
'warning': self._get_warning(spec, item).format(msg=", ".join(second_unchecked)),
'comments': self._get_comments(
partial if not self.comment_from_root else item,
spec
),
'names': names,
'form_name': self._get_form_name(item),
'responsible_follow_up': self._get_responsible_follow_up(spec),
'description': self._get_description(item, spec),
'time_spent': self._get_time_spent(item, spec),
}

def _get_q3_special_flag_doc(self, item, spec, partial, names, form_value):
# One of the questions doesn't follow the same format as the
# others, hence this special case.
missing_items = ""
if form_value == "only_license":
missing_items = "cell"
if form_value == "only_cell":
missing_items = "license"
if form_value == "none":
missing_items = "cell, license"
if missing_items:
return {
'flag': self._get_flag_name(item, spec),
'warning': self._get_warning(spec, item).format(msg=missing_items),
'comments': self._get_comments(
partial if not self.comment_from_root else item,
spec
),
'names': names,
'form_name': self._get_form_name(item),
'responsible_follow_up': self._get_responsible_follow_up(spec),
'description': self._get_description(item, spec),
'time_spent': self._get_time_spent(item, spec),
}

def _get_not_selected_flag_doc(self, item, spec, partial, names, form_value):
value = spec.get("value", "")
if form_value and value not in form_value:
warning_question_data = partial if not spec.get('warning_question_root', False) else item
return {
'flag': self._get_flag_name(item, spec),
'warning': self._get_warning(spec, item).format(
msg=self._get_val(warning_question_data, spec.get('warning_question', None)) or ""
),
'comments': self._get_comments(
partial if not self.comment_from_root else item,
spec
),
'names': names,
'form_name': self._get_form_name(item),
'responsible_follow_up': self._get_responsible_follow_up(spec),
'description': self._get_description(item, spec),
'time_spent': self._get_time_spent(item, spec),
}

def _get_answer_flag_doc(self, item, spec, partial, names, form_value):
danger_value = spec.get('answer', [])
if form_value == danger_value or (
self._question_answered(form_value)
and self._raise_for_any_answer(danger_value)
):
warning_question_data = partial if not spec.get('warning_question_root', False) else item
return {
'flag': self._get_flag_name(item, spec),
'warning': self._get_warning(spec, item).format(
msg=self._get_val(warning_question_data, spec.get('warning_question', None)) or ""
),
'comments': self._get_comments(
partial if not self.comment_from_root else item,
spec
),
'names': names,
'form_name': self._get_form_name(item),
'responsible_follow_up': self._get_responsible_follow_up(spec),
'description': self._get_description(item, spec),
'time_spent': self._get_time_spent(item, spec),
}


class AbtSupervisorExpressionSpec(AbtExpressionSpec):
type = TypeProperty('abt_supervisor')
Expand Down
Loading

0 comments on commit 9a791d9

Please sign in to comment.