Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the ORC decoding bug for the timestamp data #17570

Open
wants to merge 18 commits into
base: branch-25.02
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 169 additions & 2 deletions cpp/src/io/orc/stripe_data.cu
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,146 @@ struct orcdec_state_s {
} vals;
};

/**
* @brief Manage caching of the first run of TIMESTAMP's DATA stream for a row group.
*
* This class is used to address a special case, where the first run spans two adjacent row groups
* and its length is greater than the maximum length allowed to be consumed. This limit is imposed
* by the decoder when processing the SECONDARY stream. This class shall be instantiated in the
* shared memory, and be used to cache the DATA stream with a decoded data type of `int64_t`. As an
* optimization, the actual cache is a local variable and does not reside in the shared memory.
kingcrimsontianyu marked this conversation as resolved.
Show resolved Hide resolved
*/
class run_cache_manager {
private:
enum class status : uint8_t {
DISABLED, ///< Run cache manager is disabled. No caching will be performed.
CAN_WRITE_TO_CACHE, ///< Run cache manager is ready for write. This is expected to happen in
///< the first iteration of the top-level while-loop in
kingcrimsontianyu marked this conversation as resolved.
Show resolved Hide resolved
///< gpuDecodeOrcColumnData().
kingcrimsontianyu marked this conversation as resolved.
Show resolved Hide resolved
CAN_READ_FROM_CACHE, ///< Run cache manager is ready for read. This is expected to happen in
///< the second iteration of the while-loop.
};

public:
/**
* @brief Initialize the run cache manager.
*
* @param[in] s ORC decoder state.
mhaseeb123 marked this conversation as resolved.
Show resolved Hide resolved
*/
__device__ void initialize(orcdec_state_s* s)
{
_status = (s->top.data.index.run_pos[CI_DATA2] > 0 and s->chunk.type_kind == TIMESTAMP)
? status::CAN_WRITE_TO_CACHE
: status::DISABLED;
_reusable_length = 0;
_run_length = 0;
}

/**
* @brief Set the reusable length object.
*
* @param[in] run_length The length of the first run (spanning two adjacent row groups) of the
* DATA stream.
* @param[in] max_length The maximum length allowed to be consumed. This limit is imposed
* by the decoder when processing the SECONDARY stream.
*/
__device__ void set_reusable_length(uint32_t run_length, uint32_t max_length)
{
if (_status == status::CAN_WRITE_TO_CACHE) {
_run_length = run_length;
_reusable_length = (_run_length > max_length) ? (_run_length - max_length) : 0;
}
}

/**
* @brief Adjust the maximum length allowed to be consumed when the length of the first run is
* greater than it.
*
* @param[in] max_length The maximum length allowed to be consumed for the DATA stream.
* @return A new maximum length.
*/
__device__ uint32_t adjust_max_length(uint32_t max_length)
vuule marked this conversation as resolved.
Show resolved Hide resolved
{
auto new_max_length{max_length};
if (_status == status::CAN_READ_FROM_CACHE and _reusable_length > 0) {
vuule marked this conversation as resolved.
Show resolved Hide resolved
new_max_length -= _reusable_length;
}
return new_max_length;
}

/**
* @brief Copy the excess data from the intermediate buffer for the DATA stream to the cache.
*
* @param[in] src Intermediate buffer for the DATA stream.
* @param[out] cache Local variable serving as the cache for the DATA stream.
*/
__device__ void write_to_cache(int64_t* src, int64_t& cache)
{
if (_status != status::CAN_WRITE_TO_CACHE) { return; }

const auto tid = threadIdx.x;
kingcrimsontianyu marked this conversation as resolved.
Show resolved Hide resolved

// All threads in the block always take a uniform code path for the following branches.
// _reusable_length ranges between [0, 512].
if (_reusable_length > 0) {
const auto length_to_skip = _run_length - _reusable_length;
if (tid < _reusable_length) {
const auto src_idx = tid + length_to_skip;
kingcrimsontianyu marked this conversation as resolved.
Show resolved Hide resolved
cache = src[src_idx];
}
// Block until all writes are done to safely change _status.
__syncthreads();
if (tid == 0) { _status = status::CAN_READ_FROM_CACHE; }
} else {
__syncthreads();
if (tid == 0) { _status = status::DISABLED; }
}

__syncthreads();
vuule marked this conversation as resolved.
Show resolved Hide resolved
kingcrimsontianyu marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* @brief Copy the cached data to the intermediate buffer for the DATA stream.
*
* @param[in,out] dst Intermediate buffer for the DATA stream.
* @param[in,out] rle Run length decoder state object.
* @param[in] cache Local variable serving as the cache for the DATA stream.
*/
__device__ void read_from_cache(int64_t* dst, orc_rlev2_state_s* rle, int64_t cache)
{
if (_status != status::CAN_READ_FROM_CACHE) { return; }

const auto tid = threadIdx.x;

// First, shift the data up
const auto dst_idx = tid + _reusable_length;
const auto v = (dst_idx < rle->num_vals + _reusable_length) ? dst[tid] : 0;
__syncthreads();
mhaseeb123 marked this conversation as resolved.
Show resolved Hide resolved

if (dst_idx < rle->num_vals + _reusable_length) { dst[dst_idx] = v; }
__syncthreads();

// Second, insert the cached data
if (tid < _reusable_length) { dst[tid] = cache; }
__syncthreads();

if (tid == 0) {
_status = status::DISABLED;
vuule marked this conversation as resolved.
Show resolved Hide resolved
rle->num_vals += _reusable_length;
}

__syncthreads();
}

private:
status _status; ///< The status of the run cache manager.
uint32_t
vuule marked this conversation as resolved.
Show resolved Hide resolved
_reusable_length; ///< The number of data to be cached and reused later. For example, if a run
///< has a length of 512 but the maximum length allowed to be consumed is
///< capped at 162, then 350 (512-162) data will be cached.
uint32_t _run_length; ///< The length of the run, 512 in the above example.
};

/**
* @brief Initializes byte stream, modifying length and start position to keep the read pointer
* 8-byte aligned.
Expand Down Expand Up @@ -631,6 +771,9 @@ static const __device__ __constant__ uint8_t ClosestFixedBitsMap[65] = {
* @param[in] maxvals maximum number of values to decode
* @param[in] t thread id
* @param[in] has_buffered_values If true, means there are already buffered values
* @param[in] run_cache_manager_inst If non-null, the run cache manager will be used to manage
* caching of the first run of the DATA stream.
* @param[in] cache Local variable serving as the cache.
*
* @return number of values decoded
*/
Expand All @@ -640,9 +783,14 @@ static __device__ uint32_t Integer_RLEv2(orc_bytestream_s* bs,
T* vals,
uint32_t maxvals,
int t,
bool has_buffered_values = false)
bool has_buffered_values = false,
run_cache_manager* run_cache_manager_inst = nullptr,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we use a cuda::std::optional instead?

Aside: if we had C++23 the optional usage in this file would be a great use case for std::optional::and_then.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A naive question from someone with very limited knowledge of our I/O: I only see one usage of this function outside of the gpuDecodeOrcColumnData kernel, and that's inside gpuDecodeNullsAndStringDictionaries. Are guaranteed to not need this cache in that case (again, asking naively since I don't really know what that function does).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't studied the function gpuDecodeNullsAndStringDictionaries yet, and I'm on the conservative side applying the cache to existing code.

