From 81dcfa6b56032626ed465627113f756f2c5de3a5 Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Tue, 17 Dec 2024 16:55:14 -0800 Subject: [PATCH 01/23] random clean up --- cpp/src/io/comp/debrotli.cu | 1 - cpp/src/io/comp/gpuinflate.cu | 1 - cpp/src/io/comp/gpuinflate.hpp | 1 - cpp/src/io/comp/snap.cu | 1 - cpp/src/io/comp/unsnap.cu | 7 ------- cpp/src/io/orc/reader_impl_decode.cu | 24 ++++++++++-------------- 6 files changed, 10 insertions(+), 25 deletions(-) diff --git a/cpp/src/io/comp/debrotli.cu b/cpp/src/io/comp/debrotli.cu index 72649dbe427..c595fa56ed4 100644 --- a/cpp/src/io/comp/debrotli.cu +++ b/cpp/src/io/comp/debrotli.cu @@ -2020,7 +2020,6 @@ CUDF_KERNEL void __launch_bounds__(block_size, 2) results[block_id].status = (s->error == 0) ? compression_status::SUCCESS : compression_status::FAILURE; // Return ext heap used by last block (statistics) - results[block_id].reserved = s->fb_size; } } diff --git a/cpp/src/io/comp/gpuinflate.cu b/cpp/src/io/comp/gpuinflate.cu index 090ea1430b5..07c52c9d433 100644 --- a/cpp/src/io/comp/gpuinflate.cu +++ b/cpp/src/io/comp/gpuinflate.cu @@ -1139,7 +1139,6 @@ CUDF_KERNEL void __launch_bounds__(block_size) default: return compression_status::FAILURE; } }(); - results[z].reserved = (int)(state->end - state->cur); // Here mainly for debug purposes } } diff --git a/cpp/src/io/comp/gpuinflate.hpp b/cpp/src/io/comp/gpuinflate.hpp index 8bfca2b30df..132acaba36f 100644 --- a/cpp/src/io/comp/gpuinflate.hpp +++ b/cpp/src/io/comp/gpuinflate.hpp @@ -43,7 +43,6 @@ enum class compression_status : uint8_t { struct compression_result { uint64_t bytes_written; compression_status status; - uint32_t reserved; }; enum class gzip_header_included { NO, YES }; diff --git a/cpp/src/io/comp/snap.cu b/cpp/src/io/comp/snap.cu index 7d4dcffa713..daa7fe76bc7 100644 --- a/cpp/src/io/comp/snap.cu +++ b/cpp/src/io/comp/snap.cu @@ -329,7 +329,6 @@ CUDF_KERNEL void __launch_bounds__(128) results[blockIdx.x].bytes_written = s->dst - s->dst_base; results[blockIdx.x].status = (s->dst > s->end) ? compression_status::FAILURE : compression_status::SUCCESS; - results[blockIdx.x].reserved = 0; } } diff --git a/cpp/src/io/comp/unsnap.cu b/cpp/src/io/comp/unsnap.cu index 9b01272ac70..099b4ba905b 100644 --- a/cpp/src/io/comp/unsnap.cu +++ b/cpp/src/io/comp/unsnap.cu @@ -26,7 +26,6 @@ namespace io { constexpr int32_t batch_size = (1 << 5); constexpr int32_t batch_count = (1 << 2); constexpr int32_t prefetch_size = (1 << 9); // 512B, in 32B chunks -constexpr bool log_cyclecount = false; void __device__ busy_wait(size_t cycles) { @@ -647,7 +646,6 @@ CUDF_KERNEL void __launch_bounds__(block_size) auto cur = s->src.begin(); auto const end = s->src.end(); s->error = 0; - if (log_cyclecount) { s->tstart = clock(); } if (cur < end) { // Read uncompressed size (varint), limited to 32-bit uint32_t uncompressed_size = *cur++; @@ -705,11 +703,6 @@ CUDF_KERNEL void __launch_bounds__(block_size) results[strm_id].bytes_written = s->uncompressed_size - s->bytes_left; results[strm_id].status = (s->error == 0) ? compression_status::SUCCESS : compression_status::FAILURE; - if (log_cyclecount) { - results[strm_id].reserved = clock() - s->tstart; - } else { - results[strm_id].reserved = 0; - } } } diff --git a/cpp/src/io/orc/reader_impl_decode.cu b/cpp/src/io/orc/reader_impl_decode.cu index 0081ed30d17..c14889f4256 100644 --- a/cpp/src/io/orc/reader_impl_decode.cu +++ b/cpp/src/io/orc/reader_impl_decode.cu @@ -192,18 +192,14 @@ rmm::device_buffer decompress_stripe_data( // Dispatch batches of blocks to decompress if (num_compressed_blocks > 0) { - device_span> inflate_in_view{inflate_in.data(), - num_compressed_blocks}; - device_span> inflate_out_view{inflate_out.data(), num_compressed_blocks}; switch (decompressor.compression()) { case compression_type::ZLIB: if (nvcomp::is_decompression_disabled(nvcomp::compression_type::DEFLATE)) { - gpuinflate( - inflate_in_view, inflate_out_view, inflate_res, gzip_header_included::NO, stream); + gpuinflate(inflate_in, inflate_out, inflate_res, gzip_header_included::NO, stream); } else { nvcomp::batched_decompress(nvcomp::compression_type::DEFLATE, - inflate_in_view, - inflate_out_view, + inflate_in, + inflate_out, inflate_res, max_uncomp_block_size, total_decomp_size, @@ -212,11 +208,11 @@ rmm::device_buffer decompress_stripe_data( break; case compression_type::SNAPPY: if (nvcomp::is_decompression_disabled(nvcomp::compression_type::SNAPPY)) { - gpu_unsnap(inflate_in_view, inflate_out_view, inflate_res, stream); + gpu_unsnap(inflate_in, inflate_out, inflate_res, stream); } else { nvcomp::batched_decompress(nvcomp::compression_type::SNAPPY, - inflate_in_view, - inflate_out_view, + inflate_in, + inflate_out, inflate_res, max_uncomp_block_size, total_decomp_size, @@ -229,8 +225,8 @@ rmm::device_buffer decompress_stripe_data( CUDF_FAIL("Decompression error: " + reason.value()); } nvcomp::batched_decompress(nvcomp::compression_type::ZSTD, - inflate_in_view, - inflate_out_view, + inflate_in, + inflate_out, inflate_res, max_uncomp_block_size, total_decomp_size, @@ -242,8 +238,8 @@ rmm::device_buffer decompress_stripe_data( CUDF_FAIL("Decompression error: " + reason.value()); } nvcomp::batched_decompress(nvcomp::compression_type::LZ4, - inflate_in_view, - inflate_out_view, + inflate_in, + inflate_out, inflate_res, max_uncomp_block_size, total_decomp_size, From 4f7794d3a67bb6793009a1d53a09799a4169ee9f Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Tue, 17 Dec 2024 22:49:41 -0800 Subject: [PATCH 02/23] jesus --- cpp/include/cudf/io/nvcomp_adapter.hpp | 4 +- cpp/src/io/comp/common.hpp | 37 +++++++++++++++ cpp/src/io/comp/comp.cpp | 5 +- cpp/src/io/comp/comp.hpp | 22 ++++++++- cpp/src/io/comp/debrotli.cu | 7 ++- cpp/src/io/comp/gpuinflate.cu | 6 +-- cpp/src/io/comp/gpuinflate.hpp | 40 ++-------------- cpp/src/io/comp/io_uncomp.hpp | 6 +-- cpp/src/io/comp/nvcomp_adapter.cpp | 4 +- cpp/src/io/comp/nvcomp_adapter.cu | 4 +- cpp/src/io/comp/nvcomp_adapter.cuh | 6 +-- cpp/src/io/comp/nvcomp_adapter.hpp | 6 +-- cpp/src/io/comp/snap.cu | 6 +-- cpp/src/io/comp/statistics.cu | 4 +- cpp/src/io/comp/unsnap.cu | 6 +-- cpp/src/io/orc/orc_gpu.hpp | 20 ++++---- cpp/src/io/orc/reader_impl_decode.cu | 2 +- cpp/src/io/orc/stripe_enc.cu | 9 +++- cpp/src/io/orc/writer_impl.cu | 6 ++- cpp/src/io/parquet/page_enc.cu | 3 ++ cpp/src/io/parquet/parquet_gpu.hpp | 18 +++---- cpp/src/io/parquet/reader_impl_chunking.cu | 50 ++++++++++++-------- cpp/src/io/parquet/reader_impl_preprocess.cu | 8 ++-- cpp/src/io/parquet/writer_impl.cu | 1 + cpp/src/io/parquet/writer_impl_helpers.cpp | 2 + cpp/src/io/parquet/writer_impl_helpers.hpp | 4 +- cpp/src/io/text/bgzip_data_chunk_source.cu | 24 ++++++---- cpp/tests/io/comp/decomp_test.cpp | 47 +++++++++--------- cpp/tests/io/orc_test.cpp | 8 ++-- 29 files changed, 205 insertions(+), 160 deletions(-) create mode 100644 cpp/src/io/comp/common.hpp diff --git a/cpp/include/cudf/io/nvcomp_adapter.hpp b/cpp/include/cudf/io/nvcomp_adapter.hpp index 0d74a4158ad..4ad760d278f 100644 --- a/cpp/include/cudf/io/nvcomp_adapter.hpp +++ b/cpp/include/cudf/io/nvcomp_adapter.hpp @@ -22,7 +22,7 @@ #include namespace CUDF_EXPORT cudf { -namespace io::nvcomp { +namespace io::detail::nvcomp { enum class compression_type { SNAPPY, ZSTD, DEFLATE, LZ4, GZIP }; @@ -88,5 +88,5 @@ inline bool operator==(feature_status_parameters const& lhs, feature_status_para [[nodiscard]] std::optional is_decompression_disabled( compression_type compression, feature_status_parameters params = feature_status_parameters()); -} // namespace io::nvcomp +} // namespace io::detail::nvcomp } // namespace CUDF_EXPORT cudf diff --git a/cpp/src/io/comp/common.hpp b/cpp/src/io/comp/common.hpp new file mode 100644 index 00000000000..c4c7512b7ac --- /dev/null +++ b/cpp/src/io/comp/common.hpp @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +namespace cudf::io::detail { + +/** + * @brief The value used for padding a data buffer such that its size will be multiple of it. + * + * Padding is necessary for input/output buffers of several compression/decompression kernels + * (inflate_kernel and nvcomp snappy). Such kernels operate on aligned data pointers, which require + * padding to the buffers so that the pointers can shift along the address space to satisfy their + * alignment requirement. + * + * In the meantime, it is not entirely clear why such padding is needed. We need to further + * investigate and implement a better fix rather than just padding the buffer. + * See https://github.com/rapidsai/cudf/issues/13605. + */ +constexpr std::size_t BUFFER_PADDING_MULTIPLE{8}; + +} // namespace cudf::io::detail \ No newline at end of file diff --git a/cpp/src/io/comp/comp.cpp b/cpp/src/io/comp/comp.cpp index 2dda2287e09..26535bed43b 100644 --- a/cpp/src/io/comp/comp.cpp +++ b/cpp/src/io/comp/comp.cpp @@ -87,15 +87,14 @@ std::vector compress_snappy(host_span src, outputs[0] = d_dst; outputs.host_to_device_async(stream); - cudf::detail::hostdevice_vector hd_status(1, stream); + cudf::detail::hostdevice_vector hd_status(1, stream); hd_status[0] = {}; hd_status.host_to_device_async(stream); nvcomp::batched_compress(nvcomp::compression_type::SNAPPY, inputs, outputs, hd_status, stream); hd_status.device_to_host_sync(stream); - CUDF_EXPECTS(hd_status[0].status == cudf::io::compression_status::SUCCESS, - "snappy compression failed"); + CUDF_EXPECTS(hd_status[0].status == compression_status::SUCCESS, "snappy compression failed"); return cudf::detail::make_std_vector_sync(d_dst, stream); } diff --git a/cpp/src/io/comp/comp.hpp b/cpp/src/io/comp/comp.hpp index 652abbbeda6..e16f26e1f06 100644 --- a/cpp/src/io/comp/comp.hpp +++ b/cpp/src/io/comp/comp.hpp @@ -16,16 +16,34 @@ #pragma once +#include "common.hpp" + #include #include -#include -#include #include namespace CUDF_EXPORT cudf { namespace io::detail { +/** + * @brief Status of a compression/decompression operation. + */ +enum class compression_status : uint8_t { + SUCCESS, ///< Successful, output is valid + FAILURE, ///< Failed, output is invalid (e.g. input is unsupported in some way) + SKIPPED, ///< Operation skipped (if conversion, uncompressed data can be used) + OUTPUT_OVERFLOW, ///< Output buffer is too small; operation can succeed with larger output +}; + +/** + * @brief Descriptor of compression/decompression result. + */ +struct compression_result { + uint64_t bytes_written; + compression_status status; +}; + /** * @brief Compresses a system memory buffer. * diff --git a/cpp/src/io/comp/debrotli.cu b/cpp/src/io/comp/debrotli.cu index c595fa56ed4..151f72d262e 100644 --- a/cpp/src/io/comp/debrotli.cu +++ b/cpp/src/io/comp/debrotli.cu @@ -63,8 +63,8 @@ THE SOFTWARE. #include -namespace cudf { -namespace io { +namespace cudf::io::detail { + constexpr uint32_t huffman_lookup_table_width = 8; constexpr int8_t brotli_code_length_codes = 18; constexpr uint32_t brotli_num_distance_short_codes = 16; @@ -2114,5 +2114,4 @@ void gpu_debrotli(device_span const> inputs, #endif } -} // namespace io -} // namespace cudf +} // namespace cudf::io::detail diff --git a/cpp/src/io/comp/gpuinflate.cu b/cpp/src/io/comp/gpuinflate.cu index 07c52c9d433..6e5ce4ce6c3 100644 --- a/cpp/src/io/comp/gpuinflate.cu +++ b/cpp/src/io/comp/gpuinflate.cu @@ -49,8 +49,7 @@ Mark Adler madler@alumni.caltech.edu #include -namespace cudf { -namespace io { +namespace cudf::io::detail { constexpr int max_bits = 15; // maximum bits in a code constexpr int max_l_codes = 286; // maximum number of literal/length codes @@ -1223,5 +1222,4 @@ void gpu_copy_uncompressed_blocks(device_span const> } } -} // namespace io -} // namespace cudf +} // namespace cudf::io::detail diff --git a/cpp/src/io/comp/gpuinflate.hpp b/cpp/src/io/comp/gpuinflate.hpp index 132acaba36f..4b09bd5a84c 100644 --- a/cpp/src/io/comp/gpuinflate.hpp +++ b/cpp/src/io/comp/gpuinflate.hpp @@ -16,6 +16,8 @@ #pragma once +#include "io/comp/comp.hpp" + #include #include #include @@ -24,43 +26,10 @@ #include -namespace cudf { -namespace io { - -/** - * @brief Status of a compression/decompression operation. - */ -enum class compression_status : uint8_t { - SUCCESS, ///< Successful, output is valid - FAILURE, ///< Failed, output is invalid (e.g. input is unsupported in some way) - SKIPPED, ///< Operation skipped (if conversion, uncompressed data can be used) - OUTPUT_OVERFLOW, ///< Output buffer is too small; operation can succeed with larger output -}; - -/** - * @brief Descriptor of compression/decompression result. - */ -struct compression_result { - uint64_t bytes_written; - compression_status status; -}; +namespace cudf::io::detail { enum class gzip_header_included { NO, YES }; -/** - * @brief The value used for padding a data buffer such that its size will be multiple of it. - * - * Padding is necessary for input/output buffers of several compression/decompression kernels - * (inflate_kernel and nvcomp snappy). Such kernels operate on aligned data pointers, which require - * padding to the buffers so that the pointers can shift along the address space to satisfy their - * alignment requirement. - * - * In the meantime, it is not entirely clear why such padding is needed. We need to further - * investigate and implement a better fix rather than just padding the buffer. - * See https://github.com/rapidsai/cudf/issues/13605. - */ -constexpr std::size_t BUFFER_PADDING_MULTIPLE{8}; - /** * @brief Interface for decompressing GZIP-compressed data * @@ -168,5 +137,4 @@ void gpu_snap(device_span const> inputs, device_span results, rmm::cuda_stream_view stream); -} // namespace io -} // namespace cudf +} // namespace cudf::io::detail diff --git a/cpp/src/io/comp/io_uncomp.hpp b/cpp/src/io/comp/io_uncomp.hpp index ca722a9b7ee..711a1c3274f 100644 --- a/cpp/src/io/comp/io_uncomp.hpp +++ b/cpp/src/io/comp/io_uncomp.hpp @@ -16,15 +16,13 @@ #pragma once +#include "common.hpp" + #include #include -#include -#include #include -using cudf::host_span; - namespace CUDF_EXPORT cudf { namespace io::detail { diff --git a/cpp/src/io/comp/nvcomp_adapter.cpp b/cpp/src/io/comp/nvcomp_adapter.cpp index d45c02f374f..3a4e315348c 100644 --- a/cpp/src/io/comp/nvcomp_adapter.cpp +++ b/cpp/src/io/comp/nvcomp_adapter.cpp @@ -30,7 +30,7 @@ #include -namespace cudf::io::nvcomp { +namespace cudf::io::detail::nvcomp { namespace { // Dispatcher for nvcompBatchedDecompressGetTempSizeEx @@ -478,4 +478,4 @@ std::optional compress_max_allowed_chunk_size(compression_type compressi } } -} // namespace cudf::io::nvcomp +} // namespace cudf::io::detail::nvcomp diff --git a/cpp/src/io/comp/nvcomp_adapter.cu b/cpp/src/io/comp/nvcomp_adapter.cu index 794d452ebf2..39e8de2bb85 100644 --- a/cpp/src/io/comp/nvcomp_adapter.cu +++ b/cpp/src/io/comp/nvcomp_adapter.cu @@ -23,7 +23,7 @@ #include #include -namespace cudf::io::nvcomp { +namespace cudf::io::detail::nvcomp { batched_args create_batched_nvcomp_args(device_span const> inputs, device_span const> outputs, @@ -127,4 +127,4 @@ std::pair max_chunk_and_total_input_size(device_span @@ -27,7 +27,7 @@ #include -namespace cudf::io::nvcomp { +namespace cudf::io::detail::nvcomp { struct batched_args { rmm::device_uvector input_data_ptrs; @@ -76,4 +76,4 @@ void skip_unsupported_inputs(device_span input_sizes, std::pair max_chunk_and_total_input_size(device_span input_sizes, rmm::cuda_stream_view stream); -} // namespace cudf::io::nvcomp +} // namespace cudf::io::detail::nvcomp diff --git a/cpp/src/io/comp/nvcomp_adapter.hpp b/cpp/src/io/comp/nvcomp_adapter.hpp index 2e1cda2d6b7..5c402523168 100644 --- a/cpp/src/io/comp/nvcomp_adapter.hpp +++ b/cpp/src/io/comp/nvcomp_adapter.hpp @@ -16,7 +16,7 @@ #pragma once -#include "gpuinflate.hpp" +#include "io/comp/comp.hpp" #include #include @@ -25,7 +25,7 @@ #include -namespace cudf::io::nvcomp { +namespace cudf::io::detail::nvcomp { /** * @brief Device batch decompression of given type. * @@ -103,4 +103,4 @@ void batched_compress(compression_type compression, device_span results, rmm::cuda_stream_view stream); -} // namespace cudf::io::nvcomp +} // namespace cudf::io::detail::nvcomp diff --git a/cpp/src/io/comp/snap.cu b/cpp/src/io/comp/snap.cu index daa7fe76bc7..1443bfd38a2 100644 --- a/cpp/src/io/comp/snap.cu +++ b/cpp/src/io/comp/snap.cu @@ -19,8 +19,7 @@ #include -namespace cudf { -namespace io { +namespace cudf::io::detail { constexpr int hash_bits = 12; // TBD: Tentatively limits to 2-byte codes to prevent long copy search followed by long literal @@ -344,5 +343,4 @@ void gpu_snap(device_span const> inputs, } } -} // namespace io -} // namespace cudf +} // namespace cudf::io::detail diff --git a/cpp/src/io/comp/statistics.cu b/cpp/src/io/comp/statistics.cu index faf967041bc..caee9145d2c 100644 --- a/cpp/src/io/comp/statistics.cu +++ b/cpp/src/io/comp/statistics.cu @@ -21,7 +21,7 @@ #include #include -namespace cudf::io { +namespace cudf::io::detail { writer_compression_statistics collect_compression_statistics( device_span const> inputs, @@ -61,4 +61,4 @@ writer_compression_statistics collect_compression_statistics( output_size_successful}; } -} // namespace cudf::io +} // namespace cudf::io::detail diff --git a/cpp/src/io/comp/unsnap.cu b/cpp/src/io/comp/unsnap.cu index 099b4ba905b..cf841c435a3 100644 --- a/cpp/src/io/comp/unsnap.cu +++ b/cpp/src/io/comp/unsnap.cu @@ -21,8 +21,7 @@ #include -namespace cudf { -namespace io { +namespace cudf::io::detail { constexpr int32_t batch_size = (1 << 5); constexpr int32_t batch_count = (1 << 2); constexpr int32_t prefetch_size = (1 << 9); // 512B, in 32B chunks @@ -717,5 +716,4 @@ void gpu_unsnap(device_span const> inputs, unsnap_kernel<128><<>>(inputs, outputs, results); } -} // namespace io -} // namespace cudf +} // namespace cudf::io::detail diff --git a/cpp/src/io/orc/orc_gpu.hpp b/cpp/src/io/orc/orc_gpu.hpp index 0949fafe9a4..daff429c087 100644 --- a/cpp/src/io/orc/orc_gpu.hpp +++ b/cpp/src/io/orc/orc_gpu.hpp @@ -16,7 +16,7 @@ #pragma once -#include "io/comp/gpuinflate.hpp" +#include "io/comp/comp.hpp" #include "io/statistics/statistics.cuh" #include "io/utilities/column_buffer.hpp" #include "orc.hpp" @@ -73,14 +73,14 @@ struct CompressedStreamInfo { uint8_t const* compressed_data{}; // [in] base ptr to compressed stream data uint8_t* uncompressed_data{}; // [in] base ptr to uncompressed stream data or NULL if not known yet - size_t compressed_data_size{}; // [in] compressed data size for this stream - device_span* dec_in_ctl{}; // [in] input buffer to decompress - device_span* dec_out_ctl{}; // [in] output buffer to decompress into - device_span dec_res{}; // [in] results of decompression - device_span* copy_in_ctl{}; // [out] input buffer to copy - device_span* copy_out_ctl{}; // [out] output buffer to copy to - uint32_t num_compressed_blocks{}; // [in,out] number of entries in decctl(in), number of - // compressed blocks(out) + size_t compressed_data_size{}; // [in] compressed data size for this stream + device_span* dec_in_ctl{}; // [in] input buffer to decompress + device_span* dec_out_ctl{}; // [in] output buffer to decompress into + device_span dec_res{}; // [in] results of decompression + device_span* copy_in_ctl{}; // [out] input buffer to copy + device_span* copy_out_ctl{}; // [out] output buffer to copy to + uint32_t num_compressed_blocks{}; // [in,out] number of entries in decctl(in), number of + // compressed blocks(out) uint32_t num_uncompressed_blocks{}; // [in,out] number of entries in dec_in_ctl(in), number of // uncompressed blocks(out) uint64_t max_uncompressed_size{}; // [out] maximum uncompressed data size of stream @@ -414,7 +414,7 @@ std::optional CompressOrcDataStreams( bool collect_statistics, device_2dspan strm_desc, device_2dspan enc_streams, - device_span comp_res, + device_span comp_res, rmm::cuda_stream_view stream); /** diff --git a/cpp/src/io/orc/reader_impl_decode.cu b/cpp/src/io/orc/reader_impl_decode.cu index c14889f4256..accc7861ec0 100644 --- a/cpp/src/io/orc/reader_impl_decode.cu +++ b/cpp/src/io/orc/reader_impl_decode.cu @@ -265,7 +265,7 @@ rmm::device_buffer decompress_stripe_data( num_uncompressed_blocks}; device_span> copy_out_view{inflate_out.data() + num_compressed_blocks, num_uncompressed_blocks}; - gpu_copy_uncompressed_blocks(copy_in_view, copy_out_view, stream); + cudf::io::detail::gpu_copy_uncompressed_blocks(copy_in_view, copy_out_view, stream); } // Copy without stream sync, thus need to wait for stream sync below to access. diff --git a/cpp/src/io/orc/stripe_enc.cu b/cpp/src/io/orc/stripe_enc.cu index 07172b6b7f7..79ecca0ca99 100644 --- a/cpp/src/io/orc/stripe_enc.cu +++ b/cpp/src/io/orc/stripe_enc.cu @@ -14,6 +14,7 @@ * limitations under the License. */ +#include "io/comp/gpuinflate.hpp" #include "io/comp/nvcomp_adapter.hpp" #include "io/utilities/block_utils.cuh" #include "io/utilities/time_utils.cuh" @@ -44,7 +45,11 @@ namespace io { namespace orc { namespace gpu { +namespace nvcomp = cudf::io::detail::nvcomp; + using cudf::detail::device_2dspan; +using cudf::io::detail::compression_result; +using cudf::io::detail::compression_status; constexpr int scratch_buffer_size = 512 * 4; constexpr int compact_streams_block_size = 1024; @@ -1385,7 +1390,7 @@ std::optional CompressOrcDataStreams( if (compression == SNAPPY) { try { if (nvcomp::is_compression_disabled(nvcomp::compression_type::SNAPPY)) { - gpu_snap(comp_in, comp_out, comp_res, stream); + cudf::io::detail::gpu_snap(comp_in, comp_out, comp_res, stream); } else { nvcomp::batched_compress( nvcomp::compression_type::SNAPPY, comp_in, comp_out, comp_res, stream); @@ -1429,7 +1434,7 @@ std::optional CompressOrcDataStreams( strm_desc, comp_in, comp_out, comp_res, compressed_data, comp_blk_size, max_comp_blk_size); if (collect_statistics) { - return cudf::io::collect_compression_statistics(comp_in, comp_res, stream); + return cudf::io::detail::collect_compression_statistics(comp_in, comp_res, stream); } else { return std::nullopt; } diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index 6b9c19368dc..ce868b83c04 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -71,6 +71,8 @@ namespace cudf::io::orc::detail { +namespace nvcomp = cudf::io::detail::nvcomp; + template [[nodiscard]] constexpr int varint_size(T val) { @@ -2023,8 +2025,8 @@ size_t max_compression_output_size(CompressionKind compression_kind, uint32_t co { if (compression_kind == NONE) return 0; - return compress_max_output_chunk_size(to_nvcomp_compression_type(compression_kind), - compression_blocksize); + return nvcomp::compress_max_output_chunk_size(to_nvcomp_compression_type(compression_kind), + compression_blocksize); } std::unique_ptr make_table_meta(table_view const& input) diff --git a/cpp/src/io/parquet/page_enc.cu b/cpp/src/io/parquet/page_enc.cu index e9558735929..a1edd21f8a2 100644 --- a/cpp/src/io/parquet/page_enc.cu +++ b/cpp/src/io/parquet/page_enc.cu @@ -51,6 +51,9 @@ namespace { using ::cudf::detail::device_2dspan; +using cudf::io::detail::compression_result; +using cudf::io::detail::compression_status; + constexpr int encode_block_size = 128; constexpr int rle_buffer_size = 2 * encode_block_size; constexpr int num_encode_warps = encode_block_size / cudf::detail::warp_size; diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp index ce9d48693ec..b2563ab5065 100644 --- a/cpp/src/io/parquet/parquet_gpu.hpp +++ b/cpp/src/io/parquet/parquet_gpu.hpp @@ -17,7 +17,7 @@ #pragma once #include "error.hpp" -#include "io/comp/gpuinflate.hpp" +#include "io/comp/comp.hpp" #include "io/parquet/parquet.hpp" #include "io/parquet/parquet_common.hpp" #include "io/statistics/statistics.cuh" @@ -599,12 +599,12 @@ struct EncColumnChunk { */ struct EncPage { // all pointers at the top to keep things properly aligned - uint8_t* page_data; //!< Ptr to uncompressed page - uint8_t* compressed_data; //!< Ptr to compressed page - EncColumnChunk* chunk; //!< Chunk that this page belongs to - compression_result* comp_res; //!< Ptr to compression result - uint32_t* def_histogram; //!< Histogram of counts for each definition level - uint32_t* rep_histogram; //!< Histogram of counts for each repetition level + uint8_t* page_data; //!< Ptr to uncompressed page + uint8_t* compressed_data; //!< Ptr to compressed page + EncColumnChunk* chunk; //!< Chunk that this page belongs to + cudf::io::detail::compression_result* comp_res; //!< Ptr to compression result + uint32_t* def_histogram; //!< Histogram of counts for each definition level + uint32_t* rep_histogram; //!< Histogram of counts for each repetition level // put this here in case it's ever made 64-bit encode_kernel_mask kernel_mask; //!< Mask used to control which encoding kernels to run // the rest can be 4 byte aligned @@ -1023,7 +1023,7 @@ void EncodePages(device_span pages, bool write_v2_headers, device_span> comp_in, device_span> comp_out, - device_span comp_res, + device_span comp_res, rmm::cuda_stream_view stream); /** @@ -1046,7 +1046,7 @@ void DecideCompression(device_span chunks, rmm::cuda_stream_view * @param[in] stream CUDA stream to use */ void EncodePageHeaders(device_span pages, - device_span comp_res, + device_span comp_res, device_span page_stats, statistics_chunk const* chunk_stats, rmm::cuda_stream_view stream); diff --git a/cpp/src/io/parquet/reader_impl_chunking.cu b/cpp/src/io/parquet/reader_impl_chunking.cu index 27312a4da89..933be889b1a 100644 --- a/cpp/src/io/parquet/reader_impl_chunking.cu +++ b/cpp/src/io/parquet/reader_impl_chunking.cu @@ -15,6 +15,8 @@ */ #include "compact_protocol_reader.hpp" +#include "io/comp/comp.hpp" +#include "io/comp/gpuinflate.hpp" #include "io/comp/nvcomp_adapter.hpp" #include "io/utilities/time_utils.cuh" #include "reader_impl.hpp" @@ -44,6 +46,10 @@ namespace cudf::io::parquet::detail { namespace { +namespace nvcomp = cudf::io::detail::nvcomp; +using cudf::io::detail::compression_result; +using cudf::io::detail::compression_status; + struct split_info { row_range rows; int64_t split_pos; @@ -795,14 +801,16 @@ std::vector compute_page_splits_by_row(device_span 0) { - debrotli_scratch.resize(get_gpu_debrotli_scratch_size(codec.num_pages), stream); + debrotli_scratch.resize(cudf::io::detail::get_gpu_debrotli_scratch_size(codec.num_pages), + stream); } } // Dispatch batches of pages to decompress for each codec. // Buffer needs to be padded, required by `gpuDecodePageData`. rmm::device_buffer decomp_pages( - cudf::util::round_up_safe(total_decomp_size, BUFFER_PADDING_MULTIPLE), stream); + cudf::util::round_up_safe(total_decomp_size, cudf::io::detail::BUFFER_PADDING_MULTIPLE), + stream); auto comp_in = cudf::detail::make_empty_host_vector>(num_comp_pages, stream); @@ -874,8 +882,11 @@ std::vector compute_page_splits_by_row(device_span compute_page_splits_by_row(device_span @@ -251,8 +252,8 @@ void generate_depth_remappings( if (source->is_device_read_preferred(io_size)) { // Buffer needs to be padded. // Required by `gpuDecodePageData`. - page_data[chunk] = - rmm::device_buffer(cudf::util::round_up_safe(io_size, BUFFER_PADDING_MULTIPLE), stream); + page_data[chunk] = rmm::device_buffer( + cudf::util::round_up_safe(io_size, cudf::io::detail::BUFFER_PADDING_MULTIPLE), stream); auto fut_read_size = source->device_read_async( io_offset, io_size, static_cast(page_data[chunk].data()), stream); read_tasks.emplace_back(std::move(fut_read_size)); @@ -261,7 +262,8 @@ void generate_depth_remappings( // Buffer needs to be padded. // Required by `gpuDecodePageData`. page_data[chunk] = rmm::device_buffer( - cudf::util::round_up_safe(read_buffer->size(), BUFFER_PADDING_MULTIPLE), stream); + cudf::util::round_up_safe(read_buffer->size(), cudf::io::detail::BUFFER_PADDING_MULTIPLE), + stream); CUDF_CUDA_TRY(cudaMemcpyAsync(page_data[chunk].data(), read_buffer->data(), read_buffer->size(), diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index 188e6a8c0d8..2eb9c49fd88 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -23,6 +23,7 @@ #include "compact_protocol_reader.hpp" #include "compact_protocol_writer.hpp" #include "interop/decimal_conversion_utilities.cuh" +#include "io/comp/gpuinflate.hpp" #include "io/comp/nvcomp_adapter.hpp" #include "io/parquet/parquet.hpp" #include "io/parquet/parquet_gpu.hpp" diff --git a/cpp/src/io/parquet/writer_impl_helpers.cpp b/cpp/src/io/parquet/writer_impl_helpers.cpp index 396d44c0763..f15ea1f3c37 100644 --- a/cpp/src/io/parquet/writer_impl_helpers.cpp +++ b/cpp/src/io/parquet/writer_impl_helpers.cpp @@ -21,6 +21,8 @@ #include "writer_impl_helpers.hpp" +#include "io/comp/nvcomp_adapter.hpp" + #include #include #include diff --git a/cpp/src/io/parquet/writer_impl_helpers.hpp b/cpp/src/io/parquet/writer_impl_helpers.hpp index a85411594e9..14a9a0ed5b7 100644 --- a/cpp/src/io/parquet/writer_impl_helpers.hpp +++ b/cpp/src/io/parquet/writer_impl_helpers.hpp @@ -20,11 +20,11 @@ */ #pragma once -#include "io/comp/nvcomp_adapter.hpp" #include "parquet_common.hpp" #include #include +#include namespace cudf::io::parquet::detail { @@ -42,7 +42,7 @@ Compression to_parquet_compression(compression_type compression); * @param codec Compression codec * @return Translated nvcomp compression type */ -nvcomp::compression_type to_nvcomp_compression_type(Compression codec); +cudf::io::detail::nvcomp::compression_type to_nvcomp_compression_type(Compression codec); /** * @brief Function that computes input alignment requirements for the given compression type. diff --git a/cpp/src/io/text/bgzip_data_chunk_source.cu b/cpp/src/io/text/bgzip_data_chunk_source.cu index 06069630685..162da62ef03 100644 --- a/cpp/src/io/text/bgzip_data_chunk_source.cu +++ b/cpp/src/io/text/bgzip_data_chunk_source.cu @@ -14,6 +14,7 @@ * limitations under the License. */ +#include "io/comp/gpuinflate.hpp" #include "io/comp/nvcomp_adapter.hpp" #include "io/text/device_data_chunks.hpp" @@ -41,6 +42,8 @@ namespace cudf::io::text { namespace { +namespace nvcomp = cudf::io::detail::nvcomp; + /** * @brief Transforms offset tuples of the form [compressed_begin, compressed_end, * decompressed_begin, decompressed_end] into span tuples of the form [compressed_device_span, @@ -73,7 +76,8 @@ class bgzip_data_chunk_reader : public data_chunk_reader { { // Buffer needs to be padded. // Required by `inflate_kernel`. - device.resize(cudf::util::round_up_safe(host.size(), BUFFER_PADDING_MULTIPLE), stream); + device.resize(cudf::util::round_up_safe(host.size(), cudf::io::detail::BUFFER_PADDING_MULTIPLE), + stream); cudf::detail::cuda_memcpy_async( device_span{device}.subspan(0, host.size()), host, stream); } @@ -94,7 +98,7 @@ class bgzip_data_chunk_reader : public data_chunk_reader { rmm::device_uvector d_decompressed_offsets; rmm::device_uvector> d_compressed_spans; rmm::device_uvector> d_decompressed_spans; - rmm::device_uvector d_decompression_results; + rmm::device_uvector d_decompression_results; std::size_t compressed_size_with_headers{}; std::size_t max_decompressed_size{}; // this is usually equal to decompressed_size() @@ -152,16 +156,16 @@ class bgzip_data_chunk_reader : public data_chunk_reader { gpuinflate(d_compressed_spans, d_decompressed_spans, d_decompression_results, - gzip_header_included::NO, + cudf::io::detail::gzip_header_included::NO, stream); } else { - cudf::io::nvcomp::batched_decompress(cudf::io::nvcomp::compression_type::DEFLATE, - d_compressed_spans, - d_decompressed_spans, - d_decompression_results, - max_decompressed_size, - decompressed_size(), - stream); + nvcomp::batched_decompress(nvcomp::compression_type::DEFLATE, + d_compressed_spans, + d_decompressed_spans, + d_decompression_results, + max_decompressed_size, + decompressed_size(), + stream); } } is_decompressed = true; diff --git a/cpp/tests/io/comp/decomp_test.cpp b/cpp/tests/io/comp/decomp_test.cpp index 54262dc3b44..5bbe8b63c47 100644 --- a/cpp/tests/io/comp/decomp_test.cpp +++ b/cpp/tests/io/comp/decomp_test.cpp @@ -30,6 +30,9 @@ #include using cudf::device_span; +using cudf::io::detail::compression_result; +using cudf::io::detail::compression_status; +namespace nvcomp = cudf::io::detail::nvcomp; /** * @brief Base test fixture for decompression @@ -61,7 +64,7 @@ struct DecompressTest : public cudf::test::BaseFixture { inf_out[0] = dst; inf_out.host_to_device_async(stream); - cudf::detail::hostdevice_vector inf_stat(1, stream); + cudf::detail::hostdevice_vector inf_stat(1, stream); inf_stat[0] = {}; inf_stat.host_to_device_async(stream); @@ -69,7 +72,7 @@ struct DecompressTest : public cudf::test::BaseFixture { CUDF_CUDA_TRY(cudaMemcpyAsync( decompressed.data(), dst.data(), dst.size(), cudaMemcpyDefault, stream.value())); inf_stat.device_to_host_sync(stream); - ASSERT_EQ(inf_stat[0].status, cudf::io::compression_status::SUCCESS); + ASSERT_EQ(inf_stat[0].status, compression_status::SUCCESS); } }; @@ -79,13 +82,13 @@ struct DecompressTest : public cudf::test::BaseFixture { struct GzipDecompressTest : public DecompressTest { void dispatch(device_span> d_inf_in, device_span> d_inf_out, - device_span d_inf_stat) + device_span d_inf_stat) { - cudf::io::gpuinflate(d_inf_in, - d_inf_out, - d_inf_stat, - cudf::io::gzip_header_included::YES, - cudf::get_default_stream()); + cudf::io::detail::gpuinflate(d_inf_in, + d_inf_out, + d_inf_stat, + cudf::io::detail::gzip_header_included::YES, + cudf::get_default_stream()); } }; @@ -95,9 +98,9 @@ struct GzipDecompressTest : public DecompressTest { struct SnappyDecompressTest : public DecompressTest { void dispatch(device_span> d_inf_in, device_span> d_inf_out, - device_span d_inf_stat) + device_span d_inf_stat) { - cudf::io::gpu_unsnap(d_inf_in, d_inf_out, d_inf_stat, cudf::get_default_stream()); + cudf::io::detail::gpu_unsnap(d_inf_in, d_inf_out, d_inf_stat, cudf::get_default_stream()); } }; @@ -107,17 +110,17 @@ struct SnappyDecompressTest : public DecompressTest { struct BrotliDecompressTest : public DecompressTest { void dispatch(device_span> d_inf_in, device_span> d_inf_out, - device_span d_inf_stat) + device_span d_inf_stat) { - rmm::device_buffer d_scratch{cudf::io::get_gpu_debrotli_scratch_size(1), + rmm::device_buffer d_scratch{cudf::io::detail::get_gpu_debrotli_scratch_size(1), cudf::get_default_stream()}; - cudf::io::gpu_debrotli(d_inf_in, - d_inf_out, - d_inf_stat, - d_scratch.data(), - d_scratch.size(), - cudf::get_default_stream()); + cudf::io::detail::gpu_debrotli(d_inf_in, + d_inf_out, + d_inf_stat, + d_scratch.data(), + d_scratch.size(), + cudf::get_default_stream()); } }; @@ -181,8 +184,8 @@ TEST_F(BrotliDecompressTest, HelloWorld) TEST_F(NvcompConfigTest, Compression) { - using cudf::io::nvcomp::compression_type; - auto const& comp_disabled = cudf::io::nvcomp::is_compression_disabled; + using nvcomp::compression_type; + auto const& comp_disabled = nvcomp::is_compression_disabled; EXPECT_FALSE(comp_disabled(compression_type::DEFLATE, {true, true})); // all integrations enabled required @@ -201,8 +204,8 @@ TEST_F(NvcompConfigTest, Compression) TEST_F(NvcompConfigTest, Decompression) { - using cudf::io::nvcomp::compression_type; - auto const& decomp_disabled = cudf::io::nvcomp::is_decompression_disabled; + using nvcomp::compression_type; + auto const& decomp_disabled = nvcomp::is_decompression_disabled; EXPECT_FALSE(decomp_disabled(compression_type::DEFLATE, {true, true})); // all integrations enabled required diff --git a/cpp/tests/io/orc_test.cpp b/cpp/tests/io/orc_test.cpp index fce99187516..2209a30149d 100644 --- a/cpp/tests/io/orc_test.cpp +++ b/cpp/tests/io/orc_test.cpp @@ -40,6 +40,8 @@ #include #include +namespace nvcomp = cudf::io::detail::nvcomp; + template using column_wrapper = std::conditional_t, @@ -1135,7 +1137,7 @@ TEST_F(OrcReaderTest, SingleInputs) TEST_F(OrcReaderTest, zstdCompressionRegression) { - if (cudf::io::nvcomp::is_decompression_disabled(cudf::io::nvcomp::compression_type::ZSTD)) { + if (nvcomp::is_decompression_disabled(nvcomp::compression_type::ZSTD)) { GTEST_SKIP() << "Newer nvCOMP version is required"; } @@ -1700,8 +1702,8 @@ TEST_F(OrcMetadataReaderTest, TestNested) TEST_F(OrcReaderTest, ZstdMaxCompressionRate) { - if (cudf::io::nvcomp::is_decompression_disabled(cudf::io::nvcomp::compression_type::ZSTD) or - cudf::io::nvcomp::is_compression_disabled(cudf::io::nvcomp::compression_type::ZSTD)) { + if (nvcomp::is_decompression_disabled(nvcomp::compression_type::ZSTD) or + nvcomp::is_compression_disabled(nvcomp::compression_type::ZSTD)) { GTEST_SKIP() << "Newer nvCOMP version is required"; } From b3f03e8181257a14ab05b8e920d6ac3bcd84dc54 Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Tue, 17 Dec 2024 22:54:02 -0800 Subject: [PATCH 03/23] style --- cpp/src/io/comp/common.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/io/comp/common.hpp b/cpp/src/io/comp/common.hpp index c4c7512b7ac..72bb63e817e 100644 --- a/cpp/src/io/comp/common.hpp +++ b/cpp/src/io/comp/common.hpp @@ -34,4 +34,4 @@ namespace cudf::io::detail { */ constexpr std::size_t BUFFER_PADDING_MULTIPLE{8}; -} // namespace cudf::io::detail \ No newline at end of file +} // namespace cudf::io::detail From 53205c5857170014743ecddef9a30a3efee700bc Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Wed, 18 Dec 2024 11:24:53 -0800 Subject: [PATCH 04/23] style --- cpp/src/io/comp/nvcomp_adapter.cu | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/io/comp/nvcomp_adapter.cu b/cpp/src/io/comp/nvcomp_adapter.cu index 39e8de2bb85..cf5996dfd93 100644 --- a/cpp/src/io/comp/nvcomp_adapter.cu +++ b/cpp/src/io/comp/nvcomp_adapter.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2023, NVIDIA CORPORATION. + * Copyright (c) 2022-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. From 324d635ab41755659b0439496590b520aade18bc Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Wed, 18 Dec 2024 13:24:18 -0800 Subject: [PATCH 05/23] Update cpp/src/io/comp/common.hpp Co-authored-by: Bradley Dice --- cpp/src/io/comp/common.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/io/comp/common.hpp b/cpp/src/io/comp/common.hpp index 72bb63e817e..a81ac60e03a 100644 --- a/cpp/src/io/comp/common.hpp +++ b/cpp/src/io/comp/common.hpp @@ -21,7 +21,7 @@ namespace cudf::io::detail { /** - * @brief The value used for padding a data buffer such that its size will be multiple of it. + * @brief The size used for padding a data buffer's size to a multiple of the padding. * * Padding is necessary for input/output buffers of several compression/decompression kernels * (inflate_kernel and nvcomp snappy). Such kernels operate on aligned data pointers, which require From 947fbd4f79a70f456172f2b7e4af26b5fb9781f5 Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Thu, 19 Dec 2024 17:33:39 -0800 Subject: [PATCH 06/23] fix --- cpp/src/io/orc/reader_impl_decode.cu | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/cpp/src/io/orc/reader_impl_decode.cu b/cpp/src/io/orc/reader_impl_decode.cu index accc7861ec0..be7254db51c 100644 --- a/cpp/src/io/orc/reader_impl_decode.cu +++ b/cpp/src/io/orc/reader_impl_decode.cu @@ -192,14 +192,17 @@ rmm::device_buffer decompress_stripe_data( // Dispatch batches of blocks to decompress if (num_compressed_blocks > 0) { + device_span> inflate_in_view{inflate_in.data(), + num_compressed_blocks}; + device_span> inflate_out_view{inflate_out.data(), num_compressed_blocks}; switch (decompressor.compression()) { case compression_type::ZLIB: if (nvcomp::is_decompression_disabled(nvcomp::compression_type::DEFLATE)) { - gpuinflate(inflate_in, inflate_out, inflate_res, gzip_header_included::NO, stream); + gpuinflate(inflate_in_view, inflate_out, inflate_res, gzip_header_included::NO, stream); } else { nvcomp::batched_decompress(nvcomp::compression_type::DEFLATE, - inflate_in, - inflate_out, + inflate_in_view, + inflate_out_view, inflate_res, max_uncomp_block_size, total_decomp_size, @@ -208,11 +211,11 @@ rmm::device_buffer decompress_stripe_data( break; case compression_type::SNAPPY: if (nvcomp::is_decompression_disabled(nvcomp::compression_type::SNAPPY)) { - gpu_unsnap(inflate_in, inflate_out, inflate_res, stream); + gpu_unsnap(inflate_in_view, inflate_out_view, inflate_res, stream); } else { nvcomp::batched_decompress(nvcomp::compression_type::SNAPPY, - inflate_in, - inflate_out, + inflate_in_view, + inflate_out_view, inflate_res, max_uncomp_block_size, total_decomp_size, @@ -225,8 +228,8 @@ rmm::device_buffer decompress_stripe_data( CUDF_FAIL("Decompression error: " + reason.value()); } nvcomp::batched_decompress(nvcomp::compression_type::ZSTD, - inflate_in, - inflate_out, + inflate_in_view, + inflate_out_view, inflate_res, max_uncomp_block_size, total_decomp_size, @@ -238,8 +241,8 @@ rmm::device_buffer decompress_stripe_data( CUDF_FAIL("Decompression error: " + reason.value()); } nvcomp::batched_decompress(nvcomp::compression_type::LZ4, - inflate_in, - inflate_out, + inflate_in_view, + inflate_out_view, inflate_res, max_uncomp_block_size, total_decomp_size, From 54d9bb941791e9e3b4fbfadf7b0425d94d2c2280 Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Thu, 19 Dec 2024 17:34:45 -0800 Subject: [PATCH 07/23] fix some more --- cpp/src/io/orc/reader_impl_decode.cu | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cpp/src/io/orc/reader_impl_decode.cu b/cpp/src/io/orc/reader_impl_decode.cu index be7254db51c..b661bb4ff90 100644 --- a/cpp/src/io/orc/reader_impl_decode.cu +++ b/cpp/src/io/orc/reader_impl_decode.cu @@ -198,7 +198,8 @@ rmm::device_buffer decompress_stripe_data( switch (decompressor.compression()) { case compression_type::ZLIB: if (nvcomp::is_decompression_disabled(nvcomp::compression_type::DEFLATE)) { - gpuinflate(inflate_in_view, inflate_out, inflate_res, gzip_header_included::NO, stream); + gpuinflate( + inflate_in_view, inflate_out_view, inflate_res, gzip_header_included::NO, stream); } else { nvcomp::batched_decompress(nvcomp::compression_type::DEFLATE, inflate_in_view, From 963f066d2a786916519ea7a3fe5dd0e8d4d9d112 Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Fri, 20 Dec 2024 10:56:15 -0800 Subject: [PATCH 08/23] avoid part of nvcomp enabled checks in writers --- cpp/src/io/comp/comp.cpp | 38 +++++++++++++++ cpp/src/io/comp/comp.hpp | 15 ++++++ cpp/src/io/orc/stripe_enc.cu | 55 ++++++---------------- cpp/src/io/parquet/writer_impl.cu | 55 +++++++++++----------- cpp/src/io/parquet/writer_impl_helpers.cpp | 14 ------ cpp/src/io/parquet/writer_impl_helpers.hpp | 8 ---- 6 files changed, 94 insertions(+), 91 deletions(-) diff --git a/cpp/src/io/comp/comp.cpp b/cpp/src/io/comp/comp.cpp index 26535bed43b..044b9b2137a 100644 --- a/cpp/src/io/comp/comp.cpp +++ b/cpp/src/io/comp/comp.cpp @@ -16,6 +16,7 @@ #include "comp.hpp" +#include "gpuinflate.hpp" #include "io/utilities/hostdevice_vector.hpp" #include "nvcomp_adapter.hpp" @@ -112,4 +113,41 @@ std::vector compress(compression_type compression, } } +void compress(compression_type compression, + device_span const> inputs, + device_span const> outputs, + device_span results, + rmm::cuda_stream_view stream) +{ + CUDF_FUNC_RANGE(); + switch (compression) { + case compression_type::SNAPPY: + if (nvcomp::is_compression_disabled(nvcomp::compression_type::SNAPPY)) { + gpu_snap(inputs, outputs, results, stream); + } else { + nvcomp::batched_compress( + nvcomp::compression_type::SNAPPY, inputs, outputs, results, stream); + } + break; + case compression_type::ZSTD: { + if (auto const reason = nvcomp::is_compression_disabled(nvcomp::compression_type::ZSTD); + reason) { + CUDF_FAIL("Compression error: " + reason.value()); + } + nvcomp::batched_compress(nvcomp::compression_type::ZSTD, inputs, outputs, results, stream); + break; + } + case compression_type::LZ4: { + if (auto const reason = nvcomp::is_compression_disabled(nvcomp::compression_type::LZ4); + reason) { + CUDF_FAIL("Compression error: " + reason.value()); + } + nvcomp::batched_compress(nvcomp::compression_type::LZ4, inputs, outputs, results, stream); + break; + } + case compression_type::NONE: return; + default: CUDF_FAIL("invalid compression type"); + } +} + } // namespace cudf::io::detail diff --git a/cpp/src/io/comp/comp.hpp b/cpp/src/io/comp/comp.hpp index e16f26e1f06..abe90473fdd 100644 --- a/cpp/src/io/comp/comp.hpp +++ b/cpp/src/io/comp/comp.hpp @@ -57,5 +57,20 @@ std::vector compress(compression_type compression, host_span src, rmm::cuda_stream_view stream); +/** + * @brief Compresses device memory buffers. + * + * @param compression Type of compression of the input data + * @param inputs Device memory buffers to compress + * @param outputs Device memory buffers to store the compressed output + * @param results Compression results + * @param stream CUDA stream used for device memory operations and kernel launches + */ +void compress(compression_type compression, + device_span const> inputs, + device_span const> outputs, + device_span results, + rmm::cuda_stream_view stream); + } // namespace io::detail } // namespace CUDF_EXPORT cudf diff --git a/cpp/src/io/orc/stripe_enc.cu b/cpp/src/io/orc/stripe_enc.cu index 79ecca0ca99..f376ccb5ebb 100644 --- a/cpp/src/io/orc/stripe_enc.cu +++ b/cpp/src/io/orc/stripe_enc.cu @@ -1359,6 +1359,18 @@ void CompactOrcDataStreams(device_2dspan strm_desc, srcs.begin(), dsts.begin(), lengths.begin(), lengths.size(), stream); } +compression_type from_orc_compression(orc::CompressionKind compression) +{ + switch (compression) { + case orc::CompressionKind::NONE: return compression_type::NONE; + case orc::CompressionKind::SNAPPY: return compression_type::SNAPPY; + case orc::CompressionKind::ZLIB: return compression_type::ZLIB; + case orc::CompressionKind::ZSTD: return compression_type::ZSTD; + case orc::CompressionKind::LZ4: return compression_type::LZ4; + default: CUDF_FAIL("Unsupported compression type"); + } +} + std::optional CompressOrcDataStreams( device_span compressed_data, uint32_t num_compressed_blocks, @@ -1387,47 +1399,8 @@ std::optional CompressOrcDataStreams( max_comp_blk_size, comp_block_align); - if (compression == SNAPPY) { - try { - if (nvcomp::is_compression_disabled(nvcomp::compression_type::SNAPPY)) { - cudf::io::detail::gpu_snap(comp_in, comp_out, comp_res, stream); - } else { - nvcomp::batched_compress( - nvcomp::compression_type::SNAPPY, comp_in, comp_out, comp_res, stream); - } - } catch (...) { - // There was an error in compressing so set an error status for each block - thrust::for_each( - rmm::exec_policy(stream), - comp_res.begin(), - comp_res.end(), - [] __device__(compression_result & stat) { stat.status = compression_status::FAILURE; }); - // Since SNAPPY is the default compression (may not be explicitly requested), fall back to - // writing without compression - CUDF_LOG_WARN("ORC writer: compression failed, writing uncompressed data"); - } - } else if (compression == ZLIB) { - if (auto const reason = nvcomp::is_compression_disabled(nvcomp::compression_type::DEFLATE); - reason) { - CUDF_FAIL("Compression error: " + reason.value()); - } - nvcomp::batched_compress( - nvcomp::compression_type::DEFLATE, comp_in, comp_out, comp_res, stream); - } else if (compression == ZSTD) { - if (auto const reason = nvcomp::is_compression_disabled(nvcomp::compression_type::ZSTD); - reason) { - CUDF_FAIL("Compression error: " + reason.value()); - } - nvcomp::batched_compress(nvcomp::compression_type::ZSTD, comp_in, comp_out, comp_res, stream); - } else if (compression == LZ4) { - if (auto const reason = nvcomp::is_compression_disabled(nvcomp::compression_type::LZ4); - reason) { - CUDF_FAIL("Compression error: " + reason.value()); - } - nvcomp::batched_compress(nvcomp::compression_type::LZ4, comp_in, comp_out, comp_res, stream); - } else if (compression != NONE) { - CUDF_FAIL("Unsupported compression type"); - } + cudf::io::detail::compress( + from_orc_compression(compression), comp_in, comp_out, comp_res, stream); dim3 dim_block_compact(1024, 1); gpuCompactCompressedBlocks<<>>( diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index 6b1a20701f9..acfba108f97 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -1455,6 +1455,31 @@ void init_encoder_pages(hostdevice_2dvector& chunks, stream.synchronize(); } +Compression to_parquet_compression(compression_type compression) +{ + switch (compression) { + case compression_type::AUTO: + case compression_type::SNAPPY: return Compression::SNAPPY; + case compression_type::ZSTD: return Compression::ZSTD; + case compression_type::LZ4: + // Parquet refers to LZ4 as "LZ4_RAW"; Parquet's "LZ4" is not standard LZ4 + return Compression::LZ4_RAW; + case compression_type::NONE: return Compression::UNCOMPRESSED; + default: CUDF_FAIL("Unsupported compression type"); + } +} + +compression_type from_parquet_compression(Compression codec) +{ + switch (codec) { + case Compression::SNAPPY: return compression_type::SNAPPY; + case Compression::ZSTD: return compression_type::ZSTD; + case Compression::LZ4_RAW: return compression_type::LZ4; + case Compression::UNCOMPRESSED: return compression_type::NONE; + default: CUDF_FAIL("Unsupported compression type"); + } +} + /** * @brief Encode pages. * @@ -1499,34 +1524,8 @@ void encode_pages(hostdevice_2dvector& chunks, compression_result{0, compression_status::FAILURE}); EncodePages(pages, write_v2_headers, comp_in, comp_out, comp_res, stream); - switch (compression) { - case Compression::SNAPPY: - if (nvcomp::is_compression_disabled(nvcomp::compression_type::SNAPPY)) { - gpu_snap(comp_in, comp_out, comp_res, stream); - } else { - nvcomp::batched_compress( - nvcomp::compression_type::SNAPPY, comp_in, comp_out, comp_res, stream); - } - break; - case Compression::ZSTD: { - if (auto const reason = nvcomp::is_compression_disabled(nvcomp::compression_type::ZSTD); - reason) { - CUDF_FAIL("Compression error: " + reason.value()); - } - nvcomp::batched_compress(nvcomp::compression_type::ZSTD, comp_in, comp_out, comp_res, stream); - break; - } - case Compression::LZ4_RAW: { - if (auto const reason = nvcomp::is_compression_disabled(nvcomp::compression_type::LZ4); - reason) { - CUDF_FAIL("Compression error: " + reason.value()); - } - nvcomp::batched_compress(nvcomp::compression_type::LZ4, comp_in, comp_out, comp_res, stream); - break; - } - case Compression::UNCOMPRESSED: break; - default: CUDF_FAIL("invalid compression type"); - } + cudf::io::detail::compress( + from_parquet_compression(compression), comp_in, comp_out, comp_res, stream); // TBD: Not clear if the official spec actually allows dynamically turning off compression at the // chunk-level diff --git a/cpp/src/io/parquet/writer_impl_helpers.cpp b/cpp/src/io/parquet/writer_impl_helpers.cpp index f15ea1f3c37..4533813230b 100644 --- a/cpp/src/io/parquet/writer_impl_helpers.cpp +++ b/cpp/src/io/parquet/writer_impl_helpers.cpp @@ -32,20 +32,6 @@ namespace cudf::io::parquet::detail { using namespace cudf::io::detail; -Compression to_parquet_compression(compression_type compression) -{ - switch (compression) { - case compression_type::AUTO: - case compression_type::SNAPPY: return Compression::SNAPPY; - case compression_type::ZSTD: return Compression::ZSTD; - case compression_type::LZ4: - // Parquet refers to LZ4 as "LZ4_RAW"; Parquet's "LZ4" is not standard LZ4 - return Compression::LZ4_RAW; - case compression_type::NONE: return Compression::UNCOMPRESSED; - default: CUDF_FAIL("Unsupported compression type"); - } -} - nvcomp::compression_type to_nvcomp_compression_type(Compression codec) { switch (codec) { diff --git a/cpp/src/io/parquet/writer_impl_helpers.hpp b/cpp/src/io/parquet/writer_impl_helpers.hpp index 14a9a0ed5b7..e716882c4f5 100644 --- a/cpp/src/io/parquet/writer_impl_helpers.hpp +++ b/cpp/src/io/parquet/writer_impl_helpers.hpp @@ -28,14 +28,6 @@ namespace cudf::io::parquet::detail { -/** - * @brief Function that translates GDF compression to parquet compression. - * - * @param compression The compression type - * @return The supported Parquet compression - */ -Compression to_parquet_compression(compression_type compression); - /** * @brief Function that translates the given compression codec to nvcomp compression type. * From e119ad8ded39a01fd80157a8cb8632989c7aec33 Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Fri, 20 Dec 2024 15:15:14 -0800 Subject: [PATCH 09/23] single-threaded host comp --- cpp/src/io/comp/comp.cpp | 116 ++++++++++++++++++++++++++++++++------- 1 file changed, 96 insertions(+), 20 deletions(-) diff --git a/cpp/src/io/comp/comp.cpp b/cpp/src/io/comp/comp.cpp index 044b9b2137a..e9f593aabdc 100644 --- a/cpp/src/io/comp/comp.cpp +++ b/cpp/src/io/comp/comp.cpp @@ -17,6 +17,7 @@ #include "comp.hpp" #include "gpuinflate.hpp" +#include "io/utilities/getenv_or.hpp" #include "io/utilities/hostdevice_vector.hpp" #include "nvcomp_adapter.hpp" @@ -27,12 +28,19 @@ #include #include +#include #include // GZIP compression namespace cudf::io::detail { namespace { +[[maybe_unused]] auto& h_comp_pool() +{ + static BS::thread_pool pool(std::thread::hardware_concurrency()); + return pool; +} + /** * @brief GZIP host compressor (includes header) */ @@ -99,27 +107,12 @@ std::vector compress_snappy(host_span src, return cudf::detail::make_std_vector_sync(d_dst, stream); } -} // namespace - -std::vector compress(compression_type compression, - host_span src, - rmm::cuda_stream_view stream) +void device_compress(compression_type compression, + device_span const> inputs, + device_span const> outputs, + device_span results, + rmm::cuda_stream_view stream) { - CUDF_FUNC_RANGE(); - switch (compression) { - case compression_type::GZIP: return compress_gzip(src); - case compression_type::SNAPPY: return compress_snappy(src, stream); - default: CUDF_FAIL("Unsupported compression type"); - } -} - -void compress(compression_type compression, - device_span const> inputs, - device_span const> outputs, - device_span results, - rmm::cuda_stream_view stream) -{ - CUDF_FUNC_RANGE(); switch (compression) { case compression_type::SNAPPY: if (nvcomp::is_compression_disabled(nvcomp::compression_type::SNAPPY)) { @@ -150,4 +143,87 @@ void compress(compression_type compression, } } +void host_compress(compression_type compression, + device_span const> inputs, + device_span const> outputs, + device_span results, + rmm::cuda_stream_view stream) +{ + if (compression == compression_type::NONE) { return; } + auto const num_blocks = inputs.size(); + auto h_results = cudf::detail::make_host_vector(num_blocks, stream); + auto const h_inputs = cudf::detail::make_host_vector_async(inputs, stream); + auto const h_outputs = cudf::detail::make_host_vector_async(outputs, stream); + stream.synchronize(); + for (size_t i = 0; i < num_blocks; ++i) { + auto const h_input = cudf::detail::make_host_vector_sync(h_inputs[i], stream); + auto const h_output = compress(compression, h_input, stream); + cudf::detail::cuda_memcpy(h_outputs[i].subspan(0, h_output.size()), h_output, stream); + h_results[i] = {h_output.size(), compression_status::SUCCESS}; + } + cudf::detail::cuda_memcpy_async(results, h_results, stream); +} + +[[nodiscard]] bool host_compression_supported(compression_type compression) +{ + switch (compression) { + case compression_type::SNAPPY: return true; + case compression_type::GZIP: return true; + case compression_type::NONE: return true; + default: return false; + } +} + +[[nodiscard]] bool device_compression_supported(compression_type compression) +{ + switch (compression) { + case compression_type::SNAPPY: return true; + case compression_type::ZSTD: return true; + case compression_type::LZ4: return true; + case compression_type::NONE: return true; + default: return false; + } +} + +[[nodiscard]] bool use_host_compression( + compression_type compression, + [[maybe_unused]] device_span const> inputs, + [[maybe_unused]] device_span const> outputs) +{ + CUDF_EXPECTS( + not host_compression_supported(compression) or device_compression_supported(compression), + "Unsupported compression type"); + if (not host_compression_supported(compression)) { return false; } + if (not device_compression_supported(compression)) { return true; } + return getenv_or("LIBCUDF_USE_HOST_COMPRESSION", false); +} + +} // namespace + +std::vector compress(compression_type compression, + host_span src, + rmm::cuda_stream_view stream) +{ + CUDF_FUNC_RANGE(); + switch (compression) { + case compression_type::GZIP: return compress_gzip(src); + case compression_type::SNAPPY: return compress_snappy(src, stream); + default: std::cout << (int)compression << std::endl; CUDF_FAIL("Unsupported compression type"); + } +} + +void compress(compression_type compression, + device_span const> inputs, + device_span const> outputs, + device_span results, + rmm::cuda_stream_view stream) +{ + CUDF_FUNC_RANGE(); + if (not use_host_compression(compression, inputs, outputs)) { + device_compress(compression, inputs, outputs, results, stream); + } else { + host_compress(compression, inputs, outputs, results, stream); + } +} + } // namespace cudf::io::detail From 3ab8c41fd65d9cdb0facc8e84a9c70abb67f59c6 Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Fri, 20 Dec 2024 16:58:05 -0800 Subject: [PATCH 10/23] now with more threads --- cpp/src/io/comp/comp.cpp | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/cpp/src/io/comp/comp.cpp b/cpp/src/io/comp/comp.cpp index e9f593aabdc..ba6b9cd03e0 100644 --- a/cpp/src/io/comp/comp.cpp +++ b/cpp/src/io/comp/comp.cpp @@ -23,6 +23,7 @@ #include #include +#include #include #include #include @@ -149,17 +150,29 @@ void host_compress(compression_type compression, device_span results, rmm::cuda_stream_view stream) { + CUDF_FUNC_RANGE(); if (compression == compression_type::NONE) { return; } auto const num_blocks = inputs.size(); auto h_results = cudf::detail::make_host_vector(num_blocks, stream); auto const h_inputs = cudf::detail::make_host_vector_async(inputs, stream); auto const h_outputs = cudf::detail::make_host_vector_async(outputs, stream); stream.synchronize(); + + std::vector> tasks; + auto streams = cudf::detail::fork_streams(stream, h_comp_pool().get_thread_count()); for (size_t i = 0; i < num_blocks; ++i) { - auto const h_input = cudf::detail::make_host_vector_sync(h_inputs[i], stream); - auto const h_output = compress(compression, h_input, stream); - cudf::detail::cuda_memcpy(h_outputs[i].subspan(0, h_output.size()), h_output, stream); - h_results[i] = {h_output.size(), compression_status::SUCCESS}; + auto cur_stream = streams[i % streams.size()]; + auto task = [d_in = h_inputs[i], d_out = h_outputs[i], cur_stream, compression]() -> size_t { + auto const h_in = cudf::detail::make_host_vector_sync(d_in, cur_stream); + auto const h_out = compress(compression, h_in, cur_stream); + cudf::detail::cuda_memcpy(d_out.subspan(0, h_out.size()), h_out, cur_stream); + return h_out.size(); + }; + tasks.emplace_back(h_comp_pool().submit_task(std::move(task))); + } + + for (auto i = 0ul; i < num_blocks; ++i) { + h_results[i] = {tasks[i].get(), compression_status::SUCCESS}; } cudf::detail::cuda_memcpy_async(results, h_results, stream); } @@ -195,7 +208,7 @@ void host_compress(compression_type compression, "Unsupported compression type"); if (not host_compression_supported(compression)) { return false; } if (not device_compression_supported(compression)) { return true; } - return getenv_or("LIBCUDF_USE_HOST_COMPRESSION", false); + return getenv_or("LIBCUDF_USE_HOST_COMPRESSION", 0); } } // namespace From a14b3511a51dffa930c5215a07cfe8fe9ae53b5f Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Sat, 21 Dec 2024 00:03:30 -0800 Subject: [PATCH 11/23] decouple orc writer from nvcomp --- cpp/src/io/comp/comp.cpp | 43 +++++++++- cpp/src/io/comp/comp.hpp | 24 ++++++ cpp/src/io/orc/orc_gpu.hpp | 2 +- cpp/src/io/orc/stripe_enc.cu | 8 +- cpp/src/io/orc/writer_impl.cu | 142 +++++++++++---------------------- cpp/src/io/orc/writer_impl.hpp | 2 +- 6 files changed, 115 insertions(+), 106 deletions(-) diff --git a/cpp/src/io/comp/comp.cpp b/cpp/src/io/comp/comp.cpp index ba6b9cd03e0..270f310f245 100644 --- a/cpp/src/io/comp/comp.cpp +++ b/cpp/src/io/comp/comp.cpp @@ -36,12 +36,22 @@ namespace cudf::io::detail { namespace { -[[maybe_unused]] auto& h_comp_pool() +auto& h_comp_pool() { static BS::thread_pool pool(std::thread::hardware_concurrency()); return pool; } +std::optional to_nvcomp_compression(compression_type compression) +{ + switch (compression) { + case compression_type::SNAPPY: return nvcomp::compression_type::SNAPPY; + case compression_type::ZSTD: return nvcomp::compression_type::ZSTD; + case compression_type::LZ4: return nvcomp::compression_type::LZ4; + default: return std::nullopt; + } +} + /** * @brief GZIP host compressor (includes header) */ @@ -180,7 +190,6 @@ void host_compress(compression_type compression, [[nodiscard]] bool host_compression_supported(compression_type compression) { switch (compression) { - case compression_type::SNAPPY: return true; case compression_type::GZIP: return true; case compression_type::NONE: return true; default: return false; @@ -213,6 +222,36 @@ void host_compress(compression_type compression, } // namespace +std::optional compress_max_allowed_block_size(compression_type compression) +{ + if (auto nvcomp_type = to_nvcomp_compression(compression); + nvcomp_type.has_value() and not nvcomp::is_compression_disabled(*nvcomp_type)) { + return nvcomp::compress_max_allowed_chunk_size(*nvcomp_type); + } + return std::nullopt; +} + +[[nodiscard]] size_t compress_required_block_alignment(compression_type compression) +{ + auto nvcomp_type = to_nvcomp_compression(compression); + if (compression == compression_type::NONE or not nvcomp_type.has_value() or + nvcomp::is_compression_disabled(*nvcomp_type)) { + return 1ul; + } + + return nvcomp::required_alignment(*nvcomp_type); +} + +[[nodiscard]] size_t max_compressed_size(compression_type compression, uint32_t uncompressed_size) +{ + if (compression == compression_type::NONE) { return uncompressed_size; } + + if (auto nvcomp_type = to_nvcomp_compression(compression); nvcomp_type.has_value()) { + return nvcomp::compress_max_output_chunk_size(*nvcomp_type, uncompressed_size); + } + CUDF_FAIL("Unsupported compression type"); +} + std::vector compress(compression_type compression, host_span src, rmm::cuda_stream_view stream) diff --git a/cpp/src/io/comp/comp.hpp b/cpp/src/io/comp/comp.hpp index abe90473fdd..a67d604cf0f 100644 --- a/cpp/src/io/comp/comp.hpp +++ b/cpp/src/io/comp/comp.hpp @@ -57,6 +57,30 @@ std::vector compress(compression_type compression, host_span src, rmm::cuda_stream_view stream); +/** + * @brief Maximum size of uncompressed blocks that can be compressed. + * + * @param compression Compression type + * @returns maximum block size + */ +[[nodiscard]] std::optional compress_max_allowed_block_size(compression_type compression); + +/** + * @brief Gets input and output alignment requirements for the given compression type. + * + * @param compression Compression type + * @returns required alignment + */ +[[nodiscard]] size_t compress_required_block_alignment(compression_type compression); + +/** + * @brief Gets the maximum size any chunk could compress to in the batch. + * + * @param compression Compression type + * @param uncompressed_size Size of the largest uncompressed chunk in the batch + */ +[[nodiscard]] size_t max_compressed_size(compression_type compression, uint32_t uncompressed_size); + /** * @brief Compresses device memory buffers. * diff --git a/cpp/src/io/orc/orc_gpu.hpp b/cpp/src/io/orc/orc_gpu.hpp index f4e75f78dec..d08071b899f 100644 --- a/cpp/src/io/orc/orc_gpu.hpp +++ b/cpp/src/io/orc/orc_gpu.hpp @@ -407,7 +407,7 @@ void CompactOrcDataStreams(device_2dspan strm_desc, std::optional CompressOrcDataStreams( device_span compressed_data, uint32_t num_compressed_blocks, - CompressionKind compression, + compression_type compression, uint32_t comp_blk_size, uint32_t max_comp_blk_size, uint32_t comp_block_align, diff --git a/cpp/src/io/orc/stripe_enc.cu b/cpp/src/io/orc/stripe_enc.cu index f376ccb5ebb..a4f5279023d 100644 --- a/cpp/src/io/orc/stripe_enc.cu +++ b/cpp/src/io/orc/stripe_enc.cu @@ -15,7 +15,6 @@ */ #include "io/comp/gpuinflate.hpp" -#include "io/comp/nvcomp_adapter.hpp" #include "io/utilities/block_utils.cuh" #include "io/utilities/time_utils.cuh" #include "orc_gpu.hpp" @@ -45,8 +44,6 @@ namespace io { namespace orc { namespace gpu { -namespace nvcomp = cudf::io::detail::nvcomp; - using cudf::detail::device_2dspan; using cudf::io::detail::compression_result; using cudf::io::detail::compression_status; @@ -1374,7 +1371,7 @@ compression_type from_orc_compression(orc::CompressionKind compression) std::optional CompressOrcDataStreams( device_span compressed_data, uint32_t num_compressed_blocks, - CompressionKind compression, + compression_type compression, uint32_t comp_blk_size, uint32_t max_comp_blk_size, uint32_t comp_block_align, @@ -1399,8 +1396,7 @@ std::optional CompressOrcDataStreams( max_comp_blk_size, comp_block_align); - cudf::io::detail::compress( - from_orc_compression(compression), comp_in, comp_out, comp_res, stream); + cudf::io::detail::compress(compression, comp_in, comp_out, comp_res, stream); dim3 dim_block_compact(1024, 1); gpuCompactCompressedBlocks<<>>( diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index ce868b83c04..8be10e444f5 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -19,7 +19,6 @@ * @brief cuDF-IO ORC writer class implementation */ -#include "io/comp/nvcomp_adapter.hpp" #include "io/orc/orc_gpu.hpp" #include "io/statistics/column_statistics.cuh" #include "io/utilities/column_utils.cuh" @@ -71,8 +70,6 @@ namespace cudf::io::orc::detail { -namespace nvcomp = cudf::io::detail::nvcomp; - template [[nodiscard]] constexpr int varint_size(T val) { @@ -92,21 +89,8 @@ struct row_group_index_info { }; namespace { - /** - * @brief Translates ORC compression to nvCOMP compression - */ -auto to_nvcomp_compression_type(CompressionKind compression_kind) -{ - if (compression_kind == SNAPPY) return nvcomp::compression_type::SNAPPY; - if (compression_kind == ZLIB) return nvcomp::compression_type::DEFLATE; - if (compression_kind == ZSTD) return nvcomp::compression_type::ZSTD; - if (compression_kind == LZ4) return nvcomp::compression_type::LZ4; - CUDF_FAIL("Unsupported compression type"); -} - -/** - * @brief Translates cuDF compression to ORC compression + * @brief Translates cuDF compression to ORC compression. */ orc::CompressionKind to_orc_compression(compression_type compression) { @@ -122,19 +106,14 @@ orc::CompressionKind to_orc_compression(compression_type compression) } /** - * @brief Returns the block size for a given compression kind. + * @brief Returns the block size for a given compression format. */ -constexpr size_t compression_block_size(orc::CompressionKind compression) +size_t compression_block_size(compression_type compression) { - if (compression == orc::CompressionKind::NONE) { return 0; } - - auto const ncomp_type = to_nvcomp_compression_type(compression); - auto const nvcomp_limit = nvcomp::is_compression_disabled(ncomp_type) - ? std::nullopt - : nvcomp::compress_max_allowed_chunk_size(ncomp_type); + auto const comp_limit = compress_max_allowed_block_size(compression); constexpr size_t max_block_size = 256 * 1024; - return std::min(nvcomp_limit.value_or(max_block_size), max_block_size); + return std::min(comp_limit.value_or(max_block_size), max_block_size); } /** @@ -534,26 +513,6 @@ size_t RLE_stream_size(TypeKind kind, size_t count) } } -auto uncomp_block_alignment(CompressionKind compression_kind) -{ - if (compression_kind == NONE or - nvcomp::is_compression_disabled(to_nvcomp_compression_type(compression_kind))) { - return 1ul; - } - - return nvcomp::required_alignment(to_nvcomp_compression_type(compression_kind)); -} - -auto comp_block_alignment(CompressionKind compression_kind) -{ - if (compression_kind == NONE or - nvcomp::is_compression_disabled(to_nvcomp_compression_type(compression_kind))) { - return 1ul; - } - - return nvcomp::required_alignment(to_nvcomp_compression_type(compression_kind)); -} - /** * @brief Builds up per-column streams. * @@ -566,7 +525,7 @@ orc_streams create_streams(host_span columns, file_segmentation const& segmentation, std::map const& decimal_column_sizes, bool enable_dictionary, - CompressionKind compression_kind, + compression_type compression, single_write_mode write_mode) { // 'column 0' row index stream @@ -610,7 +569,7 @@ orc_streams create_streams(host_span columns, auto add_stream = [&](gpu::StreamIndexType index_type, StreamKind kind, TypeKind type_kind, size_t size) { - auto const max_alignment_padding = uncomp_block_alignment(compression_kind) - 1; + auto const max_alignment_padding = compress_required_block_alignment(compression) - 1; const auto base = column.index() * gpu::CI_NUM_STREAMS; ids[base + index_type] = streams.size(); streams.push_back(orc::Stream{ @@ -1473,7 +1432,7 @@ encoded_footer_statistics finish_statistic_blobs(Footer const& footer, * @param[in] rg_stats row group level statistics * @param[in,out] stripe Stream's parent stripe * @param[in,out] streams List of all streams - * @param[in] compression_kind The compression kind + * @param[in] compression The compression format * @param[in] compression_blocksize The block size used for compression * @param[in] out_sink Sink for writing data */ @@ -1487,7 +1446,7 @@ void write_index_stream(int32_t stripe_id, host_span rg_stats, StripeInformation* stripe, orc_streams* streams, - CompressionKind compression_kind, + compression_type compression, size_t compression_blocksize, std::unique_ptr const& out_sink) { @@ -1501,7 +1460,7 @@ void write_index_stream(int32_t stripe_id, row_group_index_info record; if (stream.ids[type] > 0) { record.pos = 0; - if (compression_kind != NONE) { + if (compression != compression_type::NONE) { auto const& ss = strm_desc[stripe_id][stream.ids[type] - (columns.size() + 1)]; record.blk_pos = ss.first_block; record.comp_pos = 0; @@ -1541,7 +1500,7 @@ void write_index_stream(int32_t stripe_id, } } - ProtobufWriter pbw((compression_kind != NONE) ? 3 : 0); + ProtobufWriter pbw((compression != compression_type::NONE) ? 3 : 0); // Add row index entries auto const& rowgroups_range = segmentation.stripes[stripe_id]; @@ -1566,7 +1525,7 @@ void write_index_stream(int32_t stripe_id, }); (*streams)[stream_id].length = pbw.size(); - if (compression_kind != NONE) { + if (compression != compression_type::NONE) { uint32_t uncomp_ix_len = (uint32_t)((*streams)[stream_id].length - 3) * 2 + 1; pbw.buffer()[0] = static_cast(uncomp_ix_len >> 0); pbw.buffer()[1] = static_cast(uncomp_ix_len >> 8); @@ -1585,7 +1544,7 @@ void write_index_stream(int32_t stripe_id, * @param[in,out] bounce_buffer Pinned memory bounce buffer for D2H data transfer * @param[in,out] stripe Stream's parent stripe * @param[in,out] streams List of all streams - * @param[in] compression_kind The compression kind + * @param[in] compression The compression format * @param[in] out_sink Sink for writing data * @param[in] stream CUDA stream used for device memory operations and kernel launches * @return An std::future that should be synchronized to ensure the writing is complete @@ -1596,7 +1555,7 @@ std::future write_data_stream(gpu::StripeStream const& strm_desc, host_span bounce_buffer, StripeInformation* stripe, orc_streams* streams, - CompressionKind compression_kind, + compression_type compression, std::unique_ptr const& out_sink, rmm::cuda_stream_view stream) { @@ -1606,8 +1565,9 @@ std::future write_data_stream(gpu::StripeStream const& strm_desc, return std::async(std::launch::deferred, [] {}); } - auto const* stream_in = (compression_kind == NONE) ? enc_stream.data_ptrs[strm_desc.stream_type] - : (compressed_data + strm_desc.bfr_offset); + auto const* stream_in = (compression == compression_type::NONE) + ? enc_stream.data_ptrs[strm_desc.stream_type] + : (compressed_data + strm_desc.bfr_offset); auto write_task = [&]() { if (out_sink->is_device_write_preferred(length)) { @@ -1627,15 +1587,15 @@ std::future write_data_stream(gpu::StripeStream const& strm_desc, /** * @brief Insert 3-byte uncompressed block headers in a byte vector * - * @param compression_kind The compression kind + * @param compression The compression kind * @param compression_blocksize The block size used for compression * @param v The destitation byte vector to write, which must include initial 3-byte header */ -void add_uncompressed_block_headers(CompressionKind compression_kind, +void add_uncompressed_block_headers(compression_type compression, size_t compression_blocksize, std::vector& v) { - if (compression_kind != NONE) { + if (compression != compression_type::NONE) { size_t uncomp_len = v.size() - 3, pos = 0, block_len; while (uncomp_len > compression_blocksize) { block_len = compression_blocksize * 2 + 1; @@ -2021,14 +1981,6 @@ std::map decimal_column_sizes( return column_sizes; } -size_t max_compression_output_size(CompressionKind compression_kind, uint32_t compression_blocksize) -{ - if (compression_kind == NONE) return 0; - - return nvcomp::compress_max_output_chunk_size(to_nvcomp_compression_type(compression_kind), - compression_blocksize); -} - std::unique_ptr make_table_meta(table_view const& input) { auto table_meta = std::make_unique(input); @@ -2287,7 +2239,7 @@ stripe_dictionaries build_dictionaries(orc_table_view& orc_table, * @param row_index_stride The row index stride * @param enable_dictionary Whether dictionary is enabled * @param sort_dictionaries Whether to sort the dictionaries - * @param compression_kind The compression kind + * @param compression The compression format * @param compression_blocksize The block size used for compression * @param stats_freq Column statistics granularity type for parquet/orc writers * @param collect_compression_stats Flag to indicate if compression statistics should be collected @@ -2302,7 +2254,7 @@ auto convert_table_to_orc_data(table_view const& input, size_type row_index_stride, bool enable_dictionary, bool sort_dictionaries, - CompressionKind compression_kind, + compression_type compression, size_t compression_blocksize, statistics_freq stats_freq, bool collect_compression_stats, @@ -2329,17 +2281,16 @@ auto convert_table_to_orc_data(table_view const& input, auto stripe_dicts = build_dictionaries(orc_table, segmentation, sort_dictionaries, stream); auto dec_chunk_sizes = decimal_chunk_sizes(orc_table, segmentation, stream); - auto const uncompressed_block_align = uncomp_block_alignment(compression_kind); - auto const compressed_block_align = comp_block_alignment(compression_kind); + auto const block_align = compress_required_block_alignment(compression); auto streams = create_streams(orc_table.columns, segmentation, decimal_column_sizes(dec_chunk_sizes.rg_sizes), enable_dictionary, - compression_kind, + compression, write_mode); auto enc_data = encode_columns( - orc_table, std::move(dec_chunk_sizes), segmentation, streams, uncompressed_block_align, stream); + orc_table, std::move(dec_chunk_sizes), segmentation, streams, block_align, stream); stripe_dicts.on_encode_complete(stream); @@ -2371,16 +2322,15 @@ auto convert_table_to_orc_data(table_view const& input, size_t compressed_bfr_size = 0; size_t num_compressed_blocks = 0; - auto const max_compressed_block_size = - max_compression_output_size(compression_kind, compression_blocksize); + auto const max_compressed_block_size = max_compressed_size(compression, compression_blocksize); auto const padded_max_compressed_block_size = - util::round_up_unsafe(max_compressed_block_size, compressed_block_align); + util::round_up_unsafe(max_compressed_block_size, block_align); auto const padded_block_header_size = - util::round_up_unsafe(block_header_size, compressed_block_align); + util::round_up_unsafe(block_header_size, block_align); for (auto& ss : strm_descs.host_view().flat_view()) { size_t stream_size = ss.stream_size; - if (compression_kind != NONE) { + if (compression != compression_type::NONE) { ss.first_block = num_compressed_blocks; ss.bfr_offset = compressed_bfr_size; @@ -2401,14 +2351,14 @@ auto convert_table_to_orc_data(table_view const& input, comp_results.d_begin(), comp_results.d_end(), compression_result{0, compression_status::FAILURE}); - if (compression_kind != NONE) { + if (compression != compression_type::NONE) { strm_descs.host_to_device_async(stream); compression_stats = gpu::CompressOrcDataStreams(compressed_data, num_compressed_blocks, - compression_kind, + compression, compression_blocksize, max_compressed_block_size, - compressed_block_align, + block_align, collect_compression_stats, strm_descs, enc_data.streams, @@ -2459,8 +2409,8 @@ writer::impl::impl(std::unique_ptr sink, : _stream(stream), _max_stripe_size{options.get_stripe_size_bytes(), options.get_stripe_size_rows()}, _row_index_stride{options.get_row_index_stride()}, - _compression_kind(to_orc_compression(options.get_compression())), - _compression_blocksize(compression_block_size(_compression_kind)), + _compression{options.get_compression()}, + _compression_blocksize(compression_block_size(_compression)), _compression_statistics(options.get_compression_statistics()), _stats_freq(options.get_statistics_freq()), _sort_dictionaries{options.get_enable_dictionary_sort()}, @@ -2480,8 +2430,8 @@ writer::impl::impl(std::unique_ptr sink, : _stream(stream), _max_stripe_size{options.get_stripe_size_bytes(), options.get_stripe_size_rows()}, _row_index_stride{options.get_row_index_stride()}, - _compression_kind(to_orc_compression(options.get_compression())), - _compression_blocksize(compression_block_size(_compression_kind)), + _compression{options.get_compression()}, + _compression_blocksize(compression_block_size(_compression)), _compression_statistics(options.get_compression_statistics()), _stats_freq(options.get_statistics_freq()), _sort_dictionaries{options.get_enable_dictionary_sort()}, @@ -2526,7 +2476,7 @@ void writer::impl::write(table_view const& input) _row_index_stride, _enable_dictionary, _sort_dictionaries, - _compression_kind, + _compression, _compression_blocksize, _stats_freq, _compression_statistics != nullptr, @@ -2613,7 +2563,7 @@ void writer::impl::write_orc_data_to_sink(encoded_data const& enc_data, rg_stats, &stripe, &streams, - _compression_kind, + _compression, _compression_blocksize, _out_sink); } @@ -2627,7 +2577,7 @@ void writer::impl::write_orc_data_to_sink(encoded_data const& enc_data, bounce_buffer, &stripe, &streams, - _compression_kind, + _compression, _out_sink, _stream)); } @@ -2645,10 +2595,10 @@ void writer::impl::write_orc_data_to_sink(encoded_data const& enc_data, : 0; if (orc_table.column(i - 1).orc_kind() == TIMESTAMP) { sf.writerTimezone = "UTC"; } } - ProtobufWriter pbw((_compression_kind != NONE) ? 3 : 0); + ProtobufWriter pbw((_compression != compression_type::NONE) ? 3 : 0); pbw.write(sf); stripe.footerLength = pbw.size(); - if (_compression_kind != NONE) { + if (_compression != compression_type::NONE) { uint32_t uncomp_sf_len = (stripe.footerLength - 3) * 2 + 1; pbw.buffer()[0] = static_cast(uncomp_sf_len >> 0); pbw.buffer()[1] = static_cast(uncomp_sf_len >> 8); @@ -2780,21 +2730,21 @@ void writer::impl::close() // Write statistics metadata if (not _orc_meta.stripeStats.empty()) { - ProtobufWriter pbw((_compression_kind != NONE) ? 3 : 0); + ProtobufWriter pbw((_compression != compression_type::NONE) ? 3 : 0); pbw.write(_orc_meta); - add_uncompressed_block_headers(_compression_kind, _compression_blocksize, pbw.buffer()); + add_uncompressed_block_headers(_compression, _compression_blocksize, pbw.buffer()); ps.metadataLength = pbw.size(); _out_sink->host_write(pbw.data(), pbw.size()); } else { ps.metadataLength = 0; } - ProtobufWriter pbw((_compression_kind != NONE) ? 3 : 0); + ProtobufWriter pbw((_compression != compression_type::NONE) ? 3 : 0); pbw.write(_footer); - add_uncompressed_block_headers(_compression_kind, _compression_blocksize, pbw.buffer()); + add_uncompressed_block_headers(_compression, _compression_blocksize, pbw.buffer()); // Write postscript metadata ps.footerLength = pbw.size(); - ps.compression = _compression_kind; + ps.compression = to_orc_compression(_compression); ps.compressionBlockSize = _compression_blocksize; ps.version = {0, 12}; // Hive 0.12 ps.writerVersion = cudf_writer_version; diff --git a/cpp/src/io/orc/writer_impl.hpp b/cpp/src/io/orc/writer_impl.hpp index cae849ee315..8d371daba1d 100644 --- a/cpp/src/io/orc/writer_impl.hpp +++ b/cpp/src/io/orc/writer_impl.hpp @@ -342,7 +342,7 @@ class writer::impl { // Writer options. stripe_size_limits const _max_stripe_size; size_type const _row_index_stride; - CompressionKind const _compression_kind; + compression_type const _compression; size_t const _compression_blocksize; std::shared_ptr _compression_statistics; // Optional output statistics_freq const _stats_freq; From 0b6b72de41c35467f95787efbddfdf922e7bfc58 Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Mon, 23 Dec 2024 13:28:34 -0800 Subject: [PATCH 12/23] decouple pq writer from nvcomp --- cpp/src/io/comp/comp.hpp | 13 +++ cpp/src/io/comp/gpuinflate.hpp | 13 --- cpp/src/io/comp/statistics.cu | 2 +- cpp/src/io/parquet/writer_impl.cu | 98 +++++++++------------- cpp/src/io/parquet/writer_impl.hpp | 2 +- cpp/src/io/parquet/writer_impl_helpers.cpp | 30 ------- cpp/src/io/parquet/writer_impl_helpers.hpp | 28 ------- 7 files changed, 55 insertions(+), 131 deletions(-) diff --git a/cpp/src/io/comp/comp.hpp b/cpp/src/io/comp/comp.hpp index a67d604cf0f..3f1084873d7 100644 --- a/cpp/src/io/comp/comp.hpp +++ b/cpp/src/io/comp/comp.hpp @@ -96,5 +96,18 @@ void compress(compression_type compression, device_span results, rmm::cuda_stream_view stream); +/** + * @brief Aggregate results of compression into a single statistics object. + * + * @param inputs List of uncompressed input buffers + * @param results List of compression results + * @param stream CUDA stream to use + * @return writer_compression_statistics + */ +[[nodiscard]] writer_compression_statistics collect_compression_statistics( + device_span const> inputs, + device_span results, + rmm::cuda_stream_view stream); + } // namespace io::detail } // namespace CUDF_EXPORT cudf diff --git a/cpp/src/io/comp/gpuinflate.hpp b/cpp/src/io/comp/gpuinflate.hpp index 4b09bd5a84c..58024ea3004 100644 --- a/cpp/src/io/comp/gpuinflate.hpp +++ b/cpp/src/io/comp/gpuinflate.hpp @@ -124,17 +124,4 @@ void gpu_snap(device_span const> inputs, device_span results, rmm::cuda_stream_view stream); -/** - * @brief Aggregate results of compression into a single statistics object. - * - * @param inputs List of uncompressed input buffers - * @param results List of compression results - * @param stream CUDA stream to use - * @return writer_compression_statistics - */ -[[nodiscard]] writer_compression_statistics collect_compression_statistics( - device_span const> inputs, - device_span results, - rmm::cuda_stream_view stream); - } // namespace cudf::io::detail diff --git a/cpp/src/io/comp/statistics.cu b/cpp/src/io/comp/statistics.cu index caee9145d2c..4c745935a0f 100644 --- a/cpp/src/io/comp/statistics.cu +++ b/cpp/src/io/comp/statistics.cu @@ -14,7 +14,7 @@ * limitations under the License. */ -#include "gpuinflate.hpp" +#include "comp.hpp" #include diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index acfba108f97..c0178688120 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -23,8 +23,7 @@ #include "compact_protocol_reader.hpp" #include "compact_protocol_writer.hpp" #include "interop/decimal_conversion_utilities.cuh" -#include "io/comp/gpuinflate.hpp" -#include "io/comp/nvcomp_adapter.hpp" +#include "io/comp/comp.hpp" #include "io/parquet/parquet.hpp" #include "io/parquet/parquet_gpu.hpp" #include "io/statistics/column_statistics.cuh" @@ -67,6 +66,20 @@ namespace cudf::io::parquet::detail { using namespace cudf::io::detail; +Compression to_parquet_compression(compression_type compression) +{ + switch (compression) { + case compression_type::AUTO: + case compression_type::SNAPPY: return Compression::SNAPPY; + case compression_type::ZSTD: return Compression::ZSTD; + case compression_type::LZ4: + // Parquet refers to LZ4 as "LZ4_RAW"; Parquet's "LZ4" is not standard LZ4 + return Compression::LZ4_RAW; + case compression_type::NONE: return Compression::UNCOMPRESSED; + default: CUDF_FAIL("Unsupported compression type"); + } +} + struct aggregate_writer_metadata { aggregate_writer_metadata(host_span partitions, host_span const> kv_md, @@ -1172,7 +1185,7 @@ auto init_page_sizes(hostdevice_2dvector& chunks, size_t max_page_size_bytes, size_type max_page_size_rows, bool write_v2_headers, - Compression compression_codec, + compression_type compression, rmm::cuda_stream_view stream) { if (chunks.is_empty()) { return cudf::detail::hostdevice_vector{}; } @@ -1187,7 +1200,7 @@ auto init_page_sizes(hostdevice_2dvector& chunks, num_columns, max_page_size_bytes, max_page_size_rows, - page_alignment(compression_codec), + compress_required_block_alignment(compression), write_v2_headers, nullptr, nullptr, @@ -1212,7 +1225,7 @@ auto init_page_sizes(hostdevice_2dvector& chunks, num_columns, max_page_size_bytes, max_page_size_rows, - page_alignment(compression_codec), + compress_required_block_alignment(compression), write_v2_headers, nullptr, nullptr, @@ -1221,12 +1234,10 @@ auto init_page_sizes(hostdevice_2dvector& chunks, // Get per-page max compressed size cudf::detail::hostdevice_vector comp_page_sizes(num_pages, stream); - std::transform(page_sizes.begin(), - page_sizes.end(), - comp_page_sizes.begin(), - [compression_codec](auto page_size) { - return max_compression_output_size(compression_codec, page_size); - }); + std::transform( + page_sizes.begin(), page_sizes.end(), comp_page_sizes.begin(), [compression](auto page_size) { + return max_compressed_size(compression, page_size); + }); comp_page_sizes.host_to_device_async(stream); // Use per-page max compressed size to calculate chunk.compressed_size @@ -1238,7 +1249,7 @@ auto init_page_sizes(hostdevice_2dvector& chunks, num_columns, max_page_size_bytes, max_page_size_rows, - page_alignment(compression_codec), + compress_required_block_alignment(compression), write_v2_headers, nullptr, nullptr, @@ -1247,16 +1258,13 @@ auto init_page_sizes(hostdevice_2dvector& chunks, return comp_page_sizes; } -size_t max_page_bytes(Compression compression, size_t max_page_size_bytes) +size_t max_page_bytes(compression_type compression, size_t max_page_size_bytes) { - if (compression == Compression::UNCOMPRESSED) { return max_page_size_bytes; } + if (compression == compression_type::NONE) { return max_page_size_bytes; } - auto const ncomp_type = to_nvcomp_compression_type(compression); - auto const nvcomp_limit = nvcomp::is_compression_disabled(ncomp_type) - ? std::nullopt - : nvcomp::compress_max_allowed_chunk_size(ncomp_type); + auto const comp_limit = compress_max_allowed_block_size(compression); - auto max_size = std::min(nvcomp_limit.value_or(max_page_size_bytes), max_page_size_bytes); + auto max_size = std::min(comp_limit.value_or(max_page_size_bytes), max_page_size_bytes); // page size must fit in a 32-bit signed integer return std::min(max_size, std::numeric_limits::max()); } @@ -1265,7 +1273,7 @@ std::pair>, std::vector& chunks, host_span col_desc, device_2dspan frags, - Compression compression, + compression_type compression, dictionary_policy dict_policy, size_t max_dict_size, rmm::cuda_stream_view stream) @@ -1404,7 +1412,7 @@ build_chunk_dictionaries(hostdevice_2dvector& chunks, * @param num_columns Total number of columns * @param num_pages Total number of pages * @param num_stats_bfr Number of statistics buffers - * @param compression Compression format + * @param alignment Page alignment * @param max_page_size_bytes Maximum uncompressed page size, in bytes * @param max_page_size_rows Maximum page size, in rows * @param write_v2_headers True if version 2 page headers are to be written @@ -1419,7 +1427,7 @@ void init_encoder_pages(hostdevice_2dvector& chunks, uint32_t num_columns, uint32_t num_pages, uint32_t num_stats_bfr, - Compression compression, + size_t alignment, size_t max_page_size_bytes, size_type max_page_size_rows, bool write_v2_headers, @@ -1435,7 +1443,7 @@ void init_encoder_pages(hostdevice_2dvector& chunks, num_columns, max_page_size_bytes, max_page_size_rows, - page_alignment(compression), + alignment, write_v2_headers, (num_stats_bfr) ? page_stats_mrg.data() : nullptr, (num_stats_bfr > num_pages) ? page_stats_mrg.data() + num_pages : nullptr, @@ -1455,31 +1463,6 @@ void init_encoder_pages(hostdevice_2dvector& chunks, stream.synchronize(); } -Compression to_parquet_compression(compression_type compression) -{ - switch (compression) { - case compression_type::AUTO: - case compression_type::SNAPPY: return Compression::SNAPPY; - case compression_type::ZSTD: return Compression::ZSTD; - case compression_type::LZ4: - // Parquet refers to LZ4 as "LZ4_RAW"; Parquet's "LZ4" is not standard LZ4 - return Compression::LZ4_RAW; - case compression_type::NONE: return Compression::UNCOMPRESSED; - default: CUDF_FAIL("Unsupported compression type"); - } -} - -compression_type from_parquet_compression(Compression codec) -{ - switch (codec) { - case Compression::SNAPPY: return compression_type::SNAPPY; - case Compression::ZSTD: return compression_type::ZSTD; - case Compression::LZ4_RAW: return compression_type::LZ4; - case Compression::UNCOMPRESSED: return compression_type::NONE; - default: CUDF_FAIL("Unsupported compression type"); - } -} - /** * @brief Encode pages. * @@ -1503,7 +1486,7 @@ void encode_pages(hostdevice_2dvector& chunks, statistics_chunk const* chunk_stats, statistics_chunk const* column_stats, std::optional& comp_stats, - Compression compression, + compression_type compression, int32_t column_index_truncate_length, bool write_v2_headers, rmm::cuda_stream_view stream) @@ -1513,7 +1496,7 @@ void encode_pages(hostdevice_2dvector& chunks, ? device_span(page_stats, num_pages) : device_span(); - uint32_t max_comp_pages = (compression != Compression::UNCOMPRESSED) ? num_pages : 0; + uint32_t max_comp_pages = (compression != compression_type::NONE) ? num_pages : 0; rmm::device_uvector> comp_in(max_comp_pages, stream); rmm::device_uvector> comp_out(max_comp_pages, stream); @@ -1524,8 +1507,7 @@ void encode_pages(hostdevice_2dvector& chunks, compression_result{0, compression_status::FAILURE}); EncodePages(pages, write_v2_headers, comp_in, comp_out, comp_res, stream); - cudf::io::detail::compress( - from_parquet_compression(compression), comp_in, comp_out, comp_res, stream); + compress(compression, comp_in, comp_out, comp_res, stream); // TBD: Not clear if the official spec actually allows dynamically turning off compression at the // chunk-level @@ -1743,7 +1725,7 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta, size_type max_page_size_rows, int32_t column_index_truncate_length, statistics_freq stats_granularity, - Compression compression, + compression_type compression, bool collect_compression_statistics, dictionary_policy dict_policy, size_t max_dictionary_size, @@ -2145,7 +2127,7 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta, } // Clear compressed buffer size if compression has been turned off - if (compression == Compression::UNCOMPRESSED) { max_comp_bfr_size = 0; } + if (compression == compression_type::NONE) { max_comp_bfr_size = 0; } // Initialize data pointers uint32_t const num_stats_bfr = @@ -2213,7 +2195,7 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta, num_columns, num_pages, num_stats_bfr, - compression, + compress_required_block_alignment(compression), max_page_size_bytes, max_page_size_rows, write_v2_headers, @@ -2269,7 +2251,7 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta, auto const dev_bfr = ck.is_compressed ? ck.compressed_bfr : ck.uncompressed_bfr; auto& column_chunk_meta = row_group.columns[i].meta_data; - if (ck.is_compressed) { column_chunk_meta.codec = compression; } + if (ck.is_compressed) { column_chunk_meta.codec = to_parquet_compression(compression); } if (!out_sink[p]->is_device_write_preferred(ck.compressed_size)) { all_device_write = false; } @@ -2374,7 +2356,7 @@ writer::impl::impl(std::vector> sinks, single_write_mode mode, rmm::cuda_stream_view stream) : _stream(stream), - _compression(to_parquet_compression(options.get_compression())), + _compression(options.get_compression()), _max_row_group_size{options.get_row_group_size_bytes()}, _max_row_group_rows{options.get_row_group_size_rows()}, _max_page_size_bytes(max_page_bytes(_compression, options.get_max_page_size_bytes())), @@ -2405,7 +2387,7 @@ writer::impl::impl(std::vector> sinks, single_write_mode mode, rmm::cuda_stream_view stream) : _stream(stream), - _compression(to_parquet_compression(options.get_compression())), + _compression(options.get_compression()), _max_row_group_size{options.get_row_group_size_bytes()}, _max_row_group_rows{options.get_row_group_size_rows()}, _max_page_size_bytes(max_page_bytes(_compression, options.get_max_page_size_bytes())), diff --git a/cpp/src/io/parquet/writer_impl.hpp b/cpp/src/io/parquet/writer_impl.hpp index 63128faf993..aa3973987ba 100644 --- a/cpp/src/io/parquet/writer_impl.hpp +++ b/cpp/src/io/parquet/writer_impl.hpp @@ -144,7 +144,7 @@ class writer::impl { rmm::cuda_stream_view _stream; // Writer options. - Compression const _compression; + compression_type const _compression; size_t const _max_row_group_size; size_type const _max_row_group_rows; size_t const _max_page_size_bytes; diff --git a/cpp/src/io/parquet/writer_impl_helpers.cpp b/cpp/src/io/parquet/writer_impl_helpers.cpp index 4533813230b..729f05af9ee 100644 --- a/cpp/src/io/parquet/writer_impl_helpers.cpp +++ b/cpp/src/io/parquet/writer_impl_helpers.cpp @@ -21,8 +21,6 @@ #include "writer_impl_helpers.hpp" -#include "io/comp/nvcomp_adapter.hpp" - #include #include #include @@ -32,34 +30,6 @@ namespace cudf::io::parquet::detail { using namespace cudf::io::detail; -nvcomp::compression_type to_nvcomp_compression_type(Compression codec) -{ - switch (codec) { - case Compression::SNAPPY: return nvcomp::compression_type::SNAPPY; - case Compression::ZSTD: return nvcomp::compression_type::ZSTD; - // Parquet refers to LZ4 as "LZ4_RAW"; Parquet's "LZ4" is not standard LZ4 - case Compression::LZ4_RAW: return nvcomp::compression_type::LZ4; - default: CUDF_FAIL("Unsupported compression type"); - } -} - -uint32_t page_alignment(Compression codec) -{ - if (codec == Compression::UNCOMPRESSED or - nvcomp::is_compression_disabled(to_nvcomp_compression_type(codec))) { - return 1u; - } - - return nvcomp::required_alignment(to_nvcomp_compression_type(codec)); -} - -size_t max_compression_output_size(Compression codec, uint32_t compression_blocksize) -{ - if (codec == Compression::UNCOMPRESSED) return 0; - - return compress_max_output_chunk_size(to_nvcomp_compression_type(codec), compression_blocksize); -} - void fill_table_meta(table_input_metadata& table_meta) { // Fill unnamed columns' names in table_meta diff --git a/cpp/src/io/parquet/writer_impl_helpers.hpp b/cpp/src/io/parquet/writer_impl_helpers.hpp index e716882c4f5..b7056fb0053 100644 --- a/cpp/src/io/parquet/writer_impl_helpers.hpp +++ b/cpp/src/io/parquet/writer_impl_helpers.hpp @@ -20,40 +20,12 @@ */ #pragma once -#include "parquet_common.hpp" #include #include -#include namespace cudf::io::parquet::detail { -/** - * @brief Function that translates the given compression codec to nvcomp compression type. - * - * @param codec Compression codec - * @return Translated nvcomp compression type - */ -cudf::io::detail::nvcomp::compression_type to_nvcomp_compression_type(Compression codec); - -/** - * @brief Function that computes input alignment requirements for the given compression type. - * - * @param codec Compression codec - * @return Required alignment - */ -uint32_t page_alignment(Compression codec); - -/** - * @brief Gets the maximum compressed chunk size for the largest chunk uncompressed chunk in the - * batch. - * - * @param codec Compression codec - * @param compression_blocksize Size of the largest uncompressed chunk in the batch - * @return Maximum compressed chunk size - */ -size_t max_compression_output_size(Compression codec, uint32_t compression_blocksize); - /** * @brief Fill the table metadata with default column names. * From d83abacde86b339c7573922d9a9429f42deb9729 Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Mon, 23 Dec 2024 14:08:23 -0800 Subject: [PATCH 13/23] missed DEFLATE --- cpp/src/io/comp/comp.cpp | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/cpp/src/io/comp/comp.cpp b/cpp/src/io/comp/comp.cpp index 270f310f245..d145384de7f 100644 --- a/cpp/src/io/comp/comp.cpp +++ b/cpp/src/io/comp/comp.cpp @@ -48,6 +48,7 @@ std::optional to_nvcomp_compression(compression_type c case compression_type::SNAPPY: return nvcomp::compression_type::SNAPPY; case compression_type::ZSTD: return nvcomp::compression_type::ZSTD; case compression_type::LZ4: return nvcomp::compression_type::LZ4; + case compression_type::ZLIB: return nvcomp::compression_type::DEFLATE; default: return std::nullopt; } } @@ -149,6 +150,14 @@ void device_compress(compression_type compression, nvcomp::batched_compress(nvcomp::compression_type::LZ4, inputs, outputs, results, stream); break; } + case compression_type::ZLIB: { + if (auto const reason = nvcomp::is_compression_disabled(nvcomp::compression_type::DEFLATE); + reason) { + CUDF_FAIL("Compression error: " + reason.value()); + } + nvcomp::batched_compress(nvcomp::compression_type::DEFLATE, inputs, outputs, results, stream); + break; + } case compression_type::NONE: return; default: CUDF_FAIL("invalid compression type"); } @@ -198,10 +207,12 @@ void host_compress(compression_type compression, [[nodiscard]] bool device_compression_supported(compression_type compression) { + auto const nvcomp_type = to_nvcomp_compression(compression); switch (compression) { case compression_type::SNAPPY: return true; - case compression_type::ZSTD: return true; - case compression_type::LZ4: return true; + case compression_type::ZSTD: + case compression_type::LZ4: + case compression_type::ZLIB: return not nvcomp::is_compression_disabled(nvcomp_type.value()); case compression_type::NONE: return true; default: return false; } @@ -260,7 +271,7 @@ std::vector compress(compression_type compression, switch (compression) { case compression_type::GZIP: return compress_gzip(src); case compression_type::SNAPPY: return compress_snappy(src, stream); - default: std::cout << (int)compression << std::endl; CUDF_FAIL("Unsupported compression type"); + default: CUDF_FAIL("Unsupported compression type"); } } From e2dce81675830ddfe52f0bade3285a219d6cd00c Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Mon, 23 Dec 2024 14:22:45 -0800 Subject: [PATCH 14/23] simplify --- cpp/src/io/comp/comp.cpp | 43 +++++++++------------------------------- 1 file changed, 9 insertions(+), 34 deletions(-) diff --git a/cpp/src/io/comp/comp.cpp b/cpp/src/io/comp/comp.cpp index d145384de7f..dfbeb8d7fc0 100644 --- a/cpp/src/io/comp/comp.cpp +++ b/cpp/src/io/comp/comp.cpp @@ -125,41 +125,16 @@ void device_compress(compression_type compression, device_span results, rmm::cuda_stream_view stream) { + auto const nvcomp_type = to_nvcomp_compression(compression); + auto nvcomp_disabled = nvcomp_type.has_value() ? nvcomp::is_compression_disabled(*nvcomp_type) + : "invalid compression type"; + if (not nvcomp_disabled) { + return nvcomp::batched_compress(*nvcomp_type, inputs, outputs, results, stream); + } + switch (compression) { - case compression_type::SNAPPY: - if (nvcomp::is_compression_disabled(nvcomp::compression_type::SNAPPY)) { - gpu_snap(inputs, outputs, results, stream); - } else { - nvcomp::batched_compress( - nvcomp::compression_type::SNAPPY, inputs, outputs, results, stream); - } - break; - case compression_type::ZSTD: { - if (auto const reason = nvcomp::is_compression_disabled(nvcomp::compression_type::ZSTD); - reason) { - CUDF_FAIL("Compression error: " + reason.value()); - } - nvcomp::batched_compress(nvcomp::compression_type::ZSTD, inputs, outputs, results, stream); - break; - } - case compression_type::LZ4: { - if (auto const reason = nvcomp::is_compression_disabled(nvcomp::compression_type::LZ4); - reason) { - CUDF_FAIL("Compression error: " + reason.value()); - } - nvcomp::batched_compress(nvcomp::compression_type::LZ4, inputs, outputs, results, stream); - break; - } - case compression_type::ZLIB: { - if (auto const reason = nvcomp::is_compression_disabled(nvcomp::compression_type::DEFLATE); - reason) { - CUDF_FAIL("Compression error: " + reason.value()); - } - nvcomp::batched_compress(nvcomp::compression_type::DEFLATE, inputs, outputs, results, stream); - break; - } - case compression_type::NONE: return; - default: CUDF_FAIL("invalid compression type"); + case compression_type::SNAPPY: return gpu_snap(inputs, outputs, results, stream); + default: CUDF_FAIL("Compression error: " + nvcomp_disabled.value()); } } From 9b8fc7133e7731b5bd269de71a97ceb12cd1584c Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Mon, 23 Dec 2024 14:52:23 -0800 Subject: [PATCH 15/23] clean up --- cpp/src/io/comp/comp.cpp | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/cpp/src/io/comp/comp.cpp b/cpp/src/io/comp/comp.cpp index dfbeb8d7fc0..e61aefe680e 100644 --- a/cpp/src/io/comp/comp.cpp +++ b/cpp/src/io/comp/comp.cpp @@ -174,7 +174,7 @@ void host_compress(compression_type compression, [[nodiscard]] bool host_compression_supported(compression_type compression) { switch (compression) { - case compression_type::GZIP: return true; + case compression_type::GZIP: case compression_type::NONE: return true; default: return false; } @@ -184,10 +184,10 @@ void host_compress(compression_type compression, { auto const nvcomp_type = to_nvcomp_compression(compression); switch (compression) { - case compression_type::SNAPPY: return true; - case compression_type::ZSTD: case compression_type::LZ4: - case compression_type::ZLIB: return not nvcomp::is_compression_disabled(nvcomp_type.value()); + case compression_type::ZLIB: + case compression_type::ZSTD: return not nvcomp::is_compression_disabled(nvcomp_type.value()); + case compression_type::SNAPPY: case compression_type::NONE: return true; default: return false; } @@ -203,6 +203,7 @@ void host_compress(compression_type compression, "Unsupported compression type"); if (not host_compression_supported(compression)) { return false; } if (not device_compression_supported(compression)) { return true; } + // If both host and device compression are supported, use the host if the env var is set return getenv_or("LIBCUDF_USE_HOST_COMPRESSION", 0); } @@ -257,10 +258,10 @@ void compress(compression_type compression, rmm::cuda_stream_view stream) { CUDF_FUNC_RANGE(); - if (not use_host_compression(compression, inputs, outputs)) { - device_compress(compression, inputs, outputs, results, stream); + if (use_host_compression(compression, inputs, outputs)) { + return host_compress(compression, inputs, outputs, results, stream); } else { - host_compress(compression, inputs, outputs, results, stream); + return device_compress(compression, inputs, outputs, results, stream); } } From 7aaf5eddd27e881f124fb47c13d4d4e1d33299b0 Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Mon, 23 Dec 2024 15:56:00 -0800 Subject: [PATCH 16/23] fix --- cpp/src/io/comp/comp.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cpp/src/io/comp/comp.cpp b/cpp/src/io/comp/comp.cpp index e61aefe680e..63bc697b33f 100644 --- a/cpp/src/io/comp/comp.cpp +++ b/cpp/src/io/comp/comp.cpp @@ -125,6 +125,8 @@ void device_compress(compression_type compression, device_span results, rmm::cuda_stream_view stream) { + if (compression == compression_type::NONE) { return; } + auto const nvcomp_type = to_nvcomp_compression(compression); auto nvcomp_disabled = nvcomp_type.has_value() ? nvcomp::is_compression_disabled(*nvcomp_type) : "invalid compression type"; @@ -144,8 +146,8 @@ void host_compress(compression_type compression, device_span results, rmm::cuda_stream_view stream) { - CUDF_FUNC_RANGE(); if (compression == compression_type::NONE) { return; } + auto const num_blocks = inputs.size(); auto h_results = cudf::detail::make_host_vector(num_blocks, stream); auto const h_inputs = cudf::detail::make_host_vector_async(inputs, stream); From 3d311ae281d0563afd96da6d8b66600bc8ee0378 Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Thu, 2 Jan 2025 11:04:10 -0800 Subject: [PATCH 17/23] style --- cpp/src/io/comp/comp.cpp | 2 +- cpp/src/io/comp/comp.hpp | 4 ++-- cpp/src/io/comp/gpuinflate.hpp | 2 +- cpp/src/io/comp/statistics.cu | 2 +- cpp/src/io/orc/orc_gpu.hpp | 2 +- cpp/src/io/orc/stripe_enc.cu | 2 +- cpp/src/io/orc/writer_impl.cu | 2 +- cpp/src/io/orc/writer_impl.hpp | 2 +- cpp/src/io/parquet/writer_impl.cu | 2 +- cpp/src/io/parquet/writer_impl.hpp | 2 +- cpp/src/io/parquet/writer_impl_helpers.cpp | 2 +- cpp/src/io/parquet/writer_impl_helpers.hpp | 2 +- 12 files changed, 13 insertions(+), 13 deletions(-) diff --git a/cpp/src/io/comp/comp.cpp b/cpp/src/io/comp/comp.cpp index 63bc697b33f..943b2b0b28d 100644 --- a/cpp/src/io/comp/comp.cpp +++ b/cpp/src/io/comp/comp.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2018-2024, NVIDIA CORPORATION. + * Copyright (c) 2018-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/cpp/src/io/comp/comp.hpp b/cpp/src/io/comp/comp.hpp index 3f1084873d7..2b345defed3 100644 --- a/cpp/src/io/comp/comp.hpp +++ b/cpp/src/io/comp/comp.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2024, NVIDIA CORPORATION. + * Copyright (c) 2024-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -110,4 +110,4 @@ void compress(compression_type compression, rmm::cuda_stream_view stream); } // namespace io::detail -} // namespace CUDF_EXPORT cudf +} // namespace cudf diff --git a/cpp/src/io/comp/gpuinflate.hpp b/cpp/src/io/comp/gpuinflate.hpp index 58024ea3004..0a35b230242 100644 --- a/cpp/src/io/comp/gpuinflate.hpp +++ b/cpp/src/io/comp/gpuinflate.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2018-2024, NVIDIA CORPORATION. + * Copyright (c) 2018-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/cpp/src/io/comp/statistics.cu b/cpp/src/io/comp/statistics.cu index 4c745935a0f..af0f73869a2 100644 --- a/cpp/src/io/comp/statistics.cu +++ b/cpp/src/io/comp/statistics.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023-2024, NVIDIA CORPORATION. + * Copyright (c) 2023-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/cpp/src/io/orc/orc_gpu.hpp b/cpp/src/io/orc/orc_gpu.hpp index d08071b899f..8b30cee6681 100644 --- a/cpp/src/io/orc/orc_gpu.hpp +++ b/cpp/src/io/orc/orc_gpu.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2024, NVIDIA CORPORATION. + * Copyright (c) 2019-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/cpp/src/io/orc/stripe_enc.cu b/cpp/src/io/orc/stripe_enc.cu index a4f5279023d..526a0dc2c0d 100644 --- a/cpp/src/io/orc/stripe_enc.cu +++ b/cpp/src/io/orc/stripe_enc.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2024, NVIDIA CORPORATION. + * Copyright (c) 2019-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index 8be10e444f5..f73aec100b6 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2024, NVIDIA CORPORATION. + * Copyright (c) 2019-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/cpp/src/io/orc/writer_impl.hpp b/cpp/src/io/orc/writer_impl.hpp index 8d371daba1d..7d23482cb17 100644 --- a/cpp/src/io/orc/writer_impl.hpp +++ b/cpp/src/io/orc/writer_impl.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2024, NVIDIA CORPORATION. + * Copyright (c) 2019-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index c0178688120..a53caff7a16 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2024, NVIDIA CORPORATION. + * Copyright (c) 2019-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/cpp/src/io/parquet/writer_impl.hpp b/cpp/src/io/parquet/writer_impl.hpp index aa3973987ba..d5a5a534b93 100644 --- a/cpp/src/io/parquet/writer_impl.hpp +++ b/cpp/src/io/parquet/writer_impl.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2024, NVIDIA CORPORATION. + * Copyright (c) 2019-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/cpp/src/io/parquet/writer_impl_helpers.cpp b/cpp/src/io/parquet/writer_impl_helpers.cpp index 729f05af9ee..ede788c97c2 100644 --- a/cpp/src/io/parquet/writer_impl_helpers.cpp +++ b/cpp/src/io/parquet/writer_impl_helpers.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2024, NVIDIA CORPORATION. + * Copyright (c) 2024-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/cpp/src/io/parquet/writer_impl_helpers.hpp b/cpp/src/io/parquet/writer_impl_helpers.hpp index b7056fb0053..b5c73c348fe 100644 --- a/cpp/src/io/parquet/writer_impl_helpers.hpp +++ b/cpp/src/io/parquet/writer_impl_helpers.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2024, NVIDIA CORPORATION. + * Copyright (c) 2024-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. From e010f9f46b27ceaa46809108bcca2ff1ac24d670 Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Thu, 2 Jan 2025 11:23:52 -0800 Subject: [PATCH 18/23] style some more --- cpp/src/io/comp/comp.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/io/comp/comp.hpp b/cpp/src/io/comp/comp.hpp index 2b345defed3..aa91a76ed29 100644 --- a/cpp/src/io/comp/comp.hpp +++ b/cpp/src/io/comp/comp.hpp @@ -110,4 +110,4 @@ void compress(compression_type compression, rmm::cuda_stream_view stream); } // namespace io::detail -} // namespace cudf +} // namespace CUDF_EXPORT cudf From 8ceecff1a82362e30fa53f709f7d9f440fb63235 Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Mon, 6 Jan 2025 15:17:22 -0800 Subject: [PATCH 19/23] handle AUTO compression in options --- cpp/include/cudf/io/orc.hpp | 20 ++++++++++++++------ cpp/src/io/functions.cpp | 1 + cpp/tests/io/orc_test.cpp | 1 + cpp/tests/io/parquet_misc_test.cpp | 1 + 4 files changed, 17 insertions(+), 6 deletions(-) diff --git a/cpp/include/cudf/io/orc.hpp b/cpp/include/cudf/io/orc.hpp index 163fa20806d..0de9e996d20 100644 --- a/cpp/include/cudf/io/orc.hpp +++ b/cpp/include/cudf/io/orc.hpp @@ -578,7 +578,7 @@ class orc_writer_options { // Specify the sink to use for writer output sink_info _sink; // Specify the compression format to use - compression_type _compression = compression_type::AUTO; + compression_type _compression = compression_type::SNAPPY; // Specify frequency of statistics collection statistics_freq _stats_freq = ORC_STATISTICS_ROW_GROUP; // Maximum size of each stripe (unless smaller than a single row group) @@ -733,7 +733,11 @@ class orc_writer_options { * * @param comp Compression type */ - void set_compression(compression_type comp) { _compression = comp; } + void set_compression(compression_type comp) + { + _compression = comp; + if (comp == compression_type::AUTO) { _compression = compression_type::SNAPPY; } + } /** * @brief Choose granularity of statistics collection. @@ -865,7 +869,7 @@ class orc_writer_options_builder { */ orc_writer_options_builder& compression(compression_type comp) { - options._compression = comp; + options.set_compression(comp); return *this; } @@ -1026,7 +1030,7 @@ class chunked_orc_writer_options { // Specify the sink to use for writer output sink_info _sink; // Specify the compression format to use - compression_type _compression = compression_type::AUTO; + compression_type _compression = compression_type::SNAPPY; // Specify granularity of statistics collection statistics_freq _stats_freq = ORC_STATISTICS_ROW_GROUP; // Maximum size of each stripe (unless smaller than a single row group) @@ -1157,7 +1161,11 @@ class chunked_orc_writer_options { * * @param comp The compression type to use */ - void set_compression(compression_type comp) { _compression = comp; } + void set_compression(compression_type comp) + { + _compression = comp; + if (comp == compression_type::AUTO) { _compression = compression_type::SNAPPY; } + } /** * @brief Choose granularity of statistics collection @@ -1279,7 +1287,7 @@ class chunked_orc_writer_options_builder { */ chunked_orc_writer_options_builder& compression(compression_type comp) { - options._compression = comp; + options.set_compression(comp); return *this; } diff --git a/cpp/src/io/functions.cpp b/cpp/src/io/functions.cpp index 88423122e16..da4203133c5 100644 --- a/cpp/src/io/functions.cpp +++ b/cpp/src/io/functions.cpp @@ -766,6 +766,7 @@ void parquet_writer_options_base::set_stats_level(statistics_freq sf) { _stats_l void parquet_writer_options_base::set_compression(compression_type compression) { _compression = compression; + if (compression == compression_type::AUTO) { _compression = compression_type::SNAPPY; } } void parquet_writer_options_base::enable_int96_timestamps(bool req) diff --git a/cpp/tests/io/orc_test.cpp b/cpp/tests/io/orc_test.cpp index 2209a30149d..9b9e59e8319 100644 --- a/cpp/tests/io/orc_test.cpp +++ b/cpp/tests/io/orc_test.cpp @@ -2068,6 +2068,7 @@ TEST_P(OrcCompressionTest, Basic) INSTANTIATE_TEST_CASE_P(OrcCompressionTest, OrcCompressionTest, ::testing::Values(cudf::io::compression_type::NONE, + cudf::io::compression_type::AUTO, cudf::io::compression_type::SNAPPY, cudf::io::compression_type::LZ4, cudf::io::compression_type::ZSTD)); diff --git a/cpp/tests/io/parquet_misc_test.cpp b/cpp/tests/io/parquet_misc_test.cpp index d66f685cd9c..e49b0ea9eb0 100644 --- a/cpp/tests/io/parquet_misc_test.cpp +++ b/cpp/tests/io/parquet_misc_test.cpp @@ -268,6 +268,7 @@ TEST_P(ParquetCompressionTest, Basic) INSTANTIATE_TEST_CASE_P(ParquetCompressionTest, ParquetCompressionTest, ::testing::Values(cudf::io::compression_type::NONE, + cudf::io::compression_type::AUTO, cudf::io::compression_type::SNAPPY, cudf::io::compression_type::LZ4, cudf::io::compression_type::ZSTD)); From b5b06aaac50693c03bb64e18b69e8b8a7323f564 Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Mon, 6 Jan 2025 16:18:12 -0800 Subject: [PATCH 20/23] style --- cpp/include/cudf/io/orc.hpp | 2 +- cpp/src/io/functions.cpp | 2 +- cpp/tests/io/orc_test.cpp | 2 +- cpp/tests/io/parquet_misc_test.cpp | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/cpp/include/cudf/io/orc.hpp b/cpp/include/cudf/io/orc.hpp index 0de9e996d20..82f7761da2e 100644 --- a/cpp/include/cudf/io/orc.hpp +++ b/cpp/include/cudf/io/orc.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2024, NVIDIA CORPORATION. + * Copyright (c) 2020-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/cpp/src/io/functions.cpp b/cpp/src/io/functions.cpp index da4203133c5..d63fa9f5c35 100644 --- a/cpp/src/io/functions.cpp +++ b/cpp/src/io/functions.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2024, NVIDIA CORPORATION. + * Copyright (c) 2019-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/cpp/tests/io/orc_test.cpp b/cpp/tests/io/orc_test.cpp index 9b9e59e8319..708c2045a74 100644 --- a/cpp/tests/io/orc_test.cpp +++ b/cpp/tests/io/orc_test.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2024, NVIDIA CORPORATION. + * Copyright (c) 2019-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/cpp/tests/io/parquet_misc_test.cpp b/cpp/tests/io/parquet_misc_test.cpp index e49b0ea9eb0..419ac909ac6 100644 --- a/cpp/tests/io/parquet_misc_test.cpp +++ b/cpp/tests/io/parquet_misc_test.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023-2024, NVIDIA CORPORATION. + * Copyright (c) 2023-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. From ae1b9801843e3f6a2f7b0d46de2a617183cc4ad7 Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Wed, 8 Jan 2025 11:02:50 -0800 Subject: [PATCH 21/23] remove unused function --- cpp/src/io/orc/stripe_enc.cu | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/cpp/src/io/orc/stripe_enc.cu b/cpp/src/io/orc/stripe_enc.cu index 526a0dc2c0d..4f296bb5bfc 100644 --- a/cpp/src/io/orc/stripe_enc.cu +++ b/cpp/src/io/orc/stripe_enc.cu @@ -1356,18 +1356,6 @@ void CompactOrcDataStreams(device_2dspan strm_desc, srcs.begin(), dsts.begin(), lengths.begin(), lengths.size(), stream); } -compression_type from_orc_compression(orc::CompressionKind compression) -{ - switch (compression) { - case orc::CompressionKind::NONE: return compression_type::NONE; - case orc::CompressionKind::SNAPPY: return compression_type::SNAPPY; - case orc::CompressionKind::ZLIB: return compression_type::ZLIB; - case orc::CompressionKind::ZSTD: return compression_type::ZSTD; - case orc::CompressionKind::LZ4: return compression_type::LZ4; - default: CUDF_FAIL("Unsupported compression type"); - } -} - std::optional CompressOrcDataStreams( device_span compressed_data, uint32_t num_compressed_blocks, From 05b5f3d1d6c951e59db4cd22208f4b58a7027b04 Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Fri, 10 Jan 2025 13:06:14 -0800 Subject: [PATCH 22/23] code review suggestions --- cpp/CMakeLists.txt | 2 +- cpp/src/io/comp/comp.cpp | 16 ++++++++-------- cpp/src/io/comp/{statistics.cu => comp.cu} | 0 cpp/src/io/comp/comp.hpp | 8 ++++---- cpp/src/io/orc/writer_impl.cu | 6 +++--- cpp/src/io/parquet/writer_impl.cu | 10 +++++----- 6 files changed, 21 insertions(+), 21 deletions(-) rename cpp/src/io/comp/{statistics.cu => comp.cu} (100%) diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 9dabe4e8800..252cc7897d8 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -477,13 +477,13 @@ add_library( src/io/avro/reader_impl.cu src/io/comp/brotli_dict.cpp src/io/comp/comp.cpp + src/io/comp/comp.cu src/io/comp/cpu_unbz2.cpp src/io/comp/debrotli.cu src/io/comp/gpuinflate.cu src/io/comp/nvcomp_adapter.cpp src/io/comp/nvcomp_adapter.cu src/io/comp/snap.cu - src/io/comp/statistics.cu src/io/comp/uncomp.cpp src/io/comp/unsnap.cu src/io/csv/csv_gpu.cu diff --git a/cpp/src/io/comp/comp.cpp b/cpp/src/io/comp/comp.cpp index 943b2b0b28d..0f4e335bc64 100644 --- a/cpp/src/io/comp/comp.cpp +++ b/cpp/src/io/comp/comp.cpp @@ -148,16 +148,16 @@ void host_compress(compression_type compression, { if (compression == compression_type::NONE) { return; } - auto const num_blocks = inputs.size(); - auto h_results = cudf::detail::make_host_vector(num_blocks, stream); + auto const num_chunks = inputs.size(); + auto h_results = cudf::detail::make_host_vector(num_chunks, stream); auto const h_inputs = cudf::detail::make_host_vector_async(inputs, stream); auto const h_outputs = cudf::detail::make_host_vector_async(outputs, stream); stream.synchronize(); std::vector> tasks; - auto streams = cudf::detail::fork_streams(stream, h_comp_pool().get_thread_count()); - for (size_t i = 0; i < num_blocks; ++i) { - auto cur_stream = streams[i % streams.size()]; + auto const streams = cudf::detail::fork_streams(stream, h_comp_pool().get_thread_count()); + for (size_t i = 0; i < num_chunks; ++i) { + auto const cur_stream = streams[i % streams.size()]; auto task = [d_in = h_inputs[i], d_out = h_outputs[i], cur_stream, compression]() -> size_t { auto const h_in = cudf::detail::make_host_vector_sync(d_in, cur_stream); auto const h_out = compress(compression, h_in, cur_stream); @@ -167,7 +167,7 @@ void host_compress(compression_type compression, tasks.emplace_back(h_comp_pool().submit_task(std::move(task))); } - for (auto i = 0ul; i < num_blocks; ++i) { + for (auto i = 0ul; i < num_chunks; ++i) { h_results[i] = {tasks[i].get(), compression_status::SUCCESS}; } cudf::detail::cuda_memcpy_async(results, h_results, stream); @@ -211,7 +211,7 @@ void host_compress(compression_type compression, } // namespace -std::optional compress_max_allowed_block_size(compression_type compression) +std::optional compress_max_allowed_chunk_size(compression_type compression) { if (auto nvcomp_type = to_nvcomp_compression(compression); nvcomp_type.has_value() and not nvcomp::is_compression_disabled(*nvcomp_type)) { @@ -220,7 +220,7 @@ std::optional compress_max_allowed_block_size(compression_type compressi return std::nullopt; } -[[nodiscard]] size_t compress_required_block_alignment(compression_type compression) +[[nodiscard]] size_t compress_required_chunk_alignment(compression_type compression) { auto nvcomp_type = to_nvcomp_compression(compression); if (compression == compression_type::NONE or not nvcomp_type.has_value() or diff --git a/cpp/src/io/comp/statistics.cu b/cpp/src/io/comp/comp.cu similarity index 100% rename from cpp/src/io/comp/statistics.cu rename to cpp/src/io/comp/comp.cu diff --git a/cpp/src/io/comp/comp.hpp b/cpp/src/io/comp/comp.hpp index aa91a76ed29..90932a11499 100644 --- a/cpp/src/io/comp/comp.hpp +++ b/cpp/src/io/comp/comp.hpp @@ -58,12 +58,12 @@ std::vector compress(compression_type compression, rmm::cuda_stream_view stream); /** - * @brief Maximum size of uncompressed blocks that can be compressed. + * @brief Maximum size of uncompressed chunks that can be compressed. * * @param compression Compression type - * @returns maximum block size + * @returns maximum chunk size */ -[[nodiscard]] std::optional compress_max_allowed_block_size(compression_type compression); +[[nodiscard]] std::optional compress_max_allowed_chunk_size(compression_type compression); /** * @brief Gets input and output alignment requirements for the given compression type. @@ -71,7 +71,7 @@ std::vector compress(compression_type compression, * @param compression Compression type * @returns required alignment */ -[[nodiscard]] size_t compress_required_block_alignment(compression_type compression); +[[nodiscard]] size_t compress_required_chunk_alignment(compression_type compression); /** * @brief Gets the maximum size any chunk could compress to in the batch. diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index f73aec100b6..aa0b509981a 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -110,7 +110,7 @@ orc::CompressionKind to_orc_compression(compression_type compression) */ size_t compression_block_size(compression_type compression) { - auto const comp_limit = compress_max_allowed_block_size(compression); + auto const comp_limit = compress_max_allowed_chunk_size(compression); constexpr size_t max_block_size = 256 * 1024; return std::min(comp_limit.value_or(max_block_size), max_block_size); @@ -569,7 +569,7 @@ orc_streams create_streams(host_span columns, auto add_stream = [&](gpu::StreamIndexType index_type, StreamKind kind, TypeKind type_kind, size_t size) { - auto const max_alignment_padding = compress_required_block_alignment(compression) - 1; + auto const max_alignment_padding = compress_required_chunk_alignment(compression) - 1; const auto base = column.index() * gpu::CI_NUM_STREAMS; ids[base + index_type] = streams.size(); streams.push_back(orc::Stream{ @@ -2281,7 +2281,7 @@ auto convert_table_to_orc_data(table_view const& input, auto stripe_dicts = build_dictionaries(orc_table, segmentation, sort_dictionaries, stream); auto dec_chunk_sizes = decimal_chunk_sizes(orc_table, segmentation, stream); - auto const block_align = compress_required_block_alignment(compression); + auto const block_align = compress_required_chunk_alignment(compression); auto streams = create_streams(orc_table.columns, segmentation, diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index 61fdd181f6f..1b67b53ae8e 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -1200,7 +1200,7 @@ auto init_page_sizes(hostdevice_2dvector& chunks, num_columns, max_page_size_bytes, max_page_size_rows, - compress_required_block_alignment(compression), + compress_required_chunk_alignment(compression), write_v2_headers, nullptr, nullptr, @@ -1225,7 +1225,7 @@ auto init_page_sizes(hostdevice_2dvector& chunks, num_columns, max_page_size_bytes, max_page_size_rows, - compress_required_block_alignment(compression), + compress_required_chunk_alignment(compression), write_v2_headers, nullptr, nullptr, @@ -1249,7 +1249,7 @@ auto init_page_sizes(hostdevice_2dvector& chunks, num_columns, max_page_size_bytes, max_page_size_rows, - compress_required_block_alignment(compression), + compress_required_chunk_alignment(compression), write_v2_headers, nullptr, nullptr, @@ -1262,7 +1262,7 @@ size_t max_page_bytes(compression_type compression, size_t max_page_size_bytes) { if (compression == compression_type::NONE) { return max_page_size_bytes; } - auto const comp_limit = compress_max_allowed_block_size(compression); + auto const comp_limit = compress_max_allowed_chunk_size(compression); auto max_size = std::min(comp_limit.value_or(max_page_size_bytes), max_page_size_bytes); // page size must fit in a 32-bit signed integer @@ -2195,7 +2195,7 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta, num_columns, num_pages, num_stats_bfr, - compress_required_block_alignment(compression), + compress_required_chunk_alignment(compression), max_page_size_bytes, max_page_size_rows, write_v2_headers, From c72b66d45e62fbbdf31fc37a86da38a4db542f1e Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Fri, 10 Jan 2025 17:03:27 -0800 Subject: [PATCH 23/23] env var; limit num streams --- cpp/src/io/comp/comp.cpp | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/cpp/src/io/comp/comp.cpp b/cpp/src/io/comp/comp.cpp index 0f4e335bc64..3800835eaf1 100644 --- a/cpp/src/io/comp/comp.cpp +++ b/cpp/src/io/comp/comp.cpp @@ -38,7 +38,9 @@ namespace { auto& h_comp_pool() { - static BS::thread_pool pool(std::thread::hardware_concurrency()); + static std::size_t pool_size = + getenv_or("LIBCUDF_HOST_COMPRESSION_NUM_THREADS", std::thread::hardware_concurrency()); + static BS::thread_pool pool(pool_size); return pool; } @@ -155,7 +157,11 @@ void host_compress(compression_type compression, stream.synchronize(); std::vector> tasks; - auto const streams = cudf::detail::fork_streams(stream, h_comp_pool().get_thread_count()); + auto const num_streams = + std::min({num_chunks, + cudf::detail::global_cuda_stream_pool().get_stream_pool_size(), + h_comp_pool().get_thread_count()}); + auto const streams = cudf::detail::fork_streams(stream, num_streams); for (size_t i = 0; i < num_chunks; ++i) { auto const cur_stream = streams[i % streams.size()]; auto task = [d_in = h_inputs[i], d_out = h_outputs[i], cur_stream, compression]() -> size_t {