Skip to content

Commit

Permalink
Pre-commit hooks
Browse files Browse the repository at this point in the history
  • Loading branch information
csking101 committed Nov 1, 2024
1 parent db86207 commit 3ec949f
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 84 deletions.
3 changes: 1 addition & 2 deletions package/samplers/hill_climb_search/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@
author: Chinmaya Sahu
title: Hill Climb Local Search Sampler
description: This sampler used the Hill Climb Algorithm to improve the searching, by selecting the best neighbors and moving in that direction.
tags: [sampler,hill climb]
tags: [sampler, hill climb]
optuna_versions: [4.0.0]
license: MIT License
---


## Abstract

The **hill climbing algorithm** is an optimization technique that iteratively improves a solution by evaluating neighboring solutions in search of a local maximum or minimum. Starting with an initial guess, the algorithm examines nearby "neighbor" solutions, moving to a better neighbor if one is found. This process continues until no improvement is possible, resulting in a locally optimal solution. Hill climbing is efficient and easy to implement but can get stuck in local optima, making it suitable for simple optimization landscapes or applications with limited time constraints. Variants like random restarts and stochastic selection help overcome some limitations.
Expand Down
11 changes: 6 additions & 5 deletions package/samplers/hill_climb_search/example.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
import optuna
import optunahub

if __name__ == "__main__":
def objective(trial):

if __name__ == "__main__":

def objective(trial: optuna.trial.Trial) -> None:
x = trial.suggest_discrete_uniform("x", -10, 10)
y = trial.suggest_discrete_uniform("y", -10, 10)
return -(x**2 + y**2)

module = optunahub.load_module(
package="samplers/hill-climb-search",
repo_owner="csking101",
ref="hill-climb-algorithm")
package="samplers/hill-climb-search", repo_owner="csking101", ref="hill-climb-algorithm"
)
sampler = module.HillClimbSearch()
study = optuna.create_study(sampler=sampler)
study.optimize(objective, n_trials=20)
Expand Down
174 changes: 97 additions & 77 deletions package/samplers/hill_climb_search/hill_climb_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,156 +6,176 @@
import optuna
import optunahub


class HillClimbSearch(optunahub.samplers.SimpleBaseSampler):
"""A sampler based on the Hill Climb Local Search Algorithm dealing with discrete values.
"""
"""A sampler based on the Hill Climb Local Search Algorithm dealing with discrete values."""

def __init__(self,search_space: dict[str, optuna.distributions.BaseDistribution] | None = None) -> None:
def __init__(
self, search_space: dict[str, optuna.distributions.BaseDistribution] | None = None
) -> None:
super().__init__(search_space)
self._remaining_points = []
self._remaining_points: list[dict] = []
self._rng = np.random.RandomState()
#This is for storing the current point whose neighbors are under analysis
self._current_point = None

# This is for storing the current point whose neighbors are under analysis
self._current_point: dict | None = None
self._current_point_value = None
self._current_state = "Not Initialized"
#This is for keeping track of the best neighbor

# This is for keeping track of the best neighbor
self._best_neighbor = None
self._best_neighbor_value = None

def _generate_random_point(self, search_space):
"""This function generates a random discrete point in the search space
"""

def _generate_random_point(
self, search_space: dict[str, optuna.distributions.BaseDistribution]
) -> dict:
"""This function generates a random discrete point in the search space"""
params = {}
for param_name, param_distribution in search_space.items():
if isinstance(param_distribution, optuna.distributions.FloatDistribution):
total_points = int((param_distribution.high - param_distribution.low) / param_distribution.step)
params[param_name] = param_distribution.low + self._rng.randint(0, total_points)*param_distribution.step
total_points = int(
(param_distribution.high - param_distribution.low) / param_distribution.step
)
params[param_name] = (
param_distribution.low
+ self._rng.randint(0, total_points) * param_distribution.step
)
else:
raise NotImplementedError
return params

def _remove_tried_points(self, neighbors, study, current_point):
"""This function removes the points that have already been tried from the list of neighbors
"""

def _remove_tried_points(
self, neighbors: list[dict], study: optuna.study.Study, current_point: dict
) -> list[dict]:
"""This function removes the points that have already been tried from the list of neighbors"""
final_neighbors = []

tried_points = [trial.params for trial in study.get_trials(deepcopy=False)]
points_to_try = self._remaining_points

invalid_points = tried_points + points_to_try + [current_point]

for neighbor in neighbors:
if neighbor not in invalid_points:
final_neighbors.append(neighbor)

return final_neighbors

