Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Helper function for choosing a child group default attribute. #349

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
},
"editor.defaultFormatter": "charliermarsh.ruff"
},
"python.testing.pytestArgs": ["tests"],
"python.testing.pytestArgs": [
"tests"
],
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true
}
183 changes: 183 additions & 0 deletions src/pynxtools/dataconverter/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -921,3 +921,186 @@ def nested_dict_to_slash_separated_path(
nested_dict_to_slash_separated_path(val, flattened_dict, path)
else:
flattened_dict[path] = val


def dump_possible_groups_for_default_keys(
concept,
dflt_key_to_grp_li,
special_default_key_to_grp_li,
concept_level,
group_part,
group_name,
):
""" """

# root level default
if concept_level == 0:
default_key = concept + "/@default"
if not special_default_key_to_grp_li.get("root", None):
special_default_key_to_grp_li["root"] = {default_key: []}
special_default_key_to_grp_li["root"][default_key].append(group_name)

# entry level default
elif concept_level == 1:
default_key = concept + "/@default"
if not special_default_key_to_grp_li.get("entry", None):
special_default_key_to_grp_li["entry"] = {default_key: None}
# Assuming multiple entries
if not special_default_key_to_grp_li["entry"].get(default_key, None):
special_default_key_to_grp_li["entry"][default_key] = {
"data": [],
"other": [],
}

if group_part.startswith("DATA"):
special_default_key_to_grp_li["entry"][default_key]["data"].append(
group_name
)
else:
special_default_key_to_grp_li["entry"][default_key]["other"].append(
group_name
)

else:
default_key = concept + "/@default"
if not dflt_key_to_grp_li.get(default_key, None):
dflt_key_to_grp_li[default_key] = {"data": [], "other": []}
if group_part.startswith("DATA"):
dflt_key_to_grp_li[default_key]["data"].append(group_name)
else:
dflt_key_to_grp_li[default_key]["other"].append(group_name)


def set_default_from_key_grp_list_data(
template,
dflt_key_to_grp_li,
special_default_key_to_grp_li,
dflt_key_to_exist_grp,
):
"""Set default attribute for each group of Nexus file.

Each group will have a /@default attrubute refering the immediate child group in a
NeXus definition chain.

If the default attribute already refers to a group in the template that will
be persisted.

For root level attribute, the default will be first entry, if attribute is already empty.

For default attribute in the entry, NXdata group will dominate over other groups.
e.g. /@default = "entry1"
/entry1/@default = "data1"

Parameters
----------
template : Template
Template from filled with datafile and eln.
dflt_key_to_grp_li : dict[str, dict[str, list]]
defalut attribute key to the list of immediate child group.
special_default_key_to_grp_li : dict[str, dict[str, dict[str, list]]]
special dict for root and entry.
dflt_key_to_exist_grp : dict[str, str]
defalut attribute key to the group set by reader.
"""
for deflt_key, value in dflt_key_to_grp_li.items():
pre_defalt_grp = dflt_key_to_exist_grp.get(deflt_key, None)
# Verify if user added the group here
if pre_defalt_grp:
if pre_defalt_grp in value["data"] or pre_defalt_grp in value["other"]:
continue

else:
if value["data"]:
template[deflt_key] = value["data"][0]
elif value["other"]:
template[deflt_key] = value["other"][0]

for key, value in special_default_key_to_grp_li.items():
if key == "root":
if not dflt_key_to_exist_grp.get("/@default", None):
template["/@default"] = value["/@default"][0]

elif key == "entry":
# For muliple entries
for entry_defalt_key, entry_value in value.items():
# check already default set by reader
if dflt_key_to_exist_grp.get(entry_defalt_key, None):
continue
if entry_value["data"]:
template[entry_defalt_key] = entry_value["data"][0]
else:
template[entry_defalt_key] = entry_value["other"][0]


def set_default_from_child_group(template):
"""Set default attribute for each group of Nexus file.

Each group will have a /@default attrubute refering the immediate child group in a
NeXus definition chain.

If the default attribute already refers to a group in the template that will
be persisted.

For root level attribute, the default will be first entry, if attribute is already empty.

For default attribute in the entry, NXdata group will dominate over other groups.
e.g. /@default = "entry1"
/entry1/@default = "data1"

Parameters
----------
template : Template
Template from filled with datafile and eln.
"""
# defalut attribute key to the list of immediate child group
dflt_key_to_grp_li: Optional[dict[str, list]] = {}
# special dict for root and entry
special_default_key_to_grp_li: Optional[dict[str, dict[str, dict[str, list]]]] = {}
# defalut attribute key to the group set by reader
dflt_key_to_exist_grp: dict[str, str] = {}

# "/abc[DATA]/XYe[anything]/mnf[MNYZ]/anything" -> ['DATA', 'anything', 'MNYZ']
pattern = r"\[(.*?)\]"

for template_concept, val in template.items():
if template_concept.endswith("/@default") and val:
dflt_key_to_exist_grp[template_concept] = val
continue

groups_list = template_concept.split("/")[1:] # skip empty string

# skip other attributes like @units, @versions and so on
if groups_list[-1].startswith("@"):
continue
# Assume complete key refers to a field
# If key refers to a group, last chain group will be handled from other keys
else:
groups_list = groups_list[0:-1]

if not groups_list:
continue
concept = ""

for ind, group in enumerate(groups_list):
grp_name = re.findall(pattern, group)
if grp_name:
grp_name = grp_name[0]
else:
grp_name = group
dump_possible_groups_for_default_keys(
concept,
dflt_key_to_grp_li,
special_default_key_to_grp_li,
ind,
group,
grp_name,
)

concept = concept + "/" + group

set_default_from_key_grp_list_data(
template,
dflt_key_to_grp_li,
special_default_key_to_grp_li,
dflt_key_to_exist_grp,
)
78 changes: 76 additions & 2 deletions tests/dataconverter/test_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,10 @@

import numpy as np
import pytest
from setuptools import distutils

from pynxtools.dataconverter import helpers
from pynxtools.dataconverter.template import Template
from pynxtools.dataconverter.validation import validate_dict_against
from setuptools import distutils


def remove_optional_parent(data_dict: Template):
Expand Down Expand Up @@ -641,3 +640,78 @@ def test_warning_on_definition_changed_by_reader(caplog):

assert "/ENTRY[entry]/definition" in template.keys()
assert template["/ENTRY[entry]/definition"] == "NXtest"


# TODO add nxdata in temp_dict example
@pytest.mark.parametrize(
"temp_dict",
[
{
"/ENTRY[entry1]/NXODD_name[nxodd_name]/float_value": 2.0,
"/ENTRY[entry1]/NXODD_name[nxodd_name_2]/float_value": 8.0,
"/ENTRY[entry1]/NXODD_name[nxodd_name_2]/DATA[grp_data1]/data1": [
3,
5,
6,
],
"/ENTRY[entry1]/NXODD_name[nxodd_name_2]/DATA[grp_data2]/data2": [
3,
5,
6,
],
"/ENTRY[entry2]/@default": "nxodd_name_2",
"/ENTRY[entry2]/NXODD_name[nxodd_name]/float_value": 2.0,
"/ENTRY[entry2]/NXODD_name[nxodd_name_2]/float_value": 8.0,
"/ENTRY[entry2]/NXODD_name[nxodd_name_2]/INSTRUMENT[instrument]/DATA[data]/float_value": 8.0,
"/ENTRY[entry2]/NXODD_name[nxodd_name_2]/INSTRUMENT[instrument]/DATA[data1]/float_value": 8.0,
}
],
)
def test_set_default_attr_in_group(temp_dict):
"""Test default attriute that consider child group.
Parameters
----------
template : Template
"""
template = Template(temp_dict)
assert (
"/@default" not in template
), "To test the root level /@default should be empty."
assert (
"/ENTRY[entry1]/@default" not in template
), "To test default attribute, entry attribute should be empty."

helpers.set_default_from_child_group(template)
assert template["/@default"] in [
"entry1",
"entry2",
], "To test the root level /@default should be empty."

assert template["/ENTRY[entry1]/@default"] in [
"nxodd_name",
"nxodd_name_2",
], "To test default attribute, entry attribute should be empty."
assert template["/ENTRY[entry1]/NXODD_name[nxodd_name_2]/@default"] in [
"grp_data1",
"grp_data2",
], "To test default attribute, entry attribute should be empty."

assert (
"/ENTRY[entry1]/NXODD_name[nxodd_name_2]/DATA[data2]/data2/@default"
not in template
), (
"Group can not have a default attribute refering any field,"
" unless it is not supplied by reader."
)
assert (
template["/ENTRY[entry2]/@default"] == "nxodd_name_2"
), "Default attribute supplied by reader can not be overwritten."
assert (
template["/ENTRY[entry2]/NXODD_name[nxodd_name_2]/@default"] == "instrument"
), "Default attribute supplied by reader can not be overwritten."
assert template[
"/ENTRY[entry2]/NXODD_name[nxodd_name_2]/INSTRUMENT[instrument]/@default"
] in [
"data",
"data1",
], "Default attribute supplied by reader can not be overwritten."
Loading