diff --git a/cpp/src/io/parquet/decode_fixed.cu b/cpp/src/io/parquet/decode_fixed.cu index 0638b3e5d5a..5010e116aa6 100644 --- a/cpp/src/io/parquet/decode_fixed.cu +++ b/cpp/src/io/parquet/decode_fixed.cu @@ -24,6 +24,8 @@ namespace cudf::io::parquet::detail { namespace { +// Unlike cub's algorithm, this provides warp-wide and block-wide results simultaneously. +// Also, this provides the ability to compute warp_bits & lane_mask manually, which we need for lists. struct block_scan_results { uint32_t warp_bits; int thread_count_within_warp; @@ -34,21 +36,34 @@ struct block_scan_results { }; template -static __device__ void scan_block_exclusive_sum(int t, int thread_bit, block_scan_results& results) +static __device__ void scan_block_exclusive_sum(int thread_bit, block_scan_results& results) { - constexpr int num_warps = decode_block_size / cudf::detail::warp_size; + int const t = threadIdx.x; int const warp_index = t / cudf::detail::warp_size; int const warp_lane = t % cudf::detail::warp_size; uint32_t const lane_mask = (uint32_t(1) << warp_lane) - 1; - results.warp_bits = ballot(thread_bit); + uint32_t warp_bits = ballot(thread_bit); + scan_block_exclusive_sum(warp_bits, warp_lane, warp_index, lane_mask, results); +} + +template +static __device__ void scan_block_exclusive_sum(uint32_t warp_bits, int warp_lane, int warp_index, uint32_t lane_mask, block_scan_results& results) +{ + //Compute # warps + constexpr int num_warps = decode_block_size / cudf::detail::warp_size; + + //Compute the warp-wide results + results.warp_bits = warp_bits; results.warp_count = __popc(results.warp_bits); results.thread_count_within_warp = __popc(results.warp_bits & lane_mask); + //Share the warp counts amongst the block threads __shared__ int warp_counts[num_warps]; if (warp_lane == 0) { warp_counts[warp_index] = results.warp_count; } __syncthreads(); + //Compute block-wide results results.block_count = 0; results.thread_count_within_block = results.thread_count_within_warp; for (int warp_idx = 0; warp_idx < num_warps; ++warp_idx) { @@ -244,9 +259,9 @@ static __device__ int gpuUpdateValidityAndRowIndicesNested( int const row_index_lower_bound = s->row_index_lower_bound; - int const max_depth = s->col.max_nesting_depth - 1; - auto& max_depth_ni = s->nesting_info[max_depth]; - int valid_count = max_depth_ni.valid_count; + int const max_depth = s->col.max_nesting_depth - 1; + auto& max_depth_ni = s->nesting_info[max_depth]; + int max_depth_valid_count = max_depth_ni.valid_count; __syncthreads(); @@ -280,7 +295,7 @@ static __device__ int gpuUpdateValidityAndRowIndicesNested( // thread and block validity count block_scan_results valid_count_results; - scan_block_exclusive_sum(t, is_valid, valid_count_results); + scan_block_exclusive_sum(is_valid, valid_count_results); uint32_t const warp_validity_mask = valid_count_results.warp_bits; int thread_valid_count = valid_count_results.thread_count_within_block; int block_valid_count = valid_count_results.block_count; @@ -320,11 +335,11 @@ static __device__ int gpuUpdateValidityAndRowIndicesNested( if (is_valid) { // for non-list types, the value count is always the same across int const dst_pos = value_count + thread_value_count; - int const src_pos = valid_count + thread_valid_count; + int const src_pos = max_depth_valid_count + thread_valid_count; sb->nz_idx[rolling_index(src_pos)] = dst_pos; } // update stuff - valid_count += block_valid_count; + max_depth_valid_count += block_valid_count; } } // end depth loop @@ -334,13 +349,13 @@ static __device__ int gpuUpdateValidityAndRowIndicesNested( if (t == 0) { // update valid value count for decoding and total # of values we've processed - max_depth_ni.valid_count = valid_count; - s->nz_count = valid_count; + max_depth_ni.valid_count = max_depth_valid_count; + s->nz_count = max_depth_valid_count; s->input_value_count = value_count; s->input_row_count = value_count; } - return valid_count; + return max_depth_valid_count; } template @@ -390,7 +405,7 @@ static __device__ int gpuUpdateValidityAndRowIndicesFlat( // thread and block validity count block_scan_results valid_count_results; - scan_block_exclusive_sum(t, is_valid, valid_count_results); + scan_block_exclusive_sum(is_valid, valid_count_results); uint32_t const warp_validity_mask = valid_count_results.warp_bits; int thread_valid_count = valid_count_results.thread_count_within_block; int block_valid_count = valid_count_results.block_count; @@ -671,7 +686,10 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size_t) int valid_count = 0; // the core loop. decode batches of level stream data using rle_stream objects // and pass the results to gpuDecodeValues - while (s->error == 0 && processed_count < s->page.num_input_values) { + // For chunked reads we may not process all of the rows on the page; if not stop early + int last_row = s->first_row + s->num_rows; + while ((s->error == 0) && (processed_count < s->page.num_input_values) && + (s->input_row_count <= last_row)) { int next_valid_count; // only need to process definition levels if this is a nullable column