Use hdf5 references for arrays (#118)
* updated plugin structure

* added pynxtools dependency

* Apply suggestions from code review

Co-authored-by: Sarthak Kapoor <[email protected]>
Co-authored-by: Hampus Näsström <[email protected]>

* Add sections for RSM and 1D which use HDF5 references

* Abstract out data interaction using setter and getter; allows the same methods to be used for classes with hdf5 refs

* Use arrays, not references, in the `archive.results` section

* Lock the state for using nexus file and corresponding references

* Populate results without references

* Make a general reader for raw files

* Remove nexus flags

* Add quantity for auxiliary file

* Fix rebase

* Make integration_time an HDF5Reference

* Reset results (refactor)

* Add backward compatibility

* Refactor reader

* add missing imports

* AttrDict class

* Make concept map global

* Add function to remove nexus annotations in concept map

* Move try block inside walk_through_object

* Fix imports

* Add methods for generating hdf5 file

* Rename auxiliary file

* Expect aux file to be .nxs in the beginning

* Add attributes for hdf5: data_dict, dataset_paths

* Method for adding a quantity to hdf5_data_dict

* Abstract out methods for creating files based on hdf5_data_dict

* Add dataset_paths for nexus

* Revert some changes

* Minor fixes

* Refactor populate_hdf5_data_dict: store a reference to be made later

* Handle shift from nxs to hdf5

* Set hdf5 references after aux file is created

* Cleaning

* Fixing

* Redefine result sections instead of extending

* Remove plotly plots from ELN

* Read util for hdf5 ref

* Fixing

* Move hdf5 handling into a util class

* Refactor instance variables

* Reset data dicts and reference after each writing

* Fixing

* Overwrite dataset if it already exists

* Refactor add_dataset

* Reorganize and add docstrings

* Rename variable

* Add read_dataset method

* Cleaning

* Adapting schema with hdf5 handler

* Comments, minor refactoring

* Fixing; add `hdf5_handler` as an attribute for archive

* Reorganization

* Fixing

* Refactoring

* Cleaning

* Try block for using hdf5 handler: don't fail early, as later normalization steps will have the handler!

* Extract units from dataset attrs when reading

* Fixing

* Linting

* Make archive_path optional in add_dataset

* Rename class

* attrs for add_dataset; use it for units

* Add add_attribute method

* Refactor add_attribute

* Add plot attributes: 1D

* Refactor hdf5 states

* Add back plotly figures

* Rename auxiliary file if changed by handler

* Add referenced plots

* Allow hard link using internal reference

* Add sections for plots

* Comment out validation

* Add archive paths for the plot subsections

* Add back validation with flag

* Use nexus flag

* Add interpolated intensity data into h5 for qspace plots

* Use prefix to reduce len of string

* Store regularized linspace of q vectors; revise descriptions

* Remove plotly plots

* Bring plots to overview

* Fix tests

* Linting; remove attr arg from add_dataset

* Review: move None check into method

* Review: use 'with' for opening h5 file

* Review: make internal states private vars

* Add pydantic basemodel for dataset

* Use data from variables if available for reading

* Review: remove lazy arg

* Move DatasetModel outside Handler class

* Remove None from get, as it is already the default

* Merge if conditions

---------

Co-authored-by: Andrea Albino <[email protected]>
Co-authored-by: Andrea Albino <[email protected]>
Co-authored-by: Hampus Näsström <[email protected]>
4 people committed Dec 19, 2024
1 parent 6d04d02 commit 19dec87
Showing 4 changed files with 831 additions and 323 deletions.
331 changes: 331 additions & 0 deletions src/nomad_measurements/utils.py
@@ -15,12 +15,21 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import collections
import os.path
import re
from typing import (
TYPE_CHECKING,
Any,
Optional,
)

import h5py
import numpy as np
import pint
from nomad.datamodel.hdf5 import HDF5Reference
from nomad.units import ureg
from pydantic import BaseModel, Field

if TYPE_CHECKING:
from nomad.datamodel.data import (
@@ -153,3 +162,325 @@ def get_bounding_range_2d(ax1, ax2):
]

return ax1_range, ax2_range


class DatasetModel(BaseModel):
"""
Pydantic model for the dataset to be stored in the HDF5 file.
"""

data: Any = Field(description='The data to be stored in the HDF5 file.')
archive_path: Optional[str] = Field(
None, description='The path of the quantity in the NOMAD archive.'
)
internal_reference: Optional[bool] = Field(
False,
description='If True, an internal reference is set to an existing HDF5 '
'dataset.',
)
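
# A minimal sketch of the model (illustrative values, not from this commit):
# `data` may be a plain array or a `pint.Quantity`, while `archive_path`
# points at the archive quantity that should receive the HDF5 reference.
#
#   DatasetModel(
#       data=np.array([0.1, 0.2]),
#       archive_path='data.results[0].intensity',
#   )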


class HDF5Handler:
"""
Class for handling the creation of auxiliary files to store big data arrays outside
the main archive file (e.g. HDF5, NeXus).
"""

def __init__(
self,
filename: str,
archive: 'EntryArchive',
logger: 'BoundLogger',
valid_dataset_paths: list = None,
nexus: bool = False,
):
"""
Initialize the handler.
Args:
filename (str): The name of the auxiliary file.
archive (EntryArchive): The NOMAD archive.
logger (BoundLogger): A structlog logger.
valid_dataset_paths (list): The list of valid dataset paths.
nexus (bool): If True, the file is created as a NeXus file.
"""
if not filename.endswith(('.nxs', '.h5')):
raise ValueError('Only .h5 or .nxs files are supported.')

self.data_file = filename
self.archive = archive
self.logger = logger
        self.valid_dataset_paths = valid_dataset_paths or []
self.nexus = nexus

self._hdf5_datasets = collections.OrderedDict()
self._hdf5_attributes = collections.OrderedDict()

def add_dataset(
self,
path: str,
params: dict,
validate_path: bool = True,
):
"""
        Add a dataset to the HDF5 file. The dataset is written lazily to the file
        when the `write_file` method is called. The `path` is validated against
        `valid_dataset_paths`, if provided, before adding the data.
`params` should be a dictionary containing `data`. Optionally,
it can also contain `archive_path` and `internal_reference`:
{
'data': Any,
'archive_path': str,
'internal_reference': bool,
}
Args:
path (str): The dataset path to be used in the HDF5 file.
params (dict): The dataset parameters.
validate_path (bool): If True, the dataset path is validated.
"""
if not params:
self.logger.warning('Dataset `params` must be provided.')
return

dataset = DatasetModel(
**params,
)
if (
validate_path
and self.valid_dataset_paths
and path not in self.valid_dataset_paths
):
self.logger.warning(f'Invalid dataset path "{path}".')
return

# handle the pint.Quantity and add data
if isinstance(dataset.data, pint.Quantity):
self.add_attribute(
path=path,
params=dict(
units=str(dataset.data.units),
),
)
dataset.data = dataset.data.magnitude

self._hdf5_datasets[path] = dataset
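
    # Hypothetical usage (illustrative path and values, not from this commit):
    # a unit-aware array is queued for writing; its units are split off into
    # an HDF5 attribute.
    #
    #   handler.add_dataset(
    #       path='/entry/experiment_result/two_theta',
    #       params=dict(
    #           data=np.array([30.0, 60.0]) * ureg.degree,
    #           archive_path='data.results[0].two_theta',
    #       ),
    #   )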

def add_attribute(
self,
path: str,
params: dict,
):
"""
        Add an attribute to the dataset or group at the given path. The attribute is
        written lazily to the file when the `write_file` method is called.
Args:
path (str): The dataset or group path in the HDF5 file.
params (dict): The attributes to be added.
"""
if not params:
self.logger.warning('Attribute `params` must be provided.')
return
self._hdf5_attributes[path] = params
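
    # Hypothetical usage (illustrative path): NeXus-style plot annotations can
    # be queued on a group before the file is written.
    #
    #   handler.add_attribute(
    #       path='/entry/experiment_result',
    #       params=dict(signal='intensity', axes='two_theta'),
    #   )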

def read_dataset(self, path: str):
"""
        Returns the dataset at the given path. If the quantity has `units` as an
        attribute, tries to return a `pint.Quantity`.
        If the dataset is available in `self._hdf5_datasets`, it is returned directly.
Args:
path (str): The dataset path in the HDF5 file.
"""
if path is None:
return
file_path, dataset_path = path.split('#')

# find path in the instance variables
value = None
if dataset_path in self._hdf5_datasets:
value = self._hdf5_datasets[dataset_path].data
if dataset_path in self._hdf5_attributes:
units = self._hdf5_attributes[dataset_path].get('units')
if units:
value *= ureg(units)
return value

file_name = file_path.rsplit('/raw/', 1)[1]
with h5py.File(self.archive.m_context.raw_file(file_name, 'rb')) as h5:
if dataset_path not in h5:
self.logger.warning(f'Dataset "{dataset_path}" not found.')
else:
value = h5[dataset_path][...]
try:
units = h5[dataset_path].attrs['units']
value *= ureg(units)
except KeyError:
pass
return value
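
    # Hypothetical usage (illustrative upload id): a reference has the form
    # '<raw file path>#<dataset path>', and units stored as an HDF5 attribute
    # are restored on read.
    #
    #   handler.read_dataset(
    #       '/uploads/ab12cd/raw/example.h5#/entry/experiment_result/intensity'
    #   )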

def write_file(self):
"""
Method for creating an auxiliary file to store big data arrays outside the
main archive file (e.g. HDF5, NeXus).
"""
if self.nexus:
try:
self._write_nx_file()
except Exception as e:
self.nexus = False
self.logger.warning(
                    f'Encountered "{e}" while creating the NeXus file. '
                    'Creating an HDF5 file instead.'
)
self._write_hdf5_file()
else:
self._write_hdf5_file()

def _write_nx_file(self):
"""
Method for creating a NeXus file. Additional data from the archive is added
to the `hdf5_data_dict` before creating the nexus file. This provides a NeXus
view of the data in addition to storing array data.
"""
if self.data_file.endswith('.h5'):
self.data_file = self.data_file.replace('.h5', '.nxs')
        raise NotImplementedError('Method `_write_nx_file` is not implemented.')
# TODO add archive data to `hdf5_data_dict` before creating the nexus file. Use
# `populate_hdf5_data_dict` method for each quantity that is needed in .nxs
# file. Create a NeXus file with the data in `hdf5_data_dict`.
        # One issue here: as we populate the `hdf5_data_dict` with the archive
        # data, we will always have to overwrite the nexus file.

def _write_hdf5_file(self): # noqa: PLR0912
"""
Method for creating an HDF5 file.
"""
if self.data_file.endswith('.nxs'):
self.data_file = self.data_file.replace('.nxs', '.h5')
if not self._hdf5_datasets and not self._hdf5_attributes:
return
# remove the nexus annotations from the dataset paths if any
tmp_dict = {}
for key, value in self._hdf5_datasets.items():
new_key = self._remove_nexus_annotations(key)
tmp_dict[new_key] = value
self._hdf5_datasets = tmp_dict
tmp_dict = {}
for key, value in self._hdf5_attributes.items():
tmp_dict[self._remove_nexus_annotations(key)] = value
self._hdf5_attributes = tmp_dict

# create the HDF5 file
mode = 'r+b' if self.archive.m_context.raw_path_exists(self.data_file) else 'wb'
with h5py.File(
self.archive.m_context.raw_file(self.data_file, mode), 'a'
) as h5:
for key, value in self._hdf5_datasets.items():
if value.data is None:
self.logger.warning(f'No data found for "{key}". Skipping.')
continue
elif value.internal_reference:
# resolve the internal reference
try:
data = h5[self._remove_nexus_annotations(value.data)]
except KeyError:
self.logger.warning(
f'Internal reference "{value.data}" not found. Skipping.'
)
continue
else:
data = value.data

group_name, dataset_name = key.rsplit('/', 1)
group = h5.require_group(group_name)

if key in h5:
group[dataset_name][...] = data
else:
group.create_dataset(
name=dataset_name,
data=data,
)
self._set_hdf5_reference(
self.archive,
value.archive_path,
f'/uploads/{self.archive.m_context.upload_id}/raw'
f'/{self.data_file}#{key}',
)
for key, value in self._hdf5_attributes.items():
if key in h5:
h5[key].attrs.update(value)
else:
self.logger.warning(f'Path "{key}" not found to add attribute.')

        # reset hdf5 datasets and attributes
self._hdf5_datasets = collections.OrderedDict()
self._hdf5_attributes = collections.OrderedDict()

@staticmethod
def _remove_nexus_annotations(path: str) -> str:
"""
Remove the nexus related annotations from the dataset path.
For e.g.,
'/ENTRY[entry]/experiment_result/intensity' ->
'/entry/experiment_result/intensity'
Args:
path (str): The dataset path with nexus annotations.
Returns:
str: The dataset path without nexus annotations.
"""
if not path:
return path

pattern = r'.*\[.*\]'
new_path = ''
for part in path.split('/')[1:]:
if re.match(pattern, part):
new_path += '/' + part.split('[')[0].strip().lower()
else:
new_path += '/' + part
new_path = new_path.replace('.nxs', '.h5')
return new_path

@staticmethod
def _set_hdf5_reference(
section: 'ArchiveSection' = None, path: str = None, ref: str = None
):
"""
Method for setting a HDF5Reference quantity in a section. It can handle
nested quantities and repeatable sections, provided that the quantity itself
is of type `HDF5Reference`.
For example, one can set the reference for a quantity path like
`data.results[0].intensity`.
Args:
section (Section): The NOMAD section containing the quantity.
path (str): The path to the quantity.
ref (str): The reference to the HDF5 dataset.
"""
# TODO handle the case when section in the path is not initialized

if not section or not path or not ref:
return
attr = section
path = path.split('.')
quantity_name = path.pop()

for subpath in path:
if re.match(r'.*\[.*\]', subpath):
index = int(subpath.split('[')[1].split(']')[0])
attr = attr.m_get(subpath.split('[')[0], index=index)
else:
attr = attr.m_get(subpath)

if isinstance(
attr.m_get_quantity_definition(quantity_name).type, HDF5Reference
):
attr.m_set(quantity_name, ref)
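

# A hypothetical end-to-end sketch (not part of the committed module). It
# assumes a NOMAD `archive` whose `m_context` resolves raw files, a structlog
# `logger`, and an `HDF5Reference` quantity at `data.results[0].intensity`.
def _example_workflow(archive, logger):
    handler = HDF5Handler(
        filename='example.h5',
        archive=archive,
        logger=logger,
    )
    # queue a dataset; pint units are split off into a `units` attribute
    handler.add_dataset(
        path='/entry/experiment_result/intensity',
        params=dict(
            data=np.arange(5) * ureg('dimensionless'),
            archive_path='data.results[0].intensity',
        ),
    )
    # queue NeXus-style plot annotations on the enclosing group
    handler.add_attribute(
        path='/entry/experiment_result',
        params=dict(signal='intensity', NX_class='NXdata'),
    )
    # write example.h5 and set `data.results[0].intensity` to the reference
    handler.write_file()
    # resolve the reference back into a unit-aware array
    return handler.read_dataset(
        f'/uploads/{archive.m_context.upload_id}/raw/example.h5'
        '#/entry/experiment_result/intensity'
    )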