diff --git a/dask_cuda/cli.py b/dask_cuda/cli.py
index ba58fe3e..6a3518e0 100644
--- a/dask_cuda/cli.py
+++ b/dask_cuda/cli.py
@@ -101,6 +101,20 @@ def cuda():
     total device memory), string (like ``"5GB"`` or ``"5000M"``), or ``"auto"``
     or 0 to disable spilling to host (i.e. allow full device memory usage).""",
 )
+@click.option(
+    "--enable-cudf-spill/--disable-cudf-spill",
+    default=False,
+    show_default=True,
+    help="""Enable automatic cuDF spilling. WARNING: This should NOT be used with
+    JIT-Unspill.""",
+)
+@click.option(
+    "--cudf-spill-stats",
+    type=int,
+    default=0,
+    help="""Set the cuDF spilling statistics level. This option has no effect if
+    `--enable-cudf-spill` is not specified.""",
+)
 @click.option(
     "--rmm-pool-size",
     default=None,
@@ -330,6 +344,8 @@ def worker(
     name,
     memory_limit,
     device_memory_limit,
+    enable_cudf_spill,
+    cudf_spill_stats,
     rmm_pool_size,
     rmm_maximum_pool_size,
     rmm_managed_memory,
@@ -402,6 +418,8 @@ def worker(
         name,
         memory_limit,
         device_memory_limit,
+        enable_cudf_spill,
+        cudf_spill_stats,
         rmm_pool_size,
         rmm_maximum_pool_size,
         rmm_managed_memory,
diff --git a/dask_cuda/cuda_worker.py b/dask_cuda/cuda_worker.py
index e25a7c14..b88c9bc9 100644
--- a/dask_cuda/cuda_worker.py
+++ b/dask_cuda/cuda_worker.py
@@ -20,7 +20,7 @@

 from .device_host_file import DeviceHostFile
 from .initialize import initialize
-from .plugins import CPUAffinity, PreImport, RMMSetup
+from .plugins import CPUAffinity, CUDFSetup, PreImport, RMMSetup
 from .proxify_host_file import ProxifyHostFile
 from .utils import (
     cuda_visible_devices,
@@ -41,6 +41,8 @@ def __init__(
         name=None,
         memory_limit="auto",
         device_memory_limit="auto",
+        enable_cudf_spill=False,
+        cudf_spill_stats=0,
         rmm_pool_size=None,
         rmm_maximum_pool_size=None,
         rmm_managed_memory=False,
@@ -166,6 +168,12 @@ def del_pid_file():
         if device_memory_limit is None and memory_limit is None:
             data = lambda _: {}
         elif jit_unspill:
+            if enable_cudf_spill:
+                warnings.warn(
+                    "Enabling cuDF spilling and JIT-Unspill together is not "
+                    "safe, consider disabling JIT-Unspill."
+                )
+
             data = lambda i: (
                 ProxifyHostFile,
                 {
@@ -217,6 +225,7 @@ def del_pid_file():
                     track_allocations=rmm_track_allocations,
                 ),
                 PreImport(pre_import),
+                CUDFSetup(spill=enable_cudf_spill, spill_stats=cudf_spill_stats),
             },
             name=name if nprocs == 1 or name is None else str(name) + "-" + str(i),
             local_directory=local_directory,
diff --git a/dask_cuda/local_cuda_cluster.py b/dask_cuda/local_cuda_cluster.py
index 1b81c770..202373e9 100644
--- a/dask_cuda/local_cuda_cluster.py
+++ b/dask_cuda/local_cuda_cluster.py
@@ -10,7 +10,7 @@

 from .device_host_file import DeviceHostFile
 from .initialize import initialize
-from .plugins import CPUAffinity, PreImport, RMMSetup
+from .plugins import CPUAffinity, CUDFSetup, PreImport, RMMSetup
 from .proxify_host_file import ProxifyHostFile
 from .utils import (
     cuda_visible_devices,
@@ -73,6 +73,14 @@ class LocalCUDACluster(LocalCluster):
        starts spilling to host memory. Can be an integer (bytes), float (fraction of
        total device memory), string (like ``"5GB"`` or ``"5000M"``), or ``"auto"``, 0,
        or ``None`` to disable spilling to host (i.e. allow full device memory usage).
+    enable_cudf_spill : bool, default False
+        Enable automatic cuDF spilling.
+
+        .. warning::
+            This should NOT be used together with JIT-Unspill.
+    cudf_spill_stats : int, default 0
+        Set the cuDF spilling statistics level. This option has no effect if
+        ``enable_cudf_spill=False``.
     local_directory : str or None, default None
         Path on local machine to store temporary files. Can be a string (like
         ``"path/to/files"``) or ``None`` to fall back on the value of
@@ -209,6 +217,8 @@ def __init__(
         threads_per_worker=1,
         memory_limit="auto",
         device_memory_limit=0.8,
+        enable_cudf_spill=False,
+        cudf_spill_stats=0,
         data=None,
         local_directory=None,
         shared_filesystem=None,
@@ -259,6 +269,8 @@ def __init__(
         self.device_memory_limit = parse_device_memory_limit(
             device_memory_limit, device_index=nvml_device_index(0, CUDA_VISIBLE_DEVICES)
         )
+        self.enable_cudf_spill = enable_cudf_spill
+        self.cudf_spill_stats = cudf_spill_stats

         self.rmm_pool_size = rmm_pool_size
         self.rmm_maximum_pool_size = rmm_maximum_pool_size
@@ -302,6 +314,12 @@ def __init__(
         if device_memory_limit is None and memory_limit is None:
             data = {}
         elif jit_unspill:
+            if enable_cudf_spill:
+                warnings.warn(
+                    "Enabling cuDF spilling and JIT-Unspill together is not "
+                    "safe, consider disabling JIT-Unspill."
+                )
+
             data = (
                 ProxifyHostFile,
                 {
@@ -414,6 +432,7 @@ def new_worker_spec(self):
                     track_allocations=self.rmm_track_allocations,
                 ),
                 PreImport(self.pre_import),
+                CUDFSetup(self.enable_cudf_spill, self.cudf_spill_stats),
             },
         }
     )
diff --git a/dask_cuda/plugins.py b/dask_cuda/plugins.py
index 4eba97f2..122f93ff 100644
--- a/dask_cuda/plugins.py
+++ b/dask_cuda/plugins.py
@@ -14,6 +14,21 @@ def setup(self, worker=None):
         os.sched_setaffinity(0, self.cores)


+class CUDFSetup(WorkerPlugin):
+    def __init__(self, spill, spill_stats):
+        self.spill = spill
+        self.spill_stats = spill_stats
+
+    def setup(self, worker=None):
+        try:
+            import cudf
+
+            cudf.set_option("spill", self.spill)
+            cudf.set_option("spill_stats", self.spill_stats)
+        except ImportError:
+            pass
+
+
 class RMMSetup(WorkerPlugin):
     def __init__(
         self,
diff --git a/dask_cuda/tests/test_dask_cuda_worker.py b/dask_cuda/tests/test_dask_cuda_worker.py
index 974ad131..505af12f 100644
--- a/dask_cuda/tests/test_dask_cuda_worker.py
+++ b/dask_cuda/tests/test_dask_cuda_worker.py
@@ -231,6 +231,64 @@ def test_rmm_logging(loop):  # noqa: F811
             assert v is rmm.mr.LoggingResourceAdaptor


+def test_cudf_spill_disabled(loop):  # noqa: F811
+    cudf = pytest.importorskip("cudf")
+    with popen(["dask", "scheduler", "--port", "9369", "--no-dashboard"]):
+        with popen(
+            [
+                "dask",
+                "cuda",
+                "worker",
+                "127.0.0.1:9369",
+                "--host",
+                "127.0.0.1",
+                "--no-dashboard",
+            ]
+        ):
+            with Client("127.0.0.1:9369", loop=loop) as client:
+                assert wait_workers(client, n_gpus=get_n_gpus())
+
+                cudf_spill = client.run(
+                    cudf.get_option,
+                    "spill",
+                )
+                for v in cudf_spill.values():
+                    assert v is False
+
+                cudf_spill_stats = client.run(cudf.get_option, "spill_stats")
+                for v in cudf_spill_stats.values():
+                    assert v == 0
+
+
+def test_cudf_spill(loop):  # noqa: F811
+    cudf = pytest.importorskip("cudf")
+    with popen(["dask", "scheduler", "--port", "9369", "--no-dashboard"]):
+        with popen(
+            [
+                "dask",
+                "cuda",
+                "worker",
+                "127.0.0.1:9369",
+                "--host",
+                "127.0.0.1",
+                "--no-dashboard",
+                "--enable-cudf-spill",
+                "--cudf-spill-stats",
+                "2",
+            ]
+        ):
+            with Client("127.0.0.1:9369", loop=loop) as client:
+                assert wait_workers(client, n_gpus=get_n_gpus())
+
+                cudf_spill = client.run(cudf.get_option, "spill")
+                for v in cudf_spill.values():
+                    assert v is True
+
+                cudf_spill_stats = client.run(cudf.get_option, "spill_stats")
+                for v in cudf_spill_stats.values():
+                    assert v == 2
+
+
 @patch.dict(os.environ, {"CUDA_VISIBLE_DEVICES": "0"})
 def test_dashboard_address(loop):  # noqa: F811
     with popen(["dask", "scheduler", "--port", "9369", "--no-dashboard"]):
diff --git a/dask_cuda/tests/test_local_cuda_cluster.py b/dask_cuda/tests/test_local_cuda_cluster.py
index b05389e4..b144d111 100644
--- a/dask_cuda/tests/test_local_cuda_cluster.py
+++ b/dask_cuda/tests/test_local_cuda_cluster.py
@@ -500,6 +500,54 @@ async def test_worker_fraction_limits():
     )


+@gen_test(timeout=20)
+async def test_cudf_spill_disabled():
+    cudf = pytest.importorskip("cudf")
+
+    async with LocalCUDACluster(
+        asynchronous=True,
+    ) as cluster:
+        async with Client(cluster, asynchronous=True) as client:
+            cudf_spill = await client.run(
+                cudf.get_option,
+                "spill",
+            )
+            for v in cudf_spill.values():
+                assert v is False
+
+            cudf_spill_stats = await client.run(
+                cudf.get_option,
+                "spill_stats",
+            )
+            for v in cudf_spill_stats.values():
+                assert v == 0
+
+
+@gen_test(timeout=20)
+async def test_cudf_spill():
+    cudf = pytest.importorskip("cudf")
+
+    async with LocalCUDACluster(
+        enable_cudf_spill=True,
+        cudf_spill_stats=2,
+        asynchronous=True,
+    ) as cluster:
+        async with Client(cluster, asynchronous=True) as client:
+            cudf_spill = await client.run(
+                cudf.get_option,
+                "spill",
+            )
+            for v in cudf_spill.values():
+                assert v is True
+
+            cudf_spill_stats = await client.run(
+                cudf.get_option,
+                "spill_stats",
+            )
+            for v in cudf_spill_stats.values():
+                assert v == 2
+
+
 @pytest.mark.parametrize(
     "protocol",
     ["ucx", "ucxx"],