Skip to content

Commit

Permalink
Writing compressed output using JSON writer (#17323)
Browse files Browse the repository at this point in the history
Depends on #17161 for implementations of compression and decompression functions (`io/comp/comp.cu`, `io/comp/comp.hpp`, `io/comp/io_uncomp.hpp` and `io/comp/uncomp.cpp`)

Adds support for writing GZIP- and SNAPPY-compressed JSON to the JSON writer.
Verifies correctness using a parameterized test in `tests/io/json/json_writer.cpp`

Authors:
  - Shruti Shivakumar (https://github.com/shrshi)
  - Vukasin Milovanovic (https://github.com/vuule)

Approvers:
  - Kyle Edwards (https://github.com/KyleFromNVIDIA)
  - Karthikeyan (https://github.com/karthikeyann)
  - Vukasin Milovanovic (https://github.com/vuule)

URL: #17323
  • Loading branch information
shrshi authored Nov 19, 2024
1 parent 5f9a97f commit 384abae
Show file tree
Hide file tree
Showing 4 changed files with 175 additions and 110 deletions.
28 changes: 28 additions & 0 deletions cpp/include/cudf/io/json.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -946,6 +946,8 @@ class json_writer_options_builder;
class json_writer_options {
// Specify the sink to use for writer output
sink_info _sink;
// Specify the compression format of the sink
compression_type _compression = compression_type::NONE;
// maximum number of rows to write in each chunk (limits memory use)
size_type _rows_per_chunk = std::numeric_limits<size_type>::max();
// Set of columns to output
Expand Down Expand Up @@ -1022,6 +1024,13 @@ class json_writer_options {
*/
[[nodiscard]] std::string const& get_na_rep() const { return _na_rep; }

/**
* @brief Returns compression type used for sink
*
* @return compression type for sink
*/
[[nodiscard]] compression_type get_compression() const { return _compression; }

/**
* @brief Whether to output nulls as 'null'.
*
Expand Down Expand Up @@ -1066,6 +1075,13 @@ class json_writer_options {
*/
void set_table(table_view tbl) { _table = tbl; }

/**
* @brief Sets compression type to be used
*
* @param comptype Compression type for sink
*/
void set_compression(compression_type comptype) { _compression = comptype; }

/**
* @brief Sets metadata.
*
Expand Down Expand Up @@ -1153,6 +1169,18 @@ class json_writer_options_builder {
return *this;
}

/**
* @brief Sets compression type of output sink
*
* @param comptype Compression type used
* @return this for chaining
*/
json_writer_options_builder& compression(compression_type comptype)
{
options._compression = comptype;
return *this;
}

/**
* @brief Sets optional metadata (with column names).
*
Expand Down
11 changes: 4 additions & 7 deletions cpp/src/io/comp/comp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
#include <cudf/utilities/memory_resource.hpp>
#include <cudf/utilities/span.hpp>

#include <zlib.h> // compress
#include <zlib.h> // GZIP compression

namespace cudf::io::detail {

Expand Down Expand Up @@ -77,12 +77,12 @@ std::vector<std::uint8_t> compress_snappy(host_span<uint8_t const> src,
{
auto const d_src =
cudf::detail::make_device_uvector_async(src, stream, cudf::get_current_device_resource_ref());
rmm::device_uvector<uint8_t> d_dst(src.size(), stream);

cudf::detail::hostdevice_vector<device_span<uint8_t const>> inputs(1, stream);
inputs[0] = d_src;
inputs.host_to_device_async(stream);

auto dst_size = compress_max_output_chunk_size(nvcomp::compression_type::SNAPPY, src.size());
rmm::device_uvector<uint8_t> d_dst(dst_size, stream);
cudf::detail::hostdevice_vector<device_span<uint8_t>> outputs(1, stream);
outputs[0] = d_dst;
outputs.host_to_device_async(stream);
Expand All @@ -93,13 +93,10 @@ std::vector<std::uint8_t> compress_snappy(host_span<uint8_t const> src,

nvcomp::batched_compress(nvcomp::compression_type::SNAPPY, inputs, outputs, hd_status, stream);

stream.synchronize();
hd_status.device_to_host_sync(stream);
CUDF_EXPECTS(hd_status[0].status == cudf::io::compression_status::SUCCESS,
"snappy compression failed");
std::vector<uint8_t> dst(d_dst.size());
cudf::detail::cuda_memcpy(host_span<uint8_t>{dst}, device_span<uint8_t const>{d_dst}, stream);
return dst;
return cudf::detail::make_std_vector_sync<uint8_t>(d_dst, stream);
}

} // namespace
Expand Down
29 changes: 25 additions & 4 deletions cpp/src/io/json/write_json.cu
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
* @brief cuDF-IO JSON writer implementation
*/

#include "io/comp/comp.hpp"
#include "io/csv/durations.hpp"
#include "io/utilities/parsing_utils.cuh"
#include "lists/utilities.hpp"
Expand Down Expand Up @@ -828,10 +829,10 @@ void write_chunked(data_sink* out_sink,
}
}

void write_json(data_sink* out_sink,
table_view const& table,
json_writer_options const& options,
rmm::cuda_stream_view stream)
void write_json_uncompressed(data_sink* out_sink,
table_view const& table,
json_writer_options const& options,
rmm::cuda_stream_view stream)
{
CUDF_FUNC_RANGE();
std::vector<column_name_info> user_column_names = [&]() {
Expand Down Expand Up @@ -934,4 +935,24 @@ void write_json(data_sink* out_sink,
}
}

void write_json(data_sink* out_sink,
table_view const& table,
json_writer_options const& options,
rmm::cuda_stream_view stream)
{
if (options.get_compression() != compression_type::NONE) {
std::vector<char> hbuf;
auto hbuf_sink_ptr = data_sink::create(&hbuf);
write_json_uncompressed(hbuf_sink_ptr.get(), table, options, stream);
stream.synchronize();
auto comp_hbuf = cudf::io::detail::compress(
options.get_compression(),
host_span<uint8_t>(reinterpret_cast<uint8_t*>(hbuf.data()), hbuf.size()),
stream);
out_sink->host_write(comp_hbuf.data(), comp_hbuf.size());
return;
}
write_json_uncompressed(out_sink, table, options, stream);
}

} // namespace cudf::io::json::detail
Loading

0 comments on commit 384abae

Please sign in to comment.