diff --git a/source/tit/core/CMakeLists.txt b/source/tit/core/CMakeLists.txt index df527cde..63c4321d 100644 --- a/source/tit/core/CMakeLists.txt +++ b/source/tit/core/CMakeLists.txt @@ -44,6 +44,7 @@ add_tit_library( "par/control.cpp" "par/control.hpp" "par/memory_pool.hpp" + "par/partitioner.hpp" "par/task_group.hpp" "profiler.cpp" "profiler.hpp" diff --git a/source/tit/core/containers/multivector.hpp b/source/tit/core/containers/multivector.hpp index ee3f9cce..135f7556 100644 --- a/source/tit/core/containers/multivector.hpp +++ b/source/tit/core/containers/multivector.hpp @@ -166,7 +166,7 @@ class Multivector { constexpr void assign_pairs_par_wide(size_t count, Pairs&& pairs) { TIT_ASSUME_UNIVERSAL(Pairs, pairs); assign_pairs_wide_impl_(count, [&pairs](auto func) { - par::static_for_each(pairs, std::move(func)); + par::deterministic_for_each(pairs, std::move(func)); }); } template @@ -174,9 +174,12 @@ class Multivector { std::ranges::join_view pairs) { auto base = std::move(pairs).base(); assign_pairs_wide_impl_(count, [&base](auto func) { - par::static_for_each(base, [&func](size_t thread, const auto& range) { - std::ranges::for_each(range, std::bind_front(std::ref(func), thread)); - }); + par::deterministic_for_each(base, + [&func](const auto& range, size_t thread) { + std::ranges::for_each( + range, + std::bind_back(std::ref(func), thread)); + }); }); } /// @} @@ -223,7 +226,7 @@ class Multivector { static Mdvector per_thread_ranges{}; const auto num_threads = par::num_threads(); per_thread_ranges.assign(num_threads, count + 1); - for_each_pair([count](size_t thread, const auto& pair) { + for_each_pair([count](const auto& pair, size_t thread) { const auto index = std::get<0>(pair); TIT_ASSERT(index < count, "Index of the value is out of expected range!"); per_thread_ranges[thread, index] += 1; @@ -242,7 +245,7 @@ class Multivector { // Place each value into position of the first element of it's index // range, then increment the position. vals_.resize(val_ranges_.back()); - for_each_pair([count, this](size_t thread, const auto& pair) { + for_each_pair([count, this](const auto& pair, size_t thread) { const auto& [index, value] = pair; TIT_ASSERT(index < count, "Index of the value is out of expected range!"); auto& position = per_thread_ranges[thread, index]; diff --git a/source/tit/core/par/algorithms.hpp b/source/tit/core/par/algorithms.hpp index cfd1e8af..22ba1aab 100644 --- a/source/tit/core/par/algorithms.hpp +++ b/source/tit/core/par/algorithms.hpp @@ -10,39 +10,55 @@ #include #include #include +#include #include -#include #include +#include #include #include #include "tit/core/basic_types.hpp" +#include "tit/core/containers/boost.hpp" #include "tit/core/missing.hpp" // IWYU pragma: keep #include "tit/core/par/atomic.hpp" #include "tit/core/par/control.hpp" +#include "tit/core/par/partitioner.hpp" // IWYU pragma: export #include "tit/core/utils.hpp" namespace tit::par { // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +// +// Batch operations. +// -/// Range that could be processed in parallel. -template -concept range = - std::ranges::sized_range && std::ranges::random_access_range; +/// Iterate through the blocks of the range in parallel. +struct ForEachRange final : AutoPartitionerInvoker { + using AutoPartitionerInvoker::operator(); + /// @todo Clang segfaults if this or similar functions below are `static`. 
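+  /// Usage sketch (illustrative only, not code from this change): the
+  /// partitioner-first overload is called as
+  /// `par::for_each_range(par::static_, items, [](auto block) { /*...*/ });`,
+  /// where `block` is the blockified subrange produced by the partitioner.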
+ template> Func> + void operator()(Partitioner partitioner, Range&& range, Func func) const { + TIT_ASSUME_UNIVERSAL(Range, range); + tbb::parallel_for(partitioner.blockify(range), std::move(func)); + } +}; -// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +/// @copydoc ForEachRange +inline constexpr ForEachRange for_each_range{}; -/// Iterate through the range in parallel (dynamic partitioning). -struct ForEach { - template> Func> - void operator()(Range&& range, Func func) const { - /// @todo Replace with `tbb::parallel_for_each` when it supports ranges. - TIT_ASSUME_UNIVERSAL(Range, range); - tbb::parallel_for(tbb::blocked_range{std::begin(range), std::end(range)}, - std::bind_back(std::ranges::for_each, std::move(func))); + void operator()(Partitioner partitioner, Range&& range, Func func) const { + for_each_range(std::move(partitioner), + std::forward(range), + std::bind_back(std::ranges::for_each, std::move(func))); } }; @@ -51,48 +67,64 @@ inline constexpr ForEach for_each{}; // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -/// Iterate through the range in parallel (static partitioning). -struct StaticForEach { +/// Iterate through the blocks of the range in parallel. +/// Per-thread range partition will always be the same. +struct DeterministicForEachRange final { template> Func> + std::regular_invocable, size_t> Func> void operator()(Range&& range, Func func) const { TIT_ASSUME_UNIVERSAL(Range, range); - const auto thread_count = num_threads(); - auto block_first = [quotient = std::size(range) / thread_count, - remainder = std::size(range) % thread_count, - first = std::begin(range)](size_t index) { - const auto offset = index * quotient + std::min(index, remainder); - return first + offset; - }; - tbb::parallel_for( - /*first=*/0, - /*last=*/thread_count, - /*step=*/1, - [&block_first, &func](size_t thread_index) { - std::for_each(block_first(thread_index), - block_first(thread_index + 1), - std::bind_front(std::ref(func), thread_index)); + const auto count = num_threads(); + const auto quotient = static_cast(std::size(range)) / count; + const auto remainder = static_cast(std::size(range)) % count; + const auto iter = std::begin(range); + tbb::parallel_for( + size_t{0}, + count, + [quotient, remainder, iter, &func](size_t thread) { + const std::ranges::subrange subrange{ + iter + thread * quotient + std::min(thread, remainder), + iter + (thread + 1) * quotient + std::min(thread + 1, remainder)}; + std::invoke(func, subrange, thread); }, tbb::static_partitioner{}); } }; -/// @copydoc StaticForEach -inline constexpr StaticForEach static_for_each{}; +/// @copydoc DeterministicForEachRange +inline constexpr DeterministicForEachRange deterministic_for_each_range{}; + +/// Iterate through the range in parallel. +/// All thread partition will always be the same. +struct DeterministicForEach final { + template, + size_t> Func> + void operator()(Range&& range, Func func) const { + deterministic_for_each_range( + std::forward(range), + [&func](std::ranges::view auto subrange, size_t thread) { + std::ranges::for_each(std::move(subrange), + std::bind_back(std::ref(func), thread)); + }); + } +}; + +/// @copydoc DeterministicForEach +inline constexpr DeterministicForEach deterministic_for_each{}; // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ /// Iterate through the block of ranges in parallel. 
-struct BlockForEach { +struct BlockForEach final { template>> Func> + std::regular_invocable>> Func> void operator()(Range&& range, Func func) const { - TIT_ASSUME_UNIVERSAL(Range, range); - for (auto chunk : std::views::chunk(range, num_threads())) { - for_each(std::move(chunk), - std::bind_back(std::ranges::for_each, std::cref(func))); - } + std::ranges::for_each( + std::views::chunk(std::forward(range), num_threads()), + std::bind_back(for_each, + std::bind_back(std::ranges::for_each, std::ref(func)))); } }; @@ -100,37 +132,133 @@ struct BlockForEach { inline constexpr BlockForEach block_for_each{}; // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +// +// Fold operations. +// + +/// Parallel range-wise fold. +struct FoldRange final : AutoPartitionerInvoker { + using AutoPartitionerInvoker::operator(); + template, + std::regular_invocable, Result> Func, + std::regular_invocable ResultFunc> + requires ( + std::assignable_from< + Result&, + std::invoke_result_t, Result>> && + std::assignable_from>) + auto operator()(Partitioner partitioner, + Range&& range, + Result init, + Func func, + ResultFunc result_func) const -> Result { + TIT_ASSUME_UNIVERSAL(Range, range); + return tbb::parallel_reduce(partitioner.blockify(range), + std::move(init), + std::move(func), + std::move(result_func)); + } +}; + +/// @copydoc FoldRange +inline constexpr FoldRange fold_range{}; + +/// Parallel fold. +struct Fold final : AutoPartitionerInvoker { + using AutoPartitionerInvoker::operator(); + template, + std::regular_invocable> + Func = std::plus<>, + std::regular_invocable ResultFunc = Func> + requires ( + std::assignable_from< + Result&, + std::invoke_result_t>> && + std::assignable_from>) + auto operator()(Partitioner partitioner, + Range&& range, + Result init = {}, + Func func = {}, + ResultFunc result_func = {}) const -> Result { + return fold_range(std::move(partitioner), + std::forward(range), + std::move(init), // NOLINTNEXTLINE(*-include-cleaner) + std::bind_back(std::ranges::fold_left, std::move(func)), + std::move(result_func)); + } +}; + +/// @copydoc Fold +inline constexpr Fold fold{}; + +// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +// +// Copy operations. +// -/// Parallel copy-if. +/// Parallel unstable copy-if. /// Relative order of the elements in the output range is not preserved. -struct CopyIf { - template, Proj>> Pred> requires std::indirectly_copyable, OutIter> - auto operator()(Range&& range, OutIter out, Pred pred, Proj proj = {}) const - -> OutIter { - TIT_ASSUME_UNIVERSAL(Range, range); - ssize_t out_index = 0; - for_each(range, [&out_index, &out, &pred, &proj](const auto& item) { - if (std::invoke(pred, std::invoke(proj, item))) { - *std::next(out, fetch_and_add(out_index, 1)) = item; - } - }); - return std::next(out, out_index); + auto operator()(Partitioner partitioner, + Range&& range, + OutIter out, + Pred pred, + Proj proj = {}) const -> OutIter { + ssize_t index = 0; + for_each_range( + std::move(partitioner), + std::forward(range), + [&index, out, &pred, &proj](std::ranges::view auto subrange) { + // Filter the chunk into the intermediate buffer, then move the buffer + // into the output range. Intermediate buffer is used to reduce the + // number of atomic operations. 
+ using Val = std::ranges::range_value_t; + static constexpr size_t BufferCap = 64; + InplaceVector buffer{}; + for (const auto& chunk : + std::views::chunk(std::move(subrange), BufferCap)) { + std::ranges::copy_if(chunk, + std::back_inserter(buffer), + std::ref(pred), + std::ref(proj)); + std::ranges::move(buffer, + out + fetch_and_add(index, buffer.size())); + buffer.clear(); + } + }); + return out + index; } }; /// @copydoc CopyIf -inline constexpr CopyIf copy_if{}; +inline constexpr UnstableCopyIf unstable_copy_if{}; // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +// +// Transformation operations. +// /// Parallel transform. -struct Transform final { - template @@ -139,15 +267,18 @@ struct Transform final { std::indirect_result_t< Func&, std::projected, Proj>>> - auto operator()(Range&& range, OutIter out, Func func, Proj proj = {}) const - -> OutIter { - TIT_ASSUME_UNIVERSAL(Range, range); + auto operator()(Partitioner partitioner, + Range&& range, + OutIter out, + Func func, + Proj proj = {}) const -> OutIter { const auto out_end = std::next(out, std::size(range)); - for_each(std::views::zip(range, std::ranges::subrange{out, out_end}), - [&func, &proj](Pair&& source_and_dest) { - TIT_ASSUME_UNIVERSAL(Pair, source_and_dest); - auto&& [source, dest] = source_and_dest; - dest = std::invoke(func, std::invoke(proj, source)); + for_each(std::move(partitioner), + std::views::zip(std::ranges::subrange{out, out_end}, + std::views::as_const(std::forward(range))), + [&func, &proj](auto arg) { + std::get<0>(arg) = + std::invoke(func, std::invoke(proj, std::get<1>(arg))); }); return out_end; } @@ -157,21 +288,24 @@ struct Transform final { inline constexpr Transform transform{}; // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +// +// Sorting operations. +// /// Parallel sort. struct Sort final { template, class Proj = std::identity> requires std::sortable, Compare, Proj> - static void operator()(Range&& range, Compare compare = {}, Proj proj = {}) { + void operator()(Range&& range, Compare compare = {}, Proj proj = {}) const { TIT_ASSUME_UNIVERSAL(Range, range); - tbb::parallel_sort( // - std::begin(range), - std::end(range), - [&compare, &proj](const auto& a, const auto& b) { - return std::invoke(compare, - std::invoke(proj, a), - std::invoke(proj, b)); - }); + using Ref = std::ranges::range_reference_t; + tbb::parallel_sort(std::begin(range), + std::end(range), + [&compare, &proj](Ref& a, Ref& b) { + return std::invoke(compare, + std::invoke(proj, a), + std::invoke(proj, b)); + }); } }; diff --git a/source/tit/core/par/algorithms.test.cpp b/source/tit/core/par/algorithms.test.cpp index e899d0d4..8ebbcba0 100644 --- a/source/tit/core/par/algorithms.test.cpp +++ b/source/tit/core/par/algorithms.test.cpp @@ -4,10 +4,9 @@ \* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ */ #include -#include +#include #include #include -#include #include #include "tit/core/basic_types.hpp" @@ -25,57 +24,84 @@ namespace { // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +// A wrapper for a function that sleeps for a given amount of time. +template +class SleepFunc final { +public: + + // Initialize a sleep function. + constexpr explicit SleepFunc(Func func, + std::chrono::milliseconds duration = + std::chrono::milliseconds(10)) noexcept + : func_{std::move(func)}, duration_{duration} {} + + // Sleep for the specified amount of time and call the function. 
+ template + requires std::invocable + constexpr auto operator()(Args&&... args) const -> decltype(auto) { + std::this_thread::sleep_for(duration_); + return func_(std::forward(args)...); + } + +private: + + Func func_; + std::chrono::milliseconds duration_; + +}; // class SleepFunc + +// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + TEST_CASE("par::for_each") { par::set_num_threads(4); - std::vector data{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; + std::vector data{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; SUBCASE("basic") { // Ensure the loop is executed. - par::for_each(data, [](int& i) { - std::this_thread::sleep_for(std::chrono::milliseconds{10}); - i += 1; - }); + par::for_each(data, SleepFunc{[](int& i) { i += 1; }}); CHECK(data == std::vector{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}); } SUBCASE("exceptions") { // Ensure the exceptions from the worker threads are caught. const auto loop = [&data] { - par::for_each(data, [](int i) { - std::this_thread::sleep_for(std::chrono::milliseconds{10}); - if (i == 7) throw std::runtime_error{"Loop failed!"}; - }); + par::for_each( // + data, + SleepFunc{[](int i) { + if (i == 7) throw std::runtime_error{"Algorithm failed!"}; + }}); FAIL("Loop should have thrown an exception!"); }; - CHECK_THROWS_WITH_AS(loop(), "Loop failed!", std::runtime_error); + CHECK_THROWS_WITH_AS(loop(), "Algorithm failed!", std::runtime_error); } } // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -TEST_CASE("par::static_for_each") { +TEST_CASE("par::deterministic_for_each") { par::set_num_threads(4); - std::vector data{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; + std::vector data{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; std::vector indices(data.size()); SUBCASE("basic") { // Ensure the loop is executed and the thread distribution is correct. - par::static_for_each( // - std::views::zip(data, indices), - [](size_t thread_index, auto out_pair) { - std::this_thread::sleep_for(std::chrono::milliseconds{10}); - auto& [i, out_thread_index] = out_pair; + par::deterministic_for_each( // + std::views::zip(indices, data), + SleepFunc{[](auto out_pair, size_t thread_index) { + auto& [out_thread_index, i] = out_pair; out_thread_index = thread_index, i += 1; - }); + }}); CHECK(data == std::vector{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}); CHECK(indices == std::vector{0, 0, 0, 1, 1, 1, 2, 2, 3, 3}); } SUBCASE("exceptions") { // Ensure the exceptions from worker threads are caught. const auto loop = [&data] { - par::static_for_each(data, [](size_t /*thread_index*/, int i) { - if (i == 7) throw std::runtime_error{"Loop failed!"}; - }); + par::deterministic_for_each( // + data, + SleepFunc{[](int i, size_t /*thread_index*/) { + if (i == 7) throw std::runtime_error{"Algorithm failed!"}; + }}); FAIL("Loop should have thrown an exception!"); }; - CHECK_THROWS_WITH_AS(loop(), "Loop failed!", std::runtime_error); + CHECK_THROWS_WITH_AS(loop(), "Algorithm failed!", std::runtime_error); } } @@ -89,37 +115,62 @@ TEST_CASE("par::block_for_each") { // Ensure the loop is executed. /// @todo This test does not really test that the iterations are done in /// in chunks, it is hard to test that without exposing the internals. - par::block_for_each(data, [](int& i) { - std::this_thread::sleep_for(std::chrono::milliseconds{10}); - i += 1; - }); + par::block_for_each(data, SleepFunc{[](int& i) { i += 1; }}); CHECK(data == VectorOfVectors{{1, 2}, {3, 4}, {5, 6}, {7, 8}, {9, 10}}); } SUBCASE("exceptions") { // Ensure the exceptions from the worker threads are caught. 
const auto loop = [&data] { - par::block_for_each(data, [](int i) { - std::this_thread::sleep_for(std::chrono::milliseconds{10}); - if (i == 7) throw std::runtime_error{"Loop failed!"}; - }); + par::block_for_each( // + data, + SleepFunc{[](int i) { + if (i == 7) throw std::runtime_error{"Algorithm failed!"}; + }}); FAIL("Loop should have thrown an exception!"); }; - CHECK_THROWS_WITH_AS(loop(), "Loop failed!", std::runtime_error); + CHECK_THROWS_WITH_AS(loop(), "Algorithm failed!", std::runtime_error); } } // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -TEST_CASE("par::copy_if") { +TEST_CASE("par::fold") { par::set_num_threads(4); - std::vector data{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; + const std::vector data{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; + SUBCASE("basic") { + // Ensure the loop is executed. + const auto result = par::fold(data, 0, SleepFunc{std::plus{}}, std::plus{}); + CHECK(result == 45); + } + SUBCASE("exceptions") { + // Ensure the exceptions from the worker threads are caught. + const auto algorithm = [&data] { + par::fold( // + data, + 0, + SleepFunc{[](int partial, int i) { + if (i == 7) throw std::runtime_error{"Algorithm failed!"}; + return partial + i; + }}, + std::plus{}); + FAIL("Algorithm should have thrown an exception!"); + }; + CHECK_THROWS_WITH_AS(algorithm(), "Algorithm failed!", std::runtime_error); + } +} + +// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +TEST_CASE("par::unstable_copy_if") { + par::set_num_threads(4); + std::vector data{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; std::vector out(data.size()); SUBCASE("basic") { // Ensure the loop is executed. - const auto iter = par::copy_if(data, out.begin(), [](int i) { - std::this_thread::sleep_for(std::chrono::milliseconds{10}); - return i % 2 == 0; - }); + const auto iter = par::unstable_copy_if( // + data, + out.begin(), + SleepFunc{[](int i) { return i % 2 == 0; }}); CHECK(iter == out.begin() + 5); const auto out_range = std::ranges::subrange(out.begin(), iter); std::ranges::sort(out_range); @@ -128,11 +179,13 @@ TEST_CASE("par::copy_if") { SUBCASE("exceptions") { // Ensure the exceptions from the worker threads are caught. const auto algorithm = [&data, &out] { - par::copy_if(data, out.begin(), [](int i) { - std::this_thread::sleep_for(std::chrono::milliseconds{10}); - if (i == 7) throw std::runtime_error{"Algorithm failed!"}; - return i % 2 == 0; - }); + par::unstable_copy_if( // + data, + out.begin(), + SleepFunc{[](int i) { + if (i == 7) throw std::runtime_error{"Algorithm failed!"}; + return i % 2 == 0; + }}); FAIL("Algorithm should have thrown an exception!"); }; CHECK_THROWS_WITH_AS(algorithm(), "Algorithm failed!", std::runtime_error); @@ -143,25 +196,27 @@ TEST_CASE("par::copy_if") { TEST_CASE("par::transform") { par::set_num_threads(4); - std::vector data{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; + std::vector data{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; std::vector out(data.size()); SUBCASE("basic") { // Ensure the loop is executed. - const auto iter = par::transform(data, out.begin(), [](int i) { - std::this_thread::sleep_for(std::chrono::milliseconds{10}); - return 2 * i + 1; - }); + const auto iter = par::transform( // + data, + out.begin(), + SleepFunc{[](int i) { return 2 * i + 1; }}); CHECK(iter == out.end()); CHECK_RANGE_EQ(out, std::vector{1, 3, 5, 7, 9, 11, 13, 15, 17, 19}); } SUBCASE("exceptions") { // Ensure the exceptions from the worker threads are caught. 
const auto algorithm = [&data, &out] { - par::transform(data, out.begin(), [](int i) { - std::this_thread::sleep_for(std::chrono::milliseconds{10}); - if (i == 7) throw std::runtime_error{"Algorithm failed!"}; - return 2 * i + 1; - }); + par::transform( // + data, + out.begin(), + SleepFunc{[](int i) { + if (i == 7) throw std::runtime_error{"Algorithm failed!"}; + return 2 * i + 1; + }}); FAIL("Algorithm should have thrown an exception!"); }; CHECK_THROWS_WITH_AS(algorithm(), "Algorithm failed!", std::runtime_error); diff --git a/source/tit/core/par/partitioner.hpp b/source/tit/core/par/partitioner.hpp new file mode 100644 index 00000000..a6f5642d --- /dev/null +++ b/source/tit/core/par/partitioner.hpp @@ -0,0 +1,100 @@ +/* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ *| + * Part of BlueTit Solver, licensed under Apache 2.0 with Commons Clause. + * Commercial use, including SaaS, requires a separate license, see /LICENSE.md +\* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ */ + +#pragma once + +#include +#include +#include +#include +#include + +#include + +#include "tit/core/basic_types.hpp" +#include "tit/core/missing.hpp" // IWYU pragma: keep +#include "tit/core/par/control.hpp" +#include "tit/core/uint_utils.hpp" +#include "tit/core/utils.hpp" + +// Mark the `tbb::blocked_range` as a view. +template +inline constexpr bool std::ranges::enable_view> = true; + +namespace tit::par { + +// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +/// Range that could be processed in parallel. +template +concept range = + std::ranges::common_range && std::ranges::sized_range && + std::ranges::random_access_range; + +// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +/// Blockified range. +template +using blocked_range_t = tbb::blocked_range>; + +// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +/// Automatic parallelization partitioner. +struct AutoPartitioner final { + /// Blockify the range. + template + auto blockify(Range&& range) const noexcept -> blocked_range_t { + TIT_ASSUME_UNIVERSAL(Range, range); + return {std::begin(range), std::end(range)}; + } +}; + +/// @copydoc AutoPartitioner +inline constexpr AutoPartitioner auto_{}; + +// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +/// Static parallelization partitioner. +struct StaticPartitioner final { + /// Blockify the range. + template + auto blockify(Range&& range) const noexcept -> blocked_range_t { + TIT_ASSUME_UNIVERSAL(Range, range); + const auto grain_size_hint = + divide_up(static_cast(std::size(range)), num_threads()); + return {std::begin(range), std::end(range), grain_size_hint}; + } +}; + +/// @copydoc StaticPartitioner +inline constexpr StaticPartitioner static_{}; + +// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +/// Partitioner type. +template +concept partitioner = std::same_as || // + std::same_as; + +// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +/// Helper base class that provides `operator()` overload when no partitioner +/// is specified. +struct AutoPartitionerInvoker { + /// Invoke the algorithm with the auto partitioner. + template Self, + class Arg, + class... RestArgs> + requires (!partitioner>) + auto operator()(this Self& self, Arg&& arg, RestArgs&&... 
rest_args) { + return self(auto_, + std::forward(arg), + std::forward(rest_args)...); + } +}; + +// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +} // namespace tit::par diff --git a/source/tit/geom/bbox.hpp b/source/tit/geom/bbox.hpp index d9e63091..a720b2e6 100644 --- a/source/tit/geom/bbox.hpp +++ b/source/tit/geom/bbox.hpp @@ -101,6 +101,13 @@ class BBox final { return *this; } + /// Join the bounding box with another @p bbox. + constexpr auto join(const BBox& bbox) -> BBox& { + low_ = minimum(low_, bbox.low()); + high_ = maximum(high_, bbox.high()); + return *this; + } + /// Split the bounding box into two parts by the plane. constexpr auto split(size_t axis, const vec_num_t& val, diff --git a/source/tit/geom/bbox.test.cpp b/source/tit/geom/bbox.test.cpp index 3229444f..4d7ffeac 100644 --- a/source/tit/geom/bbox.test.cpp +++ b/source/tit/geom/bbox.test.cpp @@ -90,6 +90,13 @@ TEST_CASE("geom::BBox::intersect") { CHECK(box.high() == Vec{2.0, 2.0}); } +TEST_CASE("geom::BBox::join") { + geom::BBox box{Vec{0.0, 0.0}, Vec{2.0, 2.0}}; + box.join(geom::BBox{Vec{1.0, 1.0}, Vec{3.0, 3.0}}); + CHECK(box.low() == Vec{0.0, 0.0}); + CHECK(box.high() == Vec{3.0, 3.0}); +} + // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ TEST_CASE("geom::BBox::split(plane)") { diff --git a/source/tit/geom/point_range.hpp b/source/tit/geom/point_range.hpp index ee7acb0a..2c91056b 100644 --- a/source/tit/geom/point_range.hpp +++ b/source/tit/geom/point_range.hpp @@ -14,6 +14,7 @@ #include "tit/core/basic_types.hpp" #include "tit/core/checks.hpp" #include "tit/core/mat.hpp" +#include "tit/core/par/algorithms.hpp" #include "tit/core/range_utils.hpp" #include "tit/core/utils.hpp" #include "tit/core/vec.hpp" @@ -67,8 +68,7 @@ template constexpr auto compute_center(Points&& points) -> point_range_vec_t { TIT_ASSUME_UNIVERSAL(Points, points); TIT_ASSERT(!std::ranges::empty(points), "Points must not be empty!"); - auto sum = *std::begin(points); - for (const auto& point : points | std::views::drop(1)) sum += point; + const auto sum = par::fold(par::static_, points); return sum / count_points(points); } template @@ -91,9 +91,13 @@ constexpr auto compute_bbox(Points&& points) -> point_range_bbox_t { #if !defined(TIT_HAVE_GCOV) || !TIT_HAVE_GCOV TIT_ASSERT(!std::ranges::empty(points), "Points must not be empty!"); #endif - BBox box{*std::begin(points)}; - for (const auto& point : points | std::views::drop(1)) box.expand(point); - return box; + using Box = point_range_bbox_t; + return par::fold( + par::static_, + points, + Box{*std::begin(points)}, + [](Box partial, const auto& point) { return partial.expand(point); }, + [](Box partial, const auto& other) { return partial.join(other); }); } template constexpr auto compute_bbox(Points&& points, Perm&& perm) @@ -116,12 +120,20 @@ constexpr auto compute_inertia_tensor(Points&& points) -> point_range_mat_t { TIT_ASSUME_UNIVERSAL(Points, points); TIT_ASSERT(!std::ranges::empty(points), "Points must not be empty!"); - auto sum = *std::begin(points); - auto inertia_tensor = outer_sqr(sum); - for (const auto& point : points | std::views::drop(1)) { - sum += point; - inertia_tensor += outer_sqr(point); - } + struct Result { + point_range_vec_t sum; + point_range_mat_t tensor; + }; + auto [sum, inertia_tensor] = par::fold( + par::static_, + points, + Result{}, + [](Result partial, const auto& point) -> Result { + return {partial.sum + point, partial.tensor + outer_sqr(point)}; + }, + [](Result partial, const Result& 
other) -> Result { + return {partial.sum + other.sum, partial.tensor + other.tensor}; + }); const auto center = sum / count_points(points); inertia_tensor -= outer(sum, center); return inertia_tensor; diff --git a/source/tit/sph/fluid_equations.hpp b/source/tit/sph/fluid_equations.hpp index 8703c7fd..a363f04b 100644 --- a/source/tit/sph/fluid_equations.hpp +++ b/source/tit/sph/fluid_equations.hpp @@ -200,7 +200,7 @@ class FluidEquations final { using PV = ParticleView; // Clean-up continuity equation fields and apply source terms. - par::for_each(particles.all(), [this](PV a) { + par::for_each(par::static_, particles.all(), [this](PV a) { // Clean-up continuity equation fields. drho_dt[a] = {}; if constexpr (has(grad_rho)) grad_rho[a] = {}; @@ -252,7 +252,7 @@ class FluidEquations final { }); // Renormalize fields. - par::for_each(particles.all(), [](PV a) { + par::for_each(par::static_, particles.all(), [](PV a) { // Renormalize density, if possible. if constexpr (has(C)) { if (!is_tiny(C[a])) rho[a] /= C[a]; @@ -296,7 +296,7 @@ class FluidEquations final { // Clean-up momentum and energy equation fields, compute pressure, // sound speed and apply source terms. - par::for_each(particles.all(), [this](PV a) { + par::for_each(par::static_, particles.all(), [this](PV a) { // Clean-up momentum and energy equation fields. dv_dt[a] = {}; if constexpr (has(div_v)) div_v[a] = {}; @@ -370,7 +370,7 @@ class FluidEquations final { // Compute artificial viscosity switch. if constexpr (has(dalpha_dt)) { - par::for_each(particles.fluid(), [this](PV a) { + par::for_each(par::static_, particles.fluid(), [this](PV a) { dalpha_dt[a] = momentum_equation_.artificial_viscosity().switch_source(a); }); @@ -405,8 +405,13 @@ class FluidEquations final { const auto a_0 = particles[0]; const auto FS_FAR = 2 * CFL * Ma * pow2(h[a_0]); static constexpr auto FS_ON = std::numeric_limits::min(); - par::for_each(particles.fluid(), [](PV a) { FS[a] = FS_ON, dr[a] = {}; }); - par::for_each(particles.fixed(), [FS_FAR](PV a) { FS[a] = FS_FAR; }); + par::for_each(par::static_, particles.fluid(), [](PV a) { + FS[a] = FS_ON; + dr[a] = {}; + }); + par::for_each(par::static_, particles.fixed(), [FS_FAR](PV a) { + FS[a] = FS_FAR; + }); // Classify the particles into free surface and non-free surface. // diff --git a/source/tit/sph/particle_mesh.hpp b/source/tit/sph/particle_mesh.hpp index 01ae104f..2b3d65ce 100644 --- a/source/tit/sph/particle_mesh.hpp +++ b/source/tit/sph/particle_mesh.hpp @@ -238,8 +238,9 @@ class ParticleMesh final { if (is_first_level) { interface.resize(particles.size()); const auto all_particles = iota_perm(particles.all()); - const auto not_interface_iter = - par::copy_if(all_particles, interface.begin(), is_interface); + const auto not_interface_iter = par::unstable_copy_if(all_particles, + interface.begin(), + is_interface); interface.erase(not_interface_iter, interface.end()); } else { // Note: `std::ranges::partition` is faster than `std::erase_if` diff --git a/source/tit/sph/time_integrator.hpp b/source/tit/sph/time_integrator.hpp index f442e67e..f3405d77 100644 --- a/source/tit/sph/time_integrator.hpp +++ b/source/tit/sph/time_integrator.hpp @@ -67,13 +67,14 @@ class KickDriftIntegrator final { // Update particle density. 
equations_.compute_density(mesh, particles); if constexpr (has(drho_dt)) { - par::for_each(particles.fluid(), - [dt](PV a) { rho[a] += dt * drho_dt[a]; }); + par::for_each(par::static_, particles.fluid(), [dt](PV a) { + rho[a] += dt * drho_dt[a]; + }); } // Update particle velocty, internal energy, etc. equations_.compute_forces(mesh, particles); - par::for_each(particles.fluid(), [dt](PV a) { + par::for_each(par::static_, particles.fluid(), [dt](PV a) { v[a] += dt * dv_dt[a]; r[a] += dt * v[a]; // Kick-Drift: position is updated after velocity. if constexpr (has(u, du_dt)) u[a] += dt * du_dt[a]; @@ -83,7 +84,9 @@ class KickDriftIntegrator final { // Apply particle shifting. if constexpr (has(dr)) { equations_.compute_shifts(mesh, particles); - par::for_each(particles.fluid(), [](PV a) { r[a] += dr[a]; }); + par::for_each(par::static_, particles.fluid(), [](PV a) { + r[a] += dr[a]; + }); } // Increment step index. @@ -141,7 +144,7 @@ class KickDriftKickIntegrator final { // step. const auto dt_2 = dt / 2; equations_.compute_forces(mesh, particles); - par::for_each(particles.fluid(), [dt, dt_2](PV a) { + par::for_each(par::static_, particles.fluid(), [dt, dt_2](PV a) { v[a] += dt_2 * dv_dt[a]; r[a] += dt * v[a]; // Kick-Drift: position is updated after velocity. if constexpr (has(u, du_dt)) u[a] += dt_2 * du_dt[a]; @@ -151,22 +154,25 @@ class KickDriftKickIntegrator final { // Update particle velocity to the full step. equations_.compute_density(mesh, particles); if constexpr (has(drho_dt)) { - par::for_each(particles.fluid(), - [dt](PV a) { rho[a] += dt * drho_dt[a]; }); + par::for_each(par::static_, particles.fluid(), [dt](PV a) { + rho[a] += dt * drho_dt[a]; + }); } // Update particle velocity to the full step. equations_.compute_forces(mesh, particles); - par::for_each(particles.fluid(), [dt_2](PV a) { + par::for_each(par::static_, particles.fluid(), [dt_2](PV a) { v[a] += dt_2 * dv_dt[a]; // Kick. if constexpr (has(u, du_dt)) u[a] += dt_2 * du_dt[a]; if constexpr (has(alpha, dalpha_dt)) alpha[a] += dt_2 * dalpha_dt[a]; }); - // Apply particle shifting, if necessary. + // Apply particle shifting. if constexpr (has(dr)) { equations_.compute_shifts(mesh, particles); - par::for_each(particles.fluid(), [](PV a) { r[a] += dr[a]; }); + par::for_each(par::static_, particles.fluid(), [](PV a) { + r[a] += dr[a]; + }); } // Increment step index. @@ -223,10 +229,12 @@ class RungeKuttaIntegrator final { substep_(dt, mesh, particles); lincomb_(1.0 / 3.0, old_particles, 2.0 / 3.0, particles); - // Apply particle shifting, if necessary. + // Apply particle shifting. if constexpr (has(dr)) { equations_.compute_shifts(mesh, particles); - par::for_each(particles.fluid(), [](PV a) { r[a] += dr[a]; }); + par::for_each(par::static_, particles.fluid(), [](PV a) { + r[a] += dr[a]; + }); } // Increment step index. @@ -249,7 +257,7 @@ class RungeKuttaIntegrator final { equations_.compute_forces(mesh, particles); // Integrate. - par::for_each(particles.fluid(), [dt](PV a) { + par::for_each(par::static_, particles.fluid(), [dt](PV a) { r[a] += dt * v[a]; // Drift-Kick: position is updated before velocity. v[a] += dt * dv_dt[a]; if constexpr (has(drho_dt)) rho[a] += dt * drho_dt[a]; @@ -260,12 +268,13 @@ class RungeKuttaIntegrator final { // Compute the linear combination of the two substeps. 
template ParticleArray> - void lincomb_(particle_num_t weight, - const ParticleArray& particles, - particle_num_t out_weight, - ParticleArray& out_particles) { + static void lincomb_(particle_num_t weight, + const ParticleArray& particles, + particle_num_t out_weight, + ParticleArray& out_particles) { using PV = ParticleView; par::for_each( // + par::static_, out_particles.fluid(), [out_weight, weight, &particles](PV out_a) { const auto a = particles[out_a.index()];
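
A minimal usage sketch of the reworked `tit::par` interface, based only on the call sites visible in the hunks above (partitioner-first overloads, the element-then-thread argument order of `deterministic_for_each`, and the new `fold` and `unstable_copy_if`); the wrapper function name and the exact set of standard headers are assumptions rather than code from the tree:

#include <cstddef>
#include <functional>
#include <vector>

#include "tit/core/par/algorithms.hpp"
#include "tit/core/par/control.hpp"

// Hypothetical free function, for illustration only.
void par_usage_sketch() {
  tit::par::set_num_threads(4);
  std::vector<int> data{0, 1, 2, 3, 4, 5, 6, 7, 8, 9};

  // Without an explicit partitioner, the auto partitioner is used.
  tit::par::for_each(data, [](int& i) { i += 1; });

  // `par::static_` selects the static partitioner explicitly.
  tit::par::for_each(tit::par::static_, data, [](int& i) { i *= 2; });

  // Deterministic iteration: element first, thread index second; the
  // per-thread partition is the same on every run.
  tit::par::deterministic_for_each(data, [](int& i, std::size_t thread) {
    i += static_cast<int>(thread);
  });

  // Parallel fold: initial value, per-block fold function, and a reducer that
  // merges the per-block results.
  const auto sum = tit::par::fold(data, 0, std::plus{}, std::plus{});
  static_cast<void>(sum);

  // Unstable copy-if: relative order of the copied elements is not preserved.
  std::vector<int> evens(data.size());
  const auto evens_end = tit::par::unstable_copy_if(
      data,
      evens.begin(),
      [](int i) { return i % 2 == 0; });
  evens.erase(evens_end, evens.end());
}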