Skip to content

Commit

Permalink
utf8 field name support (experimental)
Browse files Browse the repository at this point in the history
  • Loading branch information
karthikeyann committed Sep 24, 2024
1 parent 7ec6ba1 commit 6f8a4e2
Show file tree
Hide file tree
Showing 5 changed files with 173 additions and 9 deletions.
1 change: 1 addition & 0 deletions cpp/src/io/json/json_column.cu
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,7 @@ table_with_metadata device_parse_nested_json(device_span<SymbolT const> d_input,
gpu_tree,
is_array_of_arrays,
options.is_enabled_lines(),
options.is_enabled_experimental(),
stream,
cudf::get_current_device_resource_ref());

Expand Down
152 changes: 143 additions & 9 deletions cpp/src/io/json/json_tree.cu
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,18 @@
* limitations under the License.
*/

#include "io/utilities/hostdevice_vector.hpp"
#include "io/utilities/parsing_utils.cuh"
#include "io/utilities/string_parsing.hpp"
#include "nested_json.hpp"

#include <cudf/detail/cuco_helpers.hpp>
#include <cudf/detail/nvtx/ranges.hpp>
#include <cudf/detail/scatter.cuh>
#include <cudf/detail/utilities/algorithm.cuh>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/hashing/detail/default_hash.cuh>
#include <cudf/hashing/detail/hashing.hpp>
#include <cudf/hashing/detail/helper_functions.cuh>
#include <cudf/strings/strings_column_view.hpp>
#include <cudf/utilities/error.hpp>
#include <cudf/utilities/memory_resource.hpp>
#include <cudf/utilities/span.hpp>
Expand All @@ -34,12 +35,14 @@
#include <rmm/exec_policy.hpp>

#include <cub/device/device_radix_sort.cuh>
#include <cuco/static_map.cuh>
#include <cuco/static_set.cuh>
#include <cuda/functional>
#include <thrust/binary_search.h>
#include <thrust/copy.h>
#include <thrust/count.h>
#include <thrust/fill.h>
#include <thrust/functional.h>
#include <thrust/gather.h>
#include <thrust/iterator/counting_iterator.h>
#include <thrust/iterator/discard_iterator.h>
Expand Down Expand Up @@ -492,6 +495,84 @@ tree_meta_t get_tree_representation(device_span<PdaTokenT const> tokens,
std::move(node_range_end)};
}

/**
 * @brief Groups field nodes whose names are identical once decoded.
 *
 * The input byte range of every node in `keys` is run through `parse_data` as a STRING column,
 * and the resulting strings are deduplicated with a hash set whose elements are positions in
 * `keys` (hashing and equality look at the decoded string stored at that position).
 *
 * @param d_input JSON string in device memory
 * @param d_tree Tree representation of the JSON
 * @param keys Node ids of the field nodes to decode
 * @param stream CUDA stream used for device memory operations and kernel launches.
 * @return The size of the hash set after insertion (number of distinct decoded names), and a
 * vector with one entry per key holding the element `insert_and_find_async` reported for that
 * position
 */
