
NeXus file generation:
HDF5 link,
parsing NeXus,
with error.
RubelMozumder committed Dec 18, 2024
1 parent e3164ff commit df4c741
Showing 2 changed files with 182 additions and 14 deletions.
90 changes: 76 additions & 14 deletions src/nomad_measurements/utils.py
@@ -30,6 +30,14 @@
from nomad.datamodel.hdf5 import HDF5Reference
from nomad.units import ureg
from pydantic import BaseModel, Field
from pynxtools.dataconverter.helpers import (
generate_template_from_nxdl,
get_nxdl_root_and_path,
)
from pynxtools.dataconverter.template import Template
from pynxtools.dataconverter.writer import Writer as pynxtools_writer

from nomad_measurements.xrd.nx import populate_nx_dataset_and_attribute

if TYPE_CHECKING:
    from nomad.datamodel.data import (

@@ -326,27 +334,81 @@ def write_file(self):
        main archive file (e.g. HDF5, NeXus).
        """
        if self.nexus:
            self._write_nx_file()
            # try:
            #     self._write_nx_file()
            # except Exception as e:
            #     self.nexus = False
            #     self.logger.warning(
            #         f'Encountered "{e}" error while creating nexus file. '
            #         'Creating h5 file instead.'
            #     )
            #     self._write_hdf5_file()
            # else:
            #     self._write_hdf5_file()
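
For context, the disabled block implements a simple fallback pattern: attempt the
richer NeXus output first and degrade to a plain HDF5 file on any failure. A
standalone sketch of that pattern (function and parameter names here are
illustrative, not part of this diff):

def write_with_fallback(write_nexus, write_hdf5, logger):
    """Try the NeXus writer first; fall back to plain HDF5 on any error."""
    try:
        write_nexus()
    except Exception as exc:
        logger.warning(
            f'Encountered "{exc}" while creating the NeXus file. '
            'Creating an HDF5 file instead.'
        )
        write_hdf5()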

    def _write_nx_file(self):
        """
        Method for creating a NeXus file. Additional data from the archive is added
        to the `hdf5_data_dict` before creating the NeXus file. This provides a NeXus
        view of the data in addition to storing array data.
        """
        if self.data_file.endswith('.h5'):
            self.data_file = self.data_file.replace('.h5', '.nxs')
        from nomad.processing.data import Entry

        app_def = 'NXxrd_pan'
        nxdl_root, nxdl_f_path = get_nxdl_root_and_path(app_def)
        template = Template()
        generate_template_from_nxdl(nxdl_root, template)
        attr_dict = {}
        dataset_dict = {}
        populate_nx_dataset_and_attribute(
            archive=self.archive, attr_dict=attr_dict, dataset_dict=dataset_dict
        )
        for nx_path, dset in list(self._hdf5_datasets.items()) + list(
            dataset_dict.items()
        ):
            try:
                template[nx_path] = dset.data
            except KeyError:
                template['optional'][nx_path] = dset.data

            hdf_path = self._remove_nexus_annotations(nx_path)
            self._set_hdf5_reference(
                self.archive,
                dset.archive_path,
                f'/uploads/{self.archive.m_context.upload_id}/raw'
                f'/{self.data_file}#{hdf_path}',
            )
        for nx_path, attr_d in list(self._hdf5_attributes.items()) + list(
            attr_dict.items()
        ):
            for attr_k, attr_v in attr_d.items():
                if attr_k:
                    try:
                        template[f'{nx_path}/@{attr_k}'] = attr_v
                    except KeyError:
                        template['optional'][f'{nx_path}/@{attr_k}'] = attr_v
        try:
            nx_full_path = os.path.join(
                self.archive.m_context.raw_path(), self.data_file
            )
            pynxtools_writer(
                data=template, nxdl_f_path=nxdl_f_path, output_path=nx_full_path
            ).write()

            # entry_list = Entry.objects(
            #     upload_id=self.archive.m_context.upload_id, mainfile=self.data_file
            # )
            # if not entry_list:
            #     self.archive.m_context.process_updated_raw_file(self.data_file)

        except Exception as exc:
            if os.path.exists(self.data_file):
                os.remove(self.data_file)
            # keep only the file name; the original `rsplit(os.pathsep, 1)` split on
            # the PATH separator (':' on POSIX), not on the directory separator
            self.data_file = os.path.basename(self.data_file)
            raise Exception('NeXus file cannot be generated.') from exc

        # raise NotImplementedError('Method `write_nx_file` is not implemented.')
        # TODO add archive data to `hdf5_data_dict` before creating the nexus file. Use
        # `populate_hdf5_data_dict` method for each quantity that is needed in .nxs
        # file. Create a NeXus file with the data in `hdf5_data_dict`.
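
The reference passed to `_set_hdf5_reference` above couples an upload-relative
file location with an in-file HDF5 path after the `#` separator. A minimal
illustration of the string being built (values hypothetical):

upload_id = 'some_upload_id'  # hypothetical values, for illustration only
data_file = 'sample.nxs'
hdf_path = '/entry/experiment_result/intensity'

reference = f'/uploads/{upload_id}/raw/{data_file}#{hdf_path}'
# -> '/uploads/some_upload_id/raw/sample.nxs#/entry/experiment_result/intensity'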
106 changes: 106 additions & 0 deletions src/nomad_measurements/xrd/nx.py
@@ -16,6 +16,16 @@
# limitations under the License.
#

import copy
from typing import TYPE_CHECKING, Any, Optional

import pint
from pydantic import BaseModel, Field

if TYPE_CHECKING:
    from nomad.datamodel.datamodel import EntryArchive


NEXUS_DATASET_PATHS = [
    '/ENTRY[entry]/experiment_result/intensity',
    '/ENTRY[entry]/experiment_result/two_theta',
@@ -37,3 +47,99 @@
    '/ENTRY[entry]/INSTRUMENT[instrument]/SOURCE[source]/ratio_k_alphatwo_k_alphaone',
    '/ENTRY[entry]/INSTRUMENT[instrument]/SOURCE[source]/kbeta',
]
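
The paths above use the NeXus template notation `CLASS[instance_name]`, while the
HDF5 file itself only contains the instance names. A sketch of the stripping that
`_remove_nexus_annotations` (referenced in utils.py but not shown in this diff)
presumably performs:

import re

def remove_nexus_annotations(nx_path: str) -> str:
    # Assumed behavior: 'ENTRY[entry]' -> 'entry', plain segments kept as-is.
    return re.sub(r'[A-Za-z_]+\[([^\]]+)\]', r'\1', nx_path)

assert remove_nexus_annotations(
    '/ENTRY[entry]/INSTRUMENT[instrument]/SOURCE[source]/kbeta'
) == '/entry/instrument/source/kbeta'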


CONCEPT_MAP = {
    '/ENTRY[entry]/method': 'archive.data.method',
    '/ENTRY[entry]/measurement_type': 'archive.data.diffraction_method_name',
    '/ENTRY[entry]/INSTRUMENT[instrument]/DETECTOR[detector]/scan_axis': 'archive.data.results[0].scan_axis',
    '/ENTRY[entry]/experiment_config/count_time': 'archive.data.results[0].integration_time',
    '/ENTRY[entry]/INSTRUMENT[instrument]/SOURCE[source]/xray_tube_material': 'archive.data.xrd_settings.source.xray_tube_material',
    '/ENTRY[entry]/INSTRUMENT[instrument]/SOURCE[source]/xray_tube_current': 'archive.data.xrd_settings.source.xray_tube_current',
    '/ENTRY[entry]/INSTRUMENT[instrument]/SOURCE[source]/xray_tube_voltage': 'archive.data.xrd_settings.source.xray_tube_voltage',
    '/ENTRY[entry]/INSTRUMENT[instrument]/SOURCE[source]/k_alpha_one': 'archive.data.xrd_settings.source.kalpha_one',
    '/ENTRY[entry]/INSTRUMENT[instrument]/SOURCE[source]/k_alpha_two': 'archive.data.xrd_settings.source.kalpha_two',
    '/ENTRY[entry]/INSTRUMENT[instrument]/SOURCE[source]/ratio_k_alphatwo_k_alphaone': 'archive.data.xrd_settings.source.ratio_kalphatwo_kalphaone',
    '/ENTRY[entry]/INSTRUMENT[instrument]/SOURCE[source]/kbeta': 'archive.data.xrd_settings.source.kbeta',
    '/ENTRY[entry]/@default': 'experiment_result',
    '/ENTRY[entry]/experiment_result/@signal': 'intensity',
    '/ENTRY[entry]/definition': 'NXxrd_pan',
}


def walk_through_object(parent_obj, attr_chain):
    """
    Walk through the object until reaching the leaf.

    Args:
        parent_obj: A python object to start from, e.g. an EntryArchive.
        attr_chain: Dot-separated attribute chain,
            e.g. 'archive.data.xrd_settings.source.xray_tube_material'.
    """
    if parent_obj is None:
        return parent_obj

    if isinstance(attr_chain, str) and attr_chain.startswith('archive.'):
        parts = attr_chain.split('.')
        child_obj = None
        for part in parts[1:]:
            child_nm = part
            if '[' in child_nm:
                child_nm, index = child_nm.split('[')
                index = int(index[:-1])
                # indexed parts, e.g. 'results[0]', address repeated
                # sections, which always exist
                child_obj = getattr(parent_obj, child_nm)[index]
            else:
                child_obj = getattr(parent_obj, child_nm, None)
            parent_obj = child_obj

        return child_obj
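
A quick illustration of the traversal with a mocked archive (SimpleNamespace
stands in for the NOMAD archive sections, not a real NOMAD type):

from types import SimpleNamespace

archive = SimpleNamespace(
    data=SimpleNamespace(results=[SimpleNamespace(scan_axis='2theta')])
)

# 'results[0]' is parsed into attribute name 'results' and index 0
assert walk_through_object(archive, 'archive.data.results[0].scan_axis') == '2theta'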


def populate_nx_dataset_and_attribute(
    archive: 'EntryArchive', attr_dict: dict, dataset_dict: dict
):
    """Construct datasets and attributes for NeXus and populate the given dicts."""
    from nomad_measurements.utils import DatasetModel

    concept_map = copy.deepcopy(CONCEPT_MAP)
    for nx_path, arch_path in concept_map.items():
        if arch_path.startswith('archive.'):
            data = walk_through_object(archive, arch_path)
        else:
            data = arch_path  # default value

        dataset = DatasetModel(
            data=data,
        )

        if isinstance(data, pint.Quantity):
            if str(data.units) != 'unitless' and str(data.units):
                attr_tmp = {nx_path: dict(units=str(data.units))}
                attr_dict.update(attr_tmp)
                # attr_dict[nx_path].update({'units': str(data.units)})
            dataset.data = data.magnitude

        # split the attribute name (if any) from its parent path; a plain
        # `split('/', 1)` would always leave an empty left part because
        # every path starts with '/'
        l_part, r_part = nx_path.rsplit('/', 1)
        if r_part.startswith('@'):
            attr_dict[l_part] = {r_part.replace('@', ''): data}
        else:
            dataset_dict[nx_path] = dataset
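
The unit handling above splits a `pint.Quantity` into a raw magnitude for the
dataset and a separate `units` attribute. Standalone illustration:

import pint

ureg = pint.UnitRegistry()
voltage = 40.0 * ureg.kilovolt

assert voltage.magnitude == 40.0
assert str(voltage.units) == 'kilovolt'
# stored as: dataset.data = 40.0, attr_dict entry = {'units': 'kilovolt'}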


def add_group_and_return_child_group(child_group_name, parent_group=None, nxclass=None):
    """Create a group named `child_group_name` under `parent_group` and return it."""

    if (parts := child_group_name.split('[', 1)) and len(parts) > 1:
        # annotated name, e.g. 'ENTRY[entry]': class from the prefix,
        # group name from the brackets
        nxclass = parts[0]
        grp_name_tmp = parts[1].split(']')[0]
    else:
        grp_name_tmp = child_group_name
    parent_group.require_group(grp_name_tmp)
    child_group = parent_group[grp_name_tmp]
    if nxclass:
        child_group.attrs['NX_class'] = 'NX' + nxclass.lower()

    return child_group
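
`require_group` and `attrs` match the h5py API, so `parent_group` is presumably
an open h5py group or file. Usage might look like this (file name hypothetical):

import h5py

with h5py.File('demo.nxs', 'w') as f:
    # 'ENTRY[entry]' -> group 'entry' with attrs['NX_class'] = 'NXentry'
    entry = add_group_and_return_child_group('ENTRY[entry]', parent_group=f)
    # explicit nxclass when the name carries no annotation
    data = add_group_and_return_child_group(
        'experiment_result', parent_group=entry, nxclass='DATA'
    )
    assert entry.attrs['NX_class'] == 'NXentry'
    assert data.attrs['NX_class'] == 'NXdata'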
