Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Host compression #17656

Open
wants to merge 46 commits into
base: branch-25.02
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 39 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
81dcfa6
random clean up
vuule Dec 18, 2024
4f7794d
jesus
vuule Dec 18, 2024
3166acb
Merge branch 'branch-25.02' into comp-headers-cleanup
vuule Dec 18, 2024
b3f03e8
style
vuule Dec 18, 2024
53205c5
style
vuule Dec 18, 2024
05d07ba
Merge branch 'branch-25.02' into comp-headers-cleanup
vuule Dec 18, 2024
7d23502
Merge branch 'branch-25.02' into comp-headers-cleanup
vuule Dec 18, 2024
324d635
Update cpp/src/io/comp/common.hpp
vuule Dec 18, 2024
350db40
Merge branch 'branch-25.02' into comp-headers-cleanup
vuule Dec 19, 2024
0cf8375
Merge branch 'branch-25.02' into comp-headers-cleanup
vuule Dec 19, 2024
947fbd4
fix
vuule Dec 20, 2024
0a64f1c
Merge branch 'comp-headers-cleanup' of https://github.com/vuule/cudf …
vuule Dec 20, 2024
54d9bb9
fix some more
vuule Dec 20, 2024
2ca535b
Merge branch 'branch-25.02' of https://github.com/rapidsai/cudf into …
vuule Dec 20, 2024
963f066
avoid part of nvcomp enabled checks in writers
vuule Dec 20, 2024
e119ad8
single-threaded host comp
vuule Dec 20, 2024
3ab8c41
now with more threads
vuule Dec 21, 2024
a14b351
decouple orc writer from nvcomp
vuule Dec 21, 2024
a8e1dec
Merge branch 'branch-25.02' of https://github.com/rapidsai/cudf into …
vuule Dec 23, 2024
0b6b72d
decouple pq writer from nvcomp
vuule Dec 23, 2024
d83abac
missed DEFLATE
vuule Dec 23, 2024
e2dce81
simplify
vuule Dec 23, 2024
9b8fc71
clean up
vuule Dec 23, 2024
7aaf5ed
fix
vuule Dec 23, 2024
9a5ca7d
Merge branch 'branch-25.02' into high-lvl-comp-api
vuule Jan 2, 2025
3d311ae
style
vuule Jan 2, 2025
ba5321a
Merge branch 'high-lvl-comp-api' of https://github.com/vuule/cudf int…
vuule Jan 2, 2025
e010f9f
style some more
vuule Jan 2, 2025
19cd311
Merge branch 'branch-25.02' into high-lvl-comp-api
vuule Jan 3, 2025
3094173
Merge branch 'branch-25.02' into high-lvl-comp-api
vuule Jan 3, 2025
b83c1ff
Merge branch 'branch-25.02' of https://github.com/rapidsai/cudf into …
vuule Jan 6, 2025
8ceecff
handle AUTO compression in options
vuule Jan 6, 2025
7fa6055
Merge branch 'high-lvl-comp-api' of https://github.com/vuule/cudf int…
vuule Jan 6, 2025
b2cdcf4
Merge branch 'branch-25.02' into high-lvl-comp-api
vuule Jan 6, 2025
b5b06aa
style
vuule Jan 7, 2025
bfae53a
Merge branch 'high-lvl-comp-api' of https://github.com/vuule/cudf int…
vuule Jan 7, 2025
11ca033
Merge branch 'branch-25.02' into high-lvl-comp-api
vuule Jan 7, 2025
ea04f43
Merge branch 'branch-25.02' into high-lvl-comp-api
vuule Jan 7, 2025
3e403b0
Merge branch 'branch-25.02' into high-lvl-comp-api
vuule Jan 8, 2025
ae1b980
remove unused function
vuule Jan 8, 2025
a725970
Merge branch 'branch-25.02' into high-lvl-comp-api
vuule Jan 8, 2025
05b5f3d
code review suggestions
vuule Jan 10, 2025
70baa8d
Merge branch 'high-lvl-comp-api' of https://github.com/vuule/cudf int…
vuule Jan 10, 2025
b81c7f5
Merge branch 'branch-25.02' into high-lvl-comp-api
vuule Jan 10, 2025
c72b66d
env var; limit num streams
vuule Jan 11, 2025
559ca43
Merge branch 'high-lvl-comp-api' of https://github.com/vuule/cudf int…
vuule Jan 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 15 additions & 7 deletions cpp/include/cudf/io/orc.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2024, NVIDIA CORPORATION.
* Copyright (c) 2020-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -578,7 +578,7 @@ class orc_writer_options {
// Specify the sink to use for writer output
sink_info _sink;
// Specify the compression format to use
compression_type _compression = compression_type::AUTO;
compression_type _compression = compression_type::SNAPPY;
// Specify frequency of statistics collection
statistics_freq _stats_freq = ORC_STATISTICS_ROW_GROUP;
// Maximum size of each stripe (unless smaller than a single row group)
Expand Down Expand Up @@ -733,7 +733,11 @@ class orc_writer_options {
*
* @param comp Compression type
*/
void set_compression(compression_type comp) { _compression = comp; }
void set_compression(compression_type comp)
{
_compression = comp;
if (comp == compression_type::AUTO) { _compression = compression_type::SNAPPY; }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my education, Is it common sense that AUTO should be just SNAPPY?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

compression_type is common for all file formats, so AUTO may mean different compression types for different formats.
I guess we could remove AUTO and set a concrete compression type as the default for each format.

}

/**
* @brief Choose granularity of statistics collection.
Expand Down Expand Up @@ -865,7 +869,7 @@ class orc_writer_options_builder {
*/
orc_writer_options_builder& compression(compression_type comp)
{
options._compression = comp;
options.set_compression(comp);
return *this;
}

Expand Down Expand Up @@ -1026,7 +1030,7 @@ class chunked_orc_writer_options {
// Specify the sink to use for writer output
sink_info _sink;
// Specify the compression format to use
compression_type _compression = compression_type::AUTO;
compression_type _compression = compression_type::SNAPPY;
// Specify granularity of statistics collection
statistics_freq _stats_freq = ORC_STATISTICS_ROW_GROUP;
// Maximum size of each stripe (unless smaller than a single row group)
Expand Down Expand Up @@ -1157,7 +1161,11 @@ class chunked_orc_writer_options {
*
* @param comp The compression type to use
*/
void set_compression(compression_type comp) { _compression = comp; }
void set_compression(compression_type comp)
{
_compression = comp;
if (comp == compression_type::AUTO) { _compression = compression_type::SNAPPY; }
}

/**
* @brief Choose granularity of statistics collection
Expand Down Expand Up @@ -1279,7 +1287,7 @@ class chunked_orc_writer_options_builder {
*/
chunked_orc_writer_options_builder& compression(compression_type comp)
{
options._compression = comp;
options.set_compression(comp);
return *this;
}

Expand Down
157 changes: 156 additions & 1 deletion cpp/src/io/comp/comp.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018-2024, NVIDIA CORPORATION.
* Copyright (c) 2018-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,22 +16,43 @@

#include "comp.hpp"

#include "gpuinflate.hpp"
#include "io/utilities/getenv_or.hpp"
#include "io/utilities/hostdevice_vector.hpp"
#include "nvcomp_adapter.hpp"

#include <cudf/detail/nvtx/ranges.hpp>
#include <cudf/detail/utilities/cuda_memcpy.hpp>
#include <cudf/detail/utilities/stream_pool.hpp>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/utilities/error.hpp>
#include <cudf/utilities/memory_resource.hpp>
#include <cudf/utilities/span.hpp>

#include <BS_thread_pool.hpp>
#include <zlib.h> // GZIP compression

namespace cudf::io::detail {

namespace {

auto& h_comp_pool()
{
static BS::thread_pool pool(std::thread::hardware_concurrency());
return pool;
}

std::optional<nvcomp::compression_type> to_nvcomp_compression(compression_type compression)
{
switch (compression) {
case compression_type::SNAPPY: return nvcomp::compression_type::SNAPPY;
case compression_type::ZSTD: return nvcomp::compression_type::ZSTD;
case compression_type::LZ4: return nvcomp::compression_type::LZ4;
case compression_type::ZLIB: return nvcomp::compression_type::DEFLATE;
default: return std::nullopt;
}
}

/**
* @brief GZIP host compressor (includes header)
*/
Expand Down Expand Up @@ -98,8 +119,128 @@ std::vector<std::uint8_t> compress_snappy(host_span<uint8_t const> src,
return cudf::detail::make_std_vector_sync<uint8_t>(d_dst, stream);
}

void device_compress(compression_type compression,
device_span<device_span<uint8_t const> const> inputs,
device_span<device_span<uint8_t> const> outputs,
device_span<compression_result> results,
rmm::cuda_stream_view stream)
{
if (compression == compression_type::NONE) { return; }

auto const nvcomp_type = to_nvcomp_compression(compression);
auto nvcomp_disabled = nvcomp_type.has_value() ? nvcomp::is_compression_disabled(*nvcomp_type)
: "invalid compression type";
if (not nvcomp_disabled) {
return nvcomp::batched_compress(*nvcomp_type, inputs, outputs, results, stream);
}

switch (compression) {
case compression_type::SNAPPY: return gpu_snap(inputs, outputs, results, stream);
default: CUDF_FAIL("Compression error: " + nvcomp_disabled.value());
}
}

void host_compress(compression_type compression,
device_span<device_span<uint8_t const> const> inputs,
device_span<device_span<uint8_t> const> outputs,
device_span<compression_result> results,
rmm::cuda_stream_view stream)
{
if (compression == compression_type::NONE) { return; }

auto const num_blocks = inputs.size();
auto h_results = cudf::detail::make_host_vector<compression_result>(num_blocks, stream);
auto const h_inputs = cudf::detail::make_host_vector_async(inputs, stream);
auto const h_outputs = cudf::detail::make_host_vector_async(outputs, stream);
stream.synchronize();
shrshi marked this conversation as resolved.
Show resolved Hide resolved

std::vector<std::future<size_t>> tasks;
auto streams = cudf::detail::fork_streams(stream, h_comp_pool().get_thread_count());
vuule marked this conversation as resolved.
Show resolved Hide resolved
for (size_t i = 0; i < num_blocks; ++i) {
auto cur_stream = streams[i % streams.size()];
auto task = [d_in = h_inputs[i], d_out = h_outputs[i], cur_stream, compression]() -> size_t {
auto const h_in = cudf::detail::make_host_vector_sync(d_in, cur_stream);
auto const h_out = compress(compression, h_in, cur_stream);
cudf::detail::cuda_memcpy<uint8_t>(d_out.subspan(0, h_out.size()), h_out, cur_stream);
return h_out.size();
};
tasks.emplace_back(h_comp_pool().submit_task(std::move(task)));
}

for (auto i = 0ul; i < num_blocks; ++i) {
h_results[i] = {tasks[i].get(), compression_status::SUCCESS};
}
cudf::detail::cuda_memcpy_async<compression_result>(results, h_results, stream);
shrshi marked this conversation as resolved.
Show resolved Hide resolved
}

[[nodiscard]] bool host_compression_supported(compression_type compression)
{
switch (compression) {
case compression_type::GZIP:
case compression_type::NONE: return true;
default: return false;
}
}

[[nodiscard]] bool device_compression_supported(compression_type compression)
{
auto const nvcomp_type = to_nvcomp_compression(compression);
switch (compression) {
case compression_type::LZ4:
case compression_type::ZLIB:
case compression_type::ZSTD: return not nvcomp::is_compression_disabled(nvcomp_type.value());
case compression_type::SNAPPY:
case compression_type::NONE: return true;
default: return false;
}
}

[[nodiscard]] bool use_host_compression(
compression_type compression,
[[maybe_unused]] device_span<device_span<uint8_t const> const> inputs,
[[maybe_unused]] device_span<device_span<uint8_t> const> outputs)
{
CUDF_EXPECTS(
not host_compression_supported(compression) or device_compression_supported(compression),
"Unsupported compression type");
if (not host_compression_supported(compression)) { return false; }
if (not device_compression_supported(compression)) { return true; }
// If both host and device compression are supported, use the host if the env var is set
return getenv_or("LIBCUDF_USE_HOST_COMPRESSION", 0);
}

} // namespace

std::optional<size_t> compress_max_allowed_block_size(compression_type compression)
{
if (auto nvcomp_type = to_nvcomp_compression(compression);
nvcomp_type.has_value() and not nvcomp::is_compression_disabled(*nvcomp_type)) {
return nvcomp::compress_max_allowed_chunk_size(*nvcomp_type);
}
return std::nullopt;
}

[[nodiscard]] size_t compress_required_block_alignment(compression_type compression)
{
auto nvcomp_type = to_nvcomp_compression(compression);
if (compression == compression_type::NONE or not nvcomp_type.has_value() or
nvcomp::is_compression_disabled(*nvcomp_type)) {
return 1ul;
}

return nvcomp::required_alignment(*nvcomp_type);
}

[[nodiscard]] size_t max_compressed_size(compression_type compression, uint32_t uncompressed_size)
{
if (compression == compression_type::NONE) { return uncompressed_size; }

if (auto nvcomp_type = to_nvcomp_compression(compression); nvcomp_type.has_value()) {
return nvcomp::compress_max_output_chunk_size(*nvcomp_type, uncompressed_size);
}
CUDF_FAIL("Unsupported compression type");
}

std::vector<std::uint8_t> compress(compression_type compression,
host_span<uint8_t const> src,
rmm::cuda_stream_view stream)
Expand All @@ -112,4 +253,18 @@ std::vector<std::uint8_t> compress(compression_type compression,
}
}

void compress(compression_type compression,
device_span<device_span<uint8_t const> const> inputs,
device_span<device_span<uint8_t> const> outputs,
device_span<compression_result> results,
rmm::cuda_stream_view stream)
{
CUDF_FUNC_RANGE();
if (use_host_compression(compression, inputs, outputs)) {
return host_compress(compression, inputs, outputs, results, stream);
} else {
return device_compress(compression, inputs, outputs, results, stream);
}
}

} // namespace cudf::io::detail
54 changes: 53 additions & 1 deletion cpp/src/io/comp/comp.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
* Copyright (c) 2024-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -57,5 +57,57 @@ std::vector<uint8_t> compress(compression_type compression,
host_span<uint8_t const> src,
rmm::cuda_stream_view stream);

/**
* @brief Maximum size of uncompressed blocks that can be compressed.
*
* @param compression Compression type
* @returns maximum block size
*/
[[nodiscard]] std::optional<size_t> compress_max_allowed_block_size(compression_type compression);

/**
* @brief Gets input and output alignment requirements for the given compression type.
*
* @param compression Compression type
* @returns required alignment
*/
[[nodiscard]] size_t compress_required_block_alignment(compression_type compression);
vuule marked this conversation as resolved.
Show resolved Hide resolved

/**
* @brief Gets the maximum size any chunk could compress to in the batch.
*
* @param compression Compression type
* @param uncompressed_size Size of the largest uncompressed chunk in the batch
*/
[[nodiscard]] size_t max_compressed_size(compression_type compression, uint32_t uncompressed_size);

/**
* @brief Compresses device memory buffers.
*
* @param compression Type of compression of the input data
* @param inputs Device memory buffers to compress
* @param outputs Device memory buffers to store the compressed output
* @param results Compression results
* @param stream CUDA stream used for device memory operations and kernel launches
*/
void compress(compression_type compression,
device_span<device_span<uint8_t const> const> inputs,
device_span<device_span<uint8_t> const> outputs,
device_span<compression_result> results,
rmm::cuda_stream_view stream);

/**
* @brief Aggregate results of compression into a single statistics object.
*
* @param inputs List of uncompressed input buffers
* @param results List of compression results
* @param stream CUDA stream to use
* @return writer_compression_statistics
*/
[[nodiscard]] writer_compression_statistics collect_compression_statistics(
device_span<device_span<uint8_t const> const> inputs,
device_span<compression_result const> results,
rmm::cuda_stream_view stream);
vuule marked this conversation as resolved.
Show resolved Hide resolved

} // namespace io::detail
} // namespace CUDF_EXPORT cudf
15 changes: 1 addition & 14 deletions cpp/src/io/comp/gpuinflate.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018-2024, NVIDIA CORPORATION.
* Copyright (c) 2018-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -124,17 +124,4 @@ void gpu_snap(device_span<device_span<uint8_t const> const> inputs,
device_span<compression_result> results,
rmm::cuda_stream_view stream);

/**
* @brief Aggregate results of compression into a single statistics object.
*
* @param inputs List of uncompressed input buffers
* @param results List of compression results
* @param stream CUDA stream to use
* @return writer_compression_statistics
*/
[[nodiscard]] writer_compression_statistics collect_compression_statistics(
device_span<device_span<uint8_t const> const> inputs,
device_span<compression_result const> results,
rmm::cuda_stream_view stream);

} // namespace cudf::io::detail
4 changes: 2 additions & 2 deletions cpp/src/io/comp/statistics.cu
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
* Copyright (c) 2023-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -14,7 +14,7 @@
* limitations under the License.
*/

#include "gpuinflate.hpp"
#include "comp.hpp"

#include <rmm/exec_policy.hpp>

Expand Down
3 changes: 2 additions & 1 deletion cpp/src/io/functions.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2024, NVIDIA CORPORATION.
* Copyright (c) 2019-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -766,6 +766,7 @@ void parquet_writer_options_base::set_stats_level(statistics_freq sf) { _stats_l
void parquet_writer_options_base::set_compression(compression_type compression)
{
_compression = compression;
if (compression == compression_type::AUTO) { _compression = compression_type::SNAPPY; }
}

void parquet_writer_options_base::enable_int96_timestamps(bool req)
Expand Down
Loading
Loading