diff --git a/src/pynxtools_xps/specs/sle/sle_specs.py b/src/pynxtools_xps/specs/sle/sle_specs.py index 8e278da4..2354539a 100644 --- a/src/pynxtools_xps/specs/sle/sle_specs.py +++ b/src/pynxtools_xps/specs/sle/sle_specs.py @@ -28,7 +28,6 @@ """ import re -import struct import copy import logging from pathlib import Path @@ -37,8 +36,10 @@ from packaging.version import Version, InvalidVersion import sqlite3 from lxml import etree as ET +import zlib import numpy as np import xarray as xr +from scipy.interpolate import interp1d from pynxtools_xps.reader_utils import ( XPSMapper, @@ -49,13 +50,14 @@ drop_unused_keys, update_dict_without_overwrite, ) -from pynxtools_xps.value_mappers import ( - convert_energy_type, - convert_energy_scan_mode, - get_units_for_key, - parse_datetime, +from pynxtools_xps.value_mappers import get_units_for_key +from pynxtools_xps.specs.sle.utils import ( + KEY_MAP, + VALUE_MAP, + UNITS, + iterate_xml_at_tag, + format_key_and_value, ) -from pynxtools_xps.specs.sle.specs_sle_mapping import KEY_MAP, VALUE_MAP from pynxtools_xps.specs.sle.flatten_xml import ( flatten_schedule, flatten_context, @@ -64,28 +66,9 @@ logger = logging.getLogger(__name__) -UNITS: Dict[str, str] = { - "electronanalyser/work_function": "eV", - "beam/excitation_energy": "eV", - "collectioncolumn/iris_diameter": "mm", - "data/step_size": "eV", - "detector/detector_voltage": "V", - "detector/dwell_time": "s", - "detector/raw_data/raw": "counts_per_second ", - "instrument/polar_angle": "degree ", - "instrument/azimuth_angle": "degree", - "energydispersion/pass_energy": "eV", - "region/start_energy": "eV", - "source/emission_current": "A", - "source/source_voltage": "V", - "collectioncolumn/transmission_function/kinetic_energy": "eV", - "process/transmission_correction/transmission_function/kinetic_energy": "eV", -} - - -def execute_sql_query_on_con(con: sqlite3.Connection, query: str): - """Excute a query on a sqlite connection object.""" - cur = con.cursor() + +def execute_sql_query_with_cur(cur: sqlite3.Cursor, query: str): + """Execute a query with a sqlite3 Cursor object.""" cur.execute(query) return cur.fetchall() @@ -104,13 +87,14 @@ def __init__(self): ] self.file: Union[str, Path] = "" + self.multiple_spectra_groups: bool = True super().__init__() def _get_sle_version(self): con = sqlite3.connect(self.file) query = 'SELECT Value FROM Configuration WHERE Key="Version"' - return execute_sql_query_on_con(con, query)[0][0] + return execute_sql_query_with_cur(con.cursor(), query)[0][0] def _select_parser(self): """ @@ -226,85 +210,19 @@ def construct_data(self): None. 
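+
+        Notes
+        -----
+        If all spectra share a single group name, ``multiple_spectra_groups``
+        is set to False and the group name is omitted from the entry names.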
""" - # pylint: disable=duplicate-code spectra = copy.deepcopy(self.raw_data) + if len({spectrum.get("group_name") for spectrum in spectra}) == 1: + self.multiple_spectra_groups = False + self._xps_dict["data"]: Dict[str, Any] = {} - template_key_map = { - "user": [], - "instrument": [ - "polar_angle", - "azimuth_angle", - ], - "source": [ - "source_label", - "source_voltage", - "operating_mode", - "emission_current", - ], - "beam": ["excitation_energy"], - "electronanalyser": [ - "voltage_energy_range", - "voltage_energy_range/@units", - "work_function", - ], - "collectioncolumn": [ - "lens1_voltage [nU]", - "lens2_voltage [nU]", - "coil_current [mA]", - "pre_deflector_x_current [nU]", - "pre_deflector_y_current [nU]", - "focus_displacement_current [nU]", - "iris_diameter", - "lens_mode", - "transmission_function/kinetic_energy", - "transmission_function/relative_intensity", - "transmission_function/file", - ], - "energydispersion": [ - "energy_scan_mode", - "entrance_slit", - "exit_slit", - "pass_energy", - ], - "detector": [ - "bias_voltage_electrons [V]", - "bias_voltage_ions [V]", - "detector_voltage [V]", - "dwell_time", - ], - "manipulator": [], - "process/energy_calibration": [ - "calibration_file/dir", - "calibration_file/path", - "energy/@units", - ], - "process/transmission_correction": [ - "transmission_function/kinetic_energy", - "transmission_function/relative_intensity", - "transmission_function/file", - ], - "data": [ - "energy/@type", - "energy/@units", - "intensity/@units", - "n_values", - "step_size", - ], - "region": [ - "analysis_method", - "start_energy", - "spectrum_comment", - "time_stamp", - "total_scans", - ], - } for spectrum in spectra: - self._update_xps_dict_with_spectrum(spectrum, template_key_map) + pass # self._update_xps_dict_with_spectrum(spectrum) def _update_xps_dict_with_spectrum( - self, spectrum: Dict[str, Any], template_key_map: Dict[str, str] + self, + spectrum: Dict[str, Any], ): """ Map one spectrum from raw data to NXmpes-ready dict. @@ -313,8 +231,6 @@ def _update_xps_dict_with_spectrum( ---------- spectrum : Dict[str, Any] Dictionary with data and metadata for one spectrum. - template_key_map : Dict[str, str] - Mapping to NXmpes terms. 
        Returns
        -------
        None.

        """
        # pylint: disable=too-many-locals,duplicate-code
        entry_parts = []
-        for part in ["group_name", "spectrum_type"]:
+
+        parts_to_use = (
+            ["group_name"] if self.multiple_spectra_groups else []
+        ) + ["spectrum_type"]
+        for part in parts_to_use:
            val = spectrum.get(part, None)
            if val:
                entry_parts += [val]

        entry = construct_entry_name(entry_parts)
        entry_parent = f"/ENTRY[{entry}]"

-        instrument_parent = f"{entry_parent}/instrument"
-        analyser_parent = f"{instrument_parent}/electronanalyser"
-
-        path_map = {
-            "user": f"{entry_parent}/user",
-            "instrument": f"{instrument_parent}",
-            "source": f"{instrument_parent}/source",
-            "beam": f"{instrument_parent}/beam",
-            "electronanalyser": f"{analyser_parent}",
-            "collectioncolumn": f"{analyser_parent}/collectioncolumn",
-            "energydispersion": f"{analyser_parent}/energydispersion",
-            "detector": f"{analyser_parent}/detector",
-            "manipulator": f"{instrument_parent}/manipulator",
-            "process/energy_calibration": f"{entry_parent}/process/energy_calibration",
-            "process/transmission_correction": f"{entry_parent}/process/transmission_correction",
-            "sample": f"{entry_parent}/sample",
-            "data": f"{entry_parent}/data",
-            "region": f"{entry_parent}/region",
-        }
+        for key, value in spectrum.items():
+            mpes_key = f"{entry_parent}/{key}"
+            self._xps_dict[mpes_key] = value

-        for grouping, spectrum_keys in template_key_map.items():
-            root = path_map[str(grouping)]
-            for spectrum_key in spectrum_keys:
-                mpes_key = spectrum_key.rsplit(" ", 1)[0]
-                self._xps_dict[f"{root}/{mpes_key}"] = spectrum[spectrum_key]
+            units = get_units_for_key(key, UNITS)
+            if units is not None:
+                self._xps_dict[f"{mpes_key}/@units"] = units

-                unit_key = f"{grouping}/{spectrum_key}"
-                units = get_units_for_key(unit_key, UNITS)
-                if units is not None:
-                    self._xps_dict[f"{root}/{mpes_key}/@units"] = units
-
-        self._xps_dict[f'{path_map["electronanalyser"]}/name'] = spectrum["devices"][0]
-        self._xps_dict[f'{path_map["source"]}/name'] = spectrum["devices"][1]
+        # self._xps_dict[f'{path_map["electronanalyser"]}/name'] = spectrum["devices"][0]
+        # self._xps_dict[f'{path_map["source"]}/name'] = spectrum["devices"][1]

        # Create keys for writing to data
        scan_key = construct_data_key(spectrum)
@@ -394,7 +291,7 @@

        # TODO: fix this hotfix so that all data can be written
        # Add energy axis to energy_calibration
-        calib_energy_key = f'{path_map["process/energy_calibration"]}/energy'
+        calib_energy_key = f"{entry_parent}/process/energy_calibration/energy"
        self._xps_dict[calib_energy_key] = energy

        self._xps_dict["data"][entry][scan_key.split("_")[0]] = xr.DataArray(
@@ -418,16 +315,15 @@
            cps = np.array(spectrum["data"][channel])

            # # Write raw data to detector.
-            # self._xps_dict[detector_data_key] = spectrum["data"]["cps_calib"]

            # Write channel data to 'data'.
self._xps_dict["data"][entry][channel_key] = xr.DataArray( data=cps, coords={"energy": energy} ) # Add unit for detector data - detector_data_unit_key = f"{path_map['detector']}/raw_data/raw/@units" + detector_data_unit_key = f"{entry}/raw_data/raw/@units" - detector_data_units = get_units_for_key("detector/raw_data/raw", UNITS) + detector_data_units = get_units_for_key("raw_data/raw", UNITS) if detector_data_units is not None: self._xps_dict[detector_data_unit_key] = detector_data_units @@ -446,13 +342,14 @@ class SleProdigyParser: ] def __init__(self): - self.con = "" + self.con: sqlite3.Conncetion = None + self.cur = sqlite3.Cursor = None + self.spectra: List[Dict[str, Any]] = [] self.xml_schedule: ET.Element = None self.xml_context: ET.Element = None self.xml_metainfo: ET.Element = None - self.sum_channels: bool = False self.remove_align: bool = True self.encodings_dtype = { @@ -462,12 +359,6 @@ def __init__(self): } self.encoding = np.float32 - encodings_map: Dict[str, List[str, float]] = { - "short": ["h", 2], - "double": ["d", 8], - "float": ["f", 4], - } - def parse_file(self, file: str, **kwargs: Dict[str, Any]) -> List[Dict[str, Any]]: """ Parse the file's data and metadata into a flat list of dictionaries. @@ -495,6 +386,9 @@ def parse_file(self, file: str, **kwargs: Dict[str, Any]) -> List[Dict[str, Any] # initiate connection to sql file self.initiate_file_connection(file) + self.version = self._get_version() + self.app_version = self._get_app_version() + # read and parse sle file self._get_xml_schedule() self._get_xml_context() @@ -507,23 +401,24 @@ def parse_file(self, file: str, **kwargs: Dict[str, Any]) -> List[Dict[str, Any] update_dict_without_overwrite(spectrum, flatten_metainfo(self.xml_metainfo)) self._attach_node_ids() + self._get_spectrum_metadata_from_sql() self._remove_empty_nodes() self._attach_device_protocols() - self._get_spectrum_metadata_from_sql() + self._check_encoding() self._append_scan_data() self._convert_to_common_format() - self._close_con() + # self._close_con() - if self.remove_align: - self._remove_fixed_energies() + # if self.remove_align: + # self._remove_fixed_energies() self._remove_syntax() - self._remove_snapshot() - self._reindex_spectra() - self._reindex_groups() + # self._remove_snapshot() + # self._reindex_spectra() + # self._reindex_groups() return self.spectra @@ -531,17 +426,19 @@ def initiate_file_connection(self, file: str): """Set the sqllite connection of the file to be opened.""" sql_connection = file self.con = sqlite3.connect(sql_connection) + self.cur = self.con.cursor() def _execute_sql_query(self, query: str): - return execute_sql_query_on_con(self.con, query) + """Excute a query on the file.""" + return execute_sql_query_with_cur(self.cur, query) def _get_version(self): query = 'SELECT Value FROM Configuration WHERE Key="Version"' - self.version = self._execute_sql_query(query)[0][0] + return self._execute_sql_query(query)[0][0] def _get_app_version(self): query = 'SELECT Value FROM Configuration WHERE Key="AppVersion"' - self.app_version = self._execute_sql_query(query)[0][0] + return self._execute_sql_query(query)[0][0] def _get_xml_from_key(self, key: str): query = f"SELECT Value FROM Configuration WHERE Key='{key}'" @@ -574,60 +471,125 @@ def _append_scan_data(self): """ # pylint: disable=too-many-locals - individual_scans = [] - scan_id = 0 + for spectrum in self.spectra: + spectrum["data"]: Dict[str, Any] = {} + # copy node to new instance + group_node_id = self._get_sql_node_id(spectrum["group_id"]) + + 
spectrum["detector_calib"] = self._get_detector_calibration(group_node_id) + try: + pass_energy = spectrum["pass_energy_or_retardation_ratio"] + + detector_shifts = [ + item["shift"] + for key, item in spectrum["detector_calib"].items() + if key.startswith("detector") + ] + spectrum["detector_calib"]["shifts"] = ( + np.array(detector_shifts) * pass_energy + ) + except KeyError: + pass + + n_channels = spectrum["energy_channels"] node_id = self._get_sql_node_id(spectrum["spectrum_id"]) - n_channels = self._check_energy_channels(node_id) raw_ids = self._get_raw_ids(node_id) - n_scans = len(raw_ids) + # Add transmission function transmission_data = self._get_transmission(node_id) + spectrum["transmission_function/relative_intensity"] = np.array( + transmission_data + ) - for scan_no in range(n_scans): - scan = copy.copy(spectrum) + spectrum["abscissa_info"] = self._get_sql_abscissa_info(node_id) - scan["scan_id"] = scan_id - # get signal data for each scan - signal_data = self._get_one_scan(raw_ids[scan_no]) + for scan_id, raw_id in enumerate(raw_ids): + scan = {"scan_id": scan_id} - # extract the individual channel data - signal_data = self._separate_channels(signal_data, n_channels) + raw_data = self._get_one_scan(raw_id) + data = self._separate_channels(raw_data, n_channels) - # average channels if required - if self.sum_channels: - signal_data = self._sum_channels(signal_data) + raw_x = np.arange(data.shape[0]) * spectrum["step_size"] + scan["x"] = raw_x - # convert to counts per second - signal_data_cps = self._convert_to_counts_per_sec( - signal_data, float(scan["dwell_time"]) - ) + if spectrum["energy_scan_mode"] == "fixed_analyzer_transmission": + shifts = spectrum["detector_calib"]["shifts"] + num_values = spectrum["num_values"] - # attach individual channel data to scan - for ch_no, channel_data in enumerate(signal_data_cps): - scan[f"cps_ch_{ch_no}"] = list(channel_data) - # no_of_scans_avg['scans'] = 1 + raw_spectrum = [ + np.vstack((raw_x + shifts[i], data[:, i])).T + for i in range(n_channels) + ] - # scan['cps_calib'] = self._get_calibrated_data(spectrum) - # """ This is wrong and needs to be corrected!!!""" - scan["cps_calib"] = copy.copy(scan["cps_ch_0"]) + xmin = ( + max([raw_spectrum[n][:, 0].min() for n in range(n_channels)]), + ) + xmax = min([raw_spectrum[n][:, 0].max() for n in range(n_channels)]) - # Add transmission function - scan["transmission_function/relative_intensity"] = np.array( - transmission_data - ) + new_x = raw_x[np.where((raw_x > xmin) & (raw_x < xmax))] + + if new_x.size == num_values: + new_spectrum = [] + for i in range(n_channels): + f = interp1d( + raw_spectrum[i][:, 0], + raw_spectrum[i][:, 1], + kind="linear", + ) + new_spectrum.append(f(new_x)) + data = np.array(new_spectrum).T + + scan["x"] = new_x + else: + logger.error( + f"Data size mismatch for scan {raw_id} in spectrum {scan_id}." 
+                        )
+                        logger.error(f"Expected size: {num_values}, got {new_x.size}.")
+                        logger.error("Skipping this scan.")
+
+                scan["raw"] = raw_data
+                scan["channels"] = data
+                scan["merged"] = np.sum(data, axis=1)

                # add metadata including scan, loop no and datetime
-                scan_metadata = self._get_scan_metadata(raw_ids[scan_no])
+                scan_metadata = self._get_scan_metadata(raw_id)
                for key, values in scan_metadata.items():
                    scan[key] = values

-                individual_scans += [scan]
-                scan_id += 1
+
+                # store each scan under its own key
+                spectrum["data"][f"scan_{scan_id}"] = scan
+
+            # TODO: make this working
+            # extension_channels = self._get_extension_channel_info(node_id)

-        # update self.spectra with the scan data
-        self.spectra = individual_scans
+            # setattr(extension_channels, self._format_name(
+            #     extension_channel.detector), extension_channel)
+            # if len(extension_channels)-1 == len(n_spectrum_channels):
+            #     for extension_channel in extension_channel
+            #         for channel in spectrum.channels:
+            #             if getattr(extension_channels, k).name == channel.name:
+            #                 for attr in channel.__members__():
+            #                     setattr(getattr(extension_channels, k),
+            #                             attr, getattr(channel, attr))
+            #     spectrum.channels = extension_channels
+
+    def _get_detector_calibration(self, node_id: int):
+        """Extract detector calibration for given node_id."""
+        query = f'SELECT Data FROM NodeData WHERE Node="{node_id}"'
+        elem = ET.fromstring(self._execute_sql_query(query)[0][0])
+
+        detectors = {}
+
+        detectors["info"] = iterate_xml_at_tag(elem, "DetectorCalibration")
+
+        for detector_no, subelem in enumerate(elem.iter("Detector")):
+            detector = {}
+            for key, value in subelem.attrib.items():
+                key, value = format_key_and_value(key, value)
+                detector[key] = value
+            detectors[f"detector{detector_no}"] = detector
+        return detectors

    def _get_transmission(self, node_id: int) -> np.ndarray:
        """
@@ -643,23 +605,82 @@ def _get_transmission(self, node_id: int) -> np.ndarray:
        transmission_data : array
            Array of TF values for the spectrum at node ID.
        """
-        cur = self.con.cursor()
-        query = (
-            f'SELECT Data, SAMPLES, Ekin FROM TransmissionData WHERE Node="{node_id}"'
-        )
-        cur.execute(query)
-        results = cur.fetchall()
-        buffer = self.encoding[1]
-        encoding = self.encoding[0]
+        query = f'SELECT Ekin, NonEnergyChns, Samples, Data FROM TransmissionData WHERE Node="{node_id}"'
+        try:
+            results = self._execute_sql_query(query)[0]
+        except (IndexError, sqlite3.OperationalError):
+            logger.info(f"No transmission function data found for node {node_id}.")
+            return None
+
+        transmission_data = np.frombuffer(results[-1], dtype=np.float64)
+
+        return transmission_data
+
+    def _get_sql_abscissa_info(self, node_id: int):
+        """
+        Get the Abscissa Info.
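+
+        Returns a dictionary mapping the AbscissaInfo column names to their
+        stored values for this node, or None if no matching row exists.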
+        """
+        query = f'SELECT * FROM AbscissaInfo WHERE Node="{node_id}"'
+        try:
+            results = self._execute_sql_query(query)[0]
+        except (IndexError, sqlite3.OperationalError):
+            logger.info(f"No AbscissaInfo found for node {node_id}.")
+            return None
+
+        abscissa_info: Dict[str, Any] = {}
+
+        for idx, key in enumerate(self._get_column_names("AbscissaInfo")):
+            abscissa_info[key] = results[idx]

-        stream = []
-        for result in results:
-            length = result[1] * buffer
-            data = result[0]
-            for i in range(0, length, buffer):
-                stream.append(struct.unpack(encoding, data[i : i + buffer])[0])
+        return abscissa_info

-        return stream
+    def _get_extension_channel_info(self, node_id):
+        def _parse_channel_name(channel_name):
+            device = re.findall(r"\((.*?)\)", channel_name)[0]
+            unit = re.findall(r"\[(.*?)\]", channel_name)[0]
+            detector = channel_name.split("(")[0].split("[")[0].strip()
+            return {"device": device, "unit": unit, "detector": detector}
+
+        query = f'SELECT Node, Channel, Name FROM ExtensionChannelInfo WHERE Node="{node_id}"'
+        try:
+            info = self._execute_sql_query(query)
+        except sqlite3.OperationalError:
+            logger.info(f"No ExtensionChannelInfo found for node {node_id}.")
+            return None
+
+        if info:
+            extension_channels: List[Dict[str, Any]] = []
+            # detectors = [entry[2] for entry in info]
+            # extract detector names from ()
+            # device = [re.findall(r'\((.*?)\)', detector)[0] for detector in detectors]
+            # unit = [re.findall(r'\[(.*?)\]', detector)[0] for detector in detectors]
+            # # remove content inside () and []
+            # detector = [detector.split('(')[0].split('[')[0].strip() for detector in detectors]
+
+            for entry in info:
+                extension_channel: Dict[str, Any] = {}
+                extension_channel["node_id"] = entry[0]
+                extension_channel["channel"] = entry[1]
+                extension_channel["name"] = entry[2]
+                name_info = _parse_channel_name(entry[2])
+                for key, value in name_info.items():
+                    extension_channel[key] = value
+
+                extension_channels.append(extension_channel)
+
+            return extension_channels
+
+        logger.info(f"No ExtensionChannelInfo found for node {node_id}.")
+        return None
+
+    # def _add_extension_data(self):
+    #     for channel in spectrum.channels.values()[1:]:
+    #         # TODO: this is a temporary fix, could add __iter__ to DataSet
+    #         channel.signal = []
+    #         for raw_id in spectrum.scans.raw_ids:
+    #             channel.signal.append(np.frombuffer(self._getExtensionData(
+    #                 raw_id, channel=channel.channel)[-1], dtype=np.float64))
+    #         channel.signal = np.array(channel.signal)

    def _separate_channels(self, data: List[float], n_channels: int) -> np.ndarray:
        """
@@ -678,224 +699,7 @@ def _separate_channels(self, data: List[float], n_channels: int) -> np.ndarray:
            Separate data across n_channels.

        """
-        n_points = int(len(data) / n_channels)
-        return np.reshape(np.array(data), (n_channels, n_points))
-
-    # """ NEED TO UPDATE THIS METHOD"""
-    # def _get_calibrated_data(self, raw_data):
-    #     """
-    #
-    #
-    #     Parameters
-    #     ----------
-    #     raw_data : List
-    #         DESCRIPTION.
-    #
-    #     Returns
-    #     -------
-    #     channel_dict : TYPE
-    #         DESCRIPTION.
- # - # """ - # mcd_num = int(raw_data["mcd_num"]) - # - # curves_per_scan = raw_data["curves_per_scan"] - # values_per_curve = raw_data["values_per_curve"] - # values_per_scan = int(curves_per_scan * values_per_curve) - # mcd_head = int(raw_data["mcd_head"]) - # mcd_tail = int(raw_data["mcd_tail"]) - # excitation_energy = raw_data["excitation_energy"] - # energy_scan_mode = raw_data["energy_scan_mode"] - # kinetic_energy = raw_data["kinetic_energy"] - # scan_delta = raw_data["scan_delta"] - # pass_energy = raw_data["pass_energy"] - # kinetic_energy_base = raw_data["kinetic_energy_base"] - # # Adding one unit to the binding_energy_upper is added as - # # electron comes out if energy is one unit higher - # binding_energy_upper = excitation_energy - \ - # kinetic_energy + kinetic_energy_base + 1 - # - # mcd_energy_shifts = raw_data["mcd_shifts"] - # mcd_energy_offsets = [] - # offset_ids = [] - # - # # consider offset values for detector with respect to - # # position at +16 which is usually large and positive value - # for mcd_shift in mcd_energy_shifts: - # mcd_energy_offset = ( - # mcd_energy_shifts[-1] - mcd_shift) * pass_energy - # mcd_energy_offsets.append(mcd_energy_offset) - # offset_id = round(mcd_energy_offset / scan_delta) - # offset_ids.append( - # int(offset_id - 1 if offset_id > 0 else offset_id)) - # - # # Skiping entry without count data - # if not mcd_energy_offsets: - # continue - # mcd_energy_offsets = np.array(mcd_energy_offsets) - # # Putting energy of the last detector as a highest energy - # starting_eng_pnts = binding_energy_upper - mcd_energy_offsets - # ending_eng_pnts = (starting_eng_pnts - # - values_per_scan * scan_delta) - # - # channeltron_eng_axes = np.zeros((mcd_num, values_per_scan)) - # for ind in np.arange(len(channeltron_eng_axes)): - # channeltron_eng_axes[ind, :] = \ - # np.linspace(starting_eng_pnts[ind], - # ending_eng_pnts[ind], - # values_per_scan) - # - # channeltron_eng_axes = np.round_(channeltron_eng_axes, - # decimals=8) - # # construct ultimate or incorporated energy axis from - # # lower to higher energy - # scans = list(raw_data["scans"].keys()) - # - # # Check whether array is empty or not - # if not scans: - # continue - # if not raw_data["scans"][scans[0]].any(): - # continue - # # Sorting in descending order - # binding_energy = channeltron_eng_axes[-1, :] - # - # self._xps_dict["data"][entry] = xr.Dataset() - # - # for scan_nm in scans: - # channel_counts = np.zeros((mcd_num + 1, - # values_per_scan)) - # # values for scan_nm corresponds to the data for each - # # "scan" in individual CountsSeq - # scan_counts = raw_data["scans"][scan_nm] - # - # if energy_scan_mode == "fixed_analyser_transmission": - # for row in np.arange(mcd_num): - # - # count_on_row = scan_counts[row::mcd_num] - # # Reverse counts from lower to higher - # # BE as in BE_eng_axis - # count_on_row = \ - # count_on_row[mcd_head:-mcd_tail] - # - # channel_counts[row + 1, :] = count_on_row - # channel_counts[0, :] += count_on_row - # - # # Storing detector's raw counts - # self._xps_dict["data"][entry][f"{scan_nm}_chan_{row}"] = \ - # xr.DataArray(data=channel_counts[row + 1, :], - # coords={"BE": binding_energy}) - # - # # Storing callibrated and after accumulated each scan counts - # if row == mcd_num - 1: - # self._xps_dict["data"][entry][scan_nm] = \ - # xr.DataArray(data=channel_counts[0, :], - # coords={"BE": binding_energy}) - # else: - # for row in np.arange(mcd_num): - # - # start_id = offset_ids[row] - # count_on_row = scan_counts[start_id::mcd_num] - # 
count_on_row = count_on_row[0:values_per_scan] - # channel_counts[row + 1, :] = count_on_row - # - # # shifting and adding all the curves. - # channel_counts[0, :] += count_on_row - # - # # Storing detector's raw counts - # self._xps_dict["data"][entry][f"{scan_nm}_chan{row}"] = \ - # xr.DataArray(data=channel_counts[row + 1, :], - # coords={"BE": binding_energy}) - # - # # Storing callibrated and after accumulated each scan counts - # if row == mcd_num - 1: - # self._xps_dict["data"][entry][scan_nm] = \ - # xr.DataArray(data=channel_counts[0, :], - # coords={"BE": binding_energy}) - # - # # Skiping entry without count data - # if not mcd_energy_offsets: - # continue - # mcd_energy_offsets = np.array(mcd_energy_offsets) - # # Putting energy of the last detector as a highest energy - # starting_eng_pnts = binding_energy_upper - mcd_energy_offsets - # ending_eng_pnts = (starting_eng_pnts - # - values_per_scan * scan_delta) - # - # channeltron_eng_axes = np.zeros((mcd_num, values_per_scan)) - # for ind in np.arange(len(channeltron_eng_axes)): - # channeltron_eng_axes[ind, :] = \ - # np.linspace(starting_eng_pnts[ind], - # ending_eng_pnts[ind], - # values_per_scan) - # - # channeltron_eng_axes = np.round_(channeltron_eng_axes, - # decimals=8) - # # construct ultimate or incorporated energy axis from - # # lower to higher energy - # scans = list(raw_data["scans"].keys()) - # - # # Check whether array is empty or not - # if not scans: - # continue - # if not raw_data["scans"][scans[0]].any(): - # continue - # # Sorting in descending order - # binding_energy = channeltron_eng_axes[-1, :] - # - # self._xps_dict["data"][entry] = xr.Dataset() - # - # for scan_nm in scans: - # channel_counts = np.zeros((mcd_num + 1, - # values_per_scan)) - # # values for scan_nm corresponds to the data for each - # # "scan" in individual CountsSeq - # scan_counts = raw_data["scans"][scan_nm] - # - # if energy_scan_mode == "fixed_analyser_transmission": - # for row in np.arange(mcd_num): - # - # count_on_row = scan_counts[row::mcd_num] - # # Reverse counts from lower to higher - # # BE as in BE_eng_axis - # count_on_row = \ - # count_on_row[mcd_head:-mcd_tail] - # - # channel_counts[row + 1, :] = count_on_row - # channel_counts[0, :] += count_on_row - # - # # Storing detector's raw counts - # self._xps_dict["data"][entry][f"{scan_nm}_chan_{row}"] = \ - # xr.DataArray(data=channel_counts[row + 1, :], - # coords={"BE": binding_energy}) - # - # # Storing callibrated and after accumulated each scan counts - # if row == mcd_num - 1: - # self._xps_dict["data"][entry][scan_nm] = \ - # xr.DataArray(data=channel_counts[0, :], - # coords={"BE": binding_energy}) - # else: - # for row in np.arange(mcd_num): - # - # start_id = offset_ids[row] - # count_on_row = scan_counts[start_id::mcd_num] - # count_on_row = count_on_row[0:values_per_scan] - # channel_counts[row + 1, :] = count_on_row - # - # # shifting and adding all the curves. 
-    #                 channel_counts[0, :] += count_on_row
-    #
-    #                 # Storing detector's raw counts
-    #                 self._xps_dict["data"][entry][f"{scan_nm}_chan{row}"] = \
-    #                     xr.DataArray(data=channel_counts[row + 1, :],
-    #                                  coords={"BE": binding_energy})
-    #
-    #                 # Storing callibrated and after accumulated each scan counts
-    #                 if row == mcd_num - 1:
-    #                     self._xps_dict["data"][entry][scan_nm] = \
-    #                         xr.DataArray(data=channel_counts[0, :],
-    #                                      coords={"BE": binding_energy})
-    #
-    #     return channel_dict
+        return data.reshape(data.size // n_channels, n_channels)

    def _check_energy_channels(self, node_id: int) -> int:
        """
@@ -914,10 +718,8 @@ def _check_energy_channels(self, node_id: int) -> int:
        n_channels : int
            Number of separate energy channels for the spectrum at node ID.
        """
-        cur = self.con.cursor()
        query = f'SELECT EnergyChns FROM Spectrum WHERE Node="{node_id}"'
-        cur.execute(query)
-        result = cur.fetchall()
+        result = self._execute_sql_query(query)
        if len(result) != 0:
            n_channels = result[0][0]
            return n_channels
@@ -942,11 +744,8 @@ def _get_raw_ids(self, node_id: int) -> List[int]:
        List of raw IDs for the given note ID.

        """
-        cur = self.con.cursor()
        query = f'SELECT RawId FROM RawData WHERE Node="{node_id}"'
-        cur.execute(query)
-
-        return [i[0] for i in cur.fetchall()]
+        return [i[0] for i in self._execute_sql_query(query)]

@@ -963,10 +762,8 @@ def _check_number_of_scans(self, node_id: int) -> int:
        Number of separate scans for the spectrum.

        """
-        cur = self.con.cursor()
        query = f'SELECT RawId FROM RawData WHERE Node="{node_id}"'
-        cur.execute(query)
-        return len(cur.fetchall())
+        return len(self._execute_sql_query(query))

@@ -987,10 +784,8 @@ def _get_detector_data(self, node_id: int) -> List[float]:
        List of lists with measured data.

        """
-        cur = self.con.cursor()
        query = f'SELECT RawID FROM RawData WHERE Node="{node_id}"'
-        cur.execute(query)
-        raw_ids = [i[0] for i in cur.fetchall()]
+        raw_ids = [i[0] for i in self._execute_sql_query(query)]
        detector_data = []
        if len(raw_ids) > 1:
            for raw_id in raw_ids:
@@ -1003,8 +798,8 @@
    def _attach_device_protocols(self):
        """
-        Get the device protocol for each node and add the paramaters of
-        the Phoibos to the spectra table. Occassionally these are not
+        Get the device protocol for each node and add the parameters to
+        the spectra table. Occasionally these are not
        recorded, if this is the case just skip the group.

        Returns
        -------
        None.

        """
        # iterate through each spectrum
        for spectrum in self.spectra:
            # conver the xml xps id to the node ID and get the device protocol
-            cur = self.con.cursor()
            protocol_node_id = self._get_sql_node_id(spectrum["device_group_id"])
            query = (
                f'SELECT Protocol FROM DeviceProtocol WHERE Node="{protocol_node_id}"'
            )
-            result = cur.execute(query).fetchone()
+            results = self._execute_sql_query(query)
+            result = results[0] if results else None

            # if a record was accessed then parse, if not skip
            if result:
                protocol = ET.fromstring(result[0])
                protocal_params = self._get_one_device_protocol(protocol)
                spectrum.update(protocal_params)

    def _get_one_device_protocol(self, protocol: ET.Element) -> Dict[str, Any]:
        """
@@ -1043,24 +838,16 @@ def _get_one_device_protocol(self, protocol: ET.Element) -> Dict[str, Any]:
        protocol_params : dict
            All parameters given in the device protocol.
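+            Parameters are nested per device, keyed by each Command's
+            UniqueDeviceName.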
""" - protocol_params: Dict[str, Any] = {} - for device in protocol.iter("Command"): - if "Phoibos" in device.attrib["UniqueDeviceName"]: - # iterate through the parameters and add to spectrum - # dict - for parameter in device.iter("Parameter"): - if parameter.attrib["type"] == "double": - param_text = float(parameter.text) - else: - param_text = parameter.text - protocol_params[parameter.attrib["name"]] = param_text - elif "XRC1000" in device.attrib["UniqueDeviceName"]: - for parameter in device.iter("Parameter"): - if parameter.attrib["type"] == "double": - param_text = float(parameter.text) - else: - param_text = parameter.text - protocol_params[parameter.attrib["name"]] = param_text + protocol_params: Dict[str, Dict[str, Any]] = {} + for elem in protocol.iter("Command"): + unique_device_name = elem.attrib["UniqueDeviceName"] + protocol_params[unique_device_name]: Dict[str, Any] = {} + + for parameter in elem.iter("Parameter"): + key, value = format_key_and_value( + parameter.attrib["name"], parameter.text + ) + protocol_params[unique_device_name][key] = value return protocol_params @@ -1083,19 +870,12 @@ def _get_one_scan(self, raw_id: int) -> List[float]: List with measured data. """ - cur = self.con.cursor() - query = f'SELECT Data, ChunkSize FROM CountRateData WHERE RawId="{raw_id}"' - cur.execute(query) - results = cur.fetchall() - buffer = self.encoding[1] - encoding = self.encoding[0] - stream = [] - for result in results: - length = result[1] * buffer - data = result[0] - for i in range(0, length, buffer): - stream.append(struct.unpack(encoding, data[i : i + buffer])[0]) - return stream + query = f'SELECT Data FROM CountRateData WHERE RawId="{raw_id}"' + data = self._execute_sql_query(query)[0][0] + + data = self._decompress_data(data) + + return np.frombuffer(data, dtype=self.encoding) def _parse_external_channels(self, channel: int): """ @@ -1125,10 +905,9 @@ def _get_spectrum_metadata_from_sql(self): """ for spectrum in self.spectra: node_id = self._get_sql_node_id(spectrum["spectrum_id"]) - cur = self.con.cursor() query = f'SELECT * FROM Spectrum WHERE Node="{node_id}"' - cur.execute(query) - results = cur.fetchall() + results = self._execute_sql_query(query) + if len(results) != 0: results = results[0] @@ -1136,20 +915,13 @@ def _get_spectrum_metadata_from_sql(self): combined = { k: v for k, v in dict(zip(column_names, results)).items() - if k in self.sql_metadata_map + if k in KEY_MAP } combined = copy.copy(combined) if "EnergyType" not in combined.keys(): combined["EnergyType"] = "Binding" for key, value in combined.items(): - spectrum[key] = value - - query = f'SELECT Data FROM NodeData WHERE Node="{node_id}"' - cur.execute(query) - results = ET.fromstring(cur.fetchall()[0][0]) - for i in results.iter("AnalyzerSpectrumParameters"): - spectrum["work_function"] = i.attrib["Workfunction"] - spectrum["step_size"] = float(i.attrib["ScanDelta"]) + spectrum[KEY_MAP[key]] = value def _get_scan_metadata(self, raw_id: int) -> Dict[str, Any]: """ @@ -1170,9 +942,8 @@ def _get_scan_metadata(self, raw_id: int) -> Dict[str, Any]: """ # get string Trace from RawData - cur = self.con.cursor() query = f'SELECT ScanDate, Trace FROM RawData WHERE RawID="{raw_id}"' - result = cur.execute(query).fetchone() + result = self._execute_sql_query(query)[0] # process metadata into a dictionary scan_meta: Dict[str, Any] = {} scan_meta["time_stamp_trace"] = result[0] @@ -1253,11 +1024,11 @@ def _get_sql_node_id(self, xml_id: int) -> int: ID in the SQL tables. 
""" - cur = self.con.cursor() query = f'SELECT Node FROM NodeMapping WHERE InternalID="{xml_id}"' - cur.execute(query) - node_id = cur.fetchall()[0][0] - return node_id + try: + return self._execute_sql_query(query)[0][0] + except IndexError: + return None def _attach_node_ids(self): """ @@ -1285,10 +1056,9 @@ def _remove_empty_nodes(self): idx = j[0] spectrum = j[1] node_id = spectrum["node_id"] - cur = self.con.cursor() query = f'SELECT Node FROM Spectrum WHERE Node="{node_id}"' - cur.execute(query) - result = cur.fetchall() + result = self._execute_sql_query(query) + if len(result) == 0: del self.spectra[idx] @@ -1329,9 +1099,8 @@ def _get_table_names(self) -> List[str]: List of spectrum names. """ - cur = self.con.cursor() - cur.execute('SELECT name FROM sqlite_master WHERE type= "table"') - return [i[0] for i in cur.fetchall()] + query = 'SELECT name FROM sqlite_master WHERE type= "table"' + return [i[0] for i in self._execute_sql_query(query)] def _get_column_names(self, table_name: str) -> List[str]: """ @@ -1348,9 +1117,8 @@ def _get_column_names(self, table_name: str) -> List[str]: List of column names. """ - cur = self.con.cursor() - cur.execute((f"SELECT * FROM {table_name}")) - names = [description[0] for description in cur.description] + self.cur.execute((f"SELECT * FROM {table_name}")) + names = [description[0] for description in self.cur.description] return names def _close_con(self): @@ -1364,44 +1132,47 @@ def _close_con(self): """ self.con.close() - def _sum_channels(self, data: List[float]) -> np.ndarray: + def _check_encoding(self) -> None: """ - Sum together energy channels. - - Parameters - ---------- - data : List[float] - List of measured data. + Check whether the binary data should be decoded float or double. Returns ------- - np.ndarray - Summed energy channels. + None. """ - summed = np.sum(data, axis=0) - return np.reshape(summed, (1, -1)) + query = "SELECT Data, ChunkSize FROM CountRateData LIMIT 1" + binary_data, chunksize = self._execute_sql_query(query)[0] - def _check_encoding(self): - """ - Check whether the binary data should be decoded float or double. + binary_data = self._decompress_data(binary_data) - Returns - ------- - None. + length_ratio = len(binary_data) / chunksize + if length_ratio == 2: + self.encoding = self.encodings_dtype["double"] + elif length_ratio == 4: + self.encoding = self.encodings_dtype["float"] + elif length_ratio == 8: + self.encoding = self.encodings_dtype["double"] + else: + logger.error( + "Unsupported binary encoding for length ratio: %s", length_ratio + ) + def _decompress_data(self, binary_data: Union[bytes, bytearray]) -> bytes: """ - cur = self.con.cursor() - query = "SELECT LENGTH(Data),ChunkSize FROM CountRateData LIMIT 1" - cur.execute(query) - data, chunksize = cur.fetchall()[0] + Attempts to decompress binary data using zlib. If decompression fails, + returns the original data. - if data / chunksize == 4: - self.encoding = encodings_map["float"] - elif data / chunksize == 8: - self.encoding = encodings_map["double"] - else: - logger.error("This binary encoding is not supported.") + Args: + binary_data (bytes | bytearray): Compressed binary data to decompress. + + Returns: + bytes: Decompressed data if successful, otherwise the original binary data. 
+ """ + try: + return zlib.decompress(binary_data) + except zlib.error: # Catch only zlib-specific decompression errors + return binary_data def _reindex_spectra(self): """Re-number the spectrum_id.""" @@ -1475,19 +1246,3 @@ def _remove_snapshot(self): self.spectra = [ spec for spec in self.spectra if "snapshot" not in spec["energy_scan_mode"] ] - - def get_sle_version(self) -> str: - """ - Get the Prodigy SLE version from the file. - - Returns - ------- - version : str - Prodigy SLE version of SLE file. - - """ - cur = self.con.cursor() - query = 'SELECT Value FROM Configuration WHERE Key=="Version"' - cur.execute(query) - version = cur.fetchall()[0][0] - return version diff --git a/src/pynxtools_xps/specs/sle/specs_sle_mapping.py b/src/pynxtools_xps/specs/sle/utils.py similarity index 53% rename from src/pynxtools_xps/specs/sle/specs_sle_mapping.py rename to src/pynxtools_xps/specs/sle/utils.py index 7b575ec7..664375a9 100644 --- a/src/pynxtools_xps/specs/sle/specs_sle_mapping.py +++ b/src/pynxtools_xps/specs/sle/utils.py @@ -18,13 +18,16 @@ Mappings for Specs Lab Prodigy SLE format reader. """ -from typing import List, Dict, Any +from typing import List, Dict, Any, Union, Tuple +from lxml import etree as ET -# from pynxtools_xps.reader_utils import ( -# convert_pascal_to_snake, -# _re_map_single_value, -# ) +from pynxtools_xps.reader_utils import ( + convert_pascal_to_snake, + _re_map_single_value, + _format_value, + extract_unit, +) from pynxtools_xps.value_mappers import ( convert_energy_type, @@ -111,4 +114,86 @@ "Work Function", ] +UNITS: Dict[str, str] = { + "work_function": "eV", + "excitation_energy": "eV", + "iris_diameter": "mm", + "step_size": "eV", + "detector_voltage": "V", + "dwell_time": "s", + "raw_data/raw": "counts_per_second ", + "polar_angle": "degree ", + "azimuth_angle": "degree", + "pass_energy": "eV", + "start_energy": "eV", + "emission_current": "A", + "source_voltage": "V", + "transmission_function/kinetic_energy": "eV", +} + POSSIBLE_DATE_FORMATS: List[str] = ["%Y-%b-%d %H:%M:%S.%f"] + + +def format_key_and_value(key: str, value_str: str) -> Tuple[Any, str]: + """ + Formats a key and a corresponding value string according to a series of transformations. + + This function: + 1. Maps the key based on a predefined dictionary (`KEY_MAP`). + 2. Converts the key from PascalCase to snake_case. + 3. Extracts the numeric value and unit from the value string. + 4. Formats the numeric part of the value according to its expected type. + 5. Remaps the value to a new format if specified in `VALUE_MAP`. + + Args: + key (str): The key associated with the value, which may need mapping and formatting. + value_str (str): The value string to format and separate into numeric value and unit. + + Returns: + Tuple[Any, str]: + - The formatted key (converted to snake_case and remapped if needed). + - The formatted value, with numeric value processed and remapped according to `VALUE_MAP`. + """ + key = KEY_MAP.get(key, convert_pascal_to_snake(key)) + + value, unit = extract_unit(key, value_str) + value = _format_value(value) + value = _re_map_single_value(key, value, VALUE_MAP) + + return key, value + + +def iterate_xml_at_tag( + xml_elem: ET.Element, tag: str +) -> Dict[str, Union[str, float, int]]: + """ + Iterates through XML elements at the specified tag and formats their attributes. + + Parameters + ---------- + xml_elem : ET.Element + The XML element to search within. + + tag : str + The tag name to find in the XML structure. 
+ + Returns + ------- + Dict[str, Union[str, float, int]] + A dictionary containing formatted attribute values keyed by their corresponding names. + """ + + subelem = xml_elem.find(tag) + + settings = {} + + special_key_map = KEY_MAP.get(tag, {}) + + if subelem is not None: + for param in subelem.iter(): + for key, value in param.attrib.items(): + key = special_key_map.get(key, key) + key, value = format_key_and_value(key, value) + settings[key] = value + + return settings
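+
+
+# Illustrative usage of format_key_and_value (hypothetical input; the exact
+# output depends on KEY_MAP, VALUE_MAP, and the behavior of extract_unit and
+# _format_value in reader_utils):
+#
+#     key, value = format_key_and_value("DwellTime", "0.1")
+#     # -> ("dwell_time", 0.1)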