From d8d49596a011e6313df54f20830c5cd7cd49eef7 Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Fri, 17 Jan 2025 15:08:08 -0500 Subject: [PATCH 1/9] Remove legacy cuFile integration --- cpp/CMakeLists.txt | 21 +- cpp/include/cudf/io/config_utils.hpp | 12 +- cpp/src/io/utilities/config_utils.cpp | 8 +- cpp/src/io/utilities/data_sink.cpp | 40 +-- cpp/src/io/utilities/datasource.cpp | 30 +-- cpp/src/io/utilities/file_io_utilities.cpp | 300 +-------------------- cpp/src/io/utilities/file_io_utilities.hpp | 171 +----------- cpp/tests/CMakeLists.txt | 5 - cpp/tests/io/file_io_test.cpp | 43 --- docs/cudf/source/user_guide/io/io.md | 16 +- 10 files changed, 27 insertions(+), 619 deletions(-) delete mode 100644 cpp/tests/io/file_io_test.cpp diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 354560998c5..a9d70cfa09f 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -79,7 +79,6 @@ option(CUDA_ENABLE_LINEINFO option(CUDA_WARNINGS_AS_ERRORS "Enable -Werror=all-warnings for all CUDA compilation" ON) # cudart can be statically linked or dynamically linked. The python ecosystem wants dynamic linking option(CUDA_STATIC_RUNTIME "Statically link the CUDA runtime" OFF) -option(CUDA_STATIC_CUFILE "Statically link cuFile" OFF) set(DEFAULT_CUDF_BUILD_STREAMS_TEST_UTIL ON) if(CUDA_STATIC_RUNTIME OR NOT BUILD_SHARED_LIBS) @@ -924,15 +923,6 @@ target_compile_definitions( # Enable remote IO through KvikIO target_compile_definitions(cudf PRIVATE $<$:CUDF_KVIKIO_REMOTE_IO>) -# Enable cuFile support -set(_cufile_suffix) -if(CUDA_STATIC_CUFILE) - set(_cufile_suffix _static) -endif() -if(TARGET CUDA::cuFile${_cufile_suffix}) - target_compile_definitions(cudf PRIVATE CUDF_CUFILE_FOUND) -endif() - # Remove this after upgrading to a CCCL that has a proper CMake option. See # https://github.com/NVIDIA/cccl/pull/2844 target_compile_definitions(cudf PRIVATE THRUST_FORCE_32_BIT_OFFSET_TYPE=1) @@ -944,15 +934,8 @@ add_dependencies(cudf jitify_preprocess_run) target_link_libraries( cudf PUBLIC CCCL::CCCL rmm::rmm rmm::rmm_logger $ cudf_logger - PRIVATE $ - cuco::cuco - ZLIB::ZLIB - nvcomp::nvcomp - kvikio::kvikio - $ - nanoarrow - rmm::rmm_logger_impl - cudf_logger_impl + PRIVATE $ cuco::cuco ZLIB::ZLIB nvcomp::nvcomp + kvikio::kvikio nanoarrow rmm::rmm_logger_impl cudf_logger_impl ) # Add Conda library, and include paths if specified diff --git a/cpp/include/cudf/io/config_utils.hpp b/cpp/include/cudf/io/config_utils.hpp index 070b59a117c..50b7f54069e 100644 --- a/cpp/include/cudf/io/config_utils.hpp +++ b/cpp/include/cudf/io/config_utils.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2024, NVIDIA CORPORATION. + * Copyright (c) 2024-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,16 +21,6 @@ namespace CUDF_EXPORT cudf { namespace io { namespace cufile_integration { -/** - * @brief Returns true if cuFile and its compatibility mode are enabled. - */ -bool is_always_enabled(); - -/** - * @brief Returns true if only direct IO through cuFile is enabled (compatibility mode is disabled). - */ -bool is_gds_enabled(); - /** * @brief Returns true if KvikIO is enabled. */ diff --git a/cpp/src/io/utilities/config_utils.cpp b/cpp/src/io/utilities/config_utils.cpp index 726feca328b..e3be8afa681 100644 --- a/cpp/src/io/utilities/config_utils.cpp +++ b/cpp/src/io/utilities/config_utils.cpp @@ -30,7 +30,7 @@ namespace { /** * @brief Defines which cuFile usage to enable. */ -enum class usage_policy : uint8_t { OFF, GDS, ALWAYS, KVIKIO }; +enum class usage_policy : uint8_t { OFF, KVIKIO }; /** * @brief Get the current usage policy. @@ -39,17 +39,11 @@ usage_policy get_env_policy() { static auto const env_val = getenv_or("LIBCUDF_CUFILE_POLICY", "KVIKIO"); if (env_val == "OFF") return usage_policy::OFF; - if (env_val == "GDS") return usage_policy::GDS; - if (env_val == "ALWAYS") return usage_policy::ALWAYS; if (env_val == "KVIKIO") return usage_policy::KVIKIO; CUDF_FAIL("Invalid LIBCUDF_CUFILE_POLICY value: " + env_val); } } // namespace -bool is_always_enabled() { return get_env_policy() == usage_policy::ALWAYS; } - -bool is_gds_enabled() { return is_always_enabled() or get_env_policy() == usage_policy::GDS; } - bool is_kvikio_enabled() { return get_env_policy() == usage_policy::KVIKIO; } void set_up_kvikio() diff --git a/cpp/src/io/utilities/data_sink.cpp b/cpp/src/io/utilities/data_sink.cpp index 975206646c6..23e6d6e2921 100644 --- a/cpp/src/io/utilities/data_sink.cpp +++ b/cpp/src/io/utilities/data_sink.cpp @@ -41,14 +41,10 @@ class file_sink : public data_sink { _output_stream.open(filepath, std::ios::out | std::ios::binary | std::ios::trunc); if (!_output_stream.is_open()) { detail::throw_on_file_open_failure(filepath, true); } - if (cufile_integration::is_kvikio_enabled()) { - cufile_integration::set_up_kvikio(); - _kvikio_file = kvikio::FileHandle(filepath, "w"); - CUDF_LOG_INFO("Writing a file using kvikIO, with compatibility mode %s.", - _kvikio_file.is_compat_mode_preferred() ? "on" : "off"); - } else { - _cufile_out = detail::make_cufile_output(filepath); - } + cufile_integration::set_up_kvikio(); + _kvikio_file = kvikio::FileHandle(filepath, "w"); + CUDF_LOG_INFO("Writing a file using kvikIO, with compatibility mode %s.", + _kvikio_file.is_compat_mode_preferred() ? "on" : "off"); } // Marked as NOLINT because we are calling a virtual method in the destructor @@ -65,19 +61,11 @@ class file_sink : public data_sink { size_t bytes_written() override { return _bytes_written; } - [[nodiscard]] bool supports_device_write() const override - { - return !_kvikio_file.closed() || _cufile_out != nullptr; - } + [[nodiscard]] bool supports_device_write() const override { return !_kvikio_file.closed(); } [[nodiscard]] bool is_device_write_preferred(size_t size) const override { - if (!supports_device_write()) { return false; } - - // Always prefer device writes if kvikio is enabled - if (!_kvikio_file.closed()) { return true; } - - return size >= _gds_write_preferred_threshold; + return supports_device_write(); } std::future device_write_async(void const* gpu_data, @@ -89,14 +77,11 @@ class file_sink : public data_sink { size_t const offset = _bytes_written; _bytes_written += size; - if (!_kvikio_file.closed()) { - // KvikIO's `pwrite()` returns a `std::future` so we convert it - // to `std::future` - return std::async(std::launch::deferred, [this, gpu_data, size, offset] { - _kvikio_file.pwrite(gpu_data, size, offset).get(); - }); - } - return _cufile_out->write_async(gpu_data, offset, size); + // KvikIO's `pwrite()` returns a `std::future` so we convert it + // to `std::future` + return std::async(std::launch::deferred, [this, gpu_data, size, offset] { + _kvikio_file.pwrite(gpu_data, size, offset).get(); + }); } void device_write(void const* gpu_data, size_t size, rmm::cuda_stream_view stream) override @@ -107,10 +92,7 @@ class file_sink : public data_sink { private: std::ofstream _output_stream; size_t _bytes_written = 0; - std::unique_ptr _cufile_out; kvikio::FileHandle _kvikio_file; - // The write size above which GDS is faster then d2h-copy + posix-write - static constexpr size_t _gds_write_preferred_threshold = 128 << 10; // 128KB }; /** diff --git a/cpp/src/io/utilities/datasource.cpp b/cpp/src/io/utilities/datasource.cpp index 87b3c6facdf..dc3d6d3a28c 100644 --- a/cpp/src/io/utilities/datasource.cpp +++ b/cpp/src/io/utilities/datasource.cpp @@ -52,14 +52,10 @@ class file_source : public datasource { explicit file_source(char const* filepath) : _file(filepath, O_RDONLY) { detail::force_init_cuda_context(); - if (cufile_integration::is_kvikio_enabled()) { - cufile_integration::set_up_kvikio(); - _kvikio_file = kvikio::FileHandle(filepath); - CUDF_LOG_INFO("Reading a file using kvikIO, with compatibility mode %s.", - _kvikio_file.is_compat_mode_preferred() ? "on" : "off"); - } else { - _cufile_in = detail::make_cufile_input(filepath); - } + cufile_integration::set_up_kvikio(); + _kvikio_file = kvikio::FileHandle(filepath); + CUDF_LOG_INFO("Reading a file using kvikIO, with compatibility mode %s.", + _kvikio_file.is_compat_mode_preferred() ? "on" : "off"); } std::unique_ptr host_read(size_t offset, size_t size) override @@ -88,19 +84,11 @@ class file_source : public datasource { ~file_source() override = default; - [[nodiscard]] bool supports_device_read() const override - { - return !_kvikio_file.closed() || _cufile_in != nullptr; - } + [[nodiscard]] bool supports_device_read() const override { return !_kvikio_file.closed(); } [[nodiscard]] bool is_device_read_preferred(size_t size) const override { - if (!supports_device_read()) { return false; } - - // Always prefer device reads if kvikio is enabled - if (!_kvikio_file.closed()) { return true; } - - return size >= _gds_read_preferred_threshold; + return supports_device_read(); } std::future device_read_async(size_t offset, @@ -111,8 +99,7 @@ class file_source : public datasource { CUDF_EXPECTS(supports_device_read(), "Device reads are not supported for this file."); auto const read_size = std::min(size, _file.size() - offset); - if (!_kvikio_file.closed()) { return _kvikio_file.pread(dst, read_size, offset); } - return _cufile_in->read_async(offset, read_size, dst, stream); + return _kvikio_file.pread(dst, read_size, offset); } size_t device_read(size_t offset, @@ -140,10 +127,7 @@ class file_source : public datasource { detail::file_wrapper _file; private: - std::unique_ptr _cufile_in; kvikio::FileHandle _kvikio_file; - // The read size above which GDS is faster then posix-read + h2d-copy - static constexpr size_t _gds_read_preferred_threshold = 128 << 10; // 128KB }; /** diff --git a/cpp/src/io/utilities/file_io_utilities.cpp b/cpp/src/io/utilities/file_io_utilities.cpp index 28367c95430..d03ead3e5d6 100644 --- a/cpp/src/io/utilities/file_io_utilities.cpp +++ b/cpp/src/io/utilities/file_io_utilities.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2024, NVIDIA CORPORATION. + * Copyright (c) 2021-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -86,304 +86,6 @@ file_wrapper::file_wrapper(std::string const& filepath, int flags, mode_t mode) file_wrapper::~file_wrapper() { close(fd); } -#ifdef CUDF_CUFILE_FOUND - -/** - * @brief Class that dynamically loads the cuFile library and manages the cuFile driver. - */ -class cufile_shim { - private: - cufile_shim(); - void modify_cufile_json() const; - void load_cufile_lib(); - - void* cf_lib = nullptr; - decltype(cuFileDriverOpen)* driver_open = nullptr; - decltype(cuFileDriverClose)* driver_close = nullptr; - - std::unique_ptr init_error; - [[nodiscard]] auto is_valid() const noexcept { return init_error == nullptr; } - - public: - cufile_shim(cufile_shim const&) = delete; - cufile_shim& operator=(cufile_shim const&) = delete; - - static cufile_shim const* instance(); - - ~cufile_shim() - { - // Explicit cuFile driver close should not be performed here to avoid segfault. However, in the - // absence of driver_close(), cuFile will implicitly do that, which in most cases causes - // segfault anyway. TODO: Revisit this conundrum once cuFile is fixed. - // https://github.com/rapidsai/cudf/issues/17121 - - if (cf_lib != nullptr) dlclose(cf_lib); - } - - decltype(cuFileHandleRegister)* handle_register = nullptr; - decltype(cuFileHandleDeregister)* handle_deregister = nullptr; - decltype(cuFileRead)* read = nullptr; - decltype(cuFileWrite)* write = nullptr; -}; - -void cufile_shim::modify_cufile_json() const -{ - std::string const json_path_env_var = "CUFILE_ENV_PATH_JSON"; - static temp_directory const tmp_config_dir{"cudf_cufile_config"}; - - // Modify the config file based on the policy - auto const config_file_path = getenv_or(json_path_env_var, "/etc/cufile.json"); - std::ifstream user_config_file(config_file_path); - // Modified config file is stored in a temporary directory - auto const cudf_config_path = tmp_config_dir.path() + "cufile.json"; - std::ofstream cudf_config_file(cudf_config_path); - - std::string line; - while (std::getline(user_config_file, line)) { - std::string const tag = "\"allow_compat_mode\""; - if (line.find(tag) != std::string::npos) { - // TODO: only replace the true/false value instead of replacing the whole line - // Enable compatibility mode when cuDF does not fall back to host path - cudf_config_file << tag << ": " - << (cufile_integration::is_always_enabled() ? "true" : "false") << ",\n"; - } else { - cudf_config_file << line << '\n'; - } - - // Point libcufile to the modified config file - CUDF_EXPECTS(setenv(json_path_env_var.c_str(), cudf_config_path.c_str(), 0) == 0, - "Failed to set the cuFile config file environment variable."); - } -} - -void cufile_shim::load_cufile_lib() -{ - for (auto&& name : {"libcufile.so.0", - // Prior to CUDA 11.7.1, although ABI - // compatibility was maintained, some (at least - // Debian) packages do not have the .0 symlink, - // instead request the exact version. - "libcufile.so.1.3.0" /* 11.7.0 */, - "libcufile.so.1.2.1" /* 11.6.2, 11.6.1 */, - "libcufile.so.1.2.0" /* 11.6.0 */, - "libcufile.so.1.1.1" /* 11.5.1 */, - "libcufile.so.1.1.0" /* 11.5.0 */, - "libcufile.so.1.0.2" /* 11.4.4, 11.4.3, 11.4.2 */, - "libcufile.so.1.0.1" /* 11.4.1 */, - "libcufile.so.1.0.0" /* 11.4.0 */}) { - cf_lib = dlopen(name, RTLD_LAZY | RTLD_LOCAL | RTLD_NODELETE); - if (cf_lib != nullptr) break; - } - CUDF_EXPECTS(cf_lib != nullptr, "Failed to load cuFile library"); - driver_open = reinterpret_cast(dlsym(cf_lib, "cuFileDriverOpen")); - CUDF_EXPECTS(driver_open != nullptr, "could not find cuFile cuFileDriverOpen symbol"); - driver_close = reinterpret_cast(dlsym(cf_lib, "cuFileDriverClose")); - CUDF_EXPECTS(driver_close != nullptr, "could not find cuFile cuFileDriverClose symbol"); - handle_register = - reinterpret_cast(dlsym(cf_lib, "cuFileHandleRegister")); - CUDF_EXPECTS(handle_register != nullptr, "could not find cuFile cuFileHandleRegister symbol"); - handle_deregister = - reinterpret_cast(dlsym(cf_lib, "cuFileHandleDeregister")); - CUDF_EXPECTS(handle_deregister != nullptr, "could not find cuFile cuFileHandleDeregister symbol"); - read = reinterpret_cast(dlsym(cf_lib, "cuFileRead")); - CUDF_EXPECTS(read != nullptr, "could not find cuFile cuFileRead symbol"); - write = reinterpret_cast(dlsym(cf_lib, "cuFileWrite")); - CUDF_EXPECTS(write != nullptr, "could not find cuFile cuFileWrite symbol"); -} - -cufile_shim::cufile_shim() -{ - try { - modify_cufile_json(); - load_cufile_lib(); - - CUDF_EXPECTS(driver_open().err == CU_FILE_SUCCESS, "Failed to initialize cuFile driver"); - } catch (cudf::logic_error const& err) { - init_error = std::make_unique(err); - } -} - -cufile_shim const* cufile_shim::instance() -{ - static cufile_shim _instance; - // Defer throwing to avoid repeated attempts to load the library - if (!_instance.is_valid()) CUDF_FAIL("" + std::string(_instance.init_error->what())); - - return &_instance; -} - -void cufile_registered_file::register_handle() -{ - CUfileDescr_t cufile_desc{}; - cufile_desc.handle.fd = _file.desc(); - cufile_desc.type = CU_FILE_HANDLE_TYPE_OPAQUE_FD; - CUDF_EXPECTS(shim->handle_register(&cf_handle, &cufile_desc).err == CU_FILE_SUCCESS, - "Cannot register file handle with cuFile"); -} - -cufile_registered_file::~cufile_registered_file() { shim->handle_deregister(cf_handle); } - -cufile_input_impl::cufile_input_impl(std::string const& filepath) - : shim{cufile_shim::instance()}, - cf_file(shim, filepath, O_RDONLY | O_DIRECT), - // The benefit from multithreaded read plateaus around 16 threads - pool(getenv_or("LIBCUDF_CUFILE_THREAD_COUNT", 16)) -{ -} - -namespace { - -template > -std::vector> make_sliced_tasks( - F function, DataT* ptr, size_t offset, size_t size, BS::thread_pool& pool) -{ - constexpr size_t default_max_slice_size = 4 * 1024 * 1024; - static auto const max_slice_size = getenv_or("LIBCUDF_CUFILE_SLICE_SIZE", default_max_slice_size); - auto const slices = make_file_io_slices(size, max_slice_size); - std::vector> slice_tasks; - std::transform(slices.cbegin(), slices.cend(), std::back_inserter(slice_tasks), [&](auto& slice) { - return pool.submit_task( - [=] { return function(ptr + slice.offset, slice.size, offset + slice.offset); }); - }); - return slice_tasks; -} - -} // namespace - -std::future cufile_input_impl::read_async(size_t offset, - size_t size, - uint8_t* dst, - rmm::cuda_stream_view stream) -{ - int device = 0; - CUDF_CUDA_TRY(cudaGetDevice(&device)); - - auto read_slice = [device, gds_read = shim->read, file_handle = cf_file.handle()]( - void* dst, size_t size, size_t offset) -> ssize_t { - CUDF_CUDA_TRY(cudaSetDevice(device)); - auto read_size = gds_read(file_handle, dst, size, offset, 0); - CUDF_EXPECTS(read_size != -1, "cuFile error reading from a file"); - return read_size; - }; - - auto slice_tasks = make_sliced_tasks(read_slice, dst, offset, size, pool); - - auto waiter = [](auto slice_tasks) -> size_t { - return std::accumulate(slice_tasks.begin(), slice_tasks.end(), 0, [](auto sum, auto& task) { - return sum + task.get(); - }); - }; - // The future returned from this function is deferred, not async because we want to avoid creating - // threads for each read_async call. This overhead is significant in case of multiple small reads. - return std::async(std::launch::deferred, waiter, std::move(slice_tasks)); -} - -cufile_output_impl::cufile_output_impl(std::string const& filepath) - : shim{cufile_shim::instance()}, - cf_file(shim, filepath, O_CREAT | O_RDWR | O_DIRECT, 0664), - pool(getenv_or("LIBCUDF_CUFILE_THREAD_COUNT", 16)) -{ -} - -std::future cufile_output_impl::write_async(void const* data, size_t offset, size_t size) -{ - int device = 0; - CUDF_CUDA_TRY(cudaGetDevice(&device)); - - auto write_slice = [device, gds_write = shim->write, file_handle = cf_file.handle()]( - void const* src, size_t size, size_t offset) -> void { - CUDF_CUDA_TRY(cudaSetDevice(device)); - auto write_size = gds_write(file_handle, src, size, offset, 0); - CUDF_EXPECTS(write_size != -1 and write_size == static_cast(size), - "cuFile error writing to a file"); - }; - - auto source = static_cast(data); - auto slice_tasks = make_sliced_tasks(write_slice, source, offset, size, pool); - - auto waiter = [](auto slice_tasks) -> void { - for (auto const& task : slice_tasks) { - task.wait(); - } - }; - // The future returned from this function is deferred, not async because we want to avoid creating - // threads for each write_async call. This overhead is significant in case of multiple small - // writes. - return std::async(std::launch::deferred, waiter, std::move(slice_tasks)); -} -#else -cufile_input_impl::cufile_input_impl(std::string const& filepath) -{ - CUDF_FAIL("Cannot create cuFile source, current build was compiled without cuFile headers"); -} - -cufile_output_impl::cufile_output_impl(std::string const& filepath) -{ - CUDF_FAIL("Cannot create cuFile sink, current build was compiled without cuFile headers"); -} -#endif - -std::unique_ptr make_cufile_input(std::string const& filepath) -{ - if (cufile_integration::is_gds_enabled()) { - try { - auto cufile_in = std::make_unique(filepath); - CUDF_LOG_INFO("File successfully opened for reading with GDS."); - return cufile_in; - } catch (...) { - if (cufile_integration::is_always_enabled()) { - CUDF_LOG_ERROR( - "Failed to open file for reading with GDS. Enable bounce buffer fallback to read this " - "file."); - throw; - } - CUDF_LOG_INFO( - "Failed to open file for reading with GDS. Data will be read from the file using a bounce " - "buffer (possible performance impact)."); - } - } - return {}; -} - -std::unique_ptr make_cufile_output(std::string const& filepath) -{ - if (cufile_integration::is_gds_enabled()) { - try { - auto cufile_out = std::make_unique(filepath); - CUDF_LOG_INFO("File successfully opened for writing with GDS."); - return cufile_out; - } catch (...) { - if (cufile_integration::is_always_enabled()) { - CUDF_LOG_ERROR( - "Failed to open file for writing with GDS. Enable bounce buffer fallback to write to " - "this file."); - throw; - } - CUDF_LOG_INFO( - "Failed to open file for writing with GDS. Data will be written to the file using a bounce " - "buffer (possible performance impact)."); - } - } - return {}; -} - -std::vector make_file_io_slices(size_t size, size_t max_slice_size) -{ - max_slice_size = std::max(1024ul, max_slice_size); - auto const n_slices = util::div_rounding_up_safe(size, max_slice_size); - std::vector slices; - slices.reserve(n_slices); - std::generate_n(std::back_inserter(slices), n_slices, [&, idx = 0]() mutable { - auto const slice_offset = idx++ * max_slice_size; - auto const slice_size = std::min(size - slice_offset, max_slice_size); - return file_io_slice{slice_offset, slice_size}; - }); - - return slices; -} - } // namespace detail } // namespace io } // namespace cudf diff --git a/cpp/src/io/utilities/file_io_utilities.hpp b/cpp/src/io/utilities/file_io_utilities.hpp index c844a596869..5f5b02dca4d 100644 --- a/cpp/src/io/utilities/file_io_utilities.hpp +++ b/cpp/src/io/utilities/file_io_utilities.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2024, NVIDIA CORPORATION. + * Copyright (c) 2021-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,13 +16,6 @@ #pragma once -#ifdef CUDF_CUFILE_FOUND -#include - -#include -#include -#endif - #include #include #include @@ -54,161 +47,6 @@ class file_wrapper { [[nodiscard]] auto desc() const { return fd; } }; -/** - * @brief Interface class for cufile input. - */ -class cufile_input { - public: - /** - * @brief Asynchronously reads into existing device memory. - * - * @throws cudf::logic_error on cuFile error - * - * @param offset Number of bytes from the start - * @param size Number of bytes to read - * @param dst Address of the existing device memory - * @param stream CUDA stream to use - * - * @return The number of bytes read as an std::future - */ - virtual std::future read_async(size_t offset, - size_t size, - uint8_t* dst, - rmm::cuda_stream_view stream) = 0; -}; - -/** - * @brief Interface class for cufile output. - */ -class cufile_output { - public: - /** - * @brief Asynchronously writes the data from a device buffer into a file. - * - * It is the caller's responsibility to not invalidate `data` until the result from this function - * is synchronized. - * - * @throws cudf::logic_error on cuFile error - * - * @param data Pointer to the buffer to be written into the output file - * @param offset Number of bytes from the start - * @param size Number of bytes to write - */ - virtual std::future write_async(void const* data, size_t offset, size_t size) = 0; -}; - -#ifdef CUDF_CUFILE_FOUND - -class cufile_shim; - -/** - * @brief Class that provides RAII for cuFile file registration. - */ -class cufile_registered_file { - void register_handle(); - - public: - cufile_registered_file(cufile_shim const* shim, std::string const& filepath, int flags) - : _file(filepath, flags), shim{shim} - { - register_handle(); - } - - cufile_registered_file(cufile_shim const* shim, - std::string const& filepath, - int flags, - mode_t mode) - : _file(filepath, flags, mode), shim{shim} - { - register_handle(); - } - - [[nodiscard]] auto const& handle() const noexcept { return cf_handle; } - - ~cufile_registered_file(); - - private: - file_wrapper const _file; - CUfileHandle_t cf_handle = nullptr; - cufile_shim const* shim = nullptr; -}; - -/** - * @brief Adapter for the `cuFileRead` API. - * - * Exposes APIs to read directly from a file into device memory. - */ -class cufile_input_impl final : public cufile_input { - public: - cufile_input_impl(std::string const& filepath); - - std::future read_async(size_t offset, - size_t size, - uint8_t* dst, - rmm::cuda_stream_view stream) override; - - private: - cufile_shim const* shim = nullptr; - cufile_registered_file const cf_file; - BS::thread_pool pool; -}; - -/** - * @brief Adapter for the `cuFileWrite` API. - * - * Exposes an API to write directly into a file from device memory. - */ -class cufile_output_impl final : public cufile_output { - public: - cufile_output_impl(std::string const& filepath); - - std::future write_async(void const* data, size_t offset, size_t size) override; - - private: - cufile_shim const* shim = nullptr; - cufile_registered_file const cf_file; - BS::thread_pool pool; -}; -#else - -class cufile_input_impl final : public cufile_input { - public: - cufile_input_impl(std::string const& filepath); - std::future read_async(size_t offset, - size_t size, - uint8_t* dst, - rmm::cuda_stream_view stream) override - { - CUDF_FAIL("Only used to compile without cufile library, should not be called"); - } -}; - -class cufile_output_impl final : public cufile_output { - public: - cufile_output_impl(std::string const& filepath); - std::future write_async(void const* data, size_t offset, size_t size) override - { - CUDF_FAIL("Only used to compile without cufile library, should not be called"); - } -}; -#endif - -/** - * @brief Creates a `cufile_input_impl` object - * - * Returns a null pointer if an exception occurs in the `cufile_input_impl` constructor, or if the - * cuFile library is not installed. - */ -std::unique_ptr make_cufile_input(std::string const& filepath); - -/** - * @brief Creates a `cufile_output_impl` object - * - * Returns a null pointer if an exception occurs in the `cufile_output_impl` constructor, or if the - * cuFile library is not installed. - */ -std::unique_ptr make_cufile_output(std::string const& filepath); - /** * @brief Byte range to be read/written in a single operation. */ @@ -217,13 +55,6 @@ CUDF_EXPORT struct file_io_slice { size_t size; }; -/** - * @brief Split the total number of bytes to read/write into slices to enable parallel IO. - * - * If `max_slice_size` is below 1024, 1024 will be used instead to prevent potential misuse. - */ -CUDF_EXPORT std::vector make_file_io_slices(size_t size, size_t max_slice_size); - } // namespace detail } // namespace io } // namespace cudf diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index e031597ed18..db26aba6697 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -306,11 +306,6 @@ ConfigureTest( GPUS 1 PERCENT 30 EXTRA_LIBS ${ARROW_LIBRARIES} ) -ConfigureTest( - FILE_IO_TEST io/file_io_test.cpp - GPUS 1 - PERCENT 30 -) ConfigureTest( ORC_TEST io/orc_chunked_reader_test.cu io/orc_test.cpp GPUS 1 diff --git a/cpp/tests/io/file_io_test.cpp b/cpp/tests/io/file_io_test.cpp deleted file mode 100644 index 1b85541687a..00000000000 --- a/cpp/tests/io/file_io_test.cpp +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include -#include - -#include - -// Base test fixture for tests -struct CuFileIOTest : public cudf::test::BaseFixture {}; - -TEST_F(CuFileIOTest, SliceSize) -{ - std::vector> test_cases{ - {1 << 20, 1 << 18}, {1 << 18, 1 << 20}, {1 << 20, 3333}, {0, 1 << 18}, {0, 0}, {1 << 20, 0}}; - for (auto const& test_case : test_cases) { - auto const slices = cudf::io::detail::make_file_io_slices(test_case.first, test_case.second); - if (slices.empty()) { - ASSERT_EQ(test_case.first, 0); - } else { - ASSERT_EQ(slices.front().offset, 0); - ASSERT_EQ(slices.back().offset + slices.back().size, test_case.first); - for (auto i = 1u; i < slices.size(); ++i) { - ASSERT_EQ(slices[i].offset, slices[i - 1].offset + slices[i - 1].size); - } - } - } -} - -CUDF_TEST_PROGRAM_MAIN() diff --git a/docs/cudf/source/user_guide/io/io.md b/docs/cudf/source/user_guide/io/io.md index 7d863d890e2..859f5efa384 100644 --- a/docs/cudf/source/user_guide/io/io.md +++ b/docs/cudf/source/user_guide/io/io.md @@ -92,15 +92,11 @@ SDK is available for download included in CUDA Toolkit 11.4 and higher. Use of GPUDirect Storage in cuDF is disabled by default, but can be -enabled through the environment variable `LIBCUDF_CUFILE_POLICY`. +enabled through the environment variable `LIBCUDF_CUFILE_POLICY` and `KVIKIO_COMPAT_MODE`. This variable also controls the GDS compatibility mode. -There are four valid values for the environment variable: +There are two valid values for the environment variable: -- "GDS": Enable GDS use. If the cuFile library cannot be properly loaded, -fall back to the GDS compatibility mode. -- "ALWAYS": Enable GDS use. If the cuFile library cannot be properly loaded, -throw an exception. - "KVIKIO": Enable GDS compatibility mode through [KvikIO](https://github.com/rapidsai/kvikio). Note that KvikIO also provides the environment variable `KVIKIO_COMPAT_MODE` for GDS control that may alter the effect of "KVIKIO" option in cuDF: @@ -112,18 +108,12 @@ control that may alter the effect of "KVIKIO" option in cuDF: configuration check, and will error out if GDS requirements are not met. The only exceptional case is that if the system does not support files being opened with the `O_DIRECT` flag, the GDS compatibility mode will be used. -- "OFF": Completely disable GDS and kvikIO use. +- "OFF": Completely disable GDS and KvikIO use. If no value is set, behavior will be the same as the "KVIKIO" option. This environment variable also affects how cuDF treats GDS errors. -- When `LIBCUDF_CUFILE_POLICY` is set to "GDS" and a GDS API call - fails for any reason, cuDF falls back to the internal implementation - with bounce buffers. -- When `LIBCUDF_CUFILE_POLICY` is set to "ALWAYS" and a GDS API call -fails for any reason (unlikely, given that the compatibility mode is -on), cuDF throws an exception to propagate the error to the user. - When `LIBCUDF_CUFILE_POLICY` is set to "KVIKIO" and a KvikIO API call fails for any reason (unlikely, given that KvikIO implements its own compatibility mode) cuDF throws an exception to propagate From d22a36eb61224281062636b5aec88f8b083c8f04 Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Fri, 17 Jan 2025 15:46:41 -0500 Subject: [PATCH 2/9] Remove OFF mode --- cpp/src/io/utilities/config_utils.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cpp/src/io/utilities/config_utils.cpp b/cpp/src/io/utilities/config_utils.cpp index e3be8afa681..89b9f8a48c4 100644 --- a/cpp/src/io/utilities/config_utils.cpp +++ b/cpp/src/io/utilities/config_utils.cpp @@ -30,7 +30,7 @@ namespace { /** * @brief Defines which cuFile usage to enable. */ -enum class usage_policy : uint8_t { OFF, KVIKIO }; +enum class usage_policy : uint8_t { KVIKIO }; /** * @brief Get the current usage policy. @@ -38,7 +38,6 @@ enum class usage_policy : uint8_t { OFF, KVIKIO }; usage_policy get_env_policy() { static auto const env_val = getenv_or("LIBCUDF_CUFILE_POLICY", "KVIKIO"); - if (env_val == "OFF") return usage_policy::OFF; if (env_val == "KVIKIO") return usage_policy::KVIKIO; CUDF_FAIL("Invalid LIBCUDF_CUFILE_POLICY value: " + env_val); } From da7085df38db18780ae3a1a3ec7e013b59d51d6b Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Sat, 18 Jan 2025 01:40:06 -0500 Subject: [PATCH 3/9] Use KvikIO for host_read/write --- cpp/src/io/utilities/data_sink.cpp | 14 +++---- cpp/src/io/utilities/datasource.cpp | 40 ++++++++------------ cpp/src/io/utilities/file_io_utilities.cpp | 43 ---------------------- cpp/src/io/utilities/file_io_utilities.hpp | 24 ------------ 4 files changed, 21 insertions(+), 100 deletions(-) diff --git a/cpp/src/io/utilities/data_sink.cpp b/cpp/src/io/utilities/data_sink.cpp index 23e6d6e2921..abbef4ce410 100644 --- a/cpp/src/io/utilities/data_sink.cpp +++ b/cpp/src/io/utilities/data_sink.cpp @@ -25,8 +25,6 @@ #include -#include - namespace cudf { namespace io { @@ -38,9 +36,6 @@ class file_sink : public data_sink { explicit file_sink(std::string const& filepath) { detail::force_init_cuda_context(); - _output_stream.open(filepath, std::ios::out | std::ios::binary | std::ios::trunc); - if (!_output_stream.is_open()) { detail::throw_on_file_open_failure(filepath, true); } - cufile_integration::set_up_kvikio(); _kvikio_file = kvikio::FileHandle(filepath, "w"); CUDF_LOG_INFO("Writing a file using kvikIO, with compatibility mode %s.", @@ -52,12 +47,14 @@ class file_sink : public data_sink { void host_write(void const* data, size_t size) override { - _output_stream.seekp(_bytes_written); - _output_stream.write(static_cast(data), size); + _kvikio_file.pwrite(data, size, _bytes_written).get(); _bytes_written += size; } - void flush() override { _output_stream.flush(); } + void flush() override + { + // NOOP. _kvikio_file write is unbuffered and does not need flush. + } size_t bytes_written() override { return _bytes_written; } @@ -90,7 +87,6 @@ class file_sink : public data_sink { } private: - std::ofstream _output_stream; size_t _bytes_written = 0; kvikio::FileHandle _kvikio_file; }; diff --git a/cpp/src/io/utilities/datasource.cpp b/cpp/src/io/utilities/datasource.cpp index dc3d6d3a28c..ea98c7b953c 100644 --- a/cpp/src/io/utilities/datasource.cpp +++ b/cpp/src/io/utilities/datasource.cpp @@ -49,36 +49,29 @@ namespace { */ class file_source : public datasource { public: - explicit file_source(char const* filepath) : _file(filepath, O_RDONLY) + explicit file_source(char const* filepath) { detail::force_init_cuda_context(); cufile_integration::set_up_kvikio(); - _kvikio_file = kvikio::FileHandle(filepath); + _kvikio_file = kvikio::FileHandle(filepath, "r"); CUDF_LOG_INFO("Reading a file using kvikIO, with compatibility mode %s.", _kvikio_file.is_compat_mode_preferred() ? "on" : "off"); } std::unique_ptr host_read(size_t offset, size_t size) override { - lseek(_file.desc(), offset, SEEK_SET); - // Clamp length to available data - ssize_t const read_size = std::min(size, _file.size() - offset); - + auto const read_size = std::min(size, _kvikio_file.nbytes() - offset); std::vector v(read_size); - CUDF_EXPECTS(read(_file.desc(), v.data(), read_size) == read_size, "read failed"); + CUDF_EXPECTS(_kvikio_file.pread(v.data(), read_size, offset).get() == read_size, "read failed"); return buffer::create(std::move(v)); } size_t host_read(size_t offset, size_t size, uint8_t* dst) override { - lseek(_file.desc(), offset, SEEK_SET); - // Clamp length to available data - auto const read_size = std::min(size, _file.size() - offset); - - CUDF_EXPECTS(read(_file.desc(), dst, read_size) == static_cast(read_size), - "read failed"); + auto const read_size = std::min(size, _kvikio_file.nbytes() - offset); + CUDF_EXPECTS(_kvikio_file.pread(dst, read_size, offset).get() == read_size, "read failed"); return read_size; } @@ -98,7 +91,7 @@ class file_source : public datasource { { CUDF_EXPECTS(supports_device_read(), "Device reads are not supported for this file."); - auto const read_size = std::min(size, _file.size() - offset); + auto const read_size = std::min(size, _kvikio_file.nbytes() - offset); return _kvikio_file.pread(dst, read_size, offset); } @@ -121,12 +114,9 @@ class file_source : public datasource { return datasource::buffer::create(std::move(out_data)); } - [[nodiscard]] size_t size() const override { return _file.size(); } + [[nodiscard]] size_t size() const override { return _kvikio_file.nbytes(); } protected: - detail::file_wrapper _file; - - private: kvikio::FileHandle _kvikio_file; }; @@ -141,9 +131,9 @@ class memory_mapped_source : public file_source { explicit memory_mapped_source(char const* filepath, size_t offset, size_t max_size_estimate) : file_source(filepath) { - if (_file.size() != 0) { + if (_kvikio_file.nbytes() != 0) { // Memory mapping is not exclusive, so we can include the whole region we expect to read - map(_file.desc(), offset, max_size_estimate); + map(_kvikio_file.fd(), offset, max_size_estimate); } } @@ -155,7 +145,7 @@ class memory_mapped_source : public file_source { std::unique_ptr host_read(size_t offset, size_t size) override { // Clamp length to available data - auto const read_size = std::min(size, +_file.size() - offset); + auto const read_size = std::min(size, +_kvikio_file.nbytes() - offset); // If the requested range is outside of the mapped region, read from the file if (offset < _map_offset or offset + read_size > (_map_offset + _map_size)) { @@ -179,7 +169,7 @@ class memory_mapped_source : public file_source { size_t host_read(size_t offset, size_t size, uint8_t* dst) override { // Clamp length to available data - auto const read_size = std::min(size, +_file.size() - offset); + auto const read_size = std::min(size, +_kvikio_file.nbytes() - offset); // If the requested range is outside of the mapped region, read from the file if (offset < _map_offset or offset + read_size > (_map_offset + _map_size)) { @@ -194,12 +184,14 @@ class memory_mapped_source : public file_source { private: void map(int fd, size_t offset, size_t size) { - CUDF_EXPECTS(offset < _file.size(), "Offset is past end of file", std::overflow_error); + CUDF_EXPECTS(offset < _kvikio_file.nbytes(), "Offset is past end of file", std::overflow_error); // Offset for `mmap()` must be page aligned _map_offset = offset & ~(sysconf(_SC_PAGESIZE) - 1); - if (size == 0 || (offset + size) > _file.size()) { size = _file.size() - offset; } + if (size == 0 || (offset + size) > _kvikio_file.nbytes()) { + size = _kvikio_file.nbytes() - offset; + } // Size for `mmap()` needs to include the page padding _map_size = size + (offset - _map_offset); diff --git a/cpp/src/io/utilities/file_io_utilities.cpp b/cpp/src/io/utilities/file_io_utilities.cpp index d03ead3e5d6..6ef3d643d9b 100644 --- a/cpp/src/io/utilities/file_io_utilities.cpp +++ b/cpp/src/io/utilities/file_io_utilities.cpp @@ -34,24 +34,6 @@ namespace cudf { namespace io { namespace detail { -namespace { - -[[nodiscard]] int open_file_checked(std::string const& filepath, int flags, mode_t mode) -{ - auto const fd = open(filepath.c_str(), flags, mode); - if (fd == -1) { throw_on_file_open_failure(filepath, flags & O_CREAT); } - - return fd; -} - -[[nodiscard]] size_t get_file_size(int file_descriptor) -{ - struct stat st {}; - CUDF_EXPECTS(fstat(file_descriptor, &st) != -1, "Cannot query file size"); - return static_cast(st.st_size); -} - -} // namespace void force_init_cuda_context() { @@ -61,31 +43,6 @@ void force_init_cuda_context() cudaFree(nullptr); } -[[noreturn]] void throw_on_file_open_failure(std::string const& filepath, bool is_create) -{ - // save errno because it may be overwritten by subsequent calls - auto const err = errno; - - if (auto const path = std::filesystem::path(filepath); is_create) { - CUDF_EXPECTS(std::filesystem::exists(path.parent_path()), - "Cannot create output file; directory does not exist"); - - } else { - CUDF_EXPECTS(std::filesystem::exists(path), "Cannot open file; it does not exist"); - } - - std::array error_msg_buffer{}; - auto const error_msg = strerror_r(err, error_msg_buffer.data(), 1024); - CUDF_FAIL("Cannot open file; failed with errno: " + std::string{error_msg}); -} - -file_wrapper::file_wrapper(std::string const& filepath, int flags, mode_t mode) - : fd(open_file_checked(filepath.c_str(), flags, mode)), _size{get_file_size(fd)} -{ -} - -file_wrapper::~file_wrapper() { close(fd); } - } // namespace detail } // namespace io } // namespace cudf diff --git a/cpp/src/io/utilities/file_io_utilities.hpp b/cpp/src/io/utilities/file_io_utilities.hpp index 5f5b02dca4d..9be05997d3a 100644 --- a/cpp/src/io/utilities/file_io_utilities.hpp +++ b/cpp/src/io/utilities/file_io_utilities.hpp @@ -28,33 +28,9 @@ namespace cudf { namespace io { namespace detail { -[[noreturn]] void throw_on_file_open_failure(std::string const& filepath, bool is_create); - // Call before any cuFile API calls to ensure the CUDA context is initialized. void force_init_cuda_context(); -/** - * @brief Class that provides RAII for file handling. - */ -class file_wrapper { - int fd = -1; - size_t _size = 0; - - public: - explicit file_wrapper(std::string const& filepath, int flags, mode_t mode = 0); - ~file_wrapper(); - [[nodiscard]] auto size() const { return _size; } - [[nodiscard]] auto desc() const { return fd; } -}; - -/** - * @brief Byte range to be read/written in a single operation. - */ -CUDF_EXPORT struct file_io_slice { - size_t offset; - size_t size; -}; - } // namespace detail } // namespace io } // namespace cudf From 2d621c7d796de204447dc71b6c3eb610bf1ed3d9 Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Sat, 18 Jan 2025 08:34:02 -0500 Subject: [PATCH 4/9] Revise comment --- cpp/src/io/utilities/data_sink.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cpp/src/io/utilities/data_sink.cpp b/cpp/src/io/utilities/data_sink.cpp index abbef4ce410..2c0ec379f44 100644 --- a/cpp/src/io/utilities/data_sink.cpp +++ b/cpp/src/io/utilities/data_sink.cpp @@ -53,7 +53,8 @@ class file_sink : public data_sink { void flush() override { - // NOOP. _kvikio_file write is unbuffered and does not need flush. + // KvikIO's pwrite() is a system call that reaches the kernel buffer. Flushing the application + // buffer is therefore not needed. } size_t bytes_written() override { return _bytes_written; } From 1a06b26445ca2064913475020f7b8aa30589c837 Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Sat, 18 Jan 2025 16:36:40 -0500 Subject: [PATCH 5/9] Remove LIBCUDF_CUFILE_POLICY --- cpp/include/cudf/io/config_utils.hpp | 9 ++------- cpp/src/io/utilities/config_utils.cpp | 24 +++--------------------- cpp/src/io/utilities/data_sink.cpp | 2 +- cpp/src/io/utilities/datasource.cpp | 2 +- 4 files changed, 7 insertions(+), 30 deletions(-) diff --git a/cpp/include/cudf/io/config_utils.hpp b/cpp/include/cudf/io/config_utils.hpp index 50b7f54069e..13953280c80 100644 --- a/cpp/include/cudf/io/config_utils.hpp +++ b/cpp/include/cudf/io/config_utils.hpp @@ -19,12 +19,7 @@ namespace CUDF_EXPORT cudf { namespace io { -namespace cufile_integration { - -/** - * @brief Returns true if KvikIO is enabled. - */ -bool is_kvikio_enabled(); +namespace kvikio_integration { /** * @brief Set KvikIO parameters, including: @@ -35,7 +30,7 @@ bool is_kvikio_enabled(); */ void set_up_kvikio(); -} // namespace cufile_integration +} // namespace kvikio_integration namespace nvcomp_integration { diff --git a/cpp/src/io/utilities/config_utils.cpp b/cpp/src/io/utilities/config_utils.cpp index 89b9f8a48c4..08de5a35af8 100644 --- a/cpp/src/io/utilities/config_utils.cpp +++ b/cpp/src/io/utilities/config_utils.cpp @@ -24,26 +24,7 @@ namespace cudf::io { -namespace cufile_integration { - -namespace { -/** - * @brief Defines which cuFile usage to enable. - */ -enum class usage_policy : uint8_t { KVIKIO }; - -/** - * @brief Get the current usage policy. - */ -usage_policy get_env_policy() -{ - static auto const env_val = getenv_or("LIBCUDF_CUFILE_POLICY", "KVIKIO"); - if (env_val == "KVIKIO") return usage_policy::KVIKIO; - CUDF_FAIL("Invalid LIBCUDF_CUFILE_POLICY value: " + env_val); -} -} // namespace - -bool is_kvikio_enabled() { return get_env_policy() == usage_policy::KVIKIO; } +namespace kvikio_integration { void set_up_kvikio() { @@ -56,7 +37,8 @@ void set_up_kvikio() kvikio::defaults::thread_pool_nthreads_reset(nthreads); }); } -} // namespace cufile_integration + +} // namespace kvikio_integration namespace nvcomp_integration { diff --git a/cpp/src/io/utilities/data_sink.cpp b/cpp/src/io/utilities/data_sink.cpp index 2c0ec379f44..eff011e030e 100644 --- a/cpp/src/io/utilities/data_sink.cpp +++ b/cpp/src/io/utilities/data_sink.cpp @@ -36,7 +36,7 @@ class file_sink : public data_sink { explicit file_sink(std::string const& filepath) { detail::force_init_cuda_context(); - cufile_integration::set_up_kvikio(); + kvikio_integration::set_up_kvikio(); _kvikio_file = kvikio::FileHandle(filepath, "w"); CUDF_LOG_INFO("Writing a file using kvikIO, with compatibility mode %s.", _kvikio_file.is_compat_mode_preferred() ? "on" : "off"); diff --git a/cpp/src/io/utilities/datasource.cpp b/cpp/src/io/utilities/datasource.cpp index ea98c7b953c..8f3771ec1cc 100644 --- a/cpp/src/io/utilities/datasource.cpp +++ b/cpp/src/io/utilities/datasource.cpp @@ -52,7 +52,7 @@ class file_source : public datasource { explicit file_source(char const* filepath) { detail::force_init_cuda_context(); - cufile_integration::set_up_kvikio(); + kvikio_integration::set_up_kvikio(); _kvikio_file = kvikio::FileHandle(filepath, "r"); CUDF_LOG_INFO("Reading a file using kvikIO, with compatibility mode %s.", _kvikio_file.is_compat_mode_preferred() ? "on" : "off"); From b3266385333e3479deb8b09cf7b0dfb67736c519 Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Mon, 20 Jan 2025 14:30:47 -0500 Subject: [PATCH 6/9] Update --- cpp/include/cudf/io/data_sink.hpp | 4 +- cpp/src/io/utilities/data_sink.cpp | 9 ++-- docs/cudf/source/user_guide/io/io.md | 62 +++++++++++----------------- 3 files changed, 32 insertions(+), 43 deletions(-) diff --git a/cpp/include/cudf/io/data_sink.hpp b/cpp/include/cudf/io/data_sink.hpp index e1eb9c042c7..fe10f46d6b1 100644 --- a/cpp/include/cudf/io/data_sink.hpp +++ b/cpp/include/cudf/io/data_sink.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2024, NVIDIA CORPORATION. + * Copyright (c) 2020-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -122,7 +122,7 @@ class data_sink { * * In the case where the sink type is itself a memory buffered write, this ends up * being effectively a second memcpy. So a useful optimization for a "smart" - * custom data_sink is to do it's own internal management of the movement + * custom data_sink is to do its own internal management of the movement * of data between cpu and gpu; turning the internals of the writer into simply * * sink->device_write(device_buffer, size) diff --git a/cpp/src/io/utilities/data_sink.cpp b/cpp/src/io/utilities/data_sink.cpp index eff011e030e..48090e6bdf1 100644 --- a/cpp/src/io/utilities/data_sink.cpp +++ b/cpp/src/io/utilities/data_sink.cpp @@ -53,8 +53,9 @@ class file_sink : public data_sink { void flush() override { - // KvikIO's pwrite() is a system call that reaches the kernel buffer. Flushing the application - // buffer is therefore not needed. + // kvikio::FileHandle::pwrite() makes system calls that reach the kernel buffer cache. This + // process does not involve application buffer. Therefore calls to ::fflush() or + // ofstream::flush() do not apply. } size_t bytes_written() override { return _bytes_written; } @@ -77,7 +78,7 @@ class file_sink : public data_sink { // KvikIO's `pwrite()` returns a `std::future` so we convert it // to `std::future` - return std::async(std::launch::deferred, [this, gpu_data, size, offset] { + return std::async(std::launch::deferred, [this, gpu_data, size, offset]() -> void { _kvikio_file.pwrite(gpu_data, size, offset).get(); }); } @@ -141,7 +142,7 @@ class void_sink : public data_sink { rmm::cuda_stream_view stream) override { _bytes_written += size; - return std::async(std::launch::deferred, [] {}); + return std::async(std::launch::deferred, []() -> void {}); } void flush() override {} diff --git a/docs/cudf/source/user_guide/io/io.md b/docs/cudf/source/user_guide/io/io.md index 859f5efa384..9b3a53f6867 100644 --- a/docs/cudf/source/user_guide/io/io.md +++ b/docs/cudf/source/user_guide/io/io.md @@ -80,7 +80,10 @@ IO format. - \[¹\] - Not all orientations are GPU-accelerated. - \[²\] - Not GPU-accelerated. -## Magnum IO GPUDirect Storage Integration +## KvikIO Integration + +cuDF leverages the [KvikIO](https://github.com/rapidsai/kvikio) library for high-performance +I/O features, such as parallel I/O operations and Nvidia Magnum IO GPUDirect Storage (GDS). Many IO APIs can use Magnum IO GPUDirect Storage (GDS) library to optimize IO operations. GDS enables a direct data path for direct memory access @@ -91,33 +94,27 @@ SDK is available for download [here](https://developer.nvidia.com/gpudirect-storage). GDS is also included in CUDA Toolkit 11.4 and higher. -Use of GPUDirect Storage in cuDF is disabled by default, but can be -enabled through the environment variable `LIBCUDF_CUFILE_POLICY` and `KVIKIO_COMPAT_MODE`. -This variable also controls the GDS compatibility mode. - -There are two valid values for the environment variable: - -- "KVIKIO": Enable GDS compatibility mode through [KvikIO](https://github.com/rapidsai/kvikio). -Note that KvikIO also provides the environment variable `KVIKIO_COMPAT_MODE` for GDS -control that may alter the effect of "KVIKIO" option in cuDF: - - By default, `KVIKIO_COMPAT_MODE` is unset. In this case, cuDF enforces - the GDS compatibility mode, and the system configuration check for GDS I/O - is never performed. - - If `KVIKIO_COMPAT_MODE=ON`, this is the same with the above case. - - If `KVIKIO_COMPAT_MODE=OFF`, KvikIO enforces GDS I/O without system - configuration check, and will error out if GDS requirements are not met. The - only exceptional case is that if the system does not support files being - opened with the `O_DIRECT` flag, the GDS compatibility mode will be used. -- "OFF": Completely disable GDS and KvikIO use. - -If no value is set, behavior will be the same as the "KVIKIO" option. - -This environment variable also affects how cuDF treats GDS errors. - -- When `LIBCUDF_CUFILE_POLICY` is set to "KVIKIO" and a KvikIO API - call fails for any reason (unlikely, given that KvikIO implements - its own compatibility mode) cuDF throws an exception to propagate - the error to the user. +Use of GDS in cuDF is controlled by KvikIO's environment variable `KVIKIO_COMPAT_MODE`. It has +3 options (case-insensitive): + +- `ON` (aliases: `TRUE`, `YES`, `1`): Enable the compatibility mode, which enforces KvikIO POSIX I/O path. + This is the default option in cuDF. +- `OFF` (aliases: `FALSE`, `NO`, `0`): Force-enable KvikIO cuFile (the underlying API for GDS) I/O path. + GDS will be activated if the system requirements for cuFile are met and cuFile is properly + configured. However, if the system is not suited for cuFile, I/O operations under the `OFF` + option may error out, crash or hang. +- `AUTO`: Try KvikIO cuFile I/O path first, and fall back to KvikIO POSIX I/O if the system requirements + for cuFile are not met. + +Note that: +- Even if KvikIO cuFile I/O path is taken, it is possible that GDS is still not activated, where cuFile falls back + to its internal compatibility mode. This will happen, for example, on an ext4 file system whose journaling + mode has not been explicitly set to `data=ordered`. This may also happen if cuFile's environment variable + `CUFILE_FORCE_COMPAT_MODE` is set to true. For more details, refer to + [cuFile compatibility mode](https://docs.nvidia.com/gpudirect-storage/api-reference-guide/index.html#cufile-compatibility-mode) + and [cuFile environment variables](https://docs.nvidia.com/gpudirect-storage/troubleshooting-guide/index.html#environment-variables). +- Details of the GDS system requirements can be found in the [GDS documentation](https://docs.nvidia.com/gpudirect-storage/index.html). +- If a KvikIO API call fails for any reason, cuDF throws an exception to propagate the error to the user. For more information about error handling, compatibility mode, and tuning parameters in KvikIO see: @@ -133,15 +130,6 @@ Operations that support the use of GPUDirect Storage: - {py:meth}`cudf.DataFrame.to_parquet` - {py:meth}`cudf.DataFrame.to_orc` -Several parameters that can be used to tune the performance of -GDS-enabled I/O are exposed through environment variables: - -- `LIBCUDF_CUFILE_THREAD_COUNT`: Integral value, maximum number of - parallel reads/writes per file (default 16); -- `LIBCUDF_CUFILE_SLICE_SIZE`: Integral value, maximum size of each - GDS read/write, in bytes (default 4MB). Larger I/O operations are - split into multiple calls. - ## nvCOMP Integration Some types of compression/decompression can be performed using either From e1b26e2113a5ab579bac3a6d640f1bb1492426aa Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Tue, 4 Feb 2025 15:50:12 -0500 Subject: [PATCH 7/9] Address review comments --- cpp/CMakeLists.txt | 1 - cpp/src/io/utilities/config_utils.cpp | 5 +++ cpp/src/io/utilities/data_sink.cpp | 6 +-- cpp/src/io/utilities/datasource.cpp | 23 +++++------ cpp/src/io/utilities/file_io_utilities.cpp | 48 ---------------------- cpp/src/io/utilities/file_io_utilities.hpp | 36 ---------------- docs/cudf/source/user_guide/io/io.md | 6 +-- 7 files changed, 19 insertions(+), 106 deletions(-) delete mode 100644 cpp/src/io/utilities/file_io_utilities.cpp delete mode 100644 cpp/src/io/utilities/file_io_utilities.hpp diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index a9d70cfa09f..ab4fb7a7015 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -548,7 +548,6 @@ add_library( src/io/utilities/data_casting.cu src/io/utilities/data_sink.cpp src/io/utilities/datasource.cpp - src/io/utilities/file_io_utilities.cpp src/io/utilities/row_selection.cpp src/io/utilities/type_inference.cu src/io/utilities/trie.cu diff --git a/cpp/src/io/utilities/config_utils.cpp b/cpp/src/io/utilities/config_utils.cpp index 08de5a35af8..46816604918 100644 --- a/cpp/src/io/utilities/config_utils.cpp +++ b/cpp/src/io/utilities/config_utils.cpp @@ -30,6 +30,11 @@ void set_up_kvikio() { static std::once_flag flag{}; std::call_once(flag, [] { + // Workaround for https://github.com/rapidsai/cudf/issues/14140, where cuFileDriverOpen errors + // out if no CUDA calls have been made before it. This is a no-op if the CUDA context is already + // initialized. + cudaFree(nullptr); + auto const compat_mode = kvikio::getenv_or("KVIKIO_COMPAT_MODE", kvikio::CompatMode::ON); kvikio::defaults::compat_mode_reset(compat_mode); diff --git a/cpp/src/io/utilities/data_sink.cpp b/cpp/src/io/utilities/data_sink.cpp index 48090e6bdf1..e8a05f431bd 100644 --- a/cpp/src/io/utilities/data_sink.cpp +++ b/cpp/src/io/utilities/data_sink.cpp @@ -14,8 +14,6 @@ * limitations under the License. */ -#include "file_io_utilities.hpp" - #include #include #include @@ -35,9 +33,9 @@ class file_sink : public data_sink { public: explicit file_sink(std::string const& filepath) { - detail::force_init_cuda_context(); kvikio_integration::set_up_kvikio(); _kvikio_file = kvikio::FileHandle(filepath, "w"); + CUDF_EXPECTS(!_kvikio_file.closed(), "KvikIO did not open the file successfully."); CUDF_LOG_INFO("Writing a file using kvikIO, with compatibility mode %s.", _kvikio_file.is_compat_mode_preferred() ? "on" : "off"); } @@ -60,7 +58,7 @@ class file_sink : public data_sink { size_t bytes_written() override { return _bytes_written; } - [[nodiscard]] bool supports_device_write() const override { return !_kvikio_file.closed(); } + [[nodiscard]] bool supports_device_write() const override { return true; } [[nodiscard]] bool is_device_write_preferred(size_t size) const override { diff --git a/cpp/src/io/utilities/datasource.cpp b/cpp/src/io/utilities/datasource.cpp index 8f3771ec1cc..f41a2f8e8d9 100644 --- a/cpp/src/io/utilities/datasource.cpp +++ b/cpp/src/io/utilities/datasource.cpp @@ -14,7 +14,6 @@ * limitations under the License. */ -#include "file_io_utilities.hpp" #include "getenv_or.hpp" #include @@ -51,9 +50,9 @@ class file_source : public datasource { public: explicit file_source(char const* filepath) { - detail::force_init_cuda_context(); kvikio_integration::set_up_kvikio(); _kvikio_file = kvikio::FileHandle(filepath, "r"); + CUDF_EXPECTS(_kvikio_file.closed(), "KvikIO did not open the file successfully."); CUDF_LOG_INFO("Reading a file using kvikIO, with compatibility mode %s.", _kvikio_file.is_compat_mode_preferred() ? "on" : "off"); } @@ -61,7 +60,7 @@ class file_source : public datasource { std::unique_ptr host_read(size_t offset, size_t size) override { // Clamp length to available data - auto const read_size = std::min(size, _kvikio_file.nbytes() - offset); + auto const read_size = std::min(size, this->size() - offset); std::vector v(read_size); CUDF_EXPECTS(_kvikio_file.pread(v.data(), read_size, offset).get() == read_size, "read failed"); return buffer::create(std::move(v)); @@ -70,14 +69,14 @@ class file_source : public datasource { size_t host_read(size_t offset, size_t size, uint8_t* dst) override { // Clamp length to available data - auto const read_size = std::min(size, _kvikio_file.nbytes() - offset); + auto const read_size = std::min(size, this->size() - offset); CUDF_EXPECTS(_kvikio_file.pread(dst, read_size, offset).get() == read_size, "read failed"); return read_size; } ~file_source() override = default; - [[nodiscard]] bool supports_device_read() const override { return !_kvikio_file.closed(); } + [[nodiscard]] bool supports_device_read() const override { return true; } [[nodiscard]] bool is_device_read_preferred(size_t size) const override { @@ -91,7 +90,7 @@ class file_source : public datasource { { CUDF_EXPECTS(supports_device_read(), "Device reads are not supported for this file."); - auto const read_size = std::min(size, _kvikio_file.nbytes() - offset); + auto const read_size = std::min(size, this->size() - offset); return _kvikio_file.pread(dst, read_size, offset); } @@ -131,7 +130,7 @@ class memory_mapped_source : public file_source { explicit memory_mapped_source(char const* filepath, size_t offset, size_t max_size_estimate) : file_source(filepath) { - if (_kvikio_file.nbytes() != 0) { + if (size() != 0) { // Memory mapping is not exclusive, so we can include the whole region we expect to read map(_kvikio_file.fd(), offset, max_size_estimate); } @@ -145,7 +144,7 @@ class memory_mapped_source : public file_source { std::unique_ptr host_read(size_t offset, size_t size) override { // Clamp length to available data - auto const read_size = std::min(size, +_kvikio_file.nbytes() - offset); + auto const read_size = std::min(size, this->size() - offset); // If the requested range is outside of the mapped region, read from the file if (offset < _map_offset or offset + read_size > (_map_offset + _map_size)) { @@ -169,7 +168,7 @@ class memory_mapped_source : public file_source { size_t host_read(size_t offset, size_t size, uint8_t* dst) override { // Clamp length to available data - auto const read_size = std::min(size, +_kvikio_file.nbytes() - offset); + auto const read_size = std::min(size, this->size() - offset); // If the requested range is outside of the mapped region, read from the file if (offset < _map_offset or offset + read_size > (_map_offset + _map_size)) { @@ -184,14 +183,12 @@ class memory_mapped_source : public file_source { private: void map(int fd, size_t offset, size_t size) { - CUDF_EXPECTS(offset < _kvikio_file.nbytes(), "Offset is past end of file", std::overflow_error); + CUDF_EXPECTS(offset < this->size(), "Offset is past end of file", std::overflow_error); // Offset for `mmap()` must be page aligned _map_offset = offset & ~(sysconf(_SC_PAGESIZE) - 1); - if (size == 0 || (offset + size) > _kvikio_file.nbytes()) { - size = _kvikio_file.nbytes() - offset; - } + if (size == 0 || (offset + size) > this->size()) { size = this->size() - offset; } // Size for `mmap()` needs to include the page padding _map_size = size + (offset - _map_offset); diff --git a/cpp/src/io/utilities/file_io_utilities.cpp b/cpp/src/io/utilities/file_io_utilities.cpp deleted file mode 100644 index 6ef3d643d9b..00000000000 --- a/cpp/src/io/utilities/file_io_utilities.cpp +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright (c) 2021-2025, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "file_io_utilities.hpp" - -#include "getenv_or.hpp" - -#include -#include -#include - -#include -#include - -#include -#include -#include -#include -#include - -namespace cudf { -namespace io { -namespace detail { - -void force_init_cuda_context() -{ - // Workaround for https://github.com/rapidsai/cudf/issues/14140, where cuFileDriverOpen errors - // out if no CUDA calls have been made before it. This is a no-op if the CUDA context is already - // initialized. - cudaFree(nullptr); -} - -} // namespace detail -} // namespace io -} // namespace cudf diff --git a/cpp/src/io/utilities/file_io_utilities.hpp b/cpp/src/io/utilities/file_io_utilities.hpp deleted file mode 100644 index 9be05997d3a..00000000000 --- a/cpp/src/io/utilities/file_io_utilities.hpp +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright (c) 2021-2025, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include -#include -#include - -#include - -#include - -namespace cudf { -namespace io { -namespace detail { - -// Call before any cuFile API calls to ensure the CUDA context is initialized. -void force_init_cuda_context(); - -} // namespace detail -} // namespace io -} // namespace cudf diff --git a/docs/cudf/source/user_guide/io/io.md b/docs/cudf/source/user_guide/io/io.md index 9b3a53f6867..3db2eb4b829 100644 --- a/docs/cudf/source/user_guide/io/io.md +++ b/docs/cudf/source/user_guide/io/io.md @@ -88,9 +88,7 @@ I/O features, such as parallel I/O operations and Nvidia Magnum IO GPUDirect Sto Many IO APIs can use Magnum IO GPUDirect Storage (GDS) library to optimize IO operations. GDS enables a direct data path for direct memory access (DMA) transfers between GPU memory and storage, which avoids a bounce -buffer through the CPU. GDS also has a compatibility mode that allows -the library to fall back to copying through a CPU bounce buffer. The -SDK is available for download +buffer through the CPU. The SDK is available for download [here](https://developer.nvidia.com/gpudirect-storage). GDS is also included in CUDA Toolkit 11.4 and higher. @@ -102,7 +100,7 @@ Use of GDS in cuDF is controlled by KvikIO's environment variable `KVIKIO_COMPAT - `OFF` (aliases: `FALSE`, `NO`, `0`): Force-enable KvikIO cuFile (the underlying API for GDS) I/O path. GDS will be activated if the system requirements for cuFile are met and cuFile is properly configured. However, if the system is not suited for cuFile, I/O operations under the `OFF` - option may error out, crash or hang. + option may error out. - `AUTO`: Try KvikIO cuFile I/O path first, and fall back to KvikIO POSIX I/O if the system requirements for cuFile are not met. From 343228af35ead67c8da2573b27e6691220e4fafb Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Tue, 4 Feb 2025 15:52:57 -0500 Subject: [PATCH 8/9] Minor fix --- cpp/src/io/utilities/datasource.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/io/utilities/datasource.cpp b/cpp/src/io/utilities/datasource.cpp index f41a2f8e8d9..5568ca0ab10 100644 --- a/cpp/src/io/utilities/datasource.cpp +++ b/cpp/src/io/utilities/datasource.cpp @@ -130,7 +130,7 @@ class memory_mapped_source : public file_source { explicit memory_mapped_source(char const* filepath, size_t offset, size_t max_size_estimate) : file_source(filepath) { - if (size() != 0) { + if (this->size() != 0) { // Memory mapping is not exclusive, so we can include the whole region we expect to read map(_kvikio_file.fd(), offset, max_size_estimate); } From 3e6b780e09baa2453b593eaf905722575be319b5 Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Tue, 4 Feb 2025 22:06:48 -0500 Subject: [PATCH 9/9] Quick fix --- cpp/src/io/utilities/datasource.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/io/utilities/datasource.cpp b/cpp/src/io/utilities/datasource.cpp index 5568ca0ab10..14b6bc6f774 100644 --- a/cpp/src/io/utilities/datasource.cpp +++ b/cpp/src/io/utilities/datasource.cpp @@ -52,7 +52,7 @@ class file_source : public datasource { { kvikio_integration::set_up_kvikio(); _kvikio_file = kvikio::FileHandle(filepath, "r"); - CUDF_EXPECTS(_kvikio_file.closed(), "KvikIO did not open the file successfully."); + CUDF_EXPECTS(!_kvikio_file.closed(), "KvikIO did not open the file successfully."); CUDF_LOG_INFO("Reading a file using kvikIO, with compatibility mode %s.", _kvikio_file.is_compat_mode_preferred() ? "on" : "off"); }