Adding string row size iterator for row to column and column to row conversion #10157

Merged

Changes from 2 commits
234 changes: 168 additions & 66 deletions java/src/main/native/src/row_conversion.cu
@@ -23,6 +23,7 @@
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/lists/lists_column_device_view.cuh>
#include <cudf/scalar/scalar_factories.hpp>
#include <cudf/strings/strings_column_view.hpp>
#include <cudf/table/table.hpp>
#include <cudf/types.hpp>
#include <cudf/utilities/bit.hpp>
@@ -34,6 +35,7 @@
#include <rmm/exec_policy.hpp>
#include <thrust/binary_search.h>
#include <thrust/iterator/counting_iterator.h>
#include <thrust/iterator/discard_iterator.h>
#include <thrust/iterator/transform_iterator.h>
#include <thrust/scan.h>
#include <type_traits>
@@ -187,6 +189,62 @@ struct batch_data {
std::vector<row_batch> row_batches; // information about each batch such as byte count
};

rmm::device_uvector<size_type> build_string_row_offsets(table_view const &tbl,
size_type fixed_width_and_validity_size,
rmm::cuda_stream_view stream) {
auto const num_rows = tbl.num_rows();
rmm::device_uvector<size_type> d_row_offsets(num_rows, stream);

std::vector<strings_column_view::offset_iterator> string_columns_offsets;
auto column_ptrs = thrust::make_transform_iterator(
tbl.begin(), [](auto const &col) -> strings_column_view::offset_iterator {
if (!is_fixed_width(col.type())) {
CUDF_EXPECTS(col.type().id() == type_id::STRING, "only string columns are supported!");
strings_column_view scv = col;
return scv.offsets_begin();
} else {
return nullptr;
}
});
std::copy_if(column_ptrs, column_ptrs + tbl.num_columns(),
std::back_inserter(string_columns_offsets),
[](auto const &i) { return i != nullptr; });

auto const num_columns = static_cast<size_type>(string_columns_offsets.size());

auto d_string_columns_offsets = make_device_uvector_async(string_columns_offsets, stream);
auto key_iter = cudf::detail::make_counting_transform_iterator(
0, [num_columns] __device__(auto element_idx) { return element_idx / num_columns; });
auto data_iter = cudf::detail::make_counting_transform_iterator(
0, [d_string_columns_offsets = d_string_columns_offsets.data(), num_columns,
num_rows] __device__(auto element_idx) {
auto const row = element_idx / num_columns;
auto const col = element_idx % num_columns;

return d_string_columns_offsets[col][row + 1] - d_string_columns_offsets[col][row];
});
ttnghia (Contributor) commented on Feb 7, 2022:
Hmm, from my perspective this computation is inefficient. You are looping col-by-col: for each row, you iteratively access all the cols before going to the next row, so each column is accessed separately num_rows times.

ttnghia (Contributor), Feb 7, 2022:
How about this?

auto const row = element_idx % num_rows;
auto const col = element_idx / num_rows;
...

This way, you may not be able to use reduce_by_key. Instead, you need to initialize the d_row_offsets to zero (thrust::uninitialized_fill) then atomicAdd each output value.
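
A minimal sketch of that suggestion, reusing the d_string_columns_offsets, d_row_offsets, num_rows, and num_columns variables from the diff above — an illustration of the proposed zero-fill-plus-atomicAdd shape, not code from the PR:

#include <thrust/for_each.h>
#include <thrust/uninitialized_fill.h>
#include <thrust/iterator/counting_iterator.h>

// Zero the per-row totals, then let each (row, col) element atomically
// add its string length into its row's slot. Column-major indexing means
// adjacent threads read consecutive entries of one offsets buffer.
thrust::uninitialized_fill(rmm::exec_policy(stream), d_row_offsets.begin(),
                           d_row_offsets.end(), 0);
thrust::for_each_n(rmm::exec_policy(stream), thrust::make_counting_iterator(0),
                   num_columns * num_rows,
                   [d_offsets = d_string_columns_offsets.data(), num_rows,
                    d_row_offsets = d_row_offsets.data()] __device__(auto element_idx) {
                     auto const row = element_idx % num_rows;
                     auto const col = element_idx / num_rows;
                     atomicAdd(&d_row_offsets[row],
                               d_offsets[col][row + 1] - d_offsets[col][row]);
                   });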

ttnghia (Contributor), Feb 7, 2022:
I'm not sure this solution is more efficient. It should be if we have a large number of columns; otherwise I don't know.

Contributor:
If you can put together a benchmark to compare the solutions, that would be great 😄

hyperbolic2346 (Contributor, Author):
Spark's max columns defaults to 100, and it seems far more likely to have a very large number of rows. With reduce_by_key requiring consecutive keys, we can't simply flip the math. I will do some performance testing and report back.
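
For context on the consecutive-key constraint mentioned here: thrust::reduce_by_key reduces runs of equal adjacent keys, so the key sequence must visit each row's elements contiguously. A small self-contained host-side illustration with toy values (not from the PR):

#include <thrust/reduce.h>
#include <thrust/iterator/discard_iterator.h>
#include <cassert>
#include <vector>

int main() {
  // Keys form consecutive runs (0,0 | 1,1 | 2,2), so three sums come out.
  std::vector<int> keys{0, 0, 1, 1, 2, 2};
  std::vector<int> vals{5, 3, 2, 2, 7, 1};
  std::vector<int> sums(3);
  thrust::reduce_by_key(keys.begin(), keys.end(), vals.begin(),
                        thrust::make_discard_iterator(), sums.begin());
  assert((sums == std::vector<int>{8, 4, 8}));
  // Interleaved keys like 0,1,2,0,1,2 would instead produce six runs of
  // length one -- which is why the indexing can't simply be flipped.
  return 0;
}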

hyperbolic2346 (Contributor, Author):
I performance tested this code: this function runs in about 1.2 ms on my PC for 50 columns and 1,000,000 rows of intermixed int and string columns. With the changes to drop reduce_by_key and march the data in a more natural way, that time falls to 0.75 ms. This seems worth it even though it removes the chance to use the cool transform output iterator suggested in review. Thanks for pushing for this; I probably dismissed it because I was excited to use reduce_by_key.

hyperbolic2346 (Contributor, Author):
@revans2 is that default limit 100 or have I been led astray by my reading?

Contributor:
Quoting the earlier comment: "march the data in a more natural way this time drops to 0.75ms."

Kudos, @ttnghia and @hyperbolic2346!
I'm having a hard time grokking why this iteration order is faster. All the string columns have to eventually be accessed num_rows times. So this should be a matter of... proximity? All threads in the warp acting on proximal locations in memory?

Contributor:
In the old way, we access row 0 of all columns 0, 1, 2, etc., then row 1 of all columns, and so on. Each access pulls data from a different column at a different location in memory.
In the new way, we access rows 0, 1, 2, etc. of column 0, then rows 0, 1, 2, etc. of column 1, and so on, so the data is pulled from contiguous memory locations.
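
In other words — a sketch of the index math only, with names assumed from the diff:

// Row-major mapping, as in this diff: adjacent element_idx values cross
// columns within one row, so neighboring threads read from different
// offsets buffers (scattered loads).
auto const row = element_idx / num_columns;
auto const col = element_idx % num_columns;

// Column-major mapping, as suggested: adjacent element_idx values walk
// down a single column, so a warp reads 32 consecutive size_type entries
// from one buffer (coalesced loads).
auto const row2 = element_idx % num_rows;
auto const col2 = element_idx / num_rows;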

thrust::reduce_by_key(rmm::exec_policy(stream), key_iter, key_iter + num_columns * num_rows,
data_iter, thrust::make_discard_iterator(), d_row_offsets.begin());

thrust::transform(
rmm::exec_policy(stream), d_row_offsets.begin(), d_row_offsets.end(), d_row_offsets.begin(),
[fixed_width_and_validity_size] __device__(auto row_size) {
return util::round_up_unsafe(fixed_width_and_validity_size + row_size, JCUDF_ROW_ALIGNMENT);
});

return d_row_offsets;
}
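
To make the computation above concrete, here is a self-contained host-side rendering of the same math on toy data; the values are assumptions for illustration, and the alignment constant stands in for JCUDF_ROW_ALIGNMENT:

#include <cassert>
#include <vector>

int main() {
  // Two string columns over three rows; cuDF offsets have num_rows + 1 entries.
  std::vector<int> offsets_a{0, 3, 3, 8};  // string lengths 3, 0, 5
  std::vector<int> offsets_b{0, 1, 4, 6};  // string lengths 1, 3, 2
  int const fixed_width_and_validity_size = 12;
  int const alignment = 8;  // stands in for JCUDF_ROW_ALIGNMENT

  std::vector<int> row_sizes(3);
  for (int row = 0; row < 3; ++row) {
    int const variable = (offsets_a[row + 1] - offsets_a[row]) +
                         (offsets_b[row + 1] - offsets_b[row]);
    int const raw = fixed_width_and_validity_size + variable;
    row_sizes[row] = (raw + alignment - 1) / alignment * alignment;  // round up
  }
  assert((row_sizes == std::vector<int>{16, 16, 24}));  // 16, 15->16, 19->24
  return 0;
}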

struct string_row_offset_functor {
string_row_offset_functor(device_span<size_type> d_row_offsets) : _d_row_offsets(d_row_offsets){};

__device__ inline size_type operator()(int row_number, int tile_row_start) const {
return _d_row_offsets[row_number];
}

device_span<size_type> _d_row_offsets;
};
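
This functor and the fixed-width row_offset_functor below expose the same operator()(row_number, tile_row_start) shape, which is what lets the copy kernels later in the diff accept either one. A hypothetical sketch of that pattern (kernel name and body are illustrative only, not from the PR):

// Any callable with this shape can feed row offsets to a copy kernel,
// so the fixed-width and string paths can reuse the same kernel template.
template <typename RowOffsetFunctor>
__global__ void example_copy_kernel(RowOffsetFunctor row_offsets, int num_rows) {
  auto const row = blockIdx.x * blockDim.x + threadIdx.x;
  if (row >= num_rows) return;
  auto const offset = row_offsets(row, 0);  // second arg unused by the string functor
  // ... copy this row's bytes starting at `offset` ...
}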

struct row_offset_functor {
row_offset_functor(size_type fixed_width_only_row_size)
: _fixed_width_only_row_size(fixed_width_only_row_size){};
@@ -542,6 +600,10 @@ __global__ void copy_to_rows(const size_type num_rows, const size_type num_columns,
auto const relative_col = el / num_fetch_rows;
auto const relative_row = el % num_fetch_rows;
auto const absolute_col = relative_col + fetch_tile.start_col;
if (input_data[absolute_col] == nullptr) {
// variable-width data
continue;
}
auto const absolute_row = relative_row + fetch_tile.start_row;
auto const col_size = col_sizes[absolute_col];
auto const col_offset = col_offsets[absolute_col];
@@ -1194,10 +1256,8 @@ static size_type compute_column_information(iterator begin, iterator end,
auto validity_offset = fixed_width_size_per_row;
column_starts.push_back(validity_offset);

return util::round_up_unsafe(
fixed_width_size_per_row +
util::div_rounding_up_safe(static_cast<size_type>(std::distance(begin, end)), CHAR_BIT),
JCUDF_ROW_ALIGNMENT);
return fixed_width_size_per_row +
util::div_rounding_up_safe(static_cast<size_type>(std::distance(begin, end)), CHAR_BIT);
}
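
The alignment this function used to apply is now done at each call site (see the round_up_unsafe calls later in this diff). For reference, a tiny standalone sketch of that rounding, mirroring util::round_up_unsafe with assumed values:

#include <cassert>

// (value + alignment - 1) / alignment * alignment, for non-negative value.
constexpr int round_up(int value, int alignment) {
  return (value + alignment - 1) / alignment * alignment;
}

int main() {
  // e.g. 13 bytes of fixed-width data + 2 validity bytes = 15 raw bytes,
  // padded to 16 under 8-byte JCUDF row alignment.
  assert(round_up(15, 8) == 16);
  assert(round_up(16, 8) == 16);
  return 0;
}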

/**
@@ -1512,20 +1572,13 @@ void determine_tiles(std::vector<size_type> const &column_sizes,
}
}

#endif // #if !defined(__CUDA_ARCH__) || __CUDA_ARCH__ >= 700

} // namespace detail

std::vector<std::unique_ptr<column>> convert_to_rows(table_view const &tbl,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource *mr) {
#if !defined(__CUDA_ARCH__) || __CUDA_ARCH__ >= 700
auto const num_columns = tbl.num_columns();
auto const num_rows = tbl.num_rows();

auto const fixed_width_only = std::all_of(
tbl.begin(), tbl.end(), [](column_view const &c) { return is_fixed_width(c.type()); });

template <typename RowSizeIter>
std::vector<std::unique_ptr<column>>
convert_to_rows(table_view const &tbl, RowSizeIter row_sizes,
std::vector<size_type> const &column_starts,
std::vector<size_type> const &column_sizes, bool fixed_width_only,
size_type const fixed_width_size_per_row, rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource *mr) {
int device_id;
CUDA_TRY(cudaGetDevice(&device_id));
int total_shmem_in_bytes;
@@ -1537,20 +1590,11 @@ std::vector<std::unique_ptr<column>> convert_to_rows(table_view const &tbl,
sizeof(cuda::barrier<cuda::thread_scope_block>) * NUM_TILES_PER_KERNEL_LOADED;
auto const shmem_limit_per_tile = total_shmem_in_bytes / NUM_TILES_PER_KERNEL_LOADED;

// break up the work into tiles, which are a starting and ending row/col #.
// this tile size is calculated based on the shared memory size available
// we want a single tile to fill up the entire shared memory space available
// for the transpose-like conversion.

// There are two different processes going on here. The GPU conversion of the data
// and the writing of the data into the list of byte columns that are a maximum of
// 2 gigs each due to offset maximum size. The GPU conversion portion has to understand
// this limitation because the column must own the data inside and as a result it must be
// a distinct allocation for that column. Copying the data into these final buffers would
// be prohibitively expensive, so care is taken to ensure the GPU writes to the proper buffer.
// The tiles are broken at the boundaries of specific rows based on the row sizes up
// to that point. These are row batches and they are decided first before building the
// tiles so the tiles can be properly cut around them.
auto const num_rows = tbl.num_rows();
auto const num_columns = tbl.num_columns();
auto dev_col_sizes = make_device_uvector_async(column_sizes, stream, mr);
auto dev_col_starts = make_device_uvector_async(column_starts, stream, mr);
auto batch_info = detail::build_batches(num_rows, row_sizes, fixed_width_only, stream, mr);

// Get the pointers to the input columnar data ready

@@ -1565,27 +1609,6 @@ std::vector<std::unique_ptr<column>> convert_to_rows(table_view const &tbl,
auto dev_input_data = make_device_uvector_async(input_data, stream, mr);
auto dev_input_nm = make_device_uvector_async(input_nm, stream, mr);

std::vector<size_type> column_sizes; // byte size of each column
std::vector<size_type> column_starts; // offset of column inside a row including alignment
column_sizes.reserve(num_columns);
column_starts.reserve(num_columns + 1); // we add a final offset for validity data start

auto schema_column_iter =
thrust::make_transform_iterator(thrust::make_counting_iterator(0),
[&tbl](auto i) -> std::tuple<data_type, column_view const> {
return {tbl.column(i).type(), tbl.column(i)};
});

auto const fixed_width_size_per_row = detail::compute_column_information(
schema_column_iter, schema_column_iter + num_columns, column_starts, column_sizes);

auto dev_col_sizes = make_device_uvector_async(column_sizes, stream, mr);
auto dev_col_starts = make_device_uvector_async(column_starts, stream, mr);

// total encoded row size. This includes fixed-width data, validity, and variable-width data.
auto row_size_iter = thrust::make_constant_iterator<uint64_t>(fixed_width_size_per_row);
auto batch_info = detail::build_batches(num_rows, row_size_iter, fixed_width_only, stream, mr);

// the first batch always exists unless we were sent an empty table
auto const first_batch_size = batch_info.row_batches[0].row_count;

Expand Down Expand Up @@ -1636,19 +1659,36 @@ std::vector<std::unique_ptr<column>> convert_to_rows(table_view const &tbl,
util::div_rounding_up_unsafe(validity_tile_infos.size(), NUM_VALIDITY_TILES_PER_KERNEL));
dim3 validity_threads(std::min(validity_tile_infos.size() * 32, 128lu));

detail::row_offset_functor offset_functor(fixed_width_size_per_row);
if (fixed_width_only) {
detail::row_offset_functor offset_functor(
util::round_up_unsafe(fixed_width_size_per_row, JCUDF_ROW_ALIGNMENT));

detail::copy_to_rows<<<blocks, threads, total_shmem_in_bytes, stream.value()>>>(
num_rows, num_columns, shmem_limit_per_tile, gpu_tile_infos, dev_input_data.data(),
dev_col_sizes.data(), dev_col_starts.data(), offset_functor,
batch_info.d_batch_row_boundaries.data(),
reinterpret_cast<int8_t **>(dev_output_data.data()));
detail::copy_to_rows<<<blocks, threads, total_shmem_in_bytes, stream.value()>>>(
num_rows, num_columns, shmem_limit_per_tile, gpu_tile_infos, dev_input_data.data(),
dev_col_sizes.data(), dev_col_starts.data(), offset_functor,
batch_info.d_batch_row_boundaries.data(),
reinterpret_cast<int8_t **>(dev_output_data.data()));

detail::copy_validity_to_rows<<<validity_blocks, validity_threads, total_shmem_in_bytes,
stream.value()>>>(
num_rows, num_columns, shmem_limit_per_tile, offset_functor,
batch_info.d_batch_row_boundaries.data(), dev_output_data.data(), column_starts.back(),
dev_validity_tile_infos, dev_input_nm.data());
detail::copy_validity_to_rows<<<validity_blocks, validity_threads, total_shmem_in_bytes,
stream.value()>>>(
num_rows, num_columns, shmem_limit_per_tile, offset_functor,
batch_info.d_batch_row_boundaries.data(), dev_output_data.data(), column_starts.back(),
dev_validity_tile_infos, dev_input_nm.data());
} else {
detail::string_row_offset_functor offset_functor(batch_info.batch_row_offsets);

detail::copy_to_rows<<<blocks, threads, total_shmem_in_bytes, stream.value()>>>(
num_rows, num_columns, shmem_limit_per_tile, gpu_tile_infos, dev_input_data.data(),
dev_col_sizes.data(), dev_col_starts.data(), offset_functor,
batch_info.d_batch_row_boundaries.data(),
reinterpret_cast<int8_t **>(dev_output_data.data()));

detail::copy_validity_to_rows<<<validity_blocks, validity_threads, total_shmem_in_bytes,
stream.value()>>>(
num_rows, num_columns, shmem_limit_per_tile, offset_functor,
batch_info.d_batch_row_boundaries.data(), dev_output_data.data(), column_starts.back(),
dev_validity_tile_infos, dev_input_nm.data());
}

// split up the output buffer into multiple buffers based on row batch sizes
// and create list of byte columns
@@ -1670,6 +1710,67 @@ std::vector<std::unique_ptr<column>> convert_to_rows(table_view const &tbl,
});

return ret;
}
#endif // #if !defined(__CUDA_ARCH__) || __CUDA_ARCH__ >= 700

} // namespace detail

std::vector<std::unique_ptr<column>> convert_to_rows(table_view const &tbl,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource *mr) {
#if !defined(__CUDA_ARCH__) || __CUDA_ARCH__ >= 700
auto const num_columns = tbl.num_columns();
auto const num_rows = tbl.num_rows();

auto const fixed_width_only = std::all_of(
tbl.begin(), tbl.end(), [](column_view const &c) { return is_fixed_width(c.type()); });

// break up the work into tiles, which are a starting and ending row/col #.
// this tile size is calculated based on the shared memory size available
// we want a single tile to fill up the entire shared memory space available
// for the transpose-like conversion.

// There are two different processes going on here. The GPU conversion of the data
// and the writing of the data into the list of byte columns that are a maximum of
// 2 gigs each due to offset maximum size. The GPU conversion portion has to understand
// this limitation because the column must own the data inside and as a result it must be
// a distinct allocation for that column. Copying the data into these final buffers would
// be prohibitively expensive, so care is taken to ensure the GPU writes to the proper buffer.
// The tiles are broken at the boundaries of specific rows based on the row sizes up
// to that point. These are row batches and they are decided first before building the
// tiles so the tiles can be properly cut around them.
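
A hedged host-side sketch of the batching rule this comment describes — accumulate per-row sizes and cut a new batch before the running total crosses the byte limit imposed by 32-bit offsets. This is toy code, not the PR's build_batches, and it assumes no single row exceeds the limit:

#include <cstddef>
#include <vector>

std::vector<std::size_t> batch_row_boundaries(std::vector<std::size_t> const &row_sizes,
                                              std::size_t max_batch_bytes) {
  std::vector<std::size_t> boundaries{0};  // first batch starts at row 0
  std::size_t running = 0;
  for (std::size_t row = 0; row < row_sizes.size(); ++row) {
    if (running + row_sizes[row] > max_batch_bytes) {
      boundaries.push_back(row);  // close the current batch before this row
      running = 0;
    }
    running += row_sizes[row];
  }
  boundaries.push_back(row_sizes.size());  // end of the last batch
  return boundaries;
}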

std::vector<size_type> column_sizes; // byte size of each column
std::vector<size_type> column_starts; // offset of column inside a row including alignment
column_sizes.reserve(num_columns);
column_starts.reserve(num_columns + 1); // we add a final offset for validity data start

auto schema_column_iter =
thrust::make_transform_iterator(thrust::make_counting_iterator(0),
[&tbl](auto i) -> std::tuple<data_type, column_view const> {
return {tbl.column(i).type(), tbl.column(i)};
});

auto const fixed_width_size_per_row = detail::compute_column_information(
schema_column_iter, schema_column_iter + num_columns, column_starts, column_sizes);
if (fixed_width_only) {
// total encoded row size. This includes fixed-width data and validity only. It does not include
// variable-width data since it isn't copied with the fixed-width and validity kernel.
auto row_size_iter = thrust::make_constant_iterator<uint64_t>(
util::round_up_unsafe(fixed_width_size_per_row, JCUDF_ROW_ALIGNMENT));

return detail::convert_to_rows(tbl, row_size_iter, column_starts, column_sizes,
fixed_width_only, fixed_width_size_per_row, stream, mr);
} else {
auto row_sizes = detail::build_string_row_offsets(tbl, fixed_width_size_per_row, stream);

auto row_size_iter = cudf::detail::make_counting_transform_iterator(
0, detail::row_size_functor(num_rows, row_sizes.data(), 0));

return detail::convert_to_rows(tbl, row_size_iter, column_starts, column_sizes,
fixed_width_only, fixed_width_size_per_row, stream, mr);
}

#else
CUDF_FAIL("Column to row conversion optimization requires volta or later hardware.");
return {};
@@ -1768,8 +1869,9 @@ std::unique_ptr<table> convert_from_rows(lists_column_view const &input,
auto iter = thrust::make_transform_iterator(thrust::make_counting_iterator(0), [&schema](auto i) {
return std::make_tuple(schema[i], nullptr);
});
auto const fixed_width_size_per_row =
detail::compute_column_information(iter, iter + num_columns, column_starts, column_sizes);
auto const fixed_width_size_per_row = util::round_up_unsafe(
detail::compute_column_information(iter, iter + num_columns, column_starts, column_sizes),
JCUDF_ROW_ALIGNMENT);

// Ideally we would check that the offsets are all the same, etc. but for now
// this is probably fine