diff --git a/cpp/include/cudf/aggregation.hpp b/cpp/include/cudf/aggregation.hpp index f5f514d26d9..7bce858c168 100644 --- a/cpp/include/cudf/aggregation.hpp +++ b/cpp/include/cudf/aggregation.hpp @@ -16,11 +16,20 @@ #pragma once +#include #include #include +#include +#include + +#include #include #include +#include +#include +#include +#include #include /** @@ -70,6 +79,11 @@ enum class rank_percentage : int32_t { ONE_NORMALIZED ///< (rank - 1) / (count - 1) }; +/** + * @brief Enum to describe scan operation type. + */ +enum class scan_type : bool { INCLUSIVE, EXCLUSIVE }; + /** * @brief Abstract base class for specifying the desired aggregation in an * `aggregation_request`. @@ -110,8 +124,9 @@ class aggregation { COLLECT_SET, ///< collect values into a list without duplicate entries LEAD, ///< window function, accesses row at specified offset following current row LAG, ///< window function, accesses row at specified offset preceding current row - PTX, ///< PTX UDF based reduction - CUDA, ///< CUDA UDF based reduction + PTX, ///< PTX based UDF aggregation + CUDA, ///< CUDA based UDF aggregation + HOST_UDF, ///< host based UDF aggregation MERGE_LISTS, ///< merge multiple lists values into one list MERGE_SETS, ///< merge multiple lists values into one list then drop duplicate entries MERGE_M2, ///< merge partial values of M2 aggregation, @@ -120,7 +135,7 @@ class aggregation { TDIGEST, ///< create a tdigest from a set of input values MERGE_TDIGEST, ///< create a tdigest by merging multiple tdigests together HISTOGRAM, ///< compute frequency of each element - MERGE_HISTOGRAM ///< merge partial values of HISTOGRAM aggregation, + MERGE_HISTOGRAM ///< merge partial values of HISTOGRAM aggregation }; aggregation() = delete; @@ -599,6 +614,271 @@ std::unique_ptr make_udf_aggregation(udf_type type, std::string const& user_defined_aggregator, data_type output_type); +/** + * @brief The interface for host-based UDF implementation. 
+ * + * An implementation of host-based UDF needs to be derived from this base class, defining + * its own version of the required functions. + */ +struct host_udf_base { + host_udf_base() = default; + virtual ~host_udf_base() = default; + + /** + * @brief Define the possible data needed for reduction. + */ + enum class reduction_data_attribute : int32_t { + INPUT_VALUES, ///< The input values column + OUTPUT_DTYPE, ///< Data type for the output result + INIT_VALUE ///< Initial value + }; + + /** + * @brief Define the possible data needed for segmented reduction. + */ + enum class segmented_reduction_data_attribute : int32_t { + INPUT_VALUES, ///< The input values column + OUTPUT_DTYPE, ///< Data type for the output result + INIT_VALUE, ///< Initial value + NULL_POLICY, ///< To control null handling + OFFSETS ///< The offsets defining segments + }; + + /** + * @brief Define the possible data needed for groupby aggregations. + * + * Note that only sort-based groupby aggregations are supported. + */ + enum class groupby_data_attribute : int32_t { + INPUT_VALUES, ///< The input values column + GROUPED_VALUES, ///< The input values grouped according to the input `keys` for which the + ///< values within each group maintain their original order + SORTED_GROUPED_VALUES, ///< The input values grouped according to the input `keys` and + ///< sorted within each group + GROUP_OFFSETS, ///< The offsets separating groups + GROUP_LABELS ///< Group labels (which is also the same as group indices) + }; + + /** + * @brief Describe possible data that may be needed in the derived class for its operations. + * + * Such data can be either intermediate data such as sorted values or group labels etc, or the + * results of other aggregations. + * + * Each derived host-based UDF class may need a different set of data. It is inefficient to + * evaluate and pass down all these possible data at once from libcudf. 
A solution for that is, + * the derived class can define a subset of data that it needs and libcudf will evaluate + * and pass down only data requested from that set. + */ + struct data_attribute { + /** + * @brief Hold all possible data types for the input of the derived class. + */ + using value_type = std::variant>; + value_type value; ///< The actual data attribute, wrapped by this struct + ///< as a wrapper is needed to define hash and equal_to functors. + + data_attribute() = default; ///< Default constructor + data_attribute(data_attribute&&) = default; ///< Move constructor + + /** + * @brief Construct a new data attribute from an aggregation attribute. + * @param value_ An aggregation attribute + */ + template || + std::is_same_v || + std::is_same_v)> + data_attribute(T value_) : value{value_} + { + } + + /** + * @brief Construct a new data attribute from another aggregation request. + * @param value_ An aggregation request + */ + template || + std::is_same_v)> + data_attribute(std::unique_ptr value_) : value{std::move(value_)} + { + if constexpr (std::is_same_v) { + CUDF_EXPECTS( + dynamic_cast(std::get>(value).get()) != nullptr, + "Requesting results from other aggregations is only supported in groupby " + "aggregations."); + } + CUDF_EXPECTS(std::get>(value) != nullptr, + "Invalid aggregation request."); + } + + /** + * @brief Copy constructor. + * @param other The other data attribute to copy from + */ + data_attribute(data_attribute const& other) + : value{std::visit( + cudf::detail::visitor_overload{ + [](auto const& val) { return value_type{val}; }, + [](std::unique_ptr const& val) { return value_type{val->clone()}; }}, + other.value)} + { + } + + /** + * @brief Hash functor for `data_attribute`. + */ + struct hash { + /** + * @brief Compute the hash value of a data attribute. 
+ * @param attr The data attribute to hash + * @return The hash value of the data attribute + */ + std::size_t operator()(data_attribute const& attr) const + { + auto const& value = attr.value; + auto const hash_value = + std::visit(cudf::detail::visitor_overload{ + [](auto const& val) { return std::hash{}(static_cast(val)); }, + [](std::unique_ptr const& val) { return val->do_hash(); }}, + value); + return std::hash{}(value.index()) ^ hash_value; + } + }; // struct hash + + /** + * @brief Equality comparison functor for `data_attribute`. + */ + struct equal_to { + /** + * @brief Check if two data attributes are equal. + * @param lhs The left-hand side data attribute + * @param rhs The right-hand side data attribute + * @return True if the two data attributes are equal + */ + bool operator()(data_attribute const& lhs, data_attribute const& rhs) const + { + auto const& lhs_val = lhs.value; + auto const& rhs_val = rhs.value; + if (lhs_val.index() != rhs_val.index()) { return false; } + return std::visit(cudf::detail::visitor_overload{ + [](auto const& lhs_val, auto const& rhs_val) { + if constexpr (std::is_same_v) { + return lhs_val == rhs_val; + } + return false; + }, + [](std::unique_ptr const& lhs_val, + std::unique_ptr const& rhs_val) { + return lhs_val->is_equal(*rhs_val); + }}, + lhs_val, + rhs_val); + } + }; // struct equal_to + }; // struct data_attribute + + /** + * @brief Set of attributes for the input data that is needed for computing the aggregation. + */ + using input_data_attributes = + std::unordered_set; + + /** + * @brief Return a set of attributes for the data that is needed for computing the aggregation. + * + * If this function is not overridden, all the data attributes (except results from other + * aggregations in groupby) are assumed to be needed. + * + * @return A set of `data_attribute`. 
+ */ + [[nodiscard]] virtual input_data_attributes get_required_data() const { return {}; } + + /** + * @brief Hold all possible types of the data that is passed to the derived class for executing + * the aggregation. + */ + using input_data_type = std::variant>, + null_policy, + device_span>; + + /** + * @brief Input to the aggregation, mapping from each data attribute to its actual data. + */ + using host_udf_input = std:: + unordered_map; + + /** + * @brief Output type of the aggregation. It can be either a scalar (for reduction) or a column + * (for segmented reduction or groupby aggregations). + */ + using output_type = std::variant, std::unique_ptr>; + + /** + * @brief Get the output when the input values column is empty. + * + * This is called in libcudf when the input values column is empty. In such situations libcudf + * tries to generate the output directly without unnecessarily evaluating the intermediate data. + * + * @param output_dtype The expected output data type for reduction (if specified) + * @param stream The CUDA stream to use for any kernel launches + * @param mr Device memory resource to use for any allocations + * @return The output result of the aggregation when input values is empty + */ + [[nodiscard]] virtual output_type get_empty_output(std::optional output_dtype, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) const = 0; + + /** + * @brief Perform the main computation for the host-based UDF. + * + * @param input The input data needed for performing all computation + * @param stream The CUDA stream to use for any kernel launches + * @param mr Device memory resource to use for any allocations + * @return The output result of the aggregation + */ + [[nodiscard]] virtual output_type operator()(host_udf_input const& input, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) const = 0; + + /** + * @brief Computes hash value of the derived class's instance. 
+ * @return The hash value of the instance + */ + [[nodiscard]] virtual std::size_t do_hash() const = 0; + + /** + * @brief Compares two instances of the derived class for equality. + * @param other The other derived class's instance to compare with + * @return True if the two instances are equal + */ + [[nodiscard]] virtual bool is_equal(host_udf_base const& other) const = 0; + + /** + * @brief Clones the instance. + * + * A class derived from `host_udf_base` should not store too much data such that its instances + * remain lightweight for efficient cloning. + * + * @return A new instance cloned from this + */ + [[nodiscard]] virtual std::unique_ptr clone() const = 0; +}; + +/** + * @brief Factory to create a HOST_UDF aggregation + * + * @param host_udf An instance of a class derived from `host_udf_base` to perform aggregation + * @return A HOST_UDF aggregation object + */ +template +std::unique_ptr make_host_udf_aggregation(std::unique_ptr host_udf); + /** * @brief Factory to create a MERGE_LISTS aggregation. 
* diff --git a/cpp/include/cudf/detail/aggregation/aggregation.hpp b/cpp/include/cudf/detail/aggregation/aggregation.hpp index 6661a461b8b..f9de5c455b0 100644 --- a/cpp/include/cudf/detail/aggregation/aggregation.hpp +++ b/cpp/include/cudf/detail/aggregation/aggregation.hpp @@ -20,6 +20,7 @@ #include #include #include +#include #include #include @@ -88,6 +89,8 @@ class simple_aggregations_collector { // Declares the interface for the simple class lead_lag_aggregation const& agg); virtual std::vector> visit(data_type col_type, class udf_aggregation const& agg); + virtual std::vector> visit(data_type col_type, + class host_udf_aggregation const& agg); virtual std::vector> visit(data_type col_type, class merge_lists_aggregation const& agg); virtual std::vector> visit(data_type col_type, @@ -135,6 +138,7 @@ class aggregation_finalizer { // Declares the interface for the finalizer virtual void visit(class collect_set_aggregation const& agg); virtual void visit(class lead_lag_aggregation const& agg); virtual void visit(class udf_aggregation const& agg); + virtual void visit(class host_udf_aggregation const& agg); virtual void visit(class merge_lists_aggregation const& agg); virtual void visit(class merge_sets_aggregation const& agg); virtual void visit(class merge_m2_aggregation const& agg); @@ -960,6 +964,49 @@ class udf_aggregation final : public rolling_aggregation { } }; +/** + * @brief Derived class for specifying a custom aggregation specified in host-based UDF. 
+ */ +class host_udf_aggregation final : public groupby_aggregation, + public reduce_aggregation, + public segmented_reduce_aggregation { + public: + std::unique_ptr udf_ptr; + + host_udf_aggregation() = delete; + host_udf_aggregation(host_udf_aggregation const&) = delete; + + explicit host_udf_aggregation(std::unique_ptr udf_ptr_) + : aggregation{HOST_UDF}, udf_ptr{std::move(udf_ptr_)} + { + CUDF_EXPECTS(udf_ptr != nullptr, "Invalid host-based UDF instance."); + } + + [[nodiscard]] bool is_equal(aggregation const& _other) const override + { + if (!this->aggregation::is_equal(_other)) { return false; } + auto const& other = dynamic_cast(_other); + return udf_ptr->is_equal(*other.udf_ptr); + } + + [[nodiscard]] size_t do_hash() const override + { + return this->aggregation::do_hash() ^ udf_ptr->do_hash(); + } + + [[nodiscard]] std::unique_ptr clone() const override + { + return std::make_unique(udf_ptr->clone()); + } + + std::vector> get_simple_aggregations( + data_type col_type, simple_aggregations_collector& collector) const override + { + return collector.visit(col_type, *this); + } + void finalize(aggregation_finalizer& finalizer) const override { finalizer.visit(*this); } +}; + /** * @brief Derived aggregation class for specifying MERGE_LISTS aggregation */ @@ -1462,6 +1509,12 @@ struct target_type_impl +struct target_type_impl { + // Just a placeholder. The actual return type is unknown. 
+ using type = struct_view; +}; + /** * @brief Helper alias to get the accumulator type for performing aggregation * `k` on elements of type `Source` @@ -1579,6 +1632,8 @@ CUDF_HOST_DEVICE inline decltype(auto) aggregation_dispatcher(aggregation::Kind return f.template operator()(std::forward(args)...); case aggregation::EWMA: return f.template operator()(std::forward(args)...); + case aggregation::HOST_UDF: + return f.template operator()(std::forward(args)...); default: { #ifndef __CUDA_ARCH__ CUDF_FAIL("Unsupported aggregation."); diff --git a/cpp/include/cudf/reduction.hpp b/cpp/include/cudf/reduction.hpp index 41be2e70cc3..d72f85d7e28 100644 --- a/cpp/include/cudf/reduction.hpp +++ b/cpp/include/cudf/reduction.hpp @@ -30,11 +30,6 @@ namespace CUDF_EXPORT cudf { * @file */ -/** - * @brief Enum to describe scan operation type - */ -enum class scan_type : bool { INCLUSIVE, EXCLUSIVE }; - /** * @brief Computes the reduction of the values in all rows of a column. * diff --git a/cpp/src/aggregation/aggregation.cpp b/cpp/src/aggregation/aggregation.cpp index a60a7f63882..e70724ab2f0 100644 --- a/cpp/src/aggregation/aggregation.cpp +++ b/cpp/src/aggregation/aggregation.cpp @@ -237,6 +237,12 @@ std::vector> simple_aggregations_collector::visit( return visit(col_type, static_cast(agg)); } +std::vector> simple_aggregations_collector::visit( + data_type col_type, host_udf_aggregation const& agg) +{ + return visit(col_type, static_cast(agg)); +} + // aggregation_finalizer ---------------------------------------- void aggregation_finalizer::visit(aggregation const& agg) {} @@ -410,6 +416,11 @@ void aggregation_finalizer::visit(merge_tdigest_aggregation const& agg) visit(static_cast(agg)); } +void aggregation_finalizer::visit(host_udf_aggregation const& agg) +{ + visit(static_cast(agg)); +} + } // namespace detail std::vector> aggregation::get_simple_aggregations( @@ -917,6 +928,20 @@ make_merge_tdigest_aggregation(int max_centroids); template CUDF_EXPORT std::unique_ptr 
make_merge_tdigest_aggregation(int max_centroids); +template +std::unique_ptr make_host_udf_aggregation(std::unique_ptr udf_ptr_) +{ + return std::make_unique(std::move(udf_ptr_)); +} +template CUDF_EXPORT std::unique_ptr make_host_udf_aggregation( + std::unique_ptr); +template CUDF_EXPORT std::unique_ptr + make_host_udf_aggregation(std::unique_ptr); +template CUDF_EXPORT std::unique_ptr + make_host_udf_aggregation(std::unique_ptr); +template CUDF_EXPORT std::unique_ptr + make_host_udf_aggregation(std::unique_ptr); + namespace detail { namespace { struct target_type_functor { diff --git a/cpp/src/groupby/groupby.cu b/cpp/src/groupby/groupby.cu index c42038026e5..904626b1cce 100644 --- a/cpp/src/groupby/groupby.cu +++ b/cpp/src/groupby/groupby.cu @@ -99,6 +99,8 @@ namespace { struct empty_column_constructor { column_view values; aggregation const& agg; + rmm::cuda_stream_view stream; + rmm::device_async_resource_ref mr; template std::unique_ptr operator()() const @@ -140,31 +142,41 @@ struct empty_column_constructor { return empty_like(values); } + if constexpr (k == aggregation::Kind::HOST_UDF) { + auto const& udf_ptr = dynamic_cast(agg).udf_ptr; + return std::get>(udf_ptr->get_empty_output(std::nullopt, stream, mr)); + } + return make_empty_column(target_type(values.type(), k)); } }; /// Make an empty table with appropriate types for requested aggs template -auto empty_results(host_span requests) +auto empty_results(host_span requests, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) { std::vector empty_results; - std::transform( - requests.begin(), requests.end(), std::back_inserter(empty_results), [](auto const& request) { - std::vector> results; - - std::transform( - request.aggregations.begin(), - request.aggregations.end(), - std::back_inserter(results), - [&request](auto const& agg) { - return cudf::detail::dispatch_type_and_aggregation( - request.values.type(), agg->kind, empty_column_constructor{request.values, *agg}); - }); - - return 
aggregation_result{std::move(results)}; - }); + std::transform(requests.begin(), + requests.end(), + std::back_inserter(empty_results), + [stream, mr](auto const& request) { + std::vector> results; + + std::transform(request.aggregations.begin(), + request.aggregations.end(), + std::back_inserter(results), + [&request, stream, mr](auto const& agg) { + return cudf::detail::dispatch_type_and_aggregation( + request.values.type(), + agg->kind, + empty_column_constructor{request.values, *agg, stream, mr}); + }); + + return aggregation_result{std::move(results)}; + }); return empty_results; } @@ -206,7 +218,7 @@ std::pair, std::vector> groupby::aggr verify_valid_requests(requests); - if (_keys.num_rows() == 0) { return {empty_like(_keys), empty_results(requests)}; } + if (_keys.num_rows() == 0) { return {empty_like(_keys), empty_results(requests, stream, mr)}; } return dispatch_aggregation(requests, stream, mr); } @@ -226,7 +238,9 @@ std::pair, std::vector> groupby::scan verify_valid_requests(requests); - if (_keys.num_rows() == 0) { return std::pair(empty_like(_keys), empty_results(requests)); } + if (_keys.num_rows() == 0) { + return std::pair(empty_like(_keys), empty_results(requests, cudf::get_default_stream(), mr)); + } return sort_scan(requests, stream, mr); } diff --git a/cpp/src/groupby/sort/aggregate.cpp b/cpp/src/groupby/sort/aggregate.cpp index 7a8a1883ed4..767edfd1ba9 100644 --- a/cpp/src/groupby/sort/aggregate.cpp +++ b/cpp/src/groupby/sort/aggregate.cpp @@ -795,6 +795,60 @@ void aggregate_result_functor::operator()(aggregatio mr)); } +template <> +void aggregate_result_functor::operator()(aggregation const& agg) +{ + if (cache.has_result(values, agg)) { return; } + + auto const& udf_ptr = dynamic_cast(agg).udf_ptr; + auto data_attrs = udf_ptr->get_required_data(); + if (data_attrs.empty()) { // empty means everything + data_attrs = {host_udf_base::groupby_data_attribute::INPUT_VALUES, + host_udf_base::groupby_data_attribute::GROUPED_VALUES, + 
host_udf_base::groupby_data_attribute::SORTED_GROUPED_VALUES, + host_udf_base::groupby_data_attribute::GROUP_OFFSETS, + host_udf_base::groupby_data_attribute::GROUP_LABELS}; + } + + // Do not cache udf_input, as the actual input data may change from run to run. + host_udf_base::host_udf_input udf_input; + for (auto const& attr : data_attrs) { + CUDF_EXPECTS(std::holds_alternative(attr.value) || + std::holds_alternative>(attr.value), + "Invalid input data attribute for HOST_UDF groupby aggregation."); + if (std::holds_alternative(attr.value)) { + switch (std::get(attr.value)) { + case host_udf_base::groupby_data_attribute::INPUT_VALUES: + udf_input.emplace(attr, values); + break; + case host_udf_base::groupby_data_attribute::GROUPED_VALUES: + udf_input.emplace(attr, get_grouped_values()); + break; + case host_udf_base::groupby_data_attribute::SORTED_GROUPED_VALUES: + udf_input.emplace(attr, get_sorted_values()); + break; + case host_udf_base::groupby_data_attribute::GROUP_OFFSETS: + udf_input.emplace(attr, helper.group_offsets(stream)); + break; + case host_udf_base::groupby_data_attribute::GROUP_LABELS: + udf_input.emplace(attr, helper.group_labels(stream)); + break; + default: CUDF_UNREACHABLE("Invalid input data attribute for HOST_UDF groupby aggregation."); + } + } else { // data is result from another aggregation + auto other_agg = std::get>(attr.value)->clone(); + cudf::detail::aggregation_dispatcher(other_agg->kind, *this, *other_agg); + auto result = cache.get_result(values, *other_agg); + udf_input.emplace(std::move(other_agg), std::move(result)); + } + } + + auto output = (*udf_ptr)(udf_input, stream, mr); + CUDF_EXPECTS(std::holds_alternative>(output), + "Invalid output type from HOST_UDF groupby aggregation."); + cache.add_result(values, agg, std::get>(std::move(output))); +} + } // namespace detail // Sort-based groupby diff --git a/cpp/src/reductions/reductions.cpp b/cpp/src/reductions/reductions.cpp index 75ebc078930..e7f5ca19d2c 100644 --- 
a/cpp/src/reductions/reductions.cpp +++ b/cpp/src/reductions/reductions.cpp @@ -144,6 +144,38 @@ struct reduce_dispatch_functor { auto td_agg = static_cast(agg); return tdigest::detail::reduce_merge_tdigest(col, td_agg.max_centroids, stream, mr); } + case aggregation::HOST_UDF: { + auto const& udf_ptr = dynamic_cast(agg).udf_ptr; + auto data_attrs = udf_ptr->get_required_data(); + if (data_attrs.empty()) { // empty means everything + data_attrs = {host_udf_base::reduction_data_attribute::INPUT_VALUES, + host_udf_base::reduction_data_attribute::OUTPUT_DTYPE, + host_udf_base::reduction_data_attribute::INIT_VALUE}; + } + + // Do not cache udf_input, as the actual input data may change from run to run. + host_udf_base::host_udf_input udf_input; + for (auto const& attr : data_attrs) { + CUDF_EXPECTS(std::holds_alternative(attr.value), + "Invalid input data attribute for HOST_UDF reduction."); + switch (std::get(attr.value)) { + case host_udf_base::reduction_data_attribute::INPUT_VALUES: + udf_input.emplace(attr, col); + break; + case host_udf_base::reduction_data_attribute::OUTPUT_DTYPE: + udf_input.emplace(attr, output_dtype); + break; + case host_udf_base::reduction_data_attribute::INIT_VALUE: + udf_input.emplace(attr, init); + break; + default: CUDF_UNREACHABLE("Invalid input data attribute for HOST_UDF reduction."); + } + } + auto output = (*udf_ptr)(udf_input, stream, mr); + CUDF_EXPECTS(std::holds_alternative>(output), + "Invalid output type from HOST_UDF reduction."); + return std::get>(std::move(output)); + } // case aggregation::HOST_UDF default: CUDF_FAIL("Unsupported reduction operator"); } } @@ -161,9 +193,11 @@ std::unique_ptr reduce(column_view const& col, cudf::data_type_error); if (init.has_value() && !(agg.kind == aggregation::SUM || agg.kind == aggregation::PRODUCT || agg.kind == aggregation::MIN || agg.kind == aggregation::MAX || - agg.kind == aggregation::ANY || agg.kind == aggregation::ALL)) { + agg.kind == aggregation::ANY || agg.kind == 
aggregation::ALL || + agg.kind == aggregation::HOST_UDF)) { CUDF_FAIL( - "Initial value is only supported for SUM, PRODUCT, MIN, MAX, ANY, and ALL aggregation types"); + "Initial value is only supported for SUM, PRODUCT, MIN, MAX, ANY, ALL, and HOST_UDF " + "aggregation types"); } // Returns default scalar if input column is empty or all null diff --git a/cpp/src/reductions/segmented/reductions.cpp b/cpp/src/reductions/segmented/reductions.cpp index 1c3a2b0c0f3..cde3faa9b47 100644 --- a/cpp/src/reductions/segmented/reductions.cpp +++ b/cpp/src/reductions/segmented/reductions.cpp @@ -98,6 +98,48 @@ struct segmented_reduce_dispatch_functor { } case segmented_reduce_aggregation::NUNIQUE: return segmented_nunique(col, offsets, null_handling, stream, mr); + case aggregation::HOST_UDF: { + auto const& udf_ptr = dynamic_cast(agg).udf_ptr; + auto data_attrs = udf_ptr->get_required_data(); + if (data_attrs.empty()) { // empty means everything + data_attrs = {host_udf_base::segmented_reduction_data_attribute::INPUT_VALUES, + host_udf_base::segmented_reduction_data_attribute::OUTPUT_DTYPE, + host_udf_base::segmented_reduction_data_attribute::INIT_VALUE, + host_udf_base::segmented_reduction_data_attribute::NULL_POLICY, + host_udf_base::segmented_reduction_data_attribute::OFFSETS}; + } + + // Do not cache udf_input, as the actual input data may change from run to run. 
+ host_udf_base::host_udf_input udf_input; + for (auto const& attr : data_attrs) { + CUDF_EXPECTS( + std::holds_alternative(attr.value), + "Invalid input data attribute for HOST_UDF segmented reduction."); + switch (std::get(attr.value)) { + case host_udf_base::segmented_reduction_data_attribute::INPUT_VALUES: + udf_input.emplace(attr, col); + break; + case host_udf_base::segmented_reduction_data_attribute::OUTPUT_DTYPE: + udf_input.emplace(attr, output_dtype); + break; + case host_udf_base::segmented_reduction_data_attribute::INIT_VALUE: + udf_input.emplace(attr, init); + break; + case host_udf_base::segmented_reduction_data_attribute::NULL_POLICY: + udf_input.emplace(attr, null_handling); + break; + case host_udf_base::segmented_reduction_data_attribute::OFFSETS: + udf_input.emplace(attr, offsets); + break; + default: + CUDF_UNREACHABLE("Invalid input data attribute for HOST_UDF segmented reduction."); + } + } + auto output = (*udf_ptr)(udf_input, stream, mr); + CUDF_EXPECTS(std::holds_alternative>(output), + "Invalid output type from HOST_UDF segmented reduction."); + return std::get>(std::move(output)); + } // case aggregation::HOST_UDF default: CUDF_FAIL("Unsupported aggregation type."); } } @@ -117,9 +159,11 @@ std::unique_ptr segmented_reduce(column_view const& segmented_values, cudf::data_type_error); if (init.has_value() && !(agg.kind == aggregation::SUM || agg.kind == aggregation::PRODUCT || agg.kind == aggregation::MIN || agg.kind == aggregation::MAX || - agg.kind == aggregation::ANY || agg.kind == aggregation::ALL)) { + agg.kind == aggregation::ANY || agg.kind == aggregation::ALL || + agg.kind == aggregation::HOST_UDF)) { CUDF_FAIL( - "Initial value is only supported for SUM, PRODUCT, MIN, MAX, ANY, and ALL aggregation types"); + "Initial value is only supported for SUM, PRODUCT, MIN, MAX, ANY, ALL, and HOST_UDF " + "aggregation types"); } if (segmented_values.is_empty() && offsets.empty()) { diff --git a/cpp/tests/CMakeLists.txt 
b/cpp/tests/CMakeLists.txt index adf512811cc..e5c29314203 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -132,6 +132,8 @@ ConfigureTest( groupby/groupby_test_util.cpp groupby/groups_tests.cpp groupby/histogram_tests.cpp + groupby/host_udf_example_tests.cu + groupby/host_udf_tests.cpp groupby/keys_tests.cpp groupby/lists_tests.cpp groupby/m2_tests.cpp diff --git a/cpp/tests/groupby/host_udf_example_tests.cu b/cpp/tests/groupby/host_udf_example_tests.cu new file mode 100644 index 00000000000..729f1b77756 --- /dev/null +++ b/cpp/tests/groupby/host_udf_example_tests.cu @@ -0,0 +1,571 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include +#include +#include +#include + +namespace { +/** + * @brief A host-based UDF implementation. + * + * The aggregations perform the following computation: + * - For reduction: compute `sum(value^2, for value in group)` (this is sum of squared). + * - For segmented reduction: compute `segment_size * sum(value^2, for value in group)`. + * - For groupby: compute `(group_idx + 1) * group_sum_of_squares - group_max * group_sum`. + * + * In addition, for segmented reduction, if null_policy is set to `INCLUDE`, the null values are + * replaced with an initial value if it is provided. 
+ */ +template +struct test_udf_simple_type : cudf::host_udf_base { + static_assert(std::is_same_v || + std::is_same_v || + std::is_same_v); + + test_udf_simple_type() = default; + + [[nodiscard]] input_data_attributes get_required_data() const override + { + if constexpr (std::is_same_v || + std::is_same_v) { + // Empty set, which means we need everything. + return {}; + } else { + // We need grouped values, group offsets, group labels, and also results from groups' + // MAX and SUM aggregations. + return {groupby_data_attribute::GROUPED_VALUES, + groupby_data_attribute::GROUP_OFFSETS, + groupby_data_attribute::GROUP_LABELS, + cudf::make_max_aggregation(), + cudf::make_sum_aggregation()}; + } + } + + [[nodiscard]] output_type get_empty_output( + [[maybe_unused]] std::optional output_dtype, + [[maybe_unused]] rmm::cuda_stream_view stream, + [[maybe_unused]] rmm::device_async_resource_ref mr) const override + { + if constexpr (std::is_same_v) { + CUDF_EXPECTS(output_dtype.has_value(), + "Data type for the reduction result must be specified."); + return cudf::make_default_constructed_scalar(output_dtype.value(), stream, mr); + } else if constexpr (std::is_same_v) { + CUDF_EXPECTS(output_dtype.has_value(), + "Data type for the reduction result must be specified."); + return cudf::make_empty_column(output_dtype.value()); + } else { + return cudf::make_empty_column( + cudf::data_type{cudf::type_to_id()}); + } + } + + [[nodiscard]] output_type operator()(host_udf_input const& input, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) const override + { + if constexpr (std::is_same_v) { + auto const& values = + std::get(input.at(reduction_data_attribute::INPUT_VALUES)); + auto const output_dtype = + std::get(input.at(reduction_data_attribute::OUTPUT_DTYPE)); + return cudf::double_type_dispatcher( + values.type(), output_dtype, reduce_fn{this}, input, stream, mr); + } else if constexpr (std::is_same_v) { + auto const& values = + 
std::get(input.at(segmented_reduction_data_attribute::INPUT_VALUES)); + auto const output_dtype = + std::get(input.at(segmented_reduction_data_attribute::OUTPUT_DTYPE)); + return cudf::double_type_dispatcher( + values.type(), output_dtype, segmented_reduce_fn{this}, input, stream, mr); + } else { + auto const& values = + std::get(input.at(groupby_data_attribute::GROUPED_VALUES)); + return cudf::type_dispatcher(values.type(), groupby_fn{this}, input, stream, mr); + } + } + + [[nodiscard]] std::size_t do_hash() const override + { + // Just return the same hash for all instances of this class. + return std::size_t{12345}; + } + + [[nodiscard]] bool is_equal(host_udf_base const& other) const override + { + // Just check if the other object is also instance of this class. + return dynamic_cast(&other) != nullptr; + } + + [[nodiscard]] std::unique_ptr clone() const override + { + return std::make_unique(); + } + + // For quick compilation, we only instantiate a few input/output types. + template + static constexpr bool is_valid_input_t() + { + return std::is_same_v; + } + + // For quick compilation, we only instantiate a few input/output types. + template + static constexpr bool is_valid_output_t() + { + return std::is_same_v; + } + + struct reduce_fn { + // Store pointer to the parent class so we can call its functions. + test_udf_simple_type const* parent; + + template () || !is_valid_output_t())> + output_type operator()(Args...) 
const + { + CUDF_FAIL("Unsupported input/output type."); + } + + template () && is_valid_output_t())> + output_type operator()(host_udf_input const& input, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) const + { + auto const& values = + std::get(input.at(reduction_data_attribute::INPUT_VALUES)); + auto const output_dtype = + std::get(input.at(reduction_data_attribute::OUTPUT_DTYPE)); + if (values.size() == 0) { return parent->get_empty_output(output_dtype, stream, mr); } + + auto const input_init_value = + std::get>>( + input.at(reduction_data_attribute::INIT_VALUE)); + auto const init_value = [&]() -> InputType { + if (input_init_value.has_value() && input_init_value.value().get().is_valid(stream)) { + auto const numeric_init_scalar = + dynamic_cast const*>(&input_init_value.value().get()); + CUDF_EXPECTS(numeric_init_scalar != nullptr, "Invalid init scalar for reduction."); + return numeric_init_scalar->value(stream); + } + return InputType{0}; + }(); + + auto const values_dv_ptr = cudf::column_device_view::create(values, stream); + auto const result = + thrust::transform_reduce(rmm::exec_policy(stream), + thrust::make_counting_iterator(0), + thrust::make_counting_iterator(values.size()), + transform_fn{*values_dv_ptr}, + static_cast(init_value), + thrust::plus<>{}); + + auto output = cudf::make_numeric_scalar(output_dtype, stream, mr); + static_cast*>(output.get())->set_value(result, stream); + return output; + } + + template + struct transform_fn { + cudf::column_device_view values; + OutputType __device__ operator()(cudf::size_type idx) const + { + if (values.is_null(idx)) { return OutputType{0}; } + auto const val = static_cast(values.element(idx)); + return val * val; + } + }; + }; + + struct segmented_reduce_fn { + // Store pointer to the parent class so we can call its functions. + test_udf_simple_type const* parent; + + template () || !is_valid_output_t())> + output_type operator()(Args...) 
const + { + CUDF_FAIL("Unsupported input/output type."); + } + + template () && is_valid_output_t())> + output_type operator()(host_udf_input const& input, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) const + { + auto const& values = + std::get(input.at(segmented_reduction_data_attribute::INPUT_VALUES)); + auto const output_dtype = + std::get(input.at(segmented_reduction_data_attribute::OUTPUT_DTYPE)); + auto const offsets = std::get>( + input.at(segmented_reduction_data_attribute::OFFSETS)); + CUDF_EXPECTS(offsets.size() > 0, "Invalid offsets."); + auto const num_segments = static_cast(offsets.size()) - 1; + + if (values.size() == 0) { + if (num_segments <= 0) { return parent->get_empty_output(output_dtype, stream, mr); } + return cudf::make_numeric_column( + output_dtype, num_segments, cudf::mask_state::ALL_NULL, stream, mr); + } + + auto const input_init_value = + std::get>>( + input.at(segmented_reduction_data_attribute::INIT_VALUE)); + auto const init_value = [&]() -> InputType { + if (input_init_value.has_value() && input_init_value.value().get().is_valid(stream)) { + auto const numeric_init_scalar = + dynamic_cast const*>(&input_init_value.value().get()); + CUDF_EXPECTS(numeric_init_scalar != nullptr, "Invalid init scalar for reduction."); + return numeric_init_scalar->value(stream); + } + return InputType{0}; + }(); + + auto const null_handling = + std::get(input.at(segmented_reduction_data_attribute::NULL_POLICY)); + auto const values_dv_ptr = cudf::column_device_view::create(values, stream); + auto output = cudf::make_numeric_column( + output_dtype, num_segments, cudf::mask_state::UNALLOCATED, stream); + rmm::device_uvector validity(num_segments, stream); + + thrust::transform( + rmm::exec_policy(stream), + thrust::make_counting_iterator(0), + thrust::make_counting_iterator(num_segments), + thrust::make_zip_iterator(output->mutable_view().begin(), validity.begin()), + transform_fn{ + *values_dv_ptr, offsets, 
static_cast(init_value), null_handling}); + auto [null_mask, null_count] = + cudf::detail::valid_if(validity.begin(), validity.end(), thrust::identity<>{}, stream, mr); + if (null_count > 0) { output->set_null_mask(std::move(null_mask), null_count); } + return output; + } + + template + struct transform_fn { + cudf::column_device_view values; + cudf::device_span offsets; + OutputType init_value; + cudf::null_policy null_handling; + + thrust::tuple __device__ operator()(cudf::size_type idx) const + { + auto const start = offsets[idx]; + auto const end = offsets[idx + 1]; + if (start == end) { return {OutputType{0}, false}; } + + auto sum = init_value; + for (auto i = start; i < end; ++i) { + if (values.is_null(i)) { + if (null_handling == cudf::null_policy::INCLUDE) { sum += init_value * init_value; } + continue; + } + auto const val = static_cast(values.element(i)); + sum += val * val; + } + auto const segment_size = end - start; + return {static_cast(segment_size) * sum, true}; + } + }; + }; + + struct groupby_fn { + // Store pointer to the parent class so we can call its functions. + test_udf_simple_type const* parent; + using OutputType = double; + template + using MaxType = cudf::detail::target_type_t; + template + using SumType = cudf::detail::target_type_t; + + template ())> + output_type operator()(Args...) 
const + { + CUDF_FAIL("Unsupported input/output type."); + } + + template ())> + output_type operator()(host_udf_input const& input, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) const + { + auto const& values = + std::get(input.at(groupby_data_attribute::GROUPED_VALUES)); + if (values.size() == 0) { return parent->get_empty_output(std::nullopt, stream, mr); } + + auto const offsets = std::get>( + input.at(groupby_data_attribute::GROUP_OFFSETS)); + CUDF_EXPECTS(offsets.size() > 0, "Invalid offsets."); + auto const num_groups = static_cast(offsets.size()) - 1; + auto const group_indices = std::get>( + input.at(groupby_data_attribute::GROUP_LABELS)); + auto const group_max = std::get( + input.at(cudf::make_max_aggregation())); + auto const group_sum = std::get( + input.at(cudf::make_sum_aggregation())); + + auto const values_dv_ptr = cudf::column_device_view::create(values, stream); + auto output = cudf::make_numeric_column(cudf::data_type{cudf::type_to_id()}, + num_groups, + cudf::mask_state::UNALLOCATED, + stream); + rmm::device_uvector validity(num_groups, stream); + + thrust::transform( + rmm::exec_policy(stream), + thrust::make_counting_iterator(0), + thrust::make_counting_iterator(num_groups), + thrust::make_zip_iterator(output->mutable_view().begin(), validity.begin()), + transform_fn{*values_dv_ptr, + offsets, + group_indices, + group_max.begin>(), + group_sum.begin>()}); + auto [null_mask, null_count] = + cudf::detail::valid_if(validity.begin(), validity.end(), thrust::identity<>{}, stream, mr); + if (null_count > 0) { output->set_null_mask(std::move(null_mask), null_count); } + return output; + } + + template + struct transform_fn { + cudf::column_device_view values; + cudf::device_span offsets; + cudf::device_span group_indices; + MaxType const* group_max; + SumType const* group_sum; + + thrust::tuple __device__ operator()(cudf::size_type idx) const + { + auto const start = offsets[idx]; + auto const end = offsets[idx + 1]; + if 
(start == end) { return {OutputType{0}, false}; } + + auto sum_sqr = OutputType{0}; + bool has_valid{false}; + for (auto i = start; i < end; ++i) { + if (values.is_null(i)) { continue; } + has_valid = true; + auto const val = static_cast(values.element(i)); + sum_sqr += val * val; + } + + if (!has_valid) { return {OutputType{0}, false}; } + return {static_cast(group_indices[start] + 1) * sum_sqr - + static_cast(group_max[idx]) * static_cast(group_sum[idx]), + true}; + } + }; + }; +}; + +} // namespace + +using doubles_col = cudf::test::fixed_width_column_wrapper; +using int32s_col = cudf::test::fixed_width_column_wrapper; +using int64s_col = cudf::test::fixed_width_column_wrapper; + +struct HostUDFExampleTest : cudf::test::BaseFixture {}; + +TEST_F(HostUDFExampleTest, ReductionSimpleInput) +{ + auto const vals = doubles_col{0.0, 1.0, 2.0, 3.0, 4.0, 5.0}; + auto const agg = cudf::make_host_udf_aggregation( + std::make_unique>()); + auto const reduced = cudf::reduce(vals, + *agg, + cudf::data_type{cudf::type_id::INT64}, + cudf::get_default_stream(), + cudf::get_current_device_resource_ref()); + EXPECT_TRUE(reduced->is_valid()); + EXPECT_EQ(cudf::type_id::INT64, reduced->type().id()); + auto const result = + static_cast*>(reduced.get())->value(cudf::get_default_stream()); + auto constexpr expected = 55; // 0^2 + 1^2 + 2^2 + 3^2 + 4^2 + 5^2 = 55 + EXPECT_EQ(expected, result); +} + +TEST_F(HostUDFExampleTest, ReductionEmptyInput) +{ + auto const vals = doubles_col{}; + auto const agg = cudf::make_host_udf_aggregation( + std::make_unique>()); + auto const reduced = cudf::reduce(vals, + *agg, + cudf::data_type{cudf::type_id::INT64}, + cudf::get_default_stream(), + cudf::get_current_device_resource_ref()); + EXPECT_FALSE(reduced->is_valid()); + EXPECT_EQ(cudf::type_id::INT64, reduced->type().id()); +} + +TEST_F(HostUDFExampleTest, SegmentedReductionSimpleInput) +{ + double constexpr null = 0.0; + auto const vals = doubles_col{{0.0, null, 2.0, 3.0, null, 5.0, null, null, 
8.0, 9.0}, + {true, false, true, true, false, true, false, false, true, true}}; + auto const offsets = int32s_col{0, 3, 5, 10}.release(); + auto const agg = cudf::make_host_udf_aggregation( + std::make_unique>()); + + // Test without init value. + { + auto const result = cudf::segmented_reduce( + vals, + cudf::device_span(offsets->view().begin(), offsets->size()), + *agg, + cudf::data_type{cudf::type_id::INT64}, + cudf::null_policy::INCLUDE, + std::nullopt, // init value + cudf::get_default_stream(), + cudf::get_current_device_resource_ref()); + + // When null_policy is set to `INCLUDE`, the null values are replaced with the init value. + // Since init value is not given, it is set to 0. + // [ 3 * (0^2 + init^2 + 2^2), 2 * (3^2 + init^2), 5 * (5^2 + init^2 + init^2 + 8^2 + 9^2) ] + auto const expected = int64s_col{12, 18, 850}; + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected, *result); + } + + // Test with init value, and include nulls. + { + auto const init_scalar = cudf::make_fixed_width_scalar(3.0); + auto const result = cudf::segmented_reduce( + vals, + cudf::device_span(offsets->view().begin(), offsets->size()), + *agg, + cudf::data_type{cudf::type_id::INT64}, + cudf::null_policy::INCLUDE, + *init_scalar, + cudf::get_default_stream(), + cudf::get_current_device_resource_ref()); + + // When null_policy is set to `INCLUDE`, the null values are replaced with the init value. + // [ 3 * (3 + 0^2 + 3^2 + 2^2), 2 * (3 + 3^2 + 3^2), 5 * (3 + 5^2 + 3^2 + 3^2 + 8^2 + 9^2) ] + auto const expected = int64s_col{48, 42, 955}; + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected, *result); + } + + // Test with init value, and exclude nulls. 
+ { + auto const init_scalar = cudf::make_fixed_width_scalar(3.0); + auto const result = cudf::segmented_reduce( + vals, + cudf::device_span(offsets->view().begin(), offsets->size()), + *agg, + cudf::data_type{cudf::type_id::INT64}, + cudf::null_policy::EXCLUDE, + *init_scalar, + cudf::get_default_stream(), + cudf::get_current_device_resource_ref()); + + // [ 3 * (3 + 0^2 + 2^2), 2 * (3 + 3^2), 5 * (3 + 5^2 + 8^2 + 9^2) ] + auto const expected = int64s_col{21, 24, 865}; + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected, *result); + } +} + +TEST_F(HostUDFExampleTest, SegmentedReductionEmptySegments) +{ + auto const vals = doubles_col{}; + auto const offsets = int32s_col{0, 0, 0, 0}.release(); + auto const agg = cudf::make_host_udf_aggregation( + std::make_unique>()); + auto const result = cudf::segmented_reduce( + vals, + cudf::device_span(offsets->view().begin(), offsets->size()), + *agg, + cudf::data_type{cudf::type_id::INT64}, + cudf::null_policy::INCLUDE, + std::nullopt, // init value + cudf::get_default_stream(), + cudf::get_current_device_resource_ref()); + auto const expected = int64s_col{{0, 0, 0}, {false, false, false}}; + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected, *result); +} + +TEST_F(HostUDFExampleTest, SegmentedReductionEmptyInput) +{ + auto const vals = doubles_col{}; + auto const offsets = int32s_col{}.release(); + auto const agg = cudf::make_host_udf_aggregation( + std::make_unique>()); + auto const result = cudf::segmented_reduce( + vals, + cudf::device_span(offsets->view().begin(), offsets->size()), + *agg, + cudf::data_type{cudf::type_id::INT64}, + cudf::null_policy::INCLUDE, + std::nullopt, // init value + cudf::get_default_stream(), + cudf::get_current_device_resource_ref()); + auto const expected = int64s_col{}; + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected, *result); +} + +TEST_F(HostUDFExampleTest, GroupbySimpleInput) +{ + double constexpr null = 0.0; + auto const keys = int32s_col{0, 1, 2, 0, 1, 2, 0, 1, 2, 0}; + auto const vals = doubles_col{{0.0, null, 
2.0, 3.0, null, 5.0, null, null, 8.0, 9.0}, + {true, false, true, true, false, true, false, false, true, true}}; + auto agg = cudf::make_host_udf_aggregation( + std::make_unique>()); + + std::vector requests; + requests.emplace_back(); + requests[0].values = vals; + requests[0].aggregations.push_back(std::move(agg)); + cudf::groupby::groupby gb_obj( + cudf::table_view({keys}), cudf::null_policy::INCLUDE, cudf::sorted::NO, {}, {}); + + auto const grp_result = gb_obj.aggregate(requests, cudf::test::get_default_stream()); + auto const& result = grp_result.second[0].results[0]; + + // Output type of groupby is double. + // Values grouped by keys: [ {0, 3, null, 9}, {null, null, null}, {2, 5, 8} ] + // Group sum_sqr: [ 90, null, 93 ] + // Group max: [ 9, null, 8 ] + // Group sum: [ 12, null, 15 ] + // Output: [ 1 * 90 - 9 * 12, null, 3 * 93 - 8 * 15 ] + auto const expected = doubles_col{{-18.0, null, 159.0}, {true, false, true}}; + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected, *result); +} diff --git a/cpp/tests/groupby/host_udf_tests.cpp b/cpp/tests/groupby/host_udf_tests.cpp new file mode 100644 index 00000000000..f3c2ce42931 --- /dev/null +++ b/cpp/tests/groupby/host_udf_tests.cpp @@ -0,0 +1,362 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include +#include +#include +#include + +#include +#include + +namespace { +/** + * @brief A host-based UDF implementation used for unit tests. 
+ */ +template +struct host_udf_test : cudf::host_udf_base { + static_assert(std::is_same_v || + std::is_same_v || + std::is_same_v); + + bool* const test_run; // to check if the test is accidentally skipped + input_data_attributes const input_attrs; + host_udf_test(bool* test_run_, input_data_attributes input_attrs_ = {}) + : test_run{test_run_}, input_attrs(std::move(input_attrs_)) + { + } + + [[nodiscard]] input_data_attributes get_required_data() const override { return input_attrs; } + + // This is the main testing function, which checks for the correctness of input data. + // The rests are just to satisfy the interface. + [[nodiscard]] output_type operator()(host_udf_input const& input, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) const override + { + SCOPED_TRACE("Original line of failure: " + std::to_string(test_location_line)); + + input_data_attributes check_attrs = input_attrs; + if constexpr (std::is_same_v) { + if (check_attrs.empty()) { + check_attrs = input_data_attributes{reduction_data_attribute::INPUT_VALUES, + reduction_data_attribute::OUTPUT_DTYPE, + reduction_data_attribute::INIT_VALUE}; + } + EXPECT_EQ(input.size(), check_attrs.size()); + for (auto const& attr : check_attrs) { + EXPECT_TRUE(input.count(attr) > 0); + EXPECT_TRUE(std::holds_alternative(attr.value)); + switch (std::get(attr.value)) { + case reduction_data_attribute::INPUT_VALUES: + EXPECT_TRUE(std::holds_alternative(input.at(attr))); + break; + case reduction_data_attribute::OUTPUT_DTYPE: + EXPECT_TRUE(std::holds_alternative(input.at(attr))); + break; + case reduction_data_attribute::INIT_VALUE: + EXPECT_TRUE( + std::holds_alternative>>( + input.at(attr))); + break; + default:; + } + } + } else if constexpr (std::is_same_v) { + if (check_attrs.empty()) { + check_attrs = input_data_attributes{segmented_reduction_data_attribute::INPUT_VALUES, + segmented_reduction_data_attribute::OUTPUT_DTYPE, + segmented_reduction_data_attribute::INIT_VALUE, + 
segmented_reduction_data_attribute::NULL_POLICY, + segmented_reduction_data_attribute::OFFSETS}; + } + EXPECT_EQ(input.size(), check_attrs.size()); + for (auto const& attr : check_attrs) { + EXPECT_TRUE(input.count(attr) > 0); + EXPECT_TRUE(std::holds_alternative(attr.value)); + switch (std::get(attr.value)) { + case segmented_reduction_data_attribute::INPUT_VALUES: + EXPECT_TRUE(std::holds_alternative(input.at(attr))); + break; + case segmented_reduction_data_attribute::OUTPUT_DTYPE: + EXPECT_TRUE(std::holds_alternative(input.at(attr))); + break; + case segmented_reduction_data_attribute::INIT_VALUE: + EXPECT_TRUE( + std::holds_alternative>>( + input.at(attr))); + break; + case segmented_reduction_data_attribute::NULL_POLICY: + EXPECT_TRUE(std::holds_alternative(input.at(attr))); + break; + case segmented_reduction_data_attribute::OFFSETS: + EXPECT_TRUE( + std::holds_alternative>(input.at(attr))); + break; + default:; + } + } + } else { + if (check_attrs.empty()) { + check_attrs = input_data_attributes{groupby_data_attribute::INPUT_VALUES, + groupby_data_attribute::GROUPED_VALUES, + groupby_data_attribute::SORTED_GROUPED_VALUES, + groupby_data_attribute::GROUP_OFFSETS, + groupby_data_attribute::GROUP_LABELS}; + } + EXPECT_EQ(input.size(), check_attrs.size()); + for (auto const& attr : check_attrs) { + EXPECT_TRUE(input.count(attr) > 0); + EXPECT_TRUE(std::holds_alternative(attr.value) || + std::holds_alternative>(attr.value)); + if (std::holds_alternative(attr.value)) { + switch (std::get(attr.value)) { + case groupby_data_attribute::INPUT_VALUES: + EXPECT_TRUE(std::holds_alternative(input.at(attr))); + break; + case groupby_data_attribute::GROUPED_VALUES: + EXPECT_TRUE(std::holds_alternative(input.at(attr))); + break; + case groupby_data_attribute::SORTED_GROUPED_VALUES: + EXPECT_TRUE(std::holds_alternative(input.at(attr))); + break; + case groupby_data_attribute::GROUP_OFFSETS: + EXPECT_TRUE( + std::holds_alternative>(input.at(attr))); + break; + case 
groupby_data_attribute::GROUP_LABELS: + EXPECT_TRUE( + std::holds_alternative>(input.at(attr))); + break; + default:; + } + } else { // std::holds_alternative>(attr.value) + EXPECT_TRUE(std::holds_alternative(input.at(attr))); + } + } + } + + *test_run = true; // test is run successfully + return get_empty_output(std::nullopt, stream, mr); + } + + [[nodiscard]] output_type get_empty_output( + [[maybe_unused]] std::optional output_dtype, + [[maybe_unused]] rmm::cuda_stream_view stream, + [[maybe_unused]] rmm::device_async_resource_ref mr) const override + { + if constexpr (std::is_same_v) { + return cudf::make_fixed_width_scalar(0, stream, mr); + } else if constexpr (std::is_same_v) { + return cudf::make_empty_column(cudf::data_type{cudf::type_id::INT32}); + } else { + return cudf::make_empty_column(cudf::data_type{cudf::type_id::INT32}); + } + } + + [[nodiscard]] std::size_t do_hash() const override { return 0; } + [[nodiscard]] bool is_equal(host_udf_base const& other) const override { return true; } + [[nodiscard]] std::unique_ptr clone() const override + { + return std::make_unique(test_run, input_attrs); + } +}; + +/** + * @brief Get a random subset of input data attributes. + */ +cudf::host_udf_base::input_data_attributes get_subset( + cudf::host_udf_base::input_data_attributes const& attrs) +{ + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution size_distr(1, attrs.size() - 1); + auto const subset_size = size_distr(gen); + auto const elements = + std::vector(attrs.begin(), attrs.end()); + std::uniform_int_distribution idx_distr(0, attrs.size() - 1); + cudf::host_udf_base::input_data_attributes output; + while (output.size() < subset_size) { + output.insert(elements[idx_distr(gen)]); + } + return output; +} + +/** + * @brief Generate a random aggregation object from {min, max, sum, product}. 
+ */ +std::unique_ptr get_random_agg() +{ + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution distr(1, 4); + switch (distr(gen)) { + case 1: return cudf::make_min_aggregation(); + case 2: return cudf::make_max_aggregation(); + case 3: return cudf::make_sum_aggregation(); + case 4: return cudf::make_product_aggregation(); + default: CUDF_UNREACHABLE("This should not be reached."); + } + return nullptr; +} + +} // namespace + +using int32s_col = cudf::test::fixed_width_column_wrapper; + +// Number of randomly testing on the input data attributes. +// For each test, a subset of data attributes will be randomly generated from all the possible input +// data attributes. The input data corresponding to that subset passed from libcudf will be tested +// for correctness. +constexpr int NUM_RANDOM_TESTS = 10; + +struct HostUDFTest : cudf::test::BaseFixture {}; + +TEST_F(HostUDFTest, ReductionAllInput) +{ + bool test_run = false; + auto const vals = int32s_col{1, 2, 3}; + auto const agg = cudf::make_host_udf_aggregation( + std::make_unique>(&test_run)); + [[maybe_unused]] auto const reduced = cudf::reduce(vals, + *agg, + cudf::data_type{cudf::type_id::INT64}, + cudf::get_default_stream(), + cudf::get_current_device_resource_ref()); + EXPECT_TRUE(test_run); +} + +TEST_F(HostUDFTest, ReductionSomeInput) +{ + auto const vals = int32s_col{1, 2, 3}; + auto const all_attrs = cudf::host_udf_base::input_data_attributes{ + cudf::host_udf_base::reduction_data_attribute::INPUT_VALUES, + cudf::host_udf_base::reduction_data_attribute::OUTPUT_DTYPE, + cudf::host_udf_base::reduction_data_attribute::INIT_VALUE}; + for (int i = 0; i < NUM_RANDOM_TESTS; ++i) { + bool test_run = false; + auto input_attrs = get_subset(all_attrs); + auto const agg = cudf::make_host_udf_aggregation( + std::make_unique>(&test_run, + std::move(input_attrs))); + [[maybe_unused]] auto const reduced = cudf::reduce(vals, + *agg, + cudf::data_type{cudf::type_id::INT64}, + 
cudf::get_default_stream(), + cudf::get_current_device_resource_ref()); + EXPECT_TRUE(test_run); + } +} + +TEST_F(HostUDFTest, SegmentedReductionAllInput) +{ + bool test_run = false; + auto const vals = int32s_col{1, 2, 3}; + auto const offsets = int32s_col{0, 3, 5, 10}.release(); + auto const agg = cudf::make_host_udf_aggregation( + std::make_unique>(&test_run)); + [[maybe_unused]] auto const result = cudf::segmented_reduce( + vals, + cudf::device_span(offsets->view().begin(), offsets->size()), + *agg, + cudf::data_type{cudf::type_id::INT64}, + cudf::null_policy::INCLUDE, + std::nullopt, // init value + cudf::get_default_stream(), + cudf::get_current_device_resource_ref()); + EXPECT_TRUE(test_run); +} + +TEST_F(HostUDFTest, SegmentedReductionSomeInput) +{ + auto const vals = int32s_col{1, 2, 3}; + auto const all_attrs = cudf::host_udf_base::input_data_attributes{ + cudf::host_udf_base::segmented_reduction_data_attribute::INPUT_VALUES, + cudf::host_udf_base::segmented_reduction_data_attribute::OUTPUT_DTYPE, + cudf::host_udf_base::segmented_reduction_data_attribute::INIT_VALUE, + cudf::host_udf_base::segmented_reduction_data_attribute::NULL_POLICY, + cudf::host_udf_base::segmented_reduction_data_attribute::OFFSETS}; + auto const offsets = int32s_col{0, 3, 5, 10}.release(); + for (int i = 0; i < NUM_RANDOM_TESTS; ++i) { + bool test_run = false; + auto input_attrs = get_subset(all_attrs); + auto const agg = cudf::make_host_udf_aggregation( + std::make_unique>( + &test_run, std::move(input_attrs))); + [[maybe_unused]] auto const result = cudf::segmented_reduce( + vals, + cudf::device_span(offsets->view().begin(), offsets->size()), + *agg, + cudf::data_type{cudf::type_id::INT64}, + cudf::null_policy::INCLUDE, + std::nullopt, // init value + cudf::get_default_stream(), + cudf::get_current_device_resource_ref()); + EXPECT_TRUE(test_run); + } +} + +TEST_F(HostUDFTest, GroupbyAllInput) +{ + bool test_run = false; + auto const keys = int32s_col{0, 1, 2}; + auto const vals = 
int32s_col{0, 1, 2}; + auto agg = cudf::make_host_udf_aggregation( + std::make_unique>(&test_run)); + + std::vector requests; + requests.emplace_back(); + requests[0].values = vals; + requests[0].aggregations.push_back(std::move(agg)); + cudf::groupby::groupby gb_obj( + cudf::table_view({keys}), cudf::null_policy::INCLUDE, cudf::sorted::NO, {}, {}); + [[maybe_unused]] auto const grp_result = + gb_obj.aggregate(requests, cudf::test::get_default_stream()); + EXPECT_TRUE(test_run); +} + +TEST_F(HostUDFTest, GroupbySomeInput) +{ + auto const keys = int32s_col{0, 1, 2}; + auto const vals = int32s_col{0, 1, 2}; + auto const all_attrs = cudf::host_udf_base::input_data_attributes{ + cudf::host_udf_base::groupby_data_attribute::INPUT_VALUES, + cudf::host_udf_base::groupby_data_attribute::GROUPED_VALUES, + cudf::host_udf_base::groupby_data_attribute::SORTED_GROUPED_VALUES, + cudf::host_udf_base::groupby_data_attribute::GROUP_OFFSETS, + cudf::host_udf_base::groupby_data_attribute::GROUP_LABELS}; + for (int i = 0; i < NUM_RANDOM_TESTS; ++i) { + bool test_run = false; + auto input_attrs = get_subset(all_attrs); + input_attrs.insert(get_random_agg()); + auto agg = cudf::make_host_udf_aggregation( + std::make_unique>(&test_run, + std::move(input_attrs))); + + std::vector requests; + requests.emplace_back(); + requests[0].values = vals; + requests[0].aggregations.push_back(std::move(agg)); + cudf::groupby::groupby gb_obj( + cudf::table_view({keys}), cudf::null_policy::INCLUDE, cudf::sorted::NO, {}, {}); + [[maybe_unused]] auto const grp_result = + gb_obj.aggregate(requests, cudf::test::get_default_stream()); + EXPECT_TRUE(test_run); + } +} diff --git a/java/src/main/java/ai/rapids/cudf/Aggregation.java b/java/src/main/java/ai/rapids/cudf/Aggregation.java index 379750bb0b7..d3692ed2d5a 100644 --- a/java/src/main/java/ai/rapids/cudf/Aggregation.java +++ b/java/src/main/java/ai/rapids/cudf/Aggregation.java @@ -62,15 +62,16 @@ enum Kind { LAG(23), PTX(24), CUDA(25), - M2(26), - 
MERGE_M2(27), - RANK(28), - DENSE_RANK(29), - PERCENT_RANK(30), - TDIGEST(31), // This can take a delta argument for accuracy level - MERGE_TDIGEST(32), // This can take a delta argument for accuracy level - HISTOGRAM(33), - MERGE_HISTOGRAM(34); + HOST_UDF(26), + M2(27), + MERGE_M2(28), + RANK(29), + DENSE_RANK(30), + PERCENT_RANK(31), + TDIGEST(32), // This can take a delta argument for accuracy level + MERGE_TDIGEST(33), // This can take a delta argument for accuracy level + HISTOGRAM(34), + MERGE_HISTOGRAM(35); final int nativeId; @@ -385,6 +386,35 @@ public boolean equals(Object other) { } } + static final class HostUDFAggregation extends Aggregation { + HostUDFWrapper wrapper; + + private HostUDFAggregation(HostUDFWrapper wrapper) { + super(Kind.HOST_UDF); + this.wrapper = wrapper; + } + + @Override + long createNativeInstance() { + return Aggregation.createHostUDFAgg(wrapper.udfNativeHandle); + } + + @Override + public int hashCode() { + return 31 * kind.hashCode() ^ wrapper.hashCode(); + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } else if (other instanceof HostUDFAggregation) { + return wrapper.equals(((HostUDFAggregation) other).wrapper); + } + return false; + } + } + protected final Kind kind; protected Aggregation(Kind kind) { @@ -837,6 +867,15 @@ static MergeSetsAggregation mergeSets(NullEquality nullEquality, NaNEquality nan return new MergeSetsAggregation(nullEquality, nanEquality); } + /** + * Host UDF aggregation, to execute a host-side user-defined function (UDF). 
+ * @param TODO + * @return A new HostUDFAggregation instance + */ + static HostUDFAggregation hostUDF(HostUDFWrapper wrapper) { + return new HostUDFAggregation(wrapper); + } + static final class LeadAggregation extends LeadLagAggregation { private LeadAggregation(int offset, ColumnVector defaultOutput) { super(Kind.LEAD, offset, defaultOutput); @@ -990,4 +1029,9 @@ static MergeHistogramAggregation mergeHistogram() { * Create a TDigest aggregation. */ private static native long createTDigestAgg(int kind, int delta); + + /** + * Create a HOST_UDF aggregation. + */ + private static native long createHostUDFAgg(long udfNativeHandle); } diff --git a/java/src/main/java/ai/rapids/cudf/GroupByAggregation.java b/java/src/main/java/ai/rapids/cudf/GroupByAggregation.java index 0fae33927b6..f8b07336f62 100644 --- a/java/src/main/java/ai/rapids/cudf/GroupByAggregation.java +++ b/java/src/main/java/ai/rapids/cudf/GroupByAggregation.java @@ -277,6 +277,15 @@ public static GroupByAggregation mergeSets() { return new GroupByAggregation(Aggregation.mergeSets()); } + /** + * Execute an aggregation using a host-side user-defined function (UDF). + * @param TODO + * @return A new GroupByAggregation instance + */ + public static GroupByAggregation hostUDF(HostUDFWrapper wrapper) { + return new GroupByAggregation(Aggregation.hostUDF(wrapper)); + } + /** * Merge the partial sets produced by multiple CollectSetAggregations. * diff --git a/java/src/main/java/ai/rapids/cudf/HostUDFWrapper.java b/java/src/main/java/ai/rapids/cudf/HostUDFWrapper.java new file mode 100644 index 00000000000..0b6ecf2e140 --- /dev/null +++ b/java/src/main/java/ai/rapids/cudf/HostUDFWrapper.java @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ai.rapids.cudf; + +/** + * A wrapper around native host UDF aggregations. + *

+ * This class is used to store the native handle of a host UDF aggregation and is used as + * a proxy object to compute hash code and compare two host UDF aggregations for equality. + *

+ * A new host UDF aggregation implementation must extend this class and override the + * {@code hashCode} and {@code equals} methods for such purposes. + */ +public abstract class HostUDFWrapper { + public final long udfNativeHandle; + + public HostUDFWrapper(long udfNativeHandle) { + this.udfNativeHandle = udfNativeHandle; + } +} diff --git a/java/src/main/java/ai/rapids/cudf/ReductionAggregation.java b/java/src/main/java/ai/rapids/cudf/ReductionAggregation.java index ba8ae379bae..c49516697b6 100644 --- a/java/src/main/java/ai/rapids/cudf/ReductionAggregation.java +++ b/java/src/main/java/ai/rapids/cudf/ReductionAggregation.java @@ -161,14 +161,14 @@ public static ReductionAggregation median() { /** * Aggregate to compute the specified quantiles. Uses linear interpolation by default. */ - public static ReductionAggregation quantile(double ... quantiles) { + public static ReductionAggregation quantile(double... quantiles) { return new ReductionAggregation(Aggregation.quantile(quantiles)); } /** * Aggregate to compute various quantiles. */ - public static ReductionAggregation quantile(QuantileMethod method, double ... quantiles) { + public static ReductionAggregation quantile(QuantileMethod method, double... quantiles) { return new ReductionAggregation(Aggregation.quantile(method, quantiles)); } @@ -256,7 +256,7 @@ public static ReductionAggregation collectSet() { * @param nanEquality Flag to specify whether NaN values in floating point column should be considered equal. 
*/ public static ReductionAggregation collectSet(NullPolicy nullPolicy, - NullEquality nullEquality, NaNEquality nanEquality) { + NullEquality nullEquality, NaNEquality nanEquality) { return new ReductionAggregation(Aggregation.collectSet(nullPolicy, nullEquality, nanEquality)); } @@ -286,6 +286,15 @@ public static ReductionAggregation mergeSets(NullEquality nullEquality, NaNEqual return new ReductionAggregation(Aggregation.mergeSets(nullEquality, nanEquality)); } + /** + * Execute a reduction using a host-side user-defined function (UDF). + * @param wrapper The wrapper for the native host UDF instance. + * @return A new ReductionAggregation instance + */ + public static ReductionAggregation hostUDF(HostUDFWrapper wrapper) { + return new ReductionAggregation(Aggregation.hostUDF(wrapper)); + } + /** * Create HistogramAggregation, computing the frequencies for each unique row. * diff --git a/java/src/main/java/ai/rapids/cudf/SegmentedReductionAggregation.java b/java/src/main/java/ai/rapids/cudf/SegmentedReductionAggregation.java index 7ed150a2fec..c8c40c74e99 100644 --- a/java/src/main/java/ai/rapids/cudf/SegmentedReductionAggregation.java +++ b/java/src/main/java/ai/rapids/cudf/SegmentedReductionAggregation.java @@ -101,4 +101,13 @@ public static SegmentedReductionAggregation any() { public static SegmentedReductionAggregation all() { return new SegmentedReductionAggregation(Aggregation.all()); } + + /** + * Execute a segmented reduction using a host-side user-defined function (UDF). 
+ * @param wrapper The wrapper for the native host UDF instance. + * @return A new SegmentedReductionAggregation instance + */ + public static SegmentedReductionAggregation hostUDF(HostUDFWrapper wrapper) { + return new SegmentedReductionAggregation(Aggregation.hostUDF(wrapper)); + } } diff --git a/java/src/main/native/src/AggregationJni.cpp b/java/src/main/native/src/AggregationJni.cpp index c40f1c55500..86b44b7e628 100644 --- a/java/src/main/native/src/AggregationJni.cpp +++ b/java/src/main/native/src/AggregationJni.cpp @@ -80,25 +80,28 @@ JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_Aggregation_createNoParamAgg(JNIEnv* // case 23: LAG // case 24: PTX // case 25: CUDA - case 26: // M2 + // case 26: HOST_UDF + case 27: // M2 return cudf::make_m2_aggregation(); - case 27: // MERGE_M2 + case 28: // MERGE_M2 return cudf::make_merge_m2_aggregation(); - case 28: // RANK + case 29: // RANK return cudf::make_rank_aggregation( cudf::rank_method::MIN, {}, cudf::null_policy::INCLUDE); - case 29: // DENSE_RANK + case 30: // DENSE_RANK return cudf::make_rank_aggregation( cudf::rank_method::DENSE, {}, cudf::null_policy::INCLUDE); - case 30: // ANSI SQL PERCENT_RANK + case 31: // ANSI SQL PERCENT_RANK return cudf::make_rank_aggregation(cudf::rank_method::MIN, {}, cudf::null_policy::INCLUDE, {}, cudf::rank_percentage::ONE_NORMALIZED); - case 33: // HISTOGRAM + // case 32: TDIGEST + // case 33: MERGE_TDIGEST + case 34: // HISTOGRAM return cudf::make_histogram_aggregation(); - case 34: // MERGE_HISTOGRAM + case 35: // MERGE_HISTOGRAM return cudf::make_merge_histogram_aggregation(); default: throw std::logic_error("Unsupported No Parameter Aggregation Operation"); @@ -160,10 +163,10 @@ JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_Aggregation_createTDigestAgg(JNIEnv* std::unique_ptr ret; // These numbers come from Aggregation.java and must stay in sync switch (kind) { - case 31: // TDIGEST + case 32: // TDIGEST ret = cudf::make_tdigest_aggregation(delta); break; - case 32: // MERGE_TDIGEST + case 33: // MERGE_TDIGEST ret = 
cudf::make_merge_tdigest_aggregation(delta); break; default: throw std::logic_error("Unsupported TDigest Aggregation Operation"); @@ -296,4 +299,18 @@ JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_Aggregation_createMergeSetsAgg(JNIEn CATCH_STD(env, 0); } +JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_Aggregation_createHostUDFAgg(JNIEnv* env, + jclass class_object, + jlong udf_native_handle) +{ + JNI_NULL_CHECK(env, udf_native_handle, "udf_native_handle is null", 0); + try { + cudf::jni::auto_set_device(env); + auto const udf_ptr = reinterpret_cast(udf_native_handle); + auto output = cudf::make_host_udf_aggregation(udf_ptr->clone()); + return reinterpret_cast(output.release()); + } + CATCH_STD(env, 0); +} + } // extern "C"