From 4316ffbf79c8a51522b9464dde5e7e908c86d092 Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Fri, 13 Dec 2024 11:40:50 -0800 Subject: [PATCH 01/22] Implement `HOST_UDF` aggregation for groupby Signed-off-by: Nghia Truong --- cpp/include/cudf/aggregation.hpp | 254 ++++++++++++++++- .../cudf/detail/aggregation/aggregation.hpp | 53 ++++ cpp/src/aggregation/aggregation.cpp | 21 ++ cpp/src/groupby/groupby.cu | 56 ++-- cpp/src/groupby/sort/aggregate.cpp | 54 ++++ cpp/tests/CMakeLists.txt | 2 + cpp/tests/groupby/host_udf_example_tests.cu | 259 ++++++++++++++++++ cpp/tests/groupby/host_udf_tests.cpp | 209 ++++++++++++++ .../main/java/ai/rapids/cudf/Aggregation.java | 68 ++++- .../ai/rapids/cudf/GroupByAggregation.java | 9 + java/src/main/native/src/AggregationJni.cpp | 51 +++- 11 files changed, 995 insertions(+), 41 deletions(-) create mode 100644 cpp/tests/groupby/host_udf_example_tests.cu create mode 100644 cpp/tests/groupby/host_udf_tests.cpp diff --git a/cpp/include/cudf/aggregation.hpp b/cpp/include/cudf/aggregation.hpp index f5f514d26d9..3adb5cbd489 100644 --- a/cpp/include/cudf/aggregation.hpp +++ b/cpp/include/cudf/aggregation.hpp @@ -16,11 +16,20 @@ #pragma once +#include #include #include +#include +#include + +#include #include #include +#include +#include +#include +#include #include /** @@ -110,8 +119,9 @@ class aggregation { COLLECT_SET, ///< collect values into a list without duplicate entries LEAD, ///< window function, accesses row at specified offset following current row LAG, ///< window function, accesses row at specified offset preceding current row - PTX, ///< PTX UDF based reduction - CUDA, ///< CUDA UDF based reduction + PTX, ///< PTX based UDF aggregation + CUDA, ///< CUDA based UDF aggregation + HOST_UDF, ///< host based UDF aggregation MERGE_LISTS, ///< merge multiple lists values into one list MERGE_SETS, ///< merge multiple lists values into one list then drop duplicate entries MERGE_M2, ///< merge partial values of M2 aggregation, @@ -120,7 
+130,7 @@ class aggregation { TDIGEST, ///< create a tdigest from a set of input values MERGE_TDIGEST, ///< create a tdigest by merging multiple tdigests together HISTOGRAM, ///< compute frequency of each element - MERGE_HISTOGRAM ///< merge partial values of HISTOGRAM aggregation, + MERGE_HISTOGRAM ///< merge partial values of HISTOGRAM aggregation }; aggregation() = delete; @@ -599,6 +609,244 @@ std::unique_ptr make_udf_aggregation(udf_type type, std::string const& user_defined_aggregator, data_type output_type); +/** + * @brief The interface for host-based UDF implementation. + * + * An implementation of host-based UDF needs to be derived from this base class, defining + * its own version of the required functions. + */ +struct host_udf_base { + host_udf_base() = default; + virtual ~host_udf_base() = default; + + /** + * @brief Define the possible data needed for groupby aggregations. + * + * Note that only sort-based groupby aggregations are supported. + */ + enum class groupby_data_attribute : int32_t { + INPUT_VALUES, ///< The input values column. + GROUPED_VALUES, ///< The input values grouped according to the input `keys` for which the + ///< values within each group maintain their original order. + SORTED_GROUPED_VALUES, ///< The input values grouped according to the input `keys` and + ///< sorted within each group. + GROUP_OFFSETS, ///< The offsets separating groups. + GROUP_LABELS ///< Group labels (which is also the same as group indices). + }; + + /** + * @brief Describe possible data that may be needed in the derived class for its operations. + * + * Such data can be either intermediate data such as sorted values or group labels etc, or the + * results of other aggregations. + * + * Each derived host-based UDF class may need a different set of data. It is inefficient to + * evaluate and pass down all these possible data at once from libcudf. 
A solution for that is, + * the derived class can define a subset of data that it needs and libcudf will evaluate + * and pass down only data requested from that set. + */ + struct data_attribute { + /** + * @brief Hold all possible data types for the input of the aggregation in the derived class. + */ + using value_type = std::variant>; + value_type value; ///< The actual data attribute, wrapped by this struct + ///< as a wrapper is needed to define `hash` and `equal_to` functors. + + data_attribute() = default; ///< Default constructor + data_attribute(data_attribute&&) = default; ///< Move constructor + + /** + * @brief Construct a new data attribute from an aggregation attribute. + * @param value_ An aggregation attribute + */ + template )> + data_attribute(T value_) : value{value_} + { + } + + /** + * @brief Construct a new data attribute from another aggregation request. + * @param value_ An aggregation request + */ + template || + std::is_same_v)> + data_attribute(std::unique_ptr value_) : value{std::move(value_)} + { + if constexpr (std::is_same_v) { + CUDF_EXPECTS( + dynamic_cast(std::get>(value).get()) != nullptr, + "Requesting results from other aggregations is only supported in groupby " + "aggregations."); + } + CUDF_EXPECTS(std::get>(value) != nullptr, + "Invalid aggregation request."); + } + + /** + * @brief Copy constructor. + * @param other The other data attribute to copy from + */ + data_attribute(data_attribute const& other) + : value{std::visit( + cudf::detail::visitor_overload{ + [](auto const& val) { return value_type{val}; }, + [](std::unique_ptr const& val) { return value_type{val->clone()}; }}, + other.value)} + { + } + + /** + * @brief Hash functor for `data_attribute`. + */ + struct hash { + /** + * @brief Compute the hash value of a data attribute. 
+ * @param attr The data attribute to hash + * @return The hash value of the data attribute + */ + std::size_t operator()(data_attribute const& attr) const + { + auto const& value = attr.value; + auto const hash_value = + std::visit(cudf::detail::visitor_overload{ + [](auto const& val) { return std::hash{}(static_cast(val)); }, + [](std::unique_ptr const& val) { return val->do_hash(); }}, + value); + return std::hash{}(value.index()) ^ hash_value; + } + }; // struct hash + + /** + * @brief Equality comparison functor for `data_attribute`. + */ + struct equal_to { + /** + * @brief Check if two data attributes are equal. + * @param lhs The left-hand side data attribute + * @param rhs The right-hand side data attribute + * @return True if the two data attributes are equal + */ + bool operator()(data_attribute const& lhs, data_attribute const& rhs) const + { + auto const& lhs_val = lhs.value; + auto const& rhs_val = rhs.value; + if (lhs_val.index() != rhs_val.index()) { return false; } + return std::visit(cudf::detail::visitor_overload{ + [](auto const& lhs_val, auto const& rhs_val) { + if constexpr (std::is_same_v) { + return lhs_val == rhs_val; + } else { + return false; + } + }, + [](std::unique_ptr const& lhs_val, + std::unique_ptr const& rhs_val) { + return lhs_val->is_equal(*rhs_val); + }}, + lhs_val, + rhs_val); + } + }; // struct equal_to + }; // struct data_attribute + + /** + * @brief Set of attributes for the input data that is needed for computing the aggregation. + */ + using data_attributes_set_t = + std::unordered_set; + + /** + * @brief Return a set of attributes for the data that is needed for computing the aggregation. + * + * If this function is not overridden, an empty set is returned. That means all the data + * attributes (except results from other aggregations in groupby) will be needed. 
+ * + * @return A set of `data_attribute` + */ + [[nodiscard]] virtual data_attributes_set_t get_required_data() const { return {}; } + + /** + * @brief Hold all possible types of the data that is passed to the derived class for executing + * the aggregation. + */ + using input_data_t = std::variant>; + + /** + * @brief Input to the aggregation, mapping from each data attribute to its actual data. + */ + using host_udf_input = std:: + unordered_map; + + /** + * @brief Output type of the aggregation. + * + * Currently only a single type is supported as the output of the aggregation, but it will hold + * more types in the future when reduction is supported. + */ + using output_t = std::variant>; + + /** + * @brief Get the output when the input values column is empty. + * + * This is called in libcudf when the input values column is empty. In such situations libcudf + * tries to generate the output directly without unnecessarily evaluating the intermediate data. + * + * @param output_dtype The expected output data type + * @param stream The CUDA stream to use for any kernel launches + * @param mr Device memory resource to use for any allocations + * @return The output result of the aggregation when the input values column is empty + */ + [[nodiscard]] virtual output_t get_empty_output(std::optional output_dtype, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) const = 0; + + /** + * @brief Perform the main computation for the host-based UDF. + * + * @param input The input data needed for performing all computation + * @param stream The CUDA stream to use for any kernel launches + * @param mr Device memory resource to use for any allocations + * @return The output result of the aggregation + */ + [[nodiscard]] virtual output_t operator()(host_udf_input const& input, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) const = 0; + + /** + * @brief Computes hash value of the derived class's instance. 
+ * @return The hash value of the instance + */ + [[nodiscard]] virtual std::size_t do_hash() const = 0; + + /** + * @brief Compares two instances of the derived class for equality. + * @param other The other derived class's instance to compare with + * @return True if the two instances are equal + */ + [[nodiscard]] virtual bool is_equal(host_udf_base const& other) const = 0; + + /** + * @brief Clones the instance. + * + * A class derived from `host_udf_base` should not store too much data such that its instances + * remain lightweight for efficient cloning. + * + * @return A new instance cloned from this + */ + [[nodiscard]] virtual std::unique_ptr clone() const = 0; +}; + +/** + * @brief Factory to create a HOST_UDF aggregation + * + * @param host_udf An instance of a class derived from `host_udf_base` to perform aggregation + * @return A HOST_UDF aggregation object + */ +template +std::unique_ptr make_host_udf_aggregation(std::unique_ptr host_udf); + /** * @brief Factory to create a MERGE_LISTS aggregation. 
* diff --git a/cpp/include/cudf/detail/aggregation/aggregation.hpp b/cpp/include/cudf/detail/aggregation/aggregation.hpp index 6661a461b8b..df8f1a395d7 100644 --- a/cpp/include/cudf/detail/aggregation/aggregation.hpp +++ b/cpp/include/cudf/detail/aggregation/aggregation.hpp @@ -20,6 +20,7 @@ #include #include #include +#include #include #include @@ -88,6 +89,8 @@ class simple_aggregations_collector { // Declares the interface for the simple class lead_lag_aggregation const& agg); virtual std::vector> visit(data_type col_type, class udf_aggregation const& agg); + virtual std::vector> visit(data_type col_type, + class host_udf_aggregation const& agg); virtual std::vector> visit(data_type col_type, class merge_lists_aggregation const& agg); virtual std::vector> visit(data_type col_type, @@ -135,6 +138,7 @@ class aggregation_finalizer { // Declares the interface for the finalizer virtual void visit(class collect_set_aggregation const& agg); virtual void visit(class lead_lag_aggregation const& agg); virtual void visit(class udf_aggregation const& agg); + virtual void visit(class host_udf_aggregation const& agg); virtual void visit(class merge_lists_aggregation const& agg); virtual void visit(class merge_sets_aggregation const& agg); virtual void visit(class merge_m2_aggregation const& agg); @@ -960,6 +964,47 @@ class udf_aggregation final : public rolling_aggregation { } }; +/** + * @brief Derived class for specifying a custom aggregation implemented as a host-based UDF. 
+ */ +class host_udf_aggregation final : public groupby_aggregation { + public: + std::unique_ptr udf_ptr; + + host_udf_aggregation() = delete; + host_udf_aggregation(host_udf_aggregation const&) = delete; + + explicit host_udf_aggregation(std::unique_ptr udf_ptr_) + : aggregation{HOST_UDF}, udf_ptr{std::move(udf_ptr_)} + { + CUDF_EXPECTS(udf_ptr != nullptr, "Invalid host-based UDF instance."); + } + + [[nodiscard]] bool is_equal(aggregation const& _other) const override + { + if (!this->aggregation::is_equal(_other)) { return false; } + auto const& other = dynamic_cast(_other); + return udf_ptr->is_equal(*other.udf_ptr); + } + + [[nodiscard]] size_t do_hash() const override + { + return this->aggregation::do_hash() ^ udf_ptr->do_hash(); + } + + [[nodiscard]] std::unique_ptr clone() const override + { + return std::make_unique(udf_ptr->clone()); + } + + std::vector> get_simple_aggregations( + data_type col_type, simple_aggregations_collector& collector) const override + { + return collector.visit(col_type, *this); + } + void finalize(aggregation_finalizer& finalizer) const override { finalizer.visit(*this); } +}; + /** * @brief Derived aggregation class for specifying MERGE_LISTS aggregation */ @@ -1462,6 +1507,12 @@ struct target_type_impl +struct target_type_impl { + // Just a placeholder. The actual return type is unknown. 
+ using type = struct_view; +}; + /** * @brief Helper alias to get the accumulator type for performing aggregation * `k` on elements of type `Source` @@ -1579,6 +1630,8 @@ CUDF_HOST_DEVICE inline decltype(auto) aggregation_dispatcher(aggregation::Kind return f.template operator()(std::forward(args)...); case aggregation::EWMA: return f.template operator()(std::forward(args)...); + case aggregation::HOST_UDF: + return f.template operator()(std::forward(args)...); default: { #ifndef __CUDA_ARCH__ CUDF_FAIL("Unsupported aggregation."); diff --git a/cpp/src/aggregation/aggregation.cpp b/cpp/src/aggregation/aggregation.cpp index a60a7f63882..bbccd40f24e 100644 --- a/cpp/src/aggregation/aggregation.cpp +++ b/cpp/src/aggregation/aggregation.cpp @@ -237,6 +237,12 @@ std::vector> simple_aggregations_collector::visit( return visit(col_type, static_cast(agg)); } +std::vector> simple_aggregations_collector::visit( + data_type col_type, host_udf_aggregation const& agg) +{ + return visit(col_type, static_cast(agg)); +} + // aggregation_finalizer ---------------------------------------- void aggregation_finalizer::visit(aggregation const& agg) {} @@ -410,6 +416,11 @@ void aggregation_finalizer::visit(merge_tdigest_aggregation const& agg) visit(static_cast(agg)); } +void aggregation_finalizer::visit(host_udf_aggregation const& agg) +{ + visit(static_cast(agg)); +} + } // namespace detail std::vector> aggregation::get_simple_aggregations( @@ -917,6 +928,16 @@ make_merge_tdigest_aggregation(int max_centroids); template CUDF_EXPORT std::unique_ptr make_merge_tdigest_aggregation(int max_centroids); +template +std::unique_ptr make_host_udf_aggregation(std::unique_ptr udf_ptr_) +{ + return std::make_unique(std::move(udf_ptr_)); +} +template CUDF_EXPORT std::unique_ptr make_host_udf_aggregation( + std::unique_ptr); +template CUDF_EXPORT std::unique_ptr + make_host_udf_aggregation(std::unique_ptr); + namespace detail { namespace { struct target_type_functor { diff --git 
a/cpp/src/groupby/groupby.cu b/cpp/src/groupby/groupby.cu index c42038026e5..104cbe110db 100644 --- a/cpp/src/groupby/groupby.cu +++ b/cpp/src/groupby/groupby.cu @@ -99,6 +99,8 @@ namespace { struct empty_column_constructor { column_view values; aggregation const& agg; + rmm::cuda_stream_view stream; + rmm::device_async_resource_ref mr; template std::unique_ptr operator()() const @@ -108,7 +110,7 @@ struct empty_column_constructor { if constexpr (k == aggregation::Kind::COLLECT_LIST || k == aggregation::Kind::COLLECT_SET) { return make_lists_column( - 0, make_empty_column(type_to_id()), empty_like(values), 0, {}); + 0, make_empty_column(type_to_id()), empty_like(values), 0, {}, stream, mr); } if constexpr (k == aggregation::Kind::HISTOGRAM) { @@ -116,7 +118,9 @@ struct empty_column_constructor { make_empty_column(type_to_id()), cudf::reduction::detail::make_empty_histogram_like(values), 0, - {}); + {}, + stream, + mr); } if constexpr (k == aggregation::Kind::MERGE_HISTOGRAM) { return empty_like(values); } @@ -140,31 +144,41 @@ struct empty_column_constructor { return empty_like(values); } + if constexpr (k == aggregation::Kind::HOST_UDF) { + auto const& udf_ptr = dynamic_cast(agg).udf_ptr; + return std::get>(udf_ptr->get_empty_output(std::nullopt, stream, mr)); + } + return make_empty_column(target_type(values.type(), k)); } }; /// Make an empty table with appropriate types for requested aggs template -auto empty_results(host_span requests) +auto empty_results(host_span requests, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) { std::vector empty_results; - std::transform( - requests.begin(), requests.end(), std::back_inserter(empty_results), [](auto const& request) { - std::vector> results; - - std::transform( - request.aggregations.begin(), - request.aggregations.end(), - std::back_inserter(results), - [&request](auto const& agg) { - return cudf::detail::dispatch_type_and_aggregation( - request.values.type(), agg->kind, 
empty_column_constructor{request.values, *agg}); - }); - - return aggregation_result{std::move(results)}; - }); + std::transform(requests.begin(), + requests.end(), + std::back_inserter(empty_results), + [stream, mr](auto const& request) { + std::vector> results; + + std::transform(request.aggregations.begin(), + request.aggregations.end(), + std::back_inserter(results), + [&request, stream, mr](auto const& agg) { + return cudf::detail::dispatch_type_and_aggregation( + request.values.type(), + agg->kind, + empty_column_constructor{request.values, *agg, stream, mr}); + }); + + return aggregation_result{std::move(results)}; + }); return empty_results; } @@ -206,7 +220,7 @@ std::pair, std::vector> groupby::aggr verify_valid_requests(requests); - if (_keys.num_rows() == 0) { return {empty_like(_keys), empty_results(requests)}; } + if (_keys.num_rows() == 0) { return {empty_like(_keys), empty_results(requests, stream, mr)}; } return dispatch_aggregation(requests, stream, mr); } @@ -226,7 +240,9 @@ std::pair, std::vector> groupby::scan verify_valid_requests(requests); - if (_keys.num_rows() == 0) { return std::pair(empty_like(_keys), empty_results(requests)); } + if (_keys.num_rows() == 0) { + return std::pair(empty_like(_keys), empty_results(requests, stream, mr)); + } return sort_scan(requests, stream, mr); } diff --git a/cpp/src/groupby/sort/aggregate.cpp b/cpp/src/groupby/sort/aggregate.cpp index 7a8a1883ed4..767edfd1ba9 100644 --- a/cpp/src/groupby/sort/aggregate.cpp +++ b/cpp/src/groupby/sort/aggregate.cpp @@ -795,6 +795,60 @@ void aggregate_result_functor::operator()(aggregatio mr)); } +template <> +void aggregate_result_functor::operator()(aggregation const& agg) +{ + if (cache.has_result(values, agg)) { return; } + + auto const& udf_ptr = dynamic_cast(agg).udf_ptr; + auto data_attrs = udf_ptr->get_required_data(); + if (data_attrs.empty()) { // empty means everything + data_attrs = {host_udf_base::groupby_data_attribute::INPUT_VALUES, + 
host_udf_base::groupby_data_attribute::GROUPED_VALUES, + host_udf_base::groupby_data_attribute::SORTED_GROUPED_VALUES, + host_udf_base::groupby_data_attribute::GROUP_OFFSETS, + host_udf_base::groupby_data_attribute::GROUP_LABELS}; + } + + // Do not cache udf_input, as the actual input data may change from run to run. + host_udf_base::host_udf_input udf_input; + for (auto const& attr : data_attrs) { + CUDF_EXPECTS(std::holds_alternative(attr.value) || + std::holds_alternative>(attr.value), + "Invalid input data attribute for HOST_UDF groupby aggregation."); + if (std::holds_alternative(attr.value)) { + switch (std::get(attr.value)) { + case host_udf_base::groupby_data_attribute::INPUT_VALUES: + udf_input.emplace(attr, values); + break; + case host_udf_base::groupby_data_attribute::GROUPED_VALUES: + udf_input.emplace(attr, get_grouped_values()); + break; + case host_udf_base::groupby_data_attribute::SORTED_GROUPED_VALUES: + udf_input.emplace(attr, get_sorted_values()); + break; + case host_udf_base::groupby_data_attribute::GROUP_OFFSETS: + udf_input.emplace(attr, helper.group_offsets(stream)); + break; + case host_udf_base::groupby_data_attribute::GROUP_LABELS: + udf_input.emplace(attr, helper.group_labels(stream)); + break; + default: CUDF_UNREACHABLE("Invalid input data attribute for HOST_UDF groupby aggregation."); + } + } else { // data is result from another aggregation + auto other_agg = std::get>(attr.value)->clone(); + cudf::detail::aggregation_dispatcher(other_agg->kind, *this, *other_agg); + auto result = cache.get_result(values, *other_agg); + udf_input.emplace(std::move(other_agg), std::move(result)); + } + } + + auto output = (*udf_ptr)(udf_input, stream, mr); + CUDF_EXPECTS(std::holds_alternative>(output), + "Invalid output type from HOST_UDF groupby aggregation."); + cache.add_result(values, agg, std::get>(std::move(output))); +} + } // namespace detail // Sort-based groupby diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index 
adf512811cc..e5c29314203 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -132,6 +132,8 @@ ConfigureTest( groupby/groupby_test_util.cpp groupby/groups_tests.cpp groupby/histogram_tests.cpp + groupby/host_udf_example_tests.cu + groupby/host_udf_tests.cpp groupby/keys_tests.cpp groupby/lists_tests.cpp groupby/m2_tests.cpp diff --git a/cpp/tests/groupby/host_udf_example_tests.cu b/cpp/tests/groupby/host_udf_example_tests.cu new file mode 100644 index 00000000000..ce7a95384c7 --- /dev/null +++ b/cpp/tests/groupby/host_udf_example_tests.cu @@ -0,0 +1,259 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include +#include +#include +#include + +namespace { +/** + * @brief A host-based UDF implementation. + * + * The aggregations perform the following computation: + * - For reduction: compute `sum(value^2, for value in group)` (this is the sum of squares). + * - For segmented reduction: compute `segment_size * sum(value^2, for value in group)`. + * - For groupby: compute `(group_idx + 1) * group_sum_of_squares - group_max * group_sum`. + * + * In addition, for segmented reduction, if null_policy is set to `INCLUDE`, the null values are + * replaced with an initial value if it is provided. 
+ */ +template +struct test_udf_simple_type : cudf::host_udf_base { + static_assert(std::is_same_v); + + test_udf_simple_type() = default; + + [[nodiscard]] data_attributes_set_t get_required_data() const override + { + // We need grouped values, group offsets, group labels, and also results from groups' + // MAX and SUM aggregations. + return {groupby_data_attribute::GROUPED_VALUES, + groupby_data_attribute::GROUP_OFFSETS, + groupby_data_attribute::GROUP_LABELS, + cudf::make_max_aggregation(), + cudf::make_sum_aggregation()}; + } + + [[nodiscard]] output_t get_empty_output( + [[maybe_unused]] std::optional output_dtype, + [[maybe_unused]] rmm::cuda_stream_view stream, + [[maybe_unused]] rmm::device_async_resource_ref mr) const override + { + return cudf::make_empty_column( + cudf::data_type{cudf::type_to_id()}); + } + + [[nodiscard]] output_t operator()(host_udf_input const& input, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) const override + { + auto const& values = + std::get(input.at(groupby_data_attribute::GROUPED_VALUES)); + return cudf::type_dispatcher(values.type(), groupby_fn{this}, input, stream, mr); + } + + [[nodiscard]] std::size_t do_hash() const override + { + // Just return the same hash for all instances of this class. + return std::size_t{12345}; + } + + [[nodiscard]] bool is_equal(host_udf_base const& other) const override + { + // Just check if the other object is also instance of this class. + return dynamic_cast(&other) != nullptr; + } + + [[nodiscard]] std::unique_ptr clone() const override + { + return std::make_unique(); + } + + // For quick compilation, we only instantiate a few input/output types. + template + static constexpr bool is_valid_input_t() + { + return std::is_same_v; + } + + // For quick compilation, we only instantiate a few input/output types. 
+ template + static constexpr bool is_valid_output_t() + { + return std::is_same_v; + } + + struct groupby_fn { + // Store pointer to the parent class so we can call its functions. + test_udf_simple_type const* parent; + using OutputType = double; + template + using MaxType = cudf::detail::target_type_t; + template + using SumType = cudf::detail::target_type_t; + + template ())> + output_t operator()(Args...) const + { + CUDF_FAIL("Unsupported input/output type."); + } + + template ())> + output_t operator()(host_udf_input const& input, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) const + { + auto const& values = + std::get(input.at(groupby_data_attribute::GROUPED_VALUES)); + if (values.size() == 0) { return parent->get_empty_output(std::nullopt, stream, mr); } + + auto const offsets = std::get>( + input.at(groupby_data_attribute::GROUP_OFFSETS)); + CUDF_EXPECTS(offsets.size() > 0, "Invalid offsets."); + auto const num_groups = static_cast(offsets.size()) - 1; + auto const group_indices = std::get>( + input.at(groupby_data_attribute::GROUP_LABELS)); + auto const group_max = std::get( + input.at(cudf::make_max_aggregation())); + auto const group_sum = std::get( + input.at(cudf::make_sum_aggregation())); + + auto const values_dv_ptr = cudf::column_device_view::create(values, stream); + auto output = cudf::make_numeric_column(cudf::data_type{cudf::type_to_id()}, + num_groups, + cudf::mask_state::UNALLOCATED, + stream); + rmm::device_uvector validity(num_groups, stream); + + thrust::transform( + rmm::exec_policy(stream), + thrust::make_counting_iterator(0), + thrust::make_counting_iterator(num_groups), + thrust::make_zip_iterator(output->mutable_view().begin(), validity.begin()), + transform_fn{*values_dv_ptr, + offsets, + group_indices, + group_max.begin>(), + group_sum.begin>()}); + auto [null_mask, null_count] = + cudf::detail::valid_if(validity.begin(), validity.end(), thrust::identity<>{}, stream, mr); + if (null_count > 0) { 
output->set_null_mask(std::move(null_mask), null_count); } + return output; + } + + template + struct transform_fn { + cudf::column_device_view values; + cudf::device_span offsets; + cudf::device_span group_indices; + MaxType const* group_max; + SumType const* group_sum; + + thrust::tuple __device__ operator()(cudf::size_type idx) const + { + auto const start = offsets[idx]; + auto const end = offsets[idx + 1]; + if (start == end) { return {OutputType{0}, false}; } + + auto sum_sqr = OutputType{0}; + bool has_valid{false}; + for (auto i = start; i < end; ++i) { + if (values.is_null(i)) { continue; } + has_valid = true; + auto const val = static_cast(values.element(i)); + sum_sqr += val * val; + } + + if (!has_valid) { return {OutputType{0}, false}; } + return {static_cast(group_indices[start] + 1) * sum_sqr - + static_cast(group_max[idx]) * static_cast(group_sum[idx]), + true}; + } + }; + }; +}; + +} // namespace + +using doubles_col = cudf::test::fixed_width_column_wrapper; +using int32s_col = cudf::test::fixed_width_column_wrapper; +using int64s_col = cudf::test::fixed_width_column_wrapper; + +struct HostUDFExampleTest : cudf::test::BaseFixture {}; + +TEST_F(HostUDFExampleTest, GroupbySimpleInput) +{ + double constexpr null = 0.0; + auto const keys = int32s_col{0, 1, 2, 0, 1, 2, 0, 1, 2, 0}; + auto const vals = doubles_col{{0.0, null, 2.0, 3.0, null, 5.0, null, null, 8.0, 9.0}, + {true, false, true, true, false, true, false, false, true, true}}; + auto agg = cudf::make_host_udf_aggregation( + std::make_unique>()); + + std::vector requests; + requests.emplace_back(); + requests[0].values = vals; + requests[0].aggregations.push_back(std::move(agg)); + cudf::groupby::groupby gb_obj( + cudf::table_view({keys}), cudf::null_policy::INCLUDE, cudf::sorted::NO, {}, {}); + + auto const grp_result = gb_obj.aggregate(requests, cudf::test::get_default_stream()); + auto const& result = grp_result.second[0].results[0]; + + // Output type of groupby is double. 
+ // Values grouped by keys: [ {0, 3, null, 9}, {null, null, null}, {2, 5, 8} ] + // Group sum_sqr: [ 90, null, 93 ] + // Group max: [ 9, null, 8 ] + // Group sum: [ 12, null, 15 ] + // Output: [ 1 * 90 - 9 * 12, null, 3 * 93 - 8 * 15 ] + auto const expected = doubles_col{{-18.0, null, 159.0}, {true, false, true}}; + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected, *result); +} + +TEST_F(HostUDFExampleTest, GroupbyEmptyInput) +{ + auto const keys = int32s_col{}; + auto const vals = doubles_col{}; + auto agg = cudf::make_host_udf_aggregation( + std::make_unique>()); + + std::vector requests; + requests.emplace_back(); + requests[0].values = vals; + requests[0].aggregations.push_back(std::move(agg)); + cudf::groupby::groupby gb_obj( + cudf::table_view({keys}), cudf::null_policy::INCLUDE, cudf::sorted::NO, {}, {}); + + auto const grp_result = gb_obj.aggregate(requests, cudf::test::get_default_stream()); + auto const& result = grp_result.second[0].results[0]; + CUDF_TEST_EXPECT_COLUMNS_EQUAL(vals, *result); +} diff --git a/cpp/tests/groupby/host_udf_tests.cpp b/cpp/tests/groupby/host_udf_tests.cpp new file mode 100644 index 00000000000..13ff2e274d5 --- /dev/null +++ b/cpp/tests/groupby/host_udf_tests.cpp @@ -0,0 +1,209 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include +#include + +#include +#include +#include +#include + +#include +#include + +namespace { +/** + * @brief A host-based UDF implementation used for unit tests. + */ +template +struct host_udf_test : cudf::host_udf_base { + static_assert(std::is_same_v); + + bool* const test_run; // to check if the test is accidentally skipped + data_attributes_set_t const input_attrs; + host_udf_test(bool* test_run_, data_attributes_set_t input_attrs_ = {}) + : test_run{test_run_}, input_attrs(std::move(input_attrs_)) + { + } + + [[nodiscard]] data_attributes_set_t get_required_data() const override { return input_attrs; } + + // This is the main testing function, which checks for the correctness of input data. + // The rest is just to satisfy the interface. + [[nodiscard]] output_t operator()(host_udf_input const& input, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) const override + { + SCOPED_TRACE("Original line of failure: " + std::to_string(test_location_line)); + + data_attributes_set_t check_attrs = input_attrs; + if (check_attrs.empty()) { + check_attrs = data_attributes_set_t{groupby_data_attribute::INPUT_VALUES, + groupby_data_attribute::GROUPED_VALUES, + groupby_data_attribute::SORTED_GROUPED_VALUES, + groupby_data_attribute::GROUP_OFFSETS, + groupby_data_attribute::GROUP_LABELS}; + } + EXPECT_EQ(input.size(), check_attrs.size()); + for (auto const& attr : check_attrs) { + EXPECT_TRUE(input.count(attr) > 0); + EXPECT_TRUE(std::holds_alternative(attr.value) || + std::holds_alternative>(attr.value)); + if (std::holds_alternative(attr.value)) { + switch (std::get(attr.value)) { + case groupby_data_attribute::INPUT_VALUES: + EXPECT_TRUE(std::holds_alternative(input.at(attr))); + break; + case groupby_data_attribute::GROUPED_VALUES: + EXPECT_TRUE(std::holds_alternative(input.at(attr))); + break; + case groupby_data_attribute::SORTED_GROUPED_VALUES: + EXPECT_TRUE(std::holds_alternative(input.at(attr))); + break; + case 
groupby_data_attribute::GROUP_OFFSETS: + EXPECT_TRUE( + std::holds_alternative>(input.at(attr))); + break; + case groupby_data_attribute::GROUP_LABELS: + EXPECT_TRUE( + std::holds_alternative>(input.at(attr))); + break; + default:; + } + } else { // std::holds_alternative>(attr.value) + EXPECT_TRUE(std::holds_alternative(input.at(attr))); + } + } + + *test_run = true; // test is run successfully + return get_empty_output(std::nullopt, stream, mr); + } + + [[nodiscard]] output_t get_empty_output( + [[maybe_unused]] std::optional output_dtype, + [[maybe_unused]] rmm::cuda_stream_view stream, + [[maybe_unused]] rmm::device_async_resource_ref mr) const override + { + return cudf::make_empty_column(cudf::data_type{cudf::type_id::INT32}); + } + + [[nodiscard]] std::size_t do_hash() const override { return 0; } + [[nodiscard]] bool is_equal(host_udf_base const& other) const override { return true; } + [[nodiscard]] std::unique_ptr clone() const override + { + return std::make_unique(test_run, input_attrs); + } +}; + +/** + * @brief Get a random subset of input data attributes. + */ +cudf::host_udf_base::data_attributes_set_t get_subset( + cudf::host_udf_base::data_attributes_set_t const& attrs) +{ + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution size_distr(1, attrs.size() - 1); + auto const subset_size = size_distr(gen); + auto const elements = + std::vector(attrs.begin(), attrs.end()); + std::uniform_int_distribution idx_distr(0, attrs.size() - 1); + cudf::host_udf_base::data_attributes_set_t output; + while (output.size() < subset_size) { + output.insert(elements[idx_distr(gen)]); + } + return output; +} + +/** + * @brief Generate a random aggregation object from {min, max, sum, product}. 
+ */ +std::unique_ptr get_random_agg() +{ + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution distr(1, 4); + switch (distr(gen)) { + case 1: return cudf::make_min_aggregation(); + case 2: return cudf::make_max_aggregation(); + case 3: return cudf::make_sum_aggregation(); + case 4: return cudf::make_product_aggregation(); + default: CUDF_UNREACHABLE("This should not be reached."); + } + return nullptr; +} + +} // namespace + +using int32s_col = cudf::test::fixed_width_column_wrapper; + +// Number of randomly testing on the input data attributes. +// For each test, a subset of data attributes will be randomly generated from all the possible input +// data attributes. The input data corresponding to that subset passed from libcudf will be tested +// for correctness. +constexpr int NUM_RANDOM_TESTS = 10; + +struct HostUDFTest : cudf::test::BaseFixture {}; + +TEST_F(HostUDFTest, GroupbyAllInput) +{ + bool test_run = false; + auto const keys = int32s_col{0, 1, 2}; + auto const vals = int32s_col{0, 1, 2}; + auto agg = cudf::make_host_udf_aggregation( + std::make_unique>(&test_run)); + + std::vector requests; + requests.emplace_back(); + requests[0].values = vals; + requests[0].aggregations.push_back(std::move(agg)); + cudf::groupby::groupby gb_obj( + cudf::table_view({keys}), cudf::null_policy::INCLUDE, cudf::sorted::NO, {}, {}); + [[maybe_unused]] auto const grp_result = + gb_obj.aggregate(requests, cudf::test::get_default_stream()); + EXPECT_TRUE(test_run); +} + +TEST_F(HostUDFTest, GroupbySomeInput) +{ + auto const keys = int32s_col{0, 1, 2}; + auto const vals = int32s_col{0, 1, 2}; + auto const all_attrs = cudf::host_udf_base::data_attributes_set_t{ + cudf::host_udf_base::groupby_data_attribute::INPUT_VALUES, + cudf::host_udf_base::groupby_data_attribute::GROUPED_VALUES, + cudf::host_udf_base::groupby_data_attribute::SORTED_GROUPED_VALUES, + cudf::host_udf_base::groupby_data_attribute::GROUP_OFFSETS, + 
cudf::host_udf_base::groupby_data_attribute::GROUP_LABELS}; + for (int i = 0; i < NUM_RANDOM_TESTS; ++i) { + bool test_run = false; + auto input_attrs = get_subset(all_attrs); + input_attrs.insert(get_random_agg()); + auto agg = cudf::make_host_udf_aggregation( + std::make_unique>(&test_run, + std::move(input_attrs))); + + std::vector requests; + requests.emplace_back(); + requests[0].values = vals; + requests[0].aggregations.push_back(std::move(agg)); + cudf::groupby::groupby gb_obj( + cudf::table_view({keys}), cudf::null_policy::INCLUDE, cudf::sorted::NO, {}, {}); + [[maybe_unused]] auto const grp_result = + gb_obj.aggregate(requests, cudf::test::get_default_stream()); + EXPECT_TRUE(test_run); + } +} diff --git a/java/src/main/java/ai/rapids/cudf/Aggregation.java b/java/src/main/java/ai/rapids/cudf/Aggregation.java index 379750bb0b7..18d1e9b432c 100644 --- a/java/src/main/java/ai/rapids/cudf/Aggregation.java +++ b/java/src/main/java/ai/rapids/cudf/Aggregation.java @@ -62,15 +62,16 @@ enum Kind { LAG(23), PTX(24), CUDA(25), - M2(26), - MERGE_M2(27), - RANK(28), - DENSE_RANK(29), - PERCENT_RANK(30), - TDIGEST(31), // This can take a delta argument for accuracy level - MERGE_TDIGEST(32), // This can take a delta argument for accuracy level - HISTOGRAM(33), - MERGE_HISTOGRAM(34); + HOST_UDF(26), + M2(27), + MERGE_M2(28), + RANK(29), + DENSE_RANK(30), + PERCENT_RANK(31), + TDIGEST(32), // This can take a delta argument for accuracy level + MERGE_TDIGEST(33), // This can take a delta argument for accuracy level + HISTOGRAM(34), + MERGE_HISTOGRAM(35); final int nativeId; @@ -385,6 +386,36 @@ public boolean equals(Object other) { } } + static final class HostUDFAggregation extends Aggregation { + private final long udfNativeHandle; + + private HostUDFAggregation(long udfNativeHandle) { + super(Kind.HOST_UDF); + this.udfNativeHandle = udfNativeHandle; + } + + @Override + long createNativeInstance() { + return Aggregation.createHostUDFAgg(udfNativeHandle); + } + + 
@Override + public int hashCode() { + return 31 * kind.hashCode(); + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } else if (other instanceof HostUDFAggregation) { + HostUDFAggregation o = (HostUDFAggregation) other; + return Aggregation.areHostUDFsEqual(udfNativeHandle, o.udfNativeHandle); + } + return false; + } + } + protected final Kind kind; protected Aggregation(Kind kind) { @@ -837,6 +868,15 @@ static MergeSetsAggregation mergeSets(NullEquality nullEquality, NaNEquality nan return new MergeSetsAggregation(nullEquality, nanEquality); } + /** + * Host UDF aggregation, to execute a host-side user-defined function (UDF). + * @param udfNativeHandle Pointer to the native host UDF instance + * @return A new HostUDFAggregation instance + */ + static HostUDFAggregation hostUDF(long udfNativeHandle) { + return new HostUDFAggregation(udfNativeHandle); + } + static final class LeadAggregation extends LeadLagAggregation { private LeadAggregation(int offset, ColumnVector defaultOutput) { super(Kind.LEAD, offset, defaultOutput); @@ -990,4 +1030,14 @@ static MergeHistogramAggregation mergeHistogram() { * Create a TDigest aggregation. */ private static native long createTDigestAgg(int kind, int delta); + + /** + * Create a HOST_UDF aggregation. + */ + private static native long createHostUDFAgg(long udfNativeHandle); + + /** + * Compare two host UDFs to see if they are equal. 
+ */ + private static native boolean areHostUDFsEqual(long lhsNativeHandle, long rhsNativeHandle); } diff --git a/java/src/main/java/ai/rapids/cudf/GroupByAggregation.java b/java/src/main/java/ai/rapids/cudf/GroupByAggregation.java index 0fae33927b6..0c945a5ed2f 100644 --- a/java/src/main/java/ai/rapids/cudf/GroupByAggregation.java +++ b/java/src/main/java/ai/rapids/cudf/GroupByAggregation.java @@ -277,6 +277,15 @@ public static GroupByAggregation mergeSets() { return new GroupByAggregation(Aggregation.mergeSets()); } + /** + * Execute an aggregation using a host-side user-defined function (UDF). + * @param udfNativeHandle Pointer to the native host UDF instance + * @return A new GroupByAggregation instance + */ + public static GroupByAggregation hostUDF(long udfNativeHandle) { + return new GroupByAggregation(Aggregation.hostUDF(udfNativeHandle)); + } + /** * Merge the partial sets produced by multiple CollectSetAggregations. * diff --git a/java/src/main/native/src/AggregationJni.cpp b/java/src/main/native/src/AggregationJni.cpp index c40f1c55500..e39b91c8f15 100644 --- a/java/src/main/native/src/AggregationJni.cpp +++ b/java/src/main/native/src/AggregationJni.cpp @@ -80,25 +80,28 @@ JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_Aggregation_createNoParamAgg(JNIEnv* // case 23: LAG // case 24: PTX // case 25: CUDA - case 26: // M2 + // case 26: HOST_UDF + case 27: // M2 return cudf::make_m2_aggregation(); - case 27: // MERGE_M2 + case 28: // MERGE_M2 return cudf::make_merge_m2_aggregation(); - case 28: // RANK + case 29: // RANK return cudf::make_rank_aggregation( cudf::rank_method::MIN, {}, cudf::null_policy::INCLUDE); - case 29: // DENSE_RANK + case 30: // DENSE_RANK return cudf::make_rank_aggregation( cudf::rank_method::DENSE, {}, cudf::null_policy::INCLUDE); - case 30: // ANSI SQL PERCENT_RANK + case 31: // ANSI SQL PERCENT_RANK return cudf::make_rank_aggregation(cudf::rank_method::MIN, {}, cudf::null_policy::INCLUDE, {}, cudf::rank_percentage::ONE_NORMALIZED); - 
case 33: // HISTOGRAM + // case 32: TDIGEST + // case 33: MERGE_TDIGEST + case 34: // HISTOGRAM return cudf::make_histogram_aggregation(); - case 34: // MERGE_HISTOGRAM + case 35: // MERGE_HISTOGRAM return cudf::make_merge_histogram_aggregation(); default: throw std::logic_error("Unsupported No Parameter Aggregation Operation"); @@ -160,10 +163,10 @@ JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_Aggregation_createTDigestAgg(JNIEnv* std::unique_ptr ret; // These numbers come from Aggregation.java and must stay in sync switch (kind) { - case 31: // TDIGEST + case 32: // TDIGEST ret = cudf::make_tdigest_aggregation(delta); break; - case 32: // MERGE_TDIGEST + case 33: // MERGE_TDIGEST ret = cudf::make_merge_tdigest_aggregation(delta); break; default: throw std::logic_error("Unsupported TDigest Aggregation Operation"); @@ -296,4 +299,34 @@ JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_Aggregation_createMergeSetsAgg(JNIEn CATCH_STD(env, 0); } +JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_Aggregation_createHostUDFAgg(JNIEnv* env, + jclass class_object, + jlong udf_native_handle) +{ + JNI_NULL_CHECK(env, udf_native_handle, "udf_native_handle is null", 0); + try { + cudf::jni::auto_set_device(env); + auto const udf_ptr = reinterpret_cast(udf_native_handle); + auto output = cudf::make_host_udf_aggregation(udf_ptr->clone()); + return reinterpret_cast(output.release()); + } + CATCH_STD(env, 0); +} + +JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_Aggregation_areHostUDFsEqual(JNIEnv* env, + jclass class_object, + jlong lhs_native_handle, + jlong rhs_native_handle) +{ + JNI_NULL_CHECK(env, lhs_native_handle, "lhs_native_handle is null", 0); + JNI_NULL_CHECK(env, rhs_native_handle, "rhs_native_handle is null", 0); + try { + cudf::jni::auto_set_device(env); + auto const lhs_udf_ptr = reinterpret_cast(lhs_native_handle); + auto const rhs_udf_ptr = reinterpret_cast(rhs_native_handle); + return lhs_udf_ptr->is_equal(*rhs_udf_ptr); + } + CATCH_STD(env, 0); +} + } // extern "C" From 
3a080f464e852815d3575bd11793930847deb500 Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Tue, 17 Dec 2024 12:46:51 -0800 Subject: [PATCH 02/22] Rewrite tests Signed-off-by: Nghia Truong --- cpp/tests/groupby/host_udf_example_tests.cu | 61 +++++++-------------- cpp/tests/groupby/host_udf_tests.cpp | 1 - 2 files changed, 20 insertions(+), 42 deletions(-) diff --git a/cpp/tests/groupby/host_udf_example_tests.cu b/cpp/tests/groupby/host_udf_example_tests.cu index ce7a95384c7..83c19b4d022 100644 --- a/cpp/tests/groupby/host_udf_example_tests.cu +++ b/cpp/tests/groupby/host_udf_example_tests.cu @@ -23,7 +23,6 @@ #include #include #include -#include #include #include @@ -32,25 +31,16 @@ #include #include #include -#include namespace { /** - * @brief A host-based UDF implementation. + * @brief A host-based UDF implementation for groupby. * - * The aggregations perform the following computation: - * - For reduction: compute `sum(value^2, for value in group)` (this is sum of squared). - * - For segmented reduction: compute `segment_size * sum(value^2, for value in group)`. - * - For groupby: compute `(group_idx + 1) * group_sum_of_squares - group_max * group_sum`. - * - * In addition, for segmented reduction, if null_policy is set to `INCLUDE`, the null values are - * replaced with an initial value if it is provided. + * For each group of values, the aggregation computes + * `(group_idx + 1) * group_sum_of_squares - group_max * group_sum`. */ -template -struct test_udf_simple_type : cudf::host_udf_base { - static_assert(std::is_same_v); - - test_udf_simple_type() = default; +struct host_udf_groupby_example : cudf::host_udf_base { + host_udf_groupby_example() = default; [[nodiscard]] data_attributes_set_t get_required_data() const override { @@ -90,44 +80,34 @@ struct test_udf_simple_type : cudf::host_udf_base { [[nodiscard]] bool is_equal(host_udf_base const& other) const override { // Just check if the other object is also instance of this class. 
- return dynamic_cast(&other) != nullptr; + return dynamic_cast(&other) != nullptr; } [[nodiscard]] std::unique_ptr clone() const override { - return std::make_unique(); - } - - // For quick compilation, we only instantiate a few input/output types. - template - static constexpr bool is_valid_input_t() - { - return std::is_same_v; - } - - // For quick compilation, we only instantiate a few input/output types. - template - static constexpr bool is_valid_output_t() - { - return std::is_same_v; + return std::make_unique(); } struct groupby_fn { // Store pointer to the parent class so we can call its functions. - test_udf_simple_type const* parent; + host_udf_groupby_example const* parent; + + // For simplicity, this example only accepts double input and always produces double output. + using InputType = double; using OutputType = double; + template using MaxType = cudf::detail::target_type_t; template using SumType = cudf::detail::target_type_t; - template ())> + template )> output_t operator()(Args...) 
const { - CUDF_FAIL("Unsupported input/output type."); + CUDF_FAIL("Unsupported input type."); } - template ())> + template )> output_t operator()(host_udf_input const& input, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) const @@ -206,18 +186,17 @@ struct test_udf_simple_type : cudf::host_udf_base { using doubles_col = cudf::test::fixed_width_column_wrapper; using int32s_col = cudf::test::fixed_width_column_wrapper; -using int64s_col = cudf::test::fixed_width_column_wrapper; -struct HostUDFExampleTest : cudf::test::BaseFixture {}; +struct HostUDFGroupbyExampleTest : cudf::test::BaseFixture {}; -TEST_F(HostUDFExampleTest, GroupbySimpleInput) +TEST_F(HostUDFGroupbyExampleTest, SimpleInput) { double constexpr null = 0.0; auto const keys = int32s_col{0, 1, 2, 0, 1, 2, 0, 1, 2, 0}; auto const vals = doubles_col{{0.0, null, 2.0, 3.0, null, 5.0, null, null, 8.0, 9.0}, {true, false, true, true, false, true, false, false, true, true}}; auto agg = cudf::make_host_udf_aggregation( - std::make_unique>()); + std::make_unique()); std::vector requests; requests.emplace_back(); @@ -239,12 +218,12 @@ TEST_F(HostUDFExampleTest, GroupbySimpleInput) CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected, *result); } -TEST_F(HostUDFExampleTest, GroupbyEmptyInput) +TEST_F(HostUDFGroupbyExampleTest, EmptyInput) { auto const keys = int32s_col{}; auto const vals = doubles_col{}; auto agg = cudf::make_host_udf_aggregation( - std::make_unique>()); + std::make_unique()); std::vector requests; requests.emplace_back(); diff --git a/cpp/tests/groupby/host_udf_tests.cpp b/cpp/tests/groupby/host_udf_tests.cpp index 13ff2e274d5..f38388f1c19 100644 --- a/cpp/tests/groupby/host_udf_tests.cpp +++ b/cpp/tests/groupby/host_udf_tests.cpp @@ -20,7 +20,6 @@ #include #include #include -#include #include #include From 343f85947accd14ac739d6c6b0bb7d4c70c524e7 Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Tue, 17 Dec 2024 13:11:46 -0800 Subject: [PATCH 03/22] Add `NUM_GROUPS` attribute Signed-off-by: 
Nghia Truong --- cpp/include/cudf/aggregation.hpp | 3 ++- cpp/src/groupby/sort/aggregate.cpp | 4 ++++ cpp/tests/groupby/host_udf_tests.cpp | 7 ++++++- 3 files changed, 12 insertions(+), 2 deletions(-) diff --git a/cpp/include/cudf/aggregation.hpp b/cpp/include/cudf/aggregation.hpp index 3adb5cbd489..b60d9f321a0 100644 --- a/cpp/include/cudf/aggregation.hpp +++ b/cpp/include/cudf/aggregation.hpp @@ -630,6 +630,7 @@ struct host_udf_base { ///< values within each group maintain their original order. SORTED_GROUPED_VALUES, ///< The input values grouped according to the input `keys` and ///< sorted within each group. + NUM_GROUPS, ///< The number of groups (i.e., number of distinct keys). GROUP_OFFSETS, ///< The offsets separating groups. GROUP_LABELS ///< Group labels (which is also the same as group indices). }; @@ -771,7 +772,7 @@ struct host_udf_base { * @brief Hold all possible types of the data that is passed to the derived class for executing * the aggregation. */ - using input_data_t = std::variant>; + using input_data_t = std::variant>; /** * @brief Input to the aggregation, mapping from each data attribute to its actual data. 
diff --git a/cpp/src/groupby/sort/aggregate.cpp b/cpp/src/groupby/sort/aggregate.cpp index 767edfd1ba9..4e7d8265f27 100644 --- a/cpp/src/groupby/sort/aggregate.cpp +++ b/cpp/src/groupby/sort/aggregate.cpp @@ -806,6 +806,7 @@ void aggregate_result_functor::operator()(aggregation con data_attrs = {host_udf_base::groupby_data_attribute::INPUT_VALUES, host_udf_base::groupby_data_attribute::GROUPED_VALUES, host_udf_base::groupby_data_attribute::SORTED_GROUPED_VALUES, + host_udf_base::groupby_data_attribute::NUM_GROUPS, host_udf_base::groupby_data_attribute::GROUP_OFFSETS, host_udf_base::groupby_data_attribute::GROUP_LABELS}; } @@ -827,6 +828,9 @@ void aggregate_result_functor::operator()(aggregation con case host_udf_base::groupby_data_attribute::SORTED_GROUPED_VALUES: udf_input.emplace(attr, get_sorted_values()); break; + case host_udf_base::groupby_data_attribute::NUM_GROUPS: + udf_input.emplace(attr, helper.num_groups(stream)); + break; case host_udf_base::groupby_data_attribute::GROUP_OFFSETS: udf_input.emplace(attr, helper.group_offsets(stream)); break; diff --git a/cpp/tests/groupby/host_udf_tests.cpp b/cpp/tests/groupby/host_udf_tests.cpp index f38388f1c19..40276e3a177 100644 --- a/cpp/tests/groupby/host_udf_tests.cpp +++ b/cpp/tests/groupby/host_udf_tests.cpp @@ -54,6 +54,7 @@ struct host_udf_test : cudf::host_udf_base { check_attrs = data_attributes_set_t{groupby_data_attribute::INPUT_VALUES, groupby_data_attribute::GROUPED_VALUES, groupby_data_attribute::SORTED_GROUPED_VALUES, + groupby_data_attribute::NUM_GROUPS, groupby_data_attribute::GROUP_OFFSETS, groupby_data_attribute::GROUP_LABELS}; } @@ -73,6 +74,9 @@ struct host_udf_test : cudf::host_udf_base { case groupby_data_attribute::SORTED_GROUPED_VALUES: EXPECT_TRUE(std::holds_alternative(input.at(attr))); break; + case groupby_data_attribute::NUM_GROUPS: + EXPECT_TRUE(std::holds_alternative(input.at(attr))); + break; case groupby_data_attribute::GROUP_OFFSETS: EXPECT_TRUE( 
std::holds_alternative>(input.at(attr))); @@ -154,7 +158,7 @@ using int32s_col = cudf::test::fixed_width_column_wrapper; // For each test, a subset of data attributes will be randomly generated from all the possible input // data attributes. The input data corresponding to that subset passed from libcudf will be tested // for correctness. -constexpr int NUM_RANDOM_TESTS = 10; +constexpr int NUM_RANDOM_TESTS = 20; struct HostUDFTest : cudf::test::BaseFixture {}; @@ -185,6 +189,7 @@ TEST_F(HostUDFTest, GroupbySomeInput) cudf::host_udf_base::groupby_data_attribute::INPUT_VALUES, cudf::host_udf_base::groupby_data_attribute::GROUPED_VALUES, cudf::host_udf_base::groupby_data_attribute::SORTED_GROUPED_VALUES, + cudf::host_udf_base::groupby_data_attribute::NUM_GROUPS, cudf::host_udf_base::groupby_data_attribute::GROUP_OFFSETS, cudf::host_udf_base::groupby_data_attribute::GROUP_LABELS}; for (int i = 0; i < NUM_RANDOM_TESTS; ++i) { From ff89b77721d45d6ab816af3bb77ca5500f867347 Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Tue, 17 Dec 2024 13:40:08 -0800 Subject: [PATCH 04/22] Rewrite test Signed-off-by: Nghia Truong --- cpp/tests/groupby/host_udf_tests.cpp | 85 ++++++++++++++++++---------- 1 file changed, 56 insertions(+), 29 deletions(-) diff --git a/cpp/tests/groupby/host_udf_tests.cpp b/cpp/tests/groupby/host_udf_tests.cpp index 40276e3a177..9aceb6a95b8 100644 --- a/cpp/tests/groupby/host_udf_tests.cpp +++ b/cpp/tests/groupby/host_udf_tests.cpp @@ -28,14 +28,15 @@ namespace { /** * @brief A host-based UDF implementation used for unit tests. 
*/ -template -struct host_udf_test : cudf::host_udf_base { - static_assert(std::is_same_v); - - bool* const test_run; // to check if the test is accidentally skipped +struct host_udf_test_base : cudf::host_udf_base { + int const test_location_line; // the location where testing is called + bool* const test_run; // to check if the test is accidentally skipped data_attributes_set_t const input_attrs; - host_udf_test(bool* test_run_, data_attributes_set_t input_attrs_ = {}) - : test_run{test_run_}, input_attrs(std::move(input_attrs_)) + + host_udf_test_base(int test_location_line_, bool* test_run_, data_attributes_set_t input_attrs_) + : test_location_line{test_location_line_}, + test_run{test_run_}, + input_attrs(std::move(input_attrs_)) { } @@ -47,8 +48,53 @@ struct host_udf_test : cudf::host_udf_base { rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) const override { - SCOPED_TRACE("Original line of failure: " + std::to_string(test_location_line)); + SCOPED_TRACE("Test instance created at line: " + std::to_string(test_location_line)); + + test_data_attributes(input, stream, mr); + + *test_run = true; // test is run successfully + return get_empty_output(std::nullopt, stream, mr); + } + + [[nodiscard]] output_t get_empty_output( + [[maybe_unused]] std::optional output_dtype, + [[maybe_unused]] rmm::cuda_stream_view stream, + [[maybe_unused]] rmm::device_async_resource_ref mr) const override + { + // Unused function - dummy output. + return cudf::make_empty_column(cudf::data_type{cudf::type_id::INT32}); + } + + [[nodiscard]] std::size_t do_hash() const override { return 0; } + [[nodiscard]] bool is_equal(host_udf_base const& other) const override { return true; } + + // The main test function, which must be implemented for each kind of aggregations + // (groupby/reduction/segmented_reduction). 
+ virtual void test_data_attributes(host_udf_input const& input, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) const = 0; +}; + +/** + * @brief A host-based UDF implementation used for unit tests for groupby aggregation. + */ +struct host_udf_groupby_test : host_udf_test_base { + host_udf_groupby_test(int test_location_line_, + bool* test_run_, + data_attributes_set_t input_attrs_ = {}) + : host_udf_test_base(test_location_line_, test_run_, std::move(input_attrs_)) + { + } + + [[nodiscard]] std::unique_ptr clone() const override + { + return std::make_unique(test_location_line, test_run, input_attrs); + } + void test_data_attributes(host_udf_input const& input, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) const override + { data_attributes_set_t check_attrs = input_attrs; if (check_attrs.empty()) { check_attrs = data_attributes_set_t{groupby_data_attribute::INPUT_VALUES, @@ -91,24 +137,6 @@ struct host_udf_test : cudf::host_udf_base { EXPECT_TRUE(std::holds_alternative(input.at(attr))); } } - - *test_run = true; // test is run successfully - return get_empty_output(std::nullopt, stream, mr); - } - - [[nodiscard]] output_t get_empty_output( - [[maybe_unused]] std::optional output_dtype, - [[maybe_unused]] rmm::cuda_stream_view stream, - [[maybe_unused]] rmm::device_async_resource_ref mr) const override - { - return cudf::make_empty_column(cudf::data_type{cudf::type_id::INT32}); - } - - [[nodiscard]] std::size_t do_hash() const override { return 0; } - [[nodiscard]] bool is_equal(host_udf_base const& other) const override { return true; } - [[nodiscard]] std::unique_ptr clone() const override - { - return std::make_unique(test_run, input_attrs); } }; @@ -168,7 +196,7 @@ TEST_F(HostUDFTest, GroupbyAllInput) auto const keys = int32s_col{0, 1, 2}; auto const vals = int32s_col{0, 1, 2}; auto agg = cudf::make_host_udf_aggregation( - std::make_unique>(&test_run)); + std::make_unique(__LINE__, &test_run)); std::vector requests; 
requests.emplace_back(); @@ -197,8 +225,7 @@ TEST_F(HostUDFTest, GroupbySomeInput) auto input_attrs = get_subset(all_attrs); input_attrs.insert(get_random_agg()); auto agg = cudf::make_host_udf_aggregation( - std::make_unique>(&test_run, - std::move(input_attrs))); + std::make_unique(__LINE__, &test_run, std::move(input_attrs))); std::vector requests; requests.emplace_back(); From d1f06c77e77d55fbb38a3649993dc9e032fbd3fc Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Tue, 17 Dec 2024 14:34:51 -0800 Subject: [PATCH 05/22] Update docs and rename variable Signed-off-by: Nghia Truong --- cpp/include/cudf/aggregation.hpp | 15 +++++++---- cpp/src/groupby/sort/aggregate.cpp | 2 +- cpp/tests/groupby/host_udf_example_tests.cu | 29 +++++++++------------ cpp/tests/groupby/host_udf_tests.cpp | 12 ++++----- 4 files changed, 30 insertions(+), 28 deletions(-) diff --git a/cpp/include/cudf/aggregation.hpp b/cpp/include/cudf/aggregation.hpp index b60d9f321a0..35cf9467b11 100644 --- a/cpp/include/cudf/aggregation.hpp +++ b/cpp/include/cudf/aggregation.hpp @@ -613,7 +613,10 @@ std::unique_ptr make_udf_aggregation(udf_type type, * @brief The interface for host-based UDF implementation. * * An implementation of host-based UDF needs to be derived from this base class, defining - * its own version of the required functions. + * its own version of the required functions. In particular, the derived class must define the + * following function: `get_empty_output`, `operator()`, `do_hash`, `is_equal` and `clone`. + * The function `get_required_data` can also be optionally overriden to facilitate selective + * access to the input data as well as intermediate data provided by libcudf. */ struct host_udf_base { host_udf_base() = default; @@ -761,8 +764,10 @@ struct host_udf_base { /** * @brief Return a set of attributes for the data that is needed for computing the aggregation. * - * If this function is not overridden, an empty set is returned. 
That means all the data - * attributes (except results from other aggregations in groupby) will be needed. + * The derived class should return the attributes corresponding to only the data that it needs to + * avoid unnecessary computation performed in libcudf. If this function is not overridden, an + * empty set is returned. That means all the data attributes (except results from other + * aggregations in groupby) will be needed. * * @return A set of `data_attribute` */ @@ -777,7 +782,7 @@ struct host_udf_base { /** * @brief Input to the aggregation, mapping from each data attribute to its actual data. */ - using host_udf_input = std:: + using input_map_t = std:: unordered_map; /** @@ -811,7 +816,7 @@ struct host_udf_base { * @param mr Device memory resource to use for any allocations * @return The output result of the aggregation */ - [[nodiscard]] virtual output_t operator()(host_udf_input const& input, + [[nodiscard]] virtual output_t operator()(input_map_t const& input, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) const = 0; diff --git a/cpp/src/groupby/sort/aggregate.cpp b/cpp/src/groupby/sort/aggregate.cpp index 4e7d8265f27..b2bff29e631 100644 --- a/cpp/src/groupby/sort/aggregate.cpp +++ b/cpp/src/groupby/sort/aggregate.cpp @@ -812,7 +812,7 @@ void aggregate_result_functor::operator()(aggregation con } // Do not cache udf_input, as the actual input data may change from run to run. 
- host_udf_base::host_udf_input udf_input; + host_udf_base::input_map_t udf_input; for (auto const& attr : data_attrs) { CUDF_EXPECTS(std::holds_alternative(attr.value) || std::holds_alternative>(attr.value), diff --git a/cpp/tests/groupby/host_udf_example_tests.cu b/cpp/tests/groupby/host_udf_example_tests.cu index 83c19b4d022..e03dc03b2e8 100644 --- a/cpp/tests/groupby/host_udf_example_tests.cu +++ b/cpp/tests/groupby/host_udf_example_tests.cu @@ -62,7 +62,7 @@ struct host_udf_groupby_example : cudf::host_udf_base { cudf::data_type{cudf::type_to_id()}); } - [[nodiscard]] output_t operator()(host_udf_input const& input, + [[nodiscard]] output_t operator()(input_map_t const& input, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) const override { @@ -95,11 +95,8 @@ struct host_udf_groupby_example : cudf::host_udf_base { // For simplicity, this example only accepts double input and always produces double output. using InputType = double; using OutputType = double; - - template - using MaxType = cudf::detail::target_type_t; - template - using SumType = cudf::detail::target_type_t; + using MaxType = cudf::detail::target_type_t; + using SumType = cudf::detail::target_type_t; template )> output_t operator()(Args...) 
const @@ -108,7 +105,7 @@ struct host_udf_groupby_example : cudf::host_udf_base { } template )> - output_t operator()(host_udf_input const& input, + output_t operator()(input_map_t const& input, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) const { @@ -131,7 +128,8 @@ struct host_udf_groupby_example : cudf::host_udf_base { auto output = cudf::make_numeric_column(cudf::data_type{cudf::type_to_id()}, num_groups, cudf::mask_state::UNALLOCATED, - stream); + stream, + mr); rmm::device_uvector validity(num_groups, stream); thrust::transform( @@ -139,24 +137,23 @@ struct host_udf_groupby_example : cudf::host_udf_base { thrust::make_counting_iterator(0), thrust::make_counting_iterator(num_groups), thrust::make_zip_iterator(output->mutable_view().begin(), validity.begin()), - transform_fn{*values_dv_ptr, - offsets, - group_indices, - group_max.begin>(), - group_sum.begin>()}); + transform_fn{*values_dv_ptr, + offsets, + group_indices, + group_max.begin(), + group_sum.begin()}); auto [null_mask, null_count] = cudf::detail::valid_if(validity.begin(), validity.end(), thrust::identity<>{}, stream, mr); if (null_count > 0) { output->set_null_mask(std::move(null_mask), null_count); } return output; } - template struct transform_fn { cudf::column_device_view values; cudf::device_span offsets; cudf::device_span group_indices; - MaxType const* group_max; - SumType const* group_sum; + MaxType const* group_max; + SumType const* group_sum; thrust::tuple __device__ operator()(cudf::size_type idx) const { diff --git a/cpp/tests/groupby/host_udf_tests.cpp b/cpp/tests/groupby/host_udf_tests.cpp index 9aceb6a95b8..320e0786d9a 100644 --- a/cpp/tests/groupby/host_udf_tests.cpp +++ b/cpp/tests/groupby/host_udf_tests.cpp @@ -29,9 +29,9 @@ namespace { * @brief A host-based UDF implementation used for unit tests. 
*/ struct host_udf_test_base : cudf::host_udf_base { - int const test_location_line; // the location where testing is called - bool* const test_run; // to check if the test is accidentally skipped - data_attributes_set_t const input_attrs; + int test_location_line; // the location where testing is called + bool* test_run; // to check if the test is accidentally skipped + data_attributes_set_t input_attrs; host_udf_test_base(int test_location_line_, bool* test_run_, data_attributes_set_t input_attrs_) : test_location_line{test_location_line_}, @@ -44,7 +44,7 @@ struct host_udf_test_base : cudf::host_udf_base { // This is the main testing function, which checks for the correctness of input data. // The rests are just to satisfy the interface. - [[nodiscard]] output_t operator()(host_udf_input const& input, + [[nodiscard]] output_t operator()(input_map_t const& input, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) const override { @@ -70,7 +70,7 @@ struct host_udf_test_base : cudf::host_udf_base { // The main test function, which must be implemented for each kind of aggregations // (groupby/reduction/segmented_reduction). 
- virtual void test_data_attributes(host_udf_input const& input, + virtual void test_data_attributes(input_map_t const& input, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) const = 0; }; @@ -91,7 +91,7 @@ struct host_udf_groupby_test : host_udf_test_base { return std::make_unique(test_location_line, test_run, input_attrs); } - void test_data_attributes(host_udf_input const& input, + void test_data_attributes(input_map_t const& input, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) const override { From c9a7463989878da36fe2c69f4792836c211b5da5 Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Tue, 17 Dec 2024 19:52:16 -0800 Subject: [PATCH 06/22] Instantiate `HostUDFAggregation` from `HostUDFWrapper` Signed-off-by: Nghia Truong --- .../main/java/ai/rapids/cudf/Aggregation.java | 32 ++++++++++++------- java/src/main/native/src/AggregationJni.cpp | 16 ---------- 2 files changed, 21 insertions(+), 27 deletions(-) diff --git a/java/src/main/java/ai/rapids/cudf/Aggregation.java b/java/src/main/java/ai/rapids/cudf/Aggregation.java index 18d1e9b432c..57f293452b6 100644 --- a/java/src/main/java/ai/rapids/cudf/Aggregation.java +++ b/java/src/main/java/ai/rapids/cudf/Aggregation.java @@ -386,17 +386,33 @@ public boolean equals(Object other) { } } + /** + * A wrapper class for native host UDF aggregations. + *

+ * This class is used to store the native handle of a host UDF aggregation and is used as + * a proxy object to compute hash code and compare two host UDF aggregations. + * A new host UDF aggregation implementation must extend this class and override the + * {@code hashCode} and {@code equals} methods for such purposes. + */ + public static abstract class HostUDFWrapper { + public final long udfNativeHandle; + + HostUDFWrapper(long udfNativeHandle) { + this.udfNativeHandle = udfNativeHandle; + } + } + static final class HostUDFAggregation extends Aggregation { - private final long udfNativeHandle; + HostUDFWrapper wrapper; - private HostUDFAggregation(long udfNativeHandle) { + private HostUDFAggregation(HostUDFWrapper wrapper) { super(Kind.HOST_UDF); - this.udfNativeHandle = udfNativeHandle; + this.wrapper = wrapper; } @Override long createNativeInstance() { - return Aggregation.createHostUDFAgg(udfNativeHandle); + return Aggregation.createHostUDFAgg(wrapper.udfNativeHandle); } @Override @@ -409,8 +425,7 @@ public boolean equals(Object other) { if (this == other) { return true; } else if (other instanceof HostUDFAggregation) { - HostUDFAggregation o = (HostUDFAggregation) other; - return Aggregation.areHostUDFsEqual(udfNativeHandle, o.udfNativeHandle); + return wrapper.equals(((HostUDFAggregation) other).wrapper); } return false; } @@ -1035,9 +1050,4 @@ static MergeHistogramAggregation mergeHistogram() { * Create a HOST_UDF aggregation. */ private static native long createHostUDFAgg(long udfNativeHandle); - - /** - * Compare two host UDFs to see if they are equal. 
- */ - private static native boolean areHostUDFsEqual(long lhsNativeHandle, long rhsNativeHandle); } diff --git a/java/src/main/native/src/AggregationJni.cpp b/java/src/main/native/src/AggregationJni.cpp index e39b91c8f15..86b44b7e628 100644 --- a/java/src/main/native/src/AggregationJni.cpp +++ b/java/src/main/native/src/AggregationJni.cpp @@ -313,20 +313,4 @@ JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_Aggregation_createHostUDFAgg(JNIEnv* CATCH_STD(env, 0); } -JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_Aggregation_areHostUDFsEqual(JNIEnv* env, - jclass class_object, - jlong lhs_native_handle, - jlong rhs_native_handle) -{ - JNI_NULL_CHECK(env, lhs_native_handle, "lhs_native_handle is null", 0); - JNI_NULL_CHECK(env, rhs_native_handle, "rhs_native_handle is null", 0); - try { - cudf::jni::auto_set_device(env); - auto const lhs_udf_ptr = reinterpret_cast(lhs_native_handle); - auto const rhs_udf_ptr = reinterpret_cast(rhs_native_handle); - return lhs_udf_ptr->is_equal(*rhs_udf_ptr); - } - CATCH_STD(env, 0); -} - } // extern "C" From 8bc63437cfe0725fe4ea56c232d74764cb6d2dd6 Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Tue, 17 Dec 2024 21:04:20 -0800 Subject: [PATCH 07/22] Add `HostUDFWrapper` class Signed-off-by: Nghia Truong --- .../main/java/ai/rapids/cudf/Aggregation.java | 26 +++----------- .../ai/rapids/cudf/GroupByAggregation.java | 6 ++-- .../java/ai/rapids/cudf/HostUDFWrapper.java | 34 +++++++++++++++++++ 3 files changed, 42 insertions(+), 24 deletions(-) create mode 100644 java/src/main/java/ai/rapids/cudf/HostUDFWrapper.java diff --git a/java/src/main/java/ai/rapids/cudf/Aggregation.java b/java/src/main/java/ai/rapids/cudf/Aggregation.java index 57f293452b6..2276b223740 100644 --- a/java/src/main/java/ai/rapids/cudf/Aggregation.java +++ b/java/src/main/java/ai/rapids/cudf/Aggregation.java @@ -386,24 +386,8 @@ public boolean equals(Object other) { } } - /** - * A wrapper class for native host UDF aggregations. - *

- * This class is used to store the native handle of a host UDF aggregation and is used as - * a proxy object to compute hash code and compare two host UDF aggregations. - * A new host UDF aggregation implementation must extend this class and override the - * {@code hashCode} and {@code equals} methods for such purposes. - */ - public static abstract class HostUDFWrapper { - public final long udfNativeHandle; - - HostUDFWrapper(long udfNativeHandle) { - this.udfNativeHandle = udfNativeHandle; - } - } - static final class HostUDFAggregation extends Aggregation { - HostUDFWrapper wrapper; + private final HostUDFWrapper wrapper; private HostUDFAggregation(HostUDFWrapper wrapper) { super(Kind.HOST_UDF); @@ -417,7 +401,7 @@ long createNativeInstance() { @Override public int hashCode() { - return 31 * kind.hashCode(); + return 31 * kind.hashCode() + wrapper.hashCode(); } @Override @@ -885,11 +869,11 @@ static MergeSetsAggregation mergeSets(NullEquality nullEquality, NaNEquality nan /** * Host UDF aggregation, to execute a host-side user-defined function (UDF). - * @param udfNativeHandle Pointer to the native host UDF instance + * @param wrapper The wrapper for the native host UDF instance. * @return A new HostUDFAggregation instance */ - static HostUDFAggregation hostUDF(long udfNativeHandle) { - return new HostUDFAggregation(udfNativeHandle); + static HostUDFAggregation hostUDF(HostUDFWrapper wrapper) { + return new HostUDFAggregation(wrapper); } static final class LeadAggregation extends LeadLagAggregation { diff --git a/java/src/main/java/ai/rapids/cudf/GroupByAggregation.java b/java/src/main/java/ai/rapids/cudf/GroupByAggregation.java index 0c945a5ed2f..27966ddfdd4 100644 --- a/java/src/main/java/ai/rapids/cudf/GroupByAggregation.java +++ b/java/src/main/java/ai/rapids/cudf/GroupByAggregation.java @@ -279,11 +279,11 @@ public static GroupByAggregation mergeSets() { /** * Execute an aggregation using a host-side user-defined function (UDF). 
- * @param udfNativeHandle Pointer to the native host UDF instance + * @param wrapper The wrapper for the native host UDF instance. * @return A new GroupByAggregation instance */ - public static GroupByAggregation hostUDF(long udfNativeHandle) { - return new GroupByAggregation(Aggregation.hostUDF(udfNativeHandle)); + public static GroupByAggregation hostUDF(HostUDFWrapper wrapper) { + return new GroupByAggregation(Aggregation.hostUDF(wrapper)); } /** diff --git a/java/src/main/java/ai/rapids/cudf/HostUDFWrapper.java b/java/src/main/java/ai/rapids/cudf/HostUDFWrapper.java new file mode 100644 index 00000000000..0b6ecf2e140 --- /dev/null +++ b/java/src/main/java/ai/rapids/cudf/HostUDFWrapper.java @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ai.rapids.cudf; + +/** + * A wrapper around native host UDF aggregations. + *

+ * This class is used to store the native handle of a host UDF aggregation and is used as + * a proxy object to compute hash code and compare two host UDF aggregations for equality. + *

+ * A new host UDF aggregation implementation must extend this class and override the + * {@code hashCode} and {@code equals} methods for such purposes. + */ +public abstract class HostUDFWrapper { + public final long udfNativeHandle; + + public HostUDFWrapper(long udfNativeHandle) { + this.udfNativeHandle = udfNativeHandle; + } +} From 6acb5d521940742a71d961dc598e6f33bad19714 Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Tue, 17 Dec 2024 21:09:06 -0800 Subject: [PATCH 08/22] Fix spell Signed-off-by: Nghia Truong --- cpp/include/cudf/aggregation.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/include/cudf/aggregation.hpp b/cpp/include/cudf/aggregation.hpp index 35cf9467b11..3d497c7a87f 100644 --- a/cpp/include/cudf/aggregation.hpp +++ b/cpp/include/cudf/aggregation.hpp @@ -615,7 +615,7 @@ std::unique_ptr make_udf_aggregation(udf_type type, * An implementation of host-based UDF needs to be derived from this base class, defining * its own version of the required functions. In particular, the derived class must define the * following function: `get_empty_output`, `operator()`, `do_hash`, `is_equal` and `clone`. - * The function `get_required_data` can also be optionally overriden to facilitate selective + * The function `get_required_data` can also be optionally overridden to facilitate selective * access to the input data as well as intermediate data provided by libcudf. 
*/ struct host_udf_base { From 1a6872829105b7445588039d194cccfc9b3c605b Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Wed, 18 Dec 2024 09:49:05 -0800 Subject: [PATCH 09/22] Rename variable Signed-off-by: Nghia Truong --- cpp/include/cudf/aggregation.hpp | 4 +-- cpp/tests/groupby/host_udf_example_tests.cu | 2 +- cpp/tests/groupby/host_udf_tests.cpp | 30 ++++++++++----------- 3 files changed, 18 insertions(+), 18 deletions(-) diff --git a/cpp/include/cudf/aggregation.hpp b/cpp/include/cudf/aggregation.hpp index 3d497c7a87f..edd43a4a311 100644 --- a/cpp/include/cudf/aggregation.hpp +++ b/cpp/include/cudf/aggregation.hpp @@ -758,7 +758,7 @@ struct host_udf_base { /** * @brief Set of attributes for the input data that is needed for computing the aggregation. */ - using data_attributes_set_t = + using data_attribute_set_t = std::unordered_set; /** @@ -771,7 +771,7 @@ struct host_udf_base { * * @return A set of `data_attribute` */ - [[nodiscard]] virtual data_attributes_set_t get_required_data() const { return {}; } + [[nodiscard]] virtual data_attribute_set_t get_required_data() const { return {}; } /** * @brief Hold all possible types of the data that is passed to the derived class for executing diff --git a/cpp/tests/groupby/host_udf_example_tests.cu b/cpp/tests/groupby/host_udf_example_tests.cu index e03dc03b2e8..5768f1c5e25 100644 --- a/cpp/tests/groupby/host_udf_example_tests.cu +++ b/cpp/tests/groupby/host_udf_example_tests.cu @@ -42,7 +42,7 @@ namespace { struct host_udf_groupby_example : cudf::host_udf_base { host_udf_groupby_example() = default; - [[nodiscard]] data_attributes_set_t get_required_data() const override + [[nodiscard]] data_attribute_set_t get_required_data() const override { // We need grouped values, group offsets, group labels, and also results from groups' // MAX and SUM aggregations. 
diff --git a/cpp/tests/groupby/host_udf_tests.cpp b/cpp/tests/groupby/host_udf_tests.cpp index 320e0786d9a..84b277fcba0 100644 --- a/cpp/tests/groupby/host_udf_tests.cpp +++ b/cpp/tests/groupby/host_udf_tests.cpp @@ -31,16 +31,16 @@ namespace { struct host_udf_test_base : cudf::host_udf_base { int test_location_line; // the location where testing is called bool* test_run; // to check if the test is accidentally skipped - data_attributes_set_t input_attrs; + data_attribute_set_t input_attrs; - host_udf_test_base(int test_location_line_, bool* test_run_, data_attributes_set_t input_attrs_) + host_udf_test_base(int test_location_line_, bool* test_run_, data_attribute_set_t input_attrs_) : test_location_line{test_location_line_}, test_run{test_run_}, input_attrs(std::move(input_attrs_)) { } - [[nodiscard]] data_attributes_set_t get_required_data() const override { return input_attrs; } + [[nodiscard]] data_attribute_set_t get_required_data() const override { return input_attrs; } // This is the main testing function, which checks for the correctness of input data. // The rests are just to satisfy the interface. 
@@ -81,7 +81,7 @@ struct host_udf_test_base : cudf::host_udf_base { struct host_udf_groupby_test : host_udf_test_base { host_udf_groupby_test(int test_location_line_, bool* test_run_, - data_attributes_set_t input_attrs_ = {}) + data_attribute_set_t input_attrs_ = {}) : host_udf_test_base(test_location_line_, test_run_, std::move(input_attrs_)) { } @@ -95,14 +95,14 @@ struct host_udf_groupby_test : host_udf_test_base { rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) const override { - data_attributes_set_t check_attrs = input_attrs; + data_attribute_set_t check_attrs = input_attrs; if (check_attrs.empty()) { - check_attrs = data_attributes_set_t{groupby_data_attribute::INPUT_VALUES, - groupby_data_attribute::GROUPED_VALUES, - groupby_data_attribute::SORTED_GROUPED_VALUES, - groupby_data_attribute::NUM_GROUPS, - groupby_data_attribute::GROUP_OFFSETS, - groupby_data_attribute::GROUP_LABELS}; + check_attrs = data_attribute_set_t{groupby_data_attribute::INPUT_VALUES, + groupby_data_attribute::GROUPED_VALUES, + groupby_data_attribute::SORTED_GROUPED_VALUES, + groupby_data_attribute::NUM_GROUPS, + groupby_data_attribute::GROUP_OFFSETS, + groupby_data_attribute::GROUP_LABELS}; } EXPECT_EQ(input.size(), check_attrs.size()); for (auto const& attr : check_attrs) { @@ -143,8 +143,8 @@ struct host_udf_groupby_test : host_udf_test_base { /** * @brief Get a random subset of input data attributes. 
*/ -cudf::host_udf_base::data_attributes_set_t get_subset( - cudf::host_udf_base::data_attributes_set_t const& attrs) +cudf::host_udf_base::data_attribute_set_t get_subset( + cudf::host_udf_base::data_attribute_set_t const& attrs) { std::random_device rd; std::mt19937 gen(rd()); @@ -153,7 +153,7 @@ cudf::host_udf_base::data_attributes_set_t get_subset( auto const elements = std::vector(attrs.begin(), attrs.end()); std::uniform_int_distribution idx_distr(0, attrs.size() - 1); - cudf::host_udf_base::data_attributes_set_t output; + cudf::host_udf_base::data_attribute_set_t output; while (output.size() < subset_size) { output.insert(elements[idx_distr(gen)]); } @@ -213,7 +213,7 @@ TEST_F(HostUDFTest, GroupbySomeInput) { auto const keys = int32s_col{0, 1, 2}; auto const vals = int32s_col{0, 1, 2}; - auto const all_attrs = cudf::host_udf_base::data_attributes_set_t{ + auto const all_attrs = cudf::host_udf_base::data_attribute_set_t{ cudf::host_udf_base::groupby_data_attribute::INPUT_VALUES, cudf::host_udf_base::groupby_data_attribute::GROUPED_VALUES, cudf::host_udf_base::groupby_data_attribute::SORTED_GROUPED_VALUES, From e51fc98e852759afaa8b6ae0483dbe6db2d00dc6 Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Wed, 18 Dec 2024 11:09:12 -0800 Subject: [PATCH 10/22] Make `do_hash` optional and rewrite expression Signed-off-by: Nghia Truong --- cpp/include/cudf/aggregation.hpp | 17 +++++++++++------ cpp/src/groupby/sort/aggregate.cpp | 21 +++++++++++---------- 2 files changed, 22 insertions(+), 16 deletions(-) diff --git a/cpp/include/cudf/aggregation.hpp b/cpp/include/cudf/aggregation.hpp index edd43a4a311..08865913128 100644 --- a/cpp/include/cudf/aggregation.hpp +++ b/cpp/include/cudf/aggregation.hpp @@ -613,10 +613,12 @@ std::unique_ptr make_udf_aggregation(udf_type type, * @brief The interface for host-based UDF implementation. * * An implementation of host-based UDF needs to be derived from this base class, defining - * its own version of the required functions. 
In particular, the derived class must define the - * following function: `get_empty_output`, `operator()`, `do_hash`, `is_equal` and `clone`. - * The function `get_required_data` can also be optionally overridden to facilitate selective - * access to the input data as well as intermediate data provided by libcudf. + * its own version of the required functions. In particular: + * - The derived class is required to implement `get_empty_output`, `operator()`, `is_equal`, + * and `clone` functions. + * - If necessary, the derived class can also override `do_hash` to compute hashing for its + * instance, and `get_required_data` to selectively access to the input data as well as + * intermediate data provided by libcudf. */ struct host_udf_base { host_udf_base() = default; @@ -821,10 +823,13 @@ struct host_udf_base { rmm::device_async_resource_ref mr) const = 0; /** - * @brief Computes hash value of the derived class's instance. + * @brief Computes hash value of the class's instance. * @return The hash value of the instance */ - [[nodiscard]] virtual std::size_t do_hash() const = 0; + [[nodiscard]] virtual std::size_t do_hash() const + { + return std::hash{}(static_cast(aggregation::Kind::HOST_UDF)); + } /** * @brief Compares two instances of the derived class for equality. 
diff --git a/cpp/src/groupby/sort/aggregate.cpp b/cpp/src/groupby/sort/aggregate.cpp index b2bff29e631..1cb0e2c56ed 100644 --- a/cpp/src/groupby/sort/aggregate.cpp +++ b/cpp/src/groupby/sort/aggregate.cpp @@ -800,16 +800,17 @@ void aggregate_result_functor::operator()(aggregation con { if (cache.has_result(values, agg)) { return; } - auto const& udf_ptr = dynamic_cast(agg).udf_ptr; - auto data_attrs = udf_ptr->get_required_data(); - if (data_attrs.empty()) { // empty means everything - data_attrs = {host_udf_base::groupby_data_attribute::INPUT_VALUES, - host_udf_base::groupby_data_attribute::GROUPED_VALUES, - host_udf_base::groupby_data_attribute::SORTED_GROUPED_VALUES, - host_udf_base::groupby_data_attribute::NUM_GROUPS, - host_udf_base::groupby_data_attribute::GROUP_OFFSETS, - host_udf_base::groupby_data_attribute::GROUP_LABELS}; - } + auto const& udf_ptr = dynamic_cast(agg).udf_ptr; + auto const data_attrs = [&]() -> host_udf_base::data_attribute_set_t { + if (auto tmp = udf_ptr->get_required_data(); !tmp.empty()) { return tmp; } + // Empty attribute set means everything. + return {host_udf_base::groupby_data_attribute::INPUT_VALUES, + host_udf_base::groupby_data_attribute::GROUPED_VALUES, + host_udf_base::groupby_data_attribute::SORTED_GROUPED_VALUES, + host_udf_base::groupby_data_attribute::NUM_GROUPS, + host_udf_base::groupby_data_attribute::GROUP_OFFSETS, + host_udf_base::groupby_data_attribute::GROUP_LABELS}; + }(); // Do not cache udf_input, as the actual input data may change from run to run. 
host_udf_base::input_map_t udf_input; From c5cd36694c7b1cae6a10a1348c5a313f4bcf60ee Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Thu, 19 Dec 2024 10:31:14 -0800 Subject: [PATCH 11/22] Extract `host_udf` header Signed-off-by: Nghia Truong --- cpp/include/cudf/aggregation.hpp | 241 +-------------- cpp/include/cudf/aggregation/host_udf.hpp | 285 ++++++++++++++++++ .../cudf/detail/aggregation/aggregation.hpp | 1 + cpp/src/groupby/groupby.cu | 2 +- cpp/src/groupby/sort/aggregate.cpp | 1 + cpp/tests/groupby/host_udf_example_tests.cu | 1 + cpp/tests/groupby/host_udf_tests.cpp | 1 + 7 files changed, 292 insertions(+), 240 deletions(-) create mode 100644 cpp/include/cudf/aggregation/host_udf.hpp diff --git a/cpp/include/cudf/aggregation.hpp b/cpp/include/cudf/aggregation.hpp index 08865913128..0f7617999db 100644 --- a/cpp/include/cudf/aggregation.hpp +++ b/cpp/include/cudf/aggregation.hpp @@ -609,245 +609,8 @@ std::unique_ptr make_udf_aggregation(udf_type type, std::string const& user_defined_aggregator, data_type output_type); -/** - * @brief The interface for host-based UDF implementation. - * - * An implementation of host-based UDF needs to be derived from this base class, defining - * its own version of the required functions. In particular: - * - The derived class is required to implement `get_empty_output`, `operator()`, `is_equal`, - * and `clone` functions. - * - If necessary, the derived class can also override `do_hash` to compute hashing for its - * instance, and `get_required_data` to selectively access to the input data as well as - * intermediate data provided by libcudf. - */ -struct host_udf_base { - host_udf_base() = default; - virtual ~host_udf_base() = default; - - /** - * @brief Define the possible data needed for groupby aggregations. - * - * Note that only sort-based groupby aggregations are supported. - */ - enum class groupby_data_attribute : int32_t { - INPUT_VALUES, ///< The input values column. 
- GROUPED_VALUES, ///< The input values grouped according to the input `keys` for which the - ///< values within each group maintain their original order. - SORTED_GROUPED_VALUES, ///< The input values grouped according to the input `keys` and - ///< sorted within each group. - NUM_GROUPS, ///< The number of groups (i.e., number of distinct keys). - GROUP_OFFSETS, ///< The offsets separating groups. - GROUP_LABELS ///< Group labels (which is also the same as group indices). - }; - - /** - * @brief Describe possible data that may be needed in the derived class for its operations. - * - * Such data can be either intermediate data such as sorted values or group labels etc, or the - * results of other aggregations. - * - * Each derived host-based UDF class may need a different set of data. It is inefficient to - * evaluate and pass down all these possible data at once from libcudf. A solution for that is, - * the derived class can define a subset of data that it needs and libcudf will evaluate - * and pass down only data requested from that set. - */ - struct data_attribute { - /** - * @brief Hold all possible data types for the input of the aggregation in the derived class. - */ - using value_type = std::variant>; - value_type value; ///< The actual data attribute, wrapped by this struct - ///< as a wrapper is needed to define `hash` and `equal_to` functors. - - data_attribute() = default; ///< Default constructor - data_attribute(data_attribute&&) = default; ///< Move constructor - - /** - * @brief Construct a new data attribute from an aggregation attribute. - * @param value_ An aggregation attribute - */ - template )> - data_attribute(T value_) : value{value_} - { - } - - /** - * @brief Construct a new data attribute from another aggregation request. 
- * @param value_ An aggregation request - */ - template || - std::is_same_v)> - data_attribute(std::unique_ptr value_) : value{std::move(value_)} - { - if constexpr (std::is_same_v) { - CUDF_EXPECTS( - dynamic_cast(std::get>(value).get()) != nullptr, - "Requesting results from other aggregations is only supported in groupby " - "aggregations."); - } - CUDF_EXPECTS(std::get>(value) != nullptr, - "Invalid aggregation request."); - } - - /** - * @brief Copy constructor. - * @param other The other data attribute to copy from - */ - data_attribute(data_attribute const& other) - : value{std::visit( - cudf::detail::visitor_overload{ - [](auto const& val) { return value_type{val}; }, - [](std::unique_ptr const& val) { return value_type{val->clone()}; }}, - other.value)} - { - } - - /** - * @brief Hash functor for `data_attribute`. - */ - struct hash { - /** - * @brief Compute the hash value of a data attribute. - * @param attr The data attribute to hash - * @return The hash value of the data attribute - */ - std::size_t operator()(data_attribute const& attr) const - { - auto const& value = attr.value; - auto const hash_value = - std::visit(cudf::detail::visitor_overload{ - [](auto const& val) { return std::hash{}(static_cast(val)); }, - [](std::unique_ptr const& val) { return val->do_hash(); }}, - value); - return std::hash{}(value.index()) ^ hash_value; - } - }; // struct hash - - /** - * @brief Equality comparison functor for `data_attribute`. - */ - struct equal_to { - /** - * @brief Check if two data attributes are equal. 
- * @param lhs The left-hand side data attribute - * @param rhs The right-hand side data attribute - * @return True if the two data attributes are equal - */ - bool operator()(data_attribute const& lhs, data_attribute const& rhs) const - { - auto const& lhs_val = lhs.value; - auto const& rhs_val = rhs.value; - if (lhs_val.index() != rhs_val.index()) { return false; } - return std::visit(cudf::detail::visitor_overload{ - [](auto const& lhs_val, auto const& rhs_val) { - if constexpr (std::is_same_v) { - return lhs_val == rhs_val; - } else { - return false; - } - }, - [](std::unique_ptr const& lhs_val, - std::unique_ptr const& rhs_val) { - return lhs_val->is_equal(*rhs_val); - }}, - lhs_val, - rhs_val); - } - }; // struct equal_to - }; // struct data_attribute - - /** - * @brief Set of attributes for the input data that is needed for computing the aggregation. - */ - using data_attribute_set_t = - std::unordered_set; - - /** - * @brief Return a set of attributes for the data that is needed for computing the aggregation. - * - * The derived class should return the attributes corresponding to only the data that it needs to - * avoid unnecessary computation performed in libcudf. If this function is not overridden, an - * empty set is returned. That means all the data attributes (except results from other - * aggregations in groupby) will be needed. - * - * @return A set of `data_attribute` - */ - [[nodiscard]] virtual data_attribute_set_t get_required_data() const { return {}; } - - /** - * @brief Hold all possible types of the data that is passed to the derived class for executing - * the aggregation. - */ - using input_data_t = std::variant>; - - /** - * @brief Input to the aggregation, mapping from each data attribute to its actual data. - */ - using input_map_t = std:: - unordered_map; - - /** - * @brief Output type of the aggregation. 
- * - * Currently only a single type is supported as the output of the aggregation, but it will hold - * more type in the future when reduction is supported. - */ - using output_t = std::variant>; - - /** - * @brief Get the output when the input values column is empty. - * - * This is called in libcudf when the input values column is empty. In such situations libcudf - * tries to generate the output directly without unnecessarily evaluating the intermediate data. - * - * @param output_dtype The expected output data type - * @param stream The CUDA stream to use for any kernel launches - * @param mr Device memory resource to use for any allocations - * @return The output result of the aggregation when input values is empty - */ - [[nodiscard]] virtual output_t get_empty_output(std::optional output_dtype, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) const = 0; - - /** - * @brief Perform the main computation for the host-based UDF. - * - * @param input The input data needed for performing all computation - * @param stream The CUDA stream to use for any kernel launches - * @param mr Device memory resource to use for any allocations - * @return The output result of the aggregation - */ - [[nodiscard]] virtual output_t operator()(input_map_t const& input, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) const = 0; - - /** - * @brief Computes hash value of the class's instance. - * @return The hash value of the instance - */ - [[nodiscard]] virtual std::size_t do_hash() const - { - return std::hash{}(static_cast(aggregation::Kind::HOST_UDF)); - } - - /** - * @brief Compares two instances of the derived class for equality. - * @param other The other derived class's instance to compare with - * @return True if the two instances are equal - */ - [[nodiscard]] virtual bool is_equal(host_udf_base const& other) const = 0; - - /** - * @brief Clones the instance. 
- * - * A class derived from `host_udf_base` should not store too much data such that its instances - * remain lightweight for efficient cloning. - * - * @return A new instance cloned from this - */ - [[nodiscard]] virtual std::unique_ptr clone() const = 0; -}; +// Forward declaration of `host_udf_base`. +struct host_udf_base; /** * @brief Factory to create a HOST_UDF aggregation diff --git a/cpp/include/cudf/aggregation/host_udf.hpp b/cpp/include/cudf/aggregation/host_udf.hpp new file mode 100644 index 00000000000..34c11b74c3a --- /dev/null +++ b/cpp/include/cudf/aggregation/host_udf.hpp @@ -0,0 +1,285 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include + +#include +#include +#include +#include + +/** + * @file host_udf.hpp + * @brief Declare the base class for host-side user-defined function (`HOST_UDF`) and example of + * `HOST_UDF` aggregation usage. + */ + +namespace CUDF_EXPORT cudf { +/** + * @addtogroup aggregation_factories + * @{ + */ + +/** + * @brief The interface for host-based UDF implementation. + * + * An implementation of host-based UDF needs to be derived from this base class, defining + * its own version of the required functions. In particular: + * - The derived class is required to implement `get_empty_output`, `operator()`, `is_equal`, + * and `clone` functions. 
+ * - If necessary, the derived class can also override `do_hash` to compute hashing for its + * instance, and `get_required_data` to selectively access to the input data as well as + * intermediate data provided by libcudf. + */ +struct host_udf_base { + host_udf_base() = default; + virtual ~host_udf_base() = default; + + /** + * @brief Define the possible data needed for groupby aggregations. + * + * Note that only sort-based groupby aggregations are supported. + */ + enum class groupby_data_attribute : int32_t { + INPUT_VALUES, ///< The input values column. + GROUPED_VALUES, ///< The input values grouped according to the input `keys` for which the + ///< values within each group maintain their original order. + SORTED_GROUPED_VALUES, ///< The input values grouped according to the input `keys` and + ///< sorted within each group. + NUM_GROUPS, ///< The number of groups (i.e., number of distinct keys). + GROUP_OFFSETS, ///< The offsets separating groups. + GROUP_LABELS ///< Group labels (which is also the same as group indices). + }; + + /** + * @brief Describe possible data that may be needed in the derived class for its operations. + * + * Such data can be either intermediate data such as sorted values or group labels etc, or the + * results of other aggregations. + * + * Each derived host-based UDF class may need a different set of data. It is inefficient to + * evaluate and pass down all these possible data at once from libcudf. A solution for that is, + * the derived class can define a subset of data that it needs and libcudf will evaluate + * and pass down only data requested from that set. + */ + struct data_attribute { + /** + * @brief Hold all possible data types for the input of the aggregation in the derived class. + */ + using value_type = std::variant>; + value_type value; ///< The actual data attribute, wrapped by this struct + ///< as a wrapper is needed to define `hash` and `equal_to` functors. 
+ + data_attribute() = default; ///< Default constructor + data_attribute(data_attribute&&) = default; ///< Move constructor + + /** + * @brief Construct a new data attribute from an aggregation attribute. + * @param value_ An aggregation attribute + */ + template )> + data_attribute(T value_) : value{value_} + { + } + + /** + * @brief Construct a new data attribute from another aggregation request. + * @param value_ An aggregation request + */ + template || + std::is_same_v)> + data_attribute(std::unique_ptr value_) : value{std::move(value_)} + { + if constexpr (std::is_same_v) { + CUDF_EXPECTS( + dynamic_cast(std::get>(value).get()) != nullptr, + "Requesting results from other aggregations is only supported in groupby " + "aggregations."); + } + CUDF_EXPECTS(std::get>(value) != nullptr, + "Invalid aggregation request."); + } + + /** + * @brief Copy constructor. + * @param other The other data attribute to copy from + */ + data_attribute(data_attribute const& other) + : value{std::visit( + cudf::detail::visitor_overload{ + [](auto const& val) { return value_type{val}; }, + [](std::unique_ptr const& val) { return value_type{val->clone()}; }}, + other.value)} + { + } + + /** + * @brief Hash functor for `data_attribute`. + */ + struct hash { + /** + * @brief Compute the hash value of a data attribute. + * @param attr The data attribute to hash + * @return The hash value of the data attribute + */ + std::size_t operator()(data_attribute const& attr) const + { + auto const hash_value = + std::visit(cudf::detail::visitor_overload{ + [](auto const& val) { return std::hash{}(static_cast(val)); }, + [](std::unique_ptr const& val) { return val->do_hash(); }}, + attr.value); + return std::hash{}(attr.value.index()) ^ hash_value; + } + }; // struct hash + + /** + * @brief Equality comparison functor for `data_attribute`. + */ + struct equal_to { + /** + * @brief Check if two data attributes are equal. 
+ * @param lhs The left-hand side data attribute + * @param rhs The right-hand side data attribute + * @return True if the two data attributes are equal + */ + bool operator()(data_attribute const& lhs, data_attribute const& rhs) const + { + auto const& lhs_val = lhs.value; + auto const& rhs_val = rhs.value; + if (lhs_val.index() != rhs_val.index()) { return false; } + return std::visit(cudf::detail::visitor_overload{ + [](auto const& lhs_val, auto const& rhs_val) { + if constexpr (std::is_same_v) { + return lhs_val == rhs_val; + } else { + return false; + } + }, + [](std::unique_ptr const& lhs_val, + std::unique_ptr const& rhs_val) { + return lhs_val->is_equal(*rhs_val); + }}, + lhs_val, + rhs_val); + } + }; // struct equal_to + }; // struct data_attribute + + /** + * @brief Set of attributes for the input data that is needed for computing the aggregation. + */ + using data_attribute_set_t = + std::unordered_set; + + /** + * @brief Return a set of attributes for the data that is needed for computing the aggregation. + * + * The derived class should return the attributes corresponding to only the data that it needs to + * avoid unnecessary computation performed in libcudf. If this function is not overridden, an + * empty set is returned. That means all the data attributes (except results from other + * aggregations in groupby) will be needed. + * + * @return A set of `data_attribute` + */ + [[nodiscard]] virtual data_attribute_set_t get_required_data() const { return {}; } + + /** + * @brief Hold all possible types of the data that is passed to the derived class for executing + * the aggregation. + */ + using input_data_t = std::variant>; + + /** + * @brief Input to the aggregation, mapping from each data attribute to its actual data. + */ + using input_map_t = std:: + unordered_map; + + /** + * @brief Output type of the aggregation. 
+ * + * Currently only a single type is supported as the output of the aggregation, but it will hold + * more type in the future when reduction is supported. + */ + using output_t = std::variant>; + + /** + * @brief Get the output when the input values column is empty. + * + * This is called in libcudf when the input values column is empty. In such situations libcudf + * tries to generate the output directly without unnecessarily evaluating the intermediate data. + * + * @param output_dtype The expected output data type + * @param stream The CUDA stream to use for any kernel launches + * @param mr Device memory resource to use for any allocations + * @return The output result of the aggregation when input values is empty + */ + [[nodiscard]] virtual output_t get_empty_output(std::optional output_dtype, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) const = 0; + + /** + * @brief Perform the main computation for the host-based UDF. + * + * @param input The input data needed for performing all computation + * @param stream The CUDA stream to use for any kernel launches + * @param mr Device memory resource to use for any allocations + * @return The output result of the aggregation + */ + [[nodiscard]] virtual output_t operator()(input_map_t const& input, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) const = 0; + + /** + * @brief Computes hash value of the class's instance. + * @return The hash value of the instance + */ + [[nodiscard]] virtual std::size_t do_hash() const + { + return std::hash{}(static_cast(aggregation::Kind::HOST_UDF)); + } + + /** + * @brief Compares two instances of the derived class for equality. + * @param other The other derived class's instance to compare with + * @return True if the two instances are equal + */ + [[nodiscard]] virtual bool is_equal(host_udf_base const& other) const = 0; + + /** + * @brief Clones the instance. 
+ * + * A class derived from `host_udf_base` should not store too much data such that its instances + * remain lightweight for efficient cloning. + * + * @return A new instance cloned from this + */ + [[nodiscard]] virtual std::unique_ptr clone() const = 0; +}; + +/** @} */ // end of group +} // namespace CUDF_EXPORT cudf diff --git a/cpp/include/cudf/detail/aggregation/aggregation.hpp b/cpp/include/cudf/detail/aggregation/aggregation.hpp index df8f1a395d7..f6ac0adf5f7 100644 --- a/cpp/include/cudf/detail/aggregation/aggregation.hpp +++ b/cpp/include/cudf/detail/aggregation/aggregation.hpp @@ -17,6 +17,7 @@ #pragma once #include +#include #include #include #include diff --git a/cpp/src/groupby/groupby.cu b/cpp/src/groupby/groupby.cu index 104cbe110db..4c90cd0eef5 100644 --- a/cpp/src/groupby/groupby.cu +++ b/cpp/src/groupby/groupby.cu @@ -14,6 +14,7 @@ * limitations under the License. */ +#include #include #include #include @@ -32,7 +33,6 @@ #include #include #include -#include #include #include #include diff --git a/cpp/src/groupby/sort/aggregate.cpp b/cpp/src/groupby/sort/aggregate.cpp index 1cb0e2c56ed..e9f885a5917 100644 --- a/cpp/src/groupby/sort/aggregate.cpp +++ b/cpp/src/groupby/sort/aggregate.cpp @@ -19,6 +19,7 @@ #include "groupby/sort/group_reductions.hpp" #include +#include #include #include #include diff --git a/cpp/tests/groupby/host_udf_example_tests.cu b/cpp/tests/groupby/host_udf_example_tests.cu index 5768f1c5e25..13b6c611ca4 100644 --- a/cpp/tests/groupby/host_udf_example_tests.cu +++ b/cpp/tests/groupby/host_udf_example_tests.cu @@ -17,6 +17,7 @@ #include #include +#include #include #include #include diff --git a/cpp/tests/groupby/host_udf_tests.cpp b/cpp/tests/groupby/host_udf_tests.cpp index 84b277fcba0..1a0f68c0c6c 100644 --- a/cpp/tests/groupby/host_udf_tests.cpp +++ b/cpp/tests/groupby/host_udf_tests.cpp @@ -17,6 +17,7 @@ #include #include +#include #include #include #include From 8229e5976e8560b99c316f859007a9878b4e9e35 Mon Sep 17 00:00:00 
2001 From: Nghia Truong Date: Thu, 19 Dec 2024 11:02:04 -0800 Subject: [PATCH 12/22] Add example into doxygen Signed-off-by: Nghia Truong --- cpp/include/cudf/aggregation/host_udf.hpp | 47 ++++++++++++++++++++++- 1 file changed, 45 insertions(+), 2 deletions(-) diff --git a/cpp/include/cudf/aggregation/host_udf.hpp b/cpp/include/cudf/aggregation/host_udf.hpp index 34c11b74c3a..128c02f2627 100644 --- a/cpp/include/cudf/aggregation/host_udf.hpp +++ b/cpp/include/cudf/aggregation/host_udf.hpp @@ -24,6 +24,7 @@ #include #include +#include #include #include @@ -33,7 +34,7 @@ /** * @file host_udf.hpp * @brief Declare the base class for host-side user-defined function (`HOST_UDF`) and example of - * `HOST_UDF` aggregation usage. + * subclass implementation. */ namespace CUDF_EXPORT cudf { @@ -52,6 +53,48 @@ namespace CUDF_EXPORT cudf { * - If necessary, the derived class can also override `do_hash` to compute hashing for its * instance, and `get_required_data` to selectively access to the input data as well as * intermediate data provided by libcudf. + * + * Example of such implementation: + * @code{.cpp} + * struct my_udf_aggregation : cudf::host_udf_base { + * my_udf_aggregation() = default; + * + * // This UDF aggregation needs `GROUPED_VALUES` and `GROUP_OFFSETS`, + * // and the result from groupby `MAX` aggregation. + * [[nodiscard]] data_attribute_set_t get_required_data() const override { + * return {groupby_data_attribute::GROUPED_VALUES, + * groupby_data_attribute::GROUP_OFFSETS, + * cudf::make_max_aggregation()}; + * } + * + * [[nodiscard]] output_t get_empty_output( + * [[maybe_unused]] std::optional output_dtype, + * [[maybe_unused]] rmm::cuda_stream_view stream, + * [[maybe_unused]] rmm::device_async_resource_ref mr) const override + * { + * // This UDF aggregation always returns a column of type INT32. 
+ * return cudf::make_empty_column(cudf::data_type{cudf::type_id::INT32}); + * } + * + * [[nodiscard]] output_t operator()(input_map_t const& input, + * rmm::cuda_stream_view stream, + * rmm::device_async_resource_ref mr) const override + * { + * // Perform UDF computation using the input data and return the result. + * } + * + * [[nodiscard]] bool is_equal(host_udf_base const& other) const override + * { + * // Check if the other object is also instance of this class. + * return dynamic_cast(&other) != nullptr; + * } + * + * [[nodiscard]] std::unique_ptr clone() const override + * { + * return std::make_unique(); + * } + * }; + * @endcode */ struct host_udf_base { host_udf_base() = default; @@ -187,7 +230,7 @@ struct host_udf_base { rhs_val); } }; // struct equal_to - }; // struct data_attribute + }; // struct data_attribute /** * @brief Set of attributes for the input data that is needed for computing the aggregation. From c71452a7c47567cfe271c898337456ac3bdc5d44 Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Thu, 19 Dec 2024 11:05:50 -0800 Subject: [PATCH 13/22] Cleanup headers and rewrite comments Signed-off-by: Nghia Truong --- cpp/include/cudf/aggregation.hpp | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/cpp/include/cudf/aggregation.hpp b/cpp/include/cudf/aggregation.hpp index 0f7617999db..a1b7db5e08a 100644 --- a/cpp/include/cudf/aggregation.hpp +++ b/cpp/include/cudf/aggregation.hpp @@ -16,20 +16,11 @@ #pragma once -#include #include #include -#include -#include - -#include #include #include -#include -#include -#include -#include #include /** @@ -609,11 +600,11 @@ std::unique_ptr make_udf_aggregation(udf_type type, std::string const& user_defined_aggregator, data_type output_type); -// Forward declaration of `host_udf_base`. +// Forward declaration of `host_udf_base` for the factory function of `HOST_UDF` aggregation. 
struct host_udf_base; /** - * @brief Factory to create a HOST_UDF aggregation + * @brief Factory to create a HOST_UDF aggregation. * * @param host_udf An instance of a class derived from `host_udf_base` to perform aggregation * @return A HOST_UDF aggregation object From 4b854b05bbff88c87b144668d9aa5f2e3670d18c Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Thu, 19 Dec 2024 11:47:33 -0800 Subject: [PATCH 14/22] Remove `host_udf.hpp` dependency from `detail/aggregation/aggregation.hpp` Signed-off-by: Nghia Truong --- cpp/CMakeLists.txt | 3 +- .../cudf/detail/aggregation/aggregation.hpp | 29 +++------ cpp/src/aggregation/aggregation.cpp | 10 ---- cpp/src/groupby/sort/host_udf_aggregation.cpp | 60 +++++++++++++++++++ 4 files changed, 70 insertions(+), 32 deletions(-) create mode 100644 cpp/src/groupby/sort/host_udf_aggregation.cpp diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 9cbacee8e8d..8c6cd922747 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -446,7 +446,6 @@ add_library( src/groupby/sort/group_quantiles.cu src/groupby/sort/group_std.cu src/groupby/sort/group_sum.cu - src/groupby/sort/scan.cpp src/groupby/sort/group_count_scan.cu src/groupby/sort/group_max_scan.cu src/groupby/sort/group_min_scan.cu @@ -454,6 +453,8 @@ add_library( src/groupby/sort/group_rank_scan.cu src/groupby/sort/group_replace_nulls.cu src/groupby/sort/group_sum_scan.cu + src/groupby/sort/host_udf_aggregation.cpp + src/groupby/sort/scan.cpp src/groupby/sort/sort_helper.cu src/hash/md5_hash.cu src/hash/murmurhash3_x86_32.cu diff --git a/cpp/include/cudf/detail/aggregation/aggregation.hpp b/cpp/include/cudf/detail/aggregation/aggregation.hpp index f6ac0adf5f7..d873e93bd20 100644 --- a/cpp/include/cudf/detail/aggregation/aggregation.hpp +++ b/cpp/include/cudf/detail/aggregation/aggregation.hpp @@ -17,7 +17,6 @@ #pragma once #include -#include #include #include #include @@ -966,7 +965,7 @@ class udf_aggregation final : public rolling_aggregation { }; /** - * @brief 
Derived class for specifying a custom aggregation specified in host-based UDF. + * @brief Derived class for specifying host-based UDF aggregation. */ class host_udf_aggregation final : public groupby_aggregation { public: @@ -975,28 +974,16 @@ class host_udf_aggregation final : public groupby_aggregation { host_udf_aggregation() = delete; host_udf_aggregation(host_udf_aggregation const&) = delete; - explicit host_udf_aggregation(std::unique_ptr udf_ptr_) - : aggregation{HOST_UDF}, udf_ptr{std::move(udf_ptr_)} - { - CUDF_EXPECTS(udf_ptr != nullptr, "Invalid host-based UDF instance."); - } + // Need to define the constructor and destructor in a separate source file where we have the + // complete declaration of `host_udf_base`. + explicit host_udf_aggregation(std::unique_ptr udf_ptr_); + ~host_udf_aggregation() override; - [[nodiscard]] bool is_equal(aggregation const& _other) const override - { - if (!this->aggregation::is_equal(_other)) { return false; } - auto const& other = dynamic_cast(_other); - return udf_ptr->is_equal(*other.udf_ptr); - } + [[nodiscard]] bool is_equal(aggregation const& _other) const override; - [[nodiscard]] size_t do_hash() const override - { - return this->aggregation::do_hash() ^ udf_ptr->do_hash(); - } + [[nodiscard]] size_t do_hash() const override; - [[nodiscard]] std::unique_ptr clone() const override - { - return std::make_unique(udf_ptr->clone()); - } + [[nodiscard]] std::unique_ptr clone() const override; std::vector> get_simple_aggregations( data_type col_type, simple_aggregations_collector& collector) const override diff --git a/cpp/src/aggregation/aggregation.cpp b/cpp/src/aggregation/aggregation.cpp index bbccd40f24e..0d4400b891b 100644 --- a/cpp/src/aggregation/aggregation.cpp +++ b/cpp/src/aggregation/aggregation.cpp @@ -928,16 +928,6 @@ make_merge_tdigest_aggregation(int max_centroids); template CUDF_EXPORT std::unique_ptr make_merge_tdigest_aggregation(int max_centroids); -template -std::unique_ptr 
make_host_udf_aggregation(std::unique_ptr udf_ptr_) -{ - return std::make_unique(std::move(udf_ptr_)); -} -template CUDF_EXPORT std::unique_ptr make_host_udf_aggregation( - std::unique_ptr); -template CUDF_EXPORT std::unique_ptr - make_host_udf_aggregation(std::unique_ptr); - namespace detail { namespace { struct target_type_functor { diff --git a/cpp/src/groupby/sort/host_udf_aggregation.cpp b/cpp/src/groupby/sort/host_udf_aggregation.cpp new file mode 100644 index 00000000000..7d0f37646b9 --- /dev/null +++ b/cpp/src/groupby/sort/host_udf_aggregation.cpp @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include +#include + +namespace cudf { +namespace detail { + +host_udf_aggregation::host_udf_aggregation(std::unique_ptr udf_ptr_) + : aggregation{HOST_UDF}, udf_ptr{std::move(udf_ptr_)} +{ + CUDF_EXPECTS(udf_ptr != nullptr, "Invalid host_udf_base instance."); +} + +host_udf_aggregation::~host_udf_aggregation() = default; + +bool host_udf_aggregation::is_equal(aggregation const& _other) const +{ + if (!this->aggregation::is_equal(_other)) { return false; } + auto const& other = dynamic_cast(_other); + return udf_ptr->is_equal(*other.udf_ptr); +} + +size_t host_udf_aggregation::do_hash() const +{ + return this->aggregation::do_hash() ^ udf_ptr->do_hash(); +} + +std::unique_ptr host_udf_aggregation::clone() const +{ + return std::make_unique(udf_ptr->clone()); +} + +} // namespace detail + +template +std::unique_ptr make_host_udf_aggregation(std::unique_ptr udf_ptr_) +{ + return std::make_unique(std::move(udf_ptr_)); +} +template CUDF_EXPORT std::unique_ptr make_host_udf_aggregation( + std::unique_ptr); +template CUDF_EXPORT std::unique_ptr + make_host_udf_aggregation(std::unique_ptr); + +} // namespace cudf From 757d3eb2ff37d8e7c5ceff36525bebe3e0926530 Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Thu, 19 Dec 2024 12:17:04 -0800 Subject: [PATCH 15/22] Extract more code to source file Signed-off-by: Nghia Truong --- cpp/include/cudf/aggregation/host_udf.hpp | 45 +++---------------- cpp/src/groupby/sort/host_udf_aggregation.cpp | 43 ++++++++++++++++++ 2 files changed, 48 insertions(+), 40 deletions(-) diff --git a/cpp/include/cudf/aggregation/host_udf.hpp b/cpp/include/cudf/aggregation/host_udf.hpp index 128c02f2627..53a2171fa54 100644 --- a/cpp/include/cudf/aggregation/host_udf.hpp +++ b/cpp/include/cudf/aggregation/host_udf.hpp @@ -17,7 +17,6 @@ #pragma once #include -#include #include #include #include @@ -156,28 +155,21 @@ struct host_udf_base { std::is_same_v)> data_attribute(std::unique_ptr value_) : value{std::move(value_)} { + 
CUDF_EXPECTS(std::get>(value) != nullptr, + "Invalid aggregation request."); if constexpr (std::is_same_v) { CUDF_EXPECTS( dynamic_cast(std::get>(value).get()) != nullptr, "Requesting results from other aggregations is only supported in groupby " "aggregations."); } - CUDF_EXPECTS(std::get>(value) != nullptr, - "Invalid aggregation request."); } /** * @brief Copy constructor. * @param other The other data attribute to copy from */ - data_attribute(data_attribute const& other) - : value{std::visit( - cudf::detail::visitor_overload{ - [](auto const& val) { return value_type{val}; }, - [](std::unique_ptr const& val) { return value_type{val->clone()}; }}, - other.value)} - { - } + data_attribute(data_attribute const& other); /** * @brief Hash functor for `data_attribute`. @@ -188,15 +180,7 @@ struct host_udf_base { * @param attr The data attribute to hash * @return The hash value of the data attribute */ - std::size_t operator()(data_attribute const& attr) const - { - auto const hash_value = - std::visit(cudf::detail::visitor_overload{ - [](auto const& val) { return std::hash{}(static_cast(val)); }, - [](std::unique_ptr const& val) { return val->do_hash(); }}, - attr.value); - return std::hash{}(attr.value.index()) ^ hash_value; - } + std::size_t operator()(data_attribute const& attr) const; }; // struct hash /** @@ -209,26 +193,7 @@ struct host_udf_base { * @param rhs The right-hand side data attribute * @return True if the two data attributes are equal */ - bool operator()(data_attribute const& lhs, data_attribute const& rhs) const - { - auto const& lhs_val = lhs.value; - auto const& rhs_val = rhs.value; - if (lhs_val.index() != rhs_val.index()) { return false; } - return std::visit(cudf::detail::visitor_overload{ - [](auto const& lhs_val, auto const& rhs_val) { - if constexpr (std::is_same_v) { - return lhs_val == rhs_val; - } else { - return false; - } - }, - [](std::unique_ptr const& lhs_val, - std::unique_ptr const& rhs_val) { - return 
lhs_val->is_equal(*rhs_val); - }}, - lhs_val, - rhs_val); - } + bool operator()(data_attribute const& lhs, data_attribute const& rhs) const; }; // struct equal_to }; // struct data_attribute diff --git a/cpp/src/groupby/sort/host_udf_aggregation.cpp b/cpp/src/groupby/sort/host_udf_aggregation.cpp index 7d0f37646b9..0da47e17f48 100644 --- a/cpp/src/groupby/sort/host_udf_aggregation.cpp +++ b/cpp/src/groupby/sort/host_udf_aggregation.cpp @@ -16,8 +16,51 @@ #include #include +#include namespace cudf { + +host_udf_base::data_attribute::data_attribute(data_attribute const& other) + : value{std::visit(cudf::detail::visitor_overload{[](auto const& val) { return value_type{val}; }, + [](std::unique_ptr const& val) { + return value_type{val->clone()}; + }}, + other.value)} +{ +} + +std::size_t host_udf_base::data_attribute::hash::operator()(data_attribute const& attr) const +{ + auto const hash_value = + std::visit(cudf::detail::visitor_overload{ + [](auto const& val) { return std::hash{}(static_cast(val)); }, + [](std::unique_ptr const& val) { return val->do_hash(); }}, + attr.value); + return std::hash{}(attr.value.index()) ^ hash_value; +} + +bool host_udf_base::data_attribute::equal_to::operator()(data_attribute const& lhs, + data_attribute const& rhs) const +{ + auto const& lhs_val = lhs.value; + auto const& rhs_val = rhs.value; + if (lhs_val.index() != rhs_val.index()) { return false; } + return std::visit( + cudf::detail::visitor_overload{ + [](auto const& lhs_val, auto const& rhs_val) { + if constexpr (std::is_same_v) { + return lhs_val == rhs_val; + } else { + return false; + } + }, + [](std::unique_ptr const& lhs_val, std::unique_ptr const& rhs_val) { + return lhs_val->is_equal(*rhs_val); + }}, + lhs_val, + rhs_val); +} + namespace detail { host_udf_aggregation::host_udf_aggregation(std::unique_ptr udf_ptr_) From 4a70a379f65583c681349fbac820dc2355426e88 Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Thu, 19 Dec 2024 12:23:44 -0800 Subject: [PATCH 16/22] Use 
`make_empty_lists_column` Signed-off-by: Nghia Truong --- cpp/src/groupby/groupby.cu | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/groupby/groupby.cu b/cpp/src/groupby/groupby.cu index 4c90cd0eef5..07fb65f4bee 100644 --- a/cpp/src/groupby/groupby.cu +++ b/cpp/src/groupby/groupby.cu @@ -29,6 +29,7 @@ #include #include #include +#include #include #include #include @@ -109,8 +110,7 @@ struct empty_column_constructor { using namespace cudf::detail; if constexpr (k == aggregation::Kind::COLLECT_LIST || k == aggregation::Kind::COLLECT_SET) { - return make_lists_column( - 0, make_empty_column(type_to_id()), empty_like(values), 0, {}, stream, mr); + return cudf::lists::detail::make_empty_lists_column(values.type(), stream, mr); } if constexpr (k == aggregation::Kind::HISTOGRAM) { From cce6e0a1c772a242222d493ffb46761f49f3bcbe Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Thu, 19 Dec 2024 12:24:18 -0800 Subject: [PATCH 17/22] Revert "Use `make_empty_lists_column`" This reverts commit 4a70a379f65583c681349fbac820dc2355426e88. 
--- cpp/src/groupby/groupby.cu | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/groupby/groupby.cu b/cpp/src/groupby/groupby.cu index 07fb65f4bee..4c90cd0eef5 100644 --- a/cpp/src/groupby/groupby.cu +++ b/cpp/src/groupby/groupby.cu @@ -29,7 +29,6 @@ #include #include #include -#include #include #include #include @@ -110,7 +109,8 @@ struct empty_column_constructor { using namespace cudf::detail; if constexpr (k == aggregation::Kind::COLLECT_LIST || k == aggregation::Kind::COLLECT_SET) { - return cudf::lists::detail::make_empty_lists_column(values.type(), stream, mr); + return make_lists_column( + 0, make_empty_column(type_to_id()), empty_like(values), 0, {}, stream, mr); } if constexpr (k == aggregation::Kind::HISTOGRAM) { From 322fb2543cf95473e5a4085b0d8e457921258f47 Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Thu, 19 Dec 2024 14:04:15 -0800 Subject: [PATCH 18/22] Reformat Signed-off-by: Nghia Truong --- cpp/include/cudf/aggregation/host_udf.hpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cpp/include/cudf/aggregation/host_udf.hpp b/cpp/include/cudf/aggregation/host_udf.hpp index 53a2171fa54..bbce76dc5f3 100644 --- a/cpp/include/cudf/aggregation/host_udf.hpp +++ b/cpp/include/cudf/aggregation/host_udf.hpp @@ -60,7 +60,8 @@ namespace CUDF_EXPORT cudf { * * // This UDF aggregation needs `GROUPED_VALUES` and `GROUP_OFFSETS`, * // and the result from groupby `MAX` aggregation. 
- * [[nodiscard]] data_attribute_set_t get_required_data() const override { + * [[nodiscard]] data_attribute_set_t get_required_data() const override + * { * return {groupby_data_attribute::GROUPED_VALUES, * groupby_data_attribute::GROUP_OFFSETS, * cudf::make_max_aggregation()}; From 7e1644091322dc2a87c26e28e37fc4ccd9faa4fd Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Thu, 19 Dec 2024 14:32:33 -0800 Subject: [PATCH 19/22] Avoid using anything from `cudf::detail::` in the example Signed-off-by: Nghia Truong --- cpp/tests/groupby/host_udf_example_tests.cu | 40 ++++++++++++--------- 1 file changed, 23 insertions(+), 17 deletions(-) diff --git a/cpp/tests/groupby/host_udf_example_tests.cu b/cpp/tests/groupby/host_udf_example_tests.cu index 13b6c611ca4..0ec424d1cc1 100644 --- a/cpp/tests/groupby/host_udf_example_tests.cu +++ b/cpp/tests/groupby/host_udf_example_tests.cu @@ -20,8 +20,8 @@ #include #include #include +#include #include -#include #include #include #include @@ -96,8 +96,6 @@ struct host_udf_groupby_example : cudf::host_udf_base { // For simplicity, this example only accepts double input and always produces double output. using InputType = double; using OutputType = double; - using MaxType = cudf::detail::target_type_t; - using SumType = cudf::detail::target_type_t; template )> output_t operator()(Args...) const @@ -131,36 +129,44 @@ struct host_udf_groupby_example : cudf::host_udf_base { cudf::mask_state::UNALLOCATED, stream, mr); - rmm::device_uvector validity(num_groups, stream); + + // Store row index if it is valid, otherwise store a negative value denoting a null row. 
+ rmm::device_uvector valid_idx(num_groups, stream); thrust::transform( rmm::exec_policy(stream), thrust::make_counting_iterator(0), thrust::make_counting_iterator(num_groups), - thrust::make_zip_iterator(output->mutable_view().begin(), validity.begin()), + thrust::make_zip_iterator(output->mutable_view().begin(), valid_idx.begin()), transform_fn{*values_dv_ptr, offsets, group_indices, - group_max.begin(), - group_sum.begin()}); - auto [null_mask, null_count] = - cudf::detail::valid_if(validity.begin(), validity.end(), thrust::identity<>{}, stream, mr); - if (null_count > 0) { output->set_null_mask(std::move(null_mask), null_count); } - return output; + group_max.begin(), + group_sum.begin()}); + + auto const valid_idx_cv = cudf::column_view{ + cudf::data_type{cudf::type_id::INT32}, num_groups, valid_idx.begin(), nullptr, 0}; + return std::move(cudf::gather(cudf::table_view{{output->view()}}, + valid_idx_cv, + cudf::out_of_bounds_policy::NULLIFY, + stream, + mr) + ->release() + .front()); } struct transform_fn { cudf::column_device_view values; cudf::device_span offsets; cudf::device_span group_indices; - MaxType const* group_max; - SumType const* group_sum; + InputType const* group_max; + InputType const* group_sum; - thrust::tuple __device__ operator()(cudf::size_type idx) const + thrust::tuple __device__ operator()(cudf::size_type idx) const { auto const start = offsets[idx]; auto const end = offsets[idx + 1]; - if (start == end) { return {OutputType{0}, false}; } + if (start == end) { return {OutputType{0}, -1}; } auto sum_sqr = OutputType{0}; bool has_valid{false}; @@ -171,10 +177,10 @@ struct host_udf_groupby_example : cudf::host_udf_base { sum_sqr += val * val; } - if (!has_valid) { return {OutputType{0}, false}; } + if (!has_valid) { return {OutputType{0}, -1}; } return {static_cast(group_indices[start] + 1) * sum_sqr - static_cast(group_max[idx]) * static_cast(group_sum[idx]), - true}; + idx}; } }; }; From 5131fd0a6ecdc48a7479200e35a8e16fd7a6070b Mon Sep 
17 00:00:00 2001 From: Nghia Truong Date: Thu, 19 Dec 2024 14:45:10 -0800 Subject: [PATCH 20/22] Add `const` Signed-off-by: Nghia Truong --- cpp/tests/groupby/host_udf_example_tests.cu | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/cpp/tests/groupby/host_udf_example_tests.cu b/cpp/tests/groupby/host_udf_example_tests.cu index 0ec424d1cc1..6f2affd2cad 100644 --- a/cpp/tests/groupby/host_udf_example_tests.cu +++ b/cpp/tests/groupby/host_udf_example_tests.cu @@ -124,11 +124,11 @@ struct host_udf_groupby_example : cudf::host_udf_base { input.at(cudf::make_sum_aggregation())); auto const values_dv_ptr = cudf::column_device_view::create(values, stream); - auto output = cudf::make_numeric_column(cudf::data_type{cudf::type_to_id()}, - num_groups, - cudf::mask_state::UNALLOCATED, - stream, - mr); + auto const output = cudf::make_numeric_column(cudf::data_type{cudf::type_to_id()}, + num_groups, + cudf::mask_state::UNALLOCATED, + stream, + mr); // Store row index if it is valid, otherwise store a negative value denoting a null row. 
rmm::device_uvector valid_idx(num_groups, stream); From e59c5cd4bafe6642de19b8ccd1099b3203cb46b3 Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Thu, 19 Dec 2024 16:14:59 -0800 Subject: [PATCH 21/22] Fix test Signed-off-by: Nghia Truong --- cpp/tests/groupby/host_udf_example_tests.cu | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/cpp/tests/groupby/host_udf_example_tests.cu b/cpp/tests/groupby/host_udf_example_tests.cu index 6f2affd2cad..a454bd692fc 100644 --- a/cpp/tests/groupby/host_udf_example_tests.cu +++ b/cpp/tests/groupby/host_udf_example_tests.cu @@ -29,6 +29,7 @@ #include #include +#include #include #include #include @@ -166,7 +167,9 @@ struct host_udf_groupby_example : cudf::host_udf_base { { auto const start = offsets[idx]; auto const end = offsets[idx + 1]; - if (start == end) { return {OutputType{0}, -1}; } + + auto constexpr invalid_idx = cuda::std::numeric_limits::lowest(); + if (start == end) { return {OutputType{0}, invalid_idx}; } auto sum_sqr = OutputType{0}; bool has_valid{false}; @@ -177,7 +180,7 @@ struct host_udf_groupby_example : cudf::host_udf_base { sum_sqr += val * val; } - if (!has_valid) { return {OutputType{0}, -1}; } + if (!has_valid) { return {OutputType{0}, invalid_idx}; } return {static_cast(group_indices[start] + 1) * sum_sqr - static_cast(group_max[idx]) * static_cast(group_sum[idx]), idx}; From 9741ee4a0efa85be12b326719bd61777de8dc2c0 Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Thu, 19 Dec 2024 17:30:22 -0800 Subject: [PATCH 22/22] Fix Java build Signed-off-by: Nghia Truong --- java/src/main/native/src/AggregationJni.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/java/src/main/native/src/AggregationJni.cpp b/java/src/main/native/src/AggregationJni.cpp index 86b44b7e628..dd41c677761 100644 --- a/java/src/main/native/src/AggregationJni.cpp +++ b/java/src/main/native/src/AggregationJni.cpp @@ -17,6 +17,7 @@ #include "cudf_jni_apis.hpp" #include +#include extern "C" {