Skip to content

Commit

Permalink
Merge branch 'branch-25.02' into fea/plc/io/streams
Browse files Browse the repository at this point in the history
  • Loading branch information
Matt711 authored Dec 20, 2024
2 parents 3f84e5c + d8f469f commit f34b519
Show file tree
Hide file tree
Showing 40 changed files with 954 additions and 448 deletions.
39 changes: 24 additions & 15 deletions cpp/include/cudf/hashing/detail/murmurhash3_x86_32.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -57,62 +57,71 @@ struct MurmurHash3_x86_32 {
};

template <>
hash_value_type __device__ inline MurmurHash3_x86_32<bool>::operator()(bool const& key) const
MurmurHash3_x86_32<bool>::result_type __device__ inline MurmurHash3_x86_32<bool>::operator()(
bool const& key) const
{
return this->compute(static_cast<uint8_t>(key));
}

template <>
hash_value_type __device__ inline MurmurHash3_x86_32<float>::operator()(float const& key) const
MurmurHash3_x86_32<float>::result_type __device__ inline MurmurHash3_x86_32<float>::operator()(
float const& key) const
{
return this->compute(normalize_nans_and_zeros(key));
}

template <>
hash_value_type __device__ inline MurmurHash3_x86_32<double>::operator()(double const& key) const
MurmurHash3_x86_32<double>::result_type __device__ inline MurmurHash3_x86_32<double>::operator()(
double const& key) const
{
return this->compute(normalize_nans_and_zeros(key));
}

template <>
hash_value_type __device__ inline MurmurHash3_x86_32<cudf::string_view>::operator()(
cudf::string_view const& key) const
MurmurHash3_x86_32<cudf::string_view>::result_type
__device__ inline MurmurHash3_x86_32<cudf::string_view>::operator()(
cudf::string_view const& key) const
{
return this->compute_bytes(reinterpret_cast<cuda::std::byte const*>(key.data()),
key.size_bytes());
}

template <>
hash_value_type __device__ inline MurmurHash3_x86_32<numeric::decimal32>::operator()(
numeric::decimal32 const& key) const
MurmurHash3_x86_32<numeric::decimal32>::result_type
__device__ inline MurmurHash3_x86_32<numeric::decimal32>::operator()(
numeric::decimal32 const& key) const
{
return this->compute(key.value());
}

template <>
hash_value_type __device__ inline MurmurHash3_x86_32<numeric::decimal64>::operator()(
numeric::decimal64 const& key) const
MurmurHash3_x86_32<numeric::decimal64>::result_type
__device__ inline MurmurHash3_x86_32<numeric::decimal64>::operator()(
numeric::decimal64 const& key) const
{
return this->compute(key.value());
}

template <>
hash_value_type __device__ inline MurmurHash3_x86_32<numeric::decimal128>::operator()(
numeric::decimal128 const& key) const
MurmurHash3_x86_32<numeric::decimal128>::result_type
__device__ inline MurmurHash3_x86_32<numeric::decimal128>::operator()(
numeric::decimal128 const& key) const
{
return this->compute(key.value());
}

template <>
hash_value_type __device__ inline MurmurHash3_x86_32<cudf::list_view>::operator()(
cudf::list_view const& key) const
MurmurHash3_x86_32<cudf::list_view>::result_type
__device__ inline MurmurHash3_x86_32<cudf::list_view>::operator()(
cudf::list_view const& key) const
{
CUDF_UNREACHABLE("List column hashing is not supported");
}

template <>
hash_value_type __device__ inline MurmurHash3_x86_32<cudf::struct_view>::operator()(
cudf::struct_view const& key) const
MurmurHash3_x86_32<cudf::struct_view>::result_type
__device__ inline MurmurHash3_x86_32<cudf::struct_view>::operator()(
cudf::struct_view const& key) const
{
CUDF_UNREACHABLE("Direct hashing of struct_view is not supported");
}
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/groupby/hash/compute_groupby.cu
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ std::unique_ptr<table> compute_groupby(table_view const& keys,
d_row_equal,
probing_scheme_t{d_row_hash},
cuco::thread_scope_device,
cuco::storage<GROUPBY_WINDOW_SIZE>{},
cuco::storage<GROUPBY_BUCKET_SIZE>{},
cudf::detail::cuco_allocator<char>{rmm::mr::polymorphic_allocator<char>{}, stream},
stream.value()};

