diff --git a/package/samplers/mocma/LICENSE b/package/samplers/mocma/LICENSE
new file mode 100644
index 00000000..5b73f6ac
--- /dev/null
+++ b/package/samplers/mocma/LICENSE
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2024 Yoshihiko Ozaki
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
\ No newline at end of file
diff --git a/package/samplers/mocma/README.md b/package/samplers/mocma/README.md
new file mode 100644
index 00000000..cba61675
--- /dev/null
+++ b/package/samplers/mocma/README.md
@@ -0,0 +1,101 @@
+---
+author: Yoshihiko Ozaki
+title: Multi-objective CMA-ES (MO-CMA-ES) Sampler
+description: A sampler based on a strong variant of CMA-ES for multi-objective optimization (s-MO-CMA).
+tags: [sampler, Multi-Objective Optimization, Evolutionary Algorithm (EA), CMA-ES]
+optuna_versions: [4.0.0]
+license: MIT License
+---
+
+## Abstract
+
+MoCmaSampler provides an implementation of s-MO-CMA, a strong variant of MO-CMA-ES. The algorithm extends (1+1)-CMA-ES to multi-objective optimization by introducing a selection strategy based on non-domination sorting and the contributing hypervolume (S-metric). It inherits important properties of CMA-ES: invariance against order-preserving transformations of the fitness function values and against rotation and translation of the search space.
+
+## Class or Function Names
+
+- `MoCmaSampler(*, search_space: dict[str, BaseDistribution] | None = None, popsize: int | None = None, seed: int | None = None)`
+  - `search_space`: A dictionary containing the search space that defines the parameter space. The keys are the parameter names and the values are [the parameter's distribution](https://optuna.readthedocs.io/en/stable/reference/distributions.html). If the search space is not provided, the sampler will infer it dynamically.
+    Example:
+    ```python
+    search_space = {
+        "x": optuna.distributions.FloatDistribution(-5, 5),
+        "y": optuna.distributions.FloatDistribution(-5, 5),
+    }
+    MoCmaSampler(search_space=search_space)
+    ```
+  - `popsize`: Population size of the CMA-ES algorithm. If not provided, the population size is set based on the dimensionality of the search space. If you have a sufficient evaluation budget, increasing `popsize` is recommended.
+  - `seed`: Seed for the random number generator.
+
+Note that, due to limitations of the algorithm, only non-conditional numerical parameters are sampled by MO-CMA-ES; categorical parameters and conditional parameters are handled by random sampling.
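+
+As a rough illustration of this behavior, the sketch below mixes numerical and categorical parameters: `x` and `y` are optimized by MO-CMA-ES, while `method` falls back to independent random sampling. It assumes the script is run from the root of an optunahub-registry checkout so that `registry_root="package"` resolves. When `popsize` is omitted, the implementation falls back to the common CMA-ES heuristic `4 + floor(3 * ln(n))`, where `n` is the dimensionality of the numerical search space. A fuller comparison against NSGA-II is given in the Example section below.
+
+```python
+import optuna
+import optunahub
+
+
+MoCmaSampler = optunahub.load_local_module(
+    "samplers/mocma", registry_root="package"
+).MoCmaSampler
+
+
+def objective(trial: optuna.Trial) -> tuple[float, float]:
+    x = trial.suggest_float("x", -5, 5)
+    y = trial.suggest_float("y", -5, 5)
+    # Categorical parameters are not modeled by MO-CMA-ES; they are sampled randomly.
+    method = trial.suggest_categorical("method", ["a", "b"])
+    offset = 0.0 if method == "a" else 0.1
+    return x**2 + y**2 + offset, (x - 2) ** 2 + (y - 2) ** 2
+
+
+study = optuna.create_study(
+    directions=["minimize", "minimize"],
+    sampler=MoCmaSampler(seed=0),  # popsize defaults to 4 + floor(3 * ln(n))
+)
+study.optimize(objective, n_trials=50)
+```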
+
+## Example
+
+```python
+import optuna
+import optunahub
+
+
+def objective(trial: optuna.Trial) -> tuple[float, float]:
+    x = trial.suggest_float("x", 0, 5)
+    y = trial.suggest_float("y", 0, 3)
+    v0 = 4 * x**2 + 4 * y**2
+    v1 = (x - 5) ** 2 + (y - 5) ** 2
+    return v0, v1
+
+samplers = [
+    optunahub.load_local_module("samplers/mocma", registry_root="package").MoCmaSampler(popsize=100, seed=42),
+    optuna.samplers.NSGAIISampler(population_size=100, seed=42),
+]
+studies = []
+for sampler in samplers:
+    study = optuna.create_study(
+        directions=["minimize", "minimize"],
+        sampler=sampler,
+        study_name=f"{sampler.__class__.__name__}",
+    )
+    study.optimize(objective, n_trials=1000)
+    studies.append(study)
+
+optunahub.load_module("visualization/plot_pareto_front_multi").plot_pareto_front(
+    studies
+).show()
+optunahub.load_module("visualization/plot_hypervolume_history_multi").plot_hypervolume_history(
+    studies, reference_point=[200.0, 100.0]
+).show()
+```
+
+![Pareto front](images/pareto_front.png)
+![Hypervolume](images/hypervolume.png)
+
+## Others
+
+### Test
+
+To execute the tests for MoCmaSampler, please run the following commands. The test file is provided in the package.
+
+```sh
+pip install pytest
+```
+
+```sh
+pytest -s tests/test_sampler.py
+```
+
+### Reference
+
+Christian Igel, Nikolaus Hansen, Stefan Roth. Covariance Matrix Adaptation for Multi-objective Optimization. Evolutionary Computation (2007) 15 (1): 1–28. https://doi.org/10.1162/evco.2007.15.1.1.
+
+### BibTeX
+
+```bibtex
+@article{igel2007covariance,
+  title={Covariance matrix adaptation for multi-objective optimization},
+  author={Igel, Christian and Hansen, Nikolaus and Roth, Stefan},
+  journal={Evolutionary Computation},
+  volume={15},
+  number={1},
+  pages={1--28},
+  year={2007},
+  publisher={MIT Press}
+}
+```
diff --git a/package/samplers/mocma/__init__.py b/package/samplers/mocma/__init__.py
new file mode 100644
index 00000000..9ab35f51
--- /dev/null
+++ b/package/samplers/mocma/__init__.py
@@ -0,0 +1,4 @@
+from .mocma import MoCmaSampler
+
+
+__all__ = ["MoCmaSampler"]
diff --git a/package/samplers/mocma/example.py b/package/samplers/mocma/example.py
new file mode 100644
index 00000000..f3adac6f
--- /dev/null
+++ b/package/samplers/mocma/example.py
@@ -0,0 +1,36 @@
+import optuna
+import optunahub
+
+
+if __name__ == "__main__":
+
+    def objective(trial: optuna.Trial) -> tuple[float, float]:
+        x = trial.suggest_float("x", 0, 5)
+        y = trial.suggest_float("y", 0, 3)
+        v0 = 4 * x**2 + 4 * y**2
+        v1 = (x - 5) ** 2 + (y - 5) ** 2
+        return v0, v1
+
+    samplers = [
+        optunahub.load_local_module("samplers/mocma", registry_root="package").MoCmaSampler(
+            popsize=100,
+            seed=42,
+        ),
+        optuna.samplers.NSGAIISampler(population_size=100, seed=42),
+    ]
+    studies = []
+    for sampler in samplers:
+        study = optuna.create_study(
+            directions=["minimize", "minimize"],
+            sampler=sampler,
+            study_name=f"{sampler.__class__.__name__}",
+        )
+        study.optimize(objective, n_trials=1000)
+        studies.append(study)
+
+    optunahub.load_module("visualization/plot_pareto_front_multi").plot_pareto_front(
+        studies
+    ).show()
+    optunahub.load_module("visualization/plot_hypervolume_history_multi").plot_hypervolume_history(
+        studies, reference_point=[200.0, 100.0]
+    ).show()
diff --git a/package/samplers/mocma/images/hypervolume.png b/package/samplers/mocma/images/hypervolume.png
new file mode 100644
index 00000000..2a9659de
Binary files /dev/null and
b/package/samplers/mocma/images/hypervolume.png differ diff --git a/package/samplers/mocma/images/pareto_front.png b/package/samplers/mocma/images/pareto_front.png new file mode 100644 index 00000000..2aaf436a Binary files /dev/null and b/package/samplers/mocma/images/pareto_front.png differ diff --git a/package/samplers/mocma/mocma.py b/package/samplers/mocma/mocma.py new file mode 100644 index 00000000..aa9d34c5 --- /dev/null +++ b/package/samplers/mocma/mocma.py @@ -0,0 +1,397 @@ +from __future__ import annotations + +from collections.abc import Sequence +import math +from typing import Any + +import numpy as np +import optuna +from optuna._hypervolume import compute_hypervolume +from optuna._transform import _SearchSpaceTransform +from optuna.distributions import BaseDistribution +from optuna.distributions import FloatDistribution +from optuna.distributions import IntDistribution +from optuna.samplers import BaseSampler +from optuna.samplers._lazy_random_state import LazyRandomState +from optuna.search_space import IntersectionSearchSpace +from optuna.study._multi_objective import _dominates +from optuna.study._multi_objective import _fast_non_domination_rank +from optuna.study._study_direction import StudyDirection +from optuna.trial import TrialState + + +_EPS = 1e-8 + + +class MoCmaSampler(BaseSampler): + """A sampler based on the Multi-Objective Covariance Matrix Adaptation Evolution Strategy (MO-CMA-ES). + + This implementation provides a strong variant of the MO-CMA-ES algorithm called s-MO-CMA, + which employs a selection strategy based on the contributing hypervolume (aka S-metric) of each individual. + For detailed information about MO-CMA-ES algorithm, please refer to the following papers: + + - `Christian Igel, Nikolaus Hansen, Stefan Roth. Covariance Matrix Adaptation for Multi-objective Optimization. + Evolutionary Computation (2007) 15 (1): 1-28. `__ + + Args: + search_space: + A dictionary containing the search space that defines the parameter space. + The keys are the parameter names and the values are the parameter's distribution. + If the search space is not provided, the sampler will infer the search space dynamically. + popsize: + Population size of the CMA-ES algorithm. + If not provided, the population size will be set based on the search space dimensionality. + seed: + Seed for random number generator. + """ + + def __init__( + self, + *, + search_space: dict[str, BaseDistribution] | None = None, + popsize: int | None = None, + seed: int | None = None, + ) -> None: + self._rng = LazyRandomState(seed) + self._independent_sampler = optuna.samplers.RandomSampler(seed=seed) + self._seed = seed + self._popsize = popsize + self._search_space = search_space + self._intersection_search_space = IntersectionSearchSpace() + + def reseed_rng(self) -> None: + self._rng.rng.seed() + self._independent_sampler.reseed_rng() + + def infer_relative_search_space( + self, study: optuna.study, trial: optuna.trial.FrozenTrial + ) -> dict[str, BaseDistribution]: + # If search space information is available (define-and-run) + if self._search_space is not None: + return self._search_space + + # Calculate search space dynamically + search_space: dict[str, BaseDistribution] = {} + for name, distribution in self._intersection_search_space.calculate(study).items(): + if not isinstance(distribution, (FloatDistribution, IntDistribution)): + # Categorical parameters are handled by the _independend_sampler. 
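+                # (Anything that is not a FloatDistribution or an IntDistribution is
+                # excluded from the relative search space and is instead sampled via
+                # sample_independent(), which delegates to optuna.samplers.RandomSampler.)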
+ continue + search_space[name] = distribution + + return search_space + + def sample_relative( + self, + study: optuna.study, + trial: optuna.trial.FrozenTrial, + search_space: dict[str, BaseDistribution], + ) -> dict[str, Any]: + # If search space information is not avalable (i.e., first trial & define-by-run) + if len(search_space) == 0: + study._storage.set_trial_system_attr(trial._trial_id, "mocma:g", 0) + if self._popsize is None: + # If population size information is not available, we set instance number k to 0 + # because we cannot know how many instances exist per generation. + # This may cause inefficiency in parallelization. + study._storage.set_trial_system_attr(trial._trial_id, "mocma:k", 0) + else: + study._storage.set_trial_system_attr( + trial._trial_id, + "mocma:k", + int(self._rng.rng.choice(len(range(self._popsize)))), + ) + return {} + + trans = _SearchSpaceTransform(search_space, transform_0_1=True) + n = len(trans.bounds) # Search space dimensionality + # Set population size based on the search space demensionality if not given. + if self._popsize is None: + self._popsize = 4 + math.floor(3 * math.log(n)) + + # Compute generation g and instance k. + complete_trials = [ + t for t in study.get_trials(deepcopy=False, states=[TrialState.COMPLETE]) + ] + + # Classify trials by generation and instance number. + classified_trials: dict[int, dict] = {0: {}} + g = 0 # current generation + for t in complete_trials: + g_ = t.system_attrs["mocma:g"] + k_ = t.system_attrs["mocma:k"] + g = max(g, g_) + if g_ not in classified_trials: + classified_trials[g_] = {} + if k_ not in classified_trials[g_]: + classified_trials[g_][k_] = [] + classified_trials[g_][k_].append(t) + + generation_finished = True + ks = [] + for k in range(self._popsize): # k indicates the (1+1)-CMA-ES instance number. + if k not in classified_trials[g]: + generation_finished = False + ks.append(k) + if generation_finished: + g += 1 # Move to the next generation + ks = list(range(self._popsize)) + + # Randomly select an instance number for the current generation. + # This will enhance the performance when n_jobs > 1. + k = self._rng.rng.choice(ks) + + study._storage.set_trial_system_attr(trial._trial_id, "mocma:g", g) + study._storage.set_trial_system_attr(trial._trial_id, "mocma:k", int(k)) + + # CMA-ES parameters + sigma = 1 / 6 + d = 1 + math.floor(n / 2) # damping parameter + p_targetsucc = 1 / (5 + 1 / 2) + c_p = p_targetsucc / (2 + p_targetsucc) + c_c = 2 / (n + 2) + c_cov = 2 / (n**2 + 6) + p_thresh = 0.44 + + if g == 0: + # Generate initial parents randomly. + + return {} # Fall back to random sampling. + elif g == 1 and generation_finished: + # Set parameters for the first generation (g = 0). 
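+            # The elites of generation 0 are the earliest completed trial of each
+            # (1+1)-CMA-ES instance. Each elite gets an independent CMA state stored
+            # in the study system attributes, keyed by its trial id:
+            #   - step size sigma = 1/6 (the search space is rescaled to [0, 1]),
+            #   - covariance matrix = identity,
+            #   - smoothed success probability p_succ = p_targetsucc (~0.18),
+            #   - evolution path p_c = 0.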
+ elites = [ + list(sorted(instance, key=lambda x: x.datetime_complete))[0] + for instance in classified_trials[g - 1].values() + ] + for a in elites: + study._storage.set_study_system_attr( + study._study_id, f"mocma:trial:{a._trial_id}:sigma", sigma + ) + study._storage.set_study_system_attr( + study._study_id, + f"mocma:trial:{a._trial_id}:cov", + np.eye(len(a.params)).tolist(), + ) + study._storage.set_study_system_attr( + study._study_id, f"mocma:trial:{a._trial_id}:p_succ", p_targetsucc + ) + study._storage.set_study_system_attr( + study._study_id, + f"mocma:trial:{a._trial_id}:p_c", + np.zeros(len(a.params)).tolist(), + ) + elite_ids = [e._trial_id for e in elites] + study._storage.set_study_system_attr( + study._study_id, f"mocma:generation:{g-1}:elite_ids", elite_ids + ) + elif g >= 2 and generation_finished: + # This section conducts the parameter updates for g-1 with individuals for g-1 and g-2 + # before generating individuals for generation g. + study_system_attrs = study._storage.get_study_system_attrs(study._study_id) + + parents = [ + [t for t in complete_trials if t._trial_id == eid][0] + for eid in study_system_attrs[f"mocma:generation:{g-2}:elite_ids"] + ] + # Handling conditional parameters for parents + # (Discard cma parmeter values for paramaters not in the intersection search space) + for a in parents: + indices = [i for i, n in enumerate(a.params) if n in search_space] + a.params = {n: a.params[n] for n in search_space} + p_c_a = np.asarray(study_system_attrs[f"mocma:trial:{a._trial_id}:p_c"]) + p_c_a = p_c_a[indices] + study._storage.set_study_system_attr( + study._study_id, f"mocma:trial:{a._trial_id}:p_c", p_c_a.tolist() + ) + + offsprings = [ + list(sorted(instance, key=lambda x: x.datetime_complete))[0] + for instance in classified_trials[g - 1].values() + ] + # Handling conditional parameters for offsprings + for a_ in offsprings: + a_.params = {n: a_.params[n] for n in search_space} + + # Reload study_system_attrs + study_system_attrs = study._storage.get_study_system_attrs(study._study_id) + + for a_ in offsprings: + # Find parent a for a_ + a = [p for p in parents if p._trial_id == a_.system_attrs["mocma:parent_id"]][0] + lambda_succ = int(_dominates(a_, a, study.directions)) + + # Update parent step size + p_succ_a = study_system_attrs[f"mocma:trial:{a._trial_id}:p_succ"] + p_succ_a = (1 - c_p) * p_succ_a + c_p * lambda_succ + sigma_a = study_system_attrs[f"mocma:trial:{a._trial_id}:sigma"] + sigma_a = sigma_a * math.exp( + (1 / d) * ((p_succ_a - p_targetsucc) / (1 - p_targetsucc)) + ) + sigma_a = max(sigma_a, _EPS) + study._storage.set_study_system_attr( + study._study_id, f"mocma:trial:{a._trial_id}:p_succ", p_succ_a + ) + study._storage.set_study_system_attr( + study._study_id, f"mocma:trial:{a._trial_id}:sigma", sigma_a + ) + + # Update offspring step size and covariance matrix + p_succ_a_ = np.asarray(study_system_attrs[f"mocma:trial:{a_._trial_id}:p_succ"]) + p_succ_a_ = (1 - c_p) * p_succ_a_ + c_p * lambda_succ + sigma_a_ = study_system_attrs[f"mocma:trial:{a_._trial_id}:sigma"] + sigma_a_ = sigma_a_ * math.exp( + (1 / d) * ((p_succ_a_ - p_targetsucc) / (1 - p_targetsucc)) + ) + sigma_a_ = max(sigma_a_, _EPS) + cov_a_ = np.asarray(study_system_attrs[f"mocma:trial:{a_._trial_id}:cov"]) + p_c = np.asarray(study_system_attrs[f"mocma:trial:{a._trial_id}:p_c"]) + if p_succ_a_ < p_thresh: + values_a_ = np.asarray(list(a_.params.values())) + values_a = np.asarray(list(a.params.values())) + x_step = (values_a_ - values_a) / sigma_a + p_c = (1 - c_c) * p_c 
+ math.sqrt(c_c * (2 - c_c)) * x_step + cov_a_ = (1 - c_cov) * cov_a_ + c_cov * p_c @ p_c.T + else: + p_c = (1 - c_c) * p_c + cov_a_ = (1 - c_cov) * cov_a_ + c_cov * ( + p_c @ p_c.T + c_c * (2 - c_c) * cov_a_ + ) + + study._storage.set_study_system_attr( + study._study_id, f"mocma:trial:{a_._trial_id}:sigma", float(sigma_a_) + ) + study._storage.set_study_system_attr( + study._study_id, f"mocma:trial:{a_._trial_id}:cov", cov_a_.tolist() + ) + study._storage.set_study_system_attr( + study._study_id, f"mocma:trial:{a_._trial_id}:p_succ", float(p_succ_a_) + ) + study._storage.set_study_system_attr( + study._study_id, f"mocma:trial:{a_._trial_id}:p_c", p_c.tolist() + ) + + # Selecting elites + population = np.asarray(parents + offsprings) + objective_values = np.asarray([i.values for i in population]) + non_domination_ranks = _fast_non_domination_rank( + objective_values, n_below=self._popsize + ) + elites = [] + for i in range(len(population)): + # Selection based on non-dmination ranks + front_i = population[non_domination_ranks == i].tolist() + if len(elites) + len(front_i) <= self._popsize: + elites += front_i + continue + + # Optuna's hypervolume module assumes minimization + rank_i_vals = np.asarray( + [ + [ + v if d == StudyDirection.MINIMIZE else -v + for v, d in zip(vs, study.directions) + ] + for vs in objective_values[non_domination_ranks == i] + ] + ) + worst_point = np.max(rank_i_vals, axis=0) + reference_point = np.maximum(1.1 * worst_point, 0.9 * worst_point) + reference_point[reference_point == 0] = _EPS + + # Selection based on hypervolume contributions + while len(elites) < self._popsize: + hypervolume = compute_hypervolume(rank_i_vals, reference_point) + contribution_scores = [ + hypervolume + - compute_hypervolume( + np.concatenate([rank_i_vals[:j], rank_i_vals[j + 1 :]], axis=0), + reference_point, + assume_pareto=True, + ) + for j in range(len(rank_i_vals)) + ] # Smaller is better + + candidate = np.argmin(contribution_scores) + elites.append(front_i[candidate]) + + # Remove selected candidate + rank_i_vals = np.delete(rank_i_vals, candidate, axis=0) + del front_i[candidate] + elite_ids = [e._trial_id for e in elites] + study._storage.set_study_system_attr( + study._study_id, f"mocma:generation:{g-1}:elite_ids", elite_ids + ) + + # Load/reload study_system_attrs after updates + study_system_attrs = study._storage.get_study_system_attrs(study._study_id) + + # Generate individual for generation g and instance k + a = [ + t + for t in complete_trials + if t._trial_id == study_system_attrs[f"mocma:generation:{g-1}:elite_ids"][k] + ][0] + mean = trans.transform(a.params) + sigma = study_system_attrs[f"mocma:trial:{a._trial_id}:sigma"] + cov = np.asarray(study_system_attrs[f"mocma:trial:{a._trial_id}:cov"]) + p_c = np.asarray(study_system_attrs[f"mocma:trial:{a._trial_id}:p_c"]) + p_succ = study_system_attrs[f"mocma:trial:{a._trial_id}:p_succ"] + + # Handling conditional parameters + # (Discard cma parmeter values for paramaters not in the intersection search space) + indices = np.asarray([i for i, n in enumerate(a.params) if n in search_space]) + cov = cov[np.ix_(indices, indices)] + p_c = p_c[indices] + + study._storage.set_trial_system_attr(trial._trial_id, "mocma:parent_id", a._trial_id) + study._storage.set_study_system_attr( + study._study_id, f"mocma:trial:{trial._trial_id}:sigma", float(sigma) + ) + study._storage.set_study_system_attr( + study._study_id, f"mocma:trial:{trial._trial_id}:cov", cov.tolist() + ) + study._storage.set_study_system_attr( + study._study_id, 
f"mocma:trial:{trial._trial_id}:p_succ", float(p_succ) + ) + study._storage.set_study_system_attr( + study._study_id, f"mocma:trial:{trial._trial_id}:p_c", p_c.tolist() + ) + + x = np.clip( + self._rng.rng.multivariate_normal(mean, sigma**2 * cov), + a_min=trans.bounds[:, 0], + a_max=trans.bounds[:, 1], + ) + external_values = trans.untransform(x) + for pn, pv in search_space.items(): + external_values[pn] = np.clip( + external_values[pn], search_space[pn].low, search_space[pn].high + ) + if isinstance(pv, IntDistribution): + external_values[pn] = int(external_values[pn]) + elif isinstance(pv, FloatDistribution): + external_values[pn] = float(external_values[pn]) + + return external_values + + def sample_independent( + self, + study: optuna.study, + trial: optuna.trial.FrozenTrial, + param_name: str, + param_distribution: BaseDistribution, + ) -> Any: + return self._independent_sampler.sample_independent( + study, trial, param_name, param_distribution + ) + + def before_trial(self, study: optuna.Study, trial: optuna.trial.FrozenTrial) -> None: + self._independent_sampler.before_trial(study, trial) + + def after_trial( + self, + study: optuna.study, + trial: optuna.trial.FrozenTrial, + state: TrialState, + values: Sequence[float] | None, + ) -> None: + self._independent_sampler.after_trial(study, trial, state, values) diff --git a/package/samplers/mocma/tests/test_sampler.py b/package/samplers/mocma/tests/test_sampler.py new file mode 100644 index 00000000..e6af8c34 --- /dev/null +++ b/package/samplers/mocma/tests/test_sampler.py @@ -0,0 +1,664 @@ +# MIT License + +# Copyright (c) 2018 Preferred Networks, Inc. + +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: + +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. + +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# +# This file is taken from Optuna (https://github.com/optuna/optuna/blob/master/tests/samplers_tests/test_samplers.py) +# and modified to test MoCmaSampler. 
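+#
+# Note: the suite below exercises the generic BaseSampler contract (independent
+# sampling, relative sampling, seeding/reproducibility, and behavior under
+# multiprocessing) rather than MO-CMA-ES-specific numerics. It is intended to be
+# run from package/samplers/mocma (matching registry_root="../../" below), e.g.
+# `pytest -s tests/test_sampler.py`.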
+ +from __future__ import annotations + +from collections.abc import Callable +from collections.abc import Sequence +import multiprocessing +from multiprocessing.managers import DictProxy +import os +from typing import Any +from unittest.mock import patch +import warnings + +from _pytest.fixtures import SubRequest +from _pytest.mark.structures import MarkDecorator +import numpy as np +import optuna +from optuna.distributions import BaseDistribution +from optuna.distributions import CategoricalChoiceType +from optuna.distributions import CategoricalDistribution +from optuna.distributions import FloatDistribution +from optuna.distributions import IntDistribution +from optuna.samplers import BaseSampler +from optuna.samplers._lazy_random_state import LazyRandomState +from optuna.study import Study +from optuna.trial import FrozenTrial +from optuna.trial import Trial +from optuna.trial import TrialState +import optunahub +import pytest + + +# Load local MoCmaSampler +MoCmaSampler = optunahub.load_local_module("samplers/mocma", registry_root="../../").MoCmaSampler + +popsize = 5 + +parametrize_sampler = pytest.mark.parametrize( + "sampler_class", + [lambda: MoCmaSampler(popsize=popsize)], +) +parametrize_relative_sampler = pytest.mark.parametrize( + "relative_sampler_class", + [lambda: MoCmaSampler(popsize=popsize)], +) +parametrize_multi_objective_sampler = pytest.mark.parametrize( + "multi_objective_sampler_class", + [lambda: MoCmaSampler(popsize=popsize)], +) + + +sampler_class_with_seed: dict[str, Callable[[int], BaseSampler]] = { + "MoCmaSampler": lambda seed: MoCmaSampler(seed=seed), +} +param_sampler_with_seed = [] +param_sampler_name_with_seed = [] +for sampler_name, sampler_class in sampler_class_with_seed.items(): + param_sampler_with_seed.append(pytest.param(sampler_class, id=sampler_name)) + param_sampler_name_with_seed.append(pytest.param(sampler_name)) +parametrize_sampler_with_seed = pytest.mark.parametrize("sampler_class", param_sampler_with_seed) +parametrize_sampler_name_with_seed = pytest.mark.parametrize( + "sampler_name", param_sampler_name_with_seed +) + + +@pytest.mark.parametrize( + "sampler_class,expected_has_rng,expected_has_another_sampler", + [ + (lambda: MoCmaSampler(), True, True), + ], +) +def test_sampler_reseed_rng( + sampler_class: Callable[[], BaseSampler], + expected_has_rng: bool, + expected_has_another_sampler: bool, +) -> None: + def _extract_attr_name_from_sampler_by_cls(sampler: BaseSampler, cls: Any) -> str | None: + for name, attr in sampler.__dict__.items(): + if isinstance(attr, cls): + return name + return None + + sampler = sampler_class() + + rng_name = _extract_attr_name_from_sampler_by_cls(sampler, LazyRandomState) + has_rng = rng_name is not None + assert expected_has_rng == has_rng + if has_rng: + rng_name = str(rng_name) + original_random_state = sampler.__dict__[rng_name].rng.get_state() + sampler.reseed_rng() + random_state = sampler.__dict__[rng_name].rng.get_state() + assert str(original_random_state) != str(random_state) + + had_sampler_name = _extract_attr_name_from_sampler_by_cls(sampler, BaseSampler) + has_another_sampler = had_sampler_name is not None + assert expected_has_another_sampler == has_another_sampler + + if has_another_sampler: + had_sampler_name = str(had_sampler_name) + had_sampler = sampler.__dict__[had_sampler_name] + had_sampler_rng_name = _extract_attr_name_from_sampler_by_cls(had_sampler, LazyRandomState) + original_had_sampler_random_state = had_sampler.__dict__[ + had_sampler_rng_name + ].rng.get_state() + with 
patch.object( + had_sampler, + "reseed_rng", + wraps=had_sampler.reseed_rng, + ) as mock_object: + sampler.reseed_rng() + assert mock_object.call_count == 1 + + had_sampler = sampler.__dict__[had_sampler_name] + had_sampler_random_state = had_sampler.__dict__[had_sampler_rng_name].rng.get_state() + assert str(original_had_sampler_random_state) != str(had_sampler_random_state) + + +def parametrize_suggest_method(name: str) -> MarkDecorator: + return pytest.mark.parametrize( + f"suggest_method_{name}", + [ + lambda t: t.suggest_float(name, 0, 10), + lambda t: t.suggest_int(name, 0, 10), + lambda t: t.suggest_categorical(name, [0, 1, 2]), + lambda t: t.suggest_float(name, 0, 10, step=0.5), + lambda t: t.suggest_float(name, 1e-7, 10, log=True), + lambda t: t.suggest_int(name, 1, 10, log=True), + ], + ) + + +@parametrize_sampler +@pytest.mark.parametrize( + "distribution", + [ + FloatDistribution(-1.0, 1.0), + FloatDistribution(0.0, 1.0), + FloatDistribution(-1.0, 0.0), + FloatDistribution(1e-7, 1.0, log=True), + FloatDistribution(-10, 10, step=0.1), + FloatDistribution(-10.2, 10.2, step=0.1), + ], +) +def test_float( + sampler_class: Callable[[], BaseSampler], + distribution: FloatDistribution, +) -> None: + study = optuna.study.create_study(sampler=sampler_class()) + points = np.array( + [ + study.sampler.sample_independent(study, _create_new_trial(study), "x", distribution) + for _ in range(100) + ] + ) + assert np.all(points >= distribution.low) + assert np.all(points <= distribution.high) + assert not isinstance( + study.sampler.sample_independent(study, _create_new_trial(study), "x", distribution), + np.floating, + ) + + if distribution.step is not None: + # Check all points are multiples of distribution.step. + points -= distribution.low + points /= distribution.step + round_points = np.round(points) + np.testing.assert_almost_equal(round_points, points) + + +@parametrize_sampler +@pytest.mark.parametrize( + "distribution", + [ + IntDistribution(-10, 10), + IntDistribution(0, 10), + IntDistribution(-10, 0), + IntDistribution(-10, 10, step=2), + IntDistribution(0, 10, step=2), + IntDistribution(-10, 0, step=2), + IntDistribution(1, 100, log=True), + ], +) +def test_int(sampler_class: Callable[[], BaseSampler], distribution: IntDistribution) -> None: + study = optuna.study.create_study(sampler=sampler_class()) + points = np.array( + [ + study.sampler.sample_independent(study, _create_new_trial(study), "x", distribution) + for _ in range(100) + ] + ) + assert np.all(points >= distribution.low) + assert np.all(points <= distribution.high) + assert not isinstance( + study.sampler.sample_independent(study, _create_new_trial(study), "x", distribution), + np.integer, + ) + + +@parametrize_sampler +@pytest.mark.parametrize("choices", [(1, 2, 3), ("a", "b", "c"), (1, "a")]) +def test_categorical( + sampler_class: Callable[[], BaseSampler], choices: Sequence[CategoricalChoiceType] +) -> None: + distribution = CategoricalDistribution(choices) + + study = optuna.study.create_study(sampler=sampler_class()) + + def sample() -> float: + trial = _create_new_trial(study) + param_value = study.sampler.sample_independent(study, trial, "x", distribution) + return float(distribution.to_internal_repr(param_value)) + + points = np.asarray([sample() for i in range(100)]) + + # 'x' value is corresponding to an index of distribution.choices. 
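+    # (sample() maps the returned choice through to_internal_repr(), so every
+    # sampled value should be an integer index in [0, len(choices) - 1].)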
+ assert np.all(points >= 0) + assert np.all(points <= len(distribution.choices) - 1) + round_points = np.round(points) + np.testing.assert_almost_equal(round_points, points) + + +@parametrize_relative_sampler +@pytest.mark.parametrize( + "x_distribution", + [ + FloatDistribution(-1.0, 1.0), + FloatDistribution(1e-7, 1.0, log=True), + FloatDistribution(-10, 10, step=0.5), + IntDistribution(3, 10), + IntDistribution(1, 100, log=True), + IntDistribution(3, 9, step=2), + ], +) +@pytest.mark.parametrize( + "y_distribution", + [ + FloatDistribution(-1.0, 1.0), + FloatDistribution(1e-7, 1.0, log=True), + FloatDistribution(-10, 10, step=0.5), + IntDistribution(3, 10), + IntDistribution(1, 100, log=True), + IntDistribution(3, 9, step=2), + ], +) +def test_sample_relative_numerical( + relative_sampler_class: Callable[[], BaseSampler], + x_distribution: BaseDistribution, + y_distribution: BaseDistribution, +) -> None: + search_space: dict[str, BaseDistribution] = dict(x=x_distribution, y=y_distribution) + study = optuna.study.create_study(sampler=relative_sampler_class()) + for i in range(popsize): + trial = study.ask(search_space) + study.tell(trial, sum(trial.params.values())) + + def sample() -> list[int | float]: + params = study.sampler.sample_relative(study, _create_new_trial(study), search_space) + return [params[name] for name in search_space] + + points = np.array([sample() for _ in range(10)]) + for i, distribution in enumerate(search_space.values()): + assert isinstance( + distribution, + ( + FloatDistribution, + IntDistribution, + ), + ) + assert np.all(points[:, i] >= distribution.low) + assert np.all(points[:, i] <= distribution.high) + for param_value, distribution in zip(sample(), search_space.values()): + assert not isinstance(param_value, np.floating) + assert not isinstance(param_value, np.integer) + if isinstance(distribution, IntDistribution): + assert isinstance(param_value, int) + else: + assert isinstance(param_value, float) + + +@parametrize_sampler +def test_conditional_sample_independent(sampler_class: Callable[[], BaseSampler]) -> None: + # This test case reproduces the error reported in #2734. + # See https://github.com/optuna/optuna/pull/2734#issuecomment-857649769. 
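+    # Two completed trials are registered whose conditional child parameters
+    # differ ("x" only exists when category == "x", "y" only when category == "y").
+    # Sampling the parent and then the chosen child parameter independently must
+    # not raise.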
+ + study = optuna.study.create_study(sampler=sampler_class()) + categorical_distribution = CategoricalDistribution(choices=["x", "y"]) + dependent_distribution = CategoricalDistribution(choices=["a", "b"]) + + study.add_trial( + optuna.create_trial( + params={"category": "x", "x": "a"}, + distributions={"category": categorical_distribution, "x": dependent_distribution}, + value=0.1, + ) + ) + + study.add_trial( + optuna.create_trial( + params={"category": "y", "y": "b"}, + distributions={"category": categorical_distribution, "y": dependent_distribution}, + value=0.1, + ) + ) + + _trial = _create_new_trial(study) + category = study.sampler.sample_independent( + study, _trial, "category", categorical_distribution + ) + assert category in ["x", "y"] + value = study.sampler.sample_independent(study, _trial, category, dependent_distribution) + assert value in ["a", "b"] + + +def _create_new_trial(study: Study) -> FrozenTrial: + trial_id = study._storage.create_new_trial(study._study_id) + return study._storage.get_trial(trial_id) + + +@parametrize_sampler +def test_nan_objective_value(sampler_class: Callable[[], BaseSampler]) -> None: + study = optuna.create_study(sampler=sampler_class()) + + def objective(trial: Trial, base_value: float) -> float: + return trial.suggest_float("x", 0.1, 0.2) + base_value + + # Non NaN objective values. + for i in range(10, 1, -1): + study.optimize(lambda t: objective(t, i), n_trials=1, catch=()) + assert int(study.best_value) == 2 + + # NaN objective values. + study.optimize(lambda t: objective(t, float("nan")), n_trials=1, catch=()) + assert int(study.best_value) == 2 + + # Non NaN objective value. + study.optimize(lambda t: objective(t, 1), n_trials=1, catch=()) + assert int(study.best_value) == 1 + + +@parametrize_multi_objective_sampler +@pytest.mark.parametrize( + "distribution", + [ + FloatDistribution(-1.0, 1.0), + FloatDistribution(0.0, 1.0), + FloatDistribution(-1.0, 0.0), + FloatDistribution(1e-7, 1.0, log=True), + FloatDistribution(-10, 10, step=0.1), + FloatDistribution(-10.2, 10.2, step=0.1), + IntDistribution(-10, 10), + IntDistribution(0, 10), + IntDistribution(-10, 0), + IntDistribution(-10, 10, step=2), + IntDistribution(0, 10, step=2), + IntDistribution(-10, 0, step=2), + IntDistribution(1, 100, log=True), + CategoricalDistribution((1, 2, 3)), + CategoricalDistribution(("a", "b", "c")), + CategoricalDistribution((1, "a")), + ], +) +def test_multi_objective_sample_independent( + multi_objective_sampler_class: Callable[[], BaseSampler], distribution: BaseDistribution +) -> None: + study = optuna.study.create_study( + directions=["minimize", "maximize"], sampler=multi_objective_sampler_class() + ) + for i in range(100): + value = study.sampler.sample_independent( + study, _create_new_trial(study), "x", distribution + ) + assert distribution._contains(distribution.to_internal_repr(value)) + + if not isinstance(distribution, CategoricalDistribution): + # Please see https://github.com/optuna/optuna/pull/393 why this assertion is needed. + assert not isinstance(value, np.floating) + + if isinstance(distribution, FloatDistribution): + if distribution.step is not None: + # Check the value is a multiple of `distribution.step` which is + # the quantization interval of the distribution. 
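+                # ((value - low) / step is compared against its rounded value with
+                # assert_almost_equal, so small floating-point error is tolerated.)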
+ value -= distribution.low + value /= distribution.step + round_value = np.round(value) + np.testing.assert_almost_equal(round_value, value) + + +@parametrize_sampler +def test_sample_single_distribution(sampler_class: Callable[[], BaseSampler]) -> None: + relative_search_space = { + "a": CategoricalDistribution([1]), + "b": IntDistribution(low=1, high=1), + "c": IntDistribution(low=1, high=1, log=True), + "d": FloatDistribution(low=1.0, high=1.0), + "e": FloatDistribution(low=1.0, high=1.0, log=True), + "f": FloatDistribution(low=1.0, high=1.0, step=1.0), + } + + with warnings.catch_warnings(): + warnings.simplefilter("ignore", optuna.exceptions.ExperimentalWarning) + sampler = sampler_class() + study = optuna.study.create_study(sampler=sampler) + + # We need to test the construction of the model, so we should set `n_trials >= 2`. + for _ in range(2): + trial = study.ask(fixed_distributions=relative_search_space) + study.tell(trial, 1.0) + for param_name in relative_search_space.keys(): + assert trial.params[param_name] == 1 + + +@parametrize_sampler +@parametrize_suggest_method("x") +def test_single_parameter_objective( + sampler_class: Callable[[], BaseSampler], suggest_method_x: Callable[[Trial], float] +) -> None: + def objective(trial: Trial) -> float: + return suggest_method_x(trial) + + with warnings.catch_warnings(): + warnings.simplefilter("ignore", optuna.exceptions.ExperimentalWarning) + sampler = sampler_class() + + study = optuna.study.create_study(sampler=sampler) + study.optimize(objective, n_trials=10) + + assert len(study.trials) == 10 + assert all(t.state == TrialState.COMPLETE for t in study.trials) + + +@parametrize_sampler +def test_conditional_parameter_objective(sampler_class: Callable[[], BaseSampler]) -> None: + def objective(trial: Trial) -> float: + x = trial.suggest_categorical("x", [True, False]) + if x: + return trial.suggest_float("y", 0, 1) + return trial.suggest_float("z", 0, 1) + + with warnings.catch_warnings(): + warnings.simplefilter("ignore", optuna.exceptions.ExperimentalWarning) + sampler = sampler_class() + + study = optuna.study.create_study(sampler=sampler) + study.optimize(objective, n_trials=10) + + assert len(study.trials) == 10 + assert all(t.state == TrialState.COMPLETE for t in study.trials) + + +@parametrize_sampler +@parametrize_suggest_method("x") +@parametrize_suggest_method("y") +def test_combination_of_different_distributions_objective( + sampler_class: Callable[[], BaseSampler], + suggest_method_x: Callable[[Trial], float], + suggest_method_y: Callable[[Trial], float], +) -> None: + def objective(trial: Trial) -> float: + return suggest_method_x(trial) + suggest_method_y(trial) + + with warnings.catch_warnings(): + warnings.simplefilter("ignore", optuna.exceptions.ExperimentalWarning) + sampler = sampler_class() + + study = optuna.study.create_study(sampler=sampler) + study.optimize(objective, n_trials=3) + + assert len(study.trials) == 3 + assert all(t.state == TrialState.COMPLETE for t in study.trials) + + +@parametrize_sampler +@pytest.mark.parametrize( + "second_low,second_high", + [ + (0, 5), # Narrow range. + (0, 20), # Expand range. + (20, 30), # Set non-overlapping range. 
+ ], +) +def test_dynamic_range_objective( + sampler_class: Callable[[], BaseSampler], second_low: int, second_high: int +) -> None: + def objective(trial: Trial, low: int, high: int) -> float: + v = trial.suggest_float("x", low, high) + v += trial.suggest_int("y", low, high) + return v + + with warnings.catch_warnings(): + warnings.simplefilter("ignore", optuna.exceptions.ExperimentalWarning) + sampler = sampler_class() + + study = optuna.study.create_study(sampler=sampler) + study.optimize(lambda t: objective(t, 0, 10), n_trials=10) + study.optimize(lambda t: objective(t, second_low, second_high), n_trials=10) + + assert len(study.trials) == 20 + assert all(t.state == TrialState.COMPLETE for t in study.trials) + + +# We add tests for constant objective functions to ensure the reproducibility of sorting. +@parametrize_sampler_with_seed +@pytest.mark.parametrize("objective_func", [lambda *args: sum(args), lambda *args: 0.0]) +def test_reproducible(sampler_class: Callable[[int], BaseSampler], objective_func: Any) -> None: + def objective(trial: Trial) -> float: + a = trial.suggest_float("a", 1, 9) + b = trial.suggest_float("b", 1, 9, log=True) + c = trial.suggest_float("c", 1, 9, step=1) + d = trial.suggest_int("d", 1, 9) + e = trial.suggest_int("e", 1, 9, log=True) + f = trial.suggest_int("f", 1, 9, step=2) + g = trial.suggest_categorical("g", range(1, 10)) + return objective_func(a, b, c, d, e, f, g) + + study = optuna.create_study(sampler=sampler_class(1)) + study.optimize(objective, n_trials=15) + + study_same_seed = optuna.create_study(sampler=sampler_class(1)) + study_same_seed.optimize(objective, n_trials=15) + for i in range(15): + assert study.trials[i].params == study_same_seed.trials[i].params + + study_different_seed = optuna.create_study(sampler=sampler_class(2)) + study_different_seed.optimize(objective, n_trials=15) + assert any( + [study.trials[i].params != study_different_seed.trials[i].params for i in range(15)] + ) + + +@parametrize_sampler_with_seed +def test_reseed_rng_change_sampling(sampler_class: Callable[[int], BaseSampler]) -> None: + def objective(trial: Trial) -> float: + a = trial.suggest_float("a", 1, 9) + b = trial.suggest_float("b", 1, 9, log=True) + c = trial.suggest_float("c", 1, 9, step=1) + d = trial.suggest_int("d", 1, 9) + e = trial.suggest_int("e", 1, 9, log=True) + f = trial.suggest_int("f", 1, 9, step=2) + g = trial.suggest_categorical("g", range(1, 10)) + return a + b + c + d + e + f + g + + sampler = sampler_class(1) + study = optuna.create_study(sampler=sampler) + study.optimize(objective, n_trials=15) + + sampler_different_seed = sampler_class(1) + sampler_different_seed.reseed_rng() + study_different_seed = optuna.create_study(sampler=sampler_different_seed) + study_different_seed.optimize(objective, n_trials=15) + assert any( + [study.trials[i].params != study_different_seed.trials[i].params for i in range(15)] + ) + + +# This function is used only in test_reproducible_in_other_process, but declared at top-level +# because local function cannot be pickled, which occurs within multiprocessing. 
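+# Each child process records hash("nondeterministic hash") (to confirm that string
+# hashing differs across processes when PYTHONHASHSEED is unset) and the parameter
+# values of its final trial (which must still match across processes for a seeded
+# sampler).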
+def run_optimize( + k: int, + sampler_name: str, + sequence_dict: DictProxy, + hash_dict: DictProxy, +) -> None: + def objective(trial: Trial) -> float: + a = trial.suggest_float("a", 1, 9) + b = trial.suggest_float("b", 1, 9, log=True) + c = trial.suggest_float("c", 1, 9, step=1) + d = trial.suggest_int("d", 1, 9) + e = trial.suggest_int("e", 1, 9, log=True) + f = trial.suggest_int("f", 1, 9, step=2) + g = trial.suggest_categorical("g", range(1, 10)) + return a + b + c + d + e + f + g + + hash_dict[k] = hash("nondeterministic hash") + sampler = sampler_class_with_seed[sampler_name](1) + study = optuna.create_study(sampler=sampler) + study.optimize(objective, n_trials=15) + sequence_dict[k] = list(study.trials[-1].params.values()) + + +@pytest.fixture +def unset_seed_in_test(request: SubRequest) -> None: + # Unset the hashseed at beginning and restore it at end regardless of an exception in the test. + # See https://docs.pytest.org/en/stable/how-to/fixtures.html#adding-finalizers-directly + # for details. + + hash_seed = os.getenv("PYTHONHASHSEED") + if hash_seed is not None: + del os.environ["PYTHONHASHSEED"] + + def restore_seed() -> None: + if hash_seed is not None: + os.environ["PYTHONHASHSEED"] = hash_seed + + request.addfinalizer(restore_seed) + + +@parametrize_sampler_name_with_seed +def test_reproducible_in_other_process(sampler_name: str, unset_seed_in_test: None) -> None: + # This test should be tested without `PYTHONHASHSEED`. However, some tool such as tox + # set the environmental variable "PYTHONHASHSEED" by default. + # To do so, this test calls a finalizer: `unset_seed_in_test`. + + # Multiprocessing supports three way to start a process. + # We use `spawn` option to create a child process as a fresh python process. + # For more detail, see https://github.com/optuna/optuna/pull/3187#issuecomment-997673037. + multiprocessing.set_start_method("spawn", force=True) + manager = multiprocessing.Manager() + sequence_dict: DictProxy = manager.dict() + hash_dict: DictProxy = manager.dict() + for i in range(3): + p = multiprocessing.Process( + target=run_optimize, args=(i, sampler_name, sequence_dict, hash_dict) + ) + p.start() + p.join() + + # Hashes are expected to be different because string hashing is nondeterministic per process. + assert not (hash_dict[0] == hash_dict[1] == hash_dict[2]) + # But the sequences are expected to be the same. + assert sequence_dict[0] == sequence_dict[1] == sequence_dict[2] + + +@pytest.mark.parametrize("n_jobs", [1, 2]) +@parametrize_relative_sampler +def test_cache_is_invalidated( + n_jobs: int, relative_sampler_class: Callable[[], BaseSampler] +) -> None: + sampler = relative_sampler_class() + study = optuna.study.create_study(sampler=sampler) + + def objective(trial: Trial) -> float: + assert trial._relative_params is None + assert study._thread_local.cached_all_trials is None + + trial.suggest_float("x", -10, 10) + trial.suggest_float("y", -10, 10) + assert trial._relative_params is not None + return -1 + + study.optimize(objective, n_trials=10, n_jobs=n_jobs)