diff --git a/docs/src/reference/collective.md b/docs/src/reference/collective.md
index c89e6f880..96c4b6cb1 100644
--- a/docs/src/reference/collective.md
+++ b/docs/src/reference/collective.md
@@ -13,6 +13,7 @@ MPI.Ibarrier
 MPI.Bcast!
 MPI.Bcast
 MPI.bcast
+MPI.Ibcast!
 ```
 
 ## Gather/Scatter
@@ -57,6 +58,7 @@ MPI.Reduce!
 MPI.Reduce
 MPI.Allreduce!
 MPI.Allreduce
+MPI.Iallreduce!
 MPI.Scan!
 MPI.Scan
 MPI.Exscan!
diff --git a/src/collective.jl b/src/collective.jl
index 9366fbf26..5d1b89c26 100644
--- a/src/collective.jl
+++ b/src/collective.jl
@@ -103,6 +103,32 @@ function bcast(obj, root::Integer, comm::Comm)
     return obj
 end
 
+
+"""
+    Ibcast!(buf, comm::Comm[, req::AbstractRequest = Request()]; root::Integer=0)
+
+Broadcast the buffer `buf` from `root` to all processes in `comm`.
+
+The operation is non-blocking, and the request object `req` can be used to wait
+for the operation to complete.
+
+# External links
+$(_doc_external("MPI_Ibcast"))
+"""
+Ibcast!(buf, comm::Comm, req::AbstractRequest=Request(); root::Integer=Cint(0)) =
+    Ibcast!(buf, root, comm, req)
+
+function Ibcast!(buf::Buffer, root::Integer, comm::Comm, req::AbstractRequest = Request())
+    # int MPI_Ibcast(void *buffer, int count, MPI_Datatype datatype, int root,
+    #                MPI_Comm comm, MPI_Request *request)
+    API.MPI_Ibcast(buf.data, buf.count, buf.datatype, root, comm, req)
+    return req
+end
+function Ibcast!(data, root::Integer, comm::Comm, req::AbstractRequest = Request())
+    Ibcast!(Buffer(data), root, comm, req)
+end
+
+
 """
     Scatter!(sendbuf::Union{UBuffer,Nothing}, recvbuf, comm::Comm;
         root::Integer=0)
@@ -775,6 +801,43 @@ Allreduce(sendbuf::AbstractArray, op, comm::Comm) =
 Allreduce(obj::T, op, comm::Comm) where {T} =
     Allreduce!(Ref(obj), Ref{T}(), op, comm)[]
 
+## Iallreduce
+
+# mutating
+"""
+    Iallreduce!(sendbuf, recvbuf, op, comm::Comm, req::AbstractRequest=Request())
+    Iallreduce!(sendrecvbuf, op, comm::Comm, req::AbstractRequest=Request())
+
+Performs elementwise reduction using the operator `op` on the buffer `sendbuf`,
+storing the result in the `recvbuf` of all processes in the group.
+
+If only one `sendrecvbuf` buffer is provided, then the operation is performed
+in-place.
+
+The operation is non-blocking, and the request object `req` can be used to wait
+for the operation to complete.
+
+# See also
+- [`Op`](@ref) for details on reduction operators.
+
+# External links
+$(_doc_external("MPI_Iallreduce"))
+"""
+function Iallreduce!(rbuf::RBuffer, op::Union{Op,MPI_Op}, comm::Comm, req::AbstractRequest=Request())
+    # int MPI_Iallreduce(const void *sendbuf, void *recvbuf, int count,
+    #                    MPI_Datatype datatype, MPI_Op op, MPI_Comm comm,
+    #                    MPI_Request *request)
+    API.MPI_Iallreduce(rbuf.senddata, rbuf.recvdata, rbuf.count, rbuf.datatype, op, comm, req)
+    return req
+end
+Iallreduce!(rbuf::RBuffer, op, comm::Comm, req::AbstractRequest=Request()) =
+    Iallreduce!(rbuf, Op(op, eltype(rbuf)), comm, req)
+Iallreduce!(sendbuf, recvbuf, op, comm::Comm, req::AbstractRequest=Request()) =
+    Iallreduce!(RBuffer(sendbuf, recvbuf), op, comm, req)
+
+# inplace
+Iallreduce!(buf, op, comm::Comm, req::AbstractRequest=Request()) = Iallreduce!(IN_PLACE, buf, op, comm, req)
+
 ## Scan
 
 # mutating
diff --git a/test/test_iallreduce.jl b/test/test_iallreduce.jl
new file mode 100644
index 000000000..25b897242
--- /dev/null
+++ b/test/test_iallreduce.jl
@@ -0,0 +1,51 @@
+include("common.jl")
+
+MPI.Init()
+
+comm_size = MPI.Comm_size(MPI.COMM_WORLD)
+
+if ArrayType != Array ||
+    MPI.MPI_LIBRARY == "MicrosoftMPI" && Sys.WORD_SIZE == 32 ||
+    Sys.ARCH === :powerpc64le || Sys.ARCH === :ppc64le ||
+    Sys.ARCH === :aarch64 || startswith(string(Sys.ARCH), "arm")
+    operators = [MPI.SUM, +]
+else
+    operators = [MPI.SUM, +, (x,y) -> 2x+y-x]
+end
+
+for T = [Int]
+    for dims = [1, 2, 3]
+        send_arr = ArrayType(zeros(T, Tuple(3 for i in 1:dims)))
+        send_arr[:] .= 1:length(send_arr)
+        synchronize()
+
+        for op in operators
+
+            # Non-allocating version
+            recv_arr = ArrayType{T}(undef, size(send_arr))
+            req = MPI.Iallreduce!(send_arr, recv_arr, op, MPI.COMM_WORLD)
+            sleep(rand())
+            MPI.Wait(req)
+            @test recv_arr == comm_size .* send_arr
+
+            # Assertion error when the output buffer is too small
+            recv_arr = ArrayType{T}(undef, size(send_arr).-1)
+            @test_throws AssertionError MPI.Iallreduce!(send_arr, recv_arr,
+                                                        op, MPI.COMM_WORLD)
+            # IN_PLACE
+            recv_arr = copy(send_arr)
+            synchronize()
+            req = MPI.Iallreduce!(recv_arr, op, MPI.COMM_WORLD)
+            sleep(rand())
+            MPI.Wait(req)
+            @test recv_arr == comm_size .* send_arr
+        end
+    end
+end
+
+
+MPI.Barrier(MPI.COMM_WORLD)
+
+GC.gc()
+MPI.Finalize()
+@test MPI.Finalized()
diff --git a/test/test_ibcast.jl b/test/test_ibcast.jl
new file mode 100644
index 000000000..f5bb378d4
--- /dev/null
+++ b/test/test_ibcast.jl
@@ -0,0 +1,35 @@
+include("common.jl")
+using Random
+
+MPI.Init()
+
+comm = MPI.COMM_WORLD
+root = 0
+matsize = (17,17)
+
+for T in MPITestTypes
+    # This test depends on the stability of the rng, and we have observed with
+    # CUDA.jl that the same number of rand calls is not guaranteed to occur on
+    # each rank (this is a hypothesis). To be safe, we seed the rng just before
+    # we call rand.
+    Random.seed!(17)
+    A = ArrayType(rand(T, matsize))
+    B = MPI.Comm_rank(comm) == root ? A : similar(A)
+    req = MPI.Ibcast!(B, comm; root=root)
+    sleep(rand())
+    MPI.Wait(req)
+    @test B == A
+end
+
+# Char
+A = ['s', 't', 'a', 'r', ' ', 'w', 'a', 'r', 's']
+B = MPI.Comm_rank(comm) == root ? A : similar(A)
+req = MPI.Ibcast!(B, comm; root=root)
+sleep(rand())
+MPI.Wait(req)
+@test B == A
+
+
+
+MPI.Finalize()
+@test MPI.Finalized()
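Usage sketch (not part of the patch): assuming `MPI.Init()` has been called and `comm = MPI.COMM_WORLD`, the non-blocking collectives added above follow the same pattern as the new tests: start the operation, overlap other work, then wait on the returned request. The buffer names, sizes, and values below are illustrative only.

```julia
using MPI

MPI.Init()
comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)

# Non-blocking broadcast: root fills the buffer, every rank receives a copy.
buf = rank == 0 ? collect(1.0:4.0) : zeros(Float64, 4)
req = MPI.Ibcast!(buf, comm; root=0)
# ... unrelated computation can overlap with the broadcast here ...
MPI.Wait(req)

# Non-blocking allreduce: elementwise sum of `send` across all ranks.
send = fill(Float64(rank), 4)
recv = similar(send)
req = MPI.Iallreduce!(send, recv, +, comm)
# ... more overlapping work ...
MPI.Wait(req)   # recv now holds the elementwise sum over all ranks

# In-place variant: the buffer is both input and output.
req = MPI.Iallreduce!(send, +, comm)
MPI.Wait(req)

MPI.Finalize()
```

Returning the request instead of blocking is what allows communication to overlap with computation; a preallocated `AbstractRequest` can also be passed positionally, matching the other non-blocking collectives in MPI.jl.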