Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the ORC decoding bug for the timestamp data #17570

Open
wants to merge 18 commits into
base: branch-25.02
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
204 changes: 202 additions & 2 deletions cpp/src/io/orc/stripe_data.cu
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,177 @@ struct orcdec_state_s {
} vals;
};

/**
* @brief Manage caching of the first run of TIMESTAMP's DATA stream for a row group.
*
* This class is used to address a special case, where the first run of the DATA stream spans two
* adjacent row groups and its length is greater than the maximum length allowed to be consumed.
* This limit is imposed by the decoder when processing the SECONDARY stream. This class shall be
* instantiated in the shared memory, and be used to cache the DATA stream with a decoded data type
* of `int64_t`. As an optimization, the actual cache is implemented in the cache_helper class as a
* local variable and does not reside in the shared memory.
*/
class run_cache_manager {
private:
enum class status : uint8_t {
DISABLED, ///< Run cache manager is disabled. No caching will be performed. If the special case
///< happens, the run cache manager will be set to this status after the cache read
///< is completed. This status also applies when the special case does not happen.
CAN_WRITE_TO_CACHE, ///< Run cache manager is ready for write. If the special case happens, the
///< run cache manager will be set to this status.
CAN_READ_FROM_CACHE, ///< Run cache manager is ready for read. If the special case happens, the
///< run cache manager will be set to this status after the cache write is
///< completed.
};

public:
/**
* @brief Initialize the run cache manager.
*
* @param[in] s ORC decoder state.
mhaseeb123 marked this conversation as resolved.
Show resolved Hide resolved
*/
__device__ void initialize(orcdec_state_s* s)
{
_status = (s->top.data.index.run_pos[CI_DATA2] > 0 and s->chunk.type_kind == TIMESTAMP)
? status::CAN_WRITE_TO_CACHE
: status::DISABLED;
_reusable_length = 0;
_run_length = 0;
}

private:
status _status; ///< The status of the run cache manager.
uint32_t
_reusable_length; ///< The number of data to be cached and reused later. For example, if a run
///< has a length of 512 but the maximum length allowed to be consumed is
///< capped at 162, then 350 (512-162) data will be cached.
uint32_t _run_length; ///< The length of the run, 512 in the above example.
friend class cache_helper;
};

