Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add stream parameter to List Manipulation and Operations APIs #14248

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cpp/include/cudf/lists/combine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,15 @@ enum class concatenate_null_policy { IGNORE, NULLIFY_OUTPUT_ROW };
* @param input Table of lists to be concatenated.
* @param null_policy The parameter to specify whether a null list element will be ignored from
* concatenation, or any concatenation involving a null element will result in a null list.
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param mr Device memory resource used to allocate the returned column's device memory.
* @return A new column in which each row is a list resulted from concatenating all list elements in
* the corresponding row of the input table.
*/
std::unique_ptr<column> concatenate_rows(
table_view const& input,
concatenate_null_policy null_policy = concatenate_null_policy::IGNORE,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
Expand All @@ -86,13 +88,15 @@ std::unique_ptr<column> concatenate_rows(
* @param input The lists column containing lists of list elements to concatenate.
* @param null_policy The parameter to specify whether a null list element will be ignored from
* concatenation, or any concatenation involving a null element will result in a null list.
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param mr Device memory resource used to allocate the returned column's device memory.
* @return A new column in which each row is a list resulted from concatenating all list elements in
* the corresponding row of the input lists column.
*/
std::unique_ptr<column> concatenate_list_elements(
column_view const& input,
concatenate_null_policy null_policy = concatenate_null_policy::IGNORE,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/** @} */ // end of group
Expand Down
14 changes: 12 additions & 2 deletions cpp/include/cudf/lists/contains.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,14 @@ namespace lists {
*
* @param lists Lists column whose `n` rows are to be searched
* @param search_key The scalar key to be looked up in each list row
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param mr Device memory resource used to allocate the returned column's device memory
* @return BOOL8 column of `n` rows with the result of the lookup
*/
std::unique_ptr<column> contains(
cudf::lists_column_view const& lists,
cudf::scalar const& search_key,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
Expand All @@ -63,13 +65,15 @@ std::unique_ptr<column> contains(
* 2. The list row `lists[i]` is null
*
* @param lists Lists column whose `n` rows are to be searched
* @param search_keys Column of elements to be looked up in each list row
* @param search_keys Column of elements to be looked up in each list row.
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param mr Device memory resource used to allocate the returned column's device memory
* @return BOOL8 column of `n` rows with the result of the lookup
*/
std::unique_ptr<column> contains(
cudf::lists_column_view const& lists,
cudf::column_view const& search_keys,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
Expand All @@ -84,12 +88,14 @@ std::unique_ptr<column> contains(
* A row with an empty list will always return false.
* Nulls inside non-null nested elements (such as lists or structs) are not considered.
*
* @param lists Lists column whose `n` rows are to be searched
* @param lists Lists column whose `n` rows are to be searched.
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param mr Device memory resource used to allocate the returned column's device memory
* @return BOOL8 column of `n` rows with the result of the lookup
*/
std::unique_ptr<column> contains_nulls(
cudf::lists_column_view const& lists,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
Expand Down Expand Up @@ -125,13 +131,15 @@ enum class duplicate_find_option : int32_t {
* @param search_key The scalar key to be looked up in each list row
* @param find_option Whether to return the position of the first match (`FIND_FIRST`) or
* last (`FIND_LAST`)
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned column's device memory
* @return column of `n` rows with the location of the `search_key`
*/
std::unique_ptr<column> index_of(
cudf::lists_column_view const& lists,
cudf::scalar const& search_key,
duplicate_find_option find_option = duplicate_find_option::FIND_FIRST,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
Expand Down Expand Up @@ -160,13 +168,15 @@ std::unique_ptr<column> index_of(
* `lists`
* @param find_option Whether to return the position of the first match (`FIND_FIRST`) or
* last (`FIND_LAST`)
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned column's device memory
* @return column of `n` rows with the location of the `search_key`
*/
std::unique_ptr<column> index_of(
cudf::lists_column_view const& lists,
cudf::column_view const& search_keys,
duplicate_find_option find_option = duplicate_find_option::FIND_FIRST,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/** @} */ // end of group
Expand Down
2 changes: 2 additions & 0 deletions cpp/include/cudf/lists/count_elements.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,13 @@ namespace lists {
* in the output column.
*
* @param input Input lists column
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned column's device memory
* @return New column with the number of elements for each row
*/
std::unique_ptr<column> count_elements(
lists_column_view const& input,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/** @} */ // end of lists_elements group
Expand Down
45 changes: 33 additions & 12 deletions cpp/include/cudf_test/column_wrapper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <cudf/copying.hpp>
#include <cudf/detail/concatenate.hpp>
#include <cudf/detail/iterator.cuh>
#include <cudf/detail/null_mask.hpp>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/dictionary/encode.hpp>
#include <cudf/fixed_point/fixed_point.hpp>
Expand Down Expand Up @@ -1281,6 +1282,11 @@ class dictionary_column_wrapper<std::string> : public detail::column_wrapper {
template <typename T, typename SourceElementT = T>
class lists_column_wrapper : public detail::column_wrapper {
public:
/**
* @brief Cast to lists_column_view
*/
operator lists_column_view() const { return cudf::lists_column_view{wrapped->view()}; }

/**
* @brief Construct a lists column containing a single list of fixed-width
* type from an initializer list of values.
Expand Down Expand Up @@ -1542,8 +1548,12 @@ class lists_column_wrapper : public detail::column_wrapper {
rmm::device_buffer&& null_mask)
{
// construct the list column
wrapped = make_lists_column(
num_rows, std::move(offsets), std::move(values), null_count, std::move(null_mask));
wrapped = make_lists_column(num_rows,
std::move(offsets),
std::move(values),
null_count,
std::move(null_mask),
cudf::test::get_default_stream());
}

/**
Expand Down Expand Up @@ -1618,8 +1628,12 @@ class lists_column_wrapper : public detail::column_wrapper {
}();

// construct the list column
wrapped = make_lists_column(
cols.size(), std::move(offsets), std::move(data), null_count, std::move(null_mask));
wrapped = make_lists_column(cols.size(),
std::move(offsets),
std::move(data),
null_count,
std::move(null_mask),
cudf::test::get_default_stream());
}

/**
Expand Down Expand Up @@ -1647,8 +1661,12 @@ class lists_column_wrapper : public detail::column_wrapper {
depth = 0;

size_type num_elements = offsets->size() == 0 ? 0 : offsets->size() - 1;
wrapped =
make_lists_column(num_elements, std::move(offsets), std::move(c), 0, rmm::device_buffer{});
wrapped = make_lists_column(num_elements,
std::move(offsets),
std::move(c),
0,
rmm::device_buffer{},
cudf::test::get_default_stream());
}

/**
Expand Down Expand Up @@ -1697,12 +1715,15 @@ class lists_column_wrapper : public detail::column_wrapper {
}

lists_column_view lcv(col);
return make_lists_column(col.size(),
std::make_unique<column>(lcv.offsets()),
normalize_column(lists_column_view(col).child(),
lists_column_view(expected_hierarchy).child()),
col.null_count(),
copy_bitmask(col));
return make_lists_column(
col.size(),
std::make_unique<column>(lcv.offsets()),
normalize_column(lists_column_view(col).child(),
lists_column_view(expected_hierarchy).child()),
col.null_count(),
cudf::detail::copy_bitmask(
col, cudf::test::get_default_stream(), rmm::mr::get_current_device_resource()),
cudf::test::get_default_stream());
}

std::pair<std::vector<column_view>, std::vector<std::unique_ptr<column>>> preprocess_columns(
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/lists/combine/concatenate_list_elements.cu
Original file line number Diff line number Diff line change
Expand Up @@ -271,10 +271,11 @@ std::unique_ptr<column> concatenate_list_elements(column_view const& input,
*/
std::unique_ptr<column> concatenate_list_elements(column_view const& input,
concatenate_null_policy null_policy,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_FUNC_RANGE();
return detail::concatenate_list_elements(input, null_policy, cudf::get_default_stream(), mr);
return detail::concatenate_list_elements(input, null_policy, stream, mr);
}

} // namespace lists
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/lists/combine/concatenate_rows.cu
Original file line number Diff line number Diff line change
Expand Up @@ -305,10 +305,11 @@ std::unique_ptr<column> concatenate_rows(table_view const& input,
*/
std::unique_ptr<column> concatenate_rows(table_view const& input,
concatenate_null_policy null_policy,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_FUNC_RANGE();
return detail::concatenate_rows(input, null_policy, cudf::get_default_stream(), mr);
return detail::concatenate_rows(input, null_policy, stream, mr);
}

} // namespace lists
Expand Down
37 changes: 21 additions & 16 deletions cpp/src/lists/contains.cu
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ std::unique_ptr<column> index_of(lists_column_view const& lists,
}

auto search_key_col = cudf::make_column_from_scalar(search_key, lists.size(), stream, mr);
return index_of(lists, search_key_col->view(), find_option, stream, mr);
return detail::index_of(lists, search_key_col->view(), find_option, stream, mr);
}

std::unique_ptr<column> index_of(lists_column_view const& lists,
Expand All @@ -306,11 +306,11 @@ std::unique_ptr<column> contains(lists_column_view const& lists,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
auto key_indices = index_of(lists,
search_key,
duplicate_find_option::FIND_FIRST,
stream,
rmm::mr::get_current_device_resource());
auto key_indices = detail::index_of(lists,
search_key,
duplicate_find_option::FIND_FIRST,
stream,
rmm::mr::get_current_device_resource());
return to_contains(std::move(key_indices), stream, mr);
}

Expand All @@ -322,11 +322,11 @@ std::unique_ptr<column> contains(lists_column_view const& lists,
CUDF_EXPECTS(search_keys.size() == lists.size(),
"Number of search keys must match list column size.");

auto key_indices = index_of(lists,
search_keys,
duplicate_find_option::FIND_FIRST,
stream,
rmm::mr::get_current_device_resource());
auto key_indices = detail::index_of(lists,
search_keys,
duplicate_find_option::FIND_FIRST,
stream,
rmm::mr::get_current_device_resource());
return to_contains(std::move(key_indices), stream, mr);
}

Expand Down Expand Up @@ -364,43 +364,48 @@ std::unique_ptr<column> contains_nulls(lists_column_view const& lists,

std::unique_ptr<column> contains(lists_column_view const& lists,
cudf::scalar const& search_key,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_FUNC_RANGE();
return detail::contains(lists, search_key, cudf::get_default_stream(), mr);
return detail::contains(lists, search_key, stream, mr);
}

std::unique_ptr<column> contains(lists_column_view const& lists,
column_view const& search_keys,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_FUNC_RANGE();
return detail::contains(lists, search_keys, cudf::get_default_stream(), mr);
return detail::contains(lists, search_keys, stream, mr);
}

std::unique_ptr<column> contains_nulls(lists_column_view const& lists,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_FUNC_RANGE();
return detail::contains_nulls(lists, cudf::get_default_stream(), mr);
return detail::contains_nulls(lists, stream, mr);
}

std::unique_ptr<column> index_of(lists_column_view const& lists,
cudf::scalar const& search_key,
duplicate_find_option find_option,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_FUNC_RANGE();
return detail::index_of(lists, search_key, find_option, cudf::get_default_stream(), mr);
return detail::index_of(lists, search_key, find_option, stream, mr);
}

std::unique_ptr<column> index_of(lists_column_view const& lists,
column_view const& search_keys,
duplicate_find_option find_option,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_FUNC_RANGE();
return detail::index_of(lists, search_keys, find_option, cudf::get_default_stream(), mr);
return detail::index_of(lists, search_keys, find_option, stream, mr);
}

} // namespace cudf::lists
5 changes: 3 additions & 2 deletions cpp/src/lists/copying/concatenate.cu
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <cudf/detail/concatenate_masks.hpp>
#include <cudf/detail/get_value.cuh>
#include <cudf/detail/null_mask.cuh>
#include <cudf/detail/null_mask.hpp>
#include <cudf/lists/lists_column_view.hpp>

#include <rmm/cuda_stream_view.hpp>
Expand Down Expand Up @@ -123,8 +124,8 @@ std::unique_ptr<column> concatenate(host_span<column_view const> columns,
// if any of the input columns have nulls, construct the output mask
bool const has_nulls =
std::any_of(columns.begin(), columns.end(), [](auto const& col) { return col.has_nulls(); });
rmm::device_buffer null_mask = create_null_mask(
total_list_count, has_nulls ? mask_state::UNINITIALIZED : mask_state::UNALLOCATED);
rmm::device_buffer null_mask = cudf::detail::create_null_mask(
total_list_count, has_nulls ? mask_state::UNINITIALIZED : mask_state::UNALLOCATED, stream, mr);
auto null_mask_data = static_cast<bitmask_type*>(null_mask.data());
auto const null_count =
has_nulls ? cudf::detail::concatenate_masks(columns, null_mask_data, stream) : size_type{0};
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/lists/count_elements.cu
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,11 @@ std::unique_ptr<column> count_elements(lists_column_view const& input,
// external APIS

std::unique_ptr<column> count_elements(lists_column_view const& input,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_FUNC_RANGE();
return detail::count_elements(input, cudf::get_default_stream(), mr);
return detail::count_elements(input, stream, mr);
}

} // namespace lists
Expand Down
1 change: 1 addition & 0 deletions cpp/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,7 @@ ConfigureTest(
)
ConfigureTest(STREAM_SORTING_TEST streams/sorting_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_TEXT_TEST streams/text/ngrams_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_LISTS_TEST streams/lists_test.cpp STREAM_MODE testing)

# ##################################################################################################
# Install tests ####################################################################################
Expand Down
Loading