Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reimplement objective scaling #9985

Merged
merged 4 commits into from
Feb 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ everest = [
"decorator",
"resdata",
"colorama",
"ropt[pandas]>=0.12,<0.13",
"ropt[pandas]>=0.13,<0.14",
"ropt-dakota>=0.11,<0.12",
]

Expand Down
169 changes: 111 additions & 58 deletions src/ert/run_models/everest_run_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from ropt.evaluator import EvaluatorContext, EvaluatorResult
from ropt.plan import BasicOptimizer
from ropt.plan import Event as OptimizerEvent
from ropt.transforms import OptModelTransforms
from typing_extensions import TypedDict

from _ert.events import EESnapshot, EESnapshotUpdate, Event
Expand All @@ -29,9 +30,13 @@
from ert.runpaths import Runpaths
from ert.storage import open_storage
from everest.config import ControlConfig, ControlVariableGuessListConfig, EverestConfig
from everest.config.utils import FlattenedControls
from everest.everest_storage import EverestStorage, OptimalResult
from everest.optimizer.everest2ropt import everest2ropt
from everest.optimizer.opt_model_transforms import get_opt_model_transforms
from everest.optimizer.opt_model_transforms import (
ObjectiveScaler,
get_optimization_domain_transforms,
)
from everest.simulator.everest_to_ert import everest_to_ert_config
from everest.strings import EVEREST

Expand Down Expand Up @@ -97,11 +102,6 @@ def __init__(
)

self._everest_config = everest_config
self._opt_model_transforms = get_opt_model_transforms(everest_config.controls)
self._ropt_config = everest2ropt(
everest_config, transforms=self._opt_model_transforms
)

self._sim_callback = simulation_callback
self._opt_callback = optimization_callback
self._fm_errors: dict[int, dict[str, Any]] = {}
Expand Down Expand Up @@ -195,7 +195,7 @@ def run_experiment(
output_dir=Path(self._everest_config.optimization_output_dir),
)
self.ever_storage.init(self._everest_config)
self.ever_storage.observe_optimizer(optimizer, self._opt_model_transforms)
self.ever_storage.observe_optimizer(optimizer)

