Skip to content

Commit

Permalink
extract datetime parsing to value_mappers
Browse files Browse the repository at this point in the history
  • Loading branch information
lukaspie committed Oct 7, 2024
1 parent 1e8904a commit 9d655e3
Show file tree
Hide file tree
Showing 5 changed files with 221 additions and 34 deletions.
156 changes: 156 additions & 0 deletions src/pynxtools_xps/file_parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
# Copyright The NOMAD Authors.
#
# This file is part of NOMAD. See https://nomad-lab.eu for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# pylint: disable=too-many-lines

"""
Generic Classes for reading XPS files into python dictionary.
"""

from typing import Dict, Optional, Tuple

from pynxtools_xps.phi.spe_pro_phi import MapperPhi
from pynxtools_xps.scienta.scienta_reader import MapperScienta
from pynxtools_xps.specs.sle.sle_specs import SleMapperSpecs
from pynxtools_xps.specs.xml.xml_specs import XmlMapperSpecs
from pynxtools_xps.specs.xy.xy_specs import XyMapperSpecs
from pynxtools_xps.vms.txt_vamas_export import TxtMapperVamasExport
from pynxtools_xps.vms.vamas import VamasMapper


class XpsDataFileParser:
    """
    Class intended for receiving any type of XPS data file.

    The mapper class used for a file is looked up from its extension and,
    where one extension is shared by several vendors, from the vendor name
    detected in the file contents.
    """

    # File extensions for which a mapper class is registered below.
    __prmt_file_ext__ = ["ibw", "npl", "pro", "spe", "sle", "txt", "vms", "xml", "xy"]
    # Metadata file extensions (not referenced by the methods of this class).
    __prmt_metadata_file_ext__ = ["slh"]
    # Vendor names listed in the vendor error message.
    __vendors__ = ["kratos", "phi", "scienta", "specs", "unknown"]
    # Maps file extension -> {vendor name: mapper class}.
    __prmt_vndr_cls: Dict[str, Dict] = {
        "ibw": {"scienta": MapperScienta},
        "npl": {"unknown": VamasMapper},
        "pro": {"phi": MapperPhi},
        "spe": {"phi": MapperPhi},
        "sle": {"specs": SleMapperSpecs},
        "txt": {
            "scienta": MapperScienta,
            "unknown": TxtMapperVamasExport,
        },
        "vms": {"unknown": VamasMapper},
        "xml": {"specs": XmlMapperSpecs},
        "xy": {"specs": XyMapperSpecs},
    }

    __file_err_msg__ = (
        "Need an XPS data file with the following extension: " f"{__prmt_file_ext__}"
    )

    __vndr_err_msg__ = (
        "Need an XPS data file from the following vendors: " f"{__vendors__}"
    )

    def __init__(self, file_paths: Tuple[str, ...]) -> None:
        """
        Store the XPS file path(s) to be parsed.

        Parameters
        ----------
        file_paths : str or sequence of str
            One XPS file path, or several. A single string is wrapped
            in a list.

        Raises
        ------
        ValueError
            If no file path is given.
        """
        if isinstance(file_paths, str):
            file_paths = [file_paths]

        self.files = file_paths
        # Filled in by get_dict from the mapper object that parsed the file.
        self.config_file = None

        if not self.files:
            raise ValueError(XpsDataFileParser.__file_err_msg__)

    def get_dict(self, **kwargs) -> dict:
        """
        Return python dict fully filled with data from the XPS file.

        Only the first entry of ``self.files`` is processed: the method
        returns (or raises) as soon as that file has been handled. The
        keyword arguments are forwarded to the mapper's ``parse_file``.

        Returns
        -------
        dict
            The ``data_dict`` of the mapper object, or an empty dict if
            ``self.files`` is empty.

        Raises
        ------
        ValueError
            If the file extension is not supported, or if the mapper
            raises a ValueError (re-raised with the vendor message).
        KeyError
            If no mapper is registered for the extension/vendor pair, or
            if the mapper raises a KeyError (re-raised with the vendor
            message).
        """
        for file in self.files:
            file_ext = file.rsplit(".")[-1]

            if file_ext not in XpsDataFileParser.__prmt_file_ext__:
                raise ValueError(XpsDataFileParser.__file_err_msg__)

            vendor = XpsDataFileParser.check_for_vendors(file)
            try:
                parser_class = XpsDataFileParser.__prmt_vndr_cls[file_ext][vendor]
                parser_obj = parser_class()

                parser_obj.parse_file(file, **kwargs)
                self.config_file = parser_obj.config_file
                return parser_obj.data_dict

            except ValueError as val_err:
                raise ValueError(XpsDataFileParser.__vndr_err_msg__) from val_err
            except KeyError as key_err:
                raise KeyError(XpsDataFileParser.__vndr_err_msg__) from key_err
        return {}

    @classmethod
    def check_for_vendors(cls, file: str) -> Optional[str]:
        """
        Check for the vendor name of the XPS data file.

        Parameters
        ----------
        file : str
            Path of the XPS data file.

        Returns
        -------
        str or None
            The only registered vendor if the extension has exactly one,
            the vendor detected in the contents for "txt" files, and None
            for any other extension with several registered vendors.

        Raises
        ------
        KeyError
            If no mapper is registered for the file extension.
        """
        file_ext = file.rsplit(".")[-1]

        vendor_dict = XpsDataFileParser.__prmt_vndr_cls[file_ext]

        if len(vendor_dict) == 1:
            return next(iter(vendor_dict))
        if file_ext == "txt":
            return cls._check_for_vendors_txt(file)
        return None

    @classmethod
    def _check_for_vendors_txt(cls, file: str) -> str:
        """
        Search for a vendor name in a txt file.

        Parameters
        ----------
        file : str
            XPS txt file.

        Returns
        -------
        vendor : str
            Vendor name if that name is in the txt file, "scienta" if the
            file starts with "[Info]", otherwise "unknown".
        """
        vendor_dict = XpsDataFileParser.__prmt_vndr_cls["txt"]

        with open(file, encoding="utf-8") as txt_file:
            contents = txt_file.read()

        if contents.startswith("[Info]"):
            # This is for picking the Scienta reader if "scienta" is not
            # in the file. Named explicitly so that the result does not
            # depend on the iteration order of the vendor dict.
            return "scienta"

        for vendor in vendor_dict:
            vendor_options = (vendor, vendor.upper(), vendor.capitalize())

            if any(vendor_opt in contents for vendor_opt in vendor_options):
                return vendor
        return "unknown"
