diff --git a/tests/python_package_test/test_dask.py b/tests/python_package_test/test_dask.py
index 8273bdd4439f..e69d0fc941d7 100644
--- a/tests/python_package_test/test_dask.py
+++ b/tests/python_package_test/test_dask.py
@@ -1,30 +1,30 @@
 # coding: utf-8
 """Tests for lightgbm.dask module"""
 
-import itertools
-import os
 import socket
-import sys
+from itertools import groupby
+from os import getenv
+from sys import platform
 
+import lightgbm as lgb
 import pytest
-if not sys.platform.startswith('linux'):
+if not platform.startswith('linux'):
     pytest.skip('lightgbm.dask is currently supported in Linux environments', allow_module_level=True)
+if not lgb.compat.DASK_INSTALLED:
+    pytest.skip('Dask is not installed', allow_module_level=True)
 
 import dask.array as da
 import dask.dataframe as dd
 import numpy as np
 import pandas as pd
 from scipy.stats import spearmanr
-import scipy.sparse
 from dask.array.utils import assert_eq
 from dask_ml.metrics import accuracy_score, r2_score
 from distributed.utils_test import client, cluster_fixture, gen_cluster, loop
+from scipy.sparse import csr_matrix
 from sklearn.datasets import make_blobs, make_regression
 from sklearn.utils import check_random_state
 
-import lightgbm
-import lightgbm.dask as dlgbm
-
 from .utils import make_ranking
@@ -33,8 +33,8 @@ group_sizes = [5, 5, 5, 10, 10, 10, 20, 20, 20, 50, 50]
 
 pytestmark = [
-    pytest.mark.skipif(os.getenv('TASK', '') == 'mpi', reason='Fails to run with MPI interface'),
-    pytest.mark.skipif(os.getenv('TASK', '') == 'gpu', reason='Fails to run with GPU interface')
+    pytest.mark.skipif(getenv('TASK', '') == 'mpi', reason='Fails to run with MPI interface'),
+    pytest.mark.skipif(getenv('TASK', '') == 'gpu', reason='Fails to run with GPU interface')
 ]
@@ -51,7 +51,7 @@ def _create_ranking_data(n_samples=100, output='array', chunk_size=50, **kwargs)
     X, y, g = make_ranking(n_samples=n_samples, random_state=42, **kwargs)
     rnd = np.random.RandomState(42)
     w = rnd.rand(X.shape[0]) * 0.01
-    g_rle = np.array([len(list(grp)) for _, grp in itertools.groupby(g)])
+    g_rle = np.array([len(list(grp)) for _, grp in groupby(g)])
 
     if output == 'dataframe':
         # add target, weight, and group to DataFrame so that partitions abide by group boundaries.
@@ -115,7 +115,7 @@ def _create_data(objective, n_samples=100, centers=2, output='array', chunk_size
         dy = dd.from_pandas(y_df, chunksize=chunk_size)
         dw = dd.from_array(weights, chunksize=chunk_size)
     elif output == 'scipy_csr_matrix':
-        dX = da.from_array(X, chunks=(chunk_size, X.shape[1])).map_blocks(scipy.sparse.csr_matrix)
+        dX = da.from_array(X, chunks=(chunk_size, X.shape[1])).map_blocks(csr_matrix)
         dy = da.from_array(y, chunks=chunk_size)
         dw = da.from_array(weights, chunk_size)
     else:
@@ -137,7 +137,7 @@ def test_classifier(output, centers, client, listen_port):
         "n_estimators": 10,
         "num_leaves": 10
     }
-    dask_classifier = dlgbm.DaskLGBMClassifier(
+    dask_classifier = lgb.DaskLGBMClassifier(
         time_out=5,
         local_listen_port=listen_port,
         **params
@@ -148,7 +148,7 @@ def test_classifier(output, centers, client, listen_port):
     s1 = accuracy_score(dy, p1)
     p1 = p1.compute()
 
-    local_classifier = lightgbm.LGBMClassifier(**params)
+    local_classifier = lgb.LGBMClassifier(**params)
     local_classifier.fit(X, y, sample_weight=w)
     p2 = local_classifier.predict(X)
     p2_proba = local_classifier.predict_proba(X)
@@ -176,7 +176,7 @@ def test_classifier_pred_contrib(output, centers, client, listen_port):
         "n_estimators": 10,
         "num_leaves": 10
     }
-    dask_classifier = dlgbm.DaskLGBMClassifier(
+    dask_classifier = lgb.DaskLGBMClassifier(
         time_out=5,
         local_listen_port=listen_port,
         tree_learner='data',
@@ -185,7 +185,7 @@ def test_classifier_pred_contrib(output, centers, client, listen_port):
     dask_classifier = dask_classifier.fit(dX, dy, sample_weight=dw, client=client)
     preds_with_contrib = dask_classifier.predict(dX, pred_contrib=True).compute()
 
-    local_classifier = lightgbm.LGBMClassifier(**params)
+    local_classifier = lgb.LGBMClassifier(**params)
     local_classifier.fit(X, y, sample_weight=w)
     local_preds_with_contrib = local_classifier.predict(X, pred_contrib=True)
@@ -222,7 +222,7 @@ def test_training_does_not_fail_on_port_conflicts(client):
     with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
         s.bind(('127.0.0.1', 12400))
 
-        dask_classifier = dlgbm.DaskLGBMClassifier(
+        dask_classifier = lgb.DaskLGBMClassifier(
            time_out=5,
            local_listen_port=12400,
            n_estimators=5,
@@ -250,7 +250,7 @@ def test_classifier_local_predict(client, listen_port):
         "n_estimators": 10,
         "num_leaves": 10
     }
-    dask_classifier = dlgbm.DaskLGBMClassifier(
+    dask_classifier = lgb.DaskLGBMClassifier(
         time_out=5,
         local_port=listen_port,
         **params
@@ -258,7 +258,7 @@ def test_classifier_local_predict(client, listen_port):
     dask_classifier = dask_classifier.fit(dX, dy, sample_weight=dw, client=client)
     p1 = dask_classifier.to_local().predict(dX)
 
-    local_classifier = lightgbm.LGBMClassifier(**params)
+    local_classifier = lgb.LGBMClassifier(**params)
     local_classifier.fit(X, y, sample_weight=w)
     p2 = local_classifier.predict(X)
@@ -280,7 +280,7 @@ def test_regressor(output, client, listen_port):
         "random_state": 42,
         "num_leaves": 10
     }
-    dask_regressor = dlgbm.DaskLGBMRegressor(
+    dask_regressor = lgb.DaskLGBMRegressor(
         time_out=5,
         local_listen_port=listen_port,
         tree='data',
@@ -292,7 +292,7 @@ def test_regressor(output, client, listen_port):
     s1 = r2_score(dy, p1)
     p1 = p1.compute()
 
-    local_regressor = lightgbm.LGBMRegressor(**params)
+    local_regressor = lgb.LGBMRegressor(**params)
     local_regressor.fit(X, y, sample_weight=w)
     s2 = local_regressor.score(X, y)
     p2 = local_regressor.predict(X)
@@ -319,7 +319,7 @@ def test_regressor_pred_contrib(output, client, listen_port):
         "n_estimators": 10,
         "num_leaves": 10
     }
-    dask_regressor = dlgbm.DaskLGBMRegressor(
+    dask_regressor = lgb.DaskLGBMRegressor(
         time_out=5,
         local_listen_port=listen_port,
         tree_learner='data',
@@ -328,7 +328,7 @@ def test_regressor_pred_contrib(output, client, listen_port):
     dask_regressor = dask_regressor.fit(dX, dy, sample_weight=dw, client=client)
     preds_with_contrib = dask_regressor.predict(dX, pred_contrib=True).compute()
 
-    local_regressor = lightgbm.LGBMRegressor(**params)
+    local_regressor = lgb.LGBMRegressor(**params)
     local_regressor.fit(X, y, sample_weight=w)
     local_preds_with_contrib = local_regressor.predict(X, pred_contrib=True)
@@ -357,7 +357,7 @@ def test_regressor_quantile(output, client, listen_port, alpha):
         "n_estimators": 10,
         "num_leaves": 10
     }
-    dask_regressor = dlgbm.DaskLGBMRegressor(
+    dask_regressor = lgb.DaskLGBMRegressor(
         local_listen_port=listen_port,
         tree_learner_type='data_parallel',
         **params
@@ -366,7 +366,7 @@ def test_regressor_quantile(output, client, listen_port, alpha):
     p1 = dask_regressor.predict(dX).compute()
     q1 = np.count_nonzero(y < p1) / y.shape[0]
 
-    local_regressor = lightgbm.LGBMRegressor(**params)
+    local_regressor = lgb.LGBMRegressor(**params)
     local_regressor.fit(X, y, sample_weight=w)
     p2 = local_regressor.predict(X)
     q2 = np.count_nonzero(y < p2) / y.shape[0]
@@ -381,7 +381,7 @@ def test_regressor_quantile(output, client, listen_port, alpha):
 def test_regressor_local_predict(client, listen_port):
     X, y, _, dX, dy, dw = _create_data('regression', output='array')
 
-    dask_regressor = dlgbm.DaskLGBMRegressor(
+    dask_regressor = lgb.DaskLGBMRegressor(
         local_listen_port=listen_port,
         random_state=42,
         n_estimators=10,
@@ -419,7 +419,7 @@ def test_ranker(output, client, listen_port, group):
         "num_leaves": 20,
         "min_child_samples": 1
     }
-    dask_ranker = dlgbm.DaskLGBMRanker(
+    dask_ranker = lgb.DaskLGBMRanker(
         time_out=5,
         local_listen_port=listen_port,
         tree_learner_type='data_parallel',
@@ -429,7 +429,7 @@ def test_ranker(output, client, listen_port, group):
     rnkvec_dask = dask_ranker.predict(dX)
     rnkvec_dask = rnkvec_dask.compute()
 
-    local_ranker = lightgbm.LGBMRanker(**params)
+    local_ranker = lgb.LGBMRanker(**params)
     local_ranker.fit(X, y, sample_weight=w, group=g)
     rnkvec_local = local_ranker.predict(X)
@@ -451,7 +451,7 @@ def test_ranker_local_predict(output, client, listen_port, group):
         group=group
     )
 
-    dask_ranker = dlgbm.DaskLGBMRanker(
+    dask_ranker = lgb.DaskLGBMRanker(
         time_out=5,
         local_listen_port=listen_port,
         tree_learner='data',
@@ -475,7 +475,7 @@ def test_find_open_port_works():
     worker_ip = '127.0.0.1'
     with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
         s.bind((worker_ip, 12400))
-        new_port = dlgbm._find_open_port(
+        new_port = lgb.dask._find_open_port(
            worker_ip=worker_ip,
            local_listen_port=12400,
            ports_to_skip=set()
@@ -486,7 +486,7 @@ def test_find_open_port_works():
         s_1.bind((worker_ip, 12400))
         with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s_2:
             s_2.bind((worker_ip, 12401))
-            new_port = dlgbm._find_open_port(
+            new_port = lgb.dask._find_open_port(
                 worker_ip=worker_ip,
                 local_listen_port=12400,
                 ports_to_skip=set()
@@ -502,11 +502,11 @@ def f(part):
     df = dd.demo.make_timeseries()
     df = df.map_partitions(f, meta=df._meta)
     with pytest.raises(Exception) as info:
-        yield dlgbm._train(
+        yield lgb.dask._train(
             client=c,
             data=df,
             label=df.x,
             params={},
-            model_factory=lightgbm.LGBMClassifier
+            model_factory=lgb.LGBMClassifier
         )
     assert 'foo' in str(info.value)
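
Note (not part of the patch): a minimal usage sketch of the estimator-level API these tests now exercise through lgb.DaskLGBMClassifier and friends. Only the estimator names, the client= keyword on fit, .predict(...).compute(), and to_local() are taken from the tests above; the cluster setup and parameter values are illustrative assumptions.

    # Illustrative sketch, not from the patch: the workflow the updated tests follow.
    import dask.array as da
    from distributed import Client, LocalCluster
    from sklearn.datasets import make_blobs

    import lightgbm as lgb

    cluster = LocalCluster(n_workers=2)  # assumption: a small local cluster for demonstration
    client = Client(cluster)

    X, y = make_blobs(n_samples=1000, centers=2, random_state=42)
    dX = da.from_array(X, chunks=(100, X.shape[1]))
    dy = da.from_array(y, chunks=100)

    dask_clf = lgb.DaskLGBMClassifier(n_estimators=10, num_leaves=10)
    dask_clf = dask_clf.fit(dX, dy, client=client)  # tests pass the Dask client explicitly

    preds = dask_clf.predict(dX).compute()  # distributed prediction, materialized locally
    local_clf = dask_clf.to_local()         # plain LGBMClassifier, as in the *_local_predict tests
    print(preds[:5], local_clf.predict(X)[:5])

    client.close()
    cluster.close()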