Expand Down
6 changes: 3 additions & 3 deletions cpp/src/groupby/hash/compute_mapping_indices.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -106,15 +106,15 @@ CUDF_KERNEL void mapping_indices_kernel(cudf::size_type num_input_rows,
__shared__ cudf::size_type shared_set_indices[GROUPBY_SHM_MAX_ELEMENTS];

// Shared set initialization
__shared__ cuco::window<cudf::size_type, GROUPBY_WINDOW_SIZE> windows[window_extent.value()];
__shared__ cuco::bucket<cudf::size_type, GROUPBY_BUCKET_SIZE> buckets[bucket_extent.value()];

auto raw_set = cuco::static_set_ref{
cuco::empty_key<cudf::size_type>{cudf::detail::CUDF_SIZE_TYPE_SENTINEL},
global_set.key_eq(),
probing_scheme_t{global_set.hash_function()},
cuco::thread_scope_block,
cuco::aow_storage_ref<cudf::size_type, GROUPBY_WINDOW_SIZE, decltype(window_extent)>{
window_extent, windows}};
cuco::bucket_storage_ref<cudf::size_type, GROUPBY_BUCKET_SIZE, decltype(bucket_extent)>{
bucket_extent, buckets}};
auto shared_set = raw_set.rebind_operators(cuco::insert_and_find);

auto const block = cooperative_groups::this_thread_block();
Expand Down
16 changes: 8 additions & 8 deletions cpp/src/groupby/hash/helpers.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ namespace cudf::groupby::detail::hash {
CUDF_HOST_DEVICE auto constexpr GROUPBY_CG_SIZE = 1;

/// Number of slots per thread
CUDF_HOST_DEVICE auto constexpr GROUPBY_WINDOW_SIZE = 1;
CUDF_HOST_DEVICE auto constexpr GROUPBY_BUCKET_SIZE = 1;

/// Thread block size
CUDF_HOST_DEVICE auto constexpr GROUPBY_BLOCK_SIZE = 128;
Expand All @@ -48,9 +48,9 @@ using shmem_extent_t =
cuco::extent<cudf::size_type,
static_cast<cudf::size_type>(static_cast<double>(GROUPBY_SHM_MAX_ELEMENTS) * 1.43)>;

/// Number of windows needed by each shared memory hash set
CUDF_HOST_DEVICE auto constexpr window_extent =
cuco::make_window_extent<GROUPBY_CG_SIZE, GROUPBY_WINDOW_SIZE>(shmem_extent_t{});
/// Number of buckets needed by each shared memory hash set
CUDF_HOST_DEVICE auto constexpr bucket_extent =
cuco::make_bucket_extent<GROUPBY_CG_SIZE, GROUPBY_BUCKET_SIZE>(shmem_extent_t{});

using row_hash_t =
cudf::experimental::row::hash::device_row_hasher<cudf::hashing::detail::default_hash,
Expand All @@ -75,23 +75,23 @@ using global_set_t = cuco::static_set<cudf::size_type,
row_comparator_t,
probing_scheme_t,
cudf::detail::cuco_allocator<char>,
cuco::storage<GROUPBY_WINDOW_SIZE>>;
cuco::storage<GROUPBY_BUCKET_SIZE>>;

using nullable_global_set_t = cuco::static_set<cudf::size_type,
cuco::extent<int64_t>,
cuda::thread_scope_device,
nullable_row_comparator_t,
probing_scheme_t,
cudf::detail::cuco_allocator<char>,
cuco::storage<GROUPBY_WINDOW_SIZE>>;
cuco::storage<GROUPBY_BUCKET_SIZE>>;

template <typename Op>
using hash_set_ref_t = cuco::static_set_ref<
cudf::size_type,
cuda::thread_scope_device,
row_comparator_t,
probing_scheme_t,
cuco::aow_storage_ref<cudf::size_type, GROUPBY_WINDOW_SIZE, cuco::window_extent<int64_t>>,
cuco::bucket_storage_ref<cudf::size_type, GROUPBY_BUCKET_SIZE, cuco::bucket_extent<int64_t>>,
Op>;

template <typename Op>
Expand All @@ -100,6 +100,6 @@ using nullable_hash_set_ref_t = cuco::static_set_ref<
cuda::thread_scope_device,
nullable_row_comparator_t,
probing_scheme_t,
cuco::aow_storage_ref<cudf::size_type, GROUPBY_WINDOW_SIZE, cuco::window_extent<int64_t>>,
cuco::bucket_storage_ref<cudf::size_type, GROUPBY_BUCKET_SIZE, cuco::bucket_extent<int64_t>>,
Op>;
} // namespace cudf::groupby::detail::hash
6 changes: 6 additions & 0 deletions cpp/src/io/fst/logical_stack.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,12 @@ void sparse_stack_op_to_top_of_stack(StackSymbolItT d_symbols,
stream));
}

