Replace direct cudaMemcpyAsync calls with utility functions (within `/src`) (#17550)

Replaced the calls to `cudaMemcpyAsync` with the new `cuda_memcpy`/`cuda_memcpy_async` utilities, which optionally avoid using the copy engine.

Also took the opportunity to use `cudf::detail::host_vector` and its factories to enable wider use of pinned memory.

Remaining instances are either not viable (e.g. copying `h_needs_fallback`, interop) or are D2D copies.
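
To make the pattern concrete, here is a minimal sketch of the before/after shape of these changes, based only on the utilities visible in this diff (`cuda_memcpy`/`cuda_memcpy_async`, `make_host_vector`); the `upload` helper, the `int` element type, and the staging copy are illustrative placeholders, not code from this PR:

```cpp
#include <cudf/detail/utilities/cuda_memcpy.hpp>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/utilities/span.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_uvector.hpp>

#include <algorithm>

// Illustrative helper: stage host data in pinned memory and copy it to the
// device without calling cudaMemcpyAsync directly at the call site.
rmm::device_uvector<int> upload(cudf::host_span<int const> values,
                                rmm::cuda_stream_view stream)
{
  // make_host_vector returns a cudf::detail::host_vector, backed by pinned
  // memory when a pinned resource is available.
  auto h_staging = cudf::detail::make_host_vector<int>(values.size(), stream);
  std::copy(values.begin(), values.end(), h_staging.begin());

  auto d_values = rmm::device_uvector<int>(values.size(), stream);
  // Was: cudaMemcpyAsync(d_values.data(), h_staging.data(), nbytes,
  //                      cudaMemcpyDefault, stream.value());
  // The utility decides internally whether to use the copy engine or a kernel
  // copy; the synchronous cuda_memcpy variant also synchronizes the stream.
  cudf::detail::cuda_memcpy_async<int>(d_values, h_staging, stream);
  return d_values;
}
```

Since both utilities take spans, `rmm::device_uvector` and `cudf::detail::host_vector` arguments convert implicitly, which is what makes the `/src`-wide replacement largely mechanical.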

Authors:
  - Vukasin Milovanovic (https://github.com/vuule)

Approvers:
  - David Wendt (https://github.com/davidwendt)
  - Nghia Truong (https://github.com/ttnghia)

URL: #17550
vuule authored Dec 16, 2024
1 parent e975ca3 commit a5ac4bf
Showing 10 changed files with 93 additions and 109 deletions.
2 changes: 1 addition & 1 deletion cpp/include/cudf/detail/device_scalar.hpp
@@ -78,7 +78,7 @@ class device_scalar : public rmm::device_scalar<T> {
[[nodiscard]] T value(rmm::cuda_stream_view stream) const
{
cuda_memcpy<T>(bounce_buffer, device_span<T const>{this->data(), 1}, stream);
-    return bounce_buffer[0];
+    return std::move(bounce_buffer[0]);
}

void set_value_async(T const& value, rmm::cuda_stream_view stream)
14 changes: 7 additions & 7 deletions cpp/src/bitmask/is_element_valid.cpp
@@ -15,6 +15,7 @@
*/

#include <cudf/detail/is_element_valid.hpp>
+#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/utilities/bit.hpp>
#include <cudf/utilities/error.hpp>

@@ -30,15 +31,14 @@ bool is_element_valid_sync(column_view const& col_view,
CUDF_EXPECTS(element_index >= 0 and element_index < col_view.size(), "invalid index.");
if (!col_view.nullable()) { return true; }

-  bitmask_type word = 0;
// null_mask() returns device ptr to bitmask without offset
size_type const index = element_index + col_view.offset();
-  CUDF_CUDA_TRY(cudaMemcpyAsync(&word,
-                                col_view.null_mask() + word_index(index),
-                                sizeof(bitmask_type),
-                                cudaMemcpyDefault,
-                                stream.value()));
-  stream.synchronize();

+  auto const word =
+    cudf::detail::make_host_vector_sync(
+      device_span<bitmask_type const>{col_view.null_mask() + word_index(index), 1}, stream)
+      .front();

return static_cast<bool>(word & (bitmask_type{1} << intra_word_index(index)));
}

16 changes: 5 additions & 11 deletions cpp/src/column/column_device_view.cu
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2019-2022, NVIDIA CORPORATION.
+ * Copyright (c) 2019-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,6 +16,7 @@
#include <cudf/column/column_device_view.cuh>
#include <cudf/column/column_view.hpp>
#include <cudf/detail/iterator.cuh>
+#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/types.hpp>
#include <cudf/utilities/error.hpp>

@@ -60,13 +61,12 @@ create_device_view_from_view(ColumnView const& source, rmm::cuda_stream_view str
// A buffer of CPU memory is allocated to hold the ColumnDeviceView
// objects. Once filled, the CPU memory is copied to device memory
// and then set into the d_children member pointer.
-  std::vector<char> staging_buffer(descendant_storage_bytes);
+  auto staging_buffer = detail::make_host_vector<char>(descendant_storage_bytes, stream);

// Each ColumnDeviceView instance may have child objects that
// require setting some internal device pointers before being copied
// from CPU to device.
-  rmm::device_buffer* const descendant_storage =
-    new rmm::device_buffer(descendant_storage_bytes, stream);
+  auto const descendant_storage = new rmm::device_uvector<char>(descendant_storage_bytes, stream);

auto deleter = [descendant_storage](ColumnDeviceView* v) {
v->destroy();
@@ -77,13 +77,7 @@ create_device_view_from_view(ColumnView const& source, rmm::cuda_stream_view str
new ColumnDeviceView(source, staging_buffer.data(), descendant_storage->data()), deleter};

// copy the CPU memory with all the children into device memory
-  CUDF_CUDA_TRY(cudaMemcpyAsync(descendant_storage->data(),
-                                staging_buffer.data(),
-                                descendant_storage->size(),
-                                cudaMemcpyDefault,
-                                stream.value()));
-
-  stream.synchronize();
+  detail::cuda_memcpy<char>(*descendant_storage, staging_buffer, stream);

return result;
}
95 changes: 43 additions & 52 deletions cpp/src/copying/contiguous_split.cu
@@ -998,7 +998,8 @@ struct packed_split_indices_and_src_buf_info {
src_buf_info_size(
cudf::util::round_up_safe(num_src_bufs * sizeof(src_buf_info), split_align)),
// host-side
-      h_indices_and_source_info(indices_size + src_buf_info_size),
+      h_indices_and_source_info{
+        detail::make_host_vector<uint8_t>(indices_size + src_buf_info_size, stream)},
h_indices{reinterpret_cast<size_type*>(h_indices_and_source_info.data())},
h_src_buf_info{
reinterpret_cast<src_buf_info*>(h_indices_and_source_info.data() + indices_size)}
@@ -1025,15 +1026,18 @@
reinterpret_cast<size_type*>(reinterpret_cast<uint8_t*>(d_indices_and_source_info.data()) +
indices_size + src_buf_info_size);

-    CUDF_CUDA_TRY(cudaMemcpyAsync(
-      d_indices, h_indices, indices_size + src_buf_info_size, cudaMemcpyDefault, stream.value()));
+    detail::cuda_memcpy_async<uint8_t>(
+      device_span<uint8_t>{static_cast<uint8_t*>(d_indices_and_source_info.data()),
+                           h_indices_and_source_info.size()},
+      h_indices_and_source_info,
+      stream);
}

size_type const indices_size;
std::size_t const src_buf_info_size;
std::size_t offset_stack_size;

-  std::vector<uint8_t> h_indices_and_source_info;
+  detail::host_vector<uint8_t> h_indices_and_source_info;
rmm::device_buffer d_indices_and_source_info;

size_type* const h_indices;
@@ -1055,27 +1059,26 @@ struct packed_partition_buf_size_and_dst_buf_info {
buf_sizes_size{cudf::util::round_up_safe(num_partitions * sizeof(std::size_t), split_align)},
dst_buf_info_size{cudf::util::round_up_safe(num_bufs * sizeof(dst_buf_info), split_align)},
// host-side
-      h_buf_sizes_and_dst_info(buf_sizes_size + dst_buf_info_size),
+      h_buf_sizes_and_dst_info{
+        detail::make_host_vector<uint8_t>(buf_sizes_size + dst_buf_info_size, stream)},
h_buf_sizes{reinterpret_cast<std::size_t*>(h_buf_sizes_and_dst_info.data())},
h_dst_buf_info{
-        reinterpret_cast<dst_buf_info*>(h_buf_sizes_and_dst_info.data() + buf_sizes_size)},
+        reinterpret_cast<dst_buf_info*>(h_buf_sizes_and_dst_info.data() + buf_sizes_size),
+        num_bufs,
+        h_buf_sizes_and_dst_info.get_allocator().is_device_accessible()},
// device-side
-      d_buf_sizes_and_dst_info(buf_sizes_size + dst_buf_info_size, stream, temp_mr),
+      d_buf_sizes_and_dst_info(h_buf_sizes_and_dst_info.size(), stream, temp_mr),
d_buf_sizes{reinterpret_cast<std::size_t*>(d_buf_sizes_and_dst_info.data())},
// destination buffer info
-      d_dst_buf_info{reinterpret_cast<dst_buf_info*>(
-        static_cast<uint8_t*>(d_buf_sizes_and_dst_info.data()) + buf_sizes_size)}
+      d_dst_buf_info{
+        reinterpret_cast<dst_buf_info*>(d_buf_sizes_and_dst_info.data() + buf_sizes_size), num_bufs}
{
}

void copy_to_host()
{
// DtoH buf sizes and col info back to the host
-    CUDF_CUDA_TRY(cudaMemcpyAsync(h_buf_sizes,
-                                  d_buf_sizes,
-                                  buf_sizes_size + dst_buf_info_size,
-                                  cudaMemcpyDefault,
-                                  stream.value()));
+    detail::cuda_memcpy_async<uint8_t>(h_buf_sizes_and_dst_info, d_buf_sizes_and_dst_info, stream);
}

rmm::cuda_stream_view const stream;
@@ -1084,13 +1087,13 @@
std::size_t const buf_sizes_size;
std::size_t const dst_buf_info_size;

-  std::vector<uint8_t> h_buf_sizes_and_dst_info;
+  detail::host_vector<uint8_t> h_buf_sizes_and_dst_info;
std::size_t* const h_buf_sizes;
-  dst_buf_info* const h_dst_buf_info;
+  host_span<dst_buf_info> const h_dst_buf_info;

-  rmm::device_buffer d_buf_sizes_and_dst_info;
+  rmm::device_uvector<uint8_t> d_buf_sizes_and_dst_info;
std::size_t* const d_buf_sizes;
-  dst_buf_info* const d_dst_buf_info;
+  device_span<dst_buf_info> const d_dst_buf_info;
};

// Packed block of memory 3:
@@ -1106,11 +1109,12 @@ struct packed_src_and_dst_pointers {
src_bufs_size{cudf::util::round_up_safe(num_src_bufs * sizeof(uint8_t*), split_align)},
dst_bufs_size{cudf::util::round_up_safe(num_partitions * sizeof(uint8_t*), split_align)},
// host-side
-      h_src_and_dst_buffers(src_bufs_size + dst_bufs_size),
+      h_src_and_dst_buffers{
+        detail::make_host_vector<uint8_t>(src_bufs_size + dst_bufs_size, stream)},
h_src_bufs{reinterpret_cast<uint8_t const**>(h_src_and_dst_buffers.data())},
h_dst_bufs{reinterpret_cast<uint8_t**>(h_src_and_dst_buffers.data() + src_bufs_size)},
// device-side
-      d_src_and_dst_buffers{rmm::device_buffer(src_bufs_size + dst_bufs_size, stream, temp_mr)},
+      d_src_and_dst_buffers{h_src_and_dst_buffers.size(), stream, temp_mr},
d_src_bufs{reinterpret_cast<uint8_t const**>(d_src_and_dst_buffers.data())},
d_dst_bufs{reinterpret_cast<uint8_t**>(
reinterpret_cast<uint8_t*>(d_src_and_dst_buffers.data()) + src_bufs_size)}
@@ -1121,18 +1125,18 @@

void copy_to_device()
{
-    CUDF_CUDA_TRY(cudaMemcpyAsync(d_src_and_dst_buffers.data(),
-                                  h_src_and_dst_buffers.data(),
-                                  src_bufs_size + dst_bufs_size,
-                                  cudaMemcpyDefault,
-                                  stream.value()));
+    detail::cuda_memcpy_async<uint8_t>(
+      device_span<uint8_t>{static_cast<uint8_t*>(d_src_and_dst_buffers.data()),
+                           d_src_and_dst_buffers.size()},
+      h_src_and_dst_buffers,
+      stream);
}

rmm::cuda_stream_view const stream;
std::size_t const src_bufs_size;
std::size_t const dst_bufs_size;

-  std::vector<uint8_t> h_src_and_dst_buffers;
+  detail::host_vector<uint8_t> h_src_and_dst_buffers;
uint8_t const** const h_src_bufs;
uint8_t** const h_dst_bufs;

@@ -1205,7 +1209,7 @@ std::unique_ptr<packed_partition_buf_size_and_dst_buf_info> compute_splits(
std::make_unique<packed_partition_buf_size_and_dst_buf_info>(
num_partitions, num_bufs, stream, temp_mr);

-  auto const d_dst_buf_info = partition_buf_size_and_dst_buf_info->d_dst_buf_info;
+  auto const d_dst_buf_info = partition_buf_size_and_dst_buf_info->d_dst_buf_info.begin();
auto const d_buf_sizes = partition_buf_size_and_dst_buf_info->d_buf_sizes;

auto const split_indices_and_src_buf_info = packed_split_indices_and_src_buf_info(
@@ -1518,26 +1522,19 @@ std::unique_ptr<chunk_iteration_state> chunk_iteration_state::create(
*/
if (user_buffer_size != 0) {
// copy the batch offsets back to host
-    std::vector<std::size_t> h_offsets(num_batches + 1);
-    {
-      rmm::device_uvector<std::size_t> offsets(h_offsets.size(), stream, temp_mr);
+    auto const h_offsets = [&] {
+      rmm::device_uvector<std::size_t> offsets(num_batches + 1, stream, temp_mr);
auto const batch_byte_size_iter = cudf::detail::make_counting_transform_iterator(
0, batch_byte_size_function{num_batches, d_batched_dst_buf_info.begin()});

-      thrust::exclusive_scan(rmm::exec_policy(stream, temp_mr),
+      thrust::exclusive_scan(rmm::exec_policy_nosync(stream, temp_mr),
                              batch_byte_size_iter,
-                             batch_byte_size_iter + num_batches + 1,
+                             batch_byte_size_iter + offsets.size(),
offsets.begin());

-      CUDF_CUDA_TRY(cudaMemcpyAsync(h_offsets.data(),
-                                    offsets.data(),
-                                    sizeof(std::size_t) * offsets.size(),
-                                    cudaMemcpyDefault,
-                                    stream.value()));
-
-      // the next part is working on the CPU, so we want to synchronize here
-      stream.synchronize();
-    }
+      return detail::make_host_vector_sync(offsets, stream);
+    }();

std::vector<std::size_t> num_batches_per_iteration;
std::vector<std::size_t> size_of_batches_per_iteration;
@@ -1699,7 +1696,7 @@ void copy_data(int num_batches_to_copy,
int starting_batch,
uint8_t const** d_src_bufs,
uint8_t** d_dst_bufs,
-               rmm::device_uvector<dst_buf_info>& d_dst_buf_info,
+               device_span<dst_buf_info> d_dst_buf_info,
uint8_t* user_buffer,
rmm::cuda_stream_view stream)
{
@@ -1833,15 +1830,9 @@ struct contiguous_split_state {
keys + num_batches_total,
values,
thrust::make_discard_iterator(),
-                          dst_valid_count_output_iterator{d_orig_dst_buf_info});
+                          dst_valid_count_output_iterator{d_orig_dst_buf_info.begin()});

-    CUDF_CUDA_TRY(cudaMemcpyAsync(h_orig_dst_buf_info,
-                                  d_orig_dst_buf_info,
-                                  partition_buf_size_and_dst_buf_info->dst_buf_info_size,
-                                  cudaMemcpyDefault,
-                                  stream.value()));
-
-    stream.synchronize();
+    detail::cuda_memcpy<dst_buf_info>(h_orig_dst_buf_info, d_orig_dst_buf_info, stream);

// not necessary for the non-chunked case, but it makes it so further calls to has_next
// return false, just in case
@@ -1889,7 +1880,7 @@ struct contiguous_split_state {
}

auto& h_dst_buf_info = partition_buf_size_and_dst_buf_info->h_dst_buf_info;
-    auto cur_dst_buf_info = h_dst_buf_info;
+    auto cur_dst_buf_info = h_dst_buf_info.data();
detail::metadata_builder mb{input.num_columns()};

populate_metadata(input.begin(), input.end(), cur_dst_buf_info, mb);
@@ -1927,7 +1918,7 @@

// Second pass: uses `dst_buf_info` to break down the work into 1MB batches.
chunk_iter_state = compute_batches(num_bufs,
-                                      partition_buf_size_and_dst_buf_info->d_dst_buf_info,
+                                      partition_buf_size_and_dst_buf_info->d_dst_buf_info.data(),
partition_buf_size_and_dst_buf_info->h_buf_sizes,
num_partitions,
user_buffer_size,
@@ -1963,7 +1954,7 @@ struct contiguous_split_state {
auto& h_dst_buf_info = partition_buf_size_and_dst_buf_info->h_dst_buf_info;
auto& h_dst_bufs = src_and_dst_pointers->h_dst_bufs;

-    auto cur_dst_buf_info = h_dst_buf_info;
+    auto cur_dst_buf_info = h_dst_buf_info.data();
detail::metadata_builder mb(input.num_columns());

for (std::size_t idx = 0; idx < num_partitions; idx++) {
2 changes: 1 addition & 1 deletion cpp/src/io/csv/reader_impl.cu
@@ -21,13 +21,13 @@

#include "csv_common.hpp"
#include "csv_gpu.hpp"
#include "cudf/detail/utilities/cuda_memcpy.hpp"
#include "io/comp/io_uncomp.hpp"
#include "io/utilities/column_buffer.hpp"
#include "io/utilities/hostdevice_vector.hpp"
#include "io/utilities/parsing_utils.cuh"

#include <cudf/detail/utilities/cuda.cuh>
+#include <cudf/detail/utilities/cuda_memcpy.hpp>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/detail/utilities/visitor_overload.hpp>
#include <cudf/io/csv.hpp>
2 changes: 1 addition & 1 deletion cpp/src/io/orc/writer_impl.cu
@@ -19,7 +19,6 @@
* @brief cuDF-IO ORC writer class implementation
*/

#include "cudf/detail/utilities/cuda_memcpy.hpp"
#include "io/comp/nvcomp_adapter.hpp"
#include "io/orc/orc_gpu.hpp"
#include "io/statistics/column_statistics.cuh"
@@ -30,6 +29,7 @@
#include <cudf/detail/null_mask.hpp>
#include <cudf/detail/utilities/batched_memcpy.hpp>
#include <cudf/detail/utilities/cuda.cuh>
+#include <cudf/detail/utilities/cuda_memcpy.hpp>
#include <cudf/detail/utilities/stream_pool.hpp>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/logger.hpp>
11 changes: 4 additions & 7 deletions cpp/src/reductions/minmax.cu
@@ -218,9 +218,8 @@ struct minmax_functor {
auto dev_result = reduce<cudf::string_view>(col, stream);
// copy the minmax_pair to the host; does not copy the strings
using OutputType = minmax_pair<cudf::string_view>;
-    OutputType host_result;
-    CUDF_CUDA_TRY(cudaMemcpyAsync(
-      &host_result, dev_result.data(), sizeof(OutputType), cudaMemcpyDefault, stream.value()));
+    auto const host_result = dev_result.value(stream);
// strings are copied to create the scalars here
return {std::make_unique<string_scalar>(host_result.min_val, true, stream, mr),
std::make_unique<string_scalar>(host_result.max_val, true, stream, mr)};
@@ -236,10 +235,8 @@
// compute minimum and maximum values
auto dev_result = reduce<T>(col, stream);
// copy the minmax_pair to the host to call get_element
-    using OutputType = minmax_pair<T>;
-    OutputType host_result;
-    CUDF_CUDA_TRY(cudaMemcpyAsync(
-      &host_result, dev_result.data(), sizeof(OutputType), cudaMemcpyDefault, stream.value()));
+    using OutputType       = minmax_pair<T>;
+    OutputType host_result = dev_result.value(stream);
// get the keys for those indexes
auto const keys = dictionary_column_view(col).keys();
return {detail::get_element(keys, static_cast<size_type>(host_result.min_val), stream, mr),
9 changes: 4 additions & 5 deletions cpp/src/scalar/scalar.cpp
@@ -114,11 +114,10 @@ string_scalar::operator std::string() const { return this->to_string(cudf::get_d

std::string string_scalar::to_string(rmm::cuda_stream_view stream) const
{
-  std::string result;
-  result.resize(_data.size());
-  CUDF_CUDA_TRY(
-    cudaMemcpyAsync(&result[0], _data.data(), _data.size(), cudaMemcpyDefault, stream.value()));
-  stream.synchronize();
+  std::string result(size(), '\0');
+  detail::cuda_memcpy(host_span<char>{result.data(), result.size()},
+                      device_span<char const>{data(), _data.size()},
+                      stream);
return result;
}
