Skip to content

Commit

Permalink
Abstract and share code for fixed window clamped iterators
Browse files Browse the repository at this point in the history
Now neither the ungrouped nor the grouped fixed size rolling window
calculations need to materialise the preceding and following columns.
  • Loading branch information
wence- committed Dec 18, 2024
1 parent 3d3e80b commit 57d318c
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 130 deletions.
22 changes: 7 additions & 15 deletions cpp/src/rolling/detail/rolling_fixed_window.cu
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

#include "rolling.cuh"
#include "rolling_utils.cuh"

#include <cudf/detail/aggregation/aggregation.hpp>
#include <cudf/detail/iterator.cuh>
Expand Down Expand Up @@ -60,21 +61,12 @@ std::unique_ptr<column> rolling_window(column_view const& input,
stream,
mr);
} else {
auto preceding = cudf::detail::make_counting_transform_iterator(
size_type{0},
cuda::proclaim_return_type<size_type>(
[preceding_window, num_rows = input.size()] __device__(size_type i) -> size_type {
// No overflow, since 0 <= i < num_rows <= size_type::max, so i + 1 is always defined.
return cuda::std::min(i + 1, cuda::std::max(preceding_window, i + 1 - num_rows));
}));

auto following = cudf::detail::make_counting_transform_iterator(
size_type{0},
cuda::proclaim_return_type<size_type>(
[following_window, num_rows = input.size()] __device__(size_type i) -> size_type {
// No overflow, since 0 <= i < num_rows <= size_type::max, so -(i+1) is always defined.
return cuda::std::max(-i - 1, cuda::std::min(following_window, num_rows - i - 1));
}));
namespace utils = cudf::detail::rolling;
auto groups = utils::ungrouped{input.size()};
auto preceding =
utils::make_clamped_window_iterator<utils::direction::PRECEDING>(preceding_window, groups);
auto following =
utils::make_clamped_window_iterator<utils::direction::FOLLOWING>(following_window, groups);
return cudf::detail::rolling_window(
input, default_outputs, preceding, following, min_periods, agg, stream, mr);
}
Expand Down
119 changes: 119 additions & 0 deletions cpp/src/rolling/detail/rolling_utils.cuh
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <cudf/detail/iterator.cuh>
#include <cudf/types.hpp>

#include <cuda/functional>

namespace cudf {

namespace detail::rolling {

/**
 * @brief Group descriptor used when rolling over a column without any grouping.
 *
 * Every row is reported as belonging to a single group with label `0`, which starts at
 * row `0` and ends at `num_rows`.
 *
 * @note This exists so that grouped and ungrouped iterator construction can share one
 * interface.
 */
struct ungrouped {
  cudf::size_type num_rows;  ///< The number of rows to be rolled over.

  /// Group label of a row: always `0`; the row index is ignored.
  [[nodiscard]] __device__ constexpr cudf::size_type label(cudf::size_type /*row*/) const noexcept
  {
    return cudf::size_type{0};
  }

  /// Start of a group: always `0`; the label is ignored.
  [[nodiscard]] __device__ constexpr cudf::size_type start(cudf::size_type /*label*/) const noexcept
  {
    return cudf::size_type{0};
  }

  /// End of a group: always `num_rows`; the label is ignored.
  [[nodiscard]] __device__ constexpr cudf::size_type end(cudf::size_type /*label*/) const noexcept
  {
    return this->num_rows;
  }
};

/**
 * @brief Group descriptor used when rolling over a column split into groups.
 *
 * Group membership and extents are looked up in two caller-provided arrays. Both pointers
 * are dereferenced in device code only, so they must be readable from the device.
 *
 * @note This exists so that grouped and ungrouped iterator construction can share one
 * interface.
 */
struct grouped {
  cudf::size_type const* labels;   ///< Group labels, mapping each input row to its group.
  cudf::size_type const* offsets;  ///< Group offsets, providing the endpoints of each group.

  /// Group label of row `i`, read from `labels`.
  [[nodiscard]] __device__ constexpr cudf::size_type label(cudf::size_type i) const noexcept
  {
    return this->labels[i];
  }

  /// Start of the group `label`, read from `offsets[label]`.
  [[nodiscard]] __device__ constexpr cudf::size_type start(cudf::size_type label) const noexcept
  {
    return this->offsets[label];
  }