std::pair<size_t, rmm::device_uvector<size_type>> remapped_field_nodes_after_unicode_decode(
  device_span<SymbolT const> d_input,
  tree_meta_t const& d_tree,
  device_span<size_type const> keys,
  rmm::cuda_stream_view stream)
{
  size_t key_count = keys.size();
  // Nothing to decode: report zero unique names and an empty mapping.
  if (key_count == 0) { return {key_count, rmm::device_uvector<size_type>(key_count, stream)}; }

  // Gather the (offset, length) of each field name inside d_input.
  rmm::device_uvector<size_type> field_offsets(key_count, stream);
  rmm::device_uvector<size_type> field_lengths(key_count, stream);
  auto offset_length_it = thrust::make_zip_iterator(field_offsets.begin(), field_lengths.begin());
  thrust::transform(rmm::exec_policy_nosync(stream),
                    keys.begin(),
                    keys.end(),
                    offset_length_it,
                    [range_begin = d_tree.node_range_begin.data(),
                     range_end   = d_tree.node_range_end.data()] __device__(auto node_id) {
                      return thrust::make_tuple(range_begin[node_id],
                                                range_end[node_id] - range_begin[node_id]);
                    });

  // Parse the field names as a strings column.
  // NOTE(review): keepquotes is presumably set because the field-name ranges carry no surrounding
  // quotes for the parser to strip - confirm against parse_data.
  cudf::io::parse_options_view parse_opts{',', '\n', '\0', '.'};
  parse_opts.keepquotes = true;

  auto decoded_names = parse_data(d_input.data(),
                                  offset_length_it,
                                  key_count,
                                  data_type{type_id::STRING},
                                  rmm::device_buffer{},
                                  0,
                                  parse_opts,
                                  stream,
                                  cudf::get_current_device_resource_ref());

  auto names_view           = strings_column_view{decoded_names->view()};
  auto const name_chars     = names_view.chars_begin(stream);
  auto const name_offsets   = names_view.offsets().begin<size_type>();

  // Hash of the decoded string stored at position `idx`.
  auto const name_hasher = cuda::proclaim_return_type<
    typename cudf::hashing::detail::default_hash<cudf::string_view>::result_type>(
    [name_chars, name_offsets] __device__(auto idx) {
      auto const name = cudf::string_view(name_chars + name_offsets[idx],
                                          name_offsets[idx + 1] - name_offsets[idx]);
      return cudf::hashing::detail::default_hash<cudf::string_view>{}(name);
    });
  // Two positions are equal when their decoded strings compare equal.
  auto const name_equal = [name_chars, name_offsets] __device__(auto lhs_idx, auto rhs_idx) {
    auto const lhs_name = cudf::string_view(name_chars + name_offsets[lhs_idx],
                                            name_offsets[lhs_idx + 1] - name_offsets[lhs_idx]);
    auto const rhs_name = cudf::string_view(name_chars + name_offsets[rhs_idx],
                                            name_offsets[rhs_idx + 1] - name_offsets[rhs_idx]);
    return lhs_name == rhs_name;
  };

  using name_hasher_type                   = decltype(name_hasher);
  constexpr size_type empty_slot_sentinel  = -1;
  auto decoded_name_set                    = cuco::static_set{
    cuco::extent{compute_hash_table_size(key_count, 100)},  // 100% occupancy
    cuco::empty_key{empty_slot_sentinel},
    name_equal,
    cuco::linear_probing<1, name_hasher_type>{name_hasher},
    {},
    {},
    cudf::detail::cuco_allocator<char>{rmm::mr::polymorphic_allocator<char>{}, stream},
    stream.value()};

  // Insert positions [0, key_count); the element found for each position goes to found_keys,
  // the "was inserted" flags are discarded.
  auto const first_index = thrust::make_counting_iterator<size_type>(0);
  rmm::device_uvector<size_type> found_keys(key_count, stream);
  decoded_name_set.insert_and_find_async(first_index,
                                         first_index + key_count,
                                         found_keys.begin(),
                                         thrust::make_discard_iterator(),
                                         stream.value());
  return {decoded_name_set.size(stream), std::move(found_keys)};
}

