diff --git a/.gitignore b/.gitignore index a9df30b5..bff01e19 100644 --- a/.gitignore +++ b/.gitignore @@ -11,4 +11,6 @@ dask-worker-space/ *.egg-info/ .coverage target/ -.venv/ \ No newline at end of file +.venv/ +build/* +*.egg \ No newline at end of file diff --git a/README.md b/README.md index 39207966..6f30b08d 100644 --- a/README.md +++ b/README.md @@ -48,6 +48,21 @@ This is to ensure that you get the version that is compatible with your system. conda install --yes -c conda-forge 'lightgbm>=3.3.3' ``` +### Installing Extra Features with pip + +If you want to utilize the additional features provided by TPOT2 along with `scikit-learn` extensions, you can install them using `pip`. The command to install TPOT2 with these extra features is as follows: + +``` +pip install tpot2[sklearnex] +``` + +Please note that while these extensions can speed up scikit-learn packages, there are some important considerations: + +These extensions may not be fully developed and tested on Arm-based CPUs, such as M1 Macs. You might encounter compatibility issues or reduced performance on such systems. + +We recommend using Python 3.9 when installing these extra features, as it provides better compatibility and stability. + + ### Developer/Latest Branch Installation diff --git a/Tutorial/3_Genetic_Feature_Set_Selectors.ipynb b/Tutorial/3_Genetic_Feature_Set_Selectors.ipynb index becaf73f..59ae0a79 100644 --- a/Tutorial/3_Genetic_Feature_Set_Selectors.ipynb +++ b/Tutorial/3_Genetic_Feature_Set_Selectors.ipynb @@ -7,12 +7,12 @@ "source": [ "The FeatureSetSelector is a subclass of sklearn.feature_selection.SelectorMixin that simply returns the manually specified columns. The parameter sel_subset specifies the name or index of the column that it selects. The transform function then simply indexes and returns the selected columns. You can also optionally name the group with the name parameter, though this is only for note keeping and does is not used by the class.\n", "\n", - "```\n", + "\n", "sel_subset: list or int\n", " If X is a dataframe, items in sel_subset list must correspond to column names\n", " If X is a numpy array, items in sel_subset list must correspond to column indexes\n", " int: index of a single column\n", - "```\n", + "\n", "\n" ] }, @@ -75,10 +75,10 @@ "source": [ "To use the FSS with TPOT2, you can simply pass it in to the configuration dictionary. Note that the FSS is only well defined when used in the leaf nodes of the graph. This is because downstream nodes will receive different transformations of the data such that the original indexes no longer correspond to the same columns in the raw data.\n", "\n", - "TPOT2 includsing the string `\"feature_set_selector\"` in the `leaf_config_dict` parameter will include the FSS in the search space of the pipeline. By default, each FSS node will select a single column. You can also group columns into sets so that each node selects a set of features rather than a single feature.\n", + "TPOT2 includsing the string \"feature_set_selector\" in the leaf_config_dict parameter will include the FSS in the search space of the pipeline. By default, each FSS node will select a single column. 
You can also group columns into sets so that each node selects a set of features rather than a single feature.\n", + "\n", "\n", "\n", - "``` \n", "subsets : str or list, default=None\n", " Sets the subsets that the FeatureSetSeletor will select from if set as an option in one of the configuration dictionaries.\n", " - str : If a string, it is assumed to be a path to a csv file with the subsets. \n", @@ -86,40 +86,41 @@ " - list or np.ndarray : If a list or np.ndarray, it is assumed to be a list of subsets.\n", " - None : If None, each column will be treated as a subset. One column will be selected per subset.\n", " If subsets is None, each column will be treated as a subset. One column will be selected per subset.\n", - "```\n", + "\n", "\n", "Lets say you want to have three groups of features, each with three columns each. The following examples are equivalent:\n", "\n", "### str\n", "\n", - "`sel_subsets=simple_fss.csv`\n", + "sel_subsets=simple_fss.csv\n", "\n", "\n", "\\# simple_fss.csv\n", - "```\n", + "\n", "group_one, 1,2,3\n", + "\n", "group_two, 4,5,6\n", + "\n", "group_three, 7,8,9\n", - "```\n", + "\n", "\n", "### dict\n", "\n", - "```\n", + "\n", "sel_subsets = { \"group_one\" : [1,2,3],\n", " \"group_two\" : [4,5,6],\n", " \"group_three\" : [7,8,9],\n", " }\n", - "```\n", + "\n", "\n", "### list\n", "\n", - "```\n", + "\n", "sel_subsets = [[1,2,3],[4,5,6],[7,8,9]]\n", - " \n", - "```\n", "\n", "\n", - "(As the FSS is just another transformer, you could also pass it in with the standard configuration dictionary format (described in tutorial 2), in which you would have to define your own function that returns a hyperparameter. Similar to the `params_LogisticRegression` function below. )\n", + "\n", + "(As the FSS is just another transformer, you could also pass it in with the standard configuration dictionary format (described in tutorial 2), in which you would have to define your own function that returns a hyperparameter. Similar to the params_LogisticRegression function below. 
)\n", "\n", "\n", "(In the future, FSS will be treated as a special case node with its own mutation/crossover functions to make it more efficient when there are large numbers of features.)" @@ -1132,7 +1133,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.10.12" + "version": "3.10.11" }, "orig_nbformat": 4, "vscode": { diff --git a/setup.py b/setup.py index 6e7e1a9c..19f0f322 100644 --- a/setup.py +++ b/setup.py @@ -52,6 +52,7 @@ def calculate_version(): extras_require={ 'skrebate': ['skrebate>=0.3.4'], 'mdr': ['scikit-mdr>=0.4.4'], + 'sklearnex' : ['scikit-learn-intelex>=2023.2.1'] }, classifiers=[ 'Intended Audience :: Science/Research', diff --git a/tpot2/config/__init__.py b/tpot2/config/__init__.py index c5c18117..e019b78e 100644 --- a/tpot2/config/__init__.py +++ b/tpot2/config/__init__.py @@ -7,6 +7,12 @@ from .autoqtl_builtins import make_FeatureEncodingFrequencySelector_config_dictionary, make_genetic_encoders_config_dictionary from .hyperparametersuggestor import * +try: + from .classifiers_sklearnex import make_sklearnex_classifier_config_dictionary + from .regressors_sklearnex import make_sklearnex_regressor_config_dictionary +except ModuleNotFoundError: #if optional packages are not installed + pass + try: from .mdr_configs import make_skrebate_config_dictionary, make_MDR_config_dictionary, make_ContinuousMDR_config_dictionary except: #if optional packages are not installed diff --git a/tpot2/config/classifiers_sklearnex.py b/tpot2/config/classifiers_sklearnex.py new file mode 100644 index 00000000..7d4129d0 --- /dev/null +++ b/tpot2/config/classifiers_sklearnex.py @@ -0,0 +1,73 @@ +from sklearnex.ensemble import RandomForestClassifier +from sklearnex.neighbors import KNeighborsClassifier +from sklearnex.svm import SVC +from sklearnex.svm import NuSVC +from sklearnex.linear_model import LogisticRegression + + +def params_RandomForestClassifier(trial, name=None): + return { + 'n_estimators': 100, + 'bootstrap': trial.suggest_categorical(name=f'bootstrap_{name}', choices=[True, False]), + 'min_samples_split': trial.suggest_int(f'min_samples_split_{name}', 2, 20), + 'min_samples_leaf': trial.suggest_int(f'min_samples_leaf_{name}', 1, 20), + 'n_jobs': 1, + } + +def params_KNeighborsClassifier(trial, name=None, n_samples=10): + n_neighbors_max = max(n_samples, 100) + return { + 'n_neighbors': trial.suggest_int(f'n_neighbors_{name}', 1, n_neighbors_max, log=True ), + 'weights': trial.suggest_categorical(f'weights_{name}', ['uniform', 'distance']), + } + +def params_LogisticRegression(trial, name=None): + params = {} + params['dual'] = False + params['penalty'] = 'l2' + params['solver'] = trial.suggest_categorical(name=f'solver_{name}', choices=['liblinear', 'sag', 'saga']), + if params['solver'] == 'liblinear': + params['penalty'] = trial.suggest_categorical(name=f'penalty_{name}', choices=['l1', 'l2']) + if params['penalty'] == 'l2': + params['dual'] = trial.suggest_categorical(name=f'dual_{name}', choices=[True, False]) + else: + params['penalty'] = 'l1' + return { + 'solver': params['solver'], + 'penalty': params['penalty'], + 'dual': params['dual'], + 'C': trial.suggest_float(f'C_{name}', 1e-4, 1e4, log=True), + 'max_iter': 1000, + } + +def params_SVC(trial, name=None): + return { + 'kernel': trial.suggest_categorical(name=f'kernel_{name}', choices=['poly', 'rbf', 'linear', 'sigmoid']), + 'C': trial.suggest_float(f'C_{name}', 1e-4, 25, log=True), + 'degree': trial.suggest_int(f'degree_{name}', 1, 4), + 'class_weight': 
trial.suggest_categorical(name=f'class_weight_{name}', choices=[None, 'balanced']), + 'max_iter': 3000, + 'tol': 0.005, + 'probability': True, + } + +def params_NuSVC(trial, name=None): + return { + 'nu': trial.suggest_float(f'subsample_{name}', 0.05, 1.0), + 'kernel': trial.suggest_categorical(name=f'kernel_{name}', choices=['poly', 'rbf', 'linear', 'sigmoid']), + 'C': trial.suggest_float(f'C_{name}', 1e-4, 25, log=True), + 'degree': trial.suggest_int(f'degree_{name}', 1, 4), + 'class_weight': trial.suggest_categorical(name=f'class_weight_{name}', choices=[None, 'balanced']), + 'max_iter': 3000, + 'tol': 0.005, + 'probability': True, + } + +def make_sklearnex_classifier_config_dictionary(n_samples=10, n_classes=None): + return { + RandomForestClassifier: params_RandomForestClassifier, + KNeighborsClassifier: params_KNeighborsClassifier, + LogisticRegression: params_LogisticRegression, + SVC: params_SVC, + NuSVC: params_NuSVC, + } diff --git a/tpot2/config/regressors_sklearnex.py b/tpot2/config/regressors_sklearnex.py new file mode 100644 index 00000000..4eb10f1c --- /dev/null +++ b/tpot2/config/regressors_sklearnex.py @@ -0,0 +1,84 @@ +from sklearnex.linear_model import LinearRegression +from sklearnex.linear_model import Ridge +from sklearnex.linear_model import Lasso +from sklearnex.linear_model import ElasticNet + +from sklearnex.svm import SVR +from sklearnex.svm import NuSVR + +from sklearnex.ensemble import RandomForestRegressor +from sklearnex.neighbors import KNeighborsRegressor + + +def params_RandomForestRegressor(trial, name=None): + return { + 'n_estimators': 100, + 'max_features': trial.suggest_float(f'max_features_{name}', 0.05, 1.0), + 'bootstrap': trial.suggest_categorical(name=f'bootstrap_{name}', choices=[True, False]), + 'min_samples_split': trial.suggest_int(f'min_samples_split_{name}', 2, 21), + 'min_samples_leaf': trial.suggest_int(f'min_samples_leaf_{name}', 1, 21), + } + +def params_KNeighborsRegressor(trial, name=None, n_samples=100): + n_neighbors_max = max(n_samples, 100) + return { + 'n_neighbors': trial.suggest_int(f'n_neighbors_{name}', 1, n_neighbors_max), + 'weights': trial.suggest_categorical(f'weights_{name}', ['uniform', 'distance']), + } + +def params_LinearRegression(trial, name=None): + return {} + +def params_Ridge(trial, name=None): + return { + 'alpha': trial.suggest_float(f'alpha_{name}', 0.0, 1.0), + 'fit_intercept': True, + 'tol': trial.suggest_float(f'tol_{name}', 1e-5, 1e-1, log=True), + } + +def params_Lasso(trial, name=None): + return { + 'alpha': trial.suggest_float(f'alpha_{name}', 0.0, 1.0), + 'fit_intercept': True, + 'precompute': trial.suggest_categorical(f'precompute_{name}', [True, False, 'auto']), + 'tol': trial.suggest_float(f'tol_{name}', 1e-5, 1e-1, log=True), + 'positive': trial.suggest_categorical(f'positive_{name}', [True, False]), + 'selection': trial.suggest_categorical(f'selection_{name}', ['cyclic', 'random']), + } + +def params_ElasticNet(trial, name=None): + return { + 'alpha': 1 - trial.suggest_float(f'alpha_{name}', 0.0, 1.0), + 'l1_ratio': 1- trial.suggest_float(f'l1_ratio_{name}',0.0, 1.0), + } + +def params_SVR(trial, name=None): + return { + 'kernel': trial.suggest_categorical(name=f'kernel_{name}', choices=['poly', 'rbf', 'linear', 'sigmoid']), + 'C': trial.suggest_float(f'C_{name}', 1e-4, 25, log=True), + 'degree': trial.suggest_int(f'degree_{name}', 1, 4), + 'max_iter': 3000, + 'tol': 0.005, + } + +def params_NuSVR(trial, name=None): + return { + 'nu': trial.suggest_float(f'subsample_{name}', 0.05, 1.0), + 
'kernel': trial.suggest_categorical(name=f'kernel_{name}', choices=['poly', 'rbf', 'linear', 'sigmoid']), + 'C': trial.suggest_float(f'C_{name}', 1e-4, 25, log=True), + 'degree': trial.suggest_int(f'degree_{name}', 1, 4), + 'max_iter': 3000, + 'tol': 0.005, + } + +def make_sklearnex_regressor_config_dictionary(n_samples=10): + return { + RandomForestRegressor: params_RandomForestRegressor, + KNeighborsRegressor: params_KNeighborsRegressor, + LinearRegression: params_LinearRegression, + Ridge: params_Ridge, + Lasso: params_Lasso, + ElasticNet: params_ElasticNet, + SVR: params_SVR, + NuSVR: params_NuSVR, + } diff --git a/tpot2/evolvers/base_evolver.py b/tpot2/evolvers/base_evolver.py index 15e0377a..93655d66 100644 --- a/tpot2/evolvers/base_evolver.py +++ b/tpot2/evolvers/base_evolver.py @@ -20,6 +20,11 @@ import math from tpot2.utils.utils import get_thresholds, beta_interpolation, remove_items, equalize_list +def ind_mutate(ind): + return ind.mutate() + +def ind_crossover(ind1, ind2): + return ind1.crossover(ind2) class BaseEvolver(): def __init__( self, @@ -52,6 +57,13 @@ def __init__( self, mutate_probability=.7, mutate_then_crossover_probability=.05, crossover_then_mutate_probability=.05, + + mutation_functions = [ind_mutate], + crossover_functions = [ind_crossover], + + mutation_function_weights = None, + crossover_function_weights = None, + n_parents=2, survival_selector = survival_select_NSGA2, @@ -261,6 +273,20 @@ def __init__( self, self.mutate_then_crossover_probability= mutate_then_crossover_probability / total_var_p self.crossover_then_mutate_probability= crossover_then_mutate_probability / total_var_p + + self.mutation_functions = mutation_functions + self.crossover_functions = crossover_functions + + if mutation_function_weights is None: + self.mutation_function_weights = [1 for _ in range(len(mutation_functions))] + else: + self.mutation_function_weights = mutation_function_weights + + if mutation_function_weights is None: + self.crossover_function_weights = [1 for _ in range(len(mutation_functions))] + else: + self.crossover_function_weights = crossover_function_weights + self.n_parents = n_parents if objective_kwargs is None: @@ -468,7 +494,6 @@ def step(self,): else: self.cur_population_size = self.population_size - if self.budget_list is not None: if len(self.budget_list) <= self.generation: self.budget = self.budget_range[-1] @@ -477,61 +502,39 @@ def step(self,): else: self.budget = None - - - self.one_generation_step() - self.generation += 1 - - - - def one_generation_step(self, ): #your EA Algorithm goes here - if self.survival_selector is not None: n_survivors = max(1,int(self.cur_population_size*self.survival_percentage)) #always keep at least one individual - #Get survivors from current population - weighted_scores = self.population.get_column(self.population.population, column_names=self.objective_names) * self.objective_function_weights - new_population_index = np.ravel(self.survival_selector(weighted_scores, k=n_survivors)) #TODO make it clear that we are concatenating scores... 
- self.population.set_population(np.array(self.population.population)[new_population_index]) - weighted_scores = self.population.get_column(self.population.population, column_names=self.objective_names) * self.objective_function_weights + self.population.survival_select( selector=self.survival_selector, + weights=self.objective_function_weights, + columns_names=self.objective_names, + n_survivors=n_survivors, + inplace=True) + + self.generate_offspring() + self.evaluate_population() + + self.generation += 1 - #number of crossover pairs and mutation only parent to generate - n_crossover = int(self.cur_population_size*self.crossover_probability) - n_crossover_then_mutate = int(self.cur_population_size*self.crossover_then_mutate_probability) - n_mutate_then_crossover = int(self.cur_population_size*self.mutate_then_crossover_probability) - n_total_crossover_pairs = n_crossover + n_crossover_then_mutate + n_mutate_then_crossover - n_mutate_parents = self.cur_population_size - n_total_crossover_pairs - - #get crossover pairs - if n_total_crossover_pairs > 0: - cx_parents_index = self.parent_selector(weighted_scores, k=n_total_crossover_pairs, n_parents=self.n_parents, ) #TODO make it clear that we are concatenating scores... - cx_var_ops = np.concatenate([ np.repeat("crossover",n_crossover), - np.repeat("mutate_then_crossover",n_mutate_then_crossover), - np.repeat("crossover_then_mutate",n_crossover_then_mutate), - ]) - else: - cx_parents_index = [] - cx_var_ops = [] + def generate_offspring(self, ): #your EA Algorithm goes here + n_mutations = np.random.binomial(self.cur_population_size, self.mutate_probability) + n_crossover = self.cur_population_size - n_mutations - #get mutation only parents - if n_mutate_parents > 0: - m_parents_index = self.parent_selector(weighted_scores, k=n_mutate_parents, n_parents=1, ) #TODO make it clear that we are concatenating scores... 
- m_var_ops = np.repeat("mutate",len(m_parents_index)) - else: - m_parents_index = [] - m_var_ops = [] + cx_parents = self.population.parent_select(selector=self.parent_selector, weights=self.objective_function_weights, columns_names=self.objective_names, k=n_crossover, n_parents=2) + m_parents = self.population.parent_select(selector=self.parent_selector, weights=self.objective_function_weights, columns_names=self.objective_names, k=n_mutations, n_parents=1) + + p = np.array([self.crossover_probability, self.mutate_then_crossover_probability, self.crossover_then_mutate_probability]) + p = p/np.sum(p) + var_op_list = np.random.choice(["crossover", "mutate_then_crossover", "crossover_then_mutate"], size=n_crossover, p=p) + var_op_list = np.concatenate([var_op_list, ["mutate"]*n_mutations]) - cx_parents = np.array(self.population.population)[cx_parents_index] - m_parents = np.array(self.population.population)[m_parents_index] parents = list(cx_parents) + list(m_parents) - var_ops = np.concatenate([cx_var_ops, m_var_ops]) - offspring = self.population.create_offspring(parents, var_ops, n_jobs=1) + offspring = self.population.create_offspring2(parents, var_op_list, self.mutation_functions, self.mutation_function_weights, self.crossover_functions, self.crossover_function_weights, add_to_population=True, keep_repeats=False, mutate_until_unique=True) + self.population.update_column(offspring, column_names="Generation", data=self.generation, ) - #print("done making offspring") - #print("evaluating") - self.evaluate_population() - #print("done evaluating") + + @@ -609,14 +612,17 @@ def evaluate_population_full(self, budget=None): parallel_timeout = min(theoretical_timeout, scheduled_timeout_time_left) if parallel_timeout < 0: parallel_timeout = 10 - scores = tpot2.utils.eval_utils.parallel_eval_objective_list(individuals_to_evaluate, self.objective_functions, self.n_jobs, verbose=self.verbose, timeout=self.max_eval_time_seconds, budget=budget, n_expected_columns=len(self.objective_names), client=self._client, parallel_timeout=parallel_timeout, **self.objective_kwargs) + + #scores = tpot2.utils.eval_utils.parallel_eval_objective_list(individuals_to_evaluate, self.objective_functions, self.n_jobs, verbose=self.verbose, timeout=self.max_eval_time_seconds, budget=budget, n_expected_columns=len(self.objective_names), client=self._client, parallel_timeout=parallel_timeout, **self.objective_kwargs) + scores, start_times, end_times = tpot2.utils.eval_utils.parallel_eval_objective_list2(individuals_to_evaluate, self.objective_functions, verbose=self.verbose, max_eval_time_seconds=self.max_eval_time_seconds, budget=budget, n_expected_columns=len(self.objective_names), client=self._client, **self.objective_kwargs) self.population.update_column(individuals_to_evaluate, column_names=self.objective_names, data=scores) if budget is not None: self.population.update_column(individuals_to_evaluate, column_names="Budget", data=budget) - self.population.update_column(individuals_to_evaluate, column_names="Completed Timestamp", data=time.time()) + self.population.update_column(individuals_to_evaluate, column_names="Submitted Timestamp", data=start_times) + self.population.update_column(individuals_to_evaluate, column_names="Completed Timestamp", data=end_times) self.population.remove_invalid_from_population(column_names=self.objective_names) self.population.remove_invalid_from_population(column_names=self.objective_names, invalid_value="TIMEOUT") @@ -680,21 +686,22 @@ def 
evaluate_population_selection_early_stop(self,survival_counts, thresholds=No if parallel_timeout < 0: parallel_timeout = 10 - scores = tpot2.utils.eval_utils.parallel_eval_objective_list(individual_list=unevaluated_individuals_this_step, + scores, start_times, end_times = tpot2.utils.eval_utils.parallel_eval_objective_list2(individual_list=unevaluated_individuals_this_step, objective_list=self.objective_functions, - n_jobs = self.n_jobs, verbose=self.verbose, - timeout=self.max_eval_time_seconds, + max_eval_time_seconds=self.max_eval_time_seconds, step=step, budget = self.budget, generation = self.generation, n_expected_columns=len(self.objective_names), client=self._client, - parallel_timeout=parallel_timeout, **self.objective_kwargs, ) self.population.update_column(unevaluated_individuals_this_step, column_names=this_step_names, data=scores) + self.population.update_column(unevaluated_individuals_this_step, column_names="Submitted Timestamp", data=start_times) + self.population.update_column(unevaluated_individuals_this_step, column_names="Completed Timestamp", data=end_times) + self.population.remove_invalid_from_population(column_names=this_step_names) self.population.remove_invalid_from_population(column_names=this_step_names, invalid_value="TIMEOUT") @@ -768,3 +775,5 @@ def evaluate_population_selection_early_stop(self,survival_counts, thresholds=No new_population_index = survival_selector(weighted_scores, k=k) cur_individuals = np.array(cur_individuals)[new_population_index] + + diff --git a/tpot2/evolvers/steady_state_evolver.py b/tpot2/evolvers/steady_state_evolver.py index eacbd9dd..23abdfe9 100644 --- a/tpot2/evolvers/steady_state_evolver.py +++ b/tpot2/evolvers/steady_state_evolver.py @@ -279,7 +279,7 @@ def optimize(self): else: #if future is not done #check if the future has been running for too long, cancel the future - if time.time() - submitted_futures[completed_future]["time"] > self.max_eval_time_seconds*2: + if time.time() - submitted_futures[completed_future]["time"] > self.max_eval_time_seconds*1.25: completed_future.cancel() if self.verbose >= 4: diff --git a/tpot2/population.py b/tpot2/population.py index 31f9233b..f32ad5c4 100644 --- a/tpot2/population.py +++ b/tpot2/population.py @@ -86,7 +86,21 @@ def __init__( self, self.callback=callback self.population = [] - + def survival_select(self, selector, weights, columns_names, n_survivors, inplace=True): + weighted_scores = self.get_column(self.population, column_names=columns_names) * weights + new_population_index = np.ravel(selector(weighted_scores, k=n_survivors)) #TODO make it clear that we are concatenating scores... + new_population = np.array(self.population)[new_population_index] + if inplace: + self.set_population(new_population) + return new_population + + def parent_select(self, selector, weights, columns_names, k, n_parents): + + weighted_scores = self.get_column(self.population, column_names=columns_names) * weights + parents_index = selector(weighted_scores, k=k, n_parents=n_parents) + parents = np.array(self.population)[parents_index] + return parents + #remove individuals that either do not have a column_name value or a nan in that value #TODO take into account when the value is not a list/tuple? @@ -294,7 +308,78 @@ def create_offspring(self, parents_list, var_op_list, add_to_population=True, ke return new_offspring - + + #TODO should we just generate one offspring per crossover? 
+ def create_offspring2(self, parents_list, var_op_list, mutation_functions,mutation_function_weights, crossover_functions,crossover_function_weights, add_to_population=True, keep_repeats=False, mutate_until_unique=True): + + new_offspring = [] + + all_offspring = [] + chosen_ops = [] + + for parents, var_op in zip(parents_list,var_op_list): + #TODO put this loop in population class + if var_op == "mutation": + mutation_op = np.random.choice(mutation_functions, p=mutation_function_weights) + all_offspring.append(copy_and_mutate(parents, mutation_op)) + chosen_ops.append(mutation_op.__name__) + + + elif var_op == "crossover": + crossover_op = np.random.choice(crossover_functions, p=crossover_function_weights) + all_offspring.append(copy_and_crossover(parents, crossover_op)) + chosen_ops.append(crossover_op.__name__) + elif var_op == "mutate_then_crossover": + + mutation_op1 = np.random.choice(mutation_functions, p=mutation_function_weights) + mutation_op2 = np.random.choice(mutation_functions, p=mutation_function_weights) + crossover_op = np.random.choice(crossover_functions, p=crossover_function_weights) + p1 = copy_and_mutate(parents[0], mutation_op1) + p2 = copy_and_mutate(parents[1], mutation_op2) + crossover_op(p1,p2) + all_offspring.append(p1) + chosen_ops.append(f"{mutation_op1.__name__} , {mutation_op2.__name__} , {crossover_op.__name__}") + elif var_op == "crossover_then_mutate": + crossover_op = np.random.choice(crossover_functions, p=crossover_function_weights) + child = copy_and_crossover(parents, crossover_op) + mutation_op = np.random.choice(mutation_functions, p=mutation_function_weights) + mutation_op(child) + all_offspring.append(child) + chosen_ops.append(f"{crossover_op.__name__} , {mutation_op.__name__}") + + + for parents, offspring, var_op in zip(parents_list, all_offspring, chosen_ops): + + # if var_op in built_in_var_ops_dict: + # var_op = built_in_var_ops_dict[var_op] + + # offspring = copy.deepcopy(parents) + # offspring = var_op(offspring) + # if isinstance(offspring, collections.abc.Iterable): + # offspring = offspring[0] + + if add_to_population: + added = self.add_to_population(offspring, keep_repeats=keep_repeats, mutate_until_unique=mutate_until_unique) + if len(added) > 0: + for new_child in added: + parent_keys = [parent.unique_id() for parent in parents] + if not pd.api.types.is_object_dtype(self.evaluated_individuals["Parents"]): #TODO Is there a cleaner way of doing this? Not required for some python environments? 
+ self.evaluated_individuals["Parents"] = self.evaluated_individuals["Parents"].astype('object') + self.evaluated_individuals.at[new_child.unique_id(),"Parents"] = tuple(parent_keys) + + self.evaluated_individuals.at[new_child.unique_id(),"Variation_Function"] = var_op + + + new_offspring.append(new_child) + + else: + new_offspring.append(offspring) + + + return new_offspring + + + def get_id(individual): return individual.unique_id() @@ -325,6 +410,8 @@ def nonparallel_create_offpring(parents_list, var_op_list, n_jobs=1): return offspring + + def copy_and_change(parents, var_op): offspring = copy.deepcopy(parents) offspring = var_op(offspring) @@ -332,6 +419,19 @@ def copy_and_change(parents, var_op): offspring = offspring[0] return offspring +def copy_and_mutate(parents, var_op): + offspring = copy.deepcopy(parents) + var_op(offspring) + if isinstance(offspring, collections.abc.Iterable): + offspring = offspring[0] + return offspring + +def copy_and_crossover(parents, var_op): + offspring = copy.deepcopy(parents) + var_op(offspring[0],offspring[1]) + return offspring[0] + def parallel_get_id(n_jobs, individual_list): id_list = Parallel(n_jobs=n_jobs)(delayed(get_id)(ind) for ind in individual_list) return id_list + diff --git a/tpot2/tpot_estimator/estimator.py b/tpot2/tpot_estimator/estimator.py index 1bbe64f0..351aa12b 100644 --- a/tpot2/tpot_estimator/estimator.py +++ b/tpot2/tpot_estimator/estimator.py @@ -128,10 +128,10 @@ def __init__(self, scorers, - (sklearn.model_selection.BaseCrossValidator): A cross-validator to use in the cross-validation process. - max_depth (int): The maximum depth from any node to the root of the pipelines to be generated. - other_objective_functions : list, default=[tpot2.objectives.estimator_objective_functions.average_path_length_objective] - A list of other objective functions to apply to the pipeline. + other_objective_functions : list, default=[] + A list of other objective functions to apply to the pipeline. The function takes a single parameter for the graphpipeline estimator and returns either a single score or a list of scores. - other_objective_functions_weights : list, default=[-1] + other_objective_functions_weights : list, default=[] A list of weights to be applied to the other objective functions. 
objective_function_names : list, default=None diff --git a/tpot2/tpot_estimator/estimator_utils.py b/tpot2/tpot_estimator/estimator_utils.py index fe7a61a7..08d25f1b 100644 --- a/tpot2/tpot_estimator/estimator_utils.py +++ b/tpot2/tpot_estimator/estimator_utils.py @@ -41,9 +41,15 @@ def get_configuration_dictionary(options, n_samples, n_features, classification, elif option == "classifiers": config_dict.update(tpot2.config.make_classifier_config_dictionary(n_samples=n_samples, n_classes=n_classes)) + elif option == "classifiers_sklearnex": + config_dict.update(tpot2.config.make_sklearnex_classifier_config_dictionary(n_samples=n_samples, n_classes=n_classes)) + elif option == "regressors": config_dict.update(tpot2.config.make_regressor_config_dictionary(n_samples=n_samples)) + elif option == "regressors_sklearnex": + config_dict.update(tpot2.config.make_sklearnex_regressor_config_dictionary(n_samples=n_samples)) + elif option == "transformers": config_dict.update(tpot2.config.make_transformer_config_dictionary(n_features=n_features)) diff --git a/tpot2/tpot_estimator/steady_state_estimator.py b/tpot2/tpot_estimator/steady_state_estimator.py index 3b75cb68..72f7f595 100644 --- a/tpot2/tpot_estimator/steady_state_estimator.py +++ b/tpot2/tpot_estimator/steady_state_estimator.py @@ -59,7 +59,7 @@ def __init__(self, scorers= [], early_stop_seconds = None, scorers_early_stop_tol = 0.001, other_objectives_early_stop_tol = None, - max_time_seconds=float('inf'), + max_time_seconds=None, max_eval_time_seconds=60*10, n_jobs=1, memory_limit = "4GB", @@ -113,10 +113,10 @@ def __init__(self, scorers= [], - (int): Number of folds to use in the cross-validation process. By uses the sklearn.model_selection.KFold cross-validator for regression and StratifiedKFold for classification. In both cases, shuffled is set to True. - (sklearn.model_selection.BaseCrossValidator): A cross-validator to use in the cross-validation process. - other_objective_functions : list, default=[tpot2.objectives.estimator_objective_functions.average_path_length_objective] - A list of other objective functions to apply to the pipeline. + other_objective_functions : list, default=[] + A list of other objective functions to apply to the pipeline. The function takes a single parameter for the graphpipeline estimator and returns either a single score or a list of scores. - other_objective_functions_weights : list, default=[-1] + other_objective_functions_weights : list, default=[] A list of weights to be applied to the other objective functions. 
objective_function_names : list, default=None diff --git a/tpot2/utils/eval_utils.py b/tpot2/utils/eval_utils.py index 4bfc06d5..d3fe68bc 100644 --- a/tpot2/utils/eval_utils.py +++ b/tpot2/utils/eval_utils.py @@ -13,7 +13,7 @@ from dask.diagnostics import ProgressBar from tqdm.dask import TqdmCallback from dask.distributed import progress - +import distributed import func_timeout def process_scores(scores, n): @@ -139,6 +139,108 @@ def parallel_eval_objective_list(individual_list, return offspring_scores +def parallel_eval_objective_list2(individual_list, + objective_list, + verbose=0, + max_eval_time_seconds=None, + n_expected_columns=None, + client=None, + **objective_kwargs): + + individual_stack = list(individual_list) + max_queue_size = len(client.cluster.workers) + submitted_futures = {} + scores_dict = {} + submitted_inds = set() + + while len(submitted_futures) < max_queue_size and len(individual_stack)>0: + individual = individual_stack.pop() + future = client.submit(eval_objective_list, individual, objective_list, verbose=verbose, timeout=max_eval_time_seconds,**objective_kwargs) + + submitted_futures[future] = {"individual": individual, + "time": time.time(),} + + submitted_inds.add(individual.unique_id()) + + + + while len(individual_stack)>0 or len(submitted_futures)>0: + #wait for at least one future to finish or timeout + try: + next(distributed.as_completed(submitted_futures, timeout=max_eval_time_seconds)) + except dask.distributed.TimeoutError: + pass + except dask.distributed.CancelledError: + pass + + #Loop through all futures, collect completed and timeout futures. + for completed_future in list(submitted_futures.keys()): + #get scores and update + if completed_future.done(): #if future is done + #If the future is done but threw and error, record the error + if completed_future.exception() or completed_future.status == "error": #if the future is done and threw an error + print("Exception in future") + print(completed_future.exception()) + scores = ["INVALID"] + elif completed_future.cancelled(): #if the future is done and was cancelled + print("Cancelled future (likely memory related)") + scores = ["INVALID"] + else: #if the future is done and did not throw an error, get the scores + try: + scores = completed_future.result() + except Exception as e: + print("Exception in future, but not caught by dask") + print(e) + print(completed_future.exception()) + print(completed_future) + print("status", completed_future.status) + print("done", completed_future.done()) + print("cancelld ", completed_future.cancelled()) + scores = ["INVALID"] + else: #if future is not done + + #check if the future has been running for too long, cancel the future + if time.time() - submitted_futures[completed_future]["time"] > max_eval_time_seconds*1.25: + completed_future.cancel() + + if verbose >= 4: + print(f'WARNING AN INDIVIDUAL TIMED OUT (Fallback): \n {submitted_futures[completed_future]} \n') + + scores = ["TIMEOUT"] + else: + continue #otherwise, continue to next future + + #log scores + cur_individual = submitted_futures[completed_future]["individual"] + scores_dict[cur_individual] = {"scores": scores, + "start_time": submitted_futures[completed_future]["time"], + "end_time": time.time(), + } + + + #update submitted futures + submitted_futures.pop(completed_future) + + #submit new futures + while len(submitted_futures) < max_queue_size and len(individual_stack)>0: + individual = individual_stack.pop() + future = client.submit(eval_objective_list, individual, objective_list, 
verbose=verbose, timeout=max_eval_time_seconds,**objective_kwargs) + + submitted_futures[future] = {"individual": individual, + "time": time.time(),} + + submitted_inds.add(individual.unique_id()) + + + final_scores = [scores_dict[individual]["scores"] for individual in individual_list] + final_start_times = [scores_dict[individual]["start_time"] for individual in individual_list] + final_end_times = [scores_dict[individual]["end_time"] for individual in individual_list] + + final_scores = process_scores(final_scores, n_expected_columns) + + return final_scores, final_start_times, final_end_times + + ################### # Parallel optimization #############
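
For readers trying out the `sklearnex` extra added above, the following is a minimal, hypothetical sketch (not part of the diff) showing how the pieces fit together. The install command and the config-dictionary factories are taken from this diff; the estimator keyword names mentioned in the comments (`root_config_dict`, `leaf_config_dict`) follow the tutorial text but are not confirmed by any estimator signature shown here.

```
# Hypothetical usage sketch: enabling the Intel Extension for Scikit-learn
# search spaces added in this PR.
#
#   pip install tpot2[sklearnex]
#
import tpot2

# Factories added in tpot2/config/classifiers_sklearnex.py and
# tpot2/config/regressors_sklearnex.py (only importable when
# scikit-learn-intelex is installed):
clf_config = tpot2.config.make_sklearnex_classifier_config_dictionary(n_samples=1000, n_classes=2)
reg_config = tpot2.config.make_sklearnex_regressor_config_dictionary(n_samples=1000)

# Alternatively, estimator_utils.get_configuration_dictionary() now resolves the
# string options "classifiers_sklearnex" and "regressors_sklearnex", so they can
# be passed wherever config-dictionary options are accepted (for example a
# root_config_dict / leaf_config_dict argument -- the exact keyword names are an
# assumption here; check the TPOTEstimator signature).
```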
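
The tutorial hunk above lists three equivalent ways to specify feature subsets for the FeatureSetSelector, but the diff also strips the code fences from that markdown, which makes the examples harder to read in the rendered notebook. Restated here as a small sketch, copied from the tutorial text (three groups of three columns each):

```
# Equivalent "subsets" specifications for the FeatureSetSelector search space,
# restated from the tutorial text above.

# str: path to a csv file with one group per row, e.g. "simple_fss.csv" containing
#   group_one, 1,2,3
#   group_two, 4,5,6
#   group_three, 7,8,9
subsets_as_csv = "simple_fss.csv"

# dict: group name -> column indexes (or column names if X is a DataFrame)
subsets_as_dict = {
    "group_one": [1, 2, 3],
    "group_two": [4, 5, 6],
    "group_three": [7, 8, 9],
}

# list: positional groups without names
subsets_as_list = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
```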
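
BaseEvolver now accepts `mutation_functions` / `crossover_functions` plus optional weight lists, defaulting to the new `ind_mutate` / `ind_crossover` helpers. Below is a minimal sketch of custom operators, assuming individuals expose the same `mutate()` / `crossover()` interface the defaults rely on; the constructor call is abbreviated and its other required arguments (and the import of BaseEvolver) are omitted:

```
# Sketch of custom variation operators for the new BaseEvolver hooks.
# Signatures mirror the defaults added in this diff (ind_mutate / ind_crossover).
def my_mutation(ind):
    # any callable taking a single individual; the default simply calls ind.mutate()
    return ind.mutate()

def my_crossover(ind1, ind2):
    # any callable taking two individuals; the default calls ind1.crossover(ind2)
    return ind1.crossover(ind2)

# evolver = BaseEvolver(
#     ...,                                  # other constructor arguments omitted
#     mutation_functions=[my_mutation],
#     crossover_functions=[my_crossover],
#     mutation_function_weights=[1.0],      # None defaults to a list of ones
#     crossover_function_weights=[1.0],
# )
```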