I'm not quite sure about how to use cuda::std::optional and what's its advantage when it comes to function default arguments. Since the function will have side effect on the optional object, should I pass it by reference cuda::std::optional<X>&? But this way I can't give it a default value cuda::std::nullopt, and have to pass cuda::std::nullopt to all function calls that don't need X. Is my understanding correct?

int64_t* cache = nullptr)
{
if (t == 0) {
if (run_cache_manager_inst != nullptr) {
maxvals = run_cache_manager_inst->adjust_max_length(maxvals);
}
uint32_t maxpos = min(bs->len, bs->pos + (bytestream_buffer_size - 8u));
uint32_t lastpos = bs->pos;
auto numvals = 0;
Expand Down Expand Up @@ -685,6 +833,11 @@ static __device__ uint32_t Integer_RLEv2(orc_bytestream_s* bs,
l += deltapos;
}
}

if (run_cache_manager_inst != nullptr) {
run_cache_manager_inst->set_reusable_length(n, maxvals);
}

if ((numvals != 0) and (numvals + n > maxvals)) break;
// case where there are buffered values and can't consume a whole chunk
// from decoded values, so skip adding any more to buffer, work on buffered values and then
Expand Down Expand Up @@ -866,6 +1019,15 @@ static __device__ uint32_t Integer_RLEv2(orc_bytestream_s* bs,
__syncwarp();
}
__syncthreads();
if constexpr (cuda::std::is_same_v<T, int64_t>) {
mhaseeb123 marked this conversation as resolved.
Show resolved Hide resolved
if (run_cache_manager_inst != nullptr) {
// Run cache is read from during the 2nd iteration of the top-level while-loop in
// gpuDecodeOrcColumnData().
run_cache_manager_inst->read_from_cache(vals, rle, *cache);
// Run cache is written to during the 1st iteration of the loop.
run_cache_manager_inst->write_to_cache(vals, *cache);
}
}
return rle->num_vals;
}

