Skip to content

Commit

Permalink
Implement spline based resampling; Implement L1 L2 norms
Browse files Browse the repository at this point in the history
Closes #146
  • Loading branch information
georgievgeorgi committed Sep 3, 2024
1 parent 35da5bd commit d7ae3d8
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 3 deletions.
13 changes: 11 additions & 2 deletions src/ramanchada2/spectrum/calibration/normalize.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,16 @@
@validate_call(config=dict(arbitrary_types_allowed=True))
def normalize(old_spe: Spectrum,
new_spe: Spectrum, /,
strategy: Literal['unity', 'min_unity', 'unity_density', 'unity_area', 'minmax'] = 'minmax'):
strategy: Literal['unity', 'min_unity', 'unity_density', 'unity_area', 'minmax',
'L1', 'L2'] = 'minmax'):
"""
Normalize the spectrum.
Args:
strategy:
If `unity`: normalize to `sum(y)`. If `min_unity`: subtract the minimum and normalize to 'unity'. If
`unity_density`: normalize to `Σ(y_i*Δx_i)`. If `unity_area`: same as `unity_density`. If `minmax`: scale
amplitudes in range `[0, 1]`.
amplitudes in range `[0, 1]`. If 'L1' or 'L2': subtract the minimum and normalize to L1 or L2 norm.
"""
if strategy == 'unity':
res = old_spe.y
Expand All @@ -36,3 +37,11 @@ def normalize(old_spe: Spectrum,
res = old_spe.y - np.min(old_spe.y)
res /= np.max(res)
new_spe.y = res
elif strategy == 'L1':
res = old_spe.y - np.min(old_spe.y)
res /= np.linalg.norm(res, 1)
new_spe.y = res
elif strategy == 'L2':
res = old_spe.y - np.min(old_spe.y)
res /= np.linalg.norm(res)
new_spe.y = res
57 changes: 56 additions & 1 deletion src/ramanchada2/spectrum/filters/resampling.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from typing import Any, Callable, Literal, Optional, Tuple, Union
from typing import Any, Callable, Dict, Literal, Optional, Tuple, Union

import numpy as np
from pydantic import PositiveInt, validate_call
from scipy import fft, signal
from scipy.interpolate import (Akima1DInterpolator, CubicSpline,
PchipInterpolator)

from ramanchada2.misc.spectrum_deco import (add_spectrum_filter,
add_spectrum_method)
Expand Down Expand Up @@ -69,3 +71,56 @@ def resample_NUDFT_filter(old_spe: Spectrum,
xnew_bins=xnew_bins,
window=window,
cumulative=cumulative)


@add_spectrum_method
@validate_call(config=dict(arbitrary_types_allowed=True))
def resample_spline(spe: Spectrum, /,
x_range: Tuple[float, float] = (0, 4000),
xnew_bins: PositiveInt = 100,
spline: Literal['pchip', 'akima', 'makima', 'cubic_spline'] = 'makima',
interp_kw_args: Optional[Dict] = None,
cumulative: bool = False):

kw_args = {'extrapolate': False}
if spline == 'pchip':
spline_fn = PchipInterpolator
elif spline == 'akima':
spline_fn = Akima1DInterpolator
kw_args['method'] = 'akima'
elif spline == 'makima':
spline_fn = Akima1DInterpolator
kw_args['method'] = 'makima'
elif spline == 'cubic_spline':
spline_fn = CubicSpline
kw_args['bc_type'] = 'natural'

if interp_kw_args is not None:
kw_args.update(interp_kw_args)

x_new = np.linspace(x_range[0], x_range[1], xnew_bins, endpoint=False)
y_new = spline_fn(spe.x, spe.y, **kw_args)(x_new)

y_new[np.isnan(y_new)] = 0
if cumulative:
y_new = np.cumsum(y_new)
y_new /= y_new[-1]

return x_new, y_new


@add_spectrum_filter
@validate_call(config=dict(arbitrary_types_allowed=True))
def resample_spline_filter(old_spe: Spectrum,
new_spe: Spectrum, /,
x_range: Tuple[float, float] = (0, 4000),
xnew_bins: PositiveInt = 100,
spline: Literal['pchip', 'akima', 'makima', 'cubic_spline'] = 'makima',
interp_kw_args: Optional[Dict] = None,
cumulative: bool = False):
new_spe.x, new_spe.y = resample_spline(old_spe,
x_range=x_range,
xnew_bins=xnew_bins,
spline=spline,
interp_kw_args=interp_kw_args,
cumulative=cumulative)
12 changes: 12 additions & 0 deletions tests/spectrum/filters/test_resample_spline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import ramanchada2 as rc2
import numpy as np
from scipy.interpolate import CubicSpline


def test_resample_spline():
spe = rc2.spectrum.from_test_spe(2, laser_wl=['785'], sample=['PST']).normalize()
spe_tr = spe.trim_axes(method='x-axis', boundaries=(400, 2000))
res_spe = spe.resample_spline_filter((400, 2000), 1000, spline='akima', cumulative=False)
aaa = CubicSpline(spe_tr.x, spe_tr.y)(np.linspace(400, 2000, 1000, endpoint=False))
assert np.all(res_spe.x == np.linspace(400, 2000, 1000, endpoint=False))
assert np.allclose(aaa, res_spe.y, rtol=.05)

0 comments on commit d7ae3d8

Please sign in to comment.