Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dask] run one training task on each worker #4132

Merged
merged 4 commits into from
Mar 29, 2021
Merged

Conversation

jameslamb
Copy link
Collaborator

@jameslamb jameslamb commented Mar 29, 2021

This PR resolves a critical source of instability in lightgbm.dask, which I think might be the cause of some of the issues we have observed sporadically in tests (#4057, #4074, #4099).

Changes in this PR

  • specifies arguments in lightgbm.dask's use of distributed.Client.submit() to more tightly control how Dask schedules LightGBM training tasks

How this improves lightgbm

I believe this PR will fix a critical source of instability that can cause training to fail (either throwing an error or hanging indefinitely) if multiple training tasks are accidentally scheduled onto the same worker. I think that the risk of this issue increases as you have more workers in the Dask cluster and as there is more other work going on in the cluster alongside LightGBM training.

This PR should also improve the stability of LightGBM's Dask unit tests.

Overview

In distributed LightGBM training, users create n workers, each of which runs LightGBM training on a local chunk of the full training data. Estimators in lightgbm.dask expect training data to be given in Dask collections. Those estimators look at which Dask workers hold each partition of the training data, and run one LightGBM training process per worker.

The logic for scheduling those training processes out onto Dask workers is currently incorrect.

futures_classifiers = [
client.submit(
_train_part,
model_factory=model_factory,
params={**params, 'num_threads': worker_ncores[worker]},
list_of_parts=list_of_parts,
machines=machines,
local_listen_port=worker_address_to_port[worker],
num_machines=num_machines,
time_out=params.get('time_out', 120),
return_model=(worker == master_worker),
**kwargs
)
for worker, list_of_parts in worker_map.items()
]

The current code follows this pseudocode, which is like

"given n workers with data, run n training processes on the cluster"

for work in workers:
    client.submit(
        _train_part
    )

However, the desired behavior is slightly different. It should be

"given n workers with data, run exactly 1 training process on each worker"

for work in workers:
    client.submit(
        _train_part,
        workers=worker
    )

This more specific definition is important. If the training data are split across two workers (for example) and LightGBM sets machines to a list with those two workers, but then two _train_part() tasks are scheduled on the same worker, training will fail.

  • if both scheduled onto the worker that is rank 0, the worker will be killed and restarted.
    • distributed.nanny - INFO - Worker process 23623 was killed by signal 11

  • if both scheduled onto a worker that is not rank 0, I saw that LightGBM just hangs indefinitely. If you have task-level timeouts set in Dask, Dask might eventually kill that process. Either way, training will not succeed.

Per the Dask docs, the following parameters should be used in lightgbm.dask to achieve tight control over where tasks run.

  • workers: specify EXACTLY which worker(s) the task being submitted can run on
  • allow_other_workers (default: True False): indicate whether it is ok to run the task on other workers not mentioned in workers
  • pure (default: True): whether or not the function is a "pure function", which according to Wikipedia, means that it always returns the same outputs given the same inputs, does not modify any global state (including the file system), and does not have side effects on output streams

More Background

The bug fixed by this PR is a general class of problem in lightgbm.dask that we should be aware of. By default, Dask assumes that tasks can be scheduled onto any worker, and even that they can be "stolen" from one worker by another).

These default assumptions don't hold for distributed training of LightGBM or XGBoost.

Notes for Reviewers

I checked the blame in dask/distributed, and it looks like the three keyword arguments to client.submit() that I'm specifying here have been a part of the client for at least 3 years. So I don't think they cause any compatibility issues we need to worry about.

https://github.com/dask/distributed/blame/1b32bd30201ef6ced5029180143d2c37b393b586/distributed/client.py#L1234-L1240

@jameslamb jameslamb added the fix label Mar 29, 2021
@jameslamb jameslamb requested a review from StrikerRUS March 29, 2021 03:09
@jameslamb
Copy link
Collaborator Author

@ffineis and @jmoralez , if you have time this week could you take a look at this and check if my Dask understanding is correct?

@ffineis
Copy link
Contributor

ffineis commented Mar 29, 2021

Awesome!! Wow. Yes I'll review this tomorrow PM. But just reading this for a few minutes, I think you're totally on to something and I'm guessing there's a good reason why xgboost.dask also calls client.submit this way.

@jameslamb
Copy link
Collaborator Author

jameslamb commented Mar 29, 2021

welp, test_model_and_local_version_are_picklable_whether_or_not_client_set_explicitly on linux_latest regular still just failed with the following exception:

Exception: LightGBMError('Socket send error, code: 104',)

https://dev.azure.com/lightgbm-ci/lightgbm-ci/_build/results?buildId=9613&view=logs&j=275189f9-c769-596a-7ef9-49fb48a9ab70&t=3a9e7a4a-04e6-52a0-67ea-6e8f6cfda74f

