diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8b0cb14f6..df09243b7 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,28 @@
-# dask-cuda 22.08.00 (17 Aug 2022)
+# dask-cuda 22.10.00 (12 Oct 2022)
+
+## 🐛 Bug Fixes
+
+- Revert "Update rearrange_by_column patch for explicit comms" ([#1001](https://github.com/rapidsai/dask-cuda/pull/1001)) [@rjzamora](https://github.com/rjzamora)
+- Address CI failures caused by upstream distributed and cupy changes ([#993](https://github.com/rapidsai/dask-cuda/pull/993)) [@rjzamora](https://github.com/rjzamora)
+- DeviceSerialized.__reduce_ex__: convert frame to numpy arrays ([#977](https://github.com/rapidsai/dask-cuda/pull/977)) [@madsbk](https://github.com/madsbk)
+
+## 📖 Documentation
+
+- Remove line-break that's breaking link ([#982](https://github.com/rapidsai/dask-cuda/pull/982)) [@ntabris](https://github.com/ntabris)
+- Dask-cuda best practices ([#976](https://github.com/rapidsai/dask-cuda/pull/976)) [@quasiben](https://github.com/quasiben)
+
+## 🚀 New Features
+
+- Add Groupby benchmark ([#979](https://github.com/rapidsai/dask-cuda/pull/979)) [@rjzamora](https://github.com/rjzamora)
+
+## 🛠️ Improvements
+
+- Pin `dask` and `distributed` for release ([#1003](https://github.com/rapidsai/dask-cuda/pull/1003)) [@galipremsagar](https://github.com/galipremsagar)
+- Update rearrange_by_column patch for explicit comms ([#992](https://github.com/rapidsai/dask-cuda/pull/992)) [@rjzamora](https://github.com/rjzamora)
+- benchmarks: Add option to suppress output of point to point data ([#985](https://github.com/rapidsai/dask-cuda/pull/985)) [@wence-](https://github.com/wence-)
+- Unpin `dask` and `distributed` for development ([#971](https://github.com/rapidsai/dask-cuda/pull/971)) [@galipremsagar](https://github.com/galipremsagar)
+
+# dask-cuda 22.08.00 (17 Aug 2022)
## 🚨 Breaking Changes
- Fix useless property ([#944](https://github.com/rapidsai/dask-cuda/pull/944)) [@wence-](https://github.com/wence-)
diff --git a/ci/cpu/build.sh b/ci/cpu/build.sh
index d2450cfe8..e468b1cb1 100755
--- a/ci/cpu/build.sh
+++ b/ci/cpu/build.sh
@@ -19,6 +19,10 @@ export CUDA_REL=${CUDA_VERSION%.*}
export GPUCI_CONDA_RETRY_MAX=1
export GPUCI_CONDA_RETRY_SLEEP=30
+# Whether to keep the `dask/label/dev` channel in the env. If INSTALL_DASK_MAIN=0,
+# the `dask/label/dev` channel is removed.
+export INSTALL_DASK_MAIN=0
+
# Switch to project root; also root of repo checkout
cd "$WORKSPACE"
@@ -43,9 +47,13 @@ gpuci_logger "Activate conda env"
. /opt/conda/etc/profile.d/conda.sh
conda activate rapids
-# Remove rapidsai-nightly channel if we are building main branch
+# Remove `rapidsai-nightly` & `dask/label/dev` channels if we are building the main branch
if [ "$SOURCE_BRANCH" = "main" ]; then
conda config --system --remove channels rapidsai-nightly
+ conda config --system --remove channels dask/label/dev
+elif [[ "${INSTALL_DASK_MAIN}" == 0 ]]; then
+# Remove `dask/label/dev` channel if INSTALL_DASK_MAIN=0
+ conda config --system --remove channels dask/label/dev
fi
gpuci_logger "Check compiler versions"
@@ -61,8 +69,8 @@ conda list --show-channel-urls
# FIX Added to deal with Anancoda SSL verification issues during conda builds
conda config --set ssl_verify False
-pip install git+https://github.com/dask/dask.git@main
-pip install git+https://github.com/dask/distributed.git@main
+pip install git+https://github.com/dask/dask.git@2022.9.2
+pip install git+https://github.com/dask/distributed.git@2022.9.2
################################################################################
# BUILD - Package builds
diff --git a/ci/gpu/build.sh b/ci/gpu/build.sh
index fb050a6d3..6db58ec0d 100755
--- a/ci/gpu/build.sh
+++ b/ci/gpu/build.sh
@@ -26,7 +26,7 @@ cd "$WORKSPACE"
export GIT_DESCRIBE_TAG=`git describe --tags`
export MINOR_VERSION=`echo $GIT_DESCRIBE_TAG | grep -o -E '([0-9]+\.[0-9]+)'`
export UCX_PATH=$CONDA_PREFIX
-export UCXPY_VERSION=0.27.*
+export UCXPY_VERSION=0.28.*
unset GIT_DESCRIBE_TAG
# Enable NumPy's __array_function__ protocol (needed for NumPy 1.16.x,
@@ -38,7 +38,7 @@ export NUMPY_EXPERIMENTAL_ARRAY_FUNCTION=1
export INSTALL_DASK_MAIN=0
# Dask version to install when `INSTALL_DASK_MAIN=0`
-export DASK_STABLE_VERSION="2022.7.1"
+export DASK_STABLE_VERSION="2022.9.2"
################################################################################
# SETUP - Check environment
@@ -77,6 +77,7 @@ if [[ "${INSTALL_DASK_MAIN}" == 1 ]]; then
else
gpuci_logger "gpuci_mamba_retry install conda-forge::dask==${DASK_STABLE_VERSION} conda-forge::distributed==${DASK_STABLE_VERSION} conda-forge::dask-core==${DASK_STABLE_VERSION} --force-reinstall"
gpuci_mamba_retry install conda-forge::dask==${DASK_STABLE_VERSION} conda-forge::distributed==${DASK_STABLE_VERSION} conda-forge::dask-core==${DASK_STABLE_VERSION} --force-reinstall
+ conda config --system --remove channels dask/label/dev
fi
diff --git a/dask_cuda/benchmarks/local_cudf_groupby.py b/dask_cuda/benchmarks/local_cudf_groupby.py
new file mode 100644
index 000000000..379ff9309
--- /dev/null
+++ b/dask_cuda/benchmarks/local_cudf_groupby.py
@@ -0,0 +1,273 @@
+import contextlib
+from collections import ChainMap
+from time import perf_counter as clock
+
+import pandas as pd
+
+import dask
+import dask.dataframe as dd
+from dask.distributed import performance_report, wait
+from dask.utils import format_bytes, parse_bytes
+
+from dask_cuda.benchmarks.common import Config, execute_benchmark
+from dask_cuda.benchmarks.utils import (
+ parse_benchmark_args,
+ print_key_value,
+ print_separator,
+ print_throughput_bandwidth,
+)
+
+
+def apply_groupby(
+ df,
+ sort=False,
+ split_out=1,
+ split_every=8,
+ shuffle=None,
+):
+ # Handle special "explicit-comms" case
+ config = {}
+ if shuffle == "explicit-comms":
+ shuffle = "tasks"
+ config = {"explicit-comms": True}
+
+ with dask.config.set(config):
+ agg = df.groupby("key", sort=sort).agg(
+ {"int64": ["max", "count"], "float64": "mean"},
+ split_out=split_out,
+ split_every=split_every,
+ shuffle=shuffle,
+ )
+
+ wait(agg.persist())
+ return agg
+
+
+def generate_chunk(chunk_info, unique_size=1, gpu=True):
+ # Setting a seed that triggers max amount of comm in the two-GPU case.
+ if gpu:
+ import cupy as xp
+
+ import cudf as xdf
+ else:
+ import numpy as xp
+ import pandas as xdf
+
+ i_chunk, local_size = chunk_info
+ xp.random.seed(i_chunk * 1_000)
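+ # "key" controls group membership (values drawn from [0, unique_size)),
+ # while "int64" and "float64" are the columns being aggregated.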
+ return xdf.DataFrame(
+ {
+ "key": xp.random.randint(0, unique_size, size=local_size, dtype="int64"),
+ "int64": xp.random.permutation(xp.arange(local_size, dtype="int64")),
+ "float64": xp.random.permutation(xp.arange(local_size, dtype="float64")),
+ }
+ )
+
+
+def get_random_ddf(args):
+
+ total_size = args.chunk_size * args.in_parts
+ chunk_kwargs = {
+ "unique_size": max(int(args.unique_ratio * total_size), 1),
+ "gpu": True if args.type == "gpu" else False,
+ }
+
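+ # Lazily build the Dask DataFrame: one partition per (chunk-id, chunk-size)
+ # pair, each produced by `generate_chunk` at compute time.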
+ return dd.from_map(
+ generate_chunk,
+ [(i, args.chunk_size) for i in range(args.in_parts)],
+ meta=generate_chunk((0, 1), **chunk_kwargs),
+ enforce_metadata=False,
+ **chunk_kwargs,
+ )
+
+
+def bench_once(client, args, write_profile=None):
+
+ # Generate random Dask dataframe
+ df = get_random_ddf(args)
+
+ data_processed = len(df) * sum([t.itemsize for t in df.dtypes])
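+ # Map the CLI --shuffle choices onto the `shuffle` argument accepted by
+ # the groupby aggregation: "True" -> task-based shuffle, "False" -> no
+ # shuffle; "tasks" and "explicit-comms" are passed through unchanged.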
+ shuffle = {
+ "True": "tasks",
+ "False": False,
+ }.get(args.shuffle, args.shuffle)
+
+ if write_profile is None:
+ ctx = contextlib.nullcontext()
+ else:
+ ctx = performance_report(filename=args.profile)
+
+ with ctx:
+ t1 = clock()
+ agg = apply_groupby(
+ df,
+ sort=args.sort,
+ split_out=args.split_out,
+ split_every=args.split_every,
+ shuffle=shuffle,
+ )
+ t2 = clock()
+
+ output_size = agg.memory_usage(index=True, deep=True).compute().sum()
+ return (data_processed, output_size, t2 - t1)
+
+
+def pretty_print_results(args, address_to_index, p2p_bw, results):
+ if args.markdown:
+ print("```")
+ print("Groupby benchmark")
+ print_separator(separator="-")
+ print_key_value(key="Use shuffle", value=f"{args.shuffle}")
+ print_key_value(key="Output partitions", value=f"{args.split_out}")
+ print_key_value(key="Input partitions", value=f"{args.in_parts}")
+ print_key_value(key="Sort Groups", value=f"{args.sort}")
+ print_key_value(key="Rows-per-chunk", value=f"{args.chunk_size}")
+ print_key_value(key="Unique-group ratio", value=f"{args.unique_ratio}")
+ print_key_value(key="Protocol", value=f"{args.protocol}")
+ print_key_value(key="Device(s)", value=f"{args.devs}")
+ print_key_value(key="Tree-reduction width", value=f"{args.split_every}")
+ if args.device_memory_limit:
+ print_key_value(
+ key="Device memory limit", value=f"{format_bytes(args.device_memory_limit)}"
+ )
+ print_key_value(key="RMM Pool", value=f"{not args.disable_rmm_pool}")
+ if args.protocol == "ucx":
+ print_key_value(key="TCP", value=f"{args.enable_tcp_over_ucx}")
+ print_key_value(key="InfiniBand", value=f"{args.enable_infiniband}")
+ print_key_value(key="NVLink", value=f"{args.enable_nvlink}")
+ print_key_value(key="Worker thread(s)", value=f"{args.threads_per_worker}")
+ print_key_value(key="Data processed", value=f"{format_bytes(results[0][0])}")
+ print_key_value(key="Output size", value=f"{format_bytes(results[0][1])}")
+ if args.markdown:
+ print("\n```")
+ data_processed, output_size, durations = zip(*results)
+ print_throughput_bandwidth(
+ args, durations, data_processed, p2p_bw, address_to_index
+ )
+
+
+def create_tidy_results(args, p2p_bw, results):
+ configuration = {
+ "dataframe_type": "cudf" if args.type == "gpu" else "pandas",
+ "shuffle": args.shuffle,
+ "sort": args.sort,
+ "split_out": args.split_out,
+ "split_every": args.split_every,
+ "in_parts": args.in_parts,
+ "rows_per_chunk": args.chunk_size,
+ "unique_ratio": args.unique_ratio,
+ "protocol": args.protocol,
+ "devs": args.devs,
+ "device_memory_limit": args.device_memory_limit,
+ "rmm_pool": not args.disable_rmm_pool,
+ "tcp": args.enable_tcp_over_ucx,
+ "ib": args.enable_infiniband,
+ "nvlink": args.enable_nvlink,
+ }
+ timing_data = pd.DataFrame(
+ [
+ pd.Series(
+ data=ChainMap(
+ configuration,
+ {
+ "wallclock": duration,
+ "data_processed": data_processed,
+ "output_size": output_size,
+ },
+ )
+ )
+ for data_processed, output_size, duration in results
+ ]
+ )
+ return timing_data, p2p_bw
+
+
+def parse_args():
+ special_args = [
+ {
+ "name": "--in-parts",
+ "default": 100,
+ "metavar": "n",
+ "type": int,
+ "help": "Number of input partitions (default '100')",
+ },
+ {
+ "name": [
+ "-c",
+ "--chunk-size",
+ ],
+ "default": 1_000_000,
+ "metavar": "n",
+ "type": int,
+ "help": "Chunk size (default 1_000_000)",
+ },
+ {
+ "name": "--unique-ratio",
+ "default": 0.01,
+ "type": float,
+ "help": "Fraction of rows that are unique groups",
+ },
+ {
+ "name": "--sort",
+ "default": False,
+ "action": "store_true",
+ "help": "Whether to sort the output group order.",
+ },
+ {
+ "name": "--split_out",
+ "default": 1,
+ "type": int,
+ "help": "How many partitions to return.",
+ },
+ {
+ "name": "--split_every",
+ "default": 8,
+ "type": int,
+ "help": "Tree-reduction width.",
+ },
+ {
+ "name": "--shuffle",
+ "choices": ["False", "True", "tasks", "explicit-comms"],
+ "default": "False",
+ "type": str,
+ "help": "Whether to use shuffle-based groupby.",
+ },
+ {
+ "name": [
+ "-t",
+ "--type",
+ ],
+ "choices": ["cpu", "gpu"],
+ "default": "gpu",
+ "type": str,
+ "help": "Do shuffle with GPU or CPU dataframes (default 'gpu')",
+ },
+ {
+ "name": "--ignore-size",
+ "default": "1 MiB",
+ "metavar": "nbytes",
+ "type": parse_bytes,
+ "help": "Ignore messages smaller than this (default '1 MB')",
+ },
+ {
+ "name": "--runs",
+ "default": 3,
+ "type": int,
+ "help": "Number of runs",
+ },
+ ]
+
+ return parse_benchmark_args(
+ description="Distributed groupby (dask/cudf) benchmark", args_list=special_args
+ )
+
+
+if __name__ == "__main__":
+ execute_benchmark(
+ Config(
+ args=parse_args(),
+ bench_once=bench_once,
+ create_tidy_results=create_tidy_results,
+ pretty_print_results=pretty_print_results,
+ )
+ )
diff --git a/dask_cuda/benchmarks/utils.py b/dask_cuda/benchmarks/utils.py
index 8b110e8c1..344549806 100644
--- a/dask_cuda/benchmarks/utils.py
+++ b/dask_cuda/benchmarks/utils.py
@@ -185,6 +185,11 @@ def parse_benchmark_args(description="Generic dask-cuda Benchmark", args_list=[]
"Note: --devs is currently ignored in multi-node mode and for each host "
"one worker per GPU will be launched.",
)
+ parser.add_argument(
+ "--no-show-p2p-bandwidth",
+ action="store_true",
+ help="Do not produce detailed point to point bandwidth stats in output",
+ )
parser.add_argument(
"--all-to-all",
action="store_true",
@@ -549,28 +554,29 @@ def print_throughput_bandwidth(
key="Wall clock",
value=f"{format_time(durations.mean())} +/- {format_time(durations.std()) }",
)
- print_separator(separator="=")
- if args.markdown:
- print("\nWorker-Worker Transfer Rates
\n\n```")
-
- print_key_value(key="(w1,w2)", value="25% 50% 75% (total nbytes)")
- print_separator(separator="-")
- for (source, dest) in np.ndindex(p2p_bw.shape[:2]):
- bw = BandwidthStats(*p2p_bw[source, dest, ...])
- if bw.total_bytes > 0:
- print_key_value(
- key=f"({source},{dest})",
- value=f"{format_bytes(bw.q25)}/s {format_bytes(bw.q50)}/s "
- f"{format_bytes(bw.q75)}/s ({format_bytes(bw.total_bytes)})",
- )
- print_separator(separator="=")
- print_key_value(key="Worker index", value="Worker address")
- print_separator(separator="-")
- for address, index in sorted(address_to_index.items(), key=itemgetter(1)):
- print_key_value(key=index, value=address)
- print_separator(separator="=")
- if args.markdown:
- print("```\n \n")
+ if not args.no_show_p2p_bandwidth:
+ print_separator(separator="=")
+ if args.markdown:
+ print("\nWorker-Worker Transfer Rates
\n\n```")
+
+ print_key_value(key="(w1,w2)", value="25% 50% 75% (total nbytes)")
+ print_separator(separator="-")
+ for (source, dest) in np.ndindex(p2p_bw.shape[:2]):
+ bw = BandwidthStats(*p2p_bw[source, dest, ...])
+ if bw.total_bytes > 0:
+ print_key_value(
+ key=f"({source},{dest})",
+ value=f"{format_bytes(bw.q25)}/s {format_bytes(bw.q50)}/s "
+ f"{format_bytes(bw.q75)}/s ({format_bytes(bw.total_bytes)})",
+ )
+ print_separator(separator="=")
+ print_key_value(key="Worker index", value="Worker address")
+ print_separator(separator="-")
+ for address, index in sorted(address_to_index.items(), key=itemgetter(1)):
+ print_key_value(key=index, value=address)
+ print_separator(separator="=")
+ if args.markdown:
+ print("```\n \n")
if args.plot:
plot_benchmark(throughputs, args.plot, historical=True)
diff --git a/dask_cuda/device_host_file.py b/dask_cuda/device_host_file.py
index ae8e53de7..e89ba64b2 100644
--- a/dask_cuda/device_host_file.py
+++ b/dask_cuda/device_host_file.py
@@ -3,6 +3,7 @@
import os
import time
+import numpy
from zict import Buffer, File, Func
from zict.common import ZictBase
@@ -115,7 +116,11 @@ def __sizeof__(self):
def __reduce_ex__(self, protocol):
header, frames = device_serialize(self)
- frames = [f.obj for f in frames]
+ # Since pickle cannot handle memoryviews, we convert them
+ # to NumPy arrays (zero-copy).
+ frames = [
+ (numpy.asarray(f) if isinstance(f, memoryview) else f) for f in frames
+ ]
return device_deserialize, (header, frames)
diff --git a/dask_cuda/tests/test_proxy.py b/dask_cuda/tests/test_proxy.py
index 0c1f9a7fc..2c21023ce 100644
--- a/dask_cuda/tests/test_proxy.py
+++ b/dask_cuda/tests/test_proxy.py
@@ -7,6 +7,7 @@
import numpy as np
import pandas
import pytest
+from packaging import version
from pandas.testing import assert_frame_equal, assert_series_equal
import dask
@@ -649,6 +650,8 @@ def test_cupy_broadcast_to():
def test_cupy_matmul():
cupy = pytest.importorskip("cupy")
+ if version.parse(cupy.__version__) >= version.parse("11.0"):
+ pytest.xfail("See: https://github.com/rapidsai/dask-cuda/issues/995")
a, b = cupy.arange(10), cupy.arange(10)
c = a @ b
assert c == proxy_object.asproxy(a) @ b
@@ -658,6 +661,8 @@ def test_cupy_matmul():
def test_cupy_imatmul():
cupy = pytest.importorskip("cupy")
+ if version.parse(cupy.__version__) >= version.parse("11.0"):
+ pytest.xfail("See: https://github.com/rapidsai/dask-cuda/issues/995")
a = cupy.arange(9).reshape(3, 3)
c = a.copy()
c @= a
diff --git a/dask_cuda/tests/test_spill.py b/dask_cuda/tests/test_spill.py
index 73f211d1d..f93b83ec7 100644
--- a/dask_cuda/tests/test_spill.py
+++ b/dask_cuda/tests/test_spill.py
@@ -1,6 +1,5 @@
import os
from time import sleep
-from unittest.mock import patch
import pytest
from zict.file import _safe_key as safe_key
@@ -207,57 +206,58 @@ async def test_cupy_cluster_device_spill(params):
async def test_cudf_cluster_device_spill(params):
cudf = pytest.importorskip("cudf")
- # Disabling compression via environment variable seems to be the only way
- # respected here. It is necessary to ensure spilled size matches the actual
- # data size.
- with patch.dict(os.environ, {"DASK_DISTRIBUTED__COMM__COMPRESSION": "False"}):
- with dask.config.set({"distributed.worker.memory.terminate": False}):
- async with LocalCUDACluster(
- n_workers=1,
- device_memory_limit=params["device_memory_limit"],
- memory_limit=params["memory_limit"],
- memory_target_fraction=params["host_target"],
- memory_spill_fraction=params["host_spill"],
- memory_pause_fraction=params["host_pause"],
- asynchronous=True,
- ) as cluster:
- async with Client(cluster, asynchronous=True) as client:
-
- # There's a known issue with datetime64:
- # https://github.com/numpy/numpy/issues/4983#issuecomment-441332940
- # The same error above happens when spilling datetime64 to disk
- cdf = (
- dask.datasets.timeseries(
- dtypes={"x": int, "y": float}, freq="400ms"
- )
- .reset_index(drop=True)
- .map_partitions(cudf.from_pandas)
- )
+ with dask.config.set(
+ {
+ "distributed.comm.compression": False,
+ "distributed.worker.memory.terminate": False,
+ }
+ ):
+ async with LocalCUDACluster(
+ n_workers=1,
+ device_memory_limit=params["device_memory_limit"],
+ memory_limit=params["memory_limit"],
+ memory_target_fraction=params["host_target"],
+ memory_spill_fraction=params["host_spill"],
+ memory_pause_fraction=params["host_pause"],
+ asynchronous=True,
+ ) as cluster:
+ async with Client(cluster, asynchronous=True) as client:
- sizes = await client.compute(
- cdf.map_partitions(lambda df: df.memory_usage())
+ # There's a known issue with datetime64:
+ # https://github.com/numpy/numpy/issues/4983#issuecomment-441332940
+ # The same error above happens when spilling datetime64 to disk
+ cdf = (
+ dask.datasets.timeseries(
+ dtypes={"x": int, "y": float}, freq="400ms"
)
- sizes = sizes.to_arrow().to_pylist()
- nbytes = sum(sizes)
+ .reset_index(drop=True)
+ .map_partitions(cudf.from_pandas)
+ )
- cdf2 = cdf.persist()
- await wait(cdf2)
+ sizes = await client.compute(
+ cdf.map_partitions(lambda df: df.memory_usage())
+ )
+ sizes = sizes.to_arrow().to_pylist()
+ nbytes = sum(sizes)
- del cdf
+ cdf2 = cdf.persist()
+ await wait(cdf2)
- host_chunks = await client.run(lambda: len(get_worker().data.host))
- disk_chunks = await client.run(
- lambda: len(get_worker().data.disk or list())
- )
- for hc, dc in zip(host_chunks.values(), disk_chunks.values()):
- if params["spills_to_disk"]:
- assert dc > 0
- else:
- assert hc > 0
- assert dc == 0
+ del cdf
+
+ host_chunks = await client.run(lambda: len(get_worker().data.host))
+ disk_chunks = await client.run(
+ lambda: len(get_worker().data.disk or list())
+ )
+ for hc, dc in zip(host_chunks.values(), disk_chunks.values()):
+ if params["spills_to_disk"]:
+ assert dc > 0
+ else:
+ assert hc > 0
+ assert dc == 0
- await client.run(worker_assert, nbytes, 32, 2048)
+ await client.run(worker_assert, nbytes, 32, 2048)
- del cdf2
+ del cdf2
- await client.run(delayed_worker_assert, 0, 0, 0)
+ await client.run(delayed_worker_assert, 0, 0, 0)
diff --git a/docs/source/examples/best-practices.rst b/docs/source/examples/best-practices.rst
new file mode 100644
index 000000000..242e90fff
--- /dev/null
+++ b/docs/source/examples/best-practices.rst
@@ -0,0 +1,117 @@
+Best Practices
+==============
+
+
+Multi-GPU Machines
+~~~~~~~~~~~~~~~~~~
+
+When choosing between two multi-GPU setups, it is best to pick the one where most GPUs are co-located with one another. This could be a
+`DGX `_, a cloud instance with `multi-GPU options `_, a high-density GPU HPC instance, etc. This is done for two reasons:
+
+- Moving data between GPUs is costly and performance decreases when computation stops due to communication overheads, Host-to-Device/Device-to-Host transfers, etc.
+- Multi-GPU instances often come with accelerated networking like `NVLink `_. These accelerated
+  networking paths usually have much higher throughput/bandwidth compared with traditional networking *and* don't force any Host-to-Device/Device-to-Host transfers. See
+  `Accelerated Networking`_ for more discussion.
+
+.. code-block:: python
+
+ from dask_cuda import LocalCUDACluster
+
+ cluster = LocalCUDACluster(n_workers=2) # will use GPUs 0,1
+ cluster = LocalCUDACluster(CUDA_VISIBLE_DEVICES="3,4") # will use GPUs 3,4
+
+For more discussion on controlling the number of workers and using multiple GPUs, see :ref:`controlling-number-of-workers`.
+
+GPU Memory Management
+~~~~~~~~~~~~~~~~~~~~~
+
+When using Dask-CUDA, especially with RAPIDS, it's best to use an |rmm-pool|__ to pre-allocate memory on the GPU. Each allocation, while fast, takes a small amount of time; however, one can easily make
+hundreds of thousands or even millions of allocations in trivial workflows, causing significant performance degradation. With an RMM pool, allocations are sub-allocated from a larger, pre-allocated pool, which greatly reduces the allocation time and thereby increases performance:
+
+
+ .. |rmm-pool| replace:: :abbr:`RMM (RAPIDS Memory Manager)` pool
+ __ https://docs.rapids.ai/api/rmm/stable/
+
+
+.. code-block:: python
+
+ from dask_cuda import LocalCUDACluster
+
+ cluster = LocalCUDACluster(CUDA_VISIBLE_DEVICES="0,1",
+ protocol="ucx",
+ rmm_pool_size="30GB")
+
+
+We also recommend allocating most, though not all, of the GPU memory space. We do this because the `CUDA Context `_ takes a non-zero amount (typically 200-500 MB) of GPU RAM on the device.
+
+Additionally, when using `Accelerated Networking`_, we only need to register a single IPC handle for the whole pool (which is expensive, but only done once) since, from the IPC point of view, there's only a single allocation. By contrast, when using RMM without a pool, each new allocation must be registered with IPC individually.
+
+Accelerated Networking
+~~~~~~~~~~~~~~~~~~~~~~
+
+As discussed in `Multi-GPU Machines`_, accelerated networking has better bandwidth/throughput compared with traditional networking hardware and does
+not force any costly Host-to-Device/Device-to-Host transfers. Dask-CUDA can leverage accelerated networking hardware with `UCX-Py `_.
+
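+As a minimal sketch (assuming NVLink-capable hardware), UCX can be enabled directly through ``LocalCUDACluster`` keyword arguments:
+
+.. code-block:: python
+
+    from dask.distributed import Client
+
+    from dask_cuda import LocalCUDACluster
+
+    # UCX protocol with NVLink (and TCP-over-UCX) enabled, plus an RMM pool
+    # so it can be registered for IPC once up-front.
+    cluster = LocalCUDACluster(
+        CUDA_VISIBLE_DEVICES="0,1",
+        protocol="ucx",
+        enable_tcp_over_ucx=True,
+        enable_nvlink=True,
+        rmm_pool_size="30GB",
+    )
+    client = Client(cluster)
+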
+As an example, let's compare a merge benchmark when using 2 GPUs connected with NVLink. First we'll run with standard TCP comms:
+
+::
+
+ python local_cudf_merge.py -d 0,1 -p tcp -c 50_000_000 --rmm-pool-size 30GB
+
+
+In the above, we used 2 GPUs (2 dask-cuda-workers), pre-allocated 30GB of GPU RAM (to make GPU memory allocations faster), and used TCP comms
+when Dask needed to move data back-and-forth between workers. This setup results in an average wall clock time of ``19.72 s +/- 694.36 ms``::
+
+ ================================================================================
+ Wall clock | Throughput
+ --------------------------------------------------------------------------------
+ 20.09 s | 151.93 MiB/s
+ 20.33 s | 150.10 MiB/s
+ 18.75 s | 162.75 MiB/s
+ ================================================================================
+ Throughput | 154.73 MiB/s +/- 3.14 MiB/s
+ Bandwidth | 139.22 MiB/s +/- 2.98 MiB/s
+ Wall clock | 19.72 s +/- 694.36 ms
+ ================================================================================
+ (w1,w2) | 25% 50% 75% (total nbytes)
+ --------------------------------------------------------------------------------
+ (0,1) | 138.48 MiB/s 150.16 MiB/s 157.36 MiB/s (8.66 GiB)
+ (1,0) | 107.01 MiB/s 162.38 MiB/s 188.59 MiB/s (8.66 GiB)
+ ================================================================================
+ Worker index | Worker address
+ --------------------------------------------------------------------------------
+ 0 | tcp://127.0.0.1:44055
+ 1 | tcp://127.0.0.1:41095
+ ================================================================================
+
+
+To compare, we'll now change the ``protocol`` from ``tcp`` to ``ucx``::
+
+ python local_cudf_merge.py -d 0,1 -p ucx -c 50_000_000 --rmm-pool-size 30GB
+
+
+
+With UCX and NVLink, we greatly reduced the wall clock time to ``347.43 ms +/- 5.41 ms``::
+
+ ================================================================================
+ Wall clock | Throughput
+ --------------------------------------------------------------------------------
+ 354.87 ms | 8.40 GiB/s
+ 345.24 ms | 8.63 GiB/s
+ 342.18 ms | 8.71 GiB/s
+ ================================================================================
+ Throughput | 8.58 GiB/s +/- 78.96 MiB/s
+ Bandwidth | 6.98 GiB/s +/- 46.05 MiB/s
+ Wall clock | 347.43 ms +/- 5.41 ms
+ ================================================================================
+ (w1,w2) | 25% 50% 75% (total nbytes)
+ --------------------------------------------------------------------------------
+ (0,1) | 17.38 GiB/s 17.94 GiB/s 18.88 GiB/s (8.66 GiB)
+ (1,0) | 16.55 GiB/s 17.80 GiB/s 18.87 GiB/s (8.66 GiB)
+ ================================================================================
+ Worker index | Worker address
+ --------------------------------------------------------------------------------
+ 0 | ucx://127.0.0.1:35954
+ 1 | ucx://127.0.0.1:53584
+ ================================================================================
+
diff --git a/docs/source/examples/worker_count.rst b/docs/source/examples/worker_count.rst
index 29c6502c0..62954ffbe 100644
--- a/docs/source/examples/worker_count.rst
+++ b/docs/source/examples/worker_count.rst
@@ -1,3 +1,5 @@
+.. _controlling-number-of-workers:
+
Controlling number of workers
=============================
@@ -44,4 +46,4 @@ These UUIDs can then be passed to ``CUDA_VISIBLE_DEVICES`` in place of a GPU ind
.. code-block:: bash
$ CUDA_VISIBLE_DEVICES="GPU-dae76d0e-3414-958a-8f3e-fc6682b36f31" \
- > dask-cuda-worker 127.0.0.1:8786
+ > dask-cuda-worker 127.0.0.1:8786
diff --git a/docs/source/index.rst b/docs/source/index.rst
index efd7f62fb..a43f29079 100644
--- a/docs/source/index.rst
+++ b/docs/source/index.rst
@@ -39,5 +39,6 @@ Contents
:maxdepth: 1
:caption: Examples
+ examples/best-practices
examples/worker_count
examples/ucx
diff --git a/requirements.txt b/requirements.txt
index a384bfc2d..3d673a955 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,5 +1,5 @@
-dask==2022.7.1
-distributed==2022.7.1
+dask==2022.9.2
+distributed==2022.9.2
pynvml>=11.0.0
numpy>=1.16.0
numba>=0.54