From b15e3d3096195e4a6d2ab03bbae5116b1caa2e56 Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb <14217455+mhaseeb123@users.noreply.github.com> Date: Wed, 25 Sep 2024 20:22:53 +0000 Subject: [PATCH] Generalize the API and ORC changes by @vuule --- cpp/include/cudf/io/detail/batched_memcpy.hpp | 44 ++++--------- cpp/src/io/orc/stripe_enc.cu | 66 +++++++++++++------ cpp/src/io/parquet/page_data.cu | 20 +++++- 3 files changed, 78 insertions(+), 52 deletions(-) diff --git a/cpp/include/cudf/io/detail/batched_memcpy.hpp b/cpp/include/cudf/io/detail/batched_memcpy.hpp index 318f833a77c..abbc096ce39 100644 --- a/cpp/include/cudf/io/detail/batched_memcpy.hpp +++ b/cpp/include/cudf/io/detail/batched_memcpy.hpp @@ -28,48 +28,32 @@ namespace CUDF_EXPORT cudf { namespace io::detail { /** - * @brief A helper function that copies a vector of host scalar data to the corresponding device - * addresses in a batched manner. + * @brief A helper function that copies a vector of vectors from source to destination addresses in + * a batched manner. * + * @tparam SrcIterator The type of the source address iterator + * @tparam DstIterator The type of the destination address iterator + * @tparam Sizeiterator The type of the buffer size iterator * - * @param[in] src_data A vector of host scalar data - * @param[in] dst_addrs A vector of device destination addresses - * @param[in] mr Device memory resource to allocate temporary memory + * @param[in] src_iter Iterator to source addresses + * @param[in] dst_iter Iterator to destination addresses + * @param[in] size_iter Iterator to the vector sizes (in bytes) * @param[in] stream CUDA stream to use */ -template -void batched_memcpy(std::vector const& src_data, - std::vector const& dst_addrs, - rmm::device_async_resource_ref mr, +template +void batched_memcpy(SrcIterator src_iter, + DstIterator dst_iter, + Sizeiterator size_iter, + size_t num_elems, rmm::cuda_stream_view stream) { - // Number of elements to copy - auto const num_elems = src_data.size(); - - // Copy src data to device and create an iterator - auto d_src_data = cudf::detail::make_device_uvector_async(src_data, stream, mr); - auto src_iter = cudf::detail::make_counting_transform_iterator( - static_cast(0), - cuda::proclaim_return_type( - [src = d_src_data.data()] __device__(std::size_t i) { return src + i; })); - - // Copy dst addresses to device and create an iterator - auto d_dst_addrs = cudf::detail::make_device_uvector_async(dst_addrs, stream, mr); - auto dst_iter = cudf::detail::make_counting_transform_iterator( - static_cast(0), - cuda::proclaim_return_type( - [dst = d_dst_addrs.data()] __device__(std::size_t i) { return dst[i]; })); - - // Scalar src data so size_iter is simply a constant iterator. - auto size_iter = thrust::make_constant_iterator(sizeof(T)); - // Get temp storage needed for cub::DeviceMemcpy::Batched size_t temp_storage_bytes = 0; cub::DeviceMemcpy::Batched( nullptr, temp_storage_bytes, src_iter, dst_iter, size_iter, num_elems, stream.value()); // Allocate temporary storage - auto d_temp_storage = rmm::device_buffer{temp_storage_bytes, stream.value(), mr}; + rmm::device_buffer d_temp_storage{temp_storage_bytes, stream.value()}; // Run cub::DeviceMemcpy::Batched cub::DeviceMemcpy::Batched(d_temp_storage.data(), diff --git a/cpp/src/io/orc/stripe_enc.cu b/cpp/src/io/orc/stripe_enc.cu index 5c70e35fd2e..b5d86e7197c 100644 --- a/cpp/src/io/orc/stripe_enc.cu +++ b/cpp/src/io/orc/stripe_enc.cu @@ -23,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -1087,37 +1088,42 @@ CUDF_KERNEL void __launch_bounds__(block_size) /** * @brief Merge chunked column data into a single contiguous stream * - * @param[in,out] strm_desc StripeStream device array [stripe][stream] - * @param[in,out] streams List of encoder chunk streams [column][rowgroup] + * @param[in] strm_desc StripeStream device array [stripe][stream] + * @param[in] streams List of encoder chunk streams [column][rowgroup] + * @param[out] srcs List of source encoder chunk stream data addresses + * @param[out] dsts List of destination StripeStream data addresses + * @param[out] sizes List of stream sizes in bytes */ // blockDim {compact_streams_block_size,1,1} CUDF_KERNEL void __launch_bounds__(compact_streams_block_size) - gpuCompactOrcDataStreams(device_2dspan strm_desc, - device_2dspan streams) + gpuInitBatchedMemcpy(device_2dspan strm_desc, + device_2dspan streams, + device_span srcs, + device_span dsts, + device_span sizes) { - __shared__ __align__(16) StripeStream ss; - - auto const stripe_id = blockIdx.x; + auto const stripe_id = blockIdx.x * compact_streams_block_size + threadIdx.x; auto const stream_id = blockIdx.y; - auto const t = threadIdx.x; + if (stripe_id >= strm_desc.size().first) { return; } - if (t == 0) { ss = strm_desc[stripe_id][stream_id]; } - __syncthreads(); + auto const out_id = stream_id * strm_desc.size().first + stripe_id; + StripeStream ss = strm_desc[stripe_id][stream_id]; if (ss.data_ptr == nullptr) { return; } auto const cid = ss.stream_type; auto dst_ptr = ss.data_ptr; for (auto group = ss.first_chunk_id; group < ss.first_chunk_id + ss.num_chunks; ++group) { + auto const out_id = stream_id * streams.size().second + group; + srcs[out_id] = streams[ss.column_id][group].data_ptrs[cid]; + dsts[out_id] = dst_ptr; + + // Also update the stream here, data will be copied in a separate kernel + streams[ss.column_id][group].data_ptrs[cid] = dst_ptr; + auto const len = streams[ss.column_id][group].lengths[cid]; - if (len > 0) { - auto const src_ptr = streams[ss.column_id][group].data_ptrs[cid]; - for (uint32_t i = t; i < len; i += blockDim.x) { - dst_ptr[i] = src_ptr[i]; - } - __syncthreads(); - } - if (t == 0) { streams[ss.column_id][group].data_ptrs[cid] = dst_ptr; } + // Multiplying by sizeof(uint8_t) = 1 is redundant here. + sizes[out_id] = len; dst_ptr += len; } } @@ -1326,8 +1332,26 @@ void CompactOrcDataStreams(device_2dspan strm_desc, rmm::cuda_stream_view stream) { dim3 dim_block(compact_streams_block_size, 1); - dim3 dim_grid(strm_desc.size().first, strm_desc.size().second); - gpuCompactOrcDataStreams<<>>(strm_desc, enc_streams); + + auto const num_rowgroups = enc_streams.size().second; + auto const num_streams = strm_desc.size().second; + auto const num_stripes = strm_desc.size().first; + auto const num_chunks = num_rowgroups * num_streams; + auto srcs = cudf::detail::make_zeroed_device_uvector_sync( + num_chunks, stream, rmm::mr::get_current_device_resource()); + auto dsts = cudf::detail::make_zeroed_device_uvector_sync( + num_chunks, stream, rmm::mr::get_current_device_resource()); + auto lengths = cudf::detail::make_zeroed_device_uvector_sync( + num_chunks, stream, rmm::mr::get_current_device_resource()); + + dim3 dim_grid_alt(cudf::util::div_rounding_up_unsafe(num_stripes, compact_streams_block_size), + strm_desc.size().second); + gpuInitBatchedMemcpy<<>>( + strm_desc, enc_streams, srcs, dsts, lengths); + + // Copy streams in a batched manner. + cudf::io::detail::batched_memcpy( + srcs.data(), dsts.data(), lengths.data(), lengths.size(), stream); } std::optional CompressOrcDataStreams( @@ -1438,4 +1462,4 @@ void decimal_sizes_to_offsets(device_2dspan rg_bounds, } // namespace gpu } // namespace orc } // namespace io -} // namespace cudf +} // namespace cudf \ No newline at end of file diff --git a/cpp/src/io/parquet/page_data.cu b/cpp/src/io/parquet/page_data.cu index 69d58bab05d..fd7849c1327 100644 --- a/cpp/src/io/parquet/page_data.cu +++ b/cpp/src/io/parquet/page_data.cu @@ -476,7 +476,25 @@ void __host__ WriteFinalOffsetsBatched(std::vector const& offsets, rmm::device_async_resource_ref mr, rmm::cuda_stream_view stream) { - return cudf::io::detail::batched_memcpy(offsets, buff_addrs, mr, stream); + // Copy offsets to device and create an iterator + auto d_src_data = cudf::detail::make_device_uvector_async(offsets, stream, mr); + auto src_iter = cudf::detail::make_counting_transform_iterator( + static_cast(0), + cuda::proclaim_return_type( + [src = d_src_data.data()] __device__(std::size_t i) { return src + i; })); + + // Copy buffer addresses to device and create an iterator + auto d_dst_addrs = cudf::detail::make_device_uvector_async(buff_addrs, stream, mr); + auto dst_iter = cudf::detail::make_counting_transform_iterator( + static_cast(0), + cuda::proclaim_return_type( + [dst = d_dst_addrs.data()] __device__(std::size_t i) { return dst[i]; })); + + // size_iter is simply a constant iterator of sizeof(size_type) bytes. + auto size_iter = thrust::make_constant_iterator(sizeof(size_type)); + + // Copy offsets to buffers in batched manner. + cudf::io::detail::batched_memcpy(src_iter, dst_iter, size_iter, offsets.size(), stream); } } // namespace cudf::io::parquet::detail