Fix partitioning in explicit-comms shuffle (#1356)
Closes #1355

The current version of the explicit-comms shuffle does not produce partitioning that is consistent with `dask.dataframe`.
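For illustration only (not part of this commit), a minimal sketch of the mismatch, assuming pandas' `hash_pandas_object` behaves like Dask's `hash_object_dispatch` for pandas-backed data: hashing an integer key column as-is assigns rows to different partitions than hashing the same values after the `float64` cast that `dask.dataframe.shuffle.partitioning_index` applies, which is why numeric dtypes are now cast before hashing.

```python
# Hypothetical sketch (not the commit's code): uses pandas' hash_pandas_object
# as a stand-in for dask's hash_object_dispatch.
import numpy as np
import pandas as pd
from pandas.util import hash_pandas_object

npartitions = 4
df = pd.DataFrame({"key": np.arange(10, dtype="int64")})

# Partition IDs from hashing the raw int64 column (previous explicit-comms behavior).
ids_int = hash_pandas_object(df, index=False) % npartitions

# Partition IDs after casting numeric columns to float64, matching
# `dask.dataframe.shuffle.partitioning_index` (behavior after this fix).
ids_float = hash_pandas_object(df.astype({"key": np.float64}), index=False) % npartitions

# The two assignments generally disagree, so without the cast, rows would land
# in different partitions than a `dask.dataframe` shuffle of the same data.
print((ids_int != ids_float).any())
```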

Authors:
  - Richard (Rick) Zamora (https://github.com/rjzamora)

Approvers:
  - Peter Andreas Entschev (https://github.com/pentschev)

URL: #1356
rjzamora authored Jul 9, 2024
1 parent 3e0f7c3 commit fe23e45
Showing 2 changed files with 62 additions and 28 deletions.
44 changes: 24 additions & 20 deletions dask_cuda/explicit_comms/dataframe/shuffle.py
@@ -8,6 +8,9 @@
from operator import getitem
from typing import Any, Callable, Dict, List, Optional, Set, TypeVar

import numpy as np
import pandas as pd

import dask
import dask.config
import dask.dataframe
@@ -155,9 +158,16 @@ def compute_map_index(
if column_names[0] == "_partitions":
ind = df[column_names[0]]
else:
ind = hash_object_dispatch(
df[column_names] if column_names else df, index=False
)
# Need to cast numerical dtypes to be consistent
# with `dask.dataframe.shuffle.partitioning_index`
dtypes = {}
index = df[column_names] if column_names else df
for col, dtype in index.dtypes.items():
if pd.api.types.is_numeric_dtype(dtype):
dtypes[col] = np.float64
if dtypes:
index = index.astype(dtypes, errors="ignore")
ind = hash_object_dispatch(index, index=False)
return ind % npartitions


@@ -187,15 +197,8 @@ def partition_dataframe(
partitions
Dict of dataframe-partitions, mapping partition-ID to dataframe
"""
if column_names[0] != "_partitions" and hasattr(df, "partition_by_hash"):
return dict(
zip(
range(npartitions),
df.partition_by_hash(
column_names, npartitions, keep_index=not ignore_index
),
)
)
# TODO: Use `partition_by_hash` if/when dtype-casting is added
# (See: https://github.com/rapidsai/cudf/issues/16221)
map_index = compute_map_index(df, column_names, npartitions)
return group_split_dispatch(df, map_index, npartitions, ignore_index=ignore_index)

@@ -529,18 +532,19 @@ def shuffle(
# TODO: can we do this without using `submit()` to avoid the overhead
# of creating a Future for each dataframe partition?

futures = []
_futures = {}
for rank in ranks:
for part_id in rank_to_out_part_ids[rank]:
futures.append(
c.client.submit(
getitem,
shuffle_result[rank],
part_id,
workers=[c.worker_addresses[rank]],
)
_futures[part_id] = c.client.submit(
getitem,
shuffle_result[rank],
part_id,
workers=[c.worker_addresses[rank]],
)

# Make sure partitions are properly ordered
futures = [_futures.pop(i) for i in range(npartitions)]

# Create a distributed Dataframe from all the pieces
divs = [None] * (len(futures) + 1)
kwargs = {"meta": df_meta, "divisions": divs, "prefix": "explicit-comms-shuffle"}
46 changes: 38 additions & 8 deletions dask_cuda/tests/test_explicit_comms.py
@@ -109,7 +109,14 @@ def test_dataframe_merge_empty_partitions():

def check_partitions(df, npartitions):
"""Check that all values in `df` hashes to the same"""
hashes = partitioning_index(df, npartitions)
dtypes = {}
for col, dtype in df.dtypes.items():
if pd.api.types.is_numeric_dtype(dtype):
dtypes[col] = np.float64
if not dtypes:
dtypes = None

hashes = partitioning_index(df, npartitions, cast_dtype=dtypes)
if len(hashes) > 0:
return len(hashes.unique()) == 1
else:
@@ -128,11 +135,10 @@ def _test_dataframe_shuffle(backend, protocol, n_workers, _partitions):
worker_class=IncreasedCloseTimeoutNanny,
processes=True,
) as cluster:
with Client(cluster) as client:
all_workers = list(client.get_worker_logs().keys())
with Client(cluster):
comms.default_comms()
np.random.seed(42)
df = pd.DataFrame({"key": np.random.random(100)})
df = pd.DataFrame({"key": np.random.randint(0, high=100, size=100)})
if backend == "cudf":
df = cudf.DataFrame.from_pandas(df)

@@ -141,15 +147,13 @@ def _test_dataframe_shuffle(backend, protocol, n_workers, _partitions):

for input_nparts in range(1, 5):
for output_nparts in range(1, 5):
ddf = dd.from_pandas(df.copy(), npartitions=input_nparts).persist(
workers=all_workers
)
ddf1 = dd.from_pandas(df.copy(), npartitions=input_nparts)
# To reduce test runtime, we change the batchsizes here instead
# of using a test parameter.
for batchsize in (-1, 1, 2):
with dask.config.set(explicit_comms_batchsize=batchsize):
ddf = explicit_comms_shuffle(
ddf,
ddf1,
["_partitions"] if _partitions else ["key"],
npartitions=output_nparts,
batchsize=batchsize,
@@ -177,6 +181,32 @@ def _test_dataframe_shuffle(backend, protocol, n_workers, _partitions):
got = ddf.compute().sort_values("key")
assert_eq(got, expected)

# Check that partitioning is consistent with "tasks"
ddf_tasks = ddf1.shuffle(
["key"],
npartitions=output_nparts,
shuffle_method="tasks",
)
for i in range(output_nparts):
expected_partition = ddf_tasks.partitions[
i
].compute()["key"]
actual_partition = ddf.partitions[i].compute()[
"key"
]
if backend == "cudf":
expected_partition = (
expected_partition.values_host
)
actual_partition = actual_partition.values_host
else:
expected_partition = expected_partition.values
actual_partition = actual_partition.values
assert all(
np.sort(expected_partition)
== np.sort(actual_partition)
)


@pytest.mark.parametrize("nworkers", [1, 2, 3])
@pytest.mark.parametrize("backend", ["pandas", "cudf"])
