diff --git a/benchmarks/gbench/common/sort.cpp b/benchmarks/gbench/common/sort.cpp
index 52698199cc..e95f33d411 100644
--- a/benchmarks/gbench/common/sort.cpp
+++ b/benchmarks/gbench/common/sort.cpp
@@ -15,29 +15,38 @@ template <typename X> void fill_random(X &&x) {
 class DRSortFixture : public benchmark::Fixture {
 protected:
   xhp::distributed_vector<T> *a;
+  xhp::distributed_vector<T> *vec;
+  std::vector<T> local_vec;
 
 public:
   void SetUp(::benchmark::State &) {
     a = new xhp::distributed_vector<T>(default_vector_size);
-    std::vector<T> local(default_vector_size);
-    fill_random(local);
-    xhp::copy(local, rng::begin(*a));
+    vec = new xhp::distributed_vector<T>(default_vector_size);
+    local_vec = std::vector<T>(default_vector_size);
+    fill_random(local_vec);
+    xhp::copy(local_vec, rng::begin(*a));
   }
 
-  void TearDown(::benchmark::State &) { delete a; }
+  void TearDown(::benchmark::State &state) {
+    // copy back to check if last sort really sorted
+    xhp::copy(*vec, rng::begin(local_vec));
+    delete a;
+    delete vec;
+
+    if (!rng::is_sorted(local_vec)) {
+      state.SkipWithError("mhp sort did not sort the vector");
+    }
+  }
 };
 
 BENCHMARK_DEFINE_F(DRSortFixture, Sort_DR)(benchmark::State &state) {
   Stats stats(state, sizeof(T) * a->size());
-  xhp::distributed_vector<T> vec(a->size());
 
   for (auto _ : state) {
     state.PauseTiming();
-    xhp::copy(*a, rng::begin(vec));
+    xhp::copy(*a, rng::begin(*vec));
    stats.rep();
    state.ResumeTiming();
-
-    // sort not implemented in mhp yet
-    xhp::sort(vec);
+    xhp::sort(*vec);
   }
 }
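The fixture now reuses one distributed vector across iterations and validates the final result in `TearDown`, so a broken sort fails the benchmark instead of producing a meaningless timing. A minimal standalone sketch of that pattern, with plain `std::vector`/`std::sort` standing in for the distributed types (the fixture and benchmark names here are illustrative, not part of the patch):

```cpp
#include <benchmark/benchmark.h>

#include <algorithm>
#include <random>
#include <vector>

class SortCheckFixture : public benchmark::Fixture {
protected:
  std::vector<int> vec;

public:
  void SetUp(::benchmark::State &) override {
    vec.resize(1 << 20);
    std::mt19937 gen(42);
    std::uniform_int_distribution<int> dist;
    for (auto &v : vec)
      v = dist(gen);
  }

  void TearDown(::benchmark::State &state) override {
    // Verify once, outside the timed region; fail loudly instead of
    // silently benchmarking a sort that does not sort.
    if (!std::is_sorted(vec.begin(), vec.end()))
      state.SkipWithError("sort did not sort the vector");
  }
};

BENCHMARK_DEFINE_F(SortCheckFixture, Sort)(benchmark::State &state) {
  for (auto _ : state) {
    state.PauseTiming();
    std::shuffle(vec.begin(), vec.end(), std::mt19937(7)); // fresh input
    state.ResumeTiming();
    std::sort(vec.begin(), vec.end());
  }
}
BENCHMARK_REGISTER_F(SortCheckFixture, Sort);

BENCHMARK_MAIN();
```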
diff --git a/include/dr/detail/communicator.hpp b/include/dr/detail/communicator.hpp
index 596a298bae..331253ab63 100644
--- a/include/dr/detail/communicator.hpp
+++ b/include/dr/detail/communicator.hpp
@@ -4,6 +4,10 @@
 
 #pragma once
 
+#define MPI_SUPPORTS_RGET_C                                                    \
+  (MPI_VERSION >= 4) ||                                                        \
+      (defined(I_MPI_NUMVERSION) && (I_MPI_NUMVERSION > 20211200000))
+
 namespace dr {
 
 class communicator {
@@ -193,10 +197,10 @@ class communicator {
     assert(rng::size(recvcnt) == size_);
     assert(rng::size(recvdsp) == size_);
 
-    std::vector<int> _sendcnt(size_);
-    std::vector<int> _senddsp(size_);
-    std::vector<int> _recvcnt(size_);
-    std::vector<int> _recvdsp(size_);
+    std::vector<MPI_Count> _sendcnt(size_);
+    std::vector<MPI_Aint> _senddsp(size_);
+    std::vector<MPI_Count> _recvcnt(size_);
+    std::vector<MPI_Aint> _recvdsp(size_);
 
     rng::transform(sendcnt, _sendcnt.begin(),
                    [](auto e) { return e * sizeof(valT); });
@@ -207,9 +211,10 @@ class communicator {
     rng::transform(recvdsp, _recvdsp.begin(),
                    [](auto e) { return e * sizeof(valT); });
 
-    MPI_Alltoallv(rng::data(sendbuf), rng::data(_sendcnt), rng::data(_senddsp),
-                  MPI_BYTE, rng::data(recvbuf), rng::data(_recvcnt),
-                  rng::data(_recvdsp), MPI_BYTE, mpi_comm_);
+    MPI_Alltoallv_c(rng::data(sendbuf), rng::data(_sendcnt),
+                    rng::data(_senddsp), MPI_BYTE, rng::data(recvbuf),
+                    rng::data(_recvcnt), rng::data(_recvdsp), MPI_BYTE,
+                    mpi_comm_);
   }
 
   bool operator==(const communicator &other) const {
@@ -254,7 +259,15 @@ class rma_window {
            std::size_t disp) const {
     DRLOG("MPI comm get:: ({}:{}:{})", rank, disp, size);
     MPI_Request request;
+#if (MPI_VERSION >= 4) ||                                                     \
+    (defined(I_MPI_NUMVERSION) && (I_MPI_NUMVERSION > 20211200000))
+    MPI_Rget_c(dst, size, MPI_BYTE, rank, disp, size, MPI_BYTE, win_, &request);
+#else
+    assert(
+        size <= (std::size_t)INT_MAX &&
+        "MPI API requires origin_count to be positive signed 32-bit integer");
     MPI_Rget(dst, size, MPI_BYTE, rank, disp, size, MPI_BYTE, win_, &request);
+#endif
     MPI_Wait(&request, MPI_STATUS_IGNORE);
   }
 
@@ -266,7 +279,18 @@ class rma_window {
            std::size_t disp) const {
     DRLOG("MPI comm put:: ({}:{}:{})", rank, disp, size);
     MPI_Request request;
+
+#if (MPI_VERSION >= 4) ||                                                     \
+    (defined(I_MPI_NUMVERSION) && (I_MPI_NUMVERSION > 20211200000))
+    MPI_Rput_c(src, size, MPI_BYTE, rank, disp, size, MPI_BYTE, win_, &request);
+#else
+    // MPI_Rput origin_count is 32-bit signed int - check range
+    assert(
+        size <= (std::size_t)INT_MAX &&
+        "MPI API requires origin_count to be positive signed 32-bit integer");
     MPI_Rput(src, size, MPI_BYTE, rank, disp, size, MPI_BYTE, win_, &request);
+#endif
+
     DRLOG("MPI comm wait:: ({}:{}:{})", rank, disp, size);
     MPI_Wait(&request, MPI_STATUS_IGNORE);
     DRLOG("MPI comm wait finished:: ({}:{}:{})", rank, disp, size);
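`MPI_Alltoallv_c` is the MPI-4 large-count variant of `MPI_Alltoallv`: counts are `MPI_Count` and displacements are `MPI_Aint` (both 64-bit on common ABIs), so byte counts above `INT_MAX` no longer overflow; the `I_MPI_NUMVERSION` gate covers Intel MPI builds that ship the `_c` entry points while still reporting an older `MPI_VERSION`. A sketch of the widening in isolation, assuming an MPI-4 library; the function name and plain `std::vector<int>` parameters are illustrative, not the DR wrapper's actual interface:

```cpp
#include <mpi.h>

#include <algorithm>
#include <cstddef>
#include <vector>

// Convert per-rank element counts/displacements to *byte* counts in the
// wide types MPI_Alltoallv_c expects: MPI_Count for counts, MPI_Aint for
// displacements.
template <typename valT>
void alltoallv_c_bytes(const valT *sendbuf, const std::vector<int> &sendcnt,
                       const std::vector<int> &senddsp, valT *recvbuf,
                       const std::vector<int> &recvcnt,
                       const std::vector<int> &recvdsp, MPI_Comm comm) {
  auto bytes = [](auto e) { return e * sizeof(valT); };
  std::vector<MPI_Count> scnt(sendcnt.size()), rcnt(recvcnt.size());
  std::vector<MPI_Aint> sdsp(senddsp.size()), rdsp(recvdsp.size());
  std::transform(sendcnt.begin(), sendcnt.end(), scnt.begin(), bytes);
  std::transform(senddsp.begin(), senddsp.end(), sdsp.begin(), bytes);
  std::transform(recvcnt.begin(), recvcnt.end(), rcnt.begin(), bytes);
  std::transform(recvdsp.begin(), recvdsp.end(), rdsp.begin(), bytes);
  MPI_Alltoallv_c(sendbuf, scnt.data(), sdsp.data(), MPI_BYTE, recvbuf,
                  rcnt.data(), rdsp.data(), MPI_BYTE, comm);
}
```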
diff --git a/include/dr/mhp/algorithms/sort.hpp b/include/dr/mhp/algorithms/sort.hpp
index b889868967..de2855e652 100644
--- a/include/dr/mhp/algorithms/sort.hpp
+++ b/include/dr/mhp/algorithms/sort.hpp
@@ -139,7 +139,6 @@ template <typename R, typename Compare> void local_sort(R &r, Compare &&comp) {
 template <typename Compare>
 void _find_split_idx(std::size_t &vidx, std::size_t &segidx, Compare &&comp,
                      auto &ls, auto &vec_v, auto &vec_i, auto &vec_s) {
-
   while (vidx < default_comm().size() && segidx < rng::size(ls)) {
     if (comp(vec_v[vidx - 1], ls[segidx])) {
       vec_i[vidx] = segidx;
@@ -231,26 +230,26 @@ void splitters(Seg &lsegment, Compare &&comp,
 }
 
 template <typename valT>
-void shift_data(const int shift_left, const int shift_right,
+void shift_data(const int64_t shift_left, const int64_t shift_right,
                 buffer<valT> &vec_recvdata, buffer<valT> &vec_left,
                 buffer<valT> &vec_right) {
-
   const std::size_t _comm_rank = default_comm().rank();
 
   MPI_Request req_l, req_r;
   MPI_Status stat_l, stat_r;
 
-  assert(static_cast<int>(rng::size(vec_left)) == std::max(0, shift_left));
-  assert(static_cast<int>(rng::size(vec_right)) == std::max(0, shift_right));
+  assert(static_cast<long>(rng::size(vec_left)) == std::max(0L, shift_left));
+  assert(static_cast<long>(rng::size(vec_right)) ==
+         std::max(0L, shift_right));
 
-  if (static_cast<int>(rng::size(vec_recvdata)) < -shift_left) {
+  if (static_cast<int64_t>(rng::size(vec_recvdata)) < -shift_left) {
     // Too little data in recv buffer to shift left - first get from right,
     // then send left
     DRLOG("Get from right first, recvdata size {} shift left {}",
           rng::size(vec_recvdata), shift_left);
     // ** This will never happen, because values eq to split go left **
     assert(false);
-  } else if (static_cast<int>(rng::size(vec_recvdata)) < -shift_right) {
+  } else if (static_cast<int64_t>(rng::size(vec_recvdata)) < -shift_right) {
     // Too little data in buffer to shift right - first get from left, then
     // send right
     assert(shift_left > 0);
@@ -280,18 +279,16 @@ void shift_data(const int shift_left, const int shift_right,
     MPI_Wait(&req_r, &stat_r);
   } else {
     // enough data in recv buffer
-
     if (shift_left < 0) {
       default_comm().isend(rng::data(vec_recvdata), -shift_left,
                            _comm_rank - 1, &req_l);
     } else if (shift_left > 0) {
-      assert(shift_left == static_cast<int>(rng::size(vec_left)));
+      assert(shift_left == static_cast<int64_t>(rng::size(vec_left)));
       default_comm().irecv(rng::data(vec_left), rng::size(vec_left),
                            _comm_rank - 1, &req_l);
     }
-
     if (shift_right > 0) {
-      assert(shift_right == static_cast<int>(rng::size(vec_right)));
+      assert(shift_right == static_cast<int64_t>(rng::size(vec_right)));
       default_comm().irecv(rng::data(vec_right), rng::size(vec_right),
                            _comm_rank + 1, &req_r);
     } else if (shift_right < 0) {
@@ -299,7 +296,6 @@ void shift_data(const int shift_left, const int shift_right,
                            shift_right, -shift_right, _comm_rank + 1, &req_r);
     }
   }
-
   if (shift_left != 0)
     MPI_Wait(&req_l, &stat_l);
   if (shift_right != 0)
@@ -308,11 +304,11 @@ template <typename valT>
-void copy_results(auto &lsegment, const int shift_left, const int shift_right,
-                  buffer<valT> &vec_recvdata, buffer<valT> &vec_left,
-                  buffer<valT> &vec_right) {
-  const std::size_t invalidate_left = std::max(-shift_left, 0);
-  const std::size_t invalidate_right = std::max(-shift_right, 0);
+void copy_results(auto &lsegment, const int64_t shift_left,
+                  const int64_t shift_right, buffer<valT> &vec_recvdata,
+                  buffer<valT> &vec_left, buffer<valT> &vec_right) {
+  const std::size_t invalidate_left = std::max(-shift_left, 0L);
+  const std::size_t invalidate_right = std::max(-shift_right, 0L);
   const std::size_t size_l = rng::size(vec_left);
   const std::size_t size_r = rng::size(vec_right);
 
@@ -355,7 +351,6 @@ void copy_results(auto &lsegment, const int shift_left, const int shift_right,
 template <typename R, typename Compare>
 void dist_sort(R &r, Compare &&comp) {
-
   using valT = typename R::value_type;
 
   const std::size_t _comm_rank = default_comm().rank();
   const std::size_t _comm_size = default_comm().size();
@@ -370,6 +365,8 @@ void dist_sort(R &r, Compare &&comp) {
   std::vector<std::size_t> vec_recv_elems(_comm_size, 0);
   std::size_t _total_elems = 0;
 
+  DRLOG("Rank {}: Dist sort, local segment size {}", default_comm().rank(),
+        rng::size(lsegment));
   __detail::local_sort(lsegment, comp);
 
   /* find splitting values - limits of areas to send to other processes */
@@ -383,12 +380,8 @@ void dist_sort(R &r, Compare &&comp) {
 
   /* send and receive data belonging to each node, then redistribute
    * data to achieve size of data equal to size of local segment */
-
-  /* TODO: all_gather() below can be asynchronous - to be verified in CI
-   * (currently hangs in CI unit tests, but going well when started manually)
-   */
+  /* async all_gather causes problems on some systems */
   // MPI_Request req_recvelems;
-  // default_comm().i_all_gather(_recv_elems, vec_recv_elems, &req_recvelems);
   default_comm().all_gather(_recv_elems, vec_recv_elems);
 
   /* buffer for received data */
@@ -402,13 +395,12 @@ void dist_sort(R &r, Compare &&comp) {
   /* TODO: vec recvdata is partially sorted, implementation of merge on GPU is
    * desirable */
   __detail::local_sort(vec_recvdata, comp);
-  // MPI_Wait(&req_recvelems, MPI_STATUS_IGNORE);
 
   _total_elems = std::reduce(vec_recv_elems.begin(), vec_recv_elems.end());
 
   /* prepare data for shift to neighboring processes */
-  std::vector<int> vec_shift(_comm_size - 1);
+  std::vector<int64_t> vec_shift(_comm_size - 1);
 
   const auto desired_elems_num = (_total_elems + _comm_size - 1) / _comm_size;
 
@@ -417,12 +409,12 @@ void dist_sort(R &r, Compare &&comp) {
     vec_shift[_i] = vec_shift[_i - 1] + desired_elems_num - vec_recv_elems[_i];
   }
 
-  const int shift_left = _comm_rank == 0 ? 0 : -vec_shift[_comm_rank - 1];
-  const int shift_right =
+  const int64_t shift_left = _comm_rank == 0 ? 0 : -vec_shift[_comm_rank - 1];
+  const int64_t shift_right =
       _comm_rank == _comm_size - 1 ? 0 : vec_shift[_comm_rank];
 
-  buffer<valT> vec_left(std::max(shift_left, 0));
-  buffer<valT> vec_right(std::max(shift_right, 0));
+  buffer<valT> vec_left(std::max(shift_left, 0L));
+  buffer<valT> vec_right(std::max(shift_right, 0L));
 
   /* shift data if necessary, to have exactly the number of elements equal to
    * lsegment size */
@@ -432,7 +424,6 @@ void dist_sort(R &r, Compare &&comp) {
   /* copy results to distributed vector's local segment */
   __detail::copy_results(lsegment, shift_left, shift_right, vec_recvdata,
                          vec_left, vec_right);
-
 } // __detail::dist_sort
 
 } // namespace __detail
@@ -446,6 +437,7 @@ void sort(R &r, Compare &&comp = Compare()) {
   std::size_t _comm_size = default_comm().size(); // dr-style ignore
 
   if (_comm_size == 1) {
+    DRLOG("mhp::sort() - one node only");
     auto &&lsegment = local_segment(r);
     __detail::local_sort(lsegment, comp);
 
@@ -453,7 +445,7 @@ void sort(R &r, Compare &&comp = Compare()) {
     /* Distributed vector of size <= (comm_size-1) * (comm_size-1) may have
      * 0-size local segments. It is also small enough to prefer sequential sort
      */
-    DRLOG("mhp::sort() - local sort");
+    DRLOG("mhp::sort() - local sort on node 0");
     std::vector<valT> vec_recvdata(rng::size(r));
     dr::mhp::copy(0, r, rng::begin(vec_recvdata));
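The widening to `int64_t` matters because the shifts are cumulative: each entry of `vec_shift` aggregates the surplus or deficit of all lower ranks, so on large vectors it can exceed `INT_MAX` elements even when every local segment fits in 32 bits. The recurrence, distilled into a standalone helper (`compute_shifts` is a hypothetical name; the patch computes this inline in `dist_sort`):

```cpp
#include <cstddef>
#include <cstdint>
#include <numeric>
#include <vector>

// vec_shift[i] = cumulative number of elements rank i must pass to rank i+1
// (negative: rank i+1 passes elements back). 64-bit so large imbalances
// cannot wrap around.
std::vector<int64_t>
compute_shifts(const std::vector<std::size_t> &recv_elems) {
  const std::size_t comm_size = recv_elems.size();
  if (comm_size < 2)
    return {};
  const std::size_t total = std::reduce(recv_elems.begin(), recv_elems.end());
  const std::size_t desired = (total + comm_size - 1) / comm_size; // ceil-div
  std::vector<int64_t> shift(comm_size - 1);
  shift[0] =
      static_cast<int64_t>(desired) - static_cast<int64_t>(recv_elems[0]);
  for (std::size_t i = 1; i < comm_size - 1; i++)
    shift[i] = shift[i - 1] + static_cast<int64_t>(desired) -
               static_cast<int64_t>(recv_elems[i]);
  return shift;
}
```

Rank `r` then derives `shift_left = r == 0 ? 0 : -shift[r - 1]` and `shift_right = r == comm_size - 1 ? 0 : shift[r]`, exactly as in the hunk above.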
diff --git a/include/dr/mhp/containers/distributed_vector.hpp b/include/dr/mhp/containers/distributed_vector.hpp
index eeb4b322a4..8958eea407 100644
--- a/include/dr/mhp/containers/distributed_vector.hpp
+++ b/include/dr/mhp/containers/distributed_vector.hpp
@@ -38,7 +38,20 @@ class MpiBackend {
           "segm_offset:{}, size:{}, peer:{})",
           dst, offset, datalen, segment_index);
 
+#if (MPI_VERSION >= 4) ||                                                     \
+    (defined(I_MPI_NUMVERSION) && (I_MPI_NUMVERSION > 20211200000))
+    // 64-bit API inside
     win_.get(dst, datalen, segment_index, offset);
+#else
+    for (std::size_t remainder = datalen, off = 0UL; remainder > 0;) {
+      std::size_t s = std::min(remainder, (std::size_t)INT_MAX);
+      DRLOG("{}:{} win_.get total {} now {} bytes at off {}, dst offset {}",
+            default_comm().rank(), __LINE__, datalen, s, off, offset + off);
+      win_.get((uint8_t *)dst + off, s, segment_index, offset + off);
+      off += s;
+      remainder -= s;
+    }
+#endif
   }
 
   void putmem(void const *src, std::size_t offset, std::size_t datalen,
@@ -46,7 +59,21 @@ class MpiBackend {
     DRLOG("calling MPI put(segm_offset:{}, "
           "src:{}, size:{}, peer:{})",
           offset, src, datalen, segment_index);
+
+#if (MPI_VERSION >= 4) ||                                                     \
+    (defined(I_MPI_NUMVERSION) && (I_MPI_NUMVERSION > 20211200000))
+    // 64-bit API inside
     win_.put(src, datalen, segment_index, offset);
+#else
+    for (std::size_t remainder = datalen, off = 0UL; remainder > 0;) {
+      std::size_t s = std::min(remainder, (std::size_t)INT_MAX);
+      DRLOG("{}:{} win_.put {} bytes at off {}, dst offset {}",
+            default_comm().rank(), __LINE__, s, off, offset + off);
+      win_.put((uint8_t *)src + off, s, segment_index, offset + off);
+      off += s;
+      remainder -= s;
+    }
+#endif
   }
 
   std::size_t getrank() { return win_.communicator().rank(); }
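When the large-count API is unavailable, the fallback above loops over the transfer in pieces of at most `INT_MAX` bytes. The loop pattern in isolation (`chunked_transfer` and its callback are hypothetical stand-ins for `win_.get`/`win_.put`):

```cpp
#include <algorithm>
#include <climits>
#include <cstddef>
#include <cstdint>

// Split one large RMA transfer into pieces that each fit the classic
// MPI API's signed 32-bit count.
template <typename TransferFn>
void chunked_transfer(std::uint8_t *base, std::size_t offset,
                      std::size_t datalen, TransferFn &&transfer_chunk) {
  for (std::size_t remainder = datalen, off = 0; remainder > 0;) {
    const std::size_t s = std::min(remainder, (std::size_t)INT_MAX);
    transfer_chunk(base + off, s, offset + off); // s <= INT_MAX by construction
    off += s;
    remainder -= s;
  }
}
```

A caller would pass a lambda, e.g. `chunked_transfer(dst, offset, len, [&](auto *p, auto n, auto o) { win.get(p, n, segment, o); });`, which is effectively what the `#else` branches above do inline.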