# Run the optimization:
optimizer_exit_code = optimizer.run().exit_code
Expand All @@ -214,9 +214,38 @@ def run_experiment(
case _:
self._exit_code = EverestExitCode.COMPLETED

def _init_transforms(self, variables: NDArray[np.float64]) -> OptModelTransforms:
realizations = self._everest_config.model.realizations
nreal = len(realizations)
realization_weights = self._everest_config.model.realizations_weights
if realization_weights is None:
realization_weights = [1.0 / nreal] * nreal
transforms = get_optimization_domain_transforms(
self._everest_config.controls,
self._everest_config.objective_functions,
realization_weights,
)
# If required, initialize auto-scaling:
assert isinstance(transforms.objectives, ObjectiveScaler)
if transforms.objectives.has_auto_scale:
objectives, _, _ = self._run_forward_model(
np.repeat(np.expand_dims(variables, axis=0), nreal, axis=0),
realizations,
)
transforms.objectives.calculate_auto_scales(objectives)
return transforms

def _create_optimizer(self) -> BasicOptimizer:
# Initialize the optimization model transforms:
transforms = self._init_transforms(
np.asarray(
FlattenedControls(self._everest_config.controls).initial_guesses,
dtype=np.float64,
)
)
optimizer = BasicOptimizer(
enopt_config=self._ropt_config, evaluator=self._forward_model_evaluator
enopt_config=everest2ropt(self._everest_config, transforms=transforms),
evaluator=self._forward_model_evaluator,
)

# Before each batch evaluation we check if we should abort:
Expand Down Expand Up @@ -250,19 +279,28 @@ def _on_before_forward_model_evaluation(
logging.getLogger(EVEREST).info("User abort requested.")
optimizer.abort_optimization()

def _forward_model_evaluator(
self, control_values: NDArray[np.float64], evaluator_context: EvaluatorContext
) -> EvaluatorResult:
def _run_forward_model(
self,
control_values: NDArray[np.float64],
realizations: list[int],
active_control_vectors: list[bool] | None = None,
) -> tuple[NDArray[np.float64], NDArray[np.float64] | None, list[int]]:
# Reset the current run status:
self._status = None

# Get cached_results:
cached_results = self._get_cached_results(control_values, evaluator_context)
cached_results = self._get_cached_results(control_values, realizations)

# Collect the indices of the controls that must be evaluated in the batch:
evaluated_control_indices = [
idx
for idx in range(control_values.shape[0])
if idx not in cached_results
and (active_control_vectors is None or active_control_vectors[idx])
]

# Create the batch to run:
batch_data = self._init_batch_data(
control_values, evaluator_context, cached_results
)
batch_data = self._init_batch_data(control_values, evaluated_control_indices)

# Initialize a new ensemble in storage:
assert self._experiment is not None
Expand All @@ -274,7 +312,7 @@ def _forward_model_evaluator(
self._setup_sim(sim_id, controls, ensemble)

# Evaluate the batch:
run_args = self._get_run_args(ensemble, evaluator_context, batch_data)
run_args = self._get_run_args(ensemble, realizations, batch_data)
self._context_env.update(
{
"_ERT_EXPERIMENT_ID": str(ensemble.experiment_id),
Expand All @@ -290,33 +328,69 @@ def _forward_model_evaluator(

# Gather the results and create the result for ropt:
results = self._gather_simulation_results(ensemble)
evaluator_result = self._make_evaluator_result(
objectives, constraints = self._get_objectives_and_constraints(
control_values, batch_data, results, cached_results
)

# Add the results from the evaluations to the cache:
self._add_results_to_cache(
control_values,
evaluator_context,
batch_data,
evaluator_result.objectives,
evaluator_result.constraints,
control_values, realizations, batch_data, objectives, constraints
)

# Increase the batch ID for the next evaluation:
self._batch_id += 1

return evaluator_result
# Return the results, together with the indices of the evaluated controls:
return objectives, constraints, evaluated_control_indices

def _get_cached_results(
def _forward_model_evaluator(
verveerpj marked this conversation as resolved.
Show resolved Hide resolved
self, control_values: NDArray[np.float64], evaluator_context: EvaluatorContext
) -> EvaluatorResult:
control_indices = list(range(control_values.shape[0]))
realizations = [
self._everest_config.model.realizations[evaluator_context.realizations[idx]]
for idx in control_indices
]
active_control_vectors = [
evaluator_context.active is None
or bool(evaluator_context.active[evaluator_context.realizations[idx]])
for idx in control_indices
]
batch_id = self._batch_id # Save the batch ID, it will be modified.
yngve-sk marked this conversation as resolved.
Show resolved Hide resolved
objectives, constraints, evaluated_control_indices = self._run_forward_model(
control_values, realizations, active_control_vectors
)

# The simulation id's are a simple enumeration over the evaluated
# forward models. For the evaluated controls they are therefore
# implicitly given by there position in the evaluated_control_indices
# list. We store for each control vector that id, or -1 if it was not
# evaluated:
sim_ids = np.fromiter(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, it is more understandable I think, and could be revisited a few more times. In the long run we could maybe get rid of the -1 as evaluation id, maybe just use the ERT realization instead of it, since all evaluations are ultimately stored in ERT storage as a realization. This is future work either way

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are going to re-evaluate these ID's anyway. These ID's, and the batch-ID are passed, so you can map a result produced by ropt to a runpath on disk. So, if you have the batch and the model realizations, you look for the folder with the right simulation_# name, where # is the simulation ID. That is a bit primitive and cumbersome, we should think about improving this, and indeed ultimately there is some mapping to a location in ERT storage.

(
evaluated_control_indices.index(idx)
if idx in evaluated_control_indices
else -1
for idx in control_indices
),
dtype=np.intc,
)

return EvaluatorResult(
objectives=objectives,
constraints=constraints,
batch_id=batch_id,
evaluation_ids=sim_ids,
)

def _get_cached_results(
self, control_values: NDArray[np.float64], realizations: list[int]
) -> dict[int, Any]:
cached_results: dict[int, Any] = {}
if self._simulator_cache is not None:
for control_idx, real_idx in enumerate(evaluator_context.realizations):
for control_idx, realization in enumerate(realizations):
cached_data = self._simulator_cache.get(
self._everest_config.model.realizations[real_idx],
control_values[control_idx, :],
realization, control_values[control_idx, :]
)
if cached_data is not None:
cached_results[control_idx] = cached_data
Expand All @@ -325,8 +399,7 @@ def _get_cached_results(
def _init_batch_data(
self,
control_values: NDArray[np.float64],
evaluator_context: EvaluatorContext,
cached_results: dict[int, Any],
controls_to_evaluate: list[int],
) -> dict[int, dict[str, Any]]:
def _add_controls(
controls_config: list[ControlConfig], values: NDArray[np.float64]
Expand All @@ -348,15 +421,9 @@ def _add_controls(
batch_data_item[control.name] = control_dict
return batch_data_item

active = evaluator_context.active
realizations = evaluator_context.realizations
return {
idx: _add_controls(self._everest_config.controls, control_values[idx, :])
for idx in range(control_values.shape[0])
if (
idx not in cached_results
and (active is None or active[realizations[idx]])
)
for idx in controls_to_evaluate
}

def _setup_sim(
Expand Down Expand Up @@ -416,18 +483,14 @@ def _check_suffix(
def _get_run_args(
self,
ensemble: Ensemble,
evaluator_context: EvaluatorContext,
realizations: list[int],
batch_data: dict[int, Any],
) -> list[RunArg]:
substitutions = self._substitutions
substitutions["<BATCH_NAME>"] = ensemble.name
self.active_realizations = [True] * len(batch_data)
for sim_id, control_idx in enumerate(batch_data.keys()):
substitutions[f"<GEO_ID_{sim_id}_0>"] = str(
self._everest_config.model.realizations[
evaluator_context.realizations[control_idx]
]
)
substitutions[f"<GEO_ID_{sim_id}_0>"] = str(realizations[control_idx])
run_paths = Runpaths(
jobname_format=self._model_config.jobname_format_string,
runpath_format=self._model_config.runpath_format_string,
Expand Down Expand Up @@ -483,15 +546,14 @@ def _gather_simulation_results(
result[fnc_name] = result[alias]
return results

def _make_evaluator_result(
def _get_objectives_and_constraints(
self,
control_values: NDArray[np.float64],
batch_data: dict[int, Any],
results: list[dict[str, NDArray[np.float64]]],
cached_results: dict[int, Any],
) -> EvaluatorResult:
# We minimize the negative of the objectives:
objectives = -self._get_simulation_results(
) -> tuple[NDArray[np.float64], NDArray[np.float64] | None]:
objectives = self._get_simulation_results(
yngve-sk marked this conversation as resolved.
Show resolved Hide resolved
results, self._everest_config.objective_names, control_values, batch_data
)

Expand All @@ -514,14 +576,7 @@ def _make_evaluator_result(
assert cached_constraints is not None
constraints[control_idx, ...] = cached_constraints

sim_ids = np.full(control_values.shape[0], -1, dtype=np.intc)
sim_ids[list(batch_data.keys())] = np.arange(len(batch_data), dtype=np.intc)
return EvaluatorResult(
objectives=objectives,
constraints=constraints,
batch_id=self._batch_id,
evaluation_ids=sim_ids,
)
return objectives, constraints

@staticmethod
def _get_simulation_results(
Expand All @@ -542,17 +597,15 @@ def _get_simulation_results(
def _add_results_to_cache(
self,
control_values: NDArray[np.float64],
evaluator_context: EvaluatorContext,
realizations: list[int],
batch_data: dict[int, Any],
objectives: NDArray[np.float64],
constraints: NDArray[np.float64] | None,
) -> None:
if self._simulator_cache is not None:
for control_idx in batch_data:
self._simulator_cache.add(
self._everest_config.model.realizations[
evaluator_context.realizations[control_idx]
],
realizations[control_idx],
control_values[control_idx, ...],
objectives[control_idx, ...],
None if constraints is None else constraints[control_idx, ...],
Expand Down
19 changes: 5 additions & 14 deletions src/everest/everest_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@
import polars as pl
from ropt.enums import EventType
from ropt.plan import BasicOptimizer, Event
from ropt.results import FunctionResults, GradientResults, convert_to_maximize
from ropt.transforms import OptModelTransforms
from ropt.results import FunctionResults, GradientResults

from everest.config import EverestConfig
from everest.strings import EVEREST
Expand Down Expand Up @@ -387,12 +386,10 @@ def read_from_output_dir(self) -> None:
exp = _OptimizerOnlyExperiment(self._output_dir)
self.data.read_from_experiment(exp)

def observe_optimizer(
self, optimizer: BasicOptimizer, transforms: OptModelTransforms
) -> None:
def observe_optimizer(self, optimizer: BasicOptimizer) -> None:
optimizer.add_observer(
EventType.FINISHED_EVALUATION,
partial(self._on_batch_evaluation_finished, transforms=transforms),
partial(self._on_batch_evaluation_finished),
)
optimizer.add_observer(
EventType.FINISHED_OPTIMIZER_STEP, self._on_optimization_finished
Expand Down Expand Up @@ -661,20 +658,14 @@ def _store_gradient_results(self, results: GradientResults) -> _GradientResults:
"perturbation_constraints": perturbation_constraints,
}

def _on_batch_evaluation_finished(
self, event: Event, transforms: OptModelTransforms
) -> None:
def _on_batch_evaluation_finished(self, event: Event) -> None:
logger.debug("Storing batch results dataframes")

converted_results = tuple(
convert_to_maximize(result).transform_back(transforms)
for result in event.data.get("results", [])
)
results: list[FunctionResults | GradientResults] = []

best_value = -np.inf
best_results = None
for item in converted_results:
for item in event.data.get("results", []):
if isinstance(item, GradientResults):
results.append(item)
if (
Expand Down
6 changes: 0 additions & 6 deletions src/everest/optimizer/everest2ropt.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,17 +59,13 @@ def _parse_controls(controls: FlattenedControls, ropt_config):


def _parse_objectives(objective_functions: list[ObjectiveFunctionConfig], ropt_config):
scales: list[float] = []
auto_scale: list[bool] = []
weights: list[float] = []
function_estimator_indices: list[int] = []
function_estimators: list = []

for objective in objective_functions:
assert isinstance(objective.name, str)
weights.append(objective.weight or 1.0)
scales.append(objective.scale or 1.0)
auto_scale.append(objective.auto_scale or False)

# If any objective specifies an objective type, we have to specify
# function estimators in ropt to implement these types. This is done by
Expand All @@ -95,8 +91,6 @@ def _parse_objectives(objective_functions: list[ObjectiveFunctionConfig], ropt_c

ropt_config["objectives"] = {
"weights": weights,
"scales": scales,
"auto_scale": auto_scale,
}
if function_estimators:
# Only needed if we specified at least one objective type:
Expand Down
Loading