Skip to content

Commit

Permalink
Remove c-MOTPE
Browse files Browse the repository at this point in the history
  • Loading branch information
nabenabe0928 committed Dec 3, 2024
1 parent 69939b9 commit d185af3
Showing 1 changed file with 101 additions and 59 deletions.
160 changes: 101 additions & 59 deletions package/samplers/ctpe/sampler.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from optuna.samplers._tpe.parzen_estimator import _ParzenEstimator
from optuna.samplers._tpe.sampler import _split_trials
from optuna.study import Study
from optuna.study import StudyDirection
from optuna.trial import FrozenTrial
from optuna.trial import TrialState

Expand All @@ -23,12 +24,6 @@
_logger = get_logger(f"optuna.{__name__}")


def _ctpe_split_trials(
    study: Study, trials: list[FrozenTrial], n_below: int, enable_constriants: bool
) -> tuple[list[FrozenTrial], list[FrozenTrial]]:
    """Stub splitter that yields empty below/above partitions.

    NOTE(review): ``enable_constriants`` keeps its original (misspelled) name
    for interface compatibility with existing callers.
    """
    below: list[FrozenTrial] = []
    above: list[FrozenTrial] = []
    return below, above


class cTPESampler(TPESampler):
def __init__(
self,
Expand Down Expand Up @@ -80,37 +75,91 @@ def __init__(
use_min_bandwidth_discrete=use_min_bandwidth_discrete,
)

def _sample(
self, study: Study, trial: FrozenTrial, search_space: dict[str, BaseDistribution]
) -> dict[str, Any]:
def _warning_multi_objective_for_ctpe(self, study: Study) -> None:
if study._is_multi_objective():

def _get_additional_msg() -> str:
beta = getattr(self._gamma, "_beta", None)
strategy = getattr(self._gamma, "_strategy", None)
if beta != 0.15 or strategy != "linear":
return ""

return (
"Note that the original MOTPE uses beta=0.15 and strategy='sqrt', but "
f"beta={beta} and strategy='{strategy}' are used in this study."
)

_logger.warning(
"Multi-objective c-TPE does not exist in the original paper, "
"but sampling will be performed by c-TPE based on Optuna MOTPE."
"but sampling will be performed by c-TPE based on Optuna MOTPE. "
f"{_get_additional_msg()}"
)

def _build_parzen_estimators_for_constraints_and_get_quantiles(
self,
trials: list[FrozenTrial],
study: Study,
search_space: dict[str, BaseDistribution],
constraints_vals: np.ndarray,
) -> tuple[list[_ParzenEstimator], list[_ParzenEstimator], list[float]]:
mpes_below: list[_ParzenEstimator] = []
mpes_above: list[_ParzenEstimator] = []
quantiles: list[float] = []
for constraint_vals in constraints_vals.T:
is_satisfied = constraint_vals <= 0
satisfied_trials = [t for t, include in zip(trials, is_satisfied) if include]
unsatisfied_trials = [t for t, exclude in zip(trials, is_satisfied) if not exclude]
mpes_below.append(
self._build_parzen_estimator(
study, search_space, satisfied_trials, handle_below=False
)
)
mpes_above.append(
self._build_parzen_estimator(
study, search_space, unsatisfied_trials, handle_below=False
)
)
quantiles.append(len(satisfied_trials) / len(trials))

return mpes_below, mpes_above, quantiles

def _sample(
self, study: Study, trial: FrozenTrial, search_space: dict[str, BaseDistribution]
) -> dict[str, Any]:
self._warning_multi_objective_for_ctpe(study)
trials = study._get_trials(deepcopy=False, states=(TrialState.COMPLETE,), use_cache=True)
# n_below_feasible = self._gamma(len(trials))
# constraints_vals = np.asarray([self._constraints_func(t) for t in trials])
n_below = ...
# qs = ...
constraints_vals = np.asarray([self._constraints_func(t) for t in trials])
(mpes_below, mpes_above, quantiles) = (
self._build_parzen_estimators_for_constraints_and_get_quantiles(
trials, study, search_space, constraints_vals
)
)

# We divide data into below and above.
# n_trials = len(trials)
n_below_feasible = self._gamma(len(trials))
below_trials, above_trials = _split_trials(
study, trials, n_below, constraints_enabled=False
study, trials, n_below_feasible, is_feasible=np.all(constraints_vals <= 0, axis=-1)
)
mpe_below = self._build_parzen_estimator(
study, search_space, below_trials, handle_below=True
mpes_below.append(
self._build_parzen_estimator(study, search_space, below_trials, handle_below=True)
)
mpe_above = self._build_parzen_estimator(
study, search_space, above_trials, handle_below=False
mpes_above.append(
self._build_parzen_estimator(study, search_space, above_trials, handle_below=False)
)

samples_below = mpe_below.sample(self._rng.rng, self._n_ei_candidates)
acq_func_vals = self._compute_acquisition_func(samples_below, mpe_below, mpe_above, [])
_samples_below: dict[str, list[_ParzenEstimator]] = {
param_name: [] for param_name in search_space
}
for mpe in mpes_below:
for param_name, samples in mpe.sample(self._rng.rng, self._n_ei_candidates).items():
_samples_below[param_name].append(samples)

samples_below = {
param_name: np.hstack(samples) for param_name, samples in _samples_below.items()
}
acq_func_vals = self._compute_acquisition_func(
samples_below, mpes_below, mpes_above, quantiles
)
ret = TPESampler._compare(samples_below, acq_func_vals)

for param_name, dist in search_space.items():
ret[param_name] = dist.to_external_repr(ret[param_name])

Expand All @@ -134,39 +183,32 @@ def _compute_acquisition_func(
return acq_func_vals


def _get_reference_point(loss_vals: np.ndarray) -> np.ndarray:
    """Return a reference point slightly worse than the worst observed losses.

    Each coordinate of the component-wise worst point is pushed 10% further
    from the optimum; exact zeros are replaced by EPS to avoid a degenerate
    reference coordinate.
    """
    worst = loss_vals.max(axis=0)
    # 1.1x for positive coordinates, 0.9x for negative ones — in both cases
    # the larger of the two moves away from the (minimization) optimum.
    ref = np.maximum(worst * 1.1, worst * 0.9)
    ref[ref == 0] = EPS
    return ref


def _split_trials_for_ctpe(
    study: Study, trials: list[FrozenTrial], n_below_feasible: int, is_feasible: np.ndarray
) -> tuple[list[FrozenTrial], list[FrozenTrial]]:
    """Split ``trials`` so that "below" contains ``n_below_feasible`` feasible trials.

    Args:
        study: The study, used for objective directions and objective count.
        trials: Completed trials to split.
        n_below_feasible: Number of feasible trials the "below" set must hold.
        is_feasible: Boolean array aligned with ``trials``.

    Returns:
        ``(below_trials, above_trials)`` preserving the original trial order.
    """
    if len(trials) == 0:
        return [], []
    if np.count_nonzero(is_feasible) < n_below_feasible or len(trials) == n_below_feasible:
        # Not enough feasible trials (or every trial must go below): take all.
        return trials, []
    if n_below_feasible == 0:
        return [], trials

    # Sign-flip maximization objectives so that smaller is always better.
    loss_vals = np.asarray([t.values for t in trials])
    loss_vals *= np.asarray([1 if d == StudyDirection.MINIMIZE else -1 for d in study.directions])
    if study._is_multi_objective():
        return _split_trials_for_multi_objective_ctpe(loss_vals, n_below_feasible, is_feasible)

    order = np.argsort(loss_vals[:, 0])
    # Smallest prefix of the loss-sorted trials containing exactly
    # n_below_feasible feasible trials (searchsorted on the running count).
    n_below = np.searchsorted(np.cumsum(is_feasible[order]), n_below_feasible) + 1
    indices_below = set(np.arange(len(trials))[order[:n_below]])
    below_trials = [t for i, t in enumerate(trials) if i in indices_below]
    above_trials = [t for i, t in enumerate(trials) if i not in indices_below]
    return below_trials, above_trials


def _split_trials_for_multi_objective_ctpe(
    loss_vals: np.ndarray, n_below_feasible: int, is_feasible: np.ndarray
) -> tuple[list[FrozenTrial], list[FrozenTrial]]:
    """Placeholder for the multi-objective below/above split.

    Args:
        loss_vals: Loss values with maximization objectives already sign-flipped.
        n_below_feasible: Number of feasible trials required in the "below" set.
        is_feasible: Boolean feasibility mask aligned with ``loss_vals``.

    Raises:
        ValueError: Always — multi-objective c-TPE is not supported yet.
    """
    # Preconditions guaranteed by the caller (_split_trials_for_ctpe).
    assert 0 < n_below_feasible <= np.count_nonzero(is_feasible)
    assert n_below_feasible < len(loss_vals)
    raise ValueError("c-TPE does not support multi-objective optimization yet.")

0 comments on commit d185af3

Please sign in to comment.