diff --git a/src/probnum/diffeq/odefilter/_odefilter.py b/src/probnum/diffeq/odefilter/_odefilter.py index cebfc3f2d..b91ab1420 100644 --- a/src/probnum/diffeq/odefilter/_odefilter.py +++ b/src/probnum/diffeq/odefilter/_odefilter.py @@ -189,9 +189,8 @@ def attempt_step(self, state, dt): # Read off system matrices; required for calibration / error estimation # Use only where a full call to forward_*() would be too costly. # We use the mathematical symbol `Phi` (and later, `H`), because this makes it easier to read for us. - # The system matrix H of the measurement model can be accessed after the first forward_*, - # therefore we read it off further below. discrete_dynamics = self.prior_process.transition.discretise(dt) + Phi = discrete_dynamics.transition_matrix # Split the current RV into a deterministic part and a noisy part. @@ -204,15 +203,16 @@ def attempt_step(self, state, dt): ) # Compute the measurements for the error-free component - pred_rv_error_free, _ = self.prior_process.transition.forward_realization( - error_free_state, t=state.t, dt=dt + pred_rv_error_free, _ = discrete_dynamics.forward_realization( + error_free_state, t=state.t ) - meas_rv_error_free, _ = self.measurement_model.forward_rv( - pred_rv_error_free, t=state.t + dt + linearized_observation = self.measurement_model.linearize( + state.t + dt, pred_rv_error_free ) - H = self.measurement_model.linearized_model.transition_matrix_fun( - t=state.t + dt + meas_rv_error_free, _ = linearized_observation.forward_rv( + pred_rv_error_free, t=state.t + dt ) + H = linearized_observation.transition_matrix_fun(t=state.t + dt) # Compute the measurements for the full components. # Since the means of noise-free and noisy measurements coincide, diff --git a/src/probnum/filtsmooth/gaussian/_kalmanposterior.py b/src/probnum/filtsmooth/gaussian/_kalmanposterior.py index f85872894..226b31d79 100644 --- a/src/probnum/filtsmooth/gaussian/_kalmanposterior.py +++ b/src/probnum/filtsmooth/gaussian/_kalmanposterior.py @@ -20,7 +20,6 @@ approx.DiscreteUKFComponent, randprocs.markov.continuous.LinearSDE, approx.ContinuousEKFComponent, - approx.ContinuousUKFComponent, ] """Any linear(ized) transition can define an (approximate) Gauss-Markov prior.""" diff --git a/src/probnum/filtsmooth/gaussian/approx/__init__.py b/src/probnum/filtsmooth/gaussian/approx/__init__.py index 48ef276e5..2333e6b5a 100644 --- a/src/probnum/filtsmooth/gaussian/approx/__init__.py +++ b/src/probnum/filtsmooth/gaussian/approx/__init__.py @@ -1,26 +1,17 @@ """Approximate Gaussian filtering and smoothing.""" -from ._extendedkalman import ContinuousEKFComponent, DiscreteEKFComponent, EKFComponent -from ._unscentedkalman import ContinuousUKFComponent, DiscreteUKFComponent, UKFComponent -from ._unscentedtransform import UnscentedTransform +from ._extendedkalman import ContinuousEKFComponent, DiscreteEKFComponent +from ._unscentedkalman import DiscreteUKFComponent # Public classes and functions. Order is reflected in documentation. __all__ = [ - "EKFComponent", "ContinuousEKFComponent", "DiscreteEKFComponent", - "UKFComponent", - "ContinuousUKFComponent", "DiscreteUKFComponent", - "UnscentedTransform", ] # Set correct module paths (for superclasses). # Corrects links and module paths in documentation. -EKFComponent.__module__ = "probnum.filtsmooth.gaussian.approx" ContinuousEKFComponent.__module__ = "probnum.filtsmooth.gaussian.approx" DiscreteEKFComponent.__module__ = "probnum.filtsmooth.gaussian.approx" -UKFComponent.__module__ = "probnum.filtsmooth.gaussian.approx" -ContinuousUKFComponent.__module__ = "probnum.filtsmooth.gaussian.approx" DiscreteUKFComponent.__module__ = "probnum.filtsmooth.gaussian.approx" -UnscentedTransform.__module__ = "probnum.filtsmooth.gaussian.approx" diff --git a/src/probnum/filtsmooth/gaussian/approx/_extendedkalman.py b/src/probnum/filtsmooth/gaussian/approx/_extendedkalman.py index c34a9a611..61ef8de29 100644 --- a/src/probnum/filtsmooth/gaussian/approx/_extendedkalman.py +++ b/src/probnum/filtsmooth/gaussian/approx/_extendedkalman.py @@ -1,130 +1,13 @@ -"""Gaussian filtering and smoothing based on making intractable quantities tractable -through Taylor-method approximations, e.g. linearization.""" - -import abc -from typing import Dict, Tuple +"""Extended Kalman filtering.""" from probnum import randprocs, randvars - -class EKFComponent(abc.ABC): - """Interface for extended Kalman filtering components.""" - - def __init__( - self, - non_linear_model, - ) -> None: - - self.non_linear_model = non_linear_model - - # Will be constructed later - self.linearized_model = None - - def forward_realization( - self, - realization, - t, - dt=None, - compute_gain=False, - _diffusion=1.0, - _linearise_at=None, - ) -> Tuple[randvars.Normal, Dict]: - """Approximate forward-propagation of a realization of a random variable.""" - compute_jacobian_at = ( - _linearise_at - if _linearise_at is not None - else randvars.Constant(realization) - ) - self.linearized_model = self.linearize(at_this_rv=compute_jacobian_at) - return self.linearized_model.forward_realization( - realization=realization, - t=t, - dt=dt, - compute_gain=compute_gain, - _diffusion=_diffusion, - ) - - def forward_rv( - self, - rv, - t, - dt=None, - compute_gain=False, - _diffusion=1.0, - _linearise_at=None, - ) -> Tuple[randvars.Normal, Dict]: - """Approximate forward-propagation of a random variable.""" - - compute_jacobian_at = _linearise_at if _linearise_at is not None else rv - self.linearized_model = self.linearize(at_this_rv=compute_jacobian_at) - return self.linearized_model.forward_rv( - rv=rv, - t=t, - dt=dt, - compute_gain=compute_gain, - _diffusion=_diffusion, - ) - - def backward_realization( - self, - realization_obtained, - rv, - rv_forwarded=None, - gain=None, - t=None, - dt=None, - _diffusion=1.0, - _linearise_at=None, - ): - """Approximate backward-propagation of a realization of a random variable.""" - - return self._backward_realization_via_backward_rv( - realization_obtained, - rv=rv, - rv_forwarded=rv_forwarded, - gain=gain, - t=t, - dt=dt, - _diffusion=_diffusion, - _linearise_at=_linearise_at, - ) - - def backward_rv( - self, - rv_obtained, - rv, - rv_forwarded=None, - gain=None, - t=None, - dt=None, - _diffusion=1.0, - _linearise_at=None, - ): - """Approximate backward-propagation of a random variable.""" - - compute_jacobian_at = _linearise_at if _linearise_at is not None else rv - self.linearized_model = self.linearize(at_this_rv=compute_jacobian_at) - return self.linearized_model.backward_rv( - rv_obtained=rv_obtained, - rv=rv, - rv_forwarded=rv_forwarded, - gain=gain, - t=t, - dt=dt, - _diffusion=_diffusion, - ) - - @abc.abstractmethod - def linearize( - self, at_this_rv: randvars.RandomVariable - ) -> randprocs.markov.Transition: - """Linearize the transition and make it tractable.""" - raise NotImplementedError +from ._interface import _LinearizationInterface # Order of inheritance matters, because forward and backward # are defined in EKFComponent, and must not be inherited from SDE. -class ContinuousEKFComponent(EKFComponent, randprocs.markov.continuous.SDE): +class ContinuousEKFComponent(_LinearizationInterface, randprocs.markov.continuous.SDE): """Continuous-time extended Kalman filter transition. Parameters @@ -163,7 +46,7 @@ def __init__( dispersion_function=non_linear_model.dispersion_function, drift_jacobian=non_linear_model.drift_jacobian, ) - EKFComponent.__init__(self, non_linear_model=non_linear_model) + _LinearizationInterface.__init__(self, non_linear_model=non_linear_model) self.mde_atol = mde_atol self.mde_rtol = mde_rtol @@ -171,7 +54,7 @@ def __init__( self.forward_implementation = forward_implementation - def linearize(self, at_this_rv: randvars.Normal): + def linearize(self, t, at_this_rv: randvars.Normal): """Linearize the drift function with a first order Taylor expansion.""" g = self.non_linear_model.drift_function @@ -202,7 +85,9 @@ def dispersion_matrix_function(t): ) -class DiscreteEKFComponent(EKFComponent, randprocs.markov.discrete.NonlinearGaussian): +class DiscreteEKFComponent( + _LinearizationInterface, randprocs.markov.discrete.NonlinearGaussian +): """Discrete extended Kalman filter transition.""" def __init__( @@ -220,12 +105,12 @@ def __init__( noise_fun=non_linear_model.noise_fun, transition_fun_jacobian=non_linear_model.transition_fun_jacobian, ) - EKFComponent.__init__(self, non_linear_model=non_linear_model) + _LinearizationInterface.__init__(self, non_linear_model=non_linear_model) self.forward_implementation = forward_implementation self.backward_implementation = backward_implementation - def linearize(self, at_this_rv: randvars.Normal): + def linearize(self, t, at_this_rv: randvars.Normal): """Linearize the dynamics function with a first order Taylor expansion.""" g = self.non_linear_model.transition_fun diff --git a/src/probnum/filtsmooth/gaussian/approx/_interface.py b/src/probnum/filtsmooth/gaussian/approx/_interface.py new file mode 100644 index 000000000..a8760973d --- /dev/null +++ b/src/probnum/filtsmooth/gaussian/approx/_interface.py @@ -0,0 +1,118 @@ +"""Temporary interface. + +Will eventually be removed as a part of the refactoring detailed in issue #627. +""" + +from typing import Dict, Tuple + +from probnum import randprocs, randvars + + +class _LinearizationInterface: + """Interface for extended Kalman filtering components.""" + + def __init__( + self, + non_linear_model, + ) -> None: + + self.non_linear_model = non_linear_model + + def forward_realization( + self, + realization, + t, + dt=None, + compute_gain=False, + _diffusion=1.0, + _linearise_at=None, + ) -> Tuple[randvars.Normal, Dict]: + """Approximate forward-propagation of a realization of a random variable.""" + compute_jacobian_at = ( + _linearise_at + if _linearise_at is not None + else randvars.Constant(realization) + ) + linearized_model = self.linearize(t=t, at_this_rv=compute_jacobian_at) + return linearized_model.forward_realization( + realization=realization, + t=t, + dt=dt, + compute_gain=compute_gain, + _diffusion=_diffusion, + ) + + def forward_rv( + self, + rv, + t, + dt=None, + compute_gain=False, + _diffusion=1.0, + _linearise_at=None, + ) -> Tuple[randvars.Normal, Dict]: + """Approximate forward-propagation of a random variable.""" + + compute_jacobian_at = _linearise_at if _linearise_at is not None else rv + linearized_model = self.linearize(t=t, at_this_rv=compute_jacobian_at) + return linearized_model.forward_rv( + rv=rv, + t=t, + dt=dt, + compute_gain=compute_gain, + _diffusion=_diffusion, + ) + + def backward_realization( + self, + realization_obtained, + rv, + rv_forwarded=None, + gain=None, + t=None, + dt=None, + _diffusion=1.0, + _linearise_at=None, + ): + """Approximate backward-propagation of a realization of a random variable.""" + return self._backward_realization_via_backward_rv( + realization_obtained, + rv=rv, + rv_forwarded=rv_forwarded, + gain=gain, + t=t, + dt=dt, + _diffusion=_diffusion, + _linearise_at=_linearise_at, + ) + + def backward_rv( + self, + rv_obtained, + rv, + rv_forwarded=None, + gain=None, + t=None, + dt=None, + _diffusion=1.0, + _linearise_at=None, + ): + """Approximate backward-propagation of a random variable.""" + + compute_jacobian_at = _linearise_at if _linearise_at is not None else rv + linearized_model = self.linearize(t=t, at_this_rv=compute_jacobian_at) + return linearized_model.backward_rv( + rv_obtained=rv_obtained, + rv=rv, + rv_forwarded=rv_forwarded, + gain=gain, + t=t, + dt=dt, + _diffusion=_diffusion, + ) + + def linearize( + self, t, at_this_rv: randvars.RandomVariable + ) -> randprocs.markov.Transition: + """Linearize the transition and make it tractable.""" + raise NotImplementedError diff --git a/src/probnum/filtsmooth/gaussian/approx/_unscentedkalman.py b/src/probnum/filtsmooth/gaussian/approx/_unscentedkalman.py index ab7407900..92a2a60ae 100644 --- a/src/probnum/filtsmooth/gaussian/approx/_unscentedkalman.py +++ b/src/probnum/filtsmooth/gaussian/approx/_unscentedkalman.py @@ -1,263 +1,156 @@ -"""General Gaussian filters based on approximating intractable quantities with numerical -quadrature. - -Examples include the unscented Kalman filter / RTS smoother which is based on a third -degree fully symmetric rule. -""" - -from typing import Dict, Optional, Tuple +"""Unscented Kalman filtering / spherical cubature Kalman filtering.""" import numpy as np +import scipy.linalg from probnum import randprocs, randvars -from probnum.filtsmooth.gaussian.approx import _unscentedtransform -from probnum.typing import FloatLike - -class UKFComponent: - """Interface for unscented Kalman filtering components.""" +from ._interface import _LinearizationInterface - def __init__( - self, - non_linear_model, - spread: Optional[FloatLike] = 1e-4, - priorpar: Optional[FloatLike] = 2.0, - special_scale: Optional[FloatLike] = 0.0, - ) -> None: - self.non_linear_model = non_linear_model - self.ut = _unscentedtransform.UnscentedTransform( - non_linear_model.input_dim, spread, priorpar, special_scale - ) - # Determine the linearization. - # Will be constructed later. - self.sigma_points = None - - def assemble_sigma_points(self, at_this_rv: randvars.Normal) -> np.ndarray: - """Assemble the sigma-points.""" - return self.ut.sigma_points(at_this_rv) - - -class ContinuousUKFComponent(UKFComponent, randprocs.markov.continuous.SDE): - """Continuous-time unscented Kalman filter transition. - - Parameters - ---------- - non_linear_model - Non-linear continuous-time model (:class:`SDE`) that - is approximated with the UKF. - mde_atol - Absolute tolerance passed to the solver - of the moment differential equations (MDEs). Optional. - mde_rtol - Relative tolerance passed to the solver - of the moment differential equations (MDEs). Optional. - mde_solver - Method that is chosen in `scipy.integrate.solve_ivp`. - Any string that is compatible with - ``solve_ivp(..., method=mde_solve,...)`` works here. - Usual candidates are ``[RK45, LSODA, Radau, BDF, RK23, DOP853]``. - Optional. Default is LSODA. - """ +class DiscreteUKFComponent( + _LinearizationInterface, randprocs.markov.discrete.NonlinearGaussian +): + """Discrete unscented Kalman filter transition.""" def __init__( self, non_linear_model, - spread: Optional[FloatLike] = 1e-4, - priorpar: Optional[FloatLike] = 2.0, - special_scale: Optional[FloatLike] = 0.0, - mde_atol: Optional[FloatLike] = 1e-6, - mde_rtol: Optional[FloatLike] = 1e-6, - mde_solver: Optional[str] = "LSODA", + forward_implementation="classic", + backward_implementation="classic", ) -> None: + _LinearizationInterface.__init__(self, non_linear_model) - UKFComponent.__init__( - self, - non_linear_model, - spread=spread, - priorpar=priorpar, - special_scale=special_scale, - ) - randprocs.markov.continuous.SDE.__init__( + randprocs.markov.discrete.NonlinearGaussian.__init__( self, - state_dimension=non_linear_model.state_dimension, - wiener_process_dimension=non_linear_model.wiener_process_dimension, - drift_function=non_linear_model.drift_function, - dispersion_function=non_linear_model.dispersion_function, - drift_jacobian=non_linear_model.drift_jacobian, + input_dim=non_linear_model.input_dim, + output_dim=non_linear_model.output_dim, + transition_fun=non_linear_model.transition_fun, + transition_fun_jacobian=non_linear_model.transition_fun_jacobian, + noise_fun=non_linear_model.noise_fun, ) - self.mde_atol = mde_atol - self.mde_rtol = mde_rtol - self.mde_solver = mde_solver - raise NotImplementedError( - "Implementation of the continuous UKF is incomplete. It cannot be used." + self._cubature_params = _spherical_cubature_unit_params( + dim=non_linear_model.input_dim ) + self._forward_implementation = forward_implementation + self._backward_implementation = backward_implementation - def forward_realization( - self, - realization, - t, - dt=None, - compute_gain=False, - _diffusion=1.0, - _linearise_at=None, - ) -> Tuple[randvars.Normal, Dict]: - return self._forward_realization_as_rv( - realization, + @property + def dimension(self) -> int: + """Dimension of the state-space associated with the UKF.""" + return self.ut.dimension + + def linearize( + self, t, at_this_rv: randvars.RandomVariable + ) -> randprocs.markov.Transition: + """Linearize the transition and make it tractable.""" + return _linearize_via_cubature( t=t, - dt=dt, - compute_gain=compute_gain, - _diffusion=_diffusion, - _linearise_at=_linearise_at, + model=self.non_linear_model, + rv=at_this_rv, + unit_params=self._cubature_params, + forw_impl=self._forward_implementation, + backw_impl=self._backward_implementation, ) - def forward_rv( - self, rv, t, dt=None, compute_gain=False, _diffusion=1.0, _linearise_at=None - ) -> Tuple[randvars.Normal, Dict]: - raise NotImplementedError("TODO") # Issue #234 - def backward_realization( - self, - realization_obtained, - rv, - rv_forwarded=None, - gain=None, - t=None, - dt=None, - _diffusion=1.0, - _linearise_at=None, - ): - return self._backward_realization_as_rv( - realization_obtained, - rv=rv, - rv_forwarded=rv_forwarded, - gain=gain, - t=t, - dt=dt, - _diffusion=_diffusion, - _linearise_at=_linearise_at, - ) +def _spherical_cubature_unit_params(*, dim): + """Return sigma points and weights for spherical cubature integration. - def backward_rv( - self, - rv_obtained, - rv, - rv_forwarded=None, - gain=None, - t=None, - dt=None, - _diffusion=1.0, - _linearise_at=None, - ): - raise NotImplementedError("Not available (yet).") - - -class DiscreteUKFComponent(UKFComponent, randprocs.markov.discrete.NonlinearGaussian): - """Discrete unscented Kalman filter transition.""" + Reference: + Bayesian Filtering and Smoothing. Simo Särkkä. Page 111. + """ + s, I, zeros = np.sqrt(dim), np.eye(dim), np.zeros((1, dim)) + unit_sigma_points = s * np.concatenate((zeros, I, -I), axis=0) + weights_mean, weights_cov = _weights(dim) + return unit_sigma_points, (weights_mean, weights_cov) - def __init__( - self, - non_linear_model, - spread: Optional[FloatLike] = 1e-4, - priorpar: Optional[FloatLike] = 2.0, - special_scale: Optional[FloatLike] = 0.0, - ) -> None: - UKFComponent.__init__( - self, - non_linear_model, - spread=spread, - priorpar=priorpar, - special_scale=special_scale, - ) - randprocs.markov.discrete.NonlinearGaussian.__init__( - self, - input_dim=non_linear_model.input_dim, - output_dim=non_linear_model.output_dim, - transition_fun=non_linear_model.transition_fun, - transition_fun_jacobian=non_linear_model.transition_fun_jacobian, - noise_fun=non_linear_model.noise_fun, - ) +def _weights(dim): + spread, priorpar, special_scale = 1.0, 0.0, 0.0 + scale = spread**2 * (dim + special_scale) - dim - def forward_rv( - self, rv, t, compute_gain=False, _diffusion=1.0, _linearise_at=None, **kwargs - ) -> Tuple[randvars.Normal, Dict]: - compute_sigmapts_at = _linearise_at if _linearise_at is not None else rv - self.sigma_points = self.assemble_sigma_points(at_this_rv=compute_sigmapts_at) + weights_mean = _weights_mean(dim, scale) + weights_cov = _weights_cov(dim, priorpar, scale, spread) + return weights_mean, weights_cov - proppts = self.ut.propagate( - t, self.sigma_points, self.non_linear_model.transition_fun - ) - meascov = _diffusion * self.non_linear_model.noise_fun(t).cov - mean, cov, crosscov = self.ut.estimate_statistics( - proppts, self.sigma_points, meascov, rv.mean - ) - info = {"crosscov": crosscov} - if compute_gain: - gain = crosscov @ np.linalg.inv(cov) - info["gain"] = gain - return randvars.Normal(mean, cov), info - - def forward_realization( - self, realization, t, _diffusion=1.0, _linearise_at=None, **kwargs - ): - - return self._forward_realization_via_forward_rv( - realization, - t=t, - compute_gain=False, - _diffusion=_diffusion, - _linearise_at=_linearise_at, - ) - def backward_rv( - self, - rv_obtained, - rv, - rv_forwarded=None, - gain=None, - t=None, - _diffusion=1.0, - _linearise_at=None, - **kwargs - ): - - # this method is inherited from NonlinearGaussian. - return self._backward_rv_classic( - rv_obtained, - rv, - rv_forwarded, - gain=gain, - t=t, - _diffusion=_diffusion, - _linearise_at=None, - ) +def _weights_mean(dim, scale): + mw0 = np.ones(1) * scale / (dim + scale) + mw = np.ones(2 * dim) / (2.0 * (dim + scale)) + weights_mean = np.hstack((mw0, mw)) + return weights_mean - def backward_realization( - self, - realization_obtained, - rv, - rv_forwarded=None, - gain=None, - t=None, - _diffusion=1.0, - _linearise_at=None, - **kwargs - ): - - # this method is inherited from NonlinearGaussian. - return self._backward_realization_via_backward_rv( - realization_obtained, - rv, - rv_forwarded, - gain=gain, - t=t, - _diffusion=_diffusion, - _linearise_at=_linearise_at, - ) - @property - def dimension(self) -> int: - """Dimension of the state-space associated with the UKF.""" - return self.ut.dimension +def _weights_cov(dim, priorpar, scale, spread): + cw0 = np.ones(1) * scale / (dim + scale) + (1 - spread**2 + priorpar) + cw = np.ones(2 * dim) / (2.0 * (dim + scale)) + weights_cov = np.hstack((cw0, cw)) + return weights_cov + + +def _linearize_via_cubature(*, t, model, rv, unit_params, forw_impl, backw_impl): + """Linearize a nonlinear model statistically with spherical cubature integration.""" + + sigma_points_unit, weights = unit_params + sigma_points = sigma_points_unit @ rv.cov_cholesky.T + rv.mean[None, :] + + sigma_points_transitioned = np.stack( + [model.transition_fun(t, p) for p in sigma_points], axis=0 + ) + + mat, noise_approx = _linearization_system_matrices( + rv_in=rv, + weights=weights, + pts=sigma_points, + pts_transitioned=sigma_points_transitioned, + ) + + def new_noise(s): + noise_model = model.noise_fun(s) + return noise_model + noise_approx + + return randprocs.markov.discrete.LinearGaussian( + input_dim=model.input_dim, + output_dim=model.output_dim, + transition_matrix_fun=lambda _: mat, + noise_fun=new_noise, + forward_implementation=forw_impl, + backward_implementation=backw_impl, + ) + + +def _linearization_system_matrices(*, rv_in, weights, pts, pts_transitioned): + """Notation loosely taken from https://arxiv.org/pdf/2102.00514.pdf.""" + + pts_centered = pts - rv_in.mean[None, :] + rv_out, crosscov = _match_moments( + x_centered=pts_centered, fx=pts_transitioned, weights=weights + ) + + F = scipy.linalg.solve( + rv_in.cov + 1e-12 * np.eye(*rv_in.cov.shape), crosscov, assume_a="sym" + ).T + mean = rv_out.mean - F @ rv_in.mean + cov = rv_out.cov - crosscov.T @ F.T + return F, randvars.Normal(mean=mean, cov=cov) + + +def _match_moments(*, x_centered, fx, weights): + + weights_mean, weights_cov = weights + + fx_mean = weights_mean @ fx + fx_centered = fx - fx_mean[None, :] + + crosscov = _approx_outer_product(weights_cov, x_centered, fx_centered) + fx_cov = _approx_outer_product(weights_cov, fx_centered, fx_centered) + + return randvars.Normal(mean=fx_mean, cov=fx_cov), crosscov + + +def _approx_outer_product(w, a, b): + outer_product_pt = np.einsum("ijx,ikx->ijk", a[..., None], b[..., None]) + outer_product = np.einsum("i,ijk->jk", w, outer_product_pt) + return outer_product diff --git a/src/probnum/filtsmooth/gaussian/approx/_unscentedtransform.py b/src/probnum/filtsmooth/gaussian/approx/_unscentedtransform.py deleted file mode 100644 index b17e1469d..000000000 --- a/src/probnum/filtsmooth/gaussian/approx/_unscentedtransform.py +++ /dev/null @@ -1,264 +0,0 @@ -"""Unscented Transform.""" - - -from typing import Callable, Tuple - -import numpy as np - -from probnum import randvars -from probnum.typing import ArrayLike, FloatLike - - -class UnscentedTransform: - """Used for unscented Kalman filter. - - See also p. 7 ("Unscented transform:") of [1]_. - - Parameters - ---------- - dimension : int - Spatial dimensionality - spread : float - Spread of the sigma points around mean - priorpar : float - Incorporate prior knowledge about distribution of x. - For Gaussians, 2.0 is optimal (see link below) - special_scale : float - Secondary scaling parameter. - The primary parameter is computed below. - - References - ---------- - .. [1] Wan, E. A. and van der Merwe, R., The Unscented Kalman Filter, - http://read.pudn.com/downloads135/ebook/574389/wan01unscented.pdf - """ - - def __init__(self, dimension, spread=1e-4, priorpar=2.0, special_scale=0.0): - self.scale = _compute_scale(dimension, spread, special_scale) - self.dimension = dimension - self.mweights, self.cweights = _unscented_weights( - spread, priorpar, self.dimension, self.scale - ) - - def sigma_points(self, rv: randvars.Normal) -> np.ndarray: - """Sigma points. - - Parameters - ---------- - rv - Gaussian random variable. Shape (d,) - - Raises - ------ - ValueError - If the random variable does not match the dimension of the UT. - - Returns - ------- - np.ndarray - Shape (2 * d + 1, d). Sigma points. - """ - if len(rv.mean) != self.dimension: - raise ValueError("Dimensionality does not match UT") - sigpts = np.zeros((2 * self.dimension + 1, self.dimension)) - sqrtcovar = rv.cov_cholesky - sigpts[0] = rv.mean.copy() - for idx in range(self.dimension): - sigpts[idx + 1] = ( - rv.mean + np.sqrt(self.dimension + self.scale) * sqrtcovar[:, idx] - ) - sigpts[self.dimension + 1 + idx] = ( - rv.mean - np.sqrt(self.dimension + self.scale) * sqrtcovar[:, idx] - ) - return sigpts - - def propagate( - self, time: FloatLike, sigmapts: ArrayLike, modelfct: Callable - ) -> np.ndarray: - """Propagate sigma points. - - Parameters - ---------- - time - Time :math:`t` which is passed on to the modelfunction. - sigmapts - shape=(2 N+1, N). - Sigma points (N is the spatial dimension of the dynamic model) - modelfct - Function through which to propagate - - Returns - ------- - np.ndarray - Shape=(2 N + 1, M). M is the dimension of the measurement model - """ - propsigpts = np.array([modelfct(time, pt) for pt in sigmapts]) - return propsigpts - - def estimate_statistics(self, proppts, sigpts, covmat, mpred): - """Computes predicted summary statistics, predicted - mean/kernels/crosscovariance, from (propagated) sigmapoints. - - Not to be confused with mean and kernels resulting from the prediction step of - the Bayesian filter. Hence we call it "estimate_*" instead of "predict_*". - """ - estmean = _estimate_mean(self.mweights, proppts) - estcovar = _estimate_covar(self.cweights, proppts, estmean, covmat) - estcrosscovar = _estimate_crosscovar( - self.cweights, proppts, estmean, sigpts, mpred - ) - return estmean, estcovar, estcrosscovar - - -def _compute_scale(dimension, spread, special_scale): - """See BFaS; p. 83. - - Parameters - ---------- - dimension: int - Spatial dimensionality of state space model - spread: float - Spread of sigma points around mean (1; alpha) - special_scale: float - Spread of sigma points around mean (2; kappa) - - Returns - ------- - float - Scaling parameter for unscented transform - """ - return spread**2 * (dimension + special_scale) - dimension - - -def _unscented_weights( - spread: FloatLike, priorpar: FloatLike, dimension: int, scale: FloatLike -) -> Tuple[np.ndarray, np.ndarray]: - """See BFaS; p. 84. - - Parameters - ---------- - spread - Spread of sigma points around mean (alpha) - priorpar - Prior information parameter (beta) - dimension - Dimension of the state space - scale - Scaling parameter for unscented transform - - Returns - ------- - np.ndarray - Shape (2 * dimension + 1,). constant mean weights. - np.ndarray - Shape (2 * dimension + 1,). constant kernels weights. - """ - mweights = _meanweights(dimension, scale) - cweights = _covarweights(dimension, spread, priorpar, scale) - return mweights, cweights - - -def _meanweights(dimension: int, lam: FloatLike) -> np.ndarray: - """Mean weights. - - Parameters - ---------- - dimension - Spatial dimensionality of state space model - lam - Scaling parameter for unscented transform (lambda) - - Returns - ------- - np.ndarray - Shape (2*dimension+1,). Constant mean weights. - """ - mw0 = np.ones(1) * lam / (dimension + lam) - mw = np.ones(2 * dimension) / (2.0 * (dimension + lam)) - return np.hstack((mw0, mw)) - - -def _covarweights( - dimension: int, alp: FloatLike, bet: FloatLike, lam: FloatLike -) -> np.ndarray: - """Covariance weights. - - Parameters - ---------- - dimension - Spatial dimensionality of state space model - alp - Spread of sigma points around mean (alpha) - bet - Prior information parameter (beta) - lam - Scaling parameter for unscented transform (lambda) - """ - cw0 = np.ones(1) * lam / (dimension + lam) + (1 - alp**2 + bet) - cw = np.ones(2 * dimension) / (2.0 * (dimension + lam)) - covarweights = np.hstack((cw0, cw)) - return covarweights - - -def _estimate_mean(mweights: np.ndarray, proppts: np.ndarray) -> np.ndarray: - """See BFaS; p. 88. - - Parameters - ---------- - mweights - shape (2*dimension + 1,) Constant mean weights for unscented transform. - proppts - shape (2*dimension + 1, dimension) Propagated sigma points - """ - return mweights @ proppts - - -def _estimate_covar(cweights, proppts, mean, covmat): - """See BFaS; p. 88. - - Parameters - ---------- - cweights: np.ndarray - Shape (2*dimension + 1,). Constant kernels weights for unscented transform. - proppts: np.ndarray - Shape (2*dimension + 1, dimension). Propagated sigma points - mean: np.ndarray - Shape (dimension,). Result of _estimate_mean(...) - covmat: np.ndarray - Shape (dimension, dimension). Covariance of measurement model at current time. - - Returns - ------- - np.ndarray - Shape (dimension, dimension). Estimated kernels. - """ - cent = proppts - mean - empcov = cent.T @ (cweights * cent.T).T - return empcov + covmat - - -def _estimate_crosscovar(cweights, proppts, mean, sigpts, mpred): - """See BFaS; p.88. - - Parameters - ---------- - cweights: np.ndarray - Shape (2*dimension + 1,). Constant kernels weights for unscented transform. - sigpts: np.ndarray - Shape (2*dimension + 1, dimension). Sigma points - mpred: np.ndarray - Shape (dimension,). Predicted mean - proppts: np.ndarray - Shape (2*dimension + 1, dimension). Propagated sigma points - mean: np.ndarray - Shape (dimension,). Result of _estimate_mean(...) - - Returns - ------- - np.ndarray - Shape (dimension,). Estimated kernels. - """ - cent_prop = proppts - mean - cent_sig = sigpts - mpred - empcrosscov = cent_sig.T @ (cweights * cent_prop.T).T - return empcrosscov diff --git a/src/probnum/filtsmooth/particle/_importance_distributions.py b/src/probnum/filtsmooth/particle/_importance_distributions.py index 4c6c5ce6d..954f82dc8 100644 --- a/src/probnum/filtsmooth/particle/_importance_distributions.py +++ b/src/probnum/filtsmooth/particle/_importance_distributions.py @@ -99,16 +99,11 @@ def linearization_strategy(non_linear_model): ) @classmethod - def from_ukf(cls, dynamics_model, spread=1e-4, priorpar=2.0, special_scale=0.0): + def from_ukf(cls, dynamics_model): """Create an importance distribution that uses the UKF for proposals.""" def linearization_strategy(non_linear_model): - return gaussian.approx.DiscreteUKFComponent( - non_linear_model, - spread=spread, - priorpar=priorpar, - special_scale=special_scale, - ) + return gaussian.approx.DiscreteUKFComponent(non_linear_model) return cls( dynamics_model=dynamics_model, linearization_strategy=linearization_strategy diff --git a/tests/test_filtsmooth/test_gaussian/test_approx/_linearization_test_interface.py b/tests/test_filtsmooth/test_gaussian/test_approx/_linearization_test_interface.py index 8aaa373b7..ff212bb29 100644 --- a/tests/test_filtsmooth/test_gaussian/test_approx/_linearization_test_interface.py +++ b/tests/test_filtsmooth/test_gaussian/test_approx/_linearization_test_interface.py @@ -52,9 +52,10 @@ def test_exactness_linear_model(self, rng): # Assert that the give the same outputs. received, info1 = linear_model.forward_rv(initrv, 0.0) expected, info2 = linearised_model.forward_rv(initrv, 0.0) + crosscov1 = info1["crosscov"] crosscov2 = info2["crosscov"] - rtol, atol = 1e-10, 1e-10 + rtol, atol = 1e-9, 1e-9 np.testing.assert_allclose(received.mean, expected.mean, rtol=rtol, atol=atol) np.testing.assert_allclose(received.cov, expected.cov, rtol=rtol, atol=atol) np.testing.assert_allclose(crosscov1, crosscov2, rtol=rtol, atol=atol) @@ -102,7 +103,7 @@ def test_filtsmooth_pendulum(self, rng): np.linalg.norm(regression_problem.observations[:, 0] - comp) / normaliser ) - assert smoormse < filtrmse < obs_rmse + assert smoormse < filtrmse < obs_rmse, (smoormse, filtrmse, obs_rmse) class InterfaceContinuousLinearizationTest: diff --git a/tests/test_filtsmooth/test_gaussian/test_approx/test_unscentedkalman.py b/tests/test_filtsmooth/test_gaussian/test_approx/test_unscentedkalman.py index ce71a40d3..366fff5be 100644 --- a/tests/test_filtsmooth/test_gaussian/test_approx/test_unscentedkalman.py +++ b/tests/test_filtsmooth/test_gaussian/test_approx/test_unscentedkalman.py @@ -2,22 +2,11 @@ import pytest -from probnum import filtsmooth, randprocs +from probnum import filtsmooth from ._linearization_test_interface import InterfaceDiscreteLinearizationTest -class TestContinuousUKFComponent: - """Implementation incomplete, hence check that an error is raised.""" - - def test_notimplementederror(self): - sde = randprocs.markov.continuous.SDE( - 1, 1, None, None, None - ) # content is irrelevant. - with pytest.raises(NotImplementedError): - filtsmooth.gaussian.approx.ContinuousUKFComponent(sde) - - class TestDiscreteUKFComponent(InterfaceDiscreteLinearizationTest): # Replacement for an __init__ in the pytest language. See: diff --git a/tests/test_filtsmooth/test_gaussian/test_approx/test_unscentedtransform.py b/tests/test_filtsmooth/test_gaussian/test_approx/test_unscentedtransform.py deleted file mode 100644 index 39289d69a..000000000 --- a/tests/test_filtsmooth/test_gaussian/test_approx/test_unscentedtransform.py +++ /dev/null @@ -1,71 +0,0 @@ -import unittest - -import numpy as np - -from probnum import filtsmooth, randvars - - -class TestUnscentedTransform(unittest.TestCase): - def setUp(self): - self.ndim = np.random.randint(1, 33) # 1 < random int < 33 - alpha, beta, kappa = np.random.rand(3) - self.ut = filtsmooth.gaussian.approx.UnscentedTransform( - self.ndim, alpha, beta, kappa - ) - self.mean = np.random.rand(self.ndim) - cvr = np.random.rand(self.ndim, self.ndim) - self.covar = cvr @ cvr.T - - def test_weights_shape(self): - self.assertEqual(self.ut.mweights.ndim, 1) - self.assertEqual(self.ut.mweights.shape[0], 2 * self.ndim + 1) - self.assertEqual(self.ut.cweights.ndim, 1) - self.assertEqual(self.ut.cweights.shape[0], 2 * self.ndim + 1) - - def test_sigpts_shape(self): - sigpts = self.ut.sigma_points(randvars.Normal(self.mean, self.covar)) - self.assertEqual(sigpts.ndim, 2) - self.assertEqual(sigpts.shape[0], 2 * self.ndim + 1) - self.assertEqual(sigpts.shape[1], self.ndim) - - def test_propagate_shape(self): - sigpts = self.ut.sigma_points(randvars.Normal(self.mean, self.covar)) - propagated = self.ut.propagate(None, sigpts, lambda t, x: np.sin(x)) - self.assertEqual(propagated.ndim, 2) - self.assertEqual(propagated.shape[0], 2 * self.ndim + 1) - self.assertEqual(propagated.shape[1], self.ndim) - - def test_estimate_statistics_shape(self): - sigpts = self.ut.sigma_points(randvars.Normal(self.mean, self.covar)) - proppts = self.ut.propagate(None, sigpts, lambda t, x: np.sin(x)) - mest, cest, ccest = self.ut.estimate_statistics( - proppts, sigpts, self.covar, self.mean - ) - self.assertEqual(mest.ndim, 1) - self.assertEqual(mest.shape[0], self.ndim) - self.assertEqual(cest.ndim, 2) - self.assertEqual(cest.shape[0], self.ndim) - self.assertEqual(cest.shape[1], self.ndim) - self.assertEqual(ccest.ndim, 2) - self.assertEqual(ccest.shape[0], self.ndim) - self.assertEqual(ccest.shape[1], self.ndim) - - def test_transform_of_gaussian_exact(self): - sigpts = self.ut.sigma_points(randvars.Normal(self.mean, self.covar)) - ndim_meas = self.ndim + 1 # != self.ndim is important - transmtrx = np.random.rand(ndim_meas, self.ndim) - meascov = 0 * np.random.rand(ndim_meas, ndim_meas) - proppts = self.ut.propagate(None, sigpts, lambda t, x: transmtrx @ x) - mest, cest, ccest = self.ut.estimate_statistics( - proppts, sigpts, meascov, self.mean - ) - diff_mean = np.linalg.norm(mest - transmtrx @ self.mean) - diff_covar = np.linalg.norm(cest - transmtrx @ self.covar @ transmtrx.T) - diff_crosscovar = np.linalg.norm(ccest - self.covar @ transmtrx.T) - self.assertLess(diff_mean / np.linalg.norm(transmtrx @ self.mean), 1e-11) - self.assertLess( - diff_covar / np.linalg.norm(transmtrx @ self.covar @ transmtrx.T), 1e-11 - ) - self.assertLess( - diff_crosscovar / np.linalg.norm(self.covar @ transmtrx.T), 1e-11 - )