remove a lot of dead code
rjzamora committed Dec 18, 2024
1 parent a2eccf4 commit 330eaaa
Showing 21 changed files with 724 additions and 2,801 deletions.
2 changes: 1 addition & 1 deletion python/dask_cudf/dask_cudf/__init__.py
@@ -42,7 +42,7 @@ def read_parquet(*args, **kwargs):
read_text = DataFrame.read_text
to_orc = _deprecated_api(
"dask_cudf.to_orc",
new_api="dask_cudf._legacy.io.to_orc",
new_api="dask_cudf.io.to_orc",
rec="Please use DataFrame.to_orc instead.",
)

8 changes: 4 additions & 4 deletions python/dask_cudf/dask_cudf/_expr/collection.py
@@ -141,15 +141,15 @@ def groupby(
)

def to_orc(self, *args, **kwargs):
- from dask_cudf._legacy.io import to_orc
+ from dask_cudf.io.orc import to_orc as to_orc_impl

- return to_orc(self, *args, **kwargs)
+ return to_orc_impl(self, *args, **kwargs)

@staticmethod
def read_text(*args, **kwargs):
- from dask_cudf._legacy.io.text import read_text as legacy_read_text
+ from dask_cudf.io.text import read_text as read_text_impl

- return legacy_read_text(*args, **kwargs)
+ return read_text_impl(*args, **kwargs)

def clip(self, lower=None, upper=None, axis=1):
if axis not in (None, 1):
264 changes: 252 additions & 12 deletions python/dask_cudf/dask_cudf/_expr/groupby.py
@@ -1,6 +1,7 @@
# Copyright (c) 2024, NVIDIA CORPORATION.
import functools

import numpy as np
import pandas as pd
from dask_expr._collection import new_collection
from dask_expr._groupby import (
@@ -16,11 +17,262 @@
from dask.dataframe.groupby import Aggregation

from cudf.core.groupby.groupby import _deprecate_collect
from cudf.utils.performance_tracking import _dask_cudf_performance_tracking

##
## Fused groupby aggregations
##

OPTIMIZED_AGGS = (
"count",
"mean",
"std",
"var",
"sum",
"min",
"max",
list,
"first",
"last",
)


def _make_name(col_name, sep="_"):
"""Combine elements of `col_name` into a single string, or no-op if
`col_name` is already a string
"""
if isinstance(col_name, str):
return col_name
return sep.join(name for name in col_name if name != "")
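# Illustrative examples of the flattening above (not part of the diff):
#   _make_name("x")            -> "x"
#   _make_name(("x", "sum"))   -> "x_sum"
#   _make_name(("x", "", "y")) -> "x_y"   # empty components are dropped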


@_dask_cudf_performance_tracking
def _groupby_partition_agg(df, gb_cols, aggs, columns, dropna, sort, sep):
"""Initial partition-level aggregation task.
This is the first operation to be executed on each input
partition. Higher-order aggregations ("mean", "std", and "var")
are decomposed into the primitive "count" and "sum" statistics
(plus a squared-value column for "std" and "var") so that
partial results can be combined across partitions; all other
optimized aggregations are applied directly.
"""

# Modify dict for initial (partition-wise) aggregations
_agg_dict = {}
for col, agg_list in aggs.items():
_agg_dict[col] = set()
for agg in agg_list:
if agg in ("mean", "std", "var"):
_agg_dict[col].add("count")
_agg_dict[col].add("sum")
else:
_agg_dict[col].add(agg)
_agg_dict[col] = list(_agg_dict[col])
if set(agg_list).intersection({"std", "var"}):
pow2_name = _make_name((col, "pow2"), sep=sep)
df[pow2_name] = df[col].astype("float64").pow(2)
_agg_dict[pow2_name] = ["sum"]

gb = df.groupby(gb_cols, dropna=dropna, as_index=False, sort=sort).agg(
_agg_dict
)
output_columns = [_make_name(name, sep=sep) for name in gb.columns]
gb.columns = output_columns
# Return with deterministic column ordering
return gb[sorted(output_columns)]
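# For example, aggs={"x": ["mean"]} becomes _agg_dict={"x": ["count", "sum"]}
# above (list order may vary, since a set is used for de-duplication), and
# aggs={"x": ["std"]} additionally creates a squared-value column "x_pow2"
# whose per-group sum feeds the variance calculation in `_var_agg`.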


@_dask_cudf_performance_tracking
def _tree_node_agg(df, gb_cols, dropna, sort, sep):
"""Node in groupby-aggregation reduction tree.
The input DataFrame (`df`) corresponds to the
concatenated output of one or more `_groupby_partition_agg`
tasks. Partial "count" and "sum" statistics are combined
by summation, while the remaining optimized aggregations
are re-applied to merge statistics for duplicate keys.
"""

agg_dict = {}
for col in df.columns:
if col in gb_cols:
continue
agg = col.split(sep)[-1]
if agg in ("count", "sum"):
agg_dict[col] = ["sum"]
elif agg == "list":
agg_dict[col] = [list]
elif agg in OPTIMIZED_AGGS:
agg_dict[col] = [agg]
else:
raise ValueError(f"Unexpected aggregation: {agg}")

gb = df.groupby(gb_cols, dropna=dropna, as_index=False, sort=sort).agg(
agg_dict
)

# Don't include the last aggregation in the column names
output_columns = [
_make_name(name[:-1] if isinstance(name, tuple) else name, sep=sep)
for name in gb.columns
]
gb.columns = output_columns
# Return with deterministic column ordering
return gb[sorted(output_columns)]
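# Note that partial counts are combined by summation rather than re-counting
# (per-partition counts of 3 and 5 for the same key reduce to 8), which is
# why both "count" and "sum" intermediates map to ["sum"] above; the other
# optimized aggregations are simply re-applied, and list results are
# concatenated later in `_finalize_gb_agg`.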


@_dask_cudf_performance_tracking
def _var_agg(df, col, count_name, sum_name, pow2_sum_name, ddof=1):
"""Calculate variance (given count, sum, and sum-squared columns)."""

# Select count, sum, and sum-squared
n = df[count_name]
x = df[sum_name]
x2 = df[pow2_sum_name]

# Use sum-squared approach to get variance
var = x2 - x**2 / n
div = n - ddof
div[div < 1] = 1 # Avoid division by 0
var /= div

# Set appropriate NaN elements
# (since we avoided 0-division)
var[(n - ddof) == 0] = np.nan

return var
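# Worked example: for group values [1, 2, 3], n=3, x=6, x2=14, so
# var = (14 - 6**2 / 3) / (3 - 1) = 1.0, i.e. the sum-of-squares identity
# var = (sum(v**2) - sum(v)**2 / n) / (n - ddof).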


@_dask_cudf_performance_tracking
def _finalize_gb_agg(
gb_in,
gb_cols,
aggs,
columns,
final_columns,
as_index,
dropna,
sort,
sep,
str_cols_out,
aggs_renames,
):
"""Final aggregation task.
This is the final operation on each output partition
of the groupby-aggregation algorithm. This function
takes care of higher-order aggregations, like "mean",
"std" and "var", and also handles the column index,
the row index, and final sorting behavior.
"""

gb = _tree_node_agg(gb_in, gb_cols, dropna, sort, sep)

# Deal with higher-order aggregations
for col in columns:
agg_list = aggs.get(col, [])
agg_set = set(agg_list)
if agg_set.intersection({"mean", "std", "var"}):
count_name = _make_name((col, "count"), sep=sep)
sum_name = _make_name((col, "sum"), sep=sep)
if agg_set.intersection({"std", "var"}):
pow2_sum_name = _make_name((col, "pow2", "sum"), sep=sep)
var = _var_agg(gb, col, count_name, sum_name, pow2_sum_name)
if "var" in agg_list:
name_var = _make_name((col, "var"), sep=sep)
gb[name_var] = var
if "std" in agg_list:
name_std = _make_name((col, "std"), sep=sep)
gb[name_std] = np.sqrt(var)
gb.drop(columns=[pow2_sum_name], inplace=True)
if "mean" in agg_list:
mean_name = _make_name((col, "mean"), sep=sep)
gb[mean_name] = gb[sum_name] / gb[count_name]
if "sum" not in agg_list:
gb.drop(columns=[sum_name], inplace=True)
if "count" not in agg_list:
gb.drop(columns=[count_name], inplace=True)
if list in agg_list:
collect_name = _make_name((col, "list"), sep=sep)
gb[collect_name] = gb[collect_name].list.concat()

# Ensure sorted keys if `sort=True`
if sort:
gb = gb.sort_values(gb_cols)

# Set index if necessary
if as_index:
gb.set_index(gb_cols, inplace=True)

# Unflatten column names
col_array = []
agg_array = []
for col in gb.columns:
if col in gb_cols:
col_array.append(col)
agg_array.append("")
else:
name, agg = col.split(sep)
col_array.append(name)
agg_array.append(aggs_renames.get((name, agg), agg))
if str_cols_out:
gb.columns = col_array
else:
gb.columns = pd.MultiIndex.from_arrays([col_array, agg_array])

return gb[final_columns]
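# At this point the flattened intermediate names have been split back apart
# (e.g. "x_sum" -> ("x", "sum")), and helper columns ("pow2" sums, plus any
# "sum"/"count" columns the user did not explicitly request) were dropped
# after materializing "mean", "std" and "var" from them.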


@_dask_cudf_performance_tracking
def _redirect_aggs(arg):
"""Redirect aggregations to their corresponding name in cuDF"""
redirects = {
sum: "sum",
max: "max",
min: "min",
"collect": list,
"list": list,
}
if isinstance(arg, dict):
new_arg = dict()
for col in arg:
if isinstance(arg[col], list):
new_arg[col] = [redirects.get(agg, agg) for agg in arg[col]]
elif isinstance(arg[col], dict):
new_arg[col] = {
k: redirects.get(v, v) for k, v in arg[col].items()
}
else:
new_arg[col] = redirects.get(arg[col], arg[col])
return new_arg
if isinstance(arg, list):
return [redirects.get(agg, agg) for agg in arg]
return redirects.get(arg, arg)
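# Illustrative behavior (not part of the diff): builtin callables and the
# "collect" alias are normalized to names cuDF understands, e.g.
#   _redirect_aggs({"x": [sum, "collect"], "y": max})
#   -> {"x": ["sum", list], "y": "max"}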


@_dask_cudf_performance_tracking
def _aggs_optimized(arg, supported: set):
"""Check that aggregations in `arg` are a subset of `supported`"""
if isinstance(arg, (list, dict)):
if isinstance(arg, dict):
_global_set: set[str] = set()
for col in arg:
if isinstance(arg[col], list):
_global_set = _global_set.union(set(arg[col]))
elif isinstance(arg[col], dict):
_global_set = _global_set.union(set(arg[col].values()))
else:
_global_set.add(arg[col])
else:
_global_set = set(arg)

return bool(_global_set.issubset(supported))
elif isinstance(arg, (str, type)):
return arg in supported
return False
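# Illustrative behavior (not part of the diff), with OPTIMIZED_AGGS from
# this module as the supported set:
#   _aggs_optimized({"x": ["mean", "max"]}, set(OPTIMIZED_AGGS)) -> True
#   _aggs_optimized(["sum", "median"], set(OPTIMIZED_AGGS))      -> False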


def _get_spec_info(gb):
if isinstance(gb.arg, (dict, list)):
@@ -105,20 +357,14 @@ def shuffle_by_index(self):

@classmethod
def chunk(cls, df, *by, **kwargs):
- from dask_cudf._legacy.groupby import _groupby_partition_agg

return _groupby_partition_agg(df, **kwargs)

@classmethod
def combine(cls, inputs, **kwargs):
- from dask_cudf._legacy.groupby import _tree_node_agg

return _tree_node_agg(_concat(inputs), **kwargs)

@classmethod
def aggregate(cls, inputs, **kwargs):
- from dask_cudf._legacy.groupby import _finalize_gb_agg

return _finalize_gb_agg(_concat(inputs), **kwargs)

@property
@@ -193,12 +439,6 @@ def _maybe_get_custom_expr(
shuffle_method=None,
**kwargs,
):
- from dask_cudf._legacy.groupby import (
-     OPTIMIZED_AGGS,
-     _aggs_optimized,
-     _redirect_aggs,
- )

if kwargs:
# Unsupported key-word arguments
return None