/**
* @brief Helper class to help run_cache_manager cache the first run of TIMESTAMP's DATA stream for
* a row group.
*
* The run_cache_manager is intended to be stored in the shared memory, whereas the actual cache is
* in the local storage (as an optimization). If a function is to use run_cache_manager, both the
* manager and the cache objects need to be passed. This class is introduced to simplify the
* function call, so that only a single cache_helper object needs to be passed. To that end, public
* methods originally belonging to run_cache_manager have been moved to this class.
*/
class cache_helper {
public:
/**
* @brief Constructor.
*
* @param[in] run_cache_manager_inst An instance of run_cache_manager.
*/
__device__ explicit cache_helper(run_cache_manager& run_cache_manager_inst)
: _manager(run_cache_manager_inst)
{
}

/**
* @brief Set the reusable length object.
*
* @param[in] run_length The length of the first run (spanning two adjacent row groups) of the
* DATA stream.
* @param[in] max_length The maximum length allowed to be consumed. This limit is imposed
* by the decoder when processing the SECONDARY stream.
*/
__device__ void set_reusable_length(uint32_t run_length, uint32_t max_length)
{
if (_manager._status == run_cache_manager::status::CAN_WRITE_TO_CACHE) {
_manager._run_length = run_length;
_manager._reusable_length =
(_manager._run_length > max_length) ? (_manager._run_length - max_length) : 0;
}
}

/**
* @brief Adjust the maximum length allowed to be consumed when the length of the first run is
* greater than it.
*
* @param[in] max_length The maximum length allowed to be consumed for the DATA stream.
* @return A new maximum length.
*/
[[nodiscard]] __device__ uint32_t adjust_max_length(uint32_t max_length)
{
auto new_max_length{max_length};
if (_manager._status == run_cache_manager::status::CAN_READ_FROM_CACHE) {
new_max_length -= _manager._reusable_length;
}
return new_max_length;
}

/**
* @brief Copy the excess data from the intermediate buffer for the DATA stream to the cache.
*
* @param[in] src Intermediate buffer for the DATA stream.
*/
__device__ void write_to_cache(int64_t* src)
{
if (_manager._status != run_cache_manager::status::CAN_WRITE_TO_CACHE) { return; }

auto const tid = threadIdx.x;

__syncthreads();

// All threads in the block always take a uniform code path for the following branches.
// _reusable_length ranges between [0, 512].
if (_manager._reusable_length > 0) {
auto const length_to_skip = _manager._run_length - _manager._reusable_length;
if (tid < _manager._reusable_length) {
auto const src_idx = tid + length_to_skip;
_storage = src[src_idx];
}
if (tid == 0) { _manager._status = run_cache_manager::status::CAN_READ_FROM_CACHE; }
} else {
if (tid == 0) { _manager._status = run_cache_manager::status::DISABLED; }
}

__syncthreads();
vuule marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* @brief Copy the cached data to the intermediate buffer for the DATA stream.
*
* @param[in,out] dst Intermediate buffer for the DATA stream.
* @param[in,out] rle Run length decoder state object.
*/
__device__ void read_from_cache(int64_t* dst, orc_rlev2_state_s* rle)
{
if (_manager._status != run_cache_manager::status::CAN_READ_FROM_CACHE) { return; }

auto const tid = threadIdx.x;

// First, shift the data up
auto const dst_idx = tid + _manager._reusable_length;
auto const v = (dst_idx < rle->num_vals + _manager._reusable_length) ? dst[tid] : 0;
__syncthreads();
mhaseeb123 marked this conversation as resolved.
Show resolved Hide resolved

if (dst_idx < rle->num_vals + _manager._reusable_length) { dst[dst_idx] = v; }
__syncthreads();

// Second, insert the cached data
if (tid < _manager._reusable_length) { dst[tid] = _storage; }
__syncthreads();

if (tid == 0) {
// Disable the run cache manager, since cache write-and-read happens at most once per row
// group.
_manager._status = run_cache_manager::status::DISABLED;
rle->num_vals += _manager._reusable_length;
}

__syncthreads();
}

private:
run_cache_manager& _manager; ///< An instance of run_cache_manager.
int64_t _storage; ///< Per-thread cache storage.
};

/**
* @brief Initializes byte stream, modifying length and start position to keep the read pointer
* 8-byte aligned.
Expand Down Expand Up @@ -631,6 +802,9 @@ static const __device__ __constant__ uint8_t ClosestFixedBitsMap[65] = {
* @param[in] maxvals maximum number of values to decode
* @param[in] t thread id
* @param[in] has_buffered_values If true, means there are already buffered values
* @param[in] run_cache_manager_inst If non-null, the run cache manager will be used to manage
* caching of the first run of the DATA stream.
* @param[in] cache Local variable serving as the cache.
*
* @return number of values decoded
*/
Expand All @@ -640,9 +814,11 @@ static __device__ uint32_t Integer_RLEv2(orc_bytestream_s* bs,
T* vals,
uint32_t maxvals,
int t,
bool has_buffered_values = false)
bool has_buffered_values = false,
cache_helper* cache_helper_inst = nullptr)
{
if (t == 0) {
if (cache_helper_inst != nullptr) { maxvals = cache_helper_inst->adjust_max_length(maxvals); }
uint32_t maxpos = min(bs->len, bs->pos + (bytestream_buffer_size - 8u));
uint32_t lastpos = bs->pos;
auto numvals = 0;
Expand Down Expand Up @@ -685,6 +861,9 @@ static __device__ uint32_t Integer_RLEv2(orc_bytestream_s* bs,
l += deltapos;
}
}

if (cache_helper_inst != nullptr) { cache_helper_inst->set_reusable_length(n, maxvals); }

if ((numvals != 0) and (numvals + n > maxvals)) break;
// case where there are buffered values and can't consume a whole chunk
// from decoded values, so skip adding any more to buffer, work on buffered values and then
Expand Down Expand Up @@ -866,6 +1045,17 @@ static __device__ uint32_t Integer_RLEv2(orc_bytestream_s* bs,
__syncwarp();
}
__syncthreads();
// Currently run_cache_manager is only designed to fix the TIMESTAMP's DATA stream bug where the
// data type is int64_t.
if constexpr (cuda::std::is_same_v<T, int64_t>) {
mhaseeb123 marked this conversation as resolved.
Show resolved Hide resolved
if (cache_helper_inst != nullptr) {
// Run cache is read from during the 2nd iteration of the top-level while loop in
// gpuDecodeOrcColumnData().
cache_helper_inst->read_from_cache(vals, rle);
// Run cache is written to during the 1st iteration of the loop.
cache_helper_inst->write_to_cache(vals);
}
}
return rle->num_vals;
}

