diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index d3bf7019e35..559826ac232 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -394,11 +394,14 @@ add_library( src/filling/repeat.cu src/filling/sequence.cu src/groupby/groupby.cu + src/groupby/hash/compute_aggregations.cu + src/groupby/hash/compute_aggregations_null.cu + src/groupby/hash/compute_global_memory_aggs.cu + src/groupby/hash/compute_global_memory_aggs_null.cu src/groupby/hash/compute_groupby.cu src/groupby/hash/compute_mapping_indices.cu src/groupby/hash/compute_mapping_indices_null.cu src/groupby/hash/compute_shared_memory_aggs.cu - src/groupby/hash/compute_single_pass_aggs.cu src/groupby/hash/create_sparse_results_table.cu src/groupby/hash/flatten_single_pass_aggs.cpp src/groupby/hash/groupby.cu diff --git a/cpp/src/groupby/groupby.cu b/cpp/src/groupby/groupby.cu index cc0682b68b9..6eb82618e2a 100644 --- a/cpp/src/groupby/groupby.cu +++ b/cpp/src/groupby/groupby.cu @@ -29,7 +29,6 @@ #include #include #include -#include #include #include #include diff --git a/cpp/src/groupby/hash/compute_aggregations.cu b/cpp/src/groupby/hash/compute_aggregations.cu new file mode 100644 index 00000000000..cac6c2224f0 --- /dev/null +++ b/cpp/src/groupby/hash/compute_aggregations.cu @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "compute_aggregations.cuh" +#include "compute_aggregations.hpp" + +namespace cudf::groupby::detail::hash { +template rmm::device_uvector compute_aggregations( + int64_t num_rows, + bool skip_rows_with_nulls, + bitmask_type const* row_bitmask, + global_set_t& global_set, + cudf::host_span requests, + cudf::detail::result_cache* sparse_results, + rmm::cuda_stream_view stream); +} // namespace cudf::groupby::detail::hash diff --git a/cpp/src/groupby/hash/compute_aggregations.cuh b/cpp/src/groupby/hash/compute_aggregations.cuh new file mode 100644 index 00000000000..e8b29a0e7a8 --- /dev/null +++ b/cpp/src/groupby/hash/compute_aggregations.cuh @@ -0,0 +1,185 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+#pragma once
+
+#include "compute_aggregations.hpp"
+#include "compute_global_memory_aggs.hpp"
+#include "compute_mapping_indices.hpp"
+#include "compute_shared_memory_aggs.hpp"
+#include "create_sparse_results_table.hpp"
+#include "flatten_single_pass_aggs.hpp"
+#include "helpers.cuh"
+#include "single_pass_functors.cuh"
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include
+#include
+#include
+
+#include
+#include
+#include
+
+#include
+#include
+#include
+
+namespace cudf::groupby::detail::hash {
+/**
+ * @brief Computes all aggregations from `requests` that require a single pass
+ * over the data and stores the results in `sparse_results`
+ */
+template
+rmm::device_uvector compute_aggregations(
+  int64_t num_rows,
+  bool skip_rows_with_nulls,
+  bitmask_type const* row_bitmask,
+  SetType& global_set,
+  cudf::host_span requests,
+  cudf::detail::result_cache* sparse_results,
+  rmm::cuda_stream_view stream)
+{
+  // flatten the aggs to a table that can be operated on by aggregate_row
+  auto [flattened_values, agg_kinds, aggs] = flatten_single_pass_aggs(requests);
+  auto const d_agg_kinds = cudf::detail::make_device_uvector_async(
+    agg_kinds, stream, rmm::mr::get_current_device_resource());
+
+  auto const grid_size =
+    max_occupancy_grid_size>(num_rows);
+  auto const available_shmem_size = get_available_shared_memory_size(grid_size);
+  auto const has_sufficient_shmem =
+    available_shmem_size > (compute_shmem_offsets_size(flattened_values.num_columns()) * 2);
+  auto const has_dictionary_request = std::any_of(
+    requests.begin(), requests.end(), [](cudf::groupby::aggregation_request const& request) {
+      return cudf::is_dictionary(request.values.type());
+    });
+  auto const is_shared_memory_compatible = !has_dictionary_request and has_sufficient_shmem;
+
+  // Performs naive global memory aggregations when the workload is not compatible with shared
+  // memory, such as when aggregating dictionary columns or when there is insufficient dynamic
+  // shared memory for shared memory aggregations.
+  if (!is_shared_memory_compatible) {
+    return compute_global_memory_aggs(num_rows,
+                                      skip_rows_with_nulls,
+                                      row_bitmask,
+                                      flattened_values,
+                                      d_agg_kinds.data(),
+                                      agg_kinds,
+                                      global_set,
+                                      aggs,
+                                      sparse_results,
+                                      stream);
+  }
+
+  // 'populated_keys' contains inserted row_indices (keys) of global hash set
+  rmm::device_uvector populated_keys(num_rows, stream);
+  // 'local_mapping_index' maps from the global row index of the input table to its block-wise rank
+  rmm::device_uvector local_mapping_index(num_rows, stream);
+  // 'global_mapping_index' maps from the block-wise rank to the row index of global aggregate table
+  rmm::device_uvector global_mapping_index(grid_size * GROUPBY_SHM_MAX_ELEMENTS,
+                                           stream);
+  rmm::device_uvector block_cardinality(grid_size, stream);
+
+  // Flag indicating whether a global memory aggregation fallback is required or not
+  rmm::device_scalar needs_global_memory_fallback(stream);
+
+  auto global_set_ref = global_set.ref(cuco::op::insert_and_find);
+
+  compute_mapping_indices(grid_size,
+                          num_rows,
+                          global_set_ref,
+                          row_bitmask,
+                          skip_rows_with_nulls,
+                          local_mapping_index.data(),
+                          global_mapping_index.data(),
+                          block_cardinality.data(),
+                          needs_global_memory_fallback.data(),
+                          stream);
+
+  cuda::std::atomic_flag h_needs_fallback;
+  // Cannot use `device_scalar::value` as it requires a copy constructor, which
+  // `atomic_flag` doesn't have.
+  CUDF_CUDA_TRY(cudaMemcpyAsync(&h_needs_fallback,
+                                needs_global_memory_fallback.data(),
+                                sizeof(cuda::std::atomic_flag),
+                                cudaMemcpyDefault,
+                                stream.value()));
+  stream.synchronize();
+  auto const needs_fallback = h_needs_fallback.test();
+
+  // make table that will hold sparse results
+  cudf::table sparse_table = create_sparse_results_table(flattened_values,
+                                                         d_agg_kinds.data(),
+                                                         agg_kinds,
+                                                         needs_fallback,
+                                                         global_set,
+                                                         populated_keys,
+                                                         stream);
+  // prepare to launch kernel to do the actual aggregation
+  auto d_values = table_device_view::create(flattened_values, stream);
+  auto d_sparse_table = mutable_table_device_view::create(sparse_table, stream);
+
+  compute_shared_memory_aggs(grid_size,
+                             available_shmem_size,
+                             num_rows,
+                             row_bitmask,
+                             skip_rows_with_nulls,
+                             local_mapping_index.data(),
+                             global_mapping_index.data(),
+                             block_cardinality.data(),
+                             *d_values,
+                             *d_sparse_table,
+                             d_agg_kinds.data(),
+                             stream);
+
+  // The shared memory groupby is designed so that each thread block can handle up to 128 unique
+  // keys. When a block reaches this cardinality limit, shared memory becomes insufficient to store
+  // the temporary aggregation results. In these situations, we must fall back to a global memory
+  // aggregator to process the remaining aggregation requests.
+  if (needs_fallback) {
+    auto const stride = GROUPBY_BLOCK_SIZE * grid_size;
+    thrust::for_each_n(rmm::exec_policy_nosync(stream),
+                       thrust::counting_iterator{0},
+                       num_rows,
+                       global_memory_fallback_fn{global_set_ref,
+                                                 *d_values,
+                                                 *d_sparse_table,
+                                                 d_agg_kinds.data(),
+                                                 block_cardinality.data(),
+                                                 stride,
+                                                 row_bitmask,
+                                                 skip_rows_with_nulls});
+    extract_populated_keys(global_set, populated_keys, stream);
+  }
+
+  // Add results back to sparse_results cache
+  auto sparse_result_cols = sparse_table.release();
+  for (size_t i = 0; i < aggs.size(); i++) {
+    // Note that the cache will make a copy of this temporary aggregation
+    sparse_results->add_result(
+      flattened_values.column(i), *aggs[i], std::move(sparse_result_cols[i]));
+  }
+
+  return populated_keys;
+}
+}  // namespace cudf::groupby::detail::hash
diff --git a/cpp/src/groupby/hash/compute_single_pass_aggs.hpp b/cpp/src/groupby/hash/compute_aggregations.hpp
similarity index 70%
rename from cpp/src/groupby/hash/compute_single_pass_aggs.hpp
rename to cpp/src/groupby/hash/compute_aggregations.hpp
index a7434bdf61a..829c3c808b0 100644
--- a/cpp/src/groupby/hash/compute_single_pass_aggs.hpp
+++ b/cpp/src/groupby/hash/compute_aggregations.hpp
@@ -21,6 +21,7 @@
 #include
 #include
+#include
 namespace cudf::groupby::detail::hash {
 /**
@@ -28,11 +29,12 @@ namespace cudf::groupby::detail::hash {
  * over the data and stores the results in `sparse_results`
  */
 template
-void compute_single_pass_aggs(int64_t num_keys,
-                              bool skip_rows_with_nulls,
-                              bitmask_type const* row_bitmask,
-                              SetType set,
-                              cudf::host_span requests,
-                              cudf::detail::result_cache* sparse_results,
-                              rmm::cuda_stream_view stream);
+rmm::device_uvector compute_aggregations(
+  int64_t num_rows,
+  bool skip_rows_with_nulls,
+  bitmask_type const* row_bitmask,
+  SetType& global_set,
+  cudf::host_span requests,
+  cudf::detail::result_cache* sparse_results,
+  rmm::cuda_stream_view stream);
 }  // namespace cudf::groupby::detail::hash
diff --git a/cpp/src/groupby/hash/compute_aggregations_null.cu b/cpp/src/groupby/hash/compute_aggregations_null.cu
new file mode 100644
index 00000000000..1d7184227ea
--- /dev/null
+++ b/cpp/src/groupby/hash/compute_aggregations_null.cu
@@ -0,0 +1,29 @@
+/*
+ * Copyright (c)
2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "compute_aggregations.cuh" +#include "compute_aggregations.hpp" + +namespace cudf::groupby::detail::hash { +template rmm::device_uvector compute_aggregations( + int64_t num_rows, + bool skip_rows_with_nulls, + bitmask_type const* row_bitmask, + nullable_global_set_t& global_set, + cudf::host_span requests, + cudf::detail::result_cache* sparse_results, + rmm::cuda_stream_view stream); +} // namespace cudf::groupby::detail::hash diff --git a/cpp/src/groupby/hash/compute_global_memory_aggs.cu b/cpp/src/groupby/hash/compute_global_memory_aggs.cu new file mode 100644 index 00000000000..6025686953e --- /dev/null +++ b/cpp/src/groupby/hash/compute_global_memory_aggs.cu @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "compute_global_memory_aggs.cuh" +#include "compute_global_memory_aggs.hpp" + +namespace cudf::groupby::detail::hash { +template rmm::device_uvector compute_global_memory_aggs( + cudf::size_type num_rows, + bool skip_rows_with_nulls, + bitmask_type const* row_bitmask, + cudf::table_view const& flattened_values, + cudf::aggregation::Kind const* d_agg_kinds, + std::vector const& agg_kinds, + global_set_t& global_set, + std::vector>& aggregations, + cudf::detail::result_cache* sparse_results, + rmm::cuda_stream_view stream); +} // namespace cudf::groupby::detail::hash diff --git a/cpp/src/groupby/hash/compute_global_memory_aggs.cuh b/cpp/src/groupby/hash/compute_global_memory_aggs.cuh new file mode 100644 index 00000000000..00db149c6d9 --- /dev/null +++ b/cpp/src/groupby/hash/compute_global_memory_aggs.cuh @@ -0,0 +1,89 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#pragma once + +#include "compute_global_memory_aggs.hpp" +#include "create_sparse_results_table.hpp" +#include "flatten_single_pass_aggs.hpp" +#include "helpers.cuh" +#include "single_pass_functors.cuh" + +#include +#include +#include +#include + +#include +#include +#include + +#include +#include + +#include +#include + +namespace cudf::groupby::detail::hash { +template +rmm::device_uvector compute_global_memory_aggs( + cudf::size_type num_rows, + bool skip_rows_with_nulls, + bitmask_type const* row_bitmask, + cudf::table_view const& flattened_values, + cudf::aggregation::Kind const* d_agg_kinds, + std::vector const& agg_kinds, + SetType& global_set, + std::vector>& aggregations, + cudf::detail::result_cache* sparse_results, + rmm::cuda_stream_view stream) +{ + auto constexpr uses_global_memory_aggs = true; + // 'populated_keys' contains inserted row_indices (keys) of global hash set + rmm::device_uvector populated_keys(num_rows, stream); + + // make table that will hold sparse results + cudf::table sparse_table = create_sparse_results_table(flattened_values, + d_agg_kinds, + agg_kinds, + uses_global_memory_aggs, + global_set, + populated_keys, + stream); + + // prepare to launch kernel to do the actual aggregation + auto d_values = table_device_view::create(flattened_values, stream); + auto d_sparse_table = mutable_table_device_view::create(sparse_table, stream); + auto global_set_ref = global_set.ref(cuco::op::insert_and_find); + + thrust::for_each_n( + rmm::exec_policy_nosync(stream), + thrust::counting_iterator{0}, + num_rows, + hash::compute_single_pass_aggs_fn{ + global_set_ref, *d_values, *d_sparse_table, d_agg_kinds, row_bitmask, skip_rows_with_nulls}); + extract_populated_keys(global_set, populated_keys, stream); + + // Add results back to sparse_results cache + auto sparse_result_cols = sparse_table.release(); + for (size_t i = 0; i < aggregations.size(); i++) { + // Note that the cache will make a copy of this temporary aggregation + sparse_results->add_result( + flattened_values.column(i), *aggregations[i], std::move(sparse_result_cols[i])); + } + + return populated_keys; +} +} // namespace cudf::groupby::detail::hash diff --git a/cpp/src/groupby/hash/compute_global_memory_aggs.hpp b/cpp/src/groupby/hash/compute_global_memory_aggs.hpp new file mode 100644 index 00000000000..0777b9ffd93 --- /dev/null +++ b/cpp/src/groupby/hash/compute_global_memory_aggs.hpp @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#pragma once + +#include +#include +#include +#include + +#include +#include + +#include +#include + +namespace cudf::groupby::detail::hash { +template +rmm::device_uvector compute_global_memory_aggs( + cudf::size_type num_rows, + bool skip_rows_with_nulls, + bitmask_type const* row_bitmask, + cudf::table_view const& flattened_values, + cudf::aggregation::Kind const* d_agg_kinds, + std::vector const& agg_kinds, + SetType& global_set, + std::vector>& aggregations, + cudf::detail::result_cache* sparse_results, + rmm::cuda_stream_view stream); +} // namespace cudf::groupby::detail::hash diff --git a/cpp/src/groupby/hash/compute_global_memory_aggs_null.cu b/cpp/src/groupby/hash/compute_global_memory_aggs_null.cu new file mode 100644 index 00000000000..209e2b7f20a --- /dev/null +++ b/cpp/src/groupby/hash/compute_global_memory_aggs_null.cu @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "compute_global_memory_aggs.cuh" +#include "compute_global_memory_aggs.hpp" + +namespace cudf::groupby::detail::hash { +template rmm::device_uvector compute_global_memory_aggs( + cudf::size_type num_rows, + bool skip_rows_with_nulls, + bitmask_type const* row_bitmask, + cudf::table_view const& flattened_values, + cudf::aggregation::Kind const* d_agg_kinds, + std::vector const& agg_kinds, + nullable_global_set_t& global_set, + std::vector>& aggregations, + cudf::detail::result_cache* sparse_results, + rmm::cuda_stream_view stream); +} // namespace cudf::groupby::detail::hash diff --git a/cpp/src/groupby/hash/compute_groupby.cu b/cpp/src/groupby/hash/compute_groupby.cu index 59457bea694..e1dbf2a3d9e 100644 --- a/cpp/src/groupby/hash/compute_groupby.cu +++ b/cpp/src/groupby/hash/compute_groupby.cu @@ -14,8 +14,8 @@ * limitations under the License. */ +#include "compute_aggregations.hpp" #include "compute_groupby.hpp" -#include "compute_single_pass_aggs.hpp" #include "helpers.cuh" #include "sparse_to_dense_results.hpp" @@ -29,7 +29,6 @@ #include #include -#include #include #include @@ -38,18 +37,6 @@ #include namespace cudf::groupby::detail::hash { -template -rmm::device_uvector extract_populated_keys(SetType const& key_set, - size_type num_keys, - rmm::cuda_stream_view stream) -{ - rmm::device_uvector populated_keys(num_keys, stream); - auto const keys_end = key_set.retrieve_all(populated_keys.begin(), stream.value()); - - populated_keys.resize(std::distance(populated_keys.begin(), keys_end), stream); - return populated_keys; -} - template std::unique_ptr compute_groupby(table_view const& keys, host_span requests, @@ -67,8 +54,8 @@ std::unique_ptr
compute_groupby(table_view const& keys, // column is indexed by the hash set cudf::detail::result_cache sparse_results(requests.size()); - auto const set = cuco::static_set{ - num_keys, + auto set = cuco::static_set{ + cuco::extent{num_keys}, cudf::detail::CUCO_DESIRED_LOAD_FACTOR, // 50% load factor cuco::empty_key{cudf::detail::CUDF_SIZE_TYPE_SENTINEL}, d_row_equal, @@ -84,17 +71,13 @@ std::unique_ptr
compute_groupby(table_view const& keys, : rmm::device_buffer{}; // Compute all single pass aggs first - compute_single_pass_aggs(num_keys, - skip_rows_with_nulls, - static_cast(row_bitmask.data()), - set.ref(cuco::insert_and_find), - requests, - &sparse_results, - stream); - - // Extract the populated indices from the hash set and create a gather map. - // Gathering using this map from sparse results will give dense results. - auto gather_map = extract_populated_keys(set, keys.num_rows(), stream); + auto gather_map = compute_aggregations(num_keys, + skip_rows_with_nulls, + static_cast(row_bitmask.data()), + set, + requests, + &sparse_results, + stream); // Compact all results from sparse_results and insert into cache sparse_to_dense_results(requests, @@ -114,12 +97,6 @@ std::unique_ptr
compute_groupby(table_view const& keys, mr); } -template rmm::device_uvector extract_populated_keys( - global_set_t const& key_set, size_type num_keys, rmm::cuda_stream_view stream); - -template rmm::device_uvector extract_populated_keys( - nullable_global_set_t const& key_set, size_type num_keys, rmm::cuda_stream_view stream); - template std::unique_ptr
compute_groupby( table_view const& keys, host_span requests, diff --git a/cpp/src/groupby/hash/compute_groupby.hpp b/cpp/src/groupby/hash/compute_groupby.hpp index 7bb3a60ff07..77243dc0a4f 100644 --- a/cpp/src/groupby/hash/compute_groupby.hpp +++ b/cpp/src/groupby/hash/compute_groupby.hpp @@ -22,28 +22,11 @@ #include #include -#include #include #include namespace cudf::groupby::detail::hash { -/** - * @brief Computes and returns a device vector containing all populated keys in - * `key_set`. - * - * @tparam SetType Type of key hash set - * - * @param key_set Key hash set - * @param num_keys Number of input keys - * @param stream CUDA stream used for device memory operations and kernel launches - * @return An array of unique keys contained in `key_set` - */ -template -rmm::device_uvector extract_populated_keys(SetType const& key_set, - size_type num_keys, - rmm::cuda_stream_view stream); - /** * @brief Computes groupby using hash table. * diff --git a/cpp/src/groupby/hash/compute_shared_memory_aggs.cu b/cpp/src/groupby/hash/compute_shared_memory_aggs.cu index 12c02a1865e..f0361ccced2 100644 --- a/cpp/src/groupby/hash/compute_shared_memory_aggs.cu +++ b/cpp/src/groupby/hash/compute_shared_memory_aggs.cu @@ -47,9 +47,8 @@ struct size_of_functor { /// Shared memory data alignment CUDF_HOST_DEVICE cudf::size_type constexpr ALIGNMENT = 8; -// Prepares shared memory data required by each output column, exits if -// no enough memory space to perform the shared memory aggregation for the -// current output column +// Allocates shared memory required for output columns. Exits if there is insufficient memory to +// perform shared memory aggregation for the current output column. __device__ void calculate_columns_to_aggregate(cudf::size_type& col_start, cudf::size_type& col_end, cudf::mutable_table_device_view output_values, @@ -74,9 +73,7 @@ __device__ void calculate_columns_to_aggregate(cudf::size_type& col_start, ALIGNMENT); auto const next_col_total_size = next_col_size + valid_col_size; - if (bytes_allocated + next_col_total_size > total_agg_size) { - CUDF_UNREACHABLE("Not enough memory for shared memory aggregations"); - } + if (bytes_allocated + next_col_total_size > total_agg_size) { break; } shmem_agg_res_offsets[col_end] = bytes_allocated; shmem_agg_mask_offsets[col_end] = bytes_allocated + next_col_size; @@ -275,7 +272,7 @@ CUDF_KERNEL void single_pass_shmem_aggs_kernel(cudf::size_type num_rows, } } // namespace -std::size_t available_shared_memory_size(cudf::size_type grid_size) +std::size_t get_available_shared_memory_size(cudf::size_type grid_size) { auto const active_blocks_per_sm = cudf::util::div_rounding_up_safe(grid_size, cudf::detail::num_multiprocessors()); @@ -302,11 +299,11 @@ void compute_shared_memory_aggs(cudf::size_type grid_size, { // For each aggregation, need one offset determining where the aggregation is // performed, another indicating the validity of the aggregation - auto const shmem_offsets_size = output_values.num_columns() * sizeof(cudf::size_type); + auto const offsets_size = compute_shmem_offsets_size(output_values.num_columns()); // The rest of shmem is utilized for the actual arrays in shmem - CUDF_EXPECTS(available_shmem_size > shmem_offsets_size * 2, + CUDF_EXPECTS(available_shmem_size > offsets_size * 2, "No enough space for shared memory aggregations"); - auto const shmem_agg_size = available_shmem_size - shmem_offsets_size * 2; + auto const shmem_agg_size = available_shmem_size - offsets_size * 2; single_pass_shmem_aggs_kernel<<>>( num_input_rows, 
row_bitmask, @@ -318,6 +315,6 @@ void compute_shared_memory_aggs(cudf::size_type grid_size, output_values, d_agg_kinds, shmem_agg_size, - shmem_offsets_size); + offsets_size); } } // namespace cudf::groupby::detail::hash diff --git a/cpp/src/groupby/hash/compute_shared_memory_aggs.hpp b/cpp/src/groupby/hash/compute_shared_memory_aggs.hpp index 653821fd53b..346956cdab0 100644 --- a/cpp/src/groupby/hash/compute_shared_memory_aggs.hpp +++ b/cpp/src/groupby/hash/compute_shared_memory_aggs.hpp @@ -22,8 +22,12 @@ #include namespace cudf::groupby::detail::hash { +std::size_t get_available_shared_memory_size(cudf::size_type grid_size); -std::size_t available_shared_memory_size(cudf::size_type grid_size); +std::size_t constexpr compute_shmem_offsets_size(cudf::size_type num_cols) +{ + return sizeof(cudf::size_type) * num_cols; +} void compute_shared_memory_aggs(cudf::size_type grid_size, std::size_t available_shmem_size, @@ -37,5 +41,4 @@ void compute_shared_memory_aggs(cudf::size_type grid_size, cudf::mutable_table_device_view output_values, cudf::aggregation::Kind const* d_agg_kinds, rmm::cuda_stream_view stream); - } // namespace cudf::groupby::detail::hash diff --git a/cpp/src/groupby/hash/compute_single_pass_aggs.cu b/cpp/src/groupby/hash/compute_single_pass_aggs.cu deleted file mode 100644 index e292543e6e9..00000000000 --- a/cpp/src/groupby/hash/compute_single_pass_aggs.cu +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Copyright (c) 2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -#include "compute_single_pass_aggs.hpp" -#include "create_sparse_results_table.hpp" -#include "flatten_single_pass_aggs.hpp" -#include "helpers.cuh" -#include "single_pass_functors.cuh" -#include "var_hash_functor.cuh" - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include - -#include -#include -#include - -namespace cudf::groupby::detail::hash { -/** - * @brief Computes all aggregations from `requests` that require a single pass - * over the data and stores the results in `sparse_results` - */ -template -void compute_single_pass_aggs(int64_t num_keys, - bool skip_rows_with_nulls, - bitmask_type const* row_bitmask, - SetType set, - host_span requests, - cudf::detail::result_cache* sparse_results, - rmm::cuda_stream_view stream) -{ - // flatten the aggs to a table that can be operated on by aggregate_row - auto const [flattened_values, agg_kinds, aggs] = flatten_single_pass_aggs(requests); - - // make table that will hold sparse results - table sparse_table = create_sparse_results_table(flattened_values, agg_kinds, stream); - // prepare to launch kernel to do the actual aggregation - auto d_sparse_table = mutable_table_device_view::create(sparse_table, stream); - auto d_values = table_device_view::create(flattened_values, stream); - auto const d_aggs = cudf::detail::make_device_uvector_async( - agg_kinds, stream, cudf::get_current_device_resource_ref()); - - thrust::for_each_n( - rmm::exec_policy_nosync(stream), - thrust::make_counting_iterator(0), - num_keys, - hash::compute_single_pass_aggs_fn{ - set, *d_values, *d_sparse_table, d_aggs.data(), row_bitmask, skip_rows_with_nulls}); - // Add results back to sparse_results cache - auto sparse_result_cols = sparse_table.release(); - for (size_t i = 0; i < aggs.size(); i++) { - // Note that the cache will make a copy of this temporary aggregation - sparse_results->add_result( - flattened_values.column(i), *aggs[i], std::move(sparse_result_cols[i])); - } -} - -template void compute_single_pass_aggs>( - int64_t num_keys, - bool skip_rows_with_nulls, - bitmask_type const* row_bitmask, - hash_set_ref_t set, - host_span requests, - cudf::detail::result_cache* sparse_results, - rmm::cuda_stream_view stream); - -template void compute_single_pass_aggs>( - int64_t num_keys, - bool skip_rows_with_nulls, - bitmask_type const* row_bitmask, - nullable_hash_set_ref_t set, - host_span requests, - cudf::detail::result_cache* sparse_results, - rmm::cuda_stream_view stream); -} // namespace cudf::groupby::detail::hash diff --git a/cpp/src/groupby/hash/create_sparse_results_table.cu b/cpp/src/groupby/hash/create_sparse_results_table.cu index 22fa4fc584c..bc32e306b3f 100644 --- a/cpp/src/groupby/hash/create_sparse_results_table.cu +++ b/cpp/src/groupby/hash/create_sparse_results_table.cu @@ -15,53 +15,110 @@ */ #include "create_sparse_results_table.hpp" +#include "helpers.cuh" +#include "single_pass_functors.cuh" +#include #include #include -#include -#include -#include #include #include #include +#include + +#include #include #include #include namespace cudf::groupby::detail::hash { +template +void extract_populated_keys(SetType const& key_set, + rmm::device_uvector& populated_keys, + rmm::cuda_stream_view stream) +{ + auto const keys_end = key_set.retrieve_all(populated_keys.begin(), stream.value()); + + populated_keys.resize(std::distance(populated_keys.begin(), keys_end), stream); +} + // make table that will hold sparse results -cudf::table create_sparse_results_table(table_view 
const& flattened_values, - std::vector aggs, +template +cudf::table create_sparse_results_table(cudf::table_view const& flattened_values, + cudf::aggregation::Kind const* d_agg_kinds, + std::vector agg_kinds, + bool direct_aggregations, + GlobalSetType const& global_set, + rmm::device_uvector& populated_keys, rmm::cuda_stream_view stream) { // TODO single allocation - room for performance improvement - std::vector> sparse_columns; - sparse_columns.reserve(flattened_values.num_columns()); - std::transform( - flattened_values.begin(), - flattened_values.end(), - aggs.begin(), - std::back_inserter(sparse_columns), - [stream](auto const& col, auto const& agg) { - bool nullable = - (agg == aggregation::COUNT_VALID or agg == aggregation::COUNT_ALL) - ? false - : (col.has_nulls() or agg == aggregation::VARIANCE or agg == aggregation::STD); - auto mask_flag = (nullable) ? mask_state::ALL_NULL : mask_state::UNALLOCATED; + std::vector> sparse_columns; + std::transform(flattened_values.begin(), + flattened_values.end(), + agg_kinds.begin(), + std::back_inserter(sparse_columns), + [stream](auto const& col, auto const& agg) { + auto const nullable = + (agg == cudf::aggregation::COUNT_VALID or agg == cudf::aggregation::COUNT_ALL) + ? false + : (col.has_nulls() or agg == cudf::aggregation::VARIANCE or + agg == cudf::aggregation::STD); + auto const mask_flag = + (nullable) ? cudf::mask_state::ALL_NULL : cudf::mask_state::UNALLOCATED; + auto const col_type = cudf::is_dictionary(col.type()) + ? cudf::dictionary_column_view(col).keys().type() + : col.type(); + return make_fixed_width_column( + cudf::detail::target_type(col_type, agg), col.size(), mask_flag, stream); + }); + cudf::table sparse_table(std::move(sparse_columns)); + // If no direct aggregations, initialize the sparse table + // only for the keys inserted in global hash set + if (!direct_aggregations) { + auto d_sparse_table = cudf::mutable_table_device_view::create(sparse_table, stream); + extract_populated_keys(global_set, populated_keys, stream); + thrust::for_each_n( + rmm::exec_policy(stream), + thrust::make_counting_iterator(0), + populated_keys.size(), + initialize_sparse_table{populated_keys.data(), *d_sparse_table, d_agg_kinds}); + } + // Else initialize the whole table + else { + cudf::mutable_table_view sparse_table_view = sparse_table.mutable_view(); + cudf::detail::initialize_with_identity(sparse_table_view, agg_kinds, stream); + } + return sparse_table; +} - auto col_type = cudf::is_dictionary(col.type()) - ? 
cudf::dictionary_column_view(col).keys().type() - : col.type(); +template void extract_populated_keys( + global_set_t const& key_set, + rmm::device_uvector& populated_keys, + rmm::cuda_stream_view stream); - return make_fixed_width_column( - cudf::detail::target_type(col_type, agg), col.size(), mask_flag, stream); - }); +template void extract_populated_keys( + nullable_global_set_t const& key_set, + rmm::device_uvector& populated_keys, + rmm::cuda_stream_view stream); - table sparse_table(std::move(sparse_columns)); - mutable_table_view table_view = sparse_table.mutable_view(); - cudf::detail::initialize_with_identity(table_view, aggs, stream); - return sparse_table; -} +template cudf::table create_sparse_results_table( + cudf::table_view const& flattened_values, + cudf::aggregation::Kind const* d_agg_kinds, + std::vector agg_kinds, + bool direct_aggregations, + global_set_t const& global_set, + rmm::device_uvector& populated_keys, + rmm::cuda_stream_view stream); + +template cudf::table create_sparse_results_table( + cudf::table_view const& flattened_values, + cudf::aggregation::Kind const* d_agg_kinds, + std::vector agg_kinds, + bool direct_aggregations, + nullable_global_set_t const& global_set, + rmm::device_uvector& populated_keys, + rmm::cuda_stream_view stream); } // namespace cudf::groupby::detail::hash diff --git a/cpp/src/groupby/hash/create_sparse_results_table.hpp b/cpp/src/groupby/hash/create_sparse_results_table.hpp index c1d4e0d3f20..8155ce852e0 100644 --- a/cpp/src/groupby/hash/create_sparse_results_table.hpp +++ b/cpp/src/groupby/hash/create_sparse_results_table.hpp @@ -15,18 +15,41 @@ */ #pragma once +#include #include #include #include #include #include +#include #include namespace cudf::groupby::detail::hash { +/** + * @brief Computes and returns a device vector containing all populated keys in + * `key_set`. 
+ * + * @tparam SetType Type of the key hash set + * + * @param key_set Key hash set + * @param populated_keys Array of unique keys + * @param stream CUDA stream used for device memory operations and kernel launches + * @return An array of unique keys contained in `key_set` + */ +template +void extract_populated_keys(SetType const& key_set, + rmm::device_uvector& populated_keys, + rmm::cuda_stream_view stream); + // make table that will hold sparse results -cudf::table create_sparse_results_table(table_view const& flattened_values, - std::vector aggs_kinds, +template +cudf::table create_sparse_results_table(cudf::table_view const& flattened_values, + cudf::aggregation::Kind const* d_agg_kinds, + std::vector agg_kinds, + bool direct_aggregations, + GlobalSetType const& global_set, + rmm::device_uvector& populated_keys, rmm::cuda_stream_view stream); } // namespace cudf::groupby::detail::hash diff --git a/cpp/src/groupby/hash/helpers.cuh b/cpp/src/groupby/hash/helpers.cuh index 00836567b4f..f950e03e0fb 100644 --- a/cpp/src/groupby/hash/helpers.cuh +++ b/cpp/src/groupby/hash/helpers.cuh @@ -23,8 +23,6 @@ #include namespace cudf::groupby::detail::hash { -// TODO: similar to `contains_table`, using larger CG size like 2 or 4 for nested -// types and `cg_size = 1`for flat data to improve performance /// Number of threads to handle each input element CUDF_HOST_DEVICE auto constexpr GROUPBY_CG_SIZE = 1; diff --git a/cpp/src/groupby/hash/single_pass_functors.cuh b/cpp/src/groupby/hash/single_pass_functors.cuh index 28a5b578e00..048c9252773 100644 --- a/cpp/src/groupby/hash/single_pass_functors.cuh +++ b/cpp/src/groupby/hash/single_pass_functors.cuh @@ -15,12 +15,14 @@ */ #pragma once -#include +#include "helpers.cuh" + #include -#include -#include +#include +#include +#include -#include +#include namespace cudf::groupby::detail::hash { // TODO: TO BE REMOVED issue tracked via #17171 @@ -104,6 +106,114 @@ struct initialize_shmem { } }; +template +struct initialize_target_element_gmem { + __device__ void operator()(cudf::mutable_column_device_view, cudf::size_type) const noexcept + { + CUDF_UNREACHABLE("Invalid source type and aggregation combination."); + } +}; + +template +struct initialize_target_element_gmem< + Target, + k, + std::enable_if_t() && cudf::is_fixed_width() && + !cudf::is_fixed_point()>> { + __device__ void operator()(cudf::mutable_column_device_view target, + cudf::size_type target_index) const noexcept + { + using DeviceType = cudf::device_storage_type_t; + target.element(target_index) = get_identity(); + } +}; + +template +struct initialize_target_element_gmem< + Target, + k, + std::enable_if_t() && cudf::is_fixed_point()>> { + __device__ void operator()(cudf::mutable_column_device_view target, + cudf::size_type target_index) const noexcept + { + using DeviceType = cudf::device_storage_type_t; + target.element(target_index) = get_identity(); + } +}; + +struct initialize_gmem { + template + __device__ void operator()(cudf::mutable_column_device_view target, + cudf::size_type target_index) const noexcept + { + initialize_target_element_gmem{}(target, target_index); + } +}; + +struct initialize_sparse_table { + cudf::size_type const* row_indices; + cudf::mutable_table_device_view sparse_table; + cudf::aggregation::Kind const* __restrict__ aggs; + initialize_sparse_table(cudf::size_type const* row_indices, + cudf::mutable_table_device_view sparse_table, + cudf::aggregation::Kind const* aggs) + : row_indices(row_indices), sparse_table(sparse_table), aggs(aggs) + { + } + __device__ void 
operator()(cudf::size_type i) + { + auto key_idx = row_indices[i]; + for (auto col_idx = 0; col_idx < sparse_table.num_columns(); col_idx++) { + cudf::detail::dispatch_type_and_aggregation(sparse_table.column(col_idx).type(), + aggs[col_idx], + initialize_gmem{}, + sparse_table.column(col_idx), + key_idx); + } + } +}; + +template +struct global_memory_fallback_fn { + SetType set; + cudf::table_device_view input_values; + cudf::mutable_table_device_view output_values; + cudf::aggregation::Kind const* __restrict__ aggs; + cudf::size_type* block_cardinality; + cudf::size_type stride; + bitmask_type const* __restrict__ row_bitmask; + bool skip_rows_with_nulls; + + global_memory_fallback_fn(SetType set, + cudf::table_device_view input_values, + cudf::mutable_table_device_view output_values, + cudf::aggregation::Kind const* aggs, + cudf::size_type* block_cardinality, + cudf::size_type stride, + bitmask_type const* row_bitmask, + bool skip_rows_with_nulls) + : set(set), + input_values(input_values), + output_values(output_values), + aggs(aggs), + block_cardinality(block_cardinality), + stride(stride), + row_bitmask(row_bitmask), + skip_rows_with_nulls(skip_rows_with_nulls) + { + } + + __device__ void operator()(cudf::size_type i) + { + auto const block_id = (i % stride) / GROUPBY_BLOCK_SIZE; + if (block_cardinality[block_id] >= GROUPBY_CARDINALITY_THRESHOLD and + (not skip_rows_with_nulls or cudf::bit_is_set(row_bitmask, i))) { + auto const result = set.insert_and_find(i); + cudf::detail::aggregate_row(output_values, *result.first, input_values, i, aggs); + } + } +}; + /** * @brief Computes single-pass aggregations and store results into a sparse `output_values` table, * and populate `set` with indices of unique keys
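The shared-memory path in compute_aggregations is only taken when the occupancy-derived grid size and get_available_shared_memory_size leave room for the two per-column offset arrays (compute_shmem_offsets_size(...) * 2) plus the aggregation buffers. A minimal standalone CUDA sketch of that sizing decision follows; it is not libcudf code, and BLOCK_SIZE, dummy_agg_kernel, and the column count are illustrative stand-ins.

// Hypothetical sketch (not libcudf code): derive a grid size from occupancy and check whether
// the per-block shared-memory budget can hold the offset arrays for the output columns.
#include <cuda_runtime.h>
#include <algorithm>
#include <cstdio>

constexpr int BLOCK_SIZE = 128;  // assumed stand-in for GROUPBY_BLOCK_SIZE

__global__ void dummy_agg_kernel(int const* in, int* out, int n)
{
  int const i = blockIdx.x * blockDim.x + threadIdx.x;
  if (i < n) { out[i] = in[i] + 1; }
}

int main()
{
  int num_sms = 0, shmem_per_block = 0;
  cudaDeviceGetAttribute(&num_sms, cudaDevAttrMultiProcessorCount, 0);
  cudaDeviceGetAttribute(&shmem_per_block, cudaDevAttrMaxSharedMemoryPerBlock, 0);

  // Grid size from occupancy: resident blocks per SM for this kernel times the SM count,
  // capped by the amount of work (one thread per input row).
  int blocks_per_sm = 0;
  cudaOccupancyMaxActiveBlocksPerMultiprocessor(&blocks_per_sm, dummy_agg_kernel, BLOCK_SIZE, 0);
  int const num_rows  = 1 << 20;
  int const grid_size =
    std::min((num_rows + BLOCK_SIZE - 1) / BLOCK_SIZE, blocks_per_sm * num_sms);

  // Shared memory must hold two size_type offset arrays (result and validity offsets) per
  // output column plus the aggregation buffers themselves; if even the offsets do not fit,
  // the global-memory path is the only option.
  int const num_output_cols = 4;
  size_t const offsets_size = sizeof(int) * num_output_cols;
  bool const shared_path_ok = static_cast<size_t>(shmem_per_block) > offsets_size * 2;

  std::printf("grid_size=%d shared_path_ok=%d\n", grid_size, shared_path_ok ? 1 : 0);
  return 0;
}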
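compute_mapping_indices and the 128-key cardinality limit are what drive the needs_global_memory_fallback flag and the global_memory_fallback_fn pass above. The toy kernel below shows one way a block can track its distinct-key count in a small shared-memory set and raise a device-wide fallback flag once the limit is hit; SHM_MAX_KEYS, the EMPTY sentinel, integer keys, and linear probing are assumptions of this sketch, not the patch's implementation.

// Hypothetical sketch of per-block cardinality tracking with a global fallback flag.
#include <cuda_runtime.h>
#include <cstdio>
#include <vector>

constexpr int SHM_MAX_KEYS = 128;  // stand-in for GROUPBY_CARDINALITY_THRESHOLD
constexpr int EMPTY        = -1;   // sentinel slot value; keys are assumed non-negative

__global__ void flag_high_cardinality_blocks(int const* keys,
                                             int num_rows,
                                             int* block_cardinality,
                                             int* needs_fallback)
{
  __shared__ int slots[SHM_MAX_KEYS];
  __shared__ int cardinality;
  for (int i = threadIdx.x; i < SHM_MAX_KEYS; i += blockDim.x) { slots[i] = EMPTY; }
  if (threadIdx.x == 0) { cardinality = 0; }
  __syncthreads();

  int const row = blockIdx.x * blockDim.x + threadIdx.x;
  if (row < num_rows) {
    int const key = keys[row];
    for (int probe = 0; probe < SHM_MAX_KEYS; ++probe) {
      int const slot = (key + probe) % SHM_MAX_KEYS;
      int const prev = atomicCAS(&slots[slot], EMPTY, key);
      if (prev == EMPTY) {  // claimed a free slot: one more distinct key in this block
        atomicAdd(&cardinality, 1);
        break;
      }
      if (prev == key) { break; }  // key already present in the block's set
      // Every slot holds some other key: this block exceeds the shared-memory capacity.
      if (probe == SHM_MAX_KEYS - 1) { atomicExch(needs_fallback, 1); }
    }
  }
  __syncthreads();
  if (threadIdx.x == 0) {
    block_cardinality[blockIdx.x] = cardinality;
    if (cardinality >= SHM_MAX_KEYS) { atomicExch(needs_fallback, 1); }
  }
}

int main()
{
  int const num_rows = 1 << 16;
  std::vector<int> h_keys(num_rows);
  for (int i = 0; i < num_rows; ++i) { h_keys[i] = i % 1000; }  // ~1000 groups: forces fallback

  int *d_keys, *d_card, *d_flag;
  int const grid = (num_rows + 255) / 256;
  cudaMalloc(&d_keys, num_rows * sizeof(int));
  cudaMalloc(&d_card, grid * sizeof(int));
  cudaMalloc(&d_flag, sizeof(int));
  cudaMemcpy(d_keys, h_keys.data(), num_rows * sizeof(int), cudaMemcpyHostToDevice);
  cudaMemset(d_flag, 0, sizeof(int));

  flag_high_cardinality_blocks<<<grid, 256>>>(d_keys, num_rows, d_card, d_flag);

  int h_flag = 0;
  cudaMemcpy(&h_flag, d_flag, sizeof(int), cudaMemcpyDeviceToHost);
  std::printf("needs_fallback=%d\n", h_flag);
  cudaFree(d_keys); cudaFree(d_card); cudaFree(d_flag);
  return 0;
}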
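create_sparse_results_table initializes the aggregation targets either per populated key (initialize_sparse_table dispatching to initialize_gmem) or wholesale via initialize_with_identity. The idea in both cases is the same: seed every target with the identity of its operator so the first real value always replaces it. A hedged host-side illustration follows; the agg_kind enum and identity() helper are invented for this sketch.

// Hypothetical host-side analogue of identity initialization for aggregation targets.
#include <algorithm>
#include <cstdio>
#include <limits>
#include <vector>

enum class agg_kind { SUM, MIN, MAX };

template <typename T>
T identity(agg_kind k)
{
  switch (k) {
    case agg_kind::SUM: return T{0};                              // x + 0 == x
    case agg_kind::MIN: return std::numeric_limits<T>::max();     // min(x, max) == x
    case agg_kind::MAX: return std::numeric_limits<T>::lowest();  // max(x, lowest) == x
  }
  return T{};
}

int main()
{
  constexpr int num_groups = 4;
  std::vector<int> sum_col(num_groups), min_col(num_groups), max_col(num_groups);
  std::fill(sum_col.begin(), sum_col.end(), identity<int>(agg_kind::SUM));
  std::fill(min_col.begin(), min_col.end(), identity<int>(agg_kind::MIN));
  std::fill(max_col.begin(), max_col.end(), identity<int>(agg_kind::MAX));
  std::printf("sum=%d min=%d max=%d\n", sum_col[0], min_col[0], max_col[0]);
  return 0;
}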
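compute_aggregations now returns the populated keys directly, and that vector doubles as the gather map that turns the sparse, row-indexed results into dense per-group results in compute_groupby. A small thrust-only sketch of that gather step, assuming plain int sums in place of cudf columns:

// Hypothetical sparse-to-dense gather: dense result i comes from sparse row gather_map[i].
#include <thrust/device_vector.h>
#include <thrust/gather.h>
#include <thrust/host_vector.h>
#include <cstdio>
#include <vector>

int main()
{
  // Sparse results: indexed by input-row index; only rows 0, 3 and 7 carry group results.
  std::vector<int> h_sparse{10, 0, 0, 42, 0, 0, 0, 7, 0, 0};
  std::vector<int> h_map{0, 3, 7};  // populated keys extracted from the hash set

  thrust::device_vector<int> sparse_sums(h_sparse.begin(), h_sparse.end());
  thrust::device_vector<int> gather_map(h_map.begin(), h_map.end());
  thrust::device_vector<int> dense_sums(gather_map.size());

  thrust::gather(gather_map.begin(), gather_map.end(), sparse_sums.begin(), dense_sums.begin());

  thrust::host_vector<int> result = dense_sums;
  for (int v : result) { std::printf("%d ", v); }  // prints: 10 42 7
  std::printf("\n");
  return 0;
}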