Merge pull request #15358 from rapidsai/branch-24.04
Forward-merge branch-24.04 to branch-24.06
GPUtester authored Mar 20, 2024
2 parents 79536a3 + 08bd783 commit aa7d991
Showing 3 changed files with 132 additions and 53 deletions.
cpp/src/io/parquet/reader_impl_chunking.cu: 69 changes (36 additions, 33 deletions)
@@ -48,9 +48,9 @@ struct split_info {
};

struct cumulative_page_info {
- size_t row_index; // row index
- size_t size_bytes; // cumulative size in bytes
- int key; // schema index
+ size_t end_row_index; // end row index (start_row + num_rows for the corresponding page)
+ size_t size_bytes; // cumulative size in bytes
+ int key; // schema index
};

// the minimum amount of memory we can safely expect to be enough to
@@ -260,7 +260,7 @@ struct set_row_index {
auto const& chunk = chunks[page.chunk_idx];
size_t const page_end_row = chunk.start_row + page.chunk_row + page.num_rows;
// if we have been passed in a cap, apply it
- c_info[i].row_index = max_row > 0 ? min(max_row, page_end_row) : page_end_row;
+ c_info[i].end_row_index = max_row > 0 ? min(max_row, page_end_row) : page_end_row;
}
};
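
A note on what set_row_index stores: end_row_index is the exclusive end row of the page, optionally capped. Below is a minimal host-side sketch of the same computation as a hypothetical free function (the real code is the __device__ functor above; a max_row of 0 means no cap was passed):

#include <algorithm>
#include <cstddef>

// Host-side stand-in for the computation in set_row_index. A max_row of 0
// means "no cap", mirroring the ternary in the functor above.
std::size_t page_end_row_index(std::size_t chunk_start_row,
                               std::size_t page_chunk_row,
                               std::size_t page_num_rows,
                               std::size_t max_row)
{
  std::size_t const page_end_row = chunk_start_row + page_chunk_row + page_num_rows;
  return max_row > 0 ? std::min(max_row, page_end_row) : page_end_row;
}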

@@ -293,13 +293,13 @@ struct page_total_size {
auto const end = key_offsets[idx + 1];
auto iter = cudf::detail::make_counting_transform_iterator(
0, cuda::proclaim_return_type<size_t>([&] __device__(size_type i) {
- return c_info[i].row_index;
+ return c_info[i].end_row_index;
}));
auto const page_index =
- thrust::lower_bound(thrust::seq, iter + start, iter + end, i.row_index) - iter;
+ thrust::lower_bound(thrust::seq, iter + start, iter + end, i.end_row_index) - iter;
sum += c_info[page_index].size_bytes;
}
- return {i.row_index, sum, i.key};
+ return {i.end_row_index, sum, i.key};
}
};

@@ -318,18 +318,9 @@ size_t find_start_index(cudf::host_span<cumulative_page_info const> aggregated_i
size_t start_row)
{
auto start = thrust::make_transform_iterator(
- aggregated_info.begin(), [&](cumulative_page_info const& i) { return i.row_index; });
- auto start_index =
- thrust::lower_bound(thrust::host, start, start + aggregated_info.size(), start_row) - start;
-
- // cumulative_page_info.row_index is the -end- of the rows of a given page. so move forward until
- // we find the next group of pages
- while (start_index < (static_cast<int64_t>(aggregated_info.size()) - 1) &&
-        (start_index < 0 || aggregated_info[start_index].row_index == start_row)) {
-   start_index++;
- }
-
- return start_index;
+ aggregated_info.begin(), [&](cumulative_page_info const& i) { return i.end_row_index; });
+ return thrust::lower_bound(thrust::host, start, start + aggregated_info.size(), start_row) -
+        start;
}
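
With the rename making clear that the stored value is an end row, find_start_index reduces to a single binary search. A minimal host-only sketch of that search, with std::lower_bound standing in for thrust::lower_bound and a hypothetical page_end struct in place of cumulative_page_info:

#include <algorithm>
#include <cstddef>
#include <iostream>
#include <vector>

// Trimmed-down stand-in for cumulative_page_info.
struct page_end {
  std::size_t end_row_index;  // exclusive end row of the page's rows
};

// Index of the first page whose rows reach start_row or beyond.
std::size_t find_start_index(std::vector<page_end> const& pages, std::size_t start_row)
{
  auto const it = std::lower_bound(
    pages.begin(), pages.end(), start_row,
    [](page_end const& p, std::size_t row) { return p.end_row_index < row; });
  return static_cast<std::size_t>(it - pages.begin());
}

int main()
{
  std::vector<page_end> pages{{100}, {200}, {300}};   // pages end at rows 100/200/300
  std::cout << find_start_index(pages, 150) << '\n';  // prints 1
}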

/**
@@ -353,16 +344,17 @@ int64_t find_next_split(int64_t cur_pos,
int64_t split_pos = thrust::lower_bound(thrust::seq, start + cur_pos, end, size_limit) - start;

// if we're past the end, or if the returned bucket is > than the chunk_read_limit, move back
- // one.
+ // one as long as this doesn't put us before our starting point.
if (static_cast<size_t>(split_pos) >= sizes.size() ||
- (sizes[split_pos].size_bytes - cur_cumulative_size > size_limit)) {
+ ((split_pos > cur_pos) && (sizes[split_pos].size_bytes - cur_cumulative_size > size_limit))) {
split_pos--;
}

- // cumulative_page_info.row_index is the -end- of the rows of a given page. so move forward until
- // we find the next group of pages
+ // move forward until we find the next group of pages that will actually advance our row count.
+ // this guarantees that even if we cannot fit the set of rows represented by where our cur_pos
+ // is, we will still move forward instead of failing.
while (split_pos < (static_cast<int64_t>(sizes.size()) - 1) &&
- (split_pos < 0 || sizes[split_pos].row_index == cur_row_index)) {
+ (sizes[split_pos].end_row_index == cur_row_index)) {
split_pos++;
}
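
Two behavioral changes land here: the step-back is only taken while split_pos stays ahead of cur_pos, and the trailing loop always advances to a page group that grows the row count, so the caller makes progress even when nothing fits under the limit. A host-only sketch of the whole routine under those rules (hypothetical cum_info struct; the cumulative-size search is an equivalent rewrite of the relative-size transform iterator in the real code):

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <vector>

struct cum_info {
  std::size_t end_row_index;  // exclusive end row of the page group
  std::size_t size_bytes;     // cumulative size
};

std::int64_t find_next_split(std::int64_t cur_pos,
                             std::size_t cur_row_index,
                             std::size_t cur_cumulative_size,
                             std::vector<cum_info> const& sizes,
                             std::size_t size_limit)
{
  // first group whose size, relative to where we are now, meets the limit
  std::int64_t split_pos =
    std::lower_bound(sizes.begin() + cur_pos, sizes.end(), cur_cumulative_size + size_limit,
                     [](cum_info const& c, std::size_t v) { return c.size_bytes < v; }) -
    sizes.begin();

  // if we're past the end, or the bucket overshoots the limit, step back one,
  // but never behind our starting point
  if (static_cast<std::size_t>(split_pos) >= sizes.size() ||
      ((split_pos > cur_pos) &&
       (sizes[split_pos].size_bytes - cur_cumulative_size > size_limit))) {
    split_pos--;
  }

  // always land on a group that advances the row count, guaranteeing forward
  // progress even when nothing fits under size_limit
  while (split_pos < static_cast<std::int64_t>(sizes.size()) - 1 &&
         sizes[split_pos].end_row_index == cur_row_index) {
    split_pos++;
  }
  return split_pos;
}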

@@ -413,7 +405,7 @@ template <typename T = uint8_t>
struct row_count_less {
__device__ bool operator()(cumulative_page_info const& a, cumulative_page_info const& b) const
{
- return a.row_index < b.row_index;
+ return a.end_row_index < b.end_row_index;
}
};

@@ -501,10 +493,10 @@ struct page_span {
size_t start, end;
};

- struct get_page_row_index {
+ struct get_page_end_row_index {
device_span<cumulative_page_info const> c_info;

- __device__ size_t operator()(size_t i) const { return c_info[i].row_index; }
+ __device__ size_t operator()(size_t i) const { return c_info[i].end_row_index; }
};

/**
@@ -514,15 +506,18 @@ struct get_page_row_index {
template <typename RowIndexIter>
struct get_page_span {
device_span<size_type const> page_offsets;
+ device_span<ColumnChunkDesc const> chunks;
RowIndexIter page_row_index;
size_t const start_row;
size_t const end_row;

get_page_span(device_span<size_type const> _page_offsets,
+ device_span<ColumnChunkDesc const> _chunks,
RowIndexIter _page_row_index,
size_t _start_row,
size_t _end_row)
: page_offsets(_page_offsets),
+ chunks(_chunks),
page_row_index(_page_row_index),
start_row(_start_row),
end_row(_end_row)
@@ -535,12 +530,17 @@ struct get_page_span {
auto const column_page_start = page_row_index + first_page_index;
auto const column_page_end = page_row_index + page_offsets[column_index + 1];
auto const num_pages = column_page_end - column_page_start;
+ bool const is_list = chunks[column_index].max_level[level_type::REPETITION] > 0;

auto start_page =
(thrust::lower_bound(thrust::seq, column_page_start, column_page_end, start_row) -
column_page_start) +
first_page_index;
- if (page_row_index[start_page] == start_row) { start_page++; }
+ // list rows can span page boundaries, so it is not always safe to assume that the row
+ // represented by end_row_index starts on the subsequent page. It is possible that
+ // the values for row end_row_index start within the page itself, so we must
+ // include the page in that case.
+ if (page_row_index[start_page] == start_row && !is_list) { start_page++; }

auto end_page = (thrust::lower_bound(thrust::seq, column_page_start, column_page_end, end_row) -
column_page_start) +
@@ -623,6 +623,7 @@ struct copy_subpass_page {
*
* @param c_info The cumulative page size information (row count and byte size) per column
* @param pages All of the pages in the pass
+ * @param chunks All of the chunks in the pass
* @param page_offsets Offsets into the pages array representing the first page for each column
* @param start_row The row to start the subpass at
* @param size_limit The size limit in bytes of the subpass
@@ -636,6 +637,7 @@ struct copy_subpass_page {
std::tuple<rmm::device_uvector<page_span>, size_t, size_t> compute_next_subpass(
device_span<cumulative_page_info const> c_info,
device_span<PageInfo const> pages,
+ device_span<ColumnChunkDesc const> chunks,
device_span<size_type const> page_offsets,
size_t start_row,
size_t size_limit,
@@ -658,18 +660,18 @@ std::tuple<rmm::device_uvector<page_span>, size_t, size_t> compute_next_subpass(
start_row == 0 || start_index == 0 ? 0 : h_aggregated_info[start_index - 1].size_bytes;
auto const end_index =
find_next_split(start_index, start_row, cumulative_size, h_aggregated_info, size_limit);
- auto const end_row = h_aggregated_info[end_index].row_index;
+ auto const end_row = h_aggregated_info[end_index].end_row_index;

// for each column, collect the set of pages that spans start_row / end_row
rmm::device_uvector<page_span> page_bounds(num_columns, stream);
auto iter = thrust::make_counting_iterator(size_t{0});
auto page_row_index =
- cudf::detail::make_counting_transform_iterator(0, get_page_row_index{c_info});
+ cudf::detail::make_counting_transform_iterator(0, get_page_end_row_index{c_info});
thrust::transform(rmm::exec_policy_nosync(stream),
iter,
iter + num_columns,
page_bounds.begin(),
- get_page_span{page_offsets, page_row_index, start_row, end_row});
+ get_page_span{page_offsets, chunks, page_row_index, start_row, end_row});

// total page count over all columns
auto page_count_iter = thrust::make_transform_iterator(page_bounds.begin(), get_span_size{});
@@ -700,13 +702,13 @@ std::vector<row_range> compute_page_splits_by_row(device_span<cumulative_page_in
size_t cur_pos = find_start_index(h_aggregated_info, skip_rows);
size_t cur_row_index = skip_rows;
size_t cur_cumulative_size = 0;
- auto const max_row = min(skip_rows + num_rows, h_aggregated_info.back().row_index);
+ auto const max_row = min(skip_rows + num_rows, h_aggregated_info.back().end_row_index);
while (cur_row_index < max_row) {
auto const split_pos =
find_next_split(cur_pos, cur_row_index, cur_cumulative_size, h_aggregated_info, size_limit);

auto const start_row = cur_row_index;
- cur_row_index = min(max_row, h_aggregated_info[split_pos].row_index);
+ cur_row_index = min(max_row, h_aggregated_info[split_pos].end_row_index);
splits.push_back({start_row, cur_row_index - start_row});
cur_pos = split_pos;
cur_cumulative_size = h_aggregated_info[split_pos].size_bytes;
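
Taken together, this loop is the whole splitting strategy: find the next split under the byte limit, clamp to max_row, record the range, and continue from there. A host-only sketch reusing the includes, cum_info struct, and find_next_split sketch above (row_range is a hypothetical stand-in; the real skip_rows handling via find_start_index is omitted for brevity):

struct row_range {
  std::size_t skip_rows;
  std::size_t num_rows;
};

std::vector<row_range> compute_page_splits_by_row(std::vector<cum_info> const& info,
                                                  std::size_t num_rows,
                                                  std::size_t size_limit)
{
  std::vector<row_range> splits;
  std::int64_t cur_pos            = 0;  // assumes skip_rows == 0 for brevity
  std::size_t cur_row_index       = 0;
  std::size_t cur_cumulative_size = 0;
  auto const max_row = std::min(num_rows, info.back().end_row_index);
  while (cur_row_index < max_row) {
    auto const split_pos =
      find_next_split(cur_pos, cur_row_index, cur_cumulative_size, info, size_limit);
    auto const start_row = cur_row_index;
    cur_row_index        = std::min(max_row, info[split_pos].end_row_index);
    splits.push_back({start_row, cur_row_index - start_row});
    cur_pos             = split_pos;
    cur_cumulative_size = info[split_pos].size_bytes;
  }
  return splits;
}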
@@ -1375,6 +1377,7 @@ void reader::impl::setup_next_subpass(bool uses_custom_row_bounds)
// get the next batch of pages
return compute_next_subpass(c_info,
pass.pages,
+ pass.chunks,
pass.page_offsets,
pass.processed_rows + pass.skip_rows,
remaining_read_limit,
cpp/src/io/parquet/reader_impl_preprocess.cu: 30 changes (14 additions, 16 deletions)
@@ -700,16 +700,16 @@ struct set_list_row_count_estimate {
struct set_final_row_count {
device_span<PageInfo> pages;
device_span<const ColumnChunkDesc> chunks;
- device_span<const size_type> page_offsets;
- size_t const max_row;

__device__ void operator()(size_t i)
{
- auto const last_page_index = page_offsets[i + 1] - 1;
- auto const& page = pages[last_page_index];
- auto const& chunk = chunks[page.chunk_idx];
- size_t const page_start_row = chunk.start_row + page.chunk_row;
- pages[last_page_index].num_rows = max_row - page_start_row;
+ auto& page = pages[i];
+ auto const& chunk = chunks[page.chunk_idx];
+ // only do this for the last page in each chunk
+ if (i < pages.size() - 1 && (pages[i + 1].chunk_idx == page.chunk_idx)) { return; }
+ size_t const page_start_row = chunk.start_row + page.chunk_row;
+ size_t const chunk_last_row = chunk.start_row + chunk.num_rows;
+ page.num_rows = chunk_last_row - page_start_row;
}
};
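
The functor now runs over every page and self-selects: anything that is not the last page of its chunk returns early, and last pages are clamped to the chunk's true row count rather than a pass-wide max_row. A host-side sketch of the same logic on trimmed-down stand-ins for PageInfo and ColumnChunkDesc:

#include <cstddef>
#include <vector>

struct page_stub {
  std::size_t chunk_idx;  // owning chunk
  std::size_t chunk_row;  // first row of this page, relative to the chunk
  std::size_t num_rows;   // possibly an estimate for list columns
};
struct chunk_stub {
  std::size_t start_row;  // first row of the chunk within the file
  std::size_t num_rows;   // exact row count from the row group metadata
};

void set_final_row_counts(std::vector<page_stub>& pages, std::vector<chunk_stub> const& chunks)
{
  for (std::size_t i = 0; i < pages.size(); ++i) {
    // only adjust the last page in each chunk
    if (i < pages.size() - 1 && pages[i + 1].chunk_idx == pages[i].chunk_idx) { continue; }
    auto const& c = chunks[pages[i].chunk_idx];
    std::size_t const page_start_row = c.start_row + pages[i].chunk_row;
    std::size_t const chunk_last_row = c.start_row + c.num_rows;
    pages[i].num_rows = chunk_last_row - page_start_row;
  }
}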

@@ -1300,17 +1300,15 @@ void reader::impl::generate_list_column_row_count_estimates()
chunk_row_output_iter{pass.pages.device_ptr()});
}

- // finally, fudge the last page for each column such that it ends on the real known row count
- // for the pass. this is so that as we march through the subpasses, we will find that every column
- // cleanly ends up the expected row count at the row group boundary.
- auto const& last_chunk = pass.chunks[pass.chunks.size() - 1];
- auto const num_columns = _input_columns.size();
- size_t const max_row = last_chunk.start_row + last_chunk.num_rows;
- auto iter = thrust::make_counting_iterator(0);
+ // to compensate for the list row size estimates, force the row count on the last page for each
+ // column chunk (each rowgroup) such that it ends on the real known row count. this is so that as
+ // we march through the subpasses, we will find that every column cleanly ends up with the expected
+ // row count at the row group boundary and our split computations work correctly.
+ auto iter = thrust::make_counting_iterator(0);
thrust::for_each(rmm::exec_policy_nosync(_stream),
iter,
- iter + num_columns,
- set_final_row_count{pass.pages, pass.chunks, pass.page_offsets, max_row});
+ iter + pass.pages.size(),
+ set_final_row_count{pass.pages, pass.chunks});

pass.chunks.device_to_host_async(_stream);
pass.pages.device_to_host_async(_stream);
cpp/tests/io/parquet_chunked_reader_test.cu: 86 changes (82 additions, 4 deletions)
@@ -111,12 +111,12 @@ auto write_file(std::vector<std::unique_ptr<cudf::column>>& input_columns,
return std::pair{std::move(input_table), std::move(filepath)};
}

- auto chunked_read(std::string const& filepath,
+ auto chunked_read(std::vector<std::string> const& filepaths,
std::size_t output_limit,
std::size_t input_limit = 0)
{
auto const read_opts =
- cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath}).build();
+ cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepaths}).build();
auto reader = cudf::io::chunked_parquet_reader(output_limit, input_limit, read_opts);

auto num_chunks = 0;
@@ -141,6 +141,14 @@ auto chunked_read(std::string const& filepath,
return std::pair(cudf::concatenate(out_tviews), num_chunks);
}

+ auto chunked_read(std::string const& filepath,
+ std::size_t output_limit,
+ std::size_t input_limit = 0)
+ {
+ std::vector<std::string> vpath{filepath};
+ return chunked_read(vpath, output_limit, input_limit);
+ }
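
The vector overload lets a test hand several files to one chunked reader as a single source, while the old single-file signature simply forwards. A hypothetical call from a test body (the file names and 2 GiB limits mirror the tiny-list test added below):

std::vector<std::string> const files{"rg0.parquet", "rg1.parquet", "rg2.parquet"};
auto const [table, num_chunks] =
  chunked_read(files, size_t{2} * 1024 * 1024 * 1024, size_t{2} * 1024 * 1024 * 1024);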

} // namespace

struct ParquetChunkedReaderTest : public cudf::test::BaseFixture {};
@@ -1113,7 +1121,7 @@ TEST_F(ParquetChunkedReaderInputLimitConstrainedTest, SingleFixedWidthColumn)
input_limit_test_write(test_filenames, tbl);

// semi-reasonable limit
- constexpr int expected_a[] = {1, 17, 4, 1};
+ constexpr int expected_a[] = {1, 25, 5, 1};
input_limit_test_read(test_filenames, tbl, 0, 2 * 1024 * 1024, expected_a);
// an unreasonable limit
constexpr int expected_b[] = {1, 50, 50, 1};
@@ -1145,7 +1153,7 @@ TEST_F(ParquetChunkedReaderInputLimitConstrainedTest, MixedColumns)

input_limit_test_write(test_filenames, tbl);

- constexpr int expected_a[] = {1, 50, 10, 7};
+ constexpr int expected_a[] = {1, 50, 13, 7};
input_limit_test_read(test_filenames, tbl, 0, 2 * 1024 * 1024, expected_a);
constexpr int expected_b[] = {1, 50, 50, 50};
input_limit_test_read(test_filenames, tbl, 0, 1, expected_b);
@@ -1227,6 +1235,76 @@ TEST_F(ParquetChunkedReaderInputLimitTest, List)
input_limit_test_read(test_filenames, tbl, 128 * 1024 * 1024, 512 * 1024 * 1024, expected_c);
}

+ void tiny_list_rowgroup_test(bool just_list_col)
+ {
+ auto iter = thrust::make_counting_iterator(0);
+
+ // test a specific edge case: a list column composed of multiple row groups, where each row
+ // group contains a single, relatively small row.
+ std::vector<int> row_sizes{12, 7, 16, 20, 10, 3, 15};
+ std::vector<std::unique_ptr<cudf::table>> row_groups;
+ for (size_t idx = 0; idx < row_sizes.size(); idx++) {
+ std::vector<std::unique_ptr<cudf::column>> cols;
+
+ // add a column before the list
+ if (!just_list_col) {
+ cudf::test::fixed_width_column_wrapper<int> int_col({idx});
+ cols.push_back(int_col.release());
+ }
+
+ // write out the single-row list column as its own file
+ cudf::test::fixed_width_column_wrapper<int> values(iter, iter + row_sizes[idx]);
+ cudf::test::fixed_width_column_wrapper<int> offsets({0, row_sizes[idx]});
+ cols.push_back(cudf::make_lists_column(1, offsets.release(), values.release(), 0, {}));
+
+ // add a column after the list
+ if (!just_list_col) {
+ cudf::test::fixed_width_column_wrapper<float> float_col({idx});
+ cols.push_back(float_col.release());
+ }
+
+ auto tbl = std::make_unique<cudf::table>(std::move(cols));
+
+ auto filepath = temp_env->get_temp_filepath("Tlrg" + std::to_string(idx));
+ auto const write_opts =
+ cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, *tbl).build();
+ cudf::io::write_parquet(write_opts);
+
+ // store off the table
+ row_groups.push_back(std::move(tbl));
+ }
+
+ // build expected
+ std::vector<cudf::table_view> views;
+ std::transform(row_groups.begin(),
+ row_groups.end(),
+ std::back_inserter(views),
+ [](std::unique_ptr<cudf::table> const& tbl) { return tbl->view(); });
+ auto expected = cudf::concatenate(views);
+
+ // load the individual files all at once
+ std::vector<std::string> source_files;
+ std::transform(iter, iter + row_groups.size(), std::back_inserter(source_files), [](int i) {
+ return temp_env->get_temp_filepath("Tlrg" + std::to_string(i));
+ });
+ auto result =
+ chunked_read(source_files, size_t{2} * 1024 * 1024 * 1024, size_t{2} * 1024 * 1024 * 1024);
+
+ CUDF_TEST_EXPECT_TABLES_EQUAL(*expected, *(result.first));
+ }
+
+ TEST_F(ParquetChunkedReaderInputLimitTest, TinyListRowGroupsSingle)
+ {
+ // test with just a single list column
+ tiny_list_rowgroup_test(true);
+ }
+
+ TEST_F(ParquetChunkedReaderInputLimitTest, TinyListRowGroupsMixed)
+ {
+ // test with other columns mixed in
+ tiny_list_rowgroup_test(false);
+ }
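
Both tests force the edge case the get_page_span change in reader_impl_chunking.cu guards against: every row group holds one small list row, so page boundaries coincide with row boundaries and a row's values may begin on the very page whose end_row_index equals that row. A host-only sketch of the resulting start-page decision (hypothetical inputs; assumes start_row falls within the pages):

#include <algorithm>
#include <cstddef>
#include <vector>

std::size_t first_page_for_row(std::vector<std::size_t> const& page_end_rows,
                               std::size_t start_row,
                               bool is_list)
{
  std::size_t sp = static_cast<std::size_t>(
    std::lower_bound(page_end_rows.begin(), page_end_rows.end(), start_row) -
    page_end_rows.begin());
  // a fixed-width page ending exactly at start_row holds no values for it, so
  // skip ahead; a list page may still hold the tail of a row spanning the
  // boundary, so it has to stay in the span
  if (page_end_rows[sp] == start_row && !is_list) { sp++; }
  return sp;
}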

struct char_values {
__device__ int8_t operator()(int i)
{