Skip to content

Commit

Permalink
remove hardcoding of delimiter
Browse files Browse the repository at this point in the history
  • Loading branch information
shrshi committed Oct 16, 2024
1 parent 3d0a51d commit 911e065
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 11 deletions.
25 changes: 14 additions & 11 deletions cpp/src/io/json/read_json.cu
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ datasource::owning_buffer<rmm::device_buffer> get_record_range_raw_input(

std::size_t const total_source_size = sources_size(sources, 0, 0);
auto constexpr num_delimiter_chars = 1;
auto const delimiter = reader_opts.get_delimiter();
auto const num_extra_delimiters = num_delimiter_chars * sources.size();
compression_type const reader_compression = reader_opts.get_compression();
std::size_t const chunk_offset = reader_opts.get_byte_range_offset();
Expand Down Expand Up @@ -155,12 +156,12 @@ datasource::owning_buffer<rmm::device_buffer> get_record_range_raw_input(

// Offset within buffer indicating first read position
std::int64_t buffer_offset = 0;
auto readbufspan =
ingest_raw_input(bufspan, sources, reader_compression, chunk_offset, chunk_size, stream);
auto readbufspan = ingest_raw_input(
bufspan, sources, reader_compression, chunk_offset, chunk_size, delimiter, stream);

auto const shift_for_nonzero_offset = std::min<std::int64_t>(chunk_offset, 1);
auto const first_delim_pos =
chunk_offset == 0 ? 0 : find_first_delimiter(readbufspan, '\n', stream);
chunk_offset == 0 ? 0 : find_first_delimiter(readbufspan, delimiter, stream);
if (first_delim_pos == -1) {
// return empty owning datasource buffer
auto empty_buf = rmm::device_buffer(0, stream);
Expand All @@ -180,14 +181,15 @@ datasource::owning_buffer<rmm::device_buffer> get_record_range_raw_input(
reader_compression,
next_subchunk_start,
size_per_subchunk,
delimiter,
stream);
next_delim_pos = find_first_delimiter(readbufspan, '\n', stream) + buffer_offset;
next_delim_pos = find_first_delimiter(readbufspan, delimiter, stream) + buffer_offset;
next_subchunk_start += size_per_subchunk;
}
if (next_delim_pos < buffer_offset) {
if (next_subchunk_start >= total_source_size) {
// If we have reached the end of source list but the source does not terminate with a
// newline character
// delimiter character
next_delim_pos = buffer_offset + readbufspan.size();
} else {
// Our buffer_size estimate is insufficient to read until the end of the line! We need to
Expand All @@ -212,9 +214,9 @@ datasource::owning_buffer<rmm::device_buffer> get_record_range_raw_input(

// Add delimiter to end of buffer iff
// (i) We are reading till the end of the last source i.e. should_load_till_last_source is
// true (ii) The last character in bufspan is not newline.
// For (ii) in the case of Spark, if the last character is not a newline, it could be the case
// that there are characters after the newline in the last record. We then consider those
// true (ii) The last character in bufspan is not delimiter.
// For (ii) in the case of Spark, if the last character is not a delimiter, it could be the case
// that there are characters after the delimiter in the last record. We then consider those
// characters to be a part of a new (possibly empty) line.
size_t num_chars = readbufspan.size() - first_delim_pos - shift_for_nonzero_offset;
if (num_chars) {
Expand All @@ -225,8 +227,8 @@ datasource::owning_buffer<rmm::device_buffer> get_record_range_raw_input(
cudaMemcpyDeviceToHost,
stream.value()));
stream.synchronize();
if (last_char != '\n') {
last_char = '\n';
if (last_char != delimiter) {
last_char = delimiter;
CUDF_CUDA_TRY(cudaMemcpyAsync(reinterpret_cast<char*>(buffer.data()) + readbufspan.size(),
&last_char,
sizeof(char),
Expand Down Expand Up @@ -274,6 +276,7 @@ device_span<char> ingest_raw_input(device_span<char> buffer,
compression_type compression,
std::size_t range_offset,
std::size_t range_size,
char delimiter,
rmm::cuda_stream_view stream)
{
CUDF_FUNC_RANGE();
Expand Down Expand Up @@ -325,7 +328,7 @@ device_span<char> ingest_raw_input(device_span<char> buffer,
if (sources.size() > 1) {
static_assert(num_delimiter_chars == 1,
"Currently only single-character delimiters are supported");
auto const delimiter_source = thrust::make_constant_iterator('\n');
auto const delimiter_source = thrust::make_constant_iterator(delimiter);
auto const d_delimiter_map = cudf::detail::make_device_uvector_async(
delimiter_map, stream, cudf::get_current_device_resource_ref());
thrust::scatter(rmm::exec_policy_nosync(stream),
Expand Down
1 change: 1 addition & 0 deletions cpp/src/io/json/read_json.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ device_span<char> ingest_raw_input(device_span<char> buffer,
compression_type compression,
size_t range_offset,
size_t range_size,
char delimiter,
rmm::cuda_stream_view stream);

/**
Expand Down
1 change: 1 addition & 0 deletions cpp/tests/io/json/json_utils.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ std::vector<cudf::io::table_with_metadata> split_byte_range_reading(
reader_opts.get_compression(),
reader_opts.get_byte_range_offset(),
reader_opts.get_byte_range_size(),
reader_opts.get_delimiter(),
stream);
// Note: we cannot reuse cudf::io::json::detail::find_first_delimiter since the
// return type of that function is size_type. However, when the chunk_size is
Expand Down

0 comments on commit 911e065

Please sign in to comment.