Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dask][tests] skip Dask tests when Dask is not installed and improve imports in Dask tests #3852

Merged
merged 2 commits into from
Jan 25, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 33 additions & 33 deletions tests/python_package_test/test_dask.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,30 @@
# coding: utf-8
"""Tests for lightgbm.dask module"""

import itertools
import os
import socket
import sys
from itertools import groupby
from os import getenv
from sys import platform

import lightgbm as lgb
import pytest
if not sys.platform.startswith('linux'):
if not platform.startswith('linux'):
pytest.skip('lightgbm.dask is currently supported in Linux environments', allow_module_level=True)
if not lgb.compat.DASK_INSTALLED:
pytest.skip('Dask is not installed', allow_module_level=True)

import dask.array as da
import dask.dataframe as dd
import numpy as np
import pandas as pd
from scipy.stats import spearmanr
import scipy.sparse
from dask.array.utils import assert_eq
from dask_ml.metrics import accuracy_score, r2_score
from distributed.utils_test import client, cluster_fixture, gen_cluster, loop
from scipy.sparse import csr_matrix
from sklearn.datasets import make_blobs, make_regression
from sklearn.utils import check_random_state

import lightgbm
import lightgbm.dask as dlgbm

from .utils import make_ranking


Expand All @@ -33,8 +33,8 @@
group_sizes = [5, 5, 5, 10, 10, 10, 20, 20, 20, 50, 50]

pytestmark = [
pytest.mark.skipif(os.getenv('TASK', '') == 'mpi', reason='Fails to run with MPI interface'),
pytest.mark.skipif(os.getenv('TASK', '') == 'gpu', reason='Fails to run with GPU interface')
pytest.mark.skipif(getenv('TASK', '') == 'mpi', reason='Fails to run with MPI interface'),
pytest.mark.skipif(getenv('TASK', '') == 'gpu', reason='Fails to run with GPU interface')
]


Expand All @@ -51,7 +51,7 @@ def _create_ranking_data(n_samples=100, output='array', chunk_size=50, **kwargs)
X, y, g = make_ranking(n_samples=n_samples, random_state=42, **kwargs)
rnd = np.random.RandomState(42)
w = rnd.rand(X.shape[0]) * 0.01
g_rle = np.array([len(list(grp)) for _, grp in itertools.groupby(g)])
g_rle = np.array([len(list(grp)) for _, grp in groupby(g)])

