From 29556a2514f4d274164a27a80539410da7e132d6 Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Tue, 3 Oct 2023 14:44:28 -0700 Subject: [PATCH] Remove the use of volatile in ORC (#14175) `volatile` should no be required in our code, unless there are compiler or synchronization issues. This PR removes the use in ORC reader and writer. Authors: - Vukasin Milovanovic (https://github.com/vuule) Approvers: - Yunsong Wang (https://github.com/PointKernel) - Bradley Dice (https://github.com/bdice) URL: https://github.com/rapidsai/cudf/pull/14175 --- cpp/src/io/orc/stats_enc.cu | 4 +- cpp/src/io/orc/stripe_data.cu | 82 +++++++++++++++-------------------- cpp/src/io/orc/stripe_enc.cu | 14 +++--- cpp/src/io/orc/stripe_init.cu | 2 +- 4 files changed, 46 insertions(+), 56 deletions(-) diff --git a/cpp/src/io/orc/stats_enc.cu b/cpp/src/io/orc/stats_enc.cu index 95f1db5bfd1..479a2dfada3 100644 --- a/cpp/src/io/orc/stats_enc.cu +++ b/cpp/src/io/orc/stats_enc.cu @@ -76,8 +76,8 @@ __global__ void __launch_bounds__(block_size, 1) { using block_scan = cub::BlockScan; __shared__ typename block_scan::TempStorage temp_storage; - volatile uint32_t stats_size = 0; - auto t = threadIdx.x; + uint32_t stats_size = 0; + auto t = threadIdx.x; __syncthreads(); for (thread_index_type start = 0; start < statistics_count; start += block_size) { uint32_t stats_len = 0, stats_pos; diff --git a/cpp/src/io/orc/stripe_data.cu b/cpp/src/io/orc/stripe_data.cu index 3edcd3d83b2..0b249bbdafe 100644 --- a/cpp/src/io/orc/stripe_data.cu +++ b/cpp/src/io/orc/stripe_data.cu @@ -142,9 +142,7 @@ struct orcdec_state_s { * @param[in] base Pointer to raw byte stream data * @param[in] len Stream length in bytes */ -static __device__ void bytestream_init(volatile orc_bytestream_s* bs, - uint8_t const* base, - uint32_t len) +static __device__ void bytestream_init(orc_bytestream_s* bs, uint8_t const* base, uint32_t len) { uint32_t pos = (len > 0) ? static_cast(7 & reinterpret_cast(base)) : 0; bs->base = base - pos; @@ -160,8 +158,7 @@ static __device__ void bytestream_init(volatile orc_bytestream_s* bs, * @param[in] bs Byte stream input * @param[in] bytes_consumed Number of bytes that were consumed */ -static __device__ void bytestream_flush_bytes(volatile orc_bytestream_s* bs, - uint32_t bytes_consumed) +static __device__ void bytestream_flush_bytes(orc_bytestream_s* bs, uint32_t bytes_consumed) { uint32_t pos = bs->pos; uint32_t len = bs->len; @@ -197,7 +194,7 @@ static __device__ void bytestream_fill(orc_bytestream_s* bs, int t) * @param[in] pos Position in byte stream * @return byte */ -inline __device__ uint8_t bytestream_readbyte(volatile orc_bytestream_s* bs, int pos) +inline __device__ uint8_t bytestream_readbyte(orc_bytestream_s* bs, int pos) { return bs->buf.u8[pos & (bytestream_buffer_size - 1)]; } @@ -209,7 +206,7 @@ inline __device__ uint8_t bytestream_readbyte(volatile orc_bytestream_s* bs, int * @param[in] pos Position in byte stream * @result bits */ -inline __device__ uint32_t bytestream_readu32(volatile orc_bytestream_s* bs, int pos) +inline __device__ uint32_t bytestream_readu32(orc_bytestream_s* bs, int pos) { uint32_t a = bs->buf.u32[(pos & (bytestream_buffer_size - 1)) >> 2]; uint32_t b = bs->buf.u32[((pos + 4) & (bytestream_buffer_size - 1)) >> 2]; @@ -224,7 +221,7 @@ inline __device__ uint32_t bytestream_readu32(volatile orc_bytestream_s* bs, int * @param[in] numbits number of bits * @return bits */ -inline __device__ uint64_t bytestream_readu64(volatile orc_bytestream_s* bs, int pos) +inline __device__ uint64_t bytestream_readu64(orc_bytestream_s* bs, int pos) { uint32_t a = bs->buf.u32[(pos & (bytestream_buffer_size - 1)) >> 2]; uint32_t b = bs->buf.u32[((pos + 4) & (bytestream_buffer_size - 1)) >> 2]; @@ -245,9 +242,7 @@ inline __device__ uint64_t bytestream_readu64(volatile orc_bytestream_s* bs, int * @param[in] numbits number of bits * @return decoded value */ -inline __device__ uint32_t bytestream_readbits(volatile orc_bytestream_s* bs, - int bitpos, - uint32_t numbits) +inline __device__ uint32_t bytestream_readbits(orc_bytestream_s* bs, int bitpos, uint32_t numbits) { int idx = bitpos >> 5; uint32_t a = __byte_perm(bs->buf.u32[(idx + 0) & bytestream_buffer_mask], 0, 0x0123); @@ -263,9 +258,7 @@ inline __device__ uint32_t bytestream_readbits(volatile orc_bytestream_s* bs, * @param[in] numbits number of bits * @return decoded value */ -inline __device__ uint64_t bytestream_readbits64(volatile orc_bytestream_s* bs, - int bitpos, - uint32_t numbits) +inline __device__ uint64_t bytestream_readbits64(orc_bytestream_s* bs, int bitpos, uint32_t numbits) { int idx = bitpos >> 5; uint32_t a = __byte_perm(bs->buf.u32[(idx + 0) & bytestream_buffer_mask], 0, 0x0123); @@ -288,7 +281,7 @@ inline __device__ uint64_t bytestream_readbits64(volatile orc_bytestream_s* bs, * @param[in] numbits number of bits * @param[out] result decoded value */ -inline __device__ void bytestream_readbe(volatile orc_bytestream_s* bs, +inline __device__ void bytestream_readbe(orc_bytestream_s* bs, int bitpos, uint32_t numbits, uint32_t& result) @@ -304,7 +297,7 @@ inline __device__ void bytestream_readbe(volatile orc_bytestream_s* bs, * @param[in] numbits number of bits * @param[out] result decoded value */ -inline __device__ void bytestream_readbe(volatile orc_bytestream_s* bs, +inline __device__ void bytestream_readbe(orc_bytestream_s* bs, int bitpos, uint32_t numbits, int32_t& result) @@ -321,7 +314,7 @@ inline __device__ void bytestream_readbe(volatile orc_bytestream_s* bs, * @param[in] numbits number of bits * @param[out] result decoded value */ -inline __device__ void bytestream_readbe(volatile orc_bytestream_s* bs, +inline __device__ void bytestream_readbe(orc_bytestream_s* bs, int bitpos, uint32_t numbits, uint64_t& result) @@ -337,7 +330,7 @@ inline __device__ void bytestream_readbe(volatile orc_bytestream_s* bs, * @param[in] numbits number of bits * @param[out] result decoded value */ -inline __device__ void bytestream_readbe(volatile orc_bytestream_s* bs, +inline __device__ void bytestream_readbe(orc_bytestream_s* bs, int bitpos, uint32_t numbits, int64_t& result) @@ -354,7 +347,7 @@ inline __device__ void bytestream_readbe(volatile orc_bytestream_s* bs, * @return length of varint in bytes */ template -inline __device__ uint32_t varint_length(volatile orc_bytestream_s* bs, int pos) +inline __device__ uint32_t varint_length(orc_bytestream_s* bs, int pos) { if (bytestream_readbyte(bs, pos) > 0x7f) { uint32_t next32 = bytestream_readu32(bs, pos + 1); @@ -392,7 +385,7 @@ inline __device__ uint32_t varint_length(volatile orc_bytestream_s* bs, int pos) * @return new position in byte stream buffer */ template -inline __device__ int decode_base128_varint(volatile orc_bytestream_s* bs, int pos, T& result) +inline __device__ int decode_base128_varint(orc_bytestream_s* bs, int pos, T& result) { uint32_t v = bytestream_readbyte(bs, pos++); if (v > 0x7f) { @@ -446,7 +439,7 @@ inline __device__ int decode_base128_varint(volatile orc_bytestream_s* bs, int p /** * @brief Decodes a signed int128 encoded as base-128 varint (used for decimals) */ -inline __device__ __int128_t decode_varint128(volatile orc_bytestream_s* bs, int pos) +inline __device__ __int128_t decode_varint128(orc_bytestream_s* bs, int pos) { auto byte = bytestream_readbyte(bs, pos++); __int128_t const sign_mask = -(int32_t)(byte & 1); @@ -463,7 +456,7 @@ inline __device__ __int128_t decode_varint128(volatile orc_bytestream_s* bs, int /** * @brief Decodes an unsigned 32-bit varint */ -inline __device__ int decode_varint(volatile orc_bytestream_s* bs, int pos, uint32_t& result) +inline __device__ int decode_varint(orc_bytestream_s* bs, int pos, uint32_t& result) { uint32_t u; pos = decode_base128_varint(bs, pos, u); @@ -474,7 +467,7 @@ inline __device__ int decode_varint(volatile orc_bytestream_s* bs, int pos, uint /** * @brief Decodes an unsigned 64-bit varint */ -inline __device__ int decode_varint(volatile orc_bytestream_s* bs, int pos, uint64_t& result) +inline __device__ int decode_varint(orc_bytestream_s* bs, int pos, uint64_t& result) { uint64_t u; pos = decode_base128_varint(bs, pos, u); @@ -485,7 +478,7 @@ inline __device__ int decode_varint(volatile orc_bytestream_s* bs, int pos, uint /** * @brief Signed version of 32-bit decode_varint */ -inline __device__ int decode_varint(volatile orc_bytestream_s* bs, int pos, int32_t& result) +inline __device__ int decode_varint(orc_bytestream_s* bs, int pos, int32_t& result) { uint32_t u; pos = decode_base128_varint(bs, pos, u); @@ -496,7 +489,7 @@ inline __device__ int decode_varint(volatile orc_bytestream_s* bs, int pos, int3 /** * @brief Signed version of 64-bit decode_varint */ -inline __device__ int decode_varint(volatile orc_bytestream_s* bs, int pos, int64_t& result) +inline __device__ int decode_varint(orc_bytestream_s* bs, int pos, int64_t& result) { uint64_t u; pos = decode_base128_varint(bs, pos, u); @@ -514,7 +507,7 @@ inline __device__ int decode_varint(volatile orc_bytestream_s* bs, int pos, int6 * @return number of values decoded */ template -inline __device__ void lengths_to_positions(volatile T* vals, uint32_t numvals, unsigned int t) +inline __device__ void lengths_to_positions(T* vals, uint32_t numvals, unsigned int t) { for (uint32_t n = 1; n < numvals; n <<= 1) { __syncthreads(); @@ -534,8 +527,8 @@ inline __device__ void lengths_to_positions(volatile T* vals, uint32_t numvals, * @return number of values decoded */ template -static __device__ uint32_t Integer_RLEv1( - orc_bytestream_s* bs, volatile orc_rlev1_state_s* rle, volatile T* vals, uint32_t maxvals, int t) +static __device__ uint32_t +Integer_RLEv1(orc_bytestream_s* bs, orc_rlev1_state_s* rle, T* vals, uint32_t maxvals, int t) { uint32_t numvals, numruns; if (t == 0) { @@ -642,8 +635,8 @@ static const __device__ __constant__ uint8_t ClosestFixedBitsMap[65] = { */ template static __device__ uint32_t Integer_RLEv2(orc_bytestream_s* bs, - volatile orc_rlev2_state_s* rle, - volatile T* vals, + orc_rlev2_state_s* rle, + T* vals, uint32_t maxvals, int t, bool has_buffered_values = false) @@ -883,7 +876,7 @@ static __device__ uint32_t Integer_RLEv2(orc_bytestream_s* bs, * * @return 32-bit value */ -inline __device__ uint32_t rle8_read_bool32(volatile uint32_t* vals, uint32_t bitpos) +inline __device__ uint32_t rle8_read_bool32(uint32_t* vals, uint32_t bitpos) { uint32_t a = vals[(bitpos >> 5) + 0]; uint32_t b = vals[(bitpos >> 5) + 1]; @@ -903,11 +896,8 @@ inline __device__ uint32_t rle8_read_bool32(volatile uint32_t* vals, uint32_t bi * * @return number of values decoded */ -static __device__ uint32_t Byte_RLE(orc_bytestream_s* bs, - volatile orc_byterle_state_s* rle, - volatile uint8_t* vals, - uint32_t maxvals, - int t) +static __device__ uint32_t +Byte_RLE(orc_bytestream_s* bs, orc_byterle_state_s* rle, uint8_t* vals, uint32_t maxvals, int t) { uint32_t numvals, numruns; int r, tr; @@ -1006,8 +996,8 @@ static const __device__ __constant__ int64_t kPow5i[28] = {1, * @return number of values decoded */ static __device__ int Decode_Decimals(orc_bytestream_s* bs, - volatile orc_byterle_state_s* scratch, - volatile orcdec_state_s::values& vals, + orc_byterle_state_s* scratch, + orcdec_state_s::values& vals, int val_scale, int numvals, type_id dtype_id, @@ -1241,8 +1231,8 @@ __global__ void __launch_bounds__(block_size) } __syncthreads(); while (s->top.dict.dict_len > 0) { - uint32_t numvals = min(s->top.dict.dict_len, blockDim.x), len; - volatile uint32_t* vals = s->vals.u32; + uint32_t numvals = min(s->top.dict.dict_len, blockDim.x), len; + uint32_t* vals = s->vals.u32; bytestream_fill(&s->bs, t); __syncthreads(); if (is_rlev1(s->chunk.encoding_kind)) { @@ -1310,12 +1300,12 @@ static __device__ void DecodeRowPositions(orcdec_state_s* s, min((row_decoder_buffer_size - s->u.rowdec.nz_count) * 2, blockDim.x)); if (s->chunk.valid_map_base != nullptr) { // We have a present stream - uint32_t rmax = s->top.data.end_row - min((uint32_t)first_row, s->top.data.end_row); - auto r = (uint32_t)(s->top.data.cur_row + s->top.data.nrows + t - first_row); - uint32_t valid = (t < nrows && r < rmax) - ? (((uint8_t const*)s->chunk.valid_map_base)[r >> 3] >> (r & 7)) & 1 - : 0; - volatile auto* row_ofs_plus1 = (volatile uint16_t*)&s->u.rowdec.row[s->u.rowdec.nz_count]; + uint32_t rmax = s->top.data.end_row - min((uint32_t)first_row, s->top.data.end_row); + auto r = (uint32_t)(s->top.data.cur_row + s->top.data.nrows + t - first_row); + uint32_t valid = (t < nrows && r < rmax) + ? (((uint8_t const*)s->chunk.valid_map_base)[r >> 3] >> (r & 7)) & 1 + : 0; + auto* row_ofs_plus1 = (uint16_t*)&s->u.rowdec.row[s->u.rowdec.nz_count]; uint32_t nz_pos, row_plus1, nz_count = s->u.rowdec.nz_count, last_row; if (t < nrows) { row_ofs_plus1[t] = valid; } lengths_to_positions(row_ofs_plus1, nrows, t); diff --git a/cpp/src/io/orc/stripe_enc.cu b/cpp/src/io/orc/stripe_enc.cu index 73c41e2bbcd..4841fb1141a 100644 --- a/cpp/src/io/orc/stripe_enc.cu +++ b/cpp/src/io/orc/stripe_enc.cu @@ -53,7 +53,7 @@ constexpr bool zero_pll_war = true; struct byterle_enc_state_s { uint32_t literal_run; uint32_t repeat_run; - volatile uint32_t rpt_map[(512 / 32) + 1]; + uint32_t rpt_map[(512 / 32) + 1]; }; struct intrle_enc_state_s { @@ -63,7 +63,7 @@ struct intrle_enc_state_s { uint32_t literal_w; uint32_t hdr_bytes; uint32_t pl_bytes; - volatile uint32_t delta_map[(512 / 32) + 1]; + uint32_t delta_map[(512 / 32) + 1]; }; struct strdata_enc_state_s { @@ -366,7 +366,7 @@ static __device__ uint32_t IntegerRLE( using block_reduce = cub::BlockReduce; uint8_t* dst = s->stream.data_ptrs[cid] + s->strm_pos[cid]; uint32_t out_cnt = 0; - __shared__ volatile uint64_t block_vmin; + __shared__ uint64_t block_vmin; while (numvals > 0) { T v0 = (t < numvals) ? inbuf[(inpos + t) & inmask] : 0; @@ -615,7 +615,7 @@ static __device__ void StoreStringData(uint8_t* dst, * @param[in] t thread id */ template -inline __device__ void lengths_to_positions(volatile T* vals, uint32_t numvals, unsigned int t) +inline __device__ void lengths_to_positions(T* vals, uint32_t numvals, unsigned int t) { for (uint32_t n = 1; n < numvals; n <<= 1) { __syncthreads(); @@ -1143,7 +1143,7 @@ __global__ void __launch_bounds__(256) uint32_t comp_block_align) { __shared__ __align__(16) StripeStream ss; - __shared__ uint8_t* volatile uncomp_base_g; + __shared__ uint8_t* uncomp_base_g; auto const padded_block_header_size = util::round_up_unsafe(block_header_size, comp_block_align); auto const padded_comp_block_size = util::round_up_unsafe(max_comp_blk_size, comp_block_align); @@ -1196,8 +1196,8 @@ __global__ void __launch_bounds__(1024) uint32_t max_comp_blk_size) { __shared__ __align__(16) StripeStream ss; - __shared__ uint8_t const* volatile comp_src_g; - __shared__ uint32_t volatile comp_len_g; + __shared__ uint8_t const* comp_src_g; + __shared__ uint32_t comp_len_g; auto const stripe_id = blockIdx.x; auto const stream_id = blockIdx.y; diff --git a/cpp/src/io/orc/stripe_init.cu b/cpp/src/io/orc/stripe_init.cu index 8eeca504121..b31a4a081d1 100644 --- a/cpp/src/io/orc/stripe_init.cu +++ b/cpp/src/io/orc/stripe_init.cu @@ -499,7 +499,7 @@ __global__ void __launch_bounds__(128, 8) gpuParseRowGroupIndex(RowGroup* row_gr : row_groups[(s->rowgroup_start + i) * num_columns + blockIdx.x].start_row; for (int j = t4; j < rowgroup_size4; j += 4) { ((uint32_t*)&row_groups[(s->rowgroup_start + i) * num_columns + blockIdx.x])[j] = - ((volatile uint32_t*)&s->rowgroups[i])[j]; + ((uint32_t*)&s->rowgroups[i])[j]; } row_groups[(s->rowgroup_start + i) * num_columns + blockIdx.x].num_rows = num_rows; // Updating in case of struct