diff --git a/ci/test_python.sh b/ci/test_python.sh index 319efef2..fadd473e 100755 --- a/ci/test_python.sh +++ b/ci/test_python.sh @@ -44,9 +44,8 @@ set_exit_code() { trap set_exit_code ERR set +e -rapids-logger "pytest dask-cuda (dask-expr)" +rapids-logger "pytest dask-cuda" pushd dask_cuda -DASK_DATAFRAME__QUERY_PLANNING=True \ DASK_CUDA_TEST_SINGLE_GPU=1 \ DASK_CUDA_WAIT_WORKERS_MIN_TIMEOUT=20 \ UCXPY_IFNAME=eth0 \ @@ -65,43 +64,19 @@ timeout 90m pytest \ tests -k "not ucxx" popd -rapids-logger "pytest explicit-comms (legacy dd)" -pushd dask_cuda -DASK_DATAFRAME__QUERY_PLANNING=False \ -DASK_CUDA_TEST_SINGLE_GPU=1 \ -DASK_CUDA_WAIT_WORKERS_MIN_TIMEOUT=20 \ -UCXPY_IFNAME=eth0 \ -UCX_WARN_UNUSED_ENV_VARS=n \ -UCX_MEMTYPE_CACHE=n \ -timeout 60m pytest \ - -vv \ - --durations=50 \ - --capture=no \ - --cache-clear \ - --junitxml="${RAPIDS_TESTS_DIR}/junit-dask-cuda-legacy.xml" \ - --cov-config=../pyproject.toml \ - --cov=dask_cuda \ - --cov-report=xml:"${RAPIDS_COVERAGE_DIR}/dask-cuda-coverage-legacy.xml" \ - --cov-report=term \ - tests/test_explicit_comms.py -k "not ucxx" -popd - -rapids-logger "Run local benchmark (dask-expr)" -DASK_DATAFRAME__QUERY_PLANNING=True \ +rapids-logger "Run local benchmark" python dask_cuda/benchmarks/local_cudf_shuffle.py \ --partition-size="1 KiB" \ -d 0 \ --runs 1 \ --backend dask -DASK_DATAFRAME__QUERY_PLANNING=True \ python dask_cuda/benchmarks/local_cudf_shuffle.py \ --partition-size="1 KiB" \ -d 0 \ --runs 1 \ --backend explicit-comms -DASK_DATAFRAME__QUERY_PLANNING=True \ python dask_cuda/benchmarks/local_cudf_shuffle.py \ --disable-rmm \ --partition-size="1 KiB" \ @@ -109,7 +84,6 @@ python dask_cuda/benchmarks/local_cudf_shuffle.py \ --runs 1 \ --backend explicit-comms -DASK_DATAFRAME__QUERY_PLANNING=True \ python dask_cuda/benchmarks/local_cudf_shuffle.py \ --disable-rmm-pool \ --partition-size="1 KiB" \ @@ -117,7 +91,6 @@ python dask_cuda/benchmarks/local_cudf_shuffle.py \ --runs 1 \ --backend explicit-comms -DASK_DATAFRAME__QUERY_PLANNING=True \ python dask_cuda/benchmarks/local_cudf_shuffle.py \ --rmm-pool-size 2GiB \ --partition-size="1 KiB" \ @@ -125,7 +98,6 @@ python dask_cuda/benchmarks/local_cudf_shuffle.py \ --runs 1 \ --backend explicit-comms -DASK_DATAFRAME__QUERY_PLANNING=True \ python dask_cuda/benchmarks/local_cudf_shuffle.py \ --rmm-pool-size 2GiB \ --rmm-maximum-pool-size 4GiB \ @@ -134,7 +106,6 @@ python dask_cuda/benchmarks/local_cudf_shuffle.py \ --runs 1 \ --backend explicit-comms -DASK_DATAFRAME__QUERY_PLANNING=True \ python dask_cuda/benchmarks/local_cudf_shuffle.py \ --rmm-pool-size 2GiB \ --rmm-maximum-pool-size 4GiB \ @@ -144,7 +115,6 @@ python dask_cuda/benchmarks/local_cudf_shuffle.py \ --runs 1 \ --backend explicit-comms -DASK_DATAFRAME__QUERY_PLANNING=True \ python dask_cuda/benchmarks/local_cudf_shuffle.py \ --rmm-pool-size 2GiB \ --rmm-maximum-pool-size 4GiB \ @@ -154,20 +124,5 @@ python dask_cuda/benchmarks/local_cudf_shuffle.py \ --runs 1 \ --backend explicit-comms -rapids-logger "Run local benchmark (legacy dd)" -DASK_DATAFRAME__QUERY_PLANNING=False \ -python dask_cuda/benchmarks/local_cudf_shuffle.py \ - --partition-size="1 KiB" \ - -d 0 \ - --runs 1 \ - --backend dask - -DASK_DATAFRAME__QUERY_PLANNING=False \ -python dask_cuda/benchmarks/local_cudf_shuffle.py \ - --partition-size="1 KiB" \ - -d 0 \ - --runs 1 \ - --backend explicit-comms - rapids-logger "Test script exiting with latest error code: $EXITCODE" exit ${EXITCODE} diff --git a/dask_cuda/__init__.py b/dask_cuda/__init__.py index 5711ac08..d9a775ff 100644 --- a/dask_cuda/__init__.py +++ b/dask_cuda/__init__.py @@ -15,29 +15,15 @@ from ._version import __git_commit__, __version__ from .cuda_worker import CUDAWorker from .explicit_comms.dataframe.shuffle import ( - get_rearrange_by_column_wrapper, get_default_shuffle_method, + patch_shuffle_expression, ) from .local_cuda_cluster import LocalCUDACluster from .proxify_device_objects import proxify_decorator, unproxify_decorator -if dask.config.get("dataframe.query-planning", None) is not False and dask.config.get( - "explicit-comms", False -): - raise NotImplementedError( - "The 'explicit-comms' config is not yet supported when " - "query-planning is enabled in dask. Please use the shuffle " - "API directly, or use the legacy dask-dataframe API " - "(set the 'dataframe.query-planning' config to `False`" - "before importing `dask.dataframe`).", - ) - - # Monkey patching Dask to make use of explicit-comms when `DASK_EXPLICIT_COMMS=True` -dask.dataframe.shuffle.rearrange_by_column = get_rearrange_by_column_wrapper( - dask.dataframe.shuffle.rearrange_by_column -) +patch_shuffle_expression() # We have to replace all modules that imports Dask's `get_default_shuffle_method()` # TODO: introduce a shuffle-algorithm dispatcher in Dask so we don't need this hack dask.dataframe.shuffle.get_default_shuffle_method = get_default_shuffle_method diff --git a/dask_cuda/benchmarks/local_cudf_shuffle.py b/dask_cuda/benchmarks/local_cudf_shuffle.py index 3a0955c4..25c47fd8 100644 --- a/dask_cuda/benchmarks/local_cudf_shuffle.py +++ b/dask_cuda/benchmarks/local_cudf_shuffle.py @@ -246,7 +246,6 @@ def parse_args(): return parse_benchmark_args( description="Distributed shuffle (dask/cudf) benchmark", args_list=special_args, - check_explicit_comms=False, ) diff --git a/dask_cuda/benchmarks/read_parquet.py b/dask_cuda/benchmarks/read_parquet.py index bce69673..4b34fd26 100644 --- a/dask_cuda/benchmarks/read_parquet.py +++ b/dask_cuda/benchmarks/read_parquet.py @@ -251,7 +251,6 @@ def parse_args(): args = parse_benchmark_args( description="Parquet read benchmark", args_list=special_args, - check_explicit_comms=False, ) args.no_show_p2p_bandwidth = True return args diff --git a/dask_cuda/benchmarks/utils.py b/dask_cuda/benchmarks/utils.py index 4f87a025..84557f05 100644 --- a/dask_cuda/benchmarks/utils.py +++ b/dask_cuda/benchmarks/utils.py @@ -11,7 +11,6 @@ import numpy as np import pandas as pd -from dask import config from dask.distributed import Client, SSHCluster from dask.utils import format_bytes, format_time, parse_bytes from distributed.comm.addressing import get_address_host @@ -52,7 +51,6 @@ def as_noop(dsk): def parse_benchmark_args( description="Generic dask-cuda Benchmark", args_list=[], - check_explicit_comms=True, ): parser = argparse.ArgumentParser(description=description) worker_args = parser.add_argument_group(description="Worker configuration") @@ -377,24 +375,6 @@ def parse_benchmark_args( if args.multi_node and len(args.hosts.split(",")) < 2: raise ValueError("--multi-node requires at least 2 hosts") - # Raise error early if "explicit-comms" is not allowed - if ( - check_explicit_comms - and args.backend == "explicit-comms" - and config.get( - "dataframe.query-planning", - None, - ) - is not False - ): - raise NotImplementedError( - "The 'explicit-comms' config is not yet supported when " - "query-planning is enabled in dask. Please use the legacy " - "dask-dataframe API by setting the following environment " - "variable before executing:", - " DASK_DATAFRAME__QUERY_PLANNING=False", - ) - return args diff --git a/dask_cuda/explicit_comms/dataframe/shuffle.py b/dask_cuda/explicit_comms/dataframe/shuffle.py index 70f12335..ba596dd0 100644 --- a/dask_cuda/explicit_comms/dataframe/shuffle.py +++ b/dask_cuda/explicit_comms/dataframe/shuffle.py @@ -1,8 +1,6 @@ from __future__ import annotations import asyncio -import functools -import inspect from collections import defaultdict from math import ceil from operator import getitem @@ -570,33 +568,6 @@ def _use_explicit_comms() -> bool: return False -def get_rearrange_by_column_wrapper(func): - """Returns a function wrapper that dispatch the shuffle to explicit-comms. - - Notice, this is monkey patched into Dask at dask_cuda import - """ - - func_sig = inspect.signature(func) - - @functools.wraps(func) - def wrapper(*args, **kwargs): - if _use_explicit_comms(): - # Convert `*args, **kwargs` to a dict of `keyword -> values` - kw = func_sig.bind(*args, **kwargs) - kw.apply_defaults() - kw = kw.arguments - # Notice, we only overwrite the default and the "tasks" shuffle - # algorithm. The "disk" and "p2p" algorithm, we don't touch. - if kw["shuffle_method"] in ("tasks", None): - col = kw["col"] - if isinstance(col, str): - col = [col] - return shuffle(kw["df"], col, kw["npartitions"], kw["ignore_index"]) - return func(*args, **kwargs) - - return wrapper - - def get_default_shuffle_method() -> str: """Return the default shuffle algorithm used by Dask @@ -607,3 +578,31 @@ def get_default_shuffle_method() -> str: if ret is None and _use_explicit_comms(): return "tasks" return dask.utils.get_default_shuffle_method() + + +def patch_shuffle_expression() -> None: + """Patch Dasks Shuffle expression. + + This changes ``Shuffle._lower`` to apply explicit-comms + shuffling when the 'explicit-comms' config is enabled. + """ + from dask_expr._collection import new_collection + from dask_expr._shuffle import Shuffle as DXShuffle + + _base_lower = DXShuffle._lower + + def _lower(self): + if self.method in ("tasks", None) and _use_explicit_comms(): + on = self.partitioning_index + on = [on] if isinstance(on, str) else on + return shuffle( + new_collection(self.frame), + on, + self.npartitions_out, + self.ignore_index, + ).expr + else: + # Use upstream lowering logic + return _base_lower(self) + + DXShuffle._lower = _lower diff --git a/dask_cuda/tests/test_explicit_comms.py b/dask_cuda/tests/test_explicit_comms.py index 2806dc1c..96ff91e9 100644 --- a/dask_cuda/tests/test_explicit_comms.py +++ b/dask_cuda/tests/test_explicit_comms.py @@ -25,16 +25,6 @@ mp = mp.get_context("spawn") # type: ignore ucp = pytest.importorskip("ucp") -QUERY_PLANNING_ON = dask.config.get("dataframe.query-planning", None) is not False - -# Skip these tests when dask-expr is active (for now) -query_planning_skip = pytest.mark.skipif( - QUERY_PLANNING_ON, - reason=( - "The 'explicit-comms' config is not supported " - "when query planning is enabled." - ), -) # Set default shuffle method to "tasks" if dask.config.get("dataframe.shuffle.method", None) is None: @@ -98,7 +88,6 @@ def _test_dataframe_merge_empty_partitions(nrows, npartitions): pd.testing.assert_frame_equal(got, expected) -@query_planning_skip def test_dataframe_merge_empty_partitions(): # Notice, we use more partitions than rows p = mp.Process(target=_test_dataframe_merge_empty_partitions, args=(2, 4)) @@ -267,7 +256,6 @@ def check_shuffle(): check_shuffle() -@query_planning_skip @pytest.mark.parametrize("in_cluster", [True, False]) def test_dask_use_explicit_comms(in_cluster): def _timeout(process, function, timeout): @@ -330,7 +318,6 @@ def _test_dataframe_shuffle_merge(backend, protocol, n_workers): assert_eq(got, expected) -@query_planning_skip @pytest.mark.parametrize("nworkers", [1, 2, 4]) @pytest.mark.parametrize("backend", ["pandas", "cudf"]) @pytest.mark.parametrize("protocol", ["tcp", "ucx", "ucxx"]) diff --git a/pyproject.toml b/pyproject.toml index 01c8d956..cfe5397c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -124,13 +124,6 @@ filterwarnings = [ "error::FutureWarning", # remove after https://github.com/rapidsai/dask-cuda/issues/1087 is closed "ignore:There is no current event loop:DeprecationWarning:tornado", - # This warning must be filtered until dask-expr support - # is enabled in both dask-cudf and dask-cuda. - # See: https://github.com/rapidsai/dask-cuda/issues/1311 - "ignore:Dask DataFrame implementation is deprecated:DeprecationWarning", - # Dask now loudly throws warnings: https://github.com/dask/dask/pull/11437 - # When the legacy implementation is removed we can remove this warning and stop running pytests with `DASK_DATAFRAME__QUERY_PLANNING=False` - "ignore:The legacy Dask DataFrame implementation is deprecated and will be removed in a future version.*:FutureWarning", ] [tool.rapids-build-backend]