diff --git a/README.md b/README.md index 95d7d55c0..cadd877bf 100644 --- a/README.md +++ b/README.md @@ -166,14 +166,14 @@ Fields `j2mpi` and `mpi2j` of `MPIManager` are associative collections mapping j This launches a total of 5 processes, mpi rank 0 is the julia pid 1. mpi rank 1 is julia pid 2 and so on. -The program must call `MPI.start(TCP_TRANSPORT_ALL)` with argument `TCP_TRANSPORT_ALL`. +The program must call `MPI.start_main_loop(TCP_TRANSPORT_ALL)` with argument `TCP_TRANSPORT_ALL`. On mpi rank 0, it returns a `manager` which can be used with `@mpi_do` On other processes (i.e., the workers) the function does not return ### MPIManager ### (MPI transport - all processes execute MPI code) -`MPI.start` must be called with option `MPI_TRANSPORT_ALL` to use MPI as transport. +`MPI.start_main_loop` must be called with option `MPI_TRANSPORT_ALL` to use MPI as transport. `mpirun -np 5 julia 06-cman-transport.jl MPI` will run the example using MPI as transport. ## Julia MPI-only interface @@ -189,6 +189,11 @@ juliacomm = MPI.COMM_WORLD ccomm = MPI.CComm(juliacomm) ``` +### MPIWindowIOManager +This manager is started using the `MPI_WINDOW_IO` or `MPI_WINDOW_NOWAIT` transports. It uses asynchronous IO +based on MPI windows. The `MPI_WINDOW_NOWAIT` will only use the clustermanager for code preceeded by the `@cluster` +macro. See `test_windowcman.jl` and `test_windowcman_nowait.jl` for examples. + ### Currently wrapped MPI functions Convention: `MPI_Fun => MPI.Fun` diff --git a/src/MPI.jl b/src/MPI.jl index fa78b517e..bbeb8963b 100644 --- a/src/MPI.jl +++ b/src/MPI.jl @@ -19,6 +19,8 @@ include(depfile) include("mpi-base.jl") include("cman.jl") +include("window-io.jl") +include("window-cman.jl") const mpitype_dict = Dict{DataType, Cint}() const mpitype_dict_inverse = Dict{Cint, DataType}() diff --git a/src/cman.jl b/src/cman.jl index 2111bf0b4..2f53ec68d 100644 --- a/src/cman.jl +++ b/src/cman.jl @@ -1,31 +1,37 @@ import Base: kill export MPIManager, launch, manage, kill, procs, connect, mpiprocs, @mpi_do -export TransportMode, MPI_ON_WORKERS, TCP_TRANSPORT_ALL, MPI_TRANSPORT_ALL +export TransportMode, MPI_ON_WORKERS, TCP_TRANSPORT_ALL, MPI_TRANSPORT_ALL, MPI_WINDOW_IO, MPI_WINDOW_NOWAIT using Compat using Compat.Distributed import Compat.Sockets: connect, listenany, accept, IPv4, getsockname -################################################################################ -# MPI Cluster Manager -# Note: The cluster manager object lives only in the manager process, -# except for MPI_TRANSPORT_ALL +""" +MPI Cluster Manager +Note: The cluster manager object lives only in the manager process, +except for MPI_TRANSPORT_ALL and MPI_WINDOW_IO + +There are four different transport modes: -# There are three different transport modes: +MPI_ON_WORKERS: Use MPI between the workers only, not for the manager. This +allows interactive use from a Julia shell, using the familiar `addprocs` +interface. -# MPI_ON_WORKERS: Use MPI between the workers only, not for the manager. This -# allows interactive use from a Julia shell, using the familiar `addprocs` -# interface. +MPI_TRANSPORT_ALL: Use MPI on all processes; there is no separate manager +process. This corresponds to the "usual" way in which MPI is used in a +headless mode, e.g. submitted as a script to a queueing system. -# MPI_TRANSPORT_ALL: Use MPI on all processes; there is no separate manager -# process. This corresponds to the "usual" way in which MPI is used in a -# headless mode, e.g. submitted as a script to a queueing system. +TCP_TRANSPORT_ALL: Same as MPI_TRANSPORT_ALL, but Julia uses TCP for its +communication between processes. MPI can still be used by the user. -# TCP_TRANSPORT_ALL: Same as MPI_TRANSPORT_ALL, but Julia uses TCP for its -# communication between processes. MPI can still be used by the user. +MPI_WINDOW_IO: Uses the MPI shared memory model with passive communication on all processes. +The program must be started with mpirun or equivalent. -@enum TransportMode MPI_ON_WORKERS MPI_TRANSPORT_ALL TCP_TRANSPORT_ALL +MPI_WINDOW_NOWAIT: Sets up a cluster manager, but only uses it for code enlosed in the @cluster +macro. All other code runs as regular MPI code (single program, multiple data). +""" +@enum TransportMode MPI_ON_WORKERS MPI_TRANSPORT_ALL TCP_TRANSPORT_ALL MPI_WINDOW_IO MPI_WINDOW_NOWAIT mutable struct MPIManager <: ClusterManager np::Int # number of worker processes (excluding the manager process) @@ -319,8 +325,9 @@ end ################################################################################ # Alternative startup model: All Julia processes are started via an external # mpirun, and the user does not call addprocs. - -# Enter the MPI cluster manager's main loop (does not return on the workers) +""" +Enter the MPI cluster manager's main loop (does not return on the workers) +""" function start_main_loop(mode::TransportMode=TCP_TRANSPORT_ALL; comm::MPI.Comm=MPI.COMM_WORLD) !MPI.Initialized() && MPI.Init() @@ -392,6 +399,10 @@ function start_main_loop(mode::TransportMode=TCP_TRANSPORT_ALL; MPI.Finalize() exit() end + elseif mode == MPI_WINDOW_IO + start_window_worker(comm, true) + elseif mode == MPI_WINDOW_NOWAIT + start_window_worker(comm, false) else error("Unknown mode $mode") end diff --git a/src/window-cman.jl b/src/window-cman.jl new file mode 100644 index 000000000..6a4f8e29d --- /dev/null +++ b/src/window-cman.jl @@ -0,0 +1,187 @@ +import Base: kill +using Distributed +import Distributed: launch, kill, manage, connect +export MPIWindowIOManager, launch, kill, manage, connect, @cluster + +""" +Stores the buffers needed for communication, in one instance per rank. Loop stops when the stop_condition is triggered +""" +mutable struct MPIWindowIOManager <: ClusterManager + comm::MPI.Comm + connection_windows::Vector{WindowIO} + stdio_windows::Vector{WindowIO} + workers_wait::Bool + + function MPIWindowIOManager(comm::MPI.Comm, workers_wait::Bool) + nb_procs = MPI.Comm_size(comm) + connection_windows = Vector{WindowIO}(undef,nb_procs) + stdio_windows = Vector{WindowIO}(undef,nb_procs) + + for i in 1:nb_procs + connection_windows[i] = WindowIO(comm) + stdio_windows[i] = WindowIO(comm) + end + + # Make sure all windows are created before continuing + MPI.Barrier(comm) + + return new(comm, connection_windows, stdio_windows, workers_wait) + end +end + +# Closes all local MPI Windows in a manager. Must be called collectively on all ranks +function closeall(manager::MPIWindowIOManager) + for w in manager.connection_windows + close(w) + end + for w in manager.stdio_windows + close(w) + end +end + +function launch(mgr::MPIWindowIOManager, params::Dict, + instances::Array, cond::Condition) + try + nprocs = MPI.Comm_size(mgr.comm) + for cnt in 1:(nprocs-1) + push!(instances, WorkerConfig()) + end + notify(cond) + catch e + println("Error in MPI launch $e") + rethrow(e) + end +end + +function kill(mgr::MPIWindowIOManager, pid::Int, config::WorkerConfig) + @spawnat pid notify(_stop_requested) + Distributed.set_worker_state(Distributed.Worker(pid), Distributed.W_TERMINATED) +end + +function manage(mgr::MPIWindowIOManager, id::Integer, config::WorkerConfig, op::Symbol) end + +function connect(mgr::MPIWindowIOManager, pid::Int, config::WorkerConfig) + myrank = MPI.Comm_rank(mgr.comm) + if myrank == 0 + proc_stdio = mgr.stdio_windows[pid] + @async while !eof(proc_stdio) + try + println("\tFrom worker $(pid):\t$(readline(proc_stdio))") + catch e + end + end + end + return (mgr.connection_windows[pid], WindowWriter(mgr.connection_windows[myrank+1], pid-1)) +end + +function redirect_to_mpi(s::WindowWriter) + (rd, wr) = redirect_stdout() + @async while !eof(rd) && isopen(s.winio) + av = readline(rd) + if isopen(s.winio) + println(s,av) + flush(s) + end + end +end + +function checkworkers() + for w in workers() + if w != (@fetchfrom w myid()) + error("worker $w is not waiting") + end + end +end + +function notify_workers() + for w in workers() + @spawnat(w, notify(_stop_requested)) + end +end + +function wait_for_events() + global _stop_requested + wait(_stop_requested) +end + +""" +Initialize the current process as a Julia parallel worker. Must be called on all ranks. +If comm is not supplied, MPI is initialized and MPI_COMM_WORLD is used. +""" +function start_window_worker(comm::Comm, workers_wait) + rank = MPI.Comm_rank(comm) + N = MPI.Comm_size(comm) + + manager = MPIWindowIOManager(comm, workers_wait) + cookie = string(comm) + if length(cookie) > Distributed.HDR_COOKIE_LEN + cookie = cookie[1:Distributed.HDR_COOKIE_LEN] + end + + try + if rank == 0 + Distributed.cluster_cookie(cookie) + MPI.Barrier(comm) + addprocs(manager) + @assert nprocs() == N + @assert nworkers() == (N == 1 ? 1 : N-1) + + if !workers_wait + checkworkers() + notify_workers() + end + else + init_worker(cookie, manager) + MPI.Barrier(comm) + redirect_to_mpi(WindowWriter(manager.stdio_windows[rank+1], 0)) + for i in vcat([1], (rank+2):N) + # Receiving end of connections to all higher workers and master + Distributed.process_messages(manager.connection_windows[i], WindowWriter(manager.connection_windows[rank+1], i-1)) + end + + global _stop_requested = Condition() + wait_for_events() + end + catch e + Base.display_error(stderr,"exception $e on rank $rank",backtrace()) + end + + if workers_wait && rank != 0 + closeall(manager) + MPI.Finalize() + exit(0) + end + + return manager +end + +""" +Stop the manager. This closes all windows and calls MPI.Finalize on all workers +""" +function stop_main_loop(manager::MPIWindowIOManager) + if myid() != 1 + wait_for_events() + else + checkworkers() + if nprocs() > 1 + rmprocs(workers()) + end + end + closeall(manager) + MPI.Finalize() +end + +""" +Runs the given expression using the Julia parallel cluster. Useful when running with MPI_WINDOW_NOWAIT, +since this will temporarily activate the worker event loops to listen for messages. +""" +macro cluster(expr) + quote + if myid() != 1 + wait_for_events() + else + $(esc(expr)) + notify_workers() + end + end +end \ No newline at end of file diff --git a/src/window-io.jl b/src/window-io.jl new file mode 100644 index 000000000..ecd26b875 --- /dev/null +++ b/src/window-io.jl @@ -0,0 +1,291 @@ +export WindowIO, WindowWriter + +# Type of the number of available entries +const WinCountT = Cint + +mutable struct BufferHeader + count::WinCountT # Number of elements in the buffer + address::Cptrdiff_t + length::WinCountT # Current size of the buffer + needed_length::WinCountT # Size the buffer should have to handle all pending writes +end + +""" + + WindowIO(target::Integer, comm=MPI.COMM_WORLD, bufsize=1024^2) + +Expose an MPI RMA window using the IO interface. Must be constructed on all ranks in the communicator. +The target is the rank to which data is sent when calling write, comm is the communicator to use and +bufsize is the initial size of the buffer. A target may be written to by multiple ranks concurrently, +and the receive buffer will be grown as needed, but never shrinks. +Communication happens only when flush is called. +""" +mutable struct WindowIO <: IO + comm::MPI.Comm + myrank::Int + # Represents the received data. First elements contain a counter with the total number of entries in the buffer + buffer::Array{UInt8,1} + win::Win + header::BufferHeader + remote_header::BufferHeader + header_win::Win + is_open::Bool + # Current read position + ptr::WinCountT + data_available::Condition + read_requested::Condition + lock::ReentrantLock # Needed for Base + waiter + + function WindowIO(comm=MPI.COMM_WORLD, bufsize=1024^2) + buffer = Array{UInt8,1}(undef, bufsize) + header_win = MPI.Win() + header = BufferHeader(0, MPI.Get_address(buffer), bufsize, bufsize) + remote_header = BufferHeader(0, MPI.Get_address(buffer), bufsize, bufsize) + header_arr = unsafe_wrap(Vector{UInt8}, Ptr{UInt8}(pointer_from_objref(header)), sizeof(BufferHeader)) + MPI.Win_create(header_arr, MPI.INFO_NULL, comm, header_win) + win = MPI.Win() + MPI.Win_create_dynamic(MPI.INFO_NULL, comm, win) + MPI.Win_attach(win, buffer) + + w = new(comm, + MPI.Comm_rank(comm), + buffer, + win, + header, + remote_header, + header_win, + true, + 0, + Condition(), + Condition(), + ReentrantLock(), + nothing) + + w.waiter = Task(function() + wait(w.read_requested) + while w.is_open + while !has_data_available(w) && w.is_open + yield() + end + if w.is_open + notify(w.data_available) + wait(w.read_requested) + end + end + end) + + yield(w.waiter) + + return w + end +end + + +Base.nb_available(w::WindowIO)::WinCountT = w.header.count - w.ptr + +# Checks if data is available and grows the buffer if needed by the writing side +function has_data_available(w::WindowIO) + if !w.is_open + return false + end + + MPI.Win_lock(MPI.LOCK_SHARED, w.myrank, 0, w.header_win) + have_data = w.header.count > w.ptr + need_grow = w.header.needed_length > w.header.length + MPI.Win_unlock(w.myrank, w.header_win) + + # Grow buffer if needed + if need_grow + MPI.Win_detach(w.win, w.buffer) + resize!(w.buffer, w.header.needed_length) + MPI.Win_attach(w.win, w.buffer) + MPI.Win_lock(MPI.LOCK_EXCLUSIVE, w.myrank, 0, w.header_win) + w.header.address = MPI.Get_address(w.buffer) + w.header.length = w.header.needed_length + MPI.Win_unlock(w.myrank, w.header_win) + end + + return have_data +end + +function Base.wait(w::WindowIO) + notify(w.read_requested) + wait(w.data_available) +end + +# Waits for data and returns the number of available bytes +function wait_nb_available(w) + if !has_data_available(w) + wait(w) + end + return nb_available(w) +end + +# wait until the specified number of bytes is available or the stream is closed +function wait_nb_available(w, nb) + nb_found = 0 + while nb_found < nb && w.is_open + nb_found = wait_nb_available(w) + end + return nb_found +end + +mutable struct WindowWriter <: IO + winio::WindowIO + target::Int + # Writes are buffered to only lock and communicate upon flush + write_buffer::Vector{UInt8} + lock::ReentrantLock + nb_written::Int + + function WindowWriter(w::WindowIO, target::Integer) + return new(w, target, Vector{UInt8}(undef,1024^2), ReentrantLock(), 0) + end +end + +@inline Base.isopen(w::WindowIO)::Bool = w.is_open +@inline Base.isopen(s::WindowWriter)::Bool = s.winio.is_open + +function Base.eof(w::WindowIO) + if !isopen(w) + return true + else + wait_nb_available(w) + end + return !isopen(w) +end + +Base.iswritable(::WindowIO) = false +Base.isreadable(::WindowIO) = true +Base.iswritable(::WindowWriter) = true +Base.isreadable(::WindowWriter) = false + +function Base.close(w::WindowIO) + w.is_open = false + notify(w.read_requested) + fetch(w.waiter) # Wait for the data notification loop to finish + MPI.Win_lock(MPI.LOCK_EXCLUSIVE, w.myrank, 0, w.header_win) + w.header.count = 0 + w.ptr = 0 + MPI.Win_unlock(w.myrank, w.header_win) + MPI.Barrier(w.comm) + MPI.Win_free(w.win) + MPI.Win_free(w.header_win) +end +Base.close(s::WindowWriter) = nothing + +# Checks if all available data is read, and if so resets the counter with the number of written bytes to 0 +function complete_read(w::WindowIO) + if w.header.count != 0 && w.header.count == w.ptr + MPI.Win_lock(MPI.LOCK_EXCLUSIVE, w.myrank, 0, w.header_win) + if w.header.count != 0 && w.header.count == w.ptr # Check again after locking + w.header.count = 0 + w.ptr = 0 + end + MPI.Win_unlock(w.myrank, w.header_win) + end +end + +function Base.read(w::WindowIO, ::Type{UInt8}) + if wait_nb_available(w) < 1 + throw(EOFError()) + end + + w.ptr += 1 + result = w.buffer[w.ptr] + complete_read(w) + return result +end + +function Base.readbytes!(w::WindowIO, b::AbstractVector{UInt8}, nb=length(b); all::Bool=true) + nb_obtained = nb_available(w) + if all + nb_obtained = wait_nb_available(w,nb) + if nb_obtained < nb + throw(EOFError()) + end + resize!(b, nb) + end + nb_read = min(nb_obtained, nb) + if nb_read == 0 + return 0 + end + copyto!(b, 1, w.buffer, w.ptr+1, nb_read) + w.ptr += nb_read + complete_read(w) + return nb_read +end + +Base.readavailable(w::WindowIO) = read!(w, Vector{UInt8}(undef,nb_available(w))) + +@inline function Base.unsafe_read(w::WindowIO, p::Ptr{UInt8}, nb::UInt) + nb_obtained = wait_nb_available(w,nb) + nb_read = min(nb_obtained, nb) + unsafe_copyto!(p, pointer(w.buffer, w.ptr+1), nb_read) + w.ptr += nb_read + complete_read(w) + if nb_read != nb + throw(EOFError()) + end + return +end + +function Base.read(w::WindowIO, nb::Integer; all::Bool=true) + buf = Vector{UInt8}(undef,nb) + readbytes!(w, buf, nb, all=all) + return buf +end + +function ensureroom(w::WindowWriter) + if w.nb_written > length(w.write_buffer) + resize!(w.write_buffer, w.nb_written) + end +end + +function Base.write(w::WindowWriter, b::UInt8) + w.nb_written += 1 + ensureroom(w) + w.write_buffer[w.nb_written] = b + return sizeof(UInt8) +end +function Base.unsafe_write(w::WindowWriter, p::Ptr{UInt8}, nb::UInt) + offset = w.nb_written+1 + w.nb_written += nb + ensureroom(w) + copyto!(w.write_buffer, offset, unsafe_wrap(Array{UInt8}, p, nb), 1, nb) + return nb +end + +Base.flush(::WindowIO) = error("WindowIO is read-only, did you mean to flush an associated WindowWriter?") + +function Base.flush(s::WindowWriter) + if !isopen(s) + throw(EOFError()) + end + nb_to_write = s.nb_written + free = 0 + header = s.winio.remote_header + header_win = s.winio.header_win + while free < nb_to_write + MPI.Win_lock(MPI.LOCK_EXCLUSIVE, s.target, 0, header_win) + MPI.Get(Ptr{UInt8}(pointer_from_objref(header)), sizeof(BufferHeader), s.target, 0, header_win) + MPI.Win_flush(s.target, header_win) + free = header.length - header.count + if free >= nb_to_write + MPI.Win_lock(MPI.LOCK_EXCLUSIVE, s.target, 0, s.winio.win) + MPI.Put(pointer(s.write_buffer), nb_to_write, s.target, header.address + header.count, s.winio.win) + MPI.Win_unlock(s.target, s.winio.win) + MPI.Put(Ref{WinCountT}(header.count + nb_to_write), s.target, header_win) + s.nb_written = 0 + else + # Request to grow buffer, if not done already + new_needed_length = max(header.needed_length, header.count + nb_to_write) + if (new_needed_length > header.needed_length) + header.needed_length = new_needed_length + MPI.Put(Ptr{UInt8}(pointer_from_objref(header)), sizeof(BufferHeader), s.target, 0, header_win) + end + end + MPI.Win_unlock(s.target, header_win) + end +end diff --git a/test/test_reduce.jl b/test/test_reduce.jl index 46432d788..b90258da5 100644 --- a/test/test_reduce.jl +++ b/test/test_reduce.jl @@ -27,4 +27,35 @@ sum_mesg = MPI.Reduce(mesg, MPI.SUM, root, comm) sum_mesg = rank == root ? sum_mesg : size*mesg @test isapprox(norm(sum_mesg-size*mesg), 0.0) +# For comparison with the clustermanager version +const ARRSIZE = 1024^2*100 +@test ARRSIZE % size == 0 +const my_arr = fill(1*(rank+1),ARRSIZE ÷ size) + +function mpi_sum(arr)::Int + mysum = 0 + for x in arr + mysum += x + end + totalsum = MPI.Reduce(mysum, +, 0, comm) + return rank == 0 ? totalsum[1] : 0 +end + +const sumresult = mpi_sum(my_arr) +const expected = sum((ARRSIZE ÷ size) * (1:size)) +if rank == 0 + @test sumresult == expected +end +if rank == 0 + println("Timings for MPI reduce:") + @time expected == mpi_sum(my_arr) + @time expected == mpi_sum(my_arr) + @time expected == mpi_sum(my_arr) +else + mpi_sum(my_arr) + mpi_sum(my_arr) + mpi_sum(my_arr) +end + + MPI.Finalize() diff --git a/test/test_windowcman.jl b/test/test_windowcman.jl new file mode 100644 index 000000000..14136f2e7 --- /dev/null +++ b/test/test_windowcman.jl @@ -0,0 +1,155 @@ +using Distributed + +const using_mpi = (nprocs() == 1) + +if using_mpi + using MPI + using Test + transport_mode = MPI.MPI_WINDOW_IO # This test can run with MPPI_TRANSPORT_ALL and MPI_WINDOW_IO + mgr = MPI.start_main_loop(transport_mode) + @everywhere const comm = MPI.COMM_WORLD + rank = MPI.Comm_rank(comm) + size = MPI.Comm_size(comm) +else + @everywhere using Test +end + +@everywhere const N = nprocs() + +if using_mpi + @test N == size +end + +@test procs() == collect(1:N) +if N > 1 + @test workers() == collect(2:N) +end + +results = Vector{Any}(undef,N) +for i in 1:N + results[i] = remotecall(myid, i) +end + +for i in 1:N + @test fetch(results[i]) == i +end + +@everywhere function test_nprocs() + @test N == nprocs() +end +for i in 1:N + remotecall_wait(test_nprocs, i) +end + +results = zeros(Int, N) +for i in 1:N + results[i] = fetch(@spawnat i myid() + 1) +end +for i in 1:N + @test results[i] == i+1 +end + +# Check if DistributedArrays is available +has_arrays = true +try + @everywhere using DistributedArrays +catch e + global has_arrays = false +end + +if has_arrays + @everywhere begin + const ARRSIZE = 1024^2*100 + @test ARRSIZE % N == 0 + + function myrange() + my_nb_indices = ARRSIZE ÷ N + range_start = ((myid()-1)*my_nb_indices+1) + return range_start:range_start+my_nb_indices-1 + end + end + + const d_arr = dfill(1,(ARRSIZE,), procs(), (N,)) + for i in 1:N + @test fetch(@spawnat i localindexes(d_arr))[1] == remotecall_fetch(myrange, i) + end + + println("Normal array sum timings:") + const arr = fill(1,ARRSIZE) + @test sum(arr) == ARRSIZE + @time sum(arr) + @time sum(arr) + @time sum(arr) + + println("DArray sum timings:") + @test sum(d_arr) == ARRSIZE + @time sum(d_arr) + @time sum(d_arr) + @time sum(d_arr) + + @everywhere const my_arr = fill(1*myid(),ARRSIZE ÷ N) + + @everywhere function do_sum(result) + put!(result, sum(my_arr)) + return + end + + function collect_sum() + result = sum(my_arr) + result_chan = RemoteChannel(()->Channel{Int}(N-1)) + for p in workers() + remote_do(do_sum, p, result_chan) + end + nb_collected = 0 + while nb_collected != N-1 + result += take!(result_chan) + nb_collected += 1 + end + return result + end + + println("Split array sum timings:") + @test collect_sum() == sum((ARRSIZE ÷ N) * (1:N)) + @time collect_sum() + @time collect_sum() + @time collect_sum() + + if using_mpi + @everywhere function mpi_sum() + global my_arr + my_sum = [0] + for x in my_arr + my_sum[1] += x + end + return MPI.Reduce(my_sum, MPI.SUM, 0, comm) + end + + function do_mpi_sum() + for w in workers() + @spawnat w mpi_sum() + end + return mpi_sum()[1] + end + + println("MPI_Reduce sum timings:") + @test do_mpi_sum() == sum((ARRSIZE ÷ N) * (1:N)) + @time do_mpi_sum() + @time do_mpi_sum() + @time do_mpi_sum() + end +end + +const the_channel = RemoteChannel() +for p in workers() + @spawnat p put!(the_channel, myid()) +end + +chan_results = [] +for i in 2:N + push!(chan_results, take!(the_channel)) +end +@test sort(chan_results) == workers() + +if using_mpi + MPI.stop_main_loop(mgr) +end \ No newline at end of file diff --git a/test/test_windowcman_nowait.jl b/test/test_windowcman_nowait.jl new file mode 100644 index 000000000..ea19039fa --- /dev/null +++ b/test/test_windowcman_nowait.jl @@ -0,0 +1,65 @@ +using Distributed +using MPI +using Test + +mgr = MPI.start_main_loop(MPI_WINDOW_NOWAIT) + +comm = mgr.comm +rank = MPI.Comm_rank(comm) +size = MPI.Comm_size(comm) +@test rank == myid()-1 +@test size == nprocs() + +const ARRSIZE = 1024^2*100 +@test ARRSIZE % size == 0 +const my_arr = fill(1*(rank+1),ARRSIZE ÷ size) + +function mpi_sum(arr)::Int + mysum = 0 + for x in arr + mysum += x + end + totalsum = MPI.Reduce(mysum, +, 0, comm) + return rank == 0 ? totalsum[1] : 0 +end + +const sumresult = mpi_sum(my_arr) +const expected = sum((ARRSIZE ÷ size) * (1:size)) +if rank == 0 + @test sumresult == expected +end +if rank == 0 + println("Timings for MPI_WINDOW_NOWAIT reduce:") + for i in 1:5 + @time mpi_sum(my_arr) + end +else + for i in 1:5 + mpi_sum(my_arr) + end +end + +# Do some Julia cluster stuff +@cluster begin + @everywhere println("test") + + for w in workers() + @test @fetchfrom w myid() == w + end +end + +sleep(1) + +# Verify that the reduce still works +if rank == 0 + println("Timing after cluster communication:") + for i in 1:5 + @time mpi_sum(my_arr) + end +else + for i in 1:5 + mpi_sum(my_arr) + end +end + +MPI.stop_main_loop(mgr) diff --git a/test/test_windowio.jl b/test/test_windowio.jl new file mode 100644 index 000000000..0f6bb82ea --- /dev/null +++ b/test/test_windowio.jl @@ -0,0 +1,176 @@ +using Test +using MPI + +MPI.Init() + +comm = MPI.COMM_WORLD + +const rank = MPI.Comm_rank(comm) +const N = MPI.Comm_size(comm) + +if N < 2 + error("This test needs at least 2 processes") + exit(1) +end + +const winio = WindowIO(comm) +const writer = WindowWriter(winio, 0) + +# directly test the different read functions +if rank == 1 + write(writer, UInt8(5)) + flush(writer) # Must flush to trigger communication +elseif rank == 0 + @test MPI.wait_nb_available(winio) == 1 + @test read(winio, UInt8) == UInt8(5) + @test nb_available(winio) == 0 +end + +MPI.Barrier(comm) + +if rank == 1 + write(writer, "hello") + flush(writer) # Must flush to trigger communication +elseif rank == 0 + @test MPI.wait_nb_available(winio) == 5 + arr = Vector{UInt8}(undef,5) + readbytes!(winio, arr) + @test String(arr) == "hello" + @test nb_available(winio) == 0 +end + +MPI.Barrier(comm) + +if rank == 1 + write(writer, "hello") + flush(writer) # Must flush to trigger communication +elseif rank == 0 + @test MPI.wait_nb_available(winio) == 5 + arr = Vector{UInt8}(undef,5) + unsafe_read(winio, pointer(arr), 2) + @test nb_available(winio) == 3 + unsafe_read(winio, pointer(arr,3), 3) + @test String(arr) == "hello" + @test nb_available(winio) == 0 +end + +MPI.Barrier(comm) + +if rank == 1 + write(writer, "hello") + flush(writer) # Must flush to trigger communication +elseif rank == 0 + arr = Vector{UInt8}(undef,3) + readbytes!(winio, arr) # waits for data + @test nb_available(winio) == 2 + @test String(copy(arr)) == "hel" + fill!(arr, UInt8('a')) + readbytes!(winio, arr, all=false) # reads what's available + @test String(arr) == "loa" + @test nb_available(winio) == 0 +end + +MPI.Barrier(comm) + +if rank == 1 + write(writer, "hello") + flush(writer) # Must flush to trigger communication +elseif rank == 0 + result = "hello" + for i in 1:5 + @test read(winio, Char) == result[i] + @test nb_available(winio) == 5 - i + end +end + +MPI.Barrier(comm) + +if rank != 0 + write(writer, 1, 2.0, Cint(3)) + flush(writer) # Must flush to trigger communication +else + expected_nb_recv = sizeof(1) + sizeof(2.0) + sizeof(Cint(3)) + result = read(winio, (N-1)*expected_nb_recv) + @test nb_available(winio) == 0 + @test length(result) == (N-1)*expected_nb_recv +end + +MPI.Barrier(comm) + +# Test blocking read +if rank != 0 + write(writer, rank) + flush(writer) # Must flush to trigger communication +else + result = read(winio, (N-1)*sizeof(Int)) # Blocks until all required data is read + @test sort(reinterpret(Int,result)) == collect(1:N-1) + @test nb_available(winio) == 0 +end + +MPI.Barrier(comm) + +# Timing +if rank == 1 + const arr = ones(UInt8,100000) + println("write and flush timings:") + @time write(writer, arr) + @time flush(writer) + @time write(writer, arr) + @time flush(writer) + @time write(writer, arr) + @time flush(writer) + MPI.Barrier(comm) +elseif rank == 0 + const recarr = Vector{UInt8}(undef,100000) + MPI.Barrier(comm) + + println("read timings:") + @time read!(winio, recarr) + @time read!(winio, recarr) + @time read!(winio, recarr) + @test recarr == ones(UInt8,100000) +else + MPI.Barrier(comm) +end + +MPI.Barrier(comm) + +# Test readline +message = "Hi from rank " +if rank != 0 + println(writer, message*string(rank)) + flush(writer) # Must flush to trigger communication +else + global nb_received = 0 + while nb_received != N-1 + line = readline(winio) + @test startswith(line, message) + @test line[length(message)+1:end] ∈ string.(1:N) + global nb_received += 1 + end +end + +MPI.Barrier(comm) +close(winio) +MPI.Barrier(comm) + +# Send more than the buffer can contain +BS = 24 +const winio2 = WindowIO(comm, BS+sizeof(MPI.WinCountT)) +const writer2 = WindowWriter(winio2, 0) +if rank != 0 + write(writer2, rank*ones(BS)) + flush(writer2) # Must flush to trigger communication +else + result = Vector{Float64}(undef,BS*(N-1)) + read!(winio2, result) + header = winio2.header + @test nb_available(winio2) == 0 + @test header.count == 0 + @test header.length == header.needed_length + @test header.length > BS+sizeof(MPI.WinCountT) + @test sum(result) == sum((1:N-1)*BS) +end + +close(winio2) +MPI.Finalize()