class XpsDataFileParser:
    """
    Entry point for reading any supported XPS data file.

    The file extension selects the candidate vendors. If an extension is
    shared by several vendors (currently only ``txt``), the file content is
    inspected to pick the matching mapper class.
    """

    __prmt_file_ext__ = ["ibw", "npl", "pro", "spe", "sle", "txt", "vms", "xml", "xy"]
    __prmt_metadata_file_ext__ = ["slh"]
    __vendors__ = ["kratos", "phi", "scienta", "specs", "unknown"]
    # Lookup table: file extension -> {vendor name -> mapper class}.
    __prmt_vndr_cls: Dict[str, Dict] = {
        "ibw": {"scienta": MapperScienta},
        "npl": {"unknown": VamasMapper},
        "pro": {"phi": MapperPhi},
        "spe": {"phi": MapperPhi},
        "sle": {"specs": SleMapperSpecs},
        "txt": {
            "scienta": MapperScienta,
            "unknown": TxtMapperVamasExport,
        },
        "vms": {"unknown": VamasMapper},
        "xml": {"specs": XmlMapperSpecs},
        "xy": {"specs": XyMapperSpecs},
    }

    __file_err_msg__ = (
        "Need an XPS data file with the following extension: " f"{__prmt_file_ext__}"
    )

    __vndr_err_msg__ = (
        "Need an XPS data file from the following vendors: " f"{__vendors__}"
    )

    def __init__(self, file_paths: Tuple[str, ...]) -> None:
        """
        Store the XPS file path(s) to be parsed.

        Parameters
        ----------
        file_paths : Tuple[str, ...]
            Paths of the XPS files. A single path given as a plain
            string is accepted as well and wrapped into a list.

        Raises
        ------
        ValueError
            If no file path is given.
        """
        if isinstance(file_paths, str):
            file_paths = [file_paths]

        self.files = file_paths
        self.config_file = None

        if not self.files:
            raise ValueError(XpsDataFileParser.__file_err_msg__)

    def get_dict(self, **kwargs) -> dict:
        """
        Parse the XPS file and return its content as a python dictionary.

        Only the first entry of ``self.files`` is handled: the loop body
        either returns the parsed data or raises.

        Parameters
        ----------
        **kwargs
            Forwarded unchanged to the ``parse_file`` method of the mapper.

        Raises
        ------
        ValueError
            If the file extension is not one of the permitted extensions.
        KeyError
            If no mapper is registered for the detected vendor.

        Returns
        -------
        dict
            The ``data_dict`` of the mapper, or an empty dictionary if
            there are no files.
        """
        for file in self.files:
            file_ext = file.rsplit(".")[-1]

            if file_ext not in XpsDataFileParser.__prmt_file_ext__:
                raise ValueError(XpsDataFileParser.__file_err_msg__)

            vendor = XpsDataFileParser.check_for_vendors(file)

            # Only the lookup is guarded, so that errors raised while
            # actually parsing the file keep their own message instead of
            # being reported as a vendor problem.
            try:
                parser_class = XpsDataFileParser.__prmt_vndr_cls[file_ext][vendor]
            except KeyError as key_err:
                raise KeyError(XpsDataFileParser.__vndr_err_msg__) from key_err

            parser_obj = parser_class()
            parser_obj.parse_file(file, **kwargs)
            self.config_file = parser_obj.config_file
            return parser_obj.data_dict
        return {}

    @classmethod
    def check_for_vendors(cls, file: str) -> str:
        """
        Determine the vendor name for an XPS data file.

        Parameters
        ----------
        file : str
            Path of the XPS file.

        Returns
        -------
        str
            The vendor name. If the extension is shared by several vendors
            and no content check exists for it, ``None`` is returned.
        """
        file_ext = file.rsplit(".")[-1]

        vendor_dict = cls.__prmt_vndr_cls[file_ext]

        if len(vendor_dict) == 1:
            return next(iter(vendor_dict))
        if file_ext == "txt":
            return cls._check_for_vendors_txt(file)
        # Ambiguous extension without a disambiguation rule.
        return None

    @classmethod
    def _check_for_vendors_txt(cls, file: str) -> str:
        """
        Search for a vendor name in a txt file.

        Parameters
        ----------
        file : str
            Path of the XPS txt file.

        Returns
        -------
        str
            Vendor name if that name occurs in the txt file (lower case,
            upper case or capitalized), ``"scienta"`` if the file starts
            with ``[Info]``, otherwise ``"unknown"``.
        """
        with open(file, encoding="utf-8") as txt_file:
            contents = txt_file.read()

        # A leading "[Info]" section selects the Scienta reader even if
        # "scienta" itself is not written in the file. Returning the name
        # explicitly keeps this independent of the table's ordering.
        if contents.startswith("[Info]"):
            return "scienta"

        for vendor in cls.__prmt_vndr_cls["txt"]:
            vendor_options = (vendor, vendor.upper(), vendor.capitalize())

            if any(vendor_opt in contents for vendor_opt in vendor_options):
                return vendor
        return "unknown"
