Scienta PEAK version control #100

Open: wants to merge 2 commits into base: main
50 changes: 50 additions & 0 deletions src/pynxtools_xps/scienta/scienta_data_model.py
@@ -68,3 +68,53 @@ class ScientaRegion(XpsDataclass):
time_per_spectrum_channel: float = 0.0
detector_mode: str = ""
data: dict = field(default_factory=dict)


# New dataclasses for the PEAK software version. Capitalized field names
# follow the keys of the JSON note written by PEAK.
@dataclass
class ExcitationSourceEnergyInformation:
SourceType: str = ""
Energy: float = 0.0
TargetEnergy: float = 0.0
EnergyStatus: str = ""
EnergyStatusMessage: str = ""

@dataclass
class ScientaRegionIgorPEAK(XpsDataclass):
region_id: int = 0
Name: str = ""
energy_size: int = 0
energy_scale: str = ""
energy_scale_2: str = ""
energy_axis: np.ndarray = field(default_factory=lambda: np.zeros(0))
LensMode: str = ""
PassEnergy: float = 0.0
no_of_scans: int = 0
Energy: ExcitationSourceEnergyInformation = field(default_factory=ExcitationSourceEnergyInformation)
acquisition_mode: str = ""
energy_units: str = "kinetic"  # TODO: confirm the energy scale reported by PEAK exports.
center_energy: float = 0.0
start_energy: float = 0.0
stop_energy: float = 0.0
step_size: float = 0.0
DwellTime: float = 0.0
detector_first_x_channel: int = 0
detector_last_x_channel: int = 0
detector_first_y_channel: int = 0
detector_last_y_channel: int = 0
number_of_slices: str = ""
data_file: str = ""
SequenceName: str = ""
spectrum_type: str = ""
Instrument: str = ""
vendor: str = ""
UserName: str = ""
sample_name: str = ""
spectrum_comment: str = ""
Date: str = ""
Time: str = ""
time_stamp: str = ""
time_per_spectrum_channel: float = 0.0
detector_mode: str = ""
Data: dict = field(default_factory=dict)
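
A minimal usage sketch for the new dataclasses, assuming a decoded PEAK note shaped like the hypothetical dictionary below (the values are illustrative, not taken from a real file):

from pynxtools_xps.scienta.scienta_data_model import (
    ExcitationSourceEnergyInformation,
    ScientaRegionIgorPEAK,
)

# Hypothetical excerpt of a JSON note written by PEAK.
note = {
    "LensMode": "Transmission",
    "PassEnergy": 200.0,
    "Energy": {"SourceType": "Monochromator", "Energy": 1486.6},
}

region = ScientaRegionIgorPEAK(region_id=0)
region.LensMode = note["LensMode"]
region.PassEnergy = note["PassEnergy"]
region.Energy = ExcitationSourceEnergyInformation(**note["Energy"])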

251 changes: 249 additions & 2 deletions src/pynxtools_xps/scienta/scienta_reader.py
@@ -20,6 +20,8 @@
MPES nxdl (NeXus Definition Language) template.
"""

import json
import copy
import re
import warnings
@@ -37,7 +39,9 @@
construct_data_key,
construct_entry_name,
)
from pynxtools_xps.value_mappers import get_units_for_key, convert_units

from pynxtools_xps.scienta.scienta_data_model import (
    ScientaHeader,
    ScientaRegion,
    ScientaRegionIgorPEAK,
)
from pynxtools_xps.scienta.scienta_mappings import (
UNITS,
VALUE_MAP,
@@ -80,7 +84,14 @@ def _select_parser(self):
if str(self.file).endswith(".txt"):
return ScientaTxtParser()
elif str(self.file).endswith(".ibw"):
# Inspect the wave note: PEAK writes a JSON note containing a
# "Version" key, whereas older SES exports do not.
with open(str(self.file), "rb") as f:
data_ibw = binarywave.load(f)
try:
json.loads(data_ibw["wave"]["note"].decode("utf-8"))["Version"]
return ScientaIgorParserPEAK()
except (UnicodeDecodeError, json.JSONDecodeError, KeyError):
return ScientaIgorParser()
raise ValueError(MapperScienta.__file_err_msg__)

def construct_data(self):
@@ -614,3 +625,239 @@ def axis_units_for_dim(self, wave_header: Dict[str, Any], dim: int) -> str:
unit += elem.decode("utf-8")

return unit

class ScientaIgorParserPEAK:
"""Parser for Scienta TXT exports."""

# pylint: disable=too-few-public-methods

def __init__(self):
self.lines: List[str] = []
self.spectra: List[Dict[str, Any]] = []

def parse_file(self, file: Union[str, Path], **kwargs):
"""
Reads the Igor binarywave file and returns a list of
dictionaries containing the wave data.

Parameters
----------
file : str or Path
Filepath of the .ibw file to be read.

Returns
-------
self.spectra
Flat list of dictionaries containing one spectrum each.

"""
ibw = binarywave.load(file)
ibw_version, wave = ibw["version"], ibw["wave"]

notes = self._parse_note(wave["note"])

data_unit_label, data_unit = self._parse_unit(wave["data_units"])
dimension_unit_label, dimension_unit = self._parse_unit(wave["dimension_units"])

wave_header = wave["wave_header"]
data = wave["wData"]

# Create 1D reduced data from the 2D detector image.
# NOTE: This summation should eventually be removed so that the 2D image
# can be displayed as in the PEAK software.
data = np.sum(data, axis=1)
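# For example, a hypothetical detector image of shape (1024, 512)
# (energy x angle channels) is reduced by this sum to a 1024-point spectrum.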

# Not needed at the moment.
# TODO: Add support for formulas if they are written by the
# measurement software.
# formula = wave["formula"]
# labels = wave["labels"]
# spectrum_indices = wave["sIndices"]
# bin_header = wave["bin_header"]

if len(data.shape) == 1:
self.no_of_regions = 1
else:
# self.no_of_regions = data.shape[0]
# NOTE: Changed because the data are summed over axis 1 above, which
# changes the array shape.
self.no_of_regions = len(data.shape)

for region_id in range(0, self.no_of_regions):
region = ScientaRegionIgorPEAK(region_id=region_id)
region_fields = list(region.__dataclass_fields__.keys())
overwritten_fields = ["region_id", "time_stamp", "data"]
unused_notes_keys = []

for key, note in notes.items():
if _check_valid_value(note):
if key in region_fields:
setattr(region, key, note)
overwritten_fields += [key]
else:
unused_notes_keys += [key]

energies = self.axis_for_dim(wave_header, dim=region_id)
axs_unit = self.axis_units_for_dim(wave_header, dim=region_id)

if data.ndim == 1:
intensities = data
else:
intensities = data[region_id]

# Convert date and time to ISO8601 date time.
region.time_stamp = _construct_date_time(
region.Date, region.Time
)

region.energy_size = len(energies)
region.energy_axis = energies

region.data = {
"energy": np.array(energies),
"intensity": np.array(intensities),
}

region.validate_types()

spectrum_dict = region.dict()

for key in unused_notes_keys:
spectrum_dict[key] = notes[key]

spectrum_dict["igor_binary_wave_format_version"] = ibw_version
spectrum_dict["intensity/@units"] = convert_units(data_unit_label)

self.spectra.append(spectrum_dict)

return self.spectra


def _parse_note(self, bnote: bytes) -> Dict[str, Any]:
"""
Parses the note field of the igor binarywave file.
It assumes that the note field contains a JSON string.

Parameters
----------
bnote : bytes
The bytes of the binarywave note field.

Returns
-------
Dict[str, Any]
The dictionary of the parsed note field.
"""
# Decode the byte string to UTF-8
note_str = bnote.decode("utf-8")

# Parse the JSON string into a dictionary
try:
notes = json.loads(note_str)
except json.JSONDecodeError as e:
print(f"Error decoding JSON: {e}")
notes = {}

return notes
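
# Illustrative (hypothetical) example of a decoded PEAK note; the actual
# key set depends on the PEAK software version:
# {"Version": "1.0", "Name": "Survey", "LensMode": "Transmission",
#  "PassEnergy": 200.0, "Date": "2024-01-01", "Time": "12:00:00"}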

# Alternative note parser, kept for reference: it parses 'key=value' pairs
# separated by newlines instead of JSON.
# def _parse_note(self, bnote: bytes) -> Dict[str, Any]:
#     """
#     Parses the note field of the igor binarywave file.
#     It assumes that the note field contains key-value pairs
#     of the form 'key=value' separated by newlines.
#
#     Parameters
#     ----------
#     bnote : bytes
#         The bytes of the binarywave note field.
#
#     Returns
#     -------
#     Dict[str, Any]
#         The dictionary of the parsed note field.
#     """
#     note = bnote.decode("utf-8").replace("\r", "\n")
#     notes = {}
#     for line in note.split("\n"):
#         key, value = _get_key_value_pair(line)
#         if key:
#             notes[key] = value
#     return notes

def _parse_unit(self, bunit: bytes) -> Tuple[str, object]:
"""
Extracts the label and unit from a string containing a label
followed by a unit enclosed in square brackets.

Parameters
----------
bunit: bytes
The input string containing the label and unit.

Returns
-------
tuple
A tuple containing:
- unit_label : str
The extracted label.
- unit : str
The extracted unit.
"""
unit = bunit.decode("utf-8").replace("\r", "\n")

pattern = r"([\w\s]+)\s*\[([\w\s.]+)\]"
matches = re.match(pattern, unit)
if matches is not None:
label = matches.group(1).strip()
unit = matches.group(2).strip()
return label, unit
return "", ""

def axis_for_dim(self, wave_header: Dict[str, Any], dim: int) -> np.ndarray:
"""
Returns the axis values for a given dimension from the wave header.

Parameters
----------
wave_header : Dict[str, Any]
The wave_header of the ibw file.
dim : int
The dimension to return the axis for.

Returns
-------
np.ndarray
Axis values for a given dimension.

"""
return (
wave_header["sfA"][dim] * np.arange(wave_header["nDim"][dim])
+ wave_header["sfB"][dim]
)
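
# Worked example with made-up numbers: sfA[dim] = 0.05, sfB[dim] = 280.0 and
# nDim[dim] = 3 give array([280.0, 280.05, 280.1]), i.e. a linear axis with
# step 0.05 starting at 280.0.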

def axis_units_for_dim(self, wave_header: Dict[str, Any], dim: int) -> str:
"""
Returns the unit for a given dimension from the wave header.

Parameters
----------
wave_header : Dict[str, Any]
The wave_header of the ibw file.
dim : int
The dimension to return the axis for.

Returns
-------
str:
The axis units.

"""
unit_arr = wave_header["dimUnits"][dim]

unit = ""
for elem in unit_arr:
unit += elem.decode("utf-8")

return unit
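
For a quick check outside the reader classes, the PEAK-detection heuristic used in _select_parser can be reproduced directly. This sketch assumes the binarywave module comes from the igor2 package and uses a placeholder file name:

import json

from igor2 import binarywave

# Placeholder path; replace with a real Scienta .ibw file.
ibw = binarywave.load("measurement.ibw")
note = ibw["wave"]["note"]

try:
    version = json.loads(note.decode("utf-8"))["Version"]
    print(f"PEAK export detected, note version: {version}")
except (UnicodeDecodeError, json.JSONDecodeError, KeyError):
    print("Older SES-style export (non-JSON note).")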