Skip to content

Commit 51079a7

Browse files
committed
adjust shuffle to produce partitions that are consistent with dask.dataframe.shuffle
1 parent 3e0f7c3 commit 51079a7

File tree

2 files changed

+60
-24
lines changed

2 files changed

+60
-24
lines changed

dask_cuda/explicit_comms/dataframe/shuffle.py

+23-20
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@
88
from operator import getitem
99
from typing import Any, Callable, Dict, List, Optional, Set, TypeVar
1010

11+
import numpy as np
12+
import pandas as pd
13+
1114
import dask
1215
import dask.config
1316
import dask.dataframe
@@ -155,9 +158,16 @@ def compute_map_index(
155158
if column_names[0] == "_partitions":
156159
ind = df[column_names[0]]
157160
else:
158-
ind = hash_object_dispatch(
159-
df[column_names] if column_names else df, index=False
160-
)
161+
# Need to cast numerical dtypes to be consistent
162+
# with `dask.dataframe.shuffle.partitioning_index`
163+
dtypes = {}
164+
index = df[column_names] if column_names else df
165+
for col, dtype in index.dtypes.items():
166+
if pd.api.types.is_numeric_dtype(dtype):
167+
dtypes[col] = np.float64
168+
if dtypes:
169+
index = index.astype(dtypes, errors="ignore")
170+
ind = hash_object_dispatch(index, index=False)
161171
return ind % npartitions
162172

163173

@@ -187,15 +197,7 @@ def partition_dataframe(
187197
partitions
188198
Dict of dataframe-partitions, mapping partition-ID to dataframe
189199
"""
190-
if column_names[0] != "_partitions" and hasattr(df, "partition_by_hash"):
191-
return dict(
192-
zip(
193-
range(npartitions),
194-
df.partition_by_hash(
195-
column_names, npartitions, keep_index=not ignore_index
196-
),
197-
)
198-
)
200+
# TODO: Use `partition_by_hash` after dtype-casting is added
199201
map_index = compute_map_index(df, column_names, npartitions)
200202
return group_split_dispatch(df, map_index, npartitions, ignore_index=ignore_index)
201203

@@ -529,18 +531,19 @@ def shuffle(
529531
# TODO: can we do this without using `submit()` to avoid the overhead
530532
# of creating a Future for each dataframe partition?
531533

532-
futures = []
534+
_futures = {}
533535
for rank in ranks:
534536
for part_id in rank_to_out_part_ids[rank]:
535-
futures.append(
536-
c.client.submit(
537-
getitem,
538-
shuffle_result[rank],
539-
part_id,
540-
workers=[c.worker_addresses[rank]],
541-
)
537+
_futures[part_id] = c.client.submit(
538+
getitem,
539+
shuffle_result[rank],
540+
part_id,
541+
workers=[c.worker_addresses[rank]],
542542
)
543543

544+
# Make sure partitions are properly ordered
545+
futures = [_futures.pop(i) for i in range(npartitions)]
546+
544547
# Create a distributed Dataframe from all the pieces
545548
divs = [None] * (len(futures) + 1)
546549
kwargs = {"meta": df_meta, "divisions": divs, "prefix": "explicit-comms-shuffle"}

dask_cuda/tests/test_explicit_comms.py

+37-4
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,14 @@ def test_dataframe_merge_empty_partitions():
109109

110110
def check_partitions(df, npartitions):
111111
"""Check that all values in `df` hashes to the same"""
112-
hashes = partitioning_index(df, npartitions)
112+
dtypes = {}
113+
for col, dtype in df.dtypes.items():
114+
if pd.api.types.is_numeric_dtype(dtype):
115+
dtypes[col] = np.float64
116+
if not dtypes:
117+
dtypes = None
118+
119+
hashes = partitioning_index(df, npartitions, cast_dtype=dtypes)
113120
if len(hashes) > 0:
114121
return len(hashes.unique()) == 1
115122
else:
@@ -132,7 +139,7 @@ def _test_dataframe_shuffle(backend, protocol, n_workers, _partitions):
132139
all_workers = list(client.get_worker_logs().keys())
133140
comms.default_comms()
134141
np.random.seed(42)
135-
df = pd.DataFrame({"key": np.random.random(100)})
142+
df = pd.DataFrame({"key": np.random.randint(0, high=100, size=100)})
136143
if backend == "cudf":
137144
df = cudf.DataFrame.from_pandas(df)
138145

@@ -141,15 +148,15 @@ def _test_dataframe_shuffle(backend, protocol, n_workers, _partitions):
141148

142149
for input_nparts in range(1, 5):
143150
for output_nparts in range(1, 5):
144-
ddf = dd.from_pandas(df.copy(), npartitions=input_nparts).persist(
151+
ddf1 = dd.from_pandas(df.copy(), npartitions=input_nparts).persist(
145152
workers=all_workers
146153
)
147154
# To reduce test runtime, we change the batchsizes here instead
148155
# of using a test parameter.
149156
for batchsize in (-1, 1, 2):
150157
with dask.config.set(explicit_comms_batchsize=batchsize):
151158
ddf = explicit_comms_shuffle(
152-
ddf,
159+
ddf1,
153160
["_partitions"] if _partitions else ["key"],
154161
npartitions=output_nparts,
155162
batchsize=batchsize,
@@ -177,6 +184,32 @@ def _test_dataframe_shuffle(backend, protocol, n_workers, _partitions):
177184
got = ddf.compute().sort_values("key")
178185
assert_eq(got, expected)
179186

187+
# Check that partitioning is consistent with "tasks"
188+
ddf_tasks = ddf1.shuffle(
189+
["key"],
190+
npartitions=output_nparts,
191+
shuffle_method="tasks",
192+
)
193+
for i in range(output_nparts):
194+
expected_partition = ddf_tasks.partitions[
195+
i
196+
].compute()["key"]
197+
actual_partition = ddf.partitions[i].compute()[
198+
"key"
199+
]
200+
if backend == "cudf":
201+
expected_partition = (
202+
expected_partition.values_host
203+
)
204+
actual_partition = actual_partition.values_host
205+
else:
206+
expected_partition = expected_partition.values
207+
actual_partition = actual_partition.values
208+
assert all(
209+
np.sort(expected_partition)
210+
== np.sort(actual_partition)
211+
)
212+
180213

181214
@pytest.mark.parametrize("nworkers", [1, 2, 3])
182215
@pytest.mark.parametrize("backend", ["pandas", "cudf"])

0 commit comments

Comments
 (0)