Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
frode-aarstad committed Jan 22, 2025
1 parent dafea39 commit 8334307
Show file tree
Hide file tree
Showing 15 changed files with 61 additions and 110 deletions.
1 change: 0 additions & 1 deletion everest_output/.internal_data/wells.json

This file was deleted.

1 change: 0 additions & 1 deletion everest_output/.jobs/_recovery_factor

This file was deleted.

1 change: 0 additions & 1 deletion everest_output/.jobs/_render

This file was deleted.

1 change: 0 additions & 1 deletion everest_output/.jobs/_wdcompl

This file was deleted.

1 change: 0 additions & 1 deletion everest_output/.jobs/_wddatefilter

This file was deleted.

1 change: 0 additions & 1 deletion everest_output/.jobs/_wdfilter

This file was deleted.

1 change: 0 additions & 1 deletion everest_output/.jobs/_wdreorder

This file was deleted.

1 change: 0 additions & 1 deletion everest_output/.jobs/_wdset

This file was deleted.

1 change: 0 additions & 1 deletion everest_output/.jobs/_wdupdate

This file was deleted.

45 changes: 5 additions & 40 deletions src/everest/detached/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,7 @@ def extract_errors_from_file(path: str):
return re.findall(r"(Error \w+.*)", content)


def wait_for_server_simple(
url: str, cert: str, auth: tuple[str, str], timeout: int
) -> None:
def wait_for_server(output_dir: str, timeout: int) -> None:
"""
Checks everest server has started _HTTP_REQUEST_RETRY times. Waits
progressively longer between each check.
Expand All @@ -128,47 +126,13 @@ def wait_for_server_simple(
"""
sleep_time_increment = float(timeout) / (2**_HTTP_REQUEST_RETRY - 1)
for retry_count in range(_HTTP_REQUEST_RETRY):
try:
requests.get(url + "/", verify=cert, auth=auth, proxies=PROXY) # type: ignore
if server_is_running(*ServerConfig.get_server_context(output_dir)):
return
except Exception:
sleep_time = sleep_time_increment * (2**retry_count)
time.sleep(sleep_time)
else:
time.sleep(sleep_time_increment * (2**retry_count))
raise RuntimeError("Failed to get reply from server within configured timeout.")


def wait_for_server(output_dir: str, timeout: int) -> None:
"""
Checks everest server has started _HTTP_REQUEST_RETRY times. Waits
progressively longer between each check.
Raise an exception when the timeout is reached.
"""
everserver_status_path = ServerConfig.get_everserver_status_path(output_dir)
if not server_is_running(*ServerConfig.get_server_context(output_dir)):
sleep_time_increment = float(timeout) / (2**_HTTP_REQUEST_RETRY - 1)
for retry_count in range(_HTTP_REQUEST_RETRY):
# Failure may occur before contact with the server is established:
status = everserver_status(everserver_status_path)
if status["status"] == ServerStatus.completed:
# For very small cases the optimization will finish and bring down the
# server before we can verify that it is running.
return

if status["status"] == ServerStatus.failed:
raise SystemExit(
"Failed to start Everest with error:\n{}".format(status["message"])
)

sleep_time = sleep_time_increment * (2**retry_count)
time.sleep(sleep_time)
if server_is_running(*ServerConfig.get_server_context(output_dir)):
return

# If number of retries reached and server is not running - throw exception
raise RuntimeError("Failed to start server within configured timeout.")


def get_opt_status(output_folder):
"""Retrieve a seba database snapshot and return a dictionary with
optimization information."""
Expand Down Expand Up @@ -222,6 +186,7 @@ def wait_for_server_to_stop(server_context: tuple[str, str, tuple[str, str]], ti

def server_is_running(url: str, cert: str, auth: tuple[str, str]):
try:
logging.info(f"Checking server status at {url} ")
response = requests.get(
url,
verify=cert,
Expand Down
24 changes: 13 additions & 11 deletions src/everest/detached/jobs/everserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
HTTPBasicCredentials,
)

from ert.config.parsing.queue_system import QueueSystem
from ert.ensemble_evaluator import EvaluatorServerConfig
from ert.run_models.everest_run_model import EverestExitCode, EverestRunModel
from everest import export_to_csv, export_with_progress
Expand All @@ -42,7 +43,7 @@
ServerStatus,
get_opt_status,
update_everserver_status,
wait_for_server_simple,
wait_for_server,
)
from everest.export import check_for_errors
from everest.plugins.everest_plugin_manager import EverestPluginManager
Expand Down Expand Up @@ -76,8 +77,7 @@ def run(self):
optimization_callback=partial(_opt_monitor, shared_data=self._shared_data),
)

