From 0e01dbd2d8b2a048964d27d1fd9d9c3bf6540166 Mon Sep 17 00:00:00 2001 From: David Wendt <45795991+davidwendt@users.noreply.github.com> Date: Thu, 19 Dec 2024 10:03:12 -0500 Subject: [PATCH 1/9] Fix return types for MurmurHash3_x86_32 template specializations (#17622) Changes the return types for the `MurmurHash3_x86_32` specialization functions to match the declarations in the struct. Hoping to fix some intellisense squiggles. Authors: - David Wendt (https://github.com/davidwendt) Approvers: - Tianyu Liu (https://github.com/kingcrimsontianyu) - Yunsong Wang (https://github.com/PointKernel) - Nghia Truong (https://github.com/ttnghia) URL: https://github.com/rapidsai/cudf/pull/17622 --- .../hashing/detail/murmurhash3_x86_32.cuh | 39 ++++++++++++------- 1 file changed, 24 insertions(+), 15 deletions(-) diff --git a/cpp/include/cudf/hashing/detail/murmurhash3_x86_32.cuh b/cpp/include/cudf/hashing/detail/murmurhash3_x86_32.cuh index e0c7ce840d7..69edf38e359 100644 --- a/cpp/include/cudf/hashing/detail/murmurhash3_x86_32.cuh +++ b/cpp/include/cudf/hashing/detail/murmurhash3_x86_32.cuh @@ -57,62 +57,71 @@ struct MurmurHash3_x86_32 { }; template <> -hash_value_type __device__ inline MurmurHash3_x86_32::operator()(bool const& key) const +MurmurHash3_x86_32::result_type __device__ inline MurmurHash3_x86_32::operator()( + bool const& key) const { return this->compute(static_cast(key)); } template <> -hash_value_type __device__ inline MurmurHash3_x86_32::operator()(float const& key) const +MurmurHash3_x86_32::result_type __device__ inline MurmurHash3_x86_32::operator()( + float const& key) const { return this->compute(normalize_nans_and_zeros(key)); } template <> -hash_value_type __device__ inline MurmurHash3_x86_32::operator()(double const& key) const +MurmurHash3_x86_32::result_type __device__ inline MurmurHash3_x86_32::operator()( + double const& key) const { return this->compute(normalize_nans_and_zeros(key)); } template <> -hash_value_type __device__ inline MurmurHash3_x86_32::operator()( - cudf::string_view const& key) const +MurmurHash3_x86_32::result_type + __device__ inline MurmurHash3_x86_32::operator()( + cudf::string_view const& key) const { return this->compute_bytes(reinterpret_cast(key.data()), key.size_bytes()); } template <> -hash_value_type __device__ inline MurmurHash3_x86_32::operator()( - numeric::decimal32 const& key) const +MurmurHash3_x86_32::result_type + __device__ inline MurmurHash3_x86_32::operator()( + numeric::decimal32 const& key) const { return this->compute(key.value()); } template <> -hash_value_type __device__ inline MurmurHash3_x86_32::operator()( - numeric::decimal64 const& key) const +MurmurHash3_x86_32::result_type + __device__ inline MurmurHash3_x86_32::operator()( + numeric::decimal64 const& key) const { return this->compute(key.value()); } template <> -hash_value_type __device__ inline MurmurHash3_x86_32::operator()( - numeric::decimal128 const& key) const +MurmurHash3_x86_32::result_type + __device__ inline MurmurHash3_x86_32::operator()( + numeric::decimal128 const& key) const { return this->compute(key.value()); } template <> -hash_value_type __device__ inline MurmurHash3_x86_32::operator()( - cudf::list_view const& key) const +MurmurHash3_x86_32::result_type + __device__ inline MurmurHash3_x86_32::operator()( + cudf::list_view const& key) const { CUDF_UNREACHABLE("List column hashing is not supported"); } template <> -hash_value_type __device__ inline MurmurHash3_x86_32::operator()( - cudf::struct_view const& key) const +MurmurHash3_x86_32::result_type + __device__ inline MurmurHash3_x86_32::operator()( + cudf::struct_view const& key) const { CUDF_UNREACHABLE("Direct hashing of struct_view is not supported"); } From 8e9254bcba89aa8cf73f6c9ce33213f2c0a7207d Mon Sep 17 00:00:00 2001 From: GALI PREM SAGAR Date: Thu, 19 Dec 2024 22:08:51 +0530 Subject: [PATCH 2/9] Add ability to modify and propagate `names` of `columns` object (#17597) Fixes: #17482, #14012 This PR fixes a long-standing issue where modifying `columns` `name` never propagates to the parent object. This PR fixes this issue by making `to_pandas_index` a cached-property and accessing it's names if this property was ever invoked in `level_names` property. Authors: - GALI PREM SAGAR (https://github.com/galipremsagar) Approvers: - Matthew Roeschke (https://github.com/mroeschke) URL: https://github.com/rapidsai/cudf/pull/17597 --- python/cudf/cudf/core/_base_index.py | 2 +- python/cudf/cudf/core/column_accessor.py | 17 +++++-- python/cudf/cudf/core/dataframe.py | 46 +++++++++---------- python/cudf/cudf/core/groupby/groupby.py | 2 +- python/cudf/cudf/core/indexed_frame.py | 12 ++--- python/cudf/cudf/core/multiindex.py | 4 +- python/cudf/cudf/core/reshape.py | 9 ++-- python/cudf/cudf/testing/testing.py | 4 +- .../cudf/cudf/tests/test_column_accessor.py | 6 +-- python/cudf/cudf/tests/test_dataframe.py | 29 ++++++++++++ 10 files changed, 85 insertions(+), 46 deletions(-) diff --git a/python/cudf/cudf/core/_base_index.py b/python/cudf/cudf/core/_base_index.py index f4543bc6156..c2f3c782d10 100644 --- a/python/cudf/cudf/core/_base_index.py +++ b/python/cudf/cudf/core/_base_index.py @@ -1447,7 +1447,7 @@ def _union(self, other, sort=None): other_df["order"] = other_df.index res = self_df.merge(other_df, on=[0], how="outer") res = res.sort_values( - by=res._data.to_pandas_index()[1:], ignore_index=True + by=res._data.to_pandas_index[1:], ignore_index=True ) union_result = cudf.core.index._index_from_data({0: res._data[0]}) diff --git a/python/cudf/cudf/core/column_accessor.py b/python/cudf/cudf/core/column_accessor.py index e4fd82e819b..aaf7d071dff 100644 --- a/python/cudf/cudf/core/column_accessor.py +++ b/python/cudf/cudf/core/column_accessor.py @@ -207,11 +207,16 @@ def _from_columns_like_self( @property def level_names(self) -> tuple[abc.Hashable, ...]: + if self.is_cached("to_pandas_index"): + return self.to_pandas_index.names if self._level_names is None or len(self._level_names) == 0: return tuple((None,) * max(1, self.nlevels)) else: return self._level_names + def is_cached(self, attr_name: str) -> bool: + return attr_name in self.__dict__ + @property def nlevels(self) -> int: if len(self) == 0: @@ -262,7 +267,12 @@ def _clear_cache(self, old_ncols: int, new_ncols: int) -> None: new_ncols: int len(self) after self._data was modified """ - cached_properties = ("columns", "names", "_grouped_data") + cached_properties = ( + "columns", + "names", + "_grouped_data", + "to_pandas_index", + ) for attr in cached_properties: try: self.__delattr__(attr) @@ -276,6 +286,7 @@ def _clear_cache(self, old_ncols: int, new_ncols: int) -> None: except AttributeError: pass + @cached_property def to_pandas_index(self) -> pd.Index: """Convert the keys of the ColumnAccessor to a Pandas Index object.""" if self.multiindex and len(self.level_names) > 0: @@ -726,10 +737,10 @@ def droplevel(self, level: int) -> None: } new_ncols = len(self) self._level_names = ( - self._level_names[:level] + self._level_names[level + 1 :] + self.level_names[:level] + self.level_names[level + 1 :] ) - if len(self._level_names) == 1: + if len(self.level_names) == 1: # can't use nlevels, as it depends on multiindex self.multiindex = False self._clear_cache(old_ncols, new_ncols) diff --git a/python/cudf/cudf/core/dataframe.py b/python/cudf/cudf/core/dataframe.py index e66e4f41642..3334b57ce1b 100644 --- a/python/cudf/cudf/core/dataframe.py +++ b/python/cudf/cudf/core/dataframe.py @@ -961,7 +961,7 @@ def _init_from_series_list(self, data, columns, index): warnings.simplefilter("ignore", FutureWarning) concat_df = cudf.concat(data, axis=1) - cols = concat_df._data.to_pandas_index() + cols = concat_df._data.to_pandas_index if cols.dtype == "object": concat_df.columns = cols.astype("str") @@ -2092,7 +2092,7 @@ def _make_operands_and_index_for_binop( equal_columns = True elif isinstance(other, Series): if ( - not (self_pd_columns := self._data.to_pandas_index()).equals( + not (self_pd_columns := self._data.to_pandas_index).equals( other_pd_index := other.index.to_pandas() ) and not can_reindex @@ -2117,8 +2117,8 @@ def _make_operands_and_index_for_binop( and fn in cudf.utils.utils._EQUALITY_OPS and ( not self.index.equals(other.index) - or not self._data.to_pandas_index().equals( - other._data.to_pandas_index() + or not self._data.to_pandas_index.equals( + other._data.to_pandas_index ) ) ): @@ -2162,11 +2162,11 @@ def _make_operands_and_index_for_binop( if not equal_columns: if isinstance(other, DataFrame): - column_names_list = self._data.to_pandas_index().join( - other._data.to_pandas_index(), how="outer" + column_names_list = self._data.to_pandas_index.join( + other._data.to_pandas_index, how="outer" ) elif isinstance(other, Series): - column_names_list = self._data.to_pandas_index().join( + column_names_list = self._data.to_pandas_index.join( other.index.to_pandas(), how="outer" ) else: @@ -2626,8 +2626,8 @@ def update( if not isinstance(other, DataFrame): other = DataFrame(other) - self_cols = self._data.to_pandas_index() - if not self_cols.equals(other._data.to_pandas_index()): + self_cols = self._data.to_pandas_index + if not self_cols.equals(other._data.to_pandas_index): other = other.reindex(self_cols, axis=1) if not self.index.equals(other.index): other = other.reindex(self.index, axis=0) @@ -2663,7 +2663,7 @@ def __iter__(self): def __contains__(self, item): # This must check against containment in the pandas Index and not # self._column_names to handle NA, None, nan, etc. correctly. - return item in self._data.to_pandas_index() + return item in self._data.to_pandas_index @_performance_tracking def items(self): @@ -2700,14 +2700,14 @@ def at(self): @property # type: ignore @_external_only_api( - "Use _column_names instead, or _data.to_pandas_index() if a pandas " + "Use _column_names instead, or _data.to_pandas_index if a pandas " "index is absolutely necessary. For checking if the columns are a " "MultiIndex, use _data.multiindex." ) @_performance_tracking def columns(self): """Returns a tuple of columns""" - return self._data.to_pandas_index() + return self._data.to_pandas_index @columns.setter # type: ignore @_performance_tracking @@ -2916,7 +2916,7 @@ def reindex( df = self else: columns = cudf.Index(columns) - intersection = self._data.to_pandas_index().intersection( + intersection = self._data.to_pandas_index.intersection( columns.to_pandas() ) df = self.loc[:, intersection] @@ -3430,7 +3430,7 @@ def axes(self): Index(['key', 'k2', 'val', 'temp'], dtype='object')] """ - return [self.index, self._data.to_pandas_index()] + return [self.index, self._data.to_pandas_index] def diff(self, periods=1, axis=0): """ @@ -4129,7 +4129,7 @@ def transpose(self): Not supporting *copy* because default and only behavior is copy=True """ - index = self._data.to_pandas_index() + index = self._data.to_pandas_index columns = self.index.copy(deep=False) if self._num_columns == 0 or self._num_rows == 0: return DataFrame(index=index, columns=columns) @@ -5535,7 +5535,7 @@ def to_pandas( } out_df = pd.DataFrame(out_data, index=out_index) - out_df.columns = self._data.to_pandas_index() + out_df.columns = self._data.to_pandas_index return out_df @@ -6487,7 +6487,7 @@ def _reduce( source = self._get_columns_by_label(numeric_cols) if source.empty: return Series( - index=self._data.to_pandas_index()[:0] + index=self._data.to_pandas_index[:0] if axis == 0 else source.index, dtype="float64", @@ -6540,7 +6540,7 @@ def _reduce( "Columns must all have the same dtype to " f"perform {op=} with {axis=}" ) - pd_index = source._data.to_pandas_index() + pd_index = source._data.to_pandas_index if source._data.multiindex: idx = MultiIndex.from_pandas(pd_index) else: @@ -7242,7 +7242,7 @@ def stack( ] has_unnamed_levels = len(unnamed_levels_indices) > 0 - column_name_idx = self._data.to_pandas_index() + column_name_idx = self._data.to_pandas_index # Construct new index from the levels specified by `level` named_levels = pd.MultiIndex.from_arrays( [column_name_idx.get_level_values(lv) for lv in level_indices] @@ -7432,7 +7432,7 @@ def cov(self, min_periods=None, ddof: int = 1, numeric_only: bool = False): ) cov = cupy.cov(self.values, ddof=ddof, rowvar=False) - cols = self._data.to_pandas_index() + cols = self._data.to_pandas_index df = DataFrame(cupy.asfortranarray(cov), index=cols) df._set_columns_like(self._data) return df @@ -7475,7 +7475,7 @@ def corr( ) corr = cupy.corrcoef(values, rowvar=False) - cols = self._data.to_pandas_index() + cols = self._data.to_pandas_index df = DataFrame(cupy.asfortranarray(corr), index=cols) df._set_columns_like(self._data) return df @@ -7544,7 +7544,7 @@ def keys(self): >>> df.keys() Index([0, 1, 2, 3], dtype='int64') """ - return self._data.to_pandas_index() + return self._data.to_pandas_index def itertuples(self, index=True, name="Pandas"): """ @@ -7778,7 +7778,7 @@ def nunique(self, axis=0, dropna: bool = True) -> Series: raise NotImplementedError("axis parameter is not supported yet.") counts = [col.distinct_count(dropna=dropna) for col in self._columns] return self._constructor_sliced( - counts, index=self._data.to_pandas_index() + counts, index=self._data.to_pandas_index ) def _sample_axis_1( diff --git a/python/cudf/cudf/core/groupby/groupby.py b/python/cudf/cudf/core/groupby/groupby.py index 6cd8e11695f..e8a9e599cb0 100644 --- a/python/cudf/cudf/core/groupby/groupby.py +++ b/python/cudf/cudf/core/groupby/groupby.py @@ -3087,7 +3087,7 @@ def agg(self, func, *args, engine=None, engine_kwargs=None, **kwargs): # drop the first level if we have a multiindex if result._data.nlevels > 1: - result.columns = result._data.to_pandas_index().droplevel(0) + result.columns = result._data.to_pandas_index.droplevel(0) return result diff --git a/python/cudf/cudf/core/indexed_frame.py b/python/cudf/cudf/core/indexed_frame.py index 72bb85821fa..6854cb02aa5 100644 --- a/python/cudf/cudf/core/indexed_frame.py +++ b/python/cudf/cudf/core/indexed_frame.py @@ -1106,13 +1106,11 @@ def dot(self, other, reflect=False): lhs = self.reindex(index=common, copy=False).values rhs = other.reindex(index=common, copy=False).values if isinstance(other, cudf.DataFrame): - result_index = other._data.to_pandas_index() + result_index = other._data.to_pandas_index elif isinstance(self, cudf.DataFrame) and isinstance( other, (cudf.Series, cudf.DataFrame) ): - common = self._data.to_pandas_index().union( - other.index.to_pandas() - ) + common = self._data.to_pandas_index.union(other.index.to_pandas()) if len(common) > self._num_columns or len(common) > len( other.index ): @@ -1124,7 +1122,7 @@ def dot(self, other, reflect=False): rhs = other.reindex(index=common, copy=False).values lhs = lhs.values if isinstance(other, cudf.DataFrame): - result_cols = other._data.to_pandas_index() + result_cols = other._data.to_pandas_index elif isinstance( other, (cp.ndarray, np.ndarray) @@ -2244,7 +2242,7 @@ def truncate(self, before=None, after=None, axis=0, copy=True): if not copy: raise ValueError("Truncating with copy=False is not supported.") axis = self._get_axis_from_axis_arg(axis) - ax = self.index if axis == 0 else self._data.to_pandas_index() + ax = self.index if axis == 0 else self._data.to_pandas_index if not ax.is_monotonic_increasing and not ax.is_monotonic_decreasing: raise ValueError("truncate requires a sorted index") @@ -6770,7 +6768,7 @@ def _drop_rows_by_labels( return obj.__class__._from_data( join_res.iloc[:, idx_nlv:]._data, index=midx, - columns=obj._data.to_pandas_index(), + columns=obj._data.to_pandas_index, ) else: diff --git a/python/cudf/cudf/core/multiindex.py b/python/cudf/cudf/core/multiindex.py index d2afe643dc4..1e613e49ffc 100644 --- a/python/cudf/cudf/core/multiindex.py +++ b/python/cudf/cudf/core/multiindex.py @@ -1123,7 +1123,7 @@ def _concat(cls, objs) -> Self: # TODO: Verify if this is really necessary or if we can rely on # DataFrame._concat. if len(source_data) > 1: - colnames = source_data[0]._data.to_pandas_index() + colnames = source_data[0]._data.to_pandas_index for obj in source_data[1:]: obj.columns = colnames @@ -2068,7 +2068,7 @@ def _union(self, other, sort=None) -> Self: result_df = self_df.merge(other_df, on=col_names, how="outer") result_df = result_df.sort_values( - by=result_df._data.to_pandas_index()[self.nlevels :], + by=result_df._data.to_pandas_index[self.nlevels :], ignore_index=True, ) diff --git a/python/cudf/cudf/core/reshape.py b/python/cudf/cudf/core/reshape.py index 3ab6ed306b6..0abd42d4d4e 100644 --- a/python/cudf/cudf/core/reshape.py +++ b/python/cudf/cudf/core/reshape.py @@ -431,8 +431,9 @@ def concat( result_columns = ( objs[0] - ._data.to_pandas_index() - .append([obj._data.to_pandas_index() for obj in objs[1:]]) + ._data.to_pandas_index.append( + [obj._data.to_pandas_index for obj in objs[1:]] + ) .unique() ) @@ -689,7 +690,7 @@ def _tile(A, reps): if not value_vars: # TODO: Use frame._data.label_dtype when it's more consistently set var_data = cudf.Series( - value_vars, dtype=frame._data.to_pandas_index().dtype + value_vars, dtype=frame._data.to_pandas_index.dtype ) else: var_data = ( @@ -1273,7 +1274,7 @@ def unstack(df, level, fill_value=None, sort: bool = True): res = df.T.stack(future_stack=False) # Result's index is a multiindex res.index.names = ( - tuple(df._data.to_pandas_index().names) + df.index.names + tuple(df._data.to_pandas_index.names) + df.index.names ) return res else: diff --git a/python/cudf/cudf/testing/testing.py b/python/cudf/cudf/testing/testing.py index 0b09cf7dc34..a1df2c7d857 100644 --- a/python/cudf/cudf/testing/testing.py +++ b/python/cudf/cudf/testing/testing.py @@ -692,8 +692,8 @@ def assert_frame_equal( ) pd.testing.assert_index_equal( - left._data.to_pandas_index(), - right._data.to_pandas_index(), + left._data.to_pandas_index, + right._data.to_pandas_index, exact=check_column_type, check_names=check_names, check_exact=check_exact, diff --git a/python/cudf/cudf/tests/test_column_accessor.py b/python/cudf/cudf/tests/test_column_accessor.py index 5cef077c18d..27ec4fcd1f3 100644 --- a/python/cudf/cudf/tests/test_column_accessor.py +++ b/python/cudf/cudf/tests/test_column_accessor.py @@ -64,7 +64,7 @@ def test_to_pandas_simple(simple_data): # Index([], dtype='object'), and `integer` for RangeIndex() # to ignore this `inferred_type` comparison, we pass exact=False. assert_eq( - ca.to_pandas_index(), + ca.to_pandas_index, pd.DataFrame( {key: value.values_host for key, value in simple_data.items()} ).columns, @@ -75,7 +75,7 @@ def test_to_pandas_simple(simple_data): def test_to_pandas_multiindex(mi_data): ca = ColumnAccessor(mi_data, multiindex=True) assert_eq( - ca.to_pandas_index(), + ca.to_pandas_index, pd.DataFrame( {key: value.values_host for key, value in mi_data.items()} ).columns, @@ -89,7 +89,7 @@ def test_to_pandas_multiindex_names(): level_names=("foo", "bar"), ) assert_eq( - ca.to_pandas_index(), + ca.to_pandas_index, pd.MultiIndex.from_tuples( (("a", "b"), ("c", "d")), names=("foo", "bar") ), diff --git a/python/cudf/cudf/tests/test_dataframe.py b/python/cudf/cudf/tests/test_dataframe.py index d04fd97dcbd..11a9b398b50 100644 --- a/python/cudf/cudf/tests/test_dataframe.py +++ b/python/cudf/cudf/tests/test_dataframe.py @@ -11193,3 +11193,32 @@ def test_dataframe_init_column(): expect = cudf.DataFrame({"a": s}) actual = cudf.DataFrame._from_arrays(s._column, columns=["a"]) assert_eq(expect, actual) + + +@pytest.mark.parametrize("name", [None, "foo", 1, 1.0]) +def test_dataframe_column_name(name): + df = cudf.DataFrame({"a": [1, 2, 3]}) + pdf = df.to_pandas() + + df.columns.name = name + pdf.columns.name = name + + assert_eq(df, pdf) + assert_eq(df.columns.name, pdf.columns.name) + + +@pytest.mark.parametrize("names", [["abc", "def"], [1, 2], ["abc", 10]]) +def test_dataframe_multiindex_column_names(names): + arrays = [["A", "A", "B", "B"], ["one", "two", "one", "two"]] + tuples = list(zip(*arrays)) + index = pd.MultiIndex.from_tuples(tuples, names=["first", "second"]) + + pdf = pd.DataFrame([[1, 2, 3, 4], [5, 6, 7, 8]], columns=index) + df = cudf.from_pandas(pdf) + + assert_eq(df, pdf) + assert_eq(df.columns.names, pdf.columns.names) + pdf.columns.names = names + df.columns.names = names + assert_eq(df, pdf) + assert_eq(df.columns.names, pdf.columns.names) From 989fac48c06a18c4ff245aea46ef1a304369111d Mon Sep 17 00:00:00 2001 From: Matthew Roeschke <10647082+mroeschke@users.noreply.github.com> Date: Thu, 19 Dec 2024 11:22:17 -0800 Subject: [PATCH 3/9] Remove cudf._lib.groupby in favor of inlining pylibcudf (#17582) Contributes to https://github.com/rapidsai/cudf/issues/17317 Authors: - Matthew Roeschke (https://github.com/mroeschke) Approvers: - Matthew Murray (https://github.com/Matt711) URL: https://github.com/rapidsai/cudf/pull/17582 --- python/cudf/cudf/_lib/CMakeLists.txt | 2 +- python/cudf/cudf/_lib/__init__.py | 5 +- python/cudf/cudf/_lib/groupby.pyx | 281 --------------- .../cudf/cudf/core/_internals/aggregation.py | 4 +- python/cudf/cudf/core/column/column.py | 4 +- python/cudf/cudf/core/groupby/groupby.py | 335 ++++++++++++++++-- python/cudf/cudf/core/window/rolling.py | 2 +- python/cudf/cudf/tests/test_groupby.py | 4 +- 8 files changed, 316 insertions(+), 321 deletions(-) delete mode 100644 python/cudf/cudf/_lib/groupby.pyx diff --git a/python/cudf/cudf/_lib/CMakeLists.txt b/python/cudf/cudf/_lib/CMakeLists.txt index 410fd57691e..da4faabf189 100644 --- a/python/cudf/cudf/_lib/CMakeLists.txt +++ b/python/cudf/cudf/_lib/CMakeLists.txt @@ -12,7 +12,7 @@ # the License. # ============================================================================= -set(cython_sources column.pyx groupby.pyx scalar.pyx strings_udf.pyx types.pyx utils.pyx) +set(cython_sources column.pyx scalar.pyx strings_udf.pyx types.pyx utils.pyx) set(linked_libraries cudf::cudf) rapids_cython_create_modules( diff --git a/python/cudf/cudf/_lib/__init__.py b/python/cudf/cudf/_lib/__init__.py index 6b5a7814e48..10f9d813ccc 100644 --- a/python/cudf/cudf/_lib/__init__.py +++ b/python/cudf/cudf/_lib/__init__.py @@ -1,10 +1,7 @@ # Copyright (c) 2020-2024, NVIDIA CORPORATION. import numpy as np -from . import ( - groupby, - strings_udf, -) +from . import strings_udf MAX_COLUMN_SIZE = np.iinfo(np.int32).max MAX_COLUMN_SIZE_STR = "INT32_MAX" diff --git a/python/cudf/cudf/_lib/groupby.pyx b/python/cudf/cudf/_lib/groupby.pyx deleted file mode 100644 index 80a77ef2267..00000000000 --- a/python/cudf/cudf/_lib/groupby.pyx +++ /dev/null @@ -1,281 +0,0 @@ -# Copyright (c) 2020-2024, NVIDIA CORPORATION. -from functools import singledispatch - -from pandas.errors import DataError - -from cudf.api.types import _is_categorical_dtype, is_string_dtype -from cudf.core.buffer import acquire_spill_lock -from cudf.core.dtypes import ( - CategoricalDtype, - DecimalDtype, - IntervalDtype, - ListDtype, - StructDtype, -) - -from cudf._lib.scalar cimport DeviceScalar -from cudf._lib.utils cimport columns_from_pylibcudf_table - -from cudf._lib.scalar import as_device_scalar - -import pylibcudf - -from cudf.core._internals.aggregation import make_aggregation - -# The sets below define the possible aggregations that can be performed on -# different dtypes. These strings must be elements of the AggregationKind enum. -# The libcudf infrastructure exists for "COLLECT" support on -# categoricals, but the dtype support in python does not. -_CATEGORICAL_AGGS = {"COUNT", "NUNIQUE", "SIZE", "UNIQUE"} -_STRING_AGGS = { - "COLLECT", - "COUNT", - "MAX", - "MIN", - "NTH", - "NUNIQUE", - "SIZE", - "UNIQUE", -} -_LIST_AGGS = {"COLLECT"} -_STRUCT_AGGS = {"COLLECT", "CORRELATION", "COVARIANCE"} -_INTERVAL_AGGS = {"COLLECT"} -_DECIMAL_AGGS = { - "ARGMIN", - "ARGMAX", - "COLLECT", - "COUNT", - "MAX", - "MIN", - "NTH", - "NUNIQUE", - "SUM", -} - - -@singledispatch -def get_valid_aggregation(dtype): - if is_string_dtype(dtype): - return _STRING_AGGS - return "ALL" - - -@get_valid_aggregation.register -def _(dtype: ListDtype): - return _LIST_AGGS - - -@get_valid_aggregation.register -def _(dtype: CategoricalDtype): - return _CATEGORICAL_AGGS - - -@get_valid_aggregation.register -def _(dtype: ListDtype): - return _LIST_AGGS - - -@get_valid_aggregation.register -def _(dtype: StructDtype): - return _STRUCT_AGGS - - -@get_valid_aggregation.register -def _(dtype: IntervalDtype): - return _INTERVAL_AGGS - - -@get_valid_aggregation.register -def _(dtype: DecimalDtype): - return _DECIMAL_AGGS - - -cdef class GroupBy: - cdef dict __dict__ - - def __init__(self, keys, dropna=True): - with acquire_spill_lock() as spill_lock: - self._groupby = pylibcudf.groupby.GroupBy( - pylibcudf.table.Table([c.to_pylibcudf(mode="read") for c in keys]), - pylibcudf.types.NullPolicy.EXCLUDE if dropna - else pylibcudf.types.NullPolicy.INCLUDE - ) - - # We spill lock the columns while this GroupBy instance is alive. - self._spill_lock = spill_lock - - def groups(self, list values): - """ - Perform a sort groupby, using the keys used to construct the Groupby as the key - columns and ``values`` as the value columns. - - Parameters - ---------- - values: list of Columns - The value columns - - Returns - ------- - offsets: list of integers - Integer offsets such that offsets[i+1] - offsets[i] - represents the size of group `i`. - grouped_keys: list of Columns - The grouped key columns - grouped_values: list of Columns - The grouped value columns - """ - offsets, grouped_keys, grouped_values = self._groupby.get_groups( - pylibcudf.table.Table([c.to_pylibcudf(mode="read") for c in values]) - if values else None - ) - - return ( - offsets, - columns_from_pylibcudf_table(grouped_keys), - ( - columns_from_pylibcudf_table(grouped_values) - if grouped_values is not None else [] - ), - ) - - def aggregate(self, values, aggregations): - """ - Parameters - ---------- - values : Frame - aggregations - A dict mapping column names in `Frame` to a list of aggregations - to perform on that column - - Each aggregation may be specified as: - - a string (e.g., "max") - - a lambda/function - - Returns - ------- - Frame of aggregated values - """ - included_aggregations = [] - column_included = [] - requests = [] - for i, (col, aggs) in enumerate(zip(values, aggregations)): - valid_aggregations = get_valid_aggregation(col.dtype) - included_aggregations_i = [] - col_aggregations = [] - for agg in aggs: - str_agg = str(agg) - if ( - is_string_dtype(col) - and agg not in _STRING_AGGS - and - ( - str_agg in {"cumsum", "cummin", "cummax"} - or not ( - any(a in str_agg for a in { - "count", - "max", - "min", - "first", - "last", - "nunique", - "unique", - "nth" - }) - or (agg is list) - ) - ) - ): - raise TypeError( - f"function is not supported for this dtype: {agg}" - ) - elif ( - _is_categorical_dtype(col) - and agg not in _CATEGORICAL_AGGS - and ( - str_agg in {"cumsum", "cummin", "cummax"} - or - not ( - any(a in str_agg for a in {"count", "max", "min", "unique"}) - ) - ) - ): - raise TypeError( - f"{col.dtype} type does not support {agg} operations" - ) - - agg_obj = make_aggregation(agg) - if valid_aggregations == "ALL" or agg_obj.kind in valid_aggregations: - included_aggregations_i.append((agg, agg_obj.kind)) - col_aggregations.append(agg_obj.c_obj) - included_aggregations.append(included_aggregations_i) - if col_aggregations: - requests.append(pylibcudf.groupby.GroupByRequest( - col.to_pylibcudf(mode="read"), col_aggregations - )) - column_included.append(i) - - if not requests and any(len(v) > 0 for v in aggregations): - raise DataError("All requested aggregations are unsupported.") - - keys, results = self._groupby.scan(requests) if \ - _is_all_scan_aggregate(aggregations) else self._groupby.aggregate(requests) - - result_columns = [[] for _ in range(len(values))] - for i, result in zip(column_included, results): - result_columns[i] = columns_from_pylibcudf_table(result) - - return result_columns, columns_from_pylibcudf_table(keys), included_aggregations - - def shift(self, list values, int periods, list fill_values): - keys, shifts = self._groupby.shift( - pylibcudf.table.Table([c.to_pylibcudf(mode="read") for c in values]), - [periods] * len(values), - [ - ( as_device_scalar(val, dtype=col.dtype)).c_value - for val, col in zip(fill_values, values) - ], - ) - - return columns_from_pylibcudf_table(shifts), columns_from_pylibcudf_table(keys) - - def replace_nulls(self, list values, object method): - _, replaced = self._groupby.replace_nulls( - pylibcudf.table.Table([c.to_pylibcudf(mode="read") for c in values]), - [ - pylibcudf.replace.ReplacePolicy.PRECEDING - if method == 'ffill' else pylibcudf.replace.ReplacePolicy.FOLLOWING - ] * len(values), - ) - - return columns_from_pylibcudf_table(replaced) - - -_GROUPBY_SCANS = {"cumcount", "cumsum", "cummin", "cummax", "cumprod", "rank"} - - -def _is_all_scan_aggregate(all_aggs): - """ - Returns true if all are scan aggregations. - Raises - ------ - NotImplementedError - If both reduction aggregations and scan aggregations are present. - """ - - def get_name(agg): - return agg.__name__ if callable(agg) else agg - - all_scan = all( - get_name(agg_name) in _GROUPBY_SCANS for aggs in all_aggs - for agg_name in aggs - ) - any_scan = any( - get_name(agg_name) in _GROUPBY_SCANS for aggs in all_aggs - for agg_name in aggs - ) - - if not all_scan and any_scan: - raise NotImplementedError( - "Cannot perform both aggregation and scan in one operation" - ) - return all_scan and any_scan diff --git a/python/cudf/cudf/core/_internals/aggregation.py b/python/cudf/cudf/core/_internals/aggregation.py index fe8ea5a947a..1d21d34b1bf 100644 --- a/python/cudf/cudf/core/_internals/aggregation.py +++ b/python/cudf/cudf/core/_internals/aggregation.py @@ -29,11 +29,11 @@ class Aggregation: def __init__(self, agg: plc.aggregation.Aggregation) -> None: - self.c_obj = agg + self.plc_obj = agg @property def kind(self) -> str: - name = self.c_obj.kind().name + name = self.plc_obj.kind().name return _agg_name_map.get(name, name) @classmethod diff --git a/python/cudf/cudf/core/column/column.py b/python/cudf/cudf/core/column/column.py index cccafaeba88..75b9070b53f 100644 --- a/python/cudf/cudf/core/column/column.py +++ b/python/cudf/cudf/core/column/column.py @@ -1605,7 +1605,7 @@ def scan(self, scan_op: str, inclusive: bool, **kwargs) -> Self: return type(self).from_pylibcudf( # type: ignore[return-value] plc.reduce.scan( self.to_pylibcudf(mode="read"), - aggregation.make_aggregation(scan_op, kwargs).c_obj, + aggregation.make_aggregation(scan_op, kwargs).plc_obj, plc.reduce.ScanType.INCLUSIVE if inclusive else plc.reduce.ScanType.EXCLUSIVE, @@ -1637,7 +1637,7 @@ def reduce(self, reduction_op: str, dtype=None, **kwargs) -> ScalarLike: with acquire_spill_lock(): plc_scalar = plc.reduce.reduce( self.to_pylibcudf(mode="read"), - aggregation.make_aggregation(reduction_op, kwargs).c_obj, + aggregation.make_aggregation(reduction_op, kwargs).plc_obj, dtype_to_pylibcudf_type(col_dtype), ) result_col = type(self).from_pylibcudf( diff --git a/python/cudf/cudf/core/groupby/groupby.py b/python/cudf/cudf/core/groupby/groupby.py index e8a9e599cb0..be3cc410174 100644 --- a/python/cudf/cudf/core/groupby/groupby.py +++ b/python/cudf/cudf/core/groupby/groupby.py @@ -4,9 +4,10 @@ import copy import itertools import textwrap +import types import warnings from collections import abc -from functools import cached_property +from functools import cached_property, singledispatch from typing import TYPE_CHECKING, Any, Literal import cupy as cp @@ -18,17 +19,27 @@ import cudf import cudf.core._internals from cudf import _lib as libcudf -from cudf._lib import groupby as libgroupby from cudf._lib.types import size_type_dtype from cudf.api.extensions import no_default -from cudf.api.types import is_list_like, is_numeric_dtype +from cudf.api.types import ( + is_list_like, + is_numeric_dtype, + is_string_dtype, +) from cudf.core._compat import PANDAS_LT_300 -from cudf.core._internals import sorting +from cudf.core._internals import aggregation, sorting from cudf.core.abc import Serializable from cudf.core.buffer import acquire_spill_lock -from cudf.core.column.column import ColumnBase, StructDtype, as_column +from cudf.core.column.column import ColumnBase, as_column from cudf.core.column_accessor import ColumnAccessor from cudf.core.copy_types import GatherMap +from cudf.core.dtypes import ( + CategoricalDtype, + DecimalDtype, + IntervalDtype, + ListDtype, + StructDtype, +) from cudf.core.join._join_helpers import _match_join_keys from cudf.core.mixins import Reducible, Scannable from cudf.core.multiindex import MultiIndex @@ -37,7 +48,7 @@ from cudf.utils.utils import GetAttrGetItemMixin if TYPE_CHECKING: - from collections.abc import Iterable + from collections.abc import Generator, Iterable from cudf._typing import ( AggType, @@ -46,6 +57,152 @@ ScalarLike, ) +# The sets below define the possible aggregations that can be performed on +# different dtypes. These strings must be elements of the AggregationKind enum. +# The libcudf infrastructure exists for "COLLECT" support on +# categoricals, but the dtype support in python does not. +_CATEGORICAL_AGGS = {"COUNT", "NUNIQUE", "SIZE", "UNIQUE"} +_STRING_AGGS = { + "COLLECT", + "COUNT", + "MAX", + "MIN", + "NTH", + "NUNIQUE", + "SIZE", + "UNIQUE", +} +_LIST_AGGS = {"COLLECT"} +_STRUCT_AGGS = {"COLLECT", "CORRELATION", "COVARIANCE"} +_INTERVAL_AGGS = {"COLLECT"} +_DECIMAL_AGGS = { + "ARGMIN", + "ARGMAX", + "COLLECT", + "COUNT", + "MAX", + "MIN", + "NTH", + "NUNIQUE", + "SUM", +} + + +@singledispatch +def get_valid_aggregation(dtype): + if is_string_dtype(dtype): + return _STRING_AGGS + return "ALL" + + +@get_valid_aggregation.register +def _(dtype: ListDtype): + return _LIST_AGGS + + +@get_valid_aggregation.register +def _(dtype: CategoricalDtype): + return _CATEGORICAL_AGGS + + +@get_valid_aggregation.register +def _(dtype: ListDtype): + return _LIST_AGGS + + +@get_valid_aggregation.register +def _(dtype: StructDtype): + return _STRUCT_AGGS + + +@get_valid_aggregation.register +def _(dtype: IntervalDtype): + return _INTERVAL_AGGS + + +@get_valid_aggregation.register +def _(dtype: DecimalDtype): + return _DECIMAL_AGGS + + +@singledispatch +def _is_unsupported_agg_for_type(dtype, str_agg: str) -> bool: + return False + + +@_is_unsupported_agg_for_type.register +def _(dtype: np.dtype, str_agg: str) -> bool: + # string specifically + cumulative_agg = str_agg in {"cumsum", "cummin", "cummax"} + basic_agg = any( + a in str_agg + for a in ( + "count", + "max", + "min", + "first", + "last", + "nunique", + "unique", + "nth", + ) + ) + return ( + dtype.kind == "O" + and str_agg not in _STRING_AGGS + and (cumulative_agg or not (basic_agg or str_agg == "")) + ) + + +@_is_unsupported_agg_for_type.register +def _(dtype: CategoricalDtype, str_agg: str) -> bool: + cumulative_agg = str_agg in {"cumsum", "cummin", "cummax"} + not_basic_agg = not any( + a in str_agg for a in ("count", "max", "min", "unique") + ) + return str_agg not in _CATEGORICAL_AGGS and ( + cumulative_agg or not_basic_agg + ) + + +def _is_all_scan_aggregate(all_aggs: list[list[str]]) -> bool: + """ + Returns True if all are scan aggregations. + + Raises + ------ + NotImplementedError + If both reduction aggregations and scan aggregations are present. + """ + groupby_scans = { + "cumcount", + "cumsum", + "cummin", + "cummax", + "cumprod", + "rank", + } + + def get_name(agg): + return agg.__name__ if callable(agg) else agg + + all_scan = all( + get_name(agg_name) in groupby_scans + for aggs in all_aggs + for agg_name in aggs + ) + any_scan = any( + get_name(agg_name) in groupby_scans + for aggs in all_aggs + for agg_name in aggs + ) + + if not all_scan and any_scan: + raise NotImplementedError( + "Cannot perform both aggregation and scan in one operation" + ) + return all_scan and any_scan + def _deprecate_collect(): warnings.warn( @@ -423,7 +580,7 @@ def indices(self) -> dict[ScalarLike, cp.ndarray]: >>> df.groupby(by=["a"]).indices {10: array([0, 1]), 40: array([2])} """ - offsets, group_keys, (indices,) = self._groupby.groups( + offsets, group_keys, (indices,) = self._groups( [ cudf.core.column.as_column( range(len(self.obj)), dtype=size_type_dtype @@ -582,11 +739,137 @@ def rank(x): return result @cached_property - def _groupby(self): - return libgroupby.GroupBy( - [*self.grouping.keys._columns], dropna=self._dropna + def _groupby(self) -> types.SimpleNamespace: + with acquire_spill_lock() as spill_lock: + plc_groupby = plc.groupby.GroupBy( + plc.Table( + [ + col.to_pylibcudf(mode="read") + for col in self.grouping.keys._columns + ] + ), + plc.types.NullPolicy.EXCLUDE + if self._dropna + else plc.types.NullPolicy.INCLUDE, + ) + # Do we need this because we just check _spill_locks in test_spillable_df_groupby? + return types.SimpleNamespace( + plc_groupby=plc_groupby, _spill_locks=spill_lock + ) + + def _groups( + self, values: Iterable[ColumnBase] + ) -> tuple[list[int], list[ColumnBase], list[ColumnBase]]: + plc_columns = [col.to_pylibcudf(mode="read") for col in values] + if not plc_columns: + plc_table = None + else: + plc_table = plc.Table(plc_columns) + offsets, grouped_keys, grouped_values = ( + self._groupby.plc_groupby.get_groups(plc_table) + ) + + return ( + offsets, + [ColumnBase.from_pylibcudf(col) for col in grouped_keys.columns()], + ( + [ + ColumnBase.from_pylibcudf(col) + for col in grouped_values.columns() + ] + if grouped_values is not None + else [] + ), + ) + + def _aggregate( + self, values: tuple[ColumnBase, ...], aggregations + ) -> tuple[ + list[list[ColumnBase]], + list[ColumnBase], + list[list[tuple[str, str]]], + ]: + included_aggregations = [] + column_included = [] + requests = [] + result_columns: list[list[ColumnBase]] = [] + for i, (col, aggs) in enumerate(zip(values, aggregations)): + valid_aggregations = get_valid_aggregation(col.dtype) + included_aggregations_i = [] + col_aggregations = [] + for agg in aggs: + str_agg = str(agg) + if _is_unsupported_agg_for_type(col.dtype, str_agg): + raise TypeError( + f"{col.dtype} type does not support {agg} operations" + ) + agg_obj = aggregation.make_aggregation(agg) + if ( + valid_aggregations == "ALL" + or agg_obj.kind in valid_aggregations + ): + included_aggregations_i.append((agg, agg_obj.kind)) + col_aggregations.append(agg_obj.plc_obj) + included_aggregations.append(included_aggregations_i) + result_columns.append([]) + if col_aggregations: + requests.append( + plc.groupby.GroupByRequest( + col.to_pylibcudf(mode="read"), col_aggregations + ) + ) + column_included.append(i) + + if not requests and any(len(v) > 0 for v in aggregations): + raise pd.errors.DataError( + "All requested aggregations are unsupported." + ) + + keys, results = ( + self._groupby.plc_groupby.scan(requests) + if _is_all_scan_aggregate(aggregations) + else self._groupby.plc_groupby.aggregate(requests) ) + for i, result in zip(column_included, results): + result_columns[i] = [ + ColumnBase.from_pylibcudf(col) for col in result.columns() + ] + + return ( + result_columns, + [ColumnBase.from_pylibcudf(key) for key in keys.columns()], + included_aggregations, + ) + + def _shift( + self, values: tuple[ColumnBase, ...], periods: int, fill_values: list + ) -> Generator[ColumnBase]: + _, shifts = self._groupby.plc_groupby.shift( + plc.table.Table([col.to_pylibcudf(mode="read") for col in values]), + [periods] * len(values), + [ + cudf.Scalar(val, dtype=col.dtype).device_value.c_value + for val, col in zip(fill_values, values) + ], + ) + return (ColumnBase.from_pylibcudf(col) for col in shifts.columns()) + + def _replace_nulls( + self, values: tuple[ColumnBase, ...], method: str + ) -> Generator[ColumnBase]: + _, replaced = self._groupby.plc_groupby.replace_nulls( + plc.Table([col.to_pylibcudf(mode="read") for col in values]), + [ + plc.replace.ReplacePolicy.PRECEDING + if method == "ffill" + else plc.replace.ReplacePolicy.FOLLOWING + ] + * len(values), + ) + + return (ColumnBase.from_pylibcudf(col) for col in replaced.columns()) + @_performance_tracking def agg(self, func=None, *args, engine=None, engine_kwargs=None, **kwargs): """ @@ -702,7 +985,7 @@ def agg(self, func=None, *args, engine=None, engine_kwargs=None, **kwargs): result_columns, grouped_key_cols, included_aggregations, - ) = self._groupby.aggregate(columns, normalized_aggs) + ) = self._aggregate(columns, normalized_aggs) result_index = self.grouping.keys._from_columns_like_self( grouped_key_cols, @@ -761,7 +1044,7 @@ def agg(self, func=None, *args, engine=None, engine_kwargs=None, **kwargs): else: if cudf.get_option( "mode.pandas_compatible" - ) and not libgroupby._is_all_scan_aggregate(normalized_aggs): + ) and not _is_all_scan_aggregate(normalized_aggs): # Even with `sort=False`, pandas guarantees that # groupby preserves the order of rows within each group. left_cols = list(self.grouping.keys.drop_duplicates()._columns) @@ -810,7 +1093,7 @@ def agg(self, func=None, *args, engine=None, engine_kwargs=None, **kwargs): if not self._as_index: result = result.reset_index() - if libgroupby._is_all_scan_aggregate(normalized_aggs): + if _is_all_scan_aggregate(normalized_aggs): # Scan aggregations return rows in original index order return self._mimic_pandas_order(result) @@ -920,7 +1203,7 @@ def _head_tail(self, n, *, take_head: bool, preserve_order: bool): # Can't use _mimic_pandas_order because we need to # subsample the gather map from the full input ordering, # rather than permuting the gather map of the output. - _, _, (ordering,) = self._groupby.groups( + _, _, (ordering,) = self._groups( [as_column(range(0, len(self.obj)))] ) # Invert permutation from original order to groups on the @@ -1312,8 +1595,8 @@ def deserialize(cls, header, frames): return cls(obj, grouping, **kwargs) def _grouped(self, *, include_groups: bool = True): - offsets, grouped_key_cols, grouped_value_cols = self._groupby.groups( - [*self.obj.index._columns, *self.obj._columns] + offsets, grouped_key_cols, grouped_value_cols = self._groups( + itertools.chain(self.obj.index._columns, self.obj._columns) ) grouped_keys = cudf.core.index._index_from_data( dict(enumerate(grouped_key_cols)) @@ -1945,7 +2228,7 @@ def transform( "Currently, `transform()` supports only aggregations." ) from e # If the aggregation is a scan, don't broadcast - if libgroupby._is_all_scan_aggregate([[func]]): + if _is_all_scan_aggregate([[func]]): if len(result) != len(self.obj): raise AssertionError( "Unexpected result length for scan transform" @@ -2409,7 +2692,7 @@ def _scan_fill(self, method: str, limit: int) -> DataFrameOrSeries: dict( zip( values._column_names, - self._groupby.replace_nulls([*values._columns], method), + self._replace_nulls(values._columns, method), ) ) ) @@ -2513,7 +2796,7 @@ def fillna( @_performance_tracking def shift( self, - periods=1, + periods: int = 1, freq=None, axis=0, fill_value=None, @@ -2560,7 +2843,7 @@ def shift( if freq is not None: raise NotImplementedError("Parameter freq is unsupported.") - if not axis == 0: + if axis != 0: raise NotImplementedError("Only axis=0 is supported.") if suffix is not None: @@ -2568,20 +2851,18 @@ def shift( values = self.grouping.values if is_list_like(fill_value): - if len(fill_value) != len(values._data): + if len(fill_value) != values._num_columns: raise ValueError( "Mismatched number of columns and values to fill." ) else: - fill_value = [fill_value] * len(values._data) + fill_value = [fill_value] * values._num_columns result = self.obj.__class__._from_data( dict( zip( values._column_names, - self._groupby.shift( - [*values._columns], periods, fill_value - )[0], + self._shift(values._columns, periods, fill_value), ) ) ) @@ -2680,9 +2961,7 @@ def _mimic_pandas_order( # result coming back from libcudf has null_count few rows than # the input, so we must produce an ordering from the full # input range. - _, _, (ordering,) = self._groupby.groups( - [as_column(range(0, len(self.obj)))] - ) + _, _, (ordering,) = self._groups([as_column(range(0, len(self.obj)))]) if self._dropna and any( c.has_nulls(include_nan=True) > 0 for c in self.grouping._key_columns diff --git a/python/cudf/cudf/core/window/rolling.py b/python/cudf/cudf/core/window/rolling.py index a580c35ccbf..2f8a6d9e5e7 100644 --- a/python/cudf/cudf/core/window/rolling.py +++ b/python/cudf/cudf/core/window/rolling.py @@ -315,7 +315,7 @@ def _apply_agg_column(self, source_column, agg_name): {"dtype": source_column.dtype} if callable(agg_name) else self.agg_params, - ).c_obj, + ).plc_obj, ) ) diff --git a/python/cudf/cudf/tests/test_groupby.py b/python/cudf/cudf/tests/test_groupby.py index d8a2528230e..db4f3cd3c9f 100644 --- a/python/cudf/cudf/tests/test_groupby.py +++ b/python/cudf/cudf/tests/test_groupby.py @@ -3960,8 +3960,8 @@ def test_group_by_value_counts_with_count_column(): def test_groupby_internal_groups_empty(gdf): # test that we don't segfault when calling the internal # .groups() method with an empty list: - gb = gdf.groupby("y")._groupby - _, _, grouped_vals = gb.groups([]) + gb = gdf.groupby("y") + _, _, grouped_vals = gb._groups([]) assert grouped_vals == [] From 253b0d89152c7d004efdf58255c7f0c1ec4f2c25 Mon Sep 17 00:00:00 2001 From: "Richard (Rick) Zamora" Date: Thu, 19 Dec 2024 13:44:54 -0600 Subject: [PATCH 4/9] Add multi-partition `Scan` support to cuDF-Polars (#17494) Adds multi-partition `Scan` support following the same design as https://github.com/rapidsai/cudf/pull/17441 Authors: - Richard (Rick) Zamora (https://github.com/rjzamora) Approvers: - Lawrence Mitchell (https://github.com/wence-) URL: https://github.com/rapidsai/cudf/pull/17494 --- python/cudf_polars/cudf_polars/callback.py | 3 +- .../cudf_polars/experimental/io.py | 283 +++++++++++++++++- .../tests/experimental/test_scan.py | 80 +++++ 3 files changed, 362 insertions(+), 4 deletions(-) create mode 100644 python/cudf_polars/tests/experimental/test_scan.py diff --git a/python/cudf_polars/cudf_polars/callback.py b/python/cudf_polars/cudf_polars/callback.py index 29d3dc4ae79..074096446fd 100644 --- a/python/cudf_polars/cudf_polars/callback.py +++ b/python/cudf_polars/cudf_polars/callback.py @@ -231,7 +231,8 @@ def validate_config_options(config: dict) -> None: executor = config.get("executor", "pylibcudf") if executor == "dask-experimental": unsupported = config.get("executor_options", {}).keys() - { - "max_rows_per_partition" + "max_rows_per_partition", + "parquet_blocksize", } else: unsupported = config.get("executor_options", {}).keys() diff --git a/python/cudf_polars/cudf_polars/experimental/io.py b/python/cudf_polars/cudf_polars/experimental/io.py index 3a1fec36079..2a5b400af4c 100644 --- a/python/cudf_polars/cudf_polars/experimental/io.py +++ b/python/cudf_polars/cudf_polars/experimental/io.py @@ -4,18 +4,24 @@ from __future__ import annotations +import enum import math -from typing import TYPE_CHECKING +import random +from enum import IntEnum +from typing import TYPE_CHECKING, Any -from cudf_polars.dsl.ir import DataFrameScan, Union +import pylibcudf as plc + +from cudf_polars.dsl.ir import IR, DataFrameScan, Scan, Union from cudf_polars.experimental.base import PartitionInfo from cudf_polars.experimental.dispatch import lower_ir_node if TYPE_CHECKING: from collections.abc import MutableMapping - from cudf_polars.dsl.ir import IR + from cudf_polars.dsl.expr import NamedExpr from cudf_polars.experimental.dispatch import LowerIRTransformer + from cudf_polars.typing import Schema @lower_ir_node.register(DataFrameScan) @@ -47,3 +53,274 @@ def _( } return ir, {ir: PartitionInfo(count=1)} + + +class ScanPartitionFlavor(IntEnum): + """Flavor of Scan partitioning.""" + + SINGLE_FILE = enum.auto() # 1:1 mapping between files and partitions + SPLIT_FILES = enum.auto() # Split each file into >1 partition + FUSED_FILES = enum.auto() # Fuse multiple files into each partition + + +class ScanPartitionPlan: + """ + Scan partitioning plan. + + Notes + ----- + The meaning of `factor` depends on the value of `flavor`: + - SINGLE_FILE: `factor` must be `1`. + - SPLIT_FILES: `factor` is the number of partitions per file. + - FUSED_FILES: `factor` is the number of files per partition. + """ + + __slots__ = ("factor", "flavor") + factor: int + flavor: ScanPartitionFlavor + + def __init__(self, factor: int, flavor: ScanPartitionFlavor) -> None: + if ( + flavor == ScanPartitionFlavor.SINGLE_FILE and factor != 1 + ): # pragma: no cover + raise ValueError(f"Expected factor == 1 for {flavor}, got: {factor}") + self.factor = factor + self.flavor = flavor + + @staticmethod + def from_scan(ir: Scan) -> ScanPartitionPlan: + """Extract the partitioning plan of a Scan operation.""" + if ir.typ == "parquet": + # TODO: Use system info to set default blocksize + parallel_options = ir.config_options.get("executor_options", {}) + blocksize: int = parallel_options.get("parquet_blocksize", 1024**3) + stats = _sample_pq_statistics(ir) + file_size = sum(float(stats[column]) for column in ir.schema) + if file_size > 0: + if file_size > blocksize: + # Split large files + return ScanPartitionPlan( + math.ceil(file_size / blocksize), + ScanPartitionFlavor.SPLIT_FILES, + ) + else: + # Fuse small files + return ScanPartitionPlan( + max(blocksize // int(file_size), 1), + ScanPartitionFlavor.FUSED_FILES, + ) + + # TODO: Use file sizes for csv and json + return ScanPartitionPlan(1, ScanPartitionFlavor.SINGLE_FILE) + + +class SplitScan(IR): + """ + Input from a split file. + + This class wraps a single-file `Scan` object. At + IO/evaluation time, this class will only perform + a partial read of the underlying file. The range + (skip_rows and n_rows) is calculated at IO time. + """ + + __slots__ = ( + "base_scan", + "schema", + "split_index", + "total_splits", + ) + _non_child = ( + "schema", + "base_scan", + "split_index", + "total_splits", + ) + base_scan: Scan + """Scan operation this node is based on.""" + split_index: int + """Index of the current split.""" + total_splits: int + """Total number of splits.""" + + def __init__( + self, schema: Schema, base_scan: Scan, split_index: int, total_splits: int + ): + self.schema = schema + self.base_scan = base_scan + self.split_index = split_index + self.total_splits = total_splits + self._non_child_args = ( + split_index, + total_splits, + *base_scan._non_child_args, + ) + self.children = () + if base_scan.typ not in ("parquet",): # pragma: no cover + raise NotImplementedError( + f"Unhandled Scan type for file splitting: {base_scan.typ}" + ) + + @classmethod + def do_evaluate( + cls, + split_index: int, + total_splits: int, + schema: Schema, + typ: str, + reader_options: dict[str, Any], + config_options: dict[str, Any], + paths: list[str], + with_columns: list[str] | None, + skip_rows: int, + n_rows: int, + row_index: tuple[str, int] | None, + predicate: NamedExpr | None, + ): + """Evaluate and return a dataframe.""" + if typ not in ("parquet",): # pragma: no cover + raise NotImplementedError(f"Unhandled Scan type for file splitting: {typ}") + + if len(paths) > 1: # pragma: no cover + raise ValueError(f"Expected a single path, got: {paths}") + + # Parquet logic: + # - We are one of "total_splits" SplitScan nodes + # assigned to the same file. + # - We know our index within this file ("split_index") + # - We can also use parquet metadata to query the + # total number of rows in each row-group of the file. + # - We can use all this information to calculate the + # "skip_rows" and "n_rows" options to use locally. + + rowgroup_metadata = plc.io.parquet_metadata.read_parquet_metadata( + plc.io.SourceInfo(paths) + ).rowgroup_metadata() + total_row_groups = len(rowgroup_metadata) + if total_splits <= total_row_groups: + # We have enough row-groups in the file to align + # all "total_splits" of our reads with row-group + # boundaries. Calculate which row-groups to include + # in the current read, and use metadata to translate + # the row-group indices to "skip_rows" and "n_rows". + rg_stride = total_row_groups // total_splits + skip_rgs = rg_stride * split_index + skip_rows = sum(rg["num_rows"] for rg in rowgroup_metadata[:skip_rgs]) + n_rows = sum( + rg["num_rows"] + for rg in rowgroup_metadata[skip_rgs : skip_rgs + rg_stride] + ) + else: + # There are not enough row-groups to align + # all "total_splits" of our reads with row-group + # boundaries. Use metadata to directly calculate + # "skip_rows" and "n_rows" for the current read. + total_rows = sum(rg["num_rows"] for rg in rowgroup_metadata) + n_rows = total_rows // total_splits + skip_rows = n_rows * split_index + + # Last split should always read to end of file + if split_index == (total_splits - 1): + n_rows = -1 + + # Perform the partial read + return Scan.do_evaluate( + schema, + typ, + reader_options, + config_options, + paths, + with_columns, + skip_rows, + n_rows, + row_index, + predicate, + ) + + +def _sample_pq_statistics(ir: Scan) -> dict[str, float]: + import numpy as np + import pyarrow.dataset as pa_ds + + # Use average total_uncompressed_size of three files + # TODO: Use plc.io.parquet_metadata.read_parquet_metadata + n_sample = 3 + column_sizes = {} + ds = pa_ds.dataset(random.sample(ir.paths, n_sample), format="parquet") + for i, frag in enumerate(ds.get_fragments()): + md = frag.metadata + for rg in range(md.num_row_groups): + row_group = md.row_group(rg) + for col in range(row_group.num_columns): + column = row_group.column(col) + name = column.path_in_schema + if name not in column_sizes: + column_sizes[name] = np.zeros(n_sample, dtype="int64") + column_sizes[name][i] += column.total_uncompressed_size + + return {name: np.mean(sizes) for name, sizes in column_sizes.items()} + + +@lower_ir_node.register(Scan) +def _( + ir: Scan, rec: LowerIRTransformer +) -> tuple[IR, MutableMapping[IR, PartitionInfo]]: + partition_info: MutableMapping[IR, PartitionInfo] + if ir.typ in ("csv", "parquet", "ndjson") and ir.n_rows == -1 and ir.skip_rows == 0: + plan = ScanPartitionPlan.from_scan(ir) + paths = list(ir.paths) + if plan.flavor == ScanPartitionFlavor.SPLIT_FILES: + # Disable chunked reader when splitting files + config_options = ir.config_options.copy() + config_options["parquet_options"] = config_options.get( + "parquet_options", {} + ).copy() + config_options["parquet_options"]["chunked"] = False + + slices: list[SplitScan] = [] + for path in paths: + base_scan = Scan( + ir.schema, + ir.typ, + ir.reader_options, + ir.cloud_options, + config_options, + [path], + ir.with_columns, + ir.skip_rows, + ir.n_rows, + ir.row_index, + ir.predicate, + ) + slices.extend( + SplitScan(ir.schema, base_scan, sindex, plan.factor) + for sindex in range(plan.factor) + ) + new_node = Union(ir.schema, None, *slices) + partition_info = {slice: PartitionInfo(count=1) for slice in slices} | { + new_node: PartitionInfo(count=len(slices)) + } + else: + groups: list[Scan] = [ + Scan( + ir.schema, + ir.typ, + ir.reader_options, + ir.cloud_options, + ir.config_options, + paths[i : i + plan.factor], + ir.with_columns, + ir.skip_rows, + ir.n_rows, + ir.row_index, + ir.predicate, + ) + for i in range(0, len(paths), plan.factor) + ] + new_node = Union(ir.schema, None, *groups) + partition_info = {group: PartitionInfo(count=1) for group in groups} | { + new_node: PartitionInfo(count=len(groups)) + } + return new_node, partition_info + + return ir, {ir: PartitionInfo(count=1)} # pragma: no cover diff --git a/python/cudf_polars/tests/experimental/test_scan.py b/python/cudf_polars/tests/experimental/test_scan.py new file mode 100644 index 00000000000..a26d751dc86 --- /dev/null +++ b/python/cudf_polars/tests/experimental/test_scan.py @@ -0,0 +1,80 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +import pytest + +import polars as pl + +from cudf_polars import Translator +from cudf_polars.experimental.parallel import lower_ir_graph +from cudf_polars.testing.asserts import assert_gpu_result_equal + + +@pytest.fixture(scope="module") +def df(): + return pl.DataFrame( + { + "x": range(3_000), + "y": ["cat", "dog", "fish"] * 1_000, + "z": [1.0, 2.0, 3.0, 4.0, 5.0] * 600, + } + ) + + +def make_source(df, path, fmt, n_files=3): + n_rows = len(df) + stride = int(n_rows / n_files) + for i in range(n_files): + offset = stride * i + part = df.slice(offset, stride) + if fmt == "csv": + part.write_csv(path / f"part.{i}.csv") + elif fmt == "ndjson": + part.write_ndjson(path / f"part.{i}.ndjson") + else: + part.write_parquet( + path / f"part.{i}.parquet", + row_group_size=int(stride / 2), + ) + + +@pytest.mark.parametrize( + "fmt, scan_fn", + [ + ("csv", pl.scan_csv), + ("ndjson", pl.scan_ndjson), + ("parquet", pl.scan_parquet), + ], +) +def test_parallel_scan(tmp_path, df, fmt, scan_fn): + make_source(df, tmp_path, fmt) + q = scan_fn(tmp_path) + engine = pl.GPUEngine( + raise_on_fail=True, + executor="dask-experimental", + ) + assert_gpu_result_equal(q, engine=engine) + + +@pytest.mark.parametrize("blocksize", [1_000, 10_000, 1_000_000]) +def test_parquet_blocksize(tmp_path, df, blocksize): + n_files = 3 + make_source(df, tmp_path, "parquet", n_files) + q = pl.scan_parquet(tmp_path) + engine = pl.GPUEngine( + raise_on_fail=True, + executor="dask-experimental", + executor_options={"parquet_blocksize": blocksize}, + ) + assert_gpu_result_equal(q, engine=engine) + + # Check partitioning + qir = Translator(q._ldf.visit(), engine).translate_ir() + ir, info = lower_ir_graph(qir) + count = info[ir].count + if blocksize <= 12_000: + assert count > n_files + else: + assert count < n_files From 550ea35d55390ddbf4f128c6fdf2e877d9aa0306 Mon Sep 17 00:00:00 2001 From: Matthew Roeschke <10647082+mroeschke@users.noreply.github.com> Date: Thu, 19 Dec 2024 13:52:02 -0800 Subject: [PATCH 5/9] Support compression= in DataFrame.to_json (#17634) closes https://github.com/rapidsai/cudf/issues/17564 Authors: - Matthew Roeschke (https://github.com/mroeschke) Approvers: - Matthew Murray (https://github.com/Matt711) URL: https://github.com/rapidsai/cudf/pull/17634 --- python/cudf/cudf/io/json.py | 30 ++++++++++++------- python/cudf/cudf/tests/test_json.py | 9 ++++++ python/pylibcudf/pylibcudf/io/json.pxd | 2 ++ python/pylibcudf/pylibcudf/io/json.pyi | 2 ++ python/pylibcudf/pylibcudf/io/json.pyx | 30 +++++++++++++++++++ .../pylibcudf/pylibcudf/libcudf/io/json.pxd | 8 +++++ 6 files changed, 70 insertions(+), 11 deletions(-) diff --git a/python/cudf/cudf/io/json.py b/python/cudf/cudf/io/json.py index 39a85465deb..4be556e1d67 100644 --- a/python/cudf/cudf/io/json.py +++ b/python/cudf/cudf/io/json.py @@ -54,6 +54,22 @@ def _get_cudf_schema_element_from_dtype( return lib_type, child_types +def _to_plc_compression( + compression: Literal["infer", "gzip", "bz2", "zip", "xz", None], +) -> plc.io.types.CompressionType: + if compression is not None: + if compression == "gzip": + return plc.io.types.CompressionType.GZIP + elif compression == "bz2": + return plc.io.types.CompressionType.BZIP2 + elif compression == "zip": + return plc.io.types.CompressionType.ZIP + else: + return plc.io.types.CompressionType.AUTO + else: + return plc.io.types.CompressionType.NONE + + @ioutils.doc_read_json() def read_json( path_or_buf, @@ -115,17 +131,7 @@ def read_json( if isinstance(source, str) and not os.path.isfile(source): filepaths_or_buffers[idx] = source.encode() - if compression is not None: - if compression == "gzip": - c_compression = plc.io.types.CompressionType.GZIP - elif compression == "bz2": - c_compression = plc.io.types.CompressionType.BZIP2 - elif compression == "zip": - c_compression = plc.io.types.CompressionType.ZIP - else: - c_compression = plc.io.types.CompressionType.AUTO - else: - c_compression = plc.io.types.CompressionType.NONE + c_compression = _to_plc_compression(compression) if on_bad_lines.lower() == "error": c_on_bad_lines = plc.io.types.JSONRecoveryMode.FAIL @@ -291,6 +297,7 @@ def _plc_write_json( include_nulls: bool = True, lines: bool = False, rows_per_chunk: int = 1024 * 64, # 64K rows + compression: Literal["infer", "gzip", "bz2", "zip", "xz", None] = None, ) -> None: try: tbl_w_meta = plc.io.TableWithMetadata( @@ -307,6 +314,7 @@ def _plc_write_json( .na_rep(na_rep) .include_nulls(include_nulls) .lines(lines) + .compression(_to_plc_compression(compression)) .build() ) if rows_per_chunk != np.iinfo(np.int32).max: diff --git a/python/cudf/cudf/tests/test_json.py b/python/cudf/cudf/tests/test_json.py index aaa8d7d07ee..db34329261f 100644 --- a/python/cudf/cudf/tests/test_json.py +++ b/python/cudf/cudf/tests/test_json.py @@ -1453,3 +1453,12 @@ def test_chunked_json_reader(): with cudf.option_context("io.json.low_memory", True): gdf = cudf.read_json(buf, lines=True) assert_eq(df, gdf) + + +@pytest.mark.parametrize("compression", ["gzip", None]) +def test_roundtrip_compression(compression, tmp_path): + expected = cudf.DataFrame({"a": 1, "b": "2"}) + fle = BytesIO() + expected.to_json(fle, engine="cudf", compression=compression) + result = cudf.read_json(fle, engine="cudf", compression=compression) + assert_eq(result, expected) diff --git a/python/pylibcudf/pylibcudf/io/json.pxd b/python/pylibcudf/pylibcudf/io/json.pxd index 7e446298ba9..7ce3cb859a5 100644 --- a/python/pylibcudf/pylibcudf/io/json.pxd +++ b/python/pylibcudf/pylibcudf/io/json.pxd @@ -62,6 +62,7 @@ cdef class JsonWriterOptions: cpdef void set_rows_per_chunk(self, size_type val) cpdef void set_true_value(self, str val) cpdef void set_false_value(self, str val) + cpdef void set_compression(self, compression_type comptype) cdef class JsonWriterOptionsBuilder: cdef json_writer_options_builder c_obj @@ -71,6 +72,7 @@ cdef class JsonWriterOptionsBuilder: cpdef JsonWriterOptionsBuilder na_rep(self, str val) cpdef JsonWriterOptionsBuilder include_nulls(self, bool val) cpdef JsonWriterOptionsBuilder lines(self, bool val) + cpdef JsonWriterOptionsBuilder compression(self, compression_type comptype) cpdef JsonWriterOptions build(self) cpdef void write_json(JsonWriterOptions options) diff --git a/python/pylibcudf/pylibcudf/io/json.pyi b/python/pylibcudf/pylibcudf/io/json.pyi index b84b437a3a2..db4546f138d 100644 --- a/python/pylibcudf/pylibcudf/io/json.pyi +++ b/python/pylibcudf/pylibcudf/io/json.pyi @@ -60,12 +60,14 @@ class JsonWriterOptions: def set_rows_per_chunk(self, val: int) -> None: ... def set_true_value(self, val: str) -> None: ... def set_false_value(self, val: str) -> None: ... + def set_compression(self, comptype: CompressionType) -> None: ... class JsonWriterOptionsBuilder: def metadata(self, tbl_w_meta: TableWithMetadata) -> Self: ... def na_rep(self, val: str) -> Self: ... def include_nulls(self, val: bool) -> Self: ... def lines(self, val: bool) -> Self: ... + def compression(self, comptype: CompressionType) -> Self: ... def build(self) -> JsonWriterOptions: ... def write_json(options: JsonWriterOptions) -> None: ... diff --git a/python/pylibcudf/pylibcudf/io/json.pyx b/python/pylibcudf/pylibcudf/io/json.pyx index 1d8a559afad..cf286378902 100644 --- a/python/pylibcudf/pylibcudf/io/json.pyx +++ b/python/pylibcudf/pylibcudf/io/json.pyx @@ -587,6 +587,20 @@ cdef class JsonWriterOptions: """ self.c_obj.set_false_value(val.encode()) + cpdef void set_compression(self, compression_type comptype): + """ + Sets compression type to be used + + Parameters + ---------- + comptype : CompressionType + Compression type for sink + + Returns + ------- + None + """ + self.c_obj.set_compression(comptype) cdef class JsonWriterOptionsBuilder: cpdef JsonWriterOptionsBuilder metadata(self, TableWithMetadata tbl_w_meta): @@ -653,6 +667,22 @@ cdef class JsonWriterOptionsBuilder: self.c_obj.lines(val) return self + cpdef JsonWriterOptionsBuilder compression(self, compression_type comptype): + """ + Sets compression type of output sink. + + Parameters + ---------- + comptype : CompressionType + Compression type used + + Returns + ------- + Self + """ + self.c_obj.compression(comptype) + return self + cpdef JsonWriterOptions build(self): """Create a JsonWriterOptions object""" cdef JsonWriterOptions json_options = JsonWriterOptions.__new__( diff --git a/python/pylibcudf/pylibcudf/libcudf/io/json.pxd b/python/pylibcudf/pylibcudf/libcudf/io/json.pxd index c241c478f25..d23dd0685d1 100644 --- a/python/pylibcudf/pylibcudf/libcudf/io/json.pxd +++ b/python/pylibcudf/pylibcudf/libcudf/io/json.pxd @@ -167,6 +167,8 @@ cdef extern from "cudf/io/json.hpp" \ size_type get_rows_per_chunk() except +libcudf_exception_handler string get_true_value() except +libcudf_exception_handler string get_false_value() except +libcudf_exception_handler + cudf_io_types.compression_type get_compression()\ + except +libcudf_exception_handler # setter void set_table( @@ -181,6 +183,9 @@ cdef extern from "cudf/io/json.hpp" \ void set_rows_per_chunk(size_type val) except +libcudf_exception_handler void set_true_value(string val) except +libcudf_exception_handler void set_false_value(string val) except +libcudf_exception_handler + void set_compression( + cudf_io_types.compression_type comptype + ) except +libcudf_exception_handler @staticmethod json_writer_options_builder builder( @@ -218,6 +223,9 @@ cdef extern from "cudf/io/json.hpp" \ json_writer_options_builder& false_value( string val ) except +libcudf_exception_handler + json_writer_options_builder& compression( + cudf_io_types.compression_type comptype + ) except +libcudf_exception_handler json_writer_options build() except +libcudf_exception_handler From d8f469fb61af1824a9fe175b6c4bcdc1a0ceda3b Mon Sep 17 00:00:00 2001 From: Matthew Roeschke <10647082+mroeschke@users.noreply.github.com> Date: Thu, 19 Dec 2024 14:05:04 -0800 Subject: [PATCH 6/9] Fix pylibcudf to_arrow with multiple nested data types (#17504) Fixes the following case ```python In [25]: import pyarrow as pa, pylibcudf as plc In [26]: pa_array = pa.array([[{"a": 1}]]) In [27]: pa_array.type Out[27]: ListType(list>) In [28]: plc_table = plc.Table([plc.interop.from_arrow(pa_array)]) In [29]: plc.interop.to_arrow(plc_table) RuntimeError: CUDF failure at: cpp/src/interop/to_arrow_schema.cpp:146: Number of field names and number of children doesn't match ``` Authors: - Matthew Roeschke (https://github.com/mroeschke) Approvers: - GALI PREM SAGAR (https://github.com/galipremsagar) URL: https://github.com/rapidsai/cudf/pull/17504 --- python/pylibcudf/pylibcudf/interop.pyx | 13 +++++++++-- .../pylibcudf/pylibcudf/tests/test_interop.py | 22 +++++++++++++++++++ 2 files changed, 33 insertions(+), 2 deletions(-) diff --git a/python/pylibcudf/pylibcudf/interop.pyx b/python/pylibcudf/pylibcudf/interop.pyx index bd5397ac328..7a102cf0c88 100644 --- a/python/pylibcudf/pylibcudf/interop.pyx +++ b/python/pylibcudf/pylibcudf/interop.pyx @@ -273,10 +273,19 @@ cdef void _release_array(object array_capsule) noexcept: free(array) +def _maybe_create_nested_column_metadata(Column col): + return ColumnMetadata( + children_meta=[ + _maybe_create_nested_column_metadata(child) for child in col.children() + ] + ) + + def _table_to_schema(Table tbl, metadata): if metadata is None: - metadata = [ColumnMetadata() for _ in range(len(tbl.columns()))] - metadata = [ColumnMetadata(m) if isinstance(m, str) else m for m in metadata] + metadata = [_maybe_create_nested_column_metadata(col) for col in tbl.columns()] + else: + metadata = [ColumnMetadata(m) if isinstance(m, str) else m for m in metadata] cdef vector[column_metadata] c_metadata c_metadata.reserve(len(metadata)) diff --git a/python/pylibcudf/pylibcudf/tests/test_interop.py b/python/pylibcudf/pylibcudf/tests/test_interop.py index af80b6e5978..ca42eacdfdb 100644 --- a/python/pylibcudf/pylibcudf/tests/test_interop.py +++ b/python/pylibcudf/pylibcudf/tests/test_interop.py @@ -40,6 +40,28 @@ def test_struct_dtype_roundtrip(): assert arrow_type == struct_type +def test_table_with_nested_dtype_to_arrow(): + pa_array = pa.array([[{"": 1}]]) + plc_table = plc.Table([plc.interop.from_arrow(pa_array)]) + result = plc.interop.to_arrow(plc_table) + expected_schema = pa.schema( + [ + pa.field( + "", + pa.list_( + pa.field( + "", + pa.struct([pa.field("", pa.int64(), nullable=False)]), + nullable=False, + ) + ), + nullable=False, + ) + ] + ) + assert result.schema == expected_schema + + def test_decimal128_roundtrip(): decimal_type = pa.decimal128(10, 2) plc_type = plc.interop.from_arrow(decimal_type) From 6cd15133f048623c7fa9e78d6ae9249d9b8a194f Mon Sep 17 00:00:00 2001 From: Matthew Murray <41342305+Matt711@users.noreply.github.com> Date: Thu, 19 Dec 2024 20:59:50 -0500 Subject: [PATCH 7/9] Fix failing xgboost test in the cudf.pandas third-party integration tests (#17616) A part of #17490 Authors: - Matthew Murray (https://github.com/Matt711) Approvers: - Bradley Dice (https://github.com/bdice) URL: https://github.com/rapidsai/cudf/pull/17616 --- .../third_party_integration_tests/tests/test_xgboost.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/python/cudf/cudf_pandas_tests/third_party_integration_tests/tests/test_xgboost.py b/python/cudf/cudf_pandas_tests/third_party_integration_tests/tests/test_xgboost.py index 0fd632507a6..ba98273404d 100644 --- a/python/cudf/cudf_pandas_tests/third_party_integration_tests/tests/test_xgboost.py +++ b/python/cudf/cudf_pandas_tests/third_party_integration_tests/tests/test_xgboost.py @@ -113,9 +113,6 @@ def test_with_external_memory( return predt -@pytest.mark.skip( - reason="TypeError: Implicit conversion to a NumPy array is not allowed. Please use `.get()` to construct a NumPy array explicitly." -) @pytest.mark.parametrize("device", ["cpu", "cuda"]) def test_predict(device: str) -> np.ndarray: reg = xgb.XGBRegressor(n_estimators=2, device=device) @@ -127,6 +124,11 @@ def test_predict(device: str) -> np.ndarray: predt0 = reg.predict(X_df) predt1 = booster.inplace_predict(X_df) + # After https://github.com/dmlc/xgboost/pull/11014, .inplace_predict() + # returns a real cupy array when called on a cudf.pandas proxy dataframe. + # So we need to ensure we have a valid numpy array. + if not isinstance(predt1, np.ndarray): + predt1 = predt1.get() np.testing.assert_allclose(predt0, predt1) predt2 = booster.predict(xgb.DMatrix(X_df)) From 2837a45e05f97f0a63c8ca45e57057c4d2a52ee2 Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb <14217455+mhaseeb123@users.noreply.github.com> Date: Thu, 19 Dec 2024 19:08:21 -0800 Subject: [PATCH 8/9] Simplify expression transformer in Parquet predicate pushdown with `ast::tree` (#17587) This PR simplifies the StatsAST expression transformer in Parquet reader's predicate pushdown using `ast::tree` from (https://github.com/rapidsai/cudf/pull/17156). This PR is a follow up to @bdice's comment at https://github.com/rapidsai/cudf/pull/17289#discussion_r1876966752. Similar changes for the `BloomfilterAST` expression converter have been incorporated in the PR https://github.com/rapidsai/cudf/pull/17289. Related to https://github.com/rapidsai/cudf/issues/17164 Authors: - Muhammad Haseeb (https://github.com/mhaseeb123) Approvers: - Karthikeyan (https://github.com/karthikeyann) - Vukasin Milovanovic (https://github.com/vuule) - Bradley Dice (https://github.com/bdice) URL: https://github.com/rapidsai/cudf/pull/17587 --- cpp/src/io/parquet/predicate_pushdown.cpp | 51 +++++++++++------------ 1 file changed, 24 insertions(+), 27 deletions(-) diff --git a/cpp/src/io/parquet/predicate_pushdown.cpp b/cpp/src/io/parquet/predicate_pushdown.cpp index b0cbabf1c12..9047ff9169b 100644 --- a/cpp/src/io/parquet/predicate_pushdown.cpp +++ b/cpp/src/io/parquet/predicate_pushdown.cpp @@ -265,7 +265,6 @@ class stats_expression_converter : public ast::detail::expression_transformer { */ std::reference_wrapper visit(ast::literal const& expr) override { - _stats_expr = std::reference_wrapper(expr); return expr; } @@ -278,7 +277,6 @@ class stats_expression_converter : public ast::detail::expression_transformer { "Statistics AST supports only left table"); CUDF_EXPECTS(expr.get_column_index() < _num_columns, "Column index cannot be more than number of columns in the table"); - _stats_expr = std::reference_wrapper(expr); return expr; } @@ -307,6 +305,9 @@ class stats_expression_converter : public ast::detail::expression_transformer { CUDF_EXPECTS(dynamic_cast(&operands[1].get()) != nullptr, "Second operand of binary operation with column reference must be a literal"); v->accept(*this); + // Push literal into the ast::tree + auto const& literal = + _stats_expr.push(*dynamic_cast(&operands[1].get())); auto const col_index = v->get_column_index(); switch (op) { /* transform to stats conditions. op(col, literal) @@ -318,34 +319,33 @@ class stats_expression_converter : public ast::detail::expression_transformer { col1 <= val --> vmin <= val */ case ast_operator::EQUAL: { - auto const& vmin = _col_ref.emplace_back(col_index * 2); - auto const& vmax = _col_ref.emplace_back(col_index * 2 + 1); - auto const& op1 = - _operators.emplace_back(ast_operator::LESS_EQUAL, vmin, operands[1].get()); - auto const& op2 = - _operators.emplace_back(ast_operator::GREATER_EQUAL, vmax, operands[1].get()); - _operators.emplace_back(ast::ast_operator::LOGICAL_AND, op1, op2); + auto const& vmin = _stats_expr.push(ast::column_reference{col_index * 2}); + auto const& vmax = _stats_expr.push(ast::column_reference{col_index * 2 + 1}); + _stats_expr.push(ast::operation{ + ast::ast_operator::LOGICAL_AND, + _stats_expr.push(ast::operation{ast_operator::GREATER_EQUAL, vmax, literal}), + _stats_expr.push(ast::operation{ast_operator::LESS_EQUAL, vmin, literal})}); break; } case ast_operator::NOT_EQUAL: { - auto const& vmin = _col_ref.emplace_back(col_index * 2); - auto const& vmax = _col_ref.emplace_back(col_index * 2 + 1); - auto const& op1 = _operators.emplace_back(ast_operator::NOT_EQUAL, vmin, vmax); - auto const& op2 = - _operators.emplace_back(ast_operator::NOT_EQUAL, vmax, operands[1].get()); - _operators.emplace_back(ast_operator::LOGICAL_OR, op1, op2); + auto const& vmin = _stats_expr.push(ast::column_reference{col_index * 2}); + auto const& vmax = _stats_expr.push(ast::column_reference{col_index * 2 + 1}); + _stats_expr.push(ast::operation{ + ast_operator::LOGICAL_OR, + _stats_expr.push(ast::operation{ast_operator::NOT_EQUAL, vmin, vmax}), + _stats_expr.push(ast::operation{ast_operator::NOT_EQUAL, vmax, literal})}); break; } case ast_operator::LESS: [[fallthrough]]; case ast_operator::LESS_EQUAL: { - auto const& vmin = _col_ref.emplace_back(col_index * 2); - _operators.emplace_back(op, vmin, operands[1].get()); + auto const& vmin = _stats_expr.push(ast::column_reference{col_index * 2}); + _stats_expr.push(ast::operation{op, vmin, literal}); break; } case ast_operator::GREATER: [[fallthrough]]; case ast_operator::GREATER_EQUAL: { - auto const& vmax = _col_ref.emplace_back(col_index * 2 + 1); - _operators.emplace_back(op, vmax, operands[1].get()); + auto const& vmax = _stats_expr.push(ast::column_reference{col_index * 2 + 1}); + _stats_expr.push(ast::operation{op, vmax, literal}); break; } default: CUDF_FAIL("Unsupported operation in Statistics AST"); @@ -353,13 +353,12 @@ class stats_expression_converter : public ast::detail::expression_transformer { } else { auto new_operands = visit_operands(operands); if (cudf::ast::detail::ast_operator_arity(op) == 2) { - _operators.emplace_back(op, new_operands.front(), new_operands.back()); + _stats_expr.push(ast::operation{op, new_operands.front(), new_operands.back()}); } else if (cudf::ast::detail::ast_operator_arity(op) == 1) { - _operators.emplace_back(op, new_operands.front()); + _stats_expr.push(ast::operation{op, new_operands.front()}); } } - _stats_expr = std::reference_wrapper(_operators.back()); - return std::reference_wrapper(_operators.back()); + return _stats_expr.back(); } /** @@ -369,7 +368,7 @@ class stats_expression_converter : public ast::detail::expression_transformer { */ [[nodiscard]] std::reference_wrapper get_stats_expr() const { - return _stats_expr.value().get(); + return _stats_expr.back(); } private: @@ -383,10 +382,8 @@ class stats_expression_converter : public ast::detail::expression_transformer { } return transformed_operands; } - std::optional> _stats_expr; + ast::tree _stats_expr; size_type _num_columns; - std::list _col_ref; - std::list _operators; }; } // namespace From 27404bc92348a8c82d4857dc83090eab16154f23 Mon Sep 17 00:00:00 2001 From: Nghia Truong <7416935+ttnghia@users.noreply.github.com> Date: Thu, 19 Dec 2024 19:51:51 -0800 Subject: [PATCH 9/9] Implement `HOST_UDF` aggregation for groupby (#17592) This implements `HOST_UDF` aggregation, allowing to execute a host-side user-defined function (UDF) through libcudf aggregation framework. * A host-side function can be an arbitrarily independent function running on the host machine. It may or may not call other device kernels depending on its implementation. * Such user-defined function must follow the libcudf provided interface (`cudf::host_udf_base`). The interface provides the ability to fully interact with libcudf aggregation framework. * Since it is implemented on the user application side, it has a very high degree of freedom to perform arbitrary operations to satisfy the user's need. Partially contributes to https://github.com/rapidsai/cudf/issues/16633. --- Usage 1. Define a functor deriving from `cudf::host_udf_base` and implement the required virtual functions declared in that base struct. For example: ``` struct my_aggregation : cudf::host_udf_base { ... }; ``` 2. Create an instance of libcudf `HOST_UDF` aggregation which is constructed from an instance of the functor defined above. For example: ``` auto agg = cudf::make_host_udf_aggregation( std::make_unique()); ``` 3. Perform aggregation operation on the created instance. Authors: - Nghia Truong (https://github.com/ttnghia) Approvers: - Yunsong Wang (https://github.com/PointKernel) - Chong Gao (https://github.com/res-life) - Vyas Ramasubramani (https://github.com/vyasr) - David Wendt (https://github.com/davidwendt) URL: https://github.com/rapidsai/cudf/pull/17592 --- cpp/CMakeLists.txt | 3 +- cpp/include/cudf/aggregation.hpp | 19 +- cpp/include/cudf/aggregation/host_udf.hpp | 294 ++++++++++++++++++ .../cudf/detail/aggregation/aggregation.hpp | 41 +++ cpp/src/aggregation/aggregation.cpp | 11 + cpp/src/groupby/groupby.cu | 58 ++-- cpp/src/groupby/sort/aggregate.cpp | 60 ++++ cpp/src/groupby/sort/host_udf_aggregation.cpp | 103 ++++++ cpp/tests/CMakeLists.txt | 2 + cpp/tests/groupby/host_udf_example_tests.cu | 245 +++++++++++++++ cpp/tests/groupby/host_udf_tests.cpp | 241 ++++++++++++++ .../main/java/ai/rapids/cudf/Aggregation.java | 62 +++- .../ai/rapids/cudf/GroupByAggregation.java | 9 + .../java/ai/rapids/cudf/HostUDFWrapper.java | 34 ++ java/src/main/native/src/AggregationJni.cpp | 36 ++- 15 files changed, 1175 insertions(+), 43 deletions(-) create mode 100644 cpp/include/cudf/aggregation/host_udf.hpp create mode 100644 cpp/src/groupby/sort/host_udf_aggregation.cpp create mode 100644 cpp/tests/groupby/host_udf_example_tests.cu create mode 100644 cpp/tests/groupby/host_udf_tests.cpp create mode 100644 java/src/main/java/ai/rapids/cudf/HostUDFWrapper.java diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 9cbacee8e8d..8c6cd922747 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -446,7 +446,6 @@ add_library( src/groupby/sort/group_quantiles.cu src/groupby/sort/group_std.cu src/groupby/sort/group_sum.cu - src/groupby/sort/scan.cpp src/groupby/sort/group_count_scan.cu src/groupby/sort/group_max_scan.cu src/groupby/sort/group_min_scan.cu @@ -454,6 +453,8 @@ add_library( src/groupby/sort/group_rank_scan.cu src/groupby/sort/group_replace_nulls.cu src/groupby/sort/group_sum_scan.cu + src/groupby/sort/host_udf_aggregation.cpp + src/groupby/sort/scan.cpp src/groupby/sort/sort_helper.cu src/hash/md5_hash.cu src/hash/murmurhash3_x86_32.cu diff --git a/cpp/include/cudf/aggregation.hpp b/cpp/include/cudf/aggregation.hpp index f5f514d26d9..a1b7db5e08a 100644 --- a/cpp/include/cudf/aggregation.hpp +++ b/cpp/include/cudf/aggregation.hpp @@ -110,8 +110,9 @@ class aggregation { COLLECT_SET, ///< collect values into a list without duplicate entries LEAD, ///< window function, accesses row at specified offset following current row LAG, ///< window function, accesses row at specified offset preceding current row - PTX, ///< PTX UDF based reduction - CUDA, ///< CUDA UDF based reduction + PTX, ///< PTX based UDF aggregation + CUDA, ///< CUDA based UDF aggregation + HOST_UDF, ///< host based UDF aggregation MERGE_LISTS, ///< merge multiple lists values into one list MERGE_SETS, ///< merge multiple lists values into one list then drop duplicate entries MERGE_M2, ///< merge partial values of M2 aggregation, @@ -120,7 +121,7 @@ class aggregation { TDIGEST, ///< create a tdigest from a set of input values MERGE_TDIGEST, ///< create a tdigest by merging multiple tdigests together HISTOGRAM, ///< compute frequency of each element - MERGE_HISTOGRAM ///< merge partial values of HISTOGRAM aggregation, + MERGE_HISTOGRAM ///< merge partial values of HISTOGRAM aggregation }; aggregation() = delete; @@ -599,6 +600,18 @@ std::unique_ptr make_udf_aggregation(udf_type type, std::string const& user_defined_aggregator, data_type output_type); +// Forward declaration of `host_udf_base` for the factory function of `HOST_UDF` aggregation. +struct host_udf_base; + +/** + * @brief Factory to create a HOST_UDF aggregation. + * + * @param host_udf An instance of a class derived from `host_udf_base` to perform aggregation + * @return A HOST_UDF aggregation object + */ +template +std::unique_ptr make_host_udf_aggregation(std::unique_ptr host_udf); + /** * @brief Factory to create a MERGE_LISTS aggregation. * diff --git a/cpp/include/cudf/aggregation/host_udf.hpp b/cpp/include/cudf/aggregation/host_udf.hpp new file mode 100644 index 00000000000..bbce76dc5f3 --- /dev/null +++ b/cpp/include/cudf/aggregation/host_udf.hpp @@ -0,0 +1,294 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include + +#include +#include + +#include +#include +#include +#include + +/** + * @file host_udf.hpp + * @brief Declare the base class for host-side user-defined function (`HOST_UDF`) and example of + * subclass implementation. + */ + +namespace CUDF_EXPORT cudf { +/** + * @addtogroup aggregation_factories + * @{ + */ + +/** + * @brief The interface for host-based UDF implementation. + * + * An implementation of host-based UDF needs to be derived from this base class, defining + * its own version of the required functions. In particular: + * - The derived class is required to implement `get_empty_output`, `operator()`, `is_equal`, + * and `clone` functions. + * - If necessary, the derived class can also override `do_hash` to compute hashing for its + * instance, and `get_required_data` to selectively access to the input data as well as + * intermediate data provided by libcudf. + * + * Example of such implementation: + * @code{.cpp} + * struct my_udf_aggregation : cudf::host_udf_base { + * my_udf_aggregation() = default; + * + * // This UDF aggregation needs `GROUPED_VALUES` and `GROUP_OFFSETS`, + * // and the result from groupby `MAX` aggregation. + * [[nodiscard]] data_attribute_set_t get_required_data() const override + * { + * return {groupby_data_attribute::GROUPED_VALUES, + * groupby_data_attribute::GROUP_OFFSETS, + * cudf::make_max_aggregation()}; + * } + * + * [[nodiscard]] output_t get_empty_output( + * [[maybe_unused]] std::optional output_dtype, + * [[maybe_unused]] rmm::cuda_stream_view stream, + * [[maybe_unused]] rmm::device_async_resource_ref mr) const override + * { + * // This UDF aggregation always returns a column of type INT32. + * return cudf::make_empty_column(cudf::data_type{cudf::type_id::INT32}); + * } + * + * [[nodiscard]] output_t operator()(input_map_t const& input, + * rmm::cuda_stream_view stream, + * rmm::device_async_resource_ref mr) const override + * { + * // Perform UDF computation using the input data and return the result. + * } + * + * [[nodiscard]] bool is_equal(host_udf_base const& other) const override + * { + * // Check if the other object is also instance of this class. + * return dynamic_cast(&other) != nullptr; + * } + * + * [[nodiscard]] std::unique_ptr clone() const override + * { + * return std::make_unique(); + * } + * }; + * @endcode + */ +struct host_udf_base { + host_udf_base() = default; + virtual ~host_udf_base() = default; + + /** + * @brief Define the possible data needed for groupby aggregations. + * + * Note that only sort-based groupby aggregations are supported. + */ + enum class groupby_data_attribute : int32_t { + INPUT_VALUES, ///< The input values column. + GROUPED_VALUES, ///< The input values grouped according to the input `keys` for which the + ///< values within each group maintain their original order. + SORTED_GROUPED_VALUES, ///< The input values grouped according to the input `keys` and + ///< sorted within each group. + NUM_GROUPS, ///< The number of groups (i.e., number of distinct keys). + GROUP_OFFSETS, ///< The offsets separating groups. + GROUP_LABELS ///< Group labels (which is also the same as group indices). + }; + + /** + * @brief Describe possible data that may be needed in the derived class for its operations. + * + * Such data can be either intermediate data such as sorted values or group labels etc, or the + * results of other aggregations. + * + * Each derived host-based UDF class may need a different set of data. It is inefficient to + * evaluate and pass down all these possible data at once from libcudf. A solution for that is, + * the derived class can define a subset of data that it needs and libcudf will evaluate + * and pass down only data requested from that set. + */ + struct data_attribute { + /** + * @brief Hold all possible data types for the input of the aggregation in the derived class. + */ + using value_type = std::variant>; + value_type value; ///< The actual data attribute, wrapped by this struct + ///< as a wrapper is needed to define `hash` and `equal_to` functors. + + data_attribute() = default; ///< Default constructor + data_attribute(data_attribute&&) = default; ///< Move constructor + + /** + * @brief Construct a new data attribute from an aggregation attribute. + * @param value_ An aggregation attribute + */ + template )> + data_attribute(T value_) : value{value_} + { + } + + /** + * @brief Construct a new data attribute from another aggregation request. + * @param value_ An aggregation request + */ + template || + std::is_same_v)> + data_attribute(std::unique_ptr value_) : value{std::move(value_)} + { + CUDF_EXPECTS(std::get>(value) != nullptr, + "Invalid aggregation request."); + if constexpr (std::is_same_v) { + CUDF_EXPECTS( + dynamic_cast(std::get>(value).get()) != nullptr, + "Requesting results from other aggregations is only supported in groupby " + "aggregations."); + } + } + + /** + * @brief Copy constructor. + * @param other The other data attribute to copy from + */ + data_attribute(data_attribute const& other); + + /** + * @brief Hash functor for `data_attribute`. + */ + struct hash { + /** + * @brief Compute the hash value of a data attribute. + * @param attr The data attribute to hash + * @return The hash value of the data attribute + */ + std::size_t operator()(data_attribute const& attr) const; + }; // struct hash + + /** + * @brief Equality comparison functor for `data_attribute`. + */ + struct equal_to { + /** + * @brief Check if two data attributes are equal. + * @param lhs The left-hand side data attribute + * @param rhs The right-hand side data attribute + * @return True if the two data attributes are equal + */ + bool operator()(data_attribute const& lhs, data_attribute const& rhs) const; + }; // struct equal_to + }; // struct data_attribute + + /** + * @brief Set of attributes for the input data that is needed for computing the aggregation. + */ + using data_attribute_set_t = + std::unordered_set; + + /** + * @brief Return a set of attributes for the data that is needed for computing the aggregation. + * + * The derived class should return the attributes corresponding to only the data that it needs to + * avoid unnecessary computation performed in libcudf. If this function is not overridden, an + * empty set is returned. That means all the data attributes (except results from other + * aggregations in groupby) will be needed. + * + * @return A set of `data_attribute` + */ + [[nodiscard]] virtual data_attribute_set_t get_required_data() const { return {}; } + + /** + * @brief Hold all possible types of the data that is passed to the derived class for executing + * the aggregation. + */ + using input_data_t = std::variant>; + + /** + * @brief Input to the aggregation, mapping from each data attribute to its actual data. + */ + using input_map_t = std:: + unordered_map; + + /** + * @brief Output type of the aggregation. + * + * Currently only a single type is supported as the output of the aggregation, but it will hold + * more type in the future when reduction is supported. + */ + using output_t = std::variant>; + + /** + * @brief Get the output when the input values column is empty. + * + * This is called in libcudf when the input values column is empty. In such situations libcudf + * tries to generate the output directly without unnecessarily evaluating the intermediate data. + * + * @param output_dtype The expected output data type + * @param stream The CUDA stream to use for any kernel launches + * @param mr Device memory resource to use for any allocations + * @return The output result of the aggregation when input values is empty + */ + [[nodiscard]] virtual output_t get_empty_output(std::optional output_dtype, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) const = 0; + + /** + * @brief Perform the main computation for the host-based UDF. + * + * @param input The input data needed for performing all computation + * @param stream The CUDA stream to use for any kernel launches + * @param mr Device memory resource to use for any allocations + * @return The output result of the aggregation + */ + [[nodiscard]] virtual output_t operator()(input_map_t const& input, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) const = 0; + + /** + * @brief Computes hash value of the class's instance. + * @return The hash value of the instance + */ + [[nodiscard]] virtual std::size_t do_hash() const + { + return std::hash{}(static_cast(aggregation::Kind::HOST_UDF)); + } + + /** + * @brief Compares two instances of the derived class for equality. + * @param other The other derived class's instance to compare with + * @return True if the two instances are equal + */ + [[nodiscard]] virtual bool is_equal(host_udf_base const& other) const = 0; + + /** + * @brief Clones the instance. + * + * A class derived from `host_udf_base` should not store too much data such that its instances + * remain lightweight for efficient cloning. + * + * @return A new instance cloned from this + */ + [[nodiscard]] virtual std::unique_ptr clone() const = 0; +}; + +/** @} */ // end of group +} // namespace CUDF_EXPORT cudf diff --git a/cpp/include/cudf/detail/aggregation/aggregation.hpp b/cpp/include/cudf/detail/aggregation/aggregation.hpp index 6661a461b8b..d873e93bd20 100644 --- a/cpp/include/cudf/detail/aggregation/aggregation.hpp +++ b/cpp/include/cudf/detail/aggregation/aggregation.hpp @@ -20,6 +20,7 @@ #include #include #include +#include #include #include @@ -88,6 +89,8 @@ class simple_aggregations_collector { // Declares the interface for the simple class lead_lag_aggregation const& agg); virtual std::vector> visit(data_type col_type, class udf_aggregation const& agg); + virtual std::vector> visit(data_type col_type, + class host_udf_aggregation const& agg); virtual std::vector> visit(data_type col_type, class merge_lists_aggregation const& agg); virtual std::vector> visit(data_type col_type, @@ -135,6 +138,7 @@ class aggregation_finalizer { // Declares the interface for the finalizer virtual void visit(class collect_set_aggregation const& agg); virtual void visit(class lead_lag_aggregation const& agg); virtual void visit(class udf_aggregation const& agg); + virtual void visit(class host_udf_aggregation const& agg); virtual void visit(class merge_lists_aggregation const& agg); virtual void visit(class merge_sets_aggregation const& agg); virtual void visit(class merge_m2_aggregation const& agg); @@ -960,6 +964,35 @@ class udf_aggregation final : public rolling_aggregation { } }; +/** + * @brief Derived class for specifying host-based UDF aggregation. + */ +class host_udf_aggregation final : public groupby_aggregation { + public: + std::unique_ptr udf_ptr; + + host_udf_aggregation() = delete; + host_udf_aggregation(host_udf_aggregation const&) = delete; + + // Need to define the constructor and destructor in a separate source file where we have the + // complete declaration of `host_udf_base`. + explicit host_udf_aggregation(std::unique_ptr udf_ptr_); + ~host_udf_aggregation() override; + + [[nodiscard]] bool is_equal(aggregation const& _other) const override; + + [[nodiscard]] size_t do_hash() const override; + + [[nodiscard]] std::unique_ptr clone() const override; + + std::vector> get_simple_aggregations( + data_type col_type, simple_aggregations_collector& collector) const override + { + return collector.visit(col_type, *this); + } + void finalize(aggregation_finalizer& finalizer) const override { finalizer.visit(*this); } +}; + /** * @brief Derived aggregation class for specifying MERGE_LISTS aggregation */ @@ -1462,6 +1495,12 @@ struct target_type_impl +struct target_type_impl { + // Just a placeholder. The actual return type is unknown. + using type = struct_view; +}; + /** * @brief Helper alias to get the accumulator type for performing aggregation * `k` on elements of type `Source` @@ -1579,6 +1618,8 @@ CUDF_HOST_DEVICE inline decltype(auto) aggregation_dispatcher(aggregation::Kind return f.template operator()(std::forward(args)...); case aggregation::EWMA: return f.template operator()(std::forward(args)...); + case aggregation::HOST_UDF: + return f.template operator()(std::forward(args)...); default: { #ifndef __CUDA_ARCH__ CUDF_FAIL("Unsupported aggregation."); diff --git a/cpp/src/aggregation/aggregation.cpp b/cpp/src/aggregation/aggregation.cpp index a60a7f63882..0d4400b891b 100644 --- a/cpp/src/aggregation/aggregation.cpp +++ b/cpp/src/aggregation/aggregation.cpp @@ -237,6 +237,12 @@ std::vector> simple_aggregations_collector::visit( return visit(col_type, static_cast(agg)); } +std::vector> simple_aggregations_collector::visit( + data_type col_type, host_udf_aggregation const& agg) +{ + return visit(col_type, static_cast(agg)); +} + // aggregation_finalizer ---------------------------------------- void aggregation_finalizer::visit(aggregation const& agg) {} @@ -410,6 +416,11 @@ void aggregation_finalizer::visit(merge_tdigest_aggregation const& agg) visit(static_cast(agg)); } +void aggregation_finalizer::visit(host_udf_aggregation const& agg) +{ + visit(static_cast(agg)); +} + } // namespace detail std::vector> aggregation::get_simple_aggregations( diff --git a/cpp/src/groupby/groupby.cu b/cpp/src/groupby/groupby.cu index c42038026e5..4c90cd0eef5 100644 --- a/cpp/src/groupby/groupby.cu +++ b/cpp/src/groupby/groupby.cu @@ -14,6 +14,7 @@ * limitations under the License. */ +#include #include #include #include @@ -32,7 +33,6 @@ #include #include #include -#include #include #include #include @@ -99,6 +99,8 @@ namespace { struct empty_column_constructor { column_view values; aggregation const& agg; + rmm::cuda_stream_view stream; + rmm::device_async_resource_ref mr; template std::unique_ptr operator()() const @@ -108,7 +110,7 @@ struct empty_column_constructor { if constexpr (k == aggregation::Kind::COLLECT_LIST || k == aggregation::Kind::COLLECT_SET) { return make_lists_column( - 0, make_empty_column(type_to_id()), empty_like(values), 0, {}); + 0, make_empty_column(type_to_id()), empty_like(values), 0, {}, stream, mr); } if constexpr (k == aggregation::Kind::HISTOGRAM) { @@ -116,7 +118,9 @@ struct empty_column_constructor { make_empty_column(type_to_id()), cudf::reduction::detail::make_empty_histogram_like(values), 0, - {}); + {}, + stream, + mr); } if constexpr (k == aggregation::Kind::MERGE_HISTOGRAM) { return empty_like(values); } @@ -140,31 +144,41 @@ struct empty_column_constructor { return empty_like(values); } + if constexpr (k == aggregation::Kind::HOST_UDF) { + auto const& udf_ptr = dynamic_cast(agg).udf_ptr; + return std::get>(udf_ptr->get_empty_output(std::nullopt, stream, mr)); + } + return make_empty_column(target_type(values.type(), k)); } }; /// Make an empty table with appropriate types for requested aggs template -auto empty_results(host_span requests) +auto empty_results(host_span requests, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) { std::vector empty_results; - std::transform( - requests.begin(), requests.end(), std::back_inserter(empty_results), [](auto const& request) { - std::vector> results; - - std::transform( - request.aggregations.begin(), - request.aggregations.end(), - std::back_inserter(results), - [&request](auto const& agg) { - return cudf::detail::dispatch_type_and_aggregation( - request.values.type(), agg->kind, empty_column_constructor{request.values, *agg}); - }); - - return aggregation_result{std::move(results)}; - }); + std::transform(requests.begin(), + requests.end(), + std::back_inserter(empty_results), + [stream, mr](auto const& request) { + std::vector> results; + + std::transform(request.aggregations.begin(), + request.aggregations.end(), + std::back_inserter(results), + [&request, stream, mr](auto const& agg) { + return cudf::detail::dispatch_type_and_aggregation( + request.values.type(), + agg->kind, + empty_column_constructor{request.values, *agg, stream, mr}); + }); + + return aggregation_result{std::move(results)}; + }); return empty_results; } @@ -206,7 +220,7 @@ std::pair, std::vector> groupby::aggr verify_valid_requests(requests); - if (_keys.num_rows() == 0) { return {empty_like(_keys), empty_results(requests)}; } + if (_keys.num_rows() == 0) { return {empty_like(_keys), empty_results(requests, stream, mr)}; } return dispatch_aggregation(requests, stream, mr); } @@ -226,7 +240,9 @@ std::pair, std::vector> groupby::scan verify_valid_requests(requests); - if (_keys.num_rows() == 0) { return std::pair(empty_like(_keys), empty_results(requests)); } + if (_keys.num_rows() == 0) { + return std::pair(empty_like(_keys), empty_results(requests, stream, mr)); + } return sort_scan(requests, stream, mr); } diff --git a/cpp/src/groupby/sort/aggregate.cpp b/cpp/src/groupby/sort/aggregate.cpp index 7a8a1883ed4..e9f885a5917 100644 --- a/cpp/src/groupby/sort/aggregate.cpp +++ b/cpp/src/groupby/sort/aggregate.cpp @@ -19,6 +19,7 @@ #include "groupby/sort/group_reductions.hpp" #include +#include #include #include #include @@ -795,6 +796,65 @@ void aggregate_result_functor::operator()(aggregatio mr)); } +template <> +void aggregate_result_functor::operator()(aggregation const& agg) +{ + if (cache.has_result(values, agg)) { return; } + + auto const& udf_ptr = dynamic_cast(agg).udf_ptr; + auto const data_attrs = [&]() -> host_udf_base::data_attribute_set_t { + if (auto tmp = udf_ptr->get_required_data(); !tmp.empty()) { return tmp; } + // Empty attribute set means everything. + return {host_udf_base::groupby_data_attribute::INPUT_VALUES, + host_udf_base::groupby_data_attribute::GROUPED_VALUES, + host_udf_base::groupby_data_attribute::SORTED_GROUPED_VALUES, + host_udf_base::groupby_data_attribute::NUM_GROUPS, + host_udf_base::groupby_data_attribute::GROUP_OFFSETS, + host_udf_base::groupby_data_attribute::GROUP_LABELS}; + }(); + + // Do not cache udf_input, as the actual input data may change from run to run. + host_udf_base::input_map_t udf_input; + for (auto const& attr : data_attrs) { + CUDF_EXPECTS(std::holds_alternative(attr.value) || + std::holds_alternative>(attr.value), + "Invalid input data attribute for HOST_UDF groupby aggregation."); + if (std::holds_alternative(attr.value)) { + switch (std::get(attr.value)) { + case host_udf_base::groupby_data_attribute::INPUT_VALUES: + udf_input.emplace(attr, values); + break; + case host_udf_base::groupby_data_attribute::GROUPED_VALUES: + udf_input.emplace(attr, get_grouped_values()); + break; + case host_udf_base::groupby_data_attribute::SORTED_GROUPED_VALUES: + udf_input.emplace(attr, get_sorted_values()); + break; + case host_udf_base::groupby_data_attribute::NUM_GROUPS: + udf_input.emplace(attr, helper.num_groups(stream)); + break; + case host_udf_base::groupby_data_attribute::GROUP_OFFSETS: + udf_input.emplace(attr, helper.group_offsets(stream)); + break; + case host_udf_base::groupby_data_attribute::GROUP_LABELS: + udf_input.emplace(attr, helper.group_labels(stream)); + break; + default: CUDF_UNREACHABLE("Invalid input data attribute for HOST_UDF groupby aggregation."); + } + } else { // data is result from another aggregation + auto other_agg = std::get>(attr.value)->clone(); + cudf::detail::aggregation_dispatcher(other_agg->kind, *this, *other_agg); + auto result = cache.get_result(values, *other_agg); + udf_input.emplace(std::move(other_agg), std::move(result)); + } + } + + auto output = (*udf_ptr)(udf_input, stream, mr); + CUDF_EXPECTS(std::holds_alternative>(output), + "Invalid output type from HOST_UDF groupby aggregation."); + cache.add_result(values, agg, std::get>(std::move(output))); +} + } // namespace detail // Sort-based groupby diff --git a/cpp/src/groupby/sort/host_udf_aggregation.cpp b/cpp/src/groupby/sort/host_udf_aggregation.cpp new file mode 100644 index 00000000000..0da47e17f48 --- /dev/null +++ b/cpp/src/groupby/sort/host_udf_aggregation.cpp @@ -0,0 +1,103 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +namespace cudf { + +host_udf_base::data_attribute::data_attribute(data_attribute const& other) + : value{std::visit(cudf::detail::visitor_overload{[](auto const& val) { return value_type{val}; }, + [](std::unique_ptr const& val) { + return value_type{val->clone()}; + }}, + other.value)} +{ +} + +std::size_t host_udf_base::data_attribute::hash::operator()(data_attribute const& attr) const +{ + auto const hash_value = + std::visit(cudf::detail::visitor_overload{ + [](auto const& val) { return std::hash{}(static_cast(val)); }, + [](std::unique_ptr const& val) { return val->do_hash(); }}, + attr.value); + return std::hash{}(attr.value.index()) ^ hash_value; +} + +bool host_udf_base::data_attribute::equal_to::operator()(data_attribute const& lhs, + data_attribute const& rhs) const +{ + auto const& lhs_val = lhs.value; + auto const& rhs_val = rhs.value; + if (lhs_val.index() != rhs_val.index()) { return false; } + return std::visit( + cudf::detail::visitor_overload{ + [](auto const& lhs_val, auto const& rhs_val) { + if constexpr (std::is_same_v) { + return lhs_val == rhs_val; + } else { + return false; + } + }, + [](std::unique_ptr const& lhs_val, std::unique_ptr const& rhs_val) { + return lhs_val->is_equal(*rhs_val); + }}, + lhs_val, + rhs_val); +} + +namespace detail { + +host_udf_aggregation::host_udf_aggregation(std::unique_ptr udf_ptr_) + : aggregation{HOST_UDF}, udf_ptr{std::move(udf_ptr_)} +{ + CUDF_EXPECTS(udf_ptr != nullptr, "Invalid host_udf_base instance."); +} + +host_udf_aggregation::~host_udf_aggregation() = default; + +bool host_udf_aggregation::is_equal(aggregation const& _other) const +{ + if (!this->aggregation::is_equal(_other)) { return false; } + auto const& other = dynamic_cast(_other); + return udf_ptr->is_equal(*other.udf_ptr); +} + +size_t host_udf_aggregation::do_hash() const +{ + return this->aggregation::do_hash() ^ udf_ptr->do_hash(); +} + +std::unique_ptr host_udf_aggregation::clone() const +{ + return std::make_unique(udf_ptr->clone()); +} + +} // namespace detail + +template +std::unique_ptr make_host_udf_aggregation(std::unique_ptr udf_ptr_) +{ + return std::make_unique(std::move(udf_ptr_)); +} +template CUDF_EXPORT std::unique_ptr make_host_udf_aggregation( + std::unique_ptr); +template CUDF_EXPORT std::unique_ptr + make_host_udf_aggregation(std::unique_ptr); + +} // namespace cudf diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index adf512811cc..e5c29314203 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -132,6 +132,8 @@ ConfigureTest( groupby/groupby_test_util.cpp groupby/groups_tests.cpp groupby/histogram_tests.cpp + groupby/host_udf_example_tests.cu + groupby/host_udf_tests.cpp groupby/keys_tests.cpp groupby/lists_tests.cpp groupby/m2_tests.cpp diff --git a/cpp/tests/groupby/host_udf_example_tests.cu b/cpp/tests/groupby/host_udf_example_tests.cu new file mode 100644 index 00000000000..a454bd692fc --- /dev/null +++ b/cpp/tests/groupby/host_udf_example_tests.cu @@ -0,0 +1,245 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include +#include +#include +#include + +namespace { +/** + * @brief A host-based UDF implementation for groupby. + * + * For each group of values, the aggregation computes + * `(group_idx + 1) * group_sum_of_squares - group_max * group_sum`. + */ +struct host_udf_groupby_example : cudf::host_udf_base { + host_udf_groupby_example() = default; + + [[nodiscard]] data_attribute_set_t get_required_data() const override + { + // We need grouped values, group offsets, group labels, and also results from groups' + // MAX and SUM aggregations. + return {groupby_data_attribute::GROUPED_VALUES, + groupby_data_attribute::GROUP_OFFSETS, + groupby_data_attribute::GROUP_LABELS, + cudf::make_max_aggregation(), + cudf::make_sum_aggregation()}; + } + + [[nodiscard]] output_t get_empty_output( + [[maybe_unused]] std::optional output_dtype, + [[maybe_unused]] rmm::cuda_stream_view stream, + [[maybe_unused]] rmm::device_async_resource_ref mr) const override + { + return cudf::make_empty_column( + cudf::data_type{cudf::type_to_id()}); + } + + [[nodiscard]] output_t operator()(input_map_t const& input, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) const override + { + auto const& values = + std::get(input.at(groupby_data_attribute::GROUPED_VALUES)); + return cudf::type_dispatcher(values.type(), groupby_fn{this}, input, stream, mr); + } + + [[nodiscard]] std::size_t do_hash() const override + { + // Just return the same hash for all instances of this class. + return std::size_t{12345}; + } + + [[nodiscard]] bool is_equal(host_udf_base const& other) const override + { + // Just check if the other object is also instance of this class. + return dynamic_cast(&other) != nullptr; + } + + [[nodiscard]] std::unique_ptr clone() const override + { + return std::make_unique(); + } + + struct groupby_fn { + // Store pointer to the parent class so we can call its functions. + host_udf_groupby_example const* parent; + + // For simplicity, this example only accepts double input and always produces double output. + using InputType = double; + using OutputType = double; + + template )> + output_t operator()(Args...) const + { + CUDF_FAIL("Unsupported input type."); + } + + template )> + output_t operator()(input_map_t const& input, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) const + { + auto const& values = + std::get(input.at(groupby_data_attribute::GROUPED_VALUES)); + if (values.size() == 0) { return parent->get_empty_output(std::nullopt, stream, mr); } + + auto const offsets = std::get>( + input.at(groupby_data_attribute::GROUP_OFFSETS)); + CUDF_EXPECTS(offsets.size() > 0, "Invalid offsets."); + auto const num_groups = static_cast(offsets.size()) - 1; + auto const group_indices = std::get>( + input.at(groupby_data_attribute::GROUP_LABELS)); + auto const group_max = std::get( + input.at(cudf::make_max_aggregation())); + auto const group_sum = std::get( + input.at(cudf::make_sum_aggregation())); + + auto const values_dv_ptr = cudf::column_device_view::create(values, stream); + auto const output = cudf::make_numeric_column(cudf::data_type{cudf::type_to_id()}, + num_groups, + cudf::mask_state::UNALLOCATED, + stream, + mr); + + // Store row index if it is valid, otherwise store a negative value denoting a null row. + rmm::device_uvector valid_idx(num_groups, stream); + + thrust::transform( + rmm::exec_policy(stream), + thrust::make_counting_iterator(0), + thrust::make_counting_iterator(num_groups), + thrust::make_zip_iterator(output->mutable_view().begin(), valid_idx.begin()), + transform_fn{*values_dv_ptr, + offsets, + group_indices, + group_max.begin(), + group_sum.begin()}); + + auto const valid_idx_cv = cudf::column_view{ + cudf::data_type{cudf::type_id::INT32}, num_groups, valid_idx.begin(), nullptr, 0}; + return std::move(cudf::gather(cudf::table_view{{output->view()}}, + valid_idx_cv, + cudf::out_of_bounds_policy::NULLIFY, + stream, + mr) + ->release() + .front()); + } + + struct transform_fn { + cudf::column_device_view values; + cudf::device_span offsets; + cudf::device_span group_indices; + InputType const* group_max; + InputType const* group_sum; + + thrust::tuple __device__ operator()(cudf::size_type idx) const + { + auto const start = offsets[idx]; + auto const end = offsets[idx + 1]; + + auto constexpr invalid_idx = cuda::std::numeric_limits::lowest(); + if (start == end) { return {OutputType{0}, invalid_idx}; } + + auto sum_sqr = OutputType{0}; + bool has_valid{false}; + for (auto i = start; i < end; ++i) { + if (values.is_null(i)) { continue; } + has_valid = true; + auto const val = static_cast(values.element(i)); + sum_sqr += val * val; + } + + if (!has_valid) { return {OutputType{0}, invalid_idx}; } + return {static_cast(group_indices[start] + 1) * sum_sqr - + static_cast(group_max[idx]) * static_cast(group_sum[idx]), + idx}; + } + }; + }; +}; + +} // namespace + +using doubles_col = cudf::test::fixed_width_column_wrapper; +using int32s_col = cudf::test::fixed_width_column_wrapper; + +struct HostUDFGroupbyExampleTest : cudf::test::BaseFixture {}; + +TEST_F(HostUDFGroupbyExampleTest, SimpleInput) +{ + double constexpr null = 0.0; + auto const keys = int32s_col{0, 1, 2, 0, 1, 2, 0, 1, 2, 0}; + auto const vals = doubles_col{{0.0, null, 2.0, 3.0, null, 5.0, null, null, 8.0, 9.0}, + {true, false, true, true, false, true, false, false, true, true}}; + auto agg = cudf::make_host_udf_aggregation( + std::make_unique()); + + std::vector requests; + requests.emplace_back(); + requests[0].values = vals; + requests[0].aggregations.push_back(std::move(agg)); + cudf::groupby::groupby gb_obj( + cudf::table_view({keys}), cudf::null_policy::INCLUDE, cudf::sorted::NO, {}, {}); + + auto const grp_result = gb_obj.aggregate(requests, cudf::test::get_default_stream()); + auto const& result = grp_result.second[0].results[0]; + + // Output type of groupby is double. + // Values grouped by keys: [ {0, 3, null, 9}, {null, null, null}, {2, 5, 8} ] + // Group sum_sqr: [ 90, null, 93 ] + // Group max: [ 9, null, 8 ] + // Group sum: [ 12, null, 15 ] + // Output: [ 1 * 90 - 9 * 12, null, 3 * 93 - 8 * 15 ] + auto const expected = doubles_col{{-18.0, null, 159.0}, {true, false, true}}; + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected, *result); +} + +TEST_F(HostUDFGroupbyExampleTest, EmptyInput) +{ + auto const keys = int32s_col{}; + auto const vals = doubles_col{}; + auto agg = cudf::make_host_udf_aggregation( + std::make_unique()); + + std::vector requests; + requests.emplace_back(); + requests[0].values = vals; + requests[0].aggregations.push_back(std::move(agg)); + cudf::groupby::groupby gb_obj( + cudf::table_view({keys}), cudf::null_policy::INCLUDE, cudf::sorted::NO, {}, {}); + + auto const grp_result = gb_obj.aggregate(requests, cudf::test::get_default_stream()); + auto const& result = grp_result.second[0].results[0]; + CUDF_TEST_EXPECT_COLUMNS_EQUAL(vals, *result); +} diff --git a/cpp/tests/groupby/host_udf_tests.cpp b/cpp/tests/groupby/host_udf_tests.cpp new file mode 100644 index 00000000000..1a0f68c0c6c --- /dev/null +++ b/cpp/tests/groupby/host_udf_tests.cpp @@ -0,0 +1,241 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include +#include +#include +#include + +#include +#include + +namespace { +/** + * @brief A host-based UDF implementation used for unit tests. + */ +struct host_udf_test_base : cudf::host_udf_base { + int test_location_line; // the location where testing is called + bool* test_run; // to check if the test is accidentally skipped + data_attribute_set_t input_attrs; + + host_udf_test_base(int test_location_line_, bool* test_run_, data_attribute_set_t input_attrs_) + : test_location_line{test_location_line_}, + test_run{test_run_}, + input_attrs(std::move(input_attrs_)) + { + } + + [[nodiscard]] data_attribute_set_t get_required_data() const override { return input_attrs; } + + // This is the main testing function, which checks for the correctness of input data. + // The rests are just to satisfy the interface. + [[nodiscard]] output_t operator()(input_map_t const& input, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) const override + { + SCOPED_TRACE("Test instance created at line: " + std::to_string(test_location_line)); + + test_data_attributes(input, stream, mr); + + *test_run = true; // test is run successfully + return get_empty_output(std::nullopt, stream, mr); + } + + [[nodiscard]] output_t get_empty_output( + [[maybe_unused]] std::optional output_dtype, + [[maybe_unused]] rmm::cuda_stream_view stream, + [[maybe_unused]] rmm::device_async_resource_ref mr) const override + { + // Unused function - dummy output. + return cudf::make_empty_column(cudf::data_type{cudf::type_id::INT32}); + } + + [[nodiscard]] std::size_t do_hash() const override { return 0; } + [[nodiscard]] bool is_equal(host_udf_base const& other) const override { return true; } + + // The main test function, which must be implemented for each kind of aggregations + // (groupby/reduction/segmented_reduction). + virtual void test_data_attributes(input_map_t const& input, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) const = 0; +}; + +/** + * @brief A host-based UDF implementation used for unit tests for groupby aggregation. + */ +struct host_udf_groupby_test : host_udf_test_base { + host_udf_groupby_test(int test_location_line_, + bool* test_run_, + data_attribute_set_t input_attrs_ = {}) + : host_udf_test_base(test_location_line_, test_run_, std::move(input_attrs_)) + { + } + + [[nodiscard]] std::unique_ptr clone() const override + { + return std::make_unique(test_location_line, test_run, input_attrs); + } + + void test_data_attributes(input_map_t const& input, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) const override + { + data_attribute_set_t check_attrs = input_attrs; + if (check_attrs.empty()) { + check_attrs = data_attribute_set_t{groupby_data_attribute::INPUT_VALUES, + groupby_data_attribute::GROUPED_VALUES, + groupby_data_attribute::SORTED_GROUPED_VALUES, + groupby_data_attribute::NUM_GROUPS, + groupby_data_attribute::GROUP_OFFSETS, + groupby_data_attribute::GROUP_LABELS}; + } + EXPECT_EQ(input.size(), check_attrs.size()); + for (auto const& attr : check_attrs) { + EXPECT_TRUE(input.count(attr) > 0); + EXPECT_TRUE(std::holds_alternative(attr.value) || + std::holds_alternative>(attr.value)); + if (std::holds_alternative(attr.value)) { + switch (std::get(attr.value)) { + case groupby_data_attribute::INPUT_VALUES: + EXPECT_TRUE(std::holds_alternative(input.at(attr))); + break; + case groupby_data_attribute::GROUPED_VALUES: + EXPECT_TRUE(std::holds_alternative(input.at(attr))); + break; + case groupby_data_attribute::SORTED_GROUPED_VALUES: + EXPECT_TRUE(std::holds_alternative(input.at(attr))); + break; + case groupby_data_attribute::NUM_GROUPS: + EXPECT_TRUE(std::holds_alternative(input.at(attr))); + break; + case groupby_data_attribute::GROUP_OFFSETS: + EXPECT_TRUE( + std::holds_alternative>(input.at(attr))); + break; + case groupby_data_attribute::GROUP_LABELS: + EXPECT_TRUE( + std::holds_alternative>(input.at(attr))); + break; + default:; + } + } else { // std::holds_alternative>(attr.value) + EXPECT_TRUE(std::holds_alternative(input.at(attr))); + } + } + } +}; + +/** + * @brief Get a random subset of input data attributes. + */ +cudf::host_udf_base::data_attribute_set_t get_subset( + cudf::host_udf_base::data_attribute_set_t const& attrs) +{ + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution size_distr(1, attrs.size() - 1); + auto const subset_size = size_distr(gen); + auto const elements = + std::vector(attrs.begin(), attrs.end()); + std::uniform_int_distribution idx_distr(0, attrs.size() - 1); + cudf::host_udf_base::data_attribute_set_t output; + while (output.size() < subset_size) { + output.insert(elements[idx_distr(gen)]); + } + return output; +} + +/** + * @brief Generate a random aggregation object from {min, max, sum, product}. + */ +std::unique_ptr get_random_agg() +{ + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution distr(1, 4); + switch (distr(gen)) { + case 1: return cudf::make_min_aggregation(); + case 2: return cudf::make_max_aggregation(); + case 3: return cudf::make_sum_aggregation(); + case 4: return cudf::make_product_aggregation(); + default: CUDF_UNREACHABLE("This should not be reached."); + } + return nullptr; +} + +} // namespace + +using int32s_col = cudf::test::fixed_width_column_wrapper; + +// Number of randomly testing on the input data attributes. +// For each test, a subset of data attributes will be randomly generated from all the possible input +// data attributes. The input data corresponding to that subset passed from libcudf will be tested +// for correctness. +constexpr int NUM_RANDOM_TESTS = 20; + +struct HostUDFTest : cudf::test::BaseFixture {}; + +TEST_F(HostUDFTest, GroupbyAllInput) +{ + bool test_run = false; + auto const keys = int32s_col{0, 1, 2}; + auto const vals = int32s_col{0, 1, 2}; + auto agg = cudf::make_host_udf_aggregation( + std::make_unique(__LINE__, &test_run)); + + std::vector requests; + requests.emplace_back(); + requests[0].values = vals; + requests[0].aggregations.push_back(std::move(agg)); + cudf::groupby::groupby gb_obj( + cudf::table_view({keys}), cudf::null_policy::INCLUDE, cudf::sorted::NO, {}, {}); + [[maybe_unused]] auto const grp_result = + gb_obj.aggregate(requests, cudf::test::get_default_stream()); + EXPECT_TRUE(test_run); +} + +TEST_F(HostUDFTest, GroupbySomeInput) +{ + auto const keys = int32s_col{0, 1, 2}; + auto const vals = int32s_col{0, 1, 2}; + auto const all_attrs = cudf::host_udf_base::data_attribute_set_t{ + cudf::host_udf_base::groupby_data_attribute::INPUT_VALUES, + cudf::host_udf_base::groupby_data_attribute::GROUPED_VALUES, + cudf::host_udf_base::groupby_data_attribute::SORTED_GROUPED_VALUES, + cudf::host_udf_base::groupby_data_attribute::NUM_GROUPS, + cudf::host_udf_base::groupby_data_attribute::GROUP_OFFSETS, + cudf::host_udf_base::groupby_data_attribute::GROUP_LABELS}; + for (int i = 0; i < NUM_RANDOM_TESTS; ++i) { + bool test_run = false; + auto input_attrs = get_subset(all_attrs); + input_attrs.insert(get_random_agg()); + auto agg = cudf::make_host_udf_aggregation( + std::make_unique(__LINE__, &test_run, std::move(input_attrs))); + + std::vector requests; + requests.emplace_back(); + requests[0].values = vals; + requests[0].aggregations.push_back(std::move(agg)); + cudf::groupby::groupby gb_obj( + cudf::table_view({keys}), cudf::null_policy::INCLUDE, cudf::sorted::NO, {}, {}); + [[maybe_unused]] auto const grp_result = + gb_obj.aggregate(requests, cudf::test::get_default_stream()); + EXPECT_TRUE(test_run); + } +} diff --git a/java/src/main/java/ai/rapids/cudf/Aggregation.java b/java/src/main/java/ai/rapids/cudf/Aggregation.java index 379750bb0b7..2276b223740 100644 --- a/java/src/main/java/ai/rapids/cudf/Aggregation.java +++ b/java/src/main/java/ai/rapids/cudf/Aggregation.java @@ -62,15 +62,16 @@ enum Kind { LAG(23), PTX(24), CUDA(25), - M2(26), - MERGE_M2(27), - RANK(28), - DENSE_RANK(29), - PERCENT_RANK(30), - TDIGEST(31), // This can take a delta argument for accuracy level - MERGE_TDIGEST(32), // This can take a delta argument for accuracy level - HISTOGRAM(33), - MERGE_HISTOGRAM(34); + HOST_UDF(26), + M2(27), + MERGE_M2(28), + RANK(29), + DENSE_RANK(30), + PERCENT_RANK(31), + TDIGEST(32), // This can take a delta argument for accuracy level + MERGE_TDIGEST(33), // This can take a delta argument for accuracy level + HISTOGRAM(34), + MERGE_HISTOGRAM(35); final int nativeId; @@ -385,6 +386,35 @@ public boolean equals(Object other) { } } + static final class HostUDFAggregation extends Aggregation { + private final HostUDFWrapper wrapper; + + private HostUDFAggregation(HostUDFWrapper wrapper) { + super(Kind.HOST_UDF); + this.wrapper = wrapper; + } + + @Override + long createNativeInstance() { + return Aggregation.createHostUDFAgg(wrapper.udfNativeHandle); + } + + @Override + public int hashCode() { + return 31 * kind.hashCode() + wrapper.hashCode(); + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } else if (other instanceof HostUDFAggregation) { + return wrapper.equals(((HostUDFAggregation) other).wrapper); + } + return false; + } + } + protected final Kind kind; protected Aggregation(Kind kind) { @@ -837,6 +867,15 @@ static MergeSetsAggregation mergeSets(NullEquality nullEquality, NaNEquality nan return new MergeSetsAggregation(nullEquality, nanEquality); } + /** + * Host UDF aggregation, to execute a host-side user-defined function (UDF). + * @param wrapper The wrapper for the native host UDF instance. + * @return A new HostUDFAggregation instance + */ + static HostUDFAggregation hostUDF(HostUDFWrapper wrapper) { + return new HostUDFAggregation(wrapper); + } + static final class LeadAggregation extends LeadLagAggregation { private LeadAggregation(int offset, ColumnVector defaultOutput) { super(Kind.LEAD, offset, defaultOutput); @@ -990,4 +1029,9 @@ static MergeHistogramAggregation mergeHistogram() { * Create a TDigest aggregation. */ private static native long createTDigestAgg(int kind, int delta); + + /** + * Create a HOST_UDF aggregation. + */ + private static native long createHostUDFAgg(long udfNativeHandle); } diff --git a/java/src/main/java/ai/rapids/cudf/GroupByAggregation.java b/java/src/main/java/ai/rapids/cudf/GroupByAggregation.java index 0fae33927b6..27966ddfdd4 100644 --- a/java/src/main/java/ai/rapids/cudf/GroupByAggregation.java +++ b/java/src/main/java/ai/rapids/cudf/GroupByAggregation.java @@ -277,6 +277,15 @@ public static GroupByAggregation mergeSets() { return new GroupByAggregation(Aggregation.mergeSets()); } + /** + * Execute an aggregation using a host-side user-defined function (UDF). + * @param wrapper The wrapper for the native host UDF instance. + * @return A new GroupByAggregation instance + */ + public static GroupByAggregation hostUDF(HostUDFWrapper wrapper) { + return new GroupByAggregation(Aggregation.hostUDF(wrapper)); + } + /** * Merge the partial sets produced by multiple CollectSetAggregations. * diff --git a/java/src/main/java/ai/rapids/cudf/HostUDFWrapper.java b/java/src/main/java/ai/rapids/cudf/HostUDFWrapper.java new file mode 100644 index 00000000000..0b6ecf2e140 --- /dev/null +++ b/java/src/main/java/ai/rapids/cudf/HostUDFWrapper.java @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ai.rapids.cudf; + +/** + * A wrapper around native host UDF aggregations. + *

+ * This class is used to store the native handle of a host UDF aggregation and is used as + * a proxy object to compute hash code and compare two host UDF aggregations for equality. + *

+ * A new host UDF aggregation implementation must extend this class and override the + * {@code hashCode} and {@code equals} methods for such purposes. + */ +public abstract class HostUDFWrapper { + public final long udfNativeHandle; + + public HostUDFWrapper(long udfNativeHandle) { + this.udfNativeHandle = udfNativeHandle; + } +} diff --git a/java/src/main/native/src/AggregationJni.cpp b/java/src/main/native/src/AggregationJni.cpp index c40f1c55500..dd41c677761 100644 --- a/java/src/main/native/src/AggregationJni.cpp +++ b/java/src/main/native/src/AggregationJni.cpp @@ -17,6 +17,7 @@ #include "cudf_jni_apis.hpp" #include +#include extern "C" { @@ -80,25 +81,28 @@ JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_Aggregation_createNoParamAgg(JNIEnv* // case 23: LAG // case 24: PTX // case 25: CUDA - case 26: // M2 + // case 26: HOST_UDF + case 27: // M2 return cudf::make_m2_aggregation(); - case 27: // MERGE_M2 + case 28: // MERGE_M2 return cudf::make_merge_m2_aggregation(); - case 28: // RANK + case 29: // RANK return cudf::make_rank_aggregation( cudf::rank_method::MIN, {}, cudf::null_policy::INCLUDE); - case 29: // DENSE_RANK + case 30: // DENSE_RANK return cudf::make_rank_aggregation( cudf::rank_method::DENSE, {}, cudf::null_policy::INCLUDE); - case 30: // ANSI SQL PERCENT_RANK + case 31: // ANSI SQL PERCENT_RANK return cudf::make_rank_aggregation(cudf::rank_method::MIN, {}, cudf::null_policy::INCLUDE, {}, cudf::rank_percentage::ONE_NORMALIZED); - case 33: // HISTOGRAM + // case 32: TDIGEST + // case 33: MERGE_TDIGEST + case 34: // HISTOGRAM return cudf::make_histogram_aggregation(); - case 34: // MERGE_HISTOGRAM + case 35: // MERGE_HISTOGRAM return cudf::make_merge_histogram_aggregation(); default: throw std::logic_error("Unsupported No Parameter Aggregation Operation"); @@ -160,10 +164,10 @@ JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_Aggregation_createTDigestAgg(JNIEnv* std::unique_ptr ret; // These numbers come from Aggregation.java and must stay in sync switch (kind) { - case 31: // TDIGEST + case 32: // TDIGEST ret = cudf::make_tdigest_aggregation(delta); break; - case 32: // MERGE_TDIGEST + case 33: // MERGE_TDIGEST ret = cudf::make_merge_tdigest_aggregation(delta); break; default: throw std::logic_error("Unsupported TDigest Aggregation Operation"); @@ -296,4 +300,18 @@ JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_Aggregation_createMergeSetsAgg(JNIEn CATCH_STD(env, 0); } +JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_Aggregation_createHostUDFAgg(JNIEnv* env, + jclass class_object, + jlong udf_native_handle) +{ + JNI_NULL_CHECK(env, udf_native_handle, "udf_native_handle is null", 0); + try { + cudf::jni::auto_set_device(env); + auto const udf_ptr = reinterpret_cast(udf_native_handle); + auto output = cudf::make_host_udf_aggregation(udf_ptr->clone()); + return reinterpret_cast(output.release()); + } + CATCH_STD(env, 0); +} + } // extern "C"