Skip to content

Commit

Permalink
Allow disabling RMM in benchmarks
Browse files Browse the repository at this point in the history
Allows disabling RMM in benchmarks via a new option `--disable-rmm`.
This change makes benchmarks a little more similar to RMM setup in
`LocalCUDACluster`/`dask cuda worker`, where not specifying
`rmm-pool-size` or specifying `None` as its value entirely disables
setting up RMM as the default allocator. Since for benchmarks it's
desired that the default is having an RMM pool we cannot change the
default `--rmm-pool-size` to `None` as that would make benchmarks run
much slower by default, therefore `--disable-rmm` is the closest we can
make this to the rest of Dask-CUDA.

Additionally add `--rmm-maximum-pool-size` for benchmarks.
  • Loading branch information
pentschev committed Jun 25, 2024
1 parent 7363b0c commit 73388d7
Show file tree
Hide file tree
Showing 3 changed files with 162 additions and 40 deletions.
53 changes: 53 additions & 0 deletions ci/test_python.sh
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,59 @@ python dask_cuda/benchmarks/local_cudf_shuffle.py \
--runs 1 \
--backend explicit-comms

DASK_DATAFRAME__QUERY_PLANNING=True \
python dask_cuda/benchmarks/local_cudf_shuffle.py \
--disable-rmm \
--partition-size="1 KiB" \
-d 0 \
--runs 1 \
--backend explicit-comms

DASK_DATAFRAME__QUERY_PLANNING=True \
python dask_cuda/benchmarks/local_cudf_shuffle.py \
--disable-rmm-pool \
--partition-size="1 KiB" \
-d 0 \
--runs 1 \
--backend explicit-comms

DASK_DATAFRAME__QUERY_PLANNING=True \
python dask_cuda/benchmarks/local_cudf_shuffle.py \
--rmm-pool-size 2GiB \
--partition-size="1 KiB" \
-d 0 \
--runs 1 \
--backend explicit-comms

DASK_DATAFRAME__QUERY_PLANNING=True \
python dask_cuda/benchmarks/local_cudf_shuffle.py \
--rmm-pool-size 2GiB \
--rmm-maximum-pool-size 4GiB \
--partition-size="1 KiB" \
-d 0 \
--runs 1 \
--backend explicit-comms

DASK_DATAFRAME__QUERY_PLANNING=True \
python dask_cuda/benchmarks/local_cudf_shuffle.py \
--rmm-pool-size 2GiB \
--rmm-maximum-pool-size 4GiB \
--rmm-async \
--partition-size="1 KiB" \
-d 0 \
--runs 1 \
--backend explicit-comms

DASK_DATAFRAME__QUERY_PLANNING=True \
python dask_cuda/benchmarks/local_cudf_shuffle.py \
--rmm-pool-size 2GiB \
--rmm-maximum-pool-size 4GiB \
--rmm-managed \
--partition-size="1 KiB" \
-d 0 \
--runs 1 \
--backend explicit-comms

rapids-logger "Run local benchmark (legacy dd)"
DASK_DATAFRAME__QUERY_PLANNING=False \
python dask_cuda/benchmarks/local_cudf_shuffle.py \
Expand Down
22 changes: 12 additions & 10 deletions dask_cuda/benchmarks/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,16 +117,18 @@ def run(client: Client, args: Namespace, config: Config):
wait_for_cluster(client, shutdown_on_failure=True)
assert len(client.scheduler_info()["workers"]) > 0
setup_memory_pools(
client,
args.type == "gpu",
args.rmm_pool_size,
args.disable_rmm_pool,
args.enable_rmm_async,
args.enable_rmm_managed,
args.rmm_release_threshold,
args.rmm_log_directory,
args.enable_rmm_statistics,
args.enable_rmm_track_allocations,
client=client,
is_gpu=args.type == "gpu",
disable_rmm=args.disable_rmm,
disable_rmm_pool=args.disable_rmm_pool,
pool_size=args.rmm_pool_size,
maximum_pool_size=args.rmm_maximum_pool_size,
rmm_async=args.enable_rmm_async,
rmm_managed=args.enable_rmm_managed,
release_threshold=args.rmm_release_threshold,
log_directory=args.rmm_log_directory,
statistics=args.enable_rmm_statistics,
rmm_track_allocations=args.enable_rmm_track_allocations,
)
address_to_index, results, message_data = gather_bench_results(client, args, config)
p2p_bw = peer_to_peer_bandwidths(message_data, address_to_index)
Expand Down
127 changes: 97 additions & 30 deletions dask_cuda/benchmarks/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from distributed.comm.addressing import get_address_host

from dask_cuda.local_cuda_cluster import LocalCUDACluster
from dask_cuda.utils import parse_device_memory_limit


