Merge pull request #21 from SC-SGS/rework_buffer_manager
Refactor buffer manager
G-071 authored Jun 7, 2023
2 parents c922bcc + 7e16672 commit c084385
Showing 18 changed files with 842 additions and 634 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/cmake.yml
@@ -39,4 +39,4 @@ jobs:
- name: Test
working-directory: ${{github.workspace}}/build
shell: bash
-run: ctest
+run: ctest --output-on-failure
271 changes: 204 additions & 67 deletions CMakeLists.txt

Large diffs are not rendered by default.

17 changes: 13 additions & 4 deletions README.md
@@ -16,23 +16,32 @@ In this use-case, allocating GPU buffers for all sub-grids in advance would have

- Allocators that reuse previously allocated buffers if available (works with normal heap memory, pinned memory, aligned memory, CUDA/HIP device memory, and Kokkos Views); a short usage sketch follows this list. Note that separate buffers do not coexist on a single chunk of contiguous memory, but use separate allocations.
- Executor pools and various scheduling policies (round robin, priority queue, multi-GPU), which rely on reference counting to gauge the current load of an executor instead of querying the device itself. Tested with CUDA, HIP and Kokkos executors provided by HPX / HPX-Kokkos.
- Special executors/allocators for on-the-fly GPU work aggregation (using HPX).
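
To give a rough feel for the buffer-reuse mechanism, here is a minimal sketch based on the low-level `buffer_recycler` calls that this commit touches in `aggregation_manager.hpp`. The header path, the `std::allocator` choice and the exact signatures are assumptions taken from this diff, not a documented public API:

```cpp
// Minimal sketch: request a recyclable buffer, use it, and hand it back so a
// later request of the same size can reuse the allocation.
#include <cstddef>
#include <memory>
#include "buffer_manager.hpp" // assumed header path

void reuse_example(std::size_t size) {
  constexpr bool manage_content_lifetime = false;
  const std::size_t location_id = 0; // default bucket; HPX-aware builds may pick one per worker thread

  // First request: allocates a fresh buffer through the given host allocator.
  double *buf =
      recycler::detail::buffer_recycler::get<double, std::allocator<double>>(
          size, manage_content_lifetime, location_id);

  // ... fill and use buf ...

  // Return the buffer to the recycler instead of freeing it.
  recycler::detail::buffer_recycler::mark_unused<double, std::allocator<double>>(
      buf, size, location_id);

  // A later get<double, std::allocator<double>>(size, ...) with the same
  // location_id may now receive the same allocation back.
}
```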

#### Requirements

-- C++14
-- CMake (>= 3.11)
+- C++17
+- CMake (>= 3.16)
- Optional (for the header-only utilities / test): CUDA, Boost, [HPX](https://github.com/STEllAR-GROUP/hpx), [Kokkos](https://github.com/kokkos/kokkos), [HPX-Kokkos](https://github.com/STEllAR-GROUP/hpx-kokkos)

The submodules can be used to obtain the optional dependencies that are required for testing the header-only utilities. If these tests are not required, the submodules (and the respective build scripts in /scripts) can safely be ignored.
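
For completeness, fetching those submodules after cloning uses the standard git workflow (nothing CPPuddle-specific):

```
git submodule update --init --recursive
```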

#### Build / Install

Basic build

```
cmake -H/path/to/source -B/path/to/build -DCMAKE_BUILD_TYPE=Release -DCMAKE_INSTALL_PREFIX=/path/to/install/cppuddle -DCPPUDDLE_WITH_TESTS=OFF -DCPPUDDLE_WITH_COUNTERS=OFF
cmake --build /path/to/build -- -j4 VERBOSE=1
cmake --build /path/to/build --target install
```
-If installed correctly, cppuddle can be used in other cmake-based projects via
+If installed correctly, CPPuddle can be used in other CMake-based projects via
```
find_package(CPPuddle REQUIRED)
```
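
A consuming project would then link against the exported target. The target name below is an assumption; check the installed CPPuddle CMake package configuration for the actual exported name:

```
find_package(CPPuddle REQUIRED)
# hypothetical target name -- verify against the installed package config
target_link_libraries(my_app PRIVATE CPPuddle::CPPuddle)
```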

Recommended build:
```
cmake -H/path/to/source -B/path/to/build -DCMAKE_BUILD_TYPE=Release -DCMAKE_INSTALL_PREFIX=/path/to/install/cppuddle -DCPPUDDLE_WITH_HPX=ON -DCPPUDDLE_WITH_HPX_AWARE_ALLOCATORS=ON -DCPPUDDLE_WITH_TESTS=OFF -DCPPUDDLE_WITH_COUNTERS=OFF
```


48 changes: 32 additions & 16 deletions include/aggregation_manager.hpp
@@ -42,7 +42,11 @@
#include "../include/buffer_manager.hpp"
#include "../include/stream_manager.hpp"

-using aggregation_mutex_t = hpx::lcos::local::mutex;
+#if defined(CPPUDDLE_HAVE_HPX_MUTEX)
+using aggregation_mutex_t = hpx::spinlock;
+#else
+using aggregation_mutex_t = std::mutex;
+#endif

//===============================================================================
//===============================================================================
@@ -134,7 +138,7 @@ template <typename Executor> class aggregated_function_call {

#if !(defined(NDEBUG)) && defined(DEBUG_AGGREGATION_CALLS)
#pragma message \
"Running slow work aggegator debug build! Run with NDEBUG defined for fast build..."
"Building slow work aggegator build with additional runtime checks! Build with NDEBUG defined for fast build..."
/// Stores the function call of the first slice as reference for error
/// checking
std::any function_tuple;
@@ -529,7 +533,7 @@ template <typename Executor> class Aggregated_Executor {
/// Data entry for a buffer allocation: void* pointer, size_t for
/// buffer-size, atomic for the slice counter
using buffer_entry_t =
-std::tuple<void*, const size_t, std::atomic<size_t>, bool>;
+std::tuple<void*, const size_t, std::atomic<size_t>, bool, const size_t>;
/// Keeps track of the aggregated buffer allocations done in all the slices
std::deque<buffer_entry_t> buffer_allocations;
/// Map pointer to deque index for fast access in the deallocations
@@ -552,15 +556,27 @@
if (buffer_counter <= slice_alloc_counter) {
constexpr bool manage_content_lifetime = false;
buffers_in_use = true;

// Default location -- useful for GPU builds as we otherwise create way too
// many different buffers for different aggregation sizes on different GPUs
size_t location_id = 0;
#ifdef CPPUDDLE_HAVE_HPX_AWARE_ALLOCATORS
if (max_slices == 1) {
// get prefered location: aka the current hpx threads location
// Usually handy for CPU builds where we want to use the buffers
// close to the current CPU core
location_id = hpx::get_worker_thread_num();
}
#endif
// Get shiny and new buffer that will be shared between all slices
// Buffer might be recycled from previous allocations by the
// buffer_recycler...
T *aggregated_buffer =
-recycler::detail::buffer_recycler::get<T, Host_Allocator>(size,
-manage_content_lifetime);
+recycler::detail::buffer_recycler::get<T, Host_Allocator>(
+size, manage_content_lifetime, location_id);
// Create buffer entry for this buffer
buffer_allocations.emplace_back(static_cast<void *>(aggregated_buffer),
-size, 1, true);
+size, 1, true, location_id);

#ifndef NDEBUG
// if previousely used the buffer should not be in usage anymore
@@ -613,6 +629,7 @@ template <typename Executor> class Aggregated_Executor {
const auto buffer_size = std::get<1>(buffer_allocations[slice_alloc_counter]);
auto &buffer_allocation_counter = std::get<2>(buffer_allocations[slice_alloc_counter]);
auto &valid = std::get<3>(buffer_allocations[slice_alloc_counter]);
const auto &location_id = std::get<4>(buffer_allocations[slice_alloc_counter]);
assert(valid);
T *buffer_pointer = static_cast<T *>(buffer_pointer_void);

@@ -630,7 +647,7 @@ template <typename Executor> class Aggregated_Executor {
if (valid) {
assert(buffers_in_use == true);
recycler::detail::buffer_recycler::mark_unused<T, Host_Allocator>(
-buffer_pointer, buffer_size);
+buffer_pointer, buffer_size, location_id);
// mark buffer as invalid to prevent any other slice from marking the
// buffer as unused
valid = false;
@@ -752,9 +769,9 @@ template <typename Executor> class Aggregated_Executor {
std::lock_guard<aggregation_mutex_t> guard(buffer_mut);
#ifndef NDEBUG
for (const auto &buffer_entry : buffer_allocations) {
-const auto &[buffer_pointer_any, buffer_size,
-buffer_allocation_counter,
-valid] = buffer_entry;
+const auto &[buffer_pointer_any, buffer_size,
+buffer_allocation_counter, valid, location_id] =
+buffer_entry;
assert(!valid);
}
#endif
@@ -879,12 +896,11 @@ template <typename Executor> class Aggregated_Executor {
overall_launch_counter = 0;
#ifndef NDEBUG
for (const auto &buffer_entry : buffer_allocations) {
-const auto &[buffer_pointer_any, buffer_size,
-buffer_allocation_counter,
-valid] = buffer_entry;
+const auto &[buffer_pointer_any, buffer_size, buffer_allocation_counter,
+valid, location_id] = buffer_entry;
assert(!valid);
}
#endif
#endif
buffer_allocations.clear();
buffer_allocations_map.clear();
buffer_counter = 0;
@@ -900,8 +916,8 @@ template <typename Executor> class Aggregated_Executor {
executor_tuple(
stream_pool::get_interface<Executor, round_robin_pool<Executor>>()),
executor(std::get<0>(executor_tuple)),
-current_continuation(hpx::lcos::make_ready_future()),
-last_stream_launch_done(hpx::lcos::make_ready_future()) {}
+current_continuation(hpx::make_ready_future()),
+last_stream_launch_done(hpx::make_ready_future()) {}
// Not meant to be copied or moved
Aggregated_Executor(const Aggregated_Executor &other) = delete;
Aggregated_Executor &operator=(const Aggregated_Executor &other) = delete;
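
Taken together, the changes in this file thread a location_id through the aggregated-buffer path: the id is chosen at allocation time, stored as the new fifth tuple element of each buffer entry, and passed back to mark_unused so the buffer returns to the bucket it was taken from. A condensed sketch of that pairing, using the names visible in this diff (T, Host_Allocator and max_slices stand in for the surrounding class members; includes as in this header):

```cpp
// Condensed sketch of the location-aware allocate/release pairing added in this
// commit; it mirrors the aggregated-buffer path rather than reproducing it.
template <typename T, typename Host_Allocator>
void aggregated_buffer_roundtrip(std::size_t size, std::size_t max_slices) {
  std::size_t location_id = 0; // default bucket, e.g. for GPU builds
#ifdef CPPUDDLE_HAVE_HPX_AWARE_ALLOCATORS
  if (max_slices == 1) {
    // CPU-oriented case: keep the buffer close to the current HPX worker thread
    location_id = hpx::get_worker_thread_num();
  }
#endif
  constexpr bool manage_content_lifetime = false;
  T *buf = recycler::detail::buffer_recycler::get<T, Host_Allocator>(
      size, manage_content_lifetime, location_id);

  // ... all slices of one aggregated launch share buf here ...

  // Releasing with the same location_id returns the allocation to the bucket it
  // came from, ready to be recycled by the next request on that location.
  recycler::detail::buffer_recycler::mark_unused<T, Host_Allocator>(
      buf, size, location_id);
}
```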
