diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index ab94669..df1831a 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -19,3 +19,7 @@ repos: rev: 22.3.0 hooks: - id: black +- repo: https://github.com/pre-commit/mirrors-mypy + rev: v1.3.0 + hooks: + - id: mypy diff --git a/bletl/core.py b/bletl/core.py index 6531da1..ffbc893 100644 --- a/bletl/core.py +++ b/bletl/core.py @@ -8,7 +8,7 @@ import urllib.request import warnings from collections.abc import Iterable -from typing import Optional, Union +from typing import Optional, Sequence, Union import numpy import pandas @@ -92,14 +92,14 @@ def get_parser(filepath: Union[str, pathlib.Path]) -> BLDParser: def _parse( filepath: str, drop_incomplete_cycles: bool, - lot_number: int, - temp: int, - cal_0: float = None, - cal_100: float = None, - phi_min: float = None, - phi_max: float = None, - pH_0: float = None, - dpH: float = None, + lot_number: Optional[int], + temp: Optional[int], + cal_0: Optional[float] = None, + cal_100: Optional[float] = None, + phi_min: Optional[float] = None, + phi_max: Optional[float] = None, + pH_0: Optional[float] = None, + dpH: Optional[float] = None, ) -> BLData: """Parses a raw BioLector CSV file into a BLData object. @@ -138,29 +138,39 @@ def _parse( When the file contents do not match with a known BioLector result file format. """ parser = get_parser(filepath) - data = parser.parse(filepath, lot_number, temp, cal_0, cal_100, phi_min, phi_max, pH_0, dpH) + data = parser.parse( + filepath, + lot_number=lot_number, + temp=temp, + cal_0=cal_0, + cal_100=cal_100, + phi_min=phi_min, + phi_max=phi_max, + pH_0=pH_0, + dpH=dpH, + ) if (not data.measurements.empty) and drop_incomplete_cycles: index_names, measurements = utils._unindex(data.measurements) latest_full_cycle = utils._last_full_cycle(measurements) measurements = measurements[measurements.cycle <= latest_full_cycle] - data._measurements = utils._reindex(measurements, index_names) + data._measurements = utils._reindex(measurements, index_names) # type: ignore return data def parse( - filepaths, + filepaths: Union[str, Sequence[str]], *, drop_incomplete_cycles: bool = True, - lot_number: int = None, - temp: int = None, - cal_0: float = None, - cal_100: float = None, - phi_min: float = None, - phi_max: float = None, - pH_0: float = None, - dpH: float = None, + lot_number: Optional[int] = None, + temp: Optional[int] = None, + cal_0: Optional[float] = None, + cal_100: Optional[float] = None, + phi_min: Optional[float] = None, + phi_max: Optional[float] = None, + pH_0: Optional[float] = None, + dpH: Optional[float] = None, ) -> BLData: """Parses a raw BioLector CSV file into a BLData object and applies calibration. 
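Note on the `Optional` changes above (the same pattern recurs in `growth.py`, `bl1.py`, `blpro.py`, `splines.py`, and `types.py` below): since mypy 0.990, `--no-implicit-optional` is the default, so a `None` default no longer implies an `Optional` type. A minimal sketch of what the newly added pre-commit hook flags — function and parameter names here are illustrative, not bletl API:

    from typing import Optional

    def implicit(cal_0: float = None) -> None:  # mypy: Incompatible default for argument "cal_0"
        ...

    def explicit(cal_0: Optional[float] = None) -> None:  # accepted
        ...
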
diff --git a/bletl/growth.py b/bletl/growth.py index c8f2371..a716b2f 100644 --- a/bletl/growth.py +++ b/bletl/growth.py @@ -1,5 +1,6 @@ import logging import typing +from typing import Dict, Optional, Sequence, Tuple, Union import arviz import calibr8 @@ -10,7 +11,7 @@ try: import pytensor.tensor as pt except ModuleNotFoundError: - import aesara.tensor as pt + import aesara.tensor as pt # type: ignore _log = logging.getLogger(__file__) @@ -22,13 +23,13 @@ class GrowthRateResult: def __init__( self, *, - t_data: numpy.ndarray, - t_segments: numpy.ndarray, - y: numpy.ndarray, + t_data: Union[Sequence[float], numpy.ndarray], + t_segments: Union[Sequence[float], numpy.ndarray], + y: Union[Sequence[float], numpy.ndarray], calibration_model: calibr8.CalibrationModel, - switchpoints: typing.Dict[float, str], + switchpoints: Dict[float, str], pmodel: pm.Model, - theta_map: dict, + theta_map: Dict[str, numpy.ndarray], ): """Creates a result object of a growth rate analysis. @@ -47,9 +48,9 @@ def __init__( theta_map : dict the PyMC MAP estimate """ - self._t_data = t_data - self._t_segments = t_segments - self._y = y + self._t_data = numpy.asarray(t_data) + self._t_segments = numpy.asarray(t_segments) + self._y = numpy.asarray(y) self._switchpoints = switchpoints self.calibration_model = calibration_model self._pmodel = pmodel @@ -73,17 +74,17 @@ def y(self) -> numpy.ndarray: return self._y @property - def switchpoints(self) -> typing.Dict[float, str]: + def switchpoints(self) -> Dict[float, str]: """Dictionary (by time) of known and detected switchpoints.""" return self._switchpoints @property - def known_switchpoints(self) -> typing.Tuple[float]: + def known_switchpoints(self) -> Tuple[float, ...]: """Time values of previously known switchpoints in the model.""" return tuple(t for t, label in self.switchpoints.items() if label != "detected") @property - def detected_switchpoints(self) -> typing.Tuple[float]: + def detected_switchpoints(self) -> Tuple[float, ...]: """Time values of switchpoints that were autodetected from the fit.""" return tuple(t for t, label in self.switchpoints.items() if label == "detected") @@ -93,12 +94,12 @@ def pmodel(self) -> pm.Model: return self._pmodel @property - def theta_map(self) -> dict: + def theta_map(self) -> Dict[str, numpy.ndarray]: """MAP estimate of the model parameters.""" return self._theta_map @property - def idata(self) -> typing.Optional[arviz.InferenceData]: + def idata(self) -> Optional[arviz.InferenceData]: """ArviZ InferenceData object of the MCMC trace.""" return self._idata @@ -113,18 +114,20 @@ def x_map(self) -> numpy.ndarray: return self.theta_map["X"] @property - def mu_mcmc(self) -> typing.Optional[numpy.ndarray]: + def mu_mcmc(self) -> Optional[numpy.ndarray]: """Posterior samples of growth rates in segments between data points.""" if not self.idata: return None + assert hasattr(self.idata, "posterior") return self.idata.posterior.mu_t.stack(sample=("chain", "draw")).values.T @property - def x_mcmc(self) -> typing.Optional[numpy.ndarray]: + def x_mcmc(self) -> Optional[numpy.ndarray]: """Posterior samples of biomass curve.""" - if not self.idata: + if self.idata is None: return None - return self._idata.posterior["X"].stack(sample=("chain", "draw")).T + assert hasattr(self.idata, "posterior") + return self.idata.posterior["X"].stack(sample=("chain", "draw")).T def sample(self, **kwargs) -> None: """Runs MCMC sampling with default settings on the growth model. 
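Note on the `numpy.asarray` calls in `GrowthRateResult.__init__` above: widening the parameters to `Union[Sequence[float], numpy.ndarray]` is only sound because inputs are normalized to arrays once, at the boundary, which lets the corresponding properties keep their `numpy.ndarray` return annotations. A minimal sketch of the pattern with hypothetical names (not the bletl API):

    from typing import Sequence, Union

    import numpy

    class Series:
        def __init__(self, t: Union[Sequence[float], numpy.ndarray]):
            # Coerce once; list, tuple and ndarray inputs all become ndarray.
            self._t = numpy.asarray(t, dtype=float)

        @property
        def t(self) -> numpy.ndarray:
            # The return annotation holds no matter what the caller passed.
            return self._t

    s = Series([0.0, 0.5, 1.0])
    assert isinstance(s.t, numpy.ndarray)
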
@@ -157,8 +160,8 @@ def _make_random_walk(
     nu: float = 1,
     length: int,
     student_t: bool,
-    initval: numpy.ndarray = None,
-    dims: typing.Optional[str] = None,
+    initval: Optional[numpy.ndarray] = None,
+    dims: Optional[str] = None,
 ):
     """Create a random walk with either a Normal or Student-t distribution.

@@ -215,7 +218,11 @@ def _make_random_walk(


 def _get_smoothed_mu(
-    t: numpy.ndarray, y: numpy.ndarray, cm_cdw: calibr8.CalibrationModel, *, clip=0.5
+    t: Sequence[float],
+    y: Sequence[float],
+    cm_cdw: calibr8.CalibrationModel,
+    *,
+    clip: float = 0.5,
 ) -> numpy.ndarray:
     """Calculate a rough estimate of the specific growth rate from smoothed observations.

@@ -236,10 +243,10 @@
         A vector of specific growth rates.
     """
     # apply moving average to reduce backscatter noise
-    y = numpy.convolve(y, numpy.ones(5) / 5, "same")
+    yarr = numpy.convolve(y, numpy.ones(5) / 5, "same")

     # convert to biomass
-    X = cm_cdw.predict_independent(y)
+    X = cm_cdw.predict_independent(yarr)

     # calculate growth rate
     dX = numpy.diff(X)
@@ -259,17 +266,17 @@


 def fit_mu_t(
-    t: typing.Sequence[float],
-    y: typing.Sequence[float],
+    t: Sequence[float],
+    y: Sequence[float],
     calibration_model: calibr8.CalibrationModel,
     *,
-    switchpoints: typing.Optional[typing.Union[typing.Sequence[float], typing.Dict[float, str]]] = None,
+    switchpoints: Optional[Union[Sequence[float], Dict[float, str]]] = None,
     mcmc_samples: int = 0,
     mu_prior: float = 0,
     drift_scale: float,
     nu: float = 5,
     x0_prior: float = 0.25,
-    student_t: typing.Optional[bool] = None,
+    student_t: Optional[bool] = None,
     switchpoint_prob: float = 0.01,
     replicate_id: str = "unnamed",
 ):
@@ -357,7 +364,7 @@ def fit_mu_t(
     mu_segments = []
     i_from = 0
     for i, t_switch in enumerate(t_switchpoints_known):
-        i_to = numpy.argmax(t > t_switch)
+        i_to = int(numpy.argmax(t > t_switch))
         i_len = len(t[i_from:i_to])
         name = f"mu_phase_{i}"
         slc = slice(i_from, i_to)
@@ -460,10 +467,10 @@ def fit_mu_t(

 def detect_switchpoints(
     switchpoint_prob: float,
-    t_data: typing.Sequence[float],
+    t_data: Sequence[float],
     pmodel: pm.Model,
-    theta_map: typing.Dict[str, numpy.ndarray],
-) -> typing.Dict[float, str]:
+    theta_map: Dict[str, numpy.ndarray],
+) -> Dict[float, str]:
     """Helper function to detect switchpoints from a fitted random walk.

     Parameters
@@ -509,15 +516,15 @@
     # To get our length vector to align with the random walk,
     # we prepend a 0.5 as a placeholder for the CDF of the initial point of the random walk.
     cdf_evals += [0.5, *numpy.exp(logcdfs)]
-    cdf_evals = numpy.array(cdf_evals)
-    if len(cdf_evals) != len(t_data) - 1:
+    cdf_evals_arr = numpy.array(cdf_evals)
+    if len(cdf_evals_arr) != len(t_data) - 1:
         raise Exception(
-            f"Failed to find all random walk segments. Found {len(cdf_evals)}, expected {len(t_data) - 1}."
+            f"Failed to find all random walk segments. Found {len(cdf_evals_arr)}, expected {len(t_data) - 1}."
         )

     # Filter for the elements that lie outside of the [0.005, 0.995] interval (if switchpoint_prob=0.01).
     significance_mask = numpy.logical_or(
-        cdf_evals < (switchpoint_prob / 2),
-        cdf_evals > (1 - switchpoint_prob / 2),
+        cdf_evals_arr < (switchpoint_prob / 2),
+        cdf_evals_arr > (1 - switchpoint_prob / 2),
     )

     # Collect switchpoint information from points with significant CDF values.
# Here we don't need to filter known switchpoints, because these correspond to the first diff --git a/bletl/parsing/bl1.py b/bletl/parsing/bl1.py index 379d357..b28933c 100644 --- a/bletl/parsing/bl1.py +++ b/bletl/parsing/bl1.py @@ -92,12 +92,12 @@ def calibrate_with_lot(self, data: BLData, lot_number: Optional[int] = None, tem def calibrate_with_parameters( self, data: BLData, - cal_0: float = None, - cal_100: float = None, - phi_min: float = None, - phi_max: float = None, - pH_0: float = None, - dpH: float = None, + cal_0: Optional[float] = None, + cal_100: Optional[float] = None, + phi_min: Optional[float] = None, + phi_max: Optional[float] = None, + pH_0: Optional[float] = None, + dpH: Optional[float] = None, ): def process_backscatter(raw_data_df, cycle_ref_df, global_ref): """ @@ -182,14 +182,14 @@ def process_DO(raw_data_df, cal_0, cal_100): def parse( self, filepath, - lot_number: int = None, - temp: int = None, - cal_0: float = None, - cal_100: float = None, - phi_min: float = None, - phi_max: float = None, - pH_0: float = None, - dpH: float = None, + lot_number: Optional[int] = None, + temp: Optional[int] = None, + cal_0: Optional[float] = None, + cal_100: Optional[float] = None, + phi_min: Optional[float] = None, + phi_max: Optional[float] = None, + pH_0: Optional[float] = None, + dpH: Optional[float] = None, ): headerlines, data = split_header_data(filepath) @@ -476,6 +476,8 @@ def fetch_calibration_data(lot_number: int, temp: int): Dictionary containing calibration data. Can be readily used in calibration function. """ + assert utils.__spec__ is not None + assert utils.__spec__.origin is not None module_path = pathlib.Path(utils.__spec__.origin).parents[0] calibration_file = pathlib.Path(module_path, "cache", "CalibrationLot.ini") diff --git a/bletl/parsing/blpro.py b/bletl/parsing/blpro.py index d92204f..9a28eaa 100644 --- a/bletl/parsing/blpro.py +++ b/bletl/parsing/blpro.py @@ -3,16 +3,16 @@ import datetime import io import logging +import os import pathlib import re import warnings import xml.etree.ElementTree +from typing import Optional, Union import numpy import pandas -from bletl.parsing.bl1 import fetch_calibration_data - from .. import utils from ..types import ( BioLectorModel, @@ -28,15 +28,15 @@ class BioLectorProParser(BLDParser): def parse( self, - filepath, - lot_number: int = None, - temp: int = None, - cal_0: float = None, - cal_100: float = None, - phi_min: float = None, - phi_max: float = None, - pH_0: float = None, - dpH: float = None, + filepath: Union[str, os.PathLike], + lot_number: Optional[int] = None, + temp: Optional[int] = None, + cal_0: Optional[float] = None, + cal_100: Optional[float] = None, + phi_min: Optional[float] = None, + phi_max: Optional[float] = None, + pH_0: Optional[float] = None, + dpH: Optional[float] = None, ) -> BLData: metadata, data = parse_metadata_data(filepath) @@ -54,7 +54,7 @@ def parse( bld.valves, bld.module = extract_valves_module(data) bld.diagnostics = extract_diagnostics(data) - if not None in [lot_number, temp]: + if lot_number is not None and temp is not None: lot_cal_data = fetch_calibration_data(lot_number, temp) else: lot_cal_data = None @@ -476,6 +476,8 @@ def fetch_calibration_data(lot_number: int, temp: int): Dictionary containing calibration data. Can be readily used in calibration function. 
""" + assert utils.__spec__ is not None + assert utils.__spec__.origin is not None module_path = pathlib.Path(utils.__spec__.origin).parents[0] calibration_file = pathlib.Path(module_path, "cache", "CalibrationLot_II.xml") diff --git a/bletl/py.typed b/bletl/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/bletl/splines.py b/bletl/splines.py index a8edc11..46244a4 100644 --- a/bletl/splines.py +++ b/bletl/splines.py @@ -3,7 +3,7 @@ import multiprocessing import numbers import pickle -from typing import Callable, Dict, Optional, Sequence, Tuple, Union +from typing import Callable, Dict, List, Optional, Sequence, Tuple, Union import csaps import joblib @@ -47,7 +47,12 @@ def derivative(self, order: int = 1, *, epsilon=0.001) -> Callable[[numpy.ndarra f"{order}-order derivatives are not implemented for the UnivariateCubicSmoothingSpline" ) - def __call__(self, xi: Union[float, numpy.ndarray]): + def __call__( + self, + x: csaps.UnivariateDataType, + nu: Optional[int] = None, + extrapolate: Optional[Union[bool, str]] = None, + ) -> numpy.ndarray: """Evaluate the spline at some coordinates. This method overrides the implementation of the base type @@ -55,13 +60,13 @@ def __call__(self, xi: Union[float, numpy.ndarray]): Parameters ---------- - xi : float, numpy.ndarray + x : float, numpy.ndarray One or more coordinates at which the spline should be evaluated. """ # scipy splines can be called on scalars - xi_arr = numpy.atleast_1d(xi) + xi_arr = numpy.atleast_1d(numpy.asarray(x)) result = super().__call__(xi_arr) - if isinstance(xi, (int, float)): + if isinstance(x, (int, float)): return result[0] return result @@ -119,7 +124,7 @@ def _evaluate_smoothing_factor(smoothing_factor: float, timepoints, values, k: i mssr : float Mean sum of squared residuals of the splines fitted to data subsets. """ - smoothing_factor = numpy.atleast_1d(smoothing_factor) + smoothing_factor_arr = numpy.atleast_1d(smoothing_factor) if k < 2: raise ValueError(f"Need k≥2 splits for crossvalidation. Setting was k={k}.") if len(values) < 3 * k: @@ -138,11 +143,11 @@ def _evaluate_smoothing_factor(smoothing_factor: float, timepoints, values, k: i y=values, train_idxs=train_mask, test_idxs=test_mask, - smoothing_factor=smoothing_factor, + smoothing_factor=smoothing_factor_arr, method=method, ) ) - return numpy.mean(ssrs) + return float(numpy.mean(ssrs)) def _evaluate_spline_test_error( @@ -150,7 +155,7 @@ def _evaluate_spline_test_error( y: numpy.ndarray, train_idxs: numpy.ndarray, test_idxs: numpy.ndarray, - smoothing_factor: float, + smoothing_factor: numpy.ndarray, method: str, ) -> float: """Fits spline to a test set and returns the sum of squared error on the test set. 
@@ -234,7 +239,7 @@ def get_multiple_splines( k_folds: int = 5, method: str = "us", last_cycle: Optional[int] = None, -) -> Dict[str, Union[UnivariateCubicSmoothingSpline, scipy.interpolate.UnivariateSpline]]: +) -> List[Tuple[str, Union[UnivariateCubicSmoothingSpline, scipy.interpolate.UnivariateSpline]]]: """Returns multiple splines with k-fold crossvalidated smoothing factor Parameters @@ -255,7 +260,7 @@ def get_multiple_splines( Dict with {well:spline} for each well of "wells" """ x, y = fts.get_timeseries(wells[0]) - if last_cycle == None: + if last_cycle is None: last_cycle = len(x) elif last_cycle > len(x): raise ValueError("Please change last_cycle.") diff --git a/bletl/types.py b/bletl/types.py index 2ab1099..a500c9d 100644 --- a/bletl/types.py +++ b/bletl/types.py @@ -1,10 +1,10 @@ """Specifies the base types for parsing and representing BioLector CSV files.""" import abc import enum -import pathlib +import os import typing import warnings -from typing import Dict, Optional, Tuple +from typing import Dict, Optional, Tuple, Union import numpy import pandas @@ -22,13 +22,27 @@ class BioLectorModel(enum.Enum): class BLData(dict): """Standardized data type for BioLector data.""" - def __init__(self, model, environment, filtersets, references, measurements, comments): + def __init__( + self, + model: BioLectorModel, + environment: pandas.DataFrame, + filtersets: pandas.DataFrame, + references: pandas.DataFrame, + measurements: pandas.DataFrame, + comments: pandas.DataFrame, + ): self._model = model self._environment = environment self._filtersets = filtersets self._references = references self._measurements = measurements self._comments = comments + # Optional, depending on the BioLector Model + self.metadata: dict = {} + self.fluidics: Optional[pandas.DataFrame] = None + self.module: Optional[pandas.DataFrame] = None + self.valves: Optional[pandas.DataFrame] = None + self.diagnostics: Optional[pandas.DataFrame] = None super().__init__() @property @@ -47,7 +61,7 @@ def filtersets(self) -> pandas.DataFrame: return self._filtersets @property - def wells(self) -> typing.Tuple[str]: + def wells(self) -> typing.Tuple[str, ...]: """Wells that were measured.""" if len(self) == 0: return tuple() @@ -200,7 +214,7 @@ class FilterTimeSeries: """Generalizable data type for calibrated timeseries.""" @property - def wells(self) -> typing.Tuple[str]: + def wells(self) -> typing.Tuple[str, ...]: """Well IDs that were measured.""" return tuple(self.time.columns) @@ -208,7 +222,9 @@ def __init__(self, time_df: pandas.DataFrame, value_df: pandas.DataFrame): self.time = time_df self.value = value_df - def get_timeseries(self, well: str, *, last_cycle: int = None) -> Tuple[numpy.ndarray, numpy.ndarray]: + def get_timeseries( + self, well: str, *, last_cycle: Optional[int] = None + ) -> Tuple[numpy.ndarray, numpy.ndarray]: """Retrieves (time, value) for a specific well. Parameters @@ -268,7 +284,7 @@ class BLDParser: @abc.abstractmethod def parse( self, - filepath: pathlib.Path, + filepath: Union[str, os.PathLike], *, lot_number: Optional[int] = None, temp: Optional[int] = None, diff --git a/bletl/utils.py b/bletl/utils.py index 815be8a..49c2c70 100644 --- a/bletl/utils.py +++ b/bletl/utils.py @@ -3,14 +3,16 @@ import pathlib import re import urllib -from typing import Sequence, Tuple, Union +from typing import Optional, Sequence, Tuple, Union import pandas from . 
import core -def __to_typed_cols__(dfin: pandas.DataFrame, ocol_ncol_type: Tuple[str, str, type]) -> pandas.DataFrame: +def __to_typed_cols__( + dfin: pandas.DataFrame, ocol_ncol_type: Sequence[Tuple[Optional[str], str, type]] +) -> pandas.DataFrame: """Can be used to filter & convert data frame columns. Parameters @@ -35,7 +37,7 @@ def __to_typed_cols__(dfin: pandas.DataFrame, ocol_ncol_type: Tuple[str, str, ty return dfout -def _unindex(dataframe: pandas.DataFrame) -> Tuple[Sequence[Union[str, None]], pandas.DataFrame]: +def _unindex(dataframe: pandas.DataFrame) -> Tuple[Sequence[Optional[str]], pandas.DataFrame]: """Resets the index of the DataFrame. Parameters @@ -110,10 +112,10 @@ def _concatenate_fragments( stack = pandas.concat((stack, fragment)) # re-apply the original indexing scheme - return _reindex(stack, index_names) + return _reindex(stack, index_names) # type: ignore -def _last_well_in_cycle(measurements: pandas.DataFrame) -> str: +def _last_well_in_cycle(measurements: pandas.DataFrame) -> Optional[str]: """Finds the name of the last well measured in a cycle. Parameters @@ -195,6 +197,8 @@ def download_calibration_data() -> bool: `True` if calibration data was downloaded successfully, `False` otherwise. """ try: + assert core.__spec__ is not None + assert core.__spec__.origin is not None module_path = pathlib.Path(core.__spec__.origin).parents[0] url_bl1 = "http://updates.m2p-labs.com/CalibrationLot.ini" diff --git a/pyproject.toml b/pyproject.toml index 9046bc1..5b47df4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,3 +9,6 @@ line-length = 110 [tool.isort] profile = "black" + +[tool.mypy] +ignore_missing_imports = true diff --git a/setup.py b/setup.py index 99d477d..868aa8a 100644 --- a/setup.py +++ b/setup.py @@ -26,6 +26,7 @@ def get_version(): setuptools.setup( name=__packagename__, packages=setuptools.find_packages(), + package_data={"bletl": ["py.typed"]}, zip_safe=False, version=__version__, description="Package for parsing and transforming BioLector raw data.",
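
Note on the new `py.typed` marker and the `package_data` entry above: together they implement PEP 561, which is what lets type checkers in downstream projects consume the annotations added throughout this diff; without the marker in the installed distribution, mypy treats `import bletl` as `Any`. A hypothetical downstream check, assuming `parse` is re-exported at the package root:

    # downstream.py -- check with `mypy downstream.py`.
    import bletl

    data = bletl.parse("my_run.csv", drop_incomplete_cycles=True)
    reveal_type(data)  # mypy: Revealed type is "bletl.types.BLData"
    # (reveal_type is interpreted by mypy only; remove it before running.)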