def _generate_neighbors(self, current_point, search_space, study):
"""This function generates the neighbors of the current point
"""

return final_neighbors

def _generate_neighbors(
self,
current_point: dict,
search_space: dict[str, optuna.distributions.BaseDistribution],
study: optuna.study.Study,
) -> list[dict]:
"""This function generates the neighbors of the current point"""
neighbors = []
for param_name, param_distribution in search_space.items():
if isinstance(param_distribution, optuna.distributions.FloatDistribution):
current_value = current_point[param_name]
step = param_distribution.step

neighbor_low = max(param_distribution.low, current_value - step)
neighbor_high = min(param_distribution.high, current_value + step)

neighbor_low_point = current_point.copy()
neighbor_low_point[param_name] = neighbor_low
neighbor_high_point = current_point.copy()
neighbor_high_point[param_name] = neighbor_high

neighbors.append(neighbor_low_point)
neighbors.append(neighbor_high_point)
else:
raise NotImplementedError
valid_neighbors = self._remove_tried_points(neighbors, study, current_point)

valid_neighbors = self._remove_tried_points(neighbors, study, current_point)

return valid_neighbors

def sample_relative(self, study:optuna.study.Study, trial:optuna.trial.FrozenTrial, search_space: dict[str, optuna.distributions.BaseDistribution]) -> dict[str, Any]:

def sample_relative(
self,
study: optuna.study.Study,
trial: optuna.trial.FrozenTrial,
search_space: dict[str, optuna.distributions.BaseDistribution],
) -> dict[str, Any] | None:
if search_space == {}:
return {}

if self._current_state == "Not Initialized":
#Create the current point
starting_point = self._generate_random_point(search_space)
# Create the current point
starting_point = self._generate_random_point(search_space)
self._current_point = starting_point
#Add the neighbors

# Add the neighbors
neighbors = self._generate_neighbors(starting_point, search_space, study)
self._remaining_points.extend(neighbors)
#Change the state to initialized

# Change the state to initialized
self._current_state = "Initialized"
#Return the current point

# Return the current point
return starting_point

elif self._current_state == "Initialized":
#This section is only for storing the value of the current point and best neighbor point
# This section is only for storing the value of the current point and best neighbor point
previous_trial = study.get_trials(deepcopy=False)[-2]
if previous_trial.params == self._current_point:
#Just now the current point was evaluated
#Store the value of the current point
# Just now the current point was evaluated
# Store the value of the current point
self._current_point_value = previous_trial.value
else:
#The neighbor was evaluated
#Store the value of the neighbor, if it improves upon the current point
# The neighbor was evaluated
# Store the value of the neighbor, if it improves upon the current point
neighbor_value = previous_trial.value

if neighbor_value < self._current_point_value:
self._best_neighbor = previous_trial.params
self._best_neighbor_value = neighbor_value
#This section is for the next point to be evaluated

# This section is for the next point to be evaluated
if len(self._remaining_points) == 0:
#This means that all the neighbors have been processed
#Now you have to select the best neighbor
# This means that all the neighbors have been processed
# Now you have to select the best neighbor

if self._best_neighbor is not None:
#There was an improvement
#Select the best neighbor, make that the current point and add its neighbors
# There was an improvement
# Select the best neighbor, make that the current point and add its neighbors
self._current_point = self._best_neighbor
self._current_point_value = self._best_neighbor_value

self._best_neighbor = None
self._best_neighbor_value = None
self._remaining_points = [] #Happens by virtue of the condition, but just for clarity
#Add the neighbors
self._remaining_points = [] # Happens by virtue of the condition, but just for clarity

# Add the neighbors
neighbors = self._generate_neighbors(self._current_point, search_space, study)
self._remaining_points.extend(neighbors)

self._current_state = "Initialized"

return self._current_point

else:
#If none of the neighbors are better then do a random restart
# If none of the neighbors are better then do a random restart
self._current_state = "Not Initialized"
restarting_point = self._generate_random_point(search_space)
self._current_point = restarting_point

self._best_neighbor = None
self._best_neighbor_value = None
#Add the neighbors

# Add the neighbors
neighbors = self._generate_neighbors(restarting_point, search_space, study)
self._remaining_points.extend(neighbors)
#Change the state to initialized

# Change the state to initialized
self._current_state = "Initialized"
#Return the current point

# Return the current point
return self._current_point

else:
#Process as normal
# Process as normal
current_point = self._remaining_points.pop()
return current_point
return current_point

return {}

0 comments on commit 3ec949f

Please sign in to comment.