Commit
Update explicit-comms for dask-expr support (#1323)
Makes a few ~small~ changes to explicit-comms to support dask-expr.

EDIT: The changes are no longer "small".

Authors:
  - Richard (Rick) Zamora (https://github.com/rjzamora)
  - Peter Andreas Entschev (https://github.com/pentschev)

Approvers:
  - Peter Andreas Entschev (https://github.com/pentschev)
  - Jake Awe (https://github.com/AyodeAwe)

URL: #1323
rjzamora authored Apr 3, 2024
1 parent 21482c5 commit 58e4b95
Showing 7 changed files with 156 additions and 51 deletions.
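In short: explicit-comms still requires the legacy dask-dataframe backend, so query planning must be disabled before `dask.dataframe` is imported. A minimal sketch of the supported configuration (the config keys and the `DASK_DATAFRAME__QUERY_PLANNING` variable are the ones this commit uses; the surrounding script is illustrative):

```python
# Minimal sketch: enable explicit-comms on the legacy backend.
import dask

# Must happen before dask.dataframe is imported; equivalently, export
# DASK_DATAFRAME__QUERY_PLANNING=False in the environment.
dask.config.set({"dataframe.query-planning": False})

import dask.dataframe as dd  # noqa: E402

with dask.config.set({"explicit-comms": True}):
    pass  # dataframe shuffles issued here go through explicit-comms
```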
43 changes: 41 additions & 2 deletions ci/test_python.sh
@@ -42,8 +42,9 @@ set_exit_code() {
trap set_exit_code ERR
set +e

rapids-logger "pytest dask-cuda"
rapids-logger "pytest dask-cuda (dask-expr)"
pushd dask_cuda
DASK_DATAFRAME__QUERY_PLANNING=True \
DASK_CUDA_TEST_SINGLE_GPU=1 \
DASK_CUDA_WAIT_WORKERS_MIN_TIMEOUT=20 \
UCXPY_IFNAME=eth0 \
@@ -62,13 +63,51 @@ timeout 60m pytest \
  tests -k "not ucxx"
popd

rapids-logger "Run local benchmark"
rapids-logger "pytest explicit-comms (legacy dd)"
pushd dask_cuda
DASK_DATAFRAME__QUERY_PLANNING=False \
DASK_CUDA_TEST_SINGLE_GPU=1 \
DASK_CUDA_WAIT_WORKERS_MIN_TIMEOUT=20 \
UCXPY_IFNAME=eth0 \
UCX_WARN_UNUSED_ENV_VARS=n \
UCX_MEMTYPE_CACHE=n \
timeout 30m pytest \
  -vv \
  --durations=0 \
  --capture=no \
  --cache-clear \
  --junitxml="${RAPIDS_TESTS_DIR}/junit-dask-cuda-legacy.xml" \
  --cov-config=../pyproject.toml \
  --cov=dask_cuda \
  --cov-report=xml:"${RAPIDS_COVERAGE_DIR}/dask-cuda-coverage-legacy.xml" \
  --cov-report=term \
  tests/test_explicit_comms.py -k "not ucxx"
popd

rapids-logger "Run local benchmark (dask-expr)"
DASK_DATAFRAME__QUERY_PLANNING=True \
python dask_cuda/benchmarks/local_cudf_shuffle.py \
--partition-size="1 KiB" \
-d 0 \
--runs 1 \
--backend dask

DASK_DATAFRAME__QUERY_PLANNING=True \
python dask_cuda/benchmarks/local_cudf_shuffle.py \
--partition-size="1 KiB" \
-d 0 \
--runs 1 \
--backend explicit-comms

rapids-logger "Run local benchmark (legacy dd)"
DASK_DATAFRAME__QUERY_PLANNING=False \
python dask_cuda/benchmarks/local_cudf_shuffle.py \
--partition-size="1 KiB" \
-d 0 \
--runs 1 \
--backend dask

DASK_DATAFRAME__QUERY_PLANNING=False \
python dask_cuda/benchmarks/local_cudf_shuffle.py \
--partition-size="1 KiB" \
-d 0 \
12 changes: 12 additions & 0 deletions dask_cuda/__init__.py
@@ -20,6 +20,18 @@
from .proxify_device_objects import proxify_decorator, unproxify_decorator


if dask.config.get("dataframe.query-planning", None) is not False and dask.config.get(
    "explicit-comms", False
):
    raise NotImplementedError(
        "The 'explicit-comms' config is not yet supported when "
        "query-planning is enabled in dask. Please use the shuffle "
        "API directly, or use the legacy dask-dataframe API "
        "(set the 'dataframe.query-planning' config to `False` "
        "before importing `dask.dataframe`).",
    )


# Monkey patching Dask to make use of explicit-comms when `DASK_EXPLICIT_COMMS=True`
dask.dataframe.shuffle.rearrange_by_column = get_rearrange_by_column_wrapper(
    dask.dataframe.shuffle.rearrange_by_column
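For orientation, `get_rearrange_by_column_wrapper` is the real helper imported above, but its body is not part of this diff; the following is only an illustrative sketch of the wrap-and-dispatch pattern it implements, not the actual code:

```python
# Illustrative sketch only -- not the actual dask_cuda implementation.
import dask


def get_rearrange_by_column_wrapper(func):
    """Wrap `func` so explicit-comms can take over when enabled."""

    def wrapper(*args, **kwargs):
        if dask.config.get("explicit-comms", False):
            ...  # the real wrapper dispatches to explicit_comms.dataframe.shuffle
        return func(*args, **kwargs)

    return wrapper
```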
47 changes: 27 additions & 20 deletions dask_cuda/benchmarks/local_cudf_merge.py
@@ -7,8 +7,7 @@
import pandas as pd

import dask
from dask.base import tokenize
from dask.dataframe.core import new_dd_object
import dask.dataframe as dd
from dask.distributed import performance_report, wait
from dask.utils import format_bytes, parse_bytes

@@ -25,12 +24,20 @@
# <https://gist.github.com/rjzamora/0ffc35c19b5180ab04bbf7c793c45955>


def generate_chunk(i_chunk, local_size, num_chunks, chunk_type, frac_match, gpu):
# Set default shuffle method to "tasks"
if dask.config.get("dataframe.shuffle.method", None) is None:
    dask.config.set({"dataframe.shuffle.method": "tasks"})


def generate_chunk(input):
    i_chunk, local_size, num_chunks, chunk_type, frac_match, gpu = input

    # Setting a seed that triggers max amount of comm in the two-GPU case.
    if gpu:
        import cupy as xp

        import cudf as xdf
        import dask_cudf  # noqa: F401
    else:
        import numpy as xp
        import pandas as xdf
@@ -105,25 +112,25 @@ def get_random_ddf(chunk_size, num_chunks, frac_match, chunk_type, args):

    parts = [chunk_size for _ in range(num_chunks)]
    device_type = True if args.type == "gpu" else False
    meta = generate_chunk(0, 4, 1, chunk_type, None, device_type)
    meta = generate_chunk((0, 4, 1, chunk_type, None, device_type))
    divisions = [None] * (len(parts) + 1)

    name = "generate-data-" + tokenize(chunk_size, num_chunks, frac_match, chunk_type)

    graph = {
        (name, i): (
            generate_chunk,
            i,
            part,
            len(parts),
            chunk_type,
            frac_match,
            device_type,
        )
        for i, part in enumerate(parts)
    }

    ddf = new_dd_object(graph, name, meta, divisions)
    ddf = dd.from_map(
        generate_chunk,
        [
            (
                i,
                part,
                len(parts),
                chunk_type,
                frac_match,
                device_type,
            )
            for i, part in enumerate(parts)
        ],
        meta=meta,
        divisions=divisions,
    )

    if chunk_type == "build":
        if not args.no_shuffle:
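The switch from a hand-built graph plus `new_dd_object` to `dd.from_map` is the core compatibility fix in this file: `from_map` is a public constructor that works with both the legacy backend and dask-expr. A self-contained toy example of the same call shape (the `make_part` helper is illustrative, not from this commit):

```python
# Toy from_map example mirroring the call above; make_part is illustrative.
import pandas as pd

import dask.dataframe as dd


def make_part(arg):
    i, nrows = arg  # one tuple of task arguments per partition
    return pd.DataFrame({"x": range(i * nrows, (i + 1) * nrows)})


ddf = dd.from_map(make_part, [(0, 4), (1, 4)])  # one partition per element
assert ddf.npartitions == 2
```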
26 changes: 16 additions & 10 deletions dask_cuda/benchmarks/local_cudf_shuffle.py
@@ -8,8 +8,6 @@

import dask
import dask.dataframe
from dask.dataframe.core import new_dd_object
from dask.dataframe.shuffle import shuffle
from dask.distributed import Client, performance_report, wait
from dask.utils import format_bytes, parse_bytes

@@ -33,7 +31,7 @@


def shuffle_dask(df, args):
    result = shuffle(df, index="data", shuffle="tasks", ignore_index=args.ignore_index)
    result = df.shuffle("data", shuffle_method="tasks", ignore_index=args.ignore_index)
    if args.backend == "dask-noop":
        result = as_noop(result)
    t1 = perf_counter()
@@ -94,18 +92,24 @@ def create_data(
        )

    # Create partitions according to the specified partition distribution
    dsk = {}
    futures = []
    for i, part_size in enumerate(dist):
        for _ in range(part_size):
            # We use `client.submit` to control placement of the partition.
            dsk[(name, len(dsk))] = client.submit(
                create_df, chunksize, args.type, workers=[workers[i]], pure=False
            futures.append(
                client.submit(
                    create_df, chunksize, args.type, workers=[workers[i]], pure=False
                )
            )
    wait(dsk.values())
    wait(futures)

    df_meta = create_df(0, args.type)
    divs = [None] * (len(dsk) + 1)
    ret = new_dd_object(dsk, name, df_meta, divs).persist()
    divs = [None] * (len(futures) + 1)
    ret = dask.dataframe.from_delayed(
        futures,
        meta=df_meta,
        divisions=divs,
    ).persist()
    wait(ret)

    data_processed = args.in_parts * args.partition_size
@@ -254,7 +258,9 @@ def parse_args():
    ]

    return parse_benchmark_args(
        description="Distributed shuffle (dask/cudf) benchmark", args_list=special_args
        description="Distributed shuffle (dask/cudf) benchmark",
        args_list=special_args,
        check_explicit_comms=False,
    )


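The same migration appears here: partitions are still created as distributed futures via `client.submit` (so placement can be pinned to specific workers), but the collection is now assembled with `dask.dataframe.from_delayed`, which accepts futures directly, instead of `new_dd_object`. A hedged, minimal sketch (assumes an in-process cluster; `make_df` is illustrative):

```python
# Hedged sketch of the submit-then-from_delayed pattern; make_df is illustrative.
import pandas as pd

import dask.dataframe as dd
from distributed import Client, wait


def make_df(i):
    return pd.DataFrame({"x": [i] * 3})


client = Client(processes=False)
futures = [client.submit(make_df, i, pure=False) for i in range(4)]
wait(futures)

# from_delayed accepts Futures as well as Delayed objects.
ddf = dd.from_delayed(futures, meta=make_df(0), divisions=[None] * 5)
assert ddf.npartitions == 4
client.close()
```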
25 changes: 24 additions & 1 deletion dask_cuda/benchmarks/utils.py
@@ -11,6 +11,7 @@
import numpy as np
import pandas as pd

from dask import config
from dask.distributed import Client, SSHCluster
from dask.utils import format_bytes, format_time, parse_bytes
from distributed.comm.addressing import get_address_host
@@ -47,7 +48,11 @@ def as_noop(dsk):
        raise RuntimeError("Requested noop computation but dask-noop not installed.")


def parse_benchmark_args(description="Generic dask-cuda Benchmark", args_list=[]):
def parse_benchmark_args(
    description="Generic dask-cuda Benchmark",
    args_list=[],
    check_explicit_comms=True,
):
    parser = argparse.ArgumentParser(description=description)
    worker_args = parser.add_argument_group(description="Worker configuration")
    worker_args.add_argument(
@@ -317,6 +322,24 @@ def parse_benchmark_args(description="Generic dask-cuda Benchmark", args_list=[]
    if args.multi_node and len(args.hosts.split(",")) < 2:
        raise ValueError("--multi-node requires at least 2 hosts")

    # Raise error early if "explicit-comms" is not allowed
    if (
        check_explicit_comms
        and args.backend == "explicit-comms"
        and config.get(
            "dataframe.query-planning",
            None,
        )
        is not False
    ):
        raise NotImplementedError(
            "The 'explicit-comms' config is not yet supported when "
            "query-planning is enabled in dask. Please use the legacy "
            "dask-dataframe API by setting the following environment "
            "variable before executing:"
            " DASK_DATAFRAME__QUERY_PLANNING=False"
        )

    return args


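For downstream benchmark scripts, the new keyword is opt-out: `parse_benchmark_args` now raises early for `--backend explicit-comms` under query planning unless the caller disables the check, as `local_cudf_shuffle.py` does above because it validates its backend itself. A minimal usage sketch (the description string is illustrative):

```python
# Minimal usage sketch of the new opt-out keyword; description is illustrative.
from dask_cuda.benchmarks.utils import parse_benchmark_args

args = parse_benchmark_args(
    description="My benchmark",
    args_list=[],
    check_explicit_comms=False,  # this script validates --backend itself
)
```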
32 changes: 19 additions & 13 deletions dask_cuda/explicit_comms/dataframe/shuffle.py
@@ -11,10 +11,12 @@
import dask
import dask.config
import dask.dataframe
import dask.dataframe as dd
import dask.utils
import distributed.worker
from dask.base import tokenize
from dask.dataframe.core import DataFrame, Series, _concat as dd_concat, new_dd_object
from dask.dataframe import DataFrame, Series
from dask.dataframe.core import _concat as dd_concat
from dask.dataframe.shuffle import group_split_dispatch, hash_object_dispatch
from distributed import wait
from distributed.protocol import nested_deserialize, to_serialize
@@ -468,18 +470,19 @@ def shuffle(
    npartitions = df.npartitions

    # Step (a):
    df = df.persist()  # Make sure optimizations are apply on the existing graph
    df = df.persist()  # Make sure optimizations are applied on the existing graph
    wait([df])  # Make sure all keys have been materialized on workers
    persisted_keys = [f.key for f in c.client.futures_of(df)]
    name = (
        "explicit-comms-shuffle-"
        f"{tokenize(df, column_names, npartitions, ignore_index)}"
        f"{tokenize(df, column_names, npartitions, ignore_index, batchsize)}"
    )
    df_meta: DataFrame = df._meta

    # Stage all keys of `df` on the workers and cancel them, which makes it possible
    # for the shuffle to free memory as the partitions of `df` are consumed.
    # See CommsContext.stage_keys() for a description of staging.
    rank_to_inkeys = c.stage_keys(name=name, keys=df.__dask_keys__())
    rank_to_inkeys = c.stage_keys(name=name, keys=persisted_keys)
    c.client.cancel(df)

    # Get batchsize
@@ -526,23 +529,26 @@
    # TODO: can we do this without using `submit()` to avoid the overhead
    # of creating a Future for each dataframe partition?

    dsk = {}
    futures = []
    for rank in ranks:
        for part_id in rank_to_out_part_ids[rank]:
            dsk[(name, part_id)] = c.client.submit(
                getitem,
                shuffle_result[rank],
                part_id,
                workers=[c.worker_addresses[rank]],
            futures.append(
                c.client.submit(
                    getitem,
                    shuffle_result[rank],
                    part_id,
                    workers=[c.worker_addresses[rank]],
                )
            )

    # Create a distributed Dataframe from all the pieces
    divs = [None] * (len(dsk) + 1)
    ret = new_dd_object(dsk, name, df_meta, divs).persist()
    divs = [None] * (len(futures) + 1)
    kwargs = {"meta": df_meta, "divisions": divs, "prefix": "explicit-comms-shuffle"}
    ret = dd.from_delayed(futures, **kwargs).persist()
    wait([ret])

    # Release all temporary dataframes
    for fut in [*shuffle_result.values(), *dsk.values()]:
    for fut in [*shuffle_result.values(), *futures]:
        fut.release()
    return ret

Expand Down
22 changes: 17 additions & 5 deletions dask_cuda/tests/test_explicit_comms.py
Expand Up @@ -22,14 +22,23 @@
from dask_cuda.explicit_comms.dataframe.shuffle import shuffle as explicit_comms_shuffle
from dask_cuda.utils_test import IncreasedCloseTimeoutNanny

mp = mp.get_context("spawn")  # type: ignore
ucp = pytest.importorskip("ucp")

QUERY_PLANNING_ON = dask.config.get("dataframe.query-planning", None) is not False

# Skip these tests when dask-expr is active (for now)
pytestmark = pytest.mark.skipif(
    dask.config.get("dataframe.query-planning", None) is not False,
    reason="https://github.com/rapidsai/dask-cuda/issues/1311",
query_planning_skip = pytest.mark.skipif(
    QUERY_PLANNING_ON,
    reason=(
        "The 'explicit-comms' config is not supported "
        "when query planning is enabled."
    ),
)

mp = mp.get_context("spawn")  # type: ignore
ucp = pytest.importorskip("ucp")
# Set default shuffle method to "tasks"
if dask.config.get("dataframe.shuffle.method", None) is None:
    dask.config.set({"dataframe.shuffle.method": "tasks"})


# Notice, all of the following tests are executed in a new process such
@@ -89,6 +98,7 @@ def _test_dataframe_merge_empty_partitions(nrows, npartitions):
            pd.testing.assert_frame_equal(got, expected)


@query_planning_skip
def test_dataframe_merge_empty_partitions():
    # Notice, we use more partitions than rows
    p = mp.Process(target=_test_dataframe_merge_empty_partitions, args=(2, 4))
@@ -227,6 +237,7 @@ def check_shuffle():
            check_shuffle()


@query_planning_skip
@pytest.mark.parametrize("in_cluster", [True, False])
def test_dask_use_explicit_comms(in_cluster):
    def _timeout(process, function, timeout):
@@ -289,6 +300,7 @@ def _test_dataframe_shuffle_merge(backend, protocol, n_workers):
            assert_eq(got, expected)


@query_planning_skip
@pytest.mark.parametrize("nworkers", [1, 2, 4])
@pytest.mark.parametrize("backend", ["pandas", "cudf"])
@pytest.mark.parametrize("protocol", ["tcp", "ucx", "ucxx"])
