Skip to content

Commit

Permalink
Merge pull request #18 from Jacob-Stevens-Haas/static-typing
Browse files Browse the repository at this point in the history
Add more static typing
  • Loading branch information
Jacob-Stevens-Haas authored Feb 18, 2024
2 parents 1f142df + 5369b2d commit 2557f81
Show file tree
Hide file tree
Showing 11 changed files with 166 additions and 76 deletions.
22 changes: 21 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,24 @@ addopts = '-m "not slow"'
markers = ["slow"]

[tool.mypy]
files = ["src/gen_experiments/__init__.py"]
files = ["src/gen_experiments/__init__.py", "src/gen_experiments/utils.py"]

[[tool.mypy.overrides]]
module="auto_ks.*"
ignore_missing_imports=true

[[tool.mypy.overrides]]
module="sklearn.*"
ignore_missing_imports=true

[[tool.mypy.overrides]]
module="pysindy.*"
ignore_missing_imports=true

[[tool.mypy.overrides]]
module="kalman.*"
ignore_missing_imports=true

[[tool.mypy.overrides]]
module="scipy.*"
ignore_missing_imports=true
10 changes: 5 additions & 5 deletions src/gen_experiments/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
from pysindy import BaseDifferentiation, FiniteDifference, SINDy # type: ignore

from . import gridsearch, odes, pdes
from .utils import TrialData
from .utils import SINDyTrialData

this_module = importlib.import_module(__name__)
BORING_ARRAY = np.ones((2, 2))
BORING_ARRAY = np.ones((2, 2), dtype=float)

Scores = Mapping[str, float]

