From bfa6b0c944ba780c4b53b583e8a855e0980c1bdc Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Tue, 10 Dec 2024 15:05:48 -0500 Subject: [PATCH] Fix the orc decoding bug --- cpp/src/io/orc/stripe_data.cu | 111 +++++++++++++++++++++++++++++++++- 1 file changed, 109 insertions(+), 2 deletions(-) diff --git a/cpp/src/io/orc/stripe_data.cu b/cpp/src/io/orc/stripe_data.cu index 1572b7246c0..d8abc044119 100644 --- a/cpp/src/io/orc/stripe_data.cu +++ b/cpp/src/io/orc/stripe_data.cu @@ -14,6 +14,8 @@ * limitations under the License. */ +#include "cub/util_ptx.cuh" +#include "cuda/std/__type_traits/is_same.h" #include "io/utilities/block_utils.cuh" #include "orc_gpu.hpp" @@ -132,6 +134,95 @@ struct orcdec_state_s { } vals; }; +class run_cache { + private: + enum class status : uint8_t { + DISABLED, + CAN_WRITE_TO_CACHE, + CAN_READ_FROM_CACHE, + }; + + public: + __forceinline__ __device__ void initialize(orcdec_state_s* s) + { + _status = (s->top.data.index.run_pos[CI_DATA2] > 0 and s->chunk.type_kind == TIMESTAMP) + ? status::CAN_WRITE_TO_CACHE + : status::DISABLED; + _reusable_length = 0; + _run_length = 0; + } + + __forceinline__ __device__ void set_reusable_length(uint32_t run_length, uint32_t max_length) + { + if (_status == status::CAN_WRITE_TO_CACHE) { + _run_length = run_length; + _reusable_length = (_run_length > max_length) ? (_run_length - max_length) : 0; + } + } + + __forceinline__ __device__ uint32_t adjust_max_length(uint32_t max_length) + { + auto new_max_length{max_length}; + if (_status == status::CAN_READ_FROM_CACHE and _reusable_length > 0) { + new_max_length -= _reusable_length; + } + return new_max_length; + } + + __forceinline__ __device__ void write_to_cache(int64_t* src) + { + const auto tid = threadIdx.x; + + // All threads in the block take a uniform code path. + // _reusable_length ranges between [0, 512] + if (_status == status::CAN_WRITE_TO_CACHE and _reusable_length > 0) { + const auto length_to_skip = _run_length - _reusable_length; + if (tid < _reusable_length) { + const auto src_idx = tid + length_to_skip; + _buf[tid] = src[src_idx]; + } + // Block until all writes are done to safely change _status. + __syncthreads(); + if (tid == 0) { _status = status::CAN_READ_FROM_CACHE; } + } else { + __syncthreads(); + if (tid == 0) { _status = status::DISABLED; } + } + } + + __forceinline__ __device__ void read_from_cache(int64_t* dst, orc_rlev2_state_s* rle) + { + const auto tid = threadIdx.x; + + // All threads in the block take a uniform code path. + // _reusable_length ranges between [0, 512] + if (_status == status::CAN_READ_FROM_CACHE and _reusable_length > 0) { + // First, shift the data up + const auto dst_idx = tid + _reusable_length; + const auto v = (dst_idx < rle->num_vals + _reusable_length) ? dst[tid] : 0; + __syncthreads(); + + if (dst_idx < rle->num_vals + _reusable_length) { dst[dst_idx] = v; } + __syncthreads(); + + // Second, insert the cached data + if (tid < _reusable_length) { dst[tid] = _buf[tid]; } + __syncthreads(); + + if (tid == 0) { + _status = status::DISABLED; + rle->num_vals += _reusable_length; + } + } + } + + private: + status _status; + uint32_t _reusable_length; + uint32_t _run_length; + int64_t _buf[bytestream_buffer_size >> 4]; +}; + /** * @brief Initializes byte stream, modifying length and start position to keep the read pointer * 8-byte aligned. @@ -640,9 +731,11 @@ static __device__ uint32_t Integer_RLEv2(orc_bytestream_s* bs, T* vals, uint32_t maxvals, int t, - bool has_buffered_values = false) + bool has_buffered_values = false, + run_cache* run_cache_bs = nullptr) { if (t == 0) { + if (run_cache_bs != nullptr) { maxvals = run_cache_bs->adjust_max_length(maxvals); } uint32_t maxpos = min(bs->len, bs->pos + (bytestream_buffer_size - 8u)); uint32_t lastpos = bs->pos; auto numvals = 0; @@ -685,6 +778,9 @@ static __device__ uint32_t Integer_RLEv2(orc_bytestream_s* bs, l += deltapos; } } + + if (run_cache_bs != nullptr) { run_cache_bs->set_reusable_length(n, maxvals); } + if ((numvals != 0) and (numvals + n > maxvals)) break; // case where there are buffered values and can't consume a whole chunk // from decoded values, so skip adding any more to buffer, work on buffered values and then @@ -866,6 +962,13 @@ static __device__ uint32_t Integer_RLEv2(orc_bytestream_s* bs, __syncwarp(); } __syncthreads(); + if constexpr (cuda::std::is_same_v) { + if (run_cache_bs != nullptr) { + run_cache_bs->read_from_cache(vals, rle); + run_cache_bs->write_to_cache(vals); + } + } + __syncthreads(); return rle->num_vals; } @@ -1401,6 +1504,7 @@ CUDF_KERNEL void __launch_bounds__(block_size) // Struct doesn't have any data in itself, so skip bool const is_valid = s->chunk.type_kind != STRUCT; size_t const max_num_rows = s->chunk.column_num_rows; + __shared__ run_cache run_cache_bs; if (t == 0 and is_valid) { // If we have an index, seek to the initial run and update row positions if (num_rowgroups > 0) { @@ -1443,6 +1547,8 @@ CUDF_KERNEL void __launch_bounds__(block_size) bytestream_init(&s->bs, s->chunk.streams[CI_DATA], s->chunk.strm_len[CI_DATA]); bytestream_init(&s->bs2, s->chunk.streams[CI_DATA2], s->chunk.strm_len[CI_DATA2]); + + run_cache_bs.initialize(s); } __syncthreads(); @@ -1602,7 +1708,8 @@ CUDF_KERNEL void __launch_bounds__(block_size) if (is_rlev1(s->chunk.encoding_kind)) { numvals = Integer_RLEv1(bs, &s->u.rlev1, s->vals.i64, numvals, t); } else { - numvals = Integer_RLEv2(bs, &s->u.rlev2, s->vals.i64, numvals, t); + numvals = + Integer_RLEv2(bs, &s->u.rlev2, s->vals.i64, numvals, t, false, &run_cache_bs); } if (s->chunk.type_kind == DECIMAL) { // If we're using an index, we may have to drop values from the initial run