diff --git a/CMakeLists.txt b/CMakeLists.txt index 12674d2e..46685075 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -12,8 +12,8 @@ set(CMAKE_CXX_STANDARD 17) # Version set(CPPUDDLE_VERSION_MAJOR 0) -set(CPPUDDLE_VERSION_MINOR 1) -set(CPPUDDLE_VERSION_PATCH 99) +set(CPPUDDLE_VERSION_MINOR 3) +set(CPPUDDLE_VERSION_PATCH 0) set(CPPUDDLE_VERSION_STRING "${CPPUDDLE_VERSION_MAJOR}.${CPPUDDLE_VERSION_MINOR}.${CPPUDDLE_VERSION_PATCH}.") #------------------------------------------------------------------------------------------------------------ @@ -23,17 +23,19 @@ set(CPPUDDLE_VERSION_STRING "${CPPUDDLE_VERSION_MAJOR}.${CPPUDDLE_VERSION_MINOR} option(CPPUDDLE_WITH_CUDA "Enable CUDA tests/examples" OFF) option(CPPUDDLE_WITH_MULTIGPU_SUPPORT "Enables experimental MultiGPU support" OFF) option(CPPUDDLE_WITH_KOKKOS "Enable KOKKOS tests/examples" OFF) +set(CPPUDDLE_WITH_MAX_NUMBER_GPUS "1" CACHE STRING "Number of GPUs that will be used. Should match the number of GPUs used when using the maximum number of HPX worker threads. Should be 1 for non-HPX builds.") # HPX-related options option(CPPUDDLE_WITH_HPX "Enable basic HPX integration and examples" OFF) option(CPPUDDLE_WITH_HPX_AWARE_ALLOCATORS "Enable HPX-aware allocators for even better HPX integration" ON) set(CPPUDDLE_WITH_HPX_MUTEX OFF CACHE BOOL "Use HPX spinlock mutex instead of std::mutex") +set(CPPUDDLE_WITH_NUMBER_BUCKETS "128" CACHE STRING "Number of internal recycle buckets buffer type. Should ideally match the intended number of HPX workers or be 1 in non-HPX builds.") # Test-related options option(CPPUDDLE_WITH_COUNTERS "Turns on allocations counters. Useful for extended testing" OFF) option(CPPUDDLE_WITH_TESTS "Build tests/examples" OFF) set(CPPUDDLE_WITH_DEADLOCK_TEST_REPETITONS "100000" CACHE STRING "Number of repetitions for the aggregation executor deadlock tests") -option(CPPUDDLE_DEACTIVATE_BUFFER_RECYCLING "Deactivates the default recycling behaviour" OFF) -option(CPPUDDLE_DEACTIVATE_AGGRESSIVE_ALLOCATORS "Deactivates the aggressive allocators" OFF) +option(CPPUDDLE_WITH_BUFFER_RECYCLING "Enables the default recycling behaviour! Turning this off will have a major negative performance impact and is only intended for testing!" ON) +option(CPPUDDLE_WITH_AGGRESSIVE_CONTENT_RECYCLING "Allows the aggressive allocators variants to reuse contents from previous buffers (and thus skip initializations)" ON) # Tooling options option(CPPUDDLE_WITH_CLANG_TIDY "Enable clang tidy warnings" OFF) option(CPPUDDLE_WITH_CLANG_FORMAT "Enable clang format target" OFF) @@ -61,6 +63,19 @@ if(CPPUDDLE_WITH_HPX) endif() endif() +if(CPPUDDLE_WITH_NUMBER_GPUS GREATER 1) + if(NOT CPPUDDLE_WITH_HPX_AWARE_ALLOCATORS) + message(FATAL_ERROR " CPPUDDLE_WITH_HPX_AWARE_ALLOCATORS=ON is required Multi-GPU builds!") + endif() +endif() + +if(CPPUDDLE_WITH_NUMBER_BUCKETS GREATER 1) + if(NOT CPPUDDLE_WITH_HPX_AWARE_ALLOCATORS) + message(FATAL_ERROR " CPPUDDLE_WITH_HPX_AWARE_ALLOCATORS=ON is required for Multi-Worker build! \ + Either turn it on or configure with CPPUDDLE_WITH_NUMBER_BUCKETS=1 !") + endif() +endif() + # HPX-aware allocators require HPX-Support. 
Warn if HPX support is disabled as we fallback on non-aware # allocators if(NOT CPPUDDLE_WITH_HPX) @@ -80,7 +95,7 @@ if (CPPUDDLE_WITH_KOKKOS) find_package(Kokkos 3.0.0 REQUIRED) find_package(HPXKokkos REQUIRED) - # Check that everything required is actyivated + # Check that everything required is activated if (NOT CPPUDDLE_WITH_HPX) message(FATAL_ERROR " KOKKOS support requires HPX flag to be turned on") endif() @@ -149,7 +164,15 @@ if (CPPUDDLE_WITH_HPX) if(CPPUDDLE_WITH_HPX_AWARE_ALLOCATORS) message(INFO " Compiling with HPX-aware allocators!") target_compile_definitions(buffer_manager INTERFACE "CPPUDDLE_HAVE_HPX_AWARE_ALLOCATORS") + target_compile_definitions(buffer_manager INTERFACE "CPPUDDLE_HAVE_MAX_NUMBER_GPUS=${CPPUDDLE_WITH_MAX_NUMBER_GPUS}") + target_compile_definitions(buffer_manager INTERFACE "CPPUDDLE_HAVE_NUMBER_BUCKETS=${CPPUDDLE_WITH_NUMBER_BUCKETS}") + else() + target_compile_definitions(buffer_manager INTERFACE "CPPUDDLE_HAVE_MAX_NUMBER_GPUS=1") + target_compile_definitions(buffer_manager INTERFACE "CPPUDDLE_HAVE_NUMBER_BUCKETS=1") endif() +else() + target_compile_definitions(buffer_manager INTERFACE "CPPUDDLE_HAVE_MAX_NUMBER_GPUS=1") + target_compile_definitions(buffer_manager INTERFACE "CPPUDDLE_HAVE_NUMBER_BUCKETS=1") endif() if (CPPUDDLE_WITH_COUNTERS) target_compile_definitions(buffer_manager INTERFACE "CPPUDDLE_HAVE_COUNTERS") @@ -164,7 +187,15 @@ if (CPPUDDLE_WITH_HPX) target_compile_definitions(stream_manager INTERFACE "CPPUDDLE_HAVE_HPX") if(CPPUDDLE_WITH_HPX_AWARE_ALLOCATORS) target_compile_definitions(stream_manager INTERFACE "CPPUDDLE_HAVE_HPX_AWARE_ALLOCATORS") + target_compile_definitions(stream_manager INTERFACE "CPPUDDLE_HAVE_MAX_NUMBER_GPUS=${CPPUDDLE_WITH_MAX_NUMBER_GPUS}") + target_compile_definitions(stream_manager INTERFACE "CPPUDDLE_HAVE_NUMBER_BUCKETS=${CPPUDDLE_WITH_NUMBER_BUCKETS}") + else() + target_compile_definitions(stream_manager INTERFACE "CPPUDDLE_HAVE_MAX_NUMBER_GPUS=1") + target_compile_definitions(stream_manager INTERFACE "CPPUDDLE_HAVE_NUMBER_BUCKETS=1") endif() +else() + target_compile_definitions(stream_manager INTERFACE "CPPUDDLE_HAVE_MAX_NUMBER_GPUS=1") + target_compile_definitions(stream_manager INTERFACE "CPPUDDLE_HAVE_NUMBER_BUCKETS=1") endif() if (CPPUDDLE_WITH_COUNTERS) target_compile_definitions(stream_manager INTERFACE "CPPUDDLE_HAVE_COUNTERS") @@ -182,16 +213,18 @@ else() message(INFO " Compiling with std::mutex!") endif() -if(CPPUDDLE_DEACTIVATE_BUFFER_RECYCLING) - target_compile_definitions(buffer_manager INTERFACE "CPPUDDLE_DEACTIVATE_BUFFER_RECYCLING") - message(WARNING " Slow Build: Buffer recycling is deactivated. This should only be used for performance tests!") -else() +if(CPPUDDLE_WITH_BUFFER_RECYCLING) message(INFO " Using default buffer recycling behaviour!") +else() + message(WARNING " Slow Build: Buffer recycling is deactivated. This should only be used for performance tests!") + target_compile_definitions(buffer_manager INTERFACE "CPPUDDLE_DEACTIVATE_BUFFER_RECYCLING") endif() -if(CPPUDDLE_DEACTIVATE_AGGRESSIVE_ALLOCATORS) +if(CPPUDDLE_WITH_AGGRESSIVE_CONTENT_RECYCLING) + message(INFO " Using default behaviour for aggressive content reusage (only relevant for aggressive allocators)!") +else() target_compile_definitions(buffer_manager INTERFACE "CPPUDDLE_DEACTIVATE_AGGRESSIVE_ALLOCATORS") - message(WARNING " Slow Build: Aggressive allocators disabled. This should only be used for performance tests!") + message(WARNING " Slow Build: Aggressive allocators (and thus content recycling) is disabled. 
This should only be used for performance tests!") endif() # install libs with the defitions: @@ -212,6 +245,9 @@ install(EXPORT CPPuddle NAMESPACE CPPuddle:: DESTINATION ${CMAKE_INSTALL_PREFIX} ## Add target for tests and tests definitions if (CPPUDDLE_WITH_TESTS) + if(NOT CPPUDDLE_WITH_BUFFER_RECYCLING) + message(FATAL_ERROR "The CPPuddle tests only work with CPPUDDLE_WITH_BUFFER_RECYCLING=ON. Turning off buffer recycling is not recommended in general!") + endif() add_executable(allocator_test tests/allocator_test.cpp) if (CPPUDDLE_WITH_HPX) target_link_libraries(allocator_test @@ -362,15 +398,10 @@ if (CPPUDDLE_WITH_TESTS) ) endif() if (NOT CMAKE_BUILD_TYPE MATCHES "Debug") # Performance tests only make sense with optimizations on - add_test(allocator_test.performance.analyse_recycle_performance cat allocator_test.out) - set_tests_properties(allocator_test.performance.analyse_recycle_performance PROPERTIES - FIXTURES_REQUIRED allocator_test_output - PASS_REGULAR_EXPRESSION "Test information: Recycler was faster than default allocator!" - ) add_test(allocator_test.performance.analyse_aggressive_performance cat allocator_test.out) set_tests_properties(allocator_test.performance.analyse_aggressive_performance PROPERTIES FIXTURES_REQUIRED allocator_test_output - PASS_REGULAR_EXPRESSION "Test information: Recycler was faster than default allocator!" + PASS_REGULAR_EXPRESSION "Test information: Aggressive recycler was faster than default allocator!" ) endif() add_test(allocator_test.fixture_cleanup ${CMAKE_COMMAND} -E remove allocator_test.out) @@ -384,12 +415,12 @@ if (CPPUDDLE_WITH_TESTS) find_program(VALGRIND_COMMAND valgrind) if (VALGRIND_COMMAND) add_test(allocator_memcheck.valgrind - ${VALGRIND_COMMAND} --trace-children=yes --leak-check=full ./allocator_test --arraysize 5000000 --passes 200) + ${VALGRIND_COMMAND} --trace-children=yes --leak-check=full --undef-value-errors=no --show-error-list=yes ./allocator_test --arraysize 5000000 --passes 200) set_tests_properties(allocator_memcheck.valgrind PROPERTIES PASS_REGULAR_EXPRESSION "ERROR SUMMARY: 0 errors from 0 contexts" ) add_test(allocator_aligned_memcheck.valgrind - ${VALGRIND_COMMAND} --trace-children=yes --leak-check=full ./allocator_aligned_test --arraysize 5000000 --passes 200) + ${VALGRIND_COMMAND} --trace-children=yes --leak-check=full --undef-value-errors=no --show-error-list=yes ./allocator_aligned_test --arraysize 5000000 --passes 200) set_tests_properties(allocator_aligned_memcheck.valgrind PROPERTIES PASS_REGULAR_EXPRESSION "ERROR SUMMARY: 0 errors from 0 contexts" ) @@ -429,15 +460,10 @@ if (CPPUDDLE_WITH_TESTS) ) endif() if (NOT CMAKE_BUILD_TYPE MATCHES "Debug") # Performance tests only make sense with optimizations on - add_test(allocator_aligned_test.performance.analyse_recycle_performance cat allocator_aligned_test.out) - set_tests_properties(allocator_aligned_test.performance.analyse_recycle_performance PROPERTIES - FIXTURES_REQUIRED allocator_aligned_test_output - PASS_REGULAR_EXPRESSION "Test information: Recycler was faster than default allocator!" - ) add_test(allocator_aligned_test.performance.analyse_aggressive_performance cat allocator_aligned_test.out) set_tests_properties(allocator_aligned_test.performance.analyse_aggressive_performance PROPERTIES FIXTURES_REQUIRED allocator_aligned_test_output - PASS_REGULAR_EXPRESSION "Test information: Recycler was faster than default allocator!" + PASS_REGULAR_EXPRESSION "Test information: Aggressive recycler was faster than default allocator!" 
) endif() add_test(allocator_aligned_test.fixture_cleanup ${CMAKE_COMMAND} -E remove allocator_aligned_test.out) @@ -485,15 +511,10 @@ if (CPPUDDLE_WITH_TESTS) ) endif() if (NOT CMAKE_BUILD_TYPE MATCHES "Debug") # Performance tests only make sense with optimizations on - add_test(allocator_concurrency_test.performance.analyse_recycle_performance cat allocator_concurrency_test.out) - set_tests_properties(allocator_concurrency_test.performance.analyse_recycle_performance PROPERTIES - FIXTURES_REQUIRED allocator_concurrency_output - PASS_REGULAR_EXPRESSION "Test information: Recycler was faster than default allocator!" - ) add_test(allocator_concurrency_test.performance.analyse_aggressive_performance cat allocator_concurrency_test.out) set_tests_properties(allocator_concurrency_test.performance.analyse_aggressive_performance PROPERTIES FIXTURES_REQUIRED allocator_concurrency_output - PASS_REGULAR_EXPRESSION "Test information: Recycler was faster than default allocator!" + PASS_REGULAR_EXPRESSION "Test information: Aggressive recycler was faster than default allocator!" ) endif() add_test(allocator_concurrency_test.fixture_cleanup ${CMAKE_COMMAND} -E remove allocator_concurrency_test.out) @@ -530,12 +551,12 @@ if (CPPUDDLE_WITH_TESTS) add_test(allocator_kokkos_test.analyse_cleaned_buffers cat allocator_kokkos_test.out) set_tests_properties(allocator_kokkos_test.analyse_cleaned_buffers PROPERTIES FIXTURES_REQUIRED allocator_kokkos_output - PASS_REGULAR_EXPRESSION "--> Number cleaned up buffers:[ ]* 2" + PASS_REGULAR_EXPRESSION "--> Number cleaned up buffers:[ ]* 3" ) add_test(allocator_kokkos_test.analyse_created_buffers cat allocator_kokkos_test.out) set_tests_properties(allocator_kokkos_test.analyse_created_buffers PROPERTIES FIXTURES_REQUIRED allocator_kokkos_output - PASS_REGULAR_EXPRESSION "--> Number of times a new buffer had to be created for a request:[ ]* 2" + PASS_REGULAR_EXPRESSION "--> Number of times a new buffer had to be created for a request:[ ]* 3" ) add_test(allocator_kokkos_test.analyse_bad_allocs cat allocator_kokkos_test.out) set_tests_properties(allocator_kokkos_test.analyse_bad_allocs PROPERTIES diff --git a/include/aggregation_manager.hpp b/include/aggregation_manager.hpp index fe1de846..92ad5f8d 100644 --- a/include/aggregation_manager.hpp +++ b/include/aggregation_manager.hpp @@ -1,6 +1,12 @@ +// Copyright (c) 2022-2023 Gregor Daiß +// +// Distributed under the Boost Software License, Version 1.0. 
(See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + #ifndef WORK_AGGREGATION_MANAGER #define WORK_AGGREGATION_MANAGER +#include #define DEBUG_AGGREGATION_CALLS 1 #include @@ -27,7 +33,6 @@ #include #include #include -//#include // obsolete #include #if defined(HPX_HAVE_CUDA) || defined(HPX_HAVE_HIP) @@ -41,12 +46,7 @@ #include "../include/buffer_manager.hpp" #include "../include/stream_manager.hpp" - -#if defined(CPPUDDLE_HAVE_HPX_MUTEX) -using aggregation_mutex_t = hpx::spinlock; -#else -using aggregation_mutex_t = std::mutex; -#endif +#include "../include/detail/config.hpp" //=============================================================================== //=============================================================================== @@ -144,7 +144,7 @@ template class aggregated_function_call { std::any function_tuple; /// Stores the string of the first function call for debug output std::string debug_type_information; - aggregation_mutex_t debug_mut; + recycler::aggregation_mutex_t debug_mut; #endif std::vector> potential_async_promises{}; @@ -175,7 +175,7 @@ template class aggregated_function_call { #if !(defined(NDEBUG)) && defined(DEBUG_AGGREGATION_CALLS) // needed for concurrent access to function_tuple and debug_type_information // Not required for normal use - std::lock_guard guard(debug_mut); + std::lock_guard guard(debug_mut); #endif assert(!async_mode); assert(potential_async_promises.empty()); @@ -249,7 +249,7 @@ template class aggregated_function_call { #if !(defined(NDEBUG)) && defined(DEBUG_AGGREGATION_CALLS) // needed for concurrent access to function_tuple and debug_type_information // Not required for normal use - std::lock_guard guard(debug_mut); + std::lock_guard guard(debug_mut); #endif assert(async_mode); assert(!potential_async_promises.empty()); @@ -313,7 +313,8 @@ template class aggregated_function_call { potential_async_promises[local_counter].get_future(); if (local_counter == number_slices - 1) { /* slices_ready_promise.set_value(); */ - auto fut = exec_async_wrapper(underlying_executor, std::forward(f), std::forward(ts)...); + auto fut = exec_async_wrapper( + underlying_executor, std::forward(f), std::forward(ts)...); fut.then([this](auto &&fut) { for (auto &promise : potential_async_promises) { promise.set_value(); @@ -389,6 +390,7 @@ template class Aggregated_Executor { Executor &executor; public: + size_t gpu_id; // Subclasses /// Slice class - meant as a scope interface to the aggregated executor @@ -528,18 +530,18 @@ template class Aggregated_Executor { /// slices have called it std::deque> function_calls; /// For synchronizing the access to the function calls list - aggregation_mutex_t mut; + recycler::aggregation_mutex_t mut; /// Data entry for a buffer allocation: void* pointer, size_t for - /// buffer-size, atomic for the slice counter + /// buffer-size, atomic for the slice counter, location_id, gpu_id using buffer_entry_t = - std::tuple, bool, const size_t>; + std::tuple, bool, const size_t, size_t>; /// Keeps track of the aggregated buffer allocations done in all the slices std::deque buffer_allocations; /// Map pointer to deque index for fast access in the deallocations std::unordered_map buffer_allocations_map; /// For synchronizing the access to the buffer_allocations - aggregation_mutex_t buffer_mut; + recycler::aggregation_mutex_t buffer_mut; std::atomic buffer_counter = 0; /// Get new buffer OR get buffer already allocated by different slice @@ -551,7 +553,7 @@ template class Aggregated_Executor { // 
First: Check if it already has happened if (buffer_counter <= slice_alloc_counter) { // we might be the first! Lock... - std::lock_guard guard(buffer_mut); + std::lock_guard guard(buffer_mut); // ... and recheck if (buffer_counter <= slice_alloc_counter) { constexpr bool manage_content_lifetime = false; @@ -559,13 +561,17 @@ template class Aggregated_Executor { // Default location -- useful for GPU builds as we otherwise create way too // many different buffers for different aggregation sizes on different GPUs - size_t location_id = 0; + /* size_t location_id = gpu_id * instances_per_gpu; */ + // Use integer conversion to only use 0 16 32 ... as buckets + size_t location_id = ((hpx::get_worker_thread_num() % recycler::number_instances) / 16) * 16; #ifdef CPPUDDLE_HAVE_HPX_AWARE_ALLOCATORS if (max_slices == 1) { // get prefered location: aka the current hpx threads location // Usually handy for CPU builds where we want to use the buffers // close to the current CPU core - location_id = hpx::get_worker_thread_num(); + /* location_id = (hpx::get_worker_thread_num() / instances_per_gpu) * instances_per_gpu; */ + /* location_id = (gpu_id) * instances_per_gpu; */ + // division makes sure that we always use the same instance to store our gpu buffers. } #endif // Get shiny and new buffer that will be shared between all slices @@ -573,10 +579,10 @@ template class Aggregated_Executor { // buffer_recycler... T *aggregated_buffer = recycler::detail::buffer_recycler::get( - size, manage_content_lifetime, location_id); + size, manage_content_lifetime, location_id, gpu_id); // Create buffer entry for this buffer buffer_allocations.emplace_back(static_cast(aggregated_buffer), - size, 1, true, location_id); + size, 1, true, location_id, gpu_id); #ifndef NDEBUG // if previousely used the buffer should not be in usage anymore @@ -630,6 +636,7 @@ template class Aggregated_Executor { auto &buffer_allocation_counter = std::get<2>(buffer_allocations[slice_alloc_counter]); auto &valid = std::get<3>(buffer_allocations[slice_alloc_counter]); const auto &location_id = std::get<4>(buffer_allocations[slice_alloc_counter]); + const auto &gpu_id = std::get<5>(buffer_allocations[slice_alloc_counter]); assert(valid); T *buffer_pointer = static_cast(buffer_pointer_void); @@ -641,20 +648,20 @@ template class Aggregated_Executor { // Check if all slices are done with this buffer? if (buffer_allocation_counter == 0) { // Yes! 
"Deallocate" by telling the recylcer the buffer is fit for reusage - std::lock_guard guard(buffer_mut); + std::lock_guard guard(buffer_mut); // Only mark unused if another buffer has not done so already (and marked // it as invalid) if (valid) { assert(buffers_in_use == true); recycler::detail::buffer_recycler::mark_unused( - buffer_pointer, buffer_size, location_id); + buffer_pointer, buffer_size, location_id, gpu_id); // mark buffer as invalid to prevent any other slice from marking the // buffer as unused valid = false; const size_t current_deallocs = ++dealloc_counter; if (current_deallocs == buffer_counter) { - std::lock_guard guard(mut); + std::lock_guard guard(mut); buffers_in_use = false; if (!executor_slices_alive && !buffers_in_use) slices_exhausted = false; @@ -672,11 +679,11 @@ template class Aggregated_Executor { /// Only meant to be accessed by the slice executors bool sync_aggregation_slices(const size_t slice_launch_counter) { - std::lock_guard guard(mut); + std::lock_guard guard(mut); assert(slices_exhausted == true); // Add function call object in case it hasn't happened for this launch yet if (overall_launch_counter <= slice_launch_counter) { - /* std::lock_guard guard(mut); */ + /* std::lock_guard guard(mut); */ if (overall_launch_counter <= slice_launch_counter) { function_calls.emplace_back(current_slices, false, executor); overall_launch_counter = function_calls.size(); @@ -692,11 +699,11 @@ template class Aggregated_Executor { /// Only meant to be accessed by the slice executors template void post(const size_t slice_launch_counter, F &&f, Ts &&...ts) { - std::lock_guard guard(mut); + std::lock_guard guard(mut); assert(slices_exhausted == true); // Add function call object in case it hasn't happened for this launch yet if (overall_launch_counter <= slice_launch_counter) { - /* std::lock_guard guard(mut); */ + /* std::lock_guard guard(mut); */ if (overall_launch_counter <= slice_launch_counter) { function_calls.emplace_back(current_slices, false, executor); overall_launch_counter = function_calls.size(); @@ -715,11 +722,11 @@ template class Aggregated_Executor { template hpx::lcos::future async(const size_t slice_launch_counter, F &&f, Ts &&...ts) { - std::lock_guard guard(mut); + std::lock_guard guard(mut); assert(slices_exhausted == true); // Add function call object in case it hasn't happened for this launch yet if (overall_launch_counter <= slice_launch_counter) { - /* std::lock_guard guard(mut); */ + /* std::lock_guard guard(mut); */ if (overall_launch_counter <= slice_launch_counter) { function_calls.emplace_back(current_slices, true, executor); overall_launch_counter = function_calls.size(); @@ -735,11 +742,11 @@ template class Aggregated_Executor { template hpx::lcos::shared_future wrap_async(const size_t slice_launch_counter, F &&f, Ts &&...ts) { - std::lock_guard guard(mut); + std::lock_guard guard(mut); assert(slices_exhausted == true); // Add function call object in case it hasn't happened for this launch yet if (overall_launch_counter <= slice_launch_counter) { - /* std::lock_guard guard(mut); */ + /* std::lock_guard guard(mut); */ if (overall_launch_counter <= slice_launch_counter) { function_calls.emplace_back(current_slices, true, executor); overall_launch_counter = function_calls.size(); @@ -753,12 +760,12 @@ template class Aggregated_Executor { } bool slice_available(void) { - std::lock_guard guard(mut); + std::lock_guard guard(mut); return !slices_exhausted; } std::optional> request_executor_slice() { - std::lock_guard guard(mut); + std::lock_guard 
guard(mut); if (!slices_exhausted) { const size_t local_slice_id = ++current_slices; if (local_slice_id == 1) { @@ -766,11 +773,11 @@ template class Aggregated_Executor { // TODO still required? Should be clean here already function_calls.clear(); overall_launch_counter = 0; - std::lock_guard guard(buffer_mut); + std::lock_guard guard(buffer_mut); #ifndef NDEBUG for (const auto &buffer_entry : buffer_allocations) { const auto &[buffer_pointer_any, buffer_size, - buffer_allocation_counter, valid, location_id] = + buffer_allocation_counter, valid, location_id, device_id] = buffer_entry; assert(!valid); } @@ -808,9 +815,12 @@ template class Aggregated_Executor { if (local_slice_id == 1) { // Renew promise that all slices will be ready as the primary launch criteria... hpx::lcos::shared_future fut; - if (mode == Aggregated_Executor_Modes::EAGER || mode == Aggregated_Executor_Modes::ENDLESS) { - // Fallback launch condidtion: Launch as soon as the underlying stream is ready + if (mode == Aggregated_Executor_Modes::EAGER || + mode == Aggregated_Executor_Modes::ENDLESS) { + // Fallback launch condidtion: Launch as soon as the underlying stream + // is ready /* auto slices_full_fut = slices_full_promise.get_future(); */ + stream_pool::select_device>(gpu_id); auto exec_fut = executor.get_future(); /* fut = hpx::when_any(exec_fut, slices_full_fut); */ fut = std::move(exec_fut); @@ -821,7 +831,7 @@ template class Aggregated_Executor { } // Launch all executor slices within this continuation current_continuation = fut.then([this](auto &&fut) { - std::lock_guard guard(mut); + std::lock_guard guard(mut); slices_exhausted = true; launched_slices = current_slices; size_t id = 0; @@ -835,15 +845,16 @@ template class Aggregated_Executor { } if (local_slice_id >= max_slices && mode != Aggregated_Executor_Modes::ENDLESS) { - slices_exhausted = true; // prevents any more threads from entering before the continuation is launched - /* launched_slices = current_slices; */ - /* size_t id = 0; */ - /* for (auto &slice_promise : executor_slices) { */ - /* slice_promise.set_value( */ - /* Executor_Slice{*this, id, launched_slices}); */ - /* id++; */ - /* } */ - /* executor_slices.clear(); */ + slices_exhausted = true; // prevents any more threads from entering + // before the continuation is launched + /* launched_slices = current_slices; */ + /* size_t id = 0; */ + /* for (auto &slice_promise : executor_slices) { */ + /* slice_promise.set_value( */ + /* Executor_Slice{*this, id, launched_slices}); */ + /* id++; */ + /* } */ + /* executor_slices.clear(); */ if (mode == Aggregated_Executor_Modes::STRICT ) { slices_full_promise.set_value(); // Trigger slices launch condition continuation } @@ -857,7 +868,7 @@ template class Aggregated_Executor { } size_t launched_slices; void reduce_usage_counter(void) { - /* std::lock_guard guard(mut); */ + /* std::lock_guard guard(mut); */ assert(slices_exhausted == true); assert(executor_slices_alive == true); assert(launched_slices >= 1); @@ -874,7 +885,7 @@ template class Aggregated_Executor { // std::get<0>(executor_tuple); // Mark executor fit for reusage - std::lock_guard guard(mut); + std::lock_guard guard(mut); executor_slices_alive = false; if (!executor_slices_alive && !buffers_in_use) { slices_exhausted = false; @@ -897,7 +908,7 @@ template class Aggregated_Executor { #ifndef NDEBUG for (const auto &buffer_entry : buffer_allocations) { const auto &[buffer_pointer_any, buffer_size, buffer_allocation_counter, - valid, location_id] = buffer_entry; + valid, location_id, 
device_id] = buffer_entry; assert(!valid); } #endif @@ -910,11 +921,11 @@ template class Aggregated_Executor { } Aggregated_Executor(const size_t number_slices, - Aggregated_Executor_Modes mode) + Aggregated_Executor_Modes mode, const size_t gpu_id = 0) : max_slices(number_slices), current_slices(0), slices_exhausted(false),dealloc_counter(0), - mode(mode), executor_slices_alive(false), buffers_in_use(false), + mode(mode), executor_slices_alive(false), buffers_in_use(false), gpu_id(gpu_id), executor_tuple( - stream_pool::get_interface>()), + stream_pool::get_interface>(gpu_id)), executor(std::get<0>(executor_tuple)), current_continuation(hpx::make_ready_future()), last_stream_launch_done(hpx::make_ready_future()) {} @@ -1004,39 +1015,63 @@ class aggregation_pool { /// interface template static void init(size_t number_of_executors, size_t slices_per_executor, - Aggregated_Executor_Modes mode) { - std::lock_guard guard(instance.pool_mutex); - assert(instance.aggregation_executor_pool.empty()); - for (int i = 0; i < number_of_executors; i++) { - instance.aggregation_executor_pool.emplace_back(slices_per_executor, - mode); + Aggregated_Executor_Modes mode, size_t num_devices = 1) { + if (is_initialized) { + throw std::runtime_error( + std::string("Trying to initialize cppuddle aggregation pool twice") + + " Agg pool name: " + std::string(kernelname)); + } + if (num_devices > recycler::max_number_gpus) { + throw std::runtime_error( + std::string( + "Trying to initialize aggregation with more devices than the " + "maximum number of GPUs given at compiletime") + + " Agg pool name: " + std::string(kernelname)); } - instance.slices_per_executor = slices_per_executor; - instance.mode = mode; + number_devices = num_devices; + for (size_t gpu_id = 0; gpu_id < number_devices; gpu_id++) { + + std::lock_guard guard(instance()[gpu_id].pool_mutex); + assert(instance()[gpu_id].aggregation_executor_pool.empty()); + for (int i = 0; i < number_of_executors; i++) { + instance()[gpu_id].aggregation_executor_pool.emplace_back(slices_per_executor, + mode, gpu_id); + } + instance()[gpu_id].slices_per_executor = slices_per_executor; + instance()[gpu_id].mode = mode; + } + is_initialized = true; } /// Will always return a valid executor slice static decltype(auto) request_executor_slice(void) { - std::lock_guard guard(instance.pool_mutex); - assert(!instance.aggregation_executor_pool.empty()); + if (!is_initialized) { + throw std::runtime_error( + std::string("Trying to use cppuddle aggregation pool without first calling init") + + " Agg poolname: " + std::string(kernelname)); + } + const size_t gpu_id = recycler::get_device_id(number_devices); + /* const size_t gpu_id = 1; */ + std::lock_guard guard(instance()[gpu_id].pool_mutex); + assert(!instance()[gpu_id].aggregation_executor_pool.empty()); std::optional::Executor_Slice>> ret; - size_t local_id = (instance.current_interface) % - instance.aggregation_executor_pool.size(); - ret = instance.aggregation_executor_pool[local_id].request_executor_slice(); + size_t local_id = (instance()[gpu_id].current_interface) % + instance()[gpu_id].aggregation_executor_pool.size(); + ret = instance()[gpu_id].aggregation_executor_pool[local_id].request_executor_slice(); // Expected case: current aggregation executor is free if (ret.has_value()) { return ret; } // current interface is bad -> find free one size_t abort_counter = 0; - const size_t abort_number = instance.aggregation_executor_pool.size() + 1; + const size_t abort_number = 
instance()[gpu_id].aggregation_executor_pool.size() + 1; do { - local_id = (++(instance.current_interface)) % // increment interface - instance.aggregation_executor_pool.size(); + local_id = (++(instance()[gpu_id].current_interface)) % // increment interface + instance()[gpu_id].aggregation_executor_pool.size(); ret = - instance.aggregation_executor_pool[local_id].request_executor_slice(); + instance()[gpu_id].aggregation_executor_pool[local_id].request_executor_slice(); if (ret.has_value()) { return ret; } @@ -1044,12 +1079,15 @@ class aggregation_pool { } while (abort_counter <= abort_number); // Everything's busy -> create new aggregation executor (growing pool) OR // return empty optional - if (instance.growing_pool) { - instance.aggregation_executor_pool.emplace_back( - instance.slices_per_executor, instance.mode); - instance.current_interface = instance.aggregation_executor_pool.size() - 1; - assert(instance.aggregation_executor_pool.size() < 20480); - ret = instance.aggregation_executor_pool[instance.current_interface].request_executor_slice(); + if (instance()[gpu_id].growing_pool) { + instance()[gpu_id].aggregation_executor_pool.emplace_back( + instance()[gpu_id].slices_per_executor, instance()[gpu_id].mode, gpu_id); + instance()[gpu_id].current_interface = + instance()[gpu_id].aggregation_executor_pool.size() - 1; + assert(instance()[gpu_id].aggregation_executor_pool.size() < 20480); + ret = instance()[gpu_id] + .aggregation_executor_pool[instance()[gpu_id].current_interface] + .request_executor_slice(); assert(ret.has_value()); // fresh executor -- should always have slices // available } @@ -1066,9 +1104,15 @@ class aggregation_pool { private: /// Required for dealing with adding elements to the deque of /// aggregated_executors - static inline aggregation_mutex_t pool_mutex; + recycler::aggregation_mutex_t pool_mutex; /// Global access instance - static inline aggregation_pool instance{}; + static std::unique_ptr& instance(void) { + static std::unique_ptr pool_instances{ + new aggregation_pool[recycler::max_number_gpus]}; + return pool_instances; + } + static inline size_t number_devices = 1; + static inline bool is_initialized = false; aggregation_pool() = default; public: diff --git a/include/aligned_buffer_util.hpp b/include/aligned_buffer_util.hpp index 456420bc..d36a994a 100644 --- a/include/aligned_buffer_util.hpp +++ b/include/aligned_buffer_util.hpp @@ -10,6 +10,14 @@ #include namespace recycler { +namespace device_selection { +template +struct select_device_functor< + T, boost::alignment::aligned_allocator> { + void operator()(const size_t device_id) {} +}; +} // namespace device_selection + template ::value, int> = 0> using recycle_aligned = detail::recycle_allocator< diff --git a/include/buffer_manager.hpp b/include/buffer_manager.hpp index ddbffb70..92a5f46b 100644 --- a/include/buffer_manager.hpp +++ b/include/buffer_manager.hpp @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2021 Gregor Daiß +// Copyright (c) 2020-2023 Gregor Daiß // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -23,7 +23,7 @@ #ifndef CPPUDDLE_HAVE_HPX_AWARE_ALLOCATORS #pragma message \ "Warning: CPPuddle build with HPX support but without HPX-aware allocators enabled. \ -For better performance configure CPPuddle with the cmake option CPPUDDLE_WITH_HPX_AWARE_ALLOCATORS=ON !" +For better performance configure CPPuddle with CPPUDDLE_WITH_HPX_AWARE_ALLOCATORS=ON!" 
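// A specialization like the aligned-allocator one above is the intended hook for
// device-aware allocators: before buffer_manager creates a new buffer it invokes
// recycler::device_selection::select_device_functor<T, Host_Allocator>{}(device_id),
// and the generic template throws in multi-GPU builds unless such an overload exists.
// Below is a minimal sketch of an overload for the CUDA device allocator named in
// cuda_buffer_util.hpp further down in this patch; this exact specialization is not
// visible in this excerpt and is shown only to illustrate the customization point.

#include <cuda_runtime.h> // for cudaSetDevice (assumed available in CUDA builds)

namespace recycler {
namespace device_selection {
// Switch to the matching CUDA device before any allocation with this allocator.
template <typename T>
struct select_device_functor<T, detail::cuda_device_allocator<T>> {
  void operator()(const size_t device_id) {
    cudaSetDevice(static_cast<int>(device_id));
  }
};
} // namespace device_selection
} // namespace recycler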
#else // include runtime to get HPX thread IDs required for the HPX-aware allocators #include @@ -37,37 +37,53 @@ For better performance configure CPPuddle with the cmake option CPPUDDLE_WITH_HP #ifdef CPPUDDLE_HAVE_COUNTERS #include +#if defined(CPPUDDLE_HAVE_HPX) +#include #endif +#endif + +#include "../include/detail/config.hpp" namespace recycler { -constexpr size_t number_instances = 128; + +namespace device_selection { +template struct select_device_functor { + void operator()(const size_t device_id) { + if constexpr (max_number_gpus > 1) + throw std::runtime_error( + "Allocators used in Multi-GPU builds need explicit Multi-GPU support " + "(by having a select_device_functor overload"); + } +}; +template struct select_device_functor> { + void operator()(const size_t device_id) {} +}; +} // namespace device_selection + namespace detail { -#if defined(CPPUDDLE_HAVE_HPX) && defined(CPPUDDLE_HAVE_HPX_MUTEX) -using mutex_t = hpx::spinlock; -#else -using mutex_t = std::mutex; -#endif class buffer_recycler { - // Public interface public: #if defined(CPPUDDLE_DEACTIVATE_BUFFER_RECYCLING) // Warn about suboptimal performance without recycling #pragma message \ "Warning: Building without buffer recycling! Use only for performance testing! \ -For better performance configure CPPuddle with the cmake option CPPUDDLE_DEACTIVATE_BUFFER_RECYCLING=OFF !" +For better performance configure CPPuddle with CPPUDDLE_WITH_BUFFER_RECYCLING=ON!" template static T *get(size_t number_elements, bool manage_content_lifetime = false, - std::optional location_hint = std::nullopt) { + std::optional location_hint = std::nullopt, + std::optional device_id = std::nullopt) { + return Host_Allocator{}.allocate(number_elements); } /// Marks an buffer as unused and fit for reusage template static void mark_unused(T *p, size_t number_elements, - std::optional location_hint = std::nullopt) { + std::optional location_hint = std::nullopt, + std::optional device_id = std::nullopt) { return Host_Allocator{}.deallocate(p, number_elements); } #else @@ -75,17 +91,44 @@ For better performance configure CPPuddle with the cmake option CPPUDDLE_DEACTIV /// buffer template static T *get(size_t number_elements, bool manage_content_lifetime = false, - std::optional location_hint = std::nullopt) { - return buffer_manager::get(number_elements, - manage_content_lifetime, location_hint); + std::optional location_hint = std::nullopt, + std::optional device_id = std::nullopt) { + try { + return buffer_manager::get( + number_elements, manage_content_lifetime, location_hint, device_id); + } catch (const std::exception &exc) { + std::cerr << "ERROR: Encountered unhandled exception in cppuddle get: " << exc.what() << std::endl; + std::cerr << "Rethrowing exception... " << std::endl;; + throw; + } } /// Marks an buffer as unused and fit for reusage template static void mark_unused(T *p, size_t number_elements, - std::optional location_hint = std::nullopt) { - return buffer_manager::mark_unused(p, number_elements); + std::optional location_hint = std::nullopt, + std::optional device_id = std::nullopt) { + try { + return buffer_manager::mark_unused(p, number_elements, + location_hint, device_id); + } catch (const std::exception &exc) { + std::cerr << "ERROR: Encountered unhandled exception in cppuddle mark_unused: " << exc.what() << std::endl; + std::cerr << "Rethrowing exception... 
" << std::endl;; + throw; + } } #endif + template + static void register_allocator_counters_with_hpx(void) { +#ifdef CPPUDDLE_HAVE_COUNTERS + buffer_manager::register_counters_with_hpx(); +#else + std::cerr << "Warning: Trying to register allocator performance counters " + "with HPX but CPPuddle was built " + "without CPPUDDLE_WITH_COUNTERS -- operation will be ignored!" + << std::endl; +#endif + } + /// Deallocate all buffers, no matter whether they are marked as used or not static void clean_all() { std::lock_guard guard(instance().callback_protection_mut); @@ -111,6 +154,20 @@ For better performance configure CPPuddle with the cmake option CPPUDDLE_DEACTIV } } + static void print_performance_counters() { +#ifdef CPPUDDLE_HAVE_COUNTERS + std::lock_guard guard(instance().callback_protection_mut); + for (const auto &print_function : + instance().print_callbacks) { + print_function(); + } +#else + std::cerr << "Warning: Trying to print allocator performance counters but CPPuddle was built " + "without CPPUDDLE_WITH_COUNTERS -- operation will be ignored!" + << std::endl; +#endif + } + // Member variables and methods private: @@ -119,6 +176,8 @@ For better performance configure CPPuddle with the cmake option CPPUDDLE_DEACTIV static buffer_recycler singleton{}; return singleton; } + /// Callbacks for printing the performance counter data + std::list> print_callbacks; /// Callbacks for buffer_manager finalize - each callback completely destroys /// one buffer_manager std::list> finalize_callbacks; @@ -150,6 +209,12 @@ For better performance configure CPPuddle with the cmake option CPPUDDLE_DEACTIV std::lock_guard guard(instance().callback_protection_mut); instance().finalize_callbacks.push_back(func); } + /// Add a callback function that gets executed upon partial (unused memory) + /// cleanup + static void add_print_callback(const std::function &func) { + std::lock_guard guard(instance().callback_protection_mut); + instance().print_callbacks.push_back(func); + } public: ~buffer_recycler() = default; @@ -164,19 +229,27 @@ For better performance configure CPPuddle with the cmake option CPPUDDLE_DEACTIV // well using buffer_entry_type = std::tuple; + public: /// Cleanup and delete this singleton static void clean() { assert(instance() && !is_finalized); - for (auto i = 0; i < number_instances; i++) { + for (auto i = 0; i < number_instances * max_number_gpus; i++) { std::lock_guard guard(instance()[i].mut); instance()[i].clean_all_buffers(); } } + static void print_performance_counters() { + assert(instance() && !is_finalized); + for (auto i = 0; i < number_instances * max_number_gpus; i++) { + std::lock_guard guard(instance()[i].mut); + instance()[i].print_counters(); + } + } static void finalize() { assert(instance() && !is_finalized); is_finalized = true; - for (auto i = 0; i < number_instances; i++) { + for (auto i = 0; i < number_instances * max_number_gpus; i++) { std::lock_guard guard(instance()[i].mut); instance()[i].clean_all_buffers(); } @@ -185,7 +258,7 @@ For better performance configure CPPuddle with the cmake option CPPUDDLE_DEACTIV /// Cleanup all buffers not currently in use static void clean_unused_buffers_only() { assert(instance() && !is_finalized); - for (auto i = 0; i < number_instances; i++) { + for (auto i = 0; i < number_instances * max_number_gpus; i++) { std::lock_guard guard(instance()[i].mut); for (auto &buffer_tuple : instance()[i].unused_buffer_list) { Host_Allocator alloc; @@ -197,10 +270,85 @@ For better performance configure CPPuddle with the cmake option 
CPPUDDLE_DEACTIV instance()[i].unused_buffer_list.clear(); } } +#if defined(CPPUDDLE_HAVE_COUNTERS) && defined(CPPUDDLE_HAVE_HPX) + static size_t get_sum_number_recycling(bool reset) { + if (reset) + sum_number_recycling = 0; + return sum_number_recycling; + } + static size_t get_sum_number_allocation(bool reset) { + if (reset) + sum_number_allocation = 0; + return sum_number_allocation; + } + static size_t get_sum_number_creation(bool reset) { + if (reset) + sum_number_creation = 0; + return sum_number_creation; + } + static size_t get_sum_number_deallocation(bool reset) { + if (reset) + sum_number_deallocation = 0; + return sum_number_deallocation; + } + static size_t get_sum_number_wrong_hints(bool reset) { + if (reset) + sum_number_wrong_hints = 0; + return sum_number_wrong_hints; + } + static size_t get_sum_number_wrong_device_hints(bool reset) { + if (reset) + sum_number_wrong_hints = 0; + return sum_number_wrong_device_hints; + } + static size_t get_sum_number_bad_allocs(bool reset) { + if (reset) + sum_number_bad_allocs = 0; + return sum_number_bad_allocs; + } + + static void register_counters_with_hpx(void) { + std::string alloc_name = + boost::core::demangle(typeid(Host_Allocator).name()) + + std::string("_") + boost::core::demangle(typeid(T).name()); + hpx::performance_counters::install_counter_type( + std::string("/cppuddle/allocators/") + alloc_name + std::string("/number_recycling/"), + &get_sum_number_recycling, + "Number of allocations using a recycled buffer with this " + "allocator"); + hpx::performance_counters::install_counter_type( + std::string("/cppuddle/allocators/") + alloc_name + std::string("/number_allocations/"), + &get_sum_number_allocation, + "Number of allocations with this allocator"); + hpx::performance_counters::install_counter_type( + std::string("/cppuddle/allocators/") + alloc_name + std::string("/number_creations/"), + &get_sum_number_creation, + "Number of allocations not using a recycled buffer with this " + "allocator"); + hpx::performance_counters::install_counter_type( + std::string("/cppuddle/allocators/") + alloc_name + std::string("/number_deallocations/"), + &get_sum_number_deallocation, + "Number of deallocations yielding buffers to be recycled with this " + "allocator"); + hpx::performance_counters::install_counter_type( + std::string("/cppuddle/allocators/") + alloc_name + std::string("/number_wrong_hints/"), + &get_sum_number_wrong_hints, + "Number of wrong hints supplied to the dealloc method with this allocator"); + hpx::performance_counters::install_counter_type( + std::string("/cppuddle/allocators/") + alloc_name + std::string("/number_wrong_device_hints/"), + &get_sum_number_wrong_device_hints, + "Number of wrong device hints supplied to the dealloc method with this allocator"); + hpx::performance_counters::install_counter_type( + std::string("/cppuddle/allocators/") + alloc_name + std::string("/number_bad_allocs/"), + &get_sum_number_bad_allocs, + "Number of wrong bad allocs which triggered a cleanup of unused buffers"); + } +#endif /// Tries to recycle or create a buffer of type T and size number_elements. 
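/// The optional location_hint selects one of the number_instances buckets and the
/// optional gpu_device_id selects the device partition; both default to 0. Values
/// >= number_instances respectively >= max_number_gpus make get() throw a
/// std::runtime_error, and the bucket actually used is
/// location_hint + gpu_device_id * number_instances. The device-selection functor
/// is only invoked when no recycled buffer is found and a new one must be created.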
static T *get(size_t number_of_elements, bool manage_content_lifetime, - std::optional location_hint = std::nullopt) { + std::optional location_hint = std::nullopt, + std::optional gpu_device_id = std::nullopt) { init_callbacks_once(); if (is_finalized) { throw std::runtime_error("Tried allocation after finalization"); @@ -209,13 +357,28 @@ For better performance configure CPPuddle with the cmake option CPPUDDLE_DEACTIV size_t location_id = 0; if (location_hint) { - location_id = location_hint.value(); + location_id = *location_hint; } + if (location_id >= number_instances) { + throw std::runtime_error("Tried to create buffer with invalid location_id [get]"); + } + size_t device_id = 0; + if (gpu_device_id) { + device_id = *gpu_device_id; + } + if (device_id >= max_number_gpus) { + throw std::runtime_error("Tried to create buffer with invalid device id [get]! " + "Is multigpu support enabled with the correct number " + "of GPUs?"); + } + + location_id = location_id + device_id * number_instances; std::lock_guard guard(instance()[location_id].mut); #ifdef CPPUDDLE_HAVE_COUNTERS instance()[location_id].number_allocation++; + sum_number_allocation++; #endif // Check for unused buffers we can recycle: for (auto iter = instance()[location_id].unused_buffer_list.begin(); @@ -237,6 +400,7 @@ For better performance configure CPPuddle with the cmake option CPPUDDLE_DEACTIV instance()[location_id].buffer_map.insert({std::get<0>(tuple), tuple}); #ifdef CPPUDDLE_HAVE_COUNTERS instance()[location_id].number_recycling++; + sum_number_recycling++; #endif return std::get<0>(tuple); } @@ -244,6 +408,8 @@ For better performance configure CPPuddle with the cmake option CPPUDDLE_DEACTIV // No unused buffer found -> Create new one and return it try { + recycler::device_selection::select_device_functor{}( + device_id); Host_Allocator alloc; T *buffer = alloc.allocate(number_of_elements); instance()[location_id].buffer_map.insert( @@ -251,6 +417,7 @@ For better performance configure CPPuddle with the cmake option CPPUDDLE_DEACTIV manage_content_lifetime)}); #ifdef CPPUDDLE_HAVE_COUNTERS instance()[location_id].number_creation++; + sum_number_creation++; #endif if (manage_content_lifetime) { std::uninitialized_value_construct_n(buffer, number_of_elements); @@ -258,20 +425,26 @@ For better performance configure CPPuddle with the cmake option CPPUDDLE_DEACTIV return buffer; } catch (std::bad_alloc &e) { // not enough memory left! Cleanup and attempt again: - std::cerr << "Not enough memory left. Cleaning up unused buffers now..." << std::endl; + std::cerr + << "Not enough memory left. Cleaning up unused buffers now..." + << std::endl; buffer_recycler::clean_unused_buffers(); std::cerr << "Buffers cleaned! Try allocation again..." << std::endl; // If there still isn't enough memory left, the caller has to handle it // We've done all we can in here Host_Allocator alloc; + recycler::device_selection::select_device_functor{}( + device_id); T *buffer = alloc.allocate(number_of_elements); instance()[location_id].buffer_map.insert( {buffer, std::make_tuple(buffer, number_of_elements, 1, manage_content_lifetime)}); #ifdef CPPUDDLE_HAVE_COUNTERS instance()[location_id].number_creation++; + sum_number_creation++; instance()[location_id].number_bad_alloc++; + sum_number_bad_allocs++; #endif std::cerr << "Second attempt allocation successful!" 
<< std::endl; if (manage_content_lifetime) { @@ -282,18 +455,38 @@ For better performance configure CPPuddle with the cmake option CPPUDDLE_DEACTIV } static void mark_unused(T *memory_location, size_t number_of_elements, - std::optional location_hint = std::nullopt) { + std::optional location_hint = std::nullopt, + std::optional device_hint = std::nullopt) { if (is_finalized) return; assert(instance() && !is_finalized); + size_t location_id = 0; + if (location_hint) { + location_id = *location_hint; + if (location_id >= number_instances) { + throw std::runtime_error( + "Buffer recylcer received invalid location hint [mark_unused]"); + } + } + size_t device_id = 0; + if (device_hint) { + device_id = *device_hint; + if (device_id >= max_number_gpus) { + throw std::runtime_error( + "Buffer recylcer received invalid devce hint [mark_unused]"); + } + } + + // Attempt 1 to find the correct bucket/location: Look at provided hint: if (location_hint) { - size_t location_id = location_hint.value(); + size_t location_id = location_hint.value() + device_id * number_instances; std::lock_guard guard(instance()[location_id].mut); if (instance()[location_id].buffer_map.find(memory_location) != instance()[location_id].buffer_map.end()) { #ifdef CPPUDDLE_HAVE_COUNTERS - instance()[location_id].number_dealloacation++; + instance()[location_id].number_deallocation++; + sum_number_deallocation++; #endif auto it = instance()[location_id].buffer_map.find(memory_location); assert(it != instance()[location_id].buffer_map.end()); @@ -305,24 +498,27 @@ For better performance configure CPPuddle with the cmake option CPPUDDLE_DEACTIV instance()[location_id].buffer_map.erase(memory_location); return; // Success } - // hint was wrong - note that, and continue on with all other buffer - // managers + // hint was wrong #ifdef CPPUDDLE_HAVE_COUNTERS instance()[location_id].number_wrong_hints++; + sum_number_wrong_hints++; #endif } - - for(size_t location_id = 0; location_id < number_instances; location_id++) { + // Failed to find buffer in the specified localtion/device! + // Attempt 2 - Look for buffer other locations on the same device... + for (size_t location_id = device_id * number_instances; + location_id < (device_id + 1) * number_instances; location_id++) { if (location_hint) { - if (location_hint.value() == location_id) { - continue; // already tried this -> skip - } + if (*location_hint + device_id * max_number_gpus == location_id) { + continue; // already tried this -> skip + } } std::lock_guard guard(instance()[location_id].mut); if (instance()[location_id].buffer_map.find(memory_location) != instance()[location_id].buffer_map.end()) { #ifdef CPPUDDLE_HAVE_COUNTERS - instance()[location_id].number_dealloacation++; + instance()[location_id].number_deallocation++; + sum_number_deallocation++; #endif auto it = instance()[location_id].buffer_map.find(memory_location); assert(it != instance()[location_id].buffer_map.end()); @@ -335,6 +531,70 @@ For better performance configure CPPuddle with the cmake option CPPUDDLE_DEACTIV return; // Success } } + // device hint was wrong +#ifdef CPPUDDLE_HAVE_COUNTERS + if (device_hint) { + sum_number_wrong_device_hints++; + } +#endif + // Failed to find buffer on the specified device! + // Attempt 3 - Look for buffer on other devices... 
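// Attempt 3 scans the remaining devices: for each other device it first retries
// the caller's location hint (shifted into that device's bucket range) and then
// walks all of that device's remaining buckets. If the buffer is not found on any
// device either, control falls through to the "tried to delete non-existing
// buffer" warning at the end of this function.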
+ for (size_t local_device_id = 0; local_device_id < max_number_gpus; + local_device_id++) { + if (local_device_id == device_id) + continue; // aldready tried this device + + // Try hint localtion first yet again (though on different device) + if (location_hint) { + size_t location_id = location_hint.value() + local_device_id * number_instances; + std::lock_guard guard(instance()[location_id].mut); + if (instance()[location_id].buffer_map.find(memory_location) != + instance()[location_id].buffer_map.end()) { +#ifdef CPPUDDLE_HAVE_COUNTERS + instance()[location_id].number_deallocation++; + sum_number_deallocation++; +#endif + auto it = instance()[location_id].buffer_map.find(memory_location); + assert(it != instance()[location_id].buffer_map.end()); + auto &tuple = it->second; + // sanity checks: + assert(std::get<1>(tuple) == number_of_elements); + // move to the unused_buffer list + instance()[location_id].unused_buffer_list.push_front(tuple); + instance()[location_id].buffer_map.erase(memory_location); + return; // Success + } + } + // Failed - check all other localtions on device + for (size_t location_id = local_device_id * number_instances; + location_id < (local_device_id + 1) * number_instances; location_id++) { + if (location_hint) { + if (*location_hint + local_device_id * max_number_gpus == location_id) { + continue; // already tried this -> skip + } + } + std::lock_guard guard(instance()[location_id].mut); + if (instance()[location_id].buffer_map.find(memory_location) != + instance()[location_id].buffer_map.end()) { +#ifdef CPPUDDLE_HAVE_COUNTERS + instance()[location_id].number_deallocation++; + sum_number_deallocation++; +#endif + auto it = instance()[location_id].buffer_map.find(memory_location); + assert(it != instance()[location_id].buffer_map.end()); + auto &tuple = it->second; + // sanity checks: + assert(std::get<1>(tuple) == number_of_elements); + // move to the unused_buffer list + instance()[location_id].unused_buffer_list.push_front(tuple); + instance()[location_id].buffer_map.erase(memory_location); + return; // Success + } + } + } + // Buffer that is to be deleted is nowhere to be found - we looked everywhere! + // => + // Failure! Handle here... // TODO Throw exception instead in the futures, as soon as the recycler finalize is // in all user codes @@ -348,7 +608,7 @@ For better performance configure CPPuddle with the cmake option CPPUDDLE_DEACTIV << "Warning! Tried to delete non-existing buffer within CPPuddle!" << std::endl; std::cerr << "Did you forget to call recycler::finalize?" 
<< std::endl; - } + } private: /// List with all buffers still in usage @@ -359,20 +619,24 @@ For better performance configure CPPuddle with the cmake option CPPUDDLE_DEACTIV mutex_t mut; #ifdef CPPUDDLE_HAVE_COUNTERS /// Performance counters - size_t number_allocation{0}, number_dealloacation{0}, number_wrong_hints{0}; - size_t number_recycling{0}, number_creation{0}, number_bad_alloc{0}; + size_t number_allocation{0}, number_deallocation{0}, number_wrong_hints{0}, + number_recycling{0}, number_creation{0}, number_bad_alloc{0}; + + static inline std::atomic sum_number_allocation{0}, + sum_number_deallocation{0}, sum_number_wrong_hints{0}, + sum_number_wrong_device_hints{0}, sum_number_recycling{0}, + sum_number_creation{0}, sum_number_bad_allocs{0}; #endif - /// default, private constructor - not automatically constructed due to the - /// deleted constructors - buffer_manager() = default; + /// default, private constructor - not automatically constructed due to + /// the deleted constructors + buffer_manager() = default; buffer_manager& operator=(buffer_manager const &other) = default; buffer_manager& operator=(buffer_manager &&other) = delete; static std::unique_ptr& instance(void) { - /* static std::array instances{{}}; */ static std::unique_ptr instances{ - new buffer_manager[number_instances]}; + new buffer_manager[number_instances * max_number_gpus]}; return instances; } static void init_callbacks_once(void) { @@ -390,35 +654,18 @@ For better performance configure CPPuddle with the cmake option CPPUDDLE_DEACTIV clean_unused_buffers_only); buffer_recycler::add_finalize_callback( finalize); +#ifdef CPPUDDLE_HAVE_COUNTERS + buffer_recycler::add_print_callback( + print_performance_counters); +#endif }); } static inline std::atomic is_finalized; - - void clean_all_buffers(void) { #ifdef CPPUDDLE_HAVE_COUNTERS - if (number_allocation == 0 && number_recycling == 0 && - number_bad_alloc == 0 && number_creation == 0 && - unused_buffer_list.empty() && buffer_map.empty()) { + void print_counters(void) { + if (number_allocation == 0) return; - } -#endif - for (auto &buffer_tuple : unused_buffer_list) { - Host_Allocator alloc; - if (std::get<3>(buffer_tuple)) { - std::destroy_n(std::get<0>(buffer_tuple), std::get<1>(buffer_tuple)); - } - alloc.deallocate(std::get<0>(buffer_tuple), std::get<1>(buffer_tuple)); - } - for (auto &map_tuple : buffer_map) { - auto buffer_tuple = map_tuple.second; - Host_Allocator alloc; - if (std::get<3>(buffer_tuple)) { - std::destroy_n(std::get<0>(buffer_tuple), std::get<1>(buffer_tuple)); - } - alloc.deallocate(std::get<0>(buffer_tuple), std::get<1>(buffer_tuple)); - } -#ifdef CPPUDDLE_HAVE_COUNTERS // Print performance counters size_t number_cleaned = unused_buffer_list.size() + buffer_map.size(); std::cout << "\nBuffer manager destructor for (Alloc: " @@ -453,7 +700,32 @@ For better performance configure CPPuddle with the cmake option CPPUDDLE_DEACTIV << static_cast(number_recycling) / number_allocation * 100.0f << "%" << std::endl; + } +#endif + + void clean_all_buffers(void) { +#ifdef CPPUDDLE_HAVE_COUNTERS + if (number_allocation == 0 && number_recycling == 0 && + number_bad_alloc == 0 && number_creation == 0 && + unused_buffer_list.empty() && buffer_map.empty()) { + return; + } #endif + for (auto &buffer_tuple : unused_buffer_list) { + Host_Allocator alloc; + if (std::get<3>(buffer_tuple)) { + std::destroy_n(std::get<0>(buffer_tuple), std::get<1>(buffer_tuple)); + } + alloc.deallocate(std::get<0>(buffer_tuple), std::get<1>(buffer_tuple)); + } + for (auto 
&map_tuple : buffer_map) { + auto buffer_tuple = map_tuple.second; + Host_Allocator alloc; + if (std::get<3>(buffer_tuple)) { + std::destroy_n(std::get<0>(buffer_tuple), std::get<1>(buffer_tuple)); + } + alloc.deallocate(std::get<0>(buffer_tuple), std::get<1>(buffer_tuple)); + } unused_buffer_list.clear(); buffer_map.clear(); #ifdef CPPUDDLE_HAVE_COUNTERS @@ -489,16 +761,19 @@ For better performance configure CPPuddle with the cmake option CPPUDDLE_DEACTIV template struct recycle_allocator { using value_type = T; + using underlying_allocator_type = Host_Allocator; + static_assert(std::is_same_v); const std::optional dealloc_hint; + const std::optional device_id; #ifndef CPPUDDLE_HAVE_HPX_AWARE_ALLOCATORS recycle_allocator() noexcept - : dealloc_hint(std::nullopt) {} + : dealloc_hint(std::nullopt), device_id(std::nullopt) {} explicit recycle_allocator(size_t hint) noexcept - : dealloc_hint(std::nullopt) {} + : dealloc_hint(std::nullopt), device_id(std::nullopt) {} explicit recycle_allocator( recycle_allocator const &other) noexcept - : dealloc_hint(std::nullopt) {} + : dealloc_hint(std::nullopt), device_id(std::nullopt) {} T *allocate(std::size_t n) { T *data = buffer_recycler::get(n); return data; @@ -508,19 +783,22 @@ template struct recycle_allocator { } #else recycle_allocator() noexcept - : dealloc_hint(hpx::get_worker_thread_num()) {} - explicit recycle_allocator(size_t hint) noexcept - : dealloc_hint(hint) {} + : dealloc_hint(hpx::get_worker_thread_num() % number_instances), device_id(0) {} + explicit recycle_allocator(const size_t device_id) noexcept + : dealloc_hint(hpx::get_worker_thread_num() % number_instances), device_id(device_id) {} + explicit recycle_allocator(const size_t device_i, const size_t location_id) noexcept + : dealloc_hint(location_id), device_id(device_id) {} explicit recycle_allocator( recycle_allocator const &other) noexcept - : dealloc_hint(other.dealloc_hint) {} + : dealloc_hint(other.dealloc_hint), device_id(other.device_id) {} T *allocate(std::size_t n) { T *data = buffer_recycler::get( - n, false, hpx::get_worker_thread_num()); + n, false, hpx::get_worker_thread_num() % number_instances, device_id); return data; } void deallocate(T *p, std::size_t n) { - buffer_recycler::mark_unused(p, n, dealloc_hint); + buffer_recycler::mark_unused(p, n, dealloc_hint, + device_id); } #endif @@ -553,16 +831,19 @@ operator!=(recycle_allocator const &, template struct aggressive_recycle_allocator { using value_type = T; - std::optional dealloc_hint; + using underlying_allocator_type = Host_Allocator; + static_assert(std::is_same_v); + const std::optional dealloc_hint; + const std::optional device_id; #ifndef CPPUDDLE_HAVE_HPX_AWARE_ALLOCATORS aggressive_recycle_allocator() noexcept - : dealloc_hint(std::nullopt) {} + : dealloc_hint(std::nullopt), device_id(std::nullopt) {} explicit aggressive_recycle_allocator(size_t hint) noexcept - : dealloc_hint(std::nullopt) {} + : dealloc_hint(std::nullopt), device_id(std::nullopt) {} explicit aggressive_recycle_allocator( aggressive_recycle_allocator const &) noexcept - : dealloc_hint(std::nullopt) {} + : dealloc_hint(std::nullopt), device_id(std::nullopt) {} T *allocate(std::size_t n) { T *data = buffer_recycler::get( n, true); // also initializes the buffer if it isn't reused @@ -573,20 +854,23 @@ struct aggressive_recycle_allocator { } #else aggressive_recycle_allocator() noexcept - : dealloc_hint(hpx::get_worker_thread_num()) {} - explicit aggressive_recycle_allocator(size_t hint) noexcept - : dealloc_hint(hint) {} + : 
dealloc_hint(hpx::get_worker_thread_num() % number_instances), device_id(0) {} + explicit aggressive_recycle_allocator(const size_t device_id) noexcept + : dealloc_hint(hpx::get_worker_thread_num() % number_instances), device_id(device_id) {} + explicit aggressive_recycle_allocator(const size_t device_id, const size_t location_id) noexcept + : dealloc_hint(location_id), device_id(device_id) {} explicit aggressive_recycle_allocator( recycle_allocator const &other) noexcept - : dealloc_hint(other.dealloc_hint) {} + : dealloc_hint(other.dealloc_hint), device_id(other.device_id) {} T *allocate(std::size_t n) { T *data = buffer_recycler::get( - n, true, hpx::get_worker_thread_num()); // also initializes the buffer + n, true, dealloc_hint, device_id); // also initializes the buffer // if it isn't reused return data; } void deallocate(T *p, std::size_t n) { - buffer_recycler::mark_unused(p, n, dealloc_hint); + buffer_recycler::mark_unused(p, n, dealloc_hint, + device_id); } #endif @@ -603,7 +887,7 @@ struct aggressive_recycle_allocator { // Warn about suboptimal performance without recycling #pragma message \ "Warning: Building without content reusage for aggressive allocators! \ -For better performance configure with the cmake option CPPUDDLE_DEACTIVATE_AGGRESSIVE_ALLOCATORS=OFF !" +For better performance configure with CPPUDDLE_WITH_AGGRESSIVE_CONTENT_RECYCLING=ON !" template inline void construct(T *p, Args... args) noexcept { ::new (static_cast(p)) T(std::forward(args)...); @@ -639,6 +923,7 @@ template ::value, int> = 0> using aggressive_recycle_std = detail::aggressive_recycle_allocator>; +inline void print_performance_counters() { detail::buffer_recycler::print_performance_counters(); } /// Deletes all buffers (even ones still marked as used), delete the buffer /// managers and the recycler itself inline void force_cleanup() { detail::buffer_recycler::clean_all(); } diff --git a/include/cuda_buffer_util.hpp b/include/cuda_buffer_util.hpp index d2d0f596..55d3397a 100644 --- a/include/cuda_buffer_util.hpp +++ b/include/cuda_buffer_util.hpp @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2021 Gregor Daiß +// Copyright (c) 2020-2023 Gregor Daiß // // Distributed under the Boost Software License, Version 1.0. 
(See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -7,6 +7,7 @@ #define CUDA_BUFFER_UTIL_HPP #include "buffer_manager.hpp" +#include "detail/config.hpp" #include #include @@ -16,6 +17,8 @@ namespace recycler { namespace detail { + + template struct cuda_pinned_allocator { using value_type = T; cuda_pinned_allocator() noexcept = default; @@ -45,6 +48,7 @@ template struct cuda_pinned_allocator { } } }; + template constexpr bool operator==(cuda_pinned_allocator const &, cuda_pinned_allocator const &) noexcept { @@ -95,6 +99,7 @@ constexpr bool operator!=(cuda_device_allocator const &, return false; } + } // end namespace detail template ::value, int> = 0> @@ -106,39 +111,18 @@ using recycle_allocator_cuda_device = template ::value, int> = 0> struct cuda_device_buffer { - size_t gpu_id{0}; + recycle_allocator_cuda_device allocator; T *device_side_buffer; size_t number_of_elements; - explicit cuda_device_buffer(size_t number_of_elements) - : number_of_elements(number_of_elements) { - device_side_buffer = - recycle_allocator_cuda_device{}.allocate(number_of_elements); - } - explicit cuda_device_buffer(size_t number_of_elements, size_t gpu_id) - : gpu_id(gpu_id), number_of_elements(number_of_elements), set_id(true) { -#if defined(CPPUDDLE_HAVE_MULTIGPU) - cudaSetDevice(gpu_id); -#else - // TODO It would be better to have separate method for this but it would change the interface - // This will have to do for some testing. If it's worth it, add separate method without cudaSetDevice - // Allows for testing without any changes to other projects - assert(gpu_id == 0); -#endif + + cuda_device_buffer(const size_t number_of_elements, const size_t device_id = 0) + : allocator{device_id}, number_of_elements(number_of_elements) { + assert(device_id < max_number_gpus); device_side_buffer = - recycle_allocator_cuda_device{}.allocate(number_of_elements); + allocator.allocate(number_of_elements); } ~cuda_device_buffer() { -#if defined(CPPUDDLE_HAVE_MULTIGPU) - if (set_id) - cudaSetDevice(gpu_id); -#else - // TODO It would be better to have separate method for this but it would change the interface - // This will have to do for some testing. 
If it's worth it, add separate method without cudaSetDevice - // Allows for testing without any changes to other projects - assert(gpu_id == 0); -#endif - recycle_allocator_cuda_device{}.deallocate(device_side_buffer, - number_of_elements); + allocator.deallocate(device_side_buffer, number_of_elements); } // not yet implemented cuda_device_buffer(cuda_device_buffer const &other) = delete; @@ -146,45 +130,19 @@ struct cuda_device_buffer { cuda_device_buffer(cuda_device_buffer const &&other) = delete; cuda_device_buffer operator=(cuda_device_buffer const &&other) = delete; -private: - bool set_id{false}; }; template ::value, int> = 0> struct cuda_aggregated_device_buffer { - size_t gpu_id{0}; T *device_side_buffer; size_t number_of_elements; - explicit cuda_aggregated_device_buffer(size_t number_of_elements) - : number_of_elements(number_of_elements) { - device_side_buffer = - recycle_allocator_cuda_device{}.allocate(number_of_elements); - } - explicit cuda_aggregated_device_buffer(size_t number_of_elements, size_t gpu_id, Host_Allocator &alloc) - : gpu_id(gpu_id), number_of_elements(number_of_elements), set_id(true), alloc(alloc) { -#if defined(CPPUDDLE_HAVE_MULTIGPU) - cudaSetDevice(gpu_id); -#else - // TODO It would be better to have separate method for this but it would change the interface - // This will have to do for some testing. If it's worth it, add separate method without cudaSetDevice - // Allows for testing without any changes to other projects - assert(gpu_id == 0); -#endif + cuda_aggregated_device_buffer(size_t number_of_elements, Host_Allocator &alloc) + : number_of_elements(number_of_elements), alloc(alloc) { device_side_buffer = alloc.allocate(number_of_elements); } ~cuda_aggregated_device_buffer() { -#if defined(CPPUDDLE_HAVE_MULTIGPU) - if (set_id) - cudaSetDevice(gpu_id); -#else - // TODO It would be better to have separate method for this but it would change the interface - // This will have to do for some testing. If it's worth it, add separate method without cudaSetDevice - // Allows for testing without any changes to other projects - assert(gpu_id == 0); -#endif - alloc.deallocate(device_side_buffer, - number_of_elements); + alloc.deallocate(device_side_buffer, number_of_elements); } // not yet implemented cuda_aggregated_device_buffer(cuda_aggregated_device_buffer const &other) = delete; @@ -193,9 +151,20 @@ struct cuda_aggregated_device_buffer { cuda_aggregated_device_buffer operator=(cuda_aggregated_device_buffer const &&other) = delete; private: - bool set_id{false}; - Host_Allocator &alloc; + Host_Allocator &alloc; // will stay valid for the entire aggregation region and hence + // for the entire lifetime of this buffer +}; + +namespace device_selection { +template +struct select_device_functor> { + void operator()(const size_t device_id) { cudaSetDevice(device_id); } +}; +template +struct select_device_functor> { + void operator()(const size_t device_id) { cudaSetDevice(device_id); } }; +} // namespace device_selection } // end namespace recycler #endif diff --git a/include/detail/config.hpp b/include/detail/config.hpp new file mode 100644 index 00000000..1764ffff --- /dev/null +++ b/include/detail/config.hpp @@ -0,0 +1,74 @@ +// Copyright (c) 2023-2023 Gregor Daiß +// +// Distributed under the Boost Software License, Version 1.0. 
(See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +#ifndef CPPUDDLE_CONFIG_HPP +#define CPPUDDLE_CONFIG_HPP + + +// Mutex configuration +// +#if defined(CPPUDDLE_HAVE_HPX) && defined(CPPUDDLE_HAVE_HPX_MUTEX) +#include +#else +#include +#endif + +// HPX-aware configuration +// +#ifdef CPPUDDLE_HAVE_HPX +#ifndef CPPUDDLE_HAVE_HPX_AWARE_ALLOCATORS +#pragma message \ +"Warning: CPPuddle build with HPX support but without HPX-aware allocators enabled. \ +For better performance configure CPPuddle with CPPUDDLE_WITH_HPX_AWARE_ALLOCATORS=ON!" +#else +// include runtime to get HPX thread IDs required for the HPX-aware allocators +#include +#endif +#endif + +namespace recycler { + +#if defined(CPPUDDLE_HAVE_HPX) && defined(CPPUDDLE_HAVE_HPX_MUTEX) +using mutex_t = hpx::spinlock_no_backoff; +using aggregation_mutex_t = hpx::mutex; +#else +using mutex_t = std::mutex; +using aggregation_mutex_t = std::mutex; +#endif + +// Recycling configuration +// TODO Add warnings here + +// Aggressive recycling configuration +// TODO Add warning here + +// Aggregation Debug configuration +// TODO Add warning here + +// Thread and MultiGPU configuration +// +constexpr size_t number_instances = CPPUDDLE_HAVE_NUMBER_BUCKETS; +static_assert(number_instances >= 1); +constexpr size_t max_number_gpus = CPPUDDLE_HAVE_MAX_NUMBER_GPUS; +#ifndef CPPUDDLE_HAVE_HPX +static_assert(max_number_gpus == 1, "Non HPX builds do not support multigpu"); +#endif +//static_assert(number_instances >= max_number_gpus); +static_assert(max_number_gpus > 0); +//constexpr size_t instances_per_gpu = number_instances / max_number_gpus; + +/// Uses HPX thread information to determine which GPU should be used +inline size_t get_device_id(const size_t number_gpus) { +#if defined(CPPUDDLE_HAVE_HPX) + assert(number_gpus <= max_number_gpus); + return hpx::get_worker_thread_num() % number_gpus; +#else + return 0; +#endif +} + +} // end namespace recycler + +#endif diff --git a/include/hip_buffer_util.hpp b/include/hip_buffer_util.hpp index 5a4209c1..e2364095 100644 --- a/include/hip_buffer_util.hpp +++ b/include/hip_buffer_util.hpp @@ -113,28 +113,18 @@ using recycle_allocator_hip_device = // TODO Is this even required? (cuda version should work fine...) 
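For context, the new device-id plumbing above (detail/config.hpp together with the reworked cuda_device_buffer) is meant to be used roughly as follows. This is a minimal usage sketch, not part of the patch: it assumes a CUDA build of CPPuddle, and the function name sketch_device_allocation and the element count are purely illustrative.

// Usage sketch (hypothetical, not part of the patch): pick a GPU for the
// current worker and allocate a recycled device buffer on it.
#include <cstddef>

#include "buffer_manager.hpp"
#include "cuda_buffer_util.hpp"   // pulls in detail/config.hpp as of this patch

void sketch_device_allocation(std::size_t n_elements) {
  // Maps the calling HPX worker onto one of the configured GPUs;
  // returns 0 in non-HPX builds (see recycler::get_device_id above).
  const std::size_t device_id =
      recycler::get_device_id(recycler::max_number_gpus);

  // The buffer now carries a device-aware allocator instead of calling
  // cudaSetDevice itself; the device id is simply passed to the constructor.
  recycler::cuda_device_buffer<double> device_buf(n_elements, device_id);

  // ... launch kernels on device_buf.device_side_buffer ...
}   // destructor returns the allocation to the recycler for this device
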
template ::value, int> = 0> struct hip_device_buffer { - size_t gpu_id{0}; + recycle_allocator_hip_device allocator; T *device_side_buffer; size_t number_of_elements; - explicit hip_device_buffer(size_t number_of_elements) - : number_of_elements(number_of_elements) { - device_side_buffer = - recycle_allocator_hip_device{}.allocate(number_of_elements); - } - explicit hip_device_buffer(size_t number_of_elements, size_t gpu_id) - : gpu_id(gpu_id), number_of_elements(number_of_elements), set_id(true) { - // TODO Fix Multi GPU support - // hipSetDevice(gpu_id); + hip_device_buffer(size_t number_of_elements, size_t device_id) + : allocator{device_id}, number_of_elements(number_of_elements) { + assert(device_id < max_number_gpus); device_side_buffer = - recycle_allocator_hip_device{}.allocate(number_of_elements); + allocator.allocate(number_of_elements); } ~hip_device_buffer() { - // TODO Fix Multi GPU support - // if (set_id) - // hipSetDevice(gpu_id); - recycle_allocator_hip_device{}.deallocate(device_side_buffer, - number_of_elements); + allocator.deallocate(device_side_buffer, number_of_elements); } // not yet implemented hip_device_buffer(hip_device_buffer const &other) = delete; @@ -142,45 +132,19 @@ struct hip_device_buffer { hip_device_buffer(hip_device_buffer const &&other) = delete; hip_device_buffer operator=(hip_device_buffer const &&other) = delete; -private: - bool set_id{false}; }; template ::value, int> = 0> struct hip_aggregated_device_buffer { - size_t gpu_id{0}; T *device_side_buffer; size_t number_of_elements; - explicit hip_aggregated_device_buffer(size_t number_of_elements) - : number_of_elements(number_of_elements) { - device_side_buffer = - recycle_allocator_hip_device{}.allocate(number_of_elements); - } - explicit hip_aggregated_device_buffer(size_t number_of_elements, size_t gpu_id, Host_Allocator &alloc) - : gpu_id(gpu_id), number_of_elements(number_of_elements), set_id(true), alloc(alloc) { -#if defined(CPPUDDLE_HAVE_MULTIGPU) - hipSetDevice(gpu_id); -#else - // TODO It would be better to have separate method for this but it would change the interface - // This will have to do for some testing. If it's worth it, add separate method without hipSetDevice - // Allows for testing without any changes to other projects - assert(gpu_id == 0); -#endif + hip_aggregated_device_buffer(size_t number_of_elements, Host_Allocator &alloc) + : number_of_elements(number_of_elements), alloc(alloc) { device_side_buffer = alloc.allocate(number_of_elements); } ~hip_aggregated_device_buffer() { -#if defined(CPPUDDLE_HAVE_MULTIGPU) - if (set_id) - hipSetDevice(gpu_id); -#else - // TODO It would be better to have separate method for this but it would change the interface - // This will have to do for some testing. 
If it's worth it, add separate method without hipSetDevice - // Allows for testing without any changes to other projects - assert(gpu_id == 0); -#endif - alloc.deallocate(device_side_buffer, - number_of_elements); + alloc.deallocate(device_side_buffer, number_of_elements); } // not yet implemented hip_aggregated_device_buffer(hip_aggregated_device_buffer const &other) = delete; @@ -189,9 +153,20 @@ struct hip_aggregated_device_buffer { hip_aggregated_device_buffer operator=(hip_aggregated_device_buffer const &&other) = delete; private: - bool set_id{false}; - Host_Allocator &alloc; + Host_Allocator &alloc; // will stay valid for the entire aggregation region and hence + // for the entire lifetime of this buffer +}; + +namespace device_selection { +template +struct select_device_functor> { + void operator()(const size_t device_id) { hipSetDevice(device_id); } +}; +template +struct select_device_functor> { + void operator()(const size_t device_id) { hipSetDevice(device_id); } }; +} // namespace device_selection } // end namespace recycler #endif diff --git a/include/kokkos_buffer_util.hpp b/include/kokkos_buffer_util.hpp index f78f6bbb..2945b422 100644 --- a/include/kokkos_buffer_util.hpp +++ b/include/kokkos_buffer_util.hpp @@ -7,6 +7,8 @@ #define KOKKOS_BUFFER_UTIL_HPP #include #include +#include +#include namespace recycler { @@ -14,7 +16,7 @@ template struct view_deleter { alloc_type allocator; size_t total_elements; - view_deleter(alloc_type &alloc, size_t total_elements) : allocator(alloc), + view_deleter(alloc_type alloc, size_t total_elements) : allocator(alloc), total_elements(total_elements) {} void operator()(element_type* p) { allocator.deallocate(p, total_elements); @@ -27,6 +29,7 @@ class aggregated_recycled_view : public kokkos_type { alloc_type allocator; size_t total_elements{0}; std::shared_ptr data_ref_counter; + static_assert(std::is_same_v); public: using view_type = kokkos_type; @@ -77,25 +80,52 @@ class aggregated_recycled_view : public kokkos_type { ~aggregated_recycled_view() {} }; + template class recycled_view : public kokkos_type { private: - static alloc_type allocator; size_t total_elements{0}; std::shared_ptr data_ref_counter; public: using view_type = kokkos_type; - template - explicit recycled_view(Args... args) + static_assert(std::is_same_v); + template = true> + recycled_view(Args... args) + : kokkos_type( + alloc_type{}.allocate(kokkos_type::required_allocation_size(args...) / + sizeof(element_type)), + args...), + total_elements(kokkos_type::required_allocation_size(args...) / + sizeof(element_type)), + data_ref_counter(this->data(), view_deleter( + alloc_type{}, total_elements)) {} + + template = true> + recycled_view(const size_t device_id, Args... args) : kokkos_type( - allocator.allocate(kokkos_type::required_allocation_size(args...) / + alloc_type{device_id}.allocate(kokkos_type::required_allocation_size(args...) / sizeof(element_type)), args...), total_elements(kokkos_type::required_allocation_size(args...) 
/ sizeof(element_type)), data_ref_counter(this->data(), view_deleter( - allocator, total_elements)) {} + alloc_type{device_id}, total_elements)) {} + + template < + typename layout_t, + std::enable_if_t::value, bool> = true> + recycled_view(std::size_t device_id, layout_t layout) + : kokkos_type( + alloc_type{device_id}.allocate(kokkos_type::required_allocation_size(layout) / + sizeof(element_type)), + layout), + total_elements(kokkos_type::required_allocation_size(layout) / + sizeof(element_type)), + data_ref_counter(this->data(), view_deleter( + alloc_type{device_id}, total_elements)) {} recycled_view( const recycled_view &other) @@ -110,7 +140,6 @@ class recycled_view : public kokkos_type { data_ref_counter = other.data_ref_counter; kokkos_type::operator=(other); total_elements = other.total_elements; - allocator.increase_usage_counter(other.data(), other.total_elements); return *this; } @@ -132,8 +161,6 @@ class recycled_view : public kokkos_type { ~recycled_view() { } }; -template -alloc_type recycled_view::allocator; } // end namespace recycler diff --git a/include/stream_manager.hpp b/include/stream_manager.hpp index 1d48bcd6..40631491 100644 --- a/include/stream_manager.hpp +++ b/include/stream_manager.hpp @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2021 Gregor Daiß +// Copyright (c) 2020-2023 Gregor Daiß // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -7,28 +7,25 @@ #define STREAM_MANAGER_HPP #include +#include #include #include #include #include #include #include +#include #include -#if defined(CPPUDDLE_HAVE_HPX) && defined(CPPUDDLE_HAVE_HPX_MUTEX) -// For builds with The HPX mutex -#include -#endif - -#if defined(CPPUDDLE_HAVE_HPX) && defined(CPPUDDLE_HAVE_HPX_MUTEX) -using mutex_t = hpx::spinlock; -#else -using mutex_t = std::mutex; -#endif +#include "../include/detail/config.hpp" -//#include -// #include -// #include +/// Turns a std::array_mutex into an scoped lock +template +auto make_scoped_lock_from_array(mutex_array_t& mutexes) +{ + return std::apply([](auto&... mutexes) { return std::scoped_lock{mutexes...}; }, + mutexes); +} template class round_robin_pool { private: @@ -38,15 +35,16 @@ template class round_robin_pool { public: template - explicit round_robin_pool(size_t number_of_streams, Ts &&... executor_args) { + round_robin_pool(size_t number_of_streams, Ts... executor_args) { ref_counters.reserve(number_of_streams); for (int i = 0; i < number_of_streams; i++) { - pool.emplace_back(std::forward(executor_args)...); + pool.emplace_back(executor_args...); ref_counters.emplace_back(0); } } // return a tuple with the interface and its index (to release it later) std::tuple get_interface() { + assert(!(pool.empty())); size_t last_interface = current_interface; current_interface = (current_interface + 1) % pool.size(); ref_counters[last_interface]++; @@ -62,9 +60,10 @@ template class round_robin_pool { return *( std::min_element(std::begin(ref_counters), std::end(ref_counters))); } - size_t get_next_device_id() { - return 0; // single gpu pool - } + // TODO Remove + /* size_t get_next_device_id() { */ + /* return 0; // single gpu pool */ + /* } */ }; template class priority_pool { @@ -74,11 +73,11 @@ template class priority_pool { std::vector priorities{}; // Ref counters public: template - explicit priority_pool(size_t number_of_streams, Ts &&... executor_args) { + priority_pool(size_t number_of_streams, Ts... 
executor_args) { ref_counters.reserve(number_of_streams); priorities.reserve(number_of_streams); for (auto i = 0; i < number_of_streams; i++) { - pool.emplace_back(std::forward(executor_args)...); + pool.emplace_back(executor_args...); ref_counters.emplace_back(0); priorities.emplace_back(i); } @@ -105,150 +104,66 @@ template class priority_pool { return ref_counters[priorities[0]] < load_limit; } size_t get_current_load() { return ref_counters[priorities[0]]; } - size_t get_next_device_id() { - return 0; // single gpu pool - } -}; - -template class multi_gpu_round_robin_pool { -private: - using gpu_entry = std::tuple; // interface, ref counter - std::deque pool{}; - size_t current_interface{0}; - size_t streams_per_gpu{0}; - -public: - template - multi_gpu_round_robin_pool(size_t number_of_streams, int number_of_gpus, - Ts &&... executor_args) - : streams_per_gpu{number_of_streams} { - for (auto gpu_id = 0; gpu_id < number_of_gpus; gpu_id++) { - pool.push_back(std::make_tuple( - Pool(number_of_streams, gpu_id, std::forward(executor_args)...), - 0)); - } - } - - // return a tuple with the interface and its index (to release it later) - std::tuple get_interface() { - size_t last_interface = current_interface; - current_interface = (current_interface + 1) % pool.size(); - std::get<1>(pool[last_interface])++; - size_t gpu_offset = last_interface * streams_per_gpu; - std::tuple stream_entry = - std::get<0>(pool[last_interface]).get_interface(); - std::get<1>(stream_entry) += gpu_offset; - return stream_entry; - } - void release_interface(size_t index) { - size_t gpu_index = index / streams_per_gpu; - size_t stream_index = index % streams_per_gpu; - std::get<1>(pool[gpu_index])--; - std::get<0>(pool[gpu_index]).release_interface(stream_index); - } - bool interface_available(size_t load_limit) { - auto ¤t_min_gpu = std::get<0>(*(std::min_element( - std::begin(pool), std::end(pool), - [](const gpu_entry &first, const gpu_entry &second) -> bool { - return std::get<1>(first) < std::get<1>(second); - }))); - return current_min_gpu.interface_available(load_limit); - } - size_t get_current_load() { - auto ¤t_min_gpu = std::get<0>(*(std::min_element( - std::begin(pool), std::end(pool), - [](const gpu_entry &first, const gpu_entry &second) -> bool { - return std::get<1>(first) < std::get<1>(second); - }))); - return current_min_gpu.get_current_load(); - } - size_t get_next_device_id() { return current_interface; } -}; - -template class priority_pool_multi_gpu { -private: - std::vector priorities{}; - std::vector ref_counters{}; - std::deque gpu_interfaces{}; - size_t streams_per_gpu{0}; - -public: - template - priority_pool_multi_gpu(size_t number_of_streams, int number_of_gpus, - Ts &&... 
executor_args) - : streams_per_gpu(number_of_streams) { - ref_counters.reserve(number_of_gpus); - priorities.reserve(number_of_gpus); - for (auto gpu_id = 0; gpu_id < number_of_gpus; gpu_id++) { - priorities.emplace_back(gpu_id); - ref_counters.emplace_back(0); - gpu_interfaces.emplace_back(streams_per_gpu, gpu_id, - std::forward(executor_args)...); - } - } - // return a tuple with the interface and its index (to release it later) - std::tuple get_interface() { - auto gpu = priorities[0]; - ref_counters[gpu]++; - std::make_heap(std::begin(priorities), std::end(priorities), - [this](const size_t &first, const size_t &second) -> bool { - return ref_counters[first] > ref_counters[second]; - }); - size_t gpu_offset = gpu * streams_per_gpu; - auto stream_entry = gpu_interfaces[gpu].get_interface(); - std::get<1>(stream_entry) += gpu_offset; - return stream_entry; - } - void release_interface(size_t index) { - size_t gpu_index = index / streams_per_gpu; - size_t stream_index = index % streams_per_gpu; - ref_counters[gpu_index]--; - std::make_heap(std::begin(priorities), std::end(priorities), - [this](const size_t &first, const size_t &second) -> bool { - return ref_counters[first] > ref_counters[second]; - }); - gpu_interfaces[gpu_index].release_interface(stream_index); - } - bool interface_available(size_t load_limit) { - return gpu_interfaces[priorities[0]].interface_available(load_limit); - } - size_t get_current_load() { - return gpu_interfaces[priorities[0]].get_current_load(); - } - size_t get_next_device_id() { return priorities[0]; } + // TODO remove + /* size_t get_next_device_id() { */ + /* return 0; // single gpu pool */ + /* } */ }; /// Access/Concurrency Control for stream pool implementation class stream_pool { public: template - static void init(size_t number_of_streams, Ts &&... executor_args) { - stream_pool_implementation::init( - number_of_streams, std::forward(executor_args)...); + static void init(size_t number_of_streams, Ts ... executor_args) { + stream_pool_implementation::init(number_of_streams, + executor_args...); + } + template + static void init_all_executor_pools(size_t number_of_streams, Ts ... executor_args) { + stream_pool_implementation::init_all_executor_pools(number_of_streams, + executor_args...); + } + template + static void init_executor_pool(size_t pool_id, size_t number_of_streams, Ts ... 
executor_args) { + stream_pool_implementation::init_executor_pool(pool_id, number_of_streams, + executor_args...); } template static void cleanup() { stream_pool_implementation::cleanup(); } template - static std::tuple get_interface() { - return stream_pool_implementation::get_interface(); + static std::tuple get_interface(const size_t gpu_id) { + return stream_pool_implementation::get_interface(gpu_id); } template - static void release_interface(size_t index) noexcept { - stream_pool_implementation::release_interface(index); + static void release_interface(size_t index, const size_t gpu_id) noexcept { + stream_pool_implementation::release_interface(index, + gpu_id); } template - static bool interface_available(size_t load_limit) noexcept { + static bool interface_available(size_t load_limit, const size_t gpu_id) noexcept { return stream_pool_implementation::interface_available( - load_limit); + load_limit, gpu_id); } template - static size_t get_current_load() noexcept { - return stream_pool_implementation::get_current_load(); + static size_t get_current_load(const size_t gpu_id = 0) noexcept { + return stream_pool_implementation::get_current_load( + gpu_id); } template - static size_t get_next_device_id() noexcept { - return stream_pool_implementation::get_next_device_id(); + static size_t get_next_device_id(const size_t number_gpus) noexcept { + // TODO add round robin and min strategy + return recycler::get_device_id(number_gpus); + } + + template + static void set_device_selector(std::function select_gpu_function) { + stream_pool_implementation::set_device_selector(select_gpu_function); + } + + template + static void select_device(size_t gpu_id) { + stream_pool_implementation::select_device(gpu_id); } private: @@ -257,65 +172,103 @@ class stream_pool { private: template class stream_pool_implementation { public: + /// Deprecated! Use init_all_executor_pools or init_executor_pool template - static void init(size_t number_of_streams, Ts &&... executor_args) { - // TODO(daissgr) What should happen if the instance already exists? - // warning? - if (!pool_instance && number_of_streams > 0) { - // NOLINTNEXTLINE(cppcoreguidelines-owning-memory) - pool_instance.reset(new stream_pool_implementation()); - // NOLINTNEXTLINE(cppcoreguidelines-owning-memory) - pool_instance->streampool.reset( - new Pool{number_of_streams, std::forward(executor_args)...}); + static void init(size_t number_of_streams, Ts ... executor_args) { + /* static_assert(sizeof...(Ts) == sizeof...(Ts) && recycler::max_number_gpus == 1, */ + /* "deprecated stream_pool::init does not support multigpu"); */ + auto guard = make_scoped_lock_from_array(instance().gpu_mutexes); + instance().streampools.emplace_back(number_of_streams, executor_args...); + assert(instance().streampools.size() <= recycler::max_number_gpus); + } + + /// Multi-GPU init where executors / interfaces on all GPUs are initialized with the same arguments + template + static void init_all_executor_pools(size_t number_of_streams, Ts ... 
executor_args) { + auto guard = make_scoped_lock_from_array(instance().gpu_mutexes); + if (number_of_streams > 0) { + for (size_t gpu_id = 0; gpu_id < recycler::max_number_gpus; gpu_id++) { + instance().select_gpu_function(gpu_id); + instance().streampools.emplace_back(number_of_streams, + executor_args...); + } } + assert(instance().streampools.size() <= recycler::max_number_gpus); } - static void cleanup() { - std::lock_guard guard(pool_mut); - if (pool_instance) { - pool_instance->streampool.reset(nullptr); - pool_instance.reset(nullptr); + + /// Per-GPU init allowing for different init parameters depending on the GPU + /// (useful for executor that expect an GPU-id during construction) + template + static void init_executor_pool(size_t gpu_id, size_t number_of_streams, Ts ... executor_args) { + auto guard = make_scoped_lock_from_array(instance().gpu_mutexes); + if (number_of_streams > 0) { + instance().select_gpu_function(gpu_id); + instance().streampools.emplace_back(number_of_streams, + executor_args...); } + assert(instance().streampools.size() <= recycler::max_number_gpus); } - static std::tuple get_interface() { - std::lock_guard guard(pool_mut); - assert(pool_instance); // should already be initialized - return pool_instance->streampool->get_interface(); + // TODO add/rename into finalize? + static void cleanup() { + auto guard = make_scoped_lock_from_array(instance().gpu_mutexes); + assert(instance().streampools.size() == recycler::max_number_gpus); + instance().streampools.clear(); } - static void release_interface(size_t index) { - std::lock_guard guard(pool_mut); - assert(pool_instance); // should already be initialized - pool_instance->streampool->release_interface(index); + + static std::tuple get_interface(const size_t gpu_id = 0) { + std::lock_guard guard(instance().gpu_mutexes[gpu_id]); + assert(gpu_id < instance().streampools.size()); + return instance().streampools[gpu_id].get_interface(); } - static bool interface_available(size_t load_limit) { - std::lock_guard guard(pool_mut); - if (!pool_instance) { - return false; - } - return pool_instance->streampool->interface_available(load_limit); + static void release_interface(size_t index, const size_t gpu_id = 0) { + std::lock_guard guard(instance().gpu_mutexes[gpu_id]); + assert(gpu_id < instance().streampools.size()); + instance().streampools[gpu_id].release_interface(index); } - static size_t get_current_load() { - std::lock_guard guard(pool_mut); - if (!pool_instance) { - return 0; - } - assert(pool_instance); // should already be initialized - return pool_instance->streampool->get_current_load(); + static bool interface_available(size_t load_limit, const size_t gpu_id = 0) { + std::lock_guard guard(instance().gpu_mutexes[gpu_id]); + assert(gpu_id < instance().streampools.size()); + return instance().streampools[gpu_id].interface_available(load_limit); } - static size_t get_next_device_id() { - std::lock_guard guard(pool_mut); - if (!pool_instance) { - return 0; - } - return pool_instance->streampool->get_next_device_id(); + static size_t get_current_load(const size_t gpu_id = 0) { + std::lock_guard guard(instance().gpu_mutexes[gpu_id]); + assert(gpu_id < instance().streampools.size()); + return instance().streampools[gpu_id].get_current_load(); + } + // TODO deprecated! Remove... 
+ /* static size_t get_next_device_id(const size_t gpu_id = 0) { */ + /* std::lock_guard guard(instance().gpu_mutexes[gpu_id]); */ + /* assert(instance().streampools.size() == recycler::max_number_gpus); */ + /* return instance().streampools[gpu_id].get_next_device_id(); */ + /* } */ + + static void set_device_selector(std::function select_gpu_function) { + auto guard = make_scoped_lock_from_array(instance().gpu_mutexes); + assert(instance().streampools.size() == recycler::max_number_gpus); + instance().select_gpu_function = select_gpu_function; + } + + static void select_device(size_t gpu_id) { + instance().select_gpu_function(gpu_id); } private: - inline static std::unique_ptr pool_instance{}; stream_pool_implementation() = default; - inline static mutex_t pool_mut{}; + recycler::mutex_t pool_mut{}; + std::function select_gpu_function = [](size_t gpu_id) { + // By default no multi gpu support + assert(recycler::max_number_gpus == 1 || instance().streampools.size() == 1); + assert(gpu_id == 0); + }; + + std::deque streampools{}; + std::array gpu_mutexes; - std::unique_ptr streampool{nullptr}; + static stream_pool_implementation& instance(void) { + static stream_pool_implementation pool_instance{}; + return pool_instance; + } public: ~stream_pool_implementation() = default; @@ -338,22 +291,18 @@ class stream_pool { stream_pool &operator=(stream_pool &&other) = delete; }; -/* template */ -/* std::unique_ptr> */ -/* stream_pool::stream_pool_implementation::pool_instance{}; */ - template class stream_interface { public: - explicit stream_interface() - : t(stream_pool::get_interface()), - interface(std::get<0>(t)), interface_index(std::get<1>(t)) {} + explicit stream_interface(size_t gpu_id) + : t(stream_pool::get_interface(gpu_id)), + interface(std::get<0>(t)), interface_index(std::get<1>(t)), gpu_id(gpu_id) {} stream_interface(const stream_interface &other) = delete; stream_interface &operator=(const stream_interface &other) = delete; stream_interface(stream_interface &&other) = delete; stream_interface &operator=(stream_interface &&other) = delete; ~stream_interface() { - stream_pool::release_interface(interface_index); + stream_pool::release_interface(interface_index, gpu_id); } template @@ -366,8 +315,6 @@ template class stream_interface { return interface.async_execute(std::forward(f), std::forward(ts)...); } - inline size_t get_gpu_id() noexcept { return interface.get_gpu_id(); } - // allow implict conversion operator Interface &() { // NOLINT return interface; @@ -376,6 +323,7 @@ template class stream_interface { private: std::tuple t; size_t interface_index; + size_t gpu_id; public: Interface &interface; diff --git a/include/sycl_buffer_util.hpp b/include/sycl_buffer_util.hpp index 6469aa4e..61d22f8f 100644 --- a/include/sycl_buffer_util.hpp +++ b/include/sycl_buffer_util.hpp @@ -16,6 +16,8 @@ namespace recycler { namespace detail { +static_assert(max_number_gpus == 1, "CPPuddle currently does not support MultiGPU SYCL builds!"); + template struct sycl_host_default_allocator { using value_type = T; sycl_host_default_allocator() noexcept = default; diff --git a/tests/allocator_aligned_test.cpp b/tests/allocator_aligned_test.cpp index 882a2c0c..c3c09217 100644 --- a/tests/allocator_aligned_test.cpp +++ b/tests/allocator_aligned_test.cpp @@ -92,6 +92,7 @@ int main(int argc, char *argv[]) { std::cout << "\n==> Aggressive recycle allocation test took " << aggressive_duration << "ms" << std::endl; } + recycler::print_performance_counters(); recycler::force_cleanup(); // Cleanup all buffers 
and the managers for better // comparison @@ -113,6 +114,7 @@ int main(int argc, char *argv[]) { std::cout << "\n\n==> Recycle allocation test took " << recycle_duration << "ms" << std::endl; } + recycler::print_performance_counters(); recycler::force_cleanup(); // Cleanup all buffers and the managers for better // comparison @@ -140,10 +142,11 @@ int main(int argc, char *argv[]) { "recycler!" << std::endl; } - if (recycle_duration < default_duration) { - std::cout << "Test information: Recycler was faster than default allocator!" + if (aggressive_duration < default_duration) { + std::cout << "Test information: Aggressive recycler was faster than default allocator!" << std::endl; } + recycler::print_performance_counters(); #ifdef CPPUDDLE_HAVE_HPX return hpx::finalize(); #else diff --git a/tests/allocator_hpx_test.cpp b/tests/allocator_hpx_test.cpp index 4d11cc16..9d8cc44b 100644 --- a/tests/allocator_hpx_test.cpp +++ b/tests/allocator_hpx_test.cpp @@ -126,6 +126,7 @@ int hpx_main(int argc, char *argv[]) { std::cout << "\n==> Recycle allocation test took " << recycle_duration << "ms" << std::endl; } + recycler::print_performance_counters(); recycler::force_cleanup(); // Cleanup all buffers and the managers for better // comparison @@ -166,6 +167,7 @@ int hpx_main(int argc, char *argv[]) { std::cout << "\n==> Aggressive recycle allocation test took " << aggressive_duration << "ms" << std::endl; } + recycler::print_performance_counters(); recycler::force_cleanup(); // Cleanup all buffers and the managers for better // comparison @@ -175,8 +177,8 @@ int hpx_main(int argc, char *argv[]) { "recycler!" << std::endl; } - if (recycle_duration < default_duration) { - std::cout << "Test information: Recycler was faster than default allocator!" + if (aggressive_duration < default_duration) { + std::cout << "Test information: Aggressive recycler was faster than default allocator!" 
<< std::endl; } } diff --git a/tests/allocator_kokkos_executor_for_loop_test.cpp b/tests/allocator_kokkos_executor_for_loop_test.cpp index f2256fdd..7708fe56 100644 --- a/tests/allocator_kokkos_executor_for_loop_test.cpp +++ b/tests/allocator_kokkos_executor_for_loop_test.cpp @@ -143,6 +143,7 @@ int main(int argc, char *argv[]) { // otherwise the HPX cuda polling futures won't work hpx::cuda::experimental::detail::unregister_polling(hpx::resource::get_thread_pool(0)); + recycler::print_performance_counters(); // Cleanup all cuda views // (otherwise the cuda driver might shut down before this gets done automatically at // the end of the programm) diff --git a/tests/allocator_kokkos_test.cpp b/tests/allocator_kokkos_test.cpp index de808859..e2770458 100644 --- a/tests/allocator_kokkos_test.cpp +++ b/tests/allocator_kokkos_test.cpp @@ -81,6 +81,7 @@ int main(int argc, char *argv[]) { for (size_t pass = 0; pass < passes; pass++) { test_view my_wrapper_test1(1000); test_view my_wrapper_test2(1000); + test_view my_wrapper_test3(0, 1000); // test 1D with explicit device id parameter double t = 2.6; Kokkos::parallel_for(Kokkos::RangePolicy(0, 1000), KOKKOS_LAMBDA(const int n) { @@ -90,6 +91,7 @@ int main(int argc, char *argv[]) { }); Kokkos::fence(); } + recycler::print_performance_counters(); #ifdef CPPUDDLE_HAVE_HPX return hpx::finalize(); #else diff --git a/tests/allocator_test.cpp b/tests/allocator_test.cpp index e86f00ce..004368a4 100644 --- a/tests/allocator_test.cpp +++ b/tests/allocator_test.cpp @@ -88,6 +88,7 @@ int main(int argc, char *argv[]) { std::cout << "\n\n==> Aggressive recycle allocation test took " << aggressive_duration << "ms" << std::endl; } + recycler::print_performance_counters(); recycler::force_cleanup(); // Cleanup all buffers and the managers for better // comparison @@ -107,6 +108,7 @@ int main(int argc, char *argv[]) { std::cout << "\n\n==> Recycle allocation test took " << recycle_duration << "ms" << std::endl; } + recycler::print_performance_counters(); recycler::force_cleanup(); // Cleanup all buffers and the managers for better // comparison @@ -132,10 +134,11 @@ int main(int argc, char *argv[]) { "recycler!" << std::endl; } - if (recycle_duration < default_duration) { - std::cout << "Test information: Recycler was faster than default allocator!" + if (aggressive_duration < default_duration) { + std::cout << "Test information: Aggressive recycler was faster than default allocator!" 
<< std::endl; } + recycler::print_performance_counters(); #ifdef CPPUDDLE_HAVE_HPX return hpx::finalize(); #else diff --git a/tests/stream_test.cpp b/tests/stream_test.cpp index e7010d0f..96599759 100644 --- a/tests/stream_test.cpp +++ b/tests/stream_test.cpp @@ -31,30 +31,9 @@ int main(int argc, char *argv[]) { test_pool_ref_counting< hpx::cuda::experimental::cuda_executor, round_robin_pool>(2, 0, false); - test_pool_ref_counting< - hpx::cuda::experimental::cuda_executor, - multi_gpu_round_robin_pool< - hpx::cuda::experimental::cuda_executor, - round_robin_pool>>(2, 1, - false); - test_pool_ref_counting< - hpx::cuda::experimental::cuda_executor, - priority_pool_multi_gpu< - hpx::cuda::experimental::cuda_executor, - priority_pool>>(2, 1, false); - test_pool_ref_counting< - hpx::cuda::experimental::cuda_executor, - multi_gpu_round_robin_pool< - hpx::cuda::experimental::cuda_executor, - priority_pool>>(2, 1, false); - test_pool_ref_counting< - hpx::cuda::experimental::cuda_executor, - priority_pool_multi_gpu< - hpx::cuda::experimental::cuda_executor, - round_robin_pool>>(2, 1, - false); std::cout << "Finished ref counting tests!" << std::endl; + std::cout << "Starting wrapper objects tests ..." << std::endl; test_pool_wrappers>( @@ -62,62 +41,16 @@ int main(int argc, char *argv[]) { test_pool_wrappers>( 2, 0, false); - test_pool_wrappers< - hpx::cuda::experimental::cuda_executor, - multi_gpu_round_robin_pool< - hpx::cuda::experimental::cuda_executor, - round_robin_pool>>(2, 1, - false); - test_pool_wrappers< - hpx::cuda::experimental::cuda_executor, - priority_pool_multi_gpu< - hpx::cuda::experimental::cuda_executor, - priority_pool>>(2, 1, false); - - test_pool_wrappers< - hpx::cuda::experimental::cuda_executor, - multi_gpu_round_robin_pool< - hpx::cuda::experimental::cuda_executor, - priority_pool>>(2, 1, false); - test_pool_wrappers< - hpx::cuda::experimental::cuda_executor, - priority_pool_multi_gpu< - hpx::cuda::experimental::cuda_executor, - round_robin_pool>>(2, 1, - false); std::cout << "Finished wrapper objects tests!" << std::endl; std::cout << "Starting memcpy tests... " << std::endl; test_pool_memcpy>( 2, 0, false); - test_pool_memcpy< - hpx::cuda::experimental::cuda_executor, - multi_gpu_round_robin_pool< - hpx::cuda::experimental::cuda_executor, - round_robin_pool>>(2, 1, - false); test_pool_memcpy>( 2, 0, false); - test_pool_memcpy>>( - 2, 1, false); - // combo pool - test_pool_memcpy>>( - 2, 1, false); - test_pool_memcpy< - hpx::cuda::experimental::cuda_executor, - priority_pool_multi_gpu< - hpx::cuda::experimental::cuda_executor, - round_robin_pool>>(2, 1, - false); std::cout << "Finished memcpy tests! " << std::endl; std::cout << "Starting memcpy polling tests... 
" << std::endl; @@ -127,33 +60,9 @@ int main(int argc, char *argv[]) { test_pool_memcpy>( 2, 0, true); - test_pool_memcpy< - hpx::cuda::experimental::cuda_executor, - multi_gpu_round_robin_pool< - hpx::cuda::experimental::cuda_executor, - round_robin_pool>>(2, 1, - true); test_pool_memcpy>( 2, 0, true); - test_pool_memcpy< - hpx::cuda::experimental::cuda_executor, - priority_pool_multi_gpu< - hpx::cuda::experimental::cuda_executor, - priority_pool>>(2, 1, true); - - // combo pool - test_pool_memcpy< - hpx::cuda::experimental::cuda_executor, - multi_gpu_round_robin_pool< - hpx::cuda::experimental::cuda_executor, - priority_pool>>(2, 1, true); - test_pool_memcpy< - hpx::cuda::experimental::cuda_executor, - priority_pool_multi_gpu< - hpx::cuda::experimental::cuda_executor, - round_robin_pool>>(2, 1, - true); hpx::cuda::experimental::detail::unregister_polling(hpx::resource::get_thread_pool(0)); } recycler::force_cleanup(); diff --git a/tests/stream_test.hpp b/tests/stream_test.hpp index 716a8ba7..07de4c44 100644 --- a/tests/stream_test.hpp +++ b/tests/stream_test.hpp @@ -20,7 +20,7 @@ void test_pool_memcpy(const size_t stream_parameter, Ts &&... ts) { stream_pool::init(stream_parameter, std::forward(ts)...); // without interface wrapper { - auto test1 = stream_pool::get_interface(); + auto test1 = stream_pool::get_interface(0); Interface test1_interface = std::get<0>(test1); size_t interface_id = std::get<1>(test1); hpx::apply(test1_interface, cudaMemcpyAsync, devicebuffer.device_side_buffer, @@ -30,12 +30,12 @@ void test_pool_memcpy(const size_t stream_parameter, Ts &&... ts) { cudaMemcpyAsync, hostbuffer.data(), devicebuffer.device_side_buffer, 512 * sizeof(double), cudaMemcpyDeviceToHost); fut1.get(); - stream_pool::release_interface(interface_id); + stream_pool::release_interface(interface_id, 0); } // with interface wrapper { - stream_interface test1_interface; + stream_interface test1_interface{0}; // hpx::cuda::cuda_executor test1_interface(0, false); hpx::apply(test1_interface.interface, cudaMemcpyAsync, devicebuffer.device_side_buffer, hostbuffer.data(), 512 * sizeof(double), @@ -55,43 +55,43 @@ void test_pool_ref_counting(const size_t stream_parameter, Ts &&... 
ts) { stream_pool::init(stream_parameter, std::forward(ts)...); { // Allocating - auto test1 = stream_pool::get_interface(); - auto load1 = stream_pool::get_current_load(); + auto test1 = stream_pool::get_interface(0); + auto load1 = stream_pool::get_current_load(0); assert(load1 == 0); Interface test1_interface = std::get<0>(test1); size_t test1_index = std::get<1>(test1); - auto test2 = stream_pool::get_interface(); - auto load2 = stream_pool::get_current_load(); + auto test2 = stream_pool::get_interface(0); + auto load2 = stream_pool::get_current_load(0); assert(load2 == 1); Interface test2_interface = std::get<0>(test2); // auto fut = test2_interface.get_future(); size_t test2_index = std::get<1>(test2); - auto test3 = stream_pool::get_interface(); - auto load3 = stream_pool::get_current_load(); + auto test3 = stream_pool::get_interface(0); + auto load3 = stream_pool::get_current_load(0); assert(load3 == 1); Interface test3_interface = std::get<0>(test3); size_t test3_index = std::get<1>(test3); - auto test4 = stream_pool::get_interface(); - auto load4 = stream_pool::get_current_load(); + auto test4 = stream_pool::get_interface(0); + auto load4 = stream_pool::get_current_load(0); Interface test4_interface = std::get<0>(test4); size_t test4_index = std::get<1>(test4); assert(load4 == 2); // Releasing - stream_pool::release_interface(test4_index); - load4 = stream_pool::get_current_load(); + stream_pool::release_interface(test4_index, 0); + load4 = stream_pool::get_current_load(0); assert(load4 == 1); - stream_pool::release_interface(test3_index); - load3 = stream_pool::get_current_load(); + stream_pool::release_interface(test3_index, 0); + load3 = stream_pool::get_current_load(0); assert(load3 == 1); - stream_pool::release_interface(test2_index); - load2 = stream_pool::get_current_load(); + stream_pool::release_interface(test2_index, 0); + load2 = stream_pool::get_current_load(0); assert(load2 == 0); - stream_pool::release_interface(test1_index); - load1 = stream_pool::get_current_load(); + stream_pool::release_interface(test1_index, 0); + load1 = stream_pool::get_current_load(0); assert(load1 == 0); } // Clear - auto load0 = stream_pool::get_current_load(); + auto load0 = stream_pool::get_current_load(0); assert(load0 == 0); stream_pool::cleanup(); } @@ -102,28 +102,28 @@ void test_pool_wrappers(const size_t stream_parameter, Ts &&... 
ts) { // init ppol stream_pool::init(stream_parameter, std::forward(ts)...); { - wrapper_type test1; - auto load = stream_pool::get_current_load(); + wrapper_type test1{0}; + auto load = stream_pool::get_current_load(0); assert(load == 0); - wrapper_type test2; - load = stream_pool::get_current_load(); + wrapper_type test2{0}; + load = stream_pool::get_current_load(0); // auto fut = test2.get_future(); assert(load == 1); - wrapper_type test3; - load = stream_pool::get_current_load(); + wrapper_type test3{0}; + load = stream_pool::get_current_load(0); assert(load == 1); - wrapper_type test4; - load = stream_pool::get_current_load(); + wrapper_type test4{0}; + load = stream_pool::get_current_load(0); assert(load == 2); // Check availability method: - bool avail = stream_pool::interface_available(1); + bool avail = stream_pool::interface_available(1, 0); assert(avail == false); // NOLINT - avail = stream_pool::interface_available(2); + avail = stream_pool::interface_available(2, 0); assert(avail == false); // NOLINT - avail = stream_pool::interface_available(3); + avail = stream_pool::interface_available(3, 0); assert(avail == true); // NOLINT } - auto load0 = stream_pool::get_current_load(); + auto load0 = stream_pool::get_current_load(0); assert(load0 == 0); stream_pool::cleanup(); } diff --git a/tests/work_aggregation_cuda_triad.cpp b/tests/work_aggregation_cuda_triad.cpp index 4bc050b1..f3f6ec92 100644 --- a/tests/work_aggregation_cuda_triad.cpp +++ b/tests/work_aggregation_cuda_triad.cpp @@ -208,7 +208,7 @@ int hpx_main(int argc, char *argv[]) { recycler::cuda_aggregated_device_buffer - device_A(slice_exec.number_slices * kernel_size, 0, + device_A(slice_exec.number_slices * kernel_size, alloc_device); std::vector local_B( @@ -216,7 +216,7 @@ int hpx_main(int argc, char *argv[]) { alloc_host); recycler::cuda_aggregated_device_buffer - device_B(slice_exec.number_slices * kernel_size, 0, + device_B(slice_exec.number_slices * kernel_size, alloc_device); std::vector local_C( @@ -224,7 +224,7 @@ int hpx_main(int argc, char *argv[]) { alloc_host); recycler::cuda_aggregated_device_buffer - device_C(slice_exec.number_slices * kernel_size, 0, + device_C(slice_exec.number_slices * kernel_size, alloc_device); for (size_t i = task_id * kernel_size, j = 0; diff --git a/tests/work_aggregation_test.cpp b/tests/work_aggregation_test.cpp index e4050e3f..25455633 100644 --- a/tests/work_aggregation_test.cpp +++ b/tests/work_aggregation_test.cpp @@ -605,7 +605,7 @@ void references_add_test(void) { auto &agg_exec = std::get<0>(stream_pool::get_interface< Aggregated_Executor, - round_robin_pool>>()); + round_robin_pool>>(0)); std::vector erg(512); std::vector> slices_done_futs; @@ -863,6 +863,7 @@ int hpx_main(int argc, char *argv[]) { std::flush(hpx::cout); sleep(1); + recycler::print_performance_counters(); recycler::force_cleanup(); // Cleanup all buffers and the managers return hpx::finalize(); }
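For reference, the reworked stream pool is driven roughly as follows after this patch. This is a sketch modeled on tests/stream_test.hpp above, not a definitive API description: it assumes a CUDA-enabled HPX build, the template arguments are reconstructed from the test code, and the executor arguments (device 0, polling disabled) as well as the function name sketch_stream_pool_usage are illustrative.

// Usage sketch (hypothetical): per-GPU executor pools with explicit gpu ids.
#include <cstddef>
#include <tuple>

#include "stream_manager.hpp"

#include <hpx/async_cuda/cuda_executor.hpp>  // header path may vary by HPX version

using executor_t = hpx::cuda::experimental::cuda_executor;
using pool_t = round_robin_pool<executor_t>;

void sketch_stream_pool_usage() {
  // Two executors for GPU 0 in a round-robin pool (device 0, polling off).
  stream_pool::init<executor_t, pool_t>(2, 0, false);

  // Explicit borrow/release: every call now names the GPU it targets.
  auto entry = stream_pool::get_interface<executor_t, pool_t>(0);
  executor_t exec = std::get<0>(entry);
  const std::size_t interface_id = std::get<1>(entry);
  // ... enqueue work on exec ...
  stream_pool::release_interface<executor_t, pool_t>(interface_id, 0);

  {
    // RAII alternative: the wrapper takes the gpu id and releases the
    // executor back to that GPU's pool in its destructor.
    stream_interface<executor_t, pool_t> wrapped{0};
    // ... enqueue work on wrapped.interface ...
  }

  stream_pool::cleanup<executor_t, pool_t>();
}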