Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use stream pool for gather/scatter. #14162

Draft
wants to merge 2 commits into
base: branch-23.10
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 33 additions & 17 deletions cpp/include/cudf/detail/gather.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <cudf/detail/null_mask.hpp>
#include <cudf/detail/utilities/assert.cuh>
#include <cudf/detail/utilities/cuda.cuh>
#include <cudf/detail/utilities/stream_pool.hpp>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/detail/valid_if.cuh>
#include <cudf/dictionary/dictionary_column_view.hpp>
Expand Down Expand Up @@ -656,34 +657,49 @@ std::unique_ptr<table> gather(table_view const& source_table,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
std::vector<std::unique_ptr<column>> destination_columns;

// TODO: Could be beneficial to use streams internally here

for (auto const& source_column : source_table) {
// The data gather for n columns will be put on the first n streams
destination_columns.push_back(
cudf::type_dispatcher<dispatch_storage_type>(source_column.type(),
column_gatherer{},
source_column,
gather_map_begin,
gather_map_end,
bounds_policy == out_of_bounds_policy::NULLIFY,
stream,
mr));
auto const num_columns = source_table.num_columns();
auto result = std::vector<std::unique_ptr<column>>(num_columns);

// The data gather for n columns will be executed over n streams. If there is
// only a single column, the fork/join overhead should be avoided.
auto streams = std::vector<rmm::cuda_stream_view>{};
if (num_columns > 1) {
streams = cudf::detail::fork_streams(stream, num_columns);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am curious how this works for per-thread default stream. For spark, we build cuDF with PTDS. Will streams will be have number-of-columns vector of the PTDS stream?

Copy link
Contributor Author

@bdice bdice Sep 29, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The stream passed in would be the PTDS stream. Then a stream pool (for that thread) would be created (or reused), the work would be executed across that stream pool, and then the join step would insert events for all the elements of streams to be synchronized with stream before new work on stream (the PTDS stream in Spark's case) would be runnable.

} else {
streams.push_back(stream);
}

auto it = thrust::make_counting_iterator<size_type>(0);

std::transform(it, it + num_columns, result.begin(), [&](size_type i) {
auto const& source_column = source_table.column(i);
return cudf::type_dispatcher<dispatch_storage_type>(
source_column.type(),
column_gatherer{},
source_column,
gather_map_begin,
gather_map_end,
bounds_policy == out_of_bounds_policy::NULLIFY,
streams[i],
mr);
});

auto const nullable = bounds_policy == out_of_bounds_policy::NULLIFY ||
std::any_of(source_table.begin(), source_table.end(), [](auto const& col) {
return col.nullable();
});
if (nullable) {
auto const op = bounds_policy == out_of_bounds_policy::NULLIFY ? gather_bitmask_op::NULLIFY
: gather_bitmask_op::DONT_CHECK;
gather_bitmask(source_table, gather_map_begin, destination_columns, op, stream, mr);
gather_bitmask(source_table, gather_map_begin, result, op, stream, mr);
}

return std::make_unique<table>(std::move(destination_columns));
// Join streams as late as possible so that null mask computations can run on
// the passed in stream while other streams are gathering. Skip joining if
// only one column, since it used the passed in stream rather than forking.
if (num_columns > 1) { cudf::detail::join_streams(streams, stream); }

return std::make_unique<table>(std::move(result));
}

} // namespace detail
Expand Down
49 changes: 33 additions & 16 deletions cpp/include/cudf/detail/scatter.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <cudf/detail/gather.cuh>
#include <cudf/detail/indexalator.cuh>
#include <cudf/detail/nvtx/ranges.hpp>
#include <cudf/detail/utilities/stream_pool.hpp>
#include <cudf/dictionary/detail/update_keys.hpp>
#include <cudf/dictionary/dictionary_column_view.hpp>
#include <cudf/dictionary/dictionary_factories.hpp>
Expand Down Expand Up @@ -405,22 +406,32 @@ std::unique_ptr<table> scatter(table_view const& source,
thrust::make_transform_iterator(scatter_map_begin, index_converter<MapType>{target.num_rows()});
auto updated_scatter_map_end =
thrust::make_transform_iterator(scatter_map_end, index_converter<MapType>{target.num_rows()});
auto result = std::vector<std::unique_ptr<column>>(target.num_columns());

std::transform(source.begin(),
source.end(),
target.begin(),
result.begin(),
[=](auto const& source_col, auto const& target_col) {
return type_dispatcher<dispatch_storage_type>(source_col.type(),
column_scatterer{},
source_col,
updated_scatter_map_begin,
updated_scatter_map_end,
target_col,
stream,
mr);
});

auto const num_columns = target.num_columns();
auto result = std::vector<std::unique_ptr<column>>(num_columns);

// The data scatter for n columns will be executed over n streams. If there is
// only a single column, the fork/join overhead should be avoided.
auto streams = std::vector<rmm::cuda_stream_view>{};
if (num_columns > 1) {
streams = cudf::detail::fork_streams(stream, num_columns);
} else {
streams.push_back(stream);
}

auto it = thrust::make_counting_iterator<size_type>(0);

std::transform(it, it + num_columns, result.begin(), [&](size_type i) {
auto const& source_col = source.column(i);
return type_dispatcher<dispatch_storage_type>(source_col.type(),
column_scatterer{},
source_col,
updated_scatter_map_begin,
updated_scatter_map_end,
target.column(i),
streams[i],
mr);
});

// We still need to call `gather_bitmask` even when the source columns are not nullable,
// as if the target has null_mask, that null_mask needs to be updated after scattering.
Expand Down Expand Up @@ -451,6 +462,12 @@ std::unique_ptr<table> scatter(table_view const& source,
}
});
}

// Join streams as late as possible so that null mask computations can run on
// the passed in stream while other streams are scattering. Skip joining if
// only one column, since it used the passed in stream rather than forking.
if (num_columns > 1) { cudf::detail::join_streams(streams, stream); }

return std::make_unique<table>(std::move(result));
}
} // namespace detail
Expand Down
2 changes: 1 addition & 1 deletion cpp/include/cudf/detail/utilities/stream_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ namespace cudf::detail {
* auto const num_streams = 2;
* // do work on stream
* // allocate streams and wait for an event on stream before executing on any of streams
* auto streams = cudf::detail::fork_stream(stream, num_streams);
* auto streams = cudf::detail::fork_streams(stream, num_streams);
* // do work on streams[0] and streams[1]
* // wait for event on streams before continuing to do work on stream
* cudf::detail::join_streams(streams, stream);
Expand Down