40 changes: 8 additions & 32 deletions src/pynxtools_xps/kratos/metadata_kratos.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
# pylint: disable=too-many-lines,too-many-instance-attributes

import re
import datetime
from typing import Any, Dict, List, Union, Tuple
from pathlib import Path

Expand All @@ -34,6 +33,7 @@
from pynxtools_xps.value_mappers import (
convert_bool,
convert_units,
parse_datetime,
)

from pynxtools_xps.kratos.kratos_data_model import (
Expand Down Expand Up @@ -72,6 +72,8 @@
"sample_tilt": "degree",
}

POSSIBLE_DATE_FORMATS: List[str] = ["%d.%m.%Y %H:%M", "%d/%m/%Y %H:%M"]


class KratosParser:
"""
Expand All @@ -89,7 +91,7 @@ def __init__(self):
self.metadata = KratosMetadata()

self.value_function_map: Dict[str, Any] = {
"date_created": _parse_datetime,
"date_created": parse_datetime,
"description": _convert_description,
"charge_neutraliser": convert_bool,
"deflection": _convert_xray_deflection,
Expand Down Expand Up @@ -218,10 +220,12 @@ def map_values(self, key: str, value, field_type):
Value of correct type and internal structure.
"""

if key in self.value_function_map:
map_fn = self.value_function_map[key]
value = map_fn(value)
if "date" in key:
value = map_fn(value, POSSIBLE_DATE_FORMATS)
else:
value = map_fn(value)
return field_type(value)

def flatten_metadata(self) -> Dict[str, Any]:
Expand Down Expand Up @@ -268,34 +272,6 @@ def setup_unit(flattened_dict: Dict[str, Any], unit_key: str):
return flattened_dict


