Commit: Merge branch 'main' into support_evaluator_env

AngelFP committed Oct 9, 2023
2 parents a3b41ab + 24d559e commit 7bd7f49
Showing 5 changed files with 243 additions and 40 deletions.
143 changes: 109 additions & 34 deletions optimas/explorations/base.py
@@ -1,20 +1,22 @@
"""Contains the definition of the base Exploration class."""

import os
import glob
from typing import Optional, Union

import numpy as np

from libensemble.libE import libE
from libensemble.tools import save_libE_output, add_unique_random_streams
from libensemble.tools import add_unique_random_streams
from libensemble.alloc_funcs.start_only_persistent import only_persistent_gens
from libensemble.executors.mpi_executor import MPIExecutor
from libensemble.resources.resources import Resources
from libensemble.executors.executor import Executor
from libensemble.logger import LogConfig

from optimas.generators.base import Generator
from optimas.evaluators.base import Evaluator
from optimas.utils.logger import get_logger


logger = get_logger(__name__)


class Exploration():
@@ -43,6 +45,14 @@ class Exploration():
history file to disk. By default, equal to ``sim_workers``.
exploration_dir_path : str, optional.
Path to the exploration directory. By default, ``'./exploration'``.
resume : bool, optional
Whether the exploration should resume from a previous run in the same
`exploration_dir_path`. If `True`, the exploration will continue from
the last evaluation of the previous run until the total number of
evaluations (including those of the previous run) reaches `max_evals`.
There is no need to provide the `history` path (it will be ignored).
If `False` (default value), the exploration will raise an error if
the `exploration_dir_path` already exists.
libe_comms : {'local', 'mpi'}, optional.
The communication mode for libEnsemble. Determines whether to use
Python ``multiprocessing`` (local mode) or MPI for the communication
@@ -63,6 +73,7 @@ def __init__(
history: Optional[str] = None,
history_save_period: Optional[int] = None,
exploration_dir_path: Optional[str] = './exploration',
resume: Optional[bool] = False,
libe_comms: Optional[str] = 'local'
) -> None:
self.generator = generator
@@ -76,16 +87,41 @@ def __init__(
self.history_save_period = history_save_period
self.exploration_dir_path = exploration_dir_path
self.libe_comms = libe_comms
self._load_history(history)
self._n_evals = 0
self._resume = resume
self._history_file_name = 'exploration_history_after_evaluation_{}'
self._load_history(history, resume)
self._create_alloc_specs()
self._create_executor()
self._initialize_evaluator()
self._set_default_libe_specs()

def run(self) -> None:
"""Run the exploration."""
def run(
self,
n_evals: Optional[int] = None
) -> None:
"""Run the exploration.
Parameters
----------
n_evals : int, optional
Number of evaluations to run. If not given, the exploration will
run until the number of evaluations reaches `max_evals`.
"""
# Set exit criteria to maximum number of evaluations.
exit_criteria = {'sim_max': self.max_evals}
remaining_evals = self.max_evals - self._n_evals
if remaining_evals < 1:
raise ValueError(
'The maximum number of evaluations has been reached.'
)
if n_evals is None:
sim_max = remaining_evals
else:
sim_max = min(n_evals, remaining_evals)
exit_criteria = {'sim_max': sim_max}

# Get initial number of generator trials.
n_trials_initial = self.generator.n_trials

# Create persis_info.
persis_info = add_unique_random_streams({}, self.sim_workers + 2)
@@ -97,9 +133,13 @@ def run(self) -> None:
else:
self.libE_specs['zero_resource_workers'] = [1]

if self._n_evals > 0:
self.libE_specs['reuse_output_dir'] = True

# Get gen_specs and sim_specs.
run_params = self.evaluator.get_run_params()
gen_specs = self.generator.get_gen_specs(self.sim_workers, run_params)
gen_specs = self.generator.get_gen_specs(self.sim_workers, run_params,
sim_max)
sim_specs = self.evaluator.get_sim_specs(
self.generator.varying_parameters,
self.generator.objectives,
@@ -123,23 +163,20 @@ def run(self) -> None:
# Update generator with the one received from libE.
self.generator._update(persis_info[1]['generator'])

# Update number of evaluations in this exploration.
n_trials_final = self.generator.n_trials
self._n_evals += n_trials_final - n_trials_initial

# Determine if current rank is master.
if self.libE_specs["comms"] == "local":
is_master = True
nworkers = self.sim_workers + 1
else:
from mpi4py import MPI
is_master = (MPI.COMM_WORLD.Get_rank() == 0)
nworkers = MPI.COMM_WORLD.Get_size() - 1

# Save history.
if is_master:
save_libE_output(
history, persis_info, __file__, nworkers,
dest_path=os.path.abspath(self.exploration_dir_path))

# Reset state of libEnsemble.
self._reset_libensemble()
self._save_history()

def _create_executor(self) -> None:
"""Create libEnsemble executor."""
@@ -151,9 +188,25 @@ def _initialize_evaluator(self) -> None:

def _load_history(
self,
history: Union[str, np.ndarray, None]
history: Union[str, np.ndarray, None],
resume: Optional[bool] = False,
) -> None:
"""Load history file."""
# To resume an exploration, get history file from previous run.
if resume:
if history is not None:
logger.info(
'The `history` argument is ignored when `resume=True`. '
'The exploration will resume using the most recent '
'history file.'
)
history = self._get_most_recent_history_file_path()
if history is None:
raise ValueError(
'Previous history file not found. '
'Cannot resume exploration.'
)
# Read file.
if isinstance(history, str):
if os.path.exists(history):
# Load array.
@@ -169,8 +222,43 @@ def _load_history(
# Incorporate history into generator.
if history is not None:
self.generator.incorporate_history(history)
# When resuming an exploration, update evaluations counter.
if resume:
self._n_evals = history.size
self.history = history

def _save_history(self):
"""Save history array to file."""
filename = self._history_file_name.format(self._n_evals)
exploration_dir_path = os.path.abspath(self.exploration_dir_path)
file_path = os.path.join(exploration_dir_path, filename)
if not os.path.isfile(filename):
old_files = os.path.join(
exploration_dir_path, self._history_file_name.format("*"))
for old_file in glob.glob(old_files):
os.remove(old_file)
np.save(file_path, self.history)

def _get_most_recent_history_file_path(self):
"""Get path of most recently saved history file."""
old_exploration_history_files = glob.glob(
os.path.join(
os.path.abspath(self.exploration_dir_path),
self._history_file_name.format("*")
)
)
old_libe_history_files = glob.glob(
os.path.join(
os.path.abspath(self.exploration_dir_path),
'libE_history_{}'.format("*")
)
)
old_files = old_exploration_history_files + old_libe_history_files
if old_files:
file_evals = [int(file.split('_')[-1][:-4]) for file in old_files]
i_max_evals = np.argmax(np.array(file_evals))
return old_files[i_max_evals]

def _set_default_libe_specs(self) -> None:
"""Set default exploration libe_specs."""
libE_specs = {}
@@ -205,6 +293,9 @@ def _set_default_libe_specs(self) -> None:
libE_specs['use_workflow_dir'] = True
libE_specs['workflow_dir_path'] = self.exploration_dir_path

# Ensure evaluations of last batch are sent back to the generator.
libE_specs["final_gen_send"] = True

# get specs from generator and evaluator
gen_libE_specs = self.generator.get_libe_specs()
ev_libE_specs = self.evaluator.get_libe_specs()
@@ -219,19 +310,3 @@ def _create_alloc_specs(self) -> None:
'async_return': self.run_async
}
}

def _reset_libensemble(self) -> None:
"""Reset the state of libEnsemble.
After calling `libE`, some libEnsemble attributes do not come back to
their original states. This leads to issues if another `Exploration`
run is launched within the same script. This method resets the
necessary libEnsemble attributes to their original state.
"""
if Resources.resources is not None:
del Resources.resources
Resources.resources = None
if Executor.executor is not None:
del Executor.executor
Executor.executor = None
LogConfig.config.logger_set = False
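
Taken together, the changes to `Exploration` allow a run to be resumed from a previous run's directory and to be advanced in batches. A minimal usage sketch; the constructor keywords for the generator, evaluator, `max_evals` and `sim_workers` are inferred from the attributes used above, and `my_generator`/`my_evaluator` are illustrative placeholders built elsewhere:

# Hypothetical usage sketch; only `resume` and `run(n_evals=...)` come
# directly from this diff, the other keyword names are assumptions.
from optimas.explorations.base import Exploration

exploration = Exploration(
    generator=my_generator,
    evaluator=my_evaluator,
    max_evals=100,
    sim_workers=4,
    exploration_dir_path='./exploration',
    resume=True,  # pick up from the most recent history file in this directory
)

# Run 20 more evaluations now; a later call without `n_evals` continues
# until the combined total (previous run + this run) reaches `max_evals`.
exploration.run(n_evals=20)
exploration.run()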
9 changes: 7 additions & 2 deletions optimas/gen_functions.py
@@ -38,9 +38,13 @@ def persistent_generator(H, persis_info, gen_specs, libE_info):

ps = PersistentSupport(libE_info, EVAL_GEN_TAG)

# Maximum number of total evaluations to generate.
max_evals = gen_specs['user']['max_evals']

# Number of points to generate initially.
number_of_gen_points = gen_specs['user']['gen_batch_size']
number_of_gen_points = min(gen_specs['user']['gen_batch_size'], max_evals)

n_gens = 0
n_failed_gens = 0

# Receive information from the manager (or a STOP_TAG)
@@ -68,6 +72,7 @@ def persistent_generator(H, persis_info, gen_specs, libE_info):
H_o['num_procs'][i] = run_params["num_procs"]
H_o['num_gpus'][i] = run_params["num_gpus"]

n_gens += np.sum(H_o['num_procs'] != 0)
n_failed_gens = np.sum(H_o['num_procs'] == 0)
H_o = H_o[H_o['num_procs'] > 0]

@@ -88,7 +93,7 @@ def persistent_generator(H, persis_info, gen_specs, libE_info):
# Register trial with unknown SEM
generator.tell([trial])
# Set the number of points to generate to that number:
number_of_gen_points = n + n_failed_gens
number_of_gen_points = min(n + n_failed_gens, max_evals - n_gens)
n_failed_gens = 0
else:
number_of_gen_points = 0
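
The two `min(...)` changes above cap the generator's batch size so it never requests more points than the remaining evaluation budget. A standalone sketch of the same arithmetic with illustrative numbers (not the actual persistent generator):

# Illustrative values only.
gen_batch_size = 4
max_evals = 10

# The initial batch never exceeds the total budget.
number_of_gen_points = min(gen_batch_size, max_evals)  # -> 4

# Later, after `n_gens` points have been generated, a batch of `n` finished
# evaluations (plus any failed generations) is replaced, but only up to the
# budget that is left.
n_gens = 8
n = 4
n_failed_gens = 0
number_of_gen_points = min(n + n_failed_gens, max_evals - n_gens)  # -> 2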
5 changes: 3 additions & 2 deletions optimas/generators/ax/developer/multitask.py
@@ -128,11 +128,12 @@ def __init__(
def get_gen_specs(
self,
sim_workers: int,
run_params: dict
run_params: Dict,
sim_max: int
) -> Dict:
"""Get the libEnsemble gen_specs."""
# Get base specs.
gen_specs = super().get_gen_specs(sim_workers, run_params)
gen_specs = super().get_gen_specs(sim_workers, run_params, sim_max)
# Add task to output parameters.
max_length = max([len(self.lofi_task.name), len(self.hifi_task.name)])
gen_specs['out'].append(('task', str, max_length))
16 changes: 14 additions & 2 deletions optimas/generators/base.py
@@ -116,6 +116,10 @@ def gpu_id(self):
def dedicated_resources(self):
return self._dedicated_resources

@property
def n_trials(self):
return len(self._trials)

def ask(
self,
n_trials: int
@@ -244,14 +248,20 @@ def save_model_to_file(self) -> None:
def get_gen_specs(
self,
sim_workers: int,
run_params: dict
run_params: Dict,
max_evals: int
) -> Dict:
"""Get the libEnsemble gen_specs.
Parameters
----------
sim_workers : int
Total number of parallel simulation workers.
run_params : dict
Dictionary containing the number of processes and gpus
required.
max_evals : int
Maximum number of evaluations to generate.
"""
self._prepare_to_send()
gen_specs = {
@@ -281,7 +291,9 @@ def get_gen_specs(
# GPU in which to run generator.
'gpu_id': self._gpu_id,
# num of procs and gpus required
'run_params': run_params
'run_params': run_params,
# Maximum number of evaluations to generate.
'max_evals': max_evals
}
}
return gen_specs
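
With this new argument, the evaluation budget chosen in `Exploration.run` travels to `persistent_generator` through `gen_specs['user']['max_evals']`. A hedged sketch of the round trip, where `gen` and the `run_params` values are placeholders:

# Illustrative only; `gen` stands for a concrete generator instance.
sim_workers = 4
run_params = {'num_procs': 1, 'num_gpus': 0}
gen_specs = gen.get_gen_specs(sim_workers, run_params, 100)

# Inside the persistent generator function the budget is read back as:
max_evals = gen_specs['user']['max_evals']  # -> 100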