  /// End of the group `label`, read from the entry after its start, `offsets[label + 1]`.
  [[nodiscard]] __device__ constexpr cudf::size_type end(cudf::size_type label) const noexcept
  {
    auto const next = label + 1;
    return this->offsets[next];
  }
};

/**
 * @brief Selects which side of the current row a window offset refers to.
 *
 * Used as a compile-time tag to choose the clamping formula for a window iterator.
 */
enum class direction : bool {
  PRECEDING = false,  ///< The offset counts rows before (towards the start of) the group.
  FOLLOWING = true,   ///< The offset counts rows after (towards the end of) the group.
};

/**
 * @brief Functor that clamps a fixed row-based window offset to the bounds of a row's group.
 *
 * For row `i`, the group bounds `start` and `end` are obtained from `groups`, and `delta` is
 * clamped so that the resulting window does not extend past them:
 *  - `PRECEDING`: the result is `delta` clamped to `[i + 1 - end, i + 1 - start]`.
 *  - `FOLLOWING`: the result is `delta` clamped to `[start - i - 1, end - i - 1]`.
 *
 * @tparam Grouping  Group descriptor providing `label(i)`, `start(label)` and `end(label)`.
 * @tparam Direction Whether `delta` is a `PRECEDING` or a `FOLLOWING` offset.
 */
template <typename Grouping, direction Direction>
struct fixed_window_clamper {
  Grouping groups;        ///< Descriptor used to look up the group bounds of each row.
  cudf::size_type delta;  ///< The unclamped window offset, identical for every row.

  /**
   * @brief Compute the clamped window offset for row `i`.
   *
   * The call operator is `const`: it only reads the stored state, so the functor can be
   * invoked through const objects and const iterators.
   *
   * @param i Index of the row; assumed to satisfy `start <= i < end` for its own group.
   * @return `delta` clamped to the bounds of the group containing row `i`.
   */
  [[nodiscard]] __device__ cudf::size_type operator()(cudf::size_type i) const
  {
    auto const label = groups.label(i);
    auto const start = groups.start(label);
    auto const end   = groups.end(label);
    if constexpr (Direction == direction::PRECEDING) {
      // Upper bound: rows from the group start up to and including row i.
      // Lower bound: minus the number of rows after row i in the group.
      return cuda::std::min(i + 1 - start, cuda::std::max(delta, i + 1 - end));
    } else {
      // Lower bound: minus the number of rows from the group start up to and including row i.
      // Upper bound: the number of rows after row i in the group.
      return cuda::std::max(start - i - 1, cuda::std::min(delta, end - i - 1));
    }
  }
};

/**
 * @brief Build a counting-transform iterator yielding group-clamped row-based window offsets.
 *
 * The iterator counts from `0`; dereferencing it at position `i` applies a
 * `fixed_window_clamper` holding `grouper` and `delta` to `i`.
 *
 * @tparam Direction Whether the window is `PRECEDING` or `FOLLOWING`.
 * @tparam Grouping  The type of the group specification.
 * @param delta   The (unclamped) window offset.
 * @param grouper The grouping object used to look up group bounds.
 *
 * @return An iterator suitable for passing to `cudf::detail::rolling_window`
 */
template <direction Direction, typename Grouping>
[[nodiscard]] auto inline make_clamped_window_iterator(cudf::size_type delta, Grouping grouper)
{
  using clamper_type = fixed_window_clamper<Grouping, Direction>;
  auto const first   = cudf::size_type{0};
  return cudf::detail::make_counting_transform_iterator(first, clamper_type{grouper, delta});
}
} // namespace detail::rolling
} // namespace cudf
124 changes: 9 additions & 115 deletions cpp/src/rolling/grouped_rolling.cu
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "detail/range_window_bounds.hpp"
#include "detail/rolling.cuh"
#include "detail/rolling_jit.hpp"
#include "detail/rolling_utils.cuh"

