Skip to content

Commit

Permalink
remove legacy support and update explicit-comms config support
Browse files Browse the repository at this point in the history
  • Loading branch information
rjzamora committed Dec 16, 2024
1 parent d70f28d commit 1af30be
Show file tree
Hide file tree
Showing 8 changed files with 32 additions and 134 deletions.
49 changes: 2 additions & 47 deletions ci/test_python.sh
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,8 @@ set_exit_code() {
trap set_exit_code ERR
set +e

rapids-logger "pytest dask-cuda (dask-expr)"
rapids-logger "pytest dask-cuda"
pushd dask_cuda
DASK_DATAFRAME__QUERY_PLANNING=True \
DASK_CUDA_TEST_SINGLE_GPU=1 \
DASK_CUDA_WAIT_WORKERS_MIN_TIMEOUT=20 \
UCXPY_IFNAME=eth0 \
Expand All @@ -65,67 +64,40 @@ timeout 90m pytest \
tests -k "not ucxx"
popd

rapids-logger "pytest explicit-comms (legacy dd)"
pushd dask_cuda
DASK_DATAFRAME__QUERY_PLANNING=False \
DASK_CUDA_TEST_SINGLE_GPU=1 \
DASK_CUDA_WAIT_WORKERS_MIN_TIMEOUT=20 \
UCXPY_IFNAME=eth0 \
UCX_WARN_UNUSED_ENV_VARS=n \
UCX_MEMTYPE_CACHE=n \
timeout 60m pytest \
-vv \
--durations=50 \
--capture=no \
--cache-clear \
--junitxml="${RAPIDS_TESTS_DIR}/junit-dask-cuda-legacy.xml" \
--cov-config=../pyproject.toml \
--cov=dask_cuda \
--cov-report=xml:"${RAPIDS_COVERAGE_DIR}/dask-cuda-coverage-legacy.xml" \
--cov-report=term \
tests/test_explicit_comms.py -k "not ucxx"
popd

rapids-logger "Run local benchmark (dask-expr)"
DASK_DATAFRAME__QUERY_PLANNING=True \
rapids-logger "Run local benchmark"
python dask_cuda/benchmarks/local_cudf_shuffle.py \
--partition-size="1 KiB" \
-d 0 \
--runs 1 \
--backend dask

DASK_DATAFRAME__QUERY_PLANNING=True \
python dask_cuda/benchmarks/local_cudf_shuffle.py \
--partition-size="1 KiB" \
-d 0 \
--runs 1 \
--backend explicit-comms

DASK_DATAFRAME__QUERY_PLANNING=True \
python dask_cuda/benchmarks/local_cudf_shuffle.py \
--disable-rmm \
--partition-size="1 KiB" \
-d 0 \
--runs 1 \
--backend explicit-comms

DASK_DATAFRAME__QUERY_PLANNING=True \
python dask_cuda/benchmarks/local_cudf_shuffle.py \
--disable-rmm-pool \
--partition-size="1 KiB" \
-d 0 \
--runs 1 \
--backend explicit-comms

DASK_DATAFRAME__QUERY_PLANNING=True \
python dask_cuda/benchmarks/local_cudf_shuffle.py \
--rmm-pool-size 2GiB \
--partition-size="1 KiB" \
-d 0 \
--runs 1 \
--backend explicit-comms

DASK_DATAFRAME__QUERY_PLANNING=True \
python dask_cuda/benchmarks/local_cudf_shuffle.py \
--rmm-pool-size 2GiB \
--rmm-maximum-pool-size 4GiB \
Expand All @@ -134,7 +106,6 @@ python dask_cuda/benchmarks/local_cudf_shuffle.py \
--runs 1 \
--backend explicit-comms

DASK_DATAFRAME__QUERY_PLANNING=True \
python dask_cuda/benchmarks/local_cudf_shuffle.py \
--rmm-pool-size 2GiB \
--rmm-maximum-pool-size 4GiB \
Expand All @@ -144,7 +115,6 @@ python dask_cuda/benchmarks/local_cudf_shuffle.py \
--runs 1 \
--backend explicit-comms

DASK_DATAFRAME__QUERY_PLANNING=True \
python dask_cuda/benchmarks/local_cudf_shuffle.py \
--rmm-pool-size 2GiB \
--rmm-maximum-pool-size 4GiB \
Expand All @@ -154,20 +124,5 @@ python dask_cuda/benchmarks/local_cudf_shuffle.py \
--runs 1 \
--backend explicit-comms

rapids-logger "Run local benchmark (legacy dd)"
DASK_DATAFRAME__QUERY_PLANNING=False \
python dask_cuda/benchmarks/local_cudf_shuffle.py \
--partition-size="1 KiB" \
-d 0 \
--runs 1 \
--backend dask

DASK_DATAFRAME__QUERY_PLANNING=False \
python dask_cuda/benchmarks/local_cudf_shuffle.py \
--partition-size="1 KiB" \
-d 0 \
--runs 1 \
--backend explicit-comms

rapids-logger "Test script exiting with latest error code: $EXITCODE"
exit ${EXITCODE}
18 changes: 2 additions & 16 deletions dask_cuda/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,29 +15,15 @@
from ._version import __git_commit__, __version__
from .cuda_worker import CUDAWorker
from .explicit_comms.dataframe.shuffle import (
get_rearrange_by_column_wrapper,
get_default_shuffle_method,
patch_shuffle_expression,
)
from .local_cuda_cluster import LocalCUDACluster
from .proxify_device_objects import proxify_decorator, unproxify_decorator


if dask.config.get("dataframe.query-planning", None) is not False and dask.config.get(
"explicit-comms", False
):
raise NotImplementedError(
"The 'explicit-comms' config is not yet supported when "
"query-planning is enabled in dask. Please use the shuffle "
"API directly, or use the legacy dask-dataframe API "
"(set the 'dataframe.query-planning' config to `False`"
"before importing `dask.dataframe`).",
)


# Monkey patching Dask to make use of explicit-comms when `DASK_EXPLICIT_COMMS=True`
dask.dataframe.shuffle.rearrange_by_column = get_rearrange_by_column_wrapper(
dask.dataframe.shuffle.rearrange_by_column
)
patch_shuffle_expression()
# We have to replace all modules that imports Dask's `get_default_shuffle_method()`
# TODO: introduce a shuffle-algorithm dispatcher in Dask so we don't need this hack
dask.dataframe.shuffle.get_default_shuffle_method = get_default_shuffle_method
Expand Down
1 change: 0 additions & 1 deletion dask_cuda/benchmarks/local_cudf_shuffle.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,6 @@ def parse_args():
return parse_benchmark_args(
description="Distributed shuffle (dask/cudf) benchmark",
args_list=special_args,
check_explicit_comms=False,
)


Expand Down
1 change: 0 additions & 1 deletion dask_cuda/benchmarks/read_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,6 @@ def parse_args():
args = parse_benchmark_args(
description="Parquet read benchmark",
args_list=special_args,
check_explicit_comms=False,
)
args.no_show_p2p_bandwidth = True
return args
Expand Down
20 changes: 0 additions & 20 deletions dask_cuda/benchmarks/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import numpy as np
import pandas as pd

from dask import config
from dask.distributed import Client, SSHCluster
from dask.utils import format_bytes, format_time, parse_bytes
from distributed.comm.addressing import get_address_host
Expand Down Expand Up @@ -52,7 +51,6 @@ def as_noop(dsk):
def parse_benchmark_args(
description="Generic dask-cuda Benchmark",
args_list=[],
check_explicit_comms=True,
):
parser = argparse.ArgumentParser(description=description)
worker_args = parser.add_argument_group(description="Worker configuration")
Expand Down Expand Up @@ -377,24 +375,6 @@ def parse_benchmark_args(
if args.multi_node and len(args.hosts.split(",")) < 2:
raise ValueError("--multi-node requires at least 2 hosts")

# Raise error early if "explicit-comms" is not allowed
if (
check_explicit_comms
and args.backend == "explicit-comms"
and config.get(
"dataframe.query-planning",
None,
)
is not False
):
raise NotImplementedError(
"The 'explicit-comms' config is not yet supported when "
"query-planning is enabled in dask. Please use the legacy "
"dask-dataframe API by setting the following environment "
"variable before executing:",
" DASK_DATAFRAME__QUERY_PLANNING=False",
)

return args


Expand Down
57 changes: 28 additions & 29 deletions dask_cuda/explicit_comms/dataframe/shuffle.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
from __future__ import annotations

import asyncio
import functools
import inspect
from collections import defaultdict
from math import ceil
from operator import getitem
Expand Down Expand Up @@ -570,33 +568,6 @@ def _use_explicit_comms() -> bool:
return False


def get_rearrange_by_column_wrapper(func):
"""Returns a function wrapper that dispatch the shuffle to explicit-comms.
Notice, this is monkey patched into Dask at dask_cuda import
"""

func_sig = inspect.signature(func)

@functools.wraps(func)
def wrapper(*args, **kwargs):
if _use_explicit_comms():
# Convert `*args, **kwargs` to a dict of `keyword -> values`
kw = func_sig.bind(*args, **kwargs)
kw.apply_defaults()
kw = kw.arguments
# Notice, we only overwrite the default and the "tasks" shuffle
# algorithm. The "disk" and "p2p" algorithm, we don't touch.
if kw["shuffle_method"] in ("tasks", None):
col = kw["col"]
if isinstance(col, str):
col = [col]
return shuffle(kw["df"], col, kw["npartitions"], kw["ignore_index"])
return func(*args, **kwargs)

return wrapper


def get_default_shuffle_method() -> str:
"""Return the default shuffle algorithm used by Dask
Expand All @@ -607,3 +578,31 @@ def get_default_shuffle_method() -> str:
if ret is None and _use_explicit_comms():
return "tasks"
return dask.utils.get_default_shuffle_method()


def patch_shuffle_expression() -> None:
"""Patch Dasks Shuffle expression.
This changes ``Shuffle._lower`` to apply explicit-comms
shuffling when the 'explicit-comms' config is enabled.
"""
from dask_expr._collection import new_collection
from dask_expr._shuffle import Shuffle as DXShuffle

_base_lower = DXShuffle._lower

def _lower(self):
if self.method in ("tasks", None) and _use_explicit_comms():
on = self.partitioning_index
on = [on] if isinstance(on, str) else on
return shuffle(
new_collection(self.frame),
on,
self.npartitions_out,
self.ignore_index,
).expr
else:
# Use upstream lowering logic
return _base_lower(self)

DXShuffle._lower = _lower
13 changes: 0 additions & 13 deletions dask_cuda/tests/test_explicit_comms.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,6 @@
mp = mp.get_context("spawn") # type: ignore
ucp = pytest.importorskip("ucp")

QUERY_PLANNING_ON = dask.config.get("dataframe.query-planning", None) is not False

# Skip these tests when dask-expr is active (for now)
query_planning_skip = pytest.mark.skipif(
QUERY_PLANNING_ON,
reason=(
"The 'explicit-comms' config is not supported "
"when query planning is enabled."
),
)

# Set default shuffle method to "tasks"
if dask.config.get("dataframe.shuffle.method", None) is None:
Expand Down Expand Up @@ -98,7 +88,6 @@ def _test_dataframe_merge_empty_partitions(nrows, npartitions):
pd.testing.assert_frame_equal(got, expected)


@query_planning_skip
def test_dataframe_merge_empty_partitions():
# Notice, we use more partitions than rows
p = mp.Process(target=_test_dataframe_merge_empty_partitions, args=(2, 4))
Expand Down Expand Up @@ -267,7 +256,6 @@ def check_shuffle():
check_shuffle()


@query_planning_skip
@pytest.mark.parametrize("in_cluster", [True, False])
def test_dask_use_explicit_comms(in_cluster):
def _timeout(process, function, timeout):
Expand Down Expand Up @@ -330,7 +318,6 @@ def _test_dataframe_shuffle_merge(backend, protocol, n_workers):
assert_eq(got, expected)


@query_planning_skip
@pytest.mark.parametrize("nworkers", [1, 2, 4])
@pytest.mark.parametrize("backend", ["pandas", "cudf"])
@pytest.mark.parametrize("protocol", ["tcp", "ucx", "ucxx"])
Expand Down
7 changes: 0 additions & 7 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -124,13 +124,6 @@ filterwarnings = [
"error::FutureWarning",
# remove after https://github.com/rapidsai/dask-cuda/issues/1087 is closed
"ignore:There is no current event loop:DeprecationWarning:tornado",
# This warning must be filtered until dask-expr support
# is enabled in both dask-cudf and dask-cuda.
# See: https://github.com/rapidsai/dask-cuda/issues/1311
"ignore:Dask DataFrame implementation is deprecated:DeprecationWarning",
# Dask now loudly throws warnings: https://github.com/dask/dask/pull/11437
# When the legacy implementation is removed we can remove this warning and stop running pytests with `DASK_DATAFRAME__QUERY_PLANNING=False`
"ignore:The legacy Dask DataFrame implementation is deprecated and will be removed in a future version.*:FutureWarning",
]

[tool.rapids-build-backend]
Expand Down

0 comments on commit 1af30be

Please sign in to comment.