From 3ec949fd7279aec73ef02a66abeb1d7d8f225755 Mon Sep 17 00:00:00 2001
From: Chinmaya Sahu
Date: Fri, 1 Nov 2024 20:14:25 +0530
Subject: [PATCH] Pre-commit hooks

---
 package/samplers/hill_climb_search/README.md  |   3 +-
 package/samplers/hill_climb_search/example.py |  11 +-
 .../hill_climb_search/hill_climb_search.py    | 174 ++++++++++--------
 3 files changed, 104 insertions(+), 84 deletions(-)

diff --git a/package/samplers/hill_climb_search/README.md b/package/samplers/hill_climb_search/README.md
index 831c6ae9..dee21610 100644
--- a/package/samplers/hill_climb_search/README.md
+++ b/package/samplers/hill_climb_search/README.md
@@ -2,12 +2,11 @@
 author: Chinmaya Sahu
 title: Hill Climb Local Search Sampler
 description: This sampler used the Hill Climb Algorithm to improve the searching, by selecting the best neighbors and moving in that direction.
-tags: [sampler,hill climb]
+tags: [sampler, hill climb]
 optuna_versions: [4.0.0]
 license: MIT License
 ---
-
 ## Abstract
 
 The **hill climbing algorithm** is an optimization technique that iteratively improves a solution by evaluating neighboring solutions in search of a local maximum or minimum. Starting with an initial guess, the algorithm examines nearby "neighbor" solutions, moving to a better neighbor if one is found. This process continues until no improvement is possible, resulting in a locally optimal solution. Hill climbing is efficient and easy to implement but can get stuck in local optima, making it suitable for simple optimization landscapes or applications with limited time constraints. Variants like random restarts and stochastic selection help overcome some limitations.
 
diff --git a/package/samplers/hill_climb_search/example.py b/package/samplers/hill_climb_search/example.py
index 8952d301..a2c27703 100644
--- a/package/samplers/hill_climb_search/example.py
+++ b/package/samplers/hill_climb_search/example.py
@@ -1,16 +1,17 @@
 import optuna
 import optunahub
 
-if __name__ == "__main__":
-    def objective(trial):
+
+if __name__ == "__main__":
+
+    def objective(trial: optuna.trial.Trial) -> None:
         x = trial.suggest_discrete_uniform("x", -10, 10)
         y = trial.suggest_discrete_uniform("y", -10, 10)
         return -(x**2 + y**2)
 
     module = optunahub.load_module(
-        package="samplers/hill-climb-search",
-        repo_owner="csking101",
-        ref="hill-climb-algorithm")
+        package="samplers/hill-climb-search", repo_owner="csking101", ref="hill-climb-algorithm"
+    )
     sampler = module.HillClimbSearch()
     study = optuna.create_study(sampler=sampler)
     study.optimize(objective, n_trials=20)
diff --git a/package/samplers/hill_climb_search/hill_climb_search.py b/package/samplers/hill_climb_search/hill_climb_search.py
index 63ae3f18..0f79d301 100644
--- a/package/samplers/hill_climb_search/hill_climb_search.py
+++ b/package/samplers/hill_climb_search/hill_climb_search.py
@@ -6,156 +6,176 @@
 import optuna
 import optunahub
 
+
 class HillClimbSearch(optunahub.samplers.SimpleBaseSampler):
-    """A sampler based on the Hill Climb Local Search Algorithm dealing with discrete values.
- """ + """A sampler based on the Hill Climb Local Search Algorithm dealing with discrete values.""" - def __init__(self,search_space: dict[str, optuna.distributions.BaseDistribution] | None = None) -> None: + def __init__( + self, search_space: dict[str, optuna.distributions.BaseDistribution] | None = None + ) -> None: super().__init__(search_space) - self._remaining_points = [] + self._remaining_points: list[dict] = [] self._rng = np.random.RandomState() - - #This is for storing the current point whose neighbors are under analysis - self._current_point = None + + # This is for storing the current point whose neighbors are under analysis + self._current_point: dict | None = None self._current_point_value = None self._current_state = "Not Initialized" - - #This is for keeping track of the best neighbor + + # This is for keeping track of the best neighbor self._best_neighbor = None self._best_neighbor_value = None - - def _generate_random_point(self, search_space): - """This function generates a random discrete point in the search space - """ + + def _generate_random_point( + self, search_space: dict[str, optuna.distributions.BaseDistribution] + ) -> dict: + """This function generates a random discrete point in the search space""" params = {} for param_name, param_distribution in search_space.items(): if isinstance(param_distribution, optuna.distributions.FloatDistribution): - total_points = int((param_distribution.high - param_distribution.low) / param_distribution.step) - params[param_name] = param_distribution.low + self._rng.randint(0, total_points)*param_distribution.step + total_points = int( + (param_distribution.high - param_distribution.low) / param_distribution.step + ) + params[param_name] = ( + param_distribution.low + + self._rng.randint(0, total_points) * param_distribution.step + ) else: raise NotImplementedError return params - - def _remove_tried_points(self, neighbors, study, current_point): - """This function removes the points that have already been tried from the list of neighbors - """ + + def _remove_tried_points( + self, neighbors: list[dict], study: optuna.study.Study, current_point: dict + ) -> list[dict]: + """This function removes the points that have already been tried from the list of neighbors""" final_neighbors = [] - + tried_points = [trial.params for trial in study.get_trials(deepcopy=False)] points_to_try = self._remaining_points - + invalid_points = tried_points + points_to_try + [current_point] - + for neighbor in neighbors: if neighbor not in invalid_points: final_neighbors.append(neighbor) - - return final_neighbors - - def _generate_neighbors(self, current_point, search_space, study): - """This function generates the neighbors of the current point - """ + + return final_neighbors + + def _generate_neighbors( + self, + current_point: dict, + search_space: dict[str, optuna.distributions.BaseDistribution], + study: optuna.study.Study, + ) -> list[dict]: + """This function generates the neighbors of the current point""" neighbors = [] for param_name, param_distribution in search_space.items(): if isinstance(param_distribution, optuna.distributions.FloatDistribution): current_value = current_point[param_name] step = param_distribution.step - + neighbor_low = max(param_distribution.low, current_value - step) neighbor_high = min(param_distribution.high, current_value + step) - + neighbor_low_point = current_point.copy() neighbor_low_point[param_name] = neighbor_low neighbor_high_point = current_point.copy() neighbor_high_point[param_name] = neighbor_high - + 
                 neighbors.append(neighbor_low_point)
                 neighbors.append(neighbor_high_point)
             else:
                 raise NotImplementedError
-
-        valid_neighbors = self._remove_tried_points(neighbors, study, current_point)
-
+
+        valid_neighbors = self._remove_tried_points(neighbors, study, current_point)
+
         return valid_neighbors
-
-    def sample_relative(self, study:optuna.study.Study, trial:optuna.trial.FrozenTrial, search_space: dict[str, optuna.distributions.BaseDistribution]) -> dict[str, Any]:
+
+    def sample_relative(
+        self,
+        study: optuna.study.Study,
+        trial: optuna.trial.FrozenTrial,
+        search_space: dict[str, optuna.distributions.BaseDistribution],
+    ) -> dict[str, Any] | None:
         if search_space == {}:
             return {}
-
+
         if self._current_state == "Not Initialized":
-            #Create the current point
-            starting_point = self._generate_random_point(search_space)
+            # Create the current point
+            starting_point = self._generate_random_point(search_space)
             self._current_point = starting_point
-
-            #Add the neighbors
+
+            # Add the neighbors
             neighbors = self._generate_neighbors(starting_point, search_space, study)
             self._remaining_points.extend(neighbors)
-
-            #Change the state to initialized
+
+            # Change the state to initialized
             self._current_state = "Initialized"
-
-            #Return the current point
+
+            # Return the current point
             return starting_point
-
+
         elif self._current_state == "Initialized":
-            #This section is only for storing the value of the current point and best neighbor point
+            # This section is only for storing the value of the current point and best neighbor point
             previous_trial = study.get_trials(deepcopy=False)[-2]
             if previous_trial.params == self._current_point:
-                #Just now the current point was evaluated
-                #Store the value of the current point
+                # Just now the current point was evaluated
+                # Store the value of the current point
                 self._current_point_value = previous_trial.value
             else:
-                #The neighbor was evaluated
-                #Store the value of the neighbor, if it improves upon the current point
+                # The neighbor was evaluated
+                # Store the value of the neighbor, if it improves upon the current point
                 neighbor_value = previous_trial.value
-
+
                 if neighbor_value < self._current_point_value:
                     self._best_neighbor = previous_trial.params
                     self._best_neighbor_value = neighbor_value
-
-            #This section is for the next point to be evaluated
+
+            # This section is for the next point to be evaluated
             if len(self._remaining_points) == 0:
-                #This means that all the neighbors have been processed
-                #Now you have to select the best neighbor
-
+                # This means that all the neighbors have been processed
+                # Now you have to select the best neighbor
+
                 if self._best_neighbor is not None:
-                    #There was an improvement
-                    #Select the best neighbor, make that the current point and add its neighbors
+                    # There was an improvement
+                    # Select the best neighbor, make that the current point and add its neighbors
                     self._current_point = self._best_neighbor
                     self._current_point_value = self._best_neighbor_value
-
                     self._best_neighbor = None
                     self._best_neighbor_value = None
-                    self._remaining_points = [] #Happens by virtue of the condition, but just for clarity
-
-                    #Add the neighbors
+                    self._remaining_points = []  # Happens by virtue of the condition, but just for clarity
+
+                    # Add the neighbors
                     neighbors = self._generate_neighbors(self._current_point, search_space, study)
                     self._remaining_points.extend(neighbors)
-
+
                     self._current_state = "Initialized"
-
+
                     return self._current_point
-
+
                 else:
-                    #If none of the neighbors are better then do a random restart
+                    # If none of the neighbors are better then do a random restart
                     self._current_state = "Not Initialized"
                     restarting_point = self._generate_random_point(search_space)
                     self._current_point = restarting_point
-
+
                     self._best_neighbor = None
                     self._best_neighbor_value = None
-
-                    #Add the neighbors
+
+                    # Add the neighbors
                     neighbors = self._generate_neighbors(restarting_point, search_space, study)
                     self._remaining_points.extend(neighbors)
-
-                    #Change the state to initialized
+
+                    # Change the state to initialized
                     self._current_state = "Initialized"
-
-                    #Return the current point
+
+                    # Return the current point
                     return self._current_point
-
+
             else:
-                #Process as normal
+                # Process as normal
                 current_point = self._remaining_points.pop()
-                return current_point
\ No newline at end of file
+                return current_point
+
+        return {}
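
For reference, below is a minimal usage sketch of the sampler after this patch; it is separate from the commit itself. It reuses the optunahub.load_module call from example.py, but it declares the search space with suggest_float(..., step=...), which produces the stepped FloatDistribution that _generate_random_point and _generate_neighbors handle (suggest_discrete_uniform is deprecated in recent Optuna releases and needs an explicit step argument q), and it annotates the objective as returning float since it returns a value. The paraboloid objective and the step of 1.0 are illustrative choices only.

import optuna
import optunahub


def objective(trial: optuna.trial.Trial) -> float:
    # suggest_float with a step yields FloatDistribution(step=...), which this
    # sampler samples on a grid and perturbs one step at a time.
    x = trial.suggest_float("x", -10, 10, step=1.0)
    y = trial.suggest_float("y", -10, 10, step=1.0)
    return x**2 + y**2


if __name__ == "__main__":
    module = optunahub.load_module(
        package="samplers/hill-climb-search", repo_owner="csking101", ref="hill-climb-algorithm"
    )
    sampler = module.HillClimbSearch()
    # sample_relative treats smaller values as better (neighbor_value < current value),
    # so the default minimization direction matches the sampler's comparison.
    study = optuna.create_study(sampler=sampler)
    study.optimize(objective, n_trials=20)
    print(study.best_params, study.best_value)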