Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/branch-24.12' into fix-arrow-f…
Browse files Browse the repository at this point in the history
…s-parquet
  • Loading branch information
rjzamora committed Oct 21, 2024
2 parents dcc6cc3 + 074ab74 commit 96d122f
Show file tree
Hide file tree
Showing 79 changed files with 2,556 additions and 1,420 deletions.
5 changes: 5 additions & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -368,8 +368,13 @@ add_library(
src/filling/repeat.cu
src/filling/sequence.cu
src/groupby/groupby.cu
src/groupby/hash/compute_groupby.cu
src/groupby/hash/compute_single_pass_aggs.cu
src/groupby/hash/create_sparse_results_table.cu
src/groupby/hash/flatten_single_pass_aggs.cpp
src/groupby/hash/groupby.cu
src/groupby/hash/hash_compound_agg_finalizer.cu
src/groupby/hash/sparse_to_dense_results.cu
src/groupby/sort/aggregate.cpp
src/groupby/sort/group_argmax.cu
src/groupby/sort/group_argmin.cu
Expand Down
2 changes: 1 addition & 1 deletion cpp/doxygen/developer_guide/DEVELOPER_GUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ any type that cudf supports. For example, a `list_scalar` representing a list of
|Value type|Scalar class|Notes|
|-|-|-|
|fixed-width|`fixed_width_scalar<T>`| `T` can be any fixed-width type|
|numeric|`numeric_scalar<T>` | `T` can be `int8_t`, `int16_t`, `int32_t`, `int_64_t`, `float` or `double`|
|numeric|`numeric_scalar<T>` | `T` can be `int8_t`, `int16_t`, `int32_t`, `int64_t`, `float` or `double`|
|fixed-point|`fixed_point_scalar<T>` | `T` can be `numeric::decimal32` or `numeric::decimal64`|
|timestamp|`timestamp_scalar<T>` | `T` can be `timestamp_D`, `timestamp_s`, etc.|
|duration|`duration_scalar<T>` | `T` can be `duration_D`, `duration_s`, etc.|
Expand Down
63 changes: 30 additions & 33 deletions cpp/include/cudf/detail/aggregation/device_aggregators.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <cudf/aggregation.hpp>
Expand All @@ -29,12 +28,31 @@
#include <cuda/std/type_traits>

namespace cudf::detail {
/// Checks if an aggregation kind needs to operate on the underlying storage type
template <aggregation::Kind k>
__device__ constexpr bool uses_underlying_type()
{
return k == aggregation::MIN or k == aggregation::MAX or k == aggregation::SUM;
}

/// Gets the underlying target type for the given source type and aggregation kind
template <typename Source, aggregation::Kind k>
using underlying_target_t =
cuda::std::conditional_t<uses_underlying_type<k>(),
cudf::device_storage_type_t<cudf::detail::target_type_t<Source, k>>,
cudf::detail::target_type_t<Source, k>>;

/// Gets the underlying source type for the given source type and aggregation kind
template <typename Source, aggregation::Kind k>
using underlying_source_t =
cuda::std::conditional_t<uses_underlying_type<k>(), cudf::device_storage_type_t<Source>, Source>;

template <typename Source, aggregation::Kind k, typename Enable = void>
struct update_target_element {
__device__ void operator()(mutable_column_device_view target,
size_type target_index,
column_device_view source,
size_type source_index) const noexcept
__device__ void operator()(mutable_column_device_view,
size_type,
column_device_view,
size_type) const noexcept
{
CUDF_UNREACHABLE("Invalid source type and aggregation combination.");
}
Expand All @@ -51,8 +69,6 @@ struct update_target_element<
column_device_view source,
size_type source_index) const noexcept
{
if (source.is_null(source_index)) { return; }

using Target = target_type_t<Source, aggregation::MIN>;
cudf::detail::atomic_min(&target.element<Target>(target_index),
static_cast<Target>(source.element<Source>(source_index)));
Expand All @@ -72,8 +88,6 @@ struct update_target_element<
column_device_view source,
size_type source_index) const noexcept
{
if (source.is_null(source_index)) { return; }

using Target = target_type_t<Source, aggregation::MIN>;
using DeviceTarget = device_storage_type_t<Target>;
using DeviceSource = device_storage_type_t<Source>;
Expand All @@ -96,8 +110,6 @@ struct update_target_element<
column_device_view source,
size_type source_index) const noexcept
{
if (source.is_null(source_index)) { return; }

using Target = target_type_t<Source, aggregation::MAX>;
cudf::detail::atomic_max(&target.element<Target>(target_index),
static_cast<Target>(source.element<Source>(source_index)));
Expand All @@ -117,8 +129,6 @@ struct update_target_element<
column_device_view source,
size_type source_index) const noexcept
{
if (source.is_null(source_index)) { return; }

using Target = target_type_t<Source, aggregation::MAX>;
using DeviceTarget = device_storage_type_t<Target>;
using DeviceSource = device_storage_type_t<Source>;
Expand All @@ -141,8 +151,6 @@ struct update_target_element<
column_device_view source,
size_type source_index) const noexcept
{
if (source.is_null(source_index)) { return; }

using Target = target_type_t<Source, aggregation::SUM>;
cudf::detail::atomic_add(&target.element<Target>(target_index),
static_cast<Target>(source.element<Source>(source_index)));
Expand All @@ -162,8 +170,6 @@ struct update_target_element<
column_device_view source,
size_type source_index) const noexcept
{
if (source.is_null(source_index)) { return; }

using Target = target_type_t<Source, aggregation::SUM>;
using DeviceTarget = device_storage_type_t<Target>;
using DeviceSource = device_storage_type_t<Source>;
Expand Down Expand Up @@ -197,10 +203,10 @@ struct update_target_from_dictionary {
template <typename Source,
aggregation::Kind k,
cuda::std::enable_if_t<is_dictionary<Source>()>* = nullptr>
__device__ void operator()(mutable_column_device_view target,
size_type target_index,
column_device_view source,
size_type source_index) const noexcept
__device__ void operator()(mutable_column_device_view,
size_type,
column_device_view,
size_type) const noexcept
{
}
};
Expand All @@ -227,8 +233,6 @@ struct update_target_element<
column_device_view source,
size_type source_index) const noexcept
{
if (source.is_null(source_index)) { return; }

dispatch_type_and_aggregation(
source.child(cudf::dictionary_column_view::keys_column_index).type(),
k,
Expand All @@ -249,8 +253,6 @@ struct update_target_element<Source,
column_device_view source,
size_type source_index) const noexcept
{
if (source.is_null(source_index)) { return; }

using Target = target_type_t<Source, aggregation::SUM_OF_SQUARES>;
auto value = static_cast<Target>(source.element<Source>(source_index));
cudf::detail::atomic_add(&target.element<Target>(target_index), value * value);
Expand All @@ -267,8 +269,6 @@ struct update_target_element<Source,
column_device_view source,
size_type source_index) const noexcept
{
if (source.is_null(source_index)) { return; }

using Target = target_type_t<Source, aggregation::PRODUCT>;
cudf::detail::atomic_mul(&target.element<Target>(target_index),
static_cast<Target>(source.element<Source>(source_index)));
Expand All @@ -286,8 +286,6 @@ struct update_target_element<
column_device_view source,
size_type source_index) const noexcept
{
if (source.is_null(source_index)) { return; }

using Target = target_type_t<Source, aggregation::COUNT_VALID>;
cudf::detail::atomic_add(&target.element<Target>(target_index), Target{1});

Expand Down Expand Up @@ -323,8 +321,6 @@ struct update_target_element<
column_device_view source,
size_type source_index) const noexcept
{
if (source.is_null(source_index)) { return; }

using Target = target_type_t<Source, aggregation::ARGMAX>;
auto old = cudf::detail::atomic_cas(
&target.element<Target>(target_index), ARGMAX_SENTINEL, source_index);
Expand All @@ -349,8 +345,6 @@ struct update_target_element<
column_device_view source,
size_type source_index) const noexcept
{
if (source.is_null(source_index)) { return; }

using Target = target_type_t<Source, aggregation::ARGMIN>;
auto old = cudf::detail::atomic_cas(
&target.element<Target>(target_index), ARGMIN_SENTINEL, source_index);
Expand All @@ -376,6 +370,9 @@ struct elementwise_aggregator {
column_device_view source,
size_type source_index) const noexcept
{
if constexpr (k != cudf::aggregation::COUNT_ALL) {
if (source.is_null(source_index)) { return; }
}
update_target_element<Source, k>{}(target, target_index, source, source_index);
}
};
Expand Down
4 changes: 2 additions & 2 deletions cpp/include/cudf/detail/copy_if.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <cudf/column/column_device_view.cuh>
#include <cudf/column/column_factories.hpp>
#include <cudf/copying.hpp>
#include <cudf/detail/device_scalar.hpp>
#include <cudf/detail/gather.hpp>
#include <cudf/detail/nvtx/ranges.hpp>
#include <cudf/detail/utilities/cuda.cuh>
Expand All @@ -36,7 +37,6 @@

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_buffer.hpp>
#include <rmm/device_scalar.hpp>
#include <rmm/device_uvector.hpp>
#include <rmm/exec_policy.hpp>

Expand Down Expand Up @@ -256,7 +256,7 @@ struct scatter_gather_functor {

cudf::detail::grid_1d grid{input.size(), block_size, per_thread};

rmm::device_scalar<cudf::size_type> null_count{0, stream};
cudf::detail::device_scalar<cudf::size_type> null_count{0, stream};
if (output.nullable()) {
// Have to initialize the output mask to all zeros because we may update
// it with atomicOr().
Expand Down
5 changes: 2 additions & 3 deletions cpp/include/cudf/detail/copy_if_else.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,11 @@
#include <cudf/column/column.hpp>
#include <cudf/column/column_device_view.cuh>
#include <cudf/column/column_factories.hpp>
#include <cudf/detail/device_scalar.hpp>
#include <cudf/detail/utilities/cuda.cuh>
#include <cudf/detail/utilities/integer_utils.hpp>
#include <cudf/utilities/memory_resource.hpp>

#include <rmm/device_scalar.hpp>

#include <cuda/std/optional>
#include <thrust/iterator/iterator_traits.h>

Expand Down Expand Up @@ -171,7 +170,7 @@ std::unique_ptr<column> copy_if_else(bool nullable,

// if we have validity in the output
if (nullable) {
rmm::device_scalar<size_type> valid_count{0, stream};
cudf::detail::device_scalar<size_type> valid_count{0, stream};

// call the kernel
copy_if_else_kernel<block_size, Element, LeftIter, RightIter, FilterFn, true>
Expand Down
4 changes: 2 additions & 2 deletions cpp/include/cudf/detail/copy_range.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <cudf/column/column_device_view.cuh>
#include <cudf/column/column_view.hpp>
#include <cudf/copying.hpp>
#include <cudf/detail/device_scalar.hpp>
#include <cudf/detail/utilities/cuda.cuh>
#include <cudf/types.hpp>
#include <cudf/utilities/bit.hpp>
Expand All @@ -27,7 +28,6 @@
#include <cudf/utilities/type_dispatcher.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_scalar.hpp>

#include <cub/cub.cuh>
#include <cuda_runtime.h>
Expand Down Expand Up @@ -154,7 +154,7 @@ void copy_range(SourceValueIterator source_value_begin,
auto grid = cudf::detail::grid_1d{num_items, block_size, 1};

if (target.nullable()) {
rmm::device_scalar<size_type> null_count(target.null_count(), stream);
cudf::detail::device_scalar<size_type> null_count(target.null_count(), stream);

auto kernel =
copy_range_kernel<block_size, SourceValueIterator, SourceValidityIterator, T, true>;
Expand Down
103 changes: 103 additions & 0 deletions cpp/include/cudf/detail/device_scalar.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <cudf/detail/utilities/cuda_memcpy.hpp>
#include <cudf/detail/utilities/host_vector.hpp>
#include <cudf/detail/utilities/vector_factories.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_scalar.hpp>
#include <rmm/resource_ref.hpp>

namespace CUDF_EXPORT cudf {
namespace detail {

template <typename T>
class device_scalar : public rmm::device_scalar<T> {
public:
#ifdef __CUDACC__
#pragma nv_exec_check_disable
#endif
~device_scalar() = default;

// Implementation is the same as what compiler should generate
// Could not use default move constructor as 11.8 compiler fails to generate it
#ifdef __CUDACC__
#pragma nv_exec_check_disable
#endif
device_scalar(device_scalar&& other) noexcept
: rmm::device_scalar<T>{std::move(other)}, bounce_buffer{std::move(other.bounce_buffer)}
{
}
device_scalar& operator=(device_scalar&&) noexcept = default;

device_scalar(device_scalar const&) = delete;
device_scalar& operator=(device_scalar const&) = delete;

device_scalar() = delete;

explicit device_scalar(
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref())
: rmm::device_scalar<T>(stream, mr), bounce_buffer{make_host_vector<T>(1, stream)}
{
}

explicit device_scalar(
T const& initial_value,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref())
: rmm::device_scalar<T>(stream, mr), bounce_buffer{make_host_vector<T>(1, stream)}
{
bounce_buffer[0] = initial_value;
cuda_memcpy_async<T>(device_span<T>{this->data(), 1}, bounce_buffer, stream);
}

device_scalar(device_scalar const& other,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref())
: rmm::device_scalar<T>(other, stream, mr), bounce_buffer{make_host_vector<T>(1, stream)}
{
}

[[nodiscard]] T value(rmm::cuda_stream_view stream) const
{
cuda_memcpy<T>(bounce_buffer, device_span<T const>{this->data(), 1}, stream);
return bounce_buffer[0];
}

void set_value_async(T const& value, rmm::cuda_stream_view stream)
{
bounce_buffer[0] = value;
cuda_memcpy_async<T>(device_span<T>{this->data(), 1}, bounce_buffer, stream);
}

void set_value_async(T&& value, rmm::cuda_stream_view stream)
{
bounce_buffer[0] = std::move(value);
cuda_memcpy_async<T>(device_span<T>{this->data(), 1}, bounce_buffer, stream);
}

void set_value_to_zero_async(rmm::cuda_stream_view stream) { set_value_async(T{}, stream); }

private:
mutable cudf::detail::host_vector<T> bounce_buffer;
};

} // namespace detail
} // namespace CUDF_EXPORT cudf
Loading

0 comments on commit 96d122f

Please sign in to comment.