From 9cf7f82cf794e26c4e206cc1f02625c2ec7a8243 Mon Sep 17 00:00:00 2001 From: DanSava Date: Wed, 12 Feb 2025 17:41:01 +0200 Subject: [PATCH 1/4] Change exception type --- src/everest/detached/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/everest/detached/__init__.py b/src/everest/detached/__init__.py index a08f298736e..b9086cce265 100644 --- a/src/everest/detached/__init__.py +++ b/src/everest/detached/__init__.py @@ -113,7 +113,7 @@ def start_experiment( except: logging.debug(traceback.format_exc()) time.sleep(retry) - raise ValueError("Failed to start experiment") + raise RuntimeError("Failed to start experiment") def extract_errors_from_file(path: str): From 85658160c7afe2cdd7947500dd7c3e3d1e8ecd19 Mon Sep 17 00:00:00 2001 From: DanSava Date: Wed, 12 Feb 2025 17:47:02 +0200 Subject: [PATCH 2/4] Remove experiment_status and shared_data endpoints Communicate with the server thread via a simple queue --- src/ert/run_models/everest_run_model.py | 1 - src/everest/detached/jobs/everserver.py | 225 ++++++++++-------------- src/everest/strings.py | 2 - tests/everest/test_everserver.py | 180 +++---------------- 4 files changed, 117 insertions(+), 291 deletions(-) diff --git a/src/ert/run_models/everest_run_model.py b/src/ert/run_models/everest_run_model.py index ba4746b229d..905b6c0cf0d 100644 --- a/src/ert/run_models/everest_run_model.py +++ b/src/ert/run_models/everest_run_model.py @@ -98,7 +98,6 @@ class EverestExitCode(IntEnum): MAX_FUNCTIONS_REACHED = 3 MAX_BATCH_NUM_REACHED = 4 USER_ABORT = 5 - EXCEPTION = 6 class EverestRunModel(BaseRunModel): diff --git a/src/everest/detached/jobs/everserver.py b/src/everest/detached/jobs/everserver.py index 4b76482fd13..8015a212364 100755 --- a/src/everest/detached/jobs/everserver.py +++ b/src/everest/detached/jobs/everserver.py @@ -7,14 +7,14 @@ import socket import ssl import threading -import time import traceback from base64 import b64encode +from contextlib import asynccontextmanager from functools import partial from pathlib import Path +from queue import Empty, SimpleQueue from typing import Any -import requests import uvicorn from cryptography import x509 from cryptography.hazmat.backends import default_backend @@ -40,23 +40,19 @@ from ert.run_models.everest_run_model import EverestExitCode, EverestRunModel from everest.config import EverestConfig, ServerConfig from everest.detached import ( - PROXY, ServerStatus, get_opt_status, update_everserver_status, - wait_for_server, ) from everest.plugins.everest_plugin_manager import EverestPluginManager from everest.simulator import JOB_FAILURE from everest.strings import ( DEFAULT_LOGGING_FORMAT, EVEREST, - EXPERIMENT_STATUS_ENDPOINT, OPT_FAILURE_REALIZATIONS, OPT_PROGRESS_ENDPOINT, OPTIMIZATION_LOG_DIR, OPTIMIZATION_OUTPUT_DIR, - SHARED_DATA_ENDPOINT, SIM_PROGRESS_ENDPOINT, START_EXPERIMENT_ENDPOINT, STOP_ENDPOINT, @@ -64,50 +60,67 @@ from everest.util import makedirs_if_needed, version_info -class ExperimentStatus(BaseModel): +class EverestServerMsg(BaseModel): + msg: str | None = None + + +class ServerStarted(EverestServerMsg): + pass + + +class ServerStopped(EverestServerMsg): + pass + + +class ExperimentComplete(EverestServerMsg): exit_code: EverestExitCode - message: str | None = None + data: dict[str, Any] + + +class ExperimentFailed(EverestServerMsg): + pass class ExperimentRunner(threading.Thread): - def __init__(self, everest_config, shared_data: dict): + def __init__( + self, + everest_config, + shared_data: dict, + msg_queue: SimpleQueue[EverestServerMsg], + ): super().__init__() self._everest_config = everest_config self._shared_data = shared_data - self._status: ExperimentStatus | None = None + self._msg_queue = msg_queue def run(self): - run_model = EverestRunModel.create( - self._everest_config, - simulation_callback=partial(_sim_monitor, shared_data=self._shared_data), - optimization_callback=partial(_opt_monitor, shared_data=self._shared_data), - ) - - if run_model._queue_config.queue_system == QueueSystem.LOCAL: - evaluator_server_config = EvaluatorServerConfig() - else: - evaluator_server_config = EvaluatorServerConfig( - port_range=(49152, 51819), use_ipc_protocol=False + try: + run_model = EverestRunModel.create( + self._everest_config, + simulation_callback=partial( + _sim_monitor, shared_data=self._shared_data + ), + optimization_callback=partial( + _opt_monitor, shared_data=self._shared_data + ), ) + if run_model._queue_config.queue_system == QueueSystem.LOCAL: + evaluator_server_config = EvaluatorServerConfig() + else: + evaluator_server_config = EvaluatorServerConfig( + port_range=(49152, 51819), use_ipc_protocol=False + ) - try: run_model.run_experiment(evaluator_server_config) - assert run_model.exit_code is not None - self._status = ExperimentStatus(exit_code=run_model.exit_code) - except Exception as e: - self._status = ExperimentStatus( - exit_code=EverestExitCode.EXCEPTION, message=str(e) + self._msg_queue.put( + ExperimentComplete( + exit_code=run_model.exit_code, data=self._shared_data + ) ) - - @property - def status(self) -> ExperimentStatus | None: - return self._status - - @property - def shared_data(self) -> dict: - return self._shared_data + except Exception as e: + self._msg_queue.put(ExperimentFailed(msg=str(e))) def _get_machine_name() -> str: @@ -163,8 +176,17 @@ def _opt_monitor(shared_data=None): return "stop_optimization" -def _everserver_thread(shared_data, server_config) -> None: - app = FastAPI() +def _everserver_thread(shared_data, server_config, msg_queue) -> None: + # ruff: noqa: RUF029 + @asynccontextmanager + async def lifespan(app: FastAPI): + # Startup event + msg_queue.put(ServerStarted()) + yield + # Shutdown event + msg_queue.put(ServerStopped()) + + app = FastAPI(lifespan=lifespan) security = HTTPBasic() runner: ExperimentRunner | None = None @@ -197,6 +219,7 @@ def stop( _log(request) _check_user(credentials) shared_data[STOP_ENDPOINT] = True + msg_queue.put(ServerStopped()) return Response("Raise STOP flag succeeded. Everest initiates shutdown..", 200) @app.get("/" + SIM_PROGRESS_ENDPOINT) @@ -228,7 +251,7 @@ def start_experiment( nonlocal runner if runner is None: - runner = ExperimentRunner(config, shared_data) + runner = ExperimentRunner(config, shared_data, msg_queue) try: runner.start() return Response("Everest experiment started") @@ -236,33 +259,6 @@ def start_experiment( return Response(f"Could not start experiment: {e!s}", status_code=501) return Response("Everest experiment is running") - @app.get("/" + EXPERIMENT_STATUS_ENDPOINT) - def get_experiment_status( - request: Request, credentials: HTTPBasicCredentials = Depends(security) - ) -> Response: - _log(request) - _check_user(credentials) - if shared_data[STOP_ENDPOINT]: - return JSONResponse( - ExperimentStatus(exit_code=EverestExitCode.USER_ABORT).model_dump_json() - ) - if runner is None: - return Response(None, 204) - status = runner.status - if status is None: - return Response(None, 204) - return JSONResponse(status.model_dump_json()) - - @app.get("/" + SHARED_DATA_ENDPOINT) - def get_shared_data( - request: Request, credentials: HTTPBasicCredentials = Depends(security) - ) -> JSONResponse: - _log(request) - _check_user(credentials) - if runner is None: - return JSONResponse(jsonable_encoder(shared_data)) - return JSONResponse(jsonable_encoder(runner.shared_data)) - uvicorn.run( app, host="0.0.0.0", @@ -364,6 +360,7 @@ def main(): status_path = ServerConfig.get_everserver_status_path(output_dir) host_file = ServerConfig.get_hostfile_path(output_dir) + msg_queue: SimpleQueue[EverestServerMsg] = SimpleQueue() try: _configure_loggers( @@ -397,89 +394,49 @@ def main(): "key_passwd": key_pw, "authentication": authentication, } - + # Starting the server everserver_instance = threading.Thread( target=_everserver_thread, - args=(shared_data, server_config), + args=(shared_data, server_config, msg_queue), ) everserver_instance.daemon = True everserver_instance.start() + # Monitoring the server + while True: + try: + item = msg_queue.get(timeout=1) # Wait for data + match item: + case ServerStarted(): + update_everserver_status(status_path, ServerStatus.running) + case ServerStopped(): + update_everserver_status(status_path, ServerStatus.stopped) + return + case ExperimentFailed(): + update_everserver_status( + status_path, ServerStatus.failed, item.msg + ) + return + case ExperimentComplete(): + status, message = _get_optimization_status( + item.exit_code, item.data + ) + update_everserver_status(status_path, status, message) + return + except Empty: + continue except: update_everserver_status( status_path, ServerStatus.failed, message=traceback.format_exc(), ) - return - - try: - wait_for_server(output_dir, 60) - - update_everserver_status(status_path, ServerStatus.running) - - server_context = (ServerConfig.get_server_context(output_dir),) - url, cert, auth = server_context[0] - - done = False - experiment_status: ExperimentStatus | None = None - # loop until the optimization is done - while not done: - response = requests.get( - "/".join([url, EXPERIMENT_STATUS_ENDPOINT]), - verify=cert, - auth=auth, - timeout=1, - proxies=PROXY, # type: ignore - ) - if response.status_code == requests.codes.OK: - json_body = json.loads( - response.text if hasattr(response, "text") else response.body - ) - experiment_status = ExperimentStatus.model_validate_json(json_body) - done = True - else: - time.sleep(1) - - response = requests.get( - "/".join([url, SHARED_DATA_ENDPOINT]), - verify=cert, - auth=auth, - timeout=1, - proxies=PROXY, # type: ignore - ) - if json_body := json.loads( - response.text if hasattr(response, "text") else response.body - ): - shared_data = json_body - - assert experiment_status is not None - status, message = _get_optimization_status(experiment_status, shared_data) - if status != ServerStatus.completed: - update_everserver_status(status_path, status, message) - return - except: - if shared_data[STOP_ENDPOINT]: - update_everserver_status( - status_path, - ServerStatus.stopped, - message="Optimization aborted.", - ) - else: - update_everserver_status( - status_path, - ServerStatus.failed, - message=traceback.format_exc(), - ) - return - - update_everserver_status(status_path, ServerStatus.completed, message=message) def _get_optimization_status( - experiment_status: ExperimentStatus, shared_data: dict + exit_code: EverestExitCode, shared_data: dict ) -> tuple[ServerStatus, str]: - match experiment_status.exit_code: + match exit_code: case EverestExitCode.MAX_BATCH_NUM_REACHED: return ServerStatus.completed, "Maximum number of batches reached." @@ -492,10 +449,6 @@ def _get_optimization_status( case EverestExitCode.USER_ABORT: return ServerStatus.stopped, "Optimization aborted." - case EverestExitCode.EXCEPTION: - assert experiment_status.message is not None - return ServerStatus.failed, experiment_status.message - case EverestExitCode.TOO_FEW_REALIZATIONS: status = ( ServerStatus.stopped diff --git a/src/everest/strings.py b/src/everest/strings.py index 593f592fa35..52db27deacb 100644 --- a/src/everest/strings.py +++ b/src/everest/strings.py @@ -32,5 +32,3 @@ STOP_ENDPOINT = "stop" STORAGE_DIR = "simulation_results" START_EXPERIMENT_ENDPOINT = "start_experiment" -EXPERIMENT_STATUS_ENDPOINT = "experiment_status" -SHARED_DATA_ENDPOINT = "shared_data" diff --git a/tests/everest/test_everserver.py b/tests/everest/test_everserver.py index 141d522d54a..b2eeda9f407 100644 --- a/tests/everest/test_everserver.py +++ b/tests/everest/test_everserver.py @@ -6,9 +6,6 @@ from unittest.mock import patch import pytest -import requests -from fastapi.encoders import jsonable_encoder -from fastapi.responses import JSONResponse from ert.run_models.everest_run_model import EverestExitCode from ert.scheduler.event import FinishedEvent @@ -21,12 +18,11 @@ wait_for_server, ) from everest.detached.jobs import everserver +from everest.detached.jobs.everserver import ExperimentComplete from everest.everest_storage import EverestStorage -from everest.simulator import JOB_FAILURE, JOB_SUCCESS +from everest.simulator import JOB_FAILURE from everest.strings import ( - OPT_FAILURE_REALIZATIONS, SIM_PROGRESS_ENDPOINT, - STOP_ENDPOINT, ) @@ -56,37 +52,28 @@ def configure_everserver_logger(*args, **kwargs): raise Exception("Configuring logger failed") -def check_status(*args, **kwargs): - everest_server_status_path = str(Path(args[0]).parent / "status") - status = everserver_status(everest_server_status_path) - assert status["status"] == kwargs["status"] - - -def fail_optimization(self, from_ropt=False): - # Patch start_optimization to raise a failed optimization callback. Also - # call the provided simulation callback, which has access to the shared_data - # variable in the eversever main function. Patch that callback to modify - # shared_data (see set_shared_status() below). - self._sim_callback(None) - if from_ropt: - self._exit_code = EverestExitCode.TOO_FEW_REALIZATIONS - return EverestExitCode.TOO_FEW_REALIZATIONS - - raise Exception("Failed optimization") - - -def set_shared_status(*args, progress, shared_data): - # Patch _sim_monitor with this to access the shared_data variable in the - # everserver main function. - failed = len( - [job for queue in progress for job in queue if job["status"] == JOB_FAILURE] - ) - +def fail_experiment_run(shared_data, server_config, msg_queue): shared_data[SIM_PROGRESS_ENDPOINT] = { - "status": {"failed": failed}, - "progress": progress, + "status": {"failed": 3}, + "progress": [ + [ + {"name": "job1", "status": JOB_FAILURE, "error": "job 1 error 1"}, + {"name": "job1", "status": JOB_FAILURE, "error": "job 1 error 2"}, + ], + [ + {"name": "job2", "status": JOB_FAILURE, "error": "job 2 error 1"}, + ], + ], } + msg_queue.put( + ExperimentComplete( + msg="Failed", + exit_code=EverestExitCode.TOO_FEW_REALIZATIONS, + data=shared_data, + ) + ) + @pytest.mark.integration_test def test_certificate_generation(change_to_tmpdir): @@ -124,7 +111,7 @@ def test_hostfile_storage(change_to_tmpdir): "everest.detached.jobs.everserver._configure_loggers", side_effect=configure_everserver_logger, ) -def test_configure_logger_failure(_, tmp_path, change_to_tmpdir): +def test_configure_logger_failure(_, change_to_tmpdir): everserver.main() status = everserver_status( ServerConfig.get_everserver_status_path("everest_output") @@ -136,94 +123,8 @@ def test_configure_logger_failure(_, tmp_path, change_to_tmpdir): @patch("sys.argv", ["name", "--output-dir", "everest_output"]) @patch("everest.detached.jobs.everserver._configure_loggers") -@patch("requests.get") -def test_status_running_complete(mocked_get, _, change_to_tmpdir): - def mocked_server(url, verify, auth, timeout, proxies): - if "/experiment_status" in url: - return JSONResponse( - everserver.ExperimentStatus( - exit_code=EverestExitCode.COMPLETED - ).model_dump_json() - ) - if "/shared_data" in url: - return JSONResponse( - jsonable_encoder( - { - SIM_PROGRESS_ENDPOINT: {}, - STOP_ENDPOINT: False, - } - ) - ) - resp = requests.Response() - resp.status_code = 200 - return resp - - mocked_get.side_effect = mocked_server - - everserver.main() - - status = everserver_status( - ServerConfig.get_everserver_status_path("everest_output") - ) - - assert status["status"] == ServerStatus.completed - assert status["message"] == "Optimization completed." - - -@patch("sys.argv", ["name", "--output-dir", "everest_output"]) -@patch("everest.detached.jobs.everserver._configure_loggers") -@patch("requests.get") -def test_status_failed_job(mocked_get, _, change_to_tmpdir): - def mocked_server(url, verify, auth, timeout, proxies): - if "/experiment_status" in url: - return JSONResponse( - everserver.ExperimentStatus( - exit_code=EverestExitCode.TOO_FEW_REALIZATIONS - ).model_dump_json() - ) - if "/shared_data" in url: - return JSONResponse( - jsonable_encoder( - { - SIM_PROGRESS_ENDPOINT: { - "status": {"failed": 3}, - "progress": [ - [ - { - "name": "job1", - "status": JOB_FAILURE, - "error": "job 1 error 1", - }, - { - "name": "job1", - "status": JOB_FAILURE, - "error": "job 1 error 2", - }, - ], - [ - { - "name": "job2", - "status": JOB_SUCCESS, - "error": "", - }, - { - "name": "job2", - "status": JOB_FAILURE, - "error": "job 2 error 1", - }, - ], - ], - }, - STOP_ENDPOINT: False, - } - ) - ) - resp = requests.Response() - resp.status_code = 200 - return resp - - mocked_get.side_effect = mocked_server - +@patch("everest.detached.jobs.everserver._everserver_thread", fail_experiment_run) +def test_status_failed_job(_, change_to_tmpdir): everserver.main() status = everserver_status( @@ -232,7 +133,6 @@ def mocked_server(url, verify, auth, timeout, proxies): # The server should fail and store a user-friendly message. assert status["status"] == ServerStatus.failed - assert OPT_FAILURE_REALIZATIONS in status["message"] assert "job1 Failed with: job 1 error 1" in status["message"] assert "job1 Failed with: job 1 error 2" in status["message"] assert "job2 Failed with: job 2 error 1" in status["message"] @@ -240,40 +140,16 @@ def mocked_server(url, verify, auth, timeout, proxies): @patch("sys.argv", ["name", "--output-dir", "everest_output"]) @patch("everest.detached.jobs.everserver._configure_loggers") -@patch("requests.get") -def test_status_exception(mocked_get, mocked_logger, change_to_tmpdir): - def mocked_server(url, verify, auth, timeout, proxies): - if "/experiment_status" in url: - return JSONResponse( - everserver.ExperimentStatus( - exit_code=EverestExitCode.EXCEPTION, message="Some message" - ).model_dump_json() - ) - if "/shared_data" in url: - return JSONResponse( - jsonable_encoder( - { - SIM_PROGRESS_ENDPOINT: { - "status": {}, - "progress": [], - }, - STOP_ENDPOINT: False, - } - ) - ) - resp = requests.Response() - resp.status_code = 200 - return resp - - mocked_get.side_effect = mocked_server +async def test_status_exception(_, change_to_tmpdir, min_config): + config = EverestConfig(**min_config) - everserver.main() + await wait_for_server_to_complete(config) status = everserver_status( ServerConfig.get_everserver_status_path("everest_output") ) assert status["status"] == ServerStatus.failed - assert "Some message" in status["message"] + assert "Optimization failed:" in status["message"] @pytest.mark.integration_test From ad404c7d6be13844bf32e42aa7e0f385b6aea06d Mon Sep 17 00:00:00 2001 From: DanSava Date: Wed, 12 Feb 2025 17:48:54 +0200 Subject: [PATCH 3/4] Avoid name clashes with outer scope variables --- src/everest/detached/jobs/everserver.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/everest/detached/jobs/everserver.py b/src/everest/detached/jobs/everserver.py index 8015a212364..fb1952eb941 100755 --- a/src/everest/detached/jobs/everserver.py +++ b/src/everest/detached/jobs/everserver.py @@ -153,15 +153,15 @@ def _get_machine_name() -> str: def _sim_monitor(context_status, shared_data=None): assert shared_data is not None - status = context_status["status"] + status_ = context_status["status"] shared_data[SIM_PROGRESS_ENDPOINT] = { "batch_number": context_status["batch_number"], "status": { - "running": status.get("Running", 0), - "waiting": status.get("Waiting", 0), - "pending": status.get("Pending", 0), - "complete": status.get("Finished", 0), - "failed": status.get("Failed", 0), + "running": status_.get("Running", 0), + "waiting": status_.get("Waiting", 0), + "pending": status_.get("Pending", 0), + "complete": status_.get("Finished", 0), + "failed": status_.get("Failed", 0), }, "progress": context_status["progress"], } @@ -450,7 +450,7 @@ def _get_optimization_status( return ServerStatus.stopped, "Optimization aborted." case EverestExitCode.TOO_FEW_REALIZATIONS: - status = ( + status_ = ( ServerStatus.stopped if shared_data[STOP_ENDPOINT] else ServerStatus.failed @@ -458,7 +458,7 @@ def _get_optimization_status( messages = _failed_realizations_messages(shared_data) for msg in messages: logging.getLogger(EVEREST).error(msg) - return status, "\n".join(messages) + return status_, "\n".join(messages) case _: return ServerStatus.completed, "Optimization completed." From efea6e93a598c4d6673b97a1cad6a43d04fed9a6 Mon Sep 17 00:00:00 2001 From: DanSava Date: Thu, 13 Feb 2025 13:34:02 +0200 Subject: [PATCH 4/4] Put back removed test in test_everserver --- tests/everest/test_everserver.py | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/tests/everest/test_everserver.py b/tests/everest/test_everserver.py index b2eeda9f407..313c12251e2 100644 --- a/tests/everest/test_everserver.py +++ b/tests/everest/test_everserver.py @@ -52,6 +52,15 @@ def configure_everserver_logger(*args, **kwargs): raise Exception("Configuring logger failed") +def experiment_run(shared_data, server_config, msg_queue): + msg_queue.put( + ExperimentComplete( + exit_code=EverestExitCode.COMPLETED, + data=shared_data, + ) + ) + + def fail_experiment_run(shared_data, server_config, msg_queue): shared_data[SIM_PROGRESS_ENDPOINT] = { "status": {"failed": 3}, @@ -121,6 +130,20 @@ def test_configure_logger_failure(_, change_to_tmpdir): assert "Exception: Configuring logger failed" in status["message"] +@patch("sys.argv", ["name", "--output-dir", "everest_output"]) +@patch("everest.detached.jobs.everserver._configure_loggers") +@patch("everest.detached.jobs.everserver._everserver_thread", experiment_run) +def test_status_running_complete(_, change_to_tmpdir): + everserver.main() + + status = everserver_status( + ServerConfig.get_everserver_status_path("everest_output") + ) + + assert status["status"] == ServerStatus.completed + assert status["message"] == "Optimization completed." + + @patch("sys.argv", ["name", "--output-dir", "everest_output"]) @patch("everest.detached.jobs.everserver._configure_loggers") @patch("everest.detached.jobs.everserver._everserver_thread", fail_experiment_run)