diff --git a/dask_cuda/cli/dask_cuda_worker.py b/dask_cuda/cli/dask_cuda_worker.py index 4f1ac6c1b..8f50bdcec 100755 --- a/dask_cuda/cli/dask_cuda_worker.py +++ b/dask_cuda/cli/dask_cuda_worker.py @@ -73,6 +73,18 @@ .. note:: This size is a per-worker configuration, and not cluster-wide.""", ) +@click.option( + "--rmm-maximum-pool-size", + default=None, + help="""When ``--rmm-pool-size`` is specified, this argument indicates the maximum pool size. + Can be an integer (bytes), string (like ``"5GB"`` or ``"5000M"``) or ``None``. + By default, the total available memory on the GPU is used. + ``rmm_pool_size`` must be specified to use RMM pool and + to set the maximum pool size. + + .. note:: + This size is a per-worker configuration, and not cluster-wide.""", +) @click.option( "--rmm-managed-memory/--no-rmm-managed-memory", default=False, @@ -277,6 +289,7 @@ def main( memory_limit, device_memory_limit, rmm_pool_size, + rmm_maximum_pool_size, rmm_managed_memory, rmm_async, rmm_log_directory, @@ -327,6 +340,7 @@ def main( memory_limit, device_memory_limit, rmm_pool_size, + rmm_maximum_pool_size, rmm_managed_memory, rmm_async, rmm_log_directory, diff --git a/dask_cuda/cuda_worker.py b/dask_cuda/cuda_worker.py index 56a6dcdb6..9634c8229 100644 --- a/dask_cuda/cuda_worker.py +++ b/dask_cuda/cuda_worker.py @@ -58,6 +58,7 @@ def __init__( memory_limit="auto", device_memory_limit="auto", rmm_pool_size=None, + rmm_maximum_pool_size=None, rmm_managed_memory=False, rmm_async=False, rmm_log_directory=None, @@ -152,6 +153,9 @@ def del_pid_file(): ) if rmm_pool_size is not None: rmm_pool_size = parse_bytes(rmm_pool_size) + if rmm_maximum_pool_size is not None: + rmm_maximum_pool_size = parse_bytes(rmm_maximum_pool_size) + else: if enable_nvlink: warnings.warn( @@ -239,7 +243,11 @@ def del_pid_file(): get_cpu_affinity(nvml_device_index(i, cuda_visible_devices(i))) ), RMMSetup( - rmm_pool_size, rmm_managed_memory, rmm_async, rmm_log_directory, + rmm_pool_size, + 
rmm_maximum_pool_size, + rmm_managed_memory, + rmm_async, + rmm_log_directory, ), }, name=name if nprocs == 1 or name is None else str(name) + "-" + str(i), diff --git a/dask_cuda/local_cuda_cluster.py b/dask_cuda/local_cuda_cluster.py index 30d48f0d1..80fa225aa 100644 --- a/dask_cuda/local_cuda_cluster.py +++ b/dask_cuda/local_cuda_cluster.py @@ -117,6 +117,16 @@ class LocalCUDACluster(LocalCluster): RMM pool size to initialize each worker with. Can be an integer (bytes), string (like ``"5GB"`` or ``"5000M"``), or ``None`` to disable RMM pools. + .. note:: + This size is a per-worker configuration, and not cluster-wide. + rmm_maximum_pool_size : int, str or None, default None + When ``rmm_pool_size`` is set, this argument indicates + the maximum pool size. + Can be an integer (bytes), string (like ``"5GB"`` or ``"5000M"``) or ``None``. + By default, the total available memory on the GPU is used. + ``rmm_pool_size`` must be specified to use RMM pool and + to set the maximum pool size. + .. note:: This size is a per-worker configuration, and not cluster-wide. 
rmm_managed_memory : bool, default False @@ -196,6 +206,7 @@ def __init__( enable_rdmacm=None, ucx_net_devices=None, rmm_pool_size=None, + rmm_maximum_pool_size=None, rmm_managed_memory=False, rmm_async=False, rmm_log_directory=None, @@ -230,6 +241,7 @@ def __init__( ) self.rmm_pool_size = rmm_pool_size + self.rmm_maximum_pool_size = rmm_maximum_pool_size self.rmm_managed_memory = rmm_managed_memory self.rmm_async = rmm_async if rmm_pool_size is not None or rmm_managed_memory: @@ -248,6 +260,8 @@ def __init__( ) if self.rmm_pool_size is not None: self.rmm_pool_size = parse_bytes(self.rmm_pool_size) + if self.rmm_maximum_pool_size is not None: + self.rmm_maximum_pool_size = parse_bytes(self.rmm_maximum_pool_size) else: if enable_nvlink: warnings.warn( @@ -397,6 +411,7 @@ def new_worker_spec(self): ), RMMSetup( self.rmm_pool_size, + self.rmm_maximum_pool_size, self.rmm_managed_memory, self.rmm_async, self.rmm_log_directory, diff --git a/dask_cuda/tests/test_local_cuda_cluster.py b/dask_cuda/tests/test_local_cuda_cluster.py index edf73e4e4..17fe6d978 100644 --- a/dask_cuda/tests/test_local_cuda_cluster.py +++ b/dask_cuda/tests/test_local_cuda_cluster.py @@ -150,6 +150,13 @@ async def test_rmm_pool(): assert v is rmm.mr.PoolMemoryResource +@gen_test(timeout=20) +async def test_rmm_maximum_poolsize_without_poolsize_error(): + pytest.importorskip("rmm") + with pytest.raises(ValueError): + await LocalCUDACluster(rmm_maximum_pool_size="2GB", asynchronous=True) + + @gen_test(timeout=20) async def test_rmm_managed(): rmm = pytest.importorskip("rmm") diff --git a/dask_cuda/utils.py b/dask_cuda/utils.py index 0d52c07ec..2a74bc698 100644 --- a/dask_cuda/utils.py +++ b/dask_cuda/utils.py @@ -45,8 +45,22 @@ def setup(self, worker=None): class RMMSetup: - def __init__(self, nbytes, managed_memory, async_alloc, log_directory): - self.nbytes = nbytes + def __init__( + self, + initial_pool_size, + maximum_pool_size, + managed_memory, + async_alloc, + log_directory, + ): + if 
initial_pool_size is None and maximum_pool_size is not None: + raise ValueError( + "`rmm_maximum_pool_size` was specified without specifying " + "`rmm_pool_size`. `rmm_pool_size` must be specified to use RMM pool." + ) + + self.initial_pool_size = initial_pool_size + self.maximum_pool_size = maximum_pool_size self.managed_memory = managed_memory self.async_alloc = async_alloc self.logging = log_directory is not None @@ -63,15 +77,16 @@ def setup(self, worker=None): worker, self.logging, self.log_directory ) ) - elif self.nbytes is not None or self.managed_memory: + elif self.initial_pool_size is not None or self.managed_memory: import rmm - pool_allocator = False if self.nbytes is None else True + pool_allocator = False if self.initial_pool_size is None else True rmm.reinitialize( pool_allocator=pool_allocator, managed_memory=self.managed_memory, - initial_pool_size=self.nbytes, + initial_pool_size=self.initial_pool_size, + maximum_pool_size=self.maximum_pool_size, logging=self.logging, log_file_name=get_rmm_log_file_name( worker, self.logging, self.log_directory