Generalize the API and ORC changes by @vuule
mhaseeb123 committed Sep 25, 2024
1 parent cab885d commit b15e3d3
Showing 3 changed files with 78 additions and 52 deletions.
44 changes: 14 additions & 30 deletions cpp/include/cudf/io/detail/batched_memcpy.hpp
@@ -28,48 +28,32 @@ namespace CUDF_EXPORT cudf {
namespace io::detail {

/**
* @brief A helper function that copies a vector of host scalar data to the corresponding device
* addresses in a batched manner.
* @brief A helper function that copies a batch of buffers from source to destination addresses in
* a single batched operation.
*
* @tparam SrcIterator The type of the source address iterator
* @tparam DstIterator The type of the destination address iterator
* @tparam SizeIterator The type of the buffer size iterator
*
* @param[in] src_data A vector of host scalar data
* @param[in] dst_addrs A vector of device destination addresses
* @param[in] mr Device memory resource to allocate temporary memory
* @param[in] src_iter Iterator to source addresses
* @param[in] dst_iter Iterator to destination addresses
* @param[in] size_iter Iterator to the buffer sizes (in bytes)
* @param[in] num_elems Number of buffers to copy
* @param[in] stream CUDA stream to use
*/
template <typename T>
void batched_memcpy(std::vector<T> const& src_data,
std::vector<T*> const& dst_addrs,
rmm::device_async_resource_ref mr,
template <typename SrcIterator, typename DstIterator, typename SizeIterator>
void batched_memcpy(SrcIterator src_iter,
DstIterator dst_iter,
SizeIterator size_iter,
size_t num_elems,
rmm::cuda_stream_view stream)
{
// Number of elements to copy
auto const num_elems = src_data.size();

// Copy src data to device and create an iterator
auto d_src_data = cudf::detail::make_device_uvector_async(src_data, stream, mr);
auto src_iter = cudf::detail::make_counting_transform_iterator(
static_cast<std::size_t>(0),
cuda::proclaim_return_type<T*>(
[src = d_src_data.data()] __device__(std::size_t i) { return src + i; }));

// Copy dst addresses to device and create an iterator
auto d_dst_addrs = cudf::detail::make_device_uvector_async(dst_addrs, stream, mr);
auto dst_iter = cudf::detail::make_counting_transform_iterator(
static_cast<std::size_t>(0),
cuda::proclaim_return_type<T*>(
[dst = d_dst_addrs.data()] __device__(std::size_t i) { return dst[i]; }));

// The source data are scalars, so size_iter is simply a constant iterator.
auto size_iter = thrust::make_constant_iterator(sizeof(T));

// Get temp storage needed for cub::DeviceMemcpy::Batched
size_t temp_storage_bytes = 0;
cub::DeviceMemcpy::Batched(
nullptr, temp_storage_bytes, src_iter, dst_iter, size_iter, num_elems, stream.value());

// Allocate temporary storage
auto d_temp_storage = rmm::device_buffer{temp_storage_bytes, stream.value(), mr};
rmm::device_buffer d_temp_storage{temp_storage_bytes, stream.value()};

// Run cub::DeviceMemcpy::Batched
cub::DeviceMemcpy::Batched(d_temp_storage.data(),
…
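For reference, a minimal calling sketch of the generalized API, mirroring the ORC call site below (the d_src_ptrs, d_dst_ptrs, d_sizes, and num_buffers names are hypothetical; any device-accessible iterators with the right value types work):

  // Given device arrays of source addresses, destination addresses, and byte
  // counts, e.g. rmm::device_uvector<uint8_t*> d_src_ptrs and d_dst_ptrs plus
  // rmm::device_uvector<size_t> d_sizes, each of length num_buffers:
  cudf::io::detail::batched_memcpy(d_src_ptrs.data(),  // SrcIterator: uint8_t**
                                   d_dst_ptrs.data(),  // DstIterator: uint8_t**
                                   d_sizes.data(),     // SizeIterator: size_t*
                                   num_buffers,
                                   stream);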
66 changes: 45 additions & 21 deletions cpp/src/io/orc/stripe_enc.cu
@@ -23,6 +23,7 @@
#include <cudf/detail/utilities/integer_utils.hpp>
#include <cudf/detail/utilities/logger.hpp>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/io/detail/batched_memcpy.hpp>
#include <cudf/io/orc_types.hpp>
#include <cudf/lists/lists_column_view.hpp>
#include <cudf/utilities/bit.hpp>
@@ -1087,37 +1088,42 @@ CUDF_KERNEL void __launch_bounds__(block_size)
/**
* @brief Initializes the batched memcpy that merges chunked column data into a single contiguous stream
*
* @param[in,out] strm_desc StripeStream device array [stripe][stream]
* @param[in,out] streams List of encoder chunk streams [column][rowgroup]
* @param[in] strm_desc StripeStream device array [stripe][stream]
* @param[in] streams List of encoder chunk streams [column][rowgroup]
* @param[out] srcs List of source encoder chunk stream data addresses
* @param[out] dsts List of destination StripeStream data addresses
* @param[out] sizes List of stream sizes in bytes
*/
// blockDim {compact_streams_block_size,1,1}
CUDF_KERNEL void __launch_bounds__(compact_streams_block_size)
gpuCompactOrcDataStreams(device_2dspan<StripeStream> strm_desc,
device_2dspan<encoder_chunk_streams> streams)
gpuInitBatchedMemcpy(device_2dspan<StripeStream const> strm_desc,
device_2dspan<encoder_chunk_streams> streams,
device_span<uint8_t*> srcs,
device_span<uint8_t*> dsts,
device_span<size_t> sizes)
{
__shared__ __align__(16) StripeStream ss;

auto const stripe_id = blockIdx.x;
auto const stripe_id = blockIdx.x * compact_streams_block_size + threadIdx.x;
auto const stream_id = blockIdx.y;
auto const t = threadIdx.x;
if (stripe_id >= strm_desc.size().first) { return; }

if (t == 0) { ss = strm_desc[stripe_id][stream_id]; }
__syncthreads();
auto const out_id = stream_id * strm_desc.size().first + stripe_id;
StripeStream ss = strm_desc[stripe_id][stream_id];

if (ss.data_ptr == nullptr) { return; }

auto const cid = ss.stream_type;
auto dst_ptr = ss.data_ptr;
for (auto group = ss.first_chunk_id; group < ss.first_chunk_id + ss.num_chunks; ++group) {
auto const out_id = stream_id * streams.size().second + group;
srcs[out_id] = streams[ss.column_id][group].data_ptrs[cid];
dsts[out_id] = dst_ptr;

// Also update the stream here; the data will be copied in a separate kernel
streams[ss.column_id][group].data_ptrs[cid] = dst_ptr;

auto const len = streams[ss.column_id][group].lengths[cid];
if (len > 0) {
auto const src_ptr = streams[ss.column_id][group].data_ptrs[cid];
for (uint32_t i = t; i < len; i += blockDim.x) {
dst_ptr[i] = src_ptr[i];
}
__syncthreads();
}
if (t == 0) { streams[ss.column_id][group].data_ptrs[cid] = dst_ptr; }
// The size in bytes is just len, since sizeof(uint8_t) == 1.
sizes[out_id] = len;
dst_ptr += len;
}
}
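To make the kernel's bookkeeping easier to follow, here is an equivalent sequential sketch of what the grid computes as a whole (illustrative pseudo-host code, not part of the change; num_stripes, num_streams, and num_rowgroups correspond to the extents used in CompactOrcDataStreams below):

  // One GPU thread handles one (stripe, stream) pair; sequentially that is:
  for (size_t stripe = 0; stripe < num_stripes; ++stripe) {
    for (size_t strm = 0; strm < num_streams; ++strm) {
      StripeStream const ss = strm_desc[stripe][strm];
      if (ss.data_ptr == nullptr) continue;
      auto* dst = ss.data_ptr;
      for (auto g = ss.first_chunk_id; g < ss.first_chunk_id + ss.num_chunks; ++g) {
        auto const out = strm * num_rowgroups + g;  // flattened (stream, rowgroup) slot
        srcs[out]  = streams[ss.column_id][g].data_ptrs[ss.stream_type];
        dsts[out]  = dst;  // chunks land back-to-back in the stripe's stream
        sizes[out] = streams[ss.column_id][g].lengths[ss.stream_type];
        streams[ss.column_id][g].data_ptrs[ss.stream_type] = dst;  // redirect the chunk
        dst += sizes[out];
      }
    }
  }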
@@ -1326,8 +1332,26 @@ void CompactOrcDataStreams(device_2dspan<StripeStream> strm_desc,
rmm::cuda_stream_view stream)
{
dim3 dim_block(compact_streams_block_size, 1);
dim3 dim_grid(strm_desc.size().first, strm_desc.size().second);
gpuCompactOrcDataStreams<<<dim_grid, dim_block, 0, stream.value()>>>(strm_desc, enc_streams);

auto const num_rowgroups = enc_streams.size().second;
auto const num_streams = strm_desc.size().second;
auto const num_stripes = strm_desc.size().first;
auto const num_chunks = num_rowgroups * num_streams;
auto srcs = cudf::detail::make_zeroed_device_uvector_sync<uint8_t*>(
num_chunks, stream, rmm::mr::get_current_device_resource());
auto dsts = cudf::detail::make_zeroed_device_uvector_sync<uint8_t*>(
num_chunks, stream, rmm::mr::get_current_device_resource());
auto lengths = cudf::detail::make_zeroed_device_uvector_sync<size_t>(
num_chunks, stream, rmm::mr::get_current_device_resource());

dim3 dim_grid_alt(cudf::util::div_rounding_up_unsafe(num_stripes, compact_streams_block_size),
num_streams);
gpuInitBatchedMemcpy<<<dim_grid_alt, dim_block, 0, stream.value()>>>(
strm_desc, enc_streams, srcs, dsts, lengths);

// Copy streams in a batched manner.
cudf::io::detail::batched_memcpy(
srcs.data(), dsts.data(), lengths.data(), lengths.size(), stream);
}
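For intuition about the new launch geometry: the grid covers one thread per stripe along x and one block row per stream along y. With illustrative numbers (the value of compact_streams_block_size is outside this excerpt and assumed here):

  // Hypothetical sizes: num_stripes = 1000, num_streams = 8, compact_streams_block_size = 256
  // dim_grid_alt.x = div_rounding_up_unsafe(1000, 256) = 4  // 1024 threads; the excess exit early
  // dim_grid_alt.y = 8                                      // one block row per stream
  // dim_block      = (256, 1)                               // as set above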

std::optional<writer_compression_statistics> CompressOrcDataStreams(
@@ -1438,4 +1462,4 @@ void decimal_sizes_to_offsets(device_2dspan<rowgroup_rows const> rg_bounds,
} // namespace gpu
} // namespace orc
} // namespace io
} // namespace cudf
20 changes: 19 additions & 1 deletion cpp/src/io/parquet/page_data.cu
@@ -476,7 +476,25 @@ void __host__ WriteFinalOffsetsBatched(std::vector<size_type> const& offsets,
rmm::device_async_resource_ref mr,
rmm::cuda_stream_view stream)
{
return cudf::io::detail::batched_memcpy(offsets, buff_addrs, mr, stream);
// Copy offsets to device and create an iterator
auto d_src_data = cudf::detail::make_device_uvector_async(offsets, stream, mr);
auto src_iter = cudf::detail::make_counting_transform_iterator(
static_cast<std::size_t>(0),
cuda::proclaim_return_type<size_type*>(
[src = d_src_data.data()] __device__(std::size_t i) { return src + i; }));

// Copy buffer addresses to device and create an iterator
auto d_dst_addrs = cudf::detail::make_device_uvector_async(buff_addrs, stream, mr);
auto dst_iter = cudf::detail::make_counting_transform_iterator(
static_cast<std::size_t>(0),
cuda::proclaim_return_type<size_type*>(
[dst = d_dst_addrs.data()] __device__(std::size_t i) { return dst[i]; }));

// size_iter is simply a constant iterator returning sizeof(size_type) bytes.
auto size_iter = thrust::make_constant_iterator(sizeof(size_type));

// Copy the offsets to the destination buffers in a batched manner.
cudf::io::detail::batched_memcpy(src_iter, dst_iter, size_iter, offsets.size(), stream);
}
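Net effect of the batched copy, as a sketch: each batch entry moves exactly one size_type, so the call behaves like the following loop executed on the device:

  // for (std::size_t i = 0; i < offsets.size(); ++i) {
  //   *buff_addrs[i] = offsets[i];  // write one final offset per output buffer
  // }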

} // namespace cudf::io::parquet::detail
