diff --git a/examples/nexusformat.ipynb b/examples/nexusformat.ipynb new file mode 100644 index 00000000..eb36e6fc --- /dev/null +++ b/examples/nexusformat.ipynb @@ -0,0 +1,215 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "import ramanchada2 as rc2\n", + "import matplotlib.pyplot as plt" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "SpeMetadataModel(__root__={'Original file': SpeMetadataFieldModel(__root__='PST10_iR785_OP03_8000msx2.txt')})" + ] + }, + "execution_count": 2, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "kwargs = {\"sample\":['PST'], \"provider\" : ['FNMT'], \"OP\":['03'], \"laser_wl\":['785']}\n", + "spe = rc2.spectrum.from_test_spe(**kwargs)\n", + "spe.meta" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "{ 'assay_uuid': None,\n", + " 'citation': Citation(year='2022', title='Round Robin 1', owner='FNMT'),\n", + " 'effects': [ EffectArray(endpoint='Raman spectrum', endpointtype=None, result=None, conditions=None, idresult=None, endpointGroup=None, endpointSynonyms=None, sampleID=None, signal=ValueArray(unit=None, values=array([1354.36, 1355.04, 1349.8 , ..., 1031.89, 1031.96, 1031.44]), errQualifier=None, errorValue=None), axes={'x': ValueArray(unit='cm-1', values=array([ 120.111, 120.505, 120.899, ..., 2499.82 , 2499.94 , 2499.98 ]), errQualifier=None, errorValue=None)})],\n", + " 'interpretationCriteria': None,\n", + " 'interpretationResult': None,\n", + " 'investigation_uuid': None,\n", + " 'owner': SampleLink(substance=Sample(uuid='PST'), company=Company(uuid=None, name='Default company')),\n", + " 'parameters': {'E.method': 'Raman spectrometry', 'wavelength': 785},\n", + " 'protocol': Protocol(topcategory='P-CHEM', category=EndpointCategory(code='ANALYTICAL_METHODS_SECTION', term=None, title=None), endpoint=None, guideline=None),\n", + " 'updated': None,\n", + " 'uuid': 'cbf04397-9352-4382-89e1-81be6d99f473'}\n" + ] + }, + { + "data": { + "text/plain": [ + "NXroot('spectrum')" + ] + }, + "execution_count": 3, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "import pynanomapper.datamodel.ambit as mx\n", + "import numpy as np\n", + "from typing import Dict, Optional, Union, List\n", + "from pynanomapper.datamodel.nexus_utils import to_nexus\n", + "import json \n", + "import nexusformat.nexus.tree as nx\n", + "import pprint\n", + "import uuid\n", + "pp = pprint.PrettyPrinter(indent=4)\n", + "\n", + "\n", + "data_dict: Dict[str, mx.ValueArray] = {\n", + " 'x': mx.ValueArray(values = spe.x, unit=\"cm-1\")\n", + "}\n", + "ea = mx.EffectArray(endpoint=\"Raman spectrum\", \n", + " signal = mx.ValueArray(values = spe.y),\n", + " axes = data_dict)\n", + "#ea.to_json()\n", + "effect_list: List[Union[mx.EffectRecord,mx.EffectArray]] = []\n", + "effect_list.append(ea)\n", + "papp = mx.ProtocolApplication(protocol=mx.Protocol(topcategory=\"P-CHEM\",category=mx.EndpointCategory(code=\"ANALYTICAL_METHODS_SECTION\")),\n", + " effects=effect_list)\n", + "papp.citation = mx.Citation(owner=\"FNMT\",title=\"Round Robin 1\",year=2022)\n", + "papp.parameters = {\"E.method\" : \"Raman spectrometry\" , \"wavelength\" : 785}\n", + "\n", + "papp.uuid = str(uuid.uuid4())\n", + "company=mx.Company(name = \"FNMT\")\n", + "substance = mx.Sample(uuid = \"PST\")\n", + 
"papp.owner = mx.SampleLink(substance = substance)\n", + "#papp.to_json()\n", + "study = mx.Study(study=[papp])\n", + "\n", + "pp.pprint(papp.__dict__)\n", + "#print(papp.to_json())\n", + "#print(study.to_json())\n", + "nxroot = nx.NXroot()\n", + "study.to_nexus(nxroot)\n", + "nxroot.save(\"spectrum.nxs\",mode=\"w\")\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def spe2pap(spe:rc2.spectrum.Spectrum):\n", + " effect_list: List[EffectRecord] = []\n", + " effect_list.append(EffectRecord(endpoint=\"Endpoint 1\", unit=\"Unit 1\", loValue=5.0))\n", + " protocol = Protocol(topcategory=\"P-CHEM\",category=EndpointCategory(code=\"ANALYTICAL_METHODS_SECTION\"))\n", + " papp = ProtocolApplication(protocol=protocol,effects=effect_list)\n", + " return papp\n", + "\n", + "\n", + "spe2pap(spe)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "spe.plot()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "spe.write_nexus(\"nexus_test.cha\",\"entry\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from typing import List\n", + "from pynanomapper.datamodel.ambit import EffectRecord, Protocol, EndpointCategory, ProtocolApplication\n", + "\n", + "effect_list: List[EffectRecord] = []\n", + "\n", + "effect_list.append(EffectRecord(endpoint=\"Endpoint 1\", unit=\"Unit 1\", loValue=5.0))\n", + "effect_list.append(EffectRecord(endpoint=\"Endpoint 2\", unit=\"Unit 2\", loValue=10.0))\n", + "\n", + "papp = ProtocolApplication(protocol=Protocol(topcategory=\"P-CHEM\",category=EndpointCategory(code=\"XYZ\")),effects=effect_list)\n", + "papp" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "papp = ProtocolApplication(effects=effect_list)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "papp.protocol,papp.effects" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from pynanomapper.datamodel.ambit import SubstanceRecord,Sample,SampleLink\n", + "SubstanceRecord(name=\"xky\")\n", + "substance=Sample(uuid=\"xxx\")\n", + "SampleLink(substance=substance)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "ramanchada2", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.0" + }, + "orig_nbformat": 4 + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/src/ramanchada2/io/HSDS.py b/src/ramanchada2/io/HSDS.py index 6ba41a45..a3393767 100644 --- a/src/ramanchada2/io/HSDS.py +++ b/src/ramanchada2/io/HSDS.py @@ -13,6 +13,63 @@ logger = logging.getLogger() +# https://manual.nexusformat.org/examples/napi/python.html +# https://manual.nexusformat.org/examples/python/simple_example_basic/index.html +@pydantic.validate_arguments(config=dict(arbitrary_types_allowed=True)) +def write_nexus(filename: str, + dataset: str, + x: npt.NDArray, y: npt.NDArray, meta: Dict, h5module=None): + _h5 = h5module or h5py + try: + with _h5.File(filename, 'a') as f: + f.attrs['default'] = dataset + try: + nxentry = 
diff --git a/src/ramanchada2/io/HSDS.py b/src/ramanchada2/io/HSDS.py
index 6ba41a45..a3393767 100644
--- a/src/ramanchada2/io/HSDS.py
+++ b/src/ramanchada2/io/HSDS.py
@@ -13,6 +13,63 @@
 logger = logging.getLogger()


+# https://manual.nexusformat.org/examples/napi/python.html
+# https://manual.nexusformat.org/examples/python/simple_example_basic/index.html
+@pydantic.validate_arguments(config=dict(arbitrary_types_allowed=True))
+def write_nexus(filename: str,
+                dataset: str,
+                x: npt.NDArray, y: npt.NDArray, meta: Dict, h5module=None):
+    _h5 = h5module or h5py
+    try:
+        with _h5.File(filename, 'a') as f:
+            f.attrs['default'] = dataset
+            # top-level NXsample and NXinstrument placeholder groups
+            f.require_group('sample')
+            f.require_group('instrument')
+            # TODO: persist the metadata; for now it is only logged
+            for m in meta:
+                logger.debug('%s %s', m, meta[m])
+
+            nxentry = f.require_group(dataset)
+            nxentry.attrs["NX_class"] = 'NXentry'
+            nxentry.attrs['default'] = 'data'
+
+            nxdata = nxentry.require_group('data')
+            nxdata.attrs["NX_class"] = 'NXdata'
+            nxdata.attrs['signal'] = 'spectrum'
+            nxdata.attrs['axes'] = 'raman_shift'
+            nxdata.attrs['raman_shift_indices'] = [0, ]
+
+            # datasets must be created with create_dataset;
+            # require_group cannot create a dataset and would always fail here
+            tth = nxdata.create_dataset('raman_shift', data=x)
+            tth.attrs['units'] = 'cm-1'
+            tth.attrs['long_name'] = 'Raman shift (cm-1)'
+
+            counts = nxdata.create_dataset('spectrum', data=y)
+            counts.attrs['units'] = 'au'
+            counts.attrs['long_name'] = 'spectrum'
+    except ValueError as e:
+        logger.warning(repr(e))
+
+
+class DatasetExistsError(Exception):
+    pass
+
+
 @pydantic.validate_arguments(config=dict(arbitrary_types_allowed=True))
 def write_cha(filename: str,
               dataset: str,
@@ -25,9 +82,9 @@ def write_cha(filename: str,
                 ds = h5.create_dataset(dataset, data=data)
                 ds.attrs.update(meta)
             else:
-                logger.warning(f'dataset `{dataset}` already exists in file `{filename}`')
+                raise DatasetExistsError(f'dataset `{dataset}` already exists in file `{filename}`')
     except ValueError as e:
-        logger.warning(repr(e))
+        raise e


 def read_cha(filename: str,
diff --git a/src/ramanchada2/misc/utils/__init__.py b/src/ramanchada2/misc/utils/__init__.py
index 2547b120..0b149730 100644
--- a/src/ramanchada2/misc/utils/__init__.py
+++ b/src/ramanchada2/misc/utils/__init__.py
@@ -15,4 +15,5 @@
     find_closest_pairs, find_closest_pairs_idx,
     align, align_shift,
+    match_peaks
 )
diff --git a/src/ramanchada2/misc/utils/argmin2d.py b/src/ramanchada2/misc/utils/argmin2d.py
index 66115de8..889e657d 100644
--- a/src/ramanchada2/misc/utils/argmin2d.py
+++ b/src/ramanchada2/misc/utils/argmin2d.py
@@ -1,7 +1,10 @@
-import pydantic
 import numpy as np
 import numpy.typing as npt
+import pandas as pd
+import pydantic
 from scipy import linalg
+from sklearn.cluster import KMeans
+from sklearn.metrics.pairwise import euclidean_distances
 from typing import List, Union
@@ -77,3 +80,67 @@ def align_shift(x, y,
     if loss > loss_bak:
         return p_bak
     return p
+
+
+def match_peaks(spe_pos_dict, ref):
+    # Min-max normalize the reference values
+    min_value = min(ref.values())
+    max_value = max(ref.values())
+    if len(ref.keys()) > 1:
+        normalized_ref = {key: (value - min_value) / (max_value - min_value) for key, value in ref.items()}
+    else:
+        normalized_ref = ref
+
+    # Min-max normalize the spe_pos_dict
+    min_value_spe = min(spe_pos_dict.values())
+    max_value_spe = max(spe_pos_dict.values())
+    if len(spe_pos_dict.keys()) > 1:
+        normalized_spe = {
+            key: (value - min_value_spe) / (max_value_spe - min_value_spe) for key, value in spe_pos_dict.items()
+        }
+    else:
+        normalized_spe = spe_pos_dict
+    data_list = [
+        {'Wavelength': key, 'Intensity': value, 'Source': 'spe'} for key, value in normalized_spe.items()
+    ] + [
+        {'Wavelength': key, 'Intensity': value, 'Source': 'reference'} for key, value in normalized_ref.items()
+    ]
+
+    # Create a DataFrame from the list of dictionaries
+    df = pd.DataFrame(data_list)
+    feature_matrix = df[['Wavelength', 'Intensity']].to_numpy()
+
+    n_ref = len(ref.keys())
+    n_spe = len(spe_pos_dict.keys())
+    kmeans = KMeans(n_clusters=max(n_ref, n_spe))
+    kmeans.fit(feature_matrix)
+    labels = kmeans.labels_
+    # Extract cluster labels, x values, and y values
+    df['Cluster'] = labels
+    grouped = df.groupby('Cluster')
+    x_spe = np.array([])
+    x_reference = np.array([])
+    x_distance = np.array([])
+    clusters = np.array([])
+    # Iterate through each cluster
+    for cluster, group in grouped:
+        # Get the unique sources within the group
+        unique_sources = group['Source'].unique()
+        if 'reference' in unique_sources and 'spe' in unique_sources:
+            # pair each spe peak in the cluster with its closest reference peak
+            for w_spe in group.loc[group["Source"] == "spe"]["Wavelength"].values:
+                x = None
+                r = None
+                e_min = None
+                for w_ref in group.loc[group["Source"] == "reference"]["Wavelength"].values:
+                    e = euclidean_distances(w_spe.reshape(-1, 1), w_ref.reshape(-1, 1))[0][0]
+                    if (e_min is None) or (e < e_min):
+                        x = w_spe
+                        r = w_ref
+                        e_min = e
+                x_spe = np.append(x_spe, x)
+                x_reference = np.append(x_reference, r)
+                x_distance = np.append(x_distance, e_min)
+                clusters = np.append(clusters, cluster)
+    sort_indices = np.argsort(x_spe)
+    return (x_spe[sort_indices], x_reference[sort_indices], x_distance[sort_indices], df)
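`match_peaks` clusters the found and reference peaks together (KMeans over min-max-normalized position/intensity pairs), then pairs each found peak with its nearest reference peak inside a cluster. A minimal usage sketch with made-up values:

from ramanchada2.misc.utils import match_peaks

spe_pos = {520.8: 1200.0, 1001.9: 800.0, 1601.1: 300.0}  # found peaks: position -> intensity
ref = {520.45: 1.0, 1001.4: 0.8, 1602.3: 0.3}            # reference lines
x_spe, x_ref, distance, df = match_peaks(spe_pos, ref)
print(list(zip(x_spe, x_ref)))  # matched (found, reference) pairs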
diff --git a/src/ramanchada2/protocols/calibration.py b/src/ramanchada2/protocols/calibration.py
new file mode 100644
index 00000000..dfb0a4ff
--- /dev/null
+++ b/src/ramanchada2/protocols/calibration.py
@@ -0,0 +1,279 @@
+import logging
+import matplotlib.pyplot as plt
+import numpy as np
+import pickle
+import ramanchada2.misc.constants as rc2const
+from ..misc import utils as rc2utils
+from ..spectrum import Spectrum
+from matplotlib.axes import Axes
+from ramanchada2.misc.plottable import Plottable
+from scipy import interpolate
+
+logger = logging.getLogger(__name__)
+
+
+class ProcessingModel:
+
+    def __init__(self):
+        pass
+
+
+class CalibrationComponent(Plottable):
+
+    def __init__(self, laser_wl, spe, spe_units, ref, ref_units, sample=None):
+        super(CalibrationComponent, self).__init__()
+        self.laser_wl = laser_wl
+        self.spe = spe
+        self.spe_units = spe_units
+        self.ref = ref
+        self.ref_units = ref_units
+        self.name = "not estimated"
+        self.model = None
+        self.model_units = None
+        self.peaks = None
+        self.sample = sample
+        self.enabled = True
+
+    def set_model(self, model, model_units, peaks, name=None):
+        self.model = model
+        self.model_units = model_units
+        self.peaks = peaks
+        self.name = "calibration component" if name is None else name
+
+    def __str__(self):
+        return (f"{self.name} spe ({self.spe_units}) reference ({self.ref_units}) "
+                f"model ({self.model_units}) {self.model}")
+
+    def convert_units(self, old_spe, spe_unit="cm-1", newspe_unit="nm", laser_wl=None):
+        if laser_wl is None:
+            laser_wl = self.laser_wl
+        logger.debug("convert laser_wl %s %s --> %s", laser_wl, spe_unit, newspe_unit)
+        if spe_unit != newspe_unit:
+            if spe_unit == "nm":
+                new_spe = old_spe.abs_nm_to_shift_cm_1_filter(laser_wave_length_nm=laser_wl)
+            elif spe_unit == "cm-1":
+                new_spe = old_spe.shift_cm_1_to_abs_nm_filter(laser_wave_length_nm=laser_wl)
+            else:
+                raise ValueError("Unsupported conversion {} to {}".format(spe_unit, newspe_unit))
+        else:
+            new_spe = old_spe
+        return new_spe
+
+    def process(self, old_spe: Spectrum, spe_units="cm-1", convert_back=False):
+        raise NotImplementedError(self)
+
+    def derive_model(self, find_kw={}, fit_peaks_kw={}, should_fit=False, name=None):
+        raise NotImplementedError(self)
+
+    def plot(self, ax=None, label=' ', **kwargs) -> Axes:
+        if ax is None:
+            fig, ax = plt.subplots()
+        self._plot(ax, label=label, **kwargs)
+        ax.legend()
+        return ax
+
+    def _plot(self, ax, **kwargs):
+        self.spe.plot(ax=ax)
+
+    def _plot_peaks(self, ax, **kwargs):
+        pass
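+
+# Usage sketch (illustrative, not part of the module): a component stores the
+# fitted `model` and applies it in process(); for XCalibrationComponent below,
+# a float model is a constant x-axis offset, while an RBFInterpolator model
+# remaps the axis point-wise.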
+
+
+class XCalibrationComponent(CalibrationComponent):
+    def __init__(self, laser_wl, spe, spe_units, ref, ref_units, sample="Neon"):
+        super(XCalibrationComponent, self).__init__(laser_wl, spe, spe_units, ref, ref_units, sample)
+
+    def process(self, old_spe: Spectrum, spe_units="cm-1", convert_back=False):
+        logger.debug("convert spe_units %s --> model units %s", spe_units, self.model_units)
+        new_spe = self.convert_units(old_spe, spe_units, self.model_units)
+        logger.debug("process %s", self)
+        if self.model is None:
+            return new_spe
+        elif self.enabled:
+            if isinstance(self.model, interpolate.RBFInterpolator):
+                new_spe.x = self.model(new_spe.x.reshape(-1, 1))
+            elif isinstance(self.model, float):
+                new_spe.x = new_spe.x + self.model
+            else:
+                return new_spe
+        else:
+            return new_spe
+        # convert back to the units the spectrum came in
+        if convert_back:
+            logger.debug("convert back %s", spe_units)
+            return self.convert_units(new_spe, self.model_units, spe_units)
+        else:
+            return new_spe
+
+    def derive_model(self, find_kw={}, fit_peaks_kw={}, should_fit=False, name=None):
+        # convert to ref_units
+        logger.debug("[%s]: convert spe_units %s to ref_units %s", self.name, self.spe_units, self.ref_units)
+        spe_to_process = self.convert_units(self.spe, self.spe_units, self.ref_units)
+        logger.debug("max x %s %s", max(spe_to_process.x), self.ref_units)
+        fig, ax = plt.subplots(3, 1, figsize=(12, 4))
+        self.spe.plot(ax=ax[0].twinx(), label=self.spe_units)
+        spe_to_process.plot(ax=ax[1], label=self.ref_units)
+
+        # merge the user-supplied find_kw with the defaults instead of discarding it
+        find_kw = dict(sharpening=None, **find_kw)
+        if should_fit:
+            spe_pos_dict = spe_to_process.fit_peak_positions(
+                center_err_threshold=10, find_peaks_kw=find_kw, fit_peaks_kw=fit_peaks_kw)  # type: ignore
+        else:
+            cand = spe_to_process.find_peak_multipeak(**find_kw)
+            spe_pos_dict = cand.get_pos_ampl_dict()
+
+        ax[2].stem(spe_pos_dict.keys(), spe_pos_dict.values(), linefmt='b-', basefmt=' ')
+        ax[2].twinx().stem(self.ref.keys(), self.ref.values(), linefmt='r-', basefmt=' ')
+
+        x_spe, x_reference, x_distance, df = rc2utils.match_peaks(spe_pos_dict, self.ref)
+        sum_of_differences = np.sum(np.abs(x_spe - x_reference)) / len(x_spe)
+        logger.debug("sum_of_differences original %s %s", sum_of_differences, self.ref_units)
+        if len(x_reference) == 1:
+            _offset = (x_reference[0] - x_spe[0])
+            logger.debug("ref %s sample %s offset %s %s", x_reference[0], x_spe[0], _offset, self.ref_units)
+            self.set_model(_offset, self.ref_units, df, name)
+        else:
+            fig, ax = plt.subplots(1, 1, figsize=(3, 3))
+            ax.scatter(x_spe, x_reference, marker='o')
+            ax.set_xlabel("spectrum x ({})".format(self.ref_units))
+            ax.set_ylabel("reference x ({})".format(self.ref_units))
+            kwargs = {"kernel": "thin_plate_spline"}
+            interp = interpolate.RBFInterpolator(x_spe.reshape(-1, 1), x_reference, **kwargs)
+            self.set_model(interp, self.ref_units, df, name)
+            x_plot = np.linspace(min(x_spe), max(x_spe), 20)
+            y_plot = interp(x_plot.reshape(-1, 1))
+            ax.scatter(x_plot, y_plot, marker='.', label=kwargs["kernel"])
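+
+# Standalone sketch of the model stored above (illustrative values): matched
+# peak pairs define a thin-plate-spline map from measured to reference positions.
+#
+#     import numpy as np
+#     from scipy import interpolate
+#     x_spe = np.array([520.8, 1001.9, 1601.1])    # measured peak positions
+#     x_ref = np.array([520.45, 1001.4, 1602.3])   # reference positions
+#     model = interpolate.RBFInterpolator(x_spe.reshape(-1, 1), x_ref,
+#                                         kernel="thin_plate_spline")
+#     new_axis = model(spe.x.reshape(-1, 1))       # remapped x axis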
+
+
+class LazerZeroingComponent(CalibrationComponent):
+    def __init__(self, laser_wl, spe, spe_units="nm", ref={520.45: 1}, ref_units="cm-1", sample="Silicon"):
+        super(LazerZeroingComponent, self).__init__(laser_wl, spe, spe_units, ref, ref_units, sample)
+        self.profile = "Pearson4"
+
+    def derive_model(self, find_kw={}, fit_peaks_kw={}, should_fit=True, name=None):
+        # merge the user-supplied find_kw with the defaults instead of discarding it
+        find_kw = dict(sharpening=None, **find_kw)
+        cand = self.spe.find_peak_multipeak(**find_kw)
+        logger.debug("%s %s", self.name, cand)
+        fit_res = self.spe.fit_peak_multimodel(profile=self.profile, candidates=cand, **fit_peaks_kw)
+        df = fit_res.to_dataframe_peaks()
+        df = df.sort_values(by='height', ascending=False)
+        if df.empty:
+            raise Exception("No peaks found")
+        if "position" in df.columns:
+            zero_peak_nm = df.iloc[0]["position"]
+        elif "center" in df.columns:
+            zero_peak_nm = df.iloc[0]["center"]
+        else:
+            raise Exception("No peak position in fit results")
+        logger.debug("%s peak %s", self.name, zero_peak_nm)
+        # https://www.elodiz.com/calibration-and-validation-of-raman-instruments/
+        self.set_model(zero_peak_nm, "nm", df, "Laser zeroing using {}".format(zero_peak_nm))
+        # laser_wl should be calculated based on the peak position and set instead of the nominal
+
+    def zero_nm_to_shift_cm_1(self, wl, zero_pos_nm, zero_ref_cm_1=520.45):
+        return 1e7*(1/zero_pos_nm - 1/wl) + zero_ref_cm_1
+
+    # we do not shift the axis (as initially implemented); we convert the spectrum
+    # nm -> cm-1 using the Si peak measured in nm and its reference in cm-1
+    # https://www.elodiz.com/calibration-and-validation-of-raman-instruments/
+    def process(self, old_spe: Spectrum, spe_units="nm", convert_back=False):
+        zero_ref_cm_1 = list(self.ref.keys())[0]
+        logger.debug("%s process %s %s", self.name, self.model, zero_ref_cm_1)
+        new_x = self.zero_nm_to_shift_cm_1(old_spe.x, self.model, zero_ref_cm_1)
+        return Spectrum(x=new_x, y=old_spe.y, metadata=old_spe.meta)
+
+
+class CalibrationModel(ProcessingModel, Plottable):
+    def __init__(self, laser_wl: int):
+        # initialize both base classes explicitly
+        ProcessingModel.__init__(self)
+        Plottable.__init__(self)
+        self.set_laser_wavelength(laser_wl)
+        self.neon_wl = {
+            785: rc2const.neon_wl_785_nist_dict,
+            633: rc2const.neon_wl_633_nist_dict,
+            532: rc2const.neon_wl_532_nist_dict
+        }
+        self.prominence_coeff = 10
+
+    def peaks(self, spe, profile='Gaussian', wlen=300, width=1):
+        cand = spe.find_peak_multipeak(prominence=spe.y_noise*self.prominence_coeff, wlen=wlen, width=width)
+        init_guess = spe.fit_peak_multimodel(profile=profile, candidates=cand, no_fit=True)
+        fit_res = spe.fit_peak_multimodel(profile=profile, candidates=cand)
+        return cand, init_guess, fit_res
+
+    def set_laser_wavelength(self, laser_wl):
+        self.clear()
+        self.laser_wl = laser_wl
+
+    def clear(self):
+        self.laser_wl = None
+        self.components = []
+
+    def save(self, filename):
+        with open(filename, "wb") as file:
+            pickle.dump(self, file)
+
+    @staticmethod
+    def from_file(filename):
+        with open(filename, "rb") as file:
+            return pickle.load(file)
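+
+    # Persistence round trip (sketch; the filename is illustrative):
+    #
+    #     calmodel = CalibrationModel(laser_wl=785)
+    #     calmodel.save("calibration_model.pkl")
+    #     calmodel = CalibrationModel.from_file("calibration_model.pkl")
+    #
+    # Note: pickle can execute code on load; only load files from trusted sources.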
+
+    def derive_model_x(self, spe_neon, spe_neon_units="cm-1", ref_neon=None, ref_neon_units="nm", spe_sil=None,
+                       spe_sil_units="cm-1", ref_sil=None, ref_sil_units="cm-1", find_kw={}, fit_kw={}):
+        # pass the user-supplied keyword arguments through instead of ignoring them
+        model_neon = self.derive_model_curve(
+            spe_neon, self.neon_wl[self.laser_wl], spe_units=spe_neon_units, ref_units=ref_neon_units,
+            find_kw=find_kw, fit_peaks_kw=fit_kw, should_fit=False, name="Neon calibration")
+        spe_sil_ne_calib = model_neon.process(spe_sil, spe_units=spe_sil_units, convert_back=False)
+        find_kw = {"prominence": spe_sil_ne_calib.y_noise * 10, "wlen": 200, "width": 1}
+        model_si = self.derive_model_zero(
+            spe_sil_ne_calib, ref={520.45: 1}, spe_units="nm", ref_units=ref_sil_units, find_kw=find_kw,
+            fit_peaks_kw={}, should_fit=True, name="Si laser zeroing")
+        return (model_neon, model_si)
+
+    def derive_model_curve(self, spe, ref, spe_units="cm-1", ref_units="nm", find_kw={}, fit_peaks_kw={},
+                           should_fit=False, name="X calibration"):
+        calibration_x = XCalibrationComponent(self.laser_wl, spe, spe_units, ref, ref_units)
+        calibration_x.derive_model(find_kw=find_kw, fit_peaks_kw=fit_peaks_kw, should_fit=should_fit, name=name)
+        self.components.append(calibration_x)
+        return calibration_x
+
+    def derive_model_zero(self, spe, ref, spe_units="nm", ref_units="cm-1", find_kw={}, fit_peaks_kw={},
+                          should_fit=False, name="X Shift"):
+        calibration_shift = LazerZeroingComponent(self.laser_wl, spe, spe_units, ref, ref_units)
+        calibration_shift.profile = "Gaussian"
+        calibration_shift.derive_model(find_kw=find_kw, fit_peaks_kw=fit_peaks_kw, should_fit=should_fit, name=name)
+        self.components.append(calibration_shift)
+        return calibration_shift
+
+    def apply_calibration_x(self, old_spe: Spectrum, spe_units="cm-1"):
+        # neon calibration converts to nm;
+        # silicon calibration takes nm and converts back to cm-1 using laser zeroing
+        new_spe = old_spe
+        for model in self.components:
+            # TODO: tbd find out if to convert units
+            if model.enabled:
+                new_spe = model.process(new_spe, spe_units, convert_back=False)
+        return new_spe
+
+    def _plot(self, ax, **kwargs):
+        pass
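Taken together, `derive_model_x` chains the two components: Neon lines fit an RBF map of the wavelength axis, then the Si line at 520.45 cm-1 re-zeroes the laser wavelength. An end-to-end sketch; `spe_neon`, `spe_sil` and `spe_sample` stand for measured Spectrum objects the caller must supply:

from ramanchada2.protocols.calibration import CalibrationModel

calmodel = CalibrationModel(laser_wl=785)
# spe_neon / spe_sil: measured Neon-lamp and Silicon spectra (assumed inputs)
model_neon, model_si = calmodel.derive_model_x(spe_neon, spe_sil=spe_sil)
# apply both components to a sample spectrum
spe_calibrated = calmodel.apply_calibration_x(spe_sample, spe_units="cm-1")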
diff --git a/src/ramanchada2/spectrum/calibration/by_deltas.py b/src/ramanchada2/spectrum/calibration/by_deltas.py
index ae0f62d7..e27aaeac 100644
--- a/src/ramanchada2/spectrum/calibration/by_deltas.py
+++ b/src/ramanchada2/spectrum/calibration/by_deltas.py
@@ -1,17 +1,13 @@
 #!/usr/bin/env python3
-from typing import Dict, List, Literal, Union
-
 import lmfit
 import numpy as np
 import numpy.typing as npt
-from pydantic import NonNegativeInt, validate_arguments
-from scipy import interpolate
-
-from ramanchada2.misc.spectrum_deco import (add_spectrum_filter,
-                                            add_spectrum_method)
-
 from ...misc import utils as rc2utils
 from ..spectrum import Spectrum
+from pydantic import NonNegativeInt, validate_arguments
+from ramanchada2.misc.spectrum_deco import add_spectrum_filter, add_spectrum_method
+from scipy import interpolate
+from typing import Dict, List, Literal, Union


 class DeltaSpeModel:
@@ -223,5 +219,10 @@ def xcal_fine_RBF(old_spe: Spectrum,
     spe_cent = np.array(list(spe_pos_dict.keys()))
     spe_idx, ref_idx = rc2utils.find_closest_pairs_idx(spe_cent, ref_pos)
-    interp = interpolate.RBFInterpolator(spe_cent[spe_idx].reshape(-1, 1), ref_pos[ref_idx], **kwargs)
-    new_spe.x = interp(old_spe.x.reshape(-1, 1))
+    if len(ref_idx) == 1:
+        # a single matched pair only supports a constant shift
+        _offset = (ref_pos[ref_idx][0] - spe_cent[spe_idx][0])
+        new_spe.x = old_spe.x + _offset
+    else:
+        kwargs["kernel"] = kernel
+        interp = interpolate.RBFInterpolator(spe_cent[spe_idx].reshape(-1, 1), ref_pos[ref_idx], **kwargs)
+        new_spe.x = interp(old_spe.x.reshape(-1, 1))
diff --git a/src/ramanchada2/spectrum/filters/convolve.py b/src/ramanchada2/spectrum/filters/convolve.py
index 5a5b2cfe..e0c23306 100644
--- a/src/ramanchada2/spectrum/filters/convolve.py
+++ b/src/ramanchada2/spectrum/filters/convolve.py
@@ -23,6 +23,7 @@ def convolve(
             Literal[
                 'gaussian', 'lorentzian', 'voigt', 'pvoigt', 'moffat',
+                'pearson4', 'pearson7'
             ]],
         **kwargs):
     """
@@ -33,7 +34,7 @@ def convolve(
     lineshape : callable, str or np.ndarray
         callable: should have a single positional argument x, ex: `lambda x: np.exp((x/5)**2)`
         predefined peak profile:
-            'gaussian', 'lorentzian', 'voigt', 'pvoigt', 'moffat'
+            'gaussian', 'lorentzian', 'voigt', 'pvoigt', 'moffat', 'pearson4', 'pearson7'
        np.ndarray: lineshape in samples
     **kwargs : additional kwargs will be passed to lineshape function
diff --git a/src/ramanchada2/spectrum/peaks/find_peaks.py b/src/ramanchada2/spectrum/peaks/find_peaks.py
index 93d86dc5..6cda475d 100644
--- a/src/ramanchada2/spectrum/peaks/find_peaks.py
+++ b/src/ramanchada2/spectrum/peaks/find_peaks.py
@@ -1,17 +1,12 @@
 #!/usr/bin/env python3
-
-from typing import Union, Tuple, List, Literal
-from scipy import signal
 import numpy as np
-from pydantic import (validate_arguments, PositiveInt,
-                      NonNegativeFloat, NonNegativeInt
-                      )
-
 from ..spectrum import Spectrum
-from ramanchada2.misc.spectrum_deco import (add_spectrum_method,
-                                            add_spectrum_filter)
-
+from pydantic import validate_arguments, PositiveInt, NonNegativeFloat, NonNegativeInt
+from ramanchada2.misc.spectrum_deco import add_spectrum_method, add_spectrum_filter
 from ramanchada2.misc.types.peak_candidates import ListPeakCandidateMultiModel
+from scipy import signal
+from scipy.signal import find_peaks_cwt
+from typing import Union, Tuple, List, Literal


 def peak_boundaries(spe, wlen, width, prominence):
@@ -48,7 +43,7 @@ def find_peak_multipeak(
         hht_chain: Union[List[PositiveInt], None] = None,
         bgm_kwargs={},
         sharpening: Union[Literal['hht'], None] = None,
-        strategy: Literal['topo', 'bayesian_gaussian_mixture', 'bgm'] = 'topo'
+        strategy: Literal['topo', 'bayesian_gaussian_mixture', 'bgm', 'cwt'] = 'topo'
         ) -> ListPeakCandidateMultiModel:

     if prominence is None:
@@ -112,7 +107,32 @@ def interpolate(x):
             if peak_group:
                 peak_groups.append(dict(boundaries=(x_arr[li], x_arr[ri]),
                                         peaks=peak_group))
-
+    elif strategy == 'cwt':
+        # TODO: dedicated cwt kwargs; `widths` etc. are currently passed via bgm_kwargs
+        peaks = find_peaks_cwt(spe.y, **bgm_kwargs)
+        peak_list = list()
+        for peak_index in peaks:
+            half_max = spe.y[peak_index] / 2.0
+            # guard against peaks whose flanks never drop below half maximum
+            left = np.where(spe.y[:peak_index] <= half_max)[0]
+            left_index = left[-1] if len(left) else 0
+            right = np.where(spe.y[peak_index:] <= half_max)[0]
+            right_index = (right[0] + peak_index) if len(right) else len(spe.y) - 1
+            fwhm = spe.x[right_index] - spe.x[left_index]
+            # rough sigma estimate: FWHM = 2*sqrt(2*ln2)*sigma for a Gaussian
+            two_sqrt2ln2 = 2 * np.sqrt(2 * np.log(2))
+            peak_list.append(dict(amplitude=spe.y[peak_index],
+                                  position=spe.x[peak_index],
+                                  sigma=fwhm / two_sqrt2ln2,
+                                  fwhm=fwhm))
+        for li, ri in boundaries:
+            peak_group = list()
+            for peak in peak_list:
+                # compare peak positions in x units with the boundary x values
+                if x_arr[li] < peak['position'] < x_arr[ri]:
+                    peak_group.append(dict(position=peak['position'],
+                                           amplitude=peak['amplitude'],
+                                           sigma=peak['sigma']))
+            if peak_group:
+                peak_groups.append(dict(boundaries=(x_arr[li], x_arr[ri]),
+                                        peaks=peak_group))
     elif strategy == 'topo':
         for li, ri in boundaries:
             peak_group = list()
diff --git a/src/ramanchada2/spectrum/peaks/fit_peaks.py b/src/ramanchada2/spectrum/peaks/fit_peaks.py
index 7ba8efae..7c4be5e0 100644
--- a/src/ramanchada2/spectrum/peaks/fit_peaks.py
+++ b/src/ramanchada2/spectrum/peaks/fit_peaks.py
@@ -59,7 +59,8 @@ def build_multipeak_model_params(profile: Union[available_models_type, List[avai
     elif 
profile == 'Pearson4': fwhm_factor = 1 - fit_params[f'p{peak_i}_height'].set(value=peak.amplitude) + # p{peak_i}_amplitude or p{peak_i}_height + fit_params[f'p{peak_i}_amplitude'].set(value=peak.amplitude) fit_params[f'p{peak_i}_sigma'].set(value=peak.sigma/fwhm_factor) elif profile == 'Pearson7': diff --git a/src/ramanchada2/spectrum/spectrum.py b/src/ramanchada2/spectrum/spectrum.py index 1f36eef4..92a74bdb 100644 --- a/src/ramanchada2/spectrum/spectrum.py +++ b/src/ramanchada2/spectrum/spectrum.py @@ -1,23 +1,18 @@ #!/usr/bin/env python3 - from __future__ import annotations - import logging -from copy import deepcopy -from typing import Dict, List, Set, Tuple, Union - import numpy as np import numpy.typing as npt import pydantic -from scipy.signal import convolve, savgol_coeffs, savgol_filter -from scipy.stats import median_abs_deviation, rv_histogram - -from ramanchada2.io.HSDS import write_cha +from copy import deepcopy +from ramanchada2.io.HSDS import write_cha, write_nexus from ramanchada2.io.output.write_csv import write_csv as io_write_csv from ramanchada2.misc.plottable import Plottable from ramanchada2.misc.types import PositiveOddInt, SpeMetadataModel -from ramanchada2.misc.types.spectrum import (SpeProcessingListModel, - SpeProcessingModel) +from ramanchada2.misc.types.spectrum import SpeProcessingListModel, SpeProcessingModel +from scipy.signal import convolve, savgol_coeffs, savgol_filter +from scipy.stats import median_abs_deviation, rv_histogram +from typing import Dict, List, Set, Tuple, Union logger = logging.getLogger(__name__) @@ -73,8 +68,10 @@ def write_csv(self, filename, delimiter=',', newline='\n'): f.write(c) def write_cha(self, chafile, dataset): - write_cha(chafile, dataset, - self.x, self.y, self.meta.serialize()) + write_cha(chafile, dataset, self.x, self.y, self.meta.serialize()) + + def write_nexus(self, chafile, dataset): + write_nexus(chafile, dataset, self.x, self.y, self.meta.serialize()) def write_cache(self): if self._cachefile: diff --git a/tests/data/experimental/x_axis.txt b/tests/data/experimental/x_axis.txt new file mode 100644 index 00000000..c765ddfe --- /dev/null +++ b/tests/data/experimental/x_axis.txt @@ -0,0 +1,58 @@ +479 +480 +482 +483 +485 +486 +487 +489 +490 +492 +493 +494 +496 +497 +499 +500 +501 +503 +504 +506 +507 +508 +510 +511 +513 +514 +515 +517 +518 +520 +521 +522 +524 +525 +527 +528 +529 +531 +532 +534 +535 +536 +538 +539 +540 +542 +543 +545 +546 +547 +549 +550 +552 +553 +554 +556 +557 +558 diff --git a/tests/data/experimental/y_axis.txt b/tests/data/experimental/y_axis.txt new file mode 100644 index 00000000..7df7d4eb --- /dev/null +++ b/tests/data/experimental/y_axis.txt @@ -0,0 +1,58 @@ +8497 +8793 +8539 +8479 +8646 +8604 +8779 +8872 +8749 +8642 +8895 +8923 +9098 +9238 +9552 +9534 +9946 +10193 +10680 +11311 +11943 +13474 +15775 +20396 +30672 +57457 +113706 +143475 +89386 +46583 +27026 +18828 +15079 +13110 +12383 +11256 +10228 +10218 +10069 +9811 +9620 +9410 +9131 +9270 +9258 +8801 +8964 +8486 +8714 +9002 +8868 +8512 +8236 +8370 +8695 +8235 +8399 +8504 diff --git a/tests/peak/pearson4_test.py b/tests/peak/pearson4_test.py new file mode 100644 index 00000000..6edae21a --- /dev/null +++ b/tests/peak/pearson4_test.py @@ -0,0 +1,71 @@ +import numpy as np +import ramanchada2 as rc2 +import matplotlib.pyplot as plt + + +# https://github.com/lmfit/lmfit-py/blob/master/lmfit/lineshapes.py#L150C1-L150C75 +def test_pearson4(): + x1 = np.loadtxt('./tests/data/experimental/x_axis.txt') + y1 = 
np.loadtxt('./tests/data/experimental/y_axis.txt') + spe0 = rc2.spectrum.Spectrum(x=x1, y=y1) + peak_candidates = spe0.find_peak_multipeak(sharpening=None, strategy='topo', hht_chain=[150], wlen=200) + fitres = spe0.fit_peak_multimodel(profile='Pearson4', candidates=peak_candidates, no_fit=False) + fig, ax = plt.subplots(figsize=(35, 10)) + fitres.plot(ax=ax, label='peak fitted') + spe0.plot(ax=ax, label='experimental') + plt.savefig('pearson4.png') + + +def test_generate_and_fit_single(): + lines = {40: 20} + spe = rc2.spectrum.from_delta_lines(lines) + # ax = spe.plot(label="delta") + spe = spe.convolve('pearson4', sigma=3, skew=5) + # spe.plot(ax=ax.twinx(),label="convolve") + # plt.show() + dofit(spe, lines, shift=0, name='pearson4_synthetic_single') + + +def test_generate_and_fit_clean(): + lines = {40: 20, 150: 15, 200: 30, 500: 50, 550: 5} + spe = rc2.spectrum.from_delta_lines(lines).convolve('pearson4', sigma=5, skew=.5) + dofit(spe, lines, shift=0) + + +def test_generate_and_fit_noise(): + lines = {40: 20, 150: 15, 200: 30, 500: 50, 550: 5} + spe = rc2.spectrum.from_delta_lines(lines).convolve('pearson4', sigma=3, skew=.5) + spe = spe.add_baseline(n_freq=5, amplitude=3, pedestal=0, rng_seed=1111) + spe = spe.add_gaussian_noise(.1, rng_seed=1111) + # spe = spe.scale_xaxis_fun(lambda x: x - shift) + dofit(spe, lines, shift=0, name='pearson4_synthetic_noise') + + +def test_generate_and_fit_noise_shift(): + lines = {40: 20, 150: 15, 200: 30, 500: 50, 550: 5} + shift = 50 + spe = rc2.spectrum.from_delta_lines(lines).convolve('pearson4', sigma=3, skew=.5) + spe = spe.add_baseline(n_freq=5, amplitude=3, pedestal=0, rng_seed=1111) + spe = spe.add_gaussian_noise(.1, rng_seed=1111) + spe = spe.scale_xaxis_fun(lambda x: x - shift) + dofit(spe, lines, shift=shift, name='pearson4_synthetic_noise_shift') + + +def dofit(spe, lines, shift=0, name='pearson4_synthetic'): + candidates = spe.find_peak_multipeak(prominence=spe.y_noise*5, wlen=40, sharpening=None) + + true_pos = np.array(list(lines.keys())) + calc_pos = [i for gr in candidates for i in gr.positions] + fit_peaks = spe.fit_peak_multimodel(profile='Pearson4', candidates=candidates) + fit_peaks.to_dataframe_peaks().to_csv("{}.csv".format(name), index=False) + fit_pos = fit_peaks.locations + + fig, ax = plt.subplots(figsize=(24, 8)) + spe.plot(ax=ax, label=name) + fit_peaks.plot(ax=ax, individual_peaks=True) + + plt.savefig("{}.png".format(name)) + + assert len(true_pos) == len(calc_pos), 'wrong number of peaks found' + assert np.max(np.abs(true_pos - fit_pos - shift)) < 2, 'fit locations far from truth' + assert np.max(np.abs(true_pos - calc_pos - shift)) < 2, 'find_peaks locations far from truth'
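A quick numerical check of the laser-zeroing formula used by `LazerZeroingComponent.zero_nm_to_shift_cm_1` (the 818.32 nm Si position is illustrative): at the measured Si position the returned shift equals the 520.45 cm-1 reference exactly, and longer wavelengths map to larger shifts.

def zero_nm_to_shift_cm_1(wl, zero_pos_nm, zero_ref_cm_1=520.45):
    # same formula as in src/ramanchada2/protocols/calibration.py
    return 1e7 * (1 / zero_pos_nm - 1 / wl) + zero_ref_cm_1


assert zero_nm_to_shift_cm_1(818.32, 818.32) == 520.45  # maps onto the reference
assert zero_nm_to_shift_cm_1(900.0, 818.32) > 520.45    # red of Si peak -> larger shift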