diff --git a/package/samplers/ctpe/LICENSE b/package/samplers/ctpe/LICENSE new file mode 100644 index 00000000..f6547d35 --- /dev/null +++ b/package/samplers/ctpe/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2024 Shuhei Watanabe + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/package/samplers/ctpe/README.md b/package/samplers/ctpe/README.md new file mode 100644 index 00000000..8b07f9a7 --- /dev/null +++ b/package/samplers/ctpe/README.md @@ -0,0 +1,98 @@ +--- +author: Shuhei Watanabe +title: c-TPE; Tree-structured Parzen Estimator with Inequality Constraints for Expensive Hyperparameter Optimization +description: The optimizer that reproduces the algorithm described in the paper ``c-TPE; Tree-structured Parzen Estimator with Inequality Constraints for Expensive Hyperparameter Optimization''. +tags: [sampler, tpe, c-tpe, paper, research] +optuna_versions: [v4.1.0] +license: MIT License +--- + +## Abstract + +This package aims to reproduce the TPE algorithm used in the paper published at IJCAI'23: + +- [c-TPE: Tree-structured Parzen Estimator with Inequality Constraints for Expensive Hyperparameter Optimization](https://arxiv.org/abs/2211.14411) + +The default parameter set of this sampler is the recommended setup from the paper and the experiments in the paper can also be reproduced by this sampler. + +Note that this sampler is officially implemented by the first author of the original paper. +The performance was verified, c.f. [#177](https://github.com/optuna/optunahub-registry/pull/177#issuecomment-2517083730), using Optuna v4.1.0 by reproducing the results of Fig. 3 (Top Row) in the original paper. + +![Performance Verification](images/slice-localization.png) + +## Class or Function Names + +- cTPESampler + +Although most arguments in [`TPESampler`](https://optuna.readthedocs.io/en/stable/reference/samplers/generated/optuna.samplers.TPESampler.html) are also available for `cTPESampler`, the original paper uses the default arguments of this sampler. +However, `constraints_func` must be provided to use this sampler. +For the reproducibility purpose, please set a fixed `seed`. +Please refer to [the documentation of CustomizableTPESampler](https://hub.optuna.org/samplers/tpe_tutorial/) for more details of the other arguments such as `bandwidth_strategy` and `b_magic_exponent`. + +## Installation + +The version constraint of this package is Optuna v4.0.0 or later. + +```shell +# The requirement is only Optuna. +$ pip install optuna + +# You can also optionally install as follows: +$ pip install -r https://hub.optuna.org/samplers/ctpe/requirements.txt +``` + +## Example + +This sampler is the official implementation of [the c-TPE paper](https://arxiv.org/abs/2211.14411). +The interface for the constraint handling is identical to the Optuna API, so please check [the official API reference](https://optuna.readthedocs.io/en/stable/faq.html#how-can-i-optimize-a-model-with-some-constraints) for more details. + +```python +from __future__ import annotations + +import numpy as np +import optuna +import optunahub + + +def objective(trial: optuna.Trial) -> float: + x = trial.suggest_float("x", 0.0, 2 * np.pi) + y = trial.suggest_float("y", 0.0, 2 * np.pi) + return float(np.sin(x) + y) + + +def constraints(trial: optuna.trial.FrozenTrial) -> tuple[float]: + x = trial.params["x"] + y = trial.params["y"] + c = float(np.sin(x) * np.sin(y) + 0.95) + trial.set_user_attr("c", c) + # For multiple constraints, we can simply include every constraint in the tuple below. + # e.g., if c1 <= t1 and c2 <= t2 are the constraints, return (c1 - t1, c2 - t2). + return (c, ) + + +sampler = optunahub.load_module(package="samplers/ctpe").cTPESampler(constraints_func=constraints) +study = optuna.create_study(sampler=sampler) +study.optimize(objective, n_trials=30) +print(study.best_trials) + +``` + +### Bibtex + +When you use this sampler, please cite the following: + +```bibtex +@inproceedings{watanabe_ctpe_ijcai_2023, + title={{c-TPE}: Tree-structured {P}arzen Estimator with Inequality Constraints for Expensive Hyperparameter Optimization}, + author={Watanabe, Shuhei and Hutter, Frank}, + booktitle={International Joint Conference on Artificial Intelligence}, + year={2023} +} + +@inproceedings{watanabe_ctpe_workshop_2022, + title={{c-TPE}: Generalizing Tree-structured {P}arzen Estimator with Inequality Constraints for Continuous and Categorical Hyperparameter Optimization}, + author={Watanabe, Shuhei and Hutter, Frank}, + journal = {Gaussian Processes, Spatiotemporal Modeling, and Decision-making Systems Workshop at Advances in Neural Information Processing Systems}, + year={2022} +} +``` diff --git a/package/samplers/ctpe/__init__.py b/package/samplers/ctpe/__init__.py new file mode 100644 index 00000000..079ad70e --- /dev/null +++ b/package/samplers/ctpe/__init__.py @@ -0,0 +1,4 @@ +from .sampler import cTPESampler + + +__all__ = ["cTPESampler"] diff --git a/package/samplers/ctpe/components.py b/package/samplers/ctpe/components.py new file mode 100644 index 00000000..71a8d6d1 --- /dev/null +++ b/package/samplers/ctpe/components.py @@ -0,0 +1,48 @@ +from __future__ import annotations + +import numpy as np + + +class GammaFunc: + def __init__(self, strategy: str, beta: float): + strategy_choices = ["linear", "sqrt"] + if strategy not in strategy_choices: + raise ValueError(f"strategy must be in {strategy_choices}, but got {strategy}.") + + self._strategy = strategy + self._beta = beta + + def __call__(self, x: int) -> int: + if self._strategy == "linear": + n = int(np.ceil(self._beta * x)) + elif self._strategy == "sqrt": + n = int(np.ceil(self._beta * np.sqrt(x))) + else: + assert "Should not reach." + + return min(n, 25) + + +class WeightFunc: + def __init__(self, strategy: str): + strategy_choices = ["old-decay", "old-drop", "uniform"] + if strategy not in strategy_choices: + raise ValueError(f"strategy must be in {strategy_choices}, but got {strategy}.") + + self._strategy = strategy + + def __call__(self, x: int) -> np.ndarray: + if x == 0: + return np.asarray([]) + elif x < 25 or self._strategy == "uniform": + return np.ones(x) + elif self._strategy == "old-decay": + ramp = np.linspace(1.0 / x, 1.0, num=x - 25) + flat = np.ones(25) + return np.concatenate([ramp, flat], axis=0) + elif self._strategy == "old-drop": + weights = np.ones(x) + weights[:-25] = 1e-12 + return weights + else: + assert "Should not reach." diff --git a/package/samplers/ctpe/images/slice-localization.png b/package/samplers/ctpe/images/slice-localization.png new file mode 100644 index 00000000..6a57cd43 Binary files /dev/null and b/package/samplers/ctpe/images/slice-localization.png differ diff --git a/package/samplers/ctpe/parzen_estimator.py b/package/samplers/ctpe/parzen_estimator.py new file mode 100644 index 00000000..e0c6a2ed --- /dev/null +++ b/package/samplers/ctpe/parzen_estimator.py @@ -0,0 +1,168 @@ +from __future__ import annotations + +from collections.abc import Callable +from typing import NamedTuple + +import numpy as np +from optuna.distributions import CategoricalDistribution +from optuna.samplers._tpe.parzen_estimator import _ParzenEstimator +from optuna.samplers._tpe.probability_distributions import _BatchedCategoricalDistributions +from optuna.samplers._tpe.probability_distributions import _BatchedDiscreteTruncNormDistributions +from optuna.samplers._tpe.probability_distributions import _BatchedDistributions +from optuna.samplers._tpe.probability_distributions import _BatchedTruncNormDistributions + + +class _CustomizableParzenEstimatorParameters(NamedTuple): + consider_prior: bool + prior_weight: float | None + consider_magic_clip: bool + weights: Callable[[int], np.ndarray] + multivariate: bool + b_magic_exponent: float + min_bandwidth_factor: float + bandwidth_strategy: str + use_min_bandwidth_discrete: bool + categorical_prior_weight: float | None + + +def _bandwidth_hyperopt( + mus: np.ndarray, + low: float, + high: float, + step: float | None, +) -> np.ndarray: + step_or_0 = step or 0 + sorted_indices = np.argsort(mus) + sorted_mus_with_endpoints = np.empty(len(mus) + 2, dtype=float) + sorted_mus_with_endpoints[0] = low - step_or_0 / 2 + sorted_mus_with_endpoints[1:-1] = mus[sorted_indices] + sorted_mus_with_endpoints[-1] = high + step_or_0 / 2 + sorted_sigmas = np.maximum( + sorted_mus_with_endpoints[1:-1] - sorted_mus_with_endpoints[0:-2], + sorted_mus_with_endpoints[2:] - sorted_mus_with_endpoints[1:-1], + ) + return sorted_sigmas[np.argsort(sorted_indices)] + + +def _bandwidth_optuna( + n_observations: int, + consider_prior: bool, + domain_range: float, + dim: int, +) -> np.ndarray: + SIGMA0_MAGNITUDE = 0.2 + sigma = SIGMA0_MAGNITUDE * max(n_observations, 1) ** (-1.0 / (dim + 4)) * domain_range + return np.full(shape=(n_observations + consider_prior,), fill_value=sigma) + + +def _bandwidth_scott(mus: np.ndarray) -> np.ndarray: + std = np.std(mus, ddof=int(mus.size > 1)) + IQR = np.subtract.reduce(np.percentile(mus, [75, 25])) + return np.full_like(mus, 1.059 * min(IQR / 1.34, std) * mus.size**-0.2) + + +def _clip_bandwidth( + sigmas: np.ndarray, + n_observations: int, + domain_range: float, + consider_prior: bool, + consider_magic_clip: bool, + b_magic_exponent: float, + min_bandwidth_factor: float, +) -> np.ndarray: + # We adjust the range of the 'sigmas' according to the 'consider_magic_clip' flag. + maxsigma = 1.0 * domain_range + if consider_magic_clip: + bandwidth_factor = max( + min_bandwidth_factor, 1.0 / (1 + n_observations + consider_prior) ** b_magic_exponent + ) + minsigma = bandwidth_factor * domain_range + else: + minsigma = 1e-12 + + clipped_sigmas = np.asarray(np.clip(sigmas, minsigma, maxsigma)) + if consider_prior: + clipped_sigmas[-1] = maxsigma + + return clipped_sigmas + + +class _CustomizableParzenEstimator(_ParzenEstimator): + def _calculate_numerical_distributions( + self, + observations: np.ndarray, + low: float, + high: float, + step: float | None, + parameters: _CustomizableParzenEstimatorParameters, + ) -> _BatchedDistributions: + domain_range = high - low + (step or 0) + consider_prior = parameters.consider_prior or len(observations) == 0 + + if consider_prior: + mus = np.append(observations, [0.5 * (low + high)]) + else: + mus = observations.copy() + + if parameters.bandwidth_strategy == "hyperopt": + sigmas = _bandwidth_hyperopt(mus, low, high, step) + elif parameters.bandwidth_strategy == "optuna": + sigmas = _bandwidth_optuna( + n_observations=len(observations), + consider_prior=consider_prior, + domain_range=domain_range, + dim=len(self._search_space), + ) + elif parameters.bandwidth_strategy == "scott": + sigmas = _bandwidth_scott(mus) + else: + raise ValueError(f"Got unknown bandwidth_strategy={parameters.bandwidth_strategy}.") + + sigmas = _clip_bandwidth( + sigmas=sigmas, + n_observations=len(observations), + domain_range=domain_range, + consider_magic_clip=parameters.consider_magic_clip, + consider_prior=consider_prior, + b_magic_exponent=parameters.b_magic_exponent, + min_bandwidth_factor=parameters.min_bandwidth_factor, + ) + + if step is not None and parameters.use_min_bandwidth_discrete: + # NOTE(nabenabe0928): This hack is specifically for c-TPE. + n_grids = int((high - low) / step + 1) + sigmas = np.maximum(sigmas, domain_range / n_grids) + + if step is None: + return _BatchedTruncNormDistributions(mus, sigmas, low, high) + else: + return _BatchedDiscreteTruncNormDistributions(mus, sigmas, low, high, step) + + def _calculate_categorical_distributions( + self, + observations: np.ndarray, + param_name: str, + search_space: CategoricalDistribution, + parameters: _CustomizableParzenEstimatorParameters, + ) -> _BatchedDistributions: + choices = search_space.choices + n_choices = len(choices) + if len(observations) == 0: + return _BatchedCategoricalDistributions( + weights=np.full((1, n_choices), fill_value=1.0 / n_choices) + ) + + n_kernels = len(observations) + parameters.consider_prior + observed_indices = observations.astype(int) + if parameters.categorical_prior_weight is None: + weights = np.full(shape=(n_kernels, n_choices), fill_value=1.0 / n_kernels) + weights[np.arange(len(observed_indices)), observed_indices] += 1 + weights /= weights.sum(axis=1, keepdims=True) + else: + assert 0 <= parameters.categorical_prior_weight <= 1 + b = parameters.categorical_prior_weight + weights = np.full(shape=(n_kernels, n_choices), fill_value=b / (n_choices - 1)) + weights[np.arange(len(observed_indices)), observed_indices] = 1 - b + weights[-1] = 1.0 / n_choices + + return _BatchedCategoricalDistributions(weights) diff --git a/package/samplers/ctpe/requirements.txt b/package/samplers/ctpe/requirements.txt new file mode 100644 index 00000000..afa8210b --- /dev/null +++ b/package/samplers/ctpe/requirements.txt @@ -0,0 +1 @@ +optuna>=4.0.0 diff --git a/package/samplers/ctpe/sampler.py b/package/samplers/ctpe/sampler.py new file mode 100644 index 00000000..6725f2e8 --- /dev/null +++ b/package/samplers/ctpe/sampler.py @@ -0,0 +1,219 @@ +from __future__ import annotations + +from collections.abc import Callable +from collections.abc import Sequence +from typing import Any + +import numpy as np +from optuna.distributions import BaseDistribution +from optuna.logging import get_logger +from optuna.samplers import TPESampler +from optuna.samplers._tpe.parzen_estimator import _ParzenEstimator +from optuna.study import Study +from optuna.study import StudyDirection +from optuna.trial import FrozenTrial +from optuna.trial import TrialState + +from .components import GammaFunc +from .components import WeightFunc +from .parzen_estimator import _CustomizableParzenEstimator +from .parzen_estimator import _CustomizableParzenEstimatorParameters + + +_logger = get_logger(f"optuna.{__name__}") + + +class cTPESampler(TPESampler): + def __init__( + self, + *, + consider_prior: bool = True, + prior_weight: float = 1.0, + consider_magic_clip: bool = True, + n_startup_trials: int = 10, + n_ei_candidates: int = 24, + seed: int | None = None, + categorical_prior_weight: float | None = 0.2, + multivariate: bool = True, + b_magic_exponent: float = np.inf, + min_bandwidth_factor: float = 0.01, + gamma_strategy: str = "sqrt", + gamma_beta: float = 0.25, + weight_strategy: str = "uniform", + bandwidth_strategy: str = "scott", + use_min_bandwidth_discrete: bool = True, + constraints_func: Callable[[FrozenTrial], Sequence[float]] | None = None, + ): + if constraints_func is None: + raise ValueError( + f"{self.__class__.__name__} must take constraints_func, but got None." + ) + + gamma = GammaFunc(strategy=gamma_strategy, beta=gamma_beta) + weights = WeightFunc(strategy=weight_strategy) + super().__init__( + consider_prior=consider_prior, + prior_weight=prior_weight, + consider_magic_clip=consider_magic_clip, + consider_endpoints=True, + warn_independent_sampling=False, + n_startup_trials=n_startup_trials, + n_ei_candidates=n_ei_candidates, + gamma=gamma, + weights=weights, + seed=seed, + multivariate=multivariate, + constraints_func=constraints_func, + ) + self._parzen_estimator_cls = _CustomizableParzenEstimator + self._parzen_estimator_parameters = _CustomizableParzenEstimatorParameters( + consider_prior=consider_prior, + prior_weight=prior_weight, + consider_magic_clip=consider_magic_clip, + weights=weights, + multivariate=multivariate, + b_magic_exponent=b_magic_exponent, + min_bandwidth_factor=min_bandwidth_factor, + bandwidth_strategy=bandwidth_strategy, + categorical_prior_weight=categorical_prior_weight, + use_min_bandwidth_discrete=use_min_bandwidth_discrete, + ) + + def _warning_multi_objective_for_ctpe(self, study: Study) -> None: + if study._is_multi_objective(): + + def _get_additional_msg() -> str: + beta = getattr(self._gamma, "_beta", None) + strategy = getattr(self._gamma, "_strategy", None) + if beta != 0.15 or strategy != "linear": + return "" + + return ( + "Note that the original MOTPE uses beta=0.15 and strategy='sqrt', but " + f"beta={beta} and strategy='{strategy}' are used in this study." + ) + + _logger.warning( + "Multi-objective c-TPE does not exist in the original paper, " + "but sampling will be performed by c-TPE based on Optuna MOTPE. " + f"{_get_additional_msg()}" + ) + + def _build_parzen_estimators_for_constraints_and_get_quantiles( + self, + trials: list[FrozenTrial], + study: Study, + search_space: dict[str, BaseDistribution], + constraints_vals: np.ndarray, + ) -> tuple[list[_ParzenEstimator], list[_ParzenEstimator], list[float]]: + mpes_below: list[_ParzenEstimator] = [] + mpes_above: list[_ParzenEstimator] = [] + quantiles: list[float] = [] + for constraint_vals in constraints_vals.T: + is_satisfied = (constraint_vals <= 0) | (constraint_vals == min(constraint_vals)) + satisfied_trials = [t for t, include in zip(trials, is_satisfied) if include] + unsatisfied_trials = [t for t, exclude in zip(trials, is_satisfied) if not exclude] + mpes_below.append( + self._build_parzen_estimator( + study, search_space, satisfied_trials, handle_below=False + ) + ) + mpes_above.append( + self._build_parzen_estimator( + study, search_space, unsatisfied_trials, handle_below=False + ) + ) + quantiles.append(len(satisfied_trials) / len(trials)) + + return mpes_below, mpes_above, quantiles + + def _sample( + self, study: Study, trial: FrozenTrial, search_space: dict[str, BaseDistribution] + ) -> dict[str, Any]: + self._warning_multi_objective_for_ctpe(study) + trials = study._get_trials(deepcopy=False, states=(TrialState.COMPLETE,), use_cache=True) + constraints_vals = np.asarray([self._constraints_func(t) for t in trials]) + (mpes_below, mpes_above, quantiles) = ( + self._build_parzen_estimators_for_constraints_and_get_quantiles( + trials, study, search_space, constraints_vals + ) + ) + + n_below_feasible = self._gamma(len(trials)) + below_trials, above_trials = _split_trials_for_ctpe( + study, trials, n_below_feasible, is_feasible=np.all(constraints_vals <= 0, axis=-1) + ) + mpes_below.append( + self._build_parzen_estimator(study, search_space, below_trials, handle_below=True) + ) + mpes_above.append( + self._build_parzen_estimator(study, search_space, above_trials, handle_below=False) + ) + quantiles.append(len(below_trials) / len(trials)) + + _samples_below: dict[str, list[_ParzenEstimator]] = { + param_name: [] for param_name in search_space + } + for mpe in mpes_below: + for param_name, samples in mpe.sample(self._rng.rng, self._n_ei_candidates).items(): + _samples_below[param_name].append(samples) + + samples_below = { + param_name: np.hstack(samples) for param_name, samples in _samples_below.items() + } + acq_func_vals = self._compute_acquisition_func( + samples_below, mpes_below, mpes_above, quantiles + ) + ret = TPESampler._compare(samples_below, acq_func_vals) + for param_name, dist in search_space.items(): + ret[param_name] = dist.to_external_repr(ret[param_name]) + + return ret + + def _compute_acquisition_func( + self, + samples: dict[str, np.ndarray], + mpes_below: list[_ParzenEstimator], + mpes_above: list[_ParzenEstimator], + quantiles: list[float], + ) -> np.ndarray: + _EPS = 1e-12 + assert len(mpes_above) == len(mpes_below) == len(quantiles) + lls_above = np.asarray([mpe_above.log_pdf(samples) for mpe_above in mpes_above]) + lls_below = np.asarray([mpe_below.log_pdf(samples) for mpe_below in mpes_below]) + _q = np.asarray(quantiles)[:, np.newaxis] + log_first_term = np.log(_q + _EPS) + log_second_term = np.log(1.0 - _q + _EPS) + lls_above - lls_below + acq_func_vals = np.sum(-np.logaddexp(log_first_term, log_second_term), axis=0) + return acq_func_vals + + +def _split_trials_for_ctpe( + study: Study, trials: list[FrozenTrial], n_below_feasible: int, is_feasible: np.ndarray +) -> tuple[list[FrozenTrial], list[FrozenTrial]]: + if len(trials) == 0: + return [], [] + if np.count_nonzero(is_feasible) < n_below_feasible or len(trials) == n_below_feasible: + return trials, [] + if n_below_feasible == 0: + return [], trials + + loss_vals = np.asarray([t.values for t in trials]) + loss_vals *= np.asarray([1 if d == StudyDirection.MINIMIZE else -1 for d in study.directions]) + if study._is_multi_objective(): + return _split_trials_for_multi_objective_ctpe(loss_vals, n_below_feasible, is_feasible) + else: + order = np.argsort(loss_vals[:, 0]) + n_below = np.searchsorted(np.cumsum(is_feasible[order]), n_below_feasible) + 1 + indices_below = set(np.arange(len(trials))[order[:n_below]]) + below_trials = [t for i, t in enumerate(trials) if i in indices_below] + above_trials = [t for i, t in enumerate(trials) if i not in indices_below] + return below_trials, above_trials + + +def _split_trials_for_multi_objective_ctpe( + loss_vals: np.ndarray, n_below_feasible: int, is_feasible: np.ndarray +) -> tuple[list[FrozenTrial], list[FrozenTrial]]: + assert 0 < n_below_feasible <= np.count_nonzero(is_feasible) + assert n_below_feasible < len(loss_vals) + raise ValueError("c-TPE has not supported multi-objective optimization yet.")