/**
* @brief Generates unique node_type id for each node.
* Field nodes with the same name are assigned the same node_type id.
Expand All @@ -500,11 +581,14 @@ tree_meta_t get_tree_representation(device_span<PdaTokenT const> tokens,
* All inputs and outputs are in node_id order.
* @param d_input JSON string in device memory
* @param d_tree Tree representation of the JSON
* @param is_enabled_experimental Whether to enable experimental features such as
* utf8 field name support
* @param stream CUDA stream used for device memory operations and kernel launches.
* @return Vector of node_type ids
*/
rmm::device_uvector<size_type> hash_node_type_with_field_name(device_span<SymbolT const> d_input,
tree_meta_t const& d_tree,
bool is_enabled_experimental,
rmm::cuda_stream_view stream)
{
CUDF_FUNC_RANGE();
Expand Down Expand Up @@ -536,7 +620,7 @@ rmm::device_uvector<size_type> hash_node_type_with_field_name(device_span<Symbol
};
// key-value pairs: uses node_id itself as node_type. (unique node_id for a field name due to
// hashing)
auto const iter = thrust::make_counting_iterator<size_type>(0);
auto const counting_iter = thrust::make_counting_iterator<size_type>(0);

auto const is_field_name_node = [node_categories =
d_tree.node_categories.data()] __device__(auto node_id) {
Expand All @@ -554,15 +638,61 @@ rmm::device_uvector<size_type> hash_node_type_with_field_name(device_span<Symbol
{},
cudf::detail::cuco_allocator<char>{rmm::mr::polymorphic_allocator<char>{}, stream},
stream.value()};
key_set.insert_if_async(iter,
iter + num_nodes,
key_set.insert_if_async(counting_iter,
counting_iter + num_nodes,
thrust::counting_iterator<size_type>(0), // stencil
is_field_name_node,
stream.value());

// Experimental feature: utf8 field name support.
// Steps: run parse_data on the unique field names, rehash the decoded names in a second set,
// and build a map from each original field node id to the node id it matched after decoding.
// Returns {whether remapping is needed, node id -> matched node id map}; the map is an empty
// placeholder when remapping is not needed.
auto get_utf8_matched_field_nodes = [&]() {
  // Builds a size_type -> size_type map with room for `num_keys` entries, allocated on `stream`.
  auto make_map = [&stream](auto num_keys) {
    using hasher_type3 = cudf::hashing::detail::default_hash<size_type>;
    return cuco::static_map{
      cuco::extent{compute_hash_table_size(num_keys, 100)},  // 100% occupancy
      cuco::empty_key{empty_node_index_sentinel},
      cuco::empty_value{empty_node_index_sentinel},
      {},
      cuco::linear_probing<1, hasher_type3>{hasher_type3{}},
      {},
      {},
      cudf::detail::cuco_allocator<char>{rmm::mr::polymorphic_allocator<char>{}, stream},
      stream.value()};
  };
  if (!is_enabled_experimental) { return std::pair{false, make_map(0)}; }
  // Get all unique field node ids for utf8 decoding.
  // The count must be taken on `stream`: the inserts into key_set were enqueued asynchronously
  // on that stream, so querying the size on a different stream is not ordered after them.
  auto num_keys = key_set.size(stream);
  rmm::device_uvector<size_type> keys(num_keys, stream);
  key_set.retrieve_all(keys.data(), stream.value());

  auto [num_unique_fields, found_keys] =
    remapped_field_nodes_after_unicode_decode(d_input, d_tree, keys, stream);

  // If decoding did not merge any names, the existing node ids are already final.
  auto is_need_remap = num_unique_fields != num_keys;
  if (!is_need_remap) { return std::pair{false, make_map(0)}; }

  // store to static_map with keys as field keys[index], and values as keys[found_keys[index]]
  auto reverse_map        = make_map(num_keys);
  auto matching_keys_iter = thrust::make_permutation_iterator(keys.begin(), found_keys.begin());
  auto pair_iter =
    thrust::make_zip_iterator(thrust::make_tuple(keys.begin(), matching_keys_iter));
  reverse_map.insert_async(pair_iter, pair_iter + num_keys, stream.value());
  return std::pair{is_need_remap, std::move(reverse_map)};
};
auto [is_need_remap, reverse_map] = get_utf8_matched_field_nodes();

auto const get_hash_value =
[key_set = key_set.ref(cuco::op::find)] __device__(auto node_id) -> size_type {
[key_set = key_set.ref(cuco::op::find),
is_need_remap = is_need_remap,
rm = reverse_map.ref(cuco::op::find)] __device__(auto node_id) -> size_type {
auto const it = key_set.find(node_id);
if (it != key_set.end() and is_need_remap) {
auto const it2 = rm.find(*it);
return (it2 == rm.end()) ? size_type{0} : it2->second;
}
return (it == key_set.end()) ? size_type{0} : *it;
};

Expand Down Expand Up @@ -771,6 +901,8 @@ std::pair<rmm::device_uvector<size_type>, rmm::device_uvector<size_type>> hash_n
* @param d_tree Tree representation of the JSON
* @param is_array_of_arrays Whether the tree is an array of arrays
* @param is_enabled_lines Whether the input is a line-delimited JSON
* @param is_enabled_experimental Whether to enable experimental features such as
* utf8 field name support
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned column's device memory
* @return column_id, parent_column_id
Expand All @@ -780,6 +912,7 @@ std::pair<rmm::device_uvector<NodeIndexT>, rmm::device_uvector<NodeIndexT>> gene
tree_meta_t const& d_tree,
bool is_array_of_arrays,
bool is_enabled_lines,
bool is_enabled_experimental,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
Expand All @@ -793,7 +926,7 @@ std::pair<rmm::device_uvector<NodeIndexT>, rmm::device_uvector<NodeIndexT>> gene
auto [col_id, unique_keys] = [&]() {
// Convert node_category + field_name to node_type.
rmm::device_uvector<size_type> node_type =
hash_node_type_with_field_name(d_input, d_tree, stream);
hash_node_type_with_field_name(d_input, d_tree, is_enabled_experimental, stream);

// hash entire path from node to root.
return hash_node_path(d_tree.node_levels,
Expand Down Expand Up @@ -948,12 +1081,13 @@ records_orient_tree_traversal(device_span<SymbolT const> d_input,
tree_meta_t const& d_tree,
bool is_array_of_arrays,
bool is_enabled_lines,
bool is_enabled_experimental,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
CUDF_FUNC_RANGE();
auto [new_col_id, new_parent_col_id] =
generate_column_id(d_input, d_tree, is_array_of_arrays, is_enabled_lines, stream, mr);
auto [new_col_id, new_parent_col_id] = generate_column_id(
d_input, d_tree, is_array_of_arrays, is_enabled_lines, is_enabled_experimental, stream, mr);

auto row_offsets = compute_row_offsets(
std::move(new_parent_col_id), d_tree, is_array_of_arrays, is_enabled_lines, stream, mr);
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/io/json/nested_json.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,8 @@ tree_meta_t get_tree_representation(device_span<PdaTokenT const> tokens,
* index, level, begin index, and end index in the input JSON string
* @param is_array_of_arrays Whether the tree is an array of arrays
* @param is_enabled_lines Whether the input is a line-delimited JSON
* @param is_enabled_experimental Whether to enable experimental features such as utf-8 field name
* support
* @param stream The CUDA stream to which kernels are dispatched
* @param mr Optional, resource with which to allocate
* @return A tuple of the output column indices and the row offsets within each column for each node
Expand All @@ -277,6 +279,7 @@ records_orient_tree_traversal(device_span<SymbolT const> d_input,
tree_meta_t const& d_tree,
bool is_array_of_arrays,
bool is_enabled_lines,
bool is_enabled_experimental,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr);

Expand Down
25 changes: 25 additions & 0 deletions cpp/tests/io/json/json_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2884,6 +2884,31 @@ TEST_F(JsonReaderTest, MixedTypesWithSchema)
EXPECT_EQ(result.tbl->get_column(0).child(1).type().id(), cudf::type_id::STRING);
}

// Checks that field names are matched after unicode/escape decoding when the experimental
// option is on: "a" vs "\u0061", a raw TAB vs the "\t" escape, and "data" vs "d\u0061ta".
TEST_F(JsonReaderTest, UnicodeFieldname)
{
  // unicode at nested and leaf levels
  // The first record spells its second key with a raw TAB character between 'b' and 'c'. The TAB
  // is spliced in explicitly so it cannot be turned into a space by editors or formatters; with
  // a space the key would never match the decoded "\u0062\tc" of the second record.
  std::string const raw_tab_record = std::string{R"({"data": {"a": 0, "b)"} + '\t' + R"(c": 1}})";
  std::string const escaped_record = R"({"data": {"\u0061": 2, "\u0062\tc": 3}})";
  std::string const nested_record  = R"({"d\u0061ta": {"a": 4}})";
  std::string data = raw_tab_record + "\n" + escaped_record + "\n" + nested_record;

  cudf::io::json_reader_options in_options =
    cudf::io::json_reader_options::builder(cudf::io::source_info{data.data(), data.size()})
      .recovery_mode(cudf::io::json_recovery_mode_t::RECOVER_WITH_NULL)
      .experimental(true)
      .lines(true);
  cudf::io::table_with_metadata result = cudf::io::read_json(in_options);

  // Shape checks are fatal so the indexed accesses below are only reached when they are valid.
  ASSERT_EQ(result.tbl->num_columns(), 1);
  EXPECT_EQ(result.tbl->num_rows(), 3);
  EXPECT_EQ(result.tbl->get_column(0).type().id(), cudf::type_id::STRUCT);
  ASSERT_EQ(result.tbl->get_column(0).num_children(), 2);
  EXPECT_EQ(result.tbl->get_column(0).child(0).type().id(), cudf::type_id::INT64);
  EXPECT_EQ(result.tbl->get_column(0).child(1).type().id(), cudf::type_id::INT64);

  // All three spellings of the top-level key collapse into "data", and the leaf keys into two
  // children.
  EXPECT_EQ(result.metadata.schema_info.at(0).name, "data");
  ASSERT_EQ(result.metadata.schema_info.at(0).children.size(), 2);
  EXPECT_EQ(result.metadata.schema_info.at(0).children.at(0).name, "a");
  EXPECT_EQ(result.metadata.schema_info.at(0).children.at(1).name, "b\tc");
}

TEST_F(JsonReaderTest, JsonDtypeSchema)
{
std::string data = R"(
Expand Down
1 change: 1 addition & 0 deletions cpp/tests/io/json/json_tree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -889,6 +889,7 @@ TEST_P(JsonTreeTraversalTest, CPUvsGPUTraversal)
gpu_tree,
is_array_of_arrays,
json_lines,
false,
stream,
cudf::get_current_device_resource_ref());
#if LIBCUDF_JSON_DEBUG_DUMP
Expand Down

0 comments on commit 6f8a4e2

Please sign in to comment.