if output == 'dataframe':
# add target, weight, and group to DataFrame so that partitions abide by group boundaries.
Expand Down Expand Up @@ -115,7 +115,7 @@ def _create_data(objective, n_samples=100, centers=2, output='array', chunk_size
dy = dd.from_pandas(y_df, chunksize=chunk_size)
dw = dd.from_array(weights, chunksize=chunk_size)
elif output == 'scipy_csr_matrix':
dX = da.from_array(X, chunks=(chunk_size, X.shape[1])).map_blocks(scipy.sparse.csr_matrix)
dX = da.from_array(X, chunks=(chunk_size, X.shape[1])).map_blocks(csr_matrix)
dy = da.from_array(y, chunks=chunk_size)
dw = da.from_array(weights, chunk_size)
else:
Expand All @@ -137,7 +137,7 @@ def test_classifier(output, centers, client, listen_port):
"n_estimators": 10,
"num_leaves": 10
}
dask_classifier = dlgbm.DaskLGBMClassifier(
dask_classifier = lgb.DaskLGBMClassifier(
time_out=5,
local_listen_port=listen_port,
**params
Expand All @@ -148,7 +148,7 @@ def test_classifier(output, centers, client, listen_port):
s1 = accuracy_score(dy, p1)
p1 = p1.compute()

local_classifier = lightgbm.LGBMClassifier(**params)
local_classifier = lgb.LGBMClassifier(**params)
local_classifier.fit(X, y, sample_weight=w)
p2 = local_classifier.predict(X)
p2_proba = local_classifier.predict_proba(X)
Expand Down Expand Up @@ -176,7 +176,7 @@ def test_classifier_pred_contrib(output, centers, client, listen_port):
"n_estimators": 10,
"num_leaves": 10
}
dask_classifier = dlgbm.DaskLGBMClassifier(
dask_classifier = lgb.DaskLGBMClassifier(
time_out=5,
local_listen_port=listen_port,
tree_learner='data',
Expand All @@ -185,7 +185,7 @@ def test_classifier_pred_contrib(output, centers, client, listen_port):
dask_classifier = dask_classifier.fit(dX, dy, sample_weight=dw, client=client)
preds_with_contrib = dask_classifier.predict(dX, pred_contrib=True).compute()

local_classifier = lightgbm.LGBMClassifier(**params)
local_classifier = lgb.LGBMClassifier(**params)
local_classifier.fit(X, y, sample_weight=w)
local_preds_with_contrib = local_classifier.predict(X, pred_contrib=True)

Expand Down Expand Up @@ -222,7 +222,7 @@ def test_training_does_not_fail_on_port_conflicts(client):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(('127.0.0.1', 12400))

dask_classifier = dlgbm.DaskLGBMClassifier(
dask_classifier = lgb.DaskLGBMClassifier(
time_out=5,
local_listen_port=12400,
n_estimators=5,
Expand Down Expand Up @@ -250,15 +250,15 @@ def test_classifier_local_predict(client, listen_port):
"n_estimators": 10,
"num_leaves": 10
}
dask_classifier = dlgbm.DaskLGBMClassifier(
dask_classifier = lgb.DaskLGBMClassifier(
time_out=5,
local_port=listen_port,
**params
)
dask_classifier = dask_classifier.fit(dX, dy, sample_weight=dw, client=client)
p1 = dask_classifier.to_local().predict(dX)

local_classifier = lightgbm.LGBMClassifier(**params)
local_classifier = lgb.LGBMClassifier(**params)
local_classifier.fit(X, y, sample_weight=w)
p2 = local_classifier.predict(X)

Expand All @@ -280,7 +280,7 @@ def test_regressor(output, client, listen_port):
"random_state": 42,
"num_leaves": 10
}
dask_regressor = dlgbm.DaskLGBMRegressor(
dask_regressor = lgb.DaskLGBMRegressor(
time_out=5,
local_listen_port=listen_port,
tree='data',
Expand All @@ -292,7 +292,7 @@ def test_regressor(output, client, listen_port):
s1 = r2_score(dy, p1)
p1 = p1.compute()

local_regressor = lightgbm.LGBMRegressor(**params)
local_regressor = lgb.LGBMRegressor(**params)
local_regressor.fit(X, y, sample_weight=w)
s2 = local_regressor.score(X, y)
p2 = local_regressor.predict(X)
Expand All @@ -319,7 +319,7 @@ def test_regressor_pred_contrib(output, client, listen_port):
"n_estimators": 10,
"num_leaves": 10
}
dask_regressor = dlgbm.DaskLGBMRegressor(
dask_regressor = lgb.DaskLGBMRegressor(
time_out=5,
local_listen_port=listen_port,
tree_learner='data',
Expand All @@ -328,7 +328,7 @@ def test_regressor_pred_contrib(output, client, listen_port):
dask_regressor = dask_regressor.fit(dX, dy, sample_weight=dw, client=client)
preds_with_contrib = dask_regressor.predict(dX, pred_contrib=True).compute()

local_regressor = lightgbm.LGBMRegressor(**params)
local_regressor = lgb.LGBMRegressor(**params)
local_regressor.fit(X, y, sample_weight=w)
local_preds_with_contrib = local_regressor.predict(X, pred_contrib=True)

Expand Down Expand Up @@ -357,7 +357,7 @@ def test_regressor_quantile(output, client, listen_port, alpha):
"n_estimators": 10,
"num_leaves": 10
}
dask_regressor = dlgbm.DaskLGBMRegressor(
dask_regressor = lgb.DaskLGBMRegressor(
local_listen_port=listen_port,
tree_learner_type='data_parallel',
**params
Expand All @@ -366,7 +366,7 @@ def test_regressor_quantile(output, client, listen_port, alpha):
p1 = dask_regressor.predict(dX).compute()
q1 = np.count_nonzero(y < p1) / y.shape[0]

local_regressor = lightgbm.LGBMRegressor(**params)
local_regressor = lgb.LGBMRegressor(**params)
local_regressor.fit(X, y, sample_weight=w)
p2 = local_regressor.predict(X)
q2 = np.count_nonzero(y < p2) / y.shape[0]
Expand All @@ -381,7 +381,7 @@ def test_regressor_quantile(output, client, listen_port, alpha):
def test_regressor_local_predict(client, listen_port):
X, y, _, dX, dy, dw = _create_data('regression', output='array')

dask_regressor = dlgbm.DaskLGBMRegressor(
dask_regressor = lgb.DaskLGBMRegressor(
local_listen_port=listen_port,
random_state=42,
n_estimators=10,
Expand Down Expand Up @@ -419,7 +419,7 @@ def test_ranker(output, client, listen_port, group):
"num_leaves": 20,
"min_child_samples": 1
}
dask_ranker = dlgbm.DaskLGBMRanker(
dask_ranker = lgb.DaskLGBMRanker(
time_out=5,
local_listen_port=listen_port,
tree_learner_type='data_parallel',
Expand All @@ -429,7 +429,7 @@ def test_ranker(output, client, listen_port, group):
rnkvec_dask = dask_ranker.predict(dX)
rnkvec_dask = rnkvec_dask.compute()

local_ranker = lightgbm.LGBMRanker(**params)
local_ranker = lgb.LGBMRanker(**params)
local_ranker.fit(X, y, sample_weight=w, group=g)
rnkvec_local = local_ranker.predict(X)

Expand All @@ -451,7 +451,7 @@ def test_ranker_local_predict(output, client, listen_port, group):
group=group
)

dask_ranker = dlgbm.DaskLGBMRanker(
dask_ranker = lgb.DaskLGBMRanker(
time_out=5,
local_listen_port=listen_port,
tree_learner='data',
Expand All @@ -475,7 +475,7 @@ def test_find_open_port_works():
worker_ip = '127.0.0.1'
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind((worker_ip, 12400))
new_port = dlgbm._find_open_port(
new_port = lgb.dask._find_open_port(
worker_ip=worker_ip,
local_listen_port=12400,
ports_to_skip=set()
Expand All @@ -486,7 +486,7 @@ def test_find_open_port_works():
s_1.bind((worker_ip, 12400))
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s_2:
s_2.bind((worker_ip, 12401))
new_port = dlgbm._find_open_port(
new_port = lgb.dask._find_open_port(
worker_ip=worker_ip,
local_listen_port=12400,
ports_to_skip=set()
Expand All @@ -502,11 +502,11 @@ def f(part):
df = dd.demo.make_timeseries()
df = df.map_partitions(f, meta=df._meta)
with pytest.raises(Exception) as info:
yield dlgbm._train(
yield lgb.dask._train(
client=c,
data=df,
label=df.x,
params={},
model_factory=lightgbm.LGBMClassifier
model_factory=lgb.LGBMClassifier
)
assert 'foo' in str(info.value)