From 2f30d11abe9bf6687a9ed809537e339bb98d8249 Mon Sep 17 00:00:00 2001 From: James Lamb Date: Sun, 28 Feb 2021 21:26:31 -0600 Subject: [PATCH 01/15] [dask] add tutorial documentation (fixes #3814, fixes #3838) --- docs/Parallel-Learning-Guide.rst | 99 ++++++++++++++++++++++ docs/_static/images/dask-concat.svg | 1 + docs/_static/images/dask-initial-setup.svg | 1 + 3 files changed, 101 insertions(+) create mode 100644 docs/_static/images/dask-concat.svg create mode 100644 docs/_static/images/dask-initial-setup.svg diff --git a/docs/Parallel-Learning-Guide.rst b/docs/Parallel-Learning-Guide.rst index 550d8b1dfea2..bbc285b26395 100644 --- a/docs/Parallel-Learning-Guide.rst +++ b/docs/Parallel-Learning-Guide.rst @@ -62,6 +62,99 @@ Dask LightGBM's Python package supports distributed learning via `Dask`_. This integration is maintained by LightGBM's maintainers. +Quick Dask Examples +''''''''''''''''''' + +Dask-Based Training +''''''''''''''''''' + +This section contains detailed information on performing LightGBM distributed training using Dask. + +Configuring the Dask Cluster +**************************** + +**Allocating Threads** + +When setting up a Dask cluster for training, give each Dask worker process at least two threads. If you do not do this, training might be substantially slower because communication work and training work will block each other. + +If you do not have other significant processes competing with Dask for resources, just accept the default ``nthreads`` from your chosen ``dask.distributed`` cluster. + +.. code:: python + + from distributed import Client, LocalCluster + + cluster = LocalCluster(n_workers=3) + client = Client(cluster) + +**Managing Memory** + +Use the Dask diagnostic dashboard or your preferred monitoring tool to monitor Dask workers' memory consumption during training. As described in `the Dask worker documentation`_, Dask workers will automatically start spilling data to Disk if memory consumptio gets too high. This can substantially slow down computations, since disk I/O is usually much slower than reading the same data from memory. + + `At 60% of memory load, [Dask will] spill least recently used data to disk` + +To reduce the risk of hitting memory limits, consider restarting each worker process before running any data loading or training code. + +.. code:: python + + client.restart() + +Setting Up Training Data +************************* + +The estimators in ``lightgbm.dask`` expect that matrix-like or array-like data are provided in Dask DataFrame, Dask Array, or (in some cases) Dask Series format. See `the Dask DataFrame documentation`_ and `the Dask Array documentation`_ for more information on how to create such data structures. + +.. image:: ./_static/images/dask-initial-setup.svg + :align: center + :width: 600px + :alt: On the left, rectangles showing a 5 by 5 grid for a local dataset. On the right, two circles representing Dask workers, one with a 3 by 5 grid and one with a 2 by 5 grid. + :target: ./_static/images/dask-initial-setup.svg + +While setting up for training, ``lightgbm`` will concatenate all of the partitions on a work into a single dataset. Distributed training then proceeds with one LightGBM worker process per Dask worker. + +.. image:: ./_static/images/dask-concat.svg + :align: center + :width: 600px + :alt: A section labeled "before" showing two grids and a section labeled "after" showing a single grid that looks like the two from "before" stacked one on top of the other. 
+ :target: ./_static/images/dask-concat.svg + +When setting up data partitioning for LightGBM training with Dask, try to follow these suggestions: + +* ensure that each worker in the cluster has a piece of the training data +* try to give each worker roughly the same amount of data, especially if your dataset is small +* if you plan to train multiple models (for example, to tune hyperparameters) on the same data use ``distributed.Client.persist()`` before training to materialize the data one time + +Using a Specific Dask Client +**************************** + +In most situations, you should not need to tell ``lightgbm.dask`` to use a specific Dask client. By default, whenever you use code from that module LightGBM will call ``distributed.default_client()`` to find the most recent created client. + +However, you might want to explicitly control the Dask client used by LightGBM if you have multiple active clients in the same session. This is useful in more complex workflows like running multiple training jobs on different Dask clusters. + +LightGBM's Dask estimators support setting an attribute ``client`` to control the client that is used. + +.. code:: python + + import lightgbm as lgb + from distributed import LocalCluster, Client + + cluster = LocalCluster() + client = Client(cluster) + + # option 1: keyword argumentt in constructor + clf = lgb.DaskLGBMClassifier(client=client) + + # option 2: set_params() after construction + clf = lgb.DaskLGBMClassifier() + clf.set_params(client=client) + +Note that the ``client`` for an estimator will not be stored if the model object is pickled. If you want to control the client used by a model object loaded from disk, use ``set_params()`` after loading. For more details on that, see `Saving Dask Models <#saving-dask-models>`__. + +Using Specific Ports +******************** + +Saving Dask Models +'''''''''''''''''' + Kubeflow ^^^^^^^^ @@ -175,6 +268,12 @@ Example .. _this MMLSpark example: https://github.com/Azure/mmlspark/blob/master/notebooks/samples/LightGBM%20-%20Quantile%20Regression%20for%20Drug%20Discovery.ipynb +.. _the Dask Array documentation: https://docs.dask.org/en/latest/array.html + +.. _the Dask DataFrame documentation: https://docs.dask.org/en/latest/dataframe.html + +.. _the Dask worker documentation: https://distributed.dask.org/en/latest/worker.html#memory-management + .. _the MMLSpark Documentation: https://github.com/Azure/mmlspark/blob/master/docs/lightgbm.md .. 
_Kubeflow Fairing: https://www.kubeflow.org/docs/components/fairing/fairing-overview diff --git a/docs/_static/images/dask-concat.svg b/docs/_static/images/dask-concat.svg new file mode 100644 index 000000000000..a230535d50c2 --- /dev/null +++ b/docs/_static/images/dask-concat.svg @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/docs/_static/images/dask-initial-setup.svg b/docs/_static/images/dask-initial-setup.svg new file mode 100644 index 000000000000..5ffe85b87397 --- /dev/null +++ b/docs/_static/images/dask-initial-setup.svg @@ -0,0 +1 @@ + \ No newline at end of file From a03ff42b80f87720ac38cf01b7de06354ecbefc1 Mon Sep 17 00:00:00 2001 From: James Lamb Date: Sun, 28 Feb 2021 23:47:35 -0600 Subject: [PATCH 02/15] add notes on saving the model --- docs/Parallel-Learning-Guide.rst | 156 ++++++++++++++++++++++++++++++- 1 file changed, 152 insertions(+), 4 deletions(-) diff --git a/docs/Parallel-Learning-Guide.rst b/docs/Parallel-Learning-Guide.rst index bbc285b26395..9a306198793c 100644 --- a/docs/Parallel-Learning-Guide.rst +++ b/docs/Parallel-Learning-Guide.rst @@ -126,7 +126,7 @@ When setting up data partitioning for LightGBM training with Dask, try to follow Using a Specific Dask Client **************************** -In most situations, you should not need to tell ``lightgbm.dask`` to use a specific Dask client. By default, whenever you use code from that module LightGBM will call ``distributed.default_client()`` to find the most recent created client. +In most situations, you should not need to tell ``lightgbm.dask`` to use a specific Dask client. By default, the client returned by ``distributed.default_client()`` will be used. However, you might want to explicitly control the Dask client used by LightGBM if you have multiple active clients in the same session. This is useful in more complex workflows like running multiple training jobs on different Dask clusters. @@ -141,20 +141,168 @@ LightGBM's Dask estimators support setting an attribute ``client`` to control th client = Client(cluster) # option 1: keyword argumentt in constructor - clf = lgb.DaskLGBMClassifier(client=client) + dask_model = lgb.DaskLGBMClassifier(client=client) # option 2: set_params() after construction - clf = lgb.DaskLGBMClassifier() - clf.set_params(client=client) + dask_model = lgb.DaskLGBMClassifier() + dask_model.set_params(client=client) Note that the ``client`` for an estimator will not be stored if the model object is pickled. If you want to control the client used by a model object loaded from disk, use ``set_params()`` after loading. For more details on that, see `Saving Dask Models <#saving-dask-models>`__. Using Specific Ports ******************** +At the beginning of training, ``lightgbm.dask`` sets up a LightGBM network where each Dask worker runs one long-running task that acts as a LightGBM worker. During training, LightGBM workers communicate with each other over TCP sockets. By default, random open ports are used when creating these sockets. + +If the communication between Dask workers in the cluster used for training is restricted by firewall rules, you must tell LightGBM exactly what ports to use. + +**Option 1: Provide a specific list of addresses and ports** + +LightGBM supports a parameter ``machines``, a comma-delimited string where each entry refers to one worker (host name or IP) and a port that that worker will accept connections on. If you provide this parameter to the estimators in ``lightgbm.dask``, LightGBM will not search randomly for ports. 
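+
+If the cluster is already running, one way to construct such a string (just a sketch, assuming one Dask worker process per host and a single port, 13000 here, that you know is open on every host) is to read the worker addresses from the Dask client.
+
+.. code:: python
+
+    from urllib.parse import urlparse
+
+    # ``client`` is assumed to be an existing ``distributed.Client``
+    # connected to the cluster you will train on
+    worker_addresses = client.scheduler_info()["workers"].keys()
+    hosts = sorted({urlparse(address).hostname for address in worker_addresses})
+    machines = ",".join(f"{host}:13000" for host in hosts)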
+ +For example, consider the case where you are running one Dask worker process on each of the following IP addresses: + +:: + + 10.0.1.0 + 10.0.2.0 + 10.0.3.0 + +You could edit your firewall rules to open one additional port on each of these hosts, then provide ``machines`` directly. + +.. code:: python + + import lightgbm as lgb + + machines = "10.0.1.0:12401,10.0.2.0:12402,10.0.3.0:15000" + dask_model = lgb.DaskLGBMRegressor(machines=machines) + +If you are running multiple Dask worker processes on any machine, be sure that there are multiple entries for that IP address, with different ports. For example, if you were running a cluster with ``nprocs=2`` (2 Dask worker processes per machine), you might open two additional ports on each of these hosts, then provide ``machines`` as follows. + +.. code:: python + + import lightgbm as lgb + + machines = ",".join([ + "10.0.1.0:16000", + "10.0.1.0:16001", + "10.0.2.0:16000", + "10.0.2.0:16001", + ]) + dask_model = lgb.DaskLGBMRegressor(machines=machines) + +.. warning:: + + Providing ``machines`` gives you complete control over the networking details of training, but it also makes the training process fragile. Training will fail if you use ``machines`` and any of the following are true: + + * any of the ports mentioned in ``machines`` are not open when training begins + * some partitions of the training data are held by machines that that are not present in ``machines`` + * some machines mentioned in ``machines`` do not hold any of the training data + +**Option 2: specify one port to use on every worker** + +If you are only running one Dask worker process on each host, and if you can reliably identify a port that is open on every host, using ``machines`` is unnecessarily complicated. If ``local_listen_port`` is given and ``machines`` is not, LightGBM will not search for ports randomly, but it will limit the list of addresses in the LightGBM network to those Dask workers that have a piece of the training data. + +For example, consider the case where you are running one Dask worker process on each of the following IP addresses: + +:: + + 10.0.1.0 + 10.0.2.0 + 10.0.3.0 + +You could edit your firewall rules to allow communication between any of the workers over one port, then provide that port via parameter ``local_listen_port``. + +.. code:: python + + import lightgbm as lgb + + dask_model = lgb.DaskLGBMRegressor(local_listen_port=12400) + +.. warning:: + + Providing ``local_listen_port`` is slightly less fragile than ``machines`` because LightGBM will automatically figure out which workers have pieces of the training data. However, using this method, training can fail if any of the following are true: + + * the port ``local_listen_port`` is not open on any of the worker hosts + * any machine has multiple Dask worker processes running on it + Saving Dask Models '''''''''''''''''' +After training with Dask, you have several options for saving a fitted model. + +**Option 1: pickle the Dask estimator** + +LightGBM's Dask estimators can be pickled directly with ``cloudpickle``, ``joblib``, or ``pickle``. + +.. 
code:: python + + import dask.array as da + import pickle + import lightgbm as lgb + from distributed import Client, LocalCluster + + cluster = LocalCluster(n_workers=2) + client = Client(cluster) + + X = da.random.random((1000, 10), (500, 10)) + y = da.random.random((1000,)) + + dask_model = lgb.DaskLGBMRegressor() + dask_model.fit(X, y) + + with open("dask-model.pkl", "wb") as f: + pickle.dump(dask_model, f) + +A model saved this way can then later be loaded with whichever serialization library you used to save it. + +.. code:: python + + import pickle + with open("dask-model.pkl", "rb") as f: + dask_model = pickle.load(f) + +.. note:: + + If you explicitly set a Dask client (see `Using a Specific Dask Client <#using-a-specific-dask-client>`__), it will not be saved when pickling the estimator. When loading a Dask estimator from disk, if you need to use a specific client you can add it after loading with ``dask_model.set_params(client=client)``. + +**Option 2: pickle the sklearn estimator** + +The estimators available from ``lightgbm.dask`` can be converted to an instance of the equivalent class from ``lightgbm.sklearn``. Choosing this option allows you to use Dask for training but avoid depending on any Dask libraries at scoring time. + +.. code:: python + + import dask.array as da + import pickle + import lightgbm as lgb + from distributed import Client, LocalCluster + + cluster = LocalCluster(n_workers=2) + client = Client(cluster) + + X = da.random.random((1000, 10), (500, 10)) + y = da.random.random((1000,)) + + dask_model = lgb.DaskLGBMRegressor() + dask_model.fit(X, y) + + # convert to sklearn equivalent + sklearn_model = dask_model.to_local() + + print(type(sklearn_model)) + #> lightgbm.sklearn.LGBMRegressor + + with open("sklearn-model.pkl", "wb") as f: + pickle.dump(sklearn_model, f) + +A model saved this way can then later be loaded with whichever serialization library you used to save it. + +.. code:: python + + import pickle + with open("sklearn-model.pkl", "rb") as f: + sklearn_model = pickle.load(f) + Kubeflow ^^^^^^^^ From 55b9c98ee93475041c70e5aef7ec9cb19696f997 Mon Sep 17 00:00:00 2001 From: James Lamb Date: Wed, 3 Mar 2021 00:23:50 -0600 Subject: [PATCH 03/15] quick start examples --- docs/Parallel-Learning-Guide.rst | 131 +++++++++++++++++++++++++++++-- 1 file changed, 126 insertions(+), 5 deletions(-) diff --git a/docs/Parallel-Learning-Guide.rst b/docs/Parallel-Learning-Guide.rst index 9a306198793c..2d728ec9f39a 100644 --- a/docs/Parallel-Learning-Guide.rst +++ b/docs/Parallel-Learning-Guide.rst @@ -65,8 +65,94 @@ LightGBM's Python package supports distributed learning via `Dask`_. This integr Quick Dask Examples ''''''''''''''''''' -Dask-Based Training -''''''''''''''''''' +Theses examples show minimal code needed to train a LightGBM model using Dask. See the other sections after this for more details. + +**Classification** + +For multiclass classification, set ``centers=3`` below. + +.. code:: python + + import dask.array as da + import lightgbm as lgb + from distributed import Client, LocalCluster + from sklearn.datasets import make_blobs + + X, y = make_blobs(n_samples=1000, n_features=50, centers=2) + + cluster = LocalCluster(n_workers=2) + client = Client(cluster) + + dX = da.from_array(X, chunks=(100, 50)) + dy = da.from_array(y, chunks=(100,)) + + dask_model = lgb.DaskLGBMClassifier() + dask_model.fit(dX, dy) + +**Ranking** + +.. 
code:: python + + IN PROGRESS + + import dask.array as da + import lightgbm as lgb + import numpy as np + import urllib.request + from sklearn.datasets import load_svmlight_file + from distributed import Client, LocalCluster + + base_url = "https://raw.githubusercontent.com/microsoft/LightGBM/master/examples/lambdarank" + + with urllib.request.urlopen(f"{base_url}/rank.train") as f: + X, y = load_svmlight_file(f) + + with urllib.request.urlopen(f"{base_url}/rank.train.query") as f: + group = np.loadtxt(f) + + cluster = LocalCluster(n_workers=2) + client = Client(cluster) + + # split training data into two partitions + rows_in_part1 = int(np.sum(group[:100])) + num_features = X.shape[1] + + dX = da.from_array( + x=X, + chunks=[ + (rows_in_part1, -1), + (X.shape[0] - rows_in_part1, -1) + ] + ) + + dy = da.from_array(y) + dg = da.from_array(group) + + dask_model = lgb.DaskLGBMRanker() + dask_model.fit(dX, dy, group=dg) + +**Regression** + +.. code:: python + + import dask.array as da + import lightgbm as lgb + from distributed import Client, LocalCluster + from sklearn.datasets import make_regression + + X, y = make_blobs(n_samples=1000, n_features=50) + + cluster = LocalCluster(n_workers=2) + client = Client(cluster) + + dX = da.from_array(X, chunks=(100, 50)) + dy = da.from_array(y, chunks=(100,)) + + dask_model = lgb.DaskLGBMRegressor() + dask_model.fit(dX, dy) + +Training with Dask +'''''''''''''''''' This section contains detailed information on performing LightGBM distributed training using Dask. @@ -226,9 +312,14 @@ You could edit your firewall rules to allow communication between any of the wor * the port ``local_listen_port`` is not open on any of the worker hosts * any machine has multiple Dask worker processes running on it -Saving Dask Models +Dask-Based Scoring '''''''''''''''''' + + +Scoring with Dask +''''''''''''''''' + After training with Dask, you have several options for saving a fitted model. **Option 1: pickle the Dask estimator** @@ -246,7 +337,7 @@ LightGBM's Dask estimators can be pickled directly with ``cloudpickle``, ``jobli client = Client(cluster) X = da.random.random((1000, 10), (500, 10)) - y = da.random.random((1000,)) + y = da.random.random((1000,), (500,)) dask_model = lgb.DaskLGBMRegressor() dask_model.fit(X, y) @@ -281,7 +372,7 @@ The estimators available from ``lightgbm.dask`` can be converted to an instance client = Client(cluster) X = da.random.random((1000, 10), (500, 10)) - y = da.random.random((1000,)) + y = da.random.random((1000,), (500,)) dask_model = lgb.DaskLGBMRegressor() dask_model.fit(X, y) @@ -303,6 +394,36 @@ A model saved this way can then later be loaded with whichever serialization lib with open("sklearn-model.pkl", "rb") as f: sklearn_model = pickle.load(f) +**Option 3: save the LightGBM Booster** + +The lowest-level model object in LightGBM is the ``lightgbm.Booster``. After training, you can extract a Booster from the Dask estimator. + +.. code:: python + + import dask.array as da + import pickle + import lightgbm as lgb + from distributed import Client, LocalCluster + + cluster = LocalCluster(n_workers=2) + client = Client(cluster) + + X = da.random.random((1000, 10), (500, 10)) + y = da.random.random((1000,), (500,)) + + dask_model = lgb.DaskLGBMRegressor() + dask_model.fit(X, y) + + # convert to sklearn equivalent + bst = dask_model.booster_ + +From the point forward, you can use any of the following methods to save the Booster. 
+ +* serialize with ``cloudpickle``, ``joblib``, or ``pickle`` +* ``bst.dump_model()``: dump the model to a dictionary which could be written out as JSON +* ``bst.model_to_string()``: dump the model to a string in memory +* ``bst.save_model()``: write the output of ``bst.model_to_string()`` to a text file + Kubeflow ^^^^^^^^ From 76c2d2bc836e369fed3ea0b9daa74c04ff750862 Mon Sep 17 00:00:00 2001 From: James Lamb Date: Thu, 4 Mar 2021 16:09:25 -0600 Subject: [PATCH 04/15] add examples --- docs/Parallel-Learning-Guide.rst | 44 +---- examples/dask/README.md | 21 ++ examples/dask/Untitled.ipynb | 220 +++++++++++++++++++++ examples/dask/binary-classification.py | 21 ++ examples/dask/multiclass-classification.py | 21 ++ examples/dask/ranking.py | 29 +++ examples/dask/regression.py | 21 ++ 7 files changed, 338 insertions(+), 39 deletions(-) create mode 100644 examples/dask/README.md create mode 100644 examples/dask/Untitled.ipynb create mode 100644 examples/dask/binary-classification.py create mode 100644 examples/dask/multiclass-classification.py create mode 100644 examples/dask/ranking.py create mode 100644 examples/dask/regression.py diff --git a/docs/Parallel-Learning-Guide.rst b/docs/Parallel-Learning-Guide.rst index 2d728ec9f39a..6f30e1e5e4ca 100644 --- a/docs/Parallel-Learning-Guide.rst +++ b/docs/Parallel-Learning-Guide.rst @@ -62,32 +62,11 @@ Dask LightGBM's Python package supports distributed learning via `Dask`_. This integration is maintained by LightGBM's maintainers. -Quick Dask Examples -''''''''''''''''''' +Dask Examples +''''''''''''' -Theses examples show minimal code needed to train a LightGBM model using Dask. See the other sections after this for more details. +For sample code using ``lightgbm.dask``, see `these Dask examples`__ . -**Classification** - -For multiclass classification, set ``centers=3`` below. - -.. code:: python - - import dask.array as da - import lightgbm as lgb - from distributed import Client, LocalCluster - from sklearn.datasets import make_blobs - - X, y = make_blobs(n_samples=1000, n_features=50, centers=2) - - cluster = LocalCluster(n_workers=2) - client = Client(cluster) - - dX = da.from_array(X, chunks=(100, 50)) - dy = da.from_array(y, chunks=(100,)) - - dask_model = lgb.DaskLGBMClassifier() - dask_model.fit(dX, dy) **Ranking** @@ -135,21 +114,6 @@ For multiclass classification, set ``centers=3`` below. .. code:: python - import dask.array as da - import lightgbm as lgb - from distributed import Client, LocalCluster - from sklearn.datasets import make_regression - - X, y = make_blobs(n_samples=1000, n_features=50) - - cluster = LocalCluster(n_workers=2) - client = Client(cluster) - - dX = da.from_array(X, chunks=(100, 50)) - dy = da.from_array(y, chunks=(100,)) - - dask_model = lgb.DaskLGBMRegressor() - dask_model.fit(dX, dy) Training with Dask '''''''''''''''''' @@ -545,6 +509,8 @@ Example .. _the MMLSpark Documentation: https://github.com/Azure/mmlspark/blob/master/docs/lightgbm.md +.. _these Dask examples: https://github.com/microsoft/lightgbm/tree/master/examples/parallel_learning + .. _Kubeflow Fairing: https://www.kubeflow.org/docs/components/fairing/fairing-overview .. 
_These examples: https://github.com/kubeflow/fairing/tree/master/examples/lightgbm diff --git a/examples/dask/README.md b/examples/dask/README.md new file mode 100644 index 000000000000..cb7217b123cc --- /dev/null +++ b/examples/dask/README.md @@ -0,0 +1,21 @@ +Dask Examples +============= + +This directory contains examples of machine learning workflows with LightGBM and [Dask](https://dask.org/). + +Before running this code, see [the installation instructions for the Dask-package](https://github.com/microsoft/LightGBM/tree/master/python-package#install-dask-package). + +After installing the package and its dependencies, any of the examples here can be run with a command like this: + +```shell +python binary-classification.py +``` + +**Quick Examples** + +The examples listed below contain minimal code showing how to train LightGBM models using Dask. + +* [binary-classification.py](./binary-classification.py) +* [multiclass-classification.py](./multiclass-classification.py) +* [ranking.py](./ranking.py) +* [regression.py](./regression.py) diff --git a/examples/dask/Untitled.ipynb b/examples/dask/Untitled.ipynb new file mode 100644 index 000000000000..7e7e601a29ad --- /dev/null +++ b/examples/dask/Untitled.ipynb @@ -0,0 +1,220 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import dask.array as da\n", + "import lightgbm as lgb\n", + "import numpy as np\n", + "from dask import delayed\n", + "from sklearn.datasets import load_svmlight_file\n", + "from distributed import Client, LocalCluster, wait" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "X, y = load_svmlight_file(\"../lambdarank/rank.train\")\n", + "group = np.loadtxt(\"../lambdarank/rank.train.query\")\n", + "\n", + "cluster = LocalCluster(n_workers=2)\n", + "client = Client(cluster)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "rows_in_part1 = int(np.sum(group[:100]))\n", + "num_features = X.shape[1]\n", + "\n", + "dX = da.concatenate(\n", + " [\n", + " da.from_array(X[:rows_in_part1]),\n", + " da.from_array(X[rows_in_part1:])\n", + " ]\n", + ")\n", + "dy = da.concatenate(\n", + " [\n", + " da.from_array(y[:rows_in_part1]),\n", + " da.from_array(y[rows_in_part1:])\n", + " ]\n", + ")\n", + "dg = da.concatenate(\n", + " [\n", + " da.from_array(group[:100]),\n", + " da.from_array(group[100:])\n", + " ]\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def _split_to_parts(data, is_matrix):\n", + " parts = data.to_delayed()\n", + " if isinstance(parts, np.ndarray):\n", + " if is_matrix:\n", + " assert parts.shape[1] == 1\n", + " else:\n", + " assert parts.ndim == 1 or parts.shape[1] == 1\n", + " parts = parts.flatten().tolist()\n", + " return parts\n", + "\n", + "def _concat(seq):\n", + " if isinstance(seq[0], np.ndarray):\n", + " return np.concatenate(seq, axis=0)\n", + " elif isinstance(seq[0], (pd_DataFrame, pd_Series)):\n", + " return concat(seq, axis=0)\n", + " elif isinstance(seq[0], ss.spmatrix):\n", + " return ss.vstack(seq, format='csr')\n", + " else:\n", + " raise TypeError('Data must be one of: numpy arrays, pandas dataframes, sparse matrices (from scipy). Got %s.' % str(type(seq[0])))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Split arrays/dataframes into parts. 
Arrange parts into dicts to enforce co-locality\n", + "data_parts = _split_to_parts(data=dX, is_matrix=True)\n", + "label_parts = _split_to_parts(data=dy, is_matrix=False)\n", + "\n", + "parts = [{'data': x, 'label': y} for (x, y) in zip(data_parts, label_parts)]\n", + "n_parts = len(parts)\n", + "\n", + "group_parts = _split_to_parts(data=dg, is_matrix=False)\n", + "for i in range(n_parts):\n", + " parts[i]['group'] = group_parts[i]\n", + "\n", + "# Start computation in the background\n", + "parts = list(map(delayed, parts))\n", + "parts = client.compute(parts)\n", + "wait(parts)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from collections import defaultdict\n", + "\n", + "key_to_part_dict = {part.key: part for part in parts} # type: ignore\n", + "who_has = client.who_has(parts)\n", + "worker_map = defaultdict(list)\n", + "for key, workers in who_has.items():\n", + " worker_map[next(iter(workers))].append(key_to_part_dict[key])" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "[x['data'] for x in parts]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# def _group_chunk(group, start, end):\n", + "# return(group[start:end])\n", + "\n", + "# group_chunks = [\n", + "# delayed(_group_chunk)(group, 0, 100),\n", + "# delayed(_group_chunk)(group, 100, group.size)\n", + "# ]\n", + "\n", + "dg = da.from_delayed(group_chunks)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "group.size" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# # split training data into two partitions\n", + "# rows_in_part1 = int(np.sum(group[:100]))\n", + "# num_features = X.shape[1]\n", + "\n", + "# dX = da.from_array(\n", + "# x=X,\n", + "# chunks=[\n", + "# (rows_in_part1, num_features),\n", + "# (X.shape[0] - rows_in_part1, num_features)\n", + "# ]\n", + "# )\n", + "\n", + "# dy = da.from_array(y)\n", + "# dg = da.from_array(group)\n", + "\n", + "dask_model = lgb.DaskLGBMRanker()\n", + "dask_model.fit(dX, dy, group=dg)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.8.3" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/examples/dask/binary-classification.py b/examples/dask/binary-classification.py new file mode 100644 index 000000000000..0772b9e49c61 --- /dev/null +++ b/examples/dask/binary-classification.py @@ -0,0 +1,21 @@ +import dask.array as da +import lightgbm as lgb +from distributed import Client, LocalCluster +from sklearn.datasets import make_blobs + +print("loading data") +X, y = make_blobs(n_samples=1000, n_features=50, centers=2) + +print("initializing a Dask cluster") +cluster = LocalCluster(n_workers=2) +client = Client(cluster) +print("created a Dask LocalCluster") + +print("distributing training data on the Dask cluster") 
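+# chunks=(100, 50) splits the 1000 x 50 feature matrix into 10 row-wise
+# partitions of 100 rows each, which Dask can spread across its workers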
+dX = da.from_array(X, chunks=(100, 50)) +dy = da.from_array(y, chunks=(100,)) + +print("beginning training") +dask_model = lgb.DaskLGBMClassifier() +dask_model.fit(dX, dy) +print("done training") diff --git a/examples/dask/multiclass-classification.py b/examples/dask/multiclass-classification.py new file mode 100644 index 000000000000..667e7251aceb --- /dev/null +++ b/examples/dask/multiclass-classification.py @@ -0,0 +1,21 @@ +import dask.array as da +import lightgbm as lgb +from distributed import Client, LocalCluster +from sklearn.datasets import make_blobs + +print("loading data") +X, y = make_blobs(n_samples=1000, n_features=50, centers=3) + +print("initializing a Dask cluster") +cluster = LocalCluster(n_workers=2) +client = Client(cluster) +print("created a Dask LocalCluster") + +print("distributing training data on the Dask cluster") +dX = da.from_array(X, chunks=(100, 50)) +dy = da.from_array(y, chunks=(100,)) + +print("beginning training") +dask_model = lgb.DaskLGBMClassifier() +dask_model.fit(dX, dy) +print("done training") diff --git a/examples/dask/ranking.py b/examples/dask/ranking.py new file mode 100644 index 000000000000..e09c2ae029e0 --- /dev/null +++ b/examples/dask/ranking.py @@ -0,0 +1,29 @@ +import dask.array as da +import lightgbm as lgb +import numpy as np +from sklearn.datasets import load_svmlight_file +from distributed import Client, LocalCluster + +X, y = load_svmlight_file("../lambdarank/rank.train") +group = np.loadtxt("../lambdarank/rank.train.query") + +cluster = LocalCluster(n_workers=2) +client = Client(cluster) + +# split training data into two partitions +rows_in_part1 = int(np.sum(group[:100])) +num_features = X.shape[1] + +dX = da.from_array( + x=X, + chunks=[ + (rows_in_part1, num_features), + (X.shape[0] - rows_in_part1, num_features) + ] +) + +dy = da.from_array(y) +dg = da.from_array(group) + +dask_model = lgb.DaskLGBMRanker() +dask_model.fit(dX, dy, group=dg) diff --git a/examples/dask/regression.py b/examples/dask/regression.py new file mode 100644 index 000000000000..870ba836c9d1 --- /dev/null +++ b/examples/dask/regression.py @@ -0,0 +1,21 @@ +import dask.array as da +import lightgbm as lgb +from distributed import Client, LocalCluster +from sklearn.datasets import make_regression + +print("loading data") +X, y = make_regression(n_samples=1000, n_features=50) + +print("initializing a Dask cluster") +cluster = LocalCluster(n_workers=2) +client = Client(cluster) +print("created a Dask LocalCluster") + +print("distributing training data on the Dask cluster") +dX = da.from_array(X, chunks=(100, 50)) +dy = da.from_array(y, chunks=(100,)) + +print("beginning training") +dask_model = lgb.DaskLGBMRegressor() +dask_model.fit(dX, dy) +print("done training") From 3ee2f44d58fae8321eb2ccc03fe165af1d0c6f37 Mon Sep 17 00:00:00 2001 From: James Lamb Date: Thu, 4 Mar 2021 16:55:33 -0600 Subject: [PATCH 05/15] fix timeouts in examples --- examples/dask/Untitled.ipynb | 48 +++++++++++++++- examples/dask/binary-classification.py | 40 ++++++++------ examples/dask/multiclass-classification.py | 40 ++++++++------ examples/dask/ranking.py | 64 ++++++++++++++++------ examples/dask/regression.py | 40 ++++++++------ 5 files changed, 165 insertions(+), 67 deletions(-) diff --git a/examples/dask/Untitled.ipynb b/examples/dask/Untitled.ipynb index 7e7e601a29ad..a7e60011788e 100644 --- a/examples/dask/Untitled.ipynb +++ b/examples/dask/Untitled.ipynb @@ -27,6 +27,52 @@ "client = Client(cluster)" ] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + 
"outputs": [], + "source": [ + "# split training data into two partitions\n", + "rows_in_part1 = int(np.sum(group[:100]))\n", + "num_features = X.shape[1]\n", + "\n", + "# make this array dense because we're splitting across\n", + "# a sparse boundary to partition the data\n", + "X = X.todense()\n", + "\n", + "dX = da.from_array(\n", + " x=X,\n", + " chunks=[\n", + " (rows_in_part1, X.shape[0] - rows_in_part1),\n", + " (num_features, )\n", + " ]\n", + ")\n", + "\n", + "dy = da.from_array(\n", + " x=y,\n", + " chunks=[\n", + " (rows_in_part1, X.shape[0] - rows_in_part1),\n", + " ]\n", + ")\n", + "dg = da.from_array(\n", + " x=group,\n", + " chunks=[\n", + " (100, group.size - 100)\n", + " ]\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "dask_model = lgb.DaskLGBMRanker()\n", + "dask_model.fit(dX, dy, group=dg)" + ] + }, { "cell_type": "code", "execution_count": null, @@ -212,7 +258,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.8.3" + "version": "3.7.6" } }, "nbformat": 4, diff --git a/examples/dask/binary-classification.py b/examples/dask/binary-classification.py index 0772b9e49c61..7ca01edad81b 100644 --- a/examples/dask/binary-classification.py +++ b/examples/dask/binary-classification.py @@ -3,19 +3,27 @@ from distributed import Client, LocalCluster from sklearn.datasets import make_blobs -print("loading data") -X, y = make_blobs(n_samples=1000, n_features=50, centers=2) - -print("initializing a Dask cluster") -cluster = LocalCluster(n_workers=2) -client = Client(cluster) -print("created a Dask LocalCluster") - -print("distributing training data on the Dask cluster") -dX = da.from_array(X, chunks=(100, 50)) -dy = da.from_array(y, chunks=(100,)) - -print("beginning training") -dask_model = lgb.DaskLGBMClassifier() -dask_model.fit(dX, dy) -print("done training") +if __name__ == "__main__": + + print("loading data") + + X, y = make_blobs(n_samples=1000, n_features=50, centers=2) + + print("initializing a Dask cluster") + + cluster = LocalCluster() + client = Client(cluster) + + print("created a Dask LocalCluster") + + print("distributing training data on the Dask cluster") + + dX = da.from_array(X, chunks=(100, 50)) + dy = da.from_array(y, chunks=(100,)) + + print("beginning training") + + dask_model = lgb.DaskLGBMClassifier() + dask_model.fit(dX, dy) + + print("done training") diff --git a/examples/dask/multiclass-classification.py b/examples/dask/multiclass-classification.py index 667e7251aceb..dcf5960e224b 100644 --- a/examples/dask/multiclass-classification.py +++ b/examples/dask/multiclass-classification.py @@ -3,19 +3,27 @@ from distributed import Client, LocalCluster from sklearn.datasets import make_blobs -print("loading data") -X, y = make_blobs(n_samples=1000, n_features=50, centers=3) - -print("initializing a Dask cluster") -cluster = LocalCluster(n_workers=2) -client = Client(cluster) -print("created a Dask LocalCluster") - -print("distributing training data on the Dask cluster") -dX = da.from_array(X, chunks=(100, 50)) -dy = da.from_array(y, chunks=(100,)) - -print("beginning training") -dask_model = lgb.DaskLGBMClassifier() -dask_model.fit(dX, dy) -print("done training") +if __name__ == "__main__": + + print("loading data") + + X, y = make_blobs(n_samples=1000, n_features=50, centers=3) + + print("initializing a Dask cluster") + + cluster = LocalCluster(n_workers=2) + client = Client(cluster) + + print("created a Dask LocalCluster") + + 
print("distributing training data on the Dask cluster") + + dX = da.from_array(X, chunks=(100, 50)) + dy = da.from_array(y, chunks=(100,)) + + print("beginning training") + + dask_model = lgb.DaskLGBMClassifier() + dask_model.fit(dX, dy) + + print("done training") diff --git a/examples/dask/ranking.py b/examples/dask/ranking.py index e09c2ae029e0..8efb535a5358 100644 --- a/examples/dask/ranking.py +++ b/examples/dask/ranking.py @@ -4,26 +4,54 @@ from sklearn.datasets import load_svmlight_file from distributed import Client, LocalCluster -X, y = load_svmlight_file("../lambdarank/rank.train") -group = np.loadtxt("../lambdarank/rank.train.query") +if __name__ == "__main__": -cluster = LocalCluster(n_workers=2) -client = Client(cluster) + print("loading data") -# split training data into two partitions -rows_in_part1 = int(np.sum(group[:100])) -num_features = X.shape[1] + X, y = load_svmlight_file("../lambdarank/rank.train") + group = np.loadtxt("../lambdarank/rank.train.query") -dX = da.from_array( - x=X, - chunks=[ - (rows_in_part1, num_features), - (X.shape[0] - rows_in_part1, num_features) - ] -) + print("initializing a Dask cluster") -dy = da.from_array(y) -dg = da.from_array(group) + cluster = LocalCluster(n_workers=2) + client = Client(cluster) -dask_model = lgb.DaskLGBMRanker() -dask_model.fit(dX, dy, group=dg) + print("created a Dask LocalCluster") + + print("distributing training data on the Dask cluster") + + # split training data into two partitions + rows_in_part1 = int(np.sum(group[:100])) + rows_in_part2 = X.shape[0] - rows_in_part1 + num_features = X.shape[1] + + # make this array dense because we're splitting across + # a sparse boundary to partition the data + X = X.todense() + + dX = da.from_array( + x=X, + chunks=[ + (rows_in_part1, rows_in_part2), + (num_features, ) + ] + ) + dy = da.from_array( + x=y, + chunks=[ + (rows_in_part1, rows_in_part2), + ] + ) + dg = da.from_array( + x=group, + chunks=[ + (100, group.size - 100) + ] + ) + + print("beginning training") + + dask_model = lgb.DaskLGBMRanker() + dask_model.fit(dX, dy, group=dg) + + print("done training") diff --git a/examples/dask/regression.py b/examples/dask/regression.py index 870ba836c9d1..8d544c3fb223 100644 --- a/examples/dask/regression.py +++ b/examples/dask/regression.py @@ -3,19 +3,27 @@ from distributed import Client, LocalCluster from sklearn.datasets import make_regression -print("loading data") -X, y = make_regression(n_samples=1000, n_features=50) - -print("initializing a Dask cluster") -cluster = LocalCluster(n_workers=2) -client = Client(cluster) -print("created a Dask LocalCluster") - -print("distributing training data on the Dask cluster") -dX = da.from_array(X, chunks=(100, 50)) -dy = da.from_array(y, chunks=(100,)) - -print("beginning training") -dask_model = lgb.DaskLGBMRegressor() -dask_model.fit(dX, dy) -print("done training") +if __name__ == "__main__": + + print("loading data") + + X, y = make_regression(n_samples=1000, n_features=50) + + print("initializing a Dask cluster") + + cluster = LocalCluster(n_workers=2) + client = Client(cluster) + + print("created a Dask LocalCluster") + + print("distributing training data on the Dask cluster") + + dX = da.from_array(X, chunks=(100, 50)) + dy = da.from_array(y, chunks=(100,)) + + print("beginning training") + + dask_model = lgb.DaskLGBMRegressor() + dask_model.fit(dX, dy) + + print("done training") From 8db0a6c14ddf2dd364f0235d1815168a5eccb703 Mon Sep 17 00:00:00 2001 From: James Lamb Date: Thu, 4 Mar 2021 16:55:44 -0600 Subject: [PATCH 06/15] 
remove notebook --- examples/dask/Untitled.ipynb | 266 ----------------------------------- 1 file changed, 266 deletions(-) delete mode 100644 examples/dask/Untitled.ipynb diff --git a/examples/dask/Untitled.ipynb b/examples/dask/Untitled.ipynb deleted file mode 100644 index a7e60011788e..000000000000 --- a/examples/dask/Untitled.ipynb +++ /dev/null @@ -1,266 +0,0 @@ -{ - "cells": [ - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "import dask.array as da\n", - "import lightgbm as lgb\n", - "import numpy as np\n", - "from dask import delayed\n", - "from sklearn.datasets import load_svmlight_file\n", - "from distributed import Client, LocalCluster, wait" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "X, y = load_svmlight_file(\"../lambdarank/rank.train\")\n", - "group = np.loadtxt(\"../lambdarank/rank.train.query\")\n", - "\n", - "cluster = LocalCluster(n_workers=2)\n", - "client = Client(cluster)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# split training data into two partitions\n", - "rows_in_part1 = int(np.sum(group[:100]))\n", - "num_features = X.shape[1]\n", - "\n", - "# make this array dense because we're splitting across\n", - "# a sparse boundary to partition the data\n", - "X = X.todense()\n", - "\n", - "dX = da.from_array(\n", - " x=X,\n", - " chunks=[\n", - " (rows_in_part1, X.shape[0] - rows_in_part1),\n", - " (num_features, )\n", - " ]\n", - ")\n", - "\n", - "dy = da.from_array(\n", - " x=y,\n", - " chunks=[\n", - " (rows_in_part1, X.shape[0] - rows_in_part1),\n", - " ]\n", - ")\n", - "dg = da.from_array(\n", - " x=group,\n", - " chunks=[\n", - " (100, group.size - 100)\n", - " ]\n", - ")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "dask_model = lgb.DaskLGBMRanker()\n", - "dask_model.fit(dX, dy, group=dg)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "rows_in_part1 = int(np.sum(group[:100]))\n", - "num_features = X.shape[1]\n", - "\n", - "dX = da.concatenate(\n", - " [\n", - " da.from_array(X[:rows_in_part1]),\n", - " da.from_array(X[rows_in_part1:])\n", - " ]\n", - ")\n", - "dy = da.concatenate(\n", - " [\n", - " da.from_array(y[:rows_in_part1]),\n", - " da.from_array(y[rows_in_part1:])\n", - " ]\n", - ")\n", - "dg = da.concatenate(\n", - " [\n", - " da.from_array(group[:100]),\n", - " da.from_array(group[100:])\n", - " ]\n", - ")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "def _split_to_parts(data, is_matrix):\n", - " parts = data.to_delayed()\n", - " if isinstance(parts, np.ndarray):\n", - " if is_matrix:\n", - " assert parts.shape[1] == 1\n", - " else:\n", - " assert parts.ndim == 1 or parts.shape[1] == 1\n", - " parts = parts.flatten().tolist()\n", - " return parts\n", - "\n", - "def _concat(seq):\n", - " if isinstance(seq[0], np.ndarray):\n", - " return np.concatenate(seq, axis=0)\n", - " elif isinstance(seq[0], (pd_DataFrame, pd_Series)):\n", - " return concat(seq, axis=0)\n", - " elif isinstance(seq[0], ss.spmatrix):\n", - " return ss.vstack(seq, format='csr')\n", - " else:\n", - " raise TypeError('Data must be one of: numpy arrays, pandas dataframes, sparse matrices (from scipy). Got %s.' 
% str(type(seq[0])))" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# Split arrays/dataframes into parts. Arrange parts into dicts to enforce co-locality\n", - "data_parts = _split_to_parts(data=dX, is_matrix=True)\n", - "label_parts = _split_to_parts(data=dy, is_matrix=False)\n", - "\n", - "parts = [{'data': x, 'label': y} for (x, y) in zip(data_parts, label_parts)]\n", - "n_parts = len(parts)\n", - "\n", - "group_parts = _split_to_parts(data=dg, is_matrix=False)\n", - "for i in range(n_parts):\n", - " parts[i]['group'] = group_parts[i]\n", - "\n", - "# Start computation in the background\n", - "parts = list(map(delayed, parts))\n", - "parts = client.compute(parts)\n", - "wait(parts)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "from collections import defaultdict\n", - "\n", - "key_to_part_dict = {part.key: part for part in parts} # type: ignore\n", - "who_has = client.who_has(parts)\n", - "worker_map = defaultdict(list)\n", - "for key, workers in who_has.items():\n", - " worker_map[next(iter(workers))].append(key_to_part_dict[key])" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "[x['data'] for x in parts]" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# def _group_chunk(group, start, end):\n", - "# return(group[start:end])\n", - "\n", - "# group_chunks = [\n", - "# delayed(_group_chunk)(group, 0, 100),\n", - "# delayed(_group_chunk)(group, 100, group.size)\n", - "# ]\n", - "\n", - "dg = da.from_delayed(group_chunks)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "group.size" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# # split training data into two partitions\n", - "# rows_in_part1 = int(np.sum(group[:100]))\n", - "# num_features = X.shape[1]\n", - "\n", - "# dX = da.from_array(\n", - "# x=X,\n", - "# chunks=[\n", - "# (rows_in_part1, num_features),\n", - "# (X.shape[0] - rows_in_part1, num_features)\n", - "# ]\n", - "# )\n", - "\n", - "# dy = da.from_array(y)\n", - "# dg = da.from_array(group)\n", - "\n", - "dask_model = lgb.DaskLGBMRanker()\n", - "dask_model.fit(dX, dy, group=dg)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python 3", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.7.6" - } - }, - "nbformat": 4, - "nbformat_minor": 4 -} From 33d79e73054bfd6eb84110362d79caac89373cd5 Mon Sep 17 00:00:00 2001 From: James Lamb Date: Fri, 5 Mar 2021 00:22:09 -0600 Subject: [PATCH 07/15] fill out prediction section --- docs/Parallel-Learning-Guide.rst | 86 ++++++++------------------------ examples/dask/prediction.py | 47 +++++++++++++++++ examples/dask/ranking.py | 10 ++-- 3 files changed, 74 insertions(+), 69 deletions(-) create mode 100644 examples/dask/prediction.py diff --git 
a/docs/Parallel-Learning-Guide.rst b/docs/Parallel-Learning-Guide.rst index 6f30e1e5e4ca..8ac644468cd1 100644 --- a/docs/Parallel-Learning-Guide.rst +++ b/docs/Parallel-Learning-Guide.rst @@ -60,60 +60,12 @@ Dask .. versionadded:: 3.2.0 -LightGBM's Python package supports distributed learning via `Dask`_. This integration is maintained by LightGBM's maintainers. +LightGBM's Python package supports distributed learning via `Dask`_. Dask Examples ''''''''''''' -For sample code using ``lightgbm.dask``, see `these Dask examples`__ . - - -**Ranking** - -.. code:: python - - IN PROGRESS - - import dask.array as da - import lightgbm as lgb - import numpy as np - import urllib.request - from sklearn.datasets import load_svmlight_file - from distributed import Client, LocalCluster - - base_url = "https://raw.githubusercontent.com/microsoft/LightGBM/master/examples/lambdarank" - - with urllib.request.urlopen(f"{base_url}/rank.train") as f: - X, y = load_svmlight_file(f) - - with urllib.request.urlopen(f"{base_url}/rank.train.query") as f: - group = np.loadtxt(f) - - cluster = LocalCluster(n_workers=2) - client = Client(cluster) - - # split training data into two partitions - rows_in_part1 = int(np.sum(group[:100])) - num_features = X.shape[1] - - dX = da.from_array( - x=X, - chunks=[ - (rows_in_part1, -1), - (X.shape[0] - rows_in_part1, -1) - ] - ) - - dy = da.from_array(y) - dg = da.from_array(group) - - dask_model = lgb.DaskLGBMRanker() - dask_model.fit(dX, dy, group=dg) - -**Regression** - -.. code:: python - +For sample code using ``lightgbm.dask``, see `these Dask examples`_ . Training with Dask '''''''''''''''''' @@ -138,7 +90,7 @@ If you do not have other significant processes competing with Dask for resources **Managing Memory** -Use the Dask diagnostic dashboard or your preferred monitoring tool to monitor Dask workers' memory consumption during training. As described in `the Dask worker documentation`_, Dask workers will automatically start spilling data to Disk if memory consumptio gets too high. This can substantially slow down computations, since disk I/O is usually much slower than reading the same data from memory. +Use the Dask diagnostic dashboard or your preferred monitoring tool to monitor Dask workers' memory consumption during training. As described in `the Dask worker documentation`_, Dask workers will automatically start spilling data to Disk if memory consumption gets too high. This can substantially slow down computations, since disk I/O is usually much slower than reading the same data from memory. `At 60% of memory load, [Dask will] spill least recently used data to disk` @@ -159,7 +111,7 @@ The estimators in ``lightgbm.dask`` expect that matrix-like or array-like data a :alt: On the left, rectangles showing a 5 by 5 grid for a local dataset. On the right, two circles representing Dask workers, one with a 3 by 5 grid and one with a 2 by 5 grid. :target: ./_static/images/dask-initial-setup.svg -While setting up for training, ``lightgbm`` will concatenate all of the partitions on a work into a single dataset. Distributed training then proceeds with one LightGBM worker process per Dask worker. +While setting up for training, ``lightgbm`` will concatenate all of the partitions on a worker into a single dataset. Distributed training then proceeds with one LightGBM worker process per Dask worker. .. 
image:: ./_static/images/dask-concat.svg :align: center @@ -169,9 +121,9 @@ While setting up for training, ``lightgbm`` will concatenate all of the partitio When setting up data partitioning for LightGBM training with Dask, try to follow these suggestions: -* ensure that each worker in the cluster has a piece of the training data +* ensure that each worker in the cluster has some of the training data * try to give each worker roughly the same amount of data, especially if your dataset is small -* if you plan to train multiple models (for example, to tune hyperparameters) on the same data use ``distributed.Client.persist()`` before training to materialize the data one time +* if you plan to train multiple models (for example, to tune hyperparameters) on the same data, use ``client.persist()`` before training to materialize the data one time Using a Specific Dask Client **************************** @@ -190,15 +142,13 @@ LightGBM's Dask estimators support setting an attribute ``client`` to control th cluster = LocalCluster() client = Client(cluster) - # option 1: keyword argumentt in constructor + # option 1: keyword argument in constructor dask_model = lgb.DaskLGBMClassifier(client=client) # option 2: set_params() after construction dask_model = lgb.DaskLGBMClassifier() dask_model.set_params(client=client) -Note that the ``client`` for an estimator will not be stored if the model object is pickled. If you want to control the client used by a model object loaded from disk, use ``set_params()`` after loading. For more details on that, see `Saving Dask Models <#saving-dask-models>`__. - Using Specific Ports ******************** @@ -218,7 +168,7 @@ For example, consider the case where you are running one Dask worker process on 10.0.2.0 10.0.3.0 -You could edit your firewall rules to open one additional port on each of these hosts, then provide ``machines`` directly. +You could edit your firewall rules to allow traffic on one additional port on each of these hosts, then provide ``machines`` directly. .. code:: python @@ -227,7 +177,7 @@ You could edit your firewall rules to open one additional port on each of these machines = "10.0.1.0:12401,10.0.2.0:12402,10.0.3.0:15000" dask_model = lgb.DaskLGBMRegressor(machines=machines) -If you are running multiple Dask worker processes on any machine, be sure that there are multiple entries for that IP address, with different ports. For example, if you were running a cluster with ``nprocs=2`` (2 Dask worker processes per machine), you might open two additional ports on each of these hosts, then provide ``machines`` as follows. +If you are running multiple Dask worker processes on physical host in the cluster, be sure that there are multiple entries for that IP address, with different ports. For example, if you were running a cluster with ``nprocs=2`` (2 Dask worker processes per machine), you might open two additional ports on each of these hosts, then provide ``machines`` as follows. .. code:: python @@ -276,13 +226,17 @@ You could edit your firewall rules to allow communication between any of the wor * the port ``local_listen_port`` is not open on any of the worker hosts * any machine has multiple Dask worker processes running on it -Dask-Based Scoring -'''''''''''''''''' +Prediction with Dask +'''''''''''''''''''' +The estimators from ``lightgbm.dask`` can be used to create predictions based on data stored in Dask collections. In that interface, ``.predict()`` expects a Dask Array or Dask DataFrame, and returns a Dask Array of predictions. 
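+
+As a minimal sketch (assuming ``dask_model`` is an already-fitted Dask estimator and ``dX`` is a Dask Array of features):
+
+.. code:: python
+
+    # returns a Dask Array; no computation happens yet
+    preds = dask_model.predict(dX)
+
+    # materialize the predictions as a local numpy array
+    preds_local = preds.compute()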
+See `the Dask prediction example`_ for some sample code that shows how to perform Dask-based prediction. -Scoring with Dask -''''''''''''''''' +For model evaluation, consider using `the metrics functions from dask-ml`_. Those functions are intended to provide the same API as equivalent functions in ``sklearn.metrics``, but they use distributed computation powered by Dask to compute metrics without all of the input data ever needing to be on a single machine. + +Saving Dask Models +'''''''''''''''''' After training with Dask, you have several options for saving a fitted model. @@ -505,11 +459,15 @@ Example .. _the Dask DataFrame documentation: https://docs.dask.org/en/latest/dataframe.html +.. _the Dask prediction example: https://github.com/microsoft/lightgbm/tree/master/examples/dask/prediction.py + .. _the Dask worker documentation: https://distributed.dask.org/en/latest/worker.html#memory-management +.. _the metrics functions from dask-ml: https://ml.dask.org/modules/api.html#dask-ml-metrics-metrics + .. _the MMLSpark Documentation: https://github.com/Azure/mmlspark/blob/master/docs/lightgbm.md -.. _these Dask examples: https://github.com/microsoft/lightgbm/tree/master/examples/parallel_learning +.. _these Dask examples: https://github.com/microsoft/lightgbm/tree/master/examples/dask .. _Kubeflow Fairing: https://www.kubeflow.org/docs/components/fairing/fairing-overview diff --git a/examples/dask/prediction.py b/examples/dask/prediction.py new file mode 100644 index 000000000000..7ac5f76687ee --- /dev/null +++ b/examples/dask/prediction.py @@ -0,0 +1,47 @@ +import dask.array as da +import lightgbm as lgb +from distributed import Client, LocalCluster +from sklearn.datasets import make_regression +from sklearn.metrics import mean_squared_error + +if __name__ == "__main__": + + print("loading data") + + X, y = make_regression(n_samples=1000, n_features=50) + + print("initializing a Dask cluster") + + cluster = LocalCluster(n_workers=2) + client = Client(cluster) + + print("created a Dask LocalCluster") + + print("distributing training data on the Dask cluster") + + dX = da.from_array(X, chunks=(100, 50)) + dy = da.from_array(y, chunks=(100,)) + + print("beginning training") + + dask_model = lgb.DaskLGBMRegressor() + dask_model.fit(dX, dy) + + print("done training") + + print("predicting on the training data") + + preds = dask_model.predict(dX) + + # the code below uses sklearn.metrics, but this requires pulling all of the + # predictions and target values back from workers to the client + # + # for larger datasets, consider the metrics from dask-ml instead + # https://github.com/dask/dask-ml/tree/main/dask_ml/metrics + print("computing MSE") + + preds_local = preds.compute() + actuals_local = dy.compute() + mse = mean_squared_error(actuals_local, preds_local) + + print(f"MSE: {mse}") diff --git a/examples/dask/ranking.py b/examples/dask/ranking.py index 8efb535a5358..e61acf8ea616 100644 --- a/examples/dask/ranking.py +++ b/examples/dask/ranking.py @@ -30,11 +30,11 @@ X = X.todense() dX = da.from_array( - x=X, - chunks=[ - (rows_in_part1, rows_in_part2), - (num_features, ) - ] + x=X, + chunks=[ + (rows_in_part1, rows_in_part2), + (num_features, ) + ] ) dy = da.from_array( x=y, From 59c76a76136139e179a375eda68ccabb33e94b51 Mon Sep 17 00:00:00 2001 From: James Lamb Date: Fri, 5 Mar 2021 00:25:49 -0600 Subject: [PATCH 08/15] table of contents --- examples/dask/README.md | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/examples/dask/README.md b/examples/dask/README.md 
index cb7217b123cc..c0c2639b7d36 100644 --- a/examples/dask/README.md +++ b/examples/dask/README.md @@ -11,11 +11,15 @@ After installing the package and its dependencies, any of the examples here can python binary-classification.py ``` -**Quick Examples** - The examples listed below contain minimal code showing how to train LightGBM models using Dask. +**Training** + * [binary-classification.py](./binary-classification.py) * [multiclass-classification.py](./multiclass-classification.py) * [ranking.py](./ranking.py) * [regression.py](./regression.py) + +**Prediction** + +* [prediction.py](./prediction.py) From 59df0325f26997aad7edffea52b2fe7cda8e91e4 Mon Sep 17 00:00:00 2001 From: James Lamb Date: Fri, 5 Mar 2021 00:30:05 -0600 Subject: [PATCH 09/15] add line back --- docs/Parallel-Learning-Guide.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/Parallel-Learning-Guide.rst b/docs/Parallel-Learning-Guide.rst index 8ac644468cd1..719560a8fcf8 100644 --- a/docs/Parallel-Learning-Guide.rst +++ b/docs/Parallel-Learning-Guide.rst @@ -60,7 +60,7 @@ Dask .. versionadded:: 3.2.0 -LightGBM's Python package supports distributed learning via `Dask`_. +LightGBM's Python package supports distributed learning via `Dask`_. This integration is maintained by LightGBM's maintainers. Dask Examples ''''''''''''' From 3476d6fabb3736f74c370c092e69589230ce4e45 Mon Sep 17 00:00:00 2001 From: James Lamb Date: Fri, 5 Mar 2021 00:32:50 -0600 Subject: [PATCH 10/15] linting --- examples/dask/prediction.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/dask/prediction.py b/examples/dask/prediction.py index 7ac5f76687ee..f52df3895930 100644 --- a/examples/dask/prediction.py +++ b/examples/dask/prediction.py @@ -32,7 +32,7 @@ print("predicting on the training data") preds = dask_model.predict(dX) - + # the code below uses sklearn.metrics, but this requires pulling all of the # predictions and target values back from workers to the client # From 90ec7b335a94a824c3a3fd5960a25ed908717741 Mon Sep 17 00:00:00 2001 From: James Lamb Date: Sat, 6 Mar 2021 17:12:17 -0600 Subject: [PATCH 11/15] isort --- examples/dask/binary-classification.py | 3 ++- examples/dask/multiclass-classification.py | 3 ++- examples/dask/prediction.py | 3 ++- examples/dask/ranking.py | 5 +++-- examples/dask/regression.py | 3 ++- 5 files changed, 11 insertions(+), 6 deletions(-) diff --git a/examples/dask/binary-classification.py b/examples/dask/binary-classification.py index 7ca01edad81b..1ebb5724fbee 100644 --- a/examples/dask/binary-classification.py +++ b/examples/dask/binary-classification.py @@ -1,8 +1,9 @@ import dask.array as da -import lightgbm as lgb from distributed import Client, LocalCluster from sklearn.datasets import make_blobs +import lightgbm as lgb + if __name__ == "__main__": print("loading data") diff --git a/examples/dask/multiclass-classification.py b/examples/dask/multiclass-classification.py index dcf5960e224b..e4154981021a 100644 --- a/examples/dask/multiclass-classification.py +++ b/examples/dask/multiclass-classification.py @@ -1,8 +1,9 @@ import dask.array as da -import lightgbm as lgb from distributed import Client, LocalCluster from sklearn.datasets import make_blobs +import lightgbm as lgb + if __name__ == "__main__": print("loading data") diff --git a/examples/dask/prediction.py b/examples/dask/prediction.py index f52df3895930..8281e6da64b8 100644 --- a/examples/dask/prediction.py +++ b/examples/dask/prediction.py @@ -1,9 +1,10 @@ import dask.array as da -import lightgbm as 
lgb from distributed import Client, LocalCluster from sklearn.datasets import make_regression from sklearn.metrics import mean_squared_error +import lightgbm as lgb + if __name__ == "__main__": print("loading data") diff --git a/examples/dask/ranking.py b/examples/dask/ranking.py index e61acf8ea616..0b17687027c9 100644 --- a/examples/dask/ranking.py +++ b/examples/dask/ranking.py @@ -1,8 +1,9 @@ import dask.array as da -import lightgbm as lgb import numpy as np -from sklearn.datasets import load_svmlight_file from distributed import Client, LocalCluster +from sklearn.datasets import load_svmlight_file + +import lightgbm as lgb if __name__ == "__main__": diff --git a/examples/dask/regression.py b/examples/dask/regression.py index 8d544c3fb223..395d1e4dcce3 100644 --- a/examples/dask/regression.py +++ b/examples/dask/regression.py @@ -1,8 +1,9 @@ import dask.array as da -import lightgbm as lgb from distributed import Client, LocalCluster from sklearn.datasets import make_regression +import lightgbm as lgb + if __name__ == "__main__": print("loading data") From 0f7051935b0ba485de95678ecac3527e82987e44 Mon Sep 17 00:00:00 2001 From: James Lamb Date: Mon, 8 Mar 2021 20:24:48 -0600 Subject: [PATCH 12/15] Apply suggestions from code review Co-authored-by: Nikita Titov --- docs/Parallel-Learning-Guide.rst | 21 ++++++++++----------- examples/dask/binary-classification.py | 2 +- examples/dask/prediction.py | 2 +- examples/dask/ranking.py | 2 +- 4 files changed, 13 insertions(+), 14 deletions(-) diff --git a/docs/Parallel-Learning-Guide.rst b/docs/Parallel-Learning-Guide.rst index 719560a8fcf8..4144af3b6dde 100644 --- a/docs/Parallel-Learning-Guide.rst +++ b/docs/Parallel-Learning-Guide.rst @@ -90,7 +90,7 @@ If you do not have other significant processes competing with Dask for resources **Managing Memory** -Use the Dask diagnostic dashboard or your preferred monitoring tool to monitor Dask workers' memory consumption during training. As described in `the Dask worker documentation`_, Dask workers will automatically start spilling data to Disk if memory consumption gets too high. This can substantially slow down computations, since disk I/O is usually much slower than reading the same data from memory. +Use the Dask diagnostic dashboard or your preferred monitoring tool to monitor Dask workers' memory consumption during training. As described in `the Dask worker documentation`_, Dask workers will automatically start spilling data to disk if memory consumption gets too high. This can substantially slow down computations, since disk I/O is usually much slower than reading the same data from memory. `At 60% of memory load, [Dask will] spill least recently used data to disk` @@ -137,7 +137,7 @@ LightGBM's Dask estimators support setting an attribute ``client`` to control th .. code:: python import lightgbm as lgb - from distributed import LocalCluster, Client + from distributed import Client, LocalCluster cluster = LocalCluster() client = Client(cluster) @@ -156,7 +156,7 @@ At the beginning of training, ``lightgbm.dask`` sets up a LightGBM network where If the communication between Dask workers in the cluster used for training is restricted by firewall rules, you must tell LightGBM exactly what ports to use. 
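As a rough sketch of the options described below, the snippet here shows how fixed addresses and ports might be passed to a Dask estimator through the ``machines`` parameter. The worker addresses and the port number are placeholders invented for this illustration, not defaults.

.. code:: python

    import lightgbm as lgb

    # assumption for this sketch: the cluster has two Dask workers reachable at
    # these addresses, and firewall rules allow incoming connections on port 12400
    dask_model = lgb.DaskLGBMRegressor(
        machines="10.0.1.2:12400,10.0.1.3:12400"
    )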
-**Option 1: Provide a specific list of addresses and ports** +**Option 1: provide a specific list of addresses and ports** LightGBM supports a parameter ``machines``, a comma-delimited string where each entry refers to one worker (host name or IP) and a port that that worker will accept connections on. If you provide this parameter to the estimators in ``lightgbm.dask``, LightGBM will not search randomly for ports. @@ -282,7 +282,7 @@ The estimators available from ``lightgbm.dask`` can be converted to an instance .. code:: python import dask.array as da - import pickle + import joblib import lightgbm as lgb from distributed import Client, LocalCluster @@ -301,16 +301,15 @@ The estimators available from ``lightgbm.dask`` can be converted to an instance print(type(sklearn_model)) #> lightgbm.sklearn.LGBMRegressor - with open("sklearn-model.pkl", "wb") as f: - pickle.dump(sklearn_model, f) + joblib.dump(sklearn_model, "sklearn-model.joblib") A model saved this way can then later be loaded with whichever serialization library you used to save it. .. code:: python - import pickle - with open("sklearn-model.pkl", "rb") as f: - sklearn_model = pickle.load(f) + import joblib + + sklearn_model = joblib.load("sklearn-model.joblib") **Option 3: save the LightGBM Booster** @@ -332,10 +331,10 @@ The lowest-level model object in LightGBM is the ``lightgbm.Booster``. After tra dask_model = lgb.DaskLGBMRegressor() dask_model.fit(X, y) - # convert to sklearn equivalent + # get underlying Booster object bst = dask_model.booster_ -From the point forward, you can use any of the following methods to save the Booster. +From the point forward, you can use any of the following methods to save the Booster: * serialize with ``cloudpickle``, ``joblib``, or ``pickle`` * ``bst.dump_model()``: dump the model to a dictionary which could be written out as JSON diff --git a/examples/dask/binary-classification.py b/examples/dask/binary-classification.py index 1ebb5724fbee..c62493ab1a3a 100644 --- a/examples/dask/binary-classification.py +++ b/examples/dask/binary-classification.py @@ -24,7 +24,7 @@ print("beginning training") - dask_model = lgb.DaskLGBMClassifier() + dask_model = lgb.DaskLGBMClassifier(n_estimators=10) dask_model.fit(dX, dy) print("done training") diff --git a/examples/dask/prediction.py b/examples/dask/prediction.py index 8281e6da64b8..fedcef123d9d 100644 --- a/examples/dask/prediction.py +++ b/examples/dask/prediction.py @@ -38,7 +38,7 @@ # predictions and target values back from workers to the client # # for larger datasets, consider the metrics from dask-ml instead - # https://github.com/dask/dask-ml/tree/main/dask_ml/metrics + # https://ml.dask.org/modules/api.html#dask-ml-metrics-metrics print("computing MSE") preds_local = preds.compute() diff --git a/examples/dask/ranking.py b/examples/dask/ranking.py index 0b17687027c9..4f3f210e220c 100644 --- a/examples/dask/ranking.py +++ b/examples/dask/ranking.py @@ -34,7 +34,7 @@ x=X, chunks=[ (rows_in_part1, rows_in_part2), - (num_features, ) + (num_features,) ] ) dy = da.from_array( From 457c363946f7eba6ad95831e457571a710106e1e Mon Sep 17 00:00:00 2001 From: James Lamb Date: Tue, 9 Mar 2021 10:47:08 -0600 Subject: [PATCH 13/15] Apply suggestions from code review Co-authored-by: Nikita Titov --- examples/dask/binary-classification.py | 1 + examples/dask/multiclass-classification.py | 3 ++- examples/dask/prediction.py | 3 ++- examples/dask/ranking.py | 3 ++- examples/dask/regression.py | 3 ++- 5 files changed, 9 insertions(+), 4 deletions(-) diff --git 
a/examples/dask/binary-classification.py b/examples/dask/binary-classification.py index c62493ab1a3a..4313e8da3ddb 100644 --- a/examples/dask/binary-classification.py +++ b/examples/dask/binary-classification.py @@ -26,5 +26,6 @@ dask_model = lgb.DaskLGBMClassifier(n_estimators=10) dask_model.fit(dX, dy) + assert dask_model.fitted_ print("done training") diff --git a/examples/dask/multiclass-classification.py b/examples/dask/multiclass-classification.py index e4154981021a..8e40b35a8121 100644 --- a/examples/dask/multiclass-classification.py +++ b/examples/dask/multiclass-classification.py @@ -24,7 +24,8 @@ print("beginning training") - dask_model = lgb.DaskLGBMClassifier() + dask_model = lgb.DaskLGBMClassifier(n_estimators=10) dask_model.fit(dX, dy) + assert dask_model.fitted_ print("done training") diff --git a/examples/dask/prediction.py b/examples/dask/prediction.py index fedcef123d9d..64e2bae0c08d 100644 --- a/examples/dask/prediction.py +++ b/examples/dask/prediction.py @@ -25,8 +25,9 @@ print("beginning training") - dask_model = lgb.DaskLGBMRegressor() + dask_model = lgb.DaskLGBMRegressor(n_estimators=10) dask_model.fit(dX, dy) + assert dask_model.fitted_ print("done training") diff --git a/examples/dask/ranking.py b/examples/dask/ranking.py index 4f3f210e220c..b7cae20a44c4 100644 --- a/examples/dask/ranking.py +++ b/examples/dask/ranking.py @@ -52,7 +52,8 @@ print("beginning training") - dask_model = lgb.DaskLGBMRanker() + dask_model = lgb.DaskLGBMRanker(n_estimators=10) dask_model.fit(dX, dy, group=dg) + assert dask_model.fitted_ print("done training") diff --git a/examples/dask/regression.py b/examples/dask/regression.py index 395d1e4dcce3..69a8f764732d 100644 --- a/examples/dask/regression.py +++ b/examples/dask/regression.py @@ -24,7 +24,8 @@ print("beginning training") - dask_model = lgb.DaskLGBMRegressor() + dask_model = lgb.DaskLGBMRegressor(n_estimators=10) dask_model.fit(dX, dy) + assert dask_model.fitted_ print("done training") From bef78243c2546f00ae1b98a7475f4517b6a877bb Mon Sep 17 00:00:00 2001 From: James Lamb Date: Tue, 9 Mar 2021 10:51:25 -0600 Subject: [PATCH 14/15] move examples under python-guide --- docs/Parallel-Learning-Guide.rst | 4 ++-- examples/python-guide/README.md | 1 + examples/{ => python-guide}/dask/README.md | 0 examples/{ => python-guide}/dask/binary-classification.py | 0 examples/{ => python-guide}/dask/multiclass-classification.py | 0 examples/{ => python-guide}/dask/prediction.py | 0 examples/{ => python-guide}/dask/ranking.py | 0 examples/{ => python-guide}/dask/regression.py | 0 8 files changed, 3 insertions(+), 2 deletions(-) rename examples/{ => python-guide}/dask/README.md (100%) rename examples/{ => python-guide}/dask/binary-classification.py (100%) rename examples/{ => python-guide}/dask/multiclass-classification.py (100%) rename examples/{ => python-guide}/dask/prediction.py (100%) rename examples/{ => python-guide}/dask/ranking.py (100%) rename examples/{ => python-guide}/dask/regression.py (100%) diff --git a/docs/Parallel-Learning-Guide.rst b/docs/Parallel-Learning-Guide.rst index 4144af3b6dde..c07909650c22 100644 --- a/docs/Parallel-Learning-Guide.rst +++ b/docs/Parallel-Learning-Guide.rst @@ -458,7 +458,7 @@ Example .. _the Dask DataFrame documentation: https://docs.dask.org/en/latest/dataframe.html -.. _the Dask prediction example: https://github.com/microsoft/lightgbm/tree/master/examples/dask/prediction.py +.. _the Dask prediction example: https://github.com/microsoft/lightgbm/tree/master/examples/python-guide/dask/prediction.py .. 
_the Dask worker documentation: https://distributed.dask.org/en/latest/worker.html#memory-management @@ -466,7 +466,7 @@ Example .. _the MMLSpark Documentation: https://github.com/Azure/mmlspark/blob/master/docs/lightgbm.md -.. _these Dask examples: https://github.com/microsoft/lightgbm/tree/master/examples/dask +.. _these Dask examples: https://github.com/microsoft/lightgbm/tree/master/examples/python-guide/dask .. _Kubeflow Fairing: https://www.kubeflow.org/docs/components/fairing/fairing-overview diff --git a/examples/python-guide/README.md b/examples/python-guide/README.md index aba3c9f51d7a..08ded17ab559 100644 --- a/examples/python-guide/README.md +++ b/examples/python-guide/README.md @@ -19,6 +19,7 @@ python simple_example.py Examples include: +- [`dask/`](./dask): examples using Dask for distributed training - [simple_example.py](https://github.com/microsoft/LightGBM/blob/master/examples/python-guide/simple_example.py) - Construct Dataset - Basic train and predict diff --git a/examples/dask/README.md b/examples/python-guide/dask/README.md similarity index 100% rename from examples/dask/README.md rename to examples/python-guide/dask/README.md diff --git a/examples/dask/binary-classification.py b/examples/python-guide/dask/binary-classification.py similarity index 100% rename from examples/dask/binary-classification.py rename to examples/python-guide/dask/binary-classification.py diff --git a/examples/dask/multiclass-classification.py b/examples/python-guide/dask/multiclass-classification.py similarity index 100% rename from examples/dask/multiclass-classification.py rename to examples/python-guide/dask/multiclass-classification.py diff --git a/examples/dask/prediction.py b/examples/python-guide/dask/prediction.py similarity index 100% rename from examples/dask/prediction.py rename to examples/python-guide/dask/prediction.py diff --git a/examples/dask/ranking.py b/examples/python-guide/dask/ranking.py similarity index 100% rename from examples/dask/ranking.py rename to examples/python-guide/dask/ranking.py diff --git a/examples/dask/regression.py b/examples/python-guide/dask/regression.py similarity index 100% rename from examples/dask/regression.py rename to examples/python-guide/dask/regression.py From 5d227b595ec8a4f5ac1ab80ea0f30b21f848b498 Mon Sep 17 00:00:00 2001 From: James Lamb Date: Tue, 9 Mar 2021 10:52:43 -0600 Subject: [PATCH 15/15] remove unused pickle import --- docs/Parallel-Learning-Guide.rst | 1 - 1 file changed, 1 deletion(-) diff --git a/docs/Parallel-Learning-Guide.rst b/docs/Parallel-Learning-Guide.rst index c07909650c22..9486a0d8cd23 100644 --- a/docs/Parallel-Learning-Guide.rst +++ b/docs/Parallel-Learning-Guide.rst @@ -318,7 +318,6 @@ The lowest-level model object in LightGBM is the ``lightgbm.Booster``. After tra .. code:: python import dask.array as da - import pickle import lightgbm as lgb from distributed import Client, LocalCluster
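As a closing illustration of the Booster-saving options listed earlier, the sketch below assumes ``bst`` is the ``lightgbm.Booster`` obtained from a fitted Dask estimator via ``dask_model.booster_``; the file names are arbitrary placeholders.

.. code:: python

    import json

    import joblib

    # bst was obtained above with: bst = dask_model.booster_

    # serialize the Booster with joblib (cloudpickle or pickle work the same way)
    joblib.dump(bst, "booster.joblib")
    bst_from_disk = joblib.load("booster.joblib")

    # dump the model to a dictionary and write it out as JSON
    with open("booster-model.json", "w") as f:
        json.dump(bst.dump_model(), f)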