#include <cudf/detail/iterator.cuh>
#include <cudf/detail/nvtx/ranges.hpp>
Expand All @@ -43,109 +44,6 @@ namespace cudf {

namespace detail {

/// Functor computing the preceding window size of a row, limited by its group's bounds.
///
/// `preceding_less_than_1` selects the bound applied to the requested window: when true the
/// window is limited by the end of the group, otherwise by the start of the group.
template <bool preceding_less_than_1>
struct row_based_preceding_calc {
  cudf::size_type const* _group_offsets_begin;
  cudf::size_type const* _group_labels_begin;
  cudf::size_type const _preceding_window;

  /// Captures raw pointers to the offsets and labels buffers together with the window size.
  row_based_preceding_calc(rmm::device_uvector<cudf::size_type> const& group_offsets,
                           rmm::device_uvector<cudf::size_type> const& group_labels,
                           cudf::size_type const& preceding_window)
    : _group_offsets_begin{group_offsets.data()},
      _group_labels_begin{group_labels.data()},
      _preceding_window{preceding_window}
  {
  }

  /// Returns the preceding window size for row `idx`.
  __device__ cudf::size_type operator()(cudf::size_type const& idx) const
  {
    auto const label = _group_labels_begin[idx];
    if constexpr (preceding_less_than_1) {  // where 1 indicates only the current row.
      // The window may not reach past the last row of the group.
      auto const last_row = _group_offsets_begin[label + 1] - 1;
      return thrust::maximum{}(_preceding_window, -(last_row - idx));
    } else {
      // Preceding includes current row, hence the `+ 1`.
      auto const rows_from_group_start = idx - _group_offsets_begin[label] + 1;
      return thrust::minimum{}(_preceding_window, rows_from_group_start);
    }
  }
};

/// Materializes the per-row preceding window sizes as a column, respecting group boundaries.
/// For example, with a preceding window of 5:
///   1. the first row of a group gets a preceding value of 1,
///   2. the second row of a group gets a preceding value of 2, and so on.
/// The functor variant is chosen by whether `preceding_window` is less than 1.
std::unique_ptr<cudf::column> make_preceding_column(
  rmm::device_uvector<cudf::size_type> const& group_offsets,
  rmm::device_uvector<cudf::size_type> const& group_labels,
  cudf::size_type const& preceding_window,
  cudf::size_type const& num_rows,
  rmm::cuda_stream_view stream)
{
  if (preceding_window >= 1) {
    auto const clamp_at_group_start =
      row_based_preceding_calc<false>(group_offsets, group_labels, preceding_window);
    return cudf::detail::expand_to_column(clamp_at_group_start, num_rows, stream);
  }

  auto const clamp_at_group_end =
    row_based_preceding_calc<true>(group_offsets, group_labels, preceding_window);
  return cudf::detail::expand_to_column(clamp_at_group_end, num_rows, stream);
}

/// Functor computing the following window size of a row, limited by its group's bounds.
///
/// `following_less_than_0` selects the bound applied to the requested window: when true the
/// window is limited by the start of the group, otherwise by the end of the group.
template <bool following_less_than_0>
struct row_based_following_calc {
  cudf::size_type const* _group_offsets_begin;
  cudf::size_type const* _group_labels_begin;
  cudf::size_type const _following_window;

  /// Captures raw pointers to the offsets and labels buffers together with the window size.
  row_based_following_calc(rmm::device_uvector<cudf::size_type> const& group_offsets,
                           rmm::device_uvector<cudf::size_type> const& group_labels,
                           cudf::size_type const& following_window)
    : _group_offsets_begin{group_offsets.data()},
      _group_labels_begin{group_labels.data()},
      _following_window{following_window}
  {
  }

  /// Returns the following window size for row `idx`.
  __device__ cudf::size_type operator()(cudf::size_type const& idx) const
  {
    auto const label = _group_labels_begin[idx];
    if constexpr (following_less_than_0) {
      // The window may not reach before the first row of the group.
      auto const first_row = _group_offsets_begin[label];
      return thrust::maximum{}(_following_window, -(idx - first_row) - 1);
    } else {
      // Cannot fall off the end, since offsets is capped with `input.size()`.
      auto const last_row = _group_offsets_begin[label + 1] - 1;
      return thrust::minimum{}(_following_window, last_row - idx);
    }
  }
};

/// Materializes the per-row following window sizes as a column, respecting group boundaries.
/// For example, with a following window of 5:
///   1. the last row of a group gets a following value of 0,
///   2. the second-to-last row of a group gets a following value of 1, and so on.
/// The functor variant is chosen by whether `following_window` is negative.
std::unique_ptr<cudf::column> make_following_column(
  rmm::device_uvector<cudf::size_type> const& group_offsets,
  rmm::device_uvector<cudf::size_type> const& group_labels,
  cudf::size_type const& following_window,
  cudf::size_type const& num_rows,
  rmm::cuda_stream_view stream)
{
  if (following_window >= 0) {
    auto const clamp_at_group_end =
      row_based_following_calc<false>(group_offsets, group_labels, following_window);
    return cudf::detail::expand_to_column(clamp_at_group_end, num_rows, stream);
  }

  auto const clamp_at_group_start =
    row_based_following_calc<true>(group_offsets, group_labels, following_window);
  return cudf::detail::expand_to_column(clamp_at_group_start, num_rows, stream);
}

std::unique_ptr<column> grouped_rolling_window(table_view const& group_keys,
column_view const& input,
column_view const& default_outputs,
Expand Down Expand Up @@ -225,18 +123,14 @@ std::unique_ptr<column> grouped_rolling_window(table_view const& group_keys,
stream,
mr);
} else {
auto const preceding_column =
make_preceding_column(group_offsets, group_labels, preceding_window, input.size(), stream);
auto const following_column =
make_following_column(group_offsets, group_labels, following_window, input.size(), stream);
return cudf::detail::rolling_window(input,
default_outputs,
preceding_column->view().begin<cudf::size_type>(),
following_column->view().begin<cudf::size_type>(),
min_periods,
aggr,
stream,
mr);
namespace utils = cudf::detail::rolling;
auto groups = utils::grouped{group_labels.data(), group_offsets.data()};
auto preceding =
utils::make_clamped_window_iterator<utils::direction::PRECEDING>(preceding_window, groups);
auto following =
utils::make_clamped_window_iterator<utils::direction::FOLLOWING>(following_window, groups);
return cudf::detail::rolling_window(
input, default_outputs, preceding, following, min_periods, aggr, stream, mr);
}
}

Expand Down

0 comments on commit 57d318c

Please sign in to comment.