diff --git a/dask_cuda/explicit_comms/dataframe/shuffle.py b/dask_cuda/explicit_comms/dataframe/shuffle.py index 3f7b7951..70f12335 100644 --- a/dask_cuda/explicit_comms/dataframe/shuffle.py +++ b/dask_cuda/explicit_comms/dataframe/shuffle.py @@ -8,6 +8,9 @@ from operator import getitem from typing import Any, Callable, Dict, List, Optional, Set, TypeVar +import numpy as np +import pandas as pd + import dask import dask.config import dask.dataframe @@ -155,9 +158,16 @@ def compute_map_index( if column_names[0] == "_partitions": ind = df[column_names[0]] else: - ind = hash_object_dispatch( - df[column_names] if column_names else df, index=False - ) + # Need to cast numerical dtypes to be consistent + # with `dask.dataframe.shuffle.partitioning_index` + dtypes = {} + index = df[column_names] if column_names else df + for col, dtype in index.dtypes.items(): + if pd.api.types.is_numeric_dtype(dtype): + dtypes[col] = np.float64 + if dtypes: + index = index.astype(dtypes, errors="ignore") + ind = hash_object_dispatch(index, index=False) return ind % npartitions @@ -187,15 +197,8 @@ def partition_dataframe( partitions Dict of dataframe-partitions, mapping partition-ID to dataframe """ - if column_names[0] != "_partitions" and hasattr(df, "partition_by_hash"): - return dict( - zip( - range(npartitions), - df.partition_by_hash( - column_names, npartitions, keep_index=not ignore_index - ), - ) - ) + # TODO: Use `partition_by_hash` if/when dtype-casting is added + # (See: https://github.com/rapidsai/cudf/issues/16221) map_index = compute_map_index(df, column_names, npartitions) return group_split_dispatch(df, map_index, npartitions, ignore_index=ignore_index) @@ -529,18 +532,19 @@ def shuffle( # TODO: can we do this without using `submit()` to avoid the overhead # of creating a Future for each dataframe partition? - futures = [] + _futures = {} for rank in ranks: for part_id in rank_to_out_part_ids[rank]: - futures.append( - c.client.submit( - getitem, - shuffle_result[rank], - part_id, - workers=[c.worker_addresses[rank]], - ) + _futures[part_id] = c.client.submit( + getitem, + shuffle_result[rank], + part_id, + workers=[c.worker_addresses[rank]], ) + # Make sure partitions are properly ordered + futures = [_futures.pop(i) for i in range(npartitions)] + # Create a distributed Dataframe from all the pieces divs = [None] * (len(futures) + 1) kwargs = {"meta": df_meta, "divisions": divs, "prefix": "explicit-comms-shuffle"} diff --git a/dask_cuda/tests/test_explicit_comms.py b/dask_cuda/tests/test_explicit_comms.py index f495648e..2806dc1c 100644 --- a/dask_cuda/tests/test_explicit_comms.py +++ b/dask_cuda/tests/test_explicit_comms.py @@ -109,7 +109,14 @@ def test_dataframe_merge_empty_partitions(): def check_partitions(df, npartitions): """Check that all values in `df` hashes to the same""" - hashes = partitioning_index(df, npartitions) + dtypes = {} + for col, dtype in df.dtypes.items(): + if pd.api.types.is_numeric_dtype(dtype): + dtypes[col] = np.float64 + if not dtypes: + dtypes = None + + hashes = partitioning_index(df, npartitions, cast_dtype=dtypes) if len(hashes) > 0: return len(hashes.unique()) == 1 else: @@ -128,11 +135,10 @@ def _test_dataframe_shuffle(backend, protocol, n_workers, _partitions): worker_class=IncreasedCloseTimeoutNanny, processes=True, ) as cluster: - with Client(cluster) as client: - all_workers = list(client.get_worker_logs().keys()) + with Client(cluster): comms.default_comms() np.random.seed(42) - df = pd.DataFrame({"key": np.random.random(100)}) + df = pd.DataFrame({"key": np.random.randint(0, high=100, size=100)}) if backend == "cudf": df = cudf.DataFrame.from_pandas(df) @@ -141,15 +147,13 @@ def _test_dataframe_shuffle(backend, protocol, n_workers, _partitions): for input_nparts in range(1, 5): for output_nparts in range(1, 5): - ddf = dd.from_pandas(df.copy(), npartitions=input_nparts).persist( - workers=all_workers - ) + ddf1 = dd.from_pandas(df.copy(), npartitions=input_nparts) # To reduce test runtime, we change the batchsizes here instead # of using a test parameter. for batchsize in (-1, 1, 2): with dask.config.set(explicit_comms_batchsize=batchsize): ddf = explicit_comms_shuffle( - ddf, + ddf1, ["_partitions"] if _partitions else ["key"], npartitions=output_nparts, batchsize=batchsize, @@ -177,6 +181,32 @@ def _test_dataframe_shuffle(backend, protocol, n_workers, _partitions): got = ddf.compute().sort_values("key") assert_eq(got, expected) + # Check that partitioning is consistent with "tasks" + ddf_tasks = ddf1.shuffle( + ["key"], + npartitions=output_nparts, + shuffle_method="tasks", + ) + for i in range(output_nparts): + expected_partition = ddf_tasks.partitions[ + i + ].compute()["key"] + actual_partition = ddf.partitions[i].compute()[ + "key" + ] + if backend == "cudf": + expected_partition = ( + expected_partition.values_host + ) + actual_partition = actual_partition.values_host + else: + expected_partition = expected_partition.values + actual_partition = actual_partition.values + assert all( + np.sort(expected_partition) + == np.sort(actual_partition) + ) + @pytest.mark.parametrize("nworkers", [1, 2, 3]) @pytest.mark.parametrize("backend", ["pandas", "cudf"])