Expand Down Expand Up @@ -1401,6 +1591,8 @@ CUDF_KERNEL void __launch_bounds__(block_size)
// Struct doesn't have any data in itself, so skip
bool const is_valid = s->chunk.type_kind != STRUCT;
size_t const max_num_rows = s->chunk.column_num_rows;
__shared__ run_cache_manager run_cache_manager_inst;
cache_helper cache_helper_inst(run_cache_manager_inst);
if (t == 0 and is_valid) {
// If we have an index, seek to the initial run and update row positions
if (num_rowgroups > 0) {
Expand Down Expand Up @@ -1443,6 +1635,8 @@ CUDF_KERNEL void __launch_bounds__(block_size)

bytestream_init(&s->bs, s->chunk.streams[CI_DATA], s->chunk.strm_len[CI_DATA]);
bytestream_init(&s->bs2, s->chunk.streams[CI_DATA2], s->chunk.strm_len[CI_DATA2]);

run_cache_manager_inst.initialize(s);
}
__syncthreads();

Expand Down Expand Up @@ -1602,7 +1796,13 @@ CUDF_KERNEL void __launch_bounds__(block_size)
if (is_rlev1(s->chunk.encoding_kind)) {
numvals = Integer_RLEv1<int64_t>(bs, &s->u.rlev1, s->vals.i64, numvals, t);
vuule marked this conversation as resolved.
Show resolved Hide resolved
} else {
numvals = Integer_RLEv2<int64_t>(bs, &s->u.rlev2, s->vals.i64, numvals, t);
numvals = Integer_RLEv2<int64_t>(bs,
&s->u.rlev2,
s->vals.i64,
numvals,
t,
false /**has_buffered_values */,
&cache_helper_inst);
}
if (s->chunk.type_kind == DECIMAL) {
// If we're using an index, we may have to drop values from the initial run
Expand Down
Binary file not shown.
Binary file not shown.
22 changes: 22 additions & 0 deletions python/cudf/cudf/tests/test_orc.py
Original file line number Diff line number Diff line change
Expand Up @@ -1970,3 +1970,25 @@ def test_row_group_alignment(datadir):
got = cudf.read_orc(buffer)

assert_eq(expected, got)


@pytest.mark.parametrize(
"inputfile",
[
"TestOrcFile.timestamp.desynced.uncompressed.RLEv2.orc",
"TestOrcFile.timestamp.desynced.snappy.RLEv2.orc",
],
)
def test_orc_reader_desynced_timestamp(datadir, inputfile):
# Test a special case where the DATA stream (second) in a TIMESTAMP column
# is progressed faster than the SECONDARY stream (nanosecond) at the start of a row
# group. In this case, the "run cache manager" in the decoder kernel is used to
# orchestrate the dual-stream processing.
# For more information, see https://github.com/rapidsai/cudf/issues/17155.

path = datadir / inputfile

expect = pd.read_orc(path)
got = cudf.read_orc(path)

assert_frame_equal(cudf.from_pandas(expect), got)
Loading