def as_noop(dsk):
Expand Down Expand Up @@ -93,15 +94,37 @@ def parse_benchmark_args(
"'forkserver' can be used to avoid issues with fork not being allowed "
"after the networking stack has been initialised.",
)
cluster_args.add_argument(
"--disable-rmm",
action="store_true",
help="Disable RMM.",
)
cluster_args.add_argument(
"--disable-rmm-pool",
action="store_true",
help="Uses RMM for allocations but without a memory pool.",
)
cluster_args.add_argument(
"--rmm-pool-size",
default=None,
type=parse_bytes,
help="The size of the RMM memory pool. Can be an integer (bytes) or a string "
"(like '4GB' or '5000M'). By default, 1/2 of the total GPU memory is used.",
"(like '4GB' or '5000M'). By default, 1/2 of the total GPU memory is used."
""
".. note::"
" This size is a per-worker configuration, and not cluster-wide.",
)
cluster_args.add_argument(
"--disable-rmm-pool", action="store_true", help="Disable the RMM memory pool"
"--rmm-maximum-pool-size",
default=None,
help="When ``--rmm-pool-size`` is specified, this argument indicates the "
"maximum pool size. Can be an integer (bytes), or a string (like '4GB' or "
"'5000M'). By default, the total available memory on the GPU is used. "
"``rmm_pool_size`` must be specified to use RMM pool and to set the maximum "
"pool size."
""
".. note::"
" This size is a per-worker configuration, and not cluster-wide.",
)
cluster_args.add_argument(
"--enable-rmm-managed",
Expand Down Expand Up @@ -407,56 +430,96 @@ def get_worker_device():
return -1


def setup_rmm_resources(statistics=False, rmm_track_allocations=False):
import cupy

import rmm
from rmm.allocators.cupy import rmm_cupy_allocator

cupy.cuda.set_allocator(rmm_cupy_allocator)
if statistics:
rmm.mr.set_current_device_resource(
rmm.mr.StatisticsResourceAdaptor(rmm.mr.get_current_device_resource())
)
if rmm_track_allocations:
rmm.mr.set_current_device_resource(
rmm.mr.TrackingResourceAdaptor(rmm.mr.get_current_device_resource())
)


def setup_memory_pool(
dask_worker=None,
disable_rmm=None,
disable_rmm_pool=None,
pool_size=None,
disable_pool=False,
maximum_pool_size=None,
rmm_async=False,
rmm_managed=False,
release_threshold=None,
log_directory=None,
statistics=False,
rmm_track_allocations=False,
):
import cupy

import rmm
from rmm.allocators.cupy import rmm_cupy_allocator

from dask_cuda.utils import get_rmm_log_file_name

logging = log_directory is not None

if rmm_async:
rmm.mr.set_current_device_resource(
rmm.mr.CudaAsyncMemoryResource(
initial_pool_size=pool_size, release_threshold=release_threshold
)
)
else:
rmm.reinitialize(
pool_allocator=not disable_pool,
managed_memory=rmm_managed,
initial_pool_size=pool_size,
logging=logging,
log_file_name=get_rmm_log_file_name(dask_worker, logging, log_directory),
)
cupy.cuda.set_allocator(rmm_cupy_allocator)
if statistics:
rmm.mr.set_current_device_resource(
rmm.mr.StatisticsResourceAdaptor(rmm.mr.get_current_device_resource())
if pool_size is not None:
pool_size = parse_device_memory_limit(pool_size, alignment_size=256)

if maximum_pool_size is not None:
maximum_pool_size = parse_device_memory_limit(
maximum_pool_size, alignment_size=256
)
if rmm_track_allocations:
rmm.mr.set_current_device_resource(
rmm.mr.TrackingResourceAdaptor(rmm.mr.get_current_device_resource())

if release_threshold is not None:
release_threshold = parse_device_memory_limit(
release_threshold, alignment_size=256
)

if not disable_rmm:
if rmm_async:
mr = rmm.mr.CudaAsyncMemoryResource(
initial_pool_size=pool_size,
release_threshold=release_threshold,
)

if maximum_pool_size is not None:
mr = rmm.mr.LimitingResourceAdaptor(
mr, allocation_limit=maximum_pool_size
)

rmm.mr.set_current_device_resource(mr)

setup_rmm_resources(
statistics=statistics, rmm_track_allocations=rmm_track_allocations
)
else:
rmm.reinitialize(
pool_allocator=not disable_rmm_pool,
managed_memory=rmm_managed,
initial_pool_size=pool_size,
maximum_pool_size=maximum_pool_size,
logging=logging,
log_file_name=get_rmm_log_file_name(
dask_worker, logging, log_directory
),
)

setup_rmm_resources(
statistics=statistics, rmm_track_allocations=rmm_track_allocations
)


def setup_memory_pools(
client,
is_gpu,
disable_rmm,
disable_rmm_pool,
pool_size,
disable_pool,
maximum_pool_size,
rmm_async,
rmm_managed,
release_threshold,
Expand All @@ -468,8 +531,10 @@ def setup_memory_pools(
return
client.run(
setup_memory_pool,
disable_rmm=disable_rmm,
disable_rmm_pool=disable_rmm_pool,
pool_size=pool_size,
disable_pool=disable_pool,
maximum_pool_size=maximum_pool_size,
rmm_async=rmm_async,
rmm_managed=rmm_managed,
release_threshold=release_threshold,
Expand All @@ -482,7 +547,9 @@ def setup_memory_pools(
client.run_on_scheduler(
setup_memory_pool,
pool_size=1e9,
disable_pool=disable_pool,
disable_rmm=disable_rmm,
disable_rmm_pool=disable_rmm_pool,
maximum_pool_size=maximum_pool_size,
rmm_async=rmm_async,
rmm_managed=rmm_managed,
release_threshold=release_threshold,
Expand Down

0 comments on commit 73388d7

Please sign in to comment.