Expand Down Expand Up @@ -1401,6 +1563,8 @@ CUDF_KERNEL void __launch_bounds__(block_size)
// Struct doesn't have any data in itself, so skip
bool const is_valid = s->chunk.type_kind != STRUCT;
size_t const max_num_rows = s->chunk.column_num_rows;
__shared__ run_cache_manager run_cache_manager_inst;
int64_t cache{};
if (t == 0 and is_valid) {
// If we have an index, seek to the initial run and update row positions
if (num_rowgroups > 0) {
Expand Down Expand Up @@ -1443,6 +1607,8 @@ CUDF_KERNEL void __launch_bounds__(block_size)

bytestream_init(&s->bs, s->chunk.streams[CI_DATA], s->chunk.strm_len[CI_DATA]);
bytestream_init(&s->bs2, s->chunk.streams[CI_DATA2], s->chunk.strm_len[CI_DATA2]);

run_cache_manager_inst.initialize(s);
}
__syncthreads();

Expand Down Expand Up @@ -1602,7 +1768,8 @@ CUDF_KERNEL void __launch_bounds__(block_size)
if (is_rlev1(s->chunk.encoding_kind)) {
numvals = Integer_RLEv1<int64_t>(bs, &s->u.rlev1, s->vals.i64, numvals, t);
vuule marked this conversation as resolved.
Show resolved Hide resolved
} else {
numvals = Integer_RLEv2<int64_t>(bs, &s->u.rlev2, s->vals.i64, numvals, t);
numvals = Integer_RLEv2<int64_t>(
bs, &s->u.rlev2, s->vals.i64, numvals, t, false, &run_cache_manager_inst, &cache);
}
if (s->chunk.type_kind == DECIMAL) {
// If we're using an index, we may have to drop values from the initial run
Expand Down
Binary file not shown.
Binary file not shown.
22 changes: 22 additions & 0 deletions python/cudf/cudf/tests/test_orc.py
Original file line number Diff line number Diff line change
Expand Up @@ -1970,3 +1970,25 @@ def test_row_group_alignment(datadir):
got = cudf.read_orc(buffer)

assert_eq(expected, got)


@pytest.mark.parametrize(
"inputfile",
[
"TestOrcFile.timestamp.desynced.uncompressed.RLEv2.orc",
"TestOrcFile.timestamp.desynced.snappy.RLEv2.orc",
],
)
def test_orc_reader_desynced_timestamp(datadir, inputfile):
# Test a special case where the DATA stream (second) in a TIMESTAMP column
# is progressed faster than the SECONDARY stream (nanosecond) at the start of a row
# group. In this case, the "run cache manager" in the decoder kernel is used to
# orchestrate the dual-stream processing.
# For more information, see https://github.com/rapidsai/cudf/issues/17155.

path = datadir / inputfile

expect = pd.read_orc(path)
got = cudf.read_orc(path)

assert_frame_equal(cudf.from_pandas(expect), got)
Loading