Expand All @@ -38,20 +38,20 @@ class NoExperiment:
@staticmethod
def run(
*args: Any, return_all: bool = True, **kwargs: Any
) -> Scores | tuple[Scores, TrialData]:
) -> Scores | tuple[Scores, SINDyTrialData]:
metrics = defaultdict(
lambda: 1,
main=1,
)
if return_all:
trial_data: TrialData = {
trial_data: SINDyTrialData = {
"dt": 1,
"coeff_true": BORING_ARRAY[:1],
"coeff_fit": BORING_ARRAY[:1],
# "coefficients": boring_array,
"feature_names": ["1"],
"input_features": ["x", "y"],
"t_train": np.arange(0, 1, 1),
"t_train": np.arange(0, 1, 1, dtype=float),
"x_train": BORING_ARRAY,
"x_true": BORING_ARRAY,
"smooth_train": BORING_ARRAY,
Expand Down
4 changes: 2 additions & 2 deletions src/gen_experiments/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from gen_experiments.data import _signal_avg_power
from gen_experiments.plotting import _PlotPrefs
from gen_experiments.utils import FullTrialData, NestedDict, SeriesDef, SeriesList
from gen_experiments.utils import FullSINDyTrialData, NestedDict, SeriesDef, SeriesList

T = TypeVar("T")
U = TypeVar("U")
Expand All @@ -16,7 +16,7 @@ def ND(d: dict[T, U]) -> NestedDict[T, U]:


def _convert_abs_rel_noise(
grid_vals: list, grid_params: list, recent_results: FullTrialData
grid_vals: list, grid_params: list, recent_results: FullSINDyTrialData
):
"""Convert abs_noise grid_vals to rel_noise"""
signal = np.stack(recent_results["x_true"], axis=-1)
Expand Down
37 changes: 19 additions & 18 deletions src/gen_experiments/data.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,31 @@
from math import ceil
from pathlib import Path
from typing import Callable
from typing import Callable, Optional, cast
from warnings import warn

import mitosis
import numpy as np
import scipy

from gen_experiments.utils import GridsearchResultDetails
from gen_experiments.utils import Float1D, Float2D, GridsearchResultDetails

INTEGRATOR_KEYWORDS = {"rtol": 1e-12, "method": "LSODA", "atol": 1e-12}
TRIALS_FOLDER = Path(__file__).parent.absolute() / "trials"


def gen_data(
rhs_func,
n_coord,
seed=None,
n_trajectories=1,
x0_center=None,
ic_stdev=3,
noise_abs=None,
noise_rel=None,
nonnegative=False,
dt=0.01,
t_end=10,
):
rhs_func: Callable,
n_coord: int,
seed: Optional[int] = None,
n_trajectories: int = 1,
x0_center: Optional[Float1D] = None,
ic_stdev: float = 3,
noise_abs: Optional[float] = None,
noise_rel: Optional[float] = None,
nonnegative: bool = False,
dt: float = 0.01,
t_end: float = 10,
) -> tuple[float, Float1D, Float2D, Float2D, Float2D, Float2D]:
"""Generate random training and test data
Note that test data has no noise.
Expand Down Expand Up @@ -57,7 +57,7 @@ def gen_data(
rng = np.random.default_rng(seed)
if x0_center is None:
x0_center = np.zeros((n_coord))
t_train = np.arange(0, t_end, dt)
t_train = np.arange(0, t_end, dt, dtype=np.float_)
t_train_span = (t_train[0], t_train[-1])
if nonnegative:
shape = ((x0_center + 1) / ic_stdev) ** 2
Expand Down Expand Up @@ -123,10 +123,11 @@ def _alert_short(arr):
x_train_true = np.copy(x_train)
if noise_rel is not None:
noise_abs = np.sqrt(_signal_avg_power(x_test) * noise_rel)
x_train = x_train + noise_abs * rng.standard_normal(x_train.shape)
x_train = x_train + cast(float, noise_abs) * rng.standard_normal(x_train.shape)
x_train = list(x_train)
x_test = list(x_test)
x_dot_test = list(x_dot_test)
x_train_true = list(x_train_true)
return dt, t_train, x_train, x_test, x_dot_test, x_train_true


Expand Down Expand Up @@ -211,14 +212,14 @@ def gen_pde_data(
x_train_true = np.copy(x_train)
if noise_rel is not None:
noise_abs = _max_amplitude(x_test) * noise_rel
x_train = x_train + noise_abs * rng.standard_normal(x_train.shape)
x_train = x_train + cast(float, noise_abs) * rng.standard_normal(x_train.shape)
x_train = [np.moveaxis(x_train, 0, -2)]
x_train_true = np.moveaxis(x_train_true, 0, -2)
x_test = [np.moveaxis(x_test, [0, 1], [-1, -2])]
return dt, t_train, x_train, x_test, x_dot_test, x_train_true


def _max_amplitude(signal: np.ndarray):
def _max_amplitude(signal: np.ndarray) -> float:
return np.abs(scipy.fft.rfft(signal, axis=0)[1:]).max() / np.sqrt(len(signal))


Expand Down
25 changes: 25 additions & 0 deletions src/gen_experiments/debug.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from typing import Annotated, Generic, TypedDict, TypeVar

import numpy as np
from numpy.typing import DTypeLike, NBitBase, NDArray

# T = TypeVar("T")

# class Foo[T]:
# items: list[T]

# def __init__(self, thing: T):
# self.items = [thing, thing]

# Bar =


T = TypeVar("T", bound=np.generic)
Foo = NDArray[T]
Bar = Annotated[NDArray, "foobar"]

lil_foo = NDArray[np.void]


def baz(qux: Foo[np.void]):
pass
4 changes: 2 additions & 2 deletions src/gen_experiments/gridsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
SavedData,
SeriesDef,
SeriesList,
TrialData,
SINDyTrialData,
_amax_to_full_inds,
_argopt,
_grid_locator_match,
Expand Down Expand Up @@ -115,7 +115,7 @@ def run(
curr_results, grid_data = base_ex.run(
new_seed, **curr_other_params, display=False, return_all=True
)
grid_data: TrialData
grid_data: SINDyTrialData
intermediate_data.append(
{"params": curr_other_params.flatten(), "pind": ind, "data": grid_data}
)
Expand Down
12 changes: 6 additions & 6 deletions src/gen_experiments/odes.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
plot_training_data,
)
from .utils import (
FullTrialData,
TrialData,
FullSINDyTrialData,
SINDyTrialData,
_make_model,
coeff_metrics,
integration_metrics,
Expand Down Expand Up @@ -161,7 +161,7 @@ def run(
opt_params: dict,
display: bool = True,
return_all: bool = False,
) -> dict | tuple[dict, TrialData | FullTrialData]:
) -> dict | tuple[dict, SINDyTrialData | FullSINDyTrialData]:
rhsfunc = ode_setup[group]["rhsfunc"]
input_features = ode_setup[group]["input_features"]
coeff_true = ode_setup[group]["coeff_true"]
Expand All @@ -187,7 +187,7 @@ def run(
coeff_true, coefficients, feature_names = unionize_coeff_matrices(model, coeff_true)

sim_ind = -1
trial_data: TrialData = {
trial_data: SINDyTrialData = {
"dt": dt,
"coeff_true": coeff_true,
"coeff_fit": coefficients,
Expand All @@ -202,7 +202,7 @@ def run(
"model": model,
}
if display:
trial_data: FullTrialData = trial_data | simulate_test_data(
trial_data: FullSINDyTrialData = trial_data | simulate_test_data(
trial_data["model"], trial_data["dt"], trial_data["x_test"]
)
plot_ode_panel(trial_data)
Expand All @@ -214,7 +214,7 @@ def run(
return metrics


def plot_ode_panel(trial_data: FullTrialData):
def plot_ode_panel(trial_data: FullSINDyTrialData):
trial_data["model"].print()
plot_training_data(
trial_data["x_train"], trial_data["x_true"], trial_data["smooth_train"]
Expand Down
10 changes: 5 additions & 5 deletions src/gen_experiments/pdes.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
from .data import gen_pde_data
from .plotting import compare_coefficient_plots, plot_pde_training_data
from .utils import (
FullTrialData,
TrialData,
FullSINDyTrialData,
SINDyTrialData,
_make_model,
coeff_metrics,
integration_metrics,
Expand Down Expand Up @@ -149,7 +149,7 @@ def run(
opt_params: dict,
display: bool = True,
return_all: bool = False,
) -> dict | tuple[dict, TrialData | FullTrialData]:
) -> dict | tuple[dict, SINDyTrialData | FullSINDyTrialData]:
rhsfunc = pde_setup[group]["rhsfunc"]["func"]
input_features = pde_setup[group]["input_features"]
initial_condition = sim_params["init_cond"]
Expand Down Expand Up @@ -177,7 +177,7 @@ def run(
coeff_true, coefficients, feature_names = unionize_coeff_matrices(model, coeff_true)

sim_ind = -1
trial_data: TrialData = {
trial_data: SINDyTrialData = {
"dt": dt,
"coeff_true": coeff_true,
"coeff_fit": coefficients,
Expand All @@ -192,7 +192,7 @@ def run(
"model": model,
}
if display:
trial_data: FullTrialData = trial_data | simulate_test_data(
trial_data: FullSINDyTrialData = trial_data | simulate_test_data(
trial_data["model"], trial_data["dt"], trial_data["x_test"]
)
trial_data["model"].print()
Expand Down
31 changes: 19 additions & 12 deletions src/gen_experiments/plotting.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import numpy as np
import scipy
import seaborn as sns
from matplotlib.axes import Axes

PAL = sns.color_palette("Set1")
PLOT_KWS = {"alpha": 0.7, "linewidth": 3}
Expand Down Expand Up @@ -128,7 +129,7 @@ def signed_sqrt(x):


def plot_training_trajectory(
ax: plt.Axes,
ax: Axes,
x_train: np.ndarray,
x_true: np.ndarray,
x_smooth: np.ndarray,
Expand Down Expand Up @@ -156,6 +157,8 @@ def plot_training_trajectory(
)
if labels:
ax.set(xlabel="$x_0$", ylabel="$x_1$")
else:
ax.set(xticks=[], yticks=[])
elif x_train.shape[1] == 3:
ax.plot(
x_true[:, 0],
Expand Down Expand Up @@ -187,6 +190,8 @@ def plot_training_trajectory(
)
if labels:
ax.set(xlabel="$x$", ylabel="$y$", zlabel="$z$")
else:
ax.set(xticks=[], yticks=[], zticks=[])
else:
raise ValueError("Can only plot 2d or 3d data.")

Expand Down Expand Up @@ -226,7 +231,7 @@ def plot_pde_training_data(last_train, last_train_true, smoothed_last_train):


def plot_test_sim_data_1d_panel(
axs: Sequence[plt.Axes],
axs: Sequence[Axes],
x_test: np.ndarray,
x_sim: np.ndarray,
t_test: np.ndarray,
Expand All @@ -240,31 +245,33 @@ def plot_test_sim_data_1d_panel(


def _plot_test_sim_data_2d(
axs: Annotated[Sequence[plt.Axes], "len=2"],
axs: Annotated[Sequence[Axes], "len=2"],
x_test: np.ndarray,
x_sim: np.ndarray,
labels: bool = True,
) -> None:
axs[0].plot(x_test[:, 0], x_test[:, 1], "k", label="True Trajectory")
if labels:
axs[0].set(xlabel="$x_0$", ylabel="$x_1$")
axs[1].plot(x_sim[:, 0], x_sim[:, 1], "r--", label="Simulation")
if labels:
axs[1].set(xlabel="$x_0$", ylabel="$x_1$")
for ax in axs:
if labels:
ax.set(xlabel="$x_0$", ylabel="$x_1$")
else:
ax.set(xticks=[], yticks=[])


def _plot_test_sim_data_3d(
axs: Annotated[Sequence[plt.Axes], "len=3"],
axs: Annotated[Sequence[Axes], "len=3"],
x_test: np.ndarray,
x_sim: np.ndarray,
labels: bool = True,
) -> None:
axs[0].plot(x_test[:, 0], x_test[:, 1], x_test[:, 2], "k", label="True Trajectory")
if labels:
axs[0].set(xlabel="$x_0$", ylabel="$x_1$", zlabel="$x_2$")
axs[1].plot(x_sim[:, 0], x_sim[:, 1], x_sim[:, 2], "r--", label="Simulation")
if labels:
axs[1].set(xlabel="$x_0$", ylabel="$x_1$", zlabel="$x_2$")
for ax in axs:
if labels:
ax.set(xlabel="$x_0$", ylabel="$x_1$", zlabel="$x_2$")
else:
ax.set(xticks=[], yticks=[], zticks=[])


def plot_test_trajectories(
Expand Down
Loading

0 comments on commit 2557f81

Please sign in to comment.