diff --git a/src/ramanchada2/spectrum/calibration/normalize.py b/src/ramanchada2/spectrum/calibration/normalize.py index c45fa2de..6f90d4fa 100644 --- a/src/ramanchada2/spectrum/calibration/normalize.py +++ b/src/ramanchada2/spectrum/calibration/normalize.py @@ -10,7 +10,8 @@ @validate_call(config=dict(arbitrary_types_allowed=True)) def normalize(old_spe: Spectrum, new_spe: Spectrum, /, - strategy: Literal['unity', 'min_unity', 'unity_density', 'unity_area', 'minmax'] = 'minmax'): + strategy: Literal['unity', 'min_unity', 'unity_density', 'unity_area', 'minmax', + 'L1', 'L2'] = 'minmax'): """ Normalize the spectrum. @@ -18,7 +19,7 @@ def normalize(old_spe: Spectrum, strategy: If `unity`: normalize to `sum(y)`. If `min_unity`: subtract the minimum and normalize to 'unity'. If `unity_density`: normalize to `Σ(y_i*Δx_i)`. If `unity_area`: same as `unity_density`. If `minmax`: scale - amplitudes in range `[0, 1]`. + amplitudes in range `[0, 1]`. If 'L1' or 'L2': L1 or L2 norm without subtracting the pedestal. """ if strategy == 'unity': res = old_spe.y @@ -36,3 +37,9 @@ def normalize(old_spe: Spectrum, res = old_spe.y - np.min(old_spe.y) res /= np.max(res) new_spe.y = res + elif strategy == 'L1': + res /= np.linalg.norm(res, 1) + new_spe.y = res + elif strategy == 'L2': + res /= np.linalg.norm(res) + new_spe.y = res diff --git a/src/ramanchada2/spectrum/filters/resampling.py b/src/ramanchada2/spectrum/filters/resampling.py index 715f93e2..e7a53b27 100644 --- a/src/ramanchada2/spectrum/filters/resampling.py +++ b/src/ramanchada2/spectrum/filters/resampling.py @@ -1,8 +1,10 @@ -from typing import Any, Callable, Literal, Optional, Tuple, Union +from typing import Any, Callable, Dict, Literal, Optional, Tuple, Union import numpy as np from pydantic import PositiveInt, validate_call from scipy import fft, signal +from scipy.interpolate import (Akima1DInterpolator, CubicSpline, + PchipInterpolator) from ramanchada2.misc.spectrum_deco import (add_spectrum_filter, add_spectrum_method) @@ -69,3 +71,55 @@ def resample_NUDFT_filter(old_spe: Spectrum, xnew_bins=xnew_bins, window=window, cumulative=cumulative) + + +@add_spectrum_method +@validate_call(config=dict(arbitrary_types_allowed=True)) +def resample_spline(spe: Spectrum, /, + x_range: Tuple[float, float] = (0, 4000), + xnew_bins: PositiveInt = 100, + spline: Literal['pchip', 'akima', 'makima', 'cubic_spline'] = 'pchip', + interp_kw_args: Optional[Dict] = None, + cumulative: bool = False): + + kw_args: Dict[str, Any] = {} + if spline == 'pchip': + spline_fn = PchipInterpolator + elif spline == 'akima': + spline_fn = Akima1DInterpolator + elif spline == 'makima': + spline_fn = Akima1DInterpolator + kw_args['method'] = 'makima' + elif spline == 'cubic_spline': + spline_fn = CubicSpline + kw_args['bc_type'] = 'natural' + + if interp_kw_args is not None: + kw_args.update(interp_kw_args) + + x_new = np.linspace(x_range[0], x_range[1], xnew_bins, endpoint=False) + y_new = spline_fn(spe.x, spe.y, **kw_args)(x_new) + + y_new[np.isnan(y_new)] = 0 + if cumulative: + y_new = np.cumsum(y_new) + y_new /= y_new[-1] + + return x_new, y_new + + +@add_spectrum_filter +@validate_call(config=dict(arbitrary_types_allowed=True)) +def resample_spline_filter(old_spe: Spectrum, + new_spe: Spectrum, /, + x_range: Tuple[float, float] = (0, 4000), + xnew_bins: PositiveInt = 100, + spline: Literal['pchip', 'akima', 'makima', 'cubic_spline'] = 'pchip', + interp_kw_args: Optional[Dict] = None, + cumulative: bool = False): + new_spe.x, new_spe.y = resample_spline(old_spe, + x_range=x_range, + xnew_bins=xnew_bins, + spline=spline, + interp_kw_args=interp_kw_args, + cumulative=cumulative) diff --git a/tests/spectrum/filters/test_resample_spline.py b/tests/spectrum/filters/test_resample_spline.py new file mode 100644 index 00000000..863b8a35 --- /dev/null +++ b/tests/spectrum/filters/test_resample_spline.py @@ -0,0 +1,12 @@ +import ramanchada2 as rc2 +import numpy as np +from scipy.interpolate import CubicSpline + + +def test_resample_spline(): + spe = rc2.spectrum.from_test_spe(2, laser_wl=['785'], sample=['PST']).normalize() + spe_tr = spe.trim_axes(method='x-axis', boundaries=(400, 2000)) + res_spe = spe.resample_spline_filter((400, 2000), 1000, spline='akima', cumulative=False) + aaa = CubicSpline(spe_tr.x, spe_tr.y)(np.linspace(400, 2000, 1000, endpoint=False)) + assert np.all(res_spe.x == np.linspace(400, 2000, 1000, endpoint=False)) + assert np.allclose(aaa, res_spe.y, rtol=.05)