
Commit

Cast partitioning column to float for numerics to avoid loss of data in shuffle (dask#10705)
phofl authored Dec 14, 2023
1 parent 8dd74c5 commit 2d8f6d5
Showing 3 changed files with 46 additions and 2 deletions.
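
Background for the fix: pandas hashes the int64 and float64 representations of the same number to different values, and dask routes rows by hash % npartitions. When an operation such as an outer merge introduces NaNs, an integer column is promoted to float64 in the affected partitions only, so identical keys can be sent to different partitions and rows silently drop out of joins and groupbys. Casting every numeric partitioning column to float64 before hashing makes the routing consistent. A minimal sketch of the dtype sensitivity, using pd.util.hash_pandas_object (the pandas implementation behind dask's hash_object_dispatch):

import pandas as pd

ints = pd.Series([2, 109, 345], dtype="int64")
floats = ints.astype("float64")

# Same logical keys, different hash values, so hash % npartitions can
# route them to different partitions.
print(pd.util.hash_pandas_object(ints, index=False).values % 10)
print(pd.util.hash_pandas_object(floats, index=False).values % 10)

# Hashing both as float64 (what this commit arranges) restores agreement.
print(pd.util.hash_pandas_object(ints.astype("float64"), index=False).values % 10)
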
dask/dataframe/multi.py: 9 additions & 1 deletion
@@ -1412,7 +1412,15 @@ def _split_partition(df, on, nsplits):
# add a "_partitions" column to perform the split.
if not isinstance(on, _Frame):
on = _select_columns_or_index(df, on)
partitions = partitioning_index(on, nsplits)

dtypes = {}
for col, dtype in on.dtypes.items():
if pd.api.types.is_numeric_dtype(dtype):
dtypes[col] = np.float64
if not dtypes:
dtypes = None

partitions = partitioning_index(on, nsplits, cast_dtype=dtypes)
df2 = df.assign(_partitions=partitions)
return shuffle_group(
df2,
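
The same mapping construction appears again in shuffle.py below. As a standalone illustration (hypothetical frame and column names), only numeric columns end up in the cast mapping:

import numpy as np
import pandas as pd

on = pd.DataFrame({"B": pd.array([1, 2], dtype="Int64"), "key": ["x", "y"]})

dtypes = {}
for col, dtype in on.dtypes.items():
    if pd.api.types.is_numeric_dtype(dtype):
        dtypes[col] = np.float64
if not dtypes:
    dtypes = None

print(dtypes)  # {'B': <class 'numpy.float64'>}; the string column is left alone
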
dask/dataframe/shuffle.py: 15 additions & 1 deletion
@@ -420,6 +420,13 @@ def shuffle(
         # selection will not match (important when merging).
         index = index.to_frame()
 
+    dtypes = {}
+    for col, dtype in index.dtypes.items():
+        if pd.api.types.is_numeric_dtype(dtype):
+            dtypes[col] = np.float64
+    if not dtypes:
+        dtypes = None
+
     meta = df._meta._constructor_sliced([0])
     # Ensure that we have the same index as before to avoid alignment
     # when calculating meta dtypes later on
@@ -429,6 +436,7 @@
         npartitions=npartitions or df.npartitions,
         meta=meta,
         transform_divisions=False,
+        cast_dtype=dtypes,
     )
     df2 = df.assign(_partitions=partitions)
     df2._meta.index.name = df._meta.index.name
@@ -786,7 +794,7 @@ def rearrange_by_column_tasks(
 ########################################################
 
 
-def partitioning_index(df, npartitions):
+def partitioning_index(df, npartitions, cast_dtype=None):
     """
     Computes a deterministic index mapping each record to a partition.
@@ -797,12 +805,18 @@ def partitioning_index(df, npartitions):
     df : DataFrame/Series/Index
     npartitions : int
         The number of partitions to group into.
+    cast_dtype : dtype, optional
+        The dtype to cast to, to avoid nullability issues.
 
     Returns
     -------
     partitions : ndarray
         An array of int64 values mapping each record to a partition.
     """
+    if cast_dtype is not None:
+        # FIXME: astype raises with strings in numeric columns, but raising
+        # here might be very noisy
+        df = df.astype(cast_dtype, errors="ignore")
     return hash_object_dispatch(df, index=False) % int(npartitions)


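With the new keyword, a frame hashes to the same partition assignment whether a given partition holds the key column as int64 or as float64. A quick sketch, assuming the internal import path dask.dataframe.shuffle.partitioning_index as of this commit:

import numpy as np
import pandas as pd
from dask.dataframe.shuffle import partitioning_index  # internal API

df_int = pd.DataFrame({"B": [2, 109, 345]})
df_float = df_int.astype({"B": np.float64})

# Without the cast the two representations may disagree...
print(partitioning_index(df_int, 10))
print(partitioning_index(df_float, 10))

# ...with cast_dtype both are routed identically.
cast = {"B": np.float64}
print(partitioning_index(df_int, 10, cast_dtype=cast))
print(partitioning_index(df_float, 10, cast_dtype=cast))
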
dask/dataframe/tests/test_shuffle.py: 22 additions & 0 deletions
@@ -1693,3 +1693,25 @@ def test_set_index_pyarrow_dtype(data, dtype):
     pdf_result = pdf.set_index("arrow_col")
     ddf_result = ddf.set_index("arrow_col")
     assert_eq(ddf_result, pdf_result)
+
+
+def test_shuffle_nulls_introduced():
+    df1 = pd.DataFrame([[True], [False]] * 50, columns=["A"])
+    df1["B"] = list(range(100))
+
+    df2 = pd.DataFrame(
+        [[2, 3], [109, 2], [345, 3], [50, 7], [95, 1]], columns=["B", "C"]
+    )
+
+    ddf1 = dd.from_pandas(df1, npartitions=10)
+    ddf2 = dd.from_pandas(df2, npartitions=1)
+    meta = pd.Series(dtype=int, index=pd.Index([], dtype=bool, name="A"), name="A")
+    result = (
+        dd.merge(ddf1, ddf2, how="outer", on="B")
+        .groupby("A")
+        .apply(lambda df: len(df), meta=meta)
+    )
+    expected = (
+        pd.merge(df1, df2, how="outer", on="B").groupby("A").apply(lambda df: len(df))
+    )
+    assert_eq(result, expected, check_names=False)
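
The test exercises the failure mode directly: the outer merge leaves NaNs in unmatched rows, which promotes column dtypes in the partitions that contain them, and before this fix the later groupby shuffle could split one group's rows across partitions. The promotion itself is ordinary pandas behaviour, sketched here with a trimmed-down version of the test data:

import pandas as pd

left = pd.DataFrame({"B": [0, 1, 2], "A": [True, False, True]})
right = pd.DataFrame({"B": [2, 109], "C": [3, 2]})

merged = pd.merge(left, right, how="outer", on="B")
print(merged.dtypes)
# A is promoted to object and C to float64 by the introduced NaNs; this
# kind of per-partition promotion is what made hashing inconsistent.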
