From 4782338e553ce1b5daa7a793ac13daf005f5f8e9 Mon Sep 17 00:00:00 2001 From: James Lamb Date: Mon, 29 Mar 2021 00:35:26 -0500 Subject: [PATCH 1/5] [dask] make random port search more resilient to random collisions --- python-package/lightgbm/dask.py | 24 ++++++++++++++++++++++++ tests/python_package_test/test_dask.py | 21 +++++++++++++++++++++ 2 files changed, 45 insertions(+) diff --git a/python-package/lightgbm/dask.py b/python-package/lightgbm/dask.py index 0153a6370343..60b8d81ea045 100644 --- a/python-package/lightgbm/dask.py +++ b/python-package/lightgbm/dask.py @@ -170,6 +170,18 @@ def _machines_to_worker_map(machines: str, worker_addresses: List[str]) -> Dict[ return out +def _worker_map_has_duplicates(worker_map: Dict[str, int]) -> bool: + """Check if there are any duplicate IP-port pairs in a ``worker_map``""" + host_to_port = defaultdict(set) + for worker, port in worker_map.items(): + host = urlparse(worker).hostname + if port in host_to_port[host]: + return True + else: + host_to_port[host].add(port) + return False + + def _train( client: Client, data: _DaskMatrixLike, @@ -371,6 +383,18 @@ def _train( _find_random_open_port, workers=list(worker_addresses) ) + # handle the case where _find_random_open_port() produces duplicates + retries_left = 10 + while _worker_map_has_duplicates(worker_address_to_port) and retries_left > 0: + retries_left -= 1 + _log_warning( + "Searching for random ports generated duplicates. Trying again (will try %i more times after this)." % retries_left + ) + worker_address_to_port = client.run( + _find_random_open_port, + workers=list(worker_addresses) + ) + machines = ','.join([ '%s:%d' % (urlparse(worker_address).hostname, port) for worker_address, port diff --git a/tests/python_package_test/test_dask.py b/tests/python_package_test/test_dask.py index b1f7ff5605a9..c2cd44554f0c 100644 --- a/tests/python_package_test/test_dask.py +++ b/tests/python_package_test/test_dask.py @@ -377,6 +377,27 @@ def test_find_random_open_port(client): client.close(timeout=CLIENT_CLOSE_TIMEOUT) +def test_worker_map_has_duplicates(): + map_with_duplicates = worker_map = { + 'tcp://127.0.0.1:8786': 123, + 'tcp://127.0.0.1:8788': 123, + 'tcp://10.1.1.2:15001': 123 + } + assert lgb.dask._worker_map_has_duplicates(map_with_duplicates) + + map_without_duplicates = { + 'tcp://127.0.0.1:8786': 12405, + 'tcp://10.1.1.2:15001': 12405 + } + assert lgb.dask._worker_map_has_duplicates(map_without_duplicates) is False + + localcluster_map_without_duplicates = { + 'tcp://127.0.0.1:708': 12405, + 'tcp://127.0.0.1:312': 12405, + } + assert lgb.dask._worker_map_has_duplicates(map_without_duplicates) is False + + def test_training_does_not_fail_on_port_conflicts(client): _, _, _, _, dX, dy, dw, _ = _create_data('binary-classification', output='array') From b1cc4eb0db83d2f3a5b468db891687fbf8386300 Mon Sep 17 00:00:00 2001 From: James Lamb Date: Mon, 29 Mar 2021 10:35:37 -0500 Subject: [PATCH 2/5] linting --- python-package/lightgbm/dask.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python-package/lightgbm/dask.py b/python-package/lightgbm/dask.py index 60b8d81ea045..79c920641d9f 100644 --- a/python-package/lightgbm/dask.py +++ b/python-package/lightgbm/dask.py @@ -171,7 +171,7 @@ def _machines_to_worker_map(machines: str, worker_addresses: List[str]) -> Dict[ def _worker_map_has_duplicates(worker_map: Dict[str, int]) -> bool: - """Check if there are any duplicate IP-port pairs in a ``worker_map``""" + """Check if there are any duplicate IP-port pairs in a ``worker_map``.""" host_to_port = defaultdict(set) for worker, port in worker_map.items(): host = urlparse(worker).hostname From 05303c813131aeb614cd0b2c200946a829e76f4c Mon Sep 17 00:00:00 2001 From: James Lamb Date: Mon, 29 Mar 2021 20:28:33 -0500 Subject: [PATCH 3/5] more reliable ports check --- python-package/lightgbm/dask.py | 48 ++++++++++++++++++-------- tests/python_package_test/test_dask.py | 37 ++++++++++++-------- 2 files changed, 56 insertions(+), 29 deletions(-) diff --git a/python-package/lightgbm/dask.py b/python-package/lightgbm/dask.py index 2cc5b264eb3c..2ae1a35ebd28 100644 --- a/python-package/lightgbm/dask.py +++ b/python-package/lightgbm/dask.py @@ -170,16 +170,37 @@ def _machines_to_worker_map(machines: str, worker_addresses: List[str]) -> Dict[ return out -def _worker_map_has_duplicates(worker_map: Dict[str, int]) -> bool: - """Check if there are any duplicate IP-port pairs in a ``worker_map``.""" +def _possibly_fix_worker_map_duplicates(worker_map: Dict[str, int], client: Client) -> Dict[str, int]: + """Fix any duplicate IP-port pairs in a ``worker_map``""" + worker_map = deepcopy(worker_map) + workers_that_need_new_ports = [] host_to_port = defaultdict(set) for worker, port in worker_map.items(): host = urlparse(worker).hostname if port in host_to_port[host]: - return True + workers_that_need_new_ports.append(worker) else: host_to_port[host].add(port) - return False + + # if any duplicates were found, search for new ports one by one + for worker in workers_that_need_new_ports: + _log_info(f"Searching for a LightGBM training port for worker '{worker}'") + host = urlparse(worker).hostname + retries_remaining = 100 + while retries_remaining > 0: + retries_remaining -= 1 + new_port = client.submit( + _find_random_open_port, + workers=[worker], + allow_other_workers=False, + pure=False + ).result() + if new_port not in host_to_port[host]: + worker_map[worker] = new_port + host_to_port[host].add(new_port) + break + + return worker_map def _train( @@ -379,21 +400,18 @@ def _train( } else: _log_info("Finding random open ports for workers") + # this approach with client.run() is faster than searching for ports + # serially, but can produce duplicates sometimes. Try the fast approach one + # time, then pass it through a function that will use a slower but more reliable + # approach if duplicates are found. worker_address_to_port = client.run( _find_random_open_port, workers=list(worker_addresses) ) - # handle the case where _find_random_open_port() produces duplicates - retries_left = 10 - while _worker_map_has_duplicates(worker_address_to_port) and retries_left > 0: - retries_left -= 1 - _log_warning( - "Searching for random ports generated duplicates. Trying again (will try %i more times after this)." % retries_left - ) - worker_address_to_port = client.run( - _find_random_open_port, - workers=list(worker_addresses) - ) + worker_address_to_port = _possibly_fix_worker_map_duplicates( + worker_map=worker_address_to_port, + client=client + ) machines = ','.join([ '%s:%d' % (urlparse(worker_address).hostname, port) diff --git a/tests/python_package_test/test_dask.py b/tests/python_package_test/test_dask.py index c2cd44554f0c..bcb2061681d1 100644 --- a/tests/python_package_test/test_dask.py +++ b/tests/python_package_test/test_dask.py @@ -377,25 +377,34 @@ def test_find_random_open_port(client): client.close(timeout=CLIENT_CLOSE_TIMEOUT) -def test_worker_map_has_duplicates(): - map_with_duplicates = worker_map = { - 'tcp://127.0.0.1:8786': 123, - 'tcp://127.0.0.1:8788': 123, - 'tcp://10.1.1.2:15001': 123 - } - assert lgb.dask._worker_map_has_duplicates(map_with_duplicates) +def test_possibly_fix_worker_map(capsys, client): + client.wait_for_workers(2) + worker_addresses = list(client.scheduler_info()["workers"].keys()) + retry_msg = 'Searching for a LightGBM training port for worker' + + # should handle worker maps without any duplicates map_without_duplicates = { - 'tcp://127.0.0.1:8786': 12405, - 'tcp://10.1.1.2:15001': 12405 + worker_address: 12400 + i + for i, worker_address in enumerate(worker_addresses) } - assert lgb.dask._worker_map_has_duplicates(map_without_duplicates) is False + patched_map = lgb.dask._possibly_fix_worker_map_duplicates( + client=client, + worker_map=map_without_duplicates + ) + assert patched_map == map_without_duplicates + assert retry_msg not in capsys.readouterr().out - localcluster_map_without_duplicates = { - 'tcp://127.0.0.1:708': 12405, - 'tcp://127.0.0.1:312': 12405, + # should handle worker maps with duplicates + map_without_duplicates = { + worker_address: 12400 + for i, worker_address in enumerate(worker_addresses) } - assert lgb.dask._worker_map_has_duplicates(map_without_duplicates) is False + patched_map = lgb.dask._possibly_fix_worker_map_duplicates( + client=client, + worker_map=map_without_duplicates + ) + assert retry_msg in capsys.readouterr().out def test_training_does_not_fail_on_port_conflicts(client): From 75cfbbb4c270f59bc1b534de1c43520ad7ba009f Mon Sep 17 00:00:00 2001 From: James Lamb Date: Mon, 29 Mar 2021 21:31:29 -0500 Subject: [PATCH 4/5] address review comments --- python-package/lightgbm/dask.py | 2 +- tests/python_package_test/test_dask.py | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/python-package/lightgbm/dask.py b/python-package/lightgbm/dask.py index 2ae1a35ebd28..3147a6f31006 100644 --- a/python-package/lightgbm/dask.py +++ b/python-package/lightgbm/dask.py @@ -171,7 +171,7 @@ def _machines_to_worker_map(machines: str, worker_addresses: List[str]) -> Dict[ def _possibly_fix_worker_map_duplicates(worker_map: Dict[str, int], client: Client) -> Dict[str, int]: - """Fix any duplicate IP-port pairs in a ``worker_map``""" + """Fix any duplicate IP-port pairs in a ``worker_map``.""" worker_map = deepcopy(worker_map) workers_that_need_new_ports = [] host_to_port = defaultdict(set) diff --git a/tests/python_package_test/test_dask.py b/tests/python_package_test/test_dask.py index bcb2061681d1..b886c5ead7c0 100644 --- a/tests/python_package_test/test_dask.py +++ b/tests/python_package_test/test_dask.py @@ -396,15 +396,16 @@ def test_possibly_fix_worker_map(capsys, client): assert retry_msg not in capsys.readouterr().out # should handle worker maps with duplicates - map_without_duplicates = { + map_with_duplicates = { worker_address: 12400 for i, worker_address in enumerate(worker_addresses) } patched_map = lgb.dask._possibly_fix_worker_map_duplicates( client=client, - worker_map=map_without_duplicates + worker_map=map_with_duplicates ) assert retry_msg in capsys.readouterr().out + assert len(set(patched_map.values())) == len(worker_addresses) def test_training_does_not_fail_on_port_conflicts(client): From bd9a51aaa9e22f25c2d8c0da0f8b975e051262ec Mon Sep 17 00:00:00 2001 From: James Lamb Date: Tue, 30 Mar 2021 17:35:30 -0500 Subject: [PATCH 5/5] add error message --- python-package/lightgbm/dask.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/python-package/lightgbm/dask.py b/python-package/lightgbm/dask.py index f1fa94e85d99..b3571bbd786f 100644 --- a/python-package/lightgbm/dask.py +++ b/python-package/lightgbm/dask.py @@ -200,6 +200,11 @@ def _possibly_fix_worker_map_duplicates(worker_map: Dict[str, int], client: Clie host_to_port[host].add(new_port) break + if retries_remaining == 0: + raise LightGBMError( + "Failed to find an open port. Try re-running training or explicitly setting 'machines' or 'local_listen_port'." + ) + return worker_map