From 84bfc149a222194e9fedec542a3eb68eca88672a Mon Sep 17 00:00:00 2001
From: Peter Andreas Entschev
Date: Thu, 26 Oct 2023 10:31:18 +0200
Subject: [PATCH] Move some `dask_cuda.utils` pieces to their own modules (#1263)

Move some functions and classes into their own modules:

- Move plugins to the new `dask_cuda.plugins` module;
- Move test utils to the `dask_cuda.utils_test` module;
- Move `IncreasedCloseTimeoutNanny` to the `dask_cuda.utils_test` module; it is
  no longer the default worker class for `LocalCUDACluster`.

Additionally, pass `worker_class=IncreasedCloseTimeoutNanny` to tests that have
failed in the past due to `Nanny`'s close timeout.

Authors:
  - Peter Andreas Entschev (https://github.com/pentschev)

Approvers:
  - Mads R. B. Kristensen (https://github.com/madsbk)

URL: https://github.com/rapidsai/dask-cuda/pull/1263
---
 dask_cuda/cuda_worker.py                   |   4 +-
 dask_cuda/local_cuda_cluster.py            |  29 ++---
 dask_cuda/plugins.py                       | 122 ++++++++++++++++++
 dask_cuda/tests/test_dask_cuda_worker.py   |   6 +-
 dask_cuda/tests/test_explicit_comms.py     |   2 +-
 dask_cuda/tests/test_initialize.py         |   5 +
 dask_cuda/tests/test_local_cuda_cluster.py |   2 +-
 dask_cuda/tests/test_proxify_host_file.py  |   6 +-
 dask_cuda/tests/test_proxy.py              |   2 +
 dask_cuda/tests/test_spill.py              |   3 +
 dask_cuda/utils.py                         | 140 +--------------------
 dask_cuda/utils_test.py                    |  45 +++++++
 dask_cuda/worker_spec.py                   |   3 +-
 13 files changed, 204 insertions(+), 165 deletions(-)
 create mode 100644 dask_cuda/plugins.py
 create mode 100644 dask_cuda/utils_test.py

diff --git a/dask_cuda/cuda_worker.py b/dask_cuda/cuda_worker.py
index 9dc2d56ce..e25a7c142 100644
--- a/dask_cuda/cuda_worker.py
+++ b/dask_cuda/cuda_worker.py
@@ -20,11 +20,9 @@
 
 from .device_host_file import DeviceHostFile
 from .initialize import initialize
+from .plugins import CPUAffinity, PreImport, RMMSetup
 from .proxify_host_file import ProxifyHostFile
 from .utils import (
-    CPUAffinity,
-    PreImport,
-    RMMSetup,
     cuda_visible_devices,
     get_cpu_affinity,
     get_n_gpus,
diff --git a/dask_cuda/local_cuda_cluster.py b/dask_cuda/local_cuda_cluster.py
index ef15dcce3..d0ea92748 100644
--- a/dask_cuda/local_cuda_cluster.py
+++ b/dask_cuda/local_cuda_cluster.py
@@ -3,7 +3,6 @@
 import os
 import warnings
 from functools import partial
-from typing import Literal
 
 import dask
 from distributed import LocalCluster, Nanny, Worker
@@ -11,11 +10,9 @@
 
 from .device_host_file import DeviceHostFile
 from .initialize import initialize
+from .plugins import CPUAffinity, PreImport, RMMSetup
 from .proxify_host_file import ProxifyHostFile
 from .utils import (
-    CPUAffinity,
-    PreImport,
-    RMMSetup,
     cuda_visible_devices,
     get_cpu_affinity,
     get_ucx_config,
@@ -25,13 +22,6 @@
 )
 
 
-class IncreasedCloseTimeoutNanny(Nanny):
-    async def close(  # type:ignore[override]
-        self, timeout: float = 10.0, reason: str = "nanny-close"
-    ) -> Literal["OK"]:
-        return await super().close(timeout=timeout, reason=reason)
-
-
 class LoggedWorker(Worker):
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
@@ -41,7 +31,7 @@ async def start(self):
         self.data.set_address(self.address)
 
 
-class LoggedNanny(IncreasedCloseTimeoutNanny):
+class LoggedNanny(Nanny):
     def __init__(self, *args, **kwargs):
         super().__init__(*args, worker_class=LoggedWorker, **kwargs)
 
@@ -342,10 +332,17 @@ def __init__(
             enable_rdmacm=enable_rdmacm,
         )
 
-        worker_class = partial(
-            LoggedNanny if log_spilling is True else IncreasedCloseTimeoutNanny,
-            worker_class=worker_class,
-        )
+        if worker_class is not None:
+            if log_spilling is True:
+                raise ValueError(
+                    "Cannot enable `log_spilling` when `worker_class` is specified. If "
+                    "logging is needed, ensure `worker_class` is a subclass of "
+                    "`dask_cuda.local_cuda_cluster.LoggedNanny` or a subclass of "
+                    "`dask_cuda.local_cuda_cluster.LoggedWorker`, and specify "
+                    "`log_spilling=False`."
+                )
+            if not issubclass(worker_class, Nanny):
+                worker_class = partial(Nanny, worker_class=worker_class)
 
         self.pre_import = pre_import
 
diff --git a/dask_cuda/plugins.py b/dask_cuda/plugins.py
new file mode 100644
index 000000000..4eba97f2b
--- /dev/null
+++ b/dask_cuda/plugins.py
@@ -0,0 +1,122 @@
+import importlib
+import os
+
+from distributed import WorkerPlugin
+
+from .utils import get_rmm_log_file_name, parse_device_memory_limit
+
+
+class CPUAffinity(WorkerPlugin):
+    def __init__(self, cores):
+        self.cores = cores
+
+    def setup(self, worker=None):
+        os.sched_setaffinity(0, self.cores)
+
+
+class RMMSetup(WorkerPlugin):
+    def __init__(
+        self,
+        initial_pool_size,
+        maximum_pool_size,
+        managed_memory,
+        async_alloc,
+        release_threshold,
+        log_directory,
+        track_allocations,
+    ):
+        if initial_pool_size is None and maximum_pool_size is not None:
+            raise ValueError(
+                "`rmm_maximum_pool_size` was specified without specifying "
+                "`rmm_pool_size`.`rmm_pool_size` must be specified to use RMM pool."
+            )
+        if async_alloc is True:
+            if managed_memory is True:
+                raise ValueError(
+                    "`rmm_managed_memory` is incompatible with the `rmm_async`."
+                )
+        if async_alloc is False and release_threshold is not None:
+            raise ValueError("`rmm_release_threshold` requires `rmm_async`.")
+
+        self.initial_pool_size = initial_pool_size
+        self.maximum_pool_size = maximum_pool_size
+        self.managed_memory = managed_memory
+        self.async_alloc = async_alloc
+        self.release_threshold = release_threshold
+        self.logging = log_directory is not None
+        self.log_directory = log_directory
+        self.rmm_track_allocations = track_allocations
+
+    def setup(self, worker=None):
+        if self.initial_pool_size is not None:
+            self.initial_pool_size = parse_device_memory_limit(
+                self.initial_pool_size, alignment_size=256
+            )
+
+        if self.async_alloc:
+            import rmm
+
+            if self.release_threshold is not None:
+                self.release_threshold = parse_device_memory_limit(
+                    self.release_threshold, alignment_size=256
+                )
+
+            mr = rmm.mr.CudaAsyncMemoryResource(
+                initial_pool_size=self.initial_pool_size,
+                release_threshold=self.release_threshold,
+            )
+
+            if self.maximum_pool_size is not None:
+                self.maximum_pool_size = parse_device_memory_limit(
+                    self.maximum_pool_size, alignment_size=256
+                )
+                mr = rmm.mr.LimitingResourceAdaptor(
+                    mr, allocation_limit=self.maximum_pool_size
+                )
+
+            rmm.mr.set_current_device_resource(mr)
+            if self.logging:
+                rmm.enable_logging(
+                    log_file_name=get_rmm_log_file_name(
+                        worker, self.logging, self.log_directory
+                    )
+                )
+        elif self.initial_pool_size is not None or self.managed_memory:
+            import rmm
+
+            pool_allocator = False if self.initial_pool_size is None else True
+
+            if self.initial_pool_size is not None:
+                if self.maximum_pool_size is not None:
+                    self.maximum_pool_size = parse_device_memory_limit(
+                        self.maximum_pool_size, alignment_size=256
+                    )
+
+            rmm.reinitialize(
+                pool_allocator=pool_allocator,
+                managed_memory=self.managed_memory,
+                initial_pool_size=self.initial_pool_size,
+                maximum_pool_size=self.maximum_pool_size,
+                logging=self.logging,
+                log_file_name=get_rmm_log_file_name(
+                    worker, self.logging, self.log_directory
+                ),
+            )
+        if self.rmm_track_allocations:
+            import rmm
+
+            mr = rmm.mr.get_current_device_resource()
+            rmm.mr.set_current_device_resource(rmm.mr.TrackingResourceAdaptor(mr))
+
+
+class PreImport(WorkerPlugin):
+    def __init__(self, libraries):
+        if libraries is None:
+            libraries = []
+        elif isinstance(libraries, str):
+            libraries = libraries.split(",")
+        self.libraries = libraries
+
+    def setup(self, worker=None):
+        for l in self.libraries:
+            importlib.import_module(l)
diff --git a/dask_cuda/tests/test_dask_cuda_worker.py b/dask_cuda/tests/test_dask_cuda_worker.py
index 449fdba7e..974ad1319 100644
--- a/dask_cuda/tests/test_dask_cuda_worker.py
+++ b/dask_cuda/tests/test_dask_cuda_worker.py
@@ -40,7 +40,7 @@ def test_cuda_visible_devices_and_memory_limit_and_nthreads(loop):  # noqa: F811
             str(nthreads),
             "--no-dashboard",
             "--worker-class",
-            "dask_cuda.utils.MockWorker",
+            "dask_cuda.utils_test.MockWorker",
         ]
     ):
         with Client("127.0.0.1:9359", loop=loop) as client:
@@ -329,7 +329,7 @@ def test_cuda_mig_visible_devices_and_memory_limit_and_nthreads(loop):  # noqa:
             str(nthreads),
             "--no-dashboard",
             "--worker-class",
-            "dask_cuda.utils.MockWorker",
+            "dask_cuda.utils_test.MockWorker",
         ]
     ):
         with Client("127.0.0.1:9359", loop=loop) as client:
@@ -364,7 +364,7 @@ def test_cuda_visible_devices_uuid(loop):  # noqa: F811
             "127.0.0.1",
             "--no-dashboard",
             "--worker-class",
-            "dask_cuda.utils.MockWorker",
+            "dask_cuda.utils_test.MockWorker",
         ]
     ):
         with Client("127.0.0.1:9359", loop=loop) as client:
diff --git a/dask_cuda/tests/test_explicit_comms.py b/dask_cuda/tests/test_explicit_comms.py
index d9cd6dfb2..bd6770225 100644
--- a/dask_cuda/tests/test_explicit_comms.py
+++ b/dask_cuda/tests/test_explicit_comms.py
@@ -17,7 +17,7 @@
 import dask_cuda
 from dask_cuda.explicit_comms import comms
 from dask_cuda.explicit_comms.dataframe.shuffle import shuffle as explicit_comms_shuffle
-from dask_cuda.local_cuda_cluster import IncreasedCloseTimeoutNanny
+from dask_cuda.utils_test import IncreasedCloseTimeoutNanny
 
 mp = mp.get_context("spawn")  # type: ignore
 ucp = pytest.importorskip("ucp")
diff --git a/dask_cuda/tests/test_initialize.py b/dask_cuda/tests/test_initialize.py
index 60c7a798f..05b72f996 100644
--- a/dask_cuda/tests/test_initialize.py
+++ b/dask_cuda/tests/test_initialize.py
@@ -10,6 +10,7 @@
 
 from dask_cuda.initialize import initialize
 from dask_cuda.utils import get_ucx_config
+from dask_cuda.utils_test import IncreasedCloseTimeoutNanny
 
 mp = mp.get_context("spawn")  # type: ignore
 ucp = pytest.importorskip("ucp")
@@ -29,6 +30,7 @@ def _test_initialize_ucx_tcp():
         n_workers=1,
         threads_per_worker=1,
         processes=True,
+        worker_class=IncreasedCloseTimeoutNanny,
         config={"distributed.comm.ucx": get_ucx_config(**kwargs)},
     ) as cluster:
         with Client(cluster) as client:
@@ -64,6 +66,7 @@ def _test_initialize_ucx_nvlink():
         n_workers=1,
         threads_per_worker=1,
         processes=True,
+        worker_class=IncreasedCloseTimeoutNanny,
         config={"distributed.comm.ucx": get_ucx_config(**kwargs)},
     ) as cluster:
         with Client(cluster) as client:
@@ -100,6 +103,7 @@ def _test_initialize_ucx_infiniband():
         n_workers=1,
         threads_per_worker=1,
         processes=True,
+        worker_class=IncreasedCloseTimeoutNanny,
         config={"distributed.comm.ucx": get_ucx_config(**kwargs)},
     ) as cluster:
         with Client(cluster) as client:
@@ -138,6 +142,7 @@ def _test_initialize_ucx_all():
         n_workers=1,
         threads_per_worker=1,
         processes=True,
+        worker_class=IncreasedCloseTimeoutNanny,
         config={"distributed.comm.ucx": get_ucx_config()},
     ) as cluster:
         with Client(cluster) as client:
diff --git a/dask_cuda/tests/test_local_cuda_cluster.py b/dask_cuda/tests/test_local_cuda_cluster.py
index 845759dfd..5d7762579 100644
--- a/dask_cuda/tests/test_local_cuda_cluster.py
+++ b/dask_cuda/tests/test_local_cuda_cluster.py
@@ -13,13 +13,13 @@
 from dask_cuda import CUDAWorker, LocalCUDACluster, utils
 from dask_cuda.initialize import initialize
 from dask_cuda.utils import (
-    MockWorker,
     get_cluster_configuration,
     get_device_total_memory,
     get_gpu_count_mig,
     get_gpu_uuid_from_index,
     print_cluster_config,
 )
+from dask_cuda.utils_test import MockWorker
 
 
 @gen_test(timeout=20)
diff --git a/dask_cuda/tests/test_proxify_host_file.py b/dask_cuda/tests/test_proxify_host_file.py
index 2e3f8269d..191f62fe4 100644
--- a/dask_cuda/tests/test_proxify_host_file.py
+++ b/dask_cuda/tests/test_proxify_host_file.py
@@ -19,6 +19,7 @@
 from dask_cuda.proxify_host_file import ProxifyHostFile
 from dask_cuda.proxy_object import ProxyObject, asproxy, unproxy
 from dask_cuda.utils import get_device_total_memory
+from dask_cuda.utils_test import IncreasedCloseTimeoutNanny
 
 cupy = pytest.importorskip("cupy")
 cupy.cuda.set_allocator(None)
@@ -393,7 +394,10 @@ def is_proxy_object(x):
 
     with dask.config.set(jit_unspill_compatibility_mode=compatibility_mode):
         async with dask_cuda.LocalCUDACluster(
-            n_workers=1, jit_unspill=True, asynchronous=True
+            n_workers=1,
+            jit_unspill=True,
+            worker_class=IncreasedCloseTimeoutNanny,
+            asynchronous=True,
         ) as cluster:
             async with Client(cluster, asynchronous=True) as client:
                 ddf = dask.dataframe.from_pandas(
diff --git a/dask_cuda/tests/test_proxy.py b/dask_cuda/tests/test_proxy.py
index c779a39ef..8de56a5c5 100644
--- a/dask_cuda/tests/test_proxy.py
+++ b/dask_cuda/tests/test_proxy.py
@@ -23,6 +23,7 @@
 from dask_cuda.disk_io import SpillToDiskFile
 from dask_cuda.proxify_device_objects import proxify_device_objects
 from dask_cuda.proxify_host_file import ProxifyHostFile
+from dask_cuda.utils_test import IncreasedCloseTimeoutNanny
 
 # Make the "disk" serializer available and use a directory that are
 # remove on exit.
@@ -422,6 +423,7 @@ def task(x):
     async with dask_cuda.LocalCUDACluster(
         n_workers=1,
         protocol=protocol,
+        worker_class=IncreasedCloseTimeoutNanny,
         asynchronous=True,
     ) as cluster:
         async with Client(cluster, asynchronous=True) as client:
diff --git a/dask_cuda/tests/test_spill.py b/dask_cuda/tests/test_spill.py
index 6172b0bc6..f8df7e04f 100644
--- a/dask_cuda/tests/test_spill.py
+++ b/dask_cuda/tests/test_spill.py
@@ -12,6 +12,7 @@
 from distributed.utils_test import gen_cluster, gen_test, loop  # noqa: F401
 
 from dask_cuda import LocalCUDACluster, utils
+from dask_cuda.utils_test import IncreasedCloseTimeoutNanny
 
 if utils.get_device_total_memory() < 1e10:
     pytest.skip("Not enough GPU memory", allow_module_level=True)
@@ -160,6 +161,7 @@ async def test_cupy_cluster_device_spill(params):
         asynchronous=True,
         device_memory_limit=params["device_memory_limit"],
         memory_limit=params["memory_limit"],
+        worker_class=IncreasedCloseTimeoutNanny,
     ) as cluster:
         async with Client(cluster, asynchronous=True) as client:
 
@@ -263,6 +265,7 @@ async def test_cudf_cluster_device_spill(params):
         asynchronous=True,
         device_memory_limit=params["device_memory_limit"],
         memory_limit=params["memory_limit"],
+        worker_class=IncreasedCloseTimeoutNanny,
     ) as cluster:
         async with Client(cluster, asynchronous=True) as client:
 
diff --git a/dask_cuda/utils.py b/dask_cuda/utils.py
index 1e244bb31..f16ad18a2 100644
--- a/dask_cuda/utils.py
+++ b/dask_cuda/utils.py
@@ -1,4 +1,3 @@
-import importlib
 import math
 import operator
 import os
@@ -18,7 +17,7 @@
 import distributed  # noqa: required for dask.config.get("distributed.comm.ucx")
 from dask.config import canonical_name
 from dask.utils import format_bytes, parse_bytes
-from distributed import Worker, WorkerPlugin, wait
+from distributed import wait
 from distributed.comm import parse_address
 
 try:
@@ -32,122 +31,6 @@ def nvtx_annotate(message=None, color="blue", domain=None):
         yield
 
 
-class CPUAffinity(WorkerPlugin):
-    def __init__(self, cores):
-        self.cores = cores
-
-    def setup(self, worker=None):
-        os.sched_setaffinity(0, self.cores)
-
-
-class RMMSetup(WorkerPlugin):
-    def __init__(
-        self,
-        initial_pool_size,
-        maximum_pool_size,
-        managed_memory,
-        async_alloc,
-        release_threshold,
-        log_directory,
-        track_allocations,
-    ):
-        if initial_pool_size is None and maximum_pool_size is not None:
-            raise ValueError(
-                "`rmm_maximum_pool_size` was specified without specifying "
-                "`rmm_pool_size`.`rmm_pool_size` must be specified to use RMM pool."
-            )
-        if async_alloc is True:
-            if managed_memory is True:
-                raise ValueError(
-                    "`rmm_managed_memory` is incompatible with the `rmm_async`."
-                )
-        if async_alloc is False and release_threshold is not None:
-            raise ValueError("`rmm_release_threshold` requires `rmm_async`.")
-
-        self.initial_pool_size = initial_pool_size
-        self.maximum_pool_size = maximum_pool_size
-        self.managed_memory = managed_memory
-        self.async_alloc = async_alloc
-        self.release_threshold = release_threshold
-        self.logging = log_directory is not None
-        self.log_directory = log_directory
-        self.rmm_track_allocations = track_allocations
-
-    def setup(self, worker=None):
-        if self.initial_pool_size is not None:
-            self.initial_pool_size = parse_device_memory_limit(
-                self.initial_pool_size, alignment_size=256
-            )
-
-        if self.async_alloc:
-            import rmm
-
-            if self.release_threshold is not None:
-                self.release_threshold = parse_device_memory_limit(
-                    self.release_threshold, alignment_size=256
-                )
-
-            mr = rmm.mr.CudaAsyncMemoryResource(
-                initial_pool_size=self.initial_pool_size,
-                release_threshold=self.release_threshold,
-            )
-
-            if self.maximum_pool_size is not None:
-                self.maximum_pool_size = parse_device_memory_limit(
-                    self.maximum_pool_size, alignment_size=256
-                )
-                mr = rmm.mr.LimitingResourceAdaptor(
-                    mr, allocation_limit=self.maximum_pool_size
-                )
-
-            rmm.mr.set_current_device_resource(mr)
-            if self.logging:
-                rmm.enable_logging(
-                    log_file_name=get_rmm_log_file_name(
-                        worker, self.logging, self.log_directory
-                    )
-                )
-        elif self.initial_pool_size is not None or self.managed_memory:
-            import rmm
-
-            pool_allocator = False if self.initial_pool_size is None else True
-
-            if self.initial_pool_size is not None:
-                if self.maximum_pool_size is not None:
-                    self.maximum_pool_size = parse_device_memory_limit(
-                        self.maximum_pool_size, alignment_size=256
-                    )
-
-            rmm.reinitialize(
-                pool_allocator=pool_allocator,
-                managed_memory=self.managed_memory,
-                initial_pool_size=self.initial_pool_size,
-                maximum_pool_size=self.maximum_pool_size,
-                logging=self.logging,
-                log_file_name=get_rmm_log_file_name(
-                    worker, self.logging, self.log_directory
-                ),
-            )
-        if self.rmm_track_allocations:
-            import rmm
-
-            mr = rmm.mr.get_current_device_resource()
-            rmm.mr.set_current_device_resource(rmm.mr.TrackingResourceAdaptor(mr))
-
-
-class PreImport(WorkerPlugin):
-    def __init__(self, libraries):
-        if libraries is None:
-            libraries = []
-        elif isinstance(libraries, str):
-            libraries = libraries.split(",")
-        self.libraries = libraries
-
-    def setup(self, worker=None):
-        for l in self.libraries:
-            importlib.import_module(l)
-
-
 def unpack_bitmask(x, mask_bits=64):
     """Unpack a list of integers containing bitmasks.
 
@@ -669,27 +552,6 @@ def _align(size, alignment_size):
     return _align(int(device_memory_limit), alignment_size)
 
 
-class MockWorker(Worker):
-    """Mock Worker class preventing NVML from getting used by SystemMonitor.
-
-    By preventing the Worker from initializing NVML in the SystemMonitor, we can
-    mock test multiple devices in `CUDA_VISIBLE_DEVICES` behavior with single-GPU
-    machines.
-    """
-
-    def __init__(self, *args, **kwargs):
-        distributed.diagnostics.nvml.device_get_count = MockWorker.device_get_count
-        self._device_get_count = distributed.diagnostics.nvml.device_get_count
-        super().__init__(*args, **kwargs)
-
-    def __del__(self):
-        distributed.diagnostics.nvml.device_get_count = self._device_get_count
-
-    @staticmethod
-    def device_get_count():
-        return 0
-
-
 def get_gpu_uuid_from_index(device_index=0):
     """Get GPU UUID from CUDA device index.
 
diff --git a/dask_cuda/utils_test.py b/dask_cuda/utils_test.py
new file mode 100644
index 000000000..aba77ee79
--- /dev/null
+++ b/dask_cuda/utils_test.py
@@ -0,0 +1,45 @@
+from typing import Literal
+
+import distributed
+from distributed import Nanny, Worker
+
+
+class MockWorker(Worker):
+    """Mock Worker class preventing NVML from getting used by SystemMonitor.
+
+    By preventing the Worker from initializing NVML in the SystemMonitor, we can
+    mock test multiple devices in `CUDA_VISIBLE_DEVICES` behavior with single-GPU
+    machines.
+    """
+
+    def __init__(self, *args, **kwargs):
+        distributed.diagnostics.nvml.device_get_count = MockWorker.device_get_count
+        self._device_get_count = distributed.diagnostics.nvml.device_get_count
+        super().__init__(*args, **kwargs)
+
+    def __del__(self):
+        distributed.diagnostics.nvml.device_get_count = self._device_get_count
+
+    @staticmethod
+    def device_get_count():
+        return 0
+
+
+class IncreasedCloseTimeoutNanny(Nanny):
+    """Increase `Nanny`'s close timeout.
+
+    The internal close timeout mechanism of `Nanny` recomputes the time left to kill
+    the `Worker` process based on the elapsed time of the close task, which may
+    leave very little time for the subprocess to shut down cleanly and may cause
+    tests to fail when the system is under high load. This class increases the
+    default close timeout of 5.0 seconds set by `Nanny`, which can be overridden
+    via Distributed's public API.
+
+    This class can be used with the `worker_class` argument of `LocalCluster` or
+    `LocalCUDACluster` to provide a much higher default of 30.0 seconds.
+    """
+
+    async def close(  # type:ignore[override]
+        self, timeout: float = 30.0, reason: str = "nanny-close"
+    ) -> Literal["OK"]:
+        return await super().close(timeout=timeout, reason=reason)
diff --git a/dask_cuda/worker_spec.py b/dask_cuda/worker_spec.py
index 6a61fa8f8..84ce51725 100644
--- a/dask_cuda/worker_spec.py
+++ b/dask_cuda/worker_spec.py
@@ -5,7 +5,8 @@
 
 from .initialize import initialize
 from .local_cuda_cluster import cuda_visible_devices
-from .utils import CPUAffinity, get_cpu_affinity, get_gpu_count
+from .plugins import CPUAffinity
+from .utils import get_cpu_affinity, get_gpu_count
 
 
 def worker_spec(
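
Usage sketch (editor's note, not part of the diff): the snippet below only illustrates the import paths after this change, mirroring what the updated tests above do. It assumes a dask-cuda installation that includes this commit; the cluster arguments shown are an example, not a required configuration.

    # Worker plugins now live in dask_cuda.plugins (previously dask_cuda.utils).
    from dask_cuda.plugins import CPUAffinity, PreImport, RMMSetup

    # Test helpers (MockWorker, IncreasedCloseTimeoutNanny) now live in
    # dask_cuda.utils_test, and IncreasedCloseTimeoutNanny is no longer the
    # default nanny of LocalCUDACluster; callers that want the longer close
    # timeout opt in explicitly via worker_class, as the tests above now do.
    from dask_cuda import LocalCUDACluster
    from dask_cuda.utils_test import IncreasedCloseTimeoutNanny

    cluster = LocalCUDACluster(
        n_workers=1,
        worker_class=IncreasedCloseTimeoutNanny,
    )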