From cd0d27290128a659440b9b19beacc9d3c372ebaa Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Fri, 20 Dec 2024 23:22:42 -0500 Subject: [PATCH] Simplify implementation at the sacrifice of encapsulation --- cpp/src/io/orc/stripe_data.cu | 142 ++++++++++++++-------------------- 1 file changed, 57 insertions(+), 85 deletions(-) diff --git a/cpp/src/io/orc/stripe_data.cu b/cpp/src/io/orc/stripe_data.cu index 3fe3ae75599..0f5bf938bd4 100644 --- a/cpp/src/io/orc/stripe_data.cu +++ b/cpp/src/io/orc/stripe_data.cu @@ -170,6 +170,38 @@ class run_cache_manager { _run_length = 0; } + private: + status _status; ///< The status of the run cache manager. + uint32_t + _reusable_length; ///< The number of data to be cached and reused later. For example, if a run + ///< has a length of 512 but the maximum length allowed to be consumed is + ///< capped at 162, then 350 (512-162) data will be cached. + uint32_t _run_length; ///< The length of the run, 512 in the above example. + friend class cache_helper; +}; + +/** + * @brief Helper class to help run_cache_manager cache the first run of TIMESTAMP's DATA stream for + * a row group. + * + * The run_cache_manager is intended to be stored in the shared memory, whereas the actual cache in + * the local storage as an optimization. If a function is to use run_cache_manager, both the manager + * and the cache objects need to be passed. This class is introduced to simplify the function call, + * so that only a single cache_helper object needs to be passed. To that end, public methods + * originally belonging to run_cache_manager have been moved to this class. + */ +class cache_helper { + public: + /** + * @brief Constructor. + * + * @param[in] run_cache_manager_inst An instance of run_cache_manager. + */ + __device__ explicit cache_helper(run_cache_manager& run_cache_manager_inst) + : _manager(run_cache_manager_inst) + { + } + /** * @brief Set the reusable length object. * @@ -180,9 +212,10 @@ class run_cache_manager { */ __device__ void set_reusable_length(uint32_t run_length, uint32_t max_length) { - if (_status == status::CAN_WRITE_TO_CACHE) { - _run_length = run_length; - _reusable_length = (_run_length > max_length) ? (_run_length - max_length) : 0; + if (_manager._status == run_cache_manager::status::CAN_WRITE_TO_CACHE) { + _manager._run_length = run_length; + _manager._reusable_length = + (_manager._run_length > max_length) ? (_manager._run_length - max_length) : 0; } } @@ -196,7 +229,9 @@ class run_cache_manager { [[nodiscard]] __device__ uint32_t adjust_max_length(uint32_t max_length) { auto new_max_length{max_length}; - if (_status == status::CAN_READ_FROM_CACHE) { new_max_length -= _reusable_length; } + if (_manager._status == run_cache_manager::status::CAN_READ_FROM_CACHE) { + new_max_length -= _manager._reusable_length; + } return new_max_length; } @@ -206,9 +241,9 @@ class run_cache_manager { * @param[in] src Intermediate buffer for the DATA stream. * @param[out] cache_storage Local variable serving as the cache for the DATA stream. */ - __device__ void write_to_cache(int64_t* src, int64_t& cache_storage) + __device__ void write_to_cache(int64_t* src) { - if (_status != status::CAN_WRITE_TO_CACHE) { return; } + if (_manager._status != run_cache_manager::status::CAN_WRITE_TO_CACHE) { return; } auto const tid = threadIdx.x; @@ -216,15 +251,15 @@ class run_cache_manager { // All threads in the block always take a uniform code path for the following branches. // _reusable_length ranges between [0, 512]. - if (_reusable_length > 0) { - auto const length_to_skip = _run_length - _reusable_length; - if (tid < _reusable_length) { + if (_manager._reusable_length > 0) { + auto const length_to_skip = _manager._run_length - _manager._reusable_length; + if (tid < _manager._reusable_length) { auto const src_idx = tid + length_to_skip; - cache_storage = src[src_idx]; + _storage = src[src_idx]; } - if (tid == 0) { _status = status::CAN_READ_FROM_CACHE; } + if (tid == 0) { _manager._status = run_cache_manager::status::CAN_READ_FROM_CACHE; } } else { - if (tid == 0) { _status = status::DISABLED; } + if (tid == 0) { _manager._status = run_cache_manager::status::DISABLED; } } __syncthreads(); @@ -235,101 +270,38 @@ class run_cache_manager { * * @param[in,out] dst Intermediate buffer for the DATA stream. * @param[in,out] rle Run length decoder state object. - * @param[in] cache_storage Local variable serving as the cache for the DATA stream. */ - __device__ void read_from_cache(int64_t* dst, orc_rlev2_state_s* rle, int64_t cache_storage) + __device__ void read_from_cache(int64_t* dst, orc_rlev2_state_s* rle) { - if (_status != status::CAN_READ_FROM_CACHE) { return; } + if (_manager._status != run_cache_manager::status::CAN_READ_FROM_CACHE) { return; } auto const tid = threadIdx.x; // First, shift the data up - auto const dst_idx = tid + _reusable_length; - auto const v = (dst_idx < rle->num_vals + _reusable_length) ? dst[tid] : 0; + auto const dst_idx = tid + _manager._reusable_length; + auto const v = (dst_idx < rle->num_vals + _manager._reusable_length) ? dst[tid] : 0; __syncthreads(); - if (dst_idx < rle->num_vals + _reusable_length) { dst[dst_idx] = v; } + if (dst_idx < rle->num_vals + _manager._reusable_length) { dst[dst_idx] = v; } __syncthreads(); // Second, insert the cached data - if (tid < _reusable_length) { dst[tid] = cache_storage; } + if (tid < _manager._reusable_length) { dst[tid] = _storage; } __syncthreads(); if (tid == 0) { // Disable the run cache manager, since cache write-and-read happens at most once per row // group. - _status = status::DISABLED; - rle->num_vals += _reusable_length; + _manager._status = run_cache_manager::status::DISABLED; + rle->num_vals += _manager._reusable_length; } __syncthreads(); } private: - status _status; ///< The status of the run cache manager. - uint32_t - _reusable_length; ///< The number of data to be cached and reused later. For example, if a run - ///< has a length of 512 but the maximum length allowed to be consumed is - ///< capped at 162, then 350 (512-162) data will be cached. - uint32_t _run_length; ///< The length of the run, 512 in the above example. -}; - -/** - * @brief Helper class to help run_cache_manager cache the first run of TIMESTAMP's DATA stream for - * a row group. - * - * The run_cache_manager is intended to be stored in the shared memory, whereas the actual cache in - * the local storage as an optimization. If a function is to use run_cache_manager, both the manager - * and the cache objects need to be passed. This class is introduced to simplify the function call, - * so that only a single cache_helper object needs to be passed. - */ -class cache_helper { - public: - /** - * @brief Constructor. - * - * @param[in] run_cache_manager_inst - */ - __device__ explicit cache_helper(run_cache_manager& run_cache_manager_inst) - : _run_cache_manager_inst(run_cache_manager_inst) - { - } - - /** - * @brief Wrapper of run_cache_manager's namesake function. - */ - __device__ void set_reusable_length(uint32_t run_length, uint32_t max_length) - { - _run_cache_manager_inst.set_reusable_length(run_length, max_length); - } - - /** - * @brief Wrapper of run_cache_manager's namesake function. - */ - __device__ void write_to_cache(int64_t* src) - { - _run_cache_manager_inst.write_to_cache(src, _storage); - } - - /** - * @brief Wrapper of run_cache_manager's namesake function. - */ - __device__ void read_from_cache(int64_t* dst, orc_rlev2_state_s* rle) - { - _run_cache_manager_inst.read_from_cache(dst, rle, _storage); - } - - /** - * @brief Wrapper of run_cache_manager's namesake function. - */ - [[nodiscard]] __device__ uint32_t adjust_max_length(uint32_t max_length) - { - return _run_cache_manager_inst.adjust_max_length(max_length); - } - - private: - run_cache_manager& _run_cache_manager_inst; - int64_t _storage; + run_cache_manager& _manager; ///< An instance of run_cache_manager. + int64_t _storage; ///< Per-thread cache storage. }; /**