Skip to content

Commit

Permalink
decouple pq writer from nvcomp
Browse files Browse the repository at this point in the history
  • Loading branch information
vuule committed Dec 23, 2024
1 parent a8e1dec commit 0b6b72d
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 131 deletions.
13 changes: 13 additions & 0 deletions cpp/src/io/comp/comp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,5 +96,18 @@ void compress(compression_type compression,
device_span<compression_result> results,
rmm::cuda_stream_view stream);

/**
* @brief Aggregate results of compression into a single statistics object.
*
* @param inputs List of uncompressed input buffers
* @param results List of compression results
* @param stream CUDA stream to use
* @return writer_compression_statistics
*/
[[nodiscard]] writer_compression_statistics collect_compression_statistics(
device_span<device_span<uint8_t const> const> inputs,
device_span<compression_result const> results,
rmm::cuda_stream_view stream);

} // namespace io::detail
} // namespace CUDF_EXPORT cudf
13 changes: 0 additions & 13 deletions cpp/src/io/comp/gpuinflate.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,17 +124,4 @@ void gpu_snap(device_span<device_span<uint8_t const> const> inputs,
device_span<compression_result> results,
rmm::cuda_stream_view stream);

/**
* @brief Aggregate results of compression into a single statistics object.
*
* @param inputs List of uncompressed input buffers
* @param results List of compression results
* @param stream CUDA stream to use
* @return writer_compression_statistics
*/
[[nodiscard]] writer_compression_statistics collect_compression_statistics(
device_span<device_span<uint8_t const> const> inputs,
device_span<compression_result const> results,
rmm::cuda_stream_view stream);

} // namespace cudf::io::detail
2 changes: 1 addition & 1 deletion cpp/src/io/comp/statistics.cu
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

#include "gpuinflate.hpp"
#include "comp.hpp"

#include <rmm/exec_policy.hpp>

