diff --git a/.gitignore b/.gitignore
index 7854792f2..2db77ed8c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -144,3 +144,6 @@ ENV/
 # mypy
 .mypy_cache/
+
+# RMM log files
+rmm_log.txt
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 76af92110..e60b6df80 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,8 @@
 
 ## New Features
 
+- PR #608 Add stream wrapper type
+
 ## Improvements
 
 - PR #599 Make the arena memory resource work better with the producer/consumer mode
diff --git a/README.md b/README.md
index d25325cd6..eb7ef276f 100644
--- a/README.md
+++ b/README.md
@@ -185,10 +185,10 @@ freeing device memory.
 
 It has two key functions:
 
-1. `void* device_memory_resource::allocate(std::size_t bytes, cudaStream_t s)`
+1. `void* device_memory_resource::allocate(std::size_t bytes, cuda_stream_view s)`
   - Returns a pointer to an allocation of at least `bytes` bytes.
 
-2. `void device_memory_resource::deallocate(void* p, std::size_t bytes, cudaStream_t s)`
+2. `void device_memory_resource::deallocate(void* p, std::size_t bytes, cuda_stream_view s)`
   - Reclaims a previous allocation of size `bytes` pointed to by `p`.
   - `p` *must* have been returned by a previous call to `allocate(bytes)`, otherwise behavior is
     undefined
@@ -198,9 +198,21 @@ It is up to a derived class to provide implementations of these functions. See
 Unlike `std::pmr::memory_resource`, `rmm::mr::device_memory_resource` does not allow specifying an
 alignment argument. All allocations are required to be aligned to at least 256B. Furthermore,
-`device_memory_resource` adds an additional `cudaStream_t` argument to allow specifying the stream
+`device_memory_resource` adds an additional `cuda_stream_view` argument to allow specifying the stream
 on which to perform the (de)allocation.
 
+## `cuda_stream_view` and `cuda_stream`
+
+`rmm::cuda_stream_view` is a simple non-owning wrapper around a CUDA `cudaStream_t`. This wrapper's
+purpose is to provide strong type safety for stream types. (`cudaStream_t` is an alias for a pointer,
+which can lead to ambiguity in APIs when it is assigned `0`.) All RMM stream-ordered APIs take a
+`rmm::cuda_stream_view` argument.
+
+`rmm::cuda_stream` is a simple owning wrapper around a CUDA `cudaStream_t`. This class provides
+RAII semantics (the constructor creates the CUDA stream and the destructor destroys it). An
+`rmm::cuda_stream` can never represent the CUDA default stream or per-thread default stream; it
+only ever represents a single non-default stream. `rmm::cuda_stream` cannot be copied but can be
+moved.
+
 ### Thread Safety
 
 All current device memory resources are thread safe unless documented otherwise. More specifically,
@@ -335,11 +347,11 @@ An untyped, uninitialized RAII class for stream ordered device memory allocation.
 
 #### Example
 ```c++
-cudaStream_t s;
+cuda_stream_view s{...};
 rmm::device_buffer b{100,s}; // Allocates at least 100 bytes on stream `s` using the *default* resource
 void* p = b.data();          // Raw, untyped pointer to underlying device memory
-kernel<<<..., s>>>(b.data()); // `b` is only safe to use on `s`
+kernel<<<..., s.value()>>>(b.data()); // `b` is only safe to use on `s`
 rmm::mr::device_memory_resource * mr = new my_custom_resource{...};
 rmm::device_buffer b2{100, s, mr}; // Allocates at least 100 bytes on stream `s` using the explicitly provided resource
 ```
@@ -353,9 +365,9 @@ contained elements.
This optimization restricts the types `T` to trivially copyable types.
 
 #### Example
 ```c++
-cudaStream_t s;
+cuda_stream_view s{...};
 rmm::device_uvector<int32_t> v(100, s); /// Allocates uninitialized storage for 100 `int32_t` elements on stream `s` using the default resource
-thrust::uninitialized_fill(thrust::cuda::par.on(s), v.begin(), v.end(), int32_t{0}); // Initializes the elements to 0
+thrust::uninitialized_fill(thrust::cuda::par.on(s.value()), v.begin(), v.end(), int32_t{0}); // Initializes the elements to 0
 rmm::mr::device_memory_resource * mr = new my_custom_resource{...};
 rmm::device_uvector<int32_t> v2{100, s, mr}; // Allocates uninitialized storage for 100 `int32_t` elements on stream `s` using the explicitly provided resource
 ```
@@ -368,11 +380,11 @@ modifying the value in device memory from the host, or retrieving the value from
 
 #### Example
 ```c++
-cudaStream_t s;
+cuda_stream_view s{...};
 rmm::device_scalar<int32_t> a{s}; // Allocates uninitialized storage for a single `int32_t` in device memory
 a.set_value(42, s); // Updates the value in device memory to `42` on stream `s`
-kernel<<<...,s>>>(a.data()); // Pass raw pointer to underlying element in device memory
+kernel<<<...,s.value()>>>(a.data()); // Pass raw pointer to underlying element in device memory
 int32_t v = a.value(s); // Retrieves the value from device to host on stream `s`
 ```
diff --git a/benchmarks/device_uvector/device_uvector_bench.cu b/benchmarks/device_uvector/device_uvector_bench.cu
index 47943b242..214ab1d0e 100644
--- a/benchmarks/device_uvector/device_uvector_bench.cu
+++ b/benchmarks/device_uvector/device_uvector_bench.cu
@@ -30,7 +30,7 @@ static void BM_UvectorSizeConstruction(benchmark::State& state)
   rmm::mr::set_current_device_resource(&mr);
 
   for (auto _ : state) {
-    rmm::device_uvector<int32_t> vec(state.range(0), cudaStream_t{0});
+    rmm::device_uvector<int32_t> vec(state.range(0), rmm::cuda_stream_view{});
     cudaDeviceSynchronize();
   }
diff --git a/benchmarks/random_allocations/random_allocations.cpp b/benchmarks/random_allocations/random_allocations.cpp
index 2a91f3634..98d6ea6ba 100644
--- a/benchmarks/random_allocations/random_allocations.cpp
+++ b/benchmarks/random_allocations/random_allocations.cpp
@@ -62,7 +62,7 @@ void random_allocation_free(rmm::mr::device_memory_resource& mr,
                             SizeDistribution size_distribution,
                             size_t num_allocations,
                             size_t max_usage,  // in MiB
-                            cudaStream_t stream = 0)
+                            rmm::cuda_stream_view stream = {})
 {
   std::default_random_engine generator;
@@ -139,7 +139,7 @@ void uniform_random_allocations(rmm::mr::device_memory_resource& mr,
                                 size_t num_allocations,
                                 size_t max_allocation_size,  // in MiB
                                 size_t max_usage,
-                                cudaStream_t stream = 0)
+                                rmm::cuda_stream_view stream = {})
 {
   std::uniform_int_distribution<std::size_t> size_distribution(1, max_allocation_size * size_mb);
   random_allocation_free(mr, size_distribution, num_allocations, max_usage, stream);
@@ -151,7 +151,7 @@ void uniform_random_allocations(rmm::mr::device_memory_resource& mr,
                                    size_t mean_allocation_size = 500,    // in MiB
                                    size_t stddev_allocation_size = 500,  // in MiB
                                    size_t max_usage = 8 << 20,
-                                   cudaStream_t stream) {
+                                   cuda_stream_view stream) {
  std::normal_distribution size_distribution(, max_allocation_size * size_mb);
}*/
diff --git a/benchmarks/replay/replay.cpp b/benchmarks/replay/replay.cpp
index d0c28d135..6b159ce08 100644
--- a/benchmarks/replay/replay.cpp
+++ b/benchmarks/replay/replay.cpp
@@ -242,8 +242,7 @@ std::vector<std::vector<event>> parse_per_thread_events(std::string
   RMM_EXPECTS(std::all_of(all_events.begin(),
                           all_events.end(),
                           [](auto const& e) {
-                            return (e.stream ==
cudaStreamDefault) or
-                                   (e.stream == reinterpret_cast<uintptr_t>(cudaStreamPerThread));
+                            return e.stream.is_default() or e.stream.is_per_thread_default();
                           }),
              "Non-default streams not currently supported.");
diff --git a/benchmarks/utilities/log_parser.hpp b/benchmarks/utilities/log_parser.hpp
index 84c88ce44..d299c3e7c 100644
--- a/benchmarks/utilities/log_parser.hpp
+++ b/benchmarks/utilities/log_parser.hpp
@@ -21,6 +21,7 @@
 #include
 #include "rapidcsv.h"
+#include "rmm/cuda_stream_view.hpp"
 #include
 #include
@@ -50,25 +51,26 @@ struct event {
   event(action a, std::size_t s, uintptr_t p) : act{a}, size{s}, pointer{p} {}
 
-  event(std::size_t tid, action a, std::size_t sz, uintptr_t p, uintptr_t s, std::size_t i)
+  event(
+    std::size_t tid, action a, std::size_t sz, uintptr_t p, rmm::cuda_stream_view s, std::size_t i)
     : thread_id{tid}, act{a}, size{sz}, pointer{p}, stream{s}, index{i}
   {
   }
 
-  event(std::size_t tid, action a, std::size_t sz, void* p, uintptr_t s, std::size_t i)
+  event(std::size_t tid, action a, std::size_t sz, void* p, rmm::cuda_stream_view s, std::size_t i)
     : event{tid, a, sz, reinterpret_cast<uintptr_t>(p), s, i}
   {
   }
 
   friend std::ostream& operator<<(std::ostream& os, event const& e);
 
-  action act{};           ///< Indicates if the event is an allocation or a free
-  std::size_t size{};     ///< The size of the memory allocated or freed
-  uintptr_t pointer{};    ///< The pointer returned from an allocation, or the
-                          ///< pointer freed
-  std::size_t thread_id;  ///< ID of the thread that initiated the event
-  uintptr_t stream;       ///< Numeric representation of the CUDA stream on which the event occurred
-  std::size_t index;      ///< Original ordering index of the event
+  action act{};                  ///< Indicates if the event is an allocation or a free
+  std::size_t size{};            ///< The size of the memory allocated or freed
+  uintptr_t pointer{};           ///< The pointer returned from an allocation, or the
+                                 ///< pointer freed
+  std::size_t thread_id;         ///< ID of the thread that initiated the event
+  rmm::cuda_stream_view stream;  ///< The CUDA stream on which the event occurred
+  std::size_t index;             ///< Original ordering index of the event
 };
 
 inline std::ostream& operator<<(std::ostream& os, event const& e)
@@ -127,11 +129,25 @@ inline std::vector<event> parse_csv(std::string const& filename)
 {
   rapidcsv::Document csv(filename, rapidcsv::LabelParams(0, -1));
 
-  std::vector<std::size_t> tids    = csv.GetColumn<std::size_t>("Thread");
-  std::vector<std::string> actions = csv.GetColumn<std::string>("Action");
-  std::vector<std::string> pointers = csv.GetColumn<std::string>("Pointer");
-  std::vector<std::size_t> sizes   = csv.GetColumn<std::size_t>("Size");
-  std::vector<uintptr_t> streams   = csv.GetColumn<uintptr_t>("Stream");
+  std::vector<std::size_t> tids    = csv.GetColumn<std::size_t>("Thread");
+  std::vector<std::string> actions = csv.GetColumn<std::string>("Action");
+
+  auto parse_pointer = [](std::string const& s, uintptr_t& ptr) {
+    ptr = std::stoll(s, nullptr, 16);
+  };
+
+  std::vector<uintptr_t> pointers = csv.GetColumn<uintptr_t>("Pointer", parse_pointer);
+  std::vector<std::size_t> sizes  = csv.GetColumn<std::size_t>("Size");
+
+  auto parse_stream = [](std::string const& s, rmm::cuda_stream_view& stream) {
+    cudaStream_t cs;
+    uintptr_t ls = std::stoll(s);
+    std::memcpy(&cs, &ls, sizeof(cudaStream_t));
+    stream = rmm::cuda_stream_view{cs};
+  };
+
+  std::vector<rmm::cuda_stream_view> streams =
+    csv.GetColumn<rmm::cuda_stream_view>("Stream", parse_stream);
 
   auto const size_list = {tids.size(), actions.size(), pointers.size(), streams.size()};
 
@@ -146,10 +162,10 @@ inline std::vector<event> parse_csv(std::string const& filename)
     auto const& a = actions[i];
     RMM_EXPECTS((a == "allocate") or (a == "free"), "Invalid action string.");
     auto act = (a == "allocate") ?
action::ALLOCATE : action::FREE; - events[i] = event{tids[i], act, sizes[i], hex_string_to_int(pointers[i]), streams[i], i}; + events[i] = event{tids[i], act, sizes[i], pointers[i], streams[i], i}; } return events; -} +} // namespace detail } // namespace detail } // namespace rmm diff --git a/benchmarks/utilities/simulated_memory_resource.hpp b/benchmarks/utilities/simulated_memory_resource.hpp index ba8a2243e..67883ad5d 100644 --- a/benchmarks/utilities/simulated_memory_resource.hpp +++ b/benchmarks/utilities/simulated_memory_resource.hpp @@ -74,7 +74,7 @@ class simulated_memory_resource final : public device_memory_resource { * @param bytes The size, in bytes, of the allocation * @return void* Pointer to the newly allocated memory */ - void* do_allocate(std::size_t bytes, cudaStream_t) override + void* do_allocate(std::size_t bytes, cuda_stream_view) override { RMM_EXPECTS(begin_ + bytes <= end_, rmm::bad_alloc, "Simulated memory size exceeded"); auto p = static_cast(begin_); @@ -91,7 +91,7 @@ class simulated_memory_resource final : public device_memory_resource { * * @param p Pointer to be deallocated */ - void do_deallocate(void* p, std::size_t, cudaStream_t) override {} + void do_deallocate(void* p, std::size_t, cuda_stream_view) override {} /** * @brief Get free and available memory for memory resource. @@ -99,7 +99,7 @@ class simulated_memory_resource final : public device_memory_resource { * @param stream to execute on. * @return std::pair containing free_size and total_size of memory. */ - std::pair do_get_mem_info(cudaStream_t stream) const override + std::pair do_get_mem_info(cuda_stream_view stream) const override { return std::make_pair(0, 0); } diff --git a/conda/recipes/librmm/meta.yaml b/conda/recipes/librmm/meta.yaml index 328360011..eba6cf8f8 100644 --- a/conda/recipes/librmm/meta.yaml +++ b/conda/recipes/librmm/meta.yaml @@ -34,19 +34,24 @@ test: commands: - test -f $PREFIX/include/rmm/thrust_rmm_allocator.h - test -f $PREFIX/include/rmm/logger.hpp + - test -f $PREFIX/include/rmm/cuda_stream.hpp + - test -f $PREFIX/include/rmm/cuda_stream_view.hpp - test -f $PREFIX/include/rmm/device_uvector.hpp - test -f $PREFIX/include/rmm/device_scalar.hpp - test -f $PREFIX/include/rmm/device_buffer.hpp - test -f $PREFIX/include/rmm/detail/aligned.hpp - test -f $PREFIX/include/rmm/detail/error.hpp + - test -f $PREFIX/include/rmm/mr/device/detail/arena.hpp - test -f $PREFIX/include/rmm/mr/device/detail/free_list.hpp - test -f $PREFIX/include/rmm/mr/device/detail/coalescing_free_list.hpp - test -f $PREFIX/include/rmm/mr/device/detail/fixed_size_free_list.hpp - test -f $PREFIX/include/rmm/mr/device/detail/stream_ordered_memory_resource.hpp + - test -f $PREFIX/include/rmm/mr/device/arena_memory_resource.hpp - test -f $PREFIX/include/rmm/mr/device/binning_memory_resource.hpp - test -f $PREFIX/include/rmm/mr/device/cuda_memory_resource.hpp - test -f $PREFIX/include/rmm/mr/device/device_memory_resource.hpp - test -f $PREFIX/include/rmm/mr/device/fixed_size_memory_resource.hpp + - test -f $PREFIX/include/rmm/mr/device/limiting_resource_adaptor.hpp - test -f $PREFIX/include/rmm/mr/device/logging_resource_adaptor.hpp - test -f $PREFIX/include/rmm/mr/device/managed_memory_resource.hpp - test -f $PREFIX/include/rmm/mr/device/owning_wrapper.hpp diff --git a/include/rmm/cuda_stream.hpp b/include/rmm/cuda_stream.hpp new file mode 100644 index 000000000..10d944c8f --- /dev/null +++ b/include/rmm/cuda_stream.hpp @@ -0,0 +1,132 @@ +/* + * Copyright (c) 2020, NVIDIA CORPORATION. 
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <rmm/cuda_stream_view.hpp>
+#include <rmm/detail/error.hpp>
+
+#include <cuda_runtime_api.h>
+
+#include <functional>
+#include <memory>
+
+namespace rmm {
+
+/**
+ * @brief Owning wrapper for a CUDA stream.
+ *
+ * Provides RAII lifetime semantics for a CUDA stream.
+ */
+class cuda_stream {
+ public:
+  /**
+   * @brief Move constructor (default)
+   *
+   * A moved-from cuda_stream is invalid and it is Undefined Behavior to call methods that access
+   * the owned stream.
+   */
+  cuda_stream(cuda_stream&&) = default;
+  /**
+   * @brief Move assignment operator (default)
+   *
+   * A moved-from cuda_stream is invalid and it is Undefined Behavior to call methods that access
+   * the owned stream.
+   */
+  cuda_stream& operator=(cuda_stream&&) = default;
+  ~cuda_stream()                        = default;
+  cuda_stream(cuda_stream const&) = delete;  // Copying disallowed: one stream one owner
+  cuda_stream& operator=(cuda_stream const&) = delete;
+
+  /**
+   * @brief Construct a new cuda stream object
+   *
+   * @throw rmm::cuda_error if stream creation fails
+   */
+  cuda_stream()
+    : stream_{[]() {
+                cudaStream_t* s = new cudaStream_t;
+                RMM_CUDA_TRY(cudaStreamCreate(s));
+                return s;
+              }(),
+              [](cudaStream_t* s) {
+                RMM_ASSERT_CUDA_SUCCESS(cudaStreamDestroy(*s));
+                delete s;
+              }}
+  {
+  }
+
+  /**
+   * @brief Returns true if the owned stream is non-null
+   *
+   * @return true If the owned stream has not been explicitly moved and is therefore non-null.
+   * @return false If the owned stream has been explicitly moved and is therefore null.
+   */
+  bool is_valid() const { return stream_ != nullptr; }
+
+  /**
+   * @brief Get the value of the wrapped CUDA stream.
+   *
+   * @return cudaStream_t The wrapped CUDA stream.
+   */
+  cudaStream_t value() const
+  {
+    RMM_LOGGING_ASSERT(is_valid());
+    return *stream_;
+  }
+
+  /**
+   * @brief Explicit conversion to cudaStream_t.
+   */
+  explicit operator cudaStream_t() const noexcept { return value(); }
+
+  /**
+   * @brief Creates an immutable, non-owning view of the wrapped CUDA stream.
+   *
+   * @return rmm::cuda_stream_view The view of the CUDA stream
+   */
+  cuda_stream_view view() const { return cuda_stream_view{value()}; }
+
+  /**
+   * @brief Implicit conversion to cuda_stream_view
+   *
+   * @return A view of the owned stream
+   */
+  operator cuda_stream_view() const { return view(); }
+
+  /**
+   * @brief Synchronize the owned CUDA stream.
+   *
+   * Calls `cudaStreamSynchronize()`.
+   *
+   * @throw rmm::cuda_error if stream synchronization fails
+   */
+  void synchronize() const { RMM_CUDA_TRY(cudaStreamSynchronize(value())); }
+
+  /**
+   * @brief Synchronize the owned CUDA stream. Does not throw if there is an error.
+   *
+   * Calls `cudaStreamSynchronize()` and asserts if there is an error.
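+   *
+   * A minimal usage sketch (illustrative only; `kernel`, `grid`, and `block` are placeholders,
+   * not part of this file):
+   * @code
+   * rmm::cuda_stream s;                       // creates a new non-default stream
+   * kernel<<<grid, block, 0, s.value()>>>();  // launch work on the owned stream
+   * s.synchronize_no_throw();                 // wait for completion without throwing
+   * @endcode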
+   */
+  void synchronize_no_throw() const noexcept
+  {
+    RMM_ASSERT_CUDA_SUCCESS(cudaStreamSynchronize(value()));
+  }
+
+ private:
+  std::unique_ptr<cudaStream_t, std::function<void(cudaStream_t*)>> stream_;
+};
+
+}  // namespace rmm
diff --git a/include/rmm/cuda_stream_view.hpp b/include/rmm/cuda_stream_view.hpp
new file mode 100644
index 000000000..a588b06ad
--- /dev/null
+++ b/include/rmm/cuda_stream_view.hpp
@@ -0,0 +1,161 @@
+/*
+ * Copyright (c) 2020, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <rmm/detail/error.hpp>
+
+#include <cuda_runtime_api.h>
+
+#include <cstddef>
+#include <cstdint>
+#include <ostream>
+
+namespace rmm {
+
+/**
+ * @brief Strongly-typed non-owning wrapper for CUDA streams with default constructor.
+ *
+ * This wrapper is simply a "view": it does not own the lifetime of the stream it wraps.
+ */
+class cuda_stream_view {
+ public:
+  constexpr cuda_stream_view()                        = default;
+  constexpr cuda_stream_view(cuda_stream_view const&) = default;
+  constexpr cuda_stream_view(cuda_stream_view&&)      = default;
+  constexpr cuda_stream_view& operator=(cuda_stream_view const&) = default;
+  constexpr cuda_stream_view& operator=(cuda_stream_view&&) = default;
+  ~cuda_stream_view()                                        = default;
+
+  // TODO disable construction from 0 after cuDF and others adopt cuda_stream_view
+  // cuda_stream_view(int)            = delete;  //< Prevent cast from 0
+  // cuda_stream_view(std::nullptr_t) = delete;  //< Prevent cast from nullptr
+  // TODO also disable implicit conversion from cudaStream_t
+
+  /**
+   * @brief Implicit conversion from cudaStream_t.
+   */
+  constexpr cuda_stream_view(cudaStream_t stream) noexcept : stream_{stream} {}
+
+  /**
+   * @brief Get the wrapped stream.
+   *
+   * @return cudaStream_t The wrapped stream.
+   */
+  constexpr cudaStream_t value() const noexcept { return stream_; }
+
+  /**
+   * @brief Explicit conversion to cudaStream_t.
+   */
+  explicit constexpr operator cudaStream_t() const noexcept { return value(); }
+
+  /**
+   * @brief Return true if the wrapped stream is the CUDA per-thread default stream.
+   */
+  bool is_per_thread_default() const noexcept
+  {
+#ifdef CUDA_API_PER_THREAD_DEFAULT_STREAM
+    return value() == cudaStreamPerThread || value() == 0;
+#else
+    return value() == cudaStreamPerThread;
+#endif
+  }
+
+  /**
+   * @brief Return true if the wrapped stream is explicitly the CUDA legacy default stream.
+   */
+  bool is_default() const noexcept
+  {
+#ifdef CUDA_API_PER_THREAD_DEFAULT_STREAM
+    return value() == cudaStreamLegacy;
+#else
+    return value() == cudaStreamLegacy || value() == 0;
+#endif
+  }
+
+  /**
+   * @brief Synchronize the viewed CUDA stream.
+   *
+   * Calls `cudaStreamSynchronize()`.
+   *
+   * @throw rmm::cuda_error if stream synchronization fails
+   */
+  void synchronize() const { RMM_CUDA_TRY(cudaStreamSynchronize(stream_)); }
+
+  /**
+   * @brief Synchronize the viewed CUDA stream. Does not throw if there is an error.
+   *
+   * Calls `cudaStreamSynchronize()` and asserts if there is an error.
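+   *
+   * Example (illustrative; the raw stream here stands in for any externally created stream):
+   * @code
+   * cudaStream_t raw{};
+   * RMM_CUDA_TRY(cudaStreamCreate(&raw));
+   * rmm::cuda_stream_view v{raw};  // implicit conversion from cudaStream_t
+   * v.synchronize_no_throw();      // safe to call from destructors / noexcept paths
+   * RMM_CUDA_TRY(cudaStreamDestroy(raw));  // the view does not own `raw`
+   * @endcode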
+ */ + void synchronize_no_throw() const noexcept + { + RMM_ASSERT_CUDA_SUCCESS(cudaStreamSynchronize(stream_)); + } + + private: + cudaStream_t stream_{0}; +}; + +/** + * @brief Static cuda_stream_view of the default stream (stream 0), for convenience + */ +static cuda_stream_view cuda_stream_default{}; + +/** + * @brief Static cuda_stream_view of cudaStreamLegacy, for convenience + */ +static cuda_stream_view cuda_stream_legacy{cudaStreamLegacy}; + +/** + * @brief Static cuda_stream_view of cudaStreamPerThread, for convenience + */ +static cuda_stream_view cuda_stream_per_thread{cudaStreamPerThread}; + +/** + * @brief Equality comparison operator for streams + * + * @param lhs The first stream view to compare + * @param rhs The second stream view to compare + * @return true if equal, false if unequal + */ +inline bool operator==(cuda_stream_view lhs, cuda_stream_view rhs) +{ + return lhs.value() == rhs.value(); +} + +/** + * @brief Inequality comparison operator for streams + * + * @param lhs The first stream view to compare + * @param rhs The second stream view to compare + * @return true if unequal, false if equal + */ +inline bool operator!=(cuda_stream_view lhs, cuda_stream_view rhs) { return not(lhs == rhs); } + +/** + * @brief Output stream operator for printing / logging streams + * + * @param os The output ostream + * @param sv The cuda_stream_view to output + * @return std::ostream& The output ostream + */ +inline std::ostream& operator<<(std::ostream& os, cuda_stream_view sv) +{ + os << sv.value(); + return os; +} + +} // namespace rmm diff --git a/include/rmm/device_buffer.hpp b/include/rmm/device_buffer.hpp index c8237bcc5..97095f3b2 100644 --- a/include/rmm/device_buffer.hpp +++ b/include/rmm/device_buffer.hpp @@ -15,6 +15,7 @@ */ #pragma once +#include #include #include #include @@ -46,7 +47,7 @@ namespace rmm { * // allocates at least 100 bytes using the custom memory resource and * // specified stream * custom_memory_resource mr; - * cudaStream_t stream = 0; + * cuda_stream_view stream = cuda_stream_view{}; * device_buffer custom_buff(100, stream, &mr); * * // deep copies `buff` into a new device buffer using the default stream @@ -94,7 +95,7 @@ class device_buffer { * @param mr Memory resource to use for the device memory allocation. 
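 *
 * Example (illustrative):
 * @code
 * // Allocates at least 256 bytes on the default stream view, using the
 * // current device memory resource
 * rmm::device_buffer buf(256, rmm::cuda_stream_view{});
 * @endcode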
*/ explicit device_buffer(std::size_t size, - cudaStream_t stream = 0, + cuda_stream_view stream = cuda_stream_view{}, mr::device_memory_resource* mr = mr::get_current_device_resource()) : _stream{stream}, _mr{mr} { @@ -117,7 +118,7 @@ class device_buffer { */ device_buffer(void const* source_data, std::size_t size, - cudaStream_t stream = 0, + cuda_stream_view stream = cuda_stream_view{}, mr::device_memory_resource* mr = mr::get_current_device_resource()) : _stream{stream}, _mr{mr} { @@ -142,7 +143,7 @@ class device_buffer { * @param mr The resource to use for allocating the new `device_buffer` */ device_buffer(device_buffer const& other, - cudaStream_t stream = 0, + cuda_stream_view stream = cuda_stream_view{}, rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()) : device_buffer{other.data(), other.size(), stream, mr} { @@ -171,7 +172,7 @@ class device_buffer { other._data = nullptr; other._size = 0; other._capacity = 0; - other.set_stream(0); + other.set_stream(cuda_stream_view{}); } /** @@ -241,7 +242,7 @@ class device_buffer { other._data = nullptr; other._size = 0; other._capacity = 0; - other.set_stream(0); + other.set_stream(cuda_stream_view{}); } return *this; } @@ -257,7 +258,7 @@ class device_buffer { { deallocate(); _mr = nullptr; - _stream = 0; + _stream = cuda_stream_view{}; } /** @@ -285,7 +286,7 @@ class device_buffer { * @param new_size The requested new size, in bytes * @param stream The stream to use for allocation and copy */ - void resize(std::size_t new_size, cudaStream_t stream = 0) + void resize(std::size_t new_size, cuda_stream_view stream = cuda_stream_view{}) { set_stream(stream); // If the requested size is smaller than the current capacity, just update @@ -294,7 +295,8 @@ class device_buffer { _size = new_size; } else { void* const new_data = _mr->allocate(new_size, this->stream()); - RMM_CUDA_TRY(cudaMemcpyAsync(new_data, data(), size(), cudaMemcpyDefault, this->stream())); + RMM_CUDA_TRY( + cudaMemcpyAsync(new_data, data(), size(), cudaMemcpyDefault, this->stream().value())); deallocate(); _data = new_data; _size = new_size; @@ -315,7 +317,7 @@ class device_buffer { * * @param stream The stream on which the allocation and copy are performed */ - void shrink_to_fit(cudaStream_t stream = 0) + void shrink_to_fit(cuda_stream_view stream = cuda_stream_view{}) { set_stream(stream); if (size() != capacity()) { @@ -362,7 +364,7 @@ class device_buffer { /** * @brief Returns stream most recently specified for allocation/deallocation */ - cudaStream_t stream() const noexcept { return _stream; } + cuda_stream_view stream() const noexcept { return _stream; } /** * @brief Sets the stream to be used for deallocation @@ -374,7 +376,7 @@ class device_buffer { * called after this, the later stream parameter will be stored and used in * the destructor. 
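 *
 * A sketch of the intended pattern (streams `s_a` and `s_b` and `kernel` are hypothetical):
 * @code
 * rmm::device_buffer buf(128, s_a);                      // allocated and usable on `s_a`
 * kernel<<<grid, block, 0, s_b.value()>>>(buf.data());   // last use of `buf` is on `s_b`
 * buf.set_stream(s_b);                                   // destructor will deallocate on `s_b`
 * @endcode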
*/ - void set_stream(cudaStream_t stream) noexcept { _stream = stream; } + void set_stream(cuda_stream_view stream) noexcept { _stream = stream; } /** * @brief Returns pointer to the memory resource used to allocate and @@ -383,10 +385,10 @@ class device_buffer { mr::device_memory_resource* memory_resource() const noexcept { return _mr; } private: - void* _data{nullptr}; ///< Pointer to device memory allocation - std::size_t _size{}; ///< Requested size of the device memory allocation - std::size_t _capacity{}; ///< The actual size of the device memory allocation - cudaStream_t _stream{}; ///< Stream to use for device memory deallocation + void* _data{nullptr}; ///< Pointer to device memory allocation + std::size_t _size{}; ///< Requested size of the device memory allocation + std::size_t _capacity{}; ///< The actual size of the device memory allocation + cuda_stream_view _stream{}; ///< Stream to use for device memory deallocation mr::device_memory_resource* _mr{ mr::get_current_device_resource()}; ///< The memory resource used to ///< allocate/deallocate device memory @@ -440,7 +442,7 @@ class device_buffer { if (bytes > 0) { RMM_EXPECTS(nullptr != source, "Invalid copy from nullptr."); - RMM_CUDA_TRY(cudaMemcpyAsync(_data, source, bytes, cudaMemcpyDefault, stream())); + RMM_CUDA_TRY(cudaMemcpyAsync(_data, source, bytes, cudaMemcpyDefault, stream().value())); } } }; diff --git a/include/rmm/device_scalar.hpp b/include/rmm/device_scalar.hpp index 274bd049f..5615121c7 100644 --- a/include/rmm/device_scalar.hpp +++ b/include/rmm/device_scalar.hpp @@ -16,6 +16,7 @@ #pragma once +#include #include #include #include @@ -51,7 +52,7 @@ class device_scalar { * @param mr Optional, resource with which to allocate. */ explicit device_scalar( - cudaStream_t stream, + cuda_stream_view const &stream, rmm::mr::device_memory_resource *mr = rmm::mr::get_current_device_resource()) : buffer{sizeof(T), stream, mr} { @@ -75,7 +76,7 @@ class device_scalar { */ explicit device_scalar( T const &initial_value, - cudaStream_t stream = 0, + cuda_stream_view const &stream = cuda_stream_view{}, rmm::mr::device_memory_resource *mr = rmm::mr::get_current_device_resource()) : buffer{sizeof(T), stream, mr} { @@ -95,7 +96,7 @@ class device_scalar { * @param mr The resource to use for allocating the new `device_scalar` */ device_scalar(device_scalar const &other, - cudaStream_t stream = 0, + cuda_stream_view const &stream = {}, rmm::mr::device_memory_resource *mr = rmm::mr::get_current_device_resource()) : buffer{other.buffer, stream, mr} { @@ -117,11 +118,11 @@ class device_scalar { * @return T The value of the scalar. * @param stream CUDA stream on which to perform the copy and synchronize. 
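 *
 * Example (illustrative):
 * @code
 * rmm::cuda_stream s;
 * rmm::device_scalar<int32_t> counter{0, s};  // initialized to zero on `s`
 * // ... launch kernels on `s` that update the scalar via counter.data() ...
 * int32_t result = counter.value(s);          // copies to host and synchronizes `s`
 * @endcode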
*/ - T value(cudaStream_t stream = 0) const + T value(cuda_stream_view const &stream = cuda_stream_view{}) const { T host_value{}; _memcpy(&host_value, buffer.data(), stream); - RMM_CUDA_TRY(cudaStreamSynchronize(stream)); + stream.synchronize(); return host_value; } @@ -160,11 +161,11 @@ class device_scalar { * @param stream CUDA stream on which to perform the copy */ template - auto set_value(T const &host_value, cudaStream_t stream = 0) + auto set_value(T const &host_value, cuda_stream_view const &stream = cuda_stream_view{}) -> std::enable_if_t::value, Placeholder> { if (host_value == T{0}) { - RMM_CUDA_TRY(cudaMemsetAsync(buffer.data(), 0, sizeof(T), stream)); + RMM_CUDA_TRY(cudaMemsetAsync(buffer.data(), 0, sizeof(T), stream.value())); } else { _memcpy(buffer.data(), &host_value, stream); } @@ -205,7 +206,7 @@ class device_scalar { * @param stream CUDA stream on which to perform the copy */ template - auto set_value(T const &host_value, cudaStream_t stream = 0) + auto set_value(T const &host_value, cuda_stream_view const &stream = cuda_stream_view{}) -> std::enable_if_t::value, Placeholder> { _memcpy(buffer.data(), &host_value, stream); @@ -240,9 +241,9 @@ class device_scalar { private: rmm::device_buffer buffer{sizeof(T)}; - inline void _memcpy(void *dst, const void *src, cudaStream_t stream) const + inline void _memcpy(void *dst, const void *src, cuda_stream_view const &stream) const { - RMM_CUDA_TRY(cudaMemcpyAsync(dst, src, sizeof(T), cudaMemcpyDefault, stream)); + RMM_CUDA_TRY(cudaMemcpyAsync(dst, src, sizeof(T), cudaMemcpyDefault, stream.value())); } }; } // namespace rmm diff --git a/include/rmm/device_uvector.hpp b/include/rmm/device_uvector.hpp index 599ddbc27..1b22cb587 100644 --- a/include/rmm/device_uvector.hpp +++ b/include/rmm/device_uvector.hpp @@ -16,6 +16,7 @@ #pragma once +#include #include #include #include @@ -38,7 +39,7 @@ namespace rmm { * Example: * @code * rmm::mr::device_memory_resource * mr = new my_custom_resource(); - * cudaStream_t s; + * rmm::cuda_stream_view s{}; * * // Allocates *uninitialized* device memory on stream `s` sufficient for 100 ints using the * // supplied resource `mr` @@ -109,7 +110,7 @@ class device_uvector { */ explicit device_uvector( std::size_t size, - cudaStream_t stream, + cuda_stream_view stream, rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()) : _storage{elements_to_bytes(size), stream, mr} { @@ -126,7 +127,7 @@ class device_uvector { */ explicit device_uvector( device_uvector const& other, - cudaStream_t stream, + cuda_stream_view stream, rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()) : _storage{other._storage, stream, mr} { @@ -189,12 +190,13 @@ class device_uvector { * @param v The value to copy to the specified element * @param s The stream on which to perform the copy */ - void set_element(std::size_t element_index, T const& v, cudaStream_t s) + void set_element(std::size_t element_index, T const& v, cuda_stream_view s) { RMM_EXPECTS( element_index < size(), rmm::out_of_range, "Attempt to access out of bounds element."); - RMM_CUDA_TRY(cudaMemcpyAsync(element_ptr(element_index), &v, sizeof(v), cudaMemcpyDefault, s)); - RMM_CUDA_TRY(cudaStreamSynchronize(s)); + RMM_CUDA_TRY( + cudaMemcpyAsync(element_ptr(element_index), &v, sizeof(v), cudaMemcpyDefault, s.value())); + s.synchronize_no_throw(); } /** @@ -226,11 +228,12 @@ class device_uvector { * @param v The value to copy to the specified element * @param s The stream on which to perform the copy */ - void 
set_element_async(std::size_t element_index, value_type const& v, cudaStream_t s) + void set_element_async(std::size_t element_index, value_type const& v, cuda_stream_view s) { RMM_EXPECTS( element_index < size(), rmm::out_of_range, "Attempt to access out of bounds element."); - RMM_CUDA_TRY(cudaMemcpyAsync(element_ptr(element_index), &v, sizeof(v), cudaMemcpyDefault, s)); + RMM_CUDA_TRY( + cudaMemcpyAsync(element_ptr(element_index), &v, sizeof(v), cudaMemcpyDefault, s.value())); } /** @@ -245,13 +248,14 @@ class device_uvector { * @param s The stream on which to perform the copy * @return The value of the specified element */ - value_type element(std::size_t element_index, cudaStream_t s) const + value_type element(std::size_t element_index, cuda_stream_view s) const { RMM_EXPECTS( element_index < size(), rmm::out_of_range, "Attempt to access out of bounds element."); value_type v; - RMM_CUDA_TRY(cudaMemcpyAsync(&v, element_ptr(element_index), sizeof(v), cudaMemcpyDefault, s)); - RMM_CUDA_TRY(cudaStreamSynchronize(s)); + RMM_CUDA_TRY( + cudaMemcpyAsync(&v, element_ptr(element_index), sizeof(v), cudaMemcpyDefault, s.value())); + s.synchronize(); return v; } @@ -265,7 +269,7 @@ class device_uvector { * @param s The stream on which to perform the copy * @return The value of the first element */ - value_type front_element(cudaStream_t s) const { return element(0, s); } + value_type front_element(cuda_stream_view s) const { return element(0, s); } /** * @brief Returns the last element. @@ -277,7 +281,7 @@ class device_uvector { * @param s The stream on which to perform the copy * @return The value of the last element */ - value_type back_element(cudaStream_t s) const { return element(size() - 1, s); } + value_type back_element(cuda_stream_view s) const { return element(size() - 1, s); } /** * @brief Resizes the vector to contain `new_size` elements. @@ -295,7 +299,7 @@ class device_uvector { * @param new_size The desired number of elements * @param stream The stream on which to perform the allocation/copy (if any) */ - void resize(std::size_t new_size, cudaStream_t stream) + void resize(std::size_t new_size, cuda_stream_view stream) { _storage.resize(elements_to_bytes(new_size), stream); } @@ -307,7 +311,7 @@ class device_uvector { * * @param stream Stream on which to perform allocation and copy */ - void shrink_to_fit(cudaStream_t stream) { _storage.shrink_to_fit(stream); } + void shrink_to_fit(cuda_stream_view stream) { _storage.shrink_to_fit(stream); } /** * @brief Release ownership of device memory storage. diff --git a/include/rmm/mr/device/arena_memory_resource.hpp b/include/rmm/mr/device/arena_memory_resource.hpp index 4cc20b258..b46e590c9 100644 --- a/include/rmm/mr/device/arena_memory_resource.hpp +++ b/include/rmm/mr/device/arena_memory_resource.hpp @@ -127,7 +127,7 @@ class arena_memory_resource final : public device_memory_resource { * @param stream The stream to associate this allocation with. * @return void* Pointer to the newly allocated memory. */ - void* do_allocate(std::size_t bytes, cudaStream_t stream) override + void* do_allocate(std::size_t bytes, cuda_stream_view stream) override { if (bytes <= 0) return nullptr; @@ -143,7 +143,7 @@ class arena_memory_resource final : public device_memory_resource { * value of `bytes` that was passed to the `allocate` call that returned `p`. * @param stream Stream on which to perform deallocation. 
*/ - void do_deallocate(void* p, std::size_t bytes, cudaStream_t stream) override + void do_deallocate(void* p, std::size_t bytes, cuda_stream_view stream) override { if (p == nullptr || bytes <= 0) return; @@ -161,9 +161,9 @@ class arena_memory_resource final : public device_memory_resource { * value of `bytes` that was passed to the `allocate` call that returned `p`. * @param stream Stream on which to perform deallocation. */ - void deallocate_from_other_arena(void* p, std::size_t bytes, cudaStream_t stream) + void deallocate_from_other_arena(void* p, std::size_t bytes, cuda_stream_view stream) { - RMM_ASSERT_CUDA_SUCCESS(cudaStreamSynchronize(stream)); + stream.synchronize_no_throw(); read_lock lock(mtx_); @@ -178,7 +178,7 @@ class arena_memory_resource final : public device_memory_resource { for (auto& kv : stream_arenas_) { // If the arena does not belong to the current stream, try to deallocate from it, and return // if successful. - if (kv.first != stream && kv.second.deallocate(p, bytes)) return; + if (stream != kv.first && kv.second.deallocate(p, bytes)) return; } } @@ -193,7 +193,7 @@ class arena_memory_resource final : public device_memory_resource { * @param stream The stream associated with the arena. * @return arena& The arena associated with the current thread or the given stream. */ - arena& get_arena(cudaStream_t stream) + arena& get_arena(cuda_stream_view stream) { if (use_per_thread_arena(stream)) { return get_thread_arena(); @@ -229,18 +229,18 @@ class arena_memory_resource final : public device_memory_resource { * * @return arena& The arena associated with the given stream. */ - arena& get_stream_arena(cudaStream_t stream) + arena& get_stream_arena(cuda_stream_view stream) { RMM_LOGGING_ASSERT(!use_per_thread_arena(stream)); { read_lock lock(mtx_); - auto const it = stream_arenas_.find(stream); + auto const it = stream_arenas_.find(stream.value()); if (it != stream_arenas_.end()) { return it->second; } } { write_lock lock(mtx_); - stream_arenas_.emplace(stream, global_arena_); - return stream_arenas_.at(stream); + stream_arenas_.emplace(stream.value(), global_arena_); + return stream_arenas_.at(stream.value()); } } @@ -250,7 +250,7 @@ class arena_memory_resource final : public device_memory_resource { * @param stream to execute on. * @return std::pair containing free_size and total_size of memory. */ - std::pair do_get_mem_info(cudaStream_t stream) const override + std::pair do_get_mem_info(cuda_stream_view stream) const override { return std::make_pair(0, 0); } @@ -261,13 +261,9 @@ class arena_memory_resource final : public device_memory_resource { * @param stream to check. * @return true if per-thread arena should be used, false otherwise. */ - static bool use_per_thread_arena(cudaStream_t stream) + static bool use_per_thread_arena(cuda_stream_view stream) { -#ifdef CUDA_API_PER_THREAD_DEFAULT_STREAM - return stream == cudaStreamDefault || stream == cudaStreamPerThread; -#else - return stream == cudaStreamPerThread; -#endif + return stream.is_per_thread_default(); } /// The global arena to allocate superblocks from. 
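As a usage sketch of the arena resource together with the new stream wrappers (the pool sizes and defaulted constructor arguments here are illustrative, not part of this diff):

```c++
#include <rmm/cuda_stream.hpp>
#include <rmm/mr/device/arena_memory_resource.hpp>
#include <rmm/mr/device/cuda_memory_resource.hpp>
#include <rmm/mr/device/per_device_resource.hpp>

int main()
{
  rmm::mr::cuda_memory_resource upstream;
  // Arena resource drawing superblocks from the plain CUDA resource
  rmm::mr::arena_memory_resource<rmm::mr::cuda_memory_resource> arena{&upstream};
  rmm::mr::set_current_device_resource(&arena);

  rmm::cuda_stream s;  // owning, non-default stream; converts implicitly to cuda_stream_view
  void* p = arena.allocate(1 << 20, s);  // 1 MiB, stream-ordered on `s`
  arena.deallocate(p, 1 << 20, s);
  return 0;
}
```

With PTDS enabled, `use_per_thread_arena` above routes such allocations to a per-thread arena; otherwise each non-default stream gets its own arena keyed by `stream.value()`.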
diff --git a/include/rmm/mr/device/binning_memory_resource.hpp b/include/rmm/mr/device/binning_memory_resource.hpp index c385b2f70..2e69f28c9 100644 --- a/include/rmm/mr/device/binning_memory_resource.hpp +++ b/include/rmm/mr/device/binning_memory_resource.hpp @@ -175,7 +175,7 @@ class binning_memory_resource final : public device_memory_resource { * @param stream Stream on which to perform allocation * @return void* Pointer to the newly allocated memory */ - void* do_allocate(std::size_t bytes, cudaStream_t stream) override + void* do_allocate(std::size_t bytes, cuda_stream_view stream) override { if (bytes <= 0) return nullptr; return get_resource(bytes)->allocate(bytes, stream); @@ -191,7 +191,7 @@ class binning_memory_resource final : public device_memory_resource { * value of `bytes` that was passed to the `allocate` call that returned `p`. * @param stream Stream on which to perform deallocation */ - void do_deallocate(void* p, std::size_t bytes, cudaStream_t stream) override + void do_deallocate(void* p, std::size_t bytes, cuda_stream_view stream) override { auto res = get_resource(bytes); if (res != nullptr) res->deallocate(p, bytes, stream); @@ -205,7 +205,7 @@ class binning_memory_resource final : public device_memory_resource { * @param stream the stream being executed on * @return std::pair with available and free memory for resource */ - std::pair do_get_mem_info(cudaStream_t stream) const override + std::pair do_get_mem_info(cuda_stream_view stream) const override { return std::make_pair(0, 0); } diff --git a/include/rmm/mr/device/cuda_memory_resource.hpp b/include/rmm/mr/device/cuda_memory_resource.hpp index 9a7e4c8de..f3ebf7e93 100644 --- a/include/rmm/mr/device/cuda_memory_resource.hpp +++ b/include/rmm/mr/device/cuda_memory_resource.hpp @@ -17,6 +17,7 @@ #include "device_memory_resource.hpp" +#include #include namespace rmm { @@ -62,7 +63,7 @@ class cuda_memory_resource final : public device_memory_resource { * @param bytes The size, in bytes, of the allocation * @return void* Pointer to the newly allocated memory */ - void* do_allocate(std::size_t bytes, cudaStream_t) override + void* do_allocate(std::size_t bytes, cuda_stream_view) override { void* p{nullptr}; RMM_CUDA_TRY(cudaMalloc(&p, bytes), rmm::bad_alloc); @@ -78,7 +79,7 @@ class cuda_memory_resource final : public device_memory_resource { * * @param p Pointer to be deallocated */ - void do_deallocate(void* p, std::size_t, cudaStream_t) override + void do_deallocate(void* p, std::size_t, cuda_stream_view) override { RMM_ASSERT_CUDA_SUCCESS(cudaFree(p)); } @@ -107,7 +108,7 @@ class cuda_memory_resource final : public device_memory_resource { * * @return std::pair contaiing free_size and total_size of memory */ - std::pair do_get_mem_info(cudaStream_t) const override + std::pair do_get_mem_info(cuda_stream_view) const override { std::size_t free_size; std::size_t total_size; diff --git a/include/rmm/mr/device/detail/arena.hpp b/include/rmm/mr/device/detail/arena.hpp index c454f32ca..0bef810b4 100644 --- a/include/rmm/mr/device/detail/arena.hpp +++ b/include/rmm/mr/device/detail/arena.hpp @@ -16,6 +16,7 @@ #pragma once +#include #include #include @@ -458,7 +459,7 @@ class arena { * @param stream Stream on which to perform deallocation. * @return true if the allocation is found, false otherwise. 
*/ - bool deallocate(void* p, std::size_t bytes, cudaStream_t stream) + bool deallocate(void* p, std::size_t bytes, cuda_stream_view stream) { lock_guard lock(mtx_); auto const b = free_block(p, bytes); @@ -564,12 +565,12 @@ class arena { * @param b The block that can be used to shrink the arena. * @param stream Stream on which to perform shrinking. */ - void shrink_arena(block const& b, cudaStream_t stream) + void shrink_arena(block const& b, cuda_stream_view stream) { // Don't shrink if b is not a superblock. if (!b.is_superblock()) return; - RMM_CUDA_TRY(cudaStreamSynchronize(stream)); + stream.synchronize_no_throw(); global_arena_.deallocate(b); free_blocks_.erase(b); diff --git a/include/rmm/mr/device/detail/stream_ordered_memory_resource.hpp b/include/rmm/mr/device/detail/stream_ordered_memory_resource.hpp index 7a0c8ea28..bb8ea358f 100644 --- a/include/rmm/mr/device/detail/stream_ordered_memory_resource.hpp +++ b/include/rmm/mr/device/detail/stream_ordered_memory_resource.hpp @@ -67,7 +67,7 @@ struct crtp { * documented separately: * * 1. `size_t get_maximum_allocation_size() const` - * 2. `block_type expand_pool(size_t size, free_list& blocks, cudaStream_t stream)` + * 2. `block_type expand_pool(size_t size, free_list& blocks, cuda_stream_view stream)` * 3. `split_block allocate_from_block(block_type const& b, size_t size)` * 4. `block_type free_block(void* p, size_t size) noexcept` */ @@ -117,7 +117,7 @@ class stream_ordered_memory_resource : public crtp, public device_ * @param stream The stream on which the memory is to be used. * @return block_type a block of at least `size` bytes */ - // block_type expand_pool(size_t size, free_list& blocks, cudaStream_t stream) + // block_type expand_pool(size_t size, free_list& blocks, cuda_stream_view stream) /// Struct representing a block that has been split for allocation struct split_block { @@ -155,12 +155,12 @@ class stream_ordered_memory_resource : public crtp, public device_ * @param b The block to insert into the pool. * @param stream The stream on which the memory was last used. 
*/ - void insert_block(block_type const& b, cudaStream_t stream) + void insert_block(block_type const& b, cuda_stream_view stream) { stream_free_blocks_[get_event(stream)].insert(b); } - void insert_blocks(free_list&& blocks, cudaStream_t stream) + void insert_blocks(free_list&& blocks, cuda_stream_view stream) { stream_free_blocks_[get_event(stream)].insert(std::move(blocks)); } @@ -201,9 +201,9 @@ class stream_ordered_memory_resource : public crtp, public device_ * @param stream The stream to associate this allocation with * @return void* Pointer to the newly allocated memory */ - virtual void* do_allocate(std::size_t bytes, cudaStream_t stream) override + virtual void* do_allocate(std::size_t bytes, cuda_stream_view stream) override { - RMM_LOG_TRACE("[A][stream {:p}][{}B]", static_cast(stream), bytes); + RMM_LOG_TRACE("[A][stream {:p}][{}B]", fmt::ptr(stream.value()), bytes); if (bytes <= 0) return nullptr; @@ -219,9 +219,9 @@ class stream_ordered_memory_resource : public crtp, public device_ auto split = this->underlying().allocate_from_block(b, bytes); if (split.remainder.is_valid()) stream_free_blocks_[stream_event].insert(split.remainder); RMM_LOG_TRACE("[A][stream {:p}][{}B][{:p}]", - static_cast(stream_event.stream), + fmt::ptr(stream_event.stream), bytes, - static_cast(split.allocated_pointer)); + fmt::ptr(split.allocated_pointer)); log_summary_trace(); @@ -235,11 +235,11 @@ class stream_ordered_memory_resource : public crtp, public device_ * * @param p Pointer to be deallocated */ - virtual void do_deallocate(void* p, std::size_t bytes, cudaStream_t stream) override + virtual void do_deallocate(void* p, std::size_t bytes, cuda_stream_view stream) override { lock_guard lock(mtx_); auto stream_event = get_event(stream); - RMM_LOG_TRACE("[D][stream {:p}][{}B][{:p}]", static_cast(stream_event.stream), bytes, p); + RMM_LOG_TRACE("[D][stream {:p}][{}B][{:p}]", fmt::ptr(stream_event.stream), bytes, p); bytes = rmm::detail::align_up(bytes, allocation_alignment); auto const b = this->underlying().free_block(p, bytes); @@ -247,7 +247,7 @@ class stream_ordered_memory_resource : public crtp, public device_ // TODO: cudaEventRecord has significant overhead on deallocations. For the non-PTDS case // we may be able to delay recording the event in some situations. But using events rather than // streams allows stealing from deleted streams. - RMM_ASSERT_CUDA_SUCCESS(cudaEventRecord(stream_event.event, stream)); + RMM_ASSERT_CUDA_SUCCESS(cudaEventRecord(stream_event.event, stream.value())); stream_free_blocks_[stream_event].insert(b); @@ -277,36 +277,28 @@ class stream_ordered_memory_resource : public crtp, public device_ * @param stream The stream for which to get an event. * @return The stream_event for `stream`. */ - stream_event_pair get_event(cudaStream_t stream) + stream_event_pair get_event(cuda_stream_view stream) { -#ifdef CUDA_API_PER_THREAD_DEFAULT_STREAM - if (cudaStreamDefault == stream || cudaStreamPerThread == stream) { -#else - if (cudaStreamPerThread == stream) { -#endif + if (stream.is_per_thread_default()) { // Create a thread-local shared event wrapper. Shared pointers in the thread and in each MR // instance ensures it is destroyed cleaned up only after all are finished with it. 
thread_local auto event_tls = std::make_shared(); default_stream_events.insert(event_tls); - return stream_event_pair{stream, event_tls.get()->event}; + return stream_event_pair{stream.value(), event_tls.get()->event}; } -#ifndef CUDA_API_PER_THREAD_DEFAULT_STREAM // We use cudaStreamLegacy as the event map key for the default stream for consistency between // PTDS and non-PTDS mode. In PTDS mode, the cudaStreamLegacy map key will only exist if the // user explicitly passes it, so it is used as the default location for the free list // at construction. For consistency, the same key is used for null stream free lists in non-PTDS // mode. - else if (cudaStreamDefault == stream) { - stream = cudaStreamLegacy; - } -#endif + auto const stream_to_store = stream.is_default() ? cudaStreamLegacy : stream.value(); - auto iter = stream_events_.find(stream); + auto const iter = stream_events_.find(stream_to_store); return (iter != stream_events_.end()) ? iter->second : [&]() { - stream_event_pair stream_event{stream}; + stream_event_pair stream_event{stream_to_store}; RMM_ASSERT_CUDA_SUCCESS( cudaEventCreateWithFlags(&stream_event.event, cudaEventDisableTiming)); - stream_events_[stream] = stream_event; + stream_events_[stream_to_store] = stream_event; return stream_event; }(); } @@ -345,19 +337,17 @@ class stream_ordered_memory_resource : public crtp, public device_ log_summary_trace(); // no large enough blocks available after merging, so grow the pool - return this->underlying().expand_pool(size, blocks, stream_event.stream); + return this->underlying().expand_pool(size, blocks, cuda_stream_view{stream_event.stream}); } /** * @brief Find a free block of at least `size` bytes in a `free_list` with a different * stream/event than `stream_event`. * - * If an appropriate block is found in a free list F associated with event E, if - * `CUDA_API_PER_THREAD_DEFAULT_STREAM` is defined, `stream_event.stream` will be made to wait - * on event E. Otherwise, the stream associated with free list F will be synchronized. In either - * case all other blocks in free list F will be moved to the free list associated with - * `stream_event.stream`. This results in coalescing with other blocks in that free list, - * hopefully reducing fragmentation. + * If an appropriate block is found in a free list F associated with event E, + * `stream_event.stream` will be made to wait on event E. All other blocks in free list F will be + * moved to the free list associated with `stream_event.stream`. This results in coalescing with + * other blocks in that free list, hopefully reducing fragmentation. * * @param size The requested size of the allocation. * @param stream_event The stream and associated event on which the allocation is being @@ -382,9 +372,9 @@ class stream_ordered_memory_resource : public crtp, public device_ merge_lists(stream_event, blocks, other_event, std::move(other_blocks)); RMM_LOG_DEBUG("[A][Stream {:p}][{}B][Merged stream {:p}]", - static_cast(stream_event.stream), + fmt::ptr(stream_event.stream), size, - static_cast(it->first.stream)); + fmt::ptr(it->first.stream)); stream_free_blocks_.erase(it); @@ -397,9 +387,9 @@ class stream_ordered_memory_resource : public crtp, public device_ if (b.is_valid()) { RMM_LOG_DEBUG((merge_first) ? 
"[A][Stream {:p}][{}B][Found after merging stream {:p}]" : "[A][Stream {:p}][{}B][Taken from stream {:p}]", - static_cast(stream_event.stream), + fmt::ptr(stream_event.stream), size, - static_cast(it->first.stream)); + fmt::ptr(it->first.stream)); if (not merge_first) { merge_lists(stream_event, blocks, other_event, std::move(other_blocks)); diff --git a/include/rmm/mr/device/device_memory_resource.hpp b/include/rmm/mr/device/device_memory_resource.hpp index e201ed7b0..4b5011d1d 100644 --- a/include/rmm/mr/device/device_memory_resource.hpp +++ b/include/rmm/mr/device/device_memory_resource.hpp @@ -15,12 +15,11 @@ */ #pragma once -#include +#include #include -#include -// forward decl -using cudaStream_t = struct CUstream_st*; +#include +#include namespace rmm { @@ -100,7 +99,7 @@ class device_memory_resource { * @param stream Stream on which to perform allocation * @return void* Pointer to the newly allocated memory */ - void* allocate(std::size_t bytes, cudaStream_t stream = 0) + void* allocate(std::size_t bytes, cuda_stream_view stream = cuda_stream_view{}) { return do_allocate(rmm::detail::align_up(bytes, 8), stream); } @@ -123,7 +122,7 @@ class device_memory_resource { * value of `bytes` that was passed to the `allocate` call that returned `p`. * @param stream Stream on which to perform deallocation */ - void deallocate(void* p, std::size_t bytes, cudaStream_t stream = 0) + void deallocate(void* p, std::size_t bytes, cuda_stream_view stream = cuda_stream_view{}) { do_deallocate(p, rmm::detail::align_up(bytes, 8), stream); } @@ -166,7 +165,7 @@ class device_memory_resource { * @returns a std::pair which contains free memory in bytes * in .first and total amount of memory in .second */ - std::pair get_mem_info(cudaStream_t stream) const + std::pair get_mem_info(cuda_stream_view stream) const { return do_get_mem_info(stream); } @@ -184,7 +183,7 @@ class device_memory_resource { * @param stream Stream on which to perform allocation * @return void* Pointer to the newly allocated memory */ - virtual void* do_allocate(std::size_t bytes, cudaStream_t stream) = 0; + virtual void* do_allocate(std::size_t bytes, cuda_stream_view stream) = 0; /** * @brief Deallocate memory pointed to by \p p. @@ -197,7 +196,7 @@ class device_memory_resource { * value of `bytes` that was passed to the `allocate` call that returned `p`. * @param stream Stream on which to perform deallocation */ - virtual void do_deallocate(void* p, std::size_t bytes, cudaStream_t stream) = 0; + virtual void do_deallocate(void* p, std::size_t bytes, cuda_stream_view stream) = 0; /** * @brief Compare this resource to another. @@ -226,7 +225,7 @@ class device_memory_resource { * @param stream the stream being executed on * @return std::pair with available and free memory for resource */ - virtual std::pair do_get_mem_info(cudaStream_t stream) const = 0; + virtual std::pair do_get_mem_info(cuda_stream_view stream) const = 0; }; } // namespace mr } // namespace rmm diff --git a/include/rmm/mr/device/fixed_size_memory_resource.hpp b/include/rmm/mr/device/fixed_size_memory_resource.hpp index f00314493..25e2af1cf 100644 --- a/include/rmm/mr/device/fixed_size_memory_resource.hpp +++ b/include/rmm/mr/device/fixed_size_memory_resource.hpp @@ -15,6 +15,7 @@ */ #pragma once +#include #include #include #include @@ -145,7 +146,7 @@ class fixed_size_memory_resource * @param stream The stream on which the memory is to be used. 
* @return block_type The allocated block */ - block_type expand_pool(size_t size, free_list& blocks, cudaStream_t stream) + block_type expand_pool(size_t size, free_list& blocks, cuda_stream_view stream) { blocks.insert(std::move(blocks_from_upstream(stream))); return blocks.get_block(size); @@ -158,7 +159,7 @@ class fixed_size_memory_resource * @param stream The stream on which the memory is to be used. * @return block_type The allocated block */ - free_list blocks_from_upstream(cudaStream_t stream) + free_list blocks_from_upstream(cuda_stream_view stream) { void* p = upstream_mr_->allocate(upstream_chunk_size_, stream); block_type b{p}; @@ -212,7 +213,7 @@ class fixed_size_memory_resource * @param stream the stream being executed on * @return std::pair with available and free memory for resource */ - std::pair do_get_mem_info(cudaStream_t stream) const override + std::pair do_get_mem_info(cuda_stream_view stream) const override { return std::make_pair(0, 0); } diff --git a/include/rmm/mr/device/limiting_resource_adaptor.hpp b/include/rmm/mr/device/limiting_resource_adaptor.hpp index 5352771a6..8c10b4bad 100644 --- a/include/rmm/mr/device/limiting_resource_adaptor.hpp +++ b/include/rmm/mr/device/limiting_resource_adaptor.hpp @@ -121,7 +121,7 @@ class limiting_resource_adaptor final : public device_memory_resource { * @param stream Stream on which to perform the allocation * @return void* Pointer to the newly allocated memory */ - void* do_allocate(std::size_t bytes, cudaStream_t stream) override + void* do_allocate(std::size_t bytes, cuda_stream_view stream) override { void* p = nullptr; @@ -145,7 +145,7 @@ class limiting_resource_adaptor final : public device_memory_resource { * @param bytes Size of the allocation * @param stream Stream on which to perform the deallocation */ - void do_deallocate(void* p, std::size_t bytes, cudaStream_t stream) override + void do_deallocate(void* p, std::size_t bytes, cuda_stream_view stream) override { std::size_t allocated_size = rmm::detail::align_up(bytes, allocation_alignment_); upstream_->deallocate(p, bytes, stream); @@ -183,7 +183,7 @@ class limiting_resource_adaptor final : public device_memory_resource { * @param stream Stream on which to get the mem info. 
* @return std::pair contaiing free_size and total_size of memory */ - std::pair do_get_mem_info(cudaStream_t stream) const override + std::pair do_get_mem_info(cuda_stream_view stream) const override { return {allocation_limit_ - allocated_bytes_, allocation_limit_}; } diff --git a/include/rmm/mr/device/logging_resource_adaptor.hpp b/include/rmm/mr/device/logging_resource_adaptor.hpp index bc011b816..1cdf8ab8e 100644 --- a/include/rmm/mr/device/logging_resource_adaptor.hpp +++ b/include/rmm/mr/device/logging_resource_adaptor.hpp @@ -17,6 +17,7 @@ #include +#include #include // If using GCC, temporary workaround for older libcudacxx defining _LIBCPP_VERSION @@ -202,18 +203,10 @@ class logging_resource_adaptor final : public device_memory_resource { * @param stream Stream on which to perform the allocation * @return void* Pointer to the newly allocated memory */ - void* do_allocate(std::size_t bytes, cudaStream_t stream) override + void* do_allocate(std::size_t bytes, cuda_stream_view stream) override { auto const p = upstream_->allocate(bytes, stream); - std::string msg{"allocate,"}; - std::stringstream ss; - ss << p; - msg += ss.str(); - msg += ","; - msg += std::to_string(bytes); - msg += ","; - msg += std::to_string(reinterpret_cast(stream)); - logger_->info(msg); + logger_->info("allocate,{},{},{}", p, bytes, fmt::ptr(stream.value())); return p; } @@ -233,17 +226,9 @@ class logging_resource_adaptor final : public device_memory_resource { * @param bytes Size of the allocation * @param stream Stream on which to perform the deallocation */ - void do_deallocate(void* p, std::size_t bytes, cudaStream_t stream) override + void do_deallocate(void* p, std::size_t bytes, cuda_stream_view stream) override { - std::string msg{"free,"}; - std::stringstream ss; - ss << p; - msg += ss.str(); - msg += ","; - msg += std::to_string(bytes); - msg += ","; - msg += std::to_string(reinterpret_cast(stream)); - logger_->info(msg); + logger_->info("free,{},{},{}", p, bytes, fmt::ptr(stream.value())); upstream_->deallocate(p, bytes, stream); } @@ -278,7 +263,7 @@ class logging_resource_adaptor final : public device_memory_resource { * @param stream Stream on which to get the mem info. * @return std::pair contaiing free_size and total_size of memory */ - std::pair do_get_mem_info(cudaStream_t stream) const override + std::pair do_get_mem_info(cuda_stream_view stream) const override { return upstream_->get_mem_info(stream); } diff --git a/include/rmm/mr/device/managed_memory_resource.hpp b/include/rmm/mr/device/managed_memory_resource.hpp index 214b70d98..d0ec75de8 100644 --- a/include/rmm/mr/device/managed_memory_resource.hpp +++ b/include/rmm/mr/device/managed_memory_resource.hpp @@ -17,6 +17,7 @@ #include "device_memory_resource.hpp" +#include #include namespace rmm { @@ -62,7 +63,7 @@ class managed_memory_resource final : public device_memory_resource { * @param bytes The size, in bytes, of the allocation * @return void* Pointer to the newly allocated memory */ - void* do_allocate(std::size_t bytes, cudaStream_t) override + void* do_allocate(std::size_t bytes, cuda_stream_view) override { // FIXME: Unlike cudaMalloc, cudaMallocManaged will throw an error for 0 // size allocations. 
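To illustrate the new fmt-based log format above, a minimal sketch of wrapping a resource in `logging_resource_adaptor` (the file name is illustrative; compare the `rmm_log.txt` entry added to `.gitignore`):

```c++
#include <rmm/cuda_stream_view.hpp>
#include <rmm/mr/device/cuda_memory_resource.hpp>
#include <rmm/mr/device/logging_resource_adaptor.hpp>

int main()
{
  rmm::mr::cuda_memory_resource upstream;
  // Each allocate/free is logged as a CSV row: action,pointer,size,stream
  rmm::mr::logging_resource_adaptor<rmm::mr::cuda_memory_resource> mr{&upstream, "rmm_log.txt"};

  void* p = mr.allocate(256, rmm::cuda_stream_view{});
  mr.deallocate(p, 256, rmm::cuda_stream_view{});
  return 0;
}
```

Logs in this format are what `benchmarks/utilities/log_parser.hpp` (above) consumes for replay benchmarking.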
@@ -82,7 +83,7 @@ class managed_memory_resource final : public device_memory_resource { * * @param p Pointer to be deallocated */ - void do_deallocate(void* p, std::size_t, cudaStream_t) override + void do_deallocate(void* p, std::size_t, cuda_stream_view) override { RMM_ASSERT_CUDA_SUCCESS(cudaFree(p)); } @@ -112,7 +113,7 @@ class managed_memory_resource final : public device_memory_resource { * @param stream to execute on * @return std::pair containing free_size and total_size of memory */ - std::pair do_get_mem_info(cudaStream_t stream) const override + std::pair do_get_mem_info(cuda_stream_view stream) const override { std::size_t free_size{}; std::size_t total_size{}; diff --git a/include/rmm/mr/device/owning_wrapper.hpp b/include/rmm/mr/device/owning_wrapper.hpp index 238346cd6..cee32e0fe 100644 --- a/include/rmm/mr/device/owning_wrapper.hpp +++ b/include/rmm/mr/device/owning_wrapper.hpp @@ -148,7 +148,7 @@ class owning_wrapper : public device_memory_resource { * @param stream Stream on which to perform the allocation * @return void* Pointer to the memory allocated by the wrapped resource */ - void* do_allocate(std::size_t bytes, cudaStream_t stream) override + void* do_allocate(std::size_t bytes, cuda_stream_view stream) override { return wrapped().allocate(bytes, stream); } @@ -164,7 +164,7 @@ class owning_wrapper : public device_memory_resource { * @param bytes Size of the allocation * @param stream Stream on which to deallocate the memory */ - void do_deallocate(void* p, std::size_t bytes, cudaStream_t stream) override + void do_deallocate(void* p, std::size_t bytes, cuda_stream_view stream) override { wrapped().deallocate(p, bytes, stream); } @@ -202,7 +202,7 @@ class owning_wrapper : public device_memory_resource { * @param stream Stream on which to get the mem info. * @return std::pair containing free_size and total_size of memory */ - std::pair do_get_mem_info(cudaStream_t stream) const override + std::pair do_get_mem_info(cuda_stream_view stream) const override { return wrapped().get_mem_info(stream); } diff --git a/include/rmm/mr/device/pool_memory_resource.hpp b/include/rmm/mr/device/pool_memory_resource.hpp index 4a0bc7874..4aa252528 100644 --- a/include/rmm/mr/device/pool_memory_resource.hpp +++ b/include/rmm/mr/device/pool_memory_resource.hpp @@ -15,6 +15,7 @@ */ #pragma once +#include #include #include #include @@ -85,8 +86,8 @@ class pool_memory_resource final * @param upstream_mr The memory_resource from which to allocate blocks for the pool. * @param initial_pool_size Minimum size, in bytes, of the initial pool. Defaults to half of the * available memory on the current device. - * @param maximum_pool_size Maximum size, in bytes, that the pool can grow to. Defaults to all of - * the available memory on the current device. + * @param maximum_pool_size Maximum size, in bytes, that the pool can grow to. Defaults to all + * of the available memory on the current device. */ explicit pool_memory_resource(Upstream* upstream_mr, thrust::optional initial_pool_size = thrust::nullopt, @@ -156,7 +157,8 @@ class pool_memory_resource final size_t get_maximum_allocation_size() const { return std::numeric_limits::max(); } /** - * @brief Try to expand the pool by allocating a block of at least `min_size` bytes from upstream + * @brief Try to expand the pool by allocating a block of at least `min_size` bytes from + * upstream * * Attempts to allocate `try_size` bytes from upstream.
If it fails, it iteratively reduces the * attempted size by half until `min_size`, returning the allocated block once it succeeds. @@ -169,7 +171,7 @@ class pool_memory_resource final * @param stream The stream on which the memory is to be used. * @return block_type a block of at least `min_size` bytes */ - block_type try_to_expand(std::size_t try_size, std::size_t min_size, cudaStream_t stream) + block_type try_to_expand(std::size_t try_size, std::size_t min_size, cuda_stream_view stream) { while (try_size >= min_size) { auto b = block_from_upstream(try_size, stream); @@ -181,7 +183,7 @@ class pool_memory_resource final try_size = std::max(min_size, try_size / 2); } RMM_LOG_ERROR("[A][Stream {}][Upstream {}B][FAILURE maximum pool size exceeded]", - reinterpret_cast(stream), + fmt::ptr(stream.value()), min_size); RMM_FAIL("Maximum pool size exceeded", rmm::bad_alloc); } @@ -234,7 +236,7 @@ class pool_memory_resource final * @param stream The stream on which the memory is to be used. * @return block_type a block of at least `size` bytes */ - block_type expand_pool(std::size_t size, free_list& blocks, cudaStream_t stream) + block_type expand_pool(std::size_t size, free_list& blocks, cuda_stream_view stream) { // Strategy: If maximum_pool_size_ is set, then grow geometrically, e.g. by halfway to the // limit each time. If it is not set, grow exponentially, e.g. by doubling the pool size each @@ -273,9 +275,9 @@ class pool_memory_resource final * @param stream The stream on which the memory is to be used. * @return block_type The allocated block */ - thrust::optional block_from_upstream(size_t size, cudaStream_t stream) + thrust::optional block_from_upstream(size_t size, cuda_stream_view stream) { - RMM_LOG_DEBUG("[A][Stream {}][Upstream {}B]", reinterpret_cast(stream), size); + RMM_LOG_DEBUG("[A][Stream {}][Upstream {}B]", fmt::ptr(stream.value()), size); if (size == 0) return {}; @@ -415,7 +417,7 @@ class pool_memory_resource final * @param stream to execute on * @return std::pair containing free_size and total_size of memory */ - std::pair do_get_mem_info(cudaStream_t stream) const override + std::pair do_get_mem_info(cuda_stream_view stream) const override { std::size_t free_size{}; std::size_t total_size{}; diff --git a/include/rmm/mr/device/thread_safe_resource_adaptor.hpp b/include/rmm/mr/device/thread_safe_resource_adaptor.hpp index 212d0ddb2..b0b2f1273 100644 --- a/include/rmm/mr/device/thread_safe_resource_adaptor.hpp +++ b/include/rmm/mr/device/thread_safe_resource_adaptor.hpp @@ -15,6 +15,7 @@ */ #pragma once +#include #include #include @@ -89,7 +90,7 @@ class thread_safe_resource_adaptor final : public device_memory_resource { * @param stream Stream on which to perform the allocation * @return void* Pointer to the newly allocated memory */ - void* do_allocate(std::size_t bytes, cudaStream_t stream) override + void* do_allocate(std::size_t bytes, cuda_stream_view stream) override { lock_t lock(mtx); return upstream_->allocate(bytes, stream); @@ -105,7 +106,7 @@ class thread_safe_resource_adaptor final : public device_memory_resource { * @param bytes Size of the allocation * @param stream Stream on which to perform the deallocation */ - void do_deallocate(void* p, std::size_t bytes, cudaStream_t stream) override + void do_deallocate(void* p, std::size_t bytes, cuda_stream_view stream) override { lock_t lock(mtx); upstream_->deallocate(p, bytes, stream); @@ -141,7 +142,7 @@ class thread_safe_resource_adaptor final : public device_memory_resource { * @param stream Stream on which to
get the mem info. * @return std::pair containing free_size and total_size of memory */ - std::pair do_get_mem_info(cudaStream_t stream) const override + std::pair do_get_mem_info(cuda_stream_view stream) const override { lock_t lock(mtx); return upstream_->get_mem_info(stream); diff --git a/include/rmm/mr/device/thrust_allocator_adaptor.hpp b/include/rmm/mr/device/thrust_allocator_adaptor.hpp index 1b20eed5a..77b7bd222 100644 --- a/include/rmm/mr/device/thrust_allocator_adaptor.hpp +++ b/include/rmm/mr/device/thrust_allocator_adaptor.hpp @@ -64,7 +64,7 @@ class thrust_allocator : public thrust::device_malloc_allocator { * * @param stream The stream to be used for device memory (de)allocation */ - explicit thrust_allocator(cudaStream_t stream) : _stream{stream} {} + explicit thrust_allocator(cuda_stream_view stream) : _stream{stream} {} /** * @brief Constructs a `thrust_allocator` using a device memory resource and stream * * @param mr The resource to be used for device memory allocation * @param stream The stream to be used for device memory (de)allocation */ - thrust_allocator(device_memory_resource* mr, cudaStream_t stream) : _mr(mr), _stream{stream} {} + thrust_allocator(device_memory_resource* mr, cuda_stream_view stream) : _mr(mr), _stream{stream} + { + } /** * @brief Copy constructor. Copies the resource pointer and stream. @@ -117,11 +119,11 @@ class thrust_allocator : public thrust::device_malloc_allocator { /** * @brief Returns the stream used by this allocator. */ - cudaStream_t stream() const noexcept { return _stream; } + cuda_stream_view stream() const noexcept { return _stream; } private: device_memory_resource* _mr{rmm::mr::get_current_device_resource()}; - cudaStream_t _stream{0}; + cuda_stream_view _stream{}; }; } // namespace mr } // namespace rmm diff --git a/include/rmm/thrust_rmm_allocator.h b/include/rmm/thrust_rmm_allocator.h index 1ffd2a438..70b867d00 100644 --- a/include/rmm/thrust_rmm_allocator.h +++ b/include/rmm/thrust_rmm_allocator.h @@ -24,6 +24,7 @@ #ifndef THRUST_RMM_ALLOCATOR_H #define THRUST_RMM_ALLOCATOR_H +#include #include #include @@ -37,7 +38,7 @@ namespace rmm { template using device_vector = thrust::device_vector>; -using par_t = decltype(thrust::cuda::par(*(new rmm::mr::thrust_allocator(0)))); +using par_t = decltype(thrust::cuda::par(*(new rmm::mr::thrust_allocator()))); using deleter_t = std::function; using exec_policy_t = std::unique_ptr; @@ -52,7 +53,7 @@ using exec_policy_t = std::unique_ptr; * allocation. */ /* --------------------------------------------------------------------------*/ -inline exec_policy_t exec_policy(cudaStream_t stream = 0) +inline exec_policy_t exec_policy(cuda_stream_view const &stream = cuda_stream_view{}) { auto *alloc = new rmm::mr::thrust_allocator(stream); auto deleter = [alloc](par_t *pointer) { diff --git a/python/rmm/_lib/cuda_stream_view.pxd b/python/rmm/_lib/cuda_stream_view.pxd new file mode 100644 index 000000000..725c1a05b --- /dev/null +++ b/python/rmm/_lib/cuda_stream_view.pxd @@ -0,0 +1,34 @@ +# Copyright (c) 2020, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from libcpp cimport bool +from libcpp.memory cimport unique_ptr + +from rmm._lib.lib cimport cudaStream_t + +cdef extern from "rmm/cuda_stream_view.hpp" namespace "rmm" nogil: + cdef cppclass cuda_stream_view: + cuda_stream_view() + cuda_stream_view(cudaStream_t) + bool is_default() + bool is_per_thread_default() + void synchronize() except + + + cdef bool operator==(cuda_stream_view const, cuda_stream_view const) + +cdef class CudaStreamView: + cdef unique_ptr[cuda_stream_view] c_obj + + cpdef bool is_default(self) except * + cpdef bool is_per_thread_default(self) except * diff --git a/python/rmm/_lib/cuda_stream_view.pyx b/python/rmm/_lib/cuda_stream_view.pyx new file mode 100644 index 000000000..817d091f2 --- /dev/null +++ b/python/rmm/_lib/cuda_stream_view.pyx @@ -0,0 +1,41 @@ +# Copyright (c) 2020, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
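The declarations above bind only a small slice of `rmm::cuda_stream_view`. As a minimal C++ sketch of the semantics those bindings rely on, assuming the API introduced in this PR (non-owning wrapping, `is_default()`, the bound `operator==`, and `synchronize()`):

```c++
#include <rmm/cuda_stream_view.hpp>

#include <cuda_runtime_api.h>

#include <cassert>

int main()
{
  rmm::cuda_stream_view default_view{};  // default construction views the CUDA default stream
  assert(default_view.is_default());

  cudaStream_t raw{};
  cudaStreamCreate(&raw);            // the raw stream is owned by this scope, not by the view
  rmm::cuda_stream_view view{raw};   // non-owning wrapper around the handle
  assert(!(view == default_view));   // operator== compares the wrapped handles

  view.synchronize();                // forwards to cudaStreamSynchronize
  cudaStreamDestroy(raw);            // the view never destroys the stream
  return 0;
}
```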
+ +from libc.stdint cimport uintptr_t + + +cdef class CudaStreamView: + + def __cinit__(self, uintptr_t stream=0): + """Construct a view of the specified CUDA stream + + Parameters + ---------- + stream : uintptr_t, optional + CUDA stream to wrap, default 0 + """ + if (stream == 0): + self.c_obj.reset(new cuda_stream_view()) + else: + self.c_obj.reset(new cuda_stream_view(stream)) + + cpdef bool is_default(self) except *: + """Returns True if this is the CUDA default stream + """ + return self.c_obj.get()[0].is_default() + + cpdef bool is_per_thread_default(self) except *: + """Returns True if this is a CUDA per-thread default stream + """ + return self.c_obj.get()[0].is_per_thread_default() diff --git a/python/rmm/_lib/device_buffer.pxd b/python/rmm/_lib/device_buffer.pxd index ebf99ecfb..7d2234ac7 100644 --- a/python/rmm/_lib/device_buffer.pxd +++ b/python/rmm/_lib/device_buffer.pxd @@ -15,16 +15,16 @@ from libcpp.memory cimport unique_ptr from libc.stdint cimport uintptr_t -from rmm._lib.lib cimport cudaStream_t, cudaMemcpyAsync, cudaMemcpyDeviceToHost +from rmm._lib.cuda_stream_view cimport cuda_stream_view, CudaStreamView cdef extern from "rmm/device_buffer.hpp" namespace "rmm" nogil: cdef cppclass device_buffer: device_buffer() device_buffer(size_t size) except + - device_buffer(size_t size, cudaStream_t stream) except + + device_buffer(size_t size, cuda_stream_view stream) except + device_buffer(const void* source_data, size_t size) except + device_buffer(const void* source_data, - size_t size, cudaStream_t stream) except + + size_t size, cuda_stream_view stream) except + device_buffer(const device_buffer& other) except + void resize(size_t new_size) except + void shrink_to_fit() except + @@ -41,11 +41,11 @@ cdef class DeviceBuffer: @staticmethod cdef DeviceBuffer c_to_device(const unsigned char[::1] b, - uintptr_t stream=*) - cpdef copy_to_host(self, ary=*, uintptr_t stream=*) - cpdef copy_from_host(self, ary, uintptr_t stream=*) - cpdef copy_from_device(self, cuda_ary, uintptr_t stream=*) - cpdef bytes tobytes(self, uintptr_t stream=*) + CudaStreamView stream=*) + cpdef copy_to_host(self, ary=*, CudaStreamView stream=*) + cpdef copy_from_host(self, ary, CudaStreamView stream=*) + cpdef copy_from_device(self, cuda_ary, CudaStreamView stream=*) + cpdef bytes tobytes(self, CudaStreamView stream=*) cdef size_t c_size(self) except * cpdef void resize(self, size_t new_size) except * @@ -54,16 +54,17 @@ cdef class DeviceBuffer: cdef device_buffer c_release(self) except * -cpdef DeviceBuffer to_device(const unsigned char[::1] b, uintptr_t stream=*) +cpdef DeviceBuffer to_device(const unsigned char[::1] b, + CudaStreamView stream=*) cpdef void copy_ptr_to_host(uintptr_t db, unsigned char[::1] hb, - uintptr_t stream=*) nogil except * + CudaStreamView stream=*) except * cpdef void copy_host_to_ptr(const unsigned char[::1] hb, uintptr_t db, - uintptr_t stream=*) nogil except * + CudaStreamView stream=*) except * cpdef void copy_device_to_ptr(uintptr_t d_src, uintptr_t d_dst, size_t count, - uintptr_t stream=*) nogil except * + CudaStreamView stream=*) except * diff --git a/python/rmm/_lib/device_buffer.pyx b/python/rmm/_lib/device_buffer.pyx index 8f15eb6f9..0108f0f77 100644 --- a/python/rmm/_lib/device_buffer.pyx +++ b/python/rmm/_lib/device_buffer.pyx @@ -15,15 +15,18 @@ import numpy as np cimport cython from cpython.bytes cimport PyBytes_AS_STRING, PyBytes_FromStringAndSize +from cython.operator cimport dereference from libc.stdint cimport uintptr_t from libcpp.memory cimport unique_ptr 
from libcpp.utility cimport move from rmm._cuda.gpu cimport cudaError, cudaError_t from rmm._lib.lib cimport ( + cudaMemcpyAsync, cudaMemcpyDeviceToDevice, cudaMemcpyDeviceToHost, cudaMemcpyHostToDevice, + cudaMemcpyKind, cudaStream_t, cudaStreamSynchronize, ) @@ -34,7 +37,7 @@ cdef class DeviceBuffer: def __cinit__(self, *, uintptr_t ptr=0, size_t size=0, - uintptr_t stream=0): + CudaStreamView stream=CudaStreamView()): """Construct a ``DeviceBuffer`` with optional size and data pointer Parameters @@ -42,7 +45,8 @@ cdef class DeviceBuffer: ptr : pointer to some data on host or device to copy over size : size of the buffer to allocate (and possibly size of data to copy) - stream : CUDA stream to use for construction and/or copying, default 0 + stream : CUDA stream to use for construction and/or copying, + default the default stream Note ---- @@ -57,12 +61,12 @@ cdef class DeviceBuffer: >>> db = rmm.DeviceBuffer(size=5) """ cdef const void* c_ptr - cdef cudaStream_t c_stream + cdef cuda_stream_view c_stream cdef cudaError_t err with nogil: c_ptr = ptr - c_stream = stream + c_stream = dereference(stream.c_obj.get()) if size == 0: self.c_obj.reset(new device_buffer()) @@ -71,13 +75,8 @@ cdef class DeviceBuffer: else: self.c_obj.reset(new device_buffer(c_ptr, size, c_stream)) - if c_stream == NULL: - err = cudaStreamSynchronize(c_stream) - if err != cudaError.cudaSuccess: - with gil: - raise RuntimeError( - f"Stream sync failed with error: {err}" - ) + if c_stream.is_default(): + c_stream.synchronize() def __len__(self): return self.size @@ -122,22 +121,23 @@ cdef class DeviceBuffer: @staticmethod cdef DeviceBuffer c_to_device(const unsigned char[::1] b, - uintptr_t stream=0): + CudaStreamView stream=CudaStreamView()): """Calls ``to_device`` function on arguments provided""" return to_device(b, stream) @staticmethod - def to_device(const unsigned char[::1] b, uintptr_t stream=0): + def to_device(const unsigned char[::1] b, + CudaStreamView stream=CudaStreamView()): """Calls ``to_device`` function on arguments provided""" return to_device(b, stream) - cpdef copy_to_host(self, ary=None, uintptr_t stream=0): + cpdef copy_to_host(self, ary=None, CudaStreamView stream=CudaStreamView()): """Copy from a ``DeviceBuffer`` to a buffer on host Parameters ---------- hb : ``bytes``-like buffer to write into - stream : CUDA stream to use for copying, default 0 + stream : CUDA stream to use for copying, default the default stream Examples -------- @@ -164,18 +164,17 @@ cdef class DeviceBuffer: "Argument `hb` is too small. Need space for %i bytes." % s ) - with nogil: - copy_ptr_to_host(dbp.data(), hb[:s], stream) + copy_ptr_to_host(dbp.data(), hb[:s], stream) return ary - cpdef copy_from_host(self, ary, uintptr_t stream=0): + cpdef copy_from_host(self, ary, CudaStreamView stream=CudaStreamView()): """Copy from a buffer on host to ``self`` Parameters ---------- hb : ``bytes``-like buffer to copy from - stream : CUDA stream to use for copying, default 0 + stream : CUDA stream to use for copying, default the default stream Examples -------- @@ -196,16 +195,16 @@ cdef class DeviceBuffer: "Argument `hb` is too large. Need space for %i bytes." 
% s ) - with nogil: - copy_host_to_ptr(hb[:s], dbp.data(), stream) + copy_host_to_ptr(hb[:s], dbp.data(), stream) - cpdef copy_from_device(self, cuda_ary, uintptr_t stream=0): + cpdef copy_from_device(self, cuda_ary, + CudaStreamView stream=CudaStreamView()): """Copy from a buffer on device to ``self`` Parameters ---------- cuda_ary : object to copy from that has ``__cuda_array_interface__`` - stream : CUDA stream to use for copying, default 0 + stream : CUDA stream to use for copying, default the default stream Examples -------- @@ -250,15 +249,14 @@ cdef class DeviceBuffer: cdef device_buffer* dbp = self.c_obj.get() - with nogil: - copy_device_to_ptr( - src_ptr, - dbp.data(), - s, - stream - ) + copy_device_to_ptr( + src_ptr, + dbp.data(), + s, + stream + ) - cpdef bytes tobytes(self, uintptr_t stream=0): + cpdef bytes tobytes(self, CudaStreamView stream=CudaStreamView()): cdef const device_buffer* dbp = self.c_obj.get() cdef size_t s = dbp.size() @@ -289,13 +287,14 @@ cdef class DeviceBuffer: @cython.boundscheck(False) -cpdef DeviceBuffer to_device(const unsigned char[::1] b, uintptr_t stream=0): +cpdef DeviceBuffer to_device(const unsigned char[::1] b, + CudaStreamView stream=CudaStreamView()): """Return a new ``DeviceBuffer`` with a copy of the data Parameters ---------- b : ``bytes``-like data on host to copy to device - stream : CUDA stream to use for copying, default 0 + stream : CUDA stream to use for copying, default the default stream Returns ------- @@ -320,17 +319,53 @@ cpdef DeviceBuffer to_device(const unsigned char[::1] b, uintptr_t stream=0): return DeviceBuffer(ptr=p, size=s, stream=stream) +@cython.boundscheck(False) +cdef void _copy_async(const void* src, + void* dst, + size_t count, + cudaMemcpyKind kind, + cuda_stream_view stream) nogil except *: + """ + Asynchronously copy data between host and/or device pointers + + This is a convenience wrapper around cudaMemcpyAsync that + checks for errors. Only used for internal implementation. + + Parameters + ---------- + src : pointer to host ``bytes``-like buffer or device data to copy from + dst : pointer to host ``bytes``-like buffer or device data to copy into + count : the size in bytes to copy + kind : the direction of the copy, as a ``cudaMemcpyKind`` + stream : CUDA stream to use for copying + + Note + ---- + + If ``stream`` is the default stream, it is synchronized after the copy. However if a non-default ``stream`` is provided, this function is fully asynchronous. + """ + cdef cudaError_t err = cudaMemcpyAsync(dst, src, count, kind, + stream) + + if err != cudaError.cudaSuccess: + with gil: + raise RuntimeError(f"Memcpy failed with error: {err}") + + if stream.is_default(): + stream.synchronize() + + @cython.boundscheck(False) cpdef void copy_ptr_to_host(uintptr_t db, unsigned char[::1] hb, - uintptr_t stream=0) nogil except *: + CudaStreamView stream=CudaStreamView()) except *: """Copy from a device pointer to a buffer on host Parameters ---------- db : pointer to data on device to copy hb : ``bytes``-like buffer to write into - stream : CUDA stream to use for copying, default 0 + stream : CUDA stream to use for copying, default the default stream Note ---- @@ -350,38 +385,30 @@ cpdef void copy_ptr_to_host(uintptr_t db, """ if hb is None: - with gil: - raise TypeError( - "Argument `hb` has incorrect type" - " (expected bytes-like, got NoneType)" - ) + raise TypeError( + "Argument `hb` has incorrect type" + " (expected bytes-like, got NoneType)" + ) - cdef cudaError_t err + cdef cuda_stream_view c_stream - err = cudaMemcpyAsync(&hb[0], db, len(hb), - cudaMemcpyDeviceToHost, stream) - if err != cudaError.cudaSuccess: - with gil: - raise RuntimeError(f"Memcpy failed with error: {err}") - - if stream == 0: - err = cudaStreamSynchronize(stream) - if err != cudaError.cudaSuccess: - with gil: - raise RuntimeError(f"Stream sync failed with error: {err}") + with nogil: + c_stream = dereference(stream.c_obj.get()) + _copy_async(db, &hb[0], len(hb), + cudaMemcpyDeviceToHost, c_stream) @cython.boundscheck(False) cpdef void copy_host_to_ptr(const unsigned char[::1] hb, uintptr_t db, - uintptr_t stream=0) nogil except *: + CudaStreamView stream=CudaStreamView()) except *: """Copy from a host pointer to a device pointer Parameters ---------- hb : ``bytes``-like host buffer to copy db : pointer to data on device to write into - stream : CUDA stream to use for copying, default 0 + stream : CUDA stream to use for copying, default the default stream Note ---- @@ -402,39 +429,31 @@ cpdef void copy_host_to_ptr(const unsigned char[::1] hb, """ if hb is None: - with gil: - raise TypeError( - "Argument `hb` has incorrect type" - " (expected bytes-like, got NoneType)" - ) - - cdef cudaError_t err + raise TypeError( + "Argument `hb` has incorrect type" + " (expected bytes-like, got NoneType)" + ) - err = cudaMemcpyAsync(db, &hb[0], len(hb), - cudaMemcpyHostToDevice, stream) - if err != cudaError.cudaSuccess: - with gil: - raise RuntimeError(f"Memcpy failed with error: {err}") + cdef cuda_stream_view c_stream - if stream == 0: - err = cudaStreamSynchronize(stream) - if err != cudaError.cudaSuccess: - with gil: - raise RuntimeError(f"Stream sync failed with error: {err}") + with nogil: + c_stream = dereference(stream.c_obj.get()) + _copy_async(&hb[0], db, len(hb), + cudaMemcpyHostToDevice, c_stream) @cython.boundscheck(False) cpdef void copy_device_to_ptr(uintptr_t d_src, uintptr_t d_dst, size_t count, - uintptr_t stream=0) nogil except *: + CudaStreamView stream=CudaStreamView()) except *: """Copy from a device pointer to a device pointer Parameters ---------- d_src : pointer to data on device to copy from d_dst : pointer to data on device to write into - stream : CUDA stream to use for copying, default 0 + stream : CUDA stream to use for copying, default the default stream Note ---- @@ -454,16 +473,10 @@ cpdef void copy_device_to_ptr(uintptr_t d_src, >>> print(hb) array([10, 11, 12, 0, 0], dtype=uint8) """ - cdef cudaError_t err - err = cudaMemcpyAsync(d_dst, d_src, count, -
cudaMemcpyDeviceToDevice, stream) - if err != cudaError.cudaSuccess: - with gil: - raise RuntimeError(f"Memcpy failed with error: {err}") - - if stream == 0: - err = cudaStreamSynchronize(stream) - if err != cudaError.cudaSuccess: - with gil: - raise RuntimeError(f"Stream sync failed with error: {err}") + cdef cuda_stream_view c_stream + + with nogil: + c_stream = dereference(stream.c_obj.get()) + _copy_async(d_src, d_dst, count, + cudaMemcpyDeviceToDevice, c_stream) diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 25ef4f768..e787aa017 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -96,9 +96,17 @@ target_compile_definitions(LIMITING_TEST PUBLIC CUDA_API_PER_THREAD_DEFAULT_STRE set(HOST_MR_TEST_SRC "${CMAKE_CURRENT_SOURCE_DIR}/mr/host/mr_tests.cpp") - + ConfigureTest(HOST_MR_TEST "${HOST_MR_TEST_SRC}") +################################################################################################### +# - cuda stream tests + +set(CUDA_STREAM_TEST_SRC + "${CMAKE_CURRENT_SOURCE_DIR}/cuda_stream_tests.cpp") + +ConfigureTest(CUDA_STREAM_TEST "${CUDA_STREAM_TEST_SRC}") + ################################################################################################### # - device buffer tests diff --git a/tests/cuda_stream_tests.cpp b/tests/cuda_stream_tests.cpp new file mode 100644 index 000000000..4cbdc6511 --- /dev/null +++ b/tests/cuda_stream_tests.cpp @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2020, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "gtest/gtest.h" + +#include +#include +#include + +#include + +struct CudaStreamTest : public ::testing::Test { +}; + +TEST_F(CudaStreamTest, Equality) +{ + rmm::cuda_stream stream_a; + auto const view_a = stream_a.view(); + auto const view_default = rmm::cuda_stream_view{}; + + EXPECT_EQ(stream_a, view_a); + EXPECT_NE(stream_a, view_default); + EXPECT_EQ(view_default, rmm::cuda_stream_view{}); + EXPECT_EQ(view_default, rmm::cuda_stream_default); + EXPECT_NE(view_a, rmm::cuda_stream()); + EXPECT_NE(stream_a, rmm::cuda_stream()); + + rmm::device_buffer buff(0); + EXPECT_EQ(buff.stream(), view_default); +} + +TEST_F(CudaStreamTest, MoveConstructor) +{ + rmm::cuda_stream stream_a; + auto const view_a = stream_a.view(); + rmm::cuda_stream stream_b = std::move(stream_a); + EXPECT_FALSE(stream_a.is_valid()); // Any other operations on stream_a are UB, may segfault + EXPECT_EQ(stream_b, view_a); +} diff --git a/tests/device_buffer_tests.cu b/tests/device_buffer_tests.cu index 920e70acb..95ea23a93 100644 --- a/tests/device_buffer_tests.cu +++ b/tests/device_buffer_tests.cu @@ -16,6 +16,7 @@ #include +#include #include #include #include @@ -30,11 +31,9 @@ #include #include -void sync_stream(cudaStream_t stream) { EXPECT_EQ(cudaSuccess, cudaStreamSynchronize(stream)); } - template struct DeviceBufferTest : public ::testing::Test { - cudaStream_t stream{}; + rmm::cuda_stream stream{}; std::size_t size{}; MemoryResourceType mr{}; @@ -44,10 +43,6 @@ struct DeviceBufferTest : public ::testing::Test { std::uniform_int_distribution distribution(1000, 100000); size = distribution(generator); } - - void SetUp() override { EXPECT_EQ(cudaSuccess, cudaStreamCreate(&stream)); } - - void TearDown() override { EXPECT_EQ(cudaSuccess, cudaStreamDestroy(stream)); }; }; using resources = ::testing::Types; @@ -61,13 +56,13 @@ TYPED_TEST(DeviceBufferTest, DefaultMemoryResource) EXPECT_EQ(this->size, buff.size()); EXPECT_EQ(this->size, buff.capacity()); EXPECT_EQ(rmm::mr::get_current_device_resource(), buff.memory_resource()); - EXPECT_EQ(0, buff.stream()); + EXPECT_EQ(rmm::cuda_stream_view{}, buff.stream()); } TYPED_TEST(DeviceBufferTest, DefaultMemoryResourceStream) { rmm::device_buffer buff(this->size, this->stream); - sync_stream(this->stream); + this->stream.synchronize(); EXPECT_NE(nullptr, buff.data()); EXPECT_EQ(this->size, buff.size()); EXPECT_EQ(this->size, buff.capacity()); @@ -77,19 +72,19 @@ TYPED_TEST(DeviceBufferTest, DefaultMemoryResourceStream) TYPED_TEST(DeviceBufferTest, ExplicitMemoryResource) { - rmm::device_buffer buff(this->size, 0, &this->mr); + rmm::device_buffer buff(this->size, rmm::cuda_stream_view{}, &this->mr); EXPECT_NE(nullptr, buff.data()); EXPECT_EQ(this->size, buff.size()); EXPECT_EQ(this->size, buff.capacity()); EXPECT_EQ(&this->mr, buff.memory_resource()); EXPECT_TRUE(this->mr.is_equal(*buff.memory_resource())); - EXPECT_EQ(0, buff.stream()); + EXPECT_EQ(rmm::cuda_stream_view{}, buff.stream()); } TYPED_TEST(DeviceBufferTest, ExplicitMemoryResourceStream) { rmm::device_buffer buff(this->size, this->stream, &this->mr); - sync_stream(this->stream); + this->stream.synchronize(); EXPECT_NE(nullptr, buff.data()); EXPECT_EQ(this->size, buff.size()); EXPECT_EQ(this->size, buff.capacity()); @@ -107,7 +102,7 @@ TYPED_TEST(DeviceBufferTest, CopyFromRawDevicePointer) EXPECT_EQ(this->size, buff.size()); EXPECT_EQ(this->size, buff.capacity()); EXPECT_EQ(rmm::mr::get_current_device_resource(), buff.memory_resource()); - EXPECT_EQ(0, buff.stream()); + 
EXPECT_EQ(rmm::cuda_stream_view{}, buff.stream()); // TODO check for equality between the contents of the two allocations EXPECT_EQ(cudaSuccess, cudaFree(device_memory)); } @@ -120,7 +115,7 @@ TYPED_TEST(DeviceBufferTest, CopyFromRawHostPointer) EXPECT_EQ(this->size, buff.size()); EXPECT_EQ(this->size, buff.capacity()); EXPECT_EQ(rmm::mr::get_current_device_resource(), buff.memory_resource()); - EXPECT_EQ(0, buff.stream()); + EXPECT_EQ(rmm::cuda_stream_view{}, buff.stream()); // TODO check for equality between the contents of the two allocations } @@ -132,7 +127,7 @@ TYPED_TEST(DeviceBufferTest, CopyFromNullptr) EXPECT_EQ(0, buff.size()); EXPECT_EQ(0, buff.capacity()); EXPECT_EQ(rmm::mr::get_current_device_resource(), buff.memory_resource()); - EXPECT_EQ(0, buff.stream()); + EXPECT_EQ(rmm::cuda_stream_view{}, buff.stream()); } TYPED_TEST(DeviceBufferTest, CopyFromNullptrNonZero) @@ -143,7 +138,7 @@ TYPED_TEST(DeviceBufferTest, CopyFromNullptrNonZero) TYPED_TEST(DeviceBufferTest, CopyConstructor) { - rmm::device_buffer buff(this->size, 0, &this->mr); + rmm::device_buffer buff(this->size, rmm::cuda_stream_view{}, &this->mr); // Initialize buffer thrust::sequence(thrust::device, @@ -158,7 +153,7 @@ TYPED_TEST(DeviceBufferTest, CopyConstructor) EXPECT_EQ(buff.capacity(), buff_copy.capacity()); EXPECT_EQ(buff_copy.memory_resource(), rmm::mr::get_current_device_resource()); EXPECT_TRUE(buff_copy.memory_resource()->is_equal(*rmm::mr::get_current_device_resource())); - EXPECT_EQ(buff_copy.stream(), cudaStream_t{0}); + EXPECT_EQ(buff_copy.stream(), rmm::cuda_stream_view{}); EXPECT_TRUE(thrust::equal(thrust::device, static_cast(buff.data()), @@ -179,7 +174,7 @@ TYPED_TEST(DeviceBufferTest, CopyConstructor) TYPED_TEST(DeviceBufferTest, CopyCapacityLargerThanSize) { - rmm::device_buffer buff(this->size, 0, &this->mr); + rmm::device_buffer buff(this->size, rmm::cuda_stream_view{}, &this->mr); // Resizing smaller to make `size()` < `capacity()` auto new_size = this->size - 1; @@ -198,7 +193,7 @@ TYPED_TEST(DeviceBufferTest, CopyCapacityLargerThanSize) EXPECT_EQ(new_size, buff_copy.capacity()); EXPECT_EQ(buff_copy.memory_resource(), rmm::mr::get_current_device_resource()); EXPECT_TRUE(buff_copy.memory_resource()->is_equal(*rmm::mr::get_current_device_resource())); - EXPECT_EQ(buff_copy.stream(), cudaStream_t{0}); + EXPECT_EQ(buff_copy.stream(), rmm::cuda_stream_view{}); // EXPECT_TRUE( // thrust::equal(thrust::device, static_cast(buff.data()), @@ -208,7 +203,7 @@ TYPED_TEST(DeviceBufferTest, CopyCapacityLargerThanSize) TYPED_TEST(DeviceBufferTest, CopyConstructorExplicitMr) { - rmm::device_buffer buff(this->size, 0, &this->mr); + rmm::device_buffer buff(this->size, rmm::cuda_stream_view{}, &this->mr); // Can't do this until RMM cmake is setup to build cuda files // thrust::sequence(thrust::device, static_cast(buff.data()), // static_cast(buffer.data()) + buff.size(), @@ -230,7 +225,7 @@ TYPED_TEST(DeviceBufferTest, CopyConstructorExplicitMr) TYPED_TEST(DeviceBufferTest, CopyCapacityLargerThanSizeExplicitMr) { - rmm::device_buffer buff(this->size, 0, &this->mr); + rmm::device_buffer buff(this->size, rmm::cuda_stream_view{}, &this->mr); // Resizing smaller to make `size()` < `capacity()` auto new_size = this->size - 1; @@ -260,7 +255,7 @@ TYPED_TEST(DeviceBufferTest, CopyCapacityLargerThanSizeExplicitMr) TYPED_TEST(DeviceBufferTest, CopyAssignmentToDefault) { - rmm::device_buffer const from(this->size, 0, &this->mr); + rmm::device_buffer const from(this->size, rmm::cuda_stream_view{}, &this->mr); 
rmm::device_buffer to{}; EXPECT_NO_THROW(to = from); EXPECT_NE(nullptr, to.data()); @@ -275,8 +270,8 @@ TYPED_TEST(DeviceBufferTest, CopyAssignmentToDefault) TYPED_TEST(DeviceBufferTest, CopyAssignment) { - rmm::device_buffer from(this->size, 0, &this->mr); - rmm::device_buffer to(this->size - 1, 0, &this->mr); + rmm::device_buffer from(this->size, rmm::cuda_stream_view{}, &this->mr); + rmm::device_buffer to(this->size - 1, rmm::cuda_stream_view{}, &this->mr); EXPECT_NO_THROW(to = from); EXPECT_NE(nullptr, to.data()); EXPECT_NE(nullptr, from.data()); @@ -290,9 +285,9 @@ TYPED_TEST(DeviceBufferTest, CopyAssignment) TYPED_TEST(DeviceBufferTest, CopyAssignmentCapacityLargerThanSize) { - rmm::device_buffer from(this->size, 0, &this->mr); + rmm::device_buffer from(this->size, rmm::cuda_stream_view{}, &this->mr); from.resize(from.size() - 1); - rmm::device_buffer to(42, 0, &this->mr); + rmm::device_buffer to(42, rmm::cuda_stream_view{}, &this->mr); EXPECT_NO_THROW(to = from); EXPECT_NE(nullptr, to.data()); EXPECT_NE(nullptr, from.data()); @@ -307,7 +302,7 @@ TYPED_TEST(DeviceBufferTest, CopyAssignmentCapacityLargerThanSize) TYPED_TEST(DeviceBufferTest, SelfCopyAssignment) { - rmm::device_buffer buff(this->size, 0, &this->mr); + rmm::device_buffer buff(this->size, rmm::cuda_stream_view{}, &this->mr); auto p = buff.data(); auto size = buff.size(); auto capacity = buff.capacity(); @@ -325,7 +320,7 @@ TYPED_TEST(DeviceBufferTest, SelfCopyAssignment) TYPED_TEST(DeviceBufferTest, MoveConstructor) { - rmm::device_buffer buff(this->size, 0, &this->mr); + rmm::device_buffer buff(this->size, rmm::cuda_stream_view{}, &this->mr); auto p = buff.data(); auto size = buff.size(); auto capacity = buff.capacity(); @@ -345,14 +340,14 @@ TYPED_TEST(DeviceBufferTest, MoveConstructor) EXPECT_EQ(nullptr, buff.data()); EXPECT_EQ(0, buff.size()); EXPECT_EQ(0, buff.capacity()); - EXPECT_EQ(0, buff.stream()); + EXPECT_EQ(rmm::cuda_stream_view{}, buff.stream()); EXPECT_NE(nullptr, buff.memory_resource()); } TYPED_TEST(DeviceBufferTest, MoveConstructorStream) { rmm::device_buffer buff(this->size, this->stream, &this->mr); - sync_stream(this->stream); + this->stream.synchronize(); auto p = buff.data(); auto size = buff.size(); auto capacity = buff.capacity(); @@ -361,7 +356,7 @@ TYPED_TEST(DeviceBufferTest, MoveConstructorStream) // New buffer should have the same contents as the original rmm::device_buffer buff_new(std::move(buff)); - sync_stream(this->stream); + this->stream.synchronize(); EXPECT_NE(nullptr, buff_new.data()); EXPECT_EQ(p, buff_new.data()); EXPECT_EQ(size, buff_new.size()); @@ -373,13 +368,13 @@ TYPED_TEST(DeviceBufferTest, MoveConstructorStream) EXPECT_EQ(nullptr, buff.data()); EXPECT_EQ(0, buff.size()); EXPECT_EQ(0, buff.capacity()); - EXPECT_EQ(0, buff.stream()); + EXPECT_EQ(rmm::cuda_stream_view{}, buff.stream()); EXPECT_NE(nullptr, buff.memory_resource()); } TYPED_TEST(DeviceBufferTest, MoveAssignmentToDefault) { - rmm::device_buffer from(this->size, 0, &this->mr); + rmm::device_buffer from(this->size, rmm::cuda_stream_view{}, &this->mr); auto p = from.data(); auto size = from.size(); auto capacity = from.capacity(); @@ -401,20 +396,20 @@ TYPED_TEST(DeviceBufferTest, MoveAssignmentToDefault) EXPECT_EQ(nullptr, from.data()); EXPECT_EQ(0, from.size()); EXPECT_EQ(0, from.capacity()); - EXPECT_EQ(0, from.stream()); + EXPECT_EQ(rmm::cuda_stream_view{}, from.stream()); EXPECT_NE(nullptr, from.memory_resource()); } TYPED_TEST(DeviceBufferTest, MoveAssignment) { - rmm::device_buffer from(this->size, 0, 
&this->mr); + rmm::device_buffer from(this->size, rmm::cuda_stream_view{}, &this->mr); auto p = from.data(); auto size = from.size(); auto capacity = from.capacity(); auto mr = from.memory_resource(); auto stream = from.stream(); - rmm::device_buffer to(this->size - 1, 0, &this->mr); + rmm::device_buffer to(this->size - 1, rmm::cuda_stream_view{}, &this->mr); EXPECT_NO_THROW(to = std::move(from)); // contents of `from` should be in `to` @@ -429,13 +424,13 @@ TYPED_TEST(DeviceBufferTest, MoveAssignment) EXPECT_EQ(nullptr, from.data()); EXPECT_EQ(0, from.size()); EXPECT_EQ(0, from.capacity()); - EXPECT_EQ(0, from.stream()); + EXPECT_EQ(rmm::cuda_stream_view{}, from.stream()); EXPECT_NE(nullptr, from.memory_resource()); } TYPED_TEST(DeviceBufferTest, SelfMoveAssignment) { - rmm::device_buffer buff(this->size, 0, &this->mr); + rmm::device_buffer buff(this->size, rmm::cuda_stream_view{}, &this->mr); auto p = buff.data(); auto size = buff.size(); auto capacity = buff.capacity(); @@ -453,7 +448,7 @@ TYPED_TEST(DeviceBufferTest, SelfMoveAssignment) TYPED_TEST(DeviceBufferTest, ResizeSmaller) { - rmm::device_buffer buff(this->size, 0, &this->mr); + rmm::device_buffer buff(this->size, rmm::cuda_stream_view{}, &this->mr); auto old_data = buff.data(); auto new_size = this->size - 1; buff.resize(new_size); @@ -474,7 +469,7 @@ TYPED_TEST(DeviceBufferTest, ResizeSmaller) TYPED_TEST(DeviceBufferTest, ResizeBigger) { - rmm::device_buffer buff(this->size, 0, &this->mr); + rmm::device_buffer buff(this->size, rmm::cuda_stream_view{}, &this->mr); auto old_data = buff.data(); auto new_size = this->size + 1; buff.resize(new_size); diff --git a/tests/device_scalar_tests.cpp b/tests/device_scalar_tests.cpp index 9e283f600..b9a6def97 100644 --- a/tests/device_scalar_tests.cpp +++ b/tests/device_scalar_tests.cpp @@ -16,6 +16,7 @@ #include +#include #include #include #include @@ -25,11 +26,9 @@ #include #include -void sync_stream(cudaStream_t stream) { EXPECT_EQ(cudaSuccess, cudaStreamSynchronize(stream)); } - template struct DeviceScalarTest : public ::testing::Test { - cudaStream_t stream{}; + rmm::cuda_stream stream{}; rmm::mr::device_memory_resource* mr{rmm::mr::get_current_device_resource()}; T value{}; std::default_random_engine generator{}; @@ -37,10 +36,6 @@ struct DeviceScalarTest : public ::testing::Test { std::numeric_limits::max()}; DeviceScalarTest() { value = distribution(generator); } - - void SetUp() override { EXPECT_EQ(cudaSuccess, cudaStreamCreate(&stream)); } - - void TearDown() override { EXPECT_EQ(cudaSuccess, cudaStreamDestroy(stream)); }; }; using Types = ::testing::Types; diff --git a/tests/device_uvector_tests.cpp b/tests/device_uvector_tests.cpp index 052bbe2d2..9fcffbd43 100644 --- a/tests/device_uvector_tests.cpp +++ b/tests/device_uvector_tests.cpp @@ -18,11 +18,12 @@ #include #include +#include #include template struct TypedUVectorTest : ::testing::Test { - cudaStream_t stream() const noexcept { return cudaStream_t{0}; } + rmm::cuda_stream_view stream() const noexcept { return rmm::cuda_stream_view{}; } }; using TestTypes = ::testing::Types; diff --git a/tests/mr/device/mr_multithreaded_tests.cpp b/tests/mr/device/mr_multithreaded_tests.cpp index a64b59f89..cb305089a 100644 --- a/tests/mr/device/mr_multithreaded_tests.cpp +++ b/tests/mr/device/mr_multithreaded_tests.cpp @@ -18,6 +18,7 @@ #include +#include #include #include #include @@ -134,39 +135,39 @@ TEST_P(mr_test_mt, SetCurrentDeviceResourcePerThread_mt) TEST_P(mr_test_mt, AllocateDefaultStream) { - spawn(test_various_allocations, 
this->mr.get(), cudaStream_t{cudaStreamDefault}); + spawn(test_various_allocations, this->mr.get(), rmm::cuda_stream_view{}); } TEST_P(mr_test_mt, AllocateOnStream) { - spawn(test_various_allocations, this->mr.get(), this->stream); + spawn(test_various_allocations, this->mr.get(), this->stream.view()); } TEST_P(mr_test_mt, RandomAllocationsDefaultStream) { - spawn(test_random_allocations, this->mr.get(), 100, 5_MiB, cudaStream_t{cudaStreamDefault}); + spawn(test_random_allocations, this->mr.get(), 100, 5_MiB, rmm::cuda_stream_view{}); } TEST_P(mr_test_mt, RandomAllocationsStream) { - spawn(test_random_allocations, this->mr.get(), 100, 5_MiB, this->stream); + spawn(test_random_allocations, this->mr.get(), 100, 5_MiB, this->stream.view()); } TEST_P(mr_test_mt, MixedRandomAllocationFreeDefaultStream) { - spawn(test_mixed_random_allocation_free, this->mr.get(), 5_MiB, cudaStream_t{cudaStreamDefault}); + spawn(test_mixed_random_allocation_free, this->mr.get(), 5_MiB, rmm::cuda_stream_view{}); } TEST_P(mr_test_mt, MixedRandomAllocationFreeStream) { - spawn(test_mixed_random_allocation_free, this->mr.get(), 5_MiB, this->stream); + spawn(test_mixed_random_allocation_free, this->mr.get(), 5_MiB, this->stream.view()); } void allocate_loop(rmm::mr::device_memory_resource* mr, std::size_t num_allocations, std::list& allocations, std::mutex& mtx, - cudaStream_t stream) + rmm::cuda_stream_view stream) { constexpr std::size_t max_size{1_MiB}; @@ -188,7 +189,7 @@ void deallocate_loop(rmm::mr::device_memory_resource* mr, std::size_t num_allocations, std::list& allocations, std::mutex& mtx, - cudaStream_t stream) + rmm::cuda_stream_view stream) { for (std::size_t i = 0; i < num_allocations;) { std::lock_guard lock(mtx); @@ -204,8 +205,8 @@ void deallocate_loop(rmm::mr::device_memory_resource* mr, } void test_allocate_free_different_threads(rmm::mr::device_memory_resource* mr, - cudaStream_t streamA, - cudaStream_t streamB) + rmm::cuda_stream_view streamA, + rmm::cuda_stream_view streamB) { constexpr std::size_t num_allocations{100}; @@ -225,13 +226,13 @@ void test_allocate_free_different_threads(rmm::mr::device_memory_resource* mr, TEST_P(mr_test_mt, AllocFreeDifferentThreadsDefaultStream) { test_allocate_free_different_threads( - this->mr.get(), cudaStream_t{cudaStreamDefault}, cudaStream_t{cudaStreamDefault}); + this->mr.get(), rmm::cuda_stream_default, rmm::cuda_stream_default); } TEST_P(mr_test_mt, AllocFreeDifferentThreadsPerThreadDefaultStream) { test_allocate_free_different_threads( - this->mr.get(), cudaStream_t{cudaStreamPerThread}, cudaStream_t{cudaStreamPerThread}); + this->mr.get(), rmm::cuda_stream_per_thread, rmm::cuda_stream_per_thread); } TEST_P(mr_test_mt, AllocFreeDifferentThreadsSameStream) @@ -241,11 +242,11 @@ TEST_P(mr_test_mt, AllocFreeDifferentThreadsSameStream) TEST_P(mr_test_mt, AllocFreeDifferentThreadsDifferentStream) { - cudaStream_t streamB{}; - EXPECT_EQ(cudaSuccess, cudaStreamCreate(&streamB)); - test_allocate_free_different_threads(this->mr.get(), this->stream, streamB); - EXPECT_EQ(cudaSuccess, cudaStreamSynchronize(streamB)); - EXPECT_EQ(cudaSuccess, cudaStreamDestroy(streamB)); + EXPECT_NO_THROW([this]() { + rmm::cuda_stream streamB; + test_allocate_free_different_threads(this->mr.get(), this->stream, streamB); + streamB.synchronize(); + }()); } } // namespace diff --git a/tests/mr/device/mr_test.hpp b/tests/mr/device/mr_test.hpp index e12c7e6bb..61df90e47 100644 --- a/tests/mr/device/mr_test.hpp +++ b/tests/mr/device/mr_test.hpp @@ -18,6 +18,8 @@ #include +#include 
+#include #include #include #include @@ -88,27 +90,27 @@ inline void test_get_current_device_resource() inline void test_allocate(rmm::mr::device_memory_resource* mr, std::size_t bytes, - cudaStream_t stream = 0) + cuda_stream_view stream = {}) { void* p{nullptr}; EXPECT_NO_THROW(p = mr->allocate(bytes)); - if (stream != 0) EXPECT_EQ(cudaSuccess, cudaStreamSynchronize(stream)); + if (not stream.is_default()) stream.synchronize(); EXPECT_NE(nullptr, p); EXPECT_TRUE(is_pointer_aligned(p)); EXPECT_TRUE(is_device_memory(p)); EXPECT_NO_THROW(mr->deallocate(p, bytes)); - if (stream != 0) EXPECT_EQ(cudaSuccess, cudaStreamSynchronize(stream)); + if (not stream.is_default()) stream.synchronize(); } -inline void test_various_allocations(rmm::mr::device_memory_resource* mr, cudaStream_t stream) +inline void test_various_allocations(rmm::mr::device_memory_resource* mr, cuda_stream_view stream) { // test allocating zero bytes on non-default stream { void* p{nullptr}; EXPECT_NO_THROW(p = mr->allocate(0, stream)); - EXPECT_EQ(cudaSuccess, cudaStreamSynchronize(stream)); + stream.synchronize(); EXPECT_NO_THROW(mr->deallocate(p, 0, stream)); - EXPECT_EQ(cudaSuccess, cudaStreamSynchronize(stream)); + stream.synchronize(); } test_allocate(mr, 4_B, stream); @@ -127,7 +129,7 @@ inline void test_various_allocations(rmm::mr::device_memory_resource* mr, cudaSt inline void test_random_allocations(rmm::mr::device_memory_resource* mr, std::size_t num_allocations = 100, std::size_t max_size = 5_MiB, - cudaStream_t stream = 0) + cuda_stream_view stream = {}) { std::vector allocations(num_allocations); @@ -139,7 +141,7 @@ inline void test_random_allocations(rmm::mr::device_memory_resource* mr, allocations.begin(), allocations.end(), [&generator, &distribution, stream, mr](allocation& a) { a.size = distribution(generator); EXPECT_NO_THROW(a.p = mr->allocate(a.size, stream)); - if (stream != 0) EXPECT_EQ(cudaSuccess, cudaStreamSynchronize(stream)); + if (not stream.is_default()) stream.synchronize(); EXPECT_NE(nullptr, a.p); EXPECT_TRUE(is_pointer_aligned(a.p)); }); @@ -147,13 +149,13 @@ inline void test_random_allocations(rmm::mr::device_memory_resource* mr, std::for_each( allocations.begin(), allocations.end(), [generator, distribution, stream, mr](allocation& a) { EXPECT_NO_THROW(mr->deallocate(a.p, a.size, stream)); - if (stream != 0) EXPECT_EQ(cudaSuccess, cudaStreamSynchronize(stream)); + if (not stream.is_default()) stream.synchronize(); }); } inline void test_mixed_random_allocation_free(rmm::mr::device_memory_resource* mr, - std::size_t max_size = 5_MiB, - cudaStream_t stream = 0) + std::size_t max_size = 5_MiB, + cuda_stream_view stream = {}) { std::default_random_engine generator; constexpr std::size_t num_allocations{100}; @@ -214,13 +216,10 @@ struct mr_test : public ::testing::TestWithParam { { auto factory = GetParam().f; mr = factory(); - EXPECT_EQ(cudaSuccess, cudaStreamCreate(&stream)); } - void TearDown() override { EXPECT_EQ(cudaSuccess, cudaStreamDestroy(stream)); }; - std::shared_ptr mr; ///< Pointer to resource to use in tests - cudaStream_t stream; + rmm::cuda_stream stream{}; }; /// MR factory functions diff --git a/tests/mr/device/mr_tests.cpp b/tests/mr/device/mr_tests.cpp index c122307d9..31246c55a 100644 --- a/tests/mr/device/mr_tests.cpp +++ b/tests/mr/device/mr_tests.cpp @@ -71,7 +71,7 @@ TEST_P(mr_test, SelfEquality) { EXPECT_TRUE(this->mr->is_equal(*this->mr)); } TEST_P(mr_test, AllocateDefaultStream) { - test_various_allocations(this->mr.get(), cudaStreamDefault); + 
test_various_allocations(this->mr.get(), cuda_stream_view{}); } TEST_P(mr_test, AllocateOnStream) { test_various_allocations(this->mr.get(), this->stream); } @@ -85,7 +85,7 @@ TEST_P(mr_test, RandomAllocationsStream) TEST_P(mr_test, MixedRandomAllocationFree) { - test_mixed_random_allocation_free(this->mr.get(), 5_MiB, cudaStreamDefault); + test_mixed_random_allocation_free(this->mr.get(), 5_MiB, cuda_stream_view{}); } TEST_P(mr_test, MixedRandomAllocationFreeStream) @@ -97,11 +97,11 @@ TEST_P(mr_test, GetMemInfo) { if (this->mr->supports_get_mem_info()) { std::pair mem_info; - EXPECT_NO_THROW(mem_info = this->mr->get_mem_info(0)); + EXPECT_NO_THROW(mem_info = this->mr->get_mem_info(rmm::cuda_stream_view{})); std::size_t allocation_size = 16 * 256; void* ptr; EXPECT_NO_THROW(ptr = this->mr->allocate(allocation_size)); - EXPECT_NO_THROW(mem_info = this->mr->get_mem_info(0)); + EXPECT_NO_THROW(mem_info = this->mr->get_mem_info(rmm::cuda_stream_view{})); EXPECT_TRUE(mem_info.first >= allocation_size); EXPECT_NO_THROW(this->mr->deallocate(ptr, allocation_size)); } diff --git a/tests/mr/device/pool_mr_tests.cpp b/tests/mr/device/pool_mr_tests.cpp index 98660fcd8..21d9d25f2 100644 --- a/tests/mr/device/pool_mr_tests.cpp +++ b/tests/mr/device/pool_mr_tests.cpp @@ -88,9 +88,9 @@ TEST(PoolTest, ForceGrowth) TEST(PoolTest, DeletedStream) { pool_mr mr{rmm::mr::get_current_device_resource(), 0}; - cudaStream_t stream; + cudaStream_t stream; // we don't use rmm::cuda_stream here to make destruction more explicit const int size = 10000; - cudaStreamCreate(&stream); + EXPECT_EQ(cudaSuccess, cudaStreamCreate(&stream)); EXPECT_NO_THROW(rmm::device_buffer buff(size, stream, &mr)); EXPECT_EQ(cudaSuccess, cudaStreamDestroy(stream)); EXPECT_NO_THROW(mr.allocate(size));
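Taken together, the tests above show the intended usage of the new types: construct an owning `rmm::cuda_stream`, hand its `view()` to stream-ordered APIs, and let RAII destroy the stream. A minimal end-to-end sketch under those assumptions (illustrative only, not taken from this PR's tests):

```c++
#include <rmm/cuda_stream.hpp>
#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_buffer.hpp>

int main()
{
  rmm::cuda_stream stream;                     // RAII: creates a non-default CUDA stream
  rmm::cuda_stream_view view = stream.view();  // cheap, non-owning handle

  rmm::device_buffer buff(1024, view);         // allocation is ordered on `stream`
  view.synchronize();                          // wait before using `buff` off-stream

  return 0;
}  // `buff` is freed first, then ~cuda_stream destroys the stream
```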