HOWEVER, I still think this PR fixes a critical source of instability. Maybe we just have multiple stability problems in the lightgbm.dask / the Dask tests 😂


UPDATE: looking more closely at the logs, I think this test actually failed with a variation of the issue mentioned in #4112 (comment), where _find_random_open_port() ends up finding the same port for multiple workers.

Finding random open ports for workers
[LightGBM] [Info] Trying to bind port 43103...
[LightGBM] [Info] Binding port 43103 succeeded
[LightGBM] [Info] Listening...
[LightGBM] [Info] Connected to rank 1
[LightGBM] [Info] Local rank: 0, total number of machines: 2
[LightGBM] [Warning] num_threads is set=1, n_jobs=-1 will be ignored. Current value: num_threads=1
[LightGBM] [Info] Trying to bind port 43103...
[LightGBM] [Info] Binding port 43103 succeeded
[LightGBM] [Info] Listening...
[LightGBM] [Info] Connected to rank 1
[LightGBM] [Info] Local rank: 0, total number of machines: 2
[LightGBM] [Warning] num_threads is set=1, n_jobs=-1 will be ignored. Current value: num_threads=1
----------------------------- Captured stderr call -----------------------------
[LightGBM] [Fatal] Socket send error, code: 104
[LightGBM] [Fatal] Socket send error, code: 104
distributed.worker - WARNING -  Compute Failed
Function:  _train_part
args:      ()
kwargs:    {'model_factory': <class 'lightgbm.sklearn.LGBMClassifier'>, 'params': {'boosting_type': 'gbdt', 'class_weight': None, 'colsample_bytree': 1.0, 'importance_type': 'split', 'learning_rate': 0.1, 'max_depth': -1, 'min_child_samples': 20, 'min_child_weight': 0.001, 'min_split_gain': 0.0, 'n_estimators': 1, 'num_leaves': 2, 'objective': None, 'random_state': None, 'reg_alpha': 0.0, 'reg_lambda': 0.0, 'silent': True, 'subsample': 1.0, 'subsample_for_bin': 200000, 'subsample_freq': 0, 'time_out': 5, 'tree_learner': 'data', 'num_threads': 1, 'machines': '127.0.0.1:43103,127.0.0.1:43103', 'local_listen_port': 43103, 'num_machines': 2}, 'list_of_parts': [{'data': array([[ 5.02406253,  4.59252695],
       [ 5.66125921,  3.54290374],
       [-5.60748323, -3.81536614],
       [ 5.07600714,  4.02131165],

Notice that the same port was chosen for both workers!

'machines': '127.0.0.1:43103,127.0.0.1:43103',

That aligns with the idea from #4112 (comment), that _find_random_open_port() isn't as reliably "random" as I originally thought. I'm going to add a separate PR that makes lightgbm.dask more resilient to that in the "just search randomly" case.

@jameslamb
Copy link
Collaborator Author

Added #4133 which I think should fix the "sometimes we get a machines list with duplicate ports" problem.

Pushing an empty commit to re-trigger CI, but leaving the build from #4132 (comment) undisturbed in case anyone wants to look at the logs.

@jameslamb
Copy link
Collaborator Author

The most recent cuda 11.2.2 source (linux, gcc, Python 3.7) build failed with what looks like a temporary issue from the main anaconda channel

Collecting package metadata (current_repodata.json): ...working... failed
                                                                                         
CondaHTTPError: HTTP 502 BAD GATEWAY for url <https://repo.anaconda.com/pkgs/r/noarch/current_repodata.json>
Elapsed: 00:00.056802
CF-RAY: 637a79459a561382-YVR

A remote server error occurred when trying to retrieve this URL.

A 500-type error (e.g. 500, 501, 502, 503, etc.) indicates the server failed to
fulfill a valid request.  The problem may be spurious, and will resolve itself if you
try your request again.  If the problem persists, consider notifying the maintainer
of the remote server.

I've manually restarted that build.

Copy link
Collaborator

@StrikerRUS StrikerRUS left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jameslamb Your detailed description makes a lot of sense!

allow_other_workers (default: True): indicate whether it is ok to run the task on other workers not mentioned in workers`

But it looks like this param is already False by default:

image

@jameslamb
Copy link
Collaborator Author

But it looks like this param is already False by default:

It is, but I think we should be explicit about that. In case it wasn't clear, the lack of workers=[worker] was the actual BUG fixed here. The other two parameters (pure and allow_other_workers) added are just for completeness, so you can look at that call and understand exactly how the training tasks should be treated.

@jameslamb
Copy link
Collaborator Author

oh I see I wrote "Default: True` in the PR description. Edited 😂

@github-actions
Copy link

This pull request has been automatically locked since there has not been any recent activity since it was closed. To start a new related discussion, open a new issue at https://github.com/microsoft/LightGBM/issues including a reference to this.

@github-actions github-actions bot locked as resolved and limited conversation to collaborators Aug 23, 2023
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants