Skip to content

Commit

Permalink
Fix invalid memory access in Parquet reader (#14637)
Browse files Browse the repository at this point in the history
Fixes #14633

When reading files in multiple passes, some pointer fields in `ColumnChunkDesc` that point to transient memory are not cleared out at the end of each pass. This can lead to trying to dereference deallocated memory during Parquet reader string preprocessing.

Authors:
  - Ed Seidl (https://github.com/etseidl)
  - Nghia Truong (https://github.com/ttnghia)
  - Vukasin Milovanovic (https://github.com/vuule)

Approvers:
  - Nghia Truong (https://github.com/ttnghia)
  - Vukasin Milovanovic (https://github.com/vuule)

URL: #14637
  • Loading branch information
etseidl authored Dec 19, 2023
1 parent 5dfafaf commit cf13b86
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 24 deletions.
5 changes: 4 additions & 1 deletion cpp/src/io/parquet/decode_preprocess.cu
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,10 @@ __global__ void __launch_bounds__(preprocess_block_size)
{rep_runs}};

// setup page info
if (!setupLocalPageInfo(s, pp, chunks, min_row, num_rows, all_types_filter{}, false)) { return; }
if (!setupLocalPageInfo(
s, pp, chunks, min_row, num_rows, all_types_filter{}, page_processing_stage::PREPROCESS)) {
return;
}

// initialize the stream decoders (requires values computed in setupLocalPageInfo)
// the size of the rolling batch buffer
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/io/parquet/page_data.cu
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ __global__ void __launch_bounds__(decode_block_size)
min_row,
num_rows,
mask_filter{decode_kernel_mask::GENERAL},
true)) {
page_processing_stage::DECODE)) {
return;
}

Expand Down
23 changes: 15 additions & 8 deletions cpp/src/io/parquet/page_decode.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -1014,6 +1014,12 @@ struct mask_filter {
}
};

enum class page_processing_stage {
PREPROCESS,
STRING_BOUNDS,
DECODE,
};

/**
* @brief Sets up block-local page state information from the global pages.
*
Expand All @@ -1023,7 +1029,7 @@ struct mask_filter {
* @param[in] min_row Crop all rows below min_row
* @param[in] num_rows Maximum number of rows to read
* @param[in] filter Filtering function used to decide which pages to operate on
* @param[in] is_decode_step If we are setting up for the decode step (instead of the preprocess)
* @param[in] stage What stage of the decoding process is this being called from
* @tparam Filter Function that takes a PageInfo reference and returns true if the given page should
* be operated on Currently only used by gpuComputePageSizes step)
* @return True if this page should be processed further
Expand All @@ -1035,7 +1041,7 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s,
size_t min_row,
size_t num_rows,
Filter filter,
bool is_decode_step)
page_processing_stage stage)
{
int t = threadIdx.x;

Expand Down Expand Up @@ -1126,7 +1132,8 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s,
//
// NOTE: this check needs to be done after the null counts have been zeroed out
bool const has_repetition = s->col.max_level[level_type::REPETITION] > 0;
if (is_decode_step && s->num_rows == 0 &&
if ((stage == page_processing_stage::STRING_BOUNDS || stage == page_processing_stage::DECODE) &&
s->num_rows == 0 &&
!(has_repetition && (is_bounds_page(s, min_row, num_rows, has_repetition) ||
is_page_contained(s, min_row, num_rows)))) {
return false;
Expand Down Expand Up @@ -1237,7 +1244,7 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s,
// NOTE: in a chunked read situation, s->col.column_data_base and s->col.valid_map_base
// will be aliased to memory that has been freed when we get here in the non-decode step, so
// we cannot check against nullptr. we'll just check a flag directly.
if (is_decode_step) {
if (stage == page_processing_stage::DECODE) {
int max_depth = s->col.max_nesting_depth;
for (int idx = 0; idx < max_depth; idx++) {
PageNestingDecodeInfo* nesting_info = &s->nesting_info[idx];
Expand Down Expand Up @@ -1387,13 +1394,13 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s,

// if we're in the decoding step, jump directly to the first
// value we care about
if (is_decode_step) {
if (stage == page_processing_stage::DECODE) {
s->input_value_count = s->page.skipped_values > -1 ? s->page.skipped_values : 0;
} else {
} else if (stage == page_processing_stage::PREPROCESS) {
s->input_value_count = 0;
s->input_leaf_count = 0;
s->page.skipped_values =
-1; // magic number to indicate it hasn't been set for use inside UpdatePageSizes
// magic number to indicate it hasn't been set for use inside UpdatePageSizes
s->page.skipped_values = -1;
s->page.skipped_leaf_values = 0;
}
}
Expand Down
20 changes: 14 additions & 6 deletions cpp/src/io/parquet/page_delta_decode.cu
Original file line number Diff line number Diff line change
Expand Up @@ -323,9 +323,13 @@ __global__ void __launch_bounds__(96)
auto* const db = &db_state;
[[maybe_unused]] null_count_back_copier _{s, t};

auto const mask = decode_kernel_mask::DELTA_BINARY;
if (!setupLocalPageInfo(
s, &pages[page_idx], chunks, min_row, num_rows, mask_filter{mask}, true)) {
if (!setupLocalPageInfo(s,
&pages[page_idx],
chunks,
min_row,
num_rows,
mask_filter{decode_kernel_mask::DELTA_BINARY},
page_processing_stage::DECODE)) {
return;
}

Expand Down Expand Up @@ -446,9 +450,13 @@ __global__ void __launch_bounds__(decode_block_size)
auto* const dba = &db_state;
[[maybe_unused]] null_count_back_copier _{s, t};

auto const mask = decode_kernel_mask::DELTA_BYTE_ARRAY;
if (!setupLocalPageInfo(
s, &pages[page_idx], chunks, min_row, num_rows, mask_filter{mask}, true)) {
if (!setupLocalPageInfo(s,
&pages[page_idx],
chunks,
min_row,
num_rows,
mask_filter{decode_kernel_mask::DELTA_BYTE_ARRAY},
page_processing_stage::DECODE)) {
return;
}

Expand Down
40 changes: 32 additions & 8 deletions cpp/src/io/parquet/page_string_decode.cu
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,15 @@ __global__ void __launch_bounds__(preprocess_block_size) gpuComputeStringPageBou

// setup page info
auto const mask = BitOr(decode_kernel_mask::STRING, decode_kernel_mask::DELTA_BYTE_ARRAY);
if (!setupLocalPageInfo(s, pp, chunks, min_row, num_rows, mask_filter{mask}, true)) { return; }
if (!setupLocalPageInfo(s,
pp,
chunks,
min_row,
num_rows,
mask_filter{mask},
page_processing_stage::STRING_BOUNDS)) {
return;
}

bool const is_bounds_pg = is_bounds_page(s, min_row, num_rows, has_repetition);

Expand Down Expand Up @@ -659,8 +667,15 @@ __global__ void __launch_bounds__(delta_preproc_block_size) gpuComputeDeltaPageS
bool const has_repetition = chunks[pp->chunk_idx].max_level[level_type::REPETITION] > 0;

// setup page info
auto const mask = decode_kernel_mask::DELTA_BYTE_ARRAY;
if (!setupLocalPageInfo(s, pp, chunks, min_row, num_rows, mask_filter{mask}, true)) { return; }
if (!setupLocalPageInfo(s,
pp,
chunks,
min_row,
num_rows,
mask_filter{decode_kernel_mask::DELTA_BYTE_ARRAY},
page_processing_stage::STRING_BOUNDS)) {
return;
}

auto const start_value = pp->start_val;

Expand Down Expand Up @@ -722,8 +737,13 @@ __global__ void __launch_bounds__(preprocess_block_size) gpuComputePageStringSiz
bool const has_repetition = chunks[pp->chunk_idx].max_level[level_type::REPETITION] > 0;

// setup page info
if (!setupLocalPageInfo(
s, pp, chunks, min_row, num_rows, mask_filter{decode_kernel_mask::STRING}, true)) {
if (!setupLocalPageInfo(s,
pp,
chunks,
min_row,
num_rows,
mask_filter{decode_kernel_mask::STRING},
page_processing_stage::STRING_BOUNDS)) {
return;
}

Expand Down Expand Up @@ -816,9 +836,13 @@ __global__ void __launch_bounds__(decode_block_size)
int const lane_id = t % warp_size;
[[maybe_unused]] null_count_back_copier _{s, t};

auto const mask = decode_kernel_mask::STRING;
if (!setupLocalPageInfo(
s, &pages[page_idx], chunks, min_row, num_rows, mask_filter{mask}, true)) {
if (!setupLocalPageInfo(s,
&pages[page_idx],
chunks,
min_row,
num_rows,
mask_filter{decode_kernel_mask::STRING},
page_processing_stage::DECODE)) {
return;
}

Expand Down

0 comments on commit cf13b86

Please sign in to comment.