diff --git a/.github/workflows/pr.yaml b/.github/workflows/pr.yaml
index abe2fc8ed8b..9d79733703c 100644
--- a/.github/workflows/pr.yaml
+++ b/.github/workflows/pr.yaml
@@ -13,6 +13,7 @@ jobs:
# Please keep pr-builder as the top job here
pr-builder:
needs:
+ - check-nightly-ci
- changed-files
- checks
- conda-cpp-build
@@ -54,6 +55,18 @@ jobs:
- name: Telemetry setup
if: ${{ vars.TELEMETRY_ENABLED == 'true' }}
uses: rapidsai/shared-actions/telemetry-dispatch-stash-base-env-vars@main
+ check-nightly-ci:
+ # Switch to ubuntu-latest once it defaults to a version of Ubuntu that
+ # provides at least Python 3.11 (see
+ # https://docs.python.org/3/library/datetime.html#datetime.date.fromisoformat)
+ runs-on: ubuntu-24.04
+ env:
+ RAPIDS_GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+ steps:
+ - name: Check if nightly CI is passing
+ uses: rapidsai/shared-actions/check_nightly_success/dispatch@main
+ with:
+ repo: cudf
changed-files:
secrets: inherit
needs: telemetry-setup
diff --git a/conda/environments/all_cuda-118_arch-x86_64.yaml b/conda/environments/all_cuda-118_arch-x86_64.yaml
index 8a4c203a752..a7833f994d3 100644
--- a/conda/environments/all_cuda-118_arch-x86_64.yaml
+++ b/conda/environments/all_cuda-118_arch-x86_64.yaml
@@ -71,7 +71,7 @@ dependencies:
- ptxcompiler
- pyarrow>=14.0.0,<19.0.0a0
- pydata-sphinx-theme!=0.14.2
-- pynvml>=11.4.1,<12.0.0a0
+- pynvml>=12.0.0,<13.0.0a0
- pytest-benchmark
- pytest-cases>=3.8.2
- pytest-cov
diff --git a/conda/environments/all_cuda-125_arch-x86_64.yaml b/conda/environments/all_cuda-125_arch-x86_64.yaml
index ff12f11fd75..89bdd300c93 100644
--- a/conda/environments/all_cuda-125_arch-x86_64.yaml
+++ b/conda/environments/all_cuda-125_arch-x86_64.yaml
@@ -69,7 +69,7 @@ dependencies:
- pyarrow>=14.0.0,<19.0.0a0
- pydata-sphinx-theme!=0.14.2
- pynvjitlink>=0.0.0a0
-- pynvml>=11.4.1,<12.0.0a0
+- pynvml>=12.0.0,<13.0.0a0
- pytest-benchmark
- pytest-cases>=3.8.2
- pytest-cov
diff --git a/conda/recipes/dask-cudf/meta.yaml b/conda/recipes/dask-cudf/meta.yaml
index 74ecded8ead..a476d5d53df 100644
--- a/conda/recipes/dask-cudf/meta.yaml
+++ b/conda/recipes/dask-cudf/meta.yaml
@@ -43,7 +43,7 @@ requirements:
run:
- python
- cudf ={{ version }}
- - pynvml >=11.4.1,<12.0.0a0
+ - pynvml >=12.0.0,<13.0.0a0
- rapids-dask-dependency ={{ minor_version }}
- {{ pin_compatible('cuda-version', max_pin='x', min_pin='x') }}
diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 9cbacee8e8d..8c6cd922747 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -446,7 +446,6 @@ add_library(
src/groupby/sort/group_quantiles.cu
src/groupby/sort/group_std.cu
src/groupby/sort/group_sum.cu
- src/groupby/sort/scan.cpp
src/groupby/sort/group_count_scan.cu
src/groupby/sort/group_max_scan.cu
src/groupby/sort/group_min_scan.cu
@@ -454,6 +453,8 @@ add_library(
src/groupby/sort/group_rank_scan.cu
src/groupby/sort/group_replace_nulls.cu
src/groupby/sort/group_sum_scan.cu
+ src/groupby/sort/host_udf_aggregation.cpp
+ src/groupby/sort/scan.cpp
src/groupby/sort/sort_helper.cu
src/hash/md5_hash.cu
src/hash/murmurhash3_x86_32.cu
diff --git a/cpp/include/cudf/aggregation.hpp b/cpp/include/cudf/aggregation.hpp
index f5f514d26d9..a1b7db5e08a 100644
--- a/cpp/include/cudf/aggregation.hpp
+++ b/cpp/include/cudf/aggregation.hpp
@@ -110,8 +110,9 @@ class aggregation {
COLLECT_SET, ///< collect values into a list without duplicate entries
LEAD, ///< window function, accesses row at specified offset following current row
LAG, ///< window function, accesses row at specified offset preceding current row
- PTX, ///< PTX UDF based reduction
- CUDA, ///< CUDA UDF based reduction
+ PTX, ///< PTX based UDF aggregation
+ CUDA, ///< CUDA based UDF aggregation
+ HOST_UDF, ///< host based UDF aggregation
MERGE_LISTS, ///< merge multiple lists values into one list
MERGE_SETS, ///< merge multiple lists values into one list then drop duplicate entries
MERGE_M2, ///< merge partial values of M2 aggregation,
@@ -120,7 +121,7 @@ class aggregation {
TDIGEST, ///< create a tdigest from a set of input values
MERGE_TDIGEST, ///< create a tdigest by merging multiple tdigests together
HISTOGRAM, ///< compute frequency of each element
- MERGE_HISTOGRAM ///< merge partial values of HISTOGRAM aggregation,
+ MERGE_HISTOGRAM ///< merge partial values of HISTOGRAM aggregation
};
aggregation() = delete;
@@ -599,6 +600,18 @@ std::unique_ptr make_udf_aggregation(udf_type type,
std::string const& user_defined_aggregator,
data_type output_type);
+// Forward declaration of `host_udf_base` for the factory function of `HOST_UDF` aggregation.
+struct host_udf_base;
+
+/**
+ * @brief Factory to create a HOST_UDF aggregation.
+ *
+ * @param host_udf An instance of a class derived from `host_udf_base` to perform aggregation
+ * @return A HOST_UDF aggregation object
+ */
+template
+std::unique_ptr make_host_udf_aggregation(std::unique_ptr host_udf);
+
/**
* @brief Factory to create a MERGE_LISTS aggregation.
*
diff --git a/cpp/include/cudf/aggregation/host_udf.hpp b/cpp/include/cudf/aggregation/host_udf.hpp
new file mode 100644
index 00000000000..bbce76dc5f3
--- /dev/null
+++ b/cpp/include/cudf/aggregation/host_udf.hpp
@@ -0,0 +1,294 @@
+/*
+ * Copyright (c) 2024, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include
+
+#include
+#include
+#include
+#include
+
+/**
+ * @file host_udf.hpp
+ * @brief Declare the base class for host-side user-defined function (`HOST_UDF`) and example of
+ * subclass implementation.
+ */
+
+namespace CUDF_EXPORT cudf {
+/**
+ * @addtogroup aggregation_factories
+ * @{
+ */
+
+/**
+ * @brief The interface for host-based UDF implementation.
+ *
+ * An implementation of host-based UDF needs to be derived from this base class, defining
+ * its own version of the required functions. In particular:
+ * - The derived class is required to implement `get_empty_output`, `operator()`, `is_equal`,
+ * and `clone` functions.
+ * - If necessary, the derived class can also override `do_hash` to compute hashing for its
+ * instance, and `get_required_data` to selectively access to the input data as well as
+ * intermediate data provided by libcudf.
+ *
+ * Example of such implementation:
+ * @code{.cpp}
+ * struct my_udf_aggregation : cudf::host_udf_base {
+ * my_udf_aggregation() = default;
+ *
+ * // This UDF aggregation needs `GROUPED_VALUES` and `GROUP_OFFSETS`,
+ * // and the result from groupby `MAX` aggregation.
+ * [[nodiscard]] data_attribute_set_t get_required_data() const override
+ * {
+ * return {groupby_data_attribute::GROUPED_VALUES,
+ * groupby_data_attribute::GROUP_OFFSETS,
+ * cudf::make_max_aggregation()};
+ * }
+ *
+ * [[nodiscard]] output_t get_empty_output(
+ * [[maybe_unused]] std::optional output_dtype,
+ * [[maybe_unused]] rmm::cuda_stream_view stream,
+ * [[maybe_unused]] rmm::device_async_resource_ref mr) const override
+ * {
+ * // This UDF aggregation always returns a column of type INT32.
+ * return cudf::make_empty_column(cudf::data_type{cudf::type_id::INT32});
+ * }
+ *
+ * [[nodiscard]] output_t operator()(input_map_t const& input,
+ * rmm::cuda_stream_view stream,
+ * rmm::device_async_resource_ref mr) const override
+ * {
+ * // Perform UDF computation using the input data and return the result.
+ * }
+ *
+ * [[nodiscard]] bool is_equal(host_udf_base const& other) const override
+ * {
+ * // Check if the other object is also instance of this class.
+ * return dynamic_cast(&other) != nullptr;
+ * }
+ *
+ * [[nodiscard]] std::unique_ptr clone() const override
+ * {
+ * return std::make_unique();
+ * }
+ * };
+ * @endcode
+ */
+struct host_udf_base {
+ host_udf_base() = default;
+ virtual ~host_udf_base() = default;
+
+ /**
+ * @brief Define the possible data needed for groupby aggregations.
+ *
+ * Note that only sort-based groupby aggregations are supported.
+ */
+ enum class groupby_data_attribute : int32_t {
+ INPUT_VALUES, ///< The input values column.
+ GROUPED_VALUES, ///< The input values grouped according to the input `keys` for which the
+ ///< values within each group maintain their original order.
+ SORTED_GROUPED_VALUES, ///< The input values grouped according to the input `keys` and
+ ///< sorted within each group.
+ NUM_GROUPS, ///< The number of groups (i.e., number of distinct keys).
+ GROUP_OFFSETS, ///< The offsets separating groups.
+ GROUP_LABELS ///< Group labels (which is also the same as group indices).
+ };
+
+ /**
+ * @brief Describe possible data that may be needed in the derived class for its operations.
+ *
+ * Such data can be either intermediate data such as sorted values or group labels etc, or the
+ * results of other aggregations.
+ *
+ * Each derived host-based UDF class may need a different set of data. It is inefficient to
+ * evaluate and pass down all these possible data at once from libcudf. A solution for that is,
+ * the derived class can define a subset of data that it needs and libcudf will evaluate
+ * and pass down only data requested from that set.
+ */
+ struct data_attribute {
+ /**
+ * @brief Hold all possible data types for the input of the aggregation in the derived class.
+ */
+ using value_type = std::variant>;
+ value_type value; ///< The actual data attribute, wrapped by this struct
+ ///< as a wrapper is needed to define `hash` and `equal_to` functors.
+
+ data_attribute() = default; ///< Default constructor
+ data_attribute(data_attribute&&) = default; ///< Move constructor
+
+ /**
+ * @brief Construct a new data attribute from an aggregation attribute.
+ * @param value_ An aggregation attribute
+ */
+ template )>
+ data_attribute(T value_) : value{value_}
+ {
+ }
+
+ /**
+ * @brief Construct a new data attribute from another aggregation request.
+ * @param value_ An aggregation request
+ */
+ template ||
+ std::is_same_v)>
+ data_attribute(std::unique_ptr value_) : value{std::move(value_)}
+ {
+ CUDF_EXPECTS(std::get>(value) != nullptr,
+ "Invalid aggregation request.");
+ if constexpr (std::is_same_v) {
+ CUDF_EXPECTS(
+ dynamic_cast(std::get>(value).get()) != nullptr,
+ "Requesting results from other aggregations is only supported in groupby "
+ "aggregations.");
+ }
+ }
+
+ /**
+ * @brief Copy constructor.
+ * @param other The other data attribute to copy from
+ */
+ data_attribute(data_attribute const& other);
+
+ /**
+ * @brief Hash functor for `data_attribute`.
+ */
+ struct hash {
+ /**
+ * @brief Compute the hash value of a data attribute.
+ * @param attr The data attribute to hash
+ * @return The hash value of the data attribute
+ */
+ std::size_t operator()(data_attribute const& attr) const;
+ }; // struct hash
+
+ /**
+ * @brief Equality comparison functor for `data_attribute`.
+ */
+ struct equal_to {
+ /**
+ * @brief Check if two data attributes are equal.
+ * @param lhs The left-hand side data attribute
+ * @param rhs The right-hand side data attribute
+ * @return True if the two data attributes are equal
+ */
+ bool operator()(data_attribute const& lhs, data_attribute const& rhs) const;
+ }; // struct equal_to
+ }; // struct data_attribute
+
+ /**
+ * @brief Set of attributes for the input data that is needed for computing the aggregation.
+ */
+ using data_attribute_set_t =
+ std::unordered_set;
+
+ /**
+ * @brief Return a set of attributes for the data that is needed for computing the aggregation.
+ *
+ * The derived class should return the attributes corresponding to only the data that it needs to
+ * avoid unnecessary computation performed in libcudf. If this function is not overridden, an
+ * empty set is returned. That means all the data attributes (except results from other
+ * aggregations in groupby) will be needed.
+ *
+ * @return A set of `data_attribute`
+ */
+ [[nodiscard]] virtual data_attribute_set_t get_required_data() const { return {}; }
+
+ /**
+ * @brief Hold all possible types of the data that is passed to the derived class for executing
+ * the aggregation.
+ */
+ using input_data_t = std::variant>;
+
+ /**
+ * @brief Input to the aggregation, mapping from each data attribute to its actual data.
+ */
+ using input_map_t = std::
+ unordered_map;
+
+ /**
+ * @brief Output type of the aggregation.
+ *
+ * Currently only a single type is supported as the output of the aggregation, but it will hold
+ * more type in the future when reduction is supported.
+ */
+ using output_t = std::variant>;
+
+ /**
+ * @brief Get the output when the input values column is empty.
+ *
+ * This is called in libcudf when the input values column is empty. In such situations libcudf
+ * tries to generate the output directly without unnecessarily evaluating the intermediate data.
+ *
+ * @param output_dtype The expected output data type
+ * @param stream The CUDA stream to use for any kernel launches
+ * @param mr Device memory resource to use for any allocations
+ * @return The output result of the aggregation when input values is empty
+ */
+ [[nodiscard]] virtual output_t get_empty_output(std::optional output_dtype,
+ rmm::cuda_stream_view stream,
+ rmm::device_async_resource_ref mr) const = 0;
+
+ /**
+ * @brief Perform the main computation for the host-based UDF.
+ *
+ * @param input The input data needed for performing all computation
+ * @param stream The CUDA stream to use for any kernel launches
+ * @param mr Device memory resource to use for any allocations
+ * @return The output result of the aggregation
+ */
+ [[nodiscard]] virtual output_t operator()(input_map_t const& input,
+ rmm::cuda_stream_view stream,
+ rmm::device_async_resource_ref mr) const = 0;
+
+ /**
+ * @brief Computes hash value of the class's instance.
+ * @return The hash value of the instance
+ */
+ [[nodiscard]] virtual std::size_t do_hash() const
+ {
+ return std::hash{}(static_cast(aggregation::Kind::HOST_UDF));
+ }
+
+ /**
+ * @brief Compares two instances of the derived class for equality.
+ * @param other The other derived class's instance to compare with
+ * @return True if the two instances are equal
+ */
+ [[nodiscard]] virtual bool is_equal(host_udf_base const& other) const = 0;
+
+ /**
+ * @brief Clones the instance.
+ *
+ * A class derived from `host_udf_base` should not store too much data such that its instances
+ * remain lightweight for efficient cloning.
+ *
+ * @return A new instance cloned from this
+ */
+ [[nodiscard]] virtual std::unique_ptr clone() const = 0;
+};
+
+/** @} */ // end of group
+} // namespace CUDF_EXPORT cudf
diff --git a/cpp/include/cudf/detail/aggregation/aggregation.hpp b/cpp/include/cudf/detail/aggregation/aggregation.hpp
index 6661a461b8b..d873e93bd20 100644
--- a/cpp/include/cudf/detail/aggregation/aggregation.hpp
+++ b/cpp/include/cudf/detail/aggregation/aggregation.hpp
@@ -20,6 +20,7 @@
#include
#include
#include
+#include
#include
#include
@@ -88,6 +89,8 @@ class simple_aggregations_collector { // Declares the interface for the simple
class lead_lag_aggregation const& agg);
virtual std::vector> visit(data_type col_type,
class udf_aggregation const& agg);
+ virtual std::vector> visit(data_type col_type,
+ class host_udf_aggregation const& agg);
virtual std::vector> visit(data_type col_type,
class merge_lists_aggregation const& agg);
virtual std::vector> visit(data_type col_type,
@@ -135,6 +138,7 @@ class aggregation_finalizer { // Declares the interface for the finalizer
virtual void visit(class collect_set_aggregation const& agg);
virtual void visit(class lead_lag_aggregation const& agg);
virtual void visit(class udf_aggregation const& agg);
+ virtual void visit(class host_udf_aggregation const& agg);
virtual void visit(class merge_lists_aggregation const& agg);
virtual void visit(class merge_sets_aggregation const& agg);
virtual void visit(class merge_m2_aggregation const& agg);
@@ -960,6 +964,35 @@ class udf_aggregation final : public rolling_aggregation {
}
};
+/**
+ * @brief Derived class for specifying host-based UDF aggregation.
+ */
+class host_udf_aggregation final : public groupby_aggregation {
+ public:
+ std::unique_ptr udf_ptr;
+
+ host_udf_aggregation() = delete;
+ host_udf_aggregation(host_udf_aggregation const&) = delete;
+
+ // Need to define the constructor and destructor in a separate source file where we have the
+ // complete declaration of `host_udf_base`.
+ explicit host_udf_aggregation(std::unique_ptr udf_ptr_);
+ ~host_udf_aggregation() override;
+
+ [[nodiscard]] bool is_equal(aggregation const& _other) const override;
+
+ [[nodiscard]] size_t do_hash() const override;
+
+ [[nodiscard]] std::unique_ptr clone() const override;
+
+ std::vector> get_simple_aggregations(
+ data_type col_type, simple_aggregations_collector& collector) const override
+ {
+ return collector.visit(col_type, *this);
+ }
+ void finalize(aggregation_finalizer& finalizer) const override { finalizer.visit(*this); }
+};
+
/**
* @brief Derived aggregation class for specifying MERGE_LISTS aggregation
*/
@@ -1462,6 +1495,12 @@ struct target_type_impl