Merge pull request #110 from optimas-org/chain_evaluator
Implement `ChainEvaluator`
AngelFP authored Oct 10, 2023
2 parents 7bfcd89 + 2f81ebc commit 94042f8
Showing 7 changed files with 308 additions and 35 deletions.
1 change: 1 addition & 0 deletions doc/source/api/evaluators.rst
@@ -9,3 +9,4 @@ Evaluators
FunctionEvaluator
TemplateEvaluator
MultitaskEvaluator
ChainEvaluator
4 changes: 3 additions & 1 deletion optimas/evaluators/__init__.py
@@ -1,6 +1,8 @@
from .function_evaluator import FunctionEvaluator
from .template_evaluator import TemplateEvaluator
from .multitask_evaluator import MultitaskEvaluator
from .chain_evaluator import ChainEvaluator


__all__ = ['FunctionEvaluator', 'TemplateEvaluator', 'MultitaskEvaluator']
__all__ = ['FunctionEvaluator', 'TemplateEvaluator', 'MultitaskEvaluator',
'ChainEvaluator']
104 changes: 104 additions & 0 deletions optimas/evaluators/chain_evaluator.py
@@ -0,0 +1,104 @@
import copy
from typing import List, Dict


from optimas.core import VaryingParameter, Objective, Parameter
from optimas.sim_functions import run_template_simulation
from .base import Evaluator
from .template_evaluator import TemplateEvaluator


class ChainEvaluator(Evaluator):
"""Allows the evaluation of a chain of `TemplateEvaluators`.
This is useful when each evaluation consists of several steps, where each
step is a simulation with a different simulation code. Each step
is defined by a TemplateEvaluator and can request a different number of
resources.
Each evaluation with the `ChainEvaluator` gets allocated the maximum number
of processes (`n_procs`) and GPUs (`n_gpus`) that every step might request
(e.g., if one step requires `n_procs=20` and `n_gpus=0`, and a second step
requires `n_procs=4` and `n_gpus=4`, each evaluation will get assigned
`n_procs=20` and `n_gpus=4`). Then each step will only make use of the
subset of resources it needs.
Parameters
----------
evaluators : list of TemplateEvaluators
A list of TemplateEvaluators given in the order in which they should
be executed.
"""
def __init__(
self,
evaluators: List[TemplateEvaluator]
) -> None:
self._check_evaluators(evaluators)
super().__init__(
run_template_simulation,
)
self.evaluators = evaluators

def get_sim_specs(
self,
varying_parameters: List[VaryingParameter],
objectives: List[Objective],
analyzed_parameters: List[Parameter],
) -> Dict:
"""Get a dictionary with the ``sim_specs`` as expected
by ``libEnsemble``
"""
# Get base sim_specs.
sim_specs = super().get_sim_specs(
varying_parameters, objectives, analyzed_parameters)
sim_specs['user']['steps'] = []
# Get the user specs from each step.
for evaluator in self.evaluators:
sim_specs['user']['steps'].append(evaluator.get_sim_specs(
varying_parameters, objectives, analyzed_parameters)['user'])
return sim_specs

def get_libe_specs(self) -> Dict:
"""Get a dictionary with the ``libE_specs`` as expected
by ``libEnsemble``
"""
# Get libe_specs of each task evaluator.
libE_specs_0 = copy.deepcopy(self.evaluators[0].get_libe_specs())
# Combine the files to copy from all evaluators.
for evaluator in self.evaluators[1:]:
libE_specs_i = evaluator.get_libe_specs()
libE_specs_0['sim_dir_copy_files'] = list(
set(libE_specs_0['sim_dir_copy_files'] +
libE_specs_i['sim_dir_copy_files'])
)
# Use only the combined specs.
return libE_specs_0

def get_run_params(self) -> Dict:
"""Return run parameters for this evaluator."""
num_procs = 0
num_gpus = 0
# Get maximum number of processes and GPUs.
for evaluator in self.evaluators:
ev_run_params = evaluator.get_run_params()
num_procs = max(num_procs, ev_run_params['num_procs'])
num_gpus = max(num_gpus, ev_run_params['num_gpus'])
run_params = {
'num_procs': num_procs,
'num_gpus': num_gpus,
}
return run_params

def _initialize(self) -> None:
"""Initialize the evaluator."""
for i, evaluator in enumerate(self.evaluators):
# Assign a different app name to each evaluator.
evaluator.app_name = f'sim_{i}'
evaluator.initialize()

def _check_evaluators(self, evaluators) -> None:
"""Check the given evaluators."""
# Check that all evaluators are TemplateEvaluators.
for evaluator in evaluators:
assert isinstance(evaluator, TemplateEvaluator), (
'Only TemplateEvaluators are supported for chain evaluation.')
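
For illustration, a minimal usage sketch of the new class (the template file names, the analysis function, and the `n_gpus` keyword below are assumptions for the example, not taken from this diff):

import os

from optimas.evaluators import ChainEvaluator, TemplateEvaluator


def analyze_final_step(simulation_directory, output_params):
    # Hypothetical analysis: read the result written by the last step
    # and store it in the objective field 'f'.
    with open(os.path.join(simulation_directory, 'result_2.txt')) as f:
        output_params['f'] = float(f.read())
    return output_params


# First step: no analysis_func needed, since a later step in the chain
# performs the analysis.
step_1 = TemplateEvaluator(
    sim_template='template_simulation_script.py',
    n_procs=20,
)

# Second step: consumes the output of the first step and analyzes it.
step_2 = TemplateEvaluator(
    sim_template='template_simulation_script_2.py',
    analysis_func=analyze_final_step,
    n_procs=4,
    n_gpus=4,
)

# Each evaluation is allocated max(n_procs) = 20 and max(n_gpus) = 4;
# each step then uses only the subset it requested.
evaluator = ChainEvaluator(evaluators=[step_1, step_2])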
16 changes: 9 additions & 7 deletions optimas/evaluators/template_evaluator.py
@@ -18,9 +18,12 @@ class TemplateEvaluator(Evaluator):
----------
sim_template : str
Path to the simulation template file.
analysis_func : Callable
analysis_func : Callable, optional
Function that will analyze the simulation output to obtain the value
of the objective(s) and other analyzed parameters.
of the objective(s) and other analyzed parameters. This parameter
is only optional if the `TemplateEvaluator` is included in a
`ChainEvaluator`. In this case, at least one of the chained evaluators
should have an analysis function.
executable : str, optional.
Path to the executable that will run the simulation. Only needed if
the simulation template is not a Python script.
@@ -48,7 +51,7 @@ class TemplateEvaluator(Evaluator):
def __init__(
self,
sim_template: str,
analysis_func: Callable,
analysis_func: Optional[Callable] = None,
executable: Optional[str] = None,
sim_files: Optional[List[str]] = None,
n_procs: Optional[int] = None,
@@ -96,6 +99,8 @@ def get_sim_specs(
sim_specs['user']['analysis_func'] = self.analysis_func
sim_specs['user']['sim_template'] = os.path.basename(self.sim_template)
sim_specs['user']['app_name'] = self._app_name
sim_specs['user']['num_procs'] = self._n_procs
sim_specs['user']['num_gpus'] = self._n_gpus
sim_specs['user']['env_script'] = self.env_script
sim_specs['user']['env_mpi'] = self.env_mpi
return sim_specs
@@ -123,10 +128,7 @@ def _register_app(self) -> None:
"""Register the executable as an app in the libEnsemble executor."""
# Determine executable path.
if self.sim_template.endswith('.py'):
sim_script = os.path.basename(self.sim_template)
# Strip 'template_' from name
sim_script = sim_script[len('template_'):]
executable_path = sim_script
executable_path = os.path.basename(self.sim_template)
else:
# By default, if the template is not a `.py` file, we run
# it with an executable.
79 changes: 52 additions & 27 deletions optimas/sim_functions.py
@@ -1,5 +1,3 @@
import os

import jinja2
import numpy as np

@@ -27,40 +25,72 @@ def run_template_simulation(H, persis_info, sim_specs, libE_info):
value = "'{}'".format(value)
input_values[name] = value

# Prepare the array that is returned to libE
# Automatically include the input parameters
libE_output = np.zeros(1, dtype=sim_specs['out'])
for name in H.dtype.names:
libE_output[name] = H[name][0]

# Get user specs.
if 'task' in H.dtype.names:
task_name = H['task'][0]
user_specs = sim_specs['user'][task_name]
else:
user_specs = sim_specs['user']
sim_template = user_specs['sim_template']
analysis_func = user_specs['analysis_func']
app_name = user_specs['app_name']

# Get list of simulation steps. If no steps are defined (that is, a
# ChainEvaluator is not being used), create a list with a single step.
if 'steps' in user_specs:
simulation_step_specs = user_specs['steps']
else:
simulation_step_specs = [user_specs]

# Launch and analyze each simulation step.
for step_specs in simulation_step_specs:
calc_status = execute_and_analyze_simulation(
app_name=step_specs['app_name'],
sim_template=step_specs['sim_template'],
input_values=input_values,
analysis_func=step_specs['analysis_func'],
libE_output=libE_output,
num_procs=step_specs['num_procs'],
num_gpus=step_specs['num_gpus'],
env_script=step_specs['env_script'],
mpi_runner_type=step_specs['env_mpi']
)
# If a step has failed, do not continue with next steps.
if calc_status != WORKER_DONE:
break

return libE_output, persis_info, calc_status


def execute_and_analyze_simulation(app_name, sim_template, input_values,
analysis_func, libE_output, num_procs,
num_gpus, env_script,
mpi_runner_type):
"""Run simulation, handle outcome and analyze results."""
# Create simulation input file.
sim_script = sim_template[len('template_'):] # Strip 'template_' from name
with open(sim_template, 'r') as f:
template = jinja2.Template(f.read())
with open(sim_script, 'w') as f:
with open(sim_template, 'w') as f:
f.write(template.render(input_values))
os.remove(sim_template)

# If the template is a python file, no need to provide it as argument
# (it has already been registered by libEnsemble as such).
if sim_script.endswith('.py'):
sim_script = None

# Passed to command line in addition to the executable.
exctr = Executor.executor # Get Executor
if sim_template.endswith('.py'):
sim_template = None

# Launch simulation.
task = exctr.submit(
task = Executor.executor.submit(
app_name=app_name,
app_args=sim_script,
app_args=sim_template,
stdout='out.txt',
stderr='err.txt',
env_script=user_specs['env_script'],
mpi_runner_type=user_specs['env_mpi']
num_procs=num_procs,
num_gpus=num_gpus,
env_script=env_script,
mpi_runner_type=mpi_runner_type
)

# Wait for simulation to complete
@@ -76,19 +106,14 @@ def run_template_simulation(H, persis_info, sim_specs, libE_info):
print("Warning: Task {} in unknown state {}. Error code {}"
.format(task.name, task.state, task.errcode))

# Prepare the array that is returned to libE
# Automatically include the input parameters
libE_output = np.zeros(1, dtype=sim_specs['out'])
for name in H.dtype.names:
libE_output[name] = H[name][0]

# Data analysis from the last simulation
if calc_status == WORKER_DONE:
# Extract the objective function for the current simulation,
# as well as a few diagnostics
analysis_func(task.workdir, libE_output)
if analysis_func is not None:
# Extract the objective function for the current simulation,
# as well as a few diagnostics
analysis_func(task.workdir, libE_output)

return libE_output, persis_info, calc_status
return calc_status


def run_function(H, persis_info, sim_specs, libE_info):
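
In essence, each step now renders its jinja2 template in place (overwriting the template file with the filled-in script) before submitting it to the executor. An illustrative sketch of that rendering step, with a made-up placeholder:

import jinja2

# Suppose the template contains a line such as:  result = {{x0}} + {{x1}}
with open('template_simulation_script.py', 'r') as f:
    template = jinja2.Template(f.read())

# input_values maps each varying parameter to its value for this evaluation.
input_values = {'x0': 0.5, 'x1': 1.5}

# Overwrite the template with the rendered, runnable script.
with open('template_simulation_script.py', 'w') as f:
    f.write(template.render(input_values))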
12 changes: 12 additions & 0 deletions tests/resources/template_simulation_script_2.py
@@ -0,0 +1,12 @@
"""
Dummy simulation template used for testing. It takes the result from a
previous evaluation and uses it to perform another evaluation.
"""

with open('result.txt', 'r') as f:
result_1 = float(f.read())

result_2 = 2 * result_1

with open('result_2.txt', 'w') as f:
f.write("%f" %result_2)