if self._everest_config.simulator.queue_system.name == "local":
# if run_model._queue_config.queue_system == QueueSystem.LOCAL:
if run_model._queue_config.queue_system == QueueSystem.LOCAL:
evaluator_server_config = EvaluatorServerConfig()
else:
evaluator_server_config = EvaluatorServerConfig(
Expand Down Expand Up @@ -381,9 +381,6 @@ def main():
everserver_instance.daemon = True
everserver_instance.start()

server_context = (ServerConfig.get_server_context(config.output_dir),)
url, cert, auth = server_context[0]

except:
update_everserver_status(
status_path,
Expand All @@ -393,32 +390,37 @@ def main():
return

try:
wait_for_server_simple(url, cert, auth, 60)
wait_for_server(config.output_dir, 60)

update_everserver_status(status_path, ServerStatus.running)

is_done = False
server_context = (ServerConfig.get_server_context(config.output_dir),)
url, cert, auth = server_context[0]

done = False
exit_code = None
# loop unil the optimization is done
while not is_done:
# loop until the optimization is done
while not done:
response = requests.get(
"/".join([url, EXPERIMENT_STATUS_ENDPOINT]),
verify=cert,
auth=auth,
timeout=1,
proxies=PROXY, # type: ignore
)
if response.status_code == requests.codes.OK:
exit_code = int(
response.text if hasattr(response, "text") else response.body
)
is_done = True
done = True
else:
time.sleep(1)

response = requests.get(
"/".join([url, SHARED_DATA_ENDPOINT]),
verify=cert,
auth=auth,
timeout=1,
proxies=PROXY, # type: ignore
)
if json_body := json.loads(
Expand Down
6 changes: 1 addition & 5 deletions tests/everest/functional/test_main_everest_entry.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
pytestmark = pytest.mark.xdist_group(name="starts_everest")


@pytest.mark.integration_test
def test_everest_entry_version():
"""Test calling everest with --version"""
with capture_streams() as (out, err), pytest.raises(SystemExit):
Expand All @@ -29,7 +28,6 @@ def test_everest_entry_version():
assert any(everest_version in channel for channel in channels)


@pytest.mark.integration_test
def test_everest_main_entry_bad_command():
# Setup command line arguments for the test
with capture_streams() as (_, err), pytest.raises(SystemExit):
Expand All @@ -40,9 +38,9 @@ def test_everest_main_entry_bad_command():
assert "Run everest <command> --help for more information on a command" in lines


@pytest.mark.flaky(reruns=5)
@pytest.mark.skip_mac_ci
@pytest.mark.integration_test
@pytest.mark.xdist_group(name="starts_everest")
def test_everest_entry_run(cached_example):
_, config_file, _ = cached_example("math_func/config_minimal.yml")
# Setup command line arguments
Expand Down Expand Up @@ -76,7 +74,6 @@ def test_everest_entry_run(cached_example):
assert status["status"] == ServerStatus.completed


@pytest.mark.integration_test
def test_everest_entry_monitor_no_run(cached_example):
_, config_file, _ = cached_example("math_func/config_minimal.yml")
with capture_streams():
Expand All @@ -99,7 +96,6 @@ def test_everest_main_export_entry(cached_example):
assert os.path.exists(os.path.join("everest_output", "config_minimal.csv"))


@pytest.mark.integration_test
def test_everest_main_lint_entry(cached_example):
# Setup command line arguments
_, config_file, _ = cached_example("math_func/config_minimal.yml")
Expand Down
8 changes: 4 additions & 4 deletions tests/everest/test_detached.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
from everest.util import makedirs_if_needed


# @pytest.mark.flaky(reruns=5)
@pytest.mark.integration_test
@pytest.mark.skip_mac_ci
@pytest.mark.xdist_group(name="starts_everest")
Expand Down Expand Up @@ -140,12 +139,13 @@ def test_server_status(copy_math_func_test_data_to_tmp):
assert status["message"] == f"{err_msg_1}\n{err_msg_2}"


@pytest.mark.integration_test
@patch("everest.detached.server_is_running", return_value=False)
def test_wait_for_server(server_is_running_mock, caplog, monkeypatch):
def test_wait_for_server(server_is_running_mock, caplog):
config = EverestConfig.with_defaults()

with pytest.raises(RuntimeError, match=r"Failed to start .* timeout"):
with pytest.raises(
RuntimeError, match=r"Failed to get reply from server .* timeout"
):
wait_for_server(config.output_dir, timeout=1)

assert not caplog.messages
Expand Down
78 changes: 38 additions & 40 deletions tests/everest/test_everserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
from unittest.mock import patch

import pytest
import requests
from fastapi.encoders import jsonable_encoder
from fastapi.responses import JSONResponse, PlainTextResponse, Response
from fastapi.responses import JSONResponse, Response
from seba_sqlite.snapshot import SebaSnapshot

from ert.run_models.everest_run_model import EverestExitCode
Expand All @@ -28,6 +29,27 @@
)


async def wait_for_server_to_complete(config):
# Wait for the server to complete the optimization.
# There should be a @pytest.mark.timeout(x) for tests that call this function.
async def server_running():
while True:
event = await driver.event_queue.get()
if isinstance(event, FinishedEvent) and event.iens == 0:
return

driver = await start_server(config, debug=True)
try:
wait_for_server(config.output_dir, 120)
start_experiment(
server_context=ServerConfig.get_server_context(config.output_dir),
config=config,
)
except (SystemExit, RuntimeError) as e:
raise e
await server_running()


def configure_everserver_logger(*args, **kwargs):
"""Mock exception raised"""
raise Exception("Configuring logger failed")
Expand Down Expand Up @@ -125,7 +147,7 @@ def test_status_running_complete(
config_file = "config_minimal.yml"
config = EverestConfig.load_file(config_file)

def mocked_server(url, verify, auth, proxies):
def mocked_server(url, verify, auth, timeout, proxies):
if "/experiment_status" in url:
return Response(f"{EverestExitCode.COMPLETED}", 200)
if "/shared_data" in url:
Expand All @@ -138,7 +160,9 @@ def mocked_server(url, verify, auth, proxies):
)
)

return PlainTextResponse("Everest is running")
resp = requests.Response()
resp.status_code = 200
return resp

mocked_get.side_effect = mocked_server

Expand All @@ -159,7 +183,7 @@ def test_status_failed_job(mocked_get, mocked_logger, copy_math_func_test_data_t
config_file = "config_minimal.yml"
config = EverestConfig.load_file(config_file)

def mocked_server(url, verify, auth, proxies):
def mocked_server(url, verify, auth, timeout, proxies):
if "/experiment_status" in url:
return Response(f"{EverestExitCode.TOO_FEW_REALIZATIONS}", 200)

Expand Down Expand Up @@ -200,7 +224,9 @@ def mocked_server(url, verify, auth, proxies):
}
)
)
return PlainTextResponse("Everest is running")
resp = requests.Response()
resp.status_code = 200
return resp

