diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu index c19f2ccaeae..c355a87fd88 100644 --- a/cpp/src/io/json/read_json.cu +++ b/cpp/src/io/json/read_json.cu @@ -127,6 +127,7 @@ datasource::owning_buffer get_record_range_raw_input( std::size_t const total_source_size = sources_size(sources, 0, 0); auto constexpr num_delimiter_chars = 1; + auto const delimiter = reader_opts.get_delimiter(); auto const num_extra_delimiters = num_delimiter_chars * sources.size(); compression_type const reader_compression = reader_opts.get_compression(); std::size_t const chunk_offset = reader_opts.get_byte_range_offset(); @@ -155,12 +156,12 @@ datasource::owning_buffer get_record_range_raw_input( // Offset within buffer indicating first read position std::int64_t buffer_offset = 0; - auto readbufspan = - ingest_raw_input(bufspan, sources, reader_compression, chunk_offset, chunk_size, stream); + auto readbufspan = ingest_raw_input( + bufspan, sources, reader_compression, chunk_offset, chunk_size, delimiter, stream); auto const shift_for_nonzero_offset = std::min(chunk_offset, 1); auto const first_delim_pos = - chunk_offset == 0 ? 0 : find_first_delimiter(readbufspan, '\n', stream); + chunk_offset == 0 ? 0 : find_first_delimiter(readbufspan, delimiter, stream); if (first_delim_pos == -1) { // return empty owning datasource buffer auto empty_buf = rmm::device_buffer(0, stream); @@ -180,14 +181,15 @@ datasource::owning_buffer get_record_range_raw_input( reader_compression, next_subchunk_start, size_per_subchunk, + delimiter, stream); - next_delim_pos = find_first_delimiter(readbufspan, '\n', stream) + buffer_offset; + next_delim_pos = find_first_delimiter(readbufspan, delimiter, stream) + buffer_offset; next_subchunk_start += size_per_subchunk; } if (next_delim_pos < buffer_offset) { if (next_subchunk_start >= total_source_size) { // If we have reached the end of source list but the source does not terminate with a - // newline character + // delimiter character next_delim_pos = buffer_offset + readbufspan.size(); } else { // Our buffer_size estimate is insufficient to read until the end of the line! We need to @@ -212,9 +214,9 @@ datasource::owning_buffer get_record_range_raw_input( // Add delimiter to end of buffer iff // (i) We are reading till the end of the last source i.e. should_load_till_last_source is - // true (ii) The last character in bufspan is not newline. - // For (ii) in the case of Spark, if the last character is not a newline, it could be the case - // that there are characters after the newline in the last record. We then consider those + // true (ii) The last character in bufspan is not delimiter. + // For (ii) in the case of Spark, if the last character is not a delimiter, it could be the case + // that there are characters after the delimiter in the last record. We then consider those // characters to be a part of a new (possibly empty) line. size_t num_chars = readbufspan.size() - first_delim_pos - shift_for_nonzero_offset; if (num_chars) { @@ -225,8 +227,8 @@ datasource::owning_buffer get_record_range_raw_input( cudaMemcpyDeviceToHost, stream.value())); stream.synchronize(); - if (last_char != '\n') { - last_char = '\n'; + if (last_char != delimiter) { + last_char = delimiter; CUDF_CUDA_TRY(cudaMemcpyAsync(reinterpret_cast(buffer.data()) + readbufspan.size(), &last_char, sizeof(char), @@ -274,6 +276,7 @@ device_span ingest_raw_input(device_span buffer, compression_type compression, std::size_t range_offset, std::size_t range_size, + char delimiter, rmm::cuda_stream_view stream) { CUDF_FUNC_RANGE(); @@ -325,7 +328,7 @@ device_span ingest_raw_input(device_span buffer, if (sources.size() > 1) { static_assert(num_delimiter_chars == 1, "Currently only single-character delimiters are supported"); - auto const delimiter_source = thrust::make_constant_iterator('\n'); + auto const delimiter_source = thrust::make_constant_iterator(delimiter); auto const d_delimiter_map = cudf::detail::make_device_uvector_async( delimiter_map, stream, cudf::get_current_device_resource_ref()); thrust::scatter(rmm::exec_policy_nosync(stream), diff --git a/cpp/src/io/json/read_json.hpp b/cpp/src/io/json/read_json.hpp index 982190eecb5..4def69cc629 100644 --- a/cpp/src/io/json/read_json.hpp +++ b/cpp/src/io/json/read_json.hpp @@ -56,6 +56,7 @@ device_span ingest_raw_input(device_span buffer, compression_type compression, size_t range_offset, size_t range_size, + char delimiter, rmm::cuda_stream_view stream); /** diff --git a/cpp/tests/io/json/json_utils.cuh b/cpp/tests/io/json/json_utils.cuh index 9383797d91b..c31bb2d24e0 100644 --- a/cpp/tests/io/json/json_utils.cuh +++ b/cpp/tests/io/json/json_utils.cuh @@ -52,6 +52,7 @@ std::vector split_byte_range_reading( reader_opts.get_compression(), reader_opts.get_byte_range_offset(), reader_opts.get_byte_range_size(), + reader_opts.get_delimiter(), stream); // Note: we cannot reuse cudf::io::json::detail::find_first_delimiter since the // return type of that function is size_type. However, when the chunk_size is