-
Notifications
You must be signed in to change notification settings - Fork 916
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
JSON tree algorithms refactor I: CSR data structure for column tree (#…
…15979) Part of #15903. 1. Introduces the Compressed Sparse Row (CSR) format to store the adjacency information of the column tree. 2. Analogous to `reduce_to_column_tree`, `reduce_to_column_tree_csr` reduces node tree representation to column tree stored in CSR format. TODO: - [x] Correctness test Authors: - Shruti Shivakumar (https://github.com/shrshi) - Vukasin Milovanovic (https://github.com/vuule) - GALI PREM SAGAR (https://github.com/galipremsagar) Approvers: - Robert (Bobby) Evans (https://github.com/revans2) - Vukasin Milovanovic (https://github.com/vuule) - Nghia Truong (https://github.com/ttnghia) - Karthikeyan (https://github.com/karthikeyann) - Kyle Edwards (https://github.com/KyleFromNVIDIA) URL: #15979
- Loading branch information
Showing
6 changed files
with
758 additions
and
28 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,304 @@ | ||
/* | ||
* Copyright (c) 2024, NVIDIA CORPORATION. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
#include "nested_json.hpp" | ||
|
||
#include <cudf/detail/nvtx/ranges.hpp> | ||
#include <cudf/detail/utilities/vector_factories.hpp> | ||
#include <cudf/types.hpp> | ||
#include <cudf/utilities/error.hpp> | ||
#include <cudf/utilities/memory_resource.hpp> | ||
#include <cudf/utilities/span.hpp> | ||
|
||
#include <rmm/cuda_stream_view.hpp> | ||
#include <rmm/device_uvector.hpp> | ||
#include <rmm/exec_policy.hpp> | ||
|
||
#include <cuda/functional> | ||
#include <thrust/for_each.h> | ||
#include <thrust/iterator/constant_iterator.h> | ||
#include <thrust/iterator/counting_iterator.h> | ||
#include <thrust/iterator/permutation_iterator.h> | ||
#include <thrust/iterator/transform_output_iterator.h> | ||
#include <thrust/iterator/zip_iterator.h> | ||
#include <thrust/reduce.h> | ||
#include <thrust/scan.h> | ||
#include <thrust/sort.h> | ||
#include <thrust/transform.h> | ||
#include <thrust/transform_scan.h> | ||
#include <thrust/unique.h> | ||
|
||
namespace cudf::io::json { | ||
|
||
using row_offset_t = size_type; | ||
|
||
#ifdef CSR_DEBUG_PRINT | ||
template <typename T> | ||
void print(device_span<T const> d_vec, std::string name, rmm::cuda_stream_view stream) | ||
{ | ||
stream.synchronize(); | ||
auto h_vec = cudf::detail::make_std_vector_sync(d_vec, stream); | ||
std::cout << name << " = "; | ||
for (auto e : h_vec) { | ||
std::cout << e << " "; | ||
} | ||
std::cout << std::endl; | ||
} | ||
#endif | ||
|
||
namespace experimental::detail { | ||
|
||
struct level_ordering { | ||
device_span<TreeDepthT const> node_levels; | ||
device_span<NodeIndexT const> col_ids; | ||
device_span<NodeIndexT const> parent_node_ids; | ||
__device__ bool operator()(NodeIndexT lhs_node_id, NodeIndexT rhs_node_id) const | ||
{ | ||
auto lhs_parent_col_id = parent_node_ids[lhs_node_id] == parent_node_sentinel | ||
? parent_node_sentinel | ||
: col_ids[parent_node_ids[lhs_node_id]]; | ||
auto rhs_parent_col_id = parent_node_ids[rhs_node_id] == parent_node_sentinel | ||
? parent_node_sentinel | ||
: col_ids[parent_node_ids[rhs_node_id]]; | ||
|
||
return (node_levels[lhs_node_id] < node_levels[rhs_node_id]) || | ||
(node_levels[lhs_node_id] == node_levels[rhs_node_id] && | ||
lhs_parent_col_id < rhs_parent_col_id) || | ||
(node_levels[lhs_node_id] == node_levels[rhs_node_id] && | ||
lhs_parent_col_id == rhs_parent_col_id && col_ids[lhs_node_id] < col_ids[rhs_node_id]); | ||
} | ||
}; | ||
|
||
struct parent_nodeids_to_colids { | ||
device_span<NodeIndexT const> rev_mapped_col_ids; | ||
__device__ auto operator()(NodeIndexT parent_node_id) -> NodeIndexT | ||
{ | ||
return parent_node_id == parent_node_sentinel ? parent_node_sentinel | ||
: rev_mapped_col_ids[parent_node_id]; | ||
} | ||
}; | ||
|
||
/** | ||
* @brief Reduces node tree representation to column tree CSR representation. | ||
* | ||
* @param node_tree Node tree representation of JSON string | ||
* @param original_col_ids Column ids of nodes | ||
* @param row_offsets Row offsets of nodes | ||
* @param is_array_of_arrays Whether the tree is an array of arrays | ||
* @param row_array_parent_col_id Column id of row array, if is_array_of_arrays is true | ||
* @param stream CUDA stream used for device memory operations and kernel launches | ||
* @return A tuple of column tree representation of JSON string, column ids of columns, and | ||
* max row offsets of columns | ||
*/ | ||
std::tuple<compressed_sparse_row, column_tree_properties> reduce_to_column_tree( | ||
tree_meta_t& node_tree, | ||
device_span<NodeIndexT const> original_col_ids, | ||
device_span<NodeIndexT const> sorted_col_ids, | ||
device_span<NodeIndexT const> ordered_node_ids, | ||
device_span<row_offset_t const> row_offsets, | ||
bool is_array_of_arrays, | ||
NodeIndexT row_array_parent_col_id, | ||
rmm::cuda_stream_view stream) | ||
{ | ||
CUDF_FUNC_RANGE(); | ||
|
||
if (original_col_ids.empty()) { | ||
rmm::device_uvector<NodeIndexT> empty_row_idx(0, stream); | ||
rmm::device_uvector<NodeIndexT> empty_col_idx(0, stream); | ||
rmm::device_uvector<NodeT> empty_column_categories(0, stream); | ||
rmm::device_uvector<row_offset_t> empty_max_row_offsets(0, stream); | ||
rmm::device_uvector<NodeIndexT> empty_mapped_col_ids(0, stream); | ||
return std::tuple{compressed_sparse_row{std::move(empty_row_idx), std::move(empty_col_idx)}, | ||
column_tree_properties{std::move(empty_column_categories), | ||
std::move(empty_max_row_offsets), | ||
std::move(empty_mapped_col_ids)}}; | ||
} | ||
|
||
auto [unpermuted_tree, unpermuted_col_ids, unpermuted_max_row_offsets] = | ||
cudf::io::json::detail::reduce_to_column_tree(node_tree, | ||
original_col_ids, | ||
sorted_col_ids, | ||
ordered_node_ids, | ||
row_offsets, | ||
is_array_of_arrays, | ||
row_array_parent_col_id, | ||
stream); | ||
|
||
NodeIndexT num_columns = unpermuted_col_ids.size(); | ||
|
||
auto mapped_col_ids = cudf::detail::make_device_uvector_async( | ||
unpermuted_col_ids, stream, cudf::get_current_device_resource_ref()); | ||
rmm::device_uvector<NodeIndexT> rev_mapped_col_ids(num_columns, stream); | ||
rmm::device_uvector<NodeIndexT> reordering_index(unpermuted_col_ids.size(), stream); | ||
|
||
thrust::sequence( | ||
rmm::exec_policy_nosync(stream), reordering_index.begin(), reordering_index.end()); | ||
// Reorder nodes and column ids in level-wise fashion | ||
thrust::sort_by_key( | ||
rmm::exec_policy_nosync(stream), | ||
reordering_index.begin(), | ||
reordering_index.end(), | ||
mapped_col_ids.begin(), | ||
level_ordering{ | ||
unpermuted_tree.node_levels, unpermuted_col_ids, unpermuted_tree.parent_node_ids}); | ||
|
||
{ | ||
auto mapped_col_ids_copy = cudf::detail::make_device_uvector_async( | ||
mapped_col_ids, stream, cudf::get_current_device_resource_ref()); | ||
thrust::sequence( | ||
rmm::exec_policy_nosync(stream), rev_mapped_col_ids.begin(), rev_mapped_col_ids.end()); | ||
thrust::sort_by_key(rmm::exec_policy_nosync(stream), | ||
mapped_col_ids_copy.begin(), | ||
mapped_col_ids_copy.end(), | ||
rev_mapped_col_ids.begin()); | ||
} | ||
|
||
rmm::device_uvector<NodeIndexT> parent_col_ids(num_columns, stream); | ||
thrust::transform_output_iterator parent_col_ids_it(parent_col_ids.begin(), | ||
parent_nodeids_to_colids{rev_mapped_col_ids}); | ||
rmm::device_uvector<row_offset_t> max_row_offsets(num_columns, stream); | ||
rmm::device_uvector<NodeT> column_categories(num_columns, stream); | ||
thrust::copy_n( | ||
rmm::exec_policy_nosync(stream), | ||
thrust::make_zip_iterator(thrust::make_permutation_iterator( | ||
unpermuted_tree.parent_node_ids.begin(), reordering_index.begin()), | ||
thrust::make_permutation_iterator(unpermuted_max_row_offsets.begin(), | ||
reordering_index.begin()), | ||
thrust::make_permutation_iterator( | ||
unpermuted_tree.node_categories.begin(), reordering_index.begin())), | ||
num_columns, | ||
thrust::make_zip_iterator( | ||
parent_col_ids_it, max_row_offsets.begin(), column_categories.begin())); | ||
|
||
#ifdef CSR_DEBUG_PRINT | ||
print<NodeIndexT>(reordering_index, "h_reordering_index", stream); | ||
print<NodeIndexT>(mapped_col_ids, "h_mapped_col_ids", stream); | ||
print<NodeIndexT>(rev_mapped_col_ids, "h_rev_mapped_col_ids", stream); | ||
print<NodeIndexT>(parent_col_ids, "h_parent_col_ids", stream); | ||
print<row_offset_t>(max_row_offsets, "h_max_row_offsets", stream); | ||
#endif | ||
|
||
auto construct_row_idx = [&stream](NodeIndexT num_columns, | ||
device_span<NodeIndexT const> parent_col_ids) { | ||
auto row_idx = cudf::detail::make_zeroed_device_uvector_async<NodeIndexT>( | ||
static_cast<std::size_t>(num_columns + 1), stream, cudf::get_current_device_resource_ref()); | ||
// Note that the first element of csr_parent_col_ids is -1 (parent_node_sentinel) | ||
// children adjacency | ||
|
||
auto num_non_leaf_columns = thrust::unique_count( | ||
rmm::exec_policy_nosync(stream), parent_col_ids.begin() + 1, parent_col_ids.end()); | ||
rmm::device_uvector<NodeIndexT> non_leaf_nodes(num_non_leaf_columns, stream); | ||
rmm::device_uvector<NodeIndexT> non_leaf_nodes_children(num_non_leaf_columns, stream); | ||
thrust::reduce_by_key(rmm::exec_policy_nosync(stream), | ||
parent_col_ids.begin() + 1, | ||
parent_col_ids.end(), | ||
thrust::make_constant_iterator(1), | ||
non_leaf_nodes.begin(), | ||
non_leaf_nodes_children.begin(), | ||
thrust::equal_to<TreeDepthT>()); | ||
|
||
thrust::scatter(rmm::exec_policy_nosync(stream), | ||
non_leaf_nodes_children.begin(), | ||
non_leaf_nodes_children.end(), | ||
non_leaf_nodes.begin(), | ||
row_idx.begin() + 1); | ||
|
||
if (num_columns > 1) { | ||
thrust::transform_inclusive_scan( | ||
rmm::exec_policy_nosync(stream), | ||
thrust::make_zip_iterator(thrust::make_counting_iterator(1), row_idx.begin() + 1), | ||
thrust::make_zip_iterator(thrust::make_counting_iterator(1) + num_columns, row_idx.end()), | ||
row_idx.begin() + 1, | ||
cuda::proclaim_return_type<NodeIndexT>([] __device__(auto a) { | ||
auto n = thrust::get<0>(a); | ||
auto idx = thrust::get<1>(a); | ||
return n == 1 ? idx : idx + 1; | ||
}), | ||
thrust::plus<NodeIndexT>{}); | ||
} else { | ||
auto single_node = 1; | ||
row_idx.set_element_async(1, single_node, stream); | ||
} | ||
|
||
#ifdef CSR_DEBUG_PRINT | ||
print<NodeIndexT>(row_idx, "h_row_idx", stream); | ||
#endif | ||
return row_idx; | ||
}; | ||
|
||
auto construct_col_idx = [&stream](NodeIndexT num_columns, | ||
device_span<NodeIndexT const> parent_col_ids, | ||
device_span<NodeIndexT const> row_idx) { | ||
rmm::device_uvector<NodeIndexT> col_idx((num_columns - 1) * 2, stream); | ||
thrust::fill(rmm::exec_policy_nosync(stream), col_idx.begin(), col_idx.end(), -1); | ||
// excluding root node, construct scatter map | ||
rmm::device_uvector<NodeIndexT> map(num_columns - 1, stream); | ||
thrust::inclusive_scan_by_key(rmm::exec_policy_nosync(stream), | ||
parent_col_ids.begin() + 1, | ||
parent_col_ids.end(), | ||
thrust::make_constant_iterator(1), | ||
map.begin()); | ||
thrust::for_each_n(rmm::exec_policy_nosync(stream), | ||
thrust::make_counting_iterator(1), | ||
num_columns - 1, | ||
[row_idx = row_idx.begin(), | ||
map = map.begin(), | ||
parent_col_ids = parent_col_ids.begin()] __device__(auto i) { | ||
auto parent_col_id = parent_col_ids[i]; | ||
if (parent_col_id == 0) | ||
--map[i - 1]; | ||
else | ||
map[i - 1] += row_idx[parent_col_id]; | ||
}); | ||
thrust::scatter(rmm::exec_policy_nosync(stream), | ||
thrust::make_counting_iterator(1), | ||
thrust::make_counting_iterator(1) + num_columns - 1, | ||
map.begin(), | ||
col_idx.begin()); | ||
|
||
// Skip the parent of root node | ||
thrust::scatter(rmm::exec_policy_nosync(stream), | ||
parent_col_ids.begin() + 1, | ||
parent_col_ids.end(), | ||
row_idx.begin() + 1, | ||
col_idx.begin()); | ||
|
||
#ifdef CSR_DEBUG_PRINT | ||
print<NodeIndexT>(col_idx, "h_col_idx", stream); | ||
#endif | ||
|
||
return col_idx; | ||
}; | ||
|
||
/* | ||
5. CSR construction: | ||
a. Sort column levels and get their ordering | ||
b. For each column node coln iterated according to sorted_column_levels; do | ||
i. Find nodes that have coln as the parent node -> set adj_coln | ||
ii. row idx[coln] = size of adj_coln + 1 | ||
iii. col idx[coln] = adj_coln U {parent_col_id[coln]} | ||
*/ | ||
auto row_idx = construct_row_idx(num_columns, parent_col_ids); | ||
auto col_idx = construct_col_idx(num_columns, parent_col_ids, row_idx); | ||
|
||
return std::tuple{ | ||
compressed_sparse_row{std::move(row_idx), std::move(col_idx)}, | ||
column_tree_properties{ | ||
std::move(column_categories), std::move(max_row_offsets), std::move(mapped_col_ids)}}; | ||
} | ||
|
||
} // namespace experimental::detail | ||
} // namespace cudf::io::json |
Oops, something went wrong.