Please see `RAPIDS-deployment documentation <https://docs.rapids.ai/deployment/stable/>`__
for further details and examples.

Enable cuDF spilling
~~~~~~~~~~~~~~~~~~~~

When using Dask cuDF for classic ETL workloads, it is usually best
to enable native cuDF spilling.
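
A minimal sketch of one way to do this, assuming every worker process
runs the following at startup (spilling must be enabled in each worker
process, not just on the client)::

    import cudf

    # Enable cuDF's native device-to-host spilling for this process.
    # Setting the ``CUDF_SPILL`` environment variable before startup
    # has the same effect.
    cudf.set_option("spill", True)
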
Use the Dask DataFrame API
~~~~~~~~~~~~~~~~~~~~~~~~~~

Although Dask cuDF provides a public ``dask_cudf`` Python module, we
strongly recommend that you use the CPU/GPU portable ``dask.dataframe``
API instead. Simply `use the Dask configuration system
<https://docs.dask.org/en/stable/how-to/selecting-the-collection-backend.html>`__
to set the ``"dataframe.backend"`` option to ``"cudf"``, and the
``dask_cudf`` module will be imported and used implicitly.
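
For example, a minimal sketch (the Parquet path is hypothetical)::

    import dask
    import dask.dataframe as dd

    # Ask Dask to back new DataFrame collections with cuDF.
    dask.config.set({"dataframe.backend": "cudf"})

    # Only the portable dask.dataframe API is used here, but the
    # resulting collection is composed of cudf.DataFrame partitions.
    ddf = dd.read_parquet("data/*.parquet")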

Avoid eager execution
~~~~~~~~~~~~~~~~~~~~~

Although Dask DataFrame collections are lazy by default, there are several
notable methods that will result in the immediate execution of the
underlying task graph:

:func:`compute`: Calling ``ddf.compute()`` will materialize the result of
``ddf`` and return a single cuDF object. This is done by executing the entire
task graph associated with ``ddf`` and concatenating its partitions in
local memory on the client process.

.. note::
Never call :func:`compute` on a large collection that cannot fit comfortably
in the memory of a single GPU!

:func:`persist`: Like :func:`compute`, calling ``ddf.persist()`` will
execute the entire task graph associated with ``ddf``. The important difference
is that the computed partitions will remain in distributed worker memory instead
of being concatenated together on the client process.

.. note::
Avoid calling :func:`persist` on a large collection that cannot fit comfortably
in the global worker memory. If the total sum of the partition sizes is larger
than the sum of all GPU memory, calling persist will result in significant
spilling from device memory. If the individual partition sizes are large, this
is likely to produce an OOM error.

:func:`len` / :func:`head` / :func:`tail`: Although these operations are often
used in pandas/cuDF code to quickly inspect the data, it is best to avoid
them in Dask DataFrame. In most cases, these operations need to execute a
portion of the underlying task graph to materialize the collection.

:func:`sort_values` / :func:`set_index`: These operations both require Dask to
eagerly collect quantile information about the column(s) being targeted by the
global sort operation. See `Avoid Sorting`_ for notes on sorting considerations.

.. note::
When using :func:`set_index`, be sure to pass in ``sort=False`` whenever the
global collection does not **need** to be sorted by the new index.
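
A minimal sketch contrasting lazy and eager operations (the column names
are hypothetical)::

    # Lazy: only extends the task graph.
    ddf = ddf[ddf["x"] > 0]

    # Eager: executes the graph and concatenates the result on the
    # client. Only safe when the result is small.
    # result = ddf.compute()

    # Eager, but computed partitions stay distributed in worker memory.
    ddf = ddf.persist()

    # Skip the expensive global sort when ordering is not required.
    ddf = ddf.set_index("id", sort=False)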

Avoid Sorting
~~~~~~~~~~~~~

`The design of Dask DataFrame <https://docs.dask.org/en/stable/dataframe-design.html#dask-dataframe-design>`__
makes it advantageous to work with data that is already sorted along its index at
creation time. For most other cases, it's best to avoid sorting unless the logic
of your workflow makes global ordering absolutely necessary.

If the purpose of your :func:`sort_values` operation is to ensure that all unique
values in ``by`` will be moved to the same output partition, then `shuffle
<https://docs.dask.org/en/stable/generated/dask.dataframe.DataFrame.shuffle.html>`__
is often the better option.
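
For example, to co-locate all rows that share the same (hypothetical)
``id`` value without imposing a global ordering::

    # Rows with equal ``id`` values land in the same partition, but
    # no expensive global sort is performed.
    ddf = ddf.shuffle(on="id")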


Reading Data
------------

``blocksize``: Use this argument to specify the maximum partition size.
Dask uses the Parquet row-group metadata to map one or more row groups to
each output partition. This mapping will only account for the uncompressed
storage size of each row group, which is usually smaller than the
corresponding ``cudf.DataFrame``.

``aggregate_files``: Use this argument to specify whether Dask should
map multiple files to the same DataFrame partition. The default is
``False``, but ``aggregate_files=True`` is usually more performant when
the dataset contains many files that are smaller than half of ``blocksize``.
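
For example, a minimal sketch for a dataset of many small files (the
path is hypothetical)::

    import dask.dataframe as dd

    # Many files smaller than half of ``blocksize``: allow Dask to
    # fuse multiple files into each output partition.
    ddf = dd.read_parquet(
        "s3://my-bucket/dataset/",
        blocksize="512MiB",
        aggregate_files=True,
    )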

.. note::
Metadata collection can be extremely slow when reading from remote
storage (e.g. S3 and GCS). When reading many remote files that all
correspond to a reasonable partition size, it's usually best to set
``blocksize=None``. In most cases, this setting allows Dask to skip
the metadata-collection stage altogether.

.. note::
If your workflow requires a strict 1-to-1 mapping between files and
partitions, use :func:`from_map` to manually construct your partitions
with ``cudf.read_parquet``. When :func:`dd.read_parquet` is used,
query-planning optimizations may automatically aggregate distinct files
into the same partition (even if ``aggregate_files=False``).
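
A minimal sketch of that 1-to-1 pattern (the file list is hypothetical)::

    import cudf
    import dask.dataframe as dd

    files = ["part.0.parquet", "part.1.parquet"]

    # Exactly one output partition per file; no automatic aggregation.
    ddf = dd.from_map(cudf.read_parquet, files)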


Use :func:`from_map`
~~~~~~~~~~~~~~~~~~~~

The :func:`from_map` API offers several advantages over :func:`from_delayed`.
See the `from_map API documentation <https://docs.dask.org/en/stable/generated/dask_expr.from_map.html#dask_expr.from_map>`__
for more details.

.. note::
Whenever possible, be sure to specify the ``meta`` argument to
:func:`from_map`. Dask will need to materialize the first partition
eagerly if this argument is excluded.
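
A minimal sketch of passing ``meta`` (the paths, columns, and
``read_one`` helper are hypothetical)::

    import cudf
    import dask.dataframe as dd

    paths = ["part.0.parquet", "part.1.parquet"]

    def read_one(path):
        # Custom creation logic for a single partition.
        return cudf.read_parquet(path, columns=["x", "y"])

    # An empty cudf.DataFrame describing the expected schema.
    meta = cudf.DataFrame({"x": [0.0], "y": [0.0]}).head(0)

    # With ``meta`` provided, Dask does not need to execute the first
    # partition eagerly to infer column names and dtypes.
    ddf = dd.from_map(read_one, paths, meta=meta)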


Sorting, Joining and Grouping
-----------------------------

For sorting, joining, and grouping operations, the following guidelines
are often recommended:

* Avoid shuffling whenever possible
* Use ``split_out=1`` for low-cardinality groupby aggregations
* Use ``broadcast=True`` for joins when at least one collection comprises a small number of partitions (e.g. ``<=5``)
* `Use UCX <https://docs.rapids.ai/api/dask-cuda/nightly/examples/ucx/>`__ if communication is a bottleneck.
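
Minimal sketches of the ``split_out`` and ``broadcast`` guidelines above
(collection and column names are hypothetical)::

    # Low-cardinality groupby: collapse the result into a single
    # output partition instead of shuffling it.
    agg = ddf.groupby("category").sum(split_out=1)

    # Broadcast join: replicate the small collection to every worker
    # rather than shuffling the large one.
    joined = large_ddf.merge(small_ddf, on="key", broadcast=True)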

.. note::
UCX enables Dask-CUDA workers to communicate with high-performance
transport technologies like `NVLink <https://www.nvidia.com/en-us/data-center/nvlink/>`__
and InfiniBand. Without UCX, inter-process communication will rely
on TCP sockets.
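
A minimal sketch of enabling UCX with Dask-CUDA (assuming ``dask_cuda``
and UCX are installed)::

    from dask_cuda import LocalCUDACluster
    from distributed import Client

    # Workers communicate over UCX instead of TCP, taking advantage of
    # NVLink and InfiniBand where available.
    cluster = LocalCUDACluster(protocol="ucx")
    client = Client(cluster)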


User-defined functions
----------------------
