Merge branch 'branch-25.02' into comp-headers-cleanup
vuule authored Dec 19, 2024
2 parents 324d635 + 88df0ad commit 350db40
Showing 14 changed files with 541 additions and 267 deletions.
2 changes: 1 addition & 1 deletion cpp/src/groupby/hash/compute_groupby.cu
@@ -61,7 +61,7 @@ std::unique_ptr<table> compute_groupby(table_view const& keys,
d_row_equal,
probing_scheme_t{d_row_hash},
cuco::thread_scope_device,
- cuco::storage<GROUPBY_WINDOW_SIZE>{},
+ cuco::storage<GROUPBY_BUCKET_SIZE>{},
cudf::detail::cuco_allocator<char>{rmm::mr::polymorphic_allocator<char>{}, stream},
stream.value()};

6 changes: 3 additions & 3 deletions cpp/src/groupby/hash/compute_mapping_indices.cuh
@@ -106,15 +106,15 @@ CUDF_KERNEL void mapping_indices_kernel(cudf::size_type num_input_rows,
__shared__ cudf::size_type shared_set_indices[GROUPBY_SHM_MAX_ELEMENTS];

// Shared set initialization
- __shared__ cuco::window<cudf::size_type, GROUPBY_WINDOW_SIZE> windows[window_extent.value()];
+ __shared__ cuco::bucket<cudf::size_type, GROUPBY_BUCKET_SIZE> buckets[bucket_extent.value()];

auto raw_set = cuco::static_set_ref{
cuco::empty_key<cudf::size_type>{cudf::detail::CUDF_SIZE_TYPE_SENTINEL},
global_set.key_eq(),
probing_scheme_t{global_set.hash_function()},
cuco::thread_scope_block,
- cuco::aow_storage_ref<cudf::size_type, GROUPBY_WINDOW_SIZE, decltype(window_extent)>{
-   window_extent, windows}};
+ cuco::bucket_storage_ref<cudf::size_type, GROUPBY_BUCKET_SIZE, decltype(bucket_extent)>{
+   bucket_extent, buckets}};
auto shared_set = raw_set.rebind_operators(cuco::insert_and_find);

auto const block = cooperative_groups::this_thread_block();
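
Note: a hedged sketch of how the rebound block-local ref above is typically used inside the kernel; the `row` variable and the branch body are illustrative assumptions, not lines from this file.

// Each thread inserts its row's key into the block-local shared-memory set.
// insert_and_find returns an iterator to the slot plus a bool that is true
// only for the thread that first inserted the key.
auto const [slot, inserted] = shared_set.insert_and_find(row);
if (inserted) {
  // First occurrence of this key within the block; record it.
}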
16 changes: 8 additions & 8 deletions cpp/src/groupby/hash/helpers.cuh
@@ -27,7 +27,7 @@ namespace cudf::groupby::detail::hash {
CUDF_HOST_DEVICE auto constexpr GROUPBY_CG_SIZE = 1;

/// Number of slots per thread
- CUDF_HOST_DEVICE auto constexpr GROUPBY_WINDOW_SIZE = 1;
+ CUDF_HOST_DEVICE auto constexpr GROUPBY_BUCKET_SIZE = 1;

/// Thread block size
CUDF_HOST_DEVICE auto constexpr GROUPBY_BLOCK_SIZE = 128;
@@ -48,9 +48,9 @@ using shmem_extent_t =
cuco::extent<cudf::size_type,
static_cast<cudf::size_type>(static_cast<double>(GROUPBY_SHM_MAX_ELEMENTS) * 1.43)>;

- /// Number of windows needed by each shared memory hash set
- CUDF_HOST_DEVICE auto constexpr window_extent =
-   cuco::make_window_extent<GROUPBY_CG_SIZE, GROUPBY_WINDOW_SIZE>(shmem_extent_t{});
+ /// Number of buckets needed by each shared memory hash set
+ CUDF_HOST_DEVICE auto constexpr bucket_extent =
+   cuco::make_bucket_extent<GROUPBY_CG_SIZE, GROUPBY_BUCKET_SIZE>(shmem_extent_t{});

using row_hash_t =
cudf::experimental::row::hash::device_row_hasher<cudf::hashing::detail::default_hash,
@@ -75,23 +75,23 @@ using global_set_t = cuco::static_set<cudf::size_type,
row_comparator_t,
probing_scheme_t,
cudf::detail::cuco_allocator<char>,
- cuco::storage<GROUPBY_WINDOW_SIZE>>;
+ cuco::storage<GROUPBY_BUCKET_SIZE>>;

using nullable_global_set_t = cuco::static_set<cudf::size_type,
cuco::extent<int64_t>,
cuda::thread_scope_device,
nullable_row_comparator_t,
probing_scheme_t,
cudf::detail::cuco_allocator<char>,
- cuco::storage<GROUPBY_WINDOW_SIZE>>;
+ cuco::storage<GROUPBY_BUCKET_SIZE>>;

template <typename Op>
using hash_set_ref_t = cuco::static_set_ref<
cudf::size_type,
cuda::thread_scope_device,
row_comparator_t,
probing_scheme_t,
- cuco::aow_storage_ref<cudf::size_type, GROUPBY_WINDOW_SIZE, cuco::window_extent<int64_t>>,
+ cuco::bucket_storage_ref<cudf::size_type, GROUPBY_BUCKET_SIZE, cuco::bucket_extent<int64_t>>,
Op>;

template <typename Op>
@@ -100,6 +100,6 @@ using nullable_hash_set_ref_t = cuco::static_set_ref<
cuda::thread_scope_device,
nullable_row_comparator_t,
probing_scheme_t,
- cuco::aow_storage_ref<cudf::size_type, GROUPBY_WINDOW_SIZE, cuco::window_extent<int64_t>>,
+ cuco::bucket_storage_ref<cudf::size_type, GROUPBY_BUCKET_SIZE, cuco::bucket_extent<int64_t>>,
Op>;
} // namespace cudf::groupby::detail::hash
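
Note: a minimal host-side sketch of the renamed extent helper, assuming the constants defined above; the literal 1000 and the static_assert are illustrative, and the exact rounding policy is cuco's.

// make_bucket_extent rounds a requested capacity up to a valid bucket count
// for the probing scheme; with GROUPBY_BUCKET_SIZE == 1, each bucket holds one slot.
auto constexpr requested = cuco::extent<cudf::size_type, 1000>{};
auto constexpr valid =
  cuco::make_bucket_extent<GROUPBY_CG_SIZE, GROUPBY_BUCKET_SIZE>(requested);
static_assert(valid.value() >= 1000);  // never smaller than requested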
6 changes: 3 additions & 3 deletions cpp/src/io/orc/dict_enc.cu
@@ -180,9 +180,9 @@ CUDF_KERNEL void __launch_bounds__(block_size)

for (size_type i = 0; i < dict.map_slots.size(); i += block_size) {
if (t + i < dict.map_slots.size()) {
-       auto window = dict.map_slots.begin() + t + i;
-       // Collect all slots from each window.
-       for (auto& slot : *window) {
+       auto bucket = dict.map_slots.begin() + t + i;
+       // Collect all slots from each bucket.
+       for (auto& slot : *bucket) {
auto const key = slot.first;
if (key != KEY_SENTINEL) {
auto loc = counter.fetch_add(1, memory_order_relaxed);
14 changes: 7 additions & 7 deletions cpp/src/io/orc/orc_gpu.hpp
@@ -47,16 +47,16 @@ using slot_type = cuco::pair<key_type, mapped_type>;
auto constexpr map_cg_size =
1; ///< A CUDA Cooperative Group of 1 thread (set for best performance) to handle each subset.
///< Note: Adjust insert and find loops to use `cg::tile<map_cg_size>` if increasing this.
- auto constexpr window_size =
+ auto constexpr bucket_size =
1; ///< Number of concurrent slots (set for best performance) handled by each thread.
auto constexpr occupancy_factor = 1.43f; ///< cuCollections suggests using a hash map of size
///< N * (1/0.7) = 1.43 to target a 70% occupancy factor.
- using storage_type = cuco::aow_storage<slot_type,
-                                        window_size,
-                                        cuco::extent<std::size_t>,
-                                        cudf::detail::cuco_allocator<char>>;
+ using storage_type = cuco::bucket_storage<slot_type,
+                                           bucket_size,
+                                           cuco::extent<std::size_t>,
+                                           cudf::detail::cuco_allocator<char>>;
using storage_ref_type = typename storage_type::ref_type;
- using window_type = typename storage_type::window_type;
+ using bucket_type = typename storage_type::bucket_type;
using slot_type = cuco::pair<key_type, mapped_type>;

auto constexpr KEY_SENTINEL = size_type{-1};
@@ -193,7 +193,7 @@ struct StripeStream {
*/
struct stripe_dictionary {
// input
-   device_span<window_type> map_slots;  // hash map (windows) storage
+   device_span<bucket_type> map_slots;  // hash map (buckets) storage
uint32_t column_idx = 0; // column index
size_type start_row = 0; // first row in the stripe
size_type start_rowgroup = 0; // first rowgroup in the stripe
20 changes: 10 additions & 10 deletions cpp/src/io/parquet/chunk_dict.cu
@@ -210,7 +210,7 @@ struct map_find_fn {

template <int block_size>
CUDF_KERNEL void __launch_bounds__(block_size)
-   populate_chunk_hash_maps_kernel(device_span<window_type> const map_storage,
+   populate_chunk_hash_maps_kernel(device_span<bucket_type> const map_storage,
cudf::detail::device_2dspan<PageFragment const> frags)
{
auto const col_idx = blockIdx.y;
@@ -239,7 +239,7 @@

template <int block_size>
CUDF_KERNEL void __launch_bounds__(block_size)
-   collect_map_entries_kernel(device_span<window_type> const map_storage,
+   collect_map_entries_kernel(device_span<bucket_type> const map_storage,
device_span<EncColumnChunk> chunks)
{
auto& chunk = chunks[blockIdx.x];
@@ -251,11 +251,11 @@ CUDF_KERNEL void __launch_bounds__(block_size)
if (t == 0) { new (&counter) cuda::atomic<size_type, SCOPE>{0}; }
__syncthreads();

-   // Iterate over all windows in the map.
+   // Iterate over all buckets in the map.
for (; t < chunk.dict_map_size; t += block_size) {
-     auto window = map_storage.data() + chunk.dict_map_offset + t;
-     // Collect all slots from each window.
-     for (auto& slot : *window) {
+     auto bucket = map_storage.data() + chunk.dict_map_offset + t;
+     // Collect all slots from each bucket.
+     for (auto& slot : *bucket) {
auto const key = slot.first;
if (key != KEY_SENTINEL) {
auto const loc = counter.fetch_add(1, memory_order_relaxed);
@@ -272,7 +272,7 @@

template <int block_size>
CUDF_KERNEL void __launch_bounds__(block_size)
-   get_dictionary_indices_kernel(device_span<window_type> const map_storage,
+   get_dictionary_indices_kernel(device_span<bucket_type> const map_storage,
cudf::detail::device_2dspan<PageFragment const> frags)
{
auto const col_idx = blockIdx.y;
@@ -302,7 +302,7 @@ CUDF_KERNEL void __launch_bounds__(block_size)
s_ck_start_val_idx);
}

- void populate_chunk_hash_maps(device_span<window_type> const map_storage,
+ void populate_chunk_hash_maps(device_span<bucket_type> const map_storage,
cudf::detail::device_2dspan<PageFragment const> frags,
rmm::cuda_stream_view stream)
{
@@ -311,7 +311,7 @@ void populate_chunk_hash_maps(device_span<window_type> const map_storage,
<<<dim_grid, DEFAULT_BLOCK_SIZE, 0, stream.value()>>>(map_storage, frags);
}

- void collect_map_entries(device_span<window_type> const map_storage,
+ void collect_map_entries(device_span<bucket_type> const map_storage,
device_span<EncColumnChunk> chunks,
rmm::cuda_stream_view stream)
{
@@ -320,7 +320,7 @@ void collect_map_entries(device_span<window_type> const map_storage,
<<<chunks.size(), block_size, 0, stream.value()>>>(map_storage, chunks);
}

- void get_dictionary_indices(device_span<window_type> const map_storage,
+ void get_dictionary_indices(device_span<bucket_type> const map_storage,
cudf::detail::device_2dspan<PageFragment const> frags,
rmm::cuda_stream_view stream)
{
18 changes: 9 additions & 9 deletions cpp/src/io/parquet/parquet_gpu.cuh
@@ -34,7 +34,7 @@ using slot_type = cuco::pair<key_type, mapped_type>;
auto constexpr map_cg_size =
1; ///< A CUDA Cooperative Group of 1 thread (set for best performance) to handle each subset.
///< Note: Adjust insert and find loops to use `cg::tile<map_cg_size>` if increasing this.
- auto constexpr window_size =
+ auto constexpr bucket_size =
1; ///< Number of concurrent slots (set for best performance) handled by each thread.
auto constexpr occupancy_factor = 1.43f; ///< cuCollections suggests using a hash map of size
///< N * (1/0.7) = 1.43 to target a 70% occupancy factor.
@@ -43,12 +43,12 @@ auto constexpr KEY_SENTINEL = key_type{-1};
auto constexpr VALUE_SENTINEL = mapped_type{-1};
auto constexpr SCOPE = cuda::thread_scope_block;

- using storage_type = cuco::aow_storage<slot_type,
-                                        window_size,
-                                        cuco::extent<std::size_t>,
-                                        cudf::detail::cuco_allocator<char>>;
+ using storage_type = cuco::bucket_storage<slot_type,
+                                           bucket_size,
+                                           cuco::extent<std::size_t>,
+                                           cudf::detail::cuco_allocator<char>>;
using storage_ref_type = typename storage_type::ref_type;
- using window_type = typename storage_type::window_type;
+ using bucket_type = typename storage_type::bucket_type;

/**
* @brief Return the byte length of parquet dtypes that are physically represented by INT32
@@ -100,7 +100,7 @@ inline size_type __device__ row_to_value_idx(size_type idx,
* @param frags Column fragments
* @param stream CUDA stream to use
*/
- void populate_chunk_hash_maps(device_span<window_type> const map_storage,
+ void populate_chunk_hash_maps(device_span<bucket_type> const map_storage,
cudf::detail::device_2dspan<PageFragment const> frags,
rmm::cuda_stream_view stream);

@@ -111,7 +111,7 @@ void populate_chunk_hash_maps(device_span<window_type> const map_storage,
* @param chunks Flat span of chunks to compact hash maps for
* @param stream CUDA stream to use
*/
- void collect_map_entries(device_span<window_type> const map_storage,
+ void collect_map_entries(device_span<bucket_type> const map_storage,
device_span<EncColumnChunk> chunks,
rmm::cuda_stream_view stream);

@@ -128,7 +128,7 @@ void collect_map_entries(device_span<window_type> const map_storage,
* @param frags Column fragments
* @param stream CUDA stream to use
*/
- void get_dictionary_indices(device_span<window_type> const map_storage,
+ void get_dictionary_indices(device_span<bucket_type> const map_storage,
cudf::detail::device_2dspan<PageFragment const> frags,
rmm::cuda_stream_view stream);

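
Note: the three declarations above make up the dictionary-building flow for a Parquet write. A hedged host-side sketch of the call order (map_storage, frags, chunks, and stream are assumed to be set up by the caller, as in writer_impl.cu below):

populate_chunk_hash_maps(map_storage, frags, stream);  // hash each value into its chunk's map
collect_map_entries(map_storage, chunks, stream);      // compact surviving keys per chunk
get_dictionary_indices(map_storage, frags, stream);    // translate rows to dictionary indices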
4 changes: 2 additions & 2 deletions cpp/src/io/parquet/writer_impl.cu
@@ -1303,7 +1303,7 @@ build_chunk_dictionaries(hostdevice_2dvector<EncColumnChunk>& chunks,
} else {
chunk.use_dictionary = true;
chunk.dict_map_size =
-         static_cast<cudf::size_type>(cuco::make_window_extent<map_cg_size, window_size>(
+         static_cast<cudf::size_type>(cuco::make_bucket_extent<map_cg_size, bucket_size>(
static_cast<cudf::size_type>(occupancy_factor * chunk.num_values)));
chunk.dict_map_offset = total_map_storage_size;
total_map_storage_size += chunk.dict_map_size;
@@ -1318,7 +1318,7 @@
total_map_storage_size,
cudf::detail::cuco_allocator<char>{rmm::mr::polymorphic_allocator<char>{}, stream}};
// Create a span of non-const map_storage as map_storage_ref takes in a non-const pointer.
-   device_span<window_type> const map_storage_data{map_storage.data(), total_map_storage_size};
+   device_span<bucket_type> const map_storage_data{map_storage.data(), total_map_storage_size};

// Synchronize
chunks.host_to_device_async(stream);
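
Note: the sizing above follows the occupancy comment in parquet_gpu.cuh: 1/0.7 ≈ 1.43, so a map sized at 1.43 × N holds N keys at roughly 70% load. A hedged sketch of the per-chunk layout this produces (the loop and `chunks_flat` are illustrative; only the field names come from the diff):

// All per-chunk maps share one bucket array; dict_map_offset records where a
// chunk's buckets begin within it.
size_type total_map_storage_size = 0;
for (auto& chunk : chunks_flat) {
  chunk.dict_map_size = static_cast<size_type>(
    cuco::make_bucket_extent<map_cg_size, bucket_size>(
      static_cast<size_type>(occupancy_factor * chunk.num_values)));
  chunk.dict_map_offset = total_map_storage_size;
  total_map_storage_size += chunk.dict_map_size;
}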
46 changes: 26 additions & 20 deletions python/cudf/cudf/io/json.py
@@ -161,13 +161,15 @@ def read_json(
if cudf.get_option("io.json.low_memory") and lines:
res_cols, res_col_names, res_child_names = (
plc.io.json.chunked_read_json(
-                plc.io.SourceInfo(filepaths_or_buffers),
-                processed_dtypes,
-                c_compression,
-                keep_quotes=keep_quotes,
-                mixed_types_as_string=mixed_types_as_string,
-                prune_columns=prune_columns,
-                recovery_mode=c_on_bad_lines,
+                plc.io.json._setup_json_reader_options(
+                    plc.io.SourceInfo(filepaths_or_buffers),
+                    processed_dtypes,
+                    c_compression,
+                    keep_quotes=keep_quotes,
+                    mixed_types_as_string=mixed_types_as_string,
+                    prune_columns=prune_columns,
+                    recovery_mode=c_on_bad_lines,
+                )
)
)
df = cudf.DataFrame._from_data(
@@ -181,19 +183,23 @@
return df
else:
table_w_meta = plc.io.json.read_json(
-            plc.io.SourceInfo(filepaths_or_buffers),
-            processed_dtypes,
-            c_compression,
-            lines,
-            byte_range_offset=byte_range[0]
-            if byte_range is not None
-            else 0,
-            byte_range_size=byte_range[1] if byte_range is not None else 0,
-            keep_quotes=keep_quotes,
-            mixed_types_as_string=mixed_types_as_string,
-            prune_columns=prune_columns,
-            recovery_mode=c_on_bad_lines,
-            extra_parameters=kwargs,
+            plc.io.json._setup_json_reader_options(
+                plc.io.SourceInfo(filepaths_or_buffers),
+                processed_dtypes,
+                c_compression,
+                lines,
+                byte_range_offset=byte_range[0]
+                if byte_range is not None
+                else 0,
+                byte_range_size=byte_range[1]
+                if byte_range is not None
+                else 0,
+                keep_quotes=keep_quotes,
+                mixed_types_as_string=mixed_types_as_string,
+                prune_columns=prune_columns,
+                recovery_mode=c_on_bad_lines,
+                extra_parameters=kwargs,
+            )
)

df = cudf.DataFrame._from_data(
10 changes: 6 additions & 4 deletions python/cudf_polars/cudf_polars/dsl/ir.py
@@ -604,10 +604,12 @@ def slice_skip(tbl: plc.Table):
(name, typ, []) for name, typ in schema.items()
]
plc_tbl_w_meta = plc.io.json.read_json(
-            plc.io.SourceInfo(paths),
-            lines=True,
-            dtypes=json_schema,
-            prune_columns=True,
+            plc.io.json._setup_json_reader_options(
+                plc.io.SourceInfo(paths),
+                lines=True,
+                dtypes=json_schema,
+                prune_columns=True,
+            )
)
# TODO: I don't think cudf-polars supports nested types in general right now
# (but when it does, we should pass child column names from nested columns in)
(4 more changed files not shown)
