Skip to content

Commit

Permalink
Simplify implementation at the sacrifice of encapsulation
Browse files Browse the repository at this point in the history
  • Loading branch information
kingcrimsontianyu committed Dec 21, 2024
1 parent 9d2cac0 commit cd0d272
Showing 1 changed file with 57 additions and 85 deletions.
142 changes: 57 additions & 85 deletions cpp/src/io/orc/stripe_data.cu
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,38 @@ class run_cache_manager {
_run_length = 0;
}

private:
status _status; ///< The status of the run cache manager.
uint32_t
_reusable_length; ///< The number of data to be cached and reused later. For example, if a run
///< has a length of 512 but the maximum length allowed to be consumed is
///< capped at 162, then 350 (512-162) data will be cached.
uint32_t _run_length; ///< The length of the run, 512 in the above example.
friend class cache_helper;
};

/**
* @brief Helper class to help run_cache_manager cache the first run of TIMESTAMP's DATA stream for
* a row group.
*
* The run_cache_manager is intended to be stored in shared memory, whereas the actual cache is
* kept in local storage as an optimization. If a function is to use run_cache_manager, both the
* manager and the cache objects need to be passed. This class is introduced to simplify the
* function call, so that only a single cache_helper object needs to be passed. To that end, the
* public methods originally belonging to run_cache_manager have been moved to this class.
*/
class cache_helper {
public:
/**
* @brief Constructor.
*
* @param[in] run_cache_manager_inst An instance of run_cache_manager.
*/
__device__ explicit cache_helper(run_cache_manager& run_cache_manager_inst)
: _manager(run_cache_manager_inst)
{
}

/**
* @brief Set the reusable length.
*
Expand All @@ -180,9 +212,10 @@ class run_cache_manager {
*/
__device__ void set_reusable_length(uint32_t run_length, uint32_t max_length)
{
if (_status == status::CAN_WRITE_TO_CACHE) {
_run_length = run_length;
_reusable_length = (_run_length > max_length) ? (_run_length - max_length) : 0;
if (_manager._status == run_cache_manager::status::CAN_WRITE_TO_CACHE) {
_manager._run_length = run_length;
_manager._reusable_length =
(_manager._run_length > max_length) ? (_manager._run_length - max_length) : 0;
}
}

Expand All @@ -196,7 +229,9 @@ class run_cache_manager {
[[nodiscard]] __device__ uint32_t adjust_max_length(uint32_t max_length)
{
auto new_max_length{max_length};
if (_status == status::CAN_READ_FROM_CACHE) { new_max_length -= _reusable_length; }
if (_manager._status == run_cache_manager::status::CAN_READ_FROM_CACHE) {
new_max_length -= _manager._reusable_length;
}
return new_max_length;
}

Expand All @@ -206,25 +241,25 @@ class run_cache_manager {
* @param[in] src Intermediate buffer for the DATA stream.
* @param[out] cache_storage Local variable serving as the cache for the DATA stream.
*/
__device__ void write_to_cache(int64_t* src, int64_t& cache_storage)
__device__ void write_to_cache(int64_t* src)
{
if (_status != status::CAN_WRITE_TO_CACHE) { return; }
if (_manager._status != run_cache_manager::status::CAN_WRITE_TO_CACHE) { return; }

auto const tid = threadIdx.x;

__syncthreads();

// All threads in the block always take a uniform code path for the following branches.
// _reusable_length ranges between [0, 512].
if (_reusable_length > 0) {
auto const length_to_skip = _run_length - _reusable_length;
if (tid < _reusable_length) {
if (_manager._reusable_length > 0) {
auto const length_to_skip = _manager._run_length - _manager._reusable_length;
if (tid < _manager._reusable_length) {
auto const src_idx = tid + length_to_skip;
cache_storage = src[src_idx];
_storage = src[src_idx];
}
if (tid == 0) { _status = status::CAN_READ_FROM_CACHE; }
if (tid == 0) { _manager._status = run_cache_manager::status::CAN_READ_FROM_CACHE; }
} else {
if (tid == 0) { _status = status::DISABLED; }
if (tid == 0) { _manager._status = run_cache_manager::status::DISABLED; }
}

__syncthreads();
Expand All @@ -235,101 +270,38 @@ class run_cache_manager {
*
* @param[in,out] dst Intermediate buffer for the DATA stream.
* @param[in,out] rle Run length decoder state object.
* @param[in] cache_storage Local variable serving as the cache for the DATA stream.
*/
__device__ void read_from_cache(int64_t* dst, orc_rlev2_state_s* rle, int64_t cache_storage)
__device__ void read_from_cache(int64_t* dst, orc_rlev2_state_s* rle)
{
if (_status != status::CAN_READ_FROM_CACHE) { return; }
if (_manager._status != run_cache_manager::status::CAN_READ_FROM_CACHE) { return; }

auto const tid = threadIdx.x;

// First, shift the data up
auto const dst_idx = tid + _reusable_length;
auto const v = (dst_idx < rle->num_vals + _reusable_length) ? dst[tid] : 0;
auto const dst_idx = tid + _manager._reusable_length;
auto const v = (dst_idx < rle->num_vals + _manager._reusable_length) ? dst[tid] : 0;
__syncthreads();

if (dst_idx < rle->num_vals + _reusable_length) { dst[dst_idx] = v; }
if (dst_idx < rle->num_vals + _manager._reusable_length) { dst[dst_idx] = v; }
__syncthreads();

// Second, insert the cached data
if (tid < _reusable_length) { dst[tid] = cache_storage; }
if (tid < _manager._reusable_length) { dst[tid] = _storage; }
__syncthreads();

if (tid == 0) {
// Disable the run cache manager, since cache write-and-read happens at most once per row
// group.
_status = status::DISABLED;
rle->num_vals += _reusable_length;
_manager._status = run_cache_manager::status::DISABLED;
rle->num_vals += _manager._reusable_length;
}

__syncthreads();
}

private:
status _status; ///< The status of the run cache manager.
uint32_t
_reusable_length; ///< The number of data to be cached and reused later. For example, if a run
///< has a length of 512 but the maximum length allowed to be consumed is
///< capped at 162, then 350 (512-162) data will be cached.
uint32_t _run_length; ///< The length of the run, 512 in the above example.
};

/**
* @brief Helper class to help run_cache_manager cache the first run of TIMESTAMP's DATA stream for
* a row group.
*
* The run_cache_manager is intended to be stored in shared memory, whereas the actual cache is
* kept in local storage as an optimization. If a function is to use run_cache_manager, both the
* manager and the cache objects need to be passed. This class is introduced to simplify the
* function call, so that only a single cache_helper object needs to be passed.
*/
class cache_helper {
public:
/**
* @brief Constructor.
*
* @param[in] run_cache_manager_inst
*/
__device__ explicit cache_helper(run_cache_manager& run_cache_manager_inst)
: _run_cache_manager_inst(run_cache_manager_inst)
{
}

/**
* @brief Wrapper of run_cache_manager's namesake function.
*/
__device__ void set_reusable_length(uint32_t run_length, uint32_t max_length)
{
_run_cache_manager_inst.set_reusable_length(run_length, max_length);
}

/**
* @brief Wrapper of run_cache_manager's namesake function.
*/
__device__ void write_to_cache(int64_t* src)
{
_run_cache_manager_inst.write_to_cache(src, _storage);
}

/**
* @brief Wrapper of run_cache_manager's namesake function.
*/
__device__ void read_from_cache(int64_t* dst, orc_rlev2_state_s* rle)
{
_run_cache_manager_inst.read_from_cache(dst, rle, _storage);
}

/**
* @brief Wrapper of run_cache_manager's namesake function.
*/
[[nodiscard]] __device__ uint32_t adjust_max_length(uint32_t max_length)
{
return _run_cache_manager_inst.adjust_max_length(max_length);
}

private:
run_cache_manager& _run_cache_manager_inst;
int64_t _storage;
run_cache_manager& _manager; ///< An instance of run_cache_manager.
int64_t _storage; ///< Per-thread cache storage.
};

/**
Expand Down

0 comments on commit cd0d272

Please sign in to comment.