
[tests][cli] distributed training #4254

Merged · 18 commits · Jul 4, 2021
9 changes: 9 additions & 0 deletions .ci/test.sh
@@ -85,6 +85,15 @@ if [[ $TASK == "if-else" ]]; then
    exit 0
fi

if [[ $TASK == "cli-distributed" ]]; then
    conda install -q -y -n $CONDA_ENV numpy pytest scikit-learn
    mkdir $BUILD_DIRECTORY/build && cd $BUILD_DIRECTORY/build && cmake .. && make -j4 || exit -1
    cp $BUILD_DIRECTORY/lightgbm $BUILD_DIRECTORY/tests/distributed/ || exit -1
    cd $BUILD_DIRECTORY/python-package/ && python setup.py install --precompile || exit -1
    cd $BUILD_DIRECTORY/tests/distributed && pytest _test_distributed.py || exit -1
    exit 0
fi

if [[ $TASK == "swig" ]]; then
    mkdir $BUILD_DIRECTORY/build && cd $BUILD_DIRECTORY/build
    if [[ $OS_NAME == "macos" ]]; then
5 changes: 5 additions & 0 deletions .gitignore
@@ -430,6 +430,11 @@ miktex*.zip
**/lgb.Dataset.data
**/model.txt
**/lgb-model.txt
tests/distributed/mlist.txt
tests/distributed/train*
tests/distributed/model*
tests/distributed/predictions*.txt


# Files from interactive R sessions
.Rproj.user
2 changes: 2 additions & 0 deletions .vsts-ci.yml
@@ -119,6 +119,8 @@ jobs:
      TASK: gpu
      METHOD: wheel
      PYTHON_VERSION: 3.7
    distributed:
      TASK: cli-distributed
Collaborator:

Sorry for writing this so late!
Do you @jameslamb really think that parallel tests should be executed as a separate CI job? I believe that the environment initialization overhead is quite big in this case.
From my point of view, we should add new CI jobs only when the new job requires something special that we haven't done before as part of our CI scripts.
For example,

  • dask is tested within our ordinary (mostly Python-package) tests (we just added some dependencies to the conda env), so we don't have a separate dask CI job
  • mpi requires an MPI installation and is mutually exclusive with the ordinary installation, hence we have a separate mpi CI job
  • gpu requires the installation of Boost and GPU drivers and is mutually exclusive with the ordinary installation, hence we have a separate gpu CI job
  • the regular, sdist and bdist jobs exist because they each test a different way of installing the Python package and its correctness
  • swig only compiles the SWIG wrapper and doesn't run any tests (even for that scenario we used to compile the SWIG artifacts within the sdist job before we got self-hosted agents to balance the CI load)
  • cpp_tests requires special CMake flags and doesn't require Python ecosystem initialization, hence we have a separate cpp-tests CI job

Also, please note that C API tests are run with each Python-package-related CI job.
https://github.com/microsoft/LightGBM/blob/master/tests/c_api_test/test_.py

I believe that basic distributed tests which don't require an MPI installation or other special things can be run with each Python-package-related CI job. Please share your opinion on this.

Collaborator:

I believe that environment initialization overhead is quite big in this case.

Specifically, the most recent run of this job took 4m15s, about 1m40s of which was spent compiling lightgbm and running the tests (based on timestamps in the logs):

2021-05-12T03:28:03.4369397Z -- The C compiler identification is Clang 10.0.0
...
2021-05-12T03:29:43.5905794Z ============================== 2 passed in 1.04s ===

reference: https://dev.azure.com/lightgbm-ci/lightgbm-ci/_build/results?buildId=10053&view=logs&j=cd496dfc-df55-5b41-9968-b20563c04f1a

I believe that basic distributed tests which are not requiring MPI installation or other special things can be run with each Python-package-related CI job

They are "basic" today because I've asked @jmoralez to focus this first PR on only setting up the structure and not on adding a lot of additional tests. But I expect that a lot more tests will be added in the future. Every time we see an issue with lightgbm.dask or from mmlspark and wonder "is this a problem at the level of those interfaces or something from LightGBM's core library?", I think we'll want to add tests here.

I think it's possible that a few months from now, these distributed training tests might take 3 or 4 minutes to run (just the test part, not all of the environment setup). Instead of adding 3 to 4 minutes to each Python CI job, I thought it made sense to add only a single job as a starting point.

If you don't find that convincing, I don't feel strongly enough about it to block the PR until we agree. If you still think these tests should be run in every Python job instead of as a standalone one, please advise @jmoralez on specifically what changes should be made (for example).

Collaborator:

focus this first PR on only setting up the structure and not on adding a lot of additional tests.

I do support this.

Every time we see an issue with lightgbm.dask or from mmlspark and wonder "is this a problem at the level of those interfaces or something from LightGBM's core library?", I think we'll want to add tests here.

For sure, that was the reason I created #3841.

You are right that separate CI jobs have their own advantages, but I think that the main benefit of embedding the distributed tests into the CI jobs with the main tests is that you get good coverage and balance across different OSes, compilers and environments in general. You don't need to think about where to set up the new job (Azure, GitHub Actions, or AppVeyor), which compiler to choose on each CI service, or what new if rules to invent for setup.py or elsewhere. It is already done and you get it for free.

If you don't find that convincing, I don't feel strongly enough about it to block the PR until we agree.

I believe it's a bad idea to merge PRs that are "convenient" for only one of the maintainers.
We are not speaking about just theoretical things; we are discussing this particular PR and the changes it proposes to merge into the master branch. I don't agree that this is "delaying" the PR somehow. This is what PRs are designed for: discussing the changes they contain.

With new info uncovered, for example:

The executable is definitely needed.
#4254 (comment)

some previous statements may become irrelevant. But this should be discussed before the merge to prevent code reverts in the future, I suppose.

Collaborator:

This is what PRs are designed for

I don't disagree. Sorry if it sounded like I was complaining, that was not my intention.

You don't need to think about where to set up the new job (Azure, GitHub Actions, or AppVeyor), which compiler to choose on each CI service, or what new if rules to invent for setup.py or elsewhere. It is already done and you get it for free.

In this case, @jmoralez WILL have to think about those things, all at once, right now. Since distributed training with the CLI isn't currently tested on Mac or Windows, for example, if failures show up when adding those tests to the jobs for those operating systems, they'll have to be fixed (or ignored with if or pytest.skip) before this can be merged. The same goes for other combinations, like different Python versions.
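For reference, one way to express such a skip would be a module-level skipif marker rather than an inline pytest.skip; everything in this sketch (the condition, the reason text) is hypothetical and not code from this PR:

import sys

import pytest

# hypothetical sketch: skip the distributed CLI tests on platforms where they don't run yet
pytestmark = pytest.mark.skipif(
    sys.platform in ('darwin', 'win32'),
    reason='distributed CLI training tests are currently only run on Linux',
)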

So, said another way, I have a preference for taking smaller, incremental steps when adding this particular type of testing. I prefer adding it in a single job right now because then two types of work (changing which configurations the job runs on, and adding new tests) can happen in parallel.

I think you are saying you'd prefer to add broader coverage in this first PR. I am ok with that and happy to support it.

Collaborator:

In this case, @jmoralez WILL have to think about those things, all at once, right now.

Oh no, definitely no! Only if he'd love to do it.

As an example, please remember how we added cpp tests with GTest. First, the possibility of compiling the tests was added in #3555 and checked for correctness: #3555 (comment). Then we actually added CI jobs with cpp tests: #4166. And right now different tests are proposed in pending PRs. I think this is a good example of

taking smaller, incremental steps for adding this particular type of testing.

As the core code of this PR is independent of the way we will execute it, I think we can go in the same direction here. Leave only the testing code in this PR and add CI jobs calling it in a follow-up one(s). WDYT?

Collaborator:

Oh no, definitely no

Oh ok, I guess I misunderstood. I thought that your comment at the beginning of this thread was saying that you wanted this PR to add these tests to all existing Python jobs. I think now that maybe you just meant "for us to consider #3841 fixed, it should be added to all of those jobs".

Leave only testing code in this PR and add CI jobs calling it in a follow-up one(s)

Sure! Let's do that, I agree. So then I think this PR would need the following changes:

  • remove "fixes" from the PR title and "close" from the PR description
  • remove the changes to .vsts-ci.yml and .ci/test.sh

@StrikerRUS do you agree?

Collaborator:

I think now that maybe you just meant "for us to consider #3841 fixed, it should be added to all of those jobs".

Yeah, #3841 should be "fixed" only after adding some meaningful number of tests for distributed code.

do you agree?

Totally agree.

Collaborator:

Great. @jmoralez when you have time, could you please remove the changes from .vsts-ci.yml and .ci/test.sh?

I've removed "fixes" from the PR title.

steps:
- script: |
echo "##vso[task.setvariable variable=BUILD_DIRECTORY]$BUILD_SOURCESDIRECTORY"
157 changes: 157 additions & 0 deletions tests/distributed/_test_distributed.py
@@ -0,0 +1,157 @@
import copy
import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List

import numpy as np
from sklearn.datasets import make_blobs, make_regression
from sklearn.metrics import accuracy_score

import lightgbm as lgb


def create_data(task: str, n_samples: int = 1_000) -> np.ndarray:
    """Create the appropriate data for the task.

    The data is returned as a numpy array with the label as the first column.
    """
    if task == 'binary-classification':
        centers = [[-4, -4], [4, 4]]
        X, y = make_blobs(n_samples, centers=centers, random_state=42)
    elif task == 'regression':
        X, y = make_regression(n_samples, n_features=4, n_informative=2, random_state=42)
    dataset = np.hstack([y.reshape(-1, 1), X])
    return dataset
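# Illustrative sketch of the shapes implied by the generators above, with the
# default n_samples=1_000:
#
#   create_data('binary-classification').shape  # (1000, 3): label + 2 blob features
#   create_data('regression').shape             # (1000, 5): label + 4 regression features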


def run_and_log(cmd: List[str]) -> None:
    """Run `cmd` in another process and pipe its logs to this process' stdout."""
    process = subprocess.Popen(cmd, stdout=subprocess.PIPE)
    assert process.stdout is not None

    def stdout_stream():
        return process.stdout.read(1)

    for c in iter(stdout_stream, b''):
        sys.stdout.buffer.write(c)


class DistributedMockup:
    """Simulate distributed training."""

    default_config = {
        'task': 'train',
        'pre_partition': True,
        'machine_list_file': 'mlist.txt',
        'tree_learner': 'data',
        'force_row_wise': True,
        'verbose': 0,
        'num_boost_round': 20,
        'num_leaves': 15,
        'num_threads': 2,
    }

    def __init__(self, config: Dict = {}, n_workers: int = 2):
        self.config = copy.deepcopy(self.default_config)
        self.config.update(config)
        self.config['num_machines'] = n_workers
        self.n_workers = n_workers

    def worker_train(self, i: int) -> None:
        """Start the training process on the `i`-th worker.

        If this is the first worker, its logs are piped to stdout.
        """
        cmd = f'./lightgbm config=train{i}.conf'.split()
        if i == 0:
            return run_and_log(cmd)
        subprocess.run(cmd)

    def _set_ports(self) -> None:
        """Randomly assign a port for training to each worker and save all ports to mlist.txt."""
        self.listen_ports = [lgb.dask._find_random_open_port() for _ in range(self.n_workers)]
        with open('mlist.txt', 'wt') as f:
            for port in self.listen_ports:
                f.write(f'127.0.0.1 {port}\n')
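    # Illustrative sketch (the port numbers are hypothetical): with the default
    # n_workers=2, _set_ports() leaves mlist.txt with one "host port" pair per worker:
    #
    #   127.0.0.1 52134
    #   127.0.0.1 60873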

    def _write_data(self, partitions: List[np.ndarray]) -> None:
        """Write all training data as train.txt and each training partition as train{i}.txt."""
        all_data = np.vstack(partitions)
        np.savetxt('train.txt', all_data, delimiter=',')
        for i, partition in enumerate(partitions):
            np.savetxt(f'train{i}.txt', partition, delimiter=',')

    def fit(self, partitions: List[np.ndarray]) -> None:
        """Run the distributed training process on a single machine.

        For each worker i:
        1. The i-th partition is saved as train{i}.txt.
        2. A random port is assigned for training.
        3. A configuration file train{i}.conf is created.
        4. The lightgbm binary is called with config=train{i}.conf in another thread.
        The whole training set is saved as train.txt and the logs from the first worker are piped to stdout.
        """
        self._write_data(partitions)
        self.label_ = np.hstack([partition[:, 0] for partition in partitions])
        self._set_ports()
        futures = []
        with ThreadPoolExecutor(max_workers=self.n_workers) as executor:
            for i in range(self.n_workers):
                self.write_train_config(i)
                futures.append(executor.submit(self.worker_train, i))
            _ = [f.result() for f in futures]

    def predict(self) -> np.ndarray:
        """Compute the predictions using the model created in the fit step.

        model0.txt is used to predict the training set train.txt using predict.conf.
        The predictions are saved as predictions.txt and are then loaded to return them as a numpy array.
        The logs are piped to stdout.
        """
        cmd = './lightgbm config=predict.conf'.split()
        run_and_log(cmd)
        y_pred = np.loadtxt('predictions.txt')
        return y_pred

    def write_train_config(self, i: int) -> None:
        """Create a file train{i}.conf with the required configuration to train.

        Each worker gets a different port and piece of the data; the rest of the
        parameters are the model parameters contained in `self.config`.
        """
        with open(f'train{i}.conf', 'wt') as f:
            f.write(f'output_model = model{i}.txt\n')
            f.write(f'local_listen_port = {self.listen_ports[i]}\n')
            f.write(f'data = train{i}.txt\n')
            for param, value in self.config.items():
                f.write(f'{param} = {value}\n')
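    # Illustrative sketch (the port value is hypothetical): with the default config and
    # n_workers=2, write_train_config(0) produces a train0.conf roughly like:
    #
    #   output_model = model0.txt
    #   local_listen_port = 52134
    #   data = train0.txt
    #   task = train
    #   pre_partition = True
    #   machine_list_file = mlist.txt
    #   tree_learner = data
    #   force_row_wise = True
    #   verbose = 0
    #   num_boost_round = 20
    #   num_leaves = 15
    #   num_threads = 2
    #   num_machines = 2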


def test_classifier():
    """Test the classification task."""
    num_machines = 2
    data = create_data(task='binary-classification')
    partitions = np.array_split(data, num_machines)
    params = {
        'objective': 'binary',
    }
    clf = DistributedMockup(params)
    clf.fit(partitions)
    y_probas = clf.predict()
    y_pred = y_probas > 0.5
    assert accuracy_score(clf.label_, y_pred) == 1.


def test_regressor():
    """Test the regression task."""
    num_machines = 2
    data = create_data(task='regression')
    partitions = np.array_split(data, num_machines)
    params = {
        'objective': 'regression',
    }
    reg = DistributedMockup(params)
    reg.fit(partitions)
    y_pred = reg.predict()
    np.testing.assert_allclose(y_pred, reg.label_, rtol=0.5, atol=50.)
4 changes: 4 additions & 0 deletions tests/distributed/predict.conf
@@ -0,0 +1,4 @@
task = predict
data = train.txt
input_model = model0.txt
output_result = predictions.txt
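For reference, a minimal sketch of how this config is consumed, mirroring DistributedMockup.predict() above and assuming the lightgbm binary has been copied into tests/distributed/ as the CI script in this PR does:

import subprocess

import numpy as np

# run the CLI in prediction mode; per predict.conf this scores train.txt with model0.txt
subprocess.run(['./lightgbm', 'config=predict.conf'], check=True)

# the CLI writes its output to predictions.txt (output_result), which can then be loaded back
y_pred = np.loadtxt('predictions.txt')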