def _parse_datetime(datetime_string: str) -> str:
"""
Convert the native time format to the datetime string
in the ISO 8601 format: '%Y-%b-%dT%H:%M:%S.%fZ'.
Parameters
----------
value : str
String representation of the date in the format
"%Y-%m-%d", "%m/%d/%Y" or "%H:%M:%S", "%I:%M:%S %p".
Returns
-------
date_object : str
Datetime in ISO8601 format.
"""
possible_date_formats = ["%d.%m.%Y %H:%M", "%d/%m/%Y %H:%M"]
for date_fmt in possible_date_formats:
try:
datetime_obj = datetime.datetime.strptime(datetime_string, date_fmt)
return datetime_obj.astimezone().isoformat()

except ValueError:
continue
raise ValueError("Date and time could not be converted to ISO 8601 format.")


def _convert_description(value: str) -> Dict[str, Any]:
"""Map all items in description to a dictionary."""
pattern = re.compile(
Expand Down
2 changes: 1 addition & 1 deletion src/pynxtools_xps/reader_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def data_dict(self) -> dict:

def parse_file(self, file, **kwargs):
"""
Parse the file using the Scienta TXT parser.
Parse the file using the selected parser.
"""
self.file = file
Expand Down
8 changes: 8 additions & 0 deletions src/pynxtools_xps/specs/xy/xy_specs.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
convert_measurement_method,
convert_energy_scan_mode,
convert_units,
parse_datetime,
)

SETTINGS_MAP: Dict[str, str] = {
Expand Down Expand Up @@ -848,8 +849,15 @@ def _parse_datetime(self, date: str) -> str:
date = date.strip()
tz = datetime.datetime.now(datetime.timezone.utc).astimezone().tzinfo # type: ignore[assignment]

possible_time_formats = ["%m/%d/%y %H:%M:%S"]


date_object = datetime.datetime.strptime(date, "%m/%d/%y %H:%M:%S").replace(
tzinfo=tz
)

return date_object.isoformat()



return parse_datetime(date, possible_time_formats, tzinfo)
49 changes: 48 additions & 1 deletion src/pynxtools_xps/value_mappers.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
"""

import re
from typing import Dict, Any
import datetime
from typing import Dict, List, Any

ENERGY_TYPE_MAP = {
"BE": "binding",
Expand Down Expand Up @@ -129,3 +130,49 @@ def get_units_for_key(unit_key: str, unit_map: Dict[str, str]) -> str:
if regex_match is None:
return unit_map.get(unit_key, None)
return regex_match.group(1)


def parse_datetime(
    datetime_string: str,
    possible_date_formats: List[str],
    tzinfo: datetime.tzinfo = datetime.timezone.utc,
) -> str:
    """
    Convert a date string to ISO 8601 format with optional timezone handling.

    For different vendors, there are different possible date formats,
    all of which can be checked with this method. The formats are tried
    in the given order and the first one that matches is used.

    If the parsed value carries no UTC offset of its own, ``tzinfo`` is
    attached to it. A value that was parsed with an offset (e.g. through
    a "%z" directive) keeps that offset. The result is then converted to
    the local timezone of the machine before it is formatted.

    Parameters
    ----------
    datetime_string : str
        String representation of the date.
    possible_date_formats : List[str]
        List of possible date time formats to attempt for parsing.
    tzinfo : datetime.tzinfo
        A tzinfo object specifying the timezone the date string is given
        in. Defaults to UTC (datetime.timezone.utc). If None, a value
        without offset is interpreted as local time.

    Raises
    ------
    ValueError
        If the time format cannot be converted, a ValueError is raised.

    Returns
    -------
    str
        Datetime in ISO 8601 format, including a UTC offset.
    """
    for date_fmt in possible_date_formats:
        try:
            datetime_obj = datetime.datetime.strptime(datetime_string, date_fmt)

            # Only fill in the timezone for naive values; replacing the
            # tzinfo of an aware value would discard the parsed offset
            # and shift the instant.
            if tzinfo is not None and datetime_obj.tzinfo is None:
                datetime_obj = datetime_obj.replace(tzinfo=tzinfo)

            return datetime_obj.astimezone().isoformat()

        except ValueError:
            continue
    raise ValueError("Date and time could not be converted to ISO 8601 format.")

0 comments on commit 9d655e3

Please sign in to comment.