Commit
tweak scan interface for linked lists
pmattione-nvidia committed Sep 24, 2024
1 parent 3ef7b0d commit 254f3e9
Showing 1 changed file with 32 additions and 14 deletions.
46 changes: 32 additions & 14 deletions cpp/src/io/parquet/decode_fixed.cu
@@ -24,6 +24,8 @@ namespace cudf::io::parquet::detail {

namespace {

// Unlike cub's algorithm, this provides warp-wide and block-wide results simultaneously.
// Also, this provides the ability to compute warp_bits & lane_mask manually, which we need for lists.
struct block_scan_results {
uint32_t warp_bits;
int thread_count_within_warp;
@@ -34,21 +36,34 @@ struct block_scan_results {
};

template <int decode_block_size>
static __device__ void scan_block_exclusive_sum(int t, int thread_bit, block_scan_results& results)
static __device__ void scan_block_exclusive_sum(int thread_bit, block_scan_results& results)
{
constexpr int num_warps = decode_block_size / cudf::detail::warp_size;
int const t = threadIdx.x;
int const warp_index = t / cudf::detail::warp_size;
int const warp_lane = t % cudf::detail::warp_size;
uint32_t const lane_mask = (uint32_t(1) << warp_lane) - 1;

results.warp_bits = ballot(thread_bit);
uint32_t warp_bits = ballot(thread_bit);
scan_block_exclusive_sum<decode_block_size>(warp_bits, warp_lane, warp_index, lane_mask, results);
}

template <int decode_block_size>
static __device__ void scan_block_exclusive_sum(uint32_t warp_bits, int warp_lane, int warp_index, uint32_t lane_mask, block_scan_results& results)
{
//Compute # warps
constexpr int num_warps = decode_block_size / cudf::detail::warp_size;

//Compute the warp-wide results
results.warp_bits = warp_bits;
results.warp_count = __popc(results.warp_bits);
results.thread_count_within_warp = __popc(results.warp_bits & lane_mask);

//Share the warp counts amongst the block threads
__shared__ int warp_counts[num_warps];
if (warp_lane == 0) { warp_counts[warp_index] = results.warp_count; }
__syncthreads();

//Compute block-wide results
results.block_count = 0;
results.thread_count_within_block = results.thread_count_within_warp;
for (int warp_idx = 0; warp_idx < num_warps; ++warp_idx) {
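
The two overloads above split the scan into a convenience entry point, which derives warp_bits from a ballot of each thread's predicate, and a lower-level routine that takes warp_bits, warp_lane, warp_index, and lane_mask directly, which the list-decoding path needs per the comment in the first hunk. Below is a minimal, self-contained sketch of the same ballot/popc/shared-warp-counts technique, not the committed code: the names demo_scan_results, demo_block_scan, and demo_kernel, the fixed 128-thread block, and the use of __ballot_sync in place of cudf's ballot wrapper are illustrative assumptions, and the tail of the block-wide loop (collapsed in the diff above) is reconstructed from the visible prefix.

#include <cstdint>
#include <cstdio>
#include <cuda_runtime.h>

constexpr int warp_size  = 32;
constexpr int block_size = 128;

// Illustrative analogue of block_scan_results.
struct demo_scan_results {
  uint32_t warp_bits;             // ballot of the predicate across this thread's warp
  int thread_count_within_warp;   // set bits in lower lanes of this warp (exclusive)
  int warp_count;                 // total set bits in this warp
  int thread_count_within_block;  // set bits in all lower threads of the block (exclusive)
  int block_count;                // total set bits in the block
};

__device__ void demo_block_scan(int thread_bit, demo_scan_results& r)
{
  constexpr int num_warps  = block_size / warp_size;
  int const t              = threadIdx.x;
  int const warp_index     = t / warp_size;
  int const warp_lane      = t % warp_size;
  uint32_t const lane_mask = (uint32_t{1} << warp_lane) - 1;

  // Warp-wide results: one ballot, then population counts.
  r.warp_bits                = __ballot_sync(0xffffffffu, thread_bit);
  r.warp_count               = __popc(r.warp_bits);
  r.thread_count_within_warp = __popc(r.warp_bits & lane_mask);

  // Share per-warp totals so every thread can form the block-wide exclusive prefix.
  __shared__ int warp_counts[num_warps];
  if (warp_lane == 0) { warp_counts[warp_index] = r.warp_count; }
  __syncthreads();

  r.block_count               = 0;
  r.thread_count_within_block = r.thread_count_within_warp;
  for (int w = 0; w < num_warps; ++w) {
    r.block_count += warp_counts[w];
    if (w < warp_index) { r.thread_count_within_block += warp_counts[w]; }
  }
}

__global__ void demo_kernel()
{
  // Every third thread sets its bit; threads 0..127 give 43 set bits in total.
  demo_scan_results r;
  demo_block_scan((threadIdx.x % 3 == 0) ? 1 : 0, r);
  if (threadIdx.x == 0) { printf("block_count = %d (expected 43)\n", r.block_count); }
}

int main()
{
  demo_kernel<<<1, block_size>>>();
  return cudaDeviceSynchronize() == cudaSuccess ? 0 : 1;
}

The point of the second committed overload is that a caller can compute warp_bits and lane_mask itself, for example over list elements rather than directly from threadIdx, and still reuse the shared-memory reduction that turns per-warp counts into block-wide exclusive offsets.
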
@@ -244,9 +259,9 @@ static __device__ int gpuUpdateValidityAndRowIndicesNested(

int const row_index_lower_bound = s->row_index_lower_bound;

int const max_depth = s->col.max_nesting_depth - 1;
auto& max_depth_ni = s->nesting_info[max_depth];
int valid_count = max_depth_ni.valid_count;
int const max_depth = s->col.max_nesting_depth - 1;
auto& max_depth_ni = s->nesting_info[max_depth];
int max_depth_valid_count = max_depth_ni.valid_count;

__syncthreads();

@@ -280,7 +295,7 @@ static __device__ int gpuUpdateValidityAndRowIndicesNested(

// thread and block validity count
block_scan_results valid_count_results;
scan_block_exclusive_sum<decode_block_size>(t, is_valid, valid_count_results);
scan_block_exclusive_sum<decode_block_size>(is_valid, valid_count_results);
uint32_t const warp_validity_mask = valid_count_results.warp_bits;
int thread_valid_count = valid_count_results.thread_count_within_block;
int block_valid_count = valid_count_results.block_count;
@@ -320,11 +335,11 @@ static __device__ int gpuUpdateValidityAndRowIndicesNested(
if (is_valid) {
// for non-list types, the value count is always the same across
int const dst_pos = value_count + thread_value_count;
int const src_pos = valid_count + thread_valid_count;
int const src_pos = max_depth_valid_count + thread_valid_count;
sb->nz_idx[rolling_index<state_buf::nz_buf_size>(src_pos)] = dst_pos;
}
// update stuff
valid_count += block_valid_count;
max_depth_valid_count += block_valid_count;
}

} // end depth loop
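
In the hunk above, dst_pos (value_count + thread_value_count) is where the decoded value belongs in the output, while src_pos (max_depth_valid_count + thread_valid_count) indexes the running stream of valid values at the deepest nesting level; nz_idx records that mapping in a fixed-size scratch buffer addressed through rolling_index. The snippet below is a hypothetical stand-in for that indexing helper, assuming it is a plain wrap-around into a power-of-two buffer; it is not cudf's definition of rolling_index or nz_buf_size.

// Hypothetical stand-in for rolling_index: positions grow without bound across
// decode batches, but the scratch buffer has a fixed (power-of-two) capacity,
// so each position wraps to a slot modulo the buffer size.
template <int buffer_size>
__host__ __device__ constexpr int demo_rolling_index(int pos)
{
  static_assert((buffer_size & (buffer_size - 1)) == 0, "expected a power-of-two buffer");
  return pos % buffer_size;  // reduces to a bitwise AND for a power-of-two size
}

// Example: with a 256-slot buffer, global positions 44, 300, and 556 all share slot 44.
static_assert(demo_rolling_index<256>(300) == 44, "");

If the real buffer follows the same pattern, keeping its size a power of two lets the wrap compile down to a single mask on the decode hot path; the static_assert serves as the usage check here.
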
@@ -334,13 +349,13 @@ static __device__ int gpuUpdateValidityAndRowIndicesNested(

if (t == 0) {
// update valid value count for decoding and total # of values we've processed
max_depth_ni.valid_count = valid_count;
s->nz_count = valid_count;
max_depth_ni.valid_count = max_depth_valid_count;
s->nz_count = max_depth_valid_count;
s->input_value_count = value_count;
s->input_row_count = value_count;
}

return valid_count;
return max_depth_valid_count;
}

template <int decode_block_size, typename level_t, typename state_buf>
@@ -390,7 +405,7 @@ static __device__ int gpuUpdateValidityAndRowIndicesFlat(

// thread and block validity count
block_scan_results valid_count_results;
scan_block_exclusive_sum<decode_block_size>(t, is_valid, valid_count_results);
scan_block_exclusive_sum<decode_block_size>(is_valid, valid_count_results);
uint32_t const warp_validity_mask = valid_count_results.warp_bits;
int thread_valid_count = valid_count_results.thread_count_within_block;
int block_valid_count = valid_count_results.block_count;
@@ -671,7 +686,10 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size_t)
int valid_count = 0;
// the core loop. decode batches of level stream data using rle_stream objects
// and pass the results to gpuDecodeValues
while (s->error == 0 && processed_count < s->page.num_input_values) {
// For chunked reads we may not process all of the rows on the page; if not stop early
int last_row = s->first_row + s->num_rows;
while ((s->error == 0) && (processed_count < s->page.num_input_values) &&
(s->input_row_count <= last_row)) {
int next_valid_count;

// only need to process definition levels if this is a nullable column
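
The final hunk adds a second termination condition to the decode loop: besides draining the page's input values, the loop now stops once input_row_count passes last_row (first_row + num_rows), so a chunked read that needs only a slice of the page no longer decodes the rest. The model below is an illustrative host-side rendering of that guard, not the kernel; the demo_page_state struct, the fixed batch size, and the one-row-per-value assumption are simplifications invented for the example.

#include <cstdio>

// Simplified stand-in for the fields of the page state that the loop guard reads.
struct demo_page_state {
  int error            = 0;
  int num_input_values = 10000;  // values physically on the page
  int first_row        = 0;      // first row this pass should produce
  int num_rows         = 2000;   // rows this pass should produce (a chunked-read slice)
  int input_row_count  = 0;      // rows produced so far
};

// Counts how many fixed-size batches the guarded loop would run.
int demo_batches(demo_page_state s, int batch_size)
{
  int processed_count = 0;
  int batches         = 0;
  int const last_row  = s.first_row + s.num_rows;
  while ((s.error == 0) && (processed_count < s.num_input_values) &&
         (s.input_row_count <= last_row)) {
    processed_count   += batch_size;
    s.input_row_count += batch_size;  // pretend every decoded value starts a new row
    ++batches;
  }
  return batches;
}

int main()
{
  // With the row guard the loop stops after 16 batches of 128; draining all
  // 10000 input values would have taken 79 batches.
  printf("batches run: %d\n", demo_batches(demo_page_state{}, 128));
  return 0;
}
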
