Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create end point for events #9691

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ types = [
"types-setuptools",
]
everest = [
"websockets",
"progressbar2",
"ruamel.yaml",
"fastapi",
Expand Down
134 changes: 43 additions & 91 deletions src/ert/run_models/everest_run_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,17 @@
from ropt.plan import Event as OptimizerEvent
from typing_extensions import TypedDict

from _ert.events import EESnapshot, EESnapshotUpdate, Event
from ert.config import ErtConfig, ExtParamConfig
from ert.ensemble_evaluator import EnsembleSnapshot, EvaluatorServerConfig
from ert.ensemble_evaluator import EndEvent, EvaluatorServerConfig
from ert.runpaths import Runpaths
from ert.storage import open_storage
from everest.config import ControlConfig, ControlVariableGuessListConfig, EverestConfig
from everest.detached import ServerStatus
from everest.everest_storage import EverestStorage, OptimalResult
from everest.optimizer.everest2ropt import everest2ropt
from everest.optimizer.opt_model_transforms import get_opt_model_transforms
from everest.simulator.everest_to_ert import everest_to_ert_config
from everest.strings import EVEREST
from everest.strings import EVEREST, OPT_FAILURE_REALIZATIONS

from ..run_arg import RunArg, create_run_arguments
from .base_run_model import BaseRunModel, StatusEvents
Expand Down Expand Up @@ -83,12 +83,12 @@ def __init__(
self,
config: ErtConfig,
everest_config: EverestConfig,
simulation_callback: SimulationCallback | None,
optimization_callback: OptimizerCallback | None,
status_queue: queue.SimpleQueue[StatusEvents] | None = None,
):
Path(everest_config.log_dir).mkdir(parents=True, exist_ok=True)
Path(everest_config.optimization_output_dir).mkdir(parents=True, exist_ok=True)

status_queue = queue.SimpleQueue() if status_queue is None else status_queue
assert everest_config.environment is not None
logging.getLogger(EVEREST).info(
"Using random seed: %d. To deterministically reproduce this experiment, "
Expand All @@ -102,7 +102,6 @@ def __init__(
everest_config, transforms=self._opt_model_transforms
)

self._sim_callback = simulation_callback
self._opt_callback = optimization_callback
self._fm_errors: dict[int, dict[str, Any]] = {}
self._result: OptimalResult | None = None
Expand All @@ -118,11 +117,8 @@ def __init__(
self._experiment: Experiment | None = None
self._eval_server_cfg: EvaluatorServerConfig | None = None
self._batch_id: int = 0
self._status: SimulationStatus | None = None

storage = open_storage(config.ens_path, mode="w")
status_queue: queue.SimpleQueue[StatusEvents] = queue.SimpleQueue()

super().__init__(
storage,
config.runpath_file,
Expand All @@ -149,12 +145,13 @@ def create(
ever_config: EverestConfig,
simulation_callback: SimulationCallback | None = None,
optimization_callback: OptimizerCallback | None = None,
status_queue: queue.SimpleQueue[StatusEvents] | None = None,
) -> EverestRunModel:
return cls(
config=everest_to_ert_config(ever_config),
everest_config=ever_config,
simulation_callback=simulation_callback,
optimization_callback=optimization_callback,
status_queue=status_queue,
)

@classmethod
Expand Down Expand Up @@ -213,6 +210,12 @@ def run_experiment(
self._exit_code = EverestExitCode.TOO_FEW_REALIZATIONS
case _:
self._exit_code = EverestExitCode.COMPLETED
self.send_event(
EndEvent(
failed=self._exit_code != EverestExitCode.COMPLETED,
msg=_get_optimization_status(self._exit_code, "", False)[1],
)
)

def _create_optimizer(self) -> BasicOptimizer:
optimizer = BasicOptimizer(
Expand Down Expand Up @@ -253,9 +256,6 @@ def _on_before_forward_model_evaluation(
def _forward_model_evaluator(
self, control_values: NDArray[np.float64], evaluator_context: EvaluatorContext
) -> EvaluatorResult:
# Reset the current run status:
self._status = None

# Get cached_results:
cached_results = self._get_cached_results(control_values, evaluator_context)

Expand All @@ -269,6 +269,7 @@ def _forward_model_evaluator(
ensemble = self._experiment.create_ensemble(
name=f"batch_{self._batch_id}",
ensemble_size=len(batch_data),
iteration=self._batch_id,
)
for sim_id, controls in enumerate(batch_data.values()):
self._setup_sim(sim_id, controls, ensemble)
Expand Down Expand Up @@ -420,10 +421,10 @@ def _get_run_args(
batch_data: dict[int, Any],
) -> list[RunArg]:
substitutions = self._substitutions
substitutions["<BATCH_NAME>"] = ensemble.name
substitutions["<BATCH_NAME>"] = ensemble.name # Dark magic, should be fixed
self.active_realizations = [True] * len(batch_data)
for sim_id, control_idx in enumerate(batch_data.keys()):
substitutions[f"<GEO_ID_{sim_id}_0>"] = str(
substitutions[f"<GEO_ID_{sim_id}_{ensemble.iteration}>"] = str(
self._everest_config.model.realizations[
evaluator_context.realizations[control_idx]
]
Expand Down Expand Up @@ -565,82 +566,6 @@ def check_if_runpath_exists(self) -> bool:
and any(os.listdir(self._everest_config.simulation_dir))
)

def send_snapshot_event(self, event: Event, iteration: int) -> None:
    """Forward a snapshot event and notify the simulation callback on change.

    The event is always passed on to the base-class implementation first.
    For events whose exact type is ``EESnapshot`` or ``EESnapshotUpdate`` the
    simulation status is then rebuilt from the current snapshot; when it
    differs from the last status stored on ``self._status`` the simulation
    callback (if one is set) is invoked with it and it is remembered.
    """
    super().send_snapshot_event(event, iteration)

    # Only full snapshots and snapshot updates can change the status.
    if type(event) not in {EESnapshot, EESnapshotUpdate}:
        return

    latest = self._simulation_status(self.get_current_snapshot())
    if self._status == latest:
        # Nothing changed since the last notification.
        return

    if self._sim_callback is not None:
        self._sim_callback(latest)
    self._status = latest

def _simulation_status(self, snapshot: EnsembleSnapshot) -> SimulationStatus:
    """Summarise the forward-model steps of ``snapshot`` for status reporting.

    Walks ``snapshot.get_all_fm_steps()`` in iteration order and collects one
    progress entry per step, grouping consecutive entries that share the same
    realization into one list. Steps carrying a non-empty ``"error"`` are
    additionally reported through ``self._handle_errors``.

    Returns a dict with the current status (``self.get_current_status()``),
    the grouped progress lists and the current batch number. The group being
    built is always appended at the end, so a snapshot without any steps
    yields a ``"progress"`` value containing a single empty list.
    """
    grouped: list[list[JobProgress]] = []
    current_group: list[JobProgress] = []
    last_realization = None

    for (realization, simulation), step in snapshot.get_all_fm_steps().items():
        # A change of realization closes the group collected so far.
        if realization != last_realization:
            last_realization = realization
            if current_group:
                grouped.append(current_group)
                current_group = []

        entry: JobProgress = {
            "name": step.get("name") or "Unknown",
            "status": step.get("status") or "Unknown",
            "error": step.get("error", ""),
            "start_time": step.get("start_time", None),
            "end_time": step.get("end_time", None),
            "realization": realization,
            "simulation": simulation,
        }
        current_group.append(entry)

        if step.get("error", ""):
            self._handle_errors(
                batch=self._batch_id,
                simulation=simulation,
                realization=realization,
                fm_name=step.get("name", "Unknown"),  # type: ignore
                error_path=step.get("stderr", ""),  # type: ignore
                fm_running_err=step.get("error", ""),  # type: ignore
            )

    # The trailing group is appended unconditionally (possibly empty).
    grouped.append(current_group)

    return {
        "status": self.get_current_status(),
        "progress": grouped,
        "batch_number": self._batch_id,
    }

def _handle_errors(
    self,
    batch: int,
    simulation: Any,
    realization: str,
    fm_name: str,
    error_path: str,
    fm_running_err: str,
) -> None:
    """Log a forward-model failure once per distinct error text.

    The error text is read from ``error_path`` when that path is an existing
    file with non-empty content; otherwise ``fm_running_err`` is used. Errors
    are de-duplicated in ``self._fm_errors`` keyed by ``hash(error_text)``:

    * the first occurrence of an error text is logged in full and assigned
      the next sequential error id;
    * a later occurrence from a different batch/realization/simulation/job
      combination is logged as "Already reported as" with the existing id;
    * a repeated report from the same combination is ignored.

    Args:
        batch: Batch number the failing step belongs to.
        simulation: Simulation identifier of the failing step.
        realization: Realization identifier of the failing step.
        fm_name: Name of the failing forward-model step.
        error_path: Path to the step's stderr file; may be empty or missing.
        fm_running_err: Error text used when no stderr content is available.
    """
    fm_id = f"b_{batch}_r_{realization}_s_{simulation}_{fm_name}"
    fm_logger = logging.getLogger("forward_models")

    stderr_file = Path(error_path)
    stderr_text = (
        stderr_file.read_text(encoding="utf-8") if stderr_file.is_file() else ""
    )
    error_str = stderr_text or fm_running_err
    error_hash = hash(error_str)

    # Build the message in a single formatting pass. The previous two-step
    # ``str.format`` re-parsed the already substituted values, so a brace in
    # e.g. the job name raised instead of being logged.
    prefix = (
        f"Batch: {batch} Realization: {realization} "
        f"Simulation: {simulation} Job: {fm_name} Failed "
    )

    known = self._fm_errors.get(error_hash)
    if known is None:
        error_id = len(self._fm_errors)
        fm_logger.error(f"{prefix}\n Error: {error_str} ID:{error_id}")
        self._fm_errors[error_hash] = {"error_id": error_id, "ids": [fm_id]}
    elif fm_id not in known["ids"]:
        known["ids"].append(fm_id)
        error_id = known["error_id"]
        fm_logger.error(f"{prefix}\n Error: Already reported as ID:{error_id}")


class SimulatorCache:
EPS = float(np.finfo(np.float32).eps)
Expand Down Expand Up @@ -691,3 +616,30 @@ def get(
if np.allclose(controls, control_values, rtol=0.0, atol=self.EPS):
return objectives, constraints
return None


def _get_optimization_status(
    exit_code: int, exception: str, stopped: bool
) -> tuple[ServerStatus, str]:
    """Translate an optimizer exit code into a server status and a message.

    Args:
        exit_code: Exit code compared against the ``EverestExitCode`` members.
        exception: Message returned verbatim for ``EverestExitCode.EXCEPTION``.
        stopped: For ``TOO_FEW_REALIZATIONS``, selects ``ServerStatus.stopped``
            when true and ``ServerStatus.failed`` otherwise.

    Returns:
        A ``(status, message)`` tuple. Any exit code not handled explicitly
        is reported as a completed optimization.
    """
    if exit_code == EverestExitCode.MAX_BATCH_NUM_REACHED:
        return ServerStatus.completed, "Maximum number of batches reached."

    if exit_code == EverestExitCode.MAX_FUNCTIONS_REACHED:
        message = "Maximum number of function evaluations reached."
        return ServerStatus.completed, message

    if exit_code == EverestExitCode.USER_ABORT:
        return ServerStatus.stopped, "Optimization aborted."

    if exit_code == EverestExitCode.EXCEPTION:
        assert exception is not None
        return ServerStatus.failed, exception

    if exit_code == EverestExitCode.TOO_FEW_REALIZATIONS:
        if stopped:
            return ServerStatus.stopped, OPT_FAILURE_REALIZATIONS
        return ServerStatus.failed, OPT_FAILURE_REALIZATIONS

    # Every remaining exit code counts as a normal completion.
    return ServerStatus.completed, "Optimization completed."
Loading