Implement HOST_UDF aggregation for reduction and groupby #17249

Draft
wants to merge 68 commits into
base: branch-25.02
bba150c
Implement host udf aggregation
ttnghia Nov 5, 2024
04e2bda
Add test
ttnghia Nov 5, 2024
5f7ab2b
Change example to compute aggregation on each group
ttnghia Nov 5, 2024
7c9316a
Merge branch 'branch-25.02' into host_udf
ttnghia Nov 19, 2024
57674e1
Add `host_udf_base` class
ttnghia Nov 19, 2024
47c7a7c
Rename variable
ttnghia Nov 19, 2024
5e6017a
Rewrite docs
ttnghia Nov 21, 2024
174678f
Implement `host_udf_aggregation`
ttnghia Nov 21, 2024
cee28f6
Change the `host_udf_base` interface
ttnghia Nov 21, 2024
0da4988
Remove `target_type_impl` for `HOST_UDF`
ttnghia Nov 21, 2024
c9c9ee6
Rewrite comments
ttnghia Nov 21, 2024
227016b
Construct empty output when the input is empty
ttnghia Nov 21, 2024
15732cf
Implement `HOST_UDF` for reduction
ttnghia Nov 21, 2024
ee28be8
Implement `HOST_UDF` for segmented reduction
ttnghia Nov 21, 2024
8333964
Merge branch 'branch-25.02' into host_udf
ttnghia Nov 21, 2024
754ee58
Implementing tests
ttnghia Nov 22, 2024
5a7ea45
Fix error
ttnghia Nov 22, 2024
e0999bb
Change `host_udf_base` interface
ttnghia Nov 22, 2024
52e0acd
Implement `test_udf_simple_type`
ttnghia Nov 22, 2024
a1b568b
Implement a simple test
ttnghia Nov 22, 2024
7ec2dd9
Fix compile issues
ttnghia Nov 22, 2024
237bb72
Fix test
ttnghia Nov 22, 2024
b5b8f5b
Remove `init` value from `get_empty_output`
ttnghia Nov 22, 2024
bfec6a2
Fix test
ttnghia Nov 22, 2024
3bc9ae3
Fix compile issues
ttnghia Nov 22, 2024
1fd4c8a
Merge branch 'branch-25.02' into host_udf
ttnghia Nov 22, 2024
26be262
Enable `segmented_reduce_aggregation`
ttnghia Nov 24, 2024
6b3e3f7
Implement test for `segmented_reduce`
ttnghia Nov 24, 2024
3d505da
Fix empty output
ttnghia Nov 24, 2024
9d1ac9a
Fix empty input handling
ttnghia Nov 24, 2024
3aefaf3
Fix comment
ttnghia Nov 24, 2024
9b61fe3
Rename tests
ttnghia Nov 24, 2024
b87e2a7
Fix groupby type
ttnghia Nov 24, 2024
697993b
Add test `GroupbySimpleInput`
ttnghia Nov 24, 2024
7a81754
Add the ability to call other aggregations
ttnghia Nov 25, 2024
1b8fb92
Add anonymous namespace
ttnghia Nov 25, 2024
91489c1
Refactor
ttnghia Nov 26, 2024
b597192
Revert cmake
ttnghia Nov 26, 2024
26c3ec4
Fix style
ttnghia Nov 26, 2024
9c168e5
Add docs
ttnghia Nov 26, 2024
b10e924
Fix docs
ttnghia Nov 26, 2024
9dd26d3
Still fix docs
ttnghia Nov 26, 2024
d289528
Implement Java & JNI for `HostUDFAggregation`
ttnghia Nov 26, 2024
a5133e6
Fix instantiating code
ttnghia Nov 26, 2024
0043472
Remove unused headers
ttnghia Nov 26, 2024
f190fd8
Fix style
ttnghia Nov 26, 2024
4d559cf
Add unit tests
ttnghia Nov 26, 2024
22df331
Implement random tests
ttnghia Nov 27, 2024
82379ca
Fix compile issue
ttnghia Nov 27, 2024
ecfb879
Rename test file
ttnghia Nov 27, 2024
91a4724
Merge branch 'branch-25.02' into host_udf
ttnghia Nov 27, 2024
ef4392e
Rewrite tests, adding more check
ttnghia Nov 27, 2024
93ac14c
Add more Java classes
ttnghia Nov 27, 2024
26866f0
Merge branch 'branch-25.02' into host_udf
ttnghia Nov 27, 2024
4716373
Rewrite `host_udf_base`
ttnghia Nov 28, 2024
baa7991
Rewrite tests
ttnghia Nov 28, 2024
6d013ff
Merge branch 'branch-25.02' into host_udf
ttnghia Nov 28, 2024
8405167
Rewrite switch statements
ttnghia Nov 28, 2024
bbdc699
Fix out of sync enums
ttnghia Dec 2, 2024
81ce190
Merge branch 'branch-25.02' into host_udf
ttnghia Dec 2, 2024
3bd496d
Merge branch 'branch-25.02' into host_udf
ttnghia Dec 5, 2024
3f4d450
Rewrite example
ttnghia Dec 5, 2024
069600b
Instantiate `HostUDFAggregation` from `HostUDFWrapper`
ttnghia Dec 18, 2024
a63000d
Fix Java
ttnghia Dec 18, 2024
05084a4
Apply new wrapper
ttnghia Dec 18, 2024
28af2de
Move `HostUDFWrapper`
ttnghia Dec 18, 2024
d75d3da
Fix compile error
ttnghia Dec 18, 2024
9e0f996
Merge branch 'branch-25.02' into host_udf
ttnghia Dec 18, 2024
286 changes: 283 additions & 3 deletions cpp/include/cudf/aggregation.hpp
@@ -16,11 +16,20 @@

#pragma once

#include <cudf/detail/utilities/visitor_overload.hpp>
#include <cudf/types.hpp>
#include <cudf/utilities/export.hpp>
#include <cudf/utilities/span.hpp>
#include <cudf/utilities/traits.hpp>

#include <rmm/cuda_stream_view.hpp>

#include <functional>
#include <memory>
#include <optional>
#include <unordered_map>
#include <unordered_set>
#include <variant>
#include <vector>

/**
@@ -70,6 +79,11 @@ enum class rank_percentage : int32_t {
ONE_NORMALIZED ///< (rank - 1) / (count - 1)
};

/**
* @brief Enum to describe scan operation type.
*/
enum class scan_type : bool { INCLUSIVE, EXCLUSIVE };

/**
* @brief Abstract base class for specifying the desired aggregation in an
* `aggregation_request`.
@@ -110,8 +124,9 @@ class aggregation {
COLLECT_SET, ///< collect values into a list without duplicate entries
LEAD, ///< window function, accesses row at specified offset following current row
LAG, ///< window function, accesses row at specified offset preceding current row
- PTX, ///< PTX UDF based reduction
- CUDA, ///< CUDA UDF based reduction
+ PTX, ///< PTX based UDF aggregation
+ CUDA, ///< CUDA based UDF aggregation
+ HOST_UDF, ///< host based UDF aggregation
MERGE_LISTS, ///< merge multiple lists values into one list
MERGE_SETS, ///< merge multiple lists values into one list then drop duplicate entries
MERGE_M2, ///< merge partial values of M2 aggregation,
@@ -120,7 +135,7 @@ class aggregation {
TDIGEST, ///< create a tdigest from a set of input values
MERGE_TDIGEST, ///< create a tdigest by merging multiple tdigests together
HISTOGRAM, ///< compute frequency of each element
- MERGE_HISTOGRAM ///< merge partial values of HISTOGRAM aggregation,
+ MERGE_HISTOGRAM ///< merge partial values of HISTOGRAM aggregation
};

aggregation() = delete;
@@ -599,6 +614,271 @@ std::unique_ptr<Base> make_udf_aggregation(udf_type type,
std::string const& user_defined_aggregator,
data_type output_type);

/**
* @brief The interface for host-based UDF implementations.
*
* A host-based UDF must derive from this base class and override its pure virtual
* functions.
*/
struct host_udf_base {
host_udf_base() = default;
virtual ~host_udf_base() = default;

/**
* @brief Define the possible data needed for reduction.
*/
enum class reduction_data_attribute : int32_t {
INPUT_VALUES, ///< The input values column
OUTPUT_DTYPE, ///< Data type for the output result
INIT_VALUE ///< Initial value
};

/**
* @brief Define the possible data needed for segmented reduction.
*/
enum class segmented_reduction_data_attribute : int32_t {
INPUT_VALUES, ///< The input values column
OUTPUT_DTYPE, ///< Data type for the output result
INIT_VALUE, ///< Initial value
NULL_POLICY, ///< To control null handling
OFFSETS ///< The offsets defining segments
};

/**
* @brief Define the possible data needed for groupby aggregations.
*
* Note that only sort-based groupby aggregations are supported.
*/
enum class groupby_data_attribute : int32_t {
INPUT_VALUES, ///< The input values column
GROUPED_VALUES, ///< The input values grouped according to the input `keys` for which the
///< values within each group maintain their original order
SORTED_GROUPED_VALUES, ///< The input values grouped according to the input `keys` and
///< sorted within each group
GROUP_OFFSETS, ///< The offsets separating groups
GROUP_LABELS ///< Group labels (which are the same as group indices)
};

/**
* @brief Describe possible data that may be needed in the derived class for its operations.
*
* Such data can be either intermediate data, such as sorted values or group labels, or the
* results of other aggregations.
*
* Each derived host-based UDF class may need a different set of data. Evaluating all possible
* data in libcudf and passing it down at once is inefficient. Instead, the derived class
* declares the subset of data that it needs, and libcudf evaluates and passes down only the
* data requested in that set.
*/
struct data_attribute {
/**
* @brief Hold all possible kinds of data attribute that the derived class can request.
*/
using value_type = std::variant<reduction_data_attribute,
segmented_reduction_data_attribute,
groupby_data_attribute,
std::unique_ptr<aggregation>>;
value_type value; ///< The actual data attribute. It is wrapped in this struct because
///< a wrapper type is needed to define the hash and equal_to functors.

data_attribute() = default; ///< Default constructor
data_attribute(data_attribute&&) = default; ///< Move constructor

/**
* @brief Construct a new data attribute from an aggregation attribute.
* @param value_ An aggregation attribute
*/
template <typename T,
CUDF_ENABLE_IF(std::is_same_v<T, reduction_data_attribute> ||
std::is_same_v<T, segmented_reduction_data_attribute> ||
std::is_same_v<T, groupby_data_attribute>)>
data_attribute(T value_) : value{value_}
{
}

/**
* @brief Construct a new data attribute from another aggregation request.
* @param value_ An aggregation request
*/
template <typename T,
CUDF_ENABLE_IF(std::is_same_v<T, aggregation> ||
std::is_same_v<T, groupby_aggregation>)>
data_attribute(std::unique_ptr<T> value_) : value{std::move(value_)}
{
if constexpr (std::is_same_v<T, aggregation>) {
CUDF_EXPECTS(
dynamic_cast<groupby_aggregation*>(std::get<std::unique_ptr<T>>(value).get()) != nullptr,
"Requesting results from other aggregations is only supported in groupby "
"aggregations.");
}
CUDF_EXPECTS(std::get<std::unique_ptr<aggregation>>(value) != nullptr,
"Invalid aggregation request.");
}

/**
* @brief Copy constructor.
* @param other The other data attribute to copy from
*/
data_attribute(data_attribute const& other)
: value{std::visit(
cudf::detail::visitor_overload{
[](auto const& val) { return value_type{val}; },
[](std::unique_ptr<aggregation> const& val) { return value_type{val->clone()}; }},
other.value)}
{
}

/**
* @brief Hash functor for `data_attribute`.
*/
struct hash {
/**
* @brief Compute the hash value of a data attribute.
* @param attr The data attribute to hash
* @return The hash value of the data attribute
*/
std::size_t operator()(data_attribute const& attr) const
{
auto const& value = attr.value;
auto const hash_value =
std::visit(cudf::detail::visitor_overload{
[](auto const& val) { return std::hash<int>{}(static_cast<int>(val)); },
[](std::unique_ptr<aggregation> const& val) { return val->do_hash(); }},
value);
return std::hash<std::size_t>{}(value.index()) ^ hash_value;
}
}; // struct hash

/**
* @brief Equality comparison functor for `data_attribute`.
*/
struct equal_to {
/**
* @brief Check if two data attributes are equal.
* @param lhs The left-hand side data attribute
* @param rhs The right-hand side data attribute
* @return True if the two data attributes are equal
*/
bool operator()(data_attribute const& lhs, data_attribute const& rhs) const
{
auto const& lhs_val = lhs.value;
auto const& rhs_val = rhs.value;
if (lhs_val.index() != rhs_val.index()) { return false; }
return std::visit(cudf::detail::visitor_overload{
[](auto const& lhs_val, auto const& rhs_val) {
if constexpr (std::is_same_v<decltype(lhs_val), decltype(rhs_val)>) {
return lhs_val == rhs_val;
}
return false;
},
[](std::unique_ptr<aggregation> const& lhs_val,
std::unique_ptr<aggregation> const& rhs_val) {
return lhs_val->is_equal(*rhs_val);
}},
lhs_val,
rhs_val);
}
}; // struct equal_to
}; // struct data_attribute

/**
* @brief Set of attributes for the input data that is needed for computing the aggregation.
*/
using input_data_attributes =
std::unordered_set<data_attribute, data_attribute::hash, data_attribute::equal_to>;

/**
* @brief Return a set of attributes for the data that is needed for computing the aggregation.
*
* If this function is not overridden, it returns an empty set. In that case, all the data
* attributes (except results from other aggregations in groupby) are assumed to be needed.
*
* @return A set of `data_attribute`.
*/
[[nodiscard]] virtual input_data_attributes get_required_data() const { return {}; }

/**
* @brief Hold all possible types of the data that is passed to the derived class for executing
* the aggregation.
*/
using input_data_type = std::variant<column_view,
data_type,
std::optional<std::reference_wrapper<scalar const>>,
null_policy,
device_span<size_type const>>;

/**
* @brief Input to the aggregation, mapping from each data attribute to its actual data.
*/
using host_udf_input = std::
unordered_map<data_attribute, input_data_type, data_attribute::hash, data_attribute::equal_to>;

/**
* @brief Output type of the aggregation. It can be either a scalar (for reduction) or a column
* (for segmented reduction or groupby aggregations).
*/
using output_type = std::variant<std::unique_ptr<scalar>, std::unique_ptr<column>>;

/**
* @brief Get the output when the input values column is empty.
*
* This is called in libcudf when the input values column is empty. In such situations libcudf
* tries to generate the output directly without unnecessarily evaluating the intermediate data.
*
* @param output_dtype The expected output data type for reduction (if specified)
* @param stream The CUDA stream to use for any kernel launches
* @param mr Device memory resource to use for any allocations
* @return The output result of the aggregation when the input values column is empty
*/
[[nodiscard]] virtual output_type get_empty_output(std::optional<data_type> output_dtype,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr) const = 0;

/**
* @brief Perform the main computation for the host-based UDF.
*
* @param input The input data needed for performing all computation
* @param stream The CUDA stream to use for any kernel launches
* @param mr Device memory resource to use for any allocations
* @return The output result of the aggregation
*/
[[nodiscard]] virtual output_type operator()(host_udf_input const& input,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr) const = 0;

/**
* @brief Computes hash value of the derived class's instance.
* @return The hash value of the instance
*/
[[nodiscard]] virtual std::size_t do_hash() const = 0;

/**
* @brief Compares two instances of the derived class for equality.
* @param other The other derived class's instance to compare with
* @return True if the two instances are equal
*/
[[nodiscard]] virtual bool is_equal(host_udf_base const& other) const = 0;

/**
* @brief Clones the instance.
*
* A class derived from `host_udf_base` should store as little data as possible so that its
* instances remain lightweight and cheap to clone.
*
* @return A new instance cloned from this
*/
[[nodiscard]] virtual std::unique_ptr<host_udf_base> clone() const = 0;
};
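
Review note: `data_attribute` is used as a key in `std::unordered_set` and `std::unordered_map`, so it needs custom hash and equality functors that dispatch on the active `std::variant` alternative. The standalone sketch below shows that technique under simplifying assumptions. It does not use libcudf: `reduction_attr`, `groupby_attr`, `agg_request`, `attribute`, and `attribute_set` are hypothetical stand-ins, and `agg_request` replaces `std::unique_ptr<aggregation>` with a plain name so the example compiles with only the standard library.

```cpp
#include <cstddef>
#include <cstdint>
#include <functional>
#include <string>
#include <type_traits>
#include <unordered_set>
#include <variant>

// Hypothetical stand-ins for the attribute enums in the diff.
enum class reduction_attr : std::int32_t { INPUT_VALUES, OUTPUT_DTYPE, INIT_VALUE };
enum class groupby_attr : std::int32_t { INPUT_VALUES, GROUPED_VALUES, GROUP_OFFSETS };

// Hypothetical stand-in for a request for another aggregation's result.
struct agg_request {
  std::string name;
};

struct attribute {
  using value_type = std::variant<reduction_attr, groupby_attr, agg_request>;
  value_type value;

  struct hash {
    std::size_t operator()(attribute const& attr) const
    {
      // Hash the payload according to the active alternative.
      auto const payload_hash = std::visit(
        [](auto const& v) -> std::size_t {
          using T = std::decay_t<decltype(v)>;
          if constexpr (std::is_same_v<T, agg_request>) {
            return std::hash<std::string>{}(v.name);
          } else {
            return std::hash<int>{}(static_cast<int>(v));
          }
        },
        attr.value);
      // Mix in the alternative index, as the diff does, so that equal
      // underlying enum values from different enums tend to hash differently.
      return std::hash<std::size_t>{}(attr.value.index()) ^ payload_hash;
    }
  };

  struct equal_to {
    bool operator()(attribute const& lhs, attribute const& rhs) const
    {
      // Different alternatives are never equal.
      if (lhs.value.index() != rhs.value.index()) { return false; }
      return std::visit(
        [](auto const& a, auto const& b) -> bool {
          using A = std::decay_t<decltype(a)>;
          using B = std::decay_t<decltype(b)>;
          if constexpr (!std::is_same_v<A, B>) {
            return false;
          } else if constexpr (std::is_same_v<A, agg_request>) {
            return a.name == b.name;
          } else {
            return a == b;
          }
        },
        lhs.value,
        rhs.value);
    }
  };
};

// A set of requested attributes, analogous to `input_data_attributes`.
using attribute_set = std::unordered_set<attribute, attribute::hash, attribute::equal_to>;
```

With these functors, `reduction_attr::INPUT_VALUES` and `groupby_attr::INPUT_VALUES` are distinct keys even though both have the underlying value 0, while two requests with the same name collapse into one entry.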

/**
* @brief Factory to create a HOST_UDF aggregation.
*
* @param host_udf An instance of a class derived from `host_udf_base` to perform aggregation
* @return A HOST_UDF aggregation object
*/
template <typename Base = aggregation>
std::unique_ptr<Base> make_host_udf_aggregation(std::unique_ptr<host_udf_base> host_udf);
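
Review note: the factory takes ownership of a `host_udf_base` instance, and the interface asks each derived class to provide `get_empty_output`, `operator()`, `do_hash`, `is_equal`, and `clone`. The sketch below illustrates that contract on a deliberately reduced model, not on libcudf itself. `mini_udf_base`, `scaled_sum_udf`, and `run` are hypothetical names. The model assumes host vectors instead of columns and drops the stream, the memory resource, and the `host_udf_input` map.

```cpp
#include <cstddef>
#include <functional>
#include <memory>
#include <numeric>
#include <vector>

// Hypothetical, heavily simplified stand-in for `host_udf_base`.
struct mini_udf_base {
  virtual ~mini_udf_base() = default;
  [[nodiscard]] virtual double get_empty_output() const = 0;
  [[nodiscard]] virtual double operator()(std::vector<double> const& values) const = 0;
  [[nodiscard]] virtual std::size_t do_hash() const = 0;
  [[nodiscard]] virtual bool is_equal(mini_udf_base const& other) const = 0;
  [[nodiscard]] virtual std::unique_ptr<mini_udf_base> clone() const = 0;
};

// Example UDF: the sum of the input values multiplied by a fixed scale.
// It stores a single double, so cloning stays cheap.
struct scaled_sum_udf final : mini_udf_base {
  explicit scaled_sum_udf(double scale_) : scale{scale_} {}

  [[nodiscard]] double get_empty_output() const override { return 0.0; }

  [[nodiscard]] double operator()(std::vector<double> const& values) const override
  {
    return scale * std::accumulate(values.begin(), values.end(), 0.0);
  }

  // Hash and equality depend only on the parameters that change the result.
  [[nodiscard]] std::size_t do_hash() const override { return std::hash<double>{}(scale); }

  [[nodiscard]] bool is_equal(mini_udf_base const& other) const override
  {
    // Instances of a different derived type are never equal.
    auto const* o = dynamic_cast<scaled_sum_udf const*>(&other);
    return o != nullptr && o->scale == scale;
  }

  [[nodiscard]] std::unique_ptr<mini_udf_base> clone() const override
  {
    return std::make_unique<scaled_sum_udf>(*this);
  }

  double scale;
};

// Hypothetical driver that mirrors the documented empty-input shortcut:
// an empty input yields `get_empty_output()` without running the computation.
double run(mini_udf_base const& udf, std::vector<double> const& values)
{
  return values.empty() ? udf.get_empty_output() : udf(values);
}
```

In this model, a clone compares equal to its source and hashes to the same value, which is the property that lets an aggregation holding the UDF be copied and compared by value.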

/**
* @brief Factory to create a MERGE_LISTS aggregation.
*