Skip to content

Commit

Permalink
Fix the orc decoding bug
Browse files Browse the repository at this point in the history
  • Loading branch information
kingcrimsontianyu committed Dec 11, 2024
1 parent 0c5bd66 commit bfa6b0c
Showing 1 changed file with 109 additions and 2 deletions.
111 changes: 109 additions & 2 deletions cpp/src/io/orc/stripe_data.cu
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
* limitations under the License.
*/

#include "cub/util_ptx.cuh"
#include "cuda/std/__type_traits/is_same.h"
#include "io/utilities/block_utils.cuh"
#include "orc_gpu.hpp"

Expand Down Expand Up @@ -132,6 +134,95 @@ struct orcdec_state_s {
} vals;
};

class run_cache {
private:
enum class status : uint8_t {
DISABLED,
CAN_WRITE_TO_CACHE,
CAN_READ_FROM_CACHE,
};

public:
__forceinline__ __device__ void initialize(orcdec_state_s* s)
{
_status = (s->top.data.index.run_pos[CI_DATA2] > 0 and s->chunk.type_kind == TIMESTAMP)
? status::CAN_WRITE_TO_CACHE
: status::DISABLED;
_reusable_length = 0;
_run_length = 0;
}

__forceinline__ __device__ void set_reusable_length(uint32_t run_length, uint32_t max_length)
{
if (_status == status::CAN_WRITE_TO_CACHE) {
_run_length = run_length;
_reusable_length = (_run_length > max_length) ? (_run_length - max_length) : 0;
}
}

__forceinline__ __device__ uint32_t adjust_max_length(uint32_t max_length)
{
auto new_max_length{max_length};
if (_status == status::CAN_READ_FROM_CACHE and _reusable_length > 0) {
new_max_length -= _reusable_length;
}
return new_max_length;
}

__forceinline__ __device__ void write_to_cache(int64_t* src)
{
const auto tid = threadIdx.x;

// All threads in the block take a uniform code path.
// _reusable_length ranges between [0, 512]
if (_status == status::CAN_WRITE_TO_CACHE and _reusable_length > 0) {
const auto length_to_skip = _run_length - _reusable_length;
if (tid < _reusable_length) {
const auto src_idx = tid + length_to_skip;
_buf[tid] = src[src_idx];
}
// Block until all writes are done to safely change _status.
__syncthreads();
if (tid == 0) { _status = status::CAN_READ_FROM_CACHE; }
} else {
__syncthreads();
if (tid == 0) { _status = status::DISABLED; }
}
}

__forceinline__ __device__ void read_from_cache(int64_t* dst, orc_rlev2_state_s* rle)
{
const auto tid = threadIdx.x;

// All threads in the block take a uniform code path.
// _reusable_length ranges between [0, 512]
if (_status == status::CAN_READ_FROM_CACHE and _reusable_length > 0) {
// First, shift the data up
const auto dst_idx = tid + _reusable_length;
const auto v = (dst_idx < rle->num_vals + _reusable_length) ? dst[tid] : 0;
__syncthreads();

if (dst_idx < rle->num_vals + _reusable_length) { dst[dst_idx] = v; }
__syncthreads();

// Second, insert the cached data
if (tid < _reusable_length) { dst[tid] = _buf[tid]; }
__syncthreads();

if (tid == 0) {
_status = status::DISABLED;
rle->num_vals += _reusable_length;
}
}
}

private:
status _status;
uint32_t _reusable_length;
uint32_t _run_length;
int64_t _buf[bytestream_buffer_size >> 4];
};

/**
* @brief Initializes byte stream, modifying length and start position to keep the read pointer
* 8-byte aligned.
Expand Down Expand Up @@ -640,9 +731,11 @@ static __device__ uint32_t Integer_RLEv2(orc_bytestream_s* bs,
T* vals,
uint32_t maxvals,
int t,
bool has_buffered_values = false)
bool has_buffered_values = false,
run_cache* run_cache_bs = nullptr)
{
if (t == 0) {
if (run_cache_bs != nullptr) { maxvals = run_cache_bs->adjust_max_length(maxvals); }
uint32_t maxpos = min(bs->len, bs->pos + (bytestream_buffer_size - 8u));
uint32_t lastpos = bs->pos;
auto numvals = 0;
Expand Down Expand Up @@ -685,6 +778,9 @@ static __device__ uint32_t Integer_RLEv2(orc_bytestream_s* bs,
l += deltapos;
}
}

if (run_cache_bs != nullptr) { run_cache_bs->set_reusable_length(n, maxvals); }

if ((numvals != 0) and (numvals + n > maxvals)) break;
// case where there are buffered values and can't consume a whole chunk
// from decoded values, so skip adding any more to buffer, work on buffered values and then
Expand Down Expand Up @@ -866,6 +962,13 @@ static __device__ uint32_t Integer_RLEv2(orc_bytestream_s* bs,
__syncwarp();
}
__syncthreads();
if constexpr (cuda::std::is_same_v<T, int64_t>) {
if (run_cache_bs != nullptr) {
run_cache_bs->read_from_cache(vals, rle);
run_cache_bs->write_to_cache(vals);
}
}
__syncthreads();
return rle->num_vals;
}

Expand Down Expand Up @@ -1401,6 +1504,7 @@ CUDF_KERNEL void __launch_bounds__(block_size)
// Struct doesn't have any data in itself, so skip
bool const is_valid = s->chunk.type_kind != STRUCT;
size_t const max_num_rows = s->chunk.column_num_rows;
__shared__ run_cache run_cache_bs;
if (t == 0 and is_valid) {
// If we have an index, seek to the initial run and update row positions
if (num_rowgroups > 0) {
Expand Down Expand Up @@ -1443,6 +1547,8 @@ CUDF_KERNEL void __launch_bounds__(block_size)

bytestream_init(&s->bs, s->chunk.streams[CI_DATA], s->chunk.strm_len[CI_DATA]);
bytestream_init(&s->bs2, s->chunk.streams[CI_DATA2], s->chunk.strm_len[CI_DATA2]);

run_cache_bs.initialize(s);
}
__syncthreads();

Expand Down Expand Up @@ -1602,7 +1708,8 @@ CUDF_KERNEL void __launch_bounds__(block_size)
if (is_rlev1(s->chunk.encoding_kind)) {
numvals = Integer_RLEv1<int64_t>(bs, &s->u.rlev1, s->vals.i64, numvals, t);
} else {
numvals = Integer_RLEv2<int64_t>(bs, &s->u.rlev2, s->vals.i64, numvals, t);
numvals =
Integer_RLEv2<int64_t>(bs, &s->u.rlev2, s->vals.i64, numvals, t, false, &run_cache_bs);
}
if (s->chunk.type_kind == DECIMAL) {
// If we're using an index, we may have to drop values from the initial run
Expand Down

0 comments on commit bfa6b0c

Please sign in to comment.