From 58e4b95c4af05772886957eb1d686edaf431dba5 Mon Sep 17 00:00:00 2001
From: "Richard (Rick) Zamora"
Date: Wed, 3 Apr 2024 18:11:02 -0500
Subject: [PATCH] Update explicit-comms for dask-expr support (#1323)

Makes a few ~small~ changes to explicit-comms to support dask-expr. EDIT: The changes are no longer "small".

Authors:
  - Richard (Rick) Zamora (https://github.com/rjzamora)
  - Peter Andreas Entschev (https://github.com/pentschev)

Approvers:
  - Peter Andreas Entschev (https://github.com/pentschev)
  - Jake Awe (https://github.com/AyodeAwe)

URL: https://github.com/rapidsai/dask-cuda/pull/1323
---
 ci/test_python.sh                             | 43 ++++++++++++++++-
 dask_cuda/__init__.py                         | 12 +++++
 dask_cuda/benchmarks/local_cudf_merge.py      | 47 +++++++++++--------
 dask_cuda/benchmarks/local_cudf_shuffle.py    | 26 ++++++----
 dask_cuda/benchmarks/utils.py                 | 25 +++++++++-
 dask_cuda/explicit_comms/dataframe/shuffle.py | 32 ++++++++-----
 dask_cuda/tests/test_explicit_comms.py        | 22 +++++++--
 7 files changed, 156 insertions(+), 51 deletions(-)

diff --git a/ci/test_python.sh b/ci/test_python.sh
index aed602505..b52cbb6d4 100755
--- a/ci/test_python.sh
+++ b/ci/test_python.sh
@@ -42,8 +42,9 @@ set_exit_code() {
 trap set_exit_code ERR
 set +e
 
-rapids-logger "pytest dask-cuda"
+rapids-logger "pytest dask-cuda (dask-expr)"
 pushd dask_cuda
+DASK_DATAFRAME__QUERY_PLANNING=True \
 DASK_CUDA_TEST_SINGLE_GPU=1 \
 DASK_CUDA_WAIT_WORKERS_MIN_TIMEOUT=20 \
 UCXPY_IFNAME=eth0 \
@@ -62,13 +63,51 @@ timeout 60m pytest \
   tests -k "not ucxx"
 popd
 
-rapids-logger "Run local benchmark"
+rapids-logger "pytest explicit-comms (legacy dd)"
+pushd dask_cuda
+DASK_DATAFRAME__QUERY_PLANNING=False \
+DASK_CUDA_TEST_SINGLE_GPU=1 \
+DASK_CUDA_WAIT_WORKERS_MIN_TIMEOUT=20 \
+UCXPY_IFNAME=eth0 \
+UCX_WARN_UNUSED_ENV_VARS=n \
+UCX_MEMTYPE_CACHE=n \
+timeout 30m pytest \
+  -vv \
+  --durations=0 \
+  --capture=no \
+  --cache-clear \
+  --junitxml="${RAPIDS_TESTS_DIR}/junit-dask-cuda-legacy.xml" \
+  --cov-config=../pyproject.toml \
+  --cov=dask_cuda \
+  --cov-report=xml:"${RAPIDS_COVERAGE_DIR}/dask-cuda-coverage-legacy.xml" \
+  --cov-report=term \
+  tests/test_explicit_comms.py -k "not ucxx"
+popd
+
+rapids-logger "Run local benchmark (dask-expr)"
+DASK_DATAFRAME__QUERY_PLANNING=True \
+python dask_cuda/benchmarks/local_cudf_shuffle.py \
+  --partition-size="1 KiB" \
+  -d 0 \
+  --runs 1 \
+  --backend dask
+
+DASK_DATAFRAME__QUERY_PLANNING=True \
+python dask_cuda/benchmarks/local_cudf_shuffle.py \
+  --partition-size="1 KiB" \
+  -d 0 \
+  --runs 1 \
+  --backend explicit-comms
+
+rapids-logger "Run local benchmark (legacy dd)"
+DASK_DATAFRAME__QUERY_PLANNING=False \
 python dask_cuda/benchmarks/local_cudf_shuffle.py \
   --partition-size="1 KiB" \
   -d 0 \
   --runs 1 \
   --backend dask
 
+DASK_DATAFRAME__QUERY_PLANNING=False \
 python dask_cuda/benchmarks/local_cudf_shuffle.py \
   --partition-size="1 KiB" \
   -d 0 \
diff --git a/dask_cuda/__init__.py b/dask_cuda/__init__.py
index 30f987ac4..516599da3 100644
--- a/dask_cuda/__init__.py
+++ b/dask_cuda/__init__.py
@@ -20,6 +20,18 @@
 from .local_cuda_cluster import LocalCUDACluster
 from .proxify_device_objects import proxify_decorator, unproxify_decorator
 
+if dask.config.get("dataframe.query-planning", None) is not False and dask.config.get(
+    "explicit-comms", False
+):
+    raise NotImplementedError(
+        "The 'explicit-comms' config is not yet supported when "
+        "query-planning is enabled in dask. Please use the shuffle "
+        "API directly, or use the legacy dask-dataframe API "
+        "(set the 'dataframe.query-planning' config to `False` "
+        "before importing `dask.dataframe`).",
+    )
+
+
 # Monkey patching Dask to make use of explicit-comms when `DASK_EXPLICIT_COMMS=True`
 dask.dataframe.shuffle.rearrange_by_column = get_rearrange_by_column_wrapper(
     dask.dataframe.shuffle.rearrange_by_column
diff --git a/dask_cuda/benchmarks/local_cudf_merge.py b/dask_cuda/benchmarks/local_cudf_merge.py
index ba3a9d56d..6a68ad788 100644
--- a/dask_cuda/benchmarks/local_cudf_merge.py
+++ b/dask_cuda/benchmarks/local_cudf_merge.py
@@ -7,8 +7,7 @@
 import pandas as pd
 
 import dask
-from dask.base import tokenize
-from dask.dataframe.core import new_dd_object
+import dask.dataframe as dd
 from dask.distributed import performance_report, wait
 from dask.utils import format_bytes, parse_bytes
 
@@ -25,12 +24,20 @@
 #
 
 
-def generate_chunk(i_chunk, local_size, num_chunks, chunk_type, frac_match, gpu):
+# Set default shuffle method to "tasks"
+if dask.config.get("dataframe.shuffle.method", None) is None:
+    dask.config.set({"dataframe.shuffle.method": "tasks"})
+
+
+def generate_chunk(input):
+    i_chunk, local_size, num_chunks, chunk_type, frac_match, gpu = input
+
     # Setting a seed that triggers max amount of comm in the two-GPU case.
     if gpu:
         import cupy as xp
 
         import cudf as xdf
+        import dask_cudf  # noqa: F401
     else:
         import numpy as xp
         import pandas as xdf
@@ -105,25 +112,25 @@ def get_random_ddf(chunk_size, num_chunks, frac_match, chunk_type, args):
 
     parts = [chunk_size for _ in range(num_chunks)]
     device_type = True if args.type == "gpu" else False
-    meta = generate_chunk(0, 4, 1, chunk_type, None, device_type)
+    meta = generate_chunk((0, 4, 1, chunk_type, None, device_type))
     divisions = [None] * (len(parts) + 1)
 
-    name = "generate-data-" + tokenize(chunk_size, num_chunks, frac_match, chunk_type)
-
-    graph = {
-        (name, i): (
-            generate_chunk,
-            i,
-            part,
-            len(parts),
-            chunk_type,
-            frac_match,
-            device_type,
-        )
-        for i, part in enumerate(parts)
-    }
-
-    ddf = new_dd_object(graph, name, meta, divisions)
+    ddf = dd.from_map(
+        generate_chunk,
+        [
+            (
+                i,
+                part,
+                len(parts),
+                chunk_type,
+                frac_match,
+                device_type,
+            )
+            for i, part in enumerate(parts)
+        ],
+        meta=meta,
+        divisions=divisions,
+    )
 
     if chunk_type == "build":
         if not args.no_shuffle:
diff --git a/dask_cuda/benchmarks/local_cudf_shuffle.py b/dask_cuda/benchmarks/local_cudf_shuffle.py
index a3492b664..a1129dd37 100644
--- a/dask_cuda/benchmarks/local_cudf_shuffle.py
+++ b/dask_cuda/benchmarks/local_cudf_shuffle.py
@@ -8,8 +8,6 @@
 
 import dask
 import dask.dataframe
-from dask.dataframe.core import new_dd_object
-from dask.dataframe.shuffle import shuffle
 from dask.distributed import Client, performance_report, wait
 from dask.utils import format_bytes, parse_bytes
 
@@ -33,7 +31,7 @@
 
 
 def shuffle_dask(df, args):
-    result = shuffle(df, index="data", shuffle="tasks", ignore_index=args.ignore_index)
+    result = df.shuffle("data", shuffle_method="tasks", ignore_index=args.ignore_index)
     if args.backend == "dask-noop":
         result = as_noop(result)
     t1 = perf_counter()
@@ -94,18 +92,24 @@ def create_data(
     )
 
     # Create partition based to the specified partition distribution
-    dsk = {}
+    futures = []
     for i, part_size in enumerate(dist):
         for _ in range(part_size):
             # We use `client.submit` to control placement of the partition.
-            dsk[(name, len(dsk))] = client.submit(
-                create_df, chunksize, args.type, workers=[workers[i]], pure=False
+            futures.append(
+                client.submit(
+                    create_df, chunksize, args.type, workers=[workers[i]], pure=False
+                )
             )
-    wait(dsk.values())
+    wait(futures)
 
     df_meta = create_df(0, args.type)
-    divs = [None] * (len(dsk) + 1)
-    ret = new_dd_object(dsk, name, df_meta, divs).persist()
+    divs = [None] * (len(futures) + 1)
+    ret = dask.dataframe.from_delayed(
+        futures,
+        meta=df_meta,
+        divisions=divs,
+    ).persist()
     wait(ret)
 
     data_processed = args.in_parts * args.partition_size
@@ -254,7 +258,9 @@ def parse_args():
     ]
 
     return parse_benchmark_args(
-        description="Distributed shuffle (dask/cudf) benchmark", args_list=special_args
+        description="Distributed shuffle (dask/cudf) benchmark",
+        args_list=special_args,
+        check_explicit_comms=False,
     )
 
 
diff --git a/dask_cuda/benchmarks/utils.py b/dask_cuda/benchmarks/utils.py
index 51fae7201..5ac79a88d 100644
--- a/dask_cuda/benchmarks/utils.py
+++ b/dask_cuda/benchmarks/utils.py
@@ -11,6 +11,7 @@
 import numpy as np
 import pandas as pd
 
+from dask import config
 from dask.distributed import Client, SSHCluster
 from dask.utils import format_bytes, format_time, parse_bytes
 from distributed.comm.addressing import get_address_host
@@ -47,7 +48,11 @@ def as_noop(dsk):
         raise RuntimeError("Requested noop computation but dask-noop not installed.")
 
 
-def parse_benchmark_args(description="Generic dask-cuda Benchmark", args_list=[]):
+def parse_benchmark_args(
+    description="Generic dask-cuda Benchmark",
+    args_list=[],
+    check_explicit_comms=True,
+):
     parser = argparse.ArgumentParser(description=description)
     worker_args = parser.add_argument_group(description="Worker configuration")
     worker_args.add_argument(
@@ -317,6 +322,24 @@ def parse_benchmark_args(description="Generic dask-cuda Benchmark", args_list=[]
     if args.multi_node and len(args.hosts.split(",")) < 2:
         raise ValueError("--multi-node requires at least 2 hosts")
 
+    # Raise error early if "explicit-comms" is not allowed
+    if (
+        check_explicit_comms
+        and args.backend == "explicit-comms"
+        and config.get(
+            "dataframe.query-planning",
+            None,
+        )
+        is not False
+    ):
+        raise NotImplementedError(
+            "The 'explicit-comms' config is not yet supported when "
+            "query-planning is enabled in dask. Please use the legacy "
+            "dask-dataframe API by setting the following environment "
+            "variable before executing:",
+            " DASK_DATAFRAME__QUERY_PLANNING=False",
+        )
+
     return args
 
 
diff --git a/dask_cuda/explicit_comms/dataframe/shuffle.py b/dask_cuda/explicit_comms/dataframe/shuffle.py
index ca69156dd..3f7b79514 100644
--- a/dask_cuda/explicit_comms/dataframe/shuffle.py
+++ b/dask_cuda/explicit_comms/dataframe/shuffle.py
@@ -11,10 +11,12 @@
 import dask
 import dask.config
 import dask.dataframe
+import dask.dataframe as dd
 import dask.utils
 import distributed.worker
 from dask.base import tokenize
-from dask.dataframe.core import DataFrame, Series, _concat as dd_concat, new_dd_object
+from dask.dataframe import DataFrame, Series
+from dask.dataframe.core import _concat as dd_concat
 from dask.dataframe.shuffle import group_split_dispatch, hash_object_dispatch
 from distributed import wait
 from distributed.protocol import nested_deserialize, to_serialize
@@ -468,18 +470,19 @@ def shuffle(
     npartitions = df.npartitions
 
     # Step (a):
-    df = df.persist()  # Make sure optimizations are apply on the existing graph
+    df = df.persist()  # Make sure optimizations are applied on the existing graph
     wait([df])  # Make sure all keys has been materialized on workers
+    persisted_keys = [f.key for f in c.client.futures_of(df)]
     name = (
         "explicit-comms-shuffle-"
-        f"{tokenize(df, column_names, npartitions, ignore_index)}"
+        f"{tokenize(df, column_names, npartitions, ignore_index, batchsize)}"
     )
     df_meta: DataFrame = df._meta
 
     # Stage all keys of `df` on the workers and cancel them, which makes it possible
     # for the shuffle to free memory as the partitions of `df` are consumed.
     # See CommsContext.stage_keys() for a description of staging.
-    rank_to_inkeys = c.stage_keys(name=name, keys=df.__dask_keys__())
+    rank_to_inkeys = c.stage_keys(name=name, keys=persisted_keys)
     c.client.cancel(df)
 
     # Get batchsize
@@ -526,23 +529,26 @@ def shuffle(
 
     # TODO: can we do this without using `submit()` to avoid the overhead
     #       of creating a Future for each dataframe partition?
-    dsk = {}
+    futures = []
     for rank in ranks:
         for part_id in rank_to_out_part_ids[rank]:
-            dsk[(name, part_id)] = c.client.submit(
-                getitem,
-                shuffle_result[rank],
-                part_id,
-                workers=[c.worker_addresses[rank]],
+            futures.append(
+                c.client.submit(
+                    getitem,
+                    shuffle_result[rank],
+                    part_id,
+                    workers=[c.worker_addresses[rank]],
+                )
             )
 
     # Create a distributed Dataframe from all the pieces
-    divs = [None] * (len(dsk) + 1)
-    ret = new_dd_object(dsk, name, df_meta, divs).persist()
+    divs = [None] * (len(futures) + 1)
+    kwargs = {"meta": df_meta, "divisions": divs, "prefix": "explicit-comms-shuffle"}
+    ret = dd.from_delayed(futures, **kwargs).persist()
     wait([ret])
 
     # Release all temporary dataframes
-    for fut in [*shuffle_result.values(), *dsk.values()]:
+    for fut in [*shuffle_result.values(), *futures]:
         fut.release()
 
     return ret
diff --git a/dask_cuda/tests/test_explicit_comms.py b/dask_cuda/tests/test_explicit_comms.py
index 1f70fb2ca..f495648e0 100644
--- a/dask_cuda/tests/test_explicit_comms.py
+++ b/dask_cuda/tests/test_explicit_comms.py
@@ -22,14 +22,23 @@
 from dask_cuda.explicit_comms.dataframe.shuffle import shuffle as explicit_comms_shuffle
 from dask_cuda.utils_test import IncreasedCloseTimeoutNanny
 
+mp = mp.get_context("spawn")  # type: ignore
+ucp = pytest.importorskip("ucp")
+
+QUERY_PLANNING_ON = dask.config.get("dataframe.query-planning", None) is not False
+
 # Skip these tests when dask-expr is active (for now)
-pytestmark = pytest.mark.skipif(
-    dask.config.get("dataframe.query-planning", None) is not False,
-    reason="https://github.com/rapidsai/dask-cuda/issues/1311",
+query_planning_skip = pytest.mark.skipif(
+    QUERY_PLANNING_ON,
+    reason=(
+        "The 'explicit-comms' config is not supported "
+        "when query planning is enabled."
+    ),
 )
 
-mp = mp.get_context("spawn")  # type: ignore
-ucp = pytest.importorskip("ucp")
+# Set default shuffle method to "tasks"
+if dask.config.get("dataframe.shuffle.method", None) is None:
+    dask.config.set({"dataframe.shuffle.method": "tasks"})
 
 
 # Notice, all of the following tests is executed in a new process such
@@ -89,6 +98,7 @@ def _test_dataframe_merge_empty_partitions(nrows, npartitions):
         pd.testing.assert_frame_equal(got, expected)
 
 
+@query_planning_skip
 def test_dataframe_merge_empty_partitions():
     # Notice, we use more partitions than rows
     p = mp.Process(target=_test_dataframe_merge_empty_partitions, args=(2, 4))
@@ -227,6 +237,7 @@ def check_shuffle():
     check_shuffle()
 
 
+@query_planning_skip
 @pytest.mark.parametrize("in_cluster", [True, False])
 def test_dask_use_explicit_comms(in_cluster):
     def _timeout(process, function, timeout):
@@ -289,6 +300,7 @@ def _test_dataframe_shuffle_merge(backend, protocol, n_workers):
         assert_eq(got, expected)
 
 
+@query_planning_skip
 @pytest.mark.parametrize("nworkers", [1, 2, 4])
 @pytest.mark.parametrize("backend", ["pandas", "cudf"])
 @pytest.mark.parametrize("protocol", ["tcp", "ucx", "ucxx"])