diff --git a/pyproject.toml b/pyproject.toml
index d3f7b434..d063d438 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -35,9 +35,10 @@ maintainers = [
]
license = { file = "LICENSE" }
dependencies = [
- "nomad-lab>=1.3.6",
+ "nomad-lab>=1.3.6",
"xmltodict==0.13.0",
"fairmat-readers-xrd>=0.0.3",
+ "pynxtools==0.9.3",
"nomad-material-processing",
"fairmat-readers-transmission",
]
diff --git a/src/nomad_measurements/utils.py b/src/nomad_measurements/utils.py
index 250e030f..06ab7b16 100644
--- a/src/nomad_measurements/utils.py
+++ b/src/nomad_measurements/utils.py
@@ -15,12 +15,28 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
+import collections
+import copy
import os.path
+import re
from typing import (
TYPE_CHECKING,
+ Any,
+ Optional,
)
+import h5py
import numpy as np
+import pint
+from nomad.datamodel.hdf5 import HDF5Reference
+from nomad.units import ureg
+from pydantic import BaseModel, Field
+from pynxtools.dataconverter.helpers import (
+ generate_template_from_nxdl,
+ get_nxdl_root_and_path,
+)
+from pynxtools.dataconverter.template import Template
+from pynxtools.dataconverter.writer import Writer as pynxtools_writer
if TYPE_CHECKING:
from nomad.datamodel.data import (
@@ -34,6 +50,10 @@
)
+class NXFileGenerationError(Exception):
+ pass
+
+
def get_reference(upload_id: str, entry_id: str) -> str:
return f'../uploads/{upload_id}/archive/{entry_id}#data'
@@ -166,3 +186,443 @@ def get_bounding_range_2d(ax1, ax2):
]
return ax1_range, ax2_range
+
+
+class DatasetModel(BaseModel):
+ """
+ Pydantic model for the dataset to be stored in the HDF5 file.
+ """
+
+ data: Any = Field(description='The data to be stored in the HDF5 file.')
+ archive_path: Optional[str] = Field(
+ None, description='The path of the quantity in the NOMAD archive.'
+ )
+ internal_reference: Optional[bool] = Field(
+ False,
+ description='If True, an internal reference is set to an existing HDF5 '
+ 'dataset.',
+ )
+
+
+class HDF5Handler:
+ """
+ Class for handling the creation of auxiliary files to store big data arrays outside
+ the main archive file (e.g. HDF5, NeXus).
+ """
+
+ def __init__(
+ self,
+ filename: str,
+ archive: 'EntryArchive',
+ logger: 'BoundLogger',
+ nexus_dataset_map: dict = None,
+ ):
+ """
+ Initialize the handler.
+
+ Args:
+ filename (str): The name of the auxiliary file.
+ archive (EntryArchive): The NOMAD archive.
+ logger (BoundLogger): A structlog logger.
+ nexus_dataset_map (dict): The NeXus dataset map containing the nexus file
+ dataset paths and the corresponding archive paths.
+ """
+ if not filename.endswith(('.nxs', '.h5')):
+ raise ValueError('Only .h5 or .nxs files are supported.')
+
+ self.filename = filename
+ self.archive = archive
+ self.logger = logger
+
+ self.nexus = bool(nexus_dataset_map)
+ self.nexus_dataset_map = nexus_dataset_map
+ self.valid_dataset_paths = (
+ list(nexus_dataset_map.keys()) if nexus_dataset_map else []
+ )
+
+ self._hdf5_datasets = collections.OrderedDict()
+ self._hdf5_attributes = collections.OrderedDict()
+ self._hdf5_path_map = collections.OrderedDict()
+
+ def add_dataset(
+ self,
+ path: str,
+ params: dict,
+ validate_path: bool = True,
+ ):
+ """
+ Add a dataset to the HDF5 file. The dataset is written lazily to the file
+        when the `write_file` method is called. The `path` is validated against the
+ `valid_dataset_paths` if provided before adding the data.
+
+ `params` should be a dictionary containing `data`. Optionally,
+ it can also contain `archive_path` and `internal_reference`:
+ {
+ 'data': Any,
+ 'archive_path': str,
+ 'internal_reference': bool,
+ }
+
+ Args:
+ path (str): The dataset path to be used in the HDF5 file.
+ params (dict): The dataset parameters.
+ validate_path (bool): If True, the dataset path is validated.
+ """
+ if not params:
+ self.logger.warning(f'No params provided for path "{path}". Skipping.')
+ return
+
+ dataset = DatasetModel(
+ **params,
+ )
+ if dataset.data is None:
+ self.logger.warning(f'No data provided for the path "{path}". Skipping.')
+ return
+ if (
+ validate_path
+ and self.valid_dataset_paths
+ and path not in self.valid_dataset_paths
+ ):
+ self.logger.warning(f'Invalid dataset path "{path}". Skipping.')
+ return
+
+ # handle the pint.Quantity and add data
+ if isinstance(dataset.data, pint.Quantity):
+ self.add_attribute(
+ path=path,
+ params=dict(
+ units=str(dataset.data.units),
+ ),
+ )
+ dataset.data = dataset.data.magnitude
+
+ self._hdf5_datasets[path] = dataset
+ if dataset.archive_path:
+ self._hdf5_path_map[dataset.archive_path] = path
+
+ def add_attribute(
+ self,
+ path: str,
+ params: dict,
+ ):
+ """
+ Add an attribute to the dataset or group at the given path. The attribute is
+        written lazily to the file when the `write_file` method is called.
+
+ Args:
+ path (str): The dataset or group path in the HDF5 file.
+ params (dict): The attributes to be added.
+ """
+ if not params:
+ self.logger.warning(f'No params provided for attribute {path}.')
+ return
+ self._hdf5_attributes[path] = params
+
+ def read_dataset(self, path: str, is_archive_path: bool = False):
+ """
+ Returns the dataset at the given path. If the quantity has `units` as an
+        attribute, tries to return a `pint.Quantity`.
+        If the dataset is available in `self._hdf5_datasets`, it is returned directly.
+
+ Args:
+ path (str): The dataset path in the HDF5 file.
+ is_archive_path (bool): If True, the path is resolved from the archive path.
+ """
+ if path is None:
+ return
+ if is_archive_path and path in self._hdf5_path_map:
+ path = self._hdf5_path_map[path]
+ if path is None:
+ return
+ if '#' not in path:
+ file_path, dataset_path = None, path
+ else:
+ file_path, dataset_path = path.rsplit('#', 1)
+
+ # find path in the instance variables
+ value = None
+ if dataset_path in self._hdf5_datasets:
+ value = self._hdf5_datasets[dataset_path].data
+ if dataset_path in self._hdf5_attributes:
+ if units := self._hdf5_attributes[dataset_path].get('units'):
+ value *= ureg(units)
+ return value
+
+ # find path in the HDF5 file
+ if file_path:
+ file_name = file_path.rsplit('/raw/', 1)[1]
+ with h5py.File(self.archive.m_context.raw_file(file_name, 'rb')) as h5:
+ if dataset_path not in h5:
+ self.logger.warning(f'Dataset "{dataset_path}" not found.')
+ else:
+ value = h5[dataset_path][...]
+ try:
+ units = h5[dataset_path].attrs['units']
+ value *= ureg(units)
+ except KeyError:
+ pass
+ return value
+
+ return None
+
+ def write_file(self):
+ """
+ Method for creating an auxiliary file to store big data arrays outside the
+ main archive file (e.g. HDF5, NeXus).
+ """
+ if self.nexus:
+ try:
+ self._write_nx_file()
+ except Exception as e:
+ self.nexus = False
+ self.logger.warning(
+ f"""NeXusFileGenerationError: Encountered '{e}' error while creating
+ nexus file. Creating h5 file instead."""
+ )
+ self._write_hdf5_file()
+ else:
+ self._write_hdf5_file()
+
+ self.set_hdf5_references()
+
+ def _write_nx_file(self):
+ """
+ Method for creating a NeXus file. Additional data from the archive is added
+ to the `hdf5_data_dict` before creating the nexus file. This provides a NeXus
+ view of the data in addition to storing array data.
+ """
+
+ app_def = self.nexus_dataset_map.get('/ENTRY[entry]/definition')
+ nxdl_root, nxdl_f_path = get_nxdl_root_and_path(app_def)
+ template = Template()
+ generate_template_from_nxdl(nxdl_root, template)
+ attr_dict = {}
+ dataset_dict = {}
+ self.populate_nx_dataset_and_attribute(
+ attr_dict=attr_dict, dataset_dict=dataset_dict
+ )
+ for nx_path, dset_original in list(self._hdf5_datasets.items()) + list(
+ dataset_dict.items()
+ ):
+ dset = copy.deepcopy(dset_original)
+ if dset.internal_reference:
+ # convert to the nexus type link
+ dset.data = {'link': self._remove_nexus_annotations(dset.data)}
+
+ try:
+ template[nx_path] = dset.data
+ except KeyError:
+ template['optional'][nx_path] = dset.data
+
+ for nx_path, attr_d in list(self._hdf5_attributes.items()) + list(
+ attr_dict.items()
+ ):
+ for attr_k, attr_v in attr_d.items():
+ if attr_v != 'dimensionless' and attr_v:
+ try:
+ template[f'{nx_path}/@{attr_k}'] = attr_v
+ except KeyError:
+ template['optional'][f'{nx_path}/@{attr_k}'] = attr_v
+
+ nx_full_file_path = os.path.join(
+ self.archive.m_context.raw_path(), self.filename
+ )
+
+ pynxtools_writer(
+ data=template, nxdl_f_path=nxdl_f_path, output_path=nx_full_file_path
+ ).write()
+ self.archive.m_context.process_updated_raw_file(
+ self.filename, allow_modify=True
+ )
+
+ def _write_hdf5_file(self): # noqa: PLR0912
+ """
+ Method for creating an HDF5 file.
+ """
+ if self.filename.endswith('.nxs'):
+ self.filename = self.filename.replace('.nxs', '.h5')
+ if not self._hdf5_datasets and not self._hdf5_attributes:
+ return
+ # remove the nexus annotations from the dataset paths if any
+ tmp_dict = {}
+ for key, value in self._hdf5_datasets.items():
+ new_key = self._remove_nexus_annotations(key)
+ tmp_dict[new_key] = value
+ self._hdf5_datasets = tmp_dict
+ tmp_dict = {}
+ for key, value in self._hdf5_attributes.items():
+ tmp_dict[self._remove_nexus_annotations(key)] = value
+ self._hdf5_attributes = tmp_dict
+
+ # create the HDF5 file
+ mode = 'r+b' if self.archive.m_context.raw_path_exists(self.filename) else 'wb'
+ with h5py.File(self.archive.m_context.raw_file(self.filename, mode), 'a') as h5:
+ for key, value in self._hdf5_datasets.items():
+ data = value.data
+ if value.internal_reference:
+ # resolve the internal reference
+ try:
+ data = h5[self._remove_nexus_annotations(value.data)]
+ except KeyError:
+ self.logger.warning(
+ f'Internal reference "{value.data}" not found. Skipping.'
+ )
+ continue
+
+ group_name, dataset_name = key.rsplit('/', 1)
+ group = h5.require_group(group_name)
+
+ if key in h5:
+ # remove the existing dataset if any
+ del h5[key]
+
+ if value.internal_reference:
+ # create a hard link to the existing dataset
+ group[dataset_name] = data
+ else:
+ # create the dataset
+ group.create_dataset(
+ name=dataset_name,
+ data=data,
+ )
+ for key, value in self._hdf5_attributes.items():
+ if key in h5:
+ h5[key].attrs.update(value)
+ else:
+ self.logger.warning(f'Path "{key}" not found to add attribute.')
+
+ def set_hdf5_references(self):
+ """
+ Method for adding the HDF5 references to the archive quantities.
+ """
+ for key, value in self._hdf5_datasets.items():
+ if value.archive_path:
+ reference = self._remove_nexus_annotations(key)
+ self._set_hdf5_reference(
+ self.archive,
+ value.archive_path,
+ f'/uploads/{self.archive.m_context.upload_id}/raw'
+ f'/{self.filename}#{reference}',
+ )
+
+ def populate_nx_dataset_and_attribute(self, attr_dict: dict, dataset_dict: dict):
+ """Construct datasets and attributes for nexus and populate.
+
+ The common hdf5 datasets and attributes will be extended with
+ nexus specific concepts which are not part of the common hdf5 file
+        such as the `signal` attribute of NXdata.
+ """
+
+ for nx_path, arch_path in self.nexus_dataset_map.items():
+ if nx_path in self._hdf5_datasets or nx_path in self._hdf5_attributes:
+ continue
+ if arch_path.startswith('archive.'):
+ data = resolve_path(self.archive, arch_path.split('archive.', 1)[1])
+ else:
+ data = arch_path # default value
+
+ dataset = DatasetModel(data=data)
+
+ if (
+ isinstance(data, pint.Quantity)
+ and str(data.units) != 'dimensionless'
+ and str(data.units)
+ ):
+ attr_tmp = {nx_path: dict(units=str(data.units))}
+ attr_dict |= attr_tmp
+ dataset.data = data.magnitude
+
+ l_part, r_part = nx_path.rsplit('/', 1)
+ if r_part.startswith('@'):
+ attr_dict[l_part] = {r_part.replace('@', ''): data}
+ else:
+ dataset_dict[nx_path] = dataset
+
+ @staticmethod
+ def _remove_nexus_annotations(path: str) -> str:
+ """
+ Remove the nexus related annotations from the dataset path.
+ For e.g.,
+ '/ENTRY[entry]/experiment_result/intensity' ->
+ '/entry/experiment_result/intensity'
+
+ Args:
+ path (str): The dataset path with nexus annotations.
+
+ Returns:
+ str: The dataset path without nexus annotations.
+ """
+ if not path:
+ return path
+
+ pattern = r'.*\[.*\]'
+ return ''.join(
+ (
+ '/' + part.split('[')[0].strip().lower()
+ if re.match(pattern, part)
+ else f'/{part}'
+ )
+ for part in path.split('/')[1:]
+ )
+
+ @staticmethod
+ def _set_hdf5_reference(
+ section: 'ArchiveSection' = None, path: str = None, ref: str = None
+ ):
+ """
+ Method for setting a HDF5Reference quantity in a section.
+ For example, one can set the reference for a quantity path like
+ `data.results[0].intensity`.
+ In case the section is not initialized, the method returns without setting
+ the reference.
+
+ Args:
+ section (Section): The NOMAD section containing the quantity.
+ path (str): The path to the quantity.
+ ref (str): The reference to the HDF5 dataset.
+ """
+ if not section or not path or not ref:
+ return
+
+ section_path, quantity_name = path.rsplit('.', 1)
+ resolved_section = resolve_path(section, section_path)
+
+ if resolved_section and isinstance(
+ resolved_section.m_get_quantity_definition(quantity_name).type,
+ HDF5Reference,
+ ):
+ resolved_section.m_set(quantity_name, ref)
+
+
+def resolve_path(section: 'ArchiveSection', path: str, logger: 'BoundLogger' = None):
+ """
+ Resolves the attribute path within the given NOMAD section.
+
+ Args:
+ section (ArchiveSection): The NOMAD section.
+ path (str): The dot-separated path to the attribute.
+ logger (BoundLogger): A structlog logger.
+
+ Returns:
+ The resolved section or attribute or None if not found.
+ """
+ attr = section
+ parts = path.split('.')
+ try:
+ for part in parts:
+ attr_path = part
+ if re.match(r'.*\[.*\]', attr_path):
+ attr_path, index = part[:-1].split('[')
+ index = int(index)
+ else:
+ index = None
+ attr = attr.m_get(attr_path, index=index)
+ except (KeyError, ValueError, AttributeError) as e:
+ if logger:
+ logger.error(
+ f'Unable to resolve part "{part}" of the given path "{path}". '
+ f'Encountered error "{e}".'
+ )
+ return None
+
+ return attr
diff --git a/src/nomad_measurements/xrd/nx.py b/src/nomad_measurements/xrd/nx.py
new file mode 100644
index 00000000..9b0d6944
--- /dev/null
+++ b/src/nomad_measurements/xrd/nx.py
@@ -0,0 +1,69 @@
+#
+# Copyright The NOMAD Authors.
+#
+# This file is part of NOMAD. See https://nomad-lab.eu for further info.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+The following connects the nexus file paths to the archive paths.
+The nexus file paths come from the nexus_definitions available at:
+https://github.com/FAIRmat-NFDI/nexus_definitions/ in the following file:
+`contributed_definitions/NXxrd_pan.nxdl.xml`.
+The archive paths are the paths in the NOMAD archive defined in the class:
+`nomad_measurements.xrd.schema.ELNXRayDiffraction`.
+"""
+
+NEXUS_DATASET_MAP = {
+ '/ENTRY[entry]/@default': 'experiment_result',
+ '/ENTRY[entry]/definition': 'NXxrd_pan',
+ '/ENTRY[entry]/experiment_result/intensity': 'archive.data.results[0].intensity',
+ '/ENTRY[entry]/experiment_result/two_theta': 'archive.data.results[0].two_theta',
+ '/ENTRY[entry]/experiment_result/omega': 'archive.data.results[0].omega',
+ '/ENTRY[entry]/experiment_result/chi': 'archive.data.results[0].chi',
+ '/ENTRY[entry]/experiment_result/phi': 'archive.data.results[0].phi',
+ '/ENTRY[entry]/experiment_result/q_norm': 'archive.data.results[0].q_norm',
+ '/ENTRY[entry]/experiment_result/q_parallel': 'archive.data.results[0].q_parallel',
+ '/ENTRY[entry]/experiment_result/q_perpendicular': (
+ 'archive.data.results[0].q_perpendicular'
+ ),
+ '/ENTRY[entry]/method': 'archive.data.method',
+ '/ENTRY[entry]/measurement_type': 'archive.data.diffraction_method_name',
+ '/ENTRY[entry]/experiment_result/@signal': 'intensity',
+ '/ENTRY[entry]/experiment_config/count_time': 'archive.data.results[0].count_time',
+ '/ENTRY[entry]/INSTRUMENT[instrument]/DETECTOR[detector]/scan_axis': (
+ 'archive.data.results[0].scan_axis'
+ ),
+ '/ENTRY[entry]/INSTRUMENT[instrument]/SOURCE[source]/xray_tube_material': (
+ 'archive.data.xrd_settings.source.xray_tube_material'
+ ),
+ '/ENTRY[entry]/INSTRUMENT[instrument]/SOURCE[source]/xray_tube_current': (
+ 'archive.data.xrd_settings.source.xray_tube_current'
+ ),
+ '/ENTRY[entry]/INSTRUMENT[instrument]/SOURCE[source]/xray_tube_voltage': (
+ 'archive.data.xrd_settings.source.xray_tube_voltage'
+ ),
+ '/ENTRY[entry]/INSTRUMENT[instrument]/SOURCE[source]/k_alpha_one': (
+ 'archive.data.xrd_settings.source.kalpha_one'
+ ),
+ '/ENTRY[entry]/INSTRUMENT[instrument]/SOURCE[source]/k_alpha_two': (
+ 'archive.data.xrd_settings.source.kalpha_two'
+ ),
+ '/ENTRY[entry]/INSTRUMENT[instrument]/SOURCE[source]/ratio_k_alphatwo_k_alphaone': (
+ 'archive.data.xrd_settings.source.ratio_kalphatwo_kalphaone'
+ ),
+ '/ENTRY[entry]/INSTRUMENT[instrument]/SOURCE[source]/kbeta': (
+ 'archive.data.xrd_settings.source.kbeta'
+ ),
+}
diff --git a/src/nomad_measurements/xrd/schema.py b/src/nomad_measurements/xrd/schema.py
index c6a2e3df..fc0a2b40 100644
--- a/src/nomad_measurements/xrd/schema.py
+++ b/src/nomad_measurements/xrd/schema.py
@@ -15,6 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
+
from typing import (
TYPE_CHECKING,
Any,
@@ -22,19 +23,27 @@
)
import numpy as np
+import pint
import plotly.express as px
from fairmat_readers_xrd import (
read_bruker_brml,
read_panalytical_xrdml,
read_rigaku_rasx,
)
+from nomad.config import config
from nomad.datamodel.data import (
ArchiveSection,
EntryData,
)
+from nomad.datamodel.hdf5 import (
+ HDF5Reference,
+)
from nomad.datamodel.metainfo.annotations import (
ELNAnnotation,
ELNComponentEnum,
+ Filter,
+ H5WebAnnotation,
+ SectionProperties,
)
from nomad.datamodel.metainfo.basesections import (
CompositeSystemReference,
@@ -42,10 +51,7 @@
MeasurementResult,
ReadableIdentifiers,
)
-from nomad.datamodel.metainfo.plot import (
- PlotlyFigure,
- PlotSection,
-)
+from nomad.datamodel.metainfo.plot import PlotlyFigure
from nomad.datamodel.results import (
DiffractionPattern,
MeasurementMethod,
@@ -67,72 +73,35 @@
from nomad_measurements.general import (
NOMADMeasurementsCategory,
)
-from nomad_measurements.utils import get_bounding_range_2d, merge_sections
+from nomad_measurements.utils import (
+ HDF5Handler,
+ get_bounding_range_2d,
+ get_entry_id_from_file_name,
+ get_reference,
+ merge_sections,
+)
+
+# from nomad_measurements.xrd.nx import NEXUS_DATASET_MAP
if TYPE_CHECKING:
- import pint
from nomad.datamodel.datamodel import (
EntryArchive,
)
- from pynxtools.dataconverter.template import Template
from structlog.stdlib import (
BoundLogger,
)
-from nomad.config import config
configuration = config.get_plugin_entry_point('nomad_measurements.xrd:schema')
m_package = SchemaPackage(aliases=['nomad_measurements.xrd.parser.parser'])
-def populate_nexus_subsection(**kwargs):
- raise NotImplementedError
-
-
-def handle_nexus_subsection(
- xrd_template: 'Template',
- nexus_out: str,
- archive: 'EntryArchive',
- logger: 'BoundLogger',
-):
- """
- Function for populating the NeXus section from the xrd_template.
-
- Args:
- xrd_template (Template): The xrd data in a NeXus Template.
- nexus_out (str): The name of the optional NeXus output file.
- archive (EntryArchive): The archive containing the section.
- logger (BoundLogger): A structlog logger.
- """
- nxdl_name = 'NXxrd_pan'
- if nexus_out:
- if not nexus_out.endswith('.nxs'):
- nexus_out = nexus_out + '.nxs'
- populate_nexus_subsection(
- template=xrd_template,
- app_def=nxdl_name,
- archive=archive,
- logger=logger,
- output_file_path=nexus_out,
- on_temp_file=False,
- )
- else:
- populate_nexus_subsection(
- template=xrd_template,
- app_def=nxdl_name,
- archive=archive,
- logger=logger,
- output_file_path=nexus_out,
- on_temp_file=True,
- )
-
-
def calculate_two_theta_or_q(
- wavelength: 'pint.Quantity',
- q: 'pint.Quantity' = None,
- two_theta: 'pint.Quantity' = None,
-) -> tuple['pint.Quantity', 'pint.Quantity']:
+ wavelength: pint.Quantity,
+ q: pint.Quantity = None,
+ two_theta: pint.Quantity = None,
+) -> tuple[pint.Quantity, pint.Quantity]:
"""
Calculate the two-theta array from the scattering vector (q) or vice-versa,
given the wavelength of the X-ray source.
@@ -154,10 +123,10 @@ def calculate_two_theta_or_q(
return q, two_theta
-def calculate_q_vectors_RSM(
- wavelength: 'pint.Quantity',
- two_theta: 'pint.Quantity',
- omega: 'pint.Quantity',
+def calculate_q_vectors_rsm(
+ wavelength: pint.Quantity,
+ two_theta: pint.Quantity,
+ omega: pint.Quantity,
):
"""
Calculate the q-vectors for RSM scans in coplanar configuration.
@@ -306,6 +275,237 @@ class XRDSettings(ArchiveSection):
source = SubSection(section_def=XRayTubeSource)
+class PlotIntensity(ArchiveSection):
+ """
+    Section for plotting the intensity over 2-theta. A separate sub-section allows
+    creating a separate group in the `.h5` file. Attributes are added to the group to
+    generate the plot.
+ """
+
+ m_def = Section(
+ a_h5web=H5WebAnnotation(
+ axes=['two_theta', 'omega', 'phi', 'chi'], signal='intensity'
+ )
+ )
+ intensity = Quantity(
+ type=HDF5Reference,
+ description='The count at each 2-theta value, dimensionless',
+ )
+ two_theta = Quantity(
+ type=HDF5Reference,
+ description='The 2-theta range of the diffractogram',
+ )
+ omega = Quantity(
+ type=HDF5Reference,
+ description='The omega range of the diffractogram',
+ )
+ phi = Quantity(
+ type=HDF5Reference,
+ description='The phi range of the diffractogram',
+ )
+ chi = Quantity(
+ type=HDF5Reference,
+ description='The chi range of the diffractogram',
+ )
+
+ def normalize(self, archive, logger):
+ super().normalize(archive, logger)
+ prefix = '/ENTRY[entry]/experiment_result'
+ try:
+ hdf5_handler = self.m_parent.m_parent.hdf5_handler
+ assert isinstance(hdf5_handler, HDF5Handler)
+ except (AttributeError, AssertionError):
+ return
+
+ hdf5_handler.add_dataset(
+ path=f'{prefix}/plot_intensity/two_theta',
+ params=dict(
+ data=f'{prefix}/two_theta',
+ archive_path='data.results[0].plot_intensity.two_theta',
+ internal_reference=True,
+ ),
+ validate_path=False,
+ )
+ hdf5_handler.add_dataset(
+ path=f'{prefix}/plot_intensity/intensity',
+ params=dict(
+ data=f'{prefix}/intensity',
+ archive_path='data.results[0].plot_intensity.intensity',
+ internal_reference=True,
+ ),
+ validate_path=False,
+ )
+ hdf5_handler.add_attribute(
+ path=f'{prefix}/plot_intensity',
+ params=dict(
+ axes='two_theta',
+ signal='intensity',
+ NX_class='NXdata',
+ ),
+ )
+ if isinstance(self.m_parent, XRDResult1DHDF5):
+ return
+
+ for var_axis in ['omega', 'phi', 'chi']:
+ var_axis_data = hdf5_handler.read_dataset(
+ path=f'data.results[0].{var_axis}',
+ is_archive_path=True,
+ )
+ if var_axis_data is not None:
+ hdf5_handler.add_dataset(
+ path=f'{prefix}/plot_intensity/{var_axis}',
+ params=dict(
+ data=f'{prefix}/{var_axis}',
+ archive_path=f'data.results[0].plot_intensity.{var_axis}',
+ internal_reference=True,
+ ),
+ validate_path=False,
+ )
+ hdf5_handler.add_attribute(
+ path=f'{prefix}/plot_intensity',
+ params=dict(
+ axes=[var_axis, 'two_theta'],
+ signal='intensity',
+ NX_class='NXdata',
+ ),
+ )
+ break
+
+
+class PlotIntensityScatteringVector(ArchiveSection):
+ """
+    Section for plotting the intensity over scattering vector. A separate sub-section
+    allows creating a separate group in the `.h5` file. Attributes are added to the
+    group to generate the plot.
+ """
+
+ m_def = Section(
+ a_h5web=H5WebAnnotation(
+ axes=['q_parallel', 'q_perpendicular', 'q_norm'], signal='intensity'
+ )
+ )
+ intensity = Quantity(
+ type=HDF5Reference,
+ description="""
+ The count at each q value. In case of RSM, it contains interpolated values of
+ `intensity` at regularized grid of `q` vectors.
+ """,
+ )
+ q_norm = Quantity(
+ type=HDF5Reference,
+ description='The q range of the diffractogram',
+ )
+ q_parallel = Quantity(
+ type=HDF5Reference,
+ description='The regularized grid of `q_parallel` range for plotting.',
+ )
+ q_perpendicular = Quantity(
+ type=HDF5Reference,
+ description='The regularized grid of `q_perpendicular` range for plotting.',
+ )
+
+ def normalize(self, archive, logger):
+ super().normalize(archive, logger)
+ prefix = '/ENTRY[entry]/experiment_result'
+ try:
+ hdf5_handler = self.m_parent.m_parent.hdf5_handler
+ assert isinstance(hdf5_handler, HDF5Handler)
+ except (AttributeError, AssertionError):
+ return
+
+ intensity = hdf5_handler.read_dataset(
+ path='data.results[0].intensity',
+ is_archive_path=True,
+ )
+ q_norm = hdf5_handler.read_dataset(
+ path='data.results[0].q_norm',
+ is_archive_path=True,
+ )
+ q_parallel = hdf5_handler.read_dataset(
+ path='data.results[0].q_parallel',
+ is_archive_path=True,
+ )
+ q_perpendicular = hdf5_handler.read_dataset(
+ path='data.results[0].q_perpendicular',
+ is_archive_path=True,
+ )
+
+ if q_norm is not None:
+ hdf5_handler.add_dataset(
+ path=f'{prefix}/plot_intensity_scattering_vector/intensity',
+ params=dict(
+ data=f'{prefix}/intensity',
+ archive_path='data.results[0].plot_intensity_scattering_vector.intensity',
+ internal_reference=True,
+ ),
+ validate_path=False,
+ )
+ hdf5_handler.add_dataset(
+ path=f'{prefix}/plot_intensity_scattering_vector/q_norm',
+ params=dict(
+ data=f'{prefix}/q_norm',
+ archive_path='data.results[0].plot_intensity_scattering_vector.q_norm',
+ internal_reference=True,
+ ),
+ validate_path=False,
+ )
+ hdf5_handler.add_attribute(
+ path=f'{prefix}/plot_intensity_scattering_vector',
+ params=dict(
+ axes='q_norm',
+ signal='intensity',
+ NX_class='NXdata',
+ ),
+ )
+ elif q_parallel is not None and q_perpendicular is not None:
+ # q_vectors lead to irregular grid
+ # generate a regular grid using interpolation
+ x = q_parallel.to('1/angstrom').magnitude.flatten()
+ y = q_perpendicular.to('1/angstrom').magnitude.flatten()
+ x_regular = np.linspace(x.min(), x.max(), intensity.shape[0])
+ y_regular = np.linspace(y.min(), y.max(), intensity.shape[1])
+ x_grid, y_grid = np.meshgrid(x_regular, y_regular)
+ z_interpolated = griddata(
+ points=(x, y),
+ values=intensity.flatten(),
+ xi=(x_grid, y_grid),
+ method='linear',
+ fill_value=intensity.min(),
+ )
+ hdf5_handler.add_dataset(
+ path=f'{prefix}/plot_intensity_scattering_vector/q_parallel',
+ params=dict(
+ data=x_regular,
+ archive_path='data.results[0].plot_intensity_scattering_vector.q_parallel',
+ ),
+ validate_path=False,
+ )
+ hdf5_handler.add_dataset(
+ path=f'{prefix}/plot_intensity_scattering_vector/q_perpendicular',
+ params=dict(
+ data=y_regular,
+ archive_path='data.results[0].plot_intensity_scattering_vector.q_perpendicular',
+ ),
+ validate_path=False,
+ )
+ hdf5_handler.add_dataset(
+ path=f'{prefix}/plot_intensity_scattering_vector/intensity',
+ params=dict(
+ data=z_interpolated,
+ archive_path='data.results[0].plot_intensity_scattering_vector.intensity',
+ ),
+ validate_path=False,
+ )
+ hdf5_handler.add_attribute(
+ path=f'{prefix}/plot_intensity_scattering_vector',
+ params=dict(
+ axes=['q_perpendicular', 'q_parallel'],
+ signal='intensity',
+ NX_class='NXdata',
+ ),
+ )
+
+
class XRDResult(MeasurementResult):
"""
Section containing the result of an X-ray diffraction scan.
@@ -384,9 +584,17 @@ class XRDResult1D(XRDResult):
Section containing the result of a 1D X-ray diffraction scan.
"""
- m_def = Section()
+ m_def = Section(
+ a_eln=ELNAnnotation(
+ properties=SectionProperties(
+ visible=Filter(
+ exclude=['array_index'],
+ ),
+ ),
+ )
+ )
- def generate_plots(self, archive: 'EntryArchive', logger: 'BoundLogger'):
+ def generate_plots(self):
"""
Plot the 1D diffractogram.
@@ -569,7 +777,15 @@ class XRDResultRSM(XRDResult):
Section containing the result of a Reciprocal Space Map (RSM) scan.
"""
- m_def = Section()
+ m_def = Section(
+ a_eln=ELNAnnotation(
+ properties=SectionProperties(
+ visible=Filter(
+ exclude=['array_index'],
+ ),
+ ),
+ )
+ )
q_parallel = Quantity(
type=np.dtype(np.float64),
shape=['*', '*'],
@@ -589,7 +805,7 @@ class XRDResultRSM(XRDResult):
description='The count at each position, dimensionless',
)
- def generate_plots(self, archive: 'EntryArchive', logger: 'BoundLogger'):
+ def generate_plots(self):
"""
Plot the 2D RSM diffractogram.
@@ -771,7 +987,7 @@ def normalize(self, archive: 'EntryArchive', logger: 'BoundLogger'):
self[var_axis] is not None
and len(np.unique(self[var_axis].magnitude)) > 1
):
- self.q_parallel, self.q_perpendicular = calculate_q_vectors_RSM(
+ self.q_parallel, self.q_perpendicular = calculate_q_vectors_rsm(
wavelength=self.source_peak_wavelength,
two_theta=self.two_theta * np.ones_like(self.intensity),
omega=self[var_axis],
@@ -779,139 +995,783 @@ def normalize(self, archive: 'EntryArchive', logger: 'BoundLogger'):
break
-class XRayDiffraction(Measurement):
+class XRDResult1DHDF5(XRDResult):
"""
- Generic X-ray diffraction measurement.
+ Section containing the result of a 1D X-ray diffraction scan.
"""
- m_def = Section()
- method = Quantity(
- type=str,
- default='X-Ray Diffraction (XRD)',
+ m_def = Section(
+ a_eln=ELNAnnotation(
+ properties=SectionProperties(
+ visible=Filter(
+ exclude=['array_index'],
+ ),
+ ),
+ )
)
- xrd_settings = SubSection(
- section_def=XRDSettings,
+ intensity = Quantity(
+ type=HDF5Reference,
+ description='The count at each 2-theta value, dimensionless',
+ shape=[],
)
- diffraction_method_name = Quantity(
- type=MEnum(
- [
- 'Powder X-Ray Diffraction (PXRD)',
- 'Single Crystal X-Ray Diffraction (SCXRD)',
- 'High-Resolution X-Ray Diffraction (HRXRD)',
- 'Small-Angle X-Ray Scattering (SAXS)',
- 'X-Ray Reflectivity (XRR)',
- 'Grazing Incidence X-Ray Diffraction (GIXRD)',
- 'Reciprocal Space Mapping (RSM)',
- ]
- ),
- description="""
- The diffraction method used to obtain the diffraction pattern.
- | X-ray Diffraction Method | Description |
- |------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
- | **Powder X-ray Diffraction (PXRD)** | The term "powder" refers more to the random orientation of small crystallites than to the physical form of the sample. Can be used with non-powder samples if they present random crystallite orientations. |
- | **Single Crystal X-ray Diffraction (SCXRD)** | Used for determining the atomic structure of a single crystal. |
- | **High-Resolution X-ray Diffraction (HRXRD)** | A technique typically used for detailed characterization of epitaxial thin films using precise diffraction measurements. |
- | **Small-Angle X-ray Scattering (SAXS)** | Used for studying nanostructures in the size range of 1-100 nm. Provides information on particle size, shape, and distribution. |
- | **X-ray Reflectivity (XRR)** | Used to study thin film layers, interfaces, and multilayers. Provides info on film thickness, density, and roughness. |
- | **Grazing Incidence X-ray Diffraction (GIXRD)** | Primarily used for the analysis of thin films with the incident beam at a fixed shallow angle. |
- | **Reciprocal Space Mapping (RSM)** | High-resolution XRD method to measure diffracted intensity in a 2-dimensional region of reciprocal space. Provides information about the real-structure (lattice mismatch, domain structure, stress and defects) in single-crystalline and epitaxial samples.|
- """, # noqa: E501
+ two_theta = Quantity(
+ type=HDF5Reference,
+ description='The 2-theta range of the diffractogram',
+ shape=[],
+ )
+ q_norm = Quantity(
+ type=HDF5Reference,
+ description='The norm of scattering vector *Q* of the diffractogram',
+ shape=[],
+ )
+ omega = Quantity(
+ type=HDF5Reference,
+ description='The omega range of the diffractogram',
+ shape=[],
+ )
+ phi = Quantity(
+ type=HDF5Reference,
+ description='The phi range of the diffractogram',
+ shape=[],
+ )
+ chi = Quantity(
+ type=HDF5Reference,
+ description='The chi range of the diffractogram',
+ shape=[],
+ )
+ integration_time = Quantity(
+ type=HDF5Reference,
+ description='Integration time per channel',
+ shape=[],
+ )
+ plot_intensity = SubSection(section_def=PlotIntensity)
+ plot_intensity_scattering_vector = SubSection(
+ section_def=PlotIntensityScatteringVector
)
- results = Measurement.results.m_copy()
- results.section_def = XRDResult
- def normalize(self, archive: 'EntryArchive', logger: 'BoundLogger'):
+ def generate_plots(self):
"""
- The normalize function of the `XRayDiffraction` section.
+ Plot the 1D diffractogram.
Args:
archive (EntryArchive): The archive containing the section that is being
normalized.
logger (BoundLogger): A structlog logger.
- """
- super().normalize(archive, logger)
- if (
- self.xrd_settings is not None
- and self.xrd_settings.source is not None
- and self.xrd_settings.source.kalpha_one is not None
- ):
- for result in self.results:
- if result.source_peak_wavelength is None:
- result.source_peak_wavelength = self.xrd_settings.source.kalpha_one
- result.normalize(archive, logger)
- if not archive.results:
- archive.results = Results()
- if not archive.results.properties:
- archive.results.properties = Properties()
- if not archive.results.properties.structural:
- diffraction_patterns = []
- for result in self.results:
- if len(result.intensity.shape) == 1:
- diffraction_patterns.append(
- DiffractionPattern(
- incident_beam_wavelength=result.source_peak_wavelength,
- two_theta_angles=result.two_theta,
- intensity=result.intensity,
- q_vector=result.q_norm,
- )
- )
- archive.results.properties.structural = StructuralProperties(
- diffraction_pattern=diffraction_patterns
- )
- if not archive.results.method:
- archive.results.method = Method(
- method_name='XRD',
- measurement=MeasurementMethod(
- xrd=XRDMethod(diffraction_method_name=self.diffraction_method_name)
- ),
- )
+ Returns:
+ (dict, dict): line_linear, line_log
+ """
+ plots = []
-class ELNXRayDiffraction(XRayDiffraction, EntryData, PlotSection):
- """
- Example section for how XRayDiffraction can be implemented with a general reader for
- common XRD file types.
- """
+ try:
+ hdf5_handler = self.m_parent.hdf5_handler
+ assert isinstance(hdf5_handler, HDF5Handler)
+ except (AttributeError, AssertionError):
+ return plots
- m_def = Section(
- categories=[NOMADMeasurementsCategory],
- label='X-Ray Diffraction (XRD)',
- a_eln=ELNAnnotation(
- lane_width='800px',
- hide=['generate_nexus_file'],
- ),
- a_template={
- 'measurement_identifiers': {},
- },
- )
- data_file = Quantity(
- type=str,
- description='Data file containing the diffractogram',
- a_eln=ELNAnnotation(
- component=ELNComponentEnum.FileEditQuantity,
- ),
- )
- measurement_identifiers = SubSection(
- section_def=ReadableIdentifiers,
- )
- diffraction_method_name = XRayDiffraction.diffraction_method_name.m_copy()
- diffraction_method_name.m_annotations['eln'] = ELNAnnotation(
- component=ELNComponentEnum.EnumEditQuantity,
- )
- generate_nexus_file = Quantity(
- type=bool,
- description='Whether or not to generate a NeXus output file (if possible).',
- a_eln=ELNAnnotation(
- component=ELNComponentEnum.BoolEditQuantity,
- label='Generate NeXus file',
- ),
- )
+ two_theta = hdf5_handler.read_dataset(
+ path='data.results[0].two_theta',
+ is_archive_path=True,
+ )
+ intensity = hdf5_handler.read_dataset(
+ path='data.results[0].intensity',
+ is_archive_path=True,
+ )
+ if two_theta is None or intensity is None:
+ return plots
- def get_read_write_functions(self) -> tuple[Callable, Callable]:
- """
+ x = two_theta.to('degree').magnitude
+ y = intensity.magnitude
+ fig_line_linear = px.line(
+ x=x,
+ y=y,
+ )
+ fig_line_linear.update_layout(
+ title={
+ 'text': 'Intensity over 2θ (linear scale)',
+ 'x': 0.5,
+ 'xanchor': 'center',
+ },
+ xaxis_title='2θ (°)',
+ yaxis_title='Intensity',
+ xaxis=dict(
+ fixedrange=False,
+ ),
+ yaxis=dict(
+ fixedrange=False,
+ ),
+ template='plotly_white',
+ hovermode='closest',
+ hoverlabel=dict(
+ bgcolor='white',
+ ),
+ dragmode='zoom',
+ width=600,
+ height=600,
+ )
+ fig_line_linear.update_traces(
+ hovertemplate='Intensity: %{y:.2f}<br>2θ: %{x}°',
+ )
+ plot_json = fig_line_linear.to_plotly_json()
+ plot_json['config'] = dict(
+ scrollZoom=False,
+ )
+ plots.append(
+ PlotlyFigure(
+ label='Intensity over 2θ (linear scale)',
+ index=1,
+ figure=plot_json,
+ )
+ )
+
+ fig_line_log = px.line(
+ x=x,
+ y=y,
+ log_y=True,
+ )
+ fig_line_log.update_layout(
+ title={
+ 'text': 'Intensity over 2θ (log scale)',
+ 'x': 0.5,
+ 'xanchor': 'center',
+ },
+ xaxis_title='2θ (°)',
+ yaxis_title='Intensity',
+ xaxis=dict(
+ fixedrange=False,
+ ),
+ yaxis=dict(
+ fixedrange=False,
+ ),
+ template='plotly_white',
+ hovermode='closest',
+ hoverlabel=dict(
+ bgcolor='white',
+ ),
+ dragmode='zoom',
+ width=600,
+ height=600,
+ )
+ fig_line_log.update_traces(
+ hovertemplate='Intensity: %{y:.2f}<br>2θ: %{x}°',
+ )
+ plot_json = fig_line_log.to_plotly_json()
+ plot_json['config'] = dict(
+ scrollZoom=False,
+ )
+ plots.append(
+ PlotlyFigure(
+ label='Intensity over 2θ (log scale)',
+ index=0,
+ figure=plot_json,
+ )
+ )
+
+ q_norm = hdf5_handler.read_dataset(
+ path='data.results[0].q_norm',
+ is_archive_path=True,
+ )
+ if q_norm is None:
+ return plots
+
+ x = q_norm.to('1/angstrom').magnitude
+ fig_line_log = px.line(
+ x=x,
+ y=y,
+ log_y=True,
+ )
+ fig_line_log.update_layout(
+ title={
+ 'text': 'Intensity over |q| (log scale)',
+ 'x': 0.5,
+ 'xanchor': 'center',
+ },
+ xaxis_title='|q| (Å-1)',
+ yaxis_title='Intensity',
+ xaxis=dict(
+ fixedrange=False,
+ ),
+ yaxis=dict(
+ fixedrange=False,
+ ),
+ template='plotly_white',
+ hovermode='closest',
+ hoverlabel=dict(
+ bgcolor='white',
+ ),
+ dragmode='zoom',
+ width=600,
+ height=600,
+ )
+ fig_line_log.update_traces(
+ hovertemplate=(
+ 'Intensity: %{y:.2f}<br>'
+ '|q|: %{x} Å<sup>-1</sup>'
+ ),
+ )
+ plot_json = fig_line_log.to_plotly_json()
+ plot_json['config'] = dict(
+ scrollZoom=False,
+ )
+ plots.append(
+ PlotlyFigure(
+ label='Intensity over q_norm (log scale)',
+ index=2,
+ figure=plot_json,
+ )
+ )
+
+ return plots
+
+ def normalize(self, archive: 'EntryArchive', logger: 'BoundLogger'):
+ """
+ The normalize function of the `XRDResult` section.
+
+ Args:
+ archive (EntryArchive): The archive containing the section that is being
+ normalized.
+ logger (BoundLogger): A structlog logger.
+ """
+ super().normalize(archive, logger)
+ if self.name is None:
+ if self.scan_axis:
+ self.name = f'{self.scan_axis} Scan Result'
+ else:
+ self.name = 'XRD Scan Result'
+
+ try:
+ hdf5_handler = self.m_parent.hdf5_handler
+ assert isinstance(hdf5_handler, HDF5Handler)
+ except (AttributeError, AssertionError):
+ return
+
+ intensity = hdf5_handler.read_dataset(
+ path='data.results[0].intensity',
+ is_archive_path=True,
+ )
+ two_theta = hdf5_handler.read_dataset(
+ path='data.results[0].two_theta',
+ is_archive_path=True,
+ )
+ if intensity is None or two_theta is None:
+ return
+
+ if self.source_peak_wavelength is not None:
+ q_norm = hdf5_handler.read_dataset(
+ path='data.results[0].q_norm',
+ is_archive_path=True,
+ )
+ q_norm, two_theta = calculate_two_theta_or_q(
+ wavelength=self.source_peak_wavelength,
+ two_theta=two_theta,
+ q=q_norm,
+ )
+ hdf5_handler.add_dataset(
+ path='/ENTRY[entry]/experiment_result/q_norm',
+ params=dict(
+ data=q_norm,
+ archive_path='data.results[0].q_norm',
+ ),
+ )
+ hdf5_handler.add_dataset(
+ path='/ENTRY[entry]/experiment_result/two_theta',
+ params=dict(
+ data=two_theta,
+ archive_path='data.results[0].two_theta',
+ ),
+ )
+ self.m_setdefault('plot_intensity_scattering_vector')
+ self.plot_intensity_scattering_vector.normalize(archive, logger)
+
+ self.m_setdefault('plot_intensity')
+ self.plot_intensity.normalize(archive, logger)
+
+
+class XRDResultRSMHDF5(XRDResult):
+ """
+ Section containing the result of a Reciprocal Space Map (RSM) scan.
+ """
+
+ m_def = Section(
+ a_eln=ELNAnnotation(
+ properties=SectionProperties(
+ visible=Filter(
+ exclude=['array_index'],
+ ),
+ ),
+ )
+ )
+ intensity = Quantity(
+ type=HDF5Reference,
+ description='The count at each 2-theta value, dimensionless',
+ shape=[],
+ )
+ two_theta = Quantity(
+ type=HDF5Reference,
+ description='The 2-theta range of the diffractogram',
+ shape=[],
+ )
+ q_norm = Quantity(
+ type=HDF5Reference,
+ description='The norm of scattering vector *Q* of the diffractogram',
+ shape=[],
+ )
+ omega = Quantity(
+ type=HDF5Reference,
+ description='The omega range of the diffractogram',
+ shape=[],
+ )
+ phi = Quantity(
+ type=HDF5Reference,
+ description='The phi range of the diffractogram',
+ shape=[],
+ )
+ chi = Quantity(
+ type=HDF5Reference,
+ description='The chi range of the diffractogram',
+ shape=[],
+ )
+ integration_time = Quantity(
+ type=HDF5Reference,
+ description='Integration time per channel',
+ shape=[],
+ )
+ q_parallel = Quantity(
+ type=HDF5Reference,
+ description='The scattering vector *Q_parallel* of the diffractogram',
+ )
+ q_perpendicular = Quantity(
+ type=HDF5Reference,
+ description='The scattering vector *Q_perpendicular* of the diffractogram',
+ )
+ plot_intensity = SubSection(section_def=PlotIntensity)
+ plot_intensity_scattering_vector = SubSection(
+ section_def=PlotIntensityScatteringVector
+ )
+
+ def generate_plots(self):
+ """
+ Plot the 2D RSM diffractogram.
+
+ Args:
+ archive (EntryArchive): The archive containing the section that is being
+ normalized.
+ logger (BoundLogger): A structlog logger.
+
+ Returns:
+ (dict, dict): json_2theta_omega, json_q_vector
+ """
+ plots = []
+
+ try:
+ hdf5_handler = self.m_parent.hdf5_handler
+ assert isinstance(hdf5_handler, HDF5Handler)
+ except (AttributeError, AssertionError):
+ return plots
+
+ two_theta = hdf5_handler.read_dataset(
+ path='data.results[0].two_theta',
+ is_archive_path=True,
+ )
+ intensity = hdf5_handler.read_dataset(
+ path='data.results[0].intensity',
+ is_archive_path=True,
+ )
+ omega = hdf5_handler.read_dataset(
+ path='data.results[0].omega',
+ is_archive_path=True,
+ )
+ if two_theta is None or intensity is None or omega is None:
+ return plots
+
+ # Plot for 2theta-omega RSM
+ # Zero values in intensity become -inf in log scale and are not plotted
+ x = omega.to('degree').magnitude
+ y = two_theta.to('degree').magnitude
+ z = intensity.magnitude
+ log_z = np.log10(z)
+ x_range, y_range = get_bounding_range_2d(x, y)
+
+ fig_2theta_omega = px.imshow(
+ img=np.around(log_z, 3).T,
+ x=np.around(x, 3),
+ y=np.around(y, 3),
+ )
+ fig_2theta_omega.update_coloraxes(
+ colorscale='inferno',
+ cmin=np.nanmin(log_z[log_z != -np.inf]),
+ cmax=log_z.max(),
+ colorbar={
+ 'len': 0.9,
+ 'title': 'log10 Intensity',
+ 'ticks': 'outside',
+ 'tickformat': '5',
+ },
+ )
+ fig_2theta_omega.update_layout(
+ title={
+ 'text': 'Reciprocal Space Map over 2θ-ω',
+ 'x': 0.5,
+ 'xanchor': 'center',
+ },
+ xaxis_title='ω (°)',
+ yaxis_title='2θ (°)',
+ xaxis=dict(
+ autorange=False,
+ fixedrange=False,
+ range=x_range,
+ ),
+ yaxis=dict(
+ autorange=False,
+ fixedrange=False,
+ range=y_range,
+ ),
+ template='plotly_white',
+ hovermode='closest',
+ hoverlabel=dict(
+ bgcolor='white',
+ ),
+ dragmode='zoom',
+ width=600,
+ height=600,
+ )
+ fig_2theta_omega.update_traces(
+ hovertemplate=(
+ 'Intensity: 10<sup>%{z:.2f}</sup><br>'
+ '2θ: %{y}°<br>'
+ 'ω: %{x}°'
+ '<extra></extra>'
+ )
+ )
+ plot_json = fig_2theta_omega.to_plotly_json()
+ plot_json['config'] = dict(
+ scrollZoom=False,
+ )
+ plots.append(
+ PlotlyFigure(
+ label='RSM 2θ-ω',
+ index=1,
+ figure=plot_json,
+ ),
+ )
+
+ # Plot for RSM in Q-vectors
+ q_parallel = hdf5_handler.read_dataset(
+ path='data.results[0].q_parallel',
+ is_archive_path=True,
+ )
+ q_perpendicular = hdf5_handler.read_dataset(
+ path='data.results[0].q_perpendicular',
+ is_archive_path=True,
+ )
+ if q_parallel is not None and q_perpendicular is not None:
+ x = q_parallel.to('1/angstrom').magnitude.flatten()
+ y = q_perpendicular.to('1/angstrom').magnitude.flatten()
+ # q_vectors lead to irregular grid
+ # generate a regular grid using interpolation
+ x_regular = np.linspace(x.min(), x.max(), z.shape[0])
+ y_regular = np.linspace(y.min(), y.max(), z.shape[1])
+ x_grid, y_grid = np.meshgrid(x_regular, y_regular)
+ z_interpolated = griddata(
+ points=(x, y),
+ values=z.flatten(),
+ xi=(x_grid, y_grid),
+ method='linear',
+ fill_value=z.min(),
+ )
+ log_z_interpolated = np.log10(z_interpolated)
+ x_range, y_range = get_bounding_range_2d(x_regular, y_regular)
+
+ fig_q_vector = px.imshow(
+ img=np.around(log_z_interpolated, 3),
+ x=np.around(x_regular, 3),
+ y=np.around(y_regular, 3),
+ )
+ fig_q_vector.update_coloraxes(
+ colorscale='inferno',
+ cmin=np.nanmin(log_z[log_z != -np.inf]),
+ cmax=log_z_interpolated.max(),
+ colorbar={
+ 'len': 0.9,
+ 'title': 'log10 Intensity',
+ 'ticks': 'outside',
+ 'tickformat': '5',
+ },
+ )
+ fig_q_vector.update_layout(
+ title={
+ 'text': 'Reciprocal Space Map over Q-vectors',
+ 'x': 0.5,
+ 'xanchor': 'center',
+ },
+ xaxis_title='q‖ (Å-1)', # q ‖
+ yaxis_title='q⊥ (Å-1)', # q ⊥
+ xaxis=dict(
+ autorange=False,
+ fixedrange=False,
+ range=x_range,
+ ),
+ yaxis=dict(
+ autorange=False,
+ fixedrange=False,
+ range=y_range,
+ ),
+ template='plotly_white',
+ hovermode='closest',
+ hoverlabel=dict(
+ bgcolor='white',
+ ),
+ dragmode='zoom',
+ width=600,
+ height=600,
+ )
+ fig_q_vector.update_traces(
+ hovertemplate=(
+ 'Intensity: 10<sup>%{z:.2f}</sup><br>'
+ 'q⊥: %{y} Å<sup>-1</sup><br>'
+ 'q‖: %{x} Å<sup>-1</sup>'
+ '<extra></extra>'
+ )
+ )
+ plot_json = fig_q_vector.to_plotly_json()
+ plot_json['config'] = dict(
+ scrollZoom=False,
+ )
+ plots.append(
+ PlotlyFigure(
+ label='RSM Q-vectors',
+ index=0,
+ figure=plot_json,
+ ),
+ )
+
+ return plots
+
+ def normalize(self, archive: 'EntryArchive', logger: 'BoundLogger'):
+ super().normalize(archive, logger)
+
+ if self.name is None:
+ self.name = 'RSM Scan Result'
+
+ try:
+ hdf5_handler = self.m_parent.hdf5_handler
+ assert isinstance(hdf5_handler, HDF5Handler)
+ except (AttributeError, AssertionError):
+ return
+
+ intensity = hdf5_handler.read_dataset(
+ path='data.results[0].intensity',
+ is_archive_path=True,
+ )
+ two_theta = hdf5_handler.read_dataset(
+ path='data.results[0].two_theta',
+ is_archive_path=True,
+ )
+ var_axis = None
+ for axis in ['omega', 'chi', 'phi']:
+ axis_value = hdf5_handler.read_dataset(
+ path=f'data.results[0].{axis}',
+ is_archive_path=True,
+ )
+ if axis_value is not None and len(np.unique(axis_value.magnitude)) > 1:
+ var_axis = axis
+ break
+ if intensity is None or two_theta is None or var_axis is None:
+ return
+
+ if self.source_peak_wavelength is not None:
+ q_parallel, q_perpendicular = calculate_q_vectors_rsm(
+ wavelength=self.source_peak_wavelength,
+ two_theta=two_theta * np.ones_like(intensity),
+ omega=hdf5_handler.read_dataset(
+ path=f'data.results[0].{var_axis}',
+ is_archive_path=True,
+ ),
+ )
+ hdf5_handler.add_dataset(
+ path='/ENTRY[entry]/experiment_result/q_parallel',
+ params=dict(
+ data=q_parallel,
+ archive_path='data.results[0].q_parallel',
+ ),
+ )
+ hdf5_handler.add_dataset(
+ path='/ENTRY[entry]/experiment_result/q_perpendicular',
+ params=dict(
+ data=q_perpendicular,
+ archive_path='data.results[0].q_perpendicular',
+ ),
+ )
+ self.m_setdefault('plot_intensity_scattering_vector')
+ self.plot_intensity_scattering_vector.normalize(archive, logger)
+
+ self.m_setdefault('plot_intensity')
+ self.plot_intensity.normalize(archive, logger)
+
+
+class XRayDiffraction(Measurement):
+ """
+ Generic X-ray diffraction measurement.
+ """
+
+ m_def = Section()
+ method = Quantity(
+ type=str,
+ default='X-Ray Diffraction (XRD)',
+ )
+ xrd_settings = SubSection(
+ section_def=XRDSettings,
+ )
+ diffraction_method_name = Quantity(
+ type=MEnum(
+ [
+ 'Powder X-Ray Diffraction (PXRD)',
+ 'Single Crystal X-Ray Diffraction (SCXRD)',
+ 'High-Resolution X-Ray Diffraction (HRXRD)',
+ 'Small-Angle X-Ray Scattering (SAXS)',
+ 'X-Ray Reflectivity (XRR)',
+ 'Grazing Incidence X-Ray Diffraction (GIXRD)',
+ 'Reciprocal Space Mapping (RSM)',
+ ]
+ ),
+ description="""
+ The diffraction method used to obtain the diffraction pattern.
+ | X-ray Diffraction Method | Description |
+ |------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+ | **Powder X-ray Diffraction (PXRD)** | The term "powder" refers more to the random orientation of small crystallites than to the physical form of the sample. Can be used with non-powder samples if they present random crystallite orientations. |
+ | **Single Crystal X-ray Diffraction (SCXRD)** | Used for determining the atomic structure of a single crystal. |
+ | **High-Resolution X-ray Diffraction (HRXRD)** | A technique typically used for detailed characterization of epitaxial thin films using precise diffraction measurements. |
+ | **Small-Angle X-ray Scattering (SAXS)** | Used for studying nanostructures in the size range of 1-100 nm. Provides information on particle size, shape, and distribution. |
+ | **X-ray Reflectivity (XRR)** | Used to study thin film layers, interfaces, and multilayers. Provides info on film thickness, density, and roughness. |
+ | **Grazing Incidence X-ray Diffraction (GIXRD)** | Primarily used for the analysis of thin films with the incident beam at a fixed shallow angle. |
+ | **Reciprocal Space Mapping (RSM)** | High-resolution XRD method to measure diffracted intensity in a 2-dimensional region of reciprocal space. Provides information about the real-structure (lattice mismatch, domain structure, stress and defects) in single-crystalline and epitaxial samples.|
+ """, # noqa: E501
+ )
+ results = Measurement.results.m_copy()
+ results.section_def = XRDResult
+
+ def normalize(self, archive: 'EntryArchive', logger: 'BoundLogger'):
+ """
+ The normalize function of the `XRayDiffraction` section.
+
+ Args:
+ archive (EntryArchive): The archive containing the section that is being
+ normalized.
+ logger (BoundLogger): A structlog logger.
+ """
+ super().normalize(archive, logger)
+ if (
+ self.xrd_settings is not None
+ and self.xrd_settings.source is not None
+ and self.xrd_settings.source.kalpha_one is not None
+ ):
+ for result in self.results:
+ if result.source_peak_wavelength is None:
+ result.source_peak_wavelength = self.xrd_settings.source.kalpha_one
+ result.normalize(archive, logger)
+ if not archive.results:
+ archive.results = Results()
+ if not archive.results.properties:
+ archive.results.properties = Properties()
+ if not archive.results.method:
+ archive.results.method = Method(
+ method_name='XRD',
+ measurement=MeasurementMethod(
+ xrd=XRDMethod(diffraction_method_name=self.diffraction_method_name)
+ ),
+ )
+
+ try:
+ hdf5_handler = self.hdf5_handler
+ assert isinstance(hdf5_handler, HDF5Handler)
+ except (AttributeError, AssertionError):
+ return
+ if not archive.results.properties.structural:
+ diffraction_patterns = []
+ for result in self.results:
+ intensity = hdf5_handler.read_dataset(
+ 'data.results[0].intensity', is_archive_path=True
+ )
+ if len(intensity.shape) == 1:
+ two_theta = hdf5_handler.read_dataset(
+ 'data.results[0].two_theta', is_archive_path=True
+ )
+ q_norm = hdf5_handler.read_dataset(
+ 'data.results[0].q_norm', is_archive_path=True
+ )
+ diffraction_patterns.append(
+ DiffractionPattern(
+ incident_beam_wavelength=result.source_peak_wavelength,
+ two_theta_angles=two_theta,
+ intensity=intensity,
+ q_vector=q_norm,
+ )
+ )
+ archive.results.properties.structural = StructuralProperties(
+ diffraction_pattern=diffraction_patterns
+ )
+
+
+class ELNXRayDiffraction(XRayDiffraction, EntryData):
+ """
+ Example section for how XRayDiffraction can be implemented with a general reader for
+ common XRD file types.
+ """
+
+ m_def = Section(
+ categories=[NOMADMeasurementsCategory],
+ label='X-Ray Diffraction (XRD)',
+ a_eln=ELNAnnotation(
+ lane_width='800px',
+ ),
+ a_template={
+ 'measurement_identifiers': {},
+ },
+ a_h5web=H5WebAnnotation(
+ paths=[
+ 'results/0/plot_intensity',
+ 'results/0/plot_intensity_scattering_vector',
+ ]
+ ),
+ )
+ data_file = Quantity(
+ type=str,
+ description='Data file containing the diffractogram',
+ a_eln=ELNAnnotation(
+ component=ELNComponentEnum.FileEditQuantity,
+ ),
+ )
+ auxiliary_file = Quantity(
+ type=str,
+ description='Auxiliary file (like .h5 or .nxs) containing the entry data.',
+ a_eln=ELNAnnotation(
+ component=ELNComponentEnum.FileEditQuantity,
+ ),
+ )
+ overwrite_auxiliary_file = Quantity(
+ type=bool,
+ description='Overwrite the auxiliary file with the current data.',
+ a_eln=ELNAnnotation(
+ component=ELNComponentEnum.BoolEditQuantity,
+ ),
+ )
+ nexus_results = Quantity(
+ type=ArchiveSection,
+ description='Reference to the NeXus entry.',
+ a_eln=ELNAnnotation(component=ELNComponentEnum.ReferenceEditQuantity),
+ )
+ measurement_identifiers = SubSection(
+ section_def=ReadableIdentifiers,
+ )
+ diffraction_method_name = XRayDiffraction.diffraction_method_name.m_copy()
+ diffraction_method_name.m_annotations['eln'] = ELNAnnotation(
+ component=ELNComponentEnum.EnumEditQuantity,
+ )
+ hdf5_handler = None
+
+ def get_read_write_functions(self) -> tuple[Callable, Callable]:
+ """
Method for getting the correct read and write functions for the current data
file.
-
Returns:
tuple[Callable, Callable]: The read, write functions.
"""
@@ -940,49 +1800,81 @@ def write_xrd_data(
metadata_dict: dict = xrd_dict.get('metadata', {})
source_dict: dict = metadata_dict.get('source', {})
- scan_type = metadata_dict.get('scan_type', None)
- if scan_type == 'line':
- result = XRDResult1D(
- intensity=xrd_dict.get('intensity', None),
- two_theta=xrd_dict.get('2Theta', None),
- omega=xrd_dict.get('Omega', None),
- chi=xrd_dict.get('Chi', None),
- phi=xrd_dict.get('Phi', None),
- scan_axis=metadata_dict.get('scan_axis', None),
- integration_time=xrd_dict.get('countTime', None),
- )
- result.normalize(archive, logger)
+ scan_type = metadata_dict.get('scan_type')
+ if scan_type not in ['line', 'rsm']:
+ logger.error(f'Scan type `{scan_type}` is not supported.')
+ return
+ # Create a new result section
+ results = []
+ result = None
+ if scan_type == 'line':
+ result = XRDResult1DHDF5()
elif scan_type == 'rsm':
- result = XRDResultRSM(
- intensity=xrd_dict.get('intensity', None),
- two_theta=xrd_dict.get('2Theta', None),
- omega=xrd_dict.get('Omega', None),
- chi=xrd_dict.get('Chi', None),
- phi=xrd_dict.get('Phi', None),
- scan_axis=metadata_dict.get('scan_axis', None),
- integration_time=xrd_dict.get('countTime', None),
+ result = XRDResultRSMHDF5()
+
+ if result is not None:
+ result.scan_axis = metadata_dict.get('scan_axis')
+ self.hdf5_handler.add_dataset(
+ path='/ENTRY[entry]/experiment_result/intensity',
+ params=dict(
+ data=xrd_dict.get('intensity'),
+ archive_path='data.results[0].intensity',
+ ),
+ )
+ self.hdf5_handler.add_dataset(
+ path='/ENTRY[entry]/experiment_result/two_theta',
+ params=dict(
+ data=xrd_dict.get('2Theta'),
+ archive_path='data.results[0].two_theta',
+ ),
+ )
+ self.hdf5_handler.add_dataset(
+ path='/ENTRY[entry]/experiment_result/omega',
+ params=dict(
+ data=xrd_dict.get('Omega'),
+ archive_path='data.results[0].omega',
+ ),
+ )
+ self.hdf5_handler.add_dataset(
+ path='/ENTRY[entry]/experiment_result/chi',
+ params=dict(
+ data=xrd_dict.get('Chi'),
+ archive_path='data.results[0].chi',
+ ),
+ )
+ self.hdf5_handler.add_dataset(
+ path='/ENTRY[entry]/experiment_result/phi',
+ params=dict(
+ data=xrd_dict.get('Phi'),
+ archive_path='data.results[0].phi',
+ ),
+ )
+ self.hdf5_handler.add_dataset(
+ path='/ENTRY[entry]/experiment_config/count_time',
+ params=dict(
+ data=xrd_dict.get('countTime'),
+ archive_path='data.results[0].integration_time',
+ ),
)
result.normalize(archive, logger)
- else:
- raise NotImplementedError(f'Scan type `{scan_type}` is not supported.')
+ results.append(result)
source = XRayTubeSource(
- xray_tube_material=source_dict.get('anode_material', None),
- kalpha_one=source_dict.get('kAlpha1', None),
- kalpha_two=source_dict.get('kAlpha2', None),
- ratio_kalphatwo_kalphaone=source_dict.get('ratioKAlpha2KAlpha1', None),
- kbeta=source_dict.get('kBeta', None),
- xray_tube_voltage=source_dict.get('voltage', None),
- xray_tube_current=source_dict.get('current', None),
+ xray_tube_material=source_dict.get('anode_material'),
+ kalpha_one=source_dict.get('kAlpha1'),
+ kalpha_two=source_dict.get('kAlpha2'),
+ ratio_kalphatwo_kalphaone=source_dict.get('ratioKAlpha2KAlpha1'),
+ kbeta=source_dict.get('kBeta'),
+ xray_tube_voltage=source_dict.get('voltage'),
+ xray_tube_current=source_dict.get('current'),
)
source.normalize(archive, logger)
-
xrd_settings = XRDSettings(source=source)
xrd_settings.normalize(archive, logger)
samples = []
- if metadata_dict.get('sample_id', None) is not None:
+ if metadata_dict.get('sample_id') is not None:
sample = CompositeSystemReference(
lab_id=metadata_dict['sample_id'],
)
@@ -990,115 +1882,22 @@ def write_xrd_data(
samples.append(sample)
xrd = ELNXRayDiffraction(
- results=[result],
+ results=results,
xrd_settings=xrd_settings,
samples=samples,
)
+
merge_sections(self, xrd, logger)
- def write_nx_xrd(
- self,
- xrd_dict: 'Template',
- archive: 'EntryArchive',
- logger: 'BoundLogger',
- ) -> None:
+ def backward_compatibility(self):
"""
- Populate `ELNXRayDiffraction` section from a NeXus Template.
-
- Args:
- xrd_dict (Dict[str, Any]): A dictionary with the XRD data.
- archive (EntryArchive): The archive containing the section.
- logger (BoundLogger): A structlog logger.
+ Method for backward compatibility.
"""
- # TODO add the result section based on the scan_type
- result = XRDResult(
- intensity=xrd_dict.get(
- '/ENTRY[entry]/2theta_plot/intensity',
- None,
- ),
- two_theta=xrd_dict.get(
- '/ENTRY[entry]/2theta_plot/two_theta',
- None,
- ),
- omega=xrd_dict.get(
- '/ENTRY[entry]/2theta_plot/omega',
- None,
- ),
- chi=xrd_dict.get('/ENTRY[entry]/2theta_plot/chi', None),
- phi=xrd_dict.get(
- '/ENTRY[entry]/2theta_plot/phi',
- None,
- ),
- scan_axis=xrd_dict.get(
- '/ENTRY[entry]/INSTRUMENT[instrument]/DETECTOR[detector]/scan_axis',
- None,
- ),
- integration_time=xrd_dict.get(
- '/ENTRY[entry]/COLLECTION[collection]/count_time', None
- ),
- )
- result.normalize(archive, logger)
-
- source = XRayTubeSource(
- xray_tube_material=xrd_dict.get(
- '/ENTRY[entry]/INSTRUMENT[instrument]/SOURCE[source]/xray_tube_material',
- None,
- ),
- kalpha_one=xrd_dict.get(
- '/ENTRY[entry]/INSTRUMENT[instrument]/SOURCE[source]/k_alpha_one',
- None,
- ),
- kalpha_two=xrd_dict.get(
- '/ENTRY[entry]/INSTRUMENT[instrument]/SOURCE[source]/k_alpha_two',
- None,
- ),
- ratio_kalphatwo_kalphaone=xrd_dict.get(
- '/ENTRY[entry]/INSTRUMENT[instrument]/SOURCE[source]/ratio_k_alphatwo_k_alphaone',
- None,
- ),
- kbeta=xrd_dict.get(
- '/ENTRY[entry]/INSTRUMENT[instrument]/SOURCE[source]/kbeta',
- None,
- ),
- xray_tube_voltage=xrd_dict.get(
- 'ENTRY[entry]/INSTRUMENT[instrument]/SOURCE[source]/xray_tube_voltage',
- None,
- ),
- xray_tube_current=xrd_dict.get(
- '/ENTRY[entry]/INSTRUMENT[instrument]/SOURCE[source]/xray_tube_current',
- None,
- ),
- )
- source.normalize(archive, logger)
-
- xrd_settings = XRDSettings(source=source)
- xrd_settings.normalize(archive, logger)
-
- sample = CompositeSystemReference(
- lab_id=xrd_dict.get(
- '/ENTRY[entry]/SAMPLE[sample]/sample_id',
- None,
- ),
- )
- sample.normalize(archive, logger)
-
- xrd = ELNXRayDiffraction(
- results=[result],
- xrd_settings=xrd_settings,
- samples=[sample],
- )
- merge_sections(self, xrd, logger)
-
- nexus_output = None
- if self.generate_nexus_file:
- archive_name = archive.metadata.mainfile.split('.')[0]
- nexus_output = f'{archive_name}_output.nxs'
- handle_nexus_subsection(
- xrd_dict,
- nexus_output,
- archive,
- logger,
- )
+ # Migration to using HDF5References: removing existing results
+ if self.get('results'):
+ self.results = []
+ if self.get('figures'):
+ self.figures = []
def normalize(self, archive: 'EntryArchive', logger: 'BoundLogger'):
"""
@@ -1109,7 +1908,20 @@ def normalize(self, archive: 'EntryArchive', logger: 'BoundLogger'):
normalized.
logger (BoundLogger): A structlog logger.
"""
+ self.backward_compatibility()
if self.data_file is not None:
+ # TODO (ka-sarthak): use .nxs file once updating the flag through the
+ # normalizer works.
+ # self.auxiliary_file = f'{self.data_file.rsplit(".", 1)[0]}.nxs'
+ self.auxiliary_file = f'{self.data_file.rsplit(".", 1)[0]}.h5'
+ self.hdf5_handler = HDF5Handler(
+ filename=self.auxiliary_file,
+ archive=archive,
+ logger=logger,
+ # TODO (ka-sarthak): use nexus dataset map once updating the flag
+ # through the normalizer works.
+ # nexus_dataset_map=NEXUS_DATASET_MAP,
+ )
read_function, write_function = self.get_read_write_functions()
if read_function is None or write_function is None:
logger.warn(
@@ -1119,10 +1931,29 @@ def normalize(self, archive: 'EntryArchive', logger: 'BoundLogger'):
with archive.m_context.raw_file(self.data_file) as file:
xrd_dict = read_function(file.name, logger)
write_function(xrd_dict, archive, logger)
+
super().normalize(archive, logger)
- if not self.results:
- return
- self.figures = self.results[0].generate_plots(archive, logger)
+
+ if self.overwrite_auxiliary_file or not archive.m_context.raw_path_exists(
+ self.auxiliary_file
+ ):
+ self.hdf5_handler.write_file()
+ if self.hdf5_handler.filename != self.auxiliary_file:
+ self.auxiliary_file = self.hdf5_handler.filename
+ # TODO (ka-sarthak): update the flag through the normalizer once it works.
+ # self.overwrite_auxiliary_file = False
+ else:
+ self.hdf5_handler.set_hdf5_references()
+
+ self.nexus_results = None
+ if self.auxiliary_file.endswith('.nxs'):
+ nx_entry_id = get_entry_id_from_file_name(
+ archive=archive, file_name=self.auxiliary_file
+ )
+ ref_to_nx_entry_data = get_reference(
+ archive.metadata.upload_id, nx_entry_id
+ )
+ self.nexus_results = f'{ref_to_nx_entry_data}'
class RawFileXRDData(EntryData):
diff --git a/tests/conftest.py b/tests/conftest.py
index 975b2f47..d005ecbb 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -66,8 +66,21 @@ def fixture_parsed_measurement_archive(request):
file created by plugin parsers for the measurement data. Parsing this
`.archive.json` file returns the `EntryArchive` object for the measurement data,
     which is finally yielded to the test function.
+
+ Clean-up:
+ By default, the `.archive.json` file is cleaned up after the test. If additional
+ files need to be cleaned up, they can be specified in the request.param as a
+ tuple or list of file extensions. For example, ('file_path', ['.nxs', '.h5'])
+ can be used to clean up '.nxs' and '.h5' files in addition to '.archive.json'.
+ If only the '.archive.json' file needs to be cleaned up, ('file_path',) can be
+ used as parameters for the fixture.
"""
- rel_file_path = request.param
+ clean_up_extensions = ['.archive.json']
+ if isinstance(request.param, (tuple, list)):
+ rel_file_path = request.param[0]
+ clean_up_extensions.extend(request.param[1])
+ else:
+ rel_file_path = request.param
file_archive = parse(rel_file_path)[0]
rel_measurement_archive_path = os.path.join(
@@ -79,5 +92,8 @@ def fixture_parsed_measurement_archive(request):
yield parse(rel_measurement_archive_path)[0]
- if os.path.exists(rel_measurement_archive_path):
- os.remove(rel_measurement_archive_path)
+ # clean up
+ for ext in clean_up_extensions:
+ path = os.path.join(rel_file_path.rsplit('.', 1)[0] + ext)
+ if os.path.exists(path):
+ os.remove(path)
diff --git a/tests/test_xrd.py b/tests/test_xrd.py
index 124be398..1188258b 100644
--- a/tests/test_xrd.py
+++ b/tests/test_xrd.py
@@ -18,6 +18,8 @@
import pytest
from nomad.client import normalize_all
+from nomad_measurements.xrd.schema import XRDResult1DHDF5
+
test_files = [
'tests/data/xrd/XRD-918-16_10.xrdml',
'tests/data/xrd/m54313_om2th_10.xrdml',
@@ -29,11 +31,19 @@
'tests/data/xrd/TwoTheta_scan_powder.rasx',
]
log_levels = ['error', 'critical']
+clean_up_extensions = ['.archive.json', '.nxs', '.h5']
@pytest.mark.parametrize(
'parsed_measurement_archive, caplog',
- [(file, log_level) for file in test_files for log_level in log_levels],
+ [
+ (
+ (file, clean_up_extensions),
+ log_level,
+ )
+ for file in test_files
+ for log_level in log_levels
+ ],
indirect=True,
)
def test_normalize_all(parsed_measurement_archive, caplog):
@@ -52,7 +62,7 @@ def test_normalize_all(parsed_measurement_archive, caplog):
assert parsed_measurement_archive.data.results[
0
].source_peak_wavelength.magnitude == pytest.approx(1.540598, 1e-2)
- if len(parsed_measurement_archive.data.results[0].intensity.shape) == 1:
+ if isinstance(parsed_measurement_archive.data.results[0], XRDResult1DHDF5):
assert (
parsed_measurement_archive.results.properties.structural.diffraction_pattern[
0