diff --git a/benchmarks/CMakeLists.txt b/benchmarks/CMakeLists.txt index c4d4dbc37..68768fe96 100644 --- a/benchmarks/CMakeLists.txt +++ b/benchmarks/CMakeLists.txt @@ -52,6 +52,7 @@ ConfigureBench(STATIC_SET_BENCH static_set/contains_bench.cu static_set/find_bench.cu static_set/insert_bench.cu + static_set/retrieve_bench.cu static_set/retrieve_all_bench.cu static_set/size_bench.cu static_set/rehash_bench.cu) diff --git a/benchmarks/static_set/retrieve_bench.cu b/benchmarks/static_set/retrieve_bench.cu new file mode 100644 index 000000000..50c59c0b6 --- /dev/null +++ b/benchmarks/static_set/retrieve_bench.cu @@ -0,0 +1,96 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include +#include + +#include + +#include +#include + +using namespace cuco::benchmark; +using namespace cuco::utility; + +/** + * @brief A benchmark evaluating `cuco::static_set::retrieve` performance + */ +template +void static_set_retrieve(nvbench::state& state, nvbench::type_list) +{ + auto const num_keys = state.get_int64("NumInputs"); + auto const occupancy = state.get_float64("Occupancy"); + auto const matching_rate = state.get_float64("MatchingRate"); + + std::size_t const size = num_keys / occupancy; + + thrust::device_vector keys(num_keys); + + key_generator gen; + gen.generate(dist_from_state(state), keys.begin(), keys.end()); + + gen.dropout(keys.begin(), keys.end(), matching_rate); + + state.add_element_count(num_keys); + + cuco::static_set set{size, cuco::empty_key{-1}}; + set.insert(keys.begin(), keys.end()); + + auto const output_size = set.count(keys.begin(), keys.end()); + thrust::device_vector output_match(output_size); + auto output_probe_begin = thrust::discard_iterator{}; + + state.exec(nvbench::exec_tag::sync, [&](nvbench::launch& launch) { + set.retrieve( + keys.begin(), keys.end(), output_probe_begin, output_match.begin(), {launch.get_stream()}); + }); +} + +NVBENCH_BENCH_TYPES(static_set_retrieve, + NVBENCH_TYPE_AXES(defaults::KEY_TYPE_RANGE, + nvbench::type_list)) + .set_name("static_set_retrieve_uniform_occupancy") + .set_type_axes_names({"Key", "Distribution"}) + .set_max_noise(defaults::MAX_NOISE) + .add_int64_axis("NumInputs", {defaults::N}) + .add_float64_axis("Occupancy", defaults::OCCUPANCY_RANGE) + .add_float64_axis("MatchingRate", {defaults::MATCHING_RATE}) + .add_int64_axis("Multiplicity", {defaults::MULTIPLICITY}); + +NVBENCH_BENCH_TYPES(static_set_retrieve, + NVBENCH_TYPE_AXES(defaults::KEY_TYPE_RANGE, + nvbench::type_list)) + .set_name("static_set_retrieve_uniform_matching_rate") + .set_type_axes_names({"Key", "Distribution"}) + .set_max_noise(defaults::MAX_NOISE) + .add_int64_axis("NumInputs", {defaults::N}) + .add_float64_axis("Occupancy", {defaults::OCCUPANCY}) + .add_float64_axis("MatchingRate", defaults::MATCHING_RATE_RANGE) + .add_int64_axis("Multiplicity", {defaults::MULTIPLICITY}); + +NVBENCH_BENCH_TYPES(static_set_retrieve, + NVBENCH_TYPE_AXES(defaults::KEY_TYPE_RANGE, + nvbench::type_list)) + .set_name("static_set_retrieve_uniform_multiplicity") + .set_type_axes_names({"Key", "Distribution"}) + .set_max_noise(defaults::MAX_NOISE) + .add_int64_axis("NumInputs", {defaults::N}) + .add_float64_axis("Occupancy", {defaults::OCCUPANCY}) + .add_float64_axis("MatchingRate", {defaults::MATCHING_RATE}) + .add_int64_axis("Multiplicity", defaults::MULTIPLICITY_RANGE); diff --git a/include/cuco/detail/open_addressing/kernels.cuh b/include/cuco/detail/open_addressing/kernels.cuh index 67314462a..56f329485 100644 --- a/include/cuco/detail/open_addressing/kernels.cuh +++ b/include/cuco/detail/open_addressing/kernels.cuh @@ -415,77 +415,6 @@ CUCO_KERNEL __launch_bounds__(BlockSize) void find_if_n(InputIt first, } } -/** - * @brief Retrieves the equivalent container elements of all keys in the range `[input_probe, - * input_probe + n)`. - * - * If key `k = *(input_probe + i)` has one or more matches in the container, copies `k` to - * `output_probe` and associated slot contents to `output_match`, respectively. The output order is - * unspecified. - * - * @tparam IsOuter Flag indicating whether it's an outer count or not - * @tparam block_size The size of the thread block - * @tparam InputProbeIt Device accessible input iterator - * @tparam OutputProbeIt Device accessible input iterator whose `value_type` is - * convertible to the `InputProbeIt`'s `value_type` - * @tparam OutputMatchIt Device accessible input iterator whose `value_type` is - * convertible to the container's `value_type` - * @tparam AtomicCounter Integral atomic type that follows the same semantics as - * `cuda::(std::)atomic(_ref)` - * @tparam Ref Type of non-owning device ref allowing access to storage - * - * @param input_probe Beginning of the sequence of input keys - * @param n Number of the keys to query - * @param output_probe Beginning of the sequence of keys corresponding to matching elements in - * `output_match` - * @param output_match Beginning of the sequence of matching elements - * @param atomic_counter Pointer to an atomic object of integral type that is used to count the - * number of output elements - * @param ref Non-owning container device ref used to access the slot storage - */ -template -CUCO_KERNEL __launch_bounds__(BlockSize) void retrieve(InputProbeIt input_probe, - cuco::detail::index_type n, - OutputProbeIt output_probe, - OutputMatchIt output_match, - AtomicCounter* atomic_counter, - Ref ref) -{ - namespace cg = cooperative_groups; - - auto const block = cg::this_thread_block(); - auto constexpr tiles_in_block = BlockSize / Ref::cg_size; - // make sure all but the last block are always occupied - auto const items_per_block = detail::int_div_ceil(n, tiles_in_block * gridDim.x) * tiles_in_block; - - auto const block_begin_offset = block.group_index().x * items_per_block; - auto const block_end_offset = min(n, block_begin_offset + items_per_block); - - if (block_begin_offset < block_end_offset) { - if constexpr (IsOuter) { - ref.retrieve_outer(block, - input_probe + block_begin_offset, - input_probe + block_end_offset, - output_probe, - output_match, - *atomic_counter); - } else { - ref.retrieve(block, - input_probe + block_begin_offset, - input_probe + block_end_offset, - output_probe, - output_match, - *atomic_counter); - } - } -} - /** * @brief Inserts all elements in the range `[first, last)`. * @@ -642,6 +571,76 @@ CUCO_KERNEL __launch_bounds__(BlockSize) void count(InputIt first, if (threadIdx.x == 0) { count->fetch_add(block_count, cuda::std::memory_order_relaxed); } } +/** + * @brief Retrieves the equivalent container elements of all keys in the range `[input_probe, + * input_probe + n)`. + * + * If key `k = *(input_probe + i)` has one or more matches in the container, copies `k` to + * `output_probe` and associated slot contents to `output_match`, respectively. The output order is + * unspecified. + * + * @tparam IsOuter Flag indicating whether it's an outer count or not + * @tparam block_size The size of the thread block + * @tparam InputProbeIt Device accessible input iterator + * @tparam OutputProbeIt Device accessible input iterator whose `value_type` is + * convertible to the `InputProbeIt`'s `value_type` + * @tparam OutputMatchIt Device accessible input iterator whose `value_type` is + * convertible to the container's `value_type` + * @tparam AtomicCounter Integral atomic type that follows the same semantics as + * `cuda::(std::)atomic(_ref)` + * @tparam Ref Type of non-owning device ref allowing access to storage + * + * @param input_probe Beginning of the sequence of input keys + * @param n Number of the keys to query + * @param output_probe Beginning of the sequence of keys corresponding to matching elements in + * `output_match` + * @param output_match Beginning of the sequence of matching elements + * @param atomic_counter Pointer to an atomic object of integral type that is used to count the + * number of output elements + * @param ref Non-owning container device ref used to access the slot storage + */ +template +CUCO_KERNEL void retrieve(InputProbeIt input_probe, + cuco::detail::index_type n, + OutputProbeIt output_probe, + OutputMatchIt output_match, + AtomicCounter* atomic_counter, + Ref ref) +{ + namespace cg = cooperative_groups; + + auto const block = cg::this_thread_block(); + auto constexpr tiles_in_block = BlockSize / Ref::cg_size; + + auto const block_begin_offset = block.group_index().x * tiles_in_block; + auto const block_end_offset = + min(n, static_cast(block_begin_offset + tiles_in_block)); + + if (block_begin_offset < block_end_offset) { + if constexpr (IsOuter) { + ref.retrieve_outer(block, + input_probe + block_begin_offset, + input_probe + block_end_offset, + output_probe, + output_match, + atomic_counter); + } else { + ref.retrieve(block, + input_probe + block_begin_offset, + input_probe + block_end_offset, + output_probe, + output_match, + atomic_counter); + } + } +} + /** * @brief Calculates the number of filled slots for the given bucket storage. * diff --git a/include/cuco/detail/open_addressing/open_addressing_ref_impl.cuh b/include/cuco/detail/open_addressing/open_addressing_ref_impl.cuh index a8edb156e..1c7fa518a 100644 --- a/include/cuco/detail/open_addressing/open_addressing_ref_impl.cuh +++ b/include/cuco/detail/open_addressing/open_addressing_ref_impl.cuh @@ -26,6 +26,9 @@ #include #include #include +#include +#include +#include #include #if defined(CUCO_HAS_CUDA_BARRIER) #include @@ -821,76 +824,6 @@ class open_addressing_ref_impl { } } - /** - * @brief Counts the occurrence of a given key contained in the container - * - * @tparam ProbeKey Probe key type - * - * @param key The key to count for - * - * @return Number of occurrences found by the current thread - */ - template - [[nodiscard]] __device__ size_type count(ProbeKey const& key) const noexcept - { - if constexpr (not allows_duplicates) { - return static_cast(this->contains(key)); - } else { - auto probing_iter = probing_scheme_(key, storage_ref_.bucket_extent()); - size_type count = 0; - - while (true) { - // TODO atomic_ref::load if insert operator is present - auto const bucket_slots = storage_ref_[*probing_iter]; - - for (auto& slot_content : bucket_slots) { - switch ( - this->predicate_.operator()(key, this->extract_key(slot_content))) { - case detail::equal_result::EMPTY: return count; - case detail::equal_result::EQUAL: ++count; break; - default: continue; - } - } - ++probing_iter; - } - } - } - - /** - * @brief Counts the occurrence of a given key contained in the container - * - * @tparam ProbeKey Probe key type - * - * @param group The Cooperative Group used to perform group count - * @param key The key to count for - * - * @return Number of occurrences found by the current thread - */ - template - [[nodiscard]] __device__ size_type count( - cooperative_groups::thread_block_tile const& group, ProbeKey const& key) const noexcept - { - auto probing_iter = probing_scheme_(group, key, storage_ref_.bucket_extent()); - size_type count = 0; - - while (true) { - auto const bucket_slots = storage_ref_[*probing_iter]; - - auto const state = [&]() { - auto res = detail::equal_result::UNEQUAL; - for (auto& slot : bucket_slots) { - res = this->predicate_.operator()(key, this->extract_key(slot)); - if (res == detail::equal_result::EMPTY) { return res; } - count += static_cast(res); - } - return res; - }(); - - if (group.any(state == detail::equal_result::EMPTY)) { return count; } - ++probing_iter; - } - } - /** * @brief Finds an element in the container with key equivalent to the probe key. * @@ -978,6 +911,76 @@ class open_addressing_ref_impl { } } + /** + * @brief Counts the occurrence of a given key contained in the container + * + * @tparam ProbeKey Probe key type + * + * @param key The key to count for + * + * @return Number of occurrences found by the current thread + */ + template + [[nodiscard]] __device__ size_type count(ProbeKey const& key) const noexcept + { + if constexpr (not allows_duplicates) { + return static_cast(this->contains(key)); + } else { + auto probing_iter = probing_scheme_(key, storage_ref_.bucket_extent()); + size_type count = 0; + + while (true) { + // TODO atomic_ref::load if insert operator is present + auto const bucket_slots = storage_ref_[*probing_iter]; + + for (auto& slot_content : bucket_slots) { + switch ( + this->predicate_.operator()(key, this->extract_key(slot_content))) { + case detail::equal_result::EMPTY: return count; + case detail::equal_result::EQUAL: ++count; break; + default: continue; + } + } + ++probing_iter; + } + } + } + + /** + * @brief Counts the occurrence of a given key contained in the container + * + * @tparam ProbeKey Probe key type + * + * @param group The Cooperative Group used to perform group count + * @param key The key to count for + * + * @return Number of occurrences found by the current thread + */ + template + [[nodiscard]] __device__ size_type count( + cooperative_groups::thread_block_tile const& group, ProbeKey const& key) const noexcept + { + auto probing_iter = probing_scheme_(group, key, storage_ref_.bucket_extent()); + size_type count = 0; + + while (true) { + auto const bucket_slots = storage_ref_[*probing_iter]; + + auto const state = [&]() { + auto res = detail::equal_result::UNEQUAL; + for (auto& slot : bucket_slots) { + res = this->predicate_.operator()(key, this->extract_key(slot)); + if (res == detail::equal_result::EMPTY) { return res; } + count += static_cast(res); + } + return res; + }(); + + if (group.any(state == detail::equal_result::EMPTY)) { return count; } + ++probing_iter; + } + } + /** * @brief Retrieves all the slots corresponding to all keys in the range `[input_probe_begin, * input_probe_end)`. @@ -1016,7 +1019,7 @@ class open_addressing_ref_impl { InputProbeIt input_probe_end, OutputProbeIt output_probe, OutputMatchIt output_match, - AtomicCounter& atomic_counter) const + AtomicCounter* atomic_counter) const { auto constexpr is_outer = false; auto const n = cuco::detail::distance(input_probe_begin, input_probe_end); // TODO include @@ -1065,7 +1068,7 @@ class open_addressing_ref_impl { InputProbeIt input_probe_end, OutputProbeIt output_probe, OutputMatchIt output_match, - AtomicCounter& atomic_counter) const + AtomicCounter* atomic_counter) const { auto constexpr is_outer = true; auto const n = cuco::detail::distance(input_probe_begin, input_probe_end); // TODO include @@ -1116,7 +1119,7 @@ class open_addressing_ref_impl { cuco::detail::index_type n, OutputProbeIt output_probe, OutputMatchIt output_match, - AtomicCounter& atomic_counter) const + AtomicCounter* atomic_counter) const { namespace cg = cooperative_groups; @@ -1134,39 +1137,32 @@ class open_addressing_ref_impl { auto constexpr num_flushing_tiles = BlockSize / flushing_tile_size; auto constexpr max_matches_per_step = flushing_tile_size * bucket_size; - auto constexpr buffer_size = buffer_multiplier * max_matches_per_step; + auto constexpr buffer_size = buffer_multiplier * max_matches_per_step + flushing_tile_size; auto const flushing_tile = cg::tiled_partition(block); auto const probing_tile = cg::tiled_partition(block); auto const flushing_tile_id = flushing_tile.meta_group_rank(); - auto idx = probing_tile.meta_group_rank(); auto const stride = probing_tile.meta_group_size(); + auto idx = probing_tile.meta_group_rank(); - // TODO align to 16B? - __shared__ probe_type probe_buffers[num_flushing_tiles][buffer_size]; - __shared__ value_type match_buffers[num_flushing_tiles][buffer_size]; - size_type num_matches = 0; + __shared__ cuco::pair buffers[num_flushing_tiles][buffer_size]; + __shared__ int32_t counters[num_flushing_tiles]; - auto flush_buffers = [&](cg::coalesced_group const& tile) { - auto const rank = tile.thread_rank(); + if (flushing_tile.thread_rank() == 0) { counters[flushing_tile_id] = 0; } + flushing_tile.sync(); -#if defined(CUCO_HAS_CG_INVOKE_ONE) - auto const offset = cg::invoke_one_broadcast(tile, [&]() { - return atomic_counter.fetch_add(num_matches, cuda::std::memory_order_relaxed); - }); -#else - size_type offset; - if (rank == 0) { - offset = atomic_counter.fetch_add(num_matches, cuda::std::memory_order_relaxed); - } + auto flush_buffers = [&](auto const& tile) { + size_type offset = 0; + auto const count = counters[flushing_tile_id]; + auto const rank = tile.thread_rank(); + if (rank == 0) { offset = atomic_counter->fetch_add(count, cuda::memory_order_relaxed); } offset = tile.shfl(offset, 0); -#endif // flush_buffers - for (size_type i = rank; i < num_matches; i += tile.size()) { - *(output_probe + offset + i) = probe_buffers[flushing_tile_id][i]; - *(output_match + offset + i) = match_buffers[flushing_tile_id][i]; + for (auto i = rank; i < count; i += tile.size()) { + *(output_probe + offset + i) = buffers[flushing_tile_id][i].first; + *(output_match + offset + i) = buffers[flushing_tile_id][i].second; } }; @@ -1178,107 +1174,119 @@ class open_addressing_ref_impl { if (active_flag) { // perform probing // make sure the flushing_tile is converged at this point to get a coalesced load - auto const& probe = *(input_probe + idx); + auto const& probe_key = *(input_probe + idx); auto probing_iter = - this->probing_scheme_(probing_tile, probe, this->storage_ref_.bucket_extent()); - bool empty_found = false; - bool match_found = false; - [[maybe_unused]] bool found_any_match = false; // only needed if `IsOuter == true` - - while (true) { - // TODO atomic_ref::load if insert operator is present - auto const bucket_slots = this->storage_ref_[*probing_iter]; - - for (int32_t i = 0; i < bucket_size; ++i) { - if (not empty_found) { - // inspect slot content - switch (this->predicate_.operator()( - probe, this->extract_key(bucket_slots[i]))) { - case detail::equal_result::EMPTY: { - empty_found = true; - break; - } - case detail::equal_result::EQUAL: { - match_found = true; - break; - } - default: { - break; + this->probing_scheme_(probing_tile, probe_key, this->storage_ref_.bucket_extent()); + + bool running = true; + [[maybe_unused]] bool found_match = false; + + bool equals[buffer_size]; + uint32_t exists[buffer_size]; + + while (active_flushing_tile.any(running)) { + if (running) { + // TODO atomic_ref::load if insert operator is present + auto const bucket_slots = this->storage_ref_[*probing_iter]; + +#pragma unroll buffer_size + for (int32_t i = 0; i < bucket_size; ++i) { + equals[i] = false; + if (running) { + // inspect slot content + switch (this->predicate_.operator()( + probe_key, this->extract_key(bucket_slots[i]))) { + case detail::equal_result::EMPTY: { + running = false; + break; + } + case detail::equal_result::EQUAL: { + if constexpr (!AllowsDuplicates) { running = false; } + equals[i] = true; + break; + } + default: { + break; + } } } } - if (active_flushing_tile.any(match_found)) { - auto const matching_tile = cg::binary_partition(active_flushing_tile, match_found); - // stage matches in shmem buffer - if (match_found) { - probe_buffers[flushing_tile_id][num_matches + matching_tile.thread_rank()] = probe; - match_buffers[flushing_tile_id][num_matches + matching_tile.thread_rank()] = - bucket_slots[i]; - } - - // add number of new matches to the buffer counter - num_matches += (match_found) ? matching_tile.size() - : active_flushing_tile.size() - matching_tile.size(); + probing_tile.sync(); + running = probing_tile.all(running); +#pragma unroll buffer_size + for (int32_t i = 0; i < bucket_size; ++i) { + exists[i] = probing_tile.ballot(equals[i]); } - if constexpr (IsOuter) { - if (not found_any_match /*yet*/ and probing_tile.any(match_found) /*now*/) { - found_any_match = true; + // Fill the buffer if any matching keys are found + auto const lane_id = probing_tile.thread_rank(); + if (thrust::any_of(thrust::seq, exists, exists + bucket_size, thrust::identity{})) { + if constexpr (IsOuter) { found_match = true; } + + int32_t num_matches[bucket_size]; + + for (int32_t i = 0; i < bucket_size; ++i) { + num_matches[i] = __popc(exists[i]); } - } - // reset flag for next iteration - match_found = false; - } - empty_found = probing_tile.any(empty_found); - - // check if all probing tiles have finished their work - bool const finished = active_flushing_tile.all(empty_found); - - if constexpr (IsOuter) { - if (finished) { - bool const writes_sentinel = - ((probing_tile.thread_rank() == 0) and not found_any_match); - - auto const sentinel_writers = - cg::binary_partition(active_flushing_tile, writes_sentinel); - if (writes_sentinel) { - auto const rank = sentinel_writers.thread_rank(); - probe_buffers[flushing_tile_id][num_matches + rank] = probe; - match_buffers[flushing_tile_id][num_matches + rank] = this->empty_slot_sentinel(); + int32_t output_idx; + if (lane_id == 0) { + auto const total_matches = + thrust::reduce(thrust::seq, num_matches, num_matches + bucket_size); + auto ref = + cuda::atomic_ref{counters[flushing_tile_id]}; + output_idx = ref.fetch_add(total_matches, cuda::memory_order_relaxed); + } + output_idx = probing_tile.shfl(output_idx, 0); + + int32_t matches_offset = 0; +#pragma unroll buffer_size + for (int32_t i = 0; i < bucket_size; ++i) { + if (equals[i]) { + auto const lane_offset = detail::count_least_significant_bits(exists[i], lane_id); + buffers[flushing_tile_id][output_idx + matches_offset + lane_offset] = { + probe_key, bucket_slots[i]}; + } + matches_offset += num_matches[i]; } - // add number of new matches to the buffer counter - num_matches += (writes_sentinel) - ? sentinel_writers.size() - : active_flushing_tile.size() - sentinel_writers.size(); } - } + // Special handling for outer cases where no match is found + if constexpr (IsOuter) { + if (!running) { + if (!found_match and lane_id == 0) { + auto ref = + cuda::atomic_ref{counters[flushing_tile_id]}; + auto const output_idx = ref.fetch_add(1, cuda::memory_order_relaxed); + buffers[flushing_tile_id][output_idx] = {probe_key, this->empty_slot_sentinel()}; + } + } + } + } // if running + active_flushing_tile.sync(); // if the buffer has not enough empty slots for the next iteration - if (num_matches > (buffer_size - max_matches_per_step)) { + if (counters[flushing_tile_id] > (buffer_size - max_matches_per_step)) { flush_buffers(active_flushing_tile); + active_flushing_tile.sync(); // reset buffer counter - num_matches = 0; + if (active_flushing_tile.thread_rank() == 0) { counters[flushing_tile_id] = 0; } + active_flushing_tile.sync(); } - // the entire flushing tile has finished its work - if (finished) { break; } - // onto the next probing bucket ++probing_iter; - } - - // entire flusing_tile has finished; flush remaining elements - if (num_matches != 0 and active_flushing_tile.all((idx + stride) >= n)) { - flush_buffers(active_flushing_tile); - } - } + } // while running + } // if active_flag // onto the next key idx += stride; } + + flushing_tile.sync(); + // entire flusing_tile has finished; flush remaining elements + if (counters[flushing_tile_id] > 0) { flush_buffers(flushing_tile); } } /** diff --git a/include/cuco/detail/static_set/kernels.cuh b/include/cuco/detail/static_set/kernels.cuh deleted file mode 100644 index b3488094a..000000000 --- a/include/cuco/detail/static_set/kernels.cuh +++ /dev/null @@ -1,285 +0,0 @@ -/* - * Copyright (c) 2023-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#pragma once - -#include -#include - -#include -#include -#include - -#include - -#include - -namespace cuco::static_set_ns::detail { - -CUCO_SUPPRESS_KERNEL_WARNINGS - -template -__device__ void flush_buffer(CG const& tile, - Size buffer_size, - cuco::pair* buffer, - cuda::atomic* counter, - OutputIt1 output_probe, - OutputIt2 output_match) -{ - auto i = tile.thread_rank(); - -#if defined(CUCO_HAS_CG_INVOKE_ONE) - auto const offset = cooperative_groups::invoke_one_broadcast( - tile, [&]() { return counter->fetch_add(buffer_size, cuda::std::memory_order_relaxed); }); -#else - Size offset; - if (i == 0) { offset = counter->fetch_add(buffer_size, cuda::std::memory_order_relaxed); } - offset = tile.shfl(offset, 0); -#endif - - while (i < buffer_size) { - *(output_probe + offset + i) = buffer[i].first; - *(output_match + offset + i) = buffer[i].second; - - i += tile.size(); - } -} - -template -__device__ void group_retrieve(InputIt first, - cuco::detail::index_type n, - OutputIt1 output_probe, - OutputIt2 output_match, - AtomicT* counter, - Ref ref) -{ - namespace cg = cooperative_groups; - - using size_type = typename Ref::size_type; - using ProbeKey = typename std::iterator_traits::value_type; - using Key = typename Ref::key_type; - - auto constexpr tile_size = Ref::cg_size; - auto constexpr bucket_size = Ref::bucket_size; - - auto idx = cuco::detail::global_thread_id() / tile_size; - auto const stride = cuco::detail::grid_stride() / tile_size; - auto const block = cg::this_thread_block(); - auto const tile = cg::tiled_partition(block); - - auto constexpr flushing_tile_size = cuco::detail::warp_size() / bucket_size; - // random choice to tune - auto constexpr flushing_buffer_size = 2 * flushing_tile_size; - auto constexpr num_flushing_tiles = BlockSize / flushing_tile_size; - auto constexpr max_matches = flushing_tile_size / tile_size; - - static_assert(flushing_tile_size > 0); - - auto const flushing_tile = cg::tiled_partition(block); - auto const flushing_tile_id = flushing_tile.meta_group_rank(); - - __shared__ cuco::pair flushing_tile_buffer[num_flushing_tiles][flushing_tile_size]; - - using atomic_counter_type = cuda::atomic; - // per flushing-tile counter to track number of filled elements - __shared__ atomic_counter_type flushing_counter[num_flushing_tiles]; - -#if defined(CUCO_HAS_CG_INVOKE_ONE) - cg::invoke_one(flushing_tile, - [&]() { new (&flushing_counter[flushing_tile_id]) atomic_counter_type{0}; }); -#else - if (flushing_tile.thread_rank() == 0) { - new (&flushing_counter[flushing_tile_id]) atomic_counter_type{0}; - } -#endif - flushing_tile.sync(); // sync still needed since cg.any doesn't imply a memory barrier - - while (flushing_tile.any(idx < n)) { - bool active_flag = idx < n; - auto const active_flushing_tile = - cg::binary_partition(flushing_tile, active_flag); - if (active_flag) { - auto const found = ref.find(tile, *(first + idx)); -#if defined(CUCO_HAS_CG_INVOKE_ONE) - if (found != ref.end()) { - cg::invoke_one(tile, [&]() { - auto const offset = - flushing_counter[flushing_tile_id].fetch_add(1, cuda::std::memory_order_relaxed); - flushing_tile_buffer[flushing_tile_id][offset] = {*(first + idx), *found}; - }); - } -#else - if (tile.thread_rank() == 0 and found != ref.end()) { - auto const offset = - flushing_counter[flushing_tile_id].fetch_add(1, cuda::std::memory_order_relaxed); - flushing_tile_buffer[flushing_tile_id][offset] = {*(first + idx), *found}; - } -#endif - } - - flushing_tile.sync(); - auto const buffer_size = - flushing_counter[flushing_tile_id].load(cuda::std::memory_order_relaxed); - if (buffer_size + max_matches > flushing_buffer_size) { - flush_buffer(flushing_tile, - buffer_size, - flushing_tile_buffer[flushing_tile_id], - counter, - output_probe, - output_match); - flushing_tile.sync(); -#if defined(CUCO_HAS_CG_INVOKE_ONE) - cg::invoke_one(flushing_tile, [&]() { - flushing_counter[flushing_tile_id].store(0, cuda::std::memory_order_relaxed); - }); -#else - if (flushing_tile.thread_rank() == 0) { - flushing_counter[flushing_tile_id].store(0, cuda::std::memory_order_relaxed); - } -#endif - flushing_tile.sync(); - } - - idx += stride; - } // while - - auto const buffer_size = flushing_counter[flushing_tile_id].load(cuda::std::memory_order_relaxed); - if (buffer_size > 0) { - flush_buffer(flushing_tile, - buffer_size, - flushing_tile_buffer[flushing_tile_id], - counter, - output_probe, - output_match); - } -} - -template -__device__ void flush_buffer(cooperative_groups::thread_block const& block, - Size buffer_size, - cuco::pair* buffer, - cuda::atomic* counter, - OutputIt1 output_probe, - OutputIt2 output_match) -{ - auto i = block.thread_rank(); - __shared__ Size offset; - -#if defined(CUCO_HAS_CG_INVOKE_ONE) - cooperative_groups::invoke_one( - block, [&]() { offset = counter->fetch_add(buffer_size, cuda::std::memory_order_relaxed); }); -#else - if (i == 0) { offset = counter->fetch_add(buffer_size, cuda::std::memory_order_relaxed); } -#endif - block.sync(); - - while (i < buffer_size) { - *(output_probe + offset + i) = buffer[i].first; - *(output_match + offset + i) = buffer[i].second; - - i += block.size(); - } -} - -template -__device__ void scalar_retrieve(InputIt first, - cuco::detail::index_type n, - OutputIt1 output_probe, - OutputIt2 output_match, - AtomicT* counter, - Ref ref) -{ - namespace cg = cooperative_groups; - - using size_type = typename Ref::size_type; - using ProbeKey = typename std::iterator_traits::value_type; - using Key = typename Ref::key_type; - - auto idx = cuco::detail::global_thread_id(); - auto const stride = cuco::detail::grid_stride(); - auto const block = cg::this_thread_block(); - - using block_scan = cub::BlockScan; - __shared__ typename block_scan::TempStorage block_scan_temp_storage; - - auto constexpr buffer_capacity = 2 * BlockSize; - __shared__ cuco::pair buffer[buffer_capacity]; - size_type buffer_size = 0; - - while (idx - block.thread_rank() < n) { // the whole thread block falls into the same iteration - auto const found = idx < n ? ref.find(*(first + idx)) : ref.end(); - auto const has_match = found != ref.end(); - - // Use a whole-block scan to calculate the output location - size_type offset; - size_type block_count; - block_scan(block_scan_temp_storage) - .ExclusiveSum(static_cast(has_match), offset, block_count); - - if (buffer_size + block_count > buffer_capacity) { - flush_buffer(block, buffer_size, buffer, counter, output_probe, output_match); - block.sync(); - buffer_size = 0; - } - - if (has_match) { buffer[buffer_size + offset] = {*(first + idx), *found}; } - buffer_size += block_count; - block.sync(); - - idx += stride; - } // while - - if (buffer_size > 0) { - flush_buffer(block, buffer_size, buffer, counter, output_probe, output_match); - } -} - -template -CUCO_KERNEL __launch_bounds__(BlockSize) void retrieve(InputIt first, - cuco::detail::index_type n, - OutputIt1 output_probe, - OutputIt2 output_match, - AtomicT* counter, - Ref ref) -{ - // Scalar retrieve without using CG - if constexpr (Ref::cg_size == 1) { - scalar_retrieve(first, n, output_probe, output_match, counter, ref); - } else { - group_retrieve(first, n, output_probe, output_match, counter, ref); - } -} - -} // namespace cuco::static_set_ns::detail diff --git a/include/cuco/detail/static_set/static_set.inl b/include/cuco/detail/static_set/static_set.inl index d50e90c2e..dd9398aa0 100644 --- a/include/cuco/detail/static_set/static_set.inl +++ b/include/cuco/detail/static_set/static_set.inl @@ -15,7 +15,6 @@ */ #include -#include #include #include #include @@ -468,21 +467,7 @@ static_set::ret OutputIt2 output_match, cuda::stream_ref stream) const { - auto const num_keys = cuco::detail::distance(first, last); - if (num_keys == 0) { return {output_probe, output_match}; } - - auto counter = - detail::counter_storage{this->impl_->allocator()}; - counter.reset(stream); - - auto const grid_size = cuco::detail::grid_size(num_keys, cg_size); - - static_set_ns::detail::retrieve - <<>>( - first, num_keys, output_probe, output_match, counter.data(), ref(op::find)); - - auto const count = counter.load_to_host(stream); - return {output_probe + count, output_match + count}; + return impl_->retrieve(first, last, output_probe, output_match, this->ref(op::retrieve), stream); } template +class operator_impl> { + using base_type = static_set_ref; + using ref_type = static_set_ref; + using key_type = typename base_type::key_type; + using value_type = typename base_type::value_type; + using iterator = typename base_type::iterator; + using const_iterator = typename base_type::const_iterator; + + static constexpr auto cg_size = base_type::cg_size; + static constexpr auto bucket_size = base_type::bucket_size; + + public: + /** + * @brief Retrieves all the slots corresponding to all keys in the range `[input_probe_begin, + * input_probe_end)`. + * + * If key `k = *(first + i)` exists in the container, copies `k` to `output_probe` and associated + * slot content to `output_match`, respectively. The output order is unspecified. + * + * Behavior is undefined if the size of the output range exceeds the number of retrieved slots. + * Use `count()` to determine the size of the output range. + * + * @tparam BlockSize Size of the thread block this operation is executed in + * @tparam InputProbeIt Device accessible input iterator whose `value_type` is + * convertible to the container's `key_type` + * @tparam OutputProbeIt Device accessible input iterator whose `value_type` is + * convertible to the container's `key_type` + * @tparam OutputMatchIt Device accessible input iterator whose `value_type` is + * convertible to the container's `value_type` + * @tparam AtomicCounter Atomic counter type that follows the same semantics as + * `cuda::atomic(_ref)` + * + * @param block Thread block this operation is executed in + * @param input_probe_begin Beginning of the input sequence of keys + * @param input_probe_end End of the input sequence of keys + * @param output_probe Beginning of the sequence of keys corresponding to matching elements in + * `output_match` + * @param output_match Beginning of the sequence of matching elements + * @param atomic_counter Counter that is used to determine the next free position in the output + * sequences + */ + template + __device__ void retrieve(cooperative_groups::thread_block const& block, + InputProbeIt input_probe_begin, + InputProbeIt input_probe_end, + OutputProbeIt output_probe, + OutputMatchIt output_match, + AtomicCounter* atomic_counter) const + { + auto const& ref_ = static_cast(*this); + ref_.impl_.retrieve( + block, input_probe_begin, input_probe_end, output_probe, output_match, atomic_counter); + } +}; } // namespace detail } // namespace cuco diff --git a/tests/static_multiset/retrieve_test.cu b/tests/static_multiset/retrieve_test.cu index ad21333ba..ba171544f 100644 --- a/tests/static_multiset/retrieve_test.cu +++ b/tests/static_multiset/retrieve_test.cu @@ -24,11 +24,8 @@ #include #include #include -#include -#include -#include +#include #include -#include #include @@ -47,32 +44,28 @@ void test_multiplicity(Container& container, std::size_t num_keys, std::size_t m auto const num_actual_keys = num_unique_keys * multiplicity; REQUIRE(num_actual_keys <= num_keys); - thrust::device_vector input_keys(num_actual_keys); thrust::device_vector probed_keys(num_actual_keys); thrust::device_vector matched_keys(num_actual_keys); - thrust::transform(thrust::counting_iterator(0), - thrust::counting_iterator(num_actual_keys), - input_keys.begin(), - cuda::proclaim_return_type([multiplicity] __device__(auto const& i) { - return static_cast(i / multiplicity); - })); - thrust::shuffle(input_keys.begin(), input_keys.end(), thrust::default_random_engine{}); + auto const keys_begin = thrust::make_transform_iterator( + thrust::counting_iterator(0), + cuda::proclaim_return_type([multiplicity] __device__(auto const& i) { + return static_cast(i / multiplicity); + })); - container.insert(input_keys.begin(), input_keys.end()); + container.insert(keys_begin, keys_begin + num_actual_keys); REQUIRE(container.size() == num_actual_keys); SECTION("All inserted keys should be contained.") { auto const [probed_end, matched_end] = container.retrieve( - input_keys.begin(), input_keys.end(), probed_keys.begin(), matched_keys.begin()); - thrust::sort(input_keys.begin(), input_keys.end()); + keys_begin, keys_begin + num_actual_keys, probed_keys.begin(), matched_keys.begin()); thrust::sort(probed_keys.begin(), probed_end); thrust::sort(matched_keys.begin(), matched_end); REQUIRE(cuco::test::equal( - probed_keys.begin(), probed_keys.end(), input_keys.begin(), thrust::equal_to{})); + probed_keys.begin(), probed_keys.end(), keys_begin, thrust::equal_to{})); REQUIRE(cuco::test::equal( - matched_keys.begin(), matched_keys.end(), input_keys.begin(), thrust::equal_to{})); + matched_keys.begin(), matched_keys.end(), keys_begin, thrust::equal_to{})); } } @@ -84,18 +77,16 @@ void test_outer(Container& container, std::size_t num_keys) container.clear(); - thrust::device_vector insert_keys(num_keys); - thrust::sequence(insert_keys.begin(), insert_keys.end(), 0); - thrust::device_vector query_keys(num_keys * 2ull); - thrust::sequence(query_keys.begin(), query_keys.end(), 0); + auto const keys_begin = thrust::counting_iterator{0}; + auto const query_size = num_keys * 2ull; thrust::device_vector probed_keys(num_keys * 2ull); thrust::device_vector matched_keys(num_keys * 2ull); SECTION("Non-inserted keys should output sentinels.") { - auto const [probed_end, matched_end] = container.retrieve_outer(query_keys.begin(), - query_keys.end(), + auto const [probed_end, matched_end] = container.retrieve_outer(keys_begin, + keys_begin + query_size, container.key_eq(), container.hash_function(), probed_keys.begin(), @@ -112,12 +103,12 @@ void test_outer(Container& container, std::size_t num_keys) }))); } - container.insert(insert_keys.begin(), insert_keys.end()); + container.insert(keys_begin, keys_begin + num_keys); SECTION("All inserted keys should be contained.") { - auto const [probed_end, matched_end] = container.retrieve_outer(query_keys.begin(), - query_keys.end(), + auto const [probed_end, matched_end] = container.retrieve_outer(keys_begin, + keys_begin + query_size, container.key_eq(), container.hash_function(), probed_keys.begin(), @@ -126,10 +117,10 @@ void test_outer(Container& container, std::size_t num_keys) probed_keys.begin(), probed_end, matched_keys.begin(), thrust::less()); REQUIRE(cuco::test::equal( - probed_keys.begin(), probed_keys.end(), query_keys.begin(), thrust::equal_to{})); + probed_keys.begin(), probed_keys.end(), keys_begin, thrust::equal_to{})); REQUIRE(cuco::test::equal(matched_keys.begin(), matched_keys.begin() + num_keys, - insert_keys.begin(), + keys_begin, thrust::equal_to{})); REQUIRE(cuco::test::all_of( matched_keys.begin() + num_keys,