Expand Down
98 changes: 40 additions & 58 deletions cpp/src/io/parquet/writer_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@
#include "compact_protocol_reader.hpp"
#include "compact_protocol_writer.hpp"
#include "interop/decimal_conversion_utilities.cuh"
#include "io/comp/gpuinflate.hpp"
#include "io/comp/nvcomp_adapter.hpp"
#include "io/comp/comp.hpp"
#include "io/parquet/parquet.hpp"
#include "io/parquet/parquet_gpu.hpp"
#include "io/statistics/column_statistics.cuh"
Expand Down Expand Up @@ -67,6 +66,20 @@ namespace cudf::io::parquet::detail {

using namespace cudf::io::detail;

/**
 * @brief Translates a cuDF compression type to the matching Parquet codec enum.
 *
 * @param compression cuDF compression type (AUTO resolves to SNAPPY)
 * @return Parquet `Compression` codec
 */
Compression to_parquet_compression(compression_type compression)
{
  if (compression == compression_type::AUTO or compression == compression_type::SNAPPY) {
    return Compression::SNAPPY;
  }
  if (compression == compression_type::ZSTD) { return Compression::ZSTD; }
  // Parquet refers to LZ4 as "LZ4_RAW"; Parquet's "LZ4" is not standard LZ4
  if (compression == compression_type::LZ4) { return Compression::LZ4_RAW; }
  if (compression == compression_type::NONE) { return Compression::UNCOMPRESSED; }
  CUDF_FAIL("Unsupported compression type");
}

struct aggregate_writer_metadata {
aggregate_writer_metadata(host_span<partition_info const> partitions,
host_span<std::map<std::string, std::string> const> kv_md,
Expand Down Expand Up @@ -1172,7 +1185,7 @@ auto init_page_sizes(hostdevice_2dvector<EncColumnChunk>& chunks,
size_t max_page_size_bytes,
size_type max_page_size_rows,
bool write_v2_headers,
Compression compression_codec,
compression_type compression,
rmm::cuda_stream_view stream)
{
if (chunks.is_empty()) { return cudf::detail::hostdevice_vector<size_type>{}; }
Expand All @@ -1187,7 +1200,7 @@ auto init_page_sizes(hostdevice_2dvector<EncColumnChunk>& chunks,
num_columns,
max_page_size_bytes,
max_page_size_rows,
page_alignment(compression_codec),
compress_required_block_alignment(compression),
write_v2_headers,
nullptr,
nullptr,
Expand All @@ -1212,7 +1225,7 @@ auto init_page_sizes(hostdevice_2dvector<EncColumnChunk>& chunks,
num_columns,
max_page_size_bytes,
max_page_size_rows,
page_alignment(compression_codec),
compress_required_block_alignment(compression),
write_v2_headers,
nullptr,
nullptr,
Expand All @@ -1221,12 +1234,10 @@ auto init_page_sizes(hostdevice_2dvector<EncColumnChunk>& chunks,

// Get per-page max compressed size
cudf::detail::hostdevice_vector<size_type> comp_page_sizes(num_pages, stream);
std::transform(page_sizes.begin(),
page_sizes.end(),
comp_page_sizes.begin(),
[compression_codec](auto page_size) {
return max_compression_output_size(compression_codec, page_size);
});
std::transform(
page_sizes.begin(), page_sizes.end(), comp_page_sizes.begin(), [compression](auto page_size) {
return max_compressed_size(compression, page_size);
});
comp_page_sizes.host_to_device_async(stream);

// Use per-page max compressed size to calculate chunk.compressed_size
Expand All @@ -1238,7 +1249,7 @@ auto init_page_sizes(hostdevice_2dvector<EncColumnChunk>& chunks,
num_columns,
max_page_size_bytes,
max_page_size_rows,
page_alignment(compression_codec),
compress_required_block_alignment(compression),
write_v2_headers,
nullptr,
nullptr,
Expand All @@ -1247,16 +1258,13 @@ auto init_page_sizes(hostdevice_2dvector<EncColumnChunk>& chunks,
return comp_page_sizes;
}

size_t max_page_bytes(Compression compression, size_t max_page_size_bytes)
size_t max_page_bytes(compression_type compression, size_t max_page_size_bytes)
{
if (compression == Compression::UNCOMPRESSED) { return max_page_size_bytes; }
if (compression == compression_type::NONE) { return max_page_size_bytes; }

auto const ncomp_type = to_nvcomp_compression_type(compression);
auto const nvcomp_limit = nvcomp::is_compression_disabled(ncomp_type)
? std::nullopt
: nvcomp::compress_max_allowed_chunk_size(ncomp_type);
auto const comp_limit = compress_max_allowed_block_size(compression);

auto max_size = std::min(nvcomp_limit.value_or(max_page_size_bytes), max_page_size_bytes);
auto max_size = std::min(comp_limit.value_or(max_page_size_bytes), max_page_size_bytes);
// page size must fit in a 32-bit signed integer
return std::min<size_t>(max_size, std::numeric_limits<int32_t>::max());
}
Expand All @@ -1265,7 +1273,7 @@ std::pair<std::vector<rmm::device_uvector<size_type>>, std::vector<rmm::device_u
build_chunk_dictionaries(hostdevice_2dvector<EncColumnChunk>& chunks,
host_span<parquet_column_device_view const> col_desc,
device_2dspan<PageFragment const> frags,
Compression compression,
compression_type compression,
dictionary_policy dict_policy,
size_t max_dict_size,
rmm::cuda_stream_view stream)
Expand Down Expand Up @@ -1404,7 +1412,7 @@ build_chunk_dictionaries(hostdevice_2dvector<EncColumnChunk>& chunks,
* @param num_columns Total number of columns
* @param num_pages Total number of pages
* @param num_stats_bfr Number of statistics buffers
* @param compression Compression format
* @param alignment Page alignment
* @param max_page_size_bytes Maximum uncompressed page size, in bytes
* @param max_page_size_rows Maximum page size, in rows
* @param write_v2_headers True if version 2 page headers are to be written
Expand All @@ -1419,7 +1427,7 @@ void init_encoder_pages(hostdevice_2dvector<EncColumnChunk>& chunks,
uint32_t num_columns,
uint32_t num_pages,
uint32_t num_stats_bfr,
Compression compression,
size_t alignment,
size_t max_page_size_bytes,
size_type max_page_size_rows,
bool write_v2_headers,
Expand All @@ -1435,7 +1443,7 @@ void init_encoder_pages(hostdevice_2dvector<EncColumnChunk>& chunks,
num_columns,
max_page_size_bytes,
max_page_size_rows,
page_alignment(compression),
alignment,
write_v2_headers,
(num_stats_bfr) ? page_stats_mrg.data() : nullptr,
(num_stats_bfr > num_pages) ? page_stats_mrg.data() + num_pages : nullptr,
Expand All @@ -1455,31 +1463,6 @@ void init_encoder_pages(hostdevice_2dvector<EncColumnChunk>& chunks,
stream.synchronize();
}

// Maps a cuDF compression type to the Parquet codec enum; AUTO resolves to SNAPPY.
// NOTE(review): this is the removed-side copy of the definition this commit relocates.
Compression to_parquet_compression(compression_type compression)
{
switch (compression) {
case compression_type::AUTO:
case compression_type::SNAPPY: return Compression::SNAPPY;
case compression_type::ZSTD: return Compression::ZSTD;
case compression_type::LZ4:
// Parquet refers to LZ4 as "LZ4_RAW"; Parquet's "LZ4" is not standard LZ4
return Compression::LZ4_RAW;
case compression_type::NONE: return Compression::UNCOMPRESSED;
default: CUDF_FAIL("Unsupported compression type");
}
}

/**
 * @brief Translates a Parquet codec back to the corresponding cuDF compression type.
 *
 * Inverse of `to_parquet_compression` (except AUTO, which has no Parquet equivalent).
 *
 * @param codec Parquet compression codec
 * @return cuDF compression type
 */
compression_type from_parquet_compression(Compression codec)
{
  if (codec == Compression::SNAPPY) { return compression_type::SNAPPY; }
  if (codec == Compression::ZSTD) { return compression_type::ZSTD; }
  // Parquet's "LZ4_RAW" maps to standard LZ4; Parquet's legacy "LZ4" is not standard LZ4
  if (codec == Compression::LZ4_RAW) { return compression_type::LZ4; }
  if (codec == Compression::UNCOMPRESSED) { return compression_type::NONE; }
  CUDF_FAIL("Unsupported compression type");
}

/**
* @brief Encode pages.
*
Expand All @@ -1503,7 +1486,7 @@ void encode_pages(hostdevice_2dvector<EncColumnChunk>& chunks,
statistics_chunk const* chunk_stats,
statistics_chunk const* column_stats,
std::optional<writer_compression_statistics>& comp_stats,
Compression compression,
compression_type compression,
int32_t column_index_truncate_length,
bool write_v2_headers,
rmm::cuda_stream_view stream)
Expand All @@ -1513,7 +1496,7 @@ void encode_pages(hostdevice_2dvector<EncColumnChunk>& chunks,
? device_span<statistics_chunk const>(page_stats, num_pages)
: device_span<statistics_chunk const>();

uint32_t max_comp_pages = (compression != Compression::UNCOMPRESSED) ? num_pages : 0;
uint32_t max_comp_pages = (compression != compression_type::NONE) ? num_pages : 0;

rmm::device_uvector<device_span<uint8_t const>> comp_in(max_comp_pages, stream);
rmm::device_uvector<device_span<uint8_t>> comp_out(max_comp_pages, stream);
Expand All @@ -1524,8 +1507,7 @@ void encode_pages(hostdevice_2dvector<EncColumnChunk>& chunks,
compression_result{0, compression_status::FAILURE});

EncodePages(pages, write_v2_headers, comp_in, comp_out, comp_res, stream);
cudf::io::detail::compress(
from_parquet_compression(compression), comp_in, comp_out, comp_res, stream);
compress(compression, comp_in, comp_out, comp_res, stream);

// TBD: Not clear if the official spec actually allows dynamically turning off compression at the
// chunk-level
Expand Down Expand Up @@ -1743,7 +1725,7 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta,
size_type max_page_size_rows,
int32_t column_index_truncate_length,
statistics_freq stats_granularity,
Compression compression,
compression_type compression,
bool collect_compression_statistics,
dictionary_policy dict_policy,
size_t max_dictionary_size,
Expand Down Expand Up @@ -2145,7 +2127,7 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta,
}

// Clear compressed buffer size if compression has been turned off
if (compression == Compression::UNCOMPRESSED) { max_comp_bfr_size = 0; }
if (compression == compression_type::NONE) { max_comp_bfr_size = 0; }

// Initialize data pointers
uint32_t const num_stats_bfr =
Expand Down Expand Up @@ -2213,7 +2195,7 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta,
num_columns,
num_pages,
num_stats_bfr,
compression,
compress_required_block_alignment(compression),
max_page_size_bytes,
max_page_size_rows,
write_v2_headers,
Expand Down Expand Up @@ -2269,7 +2251,7 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta,
auto const dev_bfr = ck.is_compressed ? ck.compressed_bfr : ck.uncompressed_bfr;
auto& column_chunk_meta = row_group.columns[i].meta_data;

if (ck.is_compressed) { column_chunk_meta.codec = compression; }
if (ck.is_compressed) { column_chunk_meta.codec = to_parquet_compression(compression); }
if (!out_sink[p]->is_device_write_preferred(ck.compressed_size)) {
all_device_write = false;
}
Expand Down Expand Up @@ -2374,7 +2356,7 @@ writer::impl::impl(std::vector<std::unique_ptr<data_sink>> sinks,
single_write_mode mode,
rmm::cuda_stream_view stream)
: _stream(stream),
_compression(to_parquet_compression(options.get_compression())),
_compression(options.get_compression()),
_max_row_group_size{options.get_row_group_size_bytes()},
_max_row_group_rows{options.get_row_group_size_rows()},
_max_page_size_bytes(max_page_bytes(_compression, options.get_max_page_size_bytes())),
Expand Down Expand Up @@ -2405,7 +2387,7 @@ writer::impl::impl(std::vector<std::unique_ptr<data_sink>> sinks,
single_write_mode mode,
rmm::cuda_stream_view stream)
: _stream(stream),
_compression(to_parquet_compression(options.get_compression())),
_compression(options.get_compression()),
_max_row_group_size{options.get_row_group_size_bytes()},
_max_row_group_rows{options.get_row_group_size_rows()},
_max_page_size_bytes(max_page_bytes(_compression, options.get_max_page_size_bytes())),
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/io/parquet/writer_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ class writer::impl {
rmm::cuda_stream_view _stream;

// Writer options.
Compression const _compression;
compression_type const _compression;
size_t const _max_row_group_size;
size_type const _max_row_group_rows;
size_t const _max_page_size_bytes;
Expand Down
30 changes: 0 additions & 30 deletions cpp/src/io/parquet/writer_impl_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@

#include "writer_impl_helpers.hpp"

#include "io/comp/nvcomp_adapter.hpp"

#include <cudf/lists/lists_column_view.hpp>
#include <cudf/strings/detail/utilities.hpp>
#include <cudf/strings/strings_column_view.hpp>
Expand All @@ -32,34 +30,6 @@ namespace cudf::io::parquet::detail {

using namespace cudf::io::detail;

/**
 * @brief Translates a Parquet codec to the equivalent nvcomp compression type.
 *
 * @param codec Parquet compression codec
 * @return nvcomp compression type; fails for codecs with no nvcomp counterpart
 */
nvcomp::compression_type to_nvcomp_compression_type(Compression codec)
{
  if (codec == Compression::SNAPPY) { return nvcomp::compression_type::SNAPPY; }
  if (codec == Compression::ZSTD) { return nvcomp::compression_type::ZSTD; }
  // Parquet refers to LZ4 as "LZ4_RAW"; Parquet's "LZ4" is not standard LZ4
  if (codec == Compression::LZ4_RAW) { return nvcomp::compression_type::LZ4; }
  CUDF_FAIL("Unsupported compression type");
}

/**
 * @brief Computes the input alignment nvcomp requires for pages compressed with the given codec.
 *
 * @param codec Compression codec
 * @return Required alignment in bytes; 1 when no compression will actually be performed
 */
uint32_t page_alignment(Compression codec)
{
  // No compression => no alignment constraint; also avoids mapping UNCOMPRESSED to nvcomp
  if (codec == Compression::UNCOMPRESSED) { return 1u; }

  auto const nvcomp_codec = to_nvcomp_compression_type(codec);
  // A disabled nvcomp codec behaves like no compression for alignment purposes
  if (nvcomp::is_compression_disabled(nvcomp_codec)) { return 1u; }

  return nvcomp::required_alignment(nvcomp_codec);
}

size_t max_compression_output_size(Compression codec, uint32_t compression_blocksize)
{
if (codec == Compression::UNCOMPRESSED) return 0;

return compress_max_output_chunk_size(to_nvcomp_compression_type(codec), compression_blocksize);
}

void fill_table_meta(table_input_metadata& table_meta)
{
// Fill unnamed columns' names in table_meta
Expand Down
28 changes: 0 additions & 28 deletions cpp/src/io/parquet/writer_impl_helpers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,40 +20,12 @@
*/

#pragma once
#include "parquet_common.hpp"

#include <cudf/detail/utilities/linked_column.hpp>
#include <cudf/io/detail/parquet.hpp>
#include <cudf/io/nvcomp_adapter.hpp>

namespace cudf::io::parquet::detail {

/**
* @brief Function that translates the given compression codec to nvcomp compression type.
*
* @param codec Compression codec
* @return Translated nvcomp compression type
*/
cudf::io::detail::nvcomp::compression_type to_nvcomp_compression_type(Compression codec);

/**
* @brief Function that computes input alignment requirements for the given compression type.
*
* @param codec Compression codec
* @return Required alignment
*/
uint32_t page_alignment(Compression codec);

/**
* @brief Gets the maximum compressed chunk size for the largest chunk uncompressed chunk in the
* batch.
*
* @param codec Compression codec
* @param compression_blocksize Size of the largest uncompressed chunk in the batch
* @return Maximum compressed chunk size
*/
size_t max_compression_output_size(Compression codec, uint32_t compression_blocksize);

/**
* @brief Fill the table metadata with default column names.
*
Expand Down

0 comments on commit 0b6b72d

Please sign in to comment.