diff --git a/cpp/src/io/orc/stripe_data.cu b/cpp/src/io/orc/stripe_data.cu index fb416216783..fbcc1781f3a 100644 --- a/cpp/src/io/orc/stripe_data.cu +++ b/cpp/src/io/orc/stripe_data.cu @@ -132,15 +132,30 @@ struct orcdec_state_s { } vals; }; +/** + * @brief Cache the first run of the DATA stream for a row group. + * + * This class is used to address a special case, where the first run spans two adjacent row groups + * and its length is greater than the maximum length allowed to be consumed (which limit is imposed + * by the decoder when processing the SECONDARY stream). This class shall be instantiated in the + * shared memory. + */ class run_cache { private: enum class status : uint8_t { - DISABLED, - CAN_WRITE_TO_CACHE, - CAN_READ_FROM_CACHE, + DISABLED, ///< Run cache is disabled. + CAN_WRITE_TO_CACHE, ///< Run cache is ready for write. This is expected to happen in the first + ///< iteration of the top-level while-loop in gpuDecodeOrcColumnData(). + CAN_READ_FROM_CACHE, ///< Run cache is ready for read. This is expected to happen in the second + ///< iteration of the while-loop. }; public: + /** + * @brief Initialize the run cache. + * + * @param s ORC decoder state. + */ __forceinline__ __device__ void initialize(orcdec_state_s* s) { _status = (s->top.data.index.run_pos[CI_DATA2] > 0 and s->chunk.type_kind == TIMESTAMP) @@ -150,6 +165,14 @@ class run_cache { _run_length = 0; } + /** + * @brief Set the reusable length object. + * + * @param run_length The length of the first run (spanning two adjacent row groups) of the DATA + * stream. + * @param max_length The maximum length allowed to be consumed. This limit is imposed + * by the decoder when processing the SECONDARY stream. + */ __forceinline__ __device__ void set_reusable_length(uint32_t run_length, uint32_t max_length) { if (_status == status::CAN_WRITE_TO_CACHE) { @@ -158,6 +181,12 @@ class run_cache { } } + /** + * @brief Adjust the maximum length allowed to be consumed when the length of the first run is + * greater than it. + * + * @param max_length The maximum length allowed to be consumed. + */ __forceinline__ __device__ uint32_t adjust_max_length(uint32_t max_length) { auto new_max_length{max_length}; @@ -167,6 +196,11 @@ class run_cache { return new_max_length; } + /** + * @brief Copy the excess data from the intermediate buffer for the DATA stream to the cache. + * + * @param src Intermediate buffer for the DATA stream. + */ __forceinline__ __device__ void write_to_cache(int64_t* src) { const auto tid = threadIdx.x; @@ -188,6 +222,12 @@ class run_cache { } } + /** + * @brief Copy the cached data to the intermediate buffer for the DATA stream. + * + * @param dst Intermediate buffer for the DATA stream. + * @param rle Run length decoder state object. + */ __forceinline__ __device__ void read_from_cache(int64_t* dst, orc_rlev2_state_s* rle) { const auto tid = threadIdx.x; @@ -215,10 +255,14 @@ class run_cache { } private: - status _status; - uint32_t _reusable_length; - uint32_t _run_length; - int64_t _buf[bytestream_buffer_size >> 4]; + status _status; ///< The status of the run cache. + uint32_t _reusable_length; ///< The number of data to be cached in the run cache and reused + ///< later. For example, if a run has a length of 512 but the maximum + ///< length allowed to be consumed is capped at 162, then 350 + ///< (512-162) data will be cached. + uint32_t _run_length; ///< The length of the run, 512 in the above example. + int64_t _buf[bytestream_buffer_size >> + 4]; ///< The cache buffer. In ORC, the maximum length of a run is 512. }; /** @@ -720,6 +764,8 @@ static const __device__ __constant__ uint8_t ClosestFixedBitsMap[65] = { * @param[in] maxvals maximum number of values to decode * @param[in] t thread id * @param[in] has_buffered_values If true, means there are already buffered values + * @param[in] run_cache_bs If non-null, the run cache will be used to cache the first run of the + * DATA stream. * * @return number of values decoded */ @@ -962,7 +1008,10 @@ static __device__ uint32_t Integer_RLEv2(orc_bytestream_s* bs, __syncthreads(); if constexpr (cuda::std::is_same_v) { if (run_cache_bs != nullptr) { + // Run cache is read from during the 2nd iteration of the top-level while-loop in + // gpuDecodeOrcColumnData(). run_cache_bs->read_from_cache(vals, rle); + // Run cache is written to during the 1st iteration of the loop. run_cache_bs->write_to_cache(vals); } }