-218,10 +220,12 @@ def map_values(self, key: str, value, field_type): Value of correct type and internal structure. """ - if key in self.value_function_map: map_fn = self.value_function_map[key] - value = map_fn(value) + if "date" in key: + value = map_fn(value, POSSIBLE_DATE_FORMATS) + else: + value = map_fn(value) return field_type(value) def flatten_metadata(self) -> Dict[str, Any]: @@ -268,34 +272,6 @@ def setup_unit(flattened_dict: Dict[str, Any], unit_key: str): return flattened_dict -def _parse_datetime(datetime_string: str) -> str: - """ - Convert the native time format to the datetime string - in the ISO 8601 format: '%Y-%b-%dT%H:%M:%S.%fZ'. - - Parameters - ---------- - value : str - String representation of the date in the format - "%Y-%m-%d", "%m/%d/%Y" or "%H:%M:%S", "%I:%M:%S %p". - - Returns - ------- - date_object : str - Datetime in ISO8601 format. - - """ - possible_date_formats = ["%d.%m.%Y %H:%M", "%d/%m/%Y %H:%M"] - for date_fmt in possible_date_formats: - try: - datetime_obj = datetime.datetime.strptime(datetime_string, date_fmt) - return datetime_obj.astimezone().isoformat() - - except ValueError: - continue - raise ValueError("Date and time could not be converted to ISO 8601 format.") - - def _convert_description(value: str) -> Dict[str, Any]: """Map all items in description to a dictionary.""" pattern = re.compile( diff --git a/src/pynxtools_xps/reader_utils.py b/src/pynxtools_xps/reader_utils.py index 06ca7265..91652317 100644 --- a/src/pynxtools_xps/reader_utils.py +++ b/src/pynxtools_xps/reader_utils.py @@ -88,7 +88,7 @@ def data_dict(self) -> dict: def parse_file(self, file, **kwargs): """ - Parse the file using the Scienta TXT parser. + Parse the file using the selected parser. 
""" self.file = file diff --git a/src/pynxtools_xps/specs/xy/xy_specs.py b/src/pynxtools_xps/specs/xy/xy_specs.py index a6fbe97e..702b6748 100644 --- a/src/pynxtools_xps/specs/xy/xy_specs.py +++ b/src/pynxtools_xps/specs/xy/xy_specs.py @@ -47,6 +47,7 @@ convert_measurement_method, convert_energy_scan_mode, convert_units, + parse_datetime, ) SETTINGS_MAP: Dict[str, str] = { @@ -848,8 +849,15 @@ def _parse_datetime(self, date: str) -> str: date = date.strip() tz = datetime.datetime.now(datetime.timezone.utc).astimezone().tzinfo # type: ignore[assignment] + possible_time_formats = ["%m/%d/%y %H:%M:%S"] + + date_object = datetime.datetime.strptime(date, "%m/%d/%y %H:%M:%S").replace( tzinfo=tz ) return date_object.isoformat() + + + + return parse_datetime(date, possible_time_formats, tzinfo) diff --git a/src/pynxtools_xps/value_mappers.py b/src/pynxtools_xps/value_mappers.py index fe49250b..55eb7554 100644 --- a/src/pynxtools_xps/value_mappers.py +++ b/src/pynxtools_xps/value_mappers.py @@ -6,7 +6,8 @@ """ import re -from typing import Dict, Any +import datetime +from typing import Dict, List, Any ENERGY_TYPE_MAP = { "BE": "binding", @@ -129,3 +130,49 @@ def get_units_for_key(unit_key: str, unit_map: Dict[str, str]) -> str: if regex_match is None: return unit_map.get(unit_key, None) return regex_match.group(1) + + +def parse_datetime(datetime_string: str, possible_date_formats: List[str], tzinfo: datetime.tzinfo = datetime.timezone.utc) -> str: + """ + Convert a date string to ISO 8601 format with optional timezone handling. + + Convert the native time format to the datetime string + in the ISO 8601 format: '%Y-%b-%dT%H:%M:%S.%fZ'. + For different vendors, there are different possible date formats, + all of which can be checked with this method. + Optionally, a timezone (tzinfo) can be applied to the datetime object if provided. 
+ + + Parameters + ---------- + datetime_string : str + String representation of the date + possible_date_formats : List[str] + List of possible date time formats to attempt for parsing. + tzinfo: datetime.tzinfo + A tzinfo object specifying the desired timezone to apply to the datetime object. + Defaults to UTC (datetime.timezone.utc). + + Raises + ------ + ValueError + If the time format cannot be converted, a ValueError is raised. + + Returns + ------- + str + Datetime in ISO8601 format. + + """ + for date_fmt in possible_date_formats: + try: + datetime_obj = datetime.datetime.strptime(datetime_string, date_fmt) + + if tzinfo is not None: + datetime_obj = datetime_obj.replace(tzinfo=tzinfo) + + return datetime_obj.astimezone().isoformat() + + except ValueError: + continue + raise ValueError("Date and time could not be converted to ISO 8601 format.")