mocked_get.side_effect = mocked_server

Expand All @@ -225,7 +251,7 @@ def test_status_exception(mocked_get, mocked_logger, copy_math_func_test_data_to
config_file = "config_minimal.yml"
config = EverestConfig.load_file(config_file)

def mocked_server(url, verify, auth, proxies):
def mocked_server(url, verify, auth, timeout, proxies):
if "/experiment_status" in url:
return Response(f"{EverestExitCode.EXCEPTION}", 200)

Expand All @@ -241,7 +267,10 @@ def mocked_server(url, verify, auth, proxies):
}
)
)
return PlainTextResponse("Everest is running")

resp = requests.Response()
resp.status_code = 200
return resp

mocked_get.side_effect = mocked_server

Expand All @@ -267,22 +296,7 @@ async def test_status_max_batch_num(copy_math_func_test_data_to_tmp):
)
config.dump("config_minimal.yml")

async def server_running():
while True:
event = await driver.event_queue.get()
if isinstance(event, FinishedEvent) and event.iens == 0:
return

driver = await start_server(config, debug=True)
try:
wait_for_server(config.output_dir, 120)
start_experiment(
server_context=ServerConfig.get_server_context(config.output_dir),
config=config,
)
except (SystemExit, RuntimeError) as e:
raise e
await server_running()
await wait_for_server_to_complete(config)

status = everserver_status(
ServerConfig.get_everserver_status_path(config.output_dir)
Expand Down Expand Up @@ -317,23 +331,7 @@ async def test_status_contains_max_runtime_failure(

config = EverestConfig.load_file(config_file)

async def server_running():
while True:
event = await driver.event_queue.get()
if isinstance(event, FinishedEvent) and event.iens == 0:
return

driver = await start_server(config, debug=True)
try:
wait_for_server(config.output_dir, 120)

start_experiment(
server_context=ServerConfig.get_server_context(config.output_dir),
config=config,
)
except (SystemExit, RuntimeError) as e:
raise e
await server_running()
await wait_for_server_to_complete(config)

status = everserver_status(
ServerConfig.get_everserver_status_path(config.output_dir)
Expand Down
Loading

0 comments on commit 8334307

Please sign in to comment.