diff --git a/package/samplers/hill_climb_search/LICENSE b/package/samplers/hill_climb_search/LICENSE
new file mode 100644
index 00000000..652ef72f
--- /dev/null
+++ b/package/samplers/hill_climb_search/LICENSE
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2024 Chinmaya Sahu
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
diff --git a/package/samplers/hill_climb_search/README.md b/package/samplers/hill_climb_search/README.md
new file mode 100644
index 00000000..4a9ab190
--- /dev/null
+++ b/package/samplers/hill_climb_search/README.md
@@ -0,0 +1,55 @@
+---
+author: Chinmaya Sahu
+title: Hill Climb Local Search Sampler
+description: This sampler uses the hill climbing algorithm, iteratively moving to the best neighboring point in the search space.
+tags: [sampler, hill climb]
+optuna_versions: [4.0.0]
+license: MIT License
+---
+
+## Abstract
+
+The **hill climbing algorithm** is an optimization technique that iteratively improves a solution by evaluating neighboring solutions in search of a local maximum or minimum. Starting with an initial guess, the algorithm examines nearby "neighbor" solutions, moving to a better neighbor whenever one is found. This process continues until no improvement is possible, resulting in a locally optimal solution. Hill climbing is efficient and easy to implement but can get stuck in local optima, making it suitable for simple optimization landscapes or applications with tight time constraints. Variants such as random restarts and stochastic selection help overcome some of these limitations.
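+
+The loop below is a minimal, self-contained sketch of the idea described above on a toy objective. The objective, box bounds, and unit step are illustrative assumptions for this sketch, not this sampler's exact implementation:
+
+```python
+import random
+
+def objective(x: int, y: int) -> int:
+    return -(x**2 + y**2)  # maximized at (0, 0)
+
+def neighbors(x: int, y: int) -> list[tuple[int, int]]:
+    # One step along each axis, clipped to the [-10, 10] box.
+    return [(max(x - 1, -10), y), (min(x + 1, 10), y),
+            (x, max(y - 1, -10)), (x, min(y + 1, 10))]
+
+x, y = random.randint(-10, 10), random.randint(-10, 10)
+while True:
+    best = max(neighbors(x, y), key=lambda p: objective(*p))
+    if objective(*best) <= objective(x, y):
+        break  # no improving neighbor: a local optimum (here also the global one)
+    x, y = best
+print(x, y)  # reaches (0, 0) on this convex landscape
+```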
+
+## Class or Function Names
+
+- HillClimbingSampler
+
+## Example
+
+```python
+import optuna
+import optunahub
+
+def objective(trial):
+    x = trial.suggest_int("x", -10, 10)
+    y = trial.suggest_int("y", -10, 10)
+    return -(x**2 + y**2)
+
+mod = optunahub.load_module("samplers/hill_climb_search")
+sampler = mod.HillClimbingSampler()
+study = optuna.create_study(sampler=sampler, direction="maximize")
+study.optimize(objective, n_trials=20)
+```
diff --git a/package/samplers/hill_climb_search/__init__.py b/package/samplers/hill_climb_search/__init__.py
new file mode 100644
index 00000000..cfd376ea
--- /dev/null
+++ b/package/samplers/hill_climb_search/__init__.py
@@ -0,0 +1,4 @@
+from .hill_climb_search import HillClimbingSampler
+
+
+__all__ = ["HillClimbingSampler"]
diff --git a/package/samplers/hill_climb_search/example.py b/package/samplers/hill_climb_search/example.py
new file mode 100644
index 00000000..0d26c2d0
--- /dev/null
+++ b/package/samplers/hill_climb_search/example.py
@@ -0,0 +1,17 @@
+import optuna
+import optunahub
+
+
+if __name__ == "__main__":
+
+    def objective(trial: optuna.trial.Trial) -> float:
+        x = trial.suggest_int("x", -10, 10)
+        y = trial.suggest_int("y", -10, 10)
+        return -(x**2 + y**2)
+
+    module = optunahub.load_module(package="samplers/hill_climb_search")
+    sampler = module.HillClimbingSampler()
+    study = optuna.create_study(sampler=sampler, direction="maximize")
+    study.optimize(objective, n_trials=20)
+
+    print(study.best_trial.value, study.best_trial.params)
diff --git a/package/samplers/hill_climb_search/hill_climb_search.py b/package/samplers/hill_climb_search/hill_climb_search.py
new file mode 100644
index 00000000..d2c671af
--- /dev/null
+++ b/package/samplers/hill_climb_search/hill_climb_search.py
@@ -0,0 +1,199 @@
+from __future__ import annotations
+
+from typing import Any
+
+import numpy as np
+import optuna
+import optunahub
+
+
+class HillClimbingSampler(optunahub.samplers.SimpleBaseSampler):
+    """A sampler based on hill climbing local search over discrete (integer) search spaces."""
+
+    def __init__(
+        self, search_space: dict[str, optuna.distributions.BaseDistribution] | None = None
+    ) -> None:
+        super().__init__(search_space)
+        self._remaining_points: list[dict] = []
+        self._rng = np.random.RandomState()
+
+        # The current point whose neighbors are under analysis
+        self._current_point: dict | None = None
+        self._current_point_value: float | None = None
+        self._current_state = "Not Initialized"
+
+        # The best improving neighbor found in the current neighborhood
+        self._best_neighbor: dict | None = None
+        self._best_neighbor_value: float | None = None
+
+    def _generate_random_point(
+        self, search_space: dict[str, optuna.distributions.BaseDistribution]
+    ) -> dict:
+        """Generate a random discrete point in the search space."""
+        params = {}
+        for param_name, param_distribution in search_space.items():
+            if isinstance(param_distribution, optuna.distributions.IntDistribution):
+                total_points = int(
+                    (param_distribution.high - param_distribution.low) / param_distribution.step
+                )
+                # randint's upper bound is exclusive, so use total_points + 1
+                # to make the distribution's high endpoint reachable.
+                params[param_name] = (
+                    param_distribution.low
+                    + self._rng.randint(0, total_points + 1) * param_distribution.step
+                )
+            else:
+                raise NotImplementedError
+        return params
+
+    def _remove_tried_points(
+        self, neighbors: list[dict], study: optuna.study.Study, current_point: dict
+    ) -> list[dict]:
+        """Remove points that have already been tried or queued from the list of neighbors."""
+        final_neighbors = []
+
+        tried_points = [trial.params for trial in study.get_trials(deepcopy=False)]
+        points_to_try = self._remaining_points
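+
+        # NOTE: the `not in` test below compares parameter dicts by value and scans
+        # every past trial on each call; that is acceptable for the small discrete
+        # search spaces this sampler targets.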
+        invalid_points = tried_points + points_to_try + [current_point]
+
+        for neighbor in neighbors:
+            if neighbor not in invalid_points:
+                final_neighbors.append(neighbor)
+
+        return final_neighbors
+
+    def _generate_neighbors(
+        self,
+        current_point: dict,
+        search_space: dict[str, optuna.distributions.BaseDistribution],
+        study: optuna.study.Study,
+    ) -> list[dict]:
+        """Generate the one-step neighbors of the current point along each dimension."""
+        neighbors = []
+        for param_name, param_distribution in search_space.items():
+            if isinstance(param_distribution, optuna.distributions.IntDistribution):
+                current_value = current_point[param_name]
+                step = param_distribution.step
+
+                neighbor_low = max(param_distribution.low, current_value - step)
+                neighbor_high = min(param_distribution.high, current_value + step)
+
+                neighbor_low_point = current_point.copy()
+                neighbor_low_point[param_name] = neighbor_low
+                neighbor_high_point = current_point.copy()
+                neighbor_high_point[param_name] = neighbor_high
+
+                neighbors.append(neighbor_low_point)
+                neighbors.append(neighbor_high_point)
+            else:
+                raise NotImplementedError
+
+        valid_neighbors = self._remove_tried_points(neighbors, study, current_point)
+
+        return valid_neighbors
+
+    def sample_relative(
+        self,
+        study: optuna.study.Study,
+        trial: optuna.trial.FrozenTrial,
+        search_space: dict[str, optuna.distributions.BaseDistribution],
+    ) -> dict[str, Any] | None:
+        if search_space == {}:
+            return {}
+
+        if self._current_state == "Not Initialized":
+            # Create the current point
+            starting_point = self._generate_random_point(search_space)
+            self._current_point = starting_point
+
+            # Add the neighbors
+            neighbors = self._generate_neighbors(starting_point, search_space, study)
+            self._remaining_points.extend(neighbors)
+
+            # Change the state to initialized
+            self._current_state = "Initialized"
+
+            # Return the current point
+            return starting_point
+
+        elif self._current_state == "Initialized":
+            # This section only records the value of the trial that just finished.
+            # NOTE: this assumes trials run sequentially, so the trial before the one
+            # being sampled ([-2]) is the one whose value we are waiting for.
+            previous_trial = study.get_trials(deepcopy=False)[-2]
+            if previous_trial.params == self._current_point:
+                # The current point was just evaluated; store its value.
+                self._current_point_value = previous_trial.value
+            else:
+                # A neighbor was evaluated. Record it if it improves on the best
+                # neighbor seen so far (falling back to the current point), so the
+                # best improving neighbor is kept rather than the last improving one.
+                neighbor_value = previous_trial.value
+
+                reference_value = (
+                    self._best_neighbor_value
+                    if self._best_neighbor_value is not None
+                    else self._current_point_value
+                )
+                criteria = (
+                    neighbor_value < reference_value
+                    if study.direction == optuna.study.StudyDirection.MINIMIZE
+                    else neighbor_value > reference_value
+                )
+
+                if criteria:
+                    self._best_neighbor = previous_trial.params
+                    self._best_neighbor_value = neighbor_value
+
+            # This section selects the next point to be evaluated
+            if len(self._remaining_points) == 0:
+                # All the neighbors have been processed; pick the best one.
+
+                if self._best_neighbor is not None:
+                    # There was an improvement.
+                    # Make the best neighbor the current point and add its neighbors.
+                    self._current_point = self._best_neighbor
+                    self._current_point_value = self._best_neighbor_value
+
+                    self._best_neighbor = None
+                    self._best_neighbor_value = None
+                    self._remaining_points = []  # Already empty here; kept for clarity
+
+                    # Add the neighbors
+                    neighbors = self._generate_neighbors(self._current_point, search_space, study)
+                    self._remaining_points.extend(neighbors)
+
+                    # NOTE: this point was already evaluated as a trial (it was the
+                    # winning neighbor); returning it again costs one duplicate trial
+                    # but keeps the value bookkeeping above simple.
+                    return self._current_point
+
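+                # Reaching the `else` below means no neighbor improved on the
+                # current point, i.e. a local optimum under this one-step neighborhood.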
+                else:
+                    # No neighbor improved, so do a random restart to escape the
+                    # local optimum.
+                    restarting_point = self._generate_random_point(search_space)
+                    self._current_point = restarting_point
+
+                    self._best_neighbor = None
+                    self._best_neighbor_value = None
+                    self._current_point_value = None  # Unknown until the restart point is evaluated
+
+                    # Add the neighbors
+                    neighbors = self._generate_neighbors(restarting_point, search_space, study)
+                    self._remaining_points.extend(neighbors)
+
+                    # Return the current point
+                    return self._current_point
+
+            else:
+                # Otherwise, evaluate the next queued neighbor.
+                current_point = self._remaining_points.pop()
+                return current_point
+
+        return {}