From eee37f308e0c0ad8247bb57761e83399def6f24e Mon Sep 17 00:00:00 2001
From: rjzamora
Date: Wed, 18 Sep 2024 14:54:33 -0700
Subject: [PATCH] more revisions

---
 docs/dask_cudf/source/best_practices.rst | 92 +++++++++++++++++++++---
 1 file changed, 83 insertions(+), 9 deletions(-)

diff --git a/docs/dask_cudf/source/best_practices.rst b/docs/dask_cudf/source/best_practices.rst
index e8a357ad53a..6bc807a1bbe 100644
--- a/docs/dask_cudf/source/best_practices.rst
+++ b/docs/dask_cudf/source/best_practices.rst
@@ -48,7 +48,7 @@ documentation for detailed information.
 Please see `RAPIDS-deployment documentation
 `__ for further details and examples.

-Enable cuDF Spilling
+Enable cuDF spilling
 ~~~~~~~~~~~~~~~~~~~~

 When using Dask cuDF for classic ETL workloads, it is usually best
@@ -83,9 +83,65 @@ Use the Dask DataFrame API
 Although Dask cuDF provides a public ``dask_cudf`` Python module, we
 strongly recommend that you use the CPU/GPU portable ``dask.dataframe``
-API instead. Simply use the `Dask configuration `__
-system to set the ``"dataframe.backend"`` option to ``"cudf"``, and
-the ``dask_cudf`` module will be imported and used implicitly.
+API instead. Simply `use the Dask configuration system
+`__
+to set the ``"dataframe.backend"`` option to ``"cudf"``, and the
+``dask_cudf`` module will be imported and used implicitly.
+
+Avoid eager execution
+~~~~~~~~~~~~~~~~~~~~~
+
+Although Dask DataFrame collections are lazy by default, there are several
+notable methods that will result in the immediate execution of the
+underlying task graph:
+
+:func:`compute`: Calling ``ddf.compute()`` will materialize the result of
+``ddf`` and return a single cuDF object. This is done by executing the entire
+task graph associated with ``ddf`` and concatenating its partitions in
+local memory on the client process.
+
+.. note::
+  Never call :func:`compute` on a large collection that cannot fit comfortably
+  in the memory of a single GPU!
+
+:func:`persist`: Like :func:`compute`, calling ``ddf.persist()`` will
+execute the entire task graph associated with ``ddf``. The important difference
+is that the computed partitions will remain in distributed worker memory instead
+of being concatenated together on the client process.
+
+.. note::
+  Avoid calling :func:`persist` on a large collection that cannot fit comfortably
+  in global worker memory. If the sum of all partition sizes is larger
+  than the total GPU memory, calling :func:`persist` will result in significant
+  spilling from device memory. If the individual partition sizes are large, this
+  is likely to produce an OOM error.
+
+:func:`len` / :func:`head` / :func:`tail`: Although these operations are often
+used within pandas/cuDF code to quickly inspect the data, it is best to avoid
+them in Dask DataFrame. In most cases, these operations need to execute a portion
+of the underlying task graph to materialize the collection.
+
+:func:`sort_values` / :func:`set_index`: These operations both require Dask to
+eagerly collect quantile information about the column(s) being targeted by the
+global sort operation. See `Avoid Sorting`_ for notes on sorting considerations.
+
+.. note::
+  When using :func:`set_index`, be sure to pass in ``sort=False`` whenever the
+  global collection does not **need** to be sorted by the new index.
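+
+For example, here is a minimal sketch of the lazy-execution pattern described
+in this section (the Parquet path and column name are hypothetical, and the
+``"cudf"`` backend assumes cuDF is installed)::
+
+  import dask
+
+  # Select the cuDF backend before creating any collections
+  dask.config.set({"dataframe.backend": "cudf"})
+  import dask.dataframe as dd
+
+  # Graph construction is lazy; no partitions are computed yet
+  ddf = dd.read_parquet("data.parquet")
+  ddf = ddf[ddf["value"] > 0]
+
+  # Keep the filtered partitions in distributed worker memory ...
+  ddf = ddf.persist()
+
+  # ... and only bring a small aggregate back to the client
+  total = ddf["value"].sum().compute()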
+
+Avoid Sorting
+~~~~~~~~~~~~~
+
+`The design of Dask DataFrame `__
+makes it advantageous to work with data that is already sorted along its index at
+creation time. For most other cases, it's best to avoid sorting unless the logic
+of your workflow makes global ordering absolutely necessary.
+
+If the purpose of your :func:`sort_values` operation is to ensure that all unique
+values in ``by`` will be moved to the same output partition, then `shuffle
+`__
+is often the better option.
+

 Reading Data
 ------------

@@ -133,8 +189,8 @@ output partition. This mapping will only account for the uncompressed
 storage size of each row group, which is usually smaller than the
 corresponding ``cudf.DataFrame``.

-``aggregate_files``: Use this argument to specify whether Dask is allowed
-to map multiple files to the same DataFrame partition. The default is
+``aggregate_files``: Use this argument to specify whether Dask should
+map multiple files to the same DataFrame partition. The default is
 ``False``, but ``aggregate_files=True`` is usually more performant when
 the dataset contains many files that are smaller than half of ``blocksize``.

@@ -142,8 +198,15 @@ the dataset contains many files that are smaller than half of ``blocksize``.
   Metadata collection can be extremely slow when reading from remote
   storage (e.g. S3 and GCS). When reading many remote files that all
   correspond to a reasonable partition size, it's usually best to set
-  `blocksize=None` and `aggregate_files=False`. In most cases, these
-  settings allow Dask to skip the metadata-collection stage altogether.
+  ``blocksize=None``. In most cases, this setting allows Dask to skip
+  the metadata-collection stage altogether.
+
+.. note::
+  If your workflow requires a strict 1-to-1 mapping between files and
+  partitions, use :func:`from_map` to manually construct your partitions
+  with ``cudf.read_parquet``. When :func:`dd.read_parquet` is used,
+  query-planning optimizations may automatically aggregate distinct files
+  into the same partition (even if ``aggregate_files=False``).


 Use :func:`from_map`
@@ -160,6 +223,11 @@ over :func:`from_delayed`::
 See the `from_map API documentation
 `__ for more details.

+.. note::
+  Whenever possible, be sure to specify the ``meta`` argument to
+  :func:`from_map`. Dask will need to materialize the first partition
+  eagerly if this argument is omitted.
+
 Sorting, Joining and Grouping
 -----------------------------

@@ -179,7 +247,13 @@ are often recommended::
 * Avoid shuffling whenever possible
 * Use ``split_out=1`` for low-cardinality groupby aggregations
 * Use ``broadcast=True`` for joins when at least one collection comprises a small number of partitions (e.g. ``<=5``)
-* `Use UCX `__ if communication is a bottleneck
+* `Use UCX `__ if communication is a bottleneck.
+
+.. note::
+  UCX enables Dask-CUDA workers to communicate with high-performance
+  transport technologies like `NVLink `__
+  and InfiniBand. Without UCX, inter-process communication will rely
+  on TCP sockets.


 User-defined functions
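+
+One common pattern for applying a user-defined function to every partition of
+a Dask cuDF collection is :func:`map_partitions`. The sketch below assumes a
+hypothetical Parquet file with float columns ``x`` and ``y``::
+
+  import cudf
+  import dask
+
+  dask.config.set({"dataframe.backend": "cudf"})
+  import dask.dataframe as dd
+
+  def add_total(df):
+      # ``df`` is a single cudf.DataFrame partition living on the GPU
+      return df.assign(total=df["x"] + df["y"])
+
+  # Build an empty cudf.DataFrame with the expected output schema so that
+  # Dask does not have to infer ``meta`` by evaluating the function itself
+  meta = add_total(cudf.DataFrame({"x": [1.0], "y": [2.0]})).head(0)
+
+  ddf = dd.read_parquet("data.parquet")
+  ddf = ddf.map_partitions(add_total, meta=meta)
+
+Without an explicit ``meta``, Dask infers the output schema by calling the
+function on a small dummy partition, which can be fragile for data-dependent
+logic.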