From dbee95a09d6224d5bf6ce25898d2ab1096892976 Mon Sep 17 00:00:00 2001 From: Frode Aarstad Date: Tue, 14 Jan 2025 13:33:08 +0100 Subject: [PATCH] Refactor everserver --- src/ert/run_models/everest_run_model.py | 1 + src/everest/bin/everest_script.py | 15 +- src/everest/detached/__init__.py | 60 +++-- src/everest/detached/jobs/everserver.py | 172 +++++++++++-- src/everest/strings.py | 3 + .../entry_points/test_everest_entry.py | 16 ++ .../functional/test_main_everest_entry.py | 6 +- tests/everest/test_detached.py | 10 +- tests/everest/test_everest_output.py | 3 +- tests/everest/test_everserver.py | 237 ++++++++++++------ tests/everest/test_logging.py | 11 +- 11 files changed, 406 insertions(+), 128 deletions(-) diff --git a/src/ert/run_models/everest_run_model.py b/src/ert/run_models/everest_run_model.py index 43f41f664f4..3444fc256fe 100644 --- a/src/ert/run_models/everest_run_model.py +++ b/src/ert/run_models/everest_run_model.py @@ -94,6 +94,7 @@ class EverestExitCode(IntEnum): MAX_FUNCTIONS_REACHED = 3 MAX_BATCH_NUM_REACHED = 4 USER_ABORT = 5 + EXCEPTION = 6 class EverestRunModel(BaseRunModel): diff --git a/src/everest/bin/everest_script.py b/src/everest/bin/everest_script.py index 4cc91ae7f59..2c3561cbf0d 100755 --- a/src/everest/bin/everest_script.py +++ b/src/everest/bin/everest_script.py @@ -9,12 +9,12 @@ import threading from functools import partial -from ert.run_models.everest_run_model import EverestRunModel from everest.config import EverestConfig, ServerConfig from everest.detached import ( ServerStatus, everserver_status, server_is_running, + start_experiment, start_server, wait_for_server, ) @@ -114,7 +114,11 @@ async def run_everest(options): except ValueError as exc: raise SystemExit(f"Config validation error: {exc}") from exc - if EverestRunModel.create(options.config).check_if_runpath_exists(): + if ( + options.config.simulation_dir is not None + and os.path.exists(options.config.simulation_dir) + and any(os.listdir(options.config.simulation_dir)) + ): warn_user_that_runpath_is_nonempty() try: @@ -128,6 +132,13 @@ async def run_everest(options): print("Waiting for server ...") wait_for_server(options.config.output_dir, timeout=600) print("Everest server found!") + + start_experiment( + server_context=ServerConfig.get_server_context(options.config.output_dir), + config=options.config, + ) + + # blocks until the run is finished run_detached_monitor( server_context=ServerConfig.get_server_context(options.config.output_dir), optimization_output_dir=options.config.optimization_output_dir, diff --git a/src/everest/detached/__init__.py b/src/everest/detached/__init__.py index 984b6ac6c05..e908f1c50ad 100644 --- a/src/everest/detached/__init__.py +++ b/src/everest/detached/__init__.py @@ -25,6 +25,7 @@ OPT_PROGRESS_ID, SIM_PROGRESS_ENDPOINT, SIM_PROGRESS_ID, + START_EXPERIMENT_ENDPOINT, STOP_ENDPOINT, ) @@ -62,7 +63,9 @@ async def start_server(config: EverestConfig, debug: bool = False) -> Driver: return driver -def stop_server(server_context: tuple[str, str, tuple[str, str]], retries: int = 5): +def stop_server( + server_context: tuple[str, str, tuple[str, str]], retries: int = 5 +) -> bool: """ Stop server if found and it is running. """ @@ -84,6 +87,30 @@ def stop_server(server_context: tuple[str, str, tuple[str, str]], retries: int = return False +def start_experiment( + server_context: tuple[str, str, tuple[str, str]], + config: EverestConfig, + retries: int = 5, +) -> None: + for retry in range(retries): + try: + url, cert, auth = server_context + start_endpoint = "/".join([url, START_EXPERIMENT_ENDPOINT]) + response = requests.post( + start_endpoint, + verify=cert, + auth=auth, + proxies=PROXY, # type: ignore + json=config.to_dict(), + ) + response.raise_for_status() + return + except: + logging.debug(traceback.format_exc()) + time.sleep(retry) + raise ValueError("Failed to start experiment") + + def extract_errors_from_file(path: str): with open(path, encoding="utf-8") as f: content = f.read() @@ -97,29 +124,13 @@ def wait_for_server(output_dir: str, timeout: int) -> None: Raise an exception when the timeout is reached. """ - everserver_status_path = ServerConfig.get_everserver_status_path(output_dir) - if not server_is_running(*ServerConfig.get_server_context(output_dir)): - sleep_time_increment = float(timeout) / (2**_HTTP_REQUEST_RETRY - 1) - for retry_count in range(_HTTP_REQUEST_RETRY): - # Failure may occur before contact with the server is established: - status = everserver_status(everserver_status_path) - if status["status"] == ServerStatus.completed: - # For very small cases the optimization will finish and bring down the - # server before we can verify that it is running. - return - - if status["status"] == ServerStatus.failed: - raise SystemExit( - "Failed to start Everest with error:\n{}".format(status["message"]) - ) - - sleep_time = sleep_time_increment * (2**retry_count) - time.sleep(sleep_time) - if server_is_running(*ServerConfig.get_server_context(output_dir)): - return - - # If number of retries reached and server is not running - throw exception - raise RuntimeError("Failed to start server within configured timeout.") + sleep_time_increment = float(timeout) / (2**_HTTP_REQUEST_RETRY - 1) + for retry_count in range(_HTTP_REQUEST_RETRY): + if server_is_running(*ServerConfig.get_server_context(output_dir)): + return + else: + time.sleep(sleep_time_increment * (2**retry_count)) + raise RuntimeError("Failed to get reply from server within configured timeout.") def get_opt_status(output_folder): @@ -175,6 +186,7 @@ def wait_for_server_to_stop(server_context: tuple[str, str, tuple[str, str]], ti def server_is_running(url: str, cert: str, auth: tuple[str, str]): try: + logging.info(f"Checking server status at {url} ") response = requests.get( url, verify=cert, diff --git a/src/everest/detached/jobs/everserver.py b/src/everest/detached/jobs/everserver.py index f963340f869..088a08494f2 100755 --- a/src/everest/detached/jobs/everserver.py +++ b/src/everest/detached/jobs/everserver.py @@ -6,12 +6,14 @@ import socket import ssl import threading +import time import traceback from base64 import b64encode from functools import partial from pathlib import Path from typing import Any +import requests import uvicorn from cryptography import x509 from cryptography.hazmat.backends import default_backend @@ -30,27 +32,83 @@ HTTPBasic, HTTPBasicCredentials, ) +from pydantic import BaseModel from ert.config.parsing.queue_system import QueueSystem from ert.ensemble_evaluator import EvaluatorServerConfig from ert.run_models.everest_run_model import EverestExitCode, EverestRunModel from everest import export_to_csv, export_with_progress from everest.config import EverestConfig, ExportConfig, ServerConfig -from everest.detached import ServerStatus, get_opt_status, update_everserver_status +from everest.detached import ( + PROXY, + ServerStatus, + get_opt_status, + update_everserver_status, + wait_for_server, +) from everest.export import check_for_errors from everest.plugins.everest_plugin_manager import EverestPluginManager from everest.simulator import JOB_FAILURE from everest.strings import ( DEFAULT_LOGGING_FORMAT, EVEREST, + EXPERIMENT_STATUS_ENDPOINT, OPT_FAILURE_REALIZATIONS, OPT_PROGRESS_ENDPOINT, + SHARED_DATA_ENDPOINT, SIM_PROGRESS_ENDPOINT, + START_EXPERIMENT_ENDPOINT, STOP_ENDPOINT, ) from everest.util import makedirs_if_needed, version_info +class ExperimentStatus(BaseModel): + exit_code: EverestExitCode + message: str | None = None + + +class ExperimentRunner(threading.Thread): + def __init__(self, everest_config, shared_data: dict): + super().__init__() + + self._everest_config = everest_config + self._shared_data = shared_data + self._status: ExperimentStatus | None = None + + def run(self): + run_model = EverestRunModel.create( + self._everest_config, + simulation_callback=partial(_sim_monitor, shared_data=self._shared_data), + optimization_callback=partial(_opt_monitor, shared_data=self._shared_data), + ) + + if run_model._queue_config.queue_system == QueueSystem.LOCAL: + evaluator_server_config = EvaluatorServerConfig() + else: + evaluator_server_config = EvaluatorServerConfig( + custom_port_range=range(49152, 51819), use_ipc_protocol=False + ) + + try: + run_model.run_experiment(evaluator_server_config) + + assert run_model.exit_code is not None + self._status = ExperimentStatus(exit_code=run_model.exit_code) + except Exception as e: + self._status = ExperimentStatus( + exit_code=EverestExitCode.EXCEPTION, message=str(e) + ) + + @property + def status(self) -> ExperimentStatus | None: + return self._status + + @property + def shared_data(self) -> dict: + return self._shared_data + + def _get_machine_name() -> str: """Returns a name that can be used to identify this machine in a network @@ -79,6 +137,8 @@ def _get_machine_name() -> str: def _sim_monitor(context_status, shared_data=None): + assert shared_data is not None + status = context_status["status"] shared_data[SIM_PROGRESS_ENDPOINT] = { "batch_number": context_status["batch_number"], @@ -97,6 +157,7 @@ def _sim_monitor(context_status, shared_data=None): def _opt_monitor(shared_data=None): + assert shared_data is not None if shared_data[STOP_ENDPOINT]: return "stop_optimization" @@ -105,6 +166,8 @@ def _everserver_thread(shared_data, server_config) -> None: app = FastAPI() security = HTTPBasic() + runner: ExperimentRunner | None = None + def _check_user(credentials: HTTPBasicCredentials) -> None: if credentials.password != server_config["authentication"]: raise HTTPException( @@ -153,6 +216,52 @@ def get_opt_progress( progress = get_opt_status(server_config["optimization_output_dir"]) return JSONResponse(jsonable_encoder(progress)) + @app.post("/" + START_EXPERIMENT_ENDPOINT) + def start_experiment( + config: EverestConfig, + request: Request, + credentials: HTTPBasicCredentials = Depends(security), + ) -> Response: + _log(request) + _check_user(credentials) + + nonlocal runner + if runner is None: + runner = ExperimentRunner(config, shared_data) + try: + runner.start() + return Response("Everest experiment started") + except Exception as e: + return Response(f"Could not start experiment: {e!s}", status_code=501) + return Response("Everest experiment is running") + + @app.get("/" + EXPERIMENT_STATUS_ENDPOINT) + def get_experiment_status( + request: Request, credentials: HTTPBasicCredentials = Depends(security) + ) -> Response: + _log(request) + _check_user(credentials) + if shared_data[STOP_ENDPOINT]: + return JSONResponse( + ExperimentStatus(exit_code=EverestExitCode.USER_ABORT).model_dump_json() + ) + if runner is None: + return Response(None, 204) + status = runner.status + if status is None: + return Response(None, 204) + return JSONResponse(status.model_dump_json()) + + @app.get("/" + SHARED_DATA_ENDPOINT) + def get_shared_data( + request: Request, credentials: HTTPBasicCredentials = Depends(security) + ) -> JSONResponse: + _log(request) + _check_user(credentials) + if runner is None: + return JSONResponse(jsonable_encoder(shared_data)) + return JSONResponse(jsonable_encoder(runner.shared_data)) + uvicorn.run( app, host="0.0.0.0", @@ -287,6 +396,7 @@ def main(): ) everserver_instance.daemon = True everserver_instance.start() + except: update_everserver_status( status_path, @@ -296,23 +406,47 @@ def main(): return try: + wait_for_server(config.output_dir, 60) + update_everserver_status(status_path, ServerStatus.running) - run_model = EverestRunModel.create( - config, - simulation_callback=partial(_sim_monitor, shared_data=shared_data), - optimization_callback=partial(_opt_monitor, shared_data=shared_data), - ) - if run_model._queue_config.queue_system == QueueSystem.LOCAL: - evaluator_server_config = EvaluatorServerConfig() - else: - evaluator_server_config = EvaluatorServerConfig( - custom_port_range=range(49152, 51819), use_ipc_protocol=False + server_context = (ServerConfig.get_server_context(config.output_dir),) + url, cert, auth = server_context[0] + + done = False + experiment_status: ExperimentStatus | None = None + # loop until the optimization is done + while not done: + response = requests.get( + "/".join([url, EXPERIMENT_STATUS_ENDPOINT]), + verify=cert, + auth=auth, + timeout=1, + proxies=PROXY, # type: ignore ) + if response.status_code == requests.codes.OK: + json_body = json.loads( + response.text if hasattr(response, "text") else response.body + ) + experiment_status = ExperimentStatus.model_validate_json(json_body) + done = True + else: + time.sleep(1) + + response = requests.get( + "/".join([url, SHARED_DATA_ENDPOINT]), + verify=cert, + auth=auth, + timeout=1, + proxies=PROXY, # type: ignore + ) + if json_body := json.loads( + response.text if hasattr(response, "text") else response.body + ): + shared_data = json_body - run_model.run_experiment(evaluator_server_config) - - status, message = _get_optimization_status(run_model.exit_code, shared_data) + assert experiment_status is not None + status, message = _get_optimization_status(experiment_status, shared_data) if status != ServerStatus.completed: update_everserver_status(status_path, status, message) return @@ -362,8 +496,10 @@ def main(): update_everserver_status(status_path, ServerStatus.completed, message=message) -def _get_optimization_status(exit_code, shared_data): - match exit_code: +def _get_optimization_status( + experiment_status: ExperimentStatus, shared_data: dict +) -> tuple[ServerStatus, str]: + match experiment_status.exit_code: case EverestExitCode.MAX_BATCH_NUM_REACHED: return ServerStatus.completed, "Maximum number of batches reached." @@ -376,6 +512,10 @@ def _get_optimization_status(exit_code, shared_data): case EverestExitCode.USER_ABORT: return ServerStatus.stopped, "Optimization aborted." + case EverestExitCode.EXCEPTION: + assert experiment_status.message is not None + return ServerStatus.failed, experiment_status.message + case EverestExitCode.TOO_FEW_REALIZATIONS: status = ( ServerStatus.stopped diff --git a/src/everest/strings.py b/src/everest/strings.py index 50be1da326b..593f592fa35 100644 --- a/src/everest/strings.py +++ b/src/everest/strings.py @@ -31,3 +31,6 @@ SIM_PROGRESS_ID = "simulation_progress" STOP_ENDPOINT = "stop" STORAGE_DIR = "simulation_results" +START_EXPERIMENT_ENDPOINT = "start_experiment" +EXPERIMENT_STATUS_ENDPOINT = "experiment_status" +SHARED_DATA_ENDPOINT = "shared_data" diff --git a/tests/everest/entry_points/test_everest_entry.py b/tests/everest/entry_points/test_everest_entry.py index f1bece7f1cd..3b38b0496ab 100644 --- a/tests/everest/entry_points/test_everest_entry.py +++ b/tests/everest/entry_points/test_everest_entry.py @@ -77,7 +77,9 @@ def run_detached_monitor_mock(status=ServerStatus.completed, error=None, **kwarg "everest.bin.everest_script.everserver_status", return_value={"status": ServerStatus.never_run, "message": None}, ) +@patch("everest.bin.everest_script.start_experiment") def test_everest_entry_debug( + start_experiment_mock, everserver_status_mock, start_server_mock, wait_for_server_mock, @@ -93,6 +95,7 @@ def test_everest_entry_debug( wait_for_server_mock.assert_called_once() start_monitor_mock.assert_called_once() everserver_status_mock.assert_called() + start_experiment_mock.assert_called_once() # the config file itself is dumped at DEBUG level assert '"controls"' in logstream @@ -108,7 +111,9 @@ def test_everest_entry_debug( "everest.bin.everest_script.everserver_status", return_value={"status": ServerStatus.never_run, "message": None}, ) +@patch("everest.bin.everest_script.start_experiment") def test_everest_entry( + start_experiment_mock, everserver_status_mock, start_server_mock, wait_for_server_mock, @@ -121,6 +126,7 @@ def test_everest_entry( wait_for_server_mock.assert_called_once() start_monitor_mock.assert_called_once() everserver_status_mock.assert_called() + start_experiment_mock.assert_called_once() @patch("everest.bin.everest_script.server_is_running", return_value=False) @@ -131,7 +137,9 @@ def test_everest_entry( "everest.bin.everest_script.everserver_status", return_value={"status": ServerStatus.completed, "message": None}, ) +@patch("everest.bin.everest_script.start_experiment") def test_everest_entry_detached_already_run( + start_experiment_mock, everserver_status_mock, start_server_mock, wait_for_server_mock, @@ -296,7 +304,9 @@ def test_everest_entry_monitor_no_run( "everest.bin.everest_script.everserver_status", return_value={"status": ServerStatus.never_run, "message": None}, ) +@patch("everest.bin.everest_script.start_experiment") def test_everest_entry_show_all_jobs( + start_experiment_mock, everserver_status_mock, get_opt_status_mock, get_server_context_mock, @@ -330,7 +340,9 @@ def test_everest_entry_show_all_jobs( "everest.bin.everest_script.everserver_status", return_value={"status": ServerStatus.never_run, "message": None}, ) +@patch("everest.bin.everest_script.start_experiment") def test_everest_entry_no_show_all_jobs( + start_experiment_mock, everserver_status_mock, get_opt_status_mock, get_server_context_mock, @@ -429,7 +441,9 @@ def test_monitor_entry_no_show_all_jobs( ) @patch("everest.bin.everest_script.wait_for_server") @patch("everest.bin.everest_script.start_server") +@patch("everest.bin.everest_script.start_experiment") def test_exception_raised_when_server_run_fails( + start_experiment_mock, start_server_mock, wait_for_server_mock, start_monitor_mock, @@ -461,7 +475,9 @@ def test_exception_raised_when_server_run_fails_monitor( ) @patch("everest.bin.everest_script.wait_for_server") @patch("everest.bin.everest_script.start_server") +@patch("everest.bin.everest_script.start_experiment") def test_complete_status_for_normal_run( + start_experiment_mock, start_server_mock, wait_for_server_mock, start_monitor_mock, diff --git a/tests/everest/functional/test_main_everest_entry.py b/tests/everest/functional/test_main_everest_entry.py index 8a80384d74e..6a7a6bb495b 100644 --- a/tests/everest/functional/test_main_everest_entry.py +++ b/tests/everest/functional/test_main_everest_entry.py @@ -19,7 +19,6 @@ pytestmark = pytest.mark.xdist_group(name="starts_everest") -@pytest.mark.integration_test def test_everest_entry_version(): """Test calling everest with --version""" with capture_streams() as (out, err), pytest.raises(SystemExit): @@ -29,7 +28,6 @@ def test_everest_entry_version(): assert any(everest_version in channel for channel in channels) -@pytest.mark.integration_test def test_everest_main_entry_bad_command(): # Setup command line arguments for the test with capture_streams() as (_, err), pytest.raises(SystemExit): @@ -40,9 +38,9 @@ def test_everest_main_entry_bad_command(): assert "Run everest --help for more information on a command" in lines -@pytest.mark.flaky(reruns=5) @pytest.mark.skip_mac_ci @pytest.mark.integration_test +@pytest.mark.xdist_group(name="starts_everest") def test_everest_entry_run(cached_example): _, config_file, _ = cached_example("math_func/config_minimal.yml") # Setup command line arguments @@ -76,7 +74,6 @@ def test_everest_entry_run(cached_example): assert status["status"] == ServerStatus.completed -@pytest.mark.integration_test def test_everest_entry_monitor_no_run(cached_example): _, config_file, _ = cached_example("math_func/config_minimal.yml") with capture_streams(): @@ -99,7 +96,6 @@ def test_everest_main_export_entry(cached_example): assert os.path.exists(os.path.join("everest_output", "config_minimal.csv")) -@pytest.mark.integration_test def test_everest_main_lint_entry(cached_example): # Setup command line arguments _, config_file, _ = cached_example("math_func/config_minimal.yml") diff --git a/tests/everest/test_detached.py b/tests/everest/test_detached.py index c6dacf5cd2b..c177af3c177 100644 --- a/tests/everest/test_detached.py +++ b/tests/everest/test_detached.py @@ -40,7 +40,6 @@ from everest.util import makedirs_if_needed -@pytest.mark.flaky(reruns=5) @pytest.mark.integration_test @pytest.mark.skip_mac_ci @pytest.mark.xdist_group(name="starts_everest") @@ -65,7 +64,7 @@ async def test_https_requests(copy_math_func_test_data_to_tmp): raise e server_status = everserver_status(status_path) - assert ServerStatus.running == server_status["status"] + assert server_status["status"] in {ServerStatus.running, ServerStatus.starting} url, cert, auth = ServerConfig.get_server_context(everest_config.output_dir) result = requests.get(url, verify=cert, auth=auth, proxies=PROXY) # noqa: ASYNC210 @@ -140,12 +139,13 @@ def test_server_status(copy_math_func_test_data_to_tmp): assert status["message"] == f"{err_msg_1}\n{err_msg_2}" -@pytest.mark.integration_test @patch("everest.detached.server_is_running", return_value=False) -def test_wait_for_server(server_is_running_mock, caplog, monkeypatch): +def test_wait_for_server(server_is_running_mock, caplog): config = EverestConfig.with_defaults() - with pytest.raises(RuntimeError, match=r"Failed to start .* timeout"): + with pytest.raises( + RuntimeError, match=r"Failed to get reply from server .* timeout" + ): wait_for_server(config.output_dir, timeout=1) assert not caplog.messages diff --git a/tests/everest/test_everest_output.py b/tests/everest/test_everest_output.py index 76ba729ae70..a82a89e8120 100644 --- a/tests/everest/test_everest_output.py +++ b/tests/everest/test_everest_output.py @@ -31,11 +31,12 @@ def test_that_one_experiment_creates_one_ensemble_per_batch(cached_example): @patch("everest.bin.everest_script.run_detached_monitor") @patch("everest.bin.everest_script.wait_for_server") @patch("everest.bin.everest_script.start_server") +@patch("everest.bin.everest_script.start_experiment") @patch( "everest.bin.everest_script.everserver_status", return_value={"status": ServerStatus.never_run, "message": None}, ) -def test_save_running_config(_, _1, _2, _3, _4, copy_math_func_test_data_to_tmp): +def test_save_running_config(_, _1, _2, _3, _4, _5, copy_math_func_test_data_to_tmp): """Test everest detached, when an optimization has already run""" # optimization already run, notify the user file_name = "config_minimal.yml" diff --git a/tests/everest/test_everserver.py b/tests/everest/test_everserver.py index ee31441e4e3..90563c26efb 100644 --- a/tests/everest/test_everserver.py +++ b/tests/everest/test_everserver.py @@ -1,19 +1,53 @@ import json import os import ssl -from functools import partial from pathlib import Path from unittest.mock import patch import pytest +import requests +from fastapi.encoders import jsonable_encoder +from fastapi.responses import JSONResponse from seba_sqlite.snapshot import SebaSnapshot from ert.run_models.everest_run_model import EverestExitCode +from ert.scheduler.event import FinishedEvent from everest.config import EverestConfig, ServerConfig -from everest.detached import ServerStatus, everserver_status +from everest.detached import ( + ServerStatus, + everserver_status, + start_experiment, + start_server, + wait_for_server, +) from everest.detached.jobs import everserver from everest.simulator import JOB_FAILURE, JOB_SUCCESS -from everest.strings import OPT_FAILURE_REALIZATIONS, SIM_PROGRESS_ENDPOINT +from everest.strings import ( + OPT_FAILURE_REALIZATIONS, + SIM_PROGRESS_ENDPOINT, + STOP_ENDPOINT, +) + + +async def wait_for_server_to_complete(config): + # Wait for the server to complete the optimization. + # There should be a @pytest.mark.timeout(x) for tests that call this function. + async def server_running(): + while True: + event = await driver.event_queue.get() + if isinstance(event, FinishedEvent) and event.iens == 0: + return + + driver = await start_server(config, debug=True) + try: + wait_for_server(config.output_dir, 120) + start_experiment( + server_context=ServerConfig.get_server_context(config.output_dir), + config=config, + ) + except (SystemExit, RuntimeError) as e: + raise e + await server_running() def configure_everserver_logger(*args, **kwargs): @@ -91,7 +125,7 @@ def test_hostfile_storage(tmp_path, monkeypatch): "everest.detached.jobs.everserver._configure_loggers", side_effect=configure_everserver_logger, ) -def test_everserver_status_failure(_1, copy_math_func_test_data_to_tmp): +def test_configure_logger_failure(mocked_logger, copy_math_func_test_data_to_tmp): config_file = "config_minimal.yml" config = EverestConfig.load_file(config_file) everserver.main() @@ -104,20 +138,39 @@ def test_everserver_status_failure(_1, copy_math_func_test_data_to_tmp): @patch("sys.argv", ["name", "--config-file", "config_minimal.yml"]) -@patch( - "ert.run_models.everest_run_model.EverestRunModel.run_experiment", - autospec=True, - side_effect=lambda self, evaluator_server_config, restart=False: check_status( - ServerConfig.get_hostfile_path(self._everest_config.output_dir), - status=ServerStatus.running, - ), -) -def test_everserver_status_running_complete( - _1, mock_server, copy_math_func_test_data_to_tmp +@patch("everest.detached.jobs.everserver._configure_loggers") +@patch("everest.bin.utils.export_to_csv") +@patch("requests.get") +def test_status_running_complete( + mocked_get, mocked_export_to_csv, mocked_logger, copy_math_func_test_data_to_tmp ): config_file = "config_minimal.yml" config = EverestConfig.load_file(config_file) + + def mocked_server(url, verify, auth, timeout, proxies): + if "/experiment_status" in url: + return JSONResponse( + everserver.ExperimentStatus( + exit_code=EverestExitCode.COMPLETED + ).model_dump_json() + ) + if "/shared_data" in url: + return JSONResponse( + jsonable_encoder( + { + SIM_PROGRESS_ENDPOINT: {}, + STOP_ENDPOINT: False, + } + ) + ) + resp = requests.Response() + resp.status_code = 200 + return resp + + mocked_get.side_effect = mocked_server + everserver.main() + status = everserver_status( ServerConfig.get_everserver_status_path(config.output_dir) ) @@ -127,35 +180,64 @@ def test_everserver_status_running_complete( @patch("sys.argv", ["name", "--config-file", "config_minimal.yml"]) -@patch( - "ert.run_models.everest_run_model.EverestRunModel.run_experiment", - autospec=True, - side_effect=lambda self, evaluator_server_config, restart=False: fail_optimization( - self, from_ropt=True - ), -) -@patch( - "everest.detached.jobs.everserver._sim_monitor", - side_effect=partial( - set_shared_status, - progress=[ - [ - {"name": "job1", "status": JOB_FAILURE, "error": "job 1 error 1"}, - {"name": "job1", "status": JOB_FAILURE, "error": "job 1 error 2"}, - ], - [ - {"name": "job2", "status": JOB_SUCCESS, "error": ""}, - {"name": "job2", "status": JOB_FAILURE, "error": "job 2 error 1"}, - ], - ], - ), -) -def test_everserver_status_failed_job( - _1, _2, mock_server, copy_math_func_test_data_to_tmp -): +@patch("everest.detached.jobs.everserver._configure_loggers") +@patch("requests.get") +def test_status_failed_job(mocked_get, mocked_logger, copy_math_func_test_data_to_tmp): config_file = "config_minimal.yml" config = EverestConfig.load_file(config_file) + + def mocked_server(url, verify, auth, timeout, proxies): + if "/experiment_status" in url: + return JSONResponse( + everserver.ExperimentStatus( + exit_code=EverestExitCode.TOO_FEW_REALIZATIONS + ).model_dump_json() + ) + if "/shared_data" in url: + return JSONResponse( + jsonable_encoder( + { + SIM_PROGRESS_ENDPOINT: { + "status": {"failed": 3}, + "progress": [ + [ + { + "name": "job1", + "status": JOB_FAILURE, + "error": "job 1 error 1", + }, + { + "name": "job1", + "status": JOB_FAILURE, + "error": "job 1 error 2", + }, + ], + [ + { + "name": "job2", + "status": JOB_SUCCESS, + "error": "", + }, + { + "name": "job2", + "status": JOB_FAILURE, + "error": "job 2 error 1", + }, + ], + ], + }, + STOP_ENDPOINT: False, + } + ) + ) + resp = requests.Response() + resp.status_code = 200 + return resp + + mocked_get.side_effect = mocked_server + everserver.main() + status = everserver_status( ServerConfig.get_everserver_status_path(config.output_dir) ) @@ -169,42 +251,51 @@ def test_everserver_status_failed_job( @patch("sys.argv", ["name", "--config-file", "config_minimal.yml"]) -@patch( - "ert.run_models.everest_run_model.EverestRunModel.run_experiment", - autospec=True, - side_effect=lambda self, evaluator_server_config, restart=False: fail_optimization( - self, from_ropt=False - ), -) -@patch( - "everest.detached.jobs.everserver._sim_monitor", - side_effect=partial(set_shared_status, progress=[]), -) -def test_everserver_status_exception( - _1, _2, mock_server, copy_math_func_test_data_to_tmp -): +@patch("everest.detached.jobs.everserver._configure_loggers") +@patch("requests.get") +def test_status_exception(mocked_get, mocked_logger, copy_math_func_test_data_to_tmp): config_file = "config_minimal.yml" config = EverestConfig.load_file(config_file) + + def mocked_server(url, verify, auth, timeout, proxies): + if "/experiment_status" in url: + return JSONResponse( + everserver.ExperimentStatus( + exit_code=EverestExitCode.EXCEPTION, message="Some message" + ).model_dump_json() + ) + if "/shared_data" in url: + return JSONResponse( + jsonable_encoder( + { + SIM_PROGRESS_ENDPOINT: { + "status": {}, + "progress": [], + }, + STOP_ENDPOINT: False, + } + ) + ) + resp = requests.Response() + resp.status_code = 200 + return resp + + mocked_get.side_effect = mocked_server + everserver.main() status = everserver_status( ServerConfig.get_everserver_status_path(config.output_dir) ) - # The server should fail, and store the exception that - # start_optimization raised. assert status["status"] == ServerStatus.failed - assert "Exception: Failed optimization" in status["message"] + assert "Some message" in status["message"] @pytest.mark.integration_test +@pytest.mark.xdist_group(name="starts_everest") +@pytest.mark.timeout(120) @patch("sys.argv", ["name", "--config-file", "config_minimal.yml"]) -@patch( - "everest.detached.jobs.everserver._sim_monitor", - side_effect=partial(set_shared_status, progress=[]), -) -def test_everserver_status_max_batch_num( - _1, mock_server, copy_math_func_test_data_to_tmp -): +async def test_status_max_batch_num(copy_math_func_test_data_to_tmp): config = EverestConfig.load_file("config_minimal.yml") config_dict = { **config.model_dump(exclude_none=True), @@ -213,7 +304,8 @@ def test_everserver_status_max_batch_num( config = EverestConfig.model_validate(config_dict) config.dump("config_minimal.yml") - everserver.main() + await wait_for_server_to_complete(config) + status = everserver_status( ServerConfig.get_everserver_status_path(config.output_dir) ) @@ -229,9 +321,11 @@ def test_everserver_status_max_batch_num( @pytest.mark.integration_test +@pytest.mark.xdist_group(name="starts_everest") +@pytest.mark.timeout(120) @patch("sys.argv", ["name", "--config-file", "config_minimal.yml"]) -def test_everserver_status_contains_max_runtime_failure( - mock_server, change_to_tmpdir, min_config +async def test_status_contains_max_runtime_failure( + copy_math_func_test_data_to_tmp, min_config ): config_file = "config_minimal.yml" @@ -240,10 +334,13 @@ def test_everserver_status_contains_max_runtime_failure( min_config["forward_model"] = ["sleep 5"] min_config["install_jobs"] = [{"name": "sleep", "source": "SLEEP_job"}] - config = EverestConfig(**min_config) - config.dump(config_file) + tmp_config = EverestConfig(**min_config) + tmp_config.dump(config_file) + + config = EverestConfig.load_file(config_file) + + await wait_for_server_to_complete(config) - everserver.main() status = everserver_status( ServerConfig.get_everserver_status_path(config.output_dir) ) diff --git a/tests/everest/test_logging.py b/tests/everest/test_logging.py index 7b6e76addc7..de3e9b1d893 100644 --- a/tests/everest/test_logging.py +++ b/tests/everest/test_logging.py @@ -9,7 +9,7 @@ ServerConfig, ) from everest.config.install_job_config import InstallJobConfig -from everest.detached import start_server, wait_for_server +from everest.detached import start_experiment, start_server, wait_for_server from everest.util import makedirs_if_needed @@ -17,7 +17,6 @@ def _string_exists_in_file(file_path, string): return string in Path(file_path).read_text(encoding="utf-8") -@pytest.mark.flaky(reruns=5) @pytest.mark.timeout(120) # Simulation might not finish @pytest.mark.integration_test @pytest.mark.xdist_group(name="starts_everest") @@ -45,17 +44,19 @@ async def server_running(): driver = await start_server(everest_config, debug=True) try: wait_for_server(everest_config.output_dir, 120) + + start_experiment( + server_context=ServerConfig.get_server_context(everest_config.output_dir), + config=everest_config, + ) except (SystemExit, RuntimeError) as e: raise e await server_running() everest_output_path = os.path.join(os.getcwd(), "everest_output") - everest_logs_dir_path = everest_config.log_dir - detached_node_dir = ServerConfig.get_detached_node_dir(everest_config.output_dir) endpoint_log_path = os.path.join(detached_node_dir, "endpoint.log") - everest_log_path = os.path.join(everest_logs_dir_path, "everest.log") forward_model_log_path = os.path.join(everest_logs_dir_path, "forward_models.log")