Skip to content

Commit

Permalink
Remove the use of volatile in ORC (rapidsai#14175)
Browse files Browse the repository at this point in the history
`volatile` should no be required in our code, unless there are compiler or synchronization issues.
This PR removes the use in ORC reader and writer.

Authors:
  - Vukasin Milovanovic (https://github.com/vuule)

Approvers:
  - Yunsong Wang (https://github.com/PointKernel)
  - Bradley Dice (https://github.com/bdice)

URL: rapidsai#14175
  • Loading branch information
vuule authored Oct 3, 2023
1 parent 7bd435d commit 29556a2
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 56 deletions.
4 changes: 2 additions & 2 deletions cpp/src/io/orc/stats_enc.cu
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ __global__ void __launch_bounds__(block_size, 1)
{
using block_scan = cub::BlockScan<uint32_t, block_size, cub::BLOCK_SCAN_WARP_SCANS>;
__shared__ typename block_scan::TempStorage temp_storage;
volatile uint32_t stats_size = 0;
auto t = threadIdx.x;
uint32_t stats_size = 0;
auto t = threadIdx.x;
__syncthreads();
for (thread_index_type start = 0; start < statistics_count; start += block_size) {
uint32_t stats_len = 0, stats_pos;
Expand Down
82 changes: 36 additions & 46 deletions cpp/src/io/orc/stripe_data.cu
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,7 @@ struct orcdec_state_s {
* @param[in] base Pointer to raw byte stream data
* @param[in] len Stream length in bytes
*/
static __device__ void bytestream_init(volatile orc_bytestream_s* bs,
uint8_t const* base,
uint32_t len)
static __device__ void bytestream_init(orc_bytestream_s* bs, uint8_t const* base, uint32_t len)
{
uint32_t pos = (len > 0) ? static_cast<uint32_t>(7 & reinterpret_cast<size_t>(base)) : 0;
bs->base = base - pos;
Expand All @@ -160,8 +158,7 @@ static __device__ void bytestream_init(volatile orc_bytestream_s* bs,
* @param[in] bs Byte stream input
* @param[in] bytes_consumed Number of bytes that were consumed
*/
static __device__ void bytestream_flush_bytes(volatile orc_bytestream_s* bs,
uint32_t bytes_consumed)
static __device__ void bytestream_flush_bytes(orc_bytestream_s* bs, uint32_t bytes_consumed)
{
uint32_t pos = bs->pos;
uint32_t len = bs->len;
Expand Down Expand Up @@ -197,7 +194,7 @@ static __device__ void bytestream_fill(orc_bytestream_s* bs, int t)
* @param[in] pos Position in byte stream
* @return byte
*/
inline __device__ uint8_t bytestream_readbyte(volatile orc_bytestream_s* bs, int pos)
inline __device__ uint8_t bytestream_readbyte(orc_bytestream_s* bs, int pos)
{
return bs->buf.u8[pos & (bytestream_buffer_size - 1)];
}
Expand All @@ -209,7 +206,7 @@ inline __device__ uint8_t bytestream_readbyte(volatile orc_bytestream_s* bs, int
* @param[in] pos Position in byte stream
* @result bits
*/
inline __device__ uint32_t bytestream_readu32(volatile orc_bytestream_s* bs, int pos)
inline __device__ uint32_t bytestream_readu32(orc_bytestream_s* bs, int pos)
{
uint32_t a = bs->buf.u32[(pos & (bytestream_buffer_size - 1)) >> 2];
uint32_t b = bs->buf.u32[((pos + 4) & (bytestream_buffer_size - 1)) >> 2];
Expand All @@ -224,7 +221,7 @@ inline __device__ uint32_t bytestream_readu32(volatile orc_bytestream_s* bs, int
* @param[in] numbits number of bits
* @return bits
*/
inline __device__ uint64_t bytestream_readu64(volatile orc_bytestream_s* bs, int pos)
inline __device__ uint64_t bytestream_readu64(orc_bytestream_s* bs, int pos)
{
uint32_t a = bs->buf.u32[(pos & (bytestream_buffer_size - 1)) >> 2];
uint32_t b = bs->buf.u32[((pos + 4) & (bytestream_buffer_size - 1)) >> 2];
Expand All @@ -245,9 +242,7 @@ inline __device__ uint64_t bytestream_readu64(volatile orc_bytestream_s* bs, int
* @param[in] numbits number of bits
* @return decoded value
*/
inline __device__ uint32_t bytestream_readbits(volatile orc_bytestream_s* bs,
int bitpos,
uint32_t numbits)
inline __device__ uint32_t bytestream_readbits(orc_bytestream_s* bs, int bitpos, uint32_t numbits)
{
int idx = bitpos >> 5;
uint32_t a = __byte_perm(bs->buf.u32[(idx + 0) & bytestream_buffer_mask], 0, 0x0123);
Expand All @@ -263,9 +258,7 @@ inline __device__ uint32_t bytestream_readbits(volatile orc_bytestream_s* bs,
* @param[in] numbits number of bits
* @return decoded value
*/
inline __device__ uint64_t bytestream_readbits64(volatile orc_bytestream_s* bs,
int bitpos,
uint32_t numbits)
inline __device__ uint64_t bytestream_readbits64(orc_bytestream_s* bs, int bitpos, uint32_t numbits)
{
int idx = bitpos >> 5;
uint32_t a = __byte_perm(bs->buf.u32[(idx + 0) & bytestream_buffer_mask], 0, 0x0123);
Expand All @@ -288,7 +281,7 @@ inline __device__ uint64_t bytestream_readbits64(volatile orc_bytestream_s* bs,
* @param[in] numbits number of bits
* @param[out] result decoded value
*/
inline __device__ void bytestream_readbe(volatile orc_bytestream_s* bs,
inline __device__ void bytestream_readbe(orc_bytestream_s* bs,
int bitpos,
uint32_t numbits,
uint32_t& result)
Expand All @@ -304,7 +297,7 @@ inline __device__ void bytestream_readbe(volatile orc_bytestream_s* bs,
* @param[in] numbits number of bits
* @param[out] result decoded value
*/
inline __device__ void bytestream_readbe(volatile orc_bytestream_s* bs,
inline __device__ void bytestream_readbe(orc_bytestream_s* bs,
int bitpos,
uint32_t numbits,
int32_t& result)
Expand All @@ -321,7 +314,7 @@ inline __device__ void bytestream_readbe(volatile orc_bytestream_s* bs,
* @param[in] numbits number of bits
* @param[out] result decoded value
*/
inline __device__ void bytestream_readbe(volatile orc_bytestream_s* bs,
inline __device__ void bytestream_readbe(orc_bytestream_s* bs,
int bitpos,
uint32_t numbits,
uint64_t& result)
Expand All @@ -337,7 +330,7 @@ inline __device__ void bytestream_readbe(volatile orc_bytestream_s* bs,
* @param[in] numbits number of bits
* @param[out] result decoded value
*/
inline __device__ void bytestream_readbe(volatile orc_bytestream_s* bs,
inline __device__ void bytestream_readbe(orc_bytestream_s* bs,
int bitpos,
uint32_t numbits,
int64_t& result)
Expand All @@ -354,7 +347,7 @@ inline __device__ void bytestream_readbe(volatile orc_bytestream_s* bs,
* @return length of varint in bytes
*/
template <class T>
inline __device__ uint32_t varint_length(volatile orc_bytestream_s* bs, int pos)
inline __device__ uint32_t varint_length(orc_bytestream_s* bs, int pos)
{
if (bytestream_readbyte(bs, pos) > 0x7f) {
uint32_t next32 = bytestream_readu32(bs, pos + 1);
Expand Down Expand Up @@ -392,7 +385,7 @@ inline __device__ uint32_t varint_length(volatile orc_bytestream_s* bs, int pos)
* @return new position in byte stream buffer
*/
template <class T>
inline __device__ int decode_base128_varint(volatile orc_bytestream_s* bs, int pos, T& result)
inline __device__ int decode_base128_varint(orc_bytestream_s* bs, int pos, T& result)
{
uint32_t v = bytestream_readbyte(bs, pos++);
if (v > 0x7f) {
Expand Down Expand Up @@ -446,7 +439,7 @@ inline __device__ int decode_base128_varint(volatile orc_bytestream_s* bs, int p
/**
* @brief Decodes a signed int128 encoded as base-128 varint (used for decimals)
*/
inline __device__ __int128_t decode_varint128(volatile orc_bytestream_s* bs, int pos)
inline __device__ __int128_t decode_varint128(orc_bytestream_s* bs, int pos)
{
auto byte = bytestream_readbyte(bs, pos++);
__int128_t const sign_mask = -(int32_t)(byte & 1);
Expand All @@ -463,7 +456,7 @@ inline __device__ __int128_t decode_varint128(volatile orc_bytestream_s* bs, int
/**
* @brief Decodes an unsigned 32-bit varint
*/
inline __device__ int decode_varint(volatile orc_bytestream_s* bs, int pos, uint32_t& result)
inline __device__ int decode_varint(orc_bytestream_s* bs, int pos, uint32_t& result)
{
uint32_t u;
pos = decode_base128_varint<uint32_t>(bs, pos, u);
Expand All @@ -474,7 +467,7 @@ inline __device__ int decode_varint(volatile orc_bytestream_s* bs, int pos, uint
/**
* @brief Decodes an unsigned 64-bit varint
*/
inline __device__ int decode_varint(volatile orc_bytestream_s* bs, int pos, uint64_t& result)
inline __device__ int decode_varint(orc_bytestream_s* bs, int pos, uint64_t& result)
{
uint64_t u;
pos = decode_base128_varint<uint64_t>(bs, pos, u);
Expand All @@ -485,7 +478,7 @@ inline __device__ int decode_varint(volatile orc_bytestream_s* bs, int pos, uint
/**
* @brief Signed version of 32-bit decode_varint
*/
inline __device__ int decode_varint(volatile orc_bytestream_s* bs, int pos, int32_t& result)
inline __device__ int decode_varint(orc_bytestream_s* bs, int pos, int32_t& result)
{
uint32_t u;
pos = decode_base128_varint<uint32_t>(bs, pos, u);
Expand All @@ -496,7 +489,7 @@ inline __device__ int decode_varint(volatile orc_bytestream_s* bs, int pos, int3
/**
* @brief Signed version of 64-bit decode_varint
*/
inline __device__ int decode_varint(volatile orc_bytestream_s* bs, int pos, int64_t& result)
inline __device__ int decode_varint(orc_bytestream_s* bs, int pos, int64_t& result)
{
uint64_t u;
pos = decode_base128_varint<uint64_t>(bs, pos, u);
Expand All @@ -514,7 +507,7 @@ inline __device__ int decode_varint(volatile orc_bytestream_s* bs, int pos, int6
* @return number of values decoded
*/
template <class T>
inline __device__ void lengths_to_positions(volatile T* vals, uint32_t numvals, unsigned int t)
inline __device__ void lengths_to_positions(T* vals, uint32_t numvals, unsigned int t)
{
for (uint32_t n = 1; n < numvals; n <<= 1) {
__syncthreads();
Expand All @@ -534,8 +527,8 @@ inline __device__ void lengths_to_positions(volatile T* vals, uint32_t numvals,
* @return number of values decoded
*/
template <class T>
static __device__ uint32_t Integer_RLEv1(
orc_bytestream_s* bs, volatile orc_rlev1_state_s* rle, volatile T* vals, uint32_t maxvals, int t)
static __device__ uint32_t
Integer_RLEv1(orc_bytestream_s* bs, orc_rlev1_state_s* rle, T* vals, uint32_t maxvals, int t)
{
uint32_t numvals, numruns;
if (t == 0) {
Expand Down Expand Up @@ -642,8 +635,8 @@ static const __device__ __constant__ uint8_t ClosestFixedBitsMap[65] = {
*/
template <class T>
static __device__ uint32_t Integer_RLEv2(orc_bytestream_s* bs,
volatile orc_rlev2_state_s* rle,
volatile T* vals,
orc_rlev2_state_s* rle,
T* vals,
uint32_t maxvals,
int t,
bool has_buffered_values = false)
Expand Down Expand Up @@ -883,7 +876,7 @@ static __device__ uint32_t Integer_RLEv2(orc_bytestream_s* bs,
*
* @return 32-bit value
*/
inline __device__ uint32_t rle8_read_bool32(volatile uint32_t* vals, uint32_t bitpos)
inline __device__ uint32_t rle8_read_bool32(uint32_t* vals, uint32_t bitpos)
{
uint32_t a = vals[(bitpos >> 5) + 0];
uint32_t b = vals[(bitpos >> 5) + 1];
Expand All @@ -903,11 +896,8 @@ inline __device__ uint32_t rle8_read_bool32(volatile uint32_t* vals, uint32_t bi
*
* @return number of values decoded
*/
static __device__ uint32_t Byte_RLE(orc_bytestream_s* bs,
volatile orc_byterle_state_s* rle,
volatile uint8_t* vals,
uint32_t maxvals,
int t)
static __device__ uint32_t
Byte_RLE(orc_bytestream_s* bs, orc_byterle_state_s* rle, uint8_t* vals, uint32_t maxvals, int t)
{
uint32_t numvals, numruns;
int r, tr;
Expand Down Expand Up @@ -1006,8 +996,8 @@ static const __device__ __constant__ int64_t kPow5i[28] = {1,
* @return number of values decoded
*/
static __device__ int Decode_Decimals(orc_bytestream_s* bs,
volatile orc_byterle_state_s* scratch,
volatile orcdec_state_s::values& vals,
orc_byterle_state_s* scratch,
orcdec_state_s::values& vals,
int val_scale,
int numvals,
type_id dtype_id,
Expand Down Expand Up @@ -1241,8 +1231,8 @@ __global__ void __launch_bounds__(block_size)
}
__syncthreads();
while (s->top.dict.dict_len > 0) {
uint32_t numvals = min(s->top.dict.dict_len, blockDim.x), len;
volatile uint32_t* vals = s->vals.u32;
uint32_t numvals = min(s->top.dict.dict_len, blockDim.x), len;
uint32_t* vals = s->vals.u32;
bytestream_fill(&s->bs, t);
__syncthreads();
if (is_rlev1(s->chunk.encoding_kind)) {
Expand Down Expand Up @@ -1310,12 +1300,12 @@ static __device__ void DecodeRowPositions(orcdec_state_s* s,
min((row_decoder_buffer_size - s->u.rowdec.nz_count) * 2, blockDim.x));
if (s->chunk.valid_map_base != nullptr) {
// We have a present stream
uint32_t rmax = s->top.data.end_row - min((uint32_t)first_row, s->top.data.end_row);
auto r = (uint32_t)(s->top.data.cur_row + s->top.data.nrows + t - first_row);
uint32_t valid = (t < nrows && r < rmax)
? (((uint8_t const*)s->chunk.valid_map_base)[r >> 3] >> (r & 7)) & 1
: 0;
volatile auto* row_ofs_plus1 = (volatile uint16_t*)&s->u.rowdec.row[s->u.rowdec.nz_count];
uint32_t rmax = s->top.data.end_row - min((uint32_t)first_row, s->top.data.end_row);
auto r = (uint32_t)(s->top.data.cur_row + s->top.data.nrows + t - first_row);
uint32_t valid = (t < nrows && r < rmax)
? (((uint8_t const*)s->chunk.valid_map_base)[r >> 3] >> (r & 7)) & 1
: 0;
auto* row_ofs_plus1 = (uint16_t*)&s->u.rowdec.row[s->u.rowdec.nz_count];
uint32_t nz_pos, row_plus1, nz_count = s->u.rowdec.nz_count, last_row;
if (t < nrows) { row_ofs_plus1[t] = valid; }
lengths_to_positions<uint16_t>(row_ofs_plus1, nrows, t);
Expand Down
14 changes: 7 additions & 7 deletions cpp/src/io/orc/stripe_enc.cu
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ constexpr bool zero_pll_war = true;
struct byterle_enc_state_s {
uint32_t literal_run;
uint32_t repeat_run;
volatile uint32_t rpt_map[(512 / 32) + 1];
uint32_t rpt_map[(512 / 32) + 1];
};

struct intrle_enc_state_s {
Expand All @@ -63,7 +63,7 @@ struct intrle_enc_state_s {
uint32_t literal_w;
uint32_t hdr_bytes;
uint32_t pl_bytes;
volatile uint32_t delta_map[(512 / 32) + 1];
uint32_t delta_map[(512 / 32) + 1];
};

struct strdata_enc_state_s {
Expand Down Expand Up @@ -366,7 +366,7 @@ static __device__ uint32_t IntegerRLE(
using block_reduce = cub::BlockReduce<T, block_size>;
uint8_t* dst = s->stream.data_ptrs[cid] + s->strm_pos[cid];
uint32_t out_cnt = 0;
__shared__ volatile uint64_t block_vmin;
__shared__ uint64_t block_vmin;

while (numvals > 0) {
T v0 = (t < numvals) ? inbuf[(inpos + t) & inmask] : 0;
Expand Down Expand Up @@ -615,7 +615,7 @@ static __device__ void StoreStringData(uint8_t* dst,
* @param[in] t thread id
*/
template <class T>
inline __device__ void lengths_to_positions(volatile T* vals, uint32_t numvals, unsigned int t)
inline __device__ void lengths_to_positions(T* vals, uint32_t numvals, unsigned int t)
{
for (uint32_t n = 1; n < numvals; n <<= 1) {
__syncthreads();
Expand Down Expand Up @@ -1143,7 +1143,7 @@ __global__ void __launch_bounds__(256)
uint32_t comp_block_align)
{
__shared__ __align__(16) StripeStream ss;
__shared__ uint8_t* volatile uncomp_base_g;
__shared__ uint8_t* uncomp_base_g;

auto const padded_block_header_size = util::round_up_unsafe(block_header_size, comp_block_align);
auto const padded_comp_block_size = util::round_up_unsafe(max_comp_blk_size, comp_block_align);
Expand Down Expand Up @@ -1196,8 +1196,8 @@ __global__ void __launch_bounds__(1024)
uint32_t max_comp_blk_size)
{
__shared__ __align__(16) StripeStream ss;
__shared__ uint8_t const* volatile comp_src_g;
__shared__ uint32_t volatile comp_len_g;
__shared__ uint8_t const* comp_src_g;
__shared__ uint32_t comp_len_g;

auto const stripe_id = blockIdx.x;
auto const stream_id = blockIdx.y;
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/io/orc/stripe_init.cu
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ __global__ void __launch_bounds__(128, 8) gpuParseRowGroupIndex(RowGroup* row_gr
: row_groups[(s->rowgroup_start + i) * num_columns + blockIdx.x].start_row;
for (int j = t4; j < rowgroup_size4; j += 4) {
((uint32_t*)&row_groups[(s->rowgroup_start + i) * num_columns + blockIdx.x])[j] =
((volatile uint32_t*)&s->rowgroups[i])[j];
((uint32_t*)&s->rowgroups[i])[j];
}
row_groups[(s->rowgroup_start + i) * num_columns + blockIdx.x].num_rows = num_rows;
// Updating in case of struct
Expand Down

0 comments on commit 29556a2

Please sign in to comment.