From f319a2d7b0bb8b0e758408c82ce2b84ed97913e7 Mon Sep 17 00:00:00 2001 From: Lukas Pielsticker <50139597+lukaspie@users.noreply.github.com> Date: Tue, 16 Jul 2024 14:38:47 +0200 Subject: [PATCH] static typing --- src/pynxtools_xps/specs/sle/flatten_xml.py | 530 +++++++++++---------- src/pynxtools_xps/specs/sle/sle_specs.py | 399 +++++++++------- 2 files changed, 499 insertions(+), 430 deletions(-) diff --git a/src/pynxtools_xps/specs/sle/flatten_xml.py b/src/pynxtools_xps/specs/sle/flatten_xml.py index bff14a3f..715ef785 100644 --- a/src/pynxtools_xps/specs/sle/flatten_xml.py +++ b/src/pynxtools_xps/specs/sle/flatten_xml.py @@ -1,261 +1,269 @@ -# -*- coding: utf-8 -*- -""" -Created on Mon Jul 15 13:17:23 2024 - -@author: pielsticker -""" - -import re -import copy -from typing import Tuple, Any - -import lxml - -from pynxtools_xps.reader_utils import convert_pascal_to_snake - -from pynxtools_xps.value_mappers import ( - convert_measurement_method, - convert_energy_scan_mode, - MEASUREMENT_METHOD_MAP, - convert_units, -) - - -def extract_devices(elem: lxml.etree._Element): - settings = {} - - for key, val in elem.attrib.items(): - settings[convert_pascal_to_snake(key)] = val - - for param in elem.iter("Parameter"): - settings[param.attrib["name"]] = param.text - - print(settings) - - return settings - - # data['devices'] += [{'device_type' : j.attrib['DeviceType'], - # 'settings':settings}] - - -# data["devices"] += [device.attrib["DeviceType"]] - - -def step_profiling(elem: lxml.etree._Element): - pass - - -def _get_group_metadata(spectrum_group): - """ - Iteratively retrieve metadata for one spectrum group. - - Parameters - ---------- - spectrum_group: xml.etree.ElementTree.Element - XML element containing one spectrum group. - - Returns - ------- - settings: dict - Dictionary containing all metadata for - the spectrum group. - - """ - settings = {} - settings["group_name"] = spectrum_group.attrib["Name"] - settings["group_id"] = spectrum_group.attrib["ID"] - for comm_settings in spectrum_group.iter("CommonSpectrumSettings"): - common_spectrum_settings = _extract_comm_settings(comm_settings) - settings.update(copy.copy(common_spectrum_settings)) - - for spectrum in spectrum_group.iter("Spectrum"): - spectrum_settings = _get_spectrum_metadata(spectrum) - settings.update(copy.copy(spectrum_settings)) - - return settings - - -def _extract_comm_settings(comm_settings): - """ - Iteratively retrieve metadata for common settings of one spectrum group. - - Parameters - ---------- - spectrum_group: xml.etree.ElementTree.Element - XML element containing common settings for one spectrum group. - - Returns - ------- - settings: dict - Dictionary containing all common metadata for - the spectrum group. - - """ - common_spectrum_settings = {} - for setting in comm_settings.iter(): - if setting.tag == "ScanMode": - energy_scan_mode = convert_energy_scan_mode(setting.attrib["Name"]) - common_spectrum_settings[setting.tag] = energy_scan_mode - elif setting.tag == "SlitInfo": - for key, val in setting.attrib.items(): - common_spectrum_settings[key] = val - elif setting.tag == "Lens": - voltage_range = setting.attrib["VoltageRange"] - value, unit = _extract_unit(voltage_range) - common_spectrum_settings["voltage_range"] = float(value) - common_spectrum_settings["voltage_range/@units"] = unit - elif setting.tag == "EnergyChannelCalibration": - common_spectrum_settings["calibration_file/dir"] = setting.attrib["Dir"] - common_spectrum_settings["calibration_file/path"] = setting.attrib["File"] - elif setting.tag == "Transmission": - common_spectrum_settings["transmission_function/file"] = setting.attrib[ - "File" - ] - elif setting.tag == "Iris": - common_spectrum_settings["iris_diameter"] = setting.attrib["Diameter"] - return common_spectrum_settings - - -def _get_spectrum_metadata(spectrum): - """ - Iteratively retrieve metadata for one spectrum. - - Parameters - ---------- - spectrum: xml.etree.ElementTree.Element - XML element containing one spectrum. - - Returns - ------- - spectrum_ settings: dict - Dictionary containing all metadata for - the spectrum. - - """ - spectrum_settings = {} - - spectrum_settings["spectrum_id"] = spectrum.attrib["ID"] - spectrum_settings["spectrum_type"] = spectrum.attrib["Name"] - for setting in spectrum.iter("FixedEnergiesSettings"): - spectrum_settings["dwell_time"] = float(setting.attrib["DwellTime"]) - spectrum_settings["start_energy"] = float(copy.copy(setting.attrib["Ebin"])) - spectrum_settings["pass_energy"] = float(setting.attrib["Epass"]) - spectrum_settings["lens_mode"] = setting.attrib["LensMode"] - spectrum_settings["total_scans"] = int(setting.attrib["NumScans"]) - spectrum_settings["n_values"] = int(setting.attrib["NumValues"]) - spectrum_settings["end_energy"] = float(setting.attrib["End"]) - spectrum_settings["excitation_energy"] = float(setting.attrib["Eexc"]) - spectrum_settings["step_size"] = ( - spectrum_settings["start_energy"] - spectrum_settings["end_energy"] - ) / (spectrum_settings["n_values"] - 1) - for setting in spectrum.iter("FixedAnalyzerTransmissionSettings"): - spectrum_settings["dwell_time"] = float(setting.attrib["DwellTime"]) - spectrum_settings["start_energy"] = float(copy.copy(setting.attrib["Ebin"])) - spectrum_settings["pass_energy"] = float(setting.attrib["Epass"]) - spectrum_settings["lens_mode"] = setting.attrib["LensMode"] - spectrum_settings["total_scans"] = setting.attrib["NumScans"] - spectrum_settings["n_values"] = int(setting.attrib["NumValues"]) - spectrum_settings["end_energy"] = float(setting.attrib["End"]) - spectrum_settings["scans"] = int(setting.attrib["NumScans"]) - spectrum_settings["excitation_energy"] = float(setting.attrib["Eexc"]) - spectrum_settings["step_size"] = ( - spectrum_settings["start_energy"] - spectrum_settings["end_energy"] - ) / (spectrum_settings["n_values"] - 1) - return spectrum_settings - - -FUNC_MAP = { - "DeviceCommand": extract_devices, - "StepProfiling": step_profiling, - "CommonSpectrumSettings": _extract_comm_settings, - "Spectrum": _get_spectrum_metadata, -} - - -def flatten_xml(xml: lxml.etree): - """ - Flatten the nested XML structure, keeping only the needed metadata. - - Parameters - ---------- - xml : xml.etree.ElementTree - XML schedule of the experiment. - - Returns - ------- - collect : list - List of dictionary with spectra metadata. - - """ - - def process_element(elem: lxml.etree._Element): - settings = {} - # Check if the element's tag is in FUNC_MAP - if elem.tag in FUNC_MAP: - # Apply the corresponding function to the element itself - FUNC_MAP[elem.tag](elem) - - # Recursively process each child element - for child in elem: - process_element(child) - - collect = [] - process_element(xml) - - # print(list(xml.iter())) - - for measurement_type in MEASUREMENT_METHOD_MAP: - for group in xml.iter(measurement_type): - data = {} - data["analysis_method"] = convert_measurement_method(measurement_type) - - data["devices"] = [] - - for spectrum_group in group.iter("SpectrumGroup"): - settings = _get_group_metadata(spectrum_group) - data.update(copy.copy(settings)) - collect += [copy.copy(data)] - return collect - - -def _extract_unit(value: str) -> Tuple[Any, str]: - """ - Extract units for the metadata containing unit information. - - Example: - analyser_work_function: 4.506eV - -> analyser_work_function: 4.506, - analyser_work_function_units: eV, - - Parameters - ---------- - key : str - Key of the associated value. - value : str - Combined unit and value information. - - Returns - ------- - value : - value with units. - unit : str - Associated unit. - - """ - - pattern = re.compile(r"([-+]?\d*\.?\d+(?:[eE][-+]?\d+)?)([a-zA-Z]+)") - match = pattern.match(value) - - if match: - value, unit = match.groups() - else: - unit = "" - - unit = convert_units(unit) - - # ============================================================================= - # if key in UNIT_MISSING: - # unit = UNIT_MISSING[key] - # ============================================================================= - - return value, unit +# -*- coding: utf-8 -*- +""" +Created on Mon Jul 15 13:17:23 2024 + +@author: pielsticker +""" + +import re +import copy +from typing import Tuple, Dict, Any + +from lxml import etree as ET + +from pynxtools_xps.reader_utils import convert_pascal_to_snake + +from pynxtools_xps.value_mappers import ( + convert_measurement_method, + convert_energy_scan_mode, + MEASUREMENT_METHOD_MAP, + convert_units, +) + + +def extract_devices(elem: ET.Element) -> Dict[str, Any]: + settings = {} + + for key, val in elem.attrib.items(): + settings[convert_pascal_to_snake(key)] = val + + for param in elem.iter("Parameter"): + settings[convert_pascal_to_snake(param.attrib["name"])] = param.text + + return settings + + # data['devices'] += [{'device_type' : j.attrib['DeviceType'], + # 'settings':settings}] + + +# data["devices"] += [device.attrib["DeviceType"]] + + +def step_profiling(elem: ET.Element) -> Dict[str, Any]: + settings = {} + + for setting in elem.iter(): + print(setting.tag, setting.attrib) + + return settings + + +def _get_group_metadata(spectrum_group: ET.Element) -> Dict[str, Any]: + """ + Iteratively retrieve metadata for one spectrum group. + + Parameters + ---------- + spectrum_group: lxml.etree._Element + XML element containing one spectrum group. + + Returns + ------- + settings: dict + Dictionary containing all metadata for + the spectrum group. + + """ + settings = {} + settings["group_name"] = spectrum_group.attrib["Name"] + settings["group_id"] = spectrum_group.attrib["ID"] + for comm_settings in spectrum_group.iter("CommonSpectrumSettings"): + common_spectrum_settings = _extract_comm_settings(comm_settings) + settings.update(copy.copy(common_spectrum_settings)) + + for spectrum in spectrum_group.iter("Spectrum"): + spectrum_settings = _get_spectrum_metadata(spectrum) + settings.update(copy.copy(spectrum_settings)) + + return settings + + +def _extract_comm_settings(comm_settings: ET.Element) -> Dict[str, Any]: + """ + Iteratively retrieve metadata for common settings of one spectrum group. + + Parameters + ---------- + spectrum_group: lxml.etree._Element + XML element containing common settings for one spectrum group. + + Returns + ------- + settings: dict + Dictionary containing all common metadata for + the spectrum group. + + """ + common_spectrum_settings = {} + for setting in comm_settings.iter(): + if setting.tag == "ScanMode": + energy_scan_mode = convert_energy_scan_mode(setting.attrib["Name"]) + common_spectrum_settings[setting.tag] = energy_scan_mode + elif setting.tag == "SlitInfo": + for key, val in setting.attrib.items(): + common_spectrum_settings[key] = val + elif setting.tag == "Lens": + voltage_range = setting.attrib["VoltageRange"] + value, unit = _extract_unit(voltage_range) + common_spectrum_settings["voltage_range"] = float(value) + common_spectrum_settings["voltage_range/@units"] = unit + elif setting.tag == "EnergyChannelCalibration": + common_spectrum_settings["calibration_file/dir"] = setting.attrib["Dir"] + common_spectrum_settings["calibration_file/path"] = setting.attrib["File"] + elif setting.tag == "Transmission": + common_spectrum_settings["transmission_function/file"] = setting.attrib[ + "File" + ] + elif setting.tag == "Iris": + common_spectrum_settings["iris_diameter"] = setting.attrib["Diameter"] + return common_spectrum_settings + + +def _get_spectrum_metadata(spectrum: ET.Element) -> Dict[str, Any]: + """ + Iteratively retrieve metadata for one spectrum. + + Parameters + ---------- + spectrum: lxml.etree._Element + XML element containing one spectrum. + + Returns + ------- + spectrum_ settings: dict + Dictionary containing all metadata for + the spectrum. + + """ + spectrum_settings = {} + + spectrum_settings["spectrum_id"] = spectrum.attrib["ID"] + spectrum_settings["spectrum_type"] = spectrum.attrib["Name"] + for setting in spectrum.iter("FixedEnergiesSettings"): + spectrum_settings["dwell_time"] = float(setting.attrib["DwellTime"]) + spectrum_settings["start_energy"] = float(copy.copy(setting.attrib["Ebin"])) + spectrum_settings["pass_energy"] = float(setting.attrib["Epass"]) + spectrum_settings["lens_mode"] = setting.attrib["LensMode"] + # spectrum_settings["total_scans"] = int(setting.attrib["NumScans"]) + spectrum_settings["n_values"] = int(setting.attrib["NumValues"]) + # spectrum_settings["end_energy"] = float(setting.attrib["End"]) + # spectrum_settings["excitation_energy"] = float(setting.attrib["Eexc"]) + # spectrum_settings["step_size"] = ( + # spectrum_settings["start_energy"] - spectrum_settings["end_energy"] + # ) / (spectrum_settings["n_values"] - 1) + for setting in spectrum.iter("FixedAnalyzerTransmissionSettings"): + spectrum_settings["dwell_time"] = float(setting.attrib["DwellTime"]) + spectrum_settings["start_energy"] = float(copy.copy(setting.attrib["Ebin"])) + spectrum_settings["pass_energy"] = float(setting.attrib["Epass"]) + spectrum_settings["lens_mode"] = setting.attrib["LensMode"] + # spectrum_settings["total_scans"] = setting.attrib["NumScans"] + spectrum_settings["n_values"] = int(setting.attrib["NumValues"]) + spectrum_settings["end_energy"] = float(setting.attrib["End"]) + # spectrum_settings["excitation_energy"] = float(setting.attrib["Eexc"]) + # spectrum_settings["step_size"] = ( + # spectrum_settings["start_energy"] - spectrum_settings["end_energy"] + # ) / (spectrum_settings["n_values"] - 1) + return spectrum_settings + + +FUNC_MAP = { + "DeviceCommand": extract_devices, + "StepProfiling": step_profiling, + "CommonSpectrumSettings": _extract_comm_settings, + "Spectrum": _get_spectrum_metadata, +} + + +def flatten_xml(xml: ET.Element) -> Dict[str, Any]: + """ + Flatten the nested XML structure, keeping only the needed metadata. + + Parameters + ---------- + xml : lxml.etree + XML schedule of the experiment. + + Returns + ------- + collect : list + List of dictionary with spectra metadata. + + """ + + def process_element(elem: ET.Element, settings: Dict[str, Any]): + # Check if the element's tag is in FUNC_MAP + if elem.tag in FUNC_MAP: + # Apply the corresponding function to the element itself + elem_settings = FUNC_MAP[elem.tag](elem) + print(elem_settings) + settings.update(elem_settings) + + # Recursively process each child element + for child in elem: + process_element(child, settings) + + return settings + + collect = [] + + for measurement_type in MEASUREMENT_METHOD_MAP: + for group in xml.iter(measurement_type): + data = {} + data["devices"] = [] + data["analysis_method"] = convert_measurement_method(measurement_type) + process_element(group, data) + + collect += [copy.copy(data)] + # ============================================================================= + # + # for spectrum_group in group.iter("SpectrumGroup"): + # settings = _get_group_metadata(spectrum_group) + # data.update(copy.copy(settings)) + # + # ============================================================================= + print(collect) + + return collect + + +def _extract_unit(value: str) -> Tuple[Any, str]: + """ + Extract units for the metadata containing unit information. + + Example: + analyser_work_function: 4.506eV + -> analyser_work_function: 4.506, + analyser_work_function_units: eV, + + Parameters + ---------- + key : str + Key of the associated value. + value : str + Combined unit and value information. + + Returns + ------- + value : + value with units. + unit : str + Associated unit. + + """ + + pattern = re.compile(r"([-+]?\d*\.?\d+(?:[eE][-+]?\d+)?)([a-zA-Z]+)") + match = pattern.match(value) + + if match: + value, unit = match.groups() + else: + unit = "" + + unit = convert_units(unit) + + # ============================================================================= + # if key in UNIT_MISSING: + # unit = UNIT_MISSING[key] + # ============================================================================= + + return value, unit diff --git a/src/pynxtools_xps/specs/sle/sle_specs.py b/src/pynxtools_xps/specs/sle/sle_specs.py index 13fc70db..d1d224ca 100644 --- a/src/pynxtools_xps/specs/sle/sle_specs.py +++ b/src/pynxtools_xps/specs/sle/sle_specs.py @@ -48,6 +48,7 @@ convert_energy_type, convert_energy_scan_mode, get_units_for_key, + parse_datetime, ) from pynxtools_xps.specs.sle.flatten_xml import flatten_xml @@ -114,7 +115,7 @@ def _select_parser(self): f"Version f{version} of SPECS Prodigy is currently not supported." ) from exc - def _get_sle_version(self): + def _get_sle_version(self) -> str: """Get the Prodigy SLE version from the file.""" con = sqlite3.connect(self.sql_connection) cur = con.cursor() @@ -125,21 +126,41 @@ def _get_sle_version(self): version = version[0] + "." + version[1].split("-")[0] return version - def parse_file(self, file, **kwargs): + def parse_file(self, file: str, **kwargs: Dict[str, Any]) -> Dict[str, Any]: """ Parse the file using the parser that fits the Prodigy SLE version. + Returns flat list of dictionaries containing one spectrum each. + Parameters + ---------- + file : str + String name of the file. + **kwargs : Dict[str, Any] + Dict with additional keyword arguments. + + Returns + ------- + Dict[str, Any] + Dict with parsed data. + """ self.sql_connection = file return super().parse_file(file, **kwargs) def construct_data(self): - """Map SLE format to NXmpes-ready dict.""" + """ + Map SLE format to NXmpes-ready dict. + + Returns + ------- + None. + + """ # pylint: disable=duplicate-code spectra = copy.deepcopy(self.raw_data) - self._xps_dict["data"]: dict = {} + self._xps_dict["data"]: Dict[str, Any] = {} template_key_map = { "user": [], "instrument": [ @@ -219,6 +240,17 @@ def _update_xps_dict_with_spectrum( """ Map one spectrum from raw data to NXmpes-ready dict. + Parameters + ---------- + spectrum : Dict[str, Any] + Dictionary with data and metadata for one spectrum. + template_key_map : Dict[str, str] + Mapping to NXmpes terms. + + Returns + ------- + None. + """ # pylint: disable=too-many-locals,duplicate-code entry_parts = [] @@ -331,6 +363,68 @@ def _update_xps_dict_with_spectrum( self._xps_dict[detector_data_unit_key] = detector_data_units +KEY_MAP: Dict[str, str] = { + "Udet": "detector_voltage", + "Comment": "comments", + "ElectronEnergy": "start_energy", + "SpectrumID": "spectrum_id", + "EpassOrRR": "pass_energy", + "EnergyType": "energy/@type", + "Samples": "n_values", + "Wf": "work_function", + "Step": "step", + "Ubias": "electron_bias", + "DwellTime": "dwell_time", + "NumScans": "total_scans", + "LensMode": "lens_mode", + "Timestamp": "time_stamp", + "Entrance": "entrance_slit", + "Exit": "exit_slit", + "ScanMode": "energy_scan_mode", + "VoltageRange": "voltage_range", + # spectrometer settings + "Coil Current [mA]": "coil_current [mA]", + "Pre Defl Y [nU]": "pre_deflector_y_current [nU]", + "Pre Defl X [nU]": "pre_deflector_x_current [nU]", + "L1 [nU]": "lens1_voltage [nU]", + "L2 [nU]": "lens2_voltage [nU]", + "Focus Displacement 1 [nu]": "focus_displacement_current [nU]", + "Detector Voltage [V]": "detector_voltage [V]", + "Bias Voltage Electrons [V]": "bias_voltage_electrons [V]", + "Bias Voltage Ions [V]": "bias_voltage_ions [V]", + # source settings + "anode": "source_label", + "uanode": "source_voltage", + "iemission": "emission_current", + "ihv": "source_high_voltage", + "ufilament": "filament_voltage", + "ifilament": "filament_current", + "DeviceExcitationEnergy": "excitation_energy", + "panode": "anode_power", + "temperature": "source_temperature", + # sql metadata map: Dict[str, str] + "EnergyType": "energy/@type", + "EpassOrRR": "pass_energy", + "Wf": "work_function", + "Timestamp": "time_stamp", + "Samples": "n_values", + "ElectronEnergy": "start_energy", +} + +VALUE_MAP: Dict[str, Any] = { + "energy/@type": convert_energy_type, + "excitation_energy": float, + "time_stamp": parse_datetime, + "energy_scan_mode": convert_energy_scan_mode, +} + +KEYS_TO_DROP: List[str] = [ + "Work Function", +] + +POSSIBLE_DATE_FORMATS: List[str] = ["%Y-%b-%d %H:%M:%S.%f"] + + class SleProdigyParser(ABC): """ Generic parser without reading capabilities, @@ -339,114 +433,46 @@ class SleProdigyParser(ABC): def __init__(self): self.con = "" - self.spectra = [] - self.xml = None - self.sum_channels = False - self.remove_align = True - - keys_map = { - "Udet": "detector_voltage", - "Comment": "comments", - "ElectronEnergy": "start_energy", - "SpectrumID": "spectrum_id", - "EpassOrRR": "pass_energy", - "EnergyType": "energy/@type", - "Samples": "n_values", - "Wf": "work_function", - "Step": "step", - "Ubias": "electron_bias", - "DwellTime": "dwell_time", - "NumScans": "total_scans", - "LensMode": "lens_mode", - "Timestamp": "time_stamp", - "Entrance": "entrance_slit", - "Exit": "exit_slit", - "ScanMode": "energy_scan_mode", - "VoltageRange": "voltage_energy_range", - } + self.spectra: List[Dict[str, Any]] = [] + self.xml: ET.Element = None + self.sum_channels: bool = False + self.remove_align: bool = True - spectrometer_setting_map = { - "Coil Current [mA]": "coil_current [mA]", - "Pre Defl Y [nU]": "pre_deflector_y_current [nU]", - "Pre Defl X [nU]": "pre_deflector_x_current [nU]", - "L1 [nU]": "lens1_voltage [nU]", - "L2 [nU]": "lens2_voltage [nU]", - "Focus Displacement 1 [nu]": "focus_displacement_current [nU]", - "Detector Voltage [V]": "detector_voltage [V]", - "Bias Voltage Electrons [V]": "bias_voltage_electrons [V]", - "Bias Voltage Ions [V]": "bias_voltage_ions [V]", - } + self.encoding: List[str, float] = ["f", 4] - source_setting_map = { - "anode": "source_label", - "uanode": "source_voltage", - "iemission": "emission_current", - "ihv": "source_high_voltage", - "ufilament": "filament_voltage", - "ifilament": "filament_current", - "DeviceExcitationEnergy": "excitation_energy", - "panode": "anode_power", - "temperature": "source_temperature", - } - - self.sql_metadata_map = { - "EnergyType": "energy/@type", - "EpassOrRR": "pass_energy", - "Wf": "work_function", - "Timestamp": "time_stamp", - "Samples": "n_values", - "ElectronEnergy": "start_energy", - "Step": "step_size", - } - - self.key_maps = [ - keys_map, - spectrometer_setting_map, - source_setting_map, - self.sql_metadata_map, - ] - - self.value_map = { - "energy/@type": convert_energy_type, - "excitation_energy": self._convert_excitation_energy, - "time_stamp": self._convert_date_time, - "energy_scan_mode": convert_energy_scan_mode, - } - - self.keys_to_drop = [ - "Work Function", - ] - - self.encoding = ["f", 4] - - def initiate_file_connection(self, filepath): - """Set the filename of the file to be opened.""" + def initiate_file_connection(self, filepath: str): + """Set the sqllite connection of the file to be opened.""" sql_connection = filepath self.con = sqlite3.connect(sql_connection) - def parse_file(self, filepath, **kwargs): + def parse_file( + self, filepath: str, **kwargs: Dict[str, Any] + ) -> List[Dict[str, Any]]: """ Parse the file's data and metadata into a flat list of dictionaries. - Parameters ---------- - filename : str + filepath : str Filepath of the SLE file to be read. + **kwargs : Dict[str, Any] + Additional keyword arguments: + remove_align(bool): + Whether or not alignment spectra shall be removed. + sum_channels(bool): + Whether or not channel data shall be summed. Returns ------- - self.spectra + List[Dict[str, Any]] Flat list of dictionaries containing one spectrum each. """ if "remove_align" in kwargs: self.remove_align = kwargs["remove_align"] - try: + if "sum_channels" in kwargs: self.sum_channels = kwargs["sum_channels"] - except KeyError: - self.sum_channels = False # initiate connection to sql file self.initiate_file_connection(filepath) @@ -549,7 +575,7 @@ def _append_scan_data(self): # update self.spectra with the scan data self.spectra = individual_scans - def _get_transmission(self, node_id): + def _get_transmission(self, node_id: int) -> np.ndarray: """ Get the transmission function data. @@ -581,24 +607,23 @@ def _get_transmission(self, node_id): return stream - def _separate_channels(self, data, n_channels): + def _separate_channels(self, data: List[float], n_channels: int) -> np.ndarray: """ Separate energy channels. Parameters ---------- - data : list - Array of measured daata . + data : List[float] + List of measured data. n_channels : int Number of channels to be summed. Returns ------- - list - Summed data across n_channels. + TYPE + Separate data across n_channels. """ - n_points = int(len(data) / n_channels) return np.reshape(np.array(data), (n_channels, n_points)) @@ -818,7 +843,7 @@ def _separate_channels(self, data, n_channels): # # return channel_dict - def _check_energy_channels(self, node_id): + def _check_energy_channels(self, node_id: int) -> int: """ Get the number of separate energy channels for the spectrum. @@ -843,7 +868,7 @@ def _check_energy_channels(self, node_id): n_channels = result[0][0] return n_channels - def _get_raw_ids(self, node_id): + def _get_raw_ids(self, node_id: int) -> List[int]: """ Get the raw IDs from SQL. @@ -859,7 +884,7 @@ def _get_raw_ids(self, node_id): Returns ------- - list + List[int] List of raw IDs for the given note ID. """ @@ -869,7 +894,7 @@ def _get_raw_ids(self, node_id): return [i[0] for i in cur.fetchall()] - def _check_number_of_scans(self, node_id): + def _check_number_of_scans(self, node_id: int) -> int: """ Get the number of separate scans for the spectrum. @@ -889,7 +914,7 @@ def _check_number_of_scans(self, node_id): cur.execute(query) return len(cur.fetchall()) - def _get_detector_data(self, node_id): + def _get_detector_data(self, node_id: int) -> List[float]: """ Get the detector data from sle file. @@ -904,7 +929,7 @@ def _get_detector_data(self, node_id): Returns ------- - detector_data : list + detector_data : List[float] List of lists with measured data. """ @@ -949,22 +974,22 @@ def _attach_device_protocols(self): protocal_params = self._get_one_device_protocol(protocol) spectrum.update(protocal_params) - def _get_one_device_protocol(self, protocol): + def _get_one_device_protocol(self, protocol: ET.Element) -> Dict[str, Any]: """ Get all parameters for one device protocol Parameters ---------- - protocol : xml.etree.ElementTree.Element + protocol : lxml.etree.Element One device protocol. Returns ------- - protocal_params : dict + protocal_params: Dict[str, Any] All parameters given in the device protocol. """ - protocal_params = {} + protocol_params: Dict[str, Any] = {} for device in protocol.iter("Command"): if "Phoibos" in device.attrib["UniqueDeviceName"]: # iterate through the parameters and add to spectrum @@ -974,18 +999,18 @@ def _get_one_device_protocol(self, protocol): param_text = float(parameter.text) else: param_text = parameter.text - protocal_params[parameter.attrib["name"]] = param_text + protocol_params[parameter.attrib["name"]] = param_text elif "XRC1000" in device.attrib["UniqueDeviceName"]: for parameter in device.iter("Parameter"): if parameter.attrib["type"] == "double": param_text = float(parameter.text) else: param_text = parameter.text - protocal_params[parameter.attrib["name"]] = param_text + protocol_params[parameter.attrib["name"]] = param_text - return protocal_params + return protocol_params - def _get_one_scan(self, raw_id): + def _get_one_scan(self, raw_id: int) -> List[float]: """ Get the detector data for a single scan and convert it to float. @@ -1000,7 +1025,7 @@ def _get_one_scan(self, raw_id): Returns ------- - stream : list + List[float] List with measured data. """ @@ -1018,14 +1043,14 @@ def _get_one_scan(self, raw_id): stream.append(struct.unpack(encoding, data[i : i + buffer])[0]) return stream - def _parse_external_channels(self, channel): + def _parse_external_channels(self, channel: int): """ - Parse additional external channels. + Parse additional external channels by channel number. Parameters ---------- channel : int - DESCRIPTION. + Channel number. Returns ------- @@ -1072,8 +1097,10 @@ def _get_spectrum_metadata_from_sql(self): spectrum["work_function"] = i.attrib["Workfunction"] spectrum["step_size"] = float(i.attrib["ScanDelta"]) - def _get_scan_metadata(self, raw_id): + def _get_scan_metadata(self, raw_id: int) -> Dict[str, Any]: """ + Get metadata for each scan. + Get the scan and the loop/iteration number of each spectrum scan and the datetime it was taken from the RawData table. @@ -1084,7 +1111,7 @@ def _get_scan_metadata(self, raw_id): Returns ------- - scan_meta : dict + Dict[str, Any] dictionary containing scan metadata. """ @@ -1093,28 +1120,29 @@ def _get_scan_metadata(self, raw_id): query = f'SELECT ScanDate, Trace FROM RawData WHERE RawID="{raw_id}"' result = cur.execute(query).fetchone() # process metadata into a dictionary - scan_meta = {} + scan_meta: Dict[str, Any] = {} scan_meta["time_stamp_trace"] = result[0] scan_meta.update(self._process_trace(result[1])) return scan_meta - def _process_trace(self, trace): + def _process_trace(self, trace: str) -> Dict[str, Any]: """ - Parse Trace string to determine the Scan, loop and iteration for the - given trace. + Parse Trace string to determine the scan, loop, and iteration + for the given trace. Parameters ---------- trace : str - string to be parsed. + Trace string to be parsed. Returns ------- - trace_dict : dict - dictionary containing scan loop and iteration params + Dict[str, Any] + Dictionary containing scan loop and iteration params + """ - trace_dict = {} + trace_dict: Dict[str, Any] = {} loop = re.findall(r"Loop=([0-9]+)u", trace) if len(loop) != 0: trace_dict["loop_no"] = loop[0] @@ -1129,21 +1157,23 @@ def _process_trace(self, trace): return trace_dict - def _convert_to_counts_per_sec(self, signal_data, dwell_time): + def _convert_to_counts_per_sec( + self, signal_data: np.ndarray, dwell_time: float + ) -> np.ndarray: """ Convert signal data given in counts to counts per second. Parameters ---------- - signal_data : list + signal_data : np.ndarray 2D array of floats representing counts Shape: (n_channel, n_value) dwell_time : float - value of dwell_time per scan. + Value of dwell_time per scan. Returns ------- - cps : array + cps : np.ndarray 2D array of values converted to counts per second. Shape: (n_channel, n_value) @@ -1151,7 +1181,7 @@ def _convert_to_counts_per_sec(self, signal_data, dwell_time): cps = signal_data / dwell_time return cps - def _get_sql_node_id(self, xml_id): + def _get_sql_node_id(self, xml_id: int) -> int: """ Get the SQL internal ID for the NodeID taken from XML. @@ -1208,19 +1238,19 @@ def _remove_empty_nodes(self): if len(result) == 0: del self.spectra[idx] - def _get_energy_data(self, spectrum): + def _get_energy_data(self, spectrum: Dict[str, Any]) -> np.ndarray: """ - Create an array of x values. + Create an array of energy values. Parameters ---------- - spectrum : dict + spectrum : Dict[str, Any] Dictionary with spectrum data and metadata. Returns ------- - x : list - List of uniformly separated energy values. + np.ndarray + Array of uniformly separated energy values. """ if spectrum["energy/@type"] == "binding": @@ -1235,22 +1265,21 @@ def _get_energy_data(self, spectrum): energy = [start + i * step for i in range(points)] return np.array(energy) - def _get_table_names(self): + def _get_table_names(self) -> List[str]: """ Get a list of table names in the current database file. Returns ------- - data : list + List[str] List of spectrum names. """ cur = self.con.cursor() cur.execute('SELECT name FROM sqlite_master WHERE type= "table"') - data = [i[0] for i in cur.fetchall()] - return data + return [i[0] for i in cur.fetchall()] - def _get_column_names(self, table_name): + def _get_column_names(self, table_name: str) -> List[str]: """ Get the names of the columns in the table. @@ -1261,8 +1290,8 @@ def _get_column_names(self, table_name): Returns ------- - names : list - List of descriptions. + List[str] + List of column names. """ cur = self.con.cursor() @@ -1281,38 +1310,19 @@ def _close_con(self): """ self.con.close() - def _convert_excitation_energy(self, excitation_energy): - """ - Convert the excitation_energy to a float. - - """ - return float(excitation_energy) - - def _convert_date_time(self, timestamp): - """ - Convert the native time format to the one we decide to use. - Returns datetime string in the format '%Y-%b-%d %H:%M:%S.%f'. - - """ - date_time = datetime.strptime(timestamp, "%Y-%b-%d %H:%M:%S.%f") - date_time = datetime.strftime(date_time, "%Y-%m-%d %H:%M:%S.%f") - return date_time - - def _sum_channels(self, data): + def _sum_channels(self, data: List[float]) -> np.ndarray: """ Sum together energy channels. Parameters ---------- - data : list - Array of measured daata . - n : int - Number of channels to be summed. + data : List[float] + List of measured data. Returns ------- - list - Summed data across n_channels. + np.ndarray + Summed energy channels. """ summed = np.sum(data, axis=0) @@ -1332,7 +1342,7 @@ def _check_encoding(self): cur.execute(query) data, chunksize = cur.fetchall()[0] - encodings_map = { + encodings_map: Dict[str, List[str, float]] = { "double": ["d", 8], "float": ["f", 4], } @@ -1344,6 +1354,57 @@ def _check_encoding(self): else: logger.error("This binary encoding is not supported.") + def _reindex_spectra(self): + """Re-number the spectrum_id.""" + for idx, spectrum in enumerate(self.spectra): + spectrum["spectrum_id"] = idx + + def _reindex_groups(self): + """Re-number the group_id.""" + group_ids = list({spec["group_id"] for spec in self.spectra}) + for idx, group_id in enumerate(group_ids): + for spec in self.spectra: + if int(spec["group_id"]) == int(group_id): + spec["group_id"] = copy.copy(idx) + + def _convert_to_common_format(self): + """ + Reformat spectra into the format needed for the Mapper object + """ + maps = {} + for key_map in self.key_maps: + maps.update(key_map) + for spec in self.spectra: + re_map_keys(spec, maps) + re_map_values(spec, self.value_map) + drop_unused_keys(spec, self.keys_to_drop) + spec["data"] = {} + spec["data"]["x"] = self._get_energy_data(spec) + + channels = [ + key + for key in spec + if any(name in key for name in ["cps_ch_", "cps_calib"]) + ] + + for channel_key in channels: + spec["data"][channel_key] = np.array(spec[channel_key]) + for channel_key in channels: + spec.pop(channel_key) + + spec["energy/@units"] = "eV" + spec["intensity/@units"] = "counts_per_second" + + # Add energy axis for TF data. + if spec["energy/@type"] == "binding": + tf_energy = np.array( + [spec["excitation_energy"] - x for x in spec["data"]["x"]] + ) + elif spec["energy/@type"] == "kinetic": + tf_energy = spec["data"]["x"] + + spec["transmission_function/kinetic_energy"] = tf_energy + def _remove_fixed_energies(self): """ Remove spectra measured with the scan mode FixedEnergies. @@ -1369,7 +1430,7 @@ def _remove_snapshot(self): spec for spec in self.spectra if "snapshot" not in spec["energy_scan_mode"] ] - def get_sle_version(self): + def get_sle_version(self) -> str: """ Get the Prodigy SLE version from the file.