diff --git a/.gitignore b/.gitignore
index 66b8a9b4acff..c054aee9111a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -430,6 +430,11 @@ miktex*.zip
 **/lgb.Dataset.data
 **/model.txt
 **/lgb-model.txt
+tests/distributed/mlist.txt
+tests/distributed/train*
+tests/distributed/model*
+tests/distributed/predict*
+
 # Files from interactive R sessions
 .Rproj.user
diff --git a/tests/distributed/_test_distributed.py b/tests/distributed/_test_distributed.py
new file mode 100644
index 000000000000..64ffa2b22399
--- /dev/null
+++ b/tests/distributed/_test_distributed.py
@@ -0,0 +1,198 @@
+import copy
+import io
+import os
+import socket
+import subprocess
+from concurrent.futures import ThreadPoolExecutor
+from typing import Any, Dict, Generator, List
+
+import numpy as np
+import pytest
+from sklearn.datasets import make_blobs, make_regression
+from sklearn.metrics import accuracy_score
+
+TESTS_DIR = os.path.abspath(os.path.dirname(__file__))
+
+
+@pytest.fixture(scope='module')
+def executable(pytestconfig) -> str:
+    """Return the path to the lightgbm executable."""
+    return pytestconfig.getoption('execfile')
+
+
+def _find_random_open_port() -> int:
+    """Find a random open port on localhost."""
+    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
+        s.bind(('', 0))
+        port = s.getsockname()[1]
+    return port
+
+
+def _generate_n_ports(n: int) -> Generator[int, None, None]:
+    return (_find_random_open_port() for _ in range(n))
+
+
+def _write_dict(d: Dict, file: io.TextIOWrapper) -> None:
+    for k, v in d.items():
+        file.write(f'{k} = {v}\n')
+
+
+def create_data(task: str, n_samples: int = 1_000) -> np.ndarray:
+    """Create the appropriate data for the task.
+
+    The data is returned as a numpy array with the label as the first column.
+    """
+    if task == 'binary-classification':
+        centers = [[-4, -4], [4, 4]]
+        X, y = make_blobs(n_samples, centers=centers, random_state=42)
+    elif task == 'regression':
+        X, y = make_regression(n_samples, n_features=4, n_informative=2, random_state=42)
+    else:
+        # guard against unknown tasks, which would otherwise raise a NameError below
+        raise ValueError(f'Unknown task: {task}')
+    dataset = np.hstack([y.reshape(-1, 1), X])
+    return dataset
+
+
+class DistributedMockup:
+    """Simulate distributed training."""
+
+    default_train_config = {
+        'task': 'train',
+        'pre_partition': True,
+        'machine_list_file': os.path.join(TESTS_DIR, 'mlist.txt'),
+        'tree_learner': 'data',
+        'force_row_wise': True,
+        'verbose': 0,
+        'num_boost_round': 20,
+        'num_leaves': 15,
+        'num_threads': 2,
+    }
+
+    default_predict_config = {
+        'task': 'predict',
+        'data': os.path.join(TESTS_DIR, 'train.txt'),
+        'input_model': os.path.join(TESTS_DIR, 'model0.txt'),
+        'output_result': os.path.join(TESTS_DIR, 'predictions.txt'),
+    }
+
+    def __init__(self, executable: str):
+        self.executable = executable
+
+    def worker_train(self, i: int) -> subprocess.CompletedProcess:
+        """Start the training process on the `i`-th worker."""
+        config_path = os.path.join(TESTS_DIR, f'train{i}.conf')
+        cmd = [self.executable, f'config={config_path}']
+        return subprocess.run(cmd)
+
+    def _set_ports(self) -> None:
+        """Randomly assign a port for training to each worker and save all ports to mlist.txt."""
+        ports = set(_generate_n_ports(self.n_workers))
+        i = 0
+        max_tries = 100
+        while i < max_tries and len(ports) < self.n_workers:
+            n_ports_left = self.n_workers - len(ports)
+            candidates = _generate_n_ports(n_ports_left)
+            ports.update(candidates)
+            i += 1
+        if i == max_tries:
+            raise RuntimeError('Unable to find non-colliding ports.')
+        self.listen_ports = list(ports)
+        with open(os.path.join(TESTS_DIR, 'mlist.txt'), 'wt') as f:
+            for port in self.listen_ports:
+                f.write(f'127.0.0.1 {port}\n')
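+
+    # For reference, the mlist.txt written above lists one worker per line as
+    # "ip port"; the ports below are hypothetical examples:
+    #   127.0.0.1 12400
+    #   127.0.0.1 12401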
+
+    def _write_data(self, partitions: List[np.ndarray]) -> None:
+        """Write all training data as train.txt and each training partition as train{i}.txt."""
+        all_data = np.vstack(partitions)
+        np.savetxt(os.path.join(TESTS_DIR, 'train.txt'), all_data, delimiter=',')
+        for i, partition in enumerate(partitions):
+            np.savetxt(os.path.join(TESTS_DIR, f'train{i}.txt'), partition, delimiter=',')
+
+    def fit(self, partitions: List[np.ndarray], train_config: Dict = {}) -> None:
+        """Run the distributed training process on a single machine.
+
+        For each worker i:
+            1. The i-th partition is saved as train{i}.txt.
+            2. A random port is assigned for training.
+            3. A configuration file train{i}.conf is created.
+            4. The lightgbm binary is called with config=train{i}.conf in another thread.
+            5. The trained model is saved as model{i}.txt.
+        Each worker's configuration file only differs in output_model, data and local_listen_port.
+        The whole training set is saved as train.txt.
+        """
+        self.train_config = copy.deepcopy(self.default_train_config)
+        self.train_config.update(train_config)
+        self.n_workers = self.train_config['num_machines']
+        self._set_ports()
+        self._write_data(partitions)
+        self.label_ = np.hstack([partition[:, 0] for partition in partitions])
+        futures = []
+        with ThreadPoolExecutor(max_workers=self.n_workers) as executor:
+            for i in range(self.n_workers):
+                self.write_train_config(i)
+                train_future = executor.submit(self.worker_train, i)
+                futures.append(train_future)
+            results = [f.result() for f in futures]
+        for result in results:
+            if result.returncode != 0:
+                raise RuntimeError('Error in training')
+
+    def predict(self, predict_config: Dict[str, Any] = {}) -> np.ndarray:
+        """Compute the predictions using the model created in the fit step.
+
+        predict_config is used to predict the training set train.txt.
+        Every worker ends up with the same model, which is why model0.txt is used as the input_model.
+        The predictions are saved as predictions.txt and are then loaded to return them as a numpy array.
+        """
+        self.predict_config = copy.deepcopy(self.default_predict_config)
+        self.predict_config.update(predict_config)
+        config_path = os.path.join(TESTS_DIR, 'predict.conf')
+        with open(config_path, 'wt') as file:
+            _write_dict(self.predict_config, file)
+        cmd = [self.executable, f'config={config_path}']
+        result = subprocess.run(cmd)
+        if result.returncode != 0:
+            raise RuntimeError('Error in prediction')
+        y_pred = np.loadtxt(os.path.join(TESTS_DIR, 'predictions.txt'))
+        return y_pred
+
+    def write_train_config(self, i: int) -> None:
+        """Create a file train{i}.conf with the required configuration to train.
+
+        Each worker gets a different port and piece of the data; the rest are the
+        model parameters contained in `self.train_config`.
+        """
+        with open(os.path.join(TESTS_DIR, f'train{i}.conf'), 'wt') as file:
+            output_model = os.path.join(TESTS_DIR, f'model{i}.txt')
+            data = os.path.join(TESTS_DIR, f'train{i}.txt')
+            file.write(f'output_model = {output_model}\n')
+            file.write(f'local_listen_port = {self.listen_ports[i]}\n')
+            file.write(f'data = {data}\n')
+            _write_dict(self.train_config, file)
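+
+
+# For reference, a generated train{i}.conf combines the three worker-specific
+# entries written by write_train_config with the key = value pairs from
+# self.train_config, e.g. (values are hypothetical examples):
+#   output_model = /.../tests/distributed/model0.txt
+#   local_listen_port = 12400
+#   data = /.../tests/distributed/train0.txt
+#   task = train
+#   tree_learner = data
+#   num_machines = 2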
+
+
+def test_classifier(executable):
+    """Test the classification task."""
+    num_machines = 2
+    data = create_data(task='binary-classification')
+    partitions = np.array_split(data, num_machines)
+    train_params = {
+        'objective': 'binary',
+        'num_machines': num_machines,
+    }
+    clf = DistributedMockup(executable)
+    clf.fit(partitions, train_params)
+    y_probas = clf.predict()
+    y_pred = y_probas > 0.5
+    assert accuracy_score(clf.label_, y_pred) == 1.
+
+
+def test_regressor(executable):
+    """Test the regression task."""
+    num_machines = 2
+    data = create_data(task='regression')
+    partitions = np.array_split(data, num_machines)
+    train_params = {
+        'objective': 'regression',
+        'num_machines': num_machines,
+    }
+    reg = DistributedMockup(executable)
+    reg.fit(partitions, train_params)
+    y_pred = reg.predict()
+    np.testing.assert_allclose(y_pred, reg.label_, rtol=0.2, atol=50.)
diff --git a/tests/distributed/conftest.py b/tests/distributed/conftest.py
new file mode 100644
index 000000000000..d5db71c69513
--- /dev/null
+++ b/tests/distributed/conftest.py
@@ -0,0 +1,8 @@
+import os
+
+TESTS_DIR = os.path.dirname(__file__)
+default_exec_file = os.path.abspath(os.path.join(TESTS_DIR, '..', '..', 'lightgbm'))
+
+
+def pytest_addoption(parser):
+    parser.addoption('--execfile', action='store', default=default_exec_file)
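+
+# Example invocation (the binary path is an assumption; --execfile defaults to
+# the `lightgbm` CLI binary at the repository root):
+#   pytest tests/distributed/_test_distributed.py --execfile=/path/to/lightgbm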