From 4080f495200224c9656e3c2738fe507d7e4abee9 Mon Sep 17 00:00:00 2001 From: Nghia Truong <7416935+ttnghia@users.noreply.github.com> Date: Fri, 22 Nov 2024 18:21:48 -0800 Subject: [PATCH] Implement `from_json_to_structs` (#2510) * Implement `castStringsToBooleans` Signed-off-by: Nghia Truong * Implement `removeQuotes` Signed-off-by: Nghia Truong * Rewrite using offsets and chars Signed-off-by: Nghia Truong * Fix empty input Signed-off-by: Nghia Truong * Misc Signed-off-by: Nghia Truong * Add `nullifyIfNotQuoted` option for `removeQuotes` Signed-off-by: Nghia Truong * Implement `castStringsToDecimals` Signed-off-by: Nghia Truong * Implement `removeQuotesForFloats` Signed-off-by: Nghia Truong * Fix `removeQuotesForFloats` Signed-off-by: Nghia Truong * Implement `castStringsToIntegers` Signed-off-by: Nghia Truong * Implement non-legacy `castStringsToDates` Signed-off-by: Nghia Truong * WIP for `cast_strings_to_dates_legacy` Signed-off-by: Nghia Truong * Revert "WIP for `cast_strings_to_dates_legacy`" This reverts commit dcb463e6f74def2479c97d1dc29082446ba1637b. * Fix compile issues Signed-off-by: Nghia Truong * WIP: Implement `from_json_to_structs` Signed-off-by: Nghia Truong * Fix cmake Signed-off-by: Nghia Truong * Fix compile issues Signed-off-by: Nghia Truong * Implement `castStringsToFloats` Signed-off-by: Nghia Truong * WIP Signed-off-by: Nghia Truong * WIP: Implementing `fromJSONToStructs` Signed-off-by: Nghia Truong * Fix compile errors Signed-off-by: Nghia Truong * Cleanup Signed-off-by: Nghia Truong * Revert code as we still need them * Add error check Signed-off-by: Nghia Truong * Add more comments Signed-off-by: Nghia Truong * Cleanup Signed-off-by: Nghia Truong * Return as-is if the column is date/time Signed-off-by: Nghia Truong * Update test Signed-off-by: Nghia Truong * Update cudf Signed-off-by: Nghia Truong * Revert "Update cudf" This reverts commit 5d07db1d7c23344a993cad6b143a9b8952b23cfa. * Update cudf * Update cudf * Change header * Rewrite JSONUtils.cpp * Implement a common function for converting column Signed-off-by: Nghia Truong * Rewrite `convert_data_type` Signed-off-by: Nghia Truong * Remove `cast_strings_to_dates` Signed-off-by: Nghia Truong * Implement `convert_data_type` Signed-off-by: Nghia Truong * Fix compile errors Signed-off-by: Nghia Truong * Add `CUDF_FUNC_RANGE();` Signed-off-by: Nghia Truong * Fix schema Signed-off-by: Nghia Truong * Complete `from_json_to_structs` Signed-off-by: Nghia Truong * Fix null mask Signed-off-by: Nghia Truong * Write Javadoc Signed-off-by: Nghia Truong * Rewrite JNI Signed-off-by: Nghia Truong * Remove deprecated function Signed-off-by: Nghia Truong * Revert test Signed-off-by: Nghia Truong * Remove header Signed-off-by: Nghia Truong * Rewrite Javadoc Signed-off-by: Nghia Truong * Rename variable Signed-off-by: Nghia Truong * Rewrite docs Signed-off-by: Nghia Truong * Revert test Signed-off-by: Nghia Truong * Cleanup headers Signed-off-by: Nghia Truong * Cleanup Signed-off-by: Nghia Truong * Rewrite the conversion functions Signed-off-by: Nghia Truong * Move code Signed-off-by: Nghia Truong * Remove call to `make_structs_column` Signed-off-by: Nghia Truong * Cleanup Signed-off-by: Nghia Truong * Optimize conversion further, avoiding to materialize column if not needed Signed-off-by: Nghia Truong * Rewrite docs and change function name Signed-off-by: Nghia Truong * Reorganize code Signed-off-by: Nghia Truong * Handle schema mismatching Signed-off-by: Nghia Truong * Add test Signed-off-by: Nghia Truong * Add another test Signed-off-by: Nghia Truong * Revert "Add another test" This reverts commit 8a17651545a3c848dbeb848e07039a870df4b921. * Fix schema mismatch Signed-off-by: Nghia Truong * Cleanup Signed-off-by: Nghia Truong * Add another test Signed-off-by: Nghia Truong * Revert "Add another test" This reverts commit cf9d6bf4019b4c2bafaa0331a5376b822c6ed629. * Revert "Add test" This reverts commit 553d7d0bb14e4c23f882e7a0509afe6d3be6d68c. Signed-off-by: Nghia Truong * Add prefix `spark_rapids_jni::` Signed-off-by: Nghia Truong * Remove handling for schema mismatching Signed-off-by: Nghia Truong * Avoid materializing a column when converting strings Signed-off-by: Nghia Truong * Revert "Remove handling for schema mismatching" This reverts commit d2b6fb591e8ebe70dbc1e5ff60006b8c130afcaf. * Fix handling for schema mismatching in case of `column_view` input Signed-off-by: Nghia Truong --------- Signed-off-by: Nghia Truong --- src/main/cpp/CMakeLists.txt | 3 +- src/main/cpp/src/JSONUtilsJni.cpp | 126 +- src/main/cpp/src/from_json_to_structs.cu | 1083 +++++++++++++++++ src/main/cpp/src/json_utils.cu | 40 +- src/main/cpp/src/json_utils.hpp | 52 +- .../nvidia/spark/rapids/jni/JSONUtils.java | 139 ++- 6 files changed, 1309 insertions(+), 134 deletions(-) create mode 100644 src/main/cpp/src/from_json_to_structs.cu diff --git a/src/main/cpp/CMakeLists.txt b/src/main/cpp/CMakeLists.txt index bfb9d55377..20a918266e 100644 --- a/src/main/cpp/CMakeLists.txt +++ b/src/main/cpp/CMakeLists.txt @@ -207,13 +207,14 @@ add_library( src/bloom_filter.cu src/case_when.cu src/cast_decimal_to_string.cu - src/format_float.cu src/cast_float_to_string.cu src/cast_string.cu src/cast_string_to_float.cu src/datetime_rebase.cu src/decimal_utils.cu + src/format_float.cu src/from_json_to_raw_map.cu + src/from_json_to_structs.cu src/get_json_object.cu src/histogram.cu src/json_utils.cu diff --git a/src/main/cpp/src/JSONUtilsJni.cpp b/src/main/cpp/src/JSONUtilsJni.cpp index 2e0f624a49..2c4b8e1aaa 100644 --- a/src/main/cpp/src/JSONUtilsJni.cpp +++ b/src/main/cpp/src/JSONUtilsJni.cpp @@ -166,50 +166,118 @@ JNIEXPORT jlong JNICALL Java_com_nvidia_spark_rapids_jni_JSONUtils_extractRawMap CATCH_STD(env, 0); } -JNIEXPORT jlongArray JNICALL Java_com_nvidia_spark_rapids_jni_JSONUtils_concatenateJsonStrings( - JNIEnv* env, jclass, jlong j_input) +JNIEXPORT jlong JNICALL +Java_com_nvidia_spark_rapids_jni_JSONUtils_fromJSONToStructs(JNIEnv* env, + jclass, + jlong j_input, + jobjectArray j_col_names, + jintArray j_num_children, + jintArray j_types, + jintArray j_scales, + jintArray j_precisions, + jboolean normalize_single_quotes, + jboolean allow_leading_zeros, + jboolean allow_nonnumeric_numbers, + jboolean allow_unquoted_control, + jboolean is_us_locale) { JNI_NULL_CHECK(env, j_input, "j_input is null", 0); + JNI_NULL_CHECK(env, j_col_names, "j_col_names is null", 0); + JNI_NULL_CHECK(env, j_num_children, "j_num_children is null", 0); + JNI_NULL_CHECK(env, j_types, "j_types is null", 0); + JNI_NULL_CHECK(env, j_scales, "j_scales is null", 0); + JNI_NULL_CHECK(env, j_precisions, "j_precisions is null", 0); try { cudf::jni::auto_set_device(env); - auto const input_cv = reinterpret_cast(j_input); - // Currently, set `nullify_invalid_rows = false` as `concatenateJsonStrings` is used only for - // `from_json` with struct schema. - auto [joined_strings, delimiter, should_be_nullify] = spark_rapids_jni::concat_json( - cudf::strings_column_view{*input_cv}, /*nullify_invalid_rows*/ false); - - // The output array contains 5 elements: - // [0]: address of the cudf::column object `is_valid` in host memory - // [1]: address of data buffer of the concatenated strings in device memory - // [2]: data length - // [3]: address of the rmm::device_buffer object (of the concatenated strings) in host memory - // [4]: delimiter char - auto out_handles = cudf::jni::native_jlongArray(env, 5); - out_handles[0] = reinterpret_cast(should_be_nullify.release()); - out_handles[1] = reinterpret_cast(joined_strings->data()); - out_handles[2] = static_cast(joined_strings->size()); - out_handles[3] = reinterpret_cast(joined_strings.release()); - out_handles[4] = static_cast(delimiter); - return out_handles.get_jArray(); + auto const input_cv = reinterpret_cast(j_input); + auto const col_names = cudf::jni::native_jstringArray(env, j_col_names).as_cpp_vector(); + auto const num_children = cudf::jni::native_jintArray(env, j_num_children).to_vector(); + auto const types = cudf::jni::native_jintArray(env, j_types).to_vector(); + auto const scales = cudf::jni::native_jintArray(env, j_scales).to_vector(); + auto const precisions = cudf::jni::native_jintArray(env, j_precisions).to_vector(); + + CUDF_EXPECTS(col_names.size() > 0, "Invalid schema data: col_names."); + CUDF_EXPECTS(col_names.size() == num_children.size(), "Invalid schema data: num_children."); + CUDF_EXPECTS(col_names.size() == types.size(), "Invalid schema data: types."); + CUDF_EXPECTS(col_names.size() == scales.size(), "Invalid schema data: scales."); + CUDF_EXPECTS(col_names.size() == precisions.size(), "Invalid schema data: precisions."); + + return cudf::jni::ptr_as_jlong( + spark_rapids_jni::from_json_to_structs(cudf::strings_column_view{*input_cv}, + col_names, + num_children, + types, + scales, + precisions, + normalize_single_quotes, + allow_leading_zeros, + allow_nonnumeric_numbers, + allow_unquoted_control, + is_us_locale) + .release()); } CATCH_STD(env, 0); } -JNIEXPORT jlong JNICALL Java_com_nvidia_spark_rapids_jni_JSONUtils_makeStructs( - JNIEnv* env, jclass, jlongArray j_children, jlong j_is_null) +JNIEXPORT jlong JNICALL +Java_com_nvidia_spark_rapids_jni_JSONUtils_convertFromStrings(JNIEnv* env, + jclass, + jlong j_input, + jintArray j_num_children, + jintArray j_types, + jintArray j_scales, + jintArray j_precisions, + jboolean allow_nonnumeric_numbers, + jboolean is_us_locale) { - JNI_NULL_CHECK(env, j_children, "j_children is null", 0); - JNI_NULL_CHECK(env, j_is_null, "j_is_null is null", 0); + JNI_NULL_CHECK(env, j_input, "j_input is null", 0); + JNI_NULL_CHECK(env, j_num_children, "j_num_children is null", 0); + JNI_NULL_CHECK(env, j_types, "j_types is null", 0); + JNI_NULL_CHECK(env, j_scales, "j_scales is null", 0); + JNI_NULL_CHECK(env, j_precisions, "j_precisions is null", 0); try { cudf::jni::auto_set_device(env); - auto const children = - cudf::jni::native_jpointerArray{env, j_children}.get_dereferenced(); - auto const is_null = *reinterpret_cast(j_is_null); - return cudf::jni::ptr_as_jlong(spark_rapids_jni::make_structs(children, is_null).release()); + + auto const input_cv = reinterpret_cast(j_input); + auto const num_children = cudf::jni::native_jintArray(env, j_num_children).to_vector(); + auto const types = cudf::jni::native_jintArray(env, j_types).to_vector(); + auto const scales = cudf::jni::native_jintArray(env, j_scales).to_vector(); + auto const precisions = cudf::jni::native_jintArray(env, j_precisions).to_vector(); + + CUDF_EXPECTS(num_children.size() > 0, "Invalid schema data: num_children."); + CUDF_EXPECTS(num_children.size() == types.size(), "Invalid schema data: types."); + CUDF_EXPECTS(num_children.size() == scales.size(), "Invalid schema data: scales."); + CUDF_EXPECTS(num_children.size() == precisions.size(), "Invalid schema data: precisions."); + + return cudf::jni::ptr_as_jlong( + spark_rapids_jni::convert_from_strings(cudf::strings_column_view{*input_cv}, + num_children, + types, + scales, + precisions, + allow_nonnumeric_numbers, + is_us_locale) + .release()); } CATCH_STD(env, 0); } + +JNIEXPORT jlong JNICALL Java_com_nvidia_spark_rapids_jni_JSONUtils_removeQuotes( + JNIEnv* env, jclass, jlong j_input, jboolean nullify_if_not_quoted) +{ + JNI_NULL_CHECK(env, j_input, "j_input is null", 0); + + try { + cudf::jni::auto_set_device(env); + auto const input_cv = reinterpret_cast(j_input); + return cudf::jni::ptr_as_jlong( + spark_rapids_jni::remove_quotes(cudf::strings_column_view{*input_cv}, nullify_if_not_quoted) + .release()); + } + CATCH_STD(env, 0); } + +} // extern "C" diff --git a/src/main/cpp/src/from_json_to_structs.cu b/src/main/cpp/src/from_json_to_structs.cu new file mode 100644 index 0000000000..ddfdcc4c4f --- /dev/null +++ b/src/main/cpp/src/from_json_to_structs.cu @@ -0,0 +1,1083 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "cast_string.hpp" +#include "json_utils.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace spark_rapids_jni { + +namespace detail { + +namespace { + +/** + * @brief The struct similar to `cudf::io::schema_element` with adding decimal precision and + * preserving column order. + */ +struct schema_element_with_precision { + cudf::data_type type; + int precision; + std::vector> child_types; +}; + +std::pair parse_schema_element( + std::size_t& index, + std::vector const& col_names, + std::vector const& num_children, + std::vector const& types, + std::vector const& scales, + std::vector const& precisions) +{ + // Get data for the current column. + auto const d_type = cudf::data_type{static_cast(types[index]), scales[index]}; + auto const precision = precisions[index]; + auto const col_num_children = num_children[index]; + index++; + + std::map children; + std::vector> children_with_precisions; + std::vector child_names(col_num_children); + + if (d_type.id() == cudf::type_id::STRUCT || d_type.id() == cudf::type_id::LIST) { + for (int i = 0; i < col_num_children; ++i) { + auto const& name = col_names[index]; + auto [child, child_with_precision] = + parse_schema_element(index, col_names, num_children, types, scales, precisions); + children.emplace(name, std::move(child)); + children_with_precisions.emplace_back(name, std::move(child_with_precision)); + child_names[i] = name; + } + } else { + CUDF_EXPECTS(col_num_children == 0, + "Found children for a non-nested type that should have none.", + std::invalid_argument); + } + + // Note that if the first schema element does not has type STRUCT/LIST then it always has type + // STRING, since we intentionally parse JSON into strings column for later post-processing. + auto const schema_dtype = + d_type.id() == cudf::type_id::STRUCT || d_type.id() == cudf::type_id::LIST + ? d_type + : cudf::data_type{cudf::type_id::STRING}; + return {cudf::io::schema_element{schema_dtype, std::move(children), {std::move(child_names)}}, + schema_element_with_precision{d_type, precision, std::move(children_with_precisions)}}; +} + +// Generate struct type schemas by traveling the schema data by depth-first search order. +// Two separate schemas is generated: +// - The first one is used as input to `cudf::read_json`, in which the data types of all columns +// are specified as STRING type. As such, the table returned by `cudf::read_json` will contain +// only strings columns or nested (LIST/STRUCT) columns. +// - The second schema contains decimal precision (if available) and preserves schema column types +// as well as the column order, used for converting from STRING type to the desired types for the +// final output. +std::pair generate_struct_schema( + std::vector const& col_names, + std::vector const& num_children, + std::vector const& types, + std::vector const& scales, + std::vector const& precisions) +{ + std::map schema_cols; + std::vector> schema_cols_with_precisions; + std::vector name_order; + + std::size_t index = 0; + while (index < types.size()) { + auto const& name = col_names[index]; + auto [child, child_with_precision] = + parse_schema_element(index, col_names, num_children, types, scales, precisions); + schema_cols.emplace(name, std::move(child)); + schema_cols_with_precisions.emplace_back(name, std::move(child_with_precision)); + name_order.push_back(name); + } + return { + cudf::io::schema_element{ + cudf::data_type{cudf::type_id::STRUCT}, std::move(schema_cols), {std::move(name_order)}}, + schema_element_with_precision{ + cudf::data_type{cudf::type_id::STRUCT}, -1, std::move(schema_cols_with_precisions)}}; +} + +using string_index_pair = thrust::pair; + +std::unique_ptr cast_strings_to_booleans(cudf::column_view const& input, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + CUDF_FUNC_RANGE(); + + auto const string_count = input.size(); + if (string_count == 0) { return cudf::make_empty_column(cudf::data_type{cudf::type_id::BOOL8}); } + + auto output = cudf::make_fixed_width_column( + cudf::data_type{cudf::type_id::BOOL8}, string_count, cudf::mask_state::UNALLOCATED, stream, mr); + auto validity = rmm::device_uvector(string_count, stream); + + auto const input_sv = cudf::strings_column_view{input}; + auto const offsets_it = + cudf::detail::offsetalator_factory::make_input_iterator(input_sv.offsets()); + auto const d_input_ptr = cudf::column_device_view::create(input, stream); + auto const is_valid_it = cudf::detail::make_validity_iterator(*d_input_ptr); + auto const output_it = thrust::make_zip_iterator( + thrust::make_tuple(output->mutable_view().begin(), validity.begin())); + thrust::tabulate( + rmm::exec_policy_nosync(stream), + output_it, + output_it + string_count, + [chars = input_sv.chars_begin(stream), offsets = offsets_it, is_valid = is_valid_it] __device__( + auto idx) -> thrust::tuple { + if (is_valid[idx]) { + auto const start_offset = offsets[idx]; + auto const end_offset = offsets[idx + 1]; + auto const size = end_offset - start_offset; + auto const str = chars + start_offset; + + if (size == 4 && str[0] == 't' && str[1] == 'r' && str[2] == 'u' && str[3] == 'e') { + return {true, true}; + } + if (size == 5 && str[0] == 'f' && str[1] == 'a' && str[2] == 'l' && str[3] == 's' && + str[4] == 'e') { + return {false, true}; + } + } + + // Either null input, or the input string is neither `true` nor `false`. + return {false, false}; + }); + + auto [null_mask, null_count] = + cudf::detail::valid_if(validity.begin(), validity.end(), thrust::identity{}, stream, mr); + output->set_null_mask(null_count > 0 ? std::move(null_mask) : rmm::device_buffer{0, stream, mr}, + null_count); + + return output; +} + +std::unique_ptr cast_strings_to_integers(cudf::column_view const& input, + cudf::data_type output_type, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + CUDF_FUNC_RANGE(); + + auto const string_count = input.size(); + if (string_count == 0) { return cudf::make_empty_column(output_type); } + + auto const input_sv = cudf::strings_column_view{input}; + auto const input_offsets_it = + cudf::detail::offsetalator_factory::make_input_iterator(input_sv.offsets()); + auto const d_input_ptr = cudf::column_device_view::create(input, stream); + auto const valid_input_it = cudf::detail::make_validity_iterator(*d_input_ptr); + + // We need to nullify the invalid string rows. + // Technically, we should just mask out these rows as nulls through the nullmask. + // These masked out non-empty nulls will be handled in the conversion API. + auto valids = rmm::device_uvector(string_count, stream); + + // Since the strings store integer numbers, they should be very short. + // As such, using one thread per string should be fine. + thrust::tabulate(rmm::exec_policy_nosync(stream), + valids.begin(), + valids.end(), + [chars = input_sv.chars_begin(stream), + offsets = input_offsets_it, + valid_input = valid_input_it] __device__(cudf::size_type idx) -> bool { + if (!valid_input[idx]) { return false; } + + auto in_ptr = chars + offsets[idx]; + auto const in_end = chars + offsets[idx + 1]; + while (in_ptr != in_end) { + if (*in_ptr == '.' || *in_ptr == 'e' || *in_ptr == 'E') { return false; } + ++in_ptr; + } + + return true; + }); + + auto const [null_mask, null_count] = + cudf::detail::valid_if(valids.begin(), + valids.end(), + thrust::identity{}, + stream, + cudf::get_current_device_resource_ref()); + // If the null count doesn't change, just use the input column for conversion. + auto const input_applied_null = + null_count == input.null_count() + ? cudf::column_view{} + : cudf::column_view{cudf::data_type{cudf::type_id::STRING}, + input_sv.size(), + input_sv.chars_begin(stream), + reinterpret_cast(null_mask.data()), + null_count, + input_sv.offset(), + std::vector{input_sv.offsets()}}; + + return spark_rapids_jni::string_to_integer( + output_type, + null_count == input.null_count() ? input_sv : cudf::strings_column_view{input_applied_null}, + /*ansi_mode*/ false, + /*strip*/ false, + stream, + mr); +} + +std::pair, bool> try_remove_quotes_for_floats( + cudf::column_view const& input, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) +{ + CUDF_FUNC_RANGE(); + + auto const string_count = input.size(); + if (string_count == 0) { return {nullptr, false}; } + + auto const input_sv = cudf::strings_column_view{input}; + auto const input_offsets_it = + cudf::detail::offsetalator_factory::make_input_iterator(input_sv.offsets()); + auto const d_input_ptr = cudf::column_device_view::create(input, stream); + auto const is_valid_it = cudf::detail::make_validity_iterator(*d_input_ptr); + + auto string_pairs = rmm::device_uvector(string_count, stream); + thrust::tabulate(rmm::exec_policy_nosync(stream), + string_pairs.begin(), + string_pairs.end(), + [chars = input_sv.chars_begin(stream), + offsets = input_offsets_it, + is_valid = is_valid_it] __device__(cudf::size_type idx) -> string_index_pair { + if (!is_valid[idx]) { return {nullptr, 0}; } + + auto const start_offset = offsets[idx]; + auto const end_offset = offsets[idx + 1]; + auto const size = end_offset - start_offset; + auto const str = chars + start_offset; + + // Need to check for size, since the input string may contain just a single + // character `"`. Such input should not be considered as quoted. + auto const is_quoted = size > 1 && str[0] == '"' && str[size - 1] == '"'; + + // We check and remove quotes only for the special cases (non-numeric numbers + // wrapped in double quotes) that are accepted in `from_json`. + // They are "NaN", "+INF", "-INF", "+Infinity", "Infinity", "-Infinity". + if (is_quoted) { + // "NaN" + auto accepted = size == 5 && str[1] == 'N' && str[2] == 'a' && str[3] == 'N'; + + // "+INF" and "-INF" + accepted = accepted || (size == 6 && (str[1] == '+' || str[1] == '-') && + str[2] == 'I' && str[3] == 'N' && str[4] == 'F'); + + // "Infinity" + accepted = accepted || (size == 10 && str[1] == 'I' && str[2] == 'n' && + str[3] == 'f' && str[4] == 'i' && str[5] == 'n' && + str[6] == 'i' && str[7] == 't' && str[8] == 'y'); + + // "+Infinity" and "-Infinity" + accepted = accepted || (size == 11 && (str[1] == '+' || str[1] == '-') && + str[2] == 'I' && str[3] == 'n' && str[4] == 'f' && + str[5] == 'i' && str[6] == 'n' && str[7] == 'i' && + str[8] == 't' && str[9] == 'y'); + + if (accepted) { return {str + 1, size - 2}; } + } + + return {str, size}; + }); + + auto const size_it = cudf::detail::make_counting_transform_iterator( + 0, + cuda::proclaim_return_type( + [string_pairs = string_pairs.begin()] __device__(cudf::size_type idx) -> cudf::size_type { + return string_pairs[idx].second; + })); + auto [offsets_column, bytes] = + cudf::strings::detail::make_offsets_child_column(size_it, size_it + string_count, stream, mr); + + // If the output has the same total bytes, the output should be the same as the input. + if (bytes == input_sv.chars_size(stream)) { return {nullptr, false}; } + + auto chars_data = cudf::strings::detail::make_chars_buffer( + offsets_column->view(), bytes, string_pairs.begin(), string_count, stream, mr); + + return {cudf::make_strings_column(string_count, + std::move(offsets_column), + chars_data.release(), + input.null_count(), + cudf::detail::copy_bitmask(input, stream, mr)), + true}; +} + +std::unique_ptr cast_strings_to_floats(cudf::column_view const& input, + cudf::data_type output_type, + bool allow_nonnumeric_numbers, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + CUDF_FUNC_RANGE(); + + auto const string_count = input.size(); + if (string_count == 0) { return cudf::make_empty_column(output_type); } + + if (allow_nonnumeric_numbers) { + // Non-numeric numbers are always quoted. + auto const [removed_quotes, success] = try_remove_quotes_for_floats(input, stream, mr); + return spark_rapids_jni::string_to_float( + output_type, + cudf::strings_column_view{success ? removed_quotes->view() : input}, + /*ansi_mode*/ false, + stream, + mr); + } + return spark_rapids_jni::string_to_float( + output_type, cudf::strings_column_view{input}, /*ansi_mode*/ false, stream, mr); +} + +// TODO there is a bug here around 0 https://github.com/NVIDIA/spark-rapids/issues/10898 +std::unique_ptr cast_strings_to_decimals(cudf::column_view const& input, + cudf::data_type output_type, + int precision, + bool is_us_locale, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + CUDF_FUNC_RANGE(); + + auto const string_count = input.size(); + if (string_count == 0) { return cudf::make_empty_column(output_type); } + + CUDF_EXPECTS(is_us_locale, "String to decimal conversion is only supported in US locale."); + + auto const input_sv = cudf::strings_column_view{input}; + auto const in_offsets = + cudf::detail::offsetalator_factory::make_input_iterator(input_sv.offsets()); + + // Count the number of characters `"`. + rmm::device_uvector quote_counts(string_count, stream); + // Count the number of characters `"` and `,` in each string. + rmm::device_uvector remove_counts(string_count, stream); + + { + using count_type = thrust::tuple; + auto const check_it = cudf::detail::make_counting_transform_iterator( + 0, + cuda::proclaim_return_type( + [chars = input_sv.chars_begin(stream)] __device__(auto idx) { + auto const c = chars[idx]; + auto const is_quote = c == '"'; + auto const should_remove = is_quote || c == ','; + return count_type{static_cast(is_quote), static_cast(should_remove)}; + })); + auto const plus_op = + cuda::proclaim_return_type([] __device__(count_type lhs, count_type rhs) { + return count_type{thrust::get<0>(lhs) + thrust::get<0>(rhs), + thrust::get<1>(lhs) + thrust::get<1>(rhs)}; + }); + + auto const out_count_it = + thrust::make_zip_iterator(quote_counts.begin(), remove_counts.begin()); + + std::size_t temp_storage_bytes = 0; + cub::DeviceSegmentedReduce::Reduce(nullptr, + temp_storage_bytes, + check_it, + out_count_it, + string_count, + in_offsets, + in_offsets + 1, + plus_op, + count_type{0, 0}, + stream.value()); + auto d_temp_storage = rmm::device_buffer{temp_storage_bytes, stream}; + cub::DeviceSegmentedReduce::Reduce(d_temp_storage.data(), + temp_storage_bytes, + check_it, + out_count_it, + string_count, + in_offsets, + in_offsets + 1, + plus_op, + count_type{0, 0}, + stream.value()); + } + + auto const out_size_it = cudf::detail::make_counting_transform_iterator( + 0, + cuda::proclaim_return_type( + [offsets = in_offsets, + quote_counts = quote_counts.begin(), + remove_counts = remove_counts.begin()] __device__(auto idx) { + auto const input_size = offsets[idx + 1] - offsets[idx]; + // If the current row is non-quoted, just return the original string. + // As such, non-quoted string containing `,` character will not be preprocessed. + if (quote_counts[idx] == 0) { return static_cast(input_size); } + + // For quoted strings, we will modify them, removing characters '"' and ','. + return static_cast(input_size - remove_counts[idx]); + })); + auto [offsets_column, bytes] = cudf::strings::detail::make_offsets_child_column( + out_size_it, out_size_it + string_count, stream, mr); + + // If the output strings column does not change in its total bytes, we can use the input directly. + if (bytes == input_sv.chars_size(stream)) { + return spark_rapids_jni::string_to_decimal(precision, + output_type.scale(), + input_sv, + /*ansi_mode*/ false, + /*strip*/ false, + stream, + mr); + } + + auto const out_offsets = + cudf::detail::offsetalator_factory::make_input_iterator(offsets_column->view()); + auto chars_data = rmm::device_uvector(bytes, stream, mr); + + // Since the strings store decimal numbers, they should not be very long. + // As such, using one thread per string should be fine. + thrust::for_each(rmm::exec_policy_nosync(stream), + thrust::make_counting_iterator(0), + thrust::make_counting_iterator(string_count), + [in_offsets, + out_offsets, + input = input_sv.chars_begin(stream), + output = chars_data.begin()] __device__(auto idx) { + auto const in_size = in_offsets[idx + 1] - in_offsets[idx]; + auto const out_size = out_offsets[idx + 1] - out_offsets[idx]; + if (in_size == 0) { return; } + + // If the output size is not changed, we are returning the original unquoted + // string. Such string may still contain other alphabet characters, but that + // should be handled in the conversion function later on. + if (in_size == out_size) { + memcpy(output + out_offsets[idx], input + in_offsets[idx], in_size); + } else { // copy byte by byte, ignoring '"' and ',' characters. + auto in_ptr = input + in_offsets[idx]; + auto in_end = input + in_offsets[idx + 1]; + auto out_ptr = output + out_offsets[idx]; + while (in_ptr != in_end) { + if (*in_ptr != '"' && *in_ptr != ',') { + *out_ptr = *in_ptr; + ++out_ptr; + } + ++in_ptr; + } + } + }); + + // Don't care about the null mask, as nulls imply empty strings, which will also result in nulls. + auto const unquoted_strings = + cudf::make_strings_column(string_count, std::move(offsets_column), chars_data.release(), 0, {}); + + return spark_rapids_jni::string_to_decimal(precision, + output_type.scale(), + cudf::strings_column_view{unquoted_strings->view()}, + /*ansi_mode*/ false, + /*strip*/ false, + stream, + mr); +} + +std::pair, bool> try_remove_quotes( + cudf::strings_column_view const& input, + bool nullify_if_not_quoted, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + CUDF_FUNC_RANGE(); + + auto const string_count = input.size(); + if (string_count == 0) { return {nullptr, false}; } + + auto const input_offsets_it = + cudf::detail::offsetalator_factory::make_input_iterator(input.offsets()); + auto const d_input_ptr = cudf::column_device_view::create(input.parent(), stream); + auto const is_valid_it = cudf::detail::make_validity_iterator(*d_input_ptr); + + auto string_pairs = rmm::device_uvector(string_count, stream); + thrust::tabulate(rmm::exec_policy_nosync(stream), + string_pairs.begin(), + string_pairs.end(), + [nullify_if_not_quoted, + chars = input.chars_begin(stream), + offsets = input_offsets_it, + is_valid = is_valid_it] __device__(cudf::size_type idx) -> string_index_pair { + if (!is_valid[idx]) { return {nullptr, 0}; } + + auto const start_offset = offsets[idx]; + auto const end_offset = offsets[idx + 1]; + auto const size = end_offset - start_offset; + auto const str = chars + start_offset; + + // Need to check for size, since the input string may contain just a single + // character `"`. Such input should not be considered as quoted. + auto const is_quoted = size > 1 && str[0] == '"' && str[size - 1] == '"'; + if (nullify_if_not_quoted && !is_quoted) { return {nullptr, 0}; } + + if (is_quoted) { return {chars + start_offset + 1, size - 2}; } + return {chars + start_offset, size}; + }); + + auto const size_it = cudf::detail::make_counting_transform_iterator( + 0, + cuda::proclaim_return_type( + [string_pairs = string_pairs.begin()] __device__(cudf::size_type idx) -> cudf::size_type { + return string_pairs[idx].second; + })); + auto [offsets_column, bytes] = + cudf::strings::detail::make_offsets_child_column(size_it, size_it + string_count, stream, mr); + + // If the output has the same total bytes, the output should be the same as the input. + if (bytes == input.chars_size(stream)) { return {nullptr, false}; } + + auto chars_data = cudf::strings::detail::make_chars_buffer( + offsets_column->view(), bytes, string_pairs.begin(), string_count, stream, mr); + + if (nullify_if_not_quoted) { + auto output = cudf::make_strings_column(string_count, + std::move(offsets_column), + chars_data.release(), + 0, + rmm::device_buffer{0, stream, mr}); + + auto [null_mask, null_count] = cudf::detail::valid_if( + string_pairs.begin(), + string_pairs.end(), + [] __device__(string_index_pair const& pair) { return pair.first != nullptr; }, + stream, + mr); + if (null_count > 0) { output->set_null_mask(std::move(null_mask), null_count); } + + return {std::move(output), true}; + } + + return {cudf::make_strings_column(string_count, + std::move(offsets_column), + chars_data.release(), + input.null_count(), + cudf::detail::copy_bitmask(input.parent(), stream, mr)), + true}; +} + +// Copied and modified from `cudf/cpp/src/io/json/parser_features.cpp`. +struct empty_column_functor { + rmm::cuda_stream_view stream; + rmm::device_async_resource_ref mr; + + template ())> + std::unique_ptr operator()(schema_element_with_precision const& schema) const + { + return cudf::make_empty_column(schema.type); + } + + template )> + std::unique_ptr operator()(schema_element_with_precision const& schema) const + { + CUDF_EXPECTS(schema.child_types.size() == 1, "Lists column should have only one child"); + auto offsets = cudf::make_empty_column(cudf::data_type(cudf::type_to_id())); + auto child = cudf::type_dispatcher( + schema.child_types.front().second.type, *this, schema.child_types.front().second); + return cudf::make_lists_column(0, std::move(offsets), std::move(child), 0, {}, stream, mr); + } + + template )> + std::unique_ptr operator()(schema_element_with_precision const& schema) const + { + std::vector> children; + for (auto const& [child_name, child_schema] : schema.child_types) { + children.emplace_back(cudf::type_dispatcher(child_schema.type, *this, child_schema)); + } + return cudf::make_structs_column(0, std::move(children), 0, {}, stream, mr); + } +}; + +// Copied and modified from `cudf/cpp/src/io/json/parser_features.cpp`. +struct allnull_column_functor { + rmm::cuda_stream_view stream; + rmm::device_async_resource_ref mr; + + private: + auto make_zeroed_offsets(cudf::size_type size) const + { + auto offsets_buff = + cudf::detail::make_zeroed_device_uvector_async(size + 1, stream, mr); + return std::make_unique(std::move(offsets_buff), rmm::device_buffer{}, 0); + } + + public: + template () && !std::is_same_v && + !std::is_same_v && + !std::is_same_v)> + std::unique_ptr operator()(Args...) const + { + CUDF_FAIL("Invalid type."); + } + + template ())> + std::unique_ptr operator()(schema_element_with_precision const& schema, + cudf::size_type size) const + { + return cudf::make_fixed_width_column(schema.type, size, cudf::mask_state::ALL_NULL, stream, mr); + } + + template )> + std::unique_ptr operator()(schema_element_with_precision const&, + cudf::size_type size) const + { + auto offsets = make_zeroed_offsets(size); + auto null_mask = cudf::detail::create_null_mask(size, cudf::mask_state::ALL_NULL, stream, mr); + return cudf::make_strings_column( + size, std::move(offsets), rmm::device_buffer{}, size, std::move(null_mask)); + } + + template )> + std::unique_ptr operator()(schema_element_with_precision const& schema, + cudf::size_type size) const + { + CUDF_EXPECTS(schema.child_types.size() == 1, "Lists column should have only one child"); + std::vector> children; + children.emplace_back(make_zeroed_offsets(size)); + children.emplace_back(cudf::type_dispatcher(schema.child_types.front().second.type, + empty_column_functor{stream, mr}, + schema.child_types.front().second)); + auto null_mask = cudf::detail::create_null_mask(size, cudf::mask_state::ALL_NULL, stream, mr); + // Do not use `cudf::make_lists_column` since we do not need to call `purge_nonempty_nulls` + // on the child column as it does not have non-empty nulls. + return std::make_unique(cudf::data_type{cudf::type_id::LIST}, + size, + rmm::device_buffer{}, + std::move(null_mask), + size, + std::move(children)); + } + + template )> + std::unique_ptr operator()(schema_element_with_precision const& schema, + cudf::size_type size) const + { + std::vector> children; + children.reserve(schema.child_types.size()); + for (auto const& [child_name, child_schema] : schema.child_types) { + children.emplace_back(cudf::type_dispatcher(child_schema.type, *this, child_schema, size)); + } + auto null_mask = cudf::detail::create_null_mask(size, cudf::mask_state::ALL_NULL, stream, mr); + // Do not use `cudf::make_structs_column` since we do not need to call `superimpose_nulls` + // on the children columns. + return std::make_unique(cudf::data_type{cudf::type_id::STRUCT}, + size, + rmm::device_buffer{}, + std::move(null_mask), + size, + std::move(children)); + } +}; + +// This is a workaround for https://github.com/rapidsai/cudf/issues/17167. +// When the issue is fixed, we should remove this utility and adopt it. +std::unique_ptr make_all_nulls_column(schema_element_with_precision const& schema, + cudf::size_type num_rows, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + return cudf::type_dispatcher(schema.type, allnull_column_functor{stream, mr}, schema, num_rows); +} + +template +std::unique_ptr convert_data_type(InputType&& input, + schema_element_with_precision const& schema, + bool allow_nonnumeric_numbers, + bool is_us_locale, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + CUDF_FUNC_RANGE(); + + using DecayInputT = std::decay_t; + auto constexpr input_is_const_cv = std::is_same_v; + auto constexpr input_is_column_ptr = std::is_same_v>; + static_assert(input_is_const_cv ^ input_is_column_ptr, + "Input to `convert_data_type` must either be `cudf::column_view const&` or " + "`std::unique_ptr`"); + + auto const [d_type, num_rows] = [&]() -> std::pair { + if constexpr (input_is_column_ptr) { + return {input->type().id(), input->size()}; + } else { + return {input.type().id(), input.size()}; + } + }(); + + if (d_type == cudf::type_id::STRING) { + if (cudf::is_chrono(schema.type)) { + // Date/time is not processed here - it should be handled separately in spark-rapids. + if constexpr (input_is_column_ptr) { + return std::move(input); + } else { + CUDF_FAIL("Cannot convert data type to a chrono (date/time) type."); + return nullptr; + } + } + + if (schema.type.id() == cudf::type_id::BOOL8) { + if constexpr (input_is_column_ptr) { + return cast_strings_to_booleans(input->view(), stream, mr); + } else { + return cast_strings_to_booleans(input, stream, mr); + } + } + + if (cudf::is_integral(schema.type)) { + if constexpr (input_is_column_ptr) { + return cast_strings_to_integers(input->view(), schema.type, stream, mr); + } else { + return cast_strings_to_integers(input, schema.type, stream, mr); + } + } + + if (cudf::is_floating_point(schema.type)) { + if constexpr (input_is_column_ptr) { + return cast_strings_to_floats( + input->view(), schema.type, allow_nonnumeric_numbers, stream, mr); + } else { + return cast_strings_to_floats(input, schema.type, allow_nonnumeric_numbers, stream, mr); + } + } + + if (cudf::is_fixed_point(schema.type)) { + if constexpr (input_is_column_ptr) { + return cast_strings_to_decimals( + input->view(), schema.type, schema.precision, is_us_locale, stream, mr); + } else { + return cast_strings_to_decimals( + input, schema.type, schema.precision, is_us_locale, stream, mr); + } + } + + if (schema.type.id() == cudf::type_id::STRING) { + if constexpr (input_is_column_ptr) { + auto [removed_quotes, success] = + try_remove_quotes(input->view(), /*nullify_if_not_quoted*/ false, stream, mr); + return std::move(success ? removed_quotes : input); + } else { + auto [removed_quotes, success] = + try_remove_quotes(input, /*nullify_if_not_quoted*/ false, stream, mr); + return success ? std::move(removed_quotes) + : std::make_unique(input, stream, mr); + } + } + + CUDF_FAIL("Unexpected column type for conversion."); + return nullptr; + } // d_type == cudf::type_id::STRING + + // From here, the input column should have type either LIST or STRUCT. + + // Handle mismatched schema. + if (schema.type.id() != d_type) { return make_all_nulls_column(schema, num_rows, stream, mr); } + + if constexpr (input_is_column_ptr) { + auto const null_count = input->null_count(); + auto const num_children = input->num_children(); + auto input_content = input->release(); + + if (schema.type.id() == cudf::type_id::LIST) { + auto const& child_schema = schema.child_types.front().second; + auto& child = input_content.children[cudf::lists_column_view::child_column_index]; + + // Handle mismatched child schema. + if (cudf::is_nested(child_schema.type) && (child_schema.type.id() != child->type().id())) { + return make_all_nulls_column(schema, num_rows, stream, mr); + } + + std::vector> new_children; + new_children.emplace_back( + std::move(input_content.children[cudf::lists_column_view::offsets_column_index])); + new_children.emplace_back(convert_data_type( + std::move(child), child_schema, allow_nonnumeric_numbers, is_us_locale, stream, mr)); + + // Do not use `cudf::make_lists_column` since we do not need to call `purge_nonempty_nulls` + // on the child column as it does not have non-empty nulls. + return std::make_unique(cudf::data_type{cudf::type_id::LIST}, + num_rows, + rmm::device_buffer{}, + std::move(*input_content.null_mask), + null_count, + std::move(new_children)); + } + + if (schema.type.id() == cudf::type_id::STRUCT) { + std::vector> new_children; + new_children.reserve(num_children); + for (cudf::size_type i = 0; i < num_children; ++i) { + new_children.emplace_back(convert_data_type(std::move(input_content.children[i]), + schema.child_types[i].second, + allow_nonnumeric_numbers, + is_us_locale, + stream, + mr)); + } + + // Do not use `cudf::make_structs_column` since we do not need to call `superimpose_nulls` + // on the children columns. + return std::make_unique(cudf::data_type{cudf::type_id::STRUCT}, + num_rows, + rmm::device_buffer{}, + std::move(*input_content.null_mask), + null_count, + std::move(new_children)); + } + } else { // input_is_const_cv + auto const null_count = input.null_count(); + auto const num_children = input.num_children(); + + if (schema.type.id() == cudf::type_id::LIST) { + auto const& child_schema = schema.child_types.front().second; + auto const child = input.child(cudf::lists_column_view::child_column_index); + + // Handle mismatched child schema. + if (cudf::is_nested(child_schema.type) && (child_schema.type.id() != child.type().id())) { + return make_all_nulls_column(schema, num_rows, stream, mr); + } + + std::vector> new_children; + new_children.emplace_back( + std::make_unique(input.child(cudf::lists_column_view::offsets_column_index))); + new_children.emplace_back( + convert_data_type(child, child_schema, allow_nonnumeric_numbers, is_us_locale, stream, mr)); + + // Do not use `cudf::make_lists_column` since we do not need to call `purge_nonempty_nulls` + // on the child column as it does not have non-empty nulls. + return std::make_unique(cudf::data_type{cudf::type_id::LIST}, + num_rows, + rmm::device_buffer{}, + cudf::detail::copy_bitmask(input, stream, mr), + null_count, + std::move(new_children)); + } + + if (schema.type.id() == cudf::type_id::STRUCT) { + std::vector> new_children; + new_children.reserve(num_children); + for (cudf::size_type i = 0; i < num_children; ++i) { + new_children.emplace_back(convert_data_type(input.child(i), + schema.child_types[i].second, + allow_nonnumeric_numbers, + is_us_locale, + stream, + mr)); + } + + // Do not use `cudf::make_structs_column` since we do not need to call `superimpose_nulls` + // on the children columns. + return std::make_unique(cudf::data_type{cudf::type_id::STRUCT}, + num_rows, + rmm::device_buffer{}, + cudf::detail::copy_bitmask(input, stream, mr), + null_count, + std::move(new_children)); + } + } + + CUDF_FAIL("Unexpected column type for conversion."); + return nullptr; +} + +std::unique_ptr from_json_to_structs(cudf::strings_column_view const& input, + std::vector const& col_names, + std::vector const& num_children, + std::vector const& types, + std::vector const& scales, + std::vector const& precisions, + bool normalize_single_quotes, + bool allow_leading_zeros, + bool allow_nonnumeric_numbers, + bool allow_unquoted_control, + bool is_us_locale, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + auto const [concat_input, delimiter, should_be_nullified] = + concat_json(input, false, stream, cudf::get_current_device_resource()); + auto const [schema, schema_with_precision] = + generate_struct_schema(col_names, num_children, types, scales, precisions); + + auto opts_builder = + cudf::io::json_reader_options::builder( + cudf::io::source_info{cudf::device_span{ + static_cast(concat_input->data()), concat_input->size()}}) + // fixed options + .lines(true) + .recovery_mode(cudf::io::json_recovery_mode_t::RECOVER_WITH_NULL) + .normalize_whitespace(true) + .mixed_types_as_string(true) + .keep_quotes(true) + .experimental(true) + .strict_validation(true) + // specifying parameters + .normalize_single_quotes(normalize_single_quotes) + .delimiter(delimiter) + .numeric_leading_zeros(allow_leading_zeros) + .nonnumeric_numbers(allow_nonnumeric_numbers) + .unquoted_control_chars(allow_unquoted_control) + .dtypes(schema) + .prune_columns(schema.child_types.size() != 0); + + auto const parsed_table_with_meta = cudf::io::read_json(opts_builder.build()); + auto const& parsed_meta = parsed_table_with_meta.metadata; + auto parsed_columns = parsed_table_with_meta.tbl->release(); + + CUDF_EXPECTS(parsed_columns.size() == schema.child_types.size(), + "Numbers of output columns is different from schema size."); + + std::vector> converted_cols; + converted_cols.reserve(parsed_columns.size()); + for (std::size_t i = 0; i < parsed_columns.size(); ++i) { + auto const d_type = parsed_columns[i]->type().id(); + CUDF_EXPECTS(d_type == cudf::type_id::LIST || d_type == cudf::type_id::STRUCT || + d_type == cudf::type_id::STRING, + "Parsed JSON columns should be STRING or nested."); + + auto const& [col_name, col_schema] = schema_with_precision.child_types[i]; + CUDF_EXPECTS(parsed_meta.schema_info[i].name == col_name, "Mismatched column name."); + converted_cols.emplace_back(convert_data_type(std::move(parsed_columns[i]), + col_schema, + allow_nonnumeric_numbers, + is_us_locale, + stream, + mr)); + } + + auto const valid_it = should_be_nullified->view().begin(); + auto [null_mask, null_count] = cudf::detail::valid_if( + valid_it, valid_it + should_be_nullified->size(), thrust::logical_not{}, stream, mr); + + // Do not use `cudf::make_structs_column` since we do not need to call `superimpose_nulls` + // on the children columns. + return std::make_unique( + cudf::data_type{cudf::type_id::STRUCT}, + input.size(), + rmm::device_buffer{}, + null_count > 0 ? std::move(null_mask) : rmm::device_buffer{0, stream, mr}, + null_count, + std::move(converted_cols)); +} + +} // namespace + +} // namespace detail + +std::unique_ptr from_json_to_structs(cudf::strings_column_view const& input, + std::vector const& col_names, + std::vector const& num_children, + std::vector const& types, + std::vector const& scales, + std::vector const& precisions, + bool normalize_single_quotes, + bool allow_leading_zeros, + bool allow_nonnumeric_numbers, + bool allow_unquoted_control, + bool is_us_locale, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + CUDF_FUNC_RANGE(); + + return detail::from_json_to_structs(input, + col_names, + num_children, + types, + scales, + precisions, + normalize_single_quotes, + allow_leading_zeros, + allow_nonnumeric_numbers, + allow_unquoted_control, + is_us_locale, + stream, + mr); +} + +std::unique_ptr convert_from_strings(cudf::strings_column_view const& input, + std::vector const& num_children, + std::vector const& types, + std::vector const& scales, + std::vector const& precisions, + bool allow_nonnumeric_numbers, + bool is_us_locale, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + CUDF_FUNC_RANGE(); + + [[maybe_unused]] auto const [schema, schema_with_precision] = detail::generate_struct_schema( + /*dummy col_names*/ std::vector(num_children.size(), std::string{}), + num_children, + types, + scales, + precisions); + CUDF_EXPECTS(schema_with_precision.child_types.size() == 1, + "The input schema to convert must have exactly one column."); + + auto const input_cv = input.parent(); + return detail::convert_data_type(input_cv, + schema_with_precision.child_types.front().second, + allow_nonnumeric_numbers, + is_us_locale, + stream, + mr); +} + +std::unique_ptr remove_quotes(cudf::strings_column_view const& input, + bool nullify_if_not_quoted, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + CUDF_FUNC_RANGE(); + + auto const input_cv = input.parent(); + auto [removed_quotes, success] = + detail::try_remove_quotes(input_cv, nullify_if_not_quoted, stream, mr); + return success ? std::move(removed_quotes) : std::make_unique(input_cv, stream, mr); +} + +} // namespace spark_rapids_jni diff --git a/src/main/cpp/src/json_utils.cu b/src/main/cpp/src/json_utils.cu index db0606c38c..ef3d0db0f8 100644 --- a/src/main/cpp/src/json_utils.cu +++ b/src/main/cpp/src/json_utils.cu @@ -190,8 +190,7 @@ std::tuple, char, std::unique_ptr, char, std::unique_ptr(null_mask.data()), null_count, - 0, + input.offset(), std::vector{input.offsets()}}; auto concat_strings = cudf::strings::detail::join_strings( @@ -215,32 +214,6 @@ std::tuple, char, std::unique_ptr(std::move(should_be_nullified), rmm::device_buffer{}, 0)}; } -std::unique_ptr make_structs(std::vector const& children, - cudf::column_view const& is_null, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) -{ - if (children.size() == 0) { return nullptr; } - - auto const row_count = children.front().size(); - for (auto const& col : children) { - CUDF_EXPECTS(col.size() == row_count, "All columns must have the same number of rows."); - } - - auto const [null_mask, null_count] = cudf::detail::valid_if( - is_null.begin(), is_null.end(), thrust::logical_not{}, stream, mr); - - auto const structs = - cudf::column_view(cudf::data_type{cudf::type_id::STRUCT}, - row_count, - nullptr, - reinterpret_cast(null_mask.data()), - null_count, - 0, - children); - return std::make_unique(structs, stream, mr); -} - } // namespace detail std::tuple, char, std::unique_ptr> concat_json( @@ -253,13 +226,4 @@ std::tuple, char, std::unique_ptr make_structs(std::vector const& children, - cudf::column_view const& is_null, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) -{ - CUDF_FUNC_RANGE(); - return detail::make_structs(children, is_null, stream, mr); -} - } // namespace spark_rapids_jni diff --git a/src/main/cpp/src/json_utils.hpp b/src/main/cpp/src/json_utils.hpp index 9fab3047cb..319c81103d 100644 --- a/src/main/cpp/src/json_utils.hpp +++ b/src/main/cpp/src/json_utils.hpp @@ -22,12 +22,14 @@ #include #include -#include #include namespace spark_rapids_jni { +/** + * @brief Extract a map column from the JSON strings given by an input strings column. + */ std::unique_ptr from_json_to_raw_map( cudf::strings_column_view const& input, bool normalize_single_quotes, @@ -37,9 +39,51 @@ std::unique_ptr from_json_to_raw_map( rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = cudf::get_current_device_resource()); -std::unique_ptr make_structs( - std::vector const& input, - cudf::column_view const& is_null, +/** + * @brief Parse JSON strings into a struct column followed by a given data schema. + * + * The data schema is specified as data arrays flattened by depth-first-search order. + */ +std::unique_ptr from_json_to_structs( + cudf::strings_column_view const& input, + std::vector const& col_names, + std::vector const& num_children, + std::vector const& types, + std::vector const& scales, + std::vector const& precisions, + bool normalize_single_quotes, + bool allow_leading_zeros, + bool allow_nonnumeric_numbers, + bool allow_unquoted_control, + bool is_us_locale, + rmm::cuda_stream_view stream = cudf::get_default_stream(), + rmm::device_async_resource_ref mr = cudf::get_current_device_resource()); + +/** + * @brief Convert from a strings column to a column with the desired type given by a data schema. + * + * The given column schema is specified as data arrays flattened by depth-first-search order. + */ +std::unique_ptr convert_from_strings( + cudf::strings_column_view const& input, + std::vector const& num_children, + std::vector const& types, + std::vector const& scales, + std::vector const& precisions, + bool allow_nonnumeric_numbers, + bool is_us_locale, + rmm::cuda_stream_view stream = cudf::get_default_stream(), + rmm::device_async_resource_ref mr = cudf::get_current_device_resource()); + +/** + * @brief Remove quotes from each string in the given strings column. + * + * If `nullify_if_not_quoted` is true, an input string that is not quoted will result in a null. + * Otherwise, the output will be the same as the unquoted input. + */ +std::unique_ptr remove_quotes( + cudf::strings_column_view const& input, + bool nullify_if_not_quoted, rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = cudf::get_current_device_resource()); diff --git a/src/main/java/com/nvidia/spark/rapids/jni/JSONUtils.java b/src/main/java/com/nvidia/spark/rapids/jni/JSONUtils.java index c58caa62e9..9cf00acff2 100644 --- a/src/main/java/com/nvidia/spark/rapids/jni/JSONUtils.java +++ b/src/main/java/com/nvidia/spark/rapids/jni/JSONUtils.java @@ -166,80 +166,78 @@ public static ColumnVector extractRawMapFromJsonString(ColumnView input, JSONOpt } /** - * Extract key-value pairs for each output map from the given json strings. This method is - * similar to {@link #extractRawMapFromJsonString(ColumnView, JSONOptions)} but is deprecated. - * - * @deprecated This method is deprecated since it does not have parameters to control various - * JSON reader behaviors. + * Parse a JSON string into a struct column following by the given data schema. + *

+ * Many JSON options in the given {@code opts} parameter are ignored from passing down to the + * native code. That is because these options are hard-coded with the same values in both the + * plugin code and native code. Specifically:
+ * - {@code RecoverWithNull: true}
+ * - {@code MixedTypesAsStrings: true}
+ * - {@code NormalizeWhitespace: true}
+ * - {@code KeepQuotes: true}
+ * - {@code StrictValidation: true}
+ * - {@code Experimental: true} * * @param input The input strings column in which each row specifies a json object - * @return A map column (i.e., a column of type {@code List>}) in - * which the key-value pairs are extracted directly from the input json strings + * @param schema The schema of the output struct column + * @param opts The options for parsing JSON strings + * @param isUSLocale Whether the current local is US locale, used when converting strings to + * decimal types + * @return A struct column in which each row is parsed from the corresponding json string */ - public static ColumnVector extractRawMapFromJsonString(ColumnView input) { + public static ColumnVector fromJSONToStructs(ColumnView input, Schema schema, JSONOptions opts, + boolean isUSLocale) { assert (input.getType().equals(DType.STRING)) : "Input must be of STRING type"; - return new ColumnVector(extractRawMapFromJsonString(input.getNativeView(), - true, true, true, true)); - } - - /** - * A class to hold the result when concatenating JSON strings. - *

- * A long with the concatenated data, the result also contains a vector that indicates - * whether each row in the input is null or empty, and the delimiter used for concatenation. - */ - public static class ConcatenatedJson implements AutoCloseable { - public final ColumnVector isNullOrEmpty; - public final DeviceMemoryBuffer data; - public final char delimiter; - - public ConcatenatedJson(ColumnVector isNullOrEmpty, DeviceMemoryBuffer data, char delimiter) { - this.isNullOrEmpty = isNullOrEmpty; - this.data = data; - this.delimiter = delimiter; - } - - @Override - public void close() { - isNullOrEmpty.close(); - data.close(); - } + return new ColumnVector(fromJSONToStructs(input.getNativeView(), + schema.getFlattenedColumnNames(), + schema.getFlattenedNumChildren(), + schema.getFlattenedTypeIds(), + schema.getFlattenedTypeScales(), + schema.getFlattenedDecimalPrecisions(), + opts.isNormalizeSingleQuotes(), + opts.leadingZerosAllowed(), + opts.nonNumericNumbersAllowed(), + opts.unquotedControlChars(), + isUSLocale)); } /** - * Concatenate JSON strings in the input column into a single JSON string. - *

- * During concatenation, the function also generates a boolean vector that indicates whether - * each row in the input is null or empty. The delimiter used for concatenation is also returned. + * Convert from a strings column to a column with the desired type given by a data schema. * - * @param input The input strings column to concatenate - * @return A {@link ConcatenatedJson} object that contains the concatenated output + * @param input The input strings column + * @param schema The schema of the output column + * @param allowedNonNumericNumbers Whether non-numeric numbers are allowed, used when converting + * strings to float types + * @param isUSLocale Whether the current local is US locale, used when converting strings to + * decimal types + * @return A column with the desired data type */ - public static ConcatenatedJson concatenateJsonStrings(ColumnView input) { + public static ColumnVector convertFromStrings(ColumnView input, Schema schema, + boolean allowedNonNumericNumbers, + boolean isUSLocale) { assert (input.getType().equals(DType.STRING)) : "Input must be of STRING type"; - long[] concatenated = concatenateJsonStrings(input.getNativeView()); - return new ConcatenatedJson(new ColumnVector(concatenated[0]), - DeviceMemoryBuffer.fromRmm(concatenated[1], concatenated[2], concatenated[3]), - (char) concatenated[4]); + return new ColumnVector(convertFromStrings(input.getNativeView(), + schema.getFlattenedNumChildren(), + schema.getFlattenedTypeIds(), + schema.getFlattenedTypeScales(), + schema.getFlattenedDecimalPrecisions(), + allowedNonNumericNumbers, + isUSLocale)); } /** - * Create a structs column from the given children columns and a boolean column specifying - * the rows at which the output column.should be null. - *

- * Note that the children columns are expected to have null rows at the same positions indicated - * by the input isNull column. + * Remove quotes from each string in the given strings column. + *

+ * If `nullifyIfNotQuoted` is true, an input string that is not quoted will result in a null. + * Otherwise, the output will be the same as the unquoted input. * - * @param children The children columns of the output structs column - * @param isNull A boolean column specifying the rows at which the output column should be null - * @return A structs column created from the given children and the isNull column + * @param input The input strings column + * @param nullifyIfNotQuoted Whether to output a null row if the input string is not quoted + * @return A strings column in which quotes are removed from all strings */ - public static ColumnVector makeStructs(ColumnView[] children, ColumnView isNull) { - long[] handles = new long[children.length]; - for (int i = 0; i < children.length; i++) { - handles[i] = children[i].getNativeView(); - } - return new ColumnVector(makeStructs(handles, isNull.getNativeView())); + public static ColumnVector removeQuotes(ColumnView input, boolean nullifyIfNotQuoted) { + assert (input.getType().equals(DType.STRING)) : "Input must be of STRING type"; + return new ColumnVector(removeQuotes(input.getNativeView(), nullifyIfNotQuoted)); } private static native int getMaxJSONPathDepth(); @@ -257,14 +255,31 @@ private static native long[] getJsonObjectMultiplePaths(long input, long memoryBudgetBytes, int parallelOverride); - private static native long extractRawMapFromJsonString(long input, boolean normalizeSingleQuotes, boolean leadingZerosAllowed, boolean nonNumericNumbersAllowed, boolean unquotedControlChars); - private static native long[] concatenateJsonStrings(long input); + private static native long fromJSONToStructs(long input, + String[] names, + int[] numChildren, + int[] typeIds, + int[] typeScales, + int[] typePrecision, + boolean normalizeSingleQuotes, + boolean leadingZerosAllowed, + boolean nonNumericNumbersAllowed, + boolean unquotedControlChars, + boolean isUSLocale); + + private static native long convertFromStrings(long input, + int[] numChildren, + int[] typeIds, + int[] typeScales, + int[] typePrecision, + boolean nonNumericNumbersAllowed, + boolean isUSLocale); - private static native long makeStructs(long[] children, long isNull); + private static native long removeQuotes(long input, boolean nullifyIfNotQuoted); }