Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use KvikIO to enable file's fast host read and host write #17764

Draft
wants to merge 6 commits into
base: branch-25.04
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 2 additions & 19 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ option(CUDA_ENABLE_LINEINFO
option(CUDA_WARNINGS_AS_ERRORS "Enable -Werror=all-warnings for all CUDA compilation" ON)
# cudart can be statically linked or dynamically linked. The python ecosystem wants dynamic linking
option(CUDA_STATIC_RUNTIME "Statically link the CUDA runtime" OFF)
option(CUDA_STATIC_CUFILE "Statically link cuFile" OFF)

set(DEFAULT_CUDF_BUILD_STREAMS_TEST_UTIL ON)
if(CUDA_STATIC_RUNTIME OR NOT BUILD_SHARED_LIBS)
Expand Down Expand Up @@ -924,15 +923,6 @@ target_compile_definitions(
# Enable remote IO through KvikIO
target_compile_definitions(cudf PRIVATE $<$<BOOL:${CUDF_KVIKIO_REMOTE_IO}>:CUDF_KVIKIO_REMOTE_IO>)

# Enable cuFile support
set(_cufile_suffix)
if(CUDA_STATIC_CUFILE)
set(_cufile_suffix _static)
endif()
if(TARGET CUDA::cuFile${_cufile_suffix})
target_compile_definitions(cudf PRIVATE CUDF_CUFILE_FOUND)
endif()

# Remove this after upgrading to a CCCL that has a proper CMake option. See
# https://github.com/NVIDIA/cccl/pull/2844
target_compile_definitions(cudf PRIVATE THRUST_FORCE_32_BIT_OFFSET_TYPE=1)
Expand All @@ -944,15 +934,8 @@ add_dependencies(cudf jitify_preprocess_run)
target_link_libraries(
cudf
PUBLIC CCCL::CCCL rmm::rmm rmm::rmm_logger $<BUILD_LOCAL_INTERFACE:BS::thread_pool> cudf_logger
PRIVATE $<BUILD_LOCAL_INTERFACE:nvtx3::nvtx3-cpp>
cuco::cuco
ZLIB::ZLIB
nvcomp::nvcomp
kvikio::kvikio
$<TARGET_NAME_IF_EXISTS:CUDA::cuFile${_cufile_suffix}>
nanoarrow
rmm::rmm_logger_impl
cudf_logger_impl
PRIVATE $<BUILD_LOCAL_INTERFACE:nvtx3::nvtx3-cpp> cuco::cuco ZLIB::ZLIB nvcomp::nvcomp
kvikio::kvikio nanoarrow rmm::rmm_logger_impl cudf_logger_impl
)

# Add Conda library, and include paths if specified
Expand Down
21 changes: 3 additions & 18 deletions cpp/include/cudf/io/config_utils.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
* Copyright (c) 2024-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,22 +19,7 @@

namespace CUDF_EXPORT cudf {
namespace io {
namespace cufile_integration {

/**
* @brief Returns true if cuFile and its compatibility mode are enabled.
*/
bool is_always_enabled();

/**
* @brief Returns true if direct IO through cuFile (GDS) is enabled.
*
* This holds when the cuFile usage policy is either `GDS` or `ALWAYS`; see `is_always_enabled()`
* for the `ALWAYS` case, in which the cuFile compatibility mode is enabled as well.
*/
bool is_gds_enabled();

/**
* @brief Returns true if KvikIO is enabled.
*/
bool is_kvikio_enabled();
namespace kvikio_integration {

/**
* @brief Set KvikIO parameters, including:
Expand All @@ -45,7 +30,7 @@ bool is_kvikio_enabled();
*/
void set_up_kvikio();

} // namespace cufile_integration
} // namespace kvikio_integration

namespace nvcomp_integration {

Expand Down
4 changes: 2 additions & 2 deletions cpp/include/cudf/io/data_sink.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2024, NVIDIA CORPORATION.
* Copyright (c) 2020-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -122,7 +122,7 @@ class data_sink {
*
* In the case where the sink type is itself a memory buffered write, this ends up
* being effectively a second memcpy. So a useful optimization for a "smart"
* custom data_sink is to do it's own internal management of the movement
* custom data_sink is to do its own internal management of the movement
* of data between cpu and gpu; turning the internals of the writer into simply
*
* sink->device_write(device_buffer, size)
Expand Down
31 changes: 3 additions & 28 deletions cpp/src/io/utilities/config_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,33 +24,7 @@

namespace cudf::io {

namespace cufile_integration {

namespace {
/**
* @brief Defines which cuFile usage to enable.
*/
enum class usage_policy : uint8_t { OFF, GDS, ALWAYS, KVIKIO };

/**
 * @brief Translate the `LIBCUDF_CUFILE_POLICY` environment variable into a `usage_policy`.
 *
 * The variable is read through `getenv_or` with "KVIKIO" as the fallback value. The result is
 * kept in a function-local static, so the lookup is performed only on the first call.
 *
 * @return The policy matching the variable's value ("OFF", "GDS", "ALWAYS" or "KVIKIO")
 *
 * Any other value is reported through `CUDF_FAIL`.
 */
usage_policy get_env_policy()
{
  static auto const policy_name = getenv_or<std::string>("LIBCUDF_CUFILE_POLICY", "KVIKIO");

  if (policy_name == "OFF") { return usage_policy::OFF; }
  if (policy_name == "GDS") { return usage_policy::GDS; }
  if (policy_name == "ALWAYS") { return usage_policy::ALWAYS; }
  if (policy_name == "KVIKIO") { return usage_policy::KVIKIO; }

  // None of the recognized spellings matched.
  CUDF_FAIL("Invalid LIBCUDF_CUFILE_POLICY value: " + policy_name);
}
} // namespace

/// True when the configured usage policy is `ALWAYS`.
bool is_always_enabled()
{
  return usage_policy::ALWAYS == get_env_policy();
}

/// True when the configured usage policy is `ALWAYS` or `GDS`.
bool is_gds_enabled()
{
  if (is_always_enabled()) { return true; }
  return usage_policy::GDS == get_env_policy();
}

/// True when the configured usage policy is `KVIKIO`.
bool is_kvikio_enabled()
{
  return usage_policy::KVIKIO == get_env_policy();
}
namespace kvikio_integration {

void set_up_kvikio()
{
Expand All @@ -63,7 +37,8 @@ void set_up_kvikio()
kvikio::defaults::thread_pool_nthreads_reset(nthreads);
});
}
} // namespace cufile_integration

} // namespace kvikio_integration

namespace nvcomp_integration {

Expand Down
58 changes: 19 additions & 39 deletions cpp/src/io/utilities/data_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@

#include <rmm/cuda_stream_view.hpp>

#include <fstream>

namespace cudf {
namespace io {

Expand All @@ -38,46 +36,35 @@ class file_sink : public data_sink {
explicit file_sink(std::string const& filepath)
{
detail::force_init_cuda_context();
_output_stream.open(filepath, std::ios::out | std::ios::binary | std::ios::trunc);
if (!_output_stream.is_open()) { detail::throw_on_file_open_failure(filepath, true); }

if (cufile_integration::is_kvikio_enabled()) {
cufile_integration::set_up_kvikio();
_kvikio_file = kvikio::FileHandle(filepath, "w");
CUDF_LOG_INFO("Writing a file using kvikIO, with compatibility mode %s.",
_kvikio_file.is_compat_mode_preferred() ? "on" : "off");
} else {
_cufile_out = detail::make_cufile_output(filepath);
}
kvikio_integration::set_up_kvikio();
_kvikio_file = kvikio::FileHandle(filepath, "w");
CUDF_LOG_INFO("Writing a file using kvikIO, with compatibility mode %s.",
_kvikio_file.is_compat_mode_preferred() ? "on" : "off");
}

// Flush on destruction. The line is marked NOLINT because flush() is a virtual method and
// linters flag virtual calls made from a destructor.
~file_sink() override { flush(); } // NOLINT

void host_write(void const* data, size_t size) override
{
_output_stream.seekp(_bytes_written);
_output_stream.write(static_cast<char const*>(data), size);
_kvikio_file.pwrite(data, size, _bytes_written).get();
_bytes_written += size;
}

void flush() override { _output_stream.flush(); }
void flush() override
{
// Intentionally a no-op. kvikio::FileHandle::pwrite() makes system calls that reach the kernel
// buffer cache without going through any application-level buffer, so there is nothing for
// ::fflush() or ofstream::flush() to flush here.
}

size_t bytes_written() override { return _bytes_written; }

[[nodiscard]] bool supports_device_write() const override
{
return !_kvikio_file.closed() || _cufile_out != nullptr;
}
[[nodiscard]] bool supports_device_write() const override { return !_kvikio_file.closed(); }

[[nodiscard]] bool is_device_write_preferred(size_t size) const override
{
if (!supports_device_write()) { return false; }

// Always prefer device writes if kvikio is enabled
if (!_kvikio_file.closed()) { return true; }

return size >= _gds_write_preferred_threshold;
return supports_device_write();
}

std::future<void> device_write_async(void const* gpu_data,
Expand All @@ -89,14 +76,11 @@ class file_sink : public data_sink {
size_t const offset = _bytes_written;
_bytes_written += size;

if (!_kvikio_file.closed()) {
// KvikIO's `pwrite()` returns a `std::future<size_t>` so we convert it
// to `std::future<void>`
return std::async(std::launch::deferred, [this, gpu_data, size, offset] {
_kvikio_file.pwrite(gpu_data, size, offset).get();
});
}
return _cufile_out->write_async(gpu_data, offset, size);
// KvikIO's `pwrite()` returns a `std::future<size_t>` so we convert it
// to `std::future<void>`
return std::async(std::launch::deferred, [this, gpu_data, size, offset]() -> void {
_kvikio_file.pwrite(gpu_data, size, offset).get();
});
}

void device_write(void const* gpu_data, size_t size, rmm::cuda_stream_view stream) override
Expand All @@ -105,12 +89,8 @@ class file_sink : public data_sink {
}

private:
std::ofstream _output_stream;
size_t _bytes_written = 0;
std::unique_ptr<detail::cufile_output_impl> _cufile_out;
kvikio::FileHandle _kvikio_file;
// The write size above which GDS is faster than d2h-copy + posix-write
static constexpr size_t _gds_write_preferred_threshold = 128 << 10; // 128KB
};

/**
Expand Down Expand Up @@ -162,7 +142,7 @@ class void_sink : public data_sink {
rmm::cuda_stream_view stream) override
{
_bytes_written += size;
return std::async(std::launch::deferred, [] {});
return std::async(std::launch::deferred, []() -> void {});
}

void flush() override {}
Expand Down
Loading
Loading