diff --git a/python/cudf/cudf/_lib/CMakeLists.txt b/python/cudf/cudf/_lib/CMakeLists.txt index b402db0443d..8cec8af3c67 100644 --- a/python/cudf/cudf/_lib/CMakeLists.txt +++ b/python/cudf/cudf/_lib/CMakeLists.txt @@ -12,7 +12,7 @@ # the License. # ============================================================================= -set(cython_sources column.pyx copying.pyx groupby.pyx interop.pyx reduce.pyx scalar.pyx sort.pyx +set(cython_sources column.pyx copying.pyx groupby.pyx interop.pyx scalar.pyx sort.pyx stream_compaction.pyx string_casting.pyx strings_udf.pyx types.pyx utils.pyx ) set(linked_libraries cudf::cudf) diff --git a/python/cudf/cudf/_lib/__init__.py b/python/cudf/cudf/_lib/__init__.py index 0299b264189..001e5cbb676 100644 --- a/python/cudf/cudf/_lib/__init__.py +++ b/python/cudf/cudf/_lib/__init__.py @@ -5,7 +5,6 @@ copying, groupby, interop, - reduce, sort, stream_compaction, string_casting, diff --git a/python/cudf/cudf/_lib/copying.pyx b/python/cudf/cudf/_lib/copying.pyx index a7ea9c25a86..ef544dc89eb 100644 --- a/python/cudf/cudf/_lib/copying.pyx +++ b/python/cudf/cudf/_lib/copying.pyx @@ -12,8 +12,6 @@ from cudf._lib.scalar import as_device_scalar from cudf._lib.scalar cimport DeviceScalar -from cudf._lib.reduce import minmax - from pylibcudf.libcudf.types cimport size_type from cudf._lib.utils cimport columns_from_pylibcudf_table, data_from_pylibcudf_table @@ -34,7 +32,7 @@ def _gather_map_is_valid( """ if not check_bounds or nullify or len(gather_map) == 0: return True - gm_min, gm_max = minmax(gather_map) + gm_min, gm_max = gather_map.minmax() return gm_min >= -nrows and gm_max < nrows diff --git a/python/cudf/cudf/_lib/reduce.pyx b/python/cudf/cudf/_lib/reduce.pyx deleted file mode 100644 index 2850cab93a1..00000000000 --- a/python/cudf/cudf/_lib/reduce.pyx +++ /dev/null @@ -1,135 +0,0 @@ -# Copyright (c) 2020-2024, NVIDIA CORPORATION. -import warnings - -import cudf -from cudf.core.buffer import acquire_spill_lock - -from cudf._lib.column cimport Column -from cudf._lib.scalar cimport DeviceScalar -from cudf._lib.types cimport dtype_to_pylibcudf_type, is_decimal_type_id - -import pylibcudf - -from cudf.core._internals.aggregation import make_aggregation - - -@acquire_spill_lock() -def reduce(reduction_op, Column incol, dtype=None, **kwargs): - """ - Top level Cython reduce function wrapping libcudf reductions. - - Parameters - ---------- - reduction_op : string - A string specifying the operation, e.g. sum, prod - incol : Column - A cuDF Column object - dtype: numpy.dtype, optional - A numpy data type to use for the output, defaults - to the same type as the input column - """ - if dtype is not None: - warnings.warn( - "dtype is deprecated and will be remove in a future release. " - "Cast the result (e.g. .astype) after the operation instead.", - FutureWarning - ) - col_dtype = dtype - else: - col_dtype = incol._reduction_result_dtype(reduction_op) - - # check empty case - if len(incol) <= incol.null_count: - if reduction_op == 'sum' or reduction_op == 'sum_of_squares': - return incol.dtype.type(0) - if reduction_op == 'product': - return incol.dtype.type(1) - if reduction_op == "any": - return False - - return cudf.utils.dtypes._get_nan_for_dtype(col_dtype) - - result = pylibcudf.reduce.reduce( - incol.to_pylibcudf(mode="read"), - make_aggregation(reduction_op, kwargs).c_obj, - dtype_to_pylibcudf_type(col_dtype), - ) - - if is_decimal_type_id(result.type().id()): - scale = -result.type().scale() - precision = _reduce_precision(col_dtype, reduction_op, len(incol)) - return DeviceScalar.from_pylibcudf( - result, - dtype=col_dtype.__class__(precision, scale), - ).value - scalar = DeviceScalar.from_pylibcudf(result).value - if isinstance(col_dtype, cudf.StructDtype): - # TODO: Utilize column_metadata in libcudf to maintain field labels - return dict(zip(col_dtype.fields.keys(), scalar.values())) - return scalar - - -@acquire_spill_lock() -def scan(scan_op, Column incol, inclusive, **kwargs): - """ - Top level Cython scan function wrapping libcudf scans. - - Parameters - ---------- - incol : Column - A cuDF Column object - scan_op : string - A string specifying the operation, e.g. cumprod - inclusive: bool - Flag for including nulls in relevant scan - """ - return Column.from_pylibcudf( - pylibcudf.reduce.scan( - incol.to_pylibcudf(mode="read"), - make_aggregation(scan_op, kwargs).c_obj, - pylibcudf.reduce.ScanType.INCLUSIVE if inclusive - else pylibcudf.reduce.ScanType.EXCLUSIVE, - ) - ) - - -@acquire_spill_lock() -def minmax(Column incol): - """ - Top level Cython minmax function wrapping libcudf minmax. - - Parameters - ---------- - incol : Column - A cuDF Column object - - Returns - ------- - A pair of ``(min, max)`` values of ``incol`` - """ - min, max = pylibcudf.reduce.minmax(incol.to_pylibcudf(mode="read")) - return ( - cudf.Scalar.from_device_scalar(DeviceScalar.from_pylibcudf(min)), - cudf.Scalar.from_device_scalar(DeviceScalar.from_pylibcudf(max)), - ) - - -def _reduce_precision(dtype, op, nrows): - """ - Returns the result precision when performing the reduce - operation `op` for the given dtype and column size. - - See: https://docs.microsoft.com/en-us/sql/t-sql/data-types/precision-scale-and-length-transact-sql - """ # noqa: E501 - p = dtype.precision - if op in ("min", "max"): - new_p = p - elif op == "sum": - new_p = p + nrows - 1 - elif op == "product": - new_p = p * nrows + nrows - 1 - elif op == "sum_of_squares": - new_p = 2 * p + nrows - else: - raise NotImplementedError() - return max(min(new_p, dtype.MAX_PRECISION), 0) diff --git a/python/cudf/cudf/core/column/column.py b/python/cudf/cudf/core/column/column.py index 68307f0e109..42b4fda8be2 100644 --- a/python/cudf/cudf/core/column/column.py +++ b/python/cudf/cudf/core/column/column.py @@ -2,6 +2,7 @@ from __future__ import annotations +import warnings from collections import abc from collections.abc import MutableSequence, Sequence from functools import cached_property @@ -31,7 +32,7 @@ drop_duplicates, drop_nulls, ) -from cudf._lib.types import size_type_dtype +from cudf._lib.types import dtype_to_pylibcudf_type, size_type_dtype from cudf.api.types import ( _is_non_decimal_numeric_dtype, _is_pandas_nullable_extension_dtype, @@ -41,7 +42,7 @@ is_string_dtype, ) from cudf.core._compat import PANDAS_GE_210 -from cudf.core._internals import unary +from cudf.core._internals import aggregation, unary from cudf.core._internals.timezones import get_compatible_timezone from cudf.core.abc import Serializable from cudf.core.buffer import ( @@ -259,21 +260,17 @@ def all(self, skipna: bool = True) -> bool: # The skipna argument is only used for numerical columns. # If all entries are null the result is True, including when the column # is empty. - if self.null_count == self.size: return True - - return libcudf.reduce.reduce("all", self) + return self.reduce("all") def any(self, skipna: bool = True) -> bool: # Early exit for fast cases. - if not skipna and self.has_nulls(): return True elif skipna and self.null_count == self.size: return False - - return libcudf.reduce.reduce("any", self) + return self.reduce("any") def dropna(self) -> Self: if self.has_nulls(): @@ -1393,33 +1390,35 @@ def _reduce( ) if isinstance(preprocessed, ColumnBase): dtype = kwargs.pop("dtype", None) - return libcudf.reduce.reduce( - op, preprocessed, dtype=dtype, **kwargs - ) + return preprocessed.reduce(op, dtype, **kwargs) return preprocessed + def _can_return_nan(self, skipna: bool | None = None) -> bool: + return not skipna and self.has_nulls(include_nan=False) + def _process_for_reduction( self, skipna: bool | None = None, min_count: int = 0 ) -> ColumnBase | ScalarLike: - if skipna is None: - skipna = True + skipna = True if skipna is None else skipna - if self.has_nulls(): + if self._can_return_nan(skipna=skipna): + return cudf.utils.dtypes._get_nan_for_dtype(self.dtype) + + col = self.nans_to_nulls() if skipna else self + if col.has_nulls(): if skipna: - result_col = self.dropna() + col = col.dropna() else: return cudf.utils.dtypes._get_nan_for_dtype(self.dtype) - result_col = self - # TODO: If and when pandas decides to validate that `min_count` >= 0 we # should insert comparable behavior. # https://github.com/pandas-dev/pandas/issues/50022 if min_count > 0: - valid_count = len(result_col) - result_col.null_count + valid_count = len(col) - col.null_count if valid_count < min_count: return cudf.utils.dtypes._get_nan_for_dtype(self.dtype) - return result_col + return col def _reduction_result_dtype(self, reduction_op: str) -> Dtype: """ @@ -1529,6 +1528,91 @@ def one_hot_encode( for col in plc_table.columns() ) + @acquire_spill_lock() + def scan(self, scan_op: str, inclusive: bool, **kwargs) -> Self: + return type(self).from_pylibcudf( # type: ignore[return-value] + plc.reduce.scan( + self.to_pylibcudf(mode="read"), + aggregation.make_aggregation(scan_op, kwargs).c_obj, + plc.reduce.ScanType.INCLUSIVE + if inclusive + else plc.reduce.ScanType.EXCLUSIVE, + ) + ) + + def reduce(self, reduction_op: str, dtype=None, **kwargs) -> ScalarLike: + if dtype is not None: + warnings.warn( + "dtype is deprecated and will be remove in a future release. " + "Cast the result (e.g. .astype) after the operation instead.", + FutureWarning, + ) + col_dtype = dtype + else: + col_dtype = self._reduction_result_dtype(reduction_op) + + # check empty case + if len(self) <= self.null_count: + if reduction_op == "sum" or reduction_op == "sum_of_squares": + return self.dtype.type(0) + if reduction_op == "product": + return self.dtype.type(1) + if reduction_op == "any": + return False + + return cudf.utils.dtypes._get_nan_for_dtype(col_dtype) + + with acquire_spill_lock(): + plc_scalar = plc.reduce.reduce( + self.to_pylibcudf(mode="read"), + aggregation.make_aggregation(reduction_op, kwargs).c_obj, + dtype_to_pylibcudf_type(col_dtype), + ) + result_col = type(self).from_pylibcudf( + plc.Column.from_scalar(plc_scalar, 1) + ) + if plc_scalar.type().id() in { + plc.TypeId.DECIMAL128, + plc.TypeId.DECIMAL64, + plc.TypeId.DECIMAL32, + }: + scale = -plc_scalar.type().scale() + # https://docs.microsoft.com/en-us/sql/t-sql/data-types/precision-scale-and-length-transact-sql + p = col_dtype.precision + nrows = len(self) + if reduction_op in {"min", "max"}: + new_p = p + elif reduction_op == "sum": + new_p = p + nrows - 1 + elif reduction_op == "product": + new_p = p * nrows + nrows - 1 + elif reduction_op == "sum_of_squares": + new_p = 2 * p + nrows + else: + raise NotImplementedError( + f"{reduction_op} not implemented for decimal types." + ) + precision = max(min(new_p, col_dtype.MAX_PRECISION), 0) + new_dtype = type(col_dtype)(precision, scale) + result_col = result_col.astype(new_dtype) + elif isinstance(col_dtype, cudf.IntervalDtype): + result_col = type(self).from_struct_column( # type: ignore[attr-defined] + result_col, closed=col_dtype.closed + ) + return result_col.element_indexing(0) + + @acquire_spill_lock() + def minmax(self) -> tuple[ScalarLike, ScalarLike]: + min_val, max_val = plc.reduce.minmax(self.to_pylibcudf(mode="read")) + return ( + type(self) + .from_pylibcudf(plc.Column.from_scalar(min_val, 1)) + .element_indexing(0), + type(self) + .from_pylibcudf(plc.Column.from_scalar(max_val, 1)) + .element_indexing(0), + ) + def _has_any_nan(arbitrary: pd.Series | np.ndarray) -> bool: """Check if an object dtype Series or array contains NaN.""" diff --git a/python/cudf/cudf/core/column/interval.py b/python/cudf/cudf/core/column/interval.py index 34975fc94f4..dd8f58a118e 100644 --- a/python/cudf/cudf/core/column/interval.py +++ b/python/cudf/cudf/core/column/interval.py @@ -14,7 +14,6 @@ if TYPE_CHECKING: from typing_extensions import Self - from cudf._typing import ScalarLike from cudf.core.buffer import Buffer from cudf.core.column import ColumnBase @@ -211,16 +210,3 @@ def element_indexing(self, index: int): if cudf.get_option("mode.pandas_compatible"): return pd.Interval(**result, closed=self.dtype.closed) return result - - def _reduce( - self, - op: str, - skipna: bool | None = None, - min_count: int = 0, - *args, - **kwargs, - ) -> ScalarLike: - result = super()._reduce(op, skipna, min_count, *args, **kwargs) - if cudf.get_option("mode.pandas_compatible"): - return pd.Interval(**result, closed=self.dtype.closed) - return result diff --git a/python/cudf/cudf/core/column/numerical.py b/python/cudf/cudf/core/column/numerical.py index 790cd6ea9bb..28a2bd7fa6c 100644 --- a/python/cudf/cudf/core/column/numerical.py +++ b/python/cudf/cudf/core/column/numerical.py @@ -420,22 +420,12 @@ def all(self, skipna: bool = True) -> bool: # If all entries are null the result is True, including when the column # is empty. result_col = self.nans_to_nulls() if skipna else self - - if result_col.null_count == result_col.size: - return True - - return libcudf.reduce.reduce("all", result_col) + return super(type(self), result_col).all(skipna=skipna) def any(self, skipna: bool = True) -> bool: # Early exit for fast cases. result_col = self.nans_to_nulls() if skipna else self - - if not skipna and result_col.has_nulls(): - return True - elif skipna and result_col.null_count == result_col.size: - return False - - return libcudf.reduce.reduce("any", result_col) + return super(type(self), result_col).any(skipna=skipna) @functools.cached_property def nan_count(self) -> int: @@ -483,19 +473,6 @@ def _process_values_for_isin( def _can_return_nan(self, skipna: bool | None = None) -> bool: return not skipna and self.has_nulls(include_nan=True) - def _process_for_reduction( - self, skipna: bool | None = None, min_count: int = 0 - ) -> NumericalColumn | ScalarLike: - skipna = True if skipna is None else skipna - - if self._can_return_nan(skipna=skipna): - return cudf.utils.dtypes._get_nan_for_dtype(self.dtype) - - col = self.nans_to_nulls() if skipna else self - return super(NumericalColumn, col)._process_for_reduction( - skipna=skipna, min_count=min_count - ) - def find_and_replace( self, to_replace: ColumnLike, diff --git a/python/cudf/cudf/core/column/numerical_base.py b/python/cudf/cudf/core/column/numerical_base.py index 3f9abdabc2f..e06a0447f5c 100644 --- a/python/cudf/cudf/core/column/numerical_base.py +++ b/python/cudf/cudf/core/column/numerical_base.py @@ -263,6 +263,6 @@ def round( ) def _scan(self, op: str) -> ColumnBase: - return libcudf.reduce.scan( - op.replace("cum", ""), self, True - )._with_type_metadata(self.dtype) + return self.scan(op.replace("cum", ""), True)._with_type_metadata( + self.dtype + ) diff --git a/python/cudf/cudf/core/column/struct.py b/python/cudf/cudf/core/column/struct.py index db6ad72ab56..ba765b50729 100644 --- a/python/cudf/cudf/core/column/struct.py +++ b/python/cudf/cudf/core/column/struct.py @@ -107,12 +107,9 @@ def memory_usage(self) -> int: return n - def element_indexing(self, index: int): + def element_indexing(self, index: int) -> dict: result = super().element_indexing(index) - return { - field: value - for field, value in zip(self.dtype.fields, result.values()) - } + return dict(zip(self.dtype.fields, result.values())) def __setitem__(self, key, value): if isinstance(value, dict): diff --git a/python/cudf/cudf/core/copy_types.py b/python/cudf/cudf/core/copy_types.py index 16d8964f083..4b6ad59c8e1 100644 --- a/python/cudf/cudf/core/copy_types.py +++ b/python/cudf/cudf/core/copy_types.py @@ -5,7 +5,6 @@ from typing_extensions import Self import cudf -import cudf._lib as libcudf from cudf._lib.types import size_type_dtype if TYPE_CHECKING: @@ -70,8 +69,8 @@ def __init__(self, column: Any, nrows: int, *, nullify: bool): if self.column.dtype.kind not in {"i", "u"}: raise TypeError("Gather map must have integer dtype") if not nullify: - lo, hi = libcudf.reduce.minmax(self.column) - if lo.value < -nrows or hi.value >= nrows: + lo, hi = self.column.minmax() + if lo < -nrows or hi >= nrows: raise IndexError( f"Gather map is out of bounds for [0, {nrows})" ) diff --git a/python/cudf/cudf/core/dataframe.py b/python/cudf/cudf/core/dataframe.py index b74128a8a61..8cdc45e12da 100644 --- a/python/cudf/cudf/core/dataframe.py +++ b/python/cudf/cudf/core/dataframe.py @@ -2505,16 +2505,7 @@ def scatter_by_map( ) if map_index.size > 0: - plc_lo, plc_hi = plc.reduce.minmax( - map_index.to_pylibcudf(mode="read") - ) - # TODO: Use pylibcudf Scalar once APIs are more developed - lo = libcudf.column.Column.from_pylibcudf( - plc.Column.from_scalar(plc_lo, 1) - ).element_indexing(0) - hi = libcudf.column.Column.from_pylibcudf( - plc.Column.from_scalar(plc_hi, 1) - ).element_indexing(0) + lo, hi = map_index.minmax() if lo < 0 or hi >= map_size: raise ValueError("Partition map has invalid values") diff --git a/python/cudf/cudf/core/multiindex.py b/python/cudf/cudf/core/multiindex.py index 5a41a33e583..f5ee36f851c 100644 --- a/python/cudf/cudf/core/multiindex.py +++ b/python/cudf/cudf/core/multiindex.py @@ -191,12 +191,12 @@ def __init__( source_data = {} for i, (code, level) in enumerate(zip(new_codes, new_levels)): if len(code): - lo, hi = libcudf.reduce.minmax(code) - if lo.value < -1 or hi.value > len(level) - 1: + lo, hi = code.minmax() + if lo < -1 or hi > len(level) - 1: raise ValueError( f"Codes must be -1 <= codes <= {len(level) - 1}" ) - if lo.value == -1: + if lo == -1: # Now we can gather and insert null automatically code[code == -1] = np.iinfo(size_type_dtype).min result_col = libcudf.copying.gather( diff --git a/python/cudf/cudf/core/window/ewm.py b/python/cudf/cudf/core/window/ewm.py index 094df955273..c4a063a50e8 100644 --- a/python/cudf/cudf/core/window/ewm.py +++ b/python/cudf/cudf/core/window/ewm.py @@ -6,7 +6,6 @@ import numpy as np -from cudf._lib.reduce import scan from cudf.api.types import is_numeric_dtype from cudf.core.window.rolling import _RollingBase @@ -194,13 +193,8 @@ def _apply_agg_column( # as such we need to convert the nans to nulls before # passing them in. to_libcudf_column = source_column.astype("float64").nans_to_nulls() - - return scan( - agg_name, - to_libcudf_column, - True, - com=self.com, - adjust=self.adjust, + return to_libcudf_column.scan( + agg_name, True, com=self.com, adjust=self.adjust )