diff --git a/python/pylibcudf/pylibcudf/aggregation.pxd b/python/pylibcudf/pylibcudf/aggregation.pxd index c9ab1eab21c..c487106f23a 100644 --- a/python/pylibcudf/pylibcudf/aggregation.pxd +++ b/python/pylibcudf/pylibcudf/aggregation.pxd @@ -13,6 +13,17 @@ from pylibcudf.libcudf.aggregation cimport ( reduce_aggregation, rolling_aggregation, scan_aggregation, + std_var_aggregation, + quantile_aggregation, + nunique_aggregation, + nth_element_aggregation, + ewma_aggregation, + rank_aggregation, + collect_list_aggregation, + collect_set_aggregation, + udf_aggregation, + correlation_aggregation, + covariance_aggregation, ) from pylibcudf.libcudf.types cimport ( interpolation, @@ -32,6 +43,17 @@ ctypedef groupby_scan_aggregation * gbsa_ptr ctypedef reduce_aggregation * ra_ptr ctypedef scan_aggregation * sa_ptr ctypedef rolling_aggregation * roa_ptr +ctypedef std_var_aggregation * std_var_ptr +ctypedef quantile_aggregation * quantile_ptr +ctypedef nunique_aggregation * nunique_ptr +ctypedef nth_element_aggregation * nth_element_ptr +ctypedef ewma_aggregation * ewma_ptr +ctypedef rank_aggregation * rank_ptr +ctypedef collect_list_aggregation * collect_list_ptr +ctypedef collect_set_aggregation * collect_set_ptr +ctypedef udf_aggregation * udf_ptr +ctypedef correlation_aggregation * correlation_ptr +ctypedef covariance_aggregation * covariance_ptr cdef class Aggregation: diff --git a/python/pylibcudf/pylibcudf/aggregation.pyx b/python/pylibcudf/pylibcudf/aggregation.pyx index 662f76d5c8e..db2af14e7d6 100644 --- a/python/pylibcudf/pylibcudf/aggregation.pyx +++ b/python/pylibcudf/pylibcudf/aggregation.pyx @@ -122,6 +122,108 @@ cdef class Aggregation: def __hash__(self): return dereference(self.c_obj).do_hash() + def __reduce__(self): + cdef std_var_aggregation *std_var_cast + cdef quantile_aggregation *quantile_cast + cdef nunique_aggregation *nunique_cast + cdef nth_element_aggregation *nth_element_cast + cdef ewma_aggregation *ewma_cast + cdef rank_aggregation *rank_cast + cdef collect_list_aggregation *collect_list_cast + cdef collect_set_aggregation *collect_set_cast + cdef udf_aggregation *udf_cast + cdef correlation_aggregation *correlation_cast + cdef covariance_aggregation *covariance_cast + + if self.kind() is Kind.SUM: + return (sum, ()) + elif self.kind() is Kind.PRODUCT: + return (product, ()) + elif self.kind() is Kind.MIN: + return (min, ()) + elif self.kind() is Kind.MAX: + return (max, ()) + elif self.kind() is Kind.COUNT_ALL: + return (count, (null_policy.INCLUDE,)) + elif self.kind() is Kind.COUNT_VALID: + return (count, (null_policy.EXCLUDE,)) + elif self.kind() is Kind.ANY: + return (any, ()) + elif self.kind() is Kind.ALL: + return (all, ()) + elif self.kind() is Kind.SUM_OF_SQUARES: + return (sum_of_squares, ()) + elif self.kind() is Kind.MEAN: + return (mean, ()) + elif self.kind() is Kind.VARIANCE: + std_var_cast = dynamic_cast[std_var_ptr](self.c_obj.get()) + return (variance, (std_var_cast._ddof,)) + elif self.kind() is Kind.STD: + std_var_cast = dynamic_cast[std_var_ptr](self.c_obj.get()) + return (std, (std_var_cast._ddof,)) + elif self.kind() is Kind.MEDIAN: + return (median, ()) + elif self.kind() is Kind.QUANTILE: + quantile_cast = dynamic_cast[quantile_ptr](self.c_obj.get()) + return (quantile, (quantile_cast._quantiles, quantile_cast._interpolation)) + elif self.kind() is Kind.ARGMAX: + return (argmax, ()) + elif self.kind() is Kind.ARGMIN: + return (argmin, ()) + elif self.kind() is Kind.NUNIQUE: + nunique_cast = dynamic_cast[nunique_ptr](self.c_obj.get()) + return (nunique, (nunique_cast._null_handling,)) + elif self.kind() is Kind.NTH_ELEMENT: + nth_element_cast = dynamic_cast[nth_element_ptr](self.c_obj.get()) + return (nth_element, (nth_element_cast._n, nth_element_cast._null_handling)) + elif self.kind() is Kind.EWMA: + ewma_cast = dynamic_cast[ewma_ptr](self.c_obj.get()) + return (ewma, (ewma_cast.center_of_mass, ewma_cast.history)) + elif self.kind() is Kind.RANK: + rank_cast = dynamic_cast[rank_ptr](self.c_obj.get()) + return ( + rank, ( + rank_cast._method, + rank_cast._column_order, + rank_cast._null_handling, + rank_cast._null_precedence, + rank_cast._percentage + ) + ) + elif self.kind() is Kind.COLLECT_LIST: + collect_list_cast = dynamic_cast[collect_list_ptr](self.c_obj.get()) + return (collect_list, (collect_list_cast._null_handling, )) + elif self.kind() is Kind.COLLECT_SET: + collect_set_cast = dynamic_cast[collect_set_ptr](self.c_obj.get()) + return ( + collect_set, ( + collect_set_cast._null_handling, + collect_set_cast._nulls_equal, + collect_set_cast._nans_equal + ) + ) + elif self.kind() in (Kind.CUDA, Kind.PTX): + udf_cast = dynamic_cast[udf_ptr](self.c_obj.get()) + return ( + udf, ( + udf_cast._source.decode("utf-8"), + DataType.from_libcudf(udf_cast._output_type) + ) + ) + elif self.kind() is Kind.CORRELATION: + correlation_cast = dynamic_cast[correlation_ptr](self.c_obj.get()) + return ( + correlation, ( + correlation_cast._type, + correlation_cast._min_periods + ) + ) + elif self.kind() is Kind.COVARIANCE: + covariance_cast = dynamic_cast[covariance_ptr](self.c_obj.get()) + return (covariance, (covariance_cast._min_periods, covariance_cast._ddof)) + else: + raise ValueError("Unsupported kind") + # TODO: Ideally we would include the return type here, but we need to do so # in a way that Sphinx understands (currently have issues due to # https://github.com/cython/cython/issues/5609). diff --git a/python/pylibcudf/pylibcudf/libcudf/aggregation.pxd b/python/pylibcudf/pylibcudf/libcudf/aggregation.pxd index 52d1e572ff3..f102ab4d464 100644 --- a/python/pylibcudf/pylibcudf/libcudf/aggregation.pxd +++ b/python/pylibcudf/pylibcudf/libcudf/aggregation.pxd @@ -48,6 +48,7 @@ cdef extern from "cudf/aggregation.hpp" namespace "cudf" nogil: CUDA CORRELATION COVARIANCE + EWMA cdef cppclass aggregation: Kind kind @@ -70,6 +71,9 @@ cdef extern from "cudf/aggregation.hpp" namespace "cudf" nogil: cdef cppclass scan_aggregation(aggregation): pass + cdef cppclass segmented_reduce_aggregation(aggregation): + pass + cpdef enum class udf_type(bool): CUDA PTX @@ -170,3 +174,66 @@ cdef extern from "cudf/aggregation.hpp" namespace "cudf" nogil: null_policy null_handling, null_order null_precedence, rank_percentage percentage) except +libcudf_exception_handler + +cdef extern from "cudf/detail/aggregation/aggregation.hpp" \ + namespace "cudf::detail" nogil: + + cdef cppclass std_var_aggregation( + rolling_aggregation, + groupby_aggregation, + reduce_aggregation, + segmented_reduce_aggregation + ): + size_type _ddof + + cdef cppclass quantile_aggregation(groupby_aggregation, reduce_aggregation): + vector[double] _quantiles + interpolation _interpolation + + cdef cppclass nunique_aggregation( + groupby_aggregation, reduce_aggregation, segmented_reduce_aggregation + ): + null_policy _null_handling + + cdef cppclass nth_element_aggregation( + groupby_aggregation, reduce_aggregation, rolling_aggregation + ): + size_type _n + null_policy _null_handling + + cdef cppclass ewma_aggregation(scan_aggregation): + double center_of_mass + ewm_history history + + cdef cppclass rank_aggregation( + rolling_aggregation, groupby_scan_aggregation, reduce_aggregation + ): + rank_method _method + order _column_order + null_policy _null_handling + null_order _null_precedence + rank_percentage _percentage + + cdef cppclass collect_list_aggregation( + rolling_aggregation, groupby_aggregation, reduce_aggregation + ): + null_policy _null_handling + + cdef cppclass collect_set_aggregation( + rolling_aggregation, groupby_aggregation, reduce_aggregation + ): + null_policy _null_handling + null_equality _nulls_equal + nan_equality _nans_equal + + cdef cppclass udf_aggregation(rolling_aggregation): + string _source + data_type _output_type + + cdef cppclass correlation_aggregation(groupby_aggregation): + correlation_type _type + size_type _min_periods + + cdef cppclass covariance_aggregation(groupby_aggregation): + size_type _min_periods + size_type _ddof diff --git a/python/pylibcudf/pylibcudf/tests/test_serialize.py b/python/pylibcudf/pylibcudf/tests/test_serialize.py index aaee8ff84d5..930c0f4261e 100644 --- a/python/pylibcudf/pylibcudf/tests/test_serialize.py +++ b/python/pylibcudf/pylibcudf/tests/test_serialize.py @@ -1,9 +1,51 @@ # Copyright (c) 2024, NVIDIA CORPORATION. +import itertools import pickle import pytest from pylibcudf import DataType +from pylibcudf.aggregation import ( + Aggregation, + EWMHistory, + all as agg_all, + any as agg_any, + argmax, + argmin, + collect_list, + collect_set, + correlation, + count, + covariance, + ewma, + max as agg_max, + mean, + median, + min as agg_min, + nth_element, + nunique, + product as product, + quantile, + rank, + std, + sum as agg_sum, + sum_of_squares, + udf, + variance, +) +from pylibcudf.libcudf.aggregation import ( + correlation_type, + rank_method, + rank_percentage, +) +from pylibcudf.libcudf.types import ( + interpolation, + nan_equality, + null_equality, + null_order, + null_policy, + order, +) from pylibcudf.types import TypeId @@ -17,12 +59,158 @@ def dtype(request): return DataType(tid, scale) -def test_reduce(dtype): +def test_datatype_reduce(dtype): (typ, (tid, scale)) = dtype.__reduce__() assert typ is DataType assert tid == dtype.id() assert scale == dtype.scale() -def test_pickle(dtype): +def test_datatype_pickle(dtype): assert dtype == pickle.loads(pickle.dumps(dtype)) + + +null_handling_choices = [ + {"null_handling": null_policy.EXCLUDE}, + {"null_handling": null_policy.INCLUDE}, +] +ddof_choices = [ + {"ddof": 1}, + {"ddof": 5}, +] +interpolation_choices = [ + {"interp": interpolation.LINEAR}, + {"interp": interpolation.LOWER}, + {"interp": interpolation.HIGHER}, + {"interp": interpolation.MIDPOINT}, + {"interp": interpolation.NEAREST}, +] +center_of_mass_choices = [ + {"center_of_mass": 1.0}, + {"center_of_mass": 12.34}, +] +ewh_history_choices = [ + {"history": EWMHistory.FINITE}, + {"history": EWMHistory.INFINITE}, +] +ewma_kwargs_choices = [ + d1 | d2 + for d1, d2 in itertools.product( + *[center_of_mass_choices, ewh_history_choices] + ) +] +column_order_choices = [ + {"column_order": order.ASCENDING}, + {"column_order": order.DESCENDING}, +] +null_precedence_choices = [ + {"null_precedence": null_order.AFTER}, + {"null_precedence": null_order.BEFORE}, +] +percentage_choices = [ + {"percentage": rank_percentage.NONE}, + {"percentage": rank_percentage.ZERO_NORMALIZED}, + {"percentage": rank_percentage.ONE_NORMALIZED}, +] +rank_method_choices = [ + [rank_method.FIRST], + [rank_method.AVERAGE], + [rank_method.MIN], + [rank_method.MAX], + [rank_method.DENSE], +] +rank_kwargs_choices = [ + d1 | d2 | d3 | d4 + for d1, d2, d3, d4 in itertools.product( + *[ + column_order_choices, + null_handling_choices, + null_precedence_choices, + percentage_choices, + ] + ) +] +nulls_equal_choices = [ + {"nulls_equal": null_equality.EQUAL}, + {"nulls_equal": null_equality.UNEQUAL}, +] +nans_equal_choices = [ + {"nans_equal": nan_equality.ALL_EQUAL}, + {"nans_equal": nan_equality.UNEQUAL}, +] +collect_set_kwargs_choices = [ + d1 | d2 | d3 + for d1, d2, d3 in itertools.product( + *[null_handling_choices, nulls_equal_choices, nans_equal_choices] + ) +] +# count_choices = itertools.product([count], [[]], [{}, *null_handling_choices]) +# print(count_choices) + + +@pytest.fixture( + params=[ + (agg_sum, [], {}), + (agg_min, [], {}), + (agg_max, [], {}), + (product, [], {}), + *itertools.product([count], [[]], [{}, *null_handling_choices]), + (agg_any, [], {}), + (agg_all, [], {}), + (sum_of_squares, [], {}), + (mean, [], {}), + *itertools.product([variance], [[]], [{}, *ddof_choices]), + *itertools.product([std], [[]], [{}, *ddof_choices]), + (median, [], {}), + *itertools.product( + [quantile], + [[[0.1, 0.9]], [[0.25, 0.5, 0.75]]], + [{}, *interpolation_choices], + ), + (argmax, [], {}), + (argmin, [], {}), + *itertools.product([nunique], [[]], [{}, *null_handling_choices]), + *itertools.product( + [nth_element], [[0], [5]], [{}, *null_handling_choices] + ), + *itertools.product([ewma], [[]], ewma_kwargs_choices), + *itertools.product([rank], rank_method_choices, rank_kwargs_choices), + *itertools.product([collect_list], [[]], [{}, *null_handling_choices]), + *itertools.product( + [collect_set], [[]], [{}, *collect_set_kwargs_choices] + ), + *itertools.product( + [udf], + itertools.product(["x = 1"], [DataType(tid, 0) for tid in TypeId]), + [{}], + ), + *itertools.product( + [correlation], + itertools.product( + [ + correlation_type.PEARSON, + correlation_type.KENDALL, + correlation_type.SPEARMAN, + ], + [2, 3], + ), + [{}], + ), + *itertools.product( + [covariance], itertools.product([2, 3], [0, 5]), [{}] + ), + ] +) +def aggregation(request): + function, args, kwargs = request.param + return function(*args, **kwargs) + + +def test_aggregation_reduce(request, aggregation): + (agg, args) = aggregation.__reduce__() + assert type(aggregation) is Aggregation + assert agg(*args).kind() == aggregation.kind() + + +def test_aggregation_pickle(aggregation): + assert aggregation == pickle.loads(pickle.dumps(aggregation))