Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dask] include multiclass-classification task in tests #4048

Merged
merged 4 commits into from
Mar 10, 2021
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 46 additions & 59 deletions tests/python_package_test/test_dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,21 @@
# see https://distributed.dask.org/en/latest/api.html#distributed.Client.close
CLIENT_CLOSE_TIMEOUT = 120

tasks = ['classification', 'regression', 'ranking']
tasks = ['binary-classification', 'multiclass-classification', 'regression', 'ranking']
data_output = ['array', 'scipy_csr_matrix', 'dataframe', 'dataframe-with-categorical']
data_centers = [[[-4, -4], [4, 4]], [[-4, -4], [4, 4], [-4, 4]]]
group_sizes = [5, 5, 5, 10, 10, 10, 20, 20, 20, 50, 50]
# Map each task name to the Dask estimator class that trains it.
# Both classification flavors share DaskLGBMClassifier; the task name
# only controls how the synthetic data is generated (number of centers).
task_to_dask_factory = {
    'regression': lgb.DaskLGBMRegressor,
    'binary-classification': lgb.DaskLGBMClassifier,
    'multiclass-classification': lgb.DaskLGBMClassifier,
    'ranking': lgb.DaskLGBMRanker
}
# Single-machine counterparts of the Dask factories above — tests look up
# both (see dask_model_factory / local_model_factory usage) to compare
# distributed training against local training on the same task.
task_to_local_factory = {
    'regression': lgb.LGBMRegressor,
    'binary-classification': lgb.LGBMClassifier,
    'multiclass-classification': lgb.LGBMClassifier,
    'ranking': lgb.LGBMRanker
}

