diff --git a/include/aggregation_manager.hpp b/include/aggregation_manager.hpp
index 92ad5f8d..e31022c9 100644
--- a/include/aggregation_manager.hpp
+++ b/include/aggregation_manager.hpp
@@ -384,10 +384,10 @@ template <typename Executor> class Aggregated_Executor {
   const Aggregated_Executor_Modes mode;
   const size_t max_slices;
   std::atomic<size_t> current_slices;
-  /// Executor reference and its ID in the exextutor pool
-  std::tuple<Executor &, size_t> executor_tuple;
-  /// Reference to the executor (presumably residing in the executor pool)
-  Executor &executor;
+  /// Wrapper to the executor interface from the stream pool
+  /// Automatically hooks into the stream_pools reference counting
+  /// for cpu/gpu load balancing
+  std::unique_ptr<stream_interface<Executor, round_robin_pool<Executor>>> executor_wrapper;

 public:
   size_t gpu_id;
@@ -516,7 +516,8 @@
     }

     Executor& get_underlying_executor(void) {
-      return parent.executor;
+      assert(parent.executor_wrapper);
+      return *(parent.executor_wrapper);
     }
   };

@@ -548,6 +549,7 @@
   template <typename T>
   T *get(const size_t size, const size_t slice_alloc_counter) {
     assert(slices_exhausted == true);
+    assert(executor_wrapper);
     assert(executor_slices_alive == true);
     // Add aggreated buffer entry in case it hasn't happened yet for this call
     // First: Check if it already has happened
@@ -624,6 +626,7 @@
   template <typename T>
   void mark_unused(T *p, const size_t size) {
     assert(slices_exhausted == true);
+    assert(executor_wrapper);

     void *ptr_key = static_cast<void *>(p);
     size_t slice_alloc_counter = buffer_allocations_map[p];
@@ -663,8 +666,11 @@ template <typename Executor> class Aggregated_Executor {
     if (current_deallocs == buffer_counter) {
       std::lock_guard<std::mutex> guard(mut);
       buffers_in_use = false;
-      if (!executor_slices_alive && !buffers_in_use)
+      if (!executor_slices_alive && !buffers_in_use) {
         slices_exhausted = false;
+        // Release executor
+        executor_wrapper.reset(nullptr);
+      }
     }
   }
 }
@@ -681,11 +687,12 @@ template <typename Executor> class Aggregated_Executor {
   bool sync_aggregation_slices(const size_t slice_launch_counter) {
     std::lock_guard<std::mutex> guard(mut);
     assert(slices_exhausted == true);
+    assert(executor_wrapper);
     // Add function call object in case it hasn't happened for this launch yet
     if (overall_launch_counter <= slice_launch_counter) {
       /* std::lock_guard<std::mutex> guard(mut); */
       if (overall_launch_counter <= slice_launch_counter) {
-        function_calls.emplace_back(current_slices, false, executor);
+        function_calls.emplace_back(current_slices, false, *executor_wrapper);
         overall_launch_counter = function_calls.size();
         return function_calls[slice_launch_counter].sync_aggregation_slices(
             last_stream_launch_done);
@@ -701,11 +708,12 @@ template <typename Executor> class Aggregated_Executor {
   void post(const size_t slice_launch_counter, F &&f, Ts &&...ts) {
     std::lock_guard<std::mutex> guard(mut);
     assert(slices_exhausted == true);
+    assert(executor_wrapper);
     // Add function call object in case it hasn't happened for this launch yet
     if (overall_launch_counter <= slice_launch_counter) {
       /* std::lock_guard<std::mutex> guard(mut); */
       if (overall_launch_counter <= slice_launch_counter) {
-        function_calls.emplace_back(current_slices, false, executor);
+        function_calls.emplace_back(current_slices, false, *executor_wrapper);
         overall_launch_counter = function_calls.size();
         function_calls[slice_launch_counter].post_when(
             last_stream_launch_done, std::forward<F>(f), std::forward<Ts>(ts)...);
@@ -724,11 +732,12 @@ template <typename Executor> class Aggregated_Executor {
                   Ts &&...ts) {
     std::lock_guard<std::mutex> guard(mut);
     assert(slices_exhausted == true);
+    assert(executor_wrapper);
     // Add function call object in case it hasn't happened for this launch yet
     if (overall_launch_counter <= slice_launch_counter) {
       /* std::lock_guard<std::mutex> guard(mut); */
       if (overall_launch_counter <= slice_launch_counter) {
-        function_calls.emplace_back(current_slices, true, executor);
+        function_calls.emplace_back(current_slices, true, *executor_wrapper);
         overall_launch_counter = function_calls.size();
         return function_calls[slice_launch_counter].async_when(
             last_stream_launch_done, std::forward<F>(f), std::forward<Ts>(ts)...);
@@ -744,11 +753,12 @@ template <typename Executor> class Aggregated_Executor {
                   Ts &&...ts) {
     std::lock_guard<std::mutex> guard(mut);
     assert(slices_exhausted == true);
+    assert(executor_wrapper);
     // Add function call object in case it hasn't happened for this launch yet
     if (overall_launch_counter <= slice_launch_counter) {
       /* std::lock_guard<std::mutex> guard(mut); */
       if (overall_launch_counter <= slice_launch_counter) {
-        function_calls.emplace_back(current_slices, true, executor);
+        function_calls.emplace_back(current_slices, true, *executor_wrapper);
         overall_launch_counter = function_calls.size();
         return function_calls[slice_launch_counter].wrap_async(
             last_stream_launch_done, std::forward<F>(f), std::forward<Ts>(ts)...);
@@ -813,7 +823,13 @@ template <typename Executor> class Aggregated_Executor {
     // Executor_Slice
     // futures to ready if the launch conditions are met
     if (local_slice_id == 1) {
-      // Renew promise that all slices will be ready as the primary launch criteria...
+      // Redraw executor
+      assert(!executor_wrapper);
+      stream_pool::select_device<Executor, round_robin_pool<Executor>>(gpu_id);
+      executor_wrapper.reset(
+          new stream_interface<Executor, round_robin_pool<Executor>>(gpu_id));
+      // Renew promise that all slices will be ready as the primary launch
+      // criteria...
       hpx::lcos::shared_future<void> fut;
       if (mode == Aggregated_Executor_Modes::EAGER ||
           mode == Aggregated_Executor_Modes::ENDLESS) {
@@ -821,8 +837,8 @@
         // is ready
         /* auto slices_full_fut = slices_full_promise.get_future(); */
         stream_pool::select_device<Executor, round_robin_pool<Executor>>(gpu_id);
-        auto exec_fut = executor.get_future();
-        /* fut = hpx::when_any(exec_fut, slices_full_fut); */
+        auto exec_fut = (*executor_wrapper).get_future();
+        /* auto fut = hpx::when_any(exec_fut, slices_full_fut); */
         fut = std::move(exec_fut);
       } else {
         auto slices_full_fut = slices_full_promise.get_shared_future();
@@ -870,25 +886,20 @@
   void reduce_usage_counter(void) {
     /* std::lock_guard<std::mutex> guard(mut); */
     assert(slices_exhausted == true);
+    assert(executor_wrapper);
     assert(executor_slices_alive == true);
     assert(launched_slices >= 1);
     assert(current_slices >= 0 && current_slices <= launched_slices);
     const size_t local_slice_id = --current_slices;
     // Last slice goes out scope?
     if (local_slice_id == 0) {
-
-      // Draw new underlying executor TODO Test if it's better to redraw at
-      // the first slice request
-      stream_pool::release_interface<Executor, round_robin_pool<Executor>>(
-          std::get<1>(executor_tuple));
-      // executor_tuple =
-      //     stream_pool::get_interface<Executor, round_robin_pool<Executor>>();
-      // executor = std::get<0>(executor_tuple);
       // Mark executor fit for reusage
       std::lock_guard<std::mutex> guard(mut);
       executor_slices_alive = false;
       if (!executor_slices_alive && !buffers_in_use) {
+        // Release executor
         slices_exhausted = false;
+        executor_wrapper.reset(nullptr);
       }
     }
   }
@@ -922,11 +933,10 @@
   Aggregated_Executor(const size_t number_slices,
                       Aggregated_Executor_Modes mode, const size_t gpu_id = 0)
-      : max_slices(number_slices), current_slices(0), slices_exhausted(false),dealloc_counter(0),
-        mode(mode), executor_slices_alive(false), buffers_in_use(false), gpu_id(gpu_id),
-        executor_tuple(
-            stream_pool::get_interface<Executor, round_robin_pool<Executor>>(gpu_id)),
-        executor(std::get<0>(executor_tuple)),
+      : max_slices(number_slices), current_slices(0), slices_exhausted(false),
+        dealloc_counter(0), mode(mode), executor_slices_alive(false),
+        buffers_in_use(false), gpu_id(gpu_id),
+        executor_wrapper(nullptr),
         current_continuation(hpx::make_ready_future()),
         last_stream_launch_done(hpx::make_ready_future()) {}

   // Not meant to be copied or moved
diff --git a/include/stream_manager.hpp b/include/stream_manager.hpp
index 40631491..34c05d19 100644
--- a/include/stream_manager.hpp
+++ b/include/stream_manager.hpp
@@ -315,6 +315,10 @@ template <class Interface, class Pool> class stream_interface {
     return interface.async_execute(std::forward<F>(f), std::forward<Ts>(ts)...);
   }

+  inline decltype(auto) get_future() {
+    return interface.get_future();
+  }
+
   // allow implict conversion
   operator Interface &() { // NOLINT
     return interface;
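Reviewer note (not part of the patch): the net effect in `aggregation_manager.hpp` is to replace the manually managed `(executor reference, pool ID)` tuple, which needed an explicit `stream_pool::release_interface` call in `reduce_usage_counter`, with a `std::unique_ptr` holding a pool-aware RAII wrapper. The executor is now drawn lazily when the first slice arrives and handed back to the pool by `executor_wrapper.reset(nullptr)`. Below is a minimal, self-contained sketch of that pattern using hypothetical stand-in types (`dummy_pool`, `pool_interface` are invented for illustration; the real `stream_interface` additionally participates in the stream pool's reference counting for cpu/gpu load balancing and device selection):

```cpp
#include <cassert>
#include <cstddef>
#include <iostream>
#include <memory>
#include <vector>

// Hypothetical stand-ins for the real CPPuddle types (stream_pool,
// round_robin_pool, stream_interface) -- names invented for illustration.
struct dummy_executor {
  int id;
};

struct dummy_pool {
  std::vector<dummy_executor> executors{{0}, {1}};
  std::size_t next{0};
  dummy_executor &acquire() { return executors[next++ % executors.size()]; }
  void release(dummy_executor &e) {
    std::cout << "executor " << e.id << " returned to pool\n";
  }
};

// RAII wrapper: draws an executor from the pool on construction and
// returns it on destruction -- the role stream_interface plays in the patch.
class pool_interface {
public:
  explicit pool_interface(dummy_pool &pool)
      : pool_(pool), executor_(pool.acquire()) {}
  ~pool_interface() { pool_.release(executor_); }
  pool_interface(const pool_interface &) = delete;
  pool_interface &operator=(const pool_interface &) = delete;
  // Mirrors stream_interface's implicit conversion to the executor type,
  // which is what makes `return *(parent.executor_wrapper);` compile.
  operator dummy_executor &() { return executor_; }

private:
  dummy_pool &pool_;
  dummy_executor &executor_;
};

int main() {
  dummy_pool pool;

  // Like executor_wrapper in the patch: empty until the first slice arrives.
  std::unique_ptr<pool_interface> executor_wrapper;

  // The "Redraw executor" step (local_slice_id == 1 in the patch).
  executor_wrapper = std::make_unique<pool_interface>(pool);
  assert(executor_wrapper); // the asserts sprinkled through the patch

  // get_underlying_executor(): dereference plus implicit conversion.
  dummy_executor &exec = *executor_wrapper;
  std::cout << "launching on executor " << exec.id << "\n";

  // Last slice / last deallocation: reset() destroys the wrapper, which
  // releases the executor -- no manual release_interface call needed.
  executor_wrapper.reset(nullptr);
  return 0;
}
```

The design point the patch exploits is that the cleanup now lives in a destructor: both release sites (the last slice in `reduce_usage_counter` and the last buffer deallocation in `mark_unused`) collapse into the same `reset(nullptr)` call, and the added `assert(executor_wrapper)` checks document the invariant that a wrapper must exist whenever slices are active.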