diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 13accb03..3c40e495 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -22,7 +22,7 @@ repos: rev: v1.5.1 hooks: - id: mypy - additional_dependencies: [types-beautifulsoup4, types-decorator, types-PyYAML, pydantic] + additional_dependencies: [types-beautifulsoup4, types-decorator, types-PyYAML, pydantic, types-toml] - repo: https://github.com/igorshubovych/markdownlint-cli rev: v0.35.0 hooks: diff --git a/docs/source/examples/comparing-pyprobe-performance.ipynb b/docs/source/examples/comparing-pyprobe-performance.ipynb index a113908f..3ab66862 100644 --- a/docs/source/examples/comparing-pyprobe-performance.ipynb +++ b/docs/source/examples/comparing-pyprobe-performance.ipynb @@ -103,6 +103,12 @@ " csv_time[repeat]= timeit.default_timer() - start_time\n", " start_time = timeit.default_timer()\n", " df = pd.read_parquet(data_directory + '/' + file)\n", + " # Add a column to identify the cycle number\n", + " df['Cycle'] = (\n", + " (df['Step'].astype(int) - df['Step'].astype(int).shift() < 0)\n", + " .fillna(0)\n", + " .cumsum()\n", + " )\n", " cumulative_time[0, repeat] = timeit.default_timer() - start_time\n", "\n", " experiment = df[df['Step'].isin([4, 5, 6, 7])]\n", @@ -240,20 +246,18 @@ "repeated_data = pl.concat([data] * n_repeats)\n", "\n", "# Repeat the 'Cycle' and 'Event' columns to match the length of the repeated data\n", - "cycle_repeated = pl.concat([data['Cycle']] * n_repeats)\n", "event_repeated = pl.concat([data['Event']] * n_repeats)\n", "step_repeated = pl.concat([data['Step']] * n_repeats)\n", "time_repeated = pl.concat([data['Time [s]']]* n_repeats)\n", "\n", "# Increment the 'Cycle' and 'Event' columns\n", - "cycle_increment = data['Cycle'].max() + 1\n", "event_increment = data['Event'].max() + 1\n", "step_increment = data['Step'].max() + 1\n", "time_increment = data['Time [s]'].max()\n", "\n", "\n", "repeated_data = repeated_data.with_columns([\n", - " (pl.arange(0, len(repeated_data)) // len(data) * cycle_increment + cycle_repeated).alias('Cycle'),\n", + " # (pl.arange(0, len(repeated_data)) // len(data) * cycle_increment + cycle_repeated).alias('Cycle'),\n", " (pl.arange(0, len(repeated_data)) // len(data) * event_increment + event_repeated).alias('Event'),\n", " (pl.arange(0, len(repeated_data)) // len(data) * event_increment + step_repeated).alias('Step'),\n", " (pl.arange(0, len(repeated_data)) // len(data) * time_increment + time_repeated).alias('Time [s]'),\n", diff --git a/pyprobe/__init__.py b/pyprobe/__init__.py index 8c6ced0b..7a41a9da 100644 --- a/pyprobe/__init__.py +++ b/pyprobe/__init__.py @@ -1,5 +1,5 @@ """The PyProBE package.""" -from .cell import Cell, make_cell_list # noqa: F401 +from .cell import Cell, __version__, load_archive, make_cell_list # noqa: F401 from .dashboard import launch_dashboard # noqa: F401 from .plot import Plot # noqa: F401 from .result import Result # noqa: F401 diff --git a/pyprobe/analysis/cycling.py b/pyprobe/analysis/cycling.py index b1c6c886..51f17c67 100644 --- a/pyprobe/analysis/cycling.py +++ b/pyprobe/analysis/cycling.py @@ -5,7 +5,7 @@ from pydantic import BaseModel from pyprobe.analysis.utils import AnalysisValidator -from pyprobe.filters import Experiment +from pyprobe.filters import Experiment, get_cycle_column from pyprobe.result import Result @@ -42,6 +42,7 @@ def summary(self, dchg_before_chg: bool = True) -> Result: AnalysisValidator( input_data=self.input_data, required_columns=["Capacity [Ah]", "Time [s]"] ) + self.input_data.base_dataframe = get_cycle_column(self.input_data) self._create_capacity_throughput() lf_capacity_throughput = self.input_data.base_dataframe.group_by( diff --git a/pyprobe/analysis/degradation_mode_analysis.py b/pyprobe/analysis/degradation_mode_analysis.py index 71c95ad6..93a63fe2 100644 --- a/pyprobe/analysis/degradation_mode_analysis.py +++ b/pyprobe/analysis/degradation_mode_analysis.py @@ -224,10 +224,10 @@ def quantify_degradation_modes( self.dma_result = electrode_capacity_results[0].clean_copy( pl.DataFrame( { - "SOH": SOH, - "LAM_pe": LAM_pe, - "LAM_ne": LAM_ne, - "LLI": LLI, + "SOH": SOH[:, 0], + "LAM_pe": LAM_pe[:, 0], + "LAM_ne": LAM_ne[:, 0], + "LLI": LLI[:, 0], } ) ) diff --git a/pyprobe/cell.py b/pyprobe/cell.py index 892e572d..41468235 100644 --- a/pyprobe/cell.py +++ b/pyprobe/cell.py @@ -1,17 +1,23 @@ """Module for the Cell class.""" +import json import os +import shutil import time import warnings +import zipfile from typing import Callable, Dict, List, Optional import distinctipy import polars as pl +import pybamm.solvers.solution from pydantic import BaseModel, Field, field_validator, validate_call from pyprobe.cyclers import arbin, basecycler, basytec, biologic, maccor, neware from pyprobe.filters import Procedure from pyprobe.readme_processor import process_readme +__version__ = "1.0.3" + class Cell(BaseModel): """A class for a cell in a battery experiment.""" @@ -193,12 +199,9 @@ def add_procedure( readme = process_readme(readme_path) self.procedure[procedure_name] = Procedure( - titles=readme.titles, - steps_idx=readme.step_numbers, + readme_dict=readme.experiment_dict, base_dataframe=base_dataframe, info=self.info, - pybamm_experiment=readme.pybamm_experiment, - pybamm_experiment_list=readme.pybamm_experiment_list, ) @staticmethod @@ -287,6 +290,234 @@ def _get_data_paths( data_path = os.path.join(folder_path, filename_str) return data_path + def import_pybamm_solution( + self, + procedure_name: str, + experiment_names: List[str] | str, + pybamm_solutions: List[pybamm.solvers.solution] | pybamm.solvers.solution, + output_data_path: Optional[str] = None, + optional_variables: Optional[List[str]] = None, + ) -> None: + """Import a PyBaMM solution object into a procedure of the cell. + + Filtering a PyBaMM solution object by cycle and step reflects the behaviour of + the :code:`cycles` and :code:`steps` dictionaries of the PyBaMM solution object. + + Multiple experiments can be imported into the same procedure. This is achieved + by providing multiple solution objects and experiment names. + + This method optionally writes the data to a parquet file, if a data path is + provided. + + Args: + procedure_name (str): + A name to give the procedure. This will be used when calling + :code:`cell.procedure[procedure_name]`. + pybamm_solutions (list or pybamm_solution): + A list of PyBaMM solution objects or a single PyBaMM solution object. + experiment_names (list or str): + A list of experiment names or a single experiment name to assign to the + PyBaMM solution object. + output_data_path (str, optional): + The path to write the parquet file. Defaults to None. + optional_variables (list, optional): + A list of variables to import from the PyBaMM solution object in + addition to the PyProBE required variables. Defaults to None. + """ + # the minimum required variables to import from the PyBaMM solution object + required_variables = [ + "Time [s]", + "Current [A]", + "Terminal voltage [V]", + "Discharge capacity [A.h]", + ] + + # get the list of variables to import from the PyBaMM solution object + if optional_variables is not None: + import_variables = required_variables + optional_variables + else: + import_variables = required_variables + + # check if the experiment names and PyBaMM solutions are lists + if isinstance(experiment_names, list) and isinstance(pybamm_solutions, list): + if len(experiment_names) != len(pybamm_solutions): + raise ValueError( + "The number of experiment names and PyBaMM solutions must be equal." + ) + elif isinstance(experiment_names, list) != isinstance(pybamm_solutions, list): + if isinstance(experiment_names, list): + raise ValueError( + "A list of experiment names must be provided with a list of PyBaMM" + " solutions." + ) + else: + raise ValueError( + "A single experiment name must be provided with a single PyBaMM" + " solution." + ) + else: + experiment_names = [str(experiment_names)] + pybamm_solutions = [pybamm_solutions] + + lazyframe_created = False + for experiment_name, pybamm_solution in zip(experiment_names, pybamm_solutions): + # get the data from the PyBaMM solution object + pybamm_data = pybamm_solution.get_data_dict(import_variables) + # convert the PyBaMM data to a polars dataframe and add the experiment name + # as a column + solution_data = pl.LazyFrame(pybamm_data).with_columns( + pl.lit(experiment_name).alias("Experiment") + ) + if lazyframe_created is False: + all_solution_data = solution_data + lazyframe_created = True + else: + # join the new solution data with the existing solution data, a right + # join is used to keep all the data + all_solution_data = all_solution_data.join( + solution_data, on=import_variables + ["Step"], how="right" + ) + # fill null values where the experiment has been extended with the newly + # joined experiment name + all_solution_data = all_solution_data.with_columns( + pl.col("Experiment").fill_null(pl.col("Experiment_right")) + ) + # get the maximum step number for each experiment + max_steps = ( + all_solution_data.group_by("Experiment") + .agg(pl.max("Step").alias("Max Step")) + .sort("Experiment") + .with_columns(pl.col("Max Step").cum_sum().shift()) + ) + # add the maximum step number from the previous experiment to the step number + all_solution_data = all_solution_data.join( + max_steps, on="Experiment", how="left" + ).with_columns( + (pl.col("Step") + pl.col("Max Step").fill_null(-1) + 1).alias("Step") + ) + # get the range of step values for each experiment + step_ranges = all_solution_data.group_by("Experiment").agg( + pl.arange(pl.col("Step").min(), pl.col("Step").max() + 1).alias( + "Step Range" + ) + ) + + # create a dictionary of the experiment names and the step ranges + experiment_dict = {} + for row in step_ranges.collect().iter_rows(): + experiment = row[0] + experiment_dict[experiment] = {"Steps": row[1]} + experiment_dict[experiment]["Step Descriptions"] = [] + + # reformat the data to the PyProBE format + base_dataframe = all_solution_data.select( + [ + pl.col("Time [s]"), + pl.col("Current [A]") * -1, + pl.col("Terminal voltage [V]").alias("Voltage [V]"), + (pl.col("Discharge capacity [A.h]") * -1).alias("Capacity [Ah]"), + pl.col("Step"), + ( + ( + pl.col("Step").cast(pl.Int64) + - pl.col("Step").cast(pl.Int64).shift() + != 0 + ) + .fill_null(strategy="zero") + .cum_sum() + .alias("Event") + ), + ] + ) + # create the procedure object + self.procedure[procedure_name] = Procedure( + base_dataframe=base_dataframe, info=self.info, readme_dict=experiment_dict + ) + + # write the data to a parquet file if a path is provided + if output_data_path is not None: + if not output_data_path.endswith(".parquet"): + output_data_path += ".parquet" + base_dataframe.collect().write_parquet(output_data_path) + + def archive(self, path: str) -> None: + """Archive the cell object. + + Args: + path (str): The path to the archive directory or zip file. + """ + if path.endswith(".zip"): + zip = True + path = path[:-4] + else: + zip = False + if not os.path.exists(path): + os.makedirs(path) + metadata = self.dict() + metadata["PyProBE Version"] = __version__ + for procedure_name, procedure in self.procedure.items(): + if isinstance(procedure.base_dataframe, pl.LazyFrame): + df = procedure.base_dataframe.collect() + else: + df = procedure.base_dataframe + # write the dataframe to a parquet file + filename = procedure_name + ".parquet" + filepath = os.path.join(path, filename) + df.write_parquet(filepath) + # update the metadata with the filename + metadata["procedure"][procedure_name]["base_dataframe"] = filename + with open(os.path.join(path, "metadata.json"), "w") as f: + json.dump(metadata, f) + + if zip: + with zipfile.ZipFile(path + ".zip", "w") as zipf: + for root, _, files in os.walk(path): + for file in files: + file_path = os.path.join(root, file) + arcname = os.path.relpath(file_path, path) + zipf.write(file_path, arcname) + # Delete the original directory + shutil.rmtree(path) + + +def load_archive(path: str) -> Cell: + """Load a cell object from an archive. + + Args: + path (str): The path to the archive directory. + + Returns: + Cell: The cell object. + """ + if path.endswith(".zip"): + extract_path = path[:-4] + with zipfile.ZipFile(path, "r") as zipf: + with zipfile.ZipFile(path, "r") as zipf: + zipf.extractall(extract_path) + # Delete the original zip file + os.remove(path) + archive_path = extract_path + else: + archive_path = path + + with open(os.path.join(archive_path, "metadata.json"), "r") as f: + metadata = json.load(f) + if metadata["PyProBE Version"] != __version__: + warnings.warn( + f"The PyProBE version used to archive the cell was " + f"{metadata['PyProBE Version']}, the current version is " + f"{__version__}. There may be compatibility" + f" issues." + ) + metadata.pop("PyProBE Version") + for procedure in metadata["procedure"].values(): + procedure["base_dataframe"] = os.path.join( + archive_path, procedure["base_dataframe"] + ) + cell = Cell(**metadata) + + return cell + def make_cell_list( record_filepath: str, diff --git a/pyprobe/cyclers/basecycler.py b/pyprobe/cyclers/basecycler.py index 8a5d0776..ecc63f38 100644 --- a/pyprobe/cyclers/basecycler.py +++ b/pyprobe/cyclers/basecycler.py @@ -302,7 +302,6 @@ def _assign_instructions(self) -> None: "Capacity": self.capacity, "Temperature": self.temperature, "Step": self.step, - "Cycle": self.cycle, "Event": self.event, } for quantity in self._column_map.keys(): @@ -353,7 +352,6 @@ def pyprobe_dataframe(self) -> pl.DataFrame: required_columns = [ self.date if "Date" in self._column_map.keys() else None, self.time, - self.cycle, self.step, self.event, self.current, @@ -480,24 +478,6 @@ def step(self) -> pl.Expr: """ return pl.col("Step") - @property - def cycle(self) -> pl.Expr: - """Identify the cycle number. - - Cycles are defined by repetition of steps. They are identified by a decrease - in the step number. - - Returns: - pl.Expr: A polars expression for the cycle number. - """ - return ( - (pl.col("Step").cast(pl.Int64) - pl.col("Step").cast(pl.Int64).shift() < 0) - .fill_null(strategy="zero") - .cum_sum() - .alias("Cycle") - .cast(pl.Int64) - ) - @property def event(self) -> pl.Expr: """Identify the event number. diff --git a/pyprobe/dashboard.py b/pyprobe/dashboard.py index d677db00..a4aeb590 100644 --- a/pyprobe/dashboard.py +++ b/pyprobe/dashboard.py @@ -115,7 +115,7 @@ def dataframe_with_selections(df: pl.DataFrame) -> List[int]: # Select an experiment if selected_raw_data is not None: experiment_names = ( - cell_list[selected_indices[0]].procedure[selected_raw_data].titles + cell_list[selected_indices[0]].procedure[selected_raw_data].experiment_names ) selected_experiment = st.sidebar.multiselect( "Select an experiment", experiment_names @@ -201,7 +201,6 @@ def dataframe_with_selections(df: pl.DataFrame) -> List[int]: tabs = st.tabs(selected_names) columns = [ "Time [s]", - "Cycle", "Step", "Current [A]", "Voltage [V]", diff --git a/pyprobe/filters.py b/pyprobe/filters.py index 2a3fd1b4..60b1d837 100644 --- a/pyprobe/filters.py +++ b/pyprobe/filters.py @@ -1,18 +1,18 @@ """A module for the filtering classes.""" import os import warnings -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union, cast import polars as pl -from pybamm import Experiment as PybammExperiment from pydantic import Field +from pyprobe import utils from pyprobe.rawdata import RawData, default_column_definitions if TYPE_CHECKING: from pyprobe.typing import ( # , FilterToStepType + ExperimentOrCycleType, FilterToCycleType, - FilterToExperimentType, ) @@ -84,10 +84,48 @@ def _step( base_dataframe=base_dataframe, info=filter.info, column_definitions=filter.column_definitions, + step_descriptions=filter.step_descriptions, ) -def _cycle(filter: "FilterToExperimentType", *cycle_numbers: Union[int]) -> "Cycle": +def get_cycle_column(filter: "ExperimentOrCycleType") -> pl.DataFrame | pl.LazyFrame: + """Adds a cycle column to the data. + + If cycle details have been provided in the README, the cycle column will be created + by checking for the last step of the cycle. For nested cycles, the "outer" cycle + will be created first. Subsequent filtering with the cycle method will then allow + for filtering on the "inner" cycles. + + If no cycle details have been provided, the cycle column will be created by + identifying the last step of the cycle by checking for a decrease in the step + number. + + Args: + filter (ExperimentOrCycleType): The experiment or cycle object. + + Returns: + pl.DataFrame | pl.LazyFrame: The data with a cycle column. + """ + if len(filter.cycle_info) > 0: + cycle_ends = ((pl.col("Step").shift() == filter.cycle_info[0][1])) & ( + pl.col("Step") != filter.cycle_info[0][1] + ).fill_null(strategy="zero").cast(pl.Int16) + cycle_column = cycle_ends.cum_sum().fill_null(strategy="zero").alias("Cycle") + else: + warnings.warn( + "No cycle information provided. Cycles will be inferred from the step " + "numbers." + ) + cycle_column = ( + (pl.col("Step").cast(pl.Int64) - pl.col("Step").cast(pl.Int64).shift() < 0) + .fill_null(strategy="zero") + .cum_sum() + .alias("Cycle") + ) + return filter.base_dataframe.with_columns(cycle_column) + + +def _cycle(filter: "ExperimentOrCycleType", *cycle_numbers: Union[int]) -> "Cycle": """Return a cycle object. Filters on the Cycle column. Args: @@ -98,12 +136,21 @@ def _cycle(filter: "FilterToExperimentType", *cycle_numbers: Union[int]) -> "Cyc Returns: Cycle: A cycle object. """ - lf_filtered = _filter_numerical(filter.base_dataframe, "Cycle", cycle_numbers) + df = get_cycle_column(filter) + + if len(filter.cycle_info) > 1: + next_cycle_info = filter.cycle_info[1:] + else: + next_cycle_info = [] + + lf_filtered = _filter_numerical(df, "Cycle", cycle_numbers) return Cycle( base_dataframe=lf_filtered, info=filter.info, column_definitions=filter.column_definitions, + step_descriptions=filter.step_descriptions, + cycle_info=next_cycle_info, ) @@ -229,20 +276,15 @@ def _constant_voltage( class Procedure(RawData): """A class for a procedure in a battery experiment.""" - titles: List[str] - """The titles of the experiments in the procedure.""" - steps_idx: List[List[int]] - """The indices of the steps in each experiment.""" - pybamm_experiment: Optional[PybammExperiment] - """A PyBaMM experiment object for the whole procedure.""" - pybamm_experiment_list: List[Optional[PybammExperiment]] - """A list of PyBaMM experiment objects for each experiment in the procedure.""" + readme_dict: Dict[str, Dict[str, List[str | int | Tuple[int, int, int]]]] base_dataframe: pl.LazyFrame | pl.DataFrame info: Dict[str, Optional[str | int | float]] column_definitions: Dict[str, str] = Field( default_factory=lambda: default_column_definitions.copy() ) + step_descriptions: Dict[str, List[Optional[str | int]]] = {} + cycle_info: List[Tuple[int, int, int]] = [] def model_post_init(self, __context: Any) -> None: """Create a procedure class.""" @@ -257,6 +299,16 @@ def model_post_init(self, __context: Any) -> None: "Procedure Capacity [Ah]", "The net charge passed since beginning of procedure.", ) + self.step_descriptions = {"Step": [], "Description": []} + for experiment in self.readme_dict: + steps = cast(List[int], self.readme_dict[experiment]["Steps"]) + descriptions: List[str | None] = [None] * len(steps) + if "Step Descriptions" in self.readme_dict[experiment]: + descriptions = cast( + List[str | None], self.readme_dict[experiment]["Step Descriptions"] + ) + self.step_descriptions["Step"].extend(steps) + self.step_descriptions["Description"].extend(descriptions) step = _step cycle = _cycle @@ -279,21 +331,57 @@ def experiment(self, *experiment_names: str) -> "Experiment": """ steps_idx = [] for experiment_name in experiment_names: - if experiment_name not in self.titles: + if experiment_name not in self.experiment_names: raise ValueError(f"{experiment_name} not in procedure.") - experiment_number = self.titles.index(experiment_name) - steps_idx.append(self.steps_idx[experiment_number]) - flattened_steps = self._flatten(steps_idx) + steps_idx.append(self.readme_dict[experiment_name]["Steps"]) + flattened_steps = utils.flatten_list(steps_idx) conditions = [ pl.col("Step").is_in(flattened_steps), ] lf_filtered = self.base_dataframe.filter(conditions) + cycles_list: List[Tuple[int, int, int]] = [] + if len(experiment_names) > 1: + warnings.warn( + "Multiple experiments selected. Cycles will be inferred from " + "the step numbers." + ) + elif "Cycles" in self.readme_dict[experiment_names[0]]: + # ignore type on below line due to persistent mypy warnings about + # incompatible types + cycles_list = self.readme_dict[experiment_names[0]][ + "Cycles" + ] # type: ignore + return Experiment( base_dataframe=lf_filtered, info=self.info, column_definitions=self.column_definitions, + step_descriptions=self.step_descriptions, + cycle_info=cycles_list, ) + def remove_experiment(self, *experiment_names: str) -> None: + """Remove an experiment from the procedure. + + Args: + experiment_names (str): + Variable-length argument list of experiment names. + """ + steps_idx = [] + for experiment_name in experiment_names: + if experiment_name not in self.experiment_names: + raise ValueError(f"{experiment_name} not in procedure.") + steps_idx.append(self.readme_dict[experiment_name]["Steps"]) + flattened_steps = utils.flatten_list(steps_idx) + conditions = [ + pl.col("Step").is_in(flattened_steps).not_(), + ] + + self.base_dataframe = self.base_dataframe.filter(conditions) + for experiment_name in experiment_names: + self.readme_dict.pop(experiment_name) + self.model_post_init(self) + @property def experiment_names(self) -> List[str]: """Return the names of the experiments in the procedure. @@ -301,7 +389,7 @@ def experiment_names(self) -> List[str]: Returns: List[str]: The names of the experiments in the procedure. """ - return list(self.titles) + return list(self.readme_dict.keys()) def add_external_data( self, @@ -357,21 +445,6 @@ def load_external_file(self, filepath: str) -> pl.LazyFrame: case _: raise ValueError(f"Unsupported file type: {file_ext}") - @classmethod - def _flatten(cls, lst: int | List[Any]) -> List[int]: - """Flatten a list of lists into a single list. - - Args: - lst (list): The list of lists to flatten. - - Returns: - list: The flattened list. - """ - if not isinstance(lst, list): - return [lst] - else: - return [item for sublist in lst for item in cls._flatten(sublist)] - class Experiment(RawData): """A class for an experiment in a battery experimental procedure.""" @@ -381,6 +454,7 @@ class Experiment(RawData): column_definitions: Dict[str, str] = Field( default_factory=lambda: default_column_definitions.copy() ) + cycle_info: List[Tuple[int, int, int]] def model_post_init(self, __context: Any) -> None: """Create an experiment class.""" @@ -414,6 +488,7 @@ class Cycle(RawData): column_definitions: Dict[str, str] = Field( default_factory=lambda: default_column_definitions.copy() ) + cycle_info: List[Tuple[int, int, int]] def model_post_init(self, __context: Any) -> None: """Create a cycle class.""" diff --git a/pyprobe/rawdata.py b/pyprobe/rawdata.py index 82a96237..aa349998 100644 --- a/pyprobe/rawdata.py +++ b/pyprobe/rawdata.py @@ -1,7 +1,8 @@ """A module for the RawData class.""" -from typing import Dict, Optional +from typing import Dict, List, Optional import polars as pl +import pybamm from pydantic import Field, field_validator from pyprobe.result import Result @@ -9,7 +10,6 @@ required_columns = [ "Time [s]", "Step", - "Cycle", "Event", "Current [A]", "Voltage [V]", @@ -55,6 +55,7 @@ class RawData(Result): column_definitions: Dict[str, str] = Field( default_factory=lambda: default_column_definitions.copy() ) + step_descriptions: Dict[str, List[Optional[str | int]]] = {} @field_validator("base_dataframe") @classmethod @@ -196,3 +197,47 @@ def set_reference_capacity( + reference_capacity ).alias("Capacity - Referenced [Ah]") ) + + @property + def pybamm_experiment(self) -> pybamm.Experiment: + """Return a PyBaMM experiment object for the filtered section of data. + + Returns: + pybamm.Experiment: The PyBaMM experiment object. + """ + step_description_df = pl.LazyFrame(self.step_descriptions) + no_step_descriptions = step_description_df.filter( + pl.col("Description").is_null() + ) + missing_steps = ( + no_step_descriptions.select("Step").collect().to_numpy().flatten() + ) + if len(missing_steps) > 0: + raise ValueError( + f"Descriptions for steps {str(missing_steps)} are missing." + f" Unable to create a PyBaMM experiment object. Please " + f"filter the data to a section with descriptions for all " + f"steps to create an experiment." + ) + + # reduce the full dataframe to only the steps as they appear in order in + # the data + only_steps = self.base_dataframe.filter( + pl.col("Step") != pl.col("Step").shift(1) + ).select("Step") + # match the step with its description + all_steps_with_descriptions = only_steps.join( + step_description_df, on="Step", how="left" + ) + if isinstance(all_steps_with_descriptions, pl.LazyFrame): + all_steps_with_descriptions = all_steps_with_descriptions.select( + "Description" + ).collect() + # form a list of all the descriptions + all_steps_with_descriptions = all_steps_with_descriptions.to_numpy().flatten() + description_list = [] + for description in all_steps_with_descriptions: + line = description.split(",") + for item in line: + description_list.append(item.strip()) + return pybamm.Experiment(description_list) diff --git a/pyprobe/readme_processor.py b/pyprobe/readme_processor.py index 7d1ae95f..6d628140 100644 --- a/pyprobe/readme_processor.py +++ b/pyprobe/readme_processor.py @@ -1,316 +1,102 @@ """Module for processing PyPrBE README files.""" -import warnings -from typing import Any, Dict, List, Optional, Tuple +from typing import Any, Dict, List, Tuple, Union, cast -import pybamm import yaml -from pydantic import BaseModel, Field, model_validator +from pyprobe import utils -class ReadmeModel(BaseModel): - """A Pydantic BaseModel class for processing the README.yaml file.""" - readme_dict: Dict[str, Any] - """A dictionary containing the contents of the README.yaml file.""" - readme_type: List[str] = Field(default_factory=list) - """A list of strings indicating the README format for each experiment.""" - number_of_experiments: int = Field(default_factory=int) - """The number of experiments in the README file.""" - titles: List[str] = Field(default_factory=list) - """A list of strings containing the titles of the experiments.""" - step_numbers: List[List[int]] = Field(default_factory=list) - """A list of lists containing the step numbers for each experiment.""" - step_indices: List[List[int]] = Field(default_factory=list) - """A list of lists containing the step indices for each experiment.""" - step_descriptions: List[List[str]] = Field(default_factory=list) - """A list of lists containing the step descriptions for each experiment.""" - cycle_details: List[List[Tuple[int, int, int]]] = Field(default_factory=list) - """A list of lists containing the cycle details for each experiment.""" - pybamm_experiment_descriptions: List[Tuple[str, ...]] = Field(default_factory=list) - """A list of tuples containing the PyBaMM experiment descriptions for each - experiment.""" - pybamm_experiment_list: List[pybamm.Experiment] = Field(default_factory=list) - """A list of PyBaMM experiment objects for each experiment.""" - pybamm_experiment: Optional[pybamm.Experiment] = Field(default=None) - """A PyBaMM experiment object for all experiments.""" +class ReadmeModel: + """A class for processing the README.yaml file.""" - class Config: - """Pydantic configuration settings.""" + def __init__(self, readme_dict: Dict[str, Any]) -> None: + """Initialize the ReadmeModel class.""" + self.readme_dict = readme_dict + experiment_names = self.readme_dict.keys() - arbitrary_types_allowed = True - - @model_validator(mode="before") - @classmethod - def check_readme_dict(cls, data: Any) -> "ReadmeModel": - """Validate the structure of the README.yaml file. - - Args: - data (Any): The data to be validated. - - Returns: - ReadmeModel: The validated data. - - Raises: - TypeError: If the 'Steps' field is not a dictionary or a list. - TypeError: - If the 'Steps' field is a dictionary and the keys are not integers - or the values are not strings. - TypeError: If the 'Steps' field is a list and the values are not strings. - TypeError: If the 'cycle' fields are not dictionaries. - TypeError: - If the 'cycle' fields are dictionaries and the keys are not strings - or the values are not integers. - """ - readme_dict = data["readme_dict"] - data["readme_type"] = [] - cls.number_of_experiments = len(readme_dict) - for experiment in readme_dict: - if "Steps" in readme_dict[experiment]: - steps = readme_dict[experiment]["Steps"] - if isinstance(steps, dict): - if not all( - isinstance(k, int) and isinstance(v, str) - for k, v in steps.items() - ): - raise TypeError( - "The 'Steps' field must be a dictionary with keys of type" - " int and values of type str" - ) - cycle_keys = [ - key - for key in readme_dict[experiment].keys() - if "cycle" in key.lower() - ] - for cycle in cycle_keys: - cycle_dict = readme_dict[experiment][cycle] - if not all( - isinstance(k, str) and isinstance(v, int) - for k, v in cycle_dict.items() - ): - raise TypeError( - f"{cycle} must be a dictionary with keys of type str" - " and values of type int" - ) - data["readme_type"].append("explicit") - elif isinstance(steps, list): - if not all(isinstance(step, str) for step in steps): - raise TypeError("The 'Steps' field must be a list of strings") - data["readme_type"].append("implicit") - elif "Total Steps" in readme_dict[experiment]: - data["readme_type"].append("total") - return data - - def model_post_init(self, __context: Any) -> None: - """Get all the attributes of the class.""" - self.titles = list(self.readme_dict.keys()) - self.step_numbers = self._get_step_numbers() - self.step_indices = self._get_step_indices() - self.step_descriptions = self._get_step_descriptions() - self.cycle_details = self._get_cycle_details() - self.pybamm_experiment_descriptions = self._get_pybamm_experiment_descriptions() - self.pybamm_experiment_list = self._get_pybamm_experiment_list() - self.pybamm_experiment = self._get_pybamm_experiment() - - def _get_step_numbers(self) -> List[List[int]]: - """Get the step numbers from the README.yaml file. - - Returns: - List[List[int]]: - A list of lists containing the step numbers for each - experiment. - """ - max_step = 0 - all_steps = [] - for experiment, readme_format in zip(self.readme_dict, self.readme_type): - if readme_format == "explicit": - exp_steps = list(self.readme_dict[experiment]["Steps"].keys()) - elif readme_format == "total": - exp_steps = list(range(self.readme_dict[experiment]["Total Steps"])) - exp_steps = [x + max_step + 1 for x in exp_steps] + self.experiment_dict: Dict[ + str, Dict[str, List[str | int | Tuple[int, int, int]]] + ] = {name: {} for name in experiment_names} + self.step_details = None + for experiment_name in experiment_names: + if "Steps" in self.readme_dict[experiment_name].keys(): + if isinstance(self.readme_dict[experiment_name]["Steps"], dict): + self._process_explicit_experiment(experiment_name) + elif isinstance(self.readme_dict[experiment_name]["Steps"], list): + self._process_implicit_experiment(experiment_name) + else: + raise ValueError("Invalid format for steps in README file") + elif "Total Steps" in self.readme_dict[experiment_name].keys(): + self._process_total_steps_experiment(experiment_name) else: - exp_steps = list(range(len(self.readme_dict[experiment]["Steps"]))) - exp_steps = [x + max_step + 1 for x in exp_steps] - max_step = exp_steps[-1] - all_steps.append(exp_steps) - return all_steps - - def _get_step_indices(self) -> List[List[int]]: - """Get the step indices from the README.yaml file. - - Returns: - List[List[int]]: - A list of lists containing the step indices for each - experiment. - """ - step_indices = [] - for exp_step_numbers in self.step_numbers: - step_indices.append(list(range(len(exp_step_numbers)))) - return step_indices - - def _get_step_descriptions(self) -> List[List[str]]: - """Get the step descriptions from the README.yaml file. - - Returns: - List[List[str]]: - A list of lists containing the step descriptions for each - experiment. - """ - all_descriptions = [] - for experiment, readme_format in zip(self.readme_dict, self.readme_type): - if readme_format == "explicit": - exp_step_descriptions = list( - self.readme_dict[experiment]["Steps"].values() + raise ValueError( + "Each experiment must have a 'Steps' or 'Total Steps' key." ) - elif readme_format == "implicit": - exp_step_descriptions = self.readme_dict[experiment]["Steps"] - else: - exp_step_descriptions = [] - all_descriptions.append(exp_step_descriptions) - return all_descriptions - - def _get_cycle_details(self) -> List[List[Tuple[int, int, int]]]: - """Get the cycle details from the README.yaml file. - - Returns: - List[List[Tuple[int, int, int]]]: - A list of lists containing the cycle details for each - experiment. - """ - cycles = [] - for experiment, readme_format, step_numbers in zip( - self.readme_dict, self.readme_type, self.step_numbers - ): - exp_cycles = [] - if readme_format == "explicit": - cycle_keys = [ - key - for key in self.readme_dict[experiment].keys() - if "cycle" in key.lower() - ] - for cycle in cycle_keys: - cycle_dict = self.readme_dict[experiment][cycle] - start = cycle_dict["Start"] - end = cycle_dict["End"] - count = cycle_dict["Count"] - exp_cycles.append( - (step_numbers.index(start), step_numbers.index(end), count) - ) - cycles.append(exp_cycles) - return cycles - def _get_pybamm_experiment_descriptions(self) -> List[Tuple[str, ...]]: - """Get the PyBaMM experiment objects from the README.yaml file. + def _process_explicit_experiment(self, experiment_name: str) -> None: + """Process an experiment with explicit step numbers. - Returns: - List[Tuple[str, ...]]: - A list of tuples containing the PyBaMM experiment descriptions for each - experiment. + Args: + experiment_name (str): The name of the experiment. """ - all_descriptions = [] - for step_descriptions, step_indices, step_numbers, cycle_details in zip( - self.step_descriptions, - self.step_indices, - self.step_numbers, - self.cycle_details, - ): - final_descriptions = [] - if len(step_descriptions) > 0: - expanded_indices = self._expand_cycles(step_indices, cycle_details) - expanded_descriptions = [step_descriptions[i] for i in expanded_indices] - # split any descriptions seperated by commas - for desciption in expanded_descriptions: - line = desciption.split(",") - for item in line: - final_descriptions.append(item.strip()) - all_descriptions.append(tuple(final_descriptions)) - return all_descriptions - - def _get_pybamm_experiment_list(self) -> List[pybamm.Experiment]: - """Get the PyBaMM experiment objects from the README.yaml file. + step_numbers = list(self.readme_dict[experiment_name]["Steps"].keys()) + step_descriptions = list(self.readme_dict[experiment_name]["Steps"].values()) + cycle_keys = [ + key for key in self.readme_dict[experiment_name] if "cycle" in key.lower() + ] + exp_cycles: List[str | int | Tuple[int, int, int]] = [] + for cycle in cycle_keys: + start = self.readme_dict[experiment_name][cycle]["Start"] + end = self.readme_dict[experiment_name][cycle]["End"] + count = self.readme_dict[experiment_name][cycle]["Count"] + exp_cycles.append((start, end, count)) + self.experiment_dict[experiment_name]["Steps"] = step_numbers + self.experiment_dict[experiment_name]["Step Descriptions"] = step_descriptions + self.experiment_dict[experiment_name]["Cycles"] = exp_cycles + + def _process_implicit_experiment(self, experiment_name: str) -> None: + """Process an experiment with implicit step numbers. - Returns: - List[pybamm.Experiment]: - A list of PyBaMM experiment objects for each experiment. + Args: + experiment_name (str): The name of the experiment. """ - pybamm_experiments = [] - for experiment, descriptions in zip( - self.readme_dict, self.pybamm_experiment_descriptions - ): - if len(descriptions) > 0: - try: - pybamm_experiments.append(pybamm.Experiment(descriptions)) - except Exception as e: - warnings.warn( - f"PyBaMM experiment could not be created for experiment:" - f" {experiment}. {e}" - ) - pybamm_experiments.append(None) - else: - pybamm_experiments.append(None) - warnings.warn( - f"PyBaMM experiment could not be created for experiment:" - f" {experiment} as there are no step descriptions." - ) - return pybamm_experiments - - def _get_pybamm_experiment(self) -> Optional[pybamm.Experiment]: - """Get the PyBaMM experiment object from the README.yaml file. + max_step = self._get_max_step() + step_descriptions = self.readme_dict[experiment_name]["Steps"] + step_numbers = list(range(max_step + 1, max_step + len(step_descriptions) + 1)) - Returns: - Optional[pybamm.Experiment]: - A PyBaMM experiment object for all experiments. - """ - if any(exp is None for exp in self.pybamm_experiment_list): - warnings.warn( - "Some experiments do not have valid step descriptions." - " Unable to create PyBaMM experiment." - ) - return None - else: - all_descriptions = [exp for exp in self.pybamm_experiment_descriptions] - return pybamm.Experiment(all_descriptions) + self.experiment_dict[experiment_name]["Steps"] = cast( + List[Union[str, int, Tuple[int, int, int]]], step_numbers + ) # cast to satisfy mypy + self.experiment_dict[experiment_name]["Step Descriptions"] = step_descriptions + self.experiment_dict[experiment_name]["Cycles"] = [] - @staticmethod - def _expand_cycles( - indices: List[int], cycles: List[Tuple[int, int, int]] - ) -> List[int]: - """Expand a list of cycle details into a list of step indices. + def _process_total_steps_experiment(self, experiment_name: str) -> None: + """Process an experiment with total steps. Args: - indices (List[int]): A list of step indices. - cycles (List[Tuple[int, int, int]]): A list of cycle details. + experiment_name (str): The name of the experiment. + """ + total_steps = self.readme_dict[experiment_name]["Total Steps"] + max_step = self._get_max_step() + step_numbers = list(range(max_step + 1, max_step + total_steps + 1)) + self.experiment_dict[experiment_name]["Steps"] = cast( + List[Union[str, int, Tuple[int, int, int]]], step_numbers + ) # cast to satisfy mypy + self.experiment_dict[experiment_name]["Step Descriptions"] = [] + self.experiment_dict[experiment_name]["Cycles"] = [] + + def _get_max_step(self) -> int: + """Get the maximum step number from the experiment dictionary. Returns: - List[int]: - A list of step indices expanded with every step in the order of occurrence. + int: The maximum step number from previously processed experiments. """ - if len(cycles) == 0: - return indices - repeated_list = indices - for cycle in cycles: - # cycle = (start, end, repeats) - start = cycle[0] - end = cycle[1] + 1 - repeats = cycle[2] - - # get sublist - sublist = indices[start:end] - - # repeat sublist - repeated_sublist = sublist * repeats - - # insert repeated sublist into repeated_list - result: List[int] = [] - i = 0 - while i < len(repeated_list): - if repeated_list[i : i + len(sublist)] == sublist: - result.extend(repeated_sublist) - i += len(sublist) - else: - result.append(repeated_list[i]) - i += 1 - repeated_list = result - return result + all_steps = [ + experiment["Steps"] + for experiment in self.experiment_dict.values() + if "Steps" in experiment + ] + return max(utils.flatten_list(all_steps)) if all_steps else 0 def process_readme( diff --git a/pyprobe/result.py b/pyprobe/result.py index 009942c8..a1776e1a 100644 --- a/pyprobe/result.py +++ b/pyprobe/result.py @@ -1,12 +1,12 @@ """A module for the Result class.""" import warnings from pprint import pprint -from typing import Dict, List, Optional, Tuple, Union +from typing import Any, Dict, List, Optional, Tuple, Union import numpy as np import polars as pl from numpy.typing import NDArray -from pydantic import BaseModel, Field +from pydantic import BaseModel, Field, model_validator from pyprobe.units import unit_from_regexp @@ -40,6 +40,14 @@ class Config: column_definitions: Dict[str, str] = Field(default_factory=dict) """A dictionary containing the definitions of the columns in the data.""" + @model_validator(mode="before") + @classmethod + def _load_base_dataframe(cls, data: Any) -> Any: + """Load the base dataframe from a file if provided as a string.""" + if "base_dataframe" in data and isinstance(data["base_dataframe"], str): + data["base_dataframe"] = pl.scan_parquet(data["base_dataframe"]) + return data + def __call__(self, column_name: str) -> NDArray[np.float64]: """Return columns of the data as numpy arrays. diff --git a/pyprobe/typing.py b/pyprobe/typing.py index 09644be0..26b0b913 100644 --- a/pyprobe/typing.py +++ b/pyprobe/typing.py @@ -15,3 +15,5 @@ """Type alias for raw data in PyProbe.""" PyProBEDataType = Union[PyProBERawDataType, Result] """Type alias for data in PyProbe.""" +ExperimentOrCycleType = Union[Experiment, Cycle] +"""Type alias for an experiment or cycle.""" diff --git a/pyprobe/utils.py b/pyprobe/utils.py new file mode 100644 index 00000000..114777b1 --- /dev/null +++ b/pyprobe/utils.py @@ -0,0 +1,17 @@ +"""A collection of utility functions for PyProBE.""" +from typing import Any, List + + +def flatten_list(lst: int | List[Any]) -> List[int]: + """Flatten a list of lists into a single list. + + Args: + lst (list): The list of lists to flatten. + + Returns: + list: The flattened list. + """ + if not isinstance(lst, list): + return [lst] + else: + return [item for sublist in lst for item in flatten_list(sublist)] diff --git a/pyproject.toml b/pyproject.toml index 9c4762d7..d49a27e9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -44,10 +44,11 @@ dev = [ "pytest-mypy", # type checking "pytest-mock", # mock objects "pytest-benchmark", # benchmarking + "types-PyYAML", + "types-toml", # documentation "sphinx", "sphinx-tabs", - "types-PyYAML", "sphinxcontrib-bibtex", # for references "sphinx-design", "pydata-sphinx-theme", # for theme diff --git a/requirements-dev.txt b/requirements-dev.txt index 9bb2c2c8..8f883ea4 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -432,6 +432,8 @@ traitlets==5.14.3 # nbsphinx types-pyyaml==6.0.12.20240808 # via PyProBE (pyproject.toml) +types-toml==0.10.8.20240310 + # via PyProBE (pyproject.toml) typing-extensions==4.12.2 # via # altair diff --git a/tests/analysis/test_cycling.py b/tests/analysis/test_cycling.py index dacf929c..75b35085 100644 --- a/tests/analysis/test_cycling.py +++ b/tests/analysis/test_cycling.py @@ -10,9 +10,14 @@ @pytest.fixture -def Cycling_fixture(lazyframe_fixture, info_fixture): +def Cycling_fixture(lazyframe_fixture, info_fixture, step_descriptions_fixture): """Return a Cycling instance.""" - input_data = Experiment(base_dataframe=lazyframe_fixture, info=info_fixture) + input_data = Experiment( + base_dataframe=lazyframe_fixture, + info=info_fixture, + step_descriptions=step_descriptions_fixture, + cycle_info=[], + ) return Cycling(input_data=input_data) diff --git a/tests/analysis/test_dma.py b/tests/analysis/test_dma.py index 59ba00a5..c36a930b 100644 --- a/tests/analysis/test_dma.py +++ b/tests/analysis/test_dma.py @@ -305,13 +305,6 @@ def test_calculate_dma_parameters( info={}, ) - with pytest.raises(ValidationError): - eol_result_fixture.quantify_degradation_modes(bol_result_fixture.fitted_OCV) - - eol_result_fixture.stoichiometry_limits = result - with pytest.raises(ValidationError): - eol_result_fixture.quantify_degradation_modes(bol_result_fixture) - def test_average_ocvs(BreakinCycles_fixture): """Test the average_ocvs method.""" diff --git a/tests/analysis/test_pulsing.py b/tests/analysis/test_pulsing.py index a0842a46..0f0a5f9c 100644 --- a/tests/analysis/test_pulsing.py +++ b/tests/analysis/test_pulsing.py @@ -19,8 +19,12 @@ def Pulsing_fixture(procedure_fixture): def test_pulse(Pulsing_fixture): """Test the pulse method.""" pulse = Pulsing_fixture.pulse(0) + assert pulse.data["Time [s]"][0] == 483572.397 + assert (pulse.data["Step"] == 10).all() + + pulse = Pulsing_fixture.pulse(6) + assert pulse.data["Time [s]"][0] == 531149.401 assert (pulse.data["Step"] == 10).all() - assert (pulse.data["Cycle"] == 4).all() def test_pulse_summary(Pulsing_fixture): diff --git a/tests/conftest.py b/tests/conftest.py index f4c70003..d1e4e9b0 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -59,6 +59,27 @@ def step_names_fixture(): ] +@pytest.fixture +def step_descriptions_fixture(): + """Pytest fixture for example step descriptions.""" + return { + "Step": [1, 2, 3, 4, 5, 6, 7, 9, 10, 11, 12], + "Description": [ + "Rest for 4 hours", + "Charge at 4mA until 4.2 V, Hold at 4.2 V until 0.04 A", + "Rest for 2 hours", + "Discharge at 4 mA until 3 V", + "Rest for 2 hours", + "Charge at 4 mA until 4.2 V, Hold at 4.2 V until 0.04 A", + "Rest for 2 hours", + "Rest for 10 seconds", + "Discharge at 20 mA for 0.2 hours or until 3 V", + "Rest for 30 minutes", + "Rest for 1.5 hours", + ], + } + + @pytest.fixture def procedure_fixture(info_fixture): """Pytest fixture for example procedure.""" diff --git a/tests/cyclers/test_arbin.py b/tests/cyclers/test_arbin.py index 3e2d5c29..4f90ad96 100644 --- a/tests/cyclers/test_arbin.py +++ b/tests/cyclers/test_arbin.py @@ -3,7 +3,7 @@ from pyprobe.cyclers.arbin import Arbin -def test_read_and_process(benchmark): +def test_read_and_process_arbin(benchmark): """Test the full process of reading and processing a file.""" arbin_cycler = Arbin( input_data_path="tests/sample_data/arbin/sample_data_arbin.csv" @@ -17,7 +17,6 @@ def read_and_process(): "Date", "Time [s]", "Step", - "Cycle", "Event", "Current [A]", "Voltage [V]", diff --git a/tests/cyclers/test_basecycler.py b/tests/cyclers/test_basecycler.py index 57537d79..06578f52 100644 --- a/tests/cyclers/test_basecycler.py +++ b/tests/cyclers/test_basecycler.py @@ -41,7 +41,6 @@ def sample_pyprobe_dataframe(): "2022-02-02 02:02:02.000000", ], "Time [s]": [1.0, 2.0, 3.0], - "Cycle": [0, 0, 0], "Step": [1, 2, 3], "Event": [0, 1, 2], "Current [A]": [7.0e-3, 8.0e-3, 9.0e-3], diff --git a/tests/cyclers/test_basytec.py b/tests/cyclers/test_basytec.py index 761f808e..b4335aa7 100644 --- a/tests/cyclers/test_basytec.py +++ b/tests/cyclers/test_basytec.py @@ -28,7 +28,6 @@ def test_read_and_process_basytec(): "Date", "Time [s]", "Step", - "Cycle", "Event", "Current [A]", "Voltage [V]", @@ -40,7 +39,6 @@ def test_read_and_process_basytec(): { "Date": datetime(2023, 6, 19, 17, 58, 3, 235803), "Time [s]": [70.235804], - "Cycle": [0], "Step": [4], "Event": [1], "Current [A]": [0.449602], diff --git a/tests/cyclers/test_biologic.py b/tests/cyclers/test_biologic.py index 9e94aa27..64e3309a 100644 --- a/tests/cyclers/test_biologic.py +++ b/tests/cyclers/test_biologic.py @@ -72,7 +72,6 @@ def read_and_process(): "Date", "Time [s]", "Step", - "Cycle", "Event", "Current [A]", "Voltage [V]", @@ -100,7 +99,6 @@ def read_and_process(): "Date", "Time [s]", "Step", - "Cycle", "Event", "Current [A]", "Voltage [V]", diff --git a/tests/cyclers/test_maccor.py b/tests/cyclers/test_maccor.py index dfbd7588..f74ad976 100644 --- a/tests/cyclers/test_maccor.py +++ b/tests/cyclers/test_maccor.py @@ -18,7 +18,6 @@ def test_read_and_process_maccor(): "Date", "Time [s]", "Step", - "Cycle", "Event", "Current [A]", "Voltage [V]", @@ -30,7 +29,6 @@ def test_read_and_process_maccor(): { "Date": datetime(2023, 11, 23, 15, 56, 24, 60000), "Time [s]": [13.06], - "Cycle": [0], "Step": [2], "Event": [1], "Current [A]": [28.798], diff --git a/tests/cyclers/test_neware.py b/tests/cyclers/test_neware.py index d64fd05e..306790b3 100644 --- a/tests/cyclers/test_neware.py +++ b/tests/cyclers/test_neware.py @@ -181,7 +181,6 @@ def read_and_process(): "Date", "Time [s]", "Step", - "Cycle", "Event", "Current [A]", "Voltage [V]", diff --git a/tests/sample_data/neware/README_implicit.yaml b/tests/sample_data/neware/README_implicit.yaml index b6a4da1d..c7661004 100644 --- a/tests/sample_data/neware/README_implicit.yaml +++ b/tests/sample_data/neware/README_implicit.yaml @@ -1,17 +1,17 @@ Initial Charge: Steps: - "Rest for 4 hours" - - "Charge at constant current of 4mA and constant voltage of 4.2V until current drops to 0.04A" + - "Charge at 4mA until 4.2 V, Hold at 4.2 V until 0.04 A" - "Rest for 2 hours" Break-in Cycles: Steps: - - "Discharge at constant current of 4mA until voltage reaches 3V" + - "Discharge at 4 mA until 3 V" - "Rest for 2 hours" - - "Charge at constant current of 4mA and constant voltage of 4.2V until current drops to 0.04A" + - "Charge at 4 mA until 4.2 V, Hold at 4.2 V until 0.04 A" - "Rest for 2 hours" Discharge Pulses: Steps: - "Rest for 10 seconds" - - "Discharge at constant current of 20mA until 4mAh has passed or voltage reaches 3V" + - "Discharge at 20 mA for 0.2 hours or until 3 V" - "Rest for 30 minutes" - "Rest for 1.5 hours" \ No newline at end of file diff --git a/tests/sample_data/neware/sample_data_neware.parquet b/tests/sample_data/neware/sample_data_neware.parquet index 99517505..bacb2375 100644 Binary files a/tests/sample_data/neware/sample_data_neware.parquet and b/tests/sample_data/neware/sample_data_neware.parquet differ diff --git a/tests/sample_data/neware/sample_data_neware_ref.parquet b/tests/sample_data/neware/sample_data_neware_ref.parquet index dd07d8d1..bacb2375 100644 Binary files a/tests/sample_data/neware/sample_data_neware_ref.parquet and b/tests/sample_data/neware/sample_data_neware_ref.parquet differ diff --git a/tests/test_cell.py b/tests/test_cell.py index a39bc245..d70350bb 100644 --- a/tests/test_cell.py +++ b/tests/test_cell.py @@ -1,9 +1,14 @@ """Tests for the Cell class.""" import copy +import json import os +import shutil import polars as pl +import pybamm import pytest +import toml +from numpy.testing import assert_array_equal from polars.testing import assert_frame_equal import pyprobe @@ -110,7 +115,6 @@ def test_process_generic_file(cell_instance): expected_df = pl.DataFrame( { "Time [s]": [1.0, 2.0, 3.0], - "Cycle": [0, 0, 0], "Step": [1, 2, 3], "Event": [0, 1, 2], "Current [A]": [7.0, 8.0, 9.0], @@ -142,3 +146,225 @@ def add_procedure(): assert_frame_equal( cell_instance.procedure["Test_custom"].data, procedure_fixture.data ) + + +def test_import_pybamm_solution(benchmark): + """Test the import_pybamm_solution method.""" + parameter_values = pybamm.ParameterValues("Chen2020") + spm = pybamm.lithium_ion.SPM() + experiment = pybamm.Experiment( + [ + ( + "Discharge at C/10 for 10 hours or until 3.3 V", + "Rest for 1 hour", + "Charge at 1 A until 4.1 V", + "Hold at 4.1 V until 50 mA", + "Rest for 1 hour", + ) + ] + * 3 + + [ + "Discharge at 1C until 3.3 V", + ] + ) + sim = pybamm.Simulation( + spm, experiment=experiment, parameter_values=parameter_values + ) + sol = sim.solve() + cell_instance = Cell(info={}) + cell_instance.import_pybamm_solution( + procedure_name="PyBaMM", + pybamm_solutions=sol, + experiment_names="Test", + ) + assert_array_equal( + cell_instance.procedure["PyBaMM"].experiment("Test").get("Voltage [V]"), + sol["Terminal voltage [V]"].entries, + ) + assert_array_equal( + cell_instance.procedure["PyBaMM"].experiment("Test").get("Current [A]"), + sol["Current [A]"].entries * -1, + ) + assert_array_equal( + cell_instance.procedure["PyBaMM"].experiment("Test").get("Time [s]"), + sol["Time [s]"].entries, + ) + assert_array_equal( + cell_instance.procedure["PyBaMM"].experiment("Test").get("Capacity [Ah]"), + sol["Discharge capacity [A.h]"].entries * -1, + ) + + # test filtering by cycle and step + assert_array_equal( + cell_instance.procedure["PyBaMM"] + .experiment("Test") + .cycle(1) + .get("Voltage [V]"), + sol.cycles[1]["Terminal voltage [V]"].entries, + ) + assert_array_equal( + cell_instance.procedure["PyBaMM"] + .experiment("Test") + .cycle(1) + .step(3) + .get("Current [A]"), + sol.cycles[1].steps[3]["Current [A]"].entries * -1, + ) + + assert cell_instance.procedure["PyBaMM"].readme_dict["Test"]["Steps"] == [ + 0, + 1, + 2, + 3, + 4, + ] + + # test with multiple experiments from different simulations + experiment2 = pybamm.Experiment( + [ + ( + "Discharge at 1C for 10 hours or until 3.3 V", + "Rest for 1 hour", + "Charge at 1 A until 4.1 V", + "Hold at 4.1 V until 50 mA", + "Rest for 1 hour", + ) + ] + * 5 + ) + sim2 = pybamm.Simulation( + spm, experiment=experiment2, parameter_values=parameter_values + ) + + sol2 = sim2.solve(starting_solution=sol) + + def add_two_experiments(): + return cell_instance.import_pybamm_solution( + procedure_name="PyBaMM two experiments", + pybamm_solutions=[sol, sol2], + experiment_names=["Test1", "Test2"], + ) + + benchmark(add_two_experiments) + assert set( + cell_instance.procedure["PyBaMM two experiments"].experiment_names + ) == set(["Test1", "Test2"]) + assert_array_equal( + cell_instance.procedure["PyBaMM two experiments"].get("Voltage [V]"), + sol2["Terminal voltage [V]"].entries, + ) + assert_array_equal( + cell_instance.procedure["PyBaMM two experiments"] + .experiment("Test1") + .get("Voltage [V]"), + sol["Terminal voltage [V]"].entries, + ) + sol_length = len(sol["Terminal voltage [V]"].entries) + assert_array_equal( + cell_instance.procedure["PyBaMM two experiments"] + .experiment("Test2") + .get("Voltage [V]"), + sol2["Terminal voltage [V]"].entries[sol_length:], + ) + + # test reading and writing to parquet + cell_instance.import_pybamm_solution( + procedure_name="PyBaMM", + pybamm_solutions=sol, + experiment_names="Test", + output_data_path="tests/sample_data/pybamm.parquet", + ) + written_data = pl.read_parquet("tests/sample_data/pybamm.parquet") + assert_frame_equal( + cell_instance.procedure["PyBaMM"].data.drop( + ["Procedure Time [s]", "Procedure Capacity [Ah]"] + ), + written_data, + ) + os.remove("tests/sample_data/pybamm.parquet") + + +def test_archive(cell_instance): + """Test archiving and loading a cell.""" + input_path = "tests/sample_data/neware/" + file_name = "sample_data_neware.parquet" + title = "Test" + + cell_instance.add_procedure(title, input_path, file_name) + cell_instance.archive(input_path + "archive") + assert os.path.exists(input_path + "archive") + + cell_from_file = pyprobe.load_archive(input_path + "archive") + assert cell_instance.procedure.keys() == cell_from_file.procedure.keys() + assert cell_instance.info == cell_from_file.info + assert ( + cell_instance.procedure[title].readme_dict + == cell_from_file.procedure[title].readme_dict + ) + assert ( + cell_instance.procedure[title].column_definitions + == cell_from_file.procedure[title].column_definitions + ) + assert ( + cell_instance.procedure[title].step_descriptions + == cell_from_file.procedure[title].step_descriptions + ) + assert ( + cell_instance.procedure[title].cycle_info + == cell_from_file.procedure[title].cycle_info + ) + assert_frame_equal( + cell_instance.procedure[title].base_dataframe, + cell_from_file.procedure[title].base_dataframe, + ) + + # test loading an incorrect pyprobe version + with open(os.path.join(input_path, "archive", "metadata.json"), "r") as f: + metadata = json.load(f) + metadata["PyProBE Version"] = "0.0.0" + with open(os.path.join(input_path, "archive", "metadata.json"), "w") as f: + json.dump(metadata, f) + pyproject_path = os.path.join(os.path.dirname(__file__), "..", "pyproject.toml") + pyproject_data = toml.load(pyproject_path) + with pytest.warns( + UserWarning, + match=( + f"The PyProBE version used to archive the cell was " + f"{metadata['PyProBE Version']}, the current version is " + f"{pyproject_data['project']['version']}. There may be compatibility" + f" issues." + ), + ): + cell_from_file = pyprobe.load_archive(input_path + "archive") + + shutil.rmtree(input_path + "archive") + + # test with zip file + cell_instance.archive(input_path + "archive.zip") + assert os.path.exists(input_path + "archive.zip") + assert not os.path.exists(input_path + "archive") + cell_from_file = pyprobe.load_archive(input_path + "archive.zip") + assert cell_instance.procedure.keys() == cell_from_file.procedure.keys() + assert cell_instance.info == cell_from_file.info + assert ( + cell_instance.procedure[title].readme_dict + == cell_from_file.procedure[title].readme_dict + ) + assert ( + cell_instance.procedure[title].column_definitions + == cell_from_file.procedure[title].column_definitions + ) + assert ( + cell_instance.procedure[title].step_descriptions + == cell_from_file.procedure[title].step_descriptions + ) + assert ( + cell_instance.procedure[title].cycle_info + == cell_from_file.procedure[title].cycle_info + ) + assert_frame_equal( + cell_instance.procedure[title].base_dataframe, + cell_from_file.procedure[title].base_dataframe, + ) + + shutil.rmtree(input_path + "archive") diff --git a/tests/test_filter.py b/tests/test_filter.py index 5a560052..b365b7a3 100644 --- a/tests/test_filter.py +++ b/tests/test_filter.py @@ -1,7 +1,10 @@ """Tests for the filter module.""" import numpy as np +import polars as pl import pytest +import pyprobe.filters as filters + def test_step(BreakinCycles_fixture, benchmark): """Test the step method.""" @@ -161,3 +164,120 @@ def test_zeroed_columns(BreakinCycles_fixture): assert cycle_filtered_data.get_only("Cycle Capacity [Ah]")[0] == 0 assert step_filtered_data.get_only("Step Time [s]")[0] == 0 assert step_filtered_data.get_only("Step Capacity [Ah]")[0] == 0 + + +@pytest.fixture +def generic_experiment(): + """Return a generic filter.""" + steps = [ + 0, + 0, + 1, + 1, + 1, + 0, + 0, + 1, + 1, + 1, + 0, + 0, + 1, + 1, + 1, + 0, + 0, + 1, + 1, + 1, + 2, + 2, + 2, + 2, + 3, + 3, + 0, + 0, + 1, + 1, + 1, + 0, + 0, + 1, + 1, + 1, + 0, + 0, + 1, + 1, + 1, + 0, + 0, + 1, + 1, + 1, + 2, + 2, + 2, + 2, + 3, + 3, + ] + dataframe = pl.DataFrame( + { + "Time [s]": list(range(len(steps))), + "Step": steps, + "Event": list(range(len(steps))), + "Current [A]": steps, + "Voltage [V]": steps, + "Capacity [Ah]": steps, + } + ) + info = {} + step_descriptions = { + "Step": [0, 1, 2, 3], + "Description": ["Charge", "Discharge", "Charge", "Discharge"], + } + + cycle_info = [(0, 3, 2), (0, 1, 2)] + return filters.Experiment( + base_dataframe=dataframe, + info=info, + step_descriptions=step_descriptions, + cycle_info=cycle_info, + ) + + +def test_cycle_generic(generic_experiment): + """Test the cycle method.""" + assert generic_experiment.cycle_info == [(0, 3, 2), (0, 1, 2)] + assert filters._cycle(generic_experiment, 0).data[ + "Time [s]" + ].unique().to_list() == list(range(26)) + assert filters._cycle(generic_experiment, 1).data[ + "Time [s]" + ].unique().to_list() == list(range(26, 52)) + assert filters._cycle(generic_experiment, -1).data[ + "Time [s]" + ].unique().to_list() == list(range(26, 52)) + + next_cycle = filters._cycle(generic_experiment, 1) + assert next_cycle.cycle_info == [(0, 1, 2)] + assert filters._cycle(next_cycle, 0).data["Time [s]"].unique().to_list() == list( + range(26, 31) + ) + assert filters._cycle(next_cycle, 3).data["Time [s]"].unique().to_list() == list( + range(41, 46) + ) + assert filters._cycle(next_cycle, -1).data["Time [s]"].unique().to_list() == list( + range(46, 52) + ) + + # test when cycle numbers are inferred + generic_experiment.cycle_info = [] + assert filters._cycle(generic_experiment, 0).data[ + "Time [s]" + ].unique().to_list() == list(range(5)) + assert filters._cycle(generic_experiment, -1).data[ + "Time [s]" + ].unique().to_list() == list(range(41, 52)) diff --git a/tests/test_package.py b/tests/test_package.py new file mode 100644 index 00000000..9b15c5f6 --- /dev/null +++ b/tests/test_package.py @@ -0,0 +1,10 @@ +"""Test package-level functionality.""" + +import toml + +import pyprobe + + +def test_version(): + """Test version.""" + assert pyprobe.__version__ == toml.load("pyproject.toml")["project"]["version"] diff --git a/tests/test_procedure.py b/tests/test_procedure.py index cea3ad87..8d3fc395 100644 --- a/tests/test_procedure.py +++ b/tests/test_procedure.py @@ -5,33 +5,57 @@ import numpy as np import pandas as pd import polars as pl +import pytest +from pyprobe.cell import Cell -def test_experiment(procedure_fixture, cycles_fixture, steps_fixture, benchmark): + +def test_experiment(procedure_fixture, steps_fixture, benchmark): """Test creating an experiment.""" def make_experiment(): return procedure_fixture.experiment("Break-in Cycles") experiment = benchmark(make_experiment) - assert experiment.data["Cycle"].unique().to_list() == cycles_fixture[1] assert experiment.data["Step"].unique().to_list() == steps_fixture[1] + assert experiment.cycle_info == [(4, 7, 5)] experiment = procedure_fixture.experiment("Discharge Pulses") - assert experiment.data["Cycle"].unique().to_list() == cycles_fixture[2] assert experiment.data["Step"].unique().to_list() == steps_fixture[2] + assert experiment.cycle_info == [(9, 12, 10)] """Test filtering by multiple experiment names.""" - experiment = procedure_fixture.experiment("Break-in Cycles", "Discharge Pulses") - assert set(experiment.data["Cycle"].unique().to_list()) == set( - cycles_fixture[1] + cycles_fixture[2] - ) - assert set(experiment.data["Step"].unique().to_list()) == set( - steps_fixture[1] + steps_fixture[2] - ) + with pytest.warns(UserWarning): + experiment = procedure_fixture.experiment("Break-in Cycles", "Discharge Pulses") assert experiment.data["Experiment Time [s]"][0] == 0 assert experiment.data["Experiment Capacity [Ah]"][0] == 0 + assert experiment.cycle_info == [] + + +def test_remove_experiment(procedure_fixture): + """Test removing an experiment.""" + procedure_fixture.remove_experiment("Break-in Cycles") + assert "Break-in Cycles" not in procedure_fixture.experiment_names + assert procedure_fixture.data["Step"].unique().to_list() == [2, 3, 9, 10, 11, 12] + assert procedure_fixture.step_descriptions["Step"] == [1, 2, 3, 9, 10, 11, 12] + + +def test_init(procedure_fixture, step_descriptions_fixture): + """Test initialising a procedure.""" + assert procedure_fixture.step_descriptions == step_descriptions_fixture + + +def test_experiment_no_description(): + """Test creating a procedure with no step descriptions.""" + cell = Cell(info={}) + cell.add_procedure( + "sample", + "tests/sample_data/neware/", + "sample_data_neware.xlsx", + readme_name="README_total_steps.yaml", + ) + assert np.all(np.isnan(cell.procedure["sample"].step_descriptions["Description"])) def test_experiment_names(procedure_fixture, titles_fixture): @@ -39,13 +63,6 @@ def test_experiment_names(procedure_fixture, titles_fixture): assert procedure_fixture.experiment_names == titles_fixture -def test_flatten(procedure_fixture): - """Test flattening lists.""" - lst = [[1, 2, 3], [4, 5], 6] - flat_list = procedure_fixture._flatten(lst) - assert flat_list == [1, 2, 3, 4, 5, 6] - - def test_zero_columns(procedure_fixture): """Test methods to set the first value of columns to zero.""" assert procedure_fixture.data["Procedure Time [s]"][0] == 0 diff --git a/tests/test_rawdata.py b/tests/test_rawdata.py index fb957d43..4bfc28fa 100644 --- a/tests/test_rawdata.py +++ b/tests/test_rawdata.py @@ -4,22 +4,28 @@ import numpy as np import polars as pl +import pybamm import pytest from pyprobe.rawdata import RawData @pytest.fixture -def RawData_fixture(lazyframe_fixture, info_fixture): +def RawData_fixture(lazyframe_fixture, info_fixture, step_descriptions_fixture): """Return a Result instance.""" - return RawData(base_dataframe=lazyframe_fixture, info=info_fixture) + return RawData( + base_dataframe=lazyframe_fixture, + info=info_fixture, + step_descriptions=step_descriptions_fixture, + ) -def test_init(RawData_fixture): +def test_init(RawData_fixture, step_descriptions_fixture): """Test the __init__ method.""" assert isinstance(RawData_fixture, RawData) assert isinstance(RawData_fixture.base_dataframe, pl.LazyFrame) assert isinstance(RawData_fixture.info, dict) + assert RawData_fixture.step_descriptions == step_descriptions_fixture # test with incorrect data data = pl.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]}) @@ -27,6 +33,34 @@ def test_init(RawData_fixture): RawData(base_dataframe=data, info={"test": 1}) +def test_pybamm_experiment(RawData_fixture): + """Test the pybamm_experiment method.""" + assert isinstance(RawData_fixture.pybamm_experiment, pybamm.Experiment) + assert ( + RawData_fixture.pybamm_experiment.steps[-1].description == "Rest for 1.5 hours" + ) + + RawData_fixture.step_descriptions = { + "Step": [1, 2, 3, 4, 5, 6, 7, 9, 10, 11, 12], + "Description": [ + "Rest for 4 hours", + "Charge at 4mA until 4.2 V, Hold at 4.2 V until 0.04 A", + "Rest for 2 hours", + "Discharge at 4 mA until 3 V", + None, + "Charge at 4 mA until 4.2 V, Hold at 4.2 V until 0.04 A", + "Rest for 2 hours", + "Rest for 10 seconds", + None, + "Rest for 30 minutes", + "Rest for 1.5 hours", + ], + } + + with pytest.raises(ValueError): + RawData_fixture.pybamm_experiment + + def test_capacity(BreakinCycles_fixture): """Test the capacity property.""" capacity = BreakinCycles_fixture.cycle(0).charge(0).capacity @@ -99,9 +133,13 @@ def test_zero_column(RawData_fixture): ) -def test_definitions(lazyframe_fixture, info_fixture): +def test_definitions(lazyframe_fixture, info_fixture, step_descriptions_fixture): """Test that the definitions have been correctly set.""" - rawdata = RawData(base_dataframe=lazyframe_fixture, info=info_fixture) + rawdata = RawData( + base_dataframe=lazyframe_fixture, + info=info_fixture, + step_descriptions=step_descriptions_fixture, + ) definition_keys = list(rawdata.column_definitions.keys()) assert set(definition_keys) == set( [ diff --git a/tests/test_readme_processor.py b/tests/test_readme_processor.py index 3d1345e8..fa7b8925 100644 --- a/tests/test_readme_processor.py +++ b/tests/test_readme_processor.py @@ -1,84 +1,14 @@ """Tests for the readme_processor module.""" -import pybamm - -from pyprobe.readme_processor import ReadmeModel, process_readme +import pytest -def test_process_readme(titles_fixture, benchmark): - """Test processing a readme file in yaml format.""" - expected_steps = [ - [1, 2, 3], - [4, 5, 6, 7], - [8, 9, 10, 11], - ] - - def _process_readme(): - return process_readme("tests/sample_data/neware/README_implicit.yaml") - - readme = benchmark(_process_readme) - assert readme.titles == titles_fixture - assert readme.step_numbers == expected_steps - assert readme.pybamm_experiment is None - - # Test with total steps - readme = process_readme("tests/sample_data/neware/README_total_steps.yaml") - assert readme.titles == titles_fixture - assert readme.step_numbers == [ - [1, 2, 3], - [4, 5, 6, 7], - [8, 9, 10, 11], - ] - assert readme.pybamm_experiment is None - - # Test with defined step numbers - readme = process_readme("tests/sample_data/neware/README.yaml") - assert readme.titles == titles_fixture - assert readme.step_numbers == [ - [1, 2, 3], - [4, 5, 6, 7], - [9, 10, 11, 12], - ] - assert all( - isinstance(item, pybamm.Experiment) for item in readme.pybamm_experiment_list - ) - assert isinstance(readme.pybamm_experiment, pybamm.Experiment) - - -def test_expand_cycles(): - """Test the _expand_cycles method.""" - indices = [0, 1, 2, 3, 4, 5, 6, 7, 8] - cycles = [(0, 3, 2), (1, 2, 3), (7, 8, 2)] - expected_result = [ - 0, - 1, - 2, - 1, - 2, - 1, - 2, - 3, - 0, - 1, - 2, - 1, - 2, - 1, - 2, - 3, - 4, - 5, - 6, - 7, - 8, - 7, - 8, - ] - assert ReadmeModel._expand_cycles(indices, cycles) == expected_result +from pyprobe.readme_processor import ReadmeModel, process_readme -def test_readme_model(): - """Test the ReadmeModel class.""" - exp_dict = { +@pytest.fixture +def readme_dict_fixture(): + """Return a readme dictionary for testing.""" + return { "Experiment 1": { "Steps": { 1: "Rest for 1 hour", @@ -105,56 +35,120 @@ def test_readme_model(): "Total Steps": 8, }, } - model = ReadmeModel(readme_dict=exp_dict) - assert model.readme_type == ["explicit", "implicit", "total"] - assert model.step_numbers == [ - [1, 2, 3, 4, 5], - [6, 7, 8, 9], - [10, 11, 12, 13, 14, 15, 16, 17], + +def test_readme(readme_dict_fixture): + """Test the process_readme function.""" + readme = ReadmeModel(readme_dict=readme_dict_fixture) + assert list(readme.experiment_dict.keys()) == [ + "Experiment 1", + "Experiment 2", + "Experiment 3", + ] + assert readme.experiment_dict["Experiment 1"]["Steps"] == [1, 2, 3, 4, 5] + assert readme.experiment_dict["Experiment 1"]["Step Descriptions"] == [ + "Rest for 1 hour", + "Rest for 2 hours", + "Rest for 3 hours", + "Rest for 4 hours", + "Rest for 5 hours, Rest for 6 hours", ] - assert model.cycle_details == [[(0, 3, 2), (1, 2, 3)], [], []] - assert model.step_indices == [ - [0, 1, 2, 3, 4], - [0, 1, 2, 3], - [0, 1, 2, 3, 4, 5, 6, 7], + assert readme.experiment_dict["Experiment 1"]["Cycles"] == [ + (1, 4, 2), + (2, 3, 3), ] - assert model.step_descriptions == [ - [ - "Rest for 1 hour", - "Rest for 2 hours", - "Rest for 3 hours", - "Rest for 4 hours", - "Rest for 5 hours, Rest for 6 hours", - ], - ["Step 1", "Step 2", "Step 3", "Step 4"], - [], + + assert readme.experiment_dict["Experiment 2"]["Steps"] == [6, 7, 8, 9] + assert readme.experiment_dict["Experiment 2"]["Step Descriptions"] == [ + "Step 1", + "Step 2", + "Step 3", + "Step 4", ] - assert model.pybamm_experiment_descriptions == [ - ( - "Rest for 1 hour", - "Rest for 2 hours", - "Rest for 3 hours", - "Rest for 2 hours", - "Rest for 3 hours", - "Rest for 2 hours", - "Rest for 3 hours", - "Rest for 4 hours", - "Rest for 1 hour", - "Rest for 2 hours", - "Rest for 3 hours", - "Rest for 2 hours", - "Rest for 3 hours", - "Rest for 2 hours", - "Rest for 3 hours", - "Rest for 4 hours", - "Rest for 5 hours", - "Rest for 6 hours", - ), - ("Step 1", "Step 2", "Step 3", "Step 4"), - (), + assert readme.experiment_dict["Experiment 2"]["Cycles"] == [] + + assert readme.experiment_dict["Experiment 3"]["Steps"] == [ + 10, + 11, + 12, + 13, + 14, + 15, + 16, + 17, ] - assert isinstance(model.pybamm_experiment_list[0], pybamm.Experiment) - assert model.pybamm_experiment_list[1] is None - assert model.pybamm_experiment_list[2] is None - assert model.pybamm_experiment is None + assert readme.experiment_dict["Experiment 3"]["Step Descriptions"] == [] + assert readme.experiment_dict["Experiment 3"]["Cycles"] == [] + + +def test_process_readme_file_explicit(titles_fixture, benchmark): + """Test processing a readme file in yaml format.""" + + def _process_readme(): + return process_readme("tests/sample_data/neware/README.yaml") + + readme = benchmark(_process_readme) + + assert list(readme.experiment_dict.keys()) == titles_fixture + assert readme.experiment_dict["Break-in Cycles"]["Steps"] == [4, 5, 6, 7] + assert readme.experiment_dict["Break-in Cycles"]["Step Descriptions"] == [ + "Discharge at 4 mA until 3 V", + "Rest for 2 hours", + "Charge at 4 mA until 4.2 V, Hold at 4.2 V until 0.04 A", + "Rest for 2 hours", + ] + + assert readme.experiment_dict["Discharge Pulses"]["Steps"] == [9, 10, 11, 12] + assert readme.experiment_dict["Discharge Pulses"]["Step Descriptions"] == [ + "Rest for 10 seconds", + "Discharge at 20 mA for 0.2 hours or until 3 V", + "Rest for 30 minutes", + "Rest for 1.5 hours", + ] + assert readme.experiment_dict["Discharge Pulses"]["Cycles"] == [(9, 12, 10)] + + +def test_process_readme_file_implicit(titles_fixture, benchmark): + """Test processing a readme file in yaml format.""" + + def _process_readme(): + return process_readme("tests/sample_data/neware/README_implicit.yaml") + + readme = benchmark(_process_readme) + + assert list(readme.experiment_dict.keys()) == titles_fixture + assert readme.experiment_dict["Break-in Cycles"]["Steps"] == [4, 5, 6, 7] + assert readme.experiment_dict["Break-in Cycles"]["Step Descriptions"] == [ + "Discharge at 4 mA until 3 V", + "Rest for 2 hours", + "Charge at 4 mA until 4.2 V, Hold at 4.2 V until 0.04 A", + "Rest for 2 hours", + ] + assert readme.experiment_dict["Break-in Cycles"]["Cycles"] == [] + + assert readme.experiment_dict["Discharge Pulses"]["Steps"] == [8, 9, 10, 11] + assert readme.experiment_dict["Discharge Pulses"]["Step Descriptions"] == [ + "Rest for 10 seconds", + "Discharge at 20 mA for 0.2 hours or until 3 V", + "Rest for 30 minutes", + "Rest for 1.5 hours", + ] + assert readme.experiment_dict["Discharge Pulses"]["Cycles"] == [] + + +def process_readme_file_total_steps(titles_fixture, benchmark): + """Test processing a readme file in yaml format.""" + + def _process_readme(): + return process_readme("tests/sample_data/neware/README_total_steps.yaml") + + readme = benchmark(_process_readme) + + assert list(readme.experiment_dict.keys()) == titles_fixture + assert readme.experiment_dict["Break-in Cycles"]["Steps"] == [4, 5, 6, 7] + assert readme.experiment_dict["Break-in Cycles"]["Step Descriptions"] == [] + assert readme.experiment_dict["Break-in Cycles"]["Cycles"] == [] + + assert readme.experiment_dict["Discharge Pulses"]["Steps"] == [8, 9, 10, 11] + assert readme.experiment_dict["Discharge Pulses"]["Step Descriptions"] == [] + assert readme.experiment_dict["Discharge Pulses"]["Cycles"] == [] diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 00000000..ee70017c --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,9 @@ +"""Tests for the utils module.""" +from pyprobe import utils + + +def test_flatten(): + """Test flattening lists.""" + lst = [[1, 2, 3], [4, 5], 6] + flat_list = utils.flatten_list(lst) + assert flat_list == [1, 2, 3, 4, 5, 6]