From 6b40904cbca412dbfd4c6fd4994b5d175b0dcb1e Mon Sep 17 00:00:00 2001 From: Jon Holba Date: Thu, 6 Feb 2025 09:52:14 +0100 Subject: [PATCH] Let zmq select ports itself zmq will select a port each time ensemble evaluator starts. This way there will be no time where other programs can pick up the target port before zmq does. Removed large parts of net_utils and its tests that were no longer needed. --- src/ert/cli/main.py | 5 +- src/ert/ensemble_evaluator/_ensemble.py | 4 +- src/ert/ensemble_evaluator/config.py | 53 ++- src/ert/ensemble_evaluator/evaluator.py | 13 +- .../evaluator_connection_info.py | 9 - src/ert/ensemble_evaluator/monitor.py | 14 +- src/ert/gui/simulation/run_dialog.py | 7 +- src/ert/run_models/base_run_model.py | 2 +- src/ert/services/_storage_main.py | 8 +- src/ert/shared/net_utils.py | 61 +-- src/everest/detached/jobs/everserver.py | 2 +- .../unit_tests/ensemble_evaluator/conftest.py | 2 +- .../test_ensemble_evaluator.py | 65 +-- .../test_ensemble_evaluator_config.py | 18 +- .../test_ensemble_legacy.py | 18 +- .../ensemble_evaluator/test_monitor.py | 36 +- .../ensemble_evaluator/test_scheduler.py | 8 +- .../unit_tests/shared/test_port_handler.py | 417 ++---------------- tests/ert/unit_tests/test_tracking.py | 18 +- tests/everest/test_simulator_cache.py | 8 +- 20 files changed, 170 insertions(+), 598 deletions(-) delete mode 100644 src/ert/ensemble_evaluator/evaluator_connection_info.py diff --git a/src/ert/cli/main.py b/src/ert/cli/main.py index 4568681c636..2a888e34e6e 100644 --- a/src/ert/cli/main.py +++ b/src/ert/cli/main.py @@ -104,7 +104,10 @@ def run_cli(args: Namespace, plugin_manager: ErtPluginManager | None = None) -> use_ipc_protocol = model.queue_system == QueueSystem.LOCAL evaluator_server_config = EvaluatorServerConfig( - custom_port_range=args.port_range, use_ipc_protocol=use_ipc_protocol + port_range=None + if args.port_range is None + else (min(args.port_range), max(args.port_range) + 1), + use_ipc_protocol=use_ipc_protocol, ) if model.check_if_runpath_exists(): diff --git a/src/ert/ensemble_evaluator/_ensemble.py b/src/ert/ensemble_evaluator/_ensemble.py index 07fc0a7f876..a2979b24578 100644 --- a/src/ert/ensemble_evaluator/_ensemble.py +++ b/src/ert/ensemble_evaluator/_ensemble.py @@ -218,7 +218,7 @@ async def evaluate( ce_unary_send_method_name, partialmethod( self.__class__.send_event, - self._config.get_connection_info().router_uri, + self._config.get_uri(), token=self._config.token, ), ) @@ -267,7 +267,7 @@ async def _evaluate_inner( # pylint: disable=too-many-branches max_running=self._queue_config.max_running, submit_sleep=self._queue_config.submit_sleep, ens_id=self.id_, - ee_uri=self._config.get_connection_info().router_uri, + ee_uri=self._config.get_uri(), ee_token=self._config.token, ) logger.info( diff --git a/src/ert/ensemble_evaluator/config.py b/src/ert/ensemble_evaluator/config.py index 882824889f1..2721503deeb 100644 --- a/src/ert/ensemble_evaluator/config.py +++ b/src/ert/ensemble_evaluator/config.py @@ -1,14 +1,11 @@ import logging -import socket import uuid import warnings import zmq -from ert.shared import find_available_socket from ert.shared import get_machine_name as ert_shared_get_machine_name - -from .evaluator_connection_info import EvaluatorConnectionInfo +from ert.shared.net_utils import get_ip_address logger = logging.getLogger(__name__) @@ -25,39 +22,41 @@ def get_machine_name() -> str: class EvaluatorServerConfig: def __init__( self, - custom_port_range: range | None = None, + port_range: tuple[int, int] | None = None, use_token: bool = True, - custom_host: str | None = None, + host: str | None = None, use_ipc_protocol: bool = True, ) -> None: - self.host: str | None = None + self.host: str | None = host self.router_port: int | None = None - self.url = f"ipc:///tmp/socket-{uuid.uuid4().hex[:8]}" self.token: str | None = None - self._socket_handle: socket.socket | None = None - self.server_public_key: bytes | None = None self.server_secret_key: bytes | None = None - if not use_ipc_protocol: - self._socket_handle = find_available_socket( - custom_range=custom_port_range, - custom_host=custom_host, - will_close_then_reopen_socket=True, - ) - self.host, self.router_port = self._socket_handle.getsockname() - self.url = f"tcp://{self.host}:{self.router_port}" + self.use_ipc_protocol: bool = use_ipc_protocol + + if port_range is None: + port_range = (51820, 51840 + 1) + else: + if port_range[0] > port_range[1]: + raise ValueError("Minimum port in range is higher than maximum port") + + if port_range[0] == port_range[1]: + port_range = (port_range[0], port_range[0] + 1) + + self.min_port = port_range[0] + self.max_port = port_range[1] + + if use_ipc_protocol: + self.uri = f"ipc:///tmp/socket-{uuid.uuid4().hex[:8]}" + elif self.host is None: + self.host = get_ip_address() if use_token: self.server_public_key, self.server_secret_key = zmq.curve_keypair() self.token = self.server_public_key.decode("utf-8") - def get_socket(self) -> socket.socket | None: - if self._socket_handle: - return self._socket_handle.dup() - return None + def get_uri(self) -> str: + if not self.use_ipc_protocol: + return f"tcp://{self.host}:{self.router_port}" - def get_connection_info(self) -> EvaluatorConnectionInfo: - return EvaluatorConnectionInfo( - self.url, - self.token, - ) + return self.uri diff --git a/src/ert/ensemble_evaluator/evaluator.py b/src/ert/ensemble_evaluator/evaluator.py index 317957f7dca..c3cefe22b4c 100644 --- a/src/ert/ensemble_evaluator/evaluator.py +++ b/src/ert/ensemble_evaluator/evaluator.py @@ -305,12 +305,17 @@ async def _server(self) -> None: self._router_socket.curve_publickey = self._config.server_public_key self._router_socket.curve_server = True - if self._config.router_port: - self._router_socket.bind(f"tcp://*:{self._config.router_port}") + if self._config.use_ipc_protocol: + self._router_socket.bind(self._config.get_uri()) else: - self._router_socket.bind(self._config.url) + self._config.router_port = self._router_socket.bind_to_random_port( + "tcp://*", + min_port=self._config.min_port, + max_port=self._config.max_port, + ) + self._server_started.set_result(None) - except zmq.error.ZMQError as e: + except zmq.error.ZMQBaseError as e: logger.error(f"ZMQ error encountered {e} during evaluator initialization") self._server_started.set_exception(e) zmq_context.destroy(linger=0) diff --git a/src/ert/ensemble_evaluator/evaluator_connection_info.py b/src/ert/ensemble_evaluator/evaluator_connection_info.py deleted file mode 100644 index ac8ec35ef0c..00000000000 --- a/src/ert/ensemble_evaluator/evaluator_connection_info.py +++ /dev/null @@ -1,9 +0,0 @@ -from dataclasses import dataclass - - -@dataclass -class EvaluatorConnectionInfo: - """Read only server-info""" - - router_uri: str - token: str | None = None diff --git a/src/ert/ensemble_evaluator/monitor.py b/src/ert/ensemble_evaluator/monitor.py index 8b95af490df..addb74f00a4 100644 --- a/src/ert/ensemble_evaluator/monitor.py +++ b/src/ert/ensemble_evaluator/monitor.py @@ -4,7 +4,7 @@ import logging import uuid from collections.abc import AsyncGenerator -from typing import TYPE_CHECKING, Final +from typing import Final from _ert.events import ( EETerminated, @@ -16,10 +16,6 @@ ) from _ert.forward_model_runner.client import Client -if TYPE_CHECKING: - from ert.ensemble_evaluator.evaluator_connection_info import EvaluatorConnectionInfo - - logger = logging.getLogger(__name__) @@ -30,15 +26,11 @@ class EventSentinel: class Monitor(Client): _sentinel: Final = EventSentinel() - def __init__(self, ee_con_info: EvaluatorConnectionInfo) -> None: + def __init__(self, uri: str, token: str | None = None) -> None: self._id = str(uuid.uuid1()).split("-", maxsplit=1)[0] self._event_queue: asyncio.Queue[Event | EventSentinel] = asyncio.Queue() self._receiver_timeout: float = 60.0 - super().__init__( - ee_con_info.router_uri, - ee_con_info.token, - dealer_name=f"client-{self._id}", - ) + super().__init__(uri, token, dealer_name=f"client-{self._id}") async def process_message(self, msg: str) -> None: event = event_from_json(msg) diff --git a/src/ert/gui/simulation/run_dialog.py b/src/ert/gui/simulation/run_dialog.py index aa73db5dca3..79b868207f6 100644 --- a/src/ert/gui/simulation/run_dialog.py +++ b/src/ert/gui/simulation/run_dialog.py @@ -348,13 +348,8 @@ def run_experiment(self, restart: bool = False) -> None: self._snapshot_model.reset() self._tab_widget.clear() - port_range = None - use_ipc_protocol = False - if self._run_model.queue_system == QueueSystem.LOCAL: - port_range = range(49152, 51819) - use_ipc_protocol = True evaluator_server_config = EvaluatorServerConfig( - custom_port_range=port_range, use_ipc_protocol=use_ipc_protocol + use_ipc_protocol=self._run_model.queue_system == QueueSystem.LOCAL ) def run() -> None: diff --git a/src/ert/run_models/base_run_model.py b/src/ert/run_models/base_run_model.py index 0bba4d2c818..6389e091912 100644 --- a/src/ert/run_models/base_run_model.py +++ b/src/ert/run_models/base_run_model.py @@ -503,7 +503,7 @@ async def run_monitor( ) -> bool: try: logger.debug("connecting to new monitor...") - async with Monitor(ee_config.get_connection_info()) as monitor: + async with Monitor(ee_config.get_uri(), ee_config.token) as monitor: logger.debug("connected") async for event in monitor.track(heartbeat_interval=0.1): if type(event) in { diff --git a/src/ert/services/_storage_main.py b/src/ert/services/_storage_main.py index 45e86a96112..0298faf3054 100644 --- a/src/ert/services/_storage_main.py +++ b/src/ert/services/_storage_main.py @@ -95,9 +95,7 @@ def run_server( if args is None: args = parse_args() - if "ERT_STORAGE_TOKEN" in os.environ: - authtoken = os.environ["ERT_STORAGE_TOKEN"] - else: + if (authtoken := os.environ.get("ERT_STORAGE_TOKEN")) is None: authtoken = generate_authtoken() os.environ["ERT_STORAGE_TOKEN"] = authtoken @@ -106,9 +104,7 @@ def run_server( config_args.update(reload=True, reload_dirs=[os.path.dirname(ert_shared_path)]) os.environ["ERT_STORAGE_DEBUG"] = "1" - sock = find_available_socket( - custom_host=args.host, custom_range=range(51850, 51870) - ) + sock = find_available_socket(host=args.host, port_range=range(51850, 51870 + 1)) connection_info = _create_connection_info(sock, authtoken) # Appropriated from uvicorn.main:run diff --git a/src/ert/shared/net_utils.py b/src/ert/shared/net_utils.py index cbd321a79f1..f86ff80e742 100644 --- a/src/ert/shared/net_utils.py +++ b/src/ert/shared/net_utils.py @@ -46,9 +46,8 @@ def get_machine_name() -> str: def find_available_socket( - custom_host: str | None = None, - custom_range: range | None = None, - will_close_then_reopen_socket: bool = False, + host: str | None = None, + port_range: range = range(51820, 51840 + 1), ) -> socket.socket: """ The default and recommended approach here is to return a bound socket to the @@ -70,15 +69,12 @@ def find_available_socket( See e.g. implementation and comments in EvaluatorServerConfig """ - current_host = custom_host if custom_host is not None else _get_ip_address() - current_range = ( - custom_range if custom_range is not None else range(51820, 51840 + 1) - ) + current_host = host if host is not None else get_ip_address() - if current_range.start == current_range.stop: - ports = list(range(current_range.start, current_range.stop + 1)) + if port_range.start == port_range.stop: + ports = list(range(port_range.start, port_range.stop + 1)) else: - ports = list(range(current_range.start, current_range.stop)) + ports = list(range(port_range.start, port_range.stop)) random.shuffle(ports) for port in ports: @@ -86,33 +82,17 @@ def find_available_socket( return _bind_socket( host=current_host, port=port, - will_close_then_reopen_socket=will_close_then_reopen_socket, ) except PortAlreadyInUseException: continue - raise NoPortsInRangeException(f"No available ports in range {current_range}.") + raise NoPortsInRangeException(f"No available ports in range {port_range}.") -def _bind_socket( - host: str, port: int, will_close_then_reopen_socket: bool = False -) -> socket.socket: +def _bind_socket(host: str, port: int) -> socket.socket: try: family = get_family(host=host) sock = socket.socket(family=family, type=socket.SOCK_STREAM) - - # Setting flags like SO_REUSEADDR and/or SO_REUSEPORT may have - # undesirable side-effects but we allow it if caller insists. Refer to - # comment on find_available_socket() - # - # See e.g. https://stackoverflow.com/a/14388707 for an extensive - # explanation of these flags, in particular the part about TIME_WAIT - - if will_close_then_reopen_socket: - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - else: - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 0) - sock.bind((host, port)) return sock except socket.gaierror as err_info: @@ -139,18 +119,19 @@ def get_family(host: str) -> socket.AddressFamily: # See https://stackoverflow.com/a/28950776 -def _get_ip_address() -> str: +def get_ip_address() -> str: try: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - s.settimeout(0) - # try pinging a reserved, internal address in order - # to determine IP representing the default route - s.connect(("10.255.255.255", 1)) - retval = s.getsockname()[0] + try: + s.settimeout(0) + # try pinging a reserved, internal address in order + # to determine IP representing the default route + s.connect(("10.255.255.255", 1)) + address = s.getsockname()[0] + finally: + s.close() except BaseException: - logger.warning("Cannot determine ip-address. Fallback to localhost...") - retval = "127.0.0.1" - finally: - s.close() - logger.debug(f"ip-address: {retval}") - return retval + logger.warning("Cannot determine ip-address. Falling back to localhost.") + address = "127.0.0.1" + logger.debug(f"ip-address: {address}") + return address diff --git a/src/everest/detached/jobs/everserver.py b/src/everest/detached/jobs/everserver.py index 64bef7b14ca..49e68c03784 100755 --- a/src/everest/detached/jobs/everserver.py +++ b/src/everest/detached/jobs/everserver.py @@ -85,7 +85,7 @@ def run(self): evaluator_server_config = EvaluatorServerConfig() else: evaluator_server_config = EvaluatorServerConfig( - custom_port_range=range(49152, 51819), use_ipc_protocol=False + port_range=(49152, 51819), use_ipc_protocol=False ) try: diff --git a/tests/ert/unit_tests/ensemble_evaluator/conftest.py b/tests/ert/unit_tests/ensemble_evaluator/conftest.py index b677c95a44d..b0115eb5cd8 100644 --- a/tests/ert/unit_tests/ensemble_evaluator/conftest.py +++ b/tests/ert/unit_tests/ensemble_evaluator/conftest.py @@ -170,6 +170,6 @@ def _dump_forward_model(forward_model, index): @pytest.fixture(name="make_ee_config") def make_ee_config_fixture(): def _ee_config(**kwargs): - return EvaluatorServerConfig(custom_port_range=range(1024, 65535), **kwargs) + return EvaluatorServerConfig(**kwargs) return _ee_config diff --git a/tests/ert/unit_tests/ensemble_evaluator/test_ensemble_evaluator.py b/tests/ert/unit_tests/ensemble_evaluator/test_ensemble_evaluator.py index 8e02ab70558..b024521fe9e 100644 --- a/tests/ert/unit_tests/ensemble_evaluator/test_ensemble_evaluator.py +++ b/tests/ert/unit_tests/ensemble_evaluator/test_ensemble_evaluator.py @@ -79,9 +79,7 @@ async def mock_failure(message, *args, **kwargs): await evaluator.run_and_get_successful_realizations() -async def test_evaluator_raises_on_invalid_dispatch_event( - make_ee_config, -): +async def test_evaluator_raises_on_invalid_dispatch_event(make_ee_config): evaluator = EnsembleEvaluator(TestEnsemble(0, 2, 2, id_="0"), make_ee_config()) with pytest.raises(ValidationError): @@ -107,9 +105,17 @@ async def test_evaluator_raises_on_start_with_address_in_use(make_ee_config): ctx = zmq.asyncio.Context() socket = ctx.socket(zmq.ROUTER) try: - socket.bind(f"tcp://*:{ee_config.router_port}") + ee_config.router_port = socket.bind_to_random_port( + "tcp://*", + min_port=ee_config.min_port, + max_port=ee_config.max_port, + ) + ee_config.min_port = ee_config.router_port + ee_config.max_port = ee_config.router_port + 1 evaluator = EnsembleEvaluator(TestEnsemble(0, 2, 2, id_="0"), ee_config) - with pytest.raises(zmq.error.ZMQError, match="Address already in use"): + with pytest.raises( + zmq.error.ZMQBindError, match="Could not bind socket to random port" + ): await evaluator.run_and_get_successful_realizations() finally: socket.close() @@ -156,7 +162,7 @@ async def test_new_connections_are_no_problem_when_evaluator_is_closing_down( async def new_connection(): await evaluator._server_done.wait() - async with Monitor(evaluator._config.get_connection_info()): + async with Monitor(evaluator._config.get_uri()): pass new_connection_task = asyncio.create_task(new_connection()) @@ -182,10 +188,9 @@ async def evaluator_to_use_fixture(make_ee_config): async def test_restarted_jobs_do_not_have_error_msgs(evaluator_to_use): evaluator = evaluator_to_use token = evaluator._config.token - url = evaluator._config.get_connection_info().router_uri + url = evaluator._config.get_uri() - config_info = evaluator._config.get_connection_info() - async with Monitor(config_info) as monitor: + async with Monitor(url, token) as monitor: # first snapshot before any event occurs events = monitor.track() snapshot_event = await anext(events) @@ -243,7 +248,7 @@ def is_completed_snapshot(snapshot: EnsembleSnapshot) -> bool: await dispatch.send(event_to_json(event)) # reconnect new monitor - async with Monitor(config_info) as new_monitor: + async with Monitor(url, token) as new_monitor: def check_if_final_snapshot_is_complete(snapshot: EnsembleSnapshot) -> bool: try: @@ -268,12 +273,10 @@ def check_if_final_snapshot_is_complete(snapshot: EnsembleSnapshot) -> bool: @pytest.mark.timeout(20) async def test_new_monitor_can_pick_up_where_we_left_off(evaluator_to_use): evaluator = evaluator_to_use - token = evaluator._config.token - url = evaluator._config.get_connection_info().router_uri + url = evaluator._config.get_uri() - config_info = evaluator._config.get_connection_info() - async with Monitor(config_info) as monitor: + async with Monitor(url, token) as monitor: async with ( Client( url, @@ -375,7 +378,7 @@ def check_if_final_snapshot_is_complete(final_snapshot: EnsembleSnapshot) -> boo return False # reconnect new monitor - async with Monitor(config_info) as new_monitor: + async with Monitor(url, token) as new_monitor: final_snapshot = EnsembleSnapshot() async for event in new_monitor.track(): final_snapshot = final_snapshot.update_from_event(event) @@ -387,7 +390,8 @@ def check_if_final_snapshot_is_complete(final_snapshot: EnsembleSnapshot) -> boo @pytest.mark.integration_test async def test_monitor_receive_heartbeats(evaluator_to_use): evaluator = evaluator_to_use - conn_info = evaluator._config.get_connection_info() + token = evaluator._config.token + url = evaluator._config.get_uri() received_heartbeats = 0 async def mock_receiver(self): @@ -400,7 +404,7 @@ async def mock_receiver(self): received_heartbeats += 1 with patch.object(Monitor, "_receiver", mock_receiver): - async with Monitor(conn_info) as monitor: + async with Monitor(url, token) as monitor: await asyncio.sleep(0.5) await monitor.signal_done() # in 0.5 second we should receive at least 2 heartbeats @@ -415,12 +419,11 @@ async def test_dispatch_endpoint_clients_can_connect_and_monitor_can_shut_down_e evaluator._batching_interval = 0.1 evaluator._max_batch_size = 4 - conn_info = evaluator._config.get_connection_info() - async with Monitor(conn_info) as monitor: + token = evaluator._config.token + url = evaluator._config.get_uri() + async with Monitor(url, token) as monitor: events = monitor.track() - token = evaluator._config.token - url = conn_info.router_uri # first snapshot before any event occurs snapshot_event = await anext(events) assert type(snapshot_event) is EESnapshot @@ -495,7 +498,7 @@ def is_completed_snapshot(snapshot): if is_completed_snapshot(snapshot): break # a second monitor connects - async with Monitor(evaluator._config.get_connection_info()) as monitor2: + async with Monitor(url, token) as monitor2: events2 = monitor2.track() full_snapshot_event = await anext(events2) event = cast(EESnapshot, full_snapshot_event) @@ -533,12 +536,11 @@ def is_completed_snapshot(snapshot): async def test_ensure_multi_level_events_in_order(evaluator_to_use): evaluator = evaluator_to_use - config_info = evaluator._config.get_connection_info() - async with Monitor(config_info) as monitor: - events = monitor.track() + token = evaluator._config.token + url = evaluator._config.get_uri() - token = evaluator._config.token - url = config_info.router_uri + async with Monitor(url, token) as monitor: + events = monitor.track() snapshot_event = await anext(events) assert type(snapshot_event) is EESnapshot @@ -612,12 +614,11 @@ def test_overspent_cpu_is_logged( async def test_snapshot_on_resubmit_is_cleared(evaluator_to_use): evaluator = evaluator_to_use evaluator._batching_interval = 0.4 - config_info = evaluator._config.get_connection_info() - async with Monitor(config_info) as monitor: - events = monitor.track() + token = evaluator._config.token + url = evaluator._config.get_uri() - token = evaluator._config.token - url = config_info.router_uri + async with Monitor(url, token) as monitor: + events = monitor.track() snapshot_event = await anext(events) assert type(snapshot_event) is EESnapshot diff --git a/tests/ert/unit_tests/ensemble_evaluator/test_ensemble_evaluator_config.py b/tests/ert/unit_tests/ensemble_evaluator/test_ensemble_evaluator_config.py index 2d8f2189fe7..3b83b51d2ad 100644 --- a/tests/ert/unit_tests/ensemble_evaluator/test_ensemble_evaluator_config.py +++ b/tests/ert/unit_tests/ensemble_evaluator/test_ensemble_evaluator_config.py @@ -4,34 +4,30 @@ def test_ensemble_evaluator_config_tcp_protocol(unused_tcp_port): - fixed_port = range(unused_tcp_port, unused_tcp_port) + fixed_port = (unused_tcp_port, unused_tcp_port + 1) serv_config = EvaluatorServerConfig( - custom_port_range=fixed_port, - custom_host="127.0.0.1", + port_range=fixed_port, + host="127.0.0.1", use_ipc_protocol=False, ) + serv_config.router_port = unused_tcp_port expected_host = "127.0.0.1" expected_port = unused_tcp_port expected_url = f"tcp://{expected_host}:{expected_port}" - url = urlparse(serv_config.url) + url = urlparse(serv_config.get_uri()) assert url.hostname == expected_host assert url.port == expected_port - assert serv_config.url == expected_url + assert serv_config.get_uri() == expected_url assert serv_config.token is not None assert serv_config.server_public_key is not None assert serv_config.server_secret_key is not None - sock = serv_config.get_socket() - assert sock is not None - assert not sock._closed - sock.close() def test_ensemble_evaluator_config_ipc_protocol(): serv_config = EvaluatorServerConfig(use_ipc_protocol=True, use_token=False) - assert serv_config.url.startswith("ipc:///tmp/socket-") + assert serv_config.get_uri().startswith("ipc:///tmp/socket-") assert serv_config.token is None assert serv_config.server_public_key is None assert serv_config.server_secret_key is None - assert serv_config.get_socket() is None diff --git a/tests/ert/unit_tests/ensemble_evaluator/test_ensemble_legacy.py b/tests/ert/unit_tests/ensemble_evaluator/test_ensemble_legacy.py index 7a2ee7291cf..0c5a000f260 100644 --- a/tests/ert/unit_tests/ensemble_evaluator/test_ensemble_legacy.py +++ b/tests/ert/unit_tests/ensemble_evaluator/test_ensemble_legacy.py @@ -35,17 +35,12 @@ async def test_run_legacy_ensemble( tmpdir, make_ensemble, monkeypatch, evaluator_to_use ): num_reals = 2 - custom_port_range = range(1024, 65535) with tmpdir.as_cwd(): ensemble = make_ensemble(monkeypatch, tmpdir, num_reals, 2) - config = EvaluatorServerConfig( - custom_port_range=custom_port_range, - custom_host="127.0.0.1", - use_token=False, - ) + config = EvaluatorServerConfig(use_token=False) async with ( evaluator_to_use(ensemble, config) as evaluator, - Monitor(config.get_connection_info()) as monitor, + Monitor(config.get_uri(), config.token) as monitor, ): async for event in monitor.track(): if type(event) in { @@ -70,20 +65,15 @@ async def test_run_and_cancel_legacy_ensemble( tmpdir, make_ensemble, monkeypatch, evaluator_to_use ): num_reals = 2 - custom_port_range = range(1024, 65535) with tmpdir.as_cwd(): ensemble = make_ensemble(monkeypatch, tmpdir, num_reals, 2, job_sleep=40) - config = EvaluatorServerConfig( - custom_port_range=custom_port_range, - custom_host="127.0.0.1", - use_token=False, - ) + config = EvaluatorServerConfig(use_token=False) terminated_event = False async with ( evaluator_to_use(ensemble, config) as evaluator, - Monitor(config.get_connection_info()) as monitor, + Monitor(config.get_uri(), config.token) as monitor, ): # on lesser hardware the realizations might be killed by max_runtime # and the ensemble is set to STOPPED diff --git a/tests/ert/unit_tests/ensemble_evaluator/test_monitor.py b/tests/ert/unit_tests/ensemble_evaluator/test_monitor.py index 287774ee837..0af4c688bcb 100644 --- a/tests/ert/unit_tests/ensemble_evaluator/test_monitor.py +++ b/tests/ert/unit_tests/ensemble_evaluator/test_monitor.py @@ -14,7 +14,10 @@ ClientConnectionError, ) from ert.ensemble_evaluator import Monitor -from ert.ensemble_evaluator.config import EvaluatorConnectionInfo + + +def localhost_uri(port: int): + return f"tcp://127.0.0.1:{port}" async def async_zmq_server(port, handler, secret_key: bytes | None = None): @@ -32,8 +35,7 @@ async def async_zmq_server(port, handler, secret_key: bytes | None = None): async def test_monitor_connects_and_disconnects_successfully(unused_tcp_port): - ee_con_info = EvaluatorConnectionInfo(f"tcp://127.0.0.1:{unused_tcp_port}") - monitor = Monitor(ee_con_info) + monitor = Monitor(localhost_uri(unused_tcp_port)) messages = [] @@ -63,7 +65,7 @@ async def mock_event_handler(router_socket): async def test_no_connection_established(monkeypatch, make_ee_config): ee_config = make_ee_config() monkeypatch.setattr(Monitor, "DEFAULT_MAX_RETRIES", 0) - monitor = Monitor(ee_config.get_connection_info()) + monitor = Monitor(ee_config.get_uri()) monitor._ack_timeout = 0.1 with pytest.raises(ClientConnectionError): async with monitor: @@ -71,8 +73,6 @@ async def test_no_connection_established(monkeypatch, make_ee_config): async def test_immediate_stop(unused_tcp_port): - ee_con_info = EvaluatorConnectionInfo(f"tcp://127.0.0.1:{unused_tcp_port}") - connected = False async def mock_event_handler(router_socket): @@ -94,7 +94,7 @@ async def mock_event_handler(router_socket): websocket_server_task = asyncio.create_task( async_zmq_server(unused_tcp_port, mock_event_handler) ) - async with Monitor(ee_con_info) as monitor: + async with Monitor(localhost_uri(unused_tcp_port)) as monitor: assert connected is True await monitor.signal_done() await websocket_server_task @@ -105,8 +105,6 @@ async def mock_event_handler(router_socket): async def test_unexpected_close_after_connection_successful( monkeypatch, unused_tcp_port ): - ee_con_info = EvaluatorConnectionInfo(f"tcp://127.0.0.1:{unused_tcp_port}") - monkeypatch.setattr(Monitor, "DEFAULT_MAX_RETRIES", 0) monkeypatch.setattr(Monitor, "DEFAULT_ACK_TIMEOUT", 0.5) @@ -121,7 +119,7 @@ async def mock_event_handler(router_socket): websocket_server_task = asyncio.create_task( async_zmq_server(unused_tcp_port, mock_event_handler) ) - async with Monitor(ee_con_info) as monitor: + async with Monitor(localhost_uri(unused_tcp_port)) as monitor: with pytest.raises(ClientConnectionError): await monitor.signal_done() @@ -139,10 +137,8 @@ async def test_that_monitor_cannot_connect_with_wrong_server_key( correct_server_key, monkeypatch, unused_tcp_port ): public_key, secret_key = zmq.curve_keypair() - ee_con_info = EvaluatorConnectionInfo( - f"tcp://127.0.0.1:{unused_tcp_port}", - public_key.decode("utf-8") if correct_server_key else None, - ) + uri = localhost_uri(unused_tcp_port) + token = public_key.decode("utf-8") if correct_server_key else None monkeypatch.setattr(Monitor, "DEFAULT_MAX_RETRIES", 0) monkeypatch.setattr(Monitor, "DEFAULT_ACK_TIMEOUT", 0.5) @@ -164,12 +160,12 @@ async def mock_event_handler(router_socket): async_zmq_server(unused_tcp_port, mock_event_handler, secret_key=secret_key) ) if correct_server_key: - async with Monitor(ee_con_info): + async with Monitor(uri, token): assert connected assert connected is False else: with pytest.raises(ClientConnectionError): - async with Monitor(ee_con_info): + async with Monitor(uri, token): pass assert connected is False websocket_server_task.cancel() @@ -181,7 +177,7 @@ async def test_that_monitor_track_can_exit_without_terminated_event_from_evaluat unused_tcp_port, caplog ): caplog.set_level(logging.ERROR) - ee_con_info = EvaluatorConnectionInfo(f"tcp://127.0.0.1:{unused_tcp_port}") + uri = localhost_uri(unused_tcp_port) connected = False @@ -204,7 +200,7 @@ async def mock_event_handler(router_socket): async_zmq_server(unused_tcp_port, mock_event_handler) ) - async with Monitor(ee_con_info) as monitor: + async with Monitor(uri) as monitor: monitor._receiver_timeout = 0.1 await monitor.signal_cancel() @@ -223,7 +219,7 @@ async def test_that_monitor_can_emit_heartbeats(unused_tcp_port): exit anytime. A heartbeat is a None event. If the heartbeat is never sent, this test function will hang and then timeout.""" - ee_con_info = EvaluatorConnectionInfo(f"tcp://127.0.0.1:{unused_tcp_port}") + uri = localhost_uri(unused_tcp_port) async def mock_event_handler(router_socket): while True: @@ -237,7 +233,7 @@ async def mock_event_handler(router_socket): async_zmq_server(unused_tcp_port, mock_event_handler) ) - async with Monitor(ee_con_info) as monitor: + async with Monitor(uri) as monitor: async for event in monitor.track(heartbeat_interval=0.001): if event is None: break diff --git a/tests/ert/unit_tests/ensemble_evaluator/test_scheduler.py b/tests/ert/unit_tests/ensemble_evaluator/test_scheduler.py index 83887bf3c04..70edac2547a 100644 --- a/tests/ert/unit_tests/ensemble_evaluator/test_scheduler.py +++ b/tests/ert/unit_tests/ensemble_evaluator/test_scheduler.py @@ -16,7 +16,7 @@ async def test_scheduler_receives_checksum_and_waits_for_disk_sync( tmpdir, make_ensemble, monkeypatch, caplog ): num_reals = 1 - custom_port_range = range(1024, 65535) + port_range = (1024, 65535) async def rename_and_wait(): Path("real_0/job_test_file").rename("real_0/test") @@ -25,7 +25,7 @@ async def rename_and_wait(): Path("real_0/test").rename("real_0/job_test_file") async def _run_monitor(): - async with Monitor(config.get_connection_info()) as monitor: + async with Monitor(config.get_uri(), config.token) as monitor: async for event in monitor.track(): if type(event) is ForwardModelStepChecksum: # Monitor got the checksum message renaming the file @@ -57,8 +57,8 @@ def create_manifest_file(): file_path.write_text("test") # actual_md5sum = hashlib.md5(file_path.read_bytes()).hexdigest() config = EvaluatorServerConfig( - custom_port_range=custom_port_range, - custom_host="127.0.0.1", + port_range=port_range, + host="127.0.0.1", use_token=False, ) evaluator = EnsembleEvaluator(ensemble, config) diff --git a/tests/ert/unit_tests/shared/test_port_handler.py b/tests/ert/unit_tests/shared/test_port_handler.py index b06a41d861b..2a9bea6b8d8 100644 --- a/tests/ert/unit_tests/shared/test_port_handler.py +++ b/tests/ert/unit_tests/shared/test_port_handler.py @@ -1,6 +1,5 @@ import contextlib import socket -import sys import threading import pytest @@ -50,22 +49,22 @@ def test_that_get_machine_name_is_predictive(mocker): def test_find_available_socket(unused_tcp_port): - custom_range = range(unused_tcp_port, unused_tcp_port + 1) - sock = find_available_socket(custom_range=custom_range, custom_host="127.0.0.1") + port_range = range(unused_tcp_port, unused_tcp_port + 1) + sock = find_available_socket(port_range=port_range, host="127.0.0.1") ( host, port, ) = sock.getsockname() assert host is not None assert port is not None - assert port in custom_range + assert port in port_range assert sock is not None assert sock.fileno() != -1 def test_find_available_socket_forced(unused_tcp_port): - custom_range = range(unused_tcp_port, unused_tcp_port) - sock = find_available_socket(custom_range=custom_range, custom_host="127.0.0.1") + port_range = range(unused_tcp_port, unused_tcp_port) + sock = find_available_socket(port_range=port_range, host="127.0.0.1") ( _, port, @@ -79,7 +78,7 @@ def test_invalid_host_name(): invalid_host = "invalid_host" with pytest.raises(InvalidHostException) as exc_info: - find_available_socket(custom_host=invalid_host) + find_available_socket(host=invalid_host) assert ( "Trying to bind socket with what looks " @@ -99,31 +98,20 @@ def test_get_family(): def test_gc_closes_socket(unused_tcp_port): - custom_range = range(unused_tcp_port, unused_tcp_port + 1) + port_range = range(unused_tcp_port, unused_tcp_port + 1) - orig_sock = find_available_socket( - custom_range=custom_range, custom_host="127.0.0.1" - ) + orig_sock = find_available_socket(port_range=port_range, host="127.0.0.1") _, port = orig_sock.getsockname() assert port == unused_tcp_port assert orig_sock is not None assert orig_sock.fileno() != -1 with pytest.raises(NoPortsInRangeException): - find_available_socket(custom_range=custom_range, custom_host="127.0.0.1") - - with pytest.raises(NoPortsInRangeException): - find_available_socket( - custom_range=custom_range, - will_close_then_reopen_socket=True, - custom_host="127.0.0.1", - ) + find_available_socket(port_range=port_range, host="127.0.0.1") orig_sock = None - orig_sock = find_available_socket( - custom_range=custom_range, custom_host="127.0.0.1" - ) + orig_sock = find_available_socket(port_range=port_range, host="127.0.0.1") _, port = orig_sock.getsockname() assert port == unused_tcp_port assert orig_sock is not None @@ -161,94 +149,35 @@ def run(self): assert getattr(dummy_server, "data", None) == "Hi there" -# Tests below checks results of trying to get a new socket on an -# already used port over permutations of 3 (boolean) parameters: -# -# - mode when obtaining the first socket (default/reuse) -# - activity on original socket or whether it is never used -# - original socket live or closed -# -# The test-names encodes the permutation, the platform and finally -# whether subsequent calls to find_available_socket() succeeds with -# default-mode and/or reuse-mode. For example: -# -# test_def_active_close_macos_nok_ok -# -# means obtaining first socket in default-mode, activate it and -# then close it. On MacOS, trying to obtain it in default mode -# fails (nok) whereas obtaining it with reuse-flag succeeds (ok) -# -# -# Test identifier | mode | activated | live -# ------------------------------------------+-------+-----------+------ -# test_def_passive_live_nok_nok_close_ok_ok | def | false | both -# -# test_def_active_live_nok_nok | def | true | true -# test_def_active_close_macos_nok_ok | def | true | false -# test_def_active_close_linux_nok_nok | def | true | false -# -# test_reuse_passive_live_macos_nok_nok | reuse | false | true -# test_reuse_passive_live_linux_nok_ok | reuse | false | true -# test_reuse_passive_close_ok_ok | reuse | false | false -# test_reuse_active_live_nok_nok | reuse | true | true -# test_reuse_active_close_nok_ok | reuse | true | false -# -# -# Note the behaviour of the first test: The recommended practice -# is to obtain the port/socket in default mode, keep the socket -# alive as long as the port is needed and provide dup() of the -# socket-object to other modules. If the other module cannot use -# an already bound socket, close the UN-ACTIVATED socket, give -# the port-number to the module and hope that no-one else grabs -# the port in the meantime. :) -# -# If you (for whatever obscure reason) activated the socket (i.e. -# some communication happened on the socket) and THEN provides -# the port-number to another module, you're on the last test and -# have to use reuse-mode when obtaining the first socket, and pray -# that the other module set SO_REUSEADDR before attempting to bind -# its socket. - - -def test_def_passive_live_nok_nok_close_ok_ok(unused_tcp_port): +def test_socket_can_rebind_if_never_used(unused_tcp_port): """ Executive summary of this test - 1. the original socket is obtained in default, recommended mode + 1. the original socket is obtained 2. no activity is triggered on the socket 3. port is not closed but kept alive 4. port can not be re-bound in any mode while socket-object is live 5. port is closed 6. after socket is closed the port can immediately be re-bound in any mode """ - custom_range = range(unused_tcp_port, unused_tcp_port + 1) + port_range = range(unused_tcp_port, unused_tcp_port + 1) # Opening original socket with will_close_then_reopen_socket=False - orig_sock = find_available_socket( - custom_range=custom_range, custom_host="127.0.0.1" - ) + orig_sock = find_available_socket(port_range=port_range, host="127.0.0.1") _, port = orig_sock.getsockname() assert port == unused_tcp_port assert orig_sock is not None assert orig_sock.fileno() != -1 - # When the socket is kept open, this port can not be reused - # with or without setting will_close_then_reopen_socket + # When the socket is kept open with pytest.raises(NoPortsInRangeException): - find_available_socket(custom_range=custom_range, custom_host="127.0.0.1") - - with pytest.raises(NoPortsInRangeException): - find_available_socket( - custom_range=custom_range, - custom_host="127.0.0.1", - will_close_then_reopen_socket=True, - ) + find_available_socket(port_range=port_range, host="127.0.0.1") orig_sock.close() # When we close the socket without actually having used it, it is - # immediately reusable with or without setting will_close_then_reopen_socket - sock = find_available_socket(custom_range=custom_range, custom_host="127.0.0.1") + # immediately reusable + sock = find_available_socket(port_range=port_range, host="127.0.0.1") _, port = sock.getsockname() assert port == unused_tcp_port assert sock is not None @@ -257,112 +186,25 @@ def test_def_passive_live_nok_nok_close_ok_ok(unused_tcp_port): # we want to try again, so close it sock.close() - sock = find_available_socket( - custom_range=custom_range, - custom_host="127.0.0.1", - will_close_then_reopen_socket=True, - ) + sock = find_available_socket(port_range=port_range, host="127.0.0.1") _, port = sock.getsockname() assert port == unused_tcp_port assert sock is not None assert sock.fileno() != -1 -def test_reuse_active_close_nok_ok(unused_tcp_port): +def test_socket_can_not_rebind_if_open(unused_tcp_port): """ Executive summary of this test - 1. the original socket is obtained with will_close_then_reopen_socket=True - 2. activity is triggered on the socket using a dummy-server/client - 3. socket is closed - 4. port can not be re-bound in default mode (TIME_WAIT?)... - 5. ... but can with will_close_then_reopen_socket=True (ignoring TIME_WAIT) - """ - custom_range = range(unused_tcp_port, unused_tcp_port + 1) - - # Note: Setting will_close_then_reopen_socket=True on original socket - orig_sock = find_available_socket( - custom_range=custom_range, - custom_host="127.0.0.1", - will_close_then_reopen_socket=True, - ) - host, port = orig_sock.getsockname() - assert port == unused_tcp_port - assert orig_sock is not None - assert orig_sock.fileno() != -1 - - # Run a dummy-server to actually use the socket a little, then close it - _simulate_server(host, port, orig_sock) - orig_sock.close() - - # Using will_close_then_reopen_socket=False fails... - with pytest.raises(NoPortsInRangeException): - find_available_socket(custom_range=custom_range, custom_host="127.0.0.1") - - # ... but using will_close_then_reopen_socket=True succeeds - sock = find_available_socket( - custom_range=custom_range, - custom_host="127.0.0.1", - will_close_then_reopen_socket=True, - ) - host, port = sock.getsockname() - assert port == unused_tcp_port - assert sock is not None - assert sock.fileno() != -1 - - -def test_reuse_active_live_nok_nok(unused_tcp_port): - """ - Executive summary of this test - - 1. the original socket is obtained with will_close_then_reopen_socket=True + 1. the original socket is obtained 2. activity is triggered on the socket using a dummy-server/client 3. socket is not closed but kept alive - 4. port can not be re-bound in default mode (TIME_WAIT?)... - 5. ... but can with will_close_then_reopen_socket=True (ignoring TIME_WAIT) + 4. port can not be re-bound while socket-object is live """ - custom_range = range(unused_tcp_port, unused_tcp_port + 1) + port_range = range(unused_tcp_port, unused_tcp_port + 1) - orig_sock = find_available_socket( - custom_range=custom_range, - custom_host="127.0.0.1", - will_close_then_reopen_socket=True, - ) - host, port = orig_sock.getsockname() - assert port == unused_tcp_port - assert orig_sock is not None - assert orig_sock.fileno() != -1 - - # Run a dummy-server to actually use the socket a little, then close it - _simulate_server(host, port, orig_sock) - - # Even with "will_close_then_reopen_socket"=True when obtaining original - # socket, subsequent calls fails - with pytest.raises(NoPortsInRangeException): - find_available_socket(custom_range=custom_range, custom_host="127.0.0.1") - - with pytest.raises(NoPortsInRangeException): - find_available_socket( - custom_range=custom_range, - custom_host="127.0.0.1", - will_close_then_reopen_socket=True, - ) - - -def test_def_active_live_nok_nok(unused_tcp_port): - """ - Executive summary of this test - - 1. the original socket is obtained in default, recommended mode - 2. activity is triggered on the socket using a dummy-server/client - 3. socket is not closed but kept alive - 4. port can not be re-bound in any mode while socket-object is live - """ - custom_range = range(unused_tcp_port, unused_tcp_port + 1) - - orig_sock = find_available_socket( - custom_range=custom_range, custom_host="127.0.0.1" - ) + orig_sock = find_available_socket(port_range=port_range, host="127.0.0.1") host, port = orig_sock.getsockname() assert port == unused_tcp_port assert orig_sock is not None @@ -373,84 +215,22 @@ def test_def_active_live_nok_nok(unused_tcp_port): # Immediately trying to bind to the same port fails... with pytest.raises(NoPortsInRangeException): - find_available_socket(custom_range=custom_range, custom_host="127.0.0.1") - - # ... also using will_close_then_reopen_socket=True - with pytest.raises(NoPortsInRangeException): - find_available_socket( - custom_range=custom_range, - custom_host="127.0.0.1", - will_close_then_reopen_socket=True, - ) - - -@pytest.mark.integration_test -@pytest.mark.skipif( - not sys.platform.startswith("darwin"), reason="MacOS-specific socket behaviour" -) -def test_def_active_close_macos_nok_ok(unused_tcp_port): - """ - Executive summary of this test - - 1. the original socket is obtained in default, recommended mode - 2. activity is triggered on the socket using a dummy-server/client - 3. socket is closed - 4. after socket is closed the port can not be re-bound in - default mode (TIME_WAIT?)... - 5. ...but it can be re-bound with will_close_then_reopen_socket=True - (ignoring TIME_WAIT) - """ - custom_range = range(unused_tcp_port, unused_tcp_port + 1) - - orig_sock = find_available_socket( - custom_range=custom_range, custom_host="127.0.0.1" - ) - host, port = orig_sock.getsockname() - assert port == unused_tcp_port - assert orig_sock is not None - assert orig_sock.fileno() != -1 - - # Now, run a dummy-server to actually use the socket a little, then close it - _simulate_server(host, port, orig_sock) - orig_sock.close() - - # Immediately trying to bind to the same port fails - with pytest.raises(NoPortsInRangeException): - find_available_socket(custom_range=custom_range, custom_host="127.0.0.1") - - # On MacOS, setting will_close_then_reopen_socket=True in subsequent calls allows - # to reuse the port - sock = find_available_socket( - custom_range=custom_range, - custom_host="127.0.0.1", - will_close_then_reopen_socket=True, - ) - host, port = sock.getsockname() - assert port == unused_tcp_port - assert sock is not None - assert sock.fileno() != -1 + find_available_socket(port_range=port_range, host="127.0.0.1") @pytest.mark.integration_test -@pytest.mark.skipif( - not sys.platform.startswith("linux"), reason="Linux-specific socket behaviour" -) -def test_def_active_close_linux_nok_nok(unused_tcp_port): +def test_socket_can_not_rebind_immediately_after_close_if_used(unused_tcp_port): """ Executive summary of this test - 1. the original socket is obtained in default, recommended mode + 1. the original socket is obtained 2. activity is triggered on the socket using a dummy-server/client 3. socket is closed - 4. after socket is closed the port can not be re-bound in - default mode (TIME_WAIT?)... - 5. ...nor with will_close_then_reopen_socket=True (not ignoring TIME_WAIT?) + 4. after socket is closed the port can not be re-bound immediately due to TIME_WAIT """ - custom_range = range(unused_tcp_port, unused_tcp_port + 1) + port_range = range(unused_tcp_port, unused_tcp_port + 1) - orig_sock = find_available_socket( - custom_range=custom_range, custom_host="127.0.0.1" - ) + orig_sock = find_available_socket(port_range=port_range, host="127.0.0.1") host, port = orig_sock.getsockname() assert port == unused_tcp_port assert orig_sock is not None @@ -462,139 +242,4 @@ def test_def_active_close_linux_nok_nok(unused_tcp_port): # Immediately trying to bind to the same port fails with pytest.raises(NoPortsInRangeException): - find_available_socket(custom_range=custom_range, custom_host="127.0.0.1") - - # On Linux, setting will_close_then_reopen_socket=True in subsequent calls do - # NOT allow reusing the port in this case - with pytest.raises(NoPortsInRangeException): - find_available_socket( - custom_range=custom_range, - custom_host="127.0.0.1", - will_close_then_reopen_socket=True, - ) - - -@pytest.mark.integration_test -@pytest.mark.skipif( - not sys.platform.startswith("darwin"), reason="MacOS-specific socket behaviour" -) -def test_reuse_passive_live_macos_nok_nok(unused_tcp_port): - """ - Executive summary of this test - - 1. the original socket is obtained with will_close_then_reopen_socket=True - 2. no activity is triggered on the socket - 3. the socket is not closed but kept alive - 4. port can not be re-bound in any mode - """ - custom_range = range(unused_tcp_port, unused_tcp_port + 1) - - orig_sock = find_available_socket( - custom_range=custom_range, - custom_host="127.0.0.1", - will_close_then_reopen_socket=True, - ) - _, port = orig_sock.getsockname() - assert port == unused_tcp_port - assert orig_sock is not None - assert orig_sock.fileno() != -1 - - # As long as the socket is kept alive this port can not be bound again... - with pytest.raises(NoPortsInRangeException): - find_available_socket(custom_range=custom_range, custom_host="127.0.0.1") - - # ... not even when setting will_close_then_reopen_socket=True - with pytest.raises(NoPortsInRangeException): - find_available_socket( - custom_range=custom_range, - custom_host="127.0.0.1", - will_close_then_reopen_socket=True, - ) - - -@pytest.mark.skipif( - not sys.platform.startswith("linux"), reason="Linux-specific socket behaviour" -) -def test_reuse_passive_live_linux_nok_ok(unused_tcp_port): - """ - Executive summary of this test - - 1. the original socket is obtained with will_close_then_reopen_socket=True - 2. no activity is triggered on the socket - 3. the socket is not closed but kept alive - 4. port can not be re-bound in default mode... - 5. ... but can with will_close_then_reopen_socket=True - """ - custom_range = range(unused_tcp_port, unused_tcp_port + 1) - - # Opening original socket with will_close_then_reopen_socket=True - orig_sock = find_available_socket( - custom_range=custom_range, - custom_host="127.0.0.1", - will_close_then_reopen_socket=True, - ) - _, port = orig_sock.getsockname() - assert port == unused_tcp_port - assert orig_sock is not None - assert orig_sock.fileno() != -1 - - # As long as the socket is kept alive this port can not be bound again... - with pytest.raises(NoPortsInRangeException): - find_available_socket(custom_range=custom_range, custom_host="127.0.0.1") - - # ... but on Linux the port can be re-bound by setting this flag! - # This does not seem safe in a multi-user/-process environment! - sock = find_available_socket( - custom_range=custom_range, - custom_host="127.0.0.1", - will_close_then_reopen_socket=True, - ) - _, port = orig_sock.getsockname() - assert port == unused_tcp_port - assert sock is not None - assert sock.fileno() != -1 - - -def test_reuse_passive_close_ok_ok(unused_tcp_port): - """ - Executive summary of this test - - 1. the original socket is obtained with will_close_then_reopen_socket=True - 2. no activity is triggered on the socket - 3. the socket is closed - 4. port can be re-bound in any mode - """ - custom_range = range(unused_tcp_port, unused_tcp_port + 1) - - orig_sock = find_available_socket( - custom_range=custom_range, - custom_host="127.0.0.1", - will_close_then_reopen_socket=True, - ) - _, port = orig_sock.getsockname() - assert port == unused_tcp_port - assert orig_sock is not None - assert orig_sock.fileno() != -1 - - orig_sock.close() - - # When we close the socket without actually having used it, it is - # immediately reusable with or without setting will_close_then_reopen_socket - sock = find_available_socket(custom_range=custom_range, custom_host="127.0.0.1") - _, port = sock.getsockname() - assert port == unused_tcp_port - assert sock is not None - assert sock.fileno() != -1 - - # we want to try again, so close it - sock.close() - - sock = find_available_socket( - custom_range=custom_range, - custom_host="127.0.0.1", - will_close_then_reopen_socket=True, - ) - _, port = sock.getsockname() - assert port == unused_tcp_port - assert sock is not None - assert sock.fileno() != -1 + find_available_socket(port_range=port_range, host="127.0.0.1") diff --git a/tests/ert/unit_tests/test_tracking.py b/tests/ert/unit_tests/test_tracking.py index fb856eb0cb0..659cbbdba82 100644 --- a/tests/ert/unit_tests/test_tracking.py +++ b/tests/ert/unit_tests/test_tracking.py @@ -184,11 +184,7 @@ def test_tracking( queue, ) - evaluator_server_config = EvaluatorServerConfig( - custom_port_range=range(1024, 65535), - custom_host="127.0.0.1", - use_token=False, - ) + evaluator_server_config = EvaluatorServerConfig(use_token=False) thread = ErtThread( name="ert_cli_simulation_thread", @@ -274,11 +270,7 @@ def test_setting_env_context_during_run( ert_config = ErtConfig.from_file(parsed.config) os.chdir(ert_config.config_path) - evaluator_server_config = EvaluatorServerConfig( - custom_port_range=range(1024, 65535), - custom_host="127.0.0.1", - use_token=False, - ) + evaluator_server_config = EvaluatorServerConfig(use_token=False) queue = Events() model = create_model( ert_config, @@ -350,11 +342,7 @@ def test_run_information_present_as_env_var_in_fm_context( ert_config = ErtConfig.from_file(parsed.config) os.chdir(ert_config.config_path) - evaluator_server_config = EvaluatorServerConfig( - custom_port_range=range(1024, 65535), - custom_host="127.0.0.1", - use_token=False, - ) + evaluator_server_config = EvaluatorServerConfig(use_token=False) queue = Events() model = create_model(ert_config, storage, parsed, queue) diff --git a/tests/everest/test_simulator_cache.py b/tests/everest/test_simulator_cache.py index 26c9b64abdf..039b5283647 100644 --- a/tests/everest/test_simulator_cache.py +++ b/tests/everest/test_simulator_cache.py @@ -1,7 +1,6 @@ import numpy as np import pytest -from ert.config import QueueSystem from ert.ensemble_evaluator import EvaluatorServerConfig from ert.run_models.everest_run_model import EverestRunModel from everest.config import EverestConfig @@ -22,12 +21,7 @@ def new_call(*args): config = EverestConfig.model_validate(config_dict) run_model = EverestRunModel.create(config) - - evaluator_server_config = EvaluatorServerConfig( - custom_port_range=range(49152, 51819) - if run_model._queue_config.queue_system == QueueSystem.LOCAL - else None - ) + evaluator_server_config = EvaluatorServerConfig() # Modify the forward model function to track number of calls: original_call = run_model._forward_model_evaluator