diff --git a/.github/workflows/pr.yaml b/.github/workflows/pr.yaml
index 49ca5ca0fb9..9d79733703c 100644
--- a/.github/workflows/pr.yaml
+++ b/.github/workflows/pr.yaml
@@ -13,6 +13,7 @@ jobs:
# Please keep pr-builder as the top job here
pr-builder:
needs:
+ - check-nightly-ci
- changed-files
- checks
- conda-cpp-build
@@ -54,6 +55,18 @@ jobs:
- name: Telemetry setup
if: ${{ vars.TELEMETRY_ENABLED == 'true' }}
uses: rapidsai/shared-actions/telemetry-dispatch-stash-base-env-vars@main
+ check-nightly-ci:
+ # Switch to ubuntu-latest once it defaults to a version of Ubuntu that
+ # provides at least Python 3.11 (see
+ # https://docs.python.org/3/library/datetime.html#datetime.date.fromisoformat)
+ runs-on: ubuntu-24.04
+ env:
+ RAPIDS_GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+ steps:
+ - name: Check if nightly CI is passing
+ uses: rapidsai/shared-actions/check_nightly_success/dispatch@main
+ with:
+ repo: cudf
changed-files:
secrets: inherit
needs: telemetry-setup
@@ -328,16 +341,11 @@ jobs:
run_script: "ci/cudf_pandas_scripts/pandas-tests/diff.sh"
telemetry-summarize:
- runs-on: ubuntu-latest
+ # This job must use a self-hosted runner to record telemetry traces.
+ runs-on: linux-amd64-cpu4
needs: pr-builder
if: ${{ vars.TELEMETRY_ENABLED == 'true' && !cancelled() }}
continue-on-error: true
steps:
- - name: Load stashed telemetry env vars
- uses: rapidsai/shared-actions/telemetry-dispatch-load-base-env-vars@main
- with:
- load_service_name: true
- name: Telemetry summarize
- uses: rapidsai/shared-actions/telemetry-dispatch-write-summary@main
- with:
- cert_concat: "${{ secrets.OTEL_EXPORTER_OTLP_CA_CERTIFICATE }};${{ secrets.OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE }};${{ secrets.OTEL_EXPORTER_OTLP_CLIENT_KEY }}"
+ uses: rapidsai/shared-actions/telemetry-dispatch-summarize@main
diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 890b01e99a8..6c3db891de0 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -446,7 +446,6 @@ add_library(
src/groupby/sort/group_quantiles.cu
src/groupby/sort/group_std.cu
src/groupby/sort/group_sum.cu
- src/groupby/sort/scan.cpp
src/groupby/sort/group_count_scan.cu
src/groupby/sort/group_max_scan.cu
src/groupby/sort/group_min_scan.cu
@@ -454,6 +453,8 @@ add_library(
src/groupby/sort/group_rank_scan.cu
src/groupby/sort/group_replace_nulls.cu
src/groupby/sort/group_sum_scan.cu
+ src/groupby/sort/host_udf_aggregation.cpp
+ src/groupby/sort/scan.cpp
src/groupby/sort/sort_helper.cu
src/hash/md5_hash.cu
src/hash/murmurhash3_x86_32.cu
diff --git a/cpp/cmake/thirdparty/get_nanoarrow.cmake b/cpp/cmake/thirdparty/get_nanoarrow.cmake
index c440643037b..b0c48e04710 100644
--- a/cpp/cmake/thirdparty/get_nanoarrow.cmake
+++ b/cpp/cmake/thirdparty/get_nanoarrow.cmake
@@ -14,11 +14,6 @@
# This function finds nanoarrow and sets any additional necessary environment variables.
function(find_and_configure_nanoarrow)
- include(${rapids-cmake-dir}/cpm/package_override.cmake)
-
- set(cudf_patch_dir "${CMAKE_CURRENT_FUNCTION_LIST_DIR}/patches")
- rapids_cpm_package_override("${cudf_patch_dir}/nanoarrow_override.json")
-
if(NOT BUILD_SHARED_LIBS)
set(_exclude_from_all EXCLUDE_FROM_ALL FALSE)
else()
@@ -31,6 +26,9 @@ function(find_and_configure_nanoarrow)
nanoarrow 0.6.0.dev
GLOBAL_TARGETS nanoarrow
CPM_ARGS
+ GIT_REPOSITORY https://github.com/apache/arrow-nanoarrow.git
+ GIT_TAG 1e2664a70ec14907409cadcceb14d79b9670bcdb
+ GIT_SHALLOW FALSE
OPTIONS "BUILD_SHARED_LIBS OFF" "NANOARROW_NAMESPACE cudf" ${_exclude_from_all}
)
set_target_properties(nanoarrow PROPERTIES POSITION_INDEPENDENT_CODE ON)
diff --git a/cpp/cmake/thirdparty/patches/nanoarrow_clang_tidy_compliance.diff b/cpp/cmake/thirdparty/patches/nanoarrow_clang_tidy_compliance.diff
deleted file mode 100644
index e9a36fcb567..00000000000
--- a/cpp/cmake/thirdparty/patches/nanoarrow_clang_tidy_compliance.diff
+++ /dev/null
@@ -1,38 +0,0 @@
-diff --git a/src/nanoarrow/common/inline_buffer.h b/src/nanoarrow/common/inline_buffer.h
-index caa6be4..70ec8a2 100644
---- a/src/nanoarrow/common/inline_buffer.h
-+++ b/src/nanoarrow/common/inline_buffer.h
-@@ -347,7 +347,7 @@ static inline void _ArrowBitsUnpackInt32(const uint8_t word, int32_t* out) {
- }
-
- static inline void _ArrowBitmapPackInt8(const int8_t* values, uint8_t* out) {
-- *out = (uint8_t)(values[0] | ((values[1] + 0x1) & 0x2) | ((values[2] + 0x3) & 0x4) |
-+ *out = (uint8_t)(values[0] | ((values[1] + 0x1) & 0x2) | ((values[2] + 0x3) & 0x4) | // NOLINT
- ((values[3] + 0x7) & 0x8) | ((values[4] + 0xf) & 0x10) |
- ((values[5] + 0x1f) & 0x20) | ((values[6] + 0x3f) & 0x40) |
- ((values[7] + 0x7f) & 0x80));
-@@ -471,13 +471,13 @@ static inline void ArrowBitsSetTo(uint8_t* bits, int64_t start_offset, int64_t l
- // set bits within a single byte
- const uint8_t only_byte_mask =
- i_end % 8 == 0 ? first_byte_mask : (uint8_t)(first_byte_mask | last_byte_mask);
-- bits[bytes_begin] &= only_byte_mask;
-+ bits[bytes_begin] &= only_byte_mask; // NOLINT
- bits[bytes_begin] |= (uint8_t)(fill_byte & ~only_byte_mask);
- return;
- }
-
- // set/clear trailing bits of first byte
-- bits[bytes_begin] &= first_byte_mask;
-+ bits[bytes_begin] &= first_byte_mask; // NOLINT
- bits[bytes_begin] |= (uint8_t)(fill_byte & ~first_byte_mask);
-
- if (bytes_end - bytes_begin > 2) {
-@@ -637,7 +637,7 @@ static inline void ArrowBitmapAppendInt8Unsafe(struct ArrowBitmap* bitmap,
- n_remaining -= n_full_bytes * 8;
- if (n_remaining > 0) {
- // Zero out the last byte
-- *out_cursor = 0x00;
-+ *out_cursor = 0x00; // NOLINT
- for (int i = 0; i < n_remaining; i++) {
- ArrowBitSetTo(bitmap->buffer.data, out_i_cursor++, values_cursor[i]);
- }
diff --git a/cpp/cmake/thirdparty/patches/nanoarrow_override.json b/cpp/cmake/thirdparty/patches/nanoarrow_override.json
deleted file mode 100644
index d529787e7c8..00000000000
--- a/cpp/cmake/thirdparty/patches/nanoarrow_override.json
+++ /dev/null
@@ -1,18 +0,0 @@
-
-{
- "packages" : {
- "nanoarrow" : {
- "version" : "0.6.0.dev",
- "git_url" : "https://github.com/apache/arrow-nanoarrow.git",
- "git_tag" : "1e2664a70ec14907409cadcceb14d79b9670bcdb",
- "git_shallow" : false,
- "patches" : [
- {
- "file" : "${current_json_dir}/nanoarrow_clang_tidy_compliance.diff",
- "issue" : "https://github.com/apache/arrow-nanoarrow/issues/537",
- "fixed_in" : ""
- }
- ]
- }
- }
-}
diff --git a/cpp/include/cudf/aggregation.hpp b/cpp/include/cudf/aggregation.hpp
index f5f514d26d9..a1b7db5e08a 100644
--- a/cpp/include/cudf/aggregation.hpp
+++ b/cpp/include/cudf/aggregation.hpp
@@ -110,8 +110,9 @@ class aggregation {
COLLECT_SET, ///< collect values into a list without duplicate entries
LEAD, ///< window function, accesses row at specified offset following current row
LAG, ///< window function, accesses row at specified offset preceding current row
- PTX, ///< PTX UDF based reduction
- CUDA, ///< CUDA UDF based reduction
+ PTX, ///< PTX based UDF aggregation
+ CUDA, ///< CUDA based UDF aggregation
+ HOST_UDF, ///< host based UDF aggregation
MERGE_LISTS, ///< merge multiple lists values into one list
MERGE_SETS, ///< merge multiple lists values into one list then drop duplicate entries
MERGE_M2, ///< merge partial values of M2 aggregation,
@@ -120,7 +121,7 @@ class aggregation {
TDIGEST, ///< create a tdigest from a set of input values
MERGE_TDIGEST, ///< create a tdigest by merging multiple tdigests together
HISTOGRAM, ///< compute frequency of each element
- MERGE_HISTOGRAM ///< merge partial values of HISTOGRAM aggregation,
+ MERGE_HISTOGRAM ///< merge partial values of HISTOGRAM aggregation
};
aggregation() = delete;
@@ -599,6 +600,18 @@ std::unique_ptr make_udf_aggregation(udf_type type,
std::string const& user_defined_aggregator,
data_type output_type);
+// Forward declaration of `host_udf_base` for the factory function of `HOST_UDF` aggregation.
+struct host_udf_base;
+
+/**
+ * @brief Factory to create a HOST_UDF aggregation.
+ *
+ * @param host_udf An instance of a class derived from `host_udf_base` to perform aggregation
+ * @return A HOST_UDF aggregation object
+ */
+template
+std::unique_ptr make_host_udf_aggregation(std::unique_ptr host_udf);
+
/**
* @brief Factory to create a MERGE_LISTS aggregation.
*
diff --git a/cpp/include/cudf/aggregation/host_udf.hpp b/cpp/include/cudf/aggregation/host_udf.hpp
new file mode 100644
index 00000000000..bbce76dc5f3
--- /dev/null
+++ b/cpp/include/cudf/aggregation/host_udf.hpp
@@ -0,0 +1,294 @@
+/*
+ * Copyright (c) 2024, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include
+
+#include
+#include
+#include
+#include
+
+/**
+ * @file host_udf.hpp
+ * @brief Declare the base class for host-side user-defined functions (`HOST_UDF`) and an example
+ * of a subclass implementation.
+ */
+
+namespace CUDF_EXPORT cudf {
+/**
+ * @addtogroup aggregation_factories
+ * @{
+ */
+
+/**
+ * @brief The interface for host-based UDF implementation.
+ *
+ * An implementation of host-based UDF needs to be derived from this base class, defining
+ * its own version of the required functions. In particular:
+ * - The derived class is required to implement `get_empty_output`, `operator()`, `is_equal`,
+ * and `clone` functions.
+ * - If necessary, the derived class can also override `do_hash` to compute hashing for its
+ * instance, and `get_required_data` to selectively access the input data as well as
+ * intermediate data provided by libcudf.
+ *
+ * Example of such implementation:
+ * @code{.cpp}
+ * struct my_udf_aggregation : cudf::host_udf_base {
+ * my_udf_aggregation() = default;
+ *
+ * // This UDF aggregation needs `GROUPED_VALUES` and `GROUP_OFFSETS`,
+ * // and the result from groupby `MAX` aggregation.
+ * [[nodiscard]] data_attribute_set_t get_required_data() const override
+ * {
+ * return {groupby_data_attribute::GROUPED_VALUES,
+ * groupby_data_attribute::GROUP_OFFSETS,
+ * cudf::make_max_aggregation()};
+ * }
+ *
+ * [[nodiscard]] output_t get_empty_output(
+ * [[maybe_unused]] std::optional output_dtype,
+ * [[maybe_unused]] rmm::cuda_stream_view stream,
+ * [[maybe_unused]] rmm::device_async_resource_ref mr) const override
+ * {
+ * // This UDF aggregation always returns a column of type INT32.
+ * return cudf::make_empty_column(cudf::data_type{cudf::type_id::INT32});
+ * }
+ *
+ * [[nodiscard]] output_t operator()(input_map_t const& input,
+ * rmm::cuda_stream_view stream,
+ * rmm::device_async_resource_ref mr) const override
+ * {
+ * // Perform UDF computation using the input data and return the result.
+ * }
+ *
+ * [[nodiscard]] bool is_equal(host_udf_base const& other) const override
+ * {
+ * // Check if the other object is also an instance of this class.
+ * return dynamic_cast(&other) != nullptr;
+ * }
+ *
+ * [[nodiscard]] std::unique_ptr clone() const override
+ * {
+ * return std::make_unique();
+ * }
+ * };
+ * @endcode
+ */
+struct host_udf_base {
+ host_udf_base() = default;
+ virtual ~host_udf_base() = default;
+
+ /**
+ * @brief Define the possible data needed for groupby aggregations.
+ *
+ * Note that only sort-based groupby aggregations are supported.
+ */
+ enum class groupby_data_attribute : int32_t {
+ INPUT_VALUES, ///< The input values column.
+ GROUPED_VALUES, ///< The input values grouped according to the input `keys` for which the
+ ///< values within each group maintain their original order.
+ SORTED_GROUPED_VALUES, ///< The input values grouped according to the input `keys` and
+ ///< sorted within each group.
+ NUM_GROUPS, ///< The number of groups (i.e., number of distinct keys).
+ GROUP_OFFSETS, ///< The offsets separating groups.
+ GROUP_LABELS ///< Group labels (which is also the same as group indices).
+ };
+
+ /**
+ * @brief Describe possible data that may be needed in the derived class for its operations.
+ *
+ * Such data can be either intermediate data such as sorted values or group labels etc, or the
+ * results of other aggregations.
+ *
+ * Each derived host-based UDF class may need a different set of data. It is inefficient to
+ * evaluate and pass down all of this possible data at once from libcudf. Instead, the derived
+ * class can define the subset of data that it needs, and libcudf will evaluate and pass down
+ * only the data requested in that subset.
+ */
+ struct data_attribute {
+ /**
+ * @brief Hold all possible data types for the input of the aggregation in the derived class.
+ */
+ using value_type = std::variant>;
+ value_type value; ///< The actual data attribute, wrapped by this struct
+ ///< as a wrapper is needed to define `hash` and `equal_to` functors.
+
+ data_attribute() = default; ///< Default constructor
+ data_attribute(data_attribute&&) = default; ///< Move constructor
+
+ /**
+ * @brief Construct a new data attribute from an aggregation attribute.
+ * @param value_ An aggregation attribute
+ */
+ template )>
+ data_attribute(T value_) : value{value_}
+ {
+ }
+
+ /**
+ * @brief Construct a new data attribute from another aggregation request.
+ * @param value_ An aggregation request
+ */
+ template ||
+ std::is_same_v)>
+ data_attribute(std::unique_ptr value_) : value{std::move(value_)}
+ {
+ CUDF_EXPECTS(std::get>(value) != nullptr,
+ "Invalid aggregation request.");
+ if constexpr (std::is_same_v) {
+ CUDF_EXPECTS(
+ dynamic_cast(std::get>(value).get()) != nullptr,
+ "Requesting results from other aggregations is only supported in groupby "
+ "aggregations.");
+ }
+ }
+
+ /**
+ * @brief Copy constructor.
+ * @param other The other data attribute to copy from
+ */
+ data_attribute(data_attribute const& other);
+
+ /**
+ * @brief Hash functor for `data_attribute`.
+ */
+ struct hash {
+ /**
+ * @brief Compute the hash value of a data attribute.
+ * @param attr The data attribute to hash
+ * @return The hash value of the data attribute
+ */
+ std::size_t operator()(data_attribute const& attr) const;
+ }; // struct hash
+
+ /**
+ * @brief Equality comparison functor for `data_attribute`.
+ */
+ struct equal_to {
+ /**
+ * @brief Check if two data attributes are equal.
+ * @param lhs The left-hand side data attribute
+ * @param rhs The right-hand side data attribute
+ * @return True if the two data attributes are equal
+ */
+ bool operator()(data_attribute const& lhs, data_attribute const& rhs) const;
+ }; // struct equal_to
+ }; // struct data_attribute
+
+ /**
+ * @brief Set of attributes for the input data that is needed for computing the aggregation.
+ */
+ using data_attribute_set_t =
+ std::unordered_set;
+
+ /**
+ * @brief Return a set of attributes for the data that is needed for computing the aggregation.
+ *
+ * The derived class should return the attributes corresponding to only the data that it needs to
+ * avoid unnecessary computation performed in libcudf. If this function is not overridden, an
+ * empty set is returned. That means all the data attributes (except results from other
+ * aggregations in groupby) will be needed.
+ *
+ * @return A set of `data_attribute`
+ */
+ [[nodiscard]] virtual data_attribute_set_t get_required_data() const { return {}; }
+
+ /**
+ * @brief Hold all possible types of the data that is passed to the derived class for executing
+ * the aggregation.
+ */
+ using input_data_t = std::variant>;
+
+ /**
+ * @brief Input to the aggregation, mapping from each data attribute to its actual data.
+ */
+ using input_map_t = std::
+ unordered_map;
+
+ /**
+ * @brief Output type of the aggregation.
+ *
+ * Currently only a single type is supported as the output of the aggregation, but it will hold
+ * more types in the future when reduction is supported.
+ */
+ using output_t = std::variant>;
+
+ /**
+ * @brief Get the output when the input values column is empty.
+ *
+ * This is called in libcudf when the input values column is empty. In such situations libcudf
+ * tries to generate the output directly without unnecessarily evaluating the intermediate data.
+ *
+ * @param output_dtype The expected output data type
+ * @param stream The CUDA stream to use for any kernel launches
+ * @param mr Device memory resource to use for any allocations
+ * @return The output result of the aggregation when input values is empty
+ */
+ [[nodiscard]] virtual output_t get_empty_output(std::optional output_dtype,
+ rmm::cuda_stream_view stream,
+ rmm::device_async_resource_ref mr) const = 0;
+
+ /**
+ * @brief Perform the main computation for the host-based UDF.
+ *
+ * @param input The input data needed for performing all computation
+ * @param stream The CUDA stream to use for any kernel launches
+ * @param mr Device memory resource to use for any allocations
+ * @return The output result of the aggregation
+ */
+ [[nodiscard]] virtual output_t operator()(input_map_t const& input,
+ rmm::cuda_stream_view stream,
+ rmm::device_async_resource_ref mr) const = 0;
+
+ /**
+ * @brief Computes hash value of the class's instance.
+ * @return The hash value of the instance
+ */
+ [[nodiscard]] virtual std::size_t do_hash() const
+ {
+ return std::hash{}(static_cast(aggregation::Kind::HOST_UDF));
+ }
+
+ /**
+ * @brief Compares two instances of the derived class for equality.
+ * @param other The other derived class's instance to compare with
+ * @return True if the two instances are equal
+ */
+ [[nodiscard]] virtual bool is_equal(host_udf_base const& other) const = 0;
+
+ /**
+ * @brief Clones the instance.
+ *
+ * A class derived from `host_udf_base` should not store too much data, so that its instances
+ * remain lightweight for efficient cloning.
+ *
+ * @return A new instance cloned from this
+ */
+ [[nodiscard]] virtual std::unique_ptr clone() const = 0;
+};
+
+/** @} */ // end of group
+} // namespace CUDF_EXPORT cudf
diff --git a/cpp/include/cudf/detail/aggregation/aggregation.cuh b/cpp/include/cudf/detail/aggregation/aggregation.cuh
index c30c3d6f4bd..59011f7b138 100644
--- a/cpp/include/cudf/detail/aggregation/aggregation.cuh
+++ b/cpp/include/cudf/detail/aggregation/aggregation.cuh
@@ -23,6 +23,7 @@
#include
#include
#include
+#include
#include
#include
@@ -31,7 +32,6 @@
#include
#include
-#include
namespace cudf {
namespace detail {
@@ -216,12 +216,12 @@ struct identity_initializer {
* @throw cudf::logic_error if column type is not fixed-width
*
* @param table The table of columns to initialize.
- * @param aggs A vector of aggregation operations corresponding to the table
+ * @param aggs A span of aggregation operations corresponding to the table
* columns. The aggregations determine the identity value for each column.
* @param stream CUDA stream used for device memory operations and kernel launches.
*/
void initialize_with_identity(mutable_table_view& table,
- std::vector const& aggs,
+ host_span aggs,
rmm::cuda_stream_view stream);
} // namespace detail
diff --git a/cpp/include/cudf/detail/aggregation/aggregation.hpp b/cpp/include/cudf/detail/aggregation/aggregation.hpp
index 6661a461b8b..d873e93bd20 100644
--- a/cpp/include/cudf/detail/aggregation/aggregation.hpp
+++ b/cpp/include/cudf/detail/aggregation/aggregation.hpp
@@ -20,6 +20,7 @@
#include
#include
#include
+#include
#include
#include
@@ -88,6 +89,8 @@ class simple_aggregations_collector { // Declares the interface for the simple
class lead_lag_aggregation const& agg);
virtual std::vector> visit(data_type col_type,
class udf_aggregation const& agg);
+ virtual std::vector> visit(data_type col_type,
+ class host_udf_aggregation const& agg);
virtual std::vector> visit(data_type col_type,
class merge_lists_aggregation const& agg);
virtual std::vector> visit(data_type col_type,
@@ -135,6 +138,7 @@ class aggregation_finalizer { // Declares the interface for the finalizer
virtual void visit(class collect_set_aggregation const& agg);
virtual void visit(class lead_lag_aggregation const& agg);
virtual void visit(class udf_aggregation const& agg);
+ virtual void visit(class host_udf_aggregation const& agg);
virtual void visit(class merge_lists_aggregation const& agg);
virtual void visit(class merge_sets_aggregation const& agg);
virtual void visit(class merge_m2_aggregation const& agg);
@@ -960,6 +964,35 @@ class udf_aggregation final : public rolling_aggregation {
}
};
+/**
+ * @brief Derived class for specifying host-based UDF aggregation.
+ */
+class host_udf_aggregation final : public groupby_aggregation {
+ public:
+ std::unique_ptr udf_ptr;
+
+ host_udf_aggregation() = delete;
+ host_udf_aggregation(host_udf_aggregation const&) = delete;
+
+ // Need to define the constructor and destructor in a separate source file where we have the
+ // complete declaration of `host_udf_base`.
+ explicit host_udf_aggregation(std::unique_ptr udf_ptr_);
+ ~host_udf_aggregation() override;
+
+ [[nodiscard]] bool is_equal(aggregation const& _other) const override;
+
+ [[nodiscard]] size_t do_hash() const override;
+
+ [[nodiscard]] std::unique_ptr clone() const override;
+
+ std::vector> get_simple_aggregations(
+ data_type col_type, simple_aggregations_collector& collector) const override
+ {
+ return collector.visit(col_type, *this);
+ }
+ void finalize(aggregation_finalizer& finalizer) const override { finalizer.visit(*this); }
+};
+
/**
* @brief Derived aggregation class for specifying MERGE_LISTS aggregation
*/
@@ -1462,6 +1495,12 @@ struct target_type_impl