diff --git a/ci/test_python_other.sh b/ci/test_python_other.sh index db86721755d..5b0dd5759d7 100755 --- a/ci/test_python_other.sh +++ b/ci/test_python_other.sh @@ -24,8 +24,8 @@ EXITCODE=0 trap "EXITCODE=1" ERR set +e -rapids-logger "pytest dask_cudf (dask-expr)" -DASK_DATAFRAME__QUERY_PLANNING=True ./ci/run_dask_cudf_pytests.sh \ +rapids-logger "pytest dask_cudf" +./ci/run_dask_cudf_pytests.sh \ --junitxml="${RAPIDS_TESTS_DIR}/junit-dask-cudf.xml" \ --numprocesses=8 \ --dist=worksteal \ @@ -34,13 +34,6 @@ DASK_DATAFRAME__QUERY_PLANNING=True ./ci/run_dask_cudf_pytests.sh \ --cov-report=xml:"${RAPIDS_COVERAGE_DIR}/dask-cudf-coverage.xml" \ --cov-report=term -rapids-logger "pytest dask_cudf (legacy)" -DASK_DATAFRAME__QUERY_PLANNING=False ./ci/run_dask_cudf_pytests.sh \ - --junitxml="${RAPIDS_TESTS_DIR}/junit-dask-cudf-legacy.xml" \ - --numprocesses=8 \ - --dist=worksteal \ - . - rapids-logger "pytest cudf_kafka" ./ci/run_cudf_kafka_pytests.sh \ --junitxml="${RAPIDS_TESTS_DIR}/junit-cudf-kafka.xml" diff --git a/ci/test_wheel_dask_cudf.sh b/ci/test_wheel_dask_cudf.sh index e15949f4bdb..e50b39df796 100755 --- a/ci/test_wheel_dask_cudf.sh +++ b/ci/test_wheel_dask_cudf.sh @@ -30,21 +30,11 @@ RAPIDS_TESTS_DIR=${RAPIDS_TESTS_DIR:-"${RESULTS_DIR}/test-results"}/ mkdir -p "${RAPIDS_TESTS_DIR}" # Run tests in dask_cudf/tests and dask_cudf/io/tests -rapids-logger "pytest dask_cudf (dask-expr)" +rapids-logger "pytest dask_cudf" pushd python/dask_cudf/dask_cudf -DASK_DATAFRAME__QUERY_PLANNING=True python -m pytest \ +python -m pytest \ --junitxml="${RAPIDS_TESTS_DIR}/junit-dask-cudf.xml" \ --numprocesses=8 \ --dist=worksteal \ . popd - -# Run tests in dask_cudf/tests and dask_cudf/io/tests (legacy) -rapids-logger "pytest dask_cudf (legacy)" -pushd python/dask_cudf/dask_cudf -DASK_DATAFRAME__QUERY_PLANNING=False python -m pytest \ - --junitxml="${RAPIDS_TESTS_DIR}/junit-dask-cudf-legacy.xml" \ - --numprocesses=8 \ - --dist=worksteal \ - . -popd diff --git a/python/dask_cudf/dask_cudf/__init__.py b/python/dask_cudf/dask_cudf/__init__.py index 20eb2404b77..48fe6189f9f 100644 --- a/python/dask_cudf/dask_cudf/__init__.py +++ b/python/dask_cudf/dask_cudf/__init__.py @@ -1,19 +1,21 @@ # Copyright (c) 2018-2024, NVIDIA CORPORATION. -import warnings -from importlib import import_module - import dask.dataframe as dd from dask import config from dask.dataframe import from_delayed import cudf -from . import backends # noqa: F401 +from . import backends, io # noqa: F401 +from ._expr.expr import _patch_dask_expr from ._version import __git_commit__, __version__ # noqa: F401 -from .core import DataFrame, Index, Series, concat, from_cudf +from .core import DataFrame, Index, Series, _deprecated_api, concat, from_cudf -QUERY_PLANNING_ON = dd.DASK_EXPR_ENABLED +if not (QUERY_PLANNING_ON := dd.DASK_EXPR_ENABLED): + raise ValueError( + "The legacy DataFrame API is not supported in dask_cudf>24.12. " + "Please enable query-planning, or downgrade to dask_cudf<=24.12" + ) def read_csv(*args, **kwargs): @@ -36,46 +38,18 @@ def read_parquet(*args, **kwargs): return dd.read_parquet(*args, **kwargs) -def _deprecated_api(old_api, new_api=None, rec=None): - def inner_func(*args, **kwargs): - if new_api: - # Use alternative - msg = f"{old_api} is now deprecated. " - msg += rec or f"Please use {new_api} instead." 
- warnings.warn(msg, FutureWarning) - new_attr = new_api.split(".") - module = import_module(".".join(new_attr[:-1])) - return getattr(module, new_attr[-1])(*args, **kwargs) - - # No alternative - raise an error - raise NotImplementedError( - f"{old_api} is no longer supported. " + (rec or "") - ) - - return inner_func - - -if QUERY_PLANNING_ON: - from . import io - from ._expr.expr import _patch_dask_expr - - groupby_agg = _deprecated_api("dask_cudf.groupby_agg") - read_text = DataFrame.read_text - _patch_dask_expr() - -else: - from . import io # noqa: F401 - from ._legacy.groupby import groupby_agg # noqa: F401 - from ._legacy.io import read_text # noqa: F401 - - +groupby_agg = _deprecated_api("dask_cudf.groupby_agg") +read_text = DataFrame.read_text to_orc = _deprecated_api( "dask_cudf.to_orc", - new_api="dask_cudf._legacy.io.to_orc", + new_api="dask_cudf.io.to_orc", rec="Please use DataFrame.to_orc instead.", ) +_patch_dask_expr() + + __all__ = [ "DataFrame", "Index", diff --git a/python/dask_cudf/dask_cudf/_expr/collection.py b/python/dask_cudf/dask_cudf/_expr/collection.py index 5192e6b8171..6c8522711e2 100644 --- a/python/dask_cudf/dask_cudf/_expr/collection.py +++ b/python/dask_cudf/dask_cudf/_expr/collection.py @@ -19,15 +19,6 @@ import cudf -_LEGACY_WORKAROUND = ( - "To enable the 'legacy' dask-cudf API, set the " - "global 'dataframe.query-planning' config to " - "`False` before dask is imported. This can also " - "be done by setting an environment variable: " - "`DASK_DATAFRAME__QUERY_PLANNING=False` " -) - - ## ## Custom collection classes ## @@ -103,9 +94,8 @@ def set_index( divisions = None warnings.warn( "Ignoring divisions='quantile'. This option is now " - "deprecated. Please use the legacy API and raise an " - "issue on github if this feature is necessary." - f"\n{_LEGACY_WORKAROUND}", + "deprecated. Please raise an issue on github if this " + "feature is necessary.", FutureWarning, ) @@ -135,9 +125,7 @@ def groupby( if kwargs.pop("as_index") is not True: raise NotImplementedError( - f"{msg} Please reset the index after aggregating, or " - "use the legacy API if `as_index=False` is required.\n" - f"{_LEGACY_WORKAROUND}" + f"{msg} Please reset the index after aggregating." ) else: warnings.warn(msg, FutureWarning) @@ -153,15 +141,15 @@ def groupby( ) def to_orc(self, *args, **kwargs): - from dask_cudf._legacy.io import to_orc + from dask_cudf.io.orc import to_orc as to_orc_impl - return to_orc(self, *args, **kwargs) + return to_orc_impl(self, *args, **kwargs) @staticmethod def read_text(*args, **kwargs): - from dask_cudf._legacy.io.text import read_text as legacy_read_text + from dask_cudf.io.text import read_text as read_text_impl - return legacy_read_text(*args, **kwargs) + return read_text_impl(*args, **kwargs) def clip(self, lower=None, upper=None, axis=1): if axis not in (None, 1): diff --git a/python/dask_cudf/dask_cudf/_expr/groupby.py b/python/dask_cudf/dask_cudf/_expr/groupby.py index 0242fac6e72..f1def40e442 100644 --- a/python/dask_cudf/dask_cudf/_expr/groupby.py +++ b/python/dask_cudf/dask_cudf/_expr/groupby.py @@ -1,6 +1,7 @@ # Copyright (c) 2024, NVIDIA CORPORATION. 
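As an aside before the groupby changes below: the `_deprecated_api` helper deleted from `__init__.py` above now comes from `dask_cudf.core` (note the updated import list). A minimal sketch of its behavior, reconstructed from the removed definition — the `dask_cudf.io.to_orc` wiring shown in the comment mirrors the updated `__init__.py`:

```python
import warnings
from importlib import import_module


def _deprecated_api(old_api, new_api=None, rec=None):
    # Sketch of the shim removed from __init__.py above: warn and forward
    # when a replacement exists, otherwise raise NotImplementedError.
    def inner_func(*args, **kwargs):
        if new_api:
            msg = f"{old_api} is now deprecated. "
            msg += rec or f"Please use {new_api} instead."
            warnings.warn(msg, FutureWarning)
            new_attr = new_api.split(".")
            module = import_module(".".join(new_attr[:-1]))
            return getattr(module, new_attr[-1])(*args, **kwargs)
        raise NotImplementedError(
            f"{old_api} is no longer supported. " + (rec or "")
        )

    return inner_func


# Example wiring, mirroring the updated __init__.py:
# to_orc = _deprecated_api(
#     "dask_cudf.to_orc",
#     new_api="dask_cudf.io.to_orc",
#     rec="Please use DataFrame.to_orc instead.",
# )
```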
import functools +import numpy as np import pandas as pd from dask_expr._collection import new_collection from dask_expr._groupby import ( @@ -16,11 +17,262 @@ from dask.dataframe.groupby import Aggregation from cudf.core.groupby.groupby import _deprecate_collect +from cudf.utils.performance_tracking import _dask_cudf_performance_tracking ## ## Fused groupby aggregations ## +OPTIMIZED_AGGS = ( + "count", + "mean", + "std", + "var", + "sum", + "min", + "max", + list, + "first", + "last", +) + + +def _make_name(col_name, sep="_"): + """Combine elements of `col_name` into a single string, or no-op if + `col_name` is already a string + """ + if isinstance(col_name, str): + return col_name + return sep.join(name for name in col_name if name != "") + + +@_dask_cudf_performance_tracking +def _groupby_partition_agg(df, gb_cols, aggs, columns, dropna, sort, sep): + """Initial partition-level aggregation task. + + This is the first operation to be executed on each input + partition in `groupby_agg`. Depending on `aggs`, four possible + groupby aggregations ("count", "sum", "min", and "max") are + performed. The result is then partitioned (by hashing `gb_cols`) + into a number of distinct dictionary elements. The number of + elements in the output dictionary (`split_out`) corresponds to + the number of partitions in the final output of `groupby_agg`. + """ + + # Modify dict for initial (partition-wise) aggregations + _agg_dict = {} + for col, agg_list in aggs.items(): + _agg_dict[col] = set() + for agg in agg_list: + if agg in ("mean", "std", "var"): + _agg_dict[col].add("count") + _agg_dict[col].add("sum") + else: + _agg_dict[col].add(agg) + _agg_dict[col] = list(_agg_dict[col]) + if set(agg_list).intersection({"std", "var"}): + pow2_name = _make_name((col, "pow2"), sep=sep) + df[pow2_name] = df[col].astype("float64").pow(2) + _agg_dict[pow2_name] = ["sum"] + + gb = df.groupby(gb_cols, dropna=dropna, as_index=False, sort=sort).agg( + _agg_dict + ) + output_columns = [_make_name(name, sep=sep) for name in gb.columns] + gb.columns = output_columns + # Return with deterministic column ordering + return gb[sorted(output_columns)] + + +@_dask_cudf_performance_tracking +def _tree_node_agg(df, gb_cols, dropna, sort, sep): + """Node in groupby-aggregation reduction tree. + + The input DataFrame (`df`) corresponds to the + concatenated output of one or more `_groupby_partition_agg` + tasks. In this function, "sum", "min" and/or "max" groupby + aggregations will be used to combine the statistics for + duplicate keys. 
+ """ + + agg_dict = {} + for col in df.columns: + if col in gb_cols: + continue + agg = col.split(sep)[-1] + if agg in ("count", "sum"): + agg_dict[col] = ["sum"] + elif agg == "list": + agg_dict[col] = [list] + elif agg in OPTIMIZED_AGGS: + agg_dict[col] = [agg] + else: + raise ValueError(f"Unexpected aggregation: {agg}") + + gb = df.groupby(gb_cols, dropna=dropna, as_index=False, sort=sort).agg( + agg_dict + ) + + # Don't include the last aggregation in the column names + output_columns = [ + _make_name(name[:-1] if isinstance(name, tuple) else name, sep=sep) + for name in gb.columns + ] + gb.columns = output_columns + # Return with deterministic column ordering + return gb[sorted(output_columns)] + + +@_dask_cudf_performance_tracking +def _var_agg(df, col, count_name, sum_name, pow2_sum_name, ddof=1): + """Calculate variance (given count, sum, and sum-squared columns).""" + + # Select count, sum, and sum-squared + n = df[count_name] + x = df[sum_name] + x2 = df[pow2_sum_name] + + # Use sum-squared approach to get variance + var = x2 - x**2 / n + div = n - ddof + div[div < 1] = 1 # Avoid division by 0 + var /= div + + # Set appropriate NaN elements + # (since we avoided 0-division) + var[(n - ddof) == 0] = np.nan + + return var + + +@_dask_cudf_performance_tracking +def _finalize_gb_agg( + gb_in, + gb_cols, + aggs, + columns, + final_columns, + as_index, + dropna, + sort, + sep, + str_cols_out, + aggs_renames, +): + """Final aggregation task. + + This is the final operation on each output partitions + of the `groupby_agg` algorithm. This function must + take care of higher-order aggregations, like "mean", + "std" and "var". We also need to deal with the column + index, the row index, and final sorting behavior. + """ + + gb = _tree_node_agg(gb_in, gb_cols, dropna, sort, sep) + + # Deal with higher-order aggregations + for col in columns: + agg_list = aggs.get(col, []) + agg_set = set(agg_list) + if agg_set.intersection({"mean", "std", "var"}): + count_name = _make_name((col, "count"), sep=sep) + sum_name = _make_name((col, "sum"), sep=sep) + if agg_set.intersection({"std", "var"}): + pow2_sum_name = _make_name((col, "pow2", "sum"), sep=sep) + var = _var_agg(gb, col, count_name, sum_name, pow2_sum_name) + if "var" in agg_list: + name_var = _make_name((col, "var"), sep=sep) + gb[name_var] = var + if "std" in agg_list: + name_std = _make_name((col, "std"), sep=sep) + gb[name_std] = np.sqrt(var) + gb.drop(columns=[pow2_sum_name], inplace=True) + if "mean" in agg_list: + mean_name = _make_name((col, "mean"), sep=sep) + gb[mean_name] = gb[sum_name] / gb[count_name] + if "sum" not in agg_list: + gb.drop(columns=[sum_name], inplace=True) + if "count" not in agg_list: + gb.drop(columns=[count_name], inplace=True) + if list in agg_list: + collect_name = _make_name((col, "list"), sep=sep) + gb[collect_name] = gb[collect_name].list.concat() + + # Ensure sorted keys if `sort=True` + if sort: + gb = gb.sort_values(gb_cols) + + # Set index if necessary + if as_index: + gb.set_index(gb_cols, inplace=True) + + # Unflatten column names + col_array = [] + agg_array = [] + for col in gb.columns: + if col in gb_cols: + col_array.append(col) + agg_array.append("") + else: + name, agg = col.split(sep) + col_array.append(name) + agg_array.append(aggs_renames.get((name, agg), agg)) + if str_cols_out: + gb.columns = col_array + else: + gb.columns = pd.MultiIndex.from_arrays([col_array, agg_array]) + + return gb[final_columns] + + +@_dask_cudf_performance_tracking +def _redirect_aggs(arg): + """Redirect 
aggregations to their corresponding name in cuDF""" + redirects = { + sum: "sum", + max: "max", + min: "min", + "collect": list, + "list": list, + } + if isinstance(arg, dict): + new_arg = dict() + for col in arg: + if isinstance(arg[col], list): + new_arg[col] = [redirects.get(agg, agg) for agg in arg[col]] + elif isinstance(arg[col], dict): + new_arg[col] = { + k: redirects.get(v, v) for k, v in arg[col].items() + } + else: + new_arg[col] = redirects.get(arg[col], arg[col]) + return new_arg + if isinstance(arg, list): + return [redirects.get(agg, agg) for agg in arg] + return redirects.get(arg, arg) + + +@_dask_cudf_performance_tracking +def _aggs_optimized(arg, supported: set): + """Check that aggregations in `arg` are a subset of `supported`""" + if isinstance(arg, (list, dict)): + if isinstance(arg, dict): + _global_set: set[str] = set() + for col in arg: + if isinstance(arg[col], list): + _global_set = _global_set.union(set(arg[col])) + elif isinstance(arg[col], dict): + _global_set = _global_set.union(set(arg[col].values())) + else: + _global_set.add(arg[col]) + else: + _global_set = set(arg) + + return bool(_global_set.issubset(supported)) + elif isinstance(arg, (str, type)): + return arg in supported + return False + def _get_spec_info(gb): if isinstance(gb.arg, (dict, list)): @@ -105,20 +357,14 @@ def shuffle_by_index(self): @classmethod def chunk(cls, df, *by, **kwargs): - from dask_cudf._legacy.groupby import _groupby_partition_agg - return _groupby_partition_agg(df, **kwargs) @classmethod def combine(cls, inputs, **kwargs): - from dask_cudf._legacy.groupby import _tree_node_agg - return _tree_node_agg(_concat(inputs), **kwargs) @classmethod def aggregate(cls, inputs, **kwargs): - from dask_cudf._legacy.groupby import _finalize_gb_agg - return _finalize_gb_agg(_concat(inputs), **kwargs) @property @@ -193,12 +439,6 @@ def _maybe_get_custom_expr( shuffle_method=None, **kwargs, ): - from dask_cudf._legacy.groupby import ( - OPTIMIZED_AGGS, - _aggs_optimized, - _redirect_aggs, - ) - if kwargs: # Unsupported key-word arguments return None diff --git a/python/dask_cudf/dask_cudf/_legacy/core.py b/python/dask_cudf/dask_cudf/_legacy/core.py deleted file mode 100644 index d6beb775a5e..00000000000 --- a/python/dask_cudf/dask_cudf/_legacy/core.py +++ /dev/null @@ -1,711 +0,0 @@ -# Copyright (c) 2018-2024, NVIDIA CORPORATION. 
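The `_groupby_partition_agg` / `_tree_node_agg` / `_var_agg` functions added above reduce `mean`, `std`, and `var` to per-partition `count`, `sum`, and sum-of-squares columns that can simply be summed across partitions. A NumPy-only sketch (my own illustration, not dask-cudf code) of the identity those tasks rely on:

```python
import numpy as np

rng = np.random.default_rng(42)
x = rng.normal(size=1_000)

# Per-partition pieces (in dask-cudf these become flattened columns such
# as "x___count", "x___sum", and "x___pow2___sum" via _make_name):
parts = np.array_split(x, 4)
n = sum(len(p) for p in parts)
s = sum(float(p.sum()) for p in parts)
s2 = sum(float((p.astype("float64") ** 2).sum()) for p in parts)

# Final combination, as in _var_agg / _finalize_gb_agg (ddof=1):
mean = s / n
var = (s2 - s**2 / n) / (n - 1)
std = np.sqrt(var)

assert np.isclose(mean, x.mean())
assert np.isclose(var, x.var(ddof=1))
assert np.isclose(std, x.std(ddof=1))
```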
- -import math -import warnings - -import numpy as np -import pandas as pd -from tlz import partition_all - -from dask import dataframe as dd -from dask.base import normalize_token, tokenize -from dask.dataframe.core import ( - Scalar, - handle_out, - make_meta as dask_make_meta, - map_partitions, -) -from dask.dataframe.utils import raise_on_meta_error -from dask.highlevelgraph import HighLevelGraph -from dask.utils import M, OperatorMethodMixin, apply, derived_from, funcname - -import cudf -from cudf import _lib as libcudf -from cudf.utils.performance_tracking import _dask_cudf_performance_tracking - -from dask_cudf._expr.accessors import ListMethods, StructMethods -from dask_cudf._legacy import sorting -from dask_cudf._legacy.sorting import ( - _deprecate_shuffle_kwarg, - _get_shuffle_method, -) - - -class _Frame(dd.core._Frame, OperatorMethodMixin): - """Superclass for DataFrame and Series - - Parameters - ---------- - dsk : dict - The dask graph to compute this DataFrame - name : str - The key prefix that specifies which keys in the dask comprise this - particular DataFrame / Series - meta : cudf.DataFrame, cudf.Series, or cudf.Index - An empty cudf object with names, dtypes, and indices matching the - expected output. - divisions : tuple of index values - Values along which we partition our blocks on the index - """ - - def _is_partition_type(self, meta): - return isinstance(meta, self._partition_type) - - def __repr__(self): - s = "" - return s % (type(self).__name__, len(self.dask), self.npartitions) - - -normalize_token.register(_Frame, lambda a: a._name) - - -class DataFrame(_Frame, dd.core.DataFrame): - """ - A distributed Dask DataFrame where the backing dataframe is a - :class:`cuDF DataFrame `. - - Typically you would not construct this object directly, but rather - use one of Dask-cuDF's IO routines. - - Most operations on :doc:`Dask DataFrames ` are - supported, with many of the same caveats. 
- - """ - - _partition_type = cudf.DataFrame - - @_dask_cudf_performance_tracking - def _assign_column(self, k, v): - def assigner(df, k, v): - out = df.copy() - out[k] = v - return out - - meta = assigner(self._meta, k, dask_make_meta(v)) - return self.map_partitions(assigner, k, v, meta=meta) - - @_dask_cudf_performance_tracking - def apply_rows(self, func, incols, outcols, kwargs=None, cache_key=None): - import uuid - - if kwargs is None: - kwargs = {} - - if cache_key is None: - cache_key = uuid.uuid4() - - def do_apply_rows(df, func, incols, outcols, kwargs): - return df.apply_rows( - func, incols, outcols, kwargs, cache_key=cache_key - ) - - meta = do_apply_rows(self._meta, func, incols, outcols, kwargs) - return self.map_partitions( - do_apply_rows, func, incols, outcols, kwargs, meta=meta - ) - - @_deprecate_shuffle_kwarg - @_dask_cudf_performance_tracking - def merge(self, other, shuffle_method=None, **kwargs): - on = kwargs.pop("on", None) - if isinstance(on, tuple): - on = list(on) - return super().merge( - other, - on=on, - shuffle_method=_get_shuffle_method(shuffle_method), - **kwargs, - ) - - @_deprecate_shuffle_kwarg - @_dask_cudf_performance_tracking - def join(self, other, shuffle_method=None, **kwargs): - # CuDF doesn't support "right" join yet - how = kwargs.pop("how", "left") - if how == "right": - return other.join(other=self, how="left", **kwargs) - - on = kwargs.pop("on", None) - if isinstance(on, tuple): - on = list(on) - return super().join( - other, - how=how, - on=on, - shuffle_method=_get_shuffle_method(shuffle_method), - **kwargs, - ) - - @_deprecate_shuffle_kwarg - @_dask_cudf_performance_tracking - def set_index( - self, - other, - sorted=False, - divisions=None, - shuffle_method=None, - **kwargs, - ): - pre_sorted = sorted - del sorted - - if divisions == "quantile": - warnings.warn( - "Using divisions='quantile' is now deprecated. 
" - "Please raise an issue on github if you believe " - "this feature is necessary.", - FutureWarning, - ) - - if ( - divisions == "quantile" - or isinstance(divisions, (cudf.DataFrame, cudf.Series)) - or ( - isinstance(other, str) - and cudf.api.types.is_string_dtype(self[other].dtype) - ) - ): - # Let upstream-dask handle "pre-sorted" case - if pre_sorted: - return dd.shuffle.set_sorted_index( - self, other, divisions=divisions, **kwargs - ) - - by = other - if not isinstance(other, list): - by = [by] - if len(by) > 1: - raise ValueError("Dask does not support MultiIndex (yet).") - if divisions == "quantile": - divisions = None - - # Use dask_cudf's sort_values - df = self.sort_values( - by, - max_branch=kwargs.get("max_branch", None), - divisions=divisions, - set_divisions=True, - ignore_index=True, - shuffle_method=shuffle_method, - ) - - # Ignore divisions if its a dataframe - if isinstance(divisions, cudf.DataFrame): - divisions = None - - # Set index and repartition - df2 = df.map_partitions( - sorting.set_index_post, - index_name=other, - drop=kwargs.get("drop", True), - column_dtype=df.columns.dtype, - ) - npartitions = kwargs.get("npartitions", self.npartitions) - partition_size = kwargs.get("partition_size", None) - if partition_size: - return df2.repartition(partition_size=partition_size) - if not divisions and df2.npartitions != npartitions: - return df2.repartition(npartitions=npartitions) - if divisions and df2.npartitions != len(divisions) - 1: - return df2.repartition(divisions=divisions) - return df2 - - return super().set_index( - other, - sorted=pre_sorted, - shuffle_method=_get_shuffle_method(shuffle_method), - divisions=divisions, - **kwargs, - ) - - @_deprecate_shuffle_kwarg - @_dask_cudf_performance_tracking - def sort_values( - self, - by, - ignore_index=False, - max_branch=None, - divisions=None, - set_divisions=False, - ascending=True, - na_position="last", - sort_function=None, - sort_function_kwargs=None, - shuffle_method=None, - **kwargs, - ): - if kwargs: - raise ValueError( - f"Unsupported input arguments passed : {list(kwargs.keys())}" - ) - - df = sorting.sort_values( - self, - by, - max_branch=max_branch, - divisions=divisions, - set_divisions=set_divisions, - ignore_index=ignore_index, - ascending=ascending, - na_position=na_position, - shuffle_method=shuffle_method, - sort_function=sort_function, - sort_function_kwargs=sort_function_kwargs, - ) - - if ignore_index: - return df.reset_index(drop=True) - return df - - @_dask_cudf_performance_tracking - def to_parquet(self, path, *args, **kwargs): - """Calls dask.dataframe.io.to_parquet with CudfEngine backend""" - from dask_cudf._legacy.io import to_parquet - - return to_parquet(self, path, *args, **kwargs) - - @_dask_cudf_performance_tracking - def to_orc(self, path, **kwargs): - """Calls dask_cudf._legacy.io.to_orc""" - from dask_cudf._legacy.io import to_orc - - return to_orc(self, path, **kwargs) - - @derived_from(pd.DataFrame) - @_dask_cudf_performance_tracking - def var( - self, - axis=None, - skipna=True, - ddof=1, - split_every=False, - dtype=None, - out=None, - naive=False, - numeric_only=False, - ): - axis = self._validate_axis(axis) - meta = self._meta_nonempty.var( - axis=axis, skipna=skipna, numeric_only=numeric_only - ) - if axis == 1: - result = map_partitions( - M.var, - self, - meta=meta, - token=self._token_prefix + "var", - axis=axis, - skipna=skipna, - ddof=ddof, - numeric_only=numeric_only, - ) - return handle_out(out, result) - elif naive: - return _naive_var(self, meta, skipna, ddof, 
split_every, out) - else: - return _parallel_var(self, meta, skipna, split_every, out) - - @_deprecate_shuffle_kwarg - @_dask_cudf_performance_tracking - def shuffle(self, *args, shuffle_method=None, **kwargs): - """Wraps dask.dataframe DataFrame.shuffle method""" - return super().shuffle( - *args, shuffle_method=_get_shuffle_method(shuffle_method), **kwargs - ) - - @_dask_cudf_performance_tracking - def groupby(self, by=None, **kwargs): - from .groupby import CudfDataFrameGroupBy - - return CudfDataFrameGroupBy(self, by=by, **kwargs) - - -@_dask_cudf_performance_tracking -def sum_of_squares(x): - x = x.astype("f8")._column - outcol = libcudf.reduce.reduce("sum_of_squares", x) - return cudf.Series._from_column(outcol) - - -@_dask_cudf_performance_tracking -def var_aggregate(x2, x, n, ddof): - try: - with warnings.catch_warnings(record=True): - warnings.simplefilter("always") - result = (x2 / n) - (x / n) ** 2 - if ddof != 0: - result = result * n / (n - ddof) - return result - except ZeroDivisionError: - return np.float64(np.nan) - - -@_dask_cudf_performance_tracking -def nlargest_agg(x, **kwargs): - return cudf.concat(x).nlargest(**kwargs) - - -@_dask_cudf_performance_tracking -def nsmallest_agg(x, **kwargs): - return cudf.concat(x).nsmallest(**kwargs) - - -class Series(_Frame, dd.core.Series): - _partition_type = cudf.Series - - @_dask_cudf_performance_tracking - def count(self, split_every=False): - return reduction( - [self], - chunk=M.count, - aggregate=np.sum, - split_every=split_every, - meta="i8", - ) - - @_dask_cudf_performance_tracking - def mean(self, split_every=False): - sum = self.sum(split_every=split_every) - n = self.count(split_every=split_every) - return sum / n - - @derived_from(pd.DataFrame) - @_dask_cudf_performance_tracking - def var( - self, - axis=None, - skipna=True, - ddof=1, - split_every=False, - dtype=None, - out=None, - naive=False, - ): - axis = self._validate_axis(axis) - meta = self._meta_nonempty.var(axis=axis, skipna=skipna) - if axis == 1: - result = map_partitions( - M.var, - self, - meta=meta, - token=self._token_prefix + "var", - axis=axis, - skipna=skipna, - ddof=ddof, - ) - return handle_out(out, result) - elif naive: - return _naive_var(self, meta, skipna, ddof, split_every, out) - else: - return _parallel_var(self, meta, skipna, split_every, out) - - @_dask_cudf_performance_tracking - def groupby(self, *args, **kwargs): - from .groupby import CudfSeriesGroupBy - - return CudfSeriesGroupBy(self, *args, **kwargs) - - @property # type: ignore - @_dask_cudf_performance_tracking - def list(self): - return ListMethods(self) - - @property # type: ignore - @_dask_cudf_performance_tracking - def struct(self): - return StructMethods(self) - - -class Index(Series, dd.core.Index): - _partition_type = cudf.Index # type: ignore - - -@_dask_cudf_performance_tracking -def _naive_var(ddf, meta, skipna, ddof, split_every, out): - num = ddf._get_numeric_data() - x = 1.0 * num.sum(skipna=skipna, split_every=split_every) - x2 = 1.0 * (num**2).sum(skipna=skipna, split_every=split_every) - n = num.count(split_every=split_every) - name = ddf._token_prefix + "var" - result = map_partitions( - var_aggregate, x2, x, n, token=name, meta=meta, ddof=ddof - ) - if isinstance(ddf, DataFrame): - result.divisions = (min(ddf.columns), max(ddf.columns)) - return handle_out(out, result) - - -@_dask_cudf_performance_tracking -def _parallel_var(ddf, meta, skipna, split_every, out): - def _local_var(x, skipna): - if skipna: - n = x.count() - avg = x.mean(skipna=skipna) - else: - # Not 
skipping nulls, so might as well - # avoid the full `count` operation - n = len(x) - avg = x.sum(skipna=skipna) / n - m2 = ((x - avg) ** 2).sum(skipna=skipna) - return n, avg, m2 - - def _aggregate_var(parts): - n, avg, m2 = parts[0] - for i in range(1, len(parts)): - n_a, avg_a, m2_a = n, avg, m2 - n_b, avg_b, m2_b = parts[i] - n = n_a + n_b - avg = (n_a * avg_a + n_b * avg_b) / n - delta = avg_b - avg_a - m2 = m2_a + m2_b + delta**2 * n_a * n_b / n - return n, avg, m2 - - def _finalize_var(vals): - n, _, m2 = vals - return m2 / (n - 1) - - # Build graph - nparts = ddf.npartitions - if not split_every: - split_every = nparts - name = "var-" + tokenize(skipna, split_every, out) - local_name = "local-" + name - num = ddf._get_numeric_data() - dsk = { - (local_name, n, 0): (_local_var, (num._name, n), skipna) - for n in range(nparts) - } - - # Use reduction tree - widths = [nparts] - while nparts > 1: - nparts = math.ceil(nparts / split_every) - widths.append(nparts) - height = len(widths) - for depth in range(1, height): - for group in range(widths[depth]): - p_max = widths[depth - 1] - lstart = split_every * group - lstop = min(lstart + split_every, p_max) - node_list = [ - (local_name, p, depth - 1) for p in range(lstart, lstop) - ] - dsk[(local_name, group, depth)] = (_aggregate_var, node_list) - if height == 1: - group = depth = 0 - dsk[(name, 0)] = (_finalize_var, (local_name, group, depth)) - - graph = HighLevelGraph.from_collections(name, dsk, dependencies=[num, ddf]) - result = dd.core.new_dd_object(graph, name, meta, (None, None)) - if isinstance(ddf, DataFrame): - result.divisions = (min(ddf.columns), max(ddf.columns)) - return handle_out(out, result) - - -@_dask_cudf_performance_tracking -def _extract_meta(x): - """ - Extract internal cache data (``_meta``) from dask_cudf objects - """ - if isinstance(x, (Scalar, _Frame)): - return x._meta - elif isinstance(x, list): - return [_extract_meta(_x) for _x in x] - elif isinstance(x, tuple): - return tuple(_extract_meta(_x) for _x in x) - elif isinstance(x, dict): - return {k: _extract_meta(v) for k, v in x.items()} - return x - - -@_dask_cudf_performance_tracking -def _emulate(func, *args, **kwargs): - """ - Apply a function using args / kwargs. If arguments contain dd.DataFrame / - dd.Series, using internal cache (``_meta``) for calculation - """ - with raise_on_meta_error(funcname(func)): - return func(*_extract_meta(args), **_extract_meta(kwargs)) - - -@_dask_cudf_performance_tracking -def align_partitions(args): - """Align partitions between dask_cudf objects. - - Note that if all divisions are unknown, but have equal npartitions, then - they will be passed through unchanged. - """ - dfs = [df for df in args if isinstance(df, _Frame)] - if not dfs: - return args - - divisions = dfs[0].divisions - if not all(df.divisions == divisions for df in dfs): - raise NotImplementedError("Aligning mismatched partitions") - return args - - -@_dask_cudf_performance_tracking -def reduction( - args, - chunk=None, - aggregate=None, - combine=None, - meta=None, - token=None, - chunk_kwargs=None, - aggregate_kwargs=None, - combine_kwargs=None, - split_every=None, - **kwargs, -): - """Generic tree reduction operation. - - Parameters - ---------- - args : - Positional arguments for the `chunk` function. All `dask.dataframe` - objects should be partitioned and indexed equivalently. 
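An aside on the `_parallel_var` helper being removed above: it merges per-partition `(count, mean, M2)` triples with the pairwise update in `_aggregate_var` and finalizes with `M2 / (n - 1)`. A standalone NumPy sketch (my own illustration, not dask-cudf code) checking that combination rule:

```python
import numpy as np


def _combine(a, b):
    # Pairwise merge of (count, mean, M2), mirroring _aggregate_var in the
    # removed _parallel_var helper.
    n_a, avg_a, m2_a = a
    n_b, avg_b, m2_b = b
    n = n_a + n_b
    avg = (n_a * avg_a + n_b * avg_b) / n
    delta = avg_b - avg_a
    m2 = m2_a + m2_b + delta**2 * n_a * n_b / n
    return n, avg, m2


rng = np.random.default_rng(0)
x = rng.normal(size=10_000)

stats = None
for part in np.array_split(x, 8):
    local = (len(part), part.mean(), ((part - part.mean()) ** 2).sum())
    stats = local if stats is None else _combine(stats, local)

n, _, m2 = stats
assert np.isclose(m2 / (n - 1), x.var(ddof=1))  # matches _finalize_var
```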
- chunk : function [block-per-arg] -> block - Function to operate on each block of data - aggregate : function list-of-blocks -> block - Function to operate on the list of results of chunk - combine : function list-of-blocks -> block, optional - Function to operate on intermediate lists of results of chunk - in a tree-reduction. If not provided, defaults to aggregate. - $META - token : str, optional - The name to use for the output keys. - chunk_kwargs : dict, optional - Keywords for the chunk function only. - aggregate_kwargs : dict, optional - Keywords for the aggregate function only. - combine_kwargs : dict, optional - Keywords for the combine function only. - split_every : int, optional - Group partitions into groups of this size while performing a - tree-reduction. If set to False, no tree-reduction will be used, - and all intermediates will be concatenated and passed to ``aggregate``. - Default is 8. - kwargs : - All remaining keywords will be passed to ``chunk``, ``aggregate``, and - ``combine``. - """ - if chunk_kwargs is None: - chunk_kwargs = dict() - if aggregate_kwargs is None: - aggregate_kwargs = dict() - chunk_kwargs.update(kwargs) - aggregate_kwargs.update(kwargs) - - if combine is None: - if combine_kwargs: - raise ValueError("`combine_kwargs` provided with no `combine`") - combine = aggregate - combine_kwargs = aggregate_kwargs - else: - if combine_kwargs is None: - combine_kwargs = dict() - combine_kwargs.update(kwargs) - - if not isinstance(args, (tuple, list)): - args = [args] - - npartitions = {arg.npartitions for arg in args if isinstance(arg, _Frame)} - if len(npartitions) > 1: - raise ValueError("All arguments must have same number of partitions") - npartitions = npartitions.pop() - - if split_every is None: - split_every = 8 - elif split_every is False: - split_every = npartitions - elif split_every < 2 or not isinstance(split_every, int): - raise ValueError("split_every must be an integer >= 2") - - token_key = tokenize( - token or (chunk, aggregate), - meta, - args, - chunk_kwargs, - aggregate_kwargs, - combine_kwargs, - split_every, - ) - - # Chunk - a = f"{token or funcname(chunk)}-chunk-{token_key}" - if len(args) == 1 and isinstance(args[0], _Frame) and not chunk_kwargs: - dsk = { - (a, 0, i): (chunk, key) - for i, key in enumerate(args[0].__dask_keys__()) - } - else: - dsk = { - (a, 0, i): ( - apply, - chunk, - [(x._name, i) if isinstance(x, _Frame) else x for x in args], - chunk_kwargs, - ) - for i in range(args[0].npartitions) - } - - # Combine - b = f"{token or funcname(combine)}-combine-{token_key}" - k = npartitions - depth = 0 - while k > split_every: - for part_i, inds in enumerate(partition_all(split_every, range(k))): - conc = (list, [(a, depth, i) for i in inds]) - dsk[(b, depth + 1, part_i)] = ( - (apply, combine, [conc], combine_kwargs) - if combine_kwargs - else (combine, conc) - ) - k = part_i + 1 - a = b - depth += 1 - - # Aggregate - b = f"{token or funcname(aggregate)}-agg-{token_key}" - conc = (list, [(a, depth, i) for i in range(k)]) - if aggregate_kwargs: - dsk[(b, 0)] = (apply, aggregate, [conc], aggregate_kwargs) - else: - dsk[(b, 0)] = (aggregate, conc) - - if meta is None: - meta_chunk = _emulate(apply, chunk, args, chunk_kwargs) - meta = _emulate(apply, aggregate, [[meta_chunk]], aggregate_kwargs) - meta = dask_make_meta(meta) - - graph = HighLevelGraph.from_collections(b, dsk, dependencies=args) - return dd.core.new_dd_object(graph, b, meta, (None, None)) - - -for name in ( - "add", - "sub", - "mul", - "truediv", - "floordiv", - 
"mod", - "pow", - "radd", - "rsub", - "rmul", - "rtruediv", - "rfloordiv", - "rmod", - "rpow", -): - meth = getattr(cudf.DataFrame, name) - DataFrame._bind_operator_method(name, meth, original=cudf.Series) - - meth = getattr(cudf.Series, name) - Series._bind_operator_method(name, meth, original=cudf.Series) - -for name in ("lt", "gt", "le", "ge", "ne", "eq"): - meth = getattr(cudf.Series, name) - Series._bind_comparison_method(name, meth, original=cudf.Series) diff --git a/python/dask_cudf/dask_cudf/_legacy/groupby.py b/python/dask_cudf/dask_cudf/_legacy/groupby.py deleted file mode 100644 index 7e01e91476d..00000000000 --- a/python/dask_cudf/dask_cudf/_legacy/groupby.py +++ /dev/null @@ -1,909 +0,0 @@ -# Copyright (c) 2020-2024, NVIDIA CORPORATION. -from __future__ import annotations - -from functools import wraps - -import numpy as np -import pandas as pd - -from dask.dataframe.core import ( - DataFrame as DaskDataFrame, - aca, - split_out_on_cols, -) -from dask.dataframe.groupby import DataFrameGroupBy, SeriesGroupBy -from dask.utils import funcname - -import cudf -from cudf.core.groupby.groupby import _deprecate_collect -from cudf.utils.performance_tracking import _dask_cudf_performance_tracking - -from dask_cudf._legacy.sorting import _deprecate_shuffle_kwarg - -# aggregations that are dask-cudf optimized -OPTIMIZED_AGGS = ( - "count", - "mean", - "std", - "var", - "sum", - "min", - "max", - list, - "first", - "last", -) - - -def _check_groupby_optimized(func): - """ - Decorator for dask-cudf's groupby methods that returns the dask-cudf - optimized method if the groupby object is supported, otherwise - reverting to the upstream Dask method - """ - - @wraps(func) - def wrapper(*args, **kwargs): - gb = args[0] - if _groupby_optimized(gb): - return func(*args, **kwargs) - # note that we use upstream Dask's default kwargs for this call if - # none are specified; this shouldn't be an issue as those defaults are - # consistent with dask-cudf - return getattr(super(type(gb), gb), func.__name__)(*args[1:], **kwargs) - - return wrapper - - -class CudfDataFrameGroupBy(DataFrameGroupBy): - @_dask_cudf_performance_tracking - def __init__(self, *args, sort=None, **kwargs): - self.sep = kwargs.pop("sep", "___") - self.as_index = kwargs.pop("as_index", True) - super().__init__(*args, sort=sort, **kwargs) - - @_dask_cudf_performance_tracking - def __getitem__(self, key): - if isinstance(key, list): - g = CudfDataFrameGroupBy( - self.obj, - by=self.by, - slice=key, - sort=self.sort, - **self.dropna, - ) - else: - g = CudfSeriesGroupBy( - self.obj, - by=self.by, - slice=key, - sort=self.sort, - **self.dropna, - ) - - g._meta = g._meta[key] - return g - - @_dask_cudf_performance_tracking - def _make_groupby_method_aggs(self, agg_name): - """Create aggs dictionary for aggregation methods""" - - if isinstance(self.by, list): - return {c: agg_name for c in self.obj.columns if c not in self.by} - return {c: agg_name for c in self.obj.columns if c != self.by} - - @_dask_cudf_performance_tracking - @_check_groupby_optimized - def count(self, split_every=None, split_out=1): - return _make_groupby_agg_call( - self, - self._make_groupby_method_aggs("count"), - split_every, - split_out, - ) - - @_dask_cudf_performance_tracking - @_check_groupby_optimized - def mean(self, split_every=None, split_out=1): - return _make_groupby_agg_call( - self, - self._make_groupby_method_aggs("mean"), - split_every, - split_out, - ) - - @_dask_cudf_performance_tracking - @_check_groupby_optimized - def std(self, split_every=None, 
split_out=1): - return _make_groupby_agg_call( - self, - self._make_groupby_method_aggs("std"), - split_every, - split_out, - ) - - @_dask_cudf_performance_tracking - @_check_groupby_optimized - def var(self, split_every=None, split_out=1): - return _make_groupby_agg_call( - self, - self._make_groupby_method_aggs("var"), - split_every, - split_out, - ) - - @_dask_cudf_performance_tracking - @_check_groupby_optimized - def sum(self, split_every=None, split_out=1): - return _make_groupby_agg_call( - self, - self._make_groupby_method_aggs("sum"), - split_every, - split_out, - ) - - @_dask_cudf_performance_tracking - @_check_groupby_optimized - def min(self, split_every=None, split_out=1): - return _make_groupby_agg_call( - self, - self._make_groupby_method_aggs("min"), - split_every, - split_out, - ) - - @_dask_cudf_performance_tracking - @_check_groupby_optimized - def max(self, split_every=None, split_out=1): - return _make_groupby_agg_call( - self, - self._make_groupby_method_aggs("max"), - split_every, - split_out, - ) - - @_dask_cudf_performance_tracking - @_check_groupby_optimized - def collect(self, split_every=None, split_out=1): - _deprecate_collect() - return _make_groupby_agg_call( - self, - self._make_groupby_method_aggs(list), - split_every, - split_out, - ) - - @_dask_cudf_performance_tracking - @_check_groupby_optimized - def first(self, split_every=None, split_out=1): - return _make_groupby_agg_call( - self, - self._make_groupby_method_aggs("first"), - split_every, - split_out, - ) - - @_dask_cudf_performance_tracking - @_check_groupby_optimized - def last(self, split_every=None, split_out=1): - return _make_groupby_agg_call( - self, - self._make_groupby_method_aggs("last"), - split_every, - split_out, - ) - - @_deprecate_shuffle_kwarg - @_dask_cudf_performance_tracking - def aggregate( - self, arg, split_every=None, split_out=1, shuffle_method=None - ): - if arg == "size": - return self.size() - - arg = _redirect_aggs(arg) - - if _groupby_optimized(self) and _aggs_optimized(arg, OPTIMIZED_AGGS): - if isinstance(self._meta.grouping.keys, cudf.MultiIndex): - keys = self._meta.grouping.keys.names - else: - keys = self._meta.grouping.keys.name - - return groupby_agg( - self.obj, - keys, - arg, - split_every=split_every, - split_out=split_out, - sep=self.sep, - sort=self.sort, - as_index=self.as_index, - shuffle_method=shuffle_method, - **self.dropna, - ) - - return super().aggregate( - arg, - split_every=split_every, - split_out=split_out, - shuffle_method=shuffle_method, - ) - - -class CudfSeriesGroupBy(SeriesGroupBy): - @_dask_cudf_performance_tracking - def __init__(self, *args, sort=None, **kwargs): - self.sep = kwargs.pop("sep", "___") - self.as_index = kwargs.pop("as_index", True) - super().__init__(*args, sort=sort, **kwargs) - - @_dask_cudf_performance_tracking - @_check_groupby_optimized - def count(self, split_every=None, split_out=1): - return _make_groupby_agg_call( - self, - {self._slice: "count"}, - split_every, - split_out, - )[self._slice] - - @_dask_cudf_performance_tracking - @_check_groupby_optimized - def mean(self, split_every=None, split_out=1): - return _make_groupby_agg_call( - self, - {self._slice: "mean"}, - split_every, - split_out, - )[self._slice] - - @_dask_cudf_performance_tracking - @_check_groupby_optimized - def std(self, split_every=None, split_out=1): - return _make_groupby_agg_call( - self, - {self._slice: "std"}, - split_every, - split_out, - )[self._slice] - - @_dask_cudf_performance_tracking - @_check_groupby_optimized - def var(self, 
split_every=None, split_out=1): - return _make_groupby_agg_call( - self, - {self._slice: "var"}, - split_every, - split_out, - )[self._slice] - - @_dask_cudf_performance_tracking - @_check_groupby_optimized - def sum(self, split_every=None, split_out=1): - return _make_groupby_agg_call( - self, - {self._slice: "sum"}, - split_every, - split_out, - )[self._slice] - - @_dask_cudf_performance_tracking - @_check_groupby_optimized - def min(self, split_every=None, split_out=1): - return _make_groupby_agg_call( - self, - {self._slice: "min"}, - split_every, - split_out, - )[self._slice] - - @_dask_cudf_performance_tracking - @_check_groupby_optimized - def max(self, split_every=None, split_out=1): - return _make_groupby_agg_call( - self, - {self._slice: "max"}, - split_every, - split_out, - )[self._slice] - - @_dask_cudf_performance_tracking - @_check_groupby_optimized - def collect(self, split_every=None, split_out=1): - _deprecate_collect() - return _make_groupby_agg_call( - self, - {self._slice: list}, - split_every, - split_out, - )[self._slice] - - @_dask_cudf_performance_tracking - @_check_groupby_optimized - def first(self, split_every=None, split_out=1): - return _make_groupby_agg_call( - self, - {self._slice: "first"}, - split_every, - split_out, - )[self._slice] - - @_dask_cudf_performance_tracking - @_check_groupby_optimized - def last(self, split_every=None, split_out=1): - return _make_groupby_agg_call( - self, - {self._slice: "last"}, - split_every, - split_out, - )[self._slice] - - @_deprecate_shuffle_kwarg - @_dask_cudf_performance_tracking - def aggregate( - self, arg, split_every=None, split_out=1, shuffle_method=None - ): - if arg == "size": - return self.size() - - arg = _redirect_aggs(arg) - - if not isinstance(arg, dict): - arg = {self._slice: arg} - - if _groupby_optimized(self) and _aggs_optimized(arg, OPTIMIZED_AGGS): - return _make_groupby_agg_call( - self, arg, split_every, split_out, shuffle_method - )[self._slice] - - return super().aggregate( - arg, - split_every=split_every, - split_out=split_out, - shuffle_method=shuffle_method, - ) - - -def _shuffle_aggregate( - ddf, - gb_cols, - chunk, - chunk_kwargs, - aggregate, - aggregate_kwargs, - split_every, - split_out, - token=None, - sort=None, - shuffle_method=None, -): - # Shuffle-based groupby aggregation - # NOTE: This function is the dask_cudf version of - # dask.dataframe.groupby._shuffle_aggregate - - # Step 1 - Chunkwise groupby operation - chunk_name = f"{token or funcname(chunk)}-chunk" - chunked = ddf.map_partitions( - chunk, - meta=chunk(ddf._meta, **chunk_kwargs), - token=chunk_name, - **chunk_kwargs, - ) - - # Step 2 - Perform global sort or shuffle - shuffle_npartitions = max( - chunked.npartitions // split_every, - split_out, - ) - if sort and split_out > 1: - # Sort-based code path - result = ( - chunked.repartition(npartitions=shuffle_npartitions) - .sort_values( - gb_cols, - ignore_index=True, - shuffle_method=shuffle_method, - ) - .map_partitions( - aggregate, - meta=aggregate(chunked._meta, **aggregate_kwargs), - **aggregate_kwargs, - ) - ) - else: - # Hash-based code path - result = chunked.shuffle( - gb_cols, - npartitions=shuffle_npartitions, - ignore_index=True, - shuffle_method=shuffle_method, - ).map_partitions( - aggregate, - meta=aggregate(chunked._meta, **aggregate_kwargs), - **aggregate_kwargs, - ) - - # Step 3 - Repartition and return - if split_out < result.npartitions: - return result.repartition(npartitions=split_out) - return result - - -@_dask_cudf_performance_tracking -def 
groupby_agg( - ddf, - gb_cols, - aggs_in, - split_every=None, - split_out=None, - dropna=True, - sep="___", - sort=False, - as_index=True, - shuffle_method=None, -): - """Optimized groupby aggregation for Dask-CuDF. - - Parameters - ---------- - ddf : DataFrame - DataFrame object to perform grouping on. - gb_cols : str or list[str] - Column names to group by. - aggs_in : str, list, or dict - Aggregations to perform. - split_every : int (optional) - How to group intermediate aggregates. - dropna : bool - Drop grouping key values corresponding to NA values. - as_index : bool - Currently ignored. - sort : bool - Sort the group keys, better performance is obtained when - not sorting. - shuffle_method : str (optional) - Control how shuffling of the DataFrame is performed. - sep : str - Internal usage. - - - Notes - ----- - This "optimized" approach is more performant than the algorithm in - implemented in :meth:`DataFrame.apply` because it allows the cuDF - backend to perform multiple aggregations at once. - - This aggregation algorithm only supports the following options - - * "list" - * "count" - * "first" - * "last" - * "max" - * "mean" - * "min" - * "std" - * "sum" - * "var" - - - See Also - -------- - DataFrame.groupby : generic groupby of a DataFrame - dask.dataframe.apply_concat_apply : for more description of the - split_every argument. - - """ - # Assert that aggregations are supported - aggs = _redirect_aggs(aggs_in) - if not _aggs_optimized(aggs, OPTIMIZED_AGGS): - raise ValueError( - f"Supported aggs include {OPTIMIZED_AGGS} for groupby_agg API. " - f"Aggregations must be specified with dict or list syntax." - ) - - # If split_every is False, we use an all-to-one reduction - if split_every is False: - split_every = max(ddf.npartitions, 2) - - # Deal with default split_out and split_every params - split_every = split_every or 8 - split_out = split_out or 1 - - # Standardize `gb_cols`, `columns`, and `aggs` - if isinstance(gb_cols, str): - gb_cols = [gb_cols] - columns = [c for c in ddf.columns if c not in gb_cols] - if not isinstance(aggs, dict): - aggs = {col: aggs for col in columns} - - # Assert if our output will have a MultiIndex; this will be the case if - # any value in the `aggs` dict is not a string (i.e. multiple/named - # aggregations per column) - str_cols_out = True - aggs_renames = {} - for col in aggs: - if isinstance(aggs[col], str) or callable(aggs[col]): - aggs[col] = [aggs[col]] - elif isinstance(aggs[col], dict): - str_cols_out = False - col_aggs = [] - for k, v in aggs[col].items(): - aggs_renames[col, v] = k - col_aggs.append(v) - aggs[col] = col_aggs - else: - str_cols_out = False - if col in gb_cols: - columns.append(col) - - # Construct meta - _aggs = aggs.copy() - if str_cols_out: - # Metadata should use `str` for dict values if that is - # what the user originally specified (column names will - # be str, rather than tuples). 
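For context on the `groupby_agg` API documented above, a hypothetical usage sketch (the example data and column names are mine; running it requires a GPU with cuDF and dask-cudf installed). Dict/list aggregation syntax is what feeds the chunk/combine/finalize tasks:

```python
import cudf
import dask_cudf

# Hypothetical example data.
pdf = cudf.DataFrame({"k": [0, 0, 1, 1, 1], "x": [1.0, 2.0, 3.0, 4.0, 5.0]})
ddf = dask_cudf.from_cudf(pdf, npartitions=2)

# Dict/list syntax uses the optimized multi-aggregation code path
# ("mean"/"std" are decomposed into count/sum/sum-of-squares internally).
result = ddf.groupby("k").agg({"x": ["mean", "std", "sum"]}).compute()
print(result)
```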
- for col in aggs: - _aggs[col] = _aggs[col][0] - _meta = ddf._meta.groupby(gb_cols, as_index=as_index).agg(_aggs) - if aggs_renames: - col_array = [] - agg_array = [] - for col, agg in _meta.columns: - col_array.append(col) - agg_array.append(aggs_renames.get((col, agg), agg)) - _meta.columns = pd.MultiIndex.from_arrays([col_array, agg_array]) - - chunk = _groupby_partition_agg - chunk_kwargs = { - "gb_cols": gb_cols, - "aggs": aggs, - "columns": columns, - "dropna": dropna, - "sort": sort, - "sep": sep, - } - - combine = _tree_node_agg - combine_kwargs = { - "gb_cols": gb_cols, - "dropna": dropna, - "sort": sort, - "sep": sep, - } - - aggregate = _finalize_gb_agg - aggregate_kwargs = { - "gb_cols": gb_cols, - "aggs": aggs, - "columns": columns, - "final_columns": _meta.columns, - "as_index": as_index, - "dropna": dropna, - "sort": sort, - "sep": sep, - "str_cols_out": str_cols_out, - "aggs_renames": aggs_renames, - } - - # Use shuffle_method=True for split_out>1 - if sort and split_out > 1 and shuffle_method is None: - shuffle_method = "tasks" - - # Check if we are using the shuffle-based algorithm - if shuffle_method: - # Shuffle-based aggregation - return _shuffle_aggregate( - ddf, - gb_cols, - chunk, - chunk_kwargs, - aggregate, - aggregate_kwargs, - split_every, - split_out, - token="cudf-aggregate", - sort=sort, - shuffle_method=shuffle_method - if isinstance(shuffle_method, str) - else None, - ) - - # Deal with sort/shuffle defaults - if split_out > 1 and sort: - raise ValueError( - "dask-cudf's groupby algorithm does not yet support " - "`sort=True` when `split_out>1`, unless a shuffle-based " - "algorithm is used. Please use `split_out=1`, group " - "with `sort=False`, or set `shuffle_method=True`." - ) - - # Determine required columns to enable column projection - required_columns = list( - set(gb_cols).union(aggs.keys()).intersection(ddf.columns) - ) - - return aca( - [ddf[required_columns]], - chunk=chunk, - chunk_kwargs=chunk_kwargs, - combine=combine, - combine_kwargs=combine_kwargs, - aggregate=aggregate, - aggregate_kwargs=aggregate_kwargs, - token="cudf-aggregate", - split_every=split_every, - split_out=split_out, - split_out_setup=split_out_on_cols, - split_out_setup_kwargs={"cols": gb_cols}, - sort=sort, - ignore_index=True, - ) - - -@_dask_cudf_performance_tracking -def _make_groupby_agg_call( - gb, aggs, split_every, split_out, shuffle_method=None -): - """Helper method to consolidate the common `groupby_agg` call for all - aggregations in one place - """ - - return groupby_agg( - gb.obj, - gb.by, - aggs, - split_every=split_every, - split_out=split_out, - sep=gb.sep, - sort=gb.sort, - as_index=gb.as_index, - shuffle_method=shuffle_method, - **gb.dropna, - ) - - -@_dask_cudf_performance_tracking -def _redirect_aggs(arg): - """Redirect aggregations to their corresponding name in cuDF""" - redirects = { - sum: "sum", - max: "max", - min: "min", - "collect": list, - "list": list, - } - if isinstance(arg, dict): - new_arg = dict() - for col in arg: - if isinstance(arg[col], list): - new_arg[col] = [redirects.get(agg, agg) for agg in arg[col]] - elif isinstance(arg[col], dict): - new_arg[col] = { - k: redirects.get(v, v) for k, v in arg[col].items() - } - else: - new_arg[col] = redirects.get(arg[col], arg[col]) - return new_arg - if isinstance(arg, list): - return [redirects.get(agg, agg) for agg in arg] - return redirects.get(arg, arg) - - -@_dask_cudf_performance_tracking -def _aggs_optimized(arg, supported: set): - """Check that aggregations in `arg` are a subset of 
`supported`""" - if isinstance(arg, (list, dict)): - if isinstance(arg, dict): - _global_set: set[str] = set() - for col in arg: - if isinstance(arg[col], list): - _global_set = _global_set.union(set(arg[col])) - elif isinstance(arg[col], dict): - _global_set = _global_set.union(set(arg[col].values())) - else: - _global_set.add(arg[col]) - else: - _global_set = set(arg) - - return bool(_global_set.issubset(supported)) - elif isinstance(arg, (str, type)): - return arg in supported - return False - - -@_dask_cudf_performance_tracking -def _groupby_optimized(gb): - """Check that groupby input can use dask-cudf optimized codepath""" - return isinstance(gb.obj, DaskDataFrame) and ( - isinstance(gb.by, str) - or (isinstance(gb.by, list) and all(isinstance(x, str) for x in gb.by)) - ) - - -def _make_name(col_name, sep="_"): - """Combine elements of `col_name` into a single string, or no-op if - `col_name` is already a string - """ - if isinstance(col_name, str): - return col_name - return sep.join(name for name in col_name if name != "") - - -@_dask_cudf_performance_tracking -def _groupby_partition_agg(df, gb_cols, aggs, columns, dropna, sort, sep): - """Initial partition-level aggregation task. - - This is the first operation to be executed on each input - partition in `groupby_agg`. Depending on `aggs`, four possible - groupby aggregations ("count", "sum", "min", and "max") are - performed. The result is then partitioned (by hashing `gb_cols`) - into a number of distinct dictionary elements. The number of - elements in the output dictionary (`split_out`) corresponds to - the number of partitions in the final output of `groupby_agg`. - """ - - # Modify dict for initial (partition-wise) aggregations - _agg_dict = {} - for col, agg_list in aggs.items(): - _agg_dict[col] = set() - for agg in agg_list: - if agg in ("mean", "std", "var"): - _agg_dict[col].add("count") - _agg_dict[col].add("sum") - else: - _agg_dict[col].add(agg) - _agg_dict[col] = list(_agg_dict[col]) - if set(agg_list).intersection({"std", "var"}): - pow2_name = _make_name((col, "pow2"), sep=sep) - df[pow2_name] = df[col].astype("float64").pow(2) - _agg_dict[pow2_name] = ["sum"] - - gb = df.groupby(gb_cols, dropna=dropna, as_index=False, sort=sort).agg( - _agg_dict - ) - output_columns = [_make_name(name, sep=sep) for name in gb.columns] - gb.columns = output_columns - # Return with deterministic column ordering - return gb[sorted(output_columns)] - - -@_dask_cudf_performance_tracking -def _tree_node_agg(df, gb_cols, dropna, sort, sep): - """Node in groupby-aggregation reduction tree. - - The input DataFrame (`df`) corresponds to the - concatenated output of one or more `_groupby_partition_agg` - tasks. In this function, "sum", "min" and/or "max" groupby - aggregations will be used to combine the statistics for - duplicate keys. 
- """ - - agg_dict = {} - for col in df.columns: - if col in gb_cols: - continue - agg = col.split(sep)[-1] - if agg in ("count", "sum"): - agg_dict[col] = ["sum"] - elif agg == "list": - agg_dict[col] = [list] - elif agg in OPTIMIZED_AGGS: - agg_dict[col] = [agg] - else: - raise ValueError(f"Unexpected aggregation: {agg}") - - gb = df.groupby(gb_cols, dropna=dropna, as_index=False, sort=sort).agg( - agg_dict - ) - - # Don't include the last aggregation in the column names - output_columns = [ - _make_name(name[:-1] if isinstance(name, tuple) else name, sep=sep) - for name in gb.columns - ] - gb.columns = output_columns - # Return with deterministic column ordering - return gb[sorted(output_columns)] - - -@_dask_cudf_performance_tracking -def _var_agg(df, col, count_name, sum_name, pow2_sum_name, ddof=1): - """Calculate variance (given count, sum, and sum-squared columns).""" - - # Select count, sum, and sum-squared - n = df[count_name] - x = df[sum_name] - x2 = df[pow2_sum_name] - - # Use sum-squared approach to get variance - var = x2 - x**2 / n - div = n - ddof - div[div < 1] = 1 # Avoid division by 0 - var /= div - - # Set appropriate NaN elements - # (since we avoided 0-division) - var[(n - ddof) == 0] = np.nan - - return var - - -@_dask_cudf_performance_tracking -def _finalize_gb_agg( - gb_in, - gb_cols, - aggs, - columns, - final_columns, - as_index, - dropna, - sort, - sep, - str_cols_out, - aggs_renames, -): - """Final aggregation task. - - This is the final operation on each output partitions - of the `groupby_agg` algorithm. This function must - take care of higher-order aggregations, like "mean", - "std" and "var". We also need to deal with the column - index, the row index, and final sorting behavior. - """ - - gb = _tree_node_agg(gb_in, gb_cols, dropna, sort, sep) - - # Deal with higher-order aggregations - for col in columns: - agg_list = aggs.get(col, []) - agg_set = set(agg_list) - if agg_set.intersection({"mean", "std", "var"}): - count_name = _make_name((col, "count"), sep=sep) - sum_name = _make_name((col, "sum"), sep=sep) - if agg_set.intersection({"std", "var"}): - pow2_sum_name = _make_name((col, "pow2", "sum"), sep=sep) - var = _var_agg(gb, col, count_name, sum_name, pow2_sum_name) - if "var" in agg_list: - name_var = _make_name((col, "var"), sep=sep) - gb[name_var] = var - if "std" in agg_list: - name_std = _make_name((col, "std"), sep=sep) - gb[name_std] = np.sqrt(var) - gb.drop(columns=[pow2_sum_name], inplace=True) - if "mean" in agg_list: - mean_name = _make_name((col, "mean"), sep=sep) - gb[mean_name] = gb[sum_name] / gb[count_name] - if "sum" not in agg_list: - gb.drop(columns=[sum_name], inplace=True) - if "count" not in agg_list: - gb.drop(columns=[count_name], inplace=True) - if list in agg_list: - collect_name = _make_name((col, "list"), sep=sep) - gb[collect_name] = gb[collect_name].list.concat() - - # Ensure sorted keys if `sort=True` - if sort: - gb = gb.sort_values(gb_cols) - - # Set index if necessary - if as_index: - gb.set_index(gb_cols, inplace=True) - - # Unflatten column names - col_array = [] - agg_array = [] - for col in gb.columns: - if col in gb_cols: - col_array.append(col) - agg_array.append("") - else: - name, agg = col.split(sep) - col_array.append(name) - agg_array.append(aggs_renames.get((name, agg), agg)) - if str_cols_out: - gb.columns = col_array - else: - gb.columns = pd.MultiIndex.from_arrays([col_array, agg_array]) - - return gb[final_columns] diff --git a/python/dask_cudf/dask_cudf/_legacy/io/__init__.py 
b/python/dask_cudf/dask_cudf/_legacy/io/__init__.py index 0421bd755f4..47f4edc970d 100644 --- a/python/dask_cudf/dask_cudf/_legacy/io/__init__.py +++ b/python/dask_cudf/dask_cudf/_legacy/io/__init__.py @@ -1,11 +1 @@ # Copyright (c) 2018-2024, NVIDIA CORPORATION. - -from .csv import read_csv # noqa: F401 -from .json import read_json # noqa: F401 -from .orc import read_orc, to_orc # noqa: F401 -from .text import read_text # noqa: F401 - -try: - from .parquet import read_parquet, to_parquet # noqa: F401 -except ImportError: - pass diff --git a/python/dask_cudf/dask_cudf/_legacy/io/csv.py b/python/dask_cudf/dask_cudf/_legacy/io/csv.py deleted file mode 100644 index fa5400344f9..00000000000 --- a/python/dask_cudf/dask_cudf/_legacy/io/csv.py +++ /dev/null @@ -1,222 +0,0 @@ -# Copyright (c) 2020-2023, NVIDIA CORPORATION. - -import os -from glob import glob -from warnings import warn - -from fsspec.utils import infer_compression - -from dask import dataframe as dd -from dask.base import tokenize -from dask.dataframe.io.csv import make_reader -from dask.utils import apply, parse_bytes - -import cudf - - -def read_csv(path, blocksize="default", **kwargs): - """ - Read CSV files into a :class:`.DataFrame`. - - This API parallelizes the :func:`cudf:cudf.read_csv` function in - the following ways: - - It supports loading many files at once using globstrings: - - >>> import dask_cudf - >>> df = dask_cudf.read_csv("myfiles.*.csv") - - In some cases it can break up large files: - - >>> df = dask_cudf.read_csv("largefile.csv", blocksize="256 MiB") - - It can read CSV files from external resources (e.g. S3, HTTP, FTP) - - >>> df = dask_cudf.read_csv("s3://bucket/myfiles.*.csv") - >>> df = dask_cudf.read_csv("https://www.mycloud.com/sample.csv") - - Internally ``read_csv`` uses :func:`cudf:cudf.read_csv` and - supports many of the same keyword arguments with the same - performance guarantees. See the docstring for - :func:`cudf:cudf.read_csv` for more information on available - keyword arguments. - - Parameters - ---------- - path : str, path object, or file-like object - Either a path to a file (a str, :py:class:`pathlib.Path`, or - py._path.local.LocalPath), URL (including http, ftp, and S3 - locations), or any object with a read() method (such as - builtin :py:func:`open` file handler function or - :py:class:`~io.StringIO`). - blocksize : int or str, default "256 MiB" - The target task partition size. If ``None``, a single block - is used for each file. - **kwargs : dict - Passthrough key-word arguments that are sent to - :func:`cudf:cudf.read_csv`. - - Notes - ----- - If any of `skipfooter`/`skiprows`/`nrows` are passed, - `blocksize` will default to None. - - Examples - -------- - >>> import dask_cudf - >>> ddf = dask_cudf.read_csv("sample.csv", usecols=["a", "b"]) - >>> ddf.compute() - a b - 0 1 hi - 1 2 hello - 2 3 ai - - """ - - # Handle `chunksize` deprecation - if "chunksize" in kwargs: - chunksize = kwargs.pop("chunksize", "default") - warn( - "`chunksize` is deprecated and will be removed in the future. " - "Please use `blocksize` instead.", - FutureWarning, - ) - if blocksize == "default": - blocksize = chunksize - - # Set default `blocksize` - if blocksize == "default": - if ( - kwargs.get("skipfooter", 0) != 0 - or kwargs.get("skiprows", 0) != 0 - or kwargs.get("nrows", None) is not None - ): - # Cannot read in blocks if skipfooter, - # skiprows or nrows is passed. 
- blocksize = None - else: - blocksize = "256 MiB" - - if "://" in str(path): - func = make_reader(cudf.read_csv, "read_csv", "CSV") - return func(path, blocksize=blocksize, **kwargs) - else: - return _internal_read_csv(path=path, blocksize=blocksize, **kwargs) - - -def _internal_read_csv(path, blocksize="256 MiB", **kwargs): - if isinstance(blocksize, str): - blocksize = parse_bytes(blocksize) - - if isinstance(path, list): - filenames = path - elif isinstance(path, str): - filenames = sorted(glob(path)) - elif hasattr(path, "__fspath__"): - filenames = sorted(glob(path.__fspath__())) - else: - raise TypeError(f"Path type not understood:{type(path)}") - - if not filenames: - msg = f"A file in: {filenames} does not exist." - raise FileNotFoundError(msg) - - name = "read-csv-" + tokenize( - path, tokenize, **kwargs - ) # TODO: get last modified time - - compression = kwargs.get("compression", "infer") - - if compression == "infer": - # Infer compression from first path by default - compression = infer_compression(filenames[0]) - - if compression and blocksize: - # compressed CSVs reading must read the entire file - kwargs.pop("byte_range", None) - warn( - "Warning %s compression does not support breaking apart files\n" - "Please ensure that each individual file can fit in memory and\n" - "use the keyword ``blocksize=None to remove this message``\n" - "Setting ``blocksize=(size of file)``" % compression - ) - blocksize = None - - if blocksize is None: - return read_csv_without_blocksize(path, **kwargs) - - # Let dask.dataframe generate meta - dask_reader = make_reader(cudf.read_csv, "read_csv", "CSV") - kwargs1 = kwargs.copy() - usecols = kwargs1.pop("usecols", None) - dtype = kwargs1.pop("dtype", None) - meta = dask_reader(filenames[0], **kwargs1)._meta - names = meta.columns - if usecols or dtype: - # Regenerate meta with original kwargs if - # `usecols` or `dtype` was specified - meta = dask_reader(filenames[0], **kwargs)._meta - - dsk = {} - i = 0 - dtypes = meta.dtypes.values - - for fn in filenames: - size = os.path.getsize(fn) - for start in range(0, size, blocksize): - kwargs2 = kwargs.copy() - kwargs2["byte_range"] = ( - start, - blocksize, - ) # specify which chunk of the file we care about - if start != 0: - kwargs2["names"] = names # no header in the middle of the file - kwargs2["header"] = None - dsk[(name, i)] = (apply, _read_csv, [fn, dtypes], kwargs2) - - i += 1 - - divisions = [None] * (len(dsk) + 1) - return dd.core.new_dd_object(dsk, name, meta, divisions) - - -def _read_csv(fn, dtypes=None, **kwargs): - return cudf.read_csv(fn, **kwargs) - - -def read_csv_without_blocksize(path, **kwargs): - """Read entire CSV with optional compression (gzip/zip) - - Parameters - ---------- - path : str - path to files (support for glob) - """ - if isinstance(path, list): - filenames = path - elif isinstance(path, str): - filenames = sorted(glob(path)) - elif hasattr(path, "__fspath__"): - filenames = sorted(glob(path.__fspath__())) - else: - raise TypeError(f"Path type not understood:{type(path)}") - - name = "read-csv-" + tokenize(path, **kwargs) - - meta_kwargs = kwargs.copy() - if "skipfooter" in meta_kwargs: - meta_kwargs.pop("skipfooter") - if "nrows" in meta_kwargs: - meta_kwargs.pop("nrows") - # Read "head" of first file (first 5 rows). - # Convert to empty df for metadata. 
- meta = cudf.read_csv(filenames[0], nrows=5, **meta_kwargs).iloc[:0] - - graph = { - (name, i): (apply, cudf.read_csv, [fn], kwargs) - for i, fn in enumerate(filenames) - } - - divisions = [None] * (len(filenames) + 1) - - return dd.core.new_dd_object(graph, name, meta, divisions) diff --git a/python/dask_cudf/dask_cudf/_legacy/io/json.py b/python/dask_cudf/dask_cudf/_legacy/io/json.py deleted file mode 100644 index 98c5ceedb76..00000000000 --- a/python/dask_cudf/dask_cudf/_legacy/io/json.py +++ /dev/null @@ -1,209 +0,0 @@ -# Copyright (c) 2019-2024, NVIDIA CORPORATION. - -from functools import partial - -import numpy as np -from fsspec.core import get_compression, get_fs_token_paths - -import dask -from dask.utils import parse_bytes - -import cudf -from cudf.core.column import as_column -from cudf.utils.ioutils import _is_local_filesystem - -from dask_cudf.backends import _default_backend - - -def _read_json_partition( - paths, - fs=None, - include_path_column=False, - path_converter=None, - **kwargs, -): - # Transfer all data up front for remote storage - sources = ( - paths - if fs is None - else fs.cat_ranges( - paths, - [0] * len(paths), - fs.sizes(paths), - ) - ) - - if include_path_column: - # Add "path" column. - # Must iterate over sources sequentially - if not isinstance(include_path_column, str): - include_path_column = "path" - converted_paths = ( - paths - if path_converter is None - else [path_converter(path) for path in paths] - ) - dfs = [] - for i, source in enumerate(sources): - df = cudf.read_json(source, **kwargs) - df[include_path_column] = as_column( - converted_paths[i], length=len(df) - ) - dfs.append(df) - return cudf.concat(dfs) - else: - # Pass sources directly to cudf - return cudf.read_json(sources, **kwargs) - - -def read_json( - url_path, - engine="auto", - blocksize=None, - orient="records", - lines=None, - compression="infer", - aggregate_files=True, - **kwargs, -): - """Read JSON data into a :class:`.DataFrame`. - - This function wraps :func:`dask.dataframe.read_json`, and passes - ``engine=partial(cudf.read_json, engine="auto")`` by default. - - Parameters - ---------- - url_path : str, list of str - Location to read from. If a string, can include a glob character to - find a set of file names. - Supports protocol specifications such as ``"s3://"``. - engine : str or Callable, default "auto" - - If str, this value will be used as the ``engine`` argument - when :func:`cudf.read_json` is used to create each partition. - If a :obj:`~collections.abc.Callable`, this value will be used as the - underlying function used to create each partition from JSON - data. The default value is "auto", so that - ``engine=partial(cudf.read_json, engine="auto")`` will be - passed to :func:`dask.dataframe.read_json` by default. - aggregate_files : bool or int - Whether to map multiple files to each output partition. If True, - the `blocksize` argument will be used to determine the number of - files in each partition. If any one file is larger than `blocksize`, - the `aggregate_files` argument will be ignored. If an integer value - is specified, the `blocksize` argument will be ignored, and that - number of files will be mapped to each partition. Default is True. - **kwargs : - Key-word arguments to pass through to :func:`dask.dataframe.read_json`. 
- - Returns - ------- - :class:`.DataFrame` - - Examples - -------- - Load single file - - >>> from dask_cudf import read_json - >>> read_json('myfile.json') # doctest: +SKIP - - Load large line-delimited JSON files using partitions of approx - 256MB size - - >>> read_json('data/file*.csv', blocksize=2**28) # doctest: +SKIP - - Load nested JSON data - - >>> read_json('myfile.json') # doctest: +SKIP - - See Also - -------- - dask.dataframe.read_json - - """ - - if lines is None: - lines = orient == "records" - if orient != "records" and lines: - raise ValueError( - 'Line-delimited JSON is only available with orient="records".' - ) - if blocksize and (orient != "records" or not lines): - raise ValueError( - "JSON file chunking only allowed for JSON-lines" - "input (orient='records', lines=True)." - ) - - inputs = [] - if aggregate_files and blocksize or int(aggregate_files) > 1: - # Attempt custom read if we are mapping multiple files - # to each output partition. Otherwise, upstream logic - # is sufficient. - - storage_options = kwargs.get("storage_options", {}) - fs, _, paths = get_fs_token_paths( - url_path, mode="rb", storage_options=storage_options - ) - if isinstance(aggregate_files, int) and aggregate_files > 1: - # Map a static file count to each partition - inputs = [ - paths[offset : offset + aggregate_files] - for offset in range(0, len(paths), aggregate_files) - ] - elif aggregate_files is True and blocksize: - # Map files dynamically (using blocksize) - file_sizes = fs.sizes(paths) # NOTE: This can be slow - blocksize = parse_bytes(blocksize) - if all([file_size <= blocksize for file_size in file_sizes]): - counts = np.unique( - np.floor(np.cumsum(file_sizes) / blocksize), - return_counts=True, - )[1] - offsets = np.concatenate([[0], counts.cumsum()]) - inputs = [ - paths[offsets[i] : offsets[i + 1]] - for i in range(len(offsets) - 1) - ] - - if inputs: - # Inputs were successfully populated. - # Use custom _read_json_partition function - # to generate each partition. - - compression = get_compression( - url_path[0] if isinstance(url_path, list) else url_path, - compression, - ) - _kwargs = dict( - orient=orient, - lines=lines, - compression=compression, - include_path_column=kwargs.get("include_path_column", False), - path_converter=kwargs.get("path_converter"), - ) - if not _is_local_filesystem(fs): - _kwargs["fs"] = fs - # TODO: Generate meta more efficiently - meta = _read_json_partition(inputs[0][:1], **_kwargs) - return dask.dataframe.from_map( - _read_json_partition, - inputs, - meta=meta, - **_kwargs, - ) - - # Fall back to dask.dataframe.read_json - return _default_backend( - dask.dataframe.read_json, - url_path, - engine=( - partial(cudf.read_json, engine=engine) - if isinstance(engine, str) - else engine - ), - blocksize=blocksize, - orient=orient, - lines=lines, - compression=compression, - **kwargs, - ) diff --git a/python/dask_cudf/dask_cudf/_legacy/io/orc.py b/python/dask_cudf/dask_cudf/_legacy/io/orc.py deleted file mode 100644 index fcf684fd6c8..00000000000 --- a/python/dask_cudf/dask_cudf/_legacy/io/orc.py +++ /dev/null @@ -1,195 +0,0 @@ -# Copyright (c) 2020-2024, NVIDIA CORPORATION. 
- -from io import BufferedWriter, IOBase - -from fsspec.core import get_fs_token_paths -from fsspec.utils import stringify_path -from pyarrow import orc as orc - -from dask import dataframe as dd -from dask.dataframe.io.utils import _get_pyarrow_dtypes - -import cudf - - -def _read_orc_stripe(source, fs, columns=None, kwargs=None): - """Pull out specific columns from specific stripe""" - path, stripe = source - if kwargs is None: - kwargs = {} - with fs.open(path, "rb") as f: - df_stripe = cudf.read_orc( - f, stripes=[stripe], columns=columns, **kwargs - ) - return df_stripe - - -def read_orc(path, columns=None, filters=None, storage_options=None, **kwargs): - """Read ORC files into a :class:`.DataFrame`. - - Note that this function is mostly borrowed from upstream Dask. - - Parameters - ---------- - path : str or list[str] - Location of file(s), which can be a full URL with protocol specifier, - and may include glob character if a single string. - columns : None or list[str] - Columns to load. If None, loads all. - filters : None or list of tuple or list of lists of tuples - If not None, specifies a filter predicate used to filter out - row groups using statistics stored for each row group as - Parquet metadata. Row groups that do not match the given - filter predicate are not read. The predicate is expressed in - `disjunctive normal form (DNF) - `__ - like ``[[('x', '=', 0), ...], ...]``. DNF allows arbitrary - boolean logical combinations of single column predicates. The - innermost tuples each describe a single column predicate. The - list of inner predicates is interpreted as a conjunction - (AND), forming a more selective and multiple column predicate. - Finally, the outermost list combines these filters as a - disjunction (OR). Predicates may also be passed as a list of - tuples. This form is interpreted as a single conjunction. To - express OR in predicates, one must use the (preferred) - notation of list of lists of tuples. - storage_options : None or dict - Further parameters to pass to the bytes backend. 
- - See Also - -------- - dask.dataframe.read_orc - - Returns - ------- - dask_cudf.DataFrame - - """ - - storage_options = storage_options or {} - fs, _, paths = get_fs_token_paths( - path, mode="rb", storage_options=storage_options - ) - schema = None - nstripes_per_file = [] - for path in paths: - with fs.open(path, "rb") as f: - o = orc.ORCFile(f) - if schema is None: - schema = o.schema - elif schema != o.schema: - raise ValueError( - "Incompatible schemas while parsing ORC files" - ) - nstripes_per_file.append(o.nstripes) - schema = _get_pyarrow_dtypes(schema, categories=None) - if columns is not None: - ex = set(columns) - set(schema) - if ex: - raise ValueError( - f"Requested columns ({ex}) not in schema ({set(schema)})" - ) - else: - columns = list(schema) - - with fs.open(paths[0], "rb") as f: - meta = cudf.read_orc( - f, - stripes=[0] if nstripes_per_file[0] else None, - columns=columns, - **kwargs, - ) - - sources = [] - for path, n in zip(paths, nstripes_per_file): - for stripe in ( - range(n) - if filters is None - else cudf.io.orc._filter_stripes(filters, path) - ): - sources.append((path, stripe)) - - return dd.from_map( - _read_orc_stripe, - sources, - args=[fs], - columns=columns, - kwargs=kwargs, - meta=meta, - ) - - -def write_orc_partition(df, path, fs, filename, compression="snappy"): - full_path = fs.sep.join([path, filename]) - with fs.open(full_path, mode="wb") as out_file: - if not isinstance(out_file, IOBase): - out_file = BufferedWriter(out_file) - cudf.io.to_orc(df, out_file, compression=compression) - return full_path - - -def to_orc( - df, - path, - write_index=True, - storage_options=None, - compression="snappy", - compute=True, - **kwargs, -): - """ - Write a :class:`.DataFrame` to ORC file(s) (one file per partition). - - Parameters - ---------- - df : DataFrame - path : str or pathlib.Path - Destination directory for data. Prepend with protocol like ``s3://`` - or ``hdfs://`` for remote data. - write_index : boolean, optional - Whether or not to write the index. Defaults to True. - storage_options : None or dict - Further parameters to pass to the bytes backend. - compression : string or dict, optional - compute : bool, optional - If True (default) then the result is computed immediately. If - False then a :class:`~dask.delayed.Delayed` object is returned - for future computation. 
- - """ - - from dask import compute as dask_compute, delayed - - # TODO: Use upstream dask implementation once available - # (see: Dask Issue#5596) - - if hasattr(path, "name"): - path = stringify_path(path) - fs, _, _ = get_fs_token_paths( - path, mode="wb", storage_options=storage_options - ) - # Trim any protocol information from the path before forwarding - path = fs._strip_protocol(path) - - if write_index: - df = df.reset_index() - else: - # Not writing index - might as well drop it - df = df.reset_index(drop=True) - - fs.mkdirs(path, exist_ok=True) - - # Use i_offset and df.npartitions to define file-name list - filenames = ["part.%i.orc" % i for i in range(df.npartitions)] - - # write parts - dwrite = delayed(write_orc_partition) - parts = [ - dwrite(d, path, fs, filename, compression=compression) - for d, filename in zip(df.to_delayed(), filenames) - ] - - if compute: - return dask_compute(*parts) - - return delayed(list)(parts) diff --git a/python/dask_cudf/dask_cudf/_legacy/io/parquet.py b/python/dask_cudf/dask_cudf/_legacy/io/parquet.py index c0638e4a1c3..ed08ad485d7 100644 --- a/python/dask_cudf/dask_cudf/_legacy/io/parquet.py +++ b/python/dask_cudf/dask_cudf/_legacy/io/parquet.py @@ -8,7 +8,6 @@ import pandas as pd from pyarrow import dataset as pa_ds, parquet as pq -from dask import dataframe as dd from dask.dataframe.io.parquet.arrow import ArrowDatasetEngine try: @@ -448,66 +447,6 @@ def set_object_dtypes_from_pa_schema(df, schema): df._data[col_name] = col.astype(typ) -def read_parquet(path, columns=None, **kwargs): - """ - Read parquet files into a :class:`.DataFrame`. - - Calls :func:`dask.dataframe.read_parquet` with ``engine=CudfEngine`` - to coordinate the execution of :func:`cudf.read_parquet`, and to - ultimately create a :class:`.DataFrame` collection. - - See the :func:`dask.dataframe.read_parquet` documentation for - all available options. - - Examples - -------- - >>> from dask_cudf import read_parquet - >>> df = read_parquet("/path/to/dataset/") # doctest: +SKIP - - When dealing with one or more large parquet files having an - in-memory footprint >15% device memory, the ``split_row_groups`` - argument should be used to map Parquet **row-groups** to DataFrame - partitions (instead of **files** to partitions). For example, the - following code will map each row-group to a distinct partition: - - >>> df = read_parquet(..., split_row_groups=True) # doctest: +SKIP - - To map **multiple** row-groups to each partition, an integer can be - passed to ``split_row_groups`` to specify the **maximum** number of - row-groups allowed in each output partition: - - >>> df = read_parquet(..., split_row_groups=10) # doctest: +SKIP - - See Also - -------- - cudf.read_parquet - dask.dataframe.read_parquet - """ - if isinstance(columns, str): - columns = [columns] - - # Set "check_file_size" option to determine whether we - # should check the parquet-file size. This check is meant - # to "protect" users from `split_row_groups` default changes - check_file_size = kwargs.pop("check_file_size", 500_000_000) - if ( - check_file_size - and ("split_row_groups" not in kwargs) - and ("chunksize" not in kwargs) - ): - # User is not specifying `split_row_groups` or `chunksize`, - # so we should warn them if/when a file is ~>0.5GB on disk. 
- # They can set `split_row_groups` explicitly to silence/skip - # this check - if "read" not in kwargs: - kwargs["read"] = {} - kwargs["read"]["check_file_size"] = check_file_size - - return dd.read_parquet(path, columns=columns, engine=CudfEngine, **kwargs) - - -to_parquet = partial(dd.to_parquet, engine=CudfEngine) - if create_metadata_file_dd is None: create_metadata_file = create_metadata_file_dd else: diff --git a/python/dask_cudf/dask_cudf/_legacy/io/text.py b/python/dask_cudf/dask_cudf/_legacy/io/text.py deleted file mode 100644 index 3757c85c80c..00000000000 --- a/python/dask_cudf/dask_cudf/_legacy/io/text.py +++ /dev/null @@ -1,56 +0,0 @@ -# Copyright (c) 2022-2024, NVIDIA CORPORATION. - -import os -from glob import glob - -import dask.dataframe as dd -from dask.utils import parse_bytes - -import cudf - - -def _read_text(source, **kwargs): - # Wrapper for cudf.read_text operation - fn, byte_range = source - return cudf.read_text(fn, byte_range=byte_range, **kwargs) - - -def read_text(path, chunksize="256 MiB", byte_range=None, **kwargs): - if isinstance(chunksize, str): - chunksize = parse_bytes(chunksize) - - if isinstance(path, list): - filenames = path - elif isinstance(path, str): - filenames = sorted(glob(path)) - elif hasattr(path, "__fspath__"): - filenames = sorted(glob(path.__fspath__())) - else: - raise TypeError(f"Path type not understood:{type(path)}") - - if not filenames: - msg = f"A file in: {filenames} does not exist." - raise FileNotFoundError(msg) - - if chunksize and byte_range: - raise ValueError("Cannot specify both chunksize and byte_range.") - - if chunksize: - sources = [] - for fn in filenames: - size = os.path.getsize(fn) - for start in range(0, size, chunksize): - byte_range = ( - start, - chunksize, - ) # specify which chunk of the file we care about - sources.append((fn, byte_range)) - else: - sources = [(fn, byte_range) for fn in filenames] - - return dd.from_map( - _read_text, - sources, - meta=cudf.Series([], dtype="O"), - **kwargs, - ) diff --git a/python/dask_cudf/dask_cudf/_legacy/sorting.py b/python/dask_cudf/dask_cudf/_legacy/sorting.py deleted file mode 100644 index a2ba4d1878e..00000000000 --- a/python/dask_cudf/dask_cudf/_legacy/sorting.py +++ /dev/null @@ -1,361 +0,0 @@ -# Copyright (c) 2020-2024, NVIDIA CORPORATION. - -import warnings -from collections.abc import Iterator -from functools import wraps - -import cupy -import numpy as np -import tlz as toolz - -from dask import config -from dask.base import tokenize -from dask.dataframe import methods -from dask.dataframe.core import DataFrame, Index, Series -from dask.dataframe.shuffle import rearrange_by_column -from dask.highlevelgraph import HighLevelGraph -from dask.utils import M - -import cudf -from cudf.api.types import _is_categorical_dtype -from cudf.utils.performance_tracking import _dask_cudf_performance_tracking - -_SHUFFLE_SUPPORT = ("tasks", "p2p") # "disk" not supported - - -def _deprecate_shuffle_kwarg(func): - @wraps(func) - def wrapper(*args, **kwargs): - old_arg_value = kwargs.pop("shuffle", None) - - if old_arg_value is not None: - new_arg_value = old_arg_value - msg = ( - "the 'shuffle' keyword is deprecated, " - "use 'shuffle_method' instead." - ) - - warnings.warn(msg, FutureWarning) - if kwargs.get("shuffle_method") is not None: - msg = ( - "Can only specify 'shuffle' " - "or 'shuffle_method', not both." 
- ) - raise TypeError(msg) - kwargs["shuffle_method"] = new_arg_value - return func(*args, **kwargs) - - return wrapper - - -@_dask_cudf_performance_tracking -def set_index_post(df, index_name, drop, column_dtype): - df2 = df.set_index(index_name, drop=drop) - df2.columns = df2.columns.astype(column_dtype) - return df2 - - -@_dask_cudf_performance_tracking -def _set_partitions_pre(s, divisions, ascending=True, na_position="last"): - if ascending: - partitions = divisions.searchsorted(s, side="right") - 1 - else: - partitions = ( - len(divisions) - divisions.searchsorted(s, side="right") - 1 - ) - partitions[(partitions < 0) | (partitions >= len(divisions) - 1)] = ( - 0 if ascending else (len(divisions) - 2) - ) - partitions[s._columns[0].isnull().values] = ( - len(divisions) - 2 if na_position == "last" else 0 - ) - return partitions - - -@_dask_cudf_performance_tracking -def _quantile(a, q): - n = len(a) - if not len(a): - return None, n - return ( - a.quantile(q=q.tolist(), interpolation="nearest", method="table"), - n, - ) - - -@_dask_cudf_performance_tracking -def merge_quantiles(finalq, qs, vals): - """Combine several quantile calculations of different data. - [NOTE: Same logic as dask.array merge_percentiles] - """ - if isinstance(finalq, Iterator): - finalq = list(finalq) - finalq = np.array(finalq) - qs = list(map(list, qs)) - vals = list(vals) - vals, Ns = zip(*vals) - Ns = list(Ns) - - L = list(zip(*[(q, val, N) for q, val, N in zip(qs, vals, Ns) if N])) - if not L: - raise ValueError("No non-trivial arrays found") - qs, vals, Ns = L - - if len(vals) != len(qs) or len(Ns) != len(qs): - raise ValueError("qs, vals, and Ns parameters must be the same length") - - # transform qs and Ns into number of observations between quantiles - counts = [] - for q, N in zip(qs, Ns): - count = np.empty(len(q)) - count[1:] = np.diff(q) - count[0] = q[0] - count *= N - counts.append(count) - - def _append_counts(val, count): - val["_counts"] = count - return val - - # Sort by calculated quantile values, then number of observations. - combined_vals_counts = cudf.core.reshape._merge_sorted( - [*map(_append_counts, vals, counts)] - ) - combined_counts = cupy.asnumpy(combined_vals_counts["_counts"].values) - combined_vals = combined_vals_counts.drop(columns=["_counts"]) - - # quantile-like, but scaled by total number of observations - combined_q = np.cumsum(combined_counts) - - # rescale finalq quantiles to match combined_q - desired_q = finalq * sum(Ns) - - # TODO: Support other interpolation methods - # For now - Always use "nearest" for interpolation - left = np.searchsorted(combined_q, desired_q, side="left") - right = np.searchsorted(combined_q, desired_q, side="right") - 1 - np.minimum(left, len(combined_vals) - 1, left) # don't exceed max index - lower = np.minimum(left, right) - upper = np.maximum(left, right) - lower_residual = np.abs(combined_q[lower] - desired_q) - upper_residual = np.abs(combined_q[upper] - desired_q) - mask = lower_residual > upper_residual - index = lower # alias; we no longer need lower - index[mask] = upper[mask] - rv = combined_vals.iloc[index] - return rv.reset_index(drop=True) - - -@_dask_cudf_performance_tracking -def _approximate_quantile(df, q): - """Approximate quantiles of DataFrame or Series. 
- [NOTE: Same logic as dask.dataframe Series quantile] - """ - # current implementation needs q to be sorted so - # sort if array-like, otherwise leave it alone - q_ndarray = np.array(q) - if q_ndarray.ndim > 0: - q_ndarray.sort(kind="mergesort") - q = q_ndarray - - # Lets assume we are dealing with a DataFrame throughout - if isinstance(df, (Series, Index)): - df = df.to_frame() - assert isinstance(df, DataFrame) - final_type = df._meta._constructor - - # Create metadata - meta = df._meta_nonempty.quantile(q=q, method="table") - - # Define final action (create df with quantiles as index) - def finalize_tsk(tsk): - return (final_type, tsk) - - return_type = df.__class__ - - # pandas/cudf uses quantile in [0, 1] - # numpy / cupy uses [0, 100] - qs = np.asarray(q) - token = tokenize(df, qs) - - if len(qs) == 0: - name = "quantiles-" + token - empty_index = cudf.Index([], dtype=float) - return Series( - { - (name, 0): final_type( - {col: [] for col in df.columns}, - name=df.name, - index=empty_index, - ) - }, - name, - df._meta, - [None, None], - ) - else: - new_divisions = [np.min(q), np.max(q)] - - name = "quantiles-1-" + token - val_dsk = { - (name, i): (_quantile, key, qs) - for i, key in enumerate(df.__dask_keys__()) - } - - name2 = "quantiles-2-" + token - merge_dsk = { - (name2, 0): finalize_tsk( - (merge_quantiles, qs, [qs] * df.npartitions, sorted(val_dsk)) - ) - } - dsk = toolz.merge(val_dsk, merge_dsk) - graph = HighLevelGraph.from_collections(name2, dsk, dependencies=[df]) - df = return_type(graph, name2, meta, new_divisions) - - def set_quantile_index(df): - df.index = q - return df - - df = df.map_partitions(set_quantile_index, meta=meta) - return df - - -@_dask_cudf_performance_tracking -def quantile_divisions(df, by, npartitions): - qn = np.linspace(0.0, 1.0, npartitions + 1).tolist() - divisions = _approximate_quantile(df[by], qn).compute() - columns = divisions.columns - - # TODO: Make sure divisions are correct for all dtypes.. 
- if ( - len(columns) == 1 - and df[columns[0]].dtype != "object" - and not _is_categorical_dtype(df[columns[0]].dtype) - ): - dtype = df[columns[0]].dtype - divisions = divisions[columns[0]].astype("int64") - divisions.iloc[-1] += 1 - divisions = sorted( - divisions.drop_duplicates().astype(dtype).to_arrow().tolist(), - key=lambda x: (x is None, x), - ) - else: - for col in columns: - dtype = df[col].dtype - if dtype != "object": - divisions[col] = divisions[col].astype("int64") - divisions[col].iloc[-1] += 1 - divisions[col] = divisions[col].astype(dtype) - else: - if last := divisions[col].iloc[-1]: - val = chr(ord(last[0]) + 1) - else: - val = "this string intentionally left empty" # any but "" - divisions[col].iloc[-1] = val - divisions = divisions.drop_duplicates().sort_index() - return divisions - - -@_deprecate_shuffle_kwarg -@_dask_cudf_performance_tracking -def sort_values( - df, - by, - max_branch=None, - divisions=None, - set_divisions=False, - ignore_index=False, - ascending=True, - na_position="last", - shuffle_method=None, - sort_function=None, - sort_function_kwargs=None, -): - """Sort by the given list/tuple of column names.""" - - if not isinstance(ascending, bool): - raise ValueError("ascending must be either True or False") - if na_position not in ("first", "last"): - raise ValueError("na_position must be either 'first' or 'last'") - - npartitions = df.npartitions - if isinstance(by, tuple): - by = list(by) - elif not isinstance(by, list): - by = [by] - - # parse custom sort function / kwargs if provided - sort_kwargs = { - "by": by, - "ascending": ascending, - "na_position": na_position, - } - if sort_function is None: - sort_function = M.sort_values - if sort_function_kwargs is not None: - sort_kwargs.update(sort_function_kwargs) - - # handle single partition case - if npartitions == 1: - return df.map_partitions(sort_function, **sort_kwargs) - - # Step 1 - Calculate new divisions (if necessary) - if divisions is None: - divisions = quantile_divisions(df, by, npartitions) - - # Step 2 - Perform repartitioning shuffle - meta = df._meta._constructor_sliced([0]) - if not isinstance(divisions, (cudf.Series, cudf.DataFrame)): - dtype = df[by[0]].dtype - divisions = df._meta._constructor_sliced(divisions, dtype=dtype) - - partitions = df[by].map_partitions( - _set_partitions_pre, - divisions=divisions, - ascending=ascending, - na_position=na_position, - meta=meta, - ) - - df2 = df.assign(_partitions=partitions) - df3 = rearrange_by_column( - df2, - "_partitions", - max_branch=max_branch, - npartitions=len(divisions) - 1, - shuffle_method=_get_shuffle_method(shuffle_method), - ignore_index=ignore_index, - ).drop(columns=["_partitions"]) - df3.divisions = (None,) * (df3.npartitions + 1) - - # Step 3 - Return final sorted df - df4 = df3.map_partitions(sort_function, **sort_kwargs) - if not isinstance(divisions, cudf.DataFrame) and set_divisions: - # Can't have multi-column divisions elsewhere in dask (yet) - df4.divisions = tuple(methods.tolist(divisions)) - - return df4 - - -def get_default_shuffle_method(): - # Note that `dask.utils.get_default_shuffle_method` - # will return "p2p" by default when a distributed - # client is present. 
Dask-cudf supports "p2p", but - # will not use it by default (yet) - default = config.get("dataframe.shuffle.method", "tasks") - if default not in _SHUFFLE_SUPPORT: - default = "tasks" - return default - - -def _get_shuffle_method(shuffle_method): - # Utility to set the shuffle_method-kwarg default - # and to validate user-specified options - shuffle_method = shuffle_method or get_default_shuffle_method() - if shuffle_method not in _SHUFFLE_SUPPORT: - raise ValueError( - "Dask-cudf only supports the following shuffle " - f"methods: {_SHUFFLE_SUPPORT}. Got shuffle_method={shuffle_method}" - ) - - return shuffle_method diff --git a/python/dask_cudf/dask_cudf/backends.py b/python/dask_cudf/dask_cudf/backends.py index fceaaf185e8..afd96326c2f 100644 --- a/python/dask_cudf/dask_cudf/backends.py +++ b/python/dask_cudf/dask_cudf/backends.py @@ -11,14 +11,19 @@ from packaging.version import Version from pandas.api.types import is_scalar -import dask.dataframe as dd from dask import config from dask.array.dispatch import percentile_lookup from dask.dataframe.backends import ( DataFrameBackendEntrypoint, PandasBackendEntrypoint, ) -from dask.dataframe.core import get_parallel_type, meta_nonempty +from dask.dataframe.core import ( + DataFrame, + Index, + Series, + get_parallel_type, + meta_nonempty, +) from dask.dataframe.dispatch import ( categorical_dtype_dispatch, concat_dispatch, @@ -28,6 +33,7 @@ hash_object_dispatch, is_categorical_dtype_dispatch, make_meta_dispatch, + partd_encode_dispatch, pyarrow_schema_dispatch, to_pyarrow_table_dispatch, tolist_dispatch, @@ -46,8 +52,6 @@ from cudf.api.types import is_string_dtype from cudf.utils.performance_tracking import _dask_cudf_performance_tracking -from ._legacy.core import DataFrame, Index, Series - get_parallel_type.register(cudf.DataFrame, lambda _: DataFrame) get_parallel_type.register(cudf.Series, lambda _: Series) get_parallel_type.register(cudf.BaseIndex, lambda _: Index) @@ -464,28 +468,21 @@ def sizeof_cudf_series_index(obj): return obj.memory_usage() -# TODO: Remove try/except when cudf is pinned to dask>=2023.10.0 -try: - from dask.dataframe.dispatch import partd_encode_dispatch - - @partd_encode_dispatch.register(cudf.DataFrame) - def _simple_cudf_encode(_): - # Basic pickle-based encoding for a partd k-v store - import pickle - - import partd +@partd_encode_dispatch.register(cudf.DataFrame) +def _simple_cudf_encode(_): + # Basic pickle-based encoding for a partd k-v store + import pickle - def join(dfs): - if not dfs: - return cudf.DataFrame() - else: - return cudf.concat(dfs) + import partd - dumps = partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL) - return partial(partd.Encode, dumps, pickle.loads, join) + def join(dfs): + if not dfs: + return cudf.DataFrame() + else: + return cudf.concat(dfs) -except ImportError: - pass + dumps = partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL) + return partial(partd.Encode, dumps, pickle.loads, join) def _default_backend(func, *args, **kwargs): @@ -557,105 +554,22 @@ def to_cudf_dispatch_from_cudf(data, **kwargs): return data -# Define "cudf" backend engine to be registered with Dask -class CudfBackendEntrypoint(DataFrameBackendEntrypoint): - """Backend-entrypoint class for Dask-DataFrame +# Define the "cudf" backend for "legacy" Dask DataFrame +class LegacyCudfBackendEntrypoint(DataFrameBackendEntrypoint): + """Backend-entrypoint class for legacy Dask-DataFrame This class is registered under the name "cudf" for the - ``dask.dataframe.backends`` entrypoint in ``setup.cfg``. 
- Dask-DataFrame will use the methods defined in this class - in place of ``dask.dataframe.`` when the - "dataframe.backend" configuration is set to "cudf": - - Examples - -------- - >>> import dask - >>> import dask.dataframe as dd - >>> with dask.config.set({"dataframe.backend": "cudf"}): - ... ddf = dd.from_dict({"a": range(10)}) - >>> type(ddf) - + ``dask.dataframe.backends`` entrypoint in ``pyproject.toml``. + This "legacy" backend is only used for CSV support. """ - @classmethod - def to_backend_dispatch(cls): - return to_cudf_dispatch - - @classmethod - def to_backend(cls, data: dd.core._Frame, **kwargs): - if isinstance(data._meta, (cudf.DataFrame, cudf.Series, cudf.Index)): - # Already a cudf-backed collection - _unsupported_kwargs("cudf", "cudf", kwargs) - return data - return data.map_partitions(cls.to_backend_dispatch(), **kwargs) - @staticmethod - def from_dict( - data, - npartitions, - orient="columns", - dtype=None, - columns=None, - constructor=cudf.DataFrame, - ): - return _default_backend( - dd.from_dict, - data, - npartitions=npartitions, - orient=orient, - dtype=dtype, - columns=columns, - constructor=constructor, - ) - - @staticmethod - def read_parquet(*args, engine=None, **kwargs): - from dask_cudf._legacy.io.parquet import CudfEngine - - _raise_unsupported_parquet_kwargs(**kwargs) - return _default_backend( - dd.read_parquet, - *args, - engine=CudfEngine, - **kwargs, - ) - - @staticmethod - def read_json(*args, **kwargs): - from dask_cudf._legacy.io.json import read_json - - return read_json(*args, **kwargs) - - @staticmethod - def read_orc(*args, **kwargs): - from dask_cudf._legacy.io import read_orc - - return read_orc(*args, **kwargs) - - @staticmethod - def read_csv(*args, **kwargs): - from dask_cudf._legacy.io import read_csv - - return read_csv(*args, **kwargs) - - @staticmethod - def read_hdf(*args, **kwargs): - # HDF5 reader not yet implemented in cudf - warnings.warn( - "read_hdf is not yet implemented in cudf/dask_cudf. " - "Moving to cudf from pandas. Expect poor performance!" - ) - return _default_backend(dd.read_hdf, *args, **kwargs).to_backend( - "cudf" - ) - - -# Define "cudf" backend entrypoint for dask-expr -class CudfDXBackendEntrypoint(DataFrameBackendEntrypoint): +# Define the "cudf" backend for expr-based Dask DataFrame +class CudfBackendEntrypoint(DataFrameBackendEntrypoint): """Backend-entrypoint class for Dask-Expressions This class is registered under the name "cudf" for the - ``dask-expr.dataframe.backends`` entrypoint in ``setup.cfg``. + ``dask_expr.dataframe.backends`` entrypoint in ``pyproject.toml``. Dask-DataFrame will use the methods defined in this class in place of ``dask_expr.`` when the "dataframe.backend" configuration is set to "cudf": @@ -746,12 +660,12 @@ def read_csv( @staticmethod def read_json(*args, **kwargs): - from dask_cudf._legacy.io.json import read_json as read_json_impl + from dask_cudf.io.json import read_json as read_json_impl return read_json_impl(*args, **kwargs) @staticmethod def read_orc(*args, **kwargs): - from dask_cudf._legacy.io.orc import read_orc as legacy_read_orc + from dask_cudf.io.orc import read_orc as legacy_read_orc return legacy_read_orc(*args, **kwargs) diff --git a/python/dask_cudf/dask_cudf/core.py b/python/dask_cudf/dask_cudf/core.py index 5fd217209ec..087465adcb3 100644 --- a/python/dask_cudf/dask_cudf/core.py +++ b/python/dask_cudf/dask_cudf/core.py @@ -1,56 +1,41 @@ # Copyright (c) 2020-2024, NVIDIA CORPORATION. 
import textwrap +import warnings +from importlib import import_module import dask.dataframe as dd -from dask.tokenize import tokenize import cudf from cudf.utils.performance_tracking import _dask_cudf_performance_tracking # This module provides backward compatibility for legacy import patterns. -if dd.DASK_EXPR_ENABLED: - from dask_cudf._expr.collection import ( - DataFrame, - Index, - Series, - ) -else: - from dask_cudf._legacy.core import DataFrame, Index, Series # noqa: F401 - +from dask_cudf._expr.collection import ( + DataFrame, # noqa: F401 + Index, # noqa: F401 + Series, # noqa: F401 +) concat = dd.concat @_dask_cudf_performance_tracking def from_cudf(data, npartitions=None, chunksize=None, sort=True, name=None): - from dask_cudf import QUERY_PLANNING_ON - if isinstance(getattr(data, "index", None), cudf.MultiIndex): raise NotImplementedError( "dask_cudf does not support MultiIndex Dataframes." ) - # Dask-expr doesn't support the `name` argument - name = {} - if not QUERY_PLANNING_ON: - name = { - "name": name - or ("from_cudf-" + tokenize(data, npartitions or chunksize)) - } - return dd.from_pandas( data, npartitions=npartitions, chunksize=chunksize, sort=sort, - **name, ) -from_cudf.__doc__ = ( - textwrap.dedent( - """ +from_cudf.__doc__ = textwrap.dedent( + """ Create a :class:`.DataFrame` from a :class:`cudf.DataFrame`. This function is a thin wrapper around @@ -58,9 +43,23 @@ def from_cudf(data, npartitions=None, chunksize=None, sort=True, name=None): arguments (described below) excepting that it operates on cuDF rather than pandas objects.\n """ - ) - # TODO: `dd.from_pandas.__doc__` is empty when - # `DASK_DATAFRAME__QUERY_PLANNING=True` - # since dask-expr does not provide a docstring for from_pandas. - + textwrap.dedent(dd.from_pandas.__doc__ or "") -) +) + textwrap.dedent(dd.from_pandas.__doc__) + + +def _deprecated_api(old_api, new_api=None, rec=None): + def inner_func(*args, **kwargs): + if new_api: + # Use alternative + msg = f"{old_api} is now deprecated. " + msg += rec or f"Please use {new_api} instead." + warnings.warn(msg, FutureWarning) + new_attr = new_api.split(".") + module = import_module(".".join(new_attr[:-1])) + return getattr(module, new_attr[-1])(*args, **kwargs) + + # No alternative - raise an error + raise NotImplementedError( + f"{old_api} is no longer supported. " + (rec or "") + ) + + return inner_func diff --git a/python/dask_cudf/dask_cudf/io/__init__.py b/python/dask_cudf/dask_cudf/io/__init__.py index 9bca33e414a..6837cad25ce 100644 --- a/python/dask_cudf/dask_cudf/io/__init__.py +++ b/python/dask_cudf/dask_cudf/io/__init__.py @@ -1,6 +1,6 @@ # Copyright (c) 2024, NVIDIA CORPORATION. -from dask_cudf import QUERY_PLANNING_ON, _deprecated_api +from dask_cudf.core import _deprecated_api from . 
import csv, json, orc, parquet, text # noqa: F401 @@ -15,20 +15,13 @@ ) to_orc = _deprecated_api( "dask_cudf.io.to_orc", - new_api="dask_cudf._legacy.io.to_orc", + new_api="dask_cudf.io.orc.to_orc", rec="Please use the DataFrame.to_orc method instead.", ) read_text = _deprecated_api( "dask_cudf.io.read_text", new_api="dask_cudf.read_text" ) -if QUERY_PLANNING_ON: - read_parquet = parquet.read_parquet -else: - read_parquet = _deprecated_api( - "The legacy dask_cudf.io.read_parquet API", - new_api="dask_cudf.read_parquet", - rec="", - ) +read_parquet = parquet.read_parquet to_parquet = _deprecated_api( "dask_cudf.io.to_parquet", new_api="dask_cudf._legacy.io.parquet.to_parquet", diff --git a/python/dask_cudf/dask_cudf/io/csv.py b/python/dask_cudf/dask_cudf/io/csv.py index 29f98b14511..2bd9a5c6607 100644 --- a/python/dask_cudf/dask_cudf/io/csv.py +++ b/python/dask_cudf/dask_cudf/io/csv.py @@ -25,11 +25,11 @@ def read_csv(path, blocksize="default", **kwargs): >>> import dask_cudf >>> df = dask_cudf.read_csv("myfiles.*.csv") - In some cases it can break up large files: + It can break up large files if blocksize is specified: >>> df = dask_cudf.read_csv("largefile.csv", blocksize="256 MiB") - It can read CSV files from external resources (e.g. S3, HTTP, FTP) + It can read CSV files from external resources (e.g. S3, HTTP, FTP): >>> df = dask_cudf.read_csv("s3://bucket/myfiles.*.csv") >>> df = dask_cudf.read_csv("https://www.mycloud.com/sample.csv") @@ -44,15 +44,15 @@ def read_csv(path, blocksize="default", **kwargs): ---------- path : str, path object, or file-like object Either a path to a file (a str, :py:class:`pathlib.Path`, or - py._path.local.LocalPath), URL (including http, ftp, and S3 - locations), or any object with a read() method (such as + ``py._path.local.LocalPath``), URL (including HTTP, FTP, and S3 + locations), or any object with a ``read()`` method (such as builtin :py:func:`open` file handler function or :py:class:`~io.StringIO`). blocksize : int or str, default "256 MiB" The target task partition size. If ``None``, a single block is used for each file. **kwargs : dict - Passthrough key-word arguments that are sent to + Passthrough keyword arguments that are sent to :func:`cudf:cudf.read_csv`. Notes diff --git a/python/dask_cudf/dask_cudf/io/json.py b/python/dask_cudf/dask_cudf/io/json.py index 8f85ea54c0a..98c5ceedb76 100644 --- a/python/dask_cudf/dask_cudf/io/json.py +++ b/python/dask_cudf/dask_cudf/io/json.py @@ -1,8 +1,209 @@ -# Copyright (c) 2024, NVIDIA CORPORATION. +# Copyright (c) 2019-2024, NVIDIA CORPORATION. -from dask_cudf import _deprecated_api +from functools import partial -read_json = _deprecated_api( - "dask_cudf.io.json.read_json", - new_api="dask_cudf.read_json", -) +import numpy as np +from fsspec.core import get_compression, get_fs_token_paths + +import dask +from dask.utils import parse_bytes + +import cudf +from cudf.core.column import as_column +from cudf.utils.ioutils import _is_local_filesystem + +from dask_cudf.backends import _default_backend + + +def _read_json_partition( + paths, + fs=None, + include_path_column=False, + path_converter=None, + **kwargs, +): + # Transfer all data up front for remote storage + sources = ( + paths + if fs is None + else fs.cat_ranges( + paths, + [0] * len(paths), + fs.sizes(paths), + ) + ) + + if include_path_column: + # Add "path" column. 
+        # Must iterate over sources sequentially
+        if not isinstance(include_path_column, str):
+            include_path_column = "path"
+        converted_paths = (
+            paths
+            if path_converter is None
+            else [path_converter(path) for path in paths]
+        )
+        dfs = []
+        for i, source in enumerate(sources):
+            df = cudf.read_json(source, **kwargs)
+            df[include_path_column] = as_column(
+                converted_paths[i], length=len(df)
+            )
+            dfs.append(df)
+        return cudf.concat(dfs)
+    else:
+        # Pass sources directly to cudf
+        return cudf.read_json(sources, **kwargs)
+
+
+def read_json(
+    url_path,
+    engine="auto",
+    blocksize=None,
+    orient="records",
+    lines=None,
+    compression="infer",
+    aggregate_files=True,
+    **kwargs,
+):
+    """Read JSON data into a :class:`.DataFrame`.
+
+    This function wraps :func:`dask.dataframe.read_json`, and passes
+    ``engine=partial(cudf.read_json, engine="auto")`` by default.
+
+    Parameters
+    ----------
+    url_path : str, list of str
+        Location to read from. If a string, can include a glob character to
+        find a set of file names.
+        Supports protocol specifications such as ``"s3://"``.
+    engine : str or Callable, default "auto"
+
+        If str, this value will be used as the ``engine`` argument
+        when :func:`cudf.read_json` is used to create each partition.
+        If a :obj:`~collections.abc.Callable`, this value will be used as the
+        underlying function used to create each partition from JSON
+        data. The default value is "auto", so that
+        ``engine=partial(cudf.read_json, engine="auto")`` will be
+        passed to :func:`dask.dataframe.read_json` by default.
+    aggregate_files : bool or int
+        Whether to map multiple files to each output partition. If True,
+        the `blocksize` argument will be used to determine the number of
+        files in each partition. If any one file is larger than `blocksize`,
+        the `aggregate_files` argument will be ignored. If an integer value
+        is specified, the `blocksize` argument will be ignored, and that
+        number of files will be mapped to each partition. Default is True.
+    **kwargs :
+        Keyword arguments to pass through to :func:`dask.dataframe.read_json`.
+
+    Returns
+    -------
+    :class:`.DataFrame`
+
+    Examples
+    --------
+    Load single file
+
+    >>> from dask_cudf import read_json
+    >>> read_json('myfile.json')  # doctest: +SKIP
+
+    Load large line-delimited JSON files using partitions of approx
+    256MB size
+
+    >>> read_json('data/file*.json', blocksize=2**28)  # doctest: +SKIP
+
+    Load nested JSON data
+
+    >>> read_json('myfile.json')  # doctest: +SKIP
+
+    See Also
+    --------
+    dask.dataframe.read_json
+
+    """
+
+    if lines is None:
+        lines = orient == "records"
+    if orient != "records" and lines:
+        raise ValueError(
+            'Line-delimited JSON is only available with orient="records".'
+        )
+    if blocksize and (orient != "records" or not lines):
+        raise ValueError(
+            "JSON file chunking is only allowed for JSON-lines "
+            "input (orient='records', lines=True)."
+        )
+
+    inputs = []
+    if aggregate_files and blocksize or int(aggregate_files) > 1:
+        # Attempt custom read if we are mapping multiple files
+        # to each output partition. Otherwise, upstream logic
+        # is sufficient.
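+        #
+        # A minimal illustrative call (the glob pattern and blocksize are
+        # hypothetical, not part of this change):
+        #
+        #     read_json("data/*.jsonl", lines=True, blocksize="256 MiB")
+        #
+        # packs multiple small files into each output partition until the
+        # combined input size reaches roughly 256 MiB.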
+ + storage_options = kwargs.get("storage_options", {}) + fs, _, paths = get_fs_token_paths( + url_path, mode="rb", storage_options=storage_options + ) + if isinstance(aggregate_files, int) and aggregate_files > 1: + # Map a static file count to each partition + inputs = [ + paths[offset : offset + aggregate_files] + for offset in range(0, len(paths), aggregate_files) + ] + elif aggregate_files is True and blocksize: + # Map files dynamically (using blocksize) + file_sizes = fs.sizes(paths) # NOTE: This can be slow + blocksize = parse_bytes(blocksize) + if all([file_size <= blocksize for file_size in file_sizes]): + counts = np.unique( + np.floor(np.cumsum(file_sizes) / blocksize), + return_counts=True, + )[1] + offsets = np.concatenate([[0], counts.cumsum()]) + inputs = [ + paths[offsets[i] : offsets[i + 1]] + for i in range(len(offsets) - 1) + ] + + if inputs: + # Inputs were successfully populated. + # Use custom _read_json_partition function + # to generate each partition. + + compression = get_compression( + url_path[0] if isinstance(url_path, list) else url_path, + compression, + ) + _kwargs = dict( + orient=orient, + lines=lines, + compression=compression, + include_path_column=kwargs.get("include_path_column", False), + path_converter=kwargs.get("path_converter"), + ) + if not _is_local_filesystem(fs): + _kwargs["fs"] = fs + # TODO: Generate meta more efficiently + meta = _read_json_partition(inputs[0][:1], **_kwargs) + return dask.dataframe.from_map( + _read_json_partition, + inputs, + meta=meta, + **_kwargs, + ) + + # Fall back to dask.dataframe.read_json + return _default_backend( + dask.dataframe.read_json, + url_path, + engine=( + partial(cudf.read_json, engine=engine) + if isinstance(engine, str) + else engine + ), + blocksize=blocksize, + orient=orient, + lines=lines, + compression=compression, + **kwargs, + ) diff --git a/python/dask_cudf/dask_cudf/io/orc.py b/python/dask_cudf/dask_cudf/io/orc.py index 5219cdacc31..fcf684fd6c8 100644 --- a/python/dask_cudf/dask_cudf/io/orc.py +++ b/python/dask_cudf/dask_cudf/io/orc.py @@ -1,13 +1,195 @@ -# Copyright (c) 2024, NVIDIA CORPORATION. - -from dask_cudf import _deprecated_api - -read_orc = _deprecated_api( - "dask_cudf.io.orc.read_orc", - new_api="dask_cudf.read_orc", -) -to_orc = _deprecated_api( - "dask_cudf.io.orc.to_orc", - new_api="dask_cudf._legacy.io.orc.to_orc", - rec="Please use the DataFrame.to_orc method instead.", -) +# Copyright (c) 2020-2024, NVIDIA CORPORATION. + +from io import BufferedWriter, IOBase + +from fsspec.core import get_fs_token_paths +from fsspec.utils import stringify_path +from pyarrow import orc as orc + +from dask import dataframe as dd +from dask.dataframe.io.utils import _get_pyarrow_dtypes + +import cudf + + +def _read_orc_stripe(source, fs, columns=None, kwargs=None): + """Pull out specific columns from specific stripe""" + path, stripe = source + if kwargs is None: + kwargs = {} + with fs.open(path, "rb") as f: + df_stripe = cudf.read_orc( + f, stripes=[stripe], columns=columns, **kwargs + ) + return df_stripe + + +def read_orc(path, columns=None, filters=None, storage_options=None, **kwargs): + """Read ORC files into a :class:`.DataFrame`. + + Note that this function is mostly borrowed from upstream Dask. + + Parameters + ---------- + path : str or list[str] + Location of file(s), which can be a full URL with protocol specifier, + and may include glob character if a single string. + columns : None or list[str] + Columns to load. If None, loads all. 
+ filters : None or list of tuple or list of lists of tuples + If not None, specifies a filter predicate used to filter out + row groups using statistics stored for each row group as + Parquet metadata. Row groups that do not match the given + filter predicate are not read. The predicate is expressed in + `disjunctive normal form (DNF) + `__ + like ``[[('x', '=', 0), ...], ...]``. DNF allows arbitrary + boolean logical combinations of single column predicates. The + innermost tuples each describe a single column predicate. The + list of inner predicates is interpreted as a conjunction + (AND), forming a more selective and multiple column predicate. + Finally, the outermost list combines these filters as a + disjunction (OR). Predicates may also be passed as a list of + tuples. This form is interpreted as a single conjunction. To + express OR in predicates, one must use the (preferred) + notation of list of lists of tuples. + storage_options : None or dict + Further parameters to pass to the bytes backend. + + See Also + -------- + dask.dataframe.read_orc + + Returns + ------- + dask_cudf.DataFrame + + """ + + storage_options = storage_options or {} + fs, _, paths = get_fs_token_paths( + path, mode="rb", storage_options=storage_options + ) + schema = None + nstripes_per_file = [] + for path in paths: + with fs.open(path, "rb") as f: + o = orc.ORCFile(f) + if schema is None: + schema = o.schema + elif schema != o.schema: + raise ValueError( + "Incompatible schemas while parsing ORC files" + ) + nstripes_per_file.append(o.nstripes) + schema = _get_pyarrow_dtypes(schema, categories=None) + if columns is not None: + ex = set(columns) - set(schema) + if ex: + raise ValueError( + f"Requested columns ({ex}) not in schema ({set(schema)})" + ) + else: + columns = list(schema) + + with fs.open(paths[0], "rb") as f: + meta = cudf.read_orc( + f, + stripes=[0] if nstripes_per_file[0] else None, + columns=columns, + **kwargs, + ) + + sources = [] + for path, n in zip(paths, nstripes_per_file): + for stripe in ( + range(n) + if filters is None + else cudf.io.orc._filter_stripes(filters, path) + ): + sources.append((path, stripe)) + + return dd.from_map( + _read_orc_stripe, + sources, + args=[fs], + columns=columns, + kwargs=kwargs, + meta=meta, + ) + + +def write_orc_partition(df, path, fs, filename, compression="snappy"): + full_path = fs.sep.join([path, filename]) + with fs.open(full_path, mode="wb") as out_file: + if not isinstance(out_file, IOBase): + out_file = BufferedWriter(out_file) + cudf.io.to_orc(df, out_file, compression=compression) + return full_path + + +def to_orc( + df, + path, + write_index=True, + storage_options=None, + compression="snappy", + compute=True, + **kwargs, +): + """ + Write a :class:`.DataFrame` to ORC file(s) (one file per partition). + + Parameters + ---------- + df : DataFrame + path : str or pathlib.Path + Destination directory for data. Prepend with protocol like ``s3://`` + or ``hdfs://`` for remote data. + write_index : boolean, optional + Whether or not to write the index. Defaults to True. + storage_options : None or dict + Further parameters to pass to the bytes backend. + compression : string or dict, optional + compute : bool, optional + If True (default) then the result is computed immediately. If + False then a :class:`~dask.delayed.Delayed` object is returned + for future computation. 
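+
+    Examples
+    --------
+    A minimal sketch; the input and output paths below are hypothetical.
+    The same behavior is available via the ``DataFrame.to_orc`` method.
+
+    >>> import dask_cudf
+    >>> df = dask_cudf.read_parquet("/path/to/input")  # doctest: +SKIP
+    >>> df.to_orc("/path/to/orc-output")  # doctest: +SKIP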
+ + """ + + from dask import compute as dask_compute, delayed + + # TODO: Use upstream dask implementation once available + # (see: Dask Issue#5596) + + if hasattr(path, "name"): + path = stringify_path(path) + fs, _, _ = get_fs_token_paths( + path, mode="wb", storage_options=storage_options + ) + # Trim any protocol information from the path before forwarding + path = fs._strip_protocol(path) + + if write_index: + df = df.reset_index() + else: + # Not writing index - might as well drop it + df = df.reset_index(drop=True) + + fs.mkdirs(path, exist_ok=True) + + # Use i_offset and df.npartitions to define file-name list + filenames = ["part.%i.orc" % i for i in range(df.npartitions)] + + # write parts + dwrite = delayed(write_orc_partition) + parts = [ + dwrite(d, path, fs, filename, compression=compression) + for d, filename in zip(df.to_delayed(), filenames) + ] + + if compute: + return dask_compute(*parts) + + return delayed(list)(parts) diff --git a/python/dask_cudf/dask_cudf/io/parquet.py b/python/dask_cudf/dask_cudf/io/parquet.py index ba6209c4820..0a0245445d2 100644 --- a/python/dask_cudf/dask_cudf/io/parquet.py +++ b/python/dask_cudf/dask_cudf/io/parquet.py @@ -37,10 +37,9 @@ def TaskList(*x): import cudf -from dask_cudf import QUERY_PLANNING_ON, _deprecated_api - # Dask-expr imports CudfEngine from this module from dask_cudf._legacy.io.parquet import CudfEngine +from dask_cudf.core import _deprecated_api if TYPE_CHECKING: from collections.abc import MutableMapping @@ -832,15 +831,8 @@ def read_parquet_expr( ) -if QUERY_PLANNING_ON: - read_parquet = read_parquet_expr - read_parquet.__doc__ = read_parquet_expr.__doc__ -else: - read_parquet = _deprecated_api( - "The legacy dask_cudf.io.parquet.read_parquet API", - new_api="dask_cudf.read_parquet", - rec="", - ) +read_parquet = read_parquet_expr +read_parquet.__doc__ = read_parquet_expr.__doc__ to_parquet = _deprecated_api( "dask_cudf.io.parquet.to_parquet", new_api="dask_cudf._legacy.io.parquet.to_parquet", diff --git a/python/dask_cudf/dask_cudf/io/tests/test_json.py b/python/dask_cudf/dask_cudf/io/tests/test_json.py index f5509cf91c3..b04a9045922 100644 --- a/python/dask_cudf/dask_cudf/io/tests/test_json.py +++ b/python/dask_cudf/dask_cudf/io/tests/test_json.py @@ -11,10 +11,6 @@ from dask.utils import tmpfile import dask_cudf -from dask_cudf.tests.utils import skip_dask_expr - -# No dask-expr support for dask<2024.4.0 -pytestmark = skip_dask_expr(lt_version="2024.4.0") def test_read_json_backend_dispatch(tmp_path): @@ -137,7 +133,3 @@ def test_deprecated_api_paths(tmp_path): with pytest.warns(match="dask_cudf.io.read_json is now deprecated"): df2 = dask_cudf.io.read_json(path) dd.assert_eq(df, df2, check_divisions=False) - - with pytest.warns(match="dask_cudf.io.json.read_json is now deprecated"): - df2 = dask_cudf.io.json.read_json(path) - dd.assert_eq(df, df2, check_divisions=False) diff --git a/python/dask_cudf/dask_cudf/io/tests/test_orc.py b/python/dask_cudf/dask_cudf/io/tests/test_orc.py index b6064d851ca..ab1a16814ef 100644 --- a/python/dask_cudf/dask_cudf/io/tests/test_orc.py +++ b/python/dask_cudf/dask_cudf/io/tests/test_orc.py @@ -12,10 +12,6 @@ import cudf import dask_cudf -from dask_cudf.tests.utils import skip_dask_expr - -# No dask-expr support for dask<2024.4.0 -pytestmark = skip_dask_expr(lt_version="2024.4.0") cur_dir = os.path.dirname(__file__) sample_orc = os.path.join(cur_dir, "data/orc/sample.orc") @@ -159,7 +155,3 @@ def test_deprecated_api_paths(tmpdir): with pytest.warns(match="dask_cudf.io.read_orc is now 
deprecated"): df2 = dask_cudf.io.read_orc(paths) dd.assert_eq(df, df2, check_divisions=False) - - with pytest.warns(match="dask_cudf.io.orc.read_orc is now deprecated"): - df2 = dask_cudf.io.orc.read_orc(paths) - dd.assert_eq(df, df2, check_divisions=False) diff --git a/python/dask_cudf/dask_cudf/io/tests/test_parquet.py b/python/dask_cudf/dask_cudf/io/tests/test_parquet.py index 6efe6c4f388..6f1d3979106 100644 --- a/python/dask_cudf/dask_cudf/io/tests/test_parquet.py +++ b/python/dask_cudf/dask_cudf/io/tests/test_parquet.py @@ -16,11 +16,6 @@ import dask_cudf from dask_cudf._legacy.io.parquet import create_metadata_file -from dask_cudf.tests.utils import ( - require_dask_expr, - skip_dask_expr, - xfail_dask_expr, -) # Check if create_metadata_file is supported by # the current dask.dataframe version @@ -450,7 +445,6 @@ def test_create_metadata_file(tmpdir, partition_on): dd.assert_eq(ddf1, ddf2) -@xfail_dask_expr("Newer dask version needed", lt_version="2024.5.0") @need_create_meta def test_create_metadata_file_inconsistent_schema(tmpdir): # NOTE: This test demonstrates that the CudfEngine @@ -531,19 +525,6 @@ def test_cudf_list_struct_write(tmpdir): dd.assert_eq(df, new_ddf) -@skip_dask_expr("Not necessary in dask-expr") -def test_check_file_size(tmpdir): - # Test simple file-size check to help warn users - # of upstream change to `split_row_groups` default - fn = str(tmpdir.join("test.parquet")) - cudf.DataFrame({"a": np.arange(1000)}).to_parquet(fn) - with pytest.warns(match="large parquet file"): - # Need to use `dask_cudf._legacy.io` path - # TODO: Remove outdated `check_file_size` functionality - dask_cudf._legacy.io.read_parquet(fn, check_file_size=1).compute() - - -@xfail_dask_expr("HivePartitioning cannot be hashed", lt_version="2024.3.0") def test_null_partition(tmpdir): import pyarrow as pa from pyarrow.dataset import HivePartitioning @@ -626,7 +607,6 @@ def test_timezone_column(tmpdir): dd.assert_eq(got, expect) -@require_dask_expr() @pytest.mark.skipif( not dask_cudf.backends.PYARROW_GE_15, reason="Requires pyarrow 15", @@ -677,17 +657,8 @@ def test_deprecated_api_paths(tmpdir): with pytest.warns(match="dask_cudf.io.to_parquet is now deprecated"): dask_cudf.io.to_parquet(df, tmpdir) - if dask_cudf.QUERY_PLANNING_ON: - df2 = dask_cudf.io.read_parquet(tmpdir) - dd.assert_eq(df, df2, check_divisions=False) - - df2 = dask_cudf.io.parquet.read_parquet(tmpdir) - dd.assert_eq(df, df2, check_divisions=False) - else: - with pytest.warns(match="legacy dask_cudf.io.read_parquet"): - df2 = dask_cudf.io.read_parquet(tmpdir) - dd.assert_eq(df, df2, check_divisions=False) + df2 = dask_cudf.io.read_parquet(tmpdir) + dd.assert_eq(df, df2, check_divisions=False) - with pytest.warns(match="legacy dask_cudf.io.parquet.read_parquet"): - df2 = dask_cudf.io.parquet.read_parquet(tmpdir) - dd.assert_eq(df, df2, check_divisions=False) + df2 = dask_cudf.io.parquet.read_parquet(tmpdir) + dd.assert_eq(df, df2, check_divisions=False) diff --git a/python/dask_cudf/dask_cudf/io/tests/test_s3.py b/python/dask_cudf/dask_cudf/io/tests/test_s3.py index 90907f6fb99..11d3a89bf52 100644 --- a/python/dask_cudf/dask_cudf/io/tests/test_s3.py +++ b/python/dask_cudf/dask_cudf/io/tests/test_s3.py @@ -14,7 +14,6 @@ import cudf import dask_cudf -from dask_cudf.tests.utils import QUERY_PLANNING_ON moto = pytest.importorskip("moto", minversion="3.1.6") boto3 = pytest.importorskip("boto3") @@ -136,7 +135,7 @@ def test_read_parquet_open_file_options_raises(): pytest.param( "arrow", marks=pytest.mark.skipif( - not 
QUERY_PLANNING_ON or not dask_cudf.backends.PYARROW_GE_15, + not dask_cudf.backends.PYARROW_GE_15, reason="Not supported", ), ), diff --git a/python/dask_cudf/dask_cudf/io/tests/test_text.py b/python/dask_cudf/dask_cudf/io/tests/test_text.py index e35b6411a9d..3006dece9e7 100644 --- a/python/dask_cudf/dask_cudf/io/tests/test_text.py +++ b/python/dask_cudf/dask_cudf/io/tests/test_text.py @@ -9,10 +9,6 @@ import cudf import dask_cudf -from dask_cudf.tests.utils import skip_dask_expr - -# No dask-expr support for dask<2024.4.0 -pytestmark = skip_dask_expr(lt_version="2024.4.0") cur_dir = os.path.dirname(__file__) text_file = os.path.join(cur_dir, "data/text/sample.pgn") @@ -42,7 +38,3 @@ def test_deprecated_api_paths(): with pytest.warns(match="dask_cudf.io.read_text is now deprecated"): df2 = dask_cudf.io.read_text(text_file, delimiter=".") dd.assert_eq(df, df2, check_divisions=False) - - with pytest.warns(match="dask_cudf.io.text.read_text is now deprecated"): - df2 = dask_cudf.io.text.read_text(text_file, delimiter=".") - dd.assert_eq(df, df2, check_divisions=False) diff --git a/python/dask_cudf/dask_cudf/io/text.py b/python/dask_cudf/dask_cudf/io/text.py index 1caf4e81d8e..3757c85c80c 100644 --- a/python/dask_cudf/dask_cudf/io/text.py +++ b/python/dask_cudf/dask_cudf/io/text.py @@ -1,8 +1,56 @@ -# Copyright (c) 2024, NVIDIA CORPORATION. +# Copyright (c) 2022-2024, NVIDIA CORPORATION. -from dask_cudf import _deprecated_api +import os +from glob import glob -read_text = _deprecated_api( - "dask_cudf.io.text.read_text", - new_api="dask_cudf.read_text", -) +import dask.dataframe as dd +from dask.utils import parse_bytes + +import cudf + + +def _read_text(source, **kwargs): + # Wrapper for cudf.read_text operation + fn, byte_range = source + return cudf.read_text(fn, byte_range=byte_range, **kwargs) + + +def read_text(path, chunksize="256 MiB", byte_range=None, **kwargs): + if isinstance(chunksize, str): + chunksize = parse_bytes(chunksize) + + if isinstance(path, list): + filenames = path + elif isinstance(path, str): + filenames = sorted(glob(path)) + elif hasattr(path, "__fspath__"): + filenames = sorted(glob(path.__fspath__())) + else: + raise TypeError(f"Path type not understood:{type(path)}") + + if not filenames: + msg = f"A file in: {filenames} does not exist." 
+ raise FileNotFoundError(msg) + + if chunksize and byte_range: + raise ValueError("Cannot specify both chunksize and byte_range.") + + if chunksize: + sources = [] + for fn in filenames: + size = os.path.getsize(fn) + for start in range(0, size, chunksize): + byte_range = ( + start, + chunksize, + ) # specify which chunk of the file we care about + sources.append((fn, byte_range)) + else: + sources = [(fn, byte_range) for fn in filenames] + + return dd.from_map( + _read_text, + sources, + meta=cudf.Series([], dtype="O"), + **kwargs, + ) diff --git a/python/dask_cudf/dask_cudf/tests/test_accessor.py b/python/dask_cudf/dask_cudf/tests/test_accessor.py index 3fbb2aacd2c..81cf5d0b5cf 100644 --- a/python/dask_cudf/dask_cudf/tests/test_accessor.py +++ b/python/dask_cudf/dask_cudf/tests/test_accessor.py @@ -13,7 +13,6 @@ from cudf.testing._utils import does_not_raise import dask_cudf -from dask_cudf.tests.utils import xfail_dask_expr ############################################################################# # Datetime Accessor # @@ -112,7 +111,6 @@ def test_categorical_accessor_initialization2(data): dsr.cat -@xfail_dask_expr("Newer dask version needed", lt_version="2024.5.0") @pytest.mark.parametrize("data", [data_cat_1()]) def test_categorical_basic(data): cat = data.copy() diff --git a/python/dask_cudf/dask_cudf/tests/test_core.py b/python/dask_cudf/dask_cudf/tests/test_core.py index 7101fb7e00a..fa157091af7 100644 --- a/python/dask_cudf/dask_cudf/tests/test_core.py +++ b/python/dask_cudf/dask_cudf/tests/test_core.py @@ -15,12 +15,6 @@ import cudf import dask_cudf -from dask_cudf.tests.utils import ( - QUERY_PLANNING_ON, - require_dask_expr, - skip_dask_expr, - xfail_dask_expr, -) rng = np.random.default_rng(seed=0) @@ -393,44 +387,6 @@ def test_setitem_scalar_datetime(): np.testing.assert_array_equal(got["z"], df["z"]) -@skip_dask_expr("Not relevant for dask-expr") -@pytest.mark.parametrize( - "func", - [ - lambda: pd.DataFrame( - {"A": rng.random(10), "B": rng.random(10)}, - index=list("abcdefghij"), - ), - lambda: pd.DataFrame( - { - "A": rng.random(10), - "B": list("a" * 10), - "C": pd.Series( - [str(20090101 + i) for i in range(10)], - dtype="datetime64[ns]", - ), - }, - index=list("abcdefghij"), - ), - lambda: pd.Series(list("abcdefghijklmnop")), - lambda: pd.Series( - rng.random(10), - index=pd.Index( - [str(20090101 + i) for i in range(10)], dtype="datetime64[ns]" - ), - ), - ], -) -def test_repr(func): - pdf = func() - gdf = cudf.from_pandas(pdf) - gddf = dd.from_pandas(gdf, npartitions=3, sort=False) - - assert repr(gddf) - if hasattr(pdf, "_repr_html_"): - assert gddf._repr_html_() - - @pytest.mark.skip(reason="datetime indexes not fully supported in cudf") @pytest.mark.parametrize("start", ["1d", "5d", "1w", "12h"]) @pytest.mark.parametrize("stop", ["1d", "3d", "8h"]) @@ -784,7 +740,6 @@ def test_dataframe_set_index(): assert_eq(ddf.compute(), pddf.compute()) -@xfail_dask_expr("Newer dask version needed", lt_version="2024.5.0") def test_series_describe(): random.seed(0) sr = cudf.datasets.randomdata(20)["x"] @@ -800,7 +755,6 @@ def test_series_describe(): ) -@xfail_dask_expr("Newer dask version needed", lt_version="2024.5.0") def test_dataframe_describe(): random.seed(0) df = cudf.datasets.randomdata(20) @@ -814,7 +768,6 @@ def test_dataframe_describe(): ) -@xfail_dask_expr("Newer dask version needed", lt_version="2024.5.0") def test_zero_std_describe(): num = 84886781 df = cudf.DataFrame( @@ -932,14 +885,9 @@ def func(x): result = ds.map_partitions(func, meta=s.values) - if 
QUERY_PLANNING_ON: - # Check Array and round-tripped DataFrame - dask.array.assert_eq(result, func(s)) - dd.assert_eq(result.to_dask_dataframe(), s, check_index=False) - else: - # Legacy version still carries numpy metadata - # See: https://github.com/dask/dask/issues/11017 - dask.array.assert_eq(result.compute(), func(s)) + # Check Array and round-tripped DataFrame + dask.array.assert_eq(result, func(s)) + dd.assert_eq(result.to_dask_dataframe(), s, check_index=False) def test_implicit_array_conversion_cupy_sparse(): @@ -981,7 +929,6 @@ def test_series_isin_error(): ddf.isin([1, 5, "a"]).compute() -@require_dask_expr() def test_to_backend_simplify(): # Check that column projection is not blocked by to_backend with dask.config.set({"dataframe.backend": "pandas"}): diff --git a/python/dask_cudf/dask_cudf/tests/test_groupby.py b/python/dask_cudf/dask_cudf/tests/test_groupby.py index 9bd3b506db0..e7efe4ea4ef 100644 --- a/python/dask_cudf/dask_cudf/tests/test_groupby.py +++ b/python/dask_cudf/dask_cudf/tests/test_groupby.py @@ -13,12 +13,7 @@ from cudf.testing._utils import expect_warning_if import dask_cudf -from dask_cudf._legacy.groupby import OPTIMIZED_AGGS, _aggs_optimized -from dask_cudf.tests.utils import ( - QUERY_PLANNING_ON, - require_dask_expr, - xfail_dask_expr, -) +from dask_cudf._expr.groupby import OPTIMIZED_AGGS, _aggs_optimized def assert_cudf_groupby_layers(ddf): @@ -78,18 +73,12 @@ def test_groupby_basic(series, aggregation, pdf): expect = getattr(gdf_grouped, aggregation)() actual = getattr(ddf_grouped, aggregation)() - if not QUERY_PLANNING_ON: - assert_cudf_groupby_layers(actual) - dd.assert_eq(expect, actual, check_dtype=check_dtype) if not series: expect = gdf_grouped.agg({"x": aggregation}) actual = ddf_grouped.agg({"x": aggregation}) - if not QUERY_PLANNING_ON: - assert_cudf_groupby_layers(actual) - dd.assert_eq(expect, actual, check_dtype=check_dtype) @@ -134,13 +123,6 @@ def test_groupby_agg(func, aggregation, pdf): check_dtype = aggregation != "count" - if not QUERY_PLANNING_ON: - assert_cudf_groupby_layers(actual) - - # groupby.agg should add an explicit getitem layer - # to improve/enable column projection - assert hlg_layer(actual.dask, "getitem") - dd.assert_eq(expect, actual, check_names=False, check_dtype=check_dtype) @@ -556,20 +538,13 @@ def test_groupby_categorical_key(): True, pytest.param( False, - marks=xfail_dask_expr("as_index not supported in dask-expr"), - ), - ], -) -@pytest.mark.parametrize( - "fused", - [ - True, - pytest.param( - False, - marks=require_dask_expr("Not supported by legacy API"), + marks=pytest.mark.xfail( + reason="as_index not supported in dask-expr" + ), ), ], ) +@pytest.mark.parametrize("fused", [True, False]) @pytest.mark.parametrize("split_out", ["use_dask_default", 1, 2]) @pytest.mark.parametrize("split_every", [False, 4]) @pytest.mark.parametrize("npartitions", [1, 10]) @@ -590,19 +565,16 @@ def test_groupby_agg_params( "c": ["mean", "std", "var"], } - fused_kwarg = {"fused": fused} if QUERY_PLANNING_ON else {} + fused_kwarg = {"fused": fused} split_kwargs = {"split_every": split_every, "split_out": split_out} if split_out == "use_dask_default": split_kwargs.pop("split_out") # Avoid using as_index when query-planning is enabled - if QUERY_PLANNING_ON: - with pytest.warns(FutureWarning, match="argument is now deprecated"): - # Should warn when `as_index` is used - ddf.groupby(["name", "a"], sort=False, as_index=as_index) - maybe_as_index = {"as_index": as_index} if as_index is False else {} - else: - maybe_as_index = 
{"as_index": as_index} + with pytest.warns(FutureWarning, match="argument is now deprecated"): + # Should warn when `as_index` is used + ddf.groupby(["name", "a"], sort=False, as_index=as_index) + maybe_as_index = {"as_index": as_index} if as_index is False else {} # Check `sort=True` behavior if split_out == 1: @@ -671,7 +643,6 @@ def test_groupby_agg_params( dd.assert_eq(gf, pf) -@xfail_dask_expr("Newer dask version needed", lt_version="2024.5.0") @pytest.mark.parametrize( "aggregations", [(sum, "sum"), (max, "max"), (min, "min")] ) @@ -711,7 +682,6 @@ def test_is_supported(arg, supported): assert _aggs_optimized(arg, OPTIMIZED_AGGS) is supported -@xfail_dask_expr("Newer dask version needed", lt_version="2024.5.0") def test_groupby_unique_lists(): df = pd.DataFrame({"a": [0, 0, 0, 1, 1, 1], "b": [10, 10, 10, 7, 8, 9]}) gdf = cudf.from_pandas(df) @@ -758,7 +728,7 @@ def test_groupby_first_last(data, agg): ) -@xfail_dask_expr("Co-alignment check fails in dask-expr") +@pytest.mark.xfail(reason="Co-alignment check fails in dask-expr") def test_groupby_with_list_of_series(): df = cudf.DataFrame({"a": [1, 2, 3, 4, 5]}) gdf = dask_cudf.from_cudf(df, npartitions=2) @@ -773,7 +743,6 @@ def test_groupby_with_list_of_series(): ) -@xfail_dask_expr("Newer dask version needed", lt_version="2024.5.0") @pytest.mark.parametrize( "func", [ @@ -833,7 +802,7 @@ def test_groupby_all_columns(func): expect = func(ddf) actual = func(gddf) - dd.assert_eq(expect, actual, check_names=not QUERY_PLANNING_ON) + dd.assert_eq(expect, actual, check_names=False) def test_groupby_shuffle(): @@ -870,15 +839,3 @@ def test_groupby_shuffle(): # NOTE: `shuffle_method=True` should be default got = gddf.groupby("a", sort=False).agg(spec, split_out=2) dd.assert_eq(expect, got.compute().sort_index()) - - if not QUERY_PLANNING_ON: - # Sorted aggregation fails with split_out>1 when shuffle is False - # (sort=True, split_out=2, shuffle_method=False) - with pytest.raises(ValueError): - gddf.groupby("a", sort=True).agg( - spec, shuffle_method=False, split_out=2 - ) - - # Check shuffle kwarg deprecation - with pytest.warns(match="'shuffle' keyword is deprecated"): - gddf.groupby("a", sort=True).agg(spec, shuffle=False) diff --git a/python/dask_cudf/dask_cudf/tests/test_onehot.py b/python/dask_cudf/dask_cudf/tests/test_onehot.py index 0b7c7855e07..428cad20183 100644 --- a/python/dask_cudf/dask_cudf/tests/test_onehot.py +++ b/python/dask_cudf/dask_cudf/tests/test_onehot.py @@ -8,12 +8,6 @@ import cudf import dask_cudf -from dask_cudf.tests.utils import xfail_dask_expr - -# No dask-expr support -pytestmark = xfail_dask_expr( - "Newer dask version needed", lt_version="2024.5.0" -) def test_get_dummies_cat(): diff --git a/python/dask_cudf/dask_cudf/tests/test_sort.py b/python/dask_cudf/dask_cudf/tests/test_sort.py index 02c815427f3..4ae75082ada 100644 --- a/python/dask_cudf/dask_cudf/tests/test_sort.py +++ b/python/dask_cudf/dask_cudf/tests/test_sort.py @@ -10,7 +10,6 @@ import cudf import dask_cudf -from dask_cudf.tests.utils import xfail_dask_expr @pytest.mark.parametrize("ascending", [True, False]) @@ -67,7 +66,6 @@ def test_sort_repartition(): dd.assert_eq(len(new_ddf), len(ddf)) -@xfail_dask_expr("missing null support", lt_version="2024.5.1") @pytest.mark.parametrize("na_position", ["first", "last"]) @pytest.mark.parametrize("ascending", [True, False]) @pytest.mark.parametrize("by", ["a", "b", ["a", "b"]]) diff --git a/python/dask_cudf/dask_cudf/tests/utils.py b/python/dask_cudf/dask_cudf/tests/utils.py index b44b3f939e7..8b28b4366b8 
100644 --- a/python/dask_cudf/dask_cudf/tests/utils.py +++ b/python/dask_cudf/dask_cudf/tests/utils.py @@ -2,21 +2,11 @@ import numpy as np import pandas as pd -import pytest -from packaging.version import Version -import dask import dask.dataframe as dd import cudf -from dask_cudf import QUERY_PLANNING_ON - -if QUERY_PLANNING_ON: - DASK_VERSION = Version(dask.__version__) -else: - DASK_VERSION = None - def _make_random_frame(nelem, npartitions=2, include_na=False): rng = np.random.default_rng(seed=0) @@ -30,26 +20,3 @@ def _make_random_frame(nelem, npartitions=2, include_na=False): gdf = cudf.DataFrame.from_pandas(df) dgf = dd.from_pandas(gdf, npartitions=npartitions) return df, dgf - - -_default_reason = "Not compatible with dask-expr" - - -def skip_dask_expr(reason=_default_reason, lt_version=None): - if lt_version is not None: - skip = QUERY_PLANNING_ON and DASK_VERSION < Version(lt_version) - else: - skip = QUERY_PLANNING_ON - return pytest.mark.skipif(skip, reason=reason) - - -def xfail_dask_expr(reason=_default_reason, lt_version=None): - if lt_version is not None: - xfail = QUERY_PLANNING_ON and DASK_VERSION < Version(lt_version) - else: - xfail = QUERY_PLANNING_ON - return pytest.mark.xfail(xfail, reason=reason) - - -def require_dask_expr(reason="requires dask-expr"): - return pytest.mark.skipif(not QUERY_PLANNING_ON, reason=reason) diff --git a/python/dask_cudf/pyproject.toml b/python/dask_cudf/pyproject.toml index 33ba8fe083f..1a25d616f74 100644 --- a/python/dask_cudf/pyproject.toml +++ b/python/dask_cudf/pyproject.toml @@ -39,10 +39,10 @@ classifiers = [ ] [project.entry-points."dask.dataframe.backends"] -cudf = "dask_cudf.backends:CudfBackendEntrypoint" +cudf = "dask_cudf.backends:LegacyCudfBackendEntrypoint" [project.entry-points."dask_expr.dataframe.backends"] -cudf = "dask_cudf.backends:CudfDXBackendEntrypoint" +cudf = "dask_cudf.backends:CudfBackendEntrypoint" [project.optional-dependencies] test = [ @@ -104,8 +104,5 @@ filterwarnings = [ # https://github.com/dask/partd/blob/main/partd/pandas.py#L198 "ignore:Passing a BlockManager to DataFrame is deprecated and will raise in a future version. Use public APIs instead.:DeprecationWarning", "ignore:String support for `aggregate_files` is experimental. Behavior may change in the future.:FutureWarning:dask", - # Dask now loudly throws warnings: https://github.com/dask/dask/pull/11437 - # When the legacy implementation is removed we can remove this warning and stop running pytests with `DASK_DATAFRAME__QUERY_PLANNING=False` - "ignore:The legacy Dask DataFrame implementation is deprecated and will be removed in a future version.*:FutureWarning", ] xfail_strict = true
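A minimal usage sketch of the consolidated dask_cudf API after this change, assuming dask_cudf>24.12 with query planning enabled (the only supported mode per the new check in __init__.py); the file paths below are hypothetical placeholders, not files from this repository.

import dask_cudf

# Parquet reads now always route through the dask-expr implementation
# (read_parquet_expr); the legacy reader path no longer exists.
df = dask_cudf.read_parquet("dataset/*.parquet")

# read_text replaces the removed dask_cudf._legacy.io.text module; extra
# keyword arguments (e.g. delimiter) are forwarded to cudf.read_text.
ser = dask_cudf.read_text("sample-text/*.pgn", delimiter=".")

# DataFrame.to_orc dispatches to dask_cudf.io.orc.to_orc; the module-level
# dask_cudf.to_orc helper is deprecated and only warns before forwarding.
df.to_orc("orc-output", write_index=False)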