pytestmark = [
pytest.mark.skipif(getenv('TASK', '') == 'mpi', reason='Fails to run with MPI interface'),
Expand Down Expand Up @@ -120,8 +131,14 @@ def _create_ranking_data(n_samples=100, output='array', chunk_size=50, **kwargs)
return X, y, w, g_rle, dX, dy, dw, dg


def _create_data(objective, n_samples=100, centers=2, output='array', chunk_size=50):
if objective == 'classification':
def _create_data(objective, n_samples=100, output='array', chunk_size=50):
if objective.endswith('classification'):
if objective == 'binary-classification':
centers = 2
elif objective == 'multiclass-classification':
centers = 3
else:
raise ValueError(f"Unknown classification task '{objective}'")
X, y = make_blobs(n_samples=n_samples, centers=centers, random_state=42)
elif objective == 'regression':
X, y = make_regression(n_samples=n_samples, random_state=42)
Expand Down Expand Up @@ -206,12 +223,11 @@ def _unpickle(filepath, serializer):


@pytest.mark.parametrize('output', data_output)
@pytest.mark.parametrize('centers', data_centers)
def test_classifier(output, centers, client):
@pytest.mark.parametrize('task', ['binary-classification', 'multiclass-classification'])
def test_classifier(output, task, client):
X, y, w, dX, dy, dw = _create_data(
objective='classification',
output=output,
centers=centers
objective=task,
output=output
)

params = {
Expand Down Expand Up @@ -273,12 +289,11 @@ def test_classifier(output, centers, client):


@pytest.mark.parametrize('output', data_output)
@pytest.mark.parametrize('centers', data_centers)
def test_classifier_pred_contrib(output, centers, client):
@pytest.mark.parametrize('task', ['binary-classification', 'multiclass-classification'])
def test_classifier_pred_contrib(output, task, client):
X, y, w, dX, dy, dw = _create_data(
objective='classification',
output=output,
centers=centers
objective=task,
output=output
)

params = {
Expand Down Expand Up @@ -354,7 +369,7 @@ def test_find_random_open_port(client):


def test_training_does_not_fail_on_port_conflicts(client):
_, _, _, dX, dy, dw = _create_data('classification', output='array')
_, _, _, dX, dy, dw = _create_data('binary-classification', output='array')

lightgbm_default_port = 12400
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
Expand Down Expand Up @@ -640,17 +655,13 @@ def test_training_works_if_client_not_provided_or_set_after_construction(task, c
output='array',
group=None
)
model_factory = lgb.DaskLGBMRanker
else:
_, _, _, dX, dy, _ = _create_data(
objective=task,
output='array',
)
dg = None
if task == 'classification':
model_factory = lgb.DaskLGBMClassifier
elif task == 'regression':
model_factory = lgb.DaskLGBMRegressor
model_factory = task_to_dask_factory[task]

params = {
"time_out": 5,
Expand Down Expand Up @@ -744,12 +755,7 @@ def test_model_and_local_version_are_picklable_whether_or_not_client_set_explici
)
dg_2 = None

if task == 'ranking':
model_factory = lgb.DaskLGBMRanker
elif task == 'classification':
model_factory = lgb.DaskLGBMClassifier
elif task == 'regression':
model_factory = lgb.DaskLGBMRegressor
model_factory = task_to_dask_factory[task]

params = {
"time_out": 5,
Expand Down Expand Up @@ -970,21 +976,16 @@ def collection_to_single_partition(collection):
output=output,
group=None
)
dask_model_factory = lgb.DaskLGBMRanker
local_model_factory = lgb.LGBMRanker
else:
X, y, w, dX, dy, dw = _create_data(
objective=task,
output=output
)
g = None
dg = None
if task == 'classification':
dask_model_factory = lgb.DaskLGBMClassifier
local_model_factory = lgb.LGBMClassifier
elif task == 'regression':
dask_model_factory = lgb.DaskLGBMRegressor
local_model_factory = lgb.LGBMRegressor

dask_model_factory = task_to_dask_factory[task]
local_model_factory = task_to_local_factory[task]

dX = collection_to_single_partition(dX)
dy = collection_to_single_partition(dy)
Expand Down Expand Up @@ -1029,18 +1030,15 @@ def test_network_params_not_required_but_respected_if_given(client, task, output
group=None,
chunk_size=10,
)
dask_model_factory = lgb.DaskLGBMRanker
else:
_, _, _, dX, dy, _ = _create_data(
objective=task,
output=output,
chunk_size=10,
)
dg = None
if task == 'classification':
dask_model_factory = lgb.DaskLGBMClassifier
elif task == 'regression':
dask_model_factory = lgb.DaskLGBMRegressor

dask_model_factory = task_to_dask_factory[task]

# rebalance data to be sure that each worker has a piece of the data
if output == 'array':
Expand Down Expand Up @@ -1103,18 +1101,15 @@ def test_machines_should_be_used_if_provided(task, output):
group=None,
chunk_size=10,
)
dask_model_factory = lgb.DaskLGBMRanker
else:
_, _, _, dX, dy, _ = _create_data(
objective=task,
output=output,
chunk_size=10,
)
dg = None
if task == 'classification':
dask_model_factory = lgb.DaskLGBMClassifier
elif task == 'regression':
dask_model_factory = lgb.DaskLGBMRegressor

dask_model_factory = task_to_dask_factory[task]

# rebalance data to be sure that each worker has a piece of the data
if output == 'array':
Expand Down Expand Up @@ -1201,17 +1196,15 @@ def test_training_succeeds_when_data_is_dataframe_and_label_is_column_array(
output='dataframe',
group=None
)
model_factory = lgb.DaskLGBMRanker
else:
_, _, _, dX, dy, dw = _create_data(
objective=task,
output='dataframe',
)
dg = None
if task == 'classification':
model_factory = lgb.DaskLGBMClassifier
elif task == 'regression':
model_factory = lgb.DaskLGBMRegressor

model_factory = task_to_dask_factory[task]

dy = dy.to_dask_array(lengths=True)
dy_col_array = dy.reshape(-1, 1)
assert len(dy_col_array.shape) == 2 and dy_col_array.shape[1] == 1
Expand All @@ -1229,12 +1222,9 @@ def test_training_succeeds_when_data_is_dataframe_and_label_is_column_array(
client.close(timeout=CLIENT_CLOSE_TIMEOUT)


@pytest.mark.parametrize('task', tasks)
@pytest.mark.parametrize('task', ['binary-classification', 'ranking', 'regression'])
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed that test_init_score fails for the multiclass-classification task, so I removed that task from this test; I can open a separate PR adding it back once #4046 is solved.

This PR should include a test for init_score used for multiclass classification. I agree with you that the "pass a 1D array" interface could lead to mistakes in the multiclass classification case, but I don't consider #4046 a bug and I expect that a PR that addresses #4046 would only ADD the ability to pass an array of shape (n_samples, n_classes), not remove the current behavior of passing a 1D array. I expect that it will end up that way so that we don't break existing users' code.

Here's a minimal example showing that the current 1D-array behavior can work in the Dask interface.

# Minimal example: pass init_score for multiclass classification as a flat
# 1D array — one score per class for every row — per the current interface
# discussed in https://github.com/microsoft/LightGBM/issues/4046.
from sklearn.datasets import make_blobs
import lightgbm as lgb
import numpy as np
import dask.array as da
from distributed import Client, LocalCluster

# Two-worker local cluster so the data really is distributed.
cluster = LocalCluster(n_workers=2)
client = Client(cluster)

# 3 centers -> 3 classes (multiclass problem).
X, y = make_blobs(n_samples=1000, n_features=50, centers=3)

dX = da.from_array(X, chunks=(100, 50))
dy = da.from_array(y, chunks=(100,))

# Each label block of size x.size contributes x.size * 3 scores
# (3 = number of classes), keeping init_scores partition-aligned with dy.
init_scores = dy.map_blocks(
    lambda x: np.repeat(0.8, x.size * 3),
    dtype=np.float64
)

# Partition alignment is required so each worker gets matching label/score chunks.
assert init_scores.npartitions == dy.npartitions

dask_model = lgb.DaskLGBMClassifier()
dask_model.fit(dX, dy, init_score=init_scores)

# With a constant init_score the first tree's root value comes out 0.0 —
# evidence the provided init_score was actually consumed during training.
assert dask_model.booster_.trees_to_dataframe()['value'][0] == 0.0

So for this test, can you please just change the init_score setup to something like this?

# init_scores must be a 1D array, even for multiclass classification
# where you need to provide 1 score per class for each row in X
# https://github.com/microsoft/LightGBM/issues/4046
size_factor = 1
if task == "multiclass-classification":
    size_factor = 3

if output.startswith('dataframe'):
    init_scores = dy.map_partitions(
        lambda x: pd_Series(np.repeat(init_score, x.size * size_factor))
    )
else:
    init_scores = dy.map_blocks(
        lambda x: np.repeat(init_score, x.size * size_factor),
        dtype=np.float64
    )

@pytest.mark.parametrize('output', data_output)
def test_init_score(
task,
output,
client):
def test_init_score(task, output, client):
if task == 'ranking' and output == 'scipy_csr_matrix':
pytest.skip('LGBMRanker is not currently tested on sparse matrices')

Expand All @@ -1243,17 +1233,14 @@ def test_init_score(
output=output,
group=None
)
model_factory = lgb.DaskLGBMRanker
else:
_, _, _, dX, dy, dw = _create_data(
objective=task,
output=output,
)
dg = None
if task == 'classification':
model_factory = lgb.DaskLGBMClassifier
elif task == 'regression':
model_factory = lgb.DaskLGBMRegressor

model_factory = task_to_dask_factory[task]

params = {
'n_estimators': 1,
Expand Down