// Check if the last element of d_kv_operations is 0. If not, then we have a problem.
if (num_symbols_in && !supports_reset_op) {
StackOpT last_symbol = d_kv_ops_current.element(num_symbols_in - 1, stream);
CUDF_EXPECTS(last_symbol.stack_level == 0, "The logical stack is not empty!");
}

// Stable radix sort, sorting by stack level of the operations
d_kv_operations_unsigned = cub::DoubleBuffer<StackOpUnsignedT>{
reinterpret_cast<StackOpUnsignedT*>(d_kv_operations.Current()),
Expand Down
10 changes: 4 additions & 6 deletions cpp/src/io/json/nested_json_gpu.cu
Original file line number Diff line number Diff line change
Expand Up @@ -1473,10 +1473,11 @@ void get_stack_context(device_span<SymbolT const> json_in,
to_stack_op::start_state,
stream);

auto stack_ops_bufsize = d_num_stack_ops.value(stream);
// Copy back to actual number of stack operations
auto num_stack_ops = d_num_stack_ops.value(stream);
// Sequence of stack symbols and their position in the original input (sparse representation)
rmm::device_uvector<StackSymbolT> stack_ops{stack_ops_bufsize, stream};
rmm::device_uvector<SymbolOffsetT> stack_op_indices{stack_ops_bufsize, stream};
rmm::device_uvector<StackSymbolT> stack_ops{num_stack_ops, stream};
rmm::device_uvector<SymbolOffsetT> stack_op_indices{num_stack_ops, stream};

// Run bracket-brace FST to retrieve starting positions of structs and lists
json_to_stack_ops_fst.Transduce(json_in.begin(),
Expand All @@ -1487,9 +1488,6 @@ void get_stack_context(device_span<SymbolT const> json_in,
to_stack_op::start_state,
stream);

// Copy back to actual number of stack operations
auto const num_stack_ops = d_num_stack_ops.value(stream);

// Stack operations with indices are converted to top of the stack for each character in the input
if (stack_behavior == stack_behavior_t::ResetOnDelimiter) {
fst::sparse_stack_op_to_top_of_stack<fst::stack_op_support::WITH_RESET_SUPPORT, StackLevelT>(
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/io/orc/dict_enc.cu
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,9 @@ CUDF_KERNEL void __launch_bounds__(block_size)

for (size_type i = 0; i < dict.map_slots.size(); i += block_size) {
if (t + i < dict.map_slots.size()) {
auto window = dict.map_slots.begin() + t + i;
// Collect all slots from each window.
for (auto& slot : *window) {
auto bucket = dict.map_slots.begin() + t + i;
// Collect all slots from each bucket.
for (auto& slot : *bucket) {
auto const key = slot.first;
if (key != KEY_SENTINEL) {
auto loc = counter.fetch_add(1, memory_order_relaxed);
Expand Down
14 changes: 7 additions & 7 deletions cpp/src/io/orc/orc_gpu.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,16 @@ using slot_type = cuco::pair<key_type, mapped_type>;
auto constexpr map_cg_size =
1; ///< A CUDA Cooperative Group of 1 thread (set for best performance) to handle each subset.
///< Note: Adjust insert and find loops to use `cg::tile<map_cg_size>` if increasing this.
auto constexpr window_size =
auto constexpr bucket_size =
1; ///< Number of concurrent slots (set for best performance) handled by each thread.
auto constexpr occupancy_factor = 1.43f; ///< cuCollections suggests using a hash map of size
///< N * (1/0.7) = 1.43 to target a 70% occupancy factor.
using storage_type = cuco::aow_storage<slot_type,
window_size,
cuco::extent<std::size_t>,
cudf::detail::cuco_allocator<char>>;
using storage_type = cuco::bucket_storage<slot_type,
bucket_size,
cuco::extent<std::size_t>,
cudf::detail::cuco_allocator<char>>;
using storage_ref_type = typename storage_type::ref_type;
using window_type = typename storage_type::window_type;
using bucket_type = typename storage_type::bucket_type;
using slot_type = cuco::pair<key_type, mapped_type>;

auto constexpr KEY_SENTINEL = size_type{-1};
Expand Down Expand Up @@ -193,7 +193,7 @@ struct StripeStream {
*/
struct stripe_dictionary {
// input
device_span<window_type> map_slots; // hash map (windows) storage
device_span<bucket_type> map_slots; // hash map (buckets) storage
uint32_t column_idx = 0; // column index
size_type start_row = 0; // first row in the stripe
size_type start_rowgroup = 0; // first rowgroup in the stripe
Expand Down
20 changes: 10 additions & 10 deletions cpp/src/io/parquet/chunk_dict.cu
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ struct map_find_fn {

template <int block_size>
CUDF_KERNEL void __launch_bounds__(block_size)
populate_chunk_hash_maps_kernel(device_span<window_type> const map_storage,
populate_chunk_hash_maps_kernel(device_span<bucket_type> const map_storage,
cudf::detail::device_2dspan<PageFragment const> frags)
{
auto const col_idx = blockIdx.y;
Expand Down Expand Up @@ -239,7 +239,7 @@ CUDF_KERNEL void __launch_bounds__(block_size)

template <int block_size>
CUDF_KERNEL void __launch_bounds__(block_size)
collect_map_entries_kernel(device_span<window_type> const map_storage,
collect_map_entries_kernel(device_span<bucket_type> const map_storage,
device_span<EncColumnChunk> chunks)
{
auto& chunk = chunks[blockIdx.x];
Expand All @@ -251,11 +251,11 @@ CUDF_KERNEL void __launch_bounds__(block_size)
if (t == 0) { new (&counter) cuda::atomic<size_type, SCOPE>{0}; }
__syncthreads();

// Iterate over all windows in the map.
// Iterate over all buckets in the map.
for (; t < chunk.dict_map_size; t += block_size) {
auto window = map_storage.data() + chunk.dict_map_offset + t;
// Collect all slots from each window.
for (auto& slot : *window) {
auto bucket = map_storage.data() + chunk.dict_map_offset + t;
// Collect all slots from each bucket.
for (auto& slot : *bucket) {
auto const key = slot.first;
if (key != KEY_SENTINEL) {
auto const loc = counter.fetch_add(1, memory_order_relaxed);
Expand All @@ -272,7 +272,7 @@ CUDF_KERNEL void __launch_bounds__(block_size)

template <int block_size>
CUDF_KERNEL void __launch_bounds__(block_size)
get_dictionary_indices_kernel(device_span<window_type> const map_storage,
get_dictionary_indices_kernel(device_span<bucket_type> const map_storage,
cudf::detail::device_2dspan<PageFragment const> frags)
{
auto const col_idx = blockIdx.y;
Expand Down Expand Up @@ -302,7 +302,7 @@ CUDF_KERNEL void __launch_bounds__(block_size)
s_ck_start_val_idx);
}

void populate_chunk_hash_maps(device_span<window_type> const map_storage,
void populate_chunk_hash_maps(device_span<bucket_type> const map_storage,
cudf::detail::device_2dspan<PageFragment const> frags,
rmm::cuda_stream_view stream)
{
Expand All @@ -311,7 +311,7 @@ void populate_chunk_hash_maps(device_span<window_type> const map_storage,
<<<dim_grid, DEFAULT_BLOCK_SIZE, 0, stream.value()>>>(map_storage, frags);
}

void collect_map_entries(device_span<window_type> const map_storage,
void collect_map_entries(device_span<bucket_type> const map_storage,
device_span<EncColumnChunk> chunks,
rmm::cuda_stream_view stream)
{
Expand All @@ -320,7 +320,7 @@ void collect_map_entries(device_span<window_type> const map_storage,
<<<chunks.size(), block_size, 0, stream.value()>>>(map_storage, chunks);
}

void get_dictionary_indices(device_span<window_type> const map_storage,
void get_dictionary_indices(device_span<bucket_type> const map_storage,
cudf::detail::device_2dspan<PageFragment const> frags,
rmm::cuda_stream_view stream)
{
Expand Down
18 changes: 9 additions & 9 deletions cpp/src/io/parquet/parquet_gpu.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ using slot_type = cuco::pair<key_type, mapped_type>;
auto constexpr map_cg_size =
1; ///< A CUDA Cooperative Group of 1 thread (set for best performance) to handle each subset.
///< Note: Adjust insert and find loops to use `cg::tile<map_cg_size>` if increasing this.
auto constexpr window_size =
auto constexpr bucket_size =
1; ///< Number of concurrent slots (set for best performance) handled by each thread.
auto constexpr occupancy_factor = 1.43f; ///< cuCollections suggests using a hash map of size
///< N * (1/0.7) = 1.43 to target a 70% occupancy factor.
Expand All @@ -43,12 +43,12 @@ auto constexpr KEY_SENTINEL = key_type{-1};
auto constexpr VALUE_SENTINEL = mapped_type{-1};
auto constexpr SCOPE = cuda::thread_scope_block;

using storage_type = cuco::aow_storage<slot_type,
window_size,
cuco::extent<std::size_t>,
cudf::detail::cuco_allocator<char>>;
using storage_type = cuco::bucket_storage<slot_type,
bucket_size,
cuco::extent<std::size_t>,
cudf::detail::cuco_allocator<char>>;
using storage_ref_type = typename storage_type::ref_type;
using window_type = typename storage_type::window_type;
using bucket_type = typename storage_type::bucket_type;

/**
* @brief Return the byte length of parquet dtypes that are physically represented by INT32
Expand Down Expand Up @@ -100,7 +100,7 @@ inline size_type __device__ row_to_value_idx(size_type idx,
* @param frags Column fragments
* @param stream CUDA stream to use
*/
void populate_chunk_hash_maps(device_span<window_type> const map_storage,
void populate_chunk_hash_maps(device_span<bucket_type> const map_storage,
cudf::detail::device_2dspan<PageFragment const> frags,
rmm::cuda_stream_view stream);

Expand All @@ -111,7 +111,7 @@ void populate_chunk_hash_maps(device_span<window_type> const map_storage,
* @param chunks Flat span of chunks to compact hash maps for
* @param stream CUDA stream to use
*/
void collect_map_entries(device_span<window_type> const map_storage,
void collect_map_entries(device_span<bucket_type> const map_storage,
device_span<EncColumnChunk> chunks,
rmm::cuda_stream_view stream);

Expand All @@ -128,7 +128,7 @@ void collect_map_entries(device_span<window_type> const map_storage,
* @param frags Column fragments
* @param stream CUDA stream to use
*/
void get_dictionary_indices(device_span<window_type> const map_storage,
void get_dictionary_indices(device_span<bucket_type> const map_storage,
cudf::detail::device_2dspan<PageFragment const> frags,
rmm::cuda_stream_view stream);

Expand Down
4 changes: 2 additions & 2 deletions cpp/src/io/parquet/writer_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -1302,7 +1302,7 @@ build_chunk_dictionaries(hostdevice_2dvector<EncColumnChunk>& chunks,
} else {
chunk.use_dictionary = true;
chunk.dict_map_size =
static_cast<cudf::size_type>(cuco::make_window_extent<map_cg_size, window_size>(
static_cast<cudf::size_type>(cuco::make_bucket_extent<map_cg_size, bucket_size>(
static_cast<cudf::size_type>(occupancy_factor * chunk.num_values)));
chunk.dict_map_offset = total_map_storage_size;
total_map_storage_size += chunk.dict_map_size;
Expand All @@ -1317,7 +1317,7 @@ build_chunk_dictionaries(hostdevice_2dvector<EncColumnChunk>& chunks,
total_map_storage_size,
cudf::detail::cuco_allocator<char>{rmm::mr::polymorphic_allocator<char>{}, stream}};
// Create a span of non-const map_storage as map_storage_ref takes in a non-const pointer.
device_span<window_type> const map_storage_data{map_storage.data(), total_map_storage_size};
device_span<bucket_type> const map_storage_data{map_storage.data(), total_map_storage_size};

// Synchronize
chunks.host_to_device_async(stream);
Expand Down
Loading

0 comments on commit f34b519

Please sign in to comment.